Skip to main content

vyre_driver/pipeline/
cache.rs

1//! Shared persistent cache for backend compiled-pipeline blobs.
2
3use super::hashing::{
4    dispatch_policy_cache_digest, dispatch_policy_cache_string, hex_encode,
5    normalized_program_cache_digest, try_normalized_program_cache_digest, PipelineDeviceFingerprint,
6};
7use super::CURRENT_PIPELINE_CACHE_KEY_VERSION;
8use crate::backend::DispatchConfig;
9use std::sync::{Arc, MutexGuard};
10use vyre_foundation::ir::Program;
11use vyre_spec::BackendId;
12
/// Maximum persistent pipeline blob read into memory.
///
/// Evaluates to 64 MiB. The same limit is applied when writing an entry and
/// when reading one back from disk.
pub const MAX_DISK_PIPELINE_BLOB_BYTES: u64 = 64 * 1024 * 1024;
15
16/// Disk cache for compiled pipeline blobs keyed by program and device.
17pub struct DiskPipelineCache {
18    root: std::path::PathBuf,
19    pending_flushes: std::sync::Mutex<Vec<std::path::PathBuf>>,
20}
21
22impl DiskPipelineCache {
23    fn lock_pending_flushes(&self) -> MutexGuard<'_, Vec<std::path::PathBuf>> {
24        self.pending_flushes.lock().unwrap_or_else(|error| {
25            panic!(
26                "Vyre disk pipeline cache pending-flush lock was poisoned: {error}. Fix: discard this cache instance after a panic; continuing could lose or duplicate compiled-pipeline fsync work."
27            )
28        })
29    }
30
31    /// Open a cache rooted at `root`.
32    ///
33    /// # Errors
34    ///
35    /// Returns when the root directory cannot be created.
36    pub fn open(root: impl Into<std::path::PathBuf>) -> std::io::Result<Self> {
37        let root = root.into();
38        std::fs::create_dir_all(&root)?;
39        Ok(Self {
40            root,
41            pending_flushes: std::sync::Mutex::new(Vec::new()),
42        })
43    }
44
45    /// Default cache directory.
46    #[must_use]
47    pub fn default_root() -> std::path::PathBuf {
48        if let Some(xdg) = std::env::var_os("XDG_CACHE_HOME") {
49            return std::path::PathBuf::from(xdg).join("vyre").join("pipelines");
50        }
51        if let Some(home) = std::env::var_os("HOME") {
52            #[cfg(target_os = "macos")]
53            {
54                return std::path::PathBuf::from(home)
55                    .join("Library")
56                    .join("Caches")
57                    .join("vyre")
58                    .join("pipelines");
59            }
60            #[cfg(not(target_os = "macos"))]
61            {
62                return std::path::PathBuf::from(home)
63                    .join(".cache")
64                    .join("vyre")
65                    .join("pipelines");
66            }
67        }
68        if let Some(appdata) = std::env::var_os("LOCALAPPDATA") {
69            return std::path::PathBuf::from(appdata)
70                .join("vyre")
71                .join("pipelines");
72        }
73        std::path::PathBuf::from("./vyre-cache/pipelines")
74    }
75
76    /// Derive the cache path for a program digest and device fingerprint.
77    #[must_use]
78    pub fn path_for(
79        &self,
80        program_digest: [u8; 32],
81        fingerprint: PipelineDeviceFingerprint,
82    ) -> std::path::PathBuf {
83        let key = fingerprint.cache_key(program_digest);
84        let mut file_name = hex_encode(&key);
85        let mut path = self.root.join(&file_name[..2]);
86        file_name.push_str(".bin");
87        path.push(file_name);
88        path
89    }
90
91    /// Read a cached blob. Returns `None` on a miss.
92    ///
93    /// # Errors
94    ///
95    /// Returns when an existing entry cannot be read.
96    pub fn read(
97        &self,
98        program_digest: [u8; 32],
99        fingerprint: PipelineDeviceFingerprint,
100    ) -> std::io::Result<Option<Vec<u8>>> {
101        let path = self.path_for(program_digest, fingerprint);
102        match read_bounded(&path, MAX_DISK_PIPELINE_BLOB_BYTES) {
103            Ok(bytes) => Ok(Some(bytes)),
104            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
105            Err(error) => Err(error),
106        }
107    }
108
109    /// Write a cache blob with atomic install.
110    ///
111    /// # Errors
112    ///
113    /// Returns when the entry is oversized or cannot be written.
114    pub fn write(
115        &self,
116        program_digest: [u8; 32],
117        fingerprint: PipelineDeviceFingerprint,
118        bytes: &[u8],
119    ) -> std::io::Result<()> {
120        if bytes.len() as u64 > MAX_DISK_PIPELINE_BLOB_BYTES {
121            return Err(std::io::Error::new(
122                std::io::ErrorKind::InvalidInput,
123                format!("pipeline cache blob exceeds {MAX_DISK_PIPELINE_BLOB_BYTES} byte limit"),
124            ));
125        }
126        let path = self.path_for(program_digest, fingerprint);
127        if let Some(parent) = path.parent() {
128            std::fs::create_dir_all(parent)?;
129        }
130        let tmp = self.tmp_path_for(&path);
131        let write_result = (|| -> std::io::Result<()> {
132            let mut file = std::fs::File::create(&tmp)?;
133            use std::io::Write as _;
134            file.write_all(bytes)?;
135            drop(file);
136            std::fs::rename(&tmp, &path)
137        })();
138        if write_result.is_err() {
139            remove_failed_atomic_write(&tmp)?;
140        }
141        write_result?;
142        self.lock_pending_flushes().push(path);
143        Ok(())
144    }
145
146    /// Durably flush entries written by [`Self::write`].
147    ///
148    /// # Errors
149    ///
150    /// Returns when a pending path cannot be flushed.
151    pub fn flush(&self) -> std::io::Result<()> {
152        let paths = {
153            let mut pending = self.lock_pending_flushes();
154            pending.sort();
155            pending.dedup();
156            std::mem::take(&mut *pending)
157        };
158        if let Err(error) = flush_paths(&paths) {
159            self.lock_pending_flushes().extend(paths);
160            return Err(error);
161        }
162        Ok(())
163    }
164
165    /// Remove entries selected by an impact mask.
166    ///
167    /// # Errors
168    ///
169    /// Returns when an impacted entry exists but cannot be removed.
170    pub fn invalidate_impacted(
171        &self,
172        impact_mask: &[u32],
173        program_digests: &[[u8; 32]],
174        fingerprint: PipelineDeviceFingerprint,
175    ) -> std::io::Result<()> {
176        for (index, &is_impacted) in impact_mask.iter().enumerate() {
177            if is_impacted != 0 {
178                if let Some(&digest) = program_digests.get(index) {
179                    let path = self.path_for(digest, fingerprint);
180                    if path.exists() {
181                        std::fs::remove_file(path)?;
182                    }
183                }
184            }
185        }
186        Ok(())
187    }
188
189    /// Root directory used by this cache.
190    #[must_use]
191    pub fn root(&self) -> &std::path::Path {
192        &self.root
193    }
194
195    fn tmp_path_for(&self, path: &std::path::Path) -> std::path::PathBuf {
196        static TMP_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
197        let tmp_id = TMP_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
198        path.with_extension(format!("bin.tmp.{}.{}", std::process::id(), tmp_id))
199    }
200}
201
/// Delete the temporary file left behind by a failed atomic write.
///
/// A file that is already gone counts as success; every other removal error
/// is returned unchanged.
fn remove_failed_atomic_write(path: &std::path::Path) -> std::io::Result<()> {
    std::fs::remove_file(path).or_else(|error| {
        if error.kind() == std::io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(error)
        }
    })
}
209
210fn read_bounded(path: &std::path::Path, max_bytes: u64) -> std::io::Result<Vec<u8>> {
211    use std::io::Read as _;
212
213    let mut file = std::fs::File::open(path)?;
214    let metadata = file.metadata()?;
215    if metadata.len() > max_bytes {
216        return Err(std::io::Error::new(
217            std::io::ErrorKind::InvalidData,
218            format!("pipeline cache blob exceeds {max_bytes} byte limit"),
219        ));
220    }
221    let byte_capacity = usize::try_from(metadata.len()).map_err(|error| {
222        std::io::Error::new(
223            std::io::ErrorKind::InvalidData,
224            format!(
225                "pipeline cache blob length {} does not fit usize: {error}",
226                metadata.len()
227            ),
228        )
229    })?;
230    let mut bytes = Vec::new();
231    crate::allocation::try_reserve_vec_to_capacity(&mut bytes, byte_capacity).map_err(|error| {
232        std::io::Error::new(
233            std::io::ErrorKind::OutOfMemory,
234            format!(
235                "pipeline cache bounded read could not reserve {byte_capacity} byte(s): {error}. Fix: lower the pipeline cache blob limit or evict oversized entries."
236            ),
237        )
238    })?;
239    file.by_ref().take(max_bytes + 1).read_to_end(&mut bytes)?;
240    if bytes.len() as u64 > max_bytes {
241        return Err(std::io::Error::new(
242            std::io::ErrorKind::InvalidData,
243            format!("pipeline cache blob exceeded {max_bytes} byte bounded read limit"),
244        ));
245    }
246    Ok(bytes)
247}
248
249fn flush_paths(paths: &[std::path::PathBuf]) -> std::io::Result<()> {
250    let mut parents = Vec::new();
251    crate::allocation::try_reserve_vec_to_capacity(&mut parents, paths.len()).map_err(|error| {
252        std::io::Error::new(
253            std::io::ErrorKind::OutOfMemory,
254            format!(
255                "pipeline cache flush could not reserve {} parent path slot(s): {error}. Fix: flush fewer cache paths per batch.",
256                paths.len()
257            ),
258        )
259    })?;
260    sync_files_bounded(
261        paths,
262        std::fs::File::sync_data,
263        "disk cache file sync worker panicked",
264    )?;
265    for path in paths {
266        if let Some(parent) = path.parent() {
267            parents.push(parent.to_path_buf());
268        }
269    }
270    parents.sort();
271    parents.dedup();
272    sync_parent_dirs(&parents)?;
273    Ok(())
274}
275
276#[cfg(unix)]
277fn sync_parent_dirs(parents: &[std::path::PathBuf]) -> std::io::Result<()> {
278    sync_files_bounded(
279        parents,
280        std::fs::File::sync_all,
281        "disk cache dir sync worker panicked",
282    )
283}
284
285#[cfg(not(unix))]
286fn sync_parent_dirs(_parents: &[std::path::PathBuf]) -> std::io::Result<()> {
287    Ok(())
288}
289
290fn sync_files_bounded(
291    paths: &[std::path::PathBuf],
292    sync: fn(&std::fs::File) -> std::io::Result<()>,
293    panic_message: &'static str,
294) -> std::io::Result<()> {
295    if paths.is_empty() {
296        return Ok(());
297    }
298    let workers = std::thread::available_parallelism()
299        .map(usize::from)
300        .unwrap_or(1)
301        .clamp(1, 16);
302    for chunk in paths.chunks(workers) {
303        std::thread::scope(|scope| {
304            let mut handles = Vec::new();
305            crate::allocation::try_reserve_vec_to_capacity(&mut handles, chunk.len()).map_err(|error| {
306                std::io::Error::new(
307                    std::io::ErrorKind::OutOfMemory,
308                    format!(
309                        "pipeline cache sync could not reserve {} worker handle(s): {error}. Fix: lower pipeline cache sync fan-out.",
310                        chunk.len()
311                    ),
312                )
313            })?;
314            for path in chunk {
315                handles.push(scope.spawn(move || {
316                    let file = std::fs::File::open(path)?;
317                    sync(&file)
318                }));
319            }
320            for handle in handles {
321                handle
322                    .join()
323                    .map_err(|_| std::io::Error::other(panic_message))??;
324            }
325            Ok::<(), std::io::Error>(())
326        })?;
327    }
328    Ok(())
329}
330
331/// Capability bits that participate in pipeline-cache identity.
332///
333/// Two otherwise-identical pipelines compiled with different
334/// `PipelineFeatureFlags` produce different cache keys  -  a pipeline
335/// that assumed subgroup-op support cannot be reused on an adapter
336/// that does not expose subgroup ops even if the shader bytes match.
337///
338/// Encoded as a bitfield so the wire form is compact and trivially
339/// hashable. Bits `0x01..0x80` are allocated here; higher bits are
340/// reserved for additive backend capability flags.
341#[derive(
342    Copy, Clone, Debug, Default, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize,
343)]
344pub struct PipelineFeatureFlags(pub u32);
345
346impl PipelineFeatureFlags {
347    /// Pipeline was compiled against a lowering that emits subgroup /
348    /// wave intrinsics.
349    pub const SUBGROUP_OPS: Self = Self(1 << 0);
350    /// Pipeline was compiled with native `f16` support.
351    pub const F16: Self = Self(1 << 1);
352    /// Pipeline was compiled with native `bf16` support.
353    pub const BF16: Self = Self(1 << 2);
354    /// Pipeline was compiled with tensor-core / matrix-engine
355    /// intrinsics enabled.
356    pub const TENSOR_CORES: Self = Self(1 << 3);
357    /// Pipeline expects an async-compute queue at dispatch time.
358    pub const ASYNC_COMPUTE: Self = Self(1 << 4);
359    /// Pipeline expects push-constant support at dispatch time.
360    pub const PUSH_CONSTANTS: Self = Self(1 << 5);
361    /// Pipeline emits indirect-dispatch commands.
362    pub const INDIRECT_DISPATCH: Self = Self(1 << 6);
363    /// Pipeline was compiled for speculative (fused prefilter+confirmer)
364    /// dispatch.
365    pub const SPECULATIVE: Self = Self(1 << 7);
366    /// Pipeline was compiled for persistent-thread (device-side work queue)
367    /// dispatch.
368    pub const PERSISTENT_THREAD: Self = Self(1 << 8);
369
370    /// Empty flag set.
371    #[must_use]
372    pub const fn empty() -> Self {
373        Self(0)
374    }
375
376    /// Contains at least every bit of `other`.
377    #[must_use]
378    pub const fn contains(self, other: Self) -> bool {
379        (self.0 & other.0) == other.0
380    }
381
382    /// Union of two flag sets.
383    #[must_use]
384    pub const fn union(self, other: Self) -> Self {
385        Self(self.0 | other.0)
386    }
387
388    /// Raw bit representation.
389    #[must_use]
390    pub const fn bits(self) -> u32 {
391        self.0
392    }
393}
394
395/// Versioned pipeline-cache key shared by every backend.
396///
397/// Replaces the pre-0.6 pattern of using a raw blake3 hash as the key.
398/// A raw hash is not robust: two pipelines that should miss (different
399/// bind-group layout, different push-constant size, different
400/// workgroup-size selection) hashed identically because the hash
401/// covered the shader source only. Silent cache hits against a
402/// non-equivalent pipeline are a correctness hazard (wrong bind-group
403/// layout binds undefined data; wrong workgroup-size launches beyond
404/// guarantees).
405///
406/// `#[non_exhaustive]` is enforced at the type level via the private
407/// `__phantom` field: external callers construct keys through
408/// [`PipelineCacheKey::new`] and cannot match exhaustively, so additive
409/// key fields do not break downstream matches.
410#[derive(Clone, Debug, Eq, PartialEq, Hash)]
411
412pub struct PipelineCacheKey {
413    /// Key format version. Bumped to invalidate every cache entry
414    /// without an API break.
415    pub version: u32,
416    /// blake3 hash of the canonical backend pipeline-source bytes.
417    pub shader_hash: [u8; 32],
418    /// Structural hash of the bind-group layout descriptors. Not the
419    /// backend handle; the bytes that describe slot count, types,
420    /// visibility, and access modes per bind group.
421    pub bind_group_layout_hash: [u8; 32],
422    /// Push-constant range in bytes. Included so a pipeline compiled
423    /// for 16 B push constants never reuses against a layout that
424    /// expects 32 B.
425    pub push_constant_size: u32,
426    /// Workgroup-size `[x, y, z]` the pipeline was specialized for.
427    pub workgroup_size: [u32; 3],
428    /// Feature-flag bits the pipeline assumes at dispatch time.
429    pub feature_flags: PipelineFeatureFlags,
430    /// Backend identity. Prevents pipelines from different backends from
431    /// colliding when they happen to produce identical shader hashes.
432    pub backend_id: BackendId,
433    /// Reserved private field so `PipelineCacheKey` cannot be
434    /// constructed by structural literal (forward-compatibility lever).
435    #[allow(dead_code)]
436    __phantom: core::marker::PhantomData<()>,
437}
438
439impl PipelineCacheKey {
440    /// Construct a key at the current version.
441    #[must_use]
442    #[allow(clippy::too_many_arguments)]
443    pub fn new(
444        shader_hash: [u8; 32],
445        bind_group_layout_hash: [u8; 32],
446        push_constant_size: u32,
447        workgroup_size: [u32; 3],
448        feature_flags: PipelineFeatureFlags,
449        backend_id: BackendId,
450    ) -> Self {
451        Self {
452            version: CURRENT_PIPELINE_CACHE_KEY_VERSION,
453            shader_hash,
454            bind_group_layout_hash,
455            push_constant_size,
456            workgroup_size,
457            feature_flags,
458            backend_id,
459            __phantom: core::marker::PhantomData,
460        }
461    }
462}
463
464/// Shared in-memory identity for a backend-compiled pipeline.
465///
466/// Backends may keep their own cache maps and compiled handles, but this object
467/// keeps the identity facts single-sourced: normalized Program digest, dispatch
468/// policy digest, device/runtime fingerprint, and the final digest used as the
469/// lookup key.
470#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
471pub struct PipelineCacheIdentity {
472    /// Final tuple-boundary-preserving lookup digest.
473    pub digest: [u8; 32],
474    /// Normalized Program digest used by backend pipeline caches.
475    pub program_digest: [u8; 32],
476    /// Dispatch policy fields that alter generated backend code.
477    pub policy_digest: [u8; 32],
478    /// Backend/device/runtime identity participating in the final digest.
479    pub device_fingerprint: PipelineDeviceFingerprint,
480}
481
482impl PipelineCacheIdentity {
483    /// Build identity from already-computed shared components.
484    #[must_use]
485    pub fn from_parts(
486        program_digest: [u8; 32],
487        policy_digest: [u8; 32],
488        device_fingerprint: PipelineDeviceFingerprint,
489    ) -> Self {
490        let mut hasher = blake3::Hasher::new();
491        hasher.update(b"vyre-pipeline-cache-identity-v1\0program\0");
492        hasher.update(&program_digest);
493        hasher.update(b"\0policy\0");
494        hasher.update(&policy_digest);
495        hasher.update(b"\0vendor\0");
496        hasher.update(&device_fingerprint.vendor.to_le_bytes());
497        hasher.update(b"\0device\0");
498        hasher.update(&device_fingerprint.device.to_le_bytes());
499        hasher.update(b"\0driver\0");
500        hasher.update(&device_fingerprint.driver_digest);
501        Self {
502            digest: *hasher.finalize().as_bytes(),
503            program_digest,
504            policy_digest,
505            device_fingerprint,
506        }
507    }
508
509    /// Build identity from a public Program and dispatch config.
510    ///
511    /// # Errors
512    ///
513    /// Returns when the Program cannot be serialized into stable cache
514    /// identity. Callers should surface the error through their backend error
515    /// type instead of hashing invalid IR lossy.
516    pub fn try_from_program(
517        program: &Program,
518        config: &DispatchConfig,
519        device_fingerprint: PipelineDeviceFingerprint,
520    ) -> Result<Self, String> {
521        let program_digest = try_normalized_program_cache_digest(program)?;
522        let policy_digest = dispatch_policy_cache_digest(config);
523        Ok(Self::from_parts(
524            program_digest,
525            policy_digest,
526            device_fingerprint,
527        ))
528    }
529}
530
531/// Evidence a backend can provide when a pipeline-cache lookup misses.
532///
533/// The classifier is intentionally backend-neutral: a concrete driver keeps its
534/// own fast cache key, but records enough adjacent identity facts to explain why
535/// a miss happened without duplicating per-backend reason logic.
536#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
537pub struct PipelineCacheMissEvidence {
538    /// Number of compiled pipeline entries present at lookup time.
539    pub total_entries: usize,
540    /// Entries whose normalized Program digest matched the requested program.
541    pub same_program_entries: usize,
542    /// Entries whose Program digest and dispatch policy digest both matched.
543    pub same_program_and_policy_entries: usize,
544    /// Entries whose Program digest, dispatch policy digest, and
545    /// device/runtime fingerprint all matched.
546    pub same_program_policy_and_device_entries: usize,
547}
548
549impl PipelineCacheMissEvidence {
550    /// Build miss evidence from cached identities adjacent to the requested key.
551    #[must_use]
552    pub fn from_identities<'a>(
553        cached: impl Iterator<Item = &'a PipelineCacheIdentity>,
554        requested: &PipelineCacheIdentity,
555    ) -> Self {
556        let mut evidence = Self::default();
557        for identity in cached {
558            evidence.total_entries += 1;
559            if identity.program_digest == requested.program_digest {
560                evidence.same_program_entries += 1;
561                if identity.policy_digest == requested.policy_digest {
562                    evidence.same_program_and_policy_entries += 1;
563                    if identity.device_fingerprint == requested.device_fingerprint {
564                        evidence.same_program_policy_and_device_entries += 1;
565                    }
566                }
567            }
568        }
569        evidence
570    }
571}
572
573/// Backend-neutral cache-miss reason for operator telemetry.
574#[derive(Copy, Clone, Debug, Eq, PartialEq)]
575pub enum PipelineCacheMissReason {
576    /// The backend had no compiled entries at lookup time.
577    EmptyCache,
578    /// Existing entries were for different normalized programs.
579    ProgramChanged,
580    /// The same program existed, but dispatch policy changed the generated
581    /// pipeline identity.
582    DispatchPolicyChanged,
583    /// Program and dispatch policy matched, but device/runtime identity changed.
584    DeviceOrRuntimeChanged,
585    /// The supplied evidence says adjacent identity matched but the final key
586    /// still missed; this catches future key fields and malformed bookkeeping.
587    KeyAbsent,
588}
589
590impl PipelineCacheMissReason {
591    /// Classify a miss from backend-supplied adjacent identity evidence.
592    #[must_use]
593    pub const fn classify(evidence: PipelineCacheMissEvidence) -> Self {
594        if evidence.total_entries == 0 {
595            Self::EmptyCache
596        } else if evidence.same_program_entries == 0 {
597            Self::ProgramChanged
598        } else if evidence.same_program_and_policy_entries == 0 {
599            Self::DispatchPolicyChanged
600        } else if evidence.same_program_policy_and_device_entries == 0 {
601            Self::DeviceOrRuntimeChanged
602        } else {
603            Self::KeyAbsent
604        }
605    }
606
607    /// Classify a miss directly from cached pipeline identities.
608    #[must_use]
609    pub fn classify_identities<'a>(
610        cached: impl Iterator<Item = &'a PipelineCacheIdentity>,
611        requested: &PipelineCacheIdentity,
612    ) -> Self {
613        Self::classify(PipelineCacheMissEvidence::from_identities(
614            cached, requested,
615        ))
616    }
617
618    /// Stable metric suffix for backend metric snapshots.
619    #[must_use]
620    pub const fn metric_suffix(self) -> &'static str {
621        match self {
622            Self::EmptyCache => "empty_cache",
623            Self::ProgramChanged => "program_changed",
624            Self::DispatchPolicyChanged => "dispatch_policy_changed",
625            Self::DeviceOrRuntimeChanged => "device_or_runtime_changed",
626            Self::KeyAbsent => "key_absent",
627        }
628    }
629}
630
#[cfg(test)]
mod pipeline_cache_key_tests {
    use super::*;

