1use crate::error::{CacheError, Result};
11use crate::multi_tier::CacheKey;
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
18pub enum Priority {
19 Critical = 4,
21 High = 3,
23 Normal = 2,
25 Low = 1,
27 BestEffort = 0,
29}
30
31impl Priority {
32 pub fn from_level(level: usize) -> Self {
34 match level {
35 4 => Priority::Critical,
36 3 => Priority::High,
37 2 => Priority::Normal,
38 1 => Priority::Low,
39 _ => Priority::BestEffort,
40 }
41 }
42
43 pub fn to_level(&self) -> usize {
45 *self as usize
46 }
47}
48
49#[derive(Debug, Clone)]
51pub struct Partition {
52 pub id: String,
54 pub priority: Priority,
56 pub min_size: usize,
58 pub max_size: usize,
60 pub current_size: usize,
62 pub tenant_id: Option<String>,
64}
65
66impl Partition {
67 pub fn new(id: String, priority: Priority, min_size: usize, max_size: usize) -> Self {
69 Self {
70 id,
71 priority,
72 min_size,
73 max_size,
74 current_size: 0,
75 tenant_id: None,
76 }
77 }
78
79 pub fn with_tenant(mut self, tenant_id: String) -> Self {
81 self.tenant_id = Some(tenant_id);
82 self
83 }
84
85 pub fn can_fit(&self, bytes: usize) -> bool {
87 self.current_size + bytes <= self.max_size
88 }
89
90 pub fn available_space(&self) -> usize {
92 self.max_size.saturating_sub(self.current_size)
93 }
94
95 pub fn utilization(&self) -> f64 {
97 if self.max_size == 0 {
98 0.0
99 } else {
100 (self.current_size as f64 / self.max_size as f64) * 100.0
101 }
102 }
103
104 pub fn under_minimum(&self) -> bool {
106 self.current_size < self.min_size
107 }
108}
109
110#[derive(Debug, Clone)]
112pub struct PartitionStats {
113 pub hits: u64,
115 pub misses: u64,
117 pub evictions: u64,
119 pub total_access_time_us: u64,
121 pub access_count: u64,
123}
124
125impl PartitionStats {
126 pub fn new() -> Self {
128 Self {
129 hits: 0,
130 misses: 0,
131 evictions: 0,
132 total_access_time_us: 0,
133 access_count: 0,
134 }
135 }
136
137 pub fn hit_rate(&self) -> f64 {
139 if self.hits + self.misses == 0 {
140 0.0
141 } else {
142 (self.hits as f64) / ((self.hits + self.misses) as f64) * 100.0
143 }
144 }
145
146 pub fn avg_access_time_us(&self) -> f64 {
148 if self.access_count == 0 {
149 0.0
150 } else {
151 self.total_access_time_us as f64 / self.access_count as f64
152 }
153 }
154}
155
156impl Default for PartitionStats {
157 fn default() -> Self {
158 Self::new()
159 }
160}
161
162pub struct PartitionManager {
164 partitions: Arc<RwLock<HashMap<String, Partition>>>,
166 key_partitions: Arc<RwLock<HashMap<CacheKey, String>>>,
168 stats: Arc<RwLock<HashMap<String, PartitionStats>>>,
170 total_size: usize,
172}
173
174impl PartitionManager {
175 pub fn new(total_size: usize) -> Self {
177 Self {
178 partitions: Arc::new(RwLock::new(HashMap::new())),
179 key_partitions: Arc::new(RwLock::new(HashMap::new())),
180 stats: Arc::new(RwLock::new(HashMap::new())),
181 total_size,
182 }
183 }
184
185 pub async fn add_partition(&self, partition: Partition) -> Result<()> {
187 let mut partitions = self.partitions.write().await;
188 let mut stats = self.stats.write().await;
189
190 let total_min: usize = partitions.values().map(|p| p.min_size).sum();
192 if total_min + partition.min_size > self.total_size {
193 return Err(CacheError::InvalidConfig(
194 "Total minimum partition sizes exceed cache size".to_string(),
195 ));
196 }
197
198 stats.insert(partition.id.clone(), PartitionStats::new());
199 partitions.insert(partition.id.clone(), partition);
200
201 Ok(())
202 }
203
204 pub async fn remove_partition(&self, partition_id: &str) -> Result<()> {
206 let mut partitions = self.partitions.write().await;
207 let mut stats = self.stats.write().await;
208
209 partitions.remove(partition_id);
210 stats.remove(partition_id);
211
212 Ok(())
213 }
214
215 pub async fn assign_key(&self, key: CacheKey, partition_id: String, size: usize) -> Result<()> {
217 let mut partitions = self.partitions.write().await;
218 let mut key_partitions = self.key_partitions.write().await;
219
220 let partition = partitions
221 .get_mut(&partition_id)
222 .ok_or_else(|| CacheError::InvalidConfig("Partition not found".to_string()))?;
223
224 if !partition.can_fit(size) {
225 return Err(CacheError::CacheFull(format!(
226 "Partition {} is full",
227 partition_id
228 )));
229 }
230
231 partition.current_size += size;
232 key_partitions.insert(key, partition_id);
233
234 Ok(())
235 }
236
237 pub async fn remove_key(&self, key: &CacheKey, size: usize) -> Result<()> {
239 let mut partitions = self.partitions.write().await;
240 let mut key_partitions = self.key_partitions.write().await;
241
242 if let Some(partition_id) = key_partitions.remove(key) {
243 if let Some(partition) = partitions.get_mut(&partition_id) {
244 partition.current_size = partition.current_size.saturating_sub(size);
245 }
246 }
247
248 Ok(())
249 }
250
251 pub async fn get_partition(&self, key: &CacheKey) -> Option<String> {
253 self.key_partitions.read().await.get(key).cloned()
254 }
255
256 pub async fn record_hit(&self, partition_id: &str, access_time_us: u64) {
258 let mut stats = self.stats.write().await;
259 if let Some(s) = stats.get_mut(partition_id) {
260 s.hits += 1;
261 s.total_access_time_us += access_time_us;
262 s.access_count += 1;
263 }
264 }
265
266 pub async fn record_miss(&self, partition_id: &str) {
268 let mut stats = self.stats.write().await;
269 if let Some(s) = stats.get_mut(partition_id) {
270 s.misses += 1;
271 }
272 }
273
274 pub async fn record_eviction(&self, partition_id: &str) {
276 let mut stats = self.stats.write().await;
277 if let Some(s) = stats.get_mut(partition_id) {
278 s.evictions += 1;
279 }
280 }
281
282 pub async fn get_stats(&self, partition_id: &str) -> Option<PartitionStats> {
284 self.stats.read().await.get(partition_id).cloned()
285 }
286
287 pub async fn get_all_partitions(&self) -> Vec<Partition> {
289 self.partitions.read().await.values().cloned().collect()
290 }
291
292 pub async fn rebalance(&self) -> Result<()> {
294 let mut partitions = self.partitions.write().await;
295 let _stats = self.stats.read().await;
296
297 let total_priority: usize = partitions.values().map(|p| p.priority.to_level()).sum();
299
300 if total_priority == 0 {
301 return Ok(());
302 }
303
304 let total_min: usize = partitions.values().map(|p| p.min_size).sum();
306 let available = self.total_size.saturating_sub(total_min);
307
308 for partition in partitions.values_mut() {
310 let priority_share = partition.priority.to_level() as f64 / total_priority as f64;
311 let additional = (available as f64 * priority_share) as usize;
312 partition.max_size = partition.min_size + additional;
313 }
314
315 Ok(())
316 }
317}
318
319pub struct QoSPolicy {
321 tenant_priorities: Arc<RwLock<HashMap<String, Priority>>>,
323 default_priority: Priority,
325}
326
327impl QoSPolicy {
328 pub fn new(default_priority: Priority) -> Self {
330 Self {
331 tenant_priorities: Arc::new(RwLock::new(HashMap::new())),
332 default_priority,
333 }
334 }
335
336 pub async fn set_tenant_priority(&self, tenant_id: String, priority: Priority) {
338 self.tenant_priorities
339 .write()
340 .await
341 .insert(tenant_id, priority);
342 }
343
344 pub async fn get_partition_for_tenant(&self, tenant_id: &str) -> String {
346 let priorities = self.tenant_priorities.read().await;
347 let priority = priorities
348 .get(tenant_id)
349 .copied()
350 .unwrap_or(self.default_priority);
351
352 format!("partition_{}", priority.to_level())
353 }
354
355 pub async fn get_priority(&self, tenant_id: &str) -> Priority {
357 self.tenant_priorities
358 .read()
359 .await
360 .get(tenant_id)
361 .copied()
362 .unwrap_or(self.default_priority)
363 }
364}
365
366#[cfg(test)]
367mod tests {
368 use super::*;
369
370 #[test]
371 fn test_partition_creation() {
372 let partition = Partition::new(
373 "test".to_string(),
374 Priority::High,
375 1024 * 1024,
376 10 * 1024 * 1024,
377 );
378
379 assert_eq!(partition.id, "test");
380 assert_eq!(partition.priority, Priority::High);
381 assert!(partition.can_fit(5 * 1024 * 1024));
382 }
383
384 #[test]
385 fn test_priority_levels() {
386 assert_eq!(Priority::Critical.to_level(), 4);
387 assert_eq!(Priority::High.to_level(), 3);
388 assert_eq!(Priority::Normal.to_level(), 2);
389 assert_eq!(Priority::Low.to_level(), 1);
390 assert_eq!(Priority::BestEffort.to_level(), 0);
391 }
392
393 #[tokio::test]
394 async fn test_partition_manager() {
395 let manager = PartitionManager::new(100 * 1024 * 1024);
396
397 let partition = Partition::new(
398 "high".to_string(),
399 Priority::High,
400 10 * 1024 * 1024,
401 50 * 1024 * 1024,
402 );
403
404 manager.add_partition(partition).await.unwrap_or_default();
405
406 let key = "test_key".to_string();
407 manager
408 .assign_key(key.clone(), "high".to_string(), 1024)
409 .await
410 .unwrap_or_default();
411
412 let partition_id = manager.get_partition(&key).await;
413 assert_eq!(partition_id, Some("high".to_string()));
414 }
415
416 #[tokio::test]
417 async fn test_partition_stats() {
418 let manager = PartitionManager::new(100 * 1024 * 1024);
419
420 let partition = Partition::new(
421 "test".to_string(),
422 Priority::Normal,
423 10 * 1024 * 1024,
424 50 * 1024 * 1024,
425 );
426
427 manager.add_partition(partition).await.unwrap_or_default();
428
429 manager.record_hit("test", 100).await;
430 manager.record_hit("test", 150).await;
431 manager.record_miss("test").await;
432
433 let stats = manager.get_stats("test").await;
434 assert!(stats.is_some());
435
436 let stats = stats.unwrap_or_default();
437 assert_eq!(stats.hits, 2);
438 assert_eq!(stats.misses, 1);
439 assert!(stats.hit_rate() > 0.0);
440 }
441
442 #[tokio::test]
443 async fn test_qos_policy() {
444 let policy = QoSPolicy::new(Priority::Normal);
445
446 policy
447 .set_tenant_priority("tenant1".to_string(), Priority::High)
448 .await;
449
450 let priority = policy.get_priority("tenant1").await;
451 assert_eq!(priority, Priority::High);
452
453 let priority = policy.get_priority("tenant2").await;
454 assert_eq!(priority, Priority::Normal);
455 }
456
457 #[tokio::test]
458 async fn test_partition_rebalance() {
459 let manager = PartitionManager::new(100 * 1024 * 1024);
460
461 let p1 = Partition::new(
462 "high".to_string(),
463 Priority::High,
464 10 * 1024 * 1024,
465 30 * 1024 * 1024,
466 );
467
468 let p2 = Partition::new(
469 "low".to_string(),
470 Priority::Low,
471 10 * 1024 * 1024,
472 20 * 1024 * 1024,
473 );
474
475 manager.add_partition(p1).await.unwrap_or_default();
476 manager.add_partition(p2).await.unwrap_or_default();
477
478 manager.rebalance().await.unwrap_or_default();
479
480 let partitions = manager.get_all_partitions().await;
481 assert_eq!(partitions.len(), 2);
482
483 let high = partitions.iter().find(|p| p.id == "high");
485 let low = partitions.iter().find(|p| p.id == "low");
486
487 if let (Some(h), Some(l)) = (high, low) {
488 assert!(h.max_size >= l.max_size);
489 }
490 }
491}