1use std::io::{BufRead, BufReader, BufWriter, Read, Write};
10use std::path::Path;
11
12use common::{DakeraError, Vector};
13use serde::{Deserialize, Serialize};
14
15use crate::traits::VectorStorage;
16
/// Serialization formats supported when exporting vectors.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExportFormat {
    /// A single JSON array of records (the default).
    #[default]
    Json,
    /// One JSON object per line (JSONL / NDJSON).
    JsonLines,
    /// Comma-separated values with an `id,values,metadata,ttl_seconds` header.
    Csv,
    /// Length-prefixed JSON records behind a `"VPUF"` magic header.
    Binary,
}
30
/// Formats accepted when importing vectors.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ImportFormat {
    /// Detect the format from the file extension, falling back to JSON (the default).
    #[default]
    Auto,
    /// A single JSON array of records.
    Json,
    /// One JSON object per line (JSONL / NDJSON).
    JsonLines,
    /// Comma-separated values with a header row.
    Csv,
    /// The length-prefixed binary format produced by `ExportFormat::Binary`.
    Binary,
}
46
/// Options controlling an export operation.
#[derive(Debug, Clone)]
pub struct ExportConfig {
    /// Output format to write.
    pub format: ExportFormat,
    /// When `false`, vector values are stripped from each record.
    pub include_vectors: bool,
    /// When `false`, metadata is stripped from each record.
    pub include_metadata: bool,
    /// Pretty-print the output (honored by the JSON format only).
    pub pretty_print: bool,
    /// Intended write batch size. NOTE(review): currently ignored by the exporter.
    pub batch_size: usize,
    /// Intended compression toggle. NOTE(review): currently ignored by the exporter.
    pub compress: bool,
}
63
impl Default for ExportConfig {
    /// Compact JSON including vectors and metadata, 10k batch size,
    /// no compression.
    fn default() -> Self {
        Self {
            format: ExportFormat::Json,
            include_vectors: true,
            include_metadata: true,
            pretty_print: false,
            batch_size: 10000,
            compress: false,
        }
    }
}
76
/// Options controlling an import operation.
#[derive(Debug, Clone)]
pub struct ImportConfig {
    /// Input format; `Auto` detects from the file extension where possible.
    pub format: ImportFormat,
    /// Number of vectors upserted per storage call.
    pub batch_size: usize,
    /// When `true`, malformed JSONL/CSV records are counted and skipped
    /// instead of aborting the import.
    pub skip_invalid: bool,
    /// Intended overwrite toggle. NOTE(review): currently ignored by the importer.
    pub overwrite: bool,
    /// Target namespace for imported vectors.
    pub namespace: String,
}
91
impl Default for ImportConfig {
    /// Auto-detected format, 1000-vector batches, strict parsing (fail on
    /// the first invalid record), importing into the `"default"` namespace.
    fn default() -> Self {
        Self {
            format: ImportFormat::Auto,
            batch_size: 1000,
            skip_invalid: false,
            overwrite: true,
            namespace: "default".to_string(),
        }
    }
}
103
/// Summary of a completed export operation.
#[derive(Debug, Clone, Default)]
pub struct ExportStats {
    /// Number of records written.
    pub vectors_exported: u64,
    /// Total bytes written to the output.
    pub bytes_written: u64,
    /// Wall-clock duration of the export, in milliseconds.
    pub duration_ms: u64,
    /// Non-fatal issues encountered. NOTE(review): never populated by the
    /// current exporter code paths.
    pub warnings: Vec<String>,
}
116
/// Summary of a completed import operation.
#[derive(Debug, Clone, Default)]
pub struct ImportStats {
    /// Number of vectors successfully upserted.
    pub vectors_imported: u64,
    /// Number of records skipped because they were invalid (only when
    /// `ImportConfig::skip_invalid` is set).
    pub vectors_skipped: u64,
    /// Bytes consumed from the source. Only populated by file-based imports.
    pub bytes_read: u64,
    /// Wall-clock duration of the import, in milliseconds.
    pub duration_ms: u64,
    /// One warning message per skipped record.
    pub warnings: Vec<String>,
}
131
/// Serialization-friendly representation of a `Vector`, shared by all
/// import/export formats. Optional fields are omitted from JSON when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorRecord {
    /// Unique vector id.
    pub id: String,
    /// Embedding values; `None` when the export excluded vectors.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub values: Option<Vec<f32>>,
    /// Arbitrary JSON metadata attached to the vector.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
    /// Time-to-live in seconds, if set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl_seconds: Option<u64>,
}
147
impl From<&Vector> for VectorRecord {
    /// Clones a stored `Vector` into its serializable record form.
    /// `expires_at` has no record counterpart and is dropped.
    fn from(v: &Vector) -> Self {
        Self {
            id: v.id.clone(),
            values: Some(v.values.clone()),
            metadata: v.metadata.clone(),
            ttl_seconds: v.ttl_seconds,
        }
    }
}
158
impl From<VectorRecord> for Vector {
    /// Converts an imported record back into a `Vector`.
    ///
    /// Missing `values` become an empty vector, and `expires_at` is reset to
    /// `None` — presumably recomputed by storage from `ttl_seconds`; TODO confirm.
    fn from(r: VectorRecord) -> Self {
        Self {
            id: r.id,
            values: r.values.unwrap_or_default(),
            metadata: r.metadata,
            ttl_seconds: r.ttl_seconds,
            expires_at: None,
        }
    }
}
170
/// Exports vectors from a `VectorStorage` backend in any `ExportFormat`.
pub struct DataExporter<S: VectorStorage> {
    // Backend the exporter reads from.
    storage: S,
}
175
176impl<S: VectorStorage> DataExporter<S> {
177 pub fn new(storage: S) -> Self {
179 Self { storage }
180 }
181
182 pub async fn export_to_writer<W: Write>(
184 &self,
185 namespace: &str,
186 writer: W,
187 config: &ExportConfig,
188 ) -> Result<ExportStats, DakeraError> {
189 let start = std::time::Instant::now();
190 let mut stats = ExportStats::default();
191
192 let ns = namespace.to_string();
193 let vectors = self.storage.get_all(&ns).await?;
194
195 let records: Vec<VectorRecord> = vectors
196 .iter()
197 .map(|v| {
198 let mut record = VectorRecord::from(v);
199 if !config.include_vectors {
200 record.values = None;
201 }
202 if !config.include_metadata {
203 record.metadata = None;
204 }
205 record
206 })
207 .collect();
208
209 stats.vectors_exported = records.len() as u64;
210
211 let bytes_written = match config.format {
212 ExportFormat::Json => self.write_json(writer, &records, config.pretty_print)?,
213 ExportFormat::JsonLines => self.write_jsonl(writer, &records)?,
214 ExportFormat::Csv => self.write_csv(writer, &records)?,
215 ExportFormat::Binary => self.write_binary(writer, &records)?,
216 };
217
218 stats.bytes_written = bytes_written;
219 stats.duration_ms = start.elapsed().as_millis() as u64;
220
221 Ok(stats)
222 }
223
224 pub async fn export_to_file(
226 &self,
227 namespace: &str,
228 path: impl AsRef<Path>,
229 config: &ExportConfig,
230 ) -> Result<ExportStats, DakeraError> {
231 let file = std::fs::File::create(path.as_ref())
232 .map_err(|e| DakeraError::Storage(e.to_string()))?;
233 let writer = BufWriter::new(file);
234 self.export_to_writer(namespace, writer, config).await
235 }
236
237 pub async fn export_to_string(
239 &self,
240 namespace: &str,
241 config: &ExportConfig,
242 ) -> Result<String, DakeraError> {
243 let mut buffer = Vec::new();
244 self.export_to_writer(namespace, &mut buffer, config)
245 .await?;
246 String::from_utf8(buffer).map_err(|e| DakeraError::Storage(e.to_string()))
247 }
248
249 fn write_json<W: Write>(
250 &self,
251 mut writer: W,
252 records: &[VectorRecord],
253 pretty: bool,
254 ) -> Result<u64, DakeraError> {
255 let json = if pretty {
256 serde_json::to_string_pretty(records)
257 } else {
258 serde_json::to_string(records)
259 }
260 .map_err(|e| DakeraError::Storage(e.to_string()))?;
261
262 let bytes = json.as_bytes();
263 writer
264 .write_all(bytes)
265 .map_err(|e| DakeraError::Storage(e.to_string()))?;
266
267 Ok(bytes.len() as u64)
268 }
269
270 fn write_jsonl<W: Write>(
271 &self,
272 mut writer: W,
273 records: &[VectorRecord],
274 ) -> Result<u64, DakeraError> {
275 let mut total_bytes = 0u64;
276
277 for record in records {
278 let line =
279 serde_json::to_string(record).map_err(|e| DakeraError::Storage(e.to_string()))?;
280 writeln!(writer, "{}", line).map_err(|e| DakeraError::Storage(e.to_string()))?;
281 total_bytes += line.len() as u64 + 1;
282 }
283
284 Ok(total_bytes)
285 }
286
287 fn write_csv<W: Write>(
288 &self,
289 mut writer: W,
290 records: &[VectorRecord],
291 ) -> Result<u64, DakeraError> {
292 let mut total_bytes = 0u64;
293
294 let header = "id,values,metadata,ttl_seconds\n";
296 writer
297 .write_all(header.as_bytes())
298 .map_err(|e| DakeraError::Storage(e.to_string()))?;
299 total_bytes += header.len() as u64;
300
301 for record in records {
302 let values_json = record
303 .values
304 .as_ref()
305 .map(|v| serde_json::to_string(v).unwrap_or_default())
306 .unwrap_or_default();
307
308 let metadata_json = record
309 .metadata
310 .as_ref()
311 .map(|m| serde_json::to_string(m).unwrap_or_default())
312 .unwrap_or_default();
313
314 let ttl = record
315 .ttl_seconds
316 .map(|t| t.to_string())
317 .unwrap_or_default();
318
319 let line = format!(
320 "\"{}\",\"{}\",\"{}\",{}\n",
321 escape_csv(&record.id),
322 escape_csv(&values_json),
323 escape_csv(&metadata_json),
324 ttl
325 );
326
327 writer
328 .write_all(line.as_bytes())
329 .map_err(|e| DakeraError::Storage(e.to_string()))?;
330 total_bytes += line.len() as u64;
331 }
332
333 Ok(total_bytes)
334 }
335
336 fn write_binary<W: Write>(
337 &self,
338 mut writer: W,
339 records: &[VectorRecord],
340 ) -> Result<u64, DakeraError> {
341 let mut total_bytes = 0u64;
342
343 let magic = b"VPUF";
345 let version: u32 = 1;
346 writer
347 .write_all(magic)
348 .map_err(|e| DakeraError::Storage(e.to_string()))?;
349 writer
350 .write_all(&version.to_le_bytes())
351 .map_err(|e| DakeraError::Storage(e.to_string()))?;
352 total_bytes += 8;
353
354 let count = records.len() as u64;
356 writer
357 .write_all(&count.to_le_bytes())
358 .map_err(|e| DakeraError::Storage(e.to_string()))?;
359 total_bytes += 8;
360
361 for record in records {
363 let json =
364 serde_json::to_vec(record).map_err(|e| DakeraError::Storage(e.to_string()))?;
365 let len = json.len() as u32;
366 writer
367 .write_all(&len.to_le_bytes())
368 .map_err(|e| DakeraError::Storage(e.to_string()))?;
369 writer
370 .write_all(&json)
371 .map_err(|e| DakeraError::Storage(e.to_string()))?;
372 total_bytes += 4 + json.len() as u64;
373 }
374
375 Ok(total_bytes)
376 }
377}
378
/// Imports vectors into a `VectorStorage` backend from any `ImportFormat`.
pub struct DataImporter<S: VectorStorage> {
    // Backend the importer writes into.
    storage: S,
}
383
impl<S: VectorStorage> DataImporter<S> {
    /// Creates an importer that writes into `storage`.
    pub fn new(storage: S) -> Self {
        Self { storage }
    }

