Skip to main content

disruptor_mp/
builder.rs

//! Builder surface for creating and attaching multiprocess transports.
//!
//! [`SharedDisruptorBuilder`] is the main configuration surface for:
//!
//! - shared-memory or mmap setup
//! - startup coordination
//! - consumer discovery
//! - automatic event-handler threads
//! - wait-policy selection
//!
//! The builder supports two broad usage styles:
//!
//! - automatic event handling via `handle_events_with(...)`
//! - manual polling via `build_consumer()`
15
16use super::consumer::SharedConsumer;
17use super::consumer_barrier::{auto_consumer_id, consumer_registration_cursor_name, DiscoveryMode};
18use super::producer::{CoordinationMode, SharedProducer};
19use crate::env::{read, runtime as runtime_env};
20use crate::{MultiProcessResult, SharedCursor, SharedMemoryConfig, SharedRingBuffer};
21use disruptor_core::Sequence;
22use std::env;
23use std::sync::atomic::{AtomicUsize, Ordering};
24use std::thread::{self, JoinHandle};
25use std::time::Duration;
26
/// Global counter for generating unique consumer IDs within a process.
///
/// `default_consumer_id` bumps it (relaxed ordering) to build the
/// `c{pid % 10000}_{counter}` fallback ID whenever no registered slot-based
/// ID could be allocated.
static CONSUMER_COUNTER: AtomicUsize = AtomicUsize::new(0);
29
30fn uses_registered_auto_ids(discovery_mode: &DiscoveryMode) -> bool {
31    matches!(
32        discovery_mode,
33        DiscoveryMode::Enabled {
34            consumer_prefix: None,
35            ..
36        }
37    )
38}
39
40fn create_consumer_registration_cursor(
41    base_name: &str,
42    discovery_mode: &DiscoveryMode,
43) -> MultiProcessResult<Option<SharedCursor>> {
44    if !uses_registered_auto_ids(discovery_mode) {
45        return Ok(None);
46    }
47
48    let registration_name = consumer_registration_cursor_name(base_name);
49    let cursor = SharedCursor::new_or_attach(&registration_name, 0)?;
50    Ok(Some(cursor))
51}
52
53fn allocate_registered_consumer_id(base_name: &str) -> Option<String> {
54    let registration_name = consumer_registration_cursor_name(base_name);
55    let registration = SharedCursor::attach(&registration_name).ok()?;
56    let slot = registration.fetch_add(1, Ordering::AcqRel);
57    if slot < 0 {
58        return None;
59    }
60
61    Some(auto_consumer_id(slot as usize))
62}
63
64fn default_consumer_id(base_name: &str) -> String {
65    if let Some(consumer_id) = allocate_registered_consumer_id(base_name) {
66        return consumer_id;
67    }
68
69    let process_id = std::process::id();
70    let consumer_counter = CONSUMER_COUNTER.fetch_add(1, Ordering::Relaxed);
71    format!("c{}_{}", process_id % 10000, consumer_counter)
72}
73
74/// Wait strategy for automatic event handlers
75#[derive(Debug, Clone, Default)]
76pub enum AutoWaitStrategy {
77    /// Maximum performance busy spinning (100% CPU usage) - true busy spin, no hints
78    BusySpin,
79    /// High performance busy spinning with spin loop hints (slightly lower CPU usage)
80    BusySpinWithSpinLoopHint,
81    /// Hybrid: spin N iterations, then yield the thread
82    SpinThenYield {
83        /// Number of spin-loop iterations before yielding once.
84        spins: usize,
85    },
86    /// CPU efficient with configurable sleep duration
87    Sleep(Duration),
88    /// Blocking with efficient waiting (balanced performance/CPU)
89    #[default]
90    Block,
91}
92
93impl AutoWaitStrategy {
94    /// Create a high performance wait strategy (true busy spin)
95    pub fn high_performance() -> Self {
96        AutoWaitStrategy::BusySpin
97    }
98
99    /// Create a high performance wait strategy with spin loop hints
100    pub fn high_performance_with_hints() -> Self {
101        AutoWaitStrategy::BusySpinWithSpinLoopHint
102    }
103
104    /// Create a hybrid strategy that spins N times then yields
105    pub fn spin_then_yield(spins: usize) -> Self {
106        AutoWaitStrategy::SpinThenYield { spins }
107    }
108
109    /// Create a CPU efficient wait strategy
110    pub fn cpu_efficient() -> Self {
111        AutoWaitStrategy::Sleep(Duration::from_micros(1))
112    }
113
114    /// Create a custom sleep-based wait strategy
115    pub fn sleep(duration: Duration) -> Self {
116        AutoWaitStrategy::Sleep(duration)
117    }
118
119    /// Create a sleep-based wait strategy with nanosecond precision
120    ///
121    /// # Special Values
122    /// - `0` = use `spin_loop()` instead of sleep (high performance)
123    /// - `1..` = Sleep for the specified nanoseconds (lower performance, CPU efficient)
124    ///
125    /// Note: Actual sleep precision depends on the operating system.
126    /// Very small durations (< 1000ns) may not sleep at all on some systems.
127    pub fn sleep_nanos(nanos: u64) -> Self {
128        if nanos == 0 {
129            AutoWaitStrategy::BusySpinWithSpinLoopHint
130        } else {
131            AutoWaitStrategy::Sleep(Duration::from_nanos(nanos))
132        }
133    }
134
135    /// Create a sleep-based wait strategy with microsecond precision
136    ///
137    /// # Special Values
138    /// - `0` = use `spin_loop()` instead of sleep (high performance)
139    /// - `1..` = Sleep for the specified microseconds (lower performance, CPU efficient)
140    pub fn sleep_micros(micros: u64) -> Self {
141        if micros == 0 {
142            AutoWaitStrategy::BusySpinWithSpinLoopHint
143        } else {
144            AutoWaitStrategy::Sleep(Duration::from_micros(micros))
145        }
146    }
147
148    /// Create wait strategy from environment variables with fallback
149    ///
150    /// Checks these environment variables in order:
151    /// 1. `MYELON_AUTO_WAIT_DELAY_NS` - nanosecond precision (`0` = `spin_loop`)
152    /// 2. `MYELON_AUTO_WAIT_DELAY_US` - microsecond precision (`0` = `spin_loop`)
153    /// 3. Falls back to the provided default
154    ///
155    /// # Examples
156    /// ```bash
157    /// # Use spin_loop (maximum performance)
158    /// export MYELON_AUTO_WAIT_DELAY_NS=0
159    ///
160    /// # Sleep for 100 nanoseconds
161    /// export MYELON_AUTO_WAIT_DELAY_NS=100
162    ///
163    /// # Sleep for 10 microseconds
164    /// export MYELON_AUTO_WAIT_DELAY_US=10
165    /// ```
166    pub fn from_env_or(default: AutoWaitStrategy) -> Self {
167        // Check for nanosecond precision first
168        if let Some(nanos) = read::parse::<u64>(runtime_env::AUTO_WAIT_DELAY_NS) {
169            return Self::sleep_nanos(nanos);
170        }
171
172        // Check for microsecond precision
173        if let Some(micros) = read::parse::<u64>(runtime_env::AUTO_WAIT_DELAY_US) {
174            return Self::sleep_micros(micros);
175        }
176
177        // Fall back to default
178        default
179    }
180}
181
/// Automatic consumer that runs in a background thread.
///
/// This is a handle to a consumer performing automatic event processing
/// on a separate thread. It supports graceful shutdown and joining.
pub struct AutoConsumer {
    /// Handle of the worker thread; `None` once the thread has been joined.
    join_handle: Option<JoinHandle<()>>,
    /// Shared flag; storing `true` asks the worker to stop.
    shutdown_signal: std::sync::Arc<std::sync::atomic::AtomicBool>,
}

