use super::consumer::SharedConsumer;
use super::consumer_barrier::{auto_consumer_id, consumer_registration_cursor_name, DiscoveryMode};
use super::producer::{CoordinationMode, SharedProducer};
use crate::env::{read, runtime as runtime_env};
use crate::{MultiProcessResult, SharedCursor, SharedMemoryConfig, SharedRingBuffer};
use disruptor_core::Sequence;
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Duration;
static CONSUMER_COUNTER: AtomicUsize = AtomicUsize::new(0);
fn uses_registered_auto_ids(discovery_mode: &DiscoveryMode) -> bool {
matches!(
discovery_mode,
DiscoveryMode::Enabled {
consumer_prefix: None,
..
}
)
}
fn create_consumer_registration_cursor(
base_name: &str,
discovery_mode: &DiscoveryMode,
) -> MultiProcessResult<Option<SharedCursor>> {
if !uses_registered_auto_ids(discovery_mode) {
return Ok(None);
}
let registration_name = consumer_registration_cursor_name(base_name);
let cursor = SharedCursor::new_or_attach(®istration_name, 0)?;
Ok(Some(cursor))
}
fn allocate_registered_consumer_id(base_name: &str) -> Option<String> {
let registration_name = consumer_registration_cursor_name(base_name);
let registration = SharedCursor::attach(®istration_name).ok()?;
let slot = registration.fetch_add(1, Ordering::AcqRel);
if slot < 0 {
return None;
}
Some(auto_consumer_id(slot as usize))
}
fn default_consumer_id(base_name: &str) -> String {
if let Some(consumer_id) = allocate_registered_consumer_id(base_name) {
return consumer_id;
}
let process_id = std::process::id();
let consumer_counter = CONSUMER_COUNTER.fetch_add(1, Ordering::Relaxed);
format!("c{}_{}", process_id % 10000, consumer_counter)
}
#[derive(Debug, Clone, Default)]
pub enum AutoWaitStrategy {
BusySpin,
BusySpinWithSpinLoopHint,
SpinThenYield {
spins: usize,
},
Sleep(Duration),
#[default]
Block,
}
impl AutoWaitStrategy {
pub fn high_performance() -> Self {
AutoWaitStrategy::BusySpin
}
pub fn high_performance_with_hints() -> Self {
AutoWaitStrategy::BusySpinWithSpinLoopHint
}
pub fn spin_then_yield(spins: usize) -> Self {
AutoWaitStrategy::SpinThenYield { spins }
}
pub fn cpu_efficient() -> Self {
AutoWaitStrategy::Sleep(Duration::from_micros(1))
}
pub fn sleep(duration: Duration) -> Self {
AutoWaitStrategy::Sleep(duration)
}
pub fn sleep_nanos(nanos: u64) -> Self {
if nanos == 0 {
AutoWaitStrategy::BusySpinWithSpinLoopHint
} else {
AutoWaitStrategy::Sleep(Duration::from_nanos(nanos))
}
}
pub fn sleep_micros(micros: u64) -> Self {
if micros == 0 {
AutoWaitStrategy::BusySpinWithSpinLoopHint
} else {
AutoWaitStrategy::Sleep(Duration::from_micros(micros))
}
}
pub fn from_env_or(default: AutoWaitStrategy) -> Self {
if let Some(nanos) = read::parse::<u64>(runtime_env::AUTO_WAIT_DELAY_NS) {
return Self::sleep_nanos(nanos);
}
if let Some(micros) = read::parse::<u64>(runtime_env::AUTO_WAIT_DELAY_US) {
return Self::sleep_micros(micros);
}
default
}
}
pub struct AutoConsumer {
join_handle: Option<JoinHandle<()>>,
shutdown_signal: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
impl AutoConsumer {
fn new(
join_handle: JoinHandle<()>,
shutdown_signal: std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> Self {
Self {
join_handle: Some(join_handle),
shutdown_signal,
}
}
pub fn shutdown(&self) {
self.shutdown_signal
.store(true, std::sync::atomic::Ordering::Release);
}
pub fn join(&mut self) {
if let Some(handle) = self.join_handle.take() {
let _ = handle.join();
}
}
pub fn shutdown_and_join(&mut self) {
self.shutdown();
self.join();
}
pub fn is_running(&self) -> bool {
self.join_handle.is_some()
&& !self
.shutdown_signal
.load(std::sync::atomic::Ordering::Acquire)
}
}
impl Drop for AutoConsumer {
fn drop(&mut self) {
self.shutdown_signal
.store(true, std::sync::atomic::Ordering::Release);
if let Some(handle) = self.join_handle.take() {
std::thread::sleep(super::wait::SLEEP_CONFIG.shutdown_grace_duration());
match handle.join() {
Ok(_) => {
}
Err(_) => {
eprintln!(
"Warning: AutoConsumer thread terminated unexpectedly during cleanup"
);
}
}
}
}
}
pub struct SharedDisruptorBuilder<E> {
config: SharedMemoryConfig,
coordination_mode: Option<CoordinationMode>,
discovery_mode: Option<DiscoveryMode>,
consumer_id: Option<String>,
process_core: Option<usize>,
consumer_core: Option<usize>,
coordination_timeout: Option<Duration>,
_phantom: std::marker::PhantomData<E>,
}
#[derive(Copy, Clone)]
enum ProcessRole {
Producer,
Consumer,
}
impl ProcessRole {
fn env_var(self) -> &'static str {
match self {
ProcessRole::Producer => runtime_env::PRODUCER_CORE,
ProcessRole::Consumer => runtime_env::CONSUMER_CORE,
}
}
fn label(self) -> &'static str {
match self {
ProcessRole::Producer => "producer process",
ProcessRole::Consumer => "consumer process",
}
}
}
impl<E> SharedDisruptorBuilder<E>
where
E: Copy + Default + 'static,
{
pub fn new(config: SharedMemoryConfig) -> Self {
Self {
config,
coordination_mode: None,
discovery_mode: None,
consumer_id: None,
process_core: None,
consumer_core: None,
coordination_timeout: None,
_phantom: std::marker::PhantomData,
}
}
pub fn with_coordination_timeout(mut self, timeout: Duration) -> Self {
self.coordination_timeout = Some(timeout);
self
}
pub fn with_coordination(mut self, coordination_mode: CoordinationMode) -> Self {
self.coordination_mode = Some(coordination_mode);
self
}
pub fn with_discovery(mut self, discovery_mode: DiscoveryMode) -> Self {
self.discovery_mode = Some(discovery_mode);
self
}
pub fn disable_discovery(self) -> Self {
self.with_discovery(DiscoveryMode::Disabled)
}
pub fn enable_discovery(self, max_consumers: usize) -> Self {
self.with_discovery(DiscoveryMode::enabled(max_consumers))
}
pub fn discover_consumer_with_prefix(self, max_consumers: usize, prefix: &str) -> Self {
self.with_discovery(DiscoveryMode::with_consumer_prefix(
max_consumers,
prefix.to_string(),
))
}
pub fn with_discovery_interval(self, max_consumers: usize, scan_interval: Duration) -> Self {
self.with_discovery(DiscoveryMode::with_scan_interval(
max_consumers,
scan_interval,
))
}
pub fn discover_consumer_with_prefix_and_interval(
self,
max_consumers: usize,
prefix: &str,
scan_interval: Duration,
) -> Self {
self.with_discovery(DiscoveryMode::with_consumer_prefix_and_interval(
max_consumers,
prefix.to_string(),
scan_interval,
))
}
pub fn wait_for_single_consumer(self, timeout: Duration) -> Self {
self.with_coordination(CoordinationMode::wait_for_single_consumer(timeout))
}
pub fn wait_for_consumers(self, min_consumers: i64, timeout: Duration) -> Self {
self.with_coordination(CoordinationMode::wait_for_consumers(min_consumers, timeout))
}
pub fn with_consumer_id(mut self, consumer_id: &str) -> Self {
self.consumer_id = Some(consumer_id.to_string());
self
}
pub fn with_consumer_core(mut self, core_id: usize) -> Self {
self.consumer_core = Some(core_id);
self
}
pub fn with_process_core(mut self, core_id: usize) -> Self {
self.process_core = Some(core_id);
self
}
fn maybe_pin_process_to_core(&self, role: ProcessRole) {
if let Some(core_id) = resolve_process_core(self.process_core, role) {
pin_current_thread_to_core(core_id, role.label());
}
}
pub fn handle_events_batch<EH>(
self,
mut event_handler: EH,
wait_strategy: AutoWaitStrategy,
) -> MultiProcessResult<AutoConsumer>
where
EH: 'static + Send + FnMut(&E, Sequence, bool),
{
self.maybe_pin_process_to_core(ProcessRole::Consumer);
let ring_buffer: SharedRingBuffer<E> = SharedRingBuffer::attach(self.config.clone())?;
let producer_sequence_name = format!("{}_producer_seq", self.config.name);
let producer_sequence = SharedCursor::attach(&producer_sequence_name)?;
let consumer_id = if let Some(custom_id) = self.consumer_id {
custom_id
} else {
default_consumer_id(&self.config.name)
};
let consumer_sequence_name = format!("{}_{}_seq", self.config.name, consumer_id);
let mut consumer_sequence = SharedCursor::new_or_attach(&consumer_sequence_name, -1)?;
if consumer_sequence.is_owner() {
consumer_sequence.set_owner(false);
}
let mut consumer = SharedConsumer::new_with_coordination(
ring_buffer,
producer_sequence,
consumer_sequence,
consumer_id,
Some(self.config.name.clone()),
);
let shutdown_signal = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let shutdown_signal_clone = std::sync::Arc::clone(&shutdown_signal);
let consumer_core = resolve_auto_consumer_core(self.consumer_core);
let thread_name = format!("multiprocess-consumer-{}", std::process::id());
let join_handle = thread::Builder::new()
.name(thread_name.clone())
.spawn(move || {
if let Some(core_id) = consumer_core {
pin_current_thread_to_core(core_id, &thread_name);
}
loop {
let processed = match wait_strategy {
AutoWaitStrategy::BusySpin => {
consumer.process_available(|event, seq| {
event_handler(event, seq, false);
})
}
AutoWaitStrategy::BusySpinWithSpinLoopHint => {
let processed = consumer.process_available(|event, seq| {
event_handler(event, seq, false);
});
if processed == 0 {
std::hint::spin_loop();
}
processed
}
AutoWaitStrategy::SpinThenYield { spins } => {
let processed = consumer.process_available(|event, seq| {
event_handler(event, seq, false);
});
if processed == 0 {
for _ in 0..spins {
std::hint::spin_loop();
}
std::thread::yield_now();
}
processed
}
AutoWaitStrategy::Block => {
let processed = consumer.process_available(|event, seq| {
event_handler(event, seq, false);
});
if processed == 0 {
super::wait::perform_default_block_wait();
}
processed
}
AutoWaitStrategy::Sleep(duration) => {
let processed = consumer.process_available(|event, seq| {
event_handler(event, seq, false);
});
if processed == 0 {
super::wait::sleep_or_yield(duration);
}
processed
}
};
if processed == 0
&& shutdown_signal_clone.load(std::sync::atomic::Ordering::Acquire)
{
break;
}
}
})
.expect("Should spawn consumer thread");
Ok(AutoConsumer::new(join_handle, shutdown_signal))
}
pub fn handle_events_with<EH>(self, event_handler: EH) -> MultiProcessResult<AutoConsumer>
where
EH: 'static + Send + FnMut(&E, Sequence, bool),
{
self.handle_events_batch(event_handler, AutoWaitStrategy::default())
}
pub fn build_producer<F>(self, event_factory: F) -> MultiProcessResult<SharedProducer<E>>
where
F: FnMut() -> E,
{
#[cfg(dst)]
if crate::dst::buggify(file!(), line!()) {
std::thread::sleep(Duration::from_millis(100));
}
self.maybe_pin_process_to_core(ProcessRole::Producer);
let discovery_mode = self.discovery_mode.unwrap_or_default();
let consumer_registration =
create_consumer_registration_cursor(&self.config.name, &discovery_mode)?;
let ring_buffer: SharedRingBuffer<E> =
SharedRingBuffer::new(self.config.clone(), event_factory)?;
let producer_sequence_name = format!("{}_producer_seq", self.config.name);
let producer_sequence = SharedCursor::new(&producer_sequence_name, -1)?;
let coordination_mode = self.coordination_mode.unwrap_or_else(|| {
match &discovery_mode {
DiscoveryMode::Enabled { max_consumers, .. } => {
let timeout = self.coordination_timeout.unwrap_or_else(|| {
match *max_consumers {
1 => Duration::from_secs(15), 2 => Duration::from_secs(20), 3..=4 => Duration::from_secs(25), 5..=8 => Duration::from_secs(30), _ => Duration::from_secs(45), }
});
CoordinationMode::wait_for_consumers(*max_consumers as i64, timeout)
}
_ => CoordinationMode::default(), }
});
let mut producer = SharedProducer::new_with_coordination_and_discovery(
ring_buffer,
producer_sequence,
self.config.name.clone(), coordination_mode.clone(),
discovery_mode,
consumer_registration,
);
match coordination_mode {
CoordinationMode::Immediate => {
producer.coordination_completed = true;
}
CoordinationMode::WaitForConsumers {
min_consumers,
timeout,
} => {
println!(
"Framework coordinating startup: waiting for {} consumers (timeout: {:?})...",
min_consumers, timeout
);
let coordination_ready = producer
.consumer_barrier
.wait_for_consumers_ready(min_consumers, timeout);
if !coordination_ready {
eprintln!("Warning: Timed out waiting for {} consumers after {:?}. Producer created anyway.",
min_consumers, timeout);
println!(
"Framework coordination incomplete - producer continuing without {} ready consumers",
min_consumers
);
} else {
println!(
"Framework coordination completed - {} consumers ready",
min_consumers
);
}
producer.coordination_completed = true;
}
CoordinationMode::BufferUntilConsumers { .. } => {
producer.coordination_completed = true;
}
}
Ok(producer)
}
pub fn build_consumer(self) -> MultiProcessResult<SharedConsumer<E>> {
#[cfg(dst)]
if crate::dst::buggify(file!(), line!()) {
std::thread::sleep(Duration::from_millis(100));
}
self.maybe_pin_process_to_core(ProcessRole::Consumer);
let ring_buffer: SharedRingBuffer<E> = SharedRingBuffer::attach(self.config.clone())?;
let producer_sequence_name = format!("{}_producer_seq", self.config.name);
let producer_sequence = SharedCursor::attach(&producer_sequence_name)?;
let consumer_id = if let Some(custom_id) = self.consumer_id {
custom_id
} else {
default_consumer_id(&self.config.name)
};
let consumer_sequence_name = format!("{}_{}_seq", self.config.name, consumer_id);
let mut consumer_sequence = SharedCursor::new_or_attach(&consumer_sequence_name, -1)?;
if consumer_sequence.is_owner() {
consumer_sequence.set_owner(false);
}
Ok(SharedConsumer::new_with_coordination(
ring_buffer,
producer_sequence,
consumer_sequence,
consumer_id,
Some(self.config.name.clone()),
))
}
}
fn resolve_auto_consumer_core(consumer_core: Option<usize>) -> Option<usize> {
if let Some(core_id) = consumer_core {
return Some(core_id);
}
resolve_core_from_env_vars(&[runtime_env::AUTO_CONSUMER_CORE])
}
fn resolve_process_core(process_core: Option<usize>, role: ProcessRole) -> Option<usize> {
if let Some(core_id) = process_core {
return Some(core_id);
}
resolve_core_from_env_vars(&[role.env_var(), runtime_env::PROCESS_CORE])
}
fn resolve_core_from_env_vars(env_vars: &[&str]) -> Option<usize> {
for env_var in env_vars {
match env::var(env_var) {
Ok(raw) => match raw.parse::<usize>() {
Ok(core_id) => return Some(core_id),
Err(_) => {
eprintln!(
"Invalid {}='{}'. Expected a non-negative integer.",
env_var, raw
);
return None;
}
},
Err(env::VarError::NotPresent) => {}
Err(env::VarError::NotUnicode(_)) => {
eprintln!("Invalid {}: value is not valid Unicode.", env_var);
return None;
}
}
}
None
}
#[cfg(target_os = "linux")]
fn pin_current_thread_to_core(core_id: usize, thread_name: &str) {
let available_cores = core_affinity::get_core_ids().unwrap_or_default();
let core = core_affinity::CoreId { id: core_id };
if !available_cores
.iter()
.any(|candidate| candidate.id == core_id)
{
eprintln!(
"Could not pin {} to core {}: core not available.",
thread_name, core_id
);
return;
}
if !core_affinity::set_for_current(core) {
eprintln!("Could not pin {} to core {}.", thread_name, core_id);
}
}
#[cfg(not(target_os = "linux"))]
fn pin_current_thread_to_core(core_id: usize, thread_name: &str) {
eprintln!(
"CPU affinity support is Linux-only in disruptor-mp. Requested pinning {} -> core {} ignored; bring-up continues without pinning on this platform.",
thread_name, core_id
);
}
pub fn build_shared_single_producer<E: Copy + Default + 'static>(
name: &str,
size: usize,
) -> SharedDisruptorBuilder<E> {
let config = SharedMemoryConfig {
name: name.to_string(),
buffer_size: size,
element_size: std::mem::size_of::<E>(),
create: true,
};
SharedDisruptorBuilder::new(config)
}
pub fn attach_shared_consumer<E: Copy + Default + 'static>(
name: &str,
size: usize,
) -> SharedDisruptorBuilder<E> {
let config = SharedMemoryConfig {
name: name.to_string(),
buffer_size: size,
element_size: std::mem::size_of::<E>(),
create: false,
};
SharedDisruptorBuilder::new(config)
}
#[cfg(test)]
mod tests {
use super::*;
use std::ffi::OsString;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
#[derive(Copy, Clone, Default)]
struct TestEvent {
value: i64,
}
fn env_lock() -> &'static Mutex<()> {
static ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
ENV_LOCK.get_or_init(|| Mutex::new(()))
}
fn with_env_vars<const N: usize, F, R>(
vars: [(&'static str, Option<&'static str>); N],
f: F,
) -> R
where
F: FnOnce() -> R,
{
let _guard = env_lock().lock().expect("environment lock poisoned");
let previous: [(&'static str, Option<OsString>); N] =
vars.map(|(key, _)| (key, std::env::var_os(key)));
for (key, value) in vars {
match value {
Some(value) => std::env::set_var(key, value),
None => std::env::remove_var(key),
}
}
let result = f();
for (key, value) in previous {
match value {
Some(value) => std::env::set_var(key, value),
None => std::env::remove_var(key),
}
}
result
}
#[test]
fn test_block_wait_strategy_no_hang() {
let segment_name = format!("test_blk_{}", std::process::id() % 10000);
let buffer_size = 128;
let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
.build_producer(TestEvent::default)
.expect("Failed to create producer");
let events_received = Arc::new(AtomicUsize::new(0));
let events_clone = Arc::clone(&events_received);
let consumer = attach_shared_consumer::<TestEvent>(&segment_name, buffer_size)
.handle_events_batch(
move |_event: &TestEvent, _seq, _eob| {
events_clone.fetch_add(1, Ordering::Relaxed);
},
AutoWaitStrategy::Block,
)
.expect("Failed to create consumer");
std::thread::sleep(Duration::from_millis(100));
producer.publish(|event| {
event.value = 42;
});
std::thread::sleep(Duration::from_millis(50));
assert_eq!(events_received.load(Ordering::Relaxed), 1);
drop(consumer);
}
#[test]
fn test_busy_spin_wait_strategy() {
let segment_name = format!("test_spn_{}", std::process::id() % 10000);
let buffer_size = 128;
let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
.build_producer(TestEvent::default)
.expect("Failed to create producer");
let events_received = Arc::new(AtomicUsize::new(0));
let events_clone = Arc::clone(&events_received);
let consumer = attach_shared_consumer::<TestEvent>(&segment_name, buffer_size)
.handle_events_batch(
move |_event: &TestEvent, _seq, _eob| {
events_clone.fetch_add(1, Ordering::Relaxed);
},
AutoWaitStrategy::BusySpin,
)
.expect("Failed to create consumer");
for i in 0..10 {
producer.publish(|event| {
event.value = i;
});
}
std::thread::sleep(Duration::from_millis(50));
assert_eq!(events_received.load(Ordering::Relaxed), 10);
drop(consumer);
}
#[test]
fn test_sleep_wait_strategy() {
let segment_name = format!("test_slp_{}", std::process::id() % 10000);
let buffer_size = 128;
let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
.build_producer(TestEvent::default)
.expect("Failed to create producer");
let events_received = Arc::new(AtomicUsize::new(0));
let events_clone = Arc::clone(&events_received);
let consumer = attach_shared_consumer::<TestEvent>(&segment_name, buffer_size)
.handle_events_batch(
move |_event: &TestEvent, _seq, _eob| {
events_clone.fetch_add(1, Ordering::Relaxed);
},
AutoWaitStrategy::Sleep(Duration::from_millis(5)),
)
.expect("Failed to create consumer");
for i in 0..5 {
producer.publish(|event| {
event.value = i;
});
std::thread::sleep(Duration::from_millis(10));
}
std::thread::sleep(Duration::from_millis(100));
assert_eq!(events_received.load(Ordering::Relaxed), 5);
drop(consumer);
}
#[test]
fn test_auto_consumer_shutdown() {
let segment_name = format!("test_sht_{}", std::process::id() % 10000);
let buffer_size = 128;
let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
.build_producer(TestEvent::default)
.expect("Failed to create producer");
let events_received = Arc::new(AtomicUsize::new(0));
let events_clone = Arc::clone(&events_received);
let mut consumer = attach_shared_consumer::<TestEvent>(&segment_name, buffer_size)
.handle_events_batch(
move |_event: &TestEvent, _seq, _eob| {
events_clone.fetch_add(1, Ordering::Relaxed);
},
AutoWaitStrategy::Block,
)
.expect("Failed to create consumer");
for i in 0..5 {
producer.publish(|event| {
event.value = i;
});
}
std::thread::sleep(Duration::from_millis(50));
assert_eq!(events_received.load(Ordering::Relaxed), 5);
consumer.shutdown();
for i in 5..10 {
producer.publish(|event| {
event.value = i;
});
}
std::thread::sleep(Duration::from_millis(50));
assert_eq!(events_received.load(Ordering::Relaxed), 5);
consumer.join();
}
#[test]
fn test_auto_consumer_batch_processing() {
let segment_name = format!("test_bat_{}", std::process::id() % 10000);
let buffer_size = 1024;
let events_processed = Arc::new(AtomicUsize::new(0));
let events_clone = Arc::clone(&events_processed);
let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
.build_producer(TestEvent::default)
.expect("Failed to create producer");
let consumer = attach_shared_consumer::<TestEvent>(&segment_name, buffer_size)
.handle_events_batch(
move |_event: &TestEvent, _seq, _end_of_batch| {
events_clone.fetch_add(1, Ordering::Relaxed);
},
AutoWaitStrategy::Block,
)
.expect("Failed to create consumer");
for burst in 0..3 {
for i in 0..10 {
producer.publish(|event| {
event.value = burst * 10 + i;
});
}
std::thread::sleep(Duration::from_millis(50));
}
std::thread::sleep(Duration::from_millis(200));
let total_events = events_processed.load(Ordering::Relaxed);
assert_eq!(total_events, 30, "Should have processed all 30 events");
drop(consumer);
}
#[test]
#[ignore] fn test_wait_strategy_performance() {
let buffer_size = 8192;
let num_events = 100_000;
let strategies = vec![
("BusySpin", AutoWaitStrategy::BusySpin),
(
"BusySpinWithHint",
AutoWaitStrategy::BusySpinWithSpinLoopHint,
),
("Block", AutoWaitStrategy::Block),
(
"Sleep_1us",
AutoWaitStrategy::Sleep(Duration::from_micros(1)),
),
(
"Sleep_100us",
AutoWaitStrategy::Sleep(Duration::from_micros(100)),
),
];
for (name, strategy) in strategies {
let segment_name = format!("tst_p_{}_{}", name, std::process::id() % 10000);
let mut producer =
build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
.build_producer(TestEvent::default)
.expect("Failed to create producer");
let events_received = Arc::new(AtomicUsize::new(0));
let events_clone = Arc::clone(&events_received);
let start = Instant::now();
let consumer = attach_shared_consumer::<TestEvent>(&segment_name, buffer_size)
.handle_events_batch(
move |_event: &TestEvent, _seq, _eob| {
events_clone.fetch_add(1, Ordering::Relaxed);
},
strategy,
)
.expect("Failed to create consumer");
for i in 0..num_events {
producer.publish(|event| {
event.value = i;
});
}
while events_received.load(Ordering::Relaxed) < num_events as usize {
std::thread::sleep(Duration::from_millis(1));
}
let elapsed = start.elapsed();
let events_per_sec = num_events as f64 / elapsed.as_secs_f64();
println!("{} strategy: {:.0} events/sec", name, events_per_sec);
drop(consumer);
}
}
#[test]
fn test_consumer_core_override_is_preserved() {
let segment_name = format!("test_cpu_{}", std::process::id() % 10000);
let buffer_size = 128;
let builder =
attach_shared_consumer::<TestEvent>(&segment_name, buffer_size).with_consumer_core(3);
assert_eq!(builder.consumer_core, Some(3));
}
#[test]
fn test_consumer_core_resolve_from_builder_overrides_env() {
let override_core = 7usize;
let resolved = resolve_auto_consumer_core(Some(override_core));
assert_eq!(resolved, Some(override_core));
}
#[test]
fn test_consumer_core_resolve_from_env_var() {
with_env_vars([(runtime_env::AUTO_CONSUMER_CORE, Some("9"))], || {
let resolved = resolve_auto_consumer_core(None);
assert_eq!(resolved, Some(9));
});
}
#[test]
fn test_consumer_core_resolve_from_invalid_env_var() {
with_env_vars([(runtime_env::AUTO_CONSUMER_CORE, Some("invalid"))], || {
let resolved = resolve_auto_consumer_core(None);
assert_eq!(resolved, None);
});
}
#[test]
fn test_process_core_override_is_preserved() {
let segment_name = format!("test_prc_{}", std::process::id() % 10000);
let buffer_size = 128;
let builder = build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
.with_process_core(5);
assert_eq!(builder.process_core, Some(5));
}
#[test]
fn test_process_core_resolve_from_builder_overrides_env() {
with_env_vars(
[
(runtime_env::PRODUCER_CORE, Some("7")),
(runtime_env::PROCESS_CORE, Some("9")),
],
|| {
let resolved = resolve_process_core(Some(11), ProcessRole::Producer);
assert_eq!(resolved, Some(11));
},
);
}
#[test]
fn test_process_core_resolve_from_role_specific_env_var() {
with_env_vars(
[
(runtime_env::PRODUCER_CORE, Some("6")),
(runtime_env::PROCESS_CORE, Some("8")),
],
|| {
let resolved = resolve_process_core(None, ProcessRole::Producer);
assert_eq!(resolved, Some(6));
},
);
}
#[test]
fn test_process_core_resolve_from_generic_env_var() {
with_env_vars(
[
(runtime_env::PRODUCER_CORE, None),
(runtime_env::PROCESS_CORE, Some("10")),
],
|| {
let resolved = resolve_process_core(None, ProcessRole::Producer);
assert_eq!(resolved, Some(10));
},
);
}
#[test]
fn test_process_core_resolve_from_invalid_role_specific_env_var() {
with_env_vars(
[
(runtime_env::CONSUMER_CORE, Some("invalid")),
(runtime_env::PROCESS_CORE, Some("12")),
],
|| {
let resolved = resolve_process_core(None, ProcessRole::Consumer);
assert_eq!(resolved, None);
},
);
}
}