car-inference 0.22.0

Local model inference for CAR — Candle backend with Qwen3 models
Documentation
//! Download progress + acquisition lifecycle for model pulls.
//!
//! Pulling a multi-GB model used to be silent (`ensure_local` symlinked or
//! copied with no feedback). This module adds the feedback and safety layer:
//! a [`DownloadProgress`] sink the pull path drives, a preflight disk check,
//! and (via the registry) per-model locking so a pull can't race a concurrent
//! remove/upgrade.
//!
//! ## Granularity
//!
//! Progress is **file-level**, not byte-level: each model pull is a small set
//! of files (weights + tokenizer, or N safetensors shards), and the sink is
//! told when each starts and finishes plus the file's expected size. Per-byte
//! streaming is not exposed by `hf-hub` 0.4 (its progress hook is internal and
//! `.get()` downloads a whole file atomically), so honest file-level events
//! are what we can drive without forking the downloader. When a public byte
//! callback exists, [`DownloadEvent::FileProgress`] is already in the enum to
//! carry it without a wire-breaking change.

use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex as AsyncMutex, OwnedMutexGuard};

/// One observable step in acquiring a model. Serializable so the daemon can
/// forward it verbatim as a `models.pull_progress` notification (wired in the
/// server-parity task).
///
/// # Wire format
///
/// Serialized as an internally tagged object: the variant name, converted to
/// snake_case, is stored under the `"event"` key alongside the variant's own
/// fields — e.g. `FileStarted` becomes `{"event":"file_started", ...}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum DownloadEvent {
    /// The pull is starting. `total_files`/`total_mb` are best-effort
    /// estimates from the schema; they may be 0 when unknown.
    Started {
        /// The model being pulled.
        model: String,
        /// Estimated number of files in the pull; 0 when unknown.
        total_files: u32,
        /// Estimated total size of the pull in MB; 0 when unknown.
        total_mb: u64,
    },
    /// A file download has begun. `index` is 1-based.
    FileStarted {
        /// Name of the file that is starting.
        filename: String,
        /// 1-based position of this file within the pull.
        index: u32,
        /// Number of files in the pull (presumably the same figure reported
        /// by `Started` — confirm against the pull path).
        total_files: u32,
        /// Expected size in MB when known, else 0.
        size_mb: u64,
    },
    /// Byte-level progress within the current file. Reserved — not emitted
    /// today (see module docs); present so adding it later is not a
    /// wire-breaking enum change.
    FileProgress {
        /// Name of the file the progress refers to.
        filename: String,
        /// Intended to carry the MB received so far for this file.
        downloaded_mb: u64,
        /// Intended to carry the file's total size in MB.
        total_mb: u64,
    },
    /// A file finished downloading (or was already cached).
    FileCompleted { filename: String },
    /// The whole model is ready locally.
    Completed { model: String },
    /// The pull failed; `error` is a plain-language reason.
    Failed { error: String },
}

/// A consumer of [`DownloadEvent`]s. Implementations must be cheap and
/// non-blocking — they run inline on the pull path.
///
/// The `Send + Sync` supertraits let a single sink be shared behind an `Arc`
/// (see [`ProgressSink`]).
pub trait DownloadProgress: Send + Sync {
    /// Handle one event. The event is only borrowed; an implementation that
    /// wants to keep it must clone it.
    fn on_event(&self, event: &DownloadEvent);
}

/// Cheaply cloneable handle to an *optional* progress sink.
///
/// The handle wraps `Option<Arc<dyn DownloadProgress>>`: clones share the same
/// underlying sink, and the empty form ([`ProgressSink::none`], also the
/// `Default`) lets call sites that don't care about progress pass a handle
/// without paying for anything.
#[derive(Clone, Default)]
pub struct ProgressSink(Option<Arc<dyn DownloadProgress>>);

impl ProgressSink {
    /// A handle with no sink attached; every emitted event is discarded.
    pub fn none() -> Self {
        Self(None)
    }

    /// A handle that forwards every emitted event to `sink`.
    pub fn new(sink: Arc<dyn DownloadProgress>) -> Self {
        Self(Some(sink))
    }

    /// Deliver `event` to the attached sink. Does nothing when the handle is
    /// empty.
    pub fn emit(&self, event: DownloadEvent) {
        let Some(target) = self.0.as_deref() else {
            return;
        };
        target.on_event(&event);
    }

    /// Whether a sink is attached. Callers can check this first to avoid
    /// building events nobody will receive.
    pub fn is_active(&self) -> bool {
        self.0.is_some()
    }
}

