Skip to main content

vyre_runtime/pipeline_cache/
layered.rs

1//! [`LayeredPipelineCache`]  -  composite store that reads from every
2//! backend in order and writes only to the first.
3
4use std::io;
5use std::sync::Arc;
6
7use super::fingerprint::PipelineFingerprint;
8use super::metrics::PipelineCacheMetrics;
9use super::store::PipelineCacheStore;
10
11/// Composite store that reads from every backend and writes to
12/// the first. Lets callers compose `[RamStore, DiskStore, RemoteStore]`
13/// so a miss at the fast layer falls through to slower layers.
14pub struct LayeredPipelineCache {
15    layers: Vec<Arc<dyn PipelineCacheStore>>,
16}
17
18impl LayeredPipelineCache {
19    /// Construct from an ordered list (fastest-first). Lookups
20    /// consult every layer in order; writes land in the first layer
21    /// only  -  downstream layers are expected to be populated
22    /// independently (e.g., from a pre-compiled blob bundle).
23    #[must_use]
24    pub fn new(layers: Vec<Arc<dyn PipelineCacheStore>>) -> Self {
25        Self { layers }
26    }
27}
28
29impl PipelineCacheStore for LayeredPipelineCache {
30    fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
31        self.get_arc(fp).map(|artifact| (*artifact).clone())
32    }
33
34    /// V7-PERF-009: forward through to each layer's zero-clone path so
35    /// the hit propagates without an intermediate `Vec<u8>` allocation.
36    fn get_arc(&self, fp: &PipelineFingerprint) -> Option<Arc<Vec<u8>>> {
37        for layer in &self.layers {
38            if let Some(arc) = layer.get_arc(fp) {
39                return Some(arc);
40            }
41        }
42        None
43    }
44
45    fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
46        if let Some(first) = self.layers.first() {
47            first.put(fp, artifact);
48        }
49    }
50
51    fn flush(&self) -> io::Result<()> {
52        for layer in &self.layers {
53            layer.flush()?;
54        }
55        Ok(())
56    }
57
58    fn metrics(&self) -> PipelineCacheMetrics {
59        self.layers
60            .iter()
61            .fold(PipelineCacheMetrics::default(), |acc, layer| {
62                acc.checked_add(layer.metrics())
63            })
64    }
65}
66
67#[cfg(test)]
68mod tests {
69    use super::*;
70    use crate::pipeline_cache::test_helpers::tiny_program;
71    use crate::pipeline_cache::InMemoryPipelineCache;
72
73    #[test]
74    fn layered_cache_prefers_first_hit() {
75        let fast = Arc::new(InMemoryPipelineCache::new());
76        let slow = Arc::new(InMemoryPipelineCache::new());
77        let fp = PipelineFingerprint::of(&tiny_program());
78        slow.put(fp, b"fallback".to_vec());
79        let cache = LayeredPipelineCache::new(vec![fast.clone(), slow]);
80        // Miss in fast, hit in slow.
81        assert_eq!(cache.get(&fp).unwrap(), b"fallback".to_vec());
82        // Put lands in fast only.
83        cache.put(fp, b"warmed".to_vec());
84        assert_eq!(fast.get(&fp).unwrap(), b"warmed".to_vec());
85    }
86
87    #[test]
88    fn layered_cache_metrics_aggregate_layers() {
89        let fast = Arc::new(InMemoryPipelineCache::new());
90        let slow = Arc::new(InMemoryPipelineCache::new());
91        let fp = PipelineFingerprint::of(&tiny_program());
92        slow.put(fp, b"slow".to_vec());
93        let cache = LayeredPipelineCache::new(vec![fast, slow]);
94
95        assert_eq!(cache.get(&fp).unwrap(), b"slow".to_vec());
96        let metrics = cache.metrics();
97        assert_eq!(metrics.lookups, 2);
98        assert_eq!(metrics.hits, 1);
99        assert_eq!(metrics.misses, 1);
100    }
101}