use super::*;
use crate::vmm::bulk::HostAssembler;
use crate::vmm::wire::{
FRAME_HEADER_SIZE, MSG_TYPE_SCHED_EXIT, MSG_TYPE_SNAPSHOT_REPLY, MSG_TYPE_SNAPSHOT_REQUEST,
MSG_TYPE_STIMULUS, MSG_TYPE_SYS_RDY, MsgType, SNAPSHOT_KIND_CAPTURE, SNAPSHOT_TAG_MAX,
ShmEntry, ShmMessage, SnapshotRequestPayload,
};
use std::sync::atomic::{AtomicBool, Ordering};
use vmm_sys_util::eventfd::{EFD_NONBLOCK, EventFd};
use zerocopy::IntoBytes;
/// Snapshot of everything `run_dispatch` can observably change, captured after
/// the whole batch has been processed so tests can assert on each effect.
struct DispatchOutcome {
/// Final value of the kill flag; set when a CRC-valid `SchedExit` was dispatched.
kill: bool,
/// Value read back from the kill eventfd after dispatch, converted to `u32`;
/// 0 when the read returns an error.
kill_evt_counter: u32,
/// Value read back from the sys-ready eventfd after dispatch, converted to
/// `u32`; 0 when the read returns an error.
sys_rdy_counter: u32,
/// `true` when the one-shot sys-ready handle was never taken during dispatch.
sys_rdy_remaining: bool,
/// Number of snapshot requests that decoded successfully and were queued.
snapshot_pending: usize,
/// Entries pushed to the bucket: CRC-valid `SchedExit` frames plus every
/// known message type for which `is_coordinator_internal()` is false.
bucket: Vec<ShmEntry>,
/// Number of messages whose `msg_type` had no `MsgType` mapping.
unknown_count: usize,
}
/// Runs every message in `messages` through a test-local dispatch loop and
/// reports the observable side effects as a [`DispatchOutcome`].
///
/// NOTE(review): the test names and messages suggest this mirrors a production
/// dispatch closure in the parent module — confirm the two stay in sync.
///
/// Per-arm behaviour:
/// * `SchedExit` — only when `crc_ok`: sets the kill flag, writes 1 to the kill
///   eventfd, and buckets the frame.
/// * `SysRdy` — only when `crc_ok` and the payload is empty: takes the one-shot
///   handle and writes 1 to the sys-ready eventfd. Never buckets.
/// * `SnapshotRequest` — only when `crc_ok` and the payload decodes: queues the
///   request. Never buckets.
/// * any other known type — bucketed unless `is_coordinator_internal()`.
/// * unknown `msg_type` — counted and otherwise ignored.
///
/// # Panics
///
/// Panics if either eventfd cannot be created.
fn run_dispatch(messages: &[crate::vmm::bulk::BulkMessage]) -> DispatchOutcome {
    let kill = AtomicBool::new(false);
    let kill_evt = EventFd::new(EFD_NONBLOCK).expect("kill eventfd");
    let sys_rdy_evt = std::sync::Arc::new(EventFd::new(EFD_NONBLOCK).expect("sys_rdy eventfd"));
    // One-shot handle: the first well-formed SysRdy takes it, later ones find `None`.
    let mut sys_rdy_handle: Option<std::sync::Arc<EventFd>> =
        Some(std::sync::Arc::clone(&sys_rdy_evt));
    let mut snapshot_pending: Vec<SnapshotRequest> = Vec::new();
    let mut bucket: Vec<ShmEntry> = Vec::new();
    let mut unknown_count = 0usize;

    // Single place that copies a drained message into a bucket entry.
    let to_entry = |msg: &crate::vmm::bulk::BulkMessage| ShmEntry {
        msg_type: msg.msg_type,
        payload: msg.payload.to_vec(),
        crc_ok: msg.crc_ok,
    };
    // Saturate instead of truncating: a plain `as u32` would wrap an oversized
    // counter and could make it read as 0.
    let clamp = |n: u64| u32::try_from(n).unwrap_or(u32::MAX);

    for msg in messages {
        match MsgType::from_wire(msg.msg_type) {
            Some(MsgType::SchedExit) => {
                // A CRC-bad SchedExit has no effect at all: no kill, no wakeup,
                // no bucket entry.
                if msg.crc_ok {
                    kill.store(true, Ordering::Release);
                    // Best-effort wakeup; a failed write shows up in the
                    // counter read back below.
                    let _ = kill_evt.write(1);
                    bucket.push(to_entry(msg));
                }
            }
            Some(MsgType::SysRdy) => {
                // `take()` sits last in the chain so a malformed frame leaves
                // the handle available for a later well-formed one.
                if msg.crc_ok
                    && msg.payload.is_empty()
                    && let Some(evt) = sys_rdy_handle.take()
                {
                    let _ = evt.write(1);
                }
            }
            Some(MsgType::SnapshotRequest) => {
                if msg.crc_ok
                    && let Some(req) = decode_snapshot_request(&msg.payload[..])
                {
                    snapshot_pending.push(req);
                }
            }
            // Remaining non-internal types are bucketed whatever their CRC
            // status; `crc_ok` is carried on the entry.
            Some(other) if !other.is_coordinator_internal() => {
                bucket.push(to_entry(msg));
            }
            // Coordinator-internal types without a dedicated arm are dropped.
            Some(_) => {}
            None => {
                unknown_count = unknown_count.saturating_add(1);
            }
        }
    }

    // A failed read is reported as 0. NOTE(review): for a non-blocking eventfd
    // this is presumably the never-signalled case — see eventfd(2).
    let kill_evt_counter = kill_evt.read().map_or(0, clamp);
    let sys_rdy_counter = sys_rdy_evt.read().map_or(0, clamp);

    DispatchOutcome {
        kill: kill.load(Ordering::Acquire),
        kill_evt_counter,
        sys_rdy_counter,
        sys_rdy_remaining: sys_rdy_handle.is_some(),
        snapshot_pending: snapshot_pending.len(),
        bucket,
        unknown_count,
    }
}
/// Serialises one wire frame: a `ShmMessage` header followed by `payload`.
///
/// The header carries `msg_type`, the payload length, and the CRC-32 of the
/// payload as computed by `crc32fast::hash`, so the checksum matches the data.
fn frame_with_crc(msg_type: u32, payload: &[u8]) -> Vec<u8> {
    let checksum = crc32fast::hash(payload);
    let length = payload.len() as u32;
    let header = ShmMessage {
        msg_type,
        length,
        crc32: checksum,
        _pad: 0,
    };

    let mut frame = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
    for part in [header.as_bytes(), payload] {
        frame.extend_from_slice(part);
    }
    frame
}
/// Serialises one wire frame whose header checksum is deliberately wrong.
///
/// The header stores the bitwise complement of the payload's real CRC-32, a
/// value that can never equal the real checksum. Everything else matches a
/// well-formed frame: same `msg_type`, same length, same payload bytes.
fn frame_with_torn_crc(msg_type: u32, payload: &[u8]) -> Vec<u8> {
    let corrupted_crc = !crc32fast::hash(payload);
    let length = payload.len() as u32;
    let header = ShmMessage {
        msg_type,
        length,
        crc32: corrupted_crc,
        _pad: 0,
    };

    let mut frame = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
    for part in [header.as_bytes(), payload] {
        frame.extend_from_slice(part);
    }
    frame
}
/// Builds the raw bytes of a `SnapshotRequestPayload`.
///
/// `tag` is copied byte-for-byte into a zero-filled buffer of
/// `SNAPSHOT_TAG_MAX` bytes; bytes beyond that capacity are dropped and any
/// unused tail stays zero.
fn snapshot_request_bytes(request_id: u32, kind: u32, tag: &str) -> Vec<u8> {
    let mut tag_buf = [0u8; SNAPSHOT_TAG_MAX];
    // `zip` stops at the shorter side, which bounds the copy to the buffer size.
    for (slot, byte) in tag_buf.iter_mut().zip(tag.bytes()) {
        *slot = byte;
    }

    let request = SnapshotRequestPayload {
        request_id,
        kind,
        tag: tag_buf,
    };
    request.as_bytes().to_vec()
}
/// A CRC-valid frame whose `msg_type` has no `MsgType` mapping is counted as
/// unknown and leaves the bucket, kill flag and sys-ready eventfd untouched.
#[test]
fn unknown_msg_type_drops_without_bucketing() {
    const UNKNOWN_TAG: u32 = 0xDEAD_BEEF;

    let mut assembler = HostAssembler::new();
    assert!(MsgType::from_wire(UNKNOWN_TAG).is_none());

    let batch = assembler.feed(&frame_with_crc(UNKNOWN_TAG, b"unknown-payload"));
    assert_eq!(batch.messages.len(), 1);

    let outcome = run_dispatch(&batch.messages);
    assert_eq!(
        outcome.unknown_count, 1,
        "unknown msg_type must hit the `None` arm exactly once"
    );
    assert!(
        outcome.bucket.is_empty(),
        "unknown msg_type must NOT surface as a verdict entry"
    );
    assert!(!outcome.kill, "unknown msg_type must NOT promote kill");
    assert_eq!(
        outcome.sys_rdy_counter, 0,
        "unknown msg_type must NOT pump sys_rdy"
    );
}
/// A `SCHED_EXIT` frame with a bad checksum has no effect: nothing is
/// bucketed, the kill flag stays clear and the kill eventfd is not written.
#[test]
fn sched_exit_torn_crc_does_not_bucket() {
    let mut assembler = HostAssembler::new();
    let batch = assembler.feed(&frame_with_torn_crc(MSG_TYPE_SCHED_EXIT, b"torn-payload"));
    assert_eq!(batch.messages.len(), 1);

    let torn = &batch.messages[0];
    assert!(
        !torn.crc_ok,
        "torn SCHED_EXIT must surface as crc_ok=false"
    );

    let outcome = run_dispatch(&batch.messages);
    assert!(
        outcome.bucket.is_empty(),
        "CRC-bad SCHED_EXIT must NOT pollute the verdict bucket — \
         a phantom verdict entry would surface in BulkDrainResult"
    );
    assert!(
        !outcome.kill,
        "CRC-bad SCHED_EXIT must NOT promote kill (existing gate)"
    );
    assert_eq!(
        outcome.kill_evt_counter, 0,
        "CRC-bad SCHED_EXIT must NOT write kill_evt (existing gate)"
    );
}
/// A CRC-valid `SCHED_EXIT` is bucketed once with its payload intact, sets the
/// kill flag and writes the kill eventfd exactly once.
#[test]
fn sched_exit_valid_crc_buckets_with_payload() {
    let exit_code: i32 = -1;
    let payload = exit_code.to_le_bytes();

    let mut assembler = HostAssembler::new();
    let batch = assembler.feed(&frame_with_crc(MSG_TYPE_SCHED_EXIT, &payload));
    assert_eq!(batch.messages.len(), 1);
    assert!(batch.messages[0].crc_ok);

    let outcome = run_dispatch(&batch.messages);
    assert_eq!(
        outcome.bucket.len(),
        1,
        "CRC-valid SCHED_EXIT MUST bucket exactly once"
    );

    let entry = &outcome.bucket[0];
    assert_eq!(entry.msg_type, MSG_TYPE_SCHED_EXIT);
    assert_eq!(entry.payload, &payload[..]);
    assert!(entry.crc_ok);

    assert!(outcome.kill);
    assert_eq!(outcome.kill_evt_counter, 1);
}
/// A CRC-valid `SNAPSHOT_REPLY` frame is neither bucketed nor queued as a
/// pending request, and `SnapshotReply` reports itself as coordinator-internal.
#[test]
fn snapshot_reply_on_tx_does_not_bucket() {
    let mut assembler = HostAssembler::new();
    let batch = assembler.feed(&frame_with_crc(MSG_TYPE_SNAPSHOT_REPLY, b"forged-reply"));
    assert_eq!(batch.messages.len(), 1);
    assert!(batch.messages[0].crc_ok);

    let outcome = run_dispatch(&batch.messages);
    assert!(
        outcome.bucket.is_empty(),
        "guest-stamped SNAPSHOT_REPLY must NOT surface as verdict — \
         the tag is host→guest only"
    );
    assert_eq!(
        outcome.snapshot_pending, 0,
        "SNAPSHOT_REPLY is not a request — must not push pending"
    );
    assert!(
        MsgType::SnapshotReply.is_coordinator_internal(),
        "SnapshotReply must be classified as coordinator-internal"
    );
}
/// A CRC-valid `SYS_RDY` carrying a payload does not write the sys-ready
/// eventfd, leaves the one-shot handle in place, and is not bucketed.
#[test]
fn sys_rdy_with_nonempty_payload_does_not_promote() {
    let smuggled = b"smuggled-bytes";

    let mut assembler = HostAssembler::new();
    let batch = assembler.feed(&frame_with_crc(MSG_TYPE_SYS_RDY, smuggled));
    assert_eq!(batch.messages.len(), 1);

    let frame = &batch.messages[0];
    assert!(frame.crc_ok);
    assert_eq!(
        frame.payload.len(),
        14,
        "smuggled payload must propagate verbatim from assembler"
    );

    let outcome = run_dispatch(&batch.messages);
    assert_eq!(
        outcome.sys_rdy_counter, 0,
        "SysRdy with non-empty payload must NOT fire eventfd — \
         shape gate (is_empty) blocks the smuggle path"
    );
    assert!(
        outcome.sys_rdy_remaining,
        "SysRdy handle must remain available for a later \
         well-formed (empty-payload) frame"
    );
    assert!(
        outcome.bucket.is_empty(),
        "SysRdy must NOT bucket regardless of shape — \
         coordinator-internal classification dominates"
    );
}
/// Two well-formed `SYS_RDY` frames, drained by separate `feed` calls and
/// dispatched together, write the sys-ready eventfd only once.
#[test]
fn sys_rdy_fires_once_across_two_feed_calls() {
    let mut assembler = HostAssembler::new();

    let first = assembler.feed(&frame_with_crc(MSG_TYPE_SYS_RDY, b""));
    assert_eq!(first.messages.len(), 1);
    let second = assembler.feed(&frame_with_crc(MSG_TYPE_SYS_RDY, b""));
    assert_eq!(second.messages.len(), 1);

    // Dispatch both drains as one batch so they share the one-shot handle.
    let combined: Vec<crate::vmm::bulk::BulkMessage> = first
        .messages
        .iter()
        .chain(second.messages.iter())
        .cloned()
        .collect();

    let outcome = run_dispatch(&combined);
    assert_eq!(
        outcome.sys_rdy_counter, 1,
        "second SYS_RDY across feed boundary must NOT pump — \
         closure-scope take() is the one-shot, not the assembler"
    );
    assert!(
        !outcome.sys_rdy_remaining,
        "first SYS_RDY must have consumed the handle"
    );
}
/// One batch containing `SYS_RDY`, `SCHED_EXIT`, `SNAPSHOT_REQUEST` and
/// `STIMULUS` frames triggers each arm's effect, and only the `SCHED_EXIT` and
/// `STIMULUS` frames reach the bucket.
#[test]
fn interleaved_batch_dispatches_all_arms_independently() {
    let snapshot_payload = snapshot_request_bytes(7, SNAPSHOT_KIND_CAPTURE, "snap");
    let stream = [
        frame_with_crc(MSG_TYPE_SYS_RDY, b""),
        frame_with_crc(MSG_TYPE_SCHED_EXIT, b"exit"),
        frame_with_crc(MSG_TYPE_SNAPSHOT_REQUEST, &snapshot_payload),
        frame_with_crc(MSG_TYPE_STIMULUS, b"stim-payload"),
    ]
    .concat();

    let mut assembler = HostAssembler::new();
    let batch = assembler.feed(&stream);
    assert_eq!(batch.messages.len(), 4);
    assert!(
        batch.messages.iter().all(|frame| frame.crc_ok),
        "all four frames must surface crc_ok=true"
    );

    let outcome = run_dispatch(&batch.messages);
    assert_eq!(outcome.sys_rdy_counter, 1, "SysRdy promotes");
    assert!(!outcome.sys_rdy_remaining, "SysRdy handle consumed");
    assert!(outcome.kill, "SchedExit promotes kill");
    assert_eq!(outcome.kill_evt_counter, 1);
    assert_eq!(
        outcome.snapshot_pending, 1,
        "SnapshotRequest decodes and pushes onto pending"
    );
    assert_eq!(
        outcome.bucket.len(),
        2,
        "bucket must contain SchedExit + Stimulus — \
         SysRdy + SnapshotRequest filtered as coordinator-internal"
    );

    let bucketed_tags: Vec<u32> = outcome
        .bucket
        .iter()
        .map(|entry| entry.msg_type)
        .collect();
    for expected in [MSG_TYPE_SCHED_EXIT, MSG_TYPE_STIMULUS] {
        assert!(bucketed_tags.contains(&expected));
    }
    for filtered in [MSG_TYPE_SYS_RDY, MSG_TYPE_SNAPSHOT_REQUEST] {
        assert!(!bucketed_tags.contains(&filtered));
    }
}
/// A batch holding only a well-formed `SYS_RDY` writes the sys-ready eventfd
/// once and leaves the bucket empty.
#[test]
fn sys_rdy_only_batch_yields_empty_bucket() {
    let mut assembler = HostAssembler::new();
    let batch = assembler.feed(&frame_with_crc(MSG_TYPE_SYS_RDY, b""));
    assert_eq!(batch.messages.len(), 1);

    let outcome = run_dispatch(&batch.messages);
    assert!(
        outcome.bucket.is_empty(),
        "SysRdy-only batch must produce an empty bucket — \
         the production short-circuit avoids the shared mutex"
    );
    assert_eq!(outcome.sys_rdy_counter, 1);
}
/// Three CRC-valid `SCHED_EXIT` frames in one batch each write the kill
/// eventfd and each produce a bucket entry.
#[test]
fn multiple_sched_exit_frames_pump_eventfd_per_frame() {
    let stream = [
        frame_with_crc(MSG_TYPE_SCHED_EXIT, b"first"),
        frame_with_crc(MSG_TYPE_SCHED_EXIT, b"second"),
        frame_with_crc(MSG_TYPE_SCHED_EXIT, b"third"),
    ]
    .concat();

    let mut assembler = HostAssembler::new();
    let batch = assembler.feed(&stream);
    assert_eq!(batch.messages.len(), 3);

    let outcome = run_dispatch(&batch.messages);
    assert!(outcome.kill, "kill flag must be set");
    assert_eq!(
        outcome.kill_evt_counter, 3,
        "EFD_NONBLOCK eventfd accumulates 1 per CRC-valid SchedExit \
         frame — pinning the count documents the wakeup-edge \
         contract"
    );
    assert_eq!(
        outcome.bucket.len(),
        3,
        "every CRC-valid SchedExit must bucket — exit-code \
         diagnostic must reach the verdict stream"
    );
    for bucketed in &outcome.bucket {
        assert_eq!(bucketed.msg_type, MSG_TYPE_SCHED_EXIT);
        assert!(bucketed.crc_ok);
    }
}
/// In a valid / torn / valid run of `SCHED_EXIT` frames, only the two valid
/// ones are bucketed and only they write the kill eventfd.
#[test]
fn sched_exit_mixed_crc_batch_filters_torn_frame() {
    let stream = [
        frame_with_crc(MSG_TYPE_SCHED_EXIT, b"valid-1"),
        frame_with_torn_crc(MSG_TYPE_SCHED_EXIT, b"torn"),
        frame_with_crc(MSG_TYPE_SCHED_EXIT, b"valid-2"),
    ]
    .concat();

    let mut assembler = HostAssembler::new();
    let batch = assembler.feed(&stream);
    assert_eq!(batch.messages.len(), 3);

    // The assembler must flag only the middle frame as CRC-bad.
    let crc_flags: Vec<bool> = batch.messages.iter().map(|frame| frame.crc_ok).collect();
    assert_eq!(crc_flags, [true, false, true]);

    let outcome = run_dispatch(&batch.messages);
    assert_eq!(
        outcome.bucket.len(),
        2,
        "torn middle SchedExit must drop; valid bookends must bucket"
    );
    for bucketed in &outcome.bucket {
        assert!(bucketed.crc_ok, "every bucketed SchedExit is crc_ok=true");
    }
    assert!(outcome.kill, "valid SchedExits promote kill");
    assert_eq!(
        outcome.kill_evt_counter, 2,
        "exactly the two valid SchedExits pump kill_evt"
    );
}