use std::io::{BufRead, BufReader, BufWriter, Read, Write};
use std::path::Path;
use common::{DakeraError, Vector};
use serde::{Deserialize, Serialize};
use crate::traits::VectorStorage;
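/// Serialization formats supported when exporting vectors.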
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExportFormat {
#[default]
Json,
JsonLines,
Csv,
Binary,
}
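/// Formats accepted when importing vectors. `Auto` infers the format from
/// the file extension (see [`DataImporter::import_from_file`]), falling
/// back to JSON when no extension matches.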
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ImportFormat {
#[default]
Auto,
Json,
JsonLines,
Csv,
Binary,
}
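/// Options controlling an export run.
///
/// Note: `batch_size` and `compress` are not currently consulted by
/// [`DataExporter`]; all vectors are serialized in a single uncompressed
/// pass. `pretty_print` only affects [`ExportFormat::Json`].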
#[derive(Debug, Clone)]
pub struct ExportConfig {
pub format: ExportFormat,
pub include_vectors: bool,
pub include_metadata: bool,
pub pretty_print: bool,
pub batch_size: usize,
pub compress: bool,
}
impl Default for ExportConfig {
fn default() -> Self {
Self {
format: ExportFormat::Json,
include_vectors: true,
include_metadata: true,
pretty_print: false,
batch_size: 10000,
compress: false,
}
}
}
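/// Options controlling an import run.
///
/// Note: `overwrite` is not currently consulted by [`DataImporter`];
/// records are applied with upsert semantics regardless of its value.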
#[derive(Debug, Clone)]
pub struct ImportConfig {
pub format: ImportFormat,
pub batch_size: usize,
pub skip_invalid: bool,
pub overwrite: bool,
pub namespace: String,
}
impl Default for ImportConfig {
fn default() -> Self {
Self {
format: ImportFormat::Auto,
batch_size: 1000,
skip_invalid: false,
overwrite: true,
namespace: "default".to_string(),
}
}
}
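/// Summary statistics for a completed export.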
#[derive(Debug, Clone, Default)]
pub struct ExportStats {
pub vectors_exported: u64,
pub bytes_written: u64,
pub duration_ms: u64,
pub warnings: Vec<String>,
}
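/// Summary statistics for a completed import. `bytes_read` is populated
/// only by [`DataImporter::import_from_file`], where the file size is known.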
#[derive(Debug, Clone, Default)]
pub struct ImportStats {
pub vectors_imported: u64,
pub vectors_skipped: u64,
pub bytes_read: u64,
pub duration_ms: u64,
pub warnings: Vec<String>,
}
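/// On-disk representation of a [`Vector`]. Optional fields are omitted
/// from serialized output when absent.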
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorRecord {
pub id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub values: Option<Vec<f32>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ttl_seconds: Option<u64>,
}
impl From<&Vector> for VectorRecord {
fn from(v: &Vector) -> Self {
Self {
id: v.id.clone(),
values: Some(v.values.clone()),
metadata: v.metadata.clone(),
ttl_seconds: v.ttl_seconds,
}
}
}
impl From<VectorRecord> for Vector {
fn from(r: VectorRecord) -> Self {
Self {
id: r.id,
values: r.values.unwrap_or_default(),
metadata: r.metadata,
ttl_seconds: r.ttl_seconds,
expires_at: None,
}
}
}
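/// Exports vectors from a [`VectorStorage`] backend to JSON, JSON Lines,
/// CSV, or a length-prefixed binary format.
///
/// A minimal usage sketch (assuming a storage backend is at hand):
///
/// ```ignore
/// let exporter = DataExporter::new(storage);
/// let config = ExportConfig {
///     format: ExportFormat::JsonLines,
///     ..Default::default()
/// };
/// let stats = exporter.export_to_file("default", "vectors.jsonl", &config).await?;
/// println!("exported {} vectors", stats.vectors_exported);
/// ```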
pub struct DataExporter<S: VectorStorage> {
storage: S,
}
impl<S: VectorStorage> DataExporter<S> {
pub fn new(storage: S) -> Self {
Self { storage }
}
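/// Exports every vector in `namespace` to `writer`, honoring the format
/// and field-inclusion options in `config`.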
pub async fn export_to_writer<W: Write>(
&self,
namespace: &str,
writer: W,
config: &ExportConfig,
) -> Result<ExportStats, DakeraError> {
let start = std::time::Instant::now();
let mut stats = ExportStats::default();
let ns = namespace.to_string();
let vectors = self.storage.get_all(&ns).await?;
let records: Vec<VectorRecord> = vectors
.iter()
.map(|v| {
let mut record = VectorRecord::from(v);
if !config.include_vectors {
record.values = None;
}
if !config.include_metadata {
record.metadata = None;
}
record
})
.collect();
stats.vectors_exported = records.len() as u64;
let bytes_written = match config.format {
ExportFormat::Json => self.write_json(writer, &records, config.pretty_print)?,
ExportFormat::JsonLines => self.write_jsonl(writer, &records)?,
ExportFormat::Csv => self.write_csv(writer, &records)?,
ExportFormat::Binary => self.write_binary(writer, &records)?,
};
stats.bytes_written = bytes_written;
stats.duration_ms = start.elapsed().as_millis() as u64;
Ok(stats)
}
pub async fn export_to_file(
&self,
namespace: &str,
path: impl AsRef<Path>,
config: &ExportConfig,
) -> Result<ExportStats, DakeraError> {
let file = std::fs::File::create(path.as_ref())
.map_err(|e| DakeraError::Storage(e.to_string()))?;
let writer = BufWriter::new(file);
self.export_to_writer(namespace, writer, config).await
}
pub async fn export_to_string(
&self,
namespace: &str,
config: &ExportConfig,
) -> Result<String, DakeraError> {
let mut buffer = Vec::new();
self.export_to_writer(namespace, &mut buffer, config)
.await?;
String::from_utf8(buffer).map_err(|e| DakeraError::Storage(e.to_string()))
}
fn write_json<W: Write>(
&self,
mut writer: W,
records: &[VectorRecord],
pretty: bool,
) -> Result<u64, DakeraError> {
let json = if pretty {
serde_json::to_string_pretty(records)
} else {
serde_json::to_string(records)
}
.map_err(|e| DakeraError::Storage(e.to_string()))?;
let bytes = json.as_bytes();
writer
.write_all(bytes)
.map_err(|e| DakeraError::Storage(e.to_string()))?;
Ok(bytes.len() as u64)
}
fn write_jsonl<W: Write>(
&self,
mut writer: W,
records: &[VectorRecord],
) -> Result<u64, DakeraError> {
let mut total_bytes = 0u64;
for record in records {
let line =
serde_json::to_string(record).map_err(|e| DakeraError::Storage(e.to_string()))?;
writeln!(writer, "{}", line).map_err(|e| DakeraError::Storage(e.to_string()))?;
total_bytes += line.len() as u64 + 1;
}
Ok(total_bytes)
}
fn write_csv<W: Write>(
&self,
mut writer: W,
records: &[VectorRecord],
) -> Result<u64, DakeraError> {
let mut total_bytes = 0u64;
let header = "id,values,metadata,ttl_seconds\n";
writer
.write_all(header.as_bytes())
.map_err(|e| DakeraError::Storage(e.to_string()))?;
total_bytes += header.len() as u64;
for record in records {
let values_json = record
.values
.as_ref()
.map(|v| serde_json::to_string(v).unwrap_or_default())
.unwrap_or_default();
let metadata_json = record
.metadata
.as_ref()
.map(|m| serde_json::to_string(m).unwrap_or_default())
.unwrap_or_default();
let ttl = record
.ttl_seconds
.map(|t| t.to_string())
.unwrap_or_default();
let line = format!(
"\"{}\",\"{}\",\"{}\",{}\n",
escape_csv(&record.id),
escape_csv(&values_json),
escape_csv(&metadata_json),
ttl
);
writer
.write_all(line.as_bytes())
.map_err(|e| DakeraError::Storage(e.to_string()))?;
total_bytes += line.len() as u64;
}
Ok(total_bytes)
}
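/// Binary layout: 4-byte magic `VPUF`, little-endian `u32` version,
/// little-endian `u64` record count, then for each record a little-endian
/// `u32` length followed by that many bytes of JSON.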
fn write_binary<W: Write>(
&self,
mut writer: W,
records: &[VectorRecord],
) -> Result<u64, DakeraError> {
let mut total_bytes = 0u64;
let magic = b"VPUF";
let version: u32 = 1;
writer
.write_all(magic)
.map_err(|e| DakeraError::Storage(e.to_string()))?;
writer
.write_all(&version.to_le_bytes())
.map_err(|e| DakeraError::Storage(e.to_string()))?;
total_bytes += 8;
let count = records.len() as u64;
writer
.write_all(&count.to_le_bytes())
.map_err(|e| DakeraError::Storage(e.to_string()))?;
total_bytes += 8;
for record in records {
let json =
serde_json::to_vec(record).map_err(|e| DakeraError::Storage(e.to_string()))?;
let len = json.len() as u32;
writer
.write_all(&len.to_le_bytes())
.map_err(|e| DakeraError::Storage(e.to_string()))?;
writer
.write_all(&json)
.map_err(|e| DakeraError::Storage(e.to_string()))?;
total_bytes += 4 + json.len() as u64;
}
Ok(total_bytes)
}
}
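/// Imports vectors into a [`VectorStorage`] backend from any format
/// written by [`DataExporter`].
///
/// A minimal usage sketch (assuming a storage backend is at hand):
///
/// ```ignore
/// let importer = DataImporter::new(storage);
/// let config = ImportConfig {
///     namespace: "default".to_string(),
///     ..Default::default()
/// };
/// let stats = importer.import_from_file("vectors.jsonl", &config).await?;
/// println!("imported {} vectors", stats.vectors_imported);
/// ```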
pub struct DataImporter<S: VectorStorage> {
storage: S,
}
impl<S: VectorStorage> DataImporter<S> {
pub fn new(storage: S) -> Self {
Self { storage }
}
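/// Reads records from `reader` in the configured format and upserts them
/// into `config.namespace` in batches of `config.batch_size`.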
pub async fn import_from_reader<R: Read>(
&self,
reader: R,
config: &ImportConfig,
) -> Result<ImportStats, DakeraError> {
let start = std::time::Instant::now();
let mut stats = ImportStats::default();
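// A raw reader carries no file extension to inspect, so `Auto` falls
// back to JSON here; `import_from_file` resolves `Auto` from the path
// before delegating to this method.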
let format = if config.format == ImportFormat::Auto {
ImportFormat::Json
} else {
config.format
};
let records = match format {
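// `Auto` was already resolved above; it appears in this arm only to
// keep the match exhaustive.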
ImportFormat::Json | ImportFormat::Auto => self.read_json(reader)?,
ImportFormat::JsonLines => self.read_jsonl(reader, config.skip_invalid, &mut stats)?,
ImportFormat::Csv => self.read_csv(reader, config.skip_invalid, &mut stats)?,
ImportFormat::Binary => self.read_binary(reader)?,
};
self.storage.ensure_namespace(&config.namespace).await?;
let vectors: Vec<Vector> = records.into_iter().map(Vector::from).collect();
for chunk in vectors.chunks(config.batch_size) {
let batch: Vec<Vector> = chunk.to_vec();
let count = self.storage.upsert(&config.namespace, batch).await?;
stats.vectors_imported += count as u64;
}
stats.duration_ms = start.elapsed().as_millis() as u64;
Ok(stats)
}
pub async fn import_from_file(
&self,
path: impl AsRef<Path>,
config: &ImportConfig,
) -> Result<ImportStats, DakeraError> {
let path = path.as_ref();
let mut config = config.clone();
if config.format == ImportFormat::Auto {
config.format = match path.extension().and_then(|e| e.to_str()) {
Some("json") => ImportFormat::Json,
Some("jsonl") | Some("ndjson") => ImportFormat::JsonLines,
Some("csv") => ImportFormat::Csv,
Some("bin") | Some("vpuf") => ImportFormat::Binary,
_ => ImportFormat::Json,
};
}
let file = std::fs::File::open(path).map_err(|e| DakeraError::Storage(e.to_string()))?;
let reader = BufReader::new(file);
let mut stats = self.import_from_reader(reader, &config).await?;
stats.bytes_read = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
Ok(stats)
}
pub async fn import_from_string(
&self,
data: &str,
config: &ImportConfig,
) -> Result<ImportStats, DakeraError> {
let reader = std::io::Cursor::new(data.as_bytes());
self.import_from_reader(reader, config).await
}
fn read_json<R: Read>(&self, reader: R) -> Result<Vec<VectorRecord>, DakeraError> {
serde_json::from_reader(reader).map_err(|e| DakeraError::Storage(e.to_string()))
}
fn read_jsonl<R: Read>(
&self,
reader: R,
skip_invalid: bool,
stats: &mut ImportStats,
) -> Result<Vec<VectorRecord>, DakeraError> {
let buf_reader = BufReader::new(reader);
let mut records = Vec::new();
for line in buf_reader.lines() {
let line = line.map_err(|e| DakeraError::Storage(e.to_string()))?;
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<VectorRecord>(&line) {
Ok(record) => records.push(record),
Err(e) => {
if skip_invalid {
stats.vectors_skipped += 1;
stats
.warnings
.push(format!("Skipped invalid record: {}", e));
} else {
return Err(DakeraError::Storage(format!("Invalid JSON record: {}", e)));
}
}
}
}
Ok(records)
}
fn read_csv<R: Read>(
&self,
reader: R,
skip_invalid: bool,
stats: &mut ImportStats,
) -> Result<Vec<VectorRecord>, DakeraError> {
let buf_reader = BufReader::new(reader);
let mut records = Vec::new();
let mut lines = buf_reader.lines();
let _ = lines.next();
for line in lines {
let line = line.map_err(|e| DakeraError::Storage(e.to_string()))?;
if line.trim().is_empty() {
continue;
}
match parse_csv_line(&line) {
Ok(record) => records.push(record),
Err(e) => {
if skip_invalid {
stats.vectors_skipped += 1;
stats
.warnings
.push(format!("Skipped invalid CSV row: {}", e));
} else {
return Err(DakeraError::Storage(format!("Invalid CSV row: {}", e)));
}
}
}
}
Ok(records)
}
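/// Reads the length-prefixed binary format produced by [`DataExporter`]:
/// magic `VPUF`, `u32` version, `u64` record count, then per record a
/// `u32` length and the record's JSON bytes (all integers little-endian).
/// Declared counts and record sizes are capped to guard against corrupt
/// or malicious headers.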
fn read_binary<R: Read>(&self, mut reader: R) -> Result<Vec<VectorRecord>, DakeraError> {
let mut magic = [0u8; 4];
reader
.read_exact(&mut magic)
.map_err(|e| DakeraError::Storage(e.to_string()))?;
if &magic != b"VPUF" {
return Err(DakeraError::Storage(
"Invalid binary format: bad magic number".to_string(),
));
}
let mut version_bytes = [0u8; 4];
reader
.read_exact(&mut version_bytes)
.map_err(|e| DakeraError::Storage(e.to_string()))?;
let version = u32::from_le_bytes(version_bytes);
if version != 1 {
return Err(DakeraError::Storage(format!(
"Unsupported binary format version: {}",
version
)));
}
let mut count_bytes = [0u8; 8];
reader
.read_exact(&mut count_bytes)
.map_err(|e| DakeraError::Storage(e.to_string()))?;
let count = u64::from_le_bytes(count_bytes);
const MAX_IMPORT_RECORDS: u64 = 10_000_000;
if count > MAX_IMPORT_RECORDS {
return Err(DakeraError::InvalidRequest(format!(
"Binary file declares {} records which exceeds import limit of {}",
count, MAX_IMPORT_RECORDS
)));
}
let mut records = Vec::with_capacity(count as usize);
for _ in 0..count {
let mut len_bytes = [0u8; 4];
reader
.read_exact(&mut len_bytes)
.map_err(|e| DakeraError::Storage(e.to_string()))?;
let len = u32::from_le_bytes(len_bytes);
const MAX_RECORD_BYTES: u32 = 10 * 1024 * 1024;
if len > MAX_RECORD_BYTES {
return Err(DakeraError::InvalidRequest(format!(
"Record size {} bytes exceeds maximum of {} bytes",
len, MAX_RECORD_BYTES
)));
}
let len = len as usize;
let mut json_bytes = vec![0u8; len];
reader
.read_exact(&mut json_bytes)
.map_err(|e| DakeraError::Storage(e.to_string()))?;
let record: VectorRecord = serde_json::from_slice(&json_bytes)
.map_err(|e| DakeraError::Storage(e.to_string()))?;
records.push(record);
}
Ok(records)
}
}
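/// Doubles embedded quotes; callers are expected to wrap the result in
/// quotes themselves, as `write_csv` does.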
fn escape_csv(s: &str) -> String {
s.replace('"', "\"\"")
}
fn parse_csv_line(line: &str) -> Result<VectorRecord, String> {
let fields = parse_csv_fields(line)?;
if fields.len() < 2 {
return Err("CSV line must have at least id and values".to_string());
}
let id = fields[0].clone();
let values: Option<Vec<f32>> = if fields[1].is_empty() {
None
} else {
// The length check above guarantees fields[1] exists; parse the JSON
// array into a concrete Vec<f32> so the target type is explicit.
let parsed: Vec<f32> = serde_json::from_str(&fields[1])
.map_err(|e| format!("Invalid values JSON: {}", e))?;
Some(parsed)
};
let metadata: Option<serde_json::Value> = if fields.len() > 2 && !fields[2].is_empty() {
serde_json::from_str(&fields[2]).ok()
} else {
None
};
let ttl_seconds: Option<u64> = if fields.len() > 3 && !fields[3].is_empty() {
fields[3].parse().ok()
} else {
None
};
Ok(VectorRecord {
id,
values,
metadata,
ttl_seconds,
})
}
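/// Minimal CSV field splitter: handles quoted fields, doubled quotes
/// inside quotes, and commas inside quoted fields. Embedded newlines are
/// not supported because input is consumed line by line.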
fn parse_csv_fields(line: &str) -> Result<Vec<String>, String> {
let mut fields = Vec::new();
let mut current = String::new();
let mut in_quotes = false;
let mut chars = line.chars().peekable();
while let Some(c) = chars.next() {
match c {
'"' if !in_quotes => {
in_quotes = true;
}
'"' if in_quotes => {
if chars.peek() == Some(&'"') {
chars.next();
current.push('"');
} else {
in_quotes = false;
}
}
',' if !in_quotes => {
fields.push(current.clone());
current.clear();
}
_ => {
current.push(c);
}
}
}
if in_quotes {
return Err("Unterminated quoted field".to_string());
}
fields.push(current);
Ok(fields)
}
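/// One-call convenience wrappers around [`DataExporter`] and
/// [`DataImporter`] for common workflows.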
pub mod utils {
use super::*;
pub async fn export_json<S: VectorStorage>(
storage: S,
namespace: &str,
path: impl AsRef<Path>,
) -> Result<ExportStats, DakeraError> {
let exporter = DataExporter::new(storage);
let config = ExportConfig {
format: ExportFormat::Json,
pretty_print: true,
..Default::default()
};
exporter.export_to_file(namespace, path, &config).await
}
pub async fn export_jsonl<S: VectorStorage>(
storage: S,
namespace: &str,
path: impl AsRef<Path>,
) -> Result<ExportStats, DakeraError> {
let exporter = DataExporter::new(storage);
let config = ExportConfig {
format: ExportFormat::JsonLines,
..Default::default()
};
exporter.export_to_file(namespace, path, &config).await
}
pub async fn export_csv<S: VectorStorage>(
storage: S,
namespace: &str,
path: impl AsRef<Path>,
) -> Result<ExportStats, DakeraError> {
let exporter = DataExporter::new(storage);
let config = ExportConfig {
format: ExportFormat::Csv,
..Default::default()
};
exporter.export_to_file(namespace, path, &config).await
}
pub async fn import_file<S: VectorStorage>(
storage: S,
path: impl AsRef<Path>,
namespace: &str,
) -> Result<ImportStats, DakeraError> {
let importer = DataImporter::new(storage);
let config = ImportConfig {
format: ImportFormat::Auto,
namespace: namespace.to_string(),
..Default::default()
};
importer.import_from_file(path, &config).await
}
pub async fn copy_namespace<S: VectorStorage>(
storage: &S,
source_namespace: &str,
target_namespace: &str,
) -> Result<u64, DakeraError> {
let source_ns = source_namespace.to_string();
let target_ns = target_namespace.to_string();
let vectors = storage.get_all(&source_ns).await?;
let count = vectors.len();
storage.ensure_namespace(&target_ns).await?;
storage.upsert(&target_ns, vectors).await?;
Ok(count as u64)
}
pub async fn merge_namespaces<S: VectorStorage>(
storage: &S,
source_namespaces: &[&str],
target_namespace: &str,
) -> Result<u64, DakeraError> {
let mut total_count = 0u64;
let target_ns = target_namespace.to_string();
storage.ensure_namespace(&target_ns).await?;
for source in source_namespaces {
let source_ns = source.to_string();
let vectors = storage.get_all(&source_ns).await?;
let count = vectors.len();
storage.upsert(&target_ns, vectors).await?;
total_count += count as u64;
}
Ok(total_count)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::InMemoryStorage;
async fn create_test_storage() -> InMemoryStorage {
let storage = InMemoryStorage::new();
let ns = "test".to_string();
storage.ensure_namespace(&ns).await.unwrap();
let vectors = vec![
Vector {
id: "vec1".to_string(),
values: vec![1.0, 2.0, 3.0],
metadata: Some(serde_json::json!({"label": "first"})),
ttl_seconds: None,
expires_at: None,
},
Vector {
id: "vec2".to_string(),
values: vec![4.0, 5.0, 6.0],
metadata: Some(serde_json::json!({"label": "second"})),
ttl_seconds: Some(3600),
expires_at: None,
},
];
storage.upsert(&ns, vectors).await.unwrap();
storage
}
#[tokio::test]
async fn test_export_json() {
let storage = create_test_storage().await;
let exporter = DataExporter::new(storage);
let config = ExportConfig {
format: ExportFormat::Json,
pretty_print: false,
..Default::default()
};
let json = exporter.export_to_string("test", &config).await.unwrap();
let records: Vec<VectorRecord> = serde_json::from_str(&json).unwrap();
assert_eq!(records.len(), 2);
assert!(records.iter().any(|r| r.id == "vec1"));
assert!(records.iter().any(|r| r.id == "vec2"));
}
#[tokio::test]
async fn test_export_jsonl() {
let storage = create_test_storage().await;
let exporter = DataExporter::new(storage);
let config = ExportConfig {
format: ExportFormat::JsonLines,
..Default::default()
};
let jsonl = exporter.export_to_string("test", &config).await.unwrap();
let lines: Vec<&str> = jsonl.lines().collect();
assert_eq!(lines.len(), 2);
for line in lines {
let _record: VectorRecord = serde_json::from_str(line).unwrap();
}
}
#[tokio::test]
async fn test_export_csv() {
let storage = create_test_storage().await;
let exporter = DataExporter::new(storage);
let config = ExportConfig {
format: ExportFormat::Csv,
..Default::default()
};
let csv = exporter.export_to_string("test", &config).await.unwrap();
let lines: Vec<&str> = csv.lines().collect();
assert_eq!(lines.len(), 3);
assert!(lines[0].contains("id,values,metadata"));
}
#[tokio::test]
async fn test_export_binary() {
let storage = create_test_storage().await;
let exporter = DataExporter::new(storage);
let config = ExportConfig {
format: ExportFormat::Binary,
..Default::default()
};
let mut buffer = Vec::new();
let stats = exporter
.export_to_writer("test", &mut buffer, &config)
.await
.unwrap();
assert_eq!(stats.vectors_exported, 2);
assert!(buffer.starts_with(b"VPUF"));
}
#[tokio::test]
async fn test_import_json() {
let storage = InMemoryStorage::new();
let importer = DataImporter::new(storage.clone());
let json = r#"[
{"id": "import1", "values": [1.0, 2.0], "metadata": {"key": "value"}},
{"id": "import2", "values": [3.0, 4.0]}
]"#;
let config = ImportConfig {
format: ImportFormat::Json,
namespace: "imported".to_string(),
..Default::default()
};
let stats = importer.import_from_string(json, &config).await.unwrap();
assert_eq!(stats.vectors_imported, 2);
let ns = "imported".to_string();
let vectors = storage.get_all(&ns).await.unwrap();
assert_eq!(vectors.len(), 2);
}
#[tokio::test]
async fn test_import_jsonl() {
let storage = InMemoryStorage::new();
let importer = DataImporter::new(storage.clone());
let jsonl = r#"{"id": "line1", "values": [1.0, 2.0]}
{"id": "line2", "values": [3.0, 4.0]}"#;
let config = ImportConfig {
format: ImportFormat::JsonLines,
namespace: "jsonl_ns".to_string(),
..Default::default()
};
let stats = importer.import_from_string(jsonl, &config).await.unwrap();
assert_eq!(stats.vectors_imported, 2);
}
#[tokio::test]
async fn test_import_with_skip_invalid() {
let storage = InMemoryStorage::new();
let importer = DataImporter::new(storage.clone());
let jsonl = r#"{"id": "valid", "values": [1.0, 2.0]}
this is not valid json
{"id": "also_valid", "values": [3.0, 4.0]}"#;
let config = ImportConfig {
format: ImportFormat::JsonLines,
namespace: "skip_ns".to_string(),
skip_invalid: true,
..Default::default()
};
let stats = importer.import_from_string(jsonl, &config).await.unwrap();
assert_eq!(stats.vectors_imported, 2);
assert_eq!(stats.vectors_skipped, 1);
}
#[tokio::test]
async fn test_roundtrip_binary() {
let storage = create_test_storage().await;
let exporter = DataExporter::new(storage.clone());
let config = ExportConfig {
format: ExportFormat::Binary,
..Default::default()
};
let mut buffer = Vec::new();
exporter
.export_to_writer("test", &mut buffer, &config)
.await
.unwrap();
let new_storage = InMemoryStorage::new();
let importer = DataImporter::new(new_storage.clone());
let import_config = ImportConfig {
format: ImportFormat::Binary,
namespace: "roundtrip".to_string(),
..Default::default()
};
let stats = importer
.import_from_reader(std::io::Cursor::new(buffer), &import_config)
.await
.unwrap();
assert_eq!(stats.vectors_imported, 2);
let ns = "roundtrip".to_string();
let vectors = new_storage.get_all(&ns).await.unwrap();
assert_eq!(vectors.len(), 2);
}
#[tokio::test]
async fn test_copy_namespace() {
let storage = create_test_storage().await;
let count = utils::copy_namespace(&storage, "test", "copy")
.await
.unwrap();
assert_eq!(count, 2);
let ns = "copy".to_string();
let copied = storage.get_all(&ns).await.unwrap();
assert_eq!(copied.len(), 2);
}
#[tokio::test]
async fn test_merge_namespaces() {
let storage = InMemoryStorage::new();
let ns1 = "ns1".to_string();
let ns2 = "ns2".to_string();
storage.ensure_namespace(&ns1).await.unwrap();
storage.ensure_namespace(&ns2).await.unwrap();
storage
.upsert(
&ns1,
vec![Vector {
id: "a".to_string(),
values: vec![1.0],
metadata: None,
ttl_seconds: None,
expires_at: None,
}],
)
.await
.unwrap();
storage
.upsert(
&ns2,
vec![Vector {
id: "b".to_string(),
values: vec![2.0],
metadata: None,
ttl_seconds: None,
expires_at: None,
}],
)
.await
.unwrap();
let count = utils::merge_namespaces(&storage, &["ns1", "ns2"], "merged")
.await
.unwrap();
assert_eq!(count, 2);
let merged_ns = "merged".to_string();
let merged = storage.get_all(&merged_ns).await.unwrap();
assert_eq!(merged.len(), 2);
}
#[test]
fn test_csv_field_parsing() {
let fields = parse_csv_fields(r#""hello","world""#).unwrap();
assert_eq!(fields, vec!["hello", "world"]);
let fields = parse_csv_fields(r#""has""quote","normal""#).unwrap();
assert_eq!(fields, vec!["has\"quote", "normal"]);
let fields = parse_csv_fields(r#""has,comma","ok""#).unwrap();
assert_eq!(fields, vec!["has,comma", "ok"]);
}
#[test]
fn test_vector_record_conversion() {
let vector = Vector {
id: "test".to_string(),
values: vec![1.0, 2.0, 3.0],
metadata: Some(serde_json::json!({"key": "value"})),
ttl_seconds: Some(3600),
expires_at: None,
};
let record = VectorRecord::from(&vector);
assert_eq!(record.id, "test");
assert_eq!(record.values, Some(vec![1.0, 2.0, 3.0]));
assert!(record.metadata.is_some());
assert_eq!(record.ttl_seconds, Some(3600));
let back: Vector = record.into();
assert_eq!(back.id, "test");
assert_eq!(back.values, vec![1.0, 2.0, 3.0]);
}
}