1use super::*;
25use anyhow::Result;
26use serde::{Deserialize, Serialize};
27use sha2::{Digest, Sha256};
28use std::{collections::HashMap, sync::Arc, time::Instant};
29use tokio::sync::RwLock;
30
/// Configuration for the content-addressed storage layer.
#[derive(Debug, Clone)]
pub struct StorageConfig {
    /// Filesystem path for the backing database.
    /// NOTE(review): the visible `ContentStore` is purely in-memory and never
    /// reads this path — confirm before relying on it for persistence.
    pub db_path: String,

    /// Maximum chunk size in bytes used when splitting content (see `ChunkManager`).
    pub chunk_size: usize,

    /// Replication tuning parameters.
    pub replication_config: ReplicationConfig,

    /// Cache budget in bytes.
    /// NOTE(review): not enforced by the visible code — presumably intended
    /// for an eviction policy; confirm with callers.
    pub cache_size: usize,
}
46
47impl Default for StorageConfig {
48 fn default() -> Self {
49 Self {
50 db_path: "./data/storage".to_string(),
51 chunk_size: 1024 * 1024, replication_config: ReplicationConfig::default(),
53 cache_size: 100 * 1024 * 1024, }
55 }
56}
57
/// Parameters controlling how many replicas of each content item to maintain.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// Lower bound on the replica count.
    pub min_replicas: u32,

    /// Upper bound on the replica count.
    pub max_replicas: u32,

    /// Nominal replica count before any adjustment.
    pub base_replicas: u32,

    /// Churn level above which replication is adapted
    /// (presumably a 0.0–1.0 fraction — confirm with callers).
    pub churn_threshold: f64,
}
73
74impl Default for ReplicationConfig {
75 fn default() -> Self {
76 Self {
77 min_replicas: 5,
78 max_replicas: 20,
79 base_replicas: 8,
80 churn_threshold: 0.3, }
82 }
83}
84
/// Content-addressed store: an in-memory key/value map plus a hot cache and
/// aggregate statistics, each behind its own async `RwLock`.
pub struct ContentStore {
    /// Backing map: raw SHA-256 hash bytes -> content bytes. Metadata for
    /// each item lives under a prefixed key (see `metadata_key`).
    db: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>,

    /// Hot cache of recently stored/retrieved content, keyed by hash.
    cache: Arc<RwLock<HashMap<ContentHash, CachedContent>>>,

    /// Store configuration (carried, but not enforced by the visible code).
    config: StorageConfig,

    /// Aggregate operation counters and byte totals.
    stats: Arc<RwLock<StorageStats>>,
}
99
/// A cache entry: the content bytes plus access bookkeeping.
#[derive(Debug, Clone)]
pub struct CachedContent {
    /// The cached content bytes.
    pub data: Vec<u8>,

    /// When this entry was last read or written.
    pub last_access: Instant,

    /// Number of cache hits this entry has served since insertion.
    pub access_count: u64,

    /// Metadata recorded when the content was stored.
    pub metadata: ContentMetadata,
}
115
/// Descriptive metadata persisted alongside each content item.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContentMetadata {
    /// Content size in bytes (caller-supplied; may disagree with the actual
    /// stored length — the store does not validate it).
    pub size: usize,

    /// Classification of the content.
    pub content_type: ContentType,

    /// Creation time as seconds since the Unix epoch.
    pub created_at: u64,

    /// Number of chunks if the content was chunked; `None` otherwise.
    pub chunk_count: Option<u32>,

    /// Desired number of replicas for this item.
    pub replication_factor: u32,
}
134
135impl Default for ContentMetadata {
136 fn default() -> Self {
137 Self {
138 size: 0,
139 content_type: ContentType::DataRetrieval,
140 created_at: std::time::SystemTime::now()
141 .duration_since(std::time::UNIX_EPOCH)
142 .map(|d| d.as_secs())
143 .unwrap_or(0),
144 chunk_count: None,
145 replication_factor: 8,
146 }
147 }
148}
149
/// Aggregate counters for store activity.
#[derive(Debug, Default, Clone)]
pub struct StorageStats {
    /// Number of distinct content items stored.
    pub total_content: u64,

