1#[cfg(feature = "self-substrate-adapters")]
16use vyre_self_substrate::decision_telemetry as decision_obs;
17#[cfg(feature = "self-substrate-adapters")]
18use vyre_self_substrate::observability as substrate_obs;
19
20use std::collections::VecDeque;
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::sync::{Mutex, OnceLock};
23
/// Maximum number of audit events held in the in-memory trace buffer. The
/// buffer is pre-sized to this value and recorders drop the oldest entry once
/// it is reached.
const TRACE_EVENT_CAPACITY: usize = 256;
25
/// A single substrate optimization event as kept in the trace buffer and
/// rendered by the audit-log and Prometheus exporters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubstrateAuditEvent {
    /// Substrate that fired; exported as the `substrate` label.
    pub substrate: &'static str,
    /// What the substrate did; exported as the `action` label.
    pub action: &'static str,
    /// Savings in nanoseconds (the exporter describes them as predicted or
    /// measured).
    pub saved_ns: u128,
    /// Extra context for the event; exported as the `detail` label.
    pub detail: &'static str,
}
38
39#[derive(Debug, Clone)]
45pub struct DriverObservability {
46 pub substrate_calls: Vec<(&'static str, u64)>,
48 pub substrate_total_calls: u64,
50 pub decision_buckets: Vec<(&'static str, u64)>,
54 pub audit_events: Vec<SubstrateAuditEvent>,
57 pub dispatch: DispatchTelemetry,
60}
61
/// Plain-value copy of the process-wide dispatch counters.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DispatchTelemetry {
    /// Number of `record_dispatch_io` calls.
    pub launches: u64,
    /// Sum of the lengths of all recorded input buffers.
    pub input_bytes: u64,
    /// Sum of the lengths of all recorded output buffers.
    pub output_bytes: u64,
    /// Accumulated `total_slots` from recorded output slot stats.
    pub output_slots: u64,
    /// Accumulated `reused_slots`.
    pub output_slots_reused: u64,
    /// Accumulated `moved_slots`.
    pub output_slots_moved: u64,
    /// Accumulated `appended_slots`.
    pub output_slots_appended: u64,
    /// Accumulated `incoming_bytes` from recorded output slot byte stats.
    pub output_slot_incoming_bytes: u64,
    /// Accumulated `copied_bytes`.
    pub output_slot_copied_bytes: u64,
    /// Accumulated `moved_bytes`.
    pub output_slot_moved_bytes: u64,
    /// Accumulated `appended_bytes`.
    pub output_slot_appended_bytes: u64,
    /// Accumulated `retained_capacity_bytes`.
    pub output_slot_retained_capacity_bytes: u64,
    /// Number of recorded grid-sync splits.
    pub grid_sync_splits: u64,
    /// Total segment count over all recorded splits.
    pub grid_sync_segments: u64,
    /// Total sync points, counted as `segments - 1` per split (never below 0).
    pub grid_sync_points: u64,
}
97
/// Atomic backing store for [`DispatchTelemetry`]; one `AtomicU64` per
/// exported field, with matching names.
struct DispatchTelemetryCounters {
    // Dispatch submissions and raw I/O volume.
    launches: AtomicU64,
    input_bytes: AtomicU64,
    output_bytes: AtomicU64,
    // Output slot counts.
    output_slots: AtomicU64,
    output_slots_reused: AtomicU64,
    output_slots_moved: AtomicU64,
    output_slots_appended: AtomicU64,
    // Output slot byte volumes.
    output_slot_incoming_bytes: AtomicU64,
    output_slot_copied_bytes: AtomicU64,
    output_slot_moved_bytes: AtomicU64,
    output_slot_appended_bytes: AtomicU64,
    output_slot_retained_capacity_bytes: AtomicU64,
    // Grid-sync split structure.
    grid_sync_splits: AtomicU64,
    grid_sync_segments: AtomicU64,
    grid_sync_points: AtomicU64,
}
115
116impl DispatchTelemetryCounters {
117 const fn new() -> Self {
118 Self {
119 launches: AtomicU64::new(0),
120 input_bytes: AtomicU64::new(0),
121 output_bytes: AtomicU64::new(0),
122 output_slots: AtomicU64::new(0),
123 output_slots_reused: AtomicU64::new(0),
124 output_slots_moved: AtomicU64::new(0),
125 output_slots_appended: AtomicU64::new(0),
126 output_slot_incoming_bytes: AtomicU64::new(0),
127 output_slot_copied_bytes: AtomicU64::new(0),
128 output_slot_moved_bytes: AtomicU64::new(0),
129 output_slot_appended_bytes: AtomicU64::new(0),
130 output_slot_retained_capacity_bytes: AtomicU64::new(0),
131 grid_sync_splits: AtomicU64::new(0),
132 grid_sync_segments: AtomicU64::new(0),
133 grid_sync_points: AtomicU64::new(0),
134 }
135 }
136
137 fn snapshot(&self) -> DispatchTelemetry {
138 DispatchTelemetry {
139 launches: self.launches.load(Ordering::Relaxed),
140 input_bytes: self.input_bytes.load(Ordering::Relaxed),
141 output_bytes: self.output_bytes.load(Ordering::Relaxed),
142 output_slots: self.output_slots.load(Ordering::Relaxed),
143 output_slots_reused: self.output_slots_reused.load(Ordering::Relaxed),
144 output_slots_moved: self.output_slots_moved.load(Ordering::Relaxed),
145 output_slots_appended: self.output_slots_appended.load(Ordering::Relaxed),
146 output_slot_incoming_bytes: self.output_slot_incoming_bytes.load(Ordering::Relaxed),
147 output_slot_copied_bytes: self.output_slot_copied_bytes.load(Ordering::Relaxed),
148 output_slot_moved_bytes: self.output_slot_moved_bytes.load(Ordering::Relaxed),
149 output_slot_appended_bytes: self.output_slot_appended_bytes.load(Ordering::Relaxed),
150 output_slot_retained_capacity_bytes: self
151 .output_slot_retained_capacity_bytes
152 .load(Ordering::Relaxed),
153 grid_sync_splits: self.grid_sync_splits.load(Ordering::Relaxed),
154 grid_sync_segments: self.grid_sync_segments.load(Ordering::Relaxed),
155 grid_sync_points: self.grid_sync_points.load(Ordering::Relaxed),
156 }
157 }
158}
159
160static DISPATCH_TELEMETRY: DispatchTelemetryCounters = DispatchTelemetryCounters::new();
161
162impl DriverObservability {
163 #[must_use]
165 pub fn snapshot() -> Self {
166 #[cfg(feature = "self-substrate-adapters")]
167 {
168 return Self::try_snapshot().unwrap_or_else(|_| Self {
169 substrate_calls: Vec::new(),
170 substrate_total_calls: 0,
171 decision_buckets: Vec::new(),
172 audit_events: Vec::new(),
173 dispatch: snapshot_dispatch_telemetry(),
174 });
175 }
176 #[cfg(not(feature = "self-substrate-adapters"))]
177 {
178 Self {
179 substrate_calls: Vec::new(),
180 substrate_total_calls: 0,
181 decision_buckets: Vec::new(),
182 audit_events: Vec::new(),
183 dispatch: snapshot_dispatch_telemetry(),
184 }
185 }
186 }
187
188 pub fn try_snapshot() -> Result<Self, crate::backend::BackendError> {
197 #[cfg(feature = "self-substrate-adapters")]
198 {
199 Ok(Self {
200 substrate_calls: substrate_obs::snapshot_counters(),
201 substrate_total_calls: substrate_obs::total_calls(),
202 decision_buckets: decision_obs::snapshot_decisions(),
203 audit_events: snapshot_trace_events(),
204 dispatch: snapshot_dispatch_telemetry(),
205 })
206 }
207 #[cfg(not(feature = "self-substrate-adapters"))]
208 {
209 Err(crate::backend::BackendError::new(
210 "vyre-driver observability substrate telemetry requires the self-substrate-adapters feature. Fix: enable the feature for substrate counters, or use DriverObservability::snapshot for dispatch-only compatibility telemetry."
211 .to_string(),
212 ))
213 }
214 }
215
216 #[must_use]
219 pub fn to_prometheus(&self) -> String {
220 let mut out = String::with_capacity(prometheus_capacity(
221 self.substrate_calls.len(),
222 self.decision_buckets.len(),
223 self.audit_events.len(),
224 ));
225 out.push_str(
226 "# HELP vyre_driver_substrate_calls_total Total substrate-consumer calls per module\n",
227 );
228 out.push_str("# TYPE vyre_driver_substrate_calls_total counter\n");
229 for (module, count) in &self.substrate_calls {
230 let module_label = module.trim_end_matches("_calls");
233 use std::fmt::Write;
234 let _ = writeln!(
235 out,
236 "vyre_driver_substrate_calls_total{{module=\"{module_label}\"}} {count}"
237 );
238 }
239 out.push_str(
240 "# HELP vyre_driver_substrate_total_calls Sum of all substrate-consumer calls\n",
241 );
242 out.push_str("# TYPE vyre_driver_substrate_total_calls counter\n");
243 let _ = std::fmt::Write::write_fmt(
244 &mut out,
245 format_args!(
246 "vyre_driver_substrate_total_calls {}\n",
247 self.substrate_total_calls
248 ),
249 );
250 out.push_str("# HELP vyre_driver_substrate_decisions_total Substrate-decision histogram (fusion/eviction/provenance buckets)\n");
251 out.push_str("# TYPE vyre_driver_substrate_decisions_total counter\n");
252 for (bucket, count) in &self.decision_buckets {
253 use std::fmt::Write;
254 let _ = writeln!(
255 out,
256 "vyre_driver_substrate_decisions_total{{bucket=\"{bucket}\"}} {count}"
257 );
258 }
259 out.push_str("# HELP vyre_driver_substrate_audit_saved_ns Predicted or measured savings per optimization event\n");
260 out.push_str("# TYPE vyre_driver_substrate_audit_saved_ns gauge\n");
261 for event in &self.audit_events {
262 use std::fmt::Write;
263 let _ = writeln!(
264 out,
265 "vyre_driver_substrate_audit_saved_ns{{substrate=\"{}\",action=\"{}\",detail=\"{}\"}} {}",
266 event.substrate, event.action, event.detail, event.saved_ns
267 );
268 }
269 out.push_str("# HELP vyre_driver_dispatch_launches_total Dispatch submissions observed by the shared driver boundary\n");
270 out.push_str("# TYPE vyre_driver_dispatch_launches_total counter\n");
271 let _ = std::fmt::Write::write_fmt(
272 &mut out,
273 format_args!(
274 "vyre_driver_dispatch_launches_total {}\n",
275 self.dispatch.launches
276 ),
277 );
278 out.push_str(
279 "# HELP vyre_driver_dispatch_bytes_total Host-visible dispatch bytes by direction\n",
280 );
281 out.push_str("# TYPE vyre_driver_dispatch_bytes_total counter\n");
282 let _ = std::fmt::Write::write_fmt(
283 &mut out,
284 format_args!(
285 "vyre_driver_dispatch_bytes_total{{direction=\"input\"}} {}\nvyre_driver_dispatch_bytes_total{{direction=\"output\"}} {}\n",
286 self.dispatch.input_bytes,
287 self.dispatch.output_bytes
288 ),
289 );
290 out.push_str(
291 "# HELP vyre_driver_dispatch_output_slots_total Output slot handling by kind\n",
292 );
293 out.push_str("# TYPE vyre_driver_dispatch_output_slots_total counter\n");
294 let _ = std::fmt::Write::write_fmt(
295 &mut out,
296 format_args!(
297 "vyre_driver_dispatch_output_slots_total{{kind=\"total\"}} {}\nvyre_driver_dispatch_output_slots_total{{kind=\"reused\"}} {}\nvyre_driver_dispatch_output_slots_total{{kind=\"moved\"}} {}\nvyre_driver_dispatch_output_slots_total{{kind=\"appended\"}} {}\n",
298 self.dispatch.output_slots,
299 self.dispatch.output_slots_reused,
300 self.dispatch.output_slots_moved,
301 self.dispatch.output_slots_appended
302 ),
303 );
304 out.push_str("# HELP vyre_driver_dispatch_output_slot_bytes_total Output slot byte pressure by kind\n");
305 out.push_str("# TYPE vyre_driver_dispatch_output_slot_bytes_total counter\n");
306 let _ = std::fmt::Write::write_fmt(
307 &mut out,
308 format_args!(
309 "vyre_driver_dispatch_output_slot_bytes_total{{kind=\"incoming\"}} {}\nvyre_driver_dispatch_output_slot_bytes_total{{kind=\"copied\"}} {}\nvyre_driver_dispatch_output_slot_bytes_total{{kind=\"moved\"}} {}\nvyre_driver_dispatch_output_slot_bytes_total{{kind=\"appended\"}} {}\nvyre_driver_dispatch_output_slot_bytes_total{{kind=\"retained_capacity\"}} {}\n",
310 self.dispatch.output_slot_incoming_bytes,
311 self.dispatch.output_slot_copied_bytes,
312 self.dispatch.output_slot_moved_bytes,
313 self.dispatch.output_slot_appended_bytes,
314 self.dispatch.output_slot_retained_capacity_bytes
315 ),
316 );
317 out.push_str("# HELP vyre_driver_grid_sync_splits_total Grid-sync split events and produced synchronization structure\n");
318 out.push_str("# TYPE vyre_driver_grid_sync_splits_total counter\n");
319 let _ = std::fmt::Write::write_fmt(
320 &mut out,
321 format_args!(
322 "vyre_driver_grid_sync_splits_total{{kind=\"programs\"}} {}\nvyre_driver_grid_sync_splits_total{{kind=\"segments\"}} {}\nvyre_driver_grid_sync_splits_total{{kind=\"sync_points\"}} {}\n",
323 self.dispatch.grid_sync_splits,
324 self.dispatch.grid_sync_segments,
325 self.dispatch.grid_sync_points
326 ),
327 );
328 out
329 }
330
331 #[must_use]
333 pub fn to_audit_log(&self) -> String {
334 let mut out = String::with_capacity(audit_log_capacity(self.audit_events.len()));
335 for event in &self.audit_events {
336 use std::fmt::Write;
337 let _ = writeln!(
338 out,
339 "{} {} saved={}ns {}",
340 event.substrate, event.action, event.saved_ns, event.detail
341 );
342 }
343 out
344 }
345}
346
/// Estimates how many bytes the Prometheus rendering needs, for use as a
/// `String::with_capacity` hint.
///
/// The estimate is a 384-byte baseline plus 96 bytes per substrate call
/// entry, 112 per decision bucket and 128 per audit event. If the arithmetic
/// overflows, or the total exceeds `isize::MAX`, the baseline alone is
/// returned. The previous fallback of `usize::MAX` made
/// `String::with_capacity` panic with "capacity overflow"; a smaller hint
/// only costs reallocations.
fn prometheus_capacity(
    substrate_calls: usize,
    decision_buckets: usize,
    audit_events: usize,
) -> usize {
    // NOTE(review): presumably sized for the fixed HELP/TYPE preamble — confirm.
    const BASELINE: usize = 384;

    let estimate = checked_capacity_mul(substrate_calls, 96, "substrate call metrics")
        .and_then(|calls| {
            checked_capacity_add(BASELINE, calls, "prometheus substrate call capacity")
        })
        .and_then(|total| {
            let decisions =
                checked_capacity_mul(decision_buckets, 112, "decision bucket metrics")?;
            checked_capacity_add(total, decisions, "prometheus decision bucket capacity")
        })
        .and_then(|total| {
            let audits = checked_capacity_mul(audit_events, 128, "audit event metrics")?;
            checked_capacity_add(total, audits, "prometheus audit event capacity")
        });
    usable_capacity_hint(estimate, BASELINE)
}

/// Estimates the audit-log size at 96 bytes per event, falling back to `0`
/// (no pre-allocation) when the estimate is unusable.
fn audit_log_capacity(audit_events: usize) -> usize {
    usable_capacity_hint(
        checked_capacity_mul(audit_events, 96, "audit log events"),
        0,
    )
}

/// Returns the estimate when it is a capacity `String::with_capacity` can
/// accept (at most `isize::MAX` bytes); otherwise returns `fallback`.
fn usable_capacity_hint(estimate: Result<usize, String>, fallback: usize) -> usize {
    match estimate {
        Ok(bytes) if bytes <= isize::MAX as usize => bytes,
        _ => fallback,
    }
}

/// Multiplies `count` by `bytes_per_entry`, returning a descriptive error
/// message (prefixed with `label`) if the product overflows `usize`.
fn checked_capacity_mul(
    count: usize,
    bytes_per_entry: usize,
    label: &str,
) -> Result<usize, String> {
    count.checked_mul(bytes_per_entry).ok_or_else(|| {
        format!(
            "{label} capacity estimate overflowed: count={count}, bytes_per_entry={bytes_per_entry}. Fix: page observability output instead of silently clamping allocation size."
        )
    })
}

/// Adds two capacity terms, returning a descriptive error message (prefixed
/// with `label`) if the sum overflows `usize`.
fn checked_capacity_add(left: usize, right: usize, label: &str) -> Result<usize, String> {
    left.checked_add(right).ok_or_else(|| {
        format!(
            "{label} capacity estimate overflowed: left={left}, right={right}. Fix: page observability output instead of silently clamping allocation size."
        )
    })
}
399
400pub fn record_dispatch_io(inputs: &[&[u8]], outputs: &[Vec<u8>]) {
402 DISPATCH_TELEMETRY.launches.fetch_add(1, Ordering::Relaxed);
403 DISPATCH_TELEMETRY
404 .input_bytes
405 .fetch_add(sum_input_bytes(inputs), Ordering::Relaxed);
406 DISPATCH_TELEMETRY
407 .output_bytes
408 .fetch_add(sum_output_bytes(outputs), Ordering::Relaxed);
409}
410
411pub fn record_output_slot_stats(stats: crate::backend::OutputSlotStats) {
413 DISPATCH_TELEMETRY
414 .output_slots
415 .fetch_add(stats.total_slots as u64, Ordering::Relaxed);
416 DISPATCH_TELEMETRY
417 .output_slots_reused
418 .fetch_add(stats.reused_slots as u64, Ordering::Relaxed);
419 DISPATCH_TELEMETRY
420 .output_slots_moved
421 .fetch_add(stats.moved_slots as u64, Ordering::Relaxed);
422 DISPATCH_TELEMETRY
423 .output_slots_appended
424 .fetch_add(stats.appended_slots as u64, Ordering::Relaxed);
425}
426
427pub fn record_output_replacement_stats(stats: crate::backend::OutputReplacementStats) {
429 record_output_slot_stats(stats.slots);
430 record_output_slot_byte_stats(stats.bytes);
431}
432
433pub fn record_output_slot_byte_stats(stats: crate::backend::OutputSlotByteStats) {
435 DISPATCH_TELEMETRY
436 .output_slot_incoming_bytes
437 .fetch_add(stats.incoming_bytes as u64, Ordering::Relaxed);
438 DISPATCH_TELEMETRY
439 .output_slot_copied_bytes
440 .fetch_add(stats.copied_bytes as u64, Ordering::Relaxed);
441 DISPATCH_TELEMETRY
442 .output_slot_moved_bytes
443 .fetch_add(stats.moved_bytes as u64, Ordering::Relaxed);
444 DISPATCH_TELEMETRY
445 .output_slot_appended_bytes
446 .fetch_add(stats.appended_bytes as u64, Ordering::Relaxed);
447 DISPATCH_TELEMETRY
448 .output_slot_retained_capacity_bytes
449 .fetch_add(stats.retained_capacity_bytes as u64, Ordering::Relaxed);
450}
451
452pub fn record_grid_sync_split(segment_count: usize) {
455 DISPATCH_TELEMETRY
456 .grid_sync_splits
457 .fetch_add(1, Ordering::Relaxed);
458 DISPATCH_TELEMETRY
459 .grid_sync_segments
460 .fetch_add(segment_count as u64, Ordering::Relaxed);
461 let sync_points = segment_count.saturating_sub(1);
462 DISPATCH_TELEMETRY
463 .grid_sync_points
464 .fetch_add(sync_points as u64, Ordering::Relaxed);
465}
466
467#[must_use]
469pub fn snapshot_dispatch_telemetry() -> DispatchTelemetry {
470 DISPATCH_TELEMETRY.snapshot()
471}
472
/// Total length, in bytes, of all input slices.
fn sum_input_bytes(inputs: &[&[u8]]) -> u64 {
    inputs
        .iter()
        .fold(0_u64, |total, input| total + input.len() as u64)
}
476
/// Total length, in bytes, of all output buffers.
fn sum_output_bytes(outputs: &[Vec<u8>]) -> u64 {
    outputs
        .iter()
        .fold(0_u64, |total, output| total + output.len() as u64)
}
480
/// Implemented by types that can report their own named metrics.
pub trait BackendObservabilityProvider {
    /// Returns the provider's metrics as `(name, value)` pairs.
    // NOTE(review): naming and ordering expectations for the pairs are not
    // visible in this file — confirm against implementors.
    fn backend_metrics(&self) -> Vec<(&'static str, u64)>;
}
490
491fn trace_events() -> &'static Mutex<VecDeque<SubstrateAuditEvent>> {
492 static EVENTS: OnceLock<Mutex<VecDeque<SubstrateAuditEvent>>> = OnceLock::new();
493 EVENTS.get_or_init(|| Mutex::new(VecDeque::with_capacity(TRACE_EVENT_CAPACITY)))
494}
495
/// Reports whether audit tracing is switched on via the `VYRE_TRACE`
/// environment variable.
///
/// Tracing is enabled when the variable equals `1`, `true` or `yes`, compared
/// ASCII case-insensitively. `True` and `Yes` now work alongside the
/// previously accepted `true`/`TRUE`/`yes`/`YES`. An unset or non-Unicode
/// variable, or any other value, disables tracing.
///
/// The variable is read only on the first call; the answer is cached for the
/// rest of the process.
fn trace_enabled() -> bool {
    static ENABLED: OnceLock<bool> = OnceLock::new();
    *ENABLED.get_or_init(|| {
        std::env::var("VYRE_TRACE")
            .map(|value| {
                ["1", "true", "yes"]
                    .iter()
                    .any(|truthy| value.eq_ignore_ascii_case(truthy))
            })
            .unwrap_or(false)
    })
}
504
505pub fn record_substrate_audit_event(event: SubstrateAuditEvent) {
510 if !trace_enabled() {
511 return;
512 }
513 if let Ok(mut events) = trace_events().lock() {
514 if events.len() == TRACE_EVENT_CAPACITY {
515 events.pop_front();
516 }
517 tracing::info!(
518 target: "vyre_driver::substrate_audit",
519 substrate = event.substrate,
520 action = event.action,
521 saved_ns = event.saved_ns,
522 detail = event.detail,
523 "vyre substrate optimization fired"
524 );
525 events.push_back(event);
526 }
527}
528
/// Copies the buffered audit events, oldest first. Returns an empty vector
/// if the buffer mutex is poisoned.
#[cfg(feature = "self-substrate-adapters")]
fn snapshot_trace_events() -> Vec<SubstrateAuditEvent> {
    match trace_events().lock() {
        Ok(buffer) => {
            let mut copied = Vec::new();
            // Best-effort pre-sizing; a failed reservation is ignored here.
            let _ = copied.try_reserve_exact(buffer.len());
            copied.extend(buffer.iter().cloned());
            copied
        }
        Err(_) => Vec::new(),
    }
}
541
/// Test-only recorder: appends `event` to the buffer regardless of
/// `VYRE_TRACE` and without emitting a tracing event. Evicts the oldest
/// entry when the buffer is full; does nothing if the mutex is poisoned.
#[cfg(test)]
pub(crate) fn record_substrate_audit_event_for_test(event: SubstrateAuditEvent) {
    let Ok(mut buffer) = trace_events().lock() else {
        return;
    };
    if buffer.len() == TRACE_EVENT_CAPACITY {
        buffer.pop_front();
    }
    buffer.push_back(event);
}
551
/// Test-only snapshot: buffered audit events plus dispatch counters, with
/// every substrate counter left empty. Independent of the
/// `self-substrate-adapters` feature.
#[cfg(test)]
pub(crate) fn snapshot_for_test() -> DriverObservability {
    let audit_events: Vec<SubstrateAuditEvent> = match trace_events().lock() {
        Ok(buffer) => buffer.iter().cloned().collect(),
        Err(_) => Vec::new(),
    };
    let dispatch = snapshot_dispatch_telemetry();
    DriverObservability {
        substrate_calls: Vec::new(),
        substrate_total_calls: 0,
        decision_buckets: Vec::new(),
        audit_events,
        dispatch,
    }
}
566
/// Test-only reset: empties the audit event buffer. Does nothing if the
/// mutex is poisoned.
#[cfg(test)]
pub(crate) fn clear_substrate_audit_events_for_test() {
    match trace_events().lock() {
        Ok(mut buffer) => buffer.clear(),
        Err(_) => {}
    }
}
573
/// Test-only guard used to serialise tests that touch the shared audit
/// event buffer.
///
/// # Panics
///
/// Panics if the lock was poisoned by an earlier panic while it was held.
#[cfg(test)]
pub(crate) fn audit_events_test_lock() -> std::sync::MutexGuard<'static, ()> {
    static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
    let serializer = LOCK.get_or_init(Mutex::default);
    let acquired = serializer.lock();
    acquired.expect("Fix: audit event test lock must not be poisoned")
}
581
#[cfg(test)]
mod tests {
    use super::*;

