// saorsa_core/dht/content_addressing.rs

1//! Enhanced Content Addressing System with BLAKE3 hashing
2//!
3//! Provides deterministic content addressing with efficient chunking,
4//! deduplication, and integrity verification capabilities.
5
6use anyhow::{Result, anyhow};
7use blake3::Hasher;
8use bytes::{Bytes, BytesMut};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::SystemTime;
13use tokio::io::{AsyncRead, AsyncReadExt};
14use tokio::sync::RwLock;
15
/// Content address uniquely identifying stored content
///
/// The address is self-describing: `root_hash` commits to the chunk
/// manifest (the ordered chunk hashes plus the total size), while
/// `chunk_hashes` lets each chunk be fetched and verified independently.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ContentAddress {
    /// BLAKE3 hash of the content manifest
    pub root_hash: [u8; 32],
    /// Individual chunk hashes
    // Ordered; concatenation of the chunks in this order reproduces the content.
    pub chunk_hashes: Vec<[u8; 32]>,
    /// Total content size in bytes
    pub total_size: u64,
    /// Number of chunks
    // Kept alongside chunk_hashes.len() for serialized consumers.
    pub chunk_count: u32,
}
28
29impl ContentAddress {
30    /// Create a new content address from data bytes (convenience method)
31    pub fn new(data: &[u8]) -> Self {
32        let mut hasher = Hasher::new();
33        hasher.update(data);
34        let root_hash = hasher.finalize().into();
35
36        Self {
37            root_hash,
38            chunk_count: 1,
39            chunk_hashes: vec![root_hash],
40            total_size: data.len() as u64,
41        }
42    }
43
44    /// Create a new content address with detailed info
45    pub fn new_detailed(root_hash: [u8; 32], chunk_hashes: Vec<[u8; 32]>, total_size: u64) -> Self {
46        Self {
47            root_hash,
48            chunk_count: chunk_hashes.len() as u32,
49            chunk_hashes,
50            total_size,
51        }
52    }
53
54    /// Verify that data matches this content address
55    pub fn verify(&self, data: &[u8]) -> bool {
56        let mut hasher = Hasher::new();
57        hasher.update(data);
58        let hash: [u8; 32] = hasher.finalize().into();
59        hash == self.root_hash
60    }
61
62    /// Create a content address from bytes (using the bytes as root hash)
63    pub fn from_bytes(bytes: &[u8]) -> Self {
64        let mut root_hash = [0u8; 32];
65        let len = bytes.len().min(32);
66        root_hash[..len].copy_from_slice(&bytes[..len]);
67
68        Self {
69            root_hash,
70            chunk_count: 1,
71            chunk_hashes: vec![root_hash],
72            total_size: bytes.len() as u64,
73        }
74    }
75}
76
/// Metadata about stored chunks
///
/// Kept per stored content item, keyed by the content's root hash.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkMetadata {
    // Size in bytes of each chunk, in manifest order.
    pub chunk_sizes: Vec<u32>,
    // When the content was first stored.
    pub created_at: SystemTime,
    // Incremented on each successful retrieval of the content.
    pub access_count: u32,
    // NOTE(review): initialized to 0 and never incremented anywhere in this
    // module — confirm intended use before relying on it.
    pub dedup_count: u32,
}
85
/// Statistics about deduplication efficiency
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DedupStatistics {
    // Total chunk references (sum of per-chunk reference counts).
    pub total_chunks: u64,
    // Number of distinct chunk hashes actually stored.
    pub unique_chunks: u64,
    // Fraction of references served by an existing chunk:
    // 1 - unique_chunks / total_chunks (0.0 when no chunks).
    pub dedup_ratio: f64,
    // Running total of bytes not re-stored thanks to deduplication.
    pub space_saved: u64,
}
94
/// Configuration for content-defined chunking
///
/// Bounds the variable chunk sizes produced by the rolling-hash chunker:
/// no chunk boundary is placed before `min_chunk_size`, and a boundary is
/// forced once `max_chunk_size` is reached.
#[derive(Debug, Clone)]
pub struct ChunkingConfig {
    /// Lower bound on chunk size; the boundary search starts here.
    pub min_chunk_size: usize,
    /// Preferred average chunk size.
    // NOTE(review): not consulted by `find_boundary`, which hard-codes an
    // ~8KB average via its mask — confirm whether this should drive the mask.
    pub target_chunk_size: usize,
    /// Hard upper bound; a boundary is forced at this size.
    pub max_chunk_size: usize,
    /// Rolling-hash window length in bytes.
    pub window_size: usize,
}

