Skip to main content

vyre_runtime/pipeline_cache/
disk.rs

//! Disk-backed pipeline cache. Writes one file per fingerprint under
//! `<root>/<hex>.bin` with a blake3 footer that the reader verifies
//! before returning the payload. This catches torn writes and bit-rot;
//! because the footer is an unkeyed hash, it does not stop a writer who
//! can simply recompute it.
5
6use std::fs::{self, File};
7use std::io::Read;
8use std::io::{self, Write};
9use std::path::{Path, PathBuf};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::OnceLock;
12
13use dashmap::DashMap;
14
15use super::fingerprint::PipelineFingerprint;
16use super::metrics::{PipelineCacheCounters, PipelineCacheMetrics};
17use super::store::PipelineCacheStore;
18
/// Disk-backed pipeline cache. Writes one file per fingerprint
/// under `<root>/<hex>.bin`. Reads are stateless; writes are
/// `write + rename` for atomicity, and fsync is deferred until an
/// explicit `flush`. There is no eviction policy today (the user
/// decides), so the footprint is bounded by
/// sum(artifact_size × unique_canonical_programs).
#[derive(Debug)]
pub struct DiskCache {
    /// Directory holding the `<hex>.bin` artifacts; created by `new`.
    root: PathBuf,
    /// Installed artifact paths whose data has not been fsynced yet.
    /// `put` adds to it, and `flush` drains it (re-queueing on failure).
    pending_flushes: DashMap<PathBuf, ()>,
    /// Lookup/hit/miss/put/flush counters surfaced through `metrics`.
    metrics: PipelineCacheCounters,
}
30
31/// Persistent process-crossing pipeline-cache store.
32///
33/// This is the default disk-backed store for callers that need compiled
34/// pipeline artifacts to survive process restarts.
35static DISK_CACHE_TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
36
37/// Alias for the disk-backed pipeline cache store.
38pub type PersistentPipelineCacheStore = DiskCache;
39
40// On-disk layout:
41//   <payload bytes..>  <32-byte blake3 footer>
42// Total file size = payload.len() + 32. Get verifies the footer
43// before returning the payload; mismatches or truncated files
44// return None so the caller recompiles. Covers torn writes +
45// bit-rot + deliberate tampering.
46pub(super) const CHECKSUM_LEN: usize = 32;
47pub(super) const CHECKSUM_LEN_U64: u64 = 32;
48pub(super) const MAX_PIPELINE_BLOB_BYTES: u64 = 64 * 1024 * 1024;
49pub(super) const MAX_ENCODED_PIPELINE_BLOB_BYTES: u64 = MAX_PIPELINE_BLOB_BYTES + CHECKSUM_LEN_U64;
50
51impl DiskCache {
52    /// Construct a cache rooted at `root`. Creates the directory if
53    /// it doesn't exist.
54    ///
55    /// # Errors
56    ///
57    /// Returns [`DiskCacheError::Io`] when the directory can't be
58    /// created.
59    pub fn new(root: impl Into<PathBuf>) -> Result<Self, DiskCacheError> {
60        let root = root.into();
61        fs::create_dir_all(&root).map_err(DiskCacheError::Io)?;
62        Ok(Self {
63            root,
64            pending_flushes: DashMap::new(),
65            metrics: PipelineCacheCounters::default(),
66        })
67    }
68
69    /// Construct a cache rooted at `~/.cache/vyre/pipelines/` (or
70    /// `$XDG_CACHE_HOME/vyre/pipelines/` if set).
71    ///
72    /// # Errors
73    ///
74    /// Returns [`DiskCacheError::CacheDirUnknown`] when neither env
75    /// var resolves, or [`DiskCacheError::Io`] on mkdir failure.
76    pub fn in_user_cache() -> Result<Self, DiskCacheError> {
77        let base = std::env::var_os("XDG_CACHE_HOME")
78            .map(PathBuf::from)
79            .or_else(|| std::env::var_os("HOME").map(|h| Path::new(&h).join(".cache")))
80            .ok_or(DiskCacheError::CacheDirUnknown)?;
81        Self::new(base.join("vyre").join("pipelines"))
82    }
83
84    /// Root directory this cache operates on.
85    #[must_use]
86    pub fn root(&self) -> &Path {
87        &self.root
88    }
89
90    fn path_for(&self, fp: &PipelineFingerprint) -> PathBuf {
91        self.root.join(cache_file_name(fp))
92    }
93}
94
95fn cache_file_name(fp: &PipelineFingerprint) -> String {
96    let mut file_name = String::with_capacity(68);
97    fp.push_hex(&mut file_name);
98    file_name.push_str(".bin");
99    file_name
100}
101
102impl PipelineCacheStore for DiskCache {
103    fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
104        self.metrics.lookups.fetch_add(1, Ordering::Relaxed);
105        let path = self.path_for(fp);
106        // FINDING-CACHE-1: reject symlinks before reading. `symlink_metadata`
107        // does NOT follow the symlink; regular-file check is strict.
108        let Some(meta) = fs::symlink_metadata(&path).ok() else {
109            self.metrics.misses.fetch_add(1, Ordering::Relaxed);
110            return None;
111        };
112        if !meta.file_type().is_file() {
113            self.metrics.misses.fetch_add(1, Ordering::Relaxed);
114            return None;
115        }
116        if meta.len() > MAX_ENCODED_PIPELINE_BLOB_BYTES {
117            self.metrics.misses.fetch_add(1, Ordering::Relaxed);
118            return None;
119        }
120        let Some(file) = File::open(&path).ok() else {
121            self.metrics.misses.fetch_add(1, Ordering::Relaxed);
122            return None;
123        };
124        let capacity = usize::try_from(meta.len()).ok()?;
125        let result = read_verified_cache_blob_with_capacity(file, capacity);
126        if result.is_some() {
127            self.metrics.hits.fetch_add(1, Ordering::Relaxed);
128        } else {
129            self.metrics.misses.fetch_add(1, Ordering::Relaxed);
130        }
131        result
132    }
133
134    fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
135        let tmp_id = DISK_CACHE_TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
136        let mut tmp_name = String::with_capacity(85);
137        tmp_name.push('.');
138        fp.push_hex(&mut tmp_name);
139        tmp_name.push('-');
140        append_u64_decimal(&mut tmp_name, tmp_id);
141        tmp_name.push_str(".bin.tmp");
142        let tmp_path = self.root.join(&tmp_name);
143
144        let mut final_name = String::with_capacity(68);
145        fp.push_hex(&mut final_name);
146        final_name.push_str(".bin");
147        let final_path = self.root.join(&final_name);
148
149        // Write payload + blake3 footer in one shot and install by rename so
150        // readers see either the prior complete file or the new complete file.
151        // Durability is batched through `flush`; fsyncing every insertion
152        // turns steady-state cache population into a storage latency bottleneck.
153        let write_rename = || -> io::Result<()> {
154            let checksum = ::blake3::hash(&artifact);
155            let mut f = File::create(&tmp_path)?;
156            f.write_all(&artifact)?;
157            f.write_all(checksum.as_bytes())?;
158            drop(f);
159            // FINDING-CACHE-1: if the final path is a symlink, unlink it
160            // first so rename replaces the symlink (not its target).
161            if let Ok(meta) = fs::symlink_metadata(&final_path) {
162                if meta.file_type().is_symlink() {
163                    fs::remove_file(&final_path)?;
164                }
165            }
166            fs::rename(&tmp_path, &final_path)?;
167            self.pending_flushes.insert(final_path, ());
168            Ok(())
169        };
170        if write_rename().is_err() {
171            self.metrics.rejected_puts.fetch_add(1, Ordering::Relaxed);
172            // Best-effort; caller falls back to recompile. Clean up
173            // the tmp file so it doesn't accumulate on failure.
174            match fs::remove_file(&tmp_path) {
175                Ok(()) => {}
176                Err(error) if error.kind() == io::ErrorKind::NotFound => {}
177                Err(error) => tracing::warn!(
178                    tmp_path = %tmp_path.display(),
179                    error = %error,
180                    "failed to remove temporary disk-cache artifact after rejected put"
181                ),
182            }
183        } else {
184            self.metrics.puts.fetch_add(1, Ordering::Relaxed);
185        }
186    }
187
188    fn flush(&self) -> io::Result<()> {
189        self.metrics.flushes.fetch_add(1, Ordering::Relaxed);
190        let paths: Vec<PathBuf> = self
191            .pending_flushes
192            .iter()
193            .map(|entry| entry.key().clone())
194            .collect();
195        self.pending_flushes.clear();
196        if let Err(error) = flush_paths(&paths) {
197            self.metrics.flush_errors.fetch_add(1, Ordering::Relaxed);
198            for path in paths {
199                self.pending_flushes.insert(path, ());
200            }
201            return Err(error);
202        }
203        Ok(())
204    }
205
206    fn metrics(&self) -> PipelineCacheMetrics {
207        self.metrics.snapshot(0, 0)
208    }
209}
210
211fn flush_paths(paths: &[PathBuf]) -> io::Result<()> {
212    let mut parents = Vec::with_capacity(paths.len());
213    sync_paths_bounded(
214        paths,
215        File::sync_data,
216        "pipeline cache file sync worker panicked",
217    )?;
218    for path in paths {
219        if let Some(parent) = path.parent() {
220            parents.push(parent.to_path_buf());
221        }
222    }
223    parents.sort();
224    parents.dedup();
225    sync_parent_dirs(&parents)?;
226    Ok(())
227}
228
229#[cfg(unix)]
230fn sync_parent_dirs(parents: &[PathBuf]) -> io::Result<()> {
231    sync_paths_bounded(
232        parents,
233        File::sync_all,
234        "pipeline cache directory sync worker panicked",
235    )
236}
237
238#[cfg(not(unix))]
239fn sync_parent_dirs(_parents: &[PathBuf]) -> io::Result<()> {
240    Ok(())
241}
242
/// Applies `sync` to every path in `paths` using scoped worker threads,
/// running at most [`sync_worker_count`] of them per round.
///
/// Paths that no longer exist are skipped, because there is nothing left to
/// make durable. Failing on them would otherwise keep an entry deleted out
/// from under the cache pending forever and make every later flush fail.
///
/// # Errors
///
/// Returns the first open/sync error observed while joining a round's
/// workers, or an error carrying `panic_message` if a worker panicked.
fn sync_paths_bounded(
    paths: &[PathBuf],
    sync: fn(&File) -> io::Result<()>,
    panic_message: &'static str,
) -> io::Result<()> {
    if paths.is_empty() {
        return Ok(());
    }
    let workers = sync_worker_count();
    for chunk in paths.chunks(workers) {
        std::thread::scope(|scope| {
            let handles: Vec<_> = chunk
                .iter()
                .map(|path| scope.spawn(move || open_and_sync(path, sync)))
                .collect();
            for handle in handles {
                handle
                    .join()
                    .map_err(|_| io::Error::other(panic_message))??;
            }
            Ok::<(), io::Error>(())
        })?;
    }
    Ok(())
}

