use crate::{
attach_shared_consumer, build_shared_single_producer, lock_free::SharedCursor,
MultiProcessError, MultiProcessResult, RequiredConsumerError, RequiredConsumerLivenessConfig,
SharedConsumer, SharedProducer,
};
use disruptor_mp::{AutoWaitStrategy, MmapConsumer, MmapProducer, MmapTransportLayout};
use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
/// Prefix used for generated ring names when the caller does not supply one
/// (see `MyelonTransportLayout::for_session`).
pub const DEFAULT_TRANSPORT_PREFIX: &str = "vmy";
/// Maximum length, in bytes, of a ring/segment name accepted by
/// `validate_segment_name` (described there as a conservative macOS limit).
pub const SEGMENT_NAME_LIMIT_MACOS: usize = 14;
/// RPC ring depth used by `MyelonTransportConfig::resolve` when none is given.
pub const DEFAULT_MYELON_RPC_DEPTH: usize = 1024;
/// Response ring depth used by `MyelonTransportConfig::resolve` when none is given.
pub const DEFAULT_MYELON_RESPONSE_DEPTH: usize = 256;
/// Upper bound on the length of names derived from a segment name (segment
/// name plus suffix), enforced by `validate_segment_name`.
const DARWIN_SHARED_MEMORY_OBJECT_LIMIT: usize = 31;
/// Suffix whose length is budgeted for in `validate_segment_name`.
/// NOTE(review): presumably the name suffix of the producer-sequence object
/// created by the underlying ring implementation — confirm against that crate.
const PRODUCER_SEQUENCE_SUFFIX: &str = "_producer_seq";
/// Process-wide counter from which the `publish_framed_payload*` helpers draw
/// message ids; the first id handed out is 1.
static NEXT_MESSAGE_ID: AtomicU32 = AtomicU32::new(1);
/// Error produced by the layout/configuration helpers in this module.
///
/// Carries a human-readable message only; its `Display` output is that
/// message, honouring any formatter flags (width, padding, ...).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportError {
    message: String,
}

impl TransportError {
    /// Wraps anything convertible into a `String` as an error message.
    fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }
}

impl Display for TransportError {
    fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
        // Delegate to `String`'s own `Display` so formatter flags still apply.
        <String as Display>::fmt(&self.message, formatter)
    }
}

impl Error for TransportError {}

/// Result alias whose error type is [`TransportError`].
pub type TransportResult<T> = Result<T, TransportError>;
/// Selects which family of `consume_next*` calls the consumers in this module
/// use while waiting for a frame.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum MyelonWaitStrategy {
/// Consumers call the plain `consume_next` / `consume_next_leased` methods.
BusySpin,
/// Consumers call the `*_with_sleep` variants. This is the default.
#[default]
Block,
}
/// Validated set of ring names and depths for one session: a single RPC ring
/// plus one response ring per runner.
///
/// Instances are built through `for_session` / `for_session_with_prefix`,
/// which validate every generated name.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct MyelonTransportLayout {
// Non-empty ASCII-alphanumeric prefix shared by every ring name.
transport_prefix: String,
// Tag derived from the session label by `compact_session_tag`.
session_tag: String,
// `{transport_prefix}{session_tag}p`.
rpc_ring_name: String,
// `{transport_prefix}{session_tag}r{rank:x}`, indexed by runner rank.
response_ring_names: Vec<String>,
// Requested depth of the RPC ring (non-zero).
rpc_depth: usize,
// Requested depth of each response ring (non-zero).
response_depth: usize,
}
impl MyelonTransportLayout {
    /// Builds a layout for `session_label` using [`DEFAULT_TRANSPORT_PREFIX`].
    ///
    /// Delegates to [`Self::for_session_with_prefix`]; see it for the
    /// validation rules and error conditions.
    pub fn for_session(
        session_label: &str,
        runner_count: usize,
        rpc_depth: usize,
        response_depth: usize,
    ) -> TransportResult<Self> {
        Self::for_session_with_prefix(
            DEFAULT_TRANSPORT_PREFIX,
            session_label,
            runner_count,
            rpc_depth,
            response_depth,
        )
    }

    /// Builds and validates the ring names for one session.
    ///
    /// The RPC ring is named `{transport_prefix}{session_tag}p` and the
    /// response ring of runner `rank` is
    /// `{transport_prefix}{session_tag}r{rank:x}` (rank in lowercase hex),
    /// where `session_tag` is produced by [`compact_session_tag`]. Every
    /// generated name must pass [`validate_segment_name`].
    ///
    /// # Errors
    /// Returns a [`TransportError`] when `transport_prefix` is empty or not
    /// purely ASCII alphanumeric, when `runner_count`, `rpc_depth` or
    /// `response_depth` is zero, or when any generated name fails validation.
    pub fn for_session_with_prefix(
        transport_prefix: &str,
        session_label: &str,
        runner_count: usize,
        rpc_depth: usize,
        response_depth: usize,
    ) -> TransportResult<Self> {
        if transport_prefix.is_empty() {
            return Err(TransportError::new("transport_prefix must not be empty"));
        }
        if !transport_prefix
            .chars()
            .all(|ch| ch.is_ascii_alphanumeric())
        {
            return Err(TransportError::new(format!(
                "transport_prefix '{transport_prefix}' must be ASCII alphanumeric only"
            )));
        }
        if runner_count == 0 {
            return Err(TransportError::new(
                "runner_count must be greater than zero",
            ));
        }
        if rpc_depth == 0 {
            return Err(TransportError::new("rpc_depth must be greater than zero"));
        }
        if response_depth == 0 {
            return Err(TransportError::new(
                "response_depth must be greater than zero",
            ));
        }

        let session_tag = compact_session_tag(session_label);
        let rpc_ring_name = format!("{transport_prefix}{session_tag}p");
        validate_segment_name(&rpc_ring_name)?;

        // Deliberately no `Vec::with_capacity(runner_count)`: `runner_count`
        // is caller-supplied and unbounded, and reserving for it up front can
        // panic or abort on allocation before validation gets a chance to
        // reject the oversized names. Collecting fallibly stops at the first
        // invalid name and only allocates for names that were accepted.
        let response_ring_names = (0..runner_count)
            .map(|rank| {
                let response_ring_name = format!("{transport_prefix}{session_tag}r{rank:x}");
                validate_segment_name(&response_ring_name).map(|()| response_ring_name)
            })
            .collect::<TransportResult<Vec<String>>>()?;

        Ok(Self {
            transport_prefix: transport_prefix.to_string(),
            session_tag,
            rpc_ring_name,
            response_ring_names,
            rpc_depth,
            response_depth,
        })
    }

    /// Prefix shared by every ring name in this layout.
    pub fn transport_prefix(&self) -> &str {
        &self.transport_prefix
    }

    /// Session tag embedded in every ring name.
    pub fn session_tag(&self) -> &str {
        &self.session_tag
    }

    /// Name of the RPC ring.
    pub fn rpc_ring_name(&self) -> &str {
        &self.rpc_ring_name
    }

    /// Depth requested for the RPC ring.
    pub fn rpc_depth(&self) -> usize {
        self.rpc_depth
    }

    /// Depth requested for each response ring.
    pub fn response_depth(&self) -> usize {
        self.response_depth
    }

    /// Number of runners, i.e. the number of response rings.
    pub fn runner_count(&self) -> usize {
        self.response_ring_names.len()
    }

    /// Name of the response ring for runner `rank`.
    ///
    /// # Errors
    /// Returns a [`TransportError`] when `rank >= runner_count()`.
    pub fn response_ring_name(&self, rank: usize) -> TransportResult<&str> {
        match self.response_ring_names.get(rank) {
            Some(name) => Ok(name.as_str()),
            None => Err(TransportError::new(format!(
                "rank {rank} is out of range for {}",
                self.runner_count()
            ))),
        }
    }

    /// Builds the mmap transport layout for the RPC ring under `root_dir`.
    ///
    /// The result is whatever `MmapTransportLayout::new` returns for
    /// `root_dir` and the RPC ring name.
    pub fn rpc_mmap_layout(
        &self,
        root_dir: impl Into<PathBuf>,
    ) -> MultiProcessResult<MmapTransportLayout> {
        MmapTransportLayout::new(root_dir, self.rpc_ring_name().to_string())
    }

    /// Builds the mmap transport layout for runner `rank`'s response ring
    /// under `root_dir`.
    ///
    /// # Errors
    /// An out-of-range `rank` is reported as
    /// `MultiProcessError::SharedMemoryError` carrying the
    /// [`TransportError`] message; other errors come from
    /// `MmapTransportLayout::new`.
    pub fn response_mmap_layout(
        &self,
        root_dir: impl Into<PathBuf>,
        rank: usize,
    ) -> MultiProcessResult<MmapTransportLayout> {
        let response_ring_name = self
            .response_ring_name(rank)
            .map_err(|error| MultiProcessError::SharedMemoryError(error.to_string()))?;
        MmapTransportLayout::new(root_dir, response_ring_name.to_string())
    }
}
/// Transport settings for a single runner: the shared RPC ring, that runner's
/// own response ring, and the wait strategy to use.
///
/// `for_rank` fills this in from a `MyelonTransportLayout`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RunnerMyelonTransportConfig {
/// Name of the RPC ring.
pub rpc_ring_name: String,
/// Depth of the RPC ring.
pub rpc_depth: usize,
/// Name of this runner's response ring.
pub response_ring_name: String,
/// Depth of this runner's response ring.
pub response_depth: usize,
/// Wait strategy for consumers built from this configuration.
pub wait_strategy: MyelonWaitStrategy,
}
/// Session-wide transport tuning: ring depths and wait strategy.
///
/// `new` and `resolve` reject zero depths; the fields are public, so values
/// constructed directly are not checked.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct MyelonTransportConfig {
/// Depth of the RPC ring.
pub rpc_depth: usize,
/// Depth of each response ring.
pub response_depth: usize,
/// Wait strategy used by consumers.
pub wait_strategy: MyelonWaitStrategy,
}
/// Borrowed view of one frame, as returned by `FramedTransportFrame::frame_meta`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct FrameMeta<'a> {
/// Payload length recorded in the frame.
pub len: usize,
/// Caller-defined message kind.
pub kind: u8,
/// Fragmentation flags (see `frame_flags`: bit 0 = first, bit 1 = last).
pub flags: u8,
/// Id shared by all frames of the same message.
pub msg_id: u32,
/// Send timestamp, if the frame type carries one; the frame types in this
/// file always report `None`.
pub timestamp_ns: Option<u64>,
/// Payload bytes of this frame.
pub data: &'a [u8],
}
/// Metadata describing one fully received message.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ReceivedMessageMeta {
    /// Kind byte taken from the message's first frame.
    pub kind: u8,
    /// Message id taken from the message's first frame.
    pub msg_id: u32,
    /// Sender-side timestamp, when the frame type provided one.
    pub sent_timestamp_ns: Option<u64>,
    /// Wall-clock timestamp taken when the message was completed.
    pub received_timestamp_ns: u64,
}

impl ReceivedMessageMeta {
    /// Nanoseconds between send and receive.
    ///
    /// Returns `None` when no send timestamp is available or when the receive
    /// timestamp is not strictly later than the send timestamp.
    pub fn one_way_latency_ns(&self) -> Option<u64> {
        match self.sent_timestamp_ns {
            Some(sent) if sent < self.received_timestamp_ns => {
                Some(self.received_timestamp_ns - sent)
            }
            _ => None,
        }
    }
}
/// Fixed-size frame with an inline payload of up to `DATA_BYTES` bytes.
///
/// `#[repr(C)]`: the field order below is the in-memory layout.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct FixedFrame<const DATA_BYTES: usize> {
    /// Number of valid bytes at the start of `data`.
    pub len: u32,
    /// Caller-defined message kind.
    pub kind: u8,
    /// Fragmentation flags.
    pub flags: u8,
    /// Id shared by all frames of one message.
    pub msg_id: u32,
    /// Payload storage; only the first `len` bytes are meaningful.
    pub data: [u8; DATA_BYTES],
}

