1use std::{io::Cursor, sync::Arc};
18
19use arrow::{
20 array::{ArrayRef, RecordBatch, StringArray},
21 datatypes::{DataType, Field, Schema},
22};
23use serde::{Deserialize, Serialize};
24
25use crate::{
26 error::{Error, Result},
27 serve::{
28 content::{
29 ContentMetadata, ContentTypeId, ServeableContent, ValidationError, ValidationReport,
30 ValidationWarning,
31 },
32 schema::{ContentSchema, FieldDefinition, FieldType},
33 },
34};
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
38pub enum SourceType {
39 Clipboard,
41 Stdin,
43 Url,
45 File,
47 Direct,
49 Unknown,
51}
52
53impl SourceType {
54 pub fn as_str(&self) -> &'static str {
56 match self {
57 Self::Clipboard => "clipboard",
58 Self::Stdin => "stdin",
59 Self::Url => "url",
60 Self::File => "file",
61 Self::Direct => "direct",
62 Self::Unknown => "unknown",
63 }
64 }
65}
66
67impl std::fmt::Display for SourceType {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 write!(f, "{}", self.as_str())
70 }
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
75pub enum DetectedFormat {
76 Csv,
78 Tsv,
80 Json,
82 JsonLines,
84 PlainText,
86 Unknown,
88}
89
90impl DetectedFormat {
91 pub fn as_str(self) -> &'static str {
93 match self {
94 Self::Csv => "csv",
95 Self::Tsv => "tsv",
96 Self::Json => "json",
97 Self::JsonLines => "jsonl",
98 Self::PlainText => "text",
99 Self::Unknown => "unknown",
100 }
101 }
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct RawSourceConfig {
107 pub force_format: Option<DetectedFormat>,
109 pub has_header: Option<bool>,
111 pub delimiter: Option<char>,
113 pub max_rows: Option<usize>,
115 pub infer_types: bool,
117 pub source_description: Option<String>,
119}
120
121impl Default for RawSourceConfig {
122 fn default() -> Self {
123 Self {
124 force_format: None,
125 has_header: None,
126 delimiter: None,
127 max_rows: None,
128 infer_types: true,
129 source_description: None,
130 }
131 }
132}
133
134impl RawSourceConfig {
135 pub fn new() -> Self {
137 Self::default()
138 }
139
140 pub fn with_format(mut self, format: DetectedFormat) -> Self {
142 self.force_format = Some(format);
143 self
144 }
145
146 pub fn with_header(mut self, has_header: bool) -> Self {
148 self.has_header = Some(has_header);
149 self
150 }
151
152 pub fn with_delimiter(mut self, delimiter: char) -> Self {
154 self.delimiter = Some(delimiter);
155 self
156 }
157
158 pub fn with_max_rows(mut self, max_rows: usize) -> Self {
160 self.max_rows = Some(max_rows);
161 self
162 }
163
164 pub fn with_description(mut self, description: impl Into<String>) -> Self {
166 self.source_description = Some(description.into());
167 self
168 }
169}
170
171#[derive(Debug, Clone)]
176pub struct RawSource {
177 data: String,
179 source_type: SourceType,
181 config: RawSourceConfig,
183 detected_format: Option<DetectedFormat>,
185 cached_batch: Option<RecordBatch>,
187}
188
189impl RawSource {
190 pub fn from_string(data: impl Into<String>, source_type: SourceType) -> Self {
192 Self {
193 data: data.into(),
194 source_type,
195 config: RawSourceConfig::default(),
196 detected_format: None,
197 cached_batch: None,
198 }
199 }
200
201 pub fn from_clipboard(data: impl Into<String>) -> Self {
203 Self::from_string(data, SourceType::Clipboard)
204 }
205
206 pub fn from_stdin(data: impl Into<String>) -> Self {
208 Self::from_string(data, SourceType::Stdin)
209 }
210
211 pub fn with_config(mut self, config: RawSourceConfig) -> Self {
213 self.config = config;
214 self.detected_format = None; self.cached_batch = None;
216 self
217 }
218
219 pub fn raw_data(&self) -> &str {
221 &self.data
222 }
223
224 pub fn source_type(&self) -> SourceType {
226 self.source_type
227 }
228
229 pub fn detect_format(&self) -> DetectedFormat {
231 if let Some(forced) = self.config.force_format {
232 return forced;
233 }
234
235 let trimmed = self.data.trim();
236
237 if trimmed.is_empty() {
239 return DetectedFormat::Unknown;
240 }
241
242 if trimmed.starts_with('{') || trimmed.starts_with('[') {
244 return DetectedFormat::Json;
245 }
246
247 let first_line = trimmed.lines().next().unwrap_or("");
249 if first_line.starts_with('{') && first_line.ends_with('}') {
250 let second_line = trimmed.lines().nth(1);
251 if let Some(line) = second_line {
252 if line.starts_with('{') {
253 return DetectedFormat::JsonLines;
254 }
255 }
256 }
257
258 let sample_lines: Vec<&str> = trimmed.lines().take(5).collect();
260 if sample_lines.is_empty() {
261 return DetectedFormat::PlainText;
262 }
263
264 let comma_count: usize = sample_lines.iter().map(|l| l.matches(',').count()).sum();
265 let tab_count: usize = sample_lines.iter().map(|l| l.matches('\t').count()).sum();
266
267 let lines_count = sample_lines.len();
269 let avg_commas = comma_count / lines_count;
270 let avg_tabs = tab_count / lines_count;
271
272 if avg_tabs > 0 && avg_tabs >= avg_commas {
273 DetectedFormat::Tsv
274 } else if avg_commas > 0 {
275 DetectedFormat::Csv
276 } else {
277 DetectedFormat::PlainText
278 }
279 }
280
281 pub fn parse(&mut self) -> Result<RecordBatch> {
287 if let Some(ref batch) = self.cached_batch {
288 return Ok(batch.clone());
289 }
290
291 let format = self.detect_format();
292 self.detected_format = Some(format);
293
294 let batch = match format {
295 DetectedFormat::Csv => self.parse_csv(',')?,
296 DetectedFormat::Tsv => self.parse_csv('\t')?,
297 DetectedFormat::Json => self.parse_json()?,
298 DetectedFormat::JsonLines => self.parse_jsonl()?,
299 DetectedFormat::PlainText | DetectedFormat::Unknown => self.parse_plain_text()?,
300 };
301
302 self.cached_batch = Some(batch.clone());
303 Ok(batch)
304 }
305
306 fn parse_csv(&self, default_delimiter: char) -> Result<RecordBatch> {
308 use arrow_csv::reader::Format;
309
310 let delimiter = self.config.delimiter.unwrap_or(default_delimiter);
311 let has_header = self.config.has_header.unwrap_or(true);
312
313 let mut cursor_for_infer = Cursor::new(self.data.as_bytes());
315 let format = Format::default()
316 .with_delimiter(delimiter as u8)
317 .with_header(has_header);
318 let (inferred, _) = format
319 .infer_schema(&mut cursor_for_infer, Some(1000))
320 .map_err(|e| Error::transform(format!("Failed to infer CSV schema: {e}")))?;
321
322 let schema = Arc::new(inferred);
323
324 let cursor = Cursor::new(self.data.as_bytes());
326 let batch_size = self.config.max_rows.unwrap_or(8192);
327 let builder = arrow_csv::ReaderBuilder::new(schema)
328 .with_delimiter(delimiter as u8)
329 .with_header(has_header)
330 .with_batch_size(batch_size);
331
332 let mut reader = builder
333 .build(cursor)
334 .map_err(|e| Error::transform(format!("Failed to parse CSV: {e}")))?;
335
336 reader
337 .next()
338 .ok_or_else(|| Error::transform("No data in CSV"))?
339 .map_err(|e| Error::transform(format!("Failed to read CSV batch: {e}")))
340 }
341
342 fn parse_json(&self) -> Result<RecordBatch> {
344 let cursor = Cursor::new(self.data.as_bytes());
345
346 let (schema, _) = arrow_json::reader::infer_json_schema(cursor, Some(100))
348 .map_err(|e| Error::transform(format!("Failed to infer JSON schema: {e}")))?;
349
350 let cursor = Cursor::new(self.data.as_bytes());
351 let mut reader = arrow_json::ReaderBuilder::new(Arc::new(schema))
352 .build(cursor)
353 .map_err(|e| Error::transform(format!("Failed to create JSON reader: {e}")))?;
354
355 reader
356 .next()
357 .ok_or_else(|| Error::transform("No data in JSON"))?
358 .map_err(|e| Error::transform(format!("Failed to read JSON batch: {e}")))
359 }
360
361 fn parse_jsonl(&self) -> Result<RecordBatch> {
363 self.parse_json()
365 }
366
367 fn parse_plain_text(&self) -> Result<RecordBatch> {
369 let lines: Vec<&str> = self.data.lines().collect();
370
371 let max_rows = self.config.max_rows.unwrap_or(lines.len());
372 let limited_lines: Vec<&str> = lines.into_iter().take(max_rows).collect();
373
374 let schema = Arc::new(Schema::new(vec![Field::new("line", DataType::Utf8, false)]));
375
376 let array: ArrayRef = Arc::new(StringArray::from(limited_lines));
377
378 RecordBatch::try_new(schema, vec![array])
379 .map_err(|e| Error::transform(format!("Failed to create text batch: {e}")))
380 }
381
382 pub fn size(&self) -> usize {
384 self.data.len()
385 }
386
387 pub fn line_count(&self) -> usize {
389 self.data.lines().count()
390 }
391}
392
393impl ServeableContent for RawSource {
394 fn schema(&self) -> ContentSchema {
395 ContentSchema::new(ContentTypeId::raw(), "1.0")
396 .with_field(
397 FieldDefinition::new("data", FieldType::String)
398 .with_description("Raw data content"),
399 )
400 .with_field(
401 FieldDefinition::new("source_type", FieldType::String)
402 .with_description("Source type"),
403 )
404 .with_field(
405 FieldDefinition::new("format", FieldType::String)
406 .with_description("Detected format"),
407 )
408 }
409
410 fn validate(&self) -> Result<ValidationReport> {
411 let mut report = ValidationReport::success();
412
413 if self.data.is_empty() {
414 return Ok(ValidationReport::failure(vec![ValidationError::new(
415 "data",
416 "Raw data is empty",
417 )]));
418 }
419
420 if self.data.len() > 100_000_000 {
422 report = report.with_warning(ValidationWarning::new(
423 "data",
424 "Data size exceeds 100MB, consider chunking",
425 ));
426 }
427
428 let format = self.detect_format();
430 if format == DetectedFormat::Unknown {
431 report = report.with_warning(ValidationWarning::new(
432 "format",
433 "Could not detect data format, treating as plain text",
434 ));
435 }
436
437 Ok(report)
438 }
439
440 fn to_arrow(&self) -> Result<RecordBatch> {
441 let mut source = self.clone();
442 source.parse()
443 }
444
445 fn metadata(&self) -> ContentMetadata {
446 let format = self.detect_format();
447 let mut meta = ContentMetadata::new(ContentTypeId::raw(), "Raw Data", self.size())
448 .with_source(self.source_type.as_str())
449 .with_row_count(self.line_count())
450 .with_custom("format", serde_json::json!(format.as_str()));
451
452 if let Some(ref desc) = self.config.source_description {
453 meta = meta.with_description(desc.clone());
454 }
455
456 meta
457 }
458
459 fn content_type(&self) -> ContentTypeId {
460 ContentTypeId::raw()
461 }
462
463 fn chunks(&self, _chunk_size: usize) -> Box<dyn Iterator<Item = Result<RecordBatch>> + Send> {
464 let batch_result = self.clone().parse();
466 Box::new(std::iter::once(batch_result))
467 }
468
469 fn to_bytes(&self) -> Result<Vec<u8>> {
470 Ok(self.data.as_bytes().to_vec())
471 }
472}
473
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Runs format detection on clipboard text with the default config.
    fn detect(data: &str) -> DetectedFormat {
        RawSource::from_clipboard(data).detect_format()
    }

    /// Parses clipboard text under `config`, panicking on failure.
    fn parse_with(data: &str, config: RawSourceConfig) -> RecordBatch {
        RawSource::from_clipboard(data)
            .with_config(config)
            .parse()
            .unwrap()
    }

    #[test]
    fn test_detect_csv() {
        assert_eq!(detect("name,age,city\nAlice,30,NYC\nBob,25,LA"), DetectedFormat::Csv);
    }

    #[test]
    fn test_detect_tsv() {
        assert_eq!(detect("name\tage\tcity\nAlice\t30\tNYC\nBob\t25\tLA"), DetectedFormat::Tsv);
    }

    #[test]
    fn test_detect_json() {
        let array = r#"[{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]"#;
        assert_eq!(detect(array), DetectedFormat::Json);
    }

    #[test]
    fn test_detect_json_object() {
        assert_eq!(detect(r#"{"users": [{"name": "Alice"}]}"#), DetectedFormat::Json);
    }

    #[test]
    fn test_detect_plain_text() {
        let prose = "Hello world\nThis is plain text\nNo delimiters here";
        assert_eq!(detect(prose), DetectedFormat::PlainText);
    }

    #[test]
    fn test_force_format() {
        let forced = RawSource::from_clipboard("Hello world")
            .with_config(RawSourceConfig::new().with_format(DetectedFormat::Csv));
        assert_eq!(forced.detect_format(), DetectedFormat::Csv);
    }

    #[test]
    fn test_parse_plain_text() {
        let batch = parse_with("Line 1\nLine 2\nLine 3", RawSourceConfig::default());
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.schema().field(0).name(), "line");
    }

    #[test]
    fn test_parse_csv() {
        let batch = parse_with("name,age\nAlice,30\nBob,25", RawSourceConfig::default());
        assert_eq!((batch.num_rows(), batch.num_columns()), (2, 2));
    }

    #[test]
    fn test_source_type() {
        let cases = [
            (RawSource::from_clipboard("data"), SourceType::Clipboard),
            (RawSource::from_stdin("data"), SourceType::Stdin),
        ];
        for (source, expected) in cases {
            assert_eq!(source.source_type(), expected);
        }
    }

    #[test]
    fn test_validation_empty() {
        let report = RawSource::from_clipboard("").validate().unwrap();
        assert!(!report.valid);
    }

    #[test]
    fn test_validation_success() {
        let report = RawSource::from_clipboard("some data").validate().unwrap();
        assert!(report.valid);
    }

    #[test]
    fn test_metadata() {
        let meta = RawSource::from_clipboard("test data")
            .with_config(RawSourceConfig::new().with_description("Copied from spreadsheet"))
            .metadata();

        assert_eq!(meta.content_type, ContentTypeId::raw());
        assert_eq!(meta.source, Some(String::from("clipboard")));
        assert_eq!(
            meta.description,
            Some(String::from("Copied from spreadsheet"))
        );
    }

    #[test]
    fn test_max_rows() {
        let batch = parse_with(
            "Line 1\nLine 2\nLine 3\nLine 4\nLine 5",
            RawSourceConfig::new().with_max_rows(3),
        );
        assert_eq!(batch.num_rows(), 3);
    }

    #[test]
    fn test_parse_tsv() {
        let batch = parse_with("name\tage\nAlice\t30\nBob\t25", RawSourceConfig::default());
        assert_eq!((batch.num_rows(), batch.num_columns()), (2, 2));
    }

    #[test]
    fn test_parse_json() {
        let records = "{\"name\":\"Alice\",\"age\":30}\n{\"name\":\"Bob\",\"age\":25}";
        let batch = parse_with(records, RawSourceConfig::default());
        assert!(batch.num_rows() >= 1);
    }

    #[test]
    fn test_to_bytes() {
        let bytes = RawSource::from_clipboard("test data").to_bytes().unwrap();
        assert_eq!(bytes, b"test data");
    }

    #[test]
    fn test_chunks() {
        let chunks: Vec<_> = RawSource::from_clipboard("name,age\nAlice,30")
            .chunks(100)
            .collect();
        assert_eq!(chunks.len(), 1);
        assert!(chunks[0].is_ok());
    }

    #[test]
    fn test_schema() {
        let schema = RawSource::from_clipboard("test").schema();
        assert_eq!(schema.content_type, ContentTypeId::raw());
        for name in ["data", "source_type", "format"] {
            assert!(schema.get_field(name).is_some(), "missing field {name}");
        }
    }

    #[test]
    fn test_content_type() {
        assert_eq!(
            RawSource::from_clipboard("test").content_type(),
            ContentTypeId::raw()
        );
    }

    #[test]
    fn test_from_string() {
        let direct = RawSource::from_string("test", SourceType::Direct);
        assert_eq!(direct.source_type(), SourceType::Direct);
    }

    #[test]
    fn test_line_count() {
        assert_eq!(RawSource::from_clipboard("line1\nline2\nline3").line_count(), 3);
    }

    #[test]
    fn test_detected_format_as_str() {
        let table = [
            (DetectedFormat::Csv, "csv"),
            (DetectedFormat::Tsv, "tsv"),
            (DetectedFormat::Json, "json"),
            (DetectedFormat::JsonLines, "jsonl"),
            (DetectedFormat::PlainText, "text"),
            (DetectedFormat::Unknown, "unknown"),
        ];
        for (format, label) in table {
            assert_eq!(format.as_str(), label);
        }
    }

    #[test]
    fn test_source_type_as_str() {
        let table = [
            (SourceType::Clipboard, "clipboard"),
            (SourceType::Stdin, "stdin"),
            (SourceType::Url, "url"),
            (SourceType::File, "file"),
            (SourceType::Direct, "direct"),
            (SourceType::Unknown, "unknown"),
        ];
        for (source, label) in table {
            assert_eq!(source.as_str(), label);
        }
    }

    #[test]
    fn test_config_with_delimiter() {
        assert_eq!(RawSourceConfig::new().with_delimiter(';').delimiter, Some(';'));
    }

    #[test]
    fn test_config_with_header() {
        assert_eq!(RawSourceConfig::new().with_header(false).has_header, Some(false));
    }

    #[test]
    fn test_config_default() {
        let defaults = RawSourceConfig::default();
        assert!(defaults.delimiter.is_none());
        assert!(defaults.has_header.is_none());
        assert!(defaults.max_rows.is_none());
    }

    #[test]
    fn test_cached_batch() {
        let mut source = RawSource::from_clipboard("name,age\nAlice,30");

        let first = source.parse().unwrap();
        let second = source.parse().unwrap();

        assert_eq!(first.num_rows(), second.num_rows());
    }

    #[test]
    fn test_large_data_validation() {
        let report = RawSource::from_clipboard("x\n".repeat(2000))
            .validate()
            .unwrap();
        assert!(report.valid);
    }
}