use std::sync::Arc;
use zerocopy::FromBytes;
use super::wire::{FRAME_HEADER_SIZE, ShmMessage};
/// Largest payload length, in bytes (256 KiB), accepted for a single frame.
///
/// `HostAssembler::feed` drops a frame whose header announces a larger
/// `length`, and clears its pending buffer once more than twice this many
/// bytes are left unconsumed.
pub const MAX_BULK_FRAME_PAYLOAD: u32 = 256 * 1024;
/// One frame decoded from the byte stream by `HostAssembler::feed`.
#[derive(Debug, Clone)]
// NOTE(review): the reason for allowing dead code is not visible in this
// file — presumably not every field is read by every consumer; confirm.
#[allow(dead_code)]
pub struct BulkMessage {
/// Message type, copied verbatim from the frame header.
pub msg_type: u32,
/// Copy of the frame's payload bytes (header excluded). Cloning the `Arc`
/// shares the allocation rather than copying the bytes.
pub payload: Arc<[u8]>,
/// `true` when `crc32fast::hash` over `payload` equals the `crc32` stored in
/// the frame header. Frames with a mismatch are still delivered, with this
/// flag set to `false`.
pub crc_ok: bool,
}
/// Result of a single `HostAssembler::feed` call.
#[derive(Debug, Default)]
pub struct BulkMessages {
/// Frames completed by that call, in the order they appeared in the byte
/// stream. Empty when no full frame was available.
pub messages: Vec<BulkMessage>,
}
/// Reassembles framed messages from a byte stream delivered in arbitrary
/// chunks. Each frame is a `FRAME_HEADER_SIZE`-byte `ShmMessage` header
/// followed by `length` payload bytes.
#[derive(Debug, Default)]
pub struct HostAssembler {
/// Bytes received so far that have not yet been consumed as complete frames
/// (a partial header, or a header plus a partial payload).
buf: Vec<u8>,
}
impl HostAssembler {
    /// Creates an assembler whose pending buffer is empty.
    pub fn new() -> Self {
        Self { buf: Vec::new() }
    }

    /// Appends `bytes` to the pending buffer and decodes every complete frame
    /// now available at its front.
    ///
    /// Returns the decoded frames in stream order. Behavior per frame:
    ///
    /// * Incomplete header or payload: decoding stops and the bytes stay
    ///   buffered for a later call.
    /// * Announced `length` above [`MAX_BULK_FRAME_PAYLOAD`]: checked only once
    ///   the whole announced payload is buffered. The frame is then dropped
    ///   and the entire pending buffer is cleared, including any bytes that
    ///   followed it. Frames decoded earlier in the same call are still
    ///   returned.
    /// * CRC mismatch: the frame is still returned, with `crc_ok == false`.
    ///
    /// After decoding, if more than `2 * MAX_BULK_FRAME_PAYLOAD` bytes remain
    /// pending, the buffer is cleared so it cannot grow without bound.
    pub fn feed(&mut self, bytes: &[u8]) -> BulkMessages {
        // Appending an empty slice leaves the buffer untouched.
        self.buf.extend_from_slice(bytes);

        let mut messages = Vec::new();
        // Offset of the first byte not yet consumed by a decoded frame.
        let mut cursor = 0usize;
        let mut oversized_seen = false;

        loop {
            // Need a full header before anything can be decoded.
            let body_start = cursor.saturating_add(FRAME_HEADER_SIZE);
            if body_start > self.buf.len() {
                break;
            }
            let Ok(header) = ShmMessage::read_from_bytes(&self.buf[cursor..body_start]) else {
                break;
            };

            // Wait until the whole announced payload has arrived. The size cap
            // is deliberately evaluated only after this point.
            let body_end = body_start.saturating_add(header.length as usize);
            if body_end > self.buf.len() {
                break;
            }

            if header.length > MAX_BULK_FRAME_PAYLOAD {
                tracing::warn!(
                    msg_type = header.msg_type,
                    length = header.length,
                    cap = MAX_BULK_FRAME_PAYLOAD,
                    "bulk assembler: dropping oversized frame; resyncing"
                );
                oversized_seen = true;
                break;
            }

            let body = &self.buf[body_start..body_end];
            let computed = crc32fast::hash(body);
            let crc_ok = header.crc32 == computed;
            if !crc_ok {
                tracing::warn!(
                    msg_type = header.msg_type,
                    length = header.length,
                    expected_crc = header.crc32,
                    computed_crc = computed,
                    "bulk assembler: per-frame CRC mismatch; surfacing crc_ok=false"
                );
            }

            messages.push(BulkMessage {
                msg_type: header.msg_type,
                payload: Arc::from(body),
                crc_ok,
            });
            cursor = body_end;
        }

        match (oversized_seen, cursor) {
            // Resync: discard everything still buffered.
            (true, _) => self.buf.clear(),
            // Nothing was consumed; keep the partial frame as is.
            (false, 0) => {}
            // Drop the bytes of the frames that were decoded.
            (false, done) => {
                self.buf.drain(..done);
            }
        }

        let residual_cap = 2 * MAX_BULK_FRAME_PAYLOAD as usize;
        let pending = self.buf.len();
        if pending > residual_cap {
            tracing::warn!(
                pending = pending,
                cap = residual_cap,
                "bulk assembler: pending buffer exceeded 2× per-frame cap; \
                 resyncing to prevent unbounded growth"
            );
            self.buf.clear();
        }

        BulkMessages { messages }
    }

