pub mod cache;
pub mod compiler;
pub mod hashing;
pub use cache::{DiskPipelineCache, PipelineCacheKey, PipelineFeatureFlags};
pub use compiler::{
compile, compile_owned, compile_owned_with_telemetry, compile_shared,
compile_shared_with_telemetry, compile_with_telemetry, prewarm, prewarm_owned, prewarm_shared,
};
pub use hashing::{
dispatch_policy_cache_string, hex_encode, hex_short, normalized_program_cache_digest,
update_dispatch_policy_cache_hash, PipelineDeviceFingerprint,
};
/// Version number associated with the pipeline cache key format.
///
/// NOTE(review): presumably bumped whenever the key layout changes so that
/// older entries stop matching — confirm against the `cache` module.
pub const CURRENT_PIPELINE_CACHE_KEY_VERSION: u32 = 1;

/// Default pipeline cache entry limit.
///
/// [`pipeline_cache_limits_from_env`] falls back to this value when
/// `VYRE_PIPELINE_CACHE_ENTRIES` is unset or unusable.
pub const DEFAULT_PIPELINE_CACHE_ENTRIES: usize = 256;

/// Default pipeline cache byte budget: 256 MiB.
///
/// [`pipeline_cache_limits_from_env`] falls back to this value when
/// `VYRE_PIPELINE_CACHE_BYTES` is unset or unusable.
pub const DEFAULT_PIPELINE_CACHE_BYTES: usize = 256 << 20;

/// Default workgroup size triple: 64 in the first dimension, 1 in the others.
///
/// NOTE(review): the name suggests this is meant for one-dimensional
/// dispatches — confirm with the backends that consume it.
pub const DEFAULT_1D_WORKGROUP_SIZE: [u32; 3] = [64, 1, 1];
/// Plain pair of hit/miss counters for the pipeline cache.
///
/// The type is `Copy`, and `Default` yields zero for both counters.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct PipelineCacheSnapshot {
    /// Number of cache hits.
    pub hits: u64,
    /// Number of cache misses.
    pub misses: u64,
}
/// A compiled pipeline bundled with its cache outcome and repro manifest.
///
/// Cloning is cheap for the pipeline itself: the handle is an `Arc`, so only
/// the reference count is bumped; the manifest is cloned by value.
#[derive(Clone)]
pub struct CompiledPipelineBuild {
    /// Shared handle to the backend's compiled pipeline object.
    pub pipeline: std::sync::Arc<dyn crate::backend::CompiledPipeline>,
    /// `Some(true)` for a cache hit, `Some(false)` for a miss.
    ///
    /// NOTE(review): `None` presumably means the outcome is not known —
    /// [`PipelineCacheAudit::observe`] counts it as "unknown".
    pub cache_hit: Option<bool>,
    /// Reproduction manifest describing this build.
    pub manifest: PipelineReproManifest,
}
/// Report describing one prewarmed pipeline.
///
/// NOTE(review): presumably what the `prewarm*` entry points re-exported from
/// `compiler` return — confirm there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipelinePrewarmReport {
    /// Identifier of the pipeline the report refers to.
    pub pipeline_id: String,
    /// `Some(true)` for a cache hit, `Some(false)` for a miss.
    ///
    /// NOTE(review): `None` presumably means the outcome is not known —
    /// [`PipelineCacheAudit::observe`] counts it as "unknown".
    pub cache_hit: Option<bool>,
    /// Reproduction manifest for the pipeline.
    pub manifest: PipelineReproManifest,
}
/// Serializable record of the inputs that identify a pipeline build.
///
/// Every field is a plain value, so the manifest round-trips through serde.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PipelineReproManifest {
    /// Manifest schema version; [`PipelineReproManifest::new`] stores
    /// [`PipelineReproManifest::SCHEMA`] here.
    pub schema: u32,
    /// Identifier of the backend.
    pub backend_id: String,
    /// Identifier of the pipeline.
    pub pipeline_id: String,
    /// Program digest as text; `new` produces it by passing the 32-byte
    /// digest through `hex_encode`.
    pub program_digest: String,
    /// Dispatch policy in string form.
    pub dispatch_policy: String,
    /// `Some(true)` for a cache hit, `Some(false)` for a miss.
    ///
    /// NOTE(review): `None` presumably means the outcome is not known —
    /// [`PipelineCacheAudit::observe`] counts it as "unknown".
    pub cache_hit: Option<bool>,
}

impl PipelineReproManifest {
    /// Schema version stamped on manifests created by [`Self::new`].
    pub const SCHEMA: u32 = 1;

    /// Builds a manifest stamped with the current [`Self::SCHEMA`].
    ///
    /// The string-like arguments are converted into owned `String`s and the
    /// raw `program_digest` bytes are stored in their `hex_encode`d form.
    #[must_use]
    pub fn new(
        backend_id: impl Into<String>,
        pipeline_id: impl Into<String>,
        program_digest: [u8; 32],
        dispatch_policy: impl Into<String>,
        cache_hit: Option<bool>,
    ) -> Self {
        // Conversions run in declaration order, matching the field order.
        let backend_id: String = backend_id.into();
        let pipeline_id: String = pipeline_id.into();
        let program_digest = hex_encode(&program_digest);
        let dispatch_policy: String = dispatch_policy.into();

        Self {
            schema: Self::SCHEMA,
            backend_id,
            pipeline_id,
            program_digest,
            dispatch_policy,
            cache_hit,
        }
    }

