Skip to main content

vyre_driver/pipeline/
mod.rs

1//! Pipeline mode  -  pre-compile a Program once, dispatch repeatedly with new inputs.
2
3/// Shared on-disk compiled-pipeline cache.
4pub mod cache;
5/// Backend-neutral pipeline compilation entry points.
6pub mod compiler;
7/// Stable cache hashing and device fingerprint helpers.
8pub mod hashing;
9
10pub use cache::{
11    DiskPipelineCache, PipelineCacheIdentity, PipelineCacheKey, PipelineCacheMissEvidence,
12    PipelineCacheMissReason, PipelineFeatureFlags,
13};
14pub use compiler::{
15    compile, compile_owned, compile_owned_with_telemetry, compile_shared,
16    compile_shared_with_telemetry, compile_with_telemetry, prewarm, prewarm_owned, prewarm_shared,
17};
18pub use hashing::{
19    dispatch_policy_cache_digest, dispatch_policy_cache_string, hex_encode, hex_short,
20    normalized_program_cache_digest, try_normalized_program_cache_digest,
21    update_dispatch_policy_cache_hash, PipelineDeviceFingerprint,
22};
23
/// Version mixed into every persistent pipeline cache key.
///
/// NOTE(review): presumably bumped whenever the key layout changes so stale
/// on-disk entries stop matching; confirm against the `cache` module.
pub const CURRENT_PIPELINE_CACHE_KEY_VERSION: u32 = 1;
/// Default maximum number of compiled pipeline artifacts retained in memory.
///
/// [`pipeline_cache_limits_from_env`] falls back to this value when
/// `VYRE_PIPELINE_CACHE_ENTRIES` does not supply a positive integer.
pub const DEFAULT_PIPELINE_CACHE_ENTRIES: usize = 256;
/// Default maximum bytes retained by a backend pipeline cache (256 MiB).
///
/// [`pipeline_cache_limits_from_env`] falls back to this value when
/// `VYRE_PIPELINE_CACHE_BYTES` does not supply a positive integer.
pub const DEFAULT_PIPELINE_CACHE_BYTES: usize = 256 * 1024 * 1024;
/// Baseline one-dimensional workgroup used when a caller supplies no override.
///
/// The first component is 64; the remaining two components are 1.
pub const DEFAULT_1D_WORKGROUP_SIZE: [u32; 3] = [64, 1, 1];
32
/// Backend-reported compiled-pipeline cache counters.
///
/// A plain `Copy` value type: both counters are `u64`, and the derived
/// `Default` yields a snapshot with zero hits and zero misses.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct PipelineCacheSnapshot {
    /// Cache lookups that found an already-compiled artifact.
    pub hits: u64,
    /// Cache lookups that required compile/load work.
    pub misses: u64,
}
41
/// Result of compiling a reusable pipeline with honest cache telemetry.
///
/// Cloning this value clones the `Arc` handle, so the clone shares the same
/// compiled pipeline object rather than duplicating it.
#[derive(Clone)]
pub struct CompiledPipelineBuild {
    /// Reusable pipeline returned by the backend or passthrough wrapper.
    pub pipeline: std::sync::Arc<dyn crate::backend::CompiledPipeline>,
    /// `Some(true)` when backend counters prove a cache hit,
    /// `Some(false)` when counters prove a miss, and `None` when the backend
    /// does not expose real compile-cache counters.
    pub cache_hit: Option<bool>,
    /// Reproducibility manifest for this compiled artifact.
    ///
    /// NOTE(review): the manifest carries its own `cache_hit` field;
    /// presumably it mirrors the value above. Confirm in `compiler`.
    pub manifest: PipelineReproManifest,
}
54
/// Result of prewarming a backend pipeline cache before the hot dispatch path.
///
/// Unlike [`CompiledPipelineBuild`], this report holds no pipeline handle:
/// it records only the pipeline id, the cache outcome, and the manifest.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PipelinePrewarmReport {
    /// Backend pipeline id that was materialized or fetched from cache.
    pub pipeline_id: String,
    /// `Some(true)` when backend counters prove the pipeline was already warm,
    /// `Some(false)` when this call performed compile/load work, and `None`
    /// when the backend does not expose real cache counters.
    pub cache_hit: Option<bool>,
    /// Reproducibility manifest for the warmed artifact.
    ///
    /// NOTE(review): the manifest also has `pipeline_id` and `cache_hit`
    /// fields; presumably they match the two fields above. Confirm in
    /// `compiler`.
    pub manifest: PipelineReproManifest,
}
67
/// JSON-serializable reproducibility sidecar for a compiled pipeline.
///
/// Derives both `serde::Serialize` and `serde::Deserialize`. Build one with
/// [`PipelineReproManifest::new`] and render it with
/// [`PipelineReproManifest::to_json`].
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct PipelineReproManifest {
    /// Manifest schema version.
    ///
    /// [`PipelineReproManifest::new`] always stamps
    /// [`PipelineReproManifest::SCHEMA`] here.
    pub schema: u32,
    /// Backend id that compiled the artifact.
    pub backend_id: String,
    /// Backend pipeline id returned by [`crate::backend::CompiledPipeline::id`].
    pub pipeline_id: String,
    /// Canonical normalized Program digest as lowercase hex.
    ///
    /// [`PipelineReproManifest::new`] produces this by passing the raw
    /// 32-byte digest through `hex_encode`.
    pub program_digest: String,
    /// Dispatch policy fields that affect generated backend code.
    pub dispatch_policy: String,
    /// Backend-reported cache status for this compile/prewarm.
    ///
    /// `None` means the backend reported no cache status.
    pub cache_hit: Option<bool>,
}
84
85impl PipelineReproManifest {
86    /// Current manifest schema.
87    pub const SCHEMA: u32 = 1;
88
89    /// Build a manifest from shared compile facts.
90    #[must_use]
91    pub fn new(
92        backend_id: impl Into<String>,
93        pipeline_id: impl Into<String>,
94        program_digest: [u8; 32],
95        dispatch_policy: impl Into<String>,
96        cache_hit: Option<bool>,
97    ) -> Self {
98        Self {
99            schema: Self::SCHEMA,
100            backend_id: backend_id.into(),
101            pipeline_id: pipeline_id.into(),
102            program_digest: hex_encode(&program_digest),
103            dispatch_policy: dispatch_policy.into(),
104            cache_hit,
105        }
106    }
107
108    /// Serialize as compact JSON for sidecar files and result envelopes.
109    ///
110    /// # Errors
111    ///
112    /// Returns when serde cannot serialize the manifest. This should not occur
113    /// for the current schema, but the error is propagated for forward
114    /// compatibility.
115    pub fn to_json(&self) -> Result<String, serde_json::Error> {
116        serde_json::to_string(self)
117    }
118}
119
/// ROADMAP C6 substrate: pipeline reuse cache hit-rate audit.
///
/// Aggregates a stream of `Option<bool>` cache_hit values from the
/// dispatcher's [`CompiledPipelineBuild`]/[`PipelinePrewarmReport`]
/// reports into hit-rate telemetry. The dispatcher pushes one entry
/// per resolved pipeline (or one per prewarm); the audit produces a
/// [`PipelineCacheAuditReport`] that names the hit rate, the count of
/// each outcome, and whether the rate falls below a configurable
/// alarm threshold so operators can wire it into observability and
/// CI gates.
///
/// `Option<bool>::None` values count as `unknown` and are excluded
/// from the rate denominator. This matches the upstream contract:
/// some backends do not expose real compile-cache counters and
/// honestly report `None` rather than lying about a hit.
#[derive(Debug, Default, Clone)]
pub struct PipelineCacheAudit {
    /// Number of `Some(true)` outcomes passed to `observe`.
    hits: u64,
    /// Number of `Some(false)` outcomes passed to `observe`.
    misses: u64,
    /// Number of `None` outcomes passed to `observe`.
    unknowns: u64,
}
141
/// Snapshot of a [`PipelineCacheAudit`], as produced by
/// [`PipelineCacheAudit::snapshot`].
///
/// The three counters are copied from the accumulator; the rate and alarm
/// flag are derived from them at snapshot time.
#[derive(Debug, Clone, PartialEq)]
pub struct PipelineCacheAuditReport {
    /// Lookups that found an already-compiled artifact.
    pub hits: u64,
    /// Lookups that performed compile/load work.
    pub misses: u64,
    /// Lookups whose backend did not report cache state.
    pub unknowns: u64,
    /// Hit rate in basis points (0..=10_000) over the
    /// `hits + misses` denominator (excluding unknowns). `None` when
    /// `hits + misses == 0` so the caller can distinguish "no data"
    /// from "0% hit rate".
    pub hit_rate_bps: Option<u32>,
    /// Whether the hit rate is below the operator-supplied alarm
    /// threshold. Always `false` when `hit_rate_bps` is `None`, and
    /// always `false` when the supplied threshold is `0`.
    pub below_alarm_threshold: bool,
}
160
161impl PipelineCacheAudit {
162    /// Empty audit accumulator.
163    #[must_use]
164    pub fn new() -> Self {
165        Self::default()
166    }
167
168    /// Push one outcome from the dispatcher.
169    pub fn observe(&mut self, cache_hit: Option<bool>) {
170        match cache_hit {
171            Some(true) => self.hits = increment_counter(self.hits, "pipeline cache hits"),
172            Some(false) => self.misses = increment_counter(self.misses, "pipeline cache misses"),
173            None => self.unknowns = increment_counter(self.unknowns, "pipeline cache unknowns"),
174        }
175    }
176
177    /// Snapshot the audit, scoring it against `alarm_threshold_bps`.
178    /// `alarm_threshold_bps = 8000` flags any audit with under 80% hit
179    /// rate; pass `0` to disable the alarm.
180    #[must_use]
181    pub fn snapshot(&self, alarm_threshold_bps: u32) -> PipelineCacheAuditReport {
182        let denominator = self.hits.saturating_add(self.misses);
183        let hit_rate_bps = if denominator == 0 {
184            None
185        } else {
186            Some(crate::numeric::ratio_basis_points_u64(
187                self.hits,
188                denominator,
189                0,
190                "pipeline cache hit rate",
191                "driver",
192            ))
193        };
194        let below_alarm_threshold = match hit_rate_bps {
195            Some(rate) if alarm_threshold_bps > 0 => rate < alarm_threshold_bps,
196            _ => false,
197        };
198        PipelineCacheAuditReport {
199            hits: self.hits,
200            misses: self.misses,
201            unknowns: self.unknowns,
202            hit_rate_bps,
203            below_alarm_threshold,
204        }
205    }
206}
207
/// Advance a telemetry counter by one, pinning it at `u64::MAX` instead of
/// wrapping around.
///
/// `_label` names the counter at the call site; it is not read here.
fn increment_counter(value: u64, _label: &str) -> u64 {
    match value.checked_add(1) {
        Some(next) => next,
        None => u64::MAX,
    }
}
211
212/// Resolve pipeline cache limits from Tier-A operational environment settings.
213#[must_use]
214pub fn pipeline_cache_limits_from_env() -> (u32, usize) {
215    let entries = parse_positive_env_u32(
216        "VYRE_PIPELINE_CACHE_ENTRIES",
217        DEFAULT_PIPELINE_CACHE_ENTRIES as u32,
218    );
219    let bytes = parse_positive_env_usize("VYRE_PIPELINE_CACHE_BYTES", DEFAULT_PIPELINE_CACHE_BYTES);
220    (entries, bytes)
221}
222
/// Read `name` from the environment as a strictly positive integer of type `T`.
///
/// Leading and trailing whitespace is ignored, so a value such as `"512 "` or
/// one with a trailing newline is accepted rather than silently discarded.
/// `default` is returned when the variable is unset, is not valid Unicode,
/// does not parse as `T`, or parses to zero.
fn parse_positive_env<T>(name: &str, default: T) -> T
where
    T: std::str::FromStr + PartialOrd + From<u8>,
{
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    match raw.trim().parse::<T>() {
        // Only strictly positive values override the default.
        Ok(value) if value > T::from(0u8) => value,
        _ => default,
    }
}

/// Read `name` as a positive `u32`, falling back to `default`.
///
/// See [`parse_positive_env`] for the exact fallback rules.
fn parse_positive_env_u32(name: &str, default: u32) -> u32 {
    parse_positive_env(name, default)
}

/// Read `name` as a positive `usize`, falling back to `default`.
///
/// See [`parse_positive_env`] for the exact fallback rules.
fn parse_positive_env_usize(name: &str, default: usize) -> usize {
    parse_positive_env(name, default)
}
242
243#[cfg(test)]
244mod tests;