// saorsa_core/adaptive/storage.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Content-addressed storage system with adaptive replication
15//!
16//! This module implements the storage and retrieval system for the adaptive P2P network,
17//! featuring:
18//! - Content-addressed storage using SHA-256
19//! - Parallel retrieval strategies
20//! - Adaptive replication based on network churn
21//! - Efficient chunk management
22//! - RocksDB persistence layer
23
24use super::*;
25use anyhow::Result;
26use serde::{Deserialize, Serialize};
27use sha2::{Digest, Sha256};
28use std::{collections::HashMap, sync::Arc, time::Instant};
29use tokio::sync::RwLock;
30
31/// Storage configuration
32#[derive(Debug, Clone)]
33pub struct StorageConfig {
34    /// Path to RocksDB database
35    pub db_path: String,
36
37    /// Maximum chunk size in bytes
38    pub chunk_size: usize,
39
40    /// Replication configuration
41    pub replication_config: ReplicationConfig,
42
43    /// Cache size in bytes
44    pub cache_size: usize,
45}
46
47impl Default for StorageConfig {
48    fn default() -> Self {
49        Self {
50            db_path: "./data/storage".to_string(),
51            chunk_size: 1024 * 1024, // 1MB chunks
52            replication_config: ReplicationConfig::default(),
53            cache_size: 100 * 1024 * 1024, // 100MB cache
54        }
55    }
56}
57
/// Parameters governing how aggressively content is replicated.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// Lower bound on the number of replicas per item
    pub min_replicas: u32,

    /// Upper bound on the number of replicas per item
    pub max_replicas: u32,

    /// Default replica count used before churn adjustment
    pub base_replicas: u32,

    /// Churn rate above which the replica count is raised
    pub churn_threshold: f64,
}
73
74impl Default for ReplicationConfig {
75    fn default() -> Self {
76        Self {
77            min_replicas: 5,
78            max_replicas: 20,
79            base_replicas: 8,
80            churn_threshold: 0.3, // 30% churn rate
81        }
82    }
83}
84
85/// Content store for local storage
86pub struct ContentStore {
87    /// Storage backend (using in-memory for now, would be RocksDB in production)
88    db: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>,
89
90    /// In-memory cache
91    cache: Arc<RwLock<HashMap<ContentHash, CachedContent>>>,
92
93    /// Storage configuration
94    config: StorageConfig,
95
96    /// Storage statistics
97    stats: Arc<RwLock<StorageStats>>,
98}
99
100/// Cached content with metadata
101#[derive(Debug, Clone)]
102pub struct CachedContent {
103    /// Content data
104    pub data: Vec<u8>,
105
106    /// Last access time
107    pub last_access: Instant,
108
109    /// Access count
110    pub access_count: u64,
111
112    /// Content metadata
113    pub metadata: ContentMetadata,
114}
115
116/// Content metadata
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct ContentMetadata {
119    /// Content size
120    pub size: usize,
121
122    /// Content type
123    pub content_type: ContentType,
124
125    /// Creation timestamp (Unix timestamp in seconds)
126    pub created_at: u64,
127
128    /// Number of chunks (if chunked)
129    pub chunk_count: Option<u32>,
130
131    /// Replication factor
132    pub replication_factor: u32,
133}
134
135impl Default for ContentMetadata {
136    fn default() -> Self {
137        Self {
138            size: 0,
139            content_type: ContentType::DataRetrieval,
140            created_at: std::time::SystemTime::now()
141                .duration_since(std::time::UNIX_EPOCH)
142                .map(|d| d.as_secs())
143                .unwrap_or(0),
144            chunk_count: None,
145            replication_factor: 8,
146        }
147    }
148}
149
/// Counters describing the store's activity and occupancy.
#[derive(Debug, Default, Clone)]
pub struct StorageStats {
    /// Number of distinct content items held
    pub total_content: u64,

    /// Bytes of content held
    pub total_bytes: u64,

    /// Retrievals answered from the cache
    pub cache_hits: u64,

    /// Retrievals that fell through to the database
    pub cache_misses: u64,

    /// Count of store operations performed
    pub store_operations: u64,

    /// Count of retrieve operations performed
    pub retrieve_operations: u64,

    /// Alias of `total_content`, kept for API compatibility
    pub total_items: u64,