    /// 32-byte digest filled with `byte`.
    fn hash32(byte: u8) -> [u8; 32] {
        [byte; 32]
    }

    /// Key with fixed hashes and push-constant size; only the arguments vary.
    fn key_with(
        workgroup_size: [u32; 3],
        feature_flags: PipelineFeatureFlags,
        backend: &'static str,
    ) -> PipelineCacheKey {
        PipelineCacheKey::new(
            hash32(1),
            hash32(2),
            0,
            workgroup_size,
            feature_flags,
            BackendId::from(backend),
        )
    }

    /// Evidence built from the four counters in declaration order.
    fn evidence(
        total_entries: usize,
        same_program_entries: usize,
        same_program_and_policy_entries: usize,
        same_program_policy_and_device_entries: usize,
    ) -> PipelineCacheMissEvidence {
        PipelineCacheMissEvidence {
            total_entries,
            same_program_entries,
            same_program_and_policy_entries,
            same_program_policy_and_device_entries,
        }
    }

    #[test]
    fn different_workgroup_size_differs() {
        let a = key_with([64, 1, 1], PipelineFeatureFlags::empty(), "backend-a");
        let b = key_with([128, 1, 1], PipelineFeatureFlags::empty(), "backend-a");
        assert_ne!(a, b);
    }