    /// Imports records from `reader` into `config.namespace`, upserting in
    /// chunks of `config.batch_size`.
    ///
    /// `ImportFormat::Auto` cannot be detected from a raw reader and falls
    /// back to JSON. `stats.bytes_read` is not populated here; only
    /// `import_from_file` fills it in (from file metadata).
    ///
    /// # Errors
    /// Returns `DakeraError::Storage` on read/parse failures (unless
    /// `skip_invalid` applies for JSONL/CSV) and propagates storage errors.
    pub async fn import_from_reader<R: Read>(
        &self,
        reader: R,
        config: &ImportConfig,
    ) -> Result<ImportStats, DakeraError> {
        let start = std::time::Instant::now();
        let mut stats = ImportStats::default();

        // Auto cannot inspect the stream, so normalize it to JSON.
        let format = if config.format == ImportFormat::Auto {
            ImportFormat::Json
        } else {
            config.format
        };

        let records = match format {
            // Auto is unreachable here (normalized above); the arm keeps the
            // match exhaustive.
            ImportFormat::Json | ImportFormat::Auto => self.read_json(reader)?,
            ImportFormat::JsonLines => self.read_jsonl(reader, config.skip_invalid, &mut stats)?,
            ImportFormat::Csv => self.read_csv(reader, config.skip_invalid, &mut stats)?,
            ImportFormat::Binary => self.read_binary(reader)?,
        };

        self.storage.ensure_namespace(&config.namespace).await?;

        let vectors: Vec<Vector> = records.into_iter().map(Vector::from).collect();

        // Upsert in batches to bound the size of each storage call.
        for chunk in vectors.chunks(config.batch_size) {
            let batch: Vec<Vector> = chunk.to_vec();
            let count = self.storage.upsert(&config.namespace, batch).await?;
            stats.vectors_imported += count as u64;
        }

        stats.duration_ms = start.elapsed().as_millis() as u64;

        Ok(stats)
    }