    /// Renders the manifest as a JSON string via `serde_json::to_string`.
    ///
    /// # Errors
    ///
    /// Propagates any [`serde_json::Error`] reported by the serializer.
    pub fn to_json(&self) -> Result<String, serde_json::Error> {
        serde_json::to_string(self)
    }
}
/// Running tally of pipeline cache outcomes from which a hit rate is derived.
///
/// Each counter saturates at `u64::MAX` rather than wrapping.
#[derive(Debug, Default, Clone)]
pub struct PipelineCacheAudit {
    /// Observations reported as `Some(true)`.
    hits: u64,
    /// Observations reported as `Some(false)`.
    misses: u64,
    /// Observations reported as `None`.
    unknowns: u64,
}

/// Summary produced by [`PipelineCacheAudit::snapshot`].
#[derive(Debug, Clone, PartialEq)]
pub struct PipelineCacheAuditReport {
    /// Number of observed cache hits.
    pub hits: u64,
    /// Number of observed cache misses.
    pub misses: u64,
    /// Number of observations with no known outcome.
    pub unknowns: u64,
    /// Hit rate in basis points (`0..=10_000`), computed as
    /// `hits / (hits + misses)` and rounded down; `None` when neither a hit
    /// nor a miss has been observed. Unknown outcomes do not affect the rate.
    pub hit_rate_bps: Option<u32>,
    /// `true` when a hit rate exists, the alarm threshold is non-zero, and the
    /// rate is strictly below that threshold.
    pub below_alarm_threshold: bool,
}

impl PipelineCacheAudit {
    /// Creates an audit with all counters at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one cache outcome: `Some(true)` is a hit, `Some(false)` a miss,
    /// and `None` an unknown.
    pub fn observe(&mut self, cache_hit: Option<bool>) {
        let counter = match cache_hit {
            Some(true) => &mut self.hits,
            Some(false) => &mut self.misses,
            None => &mut self.unknowns,
        };
        *counter = counter.saturating_add(1);
    }

    /// Summarizes the counters and evaluates the hit-rate alarm.
    ///
    /// `alarm_threshold_bps` is expressed in basis points; passing `0`
    /// disables the alarm. The alarm never fires while no hit or miss has
    /// been observed.
    #[must_use]
    pub fn snapshot(&self, alarm_threshold_bps: u32) -> PipelineCacheAuditReport {
        // Work in u128: `hits * 10_000` and `hits + misses` can both exceed
        // u64, and clamping either of them would distort the reported rate.
        let hits = u128::from(self.hits);
        let decided = hits + u128::from(self.misses);
        let hit_rate_bps = if decided == 0 {
            None
        } else {
            // `hits <= decided`, so the quotient is at most 10_000 and the
            // narrowing cast cannot lose information.
            Some((hits * 10_000 / decided) as u32)
        };
        let below_alarm_threshold = matches!(
            hit_rate_bps,
            Some(rate) if alarm_threshold_bps > 0 && rate < alarm_threshold_bps
        );
        PipelineCacheAuditReport {
            hits: self.hits,
            misses: self.misses,
            unknowns: self.unknowns,
            hit_rate_bps,
            below_alarm_threshold,
        }
    }
}
/// Resolves the pipeline cache limits from the process environment.
///
/// Returns `(entries, bytes)`:
///
/// * `entries` is read from `VYRE_PIPELINE_CACHE_ENTRIES` and parsed as `u32`;
///   the fallback is [`DEFAULT_PIPELINE_CACHE_ENTRIES`].
/// * `bytes` is read from `VYRE_PIPELINE_CACHE_BYTES` and parsed as `usize`;
///   the fallback is [`DEFAULT_PIPELINE_CACHE_BYTES`].
///
/// A variable that is unset, not valid Unicode, not a decimal integer of the
/// target type, or equal to zero yields the fallback. Whitespace around the
/// value is ignored, so a value such as `"512\n"` is honored instead of being
/// silently replaced by the default.
#[must_use]
pub fn pipeline_cache_limits_from_env() -> (u32, usize) {
    let entries = std::env::var("VYRE_PIPELINE_CACHE_ENTRIES")
        .ok()
        // Integer parsing rejects surrounding whitespace, so strip it first.
        .and_then(|value| value.trim().parse::<u32>().ok())
        .filter(|&value| value > 0)
        .unwrap_or_else(|| {
            // Checked narrowing: clamp rather than silently truncate if the
            // default is ever raised beyond the `u32` range.
            <u32 as std::convert::TryFrom<usize>>::try_from(DEFAULT_PIPELINE_CACHE_ENTRIES)
                .unwrap_or(u32::MAX)
        });
    let bytes = std::env::var("VYRE_PIPELINE_CACHE_BYTES")
        .ok()
        .and_then(|value| value.trim().parse::<usize>().ok())
        .filter(|&value| value > 0)
        .unwrap_or(DEFAULT_PIPELINE_CACHE_BYTES);
    (entries, bytes)
}
#[cfg(test)]
mod tests;