    #[test]
    fn different_feature_flags_differ() {
        let a = key_with([1, 1, 1], PipelineFeatureFlags::empty(), "backend-a");
        let b = key_with([1, 1, 1], PipelineFeatureFlags::SUBGROUP_OPS, "backend-a");
        assert_ne!(a, b);
    }

    #[test]
    fn different_backend_id_differs() {
        let a = key_with([1, 1, 1], PipelineFeatureFlags::empty(), "backend-a");
        let b = key_with([1, 1, 1], PipelineFeatureFlags::empty(), "backend-b");
        assert_ne!(a, b);
    }

    #[test]
    fn flag_containment_is_correct() {
        let a = PipelineFeatureFlags::SUBGROUP_OPS.union(PipelineFeatureFlags::F16);
        assert!(a.contains(PipelineFeatureFlags::SUBGROUP_OPS));
        assert!(a.contains(PipelineFeatureFlags::F16));
        assert!(!a.contains(PipelineFeatureFlags::TENSOR_CORES));
    }

    #[test]
    fn version_is_current() {
        let k = key_with([1, 1, 1], PipelineFeatureFlags::empty(), "backend-a");
        assert_eq!(k.version, CURRENT_PIPELINE_CACHE_KEY_VERSION);
    }

    #[test]
    fn shared_cache_identity_separates_program_policy_and_device_facts() {
        let program_a = hash32(1);
        let program_b = hash32(2);
        let policy_a = hash32(3);
        let policy_b = hash32(4);
        let device_a = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-a");
        let device_b = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-b");

        let base = PipelineCacheIdentity::from_parts(program_a, policy_a, device_a);
        let other_program = PipelineCacheIdentity::from_parts(program_b, policy_a, device_a);
        let other_policy = PipelineCacheIdentity::from_parts(program_a, policy_b, device_a);
        let other_device = PipelineCacheIdentity::from_parts(program_a, policy_a, device_b);

        assert_eq!(base.program_digest, program_a);
        assert_eq!(base.policy_digest, policy_a);
        assert_eq!(base.device_fingerprint, device_a);
        assert_ne!(
            base.digest, other_program.digest,
            "Fix: shared pipeline cache identity must include the normalized Program digest."
        );
        assert_ne!(
            base.digest, other_policy.digest,
            "Fix: shared pipeline cache identity must include dispatch policy as its own tuple field."
        );
        assert_ne!(
            base.digest, other_device.digest,
            "Fix: shared pipeline cache identity must include device/runtime fingerprint facts."
        );
    }

