Skip to main content

reddb_server/storage/import/
jsonl.rs

1//! JSONL (JSON Lines) Importer
2//!
3//! Imports data from JSONL (newline-delimited JSON) files into Store.
4//! Supports streaming import for large files.
5//!
6//! # Format
7//!
8//! Each line is a valid JSON object:
9//! ```text
10//! {"id": "1", "name": "Alice", "embedding": [0.1, 0.2, 0.3]}
11//! {"id": "2", "name": "Bob", "embedding": [0.4, 0.5, 0.6]}
12//! ```
13//!
14//! # Usage
15//!
16//! ```rust,ignore
//! let importer = JsonlImporter::new(JsonlConfig {
//!     id_field: Some("id".to_string()),
//!     embedding_field: Some("embedding".to_string()),
//!     batch_size: 1000,
//!     ..Default::default()
//! });
22//! let stats = importer.import_file("data.jsonl", &mut store)?;
23//! ```
24
25use crate::storage::schema::types::Value;
26use crate::storage::Store;
27use crate::storage::{EntityData, EntityKind, RowData, UnifiedEntity, VectorData};
28use std::collections::HashMap;
29use std::fs::File;
30use std::io::{BufRead, BufReader};
31use std::path::Path;
32use std::sync::Arc;
33
/// Settings that control a JSONL import run.
#[derive(Clone, Debug)]
pub struct JsonlConfig {
    /// Name of the JSON field meant to supply the entity ID (`None` = auto-generate).
    ///
    /// NOTE(review): `parse_and_insert` in this file always asks the store for a
    /// fresh ID and never reads this field — confirm whether that is intended.
    pub id_field: Option<String>,
    /// Name of the JSON field holding the vector embedding, if records carry one.
    pub embedding_field: Option<String>,
    /// Collection/table the imported entities are inserted into.
    pub collection: String,
    /// Number of records per batch.
    ///
    /// NOTE(review): the import loop in this file inserts one record at a time
    /// and does not consult this value.
    pub batch_size: usize,
    /// When `true`, lines that fail to import are counted and skipped instead of
    /// aborting the import.
    pub skip_errors: bool,
    /// Upper bound on the number of lines to read (`None` reads everything).
    pub max_lines: Option<usize>,
}