    /// Total stored bytes across all items.
    pub total_bytes: u64,

    /// Retrievals served from the in-memory cache.
    pub cache_hits: u64,

    /// Retrievals that fell through to the backing map.
    pub cache_misses: u64,

    /// Number of `store` calls.
    pub store_operations: u64,

    /// Number of `retrieve` calls.
    pub retrieve_operations: u64,

    /// Mirror of `total_content`, populated by `ContentStore::get_stats`.
    pub total_items: u64,

    /// Reported capacity in bytes, populated by `ContentStore::get_stats`.
    pub capacity_bytes: u64,
}
177
178impl StorageStats {
179 pub fn utilization(&self) -> f64 {
181 if self.capacity_bytes == 0 {
182 0.0
183 } else {
184 self.total_bytes as f64 / self.capacity_bytes as f64
185 }
186 }
187}
188
189impl ContentStore {
190 pub async fn new(config: StorageConfig) -> Result<Self> {
192 let db = Arc::new(RwLock::new(HashMap::new()));
194
195 Ok(Self {
196 db,
197 cache: Arc::new(RwLock::new(HashMap::new())),
198 config,
199 stats: Arc::new(RwLock::new(StorageStats::default())),
200 })
201 }
202
203 pub async fn store(&self, content: Vec<u8>, metadata: ContentMetadata) -> Result<ContentHash> {
205 let hash = Self::calculate_hash(&content);
207
208 let mut db = self.db.write().await;
210 db.insert(hash.0.to_vec(), content.clone());
211
212 let metadata_key = Self::metadata_key(&hash);
214 let metadata_bytes = bincode::serialize(&metadata)?;
215 db.insert(metadata_key, metadata_bytes);
216
217 let content_size = metadata.size;
219 let mut cache = self.cache.write().await;
220 cache.insert(
221 hash,
222 CachedContent {
223 data: content,
224 last_access: Instant::now(),
225 access_count: 0,
226 metadata,
227 },
228 );
229
230 let mut stats = self.stats.write().await;
232 stats.total_content += 1;
233 stats.total_bytes += content_size as u64;
234 stats.store_operations += 1;
235
236 Ok(hash)
237 }
238
239 pub async fn retrieve(&self, hash: &ContentHash) -> Result<Option<Vec<u8>>> {
241 {
243 let mut cache = self.cache.write().await;
244 if let Some(cached) = cache.get_mut(hash) {
245 cached.last_access = Instant::now();
246 cached.access_count += 1;
247
248 let mut stats = self.stats.write().await;
249 stats.cache_hits += 1;
250 stats.retrieve_operations += 1;
251
252 return Ok(Some(cached.data.clone()));
253 }
254 }
255
256 let mut stats = self.stats.write().await;
258 stats.cache_misses += 1;
259 stats.retrieve_operations += 1;
260 drop(stats);
261
262 let db = self.db.read().await;
263 match db.get(&hash.0.to_vec()) {
264 Some(data) => {
265 let metadata_key = Self::metadata_key(hash);
267 let metadata = if let Some(metadata_bytes) = db.get(&metadata_key) {
268 bincode::deserialize(metadata_bytes)?
269 } else {
270 ContentMetadata {
272 size: data.len(),
273 content_type: ContentType::DataRetrieval,
274 created_at: std::time::SystemTime::now()
275 .duration_since(std::time::UNIX_EPOCH)
276 .map(|d| d.as_secs())
277 .unwrap_or(0),
278 chunk_count: None,
279 replication_factor: 8,
280 }
281 };
282
283 let mut cache = self.cache.write().await;
285 cache.insert(
286 *hash,
287 CachedContent {
288 data: data.clone(),
289 last_access: Instant::now(),
290 access_count: 1,
291 metadata,
292 },
293 );
294
295 Ok(Some(data.clone()))
296 }
297 None => Ok(None),
298 }
299 }
300
301 pub async fn exists(&self, hash: &ContentHash) -> Result<bool> {
303 if self.cache.read().await.contains_key(hash) {
305 return Ok(true);
306 }
307
308 let db = self.db.read().await;
310 Ok(db.contains_key(&hash.0.to_vec()))
311 }
312
313 pub async fn delete(&self, hash: &ContentHash) -> Result<()> {
315 self.cache.write().await.remove(hash);
317
318 let mut db = self.db.write().await;
320 db.remove(&hash.0.to_vec());
321
322 let metadata_key = Self::metadata_key(hash);
324 db.remove(&metadata_key);
325
326 Ok(())
327 }
328
329 pub fn calculate_hash(content: &[u8]) -> ContentHash {
331 let mut hasher = Sha256::new();
332 hasher.update(content);
333 let result = hasher.finalize();
334 let mut hash_bytes = [0u8; 32];
335 hash_bytes.copy_from_slice(&result);
336 ContentHash(hash_bytes)
337 }
338
339 fn metadata_key(hash: &ContentHash) -> Vec<u8> {
341 let mut key = vec![0u8]; key.extend_from_slice(&hash.0);
343 key
344 }
345
346 pub fn get_config(&self) -> &StorageConfig {
348 &self.config
349 }
350
351 pub async fn get_stats(&self) -> StorageStats {
353 let mut stats = self.stats.read().await.clone();
354 stats.total_items = stats.total_content;
356 stats.capacity_bytes = 10 * 1024 * 1024 * 1024; stats
359 }
360}
361
362#[derive(Debug, Clone, Serialize, Deserialize)]
/// A content item bundled with its hash and metadata, e.g. for transfer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredContent {
    /// SHA-256 hash identifying the content.
    pub hash: ContentHash,

    /// The raw content bytes.
    pub data: Vec<u8>,

    /// Metadata recorded for the content.
    pub metadata: ContentMetadata,
}
374
375#[derive(Debug, Clone, Serialize, Deserialize)]
/// Positioning and integrity information for a single chunk of a larger item.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkMetadata {
    /// Hash of the full (unchunked) parent content.
    pub parent_hash: ContentHash,

    /// Zero-based position of this chunk within the parent.
    pub chunk_index: u32,

    /// Total number of chunks the parent was split into.
    pub total_chunks: u32,

    /// Length of this chunk's data in bytes (the final chunk may be short).
    pub chunk_size: usize,

    /// SHA-256 hash of this chunk's own data.
    pub chunk_hash: ContentHash,
}
393
/// One piece of chunked content: metadata plus the chunk's bytes.
#[derive(Debug, Clone)]
pub struct Chunk {
    /// Position/integrity information for this chunk.
    pub metadata: ChunkMetadata,

