// Private submodule declarations. Every module carries an explicit `#[path]`
// attribute, so its source file is taken from that path instead of the
// default `<module name>.rs` lookup.
// NOTE(review): the `allow` below is an outer attribute, so it applies to
// `mod builder` only — confirm that is the intended scope.
#[allow(rustdoc::private_doc_tests)]
#[path = "builder.rs"]
mod builder;
#[path = "consumer.rs"]
mod consumer;
#[path = "lock_free/consumer_barrier.rs"]
mod consumer_barrier;
#[path = "lock_free/cursor.rs"]
mod cursor;
#[path = "producer.rs"]
mod producer;
#[path = "backend/shared_memory/ringbuffer.rs"]
mod ringbuffer;
#[path = "runtime/wait.rs"]
mod wait;
pub mod shared_memory {
pub use super::ringbuffer::SharedRingBuffer;
pub use super::SharedMemoryConfig;
pub type ShmRingBuffer<E> = SharedRingBuffer<E>;
}
pub mod backend {
pub mod shared_memory {
pub use super::super::ringbuffer::SharedRingBuffer;
pub use super::super::SharedMemoryConfig;
pub type ShmRingBuffer<E> = SharedRingBuffer<E>;
}
pub mod mmap {
pub use super::super::MmapConsumerBarrier;
pub use super::super::MmapCursorConfig;
pub use super::super::MmapFileConfig;
pub use super::super::MmapTransportLayout;
pub use crate::mmap_consumer::MmapConsumer;
pub use crate::mmap_cursor::MmapCursor;
pub use crate::mmap_producer::MmapProducer;
pub use crate::mmap_ringbuffer::MmapRingBuffer;
}
}
pub mod lock_free {
pub use super::consumer_barrier::{ConsumerBarrier, DiscoveryMode, SharedConsumerBarrier};
pub use super::cursor::{SharedCursor, SharedCursorTrait};
pub type ProducerBarrier = super::cursor::SharedCursor;
}
/// Returns the value of `wait::default_block_strategy_duration()`.
///
/// Public forwarding wrapper around the private `wait` module.
#[inline]
pub fn default_block_strategy_duration() -> std::time::Duration {
wait::default_block_strategy_duration()
}
/// Returns the value of `wait::default_consume_sleep_duration()`.
///
/// Public forwarding wrapper around the private `wait` module.
#[inline]
pub fn default_consume_sleep_duration() -> std::time::Duration {
wait::default_consume_sleep_duration()
}
/// Returns the value of `wait::default_discovery_poll_duration()`.
///
/// Public forwarding wrapper around the private `wait` module.
#[inline]
pub fn default_discovery_poll_duration() -> std::time::Duration {
wait::default_discovery_poll_duration()
}
/// Forwards to `wait::perform_default_block_wait()`.
///
/// NOTE(review): presumably waits for the default block-strategy duration —
/// confirm against `runtime/wait.rs`.
#[inline]
pub fn perform_default_block_wait() {
wait::perform_default_block_wait()
}
/// Forwards to `wait::perform_default_consume_sleep_wait()`.
///
/// NOTE(review): presumably waits for the default consume-sleep duration —
/// confirm against `runtime/wait.rs`.
#[inline]
pub fn perform_default_consume_sleep_wait() {
wait::perform_default_consume_sleep_wait()
}
/// Forwards to `wait::perform_default_discovery_poll_wait()`.
///
/// NOTE(review): presumably waits for the default discovery-poll duration —
/// confirm against `runtime/wait.rs`.
#[inline]
pub fn perform_default_discovery_poll_wait() {
wait::perform_default_discovery_poll_wait()
}
/// Forwards `duration` to `wait::perform_sleep_wait`.
///
/// NOTE(review): presumably blocks the calling thread for `duration` —
/// confirm against `runtime/wait.rs`.
#[inline]
pub fn perform_sleep_wait(duration: std::time::Duration) {
wait::perform_sleep_wait(duration)
}
pub use crate::mmap_barrier::MmapConsumerBarrier;
pub use crate::mmap_consumer::MmapConsumer;
pub use crate::mmap_cursor::MmapCursor;
pub use crate::mmap_producer::MmapProducer;
pub use crate::mmap_ringbuffer::MmapRingBuffer;
pub use crate::mmap_transport::MmapTransportLayout;
pub use builder::{
attach_shared_consumer, build_shared_single_producer, AutoConsumer, AutoWaitStrategy,
SharedDisruptorBuilder,
};
pub use consumer::{ConsumerCounterSelection, SharedConsumer};
pub use consumer_barrier::{ConsumerBarrier, DiscoveryMode, SharedConsumerBarrier};
pub use cursor::{SharedCursor, SharedCursorTrait};
pub use producer::{CoordinationMode, ProducerCounterSelection, SharedProducer};
pub use ringbuffer::SharedRingBuffer;
pub use shared_memory::ShmRingBuffer;
use std::{fmt, path::PathBuf};
/// Default consumer limit (64).
///
/// NOTE(review): no use of this constant is visible in this section;
/// presumably the default cap on consumers per shared transport — confirm.
pub const DEFAULT_MAX_CONSUMERS: usize = 64;
/// Errors reported by the multi-process transport.
///
/// `Display` output comes from the `#[error(...)]` attribute on each variant;
/// variants holding a `String` interpolate it as `{0}`.
#[derive(Debug, thiserror::Error)]
pub enum MultiProcessError {
/// Shared memory could not be created; the payload is a free-form detail string.
#[error("Failed to create shared memory: {0}")]
SharedMemoryError(String),
/// Memory could not be mapped; the payload is a free-form detail string.
#[error("Failed to map memory: {0}")]
MemoryMapError(String),
/// The requested shared segment does not exist. The tests in this file
/// expect this variant when a consumer is built against a missing segment.
#[error("Shared segment not found: {0}")]
SegmentNotFound(String),
/// The data layout is incompatible; the payload describes the mismatch.
#[error("Incompatible data layout: {0}")]
IncompatibleLayout(String),
/// Access was denied; carries no further detail.
#[error("Permission denied")]
PermissionDenied,
/// A coordination step timed out; the payload is a free-form detail string.
#[error("Coordination timeout: {0}")]
CoordinationTimeout(String),
}
/// `Result` alias whose error type is [`MultiProcessError`].
pub type MultiProcessResult<T> = Result<T, MultiProcessError>;
/// Parameters describing a shared-memory ring buffer segment.
#[derive(Debug, Clone)]
pub struct SharedMemoryConfig {
    /// Segment name.
    pub name: String,
    /// Ring buffer size; the tests in this file compare it against
    /// `SharedRingBuffer::size()`.
    pub buffer_size: usize,
    /// Size of one element; the tests pass `std::mem::size_of::<E>()`.
    pub element_size: usize,
    /// The tests set this to `true` when creating a segment and `false` when
    /// attaching to an existing one.
    pub create: bool,
}

