use std::sync::Arc;
use bytes::Bytes;
use super::error::StorageError;
use super::io::WriteCoalescer;
use super::manager::{DiskManager, TorrentStorage};
use crate::cache::{BlockCache, MemoryBudget, PieceCache};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteResult {
Buffered,
PieceComplete {
valid: bool,
},
}
pub struct CachingDiskManager {
disk: DiskManager,
block_cache: Arc<BlockCache>,
piece_cache: Arc<PieceCache>,
memory_budget: Arc<MemoryBudget>,
coalescer: parking_lot::Mutex<WriteCoalescer>,
}
impl CachingDiskManager {
pub fn new(memory_limit: usize, piece_cache_size: usize) -> Self {
let block_cache_limit = (memory_limit as f64 * 0.7) as usize;
let coalescer_limit = (memory_limit as f64 * 0.1) as usize;
Self {
disk: DiskManager::new(),
block_cache: BlockCache::new(block_cache_limit),
piece_cache: PieceCache::new(piece_cache_size),
memory_budget: MemoryBudget::new(memory_limit),
coalescer: parking_lot::Mutex::new(WriteCoalescer::new(coalescer_limit)),
}
}
pub fn disk(&self) -> &DiskManager {
&self.disk
}
pub fn block_cache(&self) -> &Arc<BlockCache> {
&self.block_cache
}
pub fn piece_cache(&self) -> &Arc<PieceCache> {
&self.piece_cache
}
pub fn memory_budget(&self) -> &Arc<MemoryBudget> {
&self.memory_budget
}
pub fn register(&self, info_hash: String, storage: TorrentStorage) {
self.disk.register(info_hash, storage);
}
pub async fn unregister(&self, info_hash: &str) {
let regions = self.coalescer.lock().flush_torrent(info_hash);
if !regions.is_empty() {
tracing::debug!(
"Flushing {} regions for unregistered torrent",
regions.len()
);
}
self.disk.unregister(info_hash);
}
pub async fn write_block(
&self,
info_hash: &str,
piece_index: u32,
offset: u32,
data: Bytes,
piece_length: u32,
expected_hash: &[u8],
) -> Result<WriteResult, StorageError> {
let hash_version = if expected_hash.len() == 32 { 2 } else { 1 };
let is_complete = self.block_cache.add_block(
info_hash,
piece_index,
offset,
data,
piece_length,
hash_version,
);
if !is_complete {
return Ok(WriteResult::Buffered);
}
let valid = self.block_cache.finalize_and_verify_auto(
info_hash,
piece_index,
expected_hash,
piece_length,
);
if valid {
if let Some(piece_data) = self.block_cache.remove_piece(info_hash, piece_index) {
self.disk
.write_piece(info_hash, piece_index, &piece_data)
.await?;
self.piece_cache
.insert(info_hash, piece_index, piece_data, true);
}
} else {
self.block_cache.remove_piece(info_hash, piece_index);
}
Ok(WriteResult::PieceComplete { valid })
}
pub async fn read_piece(
&self,
info_hash: &str,
piece_index: u32,
) -> Result<Bytes, StorageError> {
if let Some(data) = self.piece_cache.get(info_hash, piece_index) {
return Ok(data);
}
let data = self.disk.read_piece(info_hash, piece_index).await?;
self.piece_cache
.insert(info_hash, piece_index, data.clone(), false);
Ok(data)
}
pub async fn read_block(
&self,
info_hash: &str,
piece_index: u32,
offset: u32,
length: u32,
) -> Result<Bytes, StorageError> {
if let Some(piece_data) = self.piece_cache.get(info_hash, piece_index) {
let start = offset as usize;
let end = start + length as usize;
if end <= piece_data.len() {
return Ok(piece_data.slice(start..end));
}
}
self.disk
.read_block(info_hash, piece_index, offset, length)
.await
}
pub async fn verify_piece(
&self,
info_hash: &str,
piece_index: u32,
) -> Result<bool, StorageError> {
self.disk.verify_piece(info_hash, piece_index).await
}
pub fn memory_stats(&self) -> MemoryStats {
MemoryStats {
block_cache_used: self.block_cache.memory_used(),
block_cache_limit: self.block_cache.memory_limit(),
total_budget: self.memory_budget.total_limit(),
budget_used: self.memory_budget.current_usage(),
}
}
pub async fn flush(&self, info_hash: &str) -> Result<(), StorageError> {
self.disk.flush(info_hash).await
}
}
#[derive(Debug, Clone, Copy)]
pub struct MemoryStats {
pub block_cache_used: usize,
pub block_cache_limit: usize,
pub total_budget: usize,
pub budget_used: usize,
}
impl Default for CachingDiskManager {
fn default() -> Self {
Self::new(256 * 1024 * 1024, 1000)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_memory_stats() {
let manager = CachingDiskManager::new(64 * 1024 * 1024, 100);
let stats = manager.memory_stats();
assert!(stats.total_budget > 0);
assert_eq!(stats.budget_used, 0);
}
}