Skip to main content

vyre_runtime/pipeline_cache/
layered.rs

//! [`LayeredPipelineCache`] - composite store that consults its backends
//! in order until one hits, sends direct writes only to the first, and
//! copies lower-layer hits into the faster layers ahead of them.
3
4use std::io;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7
8use super::fingerprint::PipelineFingerprint;
9use super::metrics::PipelineCacheMetrics;
10use super::store::PipelineCacheStore;
11
/// Composite store that layers several [`PipelineCacheStore`] backends.
///
/// Lets callers compose `[RamStore, DiskStore, RemoteStore]` so a miss at
/// the fast layer falls through to slower layers. Direct writes go to the
/// first layer only; a hit found in a lower layer is additionally copied
/// into every layer ahead of it.
pub struct LayeredPipelineCache {
    /// Backends in lookup order; index 0 is consulted first.
    layers: Vec<Arc<dyn PipelineCacheStore>>,
    /// Counters describing promotions of lower-layer hits.
    promotions: LayeredPromotionCounters,
}
19
20impl LayeredPipelineCache {
21    /// Construct from an ordered list (fastest-first). Lookups
22    /// consult every layer in order; writes land in the first layer
23    /// only  -  downstream layers are expected to be populated
24    /// independently (e.g., from a pre-compiled blob bundle).
25    #[must_use]
26    pub fn new(layers: Vec<Arc<dyn PipelineCacheStore>>) -> Self {
27        Self {
28            layers,
29            promotions: LayeredPromotionCounters::default(),
30        }
31    }
32
33    /// Snapshot promotion evidence for lower-layer hits copied into faster
34    /// preceding layers.
35    #[must_use]
36    pub fn promotion_report(&self) -> LayeredPromotionReport {
37        self.promotions.snapshot()
38    }
39
40    fn promote_hit_to_faster_layers(
41        &self,
42        fp: PipelineFingerprint,
43        artifact: &Arc<Vec<u8>>,
44        source_layer: usize,
45    ) {
46        if source_layer == 0 {
47            return;
48        }
49        let promoted_bytes = artifact.len() as u64;
50        let mut promoted_layers = 0u64;
51        for layer in &self.layers[..source_layer] {
52            layer.put(fp, artifact.as_ref().clone());
53            promoted_layers = promoted_layers.saturating_add(1);
54        }
55        self.promotions.record(
56            source_layer,
57            promoted_layers,
58            promoted_bytes.saturating_mul(promoted_layers),
59        );
60    }
61}
62
/// Atomic counters behind [`LayeredPromotionReport`].
///
/// Each field is updated and read independently with relaxed ordering, so a
/// snapshot taken while another thread is recording may mix values from
/// different promotion events.
#[derive(Debug, Default)]
struct LayeredPromotionCounters {
    /// Number of recorded promotion events.
    events: AtomicU64,
    /// Running total of faster-layer writes.
    promoted_layers: AtomicU64,
    /// Running total of bytes copied into faster layers.
    promoted_bytes: AtomicU64,
    /// Source layer index of the most recent event.
    last_source_layer: AtomicU64,
    /// Faster-layer writes of the most recent event.
    last_promoted_layers: AtomicU64,
    /// Bytes copied by the most recent event.
    last_promoted_bytes: AtomicU64,
}

impl LayeredPromotionCounters {
    /// Add `delta` to `counter`, clamping at `u64::MAX` instead of wrapping
    /// so a total can never roll over to a small value.
    fn saturating_fetch_add(counter: &AtomicU64, delta: u64) {
        // The closure always returns `Some`, so the update cannot fail; the
        // returned previous value is not needed.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            Some(current.saturating_add(delta))
        });
    }

    /// Record one promotion event: bump the running totals and overwrite the
    /// "last event" fields with this event's values.
    fn record(&self, source_layer: usize, promoted_layers: u64, promoted_bytes: u64) {
        Self::saturating_fetch_add(&self.events, 1);
        Self::saturating_fetch_add(&self.promoted_layers, promoted_layers);
        Self::saturating_fetch_add(&self.promoted_bytes, promoted_bytes);
        self.last_source_layer
            .store(source_layer as u64, Ordering::Relaxed);
        self.last_promoted_layers
            .store(promoted_layers, Ordering::Relaxed);
        self.last_promoted_bytes
            .store(promoted_bytes, Ordering::Relaxed);
    }

    /// Load every counter into a plain-value [`LayeredPromotionReport`].
    fn snapshot(&self) -> LayeredPromotionReport {
        LayeredPromotionReport {
            events: self.events.load(Ordering::Relaxed),
            promoted_layers: self.promoted_layers.load(Ordering::Relaxed),
            promoted_bytes: self.promoted_bytes.load(Ordering::Relaxed),
            last_source_layer: self.last_source_layer.load(Ordering::Relaxed),
            last_promoted_layers: self.last_promoted_layers.load(Ordering::Relaxed),
            last_promoted_bytes: self.last_promoted_bytes.load(Ordering::Relaxed),
        }
    }
}

