car-inference 0.31.0

Local model inference for CAR — Candle backend with Qwen3 models
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
//! Download progress + acquisition lifecycle for model pulls.
//!
//! Pulling a multi-GB model used to be silent (`ensure_local` symlinked or
//! copied with no feedback). This module adds the feedback and safety layer:
//! a [`DownloadProgress`] sink the pull path drives, a preflight disk check,
//! and (via the registry) per-model locking so a pull can't race a concurrent
//! remove/upgrade.
//!
//! ## Granularity
//!
//! Progress is **file-level**, not byte-level: each model pull is a small set
//! of files (weights + tokenizer, or N safetensors shards), and the sink is
//! told when each starts and finishes plus the file's expected size. Per-byte
//! streaming is not exposed by `hf-hub` 0.4 (its progress hook is internal and
//! `.get()` downloads a whole file atomically), so honest file-level events
//! are what we can drive without forking the downloader. Should a public
//! byte-level callback become available, the reserved
//! [`DownloadEvent::FileProgress`] variant can carry it without a
//! wire-breaking change.

use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex as AsyncMutex, OwnedMutexGuard};

/// One observable step in acquiring a model. Serializable so the daemon can
/// forward it verbatim as a `models.pull_progress` notification (wired in the
/// server-parity task).
///
/// Serde representation: internally tagged. The variant name, converted to
/// `snake_case`, is written under the `"event"` key alongside the variant's
/// own fields (e.g. `FileStarted` serializes with `"event":"file_started"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum DownloadEvent {
    /// The pull is starting. `total_files`/`total_mb` are best-effort
    /// estimates from the schema; they may be 0 when unknown.
    Started {
        /// Model being pulled.
        model: String,
        /// Estimated number of files in the pull; 0 when unknown.
        total_files: u32,
        /// Estimated total size of the pull in MB; 0 when unknown.
        total_mb: u64,
    },
    /// A file download has begun. `index` is 1-based.
    FileStarted {
        /// Name of the file whose download is starting.
        filename: String,
        /// 1-based position of this file within the pull.
        index: u32,
        /// Number of files in the pull.
        total_files: u32,
        /// Expected size in MB when known, else 0.
        size_mb: u64,
    },
    /// Byte-level progress within the current file. Reserved — not emitted
    /// today (see module docs); present so adding it later is not a
    /// wire-breaking enum change.
    FileProgress {
        /// File the progress report refers to.
        filename: String,
        /// Intended to carry the MB received so far (variant is not emitted yet).
        downloaded_mb: u64,
        /// Intended to carry the file's total size in MB (variant is not emitted yet).
        total_mb: u64,
    },
    /// A file finished downloading (or was already cached).
    FileCompleted { filename: String },
    /// The whole model is ready locally.
    Completed { model: String },
    /// The pull failed; `error` is a plain-language reason.
    Failed { error: String },
}

/// A consumer of [`DownloadEvent`]s. Implementations must be cheap and
/// non-blocking — they run inline on the pull path.
///
/// The `Send + Sync` bound is what allows a sink to be held as an
/// `Arc<dyn DownloadProgress>` inside a cloneable [`ProgressSink`].
pub trait DownloadProgress: Send + Sync {
    /// Receive one event. The event is only borrowed and the receiver is
    /// `&self`, so an implementation that wants to keep events must clone
    /// them and use interior mutability for its own state.
    fn on_event(&self, event: &DownloadEvent);
}

/// Cheaply cloneable handle around an optional progress sink.
///
/// The handle may be empty (see [`ProgressSink::none`], also what `Default`
/// produces), in which case emitting is a no-op — call sites that don't care
/// about progress pay nothing.
#[derive(Clone, Default)]
pub struct ProgressSink(Option<Arc<dyn DownloadProgress>>);

impl ProgressSink {
    /// Build an empty handle: every event emitted through it is discarded.
    pub fn none() -> Self {
        Self(None)
    }

    /// Build a handle that forwards events to `sink`.
    pub fn new(sink: Arc<dyn DownloadProgress>) -> Self {
        Self(Some(sink))
    }

    /// Forward `event` to the attached sink; does nothing when the handle is
    /// empty.
    pub fn emit(&self, event: DownloadEvent) {
        let Some(target) = self.0.as_deref() else {
            return;
        };
        target.on_event(&event);
    }