impl Default for JsonlConfig {
    /// Defaults: no ID or embedding field, collection `"imported"`, batches of
    /// 1000, abort on the first error, no line limit.
    fn default() -> Self {
        let collection = String::from("imported");
        JsonlConfig {
            id_field: None,
            embedding_field: None,
            collection,
            batch_size: 1000,
            skip_errors: false,
            max_lines: None,
        }
    }
}
63
/// Counters describing the outcome of one import run.
#[derive(Clone, Debug, Default)]
pub struct ImportStats {
    /// Number of input lines read.
    pub lines_processed: usize,
    /// Number of records that were parsed and inserted successfully.
    pub records_imported: usize,
    /// Number of lines dropped because of an error while `skip_errors` was set.
    pub errors_skipped: usize,
    /// Wall-clock duration of the import, in milliseconds.
    pub duration_ms: u64,
}
76
77/// JSONL importer
78pub struct JsonlImporter {
79    config: JsonlConfig,
80}
81
82impl JsonlImporter {
83    /// Create a new JSONL importer with configuration
84    pub fn new(config: JsonlConfig) -> Self {
85        Self { config }
86    }
87
88    /// Create a default importer
89    pub fn with_defaults() -> Self {
90        Self::new(JsonlConfig::default())
91    }
92
93    /// Import from a file path
94    pub fn import_file<P: AsRef<Path>>(
95        &self,
96        path: P,
97        store: &mut Store,
98    ) -> Result<ImportStats, JsonlError> {
99        let file = File::open(path.as_ref()).map_err(|e| JsonlError::Io(e.to_string()))?;
100        let reader = BufReader::new(file);
101        self.import_reader(reader, store)
102    }
103
104    /// Import from any reader (for flexibility)
105    pub fn import_reader<R: BufRead>(
106        &self,
107        reader: R,
108        store: &mut Store,
109    ) -> Result<ImportStats, JsonlError> {
110        let start = std::time::Instant::now();
111        let mut stats = ImportStats::default();
112
113        for (line_num, line_result) in reader.lines().enumerate() {
114            // Check max lines limit
115            if let Some(max) = self.config.max_lines {
116                if stats.lines_processed >= max {
117                    break;
118                }
119            }
120
121            stats.lines_processed += 1;
122
123            let line = match line_result {
124                Ok(l) => l,
125                Err(e) => {
126                    if self.config.skip_errors {
127                        stats.errors_skipped += 1;
128                        continue;
129                    }
130                    return Err(JsonlError::Io(format!("Line {}: {}", line_num + 1, e)));
131                }
132            };
133
134            // Skip empty lines
135            let trimmed = line.trim();
136            if trimmed.is_empty() {
137                continue;
138            }
139
140            // Parse JSON
141            match self.parse_and_insert(&line, store) {
142                Ok(()) => {
143                    stats.records_imported += 1;
144                }
145                Err(e) => {
146                    if self.config.skip_errors {
147                        stats.errors_skipped += 1;
148                        continue;
149                    }
150                    return Err(JsonlError::Parse(format!("Line {}: {}", line_num + 1, e)));
151                }
152            }
153        }
154
155        stats.duration_ms = start.elapsed().as_millis() as u64;
156        Ok(stats)
157    }
158
159    /// Parse a single JSON line and insert into store
160    fn parse_and_insert(&self, line: &str, store: &mut Store) -> Result<(), String> {
161        let json = parse_json_object(line)?;
162
163        // Extract embedding if configured
164        let embedding = if let Some(ref emb_field) = self.config.embedding_field {
165            json.get(emb_field).and_then(|v| {
166                if let JsonValue::Array(arr) = v {
167                    let floats: Option<Vec<f32>> = arr
168                        .iter()
169                        .map(|v| match v {
170                            JsonValue::Number(n) => Some(*n as f32),
171                            _ => None,
172                        })
173                        .collect();
174                    floats
175                } else {
176                    None
177                }
178            })
179        } else {
180            None
181        };
182
183        // Convert JSON to row data
184        let mut named = HashMap::new();
185        for (key, value) in &json {
186            // Skip embedding field (handled separately)
187            if self
188                .config
189                .embedding_field
190                .as_ref()
191                .map(|f| f == key)
192                .unwrap_or(false)
193            {
194                continue;
195            }
196
197            named.insert(key.clone(), json_to_value(value));
198        }
199
200        // Generate IDs
201        let entity_id = store.next_entity_id();
202        let row_id = entity_id.0;
203
204        // Create entity
205        let entity = if let Some(emb) = embedding {
206            // Entity with embedding - store as vector
207
208            UnifiedEntity::new(
209                entity_id,
210                EntityKind::Vector {
211                    collection: self.config.collection.clone(),
212                },
213                EntityData::Vector(VectorData {
214                    dense: emb,
215                    sparse: None,
216                    content: Some(line.to_string()),
217                }),
218            )
219        } else {
220            // Plain row entity
221            let row_data = RowData {
222                columns: Vec::new(),
223                named: Some(named),
224                schema: None,
225            };
226            UnifiedEntity::new(
227                entity_id,
228                EntityKind::TableRow {
229                    table: Arc::from(self.config.collection.as_str()),
230                    row_id,
231                },
232                EntityData::Row(row_data),
233            )
234        };
235
236        store
237            .insert(&self.config.collection, entity)
238            .map_err(|e| format!("Insert failed: {:?}", e))?;
239
240        Ok(())
241    }
242}
243
/// Failure modes of a JSONL import.
#[derive(Debug)]
pub enum JsonlError {
    /// The input could not be opened or read; carries the rendered message.
    Io(String),
    /// A line could not be parsed or inserted; carries the rendered message.
    Parse(String),
}

impl std::fmt::Display for JsonlError {
    /// Render the error as `"<category>: <message>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(detail) => f.write_fmt(format_args!("I/O error: {}", detail)),
            Self::Parse(detail) => f.write_fmt(format_args!("Parse error: {}", detail)),
        }
    }
}