/// Layered-cache promotion evidence.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct LayeredPromotionReport {
    /// Number of lower-layer hit events that caused promotion.
    pub events: u64,
    /// Total faster-layer writes caused by promotion.
    pub promoted_layers: u64,
    /// Total artifact bytes copied into faster layers.
    pub promoted_bytes: u64,
    /// Index of the most recent source layer that supplied a promoted hit.
    pub last_source_layer: u64,
    /// Number of faster layers written by the most recent promotion.
    pub last_promoted_layers: u64,
    /// Artifact bytes copied by the most recent promotion.
    pub last_promoted_bytes: u64,
}
116
117impl PipelineCacheStore for LayeredPipelineCache {
118    fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
119        self.get_arc(fp).map(|artifact| (*artifact).clone())
120    }
121
122    /// V7-PERF-009: forward through to each layer's zero-clone path so
123    /// the hit propagates without an intermediate `Vec<u8>` allocation.
124    fn get_arc(&self, fp: &PipelineFingerprint) -> Option<Arc<Vec<u8>>> {
125        for (index, layer) in self.layers.iter().enumerate() {
126            if let Some(arc) = layer.get_arc(fp) {
127                self.promote_hit_to_faster_layers(*fp, &arc, index);
128                return Some(arc);
129            }
130        }
131        None
132    }
133
134    fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
135        if let Some(first) = self.layers.first() {
136            first.put(fp, artifact);
137        }
138    }
139
140    fn flush(&self) -> io::Result<()> {
141        for layer in &self.layers {
142            layer.flush()?;
143        }
144        Ok(())
145    }
146
147    fn metrics(&self) -> PipelineCacheMetrics {
148        self.layers
149            .iter()
150            .fold(PipelineCacheMetrics::default(), |acc, layer| {
151                acc.checked_add(layer.metrics())
152            })
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_cache::test_helpers::tiny_program;
    use crate::pipeline_cache::InMemoryPipelineCache;

    /// A miss in the fast layer falls through to the slow layer, and a
    /// direct `put` touches the fast layer only.
    #[test]
    fn layered_cache_prefers_first_hit() {
        let fast = Arc::new(InMemoryPipelineCache::new());
        let slow = Arc::new(InMemoryPipelineCache::new());
        let fp = PipelineFingerprint::of(&tiny_program());
        slow.put(fp, b"fallback".to_vec());
        let cache = LayeredPipelineCache::new(vec![fast.clone(), slow.clone()]);
        // Miss in fast, hit in slow.
        assert_eq!(cache.get(&fp).unwrap(), b"fallback".to_vec());
        // Put lands in fast only.
        cache.put(fp, b"warmed".to_vec());
        assert_eq!(fast.get(&fp).unwrap(), b"warmed".to_vec());
        assert_eq!(slow.get(&fp).unwrap(), b"fallback".to_vec());
        // The fast layer now answers before the slow one is consulted.
        assert_eq!(cache.get(&fp).unwrap(), b"warmed".to_vec());
    }

    /// Aggregated metrics count one lookup per consulted layer.
    #[test]
    fn layered_cache_metrics_aggregate_layers() {
        let fast = Arc::new(InMemoryPipelineCache::new());
        let slow = Arc::new(InMemoryPipelineCache::new());
        let fp = PipelineFingerprint::of(&tiny_program());
        slow.put(fp, b"slow".to_vec());
        let cache = LayeredPipelineCache::new(vec![fast, slow]);

        assert_eq!(cache.get(&fp).unwrap(), b"slow".to_vec());
        let metrics = cache.metrics();
        assert_eq!(metrics.lookups, 2);
        assert_eq!(metrics.hits, 1);
        assert_eq!(metrics.misses, 1);
    }

    /// A slow-layer hit is copied into the fast layer and shows up in the
    /// promotion report.
    #[test]
    fn layered_cache_promotes_lower_layer_hit_to_faster_layers_with_report() {
        let fast = Arc::new(InMemoryPipelineCache::new());
        let slow = Arc::new(InMemoryPipelineCache::new());
        let fp = PipelineFingerprint::of(&tiny_program());
        slow.put(fp, b"fallback".to_vec());
        let cache = LayeredPipelineCache::new(vec![fast.clone(), slow]);

        assert_eq!(cache.get(&fp).unwrap(), b"fallback".to_vec());
        assert_eq!(fast.get(&fp).unwrap(), b"fallback".to_vec());

        let report = cache.promotion_report();
        assert_eq!(report.events, 1);
        assert_eq!(report.promoted_layers, 1);
        assert_eq!(report.promoted_bytes, b"fallback".len() as u64);
        assert_eq!(report.last_source_layer, 1);
        assert_eq!(report.last_promoted_layers, 1);
        assert_eq!(report.last_promoted_bytes, b"fallback".len() as u64);
    }

    /// A hit in the first layer has nothing to promote: the report stays at
    /// its default and the slower layer is left untouched.
    #[test]
    fn layered_cache_first_layer_hit_records_no_promotion() {
        let fast = Arc::new(InMemoryPipelineCache::new());
        let slow = Arc::new(InMemoryPipelineCache::new());
        let fp = PipelineFingerprint::of(&tiny_program());
        fast.put(fp, b"hot".to_vec());
        let cache = LayeredPipelineCache::new(vec![fast, slow.clone()]);

        assert_eq!(cache.get(&fp).unwrap(), b"hot".to_vec());
        assert_eq!(cache.promotion_report(), LayeredPromotionReport::default());
        assert!(slow.get(&fp).is_none());
    }
}
210}