// oxirs_core/storage/immutable.rs
1//! Immutable storage with content-addressable blocks
2//!
3//! This module provides an immutable, content-addressable storage system
4//! inspired by Git and IPFS, optimized for RDF data integrity and versioning.
5
6use crate::model::{Triple, TriplePattern};
7use crate::OxirsError;
8use serde::{Deserialize, Serialize};
9use sha2::{Digest, Sha256};
10use std::collections::{HashMap, HashSet};
11use std::path::PathBuf;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
/// Immutable storage configuration.
///
/// Controls the on-disk location, block chunking, deduplication, Merkle
/// indexing, and garbage-collection behavior of [`ImmutableStorage`].
#[derive(Debug, Clone)]
pub struct ImmutableConfig {
    /// Base path for immutable storage; `blocks/` and `refs/` are created beneath it.
    pub path: PathBuf,
    /// Block size in bytes (advisory; triple blocks are chunked by triple count, not bytes).
    pub block_size: usize,
    /// Enable deduplication of previously stored triples via the dedup index.
    pub deduplication: bool,
    /// Merkle tree depth (stored on the tree; currently unused by tree construction).
    pub merkle_depth: usize,
    /// Garbage collection policy.
    pub gc_policy: GarbageCollectionPolicy,
}
29
30impl Default for ImmutableConfig {
31    fn default() -> Self {
32        ImmutableConfig {
33            path: PathBuf::from("/var/oxirs/immutable"),
34            block_size: 4096,
35            deduplication: true,
36            merkle_depth: 4,
37            gc_policy: GarbageCollectionPolicy::default(),
38        }
39    }
40}
41
/// Garbage collection policy.
///
/// Governs when unreachable blocks become eligible for collection.
/// NOTE(review): `threshold` and `min_age_hours` are declared here but are
/// not consulted by `garbage_collect`, which currently sweeps all
/// unreachable blocks unconditionally — confirm intended behavior.
#[derive(Debug, Clone)]
pub struct GarbageCollectionPolicy {
    /// Enable automatic GC.
    pub auto_gc: bool,
    /// GC threshold (fraction of unreachable blocks, 0.0–1.0).
    pub threshold: f64,
    /// Minimum age for GC eligibility (hours).
    pub min_age_hours: u32,
}
52
53impl Default for GarbageCollectionPolicy {
54    fn default() -> Self {
55        GarbageCollectionPolicy {
56            auto_gc: true,
57            threshold: 0.2,
58            min_age_hours: 24,
59        }
60    }
61}
62
/// Content hash type: a raw 32-byte SHA-256 digest used as a block's address.
pub type ContentHash = [u8; 32];
65
/// Immutable storage engine.
///
/// Content-addressable block store with Git/IPFS-style commits: triples are
/// serialized into blocks keyed by their SHA-256 digest, linked into a
/// Merkle tree, and referenced by commit objects. Each sub-index is wrapped
/// in its own `Arc<RwLock<_>>` so readers and writers of independent
/// structures do not contend.
pub struct ImmutableStorage {
    config: ImmutableConfig,
    /// Block store (disk-backed, with an in-memory LRU cache).
    blocks: Arc<RwLock<BlockStore>>,
    /// Merkle tree index used for integrity verification.
    merkle_tree: Arc<RwLock<MerkleTree>>,
    /// Reference tracking for mark-and-sweep garbage collection.
    references: Arc<RwLock<ReferenceTracker>>,
    /// Deduplication index (triple -> containing block).
    dedup_index: Arc<RwLock<DeduplicationIndex>>,
    /// Running statistics (block counts, GC reclamation).
    stats: Arc<RwLock<ImmutableStats>>,
}
80
/// Block store for content-addressable storage.
///
/// Blocks live on disk under `path`, one file per block named by the
/// hex-encoded content hash; `cache` holds recently used blocks decoded in
/// memory, and `metadata` indexes every known block.
struct BlockStore {
    /// Path to block storage directory.
    path: PathBuf,
    /// In-memory LRU cache of recent blocks.
    cache: lru::LruCache<ContentHash, Block>,
    /// Block metadata index, keyed by content hash.
    metadata: HashMap<ContentHash, BlockMetadata>,
}
90
/// Immutable block: the unit of content-addressable storage.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Block {
    /// Block hash (content address) — SHA-256 of `data`.
    hash: ContentHash,
    /// Block type discriminator.
    block_type: BlockType,
    /// Serialized block payload.
    data: Vec<u8>,
    /// Content hashes of other blocks this block refers to.
    references: Vec<ContentHash>,
}
103
/// Block type: distinguishes what a block's payload encodes.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum BlockType {
    /// Serialized `Vec<Triple>` data block.
    TripleData,
    /// Index block (reserved; `create_index_blocks` is currently a stub).
    Index,
    /// Merkle tree node.
    MerkleNode,
    /// Commit object (serialized `Commit`).
    Commit,
    /// Manifest block.
    Manifest,
}
118
/// Block metadata kept in the in-memory index (not stored inside the block).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BlockMetadata {
    /// Creation timestamp (UTC).
    created_at: chrono::DateTime<chrono::Utc>,
    /// Uncompressed payload size in bytes (length of `Block::data`).
    size: usize,
    /// Compression codec name used on disk, if any (e.g. "zstd").
    compression: Option<String>,
    /// Reference count (tracked but not currently used for GC decisions).
    ref_count: u32,
}
131
/// Merkle tree for integrity verification.
///
/// Built bottom-up over data-block hashes; interior nodes are stored in
/// `nodes` keyed by their own hash.
struct MerkleTree {
    /// Root hash of the most recently built tree, if any.
    root: Option<ContentHash>,
    /// Tree nodes keyed by node hash.
    nodes: HashMap<ContentHash, MerkleNode>,
    /// Configured depth of tree (stored from config; not used in construction).
    #[allow(dead_code)]
    depth: usize,
}
142
/// Merkle tree node. Interior nodes set `left`/`right`; leaves set `data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MerkleNode {
    /// Node hash (SHA-256 of the concatenated child hashes for interior nodes).
    hash: ContentHash,
    /// Left child hash.
    left: Option<ContentHash>,
    /// Right child hash.
    right: Option<ContentHash>,
    /// Data block hash (for leaf nodes).
    data: Option<ContentHash>,
}
155
156/// Reference tracker for garbage collection
157struct ReferenceTracker {
158    /// Forward references (block -> referenced blocks)
159    forward_refs: HashMap<ContentHash, HashSet<ContentHash>>,
160    /// Backward references (block -> blocks that reference it)
161    backward_refs: HashMap<ContentHash, HashSet<ContentHash>>,
162    /// Root blocks (entry points)
163    roots: HashSet<ContentHash>,
164}
165
166/// Deduplication index
167struct DeduplicationIndex {
168    /// Content fingerprint to hash mapping
169    #[allow(dead_code)]
170    fingerprints: HashMap<u64, Vec<ContentHash>>,
171    /// Triple to block mapping
172    triple_blocks: HashMap<Triple, ContentHash>,
173}
174
175/// Immutable storage statistics
176#[derive(Debug, Default)]
177struct ImmutableStats {
178    total_blocks: u64,
179    unique_blocks: u64,
180    #[allow(dead_code)]
181    total_size: u64,
182    #[allow(dead_code)]
183    dedup_savings: u64,
184    gc_reclaimed: u64,
185}
186
187/// Commit object for versioning
188#[derive(Debug, Clone, Serialize, Deserialize)]
189pub struct Commit {
190    /// Commit hash
191    pub hash: ContentHash,
192    /// Parent commits
193    pub parents: Vec<ContentHash>,
194    /// Root of data tree
195    pub tree: ContentHash,
196    /// Author information
197    pub author: String,
198    /// Timestamp
199    pub timestamp: chrono::DateTime<chrono::Utc>,
200    /// Commit message
201    pub message: String,
202    /// Metadata
203    pub metadata: HashMap<String, String>,
204}
205
206impl ImmutableStorage {
207    /// Create new immutable storage
208    pub async fn new(config: ImmutableConfig) -> Result<Self, OxirsError> {
209        std::fs::create_dir_all(&config.path)?;
210        std::fs::create_dir_all(config.path.join("blocks"))?;
211        std::fs::create_dir_all(config.path.join("refs"))?;
212
213        let cache_size = 1000; // Number of blocks to cache
214
215        Ok(ImmutableStorage {
216            config: config.clone(),
217            blocks: Arc::new(RwLock::new(BlockStore {
218                path: config.path.join("blocks"),
219                cache: lru::LruCache::new(
220                    std::num::NonZeroUsize::new(cache_size).unwrap_or(
221                        std::num::NonZeroUsize::new(1000).expect("constant is non-zero"),
222                    ),
223                ),
224                metadata: HashMap::new(),
225            })),
226            merkle_tree: Arc::new(RwLock::new(MerkleTree {
227                root: None,
228                nodes: HashMap::new(),
229                depth: config.merkle_depth,
230            })),
231            references: Arc::new(RwLock::new(ReferenceTracker {
232                forward_refs: HashMap::new(),
233                backward_refs: HashMap::new(),
234                roots: HashSet::new(),
235            })),
236            dedup_index: Arc::new(RwLock::new(DeduplicationIndex {
237                fingerprints: HashMap::new(),
238                triple_blocks: HashMap::new(),
239            })),
240            stats: Arc::new(RwLock::new(ImmutableStats::default())),
241        })
242    }
243
244    /// Store triples as immutable blocks
245    pub async fn store_triples(
246        &self,
247        triples: &[Triple],
248        message: &str,
249    ) -> Result<Commit, OxirsError> {
250        let mut block_hashes = Vec::new();
251        let mut blocks_guard = self.blocks.write().await;
252        let mut dedup_guard = self.dedup_index.write().await;
253
254        // Process triples in chunks
255        for chunk in triples.chunks(100) {
256            // Check for deduplication
257            if self.config.deduplication {
258                let mut unique_triples = Vec::new();
259                for triple in chunk {
260                    if !dedup_guard.triple_blocks.contains_key(triple) {
261                        unique_triples.push(triple.clone());
262                    }
263                }
264
265                if !unique_triples.is_empty() {
266                    let block = self.create_triple_block(&unique_triples)?;
267                    let hash = block.hash;
268
269                    // Update deduplication index
270                    for triple in unique_triples {
271                        dedup_guard.triple_blocks.insert(triple, hash);
272                    }
273
274                    // Store block
275                    self.store_block(&mut blocks_guard, block).await?;
276                    block_hashes.push(hash);
277                }
278            } else {
279                let block = self.create_triple_block(chunk)?;
280                let hash = block.hash;
281                self.store_block(&mut blocks_guard, block).await?;
282                block_hashes.push(hash);
283            }
284        }
285
286        // Create index blocks if needed
287        let index_blocks = self.create_index_blocks(&block_hashes)?;
288        for block in index_blocks {
289            self.store_block(&mut blocks_guard, block).await?;
290        }
291
292        // Build Merkle tree
293        let tree_root = self.build_merkle_tree(&block_hashes).await?;
294
295        // Create commit
296        let commit = Commit {
297            hash: self.compute_commit_hash(&tree_root, message),
298            parents: vec![], // Would get from current HEAD
299            tree: tree_root,
300            author: "system".to_string(),
301            timestamp: chrono::Utc::now(),
302            message: message.to_string(),
303            metadata: HashMap::new(),
304        };
305
306        // Store commit block
307        let commit_block = Block {
308            hash: commit.hash,
309            block_type: BlockType::Commit,
310            data: oxicode::serde::encode_to_vec(&commit, oxicode::config::standard())?,
311            references: vec![tree_root],
312        };
313        self.store_block(&mut blocks_guard, commit_block).await?;
314
315        // Update references
316        self.update_references(&commit).await?;
317
318        // Update stats
319        let mut stats = self.stats.write().await;
320        stats.total_blocks += block_hashes.len() as u64 + 1;
321        stats.unique_blocks += block_hashes.len() as u64 + 1;
322
323        Ok(commit)
324    }
325
326    /// Read triples from a commit
327    pub async fn read_commit(&self, commit_hash: ContentHash) -> Result<Vec<Triple>, OxirsError> {
328        let blocks = self.blocks.read().await;
329
330        // Load commit block
331        let commit_block = self.load_block(&blocks, &commit_hash).await?;
332        let commit: Commit =
333            oxicode::serde::decode_from_slice(&commit_block.data, oxicode::config::standard())
334                .map(|(v, _)| v)?;
335
336        // Traverse tree to find triple blocks
337        let triple_blocks = self.find_triple_blocks(&commit.tree).await?;
338
339        // Load and decode triples
340        let mut all_triples = Vec::new();
341        for block_hash in triple_blocks {
342            let block = self.load_block(&blocks, &block_hash).await?;
343            let triples: Vec<Triple> =
344                oxicode::serde::decode_from_slice(&block.data, oxicode::config::standard())
345                    .map(|(v, _)| v)?;
346            all_triples.extend(triples);
347        }
348
349        Ok(all_triples)
350    }
351
352    /// Query triples with pattern matching
353    pub async fn query_triples(
354        &self,
355        pattern: &TriplePattern,
356        commit_hash: Option<ContentHash>,
357    ) -> Result<Vec<Triple>, OxirsError> {
358        // If commit specified, query from that version
359        let commit = if let Some(hash) = commit_hash {
360            hash
361        } else {
362            // Get latest commit (HEAD)
363            self.get_head().await?
364        };
365
366        let all_triples = self.read_commit(commit).await?;
367
368        // Filter by pattern
369        Ok(all_triples
370            .into_iter()
371            .filter(|triple| pattern.matches(triple))
372            .collect())
373    }
374
375    /// Verify integrity of storage
376    pub async fn verify_integrity(&self) -> Result<IntegrityReport, OxirsError> {
377        let blocks = self.blocks.read().await;
378        let merkle = self.merkle_tree.read().await;
379
380        let mut report = IntegrityReport {
381            total_blocks: 0,
382            verified_blocks: 0,
383            corrupted_blocks: Vec::new(),
384            missing_blocks: Vec::new(),
385            merkle_valid: true,
386        };
387
388        // Verify all blocks
389        for hash in blocks.metadata.keys() {
390            report.total_blocks += 1;
391
392            if let Ok(block) = self.load_block(&blocks, hash).await {
393                // Verify hash
394                if self.compute_hash(&block.data) == *hash {
395                    report.verified_blocks += 1;
396                } else {
397                    report.corrupted_blocks.push(*hash);
398                }
399            } else {
400                report.missing_blocks.push(*hash);
401            }
402        }
403
404        // Verify Merkle tree
405        if let Some(root) = merkle.root {
406            report.merkle_valid = self.verify_merkle_tree(&merkle, root).await?;
407        }
408
409        Ok(report)
410    }
411
412    /// Run garbage collection
413    pub async fn garbage_collect(&self) -> Result<GCReport, OxirsError> {
414        let mut refs = self.references.write().await;
415        let mut blocks = self.blocks.write().await;
416
417        let mut report = GCReport {
418            total_blocks: blocks.metadata.len(),
419            reachable_blocks: 0,
420            collected_blocks: 0,
421            reclaimed_bytes: 0,
422        };
423
424        // Mark phase - find all reachable blocks
425        let mut reachable = HashSet::new();
426        let mut to_visit: Vec<_> = refs.roots.iter().cloned().collect();
427
428        while let Some(hash) = to_visit.pop() {
429            if reachable.insert(hash) {
430                if let Some(children) = refs.forward_refs.get(&hash) {
431                    to_visit.extend(children.iter().cloned());
432                }
433            }
434        }
435
436        report.reachable_blocks = reachable.len();
437
438        // Sweep phase - remove unreachable blocks
439        let unreachable: Vec<_> = blocks
440            .metadata
441            .keys()
442            .filter(|hash| !reachable.contains(*hash))
443            .cloned()
444            .collect();
445
446        for hash in unreachable {
447            if let Some(metadata) = blocks.metadata.remove(&hash) {
448                report.collected_blocks += 1;
449                report.reclaimed_bytes += metadata.size;
450
451                // Remove from disk
452                let block_path = blocks.path.join(hex::encode(hash));
453                let _ = std::fs::remove_file(block_path);
454
455                // Remove from cache
456                blocks.cache.pop(&hash);
457
458                // Remove references
459                refs.forward_refs.remove(&hash);
460                for (_, back_refs) in refs.backward_refs.iter_mut() {
461                    back_refs.remove(&hash);
462                }
463            }
464        }
465
466        // Update stats
467        let mut stats = self.stats.write().await;
468        stats.gc_reclaimed += report.reclaimed_bytes as u64;
469
470        Ok(report)
471    }
472
473    /// Create a triple block
474    fn create_triple_block(&self, triples: &[Triple]) -> Result<Block, OxirsError> {
475        let data = oxicode::serde::encode_to_vec(&triples, oxicode::config::standard())?;
476        let hash = self.compute_hash(&data);
477
478        Ok(Block {
479            hash,
480            block_type: BlockType::TripleData,
481            data,
482            references: Vec::new(),
483        })
484    }
485
486    /// Create index blocks for efficient querying
487    fn create_index_blocks(&self, _data_blocks: &[ContentHash]) -> Result<Vec<Block>, OxirsError> {
488        // Simplified - would create actual index structures
489        Ok(Vec::new())
490    }
491
492    /// Build Merkle tree from blocks
493    async fn build_merkle_tree(&self, blocks: &[ContentHash]) -> Result<ContentHash, OxirsError> {
494        let mut merkle = self.merkle_tree.write().await;
495
496        // Build tree bottom-up
497        let mut current_level = blocks.to_vec();
498
499        while current_level.len() > 1 {
500            let mut next_level = Vec::new();
501
502            for chunk in current_level.chunks(2) {
503                let left = chunk[0];
504                let right = chunk.get(1).cloned().unwrap_or(left);
505
506                let combined = [left.as_slice(), right.as_slice()].concat();
507                let parent_hash = self.compute_hash(&combined);
508
509                let node = MerkleNode {
510                    hash: parent_hash,
511                    left: Some(left),
512                    right: Some(right),
513                    data: None,
514                };
515
516                merkle.nodes.insert(parent_hash, node);
517                next_level.push(parent_hash);
518            }
519
520            current_level = next_level;
521        }
522
523        let root = current_level[0];
524        merkle.root = Some(root);
525        Ok(root)
526    }
527
528    /// Store a block
529    async fn store_block(&self, blocks: &mut BlockStore, block: Block) -> Result<(), OxirsError> {
530        let hash = block.hash;
531        let size = block.data.len();
532
533        // Write to disk
534        let block_path = blocks.path.join(hex::encode(hash));
535        let compressed = self.compress_block(&block)?;
536        std::fs::write(block_path, compressed)?;
537
538        // Update metadata
539        blocks.metadata.insert(
540            hash,
541            BlockMetadata {
542                created_at: chrono::Utc::now(),
543                size,
544                compression: Some("zstd".to_string()),
545                ref_count: 0,
546            },
547        );
548
549        // Add to cache
550        blocks.cache.put(hash, block);
551
552        Ok(())
553    }
554
555    /// Load a block
556    async fn load_block(
557        &self,
558        blocks: &BlockStore,
559        hash: &ContentHash,
560    ) -> Result<Block, OxirsError> {
561        // Check cache first
562        if let Some(block) = blocks.cache.peek(hash) {
563            return Ok(block.clone());
564        }
565
566        // Load from disk
567        let block_path = blocks.path.join(hex::encode(hash));
568        let compressed = std::fs::read(block_path)?;
569        let block = self.decompress_block(&compressed)?;
570
571        Ok(block)
572    }
573
574    /// Find all triple blocks under a tree
575    async fn find_triple_blocks(
576        &self,
577        tree_hash: &ContentHash,
578    ) -> Result<Vec<ContentHash>, OxirsError> {
579        // Simplified - would traverse tree structure
580        Ok(vec![*tree_hash])
581    }
582
583    /// Update reference tracking
584    async fn update_references(&self, commit: &Commit) -> Result<(), OxirsError> {
585        let mut refs = self.references.write().await;
586
587        // Add commit as root
588        refs.roots.insert(commit.hash);
589
590        // Add forward reference
591        refs.forward_refs
592            .entry(commit.hash)
593            .or_insert_with(HashSet::new)
594            .insert(commit.tree);
595
596        // Add backward reference
597        refs.backward_refs
598            .entry(commit.tree)
599            .or_insert_with(HashSet::new)
600            .insert(commit.hash);
601
602        Ok(())
603    }
604
605    /// Get current HEAD commit
606    async fn get_head(&self) -> Result<ContentHash, OxirsError> {
607        // Simplified - would read from refs/heads/main
608        let refs = self.references.read().await;
609        refs.roots
610            .iter()
611            .next()
612            .cloned()
613            .ok_or_else(|| OxirsError::Store("No commits found".to_string()))
614    }
615
616    /// Verify Merkle tree integrity
617    async fn verify_merkle_tree(
618        &self,
619        merkle: &MerkleTree,
620        root: ContentHash,
621    ) -> Result<bool, OxirsError> {
622        // Simplified verification
623        Ok(merkle.nodes.contains_key(&root))
624    }
625
626    /// Compute SHA256 hash
627    fn compute_hash(&self, data: &[u8]) -> ContentHash {
628        let mut hasher = Sha256::new();
629        hasher.update(data);
630        hasher.finalize().into()
631    }
632
633    /// Compute commit hash
634    fn compute_commit_hash(&self, tree: &ContentHash, message: &str) -> ContentHash {
635        let mut hasher = Sha256::new();
636        hasher.update(tree);
637        hasher.update(message.as_bytes());
638        hasher.finalize().into()
639    }
640
641    /// Compress block data
642    fn compress_block(&self, block: &Block) -> Result<Vec<u8>, OxirsError> {
643        let serialized = oxicode::serde::encode_to_vec(block, oxicode::config::standard())?;
644        oxiarc_zstd::encode_all(&serialized, 3)
645            .map_err(|e| OxirsError::Io(format!("Zstd compression failed: {}", e)))
646    }
647
648    /// Decompress block data
649    fn decompress_block(&self, data: &[u8]) -> Result<Block, OxirsError> {
650        let decompressed = oxiarc_zstd::decode_all(data)
651            .map_err(|e| OxirsError::Io(format!("Zstd decompression failed: {}", e)))?;
652        oxicode::serde::decode_from_slice(&decompressed, oxicode::config::standard())
653            .map(|(v, _)| v)
654            .map_err(Into::into)
655    }
656}
657
/// Integrity verification report produced by `verify_integrity`.
#[derive(Debug)]
pub struct IntegrityReport {
    /// Number of blocks examined.
    pub total_blocks: usize,
    /// Blocks whose recomputed hash matched their content address.
    pub verified_blocks: usize,
    /// Blocks that loaded but failed hash verification.
    pub corrupted_blocks: Vec<ContentHash>,
    /// Blocks that could not be loaded at all.
    pub missing_blocks: Vec<ContentHash>,
    /// Whether the Merkle tree root verified (true when no tree exists).
    pub merkle_valid: bool,
}
667
668/// Garbage collection report
669#[derive(Debug)]
670pub struct GCReport {
671    pub total_blocks: usize,
672    pub reachable_blocks: usize,
673    pub collected_blocks: usize,
674    pub reclaimed_bytes: usize,
675}
676
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode};

    /// End-to-end round trip: store two triples, read them back from the
    /// resulting commit, and query one of them by subject pattern.
    #[tokio::test]
    async fn test_immutable_storage() {
        let config = ImmutableConfig {
            path: PathBuf::from("/tmp/oxirs_immutable_test"),
            ..Default::default()
        };

        let storage = ImmutableStorage::new(config)
            .await
            .expect("async operation should succeed");

        // Create test triples: one IRI object, one literal object.
        let triples = vec![
            Triple::new(
                NamedNode::new("http://example.org/s1").expect("valid IRI"),
                NamedNode::new("http://example.org/p1").expect("valid IRI"),
                crate::model::Object::NamedNode(
                    NamedNode::new("http://example.org/o1").expect("valid IRI"),
                ),
            ),
            Triple::new(
                NamedNode::new("http://example.org/s2").expect("valid IRI"),
                NamedNode::new("http://example.org/p2").expect("valid IRI"),
                crate::model::Object::Literal(Literal::new("test")),
            ),
        ];

        // Store triples and obtain the commit handle.
        let commit = storage
            .store_triples(&triples, "Initial commit")
            .await
            .expect("operation should succeed");

        // Read back triples from that commit; both should round-trip.
        let loaded = storage
            .read_commit(commit.hash)
            .await
            .expect("async operation should succeed");
        assert_eq!(loaded.len(), 2);

        // Query with a subject-only pattern; exactly one triple matches s1.
        let pattern = TriplePattern::new(
            Some(crate::model::SubjectPattern::NamedNode(
                NamedNode::new("http://example.org/s1").expect("valid IRI"),
            )),
            None,
            None,
        );
        let results = storage
            .query_triples(&pattern, Some(commit.hash))
            .await
            .expect("operation should succeed");
        assert_eq!(results.len(), 1);
    }

    /// Deduplication: storing the same triple three times must create fewer
    /// unique blocks than triples stored.
    #[tokio::test]
    async fn test_deduplication() {
        let config = ImmutableConfig {
            path: PathBuf::from("/tmp/oxirs_immutable_dedup"),
            deduplication: true,
            ..Default::default()
        };

        let storage = ImmutableStorage::new(config)
            .await
            .expect("async operation should succeed");

        // Store same triple multiple times.
        let triple = Triple::new(
            NamedNode::new("http://example.org/s").expect("valid IRI"),
            NamedNode::new("http://example.org/p").expect("valid IRI"),
            crate::model::Object::Literal(Literal::new("value")),
        );

        let triples = vec![triple.clone(), triple.clone(), triple.clone()];

        // Should deduplicate: one data block plus one commit block.
        let _commit = storage
            .store_triples(&triples, "Dedup test")
            .await
            .expect("async operation should succeed");

        let stats = storage.stats.read().await;
        // Should only store one unique block for the triples
        assert!(stats.unique_blocks < 3);
    }
}