reddb_server/storage/import/
jsonl.rs1use crate::storage::schema::types::Value;
26use crate::storage::Store;
27use crate::storage::{EntityData, EntityKind, RowData, UnifiedEntity, VectorData};
28use std::collections::HashMap;
29use std::fs::File;
30use std::io::{BufRead, BufReader};
31use std::path::Path;
32use std::sync::Arc;
33
/// Settings that control how a JSONL stream is imported.
#[derive(Debug, Clone)]
pub struct JsonlConfig {
    /// Optional name of the JSON field that carries a record identifier.
    /// NOTE(review): no code visible in this file reads this setting — confirm where it is consumed.
    pub id_field: Option<String>,
    /// Optional name of the JSON field that holds an array of numbers to be
    /// stored as a dense vector; the field is left out of the row's named columns.
    pub embedding_field: Option<String>,
    /// Name of the collection the imported entities are inserted into.
    pub collection: String,
    /// Requested batch size.
    /// NOTE(review): not consulted by the import code visible in this file — confirm usage.
    pub batch_size: usize,
    /// When `true`, lines that fail to read, parse, or insert are counted and
    /// skipped instead of aborting the import.
    pub skip_errors: bool,
    /// Upper bound on the number of input lines to process; `None` means no limit.
    pub max_lines: Option<usize>,
}

impl Default for JsonlConfig {
    /// Builds the default configuration: collection `"imported"`, batch size
    /// 1000, errors are fatal, no id/embedding field, and no line limit.
    fn default() -> Self {
        JsonlConfig {
            collection: String::from("imported"),
            batch_size: 1000,
            skip_errors: false,
            id_field: None,
            embedding_field: None,
            max_lines: None,
        }
    }
}
63
/// Counters describing the outcome of one import run.
#[derive(Debug, Clone, Default)]
pub struct ImportStats {
    /// Number of input lines pulled from the reader, including blank lines
    /// and lines that failed.
    pub lines_processed: usize,
    /// Number of lines that were parsed and inserted successfully.
    pub records_imported: usize,
    /// Number of lines skipped because of a read, parse, or insert error
    /// (only incremented when `skip_errors` is enabled).
    pub errors_skipped: usize,
    /// Elapsed time of the import, in milliseconds.
    pub duration_ms: u64,
}
76
77pub struct JsonlImporter {
79 config: JsonlConfig,
80}
81
82impl JsonlImporter {
83 pub fn new(config: JsonlConfig) -> Self {
85 Self { config }
86 }
87
88 pub fn with_defaults() -> Self {
90 Self::new(JsonlConfig::default())
91 }
92
93 pub fn import_file<P: AsRef<Path>>(
95 &self,
96 path: P,
97 store: &mut Store,
98 ) -> Result<ImportStats, JsonlError> {
99 let file = File::open(path.as_ref()).map_err(|e| JsonlError::Io(e.to_string()))?;
100 let reader = BufReader::new(file);
101 self.import_reader(reader, store)
102 }
103
104 pub fn import_reader<R: BufRead>(
106 &self,
107 reader: R,
108 store: &mut Store,
109 ) -> Result<ImportStats, JsonlError> {
110 let start = std::time::Instant::now();
111 let mut stats = ImportStats::default();
112
113 for (line_num, line_result) in reader.lines().enumerate() {
114 if let Some(max) = self.config.max_lines {
116 if stats.lines_processed >= max {
117 break;
118 }
119 }
120
121 stats.lines_processed += 1;
122
123 let line = match line_result {
124 Ok(l) => l,
125 Err(e) => {
126 if self.config.skip_errors {
127 stats.errors_skipped += 1;
128 continue;
129 }
130 return Err(JsonlError::Io(format!("Line {}: {}", line_num + 1, e)));
131 }
132 };
133
134 let trimmed = line.trim();
136 if trimmed.is_empty() {
137 continue;
138 }
139
140 match self.parse_and_insert(&line, store) {
142 Ok(()) => {
143 stats.records_imported += 1;
144 }
145 Err(e) => {
146 if self.config.skip_errors {
147 stats.errors_skipped += 1;
148 continue;
149 }
150 return Err(JsonlError::Parse(format!("Line {}: {}", line_num + 1, e)));
151 }
152 }
153 }
154
155 stats.duration_ms = start.elapsed().as_millis() as u64;
156 Ok(stats)
157 }
158
159 fn parse_and_insert(&self, line: &str, store: &mut Store) -> Result<(), String> {
161 let json = parse_json_object(line)?;
162
163 let embedding = if let Some(ref emb_field) = self.config.embedding_field {
165 json.get(emb_field).and_then(|v| {
166 if let JsonValue::Array(arr) = v {
167 let floats: Option<Vec<f32>> = arr
168 .iter()
169 .map(|v| match v {
170 JsonValue::Number(n) => Some(*n as f32),
171 _ => None,
172 })
173 .collect();
174 floats
175 } else {
176 None
177 }
178 })
179 } else {
180 None
181 };
182
183 let mut named = HashMap::new();
185 for (key, value) in &json {
186 if self
188 .config
189 .embedding_field
190 .as_ref()
191 .map(|f| f == key)
192 .unwrap_or(false)
193 {
194 continue;
195 }
196
197 named.insert(key.clone(), json_to_value(value));
198 }
199
200 let entity_id = store.next_entity_id();
202 let row_id = entity_id.0;
203
204 let entity = if let Some(emb) = embedding {
206 UnifiedEntity::new(
209 entity_id,
210 EntityKind::Vector {
211 collection: self.config.collection.clone(),
212 },
213 EntityData::Vector(VectorData {
214 dense: emb,
215 sparse: None,
216 content: Some(line.to_string()),
217 }),
218 )
219 } else {
220 let row_data = RowData {
222 columns: Vec::new(),
223 named: Some(named),
224 schema: None,
225 };
226 UnifiedEntity::new(
227 entity_id,
228 EntityKind::TableRow {
229 table: Arc::from(self.config.collection.as_str()),
230 row_id,
231 },
232 EntityData::Row(row_data),
233 )
234 };
235
236 store
237 .insert(&self.config.collection, entity)
238 .map_err(|e| format!("Insert failed: {:?}", e))?;
239
240 Ok(())
241 }
242}
243
/// Errors reported by the JSONL importer.
#[derive(Debug)]
pub enum JsonlError {
    /// The input could not be opened or a line could not be read.
    Io(String),
    /// A line could not be parsed or inserted.
    Parse(String),
}

