disruptor_mp/builder.rs
1//! Builder surface for creating and attaching multiprocess transports.
2//!
3//! [`SharedDisruptorBuilder`] is the main configuration surface for:
4//!
5//! - shared-memory or mmap setup
6//! - startup coordination
7//! - consumer discovery
8//! - automatic event-handler threads
9//! - wait-policy selection
10//!
11//! The builder supports two broad usage styles:
12//!
13//! - automatic event handling via `handle_events_with(...)`
14//! - manual polling via `build_consumer()`
15
16use super::consumer::SharedConsumer;
17use super::consumer_barrier::{auto_consumer_id, consumer_registration_cursor_name, DiscoveryMode};
18use super::producer::{CoordinationMode, SharedProducer};
19use crate::env::{read, runtime as runtime_env};
20use crate::{MultiProcessResult, SharedCursor, SharedMemoryConfig, SharedRingBuffer};
21use disruptor_core::Sequence;
22use std::env;
23use std::sync::atomic::{AtomicUsize, Ordering};
24use std::thread::{self, JoinHandle};
25use std::time::Duration;
26
/// Process-wide counter used when building fallback consumer IDs.
///
/// `default_consumer_id` advances it with `fetch_add`, so each fallback ID
/// generated inside one process receives a different numeric suffix.
static CONSUMER_COUNTER: AtomicUsize = AtomicUsize::new(0);
29
30fn uses_registered_auto_ids(discovery_mode: &DiscoveryMode) -> bool {
31 matches!(
32 discovery_mode,
33 DiscoveryMode::Enabled {
34 consumer_prefix: None,
35 ..
36 }
37 )
38}
39
40fn create_consumer_registration_cursor(
41 base_name: &str,
42 discovery_mode: &DiscoveryMode,
43) -> MultiProcessResult<Option<SharedCursor>> {
44 if !uses_registered_auto_ids(discovery_mode) {
45 return Ok(None);
46 }
47
48 let registration_name = consumer_registration_cursor_name(base_name);
49 let cursor = SharedCursor::new_or_attach(®istration_name, 0)?;
50 Ok(Some(cursor))
51}
52
53fn allocate_registered_consumer_id(base_name: &str) -> Option<String> {
54 let registration_name = consumer_registration_cursor_name(base_name);
55 let registration = SharedCursor::attach(®istration_name).ok()?;
56 let slot = registration.fetch_add(1, Ordering::AcqRel);
57 if slot < 0 {
58 return None;
59 }
60
61 Some(auto_consumer_id(slot as usize))
62}
63
64fn default_consumer_id(base_name: &str) -> String {
65 if let Some(consumer_id) = allocate_registered_consumer_id(base_name) {
66 return consumer_id;
67 }
68
69 let process_id = std::process::id();
70 let consumer_counter = CONSUMER_COUNTER.fetch_add(1, Ordering::Relaxed);
71 format!("c{}_{}", process_id % 10000, consumer_counter)
72}
73
/// Wait strategy for automatic event handlers.
///
/// Selects what the background consumer loop does when a polling pass found
/// no events. The default is [`AutoWaitStrategy::Block`].
///
/// The type is comparable (`PartialEq`/`Eq`) so callers and tests can check
/// which strategy was selected, e.g. after `AutoWaitStrategy::from_env_or`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum AutoWaitStrategy {
    /// Maximum performance busy spinning (100% CPU usage) - true busy spin, no hints
    BusySpin,
    /// High performance busy spinning with spin loop hints (slightly lower CPU usage)
    BusySpinWithSpinLoopHint,
    /// Hybrid: spin N iterations, then yield the thread
    SpinThenYield {
        /// Number of spin-loop iterations before yielding once.
        spins: usize,
    },
    /// CPU efficient with configurable sleep duration
    Sleep(Duration),
    /// Blocking with efficient waiting (balanced performance/CPU)
    #[default]
    Block,
}
92
93impl AutoWaitStrategy {
94 /// Create a high performance wait strategy (true busy spin)
95 pub fn high_performance() -> Self {
96 AutoWaitStrategy::BusySpin
97 }
98
99 /// Create a high performance wait strategy with spin loop hints
100 pub fn high_performance_with_hints() -> Self {
101 AutoWaitStrategy::BusySpinWithSpinLoopHint
102 }
103
104 /// Create a hybrid strategy that spins N times then yields
105 pub fn spin_then_yield(spins: usize) -> Self {
106 AutoWaitStrategy::SpinThenYield { spins }
107 }
108
109 /// Create a CPU efficient wait strategy
110 pub fn cpu_efficient() -> Self {
111 AutoWaitStrategy::Sleep(Duration::from_micros(1))
112 }
113
114 /// Create a custom sleep-based wait strategy
115 pub fn sleep(duration: Duration) -> Self {
116 AutoWaitStrategy::Sleep(duration)
117 }
118
119 /// Create a sleep-based wait strategy with nanosecond precision
120 ///
121 /// # Special Values
122 /// - `0` = use `spin_loop()` instead of sleep (high performance)
123 /// - `1..` = Sleep for the specified nanoseconds (lower performance, CPU efficient)
124 ///
125 /// Note: Actual sleep precision depends on the operating system.
126 /// Very small durations (< 1000ns) may not sleep at all on some systems.
127 pub fn sleep_nanos(nanos: u64) -> Self {
128 if nanos == 0 {
129 AutoWaitStrategy::BusySpinWithSpinLoopHint
130 } else {
131 AutoWaitStrategy::Sleep(Duration::from_nanos(nanos))
132 }
133 }
134
135 /// Create a sleep-based wait strategy with microsecond precision
136 ///
137 /// # Special Values
138 /// - `0` = use `spin_loop()` instead of sleep (high performance)
139 /// - `1..` = Sleep for the specified microseconds (lower performance, CPU efficient)
140 pub fn sleep_micros(micros: u64) -> Self {
141 if micros == 0 {
142 AutoWaitStrategy::BusySpinWithSpinLoopHint
143 } else {
144 AutoWaitStrategy::Sleep(Duration::from_micros(micros))
145 }
146 }
147
148 /// Create wait strategy from environment variables with fallback
149 ///
150 /// Checks these environment variables in order:
151 /// 1. `MYELON_AUTO_WAIT_DELAY_NS` - nanosecond precision (`0` = `spin_loop`)
152 /// 2. `MYELON_AUTO_WAIT_DELAY_US` - microsecond precision (`0` = `spin_loop`)
153 /// 3. Falls back to the provided default
154 ///
155 /// # Examples
156 /// ```bash
157 /// # Use spin_loop (maximum performance)
158 /// export MYELON_AUTO_WAIT_DELAY_NS=0
159 ///
160 /// # Sleep for 100 nanoseconds
161 /// export MYELON_AUTO_WAIT_DELAY_NS=100
162 ///
163 /// # Sleep for 10 microseconds
164 /// export MYELON_AUTO_WAIT_DELAY_US=10
165 /// ```
166 pub fn from_env_or(default: AutoWaitStrategy) -> Self {
167 // Check for nanosecond precision first
168 if let Some(nanos) = read::parse::<u64>(runtime_env::AUTO_WAIT_DELAY_NS) {
169 return Self::sleep_nanos(nanos);
170 }
171
172 // Check for microsecond precision
173 if let Some(micros) = read::parse::<u64>(runtime_env::AUTO_WAIT_DELAY_US) {
174 return Self::sleep_micros(micros);
175 }
176
177 // Fall back to default
178 default
179 }
180}
181
/// Automatic consumer that runs in a background thread.
///
/// This provides a handle to a consumer running automatic event processing
/// in a separate thread. It supports graceful shutdown and cleanup operations.
pub struct AutoConsumer {
    /// Handle of the background thread; `None` once the thread has been joined.
    join_handle: Option<JoinHandle<()>>,
    /// Flag shared with the background thread to request that it stops.
    shutdown_signal: std::sync::Arc<std::sync::atomic::AtomicBool>,
}

