Skip to main content

vyre_runtime/pipeline_cache/
disk.rs

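//! Disk-backed pipeline cache. Writes one file per fingerprint under
//! `<root>/<hex>.bin` with a blake3 footer that the reader verifies
//! before returning the payload. The footer catches torn writes and
//! bit-rot; because it is an unkeyed hash, it only detects tampering that
//! leaves the footer stale — anyone able to rewrite the whole file can also
//! write a matching footer.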
5
6use std::fs::{self, File};
7use std::io::Read;
8use std::io::{self, Write};
9use std::path::{Path, PathBuf};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::OnceLock;
12
13use dashmap::DashMap;
14
15use super::fingerprint::PipelineFingerprint;
16use super::metrics::{PipelineCacheCounters, PipelineCacheMetrics};
17use super::store::PipelineCacheStore;
18
19/// Disk-backed pipeline cache. Writes one file per fingerprint
20/// under `<root>/<hex>.bin`. Reads are stateless; writes are
21/// `write + rename` for atomicity. No eviction policy today
22/// (user decides)  -  the footprint is bounded by
23/// sum(artifact_size × unique_canonical_programs).
24#[derive(Debug)]
25pub struct DiskCache {
26    root: PathBuf,
27    pending_flushes: DashMap<PathBuf, ()>,
28    metrics: PipelineCacheCounters,
29}
30
/// Crash-durability snapshot for disk cache artifacts.
///
/// Note that the derived `Default` yields `pending_flushes: 0` together with
/// `durable: false`; reports produced by the cache itself always set
/// `durable` to `pending_flushes == 0`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DiskCacheDurabilityReport {
    /// Number of artifacts installed by rename that an explicit flush has not
    /// yet fsynced.
    pub pending_flushes: u64,
    /// `true` when no installed artifact is still waiting for its file and
    /// parent-directory fsync.
    pub durable: bool,
}
40
41/// Persistent process-crossing pipeline-cache store.
42///
43/// This is the default disk-backed store for callers that need compiled
44/// pipeline artifacts to survive process restarts.
45static DISK_CACHE_TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
46
47/// Alias for the disk-backed pipeline cache store.
48pub type PersistentPipelineCacheStore = DiskCache;
49
50// On-disk layout:
51//   <payload bytes..>  <32-byte blake3 footer>
52// Total file size = payload.len() + 32. Get verifies the footer
53// before returning the payload; mismatches or truncated files
54// return None so the caller recompiles. Covers torn writes +
55// bit-rot + deliberate tampering.
56pub(super) const CHECKSUM_LEN: usize = 32;
57pub(super) const CHECKSUM_LEN_U64: u64 = 32;
58pub(super) const MAX_PIPELINE_BLOB_BYTES: u64 = 64 * 1024 * 1024;
59pub(super) const MAX_ENCODED_PIPELINE_BLOB_BYTES: u64 = MAX_PIPELINE_BLOB_BYTES + CHECKSUM_LEN_U64;
60
61impl DiskCache {
62    /// Construct a cache rooted at `root`. Creates the directory if
63    /// it doesn't exist.
64    ///
65    /// # Errors
66    ///
67    /// Returns [`DiskCacheError::Io`] when the directory can't be
68    /// created.
69    pub fn new(root: impl Into<PathBuf>) -> Result<Self, DiskCacheError> {
70        let root = root.into();
71        fs::create_dir_all(&root).map_err(DiskCacheError::Io)?;
72        Ok(Self {
73            root,
74            pending_flushes: DashMap::new(),
75            metrics: PipelineCacheCounters::default(),
76        })
77    }
78
79    /// Construct a cache rooted at `~/.cache/vyre/pipelines/` (or
80    /// `$XDG_CACHE_HOME/vyre/pipelines/` if set).
81    ///
82    /// # Errors
83    ///
84    /// Returns [`DiskCacheError::CacheDirUnknown`] when neither env
85    /// var resolves, or [`DiskCacheError::Io`] on mkdir failure.
86    pub fn in_user_cache() -> Result<Self, DiskCacheError> {
87        let base = std::env::var_os("XDG_CACHE_HOME")
88            .map(PathBuf::from)
89            .or_else(|| std::env::var_os("HOME").map(|h| Path::new(&h).join(".cache")))
90            .ok_or(DiskCacheError::CacheDirUnknown)?;
91        Self::new(base.join("vyre").join("pipelines"))
92    }
93
94    /// Root directory this cache operates on.
95    #[must_use]
96    pub fn root(&self) -> &Path {
97        &self.root
98    }
99
100    /// Snapshot whether installed artifacts have crossed the explicit
101    /// durability boundary.
102    #[must_use]
103    pub fn durability_report(&self) -> DiskCacheDurabilityReport {
104        let pending_flushes = match u64::try_from(self.pending_flushes.len()) {
105            Ok(pending_flushes) => pending_flushes,
106            Err(_) => u64::MAX,
107        };
108        DiskCacheDurabilityReport {
109            pending_flushes,
110            durable: pending_flushes == 0,
111        }
112    }
113
114    fn path_for(&self, fp: &PipelineFingerprint) -> PathBuf {
115        self.root.join(cache_file_name(fp))
116    }
117}
118
119fn cache_file_name(fp: &PipelineFingerprint) -> String {
120    let mut file_name = String::with_capacity(68);
121    fp.push_hex(&mut file_name);
122    file_name.push_str(".bin");
123    file_name
124}
125
126impl PipelineCacheStore for DiskCache {
127    fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
128        self.metrics.lookups.fetch_add(1, Ordering::Relaxed);
129        let path = self.path_for(fp);
130        // FINDING-CACHE-1: reject symlinks before reading. `symlink_metadata`
131        // does NOT follow the symlink; regular-file check is strict.
132        let Some(meta) = fs::symlink_metadata(&path).ok() else {
133            self.metrics.misses.fetch_add(1, Ordering::Relaxed);
134            return None;
135        };
136        if !meta.file_type().is_file() {
137            self.metrics.misses.fetch_add(1, Ordering::Relaxed);
138            return None;
139        }
140        if meta.len() > MAX_ENCODED_PIPELINE_BLOB_BYTES {
141            self.metrics.misses.fetch_add(1, Ordering::Relaxed);
142            return None;
143        }
144        let Some(file) = File::open(&path).ok() else {
145            self.metrics.misses.fetch_add(1, Ordering::Relaxed);
146            return None;
147        };
148        let capacity = usize::try_from(meta.len()).ok()?;
149        let result = read_verified_cache_blob_with_capacity(file, capacity);
150        if result.is_some() {
151            self.metrics.hits.fetch_add(1, Ordering::Relaxed);
152        } else {
153            self.metrics.misses.fetch_add(1, Ordering::Relaxed);
154        }
155        result
156    }
157
158    fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
159        let tmp_id = DISK_CACHE_TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
160        let mut tmp_name = String::with_capacity(85);
161        tmp_name.push('.');
162        fp.push_hex(&mut tmp_name);
163        tmp_name.push('-');
164        append_u64_decimal(&mut tmp_name, tmp_id);
165        tmp_name.push_str(".bin.tmp");
166        let tmp_path = self.root.join(&tmp_name);
167
168        let mut final_name = String::with_capacity(68);
169        fp.push_hex(&mut final_name);
170        final_name.push_str(".bin");
171        let final_path = self.root.join(&final_name);
172
173        // Write payload + blake3 footer in one shot and install by rename so
174        // readers see either the prior complete file or the new complete file.
175        // Durability is batched through `flush`; fsyncing every insertion
176        // turns steady-state cache population into a storage latency bottleneck.
177        let write_rename = || -> io::Result<()> {
178            let checksum = ::blake3::hash(&artifact);
179            let mut f = File::create(&tmp_path)?;
180            f.write_all(&artifact)?;
181            f.write_all(checksum.as_bytes())?;
182            drop(f);
183            // FINDING-CACHE-1: if the final path is a symlink, unlink it
184            // first so rename replaces the symlink (not its target).
185            if let Ok(meta) = fs::symlink_metadata(&final_path) {
186                if meta.file_type().is_symlink() {
187                    fs::remove_file(&final_path)?;
188                }
189            }
190            fs::rename(&tmp_path, &final_path)?;
191            self.pending_flushes.insert(final_path, ());
192            Ok(())
193        };
194        if write_rename().is_err() {
195            self.metrics.rejected_puts.fetch_add(1, Ordering::Relaxed);
196            // Best-effort; caller falls back to recompile. Clean up
197            // the tmp file so it doesn't accumulate on failure.
198            match fs::remove_file(&tmp_path) {
199                Ok(()) => {}
200                Err(error) if error.kind() == io::ErrorKind::NotFound => {}
201                Err(error) => tracing::warn!(
202                    tmp_path = %tmp_path.display(),
203                    error = %error,
204                    "failed to remove temporary disk-cache artifact after rejected put"
205                ),
206            }
207        } else {
208            self.metrics.puts.fetch_add(1, Ordering::Relaxed);
209        }
210    }
211
212    fn flush(&self) -> io::Result<()> {
213        self.metrics.flushes.fetch_add(1, Ordering::Relaxed);
214        let paths: Vec<PathBuf> = self
215            .pending_flushes
216            .iter()
217            .map(|entry| entry.key().clone())
218            .collect();
219        self.pending_flushes.clear();
220        if let Err(error) = flush_paths(&paths) {
221            self.metrics.flush_errors.fetch_add(1, Ordering::Relaxed);
222            for path in paths {
223                self.pending_flushes.insert(path, ());
224            }
225            return Err(error);
226        }
227        Ok(())
228    }
229
230    fn metrics(&self) -> PipelineCacheMetrics {
231        self.metrics.snapshot(0, 0)
232    }
233}
234
235fn flush_paths(paths: &[PathBuf]) -> io::Result<()> {
236    let mut parents = Vec::with_capacity(paths.len());
237    sync_paths_bounded(
238        paths,
239        File::sync_data,
240        "pipeline cache file sync worker panicked",
241    )?;
242    for path in paths {
243        if let Some(parent) = path.parent() {
244            parents.push(parent.to_path_buf());
245        }
246    }
247    parents.sort();
248    parents.dedup();
249    sync_parent_dirs(&parents)?;
250    Ok(())
251}
252
253#[cfg(unix)]
254fn sync_parent_dirs(parents: &[PathBuf]) -> io::Result<()> {
255    sync_paths_bounded(
256        parents,
257        File::sync_all,
258        "pipeline cache directory sync worker panicked",
259    )
260}
261
262#[cfg(not(unix))]
263fn sync_parent_dirs(_parents: &[PathBuf]) -> io::Result<()> {
264    Ok(())
265}
266
/// fsync every path in `paths`, running at most `sync_worker_count()` syncs
/// concurrently, and return the first error encountered.
///
/// Each path is opened with `open_for_sync` and passed to `sync` (e.g.
/// `File::sync_data` or `File::sync_all`). A panicking worker is reported as
/// an `io::Error` carrying `panic_message`. Processing stops after the first
/// chunk that produced an error.
fn sync_paths_bounded(
    paths: &[PathBuf],
    sync: fn(&File) -> io::Result<()>,
    panic_message: &'static str,
) -> io::Result<()> {
    if paths.is_empty() {
        return Ok(());
    }
    let workers = sync_worker_count();
    for chunk in paths.chunks(workers) {
        std::thread::scope(|scope| -> io::Result<()> {
            let handles: Vec<_> = chunk
                .iter()
                .map(|path| {
                    scope.spawn(move || {
                        let file = open_for_sync(path)?;
                        sync(&file)
                    })
                })
                .collect();
            // Join every worker before reporting. Returning early on the first
            // error would leave panicked workers unjoined, and `thread::scope`
            // re-panics for those instead of returning our error.
            let mut first_error = None;
            for handle in handles {
                let outcome = handle
                    .join()
                    .unwrap_or_else(|_| Err(io::Error::other(panic_message)));
                if let Err(error) = outcome {
                    if first_error.is_none() {
                        first_error = Some(error);
                    }
                }
            }
            first_error.map_or(Ok(()), Err)
        })?;
    }
    Ok(())
}

/// Open `path` so that `File::sync_*` can flush it.
///
/// Windows' `FlushFileBuffers` requires a handle with write access, so a
/// read-only handle would make every flush fail there. Elsewhere a read-only
/// handle suffices and also works for directories.
fn open_for_sync(path: &Path) -> io::Result<File> {
    if cfg!(windows) {
        fs::OpenOptions::new().write(true).open(path)
    } else {
        File::open(path)
    }
}

/// Number of concurrent sync workers: the available parallelism clamped to
/// 1..=16, computed once per process.
fn sync_worker_count() -> usize {
    static WORKERS: OnceLock<usize> = OnceLock::new();
    *WORKERS.get_or_init(|| {
        std::thread::available_parallelism()
            .map(usize::from)
            .unwrap_or(1)
            .clamp(1, 16)
    })
}
305
306/// Errors from disk-backed pipeline cache construction / use.
307#[derive(Debug, thiserror::Error)]
308#[non_exhaustive]
309pub enum DiskCacheError {
310    /// Neither `$XDG_CACHE_HOME` nor `$HOME` is set.
311    #[error(
312        "could not resolve a user cache directory  -  set XDG_CACHE_HOME or HOME, or call DiskCache::new() with an explicit path"
313    )]
314    CacheDirUnknown,
315    /// `std::io` failure (mkdir, read, write).
316    #[error("disk-cache I/O error: {0}")]
317    Io(#[from] io::Error),
318}
319
320#[cfg_attr(not(any(test, feature = "remote")), allow(dead_code))]
321pub(super) fn read_verified_cache_blob(mut reader: impl Read) -> Option<Vec<u8>> {
322    read_verified_cache_blob_with_capacity(&mut reader, 0)
323}
324
325fn read_verified_cache_blob_with_capacity(
326    mut reader: impl Read,
327    capacity: usize,
328) -> Option<Vec<u8>> {
329    let max_encoded_capacity = usize::try_from(MAX_ENCODED_PIPELINE_BLOB_BYTES).ok()?;
330    let mut bytes = Vec::with_capacity(capacity.min(max_encoded_capacity));
331    reader
332        .by_ref()
333        .take(MAX_ENCODED_PIPELINE_BLOB_BYTES + 1)
334        .read_to_end(&mut bytes)
335        .ok()?;
336    verify_cache_blob(bytes)
337}
338
339pub(super) fn verify_cache_blob(mut bytes: Vec<u8>) -> Option<Vec<u8>> {
340    let byte_len = u64::try_from(bytes.len()).ok()?;
341    if byte_len > MAX_ENCODED_PIPELINE_BLOB_BYTES || bytes.len() < CHECKSUM_LEN {
342        return None;
343    }
344    let payload_len = bytes.len() - CHECKSUM_LEN;
345    if u64::try_from(payload_len).ok()? > MAX_PIPELINE_BLOB_BYTES {
346        return None;
347    }
348    let (payload, footer) = bytes.split_at(payload_len);
349    let expected = ::blake3::hash(payload);
350    if footer != expected.as_bytes() {
351        return None;
352    }
353    bytes.truncate(payload_len);
354    Some(bytes)
355}
356
/// Append the decimal representation of `value` to `out` without allocating
/// an intermediate `String`.
fn append_u64_decimal(out: &mut String, mut value: u64) {
    // u64::MAX has 20 decimal digits; fill the buffer from the back.
    let mut buf = [0u8; 20];
    let mut start = buf.len();
    loop {
        start -= 1;
        buf[start] = b'0' + (value % 10) as u8;
        value /= 10;
        if value == 0 {
            break;
        }
    }
    out.extend(buf[start..].iter().map(|&digit| char::from(digit)));
}
372
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_cache::test_helpers::tiny_program;

    #[test]
    fn persistent_alias_disk_cache_persists_across_store_reopen() {
        // `TempDir` removes the root on drop, even if an assertion fails.
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let fp = PipelineFingerprint::of(&tiny_program());

        let first = DiskCache::new(temp.path())
            .expect("Fix: test must create disk cache directory; restore temp-dir access.");
        first.put(fp, b"compiled-pipeline".to_vec());
        drop(first);

        let reopened =
            PersistentPipelineCacheStore::new(temp.path()).expect("Fix: disk cache must reopen.");
        assert_eq!(
            reopened.get(&fp).as_deref(),
            Some(&b"compiled-pipeline"[..]),
            "Fix: disk pipeline cache must persist artifacts across process-local store reconstruction"
        );
    }

    #[test]
    fn disk_cache_persists_across_store_reopen() {
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let fp = PipelineFingerprint::of(&tiny_program());
        {
            let cache = DiskCache::new(temp.path())
                .expect("Fix: disk cache test must create isolated cache root");
            cache.put(fp, b"driver-pipeline-blob".to_vec());
        }
        let reopened =
            DiskCache::new(temp.path()).expect("Fix: disk cache must reopen an existing root");
        assert_eq!(
            reopened.get(&fp),
            Some(b"driver-pipeline-blob".to_vec()),
            "Fix: disk PipelineCacheStore must survive process/backend reconstruction"
        );
    }

    #[test]
    fn disk_cache_flush_is_explicit_durability_boundary() {
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let fp = PipelineFingerprint::of(&tiny_program());
        let cache = DiskCache::new(temp.path())
            .expect("Fix: disk cache test must create isolated cache root");
        cache.put(fp, b"driver-pipeline-blob".to_vec());
        assert!(
            !cache.pending_flushes.is_empty(),
            "Fix: DiskCache::put must defer fsync work until explicit flush."
        );
        cache
            .flush()
            .expect("Fix: explicit disk cache flush must fsync pending entries.");
        assert!(
            cache.pending_flushes.is_empty(),
            "Fix: explicit disk cache flush must drain pending entries."
        );
        assert_eq!(
            cache.get(&fp),
            Some(b"driver-pipeline-blob".to_vec()),
            "Fix: explicit flush must preserve the installed cache artifact."
        );
    }

    #[test]
    fn cache_blob_verifier_accepts_checksum_footer() {
        let payload = b"compiled-artifact".to_vec();
        let mut encoded = payload.clone();
        encoded.extend_from_slice(::blake3::hash(&payload).as_bytes());

        assert_eq!(verify_cache_blob(encoded), Some(payload));
    }

    #[test]
    fn cache_blob_verifier_rejects_corrupted_footer() {
        let payload = b"compiled-artifact".to_vec();
        let mut encoded = payload;
        encoded.extend_from_slice(&[0xA5; CHECKSUM_LEN]);

        assert!(
            verify_cache_blob(encoded).is_none(),
            "Fix: disk and remote cache readers must reject artifacts whose checksum footer does not match"
        );
    }

    #[test]
    fn cache_blob_reader_rejects_oversized_encoded_blob() {
        let oversized = std::io::repeat(0).take(MAX_ENCODED_PIPELINE_BLOB_BYTES + 1);

        assert!(
            read_verified_cache_blob(oversized).is_none(),
            "Fix: disk and remote cache readers must cap encoded blob bytes before allocation"
        );
    }

    #[test]
    fn disk_cache_durability_report_tracks_pending_flush_boundary() {
        let temp = tempfile::tempdir().expect("Fix: create temp disk cache root");
        let fp = PipelineFingerprint::of(&tiny_program());
        let cache = DiskCache::new(temp.path()).expect("Fix: create disk cache");

        assert_eq!(
            cache.durability_report(),
            DiskCacheDurabilityReport {
                pending_flushes: 0,
                durable: true,
            }
        );

        cache.put(fp, b"driver-pipeline-blob".to_vec());
        let after_put = cache.durability_report();
        assert_eq!(after_put.pending_flushes, 1);
        assert!(
            !after_put.durable,
            "Fix: installed artifacts must not be reported durable before explicit flush"
        );

        cache
            .flush()
            .expect("Fix: disk cache flush must fsync pending file and parent dir");
        assert_eq!(
            cache.durability_report(),
            DiskCacheDurabilityReport {
                pending_flushes: 0,
                durable: true,
            }
        );
    }

    #[test]
    fn disk_cache_get_rejects_truncated_artifact() {
        let temp = tempfile::tempdir().expect("Fix: create temp disk cache root");
        let fp = PipelineFingerprint::of(&tiny_program());
        let cache = DiskCache::new(temp.path()).expect("Fix: create disk cache");
        cache.put(fp, b"driver-pipeline-blob".to_vec());

        // Simulate a torn write by chopping the last footer byte.
        let path = cache.path_for(&fp);
        let encoded = std::fs::read(&path).expect("Fix: installed artifact must be readable");
        std::fs::write(&path, &encoded[..encoded.len() - 1])
            .expect("Fix: test must be able to truncate the installed artifact");

        assert_eq!(
            cache.get(&fp),
            None,
            "Fix: disk cache must treat a truncated artifact as a miss"
        );
    }

    #[cfg(unix)]
    #[test]
    fn disk_cache_get_rejects_symlinked_artifact() {
        let temp = tempfile::tempdir().expect("Fix: create temp disk cache root");
        let fp = PipelineFingerprint::of(&tiny_program());
        let cache = DiskCache::new(temp.path()).expect("Fix: create disk cache");

        // A well-formed blob reachable only through a symlink must not be served.
        let payload = b"symlinked-pipeline-blob".to_vec();
        let mut encoded = payload.clone();
        encoded.extend_from_slice(::blake3::hash(&payload).as_bytes());
        let target = temp.path().join("elsewhere.bin");
        std::fs::write(&target, &encoded).expect("Fix: test must write the symlink target");
        std::os::unix::fs::symlink(&target, cache.path_for(&fp))
            .expect("Fix: test must be able to create a symlink in the cache root");

        assert_eq!(
            cache.get(&fp),
            None,
            "Fix: FINDING-CACHE-1 disk cache must refuse to read artifacts through symlinks"
        );
    }
}