impl<const DATA_BYTES: usize> Default for FixedFrame<DATA_BYTES> {
    /// An empty frame: every header field is zero and the payload is zeroed.
    fn default() -> Self {
        let data = [0u8; DATA_BYTES];
        Self {
            data,
            msg_id: 0,
            flags: 0,
            kind: 0,
            len: 0,
        }
    }
}
/// Payload byte array forced to 16-byte alignment.
#[repr(C, align(16))]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct AlignedPayload<const DATA_BYTES: usize> {
    /// Raw payload storage.
    pub bytes: [u8; DATA_BYTES],
}

impl<const DATA_BYTES: usize> Default for AlignedPayload<DATA_BYTES> {
    /// A zero-filled payload.
    fn default() -> Self {
        let bytes = [0u8; DATA_BYTES];
        Self { bytes }
    }
}
/// Fixed-size frame whose payload is an [`AlignedPayload`].
///
/// `#[repr(C)]`: the field order below is the in-memory layout; `reserved`
/// and `padding` are explicit filler bytes between the header fields and the
/// payload.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct AlignedFixedFrame<const DATA_BYTES: usize> {
    /// Number of valid bytes at the start of `data.bytes`.
    pub len: u32,
    /// Caller-defined message kind.
    pub kind: u8,
    /// Fragmentation flags.
    pub flags: u8,
    /// Explicit filler; written as zeros by `Default`.
    pub reserved: [u8; 2],
    /// Id shared by all frames of one message.
    pub msg_id: u32,
    /// Explicit filler; written as zeros by `Default`.
    pub padding: [u8; 4],
    /// Payload storage; only the first `len` bytes are meaningful.
    pub data: AlignedPayload<DATA_BYTES>,
}

impl<const DATA_BYTES: usize> Default for AlignedFixedFrame<DATA_BYTES> {
    /// An empty frame: all header and filler bytes zero, payload defaulted.
    fn default() -> Self {
        let data = AlignedPayload::default();
        Self {
            len: 0,
            kind: 0,
            flags: 0,
            reserved: [0; 2],
            msg_id: 0,
            padding: [0; 4],
            data,
        }
    }
}
impl<const DATA_BYTES: usize> FramedTransportFrame for AlignedFixedFrame<DATA_BYTES> {
    /// Maximum payload bytes one frame can carry.
    fn payload_capacity() -> usize {
        DATA_BYTES
    }

    /// Borrowed view of this frame; `timestamp_ns` is always `None`.
    ///
    /// # Panics
    /// Panics if the stored `len` exceeds `DATA_BYTES`.
    fn frame_meta(&self) -> FrameMeta<'_> {
        let used = self.len as usize;
        let data = &self.data.bytes[..used];
        FrameMeta {
            len: used,
            kind: self.kind,
            flags: self.flags,
            msg_id: self.msg_id,
            timestamp_ns: None,
            data,
        }
    }

    /// Stores `payload` and the header fields in this frame. Bytes beyond
    /// `payload.len()` are left untouched.
    ///
    /// # Panics
    /// Panics if `payload` is longer than `DATA_BYTES`.
    fn write_frame(&mut self, payload: &[u8], kind: u8, msg_id: u32, flags: u8) {
        let payload_len = payload.len();
        assert!(
            payload_len <= DATA_BYTES,
            "payload len {} exceeds frame capacity {}",
            payload_len,
            DATA_BYTES
        );
        self.data.bytes[..payload_len].copy_from_slice(payload);
        self.msg_id = msg_id;
        self.flags = flags;
        self.kind = kind;
        self.len = payload_len as u32;
    }
}
impl<const DATA_BYTES: usize> FramedTransportFrame for FixedFrame<DATA_BYTES> {
    /// Maximum payload bytes one frame can carry.
    fn payload_capacity() -> usize {
        DATA_BYTES
    }

