1use super::hashing::{
4 dispatch_policy_cache_digest, dispatch_policy_cache_string, hex_encode,
5 normalized_program_cache_digest, try_normalized_program_cache_digest,
6 PipelineDeviceFingerprint,
7};
8use super::CURRENT_PIPELINE_CACHE_KEY_VERSION;
9use crate::backend::DispatchConfig;
10use std::sync::{Arc, MutexGuard};
11use vyre_foundation::ir::Program;
12use vyre_spec::BackendId;
13
14pub const MAX_DISK_PIPELINE_BLOB_BYTES: u64 = 64 * 1024 * 1024;
16
17pub struct DiskPipelineCache {
19 root: std::path::PathBuf,
20 pending_flushes: std::sync::Mutex<Vec<std::path::PathBuf>>,
21}
22
23impl DiskPipelineCache {
24 fn try_lock_pending_flushes(&self) -> std::io::Result<MutexGuard<'_, Vec<std::path::PathBuf>>> {
25 self.pending_flushes.lock().map_err(|error| {
26 std::io::Error::other(format!(
27 "Vyre disk pipeline cache pending-flush lock was poisoned: {error}. Fix: discard this cache instance after a panic; continuing could lose or duplicate compiled-pipeline fsync work."
28 ))
29 })
30 }
31
32 pub fn open(root: impl Into<std::path::PathBuf>) -> std::io::Result<Self> {
38 let root = root.into();
39 std::fs::create_dir_all(&root)?;
40 Ok(Self {
41 root,
42 pending_flushes: std::sync::Mutex::new(Vec::new()),
43 })
44 }
45
46 #[must_use]
48 pub fn default_root() -> std::path::PathBuf {
49 if let Some(xdg) = std::env::var_os("XDG_CACHE_HOME") {
50 return std::path::PathBuf::from(xdg).join("vyre").join("pipelines");
51 }
52 if let Some(home) = std::env::var_os("HOME") {
53 #[cfg(target_os = "macos")]
54 {
55 return std::path::PathBuf::from(home)
56 .join("Library")
57 .join("Caches")
58 .join("vyre")
59 .join("pipelines");
60 }
61 #[cfg(not(target_os = "macos"))]
62 {
63 return std::path::PathBuf::from(home)
64 .join(".cache")
65 .join("vyre")
66 .join("pipelines");
67 }
68 }
69 if let Some(appdata) = std::env::var_os("LOCALAPPDATA") {
70 return std::path::PathBuf::from(appdata)
71 .join("vyre")
72 .join("pipelines");
73 }
74 std::path::PathBuf::from("./vyre-cache/pipelines")
75 }
76
77 #[must_use]
79 pub fn path_for(
80 &self,
81 program_digest: [u8; 32],
82 fingerprint: PipelineDeviceFingerprint,
83 ) -> std::path::PathBuf {
84 let key = fingerprint.cache_key(program_digest);
85 let mut file_name = hex_encode(&key);
86 let mut path = self.root.join(&file_name[..2]);
87 file_name.push_str(".bin");
88 path.push(file_name);
89 path
90 }
91
92 pub fn read(
98 &self,
99 program_digest: [u8; 32],
100 fingerprint: PipelineDeviceFingerprint,
101 ) -> std::io::Result<Option<Vec<u8>>> {
102 let path = self.path_for(program_digest, fingerprint);
103 match read_bounded(&path, MAX_DISK_PIPELINE_BLOB_BYTES) {
104 Ok(bytes) => Ok(Some(bytes)),
105 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
106 Err(error) => Err(error),
107 }
108 }
109
110 pub fn write(
116 &self,
117 program_digest: [u8; 32],
118 fingerprint: PipelineDeviceFingerprint,
119 bytes: &[u8],
120 ) -> std::io::Result<()> {
121 if bytes.len() as u64 > MAX_DISK_PIPELINE_BLOB_BYTES {
122 return Err(std::io::Error::new(
123 std::io::ErrorKind::InvalidInput,
124 format!("pipeline cache blob exceeds {MAX_DISK_PIPELINE_BLOB_BYTES} byte limit"),
125 ));
126 }
127 let path = self.path_for(program_digest, fingerprint);
128 if let Some(parent) = path.parent() {
129 std::fs::create_dir_all(parent)?;
130 }
131 let tmp = self.tmp_path_for(&path);
132 let write_result = (|| -> std::io::Result<()> {
133 let mut file = std::fs::File::create(&tmp)?;
134 use std::io::Write as _;
135 file.write_all(bytes)?;
136 drop(file);
137 std::fs::rename(&tmp, &path)
138 })();
139 if write_result.is_err() {
140 remove_failed_atomic_write(&tmp)?;
141 }
142 write_result?;
143 self.try_lock_pending_flushes()?.push(path);
144 Ok(())
145 }
146
147 pub fn flush(&self) -> std::io::Result<()> {
153 let paths = {
154 let mut pending = self.try_lock_pending_flushes()?;
155 pending.sort();
156 pending.dedup();
157 std::mem::take(&mut *pending)
158 };
159 if let Err(error) = flush_paths(&paths) {
160 self.try_lock_pending_flushes()?.extend(paths);
161 return Err(error);
162 }
163 Ok(())
164 }
165
166 pub fn invalidate_impacted(
172 &self,
173 impact_mask: &[u32],
174 program_digests: &[[u8; 32]],
175 fingerprint: PipelineDeviceFingerprint,
176 ) -> std::io::Result<()> {
177 for (index, &is_impacted) in impact_mask.iter().enumerate() {
178 if is_impacted != 0 {
179 if let Some(&digest) = program_digests.get(index) {
180 let path = self.path_for(digest, fingerprint);
181 if path.exists() {
182 std::fs::remove_file(path)?;
183 }
184 }
185 }
186 }
187 Ok(())
188 }
189
190 #[must_use]
192 pub fn root(&self) -> &std::path::Path {
193 &self.root
194 }
195
196 fn tmp_path_for(&self, path: &std::path::Path) -> std::path::PathBuf {
197 static TMP_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
198 let tmp_id = TMP_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
199 path.with_extension(format!("bin.tmp.{}.{}", std::process::id(), tmp_id))
200 }
201}
202
203fn remove_failed_atomic_write(path: &std::path::Path) -> std::io::Result<()> {
204 match std::fs::remove_file(path) {
205 Ok(()) => Ok(()),
206 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
207 Err(error) => Err(error),
208 }
209}
210
211fn read_bounded(path: &std::path::Path, max_bytes: u64) -> std::io::Result<Vec<u8>> {
212 use std::io::Read as _;
213
214 let mut file = std::fs::File::open(path)?;
215 let metadata = file.metadata()?;
216 if metadata.len() > max_bytes {
217 return Err(std::io::Error::new(
218 std::io::ErrorKind::InvalidData,
219 format!("pipeline cache blob exceeds {max_bytes} byte limit"),
220 ));
221 }
222 let byte_capacity = usize::try_from(metadata.len()).map_err(|error| {
223 std::io::Error::new(
224 std::io::ErrorKind::InvalidData,
225 format!(
226 "pipeline cache blob length {} does not fit usize: {error}",
227 metadata.len()
228 ),
229 )
230 })?;
231 let mut bytes = Vec::new();
232 crate::allocation::try_reserve_vec_to_capacity(&mut bytes, byte_capacity).map_err(|error| {
233 std::io::Error::new(
234 std::io::ErrorKind::OutOfMemory,
235 format!(
236 "pipeline cache bounded read could not reserve {byte_capacity} byte(s): {error}. Fix: lower the pipeline cache blob limit or evict oversized entries."
237 ),
238 )
239 })?;
240 file.by_ref().take(max_bytes + 1).read_to_end(&mut bytes)?;
241 if bytes.len() as u64 > max_bytes {
242 return Err(std::io::Error::new(
243 std::io::ErrorKind::InvalidData,
244 format!("pipeline cache blob exceeded {max_bytes} byte bounded read limit"),
245 ));
246 }
247 Ok(bytes)
248}
249
250fn flush_paths(paths: &[std::path::PathBuf]) -> std::io::Result<()> {
251 let mut parents = Vec::new();
252 crate::allocation::try_reserve_vec_to_capacity(&mut parents, paths.len()).map_err(|error| {
253 std::io::Error::new(
254 std::io::ErrorKind::OutOfMemory,
255 format!(
256 "pipeline cache flush could not reserve {} parent path slot(s): {error}. Fix: flush fewer cache paths per batch.",
257 paths.len()
258 ),
259 )
260 })?;
261 sync_files_bounded(
262 paths,
263 std::fs::File::sync_data,
264 "disk cache file sync worker panicked",
265 )?;
266 for path in paths {
267 if let Some(parent) = path.parent() {
268 parents.push(parent.to_path_buf());
269 }
270 }
271 parents.sort();
272 parents.dedup();
273 sync_parent_dirs(&parents)?;
274 Ok(())
275}
276
277#[cfg(unix)]
278fn sync_parent_dirs(parents: &[std::path::PathBuf]) -> std::io::Result<()> {
279 sync_files_bounded(
280 parents,
281 std::fs::File::sync_all,
282 "disk cache dir sync worker panicked",
283 )
284}
285
286#[cfg(not(unix))]
287fn sync_parent_dirs(_parents: &[std::path::PathBuf]) -> std::io::Result<()> {
288 Ok(())
289}
290
291fn sync_files_bounded(
292 paths: &[std::path::PathBuf],
293 sync: fn(&std::fs::File) -> std::io::Result<()>,
294 panic_message: &'static str,
295) -> std::io::Result<()> {
296 if paths.is_empty() {
297 return Ok(());
298 }
299 let workers = std::thread::available_parallelism()
300 .map(usize::from)
301 .unwrap_or(1)
302 .clamp(1, 16);
303 for chunk in paths.chunks(workers) {
304 std::thread::scope(|scope| {
305 let mut handles = Vec::new();
306 crate::allocation::try_reserve_vec_to_capacity(&mut handles, chunk.len()).map_err(|error| {
307 std::io::Error::new(
308 std::io::ErrorKind::OutOfMemory,
309 format!(
310 "pipeline cache sync could not reserve {} worker handle(s): {error}. Fix: lower pipeline cache sync fan-out.",
311 chunk.len()
312 ),
313 )
314 })?;
315 for path in chunk {
316 handles.push(scope.spawn(move || {
317 let file = std::fs::File::open(path)?;
318 sync(&file)
319 }));
320 }
321 for handle in handles {
322 handle
323 .join()
324 .map_err(|_| std::io::Error::other(panic_message))??;
325 }
326 Ok::<(), std::io::Error>(())
327 })?;
328 }
329 Ok(())
330}
331
332#[derive(
343 Copy, Clone, Debug, Default, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize,
344)]
345pub struct PipelineFeatureFlags(pub u32);
346
347impl PipelineFeatureFlags {
348 pub const SUBGROUP_OPS: Self = Self(1 << 0);
351 pub const F16: Self = Self(1 << 1);
353 pub const BF16: Self = Self(1 << 2);
355 pub const TENSOR_CORES: Self = Self(1 << 3);
358 pub const ASYNC_COMPUTE: Self = Self(1 << 4);
360 pub const PUSH_CONSTANTS: Self = Self(1 << 5);
362 pub const INDIRECT_DISPATCH: Self = Self(1 << 6);
364 pub const SPECULATIVE: Self = Self(1 << 7);
367 pub const PERSISTENT_THREAD: Self = Self(1 << 8);
370
371 #[must_use]
373 pub const fn empty() -> Self {
374 Self(0)
375 }
376
377 #[must_use]
379 pub const fn contains(self, other: Self) -> bool {
380 (self.0 & other.0) == other.0
381 }
382
383 #[must_use]
385 pub const fn union(self, other: Self) -> Self {
386 Self(self.0 | other.0)
387 }
388
389 #[must_use]
391 pub const fn bits(self) -> u32 {
392 self.0
393 }
394}
395
396#[derive(Clone, Debug, Eq, PartialEq, Hash)]
412
413pub struct PipelineCacheKey {
414 pub version: u32,
417 pub shader_hash: [u8; 32],
419 pub bind_group_layout_hash: [u8; 32],
423 pub push_constant_size: u32,
427 pub workgroup_size: [u32; 3],
429 pub feature_flags: PipelineFeatureFlags,
431 pub backend_id: BackendId,
434 #[allow(dead_code)]
437 __phantom: core::marker::PhantomData<()>,
438}
439
440impl PipelineCacheKey {
441 #[must_use]
443 #[allow(clippy::too_many_arguments)]
444 pub fn new(
445 shader_hash: [u8; 32],
446 bind_group_layout_hash: [u8; 32],
447 push_constant_size: u32,
448 workgroup_size: [u32; 3],
449 feature_flags: PipelineFeatureFlags,
450 backend_id: BackendId,
451 ) -> Self {
452 Self {
453 version: CURRENT_PIPELINE_CACHE_KEY_VERSION,
454 shader_hash,
455 bind_group_layout_hash,
456 push_constant_size,
457 workgroup_size,
458 feature_flags,
459 backend_id,
460 __phantom: core::marker::PhantomData,
461 }
462 }
463}
464
465#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
472pub struct PipelineCacheIdentity {
473 pub digest: [u8; 32],
475 pub program_digest: [u8; 32],
477 pub policy_digest: [u8; 32],
479 pub device_fingerprint: PipelineDeviceFingerprint,
481}
482
483impl PipelineCacheIdentity {
484 #[must_use]
486 pub fn from_parts(
487 program_digest: [u8; 32],
488 policy_digest: [u8; 32],
489 device_fingerprint: PipelineDeviceFingerprint,
490 ) -> Self {
491 let mut hasher = blake3::Hasher::new();
492 hasher.update(b"vyre-pipeline-cache-identity-v1\0program\0");
493 hasher.update(&program_digest);
494 hasher.update(b"\0policy\0");
495 hasher.update(&policy_digest);
496 hasher.update(b"\0vendor\0");
497 hasher.update(&device_fingerprint.vendor.to_le_bytes());
498 hasher.update(b"\0device\0");
499 hasher.update(&device_fingerprint.device.to_le_bytes());
500 hasher.update(b"\0driver\0");
501 hasher.update(&device_fingerprint.driver_digest);
502 Self {
503 digest: *hasher.finalize().as_bytes(),
504 program_digest,
505 policy_digest,
506 device_fingerprint,
507 }
508 }
509
510 pub fn try_from_program(
518 program: &Program,
519 config: &DispatchConfig,
520 device_fingerprint: PipelineDeviceFingerprint,
521 ) -> Result<Self, String> {
522 let program_digest = try_normalized_program_cache_digest(program)?;
523 let policy_digest = dispatch_policy_cache_digest(config);
524 Ok(Self::from_parts(
525 program_digest,
526 policy_digest,
527 device_fingerprint,
528 ))
529 }
530}
531
532#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
538pub struct PipelineCacheMissEvidence {
539 pub total_entries: usize,
541 pub same_program_entries: usize,
543 pub same_program_and_policy_entries: usize,
545 pub same_program_policy_and_device_entries: usize,
548}
549
550impl PipelineCacheMissEvidence {
551 #[must_use]
553 pub fn from_identities<'a>(
554 cached: impl Iterator<Item = &'a PipelineCacheIdentity>,
555 requested: &PipelineCacheIdentity,
556 ) -> Self {
557 let mut evidence = Self::default();
558 for identity in cached {
559 evidence.total_entries += 1;
560 if identity.program_digest == requested.program_digest {
561 evidence.same_program_entries += 1;
562 if identity.policy_digest == requested.policy_digest {
563 evidence.same_program_and_policy_entries += 1;
564 if identity.device_fingerprint == requested.device_fingerprint {
565 evidence.same_program_policy_and_device_entries += 1;
566 }
567 }
568 }
569 }
570 evidence
571 }
572}
573
574#[derive(Copy, Clone, Debug, Eq, PartialEq)]
576pub enum PipelineCacheMissReason {
577 EmptyCache,
579 ProgramChanged,
581 DispatchPolicyChanged,
584 DeviceOrRuntimeChanged,
586 KeyAbsent,
589}
590
591impl PipelineCacheMissReason {
592 #[must_use]
594 pub const fn classify(evidence: PipelineCacheMissEvidence) -> Self {
595 if evidence.total_entries == 0 {
596 Self::EmptyCache
597 } else if evidence.same_program_entries == 0 {
598 Self::ProgramChanged
599 } else if evidence.same_program_and_policy_entries == 0 {
600 Self::DispatchPolicyChanged
601 } else if evidence.same_program_policy_and_device_entries == 0 {
602 Self::DeviceOrRuntimeChanged
603 } else {
604 Self::KeyAbsent
605 }
606 }
607
608 #[must_use]
610 pub fn classify_identities<'a>(
611 cached: impl Iterator<Item = &'a PipelineCacheIdentity>,
612 requested: &PipelineCacheIdentity,
613 ) -> Self {
614 Self::classify(PipelineCacheMissEvidence::from_identities(
615 cached, requested,
616 ))
617 }
618
619 #[must_use]
621 pub const fn metric_suffix(self) -> &'static str {
622 match self {
623 Self::EmptyCache => "empty_cache",
624 Self::ProgramChanged => "program_changed",
625 Self::DispatchPolicyChanged => "dispatch_policy_changed",
626 Self::DeviceOrRuntimeChanged => "device_or_runtime_changed",
627 Self::KeyAbsent => "key_absent",
628 }
629 }
630}
631
632#[cfg(test)]
633
634mod pipeline_cache_key_tests {
635 use super::*;
636
637 fn hash32(byte: u8) -> [u8; 32] {
638 [byte; 32]
639 }
640
641 #[test]
642 fn different_workgroup_size_differs() {
643 let a = PipelineCacheKey::new(
644 hash32(1),
645 hash32(2),
646 0,
647 [64, 1, 1],
648 PipelineFeatureFlags::empty(),
649 BackendId::from("backend-a"),
650 );
651 let b = PipelineCacheKey::new(
652 hash32(1),
653 hash32(2),
654 0,
655 [128, 1, 1],
656 PipelineFeatureFlags::empty(),
657 BackendId::from("backend-a"),
658 );
659 assert_ne!(a, b);
660 }
661
662 #[test]
663 fn different_feature_flags_differ() {
664 let a = PipelineCacheKey::new(
665 hash32(1),
666 hash32(2),
667 0,
668 [1, 1, 1],
669 PipelineFeatureFlags::empty(),
670 BackendId::from("backend-a"),
671 );
672 let b = PipelineCacheKey::new(
673 hash32(1),
674 hash32(2),
675 0,
676 [1, 1, 1],
677 PipelineFeatureFlags::SUBGROUP_OPS,
678 BackendId::from("backend-a"),
679 );
680 assert_ne!(a, b);
681 }
682
683 #[test]
684 fn different_backend_id_differs() {
685 let a = PipelineCacheKey::new(
686 hash32(1),
687 hash32(2),
688 0,
689 [1, 1, 1],
690 PipelineFeatureFlags::empty(),
691 BackendId::from("backend-a"),
692 );
693 let b = PipelineCacheKey::new(
694 hash32(1),
695 hash32(2),
696 0,
697 [1, 1, 1],
698 PipelineFeatureFlags::empty(),
699 BackendId::from("backend-b"),
700 );
701 assert_ne!(a, b);
702 }
703
704 #[test]
705 fn flag_containment_is_correct() {
706 let a = PipelineFeatureFlags::SUBGROUP_OPS.union(PipelineFeatureFlags::F16);
707 assert!(a.contains(PipelineFeatureFlags::SUBGROUP_OPS));
708 assert!(a.contains(PipelineFeatureFlags::F16));
709 assert!(!a.contains(PipelineFeatureFlags::TENSOR_CORES));
710 }
711
712 #[test]
713 fn version_is_current() {
714 let k = PipelineCacheKey::new(
715 hash32(1),
716 hash32(2),
717 0,
718 [1, 1, 1],
719 PipelineFeatureFlags::empty(),
720 BackendId::from("backend-a"),
721 );
722 assert_eq!(k.version, CURRENT_PIPELINE_CACHE_KEY_VERSION);
723 }
724
725 #[test]
726 fn shared_cache_identity_separates_program_policy_and_device_facts() {
727 let program_a = hash32(1);
728 let program_b = hash32(2);
729 let policy_a = hash32(3);
730 let policy_b = hash32(4);
731 let device_a = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-a");
732 let device_b = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-b");
733
734 let base = PipelineCacheIdentity::from_parts(program_a, policy_a, device_a);
735
736 assert_eq!(base.program_digest, program_a);
737 assert_eq!(base.policy_digest, policy_a);
738 assert_eq!(base.device_fingerprint, device_a);
739 assert_ne!(
740 base.digest,
741 PipelineCacheIdentity::from_parts(program_b, policy_a, device_a).digest,
742 "Fix: shared pipeline cache identity must include the normalized Program digest."
743 );
744 assert_ne!(
745 base.digest,
746 PipelineCacheIdentity::from_parts(program_a, policy_b, device_a).digest,
747 "Fix: shared pipeline cache identity must include dispatch policy as its own tuple field."
748 );
749 assert_ne!(
750 base.digest,
751 PipelineCacheIdentity::from_parts(program_a, policy_a, device_b).digest,
752 "Fix: shared pipeline cache identity must include device/runtime fingerprint facts."
753 );
754 }
755
756 #[test]
757 fn miss_reason_classifies_adjacent_identity_evidence() {
758 assert_eq!(
759 PipelineCacheMissReason::classify(PipelineCacheMissEvidence {
760 total_entries: 0,
761 same_program_entries: 0,
762 same_program_and_policy_entries: 0,
763 same_program_policy_and_device_entries: 0,
764 }),
765 PipelineCacheMissReason::EmptyCache
766 );
767 assert_eq!(
768 PipelineCacheMissReason::classify(PipelineCacheMissEvidence {
769 total_entries: 3,
770 same_program_entries: 0,
771 same_program_and_policy_entries: 0,
772 same_program_policy_and_device_entries: 0,
773 }),
774 PipelineCacheMissReason::ProgramChanged
775 );
776 assert_eq!(
777 PipelineCacheMissReason::classify(PipelineCacheMissEvidence {
778 total_entries: 3,
779 same_program_entries: 2,
780 same_program_and_policy_entries: 0,
781 same_program_policy_and_device_entries: 0,
782 }),
783 PipelineCacheMissReason::DispatchPolicyChanged
784 );
785 assert_eq!(
786 PipelineCacheMissReason::classify(PipelineCacheMissEvidence {
787 total_entries: 3,
788 same_program_entries: 2,
789 same_program_and_policy_entries: 1,
790 same_program_policy_and_device_entries: 0,
791 }),
792 PipelineCacheMissReason::DeviceOrRuntimeChanged
793 );
794 assert_eq!(
795 PipelineCacheMissReason::classify(PipelineCacheMissEvidence {
796 total_entries: 3,
797 same_program_entries: 2,
798 same_program_and_policy_entries: 1,
799 same_program_policy_and_device_entries: 1,
800 }),
801 PipelineCacheMissReason::KeyAbsent
802 );
803 }
804
805 #[test]
806 fn miss_reason_metric_suffixes_are_stable_snake_case() {
807 assert_eq!(
808 PipelineCacheMissReason::EmptyCache.metric_suffix(),
809 "empty_cache"
810 );
811 assert_eq!(
812 PipelineCacheMissReason::ProgramChanged.metric_suffix(),
813 "program_changed"
814 );
815 assert_eq!(
816 PipelineCacheMissReason::DispatchPolicyChanged.metric_suffix(),
817 "dispatch_policy_changed"
818 );
819 assert_eq!(
820 PipelineCacheMissReason::DeviceOrRuntimeChanged.metric_suffix(),
821 "device_or_runtime_changed"
822 );
823 assert_eq!(
824 PipelineCacheMissReason::KeyAbsent.metric_suffix(),
825 "key_absent"
826 );
827 }
828
829 #[test]
830 fn miss_reason_classifies_cached_shared_identities() {
831 let program = hash32(1);
832 let other_program = hash32(2);
833 let policy = hash32(3);
834 let other_policy = hash32(4);
835 let device = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-a");
836 let other_device = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-b");
837 let requested = PipelineCacheIdentity::from_parts(program, policy, device);
838
839 assert_eq!(
840 PipelineCacheMissReason::classify_identities([].iter(), &requested),
841 PipelineCacheMissReason::EmptyCache
842 );
843 assert_eq!(
844 PipelineCacheMissReason::classify_identities(
845 [PipelineCacheIdentity::from_parts(
846 other_program,
847 policy,
848 device
849 )]
850 .iter(),
851 &requested,
852 ),
853 PipelineCacheMissReason::ProgramChanged
854 );
855 assert_eq!(
856 PipelineCacheMissReason::classify_identities(
857 [PipelineCacheIdentity::from_parts(
858 program,
859 other_policy,
860 device
861 )]
862 .iter(),
863 &requested,
864 ),
865 PipelineCacheMissReason::DispatchPolicyChanged
866 );
867 assert_eq!(
868 PipelineCacheMissReason::classify_identities(
869 [PipelineCacheIdentity::from_parts(
870 program,
871 policy,
872 other_device
873 )]
874 .iter(),
875 &requested,
876 ),
877 PipelineCacheMissReason::DeviceOrRuntimeChanged
878 );
879 assert_eq!(
880 PipelineCacheMissReason::classify_identities(
881 [PipelineCacheIdentity::from_parts(program, policy, device)].iter(),
882 &requested,
883 ),
884 PipelineCacheMissReason::KeyAbsent
885 );
886 }
887
888 #[test]
889 fn poisoned_pending_flush_lock_returns_structured_error() {
890 let cache = Arc::new(DiskPipelineCache {
891 root: std::env::temp_dir(),
892 pending_flushes: std::sync::Mutex::new(Vec::new()),
893 });
894 let poisoned = Arc::clone(&cache);
895 let _ = std::thread::spawn(move || {
896 let _guard = poisoned
897 .try_lock_pending_flushes()
898 .expect("Fix: first pending-flush lock acquisition should succeed");
899 panic!("poison disk pipeline cache pending flushes");
900 })
901 .join();
902
903 let error = cache
904 .flush()
905 .expect_err("poisoned disk pipeline cache must return an io error");
906 let message = error.to_string();
907 assert!(
908 message.contains("pending-flush lock was poisoned"),
909 "{message}"
910 );
911 }
912}