use std::ptr;
use zerocopy::{FromBytes, IntoBytes};
/// A `/dev/mem` mapping of the shared-memory region, produced by [`mmap_devmem`].
///
/// `mmap(2)` requires a page-aligned offset, so the mapping may start before
/// the requested base; `ptr` already points at the requested base, while
/// `map_base`/`map_size` describe the full page-aligned region (use those to
/// unmap).
pub(crate) struct ShmMmap {
    /// First byte of the SHM region (`map_base` + in-page offset).
    pub ptr: *mut u8,
    /// Page-aligned address returned by `mmap(2)`.
    pub map_base: *mut libc::c_void,
    /// Total mapped length, including the leading alignment slack.
    pub map_size: usize,
}
/// Maps `shm_size` bytes of the device at physical/file offset `shm_base`
/// through an already-open `/dev/mem`-style `fd`.
///
/// `mmap(2)` requires a page-aligned file offset, so the mapping starts at
/// the page containing `shm_base`, and the returned [`ShmMmap::ptr`] is
/// advanced by the in-page offset. Returns `None` if the page size cannot be
/// determined or the mapping fails (check `errno` via
/// `std::io::Error::last_os_error()`).
pub(crate) fn mmap_devmem(
    fd: std::os::unix::io::RawFd,
    shm_base: u64,
    shm_size: u64,
) -> Option<ShmMmap> {
    let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
    if page_size <= 0 {
        // sysconf(3) returns -1 on error; the previous unconditional `as u64`
        // cast would turn that into a nonsense alignment mask and a bogus
        // mmap offset, so bail out instead.
        return None;
    }
    let page_size = page_size as u64;
    // Round the base down to a page boundary (page sizes are powers of two).
    let aligned_base = shm_base & !(page_size - 1);
    let offset_in_page = (shm_base - aligned_base) as usize;
    let map_size = shm_size as usize + offset_in_page;
    let map_base = unsafe {
        libc::mmap(
            std::ptr::null_mut(),
            map_size,
            libc::PROT_READ | libc::PROT_WRITE,
            libc::MAP_SHARED,
            fd,
            aligned_base as libc::off_t,
        )
    };
    if map_base == libc::MAP_FAILED {
        return None;
    }
    // SAFETY: offset_in_page < page_size <= map_size, so the adjusted pointer
    // stays inside the mapping returned above.
    let ptr = unsafe { (map_base as *mut u8).add(offset_in_page) };
    Some(ShmMmap {
        ptr,
        map_base,
        map_size,
    })
}
/// Ring header magic: ASCII "STMR".
pub const SHM_RING_MAGIC: u32 = 0x5354_4d52;
/// "STIM" — stimulus progress report; payload is a [`StimulusPayload`].
pub const MSG_TYPE_STIMULUS: u32 = 0x5354_494D;
/// "SCST" — scenario start.
pub const MSG_TYPE_SCENARIO_START: u32 = 0x5343_5354;
/// "SCEN" — scenario end.
pub const MSG_TYPE_SCENARIO_END: u32 = 0x5343_454E;
/// "EXIT".
pub const MSG_TYPE_EXIT: u32 = 0x4558_4954;
/// "TEST" — test result.
pub const MSG_TYPE_TEST_RESULT: u32 = 0x5445_5354;
/// "SCDX" — scheduler exit.
pub const MSG_TYPE_SCHED_EXIT: u32 = 0x5343_4458;
/// "CRSH" — crash report.
pub const MSG_TYPE_CRASH: u32 = 0x4352_5348;
/// "PMET" — payload metrics.
pub const MSG_TYPE_PAYLOAD_METRICS: u32 = 0x504d_4554;
/// "RAWO" — raw payload output.
pub const MSG_TYPE_RAW_PAYLOAD_OUTPUT: u32 = 0x5241_574f;
/// Ring layout version stamped into the header.
pub const SHM_RING_VERSION: u32 = 1;
/// Upper bound accepted for `ShmRingHeader::capacity` (1 GiB); drain paths
/// treat anything larger as corruption.
pub const MAX_SHM_CAPACITY: u32 = 1 << 30;
// The four `control_bytes` of the header (offsets 12..16) are used as
// individual one-byte flags/slots below.
/// Header byte offset of the dump-request flag.
pub const DUMP_REQ_OFFSET: usize = 12;
/// Value written to request a dump (presumably SysRq 'D' — confirm guest side).
pub const DUMP_REQ_SYSRQ_D: u8 = b'D';
/// Header byte offset of the stall-request flag.
pub const STALL_REQ_OFFSET: usize = 13;
/// Value written to activate a stall (semantics per guest side — confirm).
pub const STALL_REQ_ACTIVATE: u8 = b'S';
/// Header byte offset of the first of `SIGNAL_SLOT_COUNT` one-byte signal slots.
pub const SIGNAL_SLOT_BASE: usize = 14;
// Number of valid signal slots; slot indices are asserted against this.
const SIGNAL_SLOT_COUNT: usize = 2;
/// Slot value requesting shutdown.
pub const SIGNAL_SHUTDOWN_REQ: u8 = 0xDD;
/// Slot value announcing that probes are ready.
pub const SIGNAL_PROBES_READY: u8 = 2;
/// Blocks until signal slot `slot` becomes non-zero, polling every 50 ms.
///
/// The slot is sampled at least once, so a zero (or already-elapsed)
/// `timeout` still observes a signal that was raised earlier — the previous
/// `while now < deadline` loop skipped the check entirely in that case.
///
/// # Errors
/// Fails if the SHM region cannot be located or the deadline passes first.
///
/// # Panics
/// Panics if `slot >= SIGNAL_SLOT_COUNT`.
pub fn wait_for(slot: u8, timeout: std::time::Duration) -> anyhow::Result<()> {
    assert!(
        (slot as usize) < SIGNAL_SLOT_COUNT,
        "signal slot {slot} out of range"
    );
    let (ptr, _) = shm_ptr()?;
    let offset = SIGNAL_SLOT_BASE + slot as usize;
    // SAFETY: assumes the mapping covers at least SIGNAL_SLOT_BASE +
    // SIGNAL_SLOT_COUNT bytes; AtomicU8 is layout-compatible with u8.
    let atom = unsafe { &*(ptr.add(offset) as *const std::sync::atomic::AtomicU8) };
    let deadline = std::time::Instant::now() + timeout;
    loop {
        if atom.load(std::sync::atomic::Ordering::Acquire) != 0 {
            return Ok(());
        }
        if std::time::Instant::now() >= deadline {
            anyhow::bail!("signal slot {slot} timed out after {timeout:?}")
        }
        std::thread::sleep(std::time::Duration::from_millis(50));
    }
}
/// Raises signal slot `slot` in the local SHM mapping (writes the value 1).
pub fn signal(slot: u8) {
    signal_value(slot, 1);
}
/// Raises signal slot `slot` in a guest's SHM region via `GuestMem`
/// (writes the value 1).
pub fn signal_guest(mem: &crate::monitor::reader::GuestMem, shm_base: u64, slot: u8) {
    signal_guest_value(mem, shm_base, slot, 1);
}
/// Writes `value` into signal slot `slot` of the guest SHM region rooted at
/// guest address `shm_base`.
///
/// # Panics
/// Panics if `slot >= SIGNAL_SLOT_COUNT`.
pub fn signal_guest_value(
    mem: &crate::monitor::reader::GuestMem,
    shm_base: u64,
    slot: u8,
    value: u8,
) {
    assert!(
        (slot as usize) < SIGNAL_SLOT_COUNT,
        "signal slot {slot} out of range"
    );
    // Signal slots live in the header's control-byte area at SIGNAL_SLOT_BASE.
    mem.write_u8(shm_base, SIGNAL_SLOT_BASE + slot as usize, value);
}
/// Returns the current value of signal slot `slot`, or 0 when the SHM
/// region is unavailable.
///
/// # Panics
/// Panics if `slot >= SIGNAL_SLOT_COUNT`.
pub fn read_signal(slot: u8) -> u8 {
    assert!(
        (slot as usize) < SIGNAL_SLOT_COUNT,
        "signal slot {slot} out of range"
    );
    match shm_ptr() {
        Ok((base, _)) => {
            // SAFETY: assumes the mapping covers at least SIGNAL_SLOT_BASE +
            // SIGNAL_SLOT_COUNT bytes; AtomicU8 is layout-compatible with u8.
            let slot_ptr = unsafe { base.add(SIGNAL_SLOT_BASE + slot as usize) };
            let atom = unsafe { &*(slot_ptr as *const std::sync::atomic::AtomicU8) };
            atom.load(std::sync::atomic::Ordering::Acquire)
        }
        Err(_) => 0,
    }
}
/// Stores `value` into signal slot `slot` with release ordering; silently a
/// no-op when the SHM region is unavailable.
///
/// # Panics
/// Panics if `slot >= SIGNAL_SLOT_COUNT`.
pub fn signal_value(slot: u8, value: u8) {
    assert!(
        (slot as usize) < SIGNAL_SLOT_COUNT,
        "signal slot {slot} out of range"
    );
    if let Ok((base, _)) = shm_ptr() {
        // SAFETY: assumes the mapping covers at least SIGNAL_SLOT_BASE +
        // SIGNAL_SLOT_COUNT bytes; AtomicU8 is layout-compatible with u8.
        let slot_ptr = unsafe { base.add(SIGNAL_SLOT_BASE + slot as usize) };
        let atom = unsafe { &*(slot_ptr as *const std::sync::atomic::AtomicU8) };
        atom.store(value, std::sync::atomic::Ordering::Release);
    }
}
/// Registers the SHM mapping used by `signal*`/`read_signal`/`write_msg*`.
/// Only the first call wins; later calls are silently ignored (OnceLock).
pub fn init_shm_ptr(base: *mut u8, size: usize) {
    let _ = SHM_PTR.set(ShmPtr { ptr: base, size });
}
/// Appends one framed message to the ring, blocking on the process-wide
/// write lock. Silently drops the message when no SHM region is available.
pub fn write_msg(msg_type: u32, payload: &[u8]) {
    let Ok((ptr, size)) = shm_ptr() else { return };
    // A poisoned lock is still usable here: the ring write is self-contained.
    let _guard = SHM_WRITE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
    // SAFETY: ptr/size describe the mapping recorded by shm_ptr(), and the
    // guard above serializes writers as shm_write_raw requires.
    unsafe { shm_write_raw(ptr, size, msg_type, payload) };
}
/// Like [`write_msg`] but never blocks: returns `false` when the SHM region
/// is unavailable or the write lock cannot be acquired immediately.
pub fn write_msg_nonblocking(msg_type: u32, payload: &[u8]) -> bool {
    let (ptr, size) = match shm_ptr() {
        Ok(p) => p,
        Err(_) => return false,
    };
    match SHM_WRITE_LOCK.try_lock() {
        Ok(_guard) => {
            // SAFETY: ptr/size describe the mapping recorded by shm_ptr(),
            // and the guard serializes writers as shm_write_raw requires.
            unsafe { shm_write_raw(ptr, size, msg_type, payload) };
            true
        }
        // Covers both "would block" and a poisoned lock.
        Err(_) => false,
    }
}
/// Raw pointer + length of the process-local SHM mapping (see `SHM_PTR`).
struct ShmPtr {
    ptr: *mut u8,
    size: usize,
}
// SAFETY: ShmPtr is just a pointer/length pair into a shared mapping.
// NOTE(review): soundness relies on every access going through atomics
// (signal slots) or being serialized by SHM_WRITE_LOCK (ring writes) —
// confirm no other access path exists.
unsafe impl Send for ShmPtr {}
// SAFETY: see the Send impl above; concurrent readers use Acquire loads.
unsafe impl Sync for ShmPtr {}
/// Lazily-resolved SHM mapping: set explicitly via `init_shm_ptr`, or
/// discovered from /proc/cmdline + /dev/mem on first use in `shm_ptr`.
static SHM_PTR: std::sync::OnceLock<ShmPtr> = std::sync::OnceLock::new();
/// Serializes ring writers within this process (see `write_msg`).
pub static SHM_WRITE_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
/// Returns `(pointer, size)` of the SHM mapping, lazily discovering it on
/// first use: parses `KTSTR_SHM_BASE`/`KTSTR_SHM_SIZE` from /proc/cmdline
/// and maps the region through /dev/mem.
///
/// # Errors
/// Fails if the cmdline cannot be read/parsed, /dev/mem cannot be opened,
/// or the mapping fails.
fn shm_ptr() -> anyhow::Result<(*mut u8, usize)> {
    // Fast path: already initialized (by us or by init_shm_ptr).
    if let Some(p) = SHM_PTR.get() {
        return Ok((p.ptr, p.size));
    }
    let cmdline = std::fs::read_to_string("/proc/cmdline")
        .map_err(|e| anyhow::anyhow!("/proc/cmdline: {e}"))?;
    let (shm_base, shm_size) = parse_shm_params_from_str(&cmdline)
        .ok_or_else(|| anyhow::anyhow!("no SHM params in cmdline"))?;
    let fd = std::fs::OpenOptions::new()
        .read(true)
        .write(true)
        .open("/dev/mem")
        .map_err(|e| anyhow::anyhow!("/dev/mem open: {e}"))?;
    let m = mmap_devmem(
        std::os::unix::io::AsRawFd::as_raw_fd(&fd),
        shm_base,
        shm_size,
    )
    .ok_or_else(|| anyhow::anyhow!("/dev/mem mmap failed: {}", std::io::Error::last_os_error()))?;
    let size = shm_size as usize;
    // NOTE(review): if two threads race past the `get()` above, both mmap the
    // region; the loser's `set` fails and its mapping is never unmapped. Both
    // alias the same physical memory so this looks benign — confirm.
    let _ = SHM_PTR.set(ShmPtr { ptr: m.ptr, size });
    Ok((m.ptr, size))
}
/// Extracts `(base, size)` from `KTSTR_SHM_BASE=`/`KTSTR_SHM_SIZE=` tokens on
/// a kernel command line. Values are hexadecimal, with or without a leading
/// `0x`/`0X`. Returns `None` if either token is absent or unparseable.
pub(crate) fn parse_shm_params_from_str(cmdline: &str) -> Option<(u64, u64)> {
    // Finds the first whitespace-separated token starting with `key` and
    // parses the remainder as hex.
    fn hex_param(cmdline: &str, key: &str) -> Option<u64> {
        let raw = cmdline
            .split_whitespace()
            .find_map(|tok| tok.strip_prefix(key))?;
        let digits = raw.trim_start_matches("0x").trim_start_matches("0X");
        u64::from_str_radix(digits, 16).ok()
    }
    let base = hex_param(cmdline, "KTSTR_SHM_BASE=")?;
    let size = hex_param(cmdline, "KTSTR_SHM_SIZE=")?;
    Some((base, size))
}
/// On-wire ring header, exactly 40 bytes, native-endian, shared with the guest.
///
/// Byte offsets: magic@0, version@4, capacity@8, control_bytes@12,
/// write_ptr@16, read_ptr@24, drops@32 (the raw paths below hard-code these).
/// `write_ptr`/`read_ptr` are free-running byte counters (they may wrap u64);
/// ring positions are taken modulo `capacity`.
#[repr(C)]
#[derive(
    Clone, Copy, Default, FromBytes, IntoBytes, zerocopy::Immutable, zerocopy::KnownLayout,
)]
pub struct ShmRingHeader {
    /// Must equal SHM_RING_MAGIC ("STMR").
    pub magic: u32,
    /// Layout version (SHM_RING_VERSION).
    pub version: u32,
    /// Usable data bytes following the header.
    pub capacity: u32,
    /// Control area; bytes addressed individually (DUMP_REQ_OFFSET etc.).
    pub control_bytes: u32,
    pub write_ptr: u64,
    pub read_ptr: u64,
    /// Count of messages dropped because the ring was full.
    pub drops: u64,
}
// Compile-time guard: the header must stay exactly 40 bytes.
const _HEADER_SIZE: () = assert!(std::mem::size_of::<ShmRingHeader>() == 40);
impl ShmRingHeader {
    /// Builds a fresh header for a ring occupying `shm_size` total bytes
    /// (header included): zeroed pointers/drops, magic and version stamped.
    ///
    /// Sizes smaller than the header yield capacity 0. Data areas larger
    /// than `u32::MAX` are clamped — the previous bare `as u32` cast
    /// silently wrapped for `shm_size` > 4 GiB, producing a tiny (or zero)
    /// capacity for a huge region.
    pub fn new(shm_size: usize) -> Self {
        let capacity = shm_size.saturating_sub(HEADER_SIZE);
        let capacity = u32::try_from(capacity).unwrap_or(u32::MAX);
        Self {
            magic: SHM_RING_MAGIC,
            version: SHM_RING_VERSION,
            capacity,
            control_bytes: 0,
            write_ptr: 0,
            read_ptr: 0,
            drops: 0,
        }
    }
}
/// Per-message framing header, exactly 16 bytes, immediately followed in the
/// ring by `length` payload bytes.
#[repr(C)]
#[derive(
    Clone, Copy, Default, FromBytes, IntoBytes, zerocopy::Immutable, zerocopy::KnownLayout,
)]
pub struct ShmMessage {
    /// One of the MSG_TYPE_* constants.
    pub msg_type: u32,
    /// Payload length in bytes (framing header excluded).
    pub length: u32,
    /// crc32fast::hash of the payload only.
    pub crc32: u32,
    pub _pad: u32,
}
// Compile-time guard: the framing header must stay exactly 16 bytes.
const _MSG_SIZE: () = assert!(std::mem::size_of::<ShmMessage>() == 16);
/// Ring header size in bytes (40).
pub const HEADER_SIZE: usize = std::mem::size_of::<ShmRingHeader>();
/// Per-message framing header size in bytes (16).
pub const MSG_HEADER_SIZE: usize = std::mem::size_of::<ShmMessage>();
/// One decoded message drained from the ring.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct ShmEntry {
    pub msg_type: u32,
    pub payload: Vec<u8>,
    /// Whether the payload matched the CRC recorded by the producer.
    pub crc_ok: bool,
}
/// Result of draining the ring: decoded entries plus the producer's
/// cumulative drop counter (taken verbatim from the header).
#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub struct ShmDrainResult {
    pub entries: Vec<ShmEntry>,
    pub drops: u64,
}
/// Wire format of a MSG_TYPE_STIMULUS payload: exactly 24 bytes, native-endian,
/// field order fixed by `#[repr(C)]` (decoded by `StimulusEvent::from_payload`).
#[repr(C)]
#[derive(Clone, Copy, Default, Debug, IntoBytes, zerocopy::Immutable, zerocopy::KnownLayout)]
pub struct StimulusPayload {
    pub elapsed_ms: u32,
    pub step_index: u16,
    pub op_count: u16,
    // Presumably a bitmask of operation kinds — confirm with the producer.
    pub op_kinds: u32,
    pub cgroup_count: u16,
    pub worker_count: u16,
    pub total_iterations: u64,
}
// Compile-time guard: 24 bytes, no implicit padding.
const _STIMULUS_SIZE: () = assert!(std::mem::size_of::<StimulusPayload>() == 24);
/// Host-side decoded view of a [`StimulusPayload`]; fields mirror the wire
/// struct one-to-one.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct StimulusEvent {
    pub elapsed_ms: u32,
    pub step_index: u16,
    pub op_count: u16,
    pub op_kinds: u32,
    pub cgroup_count: u16,
    pub worker_count: u16,
    pub total_iterations: u64,
}
impl StimulusEvent {
    /// Decodes the first 24 bytes of `data` as a native-endian
    /// [`StimulusPayload`]. Returns `None` when `data` is too short;
    /// trailing bytes are ignored.
    pub fn from_payload(data: &[u8]) -> Option<Self> {
        if data.len() < std::mem::size_of::<StimulusPayload>() {
            return None;
        }
        // Length was checked above, so these fixed-offset slices can't fail.
        let u16_at = |off: usize| u16::from_ne_bytes(data[off..off + 2].try_into().unwrap());
        let u32_at = |off: usize| u32::from_ne_bytes(data[off..off + 4].try_into().unwrap());
        Some(StimulusEvent {
            elapsed_ms: u32_at(0),
            step_index: u16_at(4),
            op_count: u16_at(6),
            op_kinds: u32_at(8),
            cgroup_count: u16_at(12),
            worker_count: u16_at(14),
            total_iterations: u64::from_ne_bytes(data[16..24].try_into().unwrap()),
        })
    }
}
/// Test helper: writes a fresh ring header at `shm_offset` into `buf` and
/// zeroes the data area (`shm_size` bytes total, header included).
/// Panics if `buf` is too small for `shm_offset + shm_size`.
#[cfg(test)]
pub fn shm_init(buf: &mut [u8], shm_offset: usize, shm_size: usize) {
    let header = ShmRingHeader::new(shm_size);
    let hdr_bytes = header.as_bytes();
    buf[shm_offset..shm_offset + HEADER_SIZE].copy_from_slice(hdr_bytes);
    let data_start = shm_offset + HEADER_SIZE;
    let data_end = shm_offset + shm_size;
    buf[data_start..data_end].fill(0);
}
/// Reads the 40-byte ring header at `shm_offset` out of `buf`.
/// Panics if fewer than HEADER_SIZE bytes are available at that offset.
fn read_header(buf: &[u8], shm_offset: usize) -> ShmRingHeader {
    let s = &buf[shm_offset..shm_offset + HEADER_SIZE];
    ShmRingHeader::read_from_bytes(s).expect("HEADER_SIZE matches ShmRingHeader layout")
}
/// Convenience wrapper around [`read_ring_into`]: allocates and returns
/// `len` bytes read from ring position `ptr` (wrapping at `capacity`).
fn read_ring_bytes(
    buf: &[u8],
    data_start: usize,
    capacity: usize,
    ptr: u64,
    len: usize,
) -> Vec<u8> {
    let mut out = vec![0u8; len];
    read_ring_into(buf, data_start, capacity, ptr, &mut out);
    out
}
/// Fills `out` with bytes from the ring whose data area occupies
/// `buf[data_start .. data_start + capacity]`, starting at logical position
/// `ptr` (taken modulo `capacity`) and wrapping at the end of the area.
/// `capacity` must be non-zero; callers validate it beforehand.
fn read_ring_into(buf: &[u8], data_start: usize, capacity: usize, ptr: u64, out: &mut [u8]) {
    if out.is_empty() {
        return;
    }
    let start = (ptr % capacity as u64) as usize;
    // First segment: from `start` up to the end of the data area.
    let first = out.len().min(capacity - start);
    out[..first].copy_from_slice(&buf[data_start + start..data_start + start + first]);
    // Remaining segments wrap around to the start of the data area.
    let mut copied = first;
    while copied < out.len() {
        let chunk = (out.len() - copied).min(capacity);
        out[copied..copied + chunk].copy_from_slice(&buf[data_start..data_start + chunk]);
        copied += chunk;
    }
}
/// Decodes every complete message between `read_ptr` and `write_ptr` of the
/// ring at `shm_offset` in `buf`, WITHOUT consuming them: the header's
/// `read_ptr` is left untouched, so repeated drains return the same entries
/// (contrast with `shm_drain_live`, which advances it).
///
/// Returns an empty result when the header does not fit in `buf`, the magic
/// is wrong, `capacity` is zero or implausibly large, or the pointers
/// violate the ring invariant.
pub fn shm_drain(buf: &[u8], shm_offset: usize) -> ShmDrainResult {
    if shm_offset.saturating_add(HEADER_SIZE) > buf.len() {
        return ShmDrainResult::default();
    }
    let header = read_header(buf, shm_offset);
    if header.magic != SHM_RING_MAGIC {
        return ShmDrainResult::default();
    }
    if header.capacity == 0 || header.capacity > MAX_SHM_CAPACITY {
        return ShmDrainResult::default();
    }
    let capacity = header.capacity as usize;
    let data_start = shm_offset + HEADER_SIZE;
    let mut read_pos = header.read_ptr;
    let write_pos = header.write_ptr;
    let mut entries = Vec::new();
    // Ring invariant: the used span never exceeds capacity. A violation means
    // torn/corrupt header state; report only the drop counter.
    if write_pos.wrapping_sub(read_pos) > capacity as u64 {
        return ShmDrainResult {
            entries,
            drops: header.drops,
        };
    }
    // Largest payload that can share the ring with its framing header; the
    // min() keeps the subtraction from underflowing for tiny capacities.
    let max_payload = (capacity - MSG_HEADER_SIZE.min(capacity)) as u64;
    // Walk messages while at least one framing header's worth remains.
    while write_pos.wrapping_sub(read_pos) >= MSG_HEADER_SIZE as u64 {
        let mut hdr_buf = [0u8; MSG_HEADER_SIZE];
        read_ring_into(buf, data_start, capacity, read_pos, &mut hdr_buf);
        let msg = ShmMessage::read_from_bytes(&hdr_buf)
            .expect("MSG_HEADER_SIZE matches ShmMessage layout");
        // A length beyond max_payload indicates corruption; stop decoding.
        if msg.length as u64 > max_payload {
            break;
        }
        let total_msg_size = MSG_HEADER_SIZE as u64 + msg.length as u64;
        // Partially written message: leave it for a later drain.
        if write_pos.wrapping_sub(read_pos) < total_msg_size {
            break;
        }
        let payload = read_ring_bytes(
            buf,
            data_start,
            capacity,
            read_pos.wrapping_add(MSG_HEADER_SIZE as u64),
            msg.length as usize,
        );
        // CRC failures are flagged on the entry rather than dropped.
        let computed_crc = crc32fast::hash(&payload);
        entries.push(ShmEntry {
            msg_type: msg.msg_type,
            payload,
            crc_ok: computed_crc == msg.crc32,
        });
        read_pos = read_pos.wrapping_add(total_msg_size);
    }
    ShmDrainResult {
        entries,
        drops: header.drops,
    }
}
/// Drains the live guest ring at physical address `shm_base_pa`, CONSUMING
/// what it decodes: unlike `shm_drain`, the header's `read_ptr` is written
/// back past every fully decoded message.
///
/// Returns an empty result on a bad magic or implausible capacity.
pub fn shm_drain_live(mem: &crate::monitor::reader::GuestMem, shm_base_pa: u64) -> ShmDrainResult {
    // Header fields read by hard-coded offset: magic @0, capacity @8,
    // write_ptr @16, read_ptr @24, drops @32 (see ShmRingHeader layout).
    let magic = mem.read_u32(shm_base_pa, 0);
    if magic != SHM_RING_MAGIC {
        return ShmDrainResult::default();
    }
    let capacity_raw = mem.read_u32(shm_base_pa, 8);
    if capacity_raw == 0 || capacity_raw > MAX_SHM_CAPACITY {
        return ShmDrainResult::default();
    }
    let capacity = capacity_raw as usize;
    let write_ptr = mem.read_u64(shm_base_pa, 16);
    let read_ptr = mem.read_u64(shm_base_pa, 24);
    let drops = mem.read_u64(shm_base_pa, 32);
    let data_start_pa = shm_base_pa + HEADER_SIZE as u64;
    let mut read_pos = read_ptr;
    let mut entries = Vec::new();
    // Ring invariant: used span never exceeds capacity; bail on torn state.
    if write_ptr.wrapping_sub(read_pos) > capacity as u64 {
        return ShmDrainResult { entries, drops };
    }
    // Largest payload that can share the ring with its framing header.
    let max_payload = (capacity - MSG_HEADER_SIZE.min(capacity)) as u64;
    while write_ptr.wrapping_sub(read_pos) >= MSG_HEADER_SIZE as u64 {
        let mut hdr_buf = [0u8; MSG_HEADER_SIZE];
        read_ring_volatile(mem, data_start_pa, capacity, read_pos, &mut hdr_buf);
        let msg = ShmMessage::read_from_bytes(&hdr_buf)
            .expect("MSG_HEADER_SIZE matches ShmMessage layout");
        // A length beyond max_payload indicates corruption; stop decoding.
        if msg.length as u64 > max_payload {
            break;
        }
        let total_msg_size = MSG_HEADER_SIZE as u64 + msg.length as u64;
        // Partially written message: leave it for a later drain.
        if write_ptr.wrapping_sub(read_pos) < total_msg_size {
            break;
        }
        let mut payload = vec![0u8; msg.length as usize];
        if !payload.is_empty() {
            read_ring_volatile(
                mem,
                data_start_pa,
                capacity,
                read_pos.wrapping_add(MSG_HEADER_SIZE as u64),
                &mut payload,
            );
        }
        // CRC failures are flagged on the entry rather than dropped.
        let computed_crc = crc32fast::hash(&payload);
        entries.push(ShmEntry {
            msg_type: msg.msg_type,
            payload,
            crc_ok: computed_crc == msg.crc32,
        });
        read_pos = read_pos.wrapping_add(total_msg_size);
    }
    // Persist consumption so the producer can reuse the drained space.
    if read_pos != read_ptr {
        mem.write_u64(shm_base_pa, 24, read_pos);
    }
    ShmDrainResult { entries, drops }
}
/// Like `read_ring_into`, but reads guest memory one byte at a time through
/// `GuestMem` instead of slicing a local buffer. `capacity` must be non-zero
/// (`ptr % capacity` divides by it); callers validate it beforehand.
fn read_ring_volatile(
    mem: &crate::monitor::reader::GuestMem,
    data_start_pa: u64,
    capacity: usize,
    ptr: u64,
    out: &mut [u8],
) {
    let mut remaining = out.len();
    // Logical ring position -> byte offset within the data area.
    let mut src_pos = (ptr % capacity as u64) as usize;
    let mut dst_pos = 0;
    while remaining > 0 {
        // Copy up to the end of the data area, then wrap to its start.
        let chunk = remaining.min(capacity - src_pos);
        for i in 0..chunk {
            let pa = data_start_pa + (src_pos + i) as u64;
            out[dst_pos + i] = mem.read_u8(pa, 0);
        }
        dst_pos += chunk;
        src_pos = 0; remaining -= chunk;
    }
}
/// Appends one framed message (16-byte `ShmMessage` header + payload) to the
/// ring at `shm_offset` inside `buf`.
///
/// Returns the number of ring bytes consumed, or 0 when the message was
/// rejected. Rejections for lack of space or an unencodable length bump the
/// header's drop counter; a torn header (used span exceeding capacity) is
/// logged and rejected without counting a drop, matching `shm_write_raw`.
#[allow(dead_code)]
pub fn shm_write(buf: &mut [u8], shm_offset: usize, msg_type: u32, payload: &[u8]) -> usize {
    // Saturating-increment of the drop counter at header offset 32.
    // (Previously this snippet was copy-pasted at three rejection sites.)
    fn bump_drops(buf: &mut [u8], shm_offset: usize) {
        let drops_offset = shm_offset + 32;
        let current =
            u64::from_ne_bytes(buf[drops_offset..drops_offset + 8].try_into().unwrap());
        buf[drops_offset..drops_offset + 8]
            .copy_from_slice(&current.saturating_add(1).to_ne_bytes());
    }
    let header = read_header(buf, shm_offset);
    let capacity = header.capacity as usize;
    let Some(total) = MSG_HEADER_SIZE.checked_add(payload.len()) else {
        bump_drops(buf, shm_offset);
        return 0;
    };
    // Pointers are free-running counters; the used span must never exceed
    // capacity.
    let used = header.write_ptr.wrapping_sub(header.read_ptr) as usize;
    if used > capacity {
        tracing::warn!(
            write_ptr = header.write_ptr,
            read_ptr = header.read_ptr,
            capacity = capacity,
            used = used,
            "shm_ring: used > capacity; ring invariant violated (torn memory?)"
        );
        return 0;
    }
    let needed = used.checked_add(total);
    if needed.is_none_or(|n| n > capacity) {
        bump_drops(buf, shm_offset);
        return 0;
    }
    let data_start = shm_offset + HEADER_SIZE;
    let Ok(length_u32) = u32::try_from(payload.len()) else {
        bump_drops(buf, shm_offset);
        return 0;
    };
    let msg = ShmMessage {
        msg_type,
        length: length_u32,
        crc32: crc32fast::hash(payload),
        _pad: 0,
    };
    write_ring_bytes(buf, data_start, capacity, header.write_ptr, msg.as_bytes());
    if !payload.is_empty() {
        write_ring_bytes(
            buf,
            data_start,
            capacity,
            // wrapping_add keeps this correct after u64 pointer wraparound
            // (the bare `+` used before could panic in debug builds) and is
            // consistent with shm_write_raw.
            header.write_ptr.wrapping_add(MSG_HEADER_SIZE as u64),
            payload,
        );
    }
    // Publish the new write pointer (header offset 16).
    let new_write = header.write_ptr.wrapping_add(total as u64);
    let wp_offset = shm_offset + 16;
    buf[wp_offset..wp_offset + 8].copy_from_slice(&new_write.to_ne_bytes());
    total
}
/// Copies `data` into the ring whose data area occupies
/// `buf[data_start .. data_start + capacity]`, starting at logical position
/// `ptr` (taken modulo `capacity`) and wrapping at the end of the area.
/// `capacity` must be non-zero; callers validate it beforehand.
#[allow(dead_code)]
fn write_ring_bytes(buf: &mut [u8], data_start: usize, capacity: usize, ptr: u64, data: &[u8]) {
    if data.is_empty() {
        return;
    }
    let start = (ptr % capacity as u64) as usize;
    // First segment: from `start` up to the end of the data area.
    let first = data.len().min(capacity - start);
    buf[data_start + start..data_start + start + first].copy_from_slice(&data[..first]);
    // Remaining segments wrap around to the start of the data area.
    let mut written = first;
    while written < data.len() {
        let chunk = (data.len() - written).min(capacity);
        buf[data_start..data_start + chunk].copy_from_slice(&data[written..written + chunk]);
        written += chunk;
    }
}
/// Appends one framed message to the live SHM ring using volatile accesses;
/// the raw-pointer counterpart of `shm_write` for the `/dev/mem` mapping.
/// Rejections for lack of space or an unencodable length bump the drop
/// counter; a torn header is rejected silently.
///
/// # Safety
/// `base` must point to at least `size` valid, writable bytes laid out as a
/// `ShmRingHeader` followed by the data area, and writers must be serialized
/// externally (see `SHM_WRITE_LOCK`).
#[allow(dead_code)]
unsafe fn shm_write_raw(base: *mut u8, size: usize, msg_type: u32, payload: &[u8]) {
    if size < HEADER_SIZE {
        return;
    }
    // capacity lives at header offset 8; it must fit inside the mapping.
    let capacity = unsafe { ptr::read_volatile(base.add(8) as *const u32) } as usize;
    if capacity == 0 || capacity > size - HEADER_SIZE {
        return;
    }
    // Free-running byte counters: write_ptr @16, read_ptr @24.
    let write_ptr = unsafe { ptr::read_volatile(base.add(16) as *const u64) };
    let read_ptr = unsafe { ptr::read_volatile(base.add(24) as *const u64) };
    // Saturating-increment of the drop counter @32.
    let bump_drops = || {
        let drops = unsafe { ptr::read_volatile(base.add(32) as *const u64) };
        unsafe { ptr::write_volatile(base.add(32) as *mut u64, drops.saturating_add(1)) };
    };
    let Some(total) = MSG_HEADER_SIZE.checked_add(payload.len()) else {
        bump_drops();
        return;
    };
    // Ring invariant: used span never exceeds capacity; bail on torn state.
    let used = write_ptr.wrapping_sub(read_ptr) as usize;
    if used > capacity {
        return;
    }
    let needed = used.checked_add(total);
    if needed.is_none_or(|n| n > capacity) {
        bump_drops();
        return;
    }
    let Ok(length_u32) = u32::try_from(payload.len()) else {
        bump_drops();
        return;
    };
    let msg = ShmMessage {
        msg_type,
        length: length_u32,
        crc32: crc32fast::hash(payload),
        _pad: 0,
    };
    let msg_bytes = msg.as_bytes();
    let data_base = unsafe { base.add(HEADER_SIZE) };
    // SAFETY: capacity <= size - HEADER_SIZE was checked above, so every ring
    // position stays inside the caller-provided mapping.
    unsafe {
        write_ring_volatile(data_base, capacity, write_ptr, msg_bytes);
        if !payload.is_empty() {
            write_ring_volatile(
                data_base,
                capacity,
                write_ptr.wrapping_add(MSG_HEADER_SIZE as u64),
                payload,
            );
        }
    }
    // Publish the new write pointer only after the body is written (program
    // order). NOTE(review): volatile stores are not a hardware fence —
    // confirm the consumer tolerates store reordering on the target arch.
    let new_write = write_ptr.wrapping_add(total as u64);
    unsafe { ptr::write_volatile(base.add(16) as *mut u64, new_write) };
}
/// Copies `data` into the ring data area starting at logical position `ptr`
/// (taken modulo `capacity`), wrapping at the end, one volatile byte store
/// at a time.
///
/// # Safety
/// `data_base` must point to at least `capacity` valid, writable bytes, and
/// `capacity` must be non-zero (`ptr % capacity` divides by it).
#[allow(dead_code)]
unsafe fn write_ring_volatile(data_base: *mut u8, capacity: usize, ptr: u64, data: &[u8]) {
    let mut remaining = data.len();
    let mut src_pos = 0usize;
    // Logical ring position -> byte offset within the data area.
    let mut dst_pos = (ptr % capacity as u64) as usize;
    while remaining > 0 {
        // Write up to the end of the data area, then wrap to its start.
        let chunk = remaining.min(capacity - dst_pos);
        for i in 0..chunk {
            unsafe {
                ptr::write_volatile(data_base.add(dst_pos + i), data[src_pos + i]);
            }
        }
        src_pos += chunk;
        remaining -= chunk;
        dst_pos = 0; }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn header_size_is_40() {
assert_eq!(std::mem::size_of::<ShmRingHeader>(), 40);
}
#[test]
fn message_size_is_16() {
assert_eq!(std::mem::size_of::<ShmMessage>(), 16);
}
#[test]
fn shm_ring_header_new_zero_input_clamps_to_zero_capacity() {
assert_eq!(ShmRingHeader::new(0).capacity, 0);
}
#[test]
fn shm_ring_header_new_below_header_size_clamps_to_zero_capacity() {
assert_eq!(ShmRingHeader::new(HEADER_SIZE - 1).capacity, 0);
}
#[test]
fn shm_ring_header_new_exactly_header_size_yields_zero_capacity() {
assert_eq!(ShmRingHeader::new(HEADER_SIZE).capacity, 0);
}
#[test]
fn shm_ring_header_new_above_header_size_carries_delta_as_capacity() {
assert_eq!(ShmRingHeader::new(HEADER_SIZE + 100).capacity, 100);
}
fn make_ring(shm_size: usize) -> Vec<u8> {
let mut buf = vec![0u8; shm_size];
shm_init(&mut buf, 0, shm_size);
buf
}
#[test]
fn init_sets_magic_and_capacity() {
let buf = make_ring(1024);
let hdr = read_header(&buf, 0);
assert_eq!(hdr.magic, SHM_RING_MAGIC);
assert_eq!(hdr.version, SHM_RING_VERSION);
assert_eq!(hdr.capacity, (1024 - HEADER_SIZE) as u32);
assert_eq!(hdr.write_ptr, 0);
assert_eq!(hdr.read_ptr, 0);
assert_eq!(hdr.drops, 0);
}
#[test]
fn shm_write_rejects_torn_header_read_ptr_past_write_ptr() {
let mut buf = make_ring(1024);
let wp_offset = 16;
let rp_offset = 24;
buf[wp_offset..wp_offset + 8].copy_from_slice(&0u64.to_ne_bytes());
buf[rp_offset..rp_offset + 8].copy_from_slice(&100u64.to_ne_bytes());
let result = shm_write(&mut buf, 0, 1, b"probe");
assert_eq!(
result, 0,
"torn header (read_ptr > write_ptr) must return 0, got {result}"
);
}
#[test]
fn shm_write_wrapping_sub_handles_u64_overflow_of_write_ptr() {
let mut buf = make_ring(4096);
let wp_offset = 16;
let rp_offset = 24;
let new_write_ptr: u64 = 10;
let new_read_ptr: u64 = u64::MAX - 5;
buf[wp_offset..wp_offset + 8].copy_from_slice(&new_write_ptr.to_ne_bytes());
buf[rp_offset..rp_offset + 8].copy_from_slice(&new_read_ptr.to_ne_bytes());
assert_eq!(new_write_ptr.wrapping_sub(new_read_ptr), 16);
let result = shm_write(&mut buf, 0, 1, b"probe");
assert!(
result > 0,
"post-wraparound write should succeed, got {result}"
);
}
#[test]
fn drain_rejects_zero_capacity() {
let mut buf = make_ring(1024);
buf[8..12].copy_from_slice(&0u32.to_ne_bytes());
buf[16..24].copy_from_slice(&64u64.to_ne_bytes());
let result = shm_drain(&buf, 0);
assert!(
result.entries.is_empty(),
"capacity=0 must bail out, got {} entries",
result.entries.len(),
);
}
#[test]
fn drain_rejects_oversized_capacity() {
let mut buf = make_ring(1024);
buf[8..12].copy_from_slice(&u32::MAX.to_ne_bytes());
buf[16..24].copy_from_slice(&64u64.to_ne_bytes());
let result = shm_drain(&buf, 0);
assert!(
result.entries.is_empty(),
"capacity>MAX_SHM_CAPACITY must bail out, got {} entries",
result.entries.len(),
);
}
#[test]
fn drain_rejects_capacity_one_past_max() {
let mut buf = make_ring(1024);
let over = (MAX_SHM_CAPACITY as u64) + 1;
let over_u32 = over as u32;
buf[8..12].copy_from_slice(&over_u32.to_ne_bytes());
buf[16..24].copy_from_slice(&64u64.to_ne_bytes());
let result = shm_drain(&buf, 0);
assert!(result.entries.is_empty());
}
#[test]
fn drain_empty_ring() {
let buf = make_ring(1024);
let result = shm_drain(&buf, 0);
assert!(result.entries.is_empty());
assert_eq!(result.drops, 0);
}
#[test]
fn drain_bad_magic() {
let mut buf = vec![0u8; 1024];
let result = shm_drain(&buf, 0);
assert!(result.entries.is_empty());
buf[0..4].copy_from_slice(&0xDEADBEEFu32.to_ne_bytes());
let result = shm_drain(&buf, 0);
assert!(result.entries.is_empty());
}
#[test]
fn write_and_drain_single_message() {
let mut buf = make_ring(1024);
let payload = b"hello world";
let written = shm_write(&mut buf, 0, 1, payload);
assert_eq!(written, MSG_HEADER_SIZE + payload.len());
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, 1);
assert_eq!(result.entries[0].payload, payload);
assert!(result.entries[0].crc_ok);
assert_eq!(result.drops, 0);
}
#[test]
fn write_and_drain_multiple_messages() {
let mut buf = make_ring(1024);
shm_write(&mut buf, 0, 1, b"first");
shm_write(&mut buf, 0, 2, b"second");
shm_write(&mut buf, 0, 3, b"third");
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 3);
assert_eq!(result.entries[0].msg_type, 1);
assert_eq!(result.entries[0].payload, b"first");
assert_eq!(result.entries[1].msg_type, 2);
assert_eq!(result.entries[1].payload, b"second");
assert_eq!(result.entries[2].msg_type, 3);
assert_eq!(result.entries[2].payload, b"third");
for e in &result.entries {
assert!(e.crc_ok);
}
}
#[test]
fn write_empty_payload() {
let mut buf = make_ring(1024);
let written = shm_write(&mut buf, 0, 42, b"");
assert_eq!(written, MSG_HEADER_SIZE);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, 42);
assert!(result.entries[0].payload.is_empty());
assert!(result.entries[0].crc_ok);
}
#[test]
fn ring_full_increments_drops() {
let shm_size = HEADER_SIZE + 60;
let mut buf = make_ring(shm_size);
let payload = vec![0xAA; 44]; let written = shm_write(&mut buf, 0, 1, &payload);
assert_eq!(written, 60);
let written = shm_write(&mut buf, 0, 2, b"x");
assert_eq!(written, 0);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.drops, 1);
}
#[test]
fn ring_full_multiple_drops() {
let shm_size = HEADER_SIZE + 32;
let mut buf = make_ring(shm_size);
let payload = vec![0xBB; 16]; shm_write(&mut buf, 0, 1, &payload);
assert_eq!(shm_write(&mut buf, 0, 2, b"a"), 0);
assert_eq!(shm_write(&mut buf, 0, 3, b"b"), 0);
assert_eq!(shm_write(&mut buf, 0, 4, b"c"), 0);
let result = shm_drain(&buf, 0);
assert_eq!(result.drops, 3);
}
#[test]
fn wraparound_single_message() {
let shm_size = HEADER_SIZE + 48;
let mut buf = make_ring(shm_size);
let payload1 = vec![0x11; 16];
shm_write(&mut buf, 0, 1, &payload1);
let hdr = read_header(&buf, 0);
buf[24..32].copy_from_slice(&hdr.write_ptr.to_ne_bytes());
let payload2 = vec![0x22; 16];
shm_write(&mut buf, 0, 2, &payload2);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, 2);
assert_eq!(result.entries[0].payload, payload2);
assert!(result.entries[0].crc_ok);
}
#[test]
fn wraparound_message_header_splits() {
let shm_size = HEADER_SIZE + 40;
let mut buf = make_ring(shm_size);
shm_write(&mut buf, 0, 1, &[0xAA; 16]);
let hdr = read_header(&buf, 0);
buf[24..32].copy_from_slice(&hdr.write_ptr.to_ne_bytes());
let payload2 = vec![0xBB; 4];
shm_write(&mut buf, 0, 2, &payload2);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, 2);
assert_eq!(result.entries[0].payload, payload2);
assert!(result.entries[0].crc_ok);
}
#[test]
fn crc_detects_corruption() {
let mut buf = make_ring(1024);
shm_write(&mut buf, 0, 1, b"integrity check");
let data_start = HEADER_SIZE;
let payload_start = data_start + MSG_HEADER_SIZE;
buf[payload_start] ^= 0xFF;
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert!(!result.entries[0].crc_ok);
}
#[test]
fn crc_empty_payload_is_zero_for_empty() {
assert_eq!(crc32fast::hash(b""), 0x0000_0000);
}
#[test]
fn crc32_known_vectors() {
assert_eq!(crc32fast::hash(b"123456789"), 0xCBF4_3926);
assert_eq!(crc32fast::hash(b""), 0x0000_0000);
assert_eq!(crc32fast::hash(b"a"), 0xE8B7_BE43);
}
#[test]
fn nonzero_shm_offset() {
let offset = 4096;
let shm_size = 512;
let total = offset + shm_size;
let mut buf = vec![0xFFu8; total];
shm_init(&mut buf, offset, shm_size);
shm_write(&mut buf, offset, 7, b"offset test");
let result = shm_drain(&buf, offset);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, 7);
assert_eq!(result.entries[0].payload, b"offset test");
assert!(result.entries[0].crc_ok);
}
#[test]
fn large_payload() {
let mut buf = make_ring(65536);
let payload = vec![0x42; 60000];
let written = shm_write(&mut buf, 0, 99, &payload);
assert_eq!(written, MSG_HEADER_SIZE + 60000);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].payload.len(), 60000);
assert!(result.entries[0].payload.iter().all(|&b| b == 0x42));
assert!(result.entries[0].crc_ok);
}
#[test]
fn incomplete_message_not_drained() {
let mut buf = make_ring(1024);
shm_write(&mut buf, 0, 1, b"complete");
let hdr = read_header(&buf, 0);
let fake_write = hdr.write_ptr + 20;
let fake_msg = ShmMessage {
msg_type: 99,
length: 100,
crc32: 0,
_pad: 0,
};
let data_start = HEADER_SIZE;
let capacity = hdr.capacity as usize;
write_ring_bytes(
&mut buf,
data_start,
capacity,
hdr.write_ptr,
fake_msg.as_bytes(),
);
buf[16..24].copy_from_slice(&fake_write.to_ne_bytes());
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, 1);
assert_eq!(result.entries[0].payload, b"complete");
}
#[test]
fn stimulus_payload_size_is_24() {
assert_eq!(std::mem::size_of::<StimulusPayload>(), 24);
}
#[test]
fn msg_type_stimulus_ascii() {
let bytes = MSG_TYPE_STIMULUS.to_be_bytes();
assert_eq!(&bytes, b"STIM");
}
#[test]
fn msg_type_scenario_start_ascii() {
let bytes = MSG_TYPE_SCENARIO_START.to_be_bytes();
assert_eq!(&bytes, b"SCST");
}
#[test]
fn msg_type_scenario_end_ascii() {
let bytes = MSG_TYPE_SCENARIO_END.to_be_bytes();
assert_eq!(&bytes, b"SCEN");
}
#[test]
fn msg_type_sched_exit_ascii() {
let bytes = MSG_TYPE_SCHED_EXIT.to_be_bytes();
assert_eq!(&bytes, b"SCDX");
}
#[test]
fn msg_type_crash_ascii() {
let bytes = MSG_TYPE_CRASH.to_be_bytes();
assert_eq!(&bytes, b"CRSH");
}
#[test]
fn stimulus_payload_roundtrip() {
let payload = StimulusPayload {
elapsed_ms: 1234,
step_index: 3,
op_count: 5,
op_kinds: 0b1010_0101,
cgroup_count: 4,
worker_count: 16,
total_iterations: 99999,
};
let bytes = payload.as_bytes();
let event = StimulusEvent::from_payload(bytes).unwrap();
assert_eq!(event.elapsed_ms, 1234);
assert_eq!(event.step_index, 3);
assert_eq!(event.op_count, 5);
assert_eq!(event.op_kinds, 0b1010_0101);
assert_eq!(event.cgroup_count, 4);
assert_eq!(event.worker_count, 16);
assert_eq!(event.total_iterations, 99999);
}
#[test]
fn stimulus_event_from_short_payload() {
assert!(StimulusEvent::from_payload(&[0u8; 19]).is_none());
assert!(StimulusEvent::from_payload(&[0u8; 24]).is_some());
}
#[test]
fn stimulus_write_and_drain() {
let mut buf = make_ring(1024);
let payload = StimulusPayload {
elapsed_ms: 500,
step_index: 1,
op_count: 3,
op_kinds: 7,
cgroup_count: 2,
worker_count: 8,
total_iterations: 42000,
};
let written = shm_write(&mut buf, 0, MSG_TYPE_STIMULUS, payload.as_bytes());
assert_eq!(written, MSG_HEADER_SIZE + 24);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, MSG_TYPE_STIMULUS);
assert!(result.entries[0].crc_ok);
let event = StimulusEvent::from_payload(&result.entries[0].payload).unwrap();
assert_eq!(event.elapsed_ms, 500);
assert_eq!(event.step_index, 1);
assert_eq!(event.op_count, 3);
}
#[test]
fn header_fields_at_expected_offsets() {
let mut buf = make_ring(256);
let hdr = ShmRingHeader {
magic: SHM_RING_MAGIC,
version: SHM_RING_VERSION,
capacity: 216,
control_bytes: 0,
write_ptr: 0x1122_3344_5566_7788,
read_ptr: 0xAABB_CCDD_EEFF_0011,
drops: 42,
};
buf[..HEADER_SIZE].copy_from_slice(hdr.as_bytes());
assert_eq!(
u32::from_ne_bytes(buf[0..4].try_into().unwrap()),
SHM_RING_MAGIC
);
assert_eq!(
u32::from_ne_bytes(buf[4..8].try_into().unwrap()),
SHM_RING_VERSION
);
assert_eq!(u32::from_ne_bytes(buf[8..12].try_into().unwrap()), 216);
assert_eq!(
u64::from_ne_bytes(buf[16..24].try_into().unwrap()),
0x1122_3344_5566_7788
);
assert_eq!(
u64::from_ne_bytes(buf[24..32].try_into().unwrap()),
0xAABB_CCDD_EEFF_0011
);
assert_eq!(u64::from_ne_bytes(buf[32..40].try_into().unwrap()), 42);
}
#[test]
fn dump_req_offset_in_control_bytes() {
assert_eq!(DUMP_REQ_OFFSET, 12);
assert_eq!(DUMP_REQ_SYSRQ_D, b'D');
}
#[test]
fn stall_req_offset_in_control_bytes() {
assert_eq!(STALL_REQ_OFFSET, 13);
assert_eq!(STALL_REQ_ACTIVATE, b'S');
}
#[test]
fn stimulus_event_from_exact_size_payload() {
let payload = StimulusPayload {
elapsed_ms: 42,
step_index: 7,
op_count: 3,
op_kinds: 0xFF,
cgroup_count: 2,
worker_count: 10,
total_iterations: 4,
};
let bytes = payload.as_bytes();
assert_eq!(bytes.len(), 24);
let event = StimulusEvent::from_payload(bytes).unwrap();
assert_eq!(event.elapsed_ms, 42);
assert_eq!(event.step_index, 7);
assert_eq!(event.op_count, 3);
assert_eq!(event.op_kinds, 0xFF);
assert_eq!(event.cgroup_count, 2);
assert_eq!(event.worker_count, 10);
assert_eq!(event.total_iterations, 4);
}
#[test]
fn stimulus_event_from_oversized_payload() {
let mut bytes = vec![0u8; 32];
bytes[0..4].copy_from_slice(&123u32.to_ne_bytes());
let event = StimulusEvent::from_payload(&bytes).unwrap();
assert_eq!(event.elapsed_ms, 123);
}
#[test]
fn concurrent_producer_consumer_simulated() {
let shm_size = HEADER_SIZE + 128;
let mut buf = make_ring(shm_size);
for round in 0..3 {
let base_type = round * 10;
shm_write(&mut buf, 0, base_type + 1, b"aa");
shm_write(&mut buf, 0, base_type + 2, b"bb");
shm_write(&mut buf, 0, base_type + 3, b"cc");
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 3);
for e in &result.entries {
assert!(e.crc_ok);
}
let hdr = read_header(&buf, 0);
buf[24..32].copy_from_slice(&hdr.write_ptr.to_ne_bytes());
}
}
#[test]
fn stimulus_event_from_empty_payload() {
assert!(StimulusEvent::from_payload(&[]).is_none());
}
#[test]
fn stimulus_event_clone_preserves_fields() {
let event = StimulusEvent {
elapsed_ms: 999,
step_index: 7,
op_count: 3,
op_kinds: 0xF0,
cgroup_count: 5,
worker_count: 20,
total_iterations: 16,
};
let c = event.clone();
assert_eq!(c.elapsed_ms, 999);
assert_eq!(c.step_index, 7);
assert_eq!(c.op_count, 3);
assert_eq!(c.op_kinds, 0xF0);
assert_eq!(c.cgroup_count, 5);
assert_eq!(c.worker_count, 20);
assert_eq!(c.total_iterations, 16);
}
#[test]
fn shm_drain_result_default_empty() {
let r = ShmDrainResult::default();
assert!(r.entries.is_empty());
assert_eq!(r.drops, 0);
}
#[test]
fn write_exact_capacity_then_empty() {
let data_size = 64;
let shm_size = HEADER_SIZE + data_size;
let mut buf = make_ring(shm_size);
let payload_len = data_size - MSG_HEADER_SIZE;
let payload = vec![0x55u8; payload_len];
let written = shm_write(&mut buf, 0, 1, &payload);
assert_eq!(written, data_size);
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert!(result.entries[0].crc_ok);
assert_eq!(result.entries[0].payload.len(), payload_len);
}
#[test]
fn write_ring_bytes_wraparound_exact() {
    // Writing 8 bytes starting 4 bytes before the end of a 16-byte ring must
    // split the write: first 4 bytes at the tail, last 4 at the head.
    let capacity = 16;
    let data_start = HEADER_SIZE;
    let mut ring = vec![0u8; HEADER_SIZE + capacity];
    let src = [1u8, 2, 3, 4, 5, 6, 7, 8];
    write_ring_bytes(&mut ring, data_start, capacity, 12, &src);
    assert_eq!(&ring[data_start + 12..data_start + 16], &[1, 2, 3, 4]);
    assert_eq!(&ring[data_start..data_start + 4], &[5, 6, 7, 8]);
}
#[test]
fn read_ring_bytes_wraparound_exact() {
    // Reading 4 bytes starting 2 bytes before the end of a 16-byte ring must
    // stitch together the tail and the head of the data region.
    let capacity = 16;
    let data_start = HEADER_SIZE;
    let mut ring = vec![0u8; HEADER_SIZE + capacity];
    ring[data_start + 14] = 0xAA;
    ring[data_start + 15] = 0xBB;
    ring[data_start] = 0xCC;
    ring[data_start + 1] = 0xDD;
    assert_eq!(
        read_ring_bytes(&ring, data_start, capacity, 14, 4),
        vec![0xAA, 0xBB, 0xCC, 0xDD]
    );
}
#[test]
fn stimulus_payload_as_bytes_roundtrip() {
    // Saturate every field, serialize via as_bytes, and confirm the parsed
    // event reproduces each maximum value — a worst-case round trip.
    let payload = StimulusPayload {
        elapsed_ms: u32::MAX,
        step_index: u16::MAX,
        op_count: u16::MAX,
        op_kinds: u32::MAX,
        cgroup_count: u16::MAX,
        worker_count: u16::MAX,
        total_iterations: u64::MAX,
    };
    let event = StimulusEvent::from_payload(payload.as_bytes()).unwrap();
    assert_eq!(event.elapsed_ms, u32::MAX);
    assert_eq!(event.step_index, u16::MAX);
    assert_eq!(event.op_count, u16::MAX);
    assert_eq!(event.op_kinds, u32::MAX);
    assert_eq!(event.cgroup_count, u16::MAX);
    assert_eq!(event.worker_count, u16::MAX);
    assert_eq!(event.total_iterations, u64::MAX);
}
#[test]
fn multiple_writes_fill_and_drop() {
    // The 80-byte data region fits three 24-byte records (72 bytes); the
    // fourth write does not fit and must be counted as a drop.
    let mut ring = make_ring(HEADER_SIZE + 80);
    assert_eq!(shm_write(&mut ring, 0, 1, &[0xAA; 8]), 24);
    assert_eq!(shm_write(&mut ring, 0, 2, &[0xBB; 8]), 24);
    assert_eq!(shm_write(&mut ring, 0, 3, &[0xCC; 8]), 24);
    assert_eq!(shm_write(&mut ring, 0, 4, &[0xDD; 8]), 0);
    let drained = shm_drain(&ring, 0);
    assert_eq!(drained.entries.len(), 3);
    assert_eq!(drained.drops, 1);
}
#[test]
fn signal_guest_value_writes_to_correct_slot() {
    // Each signal slot index maps to its own byte at
    // shm_base + SIGNAL_SLOT_BASE + slot.
    let shm_base: u64 = 64;
    let total = shm_base + 32;
    let mut backing = vec![0u8; total as usize];
    let mem = unsafe { crate::monitor::reader::GuestMem::new(backing.as_mut_ptr(), total) };
    signal_guest_value(&mem, shm_base, 0, SIGNAL_SHUTDOWN_REQ);
    signal_guest_value(&mem, shm_base, 1, SIGNAL_PROBES_READY);
    let slot0 = shm_base as usize + SIGNAL_SLOT_BASE;
    assert_eq!(backing[slot0], SIGNAL_SHUTDOWN_REQ);
    assert_eq!(backing[slot0 + 1], SIGNAL_PROBES_READY);
}
#[test]
fn signal_guest_value_at_zero_shm_base() {
    // With shm_base == 0 the slot bytes land directly at SIGNAL_SLOT_BASE.
    let mut backing = vec![0u8; 32];
    let total = backing.len() as u64;
    let mem = unsafe { crate::monitor::reader::GuestMem::new(backing.as_mut_ptr(), total) };
    signal_guest_value(&mem, 0, 0, 0xAB);
    signal_guest_value(&mem, 0, 1, 0xCD);
    assert_eq!(backing[SIGNAL_SLOT_BASE], 0xAB);
    assert_eq!(backing[SIGNAL_SLOT_BASE + 1], 0xCD);
}
#[test]
fn signal_guest_value_at_exact_boundary_succeeds() {
    // Slot 1 occupies the very last mapped byte; the write must still land.
    let shm_base: u64 = 0;
    let total = (SIGNAL_SLOT_BASE + 2) as u64;
    let mut backing = vec![0u8; total as usize];
    let mem = unsafe { crate::monitor::reader::GuestMem::new(backing.as_mut_ptr(), total) };
    signal_guest_value(&mem, shm_base, 1, 0xEE);
    assert_eq!(backing[SIGNAL_SLOT_BASE + 1], 0xEE);
}
#[test]
fn signal_guest_value_past_boundary_is_noop() {
    // shm_base sits exactly at the end of guest memory, so the slot address
    // falls out of bounds; the oversized backing buffer must stay untouched.
    let total: u64 = 32;
    let mut backing = vec![0xAAu8; (total + 64) as usize];
    let mem = unsafe { crate::monitor::reader::GuestMem::new(backing.as_mut_ptr(), total) };
    signal_guest_value(&mem, 32, 0, 0xFF);
    assert!(backing.iter().all(|&b| b == 0xAA));
}
#[test]
fn signal_guest_value_offset_only_partially_in_bounds_is_noop() {
    // Only SIGNAL_SLOT_BASE + 1 bytes are mapped, and shm_base = 1 pushes
    // the slot-0 byte one past that limit — the write must be suppressed.
    let total = (SIGNAL_SLOT_BASE + 1) as u64;
    let mut backing = vec![0u8; (total + 64) as usize];
    let mem = unsafe { crate::monitor::reader::GuestMem::new(backing.as_mut_ptr(), total) };
    signal_guest_value(&mem, 1, 0, 0x77);
    assert!(backing.iter().all(|&b| b == 0));
}
#[test]
fn read_ring_volatile_routes_through_correct_region() {
    use crate::monitor::reader::{GuestMem, MemRegion};
    // Two disjoint guest regions: region 0 is filled with 0xAA, region 1
    // (mapped at 1 MiB) holds the expected pattern. A read at the 1 MiB
    // physical address must be served by region 1 alone.
    let mut region0 = vec![0xAAu8; 4096];
    let mut region1 = vec![0xBBu8; 4096];
    let pattern: [u8; 32] = [
        0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
        0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
        0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18,
        0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x20,
    ];
    region1[..32].copy_from_slice(&pattern);
    let regions = vec![
        MemRegion {
            host_ptr: region0.as_mut_ptr(),
            offset: 0,
            size: 4096,
        },
        MemRegion {
            host_ptr: region1.as_mut_ptr(),
            offset: 1 << 20,
            size: 4096,
        },
    ];
    let mem = unsafe { GuestMem::from_regions_for_test(regions) };
    let mut out = vec![0u8; 32];
    read_ring_volatile(&mem, 1 << 20, 4096, 0, &mut out);
    assert_eq!(out, pattern);
    // Nothing from region 0 may leak into the result.
    assert!(!out.contains(&0xAA));
}
#[test]
fn read_ring_volatile_wraparound_routes_through_correct_region() {
    use crate::monitor::reader::{GuestMem, MemRegion};
    // A 64-byte ring lives at the start of region 1 (mapped at 1 MiB) and
    // holds the bytes 0..64. A wrapped read beginning at position 60 must
    // yield the tail (60..64) followed by the head (0..60), all routed
    // through region 1.
    let mut region0 = vec![0xAAu8; 4096];
    let mut region1 = vec![0u8; 4096];
    for (i, slot) in region1[..64].iter_mut().enumerate() {
        *slot = i as u8;
    }
    let regions = vec![
        MemRegion {
            host_ptr: region0.as_mut_ptr(),
            offset: 0,
            size: 4096,
        },
        MemRegion {
            host_ptr: region1.as_mut_ptr(),
            offset: 1 << 20,
            size: 4096,
        },
    ];
    let mem = unsafe { GuestMem::from_regions_for_test(regions) };
    let mut out = vec![0u8; 64];
    read_ring_volatile(&mem, 1 << 20, 64, 60, &mut out);
    assert_eq!(out[0..4], [60u8, 61, 62, 63]);
    for (i, &b) in out[4..].iter().enumerate() {
        assert_eq!(b, i as u8);
    }
}
#[test]
fn shm_write_raw_round_trips_through_drain() {
    // A message written through the raw-pointer writer must be readable by
    // the slice-based drain with type, payload, and CRC all intact.
    let shm_size = 1024usize;
    let mut ring = vec![0u8; shm_size];
    shm_init(&mut ring, 0, shm_size);
    let payload = b"raw-ptr round trip";
    unsafe {
        shm_write_raw(ring.as_mut_ptr(), shm_size, MSG_TYPE_STIMULUS, payload);
    }
    let drained = shm_drain(&ring, 0);
    assert_eq!(drained.entries.len(), 1);
    let entry = &drained.entries[0];
    assert_eq!(entry.msg_type, MSG_TYPE_STIMULUS);
    assert_eq!(entry.payload, payload);
    assert!(entry.crc_ok);
    assert_eq!(drained.drops, 0);
}
#[test]
// Forges both ring pointers to sit 8 bytes before the end of the data
// region, so the next record (16-byte message header + 4-byte payload,
// per the sizes seen in the other tests) must wrap around; drain has to
// reassemble it with a valid CRC.
fn shm_write_raw_handles_wraparound() {
let shm_size = HEADER_SIZE + 48;
let mut buf = vec![0u8; shm_size];
shm_init(&mut buf, 0, shm_size);
let capacity_u64 = (shm_size - HEADER_SIZE) as u64;
let pre = capacity_u64 - 8;
// Header bytes 16..24 hold the write pointer, 24..32 the read pointer
// (same layout the other pointer-forging tests in this module rely on).
buf[16..24].copy_from_slice(&pre.to_ne_bytes());
buf[24..32].copy_from_slice(&pre.to_ne_bytes());
let payload = [0xAAu8, 0xBB, 0xCC, 0xDD];
unsafe {
shm_write_raw(buf.as_mut_ptr(), shm_size, 0xDEAD_BEEF, &payload);
}
let result = shm_drain(&buf, 0);
assert_eq!(result.entries.len(), 1);
assert_eq!(result.entries[0].msg_type, 0xDEAD_BEEF);
assert_eq!(result.entries[0].payload, payload);
assert!(result.entries[0].crc_ok);
}
#[test]
fn shm_write_raw_rejects_undersized_mapping() {
    // A mapping smaller than the ring header cannot host a ring at all;
    // the writer must leave every byte untouched.
    let mut backing = vec![0xAAu8; HEADER_SIZE - 1];
    unsafe {
        shm_write_raw(backing.as_mut_ptr(), backing.len(), 1, b"x");
    }
    assert!(backing.iter().all(|&b| b == 0xAA));
}
#[test]
// Zeroes header bytes 8..12 — per the test's name, the capacity field —
// to simulate a torn/partially-initialized header; shm_write_raw must
// refuse to write, leaving the write pointer (bytes 16..24) at zero.
fn shm_write_raw_rejects_torn_capacity() {
let shm_size = 1024usize;
let mut buf = vec![0u8; shm_size];
shm_init(&mut buf, 0, shm_size);
buf[8..12].copy_from_slice(&0u32.to_ne_bytes());
unsafe {
shm_write_raw(buf.as_mut_ptr(), shm_size, 1, b"probe");
}
assert_eq!(u64::from_ne_bytes(buf[16..24].try_into().unwrap()), 0);
}
#[test]
// Fills the 32-byte data region with one record (16-byte payload +
// 16-byte message header), then attempts a second write that cannot fit;
// the drop counter at header bytes 32..40 must read exactly 1.
fn shm_write_raw_increments_drops_when_full() {
let shm_size = HEADER_SIZE + 32;
let mut buf = vec![0u8; shm_size];
shm_init(&mut buf, 0, shm_size);
let big = vec![0xCCu8; 16]; unsafe {
shm_write_raw(buf.as_mut_ptr(), shm_size, 1, &big);
shm_write_raw(buf.as_mut_ptr(), shm_size, 2, b"x");
}
let drops = u64::from_ne_bytes(buf[32..40].try_into().unwrap());
assert_eq!(drops, 1);
}
#[test]
fn shm_drain_returns_empty_when_buf_smaller_than_header() {
    // A buffer too small to even contain the ring header yields nothing.
    let undersized = vec![0u8; HEADER_SIZE - 1];
    let drained = shm_drain(&undersized, 0);
    assert!(drained.entries.is_empty());
    assert_eq!(drained.drops, 0);
}
#[test]
fn shm_drain_returns_empty_when_offset_pushes_past_end() {
    // An offset leaving fewer than HEADER_SIZE bytes after it is rejected.
    let backing = vec![0u8; 1024];
    let drained = shm_drain(&backing, 1024 - HEADER_SIZE + 1);
    assert!(drained.entries.is_empty());
}
#[test]
// Hand-crafts a message header whose length field (the u32 at offset +4,
// per the test's name) is u32::MAX; drain must treat the bogus length as
// corruption and produce no entries rather than read past the ring.
fn shm_drain_caps_torn_msg_length_against_capacity() {
let shm_size = 1024usize;
let mut buf = vec![0u8; shm_size];
shm_init(&mut buf, 0, shm_size);
let data_start = HEADER_SIZE;
// Four u32 fields of the 16-byte message header: type at +0, length at
// +4, remaining two fields (presumably CRC/sequence — TODO confirm) zeroed.
buf[data_start..data_start + 4].copy_from_slice(&1u32.to_ne_bytes());
buf[data_start + 4..data_start + 8].copy_from_slice(&u32::MAX.to_ne_bytes());
buf[data_start + 8..data_start + 12].copy_from_slice(&0u32.to_ne_bytes());
buf[data_start + 12..data_start + 16].copy_from_slice(&0u32.to_ne_bytes());
// Advance the write pointer (header bytes 16..24) so drain sees pending data.
buf[16..24].copy_from_slice(&64u64.to_ne_bytes());
let result = shm_drain(&buf, 0);
assert!(result.entries.is_empty());
}
#[test]
// Forges pointers near u64::MAX to exercise wrapping pointer arithmetic:
// the used distance write - read only makes sense via wrapping_sub, and
// drain must neither panic nor over-read when read_ptr numerically
// exceeds write_ptr.
fn shm_drain_handles_read_pos_near_u64_max() {
let shm_size = HEADER_SIZE + 64;
let mut buf = vec![0u8; shm_size];
shm_init(&mut buf, 0, shm_size);
// One real record so the ring bytes are plausible; pointers forged below.
let _ = shm_write(&mut buf, 0, 1, b"abcd"); let new_write: u64 = 5;
let new_read: u64 = u64::MAX - 14;
// write pointer at header bytes 16..24, read pointer at 24..32.
buf[16..24].copy_from_slice(&new_write.to_ne_bytes());
buf[24..32].copy_from_slice(&new_read.to_ne_bytes());
// Wrapped distance is 20 bytes: one 16-byte msg header + 4-byte payload.
assert_eq!(new_write.wrapping_sub(new_read), 20);
let result = shm_drain(&buf, 0);
assert!(result.entries.len() <= 1);
}
#[test]
// A read pointer ahead of the write pointer makes the wrapped "used"
// distance wildly exceed the data capacity; drain must treat the header
// as corrupt and return no entries.
fn shm_drain_rejects_used_greater_than_capacity() {
let shm_size = 1024usize;
let mut buf = vec![0u8; shm_size];
shm_init(&mut buf, 0, shm_size);
let bogus_write: u64 = 0;
let bogus_read: u64 = 100;
// write pointer at header bytes 16..24, read pointer at 24..32.
buf[16..24].copy_from_slice(&bogus_write.to_ne_bytes());
buf[24..32].copy_from_slice(&bogus_read.to_ne_bytes());
// Sanity-check: the forged pointers really do imply used > capacity.
assert!(bogus_write.wrapping_sub(bogus_read) > (shm_size - HEADER_SIZE) as u64);
let result = shm_drain(&buf, 0);
assert!(result.entries.is_empty());
}
#[test]
fn shm_drain_rejects_used_one_past_capacity() {
    // used == capacity + 1 is the smallest invalid fill level; drain must
    // reject it exactly as it would a larger overflow.
    let shm_size = HEADER_SIZE + 64;
    let mut ring = vec![0u8; shm_size];
    shm_init(&mut ring, 0, shm_size);
    let forged_write: u64 = 65;
    let forged_read: u64 = 0;
    ring[16..24].copy_from_slice(&forged_write.to_ne_bytes());
    ring[24..32].copy_from_slice(&forged_read.to_ne_bytes());
    let drained = shm_drain(&ring, 0);
    assert!(drained.entries.is_empty());
}
#[test]
fn shm_drain_accepts_used_exactly_capacity() {
    // A record filling the data region to exactly 100% is still valid and
    // must drain back out intact.
    let shm_size = HEADER_SIZE + 32;
    let mut ring = vec![0u8; shm_size];
    shm_init(&mut ring, 0, shm_size);
    let payload = vec![0xCCu8; 16];
    assert_eq!(shm_write(&mut ring, 0, 1, &payload), 32);
    let drained = shm_drain(&ring, 0);
    assert_eq!(drained.entries.len(), 1);
    assert_eq!(drained.entries[0].payload, payload);
}
}