    /// Borrowed view of this frame; `timestamp_ns` is always `None`.
    ///
    /// # Panics
    /// Panics if the stored `len` exceeds `DATA_BYTES`.
    fn frame_meta(&self) -> FrameMeta<'_> {
        let used = self.len as usize;
        let data = &self.data[..used];
        FrameMeta {
            len: used,
            kind: self.kind,
            flags: self.flags,
            msg_id: self.msg_id,
            timestamp_ns: None,
            data,
        }
    }

    /// Stores `payload` and the header fields in this frame. Bytes beyond
    /// `payload.len()` are left untouched.
    ///
    /// # Panics
    /// Panics if `payload` is longer than `DATA_BYTES`.
    fn write_frame(&mut self, payload: &[u8], kind: u8, msg_id: u32, flags: u8) {
        let payload_len = payload.len();
        assert!(
            payload_len <= DATA_BYTES,
            "payload len {} exceeds frame capacity {}",
            payload_len,
            DATA_BYTES
        );
        self.data[..payload_len].copy_from_slice(payload);
        self.msg_id = msg_id;
        self.flags = flags;
        self.kind = kind;
        self.len = payload_len as u32;
    }
}
/// Slot type usable by the framed producers/consumers in this module.
///
/// A frame holds one chunk of a message plus the header needed to reassemble
/// it (kind, message id, fragmentation flags).
pub trait FramedTransportFrame: Copy + Default + 'static {
/// Maximum number of payload bytes a single frame can hold; publishers use
/// it as the chunk size when splitting a message.
fn payload_capacity() -> usize;
/// Returns a borrowed view of the frame's header fields and payload bytes.
fn frame_meta(&self) -> FrameMeta<'_>;
/// Overwrites the frame with `payload` and the given header fields.
fn write_frame(&mut self, payload: &[u8], kind: u8, msg_id: u32, flags: u8);
}
/// Publishes byte messages as one or more frames of type `T` through a
/// `SharedProducer`.
pub struct FramedTransportProducer<T>
where
T: FramedTransportFrame,
{
// Obtained from `ensure_coordination_cursor` at construction and never read
// afterwards; held for as long as the producer lives.
_coordination_cursor: SharedCursor,
// Underlying ring producer.
inner: SharedProducer<T>,
}
impl<T> FramedTransportProducer<T>
where
    T: FramedTransportFrame,
{
    /// Creates a producer for ring `name` with `depth` slots, passing `1` as
    /// `max_consumers` to [`Self::create_with_consumers`].
    pub fn create(name: &str, depth: usize) -> MultiProcessResult<Self> {
        Self::create_with_consumers(name, depth, 1)
    }

    /// Creates a producer for ring `name` with `depth` slots.
    ///
    /// The coordination cursor for `name` is created (or attached) first,
    /// then the ring is built with discovery enabled for `max_consumers` and
    /// `CoordinationMode::Immediate`. Slots start out as `T::default()`.
    ///
    /// # Errors
    /// Propagates failures from `ensure_coordination_cursor` and from the
    /// ring builder.
    pub fn create_with_consumers(
        name: &str,
        depth: usize,
        max_consumers: usize,
    ) -> MultiProcessResult<Self> {
        let coordination_cursor = ensure_coordination_cursor(name)?;
        let inner = build_shared_single_producer::<T>(name, depth)
            .enable_discovery(max_consumers)
            .with_coordination(disruptor_mp::CoordinationMode::Immediate)
            .build_producer(T::default)?;
        Ok(Self {
            _coordination_cursor: coordination_cursor,
            inner,
        })
    }

    /// Polls until at least one consumer is registered or `timeout` elapses.
    ///
    /// Returns `true` as soon as the consumer count is non-zero and `false`
    /// once `timeout` has passed. The consumer count is always checked before
    /// the timeout, so a zero timeout still performs one check.
    pub fn discover_consumers(&mut self, timeout: std::time::Duration) -> bool {
        // Compare elapsed time against `timeout` instead of computing
        // `Instant::now() + timeout`: that addition panics on overflow, which
        // a caller passing `Duration::MAX` to mean "wait forever" would hit.
        let started = std::time::Instant::now();
        loop {
            if self.inner.get_consumer_count() > 0 {
                return true;
            }
            if started.elapsed() >= timeout {
                return false;
            }
            disruptor_mp::perform_default_discovery_poll_wait();
        }
    }

    /// Waits up to `timeout` for the consumer identified by `consumer_id`;
    /// returns the underlying `wait_for_consumer_id` result.
    pub fn discover_consumer_id(
        &mut self,
        consumer_id: &str,
        timeout: std::time::Duration,
    ) -> bool {
        self.inner.wait_for_consumer_id(consumer_id, timeout)
    }

    /// Forwards `config` to the underlying producer's
    /// `enable_required_consumer_liveness` and returns `self` for chaining.
    pub fn enable_required_consumer_liveness(
        &mut self,
        config: RequiredConsumerLivenessConfig,
    ) -> &mut Self {
        self.inner.enable_required_consumer_liveness(config);
        self
    }

    /// Publishes `payload` as one message of the given `kind`, split into
    /// frames of at most `T::payload_capacity()` bytes.
    #[inline(always)]
    pub fn publish(&mut self, payload: &[u8], kind: u8) {
        publish_framed_payload_in_place(
            payload,
            kind,
            T::payload_capacity(),
            |chunk, chunk_kind, msg_id, flags| {
                self.inner.publish(|slot| {
                    slot.write_frame(chunk, chunk_kind, msg_id, flags);
                });
            },
        );
    }

    /// Like [`Self::publish`], but publishes each frame through
    /// `publish_managed` and stops at the first error.
    ///
    /// # Errors
    /// Returns the first `RequiredConsumerError` reported while publishing a
    /// frame; frames published before the failure are not retracted.
    pub fn publish_managed(
        &mut self,
        payload: &[u8],
        kind: u8,
    ) -> Result<(), RequiredConsumerError> {
        publish_framed_payload_result_in_place(
            payload,
            kind,
            T::payload_capacity(),
            |chunk, chunk_kind, msg_id, flags| {
                self.inner
                    .publish_managed(|slot| {
                        slot.write_frame(chunk, chunk_kind, msg_id, flags);
                    })
                    .map(|_| ())
            },
        )
    }
}
/// Receives messages published as frames of type `T` through a
/// `SharedConsumer`, reassembling multi-frame messages.
pub struct FramedTransportConsumer<T>
where
T: FramedTransportFrame,
{
// Underlying ring consumer.
inner: SharedConsumer<T>,
// Chooses between the plain and `*_with_sleep` consume calls.
wait_strategy: MyelonWaitStrategy,
}
impl<T> FramedTransportConsumer<T>
where
    T: FramedTransportFrame,
{
    /// Attaches to the ring `name` (with `depth` slots) and remembers
    /// `wait_strategy` for the blocking receive calls.
    pub fn attach(
        name: &str,
        depth: usize,
        wait_strategy: MyelonWaitStrategy,
    ) -> MultiProcessResult<Self> {
        let ring = attach_shared_consumer::<T>(name, depth).build_consumer()?;
        Ok(Self {
            inner: ring,
            wait_strategy,
        })
    }

    /// Same as [`Self::attach`], additionally passing `consumer_id` to the
    /// consumer builder.
    pub fn attach_with_consumer_id(
        name: &str,
        depth: usize,
        consumer_id: &str,
        wait_strategy: MyelonWaitStrategy,
    ) -> MultiProcessResult<Self> {
        let ring = attach_shared_consumer::<T>(name, depth)
            .with_consumer_id(consumer_id)
            .build_consumer()?;
        Ok(Self {
            inner: ring,
            wait_strategy,
        })
    }

    /// Reports the underlying consumer's `has_coordination_support()`.
    pub fn has_coordination_support(&self) -> bool {
        self.inner.has_coordination_support()
    }

    /// Blocks until one complete message is received and returns its kind
    /// and an owned copy of its payload.
    pub fn recv_message_blocking_owned(&mut self) -> (u8, Vec<u8>) {
        let (received, payload) = self.recv_message_blocking_owned_with_meta();
        (received.kind, payload)
    }

    /// Blocks until one complete message is received and returns its
    /// metadata and an owned copy of its payload.
    pub fn recv_message_blocking_owned_with_meta(&mut self) -> (ReceivedMessageMeta, Vec<u8>) {
        let strategy = self.wait_strategy;
        let ring = &mut self.inner;
        recv_framed_message_with_meta(
            || match strategy {
                MyelonWaitStrategy::BusySpin => ring.consume_next(),
                MyelonWaitStrategy::Block => ring.consume_next_with_sleep(),
            },
            |slot| slot.frame_meta(),
        )
    }

    /// Alias for [`Self::recv_message_blocking_owned`].
    pub fn recv_message_blocking(&mut self) -> (u8, Vec<u8>) {
        self.recv_message_blocking_owned()
    }

    /// Alias for [`Self::recv_message_blocking_owned_with_meta`].
    pub fn recv_message_blocking_with_meta(&mut self) -> (ReceivedMessageMeta, Vec<u8>) {
        self.recv_message_blocking_owned_with_meta()
    }

    /// Like [`Self::recv_message_blocking_leased_with_meta`], but hands the
    /// handler only the message kind and payload.
    pub fn recv_message_blocking_leased<R>(
        &mut self,
        reassembly_buf: &mut ReassemblyBuffer,
        handler: impl FnOnce(u8, &[u8]) -> R,
    ) -> R {
        self.recv_message_blocking_leased_with_meta(reassembly_buf, |received, payload| {
            handler(received.kind, payload)
        })
    }

    /// Blocks until one complete message is received and passes it to
    /// `handler` without allocating a new payload buffer.
    ///
    /// A single-frame message is handed to `handler` straight from the leased
    /// frame; a multi-frame message is assembled in `reassembly_buf` first.
    /// While assembling, frames whose `msg_id` differs from the first frame's
    /// are consumed and ignored.
    pub fn recv_message_blocking_leased_with_meta<R>(
        &mut self,
        reassembly_buf: &mut ReassemblyBuffer,
        handler: impl FnOnce(ReceivedMessageMeta, &[u8]) -> R,
    ) -> R {
        // The first lease lives only inside this block, so it is dropped
        // before any further frame is requested.
        let (pending, expected_id, already_complete) = {
            let lease = match self.wait_strategy {
                MyelonWaitStrategy::BusySpin => self.inner.consume_next_leased(),
                MyelonWaitStrategy::Block => self.inner.consume_next_leased_with_sleep(),
            };
            let head = lease.frame_meta();
            let pending = ReceivedMessageMeta {
                kind: head.kind,
                msg_id: head.msg_id,
                sent_timestamp_ns: head.timestamp_ns,
                received_timestamp_ns: 0,
            };
            if is_single_frame(head.flags) {
                let stamped = ReceivedMessageMeta {
                    received_timestamp_ns: wall_clock_ns(),
                    ..pending
                };
                return handler(stamped, head.data);
            }
            reassembly_buf.start(
                head.msg_id,
                head.kind,
                head.data,
                head.len,
                head.timestamp_ns,
            );
            (pending, head.msg_id, is_last_frame(head.flags))
        };

        if already_complete {
            let stamped = ReceivedMessageMeta {
                received_timestamp_ns: wall_clock_ns(),
                ..pending
            };
            return handler(stamped, reassembly_buf.bytes());
        }

        loop {
            let lease = match self.wait_strategy {
                MyelonWaitStrategy::BusySpin => self.inner.consume_next_leased(),
                MyelonWaitStrategy::Block => self.inner.consume_next_leased_with_sleep(),
            };
            let frame = lease.frame_meta();
            if frame.msg_id == expected_id {
                reassembly_buf.append(frame.data);
                if is_last_frame(frame.flags) {
                    let stamped = ReceivedMessageMeta {
                        received_timestamp_ns: wall_clock_ns(),
                        ..pending
                    };
                    return handler(stamped, reassembly_buf.bytes());
                }
            }
        }
    }

    /// Processes the frames passed to the callback by the underlying
    /// `process_available`, invoking `handler(kind, payload)` once per
    /// completed message, and returns how many messages were delivered.
    ///
    /// Partially received multi-frame messages stay in `reassembly_buf`
    /// between calls.
    #[inline(always)]
    pub fn process_available_messages<F>(
        &mut self,
        reassembly_buf: &mut ReassemblyBuffer,
        mut handler: F,
    ) -> usize
    where
        F: FnMut(u8, &[u8]),
    {
        let mut delivered = 0usize;
        self.inner.process_available(|frame, _seq| {
            let meta = frame.frame_meta();
            if is_single_frame(meta.flags) {
                handler(meta.kind, meta.data);
                delivered += 1;
            } else {
                #[cfg(dst)]
                disruptor_mp::dst::assert_sometimes(
                    true,
                    "fragmented message",
                    format!("msg_id={} len={}", meta.msg_id, meta.len),
                );
                let opens_message = meta.flags & 0x01 != 0;
                if opens_message {
                    reassembly_buf.start(
                        meta.msg_id,
                        meta.kind,
                        meta.data,
                        meta.len,
                        meta.timestamp_ns,
                    );
                } else if reassembly_buf.msg_id == meta.msg_id {
                    reassembly_buf.append(meta.data);
                }
                // Deliver only when the closing frame belongs to the message
                // currently held in the buffer.
                let closes_tracked =
                    is_last_frame(meta.flags) && reassembly_buf.msg_id == meta.msg_id;
                if closes_tracked {
                    handler(reassembly_buf.kind, reassembly_buf.bytes());
                    reassembly_buf.reset();
                    delivered += 1;
                }
            }
        });
        delivered
    }

    /// Same as [`Self::process_available_messages`], but the handler receives
    /// a [`ReceivedMessageMeta`] stamped with the current wall-clock time.
    #[inline(always)]
    pub fn process_available_messages_with_meta<F>(
        &mut self,
        reassembly_buf: &mut ReassemblyBuffer,
        mut handler: F,
    ) -> usize
    where
        F: FnMut(ReceivedMessageMeta, &[u8]),
    {
        let mut delivered = 0usize;
        self.inner.process_available(|frame, _seq| {
            let meta = frame.frame_meta();
            if is_single_frame(meta.flags) {
                let received = ReceivedMessageMeta {
                    kind: meta.kind,
                    msg_id: meta.msg_id,
                    sent_timestamp_ns: meta.timestamp_ns,
                    received_timestamp_ns: wall_clock_ns(),
                };
                handler(received, meta.data);
                delivered += 1;
            } else {
                #[cfg(dst)]
                disruptor_mp::dst::assert_sometimes(
                    true,
                    "fragmented message",
                    format!("msg_id={} len={}", meta.msg_id, meta.len),
                );
                let opens_message = meta.flags & 0x01 != 0;
                if opens_message {
                    reassembly_buf.start(
                        meta.msg_id,
                        meta.kind,
                        meta.data,
                        meta.len,
                        meta.timestamp_ns,
                    );
                } else if reassembly_buf.msg_id == meta.msg_id {
                    reassembly_buf.append(meta.data);
                }
                // Deliver only when the closing frame belongs to the message
                // currently held in the buffer.
                let closes_tracked =
                    is_last_frame(meta.flags) && reassembly_buf.msg_id == meta.msg_id;
                if closes_tracked {
                    let received = ReceivedMessageMeta {
                        kind: reassembly_buf.kind,
                        msg_id: reassembly_buf.msg_id,
                        sent_timestamp_ns: reassembly_buf.sent_timestamp_ns,
                        received_timestamp_ns: wall_clock_ns(),
                    };
                    handler(received, reassembly_buf.bytes());
                    reassembly_buf.reset();
                    delivered += 1;
                }
            }
        });
        delivered
    }
}
/// 16-byte, 16-byte-aligned storage unit backing [`ReassemblyBuffer`].
#[repr(C, align(16))]
#[derive(Clone, Copy, Default)]
struct ReassemblyWord([u8; 16]);

/// Reusable scratch buffer in which multi-frame messages are reassembled.
///
/// Storage is a vector of 16-byte-aligned words, so the byte slice handed to
/// handlers starts on a 16-byte boundary.
pub struct ReassemblyBuffer {
    // Backing storage; every element is initialised (zero-filled on growth).
    buf: Vec<ReassemblyWord>,
    // Number of valid message bytes at the start of `buf`.
    len: usize,
    // Id of the message currently being assembled (0 after `reset`).
    msg_id: u32,
    // Kind of the message currently being assembled.
    kind: u8,
    // Send timestamp recorded from the first frame, if any.
    sent_timestamp_ns: Option<u64>,
}

impl ReassemblyBuffer {
    /// Size in bytes of one storage word.
    const WORD_BYTES: usize = std::mem::size_of::<ReassemblyWord>();

    /// Creates an empty buffer with heap capacity reserved for `capacity`
    /// bytes (rounded up to whole words).
    pub fn new(capacity: usize) -> Self {
        let reserved_words = Self::word_capacity(capacity);
        Self {
            buf: Vec::with_capacity(reserved_words),
            len: 0,
            msg_id: 0,
            kind: 0,
            sent_timestamp_ns: None,
        }
    }

    /// Begins a new message: overwrites the buffer with `first_data` and
    /// records the message header. Storage is grown to hold at least
    /// `max(total_len, first_data.len())` bytes.
    fn start(
        &mut self,
        msg_id: u32,
        kind: u8,
        first_data: &[u8],
        total_len: usize,
        sent_timestamp_ns: Option<u64>,
    ) {
        let first_len = first_data.len();
        self.ensure_capacity(std::cmp::max(total_len, first_len));
        self.as_mut_bytes()[..first_len].copy_from_slice(first_data);
        self.len = first_len;
        self.msg_id = msg_id;
        self.kind = kind;
        self.sent_timestamp_ns = sent_timestamp_ns;
    }

    /// Appends `data` after the bytes already held, growing storage as needed.
    fn append(&mut self, data: &[u8]) {
        let from = self.len;
        let to = from + data.len();
        self.ensure_capacity(to);
        self.as_mut_bytes()[from..to].copy_from_slice(data);
        self.len = to;
    }

    /// Marks the buffer empty. `kind` is left as-is and the allocation is
    /// kept for reuse.
    fn reset(&mut self) {
        self.sent_timestamp_ns = None;
        self.msg_id = 0;
        self.len = 0;
    }

    /// The message bytes assembled so far.
    fn bytes(&self) -> &[u8] {
        let base = self.buf.as_ptr().cast::<u8>();
        // SAFETY: `len` is only ever set by `start`/`append`, and both call
        // `ensure_capacity` first, so the first `len` bytes lie inside the
        // initialised elements of `buf`. `u8` has alignment 1, and the
        // returned slice borrows `self`, so the storage cannot be mutated or
        // freed while it is alive. When `len` is 0 the (possibly dangling but
        // non-null, aligned) vector pointer is valid for an empty slice.
        unsafe { std::slice::from_raw_parts(base, self.len) }
    }

    /// All initialised storage viewed as mutable bytes.
    fn as_mut_bytes(&mut self) -> &mut [u8] {
        let byte_len = self.buf.len() * Self::WORD_BYTES;
        let base = self.buf.as_mut_ptr().cast::<u8>();
        // SAFETY: `buf.len()` words are initialised and contiguous, which is
        // exactly `byte_len` bytes; `u8` has alignment 1; the exclusive
        // borrow of `self` guarantees no aliasing for the slice's lifetime.
        unsafe { std::slice::from_raw_parts_mut(base, byte_len) }
    }

    /// Grows the initialised storage (zero-filled) so it spans at least
    /// `needed_bytes` bytes; never shrinks.
    fn ensure_capacity(&mut self, needed_bytes: usize) {
        let needed_words = Self::word_capacity(needed_bytes);
        if needed_words > self.buf.len() {
            self.buf.resize(needed_words, ReassemblyWord::default());
        }
    }

