use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use vmm_sys_util::eventfd::EventFd;
use super::snapshot::decode_snapshot_request;
use super::state::SnapshotRequest;
use crate::vmm::KERNEL_HALF_CANONICAL as KERNEL_HALF_CANONICAL_4LEVEL;
/// Borrowed state that [`dispatch_bulk_message`] reads and updates while
/// routing a single bulk-port message.
///
/// Every field is a borrow (or a small `Copy` value), so building one of
/// these per dispatch call does not clone any of the underlying cells.
pub(super) struct BulkDispatchSinks<'a> {
    /// Flag stored as `true` when a CRC-valid `SchedExit` message arrives.
    pub kill: &'a Arc<AtomicBool>,
    /// Eventfd written right after `kill` is set; a failed write is only
    /// logged.
    pub kill_evt: &'a Arc<EventFd>,
    /// One-shot eventfd: taken out (leaving `None`) and written on the first
    /// CRC-valid `SysRdy` message whose payload is empty.
    pub sys_rdy_evt: &'a mut Option<Arc<EventFd>>,
    /// Receives each snapshot request that passes CRC and decodes.
    pub snapshot_requests_pending: &'a mut Vec<SnapshotRequest>,
    /// Receives each kernel-op request that passes CRC and deserializes,
    /// after its tag has been clamped to `KERNEL_OP_TAG_MAX` bytes.
    pub kernel_op_requests_pending: &'a mut Vec<crate::vmm::wire::KernelOpRequestPayload>,
    /// Receives `phys_base + 1` (wrapping) from a `KERN_ADDRS` message that
    /// reports the phys-present bit.
    pub kern_phys_base: &'a Arc<std::sync::atomic::AtomicU64>,
    /// Eventfd written after `kern_phys_base` is stored.
    pub kern_phys_base_evt: &'a EventFd,
    /// Receives the virt-KASLR offset plus one via compare-exchange from `0`,
    /// so only the first publication takes effect.
    pub kern_virt_kaslr: &'a Arc<std::sync::atomic::AtomicU64>,
    /// Eventfd written when the `kern_virt_kaslr` compare-exchange succeeds.
    pub kern_virt_kaslr_evt: &'a EventFd,
    /// Kernel text address subtracted from the guest-reported runtime text
    /// address to obtain the virt-KASLR offset (presumably the link-time
    /// address from vmlinux; confirm against the caller).
    pub kernel_text_link_kva: u64,
    /// Optional watchdog hookup, as `(cell, duration, run_start)`:
    /// the cell receives `elapsed + duration` in nanoseconds, and
    /// `run_start` is the instant that the pause, resume and end handlers
    /// measure elapsed time from.
    pub watchdog_reset: Option<(
        &'a std::sync::atomic::AtomicU64,
        std::time::Duration,
        std::time::Instant,
    )>,
    /// Nanosecond timestamp of the pause in progress; `0` means no pause is
    /// recorded (resume swaps it back to `0`).
    pub watchdog_pause_ns: &'a std::sync::atomic::AtomicU64,
    /// Nanoseconds from `run_start` to the first CRC-valid `ScenarioStart`;
    /// written only while it still holds `0`.
    pub scenario_start_ns: &'a std::sync::atomic::AtomicU64,
    /// Running total of paused nanoseconds, extended on each resume that
    /// closes a recorded pause.
    pub scenario_pause_cumulative_ns: &'a std::sync::atomic::AtomicU64,
    /// Instant that the `ScenarioStart` handler measures elapsed time from.
    pub run_start: std::time::Instant,
    /// Receives `step_index` from each CRC-valid, decodable `Stimulus`
    /// message.
    pub current_step: &'a Arc<std::sync::atomic::AtomicU16>,
}
pub(super) fn dispatch_bulk_message(
msg: &crate::vmm::bulk::BulkMessage,
sinks: &mut BulkDispatchSinks<'_>,
) -> Option<crate::vmm::wire::ShmEntry> {
let kind = crate::vmm::wire::MsgType::from_wire(msg.msg_type);
match kind {
Some(crate::vmm::wire::MsgType::SchedExit) => {
if msg.crc_ok {
sinks.kill.store(true, Ordering::Release);
if let Err(e) = sinks.kill_evt.write(1) {
tracing::warn!(
err = %e,
"freeze_coord: kill_evt write on SCHED_EXIT \
promotion failed; the kill AtomicBool above is \
still authoritative"
);
}
}
if msg.crc_ok {
Some(crate::vmm::wire::ShmEntry {
msg_type: msg.msg_type,
payload: msg.payload.to_vec(),
crc_ok: msg.crc_ok,
})
} else {
None
}
}
Some(crate::vmm::wire::MsgType::SysRdy) => {
if msg.crc_ok
&& msg.payload.is_empty()
&& let Some(evt) = sinks.sys_rdy_evt.take()
&& let Err(e) = evt.write(1)
{
tracing::warn!(
err = %e,
"freeze_coord: sys_rdy write failed; monitor will \
rely on kill_evt or 5 s timeout to leave its \
pre-sample wait"
);
}
None
}
_ if msg.msg_type == crate::vmm::wire::MSG_TYPE_KERN_ADDRS => {
if msg.crc_ok
&& let Some(addrs) = crate::vmm::wire::KernAddrs::from_payload(&msg.payload)
{
if addrs.has_phys_present_bit() {
let biased_phys = addrs.phys_base.wrapping_add(1);
sinks
.kern_phys_base
.store(biased_phys, std::sync::atomic::Ordering::Release);
let _ = sinks.kern_phys_base_evt.write(1);
}
const RANDOMIZE_BASE_MAX_OFFSET: u64 = 1 << 30; if let Some(runtime) = addrs.kernel_text_runtime_kva
&& sinks.kernel_text_link_kva >= KERNEL_HALF_CANONICAL_4LEVEL
{
let link = sinks.kernel_text_link_kva;
if runtime >= KERNEL_HALF_CANONICAL_4LEVEL
&& runtime >= link
&& (runtime - link) <= RANDOMIZE_BASE_MAX_OFFSET
{
let offset = runtime - link;
let biased_offset = offset.wrapping_add(1);
match sinks.kern_virt_kaslr.compare_exchange(
0,
biased_offset,
std::sync::atomic::Ordering::Release,
std::sync::atomic::Ordering::Acquire,
) {
Ok(_) => {
let _ = sinks.kern_virt_kaslr_evt.write(1);
}
Err(existing) if existing != biased_offset => {
let lstar_derived = existing.saturating_sub(1);
tracing::error!(
kern_addrs_derived = format_args!("{offset:#x}"),
lstar_derived = format_args!("{lstar_derived:#x}"),
delta = format_args!("{:#x}", offset ^ lstar_derived),
"VirtKaslrDivergence: KERN_ADDRS-derived virt-KASLR \
offset disagrees with the previously-published \
MSR_LSTAR-derived value. Both should equal the \
boot-time slot pick. Possible causes: stale vmlinux \
template (rebuild + retest), kernel mid-boot KASLR \
re-roll, hostile guest payload (KERN_ADDRS _text \
was forged)."
);
}
Err(_) => {
}
}
}
}
}
None
}
Some(crate::vmm::wire::MsgType::SnapshotRequest) => {
if msg.crc_ok
&& let Some(req) = decode_snapshot_request(&msg.payload[..])
{
sinks.snapshot_requests_pending.push(req);
}
None
}
Some(crate::vmm::wire::MsgType::KernelOpRequest) => {
if msg.crc_ok
&& let Ok(mut req) = postcard::from_bytes::<crate::vmm::wire::KernelOpRequestPayload>(
&msg.payload[..],
)
{
if req.tag.len() > crate::vmm::wire::KERNEL_OP_TAG_MAX {
let mut idx = crate::vmm::wire::KERNEL_OP_TAG_MAX;
while !req.tag.is_char_boundary(idx) {
idx -= 1;
}
req.tag.truncate(idx);
}
sinks.kernel_op_requests_pending.push(req);
}
None
}
Some(crate::vmm::wire::MsgType::ScenarioStart) => {
if msg.crc_ok {
let elapsed = sinks.run_start.elapsed();
let elapsed_ns = u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX).max(1);
let _ = sinks.scenario_start_ns.compare_exchange(
0,
elapsed_ns,
std::sync::atomic::Ordering::Relaxed,
std::sync::atomic::Ordering::Relaxed,
);
if let Some((reset_ns, duration, _)) = sinks.watchdog_reset.as_ref() {
let target_ns = elapsed.as_nanos().saturating_add(duration.as_nanos());
let encoded = u64::try_from(target_ns).unwrap_or(u64::MAX).max(1);
reset_ns.store(encoded, std::sync::atomic::Ordering::Release);
}
}
Some(crate::vmm::wire::ShmEntry {
msg_type: msg.msg_type,
payload: msg.payload.to_vec(),
crc_ok: msg.crc_ok,
})
}
Some(crate::vmm::wire::MsgType::ScenarioPause) => {
if msg.crc_ok {
let elapsed = sinks
.watchdog_reset
.as_ref()
.map(|(_, _, run_start)| run_start.elapsed().as_nanos())
.unwrap_or(0);
let encoded = u64::try_from(elapsed).unwrap_or(u64::MAX).max(1);
sinks
.watchdog_pause_ns
.store(encoded, std::sync::atomic::Ordering::Release);
}
Some(crate::vmm::wire::ShmEntry {
msg_type: msg.msg_type,
payload: msg.payload.to_vec(),
crc_ok: msg.crc_ok,
})
}
Some(crate::vmm::wire::MsgType::ScenarioResume) => {
if msg.crc_ok
&& let Some((reset_ns, _, run_start)) = sinks.watchdog_reset.as_ref()
{
let paused_at = sinks
.watchdog_pause_ns
.swap(0, std::sync::atomic::Ordering::AcqRel);
if paused_at > 0 {
let elapsed = run_start.elapsed();
let pause_duration = elapsed.as_nanos().saturating_sub(paused_at as u128);
let prior = reset_ns.load(std::sync::atomic::Ordering::Acquire);
let extended = (prior as u128).saturating_add(pause_duration);
let encoded = u64::try_from(extended).unwrap_or(u64::MAX).max(1);
reset_ns.store(encoded, std::sync::atomic::Ordering::Release);
let prior_cumulative = sinks
.scenario_pause_cumulative_ns
.load(std::sync::atomic::Ordering::Acquire);
let new_cumulative = (prior_cumulative as u128).saturating_add(pause_duration);
let encoded_cumulative = u64::try_from(new_cumulative).unwrap_or(u64::MAX);
sinks
.scenario_pause_cumulative_ns
.store(encoded_cumulative, std::sync::atomic::Ordering::Release);
}
}
Some(crate::vmm::wire::ShmEntry {
msg_type: msg.msg_type,
payload: msg.payload.to_vec(),
crc_ok: msg.crc_ok,
})
}
Some(crate::vmm::wire::MsgType::ScenarioEnd) => {
if msg.crc_ok
&& let Some((reset_ns, duration, run_start)) = sinks.watchdog_reset.as_ref()
{
let elapsed = run_start.elapsed();
let target_ns = elapsed.as_nanos().saturating_add(duration.as_nanos());
let encoded = u64::try_from(target_ns).unwrap_or(u64::MAX).max(1);
reset_ns.store(encoded, std::sync::atomic::Ordering::Release);
}
Some(crate::vmm::wire::ShmEntry {
msg_type: msg.msg_type,
payload: msg.payload.to_vec(),
crc_ok: msg.crc_ok,
})
}
Some(crate::vmm::wire::MsgType::Stimulus) => {
if msg.crc_ok
&& let Some(event) = crate::vmm::wire::StimulusEvent::from_payload(&msg.payload)
{
sinks
.current_step
.store(event.step_index, std::sync::atomic::Ordering::Release);
}
Some(crate::vmm::wire::ShmEntry {
msg_type: msg.msg_type,
payload: msg.payload.to_vec(),
crc_ok: msg.crc_ok,
})
}
Some(other) if !other.is_coordinator_internal() => {
Some(crate::vmm::wire::ShmEntry {
msg_type: msg.msg_type,
payload: msg.payload.to_vec(),
crc_ok: msg.crc_ok,
})
}
Some(_) => {
None
}
None => {
tracing::warn!(
msg_type = msg.msg_type,
len = msg.payload.len(),
crc_ok = msg.crc_ok,
"freeze_coord: unknown MSG_TYPE_* on bulk port; dropping"
);
None
}
}
}