Skip to main content

alimentar/serve/
raw_source.rs

//! Raw Source - Handle pasted/clipboard data with automatic format detection
//!
//! This module provides functionality for ingesting "raw" data from various
//! sources like the clipboard, stdin, or direct string input. It automatically
//! detects the format (CSV, TSV, JSON, JSON Lines, or plain text) and converts
//! the data to an Arrow `RecordBatch`.
//!
//! # Example
//!
//! ```ignore
//! use alimentar::serve::{RawSource, SourceType};
//!
//! // From clipboard/pasted text
//! let mut raw = RawSource::from_string(pasted_text, SourceType::Clipboard);
//! let batch = raw.parse()?;
//! ```
16
17use std::{io::Cursor, sync::Arc};
18
19use arrow::{
20    array::{ArrayRef, RecordBatch, StringArray},
21    datatypes::{DataType, Field, Schema},
22};
23use serde::{Deserialize, Serialize};
24
25use crate::{
26    error::{Error, Result},
27    serve::{
28        content::{
29            ContentMetadata, ContentTypeId, ServeableContent, ValidationError, ValidationReport,
30            ValidationWarning,
31        },
32        schema::{ContentSchema, FieldDefinition, FieldType},
33    },
34};
35
36/// Source type for raw data
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
38pub enum SourceType {
39    /// Data pasted from clipboard
40    Clipboard,
41    /// Data from stdin
42    Stdin,
43    /// Data from a URL fetch
44    Url,
45    /// Data from a file
46    File,
47    /// Directly provided string
48    Direct,
49    /// Unknown source
50    Unknown,
51}
52
53impl SourceType {
54    /// Get human-readable name
55    pub fn as_str(&self) -> &'static str {
56        match self {
57            Self::Clipboard => "clipboard",
58            Self::Stdin => "stdin",
59            Self::Url => "url",
60            Self::File => "file",
61            Self::Direct => "direct",
62            Self::Unknown => "unknown",
63        }
64    }
65}
66
67impl std::fmt::Display for SourceType {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        write!(f, "{}", self.as_str())
70    }
71}
72
73/// Detected format of raw data
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
75pub enum DetectedFormat {
76    /// Comma-separated values
77    Csv,
78    /// Tab-separated values
79    Tsv,
80    /// JSON array or object
81    Json,
82    /// JSON Lines (newline-delimited JSON)
83    JsonLines,
84    /// Plain text (line-per-row)
85    PlainText,
86    /// Could not detect format
87    Unknown,
88}
89
90impl DetectedFormat {
91    /// Get human-readable name
92    pub fn as_str(self) -> &'static str {
93        match self {
94            Self::Csv => "csv",
95            Self::Tsv => "tsv",
96            Self::Json => "json",
97            Self::JsonLines => "jsonl",
98            Self::PlainText => "text",
99            Self::Unknown => "unknown",
100        }
101    }
102}
103
104/// Configuration for raw source parsing
105#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct RawSourceConfig {
107    /// Force a specific format (skip auto-detection)
108    pub force_format: Option<DetectedFormat>,
109    /// Whether to treat first line as header (for CSV/TSV)
110    pub has_header: Option<bool>,
111    /// Custom delimiter (for CSV-like formats)
112    pub delimiter: Option<char>,
113    /// Maximum rows to parse (for preview/sampling)
114    pub max_rows: Option<usize>,
115    /// Infer types from data (vs all strings)
116    pub infer_types: bool,
117    /// Source description (e.g., "Copied from Excel", "Pasted from website")
118    pub source_description: Option<String>,
119}
120
121impl Default for RawSourceConfig {
122    fn default() -> Self {
123        Self {
124            force_format: None,
125            has_header: None,
126            delimiter: None,
127            max_rows: None,
128            infer_types: true,
129            source_description: None,
130        }
131    }
132}
133
134impl RawSourceConfig {
135    /// Create a new config with default settings
136    pub fn new() -> Self {
137        Self::default()
138    }
139
140    /// Force a specific format
141    pub fn with_format(mut self, format: DetectedFormat) -> Self {
142        self.force_format = Some(format);
143        self
144    }
145
146    /// Set whether data has a header
147    pub fn with_header(mut self, has_header: bool) -> Self {
148        self.has_header = Some(has_header);
149        self
150    }
151
152    /// Set custom delimiter
153    pub fn with_delimiter(mut self, delimiter: char) -> Self {
154        self.delimiter = Some(delimiter);
155        self
156    }
157
158    /// Set maximum rows to parse
159    pub fn with_max_rows(mut self, max_rows: usize) -> Self {
160        self.max_rows = Some(max_rows);
161        self
162    }
163
164    /// Set source description
165    pub fn with_description(mut self, description: impl Into<String>) -> Self {
166        self.source_description = Some(description.into());
167        self
168    }
169}
170
171/// Raw source data container
172///
173/// Handles pasted/clipboard data with automatic format detection
174/// and conversion to Arrow RecordBatch.
175#[derive(Debug, Clone)]
176pub struct RawSource {
177    /// The raw string data
178    data: String,
179    /// Source type
180    source_type: SourceType,
181    /// Configuration
182    config: RawSourceConfig,
183    /// Detected format (computed lazily)
184    detected_format: Option<DetectedFormat>,
185    /// Cached Arrow batch
186    cached_batch: Option<RecordBatch>,
187}
188
189impl RawSource {
190    /// Create a new RawSource from a string
191    pub fn from_string(data: impl Into<String>, source_type: SourceType) -> Self {
192        Self {
193            data: data.into(),
194            source_type,
195            config: RawSourceConfig::default(),
196            detected_format: None,
197            cached_batch: None,
198        }
199    }
200
201    /// Create from clipboard content
202    pub fn from_clipboard(data: impl Into<String>) -> Self {
203        Self::from_string(data, SourceType::Clipboard)
204    }
205
206    /// Create from stdin content
207    pub fn from_stdin(data: impl Into<String>) -> Self {
208        Self::from_string(data, SourceType::Stdin)
209    }
210
211    /// Create with custom configuration
212    pub fn with_config(mut self, config: RawSourceConfig) -> Self {
213        self.config = config;
214        self.detected_format = None; // Reset detection
215        self.cached_batch = None;
216        self
217    }
218
219    /// Get the raw data
220    pub fn raw_data(&self) -> &str {
221        &self.data
222    }
223
224    /// Get the source type
225    pub fn source_type(&self) -> SourceType {
226        self.source_type
227    }
228
229    /// Detect the format of the data
230    pub fn detect_format(&self) -> DetectedFormat {
231        if let Some(forced) = self.config.force_format {
232            return forced;
233        }
234
235        let trimmed = self.data.trim();
236
237        // Empty data
238        if trimmed.is_empty() {
239            return DetectedFormat::Unknown;
240        }
241
242        // Check for JSON
243        if trimmed.starts_with('{') || trimmed.starts_with('[') {
244            return DetectedFormat::Json;
245        }
246
247        // Check for JSON Lines (multiple JSON objects per line)
248        let first_line = trimmed.lines().next().unwrap_or("");
249        if first_line.starts_with('{') && first_line.ends_with('}') {
250            let second_line = trimmed.lines().nth(1);
251            if let Some(line) = second_line {
252                if line.starts_with('{') {
253                    return DetectedFormat::JsonLines;
254                }
255            }
256        }
257
258        // Count delimiters in first few lines
259        let sample_lines: Vec<&str> = trimmed.lines().take(5).collect();
260        if sample_lines.is_empty() {
261            return DetectedFormat::PlainText;
262        }
263
264        let comma_count: usize = sample_lines.iter().map(|l| l.matches(',').count()).sum();
265        let tab_count: usize = sample_lines.iter().map(|l| l.matches('\t').count()).sum();
266
267        // Determine format based on delimiter frequency
268        let lines_count = sample_lines.len();
269        let avg_commas = comma_count / lines_count;
270        let avg_tabs = tab_count / lines_count;
271
272        if avg_tabs > 0 && avg_tabs >= avg_commas {
273            DetectedFormat::Tsv
274        } else if avg_commas > 0 {
275            DetectedFormat::Csv
276        } else {
277            DetectedFormat::PlainText
278        }
279    }
280
281    /// Parse the raw data into an Arrow RecordBatch
282    ///
283    /// # Errors
284    ///
285    /// Returns an error if the data cannot be parsed (invalid CSV, JSON, etc.).
286    pub fn parse(&mut self) -> Result<RecordBatch> {
287        if let Some(ref batch) = self.cached_batch {
288            return Ok(batch.clone());
289        }
290
291        let format = self.detect_format();
292        self.detected_format = Some(format);
293
294        let batch = match format {
295            DetectedFormat::Csv => self.parse_csv(',')?,
296            DetectedFormat::Tsv => self.parse_csv('\t')?,
297            DetectedFormat::Json => self.parse_json()?,
298            DetectedFormat::JsonLines => self.parse_jsonl()?,
299            DetectedFormat::PlainText | DetectedFormat::Unknown => self.parse_plain_text()?,
300        };
301
302        self.cached_batch = Some(batch.clone());
303        Ok(batch)
304    }
305
306    /// Parse CSV/TSV data
307    fn parse_csv(&self, default_delimiter: char) -> Result<RecordBatch> {
308        use arrow_csv::reader::Format;
309
310        let delimiter = self.config.delimiter.unwrap_or(default_delimiter);
311        let has_header = self.config.has_header.unwrap_or(true);
312
313        // Infer schema using Format
314        let mut cursor_for_infer = Cursor::new(self.data.as_bytes());
315        let format = Format::default()
316            .with_delimiter(delimiter as u8)
317            .with_header(has_header);
318        let (inferred, _) = format
319            .infer_schema(&mut cursor_for_infer, Some(1000))
320            .map_err(|e| Error::transform(format!("Failed to infer CSV schema: {e}")))?;
321
322        let schema = Arc::new(inferred);
323
324        // Build reader with inferred schema
325        let cursor = Cursor::new(self.data.as_bytes());
326        let batch_size = self.config.max_rows.unwrap_or(8192);
327        let builder = arrow_csv::ReaderBuilder::new(schema)
328            .with_delimiter(delimiter as u8)
329            .with_header(has_header)
330            .with_batch_size(batch_size);
331
332        let mut reader = builder
333            .build(cursor)
334            .map_err(|e| Error::transform(format!("Failed to parse CSV: {e}")))?;
335
336        reader
337            .next()
338            .ok_or_else(|| Error::transform("No data in CSV"))?
339            .map_err(|e| Error::transform(format!("Failed to read CSV batch: {e}")))
340    }
341
342    /// Parse JSON data
343    fn parse_json(&self) -> Result<RecordBatch> {
344        let cursor = Cursor::new(self.data.as_bytes());
345
346        // Infer schema from JSON
347        let (schema, _) = arrow_json::reader::infer_json_schema(cursor, Some(100))
348            .map_err(|e| Error::transform(format!("Failed to infer JSON schema: {e}")))?;
349
350        let cursor = Cursor::new(self.data.as_bytes());
351        let mut reader = arrow_json::ReaderBuilder::new(Arc::new(schema))
352            .build(cursor)
353            .map_err(|e| Error::transform(format!("Failed to create JSON reader: {e}")))?;
354
355        reader
356            .next()
357            .ok_or_else(|| Error::transform("No data in JSON"))?
358            .map_err(|e| Error::transform(format!("Failed to read JSON batch: {e}")))
359    }
360
361    /// Parse JSON Lines data
362    fn parse_jsonl(&self) -> Result<RecordBatch> {
363        // JSON Lines is the same as JSON for Arrow reader
364        self.parse_json()
365    }
366
367    /// Parse plain text (one column, one row per line)
368    fn parse_plain_text(&self) -> Result<RecordBatch> {
369        let lines: Vec<&str> = self.data.lines().collect();
370
371        let max_rows = self.config.max_rows.unwrap_or(lines.len());
372        let limited_lines: Vec<&str> = lines.into_iter().take(max_rows).collect();
373
374        let schema = Arc::new(Schema::new(vec![Field::new("line", DataType::Utf8, false)]));
375
376        let array: ArrayRef = Arc::new(StringArray::from(limited_lines));
377
378        RecordBatch::try_new(schema, vec![array])
379            .map_err(|e| Error::transform(format!("Failed to create text batch: {e}")))
380    }
381
382    /// Get the byte size of the raw data
383    pub fn size(&self) -> usize {
384        self.data.len()
385    }
386
387    /// Get the number of lines in the raw data
388    pub fn line_count(&self) -> usize {
389        self.data.lines().count()
390    }
391}
392
393impl ServeableContent for RawSource {
394    fn schema(&self) -> ContentSchema {
395        ContentSchema::new(ContentTypeId::raw(), "1.0")
396            .with_field(
397                FieldDefinition::new("data", FieldType::String)
398                    .with_description("Raw data content"),
399            )
400            .with_field(
401                FieldDefinition::new("source_type", FieldType::String)
402                    .with_description("Source type"),
403            )
404            .with_field(
405                FieldDefinition::new("format", FieldType::String)
406                    .with_description("Detected format"),
407            )
408    }
409
410    fn validate(&self) -> Result<ValidationReport> {
411        let mut report = ValidationReport::success();
412
413        if self.data.is_empty() {
414            return Ok(ValidationReport::failure(vec![ValidationError::new(
415                "data",
416                "Raw data is empty",
417            )]));
418        }
419
420        // Check for potential issues
421        if self.data.len() > 100_000_000 {
422            report = report.with_warning(ValidationWarning::new(
423                "data",
424                "Data size exceeds 100MB, consider chunking",
425            ));
426        }
427
428        // Try to detect format
429        let format = self.detect_format();
430        if format == DetectedFormat::Unknown {
431            report = report.with_warning(ValidationWarning::new(
432                "format",
433                "Could not detect data format, treating as plain text",
434            ));
435        }
436
437        Ok(report)
438    }
439
440    fn to_arrow(&self) -> Result<RecordBatch> {
441        let mut source = self.clone();
442        source.parse()
443    }
444
445    fn metadata(&self) -> ContentMetadata {
446        let format = self.detect_format();
447        let mut meta = ContentMetadata::new(ContentTypeId::raw(), "Raw Data", self.size())
448            .with_source(self.source_type.as_str())
449            .with_row_count(self.line_count())
450            .with_custom("format", serde_json::json!(format.as_str()));
451
452        if let Some(ref desc) = self.config.source_description {
453            meta = meta.with_description(desc.clone());
454        }
455
456        meta
457    }
458
459    fn content_type(&self) -> ContentTypeId {
460        ContentTypeId::raw()
461    }
462
463    fn chunks(&self, _chunk_size: usize) -> Box<dyn Iterator<Item = Result<RecordBatch>> + Send> {
464        // For raw source, we just return a single batch
465        let batch_result = self.clone().parse();
466        Box::new(std::iter::once(batch_result))
467    }
468
469    fn to_bytes(&self) -> Result<Vec<u8>> {
470        Ok(self.data.as_bytes().to_vec())
471    }
472}
473
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_detect_csv() {
        let csv_data = "name,age,city\nAlice,30,NYC\nBob,25,LA";
        let source = RawSource::from_clipboard(csv_data);
        assert_eq!(source.detect_format(), DetectedFormat::Csv);
    }

    #[test]
    fn test_detect_tsv() {
        let tsv_data = "name\tage\tcity\nAlice\t30\tNYC\nBob\t25\tLA";
        let source = RawSource::from_clipboard(tsv_data);
        assert_eq!(source.detect_format(), DetectedFormat::Tsv);
    }

    #[test]
    fn test_detect_json() {
        let json_data = r#"[{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]"#;
        let source = RawSource::from_clipboard(json_data);
        assert_eq!(source.detect_format(), DetectedFormat::Json);
    }

    #[test]
    fn test_detect_json_object() {
        let json_data = r#"{"users": [{"name": "Alice"}]}"#;
        let source = RawSource::from_clipboard(json_data);
        assert_eq!(source.detect_format(), DetectedFormat::Json);
    }

    #[test]
    fn test_detect_plain_text() {
        let text_data = "Hello world\nThis is plain text\nNo delimiters here";
        let source = RawSource::from_clipboard(text_data);
        assert_eq!(source.detect_format(), DetectedFormat::PlainText);
    }

    #[test]
    fn test_detect_whitespace_only_is_unknown() {
        let source = RawSource::from_clipboard("  \n\t  \n");
        assert_eq!(source.detect_format(), DetectedFormat::Unknown);
    }

    #[test]
    fn test_force_format() {
        let data = "Hello world";
        let config = RawSourceConfig::new().with_format(DetectedFormat::Csv);
        let source = RawSource::from_clipboard(data).with_config(config);
        assert_eq!(source.detect_format(), DetectedFormat::Csv);
    }

    #[test]
    fn test_parse_plain_text() {
        let text_data = "Line 1\nLine 2\nLine 3";
        let mut source = RawSource::from_clipboard(text_data);
        let batch = source.parse().unwrap();

        assert_eq!(batch.num_rows(), 3);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.schema().field(0).name(), "line");
    }

    #[test]
    fn test_parse_csv() {
        let csv_data = "name,age\nAlice,30\nBob,25";
        let mut source = RawSource::from_clipboard(csv_data);
        let batch = source.parse().unwrap();

        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 2);
    }

    #[test]
    fn test_parse_csv_infers_types() {
        let csv_data = "name,age\nAlice,30\nBob,25";
        let mut source = RawSource::from_clipboard(csv_data);
        let batch = source.parse().unwrap();
        let schema = batch.schema();

        assert_eq!(schema.field(0).name(), "name");
        assert_eq!(schema.field(1).name(), "age");
        assert_eq!(schema.field(1).data_type(), &DataType::Int64);
    }

    #[test]
    fn test_source_type() {
        let source = RawSource::from_clipboard("data");
        assert_eq!(source.source_type(), SourceType::Clipboard);

        let source = RawSource::from_stdin("data");
        assert_eq!(source.source_type(), SourceType::Stdin);
    }

    #[test]
    fn test_validation_empty() {
        let source = RawSource::from_clipboard("");
        let report = source.validate().unwrap();
        assert!(!report.valid);
    }

    #[test]
    fn test_validation_success() {
        let source = RawSource::from_clipboard("some data");
        let report = source.validate().unwrap();
        assert!(report.valid);
    }

    #[test]
    fn test_metadata() {
        let source = RawSource::from_clipboard("test data")
            .with_config(RawSourceConfig::new().with_description("Copied from spreadsheet"));

        let meta = source.metadata();
        assert_eq!(meta.content_type, ContentTypeId::raw());
        assert_eq!(meta.source, Some("clipboard".to_string()));
        assert_eq!(
            meta.description,
            Some("Copied from spreadsheet".to_string())
        );
    }

    #[test]
    fn test_max_rows() {
        let text_data = "Line 1\nLine 2\nLine 3\nLine 4\nLine 5";
        let config = RawSourceConfig::new().with_max_rows(3);
        let mut source = RawSource::from_clipboard(text_data).with_config(config);
        let batch = source.parse().unwrap();

        assert_eq!(batch.num_rows(), 3);
    }

    #[test]
    fn test_with_config_resets_cache() {
        let mut source = RawSource::from_clipboard("Line 1\nLine 2\nLine 3");
        assert_eq!(source.parse().unwrap().num_rows(), 3);

        // A new config must invalidate the cached batch.
        let mut source = source.with_config(RawSourceConfig::new().with_max_rows(1));
        assert_eq!(source.parse().unwrap().num_rows(), 1);
    }

    #[test]
    fn test_parse_tsv() {
        let tsv_data = "name\tage\nAlice\t30\nBob\t25";
        let mut source = RawSource::from_clipboard(tsv_data);
        let batch = source.parse().unwrap();

        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 2);
    }

    #[test]
    fn test_parse_json() {
        // Newline-delimited JSON objects: one record per line.
        let json_data = "{\"name\":\"Alice\",\"age\":30}\n{\"name\":\"Bob\",\"age\":25}";
        let mut source = RawSource::from_clipboard(json_data);
        let batch = source.parse().unwrap();

        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 2);
    }

    #[test]
    fn test_to_bytes() {
        let source = RawSource::from_clipboard("test data");
        let bytes = source.to_bytes().unwrap();
        assert_eq!(bytes, b"test data");
    }

    #[test]
    fn test_chunks() {
        let source = RawSource::from_clipboard("name,age\nAlice,30");
        let chunks: Vec<_> = source.chunks(100).collect();
        assert_eq!(chunks.len(), 1);
        assert!(chunks[0].is_ok());
    }

    #[test]
    fn test_schema() {
        let source = RawSource::from_clipboard("test");
        let schema = source.schema();
        assert_eq!(schema.content_type, ContentTypeId::raw());
        assert!(schema.get_field("data").is_some());
        assert!(schema.get_field("source_type").is_some());
        assert!(schema.get_field("format").is_some());
    }

    #[test]
    fn test_content_type() {
        let source = RawSource::from_clipboard("test");
        assert_eq!(source.content_type(), ContentTypeId::raw());
    }

    #[test]
    fn test_from_string() {
        let source = RawSource::from_string("test", SourceType::Direct);
        assert_eq!(source.source_type(), SourceType::Direct);
    }

    #[test]
    fn test_line_count() {
        let source = RawSource::from_clipboard("line1\nline2\nline3");
        assert_eq!(source.line_count(), 3);
    }

    #[test]
    fn test_detected_format_as_str() {
        assert_eq!(DetectedFormat::Csv.as_str(), "csv");
        assert_eq!(DetectedFormat::Tsv.as_str(), "tsv");
        assert_eq!(DetectedFormat::Json.as_str(), "json");
        assert_eq!(DetectedFormat::JsonLines.as_str(), "jsonl");
        assert_eq!(DetectedFormat::PlainText.as_str(), "text");
        assert_eq!(DetectedFormat::Unknown.as_str(), "unknown");
    }

    #[test]
    fn test_source_type_as_str() {
        assert_eq!(SourceType::Clipboard.as_str(), "clipboard");
        assert_eq!(SourceType::Stdin.as_str(), "stdin");
        assert_eq!(SourceType::Url.as_str(), "url");
        assert_eq!(SourceType::File.as_str(), "file");
        assert_eq!(SourceType::Direct.as_str(), "direct");
        assert_eq!(SourceType::Unknown.as_str(), "unknown");
    }

    #[test]
    fn test_config_with_delimiter() {
        let config = RawSourceConfig::new().with_delimiter(';');
        assert_eq!(config.delimiter, Some(';'));
    }

    #[test]
    fn test_config_with_header() {
        let config = RawSourceConfig::new().with_header(false);
        assert_eq!(config.has_header, Some(false));
    }

    #[test]
    fn test_config_default() {
        let config = RawSourceConfig::default();
        assert!(config.delimiter.is_none());
        assert!(config.has_header.is_none());
        assert!(config.max_rows.is_none());
    }

    #[test]
    fn test_cached_batch() {
        let mut source = RawSource::from_clipboard("name,age\nAlice,30");

        // First parse
        let batch1 = source.parse().unwrap();

        // Second parse should return the cached batch unchanged
        let batch2 = source.parse().unwrap();

        assert_eq!(batch1.num_rows(), batch2.num_rows());
        assert_eq!(batch1, batch2);
    }

    #[test]
    fn test_multiline_data_validation() {
        // Many-line input (2000 lines, ~4 KB) validates without errors
        let many_lines = "x\n".repeat(2000);
        let source = RawSource::from_clipboard(&many_lines);
        let report = source.validate().unwrap();
        assert!(report.valid);
    }
}