1use chrono::{DateTime, Duration, Utc};
5use serde::{Deserialize, Serialize};
6use tokio::time::{Duration as TokioDuration, interval};
7use tracing::{debug, error, info, warn};
8
9use std::path::PathBuf;
10
11use crate::database::{ModelDatabase, ModelRecord};
12use modelexpress_common::config::DurationConfig;
13use modelexpress_common::download::get_provider;
14use modelexpress_common::models::ModelStatus;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct CacheEvictionConfig {
19 pub enabled: bool,
21 pub policy: EvictionPolicyType,
23 pub check_interval: DurationConfig,
25}
26
27impl Default for CacheEvictionConfig {
28 fn default() -> Self {
29 Self {
30 enabled: true,
31 policy: EvictionPolicyType::Lru(LruConfig::default()),
32 check_interval: DurationConfig::hours(1), }
34 }
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(tag = "type", rename_all = "lowercase")]
40pub enum EvictionPolicyType {
41 Lru(LruConfig),
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct LruConfig {
48 pub unused_threshold: DurationConfig,
50 pub max_models: Option<u32>,
52 pub min_free_space_bytes: Option<u64>,
54}
55
56impl Default for LruConfig {
57 fn default() -> Self {
58 Self {
59 unused_threshold: DurationConfig::new(Duration::days(7)), max_models: None,
61 min_free_space_bytes: None,
62 }
63 }
64}
65
66#[derive(Debug, Clone)]
68pub struct EvictionResult {
69 pub evicted_count: u32,
71 pub evicted_models: Vec<String>,
73 pub bytes_freed: Option<u64>,
75 pub reason: EvictionReason,
77}
78
/// The trigger recorded in an eviction result.
#[derive(Clone, Debug)]
pub enum EvictionReason {
    /// Models exceeded the configured unused-time threshold.
    TimeThreshold,
    /// The number of cached models exceeded the configured maximum.
    CountLimit,
    /// Free disk space fell below the configured minimum.
    DiskSpace,
    /// Eviction was explicitly requested by a caller.
    Manual,
}
91
92#[async_trait::async_trait]
94pub trait EvictionPolicyTrait {
95 async fn select_for_eviction(
97 &self,
98 models: &[ModelRecord],
99 config: &CacheEvictionConfig,
100 ) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>>;
101}
102
103pub struct LruEvictionPolicy;
105
106impl LruEvictionPolicy {
107 fn is_time_expired(model: &ModelRecord, threshold: &DurationConfig) -> bool {
109 let threshold_duration = threshold.as_chrono_duration();
110 let cutoff_time = match Utc::now().checked_sub_signed(threshold_duration) {
111 Some(time) => time,
112 None => Utc::now(),
113 };
114 model.last_used_at < cutoff_time
115 }
116
117 async fn get_disk_space_info() -> Option<(u64, u64)> {
119 None
124 }
125}
126
127#[async_trait::async_trait]
128impl EvictionPolicyTrait for LruEvictionPolicy {
129 async fn select_for_eviction(
130 &self,
131 models: &[ModelRecord],
132 config: &CacheEvictionConfig,
133 ) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
134 let EvictionPolicyType::Lru(lru_config) = &config.policy;
135
136 let mut candidates_for_eviction = Vec::new();
137
138 let downloaded_models: Vec<&ModelRecord> = models
140 .iter()
141 .filter(|model| model.status == ModelStatus::DOWNLOADED)
142 .collect();
143
144 debug!(
145 "Evaluating {downloaded_count} downloaded models for eviction",
146 downloaded_count = downloaded_models.len()
147 );
148
149 for model in &downloaded_models {
151 if Self::is_time_expired(model, &lru_config.unused_threshold) {
152 debug!(
153 "Model '{model_name}' is expired (last used: {last_used_at})",
154 model_name = model.model_name,
155 last_used_at = model.last_used_at
156 );
157 candidates_for_eviction.push(model.model_name.clone());
158 }
159 }
160
161 if let Some(max_models) = lru_config.max_models {
163 let models_to_remove_by_count =
164 downloaded_models.len().saturating_sub(max_models as usize);
165 if models_to_remove_by_count > 0 {
166 debug!(
167 "Need to remove {models_to_remove_by_count} models due to count limit (have: {downloaded_count}, max: {max_models})",
168 models_to_remove_by_count = models_to_remove_by_count,
169 downloaded_count = downloaded_models.len(),
170 max_models = max_models
171 );
172
173 let mut sorted_models = downloaded_models.clone();
175 sorted_models.sort_by_key(|model| model.last_used_at);
176
177 for model in sorted_models.iter().take(models_to_remove_by_count) {
178 if !candidates_for_eviction.contains(&model.model_name) {
179 candidates_for_eviction.push(model.model_name.clone());
180 }
181 }
182 }
183 }
184
185 if let Some(_min_free_space) = lru_config.min_free_space_bytes
187 && let Some((_total_space, _free_space)) = Self::get_disk_space_info().await
188 {
189 debug!("Disk space checking is not yet implemented");
192 }
193
194 debug!(
195 "Selected {evicted_count} models for eviction: {candidates:?}",
196 evicted_count = candidates_for_eviction.len(),
197 candidates = candidates_for_eviction
198 );
199
200 Ok(candidates_for_eviction)
201 }
202}
203
204pub struct CacheEvictionService {
206 database: ModelDatabase,
207 config: CacheEvictionConfig,
208 cache_directory: PathBuf,
209}
210
211impl CacheEvictionService {
212 pub fn new(
214 database: ModelDatabase,
215 config: CacheEvictionConfig,
216 cache_directory: PathBuf,
217 ) -> Self {
218 Self {
219 database,
220 config,
221 cache_directory,
222 }
223 }
224
225 pub async fn start(
227 self,
228 mut shutdown_receiver: tokio::sync::oneshot::Receiver<()>,
229 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
230 if !self.config.enabled {
231 info!("Cache eviction service is disabled");
232 return Ok(());
233 }
234
235 info!(
236 "Starting cache eviction service with policy: {policy:?}, check interval: {interval}s",
237 policy = self.config.policy,
238 interval = self.config.check_interval.num_seconds()
239 );
240
241 let mut interval_timer = interval(TokioDuration::from_secs(
242 self.config.check_interval.num_seconds() as u64,
243 ));
244
245 loop {
246 tokio::select! {
247 _ = interval_timer.tick() => {
248 if let Err(e) = self.run_eviction_cycle().await {
249 error!("Error during cache eviction cycle: {e}", e = e);
250 }
251 }
252 _ = &mut shutdown_receiver => {
253 info!("Cache eviction service received shutdown signal");
254 break;
255 }
256 }
257 }
258
259 info!("Cache eviction service stopped");
260 Ok(())
261 }
262
263 async fn run_eviction_cycle(
265 &self,
266 ) -> Result<EvictionResult, Box<dyn std::error::Error + Send + Sync>> {
267 debug!("Starting cache eviction cycle");
268
269 let models = self.database.get_models_by_last_used(None)?;
271 debug!(
272 "Found {total_models} total models in database",
273 total_models = models.len()
274 );
275
276 let models_to_evict = match &self.config.policy {
278 EvictionPolicyType::Lru(_) => {
279 let lru_policy = LruEvictionPolicy;
280 lru_policy
281 .select_for_eviction(&models, &self.config)
282 .await?
283 }
284 };
285
286 let evicted_count = models_to_evict.len() as u32;
287
288 if evicted_count == 0 {
289 debug!("No models selected for eviction");
290 return Ok(EvictionResult {
291 evicted_count: 0,
292 evicted_models: Vec::new(),
293 bytes_freed: None,
294 reason: EvictionReason::TimeThreshold,
295 });
296 }
297
298 info!(
299 "Evicting {evicted_count} models: {models:?}",
300 evicted_count = evicted_count,
301 models = models_to_evict
302 );
303
304 let mut successfully_evicted = Vec::new();
306 for model_name in &models_to_evict {
307 match self.evict_model(model_name).await {
308 Ok(()) => {
309 successfully_evicted.push(model_name.clone());
310 info!(
311 "Successfully evicted model: {model_name}",
312 model_name = model_name
313 );
314 }
315 Err(e) => {
316 warn!(
317 "Failed to evict model '{model_name}': {e}",
318 model_name = model_name,
319 e = e
320 );
321 }
322 }
323 }
324
325 let result = EvictionResult {
326 evicted_count: successfully_evicted.len() as u32,
327 evicted_models: successfully_evicted,
328 bytes_freed: None, reason: EvictionReason::TimeThreshold,
330 };
331
332 if result.evicted_count > 0 {
333 info!(
334 "Cache eviction cycle completed: {evicted_count} models evicted",
335 evicted_count = result.evicted_count
336 );
337 } else {
338 debug!("Cache eviction cycle completed: no models evicted");
339 }
340
341 Ok(result)
342 }
343
344 async fn evict_model(
346 &self,
347 model_name: &str,
348 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
349 let record = self
351 .database
352 .get_model_record(model_name)?
353 .ok_or_else(|| format!("model '{model_name}' not found in database"))?;
354
355 let provider = get_provider(record.provider);
358 provider
359 .delete_model(model_name, self.cache_directory.clone())
360 .await
361 .map_err(|e| format!("failed to delete model files for '{model_name}': {e}"))?;
362
363 self.database.delete_model(model_name)?;
365
366 Ok(())
367 }
368
369 pub async fn manual_evict(
371 &self,
372 model_names: &[String],
373 ) -> Result<EvictionResult, Box<dyn std::error::Error + Send + Sync>> {
374 info!(
375 "Manual eviction requested for models: {models:?}",
376 models = model_names
377 );
378
379 let mut successfully_evicted = Vec::new();
380 for model_name in model_names {
381 match self.evict_model(model_name).await {
382 Ok(()) => {
383 successfully_evicted.push(model_name.clone());
384 info!(
385 "Successfully evicted model: {model_name}",
386 model_name = model_name
387 );
388 }
389 Err(e) => {
390 warn!(
391 "Failed to evict model '{model_name}': {e}",
392 model_name = model_name,
393 e = e
394 );
395 }
396 }
397 }
398
399 Ok(EvictionResult {
400 evicted_count: successfully_evicted.len() as u32,
401 evicted_models: successfully_evicted,
402 bytes_freed: None,
403 reason: EvictionReason::Manual,
404 })
405 }
406
407 pub async fn get_cache_stats(
409 &self,
410 ) -> Result<CacheStats, Box<dyn std::error::Error + Send + Sync>> {
411 let models = self.database.get_models_by_last_used(None)?;
412 let (downloading, downloaded, error) = self.database.get_status_counts()?;
413
414 let _now = Utc::now();
415 let mut oldest_model: Option<DateTime<Utc>> = None;
416 let mut newest_model: Option<DateTime<Utc>> = None;
417
418 for model in &models {
419 if model.status == ModelStatus::DOWNLOADED {
420 if oldest_model.is_none_or(|oldest| model.last_used_at < oldest) {
421 oldest_model = Some(model.last_used_at);
422 }
423 if newest_model.is_none_or(|newest| model.last_used_at > newest) {
424 newest_model = Some(model.last_used_at);
425 }
426 }
427 }
428
429 Ok(CacheStats {
430 total_models: models.len() as u32,
431 downloading_models: downloading,
432 downloaded_models: downloaded,
433 error_models: error,
434 oldest_model_last_used: oldest_model,
435 newest_model_last_used: newest_model,
436 })
437 }
438}
439
440#[derive(Debug, Clone, Serialize)]
442pub struct CacheStats {
443 pub total_models: u32,
444 pub downloading_models: u32,
445 pub downloaded_models: u32,
446 pub error_models: u32,
447 pub oldest_model_last_used: Option<DateTime<Utc>>,
448 pub newest_model_last_used: Option<DateTime<Utc>>,
449}
450
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;
    use crate::database::ModelDatabase;
    use modelexpress_common::models::ModelProvider;
    use tempfile::TempDir;

    /// Opens a fresh database inside its own temporary directory. The
    /// returned `TempDir` must be kept alive for as long as the database is
    /// used.
    fn create_test_database() -> (ModelDatabase, TempDir) {
        let temp_dir = TempDir::new().expect("Failed to create temporary directory");
        let db_file = temp_dir.path().join("test_models.db");
        let db = ModelDatabase::new(db_file.to_str().expect("Invalid path"))
            .expect("Failed to create test database");
        (db, temp_dir)
    }

    /// Builds a service over `db` with a throw-away cache directory.
    fn create_test_service(
        db: ModelDatabase,
        config: CacheEvictionConfig,
    ) -> (CacheEvictionService, TempDir) {
        let cache_dir = TempDir::new().expect("Failed to create cache directory");
        let cache_path = cache_dir.path().to_path_buf();
        (CacheEvictionService::new(db, config, cache_path), cache_dir)
    }

    /// Builds a HuggingFace record whose timestamps lie the given number of
    /// days before `now`.
    fn record_days_ago(
        model_name: &str,
        status: ModelStatus,
        now: DateTime<Utc>,
        created_days_ago: i64,
        last_used_days_ago: i64,
    ) -> ModelRecord {
        ModelRecord {
            model_name: model_name.to_string(),
            provider: ModelProvider::HuggingFace,
            status,
            created_at: now - Duration::days(created_days_ago),
            last_used_at: now - Duration::days(last_used_days_ago),
            message: None,
        }
    }

    /// Enabled LRU configuration with an hourly check and the given limits.
    fn lru_test_config(unused_days: i64, max_models: Option<u32>) -> CacheEvictionConfig {
        CacheEvictionConfig {
            enabled: true,
            policy: EvictionPolicyType::Lru(LruConfig {
                unused_threshold: DurationConfig::new(Duration::days(unused_days)),
                max_models,
                min_free_space_bytes: None,
            }),
            check_interval: DurationConfig::hours(1),
        }
    }

    /// Records `model_name` in `db` with the given status, panicking with
    /// `failure` if the database rejects it.
    fn register_model(db: &ModelDatabase, model_name: &str, status: ModelStatus, failure: &str) {
        db.set_status(model_name, ModelProvider::HuggingFace, status, None)
            .expect(failure);
    }

    #[test]
    fn test_default_config() {
        let defaults = CacheEvictionConfig::default();

        assert!(defaults.enabled);
        assert_eq!(defaults.check_interval.num_seconds(), 3600);
        assert!(matches!(defaults.policy, EvictionPolicyType::Lru(_)));
    }

    #[test]
    fn test_lru_config_defaults() {
        let defaults = LruConfig::default();

        assert_eq!(defaults.unused_threshold.num_seconds(), 7 * 24 * 3600);
        assert!(defaults.max_models.is_none());
        assert!(defaults.min_free_space_bytes.is_none());
    }

    #[test]
    fn test_duration_config_parsing() {
        use modelexpress_common::config::parse_duration_string;

        // Durations given as human-readable strings.
        let string_json = r#"{"enabled": true, "policy": {"type": "lru", "unused_threshold": "7d"}, "check_interval": "2h"}"#;
        let from_strings: CacheEvictionConfig =
            serde_json::from_str(string_json).expect("Failed to parse config");
        assert_eq!(from_strings.check_interval.num_seconds(), 2 * 3600);

        // Durations given as plain numbers.
        let numeric_json = r#"{"enabled": true, "policy": {"type": "lru", "unused_threshold": 604800}, "check_interval": 1800}"#;
        let from_numbers: CacheEvictionConfig =
            serde_json::from_str(numeric_json).expect("Failed to parse config");
        assert_eq!(from_numbers.check_interval.num_seconds(), 1800);

        // Direct parsing of the supported units and a combined value.
        let half_hour = parse_duration_string("30m").expect("Failed to parse 30m");
        assert_eq!(half_hour.num_seconds(), 30 * 60);

        let seconds = parse_duration_string("45s").expect("Failed to parse 45s");
        assert_eq!(seconds.num_seconds(), 45);

        let one_day = parse_duration_string("1d").expect("Failed to parse 1d");
        assert_eq!(one_day.num_seconds(), 24 * 3600);

        let combined = parse_duration_string("2h30m").expect("Failed to parse 2h30m");
        assert_eq!(combined.num_seconds(), 2 * 3600 + 30 * 60);
    }

    #[test]
    fn test_is_time_expired() {
        let now = Utc::now();
        let old_model = record_days_ago("old-model", ModelStatus::DOWNLOADED, now, 10, 8);
        let recent_model = record_days_ago("recent-model", ModelStatus::DOWNLOADED, now, 6, 5);
        let threshold = DurationConfig::new(Duration::days(7));

        let old_expired = LruEvictionPolicy::is_time_expired(&old_model, &threshold);
        let recent_expired = LruEvictionPolicy::is_time_expired(&recent_model, &threshold);

        assert!(old_expired);
        assert!(!recent_expired);
    }

    #[tokio::test]
    async fn test_lru_eviction_policy_time_based() {
        let now = Utc::now();
        let models = vec![
            record_days_ago("old-model", ModelStatus::DOWNLOADED, now, 10, 8),
            record_days_ago("recent-model", ModelStatus::DOWNLOADED, now, 6, 5),
            // Old but still downloading, so it must never be selected.
            record_days_ago("downloading-model", ModelStatus::DOWNLOADING, now, 10, 8),
        ];
        let config = lru_test_config(7, None);

        let evicted = LruEvictionPolicy
            .select_for_eviction(&models, &config)
            .await
            .expect("Failed to select models for eviction");

        assert_eq!(evicted.len(), 1);
        assert_eq!(evicted[0], "old-model");
    }

    #[tokio::test]
    async fn test_lru_eviction_policy_count_based() {
        let now = Utc::now();
        let models = vec![
            record_days_ago("model1", ModelStatus::DOWNLOADED, now, 3, 3),
            record_days_ago("model2", ModelStatus::DOWNLOADED, now, 2, 2),
            record_days_ago("model3", ModelStatus::DOWNLOADED, now, 1, 1),
        ];
        // The threshold is long enough that only the count limit can trigger.
        let config = lru_test_config(30, Some(2));

        let evicted = LruEvictionPolicy
            .select_for_eviction(&models, &config)
            .await
            .expect("Failed to select models for eviction");

        assert_eq!(evicted.len(), 1);
        assert_eq!(evicted[0], "model1");
    }

    #[tokio::test]
    async fn test_cache_eviction_service_creation() {
        let (db, _temp_dir) = create_test_database();

        let (service, _cache_dir) = create_test_service(db, CacheEvictionConfig::default());

        assert!(service.config.enabled);
    }

    #[tokio::test]
    async fn test_manual_evict() {
        let (db, _temp_dir) = create_test_database();
        register_model(
            &db,
            "test-model",
            ModelStatus::DOWNLOADED,
            "Failed to set model status",
        );
        let (service, _cache_dir) =
            create_test_service(db.clone(), CacheEvictionConfig::default());

        let requested = vec!["test-model".to_string()];
        let result = service
            .manual_evict(&requested)
            .await
            .expect("Failed to manually evict models");

        assert_eq!(result.evicted_count, 1);
        assert_eq!(result.evicted_models[0], "test-model");
        assert!(matches!(result.reason, EvictionReason::Manual));

        // The record must be gone from the database afterwards.
        let remaining = db
            .get_status("test-model")
            .expect("Failed to get model status");
        assert!(remaining.is_none());
    }

    #[tokio::test]
    async fn test_get_cache_stats() {
        let (db, _temp_dir) = create_test_database();
        register_model(
            &db,
            "model1",
            ModelStatus::DOWNLOADED,
            "Failed to set model1 status",
        );
        register_model(
            &db,
            "model2",
            ModelStatus::DOWNLOADING,
            "Failed to set model2 status",
        );
        register_model(
            &db,
            "model3",
            ModelStatus::ERROR,
            "Failed to set model3 status",
        );

        let (service, _cache_dir) = create_test_service(db, CacheEvictionConfig::default());
        let stats = service
            .get_cache_stats()
            .await
            .expect("Failed to get cache stats");

        assert_eq!(stats.total_models, 3);
        assert_eq!(stats.downloaded_models, 1);
        assert_eq!(stats.downloading_models, 1);
        assert_eq!(stats.error_models, 1);
    }
}