vyre_driver/pipeline/mod.rs
1//! Pipeline mode - pre-compile a Program once, dispatch repeatedly with new inputs.
2
3/// Shared on-disk compiled-pipeline cache.
4pub mod cache;
5/// Backend-neutral pipeline compilation entry points.
6pub mod compiler;
7/// Stable cache hashing and device fingerprint helpers.
8pub mod hashing;
9
10pub use cache::{
11 DiskPipelineCache, PipelineCacheIdentity, PipelineCacheKey, PipelineCacheMissEvidence,
12 PipelineCacheMissReason, PipelineFeatureFlags,
13};
14pub use compiler::{
15 compile, compile_owned, compile_owned_with_telemetry, compile_shared,
16 compile_shared_with_telemetry, compile_with_telemetry, prewarm, prewarm_owned, prewarm_shared,
17};
18pub use hashing::{
19 dispatch_policy_cache_digest, dispatch_policy_cache_string, hex_encode, hex_short,
20 normalized_program_cache_digest, try_normalized_program_cache_digest,
21 update_dispatch_policy_cache_hash, PipelineDeviceFingerprint,
22};
23
/// Version number folded into each persistent pipeline cache key.
pub const CURRENT_PIPELINE_CACHE_KEY_VERSION: u32 = 1;
/// Default cap on how many compiled pipeline artifacts stay resident in memory.
pub const DEFAULT_PIPELINE_CACHE_ENTRIES: usize = 256;
/// Default cap, in bytes, on what a backend pipeline cache may retain (256 MiB).
pub const DEFAULT_PIPELINE_CACHE_BYTES: usize = 256 << 20;
/// Three-component workgroup shape used for one-dimensional dispatch when the
/// caller supplies no override.
pub const DEFAULT_1D_WORKGROUP_SIZE: [u32; 3] = [64, 1, 1];
32
/// Compiled-pipeline cache counters as reported by a backend.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PipelineCacheSnapshot {
    /// Number of lookups satisfied by an artifact that was already compiled.
    pub hits: u64,
    /// Number of lookups that had to compile or load the artifact.
    pub misses: u64,
}
41
42/// Result of compiling a reusable pipeline with honest cache telemetry.
43#[derive(Clone)]
44pub struct CompiledPipelineBuild {
45 /// Reusable pipeline returned by the backend or passthrough wrapper.
46 pub pipeline: std::sync::Arc<dyn crate::backend::CompiledPipeline>,
47 /// `Some(true)` when backend counters prove a cache hit,
48 /// `Some(false)` when counters prove a miss, and `None` when the backend
49 /// does not expose real compile-cache counters.
50 pub cache_hit: Option<bool>,
51 /// Reproducibility manifest for this compiled artifact.
52 pub manifest: PipelineReproManifest,
53}
54
55/// Result of prewarming a backend pipeline cache before the hot dispatch path.
56#[derive(Clone, Debug, Eq, PartialEq)]
57pub struct PipelinePrewarmReport {
58 /// Backend pipeline id that was materialized or fetched from cache.
59 pub pipeline_id: String,
60 /// `Some(true)` when backend counters prove the pipeline was already warm,
61 /// `Some(false)` when this call performed compile/load work, and `None`
62 /// when the backend does not expose real cache counters.
63 pub cache_hit: Option<bool>,
64 /// Reproducibility manifest for the warmed artifact.
65 pub manifest: PipelineReproManifest,
66}
67
68/// JSON-serializable reproducibility sidecar for a compiled pipeline.
69#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
70pub struct PipelineReproManifest {
71 /// Manifest schema version.
72 pub schema: u32,
73 /// Backend id that compiled the artifact.
74 pub backend_id: String,
75 /// Backend pipeline id returned by [`crate::backend::CompiledPipeline::id`].
76 pub pipeline_id: String,
77 /// Canonical normalized Program digest as lowercase hex.
78 pub program_digest: String,
79 /// Dispatch policy fields that affect generated backend code.
80 pub dispatch_policy: String,
81 /// Backend-reported cache status for this compile/prewarm.
82 pub cache_hit: Option<bool>,
83}
84
85impl PipelineReproManifest {
86 /// Current manifest schema.
87 pub const SCHEMA: u32 = 1;
88
89 /// Build a manifest from shared compile facts.
90 #[must_use]
91 pub fn new(
92 backend_id: impl Into<String>,
93 pipeline_id: impl Into<String>,
94 program_digest: [u8; 32],
95 dispatch_policy: impl Into<String>,
96 cache_hit: Option<bool>,
97 ) -> Self {
98 Self {
99 schema: Self::SCHEMA,
100 backend_id: backend_id.into(),
101 pipeline_id: pipeline_id.into(),
102 program_digest: hex_encode(&program_digest),
103 dispatch_policy: dispatch_policy.into(),
104 cache_hit,
105 }
106 }
107
108 /// Serialize as compact JSON for sidecar files and result envelopes.
109 ///
110 /// # Errors
111 ///
112 /// Returns when serde cannot serialize the manifest. This should not occur
113 /// for the current schema, but the error is propagated for forward
114 /// compatibility.
115 pub fn to_json(&self) -> Result<String, serde_json::Error> {
116 serde_json::to_string(self)
117 }
118}
119
/// Accumulator that audits how often pipeline reuse hits the cache.
///
/// Feed it the `Option<bool>` `cache_hit` values carried by
/// [`CompiledPipelineBuild`] and `PipelinePrewarmReport`: one observation per
/// resolved pipeline, or one per prewarm. A snapshot then yields a
/// `PipelineCacheAuditReport` with per-outcome counts, the hit rate, and
/// whether that rate fell below a configurable alarm threshold, which is
/// suitable for observability dashboards and CI gates.
///
/// A `None` observation is tallied as "unknown" and left out of the hit-rate
/// denominator. Some backends lack real compile-cache counters and report
/// `None` rather than claim a hit they cannot prove.
#[derive(Clone, Debug, Default)]
pub struct PipelineCacheAudit {
    hits: u64,
    misses: u64,
    unknowns: u64,
}
141
/// Point-in-time view of a [`PipelineCacheAudit`], produced by
/// `PipelineCacheAudit::snapshot`.
///
/// Every field has total equality, so the report derives `Eq` as well as
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipelineCacheAuditReport {
    /// Lookups that found an already-compiled artifact.
    pub hits: u64,
    /// Lookups that performed compile/load work.
    pub misses: u64,
    /// Lookups whose backend did not report cache state.
    pub unknowns: u64,
    /// Hit rate in basis points (0..=10_000) over the `hits + misses`
    /// denominator, with unknowns excluded. `None` when `hits + misses == 0`,
    /// so callers can distinguish "no data" from "0% hit rate".
    pub hit_rate_bps: Option<u32>,
    /// Whether the hit rate is below the operator-supplied alarm threshold.
    /// Always `false` when `hit_rate_bps` is `None` or when the threshold was
    /// `0` (alarm disabled).
    pub below_alarm_threshold: bool,
}
160
161impl PipelineCacheAudit {
162 /// Empty audit accumulator.
163 #[must_use]
164 pub fn new() -> Self {
165 Self::default()
166 }
167
168 /// Push one outcome from the dispatcher.
169 pub fn observe(&mut self, cache_hit: Option<bool>) {
170 match cache_hit {
171 Some(true) => self.hits = increment_counter(self.hits, "pipeline cache hits"),
172 Some(false) => self.misses = increment_counter(self.misses, "pipeline cache misses"),
173 None => self.unknowns = increment_counter(self.unknowns, "pipeline cache unknowns"),
174 }
175 }
176
177 /// Snapshot the audit, scoring it against `alarm_threshold_bps`.
178 /// `alarm_threshold_bps = 8000` flags any audit with under 80% hit
179 /// rate; pass `0` to disable the alarm.
180 #[must_use]
181 pub fn snapshot(&self, alarm_threshold_bps: u32) -> PipelineCacheAuditReport {
182 let denominator = self.hits.saturating_add(self.misses);
183 let hit_rate_bps = if denominator == 0 {
184 None
185 } else {
186 Some(crate::numeric::ratio_basis_points_u64(
187 self.hits,
188 denominator,
189 0,
190 "pipeline cache hit rate",
191 "driver",
192 ))
193 };
194 let below_alarm_threshold = match hit_rate_bps {
195 Some(rate) if alarm_threshold_bps > 0 => rate < alarm_threshold_bps,
196 _ => false,
197 };
198 PipelineCacheAuditReport {
199 hits: self.hits,
200 misses: self.misses,
201 unknowns: self.unknowns,
202 hit_rate_bps,
203 below_alarm_threshold,
204 }
205 }
206}
207
/// Add one to `value`, pinning at `u64::MAX` instead of wrapping on overflow.
///
/// The second argument names the counter at the call site; this function does
/// not consult it.
fn increment_counter(value: u64, _counter_name: &str) -> u64 {
    value.checked_add(1).unwrap_or(u64::MAX)
}
211
212/// Resolve pipeline cache limits from Tier-A operational environment settings.
213#[must_use]
214pub fn pipeline_cache_limits_from_env() -> (u32, usize) {
215 let entries = parse_positive_env_u32(
216 "VYRE_PIPELINE_CACHE_ENTRIES",
217 DEFAULT_PIPELINE_CACHE_ENTRIES as u32,
218 );
219 let bytes = parse_positive_env_usize("VYRE_PIPELINE_CACHE_BYTES", DEFAULT_PIPELINE_CACHE_BYTES);
220 (entries, bytes)
221}
222
/// Read environment variable `name` as a strictly positive `u32`.
///
/// Surrounding whitespace is ignored, so values such as `"512\n"` are
/// accepted; these are common when the variable is filled from a file or a
/// shell command substitution. Returns `default` when the variable is unset,
/// is not valid Unicode, fails to parse, or parses to zero.
fn parse_positive_env_u32(name: &str, default: u32) -> u32 {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    raw.trim()
        .parse::<u32>()
        .ok()
        .filter(|&value| value > 0)
        .unwrap_or(default)
}
232
/// Read environment variable `name` as a strictly positive `usize`.
///
/// Surrounding whitespace is ignored, so values such as `"268435456\n"` are
/// accepted; these are common when the variable is filled from a file or a
/// shell command substitution. Returns `default` when the variable is unset,
/// is not valid Unicode, fails to parse, or parses to zero.
fn parse_positive_env_usize(name: &str, default: usize) -> usize {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    raw.trim()
        .parse::<usize>()
        .ok()
        .filter(|&value| value > 0)
        .unwrap_or(default)
}
242
243#[cfg(test)]
244mod tests;