use crate::digest::ValueDigest;
use crate::git::types::GitKvError;
use crate::node::ProllyNode;
use gix::prelude::*;
use lru::LruCache;
use std::collections::HashMap;
use std::num::NonZeroUsize;
/// Default capacity of the in-memory node LRU cache (entries, not bytes).
const DEFAULT_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).unwrap();
use parking_lot::Mutex;
use std::sync::Arc;
use super::{NodeStorage, StorageError};
/// Prolly-tree node storage backed by a git object database.
///
/// Nodes are serialized with bincode and written as git blobs; a
/// hash → object-id table (mirrored to `prolly_hash_mappings` on disk)
/// locates them again, and an LRU cache fronts the object store.
#[derive(Debug)]
pub struct GitNodeStorage<const N: usize> {
    // Shared handle to the git repository whose object store holds the blobs.
    _repository: Arc<Mutex<gix::Repository>>,
    // LRU cache of recently used nodes, keyed by their content hash.
    cache: Mutex<LruCache<ValueDigest<N>, Arc<ProllyNode<N>>>>,
    // In-memory config values, mirrored to `prolly_config_<key>` files.
    configs: Mutex<HashMap<String, Vec<u8>>>,
    // Maps a node's content hash to the git object id of its serialized blob;
    // persisted line-by-line in `prolly_hash_mappings`.
    hash_to_object_id: Mutex<HashMap<ValueDigest<N>, gix::ObjectId>>,
    // Directory where config files and hash mappings are persisted.
    dataset_dir: std::path::PathBuf,
}
impl<const N: usize> Clone for GitNodeStorage<N> {
    /// Clones the storage handle, sharing the underlying repository.
    ///
    /// The config and hash-mapping tables are copied from memory rather than
    /// re-read from disk (the previous implementation reloaded
    /// `prolly_hash_mappings`, silently dropping any mapping whose
    /// best-effort disk write had failed). The node cache starts empty but
    /// keeps the original's capacity instead of resetting to the default.
    fn clone(&self) -> Self {
        Self {
            _repository: Arc::clone(&self._repository),
            cache: Mutex::new(LruCache::new(self.cache.lock().cap())),
            configs: Mutex::new(self.configs.lock().clone()),
            hash_to_object_id: Mutex::new(self.hash_to_object_id.lock().clone()),
            dataset_dir: self.dataset_dir.clone(),
        }
    }
}
impl<const N: usize> GitNodeStorage<N> {
pub fn dataset_dir(&self) -> &std::path::Path {
&self.dataset_dir
}
pub fn get_hash_mappings(&self) -> HashMap<ValueDigest<N>, gix::ObjectId> {
self.hash_to_object_id.lock().clone()
}
pub fn merge_hash_mappings(&self, other_mappings: HashMap<ValueDigest<N>, gix::ObjectId>) {
let mut map = self.hash_to_object_id.lock();
map.extend(other_mappings);
}
pub fn new(
repository: gix::Repository,
dataset_dir: std::path::PathBuf,
) -> Result<Self, GitKvError> {
let cache_size = DEFAULT_CACHE_SIZE;
let storage = GitNodeStorage {
_repository: Arc::new(Mutex::new(repository)),
cache: Mutex::new(LruCache::new(cache_size)),
configs: Mutex::new(HashMap::new()),
hash_to_object_id: Mutex::new(HashMap::new()),
dataset_dir,
};
storage.load_hash_mappings();
Ok(storage)
}
pub fn with_cache_size(
repository: gix::Repository,
dataset_dir: std::path::PathBuf,
cache_size: usize,
) -> Result<Self, GitKvError> {
let cache_size = NonZeroUsize::new(cache_size).unwrap_or(DEFAULT_CACHE_SIZE);
let storage = GitNodeStorage {
_repository: Arc::new(Mutex::new(repository)),
cache: Mutex::new(LruCache::new(cache_size)),
configs: Mutex::new(HashMap::new()),
hash_to_object_id: Mutex::new(HashMap::new()),
dataset_dir,
};
storage.load_hash_mappings();
Ok(storage)
}
pub fn with_mappings(
repository: gix::Repository,
dataset_dir: std::path::PathBuf,
hash_mappings: HashMap<ValueDigest<N>, gix::ObjectId>,
) -> Result<Self, GitKvError> {
let cache_size = DEFAULT_CACHE_SIZE;
let storage = GitNodeStorage {
_repository: Arc::new(Mutex::new(repository)),
cache: Mutex::new(LruCache::new(cache_size)),
configs: Mutex::new(HashMap::new()),
hash_to_object_id: Mutex::new(hash_mappings),
dataset_dir,
};
Ok(storage)
}
fn store_node_as_blob(&self, node: &ProllyNode<N>) -> Result<gix::ObjectId, GitKvError> {
let serialized = bincode::serialize(node)?;
let repo = self._repository.lock();
let blob = gix::objs::Blob { data: serialized };
let blob_id = repo
.objects
.write(&blob)
.map_err(|e| GitKvError::GitObjectError(format!("Failed to write blob: {e}")))?;
Ok(blob_id)
}
fn load_node_from_blob(&self, blob_id: &gix::ObjectId) -> Result<ProllyNode<N>, GitKvError> {
let repo = self._repository.lock();
let mut buffer = Vec::new();
let object = repo.objects.find(blob_id, &mut buffer).map_err(|e| {
GitKvError::GitObjectError(format!("Failed to find blob {blob_id}: {e}"))
})?;
let node: ProllyNode<N> =
bincode::deserialize(object.data).map_err(GitKvError::SerializationError)?;
Ok(node)
}
}
impl<const N: usize> NodeStorage<N> for GitNodeStorage<N> {
    /// Looks up a node by content hash: cache first, then the git object
    /// store via the hash → object-id mapping. Returns `None` when the hash
    /// is unknown or the blob fails to load/deserialize.
    fn get_node_by_hash(&self, hash: &ValueDigest<N>) -> Option<Arc<ProllyNode<N>>> {
        // Use `get` rather than `peek` so a cache hit refreshes the entry's
        // recency; with `peek`, order was only updated on `put`, which
        // defeats the LRU eviction policy.
        if let Some(node) = self.cache.lock().get(hash) {
            return Some(Arc::clone(node));
        }
        let object_id = self.hash_to_object_id.lock().get(hash).cloned()?;
        let node = Arc::new(self.load_node_from_blob(&object_id).ok()?);
        self.cache.lock().put(hash.clone(), Arc::clone(&node));
        Some(node)
    }

