// oxigdal_cache_advanced/partitioning.rs
//! Cache partitioning for QoS and tenant isolation
//!
//! Provides mechanisms for:
//! - Quality-of-Service (QoS) partitioning
//! - Priority-based cache allocation
//! - Tenant isolation
//! - Dynamic partition resizing
//! - Performance monitoring per partition

10use crate::error::{CacheError, Result};
11use crate::multi_tier::CacheKey;
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
/// Partition priority level
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    /// Critical priority (highest)
    Critical = 4,
    /// High priority
    High = 3,
    /// Normal priority
    Normal = 2,
    /// Low priority
    Low = 1,
    /// Best effort (lowest)
    BestEffort = 0,
}

impl Priority {
    /// Map a numeric level to a priority.
    ///
    /// Levels 1..=4 correspond to `Low` through `Critical`; any other
    /// value (including 0 and out-of-range levels) resolves to
    /// `BestEffort`.
    pub fn from_level(level: usize) -> Self {
        // Index position doubles as the discriminant value.
        const BY_LEVEL: [Priority; 5] = [
            Priority::BestEffort,
            Priority::Low,
            Priority::Normal,
            Priority::High,
            Priority::Critical,
        ];
        BY_LEVEL.get(level).copied().unwrap_or(Priority::BestEffort)
    }

    /// Numeric level of this priority (0 = `BestEffort`, 4 = `Critical`).
    pub fn to_level(&self) -> usize {
        *self as usize
    }
}
48
49/// Cache partition definition
50#[derive(Debug, Clone)]
51pub struct Partition {
52    /// Partition ID
53    pub id: String,
54    /// Priority level
55    pub priority: Priority,
56    /// Minimum guaranteed size (bytes)
57    pub min_size: usize,
58    /// Maximum size limit (bytes)
59    pub max_size: usize,
60    /// Current size (bytes)
61    pub current_size: usize,
62    /// Tenant ID (for multi-tenancy)
63    pub tenant_id: Option<String>,
64}
65
66impl Partition {
67    /// Create new partition
68    pub fn new(id: String, priority: Priority, min_size: usize, max_size: usize) -> Self {
69        Self {
70            id,
71            priority,
72            min_size,
73            max_size,
74            current_size: 0,
75            tenant_id: None,
76        }
77    }
78
79    /// Set tenant ID
80    pub fn with_tenant(mut self, tenant_id: String) -> Self {
81        self.tenant_id = Some(tenant_id);
82        self
83    }
84
85    /// Check if partition can accommodate bytes
86    pub fn can_fit(&self, bytes: usize) -> bool {
87        self.current_size + bytes <= self.max_size
88    }
89
90    /// Get available space
91    pub fn available_space(&self) -> usize {
92        self.max_size.saturating_sub(self.current_size)
93    }
94
95    /// Get utilization percentage
96    pub fn utilization(&self) -> f64 {
97        if self.max_size == 0 {
98            0.0
99        } else {
100            (self.current_size as f64 / self.max_size as f64) * 100.0
101        }
102    }
103
104    /// Check if partition is under minimum guarantee
105    pub fn under_minimum(&self) -> bool {
106        self.current_size < self.min_size
107    }
108}
109
/// Partition statistics
///
/// All counters start at zero; `Default` and `new` are equivalent.
#[derive(Debug, Clone, Default)]
pub struct PartitionStats {
    /// Number of cache hits
    pub hits: u64,
    /// Number of cache misses
    pub misses: u64,
    /// Number of evictions
    pub evictions: u64,
    /// Total access time (microseconds)
    pub total_access_time_us: u64,
    /// Number of accesses
    pub access_count: u64,
}

impl PartitionStats {
    /// Create zeroed stats.
    pub fn new() -> Self {
        Self::default()
    }

