1use chrono::{DateTime, Duration, Utc};
5use serde::{Deserialize, Serialize};
6use tokio::time::{Duration as TokioDuration, interval};
7use tracing::{debug, error, info, warn};
8
9use crate::database::{ModelDatabase, ModelRecord};
10use modelexpress_common::config::DurationConfig;
11use modelexpress_common::models::ModelStatus;
12
/// Top-level configuration for the cache eviction service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheEvictionConfig {
    /// Whether the eviction service runs at all. When `false`,
    /// `CacheEvictionService::start` logs that it is disabled and returns immediately.
    pub enabled: bool,
    /// The eviction policy to apply, together with its policy-specific settings.
    pub policy: EvictionPolicyType,
    /// Time between two eviction cycles of the background service.
    pub check_interval: DurationConfig,
}
23
24impl Default for CacheEvictionConfig {
25 fn default() -> Self {
26 Self {
27 enabled: true,
28 policy: EvictionPolicyType::Lru(LruConfig::default()),
29 check_interval: DurationConfig::hours(1), }
31 }
32}
33
/// The eviction policies supported by the cache.
///
/// Serialized as an internally tagged enum: the variant is selected by a `type` field whose
/// value is the lowercased variant name (for example `{"type": "lru", ...}`), and the
/// variant's own fields sit next to that tag.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum EvictionPolicyType {
    /// Least-recently-used eviction, configured by [`LruConfig`].
    Lru(LruConfig),
}
41
/// Settings for the LRU eviction policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LruConfig {
    /// Downloaded models whose `last_used_at` is earlier than the current time minus this
    /// duration are selected for eviction.
    pub unused_threshold: DurationConfig,
    /// Optional cap on the number of downloaded models. When the cap is exceeded, the least
    /// recently used models beyond it are selected for eviction as well.
    pub max_models: Option<u32>,
    /// Optional minimum amount of free disk space, in bytes.
    ///
    /// The LRU policy reads this value, but the disk-space check is not implemented in this
    /// file, so it currently has no effect on which models are selected.
    pub min_free_space_bytes: Option<u64>,
}
52
53impl Default for LruConfig {
54 fn default() -> Self {
55 Self {
56 unused_threshold: DurationConfig::new(Duration::days(7)), max_models: None,
58 min_free_space_bytes: None,
59 }
60 }
61}
62
/// Outcome of one eviction run (automatic cycle or manual request).
#[derive(Debug, Clone)]
pub struct EvictionResult {
    /// Number of models that were evicted successfully.
    pub evicted_count: u32,
    /// Names of the models that were evicted successfully.
    pub evicted_models: Vec<String>,
    /// Number of bytes reclaimed, if known. The code in this file always sets this to `None`.
    pub bytes_freed: Option<u64>,
    /// Why the eviction was performed.
    pub reason: EvictionReason,
}
75
/// Why an eviction was performed.
///
/// Implements `PartialEq`/`Eq` so results can be compared directly (`==`, `assert_eq!`)
/// instead of requiring `matches!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EvictionReason {
    /// Reported by the periodic eviction cycle of the service.
    TimeThreshold,
    /// Presumably for evictions triggered by the model-count limit; no code in this file
    /// constructs it yet.
    CountLimit,
    /// Presumably for evictions triggered by low disk space; no code in this file
    /// constructs it yet.
    DiskSpace,
    /// Eviction explicitly requested by a caller (see `CacheEvictionService::manual_evict`).
    Manual,
}
88
/// Strategy interface for deciding which cached models should be evicted.
#[async_trait::async_trait]
pub trait EvictionPolicyTrait {
    /// Chooses the models to evict.
    ///
    /// * `models` - the model records to evaluate.
    /// * `config` - the eviction configuration, including the policy-specific settings.
    ///
    /// Returns the names of the models that should be evicted.
    ///
    /// # Errors
    ///
    /// Implementations may return any boxed error if selection fails.
    async fn select_for_eviction(
        &self,
        models: &[ModelRecord],
        config: &CacheEvictionConfig,
    ) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>>;
}
99
/// Least-recently-used eviction policy.
///
/// The type carries no state; all settings come from the [`CacheEvictionConfig`] passed to
/// `select_for_eviction`.
pub struct LruEvictionPolicy;
102
103impl LruEvictionPolicy {
104 fn is_time_expired(model: &ModelRecord, threshold: &DurationConfig) -> bool {
106 let threshold_duration = threshold.as_chrono_duration();
107 let cutoff_time = match Utc::now().checked_sub_signed(threshold_duration) {
108 Some(time) => time,
109 None => Utc::now(),
110 };
111 model.last_used_at < cutoff_time
112 }
113
114 async fn get_disk_space_info() -> Option<(u64, u64)> {
116 None
121 }
122}
123
124#[async_trait::async_trait]
125impl EvictionPolicyTrait for LruEvictionPolicy {
126 async fn select_for_eviction(
127 &self,
128 models: &[ModelRecord],
129 config: &CacheEvictionConfig,
130 ) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
131 let EvictionPolicyType::Lru(lru_config) = &config.policy;
132
133 let mut candidates_for_eviction = Vec::new();
134
135 let downloaded_models: Vec<&ModelRecord> = models
137 .iter()
138 .filter(|model| model.status == ModelStatus::DOWNLOADED)
139 .collect();
140
141 debug!(
142 "Evaluating {downloaded_count} downloaded models for eviction",
143 downloaded_count = downloaded_models.len()
144 );
145
146 for model in &downloaded_models {
148 if Self::is_time_expired(model, &lru_config.unused_threshold) {
149 debug!(
150 "Model '{model_name}' is expired (last used: {last_used_at})",
151 model_name = model.model_name,
152 last_used_at = model.last_used_at
153 );
154 candidates_for_eviction.push(model.model_name.clone());
155 }
156 }
157
158 if let Some(max_models) = lru_config.max_models {
160 let models_to_remove_by_count =
161 downloaded_models.len().saturating_sub(max_models as usize);
162 if models_to_remove_by_count > 0 {
163 debug!(
164 "Need to remove {models_to_remove_by_count} models due to count limit (have: {downloaded_count}, max: {max_models})",
165 models_to_remove_by_count = models_to_remove_by_count,
166 downloaded_count = downloaded_models.len(),
167 max_models = max_models
168 );
169
170 let mut sorted_models = downloaded_models.clone();
172 sorted_models.sort_by_key(|model| model.last_used_at);
173
174 for model in sorted_models.iter().take(models_to_remove_by_count) {
175 if !candidates_for_eviction.contains(&model.model_name) {
176 candidates_for_eviction.push(model.model_name.clone());
177 }
178 }
179 }
180 }
181
182 if let Some(_min_free_space) = lru_config.min_free_space_bytes
184 && let Some((_total_space, _free_space)) = Self::get_disk_space_info().await
185 {
186 debug!("Disk space checking is not yet implemented");
189 }
190
191 debug!(
192 "Selected {evicted_count} models for eviction: {candidates:?}",
193 evicted_count = candidates_for_eviction.len(),
194 candidates = candidates_for_eviction
195 );
196
197 Ok(candidates_for_eviction)
198 }
199}
200
/// Service that evicts cached models, either periodically via `start` or on demand via
/// `manual_evict`.
pub struct CacheEvictionService {
    /// Database holding the model records that are inspected and deleted.
    database: ModelDatabase,
    /// Eviction settings (enabled flag, policy, check interval).
    config: CacheEvictionConfig,
}
206
207impl CacheEvictionService {
208 pub fn new(database: ModelDatabase, config: CacheEvictionConfig) -> Self {
210 Self { database, config }
211 }
212
213 pub async fn start(
215 self,
216 mut shutdown_receiver: tokio::sync::oneshot::Receiver<()>,
217 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
218 if !self.config.enabled {
219 info!("Cache eviction service is disabled");
220 return Ok(());
221 }
222
223 info!(
224 "Starting cache eviction service with policy: {policy:?}, check interval: {interval}s",
225 policy = self.config.policy,
226 interval = self.config.check_interval.num_seconds()
227 );
228
229 let mut interval_timer = interval(TokioDuration::from_secs(
230 self.config.check_interval.num_seconds() as u64,
231 ));
232
233 loop {
234 tokio::select! {
235 _ = interval_timer.tick() => {
236 if let Err(e) = self.run_eviction_cycle().await {
237 error!("Error during cache eviction cycle: {e}", e = e);
238 }
239 }
240 _ = &mut shutdown_receiver => {
241 info!("Cache eviction service received shutdown signal");
242 break;
243 }
244 }
245 }
246
247 info!("Cache eviction service stopped");
248 Ok(())
249 }
250
251 async fn run_eviction_cycle(
253 &self,
254 ) -> Result<EvictionResult, Box<dyn std::error::Error + Send + Sync>> {
255 debug!("Starting cache eviction cycle");
256
257 let models = self.database.get_models_by_last_used(None)?;
259 debug!(
260 "Found {total_models} total models in database",
261 total_models = models.len()
262 );
263
264 let models_to_evict = match &self.config.policy {
266 EvictionPolicyType::Lru(_) => {
267 let lru_policy = LruEvictionPolicy;
268 lru_policy
269 .select_for_eviction(&models, &self.config)
270 .await?
271 }
272 };
273
274 let evicted_count = models_to_evict.len() as u32;
275
276 if evicted_count == 0 {
277 debug!("No models selected for eviction");
278 return Ok(EvictionResult {
279 evicted_count: 0,
280 evicted_models: Vec::new(),
281 bytes_freed: None,
282 reason: EvictionReason::TimeThreshold,
283 });
284 }
285
286 info!(
287 "Evicting {evicted_count} models: {models:?}",
288 evicted_count = evicted_count,
289 models = models_to_evict
290 );
291
292 let mut successfully_evicted = Vec::new();
294 for model_name in &models_to_evict {
295 match self.evict_model(model_name).await {
296 Ok(()) => {
297 successfully_evicted.push(model_name.clone());
298 info!(
299 "Successfully evicted model: {model_name}",
300 model_name = model_name
301 );
302 }
303 Err(e) => {
304 warn!(
305 "Failed to evict model '{model_name}': {e}",
306 model_name = model_name,
307 e = e
308 );
309 }
310 }
311 }
312
313 let result = EvictionResult {
314 evicted_count: successfully_evicted.len() as u32,
315 evicted_models: successfully_evicted,
316 bytes_freed: None, reason: EvictionReason::TimeThreshold,
318 };
319
320 if result.evicted_count > 0 {
321 info!(
322 "Cache eviction cycle completed: {evicted_count} models evicted",
323 evicted_count = result.evicted_count
324 );
325 } else {
326 debug!("Cache eviction cycle completed: no models evicted");
327 }
328
329 Ok(result)
330 }
331
332 async fn evict_model(
334 &self,
335 model_name: &str,
336 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
337 self.database.delete_model(model_name)?;
339
340 debug!(
345 "Would remove model files for: {model_name}",
346 model_name = model_name
347 );
348
349 Ok(())
355 }
356
357 pub async fn manual_evict(
359 &self,
360 model_names: &[String],
361 ) -> Result<EvictionResult, Box<dyn std::error::Error + Send + Sync>> {
362 info!(
363 "Manual eviction requested for models: {models:?}",
364 models = model_names
365 );
366
367 let mut successfully_evicted = Vec::new();
368 for model_name in model_names {
369 match self.evict_model(model_name).await {
370 Ok(()) => {
371 successfully_evicted.push(model_name.clone());
372 info!(
373 "Successfully evicted model: {model_name}",
374 model_name = model_name
375 );
376 }
377 Err(e) => {
378 warn!(
379 "Failed to evict model '{model_name}': {e}",
380 model_name = model_name,
381 e = e
382 );
383 }
384 }
385 }
386
387 Ok(EvictionResult {
388 evicted_count: successfully_evicted.len() as u32,
389 evicted_models: successfully_evicted,
390 bytes_freed: None,
391 reason: EvictionReason::Manual,
392 })
393 }
394
395 pub async fn get_cache_stats(
397 &self,
398 ) -> Result<CacheStats, Box<dyn std::error::Error + Send + Sync>> {
399 let models = self.database.get_models_by_last_used(None)?;
400 let (downloading, downloaded, error) = self.database.get_status_counts()?;
401
402 let _now = Utc::now();
403 let mut oldest_model: Option<DateTime<Utc>> = None;
404 let mut newest_model: Option<DateTime<Utc>> = None;
405
406 for model in &models {
407 if model.status == ModelStatus::DOWNLOADED {
408 if oldest_model.is_none_or(|oldest| model.last_used_at < oldest) {
409 oldest_model = Some(model.last_used_at);
410 }
411 if newest_model.is_none_or(|newest| model.last_used_at > newest) {
412 newest_model = Some(model.last_used_at);
413 }
414 }
415 }
416
417 Ok(CacheStats {
418 total_models: models.len() as u32,
419 downloading_models: downloading,
420 downloaded_models: downloaded,
421 error_models: error,
422 oldest_model_last_used: oldest_model,
423 newest_model_last_used: newest_model,
424 })
425 }
426}
427
/// Summary statistics about the model cache, as produced by
/// `CacheEvictionService::get_cache_stats`.
#[derive(Debug, Clone, Serialize)]
pub struct CacheStats {
    /// Total number of model records returned by the database.
    pub total_models: u32,
    /// Number of models reported by the database as downloading.
    pub downloading_models: u32,
    /// Number of models reported by the database as downloaded.
    pub downloaded_models: u32,
    /// Number of models reported by the database as being in an error state.
    pub error_models: u32,
    /// Earliest `last_used_at` among downloaded models; `None` when there are none.
    pub oldest_model_last_used: Option<DateTime<Utc>>,
    /// Latest `last_used_at` among downloaded models; `None` when there are none.
    pub newest_model_last_used: Option<DateTime<Utc>>,
}
438
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;
    use crate::database::ModelDatabase;
    use modelexpress_common::models::ModelProvider;
    use tempfile::TempDir;

    /// Opens a fresh database inside a temporary directory. The `TempDir` is returned so the
    /// directory stays alive for as long as the test holds on to it.
    fn create_test_database() -> (ModelDatabase, TempDir) {
        let temp_dir = TempDir::new().expect("Failed to create temporary directory");
        let db_file = temp_dir.path().join("test_models.db");
        let db_path = db_file.to_str().expect("Invalid path");
        let db = ModelDatabase::new(db_path).expect("Failed to create test database");
        (db, temp_dir)
    }

    /// Builds a HuggingFace model record that was created and last used the given number of
    /// days before `now`.
    fn hf_record(
        name: &str,
        status: ModelStatus,
        now: DateTime<Utc>,
        created_days_ago: i64,
        used_days_ago: i64,
    ) -> ModelRecord {
        ModelRecord {
            model_name: name.to_string(),
            provider: ModelProvider::HuggingFace,
            status,
            created_at: now - Duration::days(created_days_ago),
            last_used_at: now - Duration::days(used_days_ago),
            message: None,
        }
    }

    /// Builds an enabled, hourly-checked LRU configuration with the given unused threshold
    /// (in days) and optional model-count cap.
    fn lru_eviction_config(unused_days: i64, max_models: Option<u32>) -> CacheEvictionConfig {
        CacheEvictionConfig {
            enabled: true,
            policy: EvictionPolicyType::Lru(LruConfig {
                unused_threshold: DurationConfig::new(Duration::days(unused_days)),
                max_models,
                min_free_space_bytes: None,
            }),
            check_interval: DurationConfig::hours(1),
        }
    }

    #[test]
    fn test_default_config() {
        let CacheEvictionConfig {
            enabled,
            policy,
            check_interval,
        } = CacheEvictionConfig::default();

        assert!(enabled);
        assert_eq!(check_interval.num_seconds(), 3600);
        assert!(matches!(policy, EvictionPolicyType::Lru(_)));
    }

    #[test]
    fn test_lru_config_defaults() {
        let defaults = LruConfig::default();

        assert_eq!(defaults.unused_threshold.num_seconds(), 7 * 24 * 3600);
        assert_eq!(defaults.max_models, None);
        assert_eq!(defaults.min_free_space_bytes, None);
    }

    #[test]
    fn test_duration_config_parsing() {
        use modelexpress_common::config::parse_duration_string;

        // Durations given as human-readable strings.
        let with_strings = r#"{"enabled": true, "policy": {"type": "lru", "unused_threshold": "7d"}, "check_interval": "2h"}"#;
        let parsed: CacheEvictionConfig =
            serde_json::from_str(with_strings).expect("Failed to parse config");
        assert_eq!(parsed.check_interval.num_seconds(), 2 * 3600);

        // Durations given as plain numbers of seconds.
        let with_numbers = r#"{"enabled": true, "policy": {"type": "lru", "unused_threshold": 604800}, "check_interval": 1800}"#;
        let parsed: CacheEvictionConfig =
            serde_json::from_str(with_numbers).expect("Failed to parse config");
        assert_eq!(parsed.check_interval.num_seconds(), 1800);

        // Direct parsing of duration strings: (input, expected seconds, failure message).
        let cases = [
            ("30m", 30 * 60, "Failed to parse 30m"),
            ("45s", 45, "Failed to parse 45s"),
            ("1d", 24 * 3600, "Failed to parse 1d"),
            ("2h30m", 2 * 3600 + 30 * 60, "Failed to parse 2h30m"),
        ];
        for (input, expected_secs, failure_msg) in cases {
            let duration = parse_duration_string(input).expect(failure_msg);
            assert_eq!(duration.num_seconds(), expected_secs);
        }
    }

    #[test]
    fn test_is_time_expired() {
        let now = Utc::now();
        let stale = hf_record("old-model", ModelStatus::DOWNLOADED, now, 10, 8);
        let fresh = hf_record("recent-model", ModelStatus::DOWNLOADED, now, 6, 5);
        let one_week = DurationConfig::new(Duration::days(7));

        assert!(LruEvictionPolicy::is_time_expired(&stale, &one_week));
        assert!(!LruEvictionPolicy::is_time_expired(&fresh, &one_week));
    }

    #[tokio::test]
    async fn test_lru_eviction_policy_time_based() {
        let now = Utc::now();
        let models = vec![
            hf_record("old-model", ModelStatus::DOWNLOADED, now, 10, 8),
            hf_record("recent-model", ModelStatus::DOWNLOADED, now, 6, 5),
            // Old enough to expire, but still downloading, so it must not be selected.
            hf_record("downloading-model", ModelStatus::DOWNLOADING, now, 10, 8),
        ];
        let config = lru_eviction_config(7, None);

        let evicted = LruEvictionPolicy
            .select_for_eviction(&models, &config)
            .await
            .expect("Failed to select models for eviction");

        assert_eq!(evicted, ["old-model"]);
    }

    #[tokio::test]
    async fn test_lru_eviction_policy_count_based() {
        let now = Utc::now();
        let models = vec![
            hf_record("model1", ModelStatus::DOWNLOADED, now, 3, 3),
            hf_record("model2", ModelStatus::DOWNLOADED, now, 2, 2),
            hf_record("model3", ModelStatus::DOWNLOADED, now, 1, 1),
        ];
        // The threshold is long enough that nothing expires; only the cap of two applies.
        let config = lru_eviction_config(30, Some(2));

        let evicted = LruEvictionPolicy
            .select_for_eviction(&models, &config)
            .await
            .expect("Failed to select models for eviction");

        assert_eq!(evicted, ["model1"]);
    }

    #[tokio::test]
    async fn test_cache_eviction_service_creation() {
        let (db, _temp_dir) = create_test_database();

        let service = CacheEvictionService::new(db, CacheEvictionConfig::default());

        assert!(service.config.enabled);
    }

    #[tokio::test]
    async fn test_manual_evict() {
        let (db, _temp_dir) = create_test_database();

        db.set_status(
            "test-model",
            ModelProvider::HuggingFace,
            ModelStatus::DOWNLOADED,
            None,
        )
        .expect("Failed to set model status");

        let service = CacheEvictionService::new(db.clone(), CacheEvictionConfig::default());

        let requested = ["test-model".to_string()];
        let outcome = service
            .manual_evict(&requested)
            .await
            .expect("Failed to manually evict models");

        assert_eq!(outcome.evicted_count, 1);
        assert_eq!(outcome.evicted_models[0], "test-model");
        assert!(matches!(outcome.reason, EvictionReason::Manual));

        // The record must be gone from the database afterwards.
        let remaining = db
            .get_status("test-model")
            .expect("Failed to get model status");
        assert!(remaining.is_none());
    }

    #[tokio::test]
    async fn test_get_cache_stats() {
        let (db, _temp_dir) = create_test_database();

        // One model per status: (name, status, failure message).
        let fixtures = [
            (
                "model1",
                ModelStatus::DOWNLOADED,
                "Failed to set model1 status",
            ),
            (
                "model2",
                ModelStatus::DOWNLOADING,
                "Failed to set model2 status",
            ),
            ("model3", ModelStatus::ERROR, "Failed to set model3 status"),
        ];
        for (name, status, failure_msg) in fixtures {
            db.set_status(name, ModelProvider::HuggingFace, status, None)
                .expect(failure_msg);
        }

        let service = CacheEvictionService::new(db, CacheEvictionConfig::default());
        let stats = service
            .get_cache_stats()
            .await
            .expect("Failed to get cache stats");

        assert_eq!(stats.total_models, 3);
        assert_eq!(stats.downloaded_models, 1);
        assert_eq!(stats.downloading_models, 1);
        assert_eq!(stats.error_models, 1);
    }
}