use chie_crypto::{EncryptionKey, EncryptionNonce, StreamDecryptor, StreamEncryptor, hash};
use chie_shared::CHUNK_SIZE;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use thiserror::Error;
use tokio::fs;

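/// Errors that can occur in the chunk storage layer.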
#[derive(Debug, Error)]
pub enum StorageError {
    #[error("Content not found: {cid}")]
    ContentNotFound { cid: String },

    #[error("Chunk not found: {cid}:{chunk_index}")]
    ChunkNotFound { cid: String, chunk_index: u64 },

    #[error("IO error: {0}")]
    IoError(#[from] std::io::Error),

    #[error("Encryption error: {0}")]
    EncryptionError(String),

    #[error("Hash mismatch: expected {expected}, got {actual}")]
    HashMismatch { expected: String, actual: String },

    #[error("Storage quota exceeded: used {used} bytes, max {max} bytes")]
    QuotaExceeded { used: u64, max: u64 },

    #[error("Invalid chunk size: {size} bytes")]
    InvalidChunkSize { size: usize },
}

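/// Metadata persisted alongside each encrypted chunk as `<index>.meta`,
/// used to verify chunk integrity on read.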
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkMetadata {
    pub cid: String,
    pub chunk_index: u64,
    pub plaintext_size: usize,
    pub encrypted_size: usize,
    pub hash: [u8; 32],
}

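/// Record of a pinned content item, including the key material needed to
/// decrypt its chunks on read.
///
/// Note that serializing this struct (as `pin_content` does, to
/// `metadata/<cid>.json`) writes the encryption key and base nonce to disk,
/// so the storage directory must be protected accordingly.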
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PinnedContentInfo {
    pub cid: String,
    pub total_size: u64,
    pub chunk_count: u64,
    pub encryption_key: EncryptionKey,
    pub base_nonce: EncryptionNonce,
    pub pinned_at: chrono::DateTime<chrono::Utc>,
}

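/// Coarse health classification for the storage subsystem, ordered from best
/// (`Healthy`) to worst (`Critical`).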
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageHealthStatus {
    Healthy,
    Warning,
    Degraded,
    Critical,
}

impl StorageHealthStatus {
    /// Numeric score for this status (100 = healthy, 25 = critical).
    #[must_use]
    #[inline]
    pub const fn score(&self) -> u8 {
        match self {
            Self::Healthy => 100,
            Self::Warning => 75,
            Self::Degraded => 50,
            Self::Critical => 25,
        }
    }

    /// Human-readable description of this status.
    #[must_use]
    #[inline]
    pub const fn description(&self) -> &'static str {
        match self {
            Self::Healthy => "Storage is healthy",
            Self::Warning => "Minor storage issues detected",
            Self::Degraded => "Storage performance degraded",
            Self::Critical => "Critical storage failure",
        }
    }
}

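/// Point-in-time health metrics for the storage subsystem.
///
/// Units: latencies are in milliseconds, `disk_usage` is a fraction in
/// `0.0..=1.0`, `growth_rate` is in bytes per second, and `time_until_full`
/// is an estimate in seconds.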
#[derive(Debug, Clone)]
pub struct StorageHealth {
    pub status: StorageHealthStatus,
    pub io_errors: u64,
    pub slow_operations: u64,
    pub avg_latency_ms: f64,
    pub peak_latency_ms: u64,
    pub disk_usage: f64,
    pub growth_rate: f64,
    pub time_until_full: Option<u64>,
    pub last_check: std::time::Instant,
}

impl Default for StorageHealth {
    fn default() -> Self {
        Self {
            status: StorageHealthStatus::Healthy,
            io_errors: 0,
            slow_operations: 0,
            avg_latency_ms: 0.0,
            peak_latency_ms: 0,
            disk_usage: 0.0,
            growth_rate: 0.0,
            time_until_full: None,
            last_check: std::time::Instant::now(),
        }
    }
}

impl StorageHealth {
    /// Composite health score in `0.0..=1.0`: starts at 1.0 and subtracts
    /// penalties for IO errors (0.1 each, capped at 0.5), slow operations,
    /// high average latency, and high disk usage.
    ///
    /// For example, 2 IO errors (-0.2) and 92% disk usage (-0.2) yield 0.6.
    #[must_use]
    pub fn health_score(&self) -> f64 {
        let mut score = 1.0;

        if self.io_errors > 0 {
            score -= (self.io_errors as f64 * 0.1).min(0.5);
        }

        if self.slow_operations > 10 {
            score -= 0.2;
        } else if self.slow_operations > 5 {
            score -= 0.1;
        }

        if self.avg_latency_ms > 100.0 {
            score -= 0.2;
        } else if self.avg_latency_ms > 50.0 {
            score -= 0.1;
        }

        if self.disk_usage > 0.95 {
            score -= 0.3;
        } else if self.disk_usage > 0.90 {
            score -= 0.2;
        } else if self.disk_usage > 0.80 {
            score -= 0.1;
        }

        score.max(0.0)
    }

    /// Returns `true` if the status is critical, the disk is projected to
    /// fill within an hour, or more than 100 IO errors have accumulated.
    #[must_use]
    pub fn is_failure_imminent(&self) -> bool {
        self.status == StorageHealthStatus::Critical
            || self.time_until_full.is_some_and(|t| t < 3600)
            || self.io_errors > 100
    }
}

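/// Encrypted, content-addressed chunk store rooted at `base_path`.
///
/// On-disk layout (see the path helpers below):
/// - `chunks/<cid>/<index>.enc`: encrypted chunk data
/// - `chunks/<cid>/<index>.meta`: per-chunk metadata (JSON)
/// - `metadata/<cid>.json`: pinned-content record
/// - `index.json`: index of pinned CIDs and byte usage
///
/// A minimal usage sketch, mirroring the tests at the bottom of this file
/// (`chie_crypto::generate_key`/`generate_nonce` as used there):
///
/// ```ignore
/// let mut storage = ChunkStorage::new(path, 10 * 1024 * 1024).await?;
/// let key = chie_crypto::generate_key();
/// let nonce = chie_crypto::generate_nonce();
/// storage.pin_content("QmExample", &chunks, &key, &nonce).await?;
/// let first = storage.get_chunk("QmExample", 0).await?;
/// ```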
pub struct ChunkStorage {
    base_path: PathBuf,
    pinned_content: HashMap<String, PinnedContentInfo>,
    used_bytes: u64,
    max_bytes: u64,
    health: StorageHealth,
    previous_usage: Option<(u64, std::time::Instant)>,
}

impl ChunkStorage {
    /// Creates the on-disk layout (`chunks/`, `metadata/`), loads the
    /// persisted index if one exists, and initializes health metrics.
    pub async fn new(base_path: PathBuf, max_bytes: u64) -> Result<Self, StorageError> {
        fs::create_dir_all(&base_path).await?;
        fs::create_dir_all(base_path.join("chunks")).await?;
        fs::create_dir_all(base_path.join("metadata")).await?;

        let mut storage = Self {
            base_path,
            pinned_content: HashMap::new(),
            used_bytes: 0,
            max_bytes,
            health: StorageHealth::default(),
            previous_usage: None,
        };

        storage.load_index().await?;

        storage.update_health_metrics();

        Ok(storage)
    }

    #[inline]
    pub fn base_path(&self) -> &Path {
        &self.base_path
    }

    #[inline]
    pub fn used_bytes(&self) -> u64 {
        self.used_bytes
    }

    #[inline]
    pub fn max_bytes(&self) -> u64 {
        self.max_bytes
    }

    #[inline]
    pub fn available_bytes(&self) -> u64 {
        self.max_bytes.saturating_sub(self.used_bytes)
    }

    #[inline]
    pub fn is_pinned(&self, cid: &str) -> bool {
        self.pinned_content.contains_key(cid)
    }

    #[inline]
    pub fn get_pinned_info(&self, cid: &str) -> Option<&PinnedContentInfo> {
        self.pinned_content.get(cid)
    }

    pub fn list_pinned(&self) -> Vec<&str> {
        self.pinned_content.keys().map(|s| s.as_str()).collect()
    }

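    /// Encrypts and stores `chunks` under `cid`: each chunk is encrypted at
    /// its index, written as `<index>.enc` with an `<index>.meta` metadata
    /// file, and then the content record and index are persisted.
    ///
    /// Note: the quota check uses the plaintext size, while `used_bytes` is
    /// advanced by the (typically slightly larger) encrypted size.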
    pub async fn pin_content(
        &mut self,
        cid: &str,
        chunks: &[Vec<u8>],
        key: &EncryptionKey,
        nonce: &EncryptionNonce,
    ) -> Result<PinnedContentInfo, StorageError> {
        let total_size: u64 = chunks.iter().map(|c| c.len() as u64).sum();

        if self.used_bytes + total_size > self.max_bytes {
            return Err(StorageError::QuotaExceeded {
                used: self.used_bytes,
                max: self.max_bytes,
            });
        }

        let content_dir = self.chunk_dir(cid);
        fs::create_dir_all(&content_dir).await?;

        let encryptor = StreamEncryptor::new(key, nonce);
        let mut stored_size = 0u64;

        for (i, chunk) in chunks.iter().enumerate() {
            let chunk_index = i as u64;

            let chunk_hash = hash(chunk);

            let encrypted = encryptor
                .encrypt_chunk_at(chunk, chunk_index)
                .map_err(|e| StorageError::EncryptionError(e.to_string()))?;

            let chunk_path = self.chunk_path(cid, chunk_index);
            fs::write(&chunk_path, &encrypted).await?;

            let metadata = ChunkMetadata {
                cid: cid.to_string(),
                chunk_index,
                plaintext_size: chunk.len(),
                encrypted_size: encrypted.len(),
                hash: chunk_hash,
            };
            let meta_path = self.chunk_meta_path(cid, chunk_index);
            let meta_json = serde_json::to_vec(&metadata)
                .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
            fs::write(&meta_path, meta_json).await?;

            stored_size += encrypted.len() as u64;
        }

        let info = PinnedContentInfo {
            cid: cid.to_string(),
            total_size,
            chunk_count: chunks.len() as u64,
            encryption_key: *key,
            base_nonce: *nonce,
            pinned_at: chrono::Utc::now(),
        };

        let content_meta_path = self.content_meta_path(cid);
        let meta_json =
            serde_json::to_vec(&info).map_err(|e| StorageError::EncryptionError(e.to_string()))?;
        fs::write(&content_meta_path, meta_json).await?;

        self.pinned_content.insert(cid.to_string(), info.clone());
        self.used_bytes += stored_size;

        self.save_index().await?;

        Ok(info)
    }

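    /// Reads and decrypts a single chunk of pinned content.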
    pub async fn get_chunk(&self, cid: &str, chunk_index: u64) -> Result<Vec<u8>, StorageError> {
        let info = self
            .pinned_content
            .get(cid)
            .ok_or_else(|| StorageError::ContentNotFound {
                cid: cid.to_string(),
            })?;

        let chunk_path = self.chunk_path(cid, chunk_index);
        if !chunk_path.exists() {
            return Err(StorageError::ChunkNotFound {
                cid: cid.to_string(),
                chunk_index,
            });
        }

        let encrypted = fs::read(&chunk_path).await?;

        let decryptor = StreamDecryptor::new(&info.encryption_key, &info.base_nonce);
        let plaintext = decryptor
            .decrypt_chunk_at(&encrypted, chunk_index)
            .map_err(|e| StorageError::EncryptionError(e.to_string()))?;

        Ok(plaintext)
    }

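    /// Like [`Self::get_chunk`], but also hashes the plaintext and, when a
    /// metadata file is present, checks it against the stored hash,
    /// returning [`StorageError::HashMismatch`] on corruption.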
    pub async fn get_chunk_verified(
        &self,
        cid: &str,
        chunk_index: u64,
    ) -> Result<(Vec<u8>, [u8; 32]), StorageError> {
        let plaintext = self.get_chunk(cid, chunk_index).await?;
        let chunk_hash = hash(&plaintext);

        let meta_path = self.chunk_meta_path(cid, chunk_index);
        if meta_path.exists() {
            let meta_json = fs::read(&meta_path).await?;
            let metadata: ChunkMetadata = serde_json::from_slice(&meta_json)
                .map_err(|e| StorageError::EncryptionError(e.to_string()))?;

            if chunk_hash != metadata.hash {
                return Err(StorageError::HashMismatch {
                    expected: hex::encode(metadata.hash),
                    actual: hex::encode(chunk_hash),
                });
            }
        }

        Ok((plaintext, chunk_hash))
    }

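    /// Reads and decrypts several chunks concurrently (one task per chunk
    /// via `tokio::task::JoinSet`), returning the chunks sorted by ascending
    /// chunk index regardless of completion order.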
    pub async fn get_chunks_batch(
        &self,
        cid: &str,
        chunk_indices: &[u64],
    ) -> Result<Vec<Vec<u8>>, StorageError> {
        use tokio::task::JoinSet;

        let mut tasks = JoinSet::new();

        let info = self
            .pinned_content
            .get(cid)
            .ok_or_else(|| StorageError::ContentNotFound {
                cid: cid.to_string(),
            })?
            .clone();

        let cid = cid.to_string();
        let base_path = self.base_path.clone();

        for &chunk_index in chunk_indices {
            let cid_clone = cid.clone();
            let info_clone = info.clone();
            let base_path_clone = base_path.clone();

            tasks.spawn(async move {
                let chunk_path = base_path_clone
                    .join("chunks")
                    .join(&cid_clone)
                    .join(format!("{}.enc", chunk_index));

                if !chunk_path.exists() {
                    return Err(StorageError::ChunkNotFound {
                        cid: cid_clone,
                        chunk_index,
                    });
                }

                let encrypted = fs::read(&chunk_path).await?;

                let decryptor =
                    StreamDecryptor::new(&info_clone.encryption_key, &info_clone.base_nonce);
                let plaintext = decryptor
                    .decrypt_chunk_at(&encrypted, chunk_index)
                    .map_err(|e| StorageError::EncryptionError(e.to_string()))?;

                Ok((chunk_index, plaintext))
            });
        }

        let mut results: Vec<(u64, Vec<u8>)> = Vec::new();
        while let Some(result) = tasks.join_next().await {
            let (index, chunk) = result
                .map_err(|e| StorageError::IoError(std::io::Error::other(e.to_string())))??;
            results.push((index, chunk));
        }

        results.sort_by_key(|(idx, _)| *idx);

        Ok(results.into_iter().map(|(_, chunk)| chunk).collect())
    }

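    /// Concurrent variant of [`Self::get_chunk_verified`]: decrypts the
    /// requested chunks in parallel, verifies each against its metadata hash
    /// when available, and returns `(plaintext, hash)` pairs sorted by
    /// chunk index.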
    pub async fn get_chunks_batch_verified(
        &self,
        cid: &str,
        chunk_indices: &[u64],
    ) -> Result<Vec<(Vec<u8>, [u8; 32])>, StorageError> {
        use tokio::task::JoinSet;

        let mut tasks = JoinSet::new();

        let info = self
            .pinned_content
            .get(cid)
            .ok_or_else(|| StorageError::ContentNotFound {
                cid: cid.to_string(),
            })?
            .clone();

        let cid = cid.to_string();
        let base_path = self.base_path.clone();

        for &chunk_index in chunk_indices {
            let cid_clone = cid.clone();
            let info_clone = info.clone();
            let base_path_clone = base_path.clone();

            tasks.spawn(async move {
                let chunk_path = base_path_clone
                    .join("chunks")
                    .join(&cid_clone)
                    .join(format!("{}.enc", chunk_index));

                if !chunk_path.exists() {
                    return Err(StorageError::ChunkNotFound {
                        cid: cid_clone.clone(),
                        chunk_index,
                    });
                }

                let encrypted = fs::read(&chunk_path).await?;

                let decryptor =
                    StreamDecryptor::new(&info_clone.encryption_key, &info_clone.base_nonce);
                let plaintext = decryptor
                    .decrypt_chunk_at(&encrypted, chunk_index)
                    .map_err(|e| StorageError::EncryptionError(e.to_string()))?;

                let chunk_hash = hash(&plaintext);

                let meta_path = base_path_clone
                    .join("chunks")
                    .join(&cid_clone)
                    .join(format!("{}.meta", chunk_index));

                if meta_path.exists() {
                    let meta_json = fs::read(&meta_path).await?;
                    let metadata: ChunkMetadata = serde_json::from_slice(&meta_json)
                        .map_err(|e| StorageError::EncryptionError(e.to_string()))?;

                    if chunk_hash != metadata.hash {
                        return Err(StorageError::HashMismatch {
                            expected: hex::encode(metadata.hash),
                            actual: hex::encode(chunk_hash),
                        });
                    }
                }

                Ok((chunk_index, plaintext, chunk_hash))
            });
        }

        let mut results: Vec<(u64, Vec<u8>, [u8; 32])> = Vec::new();
        while let Some(result) = tasks.join_next().await {
            let (index, chunk, hash) = result
                .map_err(|e| StorageError::IoError(std::io::Error::other(e.to_string())))??;
            results.push((index, chunk, hash));
        }

        results.sort_by_key(|(idx, _, _)| *idx);

        Ok(results
            .into_iter()
            .map(|(_, chunk, hash)| (chunk, hash))
            .collect())
    }

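    /// Removes pinned content and its on-disk chunks, reclaiming the bytes
    /// they occupied. A no-op if `cid` is not pinned.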
    pub async fn unpin_content(&mut self, cid: &str) -> Result<(), StorageError> {
        if !self.pinned_content.contains_key(cid) {
            return Ok(());
        }

        let content_dir = self.chunk_dir(cid);
        let mut freed_bytes = 0u64;

        if content_dir.exists() {
            let mut entries = fs::read_dir(&content_dir).await?;
            while let Some(entry) = entries.next_entry().await? {
                let metadata = entry.metadata().await?;
                freed_bytes += metadata.len();
            }

            fs::remove_dir_all(&content_dir).await?;
        }

        let meta_path = self.content_meta_path(cid);
        if meta_path.exists() {
            fs::remove_file(&meta_path).await?;
        }

        self.pinned_content.remove(cid);
        self.used_bytes = self.used_bytes.saturating_sub(freed_bytes);

        self.save_index().await?;

        Ok(())
    }

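    /// Returns a snapshot of byte usage and pin counts.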
    pub fn stats(&self) -> StorageStats {
        // Guard against a zero quota so usage_percent can't become NaN.
        let usage_percent = if self.max_bytes > 0 {
            (self.used_bytes as f64 / self.max_bytes as f64) * 100.0
        } else {
            0.0
        };

        StorageStats {
            used_bytes: self.used_bytes,
            max_bytes: self.max_bytes,
            available_bytes: self.available_bytes(),
            pinned_content_count: self.pinned_content.len(),
            usage_percent,
        }
    }

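    /// Scans every chunk of every pinned content item, verifying presence,
    /// metadata, and hashes. This reads and decrypts all stored data, so it
    /// is expensive on large stores.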
    pub async fn health_check(&self) -> Result<StorageHealthReport, StorageError> {
        let mut report = StorageHealthReport {
            total_content: self.pinned_content.len(),
            healthy_content: 0,
            corrupted_chunks: Vec::new(),
            missing_chunks: Vec::new(),
            metadata_issues: Vec::new(),
        };

        for (cid, info) in &self.pinned_content {
            let mut content_healthy = true;

            for chunk_index in 0..info.chunk_count {
                let chunk_path = self.chunk_path(cid, chunk_index);
                let meta_path = self.chunk_meta_path(cid, chunk_index);

                if !chunk_path.exists() {
                    report
                        .missing_chunks
                        .push(format!("{}:{}", cid, chunk_index));
                    content_healthy = false;
                    continue;
                }

                if !meta_path.exists() {
                    report
                        .metadata_issues
                        .push(format!("{}:{} - missing metadata", cid, chunk_index));
                    content_healthy = false;
                    continue;
                }

                match self.get_chunk_verified(cid, chunk_index).await {
                    Ok(_) => {}
                    Err(StorageError::HashMismatch { .. }) => {
                        report
                            .corrupted_chunks
                            .push(format!("{}:{}", cid, chunk_index));
                        content_healthy = false;
                    }
                    Err(e) => {
                        report
                            .metadata_issues
                            .push(format!("{}:{} - {}", cid, chunk_index, e));
                        content_healthy = false;
                    }
                }
            }

            if content_healthy {
                report.healthy_content += 1;
            }
        }

        Ok(report)
    }

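    /// Identifies chunks of `cid` that fail verification. This only reports
    /// which chunks need repair; fetching replacements is left to the caller.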
    pub async fn repair(&mut self, cid: &str) -> Result<RepairResult, StorageError> {
        let info = self
            .pinned_content
            .get(cid)
            .ok_or_else(|| StorageError::ContentNotFound {
                cid: cid.to_string(),
            })?
            .clone();

        let mut chunks_needing_repair = Vec::new();

        for chunk_index in 0..info.chunk_count {
            if self.get_chunk_verified(cid, chunk_index).await.is_err() {
                chunks_needing_repair.push(chunk_index);
            }
        }

        let status = if chunks_needing_repair.is_empty() {
            RepairStatus::Healthy
        } else {
            RepairStatus::NeedsRepair
        };

        Ok(RepairResult {
            cid: cid.to_string(),
            chunks_needing_repair,
            status,
        })
    }

    fn chunk_dir(&self, cid: &str) -> PathBuf {
        self.base_path.join("chunks").join(cid)
    }

    fn chunk_path(&self, cid: &str, chunk_index: u64) -> PathBuf {
        self.chunk_dir(cid).join(format!("{}.enc", chunk_index))
    }

    fn chunk_meta_path(&self, cid: &str, chunk_index: u64) -> PathBuf {
        self.chunk_dir(cid).join(format!("{}.meta", chunk_index))
    }

    fn content_meta_path(&self, cid: &str) -> PathBuf {
        self.base_path
            .join("metadata")
            .join(format!("{}.json", cid))
    }

    fn index_path(&self) -> PathBuf {
        self.base_path.join("index.json")
    }

    async fn load_index(&mut self) -> Result<(), StorageError> {
        let index_path = self.index_path();
        if !index_path.exists() {
            return Ok(());
        }

        let data = fs::read(&index_path).await?;
        let index: StorageIndex = serde_json::from_slice(&data)
            .map_err(|e| StorageError::EncryptionError(e.to_string()))?;

        self.used_bytes = index.used_bytes;

        for cid in index.pinned_cids {
            let meta_path = self.content_meta_path(&cid);
            if meta_path.exists() {
                let meta_data = fs::read(&meta_path).await?;
                let info: PinnedContentInfo = serde_json::from_slice(&meta_data)
                    .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
                self.pinned_content.insert(cid, info);
            }
        }

        Ok(())
    }

    async fn save_index(&self) -> Result<(), StorageError> {
        let index = StorageIndex {
            used_bytes: self.used_bytes,
            pinned_cids: self.pinned_content.keys().cloned().collect(),
        };

        let data = serde_json::to_vec_pretty(&index)
            .map_err(|e| StorageError::EncryptionError(e.to_string()))?;

        fs::write(self.index_path(), data).await?;
        Ok(())
    }

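    /// Recomputes disk usage, growth rate (bytes/sec since the previous
    /// sample), estimated time until full, and the overall status.
    ///
    /// Status thresholds: Critical at >100 IO errors or >98% usage,
    /// Degraded at >50 errors or >95%, Warning at >10 errors or >90%.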
    pub fn update_health_metrics(&mut self) {
        let now = std::time::Instant::now();

        let disk_usage = if self.max_bytes > 0 {
            self.used_bytes as f64 / self.max_bytes as f64
        } else {
            0.0
        };

        let growth_rate = if let Some((prev_usage, prev_time)) = self.previous_usage {
            let duration_secs = now.duration_since(prev_time).as_secs_f64();
            if duration_secs > 0.0 {
                let bytes_change = self.used_bytes.saturating_sub(prev_usage) as f64;
                bytes_change / duration_secs
            } else {
                0.0
            }
        } else {
            0.0
        };

        let time_until_full = if growth_rate > 0.0 {
            let available = self.max_bytes.saturating_sub(self.used_bytes) as f64;
            Some((available / growth_rate) as u64)
        } else {
            None
        };

        let status = if self.health.io_errors > 100 || disk_usage > 0.98 {
            StorageHealthStatus::Critical
        } else if self.health.io_errors > 50 || disk_usage > 0.95 {
            StorageHealthStatus::Degraded
        } else if self.health.io_errors > 10 || disk_usage > 0.90 {
            StorageHealthStatus::Warning
        } else {
            StorageHealthStatus::Healthy
        };

        self.health.status = status;
        self.health.disk_usage = disk_usage;
        self.health.growth_rate = growth_rate;
        self.health.time_until_full = time_until_full;
        self.health.last_check = now;

        self.previous_usage = Some((self.used_bytes, now));
    }

    /// Current health metrics.
    #[must_use]
    #[inline]
    pub fn health(&self) -> &StorageHealth {
        &self.health
    }

    /// Records an IO error and refreshes the health status.
    pub fn record_io_error(&mut self) {
        self.health.io_errors += 1;
        self.update_health_metrics();
    }

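    /// Records a slow operation and folds its latency into the running
    /// average as an exponentially weighted moving average:
    /// `avg = alpha * latency + (1 - alpha) * avg` with `alpha = 0.1`.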
    pub fn record_slow_operation(&mut self, latency_ms: u64) {
        self.health.slow_operations += 1;

        if latency_ms > self.health.peak_latency_ms {
            self.health.peak_latency_ms = latency_ms;
        }

        let alpha = 0.1;
        self.health.avg_latency_ms =
            alpha * latency_ms as f64 + (1.0 - alpha) * self.health.avg_latency_ms;

        self.update_health_metrics();
    }

    pub fn reset_health_counters(&mut self) {
        self.health.io_errors = 0;
        self.health.slow_operations = 0;
        self.update_health_metrics();
    }

    #[must_use]
    #[inline]
    pub fn is_health_concerning(&self) -> bool {
        self.health.status == StorageHealthStatus::Degraded
            || self.health.status == StorageHealthStatus::Critical
            || self.health.is_failure_imminent()
    }
}

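/// Snapshot of storage usage returned by [`ChunkStorage::stats`].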
#[derive(Debug, Clone)]
pub struct StorageStats {
    pub used_bytes: u64,
    pub max_bytes: u64,
    pub available_bytes: u64,
    pub pinned_content_count: usize,
    pub usage_percent: f64,
}

#[derive(Debug, Clone)]
pub struct StorageHealthReport {
    pub total_content: usize,
    pub healthy_content: usize,
    pub corrupted_chunks: Vec<String>,
    pub missing_chunks: Vec<String>,
    pub metadata_issues: Vec<String>,
}

#[derive(Debug, Clone)]
pub struct RepairResult {
    pub cid: String,
    pub chunks_needing_repair: Vec<u64>,
    pub status: RepairStatus,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairStatus {
    Healthy,
    NeedsRepair,
}

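/// Persisted to `index.json`; lets the store recover its pinned CIDs and
/// byte usage across restarts.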
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct StorageIndex {
    used_bytes: u64,
    pinned_cids: Vec<String>,
}

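/// Splits `data` into chunks of at most `chunk_size` bytes; every chunk is
/// full-sized except possibly the last. For example, 100 bytes with
/// `chunk_size = 30` yields chunks of 30, 30, 30, and 10 bytes.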
pub fn split_into_chunks(data: &[u8], chunk_size: usize) -> Vec<Vec<u8>> {
    data.chunks(chunk_size).map(|c| c.to_vec()).collect()
}

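/// Number of `CHUNK_SIZE` chunks needed for `size` bytes, i.e.
/// `ceil(size / CHUNK_SIZE)`, computed as `(size + CHUNK_SIZE - 1) / CHUNK_SIZE`.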
#[inline]
#[allow(clippy::manual_div_ceil)]
pub const fn calculate_chunk_count(size: u64) -> u64 {
    let chunk_size = CHUNK_SIZE as u64;
    if size == 0 {
        0
    } else {
        (size + chunk_size - 1) / chunk_size
    }
}

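/// Sliding-window health monitor that tracks errors, corruptions, and IO
/// latencies, discarding records older than `retention_duration`. State is
/// kept behind mutexes so recording works through a shared reference.
///
/// A minimal usage sketch:
///
/// ```ignore
/// let monitor = StorageHealthMonitor::default(); // 24-hour retention
/// monitor.record_error();
/// monitor.record_io_latency(12_000); // microseconds
/// let (status, confidence) = monitor.predict_health();
/// ```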
pub struct StorageHealthMonitor {
    error_history: std::sync::Arc<std::sync::Mutex<Vec<(std::time::Instant, u32)>>>,
    corruption_history: std::sync::Arc<std::sync::Mutex<Vec<(std::time::Instant, u32)>>>,
    io_latency_history: std::sync::Arc<std::sync::Mutex<Vec<(std::time::Instant, u64)>>>,
    total_errors: std::sync::Arc<std::sync::Mutex<u64>>,
    total_corruptions: std::sync::Arc<std::sync::Mutex<u64>>,
    retention_duration: std::time::Duration,
}

impl StorageHealthMonitor {
    /// Creates a monitor that keeps history for `retention_duration`.
    #[must_use]
    pub fn new(retention_duration: std::time::Duration) -> Self {
        Self {
            error_history: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
            corruption_history: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
            io_latency_history: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
            total_errors: std::sync::Arc::new(std::sync::Mutex::new(0)),
            total_corruptions: std::sync::Arc::new(std::sync::Mutex::new(0)),
            retention_duration,
        }
    }

    pub fn record_error(&self) {
        let mut errors = self.total_errors.lock().unwrap();
        *errors += 1;
        drop(errors);

        let mut history = self.error_history.lock().unwrap();
        history.push((std::time::Instant::now(), 1));
        self.cleanup_old_records(&mut history);
    }

    pub fn record_corruption(&self) {
        let mut corruptions = self.total_corruptions.lock().unwrap();
        *corruptions += 1;
        drop(corruptions);

        let mut history = self.corruption_history.lock().unwrap();
        history.push((std::time::Instant::now(), 1));
        self.cleanup_old_records(&mut history);
    }

    pub fn record_io_latency(&self, latency_us: u64) {
        let mut history = self.io_latency_history.lock().unwrap();
        history.push((std::time::Instant::now(), latency_us));
        self.cleanup_old_records(&mut history);
    }

    fn cleanup_old_records<T>(&self, history: &mut Vec<(std::time::Instant, T)>) {
        let cutoff = std::time::Instant::now() - self.retention_duration;
        history.retain(|(timestamp, _)| *timestamp > cutoff);
    }

    /// Number of errors recorded in the last hour.
    #[must_use]
    pub fn error_rate(&self) -> f64 {
        let history = self.error_history.lock().unwrap();
        if history.is_empty() {
            return 0.0;
        }

        let window = std::time::Duration::from_secs(3600);
        let cutoff = std::time::Instant::now() - window;
        let recent_errors: u32 = history
            .iter()
            .filter(|(t, _)| *t > cutoff)
            .map(|(_, count)| count)
            .sum();

        recent_errors as f64
    }

    /// Number of corruptions recorded in the last hour.
    #[must_use]
    pub fn corruption_rate(&self) -> f64 {
        let history = self.corruption_history.lock().unwrap();
        if history.is_empty() {
            return 0.0;
        }

        let window = std::time::Duration::from_secs(3600);
        let cutoff = std::time::Instant::now() - window;
        let recent_corruptions: u32 = history
            .iter()
            .filter(|(t, _)| *t > cutoff)
            .map(|(_, count)| count)
            .sum();

        recent_corruptions as f64
    }

    /// Mean IO latency in microseconds over the last hour.
    #[must_use]
    pub fn avg_io_latency(&self) -> f64 {
        let history = self.io_latency_history.lock().unwrap();
        if history.is_empty() {
            return 0.0;
        }

        let window = std::time::Duration::from_secs(3600);
        let cutoff = std::time::Instant::now() - window;
        let recent_latencies: Vec<u64> = history
            .iter()
            .filter(|(t, _)| *t > cutoff)
            .map(|(_, latency)| *latency)
            .collect();

        if recent_latencies.is_empty() {
            return 0.0;
        }

        let sum: u64 = recent_latencies.iter().sum();
        sum as f64 / recent_latencies.len() as f64
    }

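    /// Predicts the current health status from recent error, corruption, and
    /// latency rates, returning the status plus a confidence in `0.0..=1.0`.
    ///
    /// The score starts at 100 and is reduced by penalties (corruption is
    /// weighted heaviest, then errors, then latency; latency thresholds are
    /// in microseconds). Confidence scales with sample count until at least
    /// 10 latency samples exist. Scores map to statuses at 80/60/40.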
    #[must_use]
    pub fn predict_health(&self) -> (StorageHealthStatus, f64) {
        let error_rate = self.error_rate();
        let corruption_rate = self.corruption_rate();
        let avg_latency = self.avg_io_latency();

        let mut score = 100.0;
        let mut confidence = 1.0;

        if error_rate > 100.0 {
            score -= 40.0;
        } else if error_rate > 50.0 {
            score -= 25.0;
        } else if error_rate > 10.0 {
            score -= 10.0;
        }

        if corruption_rate > 10.0 {
            score -= 50.0;
        } else if corruption_rate > 5.0 {
            score -= 30.0;
        } else if corruption_rate > 1.0 {
            score -= 15.0;
        }

        if avg_latency > 50_000.0 {
            score -= 30.0;
        } else if avg_latency > 20_000.0 {
            score -= 15.0;
        } else if avg_latency > 10_000.0 {
            score -= 5.0;
        }

        let history = self.io_latency_history.lock().unwrap();
        if history.len() < 10 {
            confidence = history.len() as f64 / 10.0;
        }

        let status = if score >= 80.0 {
            StorageHealthStatus::Healthy
        } else if score >= 60.0 {
            StorageHealthStatus::Warning
        } else if score >= 40.0 {
            StorageHealthStatus::Degraded
        } else {
            StorageHealthStatus::Critical
        };

        (status, confidence)
    }

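    /// Returns `true` when failure looks likely: a `Critical` prediction
    /// with confidence above 0.7, or `Degraded` with confidence above 0.9.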
    #[must_use]
    pub fn is_failure_predicted(&self) -> bool {
        let (status, confidence) = self.predict_health();

        match (status, confidence) {
            (StorageHealthStatus::Critical, c) if c > 0.7 => true,
            (StorageHealthStatus::Degraded, c) if c > 0.9 => true,
            _ => false,
        }
    }

    /// Assembles a full prediction report from the current counters.
    #[must_use]
    pub fn health_report(&self) -> StorageHealthPrediction {
        let (predicted_status, confidence) = self.predict_health();
        let total_errors = *self.total_errors.lock().unwrap();
        let total_corruptions = *self.total_corruptions.lock().unwrap();

        StorageHealthPrediction {
            current_status: predicted_status,
            confidence,
            error_rate_per_hour: self.error_rate(),
            corruption_rate_per_hour: self.corruption_rate(),
            avg_io_latency_us: self.avg_io_latency(),
            total_errors,
            total_corruptions,
            failure_predicted: self.is_failure_predicted(),
        }
    }

    pub fn reset(&self) {
        self.error_history.lock().unwrap().clear();
        self.corruption_history.lock().unwrap().clear();
        self.io_latency_history.lock().unwrap().clear();
        *self.total_errors.lock().unwrap() = 0;
        *self.total_corruptions.lock().unwrap() = 0;
    }
}

#[derive(Debug, Clone)]
pub struct StorageHealthPrediction {
    pub current_status: StorageHealthStatus,
    pub confidence: f64,
    pub error_rate_per_hour: f64,
    pub corruption_rate_per_hour: f64,
    pub avg_io_latency_us: f64,
    pub total_errors: u64,
    pub total_corruptions: u64,
    pub failure_predicted: bool,
}

impl Default for StorageHealthMonitor {
    fn default() -> Self {
        // 24-hour retention window.
        Self::new(std::time::Duration::from_secs(24 * 3600))
    }
}

impl ChunkStorage {
    #[must_use]
    pub fn get_chunk_dir(&self, cid: &str) -> PathBuf {
        self.chunk_dir(cid)
    }

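    /// Encrypts and writes chunks (and their metadata files) without
    /// registering a pin or saving the index, returning
    /// `(index, chunk_path, meta_path, encrypted_size)` for each chunk.
    ///
    /// `used_bytes` is advanced as chunks are written. The transaction
    /// driver is not in this file; presumably the caller creates the chunk
    /// directory first (see [`Self::get_chunk_dir`]) and, on failure, deletes
    /// the written files and calls [`Self::decrease_used_bytes`] to roll back.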
    pub async fn write_chunks_for_transaction(
        &mut self,
        cid: &str,
        chunks: &[Vec<u8>],
        key: &EncryptionKey,
        nonce: &EncryptionNonce,
    ) -> Result<Vec<(u64, PathBuf, PathBuf, u64)>, StorageError> {
        let encryptor = StreamEncryptor::new(key, nonce);
        let mut written_chunks = Vec::new();

        for (i, chunk) in chunks.iter().enumerate() {
            let chunk_index = i as u64;

            let chunk_hash = hash(chunk);

            let encrypted = encryptor
                .encrypt_chunk_at(chunk, chunk_index)
                .map_err(|e| StorageError::EncryptionError(e.to_string()))?;

            let chunk_path = self.chunk_path(cid, chunk_index);
            fs::write(&chunk_path, &encrypted).await?;

            let metadata = ChunkMetadata {
                cid: cid.to_string(),
                chunk_index,
                plaintext_size: chunk.len(),
                encrypted_size: encrypted.len(),
                hash: chunk_hash,
            };
            let meta_path = self.chunk_meta_path(cid, chunk_index);
            let meta_json = serde_json::to_vec(&metadata)
                .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
            fs::write(&meta_path, &meta_json).await?;

            let size_bytes = encrypted.len() as u64;
            self.used_bytes += size_bytes;

            written_chunks.push((chunk_index, chunk_path, meta_path, size_bytes));
        }

        Ok(written_chunks)
    }

    pub fn decrease_used_bytes(&mut self, bytes: u64) {
        self.used_bytes = self.used_bytes.saturating_sub(bytes);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_chunk_storage_creation() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 1024 * 1024)
            .await
            .unwrap();

        assert_eq!(storage.used_bytes(), 0);
        assert_eq!(storage.max_bytes(), 1024 * 1024);
        assert_eq!(storage.available_bytes(), 1024 * 1024);
        assert_eq!(storage.list_pinned().len(), 0);
    }

    #[tokio::test]
    async fn test_pin_and_retrieve_content() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
            .await
            .unwrap();

        let cid = "QmTest123";
        let test_data = vec![b"Hello, World!".to_vec(), b"Second chunk".to_vec()];
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        let info = storage
            .pin_content(cid, &test_data, &key, &nonce)
            .await
            .unwrap();

        assert_eq!(info.cid, cid);
        assert_eq!(info.chunk_count, 2);
        assert!(storage.is_pinned(cid));
        assert_eq!(storage.list_pinned().len(), 1);

        let chunk0 = storage.get_chunk(cid, 0).await.unwrap();
        let chunk1 = storage.get_chunk(cid, 1).await.unwrap();

        assert_eq!(chunk0, test_data[0]);
        assert_eq!(chunk1, test_data[1]);
    }

    #[tokio::test]
    async fn test_get_chunk_verified() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
            .await
            .unwrap();

        let cid = "QmVerified";
        let test_data = vec![b"Verified chunk data".to_vec()];
        let expected_hash = chie_crypto::hash(&test_data[0]);
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        storage
            .pin_content(cid, &test_data, &key, &nonce)
            .await
            .unwrap();

        let (chunk, hash) = storage.get_chunk_verified(cid, 0).await.unwrap();

        assert_eq!(chunk, test_data[0]);
        assert_eq!(hash, expected_hash);
    }

    #[tokio::test]
    async fn test_unpin_content() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
            .await
            .unwrap();

        let cid = "QmUnpin";
        let test_data = vec![b"Data to unpin".to_vec()];
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        storage
            .pin_content(cid, &test_data, &key, &nonce)
            .await
            .unwrap();
        assert!(storage.is_pinned(cid));
        let used_before = storage.used_bytes();
        assert!(used_before > 0);

        storage.unpin_content(cid).await.unwrap();
        assert!(!storage.is_pinned(cid));
        assert_eq!(storage.used_bytes(), 0);
    }

    #[tokio::test]
    async fn test_quota_exceeded() {
        let temp_dir = TempDir::new().unwrap();
        let small_quota = 100; // bytes
        let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), small_quota)
            .await
            .unwrap();

        let cid = "QmTooBig";
        let large_data = vec![vec![0u8; 1000]]; // well over the 100-byte quota
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        let result = storage.pin_content(cid, &large_data, &key, &nonce).await;
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            StorageError::QuotaExceeded { .. }
        ));
    }

    #[tokio::test]
    async fn test_content_not_found() {
        let temp_dir = TempDir::new().unwrap();
        let storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
            .await
            .unwrap();

        let result = storage.get_chunk("QmNonExistent", 0).await;
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            StorageError::ContentNotFound { .. }
        ));
    }

    #[tokio::test]
    async fn test_chunk_not_found() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
            .await
            .unwrap();

        let cid = "QmChunkTest";
        let test_data = vec![b"Only one chunk".to_vec()];
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        storage
            .pin_content(cid, &test_data, &key, &nonce)
            .await
            .unwrap();

        let result = storage.get_chunk(cid, 99).await;
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            StorageError::ChunkNotFound { .. }
        ));
    }

    #[tokio::test]
    async fn test_storage_stats() {
        let temp_dir = TempDir::new().unwrap();
        let max_bytes = 10 * 1024 * 1024;
        let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), max_bytes)
            .await
            .unwrap();

        let stats_empty = storage.stats();
        assert_eq!(stats_empty.used_bytes, 0);
        assert_eq!(stats_empty.max_bytes, max_bytes);
        assert_eq!(stats_empty.available_bytes, max_bytes);
        assert_eq!(stats_empty.pinned_content_count, 0);
        assert_eq!(stats_empty.usage_percent, 0.0);

        let cid = "QmStats";
        let test_data = vec![b"Test data for stats".to_vec()];
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        storage
            .pin_content(cid, &test_data, &key, &nonce)
            .await
            .unwrap();

        let stats_used = storage.stats();
        assert!(stats_used.used_bytes > 0);
        assert_eq!(stats_used.max_bytes, max_bytes);
        assert!(stats_used.available_bytes < max_bytes);
        assert_eq!(stats_used.pinned_content_count, 1);
        assert!(stats_used.usage_percent > 0.0);
    }

    #[tokio::test]
    async fn test_multiple_content_pins() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
            .await
            .unwrap();

        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        for i in 0..5 {
            let cid = format!("QmMulti{}", i);
            let data = vec![format!("Content {}", i).into_bytes()];
            storage
                .pin_content(&cid, &data, &key, &nonce)
                .await
                .unwrap();
        }

        assert_eq!(storage.list_pinned().len(), 5);
        assert!(storage.is_pinned("QmMulti0"));
        assert!(storage.is_pinned("QmMulti4"));
        assert!(!storage.is_pinned("QmMulti5"));
    }

    #[tokio::test]
    async fn test_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().to_path_buf();
        let cid = "QmPersist";
        let test_data = vec![b"Persistent data".to_vec()];

        // Pin content, then drop the storage instance.
        {
            let mut storage = ChunkStorage::new(path.clone(), 10 * 1024 * 1024)
                .await
                .unwrap();
            let key = chie_crypto::generate_key();
            let nonce = chie_crypto::generate_nonce();
            storage
                .pin_content(cid, &test_data, &key, &nonce)
                .await
                .unwrap();
        }

        // A fresh instance should recover the pin from the persisted index.
        {
            let storage = ChunkStorage::new(path, 10 * 1024 * 1024).await.unwrap();
            assert!(storage.is_pinned(cid));
            assert_eq!(storage.list_pinned().len(), 1);
            assert!(storage.used_bytes() > 0);
        }
    }

    #[test]
    fn test_split_into_chunks() {
        let data = vec![1u8; 100];
        let chunk_size = 30;

        let chunks = split_into_chunks(&data, chunk_size);

        assert_eq!(chunks.len(), 4);
        assert_eq!(chunks[0].len(), 30);
        assert_eq!(chunks[1].len(), 30);
        assert_eq!(chunks[2].len(), 30);
        assert_eq!(chunks[3].len(), 10);

        let reconstructed: Vec<u8> = chunks.into_iter().flatten().collect();
        assert_eq!(reconstructed, data);
    }

    #[test]
    fn test_calculate_chunk_count() {
        assert_eq!(calculate_chunk_count(0), 0);
        assert_eq!(calculate_chunk_count(1), 1);
        assert_eq!(calculate_chunk_count(CHUNK_SIZE as u64), 1);
        assert_eq!(calculate_chunk_count(CHUNK_SIZE as u64 + 1), 2);
        assert_eq!(calculate_chunk_count(CHUNK_SIZE as u64 * 3), 3);
        assert_eq!(calculate_chunk_count(CHUNK_SIZE as u64 * 3 + 1), 4);
    }

    #[tokio::test]
    async fn test_get_pinned_info() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
            .await
            .unwrap();

        let cid = "QmInfo";
        let test_data = vec![b"Info test".to_vec()];
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        storage
            .pin_content(cid, &test_data, &key, &nonce)
            .await
            .unwrap();

        let info = storage.get_pinned_info(cid);
        assert!(info.is_some());

        let info = info.unwrap();
        assert_eq!(info.cid, cid);
        assert_eq!(info.chunk_count, 1);
        assert_eq!(info.encryption_key, key);
        assert_eq!(info.base_nonce, nonce);

        assert!(storage.get_pinned_info("QmNonExistent").is_none());
    }

    #[tokio::test]
    async fn test_large_content() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 100 * 1024 * 1024)
            .await
            .unwrap();

        let cid = "QmLarge";
        let chunks: Vec<Vec<u8>> = (0..10).map(|i| vec![i as u8; 64 * 1024]).collect();
        let key = chie_crypto::generate_key();
        let nonce = chie_crypto::generate_nonce();

        let info = storage
            .pin_content(cid, &chunks, &key, &nonce)
            .await
            .unwrap();

        assert_eq!(info.chunk_count, 10);
        assert_eq!(info.total_size, 64 * 1024 * 10);

        for i in 0..10 {
            let chunk = storage.get_chunk(cid, i).await.unwrap();
            assert_eq!(chunk.len(), 64 * 1024);
            assert_eq!(chunk[0], i as u8);
        }
    }
}