1use super::{retention::RetentionPolicies, QoSClass};
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::sync::RwLock;
19
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Bookkeeping record for one document tracked by the storage layer.
///
/// The record carries metadata only; it holds no document payload.
/// Timestamps are whole seconds since the UNIX epoch, as produced by
/// `StoredDocument::new`.
pub struct StoredDocument {
    /// Identifier of the document; also used as the key in the storage map.
    pub doc_id: String,

    /// QoS class the document belongs to.
    pub qos_class: QoSClass,

    /// Size accounted for this document, in bytes.
    pub size_bytes: usize,

    /// Time the document was stored (seconds since the UNIX epoch).
    pub stored_at: u64,

    /// Time of the most recent access (seconds since the UNIX epoch).
    pub last_accessed: u64,

    /// When `true`, the document is skipped by eviction and compression
    /// candidate selection.
    pub protected: bool,

    /// Whether the document has been recorded as compressed.
    pub compressed: bool,
}
44
45impl StoredDocument {
46 pub fn new(doc_id: impl Into<String>, qos_class: QoSClass, size_bytes: usize) -> Self {
48 let now = std::time::SystemTime::now()
49 .duration_since(std::time::UNIX_EPOCH)
50 .unwrap_or_default()
51 .as_secs();
52
53 Self {
54 doc_id: doc_id.into(),
55 qos_class,
56 size_bytes,
57 stored_at: now,
58 last_accessed: now,
59 protected: false,
60 compressed: false,
61 }
62 }
63
64 pub fn with_age(mut self, age_seconds: u64) -> Self {
66 let now = std::time::SystemTime::now()
67 .duration_since(std::time::UNIX_EPOCH)
68 .unwrap_or_default()
69 .as_secs();
70 self.stored_at = now.saturating_sub(age_seconds);
71 self.last_accessed = self.stored_at;
72 self
73 }
74
75 pub fn age_seconds(&self) -> u64 {
77 let now = std::time::SystemTime::now()
78 .duration_since(std::time::UNIX_EPOCH)
79 .unwrap_or_default()
80 .as_secs();
81 now.saturating_sub(self.stored_at)
82 }
83
84 pub fn idle_seconds(&self) -> u64 {
86 let now = std::time::SystemTime::now()
87 .duration_since(std::time::UNIX_EPOCH)
88 .unwrap_or_default()
89 .as_secs();
90 now.saturating_sub(self.last_accessed)
91 }
92
93 pub fn touch(&mut self) {
95 self.last_accessed = std::time::SystemTime::now()
96 .duration_since(std::time::UNIX_EPOCH)
97 .unwrap_or_default()
98 .as_secs();
99 }
100}
101
#[derive(Debug, Clone)]
/// A document proposed for eviction, together with the score used to rank it.
pub struct EvictionCandidate {
    /// Identifier of the document proposed for eviction.
    pub doc_id: String,

    /// QoS class of the document.
    pub qos_class: QoSClass,

    /// Age of the document, in seconds, at the time the candidate was built.
    pub age_seconds: u64,

    /// Bytes accounted for the document.
    pub size_bytes: usize,

    /// Ranking score; candidates with higher scores are listed first.
    pub eviction_score: f64,
}
120
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
/// Aggregated storage statistics for the documents of a single QoS class.
pub struct ClassStorageMetrics {
    /// Number of documents in the class.
    pub doc_count: usize,

    /// Sum of `size_bytes` over the documents in the class.
    pub total_bytes: usize,

    /// Mean document age in seconds (integer division of total age by count).
    pub avg_age_seconds: u64,

    /// Age in seconds of the oldest document in the class.
    pub oldest_doc_age: u64,

    /// Number of documents flagged as protected.
    pub protected_count: usize,

    /// Number of documents flagged as compressed.
    pub compressed_count: usize,
}
142
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
/// Snapshot of overall storage usage plus a per-class breakdown.
pub struct StorageMetrics {
    /// Configured storage capacity in bytes.
    pub max_bytes: usize,

    /// Bytes currently accounted as used.
    pub used_bytes: usize,

    /// Used fraction of the capacity (`used / max`); compared against 0.8 and
    /// 0.95 by `under_pressure` and `is_critical`.
    pub utilization: f32,

