car-inference 0.31.0

Local model inference for CAR — Candle backend with Qwen3 models
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
//! LRU-evicting cache for loaded inference backends.
//!
//! Solves four problems:
//!
//! 1. **Cold start on every call.** Before: `FluxBackend::load()` /
//!    `LtxBackend::load()` / `KokoroBackend::load()` ran for every
//!    `generate_image` / `generate_video` / `synth_tts` request, paying
//!    the 1–14 s model-load cost on a hot path. After: first call loads,
//!    subsequent calls get a cheap `Arc<Mutex<T>>` handle.
//!
//! 2. **Concurrent calls racing on the same backend.** `mlx-rs` `Array`
//!    is `!Sync`; two tokio tasks calling the same backend simultaneously
//!    is undefined behavior. The per-entry `Mutex` serializes concurrent
//!    callers onto the same backend. Different backends still run in
//!    parallel (MLX itself queues them on the single Metal driver).
//!
//! 3. **Unbounded RAM growth.** Before: loading Flux + LTX + Kokoro +
//!    every text-gen model held ~30 GB of quantized weights forever. The
//!    cache tracks an approximate per-entry size (sum of the model
//!    directory's `.safetensors` bytes) and evicts LRU entries once the
//!    [`SharedModelBudget`] total exceeds budget. The budget is **shared**
//!    across the engine's caches (one aggregate cap, not N) and defaults to a
//!    **RAM-derived** figure (~60% of usable RAM), overridable by
//!    `CAR_INFERENCE_MODEL_CACHE_MB`. Set to 0 to disable caching. (#427)
//!
//! 4. **Resident memory that never releases at idle.** Capacity eviction
//!    only fires when the total *exceeds* the budget — so a daemon that
//!    loads one 2 GB model and then goes quiet pins that 2 GB resident
//!    forever (it never crosses the budget). [`BackendCache::evict_idle`] sweeps out
//!    entries untouched for longer than the idle TTL, so a quiet daemon
//!    drops its model working set back down. TTL via
//!    `CAR_INFERENCE_MODEL_IDLE_SECS` (default 300; 0 disables idle
//!    eviction). The caller drives the sweep on a timer — the cache is
//!    otherwise passive. (car-releases#67)
//!
//! The cache is generic over `T: Send + 'static`. Inference backends
//! don't need to implement any trait — just wrap them on insert.
//!
//! Invariant: an evicted entry is only removed from the cache map; any
//! outstanding `Arc<Mutex<T>>` handle continues to work until the last
//! caller drops it. This makes eviction safe even during a long-running
//! inference call — RAM is reclaimed lazily when the last user finishes.

use std::collections::{HashMap, VecDeque};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Shared, lockable handle to a cached backend.
///
/// Cloning the `Arc` is cheap. Hold the inner `Mutex` for the whole inference
/// call so that concurrent requests for the same model run one at a time. A
/// handle stays usable even after its entry has been evicted from the cache;
/// the backend is freed when the last handle is dropped.
pub type CachedBackend<T> = Arc<Mutex<T>>;

/// A model-cache memory budget, shared across all backend caches so the
/// *aggregate* resident model working set (text + image + video + TTS) is bounded
/// by one figure instead of N independent per-cache caps. Each cache accounts its
/// loads against this shared total and, when the shared total exceeds the budget,
/// evicts its OWN LRU entries.
///
/// Limitation: eviction is per-cache (a cache can only evict its own entries, as
/// the caches are generic over different backend types). So a single cache's
/// growth is bounded strictly, and the cross-modality aggregate is bounded in
/// steady state (every active cache self-trims on insert; idle eviction reclaims
/// inactive ones). A strict per-instant global-LRU coordinator that can evict
/// across caches is a documented follow-up.
#[derive(Debug)]
pub struct SharedModelBudget {
    /// Cap in bytes. 0 means caching is disabled.
    budget_bytes: u64,
    /// Running sum of the sizes currently accounted by every cache bound to
    /// this budget. `Relaxed` atomics are enough: the counter guards no other
    /// memory, and each read-modify-write is still atomic.
    total_bytes: AtomicU64,
}

impl SharedModelBudget {
    /// A budget of `budget_bytes` (0 disables caching).
    pub fn new(budget_bytes: u64) -> Arc<Self> {
        Arc::new(Self {
            budget_bytes,
            total_bytes: AtomicU64::new(0),
        })
    }

