use chie_crypto::{EncryptionKey, EncryptionNonce, StreamDecryptor, StreamEncryptor, hash};
use chie_shared::CHUNK_SIZE;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use thiserror::Error;
use tokio::fs;
/// Errors produced by [`ChunkStorage`] operations.
#[derive(Debug, Error)]
pub enum StorageError {
/// No pinned content is registered under `cid`.
#[error("Content not found: {cid}")]
ContentNotFound { cid: String },
/// The content is pinned but the requested chunk file is absent.
#[error("Chunk not found: {cid}:{chunk_index}")]
ChunkNotFound { cid: String, chunk_index: u64 },
/// Underlying filesystem error (converted automatically via `?`).
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
/// Cipher failure. NOTE: this module also reuses this variant for JSON
/// (de)serialization failures of chunk metadata, content descriptors and the index.
#[error("Encryption error: {0}")]
EncryptionError(String),
/// The decrypted chunk's hash differs from the one recorded in its `.meta`
/// sidecar; both values are hex-encoded.
#[error("Hash mismatch: expected {expected}, got {actual}")]
HashMismatch { expected: String, actual: String },
/// A pin request would exceed the quota; `used` is the usage before the request.
#[error("Storage quota exceeded: used {used} bytes, max {max} bytes")]
QuotaExceeded { used: u64, max: u64 },
/// Invalid chunk size. NOTE(review): not constructed within this module.
#[error("Invalid chunk size: {size} bytes")]
InvalidChunkSize { size: usize },
}
/// Per-chunk sidecar record, stored as JSON in `<chunk_index>.meta` next to the
/// encrypted `<chunk_index>.enc` file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkMetadata {
/// Content identifier the chunk belongs to.
pub cid: String,
/// Zero-based position of the chunk within its content.
pub chunk_index: u64,
/// Chunk length before encryption, in bytes.
pub plaintext_size: usize,
/// Length of the ciphertext written to disk, in bytes.
pub encrypted_size: usize,
/// `chie_crypto::hash` of the plaintext chunk; compared on verified reads.
pub hash: [u8; 32],
}
/// Descriptor of one pinned content item, persisted as JSON in
/// `metadata/<cid>.json` and kept in memory by [`ChunkStorage`].
///
/// NOTE(review): the encryption key and base nonce are part of this record and
/// are serialized to disk with `serde_json`, i.e. stored next to the ciphertext
/// without further protection — confirm this matches the intended threat model.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PinnedContentInfo {
/// Content identifier.
pub cid: String,
/// Sum of the plaintext chunk lengths, in bytes.
pub total_size: u64,
/// Number of chunks the content was split into.
pub chunk_count: u64,
/// Key passed to the stream cipher for every chunk of this content.
pub encryption_key: EncryptionKey,
/// Nonce passed to the stream cipher; chunks are addressed by index on top of it.
pub base_nonce: EncryptionNonce,
/// UTC time at which the content was pinned.
pub pinned_at: chrono::DateTime<chrono::Utc>,
}
/// Coarse health classification of a storage backend, from best (`Healthy`)
/// to worst (`Critical`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageHealthStatus {
    /// No problems detected.
    Healthy,
    /// Minor issues detected.
    Warning,
    /// Performance or capacity is degraded.
    Degraded,
    /// Storage is failing.
    Critical,
}

impl StorageHealthStatus {
    /// Score on a 0–100 scale where higher is healthier (100 / 75 / 50 / 25).
    #[must_use]
    #[inline]
    pub const fn score(&self) -> u8 {
        match *self {
            StorageHealthStatus::Critical => 25,
            StorageHealthStatus::Degraded => 50,
            StorageHealthStatus::Warning => 75,
            StorageHealthStatus::Healthy => 100,
        }
    }

    /// Short human-readable explanation of the status.
    #[must_use]
    #[inline]
    pub const fn description(&self) -> &'static str {
        match *self {
            StorageHealthStatus::Critical => "Critical storage failure",
            StorageHealthStatus::Degraded => "Storage performance degraded",
            StorageHealthStatus::Warning => "Minor storage issues detected",
            StorageHealthStatus::Healthy => "Storage is healthy",
        }
    }
}
/// Snapshot of storage health counters and derived metrics.
#[derive(Debug, Clone)]
pub struct StorageHealth {
    /// Current coarse classification.
    pub status: StorageHealthStatus,
    /// Number of I/O errors counted so far.
    pub io_errors: u64,
    /// Number of slow operations counted so far.
    pub slow_operations: u64,
    /// Smoothed latency of slow operations, in milliseconds.
    pub avg_latency_ms: f64,
    /// Highest latency seen, in milliseconds.
    pub peak_latency_ms: u64,
    /// Fraction of the quota in use.
    pub disk_usage: f64,
    /// Observed usage growth, in bytes per second.
    pub growth_rate: f64,
    /// Estimated seconds until the quota is exhausted, when usage is growing.
    pub time_until_full: Option<u64>,
    /// When the metrics were last refreshed.
    pub last_check: std::time::Instant,
}

impl Default for StorageHealth {
    /// A healthy snapshot with all counters at zero, stamped with the current time.
    fn default() -> Self {
        let last_check = std::time::Instant::now();
        Self {
            status: StorageHealthStatus::Healthy,
            io_errors: 0,
            slow_operations: 0,
            avg_latency_ms: 0.0,
            peak_latency_ms: 0,
            disk_usage: 0.0,
            growth_rate: 0.0,
            time_until_full: None,
            last_check,
        }
    }
}

impl StorageHealth {
    /// Health score in `[0.0, 1.0]`: starts at 1.0 and subtracts penalties for
    /// I/O errors (0.1 each, capped at 0.5), slow operations, average latency and
    /// disk usage, clamping the result at zero.
    #[must_use]
    pub fn health_score(&self) -> f64 {
        let error_penalty = if self.io_errors == 0 {
            0.0
        } else {
            (self.io_errors as f64 * 0.1).min(0.5)
        };
        let slow_penalty = match self.slow_operations {
            0..=5 => 0.0,
            6..=10 => 0.1,
            _ => 0.2,
        };
        let latency_penalty = if self.avg_latency_ms > 100.0 {
            0.2
        } else if self.avg_latency_ms > 50.0 {
            0.1
        } else {
            0.0
        };
        let usage_penalty = if self.disk_usage > 0.95 {
            0.3
        } else if self.disk_usage > 0.90 {
            0.2
        } else if self.disk_usage > 0.80 {
            0.1
        } else {
            0.0
        };
        // Subtract in a fixed order; subtracting 0.0 for an absent penalty is exact.
        [error_penalty, slow_penalty, latency_penalty, usage_penalty]
            .iter()
            .fold(1.0_f64, |score, penalty| score - penalty)
            .max(0.0)
    }

