use super::dispatch::{BulkDispatchSinks, dispatch_bulk_message};
use super::state::SnapshotRequest;
use crate::vmm::bulk::HostAssembler;
use crate::vmm::wire::{
FRAME_HEADER_SIZE, MSG_TYPE_SCHED_EXIT, MSG_TYPE_SNAPSHOT_REQUEST, MSG_TYPE_SYS_RDY,
SNAPSHOT_KIND_CAPTURE, SNAPSHOT_TAG_MAX, ShmEntry, ShmMessage, SnapshotRequestPayload,
};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64};
use std::time::Instant;
use vmm_sys_util::eventfd::{EFD_NONBLOCK, EventFd};
use zerocopy::IntoBytes;
/// Owned backing state for the borrows that `SinkState::sinks` lends to
/// `BulkDispatchSinks`.
///
/// Each dispatch helper in this file builds a fresh instance, runs the
/// dispatcher against it, and then inspects the fields the test cares about.
struct SinkState {
// Kill flag; the SCHED_EXIT tests assert whether dispatch set it.
kill: Arc<AtomicBool>,
// Eventfd paired with `kill`; tests read its counter after dispatch.
kill_evt: Arc<EventFd>,
// Starts as `Some`; the SYS_RDY tests check whether dispatch left it in
// place or consumed it.
sys_rdy_evt: Option<Arc<EventFd>>,
// Decoded SNAPSHOT_REQUEST entries accumulated by dispatch.
snapshot_requests_pending: Vec<SnapshotRequest>,
// The remaining fields are not inspected by the tests visible here; they
// exist so that every `BulkDispatchSinks` borrow has an owner.
kernel_op_requests_pending: Vec<crate::vmm::wire::KernelOpRequestPayload>,
kern_phys_base: Arc<AtomicU64>,
kern_phys_base_evt: EventFd,
kern_virt_kaslr: Arc<AtomicU64>,
kern_virt_kaslr_evt: EventFd,
watchdog_pause_ns: AtomicU64,
scenario_start_ns: AtomicU64,
scenario_pause_cumulative_ns: AtomicU64,
// Captured with `Instant::now()` when the state is constructed.
run_start: Instant,
current_step: Arc<AtomicU16>,
}
impl SinkState {
    /// Builds a pristine state: flags cleared, counters at zero, queues empty,
    /// the SYS_RDY handle present, and every eventfd opened with `EFD_NONBLOCK`.
    ///
    /// Panics if any eventfd cannot be created.
    fn new() -> Self {
        // All eventfds share the same flags; only the panic label differs.
        let nonblocking_eventfd = |label: &str| EventFd::new(EFD_NONBLOCK).expect(label);

        let kill_evt = Arc::new(nonblocking_eventfd("kill eventfd"));
        let sys_rdy_evt = Some(Arc::new(nonblocking_eventfd("sys_rdy eventfd")));
        let kern_phys_base_evt = nonblocking_eventfd("phys_base eventfd");
        let kern_virt_kaslr_evt = nonblocking_eventfd("virt_kaslr eventfd");

        Self {
            kill: Arc::new(AtomicBool::new(false)),
            kill_evt,
            sys_rdy_evt,
            snapshot_requests_pending: Vec::new(),
            kernel_op_requests_pending: Vec::new(),
            kern_phys_base: Arc::new(AtomicU64::new(0)),
            kern_phys_base_evt,
            kern_virt_kaslr: Arc::new(AtomicU64::new(0)),
            kern_virt_kaslr_evt,
            watchdog_pause_ns: AtomicU64::new(0),
            scenario_start_ns: AtomicU64::new(0),
            scenario_pause_cumulative_ns: AtomicU64::new(0),
            run_start: Instant::now(),
            current_step: Arc::new(AtomicU16::new(0)),
        }
    }