    #[test]
    fn miss_reason_classifies_adjacent_identity_evidence() {
        let cases = [
            (evidence(0, 0, 0, 0), PipelineCacheMissReason::EmptyCache),
            (evidence(3, 0, 0, 0), PipelineCacheMissReason::ProgramChanged),
            (
                evidence(3, 2, 0, 0),
                PipelineCacheMissReason::DispatchPolicyChanged,
            ),
            (
                evidence(3, 2, 1, 0),
                PipelineCacheMissReason::DeviceOrRuntimeChanged,
            ),
            (evidence(3, 2, 1, 1), PipelineCacheMissReason::KeyAbsent),
        ];
        for (input, expected) in cases {
            assert_eq!(PipelineCacheMissReason::classify(input), expected);
        }
    }

    #[test]
    fn miss_reason_metric_suffixes_are_stable_snake_case() {
        let cases = [
            (PipelineCacheMissReason::EmptyCache, "empty_cache"),
            (PipelineCacheMissReason::ProgramChanged, "program_changed"),
            (
                PipelineCacheMissReason::DispatchPolicyChanged,
                "dispatch_policy_changed",
            ),
            (
                PipelineCacheMissReason::DeviceOrRuntimeChanged,
                "device_or_runtime_changed",
            ),
            (PipelineCacheMissReason::KeyAbsent, "key_absent"),
        ];
        for (reason, expected) in cases {
            assert_eq!(reason.metric_suffix(), expected);
        }
    }