impl Default for ChunkingConfig {
    /// Defaults: 1 KiB minimum, 64 KiB target, 1 MiB maximum, 48-byte window.
    fn default() -> Self {
        const KIB: usize = 1024;
        Self {
            min_chunk_size: KIB,
            target_chunk_size: 64 * KIB,
            max_chunk_size: 1024 * KIB,
            window_size: 48,
        }
    }
}
114
/// Content-defined chunker using rolling hash
pub struct ContentDefinedChunker {
    // Size bounds and rolling-hash window used for boundary detection.
    config: ChunkingConfig,
    // Carry-over buffer: bytes read from the stream but not yet emitted
    // as a chunk; persists across reads within `chunk_data`.
    buffer: BytesMut,
}
120
121impl ContentDefinedChunker {
122    pub fn new(config: ChunkingConfig) -> Self {
123        let buffer_capacity = config.max_chunk_size;
124        Self {
125            config,
126            buffer: BytesMut::with_capacity(buffer_capacity),
127        }
128    }
129
130    /// Find next chunk boundary using rolling hash
131    pub fn find_boundary(&self, data: &[u8]) -> Option<usize> {
132        if data.len() < self.config.min_chunk_size {
133            return None;
134        }
135
136        let mut hash = 0u32;
137        let window = self.config.window_size;
138
139        // Skip minimum chunk size
140        let search_start = self.config.min_chunk_size;
141        let search_end = data.len().min(self.config.max_chunk_size);
142
143        for i in search_start..search_end {
144            // Simple rolling hash (Buzhash)
145            if i >= window {
146                let old_byte = data[i - window];
147                hash = hash.rotate_left(1) ^ u32::from(old_byte);
148            }
149
150            let new_byte = data[i];
151            hash = hash.rotate_left(1) ^ u32::from(new_byte);
152
153            // Check if we found a boundary
154            let mask = (1 << 13) - 1; // Target 8KB average
155            if (hash & mask) == 0 {
156                return Some(i);
157            }
158        }
159
160        // Force boundary at max chunk size (or end of buffer if smaller)
161        if data.len() >= self.config.max_chunk_size {
162            Some(self.config.max_chunk_size)
163        } else if !data.is_empty() {
164            Some(data.len())
165        } else {
166            None
167        }
168    }
169
170    /// Chunk data into variable-size chunks
171    pub async fn chunk_data(&mut self, mut reader: impl AsyncRead + Unpin) -> Result<Vec<Bytes>> {
172        let mut chunks = Vec::new();
173        let mut buffer = vec![0u8; self.config.max_chunk_size];
174
175        loop {
176            let n = reader.read(&mut buffer).await?;
177            if n == 0 {
178                break;
179            }
180
181            self.buffer.extend_from_slice(&buffer[..n]);
182
183            while self.buffer.len() >= self.config.min_chunk_size {
184                if let Some(boundary) = self.find_boundary(&self.buffer) {
185                    let chunk = self.buffer.split_to(boundary);
186                    chunks.push(chunk.freeze());
187                } else {
188                    break;
189                }
190            }
191        }
192
193        // Handle remaining data
194        if !self.buffer.is_empty() {
195            chunks.push(self.buffer.split().freeze());
196        }
197
198        Ok(chunks)
199    }
200}
201
/// Reference to a stored chunk
///
/// Bookkeeping entry in the dedup index, one per unique chunk hash.
#[derive(Debug, Clone)]
struct ChunkRef {
    // Chunk size in bytes (recorded at insert; not read elsewhere in this module).
    _size: u32,
    // When the chunk was first seen (recorded at insert; not read elsewhere).
    _created_at: SystemTime,
    // Times this chunk has been touched via check_and_update.
    access_count: u32,
    // Number of logical references (stores) pointing at this chunk.
    reference_count: u32,
}
210
211/// Simple content store for testing
212pub struct ContentStore {
213    storage: HashMap<ContentAddress, Vec<u8>>,
214}
215
216impl Default for ContentStore {
217    fn default() -> Self {
218        Self::new()
219    }
220}
221
222impl ContentStore {
223    pub fn new() -> Self {
224        Self {
225            storage: HashMap::new(),
226        }
227    }
228
229    pub fn store(&mut self, address: ContentAddress, data: Vec<u8>) {
230        self.storage.insert(address, data);
231    }
232
233    pub fn retrieve(&self, address: &ContentAddress) -> Option<&Vec<u8>> {
234        self.storage.get(address)
235    }
236
237    pub fn size(&self) -> usize {
238        self.storage.len()
239    }
240}
241
242/// Global deduplication index
243pub struct DedupIndex {
244    chunk_refs: Arc<RwLock<HashMap<[u8; 32], ChunkRef>>>,
245    total_dedup_savings: Arc<RwLock<u64>>,
246}
247
248impl Default for DedupIndex {
249    fn default() -> Self {
250        Self::new()
251    }
252}
253
254impl DedupIndex {
255    pub fn new() -> Self {
256        Self {
257            chunk_refs: Arc::new(RwLock::new(HashMap::new())),
258            total_dedup_savings: Arc::new(RwLock::new(0)),
259        }
260    }
261
262    /// Check if chunk exists and update reference count
263    pub async fn check_and_update(&self, hash: &[u8; 32], size: u32) -> bool {
264        let mut refs = self.chunk_refs.write().await;
265
266        if let Some(chunk_ref) = refs.get_mut(hash) {
267            chunk_ref.reference_count += 1;
268            chunk_ref.access_count += 1;
269
270            let mut savings = self.total_dedup_savings.write().await;
271            *savings += size as u64;
272
273            true
274        } else {
275            refs.insert(
276                *hash,
277                ChunkRef {
278                    _size: size,
279                    _created_at: SystemTime::now(),
280                    access_count: 1,
281                    reference_count: 1,
282                },
283            );
284            false
285        }
286    }
287
288    /// Get deduplication statistics
289    pub async fn get_stats(&self) -> DedupStatistics {
290        let refs = self.chunk_refs.read().await;
291        let savings = *self.total_dedup_savings.read().await;
292
293        let total_chunks: u64 = refs.values().map(|r| r.reference_count as u64).sum();
294        let unique_chunks = refs.len() as u64;
295
296        DedupStatistics {
297            total_chunks,
298            unique_chunks,
299            dedup_ratio: if total_chunks > 0 {
300                1.0 - (unique_chunks as f64 / total_chunks as f64)
301            } else {
302                0.0
303            },
304            space_saved: savings,
305        }
306    }
307}
308
309/// Storage backend for chunks
310pub struct ChunkStorage {
311    chunks: Arc<RwLock<HashMap<[u8; 32], Bytes>>>,
312}
313
314impl Default for ChunkStorage {
315    fn default() -> Self {
316        Self::new()
317    }
318}
319
320impl ChunkStorage {
321    pub fn new() -> Self {
322        Self {
323            chunks: Arc::new(RwLock::new(HashMap::new())),
324        }
325    }
326
327    /// Store a chunk
328    pub async fn store(&self, hash: [u8; 32], data: Bytes) -> Result<()> {
329        let mut chunks = self.chunks.write().await;
330        chunks.insert(hash, data);
331        Ok(())
332    }
333
334    /// Retrieve a chunk
335    pub async fn retrieve(&self, hash: &[u8; 32]) -> Result<Bytes> {
336        let chunks = self.chunks.read().await;
337        chunks
338            .get(hash)
339            .cloned()
340            .ok_or_else(|| anyhow!("Chunk not found"))
341    }
342
343    /// Verify chunk integrity
344    pub async fn verify(&self, hash: &[u8; 32]) -> Result<bool> {
345        let chunks = self.chunks.read().await;
346        if let Some(data) = chunks.get(hash) {
347            let computed_hash = blake3::hash(data);
348            Ok(computed_hash.as_bytes() == hash)
349        } else {
350            Ok(false)
351        }
352    }
353}
354
/// Main content addressing system
///
/// Ties together the content-defined chunker, the deduplication index,
/// chunk storage, and per-content metadata.
pub struct ContentAddressingSystem {
    // Splits incoming streams into content-defined chunks.
    chunker: ContentDefinedChunker,
    // Tracks chunk reference counts and dedup savings.
    dedup_index: DedupIndex,
    // Holds the actual chunk bytes, keyed by BLAKE3 hash.
    chunk_store: ChunkStorage,
    // Per-content metadata keyed by the content's root hash.
    metadata: Arc<RwLock<HashMap<[u8; 32], ChunkMetadata>>>,
}

