Skip to main content

vyre_driver/pipeline/
cache.rs

1//! Shared persistent cache for backend compiled-pipeline blobs.
2
3use super::hashing::{
4    dispatch_policy_cache_digest, dispatch_policy_cache_string, hex_encode,
5    normalized_program_cache_digest, try_normalized_program_cache_digest,
6    PipelineDeviceFingerprint,
7};
8use super::CURRENT_PIPELINE_CACHE_KEY_VERSION;
9use crate::backend::DispatchConfig;
10use std::sync::{Arc, MutexGuard};
11use vyre_foundation::ir::Program;
12use vyre_spec::BackendId;
13
/// Largest persistent pipeline blob (64 MiB) the disk cache will buffer in
/// memory when reading, or accept when writing.
pub const MAX_DISK_PIPELINE_BLOB_BYTES: u64 = 1 << 26;
16
17/// Disk cache for compiled pipeline blobs keyed by program and device.
18pub struct DiskPipelineCache {
19    root: std::path::PathBuf,
20    pending_flushes: std::sync::Mutex<Vec<std::path::PathBuf>>,
21}
22
23impl DiskPipelineCache {
24    fn try_lock_pending_flushes(&self) -> std::io::Result<MutexGuard<'_, Vec<std::path::PathBuf>>> {
25        self.pending_flushes.lock().map_err(|error| {
26            std::io::Error::other(format!(
27                "Vyre disk pipeline cache pending-flush lock was poisoned: {error}. Fix: discard this cache instance after a panic; continuing could lose or duplicate compiled-pipeline fsync work."
28            ))
29        })
30    }
31
32    /// Open a cache rooted at `root`.
33    ///
34    /// # Errors
35    ///
36    /// Returns when the root directory cannot be created.
37    pub fn open(root: impl Into<std::path::PathBuf>) -> std::io::Result<Self> {
38        let root = root.into();
39        std::fs::create_dir_all(&root)?;
40        Ok(Self {
41            root,
42            pending_flushes: std::sync::Mutex::new(Vec::new()),
43        })
44    }
45
46    /// Default cache directory.
47    #[must_use]
48    pub fn default_root() -> std::path::PathBuf {
49        if let Some(xdg) = std::env::var_os("XDG_CACHE_HOME") {
50            return std::path::PathBuf::from(xdg).join("vyre").join("pipelines");
51        }
52        if let Some(home) = std::env::var_os("HOME") {
53            #[cfg(target_os = "macos")]
54            {
55                return std::path::PathBuf::from(home)
56                    .join("Library")
57                    .join("Caches")
58                    .join("vyre")
59                    .join("pipelines");
60            }
61            #[cfg(not(target_os = "macos"))]
62            {
63                return std::path::PathBuf::from(home)
64                    .join(".cache")
65                    .join("vyre")
66                    .join("pipelines");
67            }
68        }
69        if let Some(appdata) = std::env::var_os("LOCALAPPDATA") {
70            return std::path::PathBuf::from(appdata)
71                .join("vyre")
72                .join("pipelines");
73        }
74        std::path::PathBuf::from("./vyre-cache/pipelines")
75    }
76
77    /// Derive the cache path for a program digest and device fingerprint.
78    #[must_use]
79    pub fn path_for(
80        &self,
81        program_digest: [u8; 32],
82        fingerprint: PipelineDeviceFingerprint,
83    ) -> std::path::PathBuf {
84        let key = fingerprint.cache_key(program_digest);
85        let mut file_name = hex_encode(&key);
86        let mut path = self.root.join(&file_name[..2]);
87        file_name.push_str(".bin");
88        path.push(file_name);
89        path
90    }
91
92    /// Read a cached blob. Returns `None` on a miss.
93    ///
94    /// # Errors
95    ///
96    /// Returns when an existing entry cannot be read.
97    pub fn read(
98        &self,
99        program_digest: [u8; 32],
100        fingerprint: PipelineDeviceFingerprint,
101    ) -> std::io::Result<Option<Vec<u8>>> {
102        let path = self.path_for(program_digest, fingerprint);
103        match read_bounded(&path, MAX_DISK_PIPELINE_BLOB_BYTES) {
104            Ok(bytes) => Ok(Some(bytes)),
105            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
106            Err(error) => Err(error),
107        }
108    }
109
110    /// Write a cache blob with atomic install.
111    ///
112    /// # Errors
113    ///
114    /// Returns when the entry is oversized or cannot be written.
115    pub fn write(
116        &self,
117        program_digest: [u8; 32],
118        fingerprint: PipelineDeviceFingerprint,
119        bytes: &[u8],
120    ) -> std::io::Result<()> {
121        if bytes.len() as u64 > MAX_DISK_PIPELINE_BLOB_BYTES {
122            return Err(std::io::Error::new(
123                std::io::ErrorKind::InvalidInput,
124                format!("pipeline cache blob exceeds {MAX_DISK_PIPELINE_BLOB_BYTES} byte limit"),
125            ));
126        }
127        let path = self.path_for(program_digest, fingerprint);
128        if let Some(parent) = path.parent() {
129            std::fs::create_dir_all(parent)?;
130        }
131        let tmp = self.tmp_path_for(&path);
132        let write_result = (|| -> std::io::Result<()> {
133            let mut file = std::fs::File::create(&tmp)?;
134            use std::io::Write as _;
135            file.write_all(bytes)?;
136            drop(file);
137            std::fs::rename(&tmp, &path)
138        })();
139        if write_result.is_err() {
140            remove_failed_atomic_write(&tmp)?;
141        }
142        write_result?;
143        self.try_lock_pending_flushes()?.push(path);
144        Ok(())
145    }
146
147    /// Durably flush entries written by [`Self::write`].
148    ///
149    /// # Errors
150    ///
151    /// Returns when a pending path cannot be flushed.
152    pub fn flush(&self) -> std::io::Result<()> {
153        let paths = {
154            let mut pending = self.try_lock_pending_flushes()?;
155            pending.sort();
156            pending.dedup();
157            std::mem::take(&mut *pending)
158        };
159        if let Err(error) = flush_paths(&paths) {
160            self.try_lock_pending_flushes()?.extend(paths);
161            return Err(error);
162        }
163        Ok(())
164    }
165
166    /// Remove entries selected by an impact mask.
167    ///
168    /// # Errors
169    ///
170    /// Returns when an impacted entry exists but cannot be removed.
171    pub fn invalidate_impacted(
172        &self,
173        impact_mask: &[u32],
174        program_digests: &[[u8; 32]],
175        fingerprint: PipelineDeviceFingerprint,
176    ) -> std::io::Result<()> {
177        for (index, &is_impacted) in impact_mask.iter().enumerate() {
178            if is_impacted != 0 {
179                if let Some(&digest) = program_digests.get(index) {
180                    let path = self.path_for(digest, fingerprint);
181                    if path.exists() {
182                        std::fs::remove_file(path)?;
183                    }
184                }
185            }
186        }
187        Ok(())
188    }
189
190    /// Root directory used by this cache.
191    #[must_use]
192    pub fn root(&self) -> &std::path::Path {
193        &self.root
194    }
195
196    fn tmp_path_for(&self, path: &std::path::Path) -> std::path::PathBuf {
197        static TMP_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
198        let tmp_id = TMP_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
199        path.with_extension(format!("bin.tmp.{}.{}", std::process::id(), tmp_id))
200    }
201}
202
/// Remove the temporary file left behind by a failed atomic write.
///
/// A file that is already gone counts as success; any other removal error is
/// returned unchanged.
fn remove_failed_atomic_write(path: &std::path::Path) -> std::io::Result<()> {
    std::fs::remove_file(path).or_else(|failure| {
        if failure.kind() == std::io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(failure)
        }
    })
}
210
211fn read_bounded(path: &std::path::Path, max_bytes: u64) -> std::io::Result<Vec<u8>> {
212    use std::io::Read as _;
213
214    let mut file = std::fs::File::open(path)?;
215    let metadata = file.metadata()?;
216    if metadata.len() > max_bytes {
217        return Err(std::io::Error::new(
218            std::io::ErrorKind::InvalidData,
219            format!("pipeline cache blob exceeds {max_bytes} byte limit"),
220        ));
221    }
222    let byte_capacity = usize::try_from(metadata.len()).map_err(|error| {
223        std::io::Error::new(
224            std::io::ErrorKind::InvalidData,
225            format!(
226                "pipeline cache blob length {} does not fit usize: {error}",
227                metadata.len()
228            ),
229        )
230    })?;
231    let mut bytes = Vec::new();
232    crate::allocation::try_reserve_vec_to_capacity(&mut bytes, byte_capacity).map_err(|error| {
233        std::io::Error::new(
234            std::io::ErrorKind::OutOfMemory,
235            format!(
236                "pipeline cache bounded read could not reserve {byte_capacity} byte(s): {error}. Fix: lower the pipeline cache blob limit or evict oversized entries."
237            ),
238        )
239    })?;
240    file.by_ref().take(max_bytes + 1).read_to_end(&mut bytes)?;
241    if bytes.len() as u64 > max_bytes {
242        return Err(std::io::Error::new(
243            std::io::ErrorKind::InvalidData,
244            format!("pipeline cache blob exceeded {max_bytes} byte bounded read limit"),
245        ));
246    }
247    Ok(bytes)
248}
249
250fn flush_paths(paths: &[std::path::PathBuf]) -> std::io::Result<()> {
251    let mut parents = Vec::new();
252    crate::allocation::try_reserve_vec_to_capacity(&mut parents, paths.len()).map_err(|error| {
253        std::io::Error::new(
254            std::io::ErrorKind::OutOfMemory,
255            format!(
256                "pipeline cache flush could not reserve {} parent path slot(s): {error}. Fix: flush fewer cache paths per batch.",
257                paths.len()
258            ),
259        )
260    })?;
261    sync_files_bounded(
262        paths,
263        std::fs::File::sync_data,
264        "disk cache file sync worker panicked",
265    )?;
266    for path in paths {
267        if let Some(parent) = path.parent() {
268            parents.push(parent.to_path_buf());
269        }
270    }
271    parents.sort();
272    parents.dedup();
273    sync_parent_dirs(&parents)?;
274    Ok(())
275}
276
277#[cfg(unix)]
278fn sync_parent_dirs(parents: &[std::path::PathBuf]) -> std::io::Result<()> {
279    sync_files_bounded(
280        parents,
281        std::fs::File::sync_all,
282        "disk cache dir sync worker panicked",
283    )
284}
285
286#[cfg(not(unix))]
287fn sync_parent_dirs(_parents: &[std::path::PathBuf]) -> std::io::Result<()> {
288    Ok(())
289}
290
291fn sync_files_bounded(
292    paths: &[std::path::PathBuf],
293    sync: fn(&std::fs::File) -> std::io::Result<()>,
294    panic_message: &'static str,
295) -> std::io::Result<()> {
296    if paths.is_empty() {
297        return Ok(());
298    }
299    let workers = std::thread::available_parallelism()
300        .map(usize::from)
301        .unwrap_or(1)
302        .clamp(1, 16);
303    for chunk in paths.chunks(workers) {
304        std::thread::scope(|scope| {
305            let mut handles = Vec::new();
306            crate::allocation::try_reserve_vec_to_capacity(&mut handles, chunk.len()).map_err(|error| {
307                std::io::Error::new(
308                    std::io::ErrorKind::OutOfMemory,
309                    format!(
310                        "pipeline cache sync could not reserve {} worker handle(s): {error}. Fix: lower pipeline cache sync fan-out.",
311                        chunk.len()
312                    ),
313                )
314            })?;
315            for path in chunk {
316                handles.push(scope.spawn(move || {
317                    let file = std::fs::File::open(path)?;
318                    sync(&file)
319                }));
320            }
321            for handle in handles {
322                handle
323                    .join()
324                    .map_err(|_| std::io::Error::other(panic_message))??;
325            }
326            Ok::<(), std::io::Error>(())
327        })?;
328    }
329    Ok(())
330}
331
332/// Capability bits that participate in pipeline-cache identity.
333///
334/// Two otherwise-identical pipelines compiled with different
335/// `PipelineFeatureFlags` produce different cache keys  -  a pipeline
336/// that assumed subgroup-op support cannot be reused on an adapter
337/// that does not expose subgroup ops even if the shader bytes match.
338///
339/// Encoded as a bitfield so the wire form is compact and trivially
340/// hashable. Bits `0x01..0x80` are allocated here; higher bits are
341/// reserved for additive backend capability flags.
342#[derive(
343    Copy, Clone, Debug, Default, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize,
344)]
345pub struct PipelineFeatureFlags(pub u32);
346
347impl PipelineFeatureFlags {
348    /// Pipeline was compiled against a lowering that emits subgroup /
349    /// wave intrinsics.
350    pub const SUBGROUP_OPS: Self = Self(1 << 0);
351    /// Pipeline was compiled with native `f16` support.
352    pub const F16: Self = Self(1 << 1);
353    /// Pipeline was compiled with native `bf16` support.
354    pub const BF16: Self = Self(1 << 2);
355    /// Pipeline was compiled with tensor-core / matrix-engine
356    /// intrinsics enabled.
357    pub const TENSOR_CORES: Self = Self(1 << 3);
358    /// Pipeline expects an async-compute queue at dispatch time.
359    pub const ASYNC_COMPUTE: Self = Self(1 << 4);
360    /// Pipeline expects push-constant support at dispatch time.
361    pub const PUSH_CONSTANTS: Self = Self(1 << 5);
362    /// Pipeline emits indirect-dispatch commands.
363    pub const INDIRECT_DISPATCH: Self = Self(1 << 6);
364    /// Pipeline was compiled for speculative (fused prefilter+confirmer)
365    /// dispatch.
366    pub const SPECULATIVE: Self = Self(1 << 7);
367    /// Pipeline was compiled for persistent-thread (device-side work queue)
368    /// dispatch.
369    pub const PERSISTENT_THREAD: Self = Self(1 << 8);
370
371    /// Empty flag set.
372    #[must_use]
373    pub const fn empty() -> Self {
374        Self(0)
375    }
376
377    /// Contains at least every bit of `other`.
378    #[must_use]
379    pub const fn contains(self, other: Self) -> bool {
380        (self.0 & other.0) == other.0
381    }
382
383    /// Union of two flag sets.
384    #[must_use]
385    pub const fn union(self, other: Self) -> Self {
386        Self(self.0 | other.0)
387    }
388
389    /// Raw bit representation.
390    #[must_use]
391    pub const fn bits(self) -> u32 {
392        self.0
393    }
394}
395
396/// Versioned pipeline-cache key shared by every backend.
397///
398/// Replaces the pre-0.6 pattern of using a raw blake3 hash as the key.
399/// A raw hash is not robust: two pipelines that should miss (different
400/// bind-group layout, different push-constant size, different
401/// workgroup-size selection) hashed identically because the hash
402/// covered the shader source only. Silent cache hits against a
403/// non-equivalent pipeline are a correctness hazard (wrong bind-group
404/// layout binds undefined data; wrong workgroup-size launches beyond
405/// guarantees).
406///
407/// `#[non_exhaustive]` is enforced at the type level via the private
408/// `__phantom` field: external callers construct keys through
409/// [`PipelineCacheKey::new`] and cannot match exhaustively, so additive
410/// key fields do not break downstream matches.
411#[derive(Clone, Debug, Eq, PartialEq, Hash)]
412
413pub struct PipelineCacheKey {
414    /// Key format version. Bumped to invalidate every cache entry
415    /// without an API break.
416    pub version: u32,
417    /// blake3 hash of the canonical backend pipeline-source bytes.
418    pub shader_hash: [u8; 32],
419    /// Structural hash of the bind-group layout descriptors. Not the
420    /// backend handle; the bytes that describe slot count, types,
421    /// visibility, and access modes per bind group.
422    pub bind_group_layout_hash: [u8; 32],
423    /// Push-constant range in bytes. Included so a pipeline compiled
424    /// for 16 B push constants never reuses against a layout that
425    /// expects 32 B.
426    pub push_constant_size: u32,
427    /// Workgroup-size `[x, y, z]` the pipeline was specialized for.
428    pub workgroup_size: [u32; 3],
429    /// Feature-flag bits the pipeline assumes at dispatch time.
430    pub feature_flags: PipelineFeatureFlags,
431    /// Backend identity. Prevents pipelines from different backends from
432    /// colliding when they happen to produce identical shader hashes.
433    pub backend_id: BackendId,
434    /// Reserved private field so `PipelineCacheKey` cannot be
435    /// constructed by structural literal (forward-compatibility lever).
436    #[allow(dead_code)]
437    __phantom: core::marker::PhantomData<()>,
438}
439
440impl PipelineCacheKey {
441    /// Construct a key at the current version.
442    #[must_use]
443    #[allow(clippy::too_many_arguments)]
444    pub fn new(
445        shader_hash: [u8; 32],
446        bind_group_layout_hash: [u8; 32],
447        push_constant_size: u32,
448        workgroup_size: [u32; 3],
449        feature_flags: PipelineFeatureFlags,
450        backend_id: BackendId,
451    ) -> Self {
452        Self {
453            version: CURRENT_PIPELINE_CACHE_KEY_VERSION,
454            shader_hash,
455            bind_group_layout_hash,
456            push_constant_size,
457            workgroup_size,
458            feature_flags,
459            backend_id,
460            __phantom: core::marker::PhantomData,
461        }
462    }
463}
464
465/// Shared in-memory identity for a backend-compiled pipeline.
466///
467/// Backends may keep their own cache maps and compiled handles, but this object
468/// keeps the identity facts single-sourced: normalized Program digest, dispatch
469/// policy digest, device/runtime fingerprint, and the final digest used as the
470/// lookup key.
471#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
472pub struct PipelineCacheIdentity {
473    /// Final tuple-boundary-preserving lookup digest.
474    pub digest: [u8; 32],
475    /// Normalized Program digest used by backend pipeline caches.
476    pub program_digest: [u8; 32],
477    /// Dispatch policy fields that alter generated backend code.
478    pub policy_digest: [u8; 32],
479    /// Backend/device/runtime identity participating in the final digest.
480    pub device_fingerprint: PipelineDeviceFingerprint,
481}
482
483impl PipelineCacheIdentity {
484    /// Build identity from already-computed shared components.
485    #[must_use]
486    pub fn from_parts(
487        program_digest: [u8; 32],
488        policy_digest: [u8; 32],
489        device_fingerprint: PipelineDeviceFingerprint,
490    ) -> Self {
491        let mut hasher = blake3::Hasher::new();
492        hasher.update(b"vyre-pipeline-cache-identity-v1\0program\0");
493        hasher.update(&program_digest);
494        hasher.update(b"\0policy\0");
495        hasher.update(&policy_digest);
496        hasher.update(b"\0vendor\0");
497        hasher.update(&device_fingerprint.vendor.to_le_bytes());
498        hasher.update(b"\0device\0");
499        hasher.update(&device_fingerprint.device.to_le_bytes());
500        hasher.update(b"\0driver\0");
501        hasher.update(&device_fingerprint.driver_digest);
502        Self {
503            digest: *hasher.finalize().as_bytes(),
504            program_digest,
505            policy_digest,
506            device_fingerprint,
507        }
508    }
509
510    /// Build identity from a public Program and dispatch config.
511    ///
512    /// # Errors
513    ///
514    /// Returns when the Program cannot be serialized into stable cache
515    /// identity. Callers should surface the error through their backend error
516    /// type instead of hashing invalid IR lossy.
517    pub fn try_from_program(
518        program: &Program,
519        config: &DispatchConfig,
520        device_fingerprint: PipelineDeviceFingerprint,
521    ) -> Result<Self, String> {
522        let program_digest = try_normalized_program_cache_digest(program)?;
523        let policy_digest = dispatch_policy_cache_digest(config);
524        Ok(Self::from_parts(
525            program_digest,
526            policy_digest,
527            device_fingerprint,
528        ))
529    }
530}
531
532/// Evidence a backend can provide when a pipeline-cache lookup misses.
533///
534/// The classifier is intentionally backend-neutral: a concrete driver keeps its
535/// own fast cache key, but records enough adjacent identity facts to explain why
536/// a miss happened without duplicating per-backend reason logic.
537#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
538pub struct PipelineCacheMissEvidence {
539    /// Number of compiled pipeline entries present at lookup time.
540    pub total_entries: usize,
541    /// Entries whose normalized Program digest matched the requested program.
542    pub same_program_entries: usize,
543    /// Entries whose Program digest and dispatch policy digest both matched.
544    pub same_program_and_policy_entries: usize,
545    /// Entries whose Program digest, dispatch policy digest, and
546    /// device/runtime fingerprint all matched.
547    pub same_program_policy_and_device_entries: usize,
548}
549
550impl PipelineCacheMissEvidence {
551    /// Build miss evidence from cached identities adjacent to the requested key.
552    #[must_use]
553    pub fn from_identities<'a>(
554        cached: impl Iterator<Item = &'a PipelineCacheIdentity>,
555        requested: &PipelineCacheIdentity,
556    ) -> Self {
557        let mut evidence = Self::default();
558        for identity in cached {
559            evidence.total_entries += 1;
560            if identity.program_digest == requested.program_digest {
561                evidence.same_program_entries += 1;
562                if identity.policy_digest == requested.policy_digest {
563                    evidence.same_program_and_policy_entries += 1;
564                    if identity.device_fingerprint == requested.device_fingerprint {
565                        evidence.same_program_policy_and_device_entries += 1;
566                    }
567                }
568            }
569        }
570        evidence
571    }
572}
573
574/// Backend-neutral cache-miss reason for operator telemetry.
575#[derive(Copy, Clone, Debug, Eq, PartialEq)]
576pub enum PipelineCacheMissReason {
577    /// The backend had no compiled entries at lookup time.
578    EmptyCache,
579    /// Existing entries were for different normalized programs.
580    ProgramChanged,
581    /// The same program existed, but dispatch policy changed the generated
582    /// pipeline identity.
583    DispatchPolicyChanged,
584    /// Program and dispatch policy matched, but device/runtime identity changed.
585    DeviceOrRuntimeChanged,
586    /// The supplied evidence says adjacent identity matched but the final key
587    /// still missed; this catches future key fields and malformed bookkeeping.
588    KeyAbsent,
589}
590
591impl PipelineCacheMissReason {
592    /// Classify a miss from backend-supplied adjacent identity evidence.
593    #[must_use]
594    pub const fn classify(evidence: PipelineCacheMissEvidence) -> Self {
595        if evidence.total_entries == 0 {
596            Self::EmptyCache
597        } else if evidence.same_program_entries == 0 {
598            Self::ProgramChanged
599        } else if evidence.same_program_and_policy_entries == 0 {
600            Self::DispatchPolicyChanged
601        } else if evidence.same_program_policy_and_device_entries == 0 {
602            Self::DeviceOrRuntimeChanged
603        } else {
604            Self::KeyAbsent
605        }
606    }
607
608    /// Classify a miss directly from cached pipeline identities.
609    #[must_use]
610    pub fn classify_identities<'a>(
611        cached: impl Iterator<Item = &'a PipelineCacheIdentity>,
612        requested: &PipelineCacheIdentity,
613    ) -> Self {
614        Self::classify(PipelineCacheMissEvidence::from_identities(
615            cached, requested,
616        ))
617    }
618
619    /// Stable metric suffix for backend metric snapshots.
620    #[must_use]
621    pub const fn metric_suffix(self) -> &'static str {
622        match self {
623            Self::EmptyCache => "empty_cache",
624            Self::ProgramChanged => "program_changed",
625            Self::DispatchPolicyChanged => "dispatch_policy_changed",
626            Self::DeviceOrRuntimeChanged => "device_or_runtime_changed",
627            Self::KeyAbsent => "key_absent",
628        }
629    }
630}
631
#[cfg(test)]
mod pipeline_cache_key_tests {
    use super::*;