impl fmt::Display for SharedMemoryConfig {
    /// Writes `SharedMemory(name=…, size=…, element_size=…)`; the `create`
    /// flag is not part of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            name,
            buffer_size,
            element_size,
            ..
        } = self;
        write!(
            f,
            "SharedMemory(name={}, size={}, element_size={})",
            name, buffer_size, element_size
        )
    }
}
/// Parameters describing a file-backed (mmap) ring buffer.
///
/// NOTE(review): no consumer of this type is visible in this section; the
/// field notes below are assumptions drawn from the names — confirm.
#[derive(Debug, Clone)]
pub struct MmapFileConfig {
    /// Path of the backing file.
    pub path: PathBuf,
    /// Ring buffer size (presumably in slots, as for `SharedMemoryConfig`).
    pub buffer_size: usize,
    /// Size of one element (presumably in bytes).
    pub element_size: usize,
    /// Presumably `true` to create the file and `false` to open an existing one.
    pub create: bool,
}

impl fmt::Display for MmapFileConfig {
    /// Writes `MmapFile(path=…, size=…, element_size=…)`; the `create` flag
    /// is not part of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            path,
            buffer_size,
            element_size,
            ..
        } = self;
        write!(
            f,
            "MmapFile(path={}, size={}, element_size={})",
            path.display(),
            buffer_size,
            element_size
        )
    }
}
/// Parameters describing a file-backed (mmap) cursor.
///
/// NOTE(review): no consumer of this type is visible in this section; the
/// field notes below are assumptions drawn from the names — confirm.
#[derive(Debug, Clone)]
pub struct MmapCursorConfig {
    /// Path of the backing file.
    pub path: PathBuf,
    /// Presumably `true` to create the file and `false` to open an existing one.
    pub create: bool,
}

impl fmt::Display for MmapCursorConfig {
    /// Writes `MmapCursor(path=…)`; the `create` flag is not part of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let location = self.path.display();
        write!(f, "MmapCursor(path={})", location)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::MissingFreeSlots;
use crate::RequiredConsumerError;
use crate::RequiredConsumerLivenessConfig;
use crate::RingBufferFull;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
/// Builds a segment name from `prefix` via `crate::portable_shm_segment_name`
/// and panics if the result is longer than 14 bytes.
fn unique_test_segment(prefix: &str) -> String {
    let segment = crate::portable_shm_segment_name(prefix);
    let within_budget = segment.len() <= 14;
    assert!(
        within_budget,
        "test segment name '{}' exceeds macOS-safe budget",
        segment
    );
    segment
}
/// Long prefixes must still produce names of at most 14 bytes.
#[test]
fn test_unique_test_segment_stays_within_macos_budget() {
    let prefixes = ["process_available_blocking_batch", "race_condition_fix"];
    for prefix in prefixes {
        assert!(unique_test_segment(prefix).len() <= 14);
    }
}
/// Event payload used by the tests in this module.
#[derive(Debug, Copy, Clone, Default, PartialEq)]
struct TestEvent {
// Set by each test's publish closure; many tests store the publish index here.
sequence: i64,
// Arbitrary payload value that tests write and later assert on.
data: i64,
}
/// Builds a `SharedConsumer<TestEvent>` with the given `consumer_id` against
/// segment `name`, using `create: false`. Panics if the builder returns an error.
fn attach_named_consumer(
    name: &str,
    buffer_size: usize,
    consumer_id: &str,
) -> SharedConsumer<TestEvent> {
    let element_size = std::mem::size_of::<TestEvent>();
    let attach_config = SharedMemoryConfig {
        name: name.to_owned(),
        buffer_size,
        element_size,
        create: false,
    };
    SharedDisruptorBuilder::new(attach_config)
        .with_consumer_id(consumer_id)
        .build_consumer()
        .unwrap()
}
/// Requires `c1` and `c2` but attaches only `c1`; the managed publish must
/// fail with `StartupTimeout` listing exactly `c2` as missing.
#[test]
fn managed_publish_reports_missing_required_consumer_at_startup() {
    let segment = unique_test_segment("req_cons_start");
    let capacity = 8;
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, capacity)
        .enable_discovery(2)
        .with_coordination(CoordinationMode::Immediate)
        .build_producer(TestEvent::default)
        .unwrap();

    let liveness = RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
        .with_startup_wait_timeout(Duration::from_millis(50))
        .with_progress_timeout(Duration::from_millis(20))
        .with_progress_check_interval(Duration::from_millis(1))
        .with_shutdown_grace_period(Duration::from_millis(10));
    producer.enable_required_consumer_liveness(liveness);

    // Only c1 ever attaches; the binding keeps it alive for the rest of the test.
    let _consumer1 = attach_named_consumer(&segment, capacity, "c1");