    /// Number of bytes currently buffered and not yet consumed (test-only).
    #[cfg(test)]
    pub fn pending(&self) -> usize {
        self.buf.len()
    }

    /// Hands the unconsumed bytes to the caller, leaving the pending buffer
    /// empty.
    pub fn take_residual(&mut self) -> Vec<u8> {
        std::mem::take(&mut self.buf)
    }
}
#[cfg(test)]
mod tests {
    use super::super::wire::{MSG_TYPE_EXIT, MSG_TYPE_STIMULUS};
    use super::*;
    use zerocopy::IntoBytes;

    /// Serializes one well-formed frame: a header carrying the payload's
    /// length and CRC, followed by the payload itself.
    fn frame_bytes(msg_type: u32, payload: &[u8]) -> Vec<u8> {
        let header = ShmMessage {
            msg_type,
            length: payload.len() as u32,
            crc32: crc32fast::hash(payload),
            _pad: 0,
        };
        [header.as_bytes(), payload].concat()
    }

    /// Serializes a bare header announcing `length` payload bytes with a zero
    /// CRC; no payload bytes are attached.
    fn header_only(msg_type: u32, length: u32) -> Vec<u8> {
        let header = ShmMessage {
            msg_type,
            length,
            crc32: 0,
            _pad: 0,
        };
        header.as_bytes().to_vec()
    }

    /// A whole frame delivered in one call is decoded and fully consumed.
    #[test]
    fn single_frame_one_feed() {
        let mut asm = HostAssembler::new();
        let out = asm.feed(&frame_bytes(MSG_TYPE_EXIT, &42i32.to_le_bytes()));
        assert_eq!(out.messages.len(), 1);
        let msg = &out.messages[0];
        assert_eq!(msg.msg_type, MSG_TYPE_EXIT);
        assert!(msg.crc_ok);
        assert_eq!(asm.pending(), 0);
    }

    /// Several frames in one call come back in stream order.
    #[test]
    fn multiple_frames_one_feed_preserve_order() {
        let mut asm = HostAssembler::new();
        let stream = [
            frame_bytes(MSG_TYPE_STIMULUS, b"first"),
            frame_bytes(MSG_TYPE_EXIT, b"second"),
            frame_bytes(MSG_TYPE_STIMULUS, b"third"),
        ]
        .concat();
        let out = asm.feed(&stream);
        assert_eq!(out.messages.len(), 3);
        let expected: [&[u8]; 3] = [b"first", b"second", b"third"];
        for (msg, want) in out.messages.iter().zip(expected) {
            assert_eq!(&*msg.payload, want);
        }
    }

    /// A frame cut in the middle of its payload is held until completed.
    #[test]
    fn frame_split_across_two_feeds() {
        let mut asm = HostAssembler::new();
        let bytes = frame_bytes(MSG_TYPE_EXIT, b"payload-data");
        let (head, tail) = bytes.split_at(FRAME_HEADER_SIZE + 3);

        let first = asm.feed(head);
        assert!(first.messages.is_empty(), "partial frame must yield nothing");
        assert_eq!(asm.pending(), head.len());

        let second = asm.feed(tail);
        assert_eq!(second.messages.len(), 1, "completing bytes must yield 1 frame");
        assert_eq!(&*second.messages[0].payload, b"payload-data");
        assert_eq!(asm.pending(), 0);
    }