    /// Whether a real sink is attached. Callers can consult this to avoid
    /// constructing events nobody will see.
    pub fn is_active(&self) -> bool {
        self.0.is_some()
    }
}

/// The process-wide registry of per-model async locks, keyed by model id.
///
/// Holding a model's lock keeps a pull from racing another pull, a
/// `remove_model`, or an upgrade download of the *same* model.
///
/// The registry only grows: entries are never evicted. Its size is bounded by
/// the number of *distinct* model ids ever pulled in this process (catalog
/// size — a few dozen), and each entry is a tiny `(String, Arc<Mutex>)`.
/// Evicting on last drop was considered and rejected: the map entry may only
/// be removed once the guard's mutex has been released, yet `Drop::drop` runs
/// while the field guard is still held. That leaves a window in which a new
/// acquirer creates a second mutex for the same id, so two tasks would "hold"
/// different locks for one model. A bounded leak is preferable to that race.
fn model_locks() -> &'static Mutex<HashMap<String, Arc<AsyncMutex<()>>>> {
    static REGISTRY: OnceLock<Mutex<HashMap<String, Arc<AsyncMutex<()>>>>> = OnceLock::new();
    // An empty map behind an unlocked mutex — exactly what `Default` yields.
    REGISTRY.get_or_init(Default::default)
}

/// Acquire the exclusive lock for a model id, awaiting if another task holds
/// it. The returned guard releases on drop. Different model ids never block
/// each other.
pub async fn acquire_model_lock(model_id: &str) -> OwnedMutexGuard<()> {
    let lock = {
        let mut map = model_locks().lock().unwrap();
        map.entry(model_id.to_string())
            .or_insert_with(|| Arc::new(AsyncMutex::new(())))
            .clone()
    };
    lock.lock_owned().await
}

/// Preflight disk check: can the filesystem at `path` take `needed_mb` and
/// still keep a sensible amount of space free?
///
/// Returns `Ok(())` when there is room, and a plain-language error message
/// otherwise. Two situations deliberately pass without checking: a
/// `needed_mb` of 0 (size unknown — no guessing), and a failed free-space
/// probe (a probe failure must not block the pull).
pub fn check_disk_space(path: &std::path::Path, needed_mb: u64) -> Result<(), String> {
    // Extra headroom (1 GB) so a pull never fills the disk to the brim.
    const CUSHION_MB: u64 = 1024;

    if needed_mb == 0 {
        return Ok(());
    }
    match available_disk_mb(path) {
        Some(free_mb) if free_mb < needed_mb.saturating_add(CUSHION_MB) => Err(format!(
            "not enough disk space: need ~{} MB (+1 GB free), but only {} MB available at {}",
            needed_mb,
            free_mb,
            path.display()
        )),
        // Enough room, or free space could not be determined.
        _ => Ok(()),
    }
}

/// Free space in MB on the filesystem holding `path` (or its nearest existing
/// ancestor). `None` if it can't be determined. Shells out to `df` rather than
/// pulling in a new dependency — consistent with hardware.rs's `sysctl`/`wmic`
/// approach.
///
/// A relative `path` whose components don't exist yet is probed via the
/// current directory (`.`), since that is where it would be created.
fn available_disk_mb(path: &std::path::Path) -> Option<u64> {
    // Walk up to the first existing ancestor (the target dir may not exist yet).
    let mut probe = path;
    while !probe.exists() {
        let parent = probe.parent()?;
        if parent.as_os_str().is_empty() {
            // The parent of a single-component relative path is the empty
            // path, which never "exists". It denotes the current directory,
            // so probe that instead of giving up on the preflight.
            probe = std::path::Path::new(".");
            break;
        }
        probe = parent;
    }

    #[cfg(unix)]
    {
        // `df -Pk <path>`: `-P` (POSIX) forces exactly one line per
        // filesystem, so a long device name can't wrap and shift columns.
        // Blocks are 1K.
        let out = std::process::Command::new("df")
            .arg("-Pk")
            .arg(probe)
            .output()
            .ok()?;
        // Lossy decoding: a non-UTF-8 device or mount name must not hide the
        // (always ASCII) numeric columns we are after.
        let text = String::from_utf8_lossy(&out.stdout);
        df_available_kb(&text).map(|kb| kb / 1024)
    }
    #[cfg(not(unix))]
    {
        let _ = probe;
        // Windows free-space probing isn't wired up; skip the preflight there
        // rather than block pulls. The download still errors if the disk fills.
        None
    }
}

