// modelexpress_server/cache.rs

// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use tokio::time::{Duration as TokioDuration, interval};
use tracing::{debug, error, info, warn};

use crate::database::{ModelDatabase, ModelRecord};
use modelexpress_common::config::DurationConfig;
use modelexpress_common::models::ModelStatus;

13/// Configuration for cache eviction policies
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct CacheEvictionConfig {
16    /// Whether cache eviction is enabled
17    pub enabled: bool,
18    /// The eviction policy to use
19    pub policy: EvictionPolicyType,
20    /// How often to run the eviction process (accepts duration strings like "2h", "30m", "45s")
21    pub check_interval: DurationConfig,
22}
23
24impl Default for CacheEvictionConfig {
25    fn default() -> Self {
26        Self {
27            enabled: true,
28            policy: EvictionPolicyType::Lru(LruConfig::default()),
29            check_interval: DurationConfig::hours(1), // Default: check every hour
30        }
31    }
32}
33
34/// Available cache eviction policies
35#[derive(Debug, Clone, Serialize, Deserialize)]
36#[serde(tag = "type", rename_all = "lowercase")]
37pub enum EvictionPolicyType {
38    /// Least Recently Used policy
39    Lru(LruConfig),
40}
41
42/// Configuration for LRU eviction policy
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct LruConfig {
45    /// Time threshold before an unused model is eligible for removal
46    pub unused_threshold: DurationConfig,
47    /// Maximum number of models to keep (None = no limit based on count)
48    pub max_models: Option<u32>,
49    /// Minimum free disk space to maintain (in bytes, None = no disk space checks)
50    pub min_free_space_bytes: Option<u64>,
51}
52
53impl Default for LruConfig {
54    fn default() -> Self {
55        Self {
56            unused_threshold: DurationConfig::new(Duration::days(7)), // Default: 7 days
57            max_models: None,
58            min_free_space_bytes: None,
59        }
60    }
61}
62
63/// Result of a cache eviction operation
64#[derive(Debug, Clone)]
65pub struct EvictionResult {
66    /// Number of models that were evicted
67    pub evicted_count: u32,
68    /// List of model names that were evicted
69    pub evicted_models: Vec<String>,
70    /// Total size freed (if available)
71    pub bytes_freed: Option<u64>,
72    /// Reason for eviction
73    pub reason: EvictionReason,
74}
75
/// Why models were removed from the cache.
#[derive(Debug, Clone)]
pub enum EvictionReason {
    /// Models went unused for longer than the configured threshold.
    TimeThreshold,
    /// The cache held more models than the configured maximum.
    CountLimit,
    /// Free disk space dropped below the configured minimum.
    DiskSpace,
    /// A caller explicitly requested the eviction.
    Manual,
}

89/// Trait for implementing different eviction policies
90#[async_trait::async_trait]
91pub trait EvictionPolicyTrait {
92    /// Determine which models should be evicted based on the policy
93    async fn select_for_eviction(
94        &self,
95        models: &[ModelRecord],
96        config: &CacheEvictionConfig,
97    ) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>>;
98}
99
/// Least-recently-used eviction policy.
///
/// The type carries no state; all tuning comes from [`LruConfig`]. It derives the
/// common traits so it can be logged, copied and default-constructed like
/// other public types.
#[derive(Debug, Clone, Copy, Default)]
pub struct LruEvictionPolicy;