impl Default for ContentAddressingSystem {
    fn default() -> Self {
        Self::new()
    }
}
368
369impl ContentAddressingSystem {
370    /// Create a new content addressing system
371    pub fn new() -> Self {
372        Self {
373            chunker: ContentDefinedChunker::new(ChunkingConfig::default()),
374            dedup_index: DedupIndex::new(),
375            chunk_store: ChunkStorage::new(),
376            metadata: Arc::new(RwLock::new(HashMap::new())),
377        }
378    }
379
380    /// Store content and return its address
381    pub async fn store_content(
382        &mut self,
383        content: impl AsyncRead + Unpin,
384    ) -> Result<ContentAddress> {
385        // Chunk the content
386        let chunks = self.chunker.chunk_data(content).await?;
387
388        let mut chunk_hashes = Vec::new();
389        let mut total_size = 0u64;
390        let mut chunk_sizes = Vec::new();
391
392        // Process each chunk
393        for chunk in chunks {
394            let hash = blake3::hash(&chunk);
395            let hash_bytes = *hash.as_bytes();
396
397            chunk_hashes.push(hash_bytes);
398            chunk_sizes.push(chunk.len() as u32);
399            total_size += chunk.len() as u64;
400
401            // Check deduplication
402            let is_duplicate = self
403                .dedup_index
404                .check_and_update(&hash_bytes, chunk.len() as u32)
405                .await;
406
407            // Store if new
408            if !is_duplicate {
409                self.chunk_store.store(hash_bytes, chunk).await?;
410            }
411        }
412
413        // Generate root hash from manifest
414        let mut hasher = Hasher::new();
415        for hash in &chunk_hashes {
416            hasher.update(hash);
417        }
418        hasher.update(&total_size.to_le_bytes());
419        let root_hash = *hasher.finalize().as_bytes();
420
421        // Store metadata
422        let metadata = ChunkMetadata {
423            chunk_sizes,
424            created_at: SystemTime::now(),
425            access_count: 0,
426            dedup_count: 0,
427        };
428
429        self.metadata.write().await.insert(root_hash, metadata);
430
431        Ok(ContentAddress::new_detailed(
432            root_hash,
433            chunk_hashes,
434            total_size,
435        ))
436    }
437
438    /// Retrieve content by address
439    pub async fn retrieve_content(&self, address: &ContentAddress) -> Result<Vec<u8>> {
440        let mut content = Vec::with_capacity(address.total_size as usize);
441
442        for chunk_hash in &address.chunk_hashes {
443            let chunk = self.chunk_store.retrieve(chunk_hash).await?;
444            content.extend_from_slice(&chunk);
445        }
446
447        // Update access count
448        if let Some(metadata) = self.metadata.write().await.get_mut(&address.root_hash) {
449            metadata.access_count += 1;
450        }
451
452        Ok(content)
453    }
454
455    /// Verify content integrity without retrieval
456    pub async fn verify_integrity(&self, address: &ContentAddress) -> Result<bool> {
457        // Verify all chunks exist and match their hashes
458        for chunk_hash in &address.chunk_hashes {
459            if !self.chunk_store.verify(chunk_hash).await? {
460                return Ok(false);
461            }
462        }
463
464        // Verify manifest hash
465        let mut hasher = Hasher::new();
466        for hash in &address.chunk_hashes {
467            hasher.update(hash);
468        }
469        hasher.update(&address.total_size.to_le_bytes());
470        let computed_root = hasher.finalize();
471
472        Ok(computed_root.as_bytes() == &address.root_hash)
473    }
474
475    /// Get chunk metadata
476    pub async fn get_chunk_info(&self, address: &ContentAddress) -> Result<ChunkMetadata> {
477        let metadata = self.metadata.read().await;
478        metadata
479            .get(&address.root_hash)
480            .cloned()
481            .ok_or_else(|| anyhow!("Metadata not found"))
482    }
483
484    /// Get deduplication statistics
485    pub async fn get_dedup_stats(&self) -> DedupStatistics {
486        self.dedup_index.get_stats().await
487    }
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // Same bytes stored through two independent systems must yield the same
    // address: chunking and hashing are functions of content alone.
    #[tokio::test]
    async fn test_deterministic_addressing() {
        let mut system1 = ContentAddressingSystem::new();
        let mut system2 = ContentAddressingSystem::new();
        let content = b"test content for deterministic addressing";

        let addr1 = system1.store_content(Cursor::new(content)).await.unwrap();
        let addr2 = system2.store_content(Cursor::new(content)).await.unwrap();

        assert_eq!(addr1, addr2, "Same content should produce same address");
    }

    // Round-trip: stored content verifies against its address and retrieves
    // byte-for-byte identical to the original.
    #[tokio::test]
    async fn test_content_integrity_verification() {
        let mut system = ContentAddressingSystem::new();
        let content = b"test content for integrity verification";

        let addr = system.store_content(Cursor::new(content)).await.unwrap();
        let is_valid = system.verify_integrity(&addr).await.unwrap();

        assert!(is_valid, "Content integrity should be valid");

        let retrieved = system.retrieve_content(&addr).await.unwrap();
        assert_eq!(
            retrieved, content,
            "Retrieved content should match original"
        );
    }

    // find_boundary contract: no boundary below min_chunk_size; for data at
    // or beyond max_chunk_size, a boundary within [min, max] is guaranteed.
    #[tokio::test]
    async fn test_chunking_boundaries() {
        let config = ChunkingConfig::default();
        let chunker = ContentDefinedChunker::new(config.clone());

        // Test minimum chunk size
        let small_data = vec![0u8; config.min_chunk_size - 1];
        assert_eq!(chunker.find_boundary(&small_data), None);

        // Test boundary behavior when data exceeds max chunk size
        let large_data = vec![1u8; config.max_chunk_size + 100];
        let boundary = chunker.find_boundary(&large_data);
        assert!(boundary.is_some());
        let b = boundary.unwrap();
        assert!(b >= config.min_chunk_size && b <= config.max_chunk_size);
    }

    // Storing identical content twice must reuse the stored chunk: one
    // unique chunk, nonzero dedup ratio.
    #[tokio::test]
    async fn test_deduplication_efficiency() {
        let mut system = ContentAddressingSystem::new();

        // Store same content twice
        let content = b"duplicate content for dedup testing";
        let addr1 = system.store_content(Cursor::new(content)).await.unwrap();
        let addr2 = system.store_content(Cursor::new(content)).await.unwrap();

        assert_eq!(addr1, addr2, "Duplicate content should have same address");

        let stats = system.get_dedup_stats().await;
        assert!(stats.dedup_ratio > 0.0, "Should have deduplication");
        assert_eq!(stats.unique_chunks, 1, "Should only store one unique chunk");
    }

    // Edge case: empty input yields a zero-size, zero-chunk address that
    // still round-trips to empty content.
    #[tokio::test]
    async fn test_empty_content() {
        let mut system = ContentAddressingSystem::new();
        let content = b"";

        let addr = system.store_content(Cursor::new(content)).await.unwrap();
        assert_eq!(addr.total_size, 0);
        assert_eq!(addr.chunk_count, 0);

        let retrieved = system.retrieve_content(&addr).await.unwrap();
        assert_eq!(retrieved.len(), 0);
    }

    // 10MB of data must be split into multiple chunks (max chunk is 1MB)
    // and reassemble exactly on retrieval.
    #[tokio::test]
    async fn test_large_content_streaming() {
        let mut system = ContentAddressingSystem::new();
        let large_content = vec![42u8; 10_000_000]; // 10MB

        let addr = system
            .store_content(Cursor::new(&large_content))
            .await
            .unwrap();
        assert_eq!(addr.total_size, large_content.len() as u64);
        assert!(addr.chunk_count > 1, "Large content should be chunked");

        let retrieved = system.retrieve_content(&addr).await.unwrap();
        assert_eq!(retrieved.len(), large_content.len());
        assert_eq!(retrieved, large_content);
    }
}