    /// Per-class statistics, keyed by QoS class.
    pub by_class: HashMap<QoSClass, ClassStorageMetrics>,

    /// Eviction counter. `QoSAwareStorage::metrics` in this file always
    /// reports 0 here.
    /// NOTE(review): presumably meant to be filled in by an eviction
    /// controller elsewhere — confirm.
    pub recent_evictions: usize,

    /// Bytes-freed counter. `QoSAwareStorage::metrics` in this file always
    /// reports 0 here.
    /// NOTE(review): presumably paired with `recent_evictions` — confirm.
    pub recent_bytes_freed: usize,
}
164
165impl StorageMetrics {
166 pub fn available_bytes(&self) -> usize {
168 self.max_bytes.saturating_sub(self.used_bytes)
169 }
170
171 pub fn under_pressure(&self) -> bool {
173 self.utilization > 0.8
174 }
175
176 pub fn is_critical(&self) -> bool {
178 self.utilization > 0.95
179 }
180}
181
#[derive(Debug)]
/// Tracks registered documents and their byte usage against a fixed capacity,
/// and selects eviction and compression candidates using per-class retention
/// policies.
///
/// Only `StoredDocument` metadata records are held here.
pub struct QoSAwareStorage {
    /// Storage capacity in bytes.
    max_storage_bytes: usize,

    /// Running total of `size_bytes` over all registered documents.
    current_storage_bytes: AtomicUsize,

    /// Per-class retention policies consulted for eviction and compression.
    retention_policies: RetentionPolicies,

    /// Registered documents, keyed by `doc_id`.
    documents: RwLock<HashMap<String, StoredDocument>>,

