use super::{TreeConfigSaver, VersionedKvStore, NAMESPACED_TREE_CONFIG_FILENAME};
use crate::config::TreeConfig;
use crate::diff::{ConflictResolver, IgnoreConflictsResolver, MergeConflict};
use crate::digest::ValueDigest;
use crate::git::metadata::{GitMetadataBackend, MetadataBackend};
use crate::git::types::*;
use crate::storage::{GitNodeStorage, InMemoryNodeStorage, NodeStorage};
use crate::tree::{ProllyTree, Tree};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
use crate::storage::FileNodeStorage;
#[cfg(feature = "rocksdb_storage")]
use crate::storage::RocksDBNodeStorage;
#[cfg(feature = "proximity")]
use crate::proximity::text_index::{
dedup_chunk_hits_by_doc, doc_id_prefix, make_chunk_id, text_inner_proximity_name,
text_state_key, validate_or_write_text_identity, OVERFETCH_MULTIPLIER,
};
#[cfg(feature = "proximity")]
use crate::proximity::{
Chunker, Embedder, IdentityChunker, ProximityConfig, ProximityError, ProximityIndex,
ProximityIndexEntry, TextHit, TextIndexConfig, TextIndexError,
};
/// On-disk layout of a namespaced store.
///
/// `detect_format_version` reports `V2` only when the dataset directory's
/// `prolly_namespace_version` file contains `V2`; anything else (including a
/// missing file) is treated as `V1`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum StoreFormatVersion {
/// Flat layout: keys live directly in the inner versioned tree, with no
/// namespace registry.
V1,
/// Namespaced layout: each namespace has its own tree, tracked by the
/// `prolly_namespace_registry` file.
V2,
}
/// Registry record for one namespace: where its tree is rooted and how the
/// tree is configured.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NamespaceEntry<const N: usize> {
/// Root hash of the namespace's tree, or `None` when no root is recorded
/// (such a namespace is opened as a fresh, empty tree).
pub root_hash: Option<ValueDigest<N>>,
/// Tree configuration used when loading or recreating the namespace's tree.
pub config: TreeConfig<N>,
}
/// Summary returned by `migrate_v1_to_v2`.
#[derive(Debug, Clone)]
pub struct MigrationReport {
/// Number of flat V1 keys copied into the default namespace.
pub keys_migrated: usize,
/// Namespaces created by the migration (currently only the default one).
pub namespaces_created: Vec<String>,
/// Format version of the store after the migration.
pub storage_version: StoreFormatVersion,
}
/// Name of the namespace a store starts with. It backs the store-level
/// `insert`/`get`/`delete`/`list_keys` shortcuts and receives V1 data on
/// migration.
pub const DEFAULT_NAMESPACE: &str = "default";
/// Turns a raw stored value into the text a text index should embed.
/// Returning `None` skips the value (it is not indexed).
#[cfg(feature = "proximity")]
pub type ValueTransformer = Arc<dyn Fn(&[u8]) -> Option<String> + Send + Sync>;
/// Versioned key-value store whose keys are partitioned into named namespaces,
/// each backed by its own prolly tree that shares the inner store's node
/// storage.
pub struct NamespacedKvStore<
const N: usize,
S: NodeStorage<N>,
M: MetadataBackend = GitMetadataBackend,
> {
/// Underlying versioned store; provides the shared node storage, the
/// dataset directory and branch/commit operations.
pub(crate) inner: VersionedKvStore<N, S, M>,
/// Committed namespace metadata (root hash + config) by namespace name,
/// persisted to `prolly_namespace_registry`.
pub(crate) registry: HashMap<String, NamespaceEntry<N>>,
/// Namespace trees loaded into memory (loaded lazily by `namespace`).
pub(crate) namespaces: HashMap<String, ProllyTree<N, S>>,
/// Uncommitted writes per namespace; `None` marks a pending delete.
pub(crate) namespace_staging: HashMap<String, HashMap<Vec<u8>, Option<Vec<u8>>>>,
/// Namespace used by the store-level `insert`/`get`/`delete`/`list_keys`.
pub(crate) default_namespace: String,
/// Namespaces whose staged writes still need to be applied at commit.
pub(crate) dirty_namespaces: HashSet<String>,
/// Layout of the store on disk (flat V1 or namespaced V2).
pub(crate) format_version: StoreFormatVersion,
/// Proximity indexes keyed by `(namespace, index name)`.
#[cfg(feature = "proximity")]
pub(crate) proximity_indexes: HashMap<(String, String), ProximityIndex<N, S>>,
/// Proximity indexes modified since the last flush.
#[cfg(feature = "proximity")]
pub(crate) dirty_proximity_indexes: HashSet<(String, String)>,
/// Embedders for text indexes, keyed by `(namespace, text index name)`.
#[cfg(feature = "proximity")]
pub(crate) text_embedders: HashMap<(String, String), Arc<dyn Embedder>>,
/// Chunkers for text indexes; indexes without one use `IdentityChunker`.
#[cfg(feature = "proximity")]
pub(crate) text_chunkers: HashMap<(String, String), Arc<dyn Chunker>>,
/// Text indexes to update when a namespace's keys are written or deleted,
/// keyed by namespace name.
#[cfg(feature = "proximity")]
pub(crate) cascade_lists: HashMap<String, Vec<String>>,
/// Value-to-text converters for text indexes; without one the value is
/// indexed only if it is valid UTF-8.
#[cfg(feature = "proximity")]
pub(crate) text_transformers: HashMap<(String, String), ValueTransformer>,
/// Values strictly longer than this many bytes are stored as blobs (with an
/// envelope in the tree) at commit time; `None` disables externalisation.
pub(crate) externalize_threshold: Option<usize>,
}
/// Namespaced store using `GitNodeStorage` for tree nodes.
pub type GitNamespacedKvStore<const N: usize> = NamespacedKvStore<N, GitNodeStorage<N>>;
/// Namespaced store using `InMemoryNodeStorage` for tree nodes.
pub type InMemoryNamespacedKvStore<const N: usize> = NamespacedKvStore<N, InMemoryNodeStorage<N>>;
/// Namespaced store using `FileNodeStorage` for tree nodes.
pub type FileNamespacedKvStore<const N: usize> = NamespacedKvStore<N, FileNodeStorage<N>>;
/// Namespaced store using `RocksDBNodeStorage` for tree nodes.
#[cfg(feature = "rocksdb_storage")]
pub type RocksDBNamespacedKvStore<const N: usize> = NamespacedKvStore<N, RocksDBNodeStorage<N>>;
/// Wrapper that keeps a `NamespacedKvStore` behind an `Arc` and a
/// `parking_lot::Mutex`.
pub struct ThreadSafeNamespacedKvStore<
const N: usize,
S: NodeStorage<N>,
M: MetadataBackend = GitMetadataBackend,
> {
/// The guarded store.
pub(crate) inner: Arc<Mutex<NamespacedKvStore<N, S, M>>>,
}
/// Mutex-guarded namespaced store using `GitNodeStorage`.
pub type ThreadSafeGitNamespacedKvStore<const N: usize> =
ThreadSafeNamespacedKvStore<N, GitNodeStorage<N>>;
/// Mutex-guarded namespaced store using `InMemoryNodeStorage`.
pub type ThreadSafeInMemoryNamespacedKvStore<const N: usize> =
ThreadSafeNamespacedKvStore<N, InMemoryNodeStorage<N>>;
/// Mutable view of a single namespace of a `NamespacedKvStore`.
///
/// Writes go to the store's per-namespace staging area and are applied to the
/// namespace's tree when the store commits.
pub struct NamespaceHandle<'a, const N: usize, S: NodeStorage<N>, M: MetadataBackend> {
/// Store this handle reads from and writes to.
store: &'a mut NamespacedKvStore<N, S, M>,
/// Name of the namespace this handle is scoped to.
ns_name: String,
}
impl<'a, const N: usize, S: NodeStorage<N>, M: MetadataBackend> NamespaceHandle<'a, N, S, M> {
    /// Returns the current value of `key` in this namespace.
    ///
    /// Staged (uncommitted) writes take precedence over the committed tree:
    /// a staged value is returned as-is and a staged delete yields `None`.
    /// Otherwise the value is read from the namespace's tree and passed
    /// through `crate::storage::externalize::unwrap_value` with the store's
    /// node storage. Presumably this resolves values that were externalised
    /// as blobs at commit time (confirm).
    pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        if let Some(staging) = self.store.namespace_staging.get(&self.ns_name) {
            if let Some(staged_value) = staging.get(key) {
                return staged_value.clone();
            }
        }
        if let Some(tree) = self.store.namespaces.get(&self.ns_name) {
            return tree.find(key).and_then(|node| {
                node.keys.iter().position(|k| k == key).map(|idx| {
                    crate::storage::externalize::unwrap_value::<N, _>(
                        &node.values[idx],
                        &self.store.inner.tree.storage,
                    )
                })
            });
        }
        None
    }

    /// Stages `value` under `key` in this namespace and marks the namespace
    /// dirty.
    ///
    /// With the `proximity` feature, text indexes that cascade from this
    /// namespace are updated immediately, before the write is staged.
    ///
    /// # Errors
    /// Returns the error from `crate::validation::validate_kv` when the pair
    /// is rejected; nothing is staged or indexed in that case.
    pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<(), GitKvError> {
        crate::validation::validate_kv(&key, &value)?;
        #[cfg(feature = "proximity")]
        self.cascade_insert(&key, &value);
        self.store
            .namespace_staging
            .entry(self.ns_name.clone())
            .or_default()
            .insert(key, Some(value));
        self.store.dirty_namespaces.insert(self.ns_name.clone());
        Ok(())
    }

    /// Stages a delete of `key` if it currently has a value (staged or
    /// committed) and reports whether it did.
    ///
    /// With the `proximity` feature, the chunks indexed for `key` are also
    /// removed from cascading text indexes. The visible body never returns
    /// an error.
    pub fn delete(&mut self, key: &[u8]) -> Result<bool, GitKvError> {
        let exists = self.get(key).is_some();
        if exists {
            self.store
                .namespace_staging
                .entry(self.ns_name.clone())
                .or_default()
                .insert(key.to_vec(), None);
            self.store.dirty_namespaces.insert(self.ns_name.clone());
            #[cfg(feature = "proximity")]
            self.cascade_delete(key);
        }
        Ok(exists)
    }

    /// Re-indexes `key` in every text index that cascades from this namespace.
    ///
    /// For each index, the chunks previously indexed for `key` are removed
    /// first. The value is then turned into text (with the registered
    /// transformer, or as UTF-8 when there is none), split with the
    /// registered chunker (`IdentityChunker` by default), embedded and
    /// inserted.
    ///
    /// Indexing is best-effort. These are skipped silently:
    /// - values that yield no text,
    /// - indexes without an embedder,
    /// - chunks that fail to embed,
    /// - rejected index inserts.
    #[cfg(feature = "proximity")]
    fn cascade_insert(&mut self, key: &[u8], value: &[u8]) {
        let Some(cascade_list) = self.store.cascade_lists.get(&self.ns_name).cloned() else {
            return;
        };
        for idx_name in cascade_list {
            let target_key = (self.ns_name.clone(), idx_name.clone());
            let prox_key = (self.ns_name.clone(), text_inner_proximity_name(&idx_name));
            // The key is being overwritten, so its old chunks are stale no
            // matter what happens below. Removing them before the early
            // `continue`s keeps a value that can no longer be indexed (not
            // text, no embedder) from leaving hits that point at old content.
            Self::cascade_delete_chunks_for_doc(self.store, &prox_key, key);
            let text: String = match self.store.text_transformers.get(&target_key).cloned() {
                Some(transformer) => match transformer(value) {
                    Some(t) => t,
                    None => continue,
                },
                None => match std::str::from_utf8(value) {
                    Ok(s) => s.to_string(),
                    Err(_) => continue,
                },
            };
            let Some(embedder) = self.store.text_embedders.get(&target_key).cloned() else {
                continue;
            };
            let chunker: Arc<dyn Chunker> = self
                .store
                .text_chunkers
                .get(&target_key)
                .cloned()
                .unwrap_or_else(|| Arc::new(IdentityChunker));
            let chunks = chunker.split(&text);
            for (chunk_idx, chunk_text) in chunks.iter().enumerate() {
                let Ok(vec) = embedder.embed(chunk_text) else {
                    continue;
                };
                let chunk_id = make_chunk_id(key, chunk_idx as u32);
                if let Some(idx) = self.store.proximity_indexes.get_mut(&prox_key) {
                    // Best-effort: a rejected insert just leaves this chunk unindexed.
                    let _ = idx.insert(chunk_id, vec);
                    self.store.dirty_proximity_indexes.insert(prox_key.clone());
                }
            }
        }
    }

    /// Removes the chunks indexed for `key` from every text index that
    /// cascades from this namespace.
    #[cfg(feature = "proximity")]
    fn cascade_delete(&mut self, key: &[u8]) {
        let Some(cascade_list) = self.store.cascade_lists.get(&self.ns_name).cloned() else {
            return;
        };
        for idx_name in cascade_list {
            let prox_key = (self.ns_name.clone(), text_inner_proximity_name(&idx_name));
            Self::cascade_delete_chunks_for_doc(self.store, &prox_key, key);
        }
    }

    /// Removes every entry of the proximity index `prox_key` whose id starts
    /// with the chunk-id prefix of `doc_id`. Marks the index dirty if
    /// anything was removed; does nothing if the index does not exist.
    #[cfg(feature = "proximity")]
    fn cascade_delete_chunks_for_doc(
        store: &mut NamespacedKvStore<N, S, M>,
        prox_key: &(String, String),
        doc_id: &[u8],
    ) {
        let Some(idx) = store.proximity_indexes.get_mut(prox_key) else {
            return;
        };
        let prefix = doc_id_prefix(doc_id);
        let to_remove: Vec<Vec<u8>> = idx
            .entries_snapshot()
            .keys()
            .filter(|k| k.starts_with(&prefix))
            .cloned()
            .collect();
        let mut any = false;
        for cid in to_remove {
            if idx.remove(&cid) {
                any = true;
            }
        }
        if any {
            store.dirty_proximity_indexes.insert(prox_key.clone());
        }
    }

    /// Returns every key visible in this namespace, sorted ascending.
    ///
    /// The committed keys are overlaid with staged writes (added) and staged
    /// deletes (removed).
    pub fn list_keys(&self) -> Vec<Vec<u8>> {
        let mut keys = HashSet::new();
        if let Some(tree) = self.store.namespaces.get(&self.ns_name) {
            for key in tree.collect_keys() {
                keys.insert(key);
            }
        }
        if let Some(staging) = self.store.namespace_staging.get(&self.ns_name) {
            for (key, value) in staging {
                if value.is_some() {
                    keys.insert(key.clone());
                } else {
                    keys.remove(key);
                }
            }
        }
        let mut result: Vec<Vec<u8>> = keys.into_iter().collect();
        result.sort();
        result
    }

    /// Root hash of this namespace's loaded tree.
    ///
    /// Staged writes are not reflected until they are applied at commit.
    pub fn root_hash(&self) -> Option<ValueDigest<N>> {
        self.store
            .namespaces
            .get(&self.ns_name)
            .and_then(|tree| tree.get_root_hash())
    }
}
impl<const N: usize, S: NodeStorage<N>, M: MetadataBackend> NamespacedKvStore<N, S, M>
where
VersionedKvStore<N, S, M>: TreeConfigSaver<N>,
{
pub fn namespace(&mut self, prefix: &str) -> NamespaceHandle<'_, N, S, M> {
if !self.namespaces.contains_key(prefix) {
if let Some(entry) = self.registry.get(prefix) {
let mut config = entry.config.clone();
config.root_hash = entry.root_hash.clone();
let tree = if entry.root_hash.is_some() {
ProllyTree::load_from_storage(self.inner.tree.storage.clone(), config.clone())
.unwrap_or_else(|| ProllyTree::new(self.inner.tree.storage.clone(), config))
} else {
ProllyTree::new(self.inner.tree.storage.clone(), config)
};
self.namespaces.insert(prefix.to_string(), tree);
} else {
let tree = ProllyTree::new(self.inner.tree.storage.clone(), TreeConfig::default());
self.namespaces.insert(prefix.to_string(), tree);
self.dirty_namespaces.insert(prefix.to_string());
}
}
self.namespace_staging
.entry(prefix.to_string())
.or_default();
NamespaceHandle {
store: self,
ns_name: prefix.to_string(),
}
}
pub fn list_namespaces(&self) -> Vec<String> {
let mut names: HashSet<String> = HashSet::new();
names.extend(self.registry.keys().cloned());
names.extend(self.namespaces.keys().cloned());
let mut result: Vec<String> = names.into_iter().collect();
result.sort();
result
}
pub fn delete_namespace(&mut self, prefix: &str) -> Result<bool, GitKvError> {
if prefix == self.default_namespace {
return Err(GitKvError::GitObjectError(
"Cannot delete the default namespace".to_string(),
));
}
let existed =
self.registry.remove(prefix).is_some() || self.namespaces.remove(prefix).is_some();
self.namespace_staging.remove(prefix);
self.dirty_namespaces.remove(prefix);
Ok(existed)
}
pub fn get_namespace_root_hash(&self, prefix: &str) -> Option<ValueDigest<N>> {
if let Some(tree) = self.namespaces.get(prefix) {
return tree.get_root_hash();
}
self.registry
.get(prefix)
.and_then(|entry| entry.root_hash.clone())
}
pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<(), GitKvError> {
let ns = self.default_namespace.clone();
self.namespace(&ns).insert(key, value)
}
pub fn get(&mut self, key: &[u8]) -> Option<Vec<u8>> {
let ns = self.default_namespace.clone();
self.namespace(&ns).get(key)
}
pub fn delete(&mut self, key: &[u8]) -> Result<bool, GitKvError> {
let ns = self.default_namespace.clone();
self.namespace(&ns).delete(key)
}
pub fn list_keys(&mut self) -> Vec<Vec<u8>> {
let ns = self.default_namespace.clone();
self.namespace(&ns).list_keys()
}
pub fn current_branch(&self) -> &str {
self.inner.current_branch()
}
pub fn list_branches(&self) -> Result<Vec<String>, GitKvError> {
self.inner.list_branches()
}
pub fn log(&self) -> Result<Vec<CommitInfo>, GitKvError> {
self.inner.log()
}
pub fn create_branch(&mut self, name: &str) -> Result<(), GitKvError> {
self.inner.create_branch(name)?;
self.namespace_staging.clear();
self.dirty_namespaces.clear();
Ok(())
}
fn save_namespace_registry(&self) -> Result<(), GitKvError> {
let dataset_dir = self
.inner
.dataset_dir
.as_ref()
.ok_or_else(|| GitKvError::GitObjectError("Dataset directory not set".into()))?;
let version_path = dataset_dir.join("prolly_namespace_version");
std::fs::write(&version_path, "V2").map_err(|e| {
GitKvError::GitObjectError(format!("Failed to write namespace version: {e}"))
})?;
let mut registry_data: HashMap<String, NamespaceEntry<N>> = self.registry.clone();
for (ns_name, tree) in &self.namespaces {
registry_data.insert(
ns_name.clone(),
NamespaceEntry {
root_hash: tree.get_root_hash(),
config: tree.config.clone(),
},
);
}
let registry_json = serde_json::to_string_pretty(®istry_data).map_err(|e| {
GitKvError::GitObjectError(format!("Failed to serialize namespace registry: {e}"))
})?;
let registry_path = dataset_dir.join("prolly_namespace_registry");
std::fs::write(®istry_path, registry_json).map_err(|e| {
GitKvError::GitObjectError(format!("Failed to write namespace registry: {e}"))
})?;
Ok(())
}
fn load_namespace_registry(&mut self) -> Result<(), GitKvError> {
let dataset_dir = self
.inner
.dataset_dir
.as_ref()
.ok_or_else(|| GitKvError::GitObjectError("Dataset directory not set".into()))?
.clone();
let registry_path = dataset_dir.join("prolly_namespace_registry");
if !registry_path.exists() {
return Ok(());
}
let registry_json = std::fs::read_to_string(®istry_path).map_err(|e| {
GitKvError::GitObjectError(format!("Failed to read namespace registry: {e}"))
})?;
let registry: HashMap<String, NamespaceEntry<N>> = serde_json::from_str(®istry_json)
.map_err(|e| {
GitKvError::GitObjectError(format!("Failed to parse namespace registry: {e}"))
})?;
self.registry = registry;
Ok(())
}
fn detect_format_version(dataset_dir: &Path) -> StoreFormatVersion {
let version_path = dataset_dir.join("prolly_namespace_version");
if version_path.exists() {
if let Ok(content) = std::fs::read_to_string(version_path) {
if content.trim() == "V2" {
return StoreFormatVersion::V2;
}
}
}
StoreFormatVersion::V1
}
pub fn set_externalize_threshold(&mut self, threshold: Option<usize>) {
self.externalize_threshold = threshold;
}
pub fn externalize_threshold(&self) -> Option<usize> {
self.externalize_threshold
}
pub fn inner_storage(&self) -> &S {
&self.inner.tree.storage
}
pub fn gc_blobs(&mut self) -> Result<BlobGcReport, GitKvError> {
let ns_names = self.list_namespaces();
for name in &ns_names {
let _ = self.namespace(name);
}
let mut referenced: HashSet<ValueDigest<N>> = HashSet::new();
for tree in self.namespaces.values() {
for key in tree.collect_keys() {
let raw = tree.find(&key).and_then(|node| {
node.keys
.iter()
.position(|k| k == &key)
.map(|i| node.values[i].clone())
});
if let Some(bytes) = raw {
if let Some((hash, _size)) =
crate::storage::externalize::parse_envelope::<N>(&bytes)
{
referenced.insert(hash);
}
}
}
}
let all_blobs = self
.inner
.tree
.storage
.list_blobs()
.map_err(|e| GitKvError::GitObjectError(format!("list_blobs: {e}")))?;
let mut report = BlobGcReport {
total: all_blobs.len(),
referenced: referenced.len(),
removed: 0,
errors: Vec::new(),
};
for h in all_blobs {
if referenced.contains(&h) {
continue;
}
match self.inner.tree.storage.delete_blob(&h) {
Ok(()) => report.removed += 1,
Err(e) => report.errors.push(format!("{h}: {e}")),
}
}
Ok(report)
}
pub(crate) fn commit_impl(&mut self, message: &str) -> Result<gix::ObjectId, GitKvError> {
let dirty: Vec<String> = self.dirty_namespaces.drain().collect();
for ns_name in &dirty {
if let Some(staging) = self.namespace_staging.get_mut(ns_name) {
if !self.namespaces.contains_key(ns_name) {
let config = self
.registry
.get(ns_name)
.map(|e| e.config.clone())
.unwrap_or_default();
let tree = ProllyTree::new(self.inner.tree.storage.clone(), config);
self.namespaces.insert(ns_name.clone(), tree);
}
if let Some(tree) = self.namespaces.get_mut(ns_name) {
for (key, value) in staging.drain() {
match value {
Some(v) => {
let stored = match self.externalize_threshold {
Some(threshold) if v.len() > threshold => {
let hash = ValueDigest::<N>::new(&v);
self.inner
.tree
.storage
.insert_blob(hash.clone(), &v)
.map_err(|e| {
GitKvError::GitObjectError(format!(
"externalise insert_blob: {e}"
))
})?;
crate::storage::externalize::make_envelope::<N>(
&hash,
v.len() as u64,
)
}
_ => v,
};
tree.insert(key, stored);
}
None => {
tree.delete(&key);
}
}
}
tree.persist_root();
}
}
}
for (ns_name, tree) in &self.namespaces {
self.registry.insert(
ns_name.clone(),
NamespaceEntry {
root_hash: tree.get_root_hash(),
config: tree.config.clone(),
},
);
}
#[cfg(feature = "proximity")]
self.flush_dirty_proximity_indexes()?;
self.save_namespace_registry()?;
self.inner.commit(message)
}
}
impl<const N: usize> NamespacedKvStore<N, GitNodeStorage<N>, GitMetadataBackend> {
/// Copies the hash mappings recorded by every loaded namespace tree's
/// storage into the inner store's storage.
fn merge_ns_hash_mappings_to_inner_git(&self) {
    let target = &self.inner.tree.storage;
    self.namespaces.values().for_each(|ns_tree| {
        target.merge_hash_mappings(ns_tree.storage.get_hash_mappings());
    });
}
/// Copies the hash mappings recorded by every proximity index's storage into
/// the inner store's storage.
#[cfg(feature = "proximity")]
fn merge_proximity_hash_mappings_to_inner_git(&self) {
    let target = &self.inner.tree.storage;
    self.proximity_indexes.values().for_each(|index| {
        target.merge_hash_mappings(index.storage().get_hash_mappings());
    });
}
pub fn commit(&mut self, message: &str) -> Result<gix::ObjectId, GitKvError> {
let dirty: Vec<String> = self.dirty_namespaces.drain().collect();
for ns_name in &dirty {
if let Some(staging) = self.namespace_staging.get_mut(ns_name) {
if !self.namespaces.contains_key(ns_name) {
let config = self
.registry
.get(ns_name)
.map(|e| e.config.clone())
.unwrap_or_default();
let tree = ProllyTree::new(self.inner.tree.storage.clone(), config);
self.namespaces.insert(ns_name.clone(), tree);
}
if let Some(tree) = self.namespaces.get_mut(ns_name) {
for (key, value) in staging.drain() {
match value {
Some(v) => {
let stored = match self.externalize_threshold {
Some(threshold) if v.len() > threshold => {
let hash = ValueDigest::<N>::new(&v);
self.inner
.tree
.storage
.insert_blob(hash.clone(), &v)
.map_err(|e| {
GitKvError::GitObjectError(format!(
"externalise insert_blob: {e}"
))
})?;
crate::storage::externalize::make_envelope::<N>(
&hash,
v.len() as u64,
)
}
_ => v,
};
tree.insert(key, stored);
}
None => {
tree.delete(&key);
}
}
}
tree.persist_root();
}
}
}
for (ns_name, tree) in &self.namespaces {
self.registry.insert(
ns_name.clone(),
NamespaceEntry {
root_hash: tree.get_root_hash(),
config: tree.config.clone(),
},
);
}
#[cfg(feature = "proximity")]
self.flush_dirty_proximity_indexes()?;
self.save_namespace_registry()?;
self.merge_ns_hash_mappings_to_inner_git();
#[cfg(feature = "proximity")]
self.merge_proximity_hash_mappings_to_inner_git();
self.inner.commit(message)
}
pub fn init<P: AsRef<Path>>(path: P) -> Result<Self, GitKvError> {
let path_ref = path.as_ref();
if path_ref.join(NAMESPACED_TREE_CONFIG_FILENAME).exists()
|| path_ref.join("prolly_namespace_registry").exists()
{
return Self::open(path);
}
let inner = VersionedKvStore::<N, GitNodeStorage<N>>::init_with_config_filename(
&path,
NAMESPACED_TREE_CONFIG_FILENAME,
)?;
let mut store = NamespacedKvStore {
inner,
registry: HashMap::new(),
namespaces: HashMap::new(),
namespace_staging: HashMap::new(),
default_namespace: DEFAULT_NAMESPACE.to_string(),
dirty_namespaces: HashSet::new(),
format_version: StoreFormatVersion::V2,
#[cfg(feature = "proximity")]
proximity_indexes: HashMap::new(),
#[cfg(feature = "proximity")]
dirty_proximity_indexes: HashSet::new(),
#[cfg(feature = "proximity")]
text_embedders: HashMap::new(),
#[cfg(feature = "proximity")]
cascade_lists: HashMap::new(),
#[cfg(feature = "proximity")]
text_transformers: HashMap::new(),
#[cfg(feature = "proximity")]
text_chunkers: HashMap::new(),
externalize_threshold: None,
};
let default_tree = ProllyTree::new(store.inner.tree.storage.clone(), TreeConfig::default());
store
.namespaces
.insert(DEFAULT_NAMESPACE.to_string(), default_tree);
store.commit("Initial namespaced store")?;
Ok(store)
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, GitKvError> {
let inner = VersionedKvStore::<N, GitNodeStorage<N>>::open_with_config_filename(
&path,
NAMESPACED_TREE_CONFIG_FILENAME,
)?;
let dataset_dir = inner
.dataset_dir
.as_ref()
.ok_or_else(|| GitKvError::GitObjectError("Dataset directory not set".into()))?;
let format_version = Self::detect_format_version(dataset_dir);
let mut store = NamespacedKvStore {
inner,
registry: HashMap::new(),
namespaces: HashMap::new(),
namespace_staging: HashMap::new(),
default_namespace: DEFAULT_NAMESPACE.to_string(),
dirty_namespaces: HashSet::new(),
format_version: format_version.clone(),
#[cfg(feature = "proximity")]
proximity_indexes: HashMap::new(),
#[cfg(feature = "proximity")]
dirty_proximity_indexes: HashSet::new(),
#[cfg(feature = "proximity")]
text_embedders: HashMap::new(),
#[cfg(feature = "proximity")]
cascade_lists: HashMap::new(),
#[cfg(feature = "proximity")]
text_transformers: HashMap::new(),
#[cfg(feature = "proximity")]
text_chunkers: HashMap::new(),
externalize_threshold: None,
};
match format_version {
StoreFormatVersion::V2 => {
store.load_namespace_registry()?;
}
StoreFormatVersion::V1 => {
let mut kv_pairs = Vec::new();
for key in store.inner.tree.collect_keys() {
if let Some(value) = store.inner.get(&key) {
kv_pairs.push((key, value));
}
}
if !kv_pairs.is_empty() {
let mut default_tree =
ProllyTree::new(store.inner.tree.storage.clone(), TreeConfig::default());
for (key, value) in kv_pairs {
default_tree.insert(key, value);
}
default_tree.persist_root();
store
.namespaces
.insert(DEFAULT_NAMESPACE.to_string(), default_tree);
} else {
let default_tree =
ProllyTree::new(store.inner.tree.storage.clone(), TreeConfig::default());
store
.namespaces
.insert(DEFAULT_NAMESPACE.to_string(), default_tree);
}
}
}
Ok(store)
}
/// Switches to `branch_or_commit` and reloads the namespace state from it.
///
/// The inner store is checked out first. Only when that succeeds are the
/// staged writes, loaded trees and registry discarded. Previously they were
/// cleared before the checkout, so a failed checkout (unknown branch, bad id)
/// left the store on its old branch with an empty registry. The next commit
/// would then have overwritten that branch's namespaces.
///
/// If the checked-out commit has a namespace registry, it becomes the
/// current registry (format `V2`) and is written to the dataset directory.
/// Otherwise the commit is treated as flat `V1` data and its keys are loaded
/// in memory into the default namespace.
pub fn checkout(&mut self, branch_or_commit: &str) -> Result<(), GitKvError> {
    self.inner.checkout(branch_or_commit)?;
    self.namespace_staging.clear();
    self.dirty_namespaces.clear();
    self.namespaces.clear();
    self.registry.clear();

    let head_commit = self.inner.metadata.head_commit_id()?;
    let registry_at_head = self.load_registry_at_commit(&head_commit)?;
    if !registry_at_head.is_empty() {
        self.registry = registry_at_head;
        self.format_version = StoreFormatVersion::V2;
        self.save_namespace_registry()?;
        return Ok(());
    }

    self.format_version = StoreFormatVersion::V1;
    let mut kv_pairs = Vec::new();
    for key in self.inner.tree.collect_keys() {
        if let Some(value) = self.inner.get(&key) {
            kv_pairs.push((key, value));
        }
    }
    let mut default_tree =
        ProllyTree::new(self.inner.tree.storage.clone(), TreeConfig::default());
    if !kv_pairs.is_empty() {
        for (key, value) in kv_pairs {
            default_tree.insert(key, value);
        }
        default_tree.persist_root();
    }
    self.namespaces
        .insert(DEFAULT_NAMESPACE.to_string(), default_tree);
    Ok(())
}
/// Three-way merges `source_branch` into the current branch, namespace by
/// namespace, and records a merge commit.
///
/// For each namespace, the root hashes at the merge base, at the source tip
/// and in the current (destination) state are compared:
/// - equal source and destination: nothing to do;
/// - unchanged destination: fast-forward to the source contents;
/// - unchanged source: keep the destination;
/// - otherwise: key-level three-way merge.
///
/// Key conflicts are offered to `resolver`. Conflicts it declines, or
/// answers with another conflict, abort the merge with
/// `GitKvError::MergeConflictError`.
///
/// All changes are planned before any namespace tree is modified, so an
/// aborted merge leaves the working state untouched. A partially applied
/// merge would otherwise be persisted by the next `commit`, which
/// re-registers every loaded tree.
///
/// On success, staged (uncommitted) writes are discarded.
pub fn merge<R: ConflictResolver>(
    &mut self,
    source_branch: &str,
    resolver: &R,
) -> Result<gix::ObjectId, GitKvError> {
    let dest_branch = self.inner.current_branch.clone();
    let base_commit = self.find_merge_base(&dest_branch, source_branch)?;
    let source_commit = self.get_branch_commit(source_branch)?;
    let base_registry = self.load_registry_at_commit(&base_commit)?;
    let source_registry = self.load_registry_at_commit(&source_commit)?;
    let dest_registry = self.current_registry_snapshot();
    let mut all_ns: HashSet<String> = HashSet::new();
    all_ns.extend(base_registry.keys().cloned());
    all_ns.extend(source_registry.keys().cloned());
    all_ns.extend(dest_registry.keys().cloned());

    // Planned work; applied only once the whole merge is conflict-free.
    let mut fast_forwards = Vec::new();
    let mut three_way_updates = Vec::new();
    let mut unresolved_conflicts: Vec<MergeConflict> = Vec::new();

    for ns_name in &all_ns {
        let base_hash = base_registry.get(ns_name).and_then(|e| e.root_hash.clone());
        let source_hash = source_registry
            .get(ns_name)
            .and_then(|e| e.root_hash.clone());
        let dest_hash = dest_registry.get(ns_name).and_then(|e| e.root_hash.clone());
        if source_hash == dest_hash {
            continue;
        }
        if base_hash == dest_hash {
            // Only the source side changed: take its contents wholesale.
            let source_kv =
                self.collect_ns_keys_at_commit(ns_name, &source_commit, &source_registry)?;
            let config = source_registry
                .get(ns_name)
                .map(|e| e.config.clone())
                .unwrap_or_default();
            fast_forwards.push((ns_name.clone(), config, source_kv));
            continue;
        }
        if base_hash == source_hash {
            // Only the destination changed: keep it as is.
            continue;
        }

        let base_kv = self.collect_ns_keys_at_commit(ns_name, &base_commit, &base_registry)?;
        let source_kv =
            self.collect_ns_keys_at_commit(ns_name, &source_commit, &source_registry)?;
        // Lazily load the destination tree (same rules as `namespace()`).
        if !self.namespaces.contains_key(ns_name) {
            if let Some(entry) = dest_registry.get(ns_name) {
                let mut config = entry.config.clone();
                config.root_hash = entry.root_hash.clone();
                let tree = if entry.root_hash.is_some() {
                    ProllyTree::load_from_storage(
                        self.inner.tree.storage.clone(),
                        config.clone(),
                    )
                    .unwrap_or_else(|| ProllyTree::new(self.inner.tree.storage.clone(), config))
                } else {
                    ProllyTree::new(self.inner.tree.storage.clone(), config)
                };
                self.namespaces.insert(ns_name.clone(), tree);
            }
        }
        let mut dest_kv = HashMap::new();
        if let Some(tree) = self.namespaces.get(ns_name) {
            for key in tree.collect_keys() {
                if let Some(node) = tree.find(&key) {
                    if let Some(idx) = node.keys.iter().position(|k| k == &key) {
                        dest_kv.insert(key, node.values[idx].clone());
                    }
                }
            }
        }

        let mut all_keys: HashSet<Vec<u8>> = HashSet::new();
        all_keys.extend(base_kv.keys().cloned());
        all_keys.extend(source_kv.keys().cloned());
        all_keys.extend(dest_kv.keys().cloned());
        let mut merge_results = Vec::new();
        for key in &all_keys {
            let base_val = base_kv.get(key);
            let source_val = source_kv.get(key);
            let dest_val = dest_kv.get(key);
            match (base_val, source_val, dest_val) {
                (Some(b), Some(s), Some(d)) => {
                    if b == s && b == d {
                        continue;
                    } else if b == d && b != s {
                        merge_results
                            .push(crate::diff::MergeResult::Modified(key.clone(), s.clone()));
                    } else if b == s || s == d {
                        continue;
                    } else {
                        merge_results.push(crate::diff::MergeResult::Conflict(MergeConflict {
                            key: key.clone(),
                            base_value: Some(b.clone()),
                            source_value: Some(s.clone()),
                            destination_value: Some(d.clone()),
                        }));
                    }
                }
                (None, Some(s), None) => {
                    merge_results.push(crate::diff::MergeResult::Added(key.clone(), s.clone()));
                }
                (None, Some(s), Some(d)) => {
                    if s != d {
                        merge_results.push(crate::diff::MergeResult::Conflict(MergeConflict {
                            key: key.clone(),
                            base_value: None,
                            source_value: Some(s.clone()),
                            destination_value: Some(d.clone()),
                        }));
                    }
                }
                (Some(b), None, Some(d)) => {
                    if b == d {
                        merge_results.push(crate::diff::MergeResult::Removed(key.clone()));
                    } else {
                        merge_results.push(crate::diff::MergeResult::Conflict(MergeConflict {
                            key: key.clone(),
                            base_value: Some(b.clone()),
                            source_value: None,
                            destination_value: Some(d.clone()),
                        }));
                    }
                }
                (Some(b), Some(s), None) => {
                    if b != s {
                        merge_results.push(crate::diff::MergeResult::Conflict(MergeConflict {
                            key: key.clone(),
                            base_value: Some(b.clone()),
                            source_value: Some(s.clone()),
                            destination_value: None,
                        }));
                    }
                }
                // Deleted on both sides, or added only on the destination:
                // the destination already holds the merged value.
                _ => continue,
            }
        }

        let mut resolved = Vec::new();
        for result in merge_results {
            match result {
                crate::diff::MergeResult::Conflict(conflict) => {
                    match resolver.resolve_conflict(&conflict) {
                        // Answering a conflict with another conflict resolves nothing.
                        Some(crate::diff::MergeResult::Conflict(still_conflicting)) => {
                            unresolved_conflicts.push(still_conflicting);
                        }
                        Some(resolved_result) => resolved.push(resolved_result),
                        None => unresolved_conflicts.push(conflict),
                    }
                }
                other => resolved.push(other),
            }
        }
        three_way_updates.push((ns_name.clone(), resolved));
    }

    if !unresolved_conflicts.is_empty() {
        return Err(GitKvError::MergeConflictError(unresolved_conflicts));
    }

    // Apply phase: the merge is known to succeed from here on.
    for (ns_name, config, source_kv) in fast_forwards {
        let mut tree = ProllyTree::new(self.inner.tree.storage.clone(), config);
        for (key, value) in source_kv {
            tree.insert(key, value);
        }
        tree.persist_root();
        self.namespaces.insert(ns_name, tree);
    }
    for (ns_name, resolved) in three_way_updates {
        if !self.namespaces.contains_key(&ns_name) {
            let tree = ProllyTree::new(self.inner.tree.storage.clone(), TreeConfig::default());
            self.namespaces.insert(ns_name.clone(), tree);
        }
        if let Some(tree) = self.namespaces.get_mut(&ns_name) {
            for result in resolved {
                match result {
                    crate::diff::MergeResult::Added(key, value)
                    | crate::diff::MergeResult::Modified(key, value) => {
                        tree.insert(key, value);
                    }
                    crate::diff::MergeResult::Removed(key) => {
                        tree.delete(&key);
                    }
                    crate::diff::MergeResult::Conflict(_) => {
                        unreachable!("conflicts are filtered out before the apply phase")
                    }
                }
            }
            tree.persist_root();
        }
    }

    self.namespace_staging.clear();
    self.dirty_namespaces.clear();
    self.save_namespace_registry()?;
    for (ns_name, tree) in &self.namespaces {
        self.registry.insert(
            ns_name.clone(),
            NamespaceEntry {
                root_hash: tree.get_root_hash(),
                config: tree.config.clone(),
            },
        );
    }
    self.merge_ns_hash_mappings_to_inner_git();
    self.inner.create_merge_commit_for_namespaced(source_branch)
}
/// Merges `source_branch` like `merge`, additionally three-way merging every
/// loaded proximity index.
///
/// Proximity merges are computed first. If any index has conflicts that
/// `proximity_resolver` cannot settle, the call fails with
/// `ProximityMergeConflictError` before the key-value merge runs. Otherwise
/// the key-value merge is performed with `kv_resolver`. Any index whose
/// merged entries differ from its current ones is then replaced and
/// committed in a follow-up commit, whose id is returned. If no index
/// changed, the key-value merge commit id is returned.
#[cfg(feature = "proximity")]
pub fn merge_with_proximity_resolver<KR, PR>(
    &mut self,
    source_branch: &str,
    kv_resolver: &KR,
    proximity_resolver: &PR,
) -> Result<gix::ObjectId, GitKvError>
where
    KR: ConflictResolver,
    PR: crate::proximity::ProximityConflictResolver,
{
    let current = self.inner.current_branch.clone();
    let base_commit = self.find_merge_base(&current, source_branch)?;
    let source_commit = self.get_branch_commit(source_branch)?;

    let index_keys: Vec<(String, String)> = self.proximity_indexes.keys().cloned().collect();
    let mut planned: Vec<((String, String), ProximityEntrySet)> = Vec::new();
    let mut conflicts: Vec<crate::proximity::ProximityConflict> = Vec::new();
    for index_key in index_keys {
        let (ns_name, idx_name) = &index_key;
        let base = self
            .load_proximity_entries_at_commit(&base_commit, ns_name, idx_name)?
            .unwrap_or_default();
        let source = self
            .load_proximity_entries_at_commit(&source_commit, ns_name, idx_name)?
            .unwrap_or_default();
        let dest = self
            .proximity_indexes
            .get(&index_key)
            .map(|index| index.entries_snapshot())
            .unwrap_or_default();
        match crate::proximity::merge_proximity_index_sets(
            &base,
            &source,
            &dest,
            proximity_resolver,
        ) {
            Ok(merged) => planned.push((index_key, merged)),
            Err(failure) => conflicts.extend(failure.conflicts),
        }
    }
    if !conflicts.is_empty() {
        return Err(GitKvError::ProximityMergeConflictError(conflicts));
    }

    let primary_commit = self.merge(source_branch, kv_resolver)?;

    let mut touched = false;
    for (index_key, merged_entries) in planned {
        let Some(index) = self.proximity_indexes.get_mut(&index_key) else {
            continue;
        };
        if index.entries_snapshot() == merged_entries {
            continue;
        }
        let (ns_name, idx_name) = &index_key;
        index.replace_entries(merged_entries).map_err(|e| {
            GitKvError::GitObjectError(format!(
                "Failed to install merged proximity entries for {ns_name}:{idx_name}: {e}"
            ))
        })?;
        self.dirty_proximity_indexes.insert(index_key);
        touched = true;
    }

    if touched {
        self.commit("Proximity index merge follow-up")
    } else {
        Ok(primary_commit)
    }
}
/// Reads the persisted entries of proximity index `idx_name` in namespace
/// `ns_name` as of `commit_id`.
///
/// The state is read from the file `prolly_config_proximity:<ns>:<idx>:state`,
/// located relative to the git root (the metadata work dir, else the
/// discovered git root of the dataset directory). Returns `Ok(None)` when
/// the file cannot be read at that commit.
///
/// # Errors
/// Missing git root, a dataset directory outside the git root, or a state
/// file that fails to deserialize.
#[cfg(feature = "proximity")]
fn load_proximity_entries_at_commit(
    &self,
    commit_id: &gix::ObjectId,
    ns_name: &str,
    idx_name: &str,
) -> Result<Option<ProximityEntrySet>, GitKvError> {
    let dataset_dir = self.inner.tree.storage.dataset_dir();
    let git_root = match self.inner.metadata.work_dir() {
        Some(dir) => Some(dir),
        None => VersionedKvStore::<N, GitNodeStorage<N>>::find_git_root(dataset_dir),
    }
    .ok_or_else(|| GitKvError::GitObjectError("Could not find git root".to_string()))?;

    let segments: Vec<String> = dataset_dir
        .strip_prefix(&git_root)
        .map_err(|e| GitKvError::GitObjectError(format!("Failed to get relative path: {e}")))?
        .components()
        .map(|component| component.as_os_str().to_string_lossy().into_owned())
        .collect();
    let state_key = format!("proximity:{ns_name}:{idx_name}:state");
    let file_name = format!("prolly_config_{state_key}");
    let path = if segments.is_empty() {
        file_name
    } else {
        format!("{}/{file_name}", segments.join("/"))
    };

    let Ok(bytes) = self.inner.metadata.read_file_at_commit(commit_id, &path) else {
        return Ok(None);
    };
    crate::proximity::deserialize_persisted_state::<N>(&bytes)
        .map(|state| Some(state.entries))
        .map_err(|e| {
            GitKvError::GitObjectError(format!(
                "Failed to deserialize proximity state at commit: {e}"
            ))
        })
}
/// Converts a flat `V1` store into the namespaced `V2` layout and commits it.
///
/// Every flat key/value pair is copied into a fresh default-namespace tree.
/// The store is then marked `V2` and the result is committed.
///
/// # Errors
/// Fails if the store is already `V2`, or if the migration commit fails.
pub fn migrate_v1_to_v2(&mut self) -> Result<MigrationReport, GitKvError> {
    if matches!(self.format_version, StoreFormatVersion::V2) {
        return Err(GitKvError::GitObjectError(
            "Store is already V2 format".to_string(),
        ));
    }

    let mut flat_pairs = Vec::new();
    for key in self.inner.tree.collect_keys() {
        if let Some(value) = self.inner.get(&key) {
            flat_pairs.push((key, value));
        }
    }
    let keys_migrated = flat_pairs.len();

    let mut migrated_tree =
        ProllyTree::new(self.inner.tree.storage.clone(), TreeConfig::default());
    flat_pairs.into_iter().for_each(|(key, value)| {
        migrated_tree.insert(key, value);
    });
    migrated_tree.persist_root();
    self.namespaces
        .insert(DEFAULT_NAMESPACE.to_string(), migrated_tree);
    self.format_version = StoreFormatVersion::V2;
    self.commit("Migrate store from V1 (flat) to V2 (namespaced)")?;

    Ok(MigrationReport {
        keys_migrated,
        namespaces_created: vec![DEFAULT_NAMESPACE.to_string()],
        storage_version: StoreFormatVersion::V2,
    })
}
/// Merges `source_branch` into the current branch using
/// `IgnoreConflictsResolver` for key conflicts.
pub fn merge_ignore_conflicts(
    &mut self,
    source_branch: &str,
) -> Result<gix::ObjectId, GitKvError> {
    let resolver = IgnoreConflictsResolver;
    self.merge(source_branch, &resolver)
}
pub fn namespace_changed(
&self,
prefix: &str,
commit_a: &str,
commit_b: &str,
) -> Result<bool, GitKvError> {
let commit_id_a = self.resolve_commit(commit_a)?;
let commit_id_b = self.resolve_commit(commit_b)?;
let registry_a = self.load_registry_at_commit(&commit_id_a)?;
let registry_b = self.load_registry_at_commit(&commit_id_b)?;
let hash_a = registry_a.get(prefix).and_then(|e| e.root_hash.clone());
let hash_b = registry_b.get(prefix).and_then(|e| e.root_hash.clone());
Ok(hash_a != hash_b)
}
/// Resolves `reference` to a commit id.
///
/// `refs/heads/<reference>` is tried first; if that ref exists and points directly at an
/// object id, that id is returned. Otherwise `reference` is parsed as a hex object id.
///
/// # Errors
///
/// `GitKvError::GitObjectError("Invalid reference: …")` when the hex parse fails.
fn resolve_commit(&self, reference: &str) -> Result<gix::ObjectId, GitKvError> {
    let branch_ref = format!("refs/heads/{reference}");
    let branch_tip = self
        .inner
        .metadata
        .repo()
        .refs
        .find(&branch_ref)
        .ok()
        .and_then(|found| found.target.try_id().map(|id| id.to_owned()));
    match branch_tip {
        Some(id) => Ok(id),
        // Not a (direct) branch: fall back to treating the input as a hex object id.
        None => gix::ObjectId::from_hex(reference.as_bytes())
            .map_err(|e| GitKvError::GitObjectError(format!("Invalid reference: {e}"))),
    }
}
/// Returns the commit id that `refs/heads/<branch>` points at.
///
/// # Errors
///
/// - `GitKvError::BranchNotFound` if the ref cannot be found.
/// - `GitKvError::GitObjectError` if the ref exists but does not target an object id
///   directly (e.g. it is symbolic).
fn get_branch_commit(&self, branch: &str) -> Result<gix::ObjectId, GitKvError> {
    let full_name = format!("refs/heads/{branch}");
    let reference = self
        .inner
        .metadata
        .repo()
        .refs
        .find(&full_name)
        .map_err(|_| GitKvError::BranchNotFound(branch.to_string()))?;
    let tip = reference.target.try_id().map(|id| id.to_owned());
    tip.ok_or_else(|| {
        GitKvError::GitObjectError(format!("Branch {branch} does not point to a commit"))
    })
}
fn find_merge_base(&self, branch1: &str, branch2: &str) -> Result<gix::ObjectId, GitKvError> {
let commit1 = self.get_branch_commit(branch1)?;
let commit2 = self.get_branch_commit(branch2)?;
let mut visited1 = HashSet::new();
let mut queue1 = std::collections::VecDeque::new();
queue1.push_back(commit1);
while let Some(cid) = queue1.pop_front() {
if !visited1.insert(cid) {
continue;
}
if let Ok(parents) = self.inner.metadata.commit_parents(&cid) {
for p in parents {
if !visited1.contains(&p) {
queue1.push_back(p);
}
}
}
}
let mut visited2 = HashSet::new();
let mut queue2 = std::collections::VecDeque::new();
queue2.push_back(commit2);
while let Some(cid) = queue2.pop_front() {
if !visited2.insert(cid) {
continue;
}
if visited1.contains(&cid) {
return Ok(cid);
}
if let Ok(parents) = self.inner.metadata.commit_parents(&cid) {
for p in parents {
if !visited2.contains(&p) {
queue2.push_back(p);
}
}
}
}
Err(GitKvError::GitObjectError(
"No common ancestor found".to_string(),
))
}
/// Reads the namespace registry as it was recorded at `commit_id`.
///
/// The registry is the JSON file `prolly_namespace_registry` in the dataset directory. It is
/// addressed relative to the git work tree (or to the git root found from the dataset
/// directory). If the file cannot be read at that commit, for any reason, an empty registry
/// is returned.
///
/// # Errors
///
/// Returns `GitKvError::GitObjectError` if:
/// - the git root cannot be located;
/// - the dataset directory is not inside it;
/// - the stored registry is not valid JSON for `HashMap<String, NamespaceEntry<N>>`.
fn load_registry_at_commit(
    &self,
    commit_id: &gix::ObjectId,
) -> Result<HashMap<String, NamespaceEntry<N>>, GitKvError> {
    let dataset_dir = self.inner.tree.storage.dataset_dir();
    let git_root = self
        .inner
        .metadata
        .work_dir()
        .or_else(|| VersionedKvStore::<N, GitNodeStorage<N>>::find_git_root(dataset_dir))
        .ok_or_else(|| GitKvError::GitObjectError("Could not find git root".to_string()))?;
    let dataset_relative = dataset_dir
        .strip_prefix(&git_root)
        .map_err(|e| GitKvError::GitObjectError(format!("Failed to get relative path: {e}")))?;
    // Git tree paths always use '/', regardless of the host path separator.
    let rel_str = dataset_relative
        .components()
        .map(|c| c.as_os_str().to_string_lossy())
        .collect::<Vec<_>>()
        .join("/");
    // A dataset located at the repository root has an empty relative path; joining
    // unconditionally would yield "/prolly_namespace_registry", which is not a tree path.
    let registry_path = if rel_str.is_empty() {
        "prolly_namespace_registry".to_string()
    } else {
        format!("{rel_str}/prolly_namespace_registry")
    };
    match self
        .inner
        .metadata
        .read_file_at_commit(commit_id, &registry_path)
    {
        Ok(data) => serde_json::from_slice(&data).map_err(|e| {
            GitKvError::GitObjectError(format!("Failed to parse registry at commit: {e}"))
        }),
        // Deliberately best-effort: an unreadable/missing registry means "no namespaces".
        Err(_) => Ok(HashMap::new()),
    }
}
/// Returns the persisted registry overlaid with the live state of every loaded namespace.
///
/// Loaded namespaces contribute their current root hash and tree config, overriding any
/// registry entry with the same name. Registry entries for namespaces that are not loaded
/// are kept unchanged.
fn current_registry_snapshot(&self) -> HashMap<String, NamespaceEntry<N>> {
    let live_entries = self.namespaces.iter().map(|(name, tree)| {
        let entry = NamespaceEntry {
            root_hash: tree.get_root_hash(),
            config: tree.config.clone(),
        };
        (name.clone(), entry)
    });
    let mut merged = self.registry.clone();
    merged.extend(live_entries);
    merged
}
fn collect_ns_keys_at_commit(
&self,
ns_name: &str,
commit_id: &gix::ObjectId,
registry: &HashMap<String, NamespaceEntry<N>>,
) -> Result<HashMap<Vec<u8>, Vec<u8>>, GitKvError> {
let entry = match registry.get(ns_name) {
Some(e) => e,
None => return Ok(HashMap::new()),
};
let root_hash = match &entry.root_hash {
Some(h) => h,
None => return Ok(HashMap::new()),
};
let dataset_dir = self.inner.tree.storage.dataset_dir();
let git_root = self
.inner
.metadata
.work_dir()
.or_else(|| VersionedKvStore::<N, GitNodeStorage<N>>::find_git_root(dataset_dir))
.ok_or_else(|| GitKvError::GitObjectError("Could not find git root".to_string()))?;
let dataset_relative = dataset_dir
.strip_prefix(&git_root)
.map_err(|e| GitKvError::GitObjectError(format!("Failed to get relative path: {e}")))?;
let rel_str = dataset_relative
.components()
.map(|c| c.as_os_str().to_string_lossy())
.collect::<Vec<_>>()
.join("/");
let ns_mapping_path = format!("{rel_str}/prolly_ns_hash_mappings");
let global_mapping_path = format!("{rel_str}/prolly_hash_mappings");
let mut hash_mappings: HashMap<ValueDigest<N>, gix::ObjectId> = HashMap::new();
for path in [&ns_mapping_path, &global_mapping_path] {
if let Ok(data) = self.inner.metadata.read_file_at_commit(commit_id, path) {
let mapping_str = String::from_utf8(data).unwrap_or_default();
for line in mapping_str.lines() {
if let Some((hash_hex, object_hex)) = line.split_once(':') {
if hash_hex.len() == N * 2 {
let mut hash_bytes = Vec::new();
for i in 0..N {
if let Ok(byte) =
u8::from_str_radix(&hash_hex[i * 2..i * 2 + 2], 16)
{
hash_bytes.push(byte);
} else {
break;
}
}
if hash_bytes.len() == N {
if let Ok(object_id) =
gix::ObjectId::from_hex(object_hex.as_bytes())
{
let hash = ValueDigest::raw_hash(&hash_bytes);
hash_mappings.insert(hash, object_id);
}
}
}
}
}
}
}
if hash_mappings.is_empty() {
return Ok(HashMap::new());
}
let temp_storage = GitNodeStorage::with_mappings(
self.inner.metadata.clone_repo(),
self.inner.tree.storage.dataset_dir().to_path_buf(),
hash_mappings,
)?;
let mut config = entry.config.clone();
config.root_hash = Some(root_hash.clone());
let tree = match ProllyTree::load_from_storage(temp_storage, config) {
Some(t) => t,
None => return Ok(HashMap::new()),
};
let mut kv = HashMap::new();
for key in tree.collect_keys() {
if let Some(node) = tree.find(&key) {
if let Some(idx) = node.keys.iter().position(|k| k == &key) {
kv.insert(key, node.values[idx].clone());
}
}
}
Ok(kv)
}
}
impl<const N: usize> VersionedKvStore<N, GitNodeStorage<N>, GitMetadataBackend> {
    /// Delegates to `create_merge_commit` for `source_branch`, using the message
    /// `Merge branch '<source_branch>' into '<current branch>'`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `create_merge_commit`.
    pub(crate) fn create_merge_commit_for_namespaced(
        &mut self,
        source_branch: &str,
    ) -> Result<gix::ObjectId, GitKvError> {
        let message = format!(
            "Merge branch '{}' into '{}'",
            source_branch, self.current_branch
        );
        self.create_merge_commit(&message, source_branch)
    }
}
impl<const N: usize> NamespacedKvStore<N, InMemoryNodeStorage<N>, GitMetadataBackend> {
pub fn commit(&mut self, message: &str) -> Result<gix::ObjectId, GitKvError> {
self.commit_impl(message)
}
pub fn init<P: AsRef<Path>>(path: P) -> Result<Self, GitKvError> {
let inner = VersionedKvStore::<N, InMemoryNodeStorage<N>>::init(&path)?;
let mut store = NamespacedKvStore {
inner,
registry: HashMap::new(),
namespaces: HashMap::new(),
namespace_staging: HashMap::new(),
default_namespace: DEFAULT_NAMESPACE.to_string(),
dirty_namespaces: HashSet::new(),
format_version: StoreFormatVersion::V2,
#[cfg(feature = "proximity")]
proximity_indexes: HashMap::new(),
#[cfg(feature = "proximity")]
dirty_proximity_indexes: HashSet::new(),
#[cfg(feature = "proximity")]
text_embedders: HashMap::new(),
#[cfg(feature = "proximity")]
cascade_lists: HashMap::new(),
#[cfg(feature = "proximity")]
text_transformers: HashMap::new(),
#[cfg(feature = "proximity")]
text_chunkers: HashMap::new(),
externalize_threshold: None,
};
let default_tree = ProllyTree::new(store.inner.tree.storage.clone(), TreeConfig::default());
store
.namespaces
.insert(DEFAULT_NAMESPACE.to_string(), default_tree);
store.commit("Initial namespaced store")?;
Ok(store)
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, GitKvError> {
let inner = VersionedKvStore::<N, InMemoryNodeStorage<N>>::open(&path)?;
let dataset_dir = inner
.dataset_dir
.as_ref()
.ok_or_else(|| GitKvError::GitObjectError("Dataset directory not set".into()))?;
let format_version = Self::detect_format_version(dataset_dir);
let mut store = NamespacedKvStore {
inner,
registry: HashMap::new(),
namespaces: HashMap::new(),
namespace_staging: HashMap::new(),
default_namespace: DEFAULT_NAMESPACE.to_string(),
dirty_namespaces: HashSet::new(),
format_version: format_version.clone(),
#[cfg(feature = "proximity")]
proximity_indexes: HashMap::new(),
#[cfg(feature = "proximity")]
dirty_proximity_indexes: HashSet::new(),
#[cfg(feature = "proximity")]
text_embedders: HashMap::new(),
#[cfg(feature = "proximity")]
cascade_lists: HashMap::new(),
#[cfg(feature = "proximity")]
text_transformers: HashMap::new(),
#[cfg(feature = "proximity")]
text_chunkers: HashMap::new(),
externalize_threshold: None,
};
match format_version {
StoreFormatVersion::V2 => {
store.load_namespace_registry()?;
}
StoreFormatVersion::V1 => {
let default_tree =
ProllyTree::new(store.inner.tree.storage.clone(), TreeConfig::default());
store
.namespaces
.insert(DEFAULT_NAMESPACE.to_string(), default_tree);
}
}
Ok(store)
}
}
impl<const N: usize> NamespacedKvStore<N, FileNodeStorage<N>, GitMetadataBackend> {
pub fn commit(&mut self, message: &str) -> Result<gix::ObjectId, GitKvError> {
self.commit_impl(message)
}
pub fn init<P: AsRef<Path>>(path: P) -> Result<Self, GitKvError> {
let inner = VersionedKvStore::<N, FileNodeStorage<N>>::init(&path)?;
let mut store = NamespacedKvStore {
inner,
registry: HashMap::new(),
namespaces: HashMap::new(),
namespace_staging: HashMap::new(),
default_namespace: DEFAULT_NAMESPACE.to_string(),
dirty_namespaces: HashSet::new(),
format_version: StoreFormatVersion::V2,
#[cfg(feature = "proximity")]
proximity_indexes: HashMap::new(),
#[cfg(feature = "proximity")]
dirty_proximity_indexes: HashSet::new(),
#[cfg(feature = "proximity")]
text_embedders: HashMap::new(),
#[cfg(feature = "proximity")]
cascade_lists: HashMap::new(),
#[cfg(feature = "proximity")]
text_transformers: HashMap::new(),
#[cfg(feature = "proximity")]
text_chunkers: HashMap::new(),
externalize_threshold: None,
};
let default_tree = ProllyTree::new(store.inner.tree.storage.clone(), TreeConfig::default());
store
.namespaces
.insert(DEFAULT_NAMESPACE.to_string(), default_tree);
store.commit("Initial namespaced store")?;
Ok(store)
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, GitKvError> {
let inner = VersionedKvStore::<N, FileNodeStorage<N>>::open(&path)?;
let dataset_dir = inner
.dataset_dir
.as_ref()
.ok_or_else(|| GitKvError::GitObjectError("Dataset directory not set".into()))?;
let format_version = Self::detect_format_version(dataset_dir);
let mut store = NamespacedKvStore {
inner,
registry: HashMap::new(),
namespaces: HashMap::new(),
namespace_staging: HashMap::new(),
default_namespace: DEFAULT_NAMESPACE.to_string(),
dirty_namespaces: HashSet::new(),
format_version: format_version.clone(),
#[cfg(feature = "proximity")]
proximity_indexes: HashMap::new(),
#[cfg(feature = "proximity")]
dirty_proximity_indexes: HashSet::new(),
#[cfg(feature = "proximity")]
text_embedders: HashMap::new(),
#[cfg(feature = "proximity")]
cascade_lists: HashMap::new(),
#[cfg(feature = "proximity")]
text_transformers: HashMap::new(),
#[cfg(feature = "proximity")]
text_chunkers: HashMap::new(),
externalize_threshold: None,
};
match format_version {
StoreFormatVersion::V2 => {
store.load_namespace_registry()?;
}
StoreFormatVersion::V1 => {
let default_tree =
ProllyTree::new(store.inner.tree.storage.clone(), TreeConfig::default());
store
.namespaces
.insert(DEFAULT_NAMESPACE.to_string(), default_tree);
}
}
Ok(store)
}
}
impl<const N: usize, S: NodeStorage<N>, M: MetadataBackend> Clone
    for ThreadSafeNamespacedKvStore<N, S, M>
{
    /// Returns another handle to the same shared store: only the `Arc` is cloned, so both
    /// handles lock the same underlying `Mutex`.
    ///
    /// Implemented by hand rather than derived so no `Clone` bounds are imposed on `S`/`M`.
    fn clone(&self) -> Self {
        let shared = Arc::clone(&self.inner);
        Self { inner: shared }
    }
}
impl<const N: usize, S: NodeStorage<N>, M: MetadataBackend> ThreadSafeNamespacedKvStore<N, S, M>
where
VersionedKvStore<N, S, M>: TreeConfigSaver<N>,
{
/// Wraps `store` in an `Arc<Mutex<_>>` so clones of this handle share one store.
///
/// Every method below locks the mutex for the duration of a single delegated call.
pub fn new(store: NamespacedKvStore<N, S, M>) -> Self {
Self {
inner: Arc::new(Mutex::new(store)),
}
}
/// Locks the store and inserts `key` → `value` into namespace `namespace`.
pub fn ns_insert(
&self,
namespace: &str,
key: Vec<u8>,
value: Vec<u8>,
) -> Result<(), GitKvError> {
self.inner.lock().namespace(namespace).insert(key, value)
}
/// Locks the store and reads `key` from namespace `namespace`.
pub fn ns_get(&self, namespace: &str, key: &[u8]) -> Option<Vec<u8>> {
self.inner.lock().namespace(namespace).get(key)
}
/// Locks the store and deletes `key` from namespace `namespace`; the `bool` comes from the
/// namespace handle's `delete`.
pub fn ns_delete(&self, namespace: &str, key: &[u8]) -> Result<bool, GitKvError> {
self.inner.lock().namespace(namespace).delete(key)
}
/// Locks the store and lists the keys of namespace `namespace`.
pub fn ns_list_keys(&self, namespace: &str) -> Vec<Vec<u8>> {
self.inner.lock().namespace(namespace).list_keys()
}
/// Locks the store and returns its namespace names.
pub fn list_namespaces(&self) -> Vec<String> {
self.inner.lock().list_namespaces()
}
/// Locks the store and delegates to `NamespacedKvStore::delete_namespace`.
pub fn delete_namespace(&self, prefix: &str) -> Result<bool, GitKvError> {
self.inner.lock().delete_namespace(prefix)
}
/// Locks the store and returns the root hash recorded for namespace `prefix`, if any.
pub fn get_namespace_root_hash(&self, prefix: &str) -> Option<ValueDigest<N>> {
self.inner.lock().get_namespace_root_hash(prefix)
}
/// Locks the store and delegates to the un-namespaced `insert`.
pub fn insert(&self, key: Vec<u8>, value: Vec<u8>) -> Result<(), GitKvError> {
self.inner.lock().insert(key, value)
}
/// Locks the store and delegates to the un-namespaced `get`.
pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
self.inner.lock().get(key)
}
/// Locks the store and delegates to the un-namespaced `delete`.
pub fn delete(&self, key: &[u8]) -> Result<bool, GitKvError> {
self.inner.lock().delete(key)
}
/// Locks the store and delegates to the un-namespaced `list_keys`.
pub fn list_keys(&self) -> Vec<Vec<u8>> {
self.inner.lock().list_keys()
}
/// Locks the store and creates branch `name`.
pub fn create_branch(&self, name: &str) -> Result<(), GitKvError> {
self.inner.lock().create_branch(name)
}
/// Locks the store and returns an owned copy of the current branch name (the lock is
/// released before returning, so a borrow could not be handed out).
pub fn current_branch(&self) -> String {
self.inner.lock().current_branch().to_string()
}
/// Locks the store and returns its commit log.
pub fn log(&self) -> Result<Vec<CommitInfo>, GitKvError> {
self.inner.lock().log()
}
}
impl<const N: usize> ThreadSafeNamespacedKvStore<N, GitNodeStorage<N>, GitMetadataBackend> {
    /// Locks the store and delegates to its `commit`.
    pub fn commit(&self, message: &str) -> Result<gix::ObjectId, GitKvError> {
        self.inner.lock().commit(message)
    }