impl AutoConsumer {
    /// Wraps an already spawned worker thread and its shutdown flag.
    fn new(
        join_handle: JoinHandle<()>,
        shutdown_signal: std::sync::Arc<std::sync::atomic::AtomicBool>,
    ) -> Self {
        Self {
            join_handle: Some(join_handle),
            shutdown_signal,
        }
    }

    /// Signal the consumer to shutdown.
    ///
    /// Only sets the shared flag; it does not wait for the thread to exit.
    pub fn shutdown(&self) {
        self.shutdown_signal
            .store(true, std::sync::atomic::Ordering::Release);
    }

    /// Wait for the consumer thread to finish.
    ///
    /// Does nothing if the thread was already joined. A panic inside the
    /// worker thread is deliberately ignored here.
    pub fn join(&mut self) {
        if let Some(handle) = self.join_handle.take() {
            let _ = handle.join();
        }
    }

    /// Shutdown and wait for the consumer thread to finish.
    pub fn shutdown_and_join(&mut self) {
        self.shutdown();
        self.join();
    }

    /// Check if the consumer thread is still running.
    ///
    /// Returns `false` when the thread has been joined, when shutdown has
    /// been requested, or when the thread has already terminated on its own
    /// (for example because the event handler panicked).
    pub fn is_running(&self) -> bool {
        let shutdown_requested = self
            .shutdown_signal
            .load(std::sync::atomic::Ordering::Acquire);

        match &self.join_handle {
            // A handle that is still held does not prove the thread is alive,
            // so ask the handle itself whether the thread has exited.
            Some(handle) => !shutdown_requested && !handle.is_finished(),
            None => false,
        }
    }
}
229
230/// Automatic resource management for [`AutoConsumer`].
231///
232/// Ensures proper cleanup when the consumer goes out of scope, which
233/// matters for embedded host runtimes (e.g. an FFI consumer wrapped by
234/// a garbage-collected language) that expect automatic resource
235/// management.
236impl Drop for AutoConsumer {
237 fn drop(&mut self) {
238 // Signal shutdown and wait for thread to finish
239 self.shutdown_signal
240 .store(true, std::sync::atomic::Ordering::Release);
241
242 if let Some(handle) = self.join_handle.take() {
243 // Give the thread a moment to see the shutdown signal
244 std::thread::sleep(super::wait::SLEEP_CONFIG.shutdown_grace_duration());
245
246 // Wait for clean shutdown (with timeout for safety)
247 match handle.join() {
248 Ok(_) => {
249 // Clean shutdown successful
250 }
251 Err(_) => {
252 // Thread panicked - this is unexpected but not fatal
253 eprintln!(
254 "Warning: AutoConsumer thread terminated unexpectedly during cleanup"
255 );
256 }
257 }
258 }
259 }
260}
261
/// Builder for creating multi-process disruptors.
///
/// This builder provides a fluent API for configuring and creating shared memory
/// disruptors with various coordination modes, discovery options, and event handling
/// strategies. It supports both producer and consumer creation with automatic
/// shared memory management.
///
/// ## Key features
///
/// - **Automatic event delivery**: `handle_events_with()` creates background processing
/// - **Automatic resource management**: proper cleanup via `Drop` implementations
/// - **Configurable timeouts**: customizable coordination and discovery timeouts
/// - **Error handling**: clear error messages and robust failure handling
///
/// ## Usage Patterns
///
/// ```rust,no_run
/// use disruptor_mp::*;
/// use std::time::Duration;
///
/// #[derive(Copy, Clone, Default)]
/// struct Event { data: i64 }
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Producer with automatic coordination
/// let mut producer = build_shared_single_producer::<Event>("test", 1024)
///     .enable_discovery(2)
///     .with_coordination_timeout(Duration::from_secs(30))
///     .build_producer(Event::default)?;
///
/// // Consumer with automatic event delivery
/// let consumer = attach_shared_consumer::<Event>("test", 1024)
///     .handle_events_with(|event, seq, eob| {
///         // Events delivered automatically
///     })?;
/// # Ok(())
/// # }
/// ```
pub struct SharedDisruptorBuilder<E> {
    /// Shared-memory configuration; `config.name` is the base name from which
    /// the producer and consumer sequence cursor names are derived.
    config: SharedMemoryConfig,
    /// Explicit coordination mode. When `None`, `build_producer` derives one
    /// from the discovery mode.
    coordination_mode: Option<CoordinationMode>,
    /// Explicit discovery mode. When `None`, `build_producer` uses the
    /// `DiscoveryMode` default.
    discovery_mode: Option<DiscoveryMode>,
    /// Caller-supplied consumer ID. When `None`, an ID is generated by
    /// `default_consumer_id`.
    consumer_id: Option<String>,
    /// Core requested via `with_process_core()` for pinning the calling thread.
    process_core: Option<usize>,
    /// Core requested via `with_consumer_core()` for the auto-consumer thread.
    consumer_core: Option<usize>,
    /// Overrides the adaptive timeout `build_producer` would otherwise choose
    /// when it auto-enables coordination.
    coordination_timeout: Option<Duration>,
    /// Ties the builder to the event type `E` without storing a value of it.
    _phantom: std::marker::PhantomData<E>,
}
310
311#[derive(Copy, Clone)]
312enum ProcessRole {
313 Producer,
314 Consumer,
315}
316
317impl ProcessRole {
318 fn env_var(self) -> &'static str {
319 match self {
320 ProcessRole::Producer => runtime_env::PRODUCER_CORE,
321 ProcessRole::Consumer => runtime_env::CONSUMER_CORE,
322 }
323 }
324
325 fn label(self) -> &'static str {
326 match self {
327 ProcessRole::Producer => "producer process",
328 ProcessRole::Consumer => "consumer process",
329 }
330 }
331}
332
333impl<E> SharedDisruptorBuilder<E>
334where
335 E: Copy + Default + 'static,
336{
337 /// Create a new builder with the given configuration
338 pub fn new(config: SharedMemoryConfig) -> Self {
339 Self {
340 config,
341 coordination_mode: None,
342 discovery_mode: None,
343 consumer_id: None,
344 process_core: None,
345 consumer_core: None,
346 coordination_timeout: None,
347 _phantom: std::marker::PhantomData,
348 }
349 }
350
351 /// Set a custom coordination timeout
352 ///
353 /// This overrides the default adaptive timeout behavior for consumer coordination.
354 /// Useful for environments with specific timing requirements.
355 ///
356 /// # Examples
357 /// ```rust,ignore
358 /// use std::time::Duration;
359 /// let builder = builder.with_coordination_timeout(Duration::from_secs(60));
360 /// ```
361 pub fn with_coordination_timeout(mut self, timeout: Duration) -> Self {
362 self.coordination_timeout = Some(timeout);
363 self
364 }
365
366 /// Set the coordination mode for the producer
367 ///
368 /// This enables internal coordination to eliminate the need for external coordination structures.
369 ///
370 /// # Examples
371 /// ```rust,ignore
372 /// use std::time::Duration;
373 /// use disruptor_mp::producer::CoordinationMode;
374 ///
375 /// // Wait for single consumer with 30 second timeout
376 /// let builder = builder.with_coordination(
377 /// CoordinationMode::wait_for_single_consumer(Duration::from_secs(30))
378 /// );
379 ///
380 /// // Wait for multiple consumers with 45 second timeout
381 /// let builder = builder.with_coordination(
382 /// CoordinationMode::wait_for_consumers(3, Duration::from_secs(45))
383 /// );
384 /// ```
385 pub fn with_coordination(mut self, coordination_mode: CoordinationMode) -> Self {
386 self.coordination_mode = Some(coordination_mode);
387 self
388 }
389
390 /// Configure consumer discovery mode
391 ///
392 /// # Discovery Modes (Default: Disabled)
393 /// - `DiscoveryMode::Disabled` - No discovery (default), maximum performance for externally coordinated scenarios
394 /// - `DiscoveryMode::Enabled { consumer_prefix: None }` - Deterministic slot-based discovery
395 /// when coordination is present, with PID scanning as legacy fallback
396 /// - `DiscoveryMode::Enabled { consumer_prefix: Some("DIS_SM") }` - Optimized prefix-based discovery
397 ///
398 /// # Examples
399 /// ```rust,ignore
400 /// use disruptor_mp::lock_free::DiscoveryMode;
401 ///
402 /// // Enable basic discovery for 2 consumers (fixed topology)
403 /// let builder = builder.enable_discovery(2);
404 ///
405 /// // Use consumer naming convention for efficient discovery of 5 consumers
406 /// let builder = builder.discover_consumer_with_prefix(5, "DIS_SM");
407 /// ```
408 pub fn with_discovery(mut self, discovery_mode: DiscoveryMode) -> Self {
409 self.discovery_mode = Some(discovery_mode);
410 self
411 }
412
413 /// Disable consumer discovery (default behavior)
414 ///
415 /// This is ideal for externally coordinated scenarios like benchmarks
416 /// where consumer coordination is handled outside the disruptor.
417 /// Discovery is disabled by default for maximum performance.
418 pub fn disable_discovery(self) -> Self {
419 self.with_discovery(DiscoveryMode::Disabled)
420 }
421
422 /// Enable basic consumer discovery for fixed topology (convenience method)
423 ///
424 /// When startup coordination is present, consumers are assigned deterministic
425 /// slot-based IDs (`ad_0`, `ad_1`, ...) so discovery does not rely on PID
426 /// proximity. Without coordination, discovery falls back to PID scanning with
427 /// the legacy naming pattern `c{pid}_{counter}`.
428 /// Stops scanning once the expected number of consumers are discovered.
429 /// Default scan interval: 100ms.
430 pub fn enable_discovery(self, max_consumers: usize) -> Self {
431 self.with_discovery(DiscoveryMode::enabled(max_consumers))
432 }
433
434 /// Enable optimized discovery with consumer naming convention for fixed topology (convenience method)
435 ///
436 /// This uses consumer name prefixes for much faster consumer discovery
437 /// compared to PID scanning. Consumers will be discovered using names
438 /// like `DIS_SM_1`, `DIS_SM_2`, etc.
439 /// The prefix identifies the consumer group/application, not the OS process name.
440 /// Stops scanning once the expected number of consumers are discovered, saving CPU cycles.
441 /// Default scan interval: 100ms.
442 pub fn discover_consumer_with_prefix(self, max_consumers: usize, prefix: &str) -> Self {
443 self.with_discovery(DiscoveryMode::with_consumer_prefix(
444 max_consumers,
445 prefix.to_string(),
446 ))
447 }
448
449 /// Enable discovery with custom scan interval for fixed topology (convenience method)
450 ///
451 /// This configures how often the producer scans for new consumers.
452 /// Stops scanning once the expected number of consumers are discovered, saving CPU cycles.
453 /// Shorter intervals provide faster consumer detection but use more CPU.
454 /// Longer intervals reduce CPU usage but may delay consumer detection.
455 pub fn with_discovery_interval(self, max_consumers: usize, scan_interval: Duration) -> Self {
456 self.with_discovery(DiscoveryMode::with_scan_interval(
457 max_consumers,
458 scan_interval,
459 ))
460 }
461
462 /// Enable discovery with consumer prefix and custom scan interval for fixed topology (convenience method)
463 ///
464 /// Combines optimized consumer prefix discovery with configurable scan timing.
465 /// Stops scanning once the expected number of consumers are discovered, saving CPU cycles.
466 pub fn discover_consumer_with_prefix_and_interval(
467 self,
468 max_consumers: usize,
469 prefix: &str,
470 scan_interval: Duration,
471 ) -> Self {
472 self.with_discovery(DiscoveryMode::with_consumer_prefix_and_interval(
473 max_consumers,
474 prefix.to_string(),
475 scan_interval,
476 ))
477 }
478
479 /// Enable coordination for single consumer scenarios (convenience method)
480 ///
481 /// This is equivalent to:
482 /// ```rust,ignore
483 /// use std::time::Duration;
484 /// use disruptor_mp::producer::CoordinationMode;
485 /// builder.with_coordination(CoordinationMode::wait_for_single_consumer(timeout))
486 /// ```
487 pub fn wait_for_single_consumer(self, timeout: Duration) -> Self {
488 self.with_coordination(CoordinationMode::wait_for_single_consumer(timeout))
489 }
490
491 /// Enable coordination for multiple consumer scenarios (convenience method)
492 pub fn wait_for_consumers(self, min_consumers: i64, timeout: Duration) -> Self {
493 self.with_coordination(CoordinationMode::wait_for_consumers(min_consumers, timeout))
494 }
495
496 /// Set a custom consumer ID for this consumer instance
497 ///
498 /// This is useful for prefix-based consumer discovery where consumers need
499 /// specific naming patterns. If not set, a default ID will be generated
500 /// using the pattern "c{pid}_{counter}".
501 ///
502 /// # Example
503 /// ```rust,ignore
504 /// let consumer = builder
505 /// .with_consumer_id("TEST_CONSUMER_1")
506 /// .build_consumer()?;
507 /// ```
508 pub fn with_consumer_id(mut self, consumer_id: &str) -> Self {
509 self.consumer_id = Some(consumer_id.to_string());
510 self
511 }
512
513 /// Bind automatic consumer thread to a specific CPU core.
514 ///
515 /// Uses Linux core affinity on Linux builds. On macOS and other non-Linux
516 /// platforms this configuration is accepted, ignored, and logged as a
517 /// best-effort no-op so consumer bring-up still succeeds.
518 ///
519 /// The builder resolves the effective affinity order:
520 /// 1. Explicit builder value.
521 /// 2. `MYELON_AUTO_CONSUMER_CORE` environment variable (if set).
522 pub fn with_consumer_core(mut self, core_id: usize) -> Self {
523 self.consumer_core = Some(core_id);
524 self
525 }
526
527 /// Bind this process (producer or consumer) to a specific CPU core.
528 ///
529 /// On Linux this pins the current thread before ring setup. For the normal
530 /// single-threaded producer/consumer entrypoints this acts as process-role
531 /// affinity, and newly spawned auto-consumer threads inherit the mask unless
532 /// they are pinned separately with `with_consumer_core()`. On macOS and
533 /// other non-Linux platforms this request is accepted, ignored, and logged
534 /// as a best-effort no-op so producer/consumer bring-up still succeeds.
535 ///
536 /// Resolution order:
537 /// 1. Explicit builder value via `with_process_core()`
538 /// 2. Role specific env var (`MYELON_PRODUCER_CORE`
539 /// or `MYELON_CONSUMER_CORE`)
540 /// 3. Generic env var `MYELON_PROCESS_CORE`
541 pub fn with_process_core(mut self, core_id: usize) -> Self {
542 self.process_core = Some(core_id);
543 self
544 }
545
546 fn maybe_pin_process_to_core(&self, role: ProcessRole) {
547 if let Some(core_id) = resolve_process_core(self.process_core, role) {
548 pin_current_thread_to_core(core_id, role.label());
549 }
550 }
551
552 /// Build a consumer with automatic batch event handling
553 ///
554 /// This provides automatic event delivery with configurable wait strategies.
555 /// Events are processed in batches for optimal performance.
556 ///
557 /// # Wait Strategies
558 /// - `AutoWaitStrategy::BusySpin` - Maximum performance (100% CPU, true busy spin)
559 /// - `AutoWaitStrategy::BusySpinWithSpinLoopHint` - High performance with CPU hints
560 /// - `AutoWaitStrategy::Block` - Balanced performance/CPU (default)
561 /// - `AutoWaitStrategy::Sleep(duration)` - CPU efficient with custom sleep
562 ///
563 /// # Examples
564 /// ```rust,ignore
565 /// use std::time::Duration;
566 /// use disruptor_mp::builder::AutoWaitStrategy;
567 ///
568 /// // Maximum performance for AI inference (true busy spin)
569 /// consumer.handle_events_batch(handler, AutoWaitStrategy::high_performance())
570 ///
571 /// // High performance with CPU hints (slightly more efficient)
572 /// consumer.handle_events_batch(handler, AutoWaitStrategy::high_performance_with_hints())
573 ///
574 /// // CPU efficient
575 /// consumer.handle_events_batch(handler, AutoWaitStrategy::cpu_efficient())
576 ///
577 /// // Custom sleep duration
578 /// consumer.handle_events_batch(handler, AutoWaitStrategy::sleep(Duration::from_micros(10)))
579 ///
580 /// // Balanced (default)
581 /// consumer.handle_events_batch(handler, AutoWaitStrategy::default())
582 /// ```
583 pub fn handle_events_batch<EH>(
584 self,
585 mut event_handler: EH,
586 wait_strategy: AutoWaitStrategy,
587 ) -> MultiProcessResult<AutoConsumer>
588 where
589 EH: 'static + Send + FnMut(&E, Sequence, bool),
590 {
591 self.maybe_pin_process_to_core(ProcessRole::Consumer);
592
593 let ring_buffer: SharedRingBuffer<E> = SharedRingBuffer::attach(self.config.clone())?;
594
595 // Attach to existing shared atomics
596 let producer_sequence_name = format!("{}_producer_seq", self.config.name);
597 let producer_sequence = SharedCursor::attach(&producer_sequence_name)?;
598
599 // Use custom consumer ID if provided, otherwise generate a deterministic
600 // slot-based ID when coordinated discovery is available.
601 let consumer_id = if let Some(custom_id) = self.consumer_id {
602 custom_id
603 } else {
604 default_consumer_id(&self.config.name)
605 };
606
607 // Create this consumer's own sequence tracker
608 let consumer_sequence_name = format!("{}_{}_seq", self.config.name, consumer_id);
609 let mut consumer_sequence = SharedCursor::new_or_attach(&consumer_sequence_name, -1)?;
610 if consumer_sequence.is_owner() {
611 consumer_sequence.set_owner(false);
612 }
613
614 // Create the consumer with coordination support
615 let mut consumer = SharedConsumer::new_with_coordination(
616 ring_buffer,
617 producer_sequence,
618 consumer_sequence,
619 consumer_id,
620 Some(self.config.name.clone()),
621 );
622
623 // Create shutdown signal
624 let shutdown_signal = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
625 let shutdown_signal_clone = std::sync::Arc::clone(&shutdown_signal);
626 let consumer_core = resolve_auto_consumer_core(self.consumer_core);
627 let thread_name = format!("multiprocess-consumer-{}", std::process::id());
628
629 // Spawn background thread for automatic event processing
630 let join_handle = thread::Builder::new()
631 .name(thread_name.clone())
632 .spawn(move || {
633 if let Some(core_id) = consumer_core {
634 pin_current_thread_to_core(core_id, &thread_name);
635 }
636
637 loop {
638 let processed = match wait_strategy {
639 AutoWaitStrategy::BusySpin => {
640 // Maximum performance: True busy spin
641 // TRUE BUSY SPIN: Do nothing when no events (like single-process BusySpin)
642 // This achieves maximum throughput at 100% CPU usage
643 consumer.process_available(|event, seq| {
644 // Approximate end_of_batch as false for maximum performance
645 event_handler(event, seq, false);
646 })
647 }
648 AutoWaitStrategy::BusySpinWithSpinLoopHint => {
649 // High performance: Busy spin with hints
650 let processed = consumer.process_available(|event, seq| {
651 // Approximate end_of_batch as false for maximum performance
652 event_handler(event, seq, false);
653 });
654
655 if processed == 0 {
656 std::hint::spin_loop();
657 }
658 processed
659 }
660 AutoWaitStrategy::SpinThenYield { spins } => {
661 // Hybrid: spin N times then yield to reduce tail latencies
662 let processed = consumer.process_available(|event, seq| {
663 event_handler(event, seq, false);
664 });
665
666 if processed == 0 {
667 // Spin N iterations using spin_loop hint
668 for _ in 0..spins {
669 std::hint::spin_loop();
670 }
671 // Then yield the thread to reduce scheduler starvation
672 std::thread::yield_now();
673 }
674 processed
675 }
676 AutoWaitStrategy::Block => {
677 // ⚖️ BALANCED: Batch processing with non-blocking check first
678 // Check if events are immediately available
679 let processed = consumer.process_available(|event, seq| {
680 // Approximate end_of_batch as false for simplicity
681 event_handler(event, seq, false);
682 });
683
684 if processed == 0 {
685 // No events available, wait a bit before trying again
686 // This prevents the infinite spin in consume_next()
687 super::wait::perform_default_block_wait();
688 }
689
690 processed
691 }
692 AutoWaitStrategy::Sleep(duration) => {
693 // CPU efficient: Batch processing with configurable sleep
694 let processed = consumer.process_available(|event, seq| {
695 // Approximate end_of_batch as false for simplicity
696 event_handler(event, seq, false);
697 });
698
699 if processed == 0 {
700 super::wait::sleep_or_yield(duration);
701 }
702 processed
703 }
704 };
705
706 // Continue processing - the wait strategy handles timing
707
708 // Performance optimization: Only check shutdown when idle (no events processed)
709 // This avoids atomic loads in the hot path when processing events
710 if processed == 0
711 && shutdown_signal_clone.load(std::sync::atomic::Ordering::Acquire)
712 {
713 break;
714 }
715 }
716 })
717 .expect("Should spawn consumer thread");
718
719 Ok(AutoConsumer::new(join_handle, shutdown_signal))
720 }
721
722 /// Build a consumer with automatic event handling (convenience method)
723 ///
724 /// Uses the default balanced wait strategy (Block).
725 /// For performance tuning, use `handle_events_batch()` with explicit wait strategy.
726 pub fn handle_events_with<EH>(self, event_handler: EH) -> MultiProcessResult<AutoConsumer>
727 where
728 EH: 'static + Send + FnMut(&E, Sequence, bool),
729 {
730 self.handle_events_batch(event_handler, AutoWaitStrategy::default())
731 }
732
733 /// Build a producer (creates shared memory segments)
734 ///
735 /// Note: Only one producer can exist per shared memory segment.
736 /// For multi-process scenarios, use Single Producer Multiple Consumer (SPMC) pattern.
737 pub fn build_producer<F>(self, event_factory: F) -> MultiProcessResult<SharedProducer<E>>
738 where
739 F: FnMut() -> E,
740 {
741 #[cfg(dst)]
742 if crate::dst::buggify(file!(), line!()) {
743 std::thread::sleep(Duration::from_millis(100));
744 }
745
746 self.maybe_pin_process_to_core(ProcessRole::Producer);
747
748 let discovery_mode = self.discovery_mode.unwrap_or_default();
749 let consumer_registration =
750 create_consumer_registration_cursor(&self.config.name, &discovery_mode)?;
751
752 let ring_buffer: SharedRingBuffer<E> =
753 SharedRingBuffer::new(self.config.clone(), event_factory)?;
754
755 // Create shared atomic for producer sequence
756 let producer_sequence_name = format!("{}_producer_seq", self.config.name);
757 let producer_sequence = SharedCursor::new(&producer_sequence_name, -1)?;
758
759 // Automatic coordination when discovery is enabled
760 // This eliminates the need for users to manually configure coordination
761 let coordination_mode = self.coordination_mode.unwrap_or_else(|| {
762 // Auto-enable coordination when discovery is enabled
763 match &discovery_mode {
764 DiscoveryMode::Enabled { max_consumers, .. } => {
765 // Use custom timeout if provided, otherwise use adaptive timeout
766 let timeout = self.coordination_timeout.unwrap_or_else(|| {
767 // FRAMEWORK INTELLIGENCE: Adaptive timeout based on consumer count
768 match *max_consumers {
769 1 => Duration::from_secs(15), // SPSC: Quick startup
770 2 => Duration::from_secs(20), // SPMC-2: Proven sweet spot
771 3..=4 => Duration::from_secs(25), // SPMC-3/4: Moderate
772 5..=8 => Duration::from_secs(30), // SPMC-5+: More time for coordination
773 _ => Duration::from_secs(45), // SPMC-9+: Maximum timeout
774 }
775 });
776
777 CoordinationMode::wait_for_consumers(*max_consumers as i64, timeout)
778 }
779 _ => CoordinationMode::default(), // No discovery = immediate start
780 }
781 });
782
783 let mut producer = SharedProducer::new_with_coordination_and_discovery(
784 ring_buffer,
785 producer_sequence,
786 self.config.name.clone(), // Pass base name for consumer discovery
787 coordination_mode.clone(),
788 discovery_mode,
789 consumer_registration,
790 );
791
792 // Handle coordination during producer creation, not first publish
793 match coordination_mode {
794 CoordinationMode::Immediate => {
795 // No coordination - for external coordination benchmarks
796 producer.coordination_completed = true;
797 }
798 CoordinationMode::WaitForConsumers {
799 min_consumers,
800 timeout,
801 } => {
802 println!(
803 "Framework coordinating startup: waiting for {} consumers (timeout: {:?})...",
804 min_consumers, timeout
805 );
806 let coordination_ready = producer
807 .consumer_barrier
808 .wait_for_consumers_ready(min_consumers, timeout);
809 if !coordination_ready {
810 eprintln!("Warning: Timed out waiting for {} consumers after {:?}. Producer created anyway.",
811 min_consumers, timeout);
812 println!(
813 "Framework coordination incomplete - producer continuing without {} ready consumers",
814 min_consumers
815 );
816 } else {
817 println!(
818 "Framework coordination completed - {} consumers ready",
819 min_consumers
820 );
821 }
822 // Mark coordination as completed
823 producer.coordination_completed = true;
824 }
825 CoordinationMode::BufferUntilConsumers { .. } => {
826 // Future enhancement - for now, treat as completed
827 producer.coordination_completed = true;
828 }
829 }
830
831 Ok(producer)
832 }
833
834 /// Build a consumer (attaches to existing shared memory segments)
835 pub fn build_consumer(self) -> MultiProcessResult<SharedConsumer<E>> {
836 #[cfg(dst)]
837 if crate::dst::buggify(file!(), line!()) {
838 std::thread::sleep(Duration::from_millis(100));
839 }
840
841 self.maybe_pin_process_to_core(ProcessRole::Consumer);
842
843 let ring_buffer: SharedRingBuffer<E> = SharedRingBuffer::attach(self.config.clone())?;
844
845 // Attach to existing shared atomics
846 let producer_sequence_name = format!("{}_producer_seq", self.config.name);
847 let producer_sequence = SharedCursor::attach(&producer_sequence_name)?;
848
849 // Use custom consumer ID if provided, otherwise generate a deterministic
850 // slot-based ID when coordinated discovery is available.
851 let consumer_id = if let Some(custom_id) = self.consumer_id {
852 custom_id
853 } else {
854 default_consumer_id(&self.config.name)
855 };
856
857 // Create this consumer's own sequence tracker
858 // Note: Uses new_or_attach because multiple consumers might start simultaneously
859 // and try to create the same sequence name (different from coordination structures)
860 let consumer_sequence_name = format!("{}_{}_seq", self.config.name, consumer_id);
861 let mut consumer_sequence = SharedCursor::new_or_attach(&consumer_sequence_name, -1)?;
862 if consumer_sequence.is_owner() {
863 consumer_sequence.set_owner(false);
864 }
865
866 Ok(SharedConsumer::new_with_coordination(
867 ring_buffer,
868 producer_sequence,
869 consumer_sequence,
870 consumer_id,
871 Some(self.config.name.clone()),
872 ))
873 }
874}
875
876fn resolve_auto_consumer_core(consumer_core: Option<usize>) -> Option<usize> {
877 if let Some(core_id) = consumer_core {
878 return Some(core_id);
879 }
880
881 resolve_core_from_env_vars(&[runtime_env::AUTO_CONSUMER_CORE])
882}
883
884fn resolve_process_core(process_core: Option<usize>, role: ProcessRole) -> Option<usize> {
885 if let Some(core_id) = process_core {
886 return Some(core_id);
887 }
888
889 resolve_core_from_env_vars(&[role.env_var(), runtime_env::PROCESS_CORE])
890}
891
/// Resolve a core ID from the first environment variable in `env_vars` that is set.
///
/// Variables are checked in order and unset variables are skipped. The first variable
/// that *is* set decides the outcome: a valid non-negative integer yields `Some(core)`,
/// while an unparsable or non-Unicode value is reported on stderr and yields `None`
/// without falling through to later variables.
fn resolve_core_from_env_vars(env_vars: &[&str]) -> Option<usize> {
    for name in env_vars.iter().copied() {
        let value = match env::var(name) {
            Ok(value) => value,
            // Not set: try the next candidate.
            Err(env::VarError::NotPresent) => continue,
            Err(env::VarError::NotUnicode(_)) => {
                eprintln!("Invalid {}: value is not valid Unicode.", name);
                return None;
            }
        };

        // The variable is set, so it is authoritative whether or not it parses.
        let parsed = value.parse::<usize>();
        if parsed.is_err() {
            eprintln!(
                "Invalid {}='{}'. Expected a non-negative integer.",
                name, value
            );
        }
        return parsed.ok();
    }

    None
}
915
916#[cfg(target_os = "linux")]
917fn pin_current_thread_to_core(core_id: usize, thread_name: &str) {
918 let available_cores = core_affinity::get_core_ids().unwrap_or_default();
919 let core = core_affinity::CoreId { id: core_id };
920
921 if !available_cores
922 .iter()
923 .any(|candidate| candidate.id == core_id)
924 {
925 eprintln!(
926 "Could not pin {} to core {}: core not available.",
927 thread_name, core_id
928 );
929 return;
930 }
931
932 if !core_affinity::set_for_current(core) {
933 eprintln!("Could not pin {} to core {}.", thread_name, core_id);
934 }
935}
936
/// Fallback for platforms other than Linux: no pinning is attempted.
///
/// The request is only reported on stderr so that start-up can proceed unpinned.
#[cfg(not(target_os = "linux"))]
fn pin_current_thread_to_core(core_id: usize, thread_name: &str) {
    eprintln!(
        "CPU affinity support is Linux-only in disruptor-mp. Requested pinning {} -> core {} ignored; bring-up continues without pinning on this platform.",
        thread_name,
        core_id
    );
}
944
945/// Create a shared single producer for multi-process communication
946///
947/// This creates a Single Producer Multiple Consumer (SPMC) setup where:
948/// - One process creates and owns the producer
949/// - Multiple processes can attach as consumers (each sees all events)
950///
951/// Note: `SharedProducer` cannot be cloned across processes. Each shared memory
952/// segment supports exactly one producer process.
953///
954/// For automatic coordination, use the builder pattern:
955/// ```rust,ignore
956/// use std::time::Duration;
957/// use disruptor_mp::build_shared_single_producer;
958///
959/// let producer = build_shared_single_producer::<Event>("test", 1024)
960/// .wait_for_single_consumer(Duration::from_secs(30))
961/// .build_producer(Event::default)?;
962/// ```
963/// Create a builder for a shared single producer with the given name and size.
964///
965/// This is a convenience function that creates a [`SharedDisruptorBuilder`] configured
966/// for producer creation with the specified shared memory segment name and buffer size.
967///
968/// # Arguments
969/// * `name` - Shared memory segment name (keep under 10 characters for cross-platform compatibility)
970/// * `size` - Ring buffer size (must be power of 2)
971///
972/// # Examples
973/// ```rust
974/// use disruptor_mp::build_shared_single_producer;
975///
976/// #[derive(Copy, Clone, Default)]
977/// struct Event { data: i64 }
978///
979/// let builder = build_shared_single_producer::<Event>("ring123", 1024);
980/// let producer = builder.build_producer(|| Event::default())?;
981/// # Ok::<(), Box<dyn std::error::Error>>(())
982/// ```
983pub fn build_shared_single_producer<E: Copy + Default + 'static>(
984 name: &str,
985 size: usize,
986) -> SharedDisruptorBuilder<E> {
987 let config = SharedMemoryConfig {
988 name: name.to_string(),
989 buffer_size: size,
990 element_size: std::mem::size_of::<E>(),
991 create: true,
992 };
993
994 SharedDisruptorBuilder::new(config)
995}
996
997/// Attach to an existing shared disruptor as a consumer
998///
999/// This allows multiple consumer processes to attach to a shared memory segment
1000/// created by a producer process. Each consumer will see all events (broadcast semantics).
1001/// Create a builder for attaching to an existing shared consumer.
1002///
1003/// This is a convenience function that creates a [`SharedDisruptorBuilder`] configured
1004/// for consumer attachment to an existing shared memory segment created by a producer.
1005///
1006/// # Arguments
1007/// * `name` - Shared memory segment name (must match the producer's name)
1008/// * `size` - Ring buffer size (must match the producer's size)
1009///
1010/// # Examples
1011/// ```rust,no_run
1012/// use disruptor_mp::attach_shared_consumer;
1013///
1014/// #[derive(Copy, Clone, Default)]
1015/// struct Event { data: i64 }
1016///
1017/// let builder = attach_shared_consumer::<Event>("ring123", 1024);
1018/// let consumer = builder.build_consumer()?;
1019/// # Ok::<(), Box<dyn std::error::Error>>(())
1020/// ```
1021pub fn attach_shared_consumer<E: Copy + Default + 'static>(
1022 name: &str,
1023 size: usize,
1024) -> SharedDisruptorBuilder<E> {
1025 let config = SharedMemoryConfig {
1026 name: name.to_string(),
1027 buffer_size: size,
1028 element_size: std::mem::size_of::<E>(),
1029 create: false,
1030 };
1031
1032 SharedDisruptorBuilder::new(config)
1033}
1034
1035#[cfg(test)]
1036mod tests {
1037 use super::*;
1038 use std::ffi::OsString;
1039 use std::sync::atomic::{AtomicUsize, Ordering};
1040 use std::sync::Arc;
1041 use std::sync::{Mutex, OnceLock};
1042 use std::time::{Duration, Instant};
1043
/// Minimal event payload used by the tests in this module.
#[derive(Clone, Copy, Default)]
struct TestEvent {
    /// Value written by the publishing closures in the tests.
    value: i64,
}
1048
/// Process-wide lock serialising tests that mutate environment variables.
fn env_lock() -> &'static Mutex<()> {
    static ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
    ENV_LOCK.get_or_init(|| Mutex::new(()))
}

