// ktstr 0.15.0
//
// Test harness for Linux process schedulers
//! Atomic-rename install primitives, cache-key validators, and
//! orphan-tempdir sweep for the kernel image cache.
//!
//! Exports:
//! - [`atomic_swap_dirs`] — `renameat2(RENAME_EXCHANGE)` wrapper
//!   used by [`super::cache_dir::CacheDir::store`] to publish a
//!   freshly-built cache entry over an existing one without ever
//!   leaving readers observing a partial state.
//! - [`TmpDirGuard`] — RAII drop guard that unlinks an
//!   in-progress staging directory on any error path; pairs with
//!   [`super::TMP_DIR_PREFIX`] to keep the cache root self-cleaning.
//! - [`read_metadata`] — metadata.json deserializer; the producer
//!   side of the prefix → kind contract documented on
//!   [`super::metadata::classify_corrupt_reason`].
//! - [`clean_orphaned_tmp_dirs`] — cross-PID GC sweep that removes
//!   `.tmp-{key}-{pid}` directories when `{pid}` is no longer a
//!   live process. Run opportunistically by `store()` to keep the
//!   cache root from accumulating dead writes after a writer crash.
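//! - [`validate_cache_key`] / [`validate_filename`] — input
//!   sanitisers. Both reject empty names, path separators, `..`
//!   traversal, and NUL; `validate_cache_key` additionally rejects
//!   whitespace-only keys, any leading dot, and the `TMP_DIR_PREFIX`
//!   reservation. Both run before any I/O so bad input fails fast
//!   rather than half-writing a malformed entry.
//! - [`fsync_staging_dir`] / [`fsync_parent`] — best-effort
//!   durability helpers that flush a staged directory before the
//!   publish rename and the parent directory after it.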
//!
//! Sibling modules:
//! - [`super::metadata`] — pure types and the
//!   [`super::metadata::classify_corrupt_reason`] dispatcher whose
//!   prefix list `read_metadata` is the producer for.
//! - [`super::cache_dir`] — orchestrates `store`/`lookup`/
//!   `list`/`clean`, calling into every helper here.
//! - [`super::resolve`] — supplies the cache root path that
//!   `clean_orphaned_tmp_dirs` walks.
//!
//! Nothing in this module is `pub` — every helper is `pub(crate)`
//! and is primarily meant to be reached through `super::cache_dir`
//! (the fsync helpers are also shared with
//! `vmm::disk_template::store_atomic`). Other crate-internal callers
//! can technically see these symbols, but the orchestration contract
//! (lock acquisition, atomic-rename publish, post-store recheck)
//! lives in `super::cache_dir::CacheDir`, and bypassing it would skip
//! those guarantees.

use std::fs;
use std::path::Path;

use super::TMP_DIR_PREFIX;
use super::metadata::KernelMetadata;

/// Rejects empty keys, whitespace-only keys, keys starting with `.`
/// (reserved for ktstr bookkeeping — `.locks/`, `.tmp-*`), and keys
/// containing path separators (`/`, `\`), parent-directory traversal
/// (`..`), or null bytes. Returns `Ok(())` on valid keys.
///
/// The leading-dot rejection mirrors `CacheDir::list`'s dotfile
/// filter: every name starting with `.` is treated as ktstr
/// bookkeeping and skipped at list-time, so admitting a dotfile key
/// at store-time would create a silent divergence (the entry is
/// stored on disk but invisible to `list`). Reject up front to make
/// the divergence impossible by construction. The `.tmp-` arm is
/// retained as a more-specific error message because the
/// `TMP_DIR_PREFIX` reservation is the externally-documented contract
/// and operator-facing diagnostics name it explicitly.
pub(crate) fn validate_cache_key(key: &str) -> anyhow::Result<()> {
    if key.is_empty() || key.trim().is_empty() {
        anyhow::bail!("cache key must not be empty or whitespace-only");
    }
    if key.contains('/') || key.contains('\\') {
        anyhow::bail!("cache key must not contain path separators: {key:?}");
    }
    if key == "." || key == ".." {
        anyhow::bail!("cache key must not be a directory reference: {key:?}");
    }
    if key.contains("..") {
        anyhow::bail!("cache key must not contain path traversal: {key:?}");
    }
    if key.contains('\0') {
        anyhow::bail!("cache key must not contain null bytes");
    }
    if key.starts_with(TMP_DIR_PREFIX) {
        anyhow::bail!("cache key must not start with {TMP_DIR_PREFIX} (reserved): {key:?}",);
    }
    if key.starts_with('.') {
        anyhow::bail!(
            "cache key must not start with `.` (reserved for ktstr \
             bookkeeping; `list` skips every dotfile child): {key:?}",
        );
    }
    Ok(())
}

/// Validate a bare filename (e.g. `image_name` in metadata) before it
/// is joined onto a cache entry directory.
///
/// Rejects, in order: the empty string, names containing a path
/// separator (`/` or `\`), the current-directory reference `.`, names
/// containing `..`, and names containing a NUL byte. Returns `Ok(())`
/// otherwise.
///
/// # Errors
///
/// Returns an error whose message names the first rule the input
/// violated.
pub(crate) fn validate_filename(name: &str) -> anyhow::Result<()> {
    if name.is_empty() {
        anyhow::bail!("image name must not be empty");
    }
    if name.contains('/') || name.contains('\\') {
        anyhow::bail!("image name must not contain path separators: {name:?}");
    }
    // `.` joined onto a directory names that directory itself, not a
    // file inside it. `validate_cache_key` rejects it for the same
    // reason; `..` keeps its existing "path traversal" message below.
    if name == "." {
        anyhow::bail!("image name must not be a directory reference: {name:?}");
    }
    if name.contains("..") {
        anyhow::bail!("image name must not contain path traversal: {name:?}");
    }
    if name.contains('\0') {
        anyhow::bail!("image name must not contain null bytes");
    }
    Ok(())
}

/// Drop guard for a temporary directory: when the guard goes out of
/// scope, the wrapped path is removed recursively.
pub(crate) struct TmpDirGuard<'a>(pub(crate) &'a Path);

impl Drop for TmpDirGuard<'_> {
    /// Best-effort removal of the guarded directory tree.
    fn drop(&mut self) {
        let staging: &Path = self.0;
        // The result is deliberately discarded: anything left behind is
        // swept by clean_orphaned_tmp_dirs on the next store().
        let _ = fs::remove_dir_all(staging);
    }
}

/// Exchange the filesystem entries at `src` and `dst` in one
/// `renameat2(RENAME_EXCHANGE)` call.
///
/// Both paths are resolved relative to the current working directory
/// (`rustix::fs::CWD` is passed as the directory handle for each).
///
/// # Errors
///
/// Any failure of the underlying call is returned as an `anyhow`
/// error whose message names both paths and the OS error.
pub(crate) fn atomic_swap_dirs(src: &Path, dst: &Path) -> anyhow::Result<()> {
    use rustix::fs::{CWD, RenameFlags, renameat_with};

    match renameat_with(CWD, src, CWD, dst, RenameFlags::EXCHANGE) {
        Ok(()) => Ok(()),
        Err(e) => Err(anyhow::anyhow!(
            "renameat2(RENAME_EXCHANGE) {} <-> {}: {e}",
            src.display(),
            dst.display(),
        )),
    }
}

/// Push a completed staging directory to stable storage ahead of the
/// rename(2) that publishes it.
///
/// Every regular file that is a direct child of `dir` is opened and
/// `sync_all`ed (data + metadata), and then `dir` itself is
/// `sync_all`ed so its directory entries are flushed too. Together
/// with [`fsync_parent`] — invoked on the published path AFTER the
/// rename — this gives the crash-consistent publish sequence
/// fsync → rename → parent-fsync, i.e. the idiom the freeze
/// coordinator applies to single files, here applied to a directory
/// rename.
///
/// Only one level is visited. Both cache layers that publish a
/// directory — [`super::cache_dir::CacheDir::store`] and
/// `vmm::disk_template::store_atomic` — stage flat directories (kernel
/// image, optional vmlinux, and metadata.json; or one template image),
/// so children that are not regular files, subdirectories included,
/// are skipped rather than recursed into.
///
/// Durability is opportunistic: callers log an error from this
/// function and carry on with the publish. Each cache layer validates
/// on read and rebuilds the usual crash-torn shapes (a zeroed or
/// truncated entry fails validation → cache miss → rebuild), so a
/// failed fsync normally costs just one redundant rebuild after a
/// crash. The narrow case this does NOT cover — a crash that leaves a
/// layer's validation bytes durable while later blocks are torn —
/// shows up loudly at the consumer (see each call site) and never as
/// silent host-side data loss, because the published artifact is a
/// per-test scratch disk or a boot/probe input, never a host-parsed or
/// shared file.
///
/// # Errors
///
/// Returns the first I/O error hit while listing `dir`, inspecting an
/// entry, opening a file, or syncing; nothing after that point is
/// attempted.
pub(crate) fn fsync_staging_dir(dir: &Path) -> std::io::Result<()> {
    fs::read_dir(dir)?.try_for_each(|child| -> std::io::Result<()> {
        let child = child?;
        let kind = child.file_type()?;
        if kind.is_file() {
            let file = fs::File::open(child.path())?;
            file.sync_all()?;
        }
        Ok(())
    })?;
    // A read-only handle on the directory is enough to fsync it; this
    // makes the dentries written into it durable before the directory
    // is renamed into place.
    fs::File::open(dir).and_then(|handle| handle.sync_all())
}

/// fsync the parent directory of `path` so a rename(2) that placed
/// `path` into it is durable across a host crash. See
/// [`fsync_staging_dir`] for the publish-durability contract and why
/// callers treat failure as non-fatal.
///
/// Parent resolution follows [`Path::parent`]:
/// - no parent at all (e.g. `/` or the empty path) → no-op `Ok(())`;
/// - an empty parent, which is what a bare relative name such as
///   `entry` yields → the current directory `.` is synced, since that
///   is the directory the entry actually lives in;
/// - anything else → that directory is opened and synced.
///
/// # Errors
///
/// Returns the I/O error from opening or syncing the parent directory.
pub(crate) fn fsync_parent(path: &Path) -> std::io::Result<()> {
    let parent = match path.parent() {
        Some(parent) => parent,
        None => return Ok(()),
    };
    // `Path::new("entry").parent()` is `Some("")`, and opening `""`
    // fails with ENOENT; the containing directory is the cwd.
    let parent = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    fs::File::open(parent)?.sync_all()
}

/// Read and deserialize metadata.json from a cache entry directory.
///
/// On failure returns a human-readable reason with a distinct prefix
/// per failure mode (missing / unreadable / schema-drift / malformed
/// / truncated). Prefix consumers key on
/// [`super::metadata::classify_corrupt_reason`].
///
/// **Producer↔classifier contract.** The reason strings emitted
/// below are the authoritative source of truth for the JSON
/// `error_kind` field that `cargo ktstr kernel list --json`
/// surfaces. Each `Err(format!("metadata.json …: {e}"))` arm in
/// this function corresponds to exactly one row in the prefix→kind
/// table documented on
/// [`super::metadata::classify_corrupt_reason`]. If you add a new
/// failure mode here, both that classifier dispatcher and the
/// `classify_corrupt_reason_covers_every_documented_prefix` test
/// (in `metadata.rs`) MUST be updated in lockstep — silently
/// adding an unrecognised prefix here drops the new failure into
/// the catch-all `"unknown"` bucket and breaks scripted consumers
/// dispatching on `error_kind`.
pub(crate) fn read_metadata(dir: &Path) -> Result<KernelMetadata, String> {
    let meta_path = dir.join("metadata.json");
    let contents = match fs::read_to_string(&meta_path) {
        Ok(c) => c,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Err("metadata.json missing".to_string());
        }
        Err(e) => return Err(format!("metadata.json unreadable: {e}")),
    };
    serde_json::from_str(&contents).map_err(|e| match e.classify() {
        serde_json::error::Category::Data => format!("metadata.json schema drift: {e}"),
        serde_json::error::Category::Syntax => format!("metadata.json malformed: {e}"),
        serde_json::error::Category::Eof => format!("metadata.json truncated: {e}"),
        // Category::Io is only produced by `from_reader` when the
        // underlying io::Read fails. `from_str` operates on an
        // already-loaded `&str` — there is no I/O surface left for
        // serde_json to fault on, so this arm is dead. Pinned via
        // `unreachable!` so a future swap of `from_str` for
        // `from_reader` (or a serde_json upgrade that broadens the
        // semantics) surfaces here loudly rather than silently
        // landing in a catch-all `parse_error` bucket. The
        // serde_json source itself uses the same `unreachable!()`
        // pattern in its `From<Error> for io::Error` impl.
        serde_json::error::Category::Io => unreachable!(
            "serde_json::from_str cannot return Category::Io — \
             from_str operates on &str, no I/O surface present"
        ),
    })
}

/// Sweep `cache_root` for `.tmp-{key}-{pid}` staging directories whose
/// `{pid}` no longer names a live process, and delete them.
///
/// This is the cross-PID orphan sweep. An entry is removed only when
/// `kill(pid, None)` fails with `ESRCH`; every other probe outcome
/// (success, `EPERM`, …) leaves the entry in place. Entries are also
/// left alone when the name is not valid UTF-8, does not start with
/// `TMP_DIR_PREFIX`, has no `-` separated suffix, or has a suffix that
/// does not parse as a strictly positive `i32`.
///
/// # Errors
///
/// Fails only when `cache_root` is a directory that cannot be listed
/// for a reason other than "not found". A missing or non-directory
/// root, an unreadable individual entry, and a failed removal are all
/// tolerated (the latter two are logged) and do not fail the sweep.
pub(crate) fn clean_orphaned_tmp_dirs(cache_root: &Path) -> anyhow::Result<()> {
    // A missing root, or a root that is not a directory, has nothing
    // to sweep.
    if !cache_root.is_dir() {
        return Ok(());
    }
    let entries = match fs::read_dir(cache_root) {
        Ok(entries) => entries,
        // e.g. the root disappeared after the `is_dir` probe above.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(e) => anyhow::bail!("read cache root {}: {e}", cache_root.display()),
    };

    for entry in entries {
        let entry = match entry {
            Ok(entry) => entry,
            Err(e) => {
                tracing::warn!(err = %format!("{e:#}"), "skip unreadable cache root entry");
                continue;
            }
        };

        // Owner pid encoded after the LAST `-` of a `TMP_DIR_PREFIX`
        // name. An empty suffix fails `parse`, so it needs no separate
        // check; non-positive values never reach `kill`.
        let owner = entry
            .file_name()
            .into_string()
            .ok()
            .filter(|name| name.starts_with(TMP_DIR_PREFIX))
            .and_then(|name| {
                let (_, suffix) = name.rsplit_once('-')?;
                suffix.parse::<i32>().ok()
            })
            .filter(|candidate| *candidate > 0);
        let pid = match owner {
            Some(pid) => pid,
            None => continue,
        };

        // Signal `None` performs the existence/permission check only.
        let probe = nix::sys::signal::kill(nix::unistd::Pid::from_raw(pid), None);
        if !matches!(probe, Err(nix::errno::Errno::ESRCH)) {
            continue;
        }

        let path = entry.path();
        if let Err(e) = fs::remove_dir_all(&path) {
            tracing::warn!(
                err = %format!("{e:#}"),
                path = %path.display(),
                "failed to remove orphaned .tmp- dir; leaving in place",
            );
        } else {
            tracing::info!(
                path = %path.display(),
                orphan_pid = pid,
                "cleaned orphaned .tmp- dir from prior crashed process",
            );
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // -- clean_orphaned_tmp_dirs unit tests --
    //
    // Parser/dispatcher coverage: the scan must remove directories
    // under `.tmp-{key}-{pid}` whose `{pid}` is verifiably dead,
    // must LEAVE malformed entries and non-`.tmp-` entries alone,
    // and must tolerate a nonexistent cache root.

    /// A `.tmp-{key}-{pid}` directory whose pid refers to a dead
    /// process is removed.
    #[test]
    fn clean_orphaned_tmp_dirs_removes_dead_pid_tempdir() {
        let tmp = TempDir::new().unwrap();
        // pid_t::MAX (i32::MAX = 2147483647) is well beyond Linux's
        // PID_MAX_LIMIT (4194304 on 64-bit). No real PID can match,
        // so kill(MAX, 0) returns ESRCH deterministically. Same
        // sentinel is reused at the other dead-pid sites in this
        // module.
        let dead_pid = libc::pid_t::MAX;
        let orphan = tmp
            .path()
            .join(format!("{TMP_DIR_PREFIX}somekey-{dead_pid}"));
        std::fs::create_dir_all(&orphan).unwrap();
        std::fs::write(orphan.join("inner.txt"), b"data").unwrap();

        clean_orphaned_tmp_dirs(tmp.path()).unwrap();
        assert!(
            !orphan.exists(),
            "dead-pid tempdir must be removed by clean_orphaned_tmp_dirs",
        );
    }

    /// A `.tmp-{key}-{pid}` directory whose pid is LIVE (the test
    /// process itself) must be preserved.
    #[test]
    fn clean_orphaned_tmp_dirs_preserves_live_pid_tempdir() {
        let tmp = TempDir::new().unwrap();
        let live_pid = unsafe { libc::getpid() };
        let keeper = tmp
            .path()
            .join(format!("{TMP_DIR_PREFIX}somekey-{live_pid}"));
        std::fs::create_dir_all(&keeper).unwrap();

        clean_orphaned_tmp_dirs(tmp.path()).unwrap();
        assert!(
            keeper.exists(),
            "live-pid tempdir must NOT be removed — its owner is still running",
        );
    }

    /// Entries whose suffix cannot be parsed as a pid (non-numeric
    /// or empty after the trailing `-`) must be left alone.
    #[test]
    fn clean_orphaned_tmp_dirs_leaves_malformed_suffix_alone() {
        let tmp = TempDir::new().unwrap();
        let nonnum = tmp.path().join(format!("{TMP_DIR_PREFIX}somekey-notapid"));
        std::fs::create_dir_all(&nonnum).unwrap();
        let empty_suf = tmp.path().join(format!("{TMP_DIR_PREFIX}somekey-"));
        std::fs::create_dir_all(&empty_suf).unwrap();
        let no_dash = tmp.path().join(format!("{TMP_DIR_PREFIX}nokeyhere"));
        std::fs::create_dir_all(&no_dash).unwrap();

        clean_orphaned_tmp_dirs(tmp.path()).unwrap();
        assert!(nonnum.exists(), "non-numeric pid suffix must be left alone");
        assert!(empty_suf.exists(), "empty pid suffix must be left alone");
        assert!(no_dash.exists(), "no-pid-suffix entry must be left alone");
    }

    /// Directories that do not begin with [`TMP_DIR_PREFIX`] must
    /// never be touched.
    #[test]
    fn clean_orphaned_tmp_dirs_leaves_unrelated_entries_alone() {
        let tmp = TempDir::new().unwrap();
        let real_entry = tmp.path().join("real-cache-entry");
        std::fs::create_dir_all(&real_entry).unwrap();
        let other = tmp.path().join("not-a-tempdir");
        std::fs::create_dir_all(&other).unwrap();

        clean_orphaned_tmp_dirs(tmp.path()).unwrap();
        assert!(
            real_entry.exists(),
            "unrelated cache entry must be preserved"
        );
        assert!(other.exists(), "unrelated directory must be preserved");
    }

    /// Non-UTF-8 filenames in the cache root must be skipped silently.
    #[test]
    #[cfg(unix)]
    fn clean_orphaned_tmp_dirs_skips_non_utf8_names() {
        use std::ffi::OsStr;
        use std::os::unix::ffi::OsStrExt;
        let tmp = TempDir::new().unwrap();
        let mut bytes: Vec<u8> = b".tmp-".to_vec();
        bytes.push(0xFF);
        bytes.extend_from_slice(b"-123");
        let bad_name = OsStr::from_bytes(&bytes);
        let bad_path = tmp.path().join(bad_name);
        std::fs::create_dir(&bad_path).unwrap();

        clean_orphaned_tmp_dirs(tmp.path()).unwrap();
        assert!(
            bad_path.exists(),
            "non-UTF-8 entry must be left alone — the scan cannot \
             confirm it matches our format, so safe-default is skip",
        );
    }

    /// A nonexistent cache root returns `Ok(())` without error.
    #[test]
    fn clean_orphaned_tmp_dirs_handles_missing_cache_root() {
        let tmp = TempDir::new().unwrap();
        let never_created = tmp.path().join("never-created");
        clean_orphaned_tmp_dirs(&never_created).unwrap();
    }

    /// `clean_orphaned_tmp_dirs` against a path that EXISTS but is
    /// NOT a directory (e.g. a regular file at the cache_root path)
    /// returns `Ok(())` rather than erroring. The early `is_dir()`
    /// guard short-circuits so a misconfigured cache_root pointing
    /// at a stray file does not poison the store() pipeline. Pins
    /// the soft-fail semantic — a hard failure here would brick
    /// every store call until the operator manually fixed the
    /// stray file.
    #[test]
    fn clean_orphaned_tmp_dirs_returns_ok_when_root_is_a_file() {
        let tmp = TempDir::new().unwrap();
        let stray_file = tmp.path().join("stray-file");
        std::fs::write(&stray_file, b"not a directory").unwrap();
        clean_orphaned_tmp_dirs(&stray_file)
            .expect("file-shaped cache_root must surface as Ok via the is_dir guard");
        assert!(
            stray_file.exists(),
            "the file must remain in place — clean_orphaned_tmp_dirs \
             must not delete a non-directory cache_root",
        );
    }

    /// Multi-entry mix: a DEAD-pid orphan and a LIVE-pid tempdir
    /// side by side — only the dead one is removed.
    #[test]
    fn clean_orphaned_tmp_dirs_mixed_entries() {
        let tmp = TempDir::new().unwrap();
        // pid_t::MAX sentinel — see comment in
        // `clean_orphaned_tmp_dirs_removes_dead_pid_tempdir` above.
        let dead_pid = libc::pid_t::MAX;
        let live_pid = unsafe { libc::getpid() };
        let dead = tmp.path().join(format!("{TMP_DIR_PREFIX}a-{dead_pid}"));
        let live = tmp.path().join(format!("{TMP_DIR_PREFIX}b-{live_pid}"));
        let unrelated = tmp.path().join("c-regular-entry");
        std::fs::create_dir_all(&dead).unwrap();
        std::fs::create_dir_all(&live).unwrap();
        std::fs::create_dir_all(&unrelated).unwrap();

        clean_orphaned_tmp_dirs(tmp.path()).unwrap();
        assert!(!dead.exists(), "dead orphan must be removed");
        assert!(live.exists(), "live-pid entry must survive");
        assert!(unrelated.exists(), "unrelated entry must survive");
    }

    /// `pid == 0` suffix: the scan rejects non-positive pids before
    /// the liveness probe runs.
    #[test]
    fn clean_orphaned_tmp_dirs_preserves_pid_zero_suffix() {
        let tmp = TempDir::new().unwrap();
        let entry = tmp.path().join(format!("{TMP_DIR_PREFIX}somekey-0"));
        std::fs::create_dir_all(&entry).unwrap();
        clean_orphaned_tmp_dirs(tmp.path()).unwrap();
        assert!(
            entry.exists(),
            "pid=0 suffix must be preserved — `pid <= 0` filter \
             skips before the liveness probe so non-positive parses \
             cannot reach kill()",
        );
    }

    /// Documents that `rsplit_once('-')` parses double-dash suffix
    /// as a positive pid, never negative.
    #[test]
    fn clean_orphaned_tmp_dirs_double_dash_parses_as_positive_pid() {
        let tmp = TempDir::new().unwrap();
        let entry = tmp.path().join(format!("{TMP_DIR_PREFIX}somekey--12345"));
        std::fs::create_dir_all(&entry).unwrap();
        clean_orphaned_tmp_dirs(tmp.path()).unwrap();

        let pid_alive = matches!(
            nix::sys::signal::kill(nix::unistd::Pid::from_raw(12345), None),
            Ok(()),
        );
        if pid_alive {
            assert!(
                entry.exists(),
                "pid 12345 was alive at probe time → entry must be \
                 preserved; got: entry removed (regression?)",
            );
        } else {
            assert!(
                !entry.exists(),
                "pid 12345 was dead at probe time → entry must be \
                 removed (proves positive-pid parse). A regression to \
                 negative-pid parse would preserve unconditionally; \
                 entry still exists.",
            );
        }
    }

    /// Regular file entry (not a directory) whose name MATCHES the
    /// `.tmp-{key}-{pid}` pattern with a dead pid stays in place.
    #[test]
    fn clean_orphaned_tmp_dirs_leaves_regular_file_entry() {
        let tmp = TempDir::new().unwrap();
        // pid_t::MAX sentinel — see comment in
        // `clean_orphaned_tmp_dirs_removes_dead_pid_tempdir` above.
        let dead_pid = libc::pid_t::MAX;
        let file_entry = tmp
            .path()
            .join(format!("{TMP_DIR_PREFIX}fileshaped-{dead_pid}"));
        std::fs::write(&file_entry, b"not a directory").unwrap();
        clean_orphaned_tmp_dirs(tmp.path()).unwrap();
        assert!(
            file_entry.exists(),
            "regular file with tempdir-shaped name + dead pid must \
             NOT be removed — `remove_dir_all` errors on a file, \
             and the scan's error-tolerance contract leaves it",
        );
    }

    /// Symlink entry whose NAME matches the tempdir pattern but
    /// whose TARGET is an unrelated path outside the cache.
    #[test]
    #[cfg(unix)]
    fn clean_orphaned_tmp_dirs_leaves_symlink_entry() {
        let tmp = TempDir::new().unwrap();
        let target_root = TempDir::new().unwrap();
        let target_file = target_root.path().join("sentinel.txt");
        std::fs::write(&target_file, b"must-not-be-deleted").unwrap();

        // pid_t::MAX sentinel — see comment in
        // `clean_orphaned_tmp_dirs_removes_dead_pid_tempdir` above.
        let dead_pid = libc::pid_t::MAX;
        let symlink = tmp
            .path()
            .join(format!("{TMP_DIR_PREFIX}symkey-{dead_pid}"));
        std::os::unix::fs::symlink(target_root.path(), &symlink).unwrap();

        clean_orphaned_tmp_dirs(tmp.path()).unwrap();

        assert!(
            target_file.exists(),
            "symlink target's contents must survive the scan — \
             following symlinks would delete unrelated state \
             outside the cache root, a critical security / data- \
             safety regression",
        );
        assert_eq!(
            std::fs::read(&target_file).unwrap(),
            b"must-not-be-deleted",
            "target file content must be unchanged",
        );
    }

    // -- fsync_staging_dir / fsync_parent durability primitives --
    //
    // These flush a staged cache entry + the cache root before/after
    // the publish rename. fsync's crash-survival effect can't be
    // unit-tested (needs a real power loss), so coverage verifies the
    // functional contract: the calls succeed over the shapes both cache
    // layers produce, leave content intact (fsync flushes, never
    // mutates), honor the flat-dir contract (no recursion), and surface
    // I/O errors rather than silently succeeding.

    /// fsyncs a populated flat staging dir without error and leaves
    /// file contents intact.
    #[test]
    fn fsync_staging_dir_flushes_flat_dir_contents_intact() {
        let tmp = TempDir::new().unwrap();
        let staging = tmp.path().join("staging");
        std::fs::create_dir_all(&staging).unwrap();
        std::fs::write(staging.join("image"), b"image-bytes").unwrap();
        std::fs::write(staging.join("metadata.json"), b"{}").unwrap();

        fsync_staging_dir(&staging).expect("fsync of a populated flat dir must succeed");
        assert_eq!(
            std::fs::read(staging.join("image")).unwrap(),
            b"image-bytes",
            "fsync must not mutate file contents",
        );
        assert_eq!(std::fs::read(staging.join("metadata.json")).unwrap(), b"{}");
    }

    /// An empty staging dir fsyncs the directory itself with no files
    /// to flush.
    #[test]
    fn fsync_staging_dir_empty_dir_ok() {
        let tmp = TempDir::new().unwrap();
        let staging = tmp.path().join("empty");
        std::fs::create_dir_all(&staging).unwrap();
        fsync_staging_dir(&staging).expect("fsync of an empty dir must succeed");
    }

    /// A nested subdirectory is not recursed into (the helper documents
    /// a flat contract): the subdir is skipped via `is_file()`, the call
    /// still succeeds, and the subdir's contents are untouched.
    #[test]
    fn fsync_staging_dir_skips_subdirs() {
        let tmp = TempDir::new().unwrap();
        let staging = tmp.path().join("staging");
        std::fs::create_dir_all(staging.join("subdir")).unwrap();
        std::fs::write(staging.join("subdir/inner"), b"inner").unwrap();
        std::fs::write(staging.join("top"), b"top").unwrap();
        fsync_staging_dir(&staging)
            .expect("flat fsync over a dir containing a subdir must succeed");
        assert_eq!(
            std::fs::read(staging.join("subdir/inner")).unwrap(),
            b"inner",
            "the un-recursed subdir's contents must be untouched",
        );
    }

    /// A nonexistent staging dir surfaces the `read_dir` error rather
    /// than silently succeeding.
    #[test]
    fn fsync_staging_dir_missing_dir_errs() {
        let tmp = TempDir::new().unwrap();
        let missing = tmp.path().join("never-created");
        assert!(
            fsync_staging_dir(&missing).is_err(),
            "fsync of a nonexistent dir must surface the read_dir error",
        );
    }

    /// fsync_parent fsyncs the containing directory of a path that has
    /// an existing parent.
    #[test]
    fn fsync_parent_flushes_containing_dir() {
        let tmp = TempDir::new().unwrap();
        let child = tmp.path().join("entry");
        std::fs::create_dir_all(&child).unwrap();
        fsync_parent(&child).expect("fsync_parent on a path with an existing parent must succeed");
    }

    /// fsync_parent on a parent-less path is a no-op `Ok` — it never
    /// errors trying to open a missing parent.
    #[test]
    fn fsync_parent_no_parent_is_ok() {
        fsync_parent(Path::new("/")).expect("root path has no parent → no-op Ok");
    }

    // -- validate_cache_key unit tests --

    /// The empty string is not a usable cache key; the rejection's
    /// message must mention "empty". On failure the actual error text
    /// is printed so the mismatch can be diagnosed from the test log.
    #[test]
    fn cache_validate_key_rejects_empty() {
        let err = validate_cache_key("").unwrap_err();
        assert!(
            err.to_string().contains("empty"),
            "expected empty-key rejection, got: {err}"
        );
    }

    /// A key made only of spaces is rejected with the same "empty"
    /// diagnostic as a zero-length key. On failure the actual error
    /// text is printed.
    #[test]
    fn cache_validate_key_rejects_whitespace_only() {
        let err = validate_cache_key("   ").unwrap_err();
        assert!(
            err.to_string().contains("empty"),
            "expected empty-key rejection for whitespace-only input, got: {err}"
        );
    }

    /// A key containing `/` is rejected with a "path separator"
    /// diagnostic. On failure the actual error text is printed.
    #[test]
    fn cache_validate_key_rejects_forward_slash() {
        let err = validate_cache_key("a/b").unwrap_err();
        assert!(
            err.to_string().contains("path separator"),
            "expected path-separator rejection for `/`, got: {err}"
        );
    }

    /// A key containing `\` is rejected with the same "path separator"
    /// diagnostic as `/`. On failure the actual error text is printed.
    #[test]
    fn cache_validate_key_rejects_backslash() {
        let err = validate_cache_key("a\\b").unwrap_err();
        assert!(
            err.to_string().contains("path separator"),
            "expected path-separator rejection for `\\`, got: {err}"
        );
    }

    /// A `..` sequence embedded inside an otherwise ordinary key is
    /// rejected with a "path traversal" diagnostic. On failure the
    /// actual error text is printed.
    #[test]
    fn cache_validate_key_rejects_dotdot() {
        let err = validate_cache_key("foo..bar").unwrap_err();
        assert!(
            err.to_string().contains("path traversal"),
            "expected path-traversal rejection, got: {err}"
        );
    }

    /// A key with an embedded NUL byte is rejected with a diagnostic
    /// mentioning "null". On failure the actual error text is printed.
    #[test]
    fn cache_validate_key_rejects_null_byte() {
        let err = validate_cache_key("key\0evil").unwrap_err();
        assert!(
            err.to_string().contains("null"),
            "expected null-byte rejection, got: {err}"
        );
    }

    /// Keys beginning with `.tmp-` are refused, and the diagnostic
    /// quotes the `.tmp-` prefix.
    #[test]
    fn cache_validate_key_rejects_tmp_prefix() {
        let err = validate_cache_key(".tmp-in-progress")
            .unwrap_err()
            .to_string();
        let mentions_prefix = err.contains(".tmp-");
        assert!(mentions_prefix, "expected .tmp- rejection, got: {err}");
    }

    /// Leading-dot keys other than `.tmp-…` are refused as well.
    /// `CacheDir::list` filters out dotfiles, so storing under such a
    /// key would create an on-disk entry that `list` never reports.
    /// The diagnostic must spell out the "must not start with `.`"
    /// rule so the operator understands why the key was refused.
    #[test]
    fn cache_validate_key_rejects_other_leading_dots() {
        let dotted_keys = [".locks", ".bookkeeping", ".my-key"];
        for bad in dotted_keys {
            let err = validate_cache_key(bad).unwrap_err().to_string();
            let names_rule = err.contains("must not start with `.`");
            assert!(
                names_rule,
                "expected leading-dot rejection for {bad:?}, got: {err}",
            );
        }
    }

    /// A bare `.` is rejected with a "directory reference" diagnostic.
    #[test]
    fn cache_validate_key_rejects_dot() {
        let err = validate_cache_key(".").unwrap_err().to_string();
        let flagged = err.contains("directory reference");
        assert!(flagged, "expected dot rejection, got: {err}");
    }

    /// A bare `..` is rejected with a "directory reference" diagnostic.
    #[test]
    fn cache_validate_key_rejects_dotdot_bare() {
        let err = validate_cache_key("..").unwrap_err().to_string();
        let flagged = err.contains("directory reference");
        assert!(flagged, "expected dotdot rejection, got: {err}");
    }

    /// Representative well-formed keys (tarball, local and git flavours
    /// across two architectures) pass validation. A failure names the
    /// offending key and prints the validator's error.
    #[test]
    fn cache_validate_key_accepts_valid() {
        for key in [
            "6.14.2-tarball-x86_64",
            "local-deadbeef-x86_64",
            "v6.14-git-a1b2c3d-aarch64",
        ] {
            if let Err(err) = validate_cache_key(key) {
                panic!("expected key {key:?} to be accepted, got: {err}");
            }
        }
    }

    // -- validate_filename --

    /// Filenames containing a `..` traversal component are rejected,
    /// whether it is leading or embedded. A failure names the input
    /// that was wrongly accepted.
    #[test]
    fn cache_validate_filename_rejects_traversal() {
        for name in ["../etc/passwd", "foo/../bar"] {
            assert!(
                validate_filename(name).is_err(),
                "expected traversal rejection for filename {name:?}",
            );
        }
    }

    /// The empty string is not a valid filename and must be rejected.
    #[test]
    fn cache_validate_filename_rejects_empty() {
        assert!(
            validate_filename("").is_err(),
            "expected the empty filename to be rejected",
        );
    }

    /// Plain kernel image names pass validation. A failure names the
    /// offending filename and prints the validator's error.
    #[test]
    fn cache_validate_filename_accepts_valid() {
        for name in ["bzImage", "Image"] {
            if let Err(err) = validate_filename(name) {
                panic!("expected filename {name:?} to be accepted, got: {err}");
            }
        }
    }

    /// A filename with an embedded NUL byte must be refused with a
    /// diagnostic that mentions "null".
    ///
    /// Rationale: cache-store paths are eventually handed to C APIs
    /// (`open`, `mkdir`, `renameat2`), where a NUL terminates the
    /// string. Without this guard the name would be cut short at the
    /// NUL and land on disk under the truncated prefix with no visible
    /// error.
    #[test]
    fn cache_validate_filename_rejects_null_byte() {
        let err = validate_filename("foo\0evil").unwrap_err().to_string();
        let mentions_null = err.contains("null");
        assert!(
            mentions_null,
            "validator must surface a null-related diagnostic — \
             without the guard a name like 'foo\\0/evil' would \
             truncate to 'foo' inside libc, silently losing the \
             trailing path; got: {err}",
        );
    }

    // -- atomic_swap_dirs direct unit tests --
    //
    // The swap is the publish step of `CacheDir::store` when the
    // destination cache_key already exists; it must atomically
    // swap two existing directory inodes via
    // renameat2(RENAME_EXCHANGE) so a concurrent reader never sees
    // a partial state. Direct coverage exercises the kernel
    // syscall's semantics (both sides materialised, neither lost,
    // contents preserved by reference rather than copy) without
    // the `store()` orchestration on top.

    /// Swapping two populated directories: afterwards each path
    /// resolves to the other's former payload, and each path's inode
    /// number is the one previously observed at the opposite path.
    /// The inode check is what distinguishes a genuine exchange of
    /// directory entries from an implementation that copies contents
    /// into freshly created directories.
    #[test]
    #[cfg(unix)]
    fn atomic_swap_dirs_exchanges_two_existing_directories() {
        use std::os::unix::fs::MetadataExt;
        let tmp = TempDir::new().unwrap();
        let a = tmp.path().join("alpha");
        let b = tmp.path().join("bravo");
        for (dir, bytes) in [(&a, &b"alpha-bytes"[..]), (&b, &b"bravo-bytes"[..])] {
            std::fs::create_dir_all(dir).unwrap();
            std::fs::write(dir.join("payload"), bytes).unwrap();
        }

        // Inode number of whatever currently lives at `dir`.
        let inode_of = |dir: &std::path::Path| std::fs::metadata(dir).unwrap().ino();
        let (alpha_ino, bravo_ino) = (inode_of(&a), inode_of(&b));

        atomic_swap_dirs(&a, &b).expect("happy-path swap must succeed");

        let payload_at_a = std::fs::read(a.join("payload")).unwrap();
        assert_eq!(
            payload_at_a, b"bravo-bytes",
            "after RENAME_EXCHANGE, the path `a` must reference the \
             contents that were under `b` before the swap",
        );
        let payload_at_b = std::fs::read(b.join("payload")).unwrap();
        assert_eq!(
            payload_at_b, b"alpha-bytes",
            "after RENAME_EXCHANGE, the path `b` must reference the \
             contents that were under `a` before the swap",
        );

        assert_eq!(
            inode_of(&a),
            bravo_ino,
            "inode at path `a` must equal the pre-swap inode at `b` — \
             a copy-based fallback would assign a fresh inode here",
        );
        assert_eq!(
            inode_of(&b),
            alpha_ino,
            "inode at path `b` must equal the pre-swap inode at `a` — \
             a copy-based fallback would assign a fresh inode here",
        );
    }

    /// An exchange needs both endpoints to exist. With the source
    /// absent the call must return an `Err` whose rendered chain names
    /// both paths, and the existing destination must be left in place.
    #[test]
    fn atomic_swap_dirs_missing_source_surfaces_error() {
        let tmp = TempDir::new().unwrap();
        let nonexistent = tmp.path().join("never-created");
        let dst = tmp.path().join("dst");
        std::fs::create_dir_all(&dst).unwrap();

        let outcome = atomic_swap_dirs(&nonexistent, &dst);
        let err = outcome.expect_err("missing source must produce an Err");
        // `{:#}` renders the full error chain, not just the top message.
        let msg = format!("{err:#}");

        let source_text = nonexistent.display().to_string();
        let dest_text = dst.display().to_string();
        assert!(
            msg.contains(source_text.as_str()),
            "diagnostic must name the missing source path: {msg}",
        );
        assert!(
            msg.contains(dest_text.as_str()),
            "diagnostic must also name the destination path: {msg}",
        );

        let dest_still_there = dst.exists();
        assert!(
            dest_still_there,
            "destination must remain in place when the swap fails",
        );
    }

    /// Mirror case: with the destination absent the call must return an
    /// `Err` whose rendered chain names both paths, and the existing
    /// source must be left in place (no silent one-way rename).
    #[test]
    fn atomic_swap_dirs_missing_destination_surfaces_error() {
        let tmp = TempDir::new().unwrap();
        let src = tmp.path().join("src");
        std::fs::create_dir_all(&src).unwrap();
        let nonexistent = tmp.path().join("never-created");

        let outcome = atomic_swap_dirs(&src, &nonexistent);
        let err = outcome.expect_err("missing destination must produce an Err");
        // `{:#}` renders the full error chain, not just the top message.
        let msg = format!("{err:#}");

        let names_source = msg.contains(src.display().to_string().as_str());
        let names_dest = msg.contains(nonexistent.display().to_string().as_str());
        assert!(
            names_source && names_dest,
            "diagnostic must name BOTH endpoints so the operator \
             can attribute the failure: {msg}",
        );

        let source_still_there = src.exists();
        assert!(
            source_still_there,
            "source must remain in place when the swap fails",
        );
    }

    /// The swap carries whole subtrees: directories holding several
    /// files and nested subdirectories end up intact under the opposite
    /// path.
    ///
    /// This test checks only that every file is reachable with its
    /// original bytes after the swap. It does not compare inode
    /// numbers; `atomic_swap_dirs_exchanges_two_existing_directories`
    /// covers the exchange-versus-copy distinction.
    #[test]
    fn atomic_swap_dirs_preserves_subtree_shape() {
        let tmp = TempDir::new().unwrap();
        let a = tmp.path().join("alpha");
        let b = tmp.path().join("bravo");

        // `alpha`: a two-level nested leaf plus a top-level file.
        // `bravo`: a single file one level down.
        std::fs::create_dir_all(a.join("nested/deep")).unwrap();
        std::fs::create_dir_all(b.join("other")).unwrap();
        std::fs::write(a.join("nested/deep/leaf"), b"alpha-leaf").unwrap();
        std::fs::write(a.join("top"), b"alpha-top").unwrap();
        std::fs::write(b.join("other/file"), b"bravo-file").unwrap();

        atomic_swap_dirs(&a, &b).expect("subtree swap must succeed");

        // (post-swap location, expected bytes, failure explanation)
        let expectations = [
            (
                a.join("other/file"),
                &b"bravo-file"[..],
                "post-swap, `a` must contain the original `b` subtree",
            ),
            (
                b.join("nested/deep/leaf"),
                &b"alpha-leaf"[..],
                "post-swap, `b` must contain the original `a` subtree",
            ),
            (
                b.join("top"),
                &b"alpha-top"[..],
                "all files in the swapped subtree must remain reachable",
            ),
        ];
        for (location, expected, why) in expectations {
            assert_eq!(std::fs::read(&location).unwrap(), expected, "{why}");
        }
    }

    // -- read_metadata direct unit tests --
    //
    // `read_metadata` is the producer half of the prefix→kind
    // contract documented on `metadata::classify_corrupt_reason`.
    // The per-failure-mode prefixes are surfaced as `error_kind`
    // strings via `kernel list --json`, so each prefix is part of
    // the JSON contract and needs direct coverage that doesn't
    // require driving a full `CacheDir::list` cycle.

    /// A `KernelMetadata` serialized with `serde_json` and written to
    /// `<entry>/metadata.json` is read back by `read_metadata` with its
    /// `image_name`, `arch` and `built_at` fields intact.
    #[test]
    fn read_metadata_happy_path_parses_valid_json() {
        use super::super::metadata::KernelSource;
        let tmp = TempDir::new().unwrap();
        let entry_dir = tmp.path().join("entry");
        std::fs::create_dir_all(&entry_dir).unwrap();

        let serialized = serde_json::to_string(&KernelMetadata::new(
            KernelSource::Tarball,
            "x86_64",
            "bzImage",
            "2026-04-12T10:00:00Z",
        ))
        .unwrap();
        let metadata_path = entry_dir.join("metadata.json");
        std::fs::write(&metadata_path, serialized).unwrap();

        let parsed = read_metadata(&entry_dir).expect("valid metadata must parse");
        assert_eq!(parsed.image_name, "bzImage");
        assert_eq!(parsed.arch, "x86_64");
        assert_eq!(parsed.built_at, "2026-04-12T10:00:00Z");
    }

    /// An entry directory without a `metadata.json` yields exactly the
    /// reason string `"metadata.json missing"`. The classifier matches
    /// on this spelling to produce the `"missing"` error_kind, so the
    /// comparison is exact and not a prefix check.
    #[test]
    fn read_metadata_missing_returns_exact_missing_reason() {
        let tmp = TempDir::new().unwrap();
        let entry_dir = tmp.path().join("entry");
        std::fs::create_dir_all(&entry_dir).unwrap();

        let outcome = read_metadata(&entry_dir);
        let reason = outcome.expect_err("absent metadata.json must produce an Err");
        assert_eq!(
            reason, "metadata.json missing",
            "exact missing reason is the classifier dispatch key for `missing`",
        );
    }

    /// When `metadata.json` exists but is a directory, reading it fails
    /// with an I/O error other than not-found (`EISDIR`). The producer
    /// must report that under the `"metadata.json unreadable: "`
    /// prefix.
    #[test]
    fn read_metadata_unreadable_returns_unreadable_prefix() {
        let tmp = TempDir::new().unwrap();
        let entry_dir = tmp.path().join("entry");
        std::fs::create_dir_all(&entry_dir).unwrap();
        // Put a DIRECTORY where the file should be: the path exists, so
        // this is not the "missing" case, yet it cannot be read as a file.
        let metadata_path = entry_dir.join("metadata.json");
        std::fs::create_dir_all(&metadata_path).unwrap();

        let outcome = read_metadata(&entry_dir);
        let reason =
            outcome.expect_err("metadata.json shaped as a directory must produce an Err");
        let has_prefix = reason.starts_with("metadata.json unreadable: ");
        assert!(
            has_prefix,
            "EISDIR-on-read must surface under the `unreadable` prefix \
             so the classifier dispatches to error_kind=unreadable; \
             got: {reason}",
        );
    }

    /// Content that is not JSON at all (a syntax error,
    /// `Category::Syntax`) is reported under the
    /// `"metadata.json malformed: "` prefix, which
    /// `metadata::classify_corrupt_reason` documents as the dispatch
    /// key for the `"malformed"` error_kind.
    #[test]
    fn read_metadata_malformed_json_returns_malformed_prefix() {
        let tmp = TempDir::new().unwrap();
        let entry_dir = tmp.path().join("entry");
        std::fs::create_dir_all(&entry_dir).unwrap();
        let metadata_path = entry_dir.join("metadata.json");
        std::fs::write(&metadata_path, b"not valid json {[").unwrap();

        let outcome = read_metadata(&entry_dir);
        let reason = outcome.expect_err("malformed JSON must produce an Err");
        let has_prefix = reason.starts_with("metadata.json malformed: ");
        assert!(
            has_prefix,
            "syntax-error JSON must surface under the `malformed` prefix; \
             got: {reason}",
        );
    }

    /// JSON that ends in the middle of a value (`Category::Eof`) is
    /// reported under the `"metadata.json truncated: "` prefix.
    #[test]
    fn read_metadata_truncated_json_returns_truncated_prefix() {
        let tmp = TempDir::new().unwrap();
        let entry_dir = tmp.path().join("entry");
        std::fs::create_dir_all(&entry_dir).unwrap();
        // The document stops right after a key's colon, before any value.
        let metadata_path = entry_dir.join("metadata.json");
        std::fs::write(&metadata_path, br#"{"source":"#).unwrap();

        let outcome = read_metadata(&entry_dir);
        let reason = outcome.expect_err("truncated JSON must produce an Err");
        let has_prefix = reason.starts_with("metadata.json truncated: ");
        assert!(
            has_prefix,
            "EOF-mid-parse must surface under the `truncated` prefix; \
             got: {reason}",
        );
    }

    /// Syntactically valid JSON that lacks the fields `KernelMetadata`
    /// requires (`Category::Data`) is reported under the
    /// `"metadata.json schema drift: "` prefix.
    #[test]
    fn read_metadata_schema_drift_returns_schema_drift_prefix() {
        let tmp = TempDir::new().unwrap();
        let entry_dir = tmp.path().join("entry");
        std::fs::create_dir_all(&entry_dir).unwrap();
        // A well-formed object carrying only a `version` key, so none
        // of the required metadata fields are present.
        let metadata_path = entry_dir.join("metadata.json");
        std::fs::write(&metadata_path, br#"{"version": "6.14"}"#).unwrap();

        let outcome = read_metadata(&entry_dir);
        let reason = outcome.expect_err("incomplete JSON must produce an Err");
        let has_prefix = reason.starts_with("metadata.json schema drift: ");
        assert!(
            has_prefix,
            "missing required field must surface under the `schema drift` \
             prefix; got: {reason}",
        );
    }

    /// Producer → classifier round-trip. Each failure mode of
    /// `read_metadata` is provoked directly and its reason string is
    /// fed to `classify_corrupt_reason`, which must return the specific
    /// `error_kind` for that mode. This pins the prefix contract from
    /// the producer side, independently of the classifier's own
    /// table-driven test.
    #[test]
    fn read_metadata_every_failure_mode_is_classifier_recognised() {
        use super::super::metadata::classify_corrupt_reason;

        /// How `metadata.json` is materialised inside the entry dir.
        enum Shape {
            /// Entry dir exists, no `metadata.json` at all.
            Absent,
            /// `metadata.json` exists but is a directory.
            Directory,
            /// `metadata.json` is a regular file with these bytes.
            File(&'static [u8]),
        }

        let tmp = TempDir::new().unwrap();

        // (entry dir name, on-disk shape, expected error_kind)
        let cases = [
            ("absent", Shape::Absent, "missing"),
            ("isdir", Shape::Directory, "unreadable"),
            ("malformed", Shape::File(b"not valid json {["), "malformed"),
            ("truncated", Shape::File(br#"{"source":"#), "truncated"),
            (
                "schema-drift",
                Shape::File(br#"{"version":"6.14"}"#),
                "schema_drift",
            ),
        ];

        for (dir_name, shape, expected_kind) in cases {
            let entry = tmp.path().join(dir_name);
            match shape {
                Shape::Absent => std::fs::create_dir_all(&entry).unwrap(),
                // Creating the nested directory also creates `entry`.
                Shape::Directory => {
                    std::fs::create_dir_all(entry.join("metadata.json")).unwrap()
                }
                Shape::File(bytes) => {
                    std::fs::create_dir_all(&entry).unwrap();
                    std::fs::write(entry.join("metadata.json"), bytes).unwrap();
                }
            }
            let reason = read_metadata(&entry).unwrap_err();
            assert_eq!(classify_corrupt_reason(&reason), expected_kind);
        }
    }
}