1use chrono::{DateTime, Duration, Utc};
5use serde::{Deserialize, Serialize};
6use tokio::time::{Duration as TokioDuration, interval};
7use tracing::{debug, error, info, warn};
8
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use crate::registry::backend::ModelRecord;
13use crate::registry::state::RegistryManager;
14use modelexpress_common::config::DurationConfig;
15use modelexpress_common::download::get_provider;
16use modelexpress_common::models::ModelStatus;
17
/// Configuration for the background cache eviction service.
///
/// Durations accept either strings such as `"2h"` / `"7d"` or integer seconds when deserialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheEvictionConfig {
    /// Whether eviction runs at all; when `false`, `CacheEvictionService::start` returns immediately.
    pub enabled: bool,
    /// Policy that selects which downloaded models to evict.
    pub policy: EvictionPolicyType,
    /// How often an eviction cycle is run.
    pub check_interval: DurationConfig,
}
28
29impl Default for CacheEvictionConfig {
30 fn default() -> Self {
31 Self {
32 enabled: true,
33 policy: EvictionPolicyType::Lru(LruConfig::default()),
34 check_interval: DurationConfig::hours(1), }
36 }
37}
38
/// Selectable eviction policy.
///
/// Internally tagged for serde: the variant is chosen by a lowercase `"type"` field and the
/// policy's own settings sit alongside it, e.g. `{"type": "lru", "unused_threshold": "7d"}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum EvictionPolicyType {
    /// Least-recently-used eviction configured by [`LruConfig`].
    Lru(LruConfig),
}
46
/// Settings for the least-recently-used eviction policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LruConfig {
    /// Downloaded models whose `last_used_at` is older than `now - unused_threshold` are evicted.
    pub unused_threshold: DurationConfig,
    /// Maximum number of downloaded models to keep; the least recently used excess is evicted.
    /// `None` disables the count limit.
    pub max_models: Option<u32>,
    /// Desired minimum free disk space in bytes.
    /// NOTE(review): disk-space checking is not implemented yet, so this currently has no effect.
    pub min_free_space_bytes: Option<u64>,
}
57
58impl Default for LruConfig {
59 fn default() -> Self {
60 Self {
61 unused_threshold: DurationConfig::new(Duration::days(7)), max_models: None,
63 min_free_space_bytes: None,
64 }
65 }
66}
67
/// Outcome of an automatic eviction cycle or a manual eviction request.
#[derive(Debug, Clone)]
pub struct EvictionResult {
    /// Number of models that were successfully evicted (matches `evicted_models.len()`).
    pub evicted_count: u32,
    /// Names of the models that were successfully evicted; failed evictions are not listed.
    pub evicted_models: Vec<String>,
    /// Bytes reclaimed, when measured. `CacheEvictionService` currently always reports `None`.
    pub bytes_freed: Option<u64>,
    /// Why the eviction happened.
    pub reason: EvictionReason,
}
80
/// Reason attached to an [`EvictionResult`].
#[derive(Debug, Clone)]
pub enum EvictionReason {
    /// Models exceeded the unused-time threshold. Automatic cycles currently always report this.
    TimeThreshold,
    /// Presumably: the downloaded-model count exceeded `LruConfig::max_models`.
    /// NOTE(review): not produced by `CacheEvictionService` in this file.
    CountLimit,
    /// Presumably: free disk space fell below `LruConfig::min_free_space_bytes`.
    /// NOTE(review): not produced by `CacheEvictionService` in this file.
    DiskSpace,
    /// Eviction explicitly requested via `CacheEvictionService::manual_evict`.
    Manual,
}
93
/// Strategy that decides which cached models should be evicted.
#[async_trait::async_trait]
pub trait EvictionPolicyTrait {
    /// Returns the names of the models in `models` that should be evicted under `config`.
    ///
    /// # Errors
    ///
    /// Implementations may fail selection with a boxed error; the LRU implementation in this
    /// file always returns `Ok`.
    async fn select_for_eviction(
        &self,
        models: &[ModelRecord],
        config: &CacheEvictionConfig,
    ) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>>;
}
104
/// Stateless least-recently-used eviction policy (see [`LruConfig`] for its settings).
#[derive(Debug, Clone, Copy, Default)]
pub struct LruEvictionPolicy;
107
108impl LruEvictionPolicy {
109 fn is_time_expired(model: &ModelRecord, threshold: &DurationConfig) -> bool {
111 let threshold_duration = threshold.as_chrono_duration();
112 let cutoff_time = match Utc::now().checked_sub_signed(threshold_duration) {
113 Some(time) => time,
114 None => Utc::now(),
115 };
116 model.last_used_at < cutoff_time
117 }
118
119 async fn get_disk_space_info() -> Option<(u64, u64)> {
121 None
126 }
127}
128
129#[async_trait::async_trait]
130impl EvictionPolicyTrait for LruEvictionPolicy {
131 async fn select_for_eviction(
132 &self,
133 models: &[ModelRecord],
134 config: &CacheEvictionConfig,
135 ) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
136 let EvictionPolicyType::Lru(lru_config) = &config.policy;
137
138 let mut candidates_for_eviction = Vec::new();
139
140 let downloaded_models: Vec<&ModelRecord> = models
142 .iter()
143 .filter(|model| model.status == ModelStatus::DOWNLOADED)
144 .collect();
145
146 debug!(
147 "Evaluating {downloaded_count} downloaded models for eviction",
148 downloaded_count = downloaded_models.len()
149 );
150
151 for model in &downloaded_models {
153 if Self::is_time_expired(model, &lru_config.unused_threshold) {
154 debug!(
155 "Model '{model_name}' is expired (last used: {last_used_at})",
156 model_name = model.model_name,
157 last_used_at = model.last_used_at
158 );
159 candidates_for_eviction.push(model.model_name.clone());
160 }
161 }
162
163 if let Some(max_models) = lru_config.max_models {
165 let models_to_remove_by_count =
166 downloaded_models.len().saturating_sub(max_models as usize);
167 if models_to_remove_by_count > 0 {
168 debug!(
169 "Need to remove {models_to_remove_by_count} models due to count limit (have: {downloaded_count}, max: {max_models})",
170 models_to_remove_by_count = models_to_remove_by_count,
171 downloaded_count = downloaded_models.len(),
172 max_models = max_models
173 );
174
175 let mut sorted_models = downloaded_models.clone();
177 sorted_models.sort_by_key(|model| model.last_used_at);
178
179 for model in sorted_models.iter().take(models_to_remove_by_count) {
180 if !candidates_for_eviction.contains(&model.model_name) {
181 candidates_for_eviction.push(model.model_name.clone());
182 }
183 }
184 }
185 }
186
187 if let Some(_min_free_space) = lru_config.min_free_space_bytes
189 && let Some((_total_space, _free_space)) = Self::get_disk_space_info().await
190 {
191 debug!("Disk space checking is not yet implemented");
194 }
195
196 debug!(
197 "Selected {evicted_count} models for eviction: {candidates:?}",
198 evicted_count = candidates_for_eviction.len(),
199 candidates = candidates_for_eviction
200 );
201
202 Ok(candidates_for_eviction)
203 }
204}
205
/// Background service that periodically evicts cached models according to a
/// [`CacheEvictionConfig`], and supports manual eviction and cache statistics.
pub struct CacheEvictionService {
    /// Registry used to list models, look up their provider, and delete their records.
    registry: Arc<RegistryManager>,
    /// Eviction settings (enable flag, policy, check interval).
    config: CacheEvictionConfig,
    /// Cache root passed to the provider's `delete_model` when removing a model's files.
    cache_directory: PathBuf,
}
212
213impl CacheEvictionService {
214 pub fn new(
216 registry: Arc<RegistryManager>,
217 config: CacheEvictionConfig,
218 cache_directory: PathBuf,
219 ) -> Self {
220 Self {
221 registry,
222 config,
223 cache_directory,
224 }
225 }
226
227 pub async fn start(
229 self,
230 mut shutdown_receiver: tokio::sync::oneshot::Receiver<()>,
231 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
232 if !self.config.enabled {
233 info!("Cache eviction service is disabled");
234 return Ok(());
235 }
236
237 info!(
238 "Starting cache eviction service with policy: {policy:?}, check interval: {interval}s",
239 policy = self.config.policy,
240 interval = self.config.check_interval.num_seconds()
241 );
242
243 let mut interval_timer = interval(TokioDuration::from_secs(
244 self.config.check_interval.num_seconds() as u64,
245 ));
246
247 loop {
248 tokio::select! {
249 _ = interval_timer.tick() => {
250 if let Err(e) = self.run_eviction_cycle().await {
251 error!("Error during cache eviction cycle: {e}", e = e);
252 }
253 }
254 _ = &mut shutdown_receiver => {
255 info!("Cache eviction service received shutdown signal");
256 break;
257 }
258 }
259 }
260
261 info!("Cache eviction service stopped");
262 Ok(())
263 }
264
265 async fn run_eviction_cycle(
267 &self,
268 ) -> Result<EvictionResult, Box<dyn std::error::Error + Send + Sync>> {
269 debug!("Starting cache eviction cycle");
270
271 let models = self.registry.get_models_by_last_used(None).await?;
273 debug!(
274 "Found {total_models} total models in database",
275 total_models = models.len()
276 );
277
278 let models_to_evict = match &self.config.policy {
280 EvictionPolicyType::Lru(_) => {
281 let lru_policy = LruEvictionPolicy;
282 lru_policy
283 .select_for_eviction(&models, &self.config)
284 .await?
285 }
286 };
287
288 let evicted_count = models_to_evict.len() as u32;
289
290 if evicted_count == 0 {
291 debug!("No models selected for eviction");
292 return Ok(EvictionResult {
293 evicted_count: 0,
294 evicted_models: Vec::new(),
295 bytes_freed: None,
296 reason: EvictionReason::TimeThreshold,
297 });
298 }
299
300 info!(
301 "Evicting {evicted_count} models: {models:?}",
302 evicted_count = evicted_count,
303 models = models_to_evict
304 );
305
306 let mut successfully_evicted = Vec::new();
308 for model_name in &models_to_evict {
309 match self.evict_model(model_name).await {
310 Ok(()) => {
311 successfully_evicted.push(model_name.clone());
312 info!(
313 "Successfully evicted model: {model_name}",
314 model_name = model_name
315 );
316 }
317 Err(e) => {
318 warn!(
319 "Failed to evict model '{model_name}': {e}",
320 model_name = model_name,
321 e = e
322 );
323 }
324 }
325 }
326
327 let result = EvictionResult {
328 evicted_count: successfully_evicted.len() as u32,
329 evicted_models: successfully_evicted,
330 bytes_freed: None, reason: EvictionReason::TimeThreshold,
332 };
333
334 if result.evicted_count > 0 {
335 info!(
336 "Cache eviction cycle completed: {evicted_count} models evicted",
337 evicted_count = result.evicted_count
338 );
339 } else {
340 debug!("Cache eviction cycle completed: no models evicted");
341 }
342
343 Ok(result)
344 }
345
346 async fn evict_model(
348 &self,
349 model_name: &str,
350 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
351 let record = self
353 .registry
354 .get_model_record(model_name)
355 .await?
356 .ok_or_else(|| format!("model '{model_name}' not found in registry"))?;
357
358 let provider = get_provider(record.provider);
361 provider
362 .delete_model(model_name, self.cache_directory.clone())
363 .await
364 .map_err(|e| format!("failed to delete model files for '{model_name}': {e}"))?;
365
366 self.registry.delete_model(model_name).await?;
368
369 Ok(())
370 }
371
372 pub async fn manual_evict(
374 &self,
375 model_names: &[String],
376 ) -> Result<EvictionResult, Box<dyn std::error::Error + Send + Sync>> {
377 info!(
378 "Manual eviction requested for models: {models:?}",
379 models = model_names
380 );
381
382 let mut successfully_evicted = Vec::new();
383 for model_name in model_names {
384 match self.evict_model(model_name).await {
385 Ok(()) => {
386 successfully_evicted.push(model_name.clone());
387 info!(
388 "Successfully evicted model: {model_name}",
389 model_name = model_name
390 );
391 }
392 Err(e) => {
393 warn!(
394 "Failed to evict model '{model_name}': {e}",
395 model_name = model_name,
396 e = e
397 );
398 }
399 }
400 }
401
402 Ok(EvictionResult {
403 evicted_count: successfully_evicted.len() as u32,
404 evicted_models: successfully_evicted,
405 bytes_freed: None,
406 reason: EvictionReason::Manual,
407 })
408 }
409
410 pub async fn get_cache_stats(
412 &self,
413 ) -> Result<CacheStats, Box<dyn std::error::Error + Send + Sync>> {
414 let models = self.registry.get_models_by_last_used(None).await?;
415 let (downloading, downloaded, error) = self.registry.get_status_counts().await?;
416
417 let _now = Utc::now();
418 let mut oldest_model: Option<DateTime<Utc>> = None;
419 let mut newest_model: Option<DateTime<Utc>> = None;
420
421 for model in &models {
422 if model.status == ModelStatus::DOWNLOADED {
423 if oldest_model.is_none_or(|oldest| model.last_used_at < oldest) {
424 oldest_model = Some(model.last_used_at);
425 }
426 if newest_model.is_none_or(|newest| model.last_used_at > newest) {
427 newest_model = Some(model.last_used_at);
428 }
429 }
430 }
431
432 Ok(CacheStats {
433 total_models: models.len() as u32,
434 downloading_models: downloading,
435 downloaded_models: downloaded,
436 error_models: error,
437 oldest_model_last_used: oldest_model,
438 newest_model_last_used: newest_model,
439 })
440 }
441}
442
/// Snapshot of cache contents returned by `CacheEvictionService::get_cache_stats`.
#[derive(Debug, Clone, Serialize)]
pub struct CacheStats {
    /// Number of model records returned by the registry.
    pub total_models: u32,
    /// Downloading count as reported by the registry's status counts.
    pub downloading_models: u32,
    /// Downloaded count as reported by the registry's status counts.
    pub downloaded_models: u32,
    /// Error-status count as reported by the registry's status counts.
    pub error_models: u32,
    /// Earliest `last_used_at` among downloaded models, or `None` if there are none.
    pub oldest_model_last_used: Option<DateTime<Utc>>,
    /// Latest `last_used_at` among downloaded models, or `None` if there are none.
    pub newest_model_last_used: Option<DateTime<Utc>>,
}
453
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;
    use crate::registry::backend::MockRegistryBackend;
    use modelexpress_common::models::ModelProvider;
    use tempfile::TempDir;

    /// Builds a service backed by `mock` with a fresh temporary cache directory.
    /// The `TempDir` is returned so it is not deleted while the service is in use.
    fn service_with_mock(
        mock: MockRegistryBackend,
        config: CacheEvictionConfig,
    ) -> (CacheEvictionService, TempDir) {
        let registry = Arc::new(RegistryManager::with_backend(Arc::new(mock)));
        let cache_dir = TempDir::new().expect("Failed to create cache directory");
        let cache_path = cache_dir.path().to_path_buf();
        (CacheEvictionService::new(registry, config, cache_path), cache_dir)
    }

    /// HuggingFace model record whose timestamps are whole days before `now`.
    fn record(
        name: &str,
        status: ModelStatus,
        now: DateTime<Utc>,
        created_days_ago: i64,
        last_used_days_ago: i64,
    ) -> ModelRecord {
        ModelRecord {
            model_name: name.to_string(),
            provider: ModelProvider::HuggingFace,
            status,
            created_at: now - Duration::days(created_days_ago),
            last_used_at: now - Duration::days(last_used_days_ago),
            message: None,
        }
    }

    /// Enabled LRU config with an hourly check and no disk-space limit.
    fn lru_eviction_config(unused_threshold: Duration, max_models: Option<u32>) -> CacheEvictionConfig {
        CacheEvictionConfig {
            enabled: true,
            policy: EvictionPolicyType::Lru(LruConfig {
                unused_threshold: DurationConfig::new(unused_threshold),
                max_models,
                min_free_space_bytes: None,
            }),
            check_interval: DurationConfig::hours(1),
        }
    }

    #[test]
    fn test_default_config() {
        let CacheEvictionConfig {
            enabled,
            policy,
            check_interval,
        } = CacheEvictionConfig::default();
        assert!(enabled);
        assert_eq!(check_interval.num_seconds(), 3600);
        assert!(matches!(policy, EvictionPolicyType::Lru(_)));
    }

    #[test]
    fn test_lru_config_defaults() {
        let defaults = LruConfig::default();
        assert_eq!(defaults.unused_threshold.num_seconds(), 7 * 24 * 3600);
        assert!(defaults.max_models.is_none());
        assert!(defaults.min_free_space_bytes.is_none());
    }

    #[test]
    fn test_duration_config_parsing() {
        use modelexpress_common::config::parse_duration_string;

        // String-form and integer-seconds form must both deserialize.
        let config_cases = [
            (
                r#"{"enabled": true, "policy": {"type": "lru", "unused_threshold": "7d"}, "check_interval": "2h"}"#,
                2 * 3600,
            ),
            (
                r#"{"enabled": true, "policy": {"type": "lru", "unused_threshold": 604800}, "check_interval": 1800}"#,
                1800,
            ),
        ];
        for (json, expected_secs) in config_cases {
            let config: CacheEvictionConfig =
                serde_json::from_str(json).expect("Failed to parse config");
            assert_eq!(config.check_interval.num_seconds(), expected_secs);
        }

        let string_cases = [
            ("30m", 30 * 60),
            ("45s", 45),
            ("1d", 24 * 3600),
            ("2h30m", 2 * 3600 + 30 * 60),
        ];
        for (input, expected_secs) in string_cases {
            let parsed = parse_duration_string(input)
                .unwrap_or_else(|e| panic!("Failed to parse {input}: {e:?}"));
            assert_eq!(parsed.num_seconds(), expected_secs);
        }
    }

    #[test]
    fn test_is_time_expired() {
        let now = Utc::now();
        let stale = record("old-model", ModelStatus::DOWNLOADED, now, 10, 8);
        let fresh = record("recent-model", ModelStatus::DOWNLOADED, now, 6, 5);
        let threshold = DurationConfig::new(Duration::days(7));

        assert!(LruEvictionPolicy::is_time_expired(&stale, &threshold));
        assert!(!LruEvictionPolicy::is_time_expired(&fresh, &threshold));
    }

    #[tokio::test]
    async fn test_lru_eviction_policy_time_based() {
        let now = Utc::now();
        let models = vec![
            record("old-model", ModelStatus::DOWNLOADED, now, 10, 8),
            record("recent-model", ModelStatus::DOWNLOADED, now, 6, 5),
            // Old but still downloading: must never be selected.
            record("downloading-model", ModelStatus::DOWNLOADING, now, 10, 8),
        ];
        let config = lru_eviction_config(Duration::days(7), None);

        let policy = LruEvictionPolicy;
        let evicted = policy
            .select_for_eviction(&models, &config)
            .await
            .expect("Failed to select models for eviction");

        assert_eq!(evicted, ["old-model"]);
    }

    #[tokio::test]
    async fn test_lru_eviction_policy_count_based() {
        let now = Utc::now();
        let models = vec![
            record("model1", ModelStatus::DOWNLOADED, now, 3, 3),
            record("model2", ModelStatus::DOWNLOADED, now, 2, 2),
            record("model3", ModelStatus::DOWNLOADED, now, 1, 1),
        ];
        // Threshold far beyond every model's age, so only the count limit applies.
        let config = lru_eviction_config(Duration::days(30), Some(2));

        let policy = LruEvictionPolicy;
        let evicted = policy
            .select_for_eviction(&models, &config)
            .await
            .expect("Failed to select models for eviction");

        assert_eq!(evicted, ["model1"]);
    }

    #[tokio::test]
    async fn test_cache_eviction_service_creation() {
        let (service, _cache_dir) =
            service_with_mock(MockRegistryBackend::new(), CacheEvictionConfig::default());
        assert!(service.config.enabled);
    }

    #[tokio::test]
    async fn test_get_cache_stats_uses_registry() {
        let now = Utc::now();
        let mut mock = MockRegistryBackend::new();
        mock.expect_get_models_by_last_used()
            .once()
            .returning(move |_| Ok(vec![record("model1", ModelStatus::DOWNLOADED, now, 0, 0)]));
        mock.expect_get_status_counts()
            .once()
            .returning(|| Ok((1, 1, 1)));

        let (service, _cache_dir) = service_with_mock(mock, CacheEvictionConfig::default());
        let stats = service.get_cache_stats().await.expect("stats");

        assert_eq!(stats.total_models, 1);
        assert_eq!(stats.downloaded_models, 1);
        assert_eq!(stats.downloading_models, 1);
        assert_eq!(stats.error_models, 1);
    }
}