    /// Imports from the file at `path`, resolving `ImportFormat::Auto` from
    /// the file extension (unknown extensions default to JSON) and recording
    /// the file size in `stats.bytes_read`.
    ///
    /// # Errors
    /// Returns `DakeraError::Storage` if the file cannot be opened or the
    /// import itself fails.
    pub async fn import_from_file(
        &self,
        path: impl AsRef<Path>,
        config: &ImportConfig,
    ) -> Result<ImportStats, DakeraError> {
        let path = path.as_ref();

        let mut config = config.clone();
        if config.format == ImportFormat::Auto {
            config.format = match path.extension().and_then(|e| e.to_str()) {
                Some("json") => ImportFormat::Json,
                Some("jsonl") | Some("ndjson") => ImportFormat::JsonLines,
                Some("csv") => ImportFormat::Csv,
                Some("bin") | Some("vpuf") => ImportFormat::Binary,
                _ => ImportFormat::Json,
            };
        }

        let file = std::fs::File::open(path).map_err(|e| DakeraError::Storage(e.to_string()))?;
        let reader = BufReader::new(file);

        let mut stats = self.import_from_reader(reader, &config).await?;
        // Best-effort: a metadata failure leaves bytes_read at 0 rather than
        // failing an otherwise-successful import.
        stats.bytes_read = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);