    /// Hit rate as a percentage of all lookups.
    ///
    /// Returns 0.0 when no hits or misses have been recorded yet.
    pub fn hit_rate(&self) -> f64 {
        let lookups = self.hits + self.misses;
        match lookups {
            0 => 0.0,
            n => (self.hits as f64) / (n as f64) * 100.0,
        }
    }

    /// Mean access time in microseconds.
    ///
    /// Returns 0.0 when no accesses have been recorded yet.
    pub fn avg_access_time_us(&self) -> f64 {
        match self.access_count {
            0 => 0.0,
            n => self.total_access_time_us as f64 / n as f64,
        }
    }
}
161
/// Cache partitioning manager
///
/// Tracks partition definitions, the key -> partition-ID assignment,
/// and per-partition statistics. Each map sits behind its own
/// `Arc<tokio::sync::RwLock<..>>`, so the manager can be cloned-by-Arc
/// and shared across async tasks.
pub struct PartitionManager {
    /// Partitions by ID
    partitions: Arc<RwLock<HashMap<String, Partition>>>,
    /// Key to partition mapping (value is the owning partition's ID)
    key_partitions: Arc<RwLock<HashMap<CacheKey, String>>>,
    /// Partition statistics, keyed by partition ID
    stats: Arc<RwLock<HashMap<String, PartitionStats>>>,
    /// Total cache size (bytes) — the budget partitions are carved from
    total_size: usize,
}
173
174impl PartitionManager {
175    /// Create new partition manager
176    pub fn new(total_size: usize) -> Self {
177        Self {
178            partitions: Arc::new(RwLock::new(HashMap::new())),
179            key_partitions: Arc::new(RwLock::new(HashMap::new())),
180            stats: Arc::new(RwLock::new(HashMap::new())),
181            total_size,
182        }
183    }
184
185    /// Add partition
186    pub async fn add_partition(&self, partition: Partition) -> Result<()> {
187        let mut partitions = self.partitions.write().await;
188        let mut stats = self.stats.write().await;
189
190        // Validate total size doesn't exceed limit
191        let total_min: usize = partitions.values().map(|p| p.min_size).sum();
192        if total_min + partition.min_size > self.total_size {
193            return Err(CacheError::InvalidConfig(
194                "Total minimum partition sizes exceed cache size".to_string(),
195            ));
196        }
197
198        stats.insert(partition.id.clone(), PartitionStats::new());
199        partitions.insert(partition.id.clone(), partition);
200
201        Ok(())
202    }
203
204    /// Remove partition
205    pub async fn remove_partition(&self, partition_id: &str) -> Result<()> {
206        let mut partitions = self.partitions.write().await;
207        let mut stats = self.stats.write().await;
208
209        partitions.remove(partition_id);
210        stats.remove(partition_id);
211
212        Ok(())
213    }
214
215    /// Assign key to partition
216    pub async fn assign_key(&self, key: CacheKey, partition_id: String, size: usize) -> Result<()> {
217        let mut partitions = self.partitions.write().await;
218        let mut key_partitions = self.key_partitions.write().await;
219
220        let partition = partitions
221            .get_mut(&partition_id)
222            .ok_or_else(|| CacheError::InvalidConfig("Partition not found".to_string()))?;
223
224        if !partition.can_fit(size) {
225            return Err(CacheError::CacheFull(format!(
226                "Partition {} is full",
227                partition_id
228            )));
229        }
230
231        partition.current_size += size;
232        key_partitions.insert(key, partition_id);
233
234        Ok(())
235    }
236
237    /// Remove key from partition
238    pub async fn remove_key(&self, key: &CacheKey, size: usize) -> Result<()> {
239        let mut partitions = self.partitions.write().await;
240        let mut key_partitions = self.key_partitions.write().await;
241
242        if let Some(partition_id) = key_partitions.remove(key) {
243            if let Some(partition) = partitions.get_mut(&partition_id) {
244                partition.current_size = partition.current_size.saturating_sub(size);
245            }
246        }
247
248        Ok(())
249    }
250
251    /// Get partition for key
252    pub async fn get_partition(&self, key: &CacheKey) -> Option<String> {
253        self.key_partitions.read().await.get(key).cloned()
254    }
255
256    /// Record cache hit
257    pub async fn record_hit(&self, partition_id: &str, access_time_us: u64) {
258        let mut stats = self.stats.write().await;
259        if let Some(s) = stats.get_mut(partition_id) {
260            s.hits += 1;
261            s.total_access_time_us += access_time_us;
262            s.access_count += 1;
263        }
264    }
265
266    /// Record cache miss
267    pub async fn record_miss(&self, partition_id: &str) {
268        let mut stats = self.stats.write().await;
269        if let Some(s) = stats.get_mut(partition_id) {
270            s.misses += 1;
271        }
272    }
273
274    /// Record eviction
275    pub async fn record_eviction(&self, partition_id: &str) {
276        let mut stats = self.stats.write().await;
277        if let Some(s) = stats.get_mut(partition_id) {
278            s.evictions += 1;
279        }
280    }
281
282    /// Get partition statistics
283    pub async fn get_stats(&self, partition_id: &str) -> Option<PartitionStats> {
284        self.stats.read().await.get(partition_id).cloned()
285    }
286
287    /// Get all partition info
288    pub async fn get_all_partitions(&self) -> Vec<Partition> {
289        self.partitions.read().await.values().cloned().collect()
290    }
291
292    /// Rebalance partitions based on usage
293    pub async fn rebalance(&self) -> Result<()> {
294        let mut partitions = self.partitions.write().await;
295        let _stats = self.stats.read().await;
296
297        // Calculate priority-weighted sizes
298        let total_priority: usize = partitions.values().map(|p| p.priority.to_level()).sum();
299
300        if total_priority == 0 {
301            return Ok(());
302        }
303
304        // Available space after guarantees
305        let total_min: usize = partitions.values().map(|p| p.min_size).sum();
306        let available = self.total_size.saturating_sub(total_min);
307
308        // Distribute available space by priority
309        for partition in partitions.values_mut() {
310            let priority_share = partition.priority.to_level() as f64 / total_priority as f64;
311            let additional = (available as f64 * priority_share) as usize;
312            partition.max_size = partition.min_size + additional;
313        }
314
315        Ok(())
316    }
317}
318
/// QoS policy for automatic partition assignment
///
/// Maps tenants to priorities; tenants with no explicit mapping fall
/// back to `default_priority`. The mapping is behind an async `RwLock`
/// so the policy can be shared across tasks.
pub struct QoSPolicy {
    /// Priority mappings (tenant -> priority)
    tenant_priorities: Arc<RwLock<HashMap<String, Priority>>>,
    /// Default priority for tenants without an explicit mapping
    default_priority: Priority,
}
326
327impl QoSPolicy {
328    /// Create new QoS policy
329    pub fn new(default_priority: Priority) -> Self {
330        Self {
331            tenant_priorities: Arc::new(RwLock::new(HashMap::new())),
332            default_priority,
333        }
334    }
335
336    /// Set tenant priority
337    pub async fn set_tenant_priority(&self, tenant_id: String, priority: Priority) {
338        self.tenant_priorities
339            .write()
340            .await
341            .insert(tenant_id, priority);
342    }
343
344    /// Get partition ID for tenant
345    pub async fn get_partition_for_tenant(&self, tenant_id: &str) -> String {
346        let priorities = self.tenant_priorities.read().await;
347        let priority = priorities
348            .get(tenant_id)
349            .copied()
350            .unwrap_or(self.default_priority);
351
352        format!("partition_{}", priority.to_level())
353    }
354
355    /// Get priority for tenant
356    pub async fn get_priority(&self, tenant_id: &str) -> Priority {
357        self.tenant_priorities
358            .read()
359            .await
360            .get(tenant_id)
361            .copied()
362            .unwrap_or(self.default_priority)
363    }
364}
365
#[cfg(test)]
mod tests {
    use super::*;

