use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use vmm_sys_util::eventfd::EventFd;
use super::snapshot::decode_snapshot_request;
use super::state::SnapshotRequest;
/// Borrowed bundle of output "sinks" that `dispatch_bulk_message` writes into
/// while processing one bulk-port message. All fields borrow state owned by
/// the surrounding coordinator loop; nothing here is owned by the dispatcher.
pub(super) struct BulkDispatchSinks<'a> {
/// Kill flag; set to `true` (Release) when a CRC-clean SCHED_EXIT arrives.
pub kill: &'a Arc<AtomicBool>,
/// Eventfd signalled alongside `kill` so blocked waiters wake up promptly.
pub kill_evt: &'a Arc<EventFd>,
/// One-shot SYS_RDY eventfd; `take()`n on the first CRC-clean empty-payload
/// SYS_RDY message, so later SYS_RDY messages are ignored.
pub sys_rdy_evt: &'a mut Option<Arc<EventFd>>,
/// Queue of decoded snapshot requests, appended to on SnapshotRequest messages.
pub snapshot_requests_pending: &'a mut Vec<SnapshotRequest>,
/// Guest kernel physical base; stored (Release) from the first 8 LE payload
/// bytes of a CRC-clean KERN_ADDRS message when that value is non-zero.
pub kern_phys_base: &'a Arc<std::sync::atomic::AtomicU64>,
/// Signalled after `kern_phys_base` is stored (write errors are ignored).
pub kern_phys_base_evt: &'a EventFd,
/// Optional watchdog state: (deadline-ns atomic, watchdog duration, the
/// watchdog's own start `Instant`). `None` disables watchdog rearming.
/// NOTE(review): the tuple's `Instant` appears to be a separate epoch from
/// `run_start` below — confirm they are intentionally distinct.
pub watchdog_reset: Option<(
&'a std::sync::atomic::AtomicU64,
std::time::Duration,
std::time::Instant,
)>,
/// Elapsed-ns at the moment of ScenarioPause (biased to be >= 1 so that 0
/// can mean "not paused"); swapped back to 0 on ScenarioResume.
pub watchdog_pause_ns: &'a std::sync::atomic::AtomicU64,
/// Elapsed-ns (since `run_start`, biased >= 1) of the first CRC-clean
/// ScenarioStart; set once via compare-exchange from 0.
pub scenario_start_ns: &'a std::sync::atomic::AtomicU64,
/// Running total of pause durations, accumulated on ScenarioResume
/// (saturating at `u64::MAX`).
pub scenario_pause_cumulative_ns: &'a std::sync::atomic::AtomicU64,
/// Epoch used to timestamp ScenarioStart.
pub run_start: std::time::Instant,
}
/// Dispatch one message received on the bulk port.
///
/// Side effects are applied through `sinks` (kill promotion, SYS_RDY wakeup,
/// kernel-base publication, snapshot queueing, scenario/watchdog timestamps).
///
/// Returns `Some(ShmEntry)` when the message should also be forwarded to the
/// shared-memory log (SCHED_EXIT when CRC-clean, all Scenario* messages, and
/// any known non-coordinator-internal type), otherwise `None`.
pub(super) fn dispatch_bulk_message(
    msg: &crate::vmm::bulk::BulkMessage,
    sinks: &mut BulkDispatchSinks<'_>,
) -> Option<crate::vmm::wire::ShmEntry> {
    // Several arms forward the message verbatim into the shm log; centralize
    // the ShmEntry construction instead of repeating it per arm.
    let forward = || {
        Some(crate::vmm::wire::ShmEntry {
            msg_type: msg.msg_type,
            payload: msg.payload.to_vec(),
            crc_ok: msg.crc_ok,
        })
    };
    let kind = crate::vmm::wire::MsgType::from_wire(msg.msg_type);
    match kind {
        Some(crate::vmm::wire::MsgType::SchedExit) => {
            // Promote to kill AND forward only when the CRC checked out;
            // (the original tested `msg.crc_ok` twice — merged into one branch).
            if msg.crc_ok {
                sinks.kill.store(true, Ordering::Release);
                if let Err(e) = sinks.kill_evt.write(1) {
                    tracing::warn!(
                        err = %e,
                        "freeze_coord: kill_evt write on SCHED_EXIT \
                         promotion failed; the kill AtomicBool above is \
                         still authoritative"
                    );
                }
                forward()
            } else {
                None
            }
        }
        Some(crate::vmm::wire::MsgType::SysRdy) => {
            // One-shot: `take()` consumes the eventfd so repeat SYS_RDY
            // messages are no-ops. The take only happens once crc/payload
            // checks pass (let-chain short-circuits).
            if msg.crc_ok
                && msg.payload.is_empty()
                && let Some(evt) = sinks.sys_rdy_evt.take()
                && let Err(e) = evt.write(1)
            {
                tracing::warn!(
                    err = %e,
                    "freeze_coord: sys_rdy write failed; monitor will \
                     rely on kill_evt or 5 s timeout to leave its \
                     pre-sample wait"
                );
            }
            None
        }
        // Guard arm on the raw wire value: must stay ahead of the later
        // `Some(...)` arms so KERN_ADDRS is intercepted regardless of how
        // `from_wire` classified it.
        _ if msg.msg_type == crate::vmm::wire::MSG_TYPE_KERN_ADDRS => {
            if msg.crc_ok && msg.payload.len() >= 8 {
                // Length was checked above, so the conversion cannot fail;
                // the unwrap_or keeps this panic-free regardless.
                let bytes: [u8; 8] = msg.payload[..8].try_into().unwrap_or([0; 8]);
                let biased = u64::from_le_bytes(bytes);
                // Zero means "not yet known" — never publish it.
                if biased != 0 {
                    sinks.kern_phys_base.store(biased, Ordering::Release);
                    // Best-effort wakeup; the atomic above is authoritative.
                    let _ = sinks.kern_phys_base_evt.write(1);
                }
            }
            None
        }
        Some(crate::vmm::wire::MsgType::SnapshotRequest) => {
            if msg.crc_ok
                && let Some(req) = decode_snapshot_request(&msg.payload[..])
            {
                sinks.snapshot_requests_pending.push(req);
            }
            None
        }
        Some(crate::vmm::wire::MsgType::ScenarioStart) => {
            if msg.crc_ok {
                let elapsed = sinks.run_start.elapsed();
                // Bias to >= 1 so 0 keeps meaning "never started".
                let elapsed_ns = u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX).max(1);
                // Set-once: only the first ScenarioStart wins.
                let _ = sinks.scenario_start_ns.compare_exchange(
                    0,
                    elapsed_ns,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                );
                // Rearm the watchdog deadline to "now + duration".
                if let Some((reset_ns, duration, _)) = sinks.watchdog_reset.as_ref() {
                    let target_ns = elapsed.as_nanos().saturating_add(duration.as_nanos());
                    let encoded = u64::try_from(target_ns).unwrap_or(u64::MAX).max(1);
                    reset_ns.store(encoded, Ordering::Release);
                }
            }
            forward()
        }
        Some(crate::vmm::wire::MsgType::ScenarioPause) => {
            if msg.crc_ok {
                // Timestamp against the watchdog's own epoch (not run_start);
                // without a watchdog this records the bias value 1.
                let elapsed = sinks
                    .watchdog_reset
                    .as_ref()
                    .map(|(_, _, run_start)| run_start.elapsed().as_nanos())
                    .unwrap_or(0);
                let encoded = u64::try_from(elapsed).unwrap_or(u64::MAX).max(1);
                sinks.watchdog_pause_ns.store(encoded, Ordering::Release);
            }
            forward()
        }
        Some(crate::vmm::wire::MsgType::ScenarioResume) => {
            if msg.crc_ok
                && let Some((reset_ns, _, run_start)) = sinks.watchdog_reset.as_ref()
            {
                // swap(0) both reads the pause timestamp and clears the
                // "paused" state in one step.
                let paused_at = sinks.watchdog_pause_ns.swap(0, Ordering::AcqRel);
                if paused_at > 0 {
                    let elapsed = run_start.elapsed();
                    let pause_duration = elapsed.as_nanos().saturating_sub(paused_at as u128);
                    // Push the watchdog deadline out by the paused interval.
                    let prior = reset_ns.load(Ordering::Acquire);
                    let extended = (prior as u128).saturating_add(pause_duration);
                    let encoded = u64::try_from(extended).unwrap_or(u64::MAX).max(1);
                    reset_ns.store(encoded, Ordering::Release);
                    // Accumulate total paused time (saturating).
                    let prior_cumulative =
                        sinks.scenario_pause_cumulative_ns.load(Ordering::Acquire);
                    let new_cumulative = (prior_cumulative as u128).saturating_add(pause_duration);
                    let encoded_cumulative = u64::try_from(new_cumulative).unwrap_or(u64::MAX);
                    sinks
                        .scenario_pause_cumulative_ns
                        .store(encoded_cumulative, Ordering::Release);
                }
            }
            forward()
        }
        Some(crate::vmm::wire::MsgType::ScenarioEnd) => {
            if msg.crc_ok
                && let Some((reset_ns, duration, run_start)) = sinks.watchdog_reset.as_ref()
            {
                // Grant a full watchdog window for post-scenario teardown.
                let elapsed = run_start.elapsed();
                let target_ns = elapsed.as_nanos().saturating_add(duration.as_nanos());
                let encoded = u64::try_from(target_ns).unwrap_or(u64::MAX).max(1);
                reset_ns.store(encoded, Ordering::Release);
            }
            forward()
        }
        // Known type with no coordinator-side handling: pass through.
        Some(other) if !other.is_coordinator_internal() => forward(),
        // Known coordinator-internal type we don't handle here: drop silently.
        Some(_) => None,
        // Unknown wire value: log and drop.
        None => {
            tracing::warn!(
                msg_type = msg.msg_type,
                len = msg.payload.len(),
                crc_ok = msg.crc_ok,
                "freeze_coord: unknown MSG_TYPE_* on bulk port; dropping"
            );
            None
        }
    }
}