use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use crate::monitor::dump::FailureDumpReport;
use crate::sync::MutexExt;
pub type CaptureCallback = Arc<dyn Fn(&str) -> Option<FailureDumpReport> + Send + Sync + 'static>;
pub type WatchRegisterCallback =
Arc<dyn Fn(&str) -> std::result::Result<(), String> + Send + Sync + 'static>;
pub type KernelOpCallback = Arc<
dyn Fn(&crate::vmm::wire::KernelOpRequestPayload) -> crate::vmm::wire::KernelOpReplyPayload
+ Send
+ Sync
+ 'static,
>;
pub const MAX_WATCH_SNAPSHOTS: usize = 3;
pub const MAX_STORED_SNAPSHOTS: usize = 64;
pub const MAX_STORED_EVENTS: usize = 1024;
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum SnapshotBridgeEvent {
CaptureUnavailable {
tag: String,
},
Overwrite {
tag: String,
prior_schema: String,
},
Eviction {
evicted_tag: String,
new_tag: String,
cap: usize,
},
DrainOrderingInvariantViolation {
tag: String,
drain_variant: &'static str,
},
CapInvariantViolation {
reports_len: usize,
cap: usize,
},
EventLogTruncated {
dropped_count: u64,
},
}
pub(super) struct SnapshotStore {
pub(super) reports: HashMap<String, FailureDumpReport>,
pub(super) stats: HashMap<String, Result<serde_json::Value, super::error::MissingStatsReason>>,
pub(super) elapsed_ms: HashMap<String, u64>,
pub(super) step_index: HashMap<String, u16>,
pub(super) order: VecDeque<String>,
events: Vec<SnapshotBridgeEvent>,
events_dropped: u64,
}
impl SnapshotStore {
fn new() -> Self {
Self {
reports: HashMap::new(),
stats: HashMap::new(),
elapsed_ms: HashMap::new(),
step_index: HashMap::new(),
order: VecDeque::new(),
events: Vec::new(),
events_dropped: 0,
}
}
fn push_event(&mut self, event: SnapshotBridgeEvent) {
if self.events.len() >= MAX_STORED_EVENTS {
self.events.remove(0);
self.events_dropped = self.events_dropped.saturating_add(1);
}
self.events.push(event);
}
}
struct WatchSlotGuard<'a> {
count: &'a std::sync::atomic::AtomicUsize,
}
impl Drop for WatchSlotGuard<'_> {
fn drop(&mut self) {
self.count
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
}
#[derive(Clone)]
#[must_use = "dropping a SnapshotBridge discards the capture pipeline"]
pub struct SnapshotBridge {
capture: CaptureCallback,
register_watch: Option<WatchRegisterCallback>,
kernel_op: Option<KernelOpCallback>,
pub(super) snapshots: Arc<Mutex<SnapshotStore>>,
kernel_ops: Arc<Mutex<Vec<(String, crate::vmm::wire::KernelOpReplyPayload)>>>,
watch_count: Arc<std::sync::atomic::AtomicUsize>,
accessor_publish_seqno: Option<Arc<std::sync::atomic::AtomicU64>>,
accessor_worker_state: Option<Arc<std::sync::atomic::AtomicU8>>,
accessor_dispatcher_wake_evt: Option<Arc<vmm_sys_util::eventfd::EventFd>>,
}
pub mod accessor_worker_state {
pub const TRYING: u8 = 0;
pub const SUCCEEDED: u8 = 1;
pub const FAILED_PERMANENTLY: u8 = 2;
}
impl std::fmt::Debug for SnapshotBridge {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SnapshotBridge")
.field("snapshots", &self.len())
.field("watch_count", &self.watch_count())
.field("capture", &"<callback>")
.field(
"register_watch",
&if self.register_watch.is_some() {
"<callback>"
} else {
"<none>"
},
)
.finish()
}
}
fn is_periodic_tag(tag: &str) -> bool {
match tag.strip_prefix("periodic_") {
Some(rest) => rest.len() == 3 && rest.bytes().all(|b| b.is_ascii_digit()),
None => false,
}
}
impl SnapshotBridge {
pub fn new(capture: CaptureCallback) -> Self {
Self {
capture,
register_watch: None,
kernel_op: None,
snapshots: Arc::new(Mutex::new(SnapshotStore::new())),
kernel_ops: Arc::new(Mutex::new(Vec::new())),
watch_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
accessor_publish_seqno: None,
accessor_worker_state: None,
accessor_dispatcher_wake_evt: None,
}
}
pub fn with_accessor_state(
mut self,
publish_seqno: Arc<std::sync::atomic::AtomicU64>,
worker_state: Arc<std::sync::atomic::AtomicU8>,
dispatcher_wake_evt: Arc<vmm_sys_util::eventfd::EventFd>,
) -> Self {
self.accessor_publish_seqno = Some(publish_seqno);
self.accessor_worker_state = Some(worker_state);
self.accessor_dispatcher_wake_evt = Some(dispatcher_wake_evt);
self
}
pub fn accessor_publish_seqno(&self) -> u64 {
self.accessor_publish_seqno
.as_ref()
.map(|s| s.load(std::sync::atomic::Ordering::Acquire))
.unwrap_or(0)
}
pub fn wait_for_worker_state_not_trying(
&self,
deadline: std::time::Instant,
op_label: &str,
) -> anyhow::Result<()> {
let Some(state) = self.accessor_worker_state.as_ref() else {
return Ok(());
};
loop {
let cur = state.load(std::sync::atomic::Ordering::Acquire);
if cur != accessor_worker_state::TRYING {
if cur == accessor_worker_state::FAILED_PERMANENTLY {
anyhow::bail!(
"{op_label}: accessor-init worker exited \
(FAILED_PERMANENTLY) before this op could run — \
check freeze-coord 'vmm-accessor-init' logs for ELF \
parse or boot-budget failures"
);
}
return Ok(());
}
let now = std::time::Instant::now();
if now >= deadline {
anyhow::bail!(
"{op_label}: accessor-init worker stayed in TRYING state \
past the deadline — the worker's first publish has not \
completed. Likely cause: kernel boot stalled before \
accessor's bootstrap symbols became readable. Check \
freeze-coord logs for `accessor-init:` lines"
);
}
let remaining = deadline.saturating_duration_since(now);
self.poll_dispatcher_wake(remaining);
}
}
fn poll_dispatcher_wake(&self, remaining: std::time::Duration) {
if remaining.is_zero() {
return;
}
let Some(evt) = self.accessor_dispatcher_wake_evt.as_ref() else {
unreachable!(
"poll_dispatcher_wake reached without an installed wake fd — \
SnapshotBridge::with_accessor_state stores accessor_worker_state \
and accessor_dispatcher_wake_evt together, so any caller that \
passed the worker_state gate must also have the wake fd. A None \
here indicates a bridge constructor that violated the coupling \
invariant"
);
};
let ms = remaining.as_millis().min(i32::MAX as u128) as i32;
let fd = {
use std::os::unix::io::AsRawFd;
evt.as_raw_fd()
};
let mut pfd = libc::pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};
unsafe {
libc::poll(&mut pfd, 1, ms);
}
let _ = evt.read();
}
pub fn wait_for_accessor_publish_advance(
&self,
seqno_before: u64,
deadline: std::time::Instant,
op_label: &str,
) -> anyhow::Result<u64> {
let (seqno, state) = match (
self.accessor_publish_seqno.as_ref(),
self.accessor_worker_state.as_ref(),
) {
(Some(s), Some(w)) => (s, w),
_ => return Ok(0),
};
let cur = seqno.load(std::sync::atomic::Ordering::Acquire);
if cur > seqno_before {
return Ok(cur);
}
loop {
let cur_state = state.load(std::sync::atomic::Ordering::Acquire);
if cur_state == accessor_worker_state::FAILED_PERMANENTLY {
anyhow::bail!(
"{op_label}: accessor-init worker exited (FAILED_PERMANENTLY) — \
check freeze-coord 'vmm-accessor-init' logs for ELF parse or \
boot-budget failures; retrying the op will hit the same wall"
);
}
let cur = seqno.load(std::sync::atomic::Ordering::Acquire);
if cur > seqno_before {
return Ok(cur);
}
let now = std::time::Instant::now();
if now >= deadline {
let remaining_state = state.load(std::sync::atomic::Ordering::Acquire);
anyhow::bail!(
"{op_label}: accessor reinit did not advance publish seqno \
from {seqno_before} within deadline (worker state = \
{remaining_state}; 0=Trying / 1=Succeeded / 2=FailedPermanently). \
A reinit that's stuck in Trying past the deadline indicates the \
coord's scan-tick hasn't observed the rebind or the worker's \
`from_elf_with_hint` retry is hitting a transient address-space \
window — check freeze-coord logs for `accessor-init:` lines"
);
}
let remaining = deadline.saturating_duration_since(now);
self.poll_dispatcher_wake(remaining);
}
}
pub fn with_kernel_op(mut self, callback: KernelOpCallback) -> Self {
self.kernel_op = Some(callback);
self
}
pub fn dispatch_kernel_op(
&self,
request: &crate::vmm::wire::KernelOpRequestPayload,
) -> Option<crate::vmm::wire::KernelOpReplyPayload> {
let callback = self.kernel_op.as_ref()?;
let reply = callback(request);
self.kernel_ops
.lock_unpoisoned()
.push((request.tag.clone(), reply.clone()));
Some(reply)
}
pub fn drain_kernel_ops(&self) -> Vec<(String, crate::vmm::wire::KernelOpReplyPayload)> {
std::mem::take(&mut *self.kernel_ops.lock_unpoisoned())
}
pub fn record_kernel_op_reply(
&self,
tag: String,
reply: crate::vmm::wire::KernelOpReplyPayload,
) {
self.kernel_ops.lock_unpoisoned().push((tag, reply));
}
pub fn kernel_op_value(&self, tag: &str) -> Option<crate::vmm::wire::KernelOpValue> {
self.kernel_ops
.lock_unpoisoned()
.iter()
.find(|(t, reply)| t == tag && reply.success && !reply.read_values.is_empty())
.map(|(_, reply)| reply.read_values[0].clone())
}
pub fn with_watch_register(mut self, register: WatchRegisterCallback) -> Self {
self.register_watch = Some(register);
self
}
pub fn register_watch(&self, symbol: &str) -> std::result::Result<(), String> {
loop {
let prev = self.watch_count.load(std::sync::atomic::Ordering::Relaxed);
if prev >= MAX_WATCH_SNAPSHOTS {
return Err(format!(
"Op::WatchSnapshot cap exceeded: scenario already registered \
{MAX_WATCH_SNAPSHOTS} watchpoints ({MAX_WATCH_SNAPSHOTS} user \
watchpoint slots occupied; slot 0 reserved for the error-class \
exit_kind trigger). Drop a watch or use Op::CaptureSnapshot for a \
time-driven capture instead."
));
}
if self
.watch_count
.compare_exchange_weak(
prev,
prev + 1,
std::sync::atomic::Ordering::Relaxed,
std::sync::atomic::Ordering::Relaxed,
)
.is_ok()
{
break;
}
}
let guard = WatchSlotGuard {
count: &self.watch_count,
};
let Some(register) = self.register_watch.as_ref() else {
drop(guard);
return Err(format!(
"Op::WatchSnapshot('{symbol}'): no watch-register callback installed \
on this SnapshotBridge — the host wires one via \
SnapshotBridge::with_watch_register before execute_steps; \
in-guest / no-VM scenarios cannot register hardware watchpoints"
));
};
register(symbol)?;
std::mem::forget(guard);
Ok(())
}
pub fn watch_count(&self) -> usize {
self.watch_count.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn capture(&self, name: &str) -> bool {
let Some(report) = (self.capture)(name) else {
tracing::warn!(
name,
"SnapshotBridge::capture: capture callback returned None — snapshot unavailable"
);
self.snapshots
.lock_unpoisoned()
.push_event(SnapshotBridgeEvent::CaptureUnavailable {
tag: name.to_string(),
});
return false;
};
self.store(name, report);
true
}
pub fn capture_with_step(&self, name: &str, step_index: u16) -> bool {
let Some(report) = (self.capture)(name) else {
tracing::warn!(
name,
step_index,
"SnapshotBridge::capture_with_step: capture callback returned None — snapshot unavailable"
);
self.snapshots
.lock_unpoisoned()
.push_event(SnapshotBridgeEvent::CaptureUnavailable {
tag: name.to_string(),
});
return false;
};
self.store_internal(name, report, None, None, Some(step_index));
true
}
pub fn store(&self, name: &str, report: FailureDumpReport) {
self.store_internal(name, report, None, None, None);
}
pub fn store_with_stats(
&self,
name: &str,
report: FailureDumpReport,
stats: Option<Result<serde_json::Value, super::error::MissingStatsReason>>,
elapsed_ms: Option<u64>,
) {
self.store_internal(name, report, stats, elapsed_ms, None);
}
pub fn store_with_stats_and_step(
&self,
name: &str,
report: FailureDumpReport,
stats: Option<Result<serde_json::Value, super::error::MissingStatsReason>>,
elapsed_ms: Option<u64>,
step_index: u16,
) {
self.store_internal(name, report, stats, elapsed_ms, Some(step_index));
}
fn store_internal(
&self,
name: &str,
report: FailureDumpReport,
stats: Option<Result<serde_json::Value, super::error::MissingStatsReason>>,
elapsed_ms: Option<u64>,
step_index: Option<u16>,
) {
let mut store = self.snapshots.lock_unpoisoned();
if let Some(existing) = store.reports.insert(name.to_string(), report) {
tracing::warn!(
name,
schema = %existing.schema,
"SnapshotBridge::store: name already had a stored report; overwriting prior capture"
);
store.push_event(SnapshotBridgeEvent::Overwrite {
tag: name.to_string(),
prior_schema: existing.schema.clone(),
});
if let Some(pos) = store.order.iter().position(|k| k == name) {
store.order.remove(pos);
}
store.order.push_back(name.to_string());
match stats {
Some(v) => {
store.stats.insert(name.to_string(), v);
}
None => {
store.stats.remove(name);
}
}
match elapsed_ms {
Some(v) => {
store.elapsed_ms.insert(name.to_string(), v);
}
None => {
store.elapsed_ms.remove(name);
}
}
match step_index {
Some(v) => {
store.step_index.insert(name.to_string(), v);
}
None => {
store.step_index.remove(name);
}
}
return;
}
store.order.push_back(name.to_string());
if let Some(v) = stats {
store.stats.insert(name.to_string(), v);
}
if let Some(v) = elapsed_ms {
store.elapsed_ms.insert(name.to_string(), v);
}
if let Some(v) = step_index {
store.step_index.insert(name.to_string(), v);
}
while store.reports.len() > MAX_STORED_SNAPSHOTS {
let Some(evicted) = store.order.pop_front() else {
let nuked = store.reports.len();
tracing::warn!(
reports_len = nuked,
cap = MAX_STORED_SNAPSHOTS,
"SnapshotBridge::store: order empty while reports over cap — bulk-clearing to restore invariant"
);
store.push_event(SnapshotBridgeEvent::CapInvariantViolation {
reports_len: nuked,
cap: MAX_STORED_SNAPSHOTS,
});
store.reports.clear();
store.stats.clear();
store.elapsed_ms.clear();
store.step_index.clear();
break;
};
if store.reports.remove(&evicted).is_some() {
tracing::warn!(
evicted = %evicted,
cap = MAX_STORED_SNAPSHOTS,
"SnapshotBridge::store: cap reached, evicting oldest captured snapshot"
);
store.push_event(SnapshotBridgeEvent::Eviction {
evicted_tag: evicted.clone(),
new_tag: name.to_string(),
cap: MAX_STORED_SNAPSHOTS,
});
}
store.stats.remove(&evicted);
store.elapsed_ms.remove(&evicted);
store.step_index.remove(&evicted);
}
}
pub fn len(&self) -> usize {
self.snapshots.lock_unpoisoned().reports.len()
}
pub fn is_empty(&self) -> bool {
self.snapshots.lock_unpoisoned().reports.is_empty()
}
pub fn periodic_real_count(&self) -> u32 {
let store = self.snapshots.lock_unpoisoned();
let mut n: u32 = 0;
for (tag, report) in &store.reports {
if is_periodic_tag(tag) && !report.is_placeholder {
n = n.saturating_add(1);
}
}
n
}
pub fn has(&self, name: &str) -> bool {
self.snapshots.lock_unpoisoned().reports.contains_key(name)
}
pub fn drain(&self) -> HashMap<String, FailureDumpReport> {
let mut store = self.snapshots.lock_unpoisoned();
store.order.clear();
store.stats.clear();
store.elapsed_ms.clear();
store.step_index.clear();
std::mem::take(&mut store.reports)
}
pub fn drain_ordered(&self) -> Vec<(String, FailureDumpReport)> {
let mut store = self.snapshots.lock_unpoisoned();
let order = std::mem::take(&mut store.order);
let mut reports = std::mem::take(&mut store.reports);
store.stats.clear();
store.elapsed_ms.clear();
store.step_index.clear();
let mut out: Vec<(String, FailureDumpReport)> = Vec::with_capacity(order.len());
for tag in order {
if let Some(report) = reports.remove(&tag) {
out.push((tag, report));
}
}
for (tag, report) in reports {
tracing::warn!(
tag,
"SnapshotBridge::drain_ordered: report present in `reports` \
but missing from `order` — surfacing at tail (FIFO \
invariant violation; please file)"
);
store.push_event(SnapshotBridgeEvent::DrainOrderingInvariantViolation {
tag: tag.clone(),
drain_variant: "drain_ordered",
});
out.push((tag, report));
}
out
}
pub fn drain_ordered_with_stats(&self) -> Vec<super::error::DrainedSnapshotEntry> {
let mut store = self.snapshots.lock_unpoisoned();
let order = std::mem::take(&mut store.order);
let mut reports = std::mem::take(&mut store.reports);
let mut stats = std::mem::take(&mut store.stats);
let mut elapsed = std::mem::take(&mut store.elapsed_ms);
let mut step_index = std::mem::take(&mut store.step_index);
let mut out: Vec<super::error::DrainedSnapshotEntry> = Vec::with_capacity(order.len());
let stats_fallback = || Err(super::error::MissingStatsReason::NoSchedulerBinary);
for tag in order {
if let Some(report) = reports.remove(&tag) {
let s = stats.remove(&tag).unwrap_or_else(stats_fallback);
let e = elapsed.remove(&tag);
let phase = step_index.remove(&tag);
out.push(super::error::DrainedSnapshotEntry {
tag,
report,
stats: s,
elapsed_ms: e,
step_index: phase,
});
}
}
for (tag, report) in reports {
tracing::warn!(
tag,
"SnapshotBridge::drain_ordered_with_stats: report present in `reports` \
but missing from `order` — surfacing at tail (FIFO \
invariant violation; please file)"
);
store.push_event(SnapshotBridgeEvent::DrainOrderingInvariantViolation {
tag: tag.clone(),
drain_variant: "drain_ordered_with_stats",
});
let s = stats.remove(&tag).unwrap_or_else(stats_fallback);
let e = elapsed.remove(&tag);
let phase = step_index.remove(&tag);
out.push(super::error::DrainedSnapshotEntry {
tag,
report,
stats: s,
elapsed_ms: e,
step_index: phase,
});
}
out
}
pub fn drain_events(&self) -> Vec<SnapshotBridgeEvent> {
let mut store = self.snapshots.lock_unpoisoned();
let mut events = std::mem::take(&mut store.events);
if store.events_dropped > 0 {
events.push(SnapshotBridgeEvent::EventLogTruncated {
dropped_count: store.events_dropped,
});
store.events_dropped = 0;
}
events
}
pub fn event_count(&self) -> usize {
self.snapshots.lock_unpoisoned().events.len()
}
pub fn set_thread_local(self) -> BridgeGuard {
let prev = ACTIVE_BRIDGE.with(|c| c.borrow_mut().replace(self));
BridgeGuard { prev }
}
}
thread_local! {
static ACTIVE_BRIDGE: std::cell::RefCell<Option<SnapshotBridge>> =
const { std::cell::RefCell::new(None) };
}
#[must_use = "BridgeGuard restores the prior bridge on drop; bind it"]
pub struct BridgeGuard {
prev: Option<SnapshotBridge>,
}
impl Drop for BridgeGuard {
fn drop(&mut self) {
let prev = self.prev.take();
ACTIVE_BRIDGE.with(|c| {
*c.borrow_mut() = prev;
});
}
}
pub fn with_active_bridge<R>(f: impl FnOnce(&SnapshotBridge) -> R) -> Option<R> {
ACTIVE_BRIDGE.with(|c| c.borrow().as_ref().map(f))
}
#[cfg(test)]
mod accessor_wait_tests {
use super::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, AtomicU64};
use std::time::{Duration, Instant};
fn bridge_with_accessor_state() -> (
SnapshotBridge,
Arc<AtomicU64>,
Arc<AtomicU8>,
Arc<vmm_sys_util::eventfd::EventFd>,
) {
let seqno = Arc::new(AtomicU64::new(0));
let worker_state = Arc::new(AtomicU8::new(accessor_worker_state::TRYING));
let wake_evt = Arc::new(
vmm_sys_util::eventfd::EventFd::new(libc::EFD_NONBLOCK)
.expect("eventfd for accessor_dispatcher_wake test fixture"),
);
let cb: CaptureCallback = Arc::new(|_| None);
let bridge = SnapshotBridge::new(cb).with_accessor_state(
seqno.clone(),
worker_state.clone(),
wake_evt.clone(),
);
(bridge, seqno, worker_state, wake_evt)
}
#[test]
fn wait_no_accessor_state_returns_ok_zero_immediately() {
let cb: CaptureCallback = Arc::new(|_| None);
let bridge = SnapshotBridge::new(cb);
let deadline = Instant::now() + Duration::from_secs(60);
assert_eq!(
bridge
.wait_for_accessor_publish_advance(0, deadline, "Op::Test")
.unwrap(),
0
);
}
#[test]
fn wait_seqno_already_advanced_returns_immediately() {
let (bridge, seqno, _, _) = bridge_with_accessor_state();
seqno.store(5, std::sync::atomic::Ordering::Release);
let deadline = Instant::now() + Duration::from_secs(60);
assert_eq!(
bridge
.wait_for_accessor_publish_advance(3, deadline, "Op::Test")
.unwrap(),
5
);
}
#[test]
fn wait_observes_worker_publish() {
let (bridge, seqno, _, wake_evt) = bridge_with_accessor_state();
let bridge_for_thread = bridge.clone();
let seqno_for_thread = seqno.clone();
let wake_for_thread = wake_evt.clone();
let publisher = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(100));
seqno_for_thread.fetch_add(1, std::sync::atomic::Ordering::Release);
let _ = wake_for_thread.write(1);
let _ = bridge_for_thread; });
let deadline = Instant::now() + Duration::from_secs(2);
let t0 = Instant::now();
let observed = bridge
.wait_for_accessor_publish_advance(0, deadline, "Op::Test")
.expect("publish observed within deadline");
let elapsed = t0.elapsed();
publisher.join().unwrap();
assert_eq!(observed, 1);
assert!(
elapsed < Duration::from_millis(500),
"wait did not wake event-driven; took {elapsed:?} \
(expected close to 100ms)"
);
}
#[test]
fn wait_bails_on_worker_failed_permanently() {
let (bridge, _, worker_state, _) = bridge_with_accessor_state();
worker_state.store(
accessor_worker_state::FAILED_PERMANENTLY,
std::sync::atomic::Ordering::Release,
);
let deadline = Instant::now() + Duration::from_secs(60);
let t0 = Instant::now();
let err = bridge
.wait_for_accessor_publish_advance(0, deadline, "Op::Test")
.expect_err("expected terminal-worker bail");
let elapsed = t0.elapsed();
assert!(
elapsed < Duration::from_millis(100),
"wait did not surface terminal state quickly; took {elapsed:?}"
);
let msg = err.to_string();
assert!(
msg.contains("FAILED_PERMANENTLY"),
"bail message missing terminal sentinel: {msg}"
);
assert!(msg.contains("Op::Test"), "bail missing op label: {msg}");
}
#[test]
fn wait_bails_on_deadline_with_worker_state_in_diagnostic() {
let (bridge, _, _, _) = bridge_with_accessor_state();
let deadline = Instant::now() + Duration::from_millis(120);
let err = bridge
.wait_for_accessor_publish_advance(0, deadline, "Op::Test")
.expect_err("expected deadline bail");
let msg = err.to_string();
assert!(
msg.contains("worker state = 0"),
"deadline diagnostic missing worker_state: {msg}"
);
assert!(
msg.contains("Trying"),
"deadline diagnostic missing state name table: {msg}"
);
}
#[test]
fn accessor_publish_seqno_returns_zero_without_accessor_state() {
let cb: CaptureCallback = Arc::new(|_| None);
let bridge = SnapshotBridge::new(cb);
assert_eq!(bridge.accessor_publish_seqno(), 0);
}
#[test]
fn accessor_publish_seqno_reads_atomic() {
let (bridge, seqno, _, _) = bridge_with_accessor_state();
assert_eq!(bridge.accessor_publish_seqno(), 0);
seqno.store(42, std::sync::atomic::Ordering::Release);
assert_eq!(bridge.accessor_publish_seqno(), 42);
}
}
#[cfg(test)]
mod periodic_tag_tests {
use super::*;
#[test]
fn is_periodic_tag_accepts_canonical_three_digit_index() {
assert!(is_periodic_tag("periodic_000"));
assert!(is_periodic_tag("periodic_007"));
assert!(is_periodic_tag("periodic_123"));
assert!(is_periodic_tag("periodic_999"));
}
#[test]
fn is_periodic_tag_rejects_user_tag_collisions() {
assert!(!is_periodic_tag("periodic_kaslr"));
assert!(!is_periodic_tag("periodic_user_baseline"));
assert!(!is_periodic_tag("periodic_"));
assert!(!is_periodic_tag("periodic_1"));
assert!(!is_periodic_tag("periodic_12"));
assert!(!is_periodic_tag("periodic_1234"));
assert!(!is_periodic_tag("periodic_00a"));
assert!(!is_periodic_tag("periodic_007 "));
assert!(!is_periodic_tag("PERIODIC_000"));
assert!(!is_periodic_tag("capture_my_thing"));
assert!(!is_periodic_tag(""));
assert!(!is_periodic_tag("periodic"));
}
#[test]
fn periodic_real_count_ignores_user_tag_with_periodic_prefix() {
let cb: CaptureCallback = Arc::new(|_| None);
let bridge = SnapshotBridge::new(cb);
let real_periodic = crate::monitor::dump::FailureDumpReport {
schema: crate::monitor::dump::SCHEMA_SINGLE.to_string(),
is_placeholder: false,
..Default::default()
};
bridge.store("periodic_000", real_periodic);
let user_capture = crate::monitor::dump::FailureDumpReport {
schema: crate::monitor::dump::SCHEMA_SINGLE.to_string(),
is_placeholder: false,
..Default::default()
};
bridge.store("periodic_kaslr", user_capture);
bridge.store(
"periodic_001",
crate::monitor::dump::FailureDumpReport::placeholder("rendezvous timed out"),
);
assert_eq!(
bridge.periodic_real_count(),
1,
"only the canonical periodic_000 real capture counts; \
user tag periodic_kaslr (even though real) must be \
excluded by the strict matcher",
);
}
}