vyre_runtime/pipeline_cache/
layered.rs1use std::io;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7
8use super::fingerprint::PipelineFingerprint;
9use super::metrics::PipelineCacheMetrics;
10use super::store::PipelineCacheStore;
11
12pub struct LayeredPipelineCache {
16 layers: Vec<Arc<dyn PipelineCacheStore>>,
17 promotions: LayeredPromotionCounters,
18}
19
20impl LayeredPipelineCache {
21 #[must_use]
26 pub fn new(layers: Vec<Arc<dyn PipelineCacheStore>>) -> Self {
27 Self {
28 layers,
29 promotions: LayeredPromotionCounters::default(),
30 }
31 }
32
33 #[must_use]
36 pub fn promotion_report(&self) -> LayeredPromotionReport {
37 self.promotions.snapshot()
38 }
39
40 fn promote_hit_to_faster_layers(
41 &self,
42 fp: PipelineFingerprint,
43 artifact: &Arc<Vec<u8>>,
44 source_layer: usize,
45 ) {
46 if source_layer == 0 {
47 return;
48 }
49 let promoted_bytes = artifact.len() as u64;
50 let mut promoted_layers = 0u64;
51 for layer in &self.layers[..source_layer] {
52 layer.put(fp, artifact.as_ref().clone());
53 promoted_layers = promoted_layers.saturating_add(1);
54 }
55 self.promotions.record(
56 source_layer,
57 promoted_layers,
58 promoted_bytes.saturating_mul(promoted_layers),
59 );
60 }
61}
62
63#[derive(Debug, Default)]
64struct LayeredPromotionCounters {
65 events: AtomicU64,
66 promoted_layers: AtomicU64,
67 promoted_bytes: AtomicU64,
68 last_source_layer: AtomicU64,
69 last_promoted_layers: AtomicU64,
70 last_promoted_bytes: AtomicU64,
71}
72
73impl LayeredPromotionCounters {
74 fn record(&self, source_layer: usize, promoted_layers: u64, promoted_bytes: u64) {
75 self.events.fetch_add(1, Ordering::Relaxed);
76 self.promoted_layers
77 .fetch_add(promoted_layers, Ordering::Relaxed);
78 self.promoted_bytes
79 .fetch_add(promoted_bytes, Ordering::Relaxed);
80 self.last_source_layer
81 .store(source_layer as u64, Ordering::Relaxed);
82 self.last_promoted_layers
83 .store(promoted_layers, Ordering::Relaxed);
84 self.last_promoted_bytes
85 .store(promoted_bytes, Ordering::Relaxed);
86 }
87
88 fn snapshot(&self) -> LayeredPromotionReport {
89 LayeredPromotionReport {
90 events: self.events.load(Ordering::Relaxed),
91 promoted_layers: self.promoted_layers.load(Ordering::Relaxed),
92 promoted_bytes: self.promoted_bytes.load(Ordering::Relaxed),
93 last_source_layer: self.last_source_layer.load(Ordering::Relaxed),
94 last_promoted_layers: self.last_promoted_layers.load(Ordering::Relaxed),
95 last_promoted_bytes: self.last_promoted_bytes.load(Ordering::Relaxed),
96 }
97 }
98}
99
100#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
102pub struct LayeredPromotionReport {
103 pub events: u64,
105 pub promoted_layers: u64,
107 pub promoted_bytes: u64,
109 pub last_source_layer: u64,
111 pub last_promoted_layers: u64,
113 pub last_promoted_bytes: u64,
115}
116
117impl PipelineCacheStore for LayeredPipelineCache {
118 fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
119 self.get_arc(fp).map(|artifact| (*artifact).clone())
120 }
121
122 fn get_arc(&self, fp: &PipelineFingerprint) -> Option<Arc<Vec<u8>>> {
125 for (index, layer) in self.layers.iter().enumerate() {
126 if let Some(arc) = layer.get_arc(fp) {
127 self.promote_hit_to_faster_layers(*fp, &arc, index);
128 return Some(arc);
129 }
130 }
131 None
132 }
133
134 fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
135 if let Some(first) = self.layers.first() {
136 first.put(fp, artifact);
137 }
138 }
139
140 fn flush(&self) -> io::Result<()> {
141 for layer in &self.layers {
142 layer.flush()?;
143 }
144 Ok(())
145 }
146
147 fn metrics(&self) -> PipelineCacheMetrics {
148 self.layers
149 .iter()
150 .fold(PipelineCacheMetrics::default(), |acc, layer| {
151 acc.checked_add(layer.metrics())
152 })
153 }
154}
155
156#[cfg(test)]
157mod tests {
158 use super::*;
159 use crate::pipeline_cache::test_helpers::tiny_program;
160 use crate::pipeline_cache::InMemoryPipelineCache;
161
162 #[test]
163 fn layered_cache_prefers_first_hit() {
164 let fast = Arc::new(InMemoryPipelineCache::new());
165 let slow = Arc::new(InMemoryPipelineCache::new());
166 let fp = PipelineFingerprint::of(&tiny_program());
167 slow.put(fp, b"fallback".to_vec());
168 let cache = LayeredPipelineCache::new(vec![fast.clone(), slow]);
169 assert_eq!(cache.get(&fp).unwrap(), b"fallback".to_vec());
171 cache.put(fp, b"warmed".to_vec());
173 assert_eq!(fast.get(&fp).unwrap(), b"warmed".to_vec());
174 }
175
176 #[test]
177 fn layered_cache_metrics_aggregate_layers() {
178 let fast = Arc::new(InMemoryPipelineCache::new());
179 let slow = Arc::new(InMemoryPipelineCache::new());
180 let fp = PipelineFingerprint::of(&tiny_program());
181 slow.put(fp, b"slow".to_vec());
182 let cache = LayeredPipelineCache::new(vec![fast, slow]);
183
184 assert_eq!(cache.get(&fp).unwrap(), b"slow".to_vec());
185 let metrics = cache.metrics();
186 assert_eq!(metrics.lookups, 2);
187 assert_eq!(metrics.hits, 1);
188 assert_eq!(metrics.misses, 1);
189 }
190
191 #[test]
192 fn layered_cache_promotes_lower_layer_hit_to_faster_layers_with_report() {
193 let fast = Arc::new(InMemoryPipelineCache::new());
194 let slow = Arc::new(InMemoryPipelineCache::new());
195 let fp = PipelineFingerprint::of(&tiny_program());
196 slow.put(fp, b"fallback".to_vec());
197 let cache = LayeredPipelineCache::new(vec![fast.clone(), slow]);
198
199 assert_eq!(cache.get(&fp).unwrap(), b"fallback".to_vec());
200 assert_eq!(fast.get(&fp).unwrap(), b"fallback".to_vec());
201
202 let report = cache.promotion_report();
203 assert_eq!(report.events, 1);
204 assert_eq!(report.promoted_layers, 1);
205 assert_eq!(report.promoted_bytes, b"fallback".len() as u64);
206 assert_eq!(report.last_source_layer, 1);
207 assert_eq!(report.last_promoted_layers, 1);
208 assert_eq!(report.last_promoted_bytes, b"fallback".len() as u64);
209 }
210}