    /// True when the status is `Critical`, the quota is projected to fill within
    /// an hour, or more than 100 I/O errors have been counted.
    #[must_use]
    pub fn is_failure_imminent(&self) -> bool {
        let fills_within_hour = matches!(self.time_until_full, Some(secs) if secs < 3600);
        matches!(self.status, StorageHealthStatus::Critical)
            || fills_within_hour
            || self.io_errors > 100
    }
}
/// Encrypted, quota-limited on-disk store for pinned content chunks.
///
/// On-disk layout under `base_path`:
/// - `chunks/<cid>/<index>.enc` — encrypted chunk
/// - `chunks/<cid>/<index>.meta` — JSON [`ChunkMetadata`] sidecar
/// - `metadata/<cid>.json` — JSON [`PinnedContentInfo`]
/// - `index.json` — `used_bytes` plus the list of pinned CIDs
pub struct ChunkStorage {
/// Root directory for all storage files.
base_path: PathBuf,
/// In-memory map from pinned CID to its descriptor.
pinned_content: HashMap<String, PinnedContentInfo>,
/// Bytes charged against the quota (ciphertext sizes of written chunks).
used_bytes: u64,
/// Quota limit in bytes.
max_bytes: u64,
/// Latest health snapshot, refreshed by `update_health_metrics`.
health: StorageHealth,
/// Usage and time recorded at the previous health refresh; basis for the growth rate.
previous_usage: Option<(u64, std::time::Instant)>,
}
impl ChunkStorage {
/// Opens (creating if necessary) a chunk store rooted at `base_path`.
///
/// Ensures `base_path`, `base_path/chunks` and `base_path/metadata` exist,
/// restores any previously saved index, and takes an initial health
/// measurement. `max_bytes` is the quota enforced when pinning.
///
/// # Errors
/// [`StorageError::IoError`] if a directory cannot be created or the index
/// cannot be read; a malformed index is reported as `EncryptionError`.
pub async fn new(base_path: PathBuf, max_bytes: u64) -> Result<Self, StorageError> {
    fs::create_dir_all(&base_path).await?;
    for subdir in ["chunks", "metadata"] {
        fs::create_dir_all(base_path.join(subdir)).await?;
    }

    let mut storage = Self {
        base_path,
        pinned_content: HashMap::new(),
        used_bytes: 0,
        max_bytes,
        health: StorageHealth::default(),
        previous_usage: None,
    };
    storage.load_index().await?;
    // Seed the growth-rate baseline from the restored usage.
    storage.update_health_metrics();
    Ok(storage)
}
/// Root directory under which chunk, metadata and index files are stored.
#[inline]
pub fn base_path(&self) -> &Path {
    self.base_path.as_path()
}

/// Bytes currently charged against the quota.
#[inline]
pub fn used_bytes(&self) -> u64 {
    self.used_bytes
}

/// Configured quota, in bytes.
#[inline]
pub fn max_bytes(&self) -> u64 {
    self.max_bytes
}

/// Remaining quota; clamps at zero if usage has exceeded the limit.
#[inline]
pub fn available_bytes(&self) -> u64 {
    let (limit, used) = (self.max_bytes, self.used_bytes);
    limit.saturating_sub(used)
}

/// Whether `cid` is currently pinned.
#[inline]
pub fn is_pinned(&self, cid: &str) -> bool {
    self.pinned_content.contains_key(cid)
}

/// Descriptor of pinned content `cid`, if any.
#[inline]
pub fn get_pinned_info(&self, cid: &str) -> Option<&PinnedContentInfo> {
    self.pinned_content.get(cid)
}

/// CIDs of all pinned content, in unspecified order.
pub fn list_pinned(&self) -> Vec<&str> {
    self.pinned_content.keys().map(String::as_str).collect()
}
/// Encrypts `chunks` with `key`/`nonce`, writes them to disk and records the
/// content as pinned.
///
/// Each chunk is written to `chunks/<cid>/<index>.enc` with a JSON
/// [`ChunkMetadata`] sidecar (`<index>.meta`) holding the plaintext hash. The
/// descriptor goes to `metadata/<cid>.json` and the index is re-saved. Only
/// ciphertext bytes are charged to the quota.
///
/// If writing fails part-way for content that was not already pinned, the
/// partially written chunk directory is removed (best-effort). Otherwise those
/// files would be left on disk without ever being counted against the quota.
///
/// # Errors
/// - [`StorageError::QuotaExceeded`] if the plaintext size, or the ciphertext
///   actually produced, would push usage past `max_bytes`.
/// - [`StorageError::EncryptionError`] on cipher or JSON serialization failure.
/// - [`StorageError::IoError`] on filesystem failure.
pub async fn pin_content(
    &mut self,
    cid: &str,
    chunks: &[Vec<u8>],
    key: &EncryptionKey,
    nonce: &EncryptionNonce,
) -> Result<PinnedContentInfo, StorageError> {
    let total_size: u64 = chunks.iter().map(|c| c.len() as u64).sum();
    // Cheap early rejection on plaintext size. The exact ciphertext size is
    // enforced chunk by chunk in `write_encrypted_chunks`.
    if self.used_bytes.saturating_add(total_size) > self.max_bytes {
        return Err(StorageError::QuotaExceeded {
            used: self.used_bytes,
            max: self.max_bytes,
        });
    }

    let was_pinned = self.pinned_content.contains_key(cid);
    let content_dir = self.chunk_dir(cid);
    fs::create_dir_all(&content_dir).await?;

    let stored_size = match self.write_encrypted_chunks(cid, chunks, key, nonce).await {
        Ok(bytes) => bytes,
        Err(err) => {
            if !was_pinned {
                // Best-effort cleanup: the original error is more useful to the
                // caller than a secondary cleanup failure.
                let _ = fs::remove_dir_all(&content_dir).await;
            }
            return Err(err);
        }
    };

    let info = PinnedContentInfo {
        cid: cid.to_string(),
        total_size,
        chunk_count: chunks.len() as u64,
        encryption_key: *key,
        base_nonce: *nonce,
        pinned_at: chrono::Utc::now(),
    };
    let meta_json =
        serde_json::to_vec(&info).map_err(|e| StorageError::EncryptionError(e.to_string()))?;
    fs::write(self.content_meta_path(cid), meta_json).await?;

    self.pinned_content.insert(cid.to_string(), info.clone());
    self.used_bytes = self.used_bytes.saturating_add(stored_size);
    self.save_index().await?;
    Ok(info)
}

/// Encrypts every chunk and writes it plus its metadata sidecar, returning the
/// number of ciphertext bytes written.
///
/// Fails with [`StorageError::QuotaExceeded`] as soon as the ciphertext
/// written so far, plus the next chunk, would exceed the remaining quota.
async fn write_encrypted_chunks(
    &self,
    cid: &str,
    chunks: &[Vec<u8>],
    key: &EncryptionKey,
    nonce: &EncryptionNonce,
) -> Result<u64, StorageError> {
    let encryptor = StreamEncryptor::new(key, nonce);
    let mut stored_size = 0u64;
    for (chunk_index, chunk) in (0u64..).zip(chunks) {
        let encrypted = encryptor
            .encrypt_chunk_at(chunk, chunk_index)
            .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
        let encrypted_len = encrypted.len() as u64;
        if self
            .used_bytes
            .saturating_add(stored_size)
            .saturating_add(encrypted_len)
            > self.max_bytes
        {
            return Err(StorageError::QuotaExceeded {
                used: self.used_bytes,
                max: self.max_bytes,
            });
        }
        fs::write(self.chunk_path(cid, chunk_index), &encrypted).await?;

        let metadata = ChunkMetadata {
            cid: cid.to_string(),
            chunk_index,
            plaintext_size: chunk.len(),
            encrypted_size: encrypted.len(),
            hash: hash(chunk),
        };
        let meta_json = serde_json::to_vec(&metadata)
            .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
        fs::write(self.chunk_meta_path(cid, chunk_index), meta_json).await?;
        stored_size += encrypted_len;
    }
    Ok(stored_size)
}
/// Reads and decrypts chunk `chunk_index` of pinned content `cid`.
///
/// # Errors
/// - [`StorageError::ContentNotFound`] if `cid` is not pinned.
/// - [`StorageError::ChunkNotFound`] if the chunk file does not exist.
/// - [`StorageError::EncryptionError`] if decryption fails.
/// - [`StorageError::IoError`] for any other filesystem error.
pub async fn get_chunk(&self, cid: &str, chunk_index: u64) -> Result<Vec<u8>, StorageError> {
    let info = self
        .pinned_content
        .get(cid)
        .ok_or_else(|| StorageError::ContentNotFound {
            cid: cid.to_string(),
        })?;
    // Read directly and translate NotFound instead of probing with
    // `Path::exists` first. That probe is a blocking syscall on the async
    // runtime and races with concurrent deletion.
    let encrypted = match fs::read(self.chunk_path(cid, chunk_index)).await {
        Ok(bytes) => bytes,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Err(StorageError::ChunkNotFound {
                cid: cid.to_string(),
                chunk_index,
            });
        }
        Err(e) => return Err(e.into()),
    };
    let plaintext = StreamDecryptor::new(&info.encryption_key, &info.base_nonce)
        .decrypt_chunk_at(&encrypted, chunk_index)
        .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
    Ok(plaintext)
}
/// Like [`Self::get_chunk`], but also returns the plaintext hash and checks it
/// against the chunk's `.meta` sidecar.
///
/// NOTE(review): if the sidecar is missing, verification is skipped and the
/// chunk is returned unverified. `health_check` reports that case separately.
///
/// # Errors
/// Everything [`Self::get_chunk`] returns, plus:
/// - [`StorageError::HashMismatch`] when the recorded hash differs.
/// - [`StorageError::EncryptionError`] when the sidecar JSON is malformed.
/// - [`StorageError::IoError`] when the sidecar cannot be read.
pub async fn get_chunk_verified(
    &self,
    cid: &str,
    chunk_index: u64,
) -> Result<(Vec<u8>, [u8; 32]), StorageError> {
    let plaintext = self.get_chunk(cid, chunk_index).await?;
    let chunk_hash = hash(&plaintext);
    // Read directly rather than a blocking `Path::exists` probe; NotFound keeps
    // the existing "skip verification" behaviour.
    let meta_json = match fs::read(self.chunk_meta_path(cid, chunk_index)).await {
        Ok(bytes) => bytes,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Ok((plaintext, chunk_hash));
        }
        Err(e) => return Err(e.into()),
    };
    let metadata: ChunkMetadata = serde_json::from_slice(&meta_json)
        .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
    if chunk_hash != metadata.hash {
        return Err(StorageError::HashMismatch {
            expected: hex::encode(metadata.hash),
            actual: hex::encode(chunk_hash),
        });
    }
    Ok((plaintext, chunk_hash))
}
/// Reads and decrypts several chunks of `cid` concurrently.
///
/// One Tokio task is spawned per entry of `chunk_indices`. Plaintexts are
/// returned sorted by ascending chunk index, not in request order.
///
/// # Errors
/// [`StorageError::ContentNotFound`] if `cid` is not pinned. Otherwise, the first
/// task failure observed (`ChunkNotFound`, `EncryptionError`, `IoError`). A
/// panicked or cancelled task is reported as `IoError`. Returning early drops
/// the `JoinSet`, which aborts the remaining tasks.
pub async fn get_chunks_batch(
    &self,
    cid: &str,
    chunk_indices: &[u64],
) -> Result<Vec<Vec<u8>>, StorageError> {
    use tokio::task::JoinSet;
    let info = self
        .pinned_content
        .get(cid)
        .ok_or_else(|| StorageError::ContentNotFound {
            cid: cid.to_string(),
        })?;
    // Key and nonce are `Copy`: each task captures its own copy instead of a
    // clone of the whole descriptor.
    let key = info.encryption_key;
    let nonce = info.base_nonce;
    let chunk_dir = self.chunk_dir(cid);

    let mut tasks = JoinSet::new();
    for &chunk_index in chunk_indices {
        let owned_cid = cid.to_string();
        let chunk_path = chunk_dir.join(format!("{}.enc", chunk_index));
        tasks.spawn(async move {
            // Read directly and map NotFound. A `Path::exists` probe would block
            // the runtime and race with deletion.
            let encrypted = match fs::read(&chunk_path).await {
                Ok(bytes) => bytes,
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    return Err(StorageError::ChunkNotFound {
                        cid: owned_cid,
                        chunk_index,
                    });
                }
                Err(e) => return Err(StorageError::from(e)),
            };
            let plaintext = StreamDecryptor::new(&key, &nonce)
                .decrypt_chunk_at(&encrypted, chunk_index)
                .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
            Ok((chunk_index, plaintext))
        });
    }

    let mut results = Vec::with_capacity(chunk_indices.len());
    while let Some(joined) = tasks.join_next().await {
        let (index, chunk) = joined
            .map_err(|e| StorageError::IoError(std::io::Error::other(e.to_string())))??;
        results.push((index, chunk));
    }
    results.sort_by_key(|(index, _)| *index);
    Ok(results.into_iter().map(|(_, chunk)| chunk).collect())
}
/// Concurrent variant of [`Self::get_chunk_verified`] for several chunks of `cid`.
///
/// Returns `(plaintext, hash)` pairs sorted by ascending chunk index. As in the
/// single-chunk version, a missing `.meta` sidecar skips verification for that
/// chunk.
///
/// # Errors
/// [`StorageError::ContentNotFound`] if `cid` is not pinned. Otherwise, the first
/// task failure observed (`ChunkNotFound`, `HashMismatch`, `EncryptionError`,
/// `IoError`). A panicked or cancelled task is reported as `IoError`; returning
/// early aborts the remaining tasks.
pub async fn get_chunks_batch_verified(
    &self,
    cid: &str,
    chunk_indices: &[u64],
) -> Result<Vec<(Vec<u8>, [u8; 32])>, StorageError> {
    use tokio::task::JoinSet;
    let info = self
        .pinned_content
        .get(cid)
        .ok_or_else(|| StorageError::ContentNotFound {
            cid: cid.to_string(),
        })?;
    // `Copy` values; avoids cloning the whole descriptor per task.
    let key = info.encryption_key;
    let nonce = info.base_nonce;
    let chunk_dir = self.chunk_dir(cid);

    let mut tasks = JoinSet::new();
    for &chunk_index in chunk_indices {
        let owned_cid = cid.to_string();
        let chunk_path = chunk_dir.join(format!("{}.enc", chunk_index));
        let meta_path = chunk_dir.join(format!("{}.meta", chunk_index));
        tasks.spawn(async move {
            let encrypted = match fs::read(&chunk_path).await {
                Ok(bytes) => bytes,
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    return Err(StorageError::ChunkNotFound {
                        cid: owned_cid,
                        chunk_index,
                    });
                }
                Err(e) => return Err(StorageError::from(e)),
            };
            let plaintext = StreamDecryptor::new(&key, &nonce)
                .decrypt_chunk_at(&encrypted, chunk_index)
                .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
            let chunk_hash = hash(&plaintext);
            match fs::read(&meta_path).await {
                Ok(meta_json) => {
                    let metadata: ChunkMetadata = serde_json::from_slice(&meta_json)
                        .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
                    if chunk_hash != metadata.hash {
                        return Err(StorageError::HashMismatch {
                            expected: hex::encode(metadata.hash),
                            actual: hex::encode(chunk_hash),
                        });
                    }
                }
                // Missing sidecar: verification skipped, as in `get_chunk_verified`.
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
                Err(e) => return Err(StorageError::from(e)),
            }
            Ok((chunk_index, plaintext, chunk_hash))
        });
    }

    let mut results = Vec::with_capacity(chunk_indices.len());
    while let Some(joined) = tasks.join_next().await {
        let (index, chunk, digest) = joined
            .map_err(|e| StorageError::IoError(std::io::Error::other(e.to_string())))??;
        results.push((index, chunk, digest));
    }
    results.sort_by_key(|(index, _, _)| *index);
    Ok(results
        .into_iter()
        .map(|(_, chunk, digest)| (chunk, digest))
        .collect())
}
/// Removes pinned content `cid`. Deletes its chunk directory and descriptor
/// file, drops it from the index, and releases its bytes from the quota.
///
/// Unpinning a CID that is not pinned is a no-op.
///
/// Only `.enc` files are credited back to `used_bytes`. Pinning charges only
/// ciphertext bytes, so also crediting the `.meta` sidecars would under-count
/// the usage of every other pinned item.
///
/// # Errors
/// [`StorageError::IoError`] if the chunk directory or descriptor cannot be
/// inspected or removed, or the index cannot be saved.
pub async fn unpin_content(&mut self, cid: &str) -> Result<(), StorageError> {
    if !self.pinned_content.contains_key(cid) {
        return Ok(());
    }

    let content_dir = self.chunk_dir(cid);
    let mut freed_bytes = 0u64;
    match fs::read_dir(&content_dir).await {
        Ok(mut entries) => {
            while let Some(entry) = entries.next_entry().await? {
                if entry.path().extension().is_some_and(|ext| ext == "enc") {
                    freed_bytes += entry.metadata().await?.len();
                }
            }
            fs::remove_dir_all(&content_dir).await?;
        }
        // Nothing on disk to remove (or credit back).
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => return Err(e.into()),
    }

    match fs::remove_file(self.content_meta_path(cid)).await {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => return Err(e.into()),
    }

    self.pinned_content.remove(cid);
    self.used_bytes = self.used_bytes.saturating_sub(freed_bytes);
    self.save_index().await?;
    Ok(())
}
/// Point-in-time summary of quota usage.
///
/// `usage_percent` is `used_bytes / max_bytes * 100`. With a zero quota it is
/// reported as `0.0` instead of the `NaN`/infinity a raw division would give.
/// This matches the convention `update_health_metrics` uses for `disk_usage`.
pub fn stats(&self) -> StorageStats {
    let usage_percent = if self.max_bytes == 0 {
        0.0
    } else {
        (self.used_bytes as f64 / self.max_bytes as f64) * 100.0
    };
    StorageStats {
        used_bytes: self.used_bytes,
        max_bytes: self.max_bytes,
        available_bytes: self.available_bytes(),
        pinned_content_count: self.pinned_content.len(),
        usage_percent,
    }
}
/// Scans every expected chunk of every pinned item and reports problems.
///
/// For each chunk, the first problem found is recorded:
/// - missing chunk file → `missing_chunks` (`"<cid>:<index>"`)
/// - missing sidecar → `metadata_issues` (`"<cid>:<index> - missing metadata"`)
/// - hash mismatch → `corrupted_chunks` (`"<cid>:<index>"`)
/// - any other read error → `metadata_issues` (`"<cid>:<index> - <error>"`)
///
/// Content counts as healthy only when none of its chunks had a problem. This
/// function itself always returns `Ok`.
pub async fn health_check(&self) -> Result<StorageHealthReport, StorageError> {
    let mut corrupted_chunks = Vec::new();
    let mut missing_chunks = Vec::new();
    let mut metadata_issues = Vec::new();
    let mut healthy_content = 0;

    for (cid, info) in &self.pinned_content {
        let mut problems = 0usize;
        for chunk_index in 0..info.chunk_count {
            let location = format!("{}:{}", cid, chunk_index);
            if !self.chunk_path(cid, chunk_index).exists() {
                missing_chunks.push(location);
            } else if !self.chunk_meta_path(cid, chunk_index).exists() {
                metadata_issues.push(format!("{} - missing metadata", location));
            } else {
                match self.get_chunk_verified(cid, chunk_index).await {
                    Ok(_) => continue,
                    Err(StorageError::HashMismatch { .. }) => corrupted_chunks.push(location),
                    Err(other) => metadata_issues.push(format!("{} - {}", location, other)),
                }
            }
            problems += 1;
        }
        if problems == 0 {
            healthy_content += 1;
        }
    }

    Ok(StorageHealthReport {
        total_content: self.pinned_content.len(),
        healthy_content,
        corrupted_chunks,
        missing_chunks,
        metadata_issues,
    })
}
/// Lists the chunks of `cid` that fail [`Self::get_chunk_verified`] for any
/// reason (missing, unreadable, undecryptable or hash mismatch).
///
/// Despite its name, this currently changes nothing on disk. It only reports
/// which chunks would need to be re-fetched. It takes `&mut self` although it
/// only reads.
///
/// # Errors
/// [`StorageError::ContentNotFound`] if `cid` is not pinned.
pub async fn repair(&mut self, cid: &str) -> Result<RepairResult, StorageError> {
    // Only the chunk count is needed, so there is no need to clone the descriptor.
    let chunk_count = self
        .pinned_content
        .get(cid)
        .map(|info| info.chunk_count)
        .ok_or_else(|| StorageError::ContentNotFound {
            cid: cid.to_string(),
        })?;

    let mut chunks_needing_repair = Vec::new();
    for chunk_index in 0..chunk_count {
        if self.get_chunk_verified(cid, chunk_index).await.is_err() {
            chunks_needing_repair.push(chunk_index);
        }
    }

    let status = if chunks_needing_repair.is_empty() {
        RepairStatus::Healthy
    } else {
        RepairStatus::NeedsRepair
    };
    Ok(RepairResult {
        cid: cid.to_string(),
        chunks_needing_repair,
        status,
    })
}
/// Directory holding the `.enc` chunks and `.meta` sidecars of `cid`.
///
/// NOTE(review): `cid` is joined into the path unchecked. A value containing
/// `..` or an absolute path would resolve outside `base_path` — confirm callers
/// only pass validated CIDs.
fn chunk_dir(&self, cid: &str) -> PathBuf {
    let mut dir = self.base_path.join("chunks");
    dir.push(cid);
    dir
}