    // Setup steps use `expect`/`unwrap` rather than `unwrap_or_default`:
    // swallowing a setup error would let a test pass vacuously.

    #[test]
    fn test_partition_creation() {
        let partition = Partition::new(
            "test".to_string(),
            Priority::High,
            1024 * 1024,
            10 * 1024 * 1024,
        );

        assert_eq!(partition.id, "test");
        assert_eq!(partition.priority, Priority::High);
        assert!(partition.can_fit(5 * 1024 * 1024));
    }

    #[test]
    fn test_priority_levels() {
        assert_eq!(Priority::Critical.to_level(), 4);
        assert_eq!(Priority::High.to_level(), 3);
        assert_eq!(Priority::Normal.to_level(), 2);
        assert_eq!(Priority::Low.to_level(), 1);
        assert_eq!(Priority::BestEffort.to_level(), 0);
    }

    #[tokio::test]
    async fn test_partition_manager() {
        let manager = PartitionManager::new(100 * 1024 * 1024);

        let partition = Partition::new(
            "high".to_string(),
            Priority::High,
            10 * 1024 * 1024,
            50 * 1024 * 1024,
        );

        manager
            .add_partition(partition)
            .await
            .expect("add_partition should succeed");

        let key = "test_key".to_string();
        manager
            .assign_key(key.clone(), "high".to_string(), 1024)
            .await
            .expect("assign_key should succeed");

        let partition_id = manager.get_partition(&key).await;
        assert_eq!(partition_id, Some("high".to_string()));
    }

