Skip to main content

vyre_driver/
observability.rs

1//! Driver-tier observability surface (P-OBS-1).
2//!
3//! Single entry point for metrics consumers (Prometheus,
4//! OpenTelemetry, Datadog, custom dashboards). Aggregates:
5//!
6//! - Substrate-call counters from
7//!   `vyre_self_substrate::observability`.
8//! - Cache hit/miss rates (when caches expose them).
9//! - Substrate-decision telemetry (which math chose what).
10//!
11//! Backends extend this surface with
12//! backend-specific gauges via the
13//! [`crate::observability::BackendObservabilityProvider`] trait.
14
15#[cfg(feature = "self-substrate-adapters")]
16use vyre_self_substrate::decision_telemetry as decision_obs;
17#[cfg(feature = "self-substrate-adapters")]
18use vyre_self_substrate::observability as substrate_obs;
19
20use std::collections::VecDeque;
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::sync::{Mutex, OnceLock};
23
/// Maximum number of audit events kept in the in-memory trace buffer.
///
/// When the buffer already holds this many events, the oldest one is dropped
/// before a new one is appended.
const TRACE_EVENT_CAPACITY: usize = 256;
25
/// One optimization decision, captured for human-readable auditing while
/// `VYRE_TRACE=1` is active.
///
/// Every text field is a `&'static str`, so building or cloning an event
/// copies pointers only.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SubstrateAuditEvent {
    /// Name of the substrate or policy that made the decision.
    pub substrate: &'static str,
    /// Action that substrate chose.
    pub action: &'static str,
    /// Savings attributed to the action, in nanoseconds (predicted or measured).
    pub saved_ns: u128,
    /// Fixed context string, usable in logs and test assertions.
    pub detail: &'static str,
}
38
39/// Snapshot of every driver-tier metric at a single instant.
40///
41/// Cheap to construct (atomic loads + flat `Vec` allocations).
42/// Callers serialize via `serde` or convert to their dashboard's
43/// metric format.
44#[derive(Debug, Clone)]
45pub struct DriverObservability {
46    /// Per-substrate-module call counts.
47    pub substrate_calls: Vec<(&'static str, u64)>,
48    /// Sum across all substrate counters  -  single-number health signal.
49    pub substrate_total_calls: u64,
50    /// Substrate-decision histogram buckets (fusion / eviction /
51    /// provenance) from
52    /// `vyre_self_substrate::decision_telemetry`.
53    pub decision_buckets: Vec<(&'static str, u64)>,
54    /// Bounded recent audit events emitted by substrate decisions
55    /// while `VYRE_TRACE=1` is active.
56    pub audit_events: Vec<SubstrateAuditEvent>,
57    /// Backend-neutral dispatch counters captured at the shared runtime
58    /// boundary.
59    pub dispatch: DispatchTelemetry,
60}
61
/// Plain-value copy of the backend-neutral dispatch counters, used as
/// runtime performance evidence.
///
/// All fields are cumulative totals since process start.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DispatchTelemetry {
    /// Number of dispatch submissions seen at the shared driver boundary.
    pub launches: u64,
    /// Total bytes of input presented to dispatch.
    pub input_bytes: u64,
    /// Total bytes of output read back into host-visible buffers.
    pub output_bytes: u64,
    /// Number of output slots written by dispatches.
    pub output_slots: u64,
    /// Output slots that reused their retained allocation.
    pub output_slots_reused: u64,
    /// Output slots filled by moving an oversized incoming allocation into
    /// place.
    pub output_slots_moved: u64,
    /// Output slots appended because the caller-owned output vector was too
    /// short.
    pub output_slots_appended: u64,
    /// Incoming output bytes offered to caller-owned slot replacement.
    pub output_slot_incoming_bytes: u64,
    /// Output bytes copied into retained caller-owned slots.
    pub output_slot_copied_bytes: u64,
    /// Output bytes placed by swapping in oversized incoming slots.
    pub output_slot_moved_bytes: u64,
    /// Output bytes appended past the previous output vector length.
    pub output_slot_appended_bytes: u64,
    /// Sum of the retained output-slot capacities reported after each
    /// replacement.
    pub output_slot_retained_capacity_bytes: u64,
    /// Programs that were split because the backend lacked native grid-sync
    /// support.
    pub grid_sync_splits: u64,
    /// Segment dispatches produced by grid-sync splitting.
    pub grid_sync_segments: u64,
    /// Logical grid synchronization points split out of programs.
    pub grid_sync_points: u64,
}
97
/// Atomic backing store for [`DispatchTelemetry`].
///
/// There is one `AtomicU64` per public telemetry field, with matching names,
/// so recording functions can bump counters through a shared reference.
struct DispatchTelemetryCounters {
    /// Dispatch submissions.
    launches: AtomicU64,
    /// Input bytes presented to dispatch.
    input_bytes: AtomicU64,
    /// Output bytes read back to the host.
    output_bytes: AtomicU64,
    /// Output slots written.
    output_slots: AtomicU64,
    /// Output slots that reused a retained allocation.
    output_slots_reused: AtomicU64,
    /// Output slots filled by moving an incoming allocation.
    output_slots_moved: AtomicU64,
    /// Output slots appended to a too-short output vector.
    output_slots_appended: AtomicU64,
    /// Incoming bytes offered to slot replacement.
    output_slot_incoming_bytes: AtomicU64,
    /// Bytes copied into retained slots.
    output_slot_copied_bytes: AtomicU64,
    /// Bytes moved into place.
    output_slot_moved_bytes: AtomicU64,
    /// Bytes appended past the previous vector length.
    output_slot_appended_bytes: AtomicU64,
    /// Accumulated retained capacity reported after replacement.
    output_slot_retained_capacity_bytes: AtomicU64,
    /// Programs split for lack of native grid-sync.
    grid_sync_splits: AtomicU64,
    /// Segments produced by grid-sync splitting.
    grid_sync_segments: AtomicU64,
    /// Synchronization points split out of programs.
    grid_sync_points: AtomicU64,
}
115
116impl DispatchTelemetryCounters {
117    const fn new() -> Self {
118        Self {
119            launches: AtomicU64::new(0),
120            input_bytes: AtomicU64::new(0),
121            output_bytes: AtomicU64::new(0),
122            output_slots: AtomicU64::new(0),
123            output_slots_reused: AtomicU64::new(0),
124            output_slots_moved: AtomicU64::new(0),
125            output_slots_appended: AtomicU64::new(0),
126            output_slot_incoming_bytes: AtomicU64::new(0),
127            output_slot_copied_bytes: AtomicU64::new(0),
128            output_slot_moved_bytes: AtomicU64::new(0),
129            output_slot_appended_bytes: AtomicU64::new(0),
130            output_slot_retained_capacity_bytes: AtomicU64::new(0),
131            grid_sync_splits: AtomicU64::new(0),
132            grid_sync_segments: AtomicU64::new(0),
133            grid_sync_points: AtomicU64::new(0),
134        }
135    }
136
137    fn snapshot(&self) -> DispatchTelemetry {
138        DispatchTelemetry {
139            launches: self.launches.load(Ordering::Relaxed),
140            input_bytes: self.input_bytes.load(Ordering::Relaxed),
141            output_bytes: self.output_bytes.load(Ordering::Relaxed),
142            output_slots: self.output_slots.load(Ordering::Relaxed),
143            output_slots_reused: self.output_slots_reused.load(Ordering::Relaxed),
144            output_slots_moved: self.output_slots_moved.load(Ordering::Relaxed),
145            output_slots_appended: self.output_slots_appended.load(Ordering::Relaxed),
146            output_slot_incoming_bytes: self.output_slot_incoming_bytes.load(Ordering::Relaxed),
147            output_slot_copied_bytes: self.output_slot_copied_bytes.load(Ordering::Relaxed),
148            output_slot_moved_bytes: self.output_slot_moved_bytes.load(Ordering::Relaxed),
149            output_slot_appended_bytes: self.output_slot_appended_bytes.load(Ordering::Relaxed),
150            output_slot_retained_capacity_bytes: self
151                .output_slot_retained_capacity_bytes
152                .load(Ordering::Relaxed),
153            grid_sync_splits: self.grid_sync_splits.load(Ordering::Relaxed),
154            grid_sync_segments: self.grid_sync_segments.load(Ordering::Relaxed),
155            grid_sync_points: self.grid_sync_points.load(Ordering::Relaxed),
156        }
157    }
158}
159
/// Process-wide dispatch counters. The `record_*` functions in this module
/// add to them and `snapshot_dispatch_telemetry` reads them.
static DISPATCH_TELEMETRY: DispatchTelemetryCounters = DispatchTelemetryCounters::new();
161
162impl DriverObservability {
163    /// Take a snapshot of all driver-tier metrics now.
164    #[must_use]
165    pub fn snapshot() -> Self {
166        #[cfg(feature = "self-substrate-adapters")]
167        {
168            return Self::try_snapshot().unwrap_or_else(|_| Self {
169                substrate_calls: Vec::new(),
170                substrate_total_calls: 0,
171                decision_buckets: Vec::new(),
172                audit_events: Vec::new(),
173                dispatch: snapshot_dispatch_telemetry(),
174            });
175        }
176        #[cfg(not(feature = "self-substrate-adapters"))]
177        {
178            Self {
179                substrate_calls: Vec::new(),
180                substrate_total_calls: 0,
181                decision_buckets: Vec::new(),
182                audit_events: Vec::new(),
183                dispatch: snapshot_dispatch_telemetry(),
184            }
185        }
186    }
187
188    /// Fallibly take a snapshot of all driver-tier metrics.
189    ///
190    /// # Errors
191    ///
192    /// Returns [`crate::backend::BackendError`] when substrate telemetry is not
193    /// compiled in. Callers that require substrate counters should use this
194    /// method instead of treating the compatibility [`Self::snapshot`] fallback
195    /// as a full observability view.
196    pub fn try_snapshot() -> Result<Self, crate::backend::BackendError> {
197        #[cfg(feature = "self-substrate-adapters")]
198        {
199            Ok(Self {
200                substrate_calls: substrate_obs::snapshot_counters(),
201                substrate_total_calls: substrate_obs::total_calls(),
202                decision_buckets: decision_obs::snapshot_decisions(),
203                audit_events: snapshot_trace_events(),
204                dispatch: snapshot_dispatch_telemetry(),
205            })
206        }
207        #[cfg(not(feature = "self-substrate-adapters"))]
208        {
209            Err(crate::backend::BackendError::new(
210                "vyre-driver observability substrate telemetry requires the self-substrate-adapters feature. Fix: enable the feature for substrate counters, or use DriverObservability::snapshot for dispatch-only compatibility telemetry."
211                    .to_string(),
212            ))
213        }
214    }
215
216    /// Format the snapshot as Prometheus text-exposition format.
217    /// Counter metrics use `vyre_driver_substrate_calls_total{module="<name>"}`.
218    #[must_use]
219    pub fn to_prometheus(&self) -> String {
220        let mut out = String::with_capacity(prometheus_capacity(
221            self.substrate_calls.len(),
222            self.decision_buckets.len(),
223            self.audit_events.len(),
224        ));
225        out.push_str(
226            "# HELP vyre_driver_substrate_calls_total Total substrate-consumer calls per module\n",
227        );
228        out.push_str("# TYPE vyre_driver_substrate_calls_total counter\n");
229        for (module, count) in &self.substrate_calls {
230            // Strip the trailing _calls suffix from the module name
231            // for a cleaner Prometheus label.
232            let module_label = module.trim_end_matches("_calls");
233            use std::fmt::Write;
234            let _ = writeln!(
235                out,
236                "vyre_driver_substrate_calls_total{{module=\"{module_label}\"}} {count}"
237            );
238        }
239        out.push_str(
240            "# HELP vyre_driver_substrate_total_calls Sum of all substrate-consumer calls\n",
241        );
242        out.push_str("# TYPE vyre_driver_substrate_total_calls counter\n");
243        let _ = std::fmt::Write::write_fmt(
244            &mut out,
245            format_args!(
246                "vyre_driver_substrate_total_calls {}\n",
247                self.substrate_total_calls
248            ),
249        );
250        out.push_str("# HELP vyre_driver_substrate_decisions_total Substrate-decision histogram (fusion/eviction/provenance buckets)\n");
251        out.push_str("# TYPE vyre_driver_substrate_decisions_total counter\n");
252        for (bucket, count) in &self.decision_buckets {
253            use std::fmt::Write;
254            let _ = writeln!(
255                out,
256                "vyre_driver_substrate_decisions_total{{bucket=\"{bucket}\"}} {count}"
257            );
258        }
259        out.push_str("# HELP vyre_driver_substrate_audit_saved_ns Predicted or measured savings per optimization event\n");
260        out.push_str("# TYPE vyre_driver_substrate_audit_saved_ns gauge\n");
261        for event in &self.audit_events {
262            use std::fmt::Write;
263            let _ = writeln!(
264                out,
265                "vyre_driver_substrate_audit_saved_ns{{substrate=\"{}\",action=\"{}\",detail=\"{}\"}} {}",
266                event.substrate, event.action, event.detail, event.saved_ns
267            );
268        }
269        out.push_str("# HELP vyre_driver_dispatch_launches_total Dispatch submissions observed by the shared driver boundary\n");
270        out.push_str("# TYPE vyre_driver_dispatch_launches_total counter\n");
271        let _ = std::fmt::Write::write_fmt(
272            &mut out,
273            format_args!(
274                "vyre_driver_dispatch_launches_total {}\n",
275                self.dispatch.launches
276            ),
277        );
278        out.push_str(
279            "# HELP vyre_driver_dispatch_bytes_total Host-visible dispatch bytes by direction\n",
280        );
281        out.push_str("# TYPE vyre_driver_dispatch_bytes_total counter\n");
282        let _ = std::fmt::Write::write_fmt(
283            &mut out,
284            format_args!(
285                "vyre_driver_dispatch_bytes_total{{direction=\"input\"}} {}\nvyre_driver_dispatch_bytes_total{{direction=\"output\"}} {}\n",
286                self.dispatch.input_bytes,
287                self.dispatch.output_bytes
288            ),
289        );
290        out.push_str(
291            "# HELP vyre_driver_dispatch_output_slots_total Output slot handling by kind\n",
292        );
293        out.push_str("# TYPE vyre_driver_dispatch_output_slots_total counter\n");
294        let _ = std::fmt::Write::write_fmt(
295            &mut out,
296            format_args!(
297                "vyre_driver_dispatch_output_slots_total{{kind=\"total\"}} {}\nvyre_driver_dispatch_output_slots_total{{kind=\"reused\"}} {}\nvyre_driver_dispatch_output_slots_total{{kind=\"moved\"}} {}\nvyre_driver_dispatch_output_slots_total{{kind=\"appended\"}} {}\n",
298                self.dispatch.output_slots,
299                self.dispatch.output_slots_reused,
300                self.dispatch.output_slots_moved,
301                self.dispatch.output_slots_appended
302            ),
303        );
304        out.push_str("# HELP vyre_driver_dispatch_output_slot_bytes_total Output slot byte pressure by kind\n");
305        out.push_str("# TYPE vyre_driver_dispatch_output_slot_bytes_total counter\n");
306        let _ = std::fmt::Write::write_fmt(
307            &mut out,
308            format_args!(
309                "vyre_driver_dispatch_output_slot_bytes_total{{kind=\"incoming\"}} {}\nvyre_driver_dispatch_output_slot_bytes_total{{kind=\"copied\"}} {}\nvyre_driver_dispatch_output_slot_bytes_total{{kind=\"moved\"}} {}\nvyre_driver_dispatch_output_slot_bytes_total{{kind=\"appended\"}} {}\nvyre_driver_dispatch_output_slot_bytes_total{{kind=\"retained_capacity\"}} {}\n",
310                self.dispatch.output_slot_incoming_bytes,
311                self.dispatch.output_slot_copied_bytes,
312                self.dispatch.output_slot_moved_bytes,
313                self.dispatch.output_slot_appended_bytes,
314                self.dispatch.output_slot_retained_capacity_bytes
315            ),
316        );
317        out.push_str("# HELP vyre_driver_grid_sync_splits_total Grid-sync split events and produced synchronization structure\n");
318        out.push_str("# TYPE vyre_driver_grid_sync_splits_total counter\n");
319        let _ = std::fmt::Write::write_fmt(
320            &mut out,
321            format_args!(
322                "vyre_driver_grid_sync_splits_total{{kind=\"programs\"}} {}\nvyre_driver_grid_sync_splits_total{{kind=\"segments\"}} {}\nvyre_driver_grid_sync_splits_total{{kind=\"sync_points\"}} {}\n",
323                self.dispatch.grid_sync_splits,
324                self.dispatch.grid_sync_segments,
325                self.dispatch.grid_sync_points
326            ),
327        );
328        out
329    }
330
331    /// Format recent substrate audit events as line-oriented text.
332    #[must_use]
333    pub fn to_audit_log(&self) -> String {
334        let mut out = String::with_capacity(audit_log_capacity(self.audit_events.len()));
335        for event in &self.audit_events {
336            use std::fmt::Write;
337            let _ = writeln!(
338                out,
339                "{} {} saved={}ns {}",
340                event.substrate, event.action, event.saved_ns, event.detail
341            );
342        }
343        out
344    }
345}
346
347fn prometheus_capacity(
348    substrate_calls: usize,
349    decision_buckets: usize,
350    audit_events: usize,
351) -> usize {
352    let substrate_capacity =
353        checked_capacity_mul(substrate_calls, 96, "substrate call metrics").unwrap_or(usize::MAX);
354    let decision_capacity =
355        checked_capacity_mul(decision_buckets, 112, "decision bucket metrics").unwrap_or(usize::MAX);
356    let audit_capacity =
357        checked_capacity_mul(audit_events, 128, "audit event metrics").unwrap_or(usize::MAX);
358    checked_capacity_add(
359        384,
360        substrate_capacity,
361        "prometheus substrate call capacity",
362    )
363    .and_then(|capacity| {
364        checked_capacity_add(
365            capacity,
366            decision_capacity,
367            "prometheus decision bucket capacity",
368        )
369    })
370    .and_then(|capacity| {
371        checked_capacity_add(capacity, audit_capacity, "prometheus audit event capacity")
372    })
373    .unwrap_or(usize::MAX)
374}
375
376fn audit_log_capacity(audit_events: usize) -> usize {
377    checked_capacity_mul(audit_events, 96, "audit log events").unwrap_or(usize::MAX)
378}
379
/// Multiply an entry count by a per-entry byte allowance.
///
/// # Errors
///
/// Returns a descriptive message, prefixed with `label`, when the product
/// does not fit in `usize`.
fn checked_capacity_mul(
    count: usize,
    bytes_per_entry: usize,
    label: &str,
) -> Result<usize, String> {
    match count.checked_mul(bytes_per_entry) {
        Some(bytes) => Ok(bytes),
        None => Err(format!(
            "{label} capacity estimate overflowed: count={count}, bytes_per_entry={bytes_per_entry}. Fix: page observability output instead of silently clamping allocation size."
        )),
    }
}
391
/// Add two partial capacity estimates.
///
/// # Errors
///
/// Returns a descriptive message, prefixed with `label`, when the sum does
/// not fit in `usize`.
fn checked_capacity_add(left: usize, right: usize, label: &str) -> Result<usize, String> {
    match left.checked_add(right) {
        Some(total) => Ok(total),
        None => Err(format!(
            "{label} capacity estimate overflowed: left={left}, right={right}. Fix: page observability output instead of silently clamping allocation size."
        )),
    }
}
399
400/// Record one completed dispatch's host-visible input and output volume.
401pub fn record_dispatch_io(inputs: &[&[u8]], outputs: &[Vec<u8>]) {
402    DISPATCH_TELEMETRY.launches.fetch_add(1, Ordering::Relaxed);
403    DISPATCH_TELEMETRY
404        .input_bytes
405        .fetch_add(sum_input_bytes(inputs), Ordering::Relaxed);
406    DISPATCH_TELEMETRY
407        .output_bytes
408        .fetch_add(sum_output_bytes(outputs), Ordering::Relaxed);
409}
410
411/// Record how caller-owned output slots were populated.
412pub fn record_output_slot_stats(stats: crate::backend::OutputSlotStats) {
413    DISPATCH_TELEMETRY
414        .output_slots
415        .fetch_add(stats.total_slots as u64, Ordering::Relaxed);
416    DISPATCH_TELEMETRY
417        .output_slots_reused
418        .fetch_add(stats.reused_slots as u64, Ordering::Relaxed);
419    DISPATCH_TELEMETRY
420        .output_slots_moved
421        .fetch_add(stats.moved_slots as u64, Ordering::Relaxed);
422    DISPATCH_TELEMETRY
423        .output_slots_appended
424        .fetch_add(stats.appended_slots as u64, Ordering::Relaxed);
425}
426
427/// Record full output replacement accounting, including byte pressure.
428pub fn record_output_replacement_stats(stats: crate::backend::OutputReplacementStats) {
429    record_output_slot_stats(stats.slots);
430    record_output_slot_byte_stats(stats.bytes);
431}
432
433/// Record byte-pressure accounting from caller-owned output slot replacement.
434pub fn record_output_slot_byte_stats(stats: crate::backend::OutputSlotByteStats) {
435    DISPATCH_TELEMETRY
436        .output_slot_incoming_bytes
437        .fetch_add(stats.incoming_bytes as u64, Ordering::Relaxed);
438    DISPATCH_TELEMETRY
439        .output_slot_copied_bytes
440        .fetch_add(stats.copied_bytes as u64, Ordering::Relaxed);
441    DISPATCH_TELEMETRY
442        .output_slot_moved_bytes
443        .fetch_add(stats.moved_bytes as u64, Ordering::Relaxed);
444    DISPATCH_TELEMETRY
445        .output_slot_appended_bytes
446        .fetch_add(stats.appended_bytes as u64, Ordering::Relaxed);
447    DISPATCH_TELEMETRY
448        .output_slot_retained_capacity_bytes
449        .fetch_add(stats.retained_capacity_bytes as u64, Ordering::Relaxed);
450}
451
452/// Record that one program was split into multiple dispatch segments because
453/// the selected backend lacks native grid-sync support.
454pub fn record_grid_sync_split(segment_count: usize) {
455    DISPATCH_TELEMETRY
456        .grid_sync_splits
457        .fetch_add(1, Ordering::Relaxed);
458    DISPATCH_TELEMETRY
459        .grid_sync_segments
460        .fetch_add(segment_count as u64, Ordering::Relaxed);
461    let sync_points = segment_count.saturating_sub(1);
462    DISPATCH_TELEMETRY
463        .grid_sync_points
464        .fetch_add(sync_points as u64, Ordering::Relaxed);
465}
466
467/// Snapshot backend-neutral dispatch telemetry.
468#[must_use]
469pub fn snapshot_dispatch_telemetry() -> DispatchTelemetry {
470    DISPATCH_TELEMETRY.snapshot()
471}
472
/// Combined length, in bytes, of every input slice.
fn sum_input_bytes(inputs: &[&[u8]]) -> u64 {
    inputs
        .iter()
        .fold(0, |total, input| total + input.len() as u64)
}
476
/// Combined length, in bytes, of every output buffer.
fn sum_output_bytes(outputs: &[Vec<u8>]) -> u64 {
    outputs
        .iter()
        .fold(0, |total, output| total + output.len() as u64)
}
480
/// Optional extension point that lets a backend report its own metrics next
/// to the common driver-tier ones.
///
/// A backend that does not implement this trait is still covered by the
/// driver-tier snapshot.
pub trait BackendObservabilityProvider {
    /// Backend-specific metrics as a flat list of `(metric_name, value)`
    /// pairs.
    ///
    /// NOTE(review): the code that merges these pairs into a unified snapshot
    /// is not in this module; confirm where they are consumed.
    fn backend_metrics(&self) -> Vec<(&'static str, u64)>;
}
490
491fn trace_events() -> &'static Mutex<VecDeque<SubstrateAuditEvent>> {
492    static EVENTS: OnceLock<Mutex<VecDeque<SubstrateAuditEvent>>> = OnceLock::new();
493    EVENTS.get_or_init(|| Mutex::new(VecDeque::with_capacity(TRACE_EVENT_CAPACITY)))
494}
495
/// Report whether substrate audit tracing is switched on.
///
/// Reads the `VYRE_TRACE` environment variable once and caches the answer for
/// the rest of the process. Tracing is enabled when the value is `1`, `true`
/// or `yes`, compared ASCII case-insensitively so that spellings such as
/// `True` or `Yes` are accepted too. An unset or non-Unicode variable, or any
/// other value, leaves tracing disabled.
fn trace_enabled() -> bool {
    static ENABLED: OnceLock<bool> = OnceLock::new();
    *ENABLED.get_or_init(|| {
        std::env::var("VYRE_TRACE")
            .map(|value| {
                ["1", "true", "yes"]
                    .iter()
                    .any(|accepted| value.eq_ignore_ascii_case(accepted))
            })
            .unwrap_or(false)
    })
}
504
505/// Record one substrate audit event when `VYRE_TRACE=1`.
506///
507/// This is intentionally a no-op when trace is disabled so dispatch
508/// policies can call it without allocating on normal hot paths.
509pub fn record_substrate_audit_event(event: SubstrateAuditEvent) {
510    if !trace_enabled() {
511        return;
512    }
513    if let Ok(mut events) = trace_events().lock() {
514        if events.len() == TRACE_EVENT_CAPACITY {
515            events.pop_front();
516        }
517        tracing::info!(
518            target: "vyre_driver::substrate_audit",
519            substrate = event.substrate,
520            action = event.action,
521            saved_ns = event.saved_ns,
522            detail = event.detail,
523            "vyre substrate optimization fired"
524        );
525        events.push_back(event);
526    }
527}
528
/// Copy the buffered audit events, oldest first.
///
/// Returns an empty list when the buffer mutex is poisoned.
#[cfg(feature = "self-substrate-adapters")]
fn snapshot_trace_events() -> Vec<SubstrateAuditEvent> {
    match trace_events().lock() {
        Ok(events) => {
            let mut copied = Vec::new();
            // Best-effort exact reservation; a failure here is ignored and
            // `extend` grows the vector as needed.
            let _ = copied.try_reserve_exact(events.len());
            copied.extend(events.iter().cloned());
            copied
        }
        Err(_) => Vec::new(),
    }
}
541
/// Test helper: push an event into the audit buffer regardless of
/// `VYRE_TRACE`, without emitting a `tracing` record.
#[cfg(test)]
pub(crate) fn record_substrate_audit_event_for_test(event: SubstrateAuditEvent) {
    let Ok(mut buffer) = trace_events().lock() else {
        return;
    };
    if buffer.len() == TRACE_EVENT_CAPACITY {
        buffer.pop_front();
    }
    buffer.push_back(event);
}
551
/// Test helper: build a snapshot that carries the buffered audit events and
/// the dispatch counters, with all substrate-counter fields left empty.
#[cfg(test)]
pub(crate) fn snapshot_for_test() -> DriverObservability {
    let audit_events: Vec<SubstrateAuditEvent> = match trace_events().lock() {
        Ok(events) => events.iter().cloned().collect(),
        Err(_) => Vec::new(),
    };
    DriverObservability {
        substrate_calls: Vec::new(),
        substrate_total_calls: 0,
        decision_buckets: Vec::new(),
        audit_events,
        dispatch: snapshot_dispatch_telemetry(),
    }
}
566
/// Test helper: empty the audit-event buffer. Does nothing if the buffer
/// mutex is poisoned.
#[cfg(test)]
pub(crate) fn clear_substrate_audit_events_for_test() {
    let Ok(mut buffer) = trace_events().lock() else {
        return;
    };
    buffer.clear();
}
573
/// Test helper: acquire the process-wide lock that serializes tests touching
/// the shared audit-event buffer.
///
/// # Panics
///
/// Panics if the lock is poisoned.
#[cfg(test)]
pub(crate) fn audit_events_test_lock() -> std::sync::MutexGuard<'static, ()> {
    static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
    let mutex = LOCK.get_or_init(Mutex::<()>::default);
    mutex
        .lock()
        .expect("Fix: audit event test lock must not be poisoned")
}
581
#[cfg(test)]
mod tests {
    use super::*;

