
saorsa_fec/storage.rs

//! Storage backend abstraction for shard storage
//!
//! This module provides a trait for different storage implementations
//! (local filesystem, memory, network, multi-backend) that work with
//! the v0.3 shard format with 106-byte headers and CID-based addressing.

use crate::config::EncryptionMode;
use crate::FecError;
use anyhow::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

/// Content Identifier (CID) for addressing shards
/// Uses BLAKE3 hash for content-addressable storage
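///
/// # Example
///
/// A minimal usage sketch (illustrative; assumes this module is exposed as
/// `saorsa_fec::storage`):
///
/// ```ignore
/// use saorsa_fec::storage::Cid;
///
/// let cid = Cid::from_data(b"hello");
/// assert_eq!(cid.to_hex().len(), 64); // 32 bytes rendered as hex characters
/// ```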
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Cid([u8; 32]);

impl Cid {
    /// Create CID from raw bytes
    pub fn new(bytes: [u8; 32]) -> Self {
        Self(bytes)
    }

    /// Create CID from data using BLAKE3
    pub fn from_data(data: &[u8]) -> Self {
        let hash = blake3::hash(data);
        Self(*hash.as_bytes())
    }

    /// Get raw bytes
    pub fn as_bytes(&self) -> &[u8; 32] {
        &self.0
    }

    /// Convert to hex string
    pub fn to_hex(&self) -> String {
        hex::encode(self.0)
    }
}

impl From<[u8; 32]> for Cid {
    fn from(bytes: [u8; 32]) -> Self {
        Self(bytes)
    }
}

impl From<blake3::Hash> for Cid {
    fn from(hash: blake3::Hash) -> Self {
        Self(*hash.as_bytes())
    }
}

/// Shard header (106 bytes fixed size) for v0.3
///
/// Note: With postcard serialization, the actual serialized data is smaller
/// but we pad to 106 bytes for backwards compatibility.
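///
/// Layout: the postcard-serialized fields occupy the front of the 106-byte
/// buffer, the middle is zero padding, and the final byte records the
/// serialized length so that `from_bytes` knows how much to deserialize.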
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardHeader {
    /// Shard format version
    pub version: u8,
    /// Encryption mode used
    pub encryption_mode: EncryptionMode,
    /// FEC parameters (k, n-k)
    pub nspec: (u8, u8),
    /// Encrypted data size
    pub data_size: u32,
    /// Nonce for encryption (32 bytes)
    pub nonce: [u8; 32],
    /// Reserved bytes for future use
    #[serde(with = "serde_bytes")]
    pub reserved: Vec<u8>,
}

impl ShardHeader {
    const SIZE: usize = 106; // Fixed header size for compatibility

    /// Create new shard header
    pub fn new(
        encryption_mode: EncryptionMode,
        nspec: (u8, u8),
        data_size: u32,
        nonce: [u8; 32],
    ) -> Self {
        Self {
            version: 1,
            encryption_mode,
            nspec,
            data_size,
            nonce,
            reserved: vec![0u8; 55],
        }
    }

    /// Serialize to bytes (padded to fixed size)
    pub fn to_bytes(&self) -> Result<[u8; Self::SIZE], FecError> {
        let serialized = postcard::to_stdvec(self)
            .map_err(|e| FecError::Backend(format!("Failed to serialize header: {}", e)))?;

        // Pad to fixed size for backwards compatibility. The last byte stores
        // the serialized length, so the payload itself must fit in SIZE - 1 bytes.
        let mut result = [0u8; Self::SIZE];
        if serialized.len() > Self::SIZE - 1 {
            return Err(FecError::Backend(format!(
                "Header too large: {} > {}",
                serialized.len(),
                Self::SIZE - 1
            )));
        }
        result[..serialized.len()].copy_from_slice(&serialized);
        // Store the actual serialized length in the last byte for parsing
        result[Self::SIZE - 1] = serialized.len() as u8;
        Ok(result)
    }

    /// Deserialize from bytes (handles padded format)
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, FecError> {
        if bytes.len() != Self::SIZE {
            return Err(FecError::Backend(format!(
                "Invalid header size: expected {}, got {}",
                Self::SIZE,
                bytes.len()
            )));
        }
        // Read actual length from last byte
        let actual_len = bytes[Self::SIZE - 1] as usize;
        if actual_len == 0 || actual_len > Self::SIZE - 1 {
            // Fallback: no valid length byte; try parsing the whole padded buffer (legacy format)
            return postcard::from_bytes(bytes)
                .map_err(|e| FecError::Backend(format!("Failed to deserialize header: {}", e)));
        }
        postcard::from_bytes(&bytes[..actual_len])
            .map_err(|e| FecError::Backend(format!("Failed to deserialize header: {}", e)))
    }
}

/// Complete shard with header and encrypted data
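///
/// # Example
///
/// A serialization roundtrip sketch (illustrative; assumes `saorsa_fec::storage`
/// and `saorsa_fec::config::EncryptionMode` paths):
///
/// ```ignore
/// use saorsa_fec::config::EncryptionMode;
/// use saorsa_fec::storage::{Shard, ShardHeader};
///
/// let header = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 4, [0u8; 32]);
/// let shard = Shard::new(header, b"data".to_vec());
///
/// let bytes = shard.to_bytes()?;           // fixed-size header followed by the payload
/// let parsed = Shard::from_bytes(&bytes)?;
/// assert_eq!(parsed.data, shard.data);
/// assert_eq!(parsed.cid()?, shard.cid()?); // the CID covers header + data
/// # Ok::<(), saorsa_fec::FecError>(())
/// ```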
#[derive(Debug, Clone)]
pub struct Shard {
    /// Fixed 106-byte header
    pub header: ShardHeader,
    /// Encrypted data payload
    pub data: Vec<u8>,
}

impl Shard {
    /// Create new shard
    pub fn new(header: ShardHeader, data: Vec<u8>) -> Self {
        Self { header, data }
    }

    /// Get CID for this shard (computed over header + data)
    pub fn cid(&self) -> Result<Cid, FecError> {
        let header_bytes = self.header.to_bytes()?;
        let mut hasher = blake3::Hasher::new();
        hasher.update(&header_bytes);
        hasher.update(&self.data);
        Ok(Cid::from(hasher.finalize()))
    }

    /// Serialize shard to bytes (header + data)
    pub fn to_bytes(&self) -> Result<Vec<u8>, FecError> {
        let header_bytes = self.header.to_bytes()?;
        let mut result = Vec::with_capacity(ShardHeader::SIZE + self.data.len());
        result.extend_from_slice(&header_bytes);
        result.extend_from_slice(&self.data);
        Ok(result)
    }

    /// Deserialize shard from bytes
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, FecError> {
        if bytes.len() < ShardHeader::SIZE {
            return Err(FecError::Backend(
                "Insufficient data for shard header".to_string(),
            ));
        }

        let header = ShardHeader::from_bytes(&bytes[..ShardHeader::SIZE])?;
        let data = bytes[ShardHeader::SIZE..].to_vec();

        Ok(Self { header, data })
    }
}

/// Chunk metadata as specified in v0.3
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkMeta {
    /// FEC parameters (k, n-k)
    pub nspec: (u8, u8),
    /// Encryption mode used
    pub mode: EncryptionMode,
    /// CIDs of all shards for this chunk
    pub shard_ids: Vec<String>,
}

impl ChunkMeta {
    /// Create new chunk metadata
    pub fn new(nspec: (u8, u8), mode: EncryptionMode, shard_ids: Vec<String>) -> Self {
        Self {
            nspec,
            mode,
            shard_ids,
        }
    }
}

/// File metadata as specified in v0.3
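///
/// # Example
///
/// A sketch of describing a single-chunk file (illustrative; note that
/// `shard_ids` holds hex-encoded CIDs, which is what `stats` and
/// `garbage_collect` expect when resolving references):
///
/// ```ignore
/// use saorsa_fec::config::EncryptionMode;
/// use saorsa_fec::storage::{ChunkMeta, Cid, FileMetadata};
///
/// let shard_cid = Cid::from_data(b"encrypted shard bytes");
/// let chunk = ChunkMeta::new((16, 4), EncryptionMode::Convergent, vec![shard_cid.to_hex()]);
/// let metadata = FileMetadata::new([0u8; 32], 1024, vec![chunk]);
/// assert_eq!(metadata.chunks.len(), 1);
/// ```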
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileMetadata {
    /// File identifier
    pub file_id: [u8; 32],
    /// Original file size
    pub file_size: u64,
    /// Chunks comprising this file
    pub chunks: Vec<ChunkMeta>,
    /// Creation timestamp
    pub created_at: u64,
    /// Version number
    pub version: u8,
}

impl FileMetadata {
    /// Create new file metadata
    pub fn new(file_id: [u8; 32], file_size: u64, chunks: Vec<ChunkMeta>) -> Self {
        let created_at = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        Self {
            file_id,
            file_size,
            chunks,
            created_at,
            version: 1,
        }
    }
}

/// Abstract storage backend interface for v0.3 specification
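///
/// # Example
///
/// A minimal sketch of storing and fetching a shard through the trait
/// (illustrative; assumes `saorsa_fec::storage` paths and a Tokio runtime):
///
/// ```ignore
/// use saorsa_fec::config::EncryptionMode;
/// use saorsa_fec::storage::{MemoryStorage, Shard, ShardHeader, StorageBackend};
///
/// # async fn demo() -> Result<(), saorsa_fec::FecError> {
/// let backend = MemoryStorage::new();
///
/// let header = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 4, [0u8; 32]);
/// let shard = Shard::new(header, b"data".to_vec());
/// let cid = shard.cid()?;
///
/// backend.put_shard(&cid, &shard).await?;
/// assert!(backend.has_shard(&cid).await?);
/// let roundtrip = backend.get_shard(&cid).await?;
/// assert_eq!(roundtrip.data, shard.data);
/// # Ok(())
/// # }
/// ```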
#[async_trait]
pub trait StorageBackend: Send + Sync {
    /// Store a shard with the given CID
    async fn put_shard(&self, cid: &Cid, shard: &Shard) -> Result<(), FecError>;

    /// Retrieve a shard by CID
    async fn get_shard(&self, cid: &Cid) -> Result<Shard, FecError>;

    /// Delete a shard by CID
    async fn delete_shard(&self, cid: &Cid) -> Result<(), FecError>;

    /// Check if a shard exists
    async fn has_shard(&self, cid: &Cid) -> Result<bool, FecError>;

    /// List all shard CIDs in storage
    async fn list_shards(&self) -> Result<Vec<Cid>, FecError>;

    /// Store file metadata
    async fn put_metadata(&self, metadata: &FileMetadata) -> Result<(), FecError>;

    /// Retrieve file metadata
    async fn get_metadata(&self, file_id: &[u8; 32]) -> Result<FileMetadata, FecError>;

    /// Delete file metadata
    async fn delete_metadata(&self, file_id: &[u8; 32]) -> Result<(), FecError>;

    /// List all file metadata
    async fn list_metadata(&self) -> Result<Vec<FileMetadata>, FecError>;

    /// Get storage statistics
    async fn stats(&self) -> Result<StorageStats, FecError>;

    /// Run garbage collection
    async fn garbage_collect(&self) -> Result<GcReport, FecError>;
}

/// Storage statistics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageStats {
    /// Total number of shards
    pub total_shards: u64,
    /// Total storage size in bytes
    pub total_size: u64,
    /// Number of file metadata entries
    pub metadata_count: u64,
    /// Number of unreferenced shards
    pub unreferenced_shards: u64,
}

/// Garbage collection report
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GcReport {
    /// Number of shards deleted
    pub shards_deleted: u64,
    /// Bytes freed
    pub bytes_freed: u64,
    /// Duration of GC run
    pub duration_ms: u64,
}

/// Local filesystem storage implementation
/// Stores shards and metadata on the local filesystem with CID-based addressing
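///
/// With the default two sharding levels, a shard whose CID hex begins `abcd…`
/// is written to `<base>/shards/ab/cd/<cid hex>.shard`, and file metadata is
/// written to `<base>/metadata/<file_id hex>.meta`.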
pub struct LocalStorage {
    /// Base directory for shard storage
    base_path: PathBuf,
    /// Directory for metadata storage
    metadata_path: PathBuf,
    /// Number of directory levels for sharding
    shard_levels: usize,
}

impl LocalStorage {
    /// Create a new local storage backend
    pub async fn new(base_path: PathBuf) -> Result<Self, FecError> {
        let metadata_path = base_path.join("metadata");

        fs::create_dir_all(&base_path).await.map_err(FecError::Io)?;
        fs::create_dir_all(&metadata_path)
            .await
            .map_err(FecError::Io)?;

        Ok(Self {
            base_path,
            metadata_path,
            shard_levels: 2, // Use 2 levels of sharding by default
        })
    }

    /// Get the path for a shard based on its CID
    fn shard_path(&self, cid: &Cid) -> PathBuf {
        let hex = cid.to_hex();

        // Create sharded path (e.g., ab/cd/abcdef...)
        let mut path = self.base_path.join("shards");

        for level in 0..self.shard_levels {
            if hex.len() > level * 2 + 2 {
                path = path.join(&hex[level * 2..level * 2 + 2]);
            }
        }

        path.join(format!("{}.shard", hex))
    }

    /// Get the path for file metadata
    fn metadata_file_path(&self, file_id: &[u8; 32]) -> PathBuf {
        let hex = hex::encode(file_id);
        self.metadata_path.join(format!("{}.meta", hex))
    }

    /// Ensure parent directory exists
    async fn ensure_parent(&self, path: &Path) -> Result<(), FecError> {
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent).await.map_err(FecError::Io)?;
        }
        Ok(())
    }
}

#[async_trait]
impl StorageBackend for LocalStorage {
    async fn put_shard(&self, cid: &Cid, shard: &Shard) -> Result<(), FecError> {
        let path = self.shard_path(cid);

        // Ensure parent directory exists
        self.ensure_parent(&path).await?;

        // Serialize shard to bytes
        let shard_bytes = shard.to_bytes()?;

        // Write shard atomically using temp file
        let temp_path = path.with_extension("tmp");

        let mut file = fs::File::create(&temp_path).await.map_err(FecError::Io)?;

        file.write_all(&shard_bytes).await.map_err(FecError::Io)?;

        file.sync_all().await.map_err(FecError::Io)?;

        // Atomic rename
        fs::rename(temp_path, path).await.map_err(FecError::Io)?;

        Ok(())
    }

    async fn get_shard(&self, cid: &Cid) -> Result<Shard, FecError> {
        let path = self.shard_path(cid);

        let mut file = fs::File::open(&path).await.map_err(|e| {
            FecError::Backend(format!("Failed to open shard file {:?}: {}", path, e))
        })?;

        let mut data = Vec::new();
        file.read_to_end(&mut data).await.map_err(FecError::Io)?;

        Shard::from_bytes(&data)
    }

    async fn delete_shard(&self, cid: &Cid) -> Result<(), FecError> {
        let path = self.shard_path(cid);

        if path.exists() {
            fs::remove_file(path).await.map_err(FecError::Io)?;
        }

        Ok(())
    }

    async fn has_shard(&self, cid: &Cid) -> Result<bool, FecError> {
        let path = self.shard_path(cid);
        Ok(path.exists())
    }

    async fn list_shards(&self) -> Result<Vec<Cid>, FecError> {
        let mut shards = Vec::new();
        let shards_dir = self.base_path.join("shards");

        // Walk directory tree
        let mut stack = vec![shards_dir];

        while let Some(dir) = stack.pop() {
            if !dir.exists() {
                continue;
            }

            let mut entries = fs::read_dir(&dir).await.map_err(|e| {
                FecError::Backend(format!("Failed to read directory {:?}: {}", dir, e))
            })?;

            while let Some(entry) = entries.next_entry().await.map_err(FecError::Io)? {
                let path = entry.path();

                if path.is_dir() {
                    stack.push(path);
                } else if let Some(name) = path.file_name() {
                    if let Some(name_str) = name.to_str() {
                        if name_str.ends_with(".shard") {
                            // Extract hex CID from filename
                            let hex = name_str.trim_end_matches(".shard");
                            if let Ok(cid_bytes) = hex::decode(hex) {
                                if cid_bytes.len() == 32 {
                                    let mut cid_array = [0u8; 32];
                                    cid_array.copy_from_slice(&cid_bytes);
                                    shards.push(Cid::new(cid_array));
                                }
                            }
                        }
                    }
                }
            }
        }
        Ok(shards)
    }

    async fn put_metadata(&self, metadata: &FileMetadata) -> Result<(), FecError> {
        let path = self.metadata_file_path(&metadata.file_id);

        let serialized = postcard::to_stdvec(metadata)
            .map_err(|e| FecError::Backend(format!("Failed to serialize metadata: {}", e)))?;

        let temp_path = path.with_extension("tmp");

        let mut file = fs::File::create(&temp_path).await.map_err(FecError::Io)?;

        file.write_all(&serialized).await.map_err(FecError::Io)?;

        file.sync_all().await.map_err(FecError::Io)?;

        // Atomic rename
        fs::rename(temp_path, path).await.map_err(FecError::Io)?;

        Ok(())
    }

    async fn get_metadata(&self, file_id: &[u8; 32]) -> Result<FileMetadata, FecError> {
        let path = self.metadata_file_path(file_id);

        let data = fs::read(&path).await.map_err(|e| {
            FecError::Backend(format!("Failed to read metadata file {:?}: {}", path, e))
        })?;

        postcard::from_bytes(&data)
            .map_err(|e| FecError::Backend(format!("Failed to deserialize metadata: {}", e)))
    }

    async fn delete_metadata(&self, file_id: &[u8; 32]) -> Result<(), FecError> {
        let path = self.metadata_file_path(file_id);

        if path.exists() {
            fs::remove_file(path).await.map_err(FecError::Io)?;
        }

        Ok(())
    }

    async fn list_metadata(&self) -> Result<Vec<FileMetadata>, FecError> {
        let mut metadata_list = Vec::new();

        let mut entries = fs::read_dir(&self.metadata_path)
            .await
            .map_err(FecError::Io)?;

        while let Some(entry) = entries.next_entry().await.map_err(FecError::Io)? {
            let path = entry.path();
            if let Some(name) = path.file_name() {
                if let Some(name_str) = name.to_str() {
                    if name_str.ends_with(".meta") {
                        let data = fs::read(&path).await.map_err(FecError::Io)?;
                        if let Ok(metadata) = postcard::from_bytes::<FileMetadata>(&data) {
                            metadata_list.push(metadata);
                        }
                    }
                }
            }
        }

        Ok(metadata_list)
    }

    async fn stats(&self) -> Result<StorageStats, FecError> {
        let shards = self.list_shards().await?;
        let metadata = self.list_metadata().await?;

        // Calculate total size by reading all shards
        let mut total_size = 0u64;
        for cid in &shards {
            if let Ok(shard) = self.get_shard(cid).await {
                total_size += shard.data.len() as u64 + ShardHeader::SIZE as u64;
            }
        }

        // Count unreferenced shards (shards not referenced in any metadata)
        let mut referenced_cids = std::collections::HashSet::new();
        for meta in &metadata {
            for chunk in &meta.chunks {
                for shard_id in &chunk.shard_ids {
                    if let Ok(cid_bytes) = hex::decode(shard_id) {
                        if cid_bytes.len() == 32 {
                            let mut cid_array = [0u8; 32];
                            cid_array.copy_from_slice(&cid_bytes);
                            referenced_cids.insert(Cid::new(cid_array));
                        }
                    }
                }
            }
        }

        let unreferenced_shards = shards
            .iter()
            .filter(|cid| !referenced_cids.contains(cid))
            .count() as u64;

        Ok(StorageStats {
            total_shards: shards.len() as u64,
            total_size,
            metadata_count: metadata.len() as u64,
            unreferenced_shards,
        })
    }

    async fn garbage_collect(&self) -> Result<GcReport, FecError> {
        let start_time = std::time::Instant::now();
        let mut shards_deleted = 0u64;
        let mut bytes_freed = 0u64;

        // Get all shards and metadata
        let shards = self.list_shards().await?;
        let metadata = self.list_metadata().await?;

        // Build set of referenced shards
        let mut referenced_cids = std::collections::HashSet::new();
        for meta in &metadata {
            for chunk in &meta.chunks {
                for shard_id in &chunk.shard_ids {
                    if let Ok(cid_bytes) = hex::decode(shard_id) {
                        if cid_bytes.len() == 32 {
                            let mut cid_array = [0u8; 32];
                            cid_array.copy_from_slice(&cid_bytes);
                            referenced_cids.insert(Cid::new(cid_array));
                        }
                    }
                }
            }
        }

        // Delete unreferenced shards
        for cid in shards {
            if !referenced_cids.contains(&cid) {
                if let Ok(shard) = self.get_shard(&cid).await {
                    let shard_size = shard.data.len() as u64 + ShardHeader::SIZE as u64;
                    if self.delete_shard(&cid).await.is_ok() {
                        shards_deleted += 1;
                        bytes_freed += shard_size;
                    }
                }
            }
        }

        let duration_ms = start_time.elapsed().as_millis() as u64;

        Ok(GcReport {
            shards_deleted,
            bytes_freed,
            duration_ms,
        })
    }
}

/// In-memory storage implementation for testing and caching
/// Stores shards and metadata in HashMap structures
pub struct MemoryStorage {
    /// In-memory shard storage
    shards: Arc<RwLock<HashMap<Cid, Shard>>>,
    /// In-memory metadata storage
    metadata: Arc<RwLock<HashMap<[u8; 32], FileMetadata>>>,
}

impl MemoryStorage {
    /// Create a new memory storage backend
    pub fn new() -> Self {
        Self {
            shards: Arc::new(RwLock::new(HashMap::new())),
            metadata: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Clear all stored data
    pub fn clear(&self) {
        // Handle poisoned locks by recovering the data
        match self.shards.write() {
            Ok(mut guard) => guard.clear(),
            Err(poisoned) => poisoned.into_inner().clear(),
        }
        match self.metadata.write() {
            Ok(mut guard) => guard.clear(),
            Err(poisoned) => poisoned.into_inner().clear(),
        }
    }

    /// Get the number of stored shards
    pub fn shard_count(&self) -> usize {
        match self.shards.read() {
            Ok(guard) => guard.len(),
            Err(poisoned) => poisoned.into_inner().len(),
        }
    }

    /// Get the number of stored metadata entries
    pub fn metadata_count(&self) -> usize {
        match self.metadata.read() {
            Ok(guard) => guard.len(),
            Err(poisoned) => poisoned.into_inner().len(),
        }
    }
}

impl Default for MemoryStorage {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl StorageBackend for MemoryStorage {
    async fn put_shard(&self, cid: &Cid, shard: &Shard) -> Result<(), FecError> {
        let mut shards = match self.shards.write() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        shards.insert(*cid, shard.clone());
        Ok(())
    }

    async fn get_shard(&self, cid: &Cid) -> Result<Shard, FecError> {
        let shards = match self.shards.read() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        shards
            .get(cid)
            .cloned()
            .ok_or_else(|| FecError::Backend(format!("Shard not found: {}", cid.to_hex())))
    }

    async fn delete_shard(&self, cid: &Cid) -> Result<(), FecError> {
        let mut shards = match self.shards.write() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        shards.remove(cid);
        Ok(())
    }

    async fn has_shard(&self, cid: &Cid) -> Result<bool, FecError> {
        let shards = match self.shards.read() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        Ok(shards.contains_key(cid))
    }

    async fn list_shards(&self) -> Result<Vec<Cid>, FecError> {
        let shards = match self.shards.read() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        Ok(shards.keys().copied().collect())
    }

    async fn put_metadata(&self, metadata: &FileMetadata) -> Result<(), FecError> {
        let mut metadata_store = match self.metadata.write() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        metadata_store.insert(metadata.file_id, metadata.clone());
        Ok(())
    }

    async fn get_metadata(&self, file_id: &[u8; 32]) -> Result<FileMetadata, FecError> {
        let metadata_store = match self.metadata.read() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        metadata_store.get(file_id).cloned().ok_or_else(|| {
            FecError::Backend(format!("Metadata not found: {}", hex::encode(file_id)))
        })
    }

    async fn delete_metadata(&self, file_id: &[u8; 32]) -> Result<(), FecError> {
        let mut metadata_store = match self.metadata.write() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        metadata_store.remove(file_id);
        Ok(())
    }

    async fn list_metadata(&self) -> Result<Vec<FileMetadata>, FecError> {
        let metadata_store = match self.metadata.read() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        Ok(metadata_store.values().cloned().collect())
    }

    async fn stats(&self) -> Result<StorageStats, FecError> {
        let shards = match self.shards.read() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        let metadata = match self.metadata.read() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };

        let total_size: u64 = shards
            .values()
            .map(|shard| shard.data.len() as u64 + ShardHeader::SIZE as u64)
            .sum();

        // Count unreferenced shards
        let mut referenced_cids = std::collections::HashSet::new();
        for meta in metadata.values() {
            for chunk in &meta.chunks {
                for shard_id in &chunk.shard_ids {
                    if let Ok(cid_bytes) = hex::decode(shard_id) {
                        if cid_bytes.len() == 32 {
                            let mut cid_array = [0u8; 32];
                            cid_array.copy_from_slice(&cid_bytes);
                            referenced_cids.insert(Cid::new(cid_array));
                        }
                    }
                }
            }
        }

        let unreferenced_shards = shards
            .keys()
            .filter(|cid| !referenced_cids.contains(cid))
            .count() as u64;

        Ok(StorageStats {
            total_shards: shards.len() as u64,
            total_size,
            metadata_count: metadata.len() as u64,
            unreferenced_shards,
        })
    }

    async fn garbage_collect(&self) -> Result<GcReport, FecError> {
        let start_time = std::time::Instant::now();
        let mut shards_deleted = 0u64;
        let mut bytes_freed = 0u64;

        // Get snapshot of current state
        let shards = match self.shards.read() {
            Ok(guard) => guard.clone(),
            Err(poisoned) => poisoned.into_inner().clone(),
        };
        let metadata = match self.metadata.read() {
            Ok(guard) => guard.clone(),
            Err(poisoned) => poisoned.into_inner().clone(),
        };

        // Build set of referenced shards
        let mut referenced_cids = std::collections::HashSet::new();
        for meta in metadata.values() {
            for chunk in &meta.chunks {
                for shard_id in &chunk.shard_ids {
                    if let Ok(cid_bytes) = hex::decode(shard_id) {
                        if cid_bytes.len() == 32 {
                            let mut cid_array = [0u8; 32];
                            cid_array.copy_from_slice(&cid_bytes);
                            referenced_cids.insert(Cid::new(cid_array));
                        }
                    }
                }
            }
        }

        // Delete unreferenced shards
        let mut shards_write = match self.shards.write() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        for (cid, shard) in shards {
            if !referenced_cids.contains(&cid) {
                let shard_size = shard.data.len() as u64 + ShardHeader::SIZE as u64;
                shards_write.remove(&cid);
                shards_deleted += 1;
                bytes_freed += shard_size;
            }
        }
        drop(shards_write);

        let duration_ms = start_time.elapsed().as_millis() as u64;

        Ok(GcReport {
            shards_deleted,
            bytes_freed,
            duration_ms,
        })
    }
}

/// Network storage node endpoint
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct NodeEndpoint {
    /// Node address (IP or hostname)
    pub address: String,
    /// Node port
    pub port: u16,
    /// Optional node ID
    pub node_id: Option<[u8; 32]>,
}

/// Network-based storage implementation
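///
/// # Example
///
/// A construction sketch (illustrative; the network calls themselves are
/// currently simulated, as noted in the method bodies below):
///
/// ```ignore
/// use saorsa_fec::storage::{NetworkStorage, NodeEndpoint};
///
/// let nodes = vec![
///     NodeEndpoint { address: "10.0.0.1".to_string(), port: 8080, node_id: None },
///     NodeEndpoint { address: "10.0.0.2".to_string(), port: 8080, node_id: None },
/// ];
/// // Replicate each shard to up to 2 of the configured nodes
/// let storage = NetworkStorage::new(nodes, 2);
/// ```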
pub struct NetworkStorage {
    /// List of storage nodes
    nodes: Vec<NodeEndpoint>,
    /// Replication factor
    replication: usize,
}

impl NetworkStorage {
    /// Create a new network storage backend
    pub fn new(nodes: Vec<NodeEndpoint>, replication: usize) -> Self {
        Self { nodes, replication }
    }

    /// Select nodes for storing a shard
    fn select_nodes(&self, shard_id: &[u8; 32]) -> Vec<&NodeEndpoint> {
        // Simple deterministic selection based on shard ID
        let mut selected = Vec::new();
        let target_count = self.replication.min(self.nodes.len());

        // Use different parts of the hash to select unique nodes
        for i in 0..target_count {
            let hash_offset = i * 4;
            let index = if hash_offset + 3 < shard_id.len() {
                u32::from_le_bytes([
                    shard_id[hash_offset],
                    shard_id[hash_offset + 1],
                    shard_id[hash_offset + 2],
                    shard_id[hash_offset + 3],
                ]) as usize
            } else {
                // Mix all bytes deterministically once the 4-byte windows are exhausted
                shard_id
                    .iter()
                    .enumerate()
                    .map(|(j, &b)| (j + i) * b as usize)
                    .sum::<usize>()
            };

            let mut node_index = index % self.nodes.len();
            let mut attempts = 0;

            // Find a node we haven't selected yet
            while selected.iter().any(|n| *n == &self.nodes[node_index])
                && attempts < self.nodes.len()
            {
                node_index = (node_index + 1) % self.nodes.len();
                attempts += 1;
            }

            if attempts < self.nodes.len() {
                selected.push(&self.nodes[node_index]);
            }
        }

        selected
    }
}

#[async_trait]
impl StorageBackend for NetworkStorage {
    async fn put_shard(&self, cid: &Cid, _shard: &Shard) -> Result<(), FecError> {
        let nodes = self.select_nodes(cid.as_bytes());

        if nodes.is_empty() {
            return Err(FecError::Backend(
                "No nodes available for storage".to_string(),
            ));
        }

        // Store to selected nodes
        let mut success_count = 0;

        for node in nodes {
            // In a real implementation, this would make network calls
            // For now, we'll simulate success
            tracing::debug!(
                "Storing shard {} to node: {}:{}",
                cid.to_hex(),
                node.address,
                node.port
            );
            success_count += 1;
        }

        if success_count == 0 {
            return Err(FecError::Backend(
                "Failed to store shard to any node".to_string(),
            ));
        }

        Ok(())
    }

    async fn get_shard(&self, cid: &Cid) -> Result<Shard, FecError> {
        let nodes = self.select_nodes(cid.as_bytes());

        if let Some(node) = nodes.into_iter().next() {
            // Try to retrieve from the first node
            // In a real implementation, this would make network calls
            tracing::debug!(
                "Retrieving shard {} from node: {}:{}",
                cid.to_hex(),
                node.address,
                node.port
            );

            // Simulate successful retrieval with dummy data
            let header = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 1024, [0u8; 32]);
            let shard = Shard::new(header, vec![0u8; 1024]);
            return Ok(shard);
        }

        Err(FecError::Backend("Shard not found on any node".to_string()))
    }

    async fn delete_shard(&self, cid: &Cid) -> Result<(), FecError> {
        let nodes = self.select_nodes(cid.as_bytes());

        for node in nodes {
            // Delete from each node
            tracing::debug!(
                "Deleting shard {} from node: {}:{}",
                cid.to_hex(),
                node.address,
                node.port
            );
        }

        Ok(())
    }

    async fn has_shard(&self, cid: &Cid) -> Result<bool, FecError> {
        let nodes = self.select_nodes(cid.as_bytes());

        if let Some(node) = nodes.into_iter().next() {
            // Check the first node
            tracing::debug!(
                "Checking shard {} on node: {}:{}",
                cid.to_hex(),
                node.address,
                node.port
            );
            return Ok(true); // Simulate found
        }

        Ok(false)
    }

    async fn list_shards(&self) -> Result<Vec<Cid>, FecError> {
        // This would require querying all nodes and deduplicating
        // For now, return empty list
        Ok(Vec::new())
    }

    async fn put_metadata(&self, _metadata: &FileMetadata) -> Result<(), FecError> {
        // In a real implementation, this would distribute metadata across nodes
        // For now, simulate success
        Ok(())
    }

    async fn get_metadata(&self, _file_id: &[u8; 32]) -> Result<FileMetadata, FecError> {
        // In a real implementation, this would query nodes for metadata
        // For now, return an error
        Err(FecError::Backend(
            "Network metadata retrieval not implemented".to_string(),
        ))
    }

    async fn delete_metadata(&self, _file_id: &[u8; 32]) -> Result<(), FecError> {
        // In a real implementation, this would delete from all nodes
        Ok(())
    }

    async fn list_metadata(&self) -> Result<Vec<FileMetadata>, FecError> {
        // This would require querying all nodes
        Ok(Vec::new())
    }

    async fn stats(&self) -> Result<StorageStats, FecError> {
        // In a real implementation, this would aggregate stats from all nodes
        Ok(StorageStats {
            total_shards: 0,
            total_size: 0,
            metadata_count: 0,
            unreferenced_shards: 0,
        })
    }

    async fn garbage_collect(&self) -> Result<GcReport, FecError> {
        // In a real implementation, this would trigger GC on all nodes
        Ok(GcReport {
            shards_deleted: 0,
            bytes_freed: 0,
            duration_ms: 0,
        })
    }
}

/// Multi-backend storage that combines multiple backends for redundancy and load balancing
/// Implements failover capabilities and load distribution
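///
/// # Example
///
/// A sketch of mirroring writes across two in-memory backends (illustrative;
/// assumes `saorsa_fec::storage` paths):
///
/// ```ignore
/// use std::sync::Arc;
/// use saorsa_fec::storage::{MemoryStorage, MultiStorage, MultiStorageStrategy};
///
/// let primary = Arc::new(MemoryStorage::new());
/// let secondary = Arc::new(MemoryStorage::new());
///
/// // Redundant strategy writes to every backend and reads from the first that answers
/// let multi = MultiStorage::with_strategy(
///     vec![primary.clone(), secondary.clone()],
///     MultiStorageStrategy::Redundant,
/// );
/// assert_eq!(multi.backend_count(), 2);
/// ```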
pub struct MultiStorage {
    /// Ordered list of storage backends (priority order)
    backends: Vec<Arc<dyn StorageBackend>>,
    /// Strategy for backend selection
    strategy: MultiStorageStrategy,
}

/// Strategy for multi-backend operations
#[derive(Debug, Clone)]
pub enum MultiStorageStrategy {
    /// Write to all backends, read from first available
    Redundant,
    /// Load balance across backends
    LoadBalance,
    /// Use primary backend with failover to secondary
    Failover,
}

impl MultiStorage {
    /// Create a new multi-backend storage with redundant strategy
    pub fn new(backends: Vec<Arc<dyn StorageBackend>>) -> Self {
        Self {
            backends,
            strategy: MultiStorageStrategy::Redundant,
        }
    }

    /// Create with specific strategy
    pub fn with_strategy(
        backends: Vec<Arc<dyn StorageBackend>>,
        strategy: MultiStorageStrategy,
    ) -> Self {
        Self { backends, strategy }
    }

    /// Add a backend
    pub fn add_backend(&mut self, backend: Arc<dyn StorageBackend>) {
        self.backends.push(backend);
    }

    /// Remove a backend
    pub fn remove_backend(&mut self, index: usize) -> Option<Arc<dyn StorageBackend>> {
        if index < self.backends.len() {
            Some(self.backends.remove(index))
        } else {
            None
        }
    }

    /// Get number of backends
    pub fn backend_count(&self) -> usize {
        self.backends.len()
    }
}

#[async_trait]
impl StorageBackend for MultiStorage {
    async fn put_shard(&self, cid: &Cid, shard: &Shard) -> Result<(), FecError> {
        match self.strategy {
            MultiStorageStrategy::Redundant => {
                // Store in all backends
                let mut success_count = 0;
                let mut last_error = None;

                for backend in &self.backends {
                    match backend.put_shard(cid, shard).await {
                        Ok(()) => success_count += 1,
                        Err(e) => {
                            tracing::warn!("Failed to store shard in backend: {}", e);
                            last_error = Some(e);
                        }
                    }
                }

                if success_count > 0 {
                    Ok(())
                } else if let Some(e) = last_error {
                    Err(e)
                } else {
                    Err(FecError::Backend("No backends available".to_string()))
                }
            }
            MultiStorageStrategy::LoadBalance => {
                // Select backend based on CID hash
                let index = cid.as_bytes()[0] as usize % self.backends.len();
                self.backends[index].put_shard(cid, shard).await
            }
            MultiStorageStrategy::Failover => {
                // Try primary backend first, then failover
                for backend in &self.backends {
                    match backend.put_shard(cid, shard).await {
                        Ok(()) => return Ok(()),
                        Err(e) => {
                            tracing::warn!("Backend failed, trying next: {}", e);
                        }
                    }
                }
                Err(FecError::Backend("All backends failed".to_string()))
            }
        }
    }

    async fn get_shard(&self, cid: &Cid) -> Result<Shard, FecError> {
        // Try each backend in order until we find the shard
        for backend in &self.backends {
            match backend.get_shard(cid).await {
                Ok(shard) => return Ok(shard),
                Err(e) => {
                    tracing::debug!("Backend failed to get shard: {}", e);
                }
            }
        }

        Err(FecError::Backend(
            "Shard not found in any backend".to_string(),
        ))
    }

    async fn delete_shard(&self, cid: &Cid) -> Result<(), FecError> {
        // Delete from all backends that have it
        for backend in &self.backends {
            if let Err(e) = backend.delete_shard(cid).await {
                tracing::warn!("Failed to delete shard from backend: {}", e);
            }
        }
        Ok(())
    }

    async fn has_shard(&self, cid: &Cid) -> Result<bool, FecError> {
        // Check if any backend has the shard
        for backend in &self.backends {
            if backend.has_shard(cid).await? {
                return Ok(true);
            }
        }
        Ok(false)
    }

    async fn list_shards(&self) -> Result<Vec<Cid>, FecError> {
        let mut all_shards = std::collections::HashSet::new();

        // Collect from all backends
        for backend in &self.backends {
            if let Ok(shards) = backend.list_shards().await {
                all_shards.extend(shards);
            }
        }

        Ok(all_shards.into_iter().collect())
    }

    async fn put_metadata(&self, metadata: &FileMetadata) -> Result<(), FecError> {
        match self.strategy {
            MultiStorageStrategy::Redundant => {
                // Store in all backends
                let mut success_count = 0;
                let mut last_error = None;

                for backend in &self.backends {
                    match backend.put_metadata(metadata).await {
                        Ok(()) => success_count += 1,
                        Err(e) => {
                            tracing::warn!("Failed to store metadata in backend: {}", e);
                            last_error = Some(e);
                        }
                    }
                }

                if success_count > 0 {
                    Ok(())
                } else if let Some(e) = last_error {
                    Err(e)
                } else {
                    Err(FecError::Backend("No backends available".to_string()))
                }
            }
            MultiStorageStrategy::LoadBalance => {
                // Select backend based on file_id hash
                let index = metadata.file_id[0] as usize % self.backends.len();
                self.backends[index].put_metadata(metadata).await
            }
            MultiStorageStrategy::Failover => {
                // Try primary backend first, then failover
                for backend in &self.backends {
                    match backend.put_metadata(metadata).await {
                        Ok(()) => return Ok(()),
                        Err(e) => {
                            tracing::warn!("Backend failed, trying next: {}", e);
                        }
                    }
                }
                Err(FecError::Backend("All backends failed".to_string()))
            }
        }
    }

    async fn get_metadata(&self, file_id: &[u8; 32]) -> Result<FileMetadata, FecError> {
        // Try each backend in order
        for backend in &self.backends {
            match backend.get_metadata(file_id).await {
                Ok(metadata) => return Ok(metadata),
                Err(e) => {
                    tracing::debug!("Backend failed to get metadata: {}", e);
                }
            }
        }

        Err(FecError::Backend(
            "Metadata not found in any backend".to_string(),
        ))
    }

    async fn delete_metadata(&self, file_id: &[u8; 32]) -> Result<(), FecError> {
        // Delete from all backends
        for backend in &self.backends {
            if let Err(e) = backend.delete_metadata(file_id).await {
                tracing::warn!("Failed to delete metadata from backend: {}", e);
            }
        }
        Ok(())
    }

    async fn list_metadata(&self) -> Result<Vec<FileMetadata>, FecError> {
        let mut all_metadata = std::collections::HashMap::new();

        // Collect from all backends, deduplicating by file_id
        for backend in &self.backends {
            if let Ok(metadata_list) = backend.list_metadata().await {
                for metadata in metadata_list {
                    all_metadata.insert(metadata.file_id, metadata);
                }
            }
        }

        Ok(all_metadata.into_values().collect())
    }

    async fn stats(&self) -> Result<StorageStats, FecError> {
        let mut combined_stats = StorageStats {
            total_shards: 0,
            total_size: 0,
            metadata_count: 0,
            unreferenced_shards: 0,
        };

        // Aggregate stats from all backends
        for backend in &self.backends {
            if let Ok(stats) = backend.stats().await {
                combined_stats.total_shards += stats.total_shards;
                combined_stats.total_size += stats.total_size;
                combined_stats.metadata_count += stats.metadata_count;
                combined_stats.unreferenced_shards += stats.unreferenced_shards;
            }
        }

        Ok(combined_stats)
    }

    async fn garbage_collect(&self) -> Result<GcReport, FecError> {
        let mut combined_report = GcReport {
            shards_deleted: 0,
            bytes_freed: 0,
            duration_ms: 0,
        };

        let start_time = std::time::Instant::now();

        // Run GC on all backends
        for backend in &self.backends {
            if let Ok(report) = backend.garbage_collect().await {
                combined_report.shards_deleted += report.shards_deleted;
                combined_report.bytes_freed += report.bytes_freed;
            }
        }

        combined_report.duration_ms = start_time.elapsed().as_millis() as u64;

        Ok(combined_report)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_local_storage_roundtrip() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalStorage::new(temp_dir.path().to_path_buf())
            .await
            .unwrap();

        let header = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 13, [1u8; 32]);
        let shard = Shard::new(header, b"Hello, World!".to_vec());
        let cid = shard.cid().unwrap();

        // Store shard
        storage.put_shard(&cid, &shard).await.unwrap();

        // Verify it exists
        assert!(storage.has_shard(&cid).await.unwrap());

        // Retrieve shard
        let retrieved = storage.get_shard(&cid).await.unwrap();
        assert_eq!(retrieved.data, shard.data);

        // Delete shard
        storage.delete_shard(&cid).await.unwrap();
        assert!(!storage.has_shard(&cid).await.unwrap());
    }

    #[tokio::test]
    async fn test_local_storage_list() {
        let temp_dir = TempDir::new().unwrap();
        let storage = LocalStorage::new(temp_dir.path().to_path_buf())
            .await
            .unwrap();

        // Store multiple shards
        let mut shards = Vec::new();
        let mut cids = Vec::new();

        for i in 1..=3 {
            let header = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 4, [i; 32]);
            let shard = Shard::new(header, b"data".to_vec());
            let cid = shard.cid().unwrap();
            storage.put_shard(&cid, &shard).await.unwrap();
            shards.push(shard);
            cids.push(cid);
        }

        // List shards
        let listed = storage.list_shards().await.unwrap();
        assert_eq!(listed.len(), 3);

        for cid in cids {
            assert!(listed.contains(&cid));
        }
    }

    #[test]
    fn test_network_storage_node_selection() {
        let nodes = vec![
            NodeEndpoint {
                address: "node1".to_string(),
                port: 8080,
                node_id: None,
            },
            NodeEndpoint {
                address: "node2".to_string(),
                port: 8080,
                node_id: None,
            },
            NodeEndpoint {
                address: "node3".to_string(),
                port: 8080,
                node_id: None,
            },
        ];

        let storage = NetworkStorage::new(nodes, 2);

        let shard_id = [42u8; 32];
        let selected = storage.select_nodes(&shard_id);

        assert_eq!(selected.len(), 2);

        // Should select same nodes for same shard ID
        let selected2 = storage.select_nodes(&shard_id);
        assert_eq!(selected, selected2);

        // Different shard should select different nodes (probably)
        let shard_id2 = [99u8; 32];
        let selected3 = storage.select_nodes(&shard_id2);
        // May or may not be different, but should be deterministic
        assert_eq!(selected3.len(), 2);
    }

    #[tokio::test]
    async fn test_multi_storage() {
        let temp_dir1 = TempDir::new().unwrap();
        let temp_dir2 = TempDir::new().unwrap();

        let backend1 = Arc::new(
            LocalStorage::new(temp_dir1.path().to_path_buf())
                .await
                .unwrap(),
        );
        let backend2 = Arc::new(
            LocalStorage::new(temp_dir2.path().to_path_buf())
                .await
                .unwrap(),
        );

        let multi = MultiStorage::new(vec![backend1.clone(), backend2.clone()]);

        let header = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 9, [42u8; 32]);
        let shard = Shard::new(header, b"Test data".to_vec());
        let cid = shard.cid().unwrap();

        // Store through multi-backend
        multi.put_shard(&cid, &shard).await.unwrap();

        // Verify both backends have the shard
        assert!(backend1.has_shard(&cid).await.unwrap());
        assert!(backend2.has_shard(&cid).await.unwrap());

        // Delete from first backend
        backend1.delete_shard(&cid).await.unwrap();

        // Multi-backend should still find it in second backend
        let retrieved = multi.get_shard(&cid).await.unwrap();
        assert_eq!(retrieved.data, shard.data);
    }

    #[tokio::test]
    async fn test_memory_storage() {
        let storage = MemoryStorage::new();

        let header = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 11, [1u8; 32]);
        let shard = Shard::new(header, b"Memory test".to_vec());
        let cid = shard.cid().unwrap();

        // Store shard
        storage.put_shard(&cid, &shard).await.unwrap();

        // Verify it exists
        assert!(storage.has_shard(&cid).await.unwrap());
        assert_eq!(storage.shard_count(), 1);

        // Retrieve shard
        let retrieved = storage.get_shard(&cid).await.unwrap();
        assert_eq!(retrieved.data, shard.data);

        // Test metadata
        let metadata = FileMetadata::new(
            [1u8; 32],
            1024,
            vec![ChunkMeta::new(
                (16, 4),
                EncryptionMode::Convergent,
                vec![cid.to_hex()],
            )],
        );

        storage.put_metadata(&metadata).await.unwrap();
        assert_eq!(storage.metadata_count(), 1);

        let retrieved_meta = storage.get_metadata(&metadata.file_id).await.unwrap();
        assert_eq!(retrieved_meta.file_id, metadata.file_id);

        // Clear storage
        storage.clear();
        assert_eq!(storage.shard_count(), 0);
        assert_eq!(storage.metadata_count(), 0);
    }

    #[tokio::test]
    async fn test_garbage_collection() {
        let storage = MemoryStorage::new();

        // Create unreferenced shard
        let header1 = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 10, [1u8; 32]);
        let shard1 = Shard::new(header1, b"Unreferenced".to_vec());
        let cid1 = shard1.cid().unwrap();
        storage.put_shard(&cid1, &shard1).await.unwrap();

        // Create referenced shard
        let header2 = ShardHeader::new(EncryptionMode::Convergent, (16, 4), 10, [2u8; 32]);
        let shard2 = Shard::new(header2, b"Referenced".to_vec());
        let cid2 = shard2.cid().unwrap();
        storage.put_shard(&cid2, &shard2).await.unwrap();

        // Create metadata referencing only shard2
        let metadata = FileMetadata::new(
            [1u8; 32],
            1024,
            vec![ChunkMeta::new(
                (16, 4),
                EncryptionMode::Convergent,
                vec![cid2.to_hex()],
            )],
        );
        storage.put_metadata(&metadata).await.unwrap();

        // Run garbage collection
        let gc_report = storage.garbage_collect().await.unwrap();

        assert_eq!(gc_report.shards_deleted, 1);
        assert!(gc_report.bytes_freed > 0);
        assert!(!storage.has_shard(&cid1).await.unwrap()); // Unreferenced shard deleted
        assert!(storage.has_shard(&cid2).await.unwrap()); // Referenced shard kept
    }

    #[test]
    fn test_shard_header_serialization() {
        let header = ShardHeader::new(
            EncryptionMode::ConvergentWithSecret,
            (20, 5),
            2048,
            [42u8; 32],
        );

        let bytes = header.to_bytes().unwrap();
        assert_eq!(bytes.len(), ShardHeader::SIZE);

        let deserialized = ShardHeader::from_bytes(&bytes).unwrap();
        assert_eq!(deserialized.version, header.version);
        assert_eq!(deserialized.encryption_mode, header.encryption_mode);
        assert_eq!(deserialized.nspec, header.nspec);
        assert_eq!(deserialized.data_size, header.data_size);
        assert_eq!(deserialized.nonce, header.nonce);
    }

    #[test]
    fn test_shard_cid_calculation() {
        let header = ShardHeader::new(EncryptionMode::RandomKey, (16, 4), 1024, [0u8; 32]);
        let shard = Shard::new(header, vec![1, 2, 3, 4, 5]);

        let cid1 = shard.cid().unwrap();
        let cid2 = shard.cid().unwrap();

        // CID should be deterministic
        assert_eq!(cid1, cid2);

        // Different data should produce different CID
        let shard2 = Shard::new(shard.header.clone(), vec![1, 2, 3, 4, 6]);
        let cid3 = shard2.cid().unwrap();
        assert_ne!(cid1, cid3);
    }

    #[test]
    fn test_multi_storage_strategies() {
        let backend1 = Arc::new(MemoryStorage::new());
        let backend2 = Arc::new(MemoryStorage::new());

        // Test different strategies
        let redundant = MultiStorage::with_strategy(
            vec![backend1.clone(), backend2.clone()],
            MultiStorageStrategy::Redundant,
        );

        let load_balance = MultiStorage::with_strategy(
            vec![backend1.clone(), backend2.clone()],
            MultiStorageStrategy::LoadBalance,
        );

        let failover =
            MultiStorage::with_strategy(vec![backend1, backend2], MultiStorageStrategy::Failover);

        assert_eq!(redundant.backend_count(), 2);
        assert_eq!(load_balance.backend_count(), 2);
        assert_eq!(failover.backend_count(), 2);
    }

    #[test]
    fn test_cid_operations() {
        let data = b"test data";
        let cid1 = Cid::from_data(data);
        let cid2 = Cid::from_data(data);

        // Same data should produce same CID
        assert_eq!(cid1, cid2);

        // Different data should produce different CID
        let cid3 = Cid::from_data(b"different data");
        assert_ne!(cid1, cid3);

        // Test hex representation
        let hex = cid1.to_hex();
        assert_eq!(hex.len(), 64); // 32 bytes * 2 hex chars per byte

        // Test round-trip
        let bytes = cid1.as_bytes();
        let cid4 = Cid::new(*bytes);
        assert_eq!(cid1, cid4);
    }
}