    #[tokio::test]
    async fn test_partition_stats() {
        let manager = PartitionManager::new(100 * 1024 * 1024);

        let partition = Partition::new(
            "test".to_string(),
            Priority::Normal,
            10 * 1024 * 1024,
            50 * 1024 * 1024,
        );

        manager
            .add_partition(partition)
            .await
            .expect("add_partition should succeed");

        manager.record_hit("test", 100).await;
        manager.record_hit("test", 150).await;
        manager.record_miss("test").await;

        let stats = manager
            .get_stats("test")
            .await
            .expect("stats must exist for registered partition");
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);
        assert!(stats.hit_rate() > 0.0);
    }

    #[tokio::test]
    async fn test_qos_policy() {
        let policy = QoSPolicy::new(Priority::Normal);

        policy
            .set_tenant_priority("tenant1".to_string(), Priority::High)
            .await;

        let priority = policy.get_priority("tenant1").await;
        assert_eq!(priority, Priority::High);

        // Unmapped tenant falls back to the default priority.
        let priority = policy.get_priority("tenant2").await;
        assert_eq!(priority, Priority::Normal);
    }

    #[tokio::test]
    async fn test_partition_rebalance() {
        let manager = PartitionManager::new(100 * 1024 * 1024);

        let p1 = Partition::new(
            "high".to_string(),
            Priority::High,
            10 * 1024 * 1024,
            30 * 1024 * 1024,
        );

        let p2 = Partition::new(
            "low".to_string(),
            Priority::Low,
            10 * 1024 * 1024,
            20 * 1024 * 1024,
        );

        manager.add_partition(p1).await.expect("add high partition");
        manager.add_partition(p2).await.expect("add low partition");

        manager.rebalance().await.expect("rebalance should succeed");

        let partitions = manager.get_all_partitions().await;
        assert_eq!(partitions.len(), 2);

        // High priority partition should get at least as much space;
        // `expect` here so a missing partition fails loudly instead of
        // skipping the assertion.
        let high = partitions
            .iter()
            .find(|p| p.id == "high")
            .expect("high partition missing after rebalance");
        let low = partitions
            .iter()
            .find(|p| p.id == "low")
            .expect("low partition missing after rebalance");

        assert!(high.max_size >= low.max_size);
    }
}