/// Encrypted chunk file: `chunks/<cid>/<chunk_index>.enc`.
fn chunk_path(&self, cid: &str, chunk_index: u64) -> PathBuf {
    let mut path = self.chunk_dir(cid);
    path.push(format!("{chunk_index}.enc"));
    path
}

/// Chunk metadata sidecar: `chunks/<cid>/<chunk_index>.meta`.
fn chunk_meta_path(&self, cid: &str, chunk_index: u64) -> PathBuf {
    let mut path = self.chunk_dir(cid);
    path.push(format!("{chunk_index}.meta"));
    path
}

/// Content descriptor file: `metadata/<cid>.json`.
fn content_meta_path(&self, cid: &str) -> PathBuf {
    let mut path = self.base_path.join("metadata");
    path.push(format!("{cid}.json"));
    path
}

/// Index file: `index.json` directly under the base path.
fn index_path(&self) -> PathBuf {
    let mut path = self.base_path.clone();
    path.push("index.json");
    path
}
async fn load_index(&mut self) -> Result<(), StorageError> {
let index_path = self.index_path();
if !index_path.exists() {
return Ok(());
}
let data = fs::read(&index_path).await?;
let index: StorageIndex = serde_json::from_slice(&data)
.map_err(|e| StorageError::EncryptionError(e.to_string()))?;
self.used_bytes = index.used_bytes;
for cid in index.pinned_cids {
let meta_path = self.content_meta_path(&cid);
if meta_path.exists() {
let meta_data = fs::read(&meta_path).await?;
let info: PinnedContentInfo = serde_json::from_slice(&meta_data)
.map_err(|e| StorageError::EncryptionError(e.to_string()))?;
self.pinned_content.insert(cid, info);
}
}
Ok(())
}
/// Persists `used_bytes` and the pinned CID list to `index.json`.
///
/// The JSON is written to `index.json.tmp` and then renamed over the index. An
/// interrupted write therefore cannot leave a truncated `index.json` that would
/// make `load_index`, and with it `ChunkStorage::new`, fail. No fsync is
/// performed, so this guards against interrupted writes rather than power loss.
/// CIDs are sorted so the file contents are deterministic.
///
/// # Errors
/// `EncryptionError` if serialization fails; [`StorageError::IoError`] if
/// writing or renaming fails.
async fn save_index(&self) -> Result<(), StorageError> {
    let mut pinned_cids: Vec<String> = self.pinned_content.keys().cloned().collect();
    pinned_cids.sort_unstable();
    let index = StorageIndex {
        used_bytes: self.used_bytes,
        pinned_cids,
    };
    let data = serde_json::to_vec_pretty(&index)
        .map_err(|e| StorageError::EncryptionError(e.to_string()))?;

    let index_path = self.index_path();
    let tmp_path = index_path.with_extension("json.tmp");
    fs::write(&tmp_path, data).await?;
    fs::rename(&tmp_path, &index_path).await?;
    Ok(())
}
/// Recomputes disk usage, growth rate, time-to-full and status. It then
/// records the current usage as the baseline for the next call.
///
/// Growth is measured in bytes per second since the previous call, and only
/// increases count. Status thresholds combine the I/O error count with disk usage.
pub fn update_health_metrics(&mut self) {
    let now = std::time::Instant::now();

    let disk_usage = match self.max_bytes {
        0 => 0.0,
        limit => self.used_bytes as f64 / limit as f64,
    };

    let growth_rate = self.previous_usage.map_or(0.0, |(prev_usage, prev_time)| {
        let elapsed = now.duration_since(prev_time).as_secs_f64();
        if elapsed > 0.0 {
            self.used_bytes.saturating_sub(prev_usage) as f64 / elapsed
        } else {
            0.0
        }
    });

    let time_until_full = (growth_rate > 0.0).then(|| {
        let remaining = self.max_bytes.saturating_sub(self.used_bytes) as f64;
        (remaining / growth_rate) as u64
    });

    let status = match (self.health.io_errors, disk_usage) {
        (errors, usage) if errors > 100 || usage > 0.98 => StorageHealthStatus::Critical,
        (errors, usage) if errors > 50 || usage > 0.95 => StorageHealthStatus::Degraded,
        (errors, usage) if errors > 10 || usage > 0.90 => StorageHealthStatus::Warning,
        _ => StorageHealthStatus::Healthy,
    };

    let health = &mut self.health;
    health.status = status;
    health.disk_usage = disk_usage;
    health.growth_rate = growth_rate;
    health.time_until_full = time_until_full;
    health.last_check = now;
    self.previous_usage = Some((self.used_bytes, now));
}
/// Latest health snapshot.
#[must_use]
#[inline]
pub fn health(&self) -> &StorageHealth {
    &self.health
}

