use super::types::{VersionMismatch, VersionedResult};
use crate::fs::correction::ChunkCorrection;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
/// Aggregate counters describing the corrections tracked by a store.
///
/// The two ratio helpers on this type are derived purely from these fields.
#[derive(Debug, Clone)]
pub struct CorrectionStats {
/// Chunk counter; `VersionedCorrectionStore` adjusts it on insert and
/// removal. Denominator of `perfect_ratio`.
pub total_chunks: u64,
/// Numerator of `perfect_ratio`.
/// NOTE(review): nothing in the visible code updates this counter — confirm
/// where it is maintained.
pub perfect_chunks: u64,
/// NOTE(review): neither read nor written by the visible code — presumably
/// the count of chunks that needed a correction; confirm against callers.
pub corrected_chunks: u64,
/// Numerator of `correction_ratio`.
/// NOTE(review): nothing in the visible code updates this counter.
pub total_correction_bytes: u64,
/// Denominator of `correction_ratio`.
/// NOTE(review): nothing in the visible code updates this counter.
pub total_original_bytes: u64,
}
impl CorrectionStats {
    /// Returns `total_correction_bytes / total_original_bytes` as a float.
    ///
    /// Yields `0.0` when `total_original_bytes` is zero.
    pub fn correction_ratio(&self) -> f64 {
        Self::fraction(self.total_correction_bytes, self.total_original_bytes)
    }

    /// Returns `perfect_chunks / total_chunks` as a float.
    ///
    /// Yields `0.0` when `total_chunks` is zero.
    pub fn perfect_ratio(&self) -> f64 {
        Self::fraction(self.perfect_chunks, self.total_chunks)
    }