    /// `CAR_INFERENCE_MODEL_CACHE_MB` if it is set to a valid unsigned integer
    /// (surrounding whitespace is ignored), else `default_mb` (the caller's
    /// RAM-derived figure). A valid env override always wins over the default.
    pub fn from_env_or(default_mb: u64) -> Arc<Self> {
        let mb = std::env::var("CAR_INFERENCE_MODEL_CACHE_MB")
            .ok()
            .and_then(|v| v.trim().parse::<u64>().ok())
            .unwrap_or(default_mb);
        Self::new(mb.saturating_mul(1024 * 1024))
    }

    /// Account `n` more bytes. Saturates at `u64::MAX` instead of wrapping.
    /// A wrapped counter would read as "far under budget" and disable eviction.
    fn add(&self, n: u64) {
        let _ = self
            .total_bytes
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                Some(cur.saturating_add(n))
            });
    }

    /// Release `n` bytes. Saturating: every add is matched by a sub, but this
    /// guards against a stray double-sub wrapping the counter.
    fn sub(&self, n: u64) {
        let _ = self
            .total_bytes
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                Some(cur.saturating_sub(n))
            });
    }

    /// Current shared total in bytes.
    fn total(&self) -> u64 {
        self.total_bytes.load(Ordering::Relaxed)
    }

    /// Over budget? Always false for a disabled (0) budget.
    fn over_budget(&self) -> bool {
        self.budget_bytes != 0 && self.total() > self.budget_bytes
    }

    /// A disabled (zero-budget) budget — caching off, every call loads fresh.
    pub fn is_disabled(&self) -> bool {
        self.budget_bytes == 0
    }
}

/// Default model-cache budget in MB, taken from the host's `HardwareInfo`.
///
/// This is the same `max_model_mb` figure the model recommender uses (per the
/// module docs, roughly 60% of usable RAM). It replaces the old flat 24 GB
/// ceiling so that the cache scales with the machine.
pub fn default_model_cache_mb() -> u64 {
    let hardware = crate::hardware::HardwareInfo::detect();
    hardware.max_model_mb
}

/// Idle-eviction TTL read from `CAR_INFERENCE_MODEL_IDLE_SECS`.
///
/// Returns `Some(ttl)` for a positive number of seconds and `None` when the
/// value is `0` (idle eviction disabled). An unset or unparseable variable
/// falls back to the 300 s default. Surrounding whitespace is ignored, so a
/// value like `" 600\n"` from a shell or `.env` file is honored rather than
/// silently replaced by the default.
pub fn idle_ttl_from_env() -> Option<Duration> {
    const DEFAULT_IDLE_SECS: u64 = 300;
    let idle_secs = std::env::var("CAR_INFERENCE_MODEL_IDLE_SECS")
        .ok()
        .and_then(|v| v.trim().parse::<u64>().ok())
        .unwrap_or(DEFAULT_IDLE_SECS);
    (idle_secs > 0).then(|| Duration::from_secs(idle_secs))
}

/// One cached backend plus the bookkeeping that eviction needs.
struct Entry<T> {
    /// Approximate size charged to the shared budget while this entry is
    /// cached (added on insert, subtracted on removal).
    size_bytes: u64,
    /// When the entry was last inserted or returned from a lookup. Compared
    /// against the idle TTL by time-based eviction, independent of the LRU
    /// order used for capacity eviction.
    last_used: Instant,
    /// The shared handle given out to callers.
    backend: CachedBackend<T>,
}

/// Mutable cache state, guarded as a unit by the cache's single mutex.
struct Inner<T> {
    /// Recency order of the keys in `map`. The front is the least recently
    /// used key; the back is the most recent.
    lru: VecDeque<String>,
    /// Loaded backends keyed by a stable ID (typically `ModelSchema.id`).
    map: HashMap<String, Entry<T>>,
}

/// LRU-bounded cache of loaded inference backends.
///
/// Not itself `Sync` over `T` — the cache stores handles only, and the
/// lock guarding the map is coarse (held only for look-up / insert /
/// evict). Per-entry mutation is serialized by each entry's own `Mutex`.
pub struct BackendCache<T: Send + 'static> {
    inner: Mutex<Inner<T>>,
    /// Shared budget governing this cache's evictions (may be shared with other
    /// backend caches so the aggregate working set is bounded by one figure).
    budget: Arc<SharedModelBudget>,
    /// Idle eviction window. `None` disables time-based eviction (the
    /// cache then only evicts on capacity pressure). See [`evict_idle`].
    idle_ttl: Option<Duration>,
}