/// The process-wide registry of per-model async locks, keyed by model id.
///
/// Holding a model's lock keeps a pull from racing a concurrent pull,
/// `remove_model`, or upgrade download of the *same* model. The registry is
/// created lazily on first use and lives for the rest of the process.
///
/// # Why entries are never evicted
///
/// The map only grows to the number of *distinct* model ids ever pulled in
/// this process — i.e. catalog size, a few dozen — and each entry is a tiny
/// `(String, Arc<Mutex>)`. Evicting on last drop was considered and rejected:
/// a correct evictor may remove the entry only after the guard's mutex has
/// been released, but `Drop::drop` runs while the field guard is still held.
/// That leaves a window in which a new acquirer creates a second mutex for
/// the same id, so two tasks would each "hold" a different lock for one
/// model. The bounded leak is not worth that race.
fn model_locks() -> &'static Mutex<HashMap<String, Arc<AsyncMutex<()>>>> {
    static REGISTRY: OnceLock<Mutex<HashMap<String, Arc<AsyncMutex<()>>>>> = OnceLock::new();
    // `Mutex<HashMap<..>>::default()` is an unlocked mutex around an empty map.
    REGISTRY.get_or_init(Default::default)
}

/// Acquire the exclusive lock for a model id, awaiting if another task holds
/// it. The returned guard releases on drop. Different model ids never block
/// each other.
pub async fn acquire_model_lock(model_id: &str) -> OwnedMutexGuard<()> {
    let lock = {
        let mut map = model_locks().lock().unwrap();
        map.entry(model_id.to_string())
            .or_insert_with(|| Arc::new(AsyncMutex::new(())))
            .clone()
    };
    lock.lock_owned().await
}

/// Preflight check that the filesystem at `path` can take `needed_mb` while
/// still leaving a safety margin free.
///
/// The check passes without probing when `needed_mb` is 0 (size unknown —
/// better to skip than to guess), and also passes when free space cannot be
/// determined, so a failed probe never blocks a pull.
///
/// # Errors
///
/// Returns a plain-language message when the available space is smaller than
/// `needed_mb` plus a 1 GB cushion.
pub fn check_disk_space(path: &std::path::Path, needed_mb: u64) -> Result<(), String> {
    if needed_mb == 0 {
        return Ok(());
    }
    // 1 GB of headroom on top of the payload so the disk is never filled to
    // the brim; saturating so an absurd request cannot wrap around.
    let required = needed_mb.saturating_add(1024);
    match available_disk_mb(path) {
        Some(available_mb) if available_mb < required => Err(format!(
            "not enough disk space: need ~{} MB (+1 GB free), but only {} MB available at {}",
            needed_mb,
            available_mb,
            path.display()
        )),
        // Either there is enough room, or the probe failed and we let the
        // pull proceed.
        _ => Ok(()),
    }
}

/// Free space in MB on the filesystem holding `path` (or its nearest existing
/// ancestor). `None` if it can't be determined. Shells out to `df` rather than
/// pulling in a new dependency — consistent with hardware.rs's `sysctl`/`wmic`
/// approach.
///
/// A relative `path` is resolved against the current directory: if none of its
/// components exist yet, the current directory itself is probed.
///
/// On non-Unix targets this always returns `None`.
fn available_disk_mb(path: &std::path::Path) -> Option<u64> {
    // Walk up to the first existing ancestor (the target dir may not exist
    // yet). The last ancestor of a relative path is the empty path, which
    // never "exists"; substitute `.` so a relative target whose leading
    // component is still missing gets probed instead of silently skipped.
    let probe = path
        .ancestors()
        .map(|candidate| {
            if candidate.as_os_str().is_empty() {
                std::path::Path::new(".")
            } else {
                candidate
            }
        })
        .find(|candidate| candidate.exists())?;

    #[cfg(unix)]
    {
        // `df -Pk <path>`: `-P` (POSIX) forces exactly one line per
        // filesystem, so a long device name can't wrap and shift columns.
        // Blocks are 1K.
        let out = std::process::Command::new("df")
            .arg("-Pk")
            .arg(probe)
            .output()
            .ok()?;
        // Lossy decoding: the figures we need are ASCII, so a non-UTF-8 byte
        // in a device or mount name must not throw away the whole probe.
        let text = String::from_utf8_lossy(&out.stdout);
        let avail_kb = parse_df_available_kb(&text)?;
        Some(avail_kb / 1024)
    }
    #[cfg(not(unix))]
    {
        let _ = probe;
        // Windows free-space probing isn't wired up; skip the preflight there
        // rather than block pulls. The download still errors if the disk fills.
        None
    }
}