    /// Storage pressure at or above which `should_evict` returns `true`.
    /// Defaults to 0.9; the builder clamps it to `[0.5, 0.99]`.
    eviction_threshold: f32,
}
203
204impl QoSAwareStorage {
205 pub fn new(max_storage_bytes: usize) -> Self {
210 Self {
211 max_storage_bytes,
212 current_storage_bytes: AtomicUsize::new(0),
213 retention_policies: RetentionPolicies::default_tactical(),
214 documents: RwLock::new(HashMap::new()),
215 eviction_threshold: 0.9,
216 }
217 }
218
219 pub fn with_retention_policies(mut self, policies: RetentionPolicies) -> Self {
221 self.retention_policies = policies;
222 self
223 }
224
225 pub fn with_eviction_threshold(mut self, threshold: f32) -> Self {
227 self.eviction_threshold = threshold.clamp(0.5, 0.99);
228 self
229 }
230
231 pub fn register_document(&self, doc: StoredDocument) {
233 let mut docs = self.documents.write().unwrap_or_else(|e| e.into_inner());
234
235 if let Some(existing) = docs.get(&doc.doc_id) {
237 let old_size = existing.size_bytes;
238 self.current_storage_bytes
239 .fetch_sub(old_size, Ordering::Relaxed);
240 }
241
242 let size = doc.size_bytes;
243 docs.insert(doc.doc_id.clone(), doc);
244 self.current_storage_bytes
245 .fetch_add(size, Ordering::Relaxed);
246 }
247
248 pub fn unregister_document(&self, doc_id: &str) -> Option<StoredDocument> {
250 let mut docs = self.documents.write().unwrap_or_else(|e| e.into_inner());
251 if let Some(doc) = docs.remove(doc_id) {
252 self.current_storage_bytes
253 .fetch_sub(doc.size_bytes, Ordering::Relaxed);
254 Some(doc)
255 } else {
256 None
257 }
258 }
259
260 pub fn touch_document(&self, doc_id: &str) {
262 let mut docs = self.documents.write().unwrap_or_else(|e| e.into_inner());
263 if let Some(doc) = docs.get_mut(doc_id) {
264 doc.touch();
265 }
266 }
267
268 pub fn mark_protected(&self, doc_id: &str) -> bool {
270 let mut docs = self.documents.write().unwrap_or_else(|e| e.into_inner());
271 if let Some(doc) = docs.get_mut(doc_id) {
272 doc.protected = true;
273 true
274 } else {
275 false
276 }
277 }
278
279 pub fn unmark_protected(&self, doc_id: &str) -> bool {
281 let mut docs = self.documents.write().unwrap_or_else(|e| e.into_inner());
282 if let Some(doc) = docs.get_mut(doc_id) {
283 doc.protected = false;
284 true
285 } else {
286 false
287 }
288 }
289
290 pub fn update_compressed(&self, doc_id: &str, new_size: usize) -> Option<usize> {
292 let mut docs = self.documents.write().unwrap_or_else(|e| e.into_inner());
293 if let Some(doc) = docs.get_mut(doc_id) {
294 let old_size = doc.size_bytes;
295 let diff = old_size.saturating_sub(new_size);
296
297 doc.size_bytes = new_size;
298 doc.compressed = true;
299 self.current_storage_bytes
300 .fetch_sub(diff, Ordering::Relaxed);
301
302 Some(diff)
303 } else {
304 None
305 }
306 }
307
308 pub fn storage_pressure(&self) -> f32 {
310 let used = self.current_storage_bytes.load(Ordering::Relaxed);
311 if self.max_storage_bytes == 0 {
312 0.0
313 } else {
314 used as f32 / self.max_storage_bytes as f32
315 }
316 }
317
318 pub fn should_evict(&self) -> bool {
320 self.storage_pressure() >= self.eviction_threshold
321 }
322
323 pub fn get_eviction_candidates(&self, bytes_needed: usize) -> Vec<EvictionCandidate> {
328 let pressure = self.storage_pressure();
329 let docs = self.documents.read().unwrap_or_else(|e| e.into_inner());
330
331 let mut candidates: Vec<EvictionCandidate> = docs
332 .values()
333 .filter(|doc| {
334 if doc.qos_class == QoSClass::Critical {
336 return false;
337 }
338 if doc.protected {
340 return false;
341 }
342 let policy = self.retention_policies.get(doc.qos_class);
344 policy.should_evict(doc.age_seconds(), pressure)
345 })
346 .map(|doc| {
347 let score = self.calculate_eviction_score(doc, pressure);
348 EvictionCandidate {
349 doc_id: doc.doc_id.clone(),
350 qos_class: doc.qos_class,
351 age_seconds: doc.age_seconds(),
352 size_bytes: doc.size_bytes,
353 eviction_score: score,
354 }
355 })
356 .collect();
357
358 candidates.sort_by(|a, b| {
360 b.eviction_score
361 .partial_cmp(&a.eviction_score)
362 .unwrap_or(std::cmp::Ordering::Equal)
363 });
364
365 let mut total_bytes = 0;
367 let mut selected = Vec::new();
368
369 for candidate in candidates {
370 selected.push(candidate.clone());
371 total_bytes += candidate.size_bytes;
372 if total_bytes >= bytes_needed {
373 break;
374 }
375 }
376
377 selected
378 }
379
380 fn calculate_eviction_score(&self, doc: &StoredDocument, pressure: f32) -> f64 {
388 let policy = self.retention_policies.get(doc.qos_class);
389
390 let class_score = (6 - policy.eviction_priority) as f64;
393
394 let age_factor = if policy.max_retain_seconds == u64::MAX {
396 0.0 } else {
398 (doc.age_seconds() as f64 / policy.max_retain_seconds as f64).min(1.0)
399 };
400
401 let idle_factor = (doc.idle_seconds() as f64 / 3600.0).min(1.0); let size_factor = (doc.size_bytes as f64 / 1_000_000.0).min(1.0); let pressure_factor = pressure as f64;
409
410 class_score * 10.0 + age_factor * 5.0
413 + idle_factor * 3.0
414 + size_factor * 1.0
415 + pressure_factor * 2.0
416 }
417
418 pub fn get_compression_candidates(&self) -> Vec<String> {
420 let pressure = self.storage_pressure();
421 let docs = self.documents.read().unwrap_or_else(|e| e.into_inner());
422
423 docs.values()
424 .filter(|doc| {
425 doc.qos_class != QoSClass::Critical
427 && !doc.protected
428 && !doc.compressed
429 && self
430 .retention_policies
431 .get(doc.qos_class)
432 .should_compress(pressure)
433 })
434 .map(|doc| doc.doc_id.clone())
435 .collect()
436 }
437
438 pub fn metrics(&self) -> StorageMetrics {
440 let docs = self.documents.read().unwrap_or_else(|e| e.into_inner());
441 let used = self.current_storage_bytes.load(Ordering::Relaxed);
442
443 let mut by_class: HashMap<QoSClass, ClassStorageMetrics> = HashMap::new();
444
445 for doc in docs.values() {
446 let entry = by_class.entry(doc.qos_class).or_default();
447 entry.doc_count += 1;
448 entry.total_bytes += doc.size_bytes;
449 entry.oldest_doc_age = entry.oldest_doc_age.max(doc.age_seconds());
450 if doc.protected {
451 entry.protected_count += 1;
452 }
453 if doc.compressed {
454 entry.compressed_count += 1;
455 }
456 }
457
458 for (class, metrics) in by_class.iter_mut() {
460 if metrics.doc_count > 0 {
461 let total_age: u64 = docs
462 .values()
463 .filter(|d| d.qos_class == *class)
464 .map(|d| d.age_seconds())
465 .sum();
466 metrics.avg_age_seconds = total_age / metrics.doc_count as u64;
467 }
468 }
469
470 StorageMetrics {
471 max_bytes: self.max_storage_bytes,
472 used_bytes: used,
473 utilization: self.storage_pressure(),
474 by_class,
475 recent_evictions: 0,
476 recent_bytes_freed: 0,
477 }
478 }
479
480 pub fn document_count(&self) -> usize {
482 self.documents
483 .read()
484 .unwrap_or_else(|e| e.into_inner())
485 .len()
486 }
487
488 pub fn contains(&self, doc_id: &str) -> bool {
490 self.documents
491 .read()
492 .unwrap_or_else(|e| e.into_inner())
493 .contains_key(doc_id)
494 }
495
496 pub fn get_document(&self, doc_id: &str) -> Option<StoredDocument> {
498 self.documents
499 .read()
500 .unwrap_or_else(|e| e.into_inner())
501 .get(doc_id)
502 .cloned()
503 }
504
505 pub fn max_storage_bytes(&self) -> usize {
507 self.max_storage_bytes
508 }
509
510 pub fn current_storage_bytes(&self) -> usize {
512 self.current_storage_bytes.load(Ordering::Relaxed)
513 }
514
515 pub fn available_bytes(&self) -> usize {
517 self.max_storage_bytes
518 .saturating_sub(self.current_storage_bytes.load(Ordering::Relaxed))
519 }
520}
521
522impl Default for QoSAwareStorage {
523 fn default() -> Self {
524 Self::new(1024 * 1024 * 1024)
526 }
527}
528
#[cfg(test)]
mod tests {
    //! Unit tests for document bookkeeping, byte accounting, candidate
    //! selection and metrics.