    /// Number of words needed to hold `byte_capacity` bytes (rounding up,
    /// saturating instead of overflowing).
    fn word_capacity(byte_capacity: usize) -> usize {
        let rounded = byte_capacity.saturating_add(Self::WORD_BYTES - 1);
        rounded / Self::WORD_BYTES
    }
}
/// Publishes byte messages as one or more frames of type `T` through an
/// `MmapProducer`.
pub struct MmapFramedTransportProducer<T>
where
T: FramedTransportFrame,
{
// Underlying mmap ring producer.
inner: MmapProducer<T>,
}
impl<T> MmapFramedTransportProducer<T>
where
    T: FramedTransportFrame,
{
    /// Ensures the layout's directories exist, then creates the mmap
    /// producer with `depth` slots initialised to `T::default()`.
    ///
    /// # Errors
    /// Propagates failures from `ensure_directories` and
    /// `MmapProducer::create`.
    pub fn create(layout: MmapTransportLayout, depth: usize) -> MultiProcessResult<Self> {
        layout.ensure_directories()?;
        Ok(Self {
            inner: MmapProducer::create(layout, depth, T::default)?,
        })
    }

    /// Publishes `payload` as one message of the given `kind`, split into
    /// frames of at most `T::payload_capacity()` bytes.
    #[inline(always)]
    pub fn publish(&mut self, payload: &[u8], kind: u8) {
        let ring = &mut self.inner;
        let capacity = T::payload_capacity();
        publish_framed_payload_in_place(
            payload,
            kind,
            capacity,
            |bytes, frame_kind, message_id, chunk_flags| {
                ring.publish(|slot| {
                    slot.write_frame(bytes, frame_kind, message_id, chunk_flags);
                });
            },
        );
    }

    /// Forwards `config` to the underlying producer's
    /// `enable_required_consumer_liveness` and returns `self` for chaining.
    pub fn enable_required_consumer_liveness(
        &mut self,
        config: RequiredConsumerLivenessConfig,
    ) -> &mut Self {
        self.inner.enable_required_consumer_liveness(config);
        self
    }

    /// Waits up to `timeout` for the consumer identified by `consumer_id`;
    /// returns the underlying `wait_for_consumer_id` result.
    pub fn discover_consumer_id(&mut self, consumer_id: &str, timeout: Duration) -> bool {
        self.inner.wait_for_consumer_id(consumer_id, timeout)
    }

    /// Like [`Self::publish`], but publishes each frame through
    /// `publish_managed` and stops at the first error.
    pub fn publish_managed(
        &mut self,
        payload: &[u8],
        kind: u8,
    ) -> Result<(), RequiredConsumerError> {
        let ring = &mut self.inner;
        let capacity = T::payload_capacity();
        publish_framed_payload_result_in_place(
            payload,
            kind,
            capacity,
            |bytes, frame_kind, message_id, chunk_flags| {
                ring.publish_managed(|slot| {
                    slot.write_frame(bytes, frame_kind, message_id, chunk_flags);
                })
                .map(|_| ())
            },
        )
    }

    /// Forwards to the underlying producer's `wait_for_consumers_ready`.
    pub fn wait_for_consumers_ready(&self, min_consumers: i64, timeout: Duration) -> bool {
        self.inner.wait_for_consumers_ready(min_consumers, timeout)
    }

    /// Forwards to the underlying producer's
    /// `wait_until_consumed_with_strategy`.
    pub fn wait_until_consumed(
        &mut self,
        sequence: i64,
        timeout: Duration,
        strategy: AutoWaitStrategy,
    ) -> bool {
        let ring = &mut self.inner;
        ring.wait_until_consumed_with_strategy(sequence, timeout, strategy)
    }

    /// Mutable access to the wrapped `MmapProducer`.
    pub fn raw(&mut self) -> &mut MmapProducer<T> {
        &mut self.inner
    }
}
/// Receives messages published as frames of type `T` through an
/// `MmapConsumer`, reassembling multi-frame messages.
pub struct MmapFramedTransportConsumer<T>
where
T: FramedTransportFrame,
{
// Underlying mmap ring consumer.
inner: MmapConsumer<T>,
// Chooses between the plain and `*_with_sleep` consume calls.
wait_strategy: MyelonWaitStrategy,
}
impl<T> MmapFramedTransportConsumer<T>
where
    T: FramedTransportFrame,
{
    /// Attaches to the mmap ring described by `layout` as consumer
    /// `consumer_id` and remembers `wait_strategy` for the blocking receive
    /// calls.
    pub fn attach(
        layout: MmapTransportLayout,
        depth: usize,
        consumer_id: &str,
        wait_strategy: MyelonWaitStrategy,
    ) -> MultiProcessResult<Self> {
        let ring = MmapConsumer::attach(layout, depth, consumer_id)?;
        Ok(Self {
            inner: ring,
            wait_strategy,
        })
    }

    /// Reports the underlying consumer's `has_coordination_support()`.
    pub fn has_coordination_support(&self) -> bool {
        self.inner.has_coordination_support()
    }

    /// Blocks until one complete message is received and returns its kind
    /// and an owned copy of its payload.
    pub fn recv_message_blocking_owned(&mut self) -> (u8, Vec<u8>) {
        let (received, payload) = self.recv_message_blocking_owned_with_meta();
        (received.kind, payload)
    }

    /// Blocks until one complete message is received and returns its
    /// metadata and an owned copy of its payload.
    pub fn recv_message_blocking_owned_with_meta(&mut self) -> (ReceivedMessageMeta, Vec<u8>) {
        let strategy = self.wait_strategy;
        let ring = &mut self.inner;
        recv_framed_message_with_meta(
            || match strategy {
                MyelonWaitStrategy::BusySpin => ring.consume_next(),
                MyelonWaitStrategy::Block => ring.consume_next_with_sleep(),
            },
            |slot| slot.frame_meta(),
        )
    }

    /// Alias for [`Self::recv_message_blocking_owned`].
    pub fn recv_message_blocking(&mut self) -> (u8, Vec<u8>) {
        self.recv_message_blocking_owned()
    }

    /// Alias for [`Self::recv_message_blocking_owned_with_meta`].
    pub fn recv_message_blocking_with_meta(&mut self) -> (ReceivedMessageMeta, Vec<u8>) {
        self.recv_message_blocking_owned_with_meta()
    }

    /// Like [`Self::recv_message_blocking_leased_with_meta`], but hands the
    /// handler only the message kind and payload.
    pub fn recv_message_blocking_leased<R>(
        &mut self,
        reassembly_buf: &mut ReassemblyBuffer,
        handler: impl FnOnce(u8, &[u8]) -> R,
    ) -> R {
        self.recv_message_blocking_leased_with_meta(reassembly_buf, |received, payload| {
            handler(received.kind, payload)
        })
    }

    /// Blocks until one complete message is received and passes it to
    /// `handler` without allocating a new payload buffer.
    ///
    /// A single-frame message is handed to `handler` straight from the leased
    /// frame; a multi-frame message is assembled in `reassembly_buf` first.
    /// While assembling, frames whose `msg_id` differs from the first frame's
    /// are consumed and ignored.
    pub fn recv_message_blocking_leased_with_meta<R>(
        &mut self,
        reassembly_buf: &mut ReassemblyBuffer,
        handler: impl FnOnce(ReceivedMessageMeta, &[u8]) -> R,
    ) -> R {
        // The first lease lives only inside this block, so it is dropped
        // before any further frame is requested.
        let (pending, expected_id, already_complete) = {
            let lease = match self.wait_strategy {
                MyelonWaitStrategy::BusySpin => self.inner.consume_next_leased(),
                MyelonWaitStrategy::Block => self.inner.consume_next_leased_with_sleep(),
            };
            let head = lease.frame_meta();
            let pending = ReceivedMessageMeta {
                kind: head.kind,
                msg_id: head.msg_id,
                sent_timestamp_ns: head.timestamp_ns,
                received_timestamp_ns: 0,
            };
            if is_single_frame(head.flags) {
                let stamped = ReceivedMessageMeta {
                    received_timestamp_ns: wall_clock_ns(),
                    ..pending
                };
                return handler(stamped, head.data);
            }
            reassembly_buf.start(
                head.msg_id,
                head.kind,
                head.data,
                head.len,
                head.timestamp_ns,
            );
            (pending, head.msg_id, is_last_frame(head.flags))
        };

        if already_complete {
            let stamped = ReceivedMessageMeta {
                received_timestamp_ns: wall_clock_ns(),
                ..pending
            };
            return handler(stamped, reassembly_buf.bytes());
        }

        loop {
            let lease = match self.wait_strategy {
                MyelonWaitStrategy::BusySpin => self.inner.consume_next_leased(),
                MyelonWaitStrategy::Block => self.inner.consume_next_leased_with_sleep(),
            };
            let frame = lease.frame_meta();
            if frame.msg_id == expected_id {
                reassembly_buf.append(frame.data);
                if is_last_frame(frame.flags) {
                    let stamped = ReceivedMessageMeta {
                        received_timestamp_ns: wall_clock_ns(),
                        ..pending
                    };
                    return handler(stamped, reassembly_buf.bytes());
                }
            }
        }
    }

    /// Mutable access to the wrapped `MmapConsumer`.
    pub fn raw(&mut self) -> &mut MmapConsumer<T> {
        &mut self.inner
    }

    /// Processes the frames passed to the callback by the underlying
    /// `process_available`, invoking `handler(kind, payload)` once per
    /// completed message, and returns how many messages were delivered.
    ///
    /// Partially received multi-frame messages stay in `reassembly_buf`
    /// between calls.
    #[inline(always)]
    pub fn process_available_messages<F>(
        &mut self,
        reassembly_buf: &mut ReassemblyBuffer,
        mut handler: F,
    ) -> usize
    where
        F: FnMut(u8, &[u8]),
    {
        let mut delivered = 0usize;
        self.inner.process_available(|frame, _seq| {
            let meta = frame.frame_meta();
            if is_single_frame(meta.flags) {
                handler(meta.kind, meta.data);
                delivered += 1;
            } else {
                let opens_message = meta.flags & 0x01 != 0;
                if opens_message {
                    reassembly_buf.start(
                        meta.msg_id,
                        meta.kind,
                        meta.data,
                        meta.len,
                        meta.timestamp_ns,
                    );
                } else if reassembly_buf.msg_id == meta.msg_id {
                    reassembly_buf.append(meta.data);
                }
                // Deliver only when the closing frame belongs to the message
                // currently held in the buffer.
                let closes_tracked =
                    is_last_frame(meta.flags) && reassembly_buf.msg_id == meta.msg_id;
                if closes_tracked {
                    handler(reassembly_buf.kind, reassembly_buf.bytes());
                    reassembly_buf.reset();
                    delivered += 1;
                }
            }
        });
        delivered
    }

    /// Same as [`Self::process_available_messages`], but the handler receives
    /// a [`ReceivedMessageMeta`] stamped with the current wall-clock time.
    #[inline(always)]
    pub fn process_available_messages_with_meta<F>(
        &mut self,
        reassembly_buf: &mut ReassemblyBuffer,
        mut handler: F,
    ) -> usize
    where
        F: FnMut(ReceivedMessageMeta, &[u8]),
    {
        let mut delivered = 0usize;
        self.inner.process_available(|frame, _seq| {
            let meta = frame.frame_meta();
            if is_single_frame(meta.flags) {
                let received = ReceivedMessageMeta {
                    kind: meta.kind,
                    msg_id: meta.msg_id,
                    sent_timestamp_ns: meta.timestamp_ns,
                    received_timestamp_ns: wall_clock_ns(),
                };
                handler(received, meta.data);
                delivered += 1;
            } else {
                let opens_message = meta.flags & 0x01 != 0;
                if opens_message {
                    reassembly_buf.start(
                        meta.msg_id,
                        meta.kind,
                        meta.data,
                        meta.len,
                        meta.timestamp_ns,
                    );
                } else if reassembly_buf.msg_id == meta.msg_id {
                    reassembly_buf.append(meta.data);
                }
                // Deliver only when the closing frame belongs to the message
                // currently held in the buffer.
                let closes_tracked =
                    is_last_frame(meta.flags) && reassembly_buf.msg_id == meta.msg_id;
                if closes_tracked {
                    let received = ReceivedMessageMeta {
                        kind: reassembly_buf.kind,
                        msg_id: reassembly_buf.msg_id,
                        sent_timestamp_ns: reassembly_buf.sent_timestamp_ns,
                        received_timestamp_ns: wall_clock_ns(),
                    };
                    handler(received, reassembly_buf.bytes());
                    reassembly_buf.reset();
                    delivered += 1;
                }
            }
        });
        delivered
    }
}
impl RunnerMyelonTransportConfig {
    /// Extracts the configuration for runner `rank` from `layout`.
    ///
    /// # Errors
    /// Returns a [`TransportError`] when `rank` is not a valid runner index
    /// in `layout`.
    pub fn for_rank(
        layout: &MyelonTransportLayout,
        rank: usize,
        wait_strategy: MyelonWaitStrategy,
    ) -> TransportResult<Self> {
        let response_ring_name = layout.response_ring_name(rank)?.to_owned();
        let rpc_ring_name = layout.rpc_ring_name().to_owned();
        Ok(Self {
            rpc_ring_name,
            rpc_depth: layout.rpc_depth(),
            response_ring_name,
            response_depth: layout.response_depth(),
            wait_strategy,
        })
    }
}
impl MyelonTransportConfig {
    /// Builds a configuration after checking that both depths are non-zero.
    ///
    /// # Errors
    /// Returns a [`TransportError`] when `rpc_depth` or `response_depth` is
    /// zero (`rpc_depth` is checked first).
    pub fn new(
        rpc_depth: usize,
        response_depth: usize,
        wait_strategy: MyelonWaitStrategy,
    ) -> TransportResult<Self> {
        let rejection = if rpc_depth == 0 {
            Some("myelon_rpc_depth must be greater than zero")
        } else if response_depth == 0 {
            Some("myelon_response_depth must be greater than zero")
        } else {
            None
        };
        match rejection {
            Some(message) => Err(TransportError::new(message)),
            None => Ok(Self {
                rpc_depth,
                response_depth,
                wait_strategy,
            }),
        }
    }

