Skip to main content

modelexpress_server/
cache.rs

1// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use chrono::{DateTime, Duration, Utc};
5use serde::{Deserialize, Serialize};
6use tokio::time::{Duration as TokioDuration, interval};
7use tracing::{debug, error, info, warn};
8
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use crate::registry::backend::ModelRecord;
13use crate::registry::state::RegistryManager;
14use modelexpress_common::config::DurationConfig;
15use modelexpress_common::download::get_provider;
16use modelexpress_common::models::ModelStatus;
17
18/// Configuration for cache eviction policies
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct CacheEvictionConfig {
21    /// Whether cache eviction is enabled
22    pub enabled: bool,
23    /// The eviction policy to use
24    pub policy: EvictionPolicyType,
25    /// How often to run the eviction process (accepts duration strings like "2h", "30m", "45s")
26    pub check_interval: DurationConfig,
27}
28
29impl Default for CacheEvictionConfig {
30    fn default() -> Self {
31        Self {
32            enabled: true,
33            policy: EvictionPolicyType::Lru(LruConfig::default()),
34            check_interval: DurationConfig::hours(1), // Default: check every hour
35        }
36    }
37}
38
39/// Available cache eviction policies
40#[derive(Debug, Clone, Serialize, Deserialize)]
41#[serde(tag = "type", rename_all = "lowercase")]
42pub enum EvictionPolicyType {
43    /// Least Recently Used policy
44    Lru(LruConfig),
45}
46
47/// Configuration for LRU eviction policy
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct LruConfig {
50    /// Time threshold before an unused model is eligible for removal
51    pub unused_threshold: DurationConfig,
52    /// Maximum number of models to keep (None = no limit based on count)
53    pub max_models: Option<u32>,
54    /// Minimum free disk space to maintain (in bytes, None = no disk space checks)
55    pub min_free_space_bytes: Option<u64>,
56}
57
58impl Default for LruConfig {
59    fn default() -> Self {
60        Self {
61            unused_threshold: DurationConfig::new(Duration::days(7)), // Default: 7 days
62            max_models: None,
63            min_free_space_bytes: None,
64        }
65    }
66}
67
68/// Result of a cache eviction operation
69#[derive(Debug, Clone)]
70pub struct EvictionResult {
71    /// Number of models that were evicted
72    pub evicted_count: u32,
73    /// List of model names that were evicted
74    pub evicted_models: Vec<String>,
75    /// Total size freed (if available)
76    pub bytes_freed: Option<u64>,
77    /// Reason for eviction
78    pub reason: EvictionReason,
79}
80
/// Why an eviction run removed models.
#[derive(Debug, Clone)]
pub enum EvictionReason {
    /// Models went unused for longer than the configured time threshold.
    TimeThreshold,
    /// The number of models exceeded the configured count limit.
    CountLimit,
    /// Free disk space was insufficient.
    DiskSpace,
    /// Eviction was requested manually.
    Manual,
}
93
94/// Trait for implementing different eviction policies
95#[async_trait::async_trait]
96pub trait EvictionPolicyTrait {
97    /// Determine which models should be evicted based on the policy
98    async fn select_for_eviction(
99        &self,
100        models: &[ModelRecord],
101        config: &CacheEvictionConfig,
102    ) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>>;
103}
104
105/// LRU (Least Recently Used) eviction policy implementation
106pub struct LruEvictionPolicy;
107
108impl LruEvictionPolicy {
109    /// Check if a model should be evicted based on time threshold
110    fn is_time_expired(model: &ModelRecord, threshold: &DurationConfig) -> bool {
111        let threshold_duration = threshold.as_chrono_duration();
112        let cutoff_time = match Utc::now().checked_sub_signed(threshold_duration) {
113            Some(time) => time,
114            None => Utc::now(),
115        };
116        model.last_used_at < cutoff_time
117    }
118
119    /// Get disk space information for the models directory
120    async fn get_disk_space_info() -> Option<(u64, u64)> {
121        // This is a placeholder - in a real implementation you would:
122        // 1. Check the actual models directory path
123        // 2. Use statvfs or similar to get actual disk space
124        // For now, we'll return None to indicate disk space checking is not implemented
125        None
126    }
127}
128
129#[async_trait::async_trait]
130impl EvictionPolicyTrait for LruEvictionPolicy {
131    async fn select_for_eviction(
132        &self,
133        models: &[ModelRecord],
134        config: &CacheEvictionConfig,
135    ) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
136        let EvictionPolicyType::Lru(lru_config) = &config.policy;
137
138        let mut candidates_for_eviction = Vec::new();
139
140        // Filter models that are eligible for eviction (only DOWNLOADED models)
141        let downloaded_models: Vec<&ModelRecord> = models
142            .iter()
143            .filter(|model| model.status == ModelStatus::DOWNLOADED)
144            .collect();
145
146        debug!(
147            "Evaluating {downloaded_count} downloaded models for eviction",
148            downloaded_count = downloaded_models.len()
149        );
150
151        // 1. Check time-based eviction
152        for model in &downloaded_models {
153            if Self::is_time_expired(model, &lru_config.unused_threshold) {
154                debug!(
155                    "Model '{model_name}' is expired (last used: {last_used_at})",
156                    model_name = model.model_name,
157                    last_used_at = model.last_used_at
158                );
159                candidates_for_eviction.push(model.model_name.clone());
160            }
161        }
162
163        // 2. Check count-based eviction
164        if let Some(max_models) = lru_config.max_models {
165            let models_to_remove_by_count =
166                downloaded_models.len().saturating_sub(max_models as usize);
167            if models_to_remove_by_count > 0 {
168                debug!(
169                    "Need to remove {models_to_remove_by_count} models due to count limit (have: {downloaded_count}, max: {max_models})",
170                    models_to_remove_by_count = models_to_remove_by_count,
171                    downloaded_count = downloaded_models.len(),
172                    max_models = max_models
173                );
174
175                // Sort by last_used_at (oldest first) and take the oldest models
176                let mut sorted_models = downloaded_models.clone();
177                sorted_models.sort_by_key(|model| model.last_used_at);
178
179                for model in sorted_models.iter().take(models_to_remove_by_count) {
180                    if !candidates_for_eviction.contains(&model.model_name) {
181                        candidates_for_eviction.push(model.model_name.clone());
182                    }
183                }
184            }
185        }
186
187        // 3. Check disk space-based eviction (if configured and implemented)
188        if let Some(_min_free_space) = lru_config.min_free_space_bytes
189            && let Some((_total_space, _free_space)) = Self::get_disk_space_info().await
190        {
191            // This is where we would implement disk space checking
192            // For now, we'll log that it's not implemented
193            debug!("Disk space checking is not yet implemented");
194        }
195
196        debug!(
197            "Selected {evicted_count} models for eviction: {candidates:?}",
198            evicted_count = candidates_for_eviction.len(),
199            candidates = candidates_for_eviction
200        );
201
202        Ok(candidates_for_eviction)
203    }
204}
205
206/// Background service that manages cache eviction
207pub struct CacheEvictionService {
208    registry: Arc<RegistryManager>,
209    config: CacheEvictionConfig,
210    cache_directory: PathBuf,
211}
212
213impl CacheEvictionService {
214    /// Create a new cache eviction service
215    pub fn new(
216        registry: Arc<RegistryManager>,
217        config: CacheEvictionConfig,
218        cache_directory: PathBuf,
219    ) -> Self {
220        Self {
221            registry,
222            config,
223            cache_directory,
224        }
225    }
226
227    /// Start the background eviction service
228    pub async fn start(
229        self,
230        mut shutdown_receiver: tokio::sync::oneshot::Receiver<()>,
231    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
232        if !self.config.enabled {
233            info!("Cache eviction service is disabled");
234            return Ok(());
235        }
236
237        info!(
238            "Starting cache eviction service with policy: {policy:?}, check interval: {interval}s",
239            policy = self.config.policy,
240            interval = self.config.check_interval.num_seconds()
241        );
242
243        let mut interval_timer = interval(TokioDuration::from_secs(
244            self.config.check_interval.num_seconds() as u64,
245        ));
246
247        loop {
248            tokio::select! {
249                _ = interval_timer.tick() => {
250                    if let Err(e) = self.run_eviction_cycle().await {
251                        error!("Error during cache eviction cycle: {e}", e = e);
252                    }
253                }
254                _ = &mut shutdown_receiver => {
255                    info!("Cache eviction service received shutdown signal");
256                    break;
257                }
258            }
259        }
260
261        info!("Cache eviction service stopped");
262        Ok(())
263    }
264
265    /// Run a single eviction cycle
266    async fn run_eviction_cycle(
267        &self,
268    ) -> Result<EvictionResult, Box<dyn std::error::Error + Send + Sync>> {
269        debug!("Starting cache eviction cycle");
270
271        // Get all models from the database
272        let models = self.registry.get_models_by_last_used(None).await?;
273        debug!(
274            "Found {total_models} total models in database",
275            total_models = models.len()
276        );
277
278        // Select models for eviction based on the configured policy
279        let models_to_evict = match &self.config.policy {
280            EvictionPolicyType::Lru(_) => {
281                let lru_policy = LruEvictionPolicy;
282                lru_policy
283                    .select_for_eviction(&models, &self.config)
284                    .await?
285            }
286        };
287
288        let evicted_count = models_to_evict.len() as u32;
289
290        if evicted_count == 0 {
291            debug!("No models selected for eviction");
292            return Ok(EvictionResult {
293                evicted_count: 0,
294                evicted_models: Vec::new(),
295                bytes_freed: None,
296                reason: EvictionReason::TimeThreshold,
297            });
298        }
299
300        info!(
301            "Evicting {evicted_count} models: {models:?}",
302            evicted_count = evicted_count,
303            models = models_to_evict
304        );
305
306        // Remove models from the database and filesystem
307        let mut successfully_evicted = Vec::new();
308        for model_name in &models_to_evict {
309            match self.evict_model(model_name).await {
310                Ok(()) => {
311                    successfully_evicted.push(model_name.clone());
312                    info!(
313                        "Successfully evicted model: {model_name}",
314                        model_name = model_name
315                    );
316                }
317                Err(e) => {
318                    warn!(
319                        "Failed to evict model '{model_name}': {e}",
320                        model_name = model_name,
321                        e = e
322                    );
323                }
324            }
325        }
326
327        let result = EvictionResult {
328            evicted_count: successfully_evicted.len() as u32,
329            evicted_models: successfully_evicted,
330            bytes_freed: None, // Could be implemented with actual file size tracking
331            reason: EvictionReason::TimeThreshold,
332        };
333
334        if result.evicted_count > 0 {
335            info!(
336                "Cache eviction cycle completed: {evicted_count} models evicted",
337                evicted_count = result.evicted_count
338            );
339        } else {
340            debug!("Cache eviction cycle completed: no models evicted");
341        }
342
343        Ok(result)
344    }
345
346    /// Evict a single model (remove from filesystem, then from database)
347    async fn evict_model(
348        &self,
349        model_name: &str,
350    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
351        // Look up the model record to determine the provider
352        let record = self
353            .registry
354            .get_model_record(model_name)
355            .await?
356            .ok_or_else(|| format!("model '{model_name}' not found in registry"))?;
357
358        // Delete files from disk first. If this fails, we keep the registry record
359        // so the next eviction cycle can retry.
360        let provider = get_provider(record.provider);
361        provider
362            .delete_model(model_name, self.cache_directory.clone())
363            .await
364            .map_err(|e| format!("failed to delete model files for '{model_name}': {e}"))?;
365
366        // Only remove from registry after successful filesystem deletion
367        self.registry.delete_model(model_name).await?;
368
369        Ok(())
370    }
371
372    /// Manually trigger eviction for specific models
373    pub async fn manual_evict(
374        &self,
375        model_names: &[String],
376    ) -> Result<EvictionResult, Box<dyn std::error::Error + Send + Sync>> {
377        info!(
378            "Manual eviction requested for models: {models:?}",
379            models = model_names
380        );
381
382        let mut successfully_evicted = Vec::new();
383        for model_name in model_names {
384            match self.evict_model(model_name).await {
385                Ok(()) => {
386                    successfully_evicted.push(model_name.clone());
387                    info!(
388                        "Successfully evicted model: {model_name}",
389                        model_name = model_name
390                    );
391                }
392                Err(e) => {
393                    warn!(
394                        "Failed to evict model '{model_name}': {e}",
395                        model_name = model_name,
396                        e = e
397                    );
398                }
399            }
400        }
401
402        Ok(EvictionResult {
403            evicted_count: successfully_evicted.len() as u32,
404            evicted_models: successfully_evicted,
405            bytes_freed: None,
406            reason: EvictionReason::Manual,
407        })
408    }
409
410    /// Get statistics about the current cache state
411    pub async fn get_cache_stats(
412        &self,
413    ) -> Result<CacheStats, Box<dyn std::error::Error + Send + Sync>> {
414        let models = self.registry.get_models_by_last_used(None).await?;
415        let (downloading, downloaded, error) = self.registry.get_status_counts().await?;
416
417        let _now = Utc::now();
418        let mut oldest_model: Option<DateTime<Utc>> = None;
419        let mut newest_model: Option<DateTime<Utc>> = None;
420
421        for model in &models {
422            if model.status == ModelStatus::DOWNLOADED {
423                if oldest_model.is_none_or(|oldest| model.last_used_at < oldest) {
424                    oldest_model = Some(model.last_used_at);
425                }
426                if newest_model.is_none_or(|newest| model.last_used_at > newest) {
427                    newest_model = Some(model.last_used_at);
428                }
429            }
430        }
431
432        Ok(CacheStats {
433            total_models: models.len() as u32,
434            downloading_models: downloading,
435            downloaded_models: downloaded,
436            error_models: error,
437            oldest_model_last_used: oldest_model,
438            newest_model_last_used: newest_model,
439        })
440    }
441}
442
443/// Statistics about the current cache state
444#[derive(Debug, Clone, Serialize)]
445pub struct CacheStats {
446    pub total_models: u32,
447    pub downloading_models: u32,
448    pub downloaded_models: u32,
449    pub error_models: u32,
450    pub oldest_model_last_used: Option<DateTime<Utc>>,
451    pub newest_model_last_used: Option<DateTime<Utc>>,
452}
453
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;
    use crate::registry::backend::MockRegistryBackend;
    use modelexpress_common::models::ModelProvider;
    use tempfile::TempDir;

    /// Build a service on top of `mock` with a fresh temporary cache directory.
    ///
    /// The `TempDir` is returned so the directory stays alive as long as the caller
    /// holds it.
    fn service_with_mock(
        mock: MockRegistryBackend,
        config: CacheEvictionConfig,
    ) -> (CacheEvictionService, TempDir) {
        let cache_dir = TempDir::new().expect("Failed to create cache directory");
        let backend = Arc::new(mock);
        let registry = Arc::new(RegistryManager::with_backend(backend));
        let cache_path = cache_dir.path().to_path_buf();
        (
            CacheEvictionService::new(registry, config, cache_path),
            cache_dir,
        )
    }

    /// Build a HuggingFace model record whose timestamps lie the given number of days
    /// in the past.
    fn record(
        name: &str,
        status: ModelStatus,
        created_days_ago: i64,
        used_days_ago: i64,
    ) -> ModelRecord {
        let now = Utc::now();
        ModelRecord {
            model_name: name.to_string(),
            provider: ModelProvider::HuggingFace,
            status,
            created_at: now - Duration::days(created_days_ago),
            last_used_at: now - Duration::days(used_days_ago),
            message: None,
        }
    }

    /// Build an enabled, hourly-checked LRU config with the given threshold and count
    /// limit and no disk space limit.
    fn lru_config(unused_days: i64, max_models: Option<u32>) -> CacheEvictionConfig {
        CacheEvictionConfig {
            enabled: true,
            policy: EvictionPolicyType::Lru(LruConfig {
                unused_threshold: DurationConfig::new(Duration::days(unused_days)),
                max_models,
                min_free_space_bytes: None,
            }),
            check_interval: DurationConfig::hours(1),
        }
    }

    #[test]
    fn test_default_config() {
        let defaults = CacheEvictionConfig::default();
        assert!(defaults.enabled);
        assert_eq!(defaults.check_interval.num_seconds(), 3600);
        assert!(matches!(defaults.policy, EvictionPolicyType::Lru(_)));
    }

    #[test]
    fn test_lru_config_defaults() {
        let defaults = LruConfig::default();
        assert_eq!(defaults.unused_threshold.num_seconds(), 7 * 24 * 3600);
        assert!(defaults.max_models.is_none());
        assert!(defaults.min_free_space_bytes.is_none());
    }

    #[test]
    fn test_duration_config_parsing() {
        use modelexpress_common::config::parse_duration_string;

        // Durations given as strings
        let from_strings: CacheEvictionConfig = serde_json::from_str(
            r#"{"enabled": true, "policy": {"type": "lru", "unused_threshold": "7d"}, "check_interval": "2h"}"#,
        )
        .expect("Failed to parse config");
        assert_eq!(from_strings.check_interval.num_seconds(), 2 * 3600); // 2 hours

        // Durations given as numbers (seconds)
        let from_numbers: CacheEvictionConfig = serde_json::from_str(
            r#"{"enabled": true, "policy": {"type": "lru", "unused_threshold": 604800}, "check_interval": 1800}"#,
        )
        .expect("Failed to parse config");
        assert_eq!(from_numbers.check_interval.num_seconds(), 1800); // 30 minutes

        // Assorted duration string formats
        assert_eq!(
            parse_duration_string("30m")
                .expect("Failed to parse 30m")
                .num_seconds(),
            30 * 60
        );
        assert_eq!(
            parse_duration_string("45s")
                .expect("Failed to parse 45s")
                .num_seconds(),
            45
        );
        assert_eq!(
            parse_duration_string("1d")
                .expect("Failed to parse 1d")
                .num_seconds(),
            24 * 3600
        );
        assert_eq!(
            parse_duration_string("2h30m")
                .expect("Failed to parse 2h30m")
                .num_seconds(),
            2 * 3600 + 30 * 60
        );
    }

    #[test]
    fn test_is_time_expired() {
        // Last used 8 days ago: past the 7-day threshold
        let old_model = record("old-model", ModelStatus::DOWNLOADED, 10, 8);
        // Last used 5 days ago: within the 7-day threshold
        let recent_model = record("recent-model", ModelStatus::DOWNLOADED, 6, 5);

        let threshold = DurationConfig::new(Duration::days(7));

        assert!(LruEvictionPolicy::is_time_expired(&old_model, &threshold));
        assert!(!LruEvictionPolicy::is_time_expired(
            &recent_model,
            &threshold
        ));
    }

    #[tokio::test]
    async fn test_lru_eviction_policy_time_based() {
        let models = vec![
            record("old-model", ModelStatus::DOWNLOADED, 10, 8),
            record("recent-model", ModelStatus::DOWNLOADED, 6, 5),
            record("downloading-model", ModelStatus::DOWNLOADING, 10, 8),
        ];

        // 7-day threshold, no count limit
        let config = lru_config(7, None);

        let evicted = LruEvictionPolicy
            .select_for_eviction(&models, &config)
            .await
            .expect("Failed to select models for eviction");

        // Should only evict the old downloaded model, not the downloading one
        assert_eq!(evicted.len(), 1);
        assert_eq!(evicted[0], "old-model");
    }

    #[tokio::test]
    async fn test_lru_eviction_policy_count_based() {
        let models = vec![
            record("model1", ModelStatus::DOWNLOADED, 3, 3),
            record("model2", ModelStatus::DOWNLOADED, 2, 2),
            record("model3", ModelStatus::DOWNLOADED, 1, 1),
        ];

        // 30-day threshold (none should be expired), limit of 2 models
        let config = lru_config(30, Some(2));

        let evicted = LruEvictionPolicy
            .select_for_eviction(&models, &config)
            .await
            .expect("Failed to select models for eviction");

        // Should evict the oldest model to stay within the limit of 2
        assert_eq!(evicted.len(), 1);
        assert_eq!(evicted[0], "model1");
    }

    #[tokio::test]
    async fn test_cache_eviction_service_creation() {
        let (service, _cache_dir) =
            service_with_mock(MockRegistryBackend::new(), CacheEvictionConfig::default());
        assert!(service.config.enabled);
    }

    #[tokio::test]
    async fn test_get_cache_stats_uses_registry() {
        let mut mock = MockRegistryBackend::new();
        mock.expect_get_models_by_last_used()
            .once()
            .returning(|_| Ok(vec![record("model1", ModelStatus::DOWNLOADED, 0, 0)]));
        mock.expect_get_status_counts()
            .once()
            .returning(|| Ok((1, 1, 1)));

        let (service, _cache_dir) = service_with_mock(mock, CacheEvictionConfig::default());
        let stats = service.get_cache_stats().await.expect("stats");

        assert_eq!(stats.total_models, 1);
        assert_eq!(stats.downloaded_models, 1);
        assert_eq!(stats.downloading_models, 1);
        assert_eq!(stats.error_models, 1);
    }
}