    use super::*;

    /// A new record carries the given fields and starts unflagged.
    #[test]
    fn test_stored_document_creation() {
        let doc = StoredDocument::new("doc-123", QoSClass::Normal, 1024);

        assert_eq!(doc.doc_id, "doc-123");
        assert_eq!(doc.qos_class, QoSClass::Normal);
        assert_eq!(doc.size_bytes, 1024);
        assert!(!doc.protected);
        assert!(!doc.compressed);
        assert!(doc.stored_at > 0);
    }

    /// Registering documents updates both the count and the byte total.
    #[test]
    fn test_storage_registration() {
        let storage = QoSAwareStorage::new(1_000_000);

        let doc1 = StoredDocument::new("doc-1", QoSClass::Normal, 10_000);
        let doc2 = StoredDocument::new("doc-2", QoSClass::Low, 20_000);

        storage.register_document(doc1);
        storage.register_document(doc2);

        assert_eq!(storage.document_count(), 2);
        assert_eq!(storage.current_storage_bytes(), 30_000);
        assert!(storage.contains("doc-1"));
        assert!(storage.contains("doc-2"));
    }

    /// Unregistering returns the record and releases its bytes.
    #[test]
    fn test_storage_unregistration() {
        let storage = QoSAwareStorage::new(1_000_000);

        let doc = StoredDocument::new("doc-1", QoSClass::Normal, 10_000);
        storage.register_document(doc);

        assert_eq!(storage.current_storage_bytes(), 10_000);

        let removed = storage.unregister_document("doc-1");
        assert!(removed.is_some());
        assert_eq!(storage.current_storage_bytes(), 0);
        assert!(!storage.contains("doc-1"));
    }