    /// Builds a configuration from optional settings, filling gaps with
    /// [`DEFAULT_MYELON_RPC_DEPTH`], [`DEFAULT_MYELON_RESPONSE_DEPTH`] and
    /// the `Block` wait strategy (`busy_spin` must be `Some(true)` to select
    /// `BusySpin`).
    ///
    /// # Errors
    /// Same as [`Self::new`].
    pub fn resolve(
        rpc_depth: Option<usize>,
        response_depth: Option<usize>,
        busy_spin: Option<bool>,
    ) -> TransportResult<Self> {
        let wait_strategy = match busy_spin {
            Some(true) => MyelonWaitStrategy::BusySpin,
            Some(false) | None => MyelonWaitStrategy::Block,
        };
        let effective_rpc_depth = rpc_depth.unwrap_or(DEFAULT_MYELON_RPC_DEPTH);
        let effective_response_depth = response_depth.unwrap_or(DEFAULT_MYELON_RESPONSE_DEPTH);
        Self::new(effective_rpc_depth, effective_response_depth, wait_strategy)
    }
}
/// Derives an 8-character tag from `session_label`.
///
/// The label is reduced to its ASCII alphanumeric characters, lowercased.
/// If at least 8 such characters remain, the tag is the last 8 of them.
/// Otherwise the tag is the low 32 bits of a `DefaultHasher` hash of the
/// original (unfiltered) label, rendered as 8 lowercase hex digits.
pub fn compact_session_tag(session_label: &str) -> String {
    const TAG_LEN: usize = 8;

    let mut normalized = String::with_capacity(session_label.len());
    for ch in session_label.chars() {
        if ch.is_ascii_alphanumeric() {
            normalized.push(ch.to_ascii_lowercase());
        }
    }

    match normalized.len().checked_sub(TAG_LEN) {
        // `normalized` is pure ASCII, so any byte offset is a char boundary.
        Some(tail_start) => normalized[tail_start..].to_string(),
        None => {
            let mut hasher = DefaultHasher::new();
            session_label.hash(&mut hasher);
            let low_bits = hasher.finish() as u32;
            format!("{:08x}", low_bits)
        }
    }
}
/// Checks that `name` is usable as a ring/segment name.
///
/// A valid name is non-empty, at most [`SEGMENT_NAME_LIMIT_MACOS`] bytes,
/// purely ASCII alphanumeric, and short enough that both the coordination
/// cursor name and `name` plus the producer-sequence suffix stay within the
/// derived-name length limit.
///
/// # Errors
/// Returns a [`TransportError`] describing the first rule that `name` breaks.
pub fn validate_segment_name(name: &str) -> TransportResult<()> {
    if name.is_empty() {
        return Err(TransportError::new("segment name must not be empty"));
    }

    let name_len = name.len();
    if name_len > SEGMENT_NAME_LIMIT_MACOS {
        return Err(TransportError::new(format!(
            "segment name '{name}' exceeds conservative macOS limit {SEGMENT_NAME_LIMIT_MACOS}"
        )));
    }

    if name.chars().any(|ch| !ch.is_ascii_alphanumeric()) {
        return Err(TransportError::new(format!(
            "segment name '{name}' must be ASCII alphanumeric only"
        )));
    }

    // The longest derived name must still fit the shared-memory object limit.
    let cursor_len = coordination_cursor_name(name).len();
    let sequence_len = name_len + PRODUCER_SEQUENCE_SUFFIX.len();
    if cursor_len.max(sequence_len) > DARWIN_SHARED_MEMORY_OBJECT_LIMIT {
        return Err(TransportError::new(format!(
            "segment name '{name}' does not leave enough room for derived shared-memory suffixes on macOS"
        )));
    }

    Ok(())
}
/// Name of the coordination cursor belonging to ring `base_name`:
/// `base_name` followed by `_cr`.
pub fn coordination_cursor_name(base_name: &str) -> String {
    const SUFFIX: &str = "_cr";
    let mut cursor_name = String::with_capacity(base_name.len() + SUFFIX.len());
    cursor_name.push_str(base_name);
    cursor_name.push_str(SUFFIX);
    cursor_name
}
pub fn ensure_coordination_cursor(base_name: &str) -> MultiProcessResult<SharedCursor> {
let cursor_name = coordination_cursor_name(base_name);
SharedCursor::new_or_attach(&cursor_name, 0).map_err(|error| {
MultiProcessError::SharedMemoryError(format!(
"failed to create coordination cursor '{cursor_name}': {error}"
))
})
}
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// This value is only used to stamp received messages, so it never panics:
/// if the system clock reads earlier than the epoch it returns `0` (which
/// makes `one_way_latency_ns` report `None`), and a value that does not fit
/// in `u64` saturates to `u64::MAX` instead of being truncated.
fn wall_clock_ns() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX),
        // Clock set before 1970: report "unknown" rather than abort the
        // receive path over a diagnostic timestamp.
        Err(_) => 0,
    }
}
pub fn recv_framed_message<F, T, M>(next_frame: F, meta: M) -> (u8, Vec<u8>)
where
F: FnMut() -> (i64, T),
M: for<'a> FnMut(&'a T) -> FrameMeta<'a>,
{
let (message_meta, payload) = recv_framed_message_with_meta(next_frame, meta);
(message_meta.kind, payload)
}
/// Receives one complete message, returning its metadata and an owned
/// payload.
///
/// `next_frame` yields `(sequence, frame)` pairs and `meta` extracts a
/// [`FrameMeta`] view from a frame. The first frame obtained defines the
/// message: its kind, id and send timestamp go into the returned metadata.
/// Later frames with a different `msg_id` are skipped. The receive timestamp
/// is taken when the message completes.
pub fn recv_framed_message_with_meta<F, T, M>(
    mut next_frame: F,
    mut meta: M,
) -> (ReceivedMessageMeta, Vec<u8>)
where
    F: FnMut() -> (i64, T),
    M: for<'a> FnMut(&'a T) -> FrameMeta<'a>,
{
    let (_sequence, head_frame) = next_frame();
    let head = meta(&head_frame);
    let pending = ReceivedMessageMeta {
        kind: head.kind,
        msg_id: head.msg_id,
        sent_timestamp_ns: head.timestamp_ns,
        received_timestamp_ns: 0,
    };

    if is_single_frame(head.flags) {
        let stamped = ReceivedMessageMeta {
            received_timestamp_ns: wall_clock_ns(),
            ..pending
        };
        return (stamped, head.data.to_vec());
    }

    let mut payload = Vec::with_capacity(head.len.max(1024));
    payload.extend_from_slice(head.data);
    let expected_id = head.msg_id;

    if is_last_frame(head.flags) {
        let stamped = ReceivedMessageMeta {
            received_timestamp_ns: wall_clock_ns(),
            ..pending
        };
        return (stamped, payload);
    }

    loop {
        let (_sequence, next) = next_frame();
        let frame = meta(&next);
        if frame.msg_id == expected_id {
            payload.extend_from_slice(frame.data);
            if is_last_frame(frame.flags) {
                let stamped = ReceivedMessageMeta {
                    received_timestamp_ns: wall_clock_ns(),
                    ..pending
                };
                return (stamped, payload);
            }
        }
    }
}
pub fn recv_framed_message_leased<F, L, T, M, R>(
next_frame: F,
reassembly_buf: &mut ReassemblyBuffer,
meta: M,
handler: impl FnOnce(u8, &[u8]) -> R,
) -> R
where
F: FnMut() -> L,
L: std::ops::Deref<Target = T>,
M: for<'a> FnMut(&'a T) -> FrameMeta<'a>,
{
recv_framed_message_leased_with_meta(next_frame, reassembly_buf, meta, |message_meta, bytes| {
handler(message_meta.kind, bytes)
})
}
/// Receives one complete message and passes its metadata and payload to
/// `handler` without allocating a fresh payload buffer.
///
/// `next_frame` yields leases that dereference to frames and `meta` extracts
/// a [`FrameMeta`] view from a frame. A single-frame message is handed to
/// `handler` straight from the lease; otherwise the frames are assembled in
/// `reassembly_buf`. Later frames with a different `msg_id` than the first
/// are skipped. The first lease stays alive until this function returns.
pub fn recv_framed_message_leased_with_meta<F, L, T, M, R>(
    mut next_frame: F,
    reassembly_buf: &mut ReassemblyBuffer,
    mut meta: M,
    handler: impl FnOnce(ReceivedMessageMeta, &[u8]) -> R,
) -> R
where
    F: FnMut() -> L,
    L: std::ops::Deref<Target = T>,
    M: for<'a> FnMut(&'a T) -> FrameMeta<'a>,
{
    let head_lease = next_frame();
    let head = meta(&*head_lease);
    let pending = ReceivedMessageMeta {
        kind: head.kind,
        msg_id: head.msg_id,
        sent_timestamp_ns: head.timestamp_ns,
        received_timestamp_ns: 0,
    };

    if is_single_frame(head.flags) {
        let stamped = ReceivedMessageMeta {
            received_timestamp_ns: wall_clock_ns(),
            ..pending
        };
        return handler(stamped, head.data);
    }

    reassembly_buf.start(
        head.msg_id,
        head.kind,
        head.data,
        head.len,
        head.timestamp_ns,
    );
    let expected_id = head.msg_id;

    if is_last_frame(head.flags) {
        let stamped = ReceivedMessageMeta {
            received_timestamp_ns: wall_clock_ns(),
            ..pending
        };
        return handler(stamped, reassembly_buf.bytes());
    }

    loop {
        let lease = next_frame();
        let frame = meta(&*lease);
        if frame.msg_id == expected_id {
            reassembly_buf.append(frame.data);
            if is_last_frame(frame.flags) {
                let stamped = ReceivedMessageMeta {
                    received_timestamp_ns: wall_clock_ns(),
                    ..pending
                };
                return handler(stamped, reassembly_buf.bytes());
            }
        }
    }
}
/// Splits `payload` into frames and hands each one to `write_frame`.
///
/// A fresh message id is drawn from the process-wide counter. `make_frame`
/// builds a frame from `(chunk, kind, msg_id, flags)`; the flags mark the
/// first and last chunk (see [`frame_flags`]). An empty payload produces
/// exactly one frame flagged as both first and last.
///
/// # Panics
/// Panics if `chunk_size` is zero and `payload` is non-empty.
pub fn publish_framed_payload<T, P, W>(
    payload: &[u8],
    kind: u8,
    chunk_size: usize,
    mut write_frame: W,
    mut make_frame: P,
) where
    T: Copy,
    P: FnMut(&[u8], u8, u32, u8) -> T,
    W: FnMut(T),
{
    let message_id = NEXT_MESSAGE_ID.fetch_add(1, Ordering::Relaxed);
    if payload.is_empty() {
        write_frame(make_frame(payload, kind, message_id, frame_flags(true, true)));
        return;
    }
    #[cfg(dst)]
    if payload.len() > chunk_size {
        disruptor_mp::dst::assert_sometimes(
            true,
            "fragmented message",
            format!("payload_len={} chunk_size={chunk_size}", payload.len()),
        );
    }
    // Track how many bytes have been handed out: the first chunk starts at
    // offset 0 and the last one ends exactly at the payload length.
    let total = payload.len();
    let mut consumed = 0usize;
    for chunk in payload.chunks(chunk_size) {
        let opens = consumed == 0;
        consumed += chunk.len();
        let closes = consumed == total;
        write_frame(make_frame(chunk, kind, message_id, frame_flags(opens, closes)));
    }
}
/// Splits `payload` into chunks and passes each to `write_frame` as
/// `(chunk, kind, msg_id, flags)`.
///
/// A fresh message id is drawn from the process-wide counter; the flags mark
/// the first and last chunk (see [`frame_flags`]). An empty payload produces
/// exactly one call with an empty chunk flagged as both first and last.
///
/// # Panics
/// Panics if `chunk_size` is zero and `payload` is non-empty.
pub fn publish_framed_payload_in_place<W>(
    payload: &[u8],
    kind: u8,
    chunk_size: usize,
    mut write_frame: W,
) where
    W: FnMut(&[u8], u8, u32, u8),
{
    let message_id = NEXT_MESSAGE_ID.fetch_add(1, Ordering::Relaxed);
    if payload.is_empty() {
        write_frame(payload, kind, message_id, frame_flags(true, true));
        return;
    }
    #[cfg(dst)]
    if payload.len() > chunk_size {
        disruptor_mp::dst::assert_sometimes(
            true,
            "fragmented message",
            format!("payload_len={} chunk_size={chunk_size}", payload.len()),
        );
    }
    // Track how many bytes have been handed out: the first chunk starts at
    // offset 0 and the last one ends exactly at the payload length.
    let total = payload.len();
    let mut consumed = 0usize;
    for chunk in payload.chunks(chunk_size) {
        let opens = consumed == 0;
        consumed += chunk.len();
        let closes = consumed == total;
        write_frame(chunk, kind, message_id, frame_flags(opens, closes));
    }
}
/// Fallible variant of `publish_framed_payload`: splits `payload` into
/// frames and hands each one to `write_frame`, stopping at the first error.
///
/// A fresh message id is drawn from the process-wide counter. `make_frame`
/// builds a frame from `(chunk, kind, msg_id, flags)`; the flags mark the
/// first and last chunk (see [`frame_flags`]). An empty payload produces
/// exactly one frame flagged as both first and last.
///
/// # Errors
/// Returns the first error produced by `write_frame`; frames written before
/// it are not undone.
///
/// # Panics
/// Panics if `chunk_size` is zero and `payload` is non-empty.
pub fn publish_framed_payload_result<T, P, W, E>(
    payload: &[u8],
    kind: u8,
    chunk_size: usize,
    mut write_frame: W,
    mut make_frame: P,
) -> Result<(), E>
where
    T: Copy,
    P: FnMut(&[u8], u8, u32, u8) -> T,
    W: FnMut(T) -> Result<(), E>,
{
    let message_id = NEXT_MESSAGE_ID.fetch_add(1, Ordering::Relaxed);
    if payload.is_empty() {
        return write_frame(make_frame(payload, kind, message_id, frame_flags(true, true)));
    }
    #[cfg(dst)]
    if payload.len() > chunk_size {
        disruptor_mp::dst::assert_sometimes(
            true,
            "fragmented message",
            format!("payload_len={} chunk_size={chunk_size}", payload.len()),
        );
    }
    // Track how many bytes have been handed out: the first chunk starts at
    // offset 0 and the last one ends exactly at the payload length.
    let total = payload.len();
    let mut consumed = 0usize;
    for chunk in payload.chunks(chunk_size) {
        let opens = consumed == 0;
        consumed += chunk.len();
        let closes = consumed == total;
        write_frame(make_frame(chunk, kind, message_id, frame_flags(opens, closes)))?;
    }
    Ok(())
}
/// Fallible in-place framing helper: calls `write_frame` once per chunk of `payload`
/// and stops at the first error.
///
/// One message id is taken from the process-wide `NEXT_MESSAGE_ID` counter and passed
/// with every chunk. `write_frame` receives `(chunk, kind, msg_id, flags)`, where
/// `flags` comes from [`frame_flags`] and marks the first and last chunk. An empty
/// payload results in exactly one call with an empty slice and both boundary bits set.
///
/// # Errors
///
/// Returns the first `Err` produced by `write_frame`; remaining chunks are not written.
///
/// # Panics
///
/// Panics if `chunk_size` is zero while `payload` is non-empty, because
/// `slice::chunks` rejects a zero chunk size.
pub fn publish_framed_payload_result_in_place<W, E>(
    payload: &[u8],
    kind: u8,
    chunk_size: usize,
    mut write_frame: W,
) -> Result<(), E>
where
    W: FnMut(&[u8], u8, u32, u8) -> Result<(), E>,
{
    let msg_id = NEXT_MESSAGE_ID.fetch_add(1, Ordering::Relaxed);
    let total = payload.len();
    if total == 0 {
        // `chunks` yields nothing for an empty slice, so emit the lone frame by hand.
        return write_frame(payload, kind, msg_id, frame_flags(true, true));
    }
    #[cfg(dst)]
    if total > chunk_size {
        disruptor_mp::dst::assert_sometimes(
            true,
            "fragmented message",
            format!("payload_len={} chunk_size={chunk_size}", payload.len()),
        );
    }
    // Byte counter doubles as the boundary detector: zero before a chunk means first,
    // `total` after a chunk means last.
    let mut emitted = 0usize;
    payload.chunks(chunk_size).try_for_each(|chunk| {
        let starts_message = emitted == 0;
        emitted += chunk.len();
        write_frame(chunk, kind, msg_id, frame_flags(starts_message, emitted == total))
    })
}
/// Packs frame-boundary markers into a flag byte.
///
/// Bit 0 (`0b01`) is set when `is_first` is true and bit 1 (`0b10`) is set when
/// `is_last` is true; all other bits are zero.
pub const fn frame_flags(is_first: bool, is_last: bool) -> u8 {
    // A `bool` casts to 0 or 1, so each marker lands directly in its bit position.
    (is_first as u8) | ((is_last as u8) << 1)
}
/// Returns `true` when both the first-frame bit (`0b01`) and the last-frame bit
/// (`0b10`) are set in `flags`; any higher bits are ignored.
pub const fn is_single_frame(flags: u8) -> bool {
    const BOUNDARY_BITS: u8 = 0b11;
    (flags & BOUNDARY_BITS) == BOUNDARY_BITS
}
/// Returns `true` when the last-frame bit (`0b10`) is set in `flags`, regardless of
/// any other bits.
pub const fn is_last_frame(flags: u8) -> bool {
    ((flags >> 1) & 1) == 1
}
#[cfg(test)]
mod tests {
use super::*;
use crate::RequiredConsumerLivenessConfig;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{
atomic::{AtomicBool, Ordering as SyncOrdering},
Arc,
};
// Counter shared by every test in this module; `unique_ring_name` increments it once
// per call and uses the value modulo 100 as the two-digit suffix of the ring name.
static UNIQUE_SUFFIX: AtomicU64 = AtomicU64::new(0);
/// Minimal `Copy` frame used by the tests in this module: plain metadata fields plus
/// a fixed 16-byte payload buffer. Its `FramedTransportFrame` impl reports no timestamp.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
struct RawFrame {
// Number of payload bytes stored at the start of `data` (set by `write_frame`).
len: usize,
// Message kind, stored as passed to `write_frame`.
kind: u8,
// Frame flag byte, stored as passed to `write_frame`.
flags: u8,
// Message id, stored as passed to `write_frame`.
msg_id: u32,
// Payload storage; only `data[..len]` is exposed through `frame_meta`.
data: [u8; 16],
}
/// Test-only `FramedTransportFrame` implementation backed by `RawFrame`'s inline
/// 16-byte buffer.
impl FramedTransportFrame for RawFrame {
    /// Largest payload a single `RawFrame` can carry (the size of `data`).
    fn payload_capacity() -> usize {
        16
    }