    /// The chunk's data bytes.
    pub data: Vec<u8>,
}
403
/// Splits content into fixed-size chunks and reassembles them.
pub struct ChunkManager {
    /// Maximum size of each produced chunk, in bytes.
    chunk_size: usize,
}
409
410impl ChunkManager {
411 pub fn new(chunk_size: usize) -> Self {
413 Self { chunk_size }
414 }
415
416 pub fn create_chunks(&self, content: &[u8], parent_hash: ContentHash) -> Vec<Chunk> {
418 let total_chunks = content.len().div_ceil(self.chunk_size);
419
420 content
421 .chunks(self.chunk_size)
422 .enumerate()
423 .map(|(i, chunk_data)| {
424 let chunk_hash = ContentStore::calculate_hash(chunk_data);
425 Chunk {
426 metadata: ChunkMetadata {
427 parent_hash,
428 chunk_index: i as u32,
429 total_chunks: total_chunks as u32,
430 chunk_size: chunk_data.len(),
431 chunk_hash,
432 },
433 data: chunk_data.to_vec(),
434 }
435 })
436 .collect()
437 }
438
439 pub fn reassemble_chunks(chunks: Vec<Chunk>) -> Result<Vec<u8>> {
441 let mut sorted_chunks = chunks;
443 sorted_chunks.sort_by_key(|c| c.metadata.chunk_index);
444
445 if sorted_chunks.is_empty() {
447 return Err(anyhow::anyhow!("No chunks provided"));
448 }
449
450 let total_chunks = sorted_chunks[0].metadata.total_chunks;
451 if sorted_chunks.len() != total_chunks as usize {
452 return Err(anyhow::anyhow!(
453 "Missing chunks: have {}, need {}",
454 sorted_chunks.len(),
455 total_chunks
456 ));
457 }
458
459 let mut content = Vec::new();
461 for chunk in sorted_chunks {
462 content.extend(chunk.data);
463 }
464
465 Ok(content)
466 }
467}
468
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Metadata matching `content` byte-for-byte, as each test built inline.
    fn metadata_for(content: &[u8]) -> ContentMetadata {
        ContentMetadata {
            size: content.len(),
            ..ContentMetadata::default()
        }
    }

    /// A store whose db path points into `temp_dir`.
    async fn store_in(temp_dir: &TempDir) -> ContentStore {
        let config = StorageConfig {
            db_path: temp_dir.path().to_str().unwrap().to_string(),
            ..Default::default()
        };
        ContentStore::new(config).await.unwrap()
    }

    #[tokio::test]
    async fn test_content_hash_generation() {
        // Identical inputs hash identically; different inputs differ.
        let hash_a = ContentStore::calculate_hash(b"Hello, World!");
        let hash_b = ContentStore::calculate_hash(b"Hello, World!");
        let hash_c = ContentStore::calculate_hash(b"Different content");

        assert_eq!(hash_a, hash_b);
        assert_ne!(hash_a, hash_c);
    }

    #[tokio::test]
    async fn test_content_storage() {
        let temp_dir = TempDir::new().unwrap();
        let store = store_in(&temp_dir).await;

        let content = b"Test content".to_vec();
        let hash = store
            .store(content.clone(), metadata_for(&content))
            .await
            .unwrap();

        // Round-trip the content and confirm existence.
        assert_eq!(store.retrieve(&hash).await.unwrap(), Some(content));
        assert!(store.exists(&hash).await.unwrap());

        // One store; one retrieve, served from the cache primed by `store`.
        let stats = store.get_stats().await;
        assert_eq!(stats.total_content, 1);
        assert_eq!(stats.store_operations, 1);
        assert_eq!(stats.retrieve_operations, 1);
        assert_eq!(stats.cache_hits, 1);
    }

    #[tokio::test]
    async fn test_chunk_manager() {
        let manager = ChunkManager::new(10);
        let content = b"This is a test content that will be chunked".to_vec();
        let parent_hash = ContentStore::calculate_hash(&content);

        // 44 bytes at 10 bytes per chunk -> 5 chunks.
        let chunks = manager.create_chunks(&content, parent_hash);
        assert_eq!(chunks.len(), 5);
        for (i, chunk) in chunks.iter().enumerate() {
            assert_eq!(chunk.metadata.chunk_index, i as u32);
            assert_eq!(chunk.metadata.total_chunks, 5);
            assert_eq!(chunk.metadata.parent_hash, parent_hash);
        }

        // Reassembly restores the original bytes.
        assert_eq!(ChunkManager::reassemble_chunks(chunks).unwrap(), content);
    }

    #[tokio::test]
    async fn test_cache_behavior() {
        let temp_dir = TempDir::new().unwrap();
        let store = store_in(&temp_dir).await;

        let content = b"Cached content".to_vec();
        let hash = store
            .store(content.clone(), metadata_for(&content))
            .await
            .unwrap();

        // Both retrievals should hit the cache populated by `store`.
        let _ = store.retrieve(&hash).await.unwrap();
        let _ = store.retrieve(&hash).await.unwrap();

        let stats = store.get_stats().await;
        assert_eq!(stats.cache_hits, 2);
        assert_eq!(stats.cache_misses, 0);
    }
}