    /// Pressure is the used fraction of the capacity.
    #[test]
    fn test_storage_pressure() {
        let storage = QoSAwareStorage::new(100_000);
        assert_eq!(storage.storage_pressure(), 0.0);

        let doc = StoredDocument::new("doc-1", QoSClass::Normal, 50_000);
        storage.register_document(doc);
        assert!((storage.storage_pressure() - 0.5).abs() < 0.01);

        let doc2 = StoredDocument::new("doc-2", QoSClass::Normal, 40_000);
        storage.register_document(doc2);
        assert!((storage.storage_pressure() - 0.9).abs() < 0.01);
    }

    /// `should_evict` flips once pressure reaches the configured threshold.
    #[test]
    fn test_should_evict_threshold() {
        let storage = QoSAwareStorage::new(100_000).with_eviction_threshold(0.8);

        let doc1 = StoredDocument::new("doc-1", QoSClass::Bulk, 70_000);
        storage.register_document(doc1);
        assert!(!storage.should_evict());

        let doc2 = StoredDocument::new("doc-2", QoSClass::Bulk, 15_000);
        storage.register_document(doc2);
        assert!(storage.should_evict());
    }

    /// Critical documents never show up as eviction candidates.
    #[test]
    fn test_eviction_candidates_exclude_critical() {
        let storage = QoSAwareStorage::new(100_000).with_eviction_threshold(0.5);

        storage.register_document(
            StoredDocument::new("critical-1", QoSClass::Critical, 10_000).with_age(1000),
        );
        storage
            .register_document(StoredDocument::new("bulk-1", QoSClass::Bulk, 10_000).with_age(400));
        storage
            .register_document(StoredDocument::new("low-1", QoSClass::Low, 10_000).with_age(4000));

        let candidates = storage.get_eviction_candidates(20_000);

        assert!(!candidates.iter().any(|c| c.qos_class == QoSClass::Critical));

        assert!(candidates.iter().any(|c| c.doc_id == "bulk-1"));
    }

    /// Protected documents never show up as eviction candidates.
    #[test]
    fn test_eviction_candidates_exclude_protected() {
        let storage = QoSAwareStorage::new(100_000).with_eviction_threshold(0.5);

        storage
            .register_document(StoredDocument::new("doc-1", QoSClass::Bulk, 10_000).with_age(400));
        storage
            .register_document(StoredDocument::new("doc-2", QoSClass::Bulk, 10_000).with_age(400));

        storage.mark_protected("doc-1");

        let candidates = storage.get_eviction_candidates(20_000);

        assert!(!candidates.iter().any(|c| c.doc_id == "doc-1"));
        assert!(candidates.iter().any(|c| c.doc_id == "doc-2"));
    }

    /// Candidates come back ordered by descending score, bulk data first.
    #[test]
    fn test_eviction_priority_order() {
        let storage = QoSAwareStorage::new(100_000).with_eviction_threshold(0.3);

        storage.register_document(
            StoredDocument::new("high-1", QoSClass::High, 5_000).with_age(700000),
        );
        storage.register_document(
            StoredDocument::new("normal-1", QoSClass::Normal, 5_000).with_age(100000),
        );
        storage
            .register_document(StoredDocument::new("low-1", QoSClass::Low, 5_000).with_age(4000));
        storage
            .register_document(StoredDocument::new("bulk-1", QoSClass::Bulk, 5_000).with_age(400));

        let candidates = storage.get_eviction_candidates(20_000);

        assert!(!candidates.is_empty(), "Expected eviction candidates");

        // Every adjacent pair must be ordered, not only the first two entries.
        assert!(candidates
            .windows(2)
            .all(|pair| pair[0].eviction_score >= pair[1].eviction_score));

        assert_eq!(candidates[0].qos_class, QoSClass::Bulk);
    }

    /// Compression candidates skip critical data but include bulk data.
    #[test]
    fn test_compression_candidates() {
        let storage = QoSAwareStorage::new(100_000).with_eviction_threshold(0.5);

        storage.register_document(StoredDocument::new(
            "critical-1",
            QoSClass::Critical,
            10_000,
        ));
        storage.register_document(StoredDocument::new("bulk-1", QoSClass::Bulk, 70_000));

        let candidates = storage.get_compression_candidates();

        assert!(!candidates.contains(&"critical-1".to_string()));
        assert!(candidates.contains(&"bulk-1".to_string()));
    }