/// Run `f` with the given environment variables applied, then restore the previous values.
///
/// Each entry of `vars` is `(name, Some(value))` to set a variable or `(name, None)` to
/// remove it for the duration of `f`. The whole operation runs under [`env_lock`], so
/// concurrent tests cannot observe each other's environment changes.
///
/// The previous values are restored even if `f` panics (for example on a failed
/// assertion), and a lock poisoned by such a panic is recovered instead of failing
/// every later test that touches the environment.
///
/// Returns whatever `f` returns.
fn with_env_vars<const N: usize, F, R>(
    vars: [(&'static str, Option<&'static str>); N],
    f: F,
) -> R
where
    F: FnOnce() -> R,
{
    /// Puts the captured values back when dropped, including during unwinding.
    struct Restore<const N: usize> {
        previous: [(&'static str, Option<OsString>); N],
    }

    impl<const N: usize> Drop for Restore<N> {
        fn drop(&mut self) {
            for (key, value) in &self.previous {
                match value {
                    Some(value) => std::env::set_var(key, value),
                    None => std::env::remove_var(key),
                }
            }
        }
    }

    // The mutex guards no data of its own, so a poisoned lock is still safe to reuse:
    // the restore guard below has already put the environment back.
    let _guard = env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    // Declared after `_guard` so it is dropped first: the environment is restored
    // while the lock is still held.
    let _restore = Restore {
        previous: vars.map(|(key, _)| (key, std::env::var_os(key))),
    };

    for (key, value) in vars {
        match value {
            Some(value) => std::env::set_var(key, value),
            None => std::env::remove_var(key),
        }
    }

    f()
}
1083
/// A consumer using the `Block` wait strategy must not hang while no events are
/// available, and must still receive an event published afterwards.
#[test]
fn test_block_wait_strategy_no_hang() {
    let buffer_size = 128;
    let segment = format!("test_blk_{}", std::process::id() % 10000);

    let mut producer = build_shared_single_producer::<TestEvent>(&segment, buffer_size)
        .build_producer(TestEvent::default)
        .expect("Failed to create producer");

    // Counter shared with the handler; bumped once per handled event.
    let received = Arc::new(AtomicUsize::new(0));
    let handler_count = Arc::clone(&received);

    let consumer = attach_shared_consumer::<TestEvent>(&segment, buffer_size)
        .handle_events_batch(
            move |_event: &TestEvent, _seq, _eob| {
                handler_count.fetch_add(1, Ordering::Relaxed);
            },
            AutoWaitStrategy::Block,
        )
        .expect("Failed to create consumer");

    // Let the consumer sit idle for a while before anything is published.
    std::thread::sleep(Duration::from_millis(100));

    producer.publish(|event| event.value = 42);

    // Give the handler time to pick the event up.
    std::thread::sleep(Duration::from_millis(50));

    assert_eq!(received.load(Ordering::Relaxed), 1);

    // Dropping the consumer shuts it down.
    drop(consumer);
}
1125
/// A consumer using the `BusySpin` wait strategy receives every published event.
#[test]
fn test_busy_spin_wait_strategy() {
    let buffer_size = 128;
    let segment = format!("test_spn_{}", std::process::id() % 10000);

    let mut producer = build_shared_single_producer::<TestEvent>(&segment, buffer_size)
        .build_producer(TestEvent::default)
        .expect("Failed to create producer");

    // Counter shared with the handler; bumped once per handled event.
    let received = Arc::new(AtomicUsize::new(0));
    let handler_count = Arc::clone(&received);

    let consumer = attach_shared_consumer::<TestEvent>(&segment, buffer_size)
        .handle_events_batch(
            move |_event: &TestEvent, _seq, _eob| {
                handler_count.fetch_add(1, Ordering::Relaxed);
            },
            AutoWaitStrategy::BusySpin,
        )
        .expect("Failed to create consumer");

    // Publish a burst of ten events back to back.
    for value in 0..10 {
        producer.publish(|event| event.value = value);
    }

    // Give the handler time to drain the burst.
    std::thread::sleep(Duration::from_millis(50));

    assert_eq!(received.load(Ordering::Relaxed), 10);

    // Dropping the consumer shuts it down.
    drop(consumer);
}
1166
/// A consumer using the `Sleep` wait strategy receives every published event.
#[test]
fn test_sleep_wait_strategy() {
    let buffer_size = 128;
    let segment = format!("test_slp_{}", std::process::id() % 10000);

    let mut producer = build_shared_single_producer::<TestEvent>(&segment, buffer_size)
        .build_producer(TestEvent::default)
        .expect("Failed to create producer");

    // Counter shared with the handler; bumped once per handled event.
    let received = Arc::new(AtomicUsize::new(0));
    let handler_count = Arc::clone(&received);

    let consumer = attach_shared_consumer::<TestEvent>(&segment, buffer_size)
        .handle_events_batch(
            move |_event: &TestEvent, _seq, _eob| {
                handler_count.fetch_add(1, Ordering::Relaxed);
            },
            AutoWaitStrategy::Sleep(Duration::from_millis(5)),
        )
        .expect("Failed to create consumer");

    // Publish five events, pausing 10 ms after each one.
    for value in 0..5 {
        producer.publish(|event| event.value = value);
        std::thread::sleep(Duration::from_millis(10));
    }

    // Give the handler time to catch up.
    std::thread::sleep(Duration::from_millis(100));

    assert_eq!(received.load(Ordering::Relaxed), 5);

    // Dropping the consumer shuts it down.
    drop(consumer);
}
1208
/// After `shutdown()`, an `AutoConsumer` must stop handling newly published events.
#[test]
fn test_auto_consumer_shutdown() {
    let buffer_size = 128;
    let segment = format!("test_sht_{}", std::process::id() % 10000);

    let mut producer = build_shared_single_producer::<TestEvent>(&segment, buffer_size)
        .build_producer(TestEvent::default)
        .expect("Failed to create producer");

    // Counter shared with the handler; bumped once per handled event.
    let received = Arc::new(AtomicUsize::new(0));
    let handler_count = Arc::clone(&received);

    let mut consumer = attach_shared_consumer::<TestEvent>(&segment, buffer_size)
        .handle_events_batch(
            move |_event: &TestEvent, _seq, _eob| {
                handler_count.fetch_add(1, Ordering::Relaxed);
            },
            AutoWaitStrategy::Block,
        )
        .expect("Failed to create consumer");

    // First batch: published while the consumer is running, so all five must arrive.
    for value in 0..5 {
        producer.publish(|event| event.value = value);
    }
    std::thread::sleep(Duration::from_millis(50));
    assert_eq!(received.load(Ordering::Relaxed), 5);

    consumer.shutdown();

    // Second batch: published after shutdown, so the count must stay at five.
    for value in 5..10 {
        producer.publish(|event| event.value = value);
    }
    std::thread::sleep(Duration::from_millis(50));
    assert_eq!(received.load(Ordering::Relaxed), 5);

    // Clean up
    consumer.join();
}
1261
/// An `AutoConsumer` must process every event published in several bursts.
///
/// Only the total count is checked. The `end_of_batch` flag is deliberately not
/// asserted because the current implementation only approximates it for
/// performance reasons.
#[test]
fn test_auto_consumer_batch_processing() {
    let buffer_size = 1024;
    let segment = format!("test_bat_{}", std::process::id() % 10000);

    // Counter shared with the handler; bumped once per handled event.
    let processed = Arc::new(AtomicUsize::new(0));
    let handler_count = Arc::clone(&processed);

    // The producer is built without any discovery/coordination options.
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, buffer_size)
        .build_producer(TestEvent::default)
        .expect("Failed to create producer");

    // The consumer attaches after the producer exists.
    let consumer = attach_shared_consumer::<TestEvent>(&segment, buffer_size)
        .handle_events_batch(
            move |_event: &TestEvent, _seq, _end_of_batch| {
                handler_count.fetch_add(1, Ordering::Relaxed);
            },
            AutoWaitStrategy::Block,
        )
        .expect("Failed to create consumer");

    // Three bursts of ten events, with a pause after each burst.
    for burst in 0..3 {
        for offset in 0..10 {
            producer.publish(|event| event.value = burst * 10 + offset);
        }
        std::thread::sleep(Duration::from_millis(50));
    }

    // Extra settling time before counting.
    std::thread::sleep(Duration::from_millis(200));

    let total_events = processed.load(Ordering::Relaxed);
    assert_eq!(total_events, 30, "Should have processed all 30 events");

    // Dropping the consumer shuts it down.
    drop(consumer);
}
1309
/// Print a rough events-per-second figure for each wait strategy.
///
/// This is a measurement, not a correctness check, so it is ignored by default.
#[test]
#[ignore] // Ignore by default as this is a performance test
fn test_wait_strategy_performance() {
    let buffer_size = 8192;
    let num_events = 100_000;

    // Every strategy under test, paired with the label used in names and output.
    let strategies = [
        ("BusySpin", AutoWaitStrategy::BusySpin),
        (
            "BusySpinWithHint",
            AutoWaitStrategy::BusySpinWithSpinLoopHint,
        ),
        ("Block", AutoWaitStrategy::Block),
        (
            "Sleep_1us",
            AutoWaitStrategy::Sleep(Duration::from_micros(1)),
        ),
        (
            "Sleep_100us",
            AutoWaitStrategy::Sleep(Duration::from_micros(100)),
        ),
    ];

    for (name, strategy) in strategies {
        // Each strategy gets its own segment.
        let segment = format!("tst_p_{}_{}", name, std::process::id() % 10000);

        let mut producer = build_shared_single_producer::<TestEvent>(&segment, buffer_size)
            .build_producer(TestEvent::default)
            .expect("Failed to create producer");

        let received = Arc::new(AtomicUsize::new(0));
        let handler_count = Arc::clone(&received);

        // The clock starts before the consumer is attached.
        let start = Instant::now();

        let consumer = attach_shared_consumer::<TestEvent>(&segment, buffer_size)
            .handle_events_batch(
                move |_event: &TestEvent, _seq, _eob| {
                    handler_count.fetch_add(1, Ordering::Relaxed);
                },
                strategy,
            )
            .expect("Failed to create consumer");

        for value in 0..num_events {
            producer.publish(|event| event.value = value);
        }

        // Poll until the handler has seen every event.
        let target = num_events as usize;
        while received.load(Ordering::Relaxed) < target {
            std::thread::sleep(Duration::from_millis(1));
        }

        let elapsed = start.elapsed();
        let events_per_sec = num_events as f64 / elapsed.as_secs_f64();

        println!("{} strategy: {:.0} events/sec", name, events_per_sec);

        // Dropping the consumer shuts it down.
        drop(consumer);
    }
}
1379
/// `with_consumer_core` must store the requested core on the builder.
#[test]
fn test_consumer_core_override_is_preserved() {
    let segment = format!("test_cpu_{}", std::process::id() % 10000);

    let builder = attach_shared_consumer::<TestEvent>(&segment, 128).with_consumer_core(3);

    assert_eq!(builder.consumer_core, Some(3));
}
1389
/// An explicit builder-supplied core must take precedence over `AUTO_CONSUMER_CORE`.
#[test]
fn test_consumer_core_resolve_from_builder_overrides_env() {
    // Set the environment variable to a *different* core so the assertion can only
    // pass when the explicit value really wins.
    with_env_vars([(runtime_env::AUTO_CONSUMER_CORE, Some("9"))], || {
        let override_core = 7usize;
        let resolved = resolve_auto_consumer_core(Some(override_core));
        assert_eq!(resolved, Some(override_core));
    });
}
1396
/// Without an explicit core, `AUTO_CONSUMER_CORE` supplies the consumer core.
#[test]
fn test_consumer_core_resolve_from_env_var() {
    let vars = [(runtime_env::AUTO_CONSUMER_CORE, Some("9"))];

    with_env_vars(vars, || {
        assert_eq!(resolve_auto_consumer_core(None), Some(9));
    });
}
1404
/// An unparsable `AUTO_CONSUMER_CORE` value resolves to no core at all.
#[test]
fn test_consumer_core_resolve_from_invalid_env_var() {
    let vars = [(runtime_env::AUTO_CONSUMER_CORE, Some("invalid"))];

    with_env_vars(vars, || {
        assert_eq!(resolve_auto_consumer_core(None), None);
    });
}
1412
/// `with_process_core` must store the requested core on the builder.
#[test]
fn test_process_core_override_is_preserved() {
    let segment = format!("test_prc_{}", std::process::id() % 10000);

    let builder = build_shared_single_producer::<TestEvent>(&segment, 128).with_process_core(5);

    assert_eq!(builder.process_core, Some(5));
}
1422
/// An explicit process core wins over both the role-specific and the generic variable.
#[test]
fn test_process_core_resolve_from_builder_overrides_env() {
    let vars = [
        (runtime_env::PRODUCER_CORE, Some("7")),
        (runtime_env::PROCESS_CORE, Some("9")),
    ];

    with_env_vars(vars, || {
        assert_eq!(
            resolve_process_core(Some(11), ProcessRole::Producer),
            Some(11)
        );
    });
}
1436
/// With both variables set, the role-specific one is preferred over the generic one.
#[test]
fn test_process_core_resolve_from_role_specific_env_var() {
    let vars = [
        (runtime_env::PRODUCER_CORE, Some("6")),
        (runtime_env::PROCESS_CORE, Some("8")),
    ];

    with_env_vars(vars, || {
        assert_eq!(resolve_process_core(None, ProcessRole::Producer), Some(6));
    });
}
1450
/// With the role-specific variable unset, the generic `PROCESS_CORE` is used.
#[test]
fn test_process_core_resolve_from_generic_env_var() {
    let vars = [
        (runtime_env::PRODUCER_CORE, None),
        (runtime_env::PROCESS_CORE, Some("10")),
    ];

    with_env_vars(vars, || {
        assert_eq!(resolve_process_core(None, ProcessRole::Producer), Some(10));
    });
}
1464
/// An invalid role-specific value yields no core; the generic variable is not a fallback.
#[test]
fn test_process_core_resolve_from_invalid_role_specific_env_var() {
    let vars = [
        (runtime_env::CONSUMER_CORE, Some("invalid")),
        (runtime_env::PROCESS_CORE, Some("12")),
    ];

    with_env_vars(vars, || {
        assert_eq!(resolve_process_core(None, ProcessRole::Consumer), None);
    });
}
1478}