1use super::hashing::{
4 dispatch_policy_cache_digest, dispatch_policy_cache_string, hex_encode,
5 normalized_program_cache_digest, try_normalized_program_cache_digest, PipelineDeviceFingerprint,
6};
7use super::CURRENT_PIPELINE_CACHE_KEY_VERSION;
8use crate::backend::DispatchConfig;
9use std::sync::{Arc, MutexGuard};
10use vyre_foundation::ir::Program;
11use vyre_spec::BackendId;
12
13pub const MAX_DISK_PIPELINE_BLOB_BYTES: u64 = 64 * 1024 * 1024;
15
16pub struct DiskPipelineCache {
18 root: std::path::PathBuf,
19 pending_flushes: std::sync::Mutex<Vec<std::path::PathBuf>>,
20}
21
22impl DiskPipelineCache {
23 fn lock_pending_flushes(&self) -> MutexGuard<'_, Vec<std::path::PathBuf>> {
24 self.pending_flushes.lock().unwrap_or_else(|error| {
25 panic!(
26 "Vyre disk pipeline cache pending-flush lock was poisoned: {error}. Fix: discard this cache instance after a panic; continuing could lose or duplicate compiled-pipeline fsync work."
27 )
28 })
29 }
30
31 pub fn open(root: impl Into<std::path::PathBuf>) -> std::io::Result<Self> {
37 let root = root.into();
38 std::fs::create_dir_all(&root)?;
39 Ok(Self {
40 root,
41 pending_flushes: std::sync::Mutex::new(Vec::new()),
42 })
43 }
44
45 #[must_use]
47 pub fn default_root() -> std::path::PathBuf {
48 if let Some(xdg) = std::env::var_os("XDG_CACHE_HOME") {
49 return std::path::PathBuf::from(xdg).join("vyre").join("pipelines");
50 }
51 if let Some(home) = std::env::var_os("HOME") {
52 #[cfg(target_os = "macos")]
53 {
54 return std::path::PathBuf::from(home)
55 .join("Library")
56 .join("Caches")
57 .join("vyre")
58 .join("pipelines");
59 }
60 #[cfg(not(target_os = "macos"))]
61 {
62 return std::path::PathBuf::from(home)
63 .join(".cache")
64 .join("vyre")
65 .join("pipelines");
66 }
67 }
68 if let Some(appdata) = std::env::var_os("LOCALAPPDATA") {
69 return std::path::PathBuf::from(appdata)
70 .join("vyre")
71 .join("pipelines");
72 }
73 std::path::PathBuf::from("./vyre-cache/pipelines")
74 }
75
76 #[must_use]
78 pub fn path_for(
79 &self,
80 program_digest: [u8; 32],
81 fingerprint: PipelineDeviceFingerprint,
82 ) -> std::path::PathBuf {
83 let key = fingerprint.cache_key(program_digest);
84 let mut file_name = hex_encode(&key);
85 let mut path = self.root.join(&file_name[..2]);
86 file_name.push_str(".bin");
87 path.push(file_name);
88 path
89 }
90
91 pub fn read(
97 &self,
98 program_digest: [u8; 32],
99 fingerprint: PipelineDeviceFingerprint,
100 ) -> std::io::Result<Option<Vec<u8>>> {
101 let path = self.path_for(program_digest, fingerprint);
102 match read_bounded(&path, MAX_DISK_PIPELINE_BLOB_BYTES) {
103 Ok(bytes) => Ok(Some(bytes)),
104 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
105 Err(error) => Err(error),
106 }
107 }
108
109 pub fn write(
115 &self,
116 program_digest: [u8; 32],
117 fingerprint: PipelineDeviceFingerprint,
118 bytes: &[u8],
119 ) -> std::io::Result<()> {
120 if bytes.len() as u64 > MAX_DISK_PIPELINE_BLOB_BYTES {
121 return Err(std::io::Error::new(
122 std::io::ErrorKind::InvalidInput,
123 format!("pipeline cache blob exceeds {MAX_DISK_PIPELINE_BLOB_BYTES} byte limit"),
124 ));
125 }
126 let path = self.path_for(program_digest, fingerprint);
127 if let Some(parent) = path.parent() {
128 std::fs::create_dir_all(parent)?;
129 }
130 let tmp = self.tmp_path_for(&path);
131 let write_result = (|| -> std::io::Result<()> {
132 let mut file = std::fs::File::create(&tmp)?;
133 use std::io::Write as _;
134 file.write_all(bytes)?;
135 drop(file);
136 std::fs::rename(&tmp, &path)
137 })();
138 if write_result.is_err() {
139 remove_failed_atomic_write(&tmp)?;
140 }
141 write_result?;
142 self.lock_pending_flushes().push(path);
143 Ok(())
144 }
145
146 pub fn flush(&self) -> std::io::Result<()> {
152 let paths = {
153 let mut pending = self.lock_pending_flushes();
154 pending.sort();
155 pending.dedup();
156 std::mem::take(&mut *pending)
157 };
158 if let Err(error) = flush_paths(&paths) {
159 self.lock_pending_flushes().extend(paths);
160 return Err(error);
161 }
162 Ok(())
163 }
164
165 pub fn invalidate_impacted(
171 &self,
172 impact_mask: &[u32],
173 program_digests: &[[u8; 32]],
174 fingerprint: PipelineDeviceFingerprint,
175 ) -> std::io::Result<()> {
176 for (index, &is_impacted) in impact_mask.iter().enumerate() {
177 if is_impacted != 0 {
178 if let Some(&digest) = program_digests.get(index) {
179 let path = self.path_for(digest, fingerprint);
180 if path.exists() {
181 std::fs::remove_file(path)?;
182 }
183 }
184 }
185 }
186 Ok(())
187 }
188
189 #[must_use]
191 pub fn root(&self) -> &std::path::Path {
192 &self.root
193 }
194
195 fn tmp_path_for(&self, path: &std::path::Path) -> std::path::PathBuf {
196 static TMP_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
197 let tmp_id = TMP_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
198 path.with_extension(format!("bin.tmp.{}.{}", std::process::id(), tmp_id))
199 }
200}
201
202fn remove_failed_atomic_write(path: &std::path::Path) -> std::io::Result<()> {
203 match std::fs::remove_file(path) {
204 Ok(()) => Ok(()),
205 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
206 Err(error) => Err(error),
207 }
208}
209
210fn read_bounded(path: &std::path::Path, max_bytes: u64) -> std::io::Result<Vec<u8>> {
211 use std::io::Read as _;
212
213 let mut file = std::fs::File::open(path)?;
214 let metadata = file.metadata()?;
215 if metadata.len() > max_bytes {
216 return Err(std::io::Error::new(
217 std::io::ErrorKind::InvalidData,
218 format!("pipeline cache blob exceeds {max_bytes} byte limit"),
219 ));
220 }
221 let byte_capacity = usize::try_from(metadata.len()).map_err(|error| {
222 std::io::Error::new(
223 std::io::ErrorKind::InvalidData,
224 format!(
225 "pipeline cache blob length {} does not fit usize: {error}",
226 metadata.len()
227 ),
228 )
229 })?;
230 let mut bytes = Vec::new();
231 crate::allocation::try_reserve_vec_to_capacity(&mut bytes, byte_capacity).map_err(|error| {
232 std::io::Error::new(
233 std::io::ErrorKind::OutOfMemory,
234 format!(
235 "pipeline cache bounded read could not reserve {byte_capacity} byte(s): {error}. Fix: lower the pipeline cache blob limit or evict oversized entries."
236 ),
237 )
238 })?;
239 file.by_ref().take(max_bytes + 1).read_to_end(&mut bytes)?;
240 if bytes.len() as u64 > max_bytes {
241 return Err(std::io::Error::new(
242 std::io::ErrorKind::InvalidData,
243 format!("pipeline cache blob exceeded {max_bytes} byte bounded read limit"),
244 ));
245 }
246 Ok(bytes)
247}
248
249fn flush_paths(paths: &[std::path::PathBuf]) -> std::io::Result<()> {
250 let mut parents = Vec::new();
251 crate::allocation::try_reserve_vec_to_capacity(&mut parents, paths.len()).map_err(|error| {
252 std::io::Error::new(
253 std::io::ErrorKind::OutOfMemory,
254 format!(
255 "pipeline cache flush could not reserve {} parent path slot(s): {error}. Fix: flush fewer cache paths per batch.",
256 paths.len()
257 ),
258 )
259 })?;
260 sync_files_bounded(
261 paths,
262 std::fs::File::sync_data,
263 "disk cache file sync worker panicked",
264 )?;
265 for path in paths {
266 if let Some(parent) = path.parent() {
267 parents.push(parent.to_path_buf());
268 }
269 }
270 parents.sort();
271 parents.dedup();
272 sync_parent_dirs(&parents)?;
273 Ok(())
274}
275
276#[cfg(unix)]
277fn sync_parent_dirs(parents: &[std::path::PathBuf]) -> std::io::Result<()> {
278 sync_files_bounded(
279 parents,
280 std::fs::File::sync_all,
281 "disk cache dir sync worker panicked",
282 )
283}
284
285#[cfg(not(unix))]
286fn sync_parent_dirs(_parents: &[std::path::PathBuf]) -> std::io::Result<()> {
287 Ok(())
288}
289
290fn sync_files_bounded(
291 paths: &[std::path::PathBuf],
292 sync: fn(&std::fs::File) -> std::io::Result<()>,
293 panic_message: &'static str,
294) -> std::io::Result<()> {
295 if paths.is_empty() {
296 return Ok(());
297 }
298 let workers = std::thread::available_parallelism()
299 .map(usize::from)
300 .unwrap_or(1)
301 .clamp(1, 16);
302 for chunk in paths.chunks(workers) {
303 std::thread::scope(|scope| {
304 let mut handles = Vec::new();
305 crate::allocation::try_reserve_vec_to_capacity(&mut handles, chunk.len()).map_err(|error| {
306 std::io::Error::new(
307 std::io::ErrorKind::OutOfMemory,
308 format!(
309 "pipeline cache sync could not reserve {} worker handle(s): {error}. Fix: lower pipeline cache sync fan-out.",
310 chunk.len()
311 ),
312 )
313 })?;
314 for path in chunk {
315 handles.push(scope.spawn(move || {
316 let file = std::fs::File::open(path)?;
317 sync(&file)
318 }));
319 }
320 for handle in handles {
321 handle
322 .join()
323 .map_err(|_| std::io::Error::other(panic_message))??;
324 }
325 Ok::<(), std::io::Error>(())
326 })?;
327 }
328 Ok(())
329}
330
331#[derive(
342 Copy, Clone, Debug, Default, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize,
343)]
344pub struct PipelineFeatureFlags(pub u32);
345
346impl PipelineFeatureFlags {
347 pub const SUBGROUP_OPS: Self = Self(1 << 0);
350 pub const F16: Self = Self(1 << 1);
352 pub const BF16: Self = Self(1 << 2);
354 pub const TENSOR_CORES: Self = Self(1 << 3);
357 pub const ASYNC_COMPUTE: Self = Self(1 << 4);
359 pub const PUSH_CONSTANTS: Self = Self(1 << 5);
361 pub const INDIRECT_DISPATCH: Self = Self(1 << 6);
363 pub const SPECULATIVE: Self = Self(1 << 7);
366 pub const PERSISTENT_THREAD: Self = Self(1 << 8);
369
370 #[must_use]
372 pub const fn empty() -> Self {
373 Self(0)
374 }
375
376 #[must_use]
378 pub const fn contains(self, other: Self) -> bool {
379 (self.0 & other.0) == other.0
380 }
381
382 #[must_use]
384 pub const fn union(self, other: Self) -> Self {
385 Self(self.0 | other.0)
386 }
387
388 #[must_use]
390 pub const fn bits(self) -> u32 {
391 self.0
392 }
393}
394
395#[derive(Clone, Debug, Eq, PartialEq, Hash)]
411
412pub struct PipelineCacheKey {
413 pub version: u32,
416 pub shader_hash: [u8; 32],
418 pub bind_group_layout_hash: [u8; 32],
422 pub push_constant_size: u32,
426 pub workgroup_size: [u32; 3],
428 pub feature_flags: PipelineFeatureFlags,
430 pub backend_id: BackendId,
433 #[allow(dead_code)]
436 __phantom: core::marker::PhantomData<()>,
437}
438
439impl PipelineCacheKey {
440 #[must_use]
442 #[allow(clippy::too_many_arguments)]
443 pub fn new(
444 shader_hash: [u8; 32],
445 bind_group_layout_hash: [u8; 32],
446 push_constant_size: u32,
447 workgroup_size: [u32; 3],
448 feature_flags: PipelineFeatureFlags,
449 backend_id: BackendId,
450 ) -> Self {
451 Self {
452 version: CURRENT_PIPELINE_CACHE_KEY_VERSION,
453 shader_hash,
454 bind_group_layout_hash,
455 push_constant_size,
456 workgroup_size,
457 feature_flags,
458 backend_id,
459 __phantom: core::marker::PhantomData,
460 }
461 }
462}
463
464#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
471pub struct PipelineCacheIdentity {
472 pub digest: [u8; 32],
474 pub program_digest: [u8; 32],
476 pub policy_digest: [u8; 32],
478 pub device_fingerprint: PipelineDeviceFingerprint,
480}
481
482impl PipelineCacheIdentity {
483 #[must_use]
485 pub fn from_parts(
486 program_digest: [u8; 32],
487 policy_digest: [u8; 32],
488 device_fingerprint: PipelineDeviceFingerprint,
489 ) -> Self {
490 let mut hasher = blake3::Hasher::new();
491 hasher.update(b"vyre-pipeline-cache-identity-v1\0program\0");
492 hasher.update(&program_digest);
493 hasher.update(b"\0policy\0");
494 hasher.update(&policy_digest);
495 hasher.update(b"\0vendor\0");
496 hasher.update(&device_fingerprint.vendor.to_le_bytes());
497 hasher.update(b"\0device\0");
498 hasher.update(&device_fingerprint.device.to_le_bytes());
499 hasher.update(b"\0driver\0");
500 hasher.update(&device_fingerprint.driver_digest);
501 Self {
502 digest: *hasher.finalize().as_bytes(),
503 program_digest,
504 policy_digest,
505 device_fingerprint,
506 }
507 }
508
509 pub fn try_from_program(
517 program: &Program,
518 config: &DispatchConfig,
519 device_fingerprint: PipelineDeviceFingerprint,
520 ) -> Result<Self, String> {
521 let program_digest = try_normalized_program_cache_digest(program)?;
522 let policy_digest = dispatch_policy_cache_digest(config);
523 Ok(Self::from_parts(
524 program_digest,
525 policy_digest,
526 device_fingerprint,
527 ))
528 }
529}
530
531#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
537pub struct PipelineCacheMissEvidence {
538 pub total_entries: usize,
540 pub same_program_entries: usize,
542 pub same_program_and_policy_entries: usize,
544 pub same_program_policy_and_device_entries: usize,
547}
548
549impl PipelineCacheMissEvidence {
550 #[must_use]
552 pub fn from_identities<'a>(
553 cached: impl Iterator<Item = &'a PipelineCacheIdentity>,
554 requested: &PipelineCacheIdentity,
555 ) -> Self {
556 let mut evidence = Self::default();
557 for identity in cached {
558 evidence.total_entries += 1;
559 if identity.program_digest == requested.program_digest {
560 evidence.same_program_entries += 1;
561 if identity.policy_digest == requested.policy_digest {
562 evidence.same_program_and_policy_entries += 1;
563 if identity.device_fingerprint == requested.device_fingerprint {
564 evidence.same_program_policy_and_device_entries += 1;
565 }
566 }
567 }
568 }
569 evidence
570 }
571}
572
573#[derive(Copy, Clone, Debug, Eq, PartialEq)]
575pub enum PipelineCacheMissReason {
576 EmptyCache,
578 ProgramChanged,
580 DispatchPolicyChanged,
583 DeviceOrRuntimeChanged,
585 KeyAbsent,
588}
589
590impl PipelineCacheMissReason {
591 #[must_use]
593 pub const fn classify(evidence: PipelineCacheMissEvidence) -> Self {
594 if evidence.total_entries == 0 {
595 Self::EmptyCache
596 } else if evidence.same_program_entries == 0 {
597 Self::ProgramChanged
598 } else if evidence.same_program_and_policy_entries == 0 {
599 Self::DispatchPolicyChanged
600 } else if evidence.same_program_policy_and_device_entries == 0 {
601 Self::DeviceOrRuntimeChanged
602 } else {
603 Self::KeyAbsent
604 }
605 }
606
607 #[must_use]
609 pub fn classify_identities<'a>(
610 cached: impl Iterator<Item = &'a PipelineCacheIdentity>,
611 requested: &PipelineCacheIdentity,
612 ) -> Self {
613 Self::classify(PipelineCacheMissEvidence::from_identities(
614 cached, requested,
615 ))
616 }
617
618 #[must_use]
620 pub const fn metric_suffix(self) -> &'static str {
621 match self {
622 Self::EmptyCache => "empty_cache",
623 Self::ProgramChanged => "program_changed",
624 Self::DispatchPolicyChanged => "dispatch_policy_changed",
625 Self::DeviceOrRuntimeChanged => "device_or_runtime_changed",
626 Self::KeyAbsent => "key_absent",
627 }
628 }
629}
630
631#[cfg(test)]
632
633mod pipeline_cache_key_tests {
634 use super::*;
635
636 fn hash32(byte: u8) -> [u8; 32] {
637 [byte; 32]
638 }
639
640 #[test]
641 fn different_workgroup_size_differs() {
642 let a = PipelineCacheKey::new(
643 hash32(1),
644 hash32(2),
645 0,
646 [64, 1, 1],
647 PipelineFeatureFlags::empty(),
648 BackendId::from("backend-a"),
649 );
650 let b = PipelineCacheKey::new(
651 hash32(1),
652 hash32(2),
653 0,
654 [128, 1, 1],
655 PipelineFeatureFlags::empty(),
656 BackendId::from("backend-a"),
657 );
658 assert_ne!(a, b);
659 }
660
661 #[test]
662 fn different_feature_flags_differ() {
663 let a = PipelineCacheKey::new(
664 hash32(1),
665 hash32(2),
666 0,
667 [1, 1, 1],
668 PipelineFeatureFlags::empty(),
669 BackendId::from("backend-a"),
670 );
671 let b = PipelineCacheKey::new(
672 hash32(1),
673 hash32(2),
674 0,
675 [1, 1, 1],
676 PipelineFeatureFlags::SUBGROUP_OPS,
677 BackendId::from("backend-a"),
678 );
679 assert_ne!(a, b);
680 }
681
682 #[test]
683 fn different_backend_id_differs() {
684 let a = PipelineCacheKey::new(
685 hash32(1),
686 hash32(2),
687 0,
688 [1, 1, 1],
689 PipelineFeatureFlags::empty(),
690 BackendId::from("backend-a"),
691 );
692 let b = PipelineCacheKey::new(
693 hash32(1),
694 hash32(2),
695 0,
696 [1, 1, 1],
697 PipelineFeatureFlags::empty(),
698 BackendId::from("backend-b"),
699 );
700 assert_ne!(a, b);
701 }
702
703 #[test]
704 fn flag_containment_is_correct() {
705 let a = PipelineFeatureFlags::SUBGROUP_OPS.union(PipelineFeatureFlags::F16);
706 assert!(a.contains(PipelineFeatureFlags::SUBGROUP_OPS));
707 assert!(a.contains(PipelineFeatureFlags::F16));
708 assert!(!a.contains(PipelineFeatureFlags::TENSOR_CORES));
709 }
710
711 #[test]
712 fn version_is_current() {
713 let k = PipelineCacheKey::new(
714 hash32(1),
715 hash32(2),
716 0,
717 [1, 1, 1],
718 PipelineFeatureFlags::empty(),
719 BackendId::from("backend-a"),
720 );
721 assert_eq!(k.version, CURRENT_PIPELINE_CACHE_KEY_VERSION);
722 }
723
724 #[test]
725 fn shared_cache_identity_separates_program_policy_and_device_facts() {
726 let program_a = hash32(1);
727 let program_b = hash32(2);
728 let policy_a = hash32(3);
729 let policy_b = hash32(4);
730 let device_a = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-a");
731 let device_b = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-b");
732
733 let base = PipelineCacheIdentity::from_parts(program_a, policy_a, device_a);
734
735 assert_eq!(base.program_digest, program_a);
736 assert_eq!(base.policy_digest, policy_a);
737 assert_eq!(base.device_fingerprint, device_a);
738 assert_ne!(
739 base.digest,
740 PipelineCacheIdentity::from_parts(program_b, policy_a, device_a).digest,
741 "Fix: shared pipeline cache identity must include the normalized Program digest."
742 );
743 assert_ne!(
744 base.digest,
745 PipelineCacheIdentity::from_parts(program_a, policy_b, device_a).digest,
746 "Fix: shared pipeline cache identity must include dispatch policy as its own tuple field."
747 );
748 assert_ne!(
749 base.digest,
750 PipelineCacheIdentity::from_parts(program_a, policy_a, device_b).digest,
751 "Fix: shared pipeline cache identity must include device/runtime fingerprint facts."
752 );
753 }
754
755 #[test]
756 fn miss_reason_classifies_adjacent_identity_evidence() {
757 assert_eq!(
758 PipelineCacheMissReason::classify(PipelineCacheMissEvidence {
759 total_entries: 0,
760 same_program_entries: 0,
761 same_program_and_policy_entries: 0,
762 same_program_policy_and_device_entries: 0,
763 }),
764 PipelineCacheMissReason::EmptyCache
765 );
766 assert_eq!(
767 PipelineCacheMissReason::classify(PipelineCacheMissEvidence {
768 total_entries: 3,
769 same_program_entries: 0,
770 same_program_and_policy_entries: 0,
771 same_program_policy_and_device_entries: 0,
772 }),
773 PipelineCacheMissReason::ProgramChanged
774 );
775 assert_eq!(
776 PipelineCacheMissReason::classify(PipelineCacheMissEvidence {
777 total_entries: 3,
778 same_program_entries: 2,
779 same_program_and_policy_entries: 0,
780 same_program_policy_and_device_entries: 0,
781 }),
782 PipelineCacheMissReason::DispatchPolicyChanged
783 );
784 assert_eq!(
785 PipelineCacheMissReason::classify(PipelineCacheMissEvidence {
786 total_entries: 3,
787 same_program_entries: 2,
788 same_program_and_policy_entries: 1,
789 same_program_policy_and_device_entries: 0,
790 }),
791 PipelineCacheMissReason::DeviceOrRuntimeChanged
792 );
793 assert_eq!(
794 PipelineCacheMissReason::classify(PipelineCacheMissEvidence {
795 total_entries: 3,
796 same_program_entries: 2,
797 same_program_and_policy_entries: 1,
798 same_program_policy_and_device_entries: 1,
799 }),
800 PipelineCacheMissReason::KeyAbsent
801 );
802 }
803
804 #[test]
805 fn miss_reason_metric_suffixes_are_stable_snake_case() {
806 assert_eq!(PipelineCacheMissReason::EmptyCache.metric_suffix(), "empty_cache");
807 assert_eq!(
808 PipelineCacheMissReason::ProgramChanged.metric_suffix(),
809 "program_changed"
810 );
811 assert_eq!(
812 PipelineCacheMissReason::DispatchPolicyChanged.metric_suffix(),
813 "dispatch_policy_changed"
814 );
815 assert_eq!(
816 PipelineCacheMissReason::DeviceOrRuntimeChanged.metric_suffix(),
817 "device_or_runtime_changed"
818 );
819 assert_eq!(PipelineCacheMissReason::KeyAbsent.metric_suffix(), "key_absent");
820 }
821
822 #[test]
823 fn miss_reason_classifies_cached_shared_identities() {
824 let program = hash32(1);
825 let other_program = hash32(2);
826 let policy = hash32(3);
827 let other_policy = hash32(4);
828 let device = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-a");
829 let other_device = PipelineDeviceFingerprint::from_parts(1, 2, "driver-a", "runtime-b");
830 let requested = PipelineCacheIdentity::from_parts(program, policy, device);
831
832 assert_eq!(
833 PipelineCacheMissReason::classify_identities([].iter(), &requested),
834 PipelineCacheMissReason::EmptyCache
835 );
836 assert_eq!(
837 PipelineCacheMissReason::classify_identities(
838 [PipelineCacheIdentity::from_parts(other_program, policy, device)].iter(),
839 &requested,
840 ),
841 PipelineCacheMissReason::ProgramChanged
842 );
843 assert_eq!(
844 PipelineCacheMissReason::classify_identities(
845 [PipelineCacheIdentity::from_parts(program, other_policy, device)].iter(),
846 &requested,
847 ),
848 PipelineCacheMissReason::DispatchPolicyChanged
849 );
850 assert_eq!(
851 PipelineCacheMissReason::classify_identities(
852 [PipelineCacheIdentity::from_parts(program, policy, other_device)].iter(),
853 &requested,
854 ),
855 PipelineCacheMissReason::DeviceOrRuntimeChanged
856 );
857 assert_eq!(
858 PipelineCacheMissReason::classify_identities(
859 [PipelineCacheIdentity::from_parts(program, policy, device)].iter(),
860 &requested,
861 ),
862 PipelineCacheMissReason::KeyAbsent
863 );
864 }
865
866 #[test]
867 fn poisoned_pending_flush_lock_is_not_silently_recovered() {
868 let cache = Arc::new(DiskPipelineCache {
869 root: std::env::temp_dir(),
870 pending_flushes: std::sync::Mutex::new(Vec::new()),
871 });
872 let poisoned = Arc::clone(&cache);
873 let _ = std::thread::spawn(move || {
874 let _guard = poisoned.lock_pending_flushes();
875 panic!("poison disk pipeline cache pending flushes");
876 })
877 .join();
878
879 let panic = std::panic::catch_unwind(|| {
880 drop(cache.lock_pending_flushes());
881 })
882 .expect_err("poisoned disk pipeline cache must panic instead of recovering");
883 let message = panic
884 .downcast_ref::<String>()
885 .map(String::as_str)
886 .or_else(|| panic.downcast_ref::<&'static str>().copied())
887 .unwrap_or("<non-string panic>");
888 assert!(
889 message.contains("pending-flush lock was poisoned"),
890 "{message}"
891 );
892 }
893}