impl std::fmt::Display for JsonlError {
    /// Writes the error as `"<category>: <detail>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Parse(detail) => write!(f, "Parse error: {}", detail),
            Self::Io(detail) => write!(f, "I/O error: {}", detail),
        }
    }
}

impl std::error::Error for JsonlError {}
261
/// In-memory representation of a parsed JSON value.
#[derive(Debug, Clone, PartialEq)]
pub enum JsonValue {
    /// The JSON literal `null`.
    Null,
    /// The JSON literals `true` and `false`.
    Bool(bool),
    /// Any JSON number; integers and fractions alike are held as `f64`.
    Number(f64),
    /// A JSON string with its escape sequences already decoded.
    String(String),
    /// A JSON array, in source order.
    Array(Vec<JsonValue>),
    /// A JSON object keyed by member name.
    Object(HashMap<String, JsonValue>),
}
276
277pub fn parse_json_object(s: &str) -> Result<HashMap<String, JsonValue>, String> {
279 let mut chars = s.trim().chars().peekable();
280
281 match chars.next() {
283 Some('{') => {}
284 _ => return Err("Expected '{'".to_string()),
285 }
286
287 skip_whitespace(&mut chars);
288
289 if chars.peek() == Some(&'}') {
291 chars.next();
292 return Ok(HashMap::new());
293 }
294
295 let mut result = HashMap::new();
296
297 loop {
298 skip_whitespace(&mut chars);
299
300 let key = parse_string(&mut chars)?;
302
303 skip_whitespace(&mut chars);
304
305 match chars.next() {
307 Some(':') => {}
308 _ => return Err("Expected ':'".to_string()),
309 }
310
311 skip_whitespace(&mut chars);
312
313 let value = parse_value(&mut chars)?;
315
316 result.insert(key, value);
317
318 skip_whitespace(&mut chars);
319
320 match chars.next() {
322 Some(',') => continue,
323 Some('}') => break,
324 _ => return Err("Expected ',' or '}'".to_string()),
325 }
326 }
327
328 Ok(result)
329}
330
331fn parse_value(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<JsonValue, String> {
332 skip_whitespace(chars);
333
334 match chars.peek() {
335 Some('"') => Ok(JsonValue::String(parse_string(chars)?)),
336 Some('[') => parse_array(chars),
337 Some('{') => {
338 let mut depth = 0;
340 let mut obj_str = String::new();
341 for c in chars.by_ref() {
342 obj_str.push(c);
343 if c == '{' {
344 depth += 1;
345 }
346 if c == '}' {
347 depth -= 1;
348 if depth == 0 {
349 break;
350 }
351 }
352 }
353 parse_json_object(&obj_str).map(JsonValue::Object)
354 }
355 Some('t') | Some('f') => parse_bool(chars),
356 Some('n') => parse_null(chars),
357 Some(c) if c.is_ascii_digit() || *c == '-' => parse_number(chars),
358 _ => Err("Unexpected character".to_string()),
359 }
360}
361
/// Parses a double-quoted JSON string at the cursor and returns its decoded
/// contents, leaving the cursor just past the closing quote.
///
/// Decodes `\n`, `\r`, `\t`, `\b`, `\f`, `\/`, `\\`, `\"` and `\uXXXX`
/// (including surrogate pairs). An unpaired surrogate becomes U+FFFD. Any
/// other escaped character is kept verbatim together with its backslash.
///
/// # Errors
///
/// Returns an error if the cursor is not at `"`, if the string is not
/// terminated, or if a `\u` escape is not followed by four hex digits.
fn parse_string(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<String, String> {
    match chars.next() {
        Some('"') => {}
        _ => return Err("Expected '\"'".to_string()),
    }

    let mut result = String::new();

    loop {
        match chars.next() {
            None => return Err("Unterminated string".to_string()),
            Some('"') => return Ok(result),
            Some('\\') => match chars.next() {
                None => return Err("Unterminated string".to_string()),
                Some('n') => result.push('\n'),
                Some('r') => result.push('\r'),
                Some('t') => result.push('\t'),
                Some('b') => result.push('\u{0008}'),
                Some('f') => result.push('\u{000C}'),
                Some('/') => result.push('/'),
                Some('\\') => result.push('\\'),
                Some('"') => result.push('"'),
                Some('u') => result.push(parse_unicode_escape(chars)?),
                Some(other) => {
                    // Unknown escape: keep it as written rather than failing.
                    result.push('\\');
                    result.push(other);
                }
            },
            Some(c) => result.push(c),
        }
    }
}

/// Reads exactly four hexadecimal digits and returns their value.
fn parse_hex4(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<u32, String> {
    let mut code = 0u32;
    for _ in 0..4 {
        let digit = chars
            .next()
            .and_then(|c| c.to_digit(16))
            .ok_or_else(|| "Invalid unicode escape".to_string())?;
        code = code * 16 + digit;
    }
    Ok(code)
}

/// Decodes the part of a `\uXXXX` escape that follows the `u`, combining a
/// high surrogate with an immediately following `\uXXXX` low surrogate.
fn parse_unicode_escape(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<char, String> {
    let first = parse_hex4(chars)?;

    if (0xD800..=0xDBFF).contains(&first) {
        // Look ahead on a copy so that the real cursor only advances when a
        // valid low surrogate actually follows.
        let mut ahead = chars.clone();
        if ahead.next() == Some('\\') && ahead.next() == Some('u') {
            if let Ok(second) = parse_hex4(&mut ahead) {
                if (0xDC00..=0xDFFF).contains(&second) {
                    *chars = ahead;
                    let combined = 0x10000 + ((first - 0xD800) << 10) + (second - 0xDC00);
                    return Ok(char::from_u32(combined).unwrap_or(char::REPLACEMENT_CHARACTER));
                }
            }
        }
        return Ok(char::REPLACEMENT_CHARACTER);
    }

    // A lone low surrogate is not a valid scalar value and maps to U+FFFD.
    Ok(char::from_u32(first).unwrap_or(char::REPLACEMENT_CHARACTER))
}
404
405fn parse_number(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<JsonValue, String> {
406 let mut num_str = String::new();
407
408 while let Some(&c) = chars.peek() {
409 if c.is_ascii_digit() || c == '.' || c == '-' || c == '+' || c == 'e' || c == 'E' {
410 num_str.push(c);
411 chars.next();
412 } else {
413 break;
414 }
415 }
416
417 num_str
418 .parse::<f64>()
419 .map(JsonValue::Number)
420 .map_err(|_| "Invalid number".to_string())
421}
422
423fn parse_array(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<JsonValue, String> {
424 match chars.next() {
426 Some('[') => {}
427 _ => return Err("Expected '['".to_string()),
428 }
429
430 skip_whitespace(chars);
431
432 if chars.peek() == Some(&']') {
434 chars.next();
435 return Ok(JsonValue::Array(Vec::new()));
436 }
437
438 let mut result = Vec::new();
439
440 loop {
441 skip_whitespace(chars);
442 result.push(parse_value(chars)?);
443 skip_whitespace(chars);
444
445 match chars.next() {
446 Some(',') => continue,
447 Some(']') => break,
448 _ => return Err("Expected ',' or ']'".to_string()),
449 }
450 }
451
452 Ok(JsonValue::Array(result))
453}
454
455fn parse_bool(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<JsonValue, String> {
456 let mut word = String::new();
457 while let Some(&c) = chars.peek() {
458 if c.is_alphabetic() {
459 word.push(c);
460 chars.next();
461 } else {
462 break;
463 }
464 }
465
466 match word.as_str() {
467 "true" => Ok(JsonValue::Bool(true)),
468 "false" => Ok(JsonValue::Bool(false)),
469 _ => Err(format!("Invalid boolean: {}", word)),
470 }
471}
472
473fn parse_null(chars: &mut std::iter::Peekable<std::str::Chars>) -> Result<JsonValue, String> {
474 let mut word = String::new();
475 while let Some(&c) = chars.peek() {
476 if c.is_alphabetic() {
477 word.push(c);
478 chars.next();
479 } else {
480 break;
481 }
482 }
483
484 if word == "null" {
485 Ok(JsonValue::Null)
486 } else {
487 Err(format!("Invalid null: {}", word))
488 }
489}
490
/// Advances the cursor past any run of whitespace characters, stopping at
/// the first non-whitespace character or at the end of input.
fn skip_whitespace(chars: &mut std::iter::Peekable<std::str::Chars>) {
    while chars.next_if(|c| c.is_whitespace()).is_some() {}
}
500
501fn json_to_value(jv: &JsonValue) -> Value {
503 match jv {
504 JsonValue::Null => Value::Null,
505 JsonValue::Bool(b) => Value::Boolean(*b),
506 JsonValue::Number(n) => {
507 if n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
508 Value::Integer(*n as i64)
509 } else {
510 Value::Float(*n)
511 }
512 }
513 JsonValue::String(s) => Value::text(s.clone()),
514 JsonValue::Array(arr) => Value::text(format!(
515 "[{}]",
516 arr.iter()
517 .map(|v| value_to_string(&json_to_value(v)))
518 .collect::<Vec<_>>()
519 .join(",")
520 )),
521 JsonValue::Object(_) => Value::text("[object]".to_string()),
522 }
523}
524
525fn value_to_string(v: &Value) -> String {
526 match v {
527 Value::Null => "null".to_string(),
528 Value::Boolean(b) => b.to_string(),
529 Value::Integer(i) => i.to_string(),
530 Value::UnsignedInteger(u) => u.to_string(),
531 Value::Float(f) => f.to_string(),
532 Value::Text(s) => s.to_string(),
533 Value::Blob(b) => format!("<{} bytes>", b.len()),
534 _ => "?".to_string(),
535 }
536}
537
#[cfg(test)]
mod tests {
    use super::*;

    /// A flat object with one string member and one numeric member.
    #[test]
    fn test_parse_simple_object() {
        let parsed = parse_json_object(r#"{"name": "Alice", "age": 30}"#).unwrap();

        let expected_name = JsonValue::String("Alice".to_string());
        assert_eq!(parsed.get("name"), Some(&expected_name));

        let age = parsed.get("age");
        assert!(matches!(age, Some(JsonValue::Number(n)) if (*n - 30.0).abs() < 0.01));
    }

    /// An array member keeps all of its elements.
    #[test]
    fn test_parse_with_array() {
        let parsed = parse_json_object(r#"{"embedding": [0.1, 0.2, 0.3]}"#).unwrap();

        match parsed.get("embedding") {
            Some(JsonValue::Array(items)) => assert_eq!(items.len(), 3),
            _ => panic!("Expected array"),
        }
    }

    /// An object nested inside another object is parsed recursively.
    #[test]
    fn test_parse_nested() {
        let parsed = parse_json_object(r#"{"user": {"name": "Bob"}}"#).unwrap();

        match parsed.get("user") {
            Some(JsonValue::Object(inner)) => {
                let expected = JsonValue::String("Bob".to_string());
                assert_eq!(inner.get("name"), Some(&expected));
            }
            _ => panic!("Expected nested object"),
        }
    }

    /// A `\n` escape in the source text is decoded into a real newline.
    #[test]
    fn test_parse_escape_sequences() {
        let parsed = parse_json_object(r#"{"text": "Hello\nWorld"}"#).unwrap();

        let expected = JsonValue::String("Hello\nWorld".to_string());
        assert_eq!(parsed.get("text"), Some(&expected));
    }

    /// Scalar JSON values map onto the matching storage variants.
    #[test]
    fn test_json_to_value() {
        let null = json_to_value(&JsonValue::Null);
        assert_eq!(null, Value::Null);

        let boolean = json_to_value(&JsonValue::Bool(true));
        assert_eq!(boolean, Value::Boolean(true));

        let whole = json_to_value(&JsonValue::Number(42.0));
        assert_eq!(whole, Value::Integer(42));

        let fractional = json_to_value(&JsonValue::Number(2.5));
        assert_eq!(fractional, Value::Float(2.5));

        let text = json_to_value(&JsonValue::String("test".to_string()));
        assert_eq!(text, Value::text("test".to_string()));
    }
}