impl<T: Send + 'static> BackendCache<T> {
    /// Create a cache with its OWN budget of `budget_bytes` and no idle eviction.
    /// For a budget shared across caches use [`from_shared`](Self::from_shared).
    pub fn new(budget_bytes: u64) -> Self {
        Self::from_shared(SharedModelBudget::new(budget_bytes), None)
    }

    /// Same as [`new`](Self::new) but also arms idle (time-based) eviction.
    pub fn with_idle_ttl(budget_bytes: u64, idle_ttl: Option<Duration>) -> Self {
        Self::from_shared(SharedModelBudget::new(budget_bytes), idle_ttl)
    }

    /// Create a cache bound to a (possibly shared) [`SharedModelBudget`], so
    /// several caches can enforce one aggregate budget.
    pub fn from_shared(budget: Arc<SharedModelBudget>, idle_ttl: Option<Duration>) -> Self {
        Self {
            inner: Mutex::new(Inner {
                map: HashMap::new(),
                lru: VecDeque::new(),
            }),
            budget,
            idle_ttl,
        }
    }

    /// A standalone cache configured from the environment:
    /// - `CAR_INFERENCE_MODEL_CACHE_MB` — capacity budget; default is now
    ///   **RAM-derived** (~60% of usable RAM via `HardwareInfo`), not a flat
    ///   24 GB. 0 disables caching.
    /// - `CAR_INFERENCE_MODEL_IDLE_SECS` — idle eviction TTL, default 300; 0 off.
    ///
    /// NOTE: this builds its OWN budget. To share one budget across the engine's
    /// caches, construct a [`SharedModelBudget`] once and pass it to
    /// [`from_shared`](Self::from_shared).
    pub fn from_env() -> Self {
        Self::from_shared(
            SharedModelBudget::from_env_or(default_model_cache_mb()),
            idle_ttl_from_env(),
        )
    }

    /// Is this a disabled (zero-budget) cache?
    pub fn is_disabled(&self) -> bool {
        self.budget.is_disabled()
    }

    /// Get a handle to the cached backend, or load + insert it.
    ///
    /// `loader` is invoked only on a cache miss. If the new entry plus
    /// existing entries exceed the budget, the oldest entries are
    /// evicted until we're within budget (or only the new entry remains).
    ///
    /// `size_bytes` should be an approximate on-disk or in-memory size
    /// for the loaded backend; [`estimate_model_size`] is a reasonable
    /// default that sums the `.safetensors` file sizes in a model dir.
    pub fn get_or_load<E>(
        &self,
        key: &str,
        size_bytes: u64,
        loader: impl FnOnce() -> Result<T, E>,
    ) -> Result<CachedBackend<T>, E> {
        // Fast path: already cached.
        {
            let mut guard = self.inner.lock().expect("backend cache poisoned");
            if let Some(entry) = guard.map.get_mut(key) {
                let handle = Arc::clone(&entry.backend);
                // Refresh recency for both LRU (capacity) and idle (time)
                // eviction so a model in active use is never swept.
                entry.last_used = Instant::now();
                guard.lru.retain(|k| k != key);
                guard.lru.push_back(key.to_string());
                return Ok(handle);
            }
        }

        // Slow path: load outside the lock. Concurrent loads of the same
        // key may duplicate work, but the second caller's insert simply
        // loses the race — memory isn't leaked because of the evict loop.
        let backend = loader()?;
        let handle = Arc::new(Mutex::new(backend));

        if self.budget.is_disabled() {
            // Caching disabled: return the handle without retaining it.
            return Ok(handle);
        }

        let mut guard = self.inner.lock().expect("backend cache poisoned");

        // Re-check in case someone else inserted while we were loading.
        if let Some(existing) = guard.map.get(key) {
            return Ok(Arc::clone(&existing.backend));
        }

        self.budget.add(size_bytes);
        guard.map.insert(
            key.to_string(),
            Entry {
                backend: Arc::clone(&handle),
                size_bytes,
                last_used: Instant::now(),
            },
        );
        guard.lru.push_back(key.to_string());

        // Evict this cache's LRU entries until the SHARED total is within budget
        // (but never evict the just-inserted key — it's the newest, so it would
        // only be reached once everything else is gone, which defeats the load).
        while self.budget.over_budget() {
            let Some(victim_key) = guard.lru.pop_front() else {
                break;
            };
            if victim_key == key {
                // Only our own (just-inserted) entry is left to evict; stop. The
                // shared total may still exceed budget due to OTHER caches'
                // entries — they self-trim on their next insert, and idle
                // eviction reclaims inactive ones.
                guard.lru.push_front(victim_key);
                break;
            }
            if let Some(victim) = guard.map.remove(&victim_key) {
                self.budget.sub(victim.size_bytes);
                // The `Arc` may still have outstanding handles; they
                // continue to work. Memory is reclaimed when the last
                // one drops.
                drop(victim);
            }
        }

        Ok(handle)
    }

    /// Manually remove a key, e.g. when model weights have been updated
    /// on disk. Outstanding `Arc<Mutex<T>>` handles remain valid.
    pub fn invalidate(&self, key: &str) {
        let mut guard = self.inner.lock().expect("backend cache poisoned");
        if let Some(entry) = guard.map.remove(key) {
            self.budget.sub(entry.size_bytes);
            guard.lru.retain(|k| k != key);
        }
    }

    /// Evict every entry untouched for longer than the configured idle
    /// TTL. Returns `(entries_evicted, bytes_evicted)`. A no-op (returns
    /// `(0, 0)`) when idle eviction is disabled (`idle_ttl == None`).
    ///
    /// Like capacity eviction, this only drops the cache's own handle;
    /// outstanding `Arc<Mutex<T>>` handles keep working and the RAM is
    /// reclaimed when the last one drops. At idle there are none, so the
    /// model's resident memory is released promptly. Intended to be
    /// driven on a timer by the daemon (car-releases#67).
    pub fn evict_idle(&self) -> (usize, u64) {
        let Some(ttl) = self.idle_ttl else {
            return (0, 0);
        };
        let now = Instant::now();
        let mut guard = self.inner.lock().expect("backend cache poisoned");
        let stale: Vec<String> = guard
            .map
            .iter()
            .filter(|(_, e)| now.duration_since(e.last_used) >= ttl)
            .map(|(k, _)| k.clone())
            .collect();
        let mut entries = 0usize;
        let mut bytes = 0u64;
        for key in stale {
            if let Some(victim) = guard.map.remove(&key) {
                self.budget.sub(victim.size_bytes);
                guard.lru.retain(|k| k != &key);
                entries += 1;
                bytes = bytes.saturating_add(victim.size_bytes);
                drop(victim);
            }
        }
        (entries, bytes)
    }

    /// Evict all entries. Outstanding handles continue to work.
    pub fn clear(&self) {
        let mut guard = self.inner.lock().expect("backend cache poisoned");
        // Subtract only THIS cache's bytes from the shared total (it may be
        // shared with other caches whose entries must remain accounted).
        let freed: u64 = guard.map.values().map(|e| e.size_bytes).sum();
        guard.map.clear();
        guard.lru.clear();
        self.budget.sub(freed);
    }

    /// Returns `(entries, total_bytes, budget_bytes)` for diagnostics. Note:
    /// `total_bytes` is the SHARED total across all caches on this budget, not
    /// just this cache's; `entries` is this cache's count.
    pub fn stats(&self) -> (usize, u64, u64) {
        let guard = self.inner.lock().expect("backend cache poisoned");
        (guard.map.len(), self.budget.total(), self.budget.budget_bytes)
    }
}