impl std::error::Error for JsonlError {}
261
262// ============================================================================
263// Simple JSON Parser (no external dependencies)
264// ============================================================================
265
/// In-memory representation of a parsed JSON value.
#[derive(Clone, Debug, PartialEq)]
pub enum JsonValue {
    /// JSON `null`.
    Null,
    /// JSON `true` / `false`.
    Bool(bool),
    /// Any JSON number, stored as `f64`.
    Number(f64),
    /// A JSON string.
    String(String),
    /// A JSON array, elements in source order.
    Array(Vec<JsonValue>),
    /// A JSON object keyed by member name.
    Object(HashMap<String, JsonValue>),
}
276
277/// Parse a JSON object from a string
278pub fn parse_json_object(s: &str) -> Result<HashMap<String, JsonValue>, String> {
279    let mut chars = s.trim().chars().peekable();
280
281    // Expect opening brace
282    match chars.next() {
283        Some('{') => {}
284        _ => return Err("Expected '{'".to_string()),
285    }
286
287    skip_whitespace(&mut chars);
288
289    // Empty object
290    if chars.peek() == Some(&'}') {
291        chars.next();
292        return Ok(HashMap::new());
293    }
294
295    let mut result = HashMap::new();
296
297    loop {
298        skip_whitespace(&mut chars);
299
300        // Parse key
301        let key = parse_string(&mut chars)?;
302
303        skip_whitespace(&mut chars);
304
305        // Expect colon
306        match chars.next() {
307            Some(':') => {}
308            _ => return Err("Expected ':'".to_string()),
309        }
310
311        skip_whitespace(&mut chars);
312
313        // Parse value
314        let value = parse_value(&mut chars)?;
315
316        result.insert(key, value);
317
318        skip_whitespace(&mut chars);
319
320        // Check for comma or closing brace
321        match chars.next() {
322            Some(',') => continue,
323            Some('}') => break,
324            _ => return Err("Expected ',' or '}'".to_string()),
325        }
326    }
327
328    Ok(result)
329}
330
331fn parse_value(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<JsonValue, String> {
332    skip_whitespace(chars);
333
334    match chars.peek() {
335        Some('"') => Ok(JsonValue::String(parse_string(chars)?)),
336        Some('[') => parse_array(chars),
337        Some('{') => {
338            // Collect the object as a string and parse it
339            let mut depth = 0;
340            let mut obj_str = String::new();
341            for c in chars.by_ref() {
342                obj_str.push(c);
343                if c == '{' {
344                    depth += 1;
345                }
346                if c == '}' {
347                    depth -= 1;
348                    if depth == 0 {
349                        break;
350                    }
351                }
352            }
353            parse_json_object(&obj_str).map(JsonValue::Object)
354        }
355        Some('t') | Some('f') => parse_bool(chars),
356        Some('n') => parse_null(chars),
357        Some(c) if c.is_ascii_digit() || *c == '-' => parse_number(chars),
358        _ => Err("Unexpected character".to_string()),
359    }
360}
361
/// Parse a double-quoted JSON string from the front of `chars` and return its
/// decoded contents.
///
/// Decodes the escapes `\n`, `\r`, `\t`, `\b`, `\f`, `\\`, `\"`, `\/` and
/// `\uXXXX` (a high/low surrogate pair is combined into one character; an
/// unpaired surrogate becomes U+FFFD). Any other escaped character is kept
/// verbatim together with its backslash.
///
/// # Errors
///
/// * `"Expected '\"'"` when the input does not start with a quote.
/// * `"Unterminated string"` when the input ends before the closing quote.
/// * `"Invalid unicode escape"` when `\u` is not followed by four hex digits.
fn parse_string(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<String, String> {
    match chars.next() {
        Some('"') => {}
        _ => return Err("Expected '\"'".to_string()),
    }

    let mut result = String::new();

    loop {
        let c = match chars.next() {
            Some(c) => c,
            None => return Err("Unterminated string".to_string()),
        };

        match c {
            '"' => break,
            '\\' => {
                let escape = match chars.next() {
                    Some(e) => e,
                    None => return Err("Unterminated string".to_string()),
                };
                match escape {
                    'n' => result.push('\n'),
                    'r' => result.push('\r'),
                    't' => result.push('\t'),
                    'b' => result.push('\u{0008}'),
                    'f' => result.push('\u{000C}'),
                    '/' => result.push('/'),
                    '\\' => result.push('\\'),
                    '"' => result.push('"'),
                    'u' => result.push(decode_unicode_escape(chars)?),
                    other => {
                        // Not a JSON escape: keep it as written instead of failing.
                        result.push('\\');
                        result.push(other);
                    }
                }
            }
            other => result.push(other),
        }
    }

    Ok(result)
}

/// Decode the part of a `\uXXXX` escape that follows the `u`, consuming a
/// trailing low-surrogate escape when the first unit is a high surrogate.
fn decode_unicode_escape(
    chars: &mut std::iter::Peekable<std::str::Chars>,
) -> Result<char, String> {
    let first = read_hex4(chars)?;
    let mut code = first;

    if (0xD800..=0xDBFF).contains(&first) {
        // Look ahead on a copy so nothing is consumed unless a valid low
        // surrogate really follows.
        let mut lookahead = chars.clone();
        if lookahead.next() == Some('\\') && lookahead.next() == Some('u') {
            if let Ok(second) = read_hex4(&mut lookahead) {
                if (0xDC00..=0xDFFF).contains(&second) {
                    code = 0x10000 + ((first - 0xD800) << 10) + (second - 0xDC00);
                    *chars = lookahead;
                }
            }
        }
    }

    // Unpaired surrogates are not valid `char`s; substitute U+FFFD.
    Ok(std::char::from_u32(code).unwrap_or(std::char::REPLACEMENT_CHARACTER))
}

/// Read exactly four hexadecimal digits and return their value.
fn read_hex4(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<u32, String> {
    let mut value = 0u32;
    for _ in 0..4 {
        let digit = chars
            .next()
            .and_then(|c| c.to_digit(16))
            .ok_or_else(|| "Invalid unicode escape".to_string())?;
        value = value * 16 + digit;
    }
    Ok(value)
}
404
405fn parse_number(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<JsonValue, String> {
406    let mut num_str = String::new();
407
408    while let Some(&c) = chars.peek() {
409        if c.is_ascii_digit() || c == '.' || c == '-' || c == '+' || c == 'e' || c == 'E' {
410            num_str.push(c);
411            chars.next();
412        } else {
413            break;
414        }
415    }
416
417    num_str
418        .parse::<f64>()
419        .map(JsonValue::Number)
420        .map_err(|_| "Invalid number".to_string())
421}
422
423fn parse_array(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<JsonValue, String> {
424    // Expect opening bracket
425    match chars.next() {
426        Some('[') => {}
427        _ => return Err("Expected '['".to_string()),
428    }
429
430    skip_whitespace(chars);
431
432    // Empty array
433    if chars.peek() == Some(&']') {
434        chars.next();
435        return Ok(JsonValue::Array(Vec::new()));
436    }
437
438    let mut result = Vec::new();
439
440    loop {
441        skip_whitespace(chars);
442        result.push(parse_value(chars)?);
443        skip_whitespace(chars);
444
445        match chars.next() {
446            Some(',') => continue,
447            Some(']') => break,
448            _ => return Err("Expected ',' or ']'".to_string()),
449        }
450    }
451
452    Ok(JsonValue::Array(result))
453}
454
455fn parse_bool(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<JsonValue, String> {
456    let mut word = String::new();
457    while let Some(&c) = chars.peek() {
458        if c.is_alphabetic() {
459            word.push(c);
460            chars.next();
461        } else {
462            break;
463        }
464    }
465
466    match word.as_str() {
467        "true" => Ok(JsonValue::Bool(true)),
468        "false" => Ok(JsonValue::Bool(false)),
469        _ => Err(format!("Invalid boolean: {}", word)),
470    }
471}
472
473fn parse_null(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<JsonValue, String> {
474    let mut word = String::new();
475    while let Some(&c) = chars.peek() {
476        if c.is_alphabetic() {
477            word.push(c);
478            chars.next();
479        } else {
480            break;
481        }
482    }
483
484    if word == "null" {
485        Ok(JsonValue::Null)
486    } else {
487        Err(format!("Invalid null: {}", word))
488    }
489}
490
/// Advance `chars` past any leading whitespace, leaving the first
/// non-whitespace character (if any) unconsumed.
fn skip_whitespace(chars: &mut std::iter::Peekable<std::str::Chars>) {
    loop {
        match chars.peek() {
            Some(&c) if c.is_whitespace() => {
                chars.next();
            }
            _ => return,
        }
    }
}
500
501/// Convert JsonValue to storage Value
502fn json_to_value(jv: &JsonValue) -> Value {
503    match jv {
504        JsonValue::Null => Value::Null,
505        JsonValue::Bool(b) => Value::Boolean(*b),
506        JsonValue::Number(n) => {
507            if n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
508                Value::Integer(*n as i64)
509            } else {
510                Value::Float(*n)
511            }
512        }
513        JsonValue::String(s) => Value::text(s.clone()),
514        JsonValue::Array(arr) => Value::text(format!(
515            "[{}]",
516            arr.iter()
517                .map(|v| value_to_string(&json_to_value(v)))
518                .collect::<Vec<_>>()
519                .join(",")
520        )),
521        JsonValue::Object(_) => Value::text("[object]".to_string()),
522    }
523}
524
525fn value_to_string(v: &Value) -> String {
526    match v {
527        Value::Null => "null".to_string(),
528        Value::Boolean(b) => b.to_string(),
529        Value::Integer(i) => i.to_string(),
530        Value::UnsignedInteger(u) => u.to_string(),
531        Value::Float(f) => f.to_string(),
532        Value::Text(s) => s.to_string(),
533        Value::Blob(b) => format!("<{} bytes>", b.len()),
534        _ => "?".to_string(),
535    }
536}
537
#[cfg(test)]
mod tests {
    use super::*;

    /// A flat object with a string and a number member parses into both values.
    #[test]
    fn test_parse_simple_object() {
        let parsed = parse_json_object(r#"{"name": "Alice", "age": 30}"#).unwrap();

        let expected_name = JsonValue::String("Alice".to_string());
        assert_eq!(parsed.get("name"), Some(&expected_name));

        let age_is_thirty =
            matches!(parsed.get("age"), Some(JsonValue::Number(n)) if (*n - 30.0).abs() < 0.01);
        assert!(age_is_thirty);
    }

    /// An array member keeps all of its elements.
    #[test]
    fn test_parse_with_array() {
        let parsed = parse_json_object(r#"{"embedding": [0.1, 0.2, 0.3]}"#).unwrap();

        match parsed.get("embedding") {
            Some(JsonValue::Array(arr)) => assert_eq!(arr.len(), 3),
            _ => panic!("Expected array"),
        }
    }

    /// A nested object is parsed into a nested map.
    #[test]
    fn test_parse_nested() {
        let parsed = parse_json_object(r#"{"user": {"name": "Bob"}}"#).unwrap();

        match parsed.get("user") {
            Some(JsonValue::Object(obj)) => {
                let expected = JsonValue::String("Bob".to_string());
                assert_eq!(obj.get("name"), Some(&expected));
            }
            _ => panic!("Expected nested object"),
        }
    }

    /// `\n` inside a string literal is decoded to a real newline.
    #[test]
    fn test_parse_escape_sequences() {
        let parsed = parse_json_object(r#"{"text": "Hello\nWorld"}"#).unwrap();

        let expected = JsonValue::String("Hello\nWorld".to_string());
        assert_eq!(parsed.get("text"), Some(&expected));
    }

    /// Scalars map onto the matching storage value variants.
    #[test]
    fn test_json_to_value() {
        let null = json_to_value(&JsonValue::Null);
        assert_eq!(null, Value::Null);

        let boolean = json_to_value(&JsonValue::Bool(true));
        assert_eq!(boolean, Value::Boolean(true));

        let integer = json_to_value(&JsonValue::Number(42.0));
        assert_eq!(integer, Value::Integer(42));

        let float = json_to_value(&JsonValue::Number(2.5));
        assert_eq!(float, Value::Float(2.5));

        let text = json_to_value(&JsonValue::String("test".to_string()));
        assert_eq!(text, Value::text("test".to_string()));
    }
}