1use crate::model::{Triple, TriplePattern};
7use crate::OxirsError;
8use serde::{Deserialize, Serialize};
9use sha2::{Digest, Sha256};
10use std::collections::{HashMap, HashSet};
11use std::path::PathBuf;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15#[derive(Debug, Clone)]
17pub struct ImmutableConfig {
18 pub path: PathBuf,
20 pub block_size: usize,
22 pub deduplication: bool,
24 pub merkle_depth: usize,
26 pub gc_policy: GarbageCollectionPolicy,
28}
29
30impl Default for ImmutableConfig {
31 fn default() -> Self {
32 ImmutableConfig {
33 path: PathBuf::from("/var/oxirs/immutable"),
34 block_size: 4096,
35 deduplication: true,
36 merkle_depth: 4,
37 gc_policy: GarbageCollectionPolicy::default(),
38 }
39 }
40}
41
/// Garbage-collection settings carried by `ImmutableConfig`.
///
/// NOTE(review): none of these fields are read by `ImmutableStorage` in this
/// file; `garbage_collect` must be called explicitly and ignores them.
#[derive(Debug, Clone, PartialEq)]
pub struct GarbageCollectionPolicy {
    /// Presumably enables automatic collection — TODO confirm. Defaults to `true`.
    pub auto_gc: bool,
    /// Presumably the fraction of unreachable data that should trigger a
    /// collection — TODO confirm semantics. Defaults to `0.2`.
    pub threshold: f64,
    /// Presumably the minimum age, in hours, before an unreachable block may be
    /// collected — TODO confirm. Defaults to `24`.
    pub min_age_hours: u32,
}
52
53impl Default for GarbageCollectionPolicy {
54 fn default() -> Self {
55 GarbageCollectionPolicy {
56 auto_gc: true,
57 threshold: 0.2,
58 min_age_hours: 24,
59 }
60 }
61}
62
/// 32-byte SHA-256 digest used as the address of blocks, Merkle nodes and commits.
pub type ContentHash = [u8; 32];
65
66pub struct ImmutableStorage {
68 config: ImmutableConfig,
69 blocks: Arc<RwLock<BlockStore>>,
71 merkle_tree: Arc<RwLock<MerkleTree>>,
73 references: Arc<RwLock<ReferenceTracker>>,
75 dedup_index: Arc<RwLock<DeduplicationIndex>>,
77 stats: Arc<RwLock<ImmutableStats>>,
79}
80
81struct BlockStore {
83 path: PathBuf,
85 cache: lru::LruCache<ContentHash, Block>,
87 metadata: HashMap<ContentHash, BlockMetadata>,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
93struct Block {
94 hash: ContentHash,
96 block_type: BlockType,
98 data: Vec<u8>,
100 references: Vec<ContentHash>,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
106enum BlockType {
107 TripleData,
109 Index,
111 MerkleNode,
113 Commit,
115 Manifest,
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
121struct BlockMetadata {
122 created_at: chrono::DateTime<chrono::Utc>,
124 size: usize,
126 compression: Option<String>,
128 ref_count: u32,
130}
131
132struct MerkleTree {
134 root: Option<ContentHash>,
136 nodes: HashMap<ContentHash, MerkleNode>,
138 #[allow(dead_code)]
140 depth: usize,
141}
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
145struct MerkleNode {
146 hash: ContentHash,
148 left: Option<ContentHash>,
150 right: Option<ContentHash>,
152 data: Option<ContentHash>,
154}
155
156struct ReferenceTracker {
158 forward_refs: HashMap<ContentHash, HashSet<ContentHash>>,
160 backward_refs: HashMap<ContentHash, HashSet<ContentHash>>,
162 roots: HashSet<ContentHash>,
164}
165
166struct DeduplicationIndex {
168 #[allow(dead_code)]
170 fingerprints: HashMap<u64, Vec<ContentHash>>,
171 triple_blocks: HashMap<Triple, ContentHash>,
173}
174
/// Running counters about the store.
#[derive(Debug, Default)]
struct ImmutableStats {
    /// Incremented by `store_triples` for every data block and commit it writes.
    total_blocks: u64,
    /// Incremented alongside `total_blocks`.
    /// NOTE(review): not adjusted for deduplication hits.
    unique_blocks: u64,
    /// Not maintained yet; kept for future reporting, hence the `dead_code` allowance.
    #[allow(dead_code)]
    total_size: u64,
    /// Not maintained yet; kept for future reporting, hence the `dead_code` allowance.
    #[allow(dead_code)]
    dedup_savings: u64,
    /// Cumulative bytes reclaimed by garbage collection.
    gc_reclaimed: u64,
}
186
187#[derive(Debug, Clone, Serialize, Deserialize)]
189pub struct Commit {
190 pub hash: ContentHash,
192 pub parents: Vec<ContentHash>,
194 pub tree: ContentHash,
196 pub author: String,
198 pub timestamp: chrono::DateTime<chrono::Utc>,
200 pub message: String,
202 pub metadata: HashMap<String, String>,
204}
205
206impl ImmutableStorage {
207 pub async fn new(config: ImmutableConfig) -> Result<Self, OxirsError> {
209 std::fs::create_dir_all(&config.path)?;
210 std::fs::create_dir_all(config.path.join("blocks"))?;
211 std::fs::create_dir_all(config.path.join("refs"))?;
212
213 let cache_size = 1000; Ok(ImmutableStorage {
216 config: config.clone(),
217 blocks: Arc::new(RwLock::new(BlockStore {
218 path: config.path.join("blocks"),
219 cache: lru::LruCache::new(
220 std::num::NonZeroUsize::new(cache_size).unwrap_or(
221 std::num::NonZeroUsize::new(1000).expect("constant is non-zero"),
222 ),
223 ),
224 metadata: HashMap::new(),
225 })),
226 merkle_tree: Arc::new(RwLock::new(MerkleTree {
227 root: None,
228 nodes: HashMap::new(),
229 depth: config.merkle_depth,
230 })),
231 references: Arc::new(RwLock::new(ReferenceTracker {
232 forward_refs: HashMap::new(),
233 backward_refs: HashMap::new(),
234 roots: HashSet::new(),
235 })),
236 dedup_index: Arc::new(RwLock::new(DeduplicationIndex {
237 fingerprints: HashMap::new(),
238 triple_blocks: HashMap::new(),
239 })),
240 stats: Arc::new(RwLock::new(ImmutableStats::default())),
241 })
242 }
243
244 pub async fn store_triples(
246 &self,
247 triples: &[Triple],
248 message: &str,
249 ) -> Result<Commit, OxirsError> {
250 let mut block_hashes = Vec::new();
251 let mut blocks_guard = self.blocks.write().await;
252 let mut dedup_guard = self.dedup_index.write().await;
253
254 for chunk in triples.chunks(100) {
256 if self.config.deduplication {
258 let mut unique_triples = Vec::new();
259 for triple in chunk {
260 if !dedup_guard.triple_blocks.contains_key(triple) {
261 unique_triples.push(triple.clone());
262 }
263 }
264
265 if !unique_triples.is_empty() {
266 let block = self.create_triple_block(&unique_triples)?;
267 let hash = block.hash;
268
269 for triple in unique_triples {
271 dedup_guard.triple_blocks.insert(triple, hash);
272 }
273
274 self.store_block(&mut blocks_guard, block).await?;
276 block_hashes.push(hash);
277 }
278 } else {
279 let block = self.create_triple_block(chunk)?;
280 let hash = block.hash;
281 self.store_block(&mut blocks_guard, block).await?;
282 block_hashes.push(hash);
283 }
284 }
285
286 let index_blocks = self.create_index_blocks(&block_hashes)?;
288 for block in index_blocks {
289 self.store_block(&mut blocks_guard, block).await?;
290 }
291
292 let tree_root = self.build_merkle_tree(&block_hashes).await?;
294
295 let commit = Commit {
297 hash: self.compute_commit_hash(&tree_root, message),
298 parents: vec![], tree: tree_root,
300 author: "system".to_string(),
301 timestamp: chrono::Utc::now(),
302 message: message.to_string(),
303 metadata: HashMap::new(),
304 };
305
306 let commit_block = Block {
308 hash: commit.hash,
309 block_type: BlockType::Commit,
310 data: oxicode::serde::encode_to_vec(&commit, oxicode::config::standard())?,
311 references: vec![tree_root],
312 };
313 self.store_block(&mut blocks_guard, commit_block).await?;
314
315 self.update_references(&commit).await?;
317
318 let mut stats = self.stats.write().await;
320 stats.total_blocks += block_hashes.len() as u64 + 1;
321 stats.unique_blocks += block_hashes.len() as u64 + 1;
322
323 Ok(commit)
324 }
325
326 pub async fn read_commit(&self, commit_hash: ContentHash) -> Result<Vec<Triple>, OxirsError> {
328 let blocks = self.blocks.read().await;
329
330 let commit_block = self.load_block(&blocks, &commit_hash).await?;
332 let commit: Commit =
333 oxicode::serde::decode_from_slice(&commit_block.data, oxicode::config::standard())
334 .map(|(v, _)| v)?;
335
336 let triple_blocks = self.find_triple_blocks(&commit.tree).await?;
338
339 let mut all_triples = Vec::new();
341 for block_hash in triple_blocks {
342 let block = self.load_block(&blocks, &block_hash).await?;
343 let triples: Vec<Triple> =
344 oxicode::serde::decode_from_slice(&block.data, oxicode::config::standard())
345 .map(|(v, _)| v)?;
346 all_triples.extend(triples);
347 }
348
349 Ok(all_triples)
350 }
351
352 pub async fn query_triples(
354 &self,
355 pattern: &TriplePattern,
356 commit_hash: Option<ContentHash>,
357 ) -> Result<Vec<Triple>, OxirsError> {
358 let commit = if let Some(hash) = commit_hash {
360 hash
361 } else {
362 self.get_head().await?
364 };
365
366 let all_triples = self.read_commit(commit).await?;
367
368 Ok(all_triples
370 .into_iter()
371 .filter(|triple| pattern.matches(triple))
372 .collect())
373 }
374
375 pub async fn verify_integrity(&self) -> Result<IntegrityReport, OxirsError> {
377 let blocks = self.blocks.read().await;
378 let merkle = self.merkle_tree.read().await;
379
380 let mut report = IntegrityReport {
381 total_blocks: 0,
382 verified_blocks: 0,
383 corrupted_blocks: Vec::new(),
384 missing_blocks: Vec::new(),
385 merkle_valid: true,
386 };
387
388 for hash in blocks.metadata.keys() {
390 report.total_blocks += 1;
391
392 if let Ok(block) = self.load_block(&blocks, hash).await {
393 if self.compute_hash(&block.data) == *hash {
395 report.verified_blocks += 1;
396 } else {
397 report.corrupted_blocks.push(*hash);
398 }
399 } else {
400 report.missing_blocks.push(*hash);
401 }
402 }
403
404 if let Some(root) = merkle.root {
406 report.merkle_valid = self.verify_merkle_tree(&merkle, root).await?;
407 }
408
409 Ok(report)
410 }
411
412 pub async fn garbage_collect(&self) -> Result<GCReport, OxirsError> {
414 let mut refs = self.references.write().await;
415 let mut blocks = self.blocks.write().await;
416
417 let mut report = GCReport {
418 total_blocks: blocks.metadata.len(),
419 reachable_blocks: 0,
420 collected_blocks: 0,
421 reclaimed_bytes: 0,
422 };
423
424 let mut reachable = HashSet::new();
426 let mut to_visit: Vec<_> = refs.roots.iter().cloned().collect();
427
428 while let Some(hash) = to_visit.pop() {
429 if reachable.insert(hash) {
430 if let Some(children) = refs.forward_refs.get(&hash) {
431 to_visit.extend(children.iter().cloned());
432 }
433 }
434 }
435
436 report.reachable_blocks = reachable.len();
437
438 let unreachable: Vec<_> = blocks
440 .metadata
441 .keys()
442 .filter(|hash| !reachable.contains(*hash))
443 .cloned()
444 .collect();
445
446 for hash in unreachable {
447 if let Some(metadata) = blocks.metadata.remove(&hash) {
448 report.collected_blocks += 1;
449 report.reclaimed_bytes += metadata.size;
450
451 let block_path = blocks.path.join(hex::encode(hash));
453 let _ = std::fs::remove_file(block_path);
454
455 blocks.cache.pop(&hash);
457
458 refs.forward_refs.remove(&hash);
460 for (_, back_refs) in refs.backward_refs.iter_mut() {
461 back_refs.remove(&hash);
462 }
463 }
464 }
465
466 let mut stats = self.stats.write().await;
468 stats.gc_reclaimed += report.reclaimed_bytes as u64;
469
470 Ok(report)
471 }
472
473 fn create_triple_block(&self, triples: &[Triple]) -> Result<Block, OxirsError> {
475 let data = oxicode::serde::encode_to_vec(&triples, oxicode::config::standard())?;
476 let hash = self.compute_hash(&data);
477
478 Ok(Block {
479 hash,
480 block_type: BlockType::TripleData,
481 data,
482 references: Vec::new(),
483 })
484 }
485
486 fn create_index_blocks(&self, _data_blocks: &[ContentHash]) -> Result<Vec<Block>, OxirsError> {
488 Ok(Vec::new())
490 }
491
492 async fn build_merkle_tree(&self, blocks: &[ContentHash]) -> Result<ContentHash, OxirsError> {
494 let mut merkle = self.merkle_tree.write().await;
495
496 let mut current_level = blocks.to_vec();
498
499 while current_level.len() > 1 {
500 let mut next_level = Vec::new();
501
502 for chunk in current_level.chunks(2) {
503 let left = chunk[0];
504 let right = chunk.get(1).cloned().unwrap_or(left);
505
506 let combined = [left.as_slice(), right.as_slice()].concat();
507 let parent_hash = self.compute_hash(&combined);
508
509 let node = MerkleNode {
510 hash: parent_hash,
511 left: Some(left),
512 right: Some(right),
513 data: None,
514 };
515
516 merkle.nodes.insert(parent_hash, node);
517 next_level.push(parent_hash);
518 }
519
520 current_level = next_level;
521 }
522
523 let root = current_level[0];
524 merkle.root = Some(root);
525 Ok(root)
526 }
527
528 async fn store_block(&self, blocks: &mut BlockStore, block: Block) -> Result<(), OxirsError> {
530 let hash = block.hash;
531 let size = block.data.len();
532
533 let block_path = blocks.path.join(hex::encode(hash));
535 let compressed = self.compress_block(&block)?;
536 std::fs::write(block_path, compressed)?;
537
538 blocks.metadata.insert(
540 hash,
541 BlockMetadata {
542 created_at: chrono::Utc::now(),
543 size,
544 compression: Some("zstd".to_string()),
545 ref_count: 0,
546 },
547 );
548
549 blocks.cache.put(hash, block);
551
552 Ok(())
553 }
554
555 async fn load_block(
557 &self,
558 blocks: &BlockStore,
559 hash: &ContentHash,
560 ) -> Result<Block, OxirsError> {
561 if let Some(block) = blocks.cache.peek(hash) {
563 return Ok(block.clone());
564 }
565
566 let block_path = blocks.path.join(hex::encode(hash));
568 let compressed = std::fs::read(block_path)?;
569 let block = self.decompress_block(&compressed)?;
570
571 Ok(block)
572 }
573
574 async fn find_triple_blocks(
576 &self,
577 tree_hash: &ContentHash,
578 ) -> Result<Vec<ContentHash>, OxirsError> {
579 Ok(vec![*tree_hash])
581 }
582
583 async fn update_references(&self, commit: &Commit) -> Result<(), OxirsError> {
585 let mut refs = self.references.write().await;
586
587 refs.roots.insert(commit.hash);
589
590 refs.forward_refs
592 .entry(commit.hash)
593 .or_insert_with(HashSet::new)
594 .insert(commit.tree);
595
596 refs.backward_refs
598 .entry(commit.tree)
599 .or_insert_with(HashSet::new)
600 .insert(commit.hash);
601
602 Ok(())
603 }
604
605 async fn get_head(&self) -> Result<ContentHash, OxirsError> {
607 let refs = self.references.read().await;
609 refs.roots
610 .iter()
611 .next()
612 .cloned()
613 .ok_or_else(|| OxirsError::Store("No commits found".to_string()))
614 }
615
616 async fn verify_merkle_tree(
618 &self,
619 merkle: &MerkleTree,
620 root: ContentHash,
621 ) -> Result<bool, OxirsError> {
622 Ok(merkle.nodes.contains_key(&root))
624 }
625
626 fn compute_hash(&self, data: &[u8]) -> ContentHash {
628 let mut hasher = Sha256::new();
629 hasher.update(data);
630 hasher.finalize().into()
631 }
632
633 fn compute_commit_hash(&self, tree: &ContentHash, message: &str) -> ContentHash {
635 let mut hasher = Sha256::new();
636 hasher.update(tree);
637 hasher.update(message.as_bytes());
638 hasher.finalize().into()
639 }
640
641 fn compress_block(&self, block: &Block) -> Result<Vec<u8>, OxirsError> {
643 let serialized = oxicode::serde::encode_to_vec(block, oxicode::config::standard())?;
644 oxiarc_zstd::encode_all(&serialized, 3)
645 .map_err(|e| OxirsError::Io(format!("Zstd compression failed: {}", e)))
646 }
647
648 fn decompress_block(&self, data: &[u8]) -> Result<Block, OxirsError> {
650 let decompressed = oxiarc_zstd::decode_all(data)
651 .map_err(|e| OxirsError::Io(format!("Zstd decompression failed: {}", e)))?;
652 oxicode::serde::decode_from_slice(&decompressed, oxicode::config::standard())
653 .map(|(v, _)| v)
654 .map_err(Into::into)
655 }
656}
657
658#[derive(Debug)]
660pub struct IntegrityReport {
661 pub total_blocks: usize,
662 pub verified_blocks: usize,
663 pub corrupted_blocks: Vec<ContentHash>,
664 pub missing_blocks: Vec<ContentHash>,
665 pub merkle_valid: bool,
666}
667
/// Result of `ImmutableStorage::garbage_collect`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct GCReport {
    /// Number of blocks known before collection started.
    pub total_blocks: usize,
    /// Number of entries found reachable from commit roots.
    pub reachable_blocks: usize,
    /// Number of blocks removed.
    pub collected_blocks: usize,
    /// Sum of the uncompressed payload sizes of the removed blocks, in bytes.
    pub reclaimed_bytes: usize,
}
676
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode};

    /// Default config rooted at `dir_name` under the platform temp directory.
    ///
    /// Uses `std::env::temp_dir()` rather than a hard-coded `/tmp` so the tests
    /// honour `TMPDIR` and work on platforms without `/tmp`.
    fn test_config(dir_name: &str) -> ImmutableConfig {
        ImmutableConfig {
            path: std::env::temp_dir().join(dir_name),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_immutable_storage() {
        let storage = ImmutableStorage::new(test_config("oxirs_immutable_test"))
            .await
            .expect("async operation should succeed");

        let triples = vec![
            Triple::new(
                NamedNode::new("http://example.org/s1").expect("valid IRI"),
                NamedNode::new("http://example.org/p1").expect("valid IRI"),
                crate::model::Object::NamedNode(
                    NamedNode::new("http://example.org/o1").expect("valid IRI"),
                ),
            ),
            Triple::new(
                NamedNode::new("http://example.org/s2").expect("valid IRI"),
                NamedNode::new("http://example.org/p2").expect("valid IRI"),
                crate::model::Object::Literal(Literal::new("test")),
            ),
        ];

        let commit = storage
            .store_triples(&triples, "Initial commit")
            .await
            .expect("operation should succeed");

        let loaded = storage
            .read_commit(commit.hash)
            .await
            .expect("async operation should succeed");
        assert_eq!(loaded.len(), 2);

        let pattern = TriplePattern::new(
            Some(crate::model::SubjectPattern::NamedNode(
                NamedNode::new("http://example.org/s1").expect("valid IRI"),
            )),
            None,
            None,
        );
        let results = storage
            .query_triples(&pattern, Some(commit.hash))
            .await
            .expect("operation should succeed");
        assert_eq!(results.len(), 1);
    }

    #[tokio::test]
    async fn test_deduplication() {
        let config = ImmutableConfig {
            deduplication: true,
            ..test_config("oxirs_immutable_dedup")
        };

        let storage = ImmutableStorage::new(config)
            .await
            .expect("async operation should succeed");

        let triple = Triple::new(
            NamedNode::new("http://example.org/s").expect("valid IRI"),
            NamedNode::new("http://example.org/p").expect("valid IRI"),
            crate::model::Object::Literal(Literal::new("value")),
        );

        let triples = vec![triple.clone(), triple.clone(), triple.clone()];

        let _commit = storage
            .store_triples(&triples, "Dedup test")
            .await
            .expect("async operation should succeed");

        let stats = storage.stats.read().await;
        assert!(stats.unique_blocks < 3);
    }
}