    /// Fewer bytes than a header are simply buffered.
    #[test]
    fn partial_header_buffered() {
        let mut asm = HostAssembler::new();
        let fragment = [0xAAu8, 0xBB, 0xCC];
        let out = asm.feed(&fragment);
        assert!(out.messages.is_empty());
        assert_eq!(asm.pending(), fragment.len());
    }

    /// Feeding nothing produces nothing and buffers nothing.
    #[test]
    fn empty_feed_noop() {
        let mut asm = HostAssembler::new();
        let out = asm.feed(&[]);
        assert!(out.messages.is_empty());
        assert_eq!(asm.pending(), 0);
    }

    /// A corrupted payload is surfaced with `crc_ok == false`, and decoding
    /// continues with the next frame.
    #[test]
    fn crc_mismatch_marks_entry_continues_walk() {
        let mut asm = HostAssembler::new();
        let mut corrupted = frame_bytes(MSG_TYPE_EXIT, b"payload");
        // Flip the first payload byte so the stored CRC no longer matches.
        corrupted[FRAME_HEADER_SIZE] ^= 0xFF;
        let intact = frame_bytes(MSG_TYPE_STIMULUS, b"valid");
        let stream = [corrupted, intact].concat();

        let out = asm.feed(&stream);
        assert_eq!(out.messages.len(), 2);
        let flags: Vec<bool> = out.messages.iter().map(|m| m.crc_ok).collect();
        assert_eq!(flags, [false, true]);
    }

    /// Delivering a frame one byte per call still yields exactly that frame.
    #[test]
    fn byte_by_byte_feed_reconstructs_frame() {
        let mut asm = HostAssembler::new();
        let bytes = frame_bytes(MSG_TYPE_EXIT, b"hello-world");
        let collected: Vec<BulkMessage> = bytes
            .chunks(1)
            .flat_map(|one| asm.feed(one).messages)
            .collect();
        assert_eq!(collected.len(), 1);
        assert_eq!(&*collected[0].payload, b"hello-world");
    }

    /// A header-only frame (empty payload) is valid.
    #[test]
    fn zero_length_payload() {
        let mut asm = HostAssembler::new();
        let bytes = frame_bytes(MSG_TYPE_EXIT, b"");
        assert_eq!(bytes.len(), FRAME_HEADER_SIZE);
        let out = asm.feed(&bytes);
        assert_eq!(out.messages.len(), 1);
        let msg = &out.messages[0];
        assert!(msg.payload.is_empty());
        assert!(msg.crc_ok);
    }

    /// A header announcing `u32::MAX` bytes must not make the buffer grow
    /// toward that size.
    #[test]
    fn assembler_does_not_pre_allocate_for_enormous_frame_length() {
        let mut asm = HostAssembler::new();
        let out = asm.feed(&header_only(MSG_TYPE_EXIT, u32::MAX));
        assert!(
            out.messages.is_empty(),
            "header-only feed must not yield any message"
        );
        assert_eq!(
            asm.pending(),
            FRAME_HEADER_SIZE,
            "header bytes must remain buffered; cap check is deferred"
        );

        let blast = vec![0xAAu8; 64 * 1024];
        let _ = asm.feed(&blast);
        assert!(
            asm.buf.capacity() < 1024 * 1024,
            "buffer capacity must not approach the announced 4 GiB length \
             (saw {} bytes)",
            asm.buf.capacity()
        );
    }

    /// An oversized frame that is fully buffered is dropped, along with
    /// whatever followed it.
    #[test]
    fn assembler_drops_oversized_frame_once_complete() {
        let mut asm = HostAssembler::new();
        let oversized_len = MAX_BULK_FRAME_PAYLOAD + 1;
        let mut stream = header_only(MSG_TYPE_EXIT, oversized_len);
        stream.resize(FRAME_HEADER_SIZE + oversized_len as usize, 0xCC);
        stream.extend_from_slice(b"residue");

        let out = asm.feed(&stream);
        assert!(
            out.messages.is_empty(),
            "oversized frame must not yield any message"
        );
        assert_eq!(
            asm.pending(),
            0,
            "resync must clear bogus header, payload, and residue"
        );
    }