    /// Writes `node` as a git blob, records its hash → object-id mapping,
    /// and caches it. The on-disk mapping file is only appended to the first
    /// time a hash is seen, to avoid duplicate lines.
    fn insert_node(
        &mut self,
        hash: ValueDigest<N>,
        node: ProllyNode<N>,
    ) -> Result<(), StorageError> {
        let already_exists = self.hash_to_object_id.lock().contains_key(&hash);
        self.cache.lock().put(hash.clone(), Arc::new(node.clone()));
        let blob_id = self
            .store_node_as_blob(&node)
            .map_err(|e| StorageError::Other(e.to_string()))?;
        self.hash_to_object_id.lock().insert(hash.clone(), blob_id);
        if !already_exists {
            self.save_hash_mapping(&hash, &blob_id);
        }
        Ok(())
    }

    /// Drops the node from the cache and the in-memory mapping table.
    // NOTE(review): the git blob and the persisted `prolly_hash_mappings`
    // line are left in place, so the mapping reappears after a reload —
    // confirm this is intentional.
    fn delete_node(&mut self, hash: &ValueDigest<N>) -> Result<(), StorageError> {
        self.cache.lock().pop(hash);
        self.hash_to_object_id.lock().remove(hash);
        Ok(())
    }

    /// Stores `config` in memory and mirrors it to `prolly_config_<key>` in
    /// the dataset directory.
    fn save_config(&self, key: &str, config: &[u8]) {
        let mut configs = self.configs.lock();
        configs.insert(key.to_string(), config.to_vec());
        let config_path = self.dataset_dir.join(format!("prolly_config_{key}"));
        // Best-effort: write errors are ignored, the in-memory copy remains
        // authoritative for this process.
        let _ = std::fs::write(config_path, config);
    }

    /// Returns a config value, reading through to the on-disk copy (and
    /// caching it in memory) when it is not already loaded.
    fn get_config(&self, key: &str) -> Option<Vec<u8>> {
        if let Some(config) = self.configs.lock().get(key).cloned() {
            return Some(config);
        }
        let config_path = self.dataset_dir.join(format!("prolly_config_{key}"));
        if let Ok(config) = std::fs::read(config_path) {
            self.configs.lock().insert(key.to_string(), config.clone());
            return Some(config);
        }
        None
    }
}
impl<const N: usize> GitNodeStorage<N> {
    /// Appends one `"<hash-hex>:<object-id-hex>\n"` line to the
    /// `prolly_hash_mappings` file in the dataset directory.
    ///
    /// Best-effort: I/O errors are ignored, matching `save_config`.
    fn save_hash_mapping(&self, hash: &ValueDigest<N>, object_id: &gix::ObjectId) {
        use std::io::Write;

        let mapping_path = self.dataset_dir.join("prolly_hash_mappings");
        let hash_hex: String = hash.0.iter().map(|b| format!("{b:02x}")).collect();
        let line = format!("{hash_hex}:{}\n", object_id.to_hex());
        // Append only the new line. The previous implementation re-read and
        // rewrote the entire file on every call, making n insertions O(n²).
        let _ = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(mapping_path)
            .and_then(|mut file| file.write_all(line.as_bytes()));
    }