103impl LruEvictionPolicy {
104    /// Check if a model should be evicted based on time threshold
105    fn is_time_expired(model: &ModelRecord, threshold: &DurationConfig) -> bool {
106        let threshold_duration = threshold.as_chrono_duration();
107        let cutoff_time = match Utc::now().checked_sub_signed(threshold_duration) {
108            Some(time) => time,
109            None => Utc::now(),
110        };
111        model.last_used_at < cutoff_time
112    }
113
114    /// Get disk space information for the models directory
115    async fn get_disk_space_info() -> Option<(u64, u64)> {
116        // This is a placeholder - in a real implementation you would:
117        // 1. Check the actual models directory path
118        // 2. Use statvfs or similar to get actual disk space
119        // For now, we'll return None to indicate disk space checking is not implemented
120        None
121    }
122}
123
124#[async_trait::async_trait]
125impl EvictionPolicyTrait for LruEvictionPolicy {
126    async fn select_for_eviction(
127        &self,
128        models: &[ModelRecord],
129        config: &CacheEvictionConfig,
130    ) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
131        let EvictionPolicyType::Lru(lru_config) = &config.policy;
132
133        let mut candidates_for_eviction = Vec::new();
134
135        // Filter models that are eligible for eviction (only DOWNLOADED models)
136        let downloaded_models: Vec<&ModelRecord> = models
137            .iter()
138            .filter(|model| model.status == ModelStatus::DOWNLOADED)
139            .collect();
140
141        debug!(
142            "Evaluating {downloaded_count} downloaded models for eviction",
143            downloaded_count = downloaded_models.len()
144        );
145
146        // 1. Check time-based eviction
147        for model in &downloaded_models {
148            if Self::is_time_expired(model, &lru_config.unused_threshold) {
149                debug!(
150                    "Model '{model_name}' is expired (last used: {last_used_at})",
151                    model_name = model.model_name,
152                    last_used_at = model.last_used_at
153                );
154                candidates_for_eviction.push(model.model_name.clone());
155            }
156        }
157
158        // 2. Check count-based eviction
159        if let Some(max_models) = lru_config.max_models {
160            let models_to_remove_by_count =
161                downloaded_models.len().saturating_sub(max_models as usize);
162            if models_to_remove_by_count > 0 {
163                debug!(
164                    "Need to remove {models_to_remove_by_count} models due to count limit (have: {downloaded_count}, max: {max_models})",
165                    models_to_remove_by_count = models_to_remove_by_count,
166                    downloaded_count = downloaded_models.len(),
167                    max_models = max_models
168                );
169
170                // Sort by last_used_at (oldest first) and take the oldest models
171                let mut sorted_models = downloaded_models.clone();
172                sorted_models.sort_by_key(|model| model.last_used_at);
173
174                for model in sorted_models.iter().take(models_to_remove_by_count) {
175                    if !candidates_for_eviction.contains(&model.model_name) {
176                        candidates_for_eviction.push(model.model_name.clone());
177                    }
178                }
179            }
180        }
181
182        // 3. Check disk space-based eviction (if configured and implemented)
183        if let Some(_min_free_space) = lru_config.min_free_space_bytes
184            && let Some((_total_space, _free_space)) = Self::get_disk_space_info().await
185        {
186            // This is where we would implement disk space checking
187            // For now, we'll log that it's not implemented
188            debug!("Disk space checking is not yet implemented");
189        }
190
191        debug!(
192            "Selected {evicted_count} models for eviction: {candidates:?}",
193            evicted_count = candidates_for_eviction.len(),
194            candidates = candidates_for_eviction
195        );
196
197        Ok(candidates_for_eviction)
198    }
199}
200
201/// Background service that manages cache eviction
202pub struct CacheEvictionService {
203    database: ModelDatabase,
204    config: CacheEvictionConfig,
205}
206
207impl CacheEvictionService {
208    /// Create a new cache eviction service
209    pub fn new(database: ModelDatabase, config: CacheEvictionConfig) -> Self {
210        Self { database, config }
211    }
212
213    /// Start the background eviction service
214    pub async fn start(
215        self,
216        mut shutdown_receiver: tokio::sync::oneshot::Receiver<()>,
217    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
218        if !self.config.enabled {
219            info!("Cache eviction service is disabled");
220            return Ok(());
221        }
222
223        info!(
224            "Starting cache eviction service with policy: {policy:?}, check interval: {interval}s",
225            policy = self.config.policy,
226            interval = self.config.check_interval.num_seconds()
227        );
228
229        let mut interval_timer = interval(TokioDuration::from_secs(
230            self.config.check_interval.num_seconds() as u64,
231        ));
232
233        loop {
234            tokio::select! {
235                _ = interval_timer.tick() => {
236                    if let Err(e) = self.run_eviction_cycle().await {
237                        error!("Error during cache eviction cycle: {e}", e = e);
238                    }
239                }
240                _ = &mut shutdown_receiver => {
241                    info!("Cache eviction service received shutdown signal");
242                    break;
243                }
244            }
245        }
246
247        info!("Cache eviction service stopped");
248        Ok(())
249    }
250
251    /// Run a single eviction cycle
252    async fn run_eviction_cycle(
253        &self,
254    ) -> Result<EvictionResult, Box<dyn std::error::Error + Send + Sync>> {
255        debug!("Starting cache eviction cycle");
256
257        // Get all models from the database
258        let models = self.database.get_models_by_last_used(None)?;
259        debug!(
260            "Found {total_models} total models in database",
261            total_models = models.len()
262        );
263
264        // Select models for eviction based on the configured policy
265        let models_to_evict = match &self.config.policy {
266            EvictionPolicyType::Lru(_) => {
267                let lru_policy = LruEvictionPolicy;
268                lru_policy
269                    .select_for_eviction(&models, &self.config)
270                    .await?
271            }
272        };
273
274        let evicted_count = models_to_evict.len() as u32;
275
276        if evicted_count == 0 {
277            debug!("No models selected for eviction");
278            return Ok(EvictionResult {
279                evicted_count: 0,
280                evicted_models: Vec::new(),
281                bytes_freed: None,
282                reason: EvictionReason::TimeThreshold,
283            });
284        }
285
286        info!(
287            "Evicting {evicted_count} models: {models:?}",
288            evicted_count = evicted_count,
289            models = models_to_evict
290        );
291
292        // Remove models from the database and filesystem
293        let mut successfully_evicted = Vec::new();
294        for model_name in &models_to_evict {
295            match self.evict_model(model_name).await {
296                Ok(()) => {
297                    successfully_evicted.push(model_name.clone());
298                    info!(
299                        "Successfully evicted model: {model_name}",
300                        model_name = model_name
301                    );
302                }
303                Err(e) => {
304                    warn!(
305                        "Failed to evict model '{model_name}': {e}",
306                        model_name = model_name,
307                        e = e
308                    );
309                }
310            }
311        }
312
313        let result = EvictionResult {
314            evicted_count: successfully_evicted.len() as u32,
315            evicted_models: successfully_evicted,
316            bytes_freed: None, // Could be implemented with actual file size tracking
317            reason: EvictionReason::TimeThreshold,
318        };
319
320        if result.evicted_count > 0 {
321            info!(
322                "Cache eviction cycle completed: {evicted_count} models evicted",
323                evicted_count = result.evicted_count
324            );
325        } else {
326            debug!("Cache eviction cycle completed: no models evicted");
327        }
328
329        Ok(result)
330    }
331
332    /// Evict a single model (remove from database and filesystem)
333    async fn evict_model(
334        &self,
335        model_name: &str,
336    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
337        // Remove from database first
338        self.database.delete_model(model_name)?;
339
340        // Remove from filesystem
341        // This is where you would implement actual file removal
342        // For now, we'll just log the action since the download module
343        // would need to be consulted for the actual file paths
344        debug!(
345            "Would remove model files for: {model_name}",
346            model_name = model_name
347        );
348
349        // In a real implementation, you would:
350        // 1. Get the model file path from the download module
351        // 2. Remove the model files from disk
352        // 3. Update any in-memory caches
353
354        Ok(())
355    }
356
357    /// Manually trigger eviction for specific models
358    pub async fn manual_evict(
359        &self,
360        model_names: &[String],
361    ) -> Result<EvictionResult, Box<dyn std::error::Error + Send + Sync>> {
362        info!(
363            "Manual eviction requested for models: {models:?}",
364            models = model_names
365        );
366
367        let mut successfully_evicted = Vec::new();
368        for model_name in model_names {
369            match self.evict_model(model_name).await {
370                Ok(()) => {
371                    successfully_evicted.push(model_name.clone());
372                    info!(
373                        "Successfully evicted model: {model_name}",
374                        model_name = model_name
375                    );
376                }
377                Err(e) => {
378                    warn!(
379                        "Failed to evict model '{model_name}': {e}",
380                        model_name = model_name,
381                        e = e
382                    );
383                }
384            }
385        }
386
387        Ok(EvictionResult {
388            evicted_count: successfully_evicted.len() as u32,
389            evicted_models: successfully_evicted,
390            bytes_freed: None,
391            reason: EvictionReason::Manual,
392        })
393    }
394
395    /// Get statistics about the current cache state
396    pub async fn get_cache_stats(
397        &self,
398    ) -> Result<CacheStats, Box<dyn std::error::Error + Send + Sync>> {
399        let models = self.database.get_models_by_last_used(None)?;
400        let (downloading, downloaded, error) = self.database.get_status_counts()?;
401
402        let _now = Utc::now();
403        let mut oldest_model: Option<DateTime<Utc>> = None;
404        let mut newest_model: Option<DateTime<Utc>> = None;
405
406        for model in &models {
407            if model.status == ModelStatus::DOWNLOADED {
408                if oldest_model.is_none_or(|oldest| model.last_used_at < oldest) {
409                    oldest_model = Some(model.last_used_at);
410                }
411                if newest_model.is_none_or(|newest| model.last_used_at > newest) {
412                    newest_model = Some(model.last_used_at);
413                }
414            }
415        }
416
417        Ok(CacheStats {
418            total_models: models.len() as u32,
419            downloading_models: downloading,
420            downloaded_models: downloaded,
421            error_models: error,
422            oldest_model_last_used: oldest_model,
423            newest_model_last_used: newest_model,
424        })
425    }
426}
427
428/// Statistics about the current cache state
429#[derive(Debug, Clone, Serialize)]
430pub struct CacheStats {
431    pub total_models: u32,
432    pub downloading_models: u32,
433    pub downloaded_models: u32,
434    pub error_models: u32,
435    pub oldest_model_last_used: Option<DateTime<Utc>>,
436    pub newest_model_last_used: Option<DateTime<Utc>>,
437}
438
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    //! Unit tests for configuration defaults and parsing, the LRU policy, and
    //! the eviction service. Each service test uses a throwaway on-disk database.

    use super::*;
    use crate::database::ModelDatabase;
    use modelexpress_common::models::ModelProvider;
    use tempfile::TempDir;

    /// Opens a fresh database inside a temporary directory.
    ///
    /// The `TempDir` is returned so the caller keeps the directory alive.
    fn create_test_database() -> (ModelDatabase, TempDir) {
        let temp_dir = TempDir::new().expect("Failed to create temporary directory");
        let db_file = temp_dir.path().join("test_models.db");
        let db_path = db_file.to_str().expect("Invalid path");
        let db = ModelDatabase::new(db_path).expect("Failed to create test database");
        (db, temp_dir)
    }

    /// Builds a HuggingFace `ModelRecord` whose timestamps are given in days before `now`.
    fn record_days_ago(
        name: &str,
        status: ModelStatus,
        now: DateTime<Utc>,
        created_days_ago: i64,
        last_used_days_ago: i64,
    ) -> ModelRecord {
        ModelRecord {
            model_name: name.to_string(),
            provider: ModelProvider::HuggingFace,
            status,
            created_at: now - Duration::days(created_days_ago),
            last_used_at: now - Duration::days(last_used_days_ago),
            message: None,
        }
    }

    /// Enabled LRU config with an hourly check and no disk-space floor.
    fn lru_test_config(unused_threshold_days: i64, max_models: Option<u32>) -> CacheEvictionConfig {
        CacheEvictionConfig {
            enabled: true,
            policy: EvictionPolicyType::Lru(LruConfig {
                unused_threshold: DurationConfig::new(Duration::days(unused_threshold_days)),
                max_models,
                min_free_space_bytes: None,
            }),
            check_interval: DurationConfig::hours(1),
        }
    }

    #[test]
    fn test_default_config() {
        let defaults = CacheEvictionConfig::default();
        assert!(defaults.enabled);
        assert_eq!(defaults.check_interval.num_seconds(), 3600);
        assert!(matches!(defaults.policy, EvictionPolicyType::Lru(_)));
    }

    #[test]
    fn test_lru_config_defaults() {
        let LruConfig {
            unused_threshold,
            max_models,
            min_free_space_bytes,
        } = LruConfig::default();
        assert_eq!(unused_threshold.num_seconds(), 7 * 24 * 3600);
        assert!(max_models.is_none());
        assert!(min_free_space_bytes.is_none());
    }

    #[test]
    fn test_duration_config_parsing() {
        use modelexpress_common::config::parse_duration_string;

        // Durations written as strings.
        let from_strings: CacheEvictionConfig = serde_json::from_str(
            r#"{"enabled": true, "policy": {"type": "lru", "unused_threshold": "7d"}, "check_interval": "2h"}"#,
        )
        .expect("Failed to parse config");
        assert_eq!(from_strings.check_interval.num_seconds(), 2 * 3600);

        // Durations written as plain seconds.
        let from_seconds: CacheEvictionConfig = serde_json::from_str(
            r#"{"enabled": true, "policy": {"type": "lru", "unused_threshold": 604800}, "check_interval": 1800}"#,
        )
        .expect("Failed to parse config");
        assert_eq!(from_seconds.check_interval.num_seconds(), 1800);

        // Individual duration-string formats.
        let cases = [
            ("30m", 30 * 60),
            ("45s", 45),
            ("1d", 24 * 3600),
            ("2h30m", 2 * 3600 + 30 * 60),
        ];
        for (input, expected_secs) in cases {
            let parsed = parse_duration_string(input).expect("Failed to parse duration string");
            assert_eq!(parsed.num_seconds(), expected_secs, "input: {input}");
        }
    }

    #[test]
    fn test_is_time_expired() {
        let now = Utc::now();
        let threshold = DurationConfig::new(Duration::days(7));

        // Last used 8 days ago: past the 7-day threshold.
        let stale = record_days_ago("old-model", ModelStatus::DOWNLOADED, now, 10, 8);
        // Last used 5 days ago: still inside the threshold.
        let fresh = record_days_ago("recent-model", ModelStatus::DOWNLOADED, now, 6, 5);

        assert!(LruEvictionPolicy::is_time_expired(&stale, &threshold));
        assert!(!LruEvictionPolicy::is_time_expired(&fresh, &threshold));
    }

    #[tokio::test]
    async fn test_lru_eviction_policy_time_based() {
        let now = Utc::now();
        let models = [
            record_days_ago("old-model", ModelStatus::DOWNLOADED, now, 10, 8),
            record_days_ago("recent-model", ModelStatus::DOWNLOADED, now, 6, 5),
            record_days_ago("downloading-model", ModelStatus::DOWNLOADING, now, 10, 8),
        ];

        let evicted = LruEvictionPolicy
            .select_for_eviction(&models, &lru_test_config(7, None))
            .await
            .expect("Failed to select models for eviction");

        // Only the stale downloaded model goes; the in-progress download is protected.
        assert_eq!(evicted, ["old-model"]);
    }

    #[tokio::test]
    async fn test_lru_eviction_policy_count_based() {
        let now = Utc::now();
        let models = [
            record_days_ago("model1", ModelStatus::DOWNLOADED, now, 3, 3),
            record_days_ago("model2", ModelStatus::DOWNLOADED, now, 2, 2),
            record_days_ago("model3", ModelStatus::DOWNLOADED, now, 1, 1),
        ];

        // 30-day threshold: nothing is time-expired, so only the cap of 2 applies.
        let evicted = LruEvictionPolicy
            .select_for_eviction(&models, &lru_test_config(30, Some(2)))
            .await
            .expect("Failed to select models for eviction");

        // The least recently used model is dropped to get back under the cap.
        assert_eq!(evicted, ["model1"]);
    }

    #[tokio::test]
    async fn test_cache_eviction_service_creation() {
        let (db, _temp_dir) = create_test_database();
        let service = CacheEvictionService::new(db, CacheEvictionConfig::default());
        assert!(service.config.enabled);
    }

    #[tokio::test]
    async fn test_manual_evict() {
        let (db, _temp_dir) = create_test_database();
        db.set_status(
            "test-model",
            ModelProvider::HuggingFace,
            ModelStatus::DOWNLOADED,
            None,
        )
        .expect("Failed to set model status");

        let service = CacheEvictionService::new(db.clone(), CacheEvictionConfig::default());
        let result = service
            .manual_evict(&["test-model".to_string()])
            .await
            .expect("Failed to manually evict models");

        assert_eq!(result.evicted_count, 1);
        assert_eq!(result.evicted_models, ["test-model"]);
        assert!(matches!(result.reason, EvictionReason::Manual));

        // The record must be gone from the database afterwards.
        let remaining = db
            .get_status("test-model")
            .expect("Failed to get model status");
        assert!(remaining.is_none());
    }

    #[tokio::test]
    async fn test_get_cache_stats() {
        let (db, _temp_dir) = create_test_database();

        // One model in each status.
        let seeded = [
            ("model1", ModelStatus::DOWNLOADED, "Failed to set model1 status"),
            ("model2", ModelStatus::DOWNLOADING, "Failed to set model2 status"),
            ("model3", ModelStatus::ERROR, "Failed to set model3 status"),
        ];
        for (name, status, failure) in seeded {
            db.set_status(name, ModelProvider::HuggingFace, status, None)
                .expect(failure);
        }

        let stats = CacheEvictionService::new(db, CacheEvictionConfig::default())
            .get_cache_stats()
            .await
            .expect("Failed to get cache stats");

        assert_eq!(stats.total_models, 3);
        assert_eq!(stats.downloaded_models, 1);
        assert_eq!(stats.downloading_models, 1);
        assert_eq!(stats.error_models, 1);
    }
}