/// Sum the sizes of all `.safetensors` files under `model_dir`, recursively.
///
/// A loose upper-bound RAM estimate for the loaded model (quantized
/// tensors live in MLX-owned memory roughly matching their on-disk size).
///
/// Symlinks are followed. For a symlinked file, the size of the link *target*
/// is counted, because `Path::metadata` follows links. Symlinked directories
/// are walked too, but each directory is visited at most once, keyed by its
/// canonical path. A symlink cycle therefore cannot make the same weights
/// count repeatedly or drive runaway recursion.
///
/// Unreadable directories and files are skipped, and a missing `model_dir`
/// yields 0.
pub fn estimate_model_size(model_dir: &Path) -> u64 {
    /// Add the `.safetensors` sizes under `dir` into `total`, skipping
    /// directories already listed in `seen`.
    fn visit(
        dir: &Path,
        seen: &mut std::collections::HashSet<std::path::PathBuf>,
        total: &mut u64,
    ) {
        // Fall back to the literal path if canonicalization fails, so a
        // directory that `read_dir` can still open is not skipped.
        let canonical = std::fs::canonicalize(dir).unwrap_or_else(|_| dir.to_path_buf());
        if !seen.insert(canonical) {
            return;
        }
        let Ok(entries) = std::fs::read_dir(dir) else {
            return;
        };
        for entry in entries.flatten() {
            let path = entry.path();
            if path.is_dir() {
                visit(&path, seen, total);
            } else if path.extension().and_then(|e| e.to_str()) == Some("safetensors") {
                if let Ok(meta) = path.metadata() {
                    *total = total.saturating_add(meta.len());
                }
            }
        }
    }

    let mut seen = std::collections::HashSet::new();
    let mut total = 0u64;
    visit(model_dir, &mut seen, &mut total);
    total
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn cache_hit_returns_same_handle() {
        let cache: BackendCache<u32> = BackendCache::new(1024);
        let a = cache.get_or_load::<()>("a", 100, || Ok(42)).unwrap();
        let b = cache
            .get_or_load::<()>("a", 100, || panic!("should not reload"))
            .unwrap();
        assert!(Arc::ptr_eq(&a, &b));
    }

    #[test]
    fn evicts_lru_when_over_budget() {
        let cache: BackendCache<u32> = BackendCache::new(250);
        let _a = cache.get_or_load::<()>("a", 100, || Ok(1)).unwrap();
        let _b = cache.get_or_load::<()>("b", 100, || Ok(2)).unwrap();
        // Total 200, still under 250. Touch `a` so `b` is LRU.
        let _a_again = cache
            .get_or_load::<()>("a", 100, || panic!("cached"))
            .unwrap();
        // Insert `c`: pushes us to 300, evicts `b` (the LRU one).
        let _c = cache.get_or_load::<()>("c", 100, || Ok(3)).unwrap();
        let (n, bytes, budget) = cache.stats();
        assert_eq!(n, 2, "a + c should remain, b evicted");
        assert_eq!(bytes, 200);
        assert_eq!(budget, 250);
    }

    #[test]
    fn oversized_entry_stays_cached_on_its_own() {
        // The just-inserted key is never its own eviction victim, so a single
        // entry larger than the whole budget remains cached.
        let cache: BackendCache<u32> = BackendCache::new(50);
        let _big = cache.get_or_load::<()>("big", 100, || Ok(1)).unwrap();
        assert_eq!(cache.stats(), (1, 100, 50));
    }

    #[test]
    fn evicted_handles_keep_working() {
        let cache: BackendCache<u32> = BackendCache::new(150);
        let a = cache.get_or_load::<()>("a", 100, || Ok(7)).unwrap();
        let _b = cache.get_or_load::<()>("b", 100, || Ok(8)).unwrap();
        assert_eq!(cache.stats().0, 1, "a evicted to make room for b");
        assert_eq!(*a.lock().unwrap(), 7, "outstanding handle still usable");
        // `a` is no longer cached, so the next lookup reloads it.
        let reloaded = cache.get_or_load::<()>("a", 100, || Ok(9)).unwrap();
        assert!(!Arc::ptr_eq(&a, &reloaded));
        assert_eq!(*reloaded.lock().unwrap(), 9);
    }

    #[test]
    fn loader_error_is_propagated_and_nothing_cached() {
        let cache: BackendCache<u32> = BackendCache::new(1024);
        let err = cache.get_or_load("a", 100, || Err("boom")).unwrap_err();
        assert_eq!(err, "boom");
        assert_eq!(cache.stats(), (0, 0, 1024));
    }

    #[test]
    fn zero_budget_disables_cache_but_returns_handle() {
        let cache: BackendCache<u32> = BackendCache::new(0);
        let mut load_count = 0u32;
        let a = cache
            .get_or_load::<()>("a", 100, || {
                load_count += 1;
                Ok(1)
            })
            .unwrap();
        assert_eq!(*a.lock().unwrap(), 1);
        let b = cache
            .get_or_load::<()>("a", 100, || {
                load_count += 1;
                Ok(1)
            })
            .unwrap();
        assert_eq!(*b.lock().unwrap(), 1);
        assert_eq!(load_count, 2, "disabled cache reloads every call");
        assert!(!Arc::ptr_eq(&a, &b));
    }

    #[test]
    fn evict_idle_drops_stale_entries_below_capacity() {
        // Budget far above usage so capacity eviction never fires — this
        // exercises *idle* eviction. The sleep overshoots the TTL, and the TTL
        // comfortably exceeds the gap between touching `a` and sweeping, so
        // the test tolerates scheduler jitter.
        let ttl = Duration::from_millis(100);
        let cache: BackendCache<u32> = BackendCache::with_idle_ttl(1_000_000, Some(ttl));
        let _a = cache.get_or_load::<()>("a", 100, || Ok(1)).unwrap();
        let _b = cache.get_or_load::<()>("b", 100, || Ok(2)).unwrap();
        assert_eq!(cache.stats().0, 2);
        // Nothing is idle yet.
        assert_eq!(cache.evict_idle(), (0, 0));
        std::thread::sleep(ttl + Duration::from_millis(50));
        // Touch `a` so only `b` is stale.
        let _a_again = cache
            .get_or_load::<()>("a", 100, || panic!("cached"))
            .unwrap();
        let (entries, bytes) = cache.evict_idle();
        assert_eq!((entries, bytes), (1, 100), "only b should be swept");
        let (n, total, _) = cache.stats();
        assert_eq!(n, 1, "a remains");
        assert_eq!(total, 100);
    }

    #[test]
    fn evict_idle_noop_when_disabled() {
        let cache: BackendCache<u32> = BackendCache::new(1024); // idle_ttl: None
        let _a = cache.get_or_load::<()>("a", 100, || Ok(1)).unwrap();
        assert_eq!(cache.evict_idle(), (0, 0));
        assert_eq!(cache.stats().0, 1, "disabled idle eviction keeps the entry");
    }

    #[test]
    fn invalidate_removes_key_and_releases_bytes() {
        let cache: BackendCache<u32> = BackendCache::new(1024);
        let _a = cache.get_or_load::<()>("a", 100, || Ok(1)).unwrap();
        assert_eq!(cache.stats().0, 1);
        cache.invalidate("a");
        assert_eq!(cache.stats(), (0, 0, 1024));
        // Invalidating an unknown key is a no-op.
        cache.invalidate("missing");
        assert_eq!(cache.stats(), (0, 0, 1024));
    }

    #[test]
    fn shared_budget_spans_caches_and_each_self_trims() {
        // One budget shared by two caches (the #427 aggregate-bound fix).
        let budget = SharedModelBudget::new(250);
        let a: BackendCache<u32> = BackendCache::from_shared(budget.clone(), None);
        let b: BackendCache<u32> = BackendCache::from_shared(budget.clone(), None);

        let _ = a.get_or_load::<()>("a1", 100, || Ok(1)).unwrap();
        let _ = b.get_or_load::<()>("b1", 100, || Ok(2)).unwrap();
        // The total is SHARED across both caches.
        assert_eq!(budget.total(), 200);

        // Growing A past the shared budget makes A evict its OWN LRU (a1) back
        // under the shared cap — B's entry in the other cache is untouched.
        let _ = a.get_or_load::<()>("a2", 100, || Ok(3)).unwrap(); // shared→300>250
        assert_eq!(budget.total(), 200, "A self-trimmed to the shared budget");
        assert_eq!(a.stats().0, 1, "A kept a2, evicted a1");
        assert_eq!(b.stats().0, 1, "B's b1 not evicted by A");
    }

    #[test]
    fn clear_releases_only_this_caches_bytes() {
        let budget = SharedModelBudget::new(1_000);
        let a: BackendCache<u32> = BackendCache::from_shared(budget.clone(), None);
        let b: BackendCache<u32> = BackendCache::from_shared(budget.clone(), None);
        let _ = a.get_or_load::<()>("a1", 100, || Ok(1)).unwrap();
        let _ = a.get_or_load::<()>("a2", 50, || Ok(2)).unwrap();
        let _ = b.get_or_load::<()>("b1", 70, || Ok(3)).unwrap();
        assert_eq!(budget.total(), 220);
        a.clear();
        assert_eq!(a.stats().0, 0);
        assert_eq!(b.stats().0, 1);
        assert_eq!(budget.total(), 70, "B's bytes stay accounted");
    }

    #[test]
    fn estimate_model_size_of_missing_dir_is_zero() {
        let missing = std::path::Path::new("/definitely/not/a/model/dir");
        assert_eq!(estimate_model_size(missing), 0);
    }

    #[test]
    fn default_budget_is_ram_derived_not_flat() {
        // The default comes from detected hardware, so its exact value is
        // host-dependent; only positivity is asserted here.
        let mb = default_model_cache_mb();
        assert!(mb > 0, "RAM-derived default should be positive");
    }
}