/// Extract the "available" figure (in 1K blocks) from `df -Pk` output.
///
/// The data row is `<filesystem> <total> <used> <available> <capacity>% <mount>`.
/// A filesystem name containing spaces would shift a fixed column index, so
/// the row is anchored on the `NN%` capacity column and the value just before
/// it is taken. When no such column is present, the plain 4th column is used.
#[cfg(unix)]
fn parse_df_available_kb(text: &str) -> Option<u64> {
    let row: Vec<&str> = text.lines().nth(1)?.split_whitespace().collect();
    let capacity_idx = row.iter().position(|tok| {
        tok.strip_suffix('%')
            .is_some_and(|n| !n.is_empty() && n.bytes().all(|b| b.is_ascii_digit()))
    });
    let avail_idx = match capacity_idx {
        // Capacity can never sit left of column 5 in a well-formed row.
        Some(i) if i >= 4 => i - 1,
        _ => 3,
    };
    row.get(avail_idx)?.parse().ok()
}

// Unit tests for the progress sink, the event wire format, per-model
// locking, and the disk-space preflight.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Test sink that stores a clone of every event it receives, in order.
    #[derive(Default)]
    struct Recorder {
        events: Mutex<Vec<DownloadEvent>>,
    }
    impl DownloadProgress for Recorder {
        fn on_event(&self, event: &DownloadEvent) {
            self.events.lock().unwrap().push(event.clone());
        }
    }

    /// An empty sink reports inactive and silently accepts events.
    #[test]
    fn none_sink_is_inert() {
        let s = ProgressSink::none();
        assert!(!s.is_active());
        s.emit(DownloadEvent::Completed { model: "x".into() }); // must not panic
    }

    /// An attached sink reports active and receives every event in emission
    /// order.
    #[test]
    fn sink_records_events_in_order() {
        let rec = Arc::new(Recorder::default());
        let sink = ProgressSink::new(rec.clone());
        assert!(sink.is_active());
        sink.emit(DownloadEvent::Started {
            model: "Qwen3-4B".into(),
            total_files: 2,
            total_mb: 2500,
        });
        sink.emit(DownloadEvent::FileCompleted {
            filename: "model.gguf".into(),
        });
        sink.emit(DownloadEvent::Completed {
            model: "Qwen3-4B".into(),
        });
        let evs = rec.events.lock().unwrap();
        assert_eq!(evs.len(), 3);
        assert!(matches!(evs[0], DownloadEvent::Started { total_files: 2, .. }));
        assert!(matches!(evs[2], DownloadEvent::Completed { .. }));
    }

    /// The JSON form carries the snake_case variant name under `"event"` and
    /// the variant's fields alongside it.
    #[test]
    fn event_serializes_with_tag() {
        let json = serde_json::to_string(&DownloadEvent::FileStarted {
            filename: "model.gguf".into(),
            index: 1,
            total_files: 2,
            size_mb: 2400,
        })
        .unwrap();
        assert!(json.contains("\"event\":\"file_started\""));
        assert!(json.contains("\"size_mb\":2400"));
    }

    /// Locks are exclusive per id and independent across ids. Contention is
    /// detected with timeouts: a held lock makes the second acquire time out,
    /// while an unrelated id (or the same id after release) acquires in time.
    #[tokio::test]
    async fn same_model_lock_is_exclusive_distinct_ids_are_not() {
        use std::time::Duration;
        // Unique ids so parallel tests can't collide on the global lock map.
        let (a, b) = ("lock-test-a", "lock-test-b");
        let a1 = acquire_model_lock(a).await;
        // A different id acquires immediately even while A is held.
        let _b = tokio::time::timeout(Duration::from_millis(200), acquire_model_lock(b))
            .await
            .expect("distinct id must not block");
        // Re-acquiring A while a1 is held must block (times out).
        let contended =
            tokio::time::timeout(Duration::from_millis(50), acquire_model_lock(a)).await;
        assert!(contended.is_err(), "same-id lock should be contended");
        drop(a1);
        // After release, A acquires promptly.
        tokio::time::timeout(Duration::from_millis(500), acquire_model_lock(a))
            .await
            .expect("acquires after release");
    }

    /// A size of 0 short-circuits before any probing, so even a nonexistent
    /// path passes.
    #[test]
    fn zero_needed_skips_disk_check() {
        // Unknown size must not block a pull.
        assert!(check_disk_space(std::path::Path::new("/nonexistent/x"), 0).is_ok());
    }

    /// A request far beyond any real disk is rejected. Skipped (passes
    /// vacuously) when the free-space probe is unavailable.
    #[test]
    fn absurd_size_is_rejected_when_probe_succeeds() {
        // Asking for an absurd amount (close to 2^44 MB) on a real path must
        // fail the preflight (only meaningful where `df` is available).
        let tmp = std::env::temp_dir();
        if available_disk_mb(&tmp).is_some() {
            let res = check_disk_space(&tmp, u64::MAX / (1024 * 1024) - 2048);
            assert!(res.is_err(), "expected disk-space rejection, got {res:?}");
        }
    }
}