/// Counts one I/O error, then refreshes the derived health metrics.
pub fn record_io_error(&mut self) {
    self.health.io_errors += 1;
    self.update_health_metrics();
}

/// Counts a slow operation taking `latency_ms` milliseconds. Updates the peak
/// latency and an exponential moving average (weight 0.1 on the new sample),
/// then refreshes the derived metrics.
pub fn record_slow_operation(&mut self, latency_ms: u64) {
    const SAMPLE_WEIGHT: f64 = 0.1;
    let health = &mut self.health;
    health.slow_operations += 1;
    health.peak_latency_ms = health.peak_latency_ms.max(latency_ms);
    health.avg_latency_ms =
        SAMPLE_WEIGHT * latency_ms as f64 + (1.0 - SAMPLE_WEIGHT) * health.avg_latency_ms;
    self.update_health_metrics();
}

/// Zeroes the I/O-error and slow-operation counters, then refreshes the metrics.
/// Latency averages and peaks are kept.
pub fn reset_health_counters(&mut self) {
    let health = &mut self.health;
    health.io_errors = 0;
    health.slow_operations = 0;
    self.update_health_metrics();
}

/// True if the status is `Degraded` or `Critical`, or a failure looks imminent.
#[must_use]
#[inline]
pub fn is_health_concerning(&self) -> bool {
    matches!(
        self.health.status,
        StorageHealthStatus::Degraded | StorageHealthStatus::Critical
    ) || self.health.is_failure_imminent()
}
}
/// Point-in-time quota usage, returned by [`ChunkStorage::stats`].
#[derive(Debug, Clone)]
pub struct StorageStats {
/// Bytes charged against the quota.
pub used_bytes: u64,
/// Configured quota in bytes.
pub max_bytes: u64,
/// `max_bytes - used_bytes`, saturating at zero.
pub available_bytes: u64,
/// Number of pinned content items.
pub pinned_content_count: usize,
/// Percentage of the quota in use.
pub usage_percent: f64,
}
/// Result of [`ChunkStorage::health_check`].
#[derive(Debug, Clone)]
pub struct StorageHealthReport {
/// Number of pinned content items scanned.
pub total_content: usize,
/// Items for which every chunk passed all checks.
pub healthy_content: usize,
/// `"<cid>:<index>"` entries whose plaintext hash did not match the sidecar.
pub corrupted_chunks: Vec<String>,
/// `"<cid>:<index>"` entries whose chunk file is absent.
pub missing_chunks: Vec<String>,
/// `"<cid>:<index> - <reason>"` entries for missing sidecars or other read errors.
pub metadata_issues: Vec<String>,
}
/// Result of [`ChunkStorage::repair`].
#[derive(Debug, Clone)]
pub struct RepairResult {
/// Content that was examined.
pub cid: String,
/// Indices of chunks that failed verified reads.
pub chunks_needing_repair: Vec<u64>,
/// Overall outcome.
pub status: RepairStatus,
}
/// Overall outcome of a repair scan.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepairStatus {
/// Every chunk read and verified successfully.
Healthy,
/// At least one chunk failed and needs to be re-fetched.
NeedsRepair,
}
/// On-disk shape of `index.json`, written by `save_index` and read by `load_index`.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct StorageIndex {
/// Quota usage at the time the index was saved.
used_bytes: u64,
/// CIDs whose descriptors are reloaded from `metadata/<cid>.json` on startup.
pinned_cids: Vec<String>,
}
/// Splits `data` into consecutive owned pieces of `chunk_size` bytes. The last
/// piece holds the remainder and may be shorter; empty input yields no pieces.
///
/// # Panics
/// Panics if `chunk_size` is zero (inherited from [`slice::chunks`]).
pub fn split_into_chunks(data: &[u8], chunk_size: usize) -> Vec<Vec<u8>> {
    let mut pieces = Vec::new();
    pieces.extend(data.chunks(chunk_size).map(<[u8]>::to_vec));
    pieces
}
/// Number of `CHUNK_SIZE`-byte chunks needed to hold `size` bytes (ceiling
/// division). Returns `0` for empty content.
///
/// Uses `u64::div_ceil`. The former `(size + chunk_size - 1) / chunk_size`
/// overflows for sizes within `chunk_size - 1` of `u64::MAX`.
#[inline]
pub const fn calculate_chunk_count(size: u64) -> u64 {
    let chunk_size = CHUNK_SIZE as u64;
    size.div_ceil(chunk_size)
}
/// Thread-safe, time-windowed tracker of storage errors, corruption events and
/// I/O latency samples, used to predict storage health.
///
/// Each history drops entries older than `retention_duration` whenever a new
/// entry is recorded into it. Rate and latency queries consider only the last
/// hour. Lock poisoning is tolerated: a panic in another thread while a lock was
/// held does not make the monitor unusable.
pub struct StorageHealthMonitor {
    /// Timestamped error events (count per event).
    error_history: std::sync::Arc<std::sync::Mutex<Vec<(std::time::Instant, u32)>>>,
    /// Timestamped corruption events (count per event).
    corruption_history: std::sync::Arc<std::sync::Mutex<Vec<(std::time::Instant, u32)>>>,
    /// Timestamped I/O latency samples, in microseconds.
    io_latency_history: std::sync::Arc<std::sync::Mutex<Vec<(std::time::Instant, u64)>>>,
    /// Lifetime error count (not pruned by retention).
    total_errors: std::sync::Arc<std::sync::Mutex<u64>>,
    /// Lifetime corruption count (not pruned by retention).
    total_corruptions: std::sync::Arc<std::sync::Mutex<u64>>,
    /// How long history entries are kept.
    retention_duration: std::time::Duration,
}

