vyre_runtime/pipeline_cache/
disk.rs1use std::fs::{self, File};
7use std::io::Read;
8use std::io::{self, Write};
9use std::path::{Path, PathBuf};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::OnceLock;
12
13use dashmap::DashMap;
14
15use super::fingerprint::PipelineFingerprint;
16use super::metrics::{PipelineCacheCounters, PipelineCacheMetrics};
17use super::store::PipelineCacheStore;
18
19#[derive(Debug)]
25pub struct DiskCache {
26 root: PathBuf,
27 pending_flushes: DashMap<PathBuf, ()>,
28 metrics: PipelineCacheCounters,
29}
30
/// Point-in-time summary of how many installed artifacts still await a sync.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DiskCacheDurabilityReport {
    /// Number of installed cache files that have not been synced yet.
    pub pending_flushes: u64,
    /// `true` when no flushes were pending at the time the report was taken.
    pub durable: bool,
}
40
41static DISK_CACHE_TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
46
47pub type PersistentPipelineCacheStore = DiskCache;
49
50pub(super) const CHECKSUM_LEN: usize = 32;
57pub(super) const CHECKSUM_LEN_U64: u64 = 32;
58pub(super) const MAX_PIPELINE_BLOB_BYTES: u64 = 64 * 1024 * 1024;
59pub(super) const MAX_ENCODED_PIPELINE_BLOB_BYTES: u64 = MAX_PIPELINE_BLOB_BYTES + CHECKSUM_LEN_U64;
60
61impl DiskCache {
62 pub fn new(root: impl Into<PathBuf>) -> Result<Self, DiskCacheError> {
70 let root = root.into();
71 fs::create_dir_all(&root).map_err(DiskCacheError::Io)?;
72 Ok(Self {
73 root,
74 pending_flushes: DashMap::new(),
75 metrics: PipelineCacheCounters::default(),
76 })
77 }
78
79 pub fn in_user_cache() -> Result<Self, DiskCacheError> {
87 let base = std::env::var_os("XDG_CACHE_HOME")
88 .map(PathBuf::from)
89 .or_else(|| std::env::var_os("HOME").map(|h| Path::new(&h).join(".cache")))
90 .ok_or(DiskCacheError::CacheDirUnknown)?;
91 Self::new(base.join("vyre").join("pipelines"))
92 }
93
94 #[must_use]
96 pub fn root(&self) -> &Path {
97 &self.root
98 }
99
100 #[must_use]
103 pub fn durability_report(&self) -> DiskCacheDurabilityReport {
104 let pending_flushes = match u64::try_from(self.pending_flushes.len()) {
105 Ok(pending_flushes) => pending_flushes,
106 Err(_) => u64::MAX,
107 };
108 DiskCacheDurabilityReport {
109 pending_flushes,
110 durable: pending_flushes == 0,
111 }
112 }
113
114 fn path_for(&self, fp: &PipelineFingerprint) -> PathBuf {
115 self.root.join(cache_file_name(fp))
116 }
117}
118
119fn cache_file_name(fp: &PipelineFingerprint) -> String {
120 let mut file_name = String::with_capacity(68);
121 fp.push_hex(&mut file_name);
122 file_name.push_str(".bin");
123 file_name
124}
125
126impl PipelineCacheStore for DiskCache {
127 fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
128 self.metrics.lookups.fetch_add(1, Ordering::Relaxed);
129 let path = self.path_for(fp);
130 let Some(meta) = fs::symlink_metadata(&path).ok() else {
133 self.metrics.misses.fetch_add(1, Ordering::Relaxed);
134 return None;
135 };
136 if !meta.file_type().is_file() {
137 self.metrics.misses.fetch_add(1, Ordering::Relaxed);
138 return None;
139 }
140 if meta.len() > MAX_ENCODED_PIPELINE_BLOB_BYTES {
141 self.metrics.misses.fetch_add(1, Ordering::Relaxed);
142 return None;
143 }
144 let Some(file) = File::open(&path).ok() else {
145 self.metrics.misses.fetch_add(1, Ordering::Relaxed);
146 return None;
147 };
148 let capacity = usize::try_from(meta.len()).ok()?;
149 let result = read_verified_cache_blob_with_capacity(file, capacity);
150 if result.is_some() {
151 self.metrics.hits.fetch_add(1, Ordering::Relaxed);
152 } else {
153 self.metrics.misses.fetch_add(1, Ordering::Relaxed);
154 }
155 result
156 }
157
158 fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
159 let tmp_id = DISK_CACHE_TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
160 let mut tmp_name = String::with_capacity(85);
161 tmp_name.push('.');
162 fp.push_hex(&mut tmp_name);
163 tmp_name.push('-');
164 append_u64_decimal(&mut tmp_name, tmp_id);
165 tmp_name.push_str(".bin.tmp");
166 let tmp_path = self.root.join(&tmp_name);
167
168 let mut final_name = String::with_capacity(68);
169 fp.push_hex(&mut final_name);
170 final_name.push_str(".bin");
171 let final_path = self.root.join(&final_name);
172
173 let write_rename = || -> io::Result<()> {
178 let checksum = ::blake3::hash(&artifact);
179 let mut f = File::create(&tmp_path)?;
180 f.write_all(&artifact)?;
181 f.write_all(checksum.as_bytes())?;
182 drop(f);
183 if let Ok(meta) = fs::symlink_metadata(&final_path) {
186 if meta.file_type().is_symlink() {
187 fs::remove_file(&final_path)?;
188 }
189 }
190 fs::rename(&tmp_path, &final_path)?;
191 self.pending_flushes.insert(final_path, ());
192 Ok(())
193 };
194 if write_rename().is_err() {
195 self.metrics.rejected_puts.fetch_add(1, Ordering::Relaxed);
196 match fs::remove_file(&tmp_path) {
199 Ok(()) => {}
200 Err(error) if error.kind() == io::ErrorKind::NotFound => {}
201 Err(error) => tracing::warn!(
202 tmp_path = %tmp_path.display(),
203 error = %error,
204 "failed to remove temporary disk-cache artifact after rejected put"
205 ),
206 }
207 } else {
208 self.metrics.puts.fetch_add(1, Ordering::Relaxed);
209 }
210 }
211
212 fn flush(&self) -> io::Result<()> {
213 self.metrics.flushes.fetch_add(1, Ordering::Relaxed);
214 let paths: Vec<PathBuf> = self
215 .pending_flushes
216 .iter()
217 .map(|entry| entry.key().clone())
218 .collect();
219 self.pending_flushes.clear();
220 if let Err(error) = flush_paths(&paths) {
221 self.metrics.flush_errors.fetch_add(1, Ordering::Relaxed);
222 for path in paths {
223 self.pending_flushes.insert(path, ());
224 }
225 return Err(error);
226 }
227 Ok(())
228 }
229
230 fn metrics(&self) -> PipelineCacheMetrics {
231 self.metrics.snapshot(0, 0)
232 }
233}
234
235fn flush_paths(paths: &[PathBuf]) -> io::Result<()> {
236 let mut parents = Vec::with_capacity(paths.len());
237 sync_paths_bounded(
238 paths,
239 File::sync_data,
240 "pipeline cache file sync worker panicked",
241 )?;
242 for path in paths {
243 if let Some(parent) = path.parent() {
244 parents.push(parent.to_path_buf());
245 }
246 }
247 parents.sort();
248 parents.dedup();
249 sync_parent_dirs(&parents)?;
250 Ok(())
251}
252
253#[cfg(unix)]
254fn sync_parent_dirs(parents: &[PathBuf]) -> io::Result<()> {
255 sync_paths_bounded(
256 parents,
257 File::sync_all,
258 "pipeline cache directory sync worker panicked",
259 )
260}
261
262#[cfg(not(unix))]
263fn sync_parent_dirs(_parents: &[PathBuf]) -> io::Result<()> {
264 Ok(())
265}
266
267fn sync_paths_bounded(
268 paths: &[PathBuf],
269 sync: fn(&File) -> io::Result<()>,
270 panic_message: &'static str,
271) -> io::Result<()> {
272 if paths.is_empty() {
273 return Ok(());
274 }
275 let workers = sync_worker_count();
276 for chunk in paths.chunks(workers) {
277 std::thread::scope(|scope| {
278 let mut handles = Vec::with_capacity(chunk.len());
279 for path in chunk {
280 handles.push(scope.spawn(move || {
281 let file = File::open(path)?;
282 sync(&file)
283 }));
284 }
285 for handle in handles {
286 handle
287 .join()
288 .map_err(|_| io::Error::other(panic_message))??;
289 }
290 Ok::<(), io::Error>(())
291 })?;
292 }
293 Ok(())
294}
295
/// Number of sync workers to run at once: the available parallelism limited to `1..=16`,
/// or 1 when the parallelism cannot be determined. Computed once and then cached.
fn sync_worker_count() -> usize {
    static WORKERS: OnceLock<usize> = OnceLock::new();
    *WORKERS.get_or_init(|| match std::thread::available_parallelism() {
        Ok(parallelism) => parallelism.get().clamp(1, 16),
        Err(_) => 1,
    })
}
305
306#[derive(Debug, thiserror::Error)]
308#[non_exhaustive]
309pub enum DiskCacheError {
310 #[error(
312 "could not resolve a user cache directory - set XDG_CACHE_HOME or HOME, or call DiskCache::new() with an explicit path"
313 )]
314 CacheDirUnknown,
315 #[error("disk-cache I/O error: {0}")]
317 Io(#[from] io::Error),
318}
319
320#[cfg_attr(not(any(test, feature = "remote")), allow(dead_code))]
321pub(super) fn read_verified_cache_blob(mut reader: impl Read) -> Option<Vec<u8>> {
322 read_verified_cache_blob_with_capacity(&mut reader, 0)
323}
324
325fn read_verified_cache_blob_with_capacity(
326 mut reader: impl Read,
327 capacity: usize,
328) -> Option<Vec<u8>> {
329 let max_encoded_capacity = usize::try_from(MAX_ENCODED_PIPELINE_BLOB_BYTES).ok()?;
330 let mut bytes = Vec::with_capacity(capacity.min(max_encoded_capacity));
331 reader
332 .by_ref()
333 .take(MAX_ENCODED_PIPELINE_BLOB_BYTES + 1)
334 .read_to_end(&mut bytes)
335 .ok()?;
336 verify_cache_blob(bytes)
337}
338
339pub(super) fn verify_cache_blob(mut bytes: Vec<u8>) -> Option<Vec<u8>> {
340 let byte_len = u64::try_from(bytes.len()).ok()?;
341 if byte_len > MAX_ENCODED_PIPELINE_BLOB_BYTES || bytes.len() < CHECKSUM_LEN {
342 return None;
343 }
344 let payload_len = bytes.len() - CHECKSUM_LEN;
345 if u64::try_from(payload_len).ok()? > MAX_PIPELINE_BLOB_BYTES {
346 return None;
347 }
348 let (payload, footer) = bytes.split_at(payload_len);
349 let expected = ::blake3::hash(payload);
350 if footer != expected.as_bytes() {
351 return None;
352 }
353 bytes.truncate(payload_len);
354 Some(bytes)
355}
356
/// Appends the decimal representation of `value` to `out`.
fn append_u64_decimal(out: &mut String, mut value: u64) {
    // `u64::MAX` has 20 decimal digits, so the buffer always suffices.
    let mut buffer = [b'0'; 20];
    let mut start = buffer.len();
    // Fill from the back so the digits end up in most-significant-first order.
    loop {
        start -= 1;
        buffer[start] = b'0' + (value % 10) as u8;
        value /= 10;
        if value == 0 {
            break;
        }
    }
    out.extend(buffer[start..].iter().map(|&digit| char::from(digit)));
}
372
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_cache::test_helpers::tiny_program;

    /// An artifact written through `DiskCache` is readable through the
    /// `PersistentPipelineCacheStore` alias after the first store is dropped.
    #[test]
    fn persistent_alias_disk_cache_persists_across_store_reopen() {
        // `TempDir` removes the root on drop, so a failed assertion does not leak it.
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let fp = PipelineFingerprint::of(&tiny_program());

        let first = DiskCache::new(temp.path())
            .expect("Fix: test must create disk cache directory; restore temp-dir access.");
        first.put(fp, b"compiled-pipeline".to_vec());
        drop(first);

        let reopened = PersistentPipelineCacheStore::new(temp.path())
            .expect("Fix: disk cache must reopen.");
        assert_eq!(
            reopened.get(&fp).as_deref(),
            Some(&b"compiled-pipeline"[..]),
            "Fix: disk pipeline cache must persist artifacts across process-local store reconstruction"
        );
    }

    /// An artifact stays readable after the store that wrote it is dropped and reopened.
    #[test]
    fn disk_cache_persists_across_store_reopen() {
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let fp = PipelineFingerprint::of(&tiny_program());
        {
            let cache = DiskCache::new(temp.path())
                .expect("Fix: disk cache test must create isolated cache root");
            cache.put(fp, b"driver-pipeline-blob".to_vec());
        }
        let reopened =
            DiskCache::new(temp.path()).expect("Fix: disk cache must reopen an existing root");
        assert_eq!(
            reopened.get(&fp),
            Some(b"driver-pipeline-blob".to_vec()),
            "Fix: disk PipelineCacheStore must survive process/backend reconstruction"
        );
    }

    /// `put` leaves the entry pending; `flush` drains it and keeps the artifact readable.
    #[test]
    fn disk_cache_flush_is_explicit_durability_boundary() {
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let fp = PipelineFingerprint::of(&tiny_program());
        let cache = DiskCache::new(temp.path())
            .expect("Fix: disk cache test must create isolated cache root");
        cache.put(fp, b"driver-pipeline-blob".to_vec());
        assert!(
            !cache.pending_flushes.is_empty(),
            "Fix: DiskCache::put must defer fsync work until explicit flush."
        );
        cache
            .flush()
            .expect("Fix: explicit disk cache flush must fsync pending entries.");
        assert!(
            cache.pending_flushes.is_empty(),
            "Fix: explicit disk cache flush must drain pending entries."
        );
        assert_eq!(
            cache.get(&fp),
            Some(b"driver-pipeline-blob".to_vec()),
            "Fix: explicit flush must preserve the installed cache artifact."
        );
    }

    /// A payload followed by its BLAKE3 hash verifies and yields the payload.
    #[test]
    fn cache_blob_verifier_accepts_checksum_footer() {
        let payload = b"compiled-artifact".to_vec();
        let mut encoded = payload.clone();
        encoded.extend_from_slice(::blake3::hash(&payload).as_bytes());

        assert_eq!(verify_cache_blob(encoded), Some(payload));
    }

    /// A footer that does not match the payload hash is rejected.
    #[test]
    fn cache_blob_verifier_rejects_corrupted_footer() {
        let payload = b"compiled-artifact".to_vec();
        let mut encoded = payload;
        encoded.extend_from_slice(&[0xA5; CHECKSUM_LEN]);

        assert!(
            verify_cache_blob(encoded).is_none(),
            "Fix: disk and remote cache readers must reject artifacts whose checksum footer does not match"
        );
    }

    /// A stream one byte longer than the encoded limit is rejected.
    #[test]
    fn cache_blob_reader_rejects_oversized_encoded_blob() {
        let oversized = std::io::repeat(0).take(MAX_ENCODED_PIPELINE_BLOB_BYTES + 1);

        assert!(
            read_verified_cache_blob(oversized).is_none(),
            "Fix: disk and remote cache readers must cap encoded blob bytes before allocation"
        );
    }

    /// The durability report shows one pending entry after `put` and none after `flush`.
    #[test]
    fn disk_cache_durability_report_tracks_pending_flush_boundary() {
        let temp = tempfile::tempdir().expect("Fix: create temp disk cache root");
        let fp = PipelineFingerprint::of(&tiny_program());
        let cache = DiskCache::new(temp.path()).expect("Fix: create disk cache");

        assert_eq!(
            cache.durability_report(),
            DiskCacheDurabilityReport {
                pending_flushes: 0,
                durable: true,
            }
        );

        cache.put(fp, b"driver-pipeline-blob".to_vec());
        let after_put = cache.durability_report();
        assert_eq!(after_put.pending_flushes, 1);
        assert!(
            !after_put.durable,
            "Fix: installed artifacts must not be reported durable before explicit flush"
        );

        cache
            .flush()
            .expect("Fix: disk cache flush must fsync pending file and parent dir");
        assert_eq!(
            cache.durability_report(),
            DiskCacheDurabilityReport {
                pending_flushes: 0,
                durable: true,
            }
        );
    }
}