/// Extract the "Available" column (in 1K blocks) from `df -Pk` output.
///
/// The data row is `<filesystem> <total> <used> <available> <capacity> <mount>`.
/// The filesystem name can itself contain whitespace (e.g. macOS autofs
/// `map auto_home`), so a fixed column index is unreliable. Instead, find the
/// first run of three consecutive numeric fields after the leading name token
/// and take the third. `None` when no such run exists.
#[cfg(unix)]
fn df_available_kb(df_output: &str) -> Option<u64> {
    let row = df_output.lines().nth(1)?;
    let fields: Vec<&str> = row.split_whitespace().collect();
    // Field 0 always belongs to the filesystem name; never treat it as a number.
    fields.get(1..)?.windows(3).find_map(|run| {
        let _total: u64 = run[0].parse().ok()?;
        let _used: u64 = run[1].parse().ok()?;
        run[2].parse::<u64>().ok()
    })
}

// ---------------------------------------------------------------------------
// Shared-cache integrity
// ---------------------------------------------------------------------------
//
// CAR does not own the HuggingFace cache (`HF_HOME` / `~/.cache/huggingface`).
// Other tools — `huggingface-cli`, `transformers`, `mlx_lm`, a user's `rm` —
// mutate it concurrently. A cached file that is *present* is therefore not
// necessarily *intact*: it can be a dangling symlink into a pruned blob store,
// a zero-length partial write from an interrupted/out-of-disk download, or a
// truncated/corrupt blob. The pull and load paths historically skipped work on
// a bare `path.exists()`, which treats all of those as "ready" and hands a
// broken file to the backend, surfacing as a cryptic load failure with no
// recovery.
//
// These helpers distinguish "present" from "usable". They are split into a
// cheap hot-path check and an expensive deep check on purpose: the cheap check
// runs on every pull/load and must not hash multi-GB weights; the deep check
// recomputes a content hash and is reserved for explicit verification and
// self-heal paths.

use std::path::Path;

/// Verdict from [`verify_cache_file`].
///
/// Only [`CacheIntegrity::Corrupt`] leads to deletion in
/// [`purge_corrupt_cache_files`]; the other two verdicts leave the file alone.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheIntegrity {
    /// The file resolves and its content hashes to the sha256 HuggingFace
    /// encoded in its blob path — provably intact, offline.
    Verified,
    /// The file resolves and is non-empty, but its content can't be *proven*
    /// intact offline: it's a small non-LFS file (etag is a git sha1, not a
    /// content hash) or a plain file rather than a symlinked blob. Treated as
    /// usable — the cheap invariants still hold.
    Unverifiable,
    /// Missing, a dangling symlink, zero-length, unreadable, or its content
    /// does not match its expected hash. Must be re-downloaded.
    Corrupt,
}

/// Fast hot-path test for whether a cached pointer file is *usable*.
///
/// "Usable" means the path resolves to a regular file (so not a dangling
/// symlink into a pruned blob store) with a non-zero length (so not a
/// zero-length partial write). It costs a single `stat` and never hashes, so
/// it is safe to call per file on the pull/load path. Passing this check is
/// the bar for skipping a re-download.
pub fn cache_file_usable(path: &Path) -> bool {
    // `metadata` follows symlinks and fails when the target is gone, which is
    // how a pruned blob (the common shared-cache failure) gets rejected.
    std::fs::metadata(path)
        .map(|meta| meta.is_file() && meta.len() > 0)
        .unwrap_or(false)
}

/// Thorough, network-free integrity check of a cached HuggingFace pointer
/// file.
///
/// HuggingFace's blob store is content-addressed: `snapshots/<commit>/<file>`
/// is a symlink to `blobs/<etag>`, and for LFS files (model weights —
/// `.safetensors`, `.gguf`) the etag *is* the sha256 of the content. The
/// on-disk bytes can therefore be checked offline by re-hashing them and
/// comparing against the name the blob already carries.
///
/// This reads and hashes the whole file, so keep it to explicit verification
/// (`car doctor`) and self-heal; the hot path should call
/// [`cache_file_usable`] instead.
pub fn verify_cache_file(pointer_path: &Path) -> CacheIntegrity {
    // Start with the cheap invariants — missing, dangling and zero-length
    // files are all rejected here.
    if !cache_file_usable(pointer_path) {
        return CacheIntegrity::Corrupt;
    }

    // Without a symlinked blob there is no etag to compare against.
    let etag = match blob_etag(pointer_path) {
        Some(name) => name,
        None => return CacheIntegrity::Unverifiable,
    };

    // A sha256 etag is 64 hex characters. Git-sha1 etags (small non-LFS
    // files) are 40 and do not hash the raw content, so nothing can be proven
    // about those here.
    let looks_like_sha256 = etag.len() == 64 && etag.bytes().all(|b| b.is_ascii_hexdigit());
    if !looks_like_sha256 {
        return CacheIntegrity::Unverifiable;
    }

    // A digest mismatch (truncated/corrupt content) and an unreadable file
    // both mean the file is unusable.
    let content_matches = sha256_hex(pointer_path)
        .map(|digest| digest.eq_ignore_ascii_case(&etag))
        .unwrap_or(false);
    if content_matches {
        CacheIntegrity::Verified
    } else {
        CacheIntegrity::Corrupt
    }
}