impl StorageHealthMonitor {
    /// Look-back window for the rate and average-latency queries.
    const RATE_WINDOW: std::time::Duration = std::time::Duration::from_secs(3600);
    /// Number of latency samples needed for full prediction confidence.
    const FULL_CONFIDENCE_SAMPLES: usize = 10;

    /// Creates an empty monitor that keeps history for `retention_duration`.
    #[must_use]
    pub fn new(retention_duration: std::time::Duration) -> Self {
        Self {
            error_history: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
            corruption_history: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
            io_latency_history: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
            total_errors: std::sync::Arc::new(std::sync::Mutex::new(0)),
            total_corruptions: std::sync::Arc::new(std::sync::Mutex::new(0)),
            retention_duration,
        }
    }

    /// Locks `mutex`, recovering the data if a previous holder panicked.
    fn lock<T>(mutex: &std::sync::Mutex<T>) -> std::sync::MutexGuard<'_, T> {
        mutex.lock().unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// `now - window`, or `None` when that instant is not representable. This can
    /// happen shortly after boot on platforms whose `Instant` counts from boot;
    /// plain `Instant - Duration` would panic there. `None` means "no cutoff".
    fn window_start(window: std::time::Duration) -> Option<std::time::Instant> {
        std::time::Instant::now().checked_sub(window)
    }