    /// Recording a compression shrinks the byte total and flags the document.
    #[test]
    fn test_update_compressed() {
        let storage = QoSAwareStorage::new(100_000);

        storage.register_document(StoredDocument::new("doc-1", QoSClass::Normal, 10_000));
        assert_eq!(storage.current_storage_bytes(), 10_000);

        let saved = storage.update_compressed("doc-1", 6_000);
        assert_eq!(saved, Some(4_000));
        assert_eq!(storage.current_storage_bytes(), 6_000);

        let doc = storage.get_document("doc-1").unwrap();
        assert!(doc.compressed);
        assert_eq!(doc.size_bytes, 6_000);
    }

    /// Re-registering an id replaces the record and its byte accounting.
    #[test]
    fn test_document_replacement() {
        let storage = QoSAwareStorage::new(100_000);

        storage.register_document(StoredDocument::new("doc-1", QoSClass::Normal, 10_000));
        assert_eq!(storage.current_storage_bytes(), 10_000);

        storage.register_document(StoredDocument::new("doc-1", QoSClass::Normal, 15_000));
        assert_eq!(storage.current_storage_bytes(), 15_000);
        assert_eq!(storage.document_count(), 1);
    }

    /// The metrics snapshot reports totals and per-class aggregates.
    #[test]
    fn test_storage_metrics() {
        let storage = QoSAwareStorage::new(100_000);

        storage.register_document(StoredDocument::new("bulk-1", QoSClass::Bulk, 10_000));
        storage.register_document(StoredDocument::new("bulk-2", QoSClass::Bulk, 15_000));
        storage.register_document(StoredDocument::new("normal-1", QoSClass::Normal, 20_000));
        storage.mark_protected("normal-1");

        let metrics = storage.metrics();

        assert_eq!(metrics.max_bytes, 100_000);
        assert_eq!(metrics.used_bytes, 45_000);
        assert!((metrics.utilization - 0.45).abs() < 0.01);

        let bulk_metrics = metrics.by_class.get(&QoSClass::Bulk).unwrap();
        assert_eq!(bulk_metrics.doc_count, 2);
        assert_eq!(bulk_metrics.total_bytes, 25_000);

        let normal_metrics = metrics.by_class.get(&QoSClass::Normal).unwrap();
        assert_eq!(normal_metrics.protected_count, 1);
    }

    /// Touching a document moves `last_accessed` forward and leaves
    /// `stored_at` alone.
    #[test]
    fn test_touch_document() {
        let storage = QoSAwareStorage::new(100_000);

        // Backdate the record so a successful touch is observable at one-second
        // timestamp resolution without sleeping.
        let doc = StoredDocument::new("doc-1", QoSClass::Normal, 10_000).with_age(3_600);
        let original_last_accessed = doc.last_accessed;
        let original_stored_at = doc.stored_at;
        storage.register_document(doc);

        storage.touch_document("doc-1");

        let updated_doc = storage.get_document("doc-1").unwrap();
        assert!(updated_doc.last_accessed > original_last_accessed);
        assert_eq!(updated_doc.stored_at, original_stored_at);
    }

    /// Free space is the capacity minus the bytes in use.
    #[test]
    fn test_available_bytes() {
        let storage = QoSAwareStorage::new(100_000);

        assert_eq!(storage.available_bytes(), 100_000);

        storage.register_document(StoredDocument::new("doc-1", QoSClass::Normal, 30_000));
        assert_eq!(storage.available_bytes(), 70_000);
    }

    /// The `StorageMetrics` helpers apply the 80% and 95% thresholds.
    #[test]
    fn test_storage_metrics_helper_methods() {
        let metrics = StorageMetrics {
            max_bytes: 100_000,
            used_bytes: 85_000,
            utilization: 0.85,
            by_class: HashMap::new(),
            recent_evictions: 0,
            recent_bytes_freed: 0,
        };

        assert_eq!(metrics.available_bytes(), 15_000);
        assert!(metrics.under_pressure());
        assert!(!metrics.is_critical());

        let critical_metrics = StorageMetrics {
            utilization: 0.97,
            ..metrics
        };
        assert!(critical_metrics.is_critical());
    }
}