use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use super::fingerprint::PipelineFingerprint;
use super::metrics::PipelineCacheMetrics;
use super::store::PipelineCacheStore;
/// A cache made of an ordered list of [`PipelineCacheStore`] layers.
///
/// Lookups walk `layers` from the front and stop at the first layer that has
/// the entry. When the hit comes from a layer other than the first, the
/// artifact is copied into every layer in front of it and the copy is tallied
/// in `promotions`.
pub struct LayeredPipelineCache {
/// Backing stores in lookup order; index 0 is consulted first.
layers: Vec<Arc<dyn PipelineCacheStore>>,
/// Running statistics about hits that were copied into earlier layers.
promotions: LayeredPromotionCounters,
}
impl LayeredPipelineCache {
    /// Builds a layered cache over `layers`.
    ///
    /// The layers are consulted in the order given, so index 0 is tried first.
    /// All promotion counters start at zero.
    #[must_use]
    pub fn new(layers: Vec<Arc<dyn PipelineCacheStore>>) -> Self {
        let promotions = LayeredPromotionCounters::default();
        Self { layers, promotions }
    }

    /// Returns the current promotion statistics as a plain-value report.
    #[must_use]
    pub fn promotion_report(&self) -> LayeredPromotionReport {
        let counters = &self.promotions;
        counters.snapshot()
    }

    /// Copies an artifact found in layer `source_layer` into every layer that
    /// precedes it, then records the promotion.
    ///
    /// Does nothing when the hit came from the first layer. `source_layer`
    /// must not exceed the number of layers, otherwise the slice below panics.
    fn promote_hit_to_faster_layers(
        &self,
        fp: PipelineFingerprint,
        artifact: &Arc<Vec<u8>>,
        source_layer: usize,
    ) {
        // A hit in the first layer has nothing in front of it to fill.
        if source_layer == 0 {
            return;
        }

        let faster_layers = &self.layers[..source_layer];
        for target in faster_layers {
            // `put` takes ownership, so each target receives its own copy of the bytes.
            target.put(fp, artifact.to_vec());
        }

        // One write per preceding layer, each of the artifact's full length.
        let layer_count = faster_layers.len() as u64;
        let bytes_per_layer = artifact.len() as u64;
        self.promotions.record(
            source_layer,
            layer_count,
            bytes_per_layer.saturating_mul(layer_count),
        );
    }
}
/// Atomic tallies of promotion activity, shared by reference between callers.
///
/// Each field is an independent atomic accessed with `Ordering::Relaxed`, so a
/// reader running concurrently with `record` may observe fields that belong to
/// different calls.
#[derive(Debug, Default)]
struct LayeredPromotionCounters {
    /// Number of `record` calls so far.
    events: AtomicU64,
    /// Running total of the `promoted_layers` values passed to `record`.
    promoted_layers: AtomicU64,
    /// Running total of the `promoted_bytes` values passed to `record`.
    promoted_bytes: AtomicU64,
    /// `source_layer` from the most recent `record` call.
    last_source_layer: AtomicU64,
    /// `promoted_layers` from the most recent `record` call.
    last_promoted_layers: AtomicU64,
    /// `promoted_bytes` from the most recent `record` call.
    last_promoted_bytes: AtomicU64,
}

impl LayeredPromotionCounters {
    /// Adds `delta` to a cumulative counter, pinning it at `u64::MAX` instead
    /// of wrapping around.
    ///
    /// A plain `fetch_add` wraps on overflow, which would make a very large
    /// total suddenly read as a small one.
    fn add_saturating(counter: &AtomicU64, delta: u64) {
        // The closure always returns `Some`, so `fetch_update` never reports
        // failure; it only re-runs the closure if the value changed underneath it.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            Some(current.saturating_add(delta))
        });
    }

    /// Records one promotion.
    ///
    /// Bumps the event count, adds `promoted_layers` and `promoted_bytes` to
    /// the running totals (saturating at `u64::MAX`), and overwrites the three
    /// `last_*` fields with this call's values.
    fn record(&self, source_layer: usize, promoted_layers: u64, promoted_bytes: u64) {
        Self::add_saturating(&self.events, 1);
        Self::add_saturating(&self.promoted_layers, promoted_layers);
        Self::add_saturating(&self.promoted_bytes, promoted_bytes);

        self.last_source_layer
            .store(source_layer as u64, Ordering::Relaxed);
        self.last_promoted_layers
            .store(promoted_layers, Ordering::Relaxed);
        self.last_promoted_bytes
            .store(promoted_bytes, Ordering::Relaxed);
    }

    /// Reads every counter into a plain-value report.
    ///
    /// The fields are loaded one at a time, not as a single atomic unit.
    fn snapshot(&self) -> LayeredPromotionReport {
        LayeredPromotionReport {
            events: self.events.load(Ordering::Relaxed),
            promoted_layers: self.promoted_layers.load(Ordering::Relaxed),
            promoted_bytes: self.promoted_bytes.load(Ordering::Relaxed),
            last_source_layer: self.last_source_layer.load(Ordering::Relaxed),
            last_promoted_layers: self.last_promoted_layers.load(Ordering::Relaxed),
            last_promoted_bytes: self.last_promoted_bytes.load(Ordering::Relaxed),
        }
    }
}