    /// Whether `at` lies strictly after `cutoff` (always true without a cutoff).
    fn is_after(at: std::time::Instant, cutoff: Option<std::time::Instant>) -> bool {
        match cutoff {
            Some(cutoff) => at > cutoff,
            None => true,
        }
    }

    /// Records one storage error (lifetime total and timestamped history).
    pub fn record_error(&self) {
        *Self::lock(&self.total_errors) += 1;
        let mut history = Self::lock(&self.error_history);
        history.push((std::time::Instant::now(), 1));
        self.cleanup_old_records(&mut history);
    }

    /// Records one corruption event (lifetime total and timestamped history).
    pub fn record_corruption(&self) {
        *Self::lock(&self.total_corruptions) += 1;
        let mut history = Self::lock(&self.corruption_history);
        history.push((std::time::Instant::now(), 1));
        self.cleanup_old_records(&mut history);
    }

    /// Records one I/O latency sample, in microseconds.
    pub fn record_io_latency(&self, latency_us: u64) {
        let mut history = Self::lock(&self.io_latency_history);
        history.push((std::time::Instant::now(), latency_us));
        self.cleanup_old_records(&mut history);
    }

    /// Drops entries older than the retention window (no-op if the window
    /// reaches back before the clock's origin).
    fn cleanup_old_records<T>(&self, history: &mut Vec<(std::time::Instant, T)>) {
        if let Some(cutoff) = Self::window_start(self.retention_duration) {
            history.retain(|(timestamp, _)| *timestamp > cutoff);
        }
    }

    /// Sum of event counts within the last hour (summed as `u64` to avoid overflow).
    fn recent_event_count(history: &[(std::time::Instant, u32)]) -> f64 {
        let cutoff = Self::window_start(Self::RATE_WINDOW);
        let total: u64 = history
            .iter()
            .filter(|(at, _)| Self::is_after(*at, cutoff))
            .map(|&(_, count)| u64::from(count))
            .sum();
        total as f64
    }

    /// Errors recorded in the last hour.
    #[must_use]
    pub fn error_rate(&self) -> f64 {
        Self::recent_event_count(&Self::lock(&self.error_history))
    }

    /// Corruption events recorded in the last hour.
    #[must_use]
    pub fn corruption_rate(&self) -> f64 {
        Self::recent_event_count(&Self::lock(&self.corruption_history))
    }

    /// Mean I/O latency (µs) over samples from the last hour, or `0.0` if none.
    #[must_use]
    pub fn avg_io_latency(&self) -> f64 {
        let history = Self::lock(&self.io_latency_history);
        let cutoff = Self::window_start(Self::RATE_WINDOW);
        let (sum, count) = history
            .iter()
            .filter(|(at, _)| Self::is_after(*at, cutoff))
            .fold((0u128, 0u64), |(sum, count), &(_, latency)| {
                (sum + u128::from(latency), count + 1)
            });
        if count == 0 {
            0.0
        } else {
            sum as f64 / count as f64
        }
    }

    /// Maps observed metrics to a predicted status and a confidence in `[0, 1]`.
    ///
    /// Starts from a score of 100 and subtracts penalties for the error rate,
    /// the corruption rate and the average latency. Confidence grows linearly
    /// with the number of retained latency samples up to `FULL_CONFIDENCE_SAMPLES`.
    fn classify(
        error_rate: f64,
        corruption_rate: f64,
        avg_latency_us: f64,
        latency_samples: usize,
    ) -> (StorageHealthStatus, f64) {
        let error_penalty = if error_rate > 100.0 {
            40.0
        } else if error_rate > 50.0 {
            25.0
        } else if error_rate > 10.0 {
            10.0
        } else {
            0.0
        };
        // Corruption is weighted most heavily.
        let corruption_penalty = if corruption_rate > 10.0 {
            50.0
        } else if corruption_rate > 5.0 {
            30.0
        } else if corruption_rate > 1.0 {
            15.0
        } else {
            0.0
        };
        // Thresholds in microseconds: 50 ms / 20 ms / 10 ms.
        let latency_penalty = if avg_latency_us > 50_000.0 {
            30.0
        } else if avg_latency_us > 20_000.0 {
            15.0
        } else if avg_latency_us > 10_000.0 {
            5.0
        } else {
            0.0
        };
        let score = 100.0 - error_penalty - corruption_penalty - latency_penalty;

        let confidence = if latency_samples < Self::FULL_CONFIDENCE_SAMPLES {
            latency_samples as f64 / Self::FULL_CONFIDENCE_SAMPLES as f64
        } else {
            1.0
        };

        let status = if score >= 80.0 {
            StorageHealthStatus::Healthy
        } else if score >= 60.0 {
            StorageHealthStatus::Warning
        } else if score >= 40.0 {
            StorageHealthStatus::Degraded
        } else {
            StorageHealthStatus::Critical
        };
        (status, confidence)
    }

    /// Whether a prediction is both severe and confident enough to flag a failure.
    fn failure_predicted_from(status: StorageHealthStatus, confidence: f64) -> bool {
        match status {
            StorageHealthStatus::Critical => confidence > 0.7,
            StorageHealthStatus::Degraded => confidence > 0.9,
            StorageHealthStatus::Healthy | StorageHealthStatus::Warning => false,
        }
    }

    /// Predicted status and confidence based on the current history.
    #[must_use]
    pub fn predict_health(&self) -> (StorageHealthStatus, f64) {
        let error_rate = self.error_rate();
        let corruption_rate = self.corruption_rate();
        let avg_latency = self.avg_io_latency();
        let samples = Self::lock(&self.io_latency_history).len();
        Self::classify(error_rate, corruption_rate, avg_latency, samples)
    }

    /// True for a `Critical` prediction with confidence > 0.7, or a `Degraded`
    /// prediction with confidence > 0.9.
    #[must_use]
    pub fn is_failure_predicted(&self) -> bool {
        let (status, confidence) = self.predict_health();
        Self::failure_predicted_from(status, confidence)
    }

    /// Full prediction report. Every derived field comes from a single snapshot
    /// of the metrics, so the fields are mutually consistent.
    #[must_use]
    pub fn health_report(&self) -> StorageHealthPrediction {
        let error_rate = self.error_rate();
        let corruption_rate = self.corruption_rate();
        let avg_latency = self.avg_io_latency();
        let samples = Self::lock(&self.io_latency_history).len();
        let (status, confidence) =
            Self::classify(error_rate, corruption_rate, avg_latency, samples);
        let total_errors = *Self::lock(&self.total_errors);
        let total_corruptions = *Self::lock(&self.total_corruptions);
        StorageHealthPrediction {
            current_status: status,
            confidence,
            error_rate_per_hour: error_rate,
            corruption_rate_per_hour: corruption_rate,
            avg_io_latency_us: avg_latency,
            total_errors,
            total_corruptions,
            failure_predicted: Self::failure_predicted_from(status, confidence),
        }
    }