    /// Divides two counters as `f64`, mapping a zero denominator to `0.0`
    /// so callers never see NaN or infinity.
    fn fraction(numerator: u64, denominator: u64) -> f64 {
        match denominator {
            0 => 0.0,
            nonzero => numerator as f64 / nonzero as f64,
        }
    }
}
/// Store of per-chunk corrections keyed by `u64` chunk id, paired with
/// aggregate statistics and a version counter.
///
/// Every field is wrapped in an `Arc`; the `Clone` impl in this file clones
/// those `Arc`s, so clones operate on the same underlying state.
pub struct VersionedCorrectionStore {
/// Corrections keyed by chunk id; values are `Arc`s so lookups can hand out
/// shared handles without copying the correction.
corrections: Arc<RwLock<HashMap<u64, Arc<ChunkCorrection>>>>,
/// Aggregate counters; `stats()` returns a copy of them.
stats: Arc<RwLock<CorrectionStats>>,
/// Version counter; starts at 0 and is incremented by the mutating methods.
version: Arc<AtomicU64>,
}
// Locking discipline: mutating methods take the `corrections` write lock
// first and the `stats` write lock second, and only touch `version` while
// holding the `corrections` write lock. Every method that takes a lock panics
// if that lock has been poisoned by a panic in another thread.
impl VersionedCorrectionStore {
    /// Creates an empty store at version 0 with every statistic zeroed.
    pub fn new() -> Self {
        Self {
            corrections: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(RwLock::new(CorrectionStats {
                total_chunks: 0,
                perfect_chunks: 0,
                corrected_chunks: 0,
                total_correction_bytes: 0,
                total_original_bytes: 0,
            })),
            version: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Returns the current value of the version counter.
    pub fn current_version(&self) -> u64 {
        self.version.load(Ordering::Acquire)
    }

    /// Looks up the correction stored for `chunk_id`.
    ///
    /// Returns a shared handle to the correction together with the version
    /// read while the read lock was held, or `None` when the id is absent.
    pub fn get(&self, chunk_id: u64) -> Option<(Arc<ChunkCorrection>, u64)> {
        let corrections = self
            .corrections
            .read()
            .expect("corrections lock poisoned");
        // Writers bump the version only while holding the write lock, so the
        // version read here belongs to the map state being inspected.
        let version = self.current_version();
        corrections
            .get(&chunk_id)
            .map(|corr| (Arc::clone(corr), version))
    }

    /// Inserts or replaces the correction for `chunk_id` if the store is
    /// still at `expected_version`.
    ///
    /// On success the version is incremented and the new version returned.
    /// `total_chunks` grows only when `chunk_id` was not present before.
    ///
    /// # Errors
    ///
    /// Returns `VersionMismatch` (leaving the store untouched) when
    /// `expected_version` differs from the current version.
    pub fn update(
        &self,
        chunk_id: u64,
        correction: ChunkCorrection,
        expected_version: u64,
    ) -> VersionedResult<u64> {
        let mut corrections = self
            .corrections
            .write()
            .expect("corrections lock poisoned");
        let mut stats = self.stats.write().expect("stats lock poisoned");
        self.ensure_version(expected_version)?;
        Self::insert_counted(&mut corrections, &mut stats, chunk_id, correction);
        Ok(self.bump_version())
    }

    /// Applies every `(chunk_id, correction)` pair under a single version
    /// check and a single version increment.
    ///
    /// Later pairs overwrite earlier pairs with the same id. The version is
    /// incremented once even when `updates` is empty.
    ///
    /// # Errors
    ///
    /// Returns `VersionMismatch` (leaving the store untouched) when
    /// `expected_version` differs from the current version.
    pub fn batch_update(
        &self,
        updates: Vec<(u64, ChunkCorrection)>,
        expected_version: u64,
    ) -> VersionedResult<u64> {
        let mut corrections = self
            .corrections
            .write()
            .expect("corrections lock poisoned");
        let mut stats = self.stats.write().expect("stats lock poisoned");
        self.ensure_version(expected_version)?;
        for (chunk_id, correction) in updates {
            Self::insert_counted(&mut corrections, &mut stats, chunk_id, correction);
        }
        Ok(self.bump_version())
    }

    /// Inserts every pair without any version check and increments the
    /// version once.
    ///
    /// `total_chunks` grows only for ids that were not already stored, so an
    /// id repeated inside the batch or already present is replaced without
    /// inflating the count. This method always returns `Ok`.
    pub fn batch_insert_new(&self, updates: Vec<(u64, ChunkCorrection)>) -> VersionedResult<u64> {
        let mut corrections = self
            .corrections
            .write()
            .expect("corrections lock poisoned");
        let mut stats = self.stats.write().expect("stats lock poisoned");
        for (chunk_id, correction) in updates {
            Self::insert_counted(&mut corrections, &mut stats, chunk_id, correction);
        }
        Ok(self.bump_version())
    }

    /// Removes the correction for `chunk_id` if the store is still at
    /// `expected_version`, returning the removed value (if any).
    ///
    /// The version is incremented whenever the version check passes, even if
    /// no entry existed for `chunk_id`.
    ///
    /// # Errors
    ///
    /// Returns `VersionMismatch` (leaving the store untouched) when
    /// `expected_version` differs from the current version.
    pub fn remove(
        &self,
        chunk_id: u64,
        expected_version: u64,
    ) -> VersionedResult<Option<Arc<ChunkCorrection>>> {
        let mut corrections = self
            .corrections
            .write()
            .expect("corrections lock poisoned");
        let mut stats = self.stats.write().expect("stats lock poisoned");
        self.ensure_version(expected_version)?;
        let removed = corrections.remove(&chunk_id);
        if removed.is_some() {
            stats.total_chunks = stats.total_chunks.saturating_sub(1);
        }
        self.bump_version();
        Ok(removed)
    }

    /// Returns the number of stored corrections.
    pub fn len(&self) -> usize {
        self.corrections
            .read()
            .expect("corrections lock poisoned")
            .len()
    }

    /// Returns `true` when no corrections are stored.
    pub fn is_empty(&self) -> bool {
        self.corrections
            .read()
            .expect("corrections lock poisoned")
            .is_empty()
    }

    /// Returns a copy of the current statistics.
    pub fn stats(&self) -> CorrectionStats {
        self.stats.read().expect("stats lock poisoned").clone()
    }

    /// Returns the ids of all stored corrections, in the map's iteration
    /// order (unspecified for `HashMap`).
    pub fn chunk_ids(&self) -> Vec<u64> {
        self.corrections
            .read()
            .expect("corrections lock poisoned")
            .keys()
            .copied()
            .collect()
    }

    /// Compares the current version against `expected_version`.
    ///
    /// Must be called while the `corrections` write lock is held so the
    /// version cannot change between the check and the mutation.
    fn ensure_version(&self, expected_version: u64) -> VersionedResult<()> {
        let actual = self.current_version();
        if actual == expected_version {
            Ok(())
        } else {
            Err(VersionMismatch {
                expected: expected_version,
                actual,
            })
        }
    }

    /// Increments the version counter and returns the new value.
    fn bump_version(&self) -> u64 {
        self.version.fetch_add(1, Ordering::AcqRel) + 1
    }

    /// Stores `correction` under `chunk_id` and counts the chunk only when
    /// the id was not present before (single hash lookup via `insert`).
    fn insert_counted(
        corrections: &mut HashMap<u64, Arc<ChunkCorrection>>,
        stats: &mut CorrectionStats,
        chunk_id: u64,
        correction: ChunkCorrection,
    ) {
        if corrections.insert(chunk_id, Arc::new(correction)).is_none() {
            stats.total_chunks += 1;
        }
    }
}
impl Default for VersionedCorrectionStore {
fn default() -> Self {
Self::new()
}
}
impl Clone for VersionedCorrectionStore {
    /// Returns another handle to the same store.
    ///
    /// Only the `Arc` pointers are cloned, so the clone and the original
    /// share one corrections map, one stats record and one version counter.
    fn clone(&self) -> Self {
        let Self {
            corrections,
            stats,
            version,
        } = self;
        Self {
            corrections: Arc::clone(corrections),
            stats: Arc::clone(stats),
            version: Arc::clone(version),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created store starts at version 0 and holds no corrections.
    #[test]
    fn test_correction_store_creation() {
        let store = VersionedCorrectionStore::new();
        assert_eq!(store.current_version(), 0);
        assert!(store.is_empty());
    }

    /// With all counters at zero both ratios fall back to 0.0 instead of
    /// dividing by zero.
    #[test]
    fn test_stats() {
        let store = VersionedCorrectionStore::new();
        let stats = store.stats();
        assert_eq!(stats.total_chunks, 0);
        assert_eq!(stats.correction_ratio(), 0.0);
        assert_eq!(stats.perfect_ratio(), 0.0);
    }

    /// Non-zero counters yield plain numerator / denominator ratios.
    #[test]
    fn test_stats_ratios_with_data() {
        let stats = CorrectionStats {
            total_chunks: 4,
            perfect_chunks: 1,
            corrected_chunks: 3,
            total_correction_bytes: 50,
            total_original_bytes: 200,
        };
        assert_eq!(stats.perfect_ratio(), 0.25);
        assert_eq!(stats.correction_ratio(), 0.25);
    }

    /// A mutation carrying a stale expected version is rejected and leaves
    /// both the version and the contents untouched.
    #[test]
    fn test_stale_version_is_rejected() {
        let store = VersionedCorrectionStore::new();
        assert!(store.remove(1, 5).is_err());
        assert_eq!(store.current_version(), 0);
        assert!(store.is_empty());
    }
}