    /// The cap is inclusive: `length == cap` passes, `cap + 1` is rejected.
    #[test]
    fn assembler_accepts_at_cap_rejects_above() {
        let cap = MAX_BULK_FRAME_PAYLOAD as usize;

        let mut accepting = HostAssembler::new();
        let at_cap = frame_bytes(MSG_TYPE_STIMULUS, &vec![0x55u8; cap]);
        let accepted = accepting.feed(&at_cap);
        assert_eq!(
            accepted.messages.len(),
            1,
            "frame with length == cap must be accepted"
        );
        assert_eq!(accepted.messages[0].payload.len(), cap);
        assert!(accepted.messages[0].crc_ok);

        let mut rejecting = HostAssembler::new();
        let mut over_bytes = header_only(MSG_TYPE_STIMULUS, MAX_BULK_FRAME_PAYLOAD + 1);
        let framed_len = over_bytes.len() + (MAX_BULK_FRAME_PAYLOAD + 1) as usize;
        over_bytes.resize(framed_len, 0xAA);
        let rejected = rejecting.feed(&over_bytes);
        assert!(
            rejected.messages.is_empty(),
            "frame with length == cap + 1 must be rejected"
        );
        assert_eq!(
            rejecting.pending(),
            0,
            "resync must clear the bogus frame after observing its full length"
        );
    }

    /// A valid frame ahead of an incomplete bogus header is still delivered;
    /// the bogus header and trailing bytes wait in the buffer.
    #[test]
    fn good_frame_then_oversized_incomplete_frame_returns_good() {
        let mut asm = HostAssembler::new();
        let residue = b"residue-bytes";
        let stream = [
            frame_bytes(MSG_TYPE_EXIT, b"valid"),
            header_only(MSG_TYPE_STIMULUS, u32::MAX),
            residue.to_vec(),
        ]
        .concat();

        let out = asm.feed(&stream);
        assert_eq!(out.messages.len(), 1, "valid frame must still be returned");
        assert_eq!(&*out.messages[0].payload, b"valid");
        assert!(out.messages[0].crc_ok);
        assert_eq!(
            asm.pending(),
            FRAME_HEADER_SIZE + residue.len(),
            "bogus header + residue stay buffered until the announced length \
             is observed"
        );
        assert!(
            asm.buf.capacity() < 1024 * 1024 * 1024,
            "buffer capacity must not approach the announced 4 GiB length \
             (saw {} bytes)",
            asm.buf.capacity()
        );
    }

    /// One byte past twice the per-frame cap triggers the residual clear.
    #[test]
    fn assembler_clears_buffer_when_residual_exceeds_cap() {
        let mut asm = HostAssembler::new();
        let mut stream = header_only(MSG_TYPE_STIMULUS, u32::MAX);
        stream.resize(2 * MAX_BULK_FRAME_PAYLOAD as usize + 1, 0xCD);

        let out = asm.feed(&stream);
        assert!(
            out.messages.is_empty(),
            "no message produced — payload incomplete from the parser's view"
        );
        assert_eq!(
            asm.pending(),
            0,
            "residual buffer cap must trigger a clear once the buffer \
             exceeds 2× the per-frame cap"
        );
    }

    /// Exactly twice the per-frame cap is still tolerated.
    #[test]
    fn assembler_accepts_residual_at_2x_cap() {
        let mut asm = HostAssembler::new();
        let total_residual = 2 * MAX_BULK_FRAME_PAYLOAD as usize;
        let mut stream = header_only(MSG_TYPE_STIMULUS, u32::MAX);
        stream.resize(total_residual, 0xCE);

        let out = asm.feed(&stream);
        assert!(
            out.messages.is_empty(),
            "no message produced — parser stalls on incomplete u32::MAX payload"
        );
        assert_eq!(
            asm.pending(),
            total_residual,
            "residual at exactly 2× cap must NOT trigger the resync clear"
        );
    }

    /// Cloning a payload shares the allocation instead of copying the bytes.
    #[test]
    fn payload_is_arc_slice() {
        let mut asm = HostAssembler::new();
        let out = asm.feed(&frame_bytes(MSG_TYPE_EXIT, b"arc-payload"));
        assert_eq!(out.messages.len(), 1);
        let original = &out.messages[0].payload;
        let cloned: Arc<[u8]> = Arc::clone(original);
        assert!(
            Arc::ptr_eq(original, &cloned),
            "cloning Arc<[u8]> must share the underlying allocation, \
             not deep-copy the bytes"
        );
        assert_eq!(&*cloned, b"arc-payload");
    }
}