    /// Borrows the frame as a `FrameMeta`; this frame type records no timestamp.
    fn frame_meta(&self) -> FrameMeta<'_> {
        let payload = &self.data[..self.len];
        FrameMeta {
            data: payload,
            timestamp_ns: None,
            msg_id: self.msg_id,
            flags: self.flags,
            kind: self.kind,
            len: self.len,
        }
    }

    /// Overwrites the metadata fields and copies `payload` into the start of `data`.
    ///
    /// Panics if `payload` is longer than the 16-byte buffer.
    fn write_frame(&mut self, payload: &[u8], kind: u8, msg_id: u32, flags: u8) {
        let len = payload.len();
        self.len = len;
        self.kind = kind;
        self.flags = flags;
        self.msg_id = msg_id;
        // Copy last so the metadata is already in place, matching the field-then-data order.
        self.data[..len].copy_from_slice(payload);
    }
}
/// Builds a short ring name for a test.
///
/// The name is the first four ASCII-alphanumeric characters of `prefix`, followed by
/// the process id modulo 10 000 (four digits) and the next `UNIQUE_SUFFIX` counter
/// value modulo 100 (two digits).
///
/// Panics if the result is longer than `SEGMENT_NAME_LIMIT_MACOS`.
fn unique_ring_name(prefix: &str) -> String {
    let pid = std::process::id() % 10_000;
    let counter = UNIQUE_SUFFIX.fetch_add(1, Ordering::Relaxed) % 100;
    let prefix = prefix
        .chars()
        .filter(char::is_ascii_alphanumeric)
        .take(4)
        .collect::<String>();
    let name = format!("{prefix}{pid:04}{counter:02}");
    assert!(
        name.len() <= SEGMENT_NAME_LIMIT_MACOS,
        "ring name exceeds macOS budget: {name}"
    );
    name
}
/// Liveness configuration shared by the managed-publish tests: consumers "c1" and "c2"
/// are required, with a 100 ms startup wait, 20 ms progress timeout, 1 ms progress
/// check interval and 200 ms shutdown grace period.
fn test_liveness_config() -> RequiredConsumerLivenessConfig {
    let millis = std::time::Duration::from_millis;
    RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
        .with_startup_wait_timeout(millis(100))
        .with_progress_timeout(millis(20))
        .with_progress_check_interval(millis(1))
        .with_shutdown_grace_period(millis(200))
}
/// A long, UUID-like session label must compact to an 8-character tag made only of
/// ASCII alphanumerics.
#[test]
fn compact_session_tag_is_stable_and_short() {
    let label = "4c2f1cb7-50fa-4edf-bd83-very-long-session-name";
    let compacted = compact_session_tag(label);
    assert_eq!(compacted.len(), 8);
    let only_alphanumeric = compacted.chars().all(|ch| ch.is_ascii_alphanumeric());
    assert!(only_alphanumeric);
}
/// A layout built from a long session label keeps its prefix, runner count and depths,
/// and its ring names stay within `SEGMENT_NAME_LIMIT_MACOS`.
#[test]
fn transport_layout_uses_portable_names() {
    let session = "4c2f1cb7-50fa-4edf-bd83-very-long-session-name";
    let portable =
        MyelonTransportLayout::for_session_with_prefix("vmy", session, 3, 1024, 256).unwrap();

    assert_eq!(portable.transport_prefix(), "vmy");
    assert_eq!(portable.runner_count(), 3);

    let rpc_len = portable.rpc_ring_name().len();
    assert!(rpc_len <= SEGMENT_NAME_LIMIT_MACOS);
    let first_response = portable.response_ring_name(0).unwrap();
    assert!(first_response.len() <= SEGMENT_NAME_LIMIT_MACOS);
    let second_response = portable.response_ring_name(1).unwrap();
    assert!(second_response.starts_with("vmy"));

    assert_eq!(portable.rpc_depth(), 1024);
    assert_eq!(portable.response_depth(), 256);
}
/// Every ring name must fit the segment budget, and the names derived from it (the
/// coordination cursor name and the name with `PRODUCER_SEQUENCE_SUFFIX` appended)
/// must fit within `DARWIN_SHARED_MEMORY_OBJECT_LIMIT`.
#[test]
fn transport_layout_names_leave_room_for_shared_memory_suffixes() {
    let layout = MyelonTransportLayout::for_session_with_prefix(
        "vmy",
        "integration-macos-smoke",
        2,
        16,
        8,
    )
    .unwrap();
    let ring_names = [
        layout.rpc_ring_name(),
        layout.response_ring_name(0).unwrap(),
        layout.response_ring_name(1).unwrap(),
    ];
    for name in ring_names {
        assert!(name.len() <= SEGMENT_NAME_LIMIT_MACOS);

        let cursor_name = coordination_cursor_name(name);
        assert!(cursor_name.len() <= DARWIN_SHARED_MEMORY_OBJECT_LIMIT);

        let producer_sequence_name = format!("{name}{PRODUCER_SEQUENCE_SUFFIX}");
        assert!(producer_sequence_name.len() <= DARWIN_SHARED_MEMORY_OBJECT_LIMIT);
    }
}
/// The mmap layouts derived from a transport layout must keep the supplied root
/// directory and reuse the layout's ring names as segment names.
#[test]
fn transport_layout_builds_mmap_layouts_from_portable_segment_names() {
    let layout = MyelonTransportLayout::for_session_with_prefix(
        "vmy",
        "integration-mmap-smoke",
        2,
        16,
        8,
    )
    .unwrap();
    let base_dir = std::env::temp_dir().join("myelon_transport_layout_mmap_test");

    let rpc_mmap = layout.rpc_mmap_layout(base_dir.clone()).unwrap();
    let response_mmap = layout.response_mmap_layout(base_dir.clone(), 1).unwrap();

    assert_eq!(rpc_mmap.root_dir(), base_dir.as_path());
    assert_eq!(rpc_mmap.segment_name(), layout.rpc_ring_name());
    assert_eq!(response_mmap.root_dir(), base_dir.as_path());
    let expected_response_segment = layout.response_ring_name(1).unwrap();
    assert_eq!(response_mmap.segment_name(), expected_response_segment);
}
/// Resolving with no overrides yields the default depths and the blocking wait strategy.
#[test]
fn transport_config_defaults_to_blocking_depths() {
    let resolved = MyelonTransportConfig::resolve(None, None, None).unwrap();
    assert_eq!(resolved.rpc_depth, DEFAULT_MYELON_RPC_DEPTH);
    assert_eq!(resolved.response_depth, DEFAULT_MYELON_RESPONSE_DEPTH);
    assert_eq!(resolved.wait_strategy, MyelonWaitStrategy::Block);
}
/// Explicit depths are kept as given, and passing `Some(true)` as the third argument
/// selects the busy-spin wait strategy.
#[test]
fn transport_config_honors_busy_spin_and_custom_depths() {
    let rpc_override = Some(2048);
    let response_override = Some(512);
    let resolved =
        MyelonTransportConfig::resolve(rpc_override, response_override, Some(true)).unwrap();
    assert_eq!(resolved.rpc_depth, 2048);
    assert_eq!(resolved.response_depth, 512);
    assert_eq!(resolved.wait_strategy, MyelonWaitStrategy::BusySpin);
}
/// A zero depth for either ring is rejected, and the error text names the offending
/// setting.
#[test]
fn transport_config_rejects_zero_depths() {
    let cases = [
        (Some(0), None, "myelon_rpc_depth"),
        (None, Some(0), "myelon_response_depth"),
    ];
    for (rpc_depth, response_depth, offending_setting) in cases {
        let message = MyelonTransportConfig::resolve(rpc_depth, response_depth, None)
            .unwrap_err()
            .to_string();
        assert!(message.contains(offending_setting));
    }
}
/// A prefix containing a hyphen is rejected with an error that mentions
/// "ASCII alphanumeric".
#[test]
fn transport_layout_rejects_invalid_prefix() {
    let attempt =
        MyelonTransportLayout::for_session_with_prefix("bad-prefix", "session", 1, 16, 16);
    let message = attempt.unwrap_err().to_string();
    assert!(message.contains("ASCII alphanumeric"));
}
/// Requesting the coordination cursor twice for the same base name must succeed both
/// times, and a value stored through the first handle must be visible through the second.
#[test]
fn ensure_coordination_cursor_is_idempotent() {
    let base_name = unique_ring_name("mycr");
    let writer = ensure_coordination_cursor(&base_name).unwrap();
    let reader = ensure_coordination_cursor(&base_name).unwrap();

    writer.store(41, AtomicOrdering::Release);
    let observed = reader.load(AtomicOrdering::Acquire);
    assert_eq!(observed, 41);
}
/// `frame_flags` must map each (first, last) combination to its two-bit encoding.
#[test]
fn segmented_frame_flags_mark_boundaries() {
    let cases = [
        (true, true, 0b11),
        (true, false, 0b01),
        (false, true, 0b10),
        (false, false, 0b00),
    ];
    for (is_first, is_last, expected) in cases {
        assert_eq!(frame_flags(is_first, is_last), expected);
    }
}
/// Writing into a `FixedFrame<16>` and reading its metadata back must return the same
/// payload, kind, flags and message id, with no timestamp.
#[test]
fn fixed_frame_round_trip_preserves_meta() {
    let mut frame = FixedFrame::<16>::default();
    frame.write_frame(b"hello", 7, 42, 0b10);
    let FrameMeta {
        len,
        kind,
        flags,
        msg_id,
        timestamp_ns,
        data,
    } = frame.frame_meta();

    assert_eq!(FixedFrame::<16>::payload_capacity(), 16);
    assert_eq!(len, 5);
    assert_eq!(kind, 7);
    assert_eq!(flags, 0b10);
    assert_eq!(msg_id, 42);
    assert_eq!(timestamp_ns, None);
    assert_eq!(data, b"hello");
}
/// The payload slice exposed by an `AlignedFixedFrame<32>` must start at an address
/// that is a multiple of 16 and contain the bytes that were written.
#[test]
fn aligned_fixed_frame_payload_starts_on_a_16_byte_boundary() {
    let mut frame = AlignedFixedFrame::<32>::default();
    frame.write_frame(b"typed", 3, 9, 0b11);
    let payload = frame.frame_meta().data;

    assert_eq!(AlignedFixedFrame::<32>::payload_capacity(), 32);
    let address = payload.as_ptr() as usize;
    assert_eq!(address % 16, 0);
    assert_eq!(payload, b"typed");
}
/// After `start`, the buffer's bytes must begin at an address that is a multiple of 16,
/// hold the supplied payload, and keep the supplied sent timestamp.
#[test]
fn reassembly_buffer_storage_is_16_byte_aligned() {
    let mut buffer = ReassemblyBuffer::new(96);
    buffer.start(7, 1, b"hello", 48, Some(9));

    let address = buffer.bytes().as_ptr() as usize;
    assert_eq!(address % 16, 0);
    assert_eq!(buffer.bytes(), b"hello");
    assert_eq!(buffer.sent_timestamp_ns, Some(9));
}
/// A 37-byte payload framed with a 16-byte chunk size must produce three frames that
/// carry first / middle / last flags, the requested kind, and one shared message id.
#[test]
fn publish_framed_payload_splits_large_payload() {
    let payload = vec![7u8; 2 * 16 + 5];
    let mut frames = Vec::new();
    publish_framed_payload(
        &payload,
        9,
        16,
        |frame| frames.push(frame),
        |chunk, kind, msg_id, flags| {
            let mut built = RawFrame::default();
            built.write_frame(chunk, kind, msg_id, flags);
            built
        },
    );

    assert_eq!(frames.len(), 3);
    let flags: Vec<u8> = frames.iter().map(|frame| frame.flags).collect();
    assert_eq!(flags, [0b01, 0b00, 0b10]);
    assert_eq!(frames[0].kind, 9);
    // Adjacent frames sharing an id implies all three share it.
    for pair in frames.windows(2) {
        assert_eq!(pair[0].msg_id, pair[1].msg_id);
    }
}
/// The in-place variant must split a 37-byte payload into three 16-byte-bounded frames
/// with first / middle / last flags, the requested kind, and one shared message id.
#[test]
fn publish_framed_payload_in_place_splits_large_payload() {
    let payload = vec![7u8; 2 * 16 + 5];
    let mut frames = Vec::new();
    publish_framed_payload_in_place(&payload, 9, 16, |chunk, kind, msg_id, flags| {
        let mut built = RawFrame::default();
        built.write_frame(chunk, kind, msg_id, flags);
        frames.push(built);
    });

    assert_eq!(frames.len(), 3);
    let flags: Vec<u8> = frames.iter().map(|frame| frame.flags).collect();
    assert_eq!(flags, [0b01, 0b00, 0b10]);
    assert_eq!(frames[0].kind, 9);
    // Adjacent frames sharing an id implies all three share it.
    for pair in frames.windows(2) {
        assert_eq!(pair[0].msg_id, pair[1].msg_id);
    }
}
/// Frames produced by `publish_framed_payload` with an 8-byte chunk size must be
/// reassembled by `recv_framed_message` into the original 26-byte payload and kind.
#[test]
fn recv_framed_message_reassembles_large_payload() {
    let payload: Vec<u8> = (b'a'..=b'z').collect();
    let mut frames = Vec::new();
    publish_framed_payload(
        &payload,
        3,
        8,
        |frame| frames.push(frame),
        |chunk, kind, msg_id, flags| {
            let mut built = RawFrame::default();
            built.write_frame(chunk, kind, msg_id, flags);
            built
        },
    );

    // Feed the captured frames back one at a time, paired with a 1-based counter.
    let mut cursor = 0usize;
    let (kind, decoded) = recv_framed_message(
        || {
            let frame = frames[cursor];
            cursor += 1;
            (cursor as i64, frame)
        },
        |frame| frame.frame_meta(),
    );

    assert_eq!(kind, 3);
    assert_eq!(decoded, payload);
}
/// When frames carry a timestamp, `recv_framed_message_with_meta` must report it as the
/// sent timestamp alongside the kind, a received timestamp and a one-way latency.
#[test]
fn recv_framed_message_with_meta_preserves_timestamps() {
    /// Frame whose `write_frame` always stamps `timestamp_ns` with 1.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
    struct TimedRawFrame {
        len: usize,
        kind: u8,
        flags: u8,
        msg_id: u32,
        timestamp_ns: u64,
        data: [u8; 16],
    }

    impl FramedTransportFrame for TimedRawFrame {
        fn payload_capacity() -> usize {
            16
        }

        fn frame_meta(&self) -> FrameMeta<'_> {
            let payload = &self.data[..self.len];
            FrameMeta {
                data: payload,
                timestamp_ns: Some(self.timestamp_ns),
                msg_id: self.msg_id,
                flags: self.flags,
                kind: self.kind,
                len: self.len,
            }
        }

        fn write_frame(&mut self, payload: &[u8], kind: u8, msg_id: u32, flags: u8) {
            let len = payload.len();
            self.len = len;
            self.kind = kind;
            self.flags = flags;
            self.msg_id = msg_id;
            self.timestamp_ns = 1;
            self.data[..len].copy_from_slice(payload);
        }
    }

    let mut frames = Vec::new();
    publish_framed_payload(
        b"hello world",
        7,
        16,
        |frame| frames.push(frame),
        |chunk, kind, msg_id, flags| {
            let mut built = TimedRawFrame::default();
            built.write_frame(chunk, kind, msg_id, flags);
            built
        },
    );

    // Feed the captured frames back one at a time, paired with a 1-based counter.
    let mut cursor = 0usize;
    let (meta, payload) = recv_framed_message_with_meta(
        || {
            let frame = frames[cursor];
            cursor += 1;
            (cursor as i64, frame)
        },
        |frame| frame.frame_meta(),
    );

    assert_eq!(meta.kind, 7);
    assert_eq!(meta.sent_timestamp_ns, Some(1));
    assert!(meta.received_timestamp_ns >= 1);
    let latency = meta.one_way_latency_ns();
    assert!(latency.is_some());
    assert_eq!(payload, b"hello world");
}
/// A producer and a blocking consumer on the same ring must report coordination
/// support and deliver one published message with its kind intact.
#[test]
fn framed_transport_round_trip_uses_shared_coordination_contract() {
    let ring_name = unique_ring_name("myfr");
    let mut sender = FramedTransportProducer::<RawFrame>::create(&ring_name, 8).unwrap();
    let mut receiver =
        FramedTransportConsumer::<RawFrame>::attach(&ring_name, 8, MyelonWaitStrategy::Block)
            .unwrap();
    assert!(receiver.has_coordination_support());

    sender.publish(b"hello world", 7);

    let (received_kind, received_payload) = receiver.recv_message_blocking();
    assert_eq!(received_kind, 7);
    assert_eq!(received_payload, b"hello world");
}
/// An 11-byte message sent through 4-byte frames (so it spans several frames) must be
/// delivered by `process_available_messages_with_meta` with the frames' timestamp as
/// its sent timestamp.
#[test]
fn process_available_messages_with_meta_preserves_fragment_timestamp() {
    /// Frame with a 4-byte payload area whose `write_frame` always stamps
    /// `timestamp_ns` with 7.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
    struct TinyTimedFrame {
        len: usize,
        kind: u8,
        flags: u8,
        msg_id: u32,
        timestamp_ns: u64,
        data: [u8; 4],
    }

    impl FramedTransportFrame for TinyTimedFrame {
        fn payload_capacity() -> usize {
            4
        }

        fn frame_meta(&self) -> FrameMeta<'_> {
            let payload = &self.data[..self.len];
            FrameMeta {
                data: payload,
                timestamp_ns: Some(self.timestamp_ns),
                msg_id: self.msg_id,
                flags: self.flags,
                kind: self.kind,
                len: self.len,
            }
        }

        fn write_frame(&mut self, payload: &[u8], kind: u8, msg_id: u32, flags: u8) {
            let len = payload.len();
            self.len = len;
            self.kind = kind;
            self.flags = flags;
            self.msg_id = msg_id;
            self.timestamp_ns = 7;
            self.data[..len].copy_from_slice(payload);
        }
    }

    let ring_name = unique_ring_name("myfrmeta");
    let mut producer =
        FramedTransportProducer::<TinyTimedFrame>::create(&ring_name, 16).unwrap();
    let mut consumer = FramedTransportConsumer::<TinyTimedFrame>::attach(
        &ring_name,
        16,
        MyelonWaitStrategy::BusySpin,
    )
    .unwrap();
    let mut reassembly = ReassemblyBuffer::new(64);
    let mut delivered = None;

    producer.publish(b"hello world", 9);

    // Poll until the callback has fired; spin between polls that delivered nothing.
    let (meta, payload) = loop {
        consumer.process_available_messages_with_meta(&mut reassembly, |meta, bytes| {
            delivered = Some((meta, bytes.to_vec()));
        });
        match delivered.take() {
            Some(message) => break message,
            None => std::hint::spin_loop(),
        }
    };

    assert_eq!(meta.kind, 9);
    assert_eq!(meta.sent_timestamp_ns, Some(7));
    assert!(meta.received_timestamp_ns >= 7);
    let latency = meta.one_way_latency_ns();
    assert!(latency.is_some());
    assert_eq!(payload, b"hello world");
}
/// An mmap-backed producer and a blocking consumer "c0" sharing the same root directory
/// and segment name must report coordination support and deliver one message.
#[test]
fn mmap_framed_transport_round_trip_block_uses_configured_wait_policy() {
    let ring_name = unique_ring_name("mmfr");
    let root = std::env::temp_dir().join(format!("myelon_mmap_block_{ring_name}"));
    // Both endpoints get their own layout value built from the same root and name.
    let new_layout = || MmapTransportLayout::new(root.clone(), ring_name.clone()).unwrap();

    let mut sender = MmapFramedTransportProducer::<RawFrame>::create(new_layout(), 8).unwrap();
    let mut receiver = MmapFramedTransportConsumer::<RawFrame>::attach(
        new_layout(),
        8,
        "c0",
        MyelonWaitStrategy::Block,
    )
    .unwrap();
    assert!(receiver.has_coordination_support());

    sender.publish(b"hello mmap", 9);

    let (received_kind, received_payload) = receiver.recv_message_blocking();
    assert_eq!(received_kind, 9);
    assert_eq!(received_payload, b"hello mmap");
}
/// With no consumer attached, `discover_consumers` must return `false` once the 25 ms
/// timeout passes.
#[test]
fn framed_transport_discover_consumers_times_out_when_no_consumer_attaches() {
    let ring_name = unique_ring_name("mydc");
    let mut lonely_producer =
        FramedTransportProducer::<RawFrame>::create(&ring_name, 8).unwrap();
    let timeout = std::time::Duration::from_millis(25);
    let discovered = lonely_producer.discover_consumers(timeout);
    assert!(!discovered);
}
/// With "c1" and "c2" required but only "c1" attached, `publish_managed` must fail with
/// `StartupTimeout` listing exactly "c2" as missing.
#[test]
fn framed_publish_managed_reports_missing_required_consumer_at_startup() {
    let ring_name = unique_ring_name("fmsu");
    let ring_depth = 4;
    let mut producer =
        FramedTransportProducer::<RawFrame>::create_with_consumers(&ring_name, ring_depth, 2)
            .unwrap();
    producer.enable_required_consumer_liveness(test_liveness_config());

    // Keep the only attached consumer alive until the end of the test.
    let _consumer1 = FramedTransportConsumer::<RawFrame>::attach_with_consumer_id(
        &ring_name,
        ring_depth,
        "c1",
        MyelonWaitStrategy::BusySpin,
    )
    .unwrap();

    let outcome = producer.publish_managed(b"hello", 7);
    let error = outcome.expect_err("missing c2 should trip startup timeout");
    let reports_only_c2 = matches!(
        error,
        RequiredConsumerError::StartupTimeout { missing } if missing == vec!["c2".to_string()]
    );
    assert!(reports_only_c2);
}
// Exercises `publish_managed` with the two required consumers from
// `test_liveness_config` ("c1" and "c2"): "c2" is dropped mid-stream and a new
// consumer later attaches under the same id. Every managed publish must succeed and
// the re-attached consumer must receive at least one message.
#[test]
fn framed_publish_managed_recovers_when_same_consumer_id_rejoins() {
let ring_name = unique_ring_name("fmrj");
let depth = 4;
let mut producer =
FramedTransportProducer::<RawFrame>::create_with_consumers(&ring_name, depth, 2)
.unwrap();
producer.enable_required_consumer_liveness(test_liveness_config());
// Flag the test thread sets at the end to stop the background "c1" consumer.
let stop_consumer1 = Arc::new(AtomicBool::new(false));
let stop_consumer1_thread = Arc::clone(&stop_consumer1);
let ring_name_for_thread = ring_name.clone();
// "c1" runs on its own thread and discards every message until the stop flag is set,
// sleeping 1 ms whenever a poll processed nothing.
let consumer1_thread = std::thread::spawn(move || {
let mut consumer1 = FramedTransportConsumer::<RawFrame>::attach_with_consumer_id(
&ring_name_for_thread,
depth,
"c1",
MyelonWaitStrategy::BusySpin,
)
.unwrap();
let mut reassembly = ReassemblyBuffer::new(256);
while !stop_consumer1_thread.load(SyncOrdering::Acquire) {
let processed =
consumer1.process_available_messages(&mut reassembly, |_kind, _bytes| {});
if processed == 0 {
std::thread::sleep(std::time::Duration::from_millis(1));
}
}
});
// "c2" is attached on the test thread so the test decides when it is dropped.
let mut consumer2 = FramedTransportConsumer::<RawFrame>::attach_with_consumer_id(
&ring_name,
depth,
"c2",
MyelonWaitStrategy::BusySpin,
)
.unwrap();
// First managed publish with both required ids attached (or attaching); "c2" must
// receive it unchanged.
producer.publish_managed(b"seed", 1).unwrap();
let (kind, payload) = consumer2.recv_message_blocking_owned();
assert_eq!(kind, 1);
assert_eq!(payload, b"seed");
// Detach "c2" while the producer keeps publishing.
drop(consumer2);
// Publish `depth` more messages with no live "c2".
// NOTE(review): presumably this fills the ring from c2's point of view so the next
// managed publish depends on c2 making progress — confirm against `publish_managed`.
for i in 0..depth {
let payload = vec![i as u8; 8];
producer.publish_managed(&payload, 2).unwrap();
}
let ring_name_for_rejoin = ring_name.clone();
// After a 40 ms delay, attach again as "c2" and count delivered messages until
// `depth + 2` have been seen or 500 ms have elapsed; the count is the thread's result.
let rejoin_thread = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(40));
let mut rejoined = FramedTransportConsumer::<RawFrame>::attach_with_consumer_id(
&ring_name_for_rejoin,
depth,
"c2",
MyelonWaitStrategy::BusySpin,
)
.unwrap();
let mut reassembly = ReassemblyBuffer::new(256);
let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
let mut consumed = 0usize;
while std::time::Instant::now() < deadline && consumed < depth + 2 {
let processed =
rejoined.process_available_messages(&mut reassembly, |_kind, _bytes| {
consumed += 1;
});
if processed == 0 {
std::thread::sleep(std::time::Duration::from_millis(1));
}
}
consumed
});
// This publish runs while the rejoin thread is pending or active and must not return
// a liveness error.
// NOTE(review): presumably it succeeds because the rejoin lands within the configured
// liveness timeouts — confirm the intended timing relationship.
producer.publish_managed(b"tail", 3).unwrap();
// Stop and join "c1" first, then collect the rejoined consumer's message count.
stop_consumer1.store(true, SyncOrdering::Release);
consumer1_thread.join().unwrap();
let consumed = rejoin_thread.join().unwrap();
assert!(
consumed > 0,
"rejoined framed consumer should drain backlog"
);
}
}