
storage/export.rs

//! Data Export/Import Utilities for Buffer
//!
//! Provides tools for exporting and importing vector data in various formats:
//! - JSON (human-readable, widely compatible)
//! - JSONL (JSON Lines, streaming-friendly)
//! - CSV (spreadsheet compatible, metadata as JSON string)
//! - Binary (compact, fast)
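//!
//! # Example
//!
//! A minimal sketch of a JSON export followed by a re-import, run inside an
//! async context. It assumes the crate re-exports `InMemoryStorage` at its
//! root (as the tests below do); the fence is ignored so the paths can be
//! adapted to the actual crate layout.
//!
//! ```ignore
//! let storage = InMemoryStorage::new();
//!
//! // Export the "default" namespace to a pretty-printed JSON string.
//! let exporter = DataExporter::new(storage.clone());
//! let export_config = ExportConfig { pretty_print: true, ..Default::default() };
//! let json = exporter.export_to_string("default", &export_config).await?;
//!
//! // Re-import the same payload into a different namespace.
//! let importer = DataImporter::new(storage);
//! let import_config = ImportConfig { namespace: "copy".to_string(), ..Default::default() };
//! let stats = importer.import_from_string(&json, &import_config).await?;
//! println!("imported {} vectors", stats.vectors_imported);
//! ```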

use std::io::{BufRead, BufReader, BufWriter, Read, Write};
use std::path::Path;

use common::{DakeraError, Vector};
use serde::{Deserialize, Serialize};

use crate::traits::VectorStorage;

/// Export format options
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExportFormat {
    /// JSON array format - entire dataset as one JSON array
    #[default]
    Json,
    /// JSON Lines format - one JSON object per line (streaming-friendly)
    JsonLines,
    /// CSV format with metadata as JSON string column
    Csv,
    /// Compact binary format for fast I/O
    Binary,
}

/// Import format options
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ImportFormat {
    /// Auto-detect format from file extension or content
    #[default]
    Auto,
    /// JSON array format
    Json,
    /// JSON Lines format
    JsonLines,
    /// CSV format
    Csv,
    /// Binary format
    Binary,
}

/// Configuration for export operations
#[derive(Debug, Clone)]
pub struct ExportConfig {
    /// Output format
    pub format: ExportFormat,
    /// Include vector values in export (can be disabled for metadata-only export)
    pub include_vectors: bool,
    /// Include metadata in export
    pub include_metadata: bool,
    /// Pretty print JSON output
    pub pretty_print: bool,
    /// Batch size for streaming exports
    pub batch_size: usize,
    /// Compress output (gzip)
    pub compress: bool,
}

impl Default for ExportConfig {
    fn default() -> Self {
        Self {
            format: ExportFormat::Json,
            include_vectors: true,
            include_metadata: true,
            pretty_print: false,
            batch_size: 10000,
            compress: false,
        }
    }
}

/// Configuration for import operations
#[derive(Debug, Clone)]
pub struct ImportConfig {
    /// Input format (Auto to detect)
    pub format: ImportFormat,
    /// Batch size for bulk inserts
    pub batch_size: usize,
    /// Skip invalid records instead of failing
    pub skip_invalid: bool,
    /// Overwrite existing vectors with same ID
    pub overwrite: bool,
    /// Namespace to import into
    pub namespace: String,
}

impl Default for ImportConfig {
    fn default() -> Self {
        Self {
            format: ImportFormat::Auto,
            batch_size: 1000,
            skip_invalid: false,
            overwrite: true,
            namespace: "default".to_string(),
        }
    }
}

/// Statistics from an export operation
#[derive(Debug, Clone, Default)]
pub struct ExportStats {
    /// Number of vectors exported
    pub vectors_exported: u64,
    /// Total bytes written
    pub bytes_written: u64,
    /// Export duration in milliseconds
    pub duration_ms: u64,
    /// Any warnings during export
    pub warnings: Vec<String>,
}

/// Statistics from an import operation
#[derive(Debug, Clone, Default)]
pub struct ImportStats {
    /// Number of vectors imported
    pub vectors_imported: u64,
    /// Number of vectors skipped (invalid or duplicate)
    pub vectors_skipped: u64,
    /// Total bytes read
    pub bytes_read: u64,
    /// Import duration in milliseconds
    pub duration_ms: u64,
    /// Any warnings during import
    pub warnings: Vec<String>,
}

/// Serializable vector record for export/import
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorRecord {
    /// Vector ID
    pub id: String,
    /// Vector values (optional for metadata-only exports)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub values: Option<Vec<f32>>,
    /// Metadata (optional)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
    /// TTL in seconds (optional)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl_seconds: Option<u64>,
}

impl From<&Vector> for VectorRecord {
    fn from(v: &Vector) -> Self {
        Self {
            id: v.id.clone(),
            values: Some(v.values.clone()),
            metadata: v.metadata.clone(),
            ttl_seconds: v.ttl_seconds,
        }
    }
}

impl From<VectorRecord> for Vector {
    fn from(r: VectorRecord) -> Self {
        Self {
            id: r.id,
            values: r.values.unwrap_or_default(),
            metadata: r.metadata,
            ttl_seconds: r.ttl_seconds,
            expires_at: None,
        }
    }
}

/// Data exporter for vector storage
pub struct DataExporter<S: VectorStorage> {
    storage: S,
}

impl<S: VectorStorage> DataExporter<S> {
    /// Create a new data exporter
    pub fn new(storage: S) -> Self {
        Self { storage }
    }

    /// Export vectors from a namespace to a writer
    pub async fn export_to_writer<W: Write>(
        &self,
        namespace: &str,
        writer: W,
        config: &ExportConfig,
    ) -> Result<ExportStats, DakeraError> {
        let start = std::time::Instant::now();
        let mut stats = ExportStats::default();

        let ns = namespace.to_string();
        let vectors = self.storage.get_all(&ns).await?;

        let records: Vec<VectorRecord> = vectors
            .iter()
            .map(|v| {
                let mut record = VectorRecord::from(v);
                if !config.include_vectors {
                    record.values = None;
                }
                if !config.include_metadata {
                    record.metadata = None;
                }
                record
            })
            .collect();

        stats.vectors_exported = records.len() as u64;

        let bytes_written = match config.format {
            ExportFormat::Json => self.write_json(writer, &records, config.pretty_print)?,
            ExportFormat::JsonLines => self.write_jsonl(writer, &records)?,
            ExportFormat::Csv => self.write_csv(writer, &records)?,
            ExportFormat::Binary => self.write_binary(writer, &records)?,
        };

        stats.bytes_written = bytes_written;
        stats.duration_ms = start.elapsed().as_millis() as u64;

        Ok(stats)
    }

    /// Export vectors to a file
    pub async fn export_to_file(
        &self,
        namespace: &str,
        path: impl AsRef<Path>,
        config: &ExportConfig,
    ) -> Result<ExportStats, DakeraError> {
        let file = std::fs::File::create(path.as_ref())
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        let writer = BufWriter::new(file);
        self.export_to_writer(namespace, writer, config).await
    }

    /// Export to JSON string
    pub async fn export_to_string(
        &self,
        namespace: &str,
        config: &ExportConfig,
    ) -> Result<String, DakeraError> {
        let mut buffer = Vec::new();
        self.export_to_writer(namespace, &mut buffer, config)
            .await?;
        String::from_utf8(buffer).map_err(|e| DakeraError::Storage(e.to_string()))
    }

    fn write_json<W: Write>(
        &self,
        mut writer: W,
        records: &[VectorRecord],
        pretty: bool,
    ) -> Result<u64, DakeraError> {
        let json = if pretty {
            serde_json::to_string_pretty(records)
        } else {
            serde_json::to_string(records)
        }
        .map_err(|e| DakeraError::Storage(e.to_string()))?;

        let bytes = json.as_bytes();
        writer
            .write_all(bytes)
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        Ok(bytes.len() as u64)
    }

    fn write_jsonl<W: Write>(
        &self,
        mut writer: W,
        records: &[VectorRecord],
    ) -> Result<u64, DakeraError> {
        let mut total_bytes = 0u64;

        for record in records {
            let line =
                serde_json::to_string(record).map_err(|e| DakeraError::Storage(e.to_string()))?;
            writeln!(writer, "{}", line).map_err(|e| DakeraError::Storage(e.to_string()))?;
            total_bytes += line.len() as u64 + 1;
        }

        Ok(total_bytes)
    }

    fn write_csv<W: Write>(
        &self,
        mut writer: W,
        records: &[VectorRecord],
    ) -> Result<u64, DakeraError> {
        let mut total_bytes = 0u64;

        // Write header
        let header = "id,values,metadata,ttl_seconds\n";
        writer
            .write_all(header.as_bytes())
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        total_bytes += header.len() as u64;

        for record in records {
            let values_json = record
                .values
                .as_ref()
                .map(|v| serde_json::to_string(v).unwrap_or_default())
                .unwrap_or_default();

            let metadata_json = record
                .metadata
                .as_ref()
                .map(|m| serde_json::to_string(m).unwrap_or_default())
                .unwrap_or_default();

            let ttl = record
                .ttl_seconds
                .map(|t| t.to_string())
                .unwrap_or_default();

            let line = format!(
                "\"{}\",\"{}\",\"{}\",{}\n",
                escape_csv(&record.id),
                escape_csv(&values_json),
                escape_csv(&metadata_json),
                ttl
            );

            writer
                .write_all(line.as_bytes())
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            total_bytes += line.len() as u64;
        }

        Ok(total_bytes)
    }

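    // Binary layout, mirrored by `DataImporter::read_binary` below:
    //   magic b"VPUF" (4 bytes) | version: u32 LE | record count: u64 LE,
    //   then per record: JSON length: u32 LE | serde_json-encoded VectorRecord bytes.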
    fn write_binary<W: Write>(
        &self,
        mut writer: W,
        records: &[VectorRecord],
    ) -> Result<u64, DakeraError> {
        let mut total_bytes = 0u64;

        // Write magic number and version
        let magic = b"VPUF";
        let version: u32 = 1;
        writer
            .write_all(magic)
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        writer
            .write_all(&version.to_le_bytes())
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        total_bytes += 8;

        // Write record count
        let count = records.len() as u64;
        writer
            .write_all(&count.to_le_bytes())
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        total_bytes += 8;

        // Write each record
        for record in records {
            let json =
                serde_json::to_vec(record).map_err(|e| DakeraError::Storage(e.to_string()))?;
            let len = json.len() as u32;
            writer
                .write_all(&len.to_le_bytes())
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            writer
                .write_all(&json)
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            total_bytes += 4 + json.len() as u64;
        }

        Ok(total_bytes)
    }
}

/// Data importer for vector storage
pub struct DataImporter<S: VectorStorage> {
    storage: S,
}

impl<S: VectorStorage> DataImporter<S> {
    /// Create a new data importer
    pub fn new(storage: S) -> Self {
        Self { storage }
    }

    /// Import vectors from a reader
    pub async fn import_from_reader<R: Read>(
        &self,
        reader: R,
        config: &ImportConfig,
    ) -> Result<ImportStats, DakeraError> {
        let start = std::time::Instant::now();
        let mut stats = ImportStats::default();

        let format = if config.format == ImportFormat::Auto {
            // For auto-detect, we need to peek at the content
            // Default to JSON for now
            ImportFormat::Json
        } else {
            config.format
        };

        let records = match format {
            ImportFormat::Json | ImportFormat::Auto => self.read_json(reader)?,
            ImportFormat::JsonLines => self.read_jsonl(reader, config.skip_invalid, &mut stats)?,
            ImportFormat::Csv => self.read_csv(reader, config.skip_invalid, &mut stats)?,
            ImportFormat::Binary => self.read_binary(reader)?,
        };

        // Ensure namespace exists
        self.storage.ensure_namespace(&config.namespace).await?;

        // Import in batches
        let vectors: Vec<Vector> = records.into_iter().map(Vector::from).collect();

        for chunk in vectors.chunks(config.batch_size) {
            let batch: Vec<Vector> = chunk.to_vec();
            let count = self.storage.upsert(&config.namespace, batch).await?;
            stats.vectors_imported += count as u64;
        }

        stats.duration_ms = start.elapsed().as_millis() as u64;

        Ok(stats)
    }

    /// Import vectors from a file
    pub async fn import_from_file(
        &self,
        path: impl AsRef<Path>,
        config: &ImportConfig,
    ) -> Result<ImportStats, DakeraError> {
        let path = path.as_ref();

        // Auto-detect format from extension if needed
        let mut config = config.clone();
        if config.format == ImportFormat::Auto {
            config.format = match path.extension().and_then(|e| e.to_str()) {
                Some("json") => ImportFormat::Json,
                Some("jsonl") | Some("ndjson") => ImportFormat::JsonLines,
                Some("csv") => ImportFormat::Csv,
                Some("bin") | Some("vpuf") => ImportFormat::Binary,
                _ => ImportFormat::Json,
            };
        }

        let file = std::fs::File::open(path).map_err(|e| DakeraError::Storage(e.to_string()))?;
        let reader = BufReader::new(file);

        let mut stats = self.import_from_reader(reader, &config).await?;
        stats.bytes_read = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);

        Ok(stats)
    }

    /// Import vectors from a JSON string
    pub async fn import_from_string(
        &self,
        data: &str,
        config: &ImportConfig,
    ) -> Result<ImportStats, DakeraError> {
        let reader = std::io::Cursor::new(data.as_bytes());
        self.import_from_reader(reader, config).await
    }

    fn read_json<R: Read>(&self, reader: R) -> Result<Vec<VectorRecord>, DakeraError> {
        serde_json::from_reader(reader).map_err(|e| DakeraError::Storage(e.to_string()))
    }

    fn read_jsonl<R: Read>(
        &self,
        reader: R,
        skip_invalid: bool,
        stats: &mut ImportStats,
    ) -> Result<Vec<VectorRecord>, DakeraError> {
        let buf_reader = BufReader::new(reader);
        let mut records = Vec::new();

        for line in buf_reader.lines() {
            let line = line.map_err(|e| DakeraError::Storage(e.to_string()))?;
            if line.trim().is_empty() {
                continue;
            }

            match serde_json::from_str::<VectorRecord>(&line) {
                Ok(record) => records.push(record),
                Err(e) => {
                    if skip_invalid {
                        stats.vectors_skipped += 1;
                        stats
                            .warnings
                            .push(format!("Skipped invalid record: {}", e));
                    } else {
                        return Err(DakeraError::Storage(format!("Invalid JSON record: {}", e)));
                    }
                }
            }
        }

        Ok(records)
    }

    fn read_csv<R: Read>(
        &self,
        reader: R,
        skip_invalid: bool,
        stats: &mut ImportStats,
    ) -> Result<Vec<VectorRecord>, DakeraError> {
        let buf_reader = BufReader::new(reader);
        let mut records = Vec::new();
        let mut lines = buf_reader.lines();

        // Skip header
        let _ = lines.next();

        for line in lines {
            let line = line.map_err(|e| DakeraError::Storage(e.to_string()))?;
            if line.trim().is_empty() {
                continue;
            }

            match parse_csv_line(&line) {
                Ok(record) => records.push(record),
                Err(e) => {
                    if skip_invalid {
                        stats.vectors_skipped += 1;
                        stats
                            .warnings
                            .push(format!("Skipped invalid CSV row: {}", e));
                    } else {
                        return Err(DakeraError::Storage(format!("Invalid CSV row: {}", e)));
                    }
                }
            }
        }

        Ok(records)
    }

    fn read_binary<R: Read>(&self, mut reader: R) -> Result<Vec<VectorRecord>, DakeraError> {
        // Read and verify magic number
        let mut magic = [0u8; 4];
        reader
            .read_exact(&mut magic)
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        if &magic != b"VPUF" {
            return Err(DakeraError::Storage(
                "Invalid binary format: bad magic number".to_string(),
            ));
        }

        // Read version
        let mut version_bytes = [0u8; 4];
        reader
            .read_exact(&mut version_bytes)
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        let version = u32::from_le_bytes(version_bytes);

        if version != 1 {
            return Err(DakeraError::Storage(format!(
                "Unsupported binary format version: {}",
                version
            )));
        }

        // Read record count
        let mut count_bytes = [0u8; 8];
        reader
            .read_exact(&mut count_bytes)
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        let count = u64::from_le_bytes(count_bytes);

        const MAX_IMPORT_RECORDS: u64 = 10_000_000;
        if count > MAX_IMPORT_RECORDS {
            return Err(DakeraError::InvalidRequest(format!(
                "Binary file declares {} records which exceeds import limit of {}",
                count, MAX_IMPORT_RECORDS
            )));
        }
        let mut records = Vec::with_capacity(count as usize);

        // Read each record
        for _ in 0..count {
            let mut len_bytes = [0u8; 4];
            reader
                .read_exact(&mut len_bytes)
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            let len = u32::from_le_bytes(len_bytes);

            const MAX_RECORD_BYTES: u32 = 10 * 1024 * 1024; // 10 MB per record
            if len > MAX_RECORD_BYTES {
                return Err(DakeraError::InvalidRequest(format!(
                    "Record size {} bytes exceeds maximum of {} bytes",
                    len, MAX_RECORD_BYTES
                )));
            }
            let len = len as usize;

            let mut json_bytes = vec![0u8; len];
            reader
                .read_exact(&mut json_bytes)
                .map_err(|e| DakeraError::Storage(e.to_string()))?;

            let record: VectorRecord = serde_json::from_slice(&json_bytes)
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            records.push(record);
        }

        Ok(records)
    }
}

/// Escape special characters for CSV
fn escape_csv(s: &str) -> String {
    s.replace('"', "\"\"")
}

/// Parse a CSV line into a VectorRecord
fn parse_csv_line(line: &str) -> Result<VectorRecord, String> {
    // Simple CSV parsing - handles quoted fields with embedded commas
    let fields = parse_csv_fields(line)?;

    if fields.len() < 2 {
        return Err("CSV line must have at least id and values".to_string());
    }

    let id = fields[0].clone();

    let values: Option<Vec<f32>> = if fields.len() > 1 && !fields[1].is_empty() {
        let values_str = &fields[1];
        serde_json::from_str(values_str).map_err(|e| format!("Invalid values JSON: {}", e))?
    } else {
        None
    };

    let metadata: Option<serde_json::Value> = if fields.len() > 2 && !fields[2].is_empty() {
        serde_json::from_str(&fields[2]).ok()
    } else {
        None
    };

    let ttl_seconds: Option<u64> = if fields.len() > 3 && !fields[3].is_empty() {
        fields[3].parse().ok()
    } else {
        None
    };

    Ok(VectorRecord {
        id,
        values,
        metadata,
        ttl_seconds,
    })
}

/// Parse CSV fields handling quoted strings
fn parse_csv_fields(line: &str) -> Result<Vec<String>, String> {
    let mut fields = Vec::new();
    let mut current = String::new();
    let mut in_quotes = false;
    let mut chars = line.chars().peekable();

    while let Some(c) = chars.next() {
        match c {
            '"' if !in_quotes => {
                in_quotes = true;
            }
            '"' if in_quotes => {
                if chars.peek() == Some(&'"') {
                    // Escaped quote
                    chars.next();
                    current.push('"');
                } else {
                    in_quotes = false;
                }
            }
            ',' if !in_quotes => {
                fields.push(current.clone());
                current.clear();
            }
            _ => {
                current.push(c);
            }
        }
    }

    fields.push(current);

    Ok(fields)
}

/// Convenience functions for common export/import operations
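///
/// For example, a typical dump-and-restore flow (a sketch; `storage` stands in
/// for any cloneable `VectorStorage` backend such as `InMemoryStorage`):
///
/// ```ignore
/// // Write the "prod" namespace to pretty-printed JSON, then reload it into a
/// // fresh namespace; the ".json" extension drives format auto-detection.
/// let export_stats = utils::export_json(storage.clone(), "prod", "prod.json").await?;
/// let import_stats = utils::import_file(storage, "prod.json", "prod_restored").await?;
/// ```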
pub mod utils {
    use super::*;

    /// Export namespace to JSON file
    pub async fn export_json<S: VectorStorage>(
        storage: S,
        namespace: &str,
        path: impl AsRef<Path>,
    ) -> Result<ExportStats, DakeraError> {
        let exporter = DataExporter::new(storage);
        let config = ExportConfig {
            format: ExportFormat::Json,
            pretty_print: true,
            ..Default::default()
        };
        exporter.export_to_file(namespace, path, &config).await
    }

    /// Export namespace to JSONL file (streaming-friendly)
    pub async fn export_jsonl<S: VectorStorage>(
        storage: S,
        namespace: &str,
        path: impl AsRef<Path>,
    ) -> Result<ExportStats, DakeraError> {
        let exporter = DataExporter::new(storage);
        let config = ExportConfig {
            format: ExportFormat::JsonLines,
            ..Default::default()
        };
        exporter.export_to_file(namespace, path, &config).await
    }

    /// Export namespace to CSV file
    pub async fn export_csv<S: VectorStorage>(
        storage: S,
        namespace: &str,
        path: impl AsRef<Path>,
    ) -> Result<ExportStats, DakeraError> {
        let exporter = DataExporter::new(storage);
        let config = ExportConfig {
            format: ExportFormat::Csv,
            ..Default::default()
        };
        exporter.export_to_file(namespace, path, &config).await
    }

    /// Import from file with auto-detection
    pub async fn import_file<S: VectorStorage>(
        storage: S,
        path: impl AsRef<Path>,
        namespace: &str,
    ) -> Result<ImportStats, DakeraError> {
        let importer = DataImporter::new(storage);
        let config = ImportConfig {
            format: ImportFormat::Auto,
            namespace: namespace.to_string(),
            ..Default::default()
        };
        importer.import_from_file(path, &config).await
    }

    /// Copy vectors between namespaces
    pub async fn copy_namespace<S: VectorStorage>(
        storage: &S,
        source_namespace: &str,
        target_namespace: &str,
    ) -> Result<u64, DakeraError> {
        let source_ns = source_namespace.to_string();
        let target_ns = target_namespace.to_string();

        let vectors = storage.get_all(&source_ns).await?;
        let count = vectors.len();

        storage.ensure_namespace(&target_ns).await?;
        storage.upsert(&target_ns, vectors).await?;

        Ok(count as u64)
    }

    /// Merge multiple namespaces into one
    pub async fn merge_namespaces<S: VectorStorage>(
        storage: &S,
        source_namespaces: &[&str],
        target_namespace: &str,
    ) -> Result<u64, DakeraError> {
        let mut total_count = 0u64;
        let target_ns = target_namespace.to_string();

        storage.ensure_namespace(&target_ns).await?;

        for source in source_namespaces {
            let source_ns = source.to_string();
            let vectors = storage.get_all(&source_ns).await?;
            let count = vectors.len();
            storage.upsert(&target_ns, vectors).await?;
            total_count += count as u64;
        }

        Ok(total_count)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::InMemoryStorage;

    async fn create_test_storage() -> InMemoryStorage {
        let storage = InMemoryStorage::new();
        let ns = "test".to_string();
        storage.ensure_namespace(&ns).await.unwrap();

        let vectors = vec![
            Vector {
                id: "vec1".to_string(),
                values: vec![1.0, 2.0, 3.0],
                metadata: Some(serde_json::json!({"label": "first"})),
                ttl_seconds: None,
                expires_at: None,
            },
            Vector {
                id: "vec2".to_string(),
                values: vec![4.0, 5.0, 6.0],
                metadata: Some(serde_json::json!({"label": "second"})),
                ttl_seconds: Some(3600),
                expires_at: None,
            },
        ];

        storage.upsert(&ns, vectors).await.unwrap();
        storage
    }

    #[tokio::test]
    async fn test_export_json() {
        let storage = create_test_storage().await;
        let exporter = DataExporter::new(storage);

        let config = ExportConfig {
            format: ExportFormat::Json,
            pretty_print: false,
            ..Default::default()
        };

        let json = exporter.export_to_string("test", &config).await.unwrap();
        let records: Vec<VectorRecord> = serde_json::from_str(&json).unwrap();

        assert_eq!(records.len(), 2);
        assert!(records.iter().any(|r| r.id == "vec1"));
        assert!(records.iter().any(|r| r.id == "vec2"));
    }

    #[tokio::test]
    async fn test_export_jsonl() {
        let storage = create_test_storage().await;
        let exporter = DataExporter::new(storage);

        let config = ExportConfig {
            format: ExportFormat::JsonLines,
            ..Default::default()
        };

        let jsonl = exporter.export_to_string("test", &config).await.unwrap();
        let lines: Vec<&str> = jsonl.lines().collect();

        assert_eq!(lines.len(), 2);

        for line in lines {
            let _record: VectorRecord = serde_json::from_str(line).unwrap();
        }
    }

    #[tokio::test]
    async fn test_export_csv() {
        let storage = create_test_storage().await;
        let exporter = DataExporter::new(storage);

        let config = ExportConfig {
            format: ExportFormat::Csv,
            ..Default::default()
        };

        let csv = exporter.export_to_string("test", &config).await.unwrap();
        let lines: Vec<&str> = csv.lines().collect();

        assert_eq!(lines.len(), 3); // header + 2 records
        assert!(lines[0].contains("id,values,metadata"));
    }

    #[tokio::test]
    async fn test_export_binary() {
        let storage = create_test_storage().await;
        let exporter = DataExporter::new(storage);

        let config = ExportConfig {
            format: ExportFormat::Binary,
            ..Default::default()
        };

        let mut buffer = Vec::new();
        let stats = exporter
            .export_to_writer("test", &mut buffer, &config)
            .await
            .unwrap();

        assert_eq!(stats.vectors_exported, 2);
        assert!(buffer.starts_with(b"VPUF"));
    }

    #[tokio::test]
    async fn test_import_json() {
        let storage = InMemoryStorage::new();
        let importer = DataImporter::new(storage.clone());

        let json = r#"[
            {"id": "import1", "values": [1.0, 2.0], "metadata": {"key": "value"}},
            {"id": "import2", "values": [3.0, 4.0]}
        ]"#;

        let config = ImportConfig {
            format: ImportFormat::Json,
            namespace: "imported".to_string(),
            ..Default::default()
        };

        let stats = importer.import_from_string(json, &config).await.unwrap();
        assert_eq!(stats.vectors_imported, 2);

        let ns = "imported".to_string();
        let vectors = storage.get_all(&ns).await.unwrap();
        assert_eq!(vectors.len(), 2);
    }

    #[tokio::test]
    async fn test_import_jsonl() {
        let storage = InMemoryStorage::new();
        let importer = DataImporter::new(storage.clone());

        let jsonl = r#"{"id": "line1", "values": [1.0, 2.0]}
{"id": "line2", "values": [3.0, 4.0]}"#;

        let config = ImportConfig {
            format: ImportFormat::JsonLines,
            namespace: "jsonl_ns".to_string(),
            ..Default::default()
        };

        let stats = importer.import_from_string(jsonl, &config).await.unwrap();
        assert_eq!(stats.vectors_imported, 2);
    }

    #[tokio::test]
    async fn test_import_with_skip_invalid() {
        let storage = InMemoryStorage::new();
        let importer = DataImporter::new(storage.clone());

        let jsonl = r#"{"id": "valid", "values": [1.0, 2.0]}
this is not valid json
{"id": "also_valid", "values": [3.0, 4.0]}"#;

        let config = ImportConfig {
            format: ImportFormat::JsonLines,
            namespace: "skip_ns".to_string(),
            skip_invalid: true,
            ..Default::default()
        };

        let stats = importer.import_from_string(jsonl, &config).await.unwrap();
        assert_eq!(stats.vectors_imported, 2);
        assert_eq!(stats.vectors_skipped, 1);
    }

    #[tokio::test]
    async fn test_roundtrip_binary() {
        let storage = create_test_storage().await;
        let exporter = DataExporter::new(storage.clone());

        // Export to binary
        let config = ExportConfig {
            format: ExportFormat::Binary,
            ..Default::default()
        };

        let mut buffer = Vec::new();
        exporter
            .export_to_writer("test", &mut buffer, &config)
            .await
            .unwrap();

        // Import back
        let new_storage = InMemoryStorage::new();
        let importer = DataImporter::new(new_storage.clone());

        let import_config = ImportConfig {
            format: ImportFormat::Binary,
            namespace: "roundtrip".to_string(),
            ..Default::default()
        };

        let stats = importer
            .import_from_reader(std::io::Cursor::new(buffer), &import_config)
            .await
            .unwrap();

        assert_eq!(stats.vectors_imported, 2);

        let ns = "roundtrip".to_string();
        let vectors = new_storage.get_all(&ns).await.unwrap();
        assert_eq!(vectors.len(), 2);
    }

    #[tokio::test]
    async fn test_copy_namespace() {
        let storage = create_test_storage().await;

        let count = utils::copy_namespace(&storage, "test", "copy")
            .await
            .unwrap();
        assert_eq!(count, 2);

        let ns = "copy".to_string();
        let copied = storage.get_all(&ns).await.unwrap();
        assert_eq!(copied.len(), 2);
    }

    #[tokio::test]
    async fn test_merge_namespaces() {
        let storage = InMemoryStorage::new();

        // Create two namespaces with vectors
        let ns1 = "ns1".to_string();
        let ns2 = "ns2".to_string();
        storage.ensure_namespace(&ns1).await.unwrap();
        storage.ensure_namespace(&ns2).await.unwrap();

        storage
            .upsert(
                &ns1,
                vec![Vector {
                    id: "a".to_string(),
                    values: vec![1.0],
                    metadata: None,
                    ttl_seconds: None,
                    expires_at: None,
                }],
            )
            .await
            .unwrap();

        storage
            .upsert(
                &ns2,
                vec![Vector {
                    id: "b".to_string(),
                    values: vec![2.0],
                    metadata: None,
                    ttl_seconds: None,
                    expires_at: None,
                }],
            )
            .await
            .unwrap();

        let count = utils::merge_namespaces(&storage, &["ns1", "ns2"], "merged")
            .await
            .unwrap();
        assert_eq!(count, 2);

        let merged_ns = "merged".to_string();
        let merged = storage.get_all(&merged_ns).await.unwrap();
        assert_eq!(merged.len(), 2);
    }

    #[test]
    fn test_csv_field_parsing() {
        let fields = parse_csv_fields(r#""hello","world""#).unwrap();
        assert_eq!(fields, vec!["hello", "world"]);

        let fields = parse_csv_fields(r#""has""quote","normal""#).unwrap();
        assert_eq!(fields, vec!["has\"quote", "normal"]);

        let fields = parse_csv_fields(r#""has,comma","ok""#).unwrap();
        assert_eq!(fields, vec!["has,comma", "ok"]);
    }

    #[test]
    fn test_vector_record_conversion() {
        let vector = Vector {
            id: "test".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: Some(serde_json::json!({"key": "value"})),
            ttl_seconds: Some(3600),
            expires_at: None,
        };

        let record = VectorRecord::from(&vector);
        assert_eq!(record.id, "test");
        assert_eq!(record.values, Some(vec![1.0, 2.0, 3.0]));
        assert!(record.metadata.is_some());
        assert_eq!(record.ttl_seconds, Some(3600));

        let back: Vector = record.into();
        assert_eq!(back.id, "test");
        assert_eq!(back.values, vec![1.0, 2.0, 3.0]);
    }
}
1103}