        Ok(stats)
    }

    /// Imports from an in-memory string. `bytes_read` is left at 0.
    pub async fn import_from_string(
        &self,
        data: &str,
        config: &ImportConfig,
    ) -> Result<ImportStats, DakeraError> {
        let reader = std::io::Cursor::new(data.as_bytes());
        self.import_from_reader(reader, config).await
    }

    /// Deserializes a whole JSON array of records from `reader`.
    fn read_json<R: Read>(&self, reader: R) -> Result<Vec<VectorRecord>, DakeraError> {
        serde_json::from_reader(reader).map_err(|e| DakeraError::Storage(e.to_string()))
    }

    /// Reads one JSON record per line, skipping blank lines. Invalid lines
    /// either abort the import or, when `skip_invalid` is set, increment
    /// `stats.vectors_skipped` and append a warning.
    fn read_jsonl<R: Read>(
        &self,
        reader: R,
        skip_invalid: bool,
        stats: &mut ImportStats,
    ) -> Result<Vec<VectorRecord>, DakeraError> {
        let buf_reader = BufReader::new(reader);
        let mut records = Vec::new();

        for line in buf_reader.lines() {
            let line = line.map_err(|e| DakeraError::Storage(e.to_string()))?;
            if line.trim().is_empty() {
                continue;
            }

            match serde_json::from_str::<VectorRecord>(&line) {
                Ok(record) => records.push(record),
                Err(e) => {
                    if skip_invalid {
                        stats.vectors_skipped += 1;
                        stats
                            .warnings
                            .push(format!("Skipped invalid record: {}", e));
                    } else {
                        return Err(DakeraError::Storage(format!("Invalid JSON record: {}", e)));
                    }
                }
            }
        }

        Ok(records)
    }

    /// Reads CSV rows (header skipped) via `parse_csv_line`, with the same
    /// `skip_invalid` handling as `read_jsonl`.
    fn read_csv<R: Read>(
        &self,
        reader: R,
        skip_invalid: bool,
        stats: &mut ImportStats,
    ) -> Result<Vec<VectorRecord>, DakeraError> {
        let buf_reader = BufReader::new(reader);
        let mut records = Vec::new();
        let mut lines = buf_reader.lines();

        // Skip the header row written by the exporter.
        let _ = lines.next();

        for line in lines {
            let line = line.map_err(|e| DakeraError::Storage(e.to_string()))?;
            if line.trim().is_empty() {
                continue;
            }

            match parse_csv_line(&line) {
                Ok(record) => records.push(record),
                Err(e) => {
                    if skip_invalid {
                        stats.vectors_skipped += 1;
                        stats
                            .warnings
                            .push(format!("Skipped invalid CSV row: {}", e));
                    } else {
                        return Err(DakeraError::Storage(format!("Invalid CSV row: {}", e)));
                    }
                }
            }
        }

        Ok(records)
    }

    /// Parses the length-prefixed binary format written by `write_binary`:
    /// `"VPUF"` magic, little-endian u32 version (must be 1), u64 record
    /// count, then `u32 length + JSON bytes` per record. Declared counts and
    /// per-record sizes are capped so a corrupt header cannot trigger huge
    /// allocations.
    fn read_binary<R: Read>(&self, mut reader: R) -> Result<Vec<VectorRecord>, DakeraError> {
        let mut magic = [0u8; 4];
        reader
            .read_exact(&mut magic)
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        if &magic != b"VPUF" {
            return Err(DakeraError::Storage(
                "Invalid binary format: bad magic number".to_string(),
            ));
        }

        let mut version_bytes = [0u8; 4];
        reader
            .read_exact(&mut version_bytes)
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        let version = u32::from_le_bytes(version_bytes);

        if version != 1 {
            return Err(DakeraError::Storage(format!(
                "Unsupported binary format version: {}",
                version
            )));
        }

        let mut count_bytes = [0u8; 8];
        reader
            .read_exact(&mut count_bytes)
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        let count = u64::from_le_bytes(count_bytes);

        // Reject absurd record counts before pre-allocating.
        const MAX_IMPORT_RECORDS: u64 = 10_000_000;
        if count > MAX_IMPORT_RECORDS {
            return Err(DakeraError::InvalidRequest(format!(
                "Binary file declares {} records which exceeds import limit of {}",
                count, MAX_IMPORT_RECORDS
            )));
        }
        let mut records = Vec::with_capacity(count as usize);

        for _ in 0..count {
            let mut len_bytes = [0u8; 4];
            reader
                .read_exact(&mut len_bytes)
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            let len = u32::from_le_bytes(len_bytes);

            // Cap the per-record buffer to guard against corrupt lengths.
            const MAX_RECORD_BYTES: u32 = 10 * 1024 * 1024;
            if len > MAX_RECORD_BYTES {
                return Err(DakeraError::InvalidRequest(format!(
                    "Record size {} bytes exceeds maximum of {} bytes",
                    len, MAX_RECORD_BYTES
                )));
            }
            let len = len as usize;

            let mut json_bytes = vec![0u8; len];
            reader
                .read_exact(&mut json_bytes)
                .map_err(|e| DakeraError::Storage(e.to_string()))?;

            let record: VectorRecord = serde_json::from_slice(&json_bytes)
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            records.push(record);
        }

        Ok(records)
    }
}
617
/// Escapes a value for embedding inside a double-quoted CSV field by
/// doubling every `"` character. Callers are expected to wrap the result in
/// quotes themselves. NOTE(review): embedded newlines are not handled and
/// would break the line-based CSV importer — confirm inputs are single-line.
fn escape_csv(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        escaped.push(ch);
        if ch == '"' {
            // CSV escapes a literal quote as two quotes.
            escaped.push('"');
        }
    }
    escaped
}
622
623fn parse_csv_line(line: &str) -> Result<VectorRecord, String> {
625 let fields = parse_csv_fields(line)?;
627
628 if fields.len() < 2 {
629 return Err("CSV line must have at least id and values".to_string());
630 }
631
632 let id = fields[0].clone();
633
634 let values: Option<Vec<f32>> = if fields.len() > 1 && !fields[1].is_empty() {
635 let values_str = &fields[1];
636 serde_json::from_str(values_str).map_err(|e| format!("Invalid values JSON: {}", e))?
637 } else {
638 None
639 };
640
641 let metadata: Option<serde_json::Value> = if fields.len() > 2 && !fields[2].is_empty() {
642 serde_json::from_str(&fields[2]).ok()
643 } else {
644 None
645 };
646
647 let ttl_seconds: Option<u64> = if fields.len() > 3 && !fields[3].is_empty() {
648 fields[3].parse().ok()
649 } else {
650 None
651 };
652
653 Ok(VectorRecord {
654 id,
655 values,
656 metadata,
657 ttl_seconds,
658 })
659}
660
/// Splits one CSV line into its fields, honoring double-quoted fields and
/// doubled-quote (`""`) escapes. Commas inside quotes do not split; the
/// surrounding quotes themselves are stripped. Always succeeds (an unclosed
/// quote simply consumes the rest of the line).
fn parse_csv_fields(line: &str) -> Result<Vec<String>, String> {
    let mut fields = Vec::new();
    let mut field = String::new();
    let mut quoted = false;
    let mut chars = line.chars().peekable();

    while let Some(ch) = chars.next() {
        if quoted {
            if ch == '"' {
                // A doubled quote is a literal quote; otherwise the quoted
                // section ends here.
                if chars.next_if_eq(&'"').is_some() {
                    field.push('"');
                } else {
                    quoted = false;
                }
            } else {
                field.push(ch);
            }
        } else if ch == '"' {
            quoted = true;
        } else if ch == ',' {
            // Field boundary: emit the accumulated text and start fresh.
            fields.push(std::mem::take(&mut field));
        } else {
            field.push(ch);
        }
    }

    // The final field has no trailing comma.
    fields.push(field);

    Ok(fields)
}
696
/// Convenience one-shot wrappers around `DataExporter` / `DataImporter` and
/// namespace-level bulk operations.
pub mod utils {
    use super::*;

