use crate::store::{Metadata, Neighbor, Query, VecStore};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
/// Settings that control how a [`PartitionedStore`] creates and bounds its partitions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionConfig {
/// Name of the metadata field associated with partitioning (defaults to `"partition"`).
/// NOTE(review): nothing in this file reads this field — confirm how callers use it.
pub partition_field: String,
/// When `true`, writing to an unknown partition creates it; when `false`, that is an error.
pub auto_create: bool,
/// Upper bound on the number of vectors a partition may hold; `None` means unbounded.
pub max_vectors_per_partition: Option<usize>,
}
impl Default for PartitionConfig {
fn default() -> Self {
Self {
partition_field: "partition".to_string(),
auto_create: true,
max_vectors_per_partition: None,
}
}
}
/// Bookkeeping record describing one partition of a [`PartitionedStore`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionInfo {
/// Partition identifier; also the name of the partition's directory.
pub id: String,
/// Number of vectors last reported by the partition's store.
pub vector_count: usize,
/// Directory that holds the partition's `vectors.db`.
pub path: PathBuf,
/// Creation time: taken from the database file's metadata when the partition is
/// loaded from disk (falling back to the Unix epoch), or "now" when it is created.
pub created_at: std::time::SystemTime,
/// Time of the last recorded change (insert, delete or compaction in this file).
pub modified_at: std::time::SystemTime,
/// Size of the database file in bytes as captured at load time; `0` for a freshly
/// created partition. Not refreshed by later writes in this file.
pub size_bytes: u64,
}
/// A collection of independent `VecStore`s, one per partition, kept in
/// subdirectories of a common base directory.
pub struct PartitionedStore {
// Directory under which every partition gets its own subdirectory.
base_path: PathBuf,
// Creation and size-limit policy.
config: PartitionConfig,
// Open stores, keyed by partition id.
partitions: HashMap<String, VecStore>,
// Per-partition bookkeeping, keyed by the same ids as `partitions`.
partition_info: HashMap<String, PartitionInfo>,
}
impl PartitionedStore {
/// Opens a partitioned store rooted at `base_path`.
///
/// The directory (including missing parents) is created if necessary, then
/// every partition already present underneath it is registered.
///
/// # Errors
///
/// Fails when the base directory cannot be created or when scanning it for
/// existing partitions fails.
pub fn new<P: AsRef<Path>>(base_path: P, config: PartitionConfig) -> Result<Self> {
    let root = base_path.as_ref().to_path_buf();
    if let Err(e) = std::fs::create_dir_all(&root) {
        return Err(anyhow::anyhow!("Failed to create directory: {}", e));
    }

    let mut opened = Self {
        base_path: root,
        config,
        partitions: HashMap::new(),
        partition_info: HashMap::new(),
    };
    opened.load_existing_partitions()?;
    Ok(opened)
}
/// Scans the base directory and registers every subdirectory that contains a
/// `vectors.db` which can be opened.
///
/// Loading is best-effort per partition: entries that are not directories,
/// directories whose name is not valid UTF-8 (they cannot be addressed through
/// the `&str`-keyed API), directories without a `vectors.db`, and stores that
/// fail to open are all skipped.
///
/// # Errors
///
/// Fails when the base directory cannot be listed, when a directory entry
/// cannot be read, or when the metadata of an opened store's database file
/// cannot be read.
fn load_existing_partitions(&mut self) -> Result<()> {
    let entries = std::fs::read_dir(&self.base_path)
        .map_err(|e| anyhow::anyhow!("Failed to read directory: {}", e))?;

    for entry in entries {
        let entry = entry.map_err(|e| anyhow::anyhow!("Read error: {}", e))?;
        let path = entry.path();
        if !path.is_dir() {
            continue;
        }

        // A name that is not valid UTF-8 used to be mapped to the id "",
        // making unrelated directories collide under one bogus key.
        let partition_id = match path.file_name().and_then(|n| n.to_str()) {
            Some(name) if !name.is_empty() => name.to_string(),
            _ => continue,
        };

        let db_path = path.join("vectors.db");
        if !db_path.exists() {
            continue;
        }

        // Deliberately tolerant: one unreadable partition must not prevent
        // the remaining ones from loading.
        let store = match VecStore::open(&db_path) {
            Ok(store) => store,
            Err(_) => continue,
        };

        let vector_count = store.len();
        let metadata = std::fs::metadata(&db_path)
            .map_err(|e| anyhow::anyhow!("Metadata error: {}", e))?;
        let info = PartitionInfo {
            id: partition_id.clone(),
            vector_count,
            path,
            // Not every platform/filesystem reports these timestamps.
            created_at: metadata
                .created()
                .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
            modified_at: metadata
                .modified()
                .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
            size_bytes: metadata.len(),
        };

        self.partitions.insert(partition_id.clone(), store);
        self.partition_info.insert(partition_id, info);
    }

    Ok(())
}
/// Returns the open store for `partition_id`, creating the partition first
/// when it does not exist and `auto_create` is enabled.
///
/// A new partition lives in `<base_path>/<partition_id>/vectors.db`, so the id
/// must be usable as exactly one directory name.
///
/// # Errors
///
/// Fails when the partition is unknown and `auto_create` is disabled, when the
/// id is not a single plain directory name (empty, `.`/`..`, contains a path
/// separator, or is absolute), when the directory cannot be created, or when
/// the store cannot be opened.
fn get_or_create_partition(&mut self, partition_id: &str) -> Result<&mut VecStore> {
    if !self.partitions.contains_key(partition_id) {
        if !self.config.auto_create {
            return Err(anyhow::anyhow!(
                "Partition '{}' does not exist and auto_create is disabled",
                partition_id
            ));
        }

        // Reject ids that would resolve to the base directory itself (""),
        // escape it ("..", absolute paths) or nest below another directory
        // ("a/b"): such partitions could not be reloaded by the one-level
        // scan at start-up and could make `delete_partition` remove
        // unrelated data.
        let mut components = Path::new(partition_id).components();
        let is_plain_name = match (components.next(), components.next()) {
            (Some(std::path::Component::Normal(name)), None) => {
                name == std::ffi::OsStr::new(partition_id)
            }
            _ => false,
        };
        if !is_plain_name {
            return Err(anyhow::anyhow!(
                "Invalid partition id '{}': must be a single, non-empty directory name",
                partition_id
            ));
        }

        let partition_path = self.base_path.join(partition_id);
        std::fs::create_dir_all(&partition_path)
            .map_err(|e| anyhow::anyhow!("Failed to create partition dir: {}", e))?;
        let db_path = partition_path.join("vectors.db");
        let store = VecStore::open(&db_path)?;

        let now = std::time::SystemTime::now();
        let info = PartitionInfo {
            id: partition_id.to_string(),
            vector_count: 0,
            path: partition_path,
            created_at: now,
            modified_at: now,
            size_bytes: 0,
        };
        self.partitions.insert(partition_id.to_string(), store);
        self.partition_info.insert(partition_id.to_string(), info);
    }

    // The entry exists at this point; report an error instead of panicking
    // should that invariant ever be broken.
    self.partitions
        .get_mut(partition_id)
        .ok_or_else(|| anyhow::anyhow!("Partition '{}' not found", partition_id))
}
/// Upserts a vector into `partition_id`, creating the partition when allowed
/// by the configuration.
///
/// When `max_vectors_per_partition` is set and the partition's recorded count
/// has already reached it, the call is rejected before the store is touched.
/// NOTE(review): the cap is checked before the upsert, so replacing an
/// existing id in a full partition is rejected as well — confirm that this is
/// intended.
///
/// # Errors
///
/// Fails when the partition is full, cannot be obtained/created, or when the
/// underlying upsert fails.
pub fn insert(
    &mut self,
    partition_id: &str,
    id: String,
    vector: Vec<f32>,
    metadata: Metadata,
) -> Result<()> {
    // `Some(cap)` only when a cap is configured, the partition is already
    // known, and its recorded count has reached the cap.
    let exhausted_cap = self.config.max_vectors_per_partition.filter(|&cap| {
        self.partition_info
            .get(partition_id)
            .map_or(false, |info| info.vector_count >= cap)
    });
    if let Some(max_size) = exhausted_cap {
        return Err(anyhow::anyhow!(
            "Partition '{}' has reached maximum size of {} vectors",
            partition_id,
            max_size
        ));
    }

    let store = self.get_or_create_partition(partition_id)?;
    store.upsert(id, vector, metadata)?;
    let stored = store.len();

    match self.partition_info.get_mut(partition_id) {
        Some(info) => {
            info.vector_count = stored;
            info.modified_at = std::time::SystemTime::now();
        }
        None => {}
    }
    Ok(())
}
/// Runs `query` against a single partition and returns that partition's
/// results unchanged.
///
/// # Errors
///
/// Fails when `partition_id` is not a loaded partition, or when the
/// underlying query fails.
pub fn query_partition(&self, partition_id: &str, query: Query) -> Result<Vec<Neighbor>> {
    match self.partitions.get(partition_id) {
        Some(store) => store.query(query),
        None => Err(anyhow::anyhow!("Partition '{}' not found", partition_id)),
    }
}
/// Runs `query` against every partition and merges the results.
///
/// Each partition receives its own clone of `query`. The combined results are
/// sorted by ascending `score` and cut down to at most `limit` entries.
/// NOTE(review): ascending order treats a lower score as a better match —
/// confirm this matches the metric used by `VecStore`.
///
/// Results whose score is NaN are ordered after all others instead of causing
/// a panic.
///
/// # Errors
///
/// Fails as soon as any partition's query fails.
pub fn query_all(&self, query: Query, limit: usize) -> Result<Vec<Neighbor>> {
    let mut merged: Vec<Neighbor> = Vec::new();
    for partition in self.partitions.values() {
        merged.extend(partition.query(query.clone())?);
    }

    // `partial_cmp` yields `None` for NaN, so compare NaN-ness first
    // (`false < true` puts NaN last); the fallback is then only reached when
    // both scores are NaN. Assumes `score` is a float.
    merged.sort_by(|a, b| {
        a.score.is_nan().cmp(&b.score.is_nan()).then_with(|| {
            a.score
                .partial_cmp(&b.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        })
    });
    merged.truncate(limit);
    Ok(merged)
}
/// Runs `query` against the listed partitions and merges the results.
///
/// Ids that do not name a loaded partition are skipped silently, and an id
/// listed more than once is queried only once so its neighbours are not
/// duplicated. The combined results are sorted by ascending `score` and cut
/// down to at most `limit` entries.
/// NOTE(review): ascending order treats a lower score as a better match —
/// confirm this matches the metric used by `VecStore`.
///
/// Results whose score is NaN are ordered after all others instead of causing
/// a panic.
///
/// # Errors
///
/// Fails as soon as any queried partition's query fails.
pub fn query_partitions(
    &self,
    partition_ids: &[&str],
    query: Query,
    limit: usize,
) -> Result<Vec<Neighbor>> {
    let mut merged: Vec<Neighbor> = Vec::new();
    let mut already_queried = std::collections::HashSet::new();

    for &partition_id in partition_ids {
        // `insert` returns false when the id was seen before.
        if !already_queried.insert(partition_id) {
            continue;
        }
        if let Some(partition) = self.partitions.get(partition_id) {
            merged.extend(partition.query(query.clone())?);
        }
    }

    // `partial_cmp` yields `None` for NaN, so compare NaN-ness first
    // (`false < true` puts NaN last); the fallback is then only reached when
    // both scores are NaN. Assumes `score` is a float.
    merged.sort_by(|a, b| {
        a.score.is_nan().cmp(&b.score.is_nan()).then_with(|| {
            a.score
                .partial_cmp(&b.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        })
    });
    merged.truncate(limit);
    Ok(merged)
}
/// Deletes the vector `id` from `partition_id` and refreshes the partition's
/// recorded vector count and modification time.
///
/// # Errors
///
/// Fails when the partition is not loaded or when the underlying delete fails.
pub fn delete(&mut self, partition_id: &str, id: &str) -> Result<()> {
    let store = match self.partitions.get_mut(partition_id) {
        Some(store) => store,
        None => {
            return Err(anyhow::anyhow!("Partition '{}' not found", partition_id));
        }
    };
    store.delete(id)?;

    match self.partition_info.get_mut(partition_id) {
        Some(record) => {
            record.vector_count = store.len();
            record.modified_at = std::time::SystemTime::now();
        }
        None => {}
    }
    Ok(())
}
/// Looks up the bookkeeping record of `partition_id`; `None` when no such
/// partition is registered.
pub fn get_partition_info(&self, partition_id: &str) -> Option<&PartitionInfo> {
    let registry = &self.partition_info;
    registry.get(partition_id)
}
/// Returns the bookkeeping records of all registered partitions, in the
/// (unspecified) iteration order of the underlying map.
pub fn list_partitions(&self) -> Vec<&PartitionInfo> {
    let mut records = Vec::with_capacity(self.partition_info.len());
    records.extend(self.partition_info.values());
    records
}
/// Adds up the recorded vector counts of all registered partitions.
pub fn total_vectors(&self) -> usize {
    self.partition_info
        .values()
        .fold(0, |running, info| running + info.vector_count)
}
/// Removes a partition: drops its open store and deletes its directory,
/// including all stored vectors.
///
/// Deleting an unknown partition is a no-op. A directory that is already gone
/// counts as successfully deleted.
///
/// # Errors
///
/// Fails when the partition directory cannot be removed. In that case the
/// partition's info record is kept (its store handle has already been
/// dropped) so that a later call can retry the removal instead of silently
/// leaving the data on disk.
pub fn delete_partition(&mut self, partition_id: &str) -> Result<()> {
    // Drop the open store before its files are removed.
    self.partitions.remove(partition_id);

    let partition_dir = match self.partition_info.get(partition_id) {
        Some(info) => info.path.clone(),
        None => return Ok(()),
    };

    match std::fs::remove_dir_all(&partition_dir) {
        Ok(()) => {}
        // The goal state (no directory) is already reached.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => {
            return Err(anyhow::anyhow!("Failed to delete partition: {}", e));
        }
    }

    // Forget the partition only once its data is really gone; otherwise it
    // would vanish from memory yet reappear the next time the store is opened.
    self.partition_info.remove(partition_id);
    Ok(())
}
/// Compacts the store of `partition_id` and refreshes the partition's
/// recorded vector count and modification time.
///
/// Returns the value reported by the store's `compact` call.
///
/// # Errors
///
/// Fails when the partition is not loaded or when compaction fails.
pub fn compact_partition(&mut self, partition_id: &str) -> Result<usize> {
    let store = match self.partitions.get_mut(partition_id) {
        Some(store) => store,
        None => {
            return Err(anyhow::anyhow!("Partition '{}' not found", partition_id));
        }
    };
    let reclaimed = store.compact()?;

    match self.partition_info.get_mut(partition_id) {
        Some(record) => {
            record.vector_count = store.len();
            record.modified_at = std::time::SystemTime::now();
        }
        None => {}
    }
    Ok(reclaimed)
}
/// Summarises the size distribution of the registered partitions.
///
/// `total_partitions` is the number of open stores; all other figures are
/// derived from the recorded per-partition vector counts. With no partitions
/// the names are `None` and the sizes and average are zero.
///
/// `largest_partition` and `smallest_partition` are reported whenever at least
/// one partition exists, even if every partition is empty. When several
/// partitions share the extreme size, the one with the lexicographically
/// smallest id is reported, so the result does not depend on map iteration
/// order.
pub fn partition_stats(&self) -> PartitionStats {
    let total_partitions = self.partitions.len();
    let total_vectors = self.total_vectors();
    let avg_vectors_per_partition = if total_partitions > 0 {
        total_vectors as f64 / total_partitions as f64
    } else {
        0.0
    };

    // `max_by` keeps the last maximal element, so the id comparison is
    // reversed to make the smallest id win ties.
    let largest = self.partition_info.values().max_by(|a, b| {
        a.vector_count
            .cmp(&b.vector_count)
            .then_with(|| b.id.cmp(&a.id))
    });
    // `min_by` keeps the first minimal element; ties go to the smallest id.
    let smallest = self.partition_info.values().min_by(|a, b| {
        a.vector_count
            .cmp(&b.vector_count)
            .then_with(|| a.id.cmp(&b.id))
    });

    PartitionStats {
        total_partitions,
        total_vectors,
        avg_vectors_per_partition,
        largest_partition: largest.map(|info| info.id.clone()),
        smallest_partition: smallest.map(|info| info.id.clone()),
        max_partition_size: largest.map_or(0, |info| info.vector_count),
        min_partition_size: smallest.map_or(0, |info| info.vector_count),
    }
}
}
/// Aggregate figures over all partitions, as produced by
/// `PartitionedStore::partition_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionStats {
/// Number of partitions.
pub total_partitions: usize,
/// Sum of the recorded vector counts of all partitions.
pub total_vectors: usize,
/// `total_vectors / total_partitions`, or `0.0` when there are no partitions.
pub avg_vectors_per_partition: f64,
/// Id of the partition with the most vectors, if one was identified.
pub largest_partition: Option<String>,
/// Id of the partition with the fewest vectors, if one was identified.
pub smallest_partition: Option<String>,
/// Vector count of the largest partition (`0` when there is none).
pub max_partition_size: usize,
/// Vector count of the smallest partition (`0` when there is none).
pub min_partition_size: usize,
}
// Integration-style tests: each one works on a fresh temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use tempfile::TempDir;
// A store opened on an empty directory has no partitions.
#[test]
fn test_partitioned_store_creation() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let config = PartitionConfig::default();
let store = PartitionedStore::new(temp_dir.path(), config)?;
assert_eq!(store.list_partitions().len(), 0);
Ok(())
}
// Inserting into two partitions creates both; a query against one partition
// only sees that partition's vector.
#[test]
fn test_insert_and_query() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let config = PartitionConfig::default();
let mut store = PartitionedStore::new(temp_dir.path(), config)?;
let mut metadata = Metadata {
fields: HashMap::new(),
};
metadata
.fields
.insert("partition".to_string(), serde_json::json!("A"));
store.insert("A", "vec1".to_string(), vec![0.1, 0.2, 0.3], metadata)?;
let mut metadata = Metadata {
fields: HashMap::new(),
};
metadata
.fields
.insert("partition".to_string(), serde_json::json!("B"));
store.insert("B", "vec2".to_string(), vec![0.4, 0.5, 0.6], metadata)?;
assert_eq!(store.list_partitions().len(), 2);
assert_eq!(store.total_vectors(), 2);
let query = Query::new(vec![0.1, 0.2, 0.3]).with_limit(10);
let results = store.query_partition("A", query)?;
assert_eq!(results.len(), 1);
assert_eq!(results[0].id, "vec1");
Ok(())
}
// `query_all` merges the results of every partition.
#[test]
fn test_cross_partition_query() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let config = PartitionConfig::default();
let mut store = PartitionedStore::new(temp_dir.path(), config)?;
// One vector in each of three partitions.
for i in 0..3 {
let partition_id = format!("partition_{}", i);
let metadata = Metadata {
fields: HashMap::new(),
};
store.insert(
&partition_id,
format!("vec_{}", i),
vec![i as f32 * 0.1, i as f32 * 0.2, i as f32 * 0.3],
metadata,
)?;
}
let query = Query::new(vec![0.0, 0.0, 0.0]).with_limit(10);
let results = store.query_all(query, 10)?;
assert_eq!(results.len(), 3);
Ok(())
}
// With a cap of two vectors, the third insert into the same partition fails.
#[test]
fn test_partition_size_limit() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let config = PartitionConfig {
partition_field: "tenant".to_string(),
auto_create: true,
max_vectors_per_partition: Some(2),
};
let mut store = PartitionedStore::new(temp_dir.path(), config)?;
for i in 0..2 {
let metadata = Metadata {
fields: HashMap::new(),
};
store.insert("tenant1", format!("vec_{}", i), vec![i as f32], metadata)?;
}
let metadata = Metadata {
fields: HashMap::new(),
};
let result = store.insert("tenant1", "vec_3".to_string(), vec![3.0], metadata);
assert!(result.is_err());
Ok(())
}
// Deleting a partition removes it from the listing.
#[test]
fn test_partition_deletion() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let config = PartitionConfig::default();
let mut store = PartitionedStore::new(temp_dir.path(), config)?;
let metadata = Metadata {
fields: HashMap::new(),
};
store.insert("test_partition", "vec1".to_string(), vec![1.0], metadata)?;
assert_eq!(store.list_partitions().len(), 1);
store.delete_partition("test_partition")?;
assert_eq!(store.list_partitions().len(), 0);
Ok(())
}
// Statistics over one single-vector partition and one three-vector partition.
#[test]
fn test_partition_stats() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let config = PartitionConfig::default();
let mut store = PartitionedStore::new(temp_dir.path(), config)?;
let metadata = Metadata {
fields: HashMap::new(),
};
store.insert("small", "vec1".to_string(), vec![1.0], metadata.clone())?;
store.insert("large", "vec2".to_string(), vec![2.0], metadata.clone())?;
store.insert("large", "vec3".to_string(), vec![3.0], metadata.clone())?;
store.insert("large", "vec4".to_string(), vec![4.0], metadata)?;
let stats = store.partition_stats();
assert_eq!(stats.total_partitions, 2);
assert_eq!(stats.total_vectors, 4);
assert_eq!(stats.max_partition_size, 3);
assert_eq!(stats.min_partition_size, 1);
Ok(())
}
}