    /// Total capacity of the backing store, in bytes
    pub capacity_bytes: u64,
}
177
178impl StorageStats {
179    /// Calculate storage utilization
180    pub fn utilization(&self) -> f64 {
181        if self.capacity_bytes == 0 {
182            0.0
183        } else {
184            self.total_bytes as f64 / self.capacity_bytes as f64
185        }
186    }
187}
188
189impl ContentStore {
190    /// Create a new content store
191    pub async fn new(config: StorageConfig) -> Result<Self> {
192        // For now, use in-memory storage (would be RocksDB in production)
193        let db = Arc::new(RwLock::new(HashMap::new()));
194
195        Ok(Self {
196            db,
197            cache: Arc::new(RwLock::new(HashMap::new())),
198            config,
199            stats: Arc::new(RwLock::new(StorageStats::default())),
200        })
201    }
202
203    /// Store content
204    pub async fn store(&self, content: Vec<u8>, metadata: ContentMetadata) -> Result<ContentHash> {
205        // Calculate content hash
206        let hash = Self::calculate_hash(&content);
207
208        // Store in database
209        let mut db = self.db.write().await;
210        db.insert(hash.0.to_vec(), content.clone());
211
212        // Store metadata
213        let metadata_key = Self::metadata_key(&hash);
214        let metadata_bytes = bincode::serialize(&metadata)?;
215        db.insert(metadata_key, metadata_bytes);
216
217        // Update cache
218        let content_size = metadata.size;
219        let mut cache = self.cache.write().await;
220        cache.insert(
221            hash,
222            CachedContent {
223                data: content,
224                last_access: Instant::now(),
225                access_count: 0,
226                metadata,
227            },
228        );
229
230        // Update stats
231        let mut stats = self.stats.write().await;
232        stats.total_content += 1;
233        stats.total_bytes += content_size as u64;
234        stats.store_operations += 1;
235
236        Ok(hash)
237    }
238
239    /// Retrieve content
240    pub async fn retrieve(&self, hash: &ContentHash) -> Result<Option<Vec<u8>>> {
241        // Check cache first
242        {
243            let mut cache = self.cache.write().await;
244            if let Some(cached) = cache.get_mut(hash) {
245                cached.last_access = Instant::now();
246                cached.access_count += 1;
247
248                let mut stats = self.stats.write().await;
249                stats.cache_hits += 1;
250                stats.retrieve_operations += 1;
251
252                return Ok(Some(cached.data.clone()));
253            }
254        }
255
256        // Cache miss - fetch from RocksDB
257        let mut stats = self.stats.write().await;
258        stats.cache_misses += 1;
259        stats.retrieve_operations += 1;
260        drop(stats);
261
262        let db = self.db.read().await;
263        match db.get(&hash.0.to_vec()) {
264            Some(data) => {
265                // Get metadata
266                let metadata_key = Self::metadata_key(hash);
267                let metadata = if let Some(metadata_bytes) = db.get(&metadata_key) {
268                    bincode::deserialize(metadata_bytes)?
269                } else {
270                    // Create default metadata if missing
271                    ContentMetadata {
272                        size: data.len(),
273                        content_type: ContentType::DataRetrieval,
274                        created_at: std::time::SystemTime::now()
275                            .duration_since(std::time::UNIX_EPOCH)
276                            .map(|d| d.as_secs())
277                            .unwrap_or(0),
278                        chunk_count: None,
279                        replication_factor: 8,
280                    }
281                };
282
283                // Update cache
284                let mut cache = self.cache.write().await;
285                cache.insert(
286                    *hash,
287                    CachedContent {
288                        data: data.clone(),
289                        last_access: Instant::now(),
290                        access_count: 1,
291                        metadata,
292                    },
293                );
294
295                Ok(Some(data.clone()))
296            }
297            None => Ok(None),
298        }
299    }
300
301    /// Check if content exists
302    pub async fn exists(&self, hash: &ContentHash) -> Result<bool> {
303        // Check cache
304        if self.cache.read().await.contains_key(hash) {
305            return Ok(true);
306        }
307
308        // Check database
309        let db = self.db.read().await;
310        Ok(db.contains_key(&hash.0.to_vec()))
311    }
312
313    /// Delete content
314    pub async fn delete(&self, hash: &ContentHash) -> Result<()> {
315        // Remove from cache
316        self.cache.write().await.remove(hash);
317
318        // Remove from database
319        let mut db = self.db.write().await;
320        db.remove(&hash.0.to_vec());
321
322        // Remove metadata
323        let metadata_key = Self::metadata_key(hash);
324        db.remove(&metadata_key);
325
326        Ok(())
327    }
328
329    /// Calculate SHA-256 hash of content
330    pub fn calculate_hash(content: &[u8]) -> ContentHash {
331        let mut hasher = Sha256::new();
332        hasher.update(content);
333        let result = hasher.finalize();
334        let mut hash_bytes = [0u8; 32];
335        hash_bytes.copy_from_slice(&result);
336        ContentHash(hash_bytes)
337    }
338
339    /// Get metadata key for a content hash
340    fn metadata_key(hash: &ContentHash) -> Vec<u8> {
341        let mut key = vec![0u8]; // Prefix for metadata
342        key.extend_from_slice(&hash.0);
343        key
344    }
345
346    /// Get storage configuration
347    pub fn get_config(&self) -> &StorageConfig {
348        &self.config
349    }
350
351    /// Get storage statistics
352    pub async fn get_stats(&self) -> StorageStats {
353        let mut stats = self.stats.read().await.clone();
354        // Ensure total_items is synchronized with total_content
355        stats.total_items = stats.total_content;
356        // Set capacity (in real implementation would get from disk)
357        stats.capacity_bytes = 10 * 1024 * 1024 * 1024; // 10GB default
358        stats
359    }
360}
361
362/// Stored content with metadata
363#[derive(Debug, Clone, Serialize, Deserialize)]
364pub struct StoredContent {
365    /// Content hash
366    pub hash: ContentHash,
367
368    /// Content data
369    pub data: Vec<u8>,
370
371    /// Content metadata
372    pub metadata: ContentMetadata,
373}
374
375/// Chunk metadata for large content
376#[derive(Debug, Clone, Serialize, Deserialize)]
377pub struct ChunkMetadata {
378    /// Parent content hash
379    pub parent_hash: ContentHash,
380
381    /// Chunk index
382    pub chunk_index: u32,
383
384    /// Total chunks
385    pub total_chunks: u32,
386
387    /// Chunk size
388    pub chunk_size: usize,
389
390    /// Chunk hash
391    pub chunk_hash: ContentHash,
392}
393
394/// Chunk of large content
395#[derive(Debug, Clone)]
396pub struct Chunk {
397    /// Chunk metadata
398    pub metadata: ChunkMetadata,
399
400    /// Chunk data
401    pub data: Vec<u8>,
402}
403
/// Splits large content into fixed-size chunks and reassembles them.
pub struct ChunkManager {
    /// Maximum number of bytes per chunk
    chunk_size: usize,
}
409
410impl ChunkManager {
411    /// Create a new chunk manager
412    pub fn new(chunk_size: usize) -> Self {
413        Self { chunk_size }
414    }
415
416    /// Split content into chunks
417    pub fn create_chunks(&self, content: &[u8], parent_hash: ContentHash) -> Vec<Chunk> {
418        let total_chunks = content.len().div_ceil(self.chunk_size);
419
420        content
421            .chunks(self.chunk_size)
422            .enumerate()
423            .map(|(i, chunk_data)| {
424                let chunk_hash = ContentStore::calculate_hash(chunk_data);
425                Chunk {
426                    metadata: ChunkMetadata {
427                        parent_hash,
428                        chunk_index: i as u32,
429                        total_chunks: total_chunks as u32,
430                        chunk_size: chunk_data.len(),
431                        chunk_hash,
432                    },
433                    data: chunk_data.to_vec(),
434                }
435            })
436            .collect()
437    }
438
439    /// Reassemble chunks into content
440    pub fn reassemble_chunks(chunks: Vec<Chunk>) -> Result<Vec<u8>> {
441        // Sort chunks by index
442        let mut sorted_chunks = chunks;
443        sorted_chunks.sort_by_key(|c| c.metadata.chunk_index);
444
445        // Verify we have all chunks
446        if sorted_chunks.is_empty() {
447            return Err(anyhow::anyhow!("No chunks provided"));
448        }
449
450        let total_chunks = sorted_chunks[0].metadata.total_chunks;
451        if sorted_chunks.len() != total_chunks as usize {
452            return Err(anyhow::anyhow!(
453                "Missing chunks: have {}, need {}",
454                sorted_chunks.len(),
455                total_chunks
456            ));
457        }
458
459        // Reassemble
460        let mut content = Vec::new();
461        for chunk in sorted_chunks {
462            content.extend(chunk.data);
463        }
464
465        Ok(content)
466    }
467}
468
#[cfg(test)]
mod tests {
    //! Unit tests covering hashing determinism, store/retrieve round
    //! trips, chunk split/reassembly, and cache-hit accounting.
    use super::*;
    use tempfile::TempDir;