/// Opens `path` and applies `sync` to it; a missing path counts as success.
fn open_and_sync(path: &Path, sync: fn(&File) -> io::Result<()>) -> io::Result<()> {
    let mut options = fs::OpenOptions::new();
    options.read(true);
    if cfg!(windows) {
        // `File::sync_data`/`sync_all` map to `FlushFileBuffers` on Windows,
        // which requires a handle with write access. A read-only handle makes
        // every flush fail. (Directories are never synced off Unix.)
        options.write(true);
    }
    match options.open(path) {
        Ok(file) => sync(&file),
        Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(()),
        Err(error) => Err(error),
    }
}

/// Number of concurrent sync workers: available parallelism clamped to
/// `1..=16`, computed once per process.
fn sync_worker_count() -> usize {
    static WORKERS: OnceLock<usize> = OnceLock::new();
    *WORKERS.get_or_init(|| {
        std::thread::available_parallelism()
            .map(usize::from)
            .unwrap_or(1)
            .clamp(1, 16)
    })
}
281
/// Errors from disk-backed pipeline cache construction / use.
///
/// In this file only the constructors (`DiskCache::new` and
/// `DiskCache::in_user_cache`) return this type. `get`, `put` and `flush`
/// report failure through `None`, the metrics counters, or a plain
/// `io::Error`.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum DiskCacheError {
    /// No user cache directory could be derived from `$XDG_CACHE_HOME`
    /// or `$HOME`.
    #[error(
        "could not resolve a user cache directory  -  set XDG_CACHE_HOME or HOME, or call DiskCache::new() with an explicit path"
    )]
    CacheDirUnknown,
    /// `std::io` failure. In this file it comes from creating the cache
    /// root directory; `#[from]` also lets `?` convert any `io::Error`.
    #[error("disk-cache I/O error: {0}")]
    Io(#[from] io::Error),
}
295
296#[cfg_attr(not(any(test, feature = "remote")), allow(dead_code))]
297pub(super) fn read_verified_cache_blob(mut reader: impl Read) -> Option<Vec<u8>> {
298    read_verified_cache_blob_with_capacity(&mut reader, 0)
299}
300
301fn read_verified_cache_blob_with_capacity(
302    mut reader: impl Read,
303    capacity: usize,
304) -> Option<Vec<u8>> {
305    let max_encoded_capacity = usize::try_from(MAX_ENCODED_PIPELINE_BLOB_BYTES).ok()?;
306    let mut bytes = Vec::with_capacity(capacity.min(max_encoded_capacity));
307    reader
308        .by_ref()
309        .take(MAX_ENCODED_PIPELINE_BLOB_BYTES + 1)
310        .read_to_end(&mut bytes)
311        .ok()?;
312    verify_cache_blob(bytes)
313}
314
315pub(super) fn verify_cache_blob(mut bytes: Vec<u8>) -> Option<Vec<u8>> {
316    let byte_len = u64::try_from(bytes.len()).ok()?;
317    if byte_len > MAX_ENCODED_PIPELINE_BLOB_BYTES || bytes.len() < CHECKSUM_LEN {
318        return None;
319    }
320    let payload_len = bytes.len() - CHECKSUM_LEN;
321    if u64::try_from(payload_len).ok()? > MAX_PIPELINE_BLOB_BYTES {
322        return None;
323    }
324    let (payload, footer) = bytes.split_at(payload_len);
325    let expected = ::blake3::hash(payload);
326    if footer != expected.as_bytes() {
327        return None;
328    }
329    bytes.truncate(payload_len);
330    Some(bytes)
331}
332
/// Appends the base-10 representation of `value` to `out` without going
/// through the formatting machinery or allocating a temporary string.
fn append_u64_decimal(out: &mut String, mut value: u64) {
    // u64::MAX has 20 decimal digits, so the scratch buffer always suffices.
    let mut scratch = [b'0'; 20];
    let mut start = scratch.len();
    // Fill from the right so the digits end up in reading order.
    loop {
        start -= 1;
        scratch[start] = b'0' + (value % 10) as u8;
        value /= 10;
        if value == 0 {
            break;
        }
    }
    out.extend(scratch[start..].iter().map(|&digit| char::from(digit)));
}
348
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_cache::test_helpers::tiny_program;

    /// Encodes `payload` the way `put` lays it out on disk: payload bytes
    /// followed by their blake3 footer.
    fn encode(payload: &[u8]) -> Vec<u8> {
        let mut encoded = payload.to_vec();
        encoded.extend_from_slice(::blake3::hash(payload).as_bytes());
        encoded
    }

    /// Writes `bytes` directly at the cache path for `fp`, bypassing `put`.
    fn plant_raw_entry(root: &Path, fp: &PipelineFingerprint, bytes: &[u8]) {
        std::fs::write(root.join(cache_file_name(fp)), bytes)
            .expect("Fix: test must be able to write a raw cache entry");
    }

    #[test]
    fn persistent_alias_disk_cache_persists_across_store_reopen() {
        // `TempDir` removes the root even if an assertion below panics; the
        // explicit `close` at the end still surfaces cleanup failures.
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let fp = PipelineFingerprint::of(&tiny_program());

        let first = DiskCache::new(temp.path())
            .expect("Fix: test must create disk cache directory; restore temp-dir access.");
        first.put(fp, b"compiled-pipeline".to_vec());
        drop(first);

        let reopened = PersistentPipelineCacheStore::new(temp.path())
            .expect("Fix: disk cache must reopen.");
        assert_eq!(
            reopened.get(&fp).as_deref(),
            Some(&b"compiled-pipeline"[..]),
            "Fix: disk pipeline cache must persist artifacts across process-local store reconstruction"
        );

        temp.close()
            .expect("Fix: disk cache test root cleanup must succeed");
    }

    #[test]
    fn disk_cache_persists_across_store_reopen() {
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let fp = PipelineFingerprint::of(&tiny_program());
        {
            let cache = DiskCache::new(temp.path())
                .expect("Fix: disk cache test must create isolated cache root");
            cache.put(fp, b"driver-pipeline-blob".to_vec());
        }
        let reopened =
            DiskCache::new(temp.path()).expect("Fix: disk cache must reopen an existing root");
        assert_eq!(
            reopened.get(&fp),
            Some(b"driver-pipeline-blob".to_vec()),
            "Fix: disk PipelineCacheStore must survive process/backend reconstruction"
        );
    }

    #[test]
    fn disk_cache_flush_is_explicit_durability_boundary() {
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let fp = PipelineFingerprint::of(&tiny_program());
        let cache = DiskCache::new(temp.path())
            .expect("Fix: disk cache test must create isolated cache root");
        cache.put(fp, b"driver-pipeline-blob".to_vec());
        assert!(
            !cache.pending_flushes.is_empty(),
            "Fix: DiskCache::put must defer fsync work until explicit flush."
        );
        cache
            .flush()
            .expect("Fix: explicit disk cache flush must fsync pending entries.");
        assert!(
            cache.pending_flushes.is_empty(),
            "Fix: explicit disk cache flush must drain pending entries."
        );
        assert_eq!(
            cache.get(&fp),
            Some(b"driver-pipeline-blob".to_vec()),
            "Fix: explicit flush must preserve the installed cache artifact."
        );
    }

    #[test]
    fn disk_cache_flush_without_pending_entries_succeeds() {
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let cache = DiskCache::new(temp.path())
            .expect("Fix: disk cache test must create isolated cache root");
        cache
            .flush()
            .expect("Fix: flushing a disk cache with nothing pending must succeed.");
        assert!(
            cache.pending_flushes.is_empty(),
            "Fix: an empty flush must leave nothing pending."
        );
    }

    #[test]
    fn disk_cache_get_rejects_corrupted_or_truncated_entries() {
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let fp = PipelineFingerprint::of(&tiny_program());
        let cache = DiskCache::new(temp.path())
            .expect("Fix: disk cache test must create isolated cache root");

        let mut corrupted = encode(b"driver-pipeline-blob");
        corrupted[0] ^= 0xFF;
        plant_raw_entry(temp.path(), &fp, &corrupted);
        assert!(
            cache.get(&fp).is_none(),
            "Fix: DiskCache::get must reject entries whose payload no longer matches the footer"
        );

        plant_raw_entry(temp.path(), &fp, &[0u8; CHECKSUM_LEN - 1]);
        assert!(
            cache.get(&fp).is_none(),
            "Fix: DiskCache::get must reject entries shorter than the checksum footer"
        );
    }

    #[cfg(unix)]
    #[test]
    fn disk_cache_get_refuses_symlinked_entry() {
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let outside =
            tempfile::TempDir::new().expect("Fix: second tempdir required for symlink target");
        let fp = PipelineFingerprint::of(&tiny_program());
        let cache = DiskCache::new(temp.path())
            .expect("Fix: disk cache test must create isolated cache root");

        let target = outside.path().join("target.bin");
        std::fs::write(&target, encode(b"smuggled-pipeline"))
            .expect("Fix: test must write the symlink target");
        std::os::unix::fs::symlink(&target, temp.path().join(cache_file_name(&fp)))
            .expect("Fix: test must plant a symlink at the cache path");

        assert!(
            cache.get(&fp).is_none(),
            "Fix: DiskCache::get must not follow symlinks planted at cache paths"
        );
    }

    #[test]
    fn cache_blob_verifier_accepts_checksum_footer() {
        let payload = b"compiled-artifact".to_vec();
        let mut encoded = payload.clone();
        encoded.extend_from_slice(::blake3::hash(&payload).as_bytes());

        assert_eq!(verify_cache_blob(encoded), Some(payload));
    }

    #[test]
    fn cache_blob_verifier_rejects_corrupted_footer() {
        let payload = b"compiled-artifact".to_vec();
        let mut encoded = payload;
        encoded.extend_from_slice(&[0xA5; CHECKSUM_LEN]);

        assert!(
            verify_cache_blob(encoded).is_none(),
            "Fix: disk and remote cache readers must reject artifacts whose checksum footer does not match"
        );
    }

    #[test]
    fn cache_blob_reader_rejects_oversized_encoded_blob() {
        let oversized = std::io::repeat(0).take(MAX_ENCODED_PIPELINE_BLOB_BYTES + 1);

        assert!(
            read_verified_cache_blob(oversized).is_none(),
            "Fix: disk and remote cache readers must cap encoded blob bytes before allocation"
        );
    }
}