// saorsa_core/dht/content_addressing.rs

1//! Enhanced Content Addressing System with BLAKE3 hashing
2//!
3//! Provides deterministic content addressing with efficient chunking,
4//! deduplication, and integrity verification capabilities.
5
6use anyhow::{Result, anyhow};
7use blake3::Hasher;
8use bytes::{Bytes, BytesMut};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::SystemTime;
13use tokio::io::{AsyncRead, AsyncReadExt};
14use tokio::sync::RwLock;
15
/// Content address uniquely identifying stored content
///
/// Equality, hashing, and serde all derive from the fields below, so two
/// addresses compare equal exactly when their manifests are identical.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ContentAddress {
    /// BLAKE3 hash of the content manifest
    pub root_hash: [u8; 32],
    /// BLAKE3 hash of each individual chunk, in content order
    pub chunk_hashes: Vec<[u8; 32]>,
    /// Total content size in bytes
    pub total_size: u64,
    /// Number of chunks (kept equal to `chunk_hashes.len()` by constructors)
    pub chunk_count: u32,
}
28
29impl ContentAddress {
30    /// Create a new content address from data bytes (convenience method)
31    pub fn new(data: &[u8]) -> Self {
32        let mut hasher = Hasher::new();
33        hasher.update(data);
34        let root_hash = hasher.finalize().into();
35
36        Self {
37            root_hash,
38            chunk_count: 1,
39            chunk_hashes: vec![root_hash],
40            total_size: data.len() as u64,
41        }
42    }
43
44    /// Create a new content address with detailed info
45    pub fn new_detailed(root_hash: [u8; 32], chunk_hashes: Vec<[u8; 32]>, total_size: u64) -> Self {
46        Self {
47            root_hash,
48            chunk_count: chunk_hashes.len() as u32,
49            chunk_hashes,
50            total_size,
51        }
52    }
53
54    /// Verify that data matches this content address
55    pub fn verify(&self, data: &[u8]) -> bool {
56        let mut hasher = Hasher::new();
57        hasher.update(data);
58        let hash: [u8; 32] = hasher.finalize().into();
59        hash == self.root_hash
60    }
61
62    /// Create a content address from bytes (using the bytes as root hash)
63    pub fn from_bytes(bytes: &[u8]) -> Self {
64        let mut root_hash = [0u8; 32];
65        let len = bytes.len().min(32);
66        root_hash[..len].copy_from_slice(&bytes[..len]);
67
68        Self {
69            root_hash,
70            chunk_count: 1,
71            chunk_hashes: vec![root_hash],
72            total_size: bytes.len() as u64,
73        }
74    }
75}
76
/// Per-content metadata recorded when content is stored
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkMetadata {
    /// Size in bytes of each chunk, in content order
    pub chunk_sizes: Vec<u32>,
    /// When the content was first stored
    pub created_at: SystemTime,
    /// Number of times the content has been retrieved
    pub access_count: u32,
    /// NOTE(review): written as 0 at store time and never updated in this
    /// file — confirm intended use before relying on it
    pub dedup_count: u32,
}
85
/// Statistics about deduplication efficiency
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DedupStatistics {
    /// Total chunk references across all stored content (duplicates included)
    pub total_chunks: u64,
    /// Number of distinct chunks actually stored
    pub unique_chunks: u64,
    /// Fraction of references eliminated by dedup; 0.0 when nothing is stored
    pub dedup_ratio: f64,
    /// Bytes saved by not re-storing duplicate chunks
    pub space_saved: u64,
}
94
/// Configuration for content-defined chunking
#[derive(Debug, Clone)]
pub struct ChunkingConfig {
    /// No boundary is emitted before this many bytes
    pub min_chunk_size: usize,
    /// Desired average chunk size for boundary selection
    pub target_chunk_size: usize,
    /// A boundary is forced once a chunk reaches this size
    pub max_chunk_size: usize,
    /// Rolling-hash window length in bytes
    pub window_size: usize,
}
103
impl Default for ChunkingConfig {
    /// Defaults: 1 KiB minimum, 64 KiB target, 1 MiB maximum, 48-byte window.
    fn default() -> Self {
        Self {
            min_chunk_size: 1024,      // 1KB
            target_chunk_size: 65536,  // 64KB
            max_chunk_size: 1_048_576, // 1MB
            window_size: 48,
        }
    }
}
114
/// Content-defined chunker using rolling hash
pub struct ContentDefinedChunker {
    /// Boundary-selection parameters
    config: ChunkingConfig,
    /// Carry-over bytes awaiting a boundary between `chunk_data` reads
    buffer: BytesMut,
}
120
121impl ContentDefinedChunker {
122    pub fn new(config: ChunkingConfig) -> Self {
123        let buffer_capacity = config.max_chunk_size;
124        Self {
125            config,
126            buffer: BytesMut::with_capacity(buffer_capacity),
127        }
128    }
129
130    /// Find next chunk boundary using rolling hash
131    pub fn find_boundary(&self, data: &[u8]) -> Option<usize> {
132        if data.len() < self.config.min_chunk_size {
133            return None;
134        }
135
136        let mut hash = 0u32;
137        let window = self.config.window_size;
138
139        // Skip minimum chunk size
140        let search_start = self.config.min_chunk_size;
141        let search_end = data.len().min(self.config.max_chunk_size);
142
143        for i in search_start..search_end {
144            // Simple rolling hash (Buzhash)
145            if i >= window {
146                let old_byte = data[i - window];
147                hash = hash.rotate_left(1) ^ u32::from(old_byte);
148            }
149
150            let new_byte = data[i];
151            hash = hash.rotate_left(1) ^ u32::from(new_byte);
152
153            // Check if we found a boundary
154            let mask = (1 << 13) - 1; // Target 8KB average
155            if (hash & mask) == 0 {
156                return Some(i);
157            }
158        }
159
160        // Force boundary at max chunk size
161        if data.len() >= self.config.max_chunk_size {
162            Some(self.config.max_chunk_size)
163        } else {
164            None
165        }
166    }
167
168    /// Chunk data into variable-size chunks
169    pub async fn chunk_data(&mut self, mut reader: impl AsyncRead + Unpin) -> Result<Vec<Bytes>> {
170        let mut chunks = Vec::new();
171        let mut buffer = vec![0u8; self.config.max_chunk_size];
172
173        loop {
174            let n = reader.read(&mut buffer).await?;
175            if n == 0 {
176                break;
177            }
178
179            self.buffer.extend_from_slice(&buffer[..n]);
180
181            while self.buffer.len() >= self.config.min_chunk_size {
182                if let Some(boundary) = self.find_boundary(&self.buffer) {
183                    let chunk = self.buffer.split_to(boundary);
184                    chunks.push(chunk.freeze());
185                } else {
186                    break;
187                }
188            }
189        }
190
191        // Handle remaining data
192        if !self.buffer.is_empty() {
193            chunks.push(self.buffer.split().freeze());
194        }
195
196        Ok(chunks)
197    }
198}
199
/// Reference-counting record for a single stored chunk
#[derive(Debug, Clone)]
struct ChunkRef {
    /// Chunk size in bytes (underscore-prefixed: retained but not yet read)
    _size: u32,
    /// When the chunk was first seen (underscore-prefixed: retained but not yet read)
    _created_at: SystemTime,
    /// Times the chunk has been seen via `DedupIndex::check_and_update`
    access_count: u32,
    /// Number of content items referencing this chunk
    reference_count: u32,
}
208
/// Simple in-memory content store for testing, keyed by full `ContentAddress`
pub struct ContentStore {
    /// Address → raw content bytes
    storage: HashMap<ContentAddress, Vec<u8>>,
}
213
impl Default for ContentStore {
    /// Equivalent to `ContentStore::new`.
    fn default() -> Self {
        Self::new()
    }
}
219
220impl ContentStore {
221    pub fn new() -> Self {
222        Self {
223            storage: HashMap::new(),
224        }
225    }
226
227    pub fn store(&mut self, address: ContentAddress, data: Vec<u8>) {
228        self.storage.insert(address, data);
229    }
230
231    pub fn retrieve(&self, address: &ContentAddress) -> Option<&Vec<u8>> {
232        self.storage.get(address)
233    }
234
235    pub fn size(&self) -> usize {
236        self.storage.len()
237    }
238}
239
/// Global deduplication index tracking per-chunk reference counts
pub struct DedupIndex {
    /// Chunk hash → reference-counting record
    chunk_refs: Arc<RwLock<HashMap<[u8; 32], ChunkRef>>>,
    /// Running total of bytes saved by deduplicated stores
    total_dedup_savings: Arc<RwLock<u64>>,
}
245
impl Default for DedupIndex {
    /// Equivalent to `DedupIndex::new`.
    fn default() -> Self {
        Self::new()
    }
}
251
252impl DedupIndex {
253    pub fn new() -> Self {
254        Self {
255            chunk_refs: Arc::new(RwLock::new(HashMap::new())),
256            total_dedup_savings: Arc::new(RwLock::new(0)),
257        }
258    }
259
260    /// Check if chunk exists and update reference count
261    pub async fn check_and_update(&self, hash: &[u8; 32], size: u32) -> bool {
262        let mut refs = self.chunk_refs.write().await;
263
264        if let Some(chunk_ref) = refs.get_mut(hash) {
265            chunk_ref.reference_count += 1;
266            chunk_ref.access_count += 1;
267
268            let mut savings = self.total_dedup_savings.write().await;
269            *savings += size as u64;
270
271            true
272        } else {
273            refs.insert(
274                *hash,
275                ChunkRef {
276                    _size: size,
277                    _created_at: SystemTime::now(),
278                    access_count: 1,
279                    reference_count: 1,
280                },
281            );
282            false
283        }
284    }
285
286    /// Get deduplication statistics
287    pub async fn get_stats(&self) -> DedupStatistics {
288        let refs = self.chunk_refs.read().await;
289        let savings = *self.total_dedup_savings.read().await;
290
291        let total_chunks: u64 = refs.values().map(|r| r.reference_count as u64).sum();
292        let unique_chunks = refs.len() as u64;
293
294        DedupStatistics {
295            total_chunks,
296            unique_chunks,
297            dedup_ratio: if total_chunks > 0 {
298                1.0 - (unique_chunks as f64 / total_chunks as f64)
299            } else {
300                0.0
301            },
302            space_saved: savings,
303        }
304    }
305}
306
/// In-memory storage backend for chunks, keyed by BLAKE3 hash
pub struct ChunkStorage {
    /// Chunk hash → chunk bytes
    chunks: Arc<RwLock<HashMap<[u8; 32], Bytes>>>,
}
311
impl Default for ChunkStorage {
    /// Equivalent to `ChunkStorage::new`.
    fn default() -> Self {
        Self::new()
    }
}
317
318impl ChunkStorage {
319    pub fn new() -> Self {
320        Self {
321            chunks: Arc::new(RwLock::new(HashMap::new())),
322        }
323    }
324
325    /// Store a chunk
326    pub async fn store(&self, hash: [u8; 32], data: Bytes) -> Result<()> {
327        let mut chunks = self.chunks.write().await;
328        chunks.insert(hash, data);
329        Ok(())
330    }
331
332    /// Retrieve a chunk
333    pub async fn retrieve(&self, hash: &[u8; 32]) -> Result<Bytes> {
334        let chunks = self.chunks.read().await;
335        chunks
336            .get(hash)
337            .cloned()
338            .ok_or_else(|| anyhow!("Chunk not found"))
339    }
340
341    /// Verify chunk integrity
342    pub async fn verify(&self, hash: &[u8; 32]) -> Result<bool> {
343        let chunks = self.chunks.read().await;
344        if let Some(data) = chunks.get(hash) {
345            let computed_hash = blake3::hash(data);
346            Ok(computed_hash.as_bytes() == hash)
347        } else {
348            Ok(false)
349        }
350    }
351}
352
/// Main content addressing system: chunking + dedup + chunk storage + metadata
pub struct ContentAddressingSystem {
    /// Splits incoming streams into content-defined chunks
    chunker: ContentDefinedChunker,
    /// Tracks which chunk hashes are already stored
    dedup_index: DedupIndex,
    /// Holds the actual chunk bytes
    chunk_store: ChunkStorage,
    /// Root hash → per-content metadata
    metadata: Arc<RwLock<HashMap<[u8; 32], ChunkMetadata>>>,
}
360
impl Default for ContentAddressingSystem {
    /// Equivalent to `ContentAddressingSystem::new`.
    fn default() -> Self {
        Self::new()
    }
}
366
367impl ContentAddressingSystem {
368    /// Create a new content addressing system
369    pub fn new() -> Self {
370        Self {
371            chunker: ContentDefinedChunker::new(ChunkingConfig::default()),
372            dedup_index: DedupIndex::new(),
373            chunk_store: ChunkStorage::new(),
374            metadata: Arc::new(RwLock::new(HashMap::new())),
375        }
376    }
377
378    /// Store content and return its address
379    pub async fn store_content(
380        &mut self,
381        content: impl AsyncRead + Unpin,
382    ) -> Result<ContentAddress> {
383        // Chunk the content
384        let chunks = self.chunker.chunk_data(content).await?;
385
386        let mut chunk_hashes = Vec::new();
387        let mut total_size = 0u64;
388        let mut chunk_sizes = Vec::new();
389
390        // Process each chunk
391        for chunk in chunks {
392            let hash = blake3::hash(&chunk);
393            let hash_bytes = *hash.as_bytes();
394
395            chunk_hashes.push(hash_bytes);
396            chunk_sizes.push(chunk.len() as u32);
397            total_size += chunk.len() as u64;
398
399            // Check deduplication
400            let is_duplicate = self
401                .dedup_index
402                .check_and_update(&hash_bytes, chunk.len() as u32)
403                .await;
404
405            // Store if new
406            if !is_duplicate {
407                self.chunk_store.store(hash_bytes, chunk).await?;
408            }
409        }
410
411        // Generate root hash from manifest
412        let mut hasher = Hasher::new();
413        for hash in &chunk_hashes {
414            hasher.update(hash);
415        }
416        hasher.update(&total_size.to_le_bytes());
417        let root_hash = *hasher.finalize().as_bytes();
418
419        // Store metadata
420        let metadata = ChunkMetadata {
421            chunk_sizes,
422            created_at: SystemTime::now(),
423            access_count: 0,
424            dedup_count: 0,
425        };
426
427        self.metadata.write().await.insert(root_hash, metadata);
428
429        Ok(ContentAddress::new_detailed(
430            root_hash,
431            chunk_hashes,
432            total_size,
433        ))
434    }
435
436    /// Retrieve content by address
437    pub async fn retrieve_content(&self, address: &ContentAddress) -> Result<Vec<u8>> {
438        let mut content = Vec::with_capacity(address.total_size as usize);
439
440        for chunk_hash in &address.chunk_hashes {
441            let chunk = self.chunk_store.retrieve(chunk_hash).await?;
442            content.extend_from_slice(&chunk);
443        }
444
445        // Update access count
446        if let Some(metadata) = self.metadata.write().await.get_mut(&address.root_hash) {
447            metadata.access_count += 1;
448        }
449
450        Ok(content)
451    }
452
453    /// Verify content integrity without retrieval
454    pub async fn verify_integrity(&self, address: &ContentAddress) -> Result<bool> {
455        // Verify all chunks exist and match their hashes
456        for chunk_hash in &address.chunk_hashes {
457            if !self.chunk_store.verify(chunk_hash).await? {
458                return Ok(false);
459            }
460        }
461
462        // Verify manifest hash
463        let mut hasher = Hasher::new();
464        for hash in &address.chunk_hashes {
465            hasher.update(hash);
466        }
467        hasher.update(&address.total_size.to_le_bytes());
468        let computed_root = hasher.finalize();
469
470        Ok(computed_root.as_bytes() == &address.root_hash)
471    }
472
473    /// Get chunk metadata
474    pub async fn get_chunk_info(&self, address: &ContentAddress) -> Result<ChunkMetadata> {
475        let metadata = self.metadata.read().await;
476        metadata
477            .get(&address.root_hash)
478            .cloned()
479            .ok_or_else(|| anyhow!("Metadata not found"))
480    }
481
482    /// Get deduplication statistics
483    pub async fn get_dedup_stats(&self) -> DedupStatistics {
484        self.dedup_index.get_stats().await
485    }
486}
487
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // Same bytes stored through two independent systems must yield the same
    // address: the root hash depends only on content.
    #[tokio::test]
    async fn test_deterministic_addressing() {
        let mut system1 = ContentAddressingSystem::new();
        let mut system2 = ContentAddressingSystem::new();
        let content = b"test content for deterministic addressing";

        let addr1 = system1.store_content(Cursor::new(content)).await.unwrap();
        let addr2 = system2.store_content(Cursor::new(content)).await.unwrap();

        assert_eq!(addr1, addr2, "Same content should produce same address");
    }

    // Round-trip: stored content verifies and retrieves byte-for-byte.
    #[tokio::test]
    async fn test_content_integrity_verification() {
        let mut system = ContentAddressingSystem::new();
        let content = b"test content for integrity verification";

        let addr = system.store_content(Cursor::new(content)).await.unwrap();
        let is_valid = system.verify_integrity(&addr).await.unwrap();

        assert!(is_valid, "Content integrity should be valid");

        let retrieved = system.retrieve_content(&addr).await.unwrap();
        assert_eq!(
            retrieved, content,
            "Retrieved content should match original"
        );
    }

    // Boundary rules: nothing below min_chunk_size; a cut is forced at
    // max_chunk_size.
    #[tokio::test]
    async fn test_chunking_boundaries() {
        let config = ChunkingConfig::default();
        let chunker = ContentDefinedChunker::new(config.clone());

        // Test minimum chunk size
        let small_data = vec![0u8; config.min_chunk_size - 1];
        assert_eq!(chunker.find_boundary(&small_data), None);

        // Test maximum chunk size
        let large_data = vec![1u8; config.max_chunk_size + 100];
        let boundary = chunker.find_boundary(&large_data);
        assert_eq!(boundary, Some(config.max_chunk_size));
    }

    // Storing identical content twice must dedup down to one unique chunk.
    #[tokio::test]
    async fn test_deduplication_efficiency() {
        let mut system = ContentAddressingSystem::new();

        // Store same content twice
        let content = b"duplicate content for dedup testing";
        let addr1 = system.store_content(Cursor::new(content)).await.unwrap();
        let addr2 = system.store_content(Cursor::new(content)).await.unwrap();

        assert_eq!(addr1, addr2, "Duplicate content should have same address");

        let stats = system.get_dedup_stats().await;
        assert!(stats.dedup_ratio > 0.0, "Should have deduplication");
        assert_eq!(stats.unique_chunks, 1, "Should only store one unique chunk");
    }

    // Empty input yields a zero-size, zero-chunk address that still
    // round-trips.
    #[tokio::test]
    async fn test_empty_content() {
        let mut system = ContentAddressingSystem::new();
        let content = b"";

        let addr = system.store_content(Cursor::new(content)).await.unwrap();
        assert_eq!(addr.total_size, 0);
        assert_eq!(addr.chunk_count, 0);

        let retrieved = system.retrieve_content(&addr).await.unwrap();
        assert_eq!(retrieved.len(), 0);
    }

    // 10 MB input must be split into multiple chunks and reassemble exactly.
    #[tokio::test]
    async fn test_large_content_streaming() {
        let mut system = ContentAddressingSystem::new();
        let large_content = vec![42u8; 10_000_000]; // 10MB

        let addr = system
            .store_content(Cursor::new(&large_content))
            .await
            .unwrap();
        assert_eq!(addr.total_size, large_content.len() as u64);
        assert!(addr.chunk_count > 1, "Large content should be chunked");

        let retrieved = system.retrieve_content(&addr).await.unwrap();
        assert_eq!(retrieved.len(), large_content.len());
        assert_eq!(retrieved, large_content);
    }
}