/// Deep-verify every file below `model_dir` and delete the ones that are
/// *provably* corrupt, so the next pull downloads them again. Returns how
/// many files were removed.
///
/// A file is "provably corrupt" when [`verify_cache_file`] reports
/// [`CacheIntegrity::Corrupt`]: a dangling symlink, a zero-length file, or
/// (for LFS weights) content whose sha256 differs from the etag HuggingFace
/// named the blob by. `Verified` files and merely `Unverifiable` ones (small
/// non-LFS configs, plain non-blob files) are never touched — a file that
/// can't be proven bad is not deleted.
///
/// Hashing is limited to weight blobs (64-hex sha256 etags); everything else
/// is settled by the cheap check, so the cost stays bounded to the weights
/// even in a deep directory. This is meant for the self-heal path: call it
/// only after a backend load has already failed, never on the hot path.
///
/// Deleting just the pointer would NOT heal anything. A model dir is a tree
/// of symlinks into the shared store (`model_dir/file →
/// snapshots/<commit>/file → blobs/<etag>`), and hf-hub's cache lookup re-uses
/// a snapshot whose pointer still resolves — a surviving corrupt blob would
/// simply be re-linked rather than re-downloaded. So the **content-addressed
/// blob** (the fully resolved target) is removed along with the pointer,
/// which makes the re-pull's readiness checks see a missing file and really
/// re-fetch it (rewriting the blob).
pub fn purge_corrupt_cache_files(model_dir: &Path) -> usize {
    let mut purged: usize = 0;
    purge_corrupt_recurse(model_dir, &mut purged);
    purged
}

/// Recursive worker for [`purge_corrupt_cache_files`]: walks `dir`, deletes
/// every entry that verifies as corrupt, and bumps `removed` once per pointer
/// successfully deleted. An unreadable directory is silently skipped.
fn purge_corrupt_recurse(dir: &Path, removed: &mut usize) {
    let listing = match std::fs::read_dir(dir) {
        Ok(listing) => listing,
        Err(_) => return,
    };

    // Entries that fail to read are skipped.
    for entry in listing.flatten() {
        let path = entry.path();

        // Descend only into REAL directories. `file_type()` does NOT follow
        // symlinks, so a symlink aimed at an ancestor can't send us into
        // infinite recursion. A symlink-to-dir (not seen in real HF/model
        // layouts, whose subdirs are real) is handled by the corruption check
        // below: it reads as Corrupt and only the symlink itself is removed —
        // harmless.
        let is_real_dir = matches!(entry.file_type(), Ok(kind) if kind.is_dir());
        if is_real_dir {
            purge_corrupt_recurse(&path, removed);
            continue;
        }

        if verify_cache_file(&path) != CacheIntegrity::Corrupt {
            continue;
        }

        // Blob first, pointer second: removing the content-addressed blob
        // stops a re-pull from re-linking a surviving corrupt blob. Blob
        // removal is gated on "is a symlink" instead of
        // `canonicalize() != path`, because a symlinked cache root makes
        // those differ for a plain file as well — that would delete the file
        // through the blob branch and then miscount the pointer removal as a
        // NotFound, suppressing the heal retry.
        let is_link = matches!(
            std::fs::symlink_metadata(&path),
            Ok(meta) if meta.file_type().is_symlink()
        );
        if is_link {
            if let Ok(blob) = std::fs::canonicalize(&path) {
                let _ = std::fs::remove_file(&blob);
            }
        }

        if std::fs::remove_file(&path).is_ok() {
            *removed += 1;
        }
    }
}