    /// Lends this state's fields to the dispatcher as a `BulkDispatchSinks`.
    ///
    /// `kernel_text_link_kva` is fixed at `0` and `watchdog_reset` at `None`;
    /// everything else points back into `self`, so the caller can inspect the
    /// fields once the returned borrow ends.
    fn sinks(&mut self) -> BulkDispatchSinks<'_> {
        BulkDispatchSinks {
            // Sinks the dispatcher may mutate.
            sys_rdy_evt: &mut self.sys_rdy_evt,
            snapshot_requests_pending: &mut self.snapshot_requests_pending,
            kernel_op_requests_pending: &mut self.kernel_op_requests_pending,
            // Shared handles.
            kill: &self.kill,
            kill_evt: &self.kill_evt,
            kern_phys_base: &self.kern_phys_base,
            kern_phys_base_evt: &self.kern_phys_base_evt,
            kern_virt_kaslr: &self.kern_virt_kaslr,
            kern_virt_kaslr_evt: &self.kern_virt_kaslr_evt,
            watchdog_pause_ns: &self.watchdog_pause_ns,
            scenario_start_ns: &self.scenario_start_ns,
            scenario_pause_cumulative_ns: &self.scenario_pause_cumulative_ns,
            current_step: &self.current_step,
            // Plain values.
            kernel_text_link_kva: 0,
            watchdog_reset: None,
            run_start: self.run_start,
        }
    }
}
/// Dispatches every message, in order, against a fresh `SinkState`.
///
/// Returns the state as the dispatcher left it, plus every entry that
/// `dispatch_bulk_message` chose to surface (its `Some` results), in
/// message order.
fn run_dispatch(messages: &[crate::vmm::bulk::BulkMessage]) -> (SinkState, Vec<ShmEntry>) {
    let mut state = SinkState::new();
    // The sinks borrow `state` mutably, so they are confined to this block
    // and released before `state` is moved into the return value.
    let bucket: Vec<ShmEntry> = {
        let mut sinks = state.sinks();
        messages
            .iter()
            .filter_map(|msg| dispatch_bulk_message(msg, &mut sinks))
            .collect()
    };
    (state, bucket)
}
/// Reads the current counter value of `evt`.
///
/// The eventfds in this file are opened with `EFD_NONBLOCK`, so reading one
/// that was never written fails with `WouldBlock` instead of blocking; that
/// case is reported as `0`. Any other read error panics instead of being
/// folded into `0`, so a broken descriptor cannot make a "must NOT be
/// written" assertion pass vacuously.
///
/// NOTE(review): per eventfd(2) a successful read resets the counter, so a
/// second call on the same eventfd would be expected to return `0` — confirm
/// before relying on repeated reads.
fn read_counter(evt: &EventFd) -> u64 {
    match evt.read() {
        Ok(count) => count,
        Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => 0,
        Err(err) => panic!("eventfd read failed unexpectedly: {err}"),
    }
}
/// Serializes one wire frame: a `ShmMessage` header followed by `payload`.
///
/// The header carries `msg_type`, the payload length, and the CRC32 of the
/// payload, so the resulting frame has a matching checksum.
///
/// Panics if `payload` is longer than `u32::MAX` bytes, since the header's
/// length field could not represent it (the previous `as u32` cast would
/// have silently truncated instead).
fn frame_with_crc(msg_type: u32, payload: &[u8]) -> Vec<u8> {
    let length = u32::try_from(payload.len())
        .expect("payload length must fit the u32 frame length field");
    let header = ShmMessage {
        msg_type,
        length,
        crc32: crc32fast::hash(payload),
        _pad: 0,
    };
    // Capacity is only a sizing hint; presumably FRAME_HEADER_SIZE equals the
    // serialized header size — verify against the wire module.
    let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
    buf.extend_from_slice(header.as_bytes());
    buf.extend_from_slice(payload);
    buf
}
/// Serializes one wire frame whose header checksum deliberately mismatches.
///
/// Layout is a `ShmMessage` header followed by `payload`, but the stored
/// CRC is the bitwise complement of the payload's real CRC32. Flipping all
/// 32 bits guarantees the stored value differs from the real one.
///
/// Panics if `payload` is longer than `u32::MAX` bytes, since the header's
/// length field could not represent it (the previous `as u32` cast would
/// have silently truncated instead).
fn frame_with_torn_crc(msg_type: u32, payload: &[u8]) -> Vec<u8> {
    let length = u32::try_from(payload.len())
        .expect("payload length must fit the u32 frame length field");
    let header = ShmMessage {
        msg_type,
        length,
        // `!x` is the same value as `x ^ 0xFFFF_FFFF` for a u32.
        crc32: !crc32fast::hash(payload),
        _pad: 0,
    };
    let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
    buf.extend_from_slice(header.as_bytes());
    buf.extend_from_slice(payload);
    buf
}
/// Encodes a `SnapshotRequestPayload` as raw bytes.
///
/// `tag` is copied into a zero-filled `SNAPSHOT_TAG_MAX`-byte field; bytes
/// beyond that capacity are dropped and any unused tail stays zero.
fn snapshot_request_bytes(request_id: u32, kind: u32, tag: &str) -> Vec<u8> {
    let mut padded_tag = [0u8; SNAPSHOT_TAG_MAX];
    // `zip` stops at the shorter side, which clamps the copy to the field size.
    for (slot, byte) in padded_tag.iter_mut().zip(tag.bytes()) {
        *slot = byte;
    }
    let payload = SnapshotRequestPayload {
        request_id,
        kind,
        tag: padded_tag,
    };
    payload.as_bytes().to_vec()
}
/// A SCHED_EXIT frame with a mismatching CRC must be inert: the kill flag
/// stays clear, the kill eventfd is not written, and nothing is bucketed.
#[test]
fn sched_exit_with_torn_crc_does_not_promote_kill() {
    use std::sync::atomic::Ordering;

    let mut assembler = HostAssembler::new();
    let frame = frame_with_torn_crc(MSG_TYPE_SCHED_EXIT, b"exit-payload");
    let drained = assembler.feed(&frame);
    let msgs = &drained.messages;
    assert_eq!(msgs.len(), 1, "assembler emits one message");

    let torn = &msgs[0];
    assert!(!torn.crc_ok, "torn CRC must surface as crc_ok=false");
    assert_eq!(
        torn.msg_type, MSG_TYPE_SCHED_EXIT,
        "msg_type unaffected by CRC mismatch — gate dispatch is by type"
    );

    let (state, bucket) = run_dispatch(msgs);
    let kill_promoted = state.kill.load(Ordering::Acquire);
    assert!(
        !kill_promoted,
        "kill flag must NOT flip on CRC-failed SCHED_EXIT — \
         hostile guest must not force early exit"
    );
    let kill_wakes = read_counter(&state.kill_evt);
    assert_eq!(
        kill_wakes, 0,
        "kill eventfd must NOT be written on CRC-failed SCHED_EXIT — \
         the BSP loop and watchdog must not be woken"
    );
    assert!(
        bucket.is_empty(),
        "CRC-failed SCHED_EXIT must NOT surface as a verdict entry"
    );
}
/// A SCHED_EXIT frame with a matching CRC must set the kill flag, write the
/// kill eventfd exactly once, and surface as a single bucketed entry.
#[test]
fn sched_exit_with_valid_crc_does_promote_kill() {
    use std::sync::atomic::Ordering;

    let mut assembler = HostAssembler::new();
    let frame = frame_with_crc(MSG_TYPE_SCHED_EXIT, b"exit-payload");
    let drained = assembler.feed(&frame);
    let msgs = &drained.messages;
    assert_eq!(msgs.len(), 1);
    assert!(msgs[0].crc_ok, "matching CRC must surface as crc_ok=true");

    let (state, bucket) = run_dispatch(msgs);
    let kill_promoted = state.kill.load(Ordering::Acquire);
    assert!(
        kill_promoted,
        "kill flag MUST flip on CRC-valid SCHED_EXIT — promotion is \
         the load-bearing path that ends a test promptly"
    );
    let kill_wakes = read_counter(&state.kill_evt);
    assert_eq!(
        kill_wakes, 1,
        "kill eventfd MUST be written once on CRC-valid SCHED_EXIT — \
         the BSP loop and watchdog need an epoll wake to exit"
    );
    assert_eq!(
        bucket.len(),
        1,
        "CRC-valid SCHED_EXIT must bucket exactly once"
    );

    let entry = &bucket[0];
    assert_eq!(entry.msg_type, MSG_TYPE_SCHED_EXIT);
    assert_eq!(entry.payload, b"exit-payload"[..]);
    assert!(entry.crc_ok);
}
/// Torn SCHED_EXIT frames stay inert even when a CRC-valid frame of another
/// type sits between them in the same drain; only the valid frame buckets.
#[test]
fn sched_exit_torn_crc_does_not_promote_when_other_valid_frames_present() {
    use crate::vmm::wire::MSG_TYPE_STIMULUS;
    use std::sync::atomic::Ordering;

    let mut assembler = HostAssembler::new();
    // Wire order: torn SCHED_EXIT, valid STIMULUS, torn SCHED_EXIT.
    let stream = [
        frame_with_torn_crc(MSG_TYPE_SCHED_EXIT, b"first"),
        frame_with_crc(MSG_TYPE_STIMULUS, b"valid"),
        frame_with_torn_crc(MSG_TYPE_SCHED_EXIT, b"second"),
    ]
    .concat();
    let drained = assembler.feed(&stream);
    let msgs = &drained.messages;
    assert_eq!(msgs.len(), 3);
    assert!(!msgs[0].crc_ok);
    assert!(msgs[1].crc_ok);
    assert!(!msgs[2].crc_ok);

    let (state, bucket) = run_dispatch(msgs);
    let kill_promoted = state.kill.load(Ordering::Acquire);
    assert!(
        !kill_promoted,
        "neither torn SCHED_EXIT may promote even though a CRC-valid \
         non-SCHED_EXIT frame arrived alongside them"
    );
    let kill_wakes = read_counter(&state.kill_evt);
    assert_eq!(kill_wakes, 0, "kill eventfd must remain undisturbed");
    assert_eq!(
        bucket.len(),
        1,
        "only the CRC-valid STIMULUS buckets; both torn SCHED_EXITs drop"
    );
    assert_eq!(bucket[0].msg_type, MSG_TYPE_STIMULUS);
}
/// A SNAPSHOT_REQUEST frame with a mismatching CRC must not be decoded into
/// the pending queue and must not be bucketed.
#[test]
fn snapshot_request_with_torn_crc_is_dropped() {
    let mut assembler = HostAssembler::new();
    let request = snapshot_request_bytes(7, SNAPSHOT_KIND_CAPTURE, "snap_dump");
    let frame = frame_with_torn_crc(MSG_TYPE_SNAPSHOT_REQUEST, &request);
    let drained = assembler.feed(&frame);
    let msgs = &drained.messages;
    assert_eq!(msgs.len(), 1, "assembler emits one message");

    let torn = &msgs[0];
    assert!(!torn.crc_ok, "torn CRC must surface as crc_ok=false");
    assert_eq!(
        torn.msg_type, MSG_TYPE_SNAPSHOT_REQUEST,
        "msg_type unaffected by CRC mismatch"
    );

    let (state, bucket) = run_dispatch(msgs);
    let pending = &state.snapshot_requests_pending;
    assert_eq!(
        pending.len(),
        0,
        "CRC-failed SNAPSHOT_REQUEST must NOT decode — \
         hostile guest must not force a capture or watchpoint arm"
    );
    assert!(
        bucket.is_empty(),
        "SNAPSHOT_REQUEST is coordinator-internal — never buckets"
    );
}
/// A SNAPSHOT_REQUEST frame with a matching CRC must be decoded into the
/// pending queue with its request id intact, and must not be bucketed.
#[test]
fn snapshot_request_with_valid_crc_is_pushed() {
    let mut assembler = HostAssembler::new();
    let request = snapshot_request_bytes(42, SNAPSHOT_KIND_CAPTURE, "valid_tag");
    let frame = frame_with_crc(MSG_TYPE_SNAPSHOT_REQUEST, &request);
    let drained = assembler.feed(&frame);
    let msgs = &drained.messages;
    assert_eq!(msgs.len(), 1);
    assert!(msgs[0].crc_ok, "matching CRC must surface as crc_ok=true");

    let (state, bucket) = run_dispatch(msgs);
    let pending = &state.snapshot_requests_pending;
    assert_eq!(
        pending.len(),
        1,
        "CRC-valid well-formed SNAPSHOT_REQUEST MUST decode and push"
    );
    assert_eq!(
        pending[0].request_id, 42,
        "decoded request_id must round-trip through the gate"
    );
    assert!(
        bucket.is_empty(),
        "SNAPSHOT_REQUEST is coordinator-internal — never buckets"
    );
}
/// In a batch of valid / torn / valid SNAPSHOT_REQUEST frames, only the two
/// CRC-valid requests reach the pending queue, in wire order, and none of
/// the three frames is bucketed.
#[test]
fn snapshot_request_torn_crc_dropped_in_mixed_batch() {
    let mut a = HostAssembler::new();
    let p_first = snapshot_request_bytes(1, SNAPSHOT_KIND_CAPTURE, "first");
    let p_torn = snapshot_request_bytes(2, SNAPSHOT_KIND_CAPTURE, "torn");
    let p_third = snapshot_request_bytes(3, SNAPSHOT_KIND_CAPTURE, "third");
    let mut buf = Vec::new();
    buf.extend(frame_with_crc(MSG_TYPE_SNAPSHOT_REQUEST, &p_first));
    buf.extend(frame_with_torn_crc(MSG_TYPE_SNAPSHOT_REQUEST, &p_torn));
    buf.extend(frame_with_crc(MSG_TYPE_SNAPSHOT_REQUEST, &p_third));
    let drained = a.feed(&buf);
    assert_eq!(drained.messages.len(), 3);
    assert!(drained.messages[0].crc_ok);
    assert!(!drained.messages[1].crc_ok);
    assert!(drained.messages[2].crc_ok);
    let (state, bucket) = run_dispatch(&drained.messages);
    assert_eq!(
        state.snapshot_requests_pending.len(),
        2,
        "exactly the two CRC-valid SNAPSHOT_REQUESTs must push; \
         the torn middle frame must drop independently"
    );
    let ids: Vec<u32> = state
        .snapshot_requests_pending
        .iter()
        .map(|r| r.request_id)
        .collect();
    assert_eq!(ids, vec![1, 3], "torn id=2 must be filtered out");
    // Same bucket check the single-frame snapshot tests make; previously the
    // bucket was discarded here, so a leak in the mixed path went unnoticed.
    assert!(
        bucket.is_empty(),
        "SNAPSHOT_REQUEST is coordinator-internal — never buckets"
    );
}
/// A torn SCHED_EXIT and a torn SNAPSHOT_REQUEST in the same drain are both
/// dropped: no kill promotion, no eventfd write, no decode, empty bucket.
#[test]
fn both_gates_drop_torn_frames_in_same_drain() {
    use std::sync::atomic::Ordering;

    let mut assembler = HostAssembler::new();
    let snapshot = snapshot_request_bytes(99, SNAPSHOT_KIND_CAPTURE, "tag");
    let stream = [
        frame_with_torn_crc(MSG_TYPE_SCHED_EXIT, b"sched-exit"),
        frame_with_torn_crc(MSG_TYPE_SNAPSHOT_REQUEST, &snapshot),
    ]
    .concat();
    let drained = assembler.feed(&stream);
    let msgs = &drained.messages;
    assert_eq!(msgs.len(), 2);
    assert!(!msgs[0].crc_ok);
    assert!(!msgs[1].crc_ok);

    let (state, bucket) = run_dispatch(msgs);
    let kill_promoted = state.kill.load(Ordering::Acquire);
    assert!(!kill_promoted, "torn SCHED_EXIT must not promote kill");
    let kill_wakes = read_counter(&state.kill_evt);
    assert_eq!(kill_wakes, 0, "torn SCHED_EXIT must not write kill eventfd");
    assert_eq!(
        state.snapshot_requests_pending.len(),
        0,
        "torn SNAPSHOT_REQUEST must not decode"
    );
    assert!(
        bucket.is_empty(),
        "both torn frames must drop from the bucket"
    );
}
/// A SYS_RDY frame with a mismatching CRC must neither write the SYS_RDY
/// eventfd nor consume the one-shot handle.
#[test]
fn sys_rdy_with_torn_crc_does_not_fire_eventfd() {
    let mut assembler = HostAssembler::new();
    let frame = frame_with_torn_crc(MSG_TYPE_SYS_RDY, b"");
    let drained = assembler.feed(&frame);
    let msgs = &drained.messages;
    assert_eq!(msgs.len(), 1, "assembler emits one message");

    let torn = &msgs[0];
    assert!(!torn.crc_ok, "torn CRC must surface as crc_ok=false");
    assert_eq!(
        torn.msg_type, MSG_TYPE_SYS_RDY,
        "msg_type unaffected by CRC mismatch"
    );

    let SysRdyOutcome {
        counter,
        handle_remaining,
        ..
    } = run_dispatch_sys_rdy(msgs);
    assert_eq!(
        counter, 0,
        "boot-complete eventfd must NOT be written on CRC-failed \
         SYS_RDY — hostile guest must not race ahead of percpu/KASLR"
    );
    assert!(
        handle_remaining,
        "Option::take must NOT consume the handle on a dropped frame — \
         a later CRC-valid SYS_RDY must still be able to promote"
    );
}
/// A SYS_RDY frame with a matching CRC must write the SYS_RDY eventfd once
/// and consume the one-shot handle.
#[test]
fn sys_rdy_with_valid_crc_fires_eventfd_once() {
    let mut assembler = HostAssembler::new();
    let frame = frame_with_crc(MSG_TYPE_SYS_RDY, b"");
    let drained = assembler.feed(&frame);
    let msgs = &drained.messages;
    assert_eq!(msgs.len(), 1);
    assert!(msgs[0].crc_ok, "matching CRC must surface as crc_ok=true");

    let SysRdyOutcome {
        counter,
        handle_remaining,
        ..
    } = run_dispatch_sys_rdy(msgs);
    assert_eq!(
        counter, 1,
        "boot-complete eventfd MUST receive a single write on \
         CRC-valid SYS_RDY"
    );
    assert!(
        !handle_remaining,
        "Option::take must consume the handle so subsequent \
         SYS_RDY frames do not pump the counter"
    );
}
/// Two CRC-valid SYS_RDY frames in one drain must write the eventfd only
/// once: the first consumes the handle, the second has nothing left to fire.
#[test]
fn sys_rdy_with_valid_crc_fires_once_then_subsequent_drops() {
    let mut assembler = HostAssembler::new();
    let stream = [
        frame_with_crc(MSG_TYPE_SYS_RDY, b""),
        frame_with_crc(MSG_TYPE_SYS_RDY, b""),
    ]
    .concat();
    let drained = assembler.feed(&stream);
    let msgs = &drained.messages;
    assert_eq!(msgs.len(), 2);
    assert!(msgs[0].crc_ok);
    assert!(msgs[1].crc_ok);

    let SysRdyOutcome {
        counter,
        handle_remaining,
        ..
    } = run_dispatch_sys_rdy(msgs);
    assert_eq!(
        counter, 1,
        "second SYS_RDY must NOT pump the eventfd — \
         Option::take consumed the handle on the first promotion"
    );
    assert!(!handle_remaining);
}
/// A CRC-valid SYS_RDY followed by a CRC-valid SCHED_EXIT must each take
/// effect: SYS_RDY fires and consumes its handle, SCHED_EXIT promotes kill
/// and writes the kill eventfd.
#[test]
fn sys_rdy_and_sched_exit_fire_independently() {
    let mut assembler = HostAssembler::new();
    let stream = [
        frame_with_crc(MSG_TYPE_SYS_RDY, b""),
        frame_with_crc(MSG_TYPE_SCHED_EXIT, b"exit-payload"),
    ]
    .concat();
    let drained = assembler.feed(&stream);
    let msgs = &drained.messages;
    assert_eq!(msgs.len(), 2);
    assert!(msgs[0].crc_ok);
    assert!(msgs[1].crc_ok);

    let SysRdyOutcome {
        counter,
        handle_remaining,
        kill,
        kill_evt_counter,
    } = run_dispatch_sys_rdy(msgs);
    assert_eq!(counter, 1, "SYS_RDY must promote");
    assert!(!handle_remaining, "SYS_RDY handle must be consumed");
    assert!(kill, "SCHED_EXIT must promote kill");
    assert_eq!(kill_evt_counter, 1, "SCHED_EXIT must write kill eventfd");
}
/// Observations collected by `run_dispatch_sys_rdy` after dispatch finishes.
struct SysRdyOutcome {
// Value read from the SYS_RDY eventfd via a clone taken before dispatch.
counter: u64,
// Whether `SinkState::sys_rdy_evt` was still `Some` after dispatch.
handle_remaining: bool,
// Kill flag as loaded after dispatch.
kill: bool,
// Value read from the kill eventfd after dispatch.
kill_evt_counter: u64,
}
fn run_dispatch_sys_rdy(messages: &[crate::vmm::bulk::BulkMessage]) -> SysRdyOutcome {
let mut state = SinkState::new();
let evt_clone = state
.sys_rdy_evt
.as_ref()
.expect("sys_rdy handle present at start")
.clone();
{
let mut sinks = state.sinks();
for msg in messages {
let _ = dispatch_bulk_message(msg, &mut sinks);
}
}
SysRdyOutcome {
counter: read_counter(&evt_clone),
handle_remaining: state.sys_rdy_evt.is_some(),
kill: state.kill.load(std::sync::atomic::Ordering::Acquire),
kill_evt_counter: read_counter(&state.kill_evt),
}
}