    /// Clears all histories and lifetime counters.
    pub fn reset(&self) {
        Self::lock(&self.error_history).clear();
        Self::lock(&self.corruption_history).clear();
        Self::lock(&self.io_latency_history).clear();
        *Self::lock(&self.total_errors) = 0;
        *Self::lock(&self.total_corruptions) = 0;
    }
}

/// Output of [`StorageHealthMonitor::health_report`].
#[derive(Debug, Clone)]
pub struct StorageHealthPrediction {
    /// Predicted status.
    pub current_status: StorageHealthStatus,
    /// Confidence in the prediction, in `[0, 1]`.
    pub confidence: f64,
    /// Errors recorded in the last hour.
    pub error_rate_per_hour: f64,
    /// Corruption events recorded in the last hour.
    pub corruption_rate_per_hour: f64,
    /// Mean I/O latency over the last hour, in microseconds.
    pub avg_io_latency_us: f64,
    /// Lifetime error count.
    pub total_errors: u64,
    /// Lifetime corruption count.
    pub total_corruptions: u64,
    /// Whether a failure is predicted for this status/confidence pair.
    pub failure_predicted: bool,
}

impl Default for StorageHealthMonitor {
    /// Monitor with a 24-hour retention window.
    fn default() -> Self {
        Self::new(std::time::Duration::from_secs(24 * 3600))
    }
}
impl ChunkStorage {
    /// Directory holding the chunk and `.meta` files of `cid` (`<base>/chunks/<cid>`).
    #[must_use]
    pub fn get_chunk_dir(&self, cid: &str) -> PathBuf {
        self.chunk_dir(cid)
    }

    /// Encrypts and writes `chunks` for `cid` without registering the content as
    /// pinned. It is presumably meant for a transactional caller that commits or
    /// rolls back — TODO confirm against callers.
    ///
    /// The chunk directory is created if it does not exist yet. `used_bytes` is
    /// increased by each chunk's ciphertext size as it is written; no quota
    /// check is performed. On success, returns one
    /// `(chunk_index, chunk_path, meta_path, ciphertext_len)` tuple per chunk.
    ///
    /// On failure, the quota charged by this call is released. Unless `cid` is
    /// already pinned, the files written by this call are also removed
    /// (best-effort). The caller never receives the list it would need to undo
    /// that work itself.
    ///
    /// # Errors
    /// [`StorageError::EncryptionError`] on cipher or JSON serialization failure;
    /// [`StorageError::IoError`] on filesystem failure.
    pub async fn write_chunks_for_transaction(
        &mut self,
        cid: &str,
        chunks: &[Vec<u8>],
        key: &EncryptionKey,
        nonce: &EncryptionNonce,
    ) -> Result<Vec<(u64, PathBuf, PathBuf, u64)>, StorageError> {
        // `fs::write` does not create parent directories.
        fs::create_dir_all(self.chunk_dir(cid)).await?;
        let was_pinned = self.pinned_content.contains_key(cid);
        let encryptor = StreamEncryptor::new(key, nonce);
        let mut written_chunks = Vec::with_capacity(chunks.len());

        for (chunk_index, chunk) in (0u64..).zip(chunks) {
            let chunk_path = self.chunk_path(cid, chunk_index);
            let meta_path = self.chunk_meta_path(cid, chunk_index);
            let outcome = async {
                let encrypted = encryptor
                    .encrypt_chunk_at(chunk, chunk_index)
                    .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
                fs::write(&chunk_path, &encrypted).await?;
                let metadata = ChunkMetadata {
                    cid: cid.to_string(),
                    chunk_index,
                    plaintext_size: chunk.len(),
                    encrypted_size: encrypted.len(),
                    hash: hash(chunk),
                };
                let meta_json = serde_json::to_vec(&metadata)
                    .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
                fs::write(&meta_path, &meta_json).await?;
                Ok::<u64, StorageError>(encrypted.len() as u64)
            }
            .await;

            match outcome {
                Ok(size_bytes) => {
                    self.used_bytes += size_bytes;
                    written_chunks.push((chunk_index, chunk_path, meta_path, size_bytes));
                }
                Err(err) => {
                    let charged: u64 = written_chunks.iter().map(|(_, _, _, size)| *size).sum();
                    self.used_bytes = self.used_bytes.saturating_sub(charged);
                    if !was_pinned {
                        let leftovers = written_chunks
                            .iter()
                            .flat_map(|(_, c, m, _)| [c, m])
                            .chain([&chunk_path, &meta_path]);
                        for path in leftovers {
                            // Best-effort: keep the original error for the caller.
                            let _ = fs::remove_file(path).await;
                        }
                    }
                    return Err(err);
                }
            }
        }
        Ok(written_chunks)
    }