/// Extract the etag HuggingFace encoded in a pointer file's blob path.
///
/// Returns the final file name when the path ultimately resolves to a file
/// whose parent directory is named `blobs`. Returns `None` for a plain file
/// (it resolves to itself, outside any `blobs/` directory), for a path that
/// cannot be resolved, or for any path that does not land in a blob store.
///
/// Resolution goes through [`std::fs::canonicalize`] so the *whole* symlink
/// chain is followed rather than only its first hop: a snapshot pointer is
/// itself a symlink into `blobs/`, and our models dir adds one more hop
/// (models dir → snapshot pointer → blob). A single `read_link` would stop at
/// the intermediate pointer and never see the etag.
fn blob_etag(pointer_path: &Path) -> Option<String> {
    let resolved = std::fs::canonicalize(pointer_path).ok()?;
    let store_dir = resolved.parent()?.file_name()?;
    if store_dir.to_str() != Some("blobs") {
        return None;
    }
    resolved.file_name()?.to_str().map(String::from)
}

/// Streaming sha256 of a file (follows symlinks), as lowercase hex. Reads in
/// chunks so a multi-GB weight file doesn't land in memory at once.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be opened or read.
/// A read that fails with [`std::io::ErrorKind::Interrupted`] is retried
/// rather than reported: that kind is non-fatal, and surfacing it would make
/// an intact file look unreadable to callers that treat any error as
/// corruption.
fn sha256_hex(path: &Path) -> std::io::Result<String> {
    use sha2::{Digest, Sha256};
    use std::io::{ErrorKind, Read};
    let mut file = std::fs::File::open(path)?;
    let mut hasher = Sha256::new();
    let mut buf = [0u8; 64 * 1024];
    loop {
        let n = match file.read(&mut buf) {
            // End of file.
            Ok(0) => break,
            Ok(n) => n,
            // A signal interrupted the syscall before any data arrived; try again.
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        hasher.update(&buf[..n]);
    }
    Ok(hex::encode(hasher.finalize()))
}

// Unit tests for the progress sink, the per-model locks, the disk-space
// preflight and the shared-cache integrity helpers. Symlink-based cases are
// compiled on unix only.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Test sink that stores a clone of every event it receives.
    #[derive(Default)]
    struct Recorder {
        events: Mutex<Vec<DownloadEvent>>,
    }
    impl DownloadProgress for Recorder {
        fn on_event(&self, event: &DownloadEvent) {
            self.events.lock().unwrap().push(event.clone());
        }
    }

    // An empty sink reports inactive and swallows events without panicking.
    #[test]
    fn none_sink_is_inert() {
        let s = ProgressSink::none();
        assert!(!s.is_active());
        s.emit(DownloadEvent::Completed { model: "x".into() }); // must not panic
    }

    // Events reach the attached sink, in emission order.
    #[test]
    fn sink_records_events_in_order() {
        let rec = Arc::new(Recorder::default());
        let sink = ProgressSink::new(rec.clone());
        assert!(sink.is_active());
        sink.emit(DownloadEvent::Started {
            model: "Qwen3-4B".into(),
            total_files: 2,
            total_mb: 2500,
        });
        sink.emit(DownloadEvent::FileCompleted {
            filename: "model.gguf".into(),
        });
        sink.emit(DownloadEvent::Completed {
            model: "Qwen3-4B".into(),
        });
        let evs = rec.events.lock().unwrap();
        assert_eq!(evs.len(), 3);
        assert!(matches!(
            evs[0],
            DownloadEvent::Started { total_files: 2, .. }
        ));
        assert!(matches!(evs[2], DownloadEvent::Completed { .. }));
    }

    // Pins the wire format: snake_case variant name under the "event" key.
    #[test]
    fn event_serializes_with_tag() {
        let json = serde_json::to_string(&DownloadEvent::FileStarted {
            filename: "model.gguf".into(),
            index: 1,
            total_files: 2,
            size_mb: 2400,
        })
        .unwrap();
        assert!(json.contains("\"event\":\"file_started\""));
        assert!(json.contains("\"size_mb\":2400"));
    }

    // Same id contends; distinct ids don't; release makes the lock available.
    #[tokio::test]
    async fn same_model_lock_is_exclusive_distinct_ids_are_not() {
        use std::time::Duration;
        // Unique ids so parallel tests can't collide on the global lock map.
        let (a, b) = ("lock-test-a", "lock-test-b");
        let a1 = acquire_model_lock(a).await;
        // A different id acquires immediately even while A is held.
        let _b = tokio::time::timeout(Duration::from_millis(200), acquire_model_lock(b))
            .await
            .expect("distinct id must not block");
        // Re-acquiring A while a1 is held must block (times out).
        let contended =
            tokio::time::timeout(Duration::from_millis(50), acquire_model_lock(a)).await;
        assert!(contended.is_err(), "same-id lock should be contended");
        drop(a1);
        // After release, A acquires promptly.
        tokio::time::timeout(Duration::from_millis(500), acquire_model_lock(a))
            .await
            .expect("acquires after release");
    }

    #[test]
    fn zero_needed_skips_disk_check() {
        // Unknown size must not block a pull.
        assert!(check_disk_space(std::path::Path::new("/nonexistent/x"), 0).is_ok());
    }

    #[test]
    fn absurd_size_is_rejected_when_probe_succeeds() {
        // Asking for ~an exabyte on a real path must fail the preflight
        // (only meaningful where `df` is available).
        let tmp = std::env::temp_dir();
        if available_disk_mb(&tmp).is_some() {
            let res = check_disk_space(&tmp, u64::MAX / (1024 * 1024) - 2048);
            assert!(res.is_err(), "expected disk-space rejection, got {res:?}");
        }
    }

    // --- shared-cache integrity ------------------------------------------

    use sha2::{Digest, Sha256};
    use tempfile::TempDir;

    /// Lowercase-hex sha256 of an in-memory byte slice, used to name test blobs.
    fn sha256_of(bytes: &[u8]) -> String {
        let mut h = Sha256::new();
        h.update(bytes);
        hex::encode(h.finalize())
    }

    #[test]
    fn cache_file_usable_rejects_missing_and_empty() {
        let tmp = TempDir::new().unwrap();
        let missing = tmp.path().join("nope");
        assert!(!cache_file_usable(&missing), "missing file is not usable");

        let empty = tmp.path().join("empty");
        std::fs::write(&empty, b"").unwrap();
        assert!(!cache_file_usable(&empty), "zero-length file is not usable");

        let good = tmp.path().join("good");
        std::fs::write(&good, b"weights").unwrap();
        assert!(cache_file_usable(&good), "non-empty file is usable");
    }

    #[cfg(unix)]
    #[test]
    fn cache_file_usable_rejects_dangling_symlink() {
        let tmp = TempDir::new().unwrap();
        let link = tmp.path().join("ptr");
        std::os::unix::fs::symlink(tmp.path().join("does-not-exist"), &link).unwrap();
        assert!(
            !cache_file_usable(&link),
            "dangling symlink (pruned blob) must be unusable"
        );

        // A symlink to a real non-empty blob is usable.
        let blob = tmp.path().join("blob");
        std::fs::write(&blob, b"data").unwrap();
        let live = tmp.path().join("live");
        std::os::unix::fs::symlink(&blob, &live).unwrap();
        assert!(cache_file_usable(&live), "resolving symlink is usable");
    }

    /// Build a minimal HF-style cache layout: `blobs/<etag>` + a snapshot
    /// pointer symlink, and return the pointer path.
    #[cfg(unix)]
    fn hf_pointer(root: &Path, etag: &str, content: &[u8], filename: &str) -> std::path::PathBuf {
        let blobs = root.join("blobs");
        std::fs::create_dir_all(&blobs).unwrap();
        let blob = blobs.join(etag);
        std::fs::write(&blob, content).unwrap();
        let snap = root.join("snapshots").join("deadbeef");
        std::fs::create_dir_all(&snap).unwrap();
        let ptr = snap.join(filename);
        // Relative target mirrors hf-hub's `../../blobs/<etag>`.
        std::os::unix::fs::symlink(
            Path::new("..").join("..").join("blobs").join(etag),
            &ptr,
        )
        .unwrap();
        ptr
    }

    // Blob named by the sha256 of its actual content verifies.
    #[cfg(unix)]
    #[test]
    fn verify_cache_file_confirms_matching_sha256_blob() {
        let tmp = TempDir::new().unwrap();
        let content = b"the real weights";
        let etag = sha256_of(content);
        let ptr = hf_pointer(tmp.path(), &etag, content, "model.safetensors");
        assert_eq!(verify_cache_file(&ptr), CacheIntegrity::Verified);
    }

    #[cfg(unix)]
    #[test]
    fn verify_cache_file_flags_corrupt_blob() {
        let tmp = TempDir::new().unwrap();
        // Blob is NAMED by one sha256 but CONTAINS different (truncated) bytes.
        let claimed = sha256_of(b"the real weights");
        let ptr = hf_pointer(tmp.path(), &claimed, b"trunc", "model.safetensors");
        assert_eq!(
            verify_cache_file(&ptr),
            CacheIntegrity::Corrupt,
            "content not matching its etag hash is corrupt"
        );
    }

    // The cheap invariants (dangling, zero-length) surface as Corrupt.
    #[cfg(unix)]
    #[test]
    fn verify_cache_file_flags_dangling_and_empty() {
        let tmp = TempDir::new().unwrap();
        let dangling = tmp.path().join("d");
        std::os::unix::fs::symlink(tmp.path().join("gone"), &dangling).unwrap();
        assert_eq!(verify_cache_file(&dangling), CacheIntegrity::Corrupt);

        let empty = tmp.path().join("e");
        std::fs::write(&empty, b"").unwrap();
        assert_eq!(verify_cache_file(&empty), CacheIntegrity::Corrupt);
    }

    #[cfg(unix)]
    #[test]
    fn verify_cache_file_non_lfs_etag_is_unverifiable() {
        let tmp = TempDir::new().unwrap();
        // A 40-hex git-sha1 etag (small config file) can't be content-verified.
        let git_sha1 = "a".repeat(40);
        let ptr = hf_pointer(tmp.path(), &git_sha1, b"{}", "config.json");
        assert_eq!(verify_cache_file(&ptr), CacheIntegrity::Unverifiable);
    }

    #[test]
    fn verify_cache_file_plain_file_is_unverifiable() {
        // A real, non-empty plain file (not a symlinked blob) has no etag to
        // check against — usable, but not provable.
        let tmp = TempDir::new().unwrap();
        let f = tmp.path().join("model.gguf");
        std::fs::write(&f, b"weights").unwrap();
        assert_eq!(verify_cache_file(&f), CacheIntegrity::Unverifiable);
    }

    // End-to-end purge: two-hop symlinks (model dir → snapshot → blob), with
    // one good weight, one corrupt weight, one plain config, one dangling link.
    #[cfg(unix)]
    #[test]
    fn purge_corrupt_only_removes_provably_bad_files() {
        let tmp = TempDir::new().unwrap();
        // A model dir whose weights are HF symlinks into a blob store, plus a
        // plain config file. One weight is corrupt (content != etag), one good.
        let model = tmp.path().join("model");
        std::fs::create_dir_all(&model).unwrap();

        let good_bytes = b"the real weights";
        let good_etag = sha256_of(good_bytes);
        let good = hf_pointer(tmp.path(), &good_etag, good_bytes, "good.safetensors");
        std::os::unix::fs::symlink(&good, model.join("good.safetensors")).unwrap();

        let bad_etag = sha256_of(b"claimed");
        let bad = hf_pointer(tmp.path(), &bad_etag, b"actually-truncated", "bad.safetensors");
        let bad_blob = tmp.path().join("blobs").join(&bad_etag);
        std::os::unix::fs::symlink(&bad, model.join("bad.safetensors")).unwrap();

        // A plain, unverifiable config must never be removed.
        std::fs::write(model.join("config.json"), b"{}").unwrap();
        // A dangling symlink is provably corrupt and must be removed.
        std::os::unix::fs::symlink(model.join("gone"), model.join("dangling.safetensors")).unwrap();

        let removed = purge_corrupt_cache_files(&model);
        assert_eq!(removed, 2, "corrupt weight + dangling symlink removed");
        assert!(model.join("good.safetensors").exists(), "good weight kept");
        assert!(model.join("config.json").exists(), "config kept");
        assert!(!model.join("bad.safetensors").exists(), "corrupt weight gone");
        assert!(
            !bad_blob.exists(),
            "the corrupt content-addressed blob must be removed, not just the pointer — \
             otherwise the re-pull re-links it instead of re-downloading"
        );
        assert!(
            std::fs::canonicalize(&good).is_ok(),
            "the good blob must survive (shared with other models)"
        );
        assert!(
            std::fs::symlink_metadata(model.join("dangling.safetensors")).is_err(),
            "dangling symlink gone"
        );
    }
}