    /// With substrate telemetry compiled in, at least one named module must
    /// report a nonzero call count.
    #[test]
    #[cfg(feature = "self-substrate-adapters")]
    fn snapshot_yields_nonempty_substrate_list() {
        let observed = DriverObservability::snapshot();
        let has_active_module = observed
            .substrate_calls
            .iter()
            .any(|(module, count)| *count > 0 && !module.is_empty());
        assert!(
            has_active_module,
            "snapshot must record at least one substrate module with nonzero calls"
        );
    }

    /// Module labels appear with the `_calls` suffix stripped.
    #[test]
    #[cfg(feature = "self-substrate-adapters")]
    fn prometheus_output_contains_module_labels() {
        let exposition = DriverObservability::snapshot().to_prometheus();
        assert!(exposition.contains("module=\"matroid_megakernel_scheduler\""));
        assert!(exposition.contains("module=\"vsa_fingerprint\""));
        assert!(exposition.contains("# HELP vyre_driver_substrate_calls_total"));
    }

    /// Without the feature, the fallible path reports what is missing and
    /// names the dispatch-only alternative.
    #[test]
    #[cfg(not(feature = "self-substrate-adapters"))]
    fn try_snapshot_without_adapter_feature_returns_structured_error() {
        let failure = DriverObservability::try_snapshot()
            .expect_err("try_snapshot must report missing substrate telemetry as an error");
        let text = failure.to_string();
        assert!(
            text.contains("self-substrate-adapters"),
            "structured error must name the missing feature"
        );
        assert!(
            text.contains("DriverObservability::snapshot"),
            "structured error must name the dispatch-only compatibility path"
        );
    }