impl AutoConsumer {
    /// Wrap an already spawned worker thread together with its shutdown flag.
    fn new(
        join_handle: JoinHandle<()>,
        shutdown_signal: std::sync::Arc<std::sync::atomic::AtomicBool>,
    ) -> Self {
        Self {
            join_handle: Some(join_handle),
            shutdown_signal,
        }
    }

    /// Signal the consumer to shut down.
    ///
    /// This only sets the flag; it does not wait for the thread to exit.
    pub fn shutdown(&self) {
        self.shutdown_signal.store(true, Ordering::Release);
    }

    /// Wait for the consumer thread to finish.
    ///
    /// Blocks until the thread exits. A panic in the worker is swallowed.
    /// Calling this again after the thread was joined is a no-op.
    pub fn join(&mut self) {
        if let Some(handle) = self.join_handle.take() {
            // Best effort: a panicked worker must not propagate into the caller.
            let _ = handle.join();
        }
    }

    /// Shut down and wait for the consumer thread to finish.
    pub fn shutdown_and_join(&mut self) {
        self.shutdown();
        self.join();
    }

    /// Check if the consumer thread is still running.
    ///
    /// Returns `true` only when the thread has not been joined, has not
    /// already exited (normally or by panicking), and no shutdown has been
    /// requested.
    pub fn is_running(&self) -> bool {
        let shutdown_requested = self.shutdown_signal.load(Ordering::Acquire);
        match &self.join_handle {
            // A worker that already returned or panicked is not running even
            // though its handle has not been joined yet.
            Some(handle) => !shutdown_requested && !handle.is_finished(),
            None => false,
        }
    }
}
229
230/// Automatic resource management for [`AutoConsumer`].
231///
232/// Ensures proper cleanup when the consumer goes out of scope, which
233/// matters for embedded host runtimes (e.g. an FFI consumer wrapped by
234/// a garbage-collected language) that expect automatic resource
235/// management.
236impl Drop for AutoConsumer {
237    fn drop(&mut self) {
238        // Signal shutdown and wait for thread to finish
239        self.shutdown_signal
240            .store(true, std::sync::atomic::Ordering::Release);
241
242        if let Some(handle) = self.join_handle.take() {
243            // Give the thread a moment to see the shutdown signal
244            std::thread::sleep(super::wait::SLEEP_CONFIG.shutdown_grace_duration());
245
246            // Wait for clean shutdown (with timeout for safety)
247            match handle.join() {
248                Ok(_) => {
249                    // Clean shutdown successful
250                }
251                Err(_) => {
252                    // Thread panicked - this is unexpected but not fatal
253                    eprintln!(
254                        "Warning: AutoConsumer thread terminated unexpectedly during cleanup"
255                    );
256                }
257            }
258        }
259    }
260}
261
/// Builder for creating multi-process disruptors.
///
/// This builder provides a fluent API for configuring and creating shared memory
/// disruptors with various coordination modes, discovery options, and event handling
/// strategies. It supports both producer and consumer creation with automatic
/// shared memory management.
///
/// ## Key features
///
/// - **Automatic event delivery**: `handle_events_with()` creates background processing
/// - **Automatic resource management**: proper cleanup via `Drop` implementations
/// - **Configurable timeouts**: customizable coordination and discovery timeouts
/// - **Error handling**: clear error messages and robust failure handling
///
/// ## Usage Patterns
///
/// ```rust,no_run
/// use disruptor_mp::*;
/// use std::time::Duration;
///
/// #[derive(Copy, Clone, Default)]
/// struct Event { data: i64 }
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Producer with automatic coordination
/// let mut producer = build_shared_single_producer::<Event>("test", 1024)
///     .enable_discovery(2)
///     .with_coordination_timeout(Duration::from_secs(30))
///     .build_producer(Event::default)?;
///
/// // Consumer with automatic event delivery
/// let consumer = attach_shared_consumer::<Event>("test", 1024)
///     .handle_events_with(|event, seq, eob| {
///         // Events delivered automatically
///     })?;
/// # Ok(())
/// # }
/// ```
pub struct SharedDisruptorBuilder<E> {
    /// Shared-memory configuration; `config.name` is the base name from which
    /// the cursor names (`{name}_producer_seq`, `{name}_{consumer_id}_seq`) are built.
    config: SharedMemoryConfig,
    /// Explicit producer coordination mode; when `None`, `build_producer`
    /// derives one from the discovery mode.
    coordination_mode: Option<CoordinationMode>,
    /// Consumer discovery mode; `None` means the `DiscoveryMode` default.
    discovery_mode: Option<DiscoveryMode>,
    /// Custom consumer ID; when `None`, an ID is generated for the consumer.
    consumer_id: Option<String>,
    /// Explicit CPU core for pinning the calling thread (process role).
    process_core: Option<usize>,
    /// Explicit CPU core for the automatic consumer thread.
    consumer_core: Option<usize>,
    /// Overrides the adaptive coordination timeout chosen in `build_producer`.
    coordination_timeout: Option<Duration>,
    /// Ties the builder to the event type `E` without storing a value of it.
    _phantom: std::marker::PhantomData<E>,
}
310
311#[derive(Copy, Clone)]
312enum ProcessRole {
313    Producer,
314    Consumer,
315}
316
317impl ProcessRole {
318    fn env_var(self) -> &'static str {
319        match self {
320            ProcessRole::Producer => runtime_env::PRODUCER_CORE,
321            ProcessRole::Consumer => runtime_env::CONSUMER_CORE,
322        }
323    }
324
325    fn label(self) -> &'static str {
326        match self {
327            ProcessRole::Producer => "producer process",
328            ProcessRole::Consumer => "consumer process",
329        }
330    }
331}
332
333impl<E> SharedDisruptorBuilder<E>
334where
335    E: Copy + Default + 'static,
336{
337    /// Create a new builder with the given configuration
338    pub fn new(config: SharedMemoryConfig) -> Self {
339        Self {
340            config,
341            coordination_mode: None,
342            discovery_mode: None,
343            consumer_id: None,
344            process_core: None,
345            consumer_core: None,
346            coordination_timeout: None,
347            _phantom: std::marker::PhantomData,
348        }
349    }
350
351    /// Set a custom coordination timeout
352    ///
353    /// This overrides the default adaptive timeout behavior for consumer coordination.
354    /// Useful for environments with specific timing requirements.
355    ///
356    /// # Examples
357    /// ```rust,ignore
358    /// use std::time::Duration;
359    /// let builder = builder.with_coordination_timeout(Duration::from_secs(60));
360    /// ```
361    pub fn with_coordination_timeout(mut self, timeout: Duration) -> Self {
362        self.coordination_timeout = Some(timeout);
363        self
364    }
365
366    /// Set the coordination mode for the producer
367    ///
368    /// This enables internal coordination to eliminate the need for external coordination structures.
369    ///
370    /// # Examples
371    /// ```rust,ignore
372    /// use std::time::Duration;
373    /// use disruptor_mp::producer::CoordinationMode;
374    ///
375    /// // Wait for single consumer with 30 second timeout
376    /// let builder = builder.with_coordination(
377    ///     CoordinationMode::wait_for_single_consumer(Duration::from_secs(30))
378    /// );
379    ///
380    /// // Wait for multiple consumers with 45 second timeout
381    /// let builder = builder.with_coordination(
382    ///     CoordinationMode::wait_for_consumers(3, Duration::from_secs(45))
383    /// );
384    /// ```
385    pub fn with_coordination(mut self, coordination_mode: CoordinationMode) -> Self {
386        self.coordination_mode = Some(coordination_mode);
387        self
388    }
389
390    /// Configure consumer discovery mode
391    ///
392    /// # Discovery Modes (Default: Disabled)
393    /// - `DiscoveryMode::Disabled` - No discovery (default), maximum performance for externally coordinated scenarios
394    /// - `DiscoveryMode::Enabled { consumer_prefix: None }` - Deterministic slot-based discovery
395    ///   when coordination is present, with PID scanning as legacy fallback
396    /// - `DiscoveryMode::Enabled { consumer_prefix: Some("DIS_SM") }` - Optimized prefix-based discovery
397    ///
398    /// # Examples
399    /// ```rust,ignore
400    /// use disruptor_mp::lock_free::DiscoveryMode;
401    ///
402    /// // Enable basic discovery for 2 consumers (fixed topology)
403    /// let builder = builder.enable_discovery(2);
404    ///
405    /// // Use consumer naming convention for efficient discovery of 5 consumers
406    /// let builder = builder.discover_consumer_with_prefix(5, "DIS_SM");
407    /// ```
408    pub fn with_discovery(mut self, discovery_mode: DiscoveryMode) -> Self {
409        self.discovery_mode = Some(discovery_mode);
410        self
411    }
412
413    /// Disable consumer discovery (default behavior)
414    ///
415    /// This is ideal for externally coordinated scenarios like benchmarks
416    /// where consumer coordination is handled outside the disruptor.
417    /// Discovery is disabled by default for maximum performance.
418    pub fn disable_discovery(self) -> Self {
419        self.with_discovery(DiscoveryMode::Disabled)
420    }
421
422    /// Enable basic consumer discovery for fixed topology (convenience method)
423    ///
424    /// When startup coordination is present, consumers are assigned deterministic
425    /// slot-based IDs (`ad_0`, `ad_1`, ...) so discovery does not rely on PID
426    /// proximity. Without coordination, discovery falls back to PID scanning with
427    /// the legacy naming pattern `c{pid}_{counter}`.
428    /// Stops scanning once the expected number of consumers are discovered.
429    /// Default scan interval: 100ms.
430    pub fn enable_discovery(self, max_consumers: usize) -> Self {
431        self.with_discovery(DiscoveryMode::enabled(max_consumers))
432    }
433
434    /// Enable optimized discovery with consumer naming convention for fixed topology (convenience method)
435    ///
436    /// This uses consumer name prefixes for much faster consumer discovery
437    /// compared to PID scanning. Consumers will be discovered using names
438    /// like `DIS_SM_1`, `DIS_SM_2`, etc.
439    /// The prefix identifies the consumer group/application, not the OS process name.
440    /// Stops scanning once the expected number of consumers are discovered, saving CPU cycles.
441    /// Default scan interval: 100ms.
442    pub fn discover_consumer_with_prefix(self, max_consumers: usize, prefix: &str) -> Self {
443        self.with_discovery(DiscoveryMode::with_consumer_prefix(
444            max_consumers,
445            prefix.to_string(),
446        ))
447    }
448
449    /// Enable discovery with custom scan interval for fixed topology (convenience method)
450    ///
451    /// This configures how often the producer scans for new consumers.
452    /// Stops scanning once the expected number of consumers are discovered, saving CPU cycles.
453    /// Shorter intervals provide faster consumer detection but use more CPU.
454    /// Longer intervals reduce CPU usage but may delay consumer detection.
455    pub fn with_discovery_interval(self, max_consumers: usize, scan_interval: Duration) -> Self {
456        self.with_discovery(DiscoveryMode::with_scan_interval(
457            max_consumers,
458            scan_interval,
459        ))
460    }
461
462    /// Enable discovery with consumer prefix and custom scan interval for fixed topology (convenience method)
463    ///
464    /// Combines optimized consumer prefix discovery with configurable scan timing.
465    /// Stops scanning once the expected number of consumers are discovered, saving CPU cycles.
466    pub fn discover_consumer_with_prefix_and_interval(
467        self,
468        max_consumers: usize,
469        prefix: &str,
470        scan_interval: Duration,
471    ) -> Self {
472        self.with_discovery(DiscoveryMode::with_consumer_prefix_and_interval(
473            max_consumers,
474            prefix.to_string(),
475            scan_interval,
476        ))
477    }
478
479    /// Enable coordination for single consumer scenarios (convenience method)
480    ///
481    /// This is equivalent to:
482    /// ```rust,ignore
483    /// use std::time::Duration;
484    /// use disruptor_mp::producer::CoordinationMode;
485    /// builder.with_coordination(CoordinationMode::wait_for_single_consumer(timeout))
486    /// ```
487    pub fn wait_for_single_consumer(self, timeout: Duration) -> Self {
488        self.with_coordination(CoordinationMode::wait_for_single_consumer(timeout))
489    }
490
491    /// Enable coordination for multiple consumer scenarios (convenience method)
492    pub fn wait_for_consumers(self, min_consumers: i64, timeout: Duration) -> Self {
493        self.with_coordination(CoordinationMode::wait_for_consumers(min_consumers, timeout))
494    }
495
496    /// Set a custom consumer ID for this consumer instance
497    ///
498    /// This is useful for prefix-based consumer discovery where consumers need
499    /// specific naming patterns. If not set, a default ID will be generated
500    /// using the pattern "c{pid}_{counter}".
501    ///
502    /// # Example
503    /// ```rust,ignore
504    /// let consumer = builder
505    ///     .with_consumer_id("TEST_CONSUMER_1")
506    ///     .build_consumer()?;
507    /// ```
508    pub fn with_consumer_id(mut self, consumer_id: &str) -> Self {
509        self.consumer_id = Some(consumer_id.to_string());
510        self
511    }
512
513    /// Bind automatic consumer thread to a specific CPU core.
514    ///
515    /// Uses Linux core affinity on Linux builds. On macOS and other non-Linux
516    /// platforms this configuration is accepted, ignored, and logged as a
517    /// best-effort no-op so consumer bring-up still succeeds.
518    ///
519    /// The builder resolves the effective affinity order:
520    /// 1. Explicit builder value.
521    /// 2. `MYELON_AUTO_CONSUMER_CORE` environment variable (if set).
522    pub fn with_consumer_core(mut self, core_id: usize) -> Self {
523        self.consumer_core = Some(core_id);
524        self
525    }
526
527    /// Bind this process (producer or consumer) to a specific CPU core.
528    ///
529    /// On Linux this pins the current thread before ring setup. For the normal
530    /// single-threaded producer/consumer entrypoints this acts as process-role
531    /// affinity, and newly spawned auto-consumer threads inherit the mask unless
532    /// they are pinned separately with `with_consumer_core()`. On macOS and
533    /// other non-Linux platforms this request is accepted, ignored, and logged
534    /// as a best-effort no-op so producer/consumer bring-up still succeeds.
535    ///
536    /// Resolution order:
537    /// 1. Explicit builder value via `with_process_core()`
538    /// 2. Role specific env var (`MYELON_PRODUCER_CORE`
539    ///    or `MYELON_CONSUMER_CORE`)
540    /// 3. Generic env var `MYELON_PROCESS_CORE`
541    pub fn with_process_core(mut self, core_id: usize) -> Self {
542        self.process_core = Some(core_id);
543        self
544    }
545
546    fn maybe_pin_process_to_core(&self, role: ProcessRole) {
547        if let Some(core_id) = resolve_process_core(self.process_core, role) {
548            pin_current_thread_to_core(core_id, role.label());
549        }
550    }
551
552    /// Build a consumer with automatic batch event handling
553    ///
554    /// This provides automatic event delivery with configurable wait strategies.
555    /// Events are processed in batches for optimal performance.
556    ///
557    /// # Wait Strategies
558    /// - `AutoWaitStrategy::BusySpin` - Maximum performance (100% CPU, true busy spin)
559    /// - `AutoWaitStrategy::BusySpinWithSpinLoopHint` - High performance with CPU hints
560    /// - `AutoWaitStrategy::Block` - Balanced performance/CPU (default)
561    /// - `AutoWaitStrategy::Sleep(duration)` - CPU efficient with custom sleep
562    ///
563    /// # Examples
564    /// ```rust,ignore
565    /// use std::time::Duration;
566    /// use disruptor_mp::builder::AutoWaitStrategy;
567    ///
568    /// // Maximum performance for AI inference (true busy spin)
569    /// consumer.handle_events_batch(handler, AutoWaitStrategy::high_performance())
570    ///
571    /// // High performance with CPU hints (slightly more efficient)
572    /// consumer.handle_events_batch(handler, AutoWaitStrategy::high_performance_with_hints())
573    ///
574    /// // CPU efficient
575    /// consumer.handle_events_batch(handler, AutoWaitStrategy::cpu_efficient())
576    ///
577    /// // Custom sleep duration
578    /// consumer.handle_events_batch(handler, AutoWaitStrategy::sleep(Duration::from_micros(10)))
579    ///
580    /// // Balanced (default)
581    /// consumer.handle_events_batch(handler, AutoWaitStrategy::default())
582    /// ```
583    pub fn handle_events_batch<EH>(
584        self,
585        mut event_handler: EH,
586        wait_strategy: AutoWaitStrategy,
587    ) -> MultiProcessResult<AutoConsumer>
588    where
589        EH: 'static + Send + FnMut(&E, Sequence, bool),
590    {
591        self.maybe_pin_process_to_core(ProcessRole::Consumer);
592
593        let ring_buffer: SharedRingBuffer<E> = SharedRingBuffer::attach(self.config.clone())?;
594
595        // Attach to existing shared atomics
596        let producer_sequence_name = format!("{}_producer_seq", self.config.name);
597        let producer_sequence = SharedCursor::attach(&producer_sequence_name)?;
598
599        // Use custom consumer ID if provided, otherwise generate a deterministic
600        // slot-based ID when coordinated discovery is available.
601        let consumer_id = if let Some(custom_id) = self.consumer_id {
602            custom_id
603        } else {
604            default_consumer_id(&self.config.name)
605        };
606
607        // Create this consumer's own sequence tracker
608        let consumer_sequence_name = format!("{}_{}_seq", self.config.name, consumer_id);
609        let mut consumer_sequence = SharedCursor::new_or_attach(&consumer_sequence_name, -1)?;
610        if consumer_sequence.is_owner() {
611            consumer_sequence.set_owner(false);
612        }
613
614        // Create the consumer with coordination support
615        let mut consumer = SharedConsumer::new_with_coordination(
616            ring_buffer,
617            producer_sequence,
618            consumer_sequence,
619            consumer_id,
620            Some(self.config.name.clone()),
621        );
622
623        // Create shutdown signal
624        let shutdown_signal = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
625        let shutdown_signal_clone = std::sync::Arc::clone(&shutdown_signal);
626        let consumer_core = resolve_auto_consumer_core(self.consumer_core);
627        let thread_name = format!("multiprocess-consumer-{}", std::process::id());
628
629        // Spawn background thread for automatic event processing
630        let join_handle = thread::Builder::new()
631            .name(thread_name.clone())
632            .spawn(move || {
633                if let Some(core_id) = consumer_core {
634                    pin_current_thread_to_core(core_id, &thread_name);
635                }
636
637                loop {
638                    let processed = match wait_strategy {
639                        AutoWaitStrategy::BusySpin => {
640                            // Maximum performance: True busy spin
641                            // TRUE BUSY SPIN: Do nothing when no events (like single-process BusySpin)
642                            // This achieves maximum throughput at 100% CPU usage
643                            consumer.process_available(|event, seq| {
644                                // Approximate end_of_batch as false for maximum performance
645                                event_handler(event, seq, false);
646                            })
647                        }
648                        AutoWaitStrategy::BusySpinWithSpinLoopHint => {
649                            // High performance: Busy spin with hints
650                            let processed = consumer.process_available(|event, seq| {
651                                // Approximate end_of_batch as false for maximum performance
652                                event_handler(event, seq, false);
653                            });
654
655                            if processed == 0 {
656                                std::hint::spin_loop();
657                            }
658                            processed
659                        }
660                        AutoWaitStrategy::SpinThenYield { spins } => {
661                            // Hybrid: spin N times then yield to reduce tail latencies
662                            let processed = consumer.process_available(|event, seq| {
663                                event_handler(event, seq, false);
664                            });
665
666                            if processed == 0 {
667                                // Spin N iterations using spin_loop hint
668                                for _ in 0..spins {
669                                    std::hint::spin_loop();
670                                }
671                                // Then yield the thread to reduce scheduler starvation
672                                std::thread::yield_now();
673                            }
674                            processed
675                        }
676                        AutoWaitStrategy::Block => {
677                            // ⚖️ BALANCED: Batch processing with non-blocking check first
678                            // Check if events are immediately available
679                            let processed = consumer.process_available(|event, seq| {
680                                // Approximate end_of_batch as false for simplicity
681                                event_handler(event, seq, false);
682                            });
683
684                            if processed == 0 {
685                                // No events available, wait a bit before trying again
686                                // This prevents the infinite spin in consume_next()
687                                super::wait::perform_default_block_wait();
688                            }
689
690                            processed
691                        }
692                        AutoWaitStrategy::Sleep(duration) => {
693                            // CPU efficient: Batch processing with configurable sleep
694                            let processed = consumer.process_available(|event, seq| {
695                                // Approximate end_of_batch as false for simplicity
696                                event_handler(event, seq, false);
697                            });
698
699                            if processed == 0 {
700                                super::wait::sleep_or_yield(duration);
701                            }
702                            processed
703                        }
704                    };
705
706                    // Continue processing - the wait strategy handles timing
707
708                    // Performance optimization: Only check shutdown when idle (no events processed)
709                    // This avoids atomic loads in the hot path when processing events
710                    if processed == 0
711                        && shutdown_signal_clone.load(std::sync::atomic::Ordering::Acquire)
712                    {
713                        break;
714                    }
715                }
716            })
717            .expect("Should spawn consumer thread");
718
719        Ok(AutoConsumer::new(join_handle, shutdown_signal))
720    }
721
722    /// Build a consumer with automatic event handling (convenience method)
723    ///
724    /// Uses the default balanced wait strategy (Block).
725    /// For performance tuning, use `handle_events_batch()` with explicit wait strategy.
726    pub fn handle_events_with<EH>(self, event_handler: EH) -> MultiProcessResult<AutoConsumer>
727    where
728        EH: 'static + Send + FnMut(&E, Sequence, bool),
729    {
730        self.handle_events_batch(event_handler, AutoWaitStrategy::default())
731    }
732
733    /// Build a producer (creates shared memory segments)
734    ///
735    /// Note: Only one producer can exist per shared memory segment.
736    /// For multi-process scenarios, use Single Producer Multiple Consumer (SPMC) pattern.
737    pub fn build_producer<F>(self, event_factory: F) -> MultiProcessResult<SharedProducer<E>>
738    where
739        F: FnMut() -> E,
740    {
741        #[cfg(dst)]
742        if crate::dst::buggify(file!(), line!()) {
743            std::thread::sleep(Duration::from_millis(100));
744        }
745
746        self.maybe_pin_process_to_core(ProcessRole::Producer);
747
748        let discovery_mode = self.discovery_mode.unwrap_or_default();
749        let consumer_registration =
750            create_consumer_registration_cursor(&self.config.name, &discovery_mode)?;
751
752        let ring_buffer: SharedRingBuffer<E> =
753            SharedRingBuffer::new(self.config.clone(), event_factory)?;
754
755        // Create shared atomic for producer sequence
756        let producer_sequence_name = format!("{}_producer_seq", self.config.name);
757        let producer_sequence = SharedCursor::new(&producer_sequence_name, -1)?;
758
759        // Automatic coordination when discovery is enabled
760        // This eliminates the need for users to manually configure coordination
761        let coordination_mode = self.coordination_mode.unwrap_or_else(|| {
762            // Auto-enable coordination when discovery is enabled
763            match &discovery_mode {
764                DiscoveryMode::Enabled { max_consumers, .. } => {
765                    // Use custom timeout if provided, otherwise use adaptive timeout
766                    let timeout = self.coordination_timeout.unwrap_or_else(|| {
767                        // FRAMEWORK INTELLIGENCE: Adaptive timeout based on consumer count
768                        match *max_consumers {
769                            1 => Duration::from_secs(15),     // SPSC: Quick startup
770                            2 => Duration::from_secs(20),     // SPMC-2: Proven sweet spot
771                            3..=4 => Duration::from_secs(25), // SPMC-3/4: Moderate
772                            5..=8 => Duration::from_secs(30), // SPMC-5+: More time for coordination
773                            _ => Duration::from_secs(45),     // SPMC-9+: Maximum timeout
774                        }
775                    });
776
777                    CoordinationMode::wait_for_consumers(*max_consumers as i64, timeout)
778                }
779                _ => CoordinationMode::default(), // No discovery = immediate start
780            }
781        });
782
783        let mut producer = SharedProducer::new_with_coordination_and_discovery(
784            ring_buffer,
785            producer_sequence,
786            self.config.name.clone(), // Pass base name for consumer discovery
787            coordination_mode.clone(),
788            discovery_mode,
789            consumer_registration,
790        );
791
792        // Handle coordination during producer creation, not first publish
793        match coordination_mode {
794            CoordinationMode::Immediate => {
795                // No coordination - for external coordination benchmarks
796                producer.coordination_completed = true;
797            }
798            CoordinationMode::WaitForConsumers {
799                min_consumers,
800                timeout,
801            } => {
802                println!(
803                    "Framework coordinating startup: waiting for {} consumers (timeout: {:?})...",
804                    min_consumers, timeout
805                );
806                let coordination_ready = producer
807                    .consumer_barrier
808                    .wait_for_consumers_ready(min_consumers, timeout);
809                if !coordination_ready {
810                    eprintln!("Warning: Timed out waiting for {} consumers after {:?}. Producer created anyway.",
811                        min_consumers, timeout);
812                    println!(
813                        "Framework coordination incomplete - producer continuing without {} ready consumers",
814                        min_consumers
815                    );
816                } else {
817                    println!(
818                        "Framework coordination completed - {} consumers ready",
819                        min_consumers
820                    );
821                }
822                // Mark coordination as completed
823                producer.coordination_completed = true;
824            }
825            CoordinationMode::BufferUntilConsumers { .. } => {
826                // Future enhancement - for now, treat as completed
827                producer.coordination_completed = true;
828            }
829        }
830
831        Ok(producer)
832    }
833
834    /// Build a consumer (attaches to existing shared memory segments)
835    pub fn build_consumer(self) -> MultiProcessResult<SharedConsumer<E>> {
836        #[cfg(dst)]
837        if crate::dst::buggify(file!(), line!()) {
838            std::thread::sleep(Duration::from_millis(100));
839        }
840
841        self.maybe_pin_process_to_core(ProcessRole::Consumer);
842
843        let ring_buffer: SharedRingBuffer<E> = SharedRingBuffer::attach(self.config.clone())?;
844
845        // Attach to existing shared atomics
846        let producer_sequence_name = format!("{}_producer_seq", self.config.name);
847        let producer_sequence = SharedCursor::attach(&producer_sequence_name)?;
848
849        // Use custom consumer ID if provided, otherwise generate a deterministic
850        // slot-based ID when coordinated discovery is available.
851        let consumer_id = if let Some(custom_id) = self.consumer_id {
852            custom_id
853        } else {
854            default_consumer_id(&self.config.name)
855        };
856
857        // Create this consumer's own sequence tracker
858        // Note: Uses new_or_attach because multiple consumers might start simultaneously
859        // and try to create the same sequence name (different from coordination structures)
860        let consumer_sequence_name = format!("{}_{}_seq", self.config.name, consumer_id);
861        let mut consumer_sequence = SharedCursor::new_or_attach(&consumer_sequence_name, -1)?;
862        if consumer_sequence.is_owner() {
863            consumer_sequence.set_owner(false);
864        }
865
866        Ok(SharedConsumer::new_with_coordination(
867            ring_buffer,
868            producer_sequence,
869            consumer_sequence,
870            consumer_id,
871            Some(self.config.name.clone()),
872        ))
873    }
874}
875
876fn resolve_auto_consumer_core(consumer_core: Option<usize>) -> Option<usize> {
877    if let Some(core_id) = consumer_core {
878        return Some(core_id);
879    }
880
881    resolve_core_from_env_vars(&[runtime_env::AUTO_CONSUMER_CORE])
882}
883
884fn resolve_process_core(process_core: Option<usize>, role: ProcessRole) -> Option<usize> {
885    if let Some(core_id) = process_core {
886        return Some(core_id);
887    }
888
889    resolve_core_from_env_vars(&[role.env_var(), runtime_env::PROCESS_CORE])
890}
891
/// Read a core index from the first environment variable in `env_vars` that is set.
///
/// Variables that are not present are skipped. The first variable that is
/// present decides the outcome: a value that parses as `usize` is returned, while
/// an unparsable or non-Unicode value prints a diagnostic to stderr and yields
/// `None` without looking at the remaining variables.
///
/// Returns `None` when no variable is set.
fn resolve_core_from_env_vars(env_vars: &[&str]) -> Option<usize> {
    for name in env_vars {
        let raw = match env::var(name) {
            Ok(value) => value,
            // Unset: try the next candidate.
            Err(env::VarError::NotPresent) => continue,
            Err(env::VarError::NotUnicode(_)) => {
                eprintln!("Invalid {}: value is not valid Unicode.", name);
                return None;
            }
        };

        // A variable that is set is authoritative, even when its value is bad.
        let parsed = raw.parse::<usize>().ok();
        if parsed.is_none() {
            eprintln!(
                "Invalid {}='{}'. Expected a non-negative integer.",
                name, raw
            );
        }
        return parsed;
    }

    None
}
915
916#[cfg(target_os = "linux")]
917fn pin_current_thread_to_core(core_id: usize, thread_name: &str) {
918    let available_cores = core_affinity::get_core_ids().unwrap_or_default();
919    let core = core_affinity::CoreId { id: core_id };
920
921    if !available_cores
922        .iter()
923        .any(|candidate| candidate.id == core_id)
924    {
925        eprintln!(
926            "Could not pin {} to core {}: core not available.",
927            thread_name, core_id
928        );
929        return;
930    }
931
932    if !core_affinity::set_for_current(core) {
933        eprintln!("Could not pin {} to core {}.", thread_name, core_id);
934    }
935}
936
/// Fallback for non-Linux targets: pinning is not performed.
///
/// Only reports on stderr that the request for `thread_name` / `core_id` was
/// ignored, so that bring-up continues unpinned.
#[cfg(not(target_os = "linux"))]
fn pin_current_thread_to_core(core_id: usize, thread_name: &str) {
    // Deliberately a diagnostic rather than an error: affinity is optional.
    eprintln!(
        "CPU affinity support is Linux-only in disruptor-mp. Requested pinning {} -> core {} ignored; bring-up continues without pinning on this platform.",
        thread_name,
        core_id,
    );
}
944
945/// Create a shared single producer for multi-process communication
946///
947/// This creates a Single Producer Multiple Consumer (SPMC) setup where:
948/// - One process creates and owns the producer
949/// - Multiple processes can attach as consumers (each sees all events)
950///
951/// Note: `SharedProducer` cannot be cloned across processes. Each shared memory
952/// segment supports exactly one producer process.
953///
954/// For automatic coordination, use the builder pattern:
955/// ```rust,ignore
956/// use std::time::Duration;
957/// use disruptor_mp::build_shared_single_producer;
958///
959/// let producer = build_shared_single_producer::<Event>("test", 1024)
960///     .wait_for_single_consumer(Duration::from_secs(30))
961///     .build_producer(Event::default)?;
962/// ```
963/// Create a builder for a shared single producer with the given name and size.
964///
965/// This is a convenience function that creates a [`SharedDisruptorBuilder`] configured
966/// for producer creation with the specified shared memory segment name and buffer size.
967///
968/// # Arguments
969/// * `name` - Shared memory segment name (keep under 10 characters for cross-platform compatibility)
970/// * `size` - Ring buffer size (must be power of 2)
971///
972/// # Examples
973/// ```rust
974/// use disruptor_mp::build_shared_single_producer;
975///
976/// #[derive(Copy, Clone, Default)]
977/// struct Event { data: i64 }
978///
979/// let builder = build_shared_single_producer::<Event>("ring123", 1024);
980/// let producer = builder.build_producer(|| Event::default())?;
981/// # Ok::<(), Box<dyn std::error::Error>>(())
982/// ```
983pub fn build_shared_single_producer<E: Copy + Default + 'static>(
984    name: &str,
985    size: usize,
986) -> SharedDisruptorBuilder<E> {
987    let config = SharedMemoryConfig {
988        name: name.to_string(),
989        buffer_size: size,
990        element_size: std::mem::size_of::<E>(),
991        create: true,
992    };
993
994    SharedDisruptorBuilder::new(config)
995}
996
997/// Attach to an existing shared disruptor as a consumer
998///
999/// This allows multiple consumer processes to attach to a shared memory segment
1000/// created by a producer process. Each consumer will see all events (broadcast semantics).
1001/// Create a builder for attaching to an existing shared consumer.
1002///
1003/// This is a convenience function that creates a [`SharedDisruptorBuilder`] configured
1004/// for consumer attachment to an existing shared memory segment created by a producer.
1005///
1006/// # Arguments
1007/// * `name` - Shared memory segment name (must match the producer's name)
1008/// * `size` - Ring buffer size (must match the producer's size)
1009///
1010/// # Examples
1011/// ```rust,no_run
1012/// use disruptor_mp::attach_shared_consumer;
1013///
1014/// #[derive(Copy, Clone, Default)]
1015/// struct Event { data: i64 }
1016///
1017/// let builder = attach_shared_consumer::<Event>("ring123", 1024);
1018/// let consumer = builder.build_consumer()?;
1019/// # Ok::<(), Box<dyn std::error::Error>>(())
1020/// ```
1021pub fn attach_shared_consumer<E: Copy + Default + 'static>(
1022    name: &str,
1023    size: usize,
1024) -> SharedDisruptorBuilder<E> {
1025    let config = SharedMemoryConfig {
1026        name: name.to_string(),
1027        buffer_size: size,
1028        element_size: std::mem::size_of::<E>(),
1029        create: false,
1030    };
1031
1032    SharedDisruptorBuilder::new(config)
1033}
1034
1035#[cfg(test)]
1036mod tests {
1037    use super::*;
1038    use std::ffi::OsString;
1039    use std::sync::atomic::{AtomicUsize, Ordering};
1040    use std::sync::Arc;
1041    use std::sync::{Mutex, OnceLock};
1042    use std::time::{Duration, Instant};
1043
1044    #[derive(Copy, Clone, Default)]
1045    struct TestEvent {
1046        value: i64,
1047    }
1048
1049    fn env_lock() -> &'static Mutex<()> {
1050        static ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
1051        ENV_LOCK.get_or_init(|| Mutex::new(()))
1052    }
1053
1054    fn with_env_vars<const N: usize, F, R>(
1055        vars: [(&'static str, Option<&'static str>); N],
1056        f: F,
1057    ) -> R
1058    where
1059        F: FnOnce() -> R,
1060    {
1061        let _guard = env_lock().lock().expect("environment lock poisoned");
1062        let previous: [(&'static str, Option<OsString>); N] =
1063            vars.map(|(key, _)| (key, std::env::var_os(key)));
1064
1065        for (key, value) in vars {
1066            match value {
1067                Some(value) => std::env::set_var(key, value),
1068                None => std::env::remove_var(key),
1069            }
1070        }
1071
1072        let result = f();
1073
1074        for (key, value) in previous {
1075            match value {
1076                Some(value) => std::env::set_var(key, value),
1077                None => std::env::remove_var(key),
1078            }
1079        }
1080
1081        result
1082    }
1083
1084    /// Test that Block wait strategy doesn't hang when no events are available
1085    #[test]
1086    fn test_block_wait_strategy_no_hang() {
1087        let segment_name = format!("test_blk_{}", std::process::id() % 10000);
1088        let buffer_size = 128;
1089
1090        // Create producer
1091        let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
1092            .build_producer(TestEvent::default)
1093            .expect("Failed to create producer");
1094
1095        // Create consumer with Block wait strategy
1096        let events_received = Arc::new(AtomicUsize::new(0));
1097        let events_clone = Arc::clone(&events_received);
1098
1099        let consumer = attach_shared_consumer::<TestEvent>(&segment_name, buffer_size)
1100            .handle_events_batch(
1101                move |_event: &TestEvent, _seq, _eob| {
1102                    events_clone.fetch_add(1, Ordering::Relaxed);
1103                },
1104                AutoWaitStrategy::Block,
1105            )
1106            .expect("Failed to create consumer");
1107
1108        // Wait a bit to ensure consumer doesn't hang
1109        std::thread::sleep(Duration::from_millis(100));
1110
1111        // Publish an event
1112        producer.publish(|event| {
1113            event.value = 42;
1114        });
1115
1116        // Wait for event to be processed
1117        std::thread::sleep(Duration::from_millis(50));
1118
1119        // Verify event was received
1120        assert_eq!(events_received.load(Ordering::Relaxed), 1);
1121
1122        // Shutdown consumer
1123        drop(consumer);
1124    }
1125
1126    /// Test that `BusySpin` wait strategy works correctly
1127    #[test]
1128    fn test_busy_spin_wait_strategy() {
1129        let segment_name = format!("test_spn_{}", std::process::id() % 10000);
1130        let buffer_size = 128;
1131
1132        // Create producer
1133        let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
1134            .build_producer(TestEvent::default)
1135            .expect("Failed to create producer");
1136
1137        // Create consumer with BusySpin wait strategy
1138        let events_received = Arc::new(AtomicUsize::new(0));
1139        let events_clone = Arc::clone(&events_received);
1140
1141        let consumer = attach_shared_consumer::<TestEvent>(&segment_name, buffer_size)
1142            .handle_events_batch(
1143                move |_event: &TestEvent, _seq, _eob| {
1144                    events_clone.fetch_add(1, Ordering::Relaxed);
1145                },
1146                AutoWaitStrategy::BusySpin,
1147            )
1148            .expect("Failed to create consumer");
1149
1150        // Publish multiple events
1151        for i in 0..10 {
1152            producer.publish(|event| {
1153                event.value = i;
1154            });
1155        }
1156
1157        // Wait for events to be processed
1158        std::thread::sleep(Duration::from_millis(50));
1159
1160        // Verify all events were received
1161        assert_eq!(events_received.load(Ordering::Relaxed), 10);
1162
1163        // Shutdown consumer
1164        drop(consumer);
1165    }
1166
1167    /// Test that Sleep wait strategy works correctly
1168    #[test]
1169    fn test_sleep_wait_strategy() {
1170        let segment_name = format!("test_slp_{}", std::process::id() % 10000);
1171        let buffer_size = 128;
1172
1173        // Create producer
1174        let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
1175            .build_producer(TestEvent::default)
1176            .expect("Failed to create producer");
1177
1178        // Create consumer with Sleep wait strategy
1179        let events_received = Arc::new(AtomicUsize::new(0));
1180        let events_clone = Arc::clone(&events_received);
1181
1182        let consumer = attach_shared_consumer::<TestEvent>(&segment_name, buffer_size)
1183            .handle_events_batch(
1184                move |_event: &TestEvent, _seq, _eob| {
1185                    events_clone.fetch_add(1, Ordering::Relaxed);
1186                },
1187                AutoWaitStrategy::Sleep(Duration::from_millis(5)),
1188            )
1189            .expect("Failed to create consumer");
1190
1191        // Publish events
1192        for i in 0..5 {
1193            producer.publish(|event| {
1194                event.value = i;
1195            });
1196            std::thread::sleep(Duration::from_millis(10));
1197        }
1198
1199        // Wait for events to be processed
1200        std::thread::sleep(Duration::from_millis(100));
1201
1202        // Verify all events were received
1203        assert_eq!(events_received.load(Ordering::Relaxed), 5);
1204
1205        // Shutdown consumer
1206        drop(consumer);
1207    }
1208
1209    /// Test `AutoConsumer` shutdown mechanism
1210    #[test]
1211    fn test_auto_consumer_shutdown() {
1212        let segment_name = format!("test_sht_{}", std::process::id() % 10000);
1213        let buffer_size = 128;
1214
1215        // Create producer
1216        let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
1217            .build_producer(TestEvent::default)
1218            .expect("Failed to create producer");
1219
1220        // Create consumer
1221        let events_received = Arc::new(AtomicUsize::new(0));
1222        let events_clone = Arc::clone(&events_received);
1223
1224        let mut consumer = attach_shared_consumer::<TestEvent>(&segment_name, buffer_size)
1225            .handle_events_batch(
1226                move |_event: &TestEvent, _seq, _eob| {
1227                    events_clone.fetch_add(1, Ordering::Relaxed);
1228                },
1229                AutoWaitStrategy::Block,
1230            )
1231            .expect("Failed to create consumer");
1232
1233        // Publish some events
1234        for i in 0..5 {
1235            producer.publish(|event| {
1236                event.value = i;
1237            });
1238        }
1239
1240        // Wait for events to be processed
1241        std::thread::sleep(Duration::from_millis(50));
1242        assert_eq!(events_received.load(Ordering::Relaxed), 5);
1243
1244        // Shutdown consumer
1245        consumer.shutdown();
1246
1247        // Publish more events
1248        for i in 5..10 {
1249            producer.publish(|event| {
1250                event.value = i;
1251            });
1252        }
1253
1254        // Wait and verify no new events are processed
1255        std::thread::sleep(Duration::from_millis(50));
1256        assert_eq!(events_received.load(Ordering::Relaxed), 5);
1257
1258        // Clean up
1259        consumer.join();
1260    }
1261
1262    /// Test that `AutoConsumer` processes events correctly
1263    /// Note: The batch tracking with `end_of_batch` flag is not reliable in the current
1264    /// implementation as it's approximated for performance reasons
1265    #[test]
1266    fn test_auto_consumer_batch_processing() {
1267        let segment_name = format!("test_bat_{}", std::process::id() % 10000);
1268        let buffer_size = 1024;
1269
1270        // Track events processed (simpler test)
1271        let events_processed = Arc::new(AtomicUsize::new(0));
1272        let events_clone = Arc::clone(&events_processed);
1273
1274        // Create producer WITHOUT discovery (since buffer wrapping is now fixed)
1275        let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
1276            .build_producer(TestEvent::default)
1277            .expect("Failed to create producer");
1278
1279        // Create consumer after producer (now safe with buffer wrapping fix)
1280        let consumer = attach_shared_consumer::<TestEvent>(&segment_name, buffer_size)
1281            .handle_events_batch(
1282                move |_event: &TestEvent, _seq, _end_of_batch| {
1283                    events_clone.fetch_add(1, Ordering::Relaxed);
1284                },
1285                AutoWaitStrategy::Block,
1286            )
1287            .expect("Failed to create consumer");
1288
1289        // Publish events in bursts
1290        for burst in 0..3 {
1291            for i in 0..10 {
1292                producer.publish(|event| {
1293                    event.value = burst * 10 + i;
1294                });
1295            }
1296            std::thread::sleep(Duration::from_millis(50));
1297        }
1298
1299        // Wait for processing
1300        std::thread::sleep(Duration::from_millis(200)); // Slightly longer wait
1301
1302        // Verify all events were processed
1303        let total_events = events_processed.load(Ordering::Relaxed);
1304        assert_eq!(total_events, 30, "Should have processed all 30 events");
1305
1306        // Shutdown consumer
1307        drop(consumer);
1308    }
1309
1310    /// Test performance characteristics of different wait strategies
1311    #[test]
1312    #[ignore] // Ignore by default as this is a performance test
1313    fn test_wait_strategy_performance() {
1314        let buffer_size = 8192;
1315        let num_events = 100_000;
1316
1317        // Test each wait strategy
1318        let strategies = vec![
1319            ("BusySpin", AutoWaitStrategy::BusySpin),
1320            (
1321                "BusySpinWithHint",
1322                AutoWaitStrategy::BusySpinWithSpinLoopHint,
1323            ),
1324            ("Block", AutoWaitStrategy::Block),
1325            (
1326                "Sleep_1us",
1327                AutoWaitStrategy::Sleep(Duration::from_micros(1)),
1328            ),
1329            (
1330                "Sleep_100us",
1331                AutoWaitStrategy::Sleep(Duration::from_micros(100)),
1332            ),
1333        ];
1334
1335        for (name, strategy) in strategies {
1336            let segment_name = format!("tst_p_{}_{}", name, std::process::id() % 10000);
1337
1338            // Create producer
1339            let mut producer =
1340                build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
1341                    .build_producer(TestEvent::default)
1342                    .expect("Failed to create producer");
1343
1344            // Create consumer
1345            let events_received = Arc::new(AtomicUsize::new(0));
1346            let events_clone = Arc::clone(&events_received);
1347            let start = Instant::now();
1348
1349            let consumer = attach_shared_consumer::<TestEvent>(&segment_name, buffer_size)
1350                .handle_events_batch(
1351                    move |_event: &TestEvent, _seq, _eob| {
1352                        events_clone.fetch_add(1, Ordering::Relaxed);
1353                    },
1354                    strategy,
1355                )
1356                .expect("Failed to create consumer");
1357
1358            // Publish events
1359            for i in 0..num_events {
1360                producer.publish(|event| {
1361                    event.value = i;
1362                });
1363            }
1364
1365            // Wait for all events to be processed
1366            while events_received.load(Ordering::Relaxed) < num_events as usize {
1367                std::thread::sleep(Duration::from_millis(1));
1368            }
1369
1370            let elapsed = start.elapsed();
1371            let events_per_sec = num_events as f64 / elapsed.as_secs_f64();
1372
1373            println!("{} strategy: {:.0} events/sec", name, events_per_sec);
1374
1375            // Shutdown consumer
1376            drop(consumer);
1377        }
1378    }
1379
1380    #[test]
1381    fn test_consumer_core_override_is_preserved() {
1382        let segment_name = format!("test_cpu_{}", std::process::id() % 10000);
1383        let buffer_size = 128;
1384        let builder =
1385            attach_shared_consumer::<TestEvent>(&segment_name, buffer_size).with_consumer_core(3);
1386
1387        assert_eq!(builder.consumer_core, Some(3));
1388    }
1389
1390    #[test]
1391    fn test_consumer_core_resolve_from_builder_overrides_env() {
1392        let override_core = 7usize;
1393        let resolved = resolve_auto_consumer_core(Some(override_core));
1394        assert_eq!(resolved, Some(override_core));
1395    }
1396
1397    #[test]
1398    fn test_consumer_core_resolve_from_env_var() {
1399        with_env_vars([(runtime_env::AUTO_CONSUMER_CORE, Some("9"))], || {
1400            let resolved = resolve_auto_consumer_core(None);
1401            assert_eq!(resolved, Some(9));
1402        });
1403    }
1404
1405    #[test]
1406    fn test_consumer_core_resolve_from_invalid_env_var() {
1407        with_env_vars([(runtime_env::AUTO_CONSUMER_CORE, Some("invalid"))], || {
1408            let resolved = resolve_auto_consumer_core(None);
1409            assert_eq!(resolved, None);
1410        });
1411    }
1412
1413    #[test]
1414    fn test_process_core_override_is_preserved() {
1415        let segment_name = format!("test_prc_{}", std::process::id() % 10000);
1416        let buffer_size = 128;
1417        let builder = build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
1418            .with_process_core(5);
1419
1420        assert_eq!(builder.process_core, Some(5));
1421    }
1422
1423    #[test]
1424    fn test_process_core_resolve_from_builder_overrides_env() {
1425        with_env_vars(
1426            [
1427                (runtime_env::PRODUCER_CORE, Some("7")),
1428                (runtime_env::PROCESS_CORE, Some("9")),
1429            ],
1430            || {
1431                let resolved = resolve_process_core(Some(11), ProcessRole::Producer);
1432                assert_eq!(resolved, Some(11));
1433            },
1434        );
1435    }
1436
1437    #[test]
1438    fn test_process_core_resolve_from_role_specific_env_var() {
1439        with_env_vars(
1440            [
1441                (runtime_env::PRODUCER_CORE, Some("6")),
1442                (runtime_env::PROCESS_CORE, Some("8")),
1443            ],
1444            || {
1445                let resolved = resolve_process_core(None, ProcessRole::Producer);
1446                assert_eq!(resolved, Some(6));
1447            },
1448        );
1449    }
1450
1451    #[test]
1452    fn test_process_core_resolve_from_generic_env_var() {
1453        with_env_vars(
1454            [
1455                (runtime_env::PRODUCER_CORE, None),
1456                (runtime_env::PROCESS_CORE, Some("10")),
1457            ],
1458            || {
1459                let resolved = resolve_process_core(None, ProcessRole::Producer);
1460                assert_eq!(resolved, Some(10));
1461            },
1462        );
1463    }
1464
1465    #[test]
1466    fn test_process_core_resolve_from_invalid_role_specific_env_var() {
1467        with_env_vars(
1468            [
1469                (runtime_env::CONSUMER_CORE, Some("invalid")),
1470                (runtime_env::PROCESS_CORE, Some("12")),
1471            ],
1472            || {
1473                let resolved = resolve_process_core(None, ProcessRole::Consumer);
1474                assert_eq!(resolved, None);
1475            },
1476        );
1477    }
1478}