use zerocopy::IntoBytes;
/// Result of a successful [`mmap_devmem`] call.
///
/// `map_base` and `map_size` describe the mapping exactly as it was requested
/// from `mmap` (page-aligned start, length including the leading in-page
/// offset); `ptr` is the address of the requested base inside that mapping.
pub(crate) struct ShmMmap {
/// Address of the requested region: `map_base` advanced by the in-page offset.
pub ptr: *mut u8,
/// Start address returned by `mmap`.
pub map_base: *mut libc::c_void,
/// Length passed to `mmap`: the requested size plus the in-page offset.
pub map_size: usize,
}
/// Maps `shm_size` bytes starting at file offset `shm_base` of `fd` as a
/// shared, readable and writable mapping.
///
/// `mmap` needs a page-aligned file offset, so `shm_base` is rounded down to
/// a page boundary and the mapping is lengthened by the rounding distance.
/// The returned [`ShmMmap::ptr`] points at `shm_base` itself.
///
/// Returns `None` when `mmap` fails, and also when the request cannot be
/// expressed safely: the page size cannot be determined, or the size/offset
/// arithmetic does not fit the platform's `usize`/`off_t`. In the latter
/// cases `errno` is not set by this function.
pub(crate) fn mmap_devmem(
    fd: std::os::unix::io::RawFd,
    shm_base: u64,
    shm_size: u64,
) -> Option<ShmMmap> {
    // SAFETY: `sysconf` only queries a system constant.
    let raw_page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
    // `sysconf` reports failure as -1; a blind cast would turn that into a
    // nonsensical alignment mask. The mask below also assumes a power of two.
    let page_size = u64::try_from(raw_page_size)
        .ok()
        .filter(|size| size.is_power_of_two())?;

    let aligned_base = shm_base & !(page_size - 1);
    let offset_in_page = usize::try_from(shm_base - aligned_base).ok()?;
    // Checked so that an oversized request cannot wrap into a short mapping
    // that callers would then treat as `shm_size` bytes long.
    let map_size = usize::try_from(shm_size)
        .ok()?
        .checked_add(offset_in_page)?;
    let file_offset = libc::off_t::try_from(aligned_base).ok()?;

    // SAFETY: a null hint lets the kernel choose the address; the result is
    // compared against MAP_FAILED before it is used.
    let map_base = unsafe {
        libc::mmap(
            std::ptr::null_mut(),
            map_size,
            libc::PROT_READ | libc::PROT_WRITE,
            libc::MAP_SHARED,
            fd,
            file_offset,
        )
    };
    if map_base == libc::MAP_FAILED {
        return None;
    }

    // SAFETY: `offset_in_page <= map_size`, so the result stays inside (or one
    // past the end of) the mapping that was just created.
    let ptr = unsafe { map_base.cast::<u8>().add(offset_in_page) };
    Some(ShmMmap {
        ptr,
        map_base,
        map_size,
    })
}
/// Value `shm_init` stores in the header's `magic` field and the drain
/// functions require before reading anything else. The bytes are ASCII "STMR".
pub const SHM_RING_MAGIC: u32 = 0x5354_4d52;
// Message type tags. Each value is a four-character ASCII code when written
// big-endian (see the `msg_type_*_ascii` tests).
/// Message type tag, ASCII "STIM".
pub const MSG_TYPE_STIMULUS: u32 = 0x5354_494D;
/// Message type tag, ASCII "SCST".
pub const MSG_TYPE_SCENARIO_START: u32 = 0x5343_5354;
/// Message type tag, ASCII "SCEN".
pub const MSG_TYPE_SCENARIO_END: u32 = 0x5343_454E;
/// Message type tag, ASCII "EXIT".
pub const MSG_TYPE_EXIT: u32 = 0x4558_4954;
/// Message type tag, ASCII "TEST".
pub const MSG_TYPE_TEST_RESULT: u32 = 0x5445_5354;
/// Message type tag, ASCII "SCDX".
pub const MSG_TYPE_SCHED_EXIT: u32 = 0x5343_4458;
/// Message type tag, ASCII "CRSH".
pub const MSG_TYPE_CRASH: u32 = 0x4352_5348;
/// Value `shm_init` stores in the header's `version` field.
pub const SHM_RING_VERSION: u32 = 1;
/// Byte offset 12, which falls inside the header's `_pad` field (bytes 12..16).
/// NOTE(review): presumably the byte where a dump request is posted; the
/// reader/writer of this byte is not in this file — confirm.
pub const DUMP_REQ_OFFSET: usize = 12;
/// ASCII 'D'. NOTE(review): presumably the value written at `DUMP_REQ_OFFSET`
/// to request a dump — confirm against the user of this constant.
pub const DUMP_REQ_SYSRQ_D: u8 = b'D';
/// Byte offset 13, also inside the header's `_pad` field.
/// NOTE(review): presumably the stall-request byte — confirm.
pub const STALL_REQ_OFFSET: usize = 13;
/// ASCII 'S'. NOTE(review): presumably the value written at
/// `STALL_REQ_OFFSET` — confirm.
pub const STALL_REQ_ACTIVATE: u8 = b'S';
/// Byte offset of signal slot 0; slot `n` lives at `SIGNAL_SLOT_BASE + n`.
pub const SIGNAL_SLOT_BASE: usize = 14;
/// Number of signal slots. The signal functions assert `slot < SIGNAL_SLOT_COUNT`,
/// so the slots occupy bytes 14 and 15.
const SIGNAL_SLOT_COUNT: usize = 2;
/// NOTE(review): not referenced in this file; presumably a value stored in a
/// signal slot to request shutdown — confirm.
pub const SIGNAL_SHUTDOWN_REQ: u8 = 0xDD;
/// NOTE(review): not referenced in this file; presumably a slot value (it is
/// not a valid slot index, since only slots 0 and 1 exist) — confirm.
pub const SIGNAL_PROBES_READY: u8 = 2;
/// Blocks until signal slot `slot` holds a non-zero value or `timeout` elapses.
///
/// The slot is polled with an acquire load roughly every 50 ms. It is always
/// checked at least once (even for a zero timeout) and once more after the
/// final sleep, so a signal raised just before the deadline is not missed.
/// Sleeps are clamped to the remaining time so short timeouts return promptly.
///
/// # Errors
/// Returns an error if the shared-memory mapping cannot be obtained or the
/// slot is still zero when the timeout expires.
///
/// # Panics
/// Panics if `slot` is not below `SIGNAL_SLOT_COUNT`.
pub fn wait_for(slot: u8, timeout: std::time::Duration) -> anyhow::Result<()> {
    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);

    assert!(
        (slot as usize) < SIGNAL_SLOT_COUNT,
        "signal slot {slot} out of range"
    );
    let (ptr, _) = shm_ptr()?;
    let offset = SIGNAL_SLOT_BASE + slot as usize;
    // SAFETY: relies on the mapping covering the signal slot bytes; the
    // mapping size is not re-checked here. `AtomicU8` has the same size and
    // alignment as `u8`.
    let atom = unsafe { &*(ptr.add(offset) as *const std::sync::atomic::AtomicU8) };

    // Measuring elapsed time (rather than computing `now + timeout`) avoids a
    // panic when `timeout` is too large to add to an `Instant`.
    let started = std::time::Instant::now();
    loop {
        if atom.load(std::sync::atomic::Ordering::Acquire) != 0 {
            return Ok(());
        }
        let elapsed = started.elapsed();
        if elapsed >= timeout {
            break;
        }
        std::thread::sleep(POLL_INTERVAL.min(timeout - elapsed));
    }
    anyhow::bail!("signal slot {slot} timed out after {timeout:?}")
}
/// Raises signal slot `slot` by storing the value 1 through [`signal_value`].
pub fn signal(slot: u8) {
    const RAISED: u8 = 1;
    signal_value(slot, RAISED);
}
/// Raises signal slot `slot` in guest memory by storing the value 1 through
/// [`signal_guest_value`].
pub fn signal_guest(mem: &crate::monitor::reader::GuestMem, shm_base: u64, slot: u8) {
    const RAISED: u8 = 1;
    signal_guest_value(mem, shm_base, slot, RAISED);
}
/// Stores `value` into signal slot `slot` of the SHM region at `shm_base`,
/// writing through `mem` at byte offset `SIGNAL_SLOT_BASE + slot`.
///
/// # Panics
/// Panics if `slot` is not below `SIGNAL_SLOT_COUNT`.
pub fn signal_guest_value(
    mem: &crate::monitor::reader::GuestMem,
    shm_base: u64,
    slot: u8,
    value: u8,
) {
    let index = usize::from(slot);
    assert!(index < SIGNAL_SLOT_COUNT, "signal slot {slot} out of range");
    let slot_offset = SIGNAL_SLOT_BASE + index;
    mem.write_u8(shm_base, slot_offset, value);
}
/// Returns the current value of signal slot `slot` (acquire load), or 0 when
/// the shared-memory mapping is unavailable.
///
/// # Panics
/// Panics if `slot` is not below `SIGNAL_SLOT_COUNT`.
pub fn read_signal(slot: u8) -> u8 {
    let index = usize::from(slot);
    assert!(index < SIGNAL_SLOT_COUNT, "signal slot {slot} out of range");
    match shm_ptr() {
        Ok((base, _)) => {
            // SAFETY: relies on the mapping covering the signal slot bytes;
            // `AtomicU8` has the same size and alignment as `u8`.
            let cell = unsafe {
                &*base
                    .add(SIGNAL_SLOT_BASE + index)
                    .cast::<std::sync::atomic::AtomicU8>()
            };
            cell.load(std::sync::atomic::Ordering::Acquire)
        }
        Err(_) => 0,
    }
}
/// Stores `value` into signal slot `slot` with release ordering.
///
/// Does nothing when the shared-memory mapping is unavailable.
///
/// # Panics
/// Panics if `slot` is not below `SIGNAL_SLOT_COUNT`.
pub fn signal_value(slot: u8, value: u8) {
    let index = usize::from(slot);
    assert!(index < SIGNAL_SLOT_COUNT, "signal slot {slot} out of range");
    if let Ok((base, _)) = shm_ptr() {
        // SAFETY: relies on the mapping covering the signal slot bytes;
        // `AtomicU8` has the same size and alignment as `u8`.
        let cell = unsafe {
            &*base
                .add(SIGNAL_SLOT_BASE + index)
                .cast::<std::sync::atomic::AtomicU8>()
        };
        cell.store(value, std::sync::atomic::Ordering::Release);
    }
}
/// Registers an already-mapped SHM region (`base`, `size` bytes) as the
/// process-wide mapping returned by `shm_ptr`.
///
/// Only the first registration takes effect; if a mapping is already stored,
/// this call is silently ignored.
pub fn init_shm_ptr(base: *mut u8, size: usize) {
    let candidate = ShmPtr { ptr: base, size };
    if SHM_PTR.set(candidate).is_err() {
        // A mapping was registered earlier; keep it.
    }
}
/// Appends one message to the ring at offset 0 of the process-wide SHM
/// mapping, holding `SHM_WRITE_LOCK` for the duration of the write.
///
/// Best effort: nothing happens when the mapping is unavailable, and the
/// result of `shm_write` (0 when the message was dropped) is discarded.
pub fn write_msg(msg_type: u32, payload: &[u8]) {
    if let Ok((base, len)) = shm_ptr() {
        let _held = SHM_WRITE_LOCK.lock();
        // SAFETY: relies on `base` pointing at a live mapping of `len` bytes,
        // with in-process writers serialised by `SHM_WRITE_LOCK`.
        let ring = unsafe { std::slice::from_raw_parts_mut(base, len) };
        shm_write(ring, 0, msg_type, payload);
    }
}
/// Like [`write_msg`], but never waits for `SHM_WRITE_LOCK`.
///
/// Returns `false` when the mapping is unavailable or the lock is currently
/// held elsewhere. Returns `true` once `shm_write` has been called — its own
/// result is not inspected, so `true` does not guarantee the message fit.
pub fn write_msg_nonblocking(msg_type: u32, payload: &[u8]) -> bool {
    let Ok((base, len)) = shm_ptr() else {
        return false;
    };
    match SHM_WRITE_LOCK.try_lock() {
        Some(_held) => {
            // SAFETY: relies on `base` pointing at a live mapping of `len`
            // bytes, with in-process writers serialised by `SHM_WRITE_LOCK`.
            let ring = unsafe { std::slice::from_raw_parts_mut(base, len) };
            shm_write(ring, 0, msg_type, payload);
            true
        }
        None => false,
    }
}
/// Base pointer and byte length of the process-wide SHM mapping.
struct ShmPtr {
/// Address of the first byte of the SHM region.
ptr: *mut u8,
/// Length of the SHM region in bytes.
size: usize,
}
// Raw pointers are neither `Send` nor `Sync`, which would prevent storing
// `ShmPtr` in a `static`. These impls assert that sharing the pointer value
// across threads is acceptable; accesses through it are the responsibility of
// the functions that dereference it.
unsafe impl Send for ShmPtr {}
unsafe impl Sync for ShmPtr {}
/// Process-wide SHM mapping, set at most once by `init_shm_ptr` or lazily by
/// `shm_ptr`.
static SHM_PTR: std::sync::OnceLock<ShmPtr> = std::sync::OnceLock::new();
/// Lock taken by `write_msg` / `write_msg_nonblocking` around each ring write.
pub static SHM_WRITE_LOCK: parking_lot::Mutex<()> = parking_lot::Mutex::new(());
fn shm_ptr() -> anyhow::Result<(*mut u8, usize)> {
if let Some(p) = SHM_PTR.get() {
return Ok((p.ptr, p.size));
}
let cmdline = std::fs::read_to_string("/proc/cmdline")
.map_err(|e| anyhow::anyhow!("/proc/cmdline: {e}"))?;
let (shm_base, shm_size) = parse_shm_params_from_str(&cmdline)
.ok_or_else(|| anyhow::anyhow!("no SHM params in cmdline"))?;
let fd = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open("/dev/mem")
.map_err(|e| anyhow::anyhow!("/dev/mem open: {e}"))?;
let m = mmap_devmem(
std::os::unix::io::AsRawFd::as_raw_fd(&fd),
shm_base,
shm_size,
)
.ok_or_else(|| anyhow::anyhow!("/dev/mem mmap failed: {}", std::io::Error::last_os_error()))?;
let size = shm_size as usize;
let _ = SHM_PTR.set(ShmPtr { ptr: m.ptr, size });
Ok((m.ptr, size))
}
/// Extracts the SHM base address and size from a kernel command line.
///
/// Looks for the whitespace-separated tokens `KTSTR_SHM_BASE=<hex>` and
/// `KTSTR_SHM_SIZE=<hex>` (the first occurrence of each is used). Values are
/// hexadecimal, with at most one optional `0x`/`0X` prefix.
///
/// Returns `None` when either token is missing or its value is not valid hex.
pub(crate) fn parse_shm_params_from_str(cmdline: &str) -> Option<(u64, u64)> {
    /// Finds the first `key`-prefixed token and parses its value as hex.
    fn hex_param(cmdline: &str, key: &str) -> Option<u64> {
        let raw = cmdline
            .split_whitespace()
            .find_map(|token| token.strip_prefix(key))?;
        // Strip a single prefix only: stripping repeatedly would accept
        // malformed values such as "0x0x10".
        let digits = raw
            .strip_prefix("0x")
            .or_else(|| raw.strip_prefix("0X"))
            .unwrap_or(raw);
        u64::from_str_radix(digits, 16).ok()
    }

    let base = hex_param(cmdline, "KTSTR_SHM_BASE=")?;
    let size = hex_param(cmdline, "KTSTR_SHM_SIZE=")?;
    Some((base, size))
}
/// Header stored at the start of the SHM region, immediately followed by the
/// ring's data area. All fields are in native byte order.
///
/// `write_ptr` and `read_ptr` are running byte counters; the position inside
/// the data area is the counter modulo `capacity`.
#[repr(C)]
#[derive(Clone, Copy, Default, IntoBytes, zerocopy::Immutable, zerocopy::KnownLayout)]
pub struct ShmRingHeader {
/// Offset 0: `SHM_RING_MAGIC` once the ring has been initialised.
pub magic: u32,
/// Offset 4: `SHM_RING_VERSION`.
pub version: u32,
/// Offset 8: size of the data area in bytes.
pub capacity: u32,
/// Offset 12: not used by the ring itself. `DUMP_REQ_OFFSET`,
/// `STALL_REQ_OFFSET` and the signal slots address bytes 12..16.
pub _pad: u32,
/// Offset 16: total bytes ever written; advanced by `shm_write`.
pub write_ptr: u64,
/// Offset 24: total bytes consumed; advanced by `shm_drain_live`.
pub read_ptr: u64,
/// Offset 32: number of messages `shm_write` rejected for lack of space.
pub drops: u64,
}
// Compile-time check that the layout matches the offsets used above.
const _HEADER_SIZE: () = assert!(std::mem::size_of::<ShmRingHeader>() == 40);
/// Per-message header written into the ring directly before the payload.
/// All fields are in native byte order.
#[repr(C)]
#[derive(Clone, Copy, Default, IntoBytes, zerocopy::Immutable, zerocopy::KnownLayout)]
pub struct ShmMessage {
/// Message type tag (see the `MSG_TYPE_*` constants).
pub msg_type: u32,
/// Payload length in bytes, excluding this header.
pub length: u32,
/// `crc32fast::hash` of the payload bytes.
pub crc32: u32,
/// Padding; written as 0 by `shm_write`.
pub _pad: u32,
}
// Compile-time check that the message header is exactly 16 bytes.
const _MSG_SIZE: () = assert!(std::mem::size_of::<ShmMessage>() == 16);
/// Size in bytes of [`ShmRingHeader`]; the ring's data area starts this many
/// bytes after the start of the SHM region.
pub const HEADER_SIZE: usize = std::mem::size_of::<ShmRingHeader>();
/// Size in bytes of [`ShmMessage`], the header preceding every payload.
pub const MSG_HEADER_SIZE: usize = std::mem::size_of::<ShmMessage>();
/// One message read from the ring by `shm_drain` / `shm_drain_live`.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct ShmEntry {
/// Type tag copied from the message header.
pub msg_type: u32,
/// Payload bytes copied out of the ring.
pub payload: Vec<u8>,
/// `true` when the CRC32 of `payload` equals the CRC stored in the header.
pub crc_ok: bool,
}
/// Outcome of draining the ring. The default value (no entries, zero drops)
/// is what the drain functions return when the magic does not match.
#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub struct ShmDrainResult {
/// Complete messages found, in ring order.
pub entries: Vec<ShmEntry>,
/// Value of the header's `drops` counter at the time of the drain.
pub drops: u64,
}
/// Fixed 24-byte wire layout of a stimulus payload, decoded by
/// `StimulusEvent::from_payload`. Fields are in native byte order.
///
/// NOTE(review): the meaning of the individual fields is not established in
/// this file; the per-field notes below only record their byte offsets.
#[repr(C)]
#[derive(Clone, Copy, Default, Debug, IntoBytes, zerocopy::Immutable, zerocopy::KnownLayout)]
pub struct StimulusPayload {
/// Bytes 0..4.
pub elapsed_ms: u32,
/// Bytes 4..6.
pub step_index: u16,
/// Bytes 6..8.
pub op_count: u16,
/// Bytes 8..12.
pub op_kinds: u32,
/// Bytes 12..14.
pub cgroup_count: u16,
/// Bytes 14..16.
pub worker_count: u16,
/// Bytes 16..24.
pub total_iterations: u64,
}
// Compile-time check that the payload is exactly 24 bytes with no padding.
const _STIMULUS_SIZE: () = assert!(std::mem::size_of::<StimulusPayload>() == 24);
/// Decoded form of a [`StimulusPayload`], produced by
/// `StimulusEvent::from_payload`. It carries the same fields with the same
/// types.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct StimulusEvent {
/// Decoded from payload bytes 0..4.
pub elapsed_ms: u32,
/// Decoded from payload bytes 4..6.
pub step_index: u16,
/// Decoded from payload bytes 6..8.
pub op_count: u16,
/// Decoded from payload bytes 8..12.
pub op_kinds: u32,
/// Decoded from payload bytes 12..14.
pub cgroup_count: u16,
/// Decoded from payload bytes 14..16.
pub worker_count: u16,
/// Decoded from payload bytes 16..24.
pub total_iterations: u64,
}
impl StimulusEvent {
pub fn from_payload(data: &[u8]) -> Option<Self> {
if data.len() < std::mem::size_of::<StimulusPayload>() {
return None;
}
Some(StimulusEvent {
elapsed_ms: u32::from_ne_bytes(data[0..4].try_into().ok()?),
step_index: u16::from_ne_bytes(data[4..6].try_into().ok()?),
op_count: u16::from_ne_bytes(data[6..8].try_into().ok()?),
op_kinds: u32::from_ne_bytes(data[8..12].try_into().ok()?),
cgroup_count: u16::from_ne_bytes(data[12..14].try_into().ok()?),
worker_count: u16::from_ne_bytes(data[14..16].try_into().ok()?),
total_iterations: u64::from_ne_bytes(data[16..24].try_into().ok()?),
})
}
}
/// Initialises an empty ring inside `buf`.
///
/// The ring occupies `buf[shm_offset..shm_offset + shm_size]`: a
/// [`ShmRingHeader`] followed by a zero-filled data area. The header records
/// the data area's size as `capacity`; because that field is a `u32`, a data
/// area larger than `u32::MAX` bytes is recorded as `u32::MAX` (the excess
/// simply goes unused) rather than being silently truncated.
///
/// # Panics
/// Panics if `shm_size` is smaller than `HEADER_SIZE` or the region does not
/// fit inside `buf`.
#[allow(dead_code)]
pub fn shm_init(buf: &mut [u8], shm_offset: usize, shm_size: usize) {
    let data_len = shm_size
        .checked_sub(HEADER_SIZE)
        .expect("shm_size must be at least HEADER_SIZE bytes");
    // Clamp instead of `as u32`: truncation could record a tiny or even zero
    // capacity for a region that is actually huge.
    let capacity = u32::try_from(data_len).unwrap_or(u32::MAX);
    let header = ShmRingHeader {
        magic: SHM_RING_MAGIC,
        version: SHM_RING_VERSION,
        capacity,
        _pad: 0,
        write_ptr: 0,
        read_ptr: 0,
        drops: 0,
    };
    let data_start = shm_offset + HEADER_SIZE;
    let data_end = shm_offset + shm_size;
    buf[shm_offset..data_start].copy_from_slice(header.as_bytes());
    buf[data_start..data_end].fill(0);
}
/// Decodes the [`ShmRingHeader`] stored at `buf[shm_offset..]`, reading each
/// field in native byte order at its fixed offset.
///
/// # Panics
/// Panics if `buf` does not contain `HEADER_SIZE` bytes from `shm_offset`.
fn read_header(buf: &[u8], shm_offset: usize) -> ShmRingHeader {
    let raw = &buf[shm_offset..shm_offset + HEADER_SIZE];
    let u32_at = |at: usize| {
        let mut word = [0u8; 4];
        word.copy_from_slice(&raw[at..at + 4]);
        u32::from_ne_bytes(word)
    };
    let u64_at = |at: usize| {
        let mut word = [0u8; 8];
        word.copy_from_slice(&raw[at..at + 8]);
        u64::from_ne_bytes(word)
    };
    ShmRingHeader {
        magic: u32_at(0),
        version: u32_at(4),
        capacity: u32_at(8),
        _pad: u32_at(12),
        write_ptr: u64_at(16),
        read_ptr: u64_at(24),
        drops: u64_at(32),
    }
}
/// Copies `len` bytes out of the ring's data area into a new vector, starting
/// at running position `ptr` and wrapping at `capacity`.
fn read_ring_bytes(
    buf: &[u8],
    data_start: usize,
    capacity: usize,
    ptr: u64,
    len: usize,
) -> Vec<u8> {
    let mut assembled = vec![0u8; len];
    read_ring_into(buf, data_start, capacity, ptr, assembled.as_mut_slice());
    assembled
}
/// Fills `out` from the ring's data area, which occupies
/// `buf[data_start..data_start + capacity]`.
///
/// Reading starts at `ptr % capacity` and wraps to the start of the data area
/// whenever its end is reached.
///
/// # Panics
/// Panics if `capacity` is 0 or the data area lies outside `buf`.
fn read_ring_into(buf: &[u8], data_start: usize, capacity: usize, ptr: u64, out: &mut [u8]) {
    let start = (ptr % capacity as u64) as usize;
    if out.is_empty() {
        return;
    }
    // First piece: from the start position up to the end of the data area.
    let head_len = out.len().min(capacity - start);
    let (head, tail) = out.split_at_mut(head_len);
    let head_src = data_start + start;
    head.copy_from_slice(&buf[head_src..head_src + head_len]);
    // Everything after the wrap is read from the start of the data area.
    for lap in tail.chunks_mut(capacity) {
        lap.copy_from_slice(&buf[data_start..data_start + lap.len()]);
    }
}
/// Reads every complete message currently in the ring at `buf[shm_offset..]`.
///
/// Messages are decoded from `read_ptr` up to `write_ptr`; a message whose
/// header or payload extends past `write_ptr` ends the scan. Each entry's
/// `crc_ok` reports whether the payload matches the CRC32 in its header. The
/// ring is not modified — `buf` is read-only, so `read_ptr` is not advanced.
///
/// The header may have been written by another party, so it is validated
/// before use instead of being trusted:
/// - if `buf` is too short for the header or the magic does not match, the
///   default (empty, zero drops) result is returned;
/// - if the data area does not fit in `buf`, or the pending byte count
///   (`write_ptr - read_ptr`) is negative or exceeds `capacity`, no entries
///   are returned but `drops` is still reported.
pub fn shm_drain(buf: &[u8], shm_offset: usize) -> ShmDrainResult {
    const MSG_HDR: u64 = MSG_HEADER_SIZE as u64;

    let Some(data_start) = shm_offset
        .checked_add(HEADER_SIZE)
        .filter(|&end| end <= buf.len())
    else {
        return ShmDrainResult::default();
    };
    let header = read_header(buf, shm_offset);
    if header.magic != SHM_RING_MAGIC {
        return ShmDrainResult::default();
    }

    let capacity = header.capacity as usize;
    let drops = header.drops;
    let no_entries = || ShmDrainResult {
        entries: Vec::new(),
        drops,
    };
    if buf.len() - data_start < capacity {
        return no_entries();
    }
    let write_pos = header.write_ptr;
    let mut read_pos = header.read_ptr;
    // A writer never lets more than `capacity` bytes be pending, so anything
    // else means the pointers are corrupt. This also covers `capacity == 0`.
    match write_pos.checked_sub(read_pos) {
        Some(pending) if pending <= capacity as u64 => {}
        _ => return no_entries(),
    }

    let mut entries = Vec::new();
    loop {
        // `read_pos <= write_pos` holds throughout, so none of the arithmetic
        // below can overflow.
        let available = write_pos - read_pos;
        if available < MSG_HDR {
            break;
        }
        let mut hdr_buf = [0u8; MSG_HEADER_SIZE];
        read_ring_into(buf, data_start, capacity, read_pos, &mut hdr_buf);
        let msg_type = u32::from_ne_bytes(hdr_buf[0..4].try_into().unwrap());
        let length = u32::from_ne_bytes(hdr_buf[4..8].try_into().unwrap());
        let stored_crc = u32::from_ne_bytes(hdr_buf[8..12].try_into().unwrap());

        let total_msg_size = MSG_HDR + u64::from(length);
        if total_msg_size > available {
            // The message is not fully written yet.
            break;
        }
        let payload = read_ring_bytes(
            buf,
            data_start,
            capacity,
            read_pos + MSG_HDR,
            length as usize,
        );
        let crc_ok = crc32fast::hash(&payload) == stored_crc;
        entries.push(ShmEntry {
            msg_type,
            payload,
            crc_ok,
        });
        read_pos += total_msg_size;
    }
    ShmDrainResult { entries, drops }
}
/// Drains the ring located at physical address `shm_base_pa` in guest memory
/// and marks the drained messages as consumed.
///
/// Header fields are read through `mem` at their fixed offsets (magic 0,
/// capacity 8, `write_ptr` 16, `read_ptr` 24, drops 32). Complete messages
/// between `read_ptr` and `write_ptr` are copied out with volatile reads; if
/// any were consumed, the new read position is written back at offset 24.
///
/// The header is written by the other side and therefore validated: a wrong
/// magic yields the default result, and a pending byte count
/// (`write_ptr - read_ptr`) that is negative or larger than `capacity`
/// yields no entries (with `drops` still reported) and leaves `read_ptr`
/// untouched. This also prevents a division by zero when `capacity` is 0.
///
/// NOTE(review): `capacity` is not checked against the extent of guest memory
/// because `GuestMem`'s size API is not visible here — confirm whether
/// `read_ring_volatile` needs an additional bound.
pub fn shm_drain_live(mem: &crate::monitor::reader::GuestMem, shm_base_pa: u64) -> ShmDrainResult {
    const MSG_HDR: u64 = MSG_HEADER_SIZE as u64;

    let magic = mem.read_u32(shm_base_pa, 0);
    if magic != SHM_RING_MAGIC {
        return ShmDrainResult::default();
    }
    let capacity = mem.read_u32(shm_base_pa, 8) as usize;
    let write_ptr = mem.read_u64(shm_base_pa, 16);
    let read_ptr = mem.read_u64(shm_base_pa, 24);
    let drops = mem.read_u64(shm_base_pa, 32);

    // A writer never lets more than `capacity` bytes be pending.
    match write_ptr.checked_sub(read_ptr) {
        Some(pending) if pending <= capacity as u64 => {}
        _ => {
            return ShmDrainResult {
                entries: Vec::new(),
                drops,
            }
        }
    }

    let data_start_pa = shm_base_pa + HEADER_SIZE as u64;
    let mut read_pos = read_ptr;
    let mut entries = Vec::new();
    loop {
        // `read_pos <= write_ptr` holds throughout, so none of the arithmetic
        // below can overflow.
        let available = write_ptr - read_pos;
        if available < MSG_HDR {
            break;
        }
        let mut hdr_buf = [0u8; MSG_HEADER_SIZE];
        read_ring_volatile(mem, data_start_pa, capacity, read_pos, &mut hdr_buf);
        let msg_type = u32::from_ne_bytes(hdr_buf[0..4].try_into().unwrap());
        let length = u32::from_ne_bytes(hdr_buf[4..8].try_into().unwrap());
        let stored_crc = u32::from_ne_bytes(hdr_buf[8..12].try_into().unwrap());

        let total_msg_size = MSG_HDR + u64::from(length);
        if total_msg_size > available {
            // The message is not fully written yet.
            break;
        }
        let mut payload = vec![0u8; length as usize];
        if !payload.is_empty() {
            read_ring_volatile(
                mem,
                data_start_pa,
                capacity,
                read_pos + MSG_HDR,
                &mut payload,
            );
        }
        let crc_ok = crc32fast::hash(&payload) == stored_crc;
        entries.push(ShmEntry {
            msg_type,
            payload,
            crc_ok,
        });
        read_pos += total_msg_size;
    }
    // Publish the new read position only if something was consumed.
    if read_pos != read_ptr {
        mem.write_u64(shm_base_pa, 24, read_pos);
    }
    ShmDrainResult { entries, drops }
}
/// Fills `out` from a ring data area in guest memory using byte-wise volatile
/// reads.
///
/// The data area starts at `data_start_pa` and is `capacity` bytes long.
/// Reading starts at `ptr % capacity` and wraps to the start of the data area
/// when its end is reached. Each byte is read at `mem.base_ptr()` plus its
/// address, with no bounds check against the size of guest memory.
///
/// # Panics
/// Panics if `capacity` is 0.
fn read_ring_volatile(
    mem: &crate::monitor::reader::GuestMem,
    data_start_pa: u64,
    capacity: usize,
    ptr: u64,
    out: &mut [u8],
) {
    let start = (ptr % capacity as u64) as usize;
    // Copies one contiguous run of the data area, beginning at `ring_pos`.
    let copy_run = |dst: &mut [u8], ring_pos: usize| {
        for (i, slot) in dst.iter_mut().enumerate() {
            let pa = data_start_pa + (ring_pos + i) as u64;
            // SAFETY: relies on the caller passing a data area that lies
            // inside the memory `mem.base_ptr()` refers to — not checked here.
            *slot = unsafe { std::ptr::read_volatile(mem.base_ptr().add(pa as usize)) };
        }
    };
    let head_len = out.len().min(capacity - start);
    let (head, tail) = out.split_at_mut(head_len);
    copy_run(head, start);
    // Everything after the wrap is read from the start of the data area.
    for lap in tail.chunks_mut(capacity) {
        copy_run(lap, 0);
    }
}
/// Appends one message (header plus `payload`) to the ring at
/// `buf[shm_offset..]`.
///
/// Returns the number of bytes written (`MSG_HEADER_SIZE + payload.len()`),
/// or 0 when the message was dropped. A message is dropped — and the header's
/// `drops` counter incremented — when it does not fit in the free space or
/// when the header's pointers are inconsistent (`read_ptr` ahead of
/// `write_ptr`, or `write_ptr` about to overflow). Those cases previously
/// caused arithmetic overflow.
///
/// The message bytes are stored first and `write_ptr` is updated last, so the
/// header never advertises bytes that have not been written.
///
/// NOTE(review): the header's magic is not checked here, matching the
/// existing behavior — confirm callers always initialise the ring first.
///
/// # Panics
/// Panics if the header or the data area described by it lies outside `buf`.
#[allow(dead_code)]
pub fn shm_write(buf: &mut [u8], shm_offset: usize, msg_type: u32, payload: &[u8]) -> usize {
    let header = read_header(buf, shm_offset);
    let capacity = header.capacity as usize;
    let total = MSG_HEADER_SIZE + payload.len();

    let used = header.write_ptr.checked_sub(header.read_ptr);
    let advanced = header.write_ptr.checked_add(total as u64);
    let admitted = match (used, advanced) {
        (Some(used), Some(new_write))
            if used.saturating_add(total as u64) <= capacity as u64 =>
        {
            Some(new_write)
        }
        _ => None,
    };
    let Some(new_write) = admitted else {
        // Record the drop in the header's `drops` field (offset 32).
        let drops_offset = shm_offset + 32;
        let current = u64::from_ne_bytes(buf[drops_offset..drops_offset + 8].try_into().unwrap());
        buf[drops_offset..drops_offset + 8]
            .copy_from_slice(&current.wrapping_add(1).to_ne_bytes());
        return 0;
    };

    let data_start = shm_offset + HEADER_SIZE;
    let msg = ShmMessage {
        msg_type,
        // Cannot truncate: an admitted message fits in `capacity`, a `u32`.
        length: payload.len() as u32,
        crc32: crc32fast::hash(payload),
        _pad: 0,
    };
    write_ring_bytes(buf, data_start, capacity, header.write_ptr, msg.as_bytes());
    if !payload.is_empty() {
        write_ring_bytes(
            buf,
            data_start,
            capacity,
            header.write_ptr + MSG_HEADER_SIZE as u64,
            payload,
        );
    }
    // Publish the message by advancing `write_ptr` (offset 16).
    let wp_offset = shm_offset + 16;
    buf[wp_offset..wp_offset + 8].copy_from_slice(&new_write.to_ne_bytes());
    total
}
/// Copies `data` into the ring's data area, which occupies
/// `buf[data_start..data_start + capacity]`.
///
/// Writing starts at `ptr % capacity` and wraps to the start of the data area
/// whenever its end is reached.
///
/// # Panics
/// Panics if `capacity` is 0 or the data area lies outside `buf`.
#[allow(dead_code)]
fn write_ring_bytes(buf: &mut [u8], data_start: usize, capacity: usize, ptr: u64, data: &[u8]) {
    let start = (ptr % capacity as u64) as usize;
    if data.is_empty() {
        return;
    }
    // First piece: from the start position up to the end of the data area.
    let head_len = data.len().min(capacity - start);
    let (head, tail) = data.split_at(head_len);
    let head_dst = data_start + start;
    buf[head_dst..head_dst + head_len].copy_from_slice(head);
    // Everything after the wrap lands at the start of the data area.
    for lap in tail.chunks(capacity) {
        buf[data_start..data_start + lap.len()].copy_from_slice(lap);
    }
}
// Unit tests for the ring buffer. They operate on plain in-memory byte
// buffers, so neither /dev/mem nor guest memory is needed. Because `shm_drain`
// does not advance `read_ptr`, tests that need a consumed ring overwrite
// header bytes 24..32 (`read_ptr`) by hand.
#[cfg(test)]
mod tests {
use super::*;
// Runtime mirror of the compile-time ring header size assertion.
#[test]
fn header_size_is_40() {
assert_eq!(std::mem::size_of::<ShmRingHeader>(), 40);
}
// Runtime mirror of the compile-time message header size assertion.
#[test]
fn message_size_is_16() {
assert_eq!(std::mem::size_of::<ShmMessage>(), 16);
}
// Builds a buffer of `shm_size` bytes holding a freshly initialised ring at offset 0.
fn make_ring(shm_size: usize) -> Vec<u8> {
let mut buf = vec![0u8; shm_size];
shm_init(&mut buf, 0, shm_size);
buf
}
// A fresh ring has the magic, version and capacity set and all counters at zero.
#[test]
fn init_sets_magic_and_capacity() {
let buf = make_ring(1024);
let hdr = read_header(&buf, 0);
assert_eq!(hdr.magic, SHM_RING_MAGIC);
assert_eq!(hdr.version, SHM_RING_VERSION);
assert_eq!(hdr.capacity, (1024 - HEADER_SIZE) as u32);
assert_eq!(hdr.write_ptr, 0);
assert_eq!(hdr.read_ptr, 0);
assert_eq!(hdr.drops, 0);
}
// Draining a fresh ring yields no entries and no drops.
#[test]
fn drain_empty_ring() {
let buf = make_ring(1024);
let result = shm_drain(&buf, 0);
assert!(result.entries.is_empty());
assert_eq!(result.drops, 0);
}
// A zeroed buffer and a buffer with a wrong magic both drain to nothing.
#[test]
fn drain_bad_magic() {
let mut buf = vec![0u8; 1024];
let result = shm_drain(&buf, 0);
assert!(result.entries.is_empty());
buf[0..4].copy_from_slice(&0xDEADBEEFu32.to_ne_bytes());
let result = shm_drain(&buf, 0);
assert!(result.entries.is_empty());
}
// One message round-trips with its type, payload and a valid CRC.
#[test]
fn write_and_drain_single_message() {
let mut buf = make_ring(1024);
let payload = b"hello world";
let written = shm_write(&mut buf, 0, 1, payload);
assert_eq!(written, MSG_HEADER_SIZE + payload.len());
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, 1);
assert_eq!(result.entries[0].payload, payload);
assert!(result.entries[0].crc_ok);
assert_eq!(result.drops, 0);
}
// Several messages are drained in the order they were written.
#[test]
fn write_and_drain_multiple_messages() {
let mut buf = make_ring(1024);
shm_write(&mut buf, 0, 1, b"first");
shm_write(&mut buf, 0, 2, b"second");
shm_write(&mut buf, 0, 3, b"third");
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 3);
assert_eq!(result.entries[0].msg_type, 1);
assert_eq!(result.entries[0].payload, b"first");
assert_eq!(result.entries[1].msg_type, 2);
assert_eq!(result.entries[1].payload, b"second");
assert_eq!(result.entries[2].msg_type, 3);
assert_eq!(result.entries[2].payload, b"third");
for e in &result.entries {
assert!(e.crc_ok);
}
}
// A header-only message (empty payload) is written and drained correctly.
#[test]
fn write_empty_payload() {
let mut buf = make_ring(1024);
let written = shm_write(&mut buf, 0, 42, b"");
assert_eq!(written, MSG_HEADER_SIZE);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, 42);
assert!(result.entries[0].payload.is_empty());
assert!(result.entries[0].crc_ok);
}
// Capacity 60 is filled exactly by a 16-byte header plus 44-byte payload;
// the next write is rejected and counted as one drop.
#[test]
fn ring_full_increments_drops() {
let shm_size = HEADER_SIZE + 60;
let mut buf = make_ring(shm_size);
let payload = vec![0xAA; 44]; let written = shm_write(&mut buf, 0, 1, &payload);
assert_eq!(written, 60);
let written = shm_write(&mut buf, 0, 2, b"x");
assert_eq!(written, 0);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.drops, 1);
}
// Every rejected write on a full ring increments the drop counter.
#[test]
fn ring_full_multiple_drops() {
let shm_size = HEADER_SIZE + 32;
let mut buf = make_ring(shm_size);
let payload = vec![0xBB; 16]; shm_write(&mut buf, 0, 1, &payload);
assert_eq!(shm_write(&mut buf, 0, 2, b"a"), 0);
assert_eq!(shm_write(&mut buf, 0, 3, b"b"), 0);
assert_eq!(shm_write(&mut buf, 0, 4, b"c"), 0);
let result = shm_drain(&buf, 0);
assert_eq!(result.drops, 3);
}
// Capacity 48: after the first 32-byte message is marked consumed, the second
// message's header fills bytes 32..48 and its payload wraps to the start.
#[test]
fn wraparound_single_message() {
let shm_size = HEADER_SIZE + 48;
let mut buf = make_ring(shm_size);
let payload1 = vec![0x11; 16];
shm_write(&mut buf, 0, 1, &payload1);
let hdr = read_header(&buf, 0);
buf[24..32].copy_from_slice(&hdr.write_ptr.to_ne_bytes());
let payload2 = vec![0x22; 16];
shm_write(&mut buf, 0, 2, &payload2);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, 2);
assert_eq!(result.entries[0].payload, payload2);
assert!(result.entries[0].crc_ok);
}
// Capacity 40: the second message starts at position 32, so its 16-byte
// header itself is split across the end of the data area.
#[test]
fn wraparound_message_header_splits() {
let shm_size = HEADER_SIZE + 40;
let mut buf = make_ring(shm_size);
shm_write(&mut buf, 0, 1, &[0xAA; 16]);
let hdr = read_header(&buf, 0);
buf[24..32].copy_from_slice(&hdr.write_ptr.to_ne_bytes());
let payload2 = vec![0xBB; 4];
shm_write(&mut buf, 0, 2, &payload2);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, 2);
assert_eq!(result.entries[0].payload, payload2);
assert!(result.entries[0].crc_ok);
}
// Flipping a payload byte after the write makes the drained entry report a bad CRC.
#[test]
fn crc_detects_corruption() {
let mut buf = make_ring(1024);
shm_write(&mut buf, 0, 1, b"integrity check");
let data_start = HEADER_SIZE;
let payload_start = data_start + MSG_HEADER_SIZE;
buf[payload_start] ^= 0xFF;
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert!(!result.entries[0].crc_ok);
}
// The CRC32 of an empty payload is 0.
#[test]
fn crc_empty_payload_is_zero_for_empty() {
assert_eq!(crc32fast::hash(b""), 0x0000_0000);
}
// Known CRC32 values for a few fixed inputs.
#[test]
fn crc32_known_vectors() {
assert_eq!(crc32fast::hash(b"123456789"), 0xCBF4_3926);
assert_eq!(crc32fast::hash(b""), 0x0000_0000);
assert_eq!(crc32fast::hash(b"a"), 0xE8B7_BE43);
}
// The ring works when it starts at a non-zero offset inside a larger buffer.
#[test]
fn nonzero_shm_offset() {
let offset = 4096;
let shm_size = 512;
let total = offset + shm_size;
let mut buf = vec![0xFFu8; total];
shm_init(&mut buf, offset, shm_size);
shm_write(&mut buf, offset, 7, b"offset test");
let result = shm_drain(&buf, offset);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, 7);
assert_eq!(result.entries[0].payload, b"offset test");
assert!(result.entries[0].crc_ok);
}
// A 60000-byte payload round-trips through a 64 KiB ring.
#[test]
fn large_payload() {
let mut buf = make_ring(65536);
let payload = vec![0x42; 60000];
let written = shm_write(&mut buf, 0, 99, &payload);
assert_eq!(written, MSG_HEADER_SIZE + 60000);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].payload.len(), 60000);
assert!(result.entries[0].payload.iter().all(|&b| b == 0x42));
assert!(result.entries[0].crc_ok);
}
// A header claiming 100 payload bytes is placed after a complete message, but
// `write_ptr` is advanced by only 20 bytes, so the drain stops before it.
#[test]
fn incomplete_message_not_drained() {
let mut buf = make_ring(1024);
shm_write(&mut buf, 0, 1, b"complete");
let hdr = read_header(&buf, 0);
let fake_write = hdr.write_ptr + 20;
let fake_msg = ShmMessage {
msg_type: 99,
length: 100,
crc32: 0,
_pad: 0,
};
let data_start = HEADER_SIZE;
let capacity = hdr.capacity as usize;
write_ring_bytes(
&mut buf,
data_start,
capacity,
hdr.write_ptr,
fake_msg.as_bytes(),
);
buf[16..24].copy_from_slice(&fake_write.to_ne_bytes());
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, 1);
assert_eq!(result.entries[0].payload, b"complete");
}
// Runtime mirror of the compile-time stimulus payload size assertion.
#[test]
fn stimulus_payload_size_is_24() {
assert_eq!(std::mem::size_of::<StimulusPayload>(), 24);
}
// The message type tags below spell four-character ASCII codes when
// serialised big-endian.
#[test]
fn msg_type_stimulus_ascii() {
let bytes = MSG_TYPE_STIMULUS.to_be_bytes();
assert_eq!(&bytes, b"STIM");
}
#[test]
fn msg_type_scenario_start_ascii() {
let bytes = MSG_TYPE_SCENARIO_START.to_be_bytes();
assert_eq!(&bytes, b"SCST");
}
#[test]
fn msg_type_scenario_end_ascii() {
let bytes = MSG_TYPE_SCENARIO_END.to_be_bytes();
assert_eq!(&bytes, b"SCEN");
}
#[test]
fn msg_type_sched_exit_ascii() {
let bytes = MSG_TYPE_SCHED_EXIT.to_be_bytes();
assert_eq!(&bytes, b"SCDX");
}
#[test]
fn msg_type_crash_ascii() {
let bytes = MSG_TYPE_CRASH.to_be_bytes();
assert_eq!(&bytes, b"CRSH");
}
// Every field survives StimulusPayload -> bytes -> StimulusEvent.
#[test]
fn stimulus_payload_roundtrip() {
let payload = StimulusPayload {
elapsed_ms: 1234,
step_index: 3,
op_count: 5,
op_kinds: 0b1010_0101,
cgroup_count: 4,
worker_count: 16,
total_iterations: 99999,
};
let bytes = payload.as_bytes();
let event = StimulusEvent::from_payload(bytes).unwrap();
assert_eq!(event.elapsed_ms, 1234);
assert_eq!(event.step_index, 3);
assert_eq!(event.op_count, 5);
assert_eq!(event.op_kinds, 0b1010_0101);
assert_eq!(event.cgroup_count, 4);
assert_eq!(event.worker_count, 16);
assert_eq!(event.total_iterations, 99999);
}
// 19 bytes is too short to decode; exactly 24 bytes is enough.
#[test]
fn stimulus_event_from_short_payload() {
assert!(StimulusEvent::from_payload(&[0u8; 19]).is_none());
assert!(StimulusEvent::from_payload(&[0u8; 24]).is_some());
}
// A stimulus payload written as MSG_TYPE_STIMULUS can be drained and decoded.
#[test]
fn stimulus_write_and_drain() {
let mut buf = make_ring(1024);
let payload = StimulusPayload {
elapsed_ms: 500,
step_index: 1,
op_count: 3,
op_kinds: 7,
cgroup_count: 2,
worker_count: 8,
total_iterations: 42000,
};
let written = shm_write(&mut buf, 0, MSG_TYPE_STIMULUS, payload.as_bytes());
assert_eq!(written, MSG_HEADER_SIZE + 24);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, MSG_TYPE_STIMULUS);
assert!(result.entries[0].crc_ok);
let event = StimulusEvent::from_payload(&result.entries[0].payload).unwrap();
assert_eq!(event.elapsed_ms, 500);
assert_eq!(event.step_index, 1);
assert_eq!(event.op_count, 3);
}
// The serialised header places each field at the byte offset the readers
// use (0, 4, 8, 16, 24, 32).
#[test]
fn header_fields_at_expected_offsets() {
let mut buf = make_ring(256);
let hdr = ShmRingHeader {
magic: SHM_RING_MAGIC,
version: SHM_RING_VERSION,
capacity: 216,
_pad: 0,
write_ptr: 0x1122_3344_5566_7788,
read_ptr: 0xAABB_CCDD_EEFF_0011,
drops: 42,
};
buf[..HEADER_SIZE].copy_from_slice(hdr.as_bytes());
assert_eq!(
u32::from_ne_bytes(buf[0..4].try_into().unwrap()),
SHM_RING_MAGIC
);
assert_eq!(
u32::from_ne_bytes(buf[4..8].try_into().unwrap()),
SHM_RING_VERSION
);
assert_eq!(u32::from_ne_bytes(buf[8..12].try_into().unwrap()), 216);
assert_eq!(
u64::from_ne_bytes(buf[16..24].try_into().unwrap()),
0x1122_3344_5566_7788
);
assert_eq!(
u64::from_ne_bytes(buf[24..32].try_into().unwrap()),
0xAABB_CCDD_EEFF_0011
);
assert_eq!(u64::from_ne_bytes(buf[32..40].try_into().unwrap()), 42);
}
// Pins the dump-request offset (inside the header's pad bytes) and its value.
#[test]
fn dump_req_offset_in_pad() {
assert_eq!(DUMP_REQ_OFFSET, 12);
assert_eq!(DUMP_REQ_SYSRQ_D, b'D');
}
// Pins the stall-request offset (inside the header's pad bytes) and its value.
#[test]
fn stall_req_offset_in_pad() {
assert_eq!(STALL_REQ_OFFSET, 13);
assert_eq!(STALL_REQ_ACTIVATE, b'S');
}
// Decoding from a slice of exactly 24 bytes recovers every field.
#[test]
fn stimulus_event_from_exact_size_payload() {
let payload = StimulusPayload {
elapsed_ms: 42,
step_index: 7,
op_count: 3,
op_kinds: 0xFF,
cgroup_count: 2,
worker_count: 10,
total_iterations: 4,
};
let bytes = payload.as_bytes();
assert_eq!(bytes.len(), 24);
let event = StimulusEvent::from_payload(bytes).unwrap();
assert_eq!(event.elapsed_ms, 42);
assert_eq!(event.step_index, 7);
assert_eq!(event.op_count, 3);
assert_eq!(event.op_kinds, 0xFF);
assert_eq!(event.cgroup_count, 2);
assert_eq!(event.worker_count, 10);
assert_eq!(event.total_iterations, 4);
}
// Extra trailing bytes beyond the 24-byte payload are ignored.
#[test]
fn stimulus_event_from_oversized_payload() {
let mut bytes = vec![0u8; 32];
bytes[0..4].copy_from_slice(&123u32.to_ne_bytes());
let event = StimulusEvent::from_payload(&bytes).unwrap();
assert_eq!(event.elapsed_ms, 123);
}
// Three write/drain rounds on a 128-byte ring. After each drain `read_ptr` is
// set to `write_ptr` by hand to simulate a consumer, so the third round wraps.
#[test]
fn concurrent_producer_consumer_simulated() {
let shm_size = HEADER_SIZE + 128;
let mut buf = make_ring(shm_size);
for round in 0..3 {
let base_type = round * 10;
shm_write(&mut buf, 0, base_type + 1, b"aa");
shm_write(&mut buf, 0, base_type + 2, b"bb");
shm_write(&mut buf, 0, base_type + 3, b"cc");
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 3);
for e in &result.entries {
assert!(e.crc_ok);
}
let hdr = read_header(&buf, 0);
buf[24..32].copy_from_slice(&hdr.write_ptr.to_ne_bytes());
}
}
// An empty slice cannot be decoded.
#[test]
fn stimulus_event_from_empty_payload() {
assert!(StimulusEvent::from_payload(&[]).is_none());
}
// Cloning an event copies every field.
#[test]
fn stimulus_event_clone_preserves_fields() {
let event = StimulusEvent {
elapsed_ms: 999,
step_index: 7,
op_count: 3,
op_kinds: 0xF0,
cgroup_count: 5,
worker_count: 20,
total_iterations: 16,
};
let c = event.clone();
assert_eq!(c.elapsed_ms, 999);
assert_eq!(c.step_index, 7);
assert_eq!(c.op_count, 3);
assert_eq!(c.op_kinds, 0xF0);
assert_eq!(c.cgroup_count, 5);
assert_eq!(c.worker_count, 20);
assert_eq!(c.total_iterations, 16);
}
// The default drain result has no entries and zero drops.
#[test]
fn shm_drain_result_default_empty() {
let r = ShmDrainResult::default();
assert!(r.entries.is_empty());
assert_eq!(r.drops, 0);
}
// A single message that fills the data area exactly is accepted and drained.
#[test]
fn write_exact_capacity_then_empty() {
let data_size = 64;
let shm_size = HEADER_SIZE + data_size;
let mut buf = make_ring(shm_size);
let payload_len = data_size - MSG_HEADER_SIZE;
let payload = vec![0x55u8; payload_len];
let written = shm_write(&mut buf, 0, 1, &payload);
assert_eq!(written, data_size);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert!(result.entries[0].crc_ok);
assert_eq!(result.entries[0].payload.len(), payload_len);
}
// Writing 8 bytes at position 12 of a 16-byte data area puts 4 bytes at the
// end and 4 at the start.
#[test]
fn write_ring_bytes_wraparound_exact() {
let data_start = HEADER_SIZE;
let capacity = 16;
let shm_size = HEADER_SIZE + capacity;
let mut buf = vec![0u8; shm_size];
let data = [1u8, 2, 3, 4, 5, 6, 7, 8];
write_ring_bytes(&mut buf, data_start, capacity, 12, &data);
assert_eq!(&buf[data_start + 12..data_start + 16], &[1, 2, 3, 4]);
assert_eq!(&buf[data_start..data_start + 4], &[5, 6, 7, 8]);
}
// Reading 4 bytes at position 14 of a 16-byte data area takes 2 bytes from
// the end and 2 from the start.
#[test]
fn read_ring_bytes_wraparound_exact() {
let data_start = HEADER_SIZE;
let capacity = 16;
let shm_size = HEADER_SIZE + capacity;
let mut buf = vec![0u8; shm_size];
buf[data_start + 14] = 0xAA;
buf[data_start + 15] = 0xBB;
buf[data_start] = 0xCC;
buf[data_start + 1] = 0xDD;
let out = read_ring_bytes(&buf, data_start, capacity, 14, 4);
assert_eq!(out, vec![0xAA, 0xBB, 0xCC, 0xDD]);
}
// Maximum field values survive the byte round-trip.
#[test]
fn stimulus_payload_as_bytes_roundtrip() {
let p = StimulusPayload {
elapsed_ms: u32::MAX,
step_index: u16::MAX,
op_count: u16::MAX,
op_kinds: u32::MAX,
cgroup_count: u16::MAX,
worker_count: u16::MAX,
total_iterations: u64::MAX,
};
let bytes = p.as_bytes();
let e = StimulusEvent::from_payload(bytes).unwrap();
assert_eq!(e.elapsed_ms, u32::MAX);
assert_eq!(e.step_index, u16::MAX);
assert_eq!(e.op_count, u16::MAX);
assert_eq!(e.op_kinds, u32::MAX);
assert_eq!(e.cgroup_count, u16::MAX);
assert_eq!(e.worker_count, u16::MAX);
assert_eq!(e.total_iterations, u64::MAX);
}
// Capacity 80 holds three 24-byte messages (72 bytes); the fourth is dropped.
#[test]
fn multiple_writes_fill_and_drop() {
let shm_size = HEADER_SIZE + 80;
let mut buf = make_ring(shm_size);
assert_eq!(shm_write(&mut buf, 0, 1, &[0xAA; 8]), 24);
assert_eq!(shm_write(&mut buf, 0, 2, &[0xBB; 8]), 24);
assert_eq!(shm_write(&mut buf, 0, 3, &[0xCC; 8]), 24);
assert_eq!(shm_write(&mut buf, 0, 4, &[0xDD; 8]), 0);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 3);
assert_eq!(result.drops, 1);
}
}