    /// Without the feature, the infallible path yields empty substrate fields
    /// rather than panicking.
    #[test]
    #[cfg(not(feature = "self-substrate-adapters"))]
    fn snapshot_without_adapter_feature_is_dispatch_only_not_panic() {
        let DriverObservability {
            substrate_calls,
            substrate_total_calls,
            decision_buckets,
            ..
        } = DriverObservability::snapshot();
        assert!(substrate_calls.is_empty());
        assert_eq!(substrate_total_calls, 0);
        assert!(decision_buckets.is_empty());
    }

    #[test]
    #[cfg(feature = "self-substrate-adapters")]
    fn total_calls_appears_in_prometheus() {
        let exposition = DriverObservability::snapshot().to_prometheus();
        assert!(exposition.contains("vyre_driver_substrate_total_calls"));
    }

    /// A recorded audit event shows up in the snapshot and in both output
    /// formats.
    #[test]
    #[cfg(feature = "self-substrate-adapters")]
    fn audit_log_and_prometheus_include_recorded_events() {
        // Serialize against other tests that use the shared buffer, and start
        // from a known-empty state.
        let _serialized = audit_events_test_lock();
        clear_substrate_audit_events_for_test();

        let recorded = SubstrateAuditEvent {
            substrate: "trace_jit",
            action: "speculate",
            saved_ns: 123,
            detail: "predicted_shape",
        };
        record_substrate_audit_event_for_test(recorded);

        let observed = DriverObservability::snapshot();
        assert_eq!(observed.audit_events.len(), 1);

        let audit_log = observed.to_audit_log();
        assert!(audit_log.contains("trace_jit speculate saved=123ns"));

        let exposition = observed.to_prometheus();
        assert!(exposition.contains("vyre_driver_substrate_audit_saved_ns"));

        clear_substrate_audit_events_for_test();
    }