    /// 32-byte digest filled with `byte`.
    fn hash32(byte: u8) -> [u8; 32] {
        [byte; 32]
    }

    /// Key with fixed shader/layout hashes and no push constants, varying only
    /// the fields these tests compare.
    fn key(
        workgroup_size: [u32; 3],
        feature_flags: PipelineFeatureFlags,
        backend: &'static str,
    ) -> PipelineCacheKey {
        PipelineCacheKey::new(
            hash32(1),
            hash32(2),
            0,
            workgroup_size,
            feature_flags,
            BackendId::from(backend),
        )
    }

    /// Miss evidence from the four counters, broadest first.
    fn evidence(
        total: usize,
        program: usize,
        program_policy: usize,
        program_policy_device: usize,
    ) -> PipelineCacheMissEvidence {
        PipelineCacheMissEvidence {
            total_entries: total,
            same_program_entries: program,
            same_program_and_policy_entries: program_policy,
            same_program_policy_and_device_entries: program_policy_device,
        }
    }

    /// Unique scratch directory for disk-cache tests.
    fn scratch_cache_root(label: &str) -> std::path::PathBuf {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_nanos());
        std::env::temp_dir().join(format!(
            "vyre-disk-pipeline-cache-{label}-{}-{nanos}",
            std::process::id()
        ))
    }

    #[test]
    fn identical_inputs_produce_equal_keys() {
        assert_eq!(
            key([64, 1, 1], PipelineFeatureFlags::empty(), "backend-a"),
            key([64, 1, 1], PipelineFeatureFlags::empty(), "backend-a"),
        );
    }

    #[test]
    fn different_workgroup_size_differs() {
        assert_ne!(
            key([64, 1, 1], PipelineFeatureFlags::empty(), "backend-a"),
            key([128, 1, 1], PipelineFeatureFlags::empty(), "backend-a"),
        );
    }

    #[test]
    fn different_feature_flags_differ() {
        assert_ne!(
            key([1, 1, 1], PipelineFeatureFlags::empty(), "backend-a"),
            key([1, 1, 1], PipelineFeatureFlags::SUBGROUP_OPS, "backend-a"),
        );
    }

    #[test]
    fn different_backend_id_differs() {
        assert_ne!(
            key([1, 1, 1], PipelineFeatureFlags::empty(), "backend-a"),
            key([1, 1, 1], PipelineFeatureFlags::empty(), "backend-b"),
        );
    }

    #[test]
    fn flag_containment_is_correct() {
        let a = PipelineFeatureFlags::SUBGROUP_OPS.union(PipelineFeatureFlags::F16);
        assert!(a.contains(PipelineFeatureFlags::SUBGROUP_OPS));
        assert!(a.contains(PipelineFeatureFlags::F16));
        assert!(!a.contains(PipelineFeatureFlags::TENSOR_CORES));
    }

    #[test]
    fn feature_flag_constants_are_distinct_single_bits() {
        let all = [
            PipelineFeatureFlags::SUBGROUP_OPS,
            PipelineFeatureFlags::F16,
            PipelineFeatureFlags::BF16,
            PipelineFeatureFlags::TENSOR_CORES,
            PipelineFeatureFlags::ASYNC_COMPUTE,
            PipelineFeatureFlags::PUSH_CONSTANTS,
            PipelineFeatureFlags::INDIRECT_DISPATCH,
            PipelineFeatureFlags::SPECULATIVE,
            PipelineFeatureFlags::PERSISTENT_THREAD,
        ];
        let mut seen = 0u32;
        for flag in all {
            assert_eq!(flag.bits().count_ones(), 1, "{flag:?}");
            assert_eq!(seen & flag.bits(), 0, "{flag:?} reuses an allocated bit");
            seen |= flag.bits();
        }
        assert_eq!(seen, 0x1ff);
    }

    #[test]
    fn version_is_current() {
        let k = key([1, 1, 1], PipelineFeatureFlags::empty(), "backend-a");
        assert_eq!(k.version, CURRENT_PIPELINE_CACHE_KEY_VERSION);
    }

    #[test]
    fn shared_cache_identity_separates_program_policy_and_device_facts() {
        let program_a = hash32(1);
        let program_b = hash32(2);
        let policy_a = hash32(3);
        let policy_b = hash32(4);
        let device_a = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-a");
        let device_b = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-b");

        let base = PipelineCacheIdentity::from_parts(program_a, policy_a, device_a);

        assert_eq!(base.program_digest, program_a);
        assert_eq!(base.policy_digest, policy_a);
        assert_eq!(base.device_fingerprint, device_a);
        assert_ne!(
            base.digest,
            PipelineCacheIdentity::from_parts(program_b, policy_a, device_a).digest,
            "Fix: shared pipeline cache identity must include the normalized Program digest."
        );
        assert_ne!(
            base.digest,
            PipelineCacheIdentity::from_parts(program_a, policy_b, device_a).digest,
            "Fix: shared pipeline cache identity must include dispatch policy as its own tuple field."
        );
        assert_ne!(
            base.digest,
            PipelineCacheIdentity::from_parts(program_a, policy_a, device_b).digest,
            "Fix: shared pipeline cache identity must include device/runtime fingerprint facts."
        );
    }

    #[test]
    fn miss_reason_classifies_adjacent_identity_evidence() {
        assert_eq!(
            PipelineCacheMissReason::classify(evidence(0, 0, 0, 0)),
            PipelineCacheMissReason::EmptyCache
        );
        assert_eq!(
            PipelineCacheMissReason::classify(evidence(3, 0, 0, 0)),
            PipelineCacheMissReason::ProgramChanged
        );
        assert_eq!(
            PipelineCacheMissReason::classify(evidence(3, 2, 0, 0)),
            PipelineCacheMissReason::DispatchPolicyChanged
        );
        assert_eq!(
            PipelineCacheMissReason::classify(evidence(3, 2, 1, 0)),
            PipelineCacheMissReason::DeviceOrRuntimeChanged
        );
        assert_eq!(
            PipelineCacheMissReason::classify(evidence(3, 2, 1, 1)),
            PipelineCacheMissReason::KeyAbsent
        );
    }

    #[test]
    fn miss_reason_metric_suffixes_are_stable_snake_case() {
        let expected = [
            (PipelineCacheMissReason::EmptyCache, "empty_cache"),
            (PipelineCacheMissReason::ProgramChanged, "program_changed"),
            (
                PipelineCacheMissReason::DispatchPolicyChanged,
                "dispatch_policy_changed",
            ),
            (
                PipelineCacheMissReason::DeviceOrRuntimeChanged,
                "device_or_runtime_changed",
            ),
            (PipelineCacheMissReason::KeyAbsent, "key_absent"),
        ];
        for (reason, suffix) in expected {
            assert_eq!(reason.metric_suffix(), suffix, "{reason:?}");
        }
    }

    #[test]
    fn miss_reason_classifies_cached_shared_identities() {
        let program = hash32(1);
        let other_program = hash32(2);
        let policy = hash32(3);
        let other_policy = hash32(4);
        let device = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-a");
        let other_device = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-b");
        let requested = PipelineCacheIdentity::from_parts(program, policy, device);

        assert_eq!(
            PipelineCacheMissReason::classify_identities([].iter(), &requested),
            PipelineCacheMissReason::EmptyCache
        );
        let cases = [
            (
                PipelineCacheIdentity::from_parts(other_program, policy, device),
                PipelineCacheMissReason::ProgramChanged,
            ),
            (
                PipelineCacheIdentity::from_parts(program, other_policy, device),
                PipelineCacheMissReason::DispatchPolicyChanged,
            ),
            (
                PipelineCacheIdentity::from_parts(program, policy, other_device),
                PipelineCacheMissReason::DeviceOrRuntimeChanged,
            ),
            (
                PipelineCacheIdentity::from_parts(program, policy, device),
                PipelineCacheMissReason::KeyAbsent,
            ),
        ];
        for (cached, expected) in cases {
            assert_eq!(
                PipelineCacheMissReason::classify_identities([cached].iter(), &requested),
                expected
            );
        }
    }

    #[test]
    fn disk_cache_write_read_and_invalidate_round_trip() {
        let root = scratch_cache_root("round-trip");
        let cache = DiskPipelineCache::open(root.clone())
            .expect("Fix: scratch pipeline cache root must be creatable");
        let device = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-a");
        let kept = hash32(7);
        let dropped = hash32(8);

        assert_eq!(
            cache.read(kept, device).expect("Fix: a cache miss must not error"),
            None
        );
        cache
            .write(kept, device, b"kept-blob")
            .expect("Fix: writing a small blob must succeed");
        cache
            .write(dropped, device, b"dropped-blob")
            .expect("Fix: writing a small blob must succeed");
        assert!(cache.path_for(kept, device).starts_with(cache.root()));
        assert_eq!(
            cache.read(kept, device).expect("Fix: written entry must be readable"),
            Some(b"kept-blob".to_vec())
        );

        cache
            .invalidate_impacted(&[0, 1], &[kept, dropped], device)
            .expect("Fix: invalidating existing entries must succeed");
        assert_eq!(
            cache.read(kept, device).expect("Fix: unimpacted entry must stay readable"),
            Some(b"kept-blob".to_vec())
        );
        assert_eq!(
            cache.read(dropped, device).expect("Fix: invalidated entry must read as a miss"),
            None
        );

        let _ = std::fs::remove_dir_all(&root);
    }

    #[test]
    fn poisoned_pending_flush_lock_returns_structured_error() {
        let cache = Arc::new(DiskPipelineCache {
            root: std::env::temp_dir(),
            pending_flushes: std::sync::Mutex::new(Vec::new()),
        });
        let poisoned = Arc::clone(&cache);
        let _ = std::thread::spawn(move || {
            let _guard = poisoned
                .try_lock_pending_flushes()
                .expect("Fix: first pending-flush lock acquisition should succeed");
            panic!("poison disk pipeline cache pending flushes");
        })
        .join();

        let error = cache
            .flush()
            .expect_err("poisoned disk pipeline cache must return an io error");
        let message = error.to_string();
        assert!(
            message.contains("pending-flush lock was poisoned"),
            "{message}"
        );
    }
}