use crate::model::{Triple, TriplePattern};
use crate::OxirsError;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Configuration for the content-addressed immutable storage backend.
#[derive(Debug, Clone)]
pub struct ImmutableConfig {
/// Root directory; `blocks/` and `refs/` subdirectories are created under it.
pub path: PathBuf,
/// Target block size in bytes. NOTE(review): not read by any code in this
/// file — confirm whether it is consumed elsewhere or dead.
pub block_size: usize,
/// When true, identical triples are stored only once (see `DeduplicationIndex`).
pub deduplication: bool,
/// Stored into `MerkleTree::depth`; currently informational only.
pub merkle_depth: usize,
/// Garbage-collection policy (auto-GC threshold and minimum block age).
pub gc_policy: GarbageCollectionPolicy,
}
impl Default for ImmutableConfig {
    /// Defaults: storage under `/var/oxirs/immutable`, 4 KiB blocks,
    /// deduplication on, Merkle depth 4, default GC policy.
    fn default() -> Self {
        Self {
            path: PathBuf::from("/var/oxirs/immutable"),
            gc_policy: GarbageCollectionPolicy::default(),
            deduplication: true,
            merkle_depth: 4,
            block_size: 4096,
        }
    }
}
/// Policy controlling when unreachable blocks are collected.
#[derive(Debug, Clone)]
pub struct GarbageCollectionPolicy {
/// Run GC automatically. NOTE(review): not consulted by `garbage_collect`
/// in this file — presumably checked by a scheduler elsewhere; confirm.
pub auto_gc: bool,
/// Fraction of garbage that triggers collection (0.0–1.0 assumed).
pub threshold: f64,
/// Minimum block age in hours before it may be collected.
pub min_age_hours: u32,
}
impl Default for GarbageCollectionPolicy {
fn default() -> Self {
GarbageCollectionPolicy {
auto_gc: true,
threshold: 0.2,
min_age_hours: 24,
}
}
}
/// 32-byte SHA-256 digest used as the content address of every block.
pub type ContentHash = [u8; 32];
/// Content-addressed, append-only triple store. Blocks are persisted to
/// disk, deduplicated, linked into a Merkle tree, and garbage-collected
/// by reachability from commit roots. All shared state is behind
/// `Arc<RwLock<…>>` for use across async tasks.
pub struct ImmutableStorage {
config: ImmutableConfig,
// On-disk block files plus in-memory LRU cache and metadata.
blocks: Arc<RwLock<BlockStore>>,
// Merkle tree over the data blocks of the latest commit.
merkle_tree: Arc<RwLock<MerkleTree>>,
// Forward/backward reference graph used by GC reachability.
references: Arc<RwLock<ReferenceTracker>>,
// Maps triples to the block that already contains them.
dedup_index: Arc<RwLock<DeduplicationIndex>>,
// Running counters (block counts, GC reclaim totals).
stats: Arc<RwLock<ImmutableStats>>,
}
/// On-disk block directory plus an in-memory decoded-block LRU cache.
struct BlockStore {
// Directory holding one file per block, named by hex-encoded hash.
path: PathBuf,
// LRU cache of decoded (decompressed) blocks.
cache: lru::LruCache<ContentHash, Block>,
// Per-block bookkeeping; also serves as the authoritative block index.
metadata: HashMap<ContentHash, BlockMetadata>,
}
/// A single content-addressed storage unit.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Block {
// SHA-256 of `data` (see `compute_hash`).
hash: ContentHash,
block_type: BlockType,
// Serialized payload; interpretation depends on `block_type`.
data: Vec<u8>,
// Hashes of blocks this block points to (e.g. a commit → its tree).
references: Vec<ContentHash>,
}
/// Discriminates how a block's `data` payload should be decoded.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum BlockType {
// Serialized `Vec<Triple>`.
TripleData,
// Index structure (currently unused — `create_index_blocks` is a stub).
Index,
// Serialized `MerkleNode`.
MerkleNode,
// Serialized `Commit`.
Commit,
// Manifest/metadata block (not yet produced in this file).
Manifest,
}
/// Per-block bookkeeping kept in `BlockStore::metadata`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BlockMetadata {
created_at: chrono::DateTime<chrono::Utc>,
// Uncompressed payload size in bytes (the on-disk file is compressed).
size: usize,
// Compression codec name, currently always `Some("zstd")`.
compression: Option<String>,
// NOTE(review): never incremented in this file — confirm intended use.
ref_count: u32,
}
/// In-memory Merkle tree over data-block hashes; rebuilt on each commit.
struct MerkleTree {
// Root hash of the most recently built tree, if any.
root: Option<ContentHash>,
// All interior nodes keyed by their hash (leaves are data-block hashes
// and are not stored here).
nodes: HashMap<ContentHash, MerkleNode>,
#[allow(dead_code)]
depth: usize,
}
/// One node of the Merkle tree; interior nodes carry `left`/`right`,
/// `data` is reserved for leaf payload references.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MerkleNode {
hash: ContentHash,
left: Option<ContentHash>,
right: Option<ContentHash>,
data: Option<ContentHash>,
}
/// Reference graph used for GC reachability analysis.
struct ReferenceTracker {
// block → blocks it references (edges followed during mark phase).
forward_refs: HashMap<ContentHash, HashSet<ContentHash>>,
// block → blocks referencing it (maintained for reverse lookups).
backward_refs: HashMap<ContentHash, HashSet<ContentHash>>,
// GC roots; every commit hash is inserted here and never removed.
roots: HashSet<ContentHash>,
}
/// Lookup structures for triple-level deduplication.
struct DeduplicationIndex {
#[allow(dead_code)]
fingerprints: HashMap<u64, Vec<ContentHash>>,
// Triple → hash of the block that already stores it.
triple_blocks: HashMap<Triple, ContentHash>,
}
/// Running storage counters; updated by `store_triples` and `garbage_collect`.
#[derive(Debug, Default)]
struct ImmutableStats {
// Cumulative blocks written (data blocks + commit blocks).
total_blocks: u64,
// Blocks that were not deduplicated away.
unique_blocks: u64,
#[allow(dead_code)]
total_size: u64,
#[allow(dead_code)]
dedup_savings: u64,
// Bytes reclaimed by garbage collection (uncompressed sizes).
gc_reclaimed: u64,
}
/// An immutable commit: a named snapshot pointing at a Merkle tree root.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Commit {
/// Commit identity (see `compute_commit_hash`; derived from tree + message).
pub hash: ContentHash,
/// Parent commits. NOTE(review): always empty in this file — history
/// chaining is presumably wired up elsewhere; confirm.
pub parents: Vec<ContentHash>,
/// Root hash of the Merkle tree over this commit's data blocks.
pub tree: ContentHash,
pub author: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub message: String,
pub metadata: HashMap<String, String>,
}
impl ImmutableStorage {
/// Creates an immutable storage instance rooted at `config.path`, creating
/// the `blocks/` and `refs/` subdirectories if they do not exist.
///
/// # Errors
/// Propagates filesystem errors from directory creation.
pub async fn new(config: ImmutableConfig) -> Result<Self, OxirsError> {
    std::fs::create_dir_all(&config.path)?;
    std::fs::create_dir_all(config.path.join("blocks"))?;
    std::fs::create_dir_all(config.path.join("refs"))?;
    // Capacity of the decoded-block LRU cache. The original built the same
    // NonZeroUsize constant twice (new(..).unwrap_or(new(1000).expect(..)));
    // a single expect on the literal is equivalent and clearer.
    let cache_capacity =
        std::num::NonZeroUsize::new(1000).expect("cache capacity constant is non-zero");
    Ok(ImmutableStorage {
        config: config.clone(),
        blocks: Arc::new(RwLock::new(BlockStore {
            path: config.path.join("blocks"),
            cache: lru::LruCache::new(cache_capacity),
            metadata: HashMap::new(),
        })),
        merkle_tree: Arc::new(RwLock::new(MerkleTree {
            root: None,
            nodes: HashMap::new(),
            depth: config.merkle_depth,
        })),
        references: Arc::new(RwLock::new(ReferenceTracker {
            forward_refs: HashMap::new(),
            backward_refs: HashMap::new(),
            roots: HashSet::new(),
        })),
        dedup_index: Arc::new(RwLock::new(DeduplicationIndex {
            fingerprints: HashMap::new(),
            triple_blocks: HashMap::new(),
        })),
        stats: Arc::new(RwLock::new(ImmutableStats::default())),
    })
}
pub async fn store_triples(
&self,
triples: &[Triple],
message: &str,
) -> Result<Commit, OxirsError> {
let mut block_hashes = Vec::new();
let mut blocks_guard = self.blocks.write().await;
let mut dedup_guard = self.dedup_index.write().await;
for chunk in triples.chunks(100) {
if self.config.deduplication {
let mut unique_triples = Vec::new();
for triple in chunk {
if !dedup_guard.triple_blocks.contains_key(triple) {
unique_triples.push(triple.clone());
}
}
if !unique_triples.is_empty() {
let block = self.create_triple_block(&unique_triples)?;
let hash = block.hash;
for triple in unique_triples {
dedup_guard.triple_blocks.insert(triple, hash);
}
self.store_block(&mut blocks_guard, block).await?;
block_hashes.push(hash);
}
} else {
let block = self.create_triple_block(chunk)?;
let hash = block.hash;
self.store_block(&mut blocks_guard, block).await?;
block_hashes.push(hash);
}
}
let index_blocks = self.create_index_blocks(&block_hashes)?;
for block in index_blocks {
self.store_block(&mut blocks_guard, block).await?;
}
let tree_root = self.build_merkle_tree(&block_hashes).await?;
let commit = Commit {
hash: self.compute_commit_hash(&tree_root, message),
parents: vec![], tree: tree_root,
author: "system".to_string(),
timestamp: chrono::Utc::now(),
message: message.to_string(),
metadata: HashMap::new(),
};
let commit_block = Block {
hash: commit.hash,
block_type: BlockType::Commit,
data: oxicode::serde::encode_to_vec(&commit, oxicode::config::standard())?,
references: vec![tree_root],
};
self.store_block(&mut blocks_guard, commit_block).await?;
self.update_references(&commit).await?;
let mut stats = self.stats.write().await;
stats.total_blocks += block_hashes.len() as u64 + 1;
stats.unique_blocks += block_hashes.len() as u64 + 1;
Ok(commit)
}
/// Loads the commit identified by `commit_hash` and returns every triple
/// reachable from its tree.
///
/// # Errors
/// Fails if the commit block or any referenced data block cannot be loaded
/// or decoded.
pub async fn read_commit(&self, commit_hash: ContentHash) -> Result<Vec<Triple>, OxirsError> {
    let store = self.blocks.read().await;
    // Decode the commit object itself.
    let raw_commit = self.load_block(&store, &commit_hash).await?;
    let (commit, _): (Commit, usize) =
        oxicode::serde::decode_from_slice(&raw_commit.data, oxicode::config::standard())?;
    // Resolve the tree to its data blocks, then decode each triple batch.
    let data_hashes = self.find_triple_blocks(&commit.tree).await?;
    let mut result = Vec::new();
    for data_hash in &data_hashes {
        let block = self.load_block(&store, data_hash).await?;
        let (mut triples, _): (Vec<Triple>, usize) =
            oxicode::serde::decode_from_slice(&block.data, oxicode::config::standard())?;
        result.append(&mut triples);
    }
    Ok(result)
}
/// Returns the triples of a commit (or of the current head when
/// `commit_hash` is `None`) that match `pattern`.
///
/// # Errors
/// Fails when no head exists or the commit cannot be read.
pub async fn query_triples(
    &self,
    pattern: &TriplePattern,
    commit_hash: Option<ContentHash>,
) -> Result<Vec<Triple>, OxirsError> {
    let target = match commit_hash {
        Some(hash) => hash,
        None => self.get_head().await?,
    };
    // Load everything, then keep only the matches in place.
    let mut matches = self.read_commit(target).await?;
    matches.retain(|triple| pattern.matches(triple));
    Ok(matches)
}
/// Re-hashes every known block and checks it against its content address,
/// then validates the Merkle root (shallow check — see `verify_merkle_tree`).
///
/// # Errors
/// Propagates failures from Merkle-tree verification; individual block
/// load failures are reported, not propagated.
pub async fn verify_integrity(&self) -> Result<IntegrityReport, OxirsError> {
    let store = self.blocks.read().await;
    let tree = self.merkle_tree.read().await;
    let mut total = 0usize;
    let mut verified = 0usize;
    let mut corrupted = Vec::new();
    let mut missing = Vec::new();
    for expected in store.metadata.keys() {
        total += 1;
        match self.load_block(&store, expected).await {
            // Block loads and its recomputed digest matches its address.
            Ok(block) if self.compute_hash(&block.data) == *expected => verified += 1,
            // Block loads but the content no longer matches its hash.
            Ok(_) => corrupted.push(*expected),
            // Block file is unreadable or gone.
            Err(_) => missing.push(*expected),
        }
    }
    // A tree with no root (nothing committed yet) counts as valid.
    let merkle_valid = match tree.root {
        Some(root) => self.verify_merkle_tree(&tree, root).await?,
        None => true,
    };
    Ok(IntegrityReport {
        total_blocks: total,
        verified_blocks: verified,
        corrupted_blocks: corrupted,
        missing_blocks: missing,
        merkle_valid,
    })
}
/// Mark-and-sweep garbage collection: marks every block reachable from
/// the commit roots via `forward_refs`, then deletes all unreachable
/// blocks from disk, cache, metadata, and the reference graph.
///
/// NOTE(review): triple data blocks are only reachable when the commit's
/// tree hash equals the data block hash (single-block commits); interior
/// Merkle nodes are not stored as blocks, so multi-block commits may have
/// collectable data blocks — confirm intended reachability model.
pub async fn garbage_collect(&self) -> Result<GCReport, OxirsError> {
// Lock order: references, then blocks — held for the whole sweep so no
// writer can add blocks between mark and sweep.
let mut refs = self.references.write().await;
let mut blocks = self.blocks.write().await;
let mut report = GCReport {
total_blocks: blocks.metadata.len(),
reachable_blocks: 0,
collected_blocks: 0,
reclaimed_bytes: 0,
};
// Mark phase: depth-first walk from all roots following forward refs.
let mut reachable = HashSet::new();
let mut to_visit: Vec<_> = refs.roots.iter().cloned().collect();
while let Some(hash) = to_visit.pop() {
// `insert` returning true means first visit — only then expand children.
if reachable.insert(hash) {
if let Some(children) = refs.forward_refs.get(&hash) {
to_visit.extend(children.iter().cloned());
}
}
}
report.reachable_blocks = reachable.len();
// Sweep phase: everything in metadata not marked above is garbage.
let unreachable: Vec<_> = blocks
.metadata
.keys()
.filter(|hash| !reachable.contains(*hash))
.cloned()
.collect();
for hash in unreachable {
if let Some(metadata) = blocks.metadata.remove(&hash) {
report.collected_blocks += 1;
// `size` is the uncompressed payload length, so this is an estimate
// of reclaimed disk space (files are stored compressed).
report.reclaimed_bytes += metadata.size;
// Best-effort file removal; a missing file is not an error here.
let block_path = blocks.path.join(hex::encode(hash));
let _ = std::fs::remove_file(block_path);
blocks.cache.pop(&hash);
refs.forward_refs.remove(&hash);
// Remove the dead block from every backward-reference set.
for (_, back_refs) in refs.backward_refs.iter_mut() {
back_refs.remove(&hash);
}
}
}
let mut stats = self.stats.write().await;
stats.gc_reclaimed += report.reclaimed_bytes as u64;
Ok(report)
}
/// Serializes a batch of triples into a content-addressed `TripleData`
/// block whose hash is the SHA-256 of the serialized payload.
///
/// # Errors
/// Propagates serialization failures.
fn create_triple_block(&self, triples: &[Triple]) -> Result<Block, OxirsError> {
    let payload = oxicode::serde::encode_to_vec(&triples, oxicode::config::standard())?;
    let content_hash = self.compute_hash(&payload);
    Ok(Block {
        hash: content_hash,
        block_type: BlockType::TripleData,
        references: Vec::new(),
        data: payload,
    })
}
/// Placeholder: index blocks are not yet implemented, so this always
/// returns an empty set. Callers store whatever it returns, keeping the
/// commit path forward-compatible once real index blocks are produced.
fn create_index_blocks(&self, _data_blocks: &[ContentHash]) -> Result<Vec<Block>, OxirsError> {
Ok(Vec::new())
}
/// Builds a binary Merkle tree over `blocks` (leaf = data-block hash),
/// stores the interior nodes, records the root, and returns it.
///
/// A single leaf yields that leaf's hash as the root (no interior nodes).
/// An odd node at any level is paired with itself (left duplicated).
///
/// # Errors
/// Currently infallible beyond lock acquisition; kept as `Result` for
/// interface stability.
async fn build_merkle_tree(&self, blocks: &[ContentHash]) -> Result<ContentHash, OxirsError> {
    let mut merkle = self.merkle_tree.write().await;
    // Bug fix: the original indexed `current_level[0]` unconditionally and
    // panicked on empty input. Define the empty tree's root as the hash of
    // the empty byte string and record a childless node for it so the root
    // remains verifiable.
    if blocks.is_empty() {
        let root = self.compute_hash(&[]);
        merkle.nodes.insert(
            root,
            MerkleNode {
                hash: root,
                left: None,
                right: None,
                data: None,
            },
        );
        merkle.root = Some(root);
        return Ok(root);
    }
    let mut current_level = blocks.to_vec();
    while current_level.len() > 1 {
        let mut next_level = Vec::with_capacity((current_level.len() + 1) / 2);
        for pair in current_level.chunks(2) {
            let left = pair[0];
            // Odd node: duplicate the left child as its own sibling.
            let right = pair.get(1).copied().unwrap_or(left);
            let combined = [left.as_slice(), right.as_slice()].concat();
            let parent_hash = self.compute_hash(&combined);
            merkle.nodes.insert(
                parent_hash,
                MerkleNode {
                    hash: parent_hash,
                    left: Some(left),
                    right: Some(right),
                    data: None,
                },
            );
            next_level.push(parent_hash);
        }
        current_level = next_level;
    }
    let root = current_level[0];
    merkle.root = Some(root);
    Ok(root)
}
/// Persists a block to disk (zstd-compressed, filename = hex hash) and
/// registers it in the metadata map and LRU cache.
///
/// NOTE(review): `std::fs::write` is blocking inside an async fn —
/// consider `tokio::fs` or `spawn_blocking` if this runs on runtime workers.
async fn store_block(&self, blocks: &mut BlockStore, block: Block) -> Result<(), OxirsError> {
let hash = block.hash;
// Recorded size is the *uncompressed* payload length; the on-disk file
// holds the compressed encoding, so GC's reclaimed-byte count is an
// estimate.
let size = block.data.len();
let block_path = blocks.path.join(hex::encode(hash));
let compressed = self.compress_block(&block)?;
std::fs::write(block_path, compressed)?;
blocks.metadata.insert(
hash,
BlockMetadata {
created_at: chrono::Utc::now(),
size,
compression: Some("zstd".to_string()),
ref_count: 0,
},
);
// Cache the decoded block so the next read avoids disk + decompression.
blocks.cache.put(hash, block);
Ok(())
}
/// Fetches a block by content hash, preferring the in-memory cache and
/// falling back to reading and decompressing the on-disk file.
///
/// `peek` is used instead of `get` because only a shared borrow of the
/// store is held here, so LRU recency cannot be updated on a cache hit.
///
/// # Errors
/// Fails if the block file is missing/unreadable or fails to decode.
async fn load_block(
    &self,
    blocks: &BlockStore,
    hash: &ContentHash,
) -> Result<Block, OxirsError> {
    match blocks.cache.peek(hash) {
        Some(cached) => Ok(cached.clone()),
        None => {
            let block_path = blocks.path.join(hex::encode(hash));
            let raw = std::fs::read(block_path)?;
            self.decompress_block(&raw)
        }
    }
}
/// Resolves a Merkle tree root to the data blocks beneath it.
///
/// NOTE(review): stub — returns the root hash itself, which is only correct
/// for single-leaf trees (where root == data-block hash, as built by
/// `build_merkle_tree` for one block). Multi-block commits need a real
/// tree walk here; confirm before relying on multi-block reads.
async fn find_triple_blocks(
&self,
tree_hash: &ContentHash,
) -> Result<Vec<ContentHash>, OxirsError> {
Ok(vec![*tree_hash])
}
async fn update_references(&self, commit: &Commit) -> Result<(), OxirsError> {
let mut refs = self.references.write().await;
refs.roots.insert(commit.hash);
refs.forward_refs
.entry(commit.hash)
.or_insert_with(HashSet::new)
.insert(commit.tree);
refs.backward_refs
.entry(commit.tree)
.or_insert_with(HashSet::new)
.insert(commit.hash);
Ok(())
}
/// Returns a current head commit hash.
///
/// NOTE(review): `HashSet` iteration order is arbitrary, so with multiple
/// root commits this returns an arbitrary one — confirm intended semantics.
///
/// # Errors
/// `OxirsError::Store` when no commit has been recorded yet.
async fn get_head(&self) -> Result<ContentHash, OxirsError> {
    let refs = self.references.read().await;
    match refs.roots.iter().next() {
        Some(hash) => Ok(*hash),
        None => Err(OxirsError::Store("No commits found".to_string())),
    }
}
/// Shallow Merkle validation: only checks that the root node exists in the
/// node map. NOTE(review): does not recompute child hashes recursively, and
/// single-leaf roots (which are data-block hashes, not stored nodes) will
/// report `false` — confirm whether a full recursive check is intended.
async fn verify_merkle_tree(
&self,
merkle: &MerkleTree,
root: ContentHash,
) -> Result<bool, OxirsError> {
Ok(merkle.nodes.contains_key(&root))
}
/// SHA-256 digest of `data`, used as the content address for all blocks.
fn compute_hash(&self, data: &[u8]) -> ContentHash {
    // One-shot digest (equivalent to new() + update() + finalize()).
    Sha256::digest(data).into()
}
/// Derives a commit's identity from its tree root and message.
///
/// NOTE(review): the hash covers only tree + message — two commits with the
/// same tree and message (e.g. created at different times) collide. Consider
/// also hashing parents/author/timestamp; changing this alters all stored
/// commit addresses, so it needs a migration plan.
fn compute_commit_hash(&self, tree: &ContentHash, message: &str) -> ContentHash {
let mut hasher = Sha256::new();
hasher.update(tree);
hasher.update(message.as_bytes());
hasher.finalize().into()
}
/// Serializes a block with oxicode and compresses it with zstd (level 3)
/// for on-disk storage. Inverse of `decompress_block`.
fn compress_block(&self, block: &Block) -> Result<Vec<u8>, OxirsError> {
let serialized = oxicode::serde::encode_to_vec(block, oxicode::config::standard())?;
oxiarc_zstd::encode_all(&serialized, 3)
.map_err(|e| OxirsError::Io(format!("Zstd compression failed: {}", e)))
}
/// Decompresses and deserializes an on-disk block payload. Inverse of
/// `compress_block`; the trailing byte count from decode is discarded.
fn decompress_block(&self, data: &[u8]) -> Result<Block, OxirsError> {
let decompressed = oxiarc_zstd::decode_all(data)
.map_err(|e| OxirsError::Io(format!("Zstd decompression failed: {}", e)))?;
oxicode::serde::decode_from_slice(&decompressed, oxicode::config::standard())
.map(|(v, _)| v)
.map_err(Into::into)
}
}
/// Result of `verify_integrity`: per-block hash verification outcome plus
/// the (shallow) Merkle-root validity flag.
#[derive(Debug)]
pub struct IntegrityReport {
pub total_blocks: usize,
pub verified_blocks: usize,
/// Blocks whose recomputed hash no longer matches their address.
pub corrupted_blocks: Vec<ContentHash>,
/// Blocks present in metadata but unreadable from disk.
pub missing_blocks: Vec<ContentHash>,
pub merkle_valid: bool,
}
/// Result of `garbage_collect`.
#[derive(Debug)]
pub struct GCReport {
pub total_blocks: usize,
pub reachable_blocks: usize,
pub collected_blocks: usize,
/// Sum of uncompressed payload sizes of collected blocks (an estimate of
/// disk space freed, since files are stored compressed).
pub reclaimed_bytes: usize,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode};

    /// Per-test scratch directory under the OS temp dir. The process id
    /// keeps concurrent `cargo test` invocations from colliding, and any
    /// stale directory from an earlier run is removed so tests start from
    /// a clean state. (The original used fixed `/tmp/...` paths, which
    /// leaked state across runs and are not portable.)
    fn scratch_dir(name: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!("oxirs_{}_{}", name, std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        dir
    }

    #[tokio::test]
    async fn test_immutable_storage() {
        let config = ImmutableConfig {
            path: scratch_dir("immutable_test"),
            ..Default::default()
        };
        let storage = ImmutableStorage::new(config)
            .await
            .expect("async operation should succeed");
        let triples = vec![
            Triple::new(
                NamedNode::new("http://example.org/s1").expect("valid IRI"),
                NamedNode::new("http://example.org/p1").expect("valid IRI"),
                crate::model::Object::NamedNode(
                    NamedNode::new("http://example.org/o1").expect("valid IRI"),
                ),
            ),
            Triple::new(
                NamedNode::new("http://example.org/s2").expect("valid IRI"),
                NamedNode::new("http://example.org/p2").expect("valid IRI"),
                crate::model::Object::Literal(Literal::new("test")),
            ),
        ];
        let commit = storage
            .store_triples(&triples, "Initial commit")
            .await
            .expect("operation should succeed");
        // Round-trip: everything stored must come back.
        let loaded = storage
            .read_commit(commit.hash)
            .await
            .expect("async operation should succeed");
        assert_eq!(loaded.len(), 2);
        // Pattern query: only the s1 triple matches.
        let pattern = TriplePattern::new(
            Some(crate::model::SubjectPattern::NamedNode(
                NamedNode::new("http://example.org/s1").expect("valid IRI"),
            )),
            None,
            None,
        );
        let results = storage
            .query_triples(&pattern, Some(commit.hash))
            .await
            .expect("operation should succeed");
        assert_eq!(results.len(), 1);
    }

    #[tokio::test]
    async fn test_deduplication() {
        let config = ImmutableConfig {
            path: scratch_dir("immutable_dedup"),
            deduplication: true,
            ..Default::default()
        };
        let storage = ImmutableStorage::new(config)
            .await
            .expect("async operation should succeed");
        let triple = Triple::new(
            NamedNode::new("http://example.org/s").expect("valid IRI"),
            NamedNode::new("http://example.org/p").expect("valid IRI"),
            crate::model::Object::Literal(Literal::new("value")),
        );
        // Three identical triples must not produce three blocks.
        let triples = vec![triple.clone(), triple.clone(), triple.clone()];
        let _commit = storage
            .store_triples(&triples, "Dedup test")
            .await
            .expect("async operation should succeed");
        let stats = storage.stats.read().await;
        assert!(stats.unique_blocks < 3);
    }
}