/// Plain-value copy of the promotion counters at the time it was taken.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct LayeredPromotionReport {
    /// Number of promotions recorded.
    pub events: u64,
    /// Total number of layer writes across all recorded promotions.
    pub promoted_layers: u64,
    /// Total number of bytes written across all recorded promotions.
    pub promoted_bytes: u64,
    /// Index of the layer that served the most recently recorded promotion.
    pub last_source_layer: u64,
    /// Number of layers written by the most recently recorded promotion.
    pub last_promoted_layers: u64,
    /// Number of bytes written by the most recently recorded promotion.
    pub last_promoted_bytes: u64,
}
impl PipelineCacheStore for LayeredPipelineCache {
    /// Looks `fp` up through the layers and returns the artifact as an owned
    /// buffer.
    ///
    /// Lookup order and promotion are exactly those of `get_arc`.
    fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
        self.get_arc(fp).map(|artifact| {
            // Take the buffer without copying when this is the only handle;
            // otherwise fall back to cloning the shared bytes.
            Arc::try_unwrap(artifact).unwrap_or_else(|shared| (*shared).clone())
        })
    }

    /// Queries the layers in order and returns the first hit.
    ///
    /// Before returning, a hit is handed to `promote_hit_to_faster_layers`
    /// together with the index of the layer that served it. Returns `None`
    /// when no layer has an entry for `fp`, including when there are no layers.
    fn get_arc(&self, fp: &PipelineFingerprint) -> Option<Arc<Vec<u8>>> {
        for (index, layer) in self.layers.iter().enumerate() {
            if let Some(arc) = layer.get_arc(fp) {
                self.promote_hit_to_faster_layers(*fp, &arc, index);
                return Some(arc);
            }
        }
        None
    }

    /// Stores `artifact` under `fp` in the first layer only.
    ///
    /// The remaining layers are not written by this call. With no layers
    /// configured the artifact is dropped.
    fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
        if let Some(first) = self.layers.first() {
            first.put(fp, artifact);
        }
    }

    /// Flushes every layer, in order.
    ///
    /// # Errors
    ///
    /// Returns the error from the first layer that failed to flush. Layers
    /// after a failing one are still flushed; any further errors they report
    /// are discarded.
    fn flush(&self) -> io::Result<()> {
        let mut outcome = Ok(());
        for layer in &self.layers {
            // Always attempt the flush so one failing layer cannot leave the
            // layers behind it unflushed.
            let result = layer.flush();
            if outcome.is_ok() {
                outcome = result;
            }
        }
        outcome
    }

    /// Combines the metrics of all layers into one value.
    ///
    /// Starts from `PipelineCacheMetrics::default()` and merges each layer's
    /// metrics, in order, with `PipelineCacheMetrics::checked_add`.
    fn metrics(&self) -> PipelineCacheMetrics {
        self.layers
            .iter()
            .fold(PipelineCacheMetrics::default(), |acc, layer| {
                acc.checked_add(layer.metrics())
            })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_cache::test_helpers::tiny_program;
    use crate::pipeline_cache::InMemoryPipelineCache;

    /// An entry present only in the second layer is served through the layered
    /// cache, and a later `put` is visible in the first layer.
    #[test]
    fn layered_cache_prefers_first_hit() {
        let front = Arc::new(InMemoryPipelineCache::new());
        let back = Arc::new(InMemoryPipelineCache::new());
        let fp = PipelineFingerprint::of(&tiny_program());
        let seeded = b"fallback".to_vec();
        let overwritten = b"warmed".to_vec();

        back.put(fp, seeded.clone());
        let cache = LayeredPipelineCache::new(vec![front.clone(), back]);

        let served = cache.get(&fp).unwrap();
        assert_eq!(served, seeded);

        cache.put(fp, overwritten.clone());
        let in_front = front.get(&fp).unwrap();
        assert_eq!(in_front, overwritten);
    }

    /// Metrics reported by the layered cache are the sum over both layers.
    #[test]
    fn layered_cache_metrics_aggregate_layers() {
        let front = Arc::new(InMemoryPipelineCache::new());
        let back = Arc::new(InMemoryPipelineCache::new());
        let fp = PipelineFingerprint::of(&tiny_program());
        let seeded = b"slow".to_vec();

        back.put(fp, seeded.clone());
        let cache = LayeredPipelineCache::new(vec![front, back]);

        let served = cache.get(&fp).unwrap();
        assert_eq!(served, seeded);

        let totals = cache.metrics();
        assert_eq!(totals.lookups, 2);
        assert_eq!(totals.hits, 1);
        assert_eq!(totals.misses, 1);
    }

    /// A hit in the second layer is copied into the first layer and shows up
    /// in the promotion report.
    #[test]
    fn layered_cache_promotes_lower_layer_hit_to_faster_layers_with_report() {
        let front = Arc::new(InMemoryPipelineCache::new());
        let back = Arc::new(InMemoryPipelineCache::new());
        let fp = PipelineFingerprint::of(&tiny_program());
        let seeded = b"fallback".to_vec();

        back.put(fp, seeded.clone());
        let cache = LayeredPipelineCache::new(vec![front.clone(), back]);

        let served = cache.get(&fp).unwrap();
        assert_eq!(served, seeded);
        let promoted = front.get(&fp).unwrap();
        assert_eq!(promoted, seeded);

        // One promotion from layer 1 into a single faster layer.
        let payload_len = b"fallback".len() as u64;
        let expected = LayeredPromotionReport {
            events: 1,
            promoted_layers: 1,
            promoted_bytes: payload_len,
            last_source_layer: 1,
            last_promoted_layers: 1,
            last_promoted_bytes: payload_len,
        };
        assert_eq!(cache.promotion_report(), expected);
    }
}