    /// Exports `namespace` to `path` as pretty-printed JSON.
    pub async fn export_json<S: VectorStorage>(
        storage: S,
        namespace: &str,
        path: impl AsRef<Path>,
    ) -> Result<ExportStats, DakeraError> {
        let exporter = DataExporter::new(storage);
        let config = ExportConfig {
            format: ExportFormat::Json,
            pretty_print: true,
            ..Default::default()
        };
        exporter.export_to_file(namespace, path, &config).await
    }

    /// Exports `namespace` to `path` as JSON Lines.
    pub async fn export_jsonl<S: VectorStorage>(
        storage: S,
        namespace: &str,
        path: impl AsRef<Path>,
    ) -> Result<ExportStats, DakeraError> {
        let exporter = DataExporter::new(storage);
        let config = ExportConfig {
            format: ExportFormat::JsonLines,
            ..Default::default()
        };
        exporter.export_to_file(namespace, path, &config).await
    }

    /// Exports `namespace` to `path` as CSV.
    pub async fn export_csv<S: VectorStorage>(
        storage: S,
        namespace: &str,
        path: impl AsRef<Path>,
    ) -> Result<ExportStats, DakeraError> {
        let exporter = DataExporter::new(storage);
        let config = ExportConfig {
            format: ExportFormat::Csv,
            ..Default::default()
        };
        exporter.export_to_file(namespace, path, &config).await
    }