    /// With adapters enabled, a snapshot must list at least one named
    /// substrate module whose call counter is above zero.
    #[test]
    #[cfg(feature = "self-substrate-adapters")]
    fn snapshot_yields_nonempty_substrate_list() {
        let observed = DriverObservability::snapshot();
        let has_active_module = observed
            .substrate_calls
            .iter()
            .any(|(module, count)| *count > 0 && !module.is_empty());
        assert!(
            has_active_module,
            "snapshot must record at least one substrate module with nonzero calls"
        );
    }

    /// The Prometheus rendering carries the expected module labels and the
    /// substrate-calls HELP line.
    #[test]
    #[cfg(feature = "self-substrate-adapters")]
    fn prometheus_output_contains_module_labels() {
        let rendered = DriverObservability::snapshot().to_prometheus();
        let expected_fragments = [
            "module=\"matroid_megakernel_scheduler\"",
            "module=\"vsa_fingerprint\"",
            "# HELP vyre_driver_substrate_calls_total",
        ];
        for fragment in expected_fragments {
            assert!(rendered.contains(fragment));
        }
    }

    /// Without adapters, `try_snapshot` fails with a message naming both the
    /// missing feature and the dispatch-only alternative.
    #[test]
    #[cfg(not(feature = "self-substrate-adapters"))]
    fn try_snapshot_without_adapter_feature_returns_structured_error() {
        let message = DriverObservability::try_snapshot()
            .expect_err("try_snapshot must report missing substrate telemetry as an error")
            .to_string();
        assert!(
            message.contains("self-substrate-adapters"),
            "structured error must name the missing feature"
        );
        assert!(
            message.contains("DriverObservability::snapshot"),
            "structured error must name the dispatch-only compatibility path"
        );
    }