    /// Initialises a git-backed namespaced store at `path` and wraps it for shared use.
    pub fn init<P: AsRef<Path>>(path: P) -> Result<Self, GitKvError> {
        Ok(Self::new(GitNamespacedKvStore::init(path)?))
    }

    /// Opens an existing git-backed namespaced store at `path` and wraps it for shared use.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, GitKvError> {
        Ok(Self::new(GitNamespacedKvStore::open(path)?))
    }

    /// Locks the store and delegates to its `checkout`.
    pub fn checkout(&self, branch_or_commit: &str) -> Result<(), GitKvError> {
        self.inner.lock().checkout(branch_or_commit)
    }

    /// Locks the store and merges `source_branch` using `resolver`.
    pub fn merge<R: ConflictResolver>(
        &self,
        source_branch: &str,
        resolver: &R,
    ) -> Result<gix::ObjectId, GitKvError> {
        self.inner.lock().merge(source_branch, resolver)
    }

    /// Locks the store and reports whether namespace `prefix` differs between the two
    /// commits (see `NamespacedKvStore::namespace_changed`).
    pub fn namespace_changed(
        &self,
        prefix: &str,
        commit_a: &str,
        commit_b: &str,
    ) -> Result<bool, GitKvError> {
        self.inner.lock().namespace_changed(prefix, commit_a, commit_b)
    }
}
#[cfg(feature = "proximity")]
/// Ordered (`BTreeMap`) mapping from `Vec<u8>` ids to `f32` vectors.
/// NOTE(review): presumably proximity entry id → embedding; not referenced in this part of
/// the file — confirm usage elsewhere.
type ProximityEntrySet = std::collections::BTreeMap<Vec<u8>, Vec<f32>>;
#[cfg(feature = "proximity")]
/// Storage name for proximity index `idx_name` inside namespace `ns_name`, formatted as
/// `"<ns_name>:<idx_name>"`.
fn proximity_save_name(ns_name: &str, idx_name: &str) -> String {
    [ns_name, idx_name].join(":")
}
#[cfg(feature = "proximity")]
/// Mutable handle onto one proximity index of one namespace in a [`NamespacedKvStore`].
///
/// Produced by `NamespaceHandle::proximity_index`, which loads or creates the index before
/// constructing the handle. The handle's methods look the index up in
/// `store.proximity_indexes` by `(ns_name, idx_name)`.
pub struct ProximityNamespaceHandle<'a, const N: usize, S: NodeStorage<N>, M: MetadataBackend> {
/// Store owning the loaded-index map and the dirty-index set.
store: &'a mut NamespacedKvStore<N, S, M>,
/// Namespace half of the index key.
ns_name: String,
/// Index-name half of the index key.
idx_name: String,
}
#[cfg(feature = "proximity")]
impl<'a, const N: usize, S: NodeStorage<N>, M: MetadataBackend> std::fmt::Debug
    for ProximityNamespaceHandle<'a, N, S, M>
{
    /// Prints only the namespace and index names; the borrowed store is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ProximityNamespaceHandle");
        out.field("ns_name", &self.ns_name);
        out.field("idx_name", &self.idx_name);
        out.finish()
    }
}
#[cfg(feature = "proximity")]
impl<'a, const N: usize, S: NodeStorage<N>, M: MetadataBackend>
    ProximityNamespaceHandle<'a, N, S, M>
{
    /// `(namespace, index name)` key used for `proximity_indexes` and the dirty-index set.
    fn key(&self) -> (String, String) {
        let namespace = self.ns_name.clone();
        let index = self.idx_name.clone();
        (namespace, index)
    }

    /// Stores `vector` under `id` via `ProximityIndex::insert`. On success the index is
    /// marked dirty so the next flush persists it.
    ///
    /// # Errors
    ///
    /// Propagates the error from `ProximityIndex::insert`; the index is then not marked dirty.
    ///
    /// # Panics
    ///
    /// If the index is not loaded. `NamespaceHandle::proximity_index` loads it before
    /// creating this handle.
    pub fn insert(&mut self, id: Vec<u8>, vector: Vec<f32>) -> Result<(), ProximityError> {
        let slot = self.key();
        self.store
            .proximity_indexes
            .get_mut(&slot)
            .expect("proximity index must be loaded by NamespaceHandle::proximity_index")
            .insert(id, vector)?;
        self.store.dirty_proximity_indexes.insert(slot);
        Ok(())
    }

    /// Removes entry `id`. Returns `true` (and marks the index dirty) only if something was
    /// removed; returns `false` if the index is not loaded.
    pub fn remove(&mut self, id: &[u8]) -> bool {
        let slot = self.key();
        let removed = match self.store.proximity_indexes.get_mut(&slot) {
            Some(index) => index.remove(id),
            None => return false,
        };
        if removed {
            self.store.dirty_proximity_indexes.insert(slot);
        }
        removed
    }

    /// Runs `ProximityIndex::knn` with `query`, `k` and search breadth `ef`.
    ///
    /// # Panics
    ///
    /// If the index is not loaded.
    pub fn knn(
        &mut self,
        query: &[f32],
        k: usize,
        ef: usize,
    ) -> Result<Vec<(Vec<u8>, f32)>, ProximityError> {
        let slot = self.key();
        let index = self
            .store
            .proximity_indexes
            .get_mut(&slot)
            .expect("proximity index must be loaded");
        index.knn(query, k, ef)
    }

    /// Number of entries in the index, or 0 if it is not loaded.
    pub fn len(&self) -> usize {
        match self.store.proximity_indexes.get(&self.key()) {
            Some(index) => index.len(),
            None => 0,
        }
    }

    /// `true` when [`Self::len`] is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Owned copy of the index's current root hash, if it has one.
    ///
    /// # Panics
    ///
    /// If the index is not loaded.
    pub fn root_hash(&mut self) -> Result<Option<ValueDigest<N>>, ProximityError> {
        let slot = self.key();
        let index = self
            .store
            .proximity_indexes
            .get_mut(&slot)
            .expect("proximity index must be loaded");
        let hash = index.root_hash()?;
        Ok(hash.cloned())
    }

    /// Copy of the index's configuration.
    ///
    /// # Panics
    ///
    /// If the index is not loaded.
    pub fn config(&self) -> ProximityConfig {
        self.store
            .proximity_indexes
            .get(&self.key())
            .map(|index| index.config().clone())
            .expect("proximity index must be loaded")
    }

    /// Index name within the namespace.
    pub fn name(&self) -> &str {
        self.idx_name.as_str()
    }

    /// Namespace this index belongs to.
    pub fn namespace(&self) -> &str {
        self.ns_name.as_str()
    }
}
#[cfg(feature = "proximity")]
impl<'a, const N: usize, S: NodeStorage<N>, M: MetadataBackend> NamespaceHandle<'a, N, S, M>
where
    VersionedKvStore<N, S, M>: TreeConfigSaver<N>,
{
    /// Returns a handle to proximity index `idx_name` in this namespace, loading it on first
    /// use.
    ///
    /// If the index is not yet in memory, it is loaded from storage under
    /// `"<namespace>:<idx_name>"`. If storage reports `ProximityError::NotFound`, a new empty
    /// index is created with `config`. An index already in memory is reused as-is, and its
    /// config is not compared with `config`.
    ///
    /// # Errors
    ///
    /// - `ProximityError::DimensionMismatch` if a stored index's `dim` differs from
    ///   `config.dim`.
    /// - Any other load error is returned unchanged.
    pub fn proximity_index<'b>(
        &'b mut self,
        idx_name: &str,
        config: ProximityConfig,
    ) -> Result<ProximityNamespaceHandle<'b, N, S, M>, ProximityError>
    where
        'a: 'b,
    {
        let key = (self.ns_name.clone(), idx_name.to_string());
        if !self.store.proximity_indexes.contains_key(&key) {
            let save_name = proximity_save_name(&self.ns_name, idx_name);
            let storage = self.store.inner.tree.storage.clone();
            let index = match ProximityIndex::load(storage.clone(), &save_name) {
                Ok(stored) if stored.config().dim != config.dim => {
                    return Err(ProximityError::DimensionMismatch {
                        expected: stored.config().dim,
                        got: config.dim,
                    });
                }
                Ok(stored) => stored,
                Err(ProximityError::NotFound(_)) => ProximityIndex::new(storage, config),
                Err(other) => return Err(other),
            };
            self.store.proximity_indexes.insert(key, index);
        }
        Ok(ProximityNamespaceHandle {
            store: &mut *self.store,
            ns_name: self.ns_name.clone(),
            idx_name: idx_name.to_string(),
        })
    }

    /// Forgets proximity index `idx_name` in memory and discards its pending dirty mark.
    ///
    /// Returns whether the index was loaded. Only the in-memory maps are touched; nothing
    /// previously persisted is deleted here.
    pub fn drop_proximity_index(&mut self, idx_name: &str) -> bool {
        let key = (self.ns_name.clone(), idx_name.to_string());
        self.store.dirty_proximity_indexes.remove(&key);
        let was_loaded = self.store.proximity_indexes.remove(&key).is_some();
        was_loaded
    }
}
#[cfg(feature = "proximity")]
impl<const N: usize, S: NodeStorage<N>, M: MetadataBackend> NamespacedKvStore<N, S, M>
where
    VersionedKvStore<N, S, M>: TreeConfigSaver<N>,
{
    /// Builds a `namespace -> index name -> ProximityIndexEntry` view of every loaded
    /// proximity index, recording each index's current root hash and config.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by an index's `root_hash()`.
    pub fn proximity_registry_snapshot(
        &mut self,
    ) -> Result<HashMap<String, HashMap<String, ProximityIndexEntry<N>>>, ProximityError> {
        let mut out: HashMap<String, HashMap<String, ProximityIndexEntry<N>>> = HashMap::new();
        // `root_hash` needs `&mut`, so walk the map mutably in one pass rather than cloning
        // the key list and re-looking each entry up with `unwrap`.
        for ((ns, idx_name), idx) in self.proximity_indexes.iter_mut() {
            let entry = ProximityIndexEntry {
                root_hash: idx.root_hash()?.cloned(),
                config: idx.config().clone(),
            };
            out.entry(ns.clone())
                .or_default()
                .insert(idx_name.clone(), entry);
        }
        Ok(out)
    }

    /// Persists every proximity index marked dirty, under its `"<ns>:<idx>"` save name.
    ///
    /// Each dirty mark is cleared only after that index has been persisted (or when the index
    /// is no longer loaded). If `persist` fails, the failing index and any not yet processed
    /// stay marked dirty, so a later flush retries them. Previously the whole set was drained
    /// up front, and one failure silently dropped every remaining mark.
    ///
    /// # Errors
    ///
    /// `GitKvError::GitObjectError` wrapping the first `persist` failure.
    pub(crate) fn flush_dirty_proximity_indexes(&mut self) -> Result<(), GitKvError> {
        let pending: Vec<(String, String)> =
            self.dirty_proximity_indexes.iter().cloned().collect();
        for key in pending {
            if let Some(idx) = self.proximity_indexes.get_mut(&key) {
                let (ns_name, idx_name) = (&key.0, &key.1);
                let save_name = proximity_save_name(ns_name, idx_name);
                idx.persist(&save_name).map_err(|e| {
                    GitKvError::GitObjectError(format!(
                        "Failed to persist proximity index {ns_name}:{idx_name}: {e}"
                    ))
                })?;
            }
            self.dirty_proximity_indexes.remove(&key);
        }
        Ok(())
    }
}
#[cfg(feature = "proximity")]
/// State key for text index `idx_name` in namespace `ns_name`. It is `text_state_key` applied
/// to the qualified name `"<ns_name>:<idx_name>"`.
fn text_index_state_key(ns_name: &str, idx_name: &str) -> String {
    let qualified = [ns_name, idx_name].join(":");
    text_state_key(&qualified)
}
#[cfg(feature = "proximity")]
/// Handle onto one text index in one namespace.
///
/// Documents are split with `chunker`, each chunk is embedded with `embedder`, and the
/// resulting vectors are stored in a backing proximity index keyed by `inner_idx_key`.
pub struct TextNamespaceHandle<'a, const N: usize, S, M>
where
S: NodeStorage<N>,
M: MetadataBackend,
{
/// Store holding the backing proximity index and the dirty-index set.
store: &'a mut NamespacedKvStore<N, S, M>,
/// Namespace this text index belongs to.
ns_name: String,
/// User-facing text index name.
idx_name: String,
/// Key of the backing index in `store.proximity_indexes`. When built by
/// `NamespaceHandle::text_index`, it is `(ns_name, text_inner_proximity_name(idx_name))`.
inner_idx_key: (String, String),
/// Embedder used for both indexed chunks and search queries.
embedder: Arc<dyn Embedder>,
/// Splits document text into chunks before embedding.
chunker: Arc<dyn Chunker>,
}
#[cfg(feature = "proximity")]
impl<'a, const N: usize, S, M> std::fmt::Debug for TextNamespaceHandle<'a, N, S, M>
where
    S: NodeStorage<N>,
    M: MetadataBackend,
{
    /// Prints the names plus embedder/chunker identity; the borrowed store is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("TextNamespaceHandle");
        out.field("ns_name", &self.ns_name)
            .field("idx_name", &self.idx_name);
        out.field("embedder_id", &self.embedder.id());
        out.field("dim", &self.embedder.dim());
        out.field("chunker_id", &self.chunker.id());
        out.finish()
    }
}
#[cfg(feature = "proximity")]
impl<'a, const N: usize, S, M> TextNamespaceHandle<'a, N, S, M>
where
    S: NodeStorage<N>,
    M: MetadataBackend,
{
    /// Indexes `text` as document `id`, replacing whatever was indexed for `id` before.
    ///
    /// Steps, in order:
    /// 1. Existing chunks for `id` are removed. This happens even if a later step fails.
    /// 2. The text is split with the handle's chunker. If that yields no chunks, the document
    ///    is left unindexed and `Ok(())` is returned.
    /// 3. Every chunk is embedded before the index is modified, so an embedder error cannot
    ///    leave a partial set of new chunks behind.
    /// 4. The index is marked dirty before the first insert, so whatever does get written is
    ///    persisted by the next flush even if a later insert fails.
    ///
    /// # Errors
    ///
    /// Propagates embedder errors and proximity-index insert errors.
    ///
    /// # Panics
    ///
    /// If the backing proximity index is not loaded. `NamespaceHandle::text_index` loads it
    /// before constructing this handle.
    pub fn insert(&mut self, id: &[u8], text: &str) -> Result<(), TextIndexError> {
        self.delete_chunks_for_doc(id);
        let chunks = self.chunker.split(text);
        if chunks.is_empty() {
            return Ok(());
        }
        let mut vectors = Vec::with_capacity(chunks.len());
        for chunk_text in chunks.iter() {
            vectors.push(self.embedder.embed(chunk_text)?);
        }
        self.store
            .dirty_proximity_indexes
            .insert(self.inner_idx_key.clone());
        let idx = self
            .store
            .proximity_indexes
            .get_mut(&self.inner_idx_key)
            .expect("inner proximity index must be loaded");
        for (chunk_idx, vec) in vectors.into_iter().enumerate() {
            let chunk_id = make_chunk_id(id, chunk_idx as u32);
            idx.insert(chunk_id, vec)?;
        }
        Ok(())
    }

    /// Removes all chunks of document `id`. Returns whether anything was removed.
    pub fn delete(&mut self, id: &[u8]) -> bool {
        self.delete_chunks_for_doc(id)
    }

    /// Searches for the documents nearest to `query`.
    ///
    /// The query is embedded, and `k * OVERFETCH_MULTIPLIER` nearest chunks are fetched (but
    /// never fewer than `k`). The chunk hits are then collapsed per document by
    /// `dedup_chunk_hits_by_doc`, which is passed `k` as the limit. `k == 0` returns an empty
    /// list without embedding.
    ///
    /// # Errors
    ///
    /// Propagates embedder and `knn` errors.
    ///
    /// # Panics
    ///
    /// If the backing proximity index is not loaded.
    pub fn search(&mut self, query: &str, k: usize) -> Result<Vec<TextHit>, TextIndexError> {
        if k == 0 {
            return Ok(Vec::new());
        }
        let q = self.embedder.embed(query)?;
        // Saturating arithmetic: a very large `k` must not overflow (a panic in debug builds).
        let raw_k = k.saturating_mul(OVERFETCH_MULTIPLIER).max(k);
        let ef = raw_k.saturating_mul(4).max(32);
        let idx = self
            .store
            .proximity_indexes
            .get_mut(&self.inner_idx_key)
            .expect("inner proximity index must be loaded");
        let chunk_hits = idx.knn(&q, raw_k, ef)?;
        Ok(dedup_chunk_hits_by_doc(chunk_hits, k))
    }

    /// Number of distinct documents with at least one entry in the backing index.
    ///
    /// Keys that do not parse as chunk ids count as their own document. Returns 0 if the
    /// index is not loaded. Walks a full entries snapshot.
    pub fn len(&self) -> usize {
        let idx = match self.store.proximity_indexes.get(&self.inner_idx_key) {
            Some(i) => i,
            None => return 0,
        };
        let mut docs: std::collections::HashSet<Vec<u8>> = std::collections::HashSet::new();
        for k in idx.entries_snapshot().keys() {
            match crate::proximity::text_index::parse_chunk_id(k) {
                Some((doc, _)) => {
                    docs.insert(doc);
                }
                None => {
                    docs.insert(k.clone());
                }
            }
        }
        docs.len()
    }

    /// Number of entries (chunks) in the backing index, or 0 if it is not loaded.
    pub fn chunk_count(&self) -> usize {
        self.store
            .proximity_indexes
            .get(&self.inner_idx_key)
            .map_or(0, |i| i.len())
    }

    /// `true` when the backing index holds no chunks.
    pub fn is_empty(&self) -> bool {
        self.chunk_count() == 0
    }

    /// Removes every entry whose key starts with `doc_id_prefix(doc_id)`.
    ///
    /// Marks the index dirty if anything was removed, and returns whether anything was.
    fn delete_chunks_for_doc(&mut self, doc_id: &[u8]) -> bool {
        let idx = match self.store.proximity_indexes.get_mut(&self.inner_idx_key) {
            Some(i) => i,
            None => return false,
        };
        let prefix = doc_id_prefix(doc_id);
        let to_remove: Vec<Vec<u8>> = idx
            .entries_snapshot()
            .keys()
            .filter(|k| k.starts_with(&prefix))
            .cloned()
            .collect();
        let mut any = false;
        for cid in to_remove {
            if idx.remove(&cid) {
                any = true;
            }
        }
        if any {
            self.store
                .dirty_proximity_indexes
                .insert(self.inner_idx_key.clone());
        }
        any
    }

    /// User-facing text index name.
    pub fn name(&self) -> &str {
        &self.idx_name
    }

    /// Namespace this text index belongs to.
    pub fn namespace(&self) -> &str {
        &self.ns_name
    }

    /// Embedder used by this handle.
    pub fn embedder(&self) -> &dyn Embedder {
        &*self.embedder
    }
}
#[cfg(feature = "proximity")]
impl<'a, const N: usize, S: NodeStorage<N>, M: MetadataBackend> NamespaceHandle<'a, N, S, M>
where
VersionedKvStore<N, S, M>: TreeConfigSaver<N>,
{
/// Opens (or creates) text index `idx_name` in this namespace and returns a handle to it.
///
/// Steps, in order:
/// 1. `validate_or_write_text_identity` is called with this index's state key, the
///    embedder, and `config`'s metric / level_bits / max_bucket_size. Its exact checks live
///    in `proximity::text_index`.
/// 2. If the backing proximity index (local name from `text_inner_proximity_name`) is not
///    loaded, it is loaded from storage. When storage reports `NotFound`, it is instead
///    created empty with a `ProximityConfig` derived from `config`.
/// 3. The embedder and chunker are registered for `(namespace, idx_name)`, replacing any
///    previously registered ones.
///
/// # Errors
///
/// - Errors from the identity check are propagated via `?`.
/// - `TextIndexError::DimensionMismatch` if a stored backing index's `dim` differs from the
///   embedder's.
/// - Other load errors are wrapped in `TextIndexError::Proximity`.
pub fn text_index<'b, E: Embedder + 'static>(
&'b mut self,
idx_name: &str,
config: TextIndexConfig<E>,
) -> Result<TextNamespaceHandle<'b, N, S, M>, TextIndexError>
where
'a: 'b,
{
let state_key = text_index_state_key(&self.ns_name, idx_name);
let storage = self.store.inner.tree.storage.clone();
validate_or_write_text_identity::<N, _, _>(
&storage,
&state_key,
&config.embedder,
config.metric,
config.level_bits,
config.max_bucket_size,
)?;
// The backing proximity index uses a derived local name, so it is keyed separately
// from the user-facing `idx_name` (which keys the embedder/chunker maps).
let inner_local_name = text_inner_proximity_name(idx_name);
let proximity_save_name_full = proximity_save_name(&self.ns_name, &inner_local_name);
let inner_idx_key = (self.ns_name.clone(), inner_local_name);
let embedder_key = (self.ns_name.clone(), idx_name.to_string());
if !self.store.proximity_indexes.contains_key(&inner_idx_key) {
let prox_config = ProximityConfig {
dim: config.embedder.dim(),
metric: config.metric,
level_bits: config.level_bits,
max_bucket_size: config.max_bucket_size,
};
let inner = match ProximityIndex::load(storage.clone(), &proximity_save_name_full) {
Ok(loaded) => {
// Only the dimension is re-checked here against the live embedder.
if loaded.config().dim != config.embedder.dim() {
return Err(TextIndexError::DimensionMismatch {
stored: loaded.config().dim,
got: config.embedder.dim(),
});
}
loaded
}
Err(ProximityError::NotFound(_)) => ProximityIndex::new(storage, prox_config),
Err(e) => return Err(TextIndexError::Proximity(e)),
};
self.store
.proximity_indexes
.insert(inner_idx_key.clone(), inner);
}
// Fields of `config` are moved out below, so every read of `config` must come first.
let arc_embedder: Arc<dyn Embedder> = Arc::new(config.embedder);
self.store
.text_embedders
.insert(embedder_key.clone(), arc_embedder.clone());
let arc_chunker: Arc<dyn Chunker> = config.chunker;
self.store
.text_chunkers
.insert(embedder_key, arc_chunker.clone());
Ok(TextNamespaceHandle {
store: &mut *self.store,
ns_name: self.ns_name.clone(),
idx_name: idx_name.to_string(),
inner_idx_key,
embedder: arc_embedder,
chunker: arc_chunker,
})
}
/// Forgets text index `idx_name` in memory.
///
/// Removes its embedder, chunker and value-transformer registrations and the backing
/// proximity index, and discards the backing index's pending dirty mark. Returns whether a
/// backing index was loaded. Only in-memory maps are touched; persisted data is not
/// deleted here.
pub fn drop_text_index(&mut self, idx_name: &str) -> bool {
let inner_idx_key = (self.ns_name.clone(), text_inner_proximity_name(idx_name));
let embedder_key = (self.ns_name.clone(), idx_name.to_string());
self.store.dirty_proximity_indexes.remove(&inner_idx_key);
self.store.text_embedders.remove(&embedder_key);
self.store.text_chunkers.remove(&embedder_key);
self.store.text_transformers.remove(&embedder_key);
self.store
.proximity_indexes
.remove(&inner_idx_key)
.is_some()
}
}
#[cfg(feature = "proximity")]
impl<const N: usize, S: NodeStorage<N>, M: MetadataBackend> NamespacedKvStore<N, S, M>
where
    VersionedKvStore<N, S, M>: TreeConfigSaver<N>,
{
    /// Records the list of text-index names associated with namespace `ns_name`, replacing
    /// any previous list. (How the list is consumed is defined elsewhere.)
    pub fn set_cascade(&mut self, ns_name: &str, text_indexes: Vec<String>) {
        let owned_ns = ns_name.to_owned();
        self.cascade_lists.insert(owned_ns, text_indexes);
    }

    /// Removes any cascade list recorded for `ns_name`.
    pub fn clear_cascade(&mut self, ns_name: &str) {
        self.cascade_lists.remove(ns_name);
    }

    /// Returns the cascade list recorded for `ns_name`, if any.
    pub fn cascade_for_namespace(&self, ns_name: &str) -> Option<&[String]> {
        self.cascade_lists.get(ns_name).map(Vec::as_slice)
    }

    /// Registers `transformer` (bytes → optional text) for text index `idx_name` of
    /// namespace `ns_name`, replacing any existing one.
    pub fn set_value_transformer<F>(&mut self, ns_name: &str, idx_name: &str, transformer: F)
    where
        F: Fn(&[u8]) -> Option<String> + Send + Sync + 'static,
    {
        let key = (ns_name.to_owned(), idx_name.to_owned());
        self.text_transformers.insert(key, Arc::new(transformer));
    }

    /// Removes the value transformer for `(ns_name, idx_name)`; returns whether one existed.
    pub fn clear_value_transformer(&mut self, ns_name: &str, idx_name: &str) -> bool {
        let key = (ns_name.to_owned(), idx_name.to_owned());
        let removed = self.text_transformers.remove(&key);
        removed.is_some()
    }

    /// Reports whether a value transformer is registered for `(ns_name, idx_name)`.
    pub fn has_value_transformer(&self, ns_name: &str, idx_name: &str) -> bool {
        let key = (ns_name.to_owned(), idx_name.to_owned());
        self.text_transformers.contains_key(&key)
    }
}
/// Counters describing a blob garbage-collection pass.
///
/// NOTE(review): the field meanings below are inferred from their names; the code that
/// fills this report is not in this part of the file.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BlobGcReport {
    /// Blobs considered by the pass.
    pub total: usize,
    /// Blobs found to still be referenced.
    pub referenced: usize,
    /// Blobs deleted by the pass.
    pub removed: usize,
    /// Error messages collected during the pass.
    pub errors: Vec<String>,
}
impl BlobGcReport {
    /// Blobs left after the pass: `total - removed`, clamped at zero.
    ///
    /// Saturating subtraction keeps an inconsistent report (`removed > total`) from
    /// panicking on underflow in debug builds or wrapping to a huge count in release builds.
    pub fn remaining(&self) -> usize {
        self.total.saturating_sub(self.removed)
    }
}
#[cfg(feature = "proximity")]
/// Result of comparing a text index with its namespace's primary keys.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TextIndexAudit {
    /// Document ids present in the text index but not among the namespace's keys.
    pub orphans_in_index: Vec<Vec<u8>>,
    /// Namespace keys with no entry in the text index.
    pub missing_from_index: Vec<Vec<u8>>,
}
#[cfg(feature = "proximity")]
impl TextIndexAudit {
    /// `true` when neither discrepancy list has any entries.
    pub fn is_in_sync(&self) -> bool {
        [&self.orphans_in_index, &self.missing_from_index]
            .iter()
            .all(|ids| ids.is_empty())
    }
}
#[cfg(feature = "proximity")]
impl<const N: usize, S: NodeStorage<N>, M: MetadataBackend> NamespacedKvStore<N, S, M>
where
    VersionedKvStore<N, S, M>: TreeConfigSaver<N>,
{
    /// Compares the document ids in text index `idx_name` with the keys of namespace
    /// `ns_name`.
    ///
    /// Index entries are mapped to document ids with `parse_chunk_id`. Keys that do not parse
    /// as chunk ids count as their own document. Both result lists are in ascending order.
    ///
    /// # Errors
    ///
    /// `TextIndexError::NotFound("<ns>:<idx>")` if the backing proximity index is not loaded.
    pub fn audit_text_index(
        &mut self,
        ns_name: &str,
        idx_name: &str,
    ) -> Result<TextIndexAudit, TextIndexError> {
        let inner_idx_key = (ns_name.to_string(), text_inner_proximity_name(idx_name));
        let index_ids: std::collections::BTreeSet<Vec<u8>> =
            match self.proximity_indexes.get(&inner_idx_key) {
                Some(idx) => idx
                    .entries_snapshot()
                    .keys()
                    .map(|chunk_id| {
                        crate::proximity::text_index::parse_chunk_id(chunk_id)
                            .map(|(doc_id, _)| doc_id)
                            .unwrap_or_else(|| chunk_id.clone())
                    })
                    .collect(),
                None => return Err(TextIndexError::NotFound(format!("{ns_name}:{idx_name}"))),
            };
        // The namespace handle is a temporary, dropped as soon as its keys are collected.
        let primary_ids: std::collections::BTreeSet<Vec<u8>> =
            self.namespace(ns_name).list_keys().into_iter().collect();
        // `BTreeSet::difference` yields elements in ascending order, so no extra sort is needed.
        let orphans_in_index = index_ids.difference(&primary_ids).cloned().collect();
        let missing_from_index = primary_ids.difference(&index_ids).cloned().collect();
        Ok(TextIndexAudit {
            orphans_in_index,
            missing_from_index,
        })
    }

    /// Removes every entry that [`Self::audit_text_index`] attributes to an orphaned document,
    /// and returns the number of orphaned documents.
    ///
    /// Entries are matched with the same chunk-id → document mapping the audit uses, so
    /// everything counted as an orphan is actually removed. This includes keys that are not
    /// chunk ids; the previous `doc_id_prefix` match could presumably miss those. The index
    /// is snapshotted once rather than once per orphan. When orphans exist, the index is
    /// marked dirty.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`Self::audit_text_index`].
    pub fn purge_text_index_orphans(
        &mut self,
        ns_name: &str,
        idx_name: &str,
    ) -> Result<usize, TextIndexError> {
        let report = self.audit_text_index(ns_name, idx_name)?;
        if report.orphans_in_index.is_empty() {
            return Ok(0);
        }
        let orphan_docs: HashSet<&[u8]> = report
            .orphans_in_index
            .iter()
            .map(|doc_id| doc_id.as_slice())
            .collect();
        let inner_idx_key = (ns_name.to_string(), text_inner_proximity_name(idx_name));
        let idx = self
            .proximity_indexes
            .get_mut(&inner_idx_key)
            .expect("loaded by audit_text_index");

        let mut doomed: Vec<Vec<u8>> = Vec::new();
        for chunk_id in idx.entries_snapshot().keys() {
            let owner = crate::proximity::text_index::parse_chunk_id(chunk_id)
                .map(|(doc_id, _)| doc_id);
            let is_orphan = match &owner {
                Some(doc_id) => orphan_docs.contains(doc_id.as_slice()),
                None => orphan_docs.contains(chunk_id.as_slice()),
            };
            if is_orphan {
                doomed.push(chunk_id.clone());
            }
        }
        for chunk_id in doomed {
            idx.remove(&chunk_id);
        }
        self.dirty_proximity_indexes.insert(inner_idx_key);
        Ok(report.orphans_in_index.len())
    }
}