use super::{retention::RetentionPolicies, QoSClass};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredDocument {
pub doc_id: String,
pub qos_class: QoSClass,
pub size_bytes: usize,
pub stored_at: u64,
pub last_accessed: u64,
pub protected: bool,
pub compressed: bool,
}
impl StoredDocument {
pub fn new(doc_id: impl Into<String>, qos_class: QoSClass, size_bytes: usize) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
Self {
doc_id: doc_id.into(),
qos_class,
size_bytes,
stored_at: now,
last_accessed: now,
protected: false,
compressed: false,
}
}
pub fn with_age(mut self, age_seconds: u64) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.stored_at = now.saturating_sub(age_seconds);
self.last_accessed = self.stored_at;
self
}
pub fn age_seconds(&self) -> u64 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
now.saturating_sub(self.stored_at)
}
pub fn idle_seconds(&self) -> u64 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
now.saturating_sub(self.last_accessed)
}
pub fn touch(&mut self) {
self.last_accessed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
}
}
#[derive(Debug, Clone)]
pub struct EvictionCandidate {
pub doc_id: String,
pub qos_class: QoSClass,
pub age_seconds: u64,
pub size_bytes: usize,
pub eviction_score: f64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ClassStorageMetrics {
pub doc_count: usize,
pub total_bytes: usize,
pub avg_age_seconds: u64,
pub oldest_doc_age: u64,
pub protected_count: usize,
pub compressed_count: usize,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StorageMetrics {
pub max_bytes: usize,
pub used_bytes: usize,
pub utilization: f32,
pub by_class: HashMap<QoSClass, ClassStorageMetrics>,
pub recent_evictions: usize,
pub recent_bytes_freed: usize,
}
impl StorageMetrics {
pub fn available_bytes(&self) -> usize {
self.max_bytes.saturating_sub(self.used_bytes)
}
pub fn under_pressure(&self) -> bool {
self.utilization > 0.8
}
pub fn is_critical(&self) -> bool {
self.utilization > 0.95
}
}
#[derive(Debug)]
pub struct QoSAwareStorage {
max_storage_bytes: usize,
current_storage_bytes: AtomicUsize,
retention_policies: RetentionPolicies,
documents: RwLock<HashMap<String, StoredDocument>>,
eviction_threshold: f32,
}
impl QoSAwareStorage {
pub fn new(max_storage_bytes: usize) -> Self {
Self {
max_storage_bytes,
current_storage_bytes: AtomicUsize::new(0),
retention_policies: RetentionPolicies::default_tactical(),
documents: RwLock::new(HashMap::new()),
eviction_threshold: 0.9,
}
}
pub fn with_retention_policies(mut self, policies: RetentionPolicies) -> Self {
self.retention_policies = policies;
self
}
pub fn with_eviction_threshold(mut self, threshold: f32) -> Self {
self.eviction_threshold = threshold.clamp(0.5, 0.99);
self
}
pub fn register_document(&self, doc: StoredDocument) {
let mut docs = self.documents.write().unwrap_or_else(|e| e.into_inner());
if let Some(existing) = docs.get(&doc.doc_id) {
let old_size = existing.size_bytes;
self.current_storage_bytes
.fetch_sub(old_size, Ordering::Relaxed);
}
let size = doc.size_bytes;
docs.insert(doc.doc_id.clone(), doc);
self.current_storage_bytes
.fetch_add(size, Ordering::Relaxed);
}
pub fn unregister_document(&self, doc_id: &str) -> Option<StoredDocument> {
let mut docs = self.documents.write().unwrap_or_else(|e| e.into_inner());
if let Some(doc) = docs.remove(doc_id) {
self.current_storage_bytes
.fetch_sub(doc.size_bytes, Ordering::Relaxed);
Some(doc)
} else {
None
}
}
pub fn touch_document(&self, doc_id: &str) {
let mut docs = self.documents.write().unwrap_or_else(|e| e.into_inner());
if let Some(doc) = docs.get_mut(doc_id) {
doc.touch();
}
}
pub fn mark_protected(&self, doc_id: &str) -> bool {
let mut docs = self.documents.write().unwrap_or_else(|e| e.into_inner());
if let Some(doc) = docs.get_mut(doc_id) {
doc.protected = true;
true
} else {
false
}
}
pub fn unmark_protected(&self, doc_id: &str) -> bool {
let mut docs = self.documents.write().unwrap_or_else(|e| e.into_inner());
if let Some(doc) = docs.get_mut(doc_id) {
doc.protected = false;
true
} else {
false
}
}
pub fn update_compressed(&self, doc_id: &str, new_size: usize) -> Option<usize> {
let mut docs = self.documents.write().unwrap_or_else(|e| e.into_inner());
if let Some(doc) = docs.get_mut(doc_id) {
let old_size = doc.size_bytes;
let diff = old_size.saturating_sub(new_size);
doc.size_bytes = new_size;
doc.compressed = true;
self.current_storage_bytes
.fetch_sub(diff, Ordering::Relaxed);
Some(diff)
} else {
None
}
}
pub fn storage_pressure(&self) -> f32 {
let used = self.current_storage_bytes.load(Ordering::Relaxed);
if self.max_storage_bytes == 0 {
0.0
} else {
used as f32 / self.max_storage_bytes as f32
}
}
pub fn should_evict(&self) -> bool {
self.storage_pressure() >= self.eviction_threshold
}
pub fn get_eviction_candidates(&self, bytes_needed: usize) -> Vec<EvictionCandidate> {
let pressure = self.storage_pressure();
let docs = self.documents.read().unwrap_or_else(|e| e.into_inner());
let mut candidates: Vec<EvictionCandidate> = docs
.values()
.filter(|doc| {
if doc.qos_class == QoSClass::Critical {
return false;
}
if doc.protected {
return false;
}
let policy = self.retention_policies.get(doc.qos_class);
policy.should_evict(doc.age_seconds(), pressure)
})
.map(|doc| {
let score = self.calculate_eviction_score(doc, pressure);
EvictionCandidate {
doc_id: doc.doc_id.clone(),
qos_class: doc.qos_class,
age_seconds: doc.age_seconds(),
size_bytes: doc.size_bytes,
eviction_score: score,
}
})
.collect();
candidates.sort_by(|a, b| {
b.eviction_score
.partial_cmp(&a.eviction_score)
.unwrap_or(std::cmp::Ordering::Equal)
});
let mut total_bytes = 0;
let mut selected = Vec::new();
for candidate in candidates {
selected.push(candidate.clone());
total_bytes += candidate.size_bytes;
if total_bytes >= bytes_needed {
break;
}
}
selected
}
fn calculate_eviction_score(&self, doc: &StoredDocument, pressure: f32) -> f64 {
let policy = self.retention_policies.get(doc.qos_class);
let class_score = (6 - policy.eviction_priority) as f64;
let age_factor = if policy.max_retain_seconds == u64::MAX {
0.0 } else {
(doc.age_seconds() as f64 / policy.max_retain_seconds as f64).min(1.0)
};
let idle_factor = (doc.idle_seconds() as f64 / 3600.0).min(1.0);
let size_factor = (doc.size_bytes as f64 / 1_000_000.0).min(1.0);
let pressure_factor = pressure as f64;
class_score * 10.0 + age_factor * 5.0
+ idle_factor * 3.0
+ size_factor * 1.0
+ pressure_factor * 2.0
}
pub fn get_compression_candidates(&self) -> Vec<String> {
let pressure = self.storage_pressure();
let docs = self.documents.read().unwrap_or_else(|e| e.into_inner());
docs.values()
.filter(|doc| {
doc.qos_class != QoSClass::Critical
&& !doc.protected
&& !doc.compressed
&& self
.retention_policies
.get(doc.qos_class)
.should_compress(pressure)
})
.map(|doc| doc.doc_id.clone())
.collect()
}
pub fn metrics(&self) -> StorageMetrics {
let docs = self.documents.read().unwrap_or_else(|e| e.into_inner());
let used = self.current_storage_bytes.load(Ordering::Relaxed);
let mut by_class: HashMap<QoSClass, ClassStorageMetrics> = HashMap::new();
for doc in docs.values() {
let entry = by_class.entry(doc.qos_class).or_default();
entry.doc_count += 1;
entry.total_bytes += doc.size_bytes;
entry.oldest_doc_age = entry.oldest_doc_age.max(doc.age_seconds());
if doc.protected {
entry.protected_count += 1;
}
if doc.compressed {
entry.compressed_count += 1;
}
}
for (class, metrics) in by_class.iter_mut() {
if metrics.doc_count > 0 {
let total_age: u64 = docs
.values()
.filter(|d| d.qos_class == *class)
.map(|d| d.age_seconds())
.sum();
metrics.avg_age_seconds = total_age / metrics.doc_count as u64;
}
}
StorageMetrics {
max_bytes: self.max_storage_bytes,
used_bytes: used,
utilization: self.storage_pressure(),
by_class,
recent_evictions: 0,
recent_bytes_freed: 0,
}
}
pub fn document_count(&self) -> usize {
self.documents
.read()
.unwrap_or_else(|e| e.into_inner())
.len()
}
pub fn contains(&self, doc_id: &str) -> bool {
self.documents
.read()
.unwrap_or_else(|e| e.into_inner())
.contains_key(doc_id)
}
pub fn get_document(&self, doc_id: &str) -> Option<StoredDocument> {
self.documents
.read()
.unwrap_or_else(|e| e.into_inner())
.get(doc_id)
.cloned()
}
pub fn max_storage_bytes(&self) -> usize {
self.max_storage_bytes
}
pub fn current_storage_bytes(&self) -> usize {
self.current_storage_bytes.load(Ordering::Relaxed)
}
pub fn available_bytes(&self) -> usize {
self.max_storage_bytes
.saturating_sub(self.current_storage_bytes.load(Ordering::Relaxed))
}
}
impl Default for QoSAwareStorage {
fn default() -> Self {
Self::new(1024 * 1024 * 1024)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_stored_document_creation() {
let doc = StoredDocument::new("doc-123", QoSClass::Normal, 1024);
assert_eq!(doc.doc_id, "doc-123");
assert_eq!(doc.qos_class, QoSClass::Normal);
assert_eq!(doc.size_bytes, 1024);
assert!(!doc.protected);
assert!(!doc.compressed);
assert!(doc.stored_at > 0);
}
#[test]
fn test_storage_registration() {
let storage = QoSAwareStorage::new(1_000_000);
let doc1 = StoredDocument::new("doc-1", QoSClass::Normal, 10_000);
let doc2 = StoredDocument::new("doc-2", QoSClass::Low, 20_000);
storage.register_document(doc1);
storage.register_document(doc2);
assert_eq!(storage.document_count(), 2);
assert_eq!(storage.current_storage_bytes(), 30_000);
assert!(storage.contains("doc-1"));
assert!(storage.contains("doc-2"));
}
#[test]
fn test_storage_unregistration() {
let storage = QoSAwareStorage::new(1_000_000);
let doc = StoredDocument::new("doc-1", QoSClass::Normal, 10_000);
storage.register_document(doc);
assert_eq!(storage.current_storage_bytes(), 10_000);
let removed = storage.unregister_document("doc-1");
assert!(removed.is_some());
assert_eq!(storage.current_storage_bytes(), 0);
assert!(!storage.contains("doc-1"));
}
#[test]
fn test_storage_pressure() {
let storage = QoSAwareStorage::new(100_000);
assert_eq!(storage.storage_pressure(), 0.0);
let doc = StoredDocument::new("doc-1", QoSClass::Normal, 50_000);
storage.register_document(doc);
assert!((storage.storage_pressure() - 0.5).abs() < 0.01);
let doc2 = StoredDocument::new("doc-2", QoSClass::Normal, 40_000);
storage.register_document(doc2);
assert!((storage.storage_pressure() - 0.9).abs() < 0.01);
}
#[test]
fn test_should_evict_threshold() {
let storage = QoSAwareStorage::new(100_000).with_eviction_threshold(0.8);
let doc1 = StoredDocument::new("doc-1", QoSClass::Bulk, 70_000);
storage.register_document(doc1);
assert!(!storage.should_evict());
let doc2 = StoredDocument::new("doc-2", QoSClass::Bulk, 15_000);
storage.register_document(doc2);
assert!(storage.should_evict());
}
#[test]
fn test_eviction_candidates_exclude_critical() {
let storage = QoSAwareStorage::new(100_000).with_eviction_threshold(0.5);
storage.register_document(
StoredDocument::new("critical-1", QoSClass::Critical, 10_000).with_age(1000),
);
storage
.register_document(StoredDocument::new("bulk-1", QoSClass::Bulk, 10_000).with_age(400)); storage
.register_document(StoredDocument::new("low-1", QoSClass::Low, 10_000).with_age(4000));
let candidates = storage.get_eviction_candidates(20_000);
assert!(!candidates.iter().any(|c| c.qos_class == QoSClass::Critical));
assert!(candidates.iter().any(|c| c.doc_id == "bulk-1"));
}
#[test]
fn test_eviction_candidates_exclude_protected() {
let storage = QoSAwareStorage::new(100_000).with_eviction_threshold(0.5);
storage
.register_document(StoredDocument::new("doc-1", QoSClass::Bulk, 10_000).with_age(400));
storage
.register_document(StoredDocument::new("doc-2", QoSClass::Bulk, 10_000).with_age(400));
storage.mark_protected("doc-1");
let candidates = storage.get_eviction_candidates(20_000);
assert!(!candidates.iter().any(|c| c.doc_id == "doc-1"));
assert!(candidates.iter().any(|c| c.doc_id == "doc-2"));
}
#[test]
fn test_eviction_priority_order() {
let storage = QoSAwareStorage::new(100_000).with_eviction_threshold(0.3);
storage.register_document(
StoredDocument::new("high-1", QoSClass::High, 5_000).with_age(700000),
); storage.register_document(
StoredDocument::new("normal-1", QoSClass::Normal, 5_000).with_age(100000),
); storage
.register_document(StoredDocument::new("low-1", QoSClass::Low, 5_000).with_age(4000)); storage
.register_document(StoredDocument::new("bulk-1", QoSClass::Bulk, 5_000).with_age(400));
let candidates = storage.get_eviction_candidates(20_000);
assert!(!candidates.is_empty(), "Expected eviction candidates");
if candidates.len() >= 2 {
assert!(candidates[0].eviction_score >= candidates[1].eviction_score);
}
assert_eq!(candidates[0].qos_class, QoSClass::Bulk);
}
#[test]
fn test_compression_candidates() {
let storage = QoSAwareStorage::new(100_000).with_eviction_threshold(0.5);
storage.register_document(StoredDocument::new(
"critical-1",
QoSClass::Critical,
10_000,
));
storage.register_document(StoredDocument::new("bulk-1", QoSClass::Bulk, 70_000));
let candidates = storage.get_compression_candidates();
assert!(!candidates.contains(&"critical-1".to_string()));
assert!(candidates.contains(&"bulk-1".to_string()));
}
#[test]
fn test_update_compressed() {
let storage = QoSAwareStorage::new(100_000);
storage.register_document(StoredDocument::new("doc-1", QoSClass::Normal, 10_000));
assert_eq!(storage.current_storage_bytes(), 10_000);
let saved = storage.update_compressed("doc-1", 6_000);
assert_eq!(saved, Some(4_000));
assert_eq!(storage.current_storage_bytes(), 6_000);
let doc = storage.get_document("doc-1").unwrap();
assert!(doc.compressed);
assert_eq!(doc.size_bytes, 6_000);
}
#[test]
fn test_document_replacement() {
let storage = QoSAwareStorage::new(100_000);
storage.register_document(StoredDocument::new("doc-1", QoSClass::Normal, 10_000));
assert_eq!(storage.current_storage_bytes(), 10_000);
storage.register_document(StoredDocument::new("doc-1", QoSClass::Normal, 15_000));
assert_eq!(storage.current_storage_bytes(), 15_000);
assert_eq!(storage.document_count(), 1);
}
#[test]
fn test_storage_metrics() {
let storage = QoSAwareStorage::new(100_000);
storage.register_document(StoredDocument::new("bulk-1", QoSClass::Bulk, 10_000));
storage.register_document(StoredDocument::new("bulk-2", QoSClass::Bulk, 15_000));
storage.register_document(StoredDocument::new("normal-1", QoSClass::Normal, 20_000));
storage.mark_protected("normal-1");
let metrics = storage.metrics();
assert_eq!(metrics.max_bytes, 100_000);
assert_eq!(metrics.used_bytes, 45_000);
assert!((metrics.utilization - 0.45).abs() < 0.01);
let bulk_metrics = metrics.by_class.get(&QoSClass::Bulk).unwrap();
assert_eq!(bulk_metrics.doc_count, 2);
assert_eq!(bulk_metrics.total_bytes, 25_000);
let normal_metrics = metrics.by_class.get(&QoSClass::Normal).unwrap();
assert_eq!(normal_metrics.protected_count, 1);
}
#[test]
fn test_touch_document() {
let storage = QoSAwareStorage::new(100_000);
let doc = StoredDocument::new("doc-1", QoSClass::Normal, 10_000);
let original_last_accessed = doc.last_accessed;
storage.register_document(doc);
std::thread::sleep(std::time::Duration::from_millis(10));
storage.touch_document("doc-1");
let updated_doc = storage.get_document("doc-1").unwrap();
assert!(updated_doc.last_accessed >= original_last_accessed);
}
#[test]
fn test_available_bytes() {
let storage = QoSAwareStorage::new(100_000);
assert_eq!(storage.available_bytes(), 100_000);
storage.register_document(StoredDocument::new("doc-1", QoSClass::Normal, 30_000));
assert_eq!(storage.available_bytes(), 70_000);
}
#[test]
fn test_storage_metrics_helper_methods() {
let metrics = StorageMetrics {
max_bytes: 100_000,
used_bytes: 85_000,
utilization: 0.85,
by_class: HashMap::new(),
recent_evictions: 0,
recent_bytes_freed: 0,
};
assert_eq!(metrics.available_bytes(), 15_000);
assert!(metrics.under_pressure());
assert!(!metrics.is_critical());
let critical_metrics = StorageMetrics {
utilization: 0.97,
..metrics
};
assert!(critical_metrics.is_critical());
}
}