#![allow(dead_code, unused_imports, unused_variables)]
use crate::{
backends::{BackendConfig, BackendHandle, BackendType},
metrics::MetricsCollector,
models::{ModelInfo, ModelManager},
};
use anyhow::{Result, anyhow};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
    path::{Path, PathBuf},
    sync::{
        Arc, Mutex,
        atomic::{AtomicU64, Ordering},
    },
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use tokio::{
fs as async_fs,
sync::{RwLock, Semaphore},
task::JoinHandle,
time::interval,
};
use tracing::{debug, error, info, warn};
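/// Configuration for [`ModelCache`].
///
/// A construction sketch; the values and the model name are illustrative,
/// not tuned recommendations:
///
/// ```ignore
/// let config = CacheConfig {
///     max_cached_models: 3,
///     max_memory_mb: 4096,
///     always_warm: vec!["llama-7b".to_string()],
///     ..CacheConfig::default()
/// };
/// ```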
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheConfig {
pub max_cached_models: usize,
pub max_memory_mb: u64,
pub model_ttl_seconds: u64,
pub enable_warmup: bool,
pub warmup_strategy: WarmupStrategy,
pub always_warm: Vec<String>,
pub predictive_loading: bool,
pub usage_window_seconds: u64,
pub min_usage_frequency: f64,
pub memory_based_eviction: bool,
pub persist_cache: bool,
pub cache_dir: Option<PathBuf>,
}
impl Default for CacheConfig {
fn default() -> Self {
Self {
max_cached_models: 5,
            max_memory_mb: 8192,
            model_ttl_seconds: 3600,
            enable_warmup: true,
warmup_strategy: WarmupStrategy::UsageBased,
always_warm: Vec::new(),
predictive_loading: true,
            usage_window_seconds: 86400,
            min_usage_frequency: 0.1,
            memory_based_eviction: true,
persist_cache: false,
cache_dir: None,
}
}
}
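/// How the cache decides which models to pre-load. `Hybrid` chains the
/// always-warm, usage-based, and predictive passes in that order.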
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WarmupStrategy {
UsageBased,
Predictive,
SizeOptimized,
Priority,
Hybrid,
}
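/// A loaded model plus the bookkeeping used for LRU eviction and warmup
/// prioritization. Shared via `Arc`, so mutable state uses atomics or locks.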
pub struct CachedModel {
pub backend: BackendHandle,
pub model_info: ModelInfo,
    /// Most recent use; wrapped in a `Mutex` so cache hits can refresh it
    /// through a shared `Arc` (otherwise LRU eviction would never see hits).
    pub last_used: Mutex<Instant>,
pub created_at: Instant,
pub usage_count: AtomicU64,
pub memory_estimate: u64,
pub warmup_priority: u8,
}
impl CachedModel {
    /// Returns the last-used instant, recovering from a poisoned lock.
    pub fn last_used(&self) -> Instant {
        *self.last_used.lock().unwrap_or_else(|e| e.into_inner())
    }

    /// Marks the model as used right now (called on every cache hit).
    pub fn touch(&self) {
        *self.last_used.lock().unwrap_or_else(|e| e.into_inner()) = Instant::now();
    }
}

impl std::fmt::Debug for CachedModel {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CachedModel")
            .field("model_info", &self.model_info)
            .field("last_used", &self.last_used())
            .field("created_at", &self.created_at)
            .field("usage_count", &self.usage_count.load(Ordering::Relaxed))
            .field("memory_estimate", &self.memory_estimate)
            .field("warmup_priority", &self.warmup_priority)
            .finish()
    }
}
impl Clone for CachedModel {
fn clone(&self) -> Self {
Self {
backend: self.backend.clone(),
model_info: self.model_info.clone(),
            last_used: Mutex::new(self.last_used()),
created_at: self.created_at,
usage_count: AtomicU64::new(self.usage_count.load(Ordering::Relaxed)),
memory_estimate: self.memory_estimate,
warmup_priority: self.warmup_priority,
}
}
}
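/// Per-model request statistics; persisted alongside the cache state when
/// `persist_cache` is enabled.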
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelUsageStats {
pub model_name: String,
pub request_count: u64,
pub last_request: SystemTime,
pub average_response_time: Duration,
pub total_response_time: Duration,
pub memory_usage: u64,
    pub usage_frequency: f64,
    pub usage_trend: f64,
}
#[derive(Debug, Clone)]
pub struct CacheStats {
pub total_models: usize,
pub memory_usage_mb: f64,
pub hit_rate: f64,
pub miss_rate: f64,
pub eviction_count: u64,
pub warmup_count: u64,
pub active_models: Vec<String>,
pub model_stats: HashMap<String, ModelUsageStats>,
}
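/// One persisted cache entry. `Instant` values cannot be serialized, so the
/// last-used and creation times are stored as approximate unix timestamps.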
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableCacheEntry {
pub model_name: String,
pub model_info: ModelInfo,
    pub last_used_timestamp: u64,
    pub created_at_timestamp: u64,
    pub usage_count: u64,
pub memory_estimate: u64,
pub warmup_priority: u8,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableCacheState {
pub version: u32,
pub cache_entries: Vec<SerializableCacheEntry>,
pub usage_stats: HashMap<String, ModelUsageStats>,
pub cache_hits: u64,
pub cache_misses: u64,
pub evictions: u64,
pub warmups: u64,
pub total_memory: u64,
    pub saved_at: u64,
}
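// On-disk layout: a bincode-serialized `SerializableCacheState`, compressed
// with zstd (level 3) and written atomically via a temp-file rename.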
const CACHE_FORMAT_VERSION: u32 = 1;
const CACHE_FILE_NAME: &str = "cache_state.bin.zst";
const CACHE_STATS_FILE_NAME: &str = "cache_stats.bin.zst";
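/// An in-memory cache of loaded models with TTL cleanup, LRU eviction,
/// configurable warmup, and optional compressed on-disk persistence.
///
/// Minimal usage sketch, assuming `backend_config` and `manager` come from
/// the surrounding application and the model name is hypothetical:
///
/// ```ignore
/// let cache = ModelCache::new(
///     CacheConfig::default(),
///     backend_config,
///     manager,
///     None, // no metrics collector
/// )
/// .await?;
/// let model = cache.get_model("llama-7b").await?;
/// ```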
pub struct ModelCache {
pub config: CacheConfig,
backend_config: BackendConfig,
model_manager: Arc<ModelManager>,
metrics: Option<Arc<MetricsCollector>>,
cached_models: Arc<RwLock<HashMap<String, Arc<CachedModel>>>>,
usage_stats: Arc<RwLock<HashMap<String, ModelUsageStats>>>,
    // Counters are wrapped in `Arc` so the background tasks update the live
    // values instead of detached snapshots.
    cache_hits: Arc<AtomicU64>,
    cache_misses: Arc<AtomicU64>,
    evictions: Arc<AtomicU64>,
    warmups: Arc<AtomicU64>,
    total_memory: Arc<AtomicU64>,
cleanup_task: Option<JoinHandle<()>>,
warmup_task: Option<JoinHandle<()>>,
stats_task: Option<JoinHandle<()>>,
loading_semaphore: Arc<Semaphore>,
}
impl ModelCache {
pub async fn new(
config: CacheConfig,
backend_config: BackendConfig,
model_manager: Arc<ModelManager>,
metrics: Option<Arc<MetricsCollector>>,
) -> Result<Self> {
info!(
"Initializing model cache with strategy: {:?}",
config.warmup_strategy
);
let cached_models = Arc::new(RwLock::new(HashMap::new()));
let usage_stats = Arc::new(RwLock::new(HashMap::new()));
let mut cache = Self {
config: config.clone(),
backend_config,
model_manager,
metrics,
cached_models: cached_models.clone(),
usage_stats: usage_stats.clone(),
            cache_hits: Arc::new(AtomicU64::new(0)),
            cache_misses: Arc::new(AtomicU64::new(0)),
            evictions: Arc::new(AtomicU64::new(0)),
            warmups: Arc::new(AtomicU64::new(0)),
            total_memory: Arc::new(AtomicU64::new(0)),
cleanup_task: None,
warmup_task: None,
stats_task: None,
            loading_semaphore: Arc::new(Semaphore::new(2)),
        };
        cache.start_background_tasks().await?;
        // Restore persisted state first so the recovered usage statistics can
        // inform warmup priorities for the always-warm models.
        if config.persist_cache {
            cache.load_from_disk().await?;
        }
        cache.warmup_always_warm_models().await?;
Ok(cache)
}
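    /// Returns the cached model, loading it on a miss. Misses are gated by a
    /// small semaphore so concurrent loads do not overwhelm the backend; note
    /// that two concurrent misses for the same model may still load it twice.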
pub async fn get_model(&self, model_name: &str) -> Result<Arc<CachedModel>> {
{
let cache_guard = self.cached_models.read().await;
            if let Some(cached_model) = cache_guard.get(model_name) {
                // Refresh the LRU timestamp; without this, eviction would
                // always target the oldest-created model rather than the
                // least recently used one.
                cached_model.touch();
                cached_model.usage_count.fetch_add(1, Ordering::Relaxed);
                self.cache_hits.fetch_add(1, Ordering::Relaxed);
                self.update_usage_stats(model_name, Duration::ZERO)
                    .await;
debug!("Cache hit for model: {}", model_name);
return Ok(cached_model.clone());
}
}
self.cache_misses.fetch_add(1, Ordering::Relaxed);
info!("Cache miss for model: {}, loading...", model_name);
let _permit = self
.loading_semaphore
.acquire()
.await
.map_err(|_| anyhow!("Failed to acquire loading permit"))?;
let start_time = Instant::now();
let cached_model = self.load_model(model_name).await?;
let load_time = start_time.elapsed();
info!("Model {} loaded in {:?}", model_name, load_time);
self.update_usage_stats(model_name, load_time).await;
self.maybe_evict_models().await?;
Ok(cached_model)
}
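    /// Runs one warmup pass using the configured [`WarmupStrategy`].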
pub async fn warmup_models(&self) -> Result<()> {
match self.config.warmup_strategy {
WarmupStrategy::UsageBased => self.warmup_usage_based().await,
WarmupStrategy::Predictive => self.warmup_predictive().await,
WarmupStrategy::SizeOptimized => self.warmup_size_optimized().await,
WarmupStrategy::Priority => self.warmup_priority_based().await,
WarmupStrategy::Hybrid => self.warmup_hybrid().await,
}
}
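    /// Returns a snapshot of cache health. Hit/miss rates cover the lifetime
    /// of this process (or the restored counters when persistence is on):
    ///
    /// ```ignore
    /// let stats = cache.get_stats().await;
    /// println!("hit rate: {:.1}%", stats.hit_rate * 100.0);
    /// ```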
pub async fn get_stats(&self) -> CacheStats {
let cached_models = self.cached_models.read().await;
let usage_stats = self.usage_stats.read().await;
let total_requests =
self.cache_hits.load(Ordering::Relaxed) + self.cache_misses.load(Ordering::Relaxed);
let hit_rate = if total_requests > 0 {
self.cache_hits.load(Ordering::Relaxed) as f64 / total_requests as f64
} else {
0.0
};
CacheStats {
total_models: cached_models.len(),
memory_usage_mb: self.total_memory.load(Ordering::Relaxed) as f64 / (1024.0 * 1024.0),
hit_rate,
miss_rate: 1.0 - hit_rate,
eviction_count: self.evictions.load(Ordering::Relaxed),
warmup_count: self.warmups.load(Ordering::Relaxed),
active_models: cached_models.keys().cloned().collect(),
model_stats: usage_stats.clone(),
}
}
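    /// Loads a model ahead of demand so the first request avoids load latency.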
    pub async fn warmup_model(&self, model_name: &str) -> Result<()> {
        // Skip models that are already resident to avoid a redundant reload.
        if self.cached_models.read().await.contains_key(model_name) {
            return Ok(());
        }
        info!("Warming up model: {}", model_name);
        let _cached_model = self.load_model(model_name).await?;
        self.warmups.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }
pub async fn evict_model(&self, model_name: &str) -> Result<()> {
let mut cached_models = self.cached_models.write().await;
if let Some(model) = cached_models.remove(model_name) {
self.total_memory
.fetch_sub(model.memory_estimate, Ordering::Relaxed);
self.evictions.fetch_add(1, Ordering::Relaxed);
info!("Evicted model: {}", model_name);
}
Ok(())
}
pub async fn clear_cache(&self) -> Result<()> {
let mut cached_models = self.cached_models.write().await;
cached_models.clear();
self.total_memory.store(0, Ordering::Relaxed);
info!("Cleared all cached models");
Ok(())
}
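    /// Persists the current cache state; a no-op unless `cache_dir` is set.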
pub async fn save_cache(&self) -> Result<()> {
self.save_to_disk().await
}
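    /// Resolves the model, picks a backend from its path, loads it, and
    /// registers it in the cache with an initial usage-stats entry.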
async fn load_model(&self, model_name: &str) -> Result<Arc<CachedModel>> {
let model_info = self.model_manager.resolve_model(model_name).await?;
let backend_type = BackendType::from_model_path(&model_info.path).ok_or_else(|| {
anyhow::anyhow!(
"No suitable backend found for model: {}",
model_info.path.display()
)
})?;
let backend_handle = BackendHandle::new_shared(backend_type, &self.backend_config)?;
backend_handle.load_model(&model_info).await?;
let memory_estimate = self.estimate_model_memory(&model_info);
let cached_model = Arc::new(CachedModel {
backend: backend_handle,
model_info: model_info.clone(),
            last_used: Mutex::new(Instant::now()),
created_at: Instant::now(),
usage_count: AtomicU64::new(0),
memory_estimate,
warmup_priority: self.calculate_warmup_priority(model_name).await,
});
let cached_model_ref = Arc::clone(&cached_model);
{
let mut cached_models = self.cached_models.write().await;
cached_models.insert(model_name.to_string(), cached_model);
self.total_memory
.fetch_add(memory_estimate, Ordering::Relaxed);
}
{
let mut usage_stats = self.usage_stats.write().await;
usage_stats
.entry(model_name.to_string())
.or_insert_with(|| ModelUsageStats {
model_name: model_name.to_string(),
request_count: 0,
last_request: SystemTime::now(),
average_response_time: Duration::ZERO,
total_response_time: Duration::ZERO,
memory_usage: memory_estimate,
usage_frequency: 0.0,
usage_trend: 0.0,
});
}
Ok(cached_model_ref)
}
    async fn update_usage_stats(&self, model_name: &str, response_time: Duration) {
        let mut usage_stats = self.usage_stats.write().await;
        if let Some(stats) = usage_stats.get_mut(model_name) {
            stats.request_count += 1;
            stats.last_request = SystemTime::now();
            stats.total_response_time += response_time;
            stats.average_response_time = stats.total_response_time / stats.request_count as u32;
            // Approximation: lifetime request count divided by the window
            // length. A precise value would need per-window bucketing of
            // request timestamps.
            let hours = self.config.usage_window_seconds as f64 / 3600.0;
            stats.usage_frequency = stats.request_count as f64 / hours;
        }
    }
async fn maybe_evict_models(&self) -> Result<()> {
let cached_models = self.cached_models.read().await;
let total_models = cached_models.len();
let current_memory = self.total_memory.load(Ordering::Relaxed);
drop(cached_models);
let should_evict = total_models > self.config.max_cached_models
|| current_memory > self.config.max_memory_mb * 1024 * 1024;
if should_evict {
self.evict_least_recently_used().await?;
}
Ok(())
}
    async fn evict_least_recently_used(&self) -> Result<()> {
        let cached_models = self.cached_models.read().await;
        let mut oldest_time = Instant::now();
        let mut victim_model = None;
        let mut lowest_priority = u8::MAX;
        for (name, model) in cached_models.iter() {
            if self.config.always_warm.contains(name) {
                continue;
            }
            let last_used = model.last_used();
            let is_older = last_used < oldest_time;
            let lower_priority = model.warmup_priority < lowest_priority;
            if is_older || (last_used == oldest_time && lower_priority) {
                oldest_time = last_used;
                lowest_priority = model.warmup_priority;
                victim_model = Some(name.clone());
            }
        }
drop(cached_models);
if let Some(model_name) = victim_model {
info!("Evicting least recently used model: {}", model_name);
self.evict_model(&model_name).await?;
}
Ok(())
}
async fn start_background_tasks(&mut self) -> Result<()> {
let cleanup_cached_models = self.cached_models.clone();
let cleanup_config = self.config.clone();
        // Share the live counters so TTL evictions adjust the cache's real
        // accounting instead of detached snapshots.
        let cleanup_evictions = self.evictions.clone();
        let cleanup_total_memory = self.total_memory.clone();
self.cleanup_task = Some(tokio::spawn(async move {
let mut cleanup_interval = interval(Duration::from_secs(300));
loop {
cleanup_interval.tick().await;
let mut cached_models = cleanup_cached_models.write().await;
let now = Instant::now();
let ttl = Duration::from_secs(cleanup_config.model_ttl_seconds);
let mut to_remove = Vec::new();
for (name, model) in cached_models.iter() {
                    if now.duration_since(model.last_used()) > ttl
&& !cleanup_config.always_warm.contains(name)
{
to_remove.push((name.clone(), model.memory_estimate));
}
}
for (name, memory) in to_remove {
cached_models.remove(&name);
cleanup_total_memory.fetch_sub(memory, Ordering::Relaxed);
cleanup_evictions.fetch_add(1, Ordering::Relaxed);
debug!("TTL expired, evicted model: {}", name);
}
}
}));
if self.config.persist_cache {
let save_cache_dir = self.config.cache_dir.clone();
let save_cached_models = self.cached_models.clone();
let save_usage_stats = self.usage_stats.clone();
            // Share the live counters with the save task rather than frozen
            // snapshots, so periodic saves reflect current statistics.
            let save_cache_hits = self.cache_hits.clone();
            let save_cache_misses = self.cache_misses.clone();
            let save_evictions = self.evictions.clone();
            let save_warmups = self.warmups.clone();
            let save_total_memory = self.total_memory.clone();
self.stats_task = Some(tokio::spawn(async move {
let mut save_interval = interval(Duration::from_secs(300));
loop {
save_interval.tick().await;
if let Some(cache_dir) = &save_cache_dir {
let cached_models = save_cached_models.read().await;
let usage_stats = save_usage_stats.read().await;
let mut cache_entries = Vec::new();
let now_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
                        for (model_name, cached_model) in cached_models.iter() {
                            // Convert monotonic instants to approximate unix
                            // timestamps by subtracting elapsed time from the
                            // current wall clock.
                            let last_used_timestamp = now_timestamp
                                .saturating_sub(cached_model.last_used().elapsed().as_secs());
                            let created_at_timestamp = now_timestamp
                                .saturating_sub(cached_model.created_at.elapsed().as_secs());
                            cache_entries.push(SerializableCacheEntry {
                                model_name: model_name.clone(),
                                model_info: cached_model.model_info.clone(),
                                last_used_timestamp,
                                created_at_timestamp,
                                usage_count: cached_model.usage_count.load(Ordering::Relaxed),
memory_estimate: cached_model.memory_estimate,
warmup_priority: cached_model.warmup_priority,
});
}
let cache_state = SerializableCacheState {
version: CACHE_FORMAT_VERSION,
cache_entries,
usage_stats: usage_stats.clone(),
cache_hits: save_cache_hits.load(Ordering::Relaxed),
cache_misses: save_cache_misses.load(Ordering::Relaxed),
evictions: save_evictions.load(Ordering::Relaxed),
warmups: save_warmups.load(Ordering::Relaxed),
total_memory: save_total_memory.load(Ordering::Relaxed),
saved_at: now_timestamp,
};
drop(cached_models);
drop(usage_stats);
if let Err(e) = async_fs::create_dir_all(cache_dir).await {
warn!("Failed to create cache directory {:?}: {}", cache_dir, e);
continue;
}
let cache_file = cache_dir.join(CACHE_FILE_NAME);
match save_cache_state_to_file_static(&cache_state, &cache_file).await {
Ok(()) => {
debug!(
"Periodic cache save completed with {} entries",
cache_state.cache_entries.len()
);
}
Err(e) => {
warn!("Periodic cache save failed: {}", e);
}
}
}
}
}));
}
Ok(())
}
async fn warmup_always_warm_models(&self) -> Result<()> {
for model_name in &self.config.always_warm {
if let Err(e) = self.warmup_model(model_name).await {
warn!("Failed to warm up always-warm model {}: {}", model_name, e);
}
}
Ok(())
}
    async fn warmup_usage_based(&self) -> Result<()> {
        // Collect candidate names and release the stats lock before warming:
        // `warmup_model` eventually takes a write lock on `usage_stats`, and
        // holding the read guard across that await would deadlock.
        let candidates: Vec<String> = {
            let usage_stats = self.usage_stats.read().await;
            let mut candidates: Vec<_> = usage_stats
                .values()
                .filter(|stats| stats.usage_frequency >= self.config.min_usage_frequency)
                .collect();
            candidates.sort_by(|a, b| {
                b.usage_frequency
                    .partial_cmp(&a.usage_frequency)
                    .unwrap_or(std::cmp::Ordering::Equal)
            });
            candidates
                .into_iter()
                .take(3)
                .map(|stats| stats.model_name.clone())
                .collect()
        };
        for model_name in candidates {
            let cached_models = self.cached_models.read().await;
            let should_warmup = !cached_models.contains_key(&model_name);
            drop(cached_models);
            if should_warmup {
                if let Err(e) = self.warmup_model(&model_name).await {
                    warn!("Failed to warm up model {}: {}", model_name, e);
                }
            }
        }
        Ok(())
    }
    async fn warmup_predictive(&self) -> Result<()> {
        // Same pattern as `warmup_usage_based`: snapshot candidates so the
        // read guard is dropped before `warmup_model` is awaited. Note that
        // `usage_trend` is currently only initialized (or restored) and never
        // recomputed, so this pass may select nothing.
        let candidates: Vec<String> = {
            let usage_stats = self.usage_stats.read().await;
            let mut candidates: Vec<_> = usage_stats
                .values()
                .filter(|stats| stats.usage_trend > 0.1)
                .collect();
            candidates.sort_by(|a, b| {
                b.usage_trend
                    .partial_cmp(&a.usage_trend)
                    .unwrap_or(std::cmp::Ordering::Equal)
            });
            candidates
                .into_iter()
                .take(2)
                .map(|stats| stats.model_name.clone())
                .collect()
        };
        for model_name in candidates {
            if let Err(e) = self.warmup_model(&model_name).await {
                warn!(
                    "Failed to predictively warm up model {}: {}",
                    model_name, e
                );
            }
        }
        Ok(())
    }
async fn warmup_size_optimized(&self) -> Result<()> {
if let Ok(models) = self.model_manager.list_models().await {
let mut sorted_models = models;
sorted_models.sort_by(|a, b| a.size.cmp(&b.size));
for model in sorted_models.iter().take(3) {
if let Err(e) = self.warmup_model(&model.name).await {
warn!(
"Failed to warm up size-optimized model {}: {}",
model.name, e
);
}
}
}
Ok(())
}
async fn warmup_priority_based(&self) -> Result<()> {
for model_name in &self.config.always_warm {
if let Err(e) = self.warmup_model(model_name).await {
warn!("Failed to warm up priority model {}: {}", model_name, e);
}
}
Ok(())
}
async fn warmup_hybrid(&self) -> Result<()> {
self.warmup_always_warm_models().await?;
self.warmup_usage_based().await?;
self.warmup_predictive().await?;
Ok(())
}
    fn estimate_model_memory(&self, model_info: &ModelInfo) -> u64 {
        // Rough heuristic: file size plus ~20% headroom for runtime overhead.
        (model_info.size as f64 * 1.2) as u64
    }
    async fn calculate_warmup_priority(&self, model_name: &str) -> u8 {
        if self.config.always_warm.contains(&model_name.to_string()) {
            return 255;
        }
        let usage_stats = self.usage_stats.read().await;
        if let Some(stats) = usage_stats.get(model_name) {
            // Map a frequency of 0..=10 requests/hour onto a 0..=250 priority.
            (stats.usage_frequency.min(10.0) * 25.0) as u8
        } else {
            1
        }
    }
async fn load_from_disk(&self) -> Result<()> {
if let Some(cache_dir) = &self.config.cache_dir {
info!("Loading cache state from disk: {:?}", cache_dir);
if !cache_dir.exists() {
debug!(
"Cache directory does not exist, skipping load: {:?}",
cache_dir
);
return Ok(());
}
let cache_file = cache_dir.join(CACHE_FILE_NAME);
if !cache_file.exists() {
debug!("Cache file does not exist, skipping load: {:?}", cache_file);
return Ok(());
}
match self.load_cache_state_from_file(&cache_file).await {
Ok(cache_state) => {
info!(
"Successfully loaded cache state with {} entries",
cache_state.cache_entries.len()
);
{
let mut usage_stats = self.usage_stats.write().await;
*usage_stats = cache_state.usage_stats;
}
self.cache_hits
.store(cache_state.cache_hits, Ordering::Relaxed);
self.cache_misses
.store(cache_state.cache_misses, Ordering::Relaxed);
self.evictions
.store(cache_state.evictions, Ordering::Relaxed);
self.warmups.store(cache_state.warmups, Ordering::Relaxed);
self.total_memory
.store(cache_state.total_memory, Ordering::Relaxed);
for entry in cache_state.cache_entries {
if self.should_restore_model(&entry).await {
if let Err(e) = self.warmup_model(&entry.model_name).await {
warn!(
"Failed to restore model from cache: {}: {}",
entry.model_name, e
);
} else {
debug!("Restored cached model: {}", entry.model_name);
}
}
}
}
Err(e) => {
warn!("Failed to load cache state from disk: {}", e);
}
}
}
Ok(())
}
async fn save_to_disk(&self) -> Result<()> {
if let Some(cache_dir) = &self.config.cache_dir {
info!("Saving cache state to disk: {:?}", cache_dir);
if let Err(e) = async_fs::create_dir_all(cache_dir).await {
return Err(anyhow!(
"Failed to create cache directory {:?}: {}",
cache_dir,
e
));
}
let cache_state = self.collect_cache_state().await;
let cache_file = cache_dir.join(CACHE_FILE_NAME);
match self
.save_cache_state_to_file(&cache_state, &cache_file)
.await
{
Ok(()) => {
info!(
"Successfully saved cache state with {} entries to {:?}",
cache_state.cache_entries.len(),
cache_file
);
}
Err(e) => {
error!("Failed to save cache state to disk: {}", e);
return Err(e);
}
}
}
Ok(())
}
    async fn load_cache_state_from_file(
        &self,
        file_path: &Path,
    ) -> Result<SerializableCacheState> {
let compressed_data = async_fs::read(file_path)
.await
.map_err(|e| anyhow!("Failed to read cache file {:?}: {}", file_path, e))?;
let decompressed_data = zstd::decode_all(&compressed_data[..])
.map_err(|e| anyhow!("Failed to decompress cache data: {}", e))?;
let cache_state: SerializableCacheState = bincode::deserialize(&decompressed_data)
.map_err(|e| anyhow!("Failed to deserialize cache data: {}", e))?;
if cache_state.version != CACHE_FORMAT_VERSION {
return Err(anyhow!(
"Incompatible cache format version: {} (expected {})",
cache_state.version,
CACHE_FORMAT_VERSION
));
}
Ok(cache_state)
}
    async fn save_cache_state_to_file(
        &self,
        cache_state: &SerializableCacheState,
        file_path: &Path,
    ) -> Result<()> {
        // Delegate to the free function shared with the background save task
        // so the write-then-rename logic lives in one place.
        save_cache_state_to_file_static(cache_state, file_path).await
    }
async fn collect_cache_state(&self) -> SerializableCacheState {
let cached_models = self.cached_models.read().await;
let usage_stats = self.usage_stats.read().await;
let mut cache_entries = Vec::new();
for (model_name, cached_model) in cached_models.iter() {
            let last_used_timestamp = self.instant_to_unix_timestamp(cached_model.last_used());
            let created_at_timestamp = self.instant_to_unix_timestamp(cached_model.created_at);
cache_entries.push(SerializableCacheEntry {
model_name: model_name.clone(),
model_info: cached_model.model_info.clone(),
last_used_timestamp,
created_at_timestamp,
usage_count: cached_model.usage_count.load(Ordering::Relaxed),
memory_estimate: cached_model.memory_estimate,
warmup_priority: cached_model.warmup_priority,
});
}
SerializableCacheState {
version: CACHE_FORMAT_VERSION,
cache_entries,
usage_stats: usage_stats.clone(),
cache_hits: self.cache_hits.load(Ordering::Relaxed),
cache_misses: self.cache_misses.load(Ordering::Relaxed),
evictions: self.evictions.load(Ordering::Relaxed),
warmups: self.warmups.load(Ordering::Relaxed),
total_memory: self.total_memory.load(Ordering::Relaxed),
saved_at: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
}
}
async fn should_restore_model(&self, entry: &SerializableCacheEntry) -> bool {
if !entry.model_info.path.exists() {
debug!(
"Model file no longer exists, skipping restore: {:?}",
entry.model_info.path
);
return false;
}
if let Ok(current_model_info) = self.model_manager.resolve_model(&entry.model_name).await {
if current_model_info.size != entry.model_info.size {
debug!(
"Model file size changed, skipping restore: {}",
entry.model_name
);
return false;
}
} else {
debug!(
"Failed to get current model info, skipping restore: {}",
entry.model_name
);
return false;
}
let last_used_time =
SystemTime::UNIX_EPOCH + Duration::from_secs(entry.last_used_timestamp);
let time_since_last_use = SystemTime::now()
.duration_since(last_used_time)
.unwrap_or(Duration::from_secs(u64::MAX));
if time_since_last_use > Duration::from_secs(86400) {
debug!(
"Model not used recently, skipping restore: {}",
entry.model_name
);
return false;
}
true
}
    fn instant_to_unix_timestamp(&self, instant: Instant) -> u64 {
        // Approximate the wall-clock time of a monotonic instant by
        // subtracting its elapsed duration from the current system time.
        SystemTime::now()
            .checked_sub(instant.elapsed())
            .unwrap_or(UNIX_EPOCH)
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }
}
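/// Free-function variant of the save path, used by the background save task,
/// whose `'static` future cannot borrow `self`.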
async fn save_cache_state_to_file_static(
    cache_state: &SerializableCacheState,
    file_path: &Path,
) -> Result<()> {
let serialized_data = bincode::serialize(cache_state)
.map_err(|e| anyhow!("Failed to serialize cache data: {}", e))?;
    let compressed_data = zstd::encode_all(&serialized_data[..], 3)
        .map_err(|e| anyhow!("Failed to compress cache data: {}", e))?;
let temp_file = file_path.with_extension("tmp");
async_fs::write(&temp_file, &compressed_data)
.await
.map_err(|e| {
anyhow!(
"Failed to write temporary cache file {:?}: {}",
temp_file,
e
)
})?;
async_fs::rename(&temp_file, file_path).await.map_err(|e| {
anyhow!(
"Failed to rename cache file {:?} to {:?}: {}",
temp_file,
file_path,
e
)
})?;
Ok(())
}
impl Drop for ModelCache {
fn drop(&mut self) {
if let Some(task) = self.cleanup_task.take() {
task.abort();
}
if let Some(task) = self.warmup_task.take() {
task.abort();
}
if let Some(task) = self.stats_task.take() {
task.abort();
}
        if self.config.persist_cache {
            // `block_on` panics if the cache is dropped from within an async
            // context, so only attempt the blocking save outside one.
            if tokio::runtime::Handle::try_current().is_ok() {
                warn!(
                    "ModelCache dropped inside a tokio runtime; skipping blocking save. \
                     Call save_cache() before dropping."
                );
            } else if let Ok(rt) = tokio::runtime::Runtime::new() {
                if let Err(e) = rt.block_on(self.save_to_disk()) {
                    error!("Failed to save cache state on shutdown: {}", e);
                }
            }
        }
}
}