    /// Imports `path` into `namespace`, auto-detecting the format from the
    /// file extension.
    pub async fn import_file<S: VectorStorage>(
        storage: S,
        path: impl AsRef<Path>,
        namespace: &str,
    ) -> Result<ImportStats, DakeraError> {
        let importer = DataImporter::new(storage);
        let config = ImportConfig {
            format: ImportFormat::Auto,
            namespace: namespace.to_string(),
            ..Default::default()
        };
        importer.import_from_file(path, &config).await
    }

    /// Copies every vector from `source_namespace` into `target_namespace`
    /// (creating it if needed) and returns the number of vectors copied.
    pub async fn copy_namespace<S: VectorStorage>(
        storage: &S,
        source_namespace: &str,
        target_namespace: &str,
    ) -> Result<u64, DakeraError> {
        let source_ns = source_namespace.to_string();
        let target_ns = target_namespace.to_string();

        let vectors = storage.get_all(&source_ns).await?;
        let count = vectors.len();

        storage.ensure_namespace(&target_ns).await?;
        storage.upsert(&target_ns, vectors).await?;

        Ok(count as u64)
    }

    /// Upserts the contents of every source namespace into
    /// `target_namespace` and returns the total vectors written. The count
    /// includes duplicates if ids overlap across sources.
    pub async fn merge_namespaces<S: VectorStorage>(
        storage: &S,
        source_namespaces: &[&str],
        target_namespace: &str,
    ) -> Result<u64, DakeraError> {
        let mut total_count = 0u64;
        let target_ns = target_namespace.to_string();

        storage.ensure_namespace(&target_ns).await?;

        for source in source_namespaces {
            let source_ns = source.to_string();
            let vectors = storage.get_all(&source_ns).await?;
            let count = vectors.len();
            storage.upsert(&target_ns, vectors).await?;
            total_count += count as u64;
        }

        Ok(total_count)
    }
}
799
#[cfg(test)]
mod tests {
    use super::*;
    use crate::InMemoryStorage;