    /// Releases `bytes` from the quota accounting, saturating at zero.
    /// Presumably used to undo `write_chunks_for_transaction` — TODO confirm.
    pub fn decrease_used_bytes(&mut self, bytes: u64) {
        self.used_bytes = self.used_bytes.saturating_sub(bytes);
    }
}
// Unit tests for ChunkStorage and the free helper functions. Every async test
// uses its own TempDir, so tests never share on-disk state.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// A fresh store starts empty with the whole quota available.
#[tokio::test]
async fn test_chunk_storage_creation() {
let temp_dir = TempDir::new().unwrap();
let storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 1024 * 1024)
.await
.unwrap();
assert_eq!(storage.used_bytes(), 0);
assert_eq!(storage.max_bytes(), 1024 * 1024);
assert_eq!(storage.available_bytes(), 1024 * 1024);
assert_eq!(storage.list_pinned().len(), 0);
}
// Pinned chunks round-trip through encryption and decryption unchanged.
#[tokio::test]
async fn test_pin_and_retrieve_content() {
let temp_dir = TempDir::new().unwrap();
let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
.await
.unwrap();
let cid = "QmTest123";
let test_data = vec![b"Hello, World!".to_vec(), b"Second chunk".to_vec()];
let key = chie_crypto::generate_key();
let nonce = chie_crypto::generate_nonce();
let info = storage
.pin_content(cid, &test_data, &key, &nonce)
.await
.unwrap();
assert_eq!(info.cid, cid);
assert_eq!(info.chunk_count, 2);
assert!(storage.is_pinned(cid));
assert_eq!(storage.list_pinned().len(), 1);
let chunk0 = storage.get_chunk(cid, 0).await.unwrap();
let chunk1 = storage.get_chunk(cid, 1).await.unwrap();
assert_eq!(chunk0, test_data[0]);
assert_eq!(chunk1, test_data[1]);
}
// A verified read returns the plaintext together with its hash.
#[tokio::test]
async fn test_get_chunk_verified() {
let temp_dir = TempDir::new().unwrap();
let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
.await
.unwrap();
let cid = "QmVerified";
let test_data = vec![b"Verified chunk data".to_vec()];
let expected_hash = chie_crypto::hash(&test_data[0]);
let key = chie_crypto::generate_key();
let nonce = chie_crypto::generate_nonce();
storage
.pin_content(cid, &test_data, &key, &nonce)
.await
.unwrap();
let (chunk, hash) = storage.get_chunk_verified(cid, 0).await.unwrap();
assert_eq!(chunk, test_data[0]);
assert_eq!(hash, expected_hash);
}
// Unpinning the only pinned item releases all counted bytes.
#[tokio::test]
async fn test_unpin_content() {
let temp_dir = TempDir::new().unwrap();
let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
.await
.unwrap();
let cid = "QmUnpin";
let test_data = vec![b"Data to unpin".to_vec()];
let key = chie_crypto::generate_key();
let nonce = chie_crypto::generate_nonce();
storage
.pin_content(cid, &test_data, &key, &nonce)
.await
.unwrap();
assert!(storage.is_pinned(cid));
let used_before = storage.used_bytes();
assert!(used_before > 0);
storage.unpin_content(cid).await.unwrap();
assert!(!storage.is_pinned(cid));
assert_eq!(storage.used_bytes(), 0);
}
// Content larger than the quota is rejected with QuotaExceeded.
#[tokio::test]
async fn test_quota_exceeded() {
let temp_dir = TempDir::new().unwrap();
let small_quota = 100; let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), small_quota)
.await
.unwrap();
let cid = "QmTooBig";
let large_data = vec![vec![0u8; 1000]]; let key = chie_crypto::generate_key();
let nonce = chie_crypto::generate_nonce();
let result = storage.pin_content(cid, &large_data, &key, &nonce).await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
StorageError::QuotaExceeded { .. }
));
}
// Reading from a CID that was never pinned reports ContentNotFound.
#[tokio::test]
async fn test_content_not_found() {
let temp_dir = TempDir::new().unwrap();
let storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
.await
.unwrap();
let result = storage.get_chunk("QmNonExistent", 0).await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
StorageError::ContentNotFound { .. }
));
}
// An out-of-range chunk index on pinned content reports ChunkNotFound.
#[tokio::test]
async fn test_chunk_not_found() {
let temp_dir = TempDir::new().unwrap();
let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
.await
.unwrap();
let cid = "QmChunkTest";
let test_data = vec![b"Only one chunk".to_vec()];
let key = chie_crypto::generate_key();
let nonce = chie_crypto::generate_nonce();
storage
.pin_content(cid, &test_data, &key, &nonce)
.await
.unwrap();
let result = storage.get_chunk(cid, 99).await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
StorageError::ChunkNotFound { .. }
));
}
// Stats reflect both the empty store and one with a single pinned item.
#[tokio::test]
async fn test_storage_stats() {
let temp_dir = TempDir::new().unwrap();
let max_bytes = 10 * 1024 * 1024;
let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), max_bytes)
.await
.unwrap();
let stats_empty = storage.stats();
assert_eq!(stats_empty.used_bytes, 0);
assert_eq!(stats_empty.max_bytes, max_bytes);
assert_eq!(stats_empty.available_bytes, max_bytes);
assert_eq!(stats_empty.pinned_content_count, 0);
assert_eq!(stats_empty.usage_percent, 0.0);
let cid = "QmStats";
let test_data = vec![b"Test data for stats".to_vec()];
let key = chie_crypto::generate_key();
let nonce = chie_crypto::generate_nonce();
storage
.pin_content(cid, &test_data, &key, &nonce)
.await
.unwrap();
let stats_used = storage.stats();
assert!(stats_used.used_bytes > 0);
assert_eq!(stats_used.max_bytes, max_bytes);
assert!(stats_used.available_bytes < max_bytes);
assert_eq!(stats_used.pinned_content_count, 1);
assert!(stats_used.usage_percent > 0.0);
}
// Several CIDs can be pinned side by side (they share one key/nonce here).
#[tokio::test]
async fn test_multiple_content_pins() {
let temp_dir = TempDir::new().unwrap();
let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
.await
.unwrap();
let key = chie_crypto::generate_key();
let nonce = chie_crypto::generate_nonce();
for i in 0..5 {
let cid = format!("QmMulti{}", i);
let data = vec![format!("Content {}", i).into_bytes()];
storage
.pin_content(&cid, &data, &key, &nonce)
.await
.unwrap();
}
assert_eq!(storage.list_pinned().len(), 5);
assert!(storage.is_pinned("QmMulti0"));
assert!(storage.is_pinned("QmMulti4"));
assert!(!storage.is_pinned("QmMulti5"));
}
// Pins and usage survive reopening the store from the same directory.
#[tokio::test]
async fn test_persistence() {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().to_path_buf();
let cid = "QmPersist";
let test_data = vec![b"Persistent data".to_vec()];
{
let mut storage = ChunkStorage::new(path.clone(), 10 * 1024 * 1024)
.await
.unwrap();
let key = chie_crypto::generate_key();
let nonce = chie_crypto::generate_nonce();
storage
.pin_content(cid, &test_data, &key, &nonce)
.await
.unwrap();
}
{
let storage = ChunkStorage::new(path, 10 * 1024 * 1024).await.unwrap();
assert!(storage.is_pinned(cid));
assert_eq!(storage.list_pinned().len(), 1);
assert!(storage.used_bytes() > 0);
}
}
// Splitting yields full-size chunks plus a short tail and reassembles losslessly.
#[test]
fn test_split_into_chunks() {
let data = vec![1u8; 100]; let chunk_size = 30;
let chunks = split_into_chunks(&data, chunk_size);
assert_eq!(chunks.len(), 4);
assert_eq!(chunks[0].len(), 30);
assert_eq!(chunks[1].len(), 30);
assert_eq!(chunks[2].len(), 30);
assert_eq!(chunks[3].len(), 10);
let reconstructed: Vec<u8> = chunks.into_iter().flatten().collect();
assert_eq!(reconstructed, data);
}
// The chunk count is a ceiling division by CHUNK_SIZE, and zero for empty input.
#[test]
fn test_calculate_chunk_count() {
assert_eq!(calculate_chunk_count(0), 0);
assert_eq!(calculate_chunk_count(1), 1);
assert_eq!(calculate_chunk_count(CHUNK_SIZE as u64), 1);
assert_eq!(calculate_chunk_count(CHUNK_SIZE as u64 + 1), 2);
assert_eq!(calculate_chunk_count(CHUNK_SIZE as u64 * 3), 3);
assert_eq!(calculate_chunk_count(CHUNK_SIZE as u64 * 3 + 1), 4);
}
// Pinned descriptors expose the key and nonce used at pin time.
#[tokio::test]
async fn test_get_pinned_info() {
let temp_dir = TempDir::new().unwrap();
let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10 * 1024 * 1024)
.await
.unwrap();
let cid = "QmInfo";
let test_data = vec![b"Info test".to_vec()];
let key = chie_crypto::generate_key();
let nonce = chie_crypto::generate_nonce();
storage
.pin_content(cid, &test_data, &key, &nonce)
.await
.unwrap();
let info = storage.get_pinned_info(cid);
assert!(info.is_some());
let info = info.unwrap();
assert_eq!(info.cid, cid);
assert_eq!(info.chunk_count, 1);
assert_eq!(info.encryption_key, key);
assert_eq!(info.base_nonce, nonce);
assert!(storage.get_pinned_info("QmNonExistent").is_none());
}
// Multi-chunk content (10 x 64 KiB) round-trips chunk by chunk.
#[tokio::test]
async fn test_large_content() {
let temp_dir = TempDir::new().unwrap();
let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 100 * 1024 * 1024)
.await
.unwrap();
let cid = "QmLarge";
let chunks: Vec<Vec<u8>> = (0..10).map(|i| vec![i as u8; 64 * 1024]).collect();
let key = chie_crypto::generate_key();
let nonce = chie_crypto::generate_nonce();
let info = storage
.pin_content(cid, &chunks, &key, &nonce)
.await
.unwrap();
assert_eq!(info.chunk_count, 10);
assert_eq!(info.total_size, 64 * 1024 * 10);
for i in 0..10 {
let chunk = storage.get_chunk(cid, i).await.unwrap();
assert_eq!(chunk.len(), 64 * 1024);
assert_eq!(chunk[0], i as u8);
}
}
}