    /// Without adapters, `snapshot` succeeds and leaves every substrate
    /// field empty.
    #[test]
    #[cfg(not(feature = "self-substrate-adapters"))]
    fn snapshot_without_adapter_feature_is_dispatch_only_not_panic() {
        let observed = DriverObservability::snapshot();
        assert!(observed.substrate_calls.is_empty());
        assert_eq!(observed.substrate_total_calls, 0);
        assert!(observed.decision_buckets.is_empty());
    }

    /// The total-calls metric is always part of the Prometheus rendering.
    #[test]
    #[cfg(feature = "self-substrate-adapters")]
    fn total_calls_appears_in_prometheus() {
        let rendered = DriverObservability::snapshot().to_prometheus();
        assert!(rendered.contains("vyre_driver_substrate_total_calls"));
    }

    /// A recorded audit event shows up in the snapshot, the audit log and
    /// the Prometheus rendering.
    #[test]
    #[cfg(feature = "self-substrate-adapters")]
    fn audit_log_and_prometheus_include_recorded_events() {
        // Serialise against other tests that use the shared event buffer.
        let _guard = audit_events_test_lock();
        clear_substrate_audit_events_for_test();

        let recorded = SubstrateAuditEvent {
            substrate: "trace_jit",
            action: "speculate",
            saved_ns: 123,
            detail: "predicted_shape",
        };
        record_substrate_audit_event_for_test(recorded);

        let observed = DriverObservability::snapshot();
        assert_eq!(observed.audit_events.len(), 1);

        let audit_log = observed.to_audit_log();
        assert!(audit_log.contains("trace_jit speculate saved=123ns"));

        let rendered = observed.to_prometheus();
        assert!(rendered.contains("vyre_driver_substrate_audit_saved_ns"));

        clear_substrate_audit_events_for_test();
    }

