1use anyhow::{Result, anyhow};
7use blake3::Hasher;
8use bytes::{Bytes, BytesMut};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::SystemTime;
13use tokio::io::{AsyncRead, AsyncReadExt};
14use tokio::sync::RwLock;
15
/// Content-derived identifier for a stored payload.
///
/// Carries a root hash plus the ordered per-chunk hashes needed to
/// reassemble the content. Note the two construction paths differ:
/// `ContentAddress::new` hashes the raw bytes directly (single chunk),
/// while `ContentAddressingSystem::store_content` derives the root hash
/// from the chunk-hash list and total size.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ContentAddress {
    /// BLAKE3 digest identifying the whole content.
    pub root_hash: [u8; 32],
    /// BLAKE3 digest of each chunk, in payload order.
    pub chunk_hashes: Vec<[u8; 32]>,
    /// Total payload size in bytes.
    pub total_size: u64,
    /// Number of chunks; kept in sync with `chunk_hashes.len()`.
    pub chunk_count: u32,
}
28
29impl ContentAddress {
30 pub fn new(data: &[u8]) -> Self {
32 let mut hasher = Hasher::new();
33 hasher.update(data);
34 let root_hash = hasher.finalize().into();
35
36 Self {
37 root_hash,
38 chunk_count: 1,
39 chunk_hashes: vec![root_hash],
40 total_size: data.len() as u64,
41 }
42 }
43
44 pub fn new_detailed(root_hash: [u8; 32], chunk_hashes: Vec<[u8; 32]>, total_size: u64) -> Self {
46 Self {
47 root_hash,
48 chunk_count: chunk_hashes.len() as u32,
49 chunk_hashes,
50 total_size,
51 }
52 }
53
54 pub fn verify(&self, data: &[u8]) -> bool {
56 let mut hasher = Hasher::new();
57 hasher.update(data);
58 let hash: [u8; 32] = hasher.finalize().into();
59 hash == self.root_hash
60 }
61
62 pub fn from_bytes(bytes: &[u8]) -> Self {
64 let mut root_hash = [0u8; 32];
65 let len = bytes.len().min(32);
66 root_hash[..len].copy_from_slice(&bytes[..len]);
67
68 Self {
69 root_hash,
70 chunk_count: 1,
71 chunk_hashes: vec![root_hash],
72 total_size: bytes.len() as u64,
73 }
74 }
75}
76
/// Per-content bookkeeping stored under the content's root hash.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkMetadata {
    /// Size in bytes of each chunk, in payload order.
    pub chunk_sizes: Vec<u32>,
    /// When the content was first stored.
    pub created_at: SystemTime,
    /// Number of times the content has been retrieved.
    pub access_count: u32,
    /// NOTE(review): initialized to 0 and never incremented anywhere in
    /// this file — either dead or updated by code outside this view.
    pub dedup_count: u32,
}
85
/// Snapshot of deduplication effectiveness, produced by
/// `DedupIndex::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DedupStatistics {
    /// Sum of reference counts across all chunks (logical chunk stores).
    pub total_chunks: u64,
    /// Number of distinct chunk hashes actually stored.
    pub unique_chunks: u64,
    /// `1 - unique/total`; 0.0 when nothing has been stored.
    pub dedup_ratio: f64,
    /// Bytes avoided by deduplication (sum of duplicate chunk sizes).
    pub space_saved: u64,
}
94
/// Tuning parameters for content-defined chunking.
#[derive(Debug, Clone)]
pub struct ChunkingConfig {
    /// Smallest chunk the boundary search will cut (bytes).
    pub min_chunk_size: usize,
    /// NOTE(review): not consulted by `find_boundary`; the effective
    /// average chunk size comes from its 13-bit boundary mask (~8 KiB).
    pub target_chunk_size: usize,
    /// Hard upper bound; a cut is forced at this size.
    pub max_chunk_size: usize,
    /// Rolling-hash window length in bytes.
    pub window_size: usize,
}
103
104impl Default for ChunkingConfig {
105 fn default() -> Self {
106 Self {
107 min_chunk_size: 1024, target_chunk_size: 65536, max_chunk_size: 1_048_576, window_size: 48,
111 }
112 }
113}
114
/// Splits byte streams into variable-size chunks at content-defined
/// boundaries so identical data produces identical chunks regardless of
/// its position in the stream.
pub struct ContentDefinedChunker {
    // Size bounds and rolling-hash window for boundary detection.
    config: ChunkingConfig,
    // Carry-over bytes between reads that have not yet reached a boundary.
    buffer: BytesMut,
}
120
121impl ContentDefinedChunker {
122 pub fn new(config: ChunkingConfig) -> Self {
123 let buffer_capacity = config.max_chunk_size;
124 Self {
125 config,
126 buffer: BytesMut::with_capacity(buffer_capacity),
127 }
128 }
129
130 pub fn find_boundary(&self, data: &[u8]) -> Option<usize> {
132 if data.len() < self.config.min_chunk_size {
133 return None;
134 }
135
136 let mut hash = 0u32;
137 let window = self.config.window_size;
138
139 let search_start = self.config.min_chunk_size;
141 let search_end = data.len().min(self.config.max_chunk_size);
142
143 for i in search_start..search_end {
144 if i >= window {
146 let old_byte = data[i - window];
147 hash = hash.rotate_left(1) ^ u32::from(old_byte);
148 }
149
150 let new_byte = data[i];
151 hash = hash.rotate_left(1) ^ u32::from(new_byte);
152
153 let mask = (1 << 13) - 1; if (hash & mask) == 0 {
156 return Some(i);
157 }
158 }
159
160 if data.len() >= self.config.max_chunk_size {
162 Some(self.config.max_chunk_size)
163 } else {
164 None
165 }
166 }
167
168 pub async fn chunk_data(&mut self, mut reader: impl AsyncRead + Unpin) -> Result<Vec<Bytes>> {
170 let mut chunks = Vec::new();
171 let mut buffer = vec![0u8; self.config.max_chunk_size];
172
173 loop {
174 let n = reader.read(&mut buffer).await?;
175 if n == 0 {
176 break;
177 }
178
179 self.buffer.extend_from_slice(&buffer[..n]);
180
181 while self.buffer.len() >= self.config.min_chunk_size {
182 if let Some(boundary) = self.find_boundary(&self.buffer) {
183 let chunk = self.buffer.split_to(boundary);
184 chunks.push(chunk.freeze());
185 } else {
186 break;
187 }
188 }
189 }
190
191 if !self.buffer.is_empty() {
193 chunks.push(self.buffer.split().freeze());
194 }
195
196 Ok(chunks)
197 }
198}
199
/// Per-chunk bookkeeping entry inside `DedupIndex`.
#[derive(Debug, Clone)]
struct ChunkRef {
    // Chunk size in bytes; underscore-prefixed because nothing in this
    // file reads it after insertion.
    _size: u32,
    // First-seen timestamp; likewise currently unread.
    _created_at: SystemTime,
    // Times this chunk hash was encountered during stores.
    access_count: u32,
    // Logical references (store operations) pointing at this chunk.
    reference_count: u32,
}
208
/// Simple in-memory address → bytes map.
///
/// NOTE(review): not referenced by `ContentAddressingSystem` in this
/// file — possibly a standalone utility or used by code outside this view.
pub struct ContentStore {
    // Whole payloads keyed by their content address.
    storage: HashMap<ContentAddress, Vec<u8>>,
}
213
214impl Default for ContentStore {
215 fn default() -> Self {
216 Self::new()
217 }
218}
219
220impl ContentStore {
221 pub fn new() -> Self {
222 Self {
223 storage: HashMap::new(),
224 }
225 }
226
227 pub fn store(&mut self, address: ContentAddress, data: Vec<u8>) {
228 self.storage.insert(address, data);
229 }
230
231 pub fn retrieve(&self, address: &ContentAddress) -> Option<&Vec<u8>> {
232 self.storage.get(address)
233 }
234
235 pub fn size(&self) -> usize {
236 self.storage.len()
237 }
238}
239
/// Tracks which chunk hashes have been seen and how often, plus the
/// cumulative bytes saved by deduplication.
pub struct DedupIndex {
    // Reference/access counters per chunk hash.
    chunk_refs: Arc<RwLock<HashMap<[u8; 32], ChunkRef>>>,
    // Running total of duplicate bytes avoided.
    total_dedup_savings: Arc<RwLock<u64>>,
}
245
246impl Default for DedupIndex {
247 fn default() -> Self {
248 Self::new()
249 }
250}
251
252impl DedupIndex {
253 pub fn new() -> Self {
254 Self {
255 chunk_refs: Arc::new(RwLock::new(HashMap::new())),
256 total_dedup_savings: Arc::new(RwLock::new(0)),
257 }
258 }
259
260 pub async fn check_and_update(&self, hash: &[u8; 32], size: u32) -> bool {
262 let mut refs = self.chunk_refs.write().await;
263
264 if let Some(chunk_ref) = refs.get_mut(hash) {
265 chunk_ref.reference_count += 1;
266 chunk_ref.access_count += 1;
267
268 let mut savings = self.total_dedup_savings.write().await;
269 *savings += size as u64;
270
271 true
272 } else {
273 refs.insert(
274 *hash,
275 ChunkRef {
276 _size: size,
277 _created_at: SystemTime::now(),
278 access_count: 1,
279 reference_count: 1,
280 },
281 );
282 false
283 }
284 }
285
286 pub async fn get_stats(&self) -> DedupStatistics {
288 let refs = self.chunk_refs.read().await;
289 let savings = *self.total_dedup_savings.read().await;
290
291 let total_chunks: u64 = refs.values().map(|r| r.reference_count as u64).sum();
292 let unique_chunks = refs.len() as u64;
293
294 DedupStatistics {
295 total_chunks,
296 unique_chunks,
297 dedup_ratio: if total_chunks > 0 {
298 1.0 - (unique_chunks as f64 / total_chunks as f64)
299 } else {
300 0.0
301 },
302 space_saved: savings,
303 }
304 }
305}
306
/// Concurrent in-memory chunk store keyed by BLAKE3 hash.
pub struct ChunkStorage {
    // `Bytes` values make retrieval a cheap reference-counted clone.
    chunks: Arc<RwLock<HashMap<[u8; 32], Bytes>>>,
}
311
312impl Default for ChunkStorage {
313 fn default() -> Self {
314 Self::new()
315 }
316}
317
318impl ChunkStorage {
319 pub fn new() -> Self {
320 Self {
321 chunks: Arc::new(RwLock::new(HashMap::new())),
322 }
323 }
324
325 pub async fn store(&self, hash: [u8; 32], data: Bytes) -> Result<()> {
327 let mut chunks = self.chunks.write().await;
328 chunks.insert(hash, data);
329 Ok(())
330 }
331
332 pub async fn retrieve(&self, hash: &[u8; 32]) -> Result<Bytes> {
334 let chunks = self.chunks.read().await;
335 chunks
336 .get(hash)
337 .cloned()
338 .ok_or_else(|| anyhow!("Chunk not found"))
339 }
340
341 pub async fn verify(&self, hash: &[u8; 32]) -> Result<bool> {
343 let chunks = self.chunks.read().await;
344 if let Some(data) = chunks.get(hash) {
345 let computed_hash = blake3::hash(data);
346 Ok(computed_hash.as_bytes() == hash)
347 } else {
348 Ok(false)
349 }
350 }
351}
352
/// Facade tying together chunking, deduplication, chunk storage, and
/// per-content metadata.
pub struct ContentAddressingSystem {
    // Splits incoming streams into content-defined chunks.
    chunker: ContentDefinedChunker,
    // Tracks duplicate chunks and space savings.
    dedup_index: DedupIndex,
    // Holds the actual chunk bytes, keyed by hash.
    chunk_store: ChunkStorage,
    // Per-content metadata keyed by root hash.
    metadata: Arc<RwLock<HashMap<[u8; 32], ChunkMetadata>>>,
}
360
361impl Default for ContentAddressingSystem {
362 fn default() -> Self {
363 Self::new()
364 }
365}
366
367impl ContentAddressingSystem {
368 pub fn new() -> Self {
370 Self {
371 chunker: ContentDefinedChunker::new(ChunkingConfig::default()),
372 dedup_index: DedupIndex::new(),
373 chunk_store: ChunkStorage::new(),
374 metadata: Arc::new(RwLock::new(HashMap::new())),
375 }
376 }
377
378 pub async fn store_content(
380 &mut self,
381 content: impl AsyncRead + Unpin,
382 ) -> Result<ContentAddress> {
383 let chunks = self.chunker.chunk_data(content).await?;
385
386 let mut chunk_hashes = Vec::new();
387 let mut total_size = 0u64;
388 let mut chunk_sizes = Vec::new();
389
390 for chunk in chunks {
392 let hash = blake3::hash(&chunk);
393 let hash_bytes = *hash.as_bytes();
394
395 chunk_hashes.push(hash_bytes);
396 chunk_sizes.push(chunk.len() as u32);
397 total_size += chunk.len() as u64;
398
399 let is_duplicate = self
401 .dedup_index
402 .check_and_update(&hash_bytes, chunk.len() as u32)
403 .await;
404
405 if !is_duplicate {
407 self.chunk_store.store(hash_bytes, chunk).await?;
408 }
409 }
410
411 let mut hasher = Hasher::new();
413 for hash in &chunk_hashes {
414 hasher.update(hash);
415 }
416 hasher.update(&total_size.to_le_bytes());
417 let root_hash = *hasher.finalize().as_bytes();
418
419 let metadata = ChunkMetadata {
421 chunk_sizes,
422 created_at: SystemTime::now(),
423 access_count: 0,
424 dedup_count: 0,
425 };
426
427 self.metadata.write().await.insert(root_hash, metadata);
428
429 Ok(ContentAddress::new_detailed(
430 root_hash,
431 chunk_hashes,
432 total_size,
433 ))
434 }
435
436 pub async fn retrieve_content(&self, address: &ContentAddress) -> Result<Vec<u8>> {
438 let mut content = Vec::with_capacity(address.total_size as usize);
439
440 for chunk_hash in &address.chunk_hashes {
441 let chunk = self.chunk_store.retrieve(chunk_hash).await?;
442 content.extend_from_slice(&chunk);
443 }
444
445 if let Some(metadata) = self.metadata.write().await.get_mut(&address.root_hash) {
447 metadata.access_count += 1;
448 }
449
450 Ok(content)
451 }
452
453 pub async fn verify_integrity(&self, address: &ContentAddress) -> Result<bool> {
455 for chunk_hash in &address.chunk_hashes {
457 if !self.chunk_store.verify(chunk_hash).await? {
458 return Ok(false);
459 }
460 }
461
462 let mut hasher = Hasher::new();
464 for hash in &address.chunk_hashes {
465 hasher.update(hash);
466 }
467 hasher.update(&address.total_size.to_le_bytes());
468 let computed_root = hasher.finalize();
469
470 Ok(computed_root.as_bytes() == &address.root_hash)
471 }
472
473 pub async fn get_chunk_info(&self, address: &ContentAddress) -> Result<ChunkMetadata> {
475 let metadata = self.metadata.read().await;
476 metadata
477 .get(&address.root_hash)
478 .cloned()
479 .ok_or_else(|| anyhow!("Metadata not found"))
480 }
481
482 pub async fn get_dedup_stats(&self) -> DedupStatistics {
484 self.dedup_index.get_stats().await
485 }
486}
487
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // Same bytes must yield the same address across independent systems.
    #[tokio::test]
    async fn test_deterministic_addressing() {
        let mut system1 = ContentAddressingSystem::new();
        let mut system2 = ContentAddressingSystem::new();
        let content = b"test content for deterministic addressing";

        let addr1 = system1.store_content(Cursor::new(content)).await.unwrap();
        let addr2 = system2.store_content(Cursor::new(content)).await.unwrap();

        assert_eq!(addr1, addr2, "Same content should produce same address");
    }

    // Store → verify → retrieve round-trip on a small payload.
    #[tokio::test]
    async fn test_content_integrity_verification() {
        let mut system = ContentAddressingSystem::new();
        let content = b"test content for integrity verification";

        let addr = system.store_content(Cursor::new(content)).await.unwrap();
        let is_valid = system.verify_integrity(&addr).await.unwrap();

        assert!(is_valid, "Content integrity should be valid");

        let retrieved = system.retrieve_content(&addr).await.unwrap();
        assert_eq!(
            retrieved, content,
            "Retrieved content should match original"
        );
    }

    // find_boundary: no cut below min_chunk_size; forced cut at max.
    #[tokio::test]
    async fn test_chunking_boundaries() {
        let config = ChunkingConfig::default();
        let chunker = ContentDefinedChunker::new(config.clone());

        // Below the minimum, no boundary can exist yet.
        let small_data = vec![0u8; config.min_chunk_size - 1];
        assert_eq!(chunker.find_boundary(&small_data), None);

        // Uniform data never trips the hash mask, so the cut is forced
        // exactly at max_chunk_size.
        let large_data = vec![1u8; config.max_chunk_size + 100];
        let boundary = chunker.find_boundary(&large_data);
        assert_eq!(boundary, Some(config.max_chunk_size));
    }

    // Storing identical content twice keeps one unique chunk and a
    // positive dedup ratio.
    #[tokio::test]
    async fn test_deduplication_efficiency() {
        let mut system = ContentAddressingSystem::new();

        let content = b"duplicate content for dedup testing";
        let addr1 = system.store_content(Cursor::new(content)).await.unwrap();
        let addr2 = system.store_content(Cursor::new(content)).await.unwrap();

        assert_eq!(addr1, addr2, "Duplicate content should have same address");

        let stats = system.get_dedup_stats().await;
        assert!(stats.dedup_ratio > 0.0, "Should have deduplication");
        assert_eq!(stats.unique_chunks, 1, "Should only store one unique chunk");
    }

    // Zero-byte content yields a zero-size, zero-chunk address that
    // still round-trips.
    #[tokio::test]
    async fn test_empty_content() {
        let mut system = ContentAddressingSystem::new();
        let content = b"";

        let addr = system.store_content(Cursor::new(content)).await.unwrap();
        assert_eq!(addr.total_size, 0);
        assert_eq!(addr.chunk_count, 0);

        let retrieved = system.retrieve_content(&addr).await.unwrap();
        assert_eq!(retrieved.len(), 0);
    }

    // 10 MB stream must be split into multiple chunks and reassemble
    // byte-for-byte.
    #[tokio::test]
    async fn test_large_content_streaming() {
        let mut system = ContentAddressingSystem::new();
        let large_content = vec![42u8; 10_000_000]; let addr = system
            .store_content(Cursor::new(&large_content))
            .await
            .unwrap();
        assert_eq!(addr.total_size, large_content.len() as u64);
        assert!(addr.chunk_count > 1, "Large content should be chunked");

        let retrieved = system.retrieve_content(&addr).await.unwrap();
        assert_eq!(retrieved.len(), large_content.len());
        assert_eq!(retrieved, large_content);
    }
}