vyre_runtime/pipeline_cache/
disk.rs1use std::fs::{self, File};
7use std::io::Read;
8use std::io::{self, Write};
9use std::path::{Path, PathBuf};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::OnceLock;
12
13use dashmap::DashMap;
14
15use super::fingerprint::PipelineFingerprint;
16use super::metrics::{PipelineCacheCounters, PipelineCacheMetrics};
17use super::store::PipelineCacheStore;
18
19#[derive(Debug)]
25pub struct DiskCache {
26 root: PathBuf,
27 pending_flushes: DashMap<PathBuf, ()>,
28 metrics: PipelineCacheCounters,
29}
30
31static DISK_CACHE_TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
36
37pub type PersistentPipelineCacheStore = DiskCache;
39
40pub(super) const CHECKSUM_LEN: usize = 32;
47pub(super) const CHECKSUM_LEN_U64: u64 = 32;
48pub(super) const MAX_PIPELINE_BLOB_BYTES: u64 = 64 * 1024 * 1024;
49pub(super) const MAX_ENCODED_PIPELINE_BLOB_BYTES: u64 = MAX_PIPELINE_BLOB_BYTES + CHECKSUM_LEN_U64;
50
51impl DiskCache {
52 pub fn new(root: impl Into<PathBuf>) -> Result<Self, DiskCacheError> {
60 let root = root.into();
61 fs::create_dir_all(&root).map_err(DiskCacheError::Io)?;
62 Ok(Self {
63 root,
64 pending_flushes: DashMap::new(),
65 metrics: PipelineCacheCounters::default(),
66 })
67 }
68
69 pub fn in_user_cache() -> Result<Self, DiskCacheError> {
77 let base = std::env::var_os("XDG_CACHE_HOME")
78 .map(PathBuf::from)
79 .or_else(|| std::env::var_os("HOME").map(|h| Path::new(&h).join(".cache")))
80 .ok_or(DiskCacheError::CacheDirUnknown)?;
81 Self::new(base.join("vyre").join("pipelines"))
82 }
83
84 #[must_use]
86 pub fn root(&self) -> &Path {
87 &self.root
88 }
89
90 fn path_for(&self, fp: &PipelineFingerprint) -> PathBuf {
91 self.root.join(cache_file_name(fp))
92 }
93}
94
95fn cache_file_name(fp: &PipelineFingerprint) -> String {
96 let mut file_name = String::with_capacity(68);
97 fp.push_hex(&mut file_name);
98 file_name.push_str(".bin");
99 file_name
100}
101
102impl PipelineCacheStore for DiskCache {
103 fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
104 self.metrics.lookups.fetch_add(1, Ordering::Relaxed);
105 let path = self.path_for(fp);
106 let Some(meta) = fs::symlink_metadata(&path).ok() else {
109 self.metrics.misses.fetch_add(1, Ordering::Relaxed);
110 return None;
111 };
112 if !meta.file_type().is_file() {
113 self.metrics.misses.fetch_add(1, Ordering::Relaxed);
114 return None;
115 }
116 if meta.len() > MAX_ENCODED_PIPELINE_BLOB_BYTES {
117 self.metrics.misses.fetch_add(1, Ordering::Relaxed);
118 return None;
119 }
120 let Some(file) = File::open(&path).ok() else {
121 self.metrics.misses.fetch_add(1, Ordering::Relaxed);
122 return None;
123 };
124 let capacity = usize::try_from(meta.len()).ok()?;
125 let result = read_verified_cache_blob_with_capacity(file, capacity);
126 if result.is_some() {
127 self.metrics.hits.fetch_add(1, Ordering::Relaxed);
128 } else {
129 self.metrics.misses.fetch_add(1, Ordering::Relaxed);
130 }
131 result
132 }
133
134 fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
135 let tmp_id = DISK_CACHE_TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
136 let mut tmp_name = String::with_capacity(85);
137 tmp_name.push('.');
138 fp.push_hex(&mut tmp_name);
139 tmp_name.push('-');
140 append_u64_decimal(&mut tmp_name, tmp_id);
141 tmp_name.push_str(".bin.tmp");
142 let tmp_path = self.root.join(&tmp_name);
143
144 let mut final_name = String::with_capacity(68);
145 fp.push_hex(&mut final_name);
146 final_name.push_str(".bin");
147 let final_path = self.root.join(&final_name);
148
149 let write_rename = || -> io::Result<()> {
154 let checksum = ::blake3::hash(&artifact);
155 let mut f = File::create(&tmp_path)?;
156 f.write_all(&artifact)?;
157 f.write_all(checksum.as_bytes())?;
158 drop(f);
159 if let Ok(meta) = fs::symlink_metadata(&final_path) {
162 if meta.file_type().is_symlink() {
163 fs::remove_file(&final_path)?;
164 }
165 }
166 fs::rename(&tmp_path, &final_path)?;
167 self.pending_flushes.insert(final_path, ());
168 Ok(())
169 };
170 if write_rename().is_err() {
171 self.metrics.rejected_puts.fetch_add(1, Ordering::Relaxed);
172 match fs::remove_file(&tmp_path) {
175 Ok(()) => {}
176 Err(error) if error.kind() == io::ErrorKind::NotFound => {}
177 Err(error) => tracing::warn!(
178 tmp_path = %tmp_path.display(),
179 error = %error,
180 "failed to remove temporary disk-cache artifact after rejected put"
181 ),
182 }
183 } else {
184 self.metrics.puts.fetch_add(1, Ordering::Relaxed);
185 }
186 }
187
188 fn flush(&self) -> io::Result<()> {
189 self.metrics.flushes.fetch_add(1, Ordering::Relaxed);
190 let paths: Vec<PathBuf> = self
191 .pending_flushes
192 .iter()
193 .map(|entry| entry.key().clone())
194 .collect();
195 self.pending_flushes.clear();
196 if let Err(error) = flush_paths(&paths) {
197 self.metrics.flush_errors.fetch_add(1, Ordering::Relaxed);
198 for path in paths {
199 self.pending_flushes.insert(path, ());
200 }
201 return Err(error);
202 }
203 Ok(())
204 }
205
206 fn metrics(&self) -> PipelineCacheMetrics {
207 self.metrics.snapshot(0, 0)
208 }
209}
210
211fn flush_paths(paths: &[PathBuf]) -> io::Result<()> {
212 let mut parents = Vec::with_capacity(paths.len());
213 sync_paths_bounded(
214 paths,
215 File::sync_data,
216 "pipeline cache file sync worker panicked",
217 )?;
218 for path in paths {
219 if let Some(parent) = path.parent() {
220 parents.push(parent.to_path_buf());
221 }
222 }
223 parents.sort();
224 parents.dedup();
225 sync_parent_dirs(&parents)?;
226 Ok(())
227}
228
229#[cfg(unix)]
230fn sync_parent_dirs(parents: &[PathBuf]) -> io::Result<()> {
231 sync_paths_bounded(
232 parents,
233 File::sync_all,
234 "pipeline cache directory sync worker panicked",
235 )
236}
237
238#[cfg(not(unix))]
239fn sync_parent_dirs(_parents: &[PathBuf]) -> io::Result<()> {
240 Ok(())
241}
242
243fn sync_paths_bounded(
244 paths: &[PathBuf],
245 sync: fn(&File) -> io::Result<()>,
246 panic_message: &'static str,
247) -> io::Result<()> {
248 if paths.is_empty() {
249 return Ok(());
250 }
251 let workers = sync_worker_count();
252 for chunk in paths.chunks(workers) {
253 std::thread::scope(|scope| {
254 let mut handles = Vec::with_capacity(chunk.len());
255 for path in chunk {
256 handles.push(scope.spawn(move || {
257 let file = File::open(path)?;
258 sync(&file)
259 }));
260 }
261 for handle in handles {
262 handle
263 .join()
264 .map_err(|_| io::Error::other(panic_message))??;
265 }
266 Ok::<(), io::Error>(())
267 })?;
268 }
269 Ok(())
270}
271
/// Number of sync threads to run at once: the available parallelism limited to the
/// range 1..=16, or 1 when it cannot be queried. Computed once and then cached.
fn sync_worker_count() -> usize {
    static WORKERS: OnceLock<usize> = OnceLock::new();
    *WORKERS.get_or_init(|| match std::thread::available_parallelism() {
        Ok(parallelism) => parallelism.get().clamp(1, 16),
        Err(_) => 1,
    })
}
281
282#[derive(Debug, thiserror::Error)]
284#[non_exhaustive]
285pub enum DiskCacheError {
286 #[error(
288 "could not resolve a user cache directory - set XDG_CACHE_HOME or HOME, or call DiskCache::new() with an explicit path"
289 )]
290 CacheDirUnknown,
291 #[error("disk-cache I/O error: {0}")]
293 Io(#[from] io::Error),
294}
295
296#[cfg_attr(not(any(test, feature = "remote")), allow(dead_code))]
297pub(super) fn read_verified_cache_blob(mut reader: impl Read) -> Option<Vec<u8>> {
298 read_verified_cache_blob_with_capacity(&mut reader, 0)
299}
300
301fn read_verified_cache_blob_with_capacity(
302 mut reader: impl Read,
303 capacity: usize,
304) -> Option<Vec<u8>> {
305 let max_encoded_capacity = usize::try_from(MAX_ENCODED_PIPELINE_BLOB_BYTES).ok()?;
306 let mut bytes = Vec::with_capacity(capacity.min(max_encoded_capacity));
307 reader
308 .by_ref()
309 .take(MAX_ENCODED_PIPELINE_BLOB_BYTES + 1)
310 .read_to_end(&mut bytes)
311 .ok()?;
312 verify_cache_blob(bytes)
313}
314
315pub(super) fn verify_cache_blob(mut bytes: Vec<u8>) -> Option<Vec<u8>> {
316 let byte_len = u64::try_from(bytes.len()).ok()?;
317 if byte_len > MAX_ENCODED_PIPELINE_BLOB_BYTES || bytes.len() < CHECKSUM_LEN {
318 return None;
319 }
320 let payload_len = bytes.len() - CHECKSUM_LEN;
321 if u64::try_from(payload_len).ok()? > MAX_PIPELINE_BLOB_BYTES {
322 return None;
323 }
324 let (payload, footer) = bytes.split_at(payload_len);
325 let expected = ::blake3::hash(payload);
326 if footer != expected.as_bytes() {
327 return None;
328 }
329 bytes.truncate(payload_len);
330 Some(bytes)
331}
332
/// Appends the decimal representation of `value` to `out`.
fn append_u64_decimal(out: &mut String, mut value: u64) {
    // `u64::MAX` has 20 decimal digits, so the buffer always suffices. It is filled from
    // the back so the digits end up in reading order.
    let mut buffer = [b'0'; 20];
    let mut start = buffer.len();
    loop {
        start -= 1;
        buffer[start] = b'0' + (value % 10) as u8;
        value /= 10;
        if value == 0 {
            break;
        }
    }
    out.extend(buffer[start..].iter().map(|&digit| char::from(digit)));
}
348
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_cache::test_helpers::{tiny_program, unique_u64};

    /// An artifact written through `DiskCache` is readable through the
    /// `PersistentPipelineCacheStore` alias after the first store is dropped.
    #[test]
    fn persistent_alias_disk_cache_persists_across_store_reopen() {
        let root = std::env::temp_dir().join(format!(
            "vyre-pipeline-cache-test-{}-{}",
            std::process::id(),
            unique_u64()
        ));
        let fp = PipelineFingerprint::of(&tiny_program());

        let first = DiskCache::new(&root)
            .expect("Fix: test must create disk cache directory; restore temp-dir access.");
        first.put(fp, b"compiled-pipeline".to_vec());
        drop(first);

        let reopened =
            PersistentPipelineCacheStore::new(&root).expect("Fix: disk cache must reopen.");
        let fetched = reopened.get(&fp);

        // Clean up before asserting so a failed assertion does not leak the directory.
        std::fs::remove_dir_all(&root).expect("Fix: disk cache test root cleanup must succeed");

        assert_eq!(
            fetched.as_deref(),
            Some(&b"compiled-pipeline"[..]),
            "Fix: disk pipeline cache must persist artifacts across process-local store reconstruction"
        );
    }

    /// Same round trip as above, using `tempfile` for the cache root.
    #[test]
    fn disk_cache_persists_across_store_reopen() {
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let fp = PipelineFingerprint::of(&tiny_program());
        {
            let cache = DiskCache::new(temp.path())
                .expect("Fix: disk cache test must create isolated cache root");
            cache.put(fp, b"driver-pipeline-blob".to_vec());
        }
        let reopened =
            DiskCache::new(temp.path()).expect("Fix: disk cache must reopen an existing root");
        assert_eq!(
            reopened.get(&fp),
            Some(b"driver-pipeline-blob".to_vec()),
            "Fix: disk PipelineCacheStore must survive process/backend reconstruction"
        );
    }

    /// `put` only queues the entry for syncing; `flush` drains the queue and leaves the
    /// entry readable.
    #[test]
    fn disk_cache_flush_is_explicit_durability_boundary() {
        let temp = tempfile::TempDir::new().expect("Fix: tempdir required for disk cache test");
        let fp = PipelineFingerprint::of(&tiny_program());
        let cache = DiskCache::new(temp.path())
            .expect("Fix: disk cache test must create isolated cache root");
        cache.put(fp, b"driver-pipeline-blob".to_vec());
        assert!(
            !cache.pending_flushes.is_empty(),
            "Fix: DiskCache::put must defer fsync work until explicit flush."
        );
        cache
            .flush()
            .expect("Fix: explicit disk cache flush must fsync pending entries.");
        assert!(
            cache.pending_flushes.is_empty(),
            "Fix: explicit disk cache flush must drain pending entries."
        );
        assert_eq!(
            cache.get(&fp),
            Some(b"driver-pipeline-blob".to_vec()),
            "Fix: explicit flush must preserve the installed cache artifact."
        );
    }

    /// A payload followed by its own BLAKE3 hash verifies and yields the payload.
    #[test]
    fn cache_blob_verifier_accepts_checksum_footer() {
        let payload = b"compiled-artifact".to_vec();
        let mut encoded = payload.clone();
        encoded.extend_from_slice(::blake3::hash(&payload).as_bytes());

        assert_eq!(verify_cache_blob(encoded), Some(payload));
    }

    /// A footer that is not the payload's hash is rejected.
    #[test]
    fn cache_blob_verifier_rejects_corrupted_footer() {
        let payload = b"compiled-artifact".to_vec();
        let mut encoded = payload;
        encoded.extend_from_slice(&[0xA5; CHECKSUM_LEN]);

        assert!(
            verify_cache_blob(encoded).is_none(),
            "Fix: disk and remote cache readers must reject artifacts whose checksum footer does not match"
        );
    }

    /// Input too short to contain a checksum footer is rejected instead of panicking.
    #[test]
    fn cache_blob_verifier_rejects_blob_shorter_than_checksum() {
        assert!(
            verify_cache_blob(vec![0u8; CHECKSUM_LEN - 1]).is_none(),
            "Fix: cache blob verifier must reject blobs that cannot hold a checksum footer"
        );
        assert!(
            verify_cache_blob(Vec::new()).is_none(),
            "Fix: cache blob verifier must reject an empty blob"
        );
    }

    /// A source one byte over the encoded limit is rejected by the reader.
    #[test]
    fn cache_blob_reader_rejects_oversized_encoded_blob() {
        let oversized = std::io::repeat(0).take(MAX_ENCODED_PIPELINE_BLOB_BYTES + 1);

        assert!(
            read_verified_cache_blob(oversized).is_none(),
            "Fix: disk and remote cache readers must cap encoded blob bytes before allocation"
        );
    }

    /// The hand-rolled decimal writer agrees with the standard formatter and appends to
    /// existing content.
    #[test]
    fn append_u64_decimal_matches_std_formatting() {
        for value in [0, 9, 10, 12_345, u64::MAX] {
            let mut out = String::from("p-");
            append_u64_decimal(&mut out, value);
            assert_eq!(
                out,
                format!("p-{value}"),
                "Fix: temporary file counter must be rendered as plain decimal digits"
            );
        }
    }
}