    /// Recording dispatch I/O and slot stats raises every matching counter
    /// by at least the recorded amount. The counters are global and other
    /// tests may bump them concurrently, hence `>=`.
    #[test]
    fn dispatch_telemetry_records_bytes_slots_and_prometheus_metrics() {
        let before = snapshot_dispatch_telemetry();

        record_dispatch_io(&[&[1, 2, 3], &[4]], &[vec![9, 8]]);
        record_output_slot_stats(crate::backend::OutputSlotStats {
            total_slots: 3,
            reused_slots: 1,
            moved_slots: 1,
            appended_slots: 1,
        });
        record_output_slot_byte_stats(crate::backend::OutputSlotByteStats {
            incoming_bytes: 9,
            copied_bytes: 2,
            moved_bytes: 4,
            appended_bytes: 3,
            retained_capacity_bytes: 16,
        });

        let after = snapshot_dispatch_telemetry();
        // (value now, value before, minimum expected increase)
        let expectations = [
            (after.launches, before.launches, 1),
            (after.input_bytes, before.input_bytes, 4),
            (after.output_bytes, before.output_bytes, 2),
            (after.output_slots, before.output_slots, 3),
            (after.output_slots_reused, before.output_slots_reused, 1),
            (after.output_slots_moved, before.output_slots_moved, 1),
            (after.output_slots_appended, before.output_slots_appended, 1),
            (
                after.output_slot_incoming_bytes,
                before.output_slot_incoming_bytes,
                9,
            ),
            (
                after.output_slot_copied_bytes,
                before.output_slot_copied_bytes,
                2,
            ),
            (
                after.output_slot_moved_bytes,
                before.output_slot_moved_bytes,
                4,
            ),
            (
                after.output_slot_appended_bytes,
                before.output_slot_appended_bytes,
                3,
            ),
            (
                after.output_slot_retained_capacity_bytes,
                before.output_slot_retained_capacity_bytes,
                16,
            ),
        ];
        for (now, then, minimum_increase) in expectations {
            assert!(now >= then + minimum_increase);
        }

        #[cfg(feature = "self-substrate-adapters")]
        {
            let rendered = DriverObservability::snapshot().to_prometheus();
            let expected_fragments = [
                "vyre_driver_dispatch_launches_total",
                "direction=\"input\"",
                "kind=\"appended\"",
                "kind=\"retained_capacity\"",
            ];
            for fragment in expected_fragments {
                assert!(rendered.contains(fragment));
            }
        }
    }

    /// A four-segment split adds one split, four segments and three sync
    /// points.
    #[test]
    fn grid_sync_telemetry_records_segments_and_sync_points() {
        let before = snapshot_dispatch_telemetry();
        record_grid_sync_split(4);
        let after = snapshot_dispatch_telemetry();

        let expectations = [
            (after.grid_sync_splits, before.grid_sync_splits, 1),
            (after.grid_sync_segments, before.grid_sync_segments, 4),
            (after.grid_sync_points, before.grid_sync_points, 3),
        ];
        for (now, then, minimum_increase) in expectations {
            assert!(now >= then + minimum_increase);
        }

        #[cfg(feature = "self-substrate-adapters")]
        {
            let rendered = DriverObservability::snapshot().to_prometheus();
            assert!(rendered.contains("kind=\"sync_points\""));
        }
    }
}