    /// Builds an in-memory store with two vectors in namespace "test":
    /// "vec1" (metadata only) and "vec2" (metadata plus a 3600s TTL).
    async fn create_test_storage() -> InMemoryStorage {
        let storage = InMemoryStorage::new();
        let ns = "test".to_string();
        storage.ensure_namespace(&ns).await.unwrap();

        let vectors = vec![
            Vector {
                id: "vec1".to_string(),
                values: vec![1.0, 2.0, 3.0],
                metadata: Some(serde_json::json!({"label": "first"})),
                ttl_seconds: None,
                expires_at: None,
            },
            Vector {
                id: "vec2".to_string(),
                values: vec![4.0, 5.0, 6.0],
                metadata: Some(serde_json::json!({"label": "second"})),
                ttl_seconds: Some(3600),
                expires_at: None,
            },
        ];

        storage.upsert(&ns, vectors).await.unwrap();
        storage
    }

    // JSON export must produce a parseable array containing both records.
    #[tokio::test]
    async fn test_export_json() {
        let storage = create_test_storage().await;
        let exporter = DataExporter::new(storage);

        let config = ExportConfig {
            format: ExportFormat::Json,
            pretty_print: false,
            ..Default::default()
        };

        let json = exporter.export_to_string("test", &config).await.unwrap();
        let records: Vec<VectorRecord> = serde_json::from_str(&json).unwrap();

        assert_eq!(records.len(), 2);
        assert!(records.iter().any(|r| r.id == "vec1"));
        assert!(records.iter().any(|r| r.id == "vec2"));
    }

    // JSONL export: one parseable record per line.
    #[tokio::test]
    async fn test_export_jsonl() {
        let storage = create_test_storage().await;
        let exporter = DataExporter::new(storage);

        let config = ExportConfig {
            format: ExportFormat::JsonLines,
            ..Default::default()
        };

        let jsonl = exporter.export_to_string("test", &config).await.unwrap();
        let lines: Vec<&str> = jsonl.lines().collect();

        assert_eq!(lines.len(), 2);

        for line in lines {
            let _record: VectorRecord = serde_json::from_str(line).unwrap();
        }
    }

    // CSV export: header row plus one row per vector.
    #[tokio::test]
    async fn test_export_csv() {
        let storage = create_test_storage().await;
        let exporter = DataExporter::new(storage);

        let config = ExportConfig {
            format: ExportFormat::Csv,
            ..Default::default()
        };

        let csv = exporter.export_to_string("test", &config).await.unwrap();
        let lines: Vec<&str> = csv.lines().collect();

        // 1 header + 2 data rows.
        assert_eq!(lines.len(), 3);
        assert!(lines[0].contains("id,values,metadata"));
    }

    // Binary export starts with the "VPUF" magic header.
    #[tokio::test]
    async fn test_export_binary() {
        let storage = create_test_storage().await;
        let exporter = DataExporter::new(storage);

        let config = ExportConfig {
            format: ExportFormat::Binary,
            ..Default::default()
        };

        let mut buffer = Vec::new();
        let stats = exporter
            .export_to_writer("test", &mut buffer, &config)
            .await
            .unwrap();

        assert_eq!(stats.vectors_exported, 2);
        assert!(buffer.starts_with(b"VPUF"));
    }

    // JSON import lands both records in the target namespace.
    #[tokio::test]
    async fn test_import_json() {
        let storage = InMemoryStorage::new();
        let importer = DataImporter::new(storage.clone());

        let json = r#"[
        {"id": "import1", "values": [1.0, 2.0], "metadata": {"key": "value"}},
        {"id": "import2", "values": [3.0, 4.0]}
    ]"#;

        let config = ImportConfig {
            format: ImportFormat::Json,
            namespace: "imported".to_string(),
            ..Default::default()
        };

        let stats = importer.import_from_string(json, &config).await.unwrap();
        assert_eq!(stats.vectors_imported, 2);