    let outcome = producer.publish_managed(|event| {
        event.sequence = 1;
        event.data = 100;
    });
    match outcome.expect_err("managed publish should fail when c2 never appears") {
        RequiredConsumerError::StartupTimeout { missing } => {
            assert_eq!(missing, vec!["c2".to_string()]);
        }
        other => panic!("unexpected error: {other:?}"),
    }
}
/// Batch variant of the startup check: with only `c1` attached, a managed
/// batch publish must fail with `StartupTimeout` listing exactly `c2`.
#[test]
fn managed_batch_publish_reports_missing_required_consumer_at_startup() {
    let segment = unique_test_segment("req_cons_batch_start");
    let capacity = 8;
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, capacity)
        .enable_discovery(2)
        .with_coordination(CoordinationMode::Immediate)
        .build_producer(TestEvent::default)
        .unwrap();

    let liveness = RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
        .with_startup_wait_timeout(Duration::from_millis(50))
        .with_progress_timeout(Duration::from_millis(20))
        .with_progress_check_interval(Duration::from_millis(1))
        .with_shutdown_grace_period(Duration::from_millis(10));
    producer.enable_required_consumer_liveness(liveness);

    // Only c1 ever attaches; the binding keeps it alive for the rest of the test.
    let _consumer1 = attach_named_consumer(&segment, capacity, "c1");

    let outcome = producer.publish_batch_managed(2, |event, index| {
        event.sequence = index as i64;
        event.data = (index as i64) * 10;
    });
    match outcome.expect_err("managed batch publish should fail when c2 never appears") {
        RequiredConsumerError::StartupTimeout { missing } => {
            assert_eq!(missing, vec!["c2".to_string()]);
        }
        other => panic!("unexpected error: {other:?}"),
    }
}
/// `c1` keeps consuming on a background thread while `c2` consumes one event
/// and is then dropped. After `buffer_size` further publishes, the next
/// managed publish is expected to fail with `GracefulShutdownTriggered`
/// naming `c2`.
#[test]
fn managed_publish_shuts_down_when_required_consumer_stalls() {
let name = unique_test_segment("req_cons_fail");
let buffer_size = 4;
let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
.enable_discovery(2)
.with_coordination(CoordinationMode::Immediate)
.build_producer(TestEvent::default)
.unwrap();
// Short progress timeout (20 ms) and grace period (20 ms) keep the test fast.
producer.enable_required_consumer_liveness(
RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
.with_startup_wait_timeout(Duration::from_millis(100))
.with_progress_timeout(Duration::from_millis(20))
.with_progress_check_interval(Duration::from_millis(1))
.with_shutdown_grace_period(Duration::from_millis(20)),
);
// c1 drains events on its own thread until the stop flag is set.
let stop_consumer1 = Arc::new(AtomicBool::new(false));
let stop_consumer1_thread = Arc::clone(&stop_consumer1);
let name_for_thread = name.clone();
let consumer1_thread = thread::spawn(move || {
let mut consumer1 = attach_named_consumer(&name_for_thread, buffer_size, "c1");
while !stop_consumer1_thread.load(Ordering::Acquire) {
if consumer1.try_consume_next().is_none() {
thread::sleep(Duration::from_millis(1));
}
}
});
let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
producer
.publish_managed(|event| {
event.sequence = 0;
event.data = 0;
})
.unwrap();
// c2 consumes the first event and is then dropped, so it stops advancing.
let _ = consumer2.consume_next();
drop(consumer2);
// Publish `buffer_size` more events; these are all expected to succeed.
for i in 1..=buffer_size {
producer
.publish_managed(|event| {
event.sequence = i as i64;
event.data = (i as i64) * 10;
})
.unwrap();
}
// The following publish is the one expected to report the stall.
let error = producer
.publish_managed(|event| {
event.sequence = 99;
event.data = 990;
})
.expect_err("managed publish should fail once c2 stops advancing");
// Stop and join c1 before asserting on the error.
stop_consumer1.store(true, Ordering::Release);
consumer1_thread.join().unwrap();
match error {
RequiredConsumerError::GracefulShutdownTriggered { consumer_id, .. } => {
assert_eq!(consumer_id, "c2");
}
other => panic!("unexpected error: {other:?}"),
}
}
/// `c2` is dropped after one event and a new consumer with the same id `c2`
/// attaches 40 ms later. That is after the 20 ms progress timeout but inside
/// the 200 ms grace period, so the pending managed publish is expected to succeed.
#[test]
fn managed_publish_recovers_when_same_consumer_id_rejoins() {
let name = unique_test_segment("req_cons_rejn");
let buffer_size = 4;
let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
.enable_discovery(2)
.with_coordination(CoordinationMode::Immediate)
.build_producer(TestEvent::default)
.unwrap();
producer.enable_required_consumer_liveness(
RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
.with_startup_wait_timeout(Duration::from_millis(100))
.with_progress_timeout(Duration::from_millis(20))
.with_progress_check_interval(Duration::from_millis(1))
.with_shutdown_grace_period(Duration::from_millis(200)),
);
// c1 drains events on its own thread until the stop flag is set.
let stop_consumer1 = Arc::new(AtomicBool::new(false));
let stop_consumer1_thread = Arc::clone(&stop_consumer1);
let name_for_thread = name.clone();
let consumer1_thread = thread::spawn(move || {
let mut consumer1 = attach_named_consumer(&name_for_thread, buffer_size, "c1");
while !stop_consumer1_thread.load(Ordering::Acquire) {
if consumer1.try_consume_next().is_none() {
thread::sleep(Duration::from_millis(1));
}
}
});
let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
producer
.publish_managed(|event| {
event.sequence = 0;
event.data = 0;
})
.unwrap();
// c2 consumes the first event and is then dropped, so it stops advancing.
let _ = consumer2.consume_next();
drop(consumer2);
for i in 1..=buffer_size {
producer
.publish_managed(|event| {
event.sequence = i as i64;
event.data = (i as i64) * 10;
})
.unwrap();
}
// After 40 ms a consumer with the same id attaches and consumes for up to
// 500 ms, stopping early after `buffer_size + 2` events. It returns its count.
let name_for_rejoin = name.clone();
let rejoin_thread = thread::spawn(move || {
thread::sleep(Duration::from_millis(40));
let mut rejoined = attach_named_consumer(&name_for_rejoin, buffer_size, "c2");
let deadline = Instant::now() + Duration::from_millis(500);
let mut consumed = 0usize;
while Instant::now() < deadline && consumed < buffer_size + 2 {
if rejoined.try_consume_next().is_some() {
consumed += 1;
} else {
thread::sleep(Duration::from_millis(1));
}
}
consumed
});
// This publish runs while the rejoin thread is still in its initial sleep.
let sequence = producer
.publish_managed(|event| {
event.sequence = 99;
event.data = 990;
})
.expect("same-id rejoin should recover before shutdown");
stop_consumer1.store(true, Ordering::Release);
consumer1_thread.join().unwrap();
let rejoined_consumed = rejoin_thread.join().unwrap();
assert!(sequence >= buffer_size as i64);
assert!(rejoined_consumed > 0, "rejoined c2 should consume backlog");
}
/// `c2` is dropped after one event and a consumer with a different id (`c3`)
/// attaches 40 ms later. The pending managed publish is still expected to
/// fail with `GracefulShutdownTriggered` naming `c2`, even though `c3`
/// consumes events.
#[test]
fn managed_publish_rejects_different_consumer_id_rejoin() {
let name = unique_test_segment("req_cons_diff");
let buffer_size = 4;
let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
.enable_discovery(2)
.with_coordination(CoordinationMode::Immediate)
.build_producer(TestEvent::default)
.unwrap();
producer.enable_required_consumer_liveness(
RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
.with_startup_wait_timeout(Duration::from_millis(100))
.with_progress_timeout(Duration::from_millis(20))
.with_progress_check_interval(Duration::from_millis(1))
.with_shutdown_grace_period(Duration::from_millis(200)),
);
// c1 drains events on its own thread until the stop flag is set.
let stop_consumer1 = Arc::new(AtomicBool::new(false));
let stop_consumer1_thread = Arc::clone(&stop_consumer1);
let name_for_thread = name.clone();
let consumer1_thread = thread::spawn(move || {
let mut consumer1 = attach_named_consumer(&name_for_thread, buffer_size, "c1");
while !stop_consumer1_thread.load(Ordering::Acquire) {
if consumer1.try_consume_next().is_none() {
thread::sleep(Duration::from_millis(1));
}
}
});
let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
producer
.publish_managed(|event| {
event.sequence = 0;
event.data = 0;
})
.unwrap();
// c2 consumes the first event and is then dropped, so it stops advancing.
let _ = consumer2.consume_next();
drop(consumer2);
for i in 1..=buffer_size {
producer
.publish_managed(|event| {
event.sequence = i as i64;
event.data = (i as i64) * 10;
})
.unwrap();
}
// After 40 ms a consumer with id "c3" (not the required "c2") attaches and
// consumes for up to 500 ms, stopping early after `buffer_size + 2` events.
let name_for_wrong_rejoin = name.clone();
let wrong_rejoin_thread = thread::spawn(move || {
thread::sleep(Duration::from_millis(40));
let mut wrong_consumer =
attach_named_consumer(&name_for_wrong_rejoin, buffer_size, "c3");
let deadline = Instant::now() + Duration::from_millis(500);
let mut consumed = 0usize;
while Instant::now() < deadline && consumed < buffer_size + 2 {
if wrong_consumer.try_consume_next().is_some() {
consumed += 1;
} else {
thread::sleep(Duration::from_millis(1));
}
}
consumed
});
let error = producer
.publish_managed(|event| {
event.sequence = 99;
event.data = 990;
})
.expect_err("wrong-id rejoin must not clear the c2 stall");
stop_consumer1.store(true, Ordering::Release);
consumer1_thread.join().unwrap();
let wrong_rejoin_consumed = wrong_rejoin_thread.join().unwrap();
// c3 is still expected to have read events, although it does not count as c2.
assert!(
wrong_rejoin_consumed > 0,
"a new consumer id may still read the broadcast stream"
);
match error {
RequiredConsumerError::GracefulShutdownTriggered { consumer_id, .. } => {
assert_eq!(consumer_id, "c2");
}
other => panic!("unexpected error: {other:?}"),
}
}
/// `c2` is dropped after one event and a same-id consumer attaches only
/// after 140 ms, which is longer than the 20 ms progress timeout plus the
/// 60 ms grace period. The pending managed publish is expected to fail with
/// `GracefulShutdownTriggered` naming `c2`.
#[test]
fn managed_publish_rejoin_after_grace_period_still_fails() {
let name = unique_test_segment("req_cons_late");
let buffer_size = 4;
let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
.enable_discovery(2)
.with_coordination(CoordinationMode::Immediate)
.build_producer(TestEvent::default)
.unwrap();
producer.enable_required_consumer_liveness(
RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
.with_startup_wait_timeout(Duration::from_millis(100))
.with_progress_timeout(Duration::from_millis(20))
.with_progress_check_interval(Duration::from_millis(1))
.with_shutdown_grace_period(Duration::from_millis(60)),
);
// c1 drains events on its own thread until the stop flag is set.
let stop_consumer1 = Arc::new(AtomicBool::new(false));
let stop_consumer1_thread = Arc::clone(&stop_consumer1);
let name_for_thread = name.clone();
let consumer1_thread = thread::spawn(move || {
let mut consumer1 = attach_named_consumer(&name_for_thread, buffer_size, "c1");
while !stop_consumer1_thread.load(Ordering::Acquire) {
if consumer1.try_consume_next().is_none() {
thread::sleep(Duration::from_millis(1));
}
}
});
let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
producer
.publish_managed(|event| {
event.sequence = 0;
event.data = 0;
})
.unwrap();
// c2 consumes the first event and is then dropped, so it stops advancing.
let _ = consumer2.consume_next();
drop(consumer2);
for i in 1..=buffer_size {
producer
.publish_managed(|event| {
event.sequence = i as i64;
event.data = (i as i64) * 10;
})
.unwrap();
}
// The same-id consumer attaches after 140 ms and consumes for up to
// 300 ms, stopping early after `buffer_size + 2` events.
let name_for_rejoin = name.clone();
let rejoin_thread = thread::spawn(move || {
thread::sleep(Duration::from_millis(140));
let mut rejoined = attach_named_consumer(&name_for_rejoin, buffer_size, "c2");
let deadline = Instant::now() + Duration::from_millis(300);
let mut consumed = 0usize;
while Instant::now() < deadline && consumed < buffer_size + 2 {
if rejoined.try_consume_next().is_some() {
consumed += 1;
} else {
thread::sleep(Duration::from_millis(1));
}
}
consumed
});
let error = producer
.publish_managed(|event| {
event.sequence = 99;
event.data = 990;
})
.expect_err("late same-id rejoin must not rescue the topology after grace expires");
stop_consumer1.store(true, Ordering::Release);
consumer1_thread.join().unwrap();
let rejoined_consumed = rejoin_thread.join().unwrap();
// The late consumer is still expected to read events that remain in the buffer.
assert!(
rejoined_consumed > 0,
"late rejoined consumer may still drain retained backlog"
);
match error {
RequiredConsumerError::GracefulShutdownTriggered { consumer_id, .. } => {
assert_eq!(consumer_id, "c2");
}
other => panic!("unexpected error: {other:?}"),
}
}
/// Both required consumers are caught up, then nothing is published for
/// 75 ms (longer than the 20 ms progress timeout plus the 50 ms grace
/// period). The next managed publish must still succeed.
#[test]
fn managed_publish_does_not_fail_while_topology_is_idle() {
    let segment = unique_test_segment("req_cons_idle");
    let capacity = 8;
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, capacity)
        .enable_discovery(2)
        .with_coordination(CoordinationMode::Immediate)
        .build_producer(TestEvent::default)
        .unwrap();

    let liveness = RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
        .with_startup_wait_timeout(Duration::from_millis(100))
        .with_progress_timeout(Duration::from_millis(20))
        .with_progress_check_interval(Duration::from_millis(1))
        .with_shutdown_grace_period(Duration::from_millis(50));
    producer.enable_required_consumer_liveness(liveness);

    let mut first = attach_named_consumer(&segment, capacity, "c1");
    let mut second = attach_named_consumer(&segment, capacity, "c2");

    // First event: both consumers read sequence 0.
    producer
        .publish_managed(|event| {
            event.sequence = 1;
            event.data = 10;
        })
        .unwrap();
    assert_eq!(first.consume_next().0, 0);
    assert_eq!(second.consume_next().0, 0);

    // Idle gap with nothing outstanding for either consumer.
    thread::sleep(Duration::from_millis(75));

    // Second event: must publish and be read as sequence 1 by both.
    producer
        .publish_managed(|event| {
            event.sequence = 2;
            event.data = 20;
        })
        .expect("idle topology must not trigger stall shutdown");
    assert_eq!(first.consume_next().0, 1);
    assert_eq!(second.consume_next().0, 1);
}
/// Creates a ring buffer, attaches a second handle to the same segment, and
/// checks that both report the configured size.
#[test]
fn test_shared_ring_buffer_creation_and_attachment() {
    let segment = unique_test_segment("test_ring_basic");
    let capacity = 8;

    let creator = SharedRingBuffer::new(
        SharedMemoryConfig {
            name: segment.clone(),
            buffer_size: capacity,
            element_size: std::mem::size_of::<TestEvent>(),
            create: true,
        },
        TestEvent::default,
    )
    .unwrap();
    assert_eq!(creator.size(), capacity);

    let attached: SharedRingBuffer<TestEvent> = SharedRingBuffer::attach(SharedMemoryConfig {
        name: segment,
        buffer_size: capacity,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    })
    .unwrap();
    assert_eq!(attached.size(), capacity);
    assert_eq!(creator.size(), attached.size());
}
/// Publishes one event and checks that a single `try_consume_next` call
/// yields it with sequence 0 and the published payload.
#[test]
fn test_basic_producer_consumer_coordination() {
    let segment = unique_test_segment("test_basic_coord");
    let capacity = 8;
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, capacity)
        .build_producer(TestEvent::default)
        .unwrap();

    let attach_config = SharedMemoryConfig {
        name: segment,
        buffer_size: capacity,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    };
    let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(attach_config)
        .build_consumer()
        .unwrap();

    producer.publish(|event| {
        event.sequence = 0;
        event.data = 42;
    });

    // One attempt only: the vector holds zero or one `(sequence, event)` pair.
    let consumed_events: Vec<_> = consumer.try_consume_next().into_iter().collect();
    assert_eq!(consumed_events.len(), 1);

    let first = &consumed_events[0];
    assert_eq!(first.0, 0);
    assert_eq!(first.1.sequence, 0);
    assert_eq!(first.1.data, 42);
}
/// Fills the buffer with `try_publish`, checks that one more attempt returns
/// `RingBufferFull`, and that publishing succeeds again after the consumer
/// thread has processed the buffered events.
///
/// The threads are ordered only by sleeps: the consumer attaches 50 ms after
/// it is spawned and processes 200 ms after attaching, while the producer
/// waits 100 ms after being built before it publishes.
#[test]
fn test_spsc_ring_buffer_full_behavior() {
let name = unique_test_segment("spsc_full");
let buffer_size = 4;
let name_clone = name.clone();
let consumer_handle = thread::spawn(move || {
let config = SharedMemoryConfig {
name: name_clone,
buffer_size,
element_size: std::mem::size_of::<TestEvent>(),
create: false,
};
// Delay attaching; the main thread builds the producer in the meantime.
thread::sleep(Duration::from_millis(50));
let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
.build_consumer()
.unwrap();
// Delay consuming so the producer can fill the buffer and see it full.
thread::sleep(Duration::from_millis(200));
let mut consumed = Vec::new();
let processed = consumer.process_available(|event: &TestEvent, _| {
consumed.push(event.data);
});
(processed, consumed)
});
let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
.enable_discovery(1) .build_producer(TestEvent::default)
.unwrap();
// Wait past the consumer's 50 ms attach delay before publishing.
thread::sleep(Duration::from_millis(100));
for i in 0..buffer_size {
producer
.try_publish(|e| {
e.sequence = i as i64;
e.data = i as i64 * 10;
})
.expect("Should be able to publish to non-full buffer");
}
// The buffer now holds `buffer_size` unconsumed events, so this attempt must fail.
assert_eq!(
producer
.try_publish(|e| e.sequence = buffer_size as i64)
.err()
.unwrap(),
RingBufferFull
);
// Joining waits for the consumer's single `process_available` call.
let (processed, consumed) = consumer_handle.join().unwrap();
assert_eq!(processed, buffer_size);
producer
.try_publish(|e| {
e.sequence = buffer_size as i64;
e.data = 999;
})
.expect("Should be able to publish after consumer freed space");
let expected: Vec<i64> = (0..buffer_size).map(|i| i as i64 * 10).collect();
assert_eq!(consumed, expected);
}
/// Publishes ten events and checks that the consumer sees them in order,
/// with matching ring sequence, event sequence and payload (`i * i`).
#[test]
fn test_spsc_ordered_event_processing() {
    let segment = unique_test_segment("spsc_ordered");
    let capacity = 16;
    let num_events = 10;
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, capacity)
        .build_producer(TestEvent::default)
        .unwrap();

    let attach_config = SharedMemoryConfig {
        name: segment,
        buffer_size: capacity,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    };
    let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(attach_config)
        .build_consumer()
        .unwrap();

    for i in 0..num_events {
        let value = i as i64;
        producer.publish(|event| {
            event.sequence = value;
            event.data = value * value;
        });
    }

    // Keep processing until every published event has been seen, yielding
    // the thread whenever a pass finds nothing.
    let mut consumed_events = Vec::new();
    let mut total_processed = 0;
    while total_processed < num_events {
        let batch = consumer.process_available(|event: &TestEvent, seq| {
            consumed_events.push((seq, event.sequence, event.data));
        });
        match batch {
            0 => thread::yield_now(),
            count => total_processed += count,
        }
    }

    assert_eq!(consumed_events.len(), num_events);
    for (index, &(seq, event_seq, data)) in consumed_events.iter().enumerate() {
        let expected = index as i64;
        assert_eq!(seq, expected);
        assert_eq!(event_seq, expected);
        assert_eq!(data, expected * expected);
    }
}
/// After one `process_available` call drains six events, the consumer's
/// current, producer and consumer sequences must all equal the last
/// published sequence.
#[test]
fn test_process_available_advances_consumer_sequence_after_batch() {
    let segment = unique_test_segment("process_available_batch");
    let capacity = 16;
    let num_events = 6;
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, capacity)
        .build_producer(TestEvent::default)
        .unwrap();

    let attach_config = SharedMemoryConfig {
        name: segment,
        buffer_size: capacity,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    };
    let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(attach_config)
        .build_consumer()
        .unwrap();

    for i in 0..num_events {
        producer.publish(|event| {
            event.sequence = i as i64;
            event.data = i as i64 * 10;
        });
    }

    let mut seen = Vec::new();
    let processed = consumer.process_available(|event: &TestEvent, seq| {
        seen.push((seq, event.sequence, event.data));
    });
    assert_eq!(processed, num_events);
    assert_eq!(seen.len(), num_events);

    let last_sequence = (num_events - 1) as i64;
    assert_eq!(consumer.current_sequence(), last_sequence);
    assert_eq!(consumer.producer_sequence(), last_sequence);
    assert_eq!(consumer.consumer_sequence(), last_sequence);
}
/// With four events already published, `process_available_blocking` must
/// pass `end_of_batch == true` for the last event only.
#[test]
fn test_process_available_blocking_marks_only_final_event_as_end_of_batch() {
    let segment = unique_test_segment("process_available_blocking_batch");
    let capacity = 16;
    let num_events = 4;
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, capacity)
        .build_producer(TestEvent::default)
        .unwrap();

    let attach_config = SharedMemoryConfig {
        name: segment,
        buffer_size: capacity,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    };
    let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(attach_config)
        .build_consumer()
        .unwrap();

    for i in 0..num_events {
        producer.publish(|event| {
            event.sequence = i as i64;
            event.data = i as i64;
        });
    }

    let mut observed = Vec::new();
    let processed =
        consumer.process_available_blocking(|event: &TestEvent, seq, end_of_batch| {
            observed.push((seq, event.sequence, end_of_batch));
        });
    assert_eq!(processed, num_events);

    let expected = vec![(0, 0, false), (1, 1, false), (2, 2, false), (3, 3, true)];
    assert_eq!(observed, expected);
    assert_eq!(consumer.current_sequence(), (num_events - 1) as i64);
}
/// Two consumers attached to the same segment must each see every published
/// event, in order. Sequence state is printed before and after processing.
#[test]
fn test_per_consumer_sequences_prevent_race_conditions() {
    let segment = unique_test_segment("per_consumer_test");
    let capacity = 64;
    let num_events = 10;
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, capacity)
        .build_producer(TestEvent::default)
        .unwrap();

    let attach_config = SharedMemoryConfig {
        name: segment.clone(),
        buffer_size: capacity,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    };
    let mut consumer1: SharedConsumer<TestEvent> =
        SharedDisruptorBuilder::new(attach_config.clone())
            .build_consumer()
            .unwrap();
    let mut consumer2: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(attach_config)
        .build_consumer()
        .unwrap();
    println!("Created two consumers for broadcast test");

    for i in 0..num_events {
        producer.publish(|event| {
            event.sequence = i as i64;
            event.data = i as i64;
        });
        println!("Published event {}", i);
    }

    let (current1, producer1, own1) = consumer1.debug_sequences();
    let (current2, producer2, own2) = consumer2.debug_sequences();
    println!(
        "After publishing - Consumer 1: current={}, producer={}, consumer={}",
        current1, producer1, own1
    );
    println!(
        "After publishing - Consumer 2: current={}, producer={}, consumer={}",
        current2, producer2, own2
    );

    let mut consumer1_events = Vec::new();
    let mut consumer2_events = Vec::new();
    let _processed1 = consumer1.process_available(|event: &TestEvent, _seq| {
        consumer1_events.push((event.sequence, event.data));
        println!(
            "Consumer 1 processed event: seq={}, data={}",
            event.sequence, event.data
        );
    });
    let _processed2 = consumer2.process_available(|event: &TestEvent, _seq| {
        consumer2_events.push((event.sequence, event.data));
        println!(
            "Consumer 2 processed event: seq={}, data={}",
            event.sequence, event.data
        );
    });
    println!(
        "Consumer 1 processed {} events: {:?}",
        consumer1_events.len(),
        consumer1_events
    );
    println!(
        "Consumer 2 processed {} events: {:?}",
        consumer2_events.len(),
        consumer2_events
    );

    let (current1, producer1, own1) = consumer1.debug_sequences();
    let (current2, producer2, own2) = consumer2.debug_sequences();
    println!(
        "After processing - Consumer 1: current={}, producer={}, consumer={}",
        current1, producer1, own1
    );
    println!(
        "After processing - Consumer 2: current={}, producer={}, consumer={}",
        current2, producer2, own2
    );

    assert_eq!(
        consumer1_events.len(),
        num_events,
        "Consumer 1 should see all events"
    );
    assert_eq!(
        consumer2_events.len(),
        num_events,
        "Consumer 2 should see all events"
    );
    // Both vectors hold exactly `num_events` entries here, so zipping them
    // visits every index once.
    for (index, (first, second)) in consumer1_events.iter().zip(&consumer2_events).enumerate() {
        let expected = (index as i64, index as i64);
        assert_eq!(*first, expected);
        assert_eq!(*second, expected);
    }
    println!("SUCCESS: Both consumers saw all events (broadcast semantics)!");
}
/// Two consumers on the same segment must each see all five published
/// events in order.
#[test]
fn test_broadcast_consumer_basic() {
    let segment = unique_test_segment("broadcast_basic");
    let capacity = 64;
    let num_events = 5;
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, capacity)
        .build_producer(TestEvent::default)
        .unwrap();

    let attach_config = SharedMemoryConfig {
        name: segment.clone(),
        buffer_size: capacity,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    };
    let mut consumer1: SharedConsumer<TestEvent> =
        SharedDisruptorBuilder::new(attach_config.clone())
            .build_consumer()
            .unwrap();
    let mut consumer2: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(attach_config)
        .build_consumer()
        .unwrap();
    println!("Created two consumers for basic broadcast test");
    println!(
        "Consumer 1 ID: {}, Consumer 2 ID: {}",
        consumer1.consumer_id(),
        consumer2.consumer_id()
    );

    for i in 0..num_events {
        producer.publish(|event| {
            event.sequence = i as i64;
            event.data = i as i64;
        });
    }

    let mut consumer1_events = Vec::new();
    let mut consumer2_events = Vec::new();
    consumer1.process_available(|event: &TestEvent, _seq| {
        consumer1_events.push(event.sequence);
    });
    consumer2.process_available(|event: &TestEvent, _seq| {
        consumer2_events.push(event.sequence);
    });
    println!("Consumer 1 processed: {:?}", consumer1_events);
    println!("Consumer 2 processed: {:?}", consumer2_events);

    assert_eq!(
        consumer1_events.len(),
        num_events,
        "Consumer 1 should see all events"
    );
    assert_eq!(
        consumer2_events.len(),
        num_events,
        "Consumer 2 should see all events"
    );
    // Both vectors hold exactly `num_events` entries here, so zipping them
    // visits every index once.
    for (index, (first, second)) in consumer1_events.iter().zip(&consumer2_events).enumerate() {
        assert_eq!(*first, index as i64);
        assert_eq!(*second, index as i64);
    }
    println!("Both consumers saw all events in order!");
}
/// Exercises `load`, `store`, `fetch_add` and `compare_exchange` on a
/// `SharedCursor`, covering both a successful and a failed exchange.
#[test]
fn test_shared_cursor_operations() {
    let segment = unique_test_segment("atomic_ops");
    let cursor = SharedCursor::new(&segment, 0).unwrap();
    let relaxed = Ordering::Relaxed;

    assert_eq!(cursor.load(relaxed), 0);

    cursor.store(42, relaxed);
    assert_eq!(cursor.load(relaxed), 42);

    // `fetch_add` returns the previous value.
    let previous = cursor.fetch_add(8, relaxed);
    assert_eq!(previous, 42);
    assert_eq!(cursor.load(relaxed), 50);

    // Expected value matches: the exchange succeeds.
    let swapped = cursor.compare_exchange(50, 100, relaxed, relaxed);
    assert_eq!(swapped, Ok(50));
    assert_eq!(cursor.load(relaxed), 100);

    // Expected value is stale: the exchange fails and the value stays at 100.
    let rejected = cursor.compare_exchange(50, 200, relaxed, relaxed);
    assert_eq!(rejected, Err(100));
    assert_eq!(cursor.load(relaxed), 100);
}
/// Building a consumer against a segment named `nonexistent` must fail with
/// `MultiProcessError::SegmentNotFound`.
#[test]
fn test_consumer_attachment_to_nonexistent_segment() {
    let attach_config = SharedMemoryConfig {
        name: "nonexistent".to_string(),
        buffer_size: 8,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    };
    let result: Result<SharedConsumer<TestEvent>, MultiProcessError> =
        SharedDisruptorBuilder::new(attach_config).build_consumer();
    assert!(result.is_err());

    let error = result.err().unwrap();
    if !matches!(error, MultiProcessError::SegmentNotFound(_)) {
        panic!("Expected SegmentNotFound, got {:?}", error);
    }
}
// Placeholder test: the body is empty, so it always passes and asserts nothing.
// NOTE(review): any explanatory comments that once lived here are absent from
// this copy of the file — consider removing the test or restoring them.
#[test]
fn test_feature_completeness_documentation() {
}
/// A four-event `try_batch_publish` must return upper sequence 3, and a
/// consumer attached afterwards must read the four events in order.
#[test]
fn test_batch_publish_writes_and_consumes_in_sequence() {
    let segment = unique_test_segment("batch_sequence");
    let capacity = 8;
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, capacity)
        .build_producer(TestEvent::default)
        .unwrap();

    let upper = producer
        .try_batch_publish(4, |event, i| {
            event.sequence = i as i64;
            event.data = 100 + i as i64;
        })
        .unwrap();
    assert_eq!(upper, 3);
    assert_eq!(producer.last_published_sequence(), 3);

    let attach_config = SharedMemoryConfig {
        name: segment,
        buffer_size: capacity,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    };
    let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(attach_config)
        .build_consumer()
        .unwrap();

    // Keep processing until all four events are collected, yielding the
    // thread whenever a pass finds nothing.
    let mut seen = Vec::new();
    while seen.len() < 4 {
        let batch = consumer.process_available(|event: &TestEvent, seq| {
            seen.push((seq, event.sequence, event.data));
        });
        if batch == 0 {
            std::thread::yield_now();
        }
    }

    let expected = vec![(0, 0, 100), (1, 1, 101), (2, 2, 102), (3, 3, 103)];
    assert_eq!(seen, expected);
}
/// A batch of zero events must not invoke the closure and must leave the
/// last published sequence at -1, for both batch publish entry points.
#[test]
fn test_simple_batch_publish_is_noop_for_zero() {
    let segment = unique_test_segment("batch_zero");
    let capacity = 8;
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, capacity)
        .build_producer(TestEvent::default)
        .unwrap();

    let empty_batch = producer.try_batch_publish(0, |_event, _| {
        panic!("indexed closure must not run for n=0")
    });
    assert_eq!(empty_batch, Ok(-1));

    let empty_simple_batch = producer.simple_batch_publish(0, |_event, _| {
        panic!("simple_batch_publish no-op closure must not run for n=0")
    });
    empty_simple_batch.unwrap();

    assert_eq!(producer.last_published_sequence(), -1);
}
/// Checks that `try_batch_publish` returns `MissingFreeSlots` while the ring is full
/// and accepts a batch again once the consumer has started draining.
///
/// A consumer thread is built with the `bchk` prefix but does not consume until
/// `start_consume` is set, so the producer can first fill all `buffer_size` slots.
#[test]
fn test_try_batch_publish_reports_missing_slots_when_full() {
    let name = unique_test_segment("batch_full");
    let buffer_size = 4;
    let start_consume = Arc::new(AtomicBool::new(false));
    let consumed = Arc::new(AtomicUsize::new(0));
    let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
        .discover_consumer_with_prefix(1, "bchk")
        .build_producer(TestEvent::default)
        .unwrap();

    let consumer_handle = {
        let name = name.clone();
        let start_consume = start_consume.clone();
        let consumed = consumed.clone();
        std::thread::spawn(move || {
            let config = SharedMemoryConfig {
                name,
                buffer_size,
                element_size: std::mem::size_of::<TestEvent>(),
                create: false,
            };
            let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
                .discover_consumer_with_prefix(1, "bchk")
                .with_consumer_id("bchk_0")
                .build_consumer()
                .unwrap();
            // Hold off consuming until the main thread has checked the "full" condition.
            while !start_consume.load(Ordering::Acquire) {
                std::thread::yield_now();
            }
            // Drain until the initial batch plus the one follow-up event were counted.
            while consumed.load(Ordering::Acquire) < (buffer_size + 1) {
                let processed = consumer.process_available(|_event, _| {
                    consumed.fetch_add(1, Ordering::AcqRel);
                });
                if processed == 0 {
                    std::thread::yield_now();
                }
            }
        })
    };

    // Fill every slot of the ring.
    producer
        .try_batch_publish(4, |event, i| {
            event.sequence = i as i64;
            event.data = 10 + i as i64;
        })
        .expect("full buffer should accept exactly `buffer_size` slots");

    // Wait until the minimum gating sequence differs from the producer cursor, which
    // this test treats as the sign that the paused consumer is gating the producer.
    let producer_seq = producer.last_published_sequence();
    let discovery_deadline = Instant::now() + Duration::from_secs(2);
    loop {
        if producer.min_gating_sequence() != producer_seq {
            break;
        }
        if Instant::now() > discovery_deadline {
            panic!("consumer discovery did not reduce gating sequence below producer cursor");
        }
        std::thread::yield_now();
    }

    // With the ring full and the consumer paused, one more slot must be refused.
    let err = producer
        .try_batch_publish(1, |_event, _| {
            panic!("second batch must not run when capacity is exhausted")
        })
        .expect_err("producer must report missing free slots when full");
    assert_eq!(err, MissingFreeSlots(1));

    start_consume.store(true, Ordering::Release);
    let deadline = Instant::now() + Duration::from_secs(2);
    while consumed.load(Ordering::Acquire) == 0 {
        if Instant::now() > deadline {
            panic!("consumer did not start consuming after start signal");
        }
        std::thread::yield_now();
    }

    // `consumed > 0` only shows that the consumer callback has run at least once.
    // NOTE(review): whether a slot is already released to the producer at that point
    // depends on when `process_available` publishes its progress, which is not visible
    // here. Retry until a deadline rather than requiring the first attempt to succeed.
    let publish_deadline = Instant::now() + Duration::from_secs(2);
    loop {
        match producer.try_batch_publish(1, |event, i| {
            event.sequence = 4 + i as i64;
            event.data = 14;
        }) {
            Ok(_) => break,
            Err(err) => {
                if Instant::now() > publish_deadline {
                    panic!(
                        "single-slot batch should succeed once one slot is released: {:?}",
                        err
                    );
                }
                std::thread::yield_now();
            }
        }
    }

    consumer_handle.join().unwrap();
    assert_eq!(consumed.load(Ordering::Acquire), buffer_size + 1);
}
/// Fills the ring, lets two consumers read it, then checks that both saw identical
/// data for the overlapping prefix and that the producer could not publish all ten
/// extra events via `try_publish`.
#[test]
fn test_fast_slow_consumer_race_condition_fix() {
    let segment = unique_test_segment("race_condition_fix");
    let buffer_size = 8;
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, buffer_size)
        .enable_discovery(2)
        .build_producer(TestEvent::default)
        .unwrap();

    let attach_config = SharedMemoryConfig {
        name: segment.clone(),
        buffer_size,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    };
    let mut fast_consumer: SharedConsumer<TestEvent> =
        SharedDisruptorBuilder::new(attach_config.clone())
            .build_consumer()
            .unwrap();
    let mut slow_consumer: SharedConsumer<TestEvent> =
        SharedDisruptorBuilder::new(attach_config)
            .build_consumer()
            .unwrap();

    // Fill every slot: event `i` carries sequence `i` and data `i * 100`.
    for slot in 0..buffer_size {
        let value = slot as i64;
        producer.publish(|event| {
            event.sequence = value;
            event.data = value * 100;
        });
    }
    println!("Published {} events to fill buffer", buffer_size);

    // The fast consumer records everything it is handed.
    let mut fast_events = Vec::new();
    fast_consumer.process_available(|event: &TestEvent, seq| {
        fast_events.push((seq, event.sequence, event.data));
    });

    // The slow consumer records only the first three events it is handed.
    // NOTE(review): the callback returns `()` and so cannot stop `process_available`
    // early; this consumer presumably still advances past every available event —
    // confirm that this models a "slow" consumer as intended.
    let mut slow_events = Vec::new();
    slow_consumer.process_available(|event: &TestEvent, seq| {
        if slow_events.len() >= 3 {
            return;
        }
        slow_events.push((seq, event.sequence, event.data));
    });

    println!("Fast consumer processed: {} events", fast_events.len());
    println!("Slow consumer processed: {} events", slow_events.len());
    println!("Fast consumer events: {:?}", fast_events);
    println!("Slow consumer events: {:?}", slow_events);

    // Try up to ten more publishes, stopping at the first refusal.
    let mut successful_publishes = 0;
    for i in buffer_size..(buffer_size + 10) {
        let outcome = producer.try_publish(|event| {
            event.sequence = i as i64;
            event.data = i as i64 * 100;
        });
        if outcome.is_err() {
            println!(
                "Buffer full at event {} - producer correctly blocked by slow consumer",
                i
            );
            break;
        }
        successful_publishes += 1;
        println!("Successfully published event {} (data: {})", i, i * 100);
    }

    // Remember how many events the slow consumer had before it catches up.
    let slow_events_before_catchup = slow_events.len();
    slow_consumer.process_available(|event: &TestEvent, seq| {
        slow_events.push((seq, event.sequence, event.data));
    });
    println!("Slow consumer after catchup: {} events", slow_events.len());
    println!("Slow consumer all events: {:?}", slow_events);

    // Both consumers must agree on the prefix they had both recorded before catch-up.
    let overlap_count = fast_events.len().min(slow_events_before_catchup);
    let paired = fast_events.iter().zip(slow_events.iter()).take(overlap_count);
    for (i, (fast, slow)) in paired.enumerate() {
        assert_eq!(
            fast, slow,
            "Data corruption detected at index {}: fast consumer saw {:?}, slow consumer saw {:?}",
            i, fast, slow
        );
    }

    assert!(
        successful_publishes < 10,
        "Producer should have been blocked by slow consumer, but published {} additional events",
        successful_publishes
    );

    println!("SUCCESS: Producer correctly respected slow consumer position!");
    println!(
        " - Fast consumer processed {} events immediately",
        fast_events.len()
    );
    println!(
        " - Slow consumer processed {} events initially",
        slow_events_before_catchup
    );
    println!(
        " - Producer was blocked after {} additional publishes",
        successful_publishes
    );
}
/// Publishes 150,000 events through a 512-slot ring with no consumer built in this
/// test, failing if the publish loop runs for more than five seconds.
#[test]
fn test_buffer_wrapping_without_consumers() {
    const BUFFER_SIZE: usize = 512;
    const NUM_EVENTS: u64 = 150_000;

    println!(
        "Testing: Publishing {} events without consumers (no discovery)",
        NUM_EVENTS
    );
    let segment_name = unique_test_segment("wrap_no_disc");
    let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, BUFFER_SIZE)
        .build_producer(TestEvent::default)
        .expect("Failed to create producer");
    println!("Producer created without discovery");

    let started = Instant::now();
    let time_budget = Duration::from_secs(5);
    for i in 0..NUM_EVENTS {
        // Checked before every publish so a stalled producer fails the test.
        assert!(
            started.elapsed() <= time_budget,
            "Timeout at event {} - buffer not wrapping correctly!",
            i
        );
        producer.publish(|event| {
            event.sequence = i as i64;
            event.data = (i % 1000) as i64;
        });
        // Progress line every 10,000 events, skipping event 0.
        if i != 0 && i % 10_000 == 0 {
            println!("Published {} events", i);
        }
    }

    println!(
        "✅ Successfully published {} events without consumers!",
        NUM_EVENTS
    );
    println!("Buffer wrapped {} times", NUM_EVENTS / BUFFER_SIZE as u64);
}
/// Publishes exactly 65,536 events through a 512-slot ring with no consumer built in
/// this test, failing if the publish loop runs for more than two seconds.
#[test]
fn test_exact_64kb_boundary_no_deadlock() {
    const BUFFER_SIZE: usize = 512;
    const NUM_EVENTS: u64 = 65_536;

    println!(
        "Testing: Publishing exactly {} events (64KB boundary)",
        NUM_EVENTS
    );
    let segment_name = unique_test_segment("boundary");
    let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, BUFFER_SIZE)
        .build_producer(TestEvent::default)
        .expect("Failed to create producer");

    let started = Instant::now();
    let time_budget = Duration::from_secs(2);
    for i in 0..NUM_EVENTS {
        // Checked before every publish so a stalled producer fails the test.
        assert!(
            started.elapsed() <= time_budget,
            "Deadlock at event {} - this is the old bug!",
            i
        );
        producer.publish(|event| {
            event.sequence = i as i64;
        });
    }

    println!(
        "✅ Successfully published {} events - no deadlock at 64KB boundary!",
        NUM_EVENTS
    );
}
/// For each of several ring sizes, publishes 100 times the ring size in events with
/// no consumer built in this test, failing if any run exceeds five seconds.
#[test]
fn test_buffer_wrapping_various_sizes() {
    for buffer_size in [256, 512, 1024, 2048] {
        let num_events = (buffer_size * 100) as u64;
        println!(
            "Testing buffer size {} with {} events",
            buffer_size, num_events
        );

        // Each size gets its own segment name.
        let segment_name = unique_test_segment(&format!("size_{}", buffer_size));
        let mut producer =
            build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
                .build_producer(TestEvent::default)
                .expect("Failed to create producer");

        // The time budget restarts for every ring size.
        let started = Instant::now();
        let time_budget = Duration::from_secs(5);
        for i in 0..num_events {
            assert!(
                started.elapsed() <= time_budget,
                "Timeout with buffer size {} at event {}",
                buffer_size,
                i
            );
            producer.publish(|event| {
                event.sequence = i as i64;
            });
        }

        println!(
            "✅ Buffer size {} handled {} events correctly",
            buffer_size, num_events
        );
    }
}
}