    /// Loads all persisted hash → object-id mappings into the in-memory
    /// table. Malformed lines are skipped; later lines override earlier ones
    /// for the same hash.
    fn load_hash_mappings(&self) {
        let mapping_path = self.dataset_dir.join("prolly_hash_mappings");
        let Ok(mappings) = std::fs::read_to_string(mapping_path) else {
            return;
        };
        let mut hash_map = self.hash_to_object_id.lock();
        for line in mappings.lines() {
            let Some((hash_hex, object_hex)) = line.split_once(':') else {
                continue;
            };
            // `is_ascii` guards the byte-offset slicing below: the original
            // could panic on a corrupt file containing multi-byte UTF-8.
            if hash_hex.len() != N * 2 || !hash_hex.is_ascii() {
                continue;
            }
            let mut hash_array = [0u8; N];
            let mut valid = true;
            for (i, slot) in hash_array.iter_mut().enumerate() {
                match u8::from_str_radix(&hash_hex[i * 2..i * 2 + 2], 16) {
                    Ok(byte) => *slot = byte,
                    Err(_) => {
                        valid = false;
                        break;
                    }
                }
            }
            if !valid {
                continue;
            }
            if let Ok(object_id) = gix::ObjectId::from_hex(object_hex.as_bytes()) {
                hash_map.insert(ValueDigest(hash_array), object_id);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::TreeConfig;
    use crate::node::ProllyNode;
    use tempfile::TempDir;

    /// Initializes a bare git repository inside a fresh temp directory.
    /// The `TempDir` is returned so the caller keeps it alive.
    fn create_test_repo() -> (TempDir, gix::Repository) {
        let dir = TempDir::new().unwrap();
        let repository = gix::init_bare(dir.path()).unwrap();
        (dir, repository)
    }

    /// Builds a small two-entry leaf node from the default tree config.
    fn create_test_node<const N: usize>() -> ProllyNode<N> {
        let cfg: TreeConfig<N> = TreeConfig::default();
        ProllyNode {
            keys: vec![b"key1".to_vec(), b"key2".to_vec()],
            key_schema: cfg.key_schema.clone(),
            values: vec![b"value1".to_vec(), b"value2".to_vec()],
            value_schema: cfg.value_schema.clone(),
            is_leaf: true,
            level: 0,
            base: cfg.base,
            modulus: cfg.modulus,
            min_chunk_size: cfg.min_chunk_size,
            max_chunk_size: cfg.max_chunk_size,
            pattern: cfg.pattern,
            split: false,
            merged: false,
            encode_types: Vec::new(),
            encode_values: Vec::new(),
        }
    }

    #[test]
    fn test_git_node_storage_basic_operations() {
        let (tmp, repo) = create_test_repo();
        let mut storage = GitNodeStorage::<32>::new(repo, tmp.path().to_path_buf()).unwrap();

        let node = create_test_node();
        let hash = node.get_hash();

        // Round-trip: insert, read back, compare the payload fields.
        storage.insert_node(hash.clone(), node.clone()).unwrap();
        let fetched = storage.get_node_by_hash(&hash);
        assert!(fetched.is_some());
        let fetched = fetched.unwrap();
        assert_eq!(fetched.keys, node.keys);
        assert_eq!(fetched.values, node.values);
        assert_eq!(fetched.is_leaf, node.is_leaf);

        storage.delete_node(&hash).unwrap();
    }

    #[test]
    fn test_cache_functionality() {
        let (tmp, repo) = create_test_repo();
        let dataset = tmp.path().join("dataset");
        std::fs::create_dir_all(&dataset).unwrap();
        let mut storage = GitNodeStorage::<32>::with_cache_size(repo, dataset, 2).unwrap();

        let node = create_test_node();
        let hash = node.get_hash();
        storage.insert_node(hash.clone(), node.clone()).unwrap();

        // Insertion must populate the LRU cache and serve subsequent reads.
        assert!(storage.cache.lock().contains(&hash));
        assert!(storage.get_node_by_hash(&hash).is_some());
    }
}