    #[test]
    fn miss_reason_classifies_cached_shared_identities() {
        let program = hash32(1);
        let other_program = hash32(2);
        let policy = hash32(3);
        let other_policy = hash32(4);
        let device = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-a");
        let other_device = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-b");
        let requested = PipelineCacheIdentity::from_parts(program, policy, device);

        // Each case is a cache holding at most one identity.
        let cases = [
            (None, PipelineCacheMissReason::EmptyCache),
            (
                Some(PipelineCacheIdentity::from_parts(other_program, policy, device)),
                PipelineCacheMissReason::ProgramChanged,
            ),
            (
                Some(PipelineCacheIdentity::from_parts(program, other_policy, device)),
                PipelineCacheMissReason::DispatchPolicyChanged,
            ),
            (
                Some(PipelineCacheIdentity::from_parts(program, policy, other_device)),
                PipelineCacheMissReason::DeviceOrRuntimeChanged,
            ),
            (
                Some(PipelineCacheIdentity::from_parts(program, policy, device)),
                PipelineCacheMissReason::KeyAbsent,
            ),
        ];
        for (cached, expected) in &cases {
            assert_eq!(
                PipelineCacheMissReason::classify_identities(cached.iter(), &requested),
                *expected
            );
        }
    }

    #[test]
    fn poisoned_pending_flush_lock_is_not_silently_recovered() {
        let cache = Arc::new(DiskPipelineCache {
            root: std::env::temp_dir(),
            pending_flushes: std::sync::Mutex::new(Vec::new()),
        });

        // Poison the lock by panicking on another thread while holding it.
        let poisoned = Arc::clone(&cache);
        let poisoner = std::thread::spawn(move || {
            let _guard = poisoned.lock_pending_flushes();
            panic!("poison disk pipeline cache pending flushes");
        });
        let _ = poisoner.join();

        let payload = std::panic::catch_unwind(|| drop(cache.lock_pending_flushes()))
            .expect_err("poisoned disk pipeline cache must panic instead of recovering");
        let message = match (
            payload.downcast_ref::<String>(),
            payload.downcast_ref::<&'static str>(),
        ) {
            (Some(text), _) => text.as_str(),
            (None, Some(text)) => *text,
            (None, None) => "<non-string panic>",
        };
        assert!(
            message.contains("pending-flush lock was poisoned"),
            "{message}"
        );
    }
}