        let ns = "imported".to_string();
        let vectors = storage.get_all(&ns).await.unwrap();
        assert_eq!(vectors.len(), 2);
    }

    // JSONL import parses each line as a record.
    #[tokio::test]
    async fn test_import_jsonl() {
        let storage = InMemoryStorage::new();
        let importer = DataImporter::new(storage.clone());

        let jsonl = r#"{"id": "line1", "values": [1.0, 2.0]}
{"id": "line2", "values": [3.0, 4.0]}"#;

        let config = ImportConfig {
            format: ImportFormat::JsonLines,
            namespace: "jsonl_ns".to_string(),
            ..Default::default()
        };

        let stats = importer.import_from_string(jsonl, &config).await.unwrap();
        assert_eq!(stats.vectors_imported, 2);
    }

    // With skip_invalid set, a bad line is counted as skipped and the
    // surrounding valid lines still import.
    #[tokio::test]
    async fn test_import_with_skip_invalid() {
        let storage = InMemoryStorage::new();
        let importer = DataImporter::new(storage.clone());

        let jsonl = r#"{"id": "valid", "values": [1.0, 2.0]}
this is not valid json
{"id": "also_valid", "values": [3.0, 4.0]}"#;

        let config = ImportConfig {
            format: ImportFormat::JsonLines,
            namespace: "skip_ns".to_string(),
            skip_invalid: true,
            ..Default::default()
        };

        let stats = importer.import_from_string(jsonl, &config).await.unwrap();
        assert_eq!(stats.vectors_imported, 2);
        assert_eq!(stats.vectors_skipped, 1);
    }

    // Binary export into a fresh store must round-trip both vectors.
    #[tokio::test]
    async fn test_roundtrip_binary() {
        let storage = create_test_storage().await;
        let exporter = DataExporter::new(storage.clone());

        let config = ExportConfig {
            format: ExportFormat::Binary,
            ..Default::default()
        };

        let mut buffer = Vec::new();
        exporter
            .export_to_writer("test", &mut buffer, &config)
            .await
            .unwrap();

        let new_storage = InMemoryStorage::new();
        let importer = DataImporter::new(new_storage.clone());

        let import_config = ImportConfig {
            format: ImportFormat::Binary,
            namespace: "roundtrip".to_string(),
            ..Default::default()
        };

        let stats = importer
            .import_from_reader(std::io::Cursor::new(buffer), &import_config)
            .await
            .unwrap();

        assert_eq!(stats.vectors_imported, 2);

        let ns = "roundtrip".to_string();
        let vectors = new_storage.get_all(&ns).await.unwrap();
        assert_eq!(vectors.len(), 2);
    }

    #[tokio::test]
    async fn test_copy_namespace() {
        let storage = create_test_storage().await;

        let count = utils::copy_namespace(&storage, "test", "copy")
            .await
            .unwrap();
        assert_eq!(count, 2);

        let ns = "copy".to_string();
        let copied = storage.get_all(&ns).await.unwrap();
        assert_eq!(copied.len(), 2);
    }

    // Merging two single-vector namespaces yields two vectors in the target.
    #[tokio::test]
    async fn test_merge_namespaces() {
        let storage = InMemoryStorage::new();

        let ns1 = "ns1".to_string();
        let ns2 = "ns2".to_string();
        storage.ensure_namespace(&ns1).await.unwrap();
        storage.ensure_namespace(&ns2).await.unwrap();

        storage
            .upsert(
                &ns1,
                vec![Vector {
                    id: "a".to_string(),
                    values: vec![1.0],
                    metadata: None,
                    ttl_seconds: None,
                    expires_at: None,
                }],
            )
            .await
            .unwrap();

        storage
            .upsert(
                &ns2,
                vec![Vector {
                    id: "b".to_string(),
                    values: vec![2.0],
                    metadata: None,
                    ttl_seconds: None,
                    expires_at: None,
                }],
            )
            .await
            .unwrap();

        let count = utils::merge_namespaces(&storage, &["ns1", "ns2"], "merged")
            .await
            .unwrap();
        assert_eq!(count, 2);

        let merged_ns = "merged".to_string();
        let merged = storage.get_all(&merged_ns).await.unwrap();
        assert_eq!(merged.len(), 2);
    }

    // Quoted fields, escaped quotes, and embedded commas all parse.
    #[test]
    fn test_csv_field_parsing() {
        let fields = parse_csv_fields(r#""hello","world""#).unwrap();
        assert_eq!(fields, vec!["hello", "world"]);

        let fields = parse_csv_fields(r#""has""quote","normal""#).unwrap();
        assert_eq!(fields, vec!["has\"quote", "normal"]);

        let fields = parse_csv_fields(r#""has,comma","ok""#).unwrap();
        assert_eq!(fields, vec!["has,comma", "ok"]);
    }

    // Vector -> VectorRecord -> Vector preserves id, values, metadata, ttl.
    #[test]
    fn test_vector_record_conversion() {
        let vector = Vector {
            id: "test".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: Some(serde_json::json!({"key": "value"})),
            ttl_seconds: Some(3600),
            expires_at: None,
        };

        let record = VectorRecord::from(&vector);
        assert_eq!(record.id, "test");
        assert_eq!(record.values, Some(vec![1.0, 2.0, 3.0]));
        assert!(record.metadata.is_some());
        assert_eq!(record.ttl_seconds, Some(3600));

        let back: Vector = record.into();
        assert_eq!(back.id, "test");
        assert_eq!(back.values, vec![1.0, 2.0, 3.0]);
    }
}