chie_core/storage/
mod.rs

1//! Chunk storage and retrieval for CHIE Protocol.
2
3use chie_crypto::{EncryptionKey, EncryptionNonce, StreamDecryptor, StreamEncryptor, hash};
4use chie_shared::CHUNK_SIZE;
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use thiserror::Error;
8use tokio::fs;
9
/// Errors returned by chunk storage operations.
#[derive(Debug, Error)]
pub enum StorageError {
    /// No pinned content is indexed under `cid`.
    #[error("Content not found: {cid}")]
    ContentNotFound { cid: String },

    /// The encrypted chunk file for `cid` / `chunk_index` is not on disk.
    #[error("Chunk not found: {cid}:{chunk_index}")]
    ChunkNotFound { cid: String, chunk_index: u64 },

    /// Filesystem error; `#[from]` lets `?` convert `std::io::Error` automatically.
    #[error("IO error: {0}")]
    IoError(#[from] std::io::Error),

    /// Encryption or decryption failed; the payload is the stringified cause.
    ///
    /// NOTE(review): this module also maps `serde_json` (de)serialization
    /// failures to this variant, so the message is not always about cryptography.
    #[error("Encryption error: {0}")]
    EncryptionError(String),

    /// The hash of the decrypted chunk differs from the hash recorded in its
    /// metadata. Both values are hex-encoded where this module constructs the error.
    #[error("Hash mismatch: expected {expected}, got {actual}")]
    HashMismatch { expected: String, actual: String },

    /// Storing the content would exceed the configured quota.
    #[error("Storage quota exceeded: used {used} bytes, max {max} bytes")]
    QuotaExceeded { used: u64, max: u64 },

    /// A chunk had an unacceptable size.
    ///
    /// NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("Invalid chunk size: {size} bytes")]
    InvalidChunkSize { size: usize },
}
34
/// Chunk metadata stored alongside the chunk.
///
/// Serialized as JSON into `<chunk_index>.meta` next to the encrypted
/// `<chunk_index>.enc` file when content is pinned, and read back to verify
/// the plaintext hash on retrieval.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkMetadata {
    /// Content CID this chunk belongs to.
    pub cid: String,
    /// Zero-based chunk index within the content.
    pub chunk_index: u64,
    /// Size of the plaintext chunk in bytes.
    pub plaintext_size: usize,
    /// Size of the encrypted chunk in bytes (with auth tag).
    pub encrypted_size: usize,
    /// BLAKE3 hash of plaintext (as produced by `chie_crypto::hash`).
    pub hash: [u8; 32],
}
49
/// Pinned content info.
///
/// One record per pinned CID; persisted as JSON in `metadata/<cid>.json` and
/// kept in the in-memory index of `ChunkStorage`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PinnedContentInfo {
    /// Content CID.
    pub cid: String,
    /// Total plaintext size in bytes (sum of the chunk lengths passed to `pin_content`).
    pub total_size: u64,
    /// Number of chunks.
    pub chunk_count: u64,
    /// Encryption key (encrypted with user's key in production).
    ///
    /// NOTE(review): `pin_content` serializes this struct as-is, so the key is
    /// written to the metadata file exactly as stored here.
    pub encryption_key: EncryptionKey,
    /// Base nonce for streaming encryption.
    pub base_nonce: EncryptionNonce,
    /// When the content was pinned (UTC).
    pub pinned_at: chrono::DateTime<chrono::Utc>,
}
66
/// Storage health status.
///
/// Variants are listed from best to worst; `score()` maps them to 100/75/50/25.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageHealthStatus {
    /// Healthy - no issues detected.
    Healthy,
    /// Warning - minor issues detected.
    Warning,
    /// Degraded - performance issues detected.
    Degraded,
    /// Critical - storage failing.
    Critical,
}
79
80impl StorageHealthStatus {
81    /// Get a numeric score for this health status (higher is better).
82    #[must_use]
83    #[inline]
84    pub const fn score(&self) -> u8 {
85        match self {
86            Self::Healthy => 100,
87            Self::Warning => 75,
88            Self::Degraded => 50,
89            Self::Critical => 25,
90        }
91    }
92
93    /// Get description of this health status.
94    #[must_use]
95    #[inline]
96    pub const fn description(&self) -> &'static str {
97        match self {
98            Self::Healthy => "Storage is healthy",
99            Self::Warning => "Minor storage issues detected",
100            Self::Degraded => "Storage performance degraded",
101            Self::Critical => "Critical storage failure",
102        }
103    }
104}
105
/// Storage health metrics.
///
/// A snapshot maintained by `ChunkStorage`: the counters are bumped by
/// `record_io_error` / `record_slow_operation`, the derived fields are
/// recomputed by `update_health_metrics`.
#[derive(Debug, Clone)]
pub struct StorageHealth {
    /// Current health status.
    pub status: StorageHealthStatus,
    /// Number of I/O errors in the last sampling period
    /// (i.e. since the counters were last reset).
    pub io_errors: u64,
    /// Number of slow operations (> threshold).
    pub slow_operations: u64,
    /// Average operation latency in milliseconds (exponentially smoothed over
    /// the latencies reported as slow operations).
    pub avg_latency_ms: f64,
    /// Peak operation latency in milliseconds.
    pub peak_latency_ms: u64,
    /// Disk usage as a fraction of the quota (`used_bytes / max_bytes`);
    /// 0.0 when the quota is zero.
    pub disk_usage: f64,
    /// Rate of disk usage growth (bytes/sec); never negative because shrinkage
    /// is clamped to zero.
    pub growth_rate: f64,
    /// Predicted time until full (seconds), None if not growing.
    pub time_until_full: Option<u64>,
    /// Last health check timestamp.
    pub last_check: std::time::Instant,
}
128
129impl Default for StorageHealth {
130    fn default() -> Self {
131        Self {
132            status: StorageHealthStatus::Healthy,
133            io_errors: 0,
134            slow_operations: 0,
135            avg_latency_ms: 0.0,
136            peak_latency_ms: 0,
137            disk_usage: 0.0,
138            growth_rate: 0.0,
139            time_until_full: None,
140            last_check: std::time::Instant::now(),
141        }
142    }
143}
144
145impl StorageHealth {
146    /// Calculate health score (0.0 to 1.0).
147    #[must_use]
148    pub fn health_score(&self) -> f64 {
149        let mut score = 1.0;
150
151        // Penalize for I/O errors
152        if self.io_errors > 0 {
153            score -= (self.io_errors as f64 * 0.1).min(0.5);
154        }
155
156        // Penalize for slow operations
157        if self.slow_operations > 10 {
158            score -= 0.2;
159        } else if self.slow_operations > 5 {
160            score -= 0.1;
161        }
162
163        // Penalize for high latency
164        if self.avg_latency_ms > 100.0 {
165            score -= 0.2;
166        } else if self.avg_latency_ms > 50.0 {
167            score -= 0.1;
168        }
169
170        // Penalize for high disk usage
171        if self.disk_usage > 0.95 {
172            score -= 0.3;
173        } else if self.disk_usage > 0.90 {
174            score -= 0.2;
175        } else if self.disk_usage > 0.80 {
176            score -= 0.1;
177        }
178
179        score.max(0.0)
180    }
181
182    /// Predict if storage failure is imminent.
183    #[must_use]
184    pub fn is_failure_imminent(&self) -> bool {
185        // Failure is imminent if:
186        // 1. Critical status
187        // 2. Will be full within 1 hour
188        // 3. Too many I/O errors
189        self.status == StorageHealthStatus::Critical
190            || self.time_until_full.is_some_and(|t| t < 3600)
191            || self.io_errors > 100
192    }
193}
194
/// Chunk storage manager.
///
/// Owns a directory tree under `base_path` (`chunks/<cid>/…`, `metadata/<cid>.json`,
/// `index.json`) and an in-memory index of what is pinned. Mutating operations
/// take `&mut self`.
pub struct ChunkStorage {
    /// Base storage directory.
    base_path: PathBuf,
    /// In-memory index of pinned content, keyed by CID.
    pinned_content: HashMap<String, PinnedContentInfo>,
    /// Current storage usage in bytes (tracked from encrypted chunk sizes when pinning).
    used_bytes: u64,
    /// Maximum storage quota in bytes.
    max_bytes: u64,
    /// Storage health metrics.
    health: StorageHealth,
    /// Previous storage usage for growth rate calculation: `(used_bytes, when sampled)`.
    previous_usage: Option<(u64, std::time::Instant)>,
}
210
211impl ChunkStorage {
212    /// Create a new chunk storage.
213    pub async fn new(base_path: PathBuf, max_bytes: u64) -> Result<Self, StorageError> {
214        // Create base directory if it doesn't exist
215        fs::create_dir_all(&base_path).await?;
216        fs::create_dir_all(base_path.join("chunks")).await?;
217        fs::create_dir_all(base_path.join("metadata")).await?;
218
219        let mut storage = Self {
220            base_path,
221            pinned_content: HashMap::new(),
222            used_bytes: 0,
223            max_bytes,
224            health: StorageHealth::default(),
225            previous_usage: None,
226        };
227
228        // Load existing index
229        storage.load_index().await?;
230
231        // Initialize health metrics
232        storage.update_health_metrics();
233
234        Ok(storage)
235    }
236
    /// Get the storage path (the root directory passed to `new`).
    #[inline]
    pub fn base_path(&self) -> &Path {
        &self.base_path
    }
242
    /// Get current storage usage in bytes, as tracked by this instance
    /// (not re-measured from disk).
    #[inline]
    pub fn used_bytes(&self) -> u64 {
        self.used_bytes
    }
248
    /// Get maximum storage quota in bytes.
    #[inline]
    pub fn max_bytes(&self) -> u64 {
        self.max_bytes
    }
254
    /// Get available storage in bytes (`max_bytes - used_bytes`).
    ///
    /// Saturates at zero if usage exceeds the quota.
    #[inline]
    pub fn available_bytes(&self) -> u64 {
        self.max_bytes.saturating_sub(self.used_bytes)
    }
260
    /// Check whether `cid` is present in the in-memory pinned-content index.
    ///
    /// This does not touch the filesystem.
    #[inline]
    pub fn is_pinned(&self, cid: &str) -> bool {
        self.pinned_content.contains_key(cid)
    }
266
    /// Get pinned content info for `cid`, or `None` if it is not in the index.
    #[inline]
    pub fn get_pinned_info(&self, cid: &str) -> Option<&PinnedContentInfo> {
        self.pinned_content.get(cid)
    }
272
273    /// List all pinned content CIDs.
274    pub fn list_pinned(&self) -> Vec<&str> {
275        self.pinned_content.keys().map(|s| s.as_str()).collect()
276    }
277
278    /// Pin new content (store all chunks).
279    pub async fn pin_content(
280        &mut self,
281        cid: &str,
282        chunks: &[Vec<u8>],
283        key: &EncryptionKey,
284        nonce: &EncryptionNonce,
285    ) -> Result<PinnedContentInfo, StorageError> {
286        // Calculate total size
287        let total_size: u64 = chunks.iter().map(|c| c.len() as u64).sum();
288
289        // Check quota
290        if self.used_bytes + total_size > self.max_bytes {
291            return Err(StorageError::QuotaExceeded {
292                used: self.used_bytes,
293                max: self.max_bytes,
294            });
295        }
296
297        // Create content directory
298        let content_dir = self.chunk_dir(cid);
299        fs::create_dir_all(&content_dir).await?;
300
301        // Encrypt and store each chunk
302        let encryptor = StreamEncryptor::new(key, nonce);
303        let mut stored_size = 0u64;
304
305        for (i, chunk) in chunks.iter().enumerate() {
306            let chunk_index = i as u64;
307
308            // Hash plaintext
309            let chunk_hash = hash(chunk);
310
311            // Encrypt chunk
312            let encrypted = encryptor
313                .encrypt_chunk_at(chunk, chunk_index)
314                .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
315
316            // Store chunk
317            let chunk_path = self.chunk_path(cid, chunk_index);
318            fs::write(&chunk_path, &encrypted).await?;
319
320            // Store metadata
321            let metadata = ChunkMetadata {
322                cid: cid.to_string(),
323                chunk_index,
324                plaintext_size: chunk.len(),
325                encrypted_size: encrypted.len(),
326                hash: chunk_hash,
327            };
328            let meta_path = self.chunk_meta_path(cid, chunk_index);
329            let meta_json = serde_json::to_vec(&metadata)
330                .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
331            fs::write(&meta_path, meta_json).await?;
332
333            stored_size += encrypted.len() as u64;
334        }
335
336        // Create content info
337        let info = PinnedContentInfo {
338            cid: cid.to_string(),
339            total_size,
340            chunk_count: chunks.len() as u64,
341            encryption_key: *key,
342            base_nonce: *nonce,
343            pinned_at: chrono::Utc::now(),
344        };
345
346        // Store content metadata
347        let content_meta_path = self.content_meta_path(cid);
348        let meta_json =
349            serde_json::to_vec(&info).map_err(|e| StorageError::EncryptionError(e.to_string()))?;
350        fs::write(&content_meta_path, meta_json).await?;
351
352        // Update index
353        self.pinned_content.insert(cid.to_string(), info.clone());
354        self.used_bytes += stored_size;
355
356        // Save index
357        self.save_index().await?;
358
359        Ok(info)
360    }
361
362    /// Retrieve and decrypt a chunk.
363    pub async fn get_chunk(&self, cid: &str, chunk_index: u64) -> Result<Vec<u8>, StorageError> {
364        // Get content info
365        let info = self
366            .pinned_content
367            .get(cid)
368            .ok_or_else(|| StorageError::ContentNotFound {
369                cid: cid.to_string(),
370            })?;
371
372        // Read encrypted chunk
373        let chunk_path = self.chunk_path(cid, chunk_index);
374        if !chunk_path.exists() {
375            return Err(StorageError::ChunkNotFound {
376                cid: cid.to_string(),
377                chunk_index,
378            });
379        }
380
381        let encrypted = fs::read(&chunk_path).await?;
382
383        // Decrypt chunk
384        let decryptor = StreamDecryptor::new(&info.encryption_key, &info.base_nonce);
385        let plaintext = decryptor
386            .decrypt_chunk_at(&encrypted, chunk_index)
387            .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
388
389        Ok(plaintext)
390    }
391
392    /// Get chunk with verification (returns hash too).
393    pub async fn get_chunk_verified(
394        &self,
395        cid: &str,
396        chunk_index: u64,
397    ) -> Result<(Vec<u8>, [u8; 32]), StorageError> {
398        let plaintext = self.get_chunk(cid, chunk_index).await?;
399        let chunk_hash = hash(&plaintext);
400
401        // Optionally verify against stored metadata
402        let meta_path = self.chunk_meta_path(cid, chunk_index);
403        if meta_path.exists() {
404            let meta_json = fs::read(&meta_path).await?;
405            let metadata: ChunkMetadata = serde_json::from_slice(&meta_json)
406                .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
407
408            if chunk_hash != metadata.hash {
409                return Err(StorageError::HashMismatch {
410                    expected: hex::encode(metadata.hash),
411                    actual: hex::encode(chunk_hash),
412                });
413            }
414        }
415
416        Ok((plaintext, chunk_hash))
417    }
418
419    /// Batch retrieve multiple chunks concurrently for improved performance.
420    pub async fn get_chunks_batch(
421        &self,
422        cid: &str,
423        chunk_indices: &[u64],
424    ) -> Result<Vec<Vec<u8>>, StorageError> {
425        use tokio::task::JoinSet;
426
427        let mut tasks = JoinSet::new();
428
429        // Get content info once
430        let info = self
431            .pinned_content
432            .get(cid)
433            .ok_or_else(|| StorageError::ContentNotFound {
434                cid: cid.to_string(),
435            })?
436            .clone();
437
438        let cid = cid.to_string();
439        let base_path = self.base_path.clone();
440
441        // Spawn concurrent fetch tasks
442        for &chunk_index in chunk_indices {
443            let cid_clone = cid.clone();
444            let info_clone = info.clone();
445            let base_path_clone = base_path.clone();
446
447            tasks.spawn(async move {
448                // Read encrypted chunk
449                let chunk_path = base_path_clone
450                    .join("chunks")
451                    .join(&cid_clone)
452                    .join(format!("{}.enc", chunk_index));
453
454                if !chunk_path.exists() {
455                    return Err(StorageError::ChunkNotFound {
456                        cid: cid_clone,
457                        chunk_index,
458                    });
459                }
460
461                let encrypted = fs::read(&chunk_path).await?;
462
463                // Decrypt chunk
464                let decryptor =
465                    StreamDecryptor::new(&info_clone.encryption_key, &info_clone.base_nonce);
466                let plaintext = decryptor
467                    .decrypt_chunk_at(&encrypted, chunk_index)
468                    .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
469
470                Ok((chunk_index, plaintext))
471            });
472        }
473
474        // Collect results
475        let mut results: Vec<(u64, Vec<u8>)> = Vec::new();
476        while let Some(result) = tasks.join_next().await {
477            let (index, chunk) = result
478                .map_err(|e| StorageError::IoError(std::io::Error::other(e.to_string())))??;
479            results.push((index, chunk));
480        }
481
482        // Sort by chunk index to maintain order
483        results.sort_by_key(|(idx, _)| *idx);
484
485        Ok(results.into_iter().map(|(_, chunk)| chunk).collect())
486    }
487
488    /// Batch retrieve and verify multiple chunks concurrently.
489    pub async fn get_chunks_batch_verified(
490        &self,
491        cid: &str,
492        chunk_indices: &[u64],
493    ) -> Result<Vec<(Vec<u8>, [u8; 32])>, StorageError> {
494        use tokio::task::JoinSet;
495
496        let mut tasks = JoinSet::new();
497
498        // Get content info once
499        let info = self
500            .pinned_content
501            .get(cid)
502            .ok_or_else(|| StorageError::ContentNotFound {
503                cid: cid.to_string(),
504            })?
505            .clone();
506
507        let cid = cid.to_string();
508        let base_path = self.base_path.clone();
509
510        // Spawn concurrent fetch tasks
511        for &chunk_index in chunk_indices {
512            let cid_clone = cid.clone();
513            let info_clone = info.clone();
514            let base_path_clone = base_path.clone();
515
516            tasks.spawn(async move {
517                // Read encrypted chunk
518                let chunk_path = base_path_clone
519                    .join("chunks")
520                    .join(&cid_clone)
521                    .join(format!("{}.enc", chunk_index));
522
523                if !chunk_path.exists() {
524                    return Err(StorageError::ChunkNotFound {
525                        cid: cid_clone.clone(),
526                        chunk_index,
527                    });
528                }
529
530                let encrypted = fs::read(&chunk_path).await?;
531
532                // Decrypt chunk
533                let decryptor =
534                    StreamDecryptor::new(&info_clone.encryption_key, &info_clone.base_nonce);
535                let plaintext = decryptor
536                    .decrypt_chunk_at(&encrypted, chunk_index)
537                    .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
538
539                let chunk_hash = hash(&plaintext);
540
541                // Verify against stored metadata
542                let meta_path = base_path_clone
543                    .join("chunks")
544                    .join(&cid_clone)
545                    .join(format!("{}.meta", chunk_index));
546
547                if meta_path.exists() {
548                    let meta_json = fs::read(&meta_path).await?;
549                    let metadata: ChunkMetadata = serde_json::from_slice(&meta_json)
550                        .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
551
552                    if chunk_hash != metadata.hash {
553                        return Err(StorageError::HashMismatch {
554                            expected: hex::encode(metadata.hash),
555                            actual: hex::encode(chunk_hash),
556                        });
557                    }
558                }
559
560                Ok((chunk_index, plaintext, chunk_hash))
561            });
562        }
563
564        // Collect results
565        let mut results: Vec<(u64, Vec<u8>, [u8; 32])> = Vec::new();
566        while let Some(result) = tasks.join_next().await {
567            let (index, chunk, hash) = result
568                .map_err(|e| StorageError::IoError(std::io::Error::other(e.to_string())))??;
569            results.push((index, chunk, hash));
570        }
571
572        // Sort by chunk index to maintain order
573        results.sort_by_key(|(idx, _, _)| *idx);
574
575        Ok(results
576            .into_iter()
577            .map(|(_, chunk, hash)| (chunk, hash))
578            .collect())
579    }
580
581    /// Unpin content (remove all chunks).
582    pub async fn unpin_content(&mut self, cid: &str) -> Result<(), StorageError> {
583        if !self.pinned_content.contains_key(cid) {
584            return Ok(()); // Already not pinned
585        }
586
587        // Calculate freed space
588        let content_dir = self.chunk_dir(cid);
589        let mut freed_bytes = 0u64;
590
591        if content_dir.exists() {
592            let mut entries = fs::read_dir(&content_dir).await?;
593            while let Some(entry) = entries.next_entry().await? {
594                let metadata = entry.metadata().await?;
595                freed_bytes += metadata.len();
596            }
597
598            // Remove content directory
599            fs::remove_dir_all(&content_dir).await?;
600        }
601
602        // Remove content metadata
603        let meta_path = self.content_meta_path(cid);
604        if meta_path.exists() {
605            fs::remove_file(&meta_path).await?;
606        }
607
608        // Update index
609        self.pinned_content.remove(cid);
610        self.used_bytes = self.used_bytes.saturating_sub(freed_bytes);
611
612        // Save index
613        self.save_index().await?;
614
615        Ok(())
616    }
617
618    /// Get storage statistics.
619    pub fn stats(&self) -> StorageStats {
620        StorageStats {
621            used_bytes: self.used_bytes,
622            max_bytes: self.max_bytes,
623            available_bytes: self.available_bytes(),
624            pinned_content_count: self.pinned_content.len(),
625            usage_percent: (self.used_bytes as f64 / self.max_bytes as f64) * 100.0,
626        }
627    }
628
629    /// Perform storage health check to verify integrity.
630    pub async fn health_check(&self) -> Result<StorageHealthReport, StorageError> {
631        let mut report = StorageHealthReport {
632            total_content: self.pinned_content.len(),
633            healthy_content: 0,
634            corrupted_chunks: Vec::new(),
635            missing_chunks: Vec::new(),
636            metadata_issues: Vec::new(),
637        };
638
639        for (cid, info) in &self.pinned_content {
640            let mut content_healthy = true;
641
642            // Check each chunk exists and is valid
643            for chunk_index in 0..info.chunk_count {
644                let chunk_path = self.chunk_path(cid, chunk_index);
645                let meta_path = self.chunk_meta_path(cid, chunk_index);
646
647                if !chunk_path.exists() {
648                    report
649                        .missing_chunks
650                        .push(format!("{}:{}", cid, chunk_index));
651                    content_healthy = false;
652                    continue;
653                }
654
655                if !meta_path.exists() {
656                    report
657                        .metadata_issues
658                        .push(format!("{}:{} - missing metadata", cid, chunk_index));
659                    content_healthy = false;
660                    continue;
661                }
662
663                // Verify chunk integrity
664                match self.get_chunk_verified(cid, chunk_index).await {
665                    Ok(_) => {} // Chunk is valid
666                    Err(StorageError::HashMismatch { .. }) => {
667                        report
668                            .corrupted_chunks
669                            .push(format!("{}:{}", cid, chunk_index));
670                        content_healthy = false;
671                    }
672                    Err(e) => {
673                        report
674                            .metadata_issues
675                            .push(format!("{}:{} - {}", cid, chunk_index, e));
676                        content_healthy = false;
677                    }
678                }
679            }
680
681            if content_healthy {
682                report.healthy_content += 1;
683            }
684        }
685
686        Ok(report)
687    }
688
689    /// Repair corrupted or missing chunks (requires re-download from network).
690    pub async fn repair(&mut self, cid: &str) -> Result<RepairResult, StorageError> {
691        // This is a placeholder - actual repair would need network access
692        // For now, we just identify what needs repair
693        let info = self
694            .pinned_content
695            .get(cid)
696            .ok_or_else(|| StorageError::ContentNotFound {
697                cid: cid.to_string(),
698            })?
699            .clone();
700
701        let mut chunks_needing_repair = Vec::new();
702
703        #[allow(clippy::redundant_pattern_matching)]
704        for chunk_index in 0..info.chunk_count {
705            if self.get_chunk_verified(cid, chunk_index).await.is_err() {
706                chunks_needing_repair.push(chunk_index);
707            }
708        }
709
710        let status = if chunks_needing_repair.is_empty() {
711            RepairStatus::Healthy
712        } else {
713            RepairStatus::NeedsRepair
714        };
715
716        Ok(RepairResult {
717            cid: cid.to_string(),
718            chunks_needing_repair,
719            status,
720        })
721    }
722
723    // Helper methods
724
725    fn chunk_dir(&self, cid: &str) -> PathBuf {
726        self.base_path.join("chunks").join(cid)
727    }
728
729    fn chunk_path(&self, cid: &str, chunk_index: u64) -> PathBuf {
730        self.chunk_dir(cid).join(format!("{}.enc", chunk_index))
731    }
732
733    fn chunk_meta_path(&self, cid: &str, chunk_index: u64) -> PathBuf {
734        self.chunk_dir(cid).join(format!("{}.meta", chunk_index))
735    }
736
737    fn content_meta_path(&self, cid: &str) -> PathBuf {
738        self.base_path
739            .join("metadata")
740            .join(format!("{}.json", cid))
741    }
742
743    fn index_path(&self) -> PathBuf {
744        self.base_path.join("index.json")
745    }
746
747    async fn load_index(&mut self) -> Result<(), StorageError> {
748        let index_path = self.index_path();
749        if !index_path.exists() {
750            return Ok(());
751        }
752
753        let data = fs::read(&index_path).await?;
754        let index: StorageIndex = serde_json::from_slice(&data)
755            .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
756
757        self.used_bytes = index.used_bytes;
758
759        // Load all pinned content metadata
760        for cid in index.pinned_cids {
761            let meta_path = self.content_meta_path(&cid);
762            if meta_path.exists() {
763                let meta_data = fs::read(&meta_path).await?;
764                let info: PinnedContentInfo = serde_json::from_slice(&meta_data)
765                    .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
766                self.pinned_content.insert(cid, info);
767            }
768        }
769
770        Ok(())
771    }
772
773    async fn save_index(&self) -> Result<(), StorageError> {
774        let index = StorageIndex {
775            used_bytes: self.used_bytes,
776            pinned_cids: self.pinned_content.keys().cloned().collect(),
777        };
778
779        let data = serde_json::to_vec_pretty(&index)
780            .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
781
782        fs::write(self.index_path(), data).await?;
783        Ok(())
784    }
785
786    /// Update storage health metrics.
787    ///
788    /// This should be called periodically to track storage health and detect issues early.
789    pub fn update_health_metrics(&mut self) {
790        let now = std::time::Instant::now();
791
792        // Calculate disk usage
793        let disk_usage = if self.max_bytes > 0 {
794            self.used_bytes as f64 / self.max_bytes as f64
795        } else {
796            0.0
797        };
798
799        // Calculate growth rate if we have previous data
800        let growth_rate = if let Some((prev_usage, prev_time)) = self.previous_usage {
801            let duration_secs = now.duration_since(prev_time).as_secs_f64();
802            if duration_secs > 0.0 {
803                let bytes_change = self.used_bytes.saturating_sub(prev_usage) as f64;
804                bytes_change / duration_secs
805            } else {
806                0.0
807            }
808        } else {
809            0.0
810        };
811
812        // Predict time until full
813        let time_until_full = if growth_rate > 0.0 {
814            let available = self.max_bytes.saturating_sub(self.used_bytes) as f64;
815            Some((available / growth_rate) as u64)
816        } else {
817            None
818        };
819
820        // Determine health status
821        let status = if self.health.io_errors > 100 || disk_usage > 0.98 {
822            StorageHealthStatus::Critical
823        } else if self.health.io_errors > 50 || disk_usage > 0.95 {
824            StorageHealthStatus::Degraded
825        } else if self.health.io_errors > 10 || disk_usage > 0.90 {
826            StorageHealthStatus::Warning
827        } else {
828            StorageHealthStatus::Healthy
829        };
830
831        // Update health metrics
832        self.health.status = status;
833        self.health.disk_usage = disk_usage;
834        self.health.growth_rate = growth_rate;
835        self.health.time_until_full = time_until_full;
836        self.health.last_check = now;
837
838        // Update previous usage for next calculation
839        self.previous_usage = Some((self.used_bytes, now));
840    }
841
    /// Get current storage health.
    ///
    /// Returns the snapshot as of the last `update_health_metrics` call; it is
    /// not recomputed here.
    #[must_use]
    #[inline]
    pub fn health(&self) -> &StorageHealth {
        &self.health
    }
848
    /// Record an I/O error.
    ///
    /// Increments the error counter and immediately refreshes the health
    /// metrics, so the status reflects the new count.
    pub fn record_io_error(&mut self) {
        self.health.io_errors += 1;
        self.update_health_metrics();
    }
854
855    /// Record a slow operation.
856    pub fn record_slow_operation(&mut self, latency_ms: u64) {
857        self.health.slow_operations += 1;
858
859        // Update peak latency
860        if latency_ms > self.health.peak_latency_ms {
861            self.health.peak_latency_ms = latency_ms;
862        }
863
864        // Update average latency (simple moving average)
865        let alpha = 0.1; // Smoothing factor
866        self.health.avg_latency_ms =
867            alpha * latency_ms as f64 + (1.0 - alpha) * self.health.avg_latency_ms;
868
869        self.update_health_metrics();
870    }
871
    /// Reset health metrics (typically called after a health check period).
    ///
    /// Clears the I/O error and slow-operation counters and refreshes the
    /// derived metrics. The average and peak latency are left untouched.
    pub fn reset_health_counters(&mut self) {
        self.health.io_errors = 0;
        self.health.slow_operations = 0;
        self.update_health_metrics();
    }
878
879    /// Check if storage health is concerning.
880    #[must_use]
881    #[inline]
882    pub fn is_health_concerning(&self) -> bool {
883        self.health.status == StorageHealthStatus::Degraded
884            || self.health.status == StorageHealthStatus::Critical
885            || self.health.is_failure_imminent()
886    }
887}
888
/// Storage statistics.
///
/// A point-in-time copy of the usage figures, produced by `ChunkStorage::stats`.
#[derive(Debug, Clone)]
pub struct StorageStats {
    /// Bytes currently accounted as used.
    pub used_bytes: u64,
    /// Configured quota in bytes.
    pub max_bytes: u64,
    /// Remaining bytes (`max_bytes - used_bytes`, saturating at zero).
    pub available_bytes: u64,
    /// Number of pinned content entries.
    pub pinned_content_count: usize,
    /// Usage as a percentage of the quota (`used_bytes / max_bytes * 100`).
    pub usage_percent: f64,
}
898
/// Storage health check report.
///
/// Produced by `ChunkStorage::health_check`. Chunk entries are formatted as
/// `"<cid>:<chunk_index>"`; metadata issues append `" - <reason>"`.
#[derive(Debug, Clone)]
pub struct StorageHealthReport {
    /// Number of pinned content entries examined.
    pub total_content: usize,
    /// Number of entries for which every chunk verified cleanly.
    pub healthy_content: usize,
    /// Chunks whose plaintext hash did not match their metadata.
    pub corrupted_chunks: Vec<String>,
    /// Chunks whose encrypted file is missing.
    pub missing_chunks: Vec<String>,
    /// Chunks with missing metadata or any other verification error.
    pub metadata_issues: Vec<String>,
}
908
/// Repair operation result.
///
/// Produced by `ChunkStorage::repair`, which currently only identifies the
/// chunks that fail verification.
#[derive(Debug, Clone)]
pub struct RepairResult {
    /// CID that was examined.
    pub cid: String,
    /// Indices of chunks that failed to load or verify, in ascending order.
    pub chunks_needing_repair: Vec<u64>,
    /// `Healthy` when `chunks_needing_repair` is empty, otherwise `NeedsRepair`.
    pub status: RepairStatus,
}
916
/// Repair status.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairStatus {
    /// Every chunk verified successfully.
    Healthy,
    /// At least one chunk failed verification.
    NeedsRepair,
}
923
/// Persisted storage index.
///
/// Serialized as JSON to `<base_path>/index.json`. Per-content details live in
/// separate `metadata/<cid>.json` files.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct StorageIndex {
    /// Tracked storage usage in bytes at the time of saving.
    used_bytes: u64,
    /// CIDs of all pinned content.
    pinned_cids: Vec<String>,
}
930
/// Split a byte slice into owned chunks of at most `chunk_size` bytes.
///
/// Every chunk except possibly the last holds exactly `chunk_size` bytes; the
/// last one carries whatever remains. Empty input yields an empty vector.
/// Each chunk is an independent copy of the corresponding bytes of `data`.
///
/// # Panics
///
/// Panics if `chunk_size` is zero, because the underlying `<[u8]>::chunks`
/// call rejects a zero chunk size.
///
/// # Examples
///
/// ```
/// use chie_core::storage::split_into_chunks;
///
/// let data = b"Hello, World!";
/// let chunks = split_into_chunks(data, 5);
///
/// assert_eq!(chunks.len(), 3);
/// assert_eq!(chunks[0], b"Hello");
/// assert_eq!(chunks[1], b", Wor");
/// assert_eq!(chunks[2], b"ld!");
/// ```
pub fn split_into_chunks(data: &[u8], chunk_size: usize) -> Vec<Vec<u8>> {
    let pieces = data.chunks(chunk_size);
    pieces.map(<[u8]>::to_vec).collect()
}
949
950/// Helper to calculate chunk count.
951#[inline]
952#[allow(clippy::manual_div_ceil)] // div_ceil not available in const context
953pub const fn calculate_chunk_count(size: u64) -> u64 {
954    let chunk_size = CHUNK_SIZE as u64;
955    if size == 0 {
956        0
957    } else {
958        (size + chunk_size - 1) / chunk_size
959    }
960}
961
/// Storage health monitoring with predictive failure detection.
///
/// Tracks storage metrics over time to detect anomalies and predict potential failures.
///
/// Each piece of state sits behind its own `Arc<Mutex<..>>`, which is why the
/// recording and query methods can take `&self`.
pub struct StorageHealthMonitor {
    /// Timestamped error events as `(recorded_at, count)` pairs; the hourly
    /// error rate is derived from these records.
    error_history: std::sync::Arc<std::sync::Mutex<Vec<(std::time::Instant, u32)>>>,
    /// Timestamped corruption events as `(recorded_at, count)` pairs; the
    /// hourly corruption rate is derived from these records.
    corruption_history: std::sync::Arc<std::sync::Mutex<Vec<(std::time::Instant, u32)>>>,
    /// Timestamped I/O latency samples as `(recorded_at, latency)` pairs,
    /// with the latency in microseconds.
    io_latency_history: std::sync::Arc<std::sync::Mutex<Vec<(std::time::Instant, u64)>>>,
    /// Total errors encountered (not pruned with the history; cleared only by `reset`).
    total_errors: std::sync::Arc<std::sync::Mutex<u64>>,
    /// Total corruptions detected (not pruned with the history; cleared only by `reset`).
    total_corruptions: std::sync::Arc<std::sync::Mutex<u64>>,
    /// History retention duration: records older than this are dropped
    /// whenever a new record is added to the same history.
    retention_duration: std::time::Duration,
}
979
980impl StorageHealthMonitor {
981    /// Create a new storage health monitor.
982    ///
983    /// # Arguments
984    ///
985    /// * `retention_duration` - How long to keep historical data (e.g., 24 hours)
986    #[must_use]
987    pub fn new(retention_duration: std::time::Duration) -> Self {
988        Self {
989            error_history: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
990            corruption_history: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
991            io_latency_history: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
992            total_errors: std::sync::Arc::new(std::sync::Mutex::new(0)),
993            total_corruptions: std::sync::Arc::new(std::sync::Mutex::new(0)),
994            retention_duration,
995        }
996    }
997
998    /// Record an I/O operation error.
999    pub fn record_error(&self) {
1000        let mut errors = self.total_errors.lock().unwrap();
1001        *errors += 1;
1002        drop(errors);
1003
1004        let mut history = self.error_history.lock().unwrap();
1005        history.push((std::time::Instant::now(), 1));
1006        self.cleanup_old_records(&mut history);
1007    }
1008
1009    /// Record a corruption detection.
1010    pub fn record_corruption(&self) {
1011        let mut corruptions = self.total_corruptions.lock().unwrap();
1012        *corruptions += 1;
1013        drop(corruptions);
1014
1015        let mut history = self.corruption_history.lock().unwrap();
1016        history.push((std::time::Instant::now(), 1));
1017        self.cleanup_old_records(&mut history);
1018    }
1019
1020    /// Record an I/O operation latency (in microseconds).
1021    pub fn record_io_latency(&self, latency_us: u64) {
1022        let mut history = self.io_latency_history.lock().unwrap();
1023        history.push((std::time::Instant::now(), latency_us));
1024        self.cleanup_old_records(&mut history);
1025    }
1026
1027    /// Clean up records older than retention duration.
1028    fn cleanup_old_records<T>(&self, history: &mut Vec<(std::time::Instant, T)>) {
1029        let cutoff = std::time::Instant::now() - self.retention_duration;
1030        history.retain(|(timestamp, _)| *timestamp > cutoff);
1031    }
1032
1033    /// Get current error rate (errors per hour).
1034    #[must_use]
1035    pub fn error_rate(&self) -> f64 {
1036        let history = self.error_history.lock().unwrap();
1037        if history.is_empty() {
1038            return 0.0;
1039        }
1040
1041        let window = std::time::Duration::from_secs(3600); // 1 hour
1042        let cutoff = std::time::Instant::now() - window;
1043        let recent_errors: u32 = history
1044            .iter()
1045            .filter(|(t, _)| *t > cutoff)
1046            .map(|(_, count)| count)
1047            .sum();
1048
1049        recent_errors as f64
1050    }
1051
1052    /// Get current corruption rate (corruptions per hour).
1053    #[must_use]
1054    pub fn corruption_rate(&self) -> f64 {
1055        let history = self.corruption_history.lock().unwrap();
1056        if history.is_empty() {
1057            return 0.0;
1058        }
1059
1060        let window = std::time::Duration::from_secs(3600); // 1 hour
1061        let cutoff = std::time::Instant::now() - window;
1062        let recent_corruptions: u32 = history
1063            .iter()
1064            .filter(|(t, _)| *t > cutoff)
1065            .map(|(_, count)| count)
1066            .sum();
1067
1068        recent_corruptions as f64
1069    }
1070
1071    /// Get average I/O latency over the last hour (in microseconds).
1072    #[must_use]
1073    pub fn avg_io_latency(&self) -> f64 {
1074        let history = self.io_latency_history.lock().unwrap();
1075        if history.is_empty() {
1076            return 0.0;
1077        }
1078
1079        let window = std::time::Duration::from_secs(3600); // 1 hour
1080        let cutoff = std::time::Instant::now() - window;
1081        let recent_latencies: Vec<u64> = history
1082            .iter()
1083            .filter(|(t, _)| *t > cutoff)
1084            .map(|(_, latency)| *latency)
1085            .collect();
1086
1087        if recent_latencies.is_empty() {
1088            return 0.0;
1089        }
1090
1091        let sum: u64 = recent_latencies.iter().sum();
1092        sum as f64 / recent_latencies.len() as f64
1093    }
1094
1095    /// Predict storage health status based on current trends.
1096    ///
1097    /// Uses historical data to predict if storage is likely to fail soon.
1098    ///
1099    /// # Returns
1100    ///
1101    /// A tuple of (predicted_status, confidence_score) where confidence is 0.0-1.0.
1102    #[must_use]
1103    pub fn predict_health(&self) -> (StorageHealthStatus, f64) {
1104        let error_rate = self.error_rate();
1105        let corruption_rate = self.corruption_rate();
1106        let avg_latency = self.avg_io_latency();
1107
1108        // Calculate health score based on thresholds
1109        let mut score = 100.0;
1110        let mut confidence = 1.0;
1111
1112        // Error rate thresholds (errors per hour)
1113        if error_rate > 100.0 {
1114            score -= 40.0;
1115        } else if error_rate > 50.0 {
1116            score -= 25.0;
1117        } else if error_rate > 10.0 {
1118            score -= 10.0;
1119        }
1120
1121        // Corruption rate thresholds (corruptions per hour)
1122        if corruption_rate > 10.0 {
1123            score -= 50.0; // Corruptions are critical
1124        } else if corruption_rate > 5.0 {
1125            score -= 30.0;
1126        } else if corruption_rate > 1.0 {
1127            score -= 15.0;
1128        }
1129
1130        // I/O latency thresholds (microseconds)
1131        // Normal disk I/O: ~100-500 µs for SSD, ~5000-15000 µs for HDD
1132        if avg_latency > 50_000.0 {
1133            score -= 30.0; // Very slow, indicating potential hardware failure
1134        } else if avg_latency > 20_000.0 {
1135            score -= 15.0;
1136        } else if avg_latency > 10_000.0 {
1137            score -= 5.0;
1138        }
1139
1140        // Reduce confidence if we have limited data
1141        let history = self.io_latency_history.lock().unwrap();
1142        if history.len() < 10 {
1143            confidence = history.len() as f64 / 10.0;
1144        }
1145
1146        // Determine status from score
1147        let status = if score >= 80.0 {
1148            StorageHealthStatus::Healthy
1149        } else if score >= 60.0 {
1150            StorageHealthStatus::Warning
1151        } else if score >= 40.0 {
1152            StorageHealthStatus::Degraded
1153        } else {
1154            StorageHealthStatus::Critical
1155        };
1156
1157        (status, confidence)
1158    }
1159
1160    /// Check if storage is predicted to fail soon.
1161    ///
1162    /// Returns true if failure is likely within the prediction window.
1163    #[must_use]
1164    pub fn is_failure_predicted(&self) -> bool {
1165        let (status, confidence) = self.predict_health();
1166
1167        // Predict failure if status is Critical with high confidence,
1168        // or Degraded with very high confidence
1169        match (status, confidence) {
1170            (StorageHealthStatus::Critical, c) if c > 0.7 => true,
1171            (StorageHealthStatus::Degraded, c) if c > 0.9 => true,
1172            _ => false,
1173        }
1174    }
1175
1176    /// Get a detailed health report with predictions.
1177    #[must_use]
1178    pub fn health_report(&self) -> StorageHealthPrediction {
1179        let (predicted_status, confidence) = self.predict_health();
1180        let total_errors = *self.total_errors.lock().unwrap();
1181        let total_corruptions = *self.total_corruptions.lock().unwrap();
1182
1183        StorageHealthPrediction {
1184            current_status: predicted_status,
1185            confidence,
1186            error_rate_per_hour: self.error_rate(),
1187            corruption_rate_per_hour: self.corruption_rate(),
1188            avg_io_latency_us: self.avg_io_latency(),
1189            total_errors,
1190            total_corruptions,
1191            failure_predicted: self.is_failure_predicted(),
1192        }
1193    }
1194
1195    /// Reset all statistics.
1196    pub fn reset(&self) {
1197        self.error_history.lock().unwrap().clear();
1198        self.corruption_history.lock().unwrap().clear();
1199        self.io_latency_history.lock().unwrap().clear();
1200        *self.total_errors.lock().unwrap() = 0;
1201        *self.total_corruptions.lock().unwrap() = 0;
1202    }
1203}
1204
/// Storage health prediction report.
///
/// Snapshot assembled by `StorageHealthMonitor::health_report`.
#[derive(Debug, Clone)]
pub struct StorageHealthPrediction {
    /// Predicted health status.
    pub current_status: StorageHealthStatus,
    /// Confidence in prediction (0.0 to 1.0).
    pub confidence: f64,
    /// Current error rate (errors per hour), i.e. errors recorded in the trailing hour.
    pub error_rate_per_hour: f64,
    /// Current corruption rate (corruptions per hour), i.e. corruptions
    /// recorded in the trailing hour.
    pub corruption_rate_per_hour: f64,
    /// Average I/O latency in microseconds over the trailing hour
    /// (`0.0` when no sample is available).
    pub avg_io_latency_us: f64,
    /// Total errors since monitoring started (or since the last reset).
    pub total_errors: u64,
    /// Total corruptions detected since monitoring started (or since the last reset).
    pub total_corruptions: u64,
    /// Whether failure is predicted to occur soon: `Critical` with confidence
    /// above 0.7, or `Degraded` with confidence above 0.9.
    pub failure_predicted: bool,
}
1225
1226impl Default for StorageHealthMonitor {
1227    fn default() -> Self {
1228        Self::new(std::time::Duration::from_secs(24 * 3600)) // 24 hours default
1229    }
1230}
1231
1232// Transaction support methods for ChunkStorage
1233impl ChunkStorage {
1234    /// Get chunk directory path (exposed for transactions).
1235    #[must_use]
1236    pub fn get_chunk_dir(&self, cid: &str) -> PathBuf {
1237        self.chunk_dir(cid)
1238    }
1239
1240    /// Write chunks for a transaction.
1241    ///
1242    /// Returns list of (chunk_index, chunk_path, meta_path, size_bytes) for each written chunk.
1243    pub async fn write_chunks_for_transaction(
1244        &mut self,
1245        cid: &str,
1246        chunks: &[Vec<u8>],
1247        key: &EncryptionKey,
1248        nonce: &EncryptionNonce,
1249    ) -> Result<Vec<(u64, PathBuf, PathBuf, u64)>, StorageError> {
1250        let encryptor = StreamEncryptor::new(key, nonce);
1251        let mut written_chunks = Vec::new();
1252
1253        for (i, chunk) in chunks.iter().enumerate() {
1254            let chunk_index = i as u64;
1255
1256            // Hash plaintext
1257            let chunk_hash = hash(chunk);
1258
1259            // Encrypt chunk
1260            let encrypted = encryptor
1261                .encrypt_chunk_at(chunk, chunk_index)
1262                .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
1263
1264            // Store chunk
1265            let chunk_path = self.chunk_path(cid, chunk_index);
1266            fs::write(&chunk_path, &encrypted).await?;
1267
1268            // Store metadata
1269            let metadata = ChunkMetadata {
1270                cid: cid.to_string(),
1271                chunk_index,
1272                plaintext_size: chunk.len(),
1273                encrypted_size: encrypted.len(),
1274                hash: chunk_hash,
1275            };
1276            let meta_path = self.chunk_meta_path(cid, chunk_index);
1277            let meta_json = serde_json::to_vec(&metadata)
1278                .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
1279            fs::write(&meta_path, &meta_json).await?;
1280
1281            let size_bytes = encrypted.len() as u64;
1282            self.used_bytes += size_bytes;
1283
1284            written_chunks.push((chunk_index, chunk_path, meta_path, size_bytes));
1285        }
1286
1287        Ok(written_chunks)
1288    }
1289
1290    /// Decrease used bytes (for transaction rollback).
1291    pub fn decrease_used_bytes(&mut self, bytes: u64) {
1292        self.used_bytes = self.used_bytes.saturating_sub(bytes);
1293    }
1294}
1295
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Quota used by the tests that do not depend on a specific limit (10 MiB).
    const DEFAULT_QUOTA: u64 = 10 * 1024 * 1024;

    /// Open a `ChunkStorage` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` handle is returned alongside the storage and must stay
    /// alive for as long as the storage is used, because dropping it deletes
    /// the directory.
    async fn open_storage(max_bytes: u64) -> (TempDir, ChunkStorage) {
        let temp_dir = TempDir::new().unwrap();
        let storage = ChunkStorage::new(temp_dir.path().to_path_buf(), max_bytes)
            .await
            .unwrap();
        (temp_dir, storage)
    }

    /// A freshly created storage is empty and exposes the configured quota.
    #[tokio::test]
    async fn test_chunk_storage_creation() {
        let (_temp_dir, storage) = open_storage(1024 * 1024).await;

        assert_eq!(storage.used_bytes(), 0);
        assert_eq!(storage.max_bytes(), 1024 * 1024);
        assert_eq!(storage.available_bytes(), 1024 * 1024);
        assert_eq!(storage.list_pinned().len(), 0);
    }

    /// Pinned chunks can be read back unchanged.
    #[tokio::test]
    async fn test_pin_and_retrieve_content() {
        let (_temp_dir, mut storage) = open_storage(DEFAULT_QUOTA).await;

        let cid = "QmTest123";
        let test_data = vec![b"Hello, World!".to_vec(), b"Second chunk".to_vec()];
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        // Pin content
        let info = storage
            .pin_content(cid, &test_data, &key, &nonce)
            .await
            .unwrap();

        assert_eq!(info.cid, cid);
        assert_eq!(info.chunk_count, 2);
        assert!(storage.is_pinned(cid));
        assert_eq!(storage.list_pinned().len(), 1);

        // Retrieve chunks
        let first = storage.get_chunk(cid, 0).await.unwrap();
        let second = storage.get_chunk(cid, 1).await.unwrap();

        assert_eq!(first, test_data[0]);
        assert_eq!(second, test_data[1]);
    }

    /// Verified retrieval returns the plaintext together with its hash.
    #[tokio::test]
    async fn test_get_chunk_verified() {
        let (_temp_dir, mut storage) = open_storage(DEFAULT_QUOTA).await;

        let cid = "QmVerified";
        let test_data = vec![b"Verified chunk data".to_vec()];
        let expected_hash = chie_crypto::hash(&test_data[0]);
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        storage
            .pin_content(cid, &test_data, &key, &nonce)
            .await
            .unwrap();

        let (chunk, chunk_hash) = storage.get_chunk_verified(cid, 0).await.unwrap();

        assert_eq!(chunk, test_data[0]);
        assert_eq!(chunk_hash, expected_hash);
    }

    /// Unpinning removes the content and releases its bytes.
    #[tokio::test]
    async fn test_unpin_content() {
        let (_temp_dir, mut storage) = open_storage(DEFAULT_QUOTA).await;

        let cid = "QmUnpin";
        let test_data = vec![b"Data to unpin".to_vec()];
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        storage
            .pin_content(cid, &test_data, &key, &nonce)
            .await
            .unwrap();
        assert!(storage.is_pinned(cid));
        let used_before = storage.used_bytes();
        assert!(used_before > 0);

        storage.unpin_content(cid).await.unwrap();
        assert!(!storage.is_pinned(cid));
        assert_eq!(storage.used_bytes(), 0);
    }

    /// Pinning more than the quota allows is rejected.
    #[tokio::test]
    async fn test_quota_exceeded() {
        // Very small quota
        let (_temp_dir, mut storage) = open_storage(100).await;

        let cid = "QmTooBig";
        let large_data = vec![vec![0u8; 1000]]; // Larger than quota
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        let result = storage.pin_content(cid, &large_data, &key, &nonce).await;
        assert!(matches!(result, Err(StorageError::QuotaExceeded { .. })));
    }

    /// Reading from an unknown CID reports `ContentNotFound`.
    #[tokio::test]
    async fn test_content_not_found() {
        let (_temp_dir, storage) = open_storage(DEFAULT_QUOTA).await;

        let result = storage.get_chunk("QmNonExistent", 0).await;
        assert!(matches!(result, Err(StorageError::ContentNotFound { .. })));
    }

    /// Reading a chunk index that was never stored reports `ChunkNotFound`.
    #[tokio::test]
    async fn test_chunk_not_found() {
        let (_temp_dir, mut storage) = open_storage(DEFAULT_QUOTA).await;

        let cid = "QmChunkTest";
        let test_data = vec![b"Only one chunk".to_vec()];
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        storage
            .pin_content(cid, &test_data, &key, &nonce)
            .await
            .unwrap();

        // Try to get non-existent chunk index
        let result = storage.get_chunk(cid, 99).await;
        assert!(matches!(result, Err(StorageError::ChunkNotFound { .. })));
    }

    /// Statistics reflect an empty store first and the pinned content afterwards.
    #[tokio::test]
    async fn test_storage_stats() {
        let max_bytes = 10 * 1024 * 1024;
        let (_temp_dir, mut storage) = open_storage(max_bytes).await;

        let stats_empty = storage.stats();
        assert_eq!(stats_empty.used_bytes, 0);
        assert_eq!(stats_empty.max_bytes, max_bytes);
        assert_eq!(stats_empty.available_bytes, max_bytes);
        assert_eq!(stats_empty.pinned_content_count, 0);
        assert_eq!(stats_empty.usage_percent, 0.0);

        // Pin some content
        let cid = "QmStats";
        let test_data = vec![b"Test data for stats".to_vec()];
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        storage
            .pin_content(cid, &test_data, &key, &nonce)
            .await
            .unwrap();

        let stats_used = storage.stats();
        assert!(stats_used.used_bytes > 0);
        assert_eq!(stats_used.max_bytes, max_bytes);
        assert!(stats_used.available_bytes < max_bytes);
        assert_eq!(stats_used.pinned_content_count, 1);
        assert!(stats_used.usage_percent > 0.0);
    }

    /// Several distinct CIDs can be pinned side by side.
    #[tokio::test]
    async fn test_multiple_content_pins() {
        let (_temp_dir, mut storage) = open_storage(DEFAULT_QUOTA).await;

        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        // Pin multiple pieces of content
        for i in 0..5 {
            let cid = format!("QmMulti{}", i);
            let data = vec![format!("Content {}", i).into_bytes()];
            storage
                .pin_content(&cid, &data, &key, &nonce)
                .await
                .unwrap();
        }

        assert_eq!(storage.list_pinned().len(), 5);
        assert!(storage.is_pinned("QmMulti0"));
        assert!(storage.is_pinned("QmMulti4"));
        assert!(!storage.is_pinned("QmMulti5"));
    }

    /// Pinned content survives dropping and reopening the storage.
    #[tokio::test]
    async fn test_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().to_path_buf();
        let cid = "QmPersist";
        let test_data = vec![b"Persistent data".to_vec()];

        // Create storage, pin content, then drop it
        {
            let key = chie_crypto::generate_key();
            let nonce = chie_crypto::generate_nonce();
            let mut storage = ChunkStorage::new(path.clone(), 10 * 1024 * 1024)
                .await
                .unwrap();
            storage
                .pin_content(cid, &test_data, &key, &nonce)
                .await
                .unwrap();
        }

        // Recreate storage and verify content is still there
        let reopened = ChunkStorage::new(path, 10 * 1024 * 1024).await.unwrap();
        assert!(reopened.is_pinned(cid));
        assert_eq!(reopened.list_pinned().len(), 1);
        assert!(reopened.used_bytes() > 0);
    }

    /// Splitting yields full-size chunks plus a shorter tail, and is lossless.
    #[test]
    fn test_split_into_chunks() {
        let data = vec![1u8; 100]; // 100 bytes
        let chunk_size = 30;

        let chunks = split_into_chunks(&data, chunk_size);

        // 100 bytes split into chunks of 30 = 4 chunks (30, 30, 30, 10)
        assert_eq!(chunks.len(), 4);
        assert_eq!(chunks[0].len(), 30);
        assert_eq!(chunks[1].len(), 30);
        assert_eq!(chunks[2].len(), 30);
        assert_eq!(chunks[3].len(), 10);

        // Verify we can reconstruct the data
        let reconstructed: Vec<u8> = chunks.into_iter().flatten().collect();
        assert_eq!(reconstructed, data);
    }

    /// Chunk counts round up at every `CHUNK_SIZE` boundary.
    #[test]
    fn test_calculate_chunk_count() {
        let chunk_size = CHUNK_SIZE as u64;

        assert_eq!(calculate_chunk_count(0), 0);
        assert_eq!(calculate_chunk_count(1), 1);
        assert_eq!(calculate_chunk_count(chunk_size), 1);
        assert_eq!(calculate_chunk_count(chunk_size + 1), 2);
        assert_eq!(calculate_chunk_count(chunk_size * 3), 3);
        assert_eq!(calculate_chunk_count(chunk_size * 3 + 1), 4);
    }

    /// Pinned info echoes the CID, chunk count, key and nonce that were supplied.
    #[tokio::test]
    async fn test_get_pinned_info() {
        let (_temp_dir, mut storage) = open_storage(DEFAULT_QUOTA).await;

        let cid = "QmInfo";
        let test_data = vec![b"Info test".to_vec()];
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        storage
            .pin_content(cid, &test_data, &key, &nonce)
            .await
            .unwrap();

        let info = storage.get_pinned_info(cid);
        assert!(info.is_some());

        let info = info.unwrap();
        assert_eq!(info.cid, cid);
        assert_eq!(info.chunk_count, 1);
        assert_eq!(info.encryption_key, key);
        assert_eq!(info.base_nonce, nonce);

        assert!(storage.get_pinned_info("QmNonExistent").is_none());
    }

    /// Ten 64 KiB chunks round-trip with the right size and content.
    #[tokio::test]
    async fn test_large_content() {
        let (_temp_dir, mut storage) = open_storage(100 * 1024 * 1024).await;

        let cid = "QmLarge";
        // Create 10 chunks of 64KB each
        let chunks: Vec<Vec<u8>> = (0..10).map(|i| vec![i as u8; 64 * 1024]).collect();
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        let info = storage
            .pin_content(cid, &chunks, &key, &nonce)
            .await
            .unwrap();

        assert_eq!(info.chunk_count, 10);
        assert_eq!(info.total_size, 64 * 1024 * 10);

        // Retrieve all chunks
        for i in 0..10 {
            let chunk = storage.get_chunk(cid, i).await.unwrap();
            assert_eq!(chunk.len(), 64 * 1024);
            assert_eq!(chunk[0], i as u8);
        }
    }
}