use crate::store::{Metadata, VecStore};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationConfig {
pub batch_size: usize,
pub validate: bool,
pub resume_from: Option<usize>,
}
impl Default for MigrationConfig {
fn default() -> Self {
Self {
batch_size: 1000,
validate: true,
resume_from: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkMigrationStats {
pub total_vectors: usize,
pub errors: usize,
pub duration: Duration,
pub bytes_processed: u64,
pub throughput: f64,
}
pub type ProgressCallback = Box<dyn Fn(usize, usize) + Send>;
pub struct PineconeMigration {
config: MigrationConfig,
progress_callback: Option<ProgressCallback>,
}
impl PineconeMigration {
pub fn new(config: MigrationConfig) -> Self {
Self {
config,
progress_callback: None,
}
}
pub fn with_progress<F>(mut self, callback: F) -> Self
where
F: Fn(usize, usize) + Send + 'static,
{
self.progress_callback = Some(Box::new(callback));
self
}
pub fn import_from_file(&self, path: &str, store: &mut VecStore) -> Result<BulkMigrationStats> {
let start = Instant::now();
let file = File::open(path).map_err(|e| anyhow::anyhow!("Failed to open file: {}", e))?;
let reader = BufReader::new(file);
let data: serde_json::Value = serde_json::from_reader(reader)
.map_err(|e| anyhow::anyhow!("Failed to parse JSON: {}", e))?;
let vectors = data["vectors"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("No 'vectors' array found".to_string()))?;
let total = vectors.len();
let mut migrated = 0;
let mut errors = 0;
let mut bytes = 0u64;
let start_offset = self.config.resume_from.unwrap_or(0);
for (i, vector_data) in vectors.iter().enumerate().skip(start_offset) {
if i % self.config.batch_size == 0 {
if let Some(ref callback) = self.progress_callback {
callback(migrated, total);
}
}
match self.import_vector(vector_data, store) {
Ok(size) => {
migrated += 1;
bytes += size;
}
Err(_) => {
errors += 1;
}
}
}
let duration = start.elapsed();
let throughput = migrated as f64 / duration.as_secs_f64();
Ok(BulkMigrationStats {
total_vectors: migrated,
errors,
duration,
bytes_processed: bytes,
throughput,
})
}
fn import_vector(&self, data: &serde_json::Value, store: &mut VecStore) -> Result<u64> {
let id = data["id"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing 'id' field"))?
.to_string();
let values: Vec<f32> = data["values"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("Missing 'values' field"))?
.iter()
.map(|v| v.as_f64().unwrap_or(0.0) as f32)
.collect();
if self.config.validate && values.is_empty() {
return Err(anyhow::anyhow!("Empty vector values"));
}
let metadata = if let Some(meta) = data.get("metadata") {
let fields: HashMap<String, serde_json::Value> =
serde_json::from_value(meta.clone()).unwrap_or_default();
Metadata { fields }
} else {
Metadata {
fields: HashMap::new(),
}
};
store.upsert(id.clone(), values.clone(), metadata)?;
let size = id.len() + values.len() * 4 + 100; Ok(size as u64)
}
}
pub struct QdrantMigration {
config: MigrationConfig,
}
impl QdrantMigration {
pub fn new(config: MigrationConfig) -> Self {
Self { config }
}
pub fn import_from_jsonl(
&self,
path: &str,
store: &mut VecStore,
) -> Result<BulkMigrationStats> {
let start = Instant::now();
let file = File::open(path).map_err(|e| anyhow::anyhow!("Failed to open file: {}", e))?;
let reader = BufReader::new(file);
let mut migrated = 0;
let mut errors = 0;
let mut bytes = 0u64;
for (i, line) in reader.lines().enumerate() {
if let Some(offset) = self.config.resume_from {
if i < offset {
continue;
}
}
let line = line.map_err(|e| anyhow::anyhow!("Read error: {}", e))?;
match self.import_point(&line, store) {
Ok(size) => {
migrated += 1;
bytes += size;
}
Err(_) => {
errors += 1;
}
}
}
let duration = start.elapsed();
let throughput = migrated as f64 / duration.as_secs_f64();
Ok(BulkMigrationStats {
total_vectors: migrated,
errors,
duration,
bytes_processed: bytes,
throughput,
})
}
fn import_point(&self, line: &str, store: &mut VecStore) -> Result<u64> {
let data: serde_json::Value =
serde_json::from_str(line).map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
let id = data["id"].to_string().trim_matches('"').to_string();
let vector: Vec<f32> = data["vector"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("Missing vector".to_string()))?
.iter()
.map(|v| v.as_f64().unwrap_or(0.0) as f32)
.collect();
let metadata = if let Some(payload) = data.get("payload") {
let fields: HashMap<String, serde_json::Value> =
serde_json::from_value(payload.clone()).unwrap_or_default();
Metadata { fields }
} else {
Metadata {
fields: HashMap::new(),
}
};
store.upsert(id.clone(), vector.clone(), metadata)?;
Ok((id.len() + vector.len() * 4 + 100) as u64)
}
}
pub struct ChromaDBMigration {
config: MigrationConfig,
}
impl ChromaDBMigration {
pub fn new(config: MigrationConfig) -> Self {
Self { config }
}
pub fn import_from_file(&self, path: &str, store: &mut VecStore) -> Result<BulkMigrationStats> {
let start = Instant::now();
let file = File::open(path).map_err(|e| anyhow::anyhow!("Failed to open file: {}", e))?;
let reader = BufReader::new(file);
let data: serde_json::Value = serde_json::from_reader(reader)
.map_err(|e| anyhow::anyhow!("Failed to parse JSON: {}", e))?;
let ids: Vec<String> = data["ids"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("Missing 'ids'".to_string()))?
.iter()
.map(|v| v.as_str().unwrap_or("").to_string())
.collect();
let embeddings: Vec<Vec<f32>> = data["embeddings"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("Missing 'embeddings'".to_string()))?
.iter()
.map(|arr| {
arr.as_array()
.unwrap_or(&Vec::new())
.iter()
.map(|v| v.as_f64().unwrap_or(0.0) as f32)
.collect()
})
.collect();
let metadatas: Vec<HashMap<String, serde_json::Value>> = data
.get("metadatas")
.and_then(|m| m.as_array())
.map(|arr| {
arr.iter()
.map(|v| serde_json::from_value(v.clone()).unwrap_or_default())
.collect()
})
.unwrap_or_else(|| vec![HashMap::new(); ids.len()]);
let mut migrated = 0;
let mut errors = 0;
let mut bytes = 0u64;
for ((id, embedding), metadata) in ids.iter().zip(embeddings.iter()).zip(metadatas.iter()) {
match store.upsert(
id.clone(),
embedding.clone(),
Metadata {
fields: metadata.clone(),
},
) {
Ok(_) => {
migrated += 1;
bytes += (id.len() + embedding.len() * 4 + 100) as u64;
}
Err(_) => {
errors += 1;
}
}
}
let duration = start.elapsed();
let throughput = migrated as f64 / duration.as_secs_f64();
Ok(BulkMigrationStats {
total_vectors: migrated,
errors,
duration,
bytes_processed: bytes,
throughput,
})
}
}
pub struct FormatConverter;
impl FormatConverter {
pub fn pinecone_to_jsonl(input: &str, output: &str) -> Result<usize> {
let file = File::open(input).map_err(|e| anyhow::anyhow!("Failed to open input: {}", e))?;
let reader = BufReader::new(file);
let data: serde_json::Value = serde_json::from_reader(reader)
.map_err(|e| anyhow::anyhow!("Failed to parse JSON: {}", e))?;
let vectors = data["vectors"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("No 'vectors' array".to_string()))?;
let mut output_file =
File::create(output).map_err(|e| anyhow::anyhow!("Failed to create output: {}", e))?;
use std::io::Write;
for vector in vectors {
let universal = serde_json::json!({
"id": vector["id"],
"vector": vector["values"],
"metadata": vector.get("metadata").unwrap_or(&serde_json::Value::Null)
});
writeln!(output_file, "{}", universal)
.map_err(|e| anyhow::anyhow!("Write error: {}", e))?;
}
Ok(vectors.len())
}
pub fn qdrant_to_jsonl(input: &str, output: &str) -> Result<usize> {
let in_file =
File::open(input).map_err(|e| anyhow::anyhow!("Failed to open input: {}", e))?;
let mut out_file =
File::create(output).map_err(|e| anyhow::anyhow!("Failed to create output: {}", e))?;
let reader = BufReader::new(in_file);
let mut count = 0;
use std::io::Write;
for line in reader.lines() {
let line = line.map_err(|e| anyhow::anyhow!("Read error: {}", e))?;
let data: serde_json::Value =
serde_json::from_str(&line).map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
let universal = serde_json::json!({
"id": data["id"].to_string(),
"vector": data["vector"],
"metadata": data.get("payload").unwrap_or(&serde_json::Value::Null)
});
writeln!(out_file, "{}", universal)
.map_err(|e| anyhow::anyhow!("Write error: {}", e))?;
count += 1;
}
Ok(count)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_pinecone_migration() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let mut store = VecStore::open(temp_dir.path().join("test.db"))?;
let test_data = serde_json::json!({
"vectors": [
{
"id": "vec1",
"values": [0.1, 0.2, 0.3],
"metadata": {"category": "test"}
},
{
"id": "vec2",
"values": [0.4, 0.5, 0.6],
"metadata": {"category": "prod"}
}
]
});
let test_file = temp_dir.path().join("pinecone.json");
std::fs::write(&test_file, test_data.to_string()).unwrap();
let config = MigrationConfig::default();
let migration = PineconeMigration::new(config);
let stats = migration.import_from_file(test_file.to_str().unwrap(), &mut store)?;
assert_eq!(stats.total_vectors, 2);
assert_eq!(stats.errors, 0);
assert_eq!(store.len(), 2);
Ok(())
}
#[test]
fn test_chromadb_migration() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let mut store = VecStore::open(temp_dir.path().join("test.db"))?;
let test_data = serde_json::json!({
"ids": ["id1", "id2"],
"embeddings": [[0.1, 0.2], [0.3, 0.4]],
"metadatas": [{"key": "val1"}, {"key": "val2"}]
});
let test_file = temp_dir.path().join("chroma.json");
std::fs::write(&test_file, test_data.to_string()).unwrap();
let config = MigrationConfig::default();
let migration = ChromaDBMigration::new(config);
let stats = migration.import_from_file(test_file.to_str().unwrap(), &mut store)?;
assert_eq!(stats.total_vectors, 2);
assert_eq!(store.len(), 2);
Ok(())
}
}