    /// Hashing is deterministic: equal input => equal hash, and
    /// different input => different hash.
    #[tokio::test]
    async fn test_content_hash_generation() {
        let content1 = b"Hello, World!";
        let content2 = b"Hello, World!";
        let content3 = b"Different content";

        let hash1 = ContentStore::calculate_hash(content1);
        let hash2 = ContentStore::calculate_hash(content2);
        let hash3 = ContentStore::calculate_hash(content3);

        // Same content should produce same hash
        assert_eq!(hash1, hash2);

        // Different content should produce different hash
        assert_ne!(hash1, hash3);
    }

    /// End-to-end store -> retrieve -> exists round trip, plus the
    /// statistics those operations should leave behind.
    #[tokio::test]
    async fn test_content_storage() {
        let temp_dir = TempDir::new().unwrap();
        let config = StorageConfig {
            db_path: temp_dir.path().to_str().unwrap().to_string(),
            ..Default::default()
        };

        let store = ContentStore::new(config).await.unwrap();
        let content = b"Test content".to_vec();
        let metadata = ContentMetadata {
            size: content.len(),
            content_type: ContentType::DataRetrieval,
            created_at: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
            chunk_count: None,
            replication_factor: 8,
        };

        // Store content
        let hash = store.store(content.clone(), metadata).await.unwrap();

        // Retrieve content
        let retrieved = store.retrieve(&hash).await.unwrap();
        assert_eq!(retrieved, Some(content));

        // Check existence
        assert!(store.exists(&hash).await.unwrap());

        // Check stats: one store, one retrieval, and that retrieval is
        // a cache hit because store() populates the cache up front.
        let stats = store.get_stats().await;
        assert_eq!(stats.total_content, 1);
        assert_eq!(stats.store_operations, 1);
        assert_eq!(stats.retrieve_operations, 1);
        assert_eq!(stats.cache_hits, 1);
    }

