//! Content-addressed storage: content-defined chunking, BLAKE3 hashing,
//! and chunk-level deduplication.

use anyhow::{Result, anyhow};
use blake3::Hasher;
use bytes::{Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::sync::RwLock;

/// Address of a stored blob: a BLAKE3 root hash plus the per-chunk hashes
/// needed to reassemble and verify the content.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ContentAddress {
    /// Root hash identifying the content as a whole.
    pub root_hash: [u8; 32],
    /// BLAKE3 hash of each chunk, in order.
    pub chunk_hashes: Vec<[u8; 32]>,
    /// Total content size in bytes.
    pub total_size: u64,
    /// Number of chunks the content was split into.
    pub chunk_count: u32,
}

impl ContentAddress {
    /// Builds a single-chunk address whose root hash is the BLAKE3 hash of
    /// `data` itself.
    pub fn new(data: &[u8]) -> Self {
        let mut hasher = Hasher::new();
        hasher.update(data);
        let root_hash = hasher.finalize().into();

        Self {
            root_hash,
            chunk_count: 1,
            chunk_hashes: vec![root_hash],
            total_size: data.len() as u64,
        }
    }

    /// Builds a multi-chunk address from an already-computed root hash and
    /// chunk hash list.
    pub fn new_detailed(root_hash: [u8; 32], chunk_hashes: Vec<[u8; 32]>, total_size: u64) -> Self {
        Self {
            root_hash,
            chunk_count: chunk_hashes.len() as u32,
            chunk_hashes,
            total_size,
        }
    }

    /// Checks `data` against the root hash. Only meaningful for addresses
    /// whose root is the hash of the full data (e.g. those built by `new`);
    /// multi-chunk addresses derive their root from the chunk hashes instead.
    pub fn verify(&self, data: &[u8]) -> bool {
        let mut hasher = Hasher::new();
        hasher.update(data);
        let hash: [u8; 32] = hasher.finalize().into();
        hash == self.root_hash
    }

    /// Reinterprets raw bytes as a root hash, zero-padding or truncating to
    /// 32 bytes. Lossy: no chunk structure is recovered.
    pub fn from_bytes(bytes: &[u8]) -> Self {
        let mut root_hash = [0u8; 32];
        let len = bytes.len().min(32);
        root_hash[..len].copy_from_slice(&bytes[..len]);

        Self {
            root_hash,
            chunk_count: 1,
            chunk_hashes: vec![root_hash],
            total_size: bytes.len() as u64,
        }
    }
}
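
// Illustrative round trip (added sketch, not part of the original API; the
// function name is hypothetical): a single-chunk address verifies exactly
// the bytes it was built from.
#[allow(dead_code)]
fn content_address_roundtrip_sketch() {
    let addr = ContentAddress::new(b"hello");
    debug_assert!(addr.verify(b"hello"));
    debug_assert!(!addr.verify(b"world"));
    debug_assert_eq!(addr.total_size, 5);
}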

/// Bookkeeping recorded for each stored content root.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkMetadata {
    /// Size of each chunk in bytes, in order.
    pub chunk_sizes: Vec<u32>,
    pub created_at: SystemTime,
    pub access_count: u32,
    pub dedup_count: u32,
}

/// Aggregate deduplication counters reported by `DedupIndex::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DedupStatistics {
    /// Chunk references seen, counting duplicates.
    pub total_chunks: u64,
    /// Distinct chunk hashes stored.
    pub unique_chunks: u64,
    /// Fraction of chunk references served by deduplication.
    pub dedup_ratio: f64,
    /// Bytes not stored thanks to deduplication.
    pub space_saved: u64,
}

/// Tuning knobs for content-defined chunking.
#[derive(Debug, Clone)]
pub struct ChunkingConfig {
    pub min_chunk_size: usize,
    pub target_chunk_size: usize,
    pub max_chunk_size: usize,
    /// Rolling-hash window width in bytes.
    pub window_size: usize,
}

impl Default for ChunkingConfig {
    fn default() -> Self {
        Self {
            min_chunk_size: 1024,
            target_chunk_size: 65536,
            max_chunk_size: 1_048_576,
            window_size: 48,
        }
    }
}
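
// Sizing note (added commentary): with the boundary mask derived from
// `target_chunk_size` in `find_boundary` below, a boundary fires with
// probability roughly 1/target per byte once past `min_chunk_size`, so
// chunks average about min_chunk_size + target_chunk_size bytes and are
// hard-capped at max_chunk_size (1 KiB / 64 KiB / 1 MiB under the
// defaults above).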

/// Splits a byte stream at content-defined boundaries found by a rolling
/// hash, so identical data produces identical chunks regardless of where
/// it sits in the stream.
pub struct ContentDefinedChunker {
    config: ChunkingConfig,
    buffer: BytesMut,
}

impl ContentDefinedChunker {
    pub fn new(config: ChunkingConfig) -> Self {
        let buffer_capacity = config.max_chunk_size;
        Self {
            config,
            buffer: BytesMut::with_capacity(buffer_capacity),
        }
    }

    /// Returns the first cut point in `data`, or `None` if `data` is still
    /// shorter than the minimum chunk size. Falls back to `data.len()`
    /// (or `max_chunk_size`) when no hash boundary is found.
    pub fn find_boundary(&self, data: &[u8]) -> Option<usize> {
        if data.len() < self.config.min_chunk_size {
            return None;
        }

        let mut hash = 0u32;
        let window = self.config.window_size;
        // Derive the boundary mask from the configured target chunk size so
        // that a boundary fires with probability ~1/target per byte.
        let mask = (self.config.target_chunk_size.next_power_of_two() - 1) as u32;

        let search_start = self.config.min_chunk_size;
        let search_end = data.len().min(self.config.max_chunk_size);

        // Prime the rolling hash over the window preceding the first
        // candidate boundary, so removals below cancel bytes actually added.
        for &byte in &data[search_start.saturating_sub(window)..search_start] {
            hash = hash.rotate_left(1) ^ u32::from(byte);
        }

        for i in search_start..search_end {
            // Buzhash-style update: fold in the incoming byte and cancel the
            // byte leaving the window (its contribution has been rotated
            // `window` times since it entered).
            hash = hash.rotate_left(1) ^ u32::from(data[i]);
            if i >= window {
                hash ^= u32::from(data[i - window]).rotate_left(window as u32);
            }

            if (hash & mask) == 0 {
                return Some(i);
            }
        }

        if data.len() >= self.config.max_chunk_size {
            Some(self.config.max_chunk_size)
        } else if !data.is_empty() {
            Some(data.len())
        } else {
            None
        }
    }

    /// Reads `reader` to EOF and returns the content-defined chunks.
    pub async fn chunk_data(&mut self, mut reader: impl AsyncRead + Unpin) -> Result<Vec<Bytes>> {
        let mut chunks = Vec::new();
        let mut buffer = vec![0u8; self.config.max_chunk_size];

        loop {
            let n = reader.read(&mut buffer).await?;
            if n == 0 {
                break;
            }

            self.buffer.extend_from_slice(&buffer[..n]);

            // Carve off chunks while a real boundary is available. A boundary
            // equal to the buffered length just means "no cut point yet", so
            // keep those bytes for the next read unless the buffer already
            // holds a full maximum-size chunk.
            loop {
                match self.find_boundary(&self.buffer) {
                    Some(boundary)
                        if boundary < self.buffer.len()
                            || self.buffer.len() >= self.config.max_chunk_size =>
                    {
                        chunks.push(self.buffer.split_to(boundary).freeze());
                    }
                    _ => break,
                }
            }
        }

        // Whatever remains at EOF becomes the final (possibly short) chunk.
        if !self.buffer.is_empty() {
            chunks.push(self.buffer.split().freeze());
        }

        Ok(chunks)
    }
}
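
// Usage sketch (added illustration, not original API; the function name is
// hypothetical): drive the chunker over an in-memory buffer. A
// `std::io::Cursor` works here because tokio implements `AsyncRead` for
// cursors over byte slices.
#[allow(dead_code)]
async fn chunking_usage_sketch() -> Result<()> {
    let mut chunker = ContentDefinedChunker::new(ChunkingConfig::default());
    let data = vec![7u8; 200_000];
    let chunks = chunker.chunk_data(std::io::Cursor::new(&data)).await?;
    // Chunks partition the input: every byte is preserved, in order.
    assert_eq!(chunks.iter().map(|c| c.len()).sum::<usize>(), data.len());
    Ok(())
}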

/// Per-chunk reference-counting entry kept by `DedupIndex`.
#[derive(Debug, Clone)]
struct ChunkRef {
    _size: u32,
    _created_at: SystemTime,
    access_count: u32,
    reference_count: u32,
}

/// Simple whole-blob store keyed by `ContentAddress`. Unlike
/// `ContentAddressingSystem`, it performs no chunking or deduplication.
pub struct ContentStore {
    storage: HashMap<ContentAddress, Vec<u8>>,
}

impl Default for ContentStore {
    fn default() -> Self {
        Self::new()
    }
}

impl ContentStore {
    pub fn new() -> Self {
        Self {
            storage: HashMap::new(),
        }
    }

    pub fn store(&mut self, address: ContentAddress, data: Vec<u8>) {
        self.storage.insert(address, data);
    }

    pub fn retrieve(&self, address: &ContentAddress) -> Option<&Vec<u8>> {
        self.storage.get(address)
    }

    pub fn size(&self) -> usize {
        self.storage.len()
    }
}

/// Tracks which chunk hashes have been seen and how much storage
/// deduplication has saved.
pub struct DedupIndex {
    chunk_refs: Arc<RwLock<HashMap<[u8; 32], ChunkRef>>>,
    total_dedup_savings: Arc<RwLock<u64>>,
}

impl Default for DedupIndex {
    fn default() -> Self {
        Self::new()
    }
}

impl DedupIndex {
    pub fn new() -> Self {
        Self {
            chunk_refs: Arc::new(RwLock::new(HashMap::new())),
            total_dedup_savings: Arc::new(RwLock::new(0)),
        }
    }

    /// Records a reference to `hash` and returns `true` if the chunk was
    /// already known (i.e. the caller does not need to store it again).
    pub async fn check_and_update(&self, hash: &[u8; 32], size: u32) -> bool {
        let mut refs = self.chunk_refs.write().await;

        if let Some(chunk_ref) = refs.get_mut(hash) {
            chunk_ref.reference_count += 1;
            chunk_ref.access_count += 1;

            let mut savings = self.total_dedup_savings.write().await;
            *savings += size as u64;

            true
        } else {
            refs.insert(
                *hash,
                ChunkRef {
                    _size: size,
                    _created_at: SystemTime::now(),
                    access_count: 1,
                    reference_count: 1,
                },
            );
            false
        }
    }

    pub async fn get_stats(&self) -> DedupStatistics {
        let refs = self.chunk_refs.read().await;
        let savings = *self.total_dedup_savings.read().await;

        let total_chunks: u64 = refs.values().map(|r| r.reference_count as u64).sum();
        let unique_chunks = refs.len() as u64;

        DedupStatistics {
            total_chunks,
            unique_chunks,
            dedup_ratio: if total_chunks > 0 {
                1.0 - (unique_chunks as f64 / total_chunks as f64)
            } else {
                0.0
            },
            space_saved: savings,
        }
    }
}
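
// Worked example (added commentary): storing the same chunk twice gives
// total_chunks = 2 and unique_chunks = 1, so dedup_ratio = 1 - 1/2 = 0.5
// and space_saved equals one copy of the chunk.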

/// In-memory chunk store keyed by BLAKE3 hash.
pub struct ChunkStorage {
    chunks: Arc<RwLock<HashMap<[u8; 32], Bytes>>>,
}

impl Default for ChunkStorage {
    fn default() -> Self {
        Self::new()
    }
}

impl ChunkStorage {
    pub fn new() -> Self {
        Self {
            chunks: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub async fn store(&self, hash: [u8; 32], data: Bytes) -> Result<()> {
        let mut chunks = self.chunks.write().await;
        chunks.insert(hash, data);
        Ok(())
    }

    pub async fn retrieve(&self, hash: &[u8; 32]) -> Result<Bytes> {
        let chunks = self.chunks.read().await;
        chunks
            .get(hash)
            .cloned()
            .ok_or_else(|| anyhow!("Chunk not found"))
    }

    /// Recomputes the hash of the stored bytes and compares it to the key.
    pub async fn verify(&self, hash: &[u8; 32]) -> Result<bool> {
        let chunks = self.chunks.read().await;
        if let Some(data) = chunks.get(hash) {
            let computed_hash = blake3::hash(data);
            Ok(computed_hash.as_bytes() == hash)
        } else {
            Ok(false)
        }
    }
}

/// Ties the pieces together: chunk incoming content, deduplicate chunks,
/// store the unique ones, and hand back a verifiable `ContentAddress`.
pub struct ContentAddressingSystem {
    chunker: ContentDefinedChunker,
    dedup_index: DedupIndex,
    chunk_store: ChunkStorage,
    metadata: Arc<RwLock<HashMap<[u8; 32], ChunkMetadata>>>,
}

impl Default for ContentAddressingSystem {
    fn default() -> Self {
        Self::new()
    }
}

impl ContentAddressingSystem {
    pub fn new() -> Self {
        Self {
            chunker: ContentDefinedChunker::new(ChunkingConfig::default()),
            dedup_index: DedupIndex::new(),
            chunk_store: ChunkStorage::new(),
            metadata: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Chunks `content`, stores the chunks that are not already present,
    /// and returns the resulting address.
    pub async fn store_content(
        &mut self,
        content: impl AsyncRead + Unpin,
    ) -> Result<ContentAddress> {
        let chunks = self.chunker.chunk_data(content).await?;

        let mut chunk_hashes = Vec::new();
        let mut total_size = 0u64;
        let mut chunk_sizes = Vec::new();

        for chunk in chunks {
            let hash = blake3::hash(&chunk);
            let hash_bytes = *hash.as_bytes();

            chunk_hashes.push(hash_bytes);
            chunk_sizes.push(chunk.len() as u32);
            total_size += chunk.len() as u64;

            let is_duplicate = self
                .dedup_index
                .check_and_update(&hash_bytes, chunk.len() as u32)
                .await;

            // Only unseen chunks hit storage; duplicates are dropped here.
            if !is_duplicate {
                self.chunk_store.store(hash_bytes, chunk).await?;
            }
        }

        // The root hash commits to the ordered chunk hashes and the total
        // size; `verify_integrity` recomputes it the same way.
        let mut hasher = Hasher::new();
        for hash in &chunk_hashes {
            hasher.update(hash);
        }
        hasher.update(&total_size.to_le_bytes());
        let root_hash = *hasher.finalize().as_bytes();

        let metadata = ChunkMetadata {
            chunk_sizes,
            created_at: SystemTime::now(),
            access_count: 0,
            dedup_count: 0,
        };

        self.metadata.write().await.insert(root_hash, metadata);

        Ok(ContentAddress::new_detailed(
            root_hash,
            chunk_hashes,
            total_size,
        ))
    }

    /// Reassembles the content by fetching each chunk in address order.
    pub async fn retrieve_content(&self, address: &ContentAddress) -> Result<Vec<u8>> {
        let mut content = Vec::with_capacity(address.total_size as usize);

        for chunk_hash in &address.chunk_hashes {
            let chunk = self.chunk_store.retrieve(chunk_hash).await?;
            content.extend_from_slice(&chunk);
        }

        if let Some(metadata) = self.metadata.write().await.get_mut(&address.root_hash) {
            metadata.access_count += 1;
        }

        Ok(content)
    }

    /// Verifies every chunk against its hash, then recomputes the root hash
    /// from the chunk hashes and total size.
    pub async fn verify_integrity(&self, address: &ContentAddress) -> Result<bool> {
        for chunk_hash in &address.chunk_hashes {
            if !self.chunk_store.verify(chunk_hash).await? {
                return Ok(false);
            }
        }

        let mut hasher = Hasher::new();
        for hash in &address.chunk_hashes {
            hasher.update(hash);
        }
        hasher.update(&address.total_size.to_le_bytes());
        let computed_root = hasher.finalize();

        Ok(computed_root.as_bytes() == &address.root_hash)
    }

    pub async fn get_chunk_info(&self, address: &ContentAddress) -> Result<ChunkMetadata> {
        let metadata = self.metadata.read().await;
        metadata
            .get(&address.root_hash)
            .cloned()
            .ok_or_else(|| anyhow!("Metadata not found"))
    }

    pub async fn get_dedup_stats(&self) -> DedupStatistics {
        self.dedup_index.get_stats().await
    }
}
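
// End-to-end usage sketch (added illustration; the function name is
// hypothetical): store bytes, check integrity, and read them back.
#[allow(dead_code)]
async fn system_usage_sketch() -> Result<()> {
    let mut system = ContentAddressingSystem::new();
    let addr = system
        .store_content(std::io::Cursor::new(b"example payload"))
        .await?;
    assert!(system.verify_integrity(&addr).await?);
    assert_eq!(system.retrieve_content(&addr).await?, b"example payload");
    Ok(())
}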

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[tokio::test]
    async fn test_deterministic_addressing() {
        let mut system1 = ContentAddressingSystem::new();
        let mut system2 = ContentAddressingSystem::new();
        let content = b"test content for deterministic addressing";

        let addr1 = system1.store_content(Cursor::new(content)).await.unwrap();
        let addr2 = system2.store_content(Cursor::new(content)).await.unwrap();

        assert_eq!(addr1, addr2, "Same content should produce same address");
    }

    #[tokio::test]
    async fn test_content_integrity_verification() {
        let mut system = ContentAddressingSystem::new();
        let content = b"test content for integrity verification";

        let addr = system.store_content(Cursor::new(content)).await.unwrap();
        let is_valid = system.verify_integrity(&addr).await.unwrap();

        assert!(is_valid, "Content integrity should be valid");

        let retrieved = system.retrieve_content(&addr).await.unwrap();
        assert_eq!(
            retrieved, content,
            "Retrieved content should match original"
        );
    }

    #[tokio::test]
    async fn test_chunking_boundaries() {
        let config = ChunkingConfig::default();
        let chunker = ContentDefinedChunker::new(config.clone());

        let small_data = vec![0u8; config.min_chunk_size - 1];
        assert_eq!(chunker.find_boundary(&small_data), None);

        let large_data = vec![1u8; config.max_chunk_size + 100];
        let boundary = chunker.find_boundary(&large_data);
        assert!(boundary.is_some());
        let b = boundary.unwrap();
        assert!(b >= config.min_chunk_size && b <= config.max_chunk_size);
    }

    #[tokio::test]
    async fn test_deduplication_efficiency() {
        let mut system = ContentAddressingSystem::new();

        let content = b"duplicate content for dedup testing";
        let addr1 = system.store_content(Cursor::new(content)).await.unwrap();
        let addr2 = system.store_content(Cursor::new(content)).await.unwrap();

        assert_eq!(addr1, addr2, "Duplicate content should have same address");

        let stats = system.get_dedup_stats().await;
        assert!(stats.dedup_ratio > 0.0, "Should have deduplication");
        assert_eq!(stats.unique_chunks, 1, "Should only store one unique chunk");
    }

    #[tokio::test]
    async fn test_empty_content() {
        let mut system = ContentAddressingSystem::new();
        let content = b"";

        let addr = system.store_content(Cursor::new(content)).await.unwrap();
        assert_eq!(addr.total_size, 0);
        assert_eq!(addr.chunk_count, 0);

        let retrieved = system.retrieve_content(&addr).await.unwrap();
        assert_eq!(retrieved.len(), 0);
    }

    #[tokio::test]
    async fn test_large_content_streaming() {
        let mut system = ContentAddressingSystem::new();
        // 10 MB of repeating data, large enough to span many chunks.
        let large_content = vec![42u8; 10_000_000];
        let addr = system
            .store_content(Cursor::new(&large_content))
            .await
            .unwrap();
        assert_eq!(addr.total_size, large_content.len() as u64);
        assert!(addr.chunk_count > 1, "Large content should be chunked");

        let retrieved = system.retrieve_content(&addr).await.unwrap();
        assert_eq!(retrieved.len(), large_content.len());
        assert_eq!(retrieved, large_content);
    }
}