    /// Counters are global and other tests may bump them concurrently, so
    /// every check is "grew by at least the recorded amount".
    #[test]
    fn dispatch_telemetry_records_bytes_slots_and_prometheus_metrics() {
        let baseline = snapshot_dispatch_telemetry();

        record_dispatch_io(&[&[1, 2, 3], &[4]], &[vec![9, 8]]);
        let slot_stats = crate::backend::OutputSlotStats {
            total_slots: 3,
            reused_slots: 1,
            moved_slots: 1,
            appended_slots: 1,
        };
        record_output_slot_stats(slot_stats);
        let byte_stats = crate::backend::OutputSlotByteStats {
            incoming_bytes: 9,
            copied_bytes: 2,
            moved_bytes: 4,
            appended_bytes: 3,
            retained_capacity_bytes: 16,
        };
        record_output_slot_byte_stats(byte_stats);

        let current = snapshot_dispatch_telemetry();
        assert!(current.launches >= baseline.launches + 1);
        assert!(current.input_bytes >= baseline.input_bytes + 4);
        assert!(current.output_bytes >= baseline.output_bytes + 2);
        assert!(current.output_slots >= baseline.output_slots + 3);
        assert!(current.output_slots_reused >= baseline.output_slots_reused + 1);
        assert!(current.output_slots_moved >= baseline.output_slots_moved + 1);
        assert!(current.output_slots_appended >= baseline.output_slots_appended + 1);
        assert!(current.output_slot_incoming_bytes >= baseline.output_slot_incoming_bytes + 9);
        assert!(current.output_slot_copied_bytes >= baseline.output_slot_copied_bytes + 2);
        assert!(current.output_slot_moved_bytes >= baseline.output_slot_moved_bytes + 4);
        assert!(current.output_slot_appended_bytes >= baseline.output_slot_appended_bytes + 3);
        assert!(
            current.output_slot_retained_capacity_bytes
                >= baseline.output_slot_retained_capacity_bytes + 16
        );

        #[cfg(feature = "self-substrate-adapters")]
        {
            let exposition = DriverObservability::snapshot().to_prometheus();
            assert!(exposition.contains("vyre_driver_dispatch_launches_total"));
            assert!(exposition.contains("direction=\"input\""));
            assert!(exposition.contains("kind=\"appended\""));
            assert!(exposition.contains("kind=\"retained_capacity\""));
        }
    }

    /// Splitting into four segments counts one split, four segments and
    /// three sync points.
    #[test]
    fn grid_sync_telemetry_records_segments_and_sync_points() {
        let baseline = snapshot_dispatch_telemetry();
        record_grid_sync_split(4);
        let current = snapshot_dispatch_telemetry();

        assert!(current.grid_sync_splits >= baseline.grid_sync_splits + 1);
        assert!(current.grid_sync_segments >= baseline.grid_sync_segments + 4);
        assert!(current.grid_sync_points >= baseline.grid_sync_points + 3);

        #[cfg(feature = "self-substrate-adapters")]
        assert!(DriverObservability::snapshot()
            .to_prometheus()
            .contains("kind=\"sync_points\""));
    }
}
724}