    /// Content splits into ceil(len / chunk_size) chunks carrying
    /// correct indices, and those chunks reassemble to the original.
    #[tokio::test]
    async fn test_chunk_manager() {
        let chunk_size = 10;
        let manager = ChunkManager::new(chunk_size);
        let content = b"This is a test content that will be chunked".to_vec();
        let parent_hash = ContentStore::calculate_hash(&content);

        // Create chunks
        let chunks = manager.create_chunks(&content, parent_hash);
        assert_eq!(chunks.len(), 5); // 43 bytes ceil-divided by 10 = 5 chunks

        // Verify chunk metadata
        for (i, chunk) in chunks.iter().enumerate() {
            assert_eq!(chunk.metadata.chunk_index, i as u32);
            assert_eq!(chunk.metadata.total_chunks, 5);
            assert_eq!(chunk.metadata.parent_hash, parent_hash);
        }

        // Reassemble chunks
        let reassembled = ChunkManager::reassemble_chunks(chunks).unwrap();
        assert_eq!(reassembled, content);
    }

    /// Every retrieval of freshly stored content should be a cache hit,
    /// since store() inserts into the cache immediately.
    #[tokio::test]
    async fn test_cache_behavior() {
        let temp_dir = TempDir::new().unwrap();
        let config = StorageConfig {
            db_path: temp_dir.path().to_str().unwrap().to_string(),
            ..Default::default()
        };

        let store = ContentStore::new(config).await.unwrap();
        let content = b"Cached content".to_vec();
        let metadata = ContentMetadata {
            size: content.len(),
            content_type: ContentType::DataRetrieval,
            created_at: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
            chunk_count: None,
            replication_factor: 8,
        };

        // Store content
        let hash = store.store(content.clone(), metadata).await.unwrap();

        // First retrieval (from cache)
        let _ = store.retrieve(&hash).await.unwrap();

        // Second retrieval (should hit cache)
        let _ = store.retrieve(&hash).await.unwrap();

        let stats = store.get_stats().await;
        assert_eq!(stats.cache_hits, 2);
        assert_eq!(stats.cache_misses, 0);
    }
}