Skip to main content

disruptor_mp/
api.rs

1//! Cross-process producer / consumer API surface for `disruptor-mp`.
2//!
3//! This private module holds the public types re-exported from the
4//! crate root: producers, consumers, shared-memory helpers, mmap
5//! helpers, and coordination primitives.
6//!
7//! The important split is:
8//!
9//! - builder entry points create or attach the underlying transport
10//! - producers publish into the ring
11//! - consumers poll or process available events
12//! - cursors and barriers provide the shared coordination state
13//!
14//! Prefer the crate-root docs and README for the high-level transport
15//! model. This module exists to document the concrete API surface.
16
17// Doctests in `builder.rs` reference items that re-export through this
18// module's public API; suppressing the lint here keeps those examples
19// while not promoting the module to `pub`.
20#[allow(rustdoc::private_doc_tests)]
21#[path = "builder.rs"]
22mod builder;
23#[path = "consumer.rs"]
24mod consumer;
25#[path = "lock_free/consumer_barrier.rs"]
26mod consumer_barrier;
27#[path = "lock_free/cursor.rs"]
28mod cursor;
29#[path = "producer.rs"]
30mod producer;
31#[path = "backend/shared_memory/ringbuffer.rs"]
32mod ringbuffer;
33#[path = "runtime/wait.rs"]
34mod wait;
35
36/// Shared-memory data-plane primitives.
37///
38/// This is the recommended namespace for buffer/config types used by library consumers.
39pub mod shared_memory {
40    pub use super::ringbuffer::SharedRingBuffer;
41    pub use super::SharedMemoryConfig;
42
43    /// Short alias for shared-memory ring buffer.
44    pub type ShmRingBuffer<E> = SharedRingBuffer<E>;
45}
46
47/// Backend namespaces for storage implementations.
48///
49/// Keeping this namespace now makes it easy to add `mmap`/`arena` backends later
50/// without changing high-level imports.
51pub mod backend {
52    /// Shared-memory backend implementation.
53    pub mod shared_memory {
54        pub use super::super::ringbuffer::SharedRingBuffer;
55        pub use super::super::SharedMemoryConfig;
56
57        /// Short alias for shared-memory ring buffer.
58        pub type ShmRingBuffer<E> = SharedRingBuffer<E>;
59    }
60
61    /// File-backed mmap backend implementation.
62    pub mod mmap {
63        pub use super::super::MmapConsumerBarrier;
64        pub use super::super::MmapCursorConfig;
65        pub use super::super::MmapFileConfig;
66        pub use super::super::MmapTransportLayout;
67        pub use crate::mmap_consumer::MmapConsumer;
68        pub use crate::mmap_cursor::MmapCursor;
69        pub use crate::mmap_producer::MmapProducer;
70        pub use crate::mmap_ringbuffer::MmapRingBuffer;
71    }
72}
73
74/// Lock-free coordination primitives.
75pub mod lock_free {
76    pub use super::consumer_barrier::{ConsumerBarrier, DiscoveryMode, SharedConsumerBarrier};
77    pub use super::cursor::{SharedCursor, SharedCursorTrait};
78
79    /// Producer-side sequencing barrier represented by a shared cursor.
80    pub type ProducerBarrier = super::cursor::SharedCursor;
81}
82
83/// Return the configured backoff used by `AutoWaitStrategy::Block`.
84#[inline]
85pub fn default_block_strategy_duration() -> std::time::Duration {
86    wait::default_block_strategy_duration()
87}
88
89/// Return the configured sleep used by `consume_next_with_sleep` and similar helpers.
90#[inline]
91pub fn default_consume_sleep_duration() -> std::time::Duration {
92    wait::default_consume_sleep_duration()
93}
94
95/// Return the configured poll interval used by discovery/startup coordination loops.
96#[inline]
97pub fn default_discovery_poll_duration() -> std::time::Duration {
98    wait::default_discovery_poll_duration()
99}
100
101/// Apply the configured `AutoWaitStrategy::Block` wait policy.
102#[inline]
103pub fn perform_default_block_wait() {
104    wait::perform_default_block_wait()
105}
106
107/// Apply the configured consumer sleep wait policy.
108#[inline]
109pub fn perform_default_consume_sleep_wait() {
110    wait::perform_default_consume_sleep_wait()
111}
112
113/// Apply the configured discovery/startup polling wait policy.
114#[inline]
115pub fn perform_default_discovery_poll_wait() {
116    wait::perform_default_discovery_poll_wait()
117}
118
119/// Apply the explicit `AutoWaitStrategy::Sleep(duration)` wait policy.
120#[inline]
121pub fn perform_sleep_wait(duration: std::time::Duration) {
122    wait::perform_sleep_wait(duration)
123}
124
125pub use crate::mmap_barrier::MmapConsumerBarrier;
126pub use crate::mmap_consumer::MmapConsumer;
127pub use crate::mmap_cursor::MmapCursor;
128pub use crate::mmap_producer::MmapProducer;
129pub use crate::mmap_ringbuffer::MmapRingBuffer;
130pub use crate::mmap_transport::MmapTransportLayout;
131pub use builder::{
132    attach_shared_consumer, build_shared_single_producer, AutoConsumer, AutoWaitStrategy,
133    SharedDisruptorBuilder,
134};
135pub use consumer::{ConsumerCounterSelection, SharedConsumer};
136pub use consumer_barrier::{ConsumerBarrier, DiscoveryMode, SharedConsumerBarrier};
137pub use cursor::{SharedCursor, SharedCursorTrait};
138pub use producer::{CoordinationMode, ProducerCounterSelection, SharedProducer};
139pub use ringbuffer::SharedRingBuffer;
140pub use shared_memory::ShmRingBuffer;
141
// Standard-library imports used by the error and config types defined below.
143use std::{fmt, path::PathBuf};
144
/// Default cap on the number of consumers registered with one shared ring buffer.
///
/// Bounding registrations keeps the consumer registry's memory use finite and
/// keeps minimum-consumer-sequence tracking cheap. Applications that need more
/// consumers can layer their own coordination mechanism on top.
pub const DEFAULT_MAX_CONSUMERS: usize = 64;
151
/// Errors that can occur during multi-process setup.
///
/// Every payload is an owned `String` describing the failure, so the enum is
/// cheap to clone and can be compared with `==` / `assert_eq!` in callers and
/// tests. The user-facing text of each variant is given by its `#[error]`
/// attribute.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum MultiProcessError {
    /// Failed to create shared memory; the payload carries the detail message.
    #[error("Failed to create shared memory: {0}")]
    SharedMemoryError(String),

    /// Failed to map memory; the payload carries the detail message.
    #[error("Failed to map memory: {0}")]
    MemoryMapError(String),

    /// Shared segment not found; the payload is appended to the message.
    #[error("Shared segment not found: {0}")]
    SegmentNotFound(String),

    /// Incompatible data layout; the payload carries the detail message.
    #[error("Incompatible data layout: {0}")]
    IncompatibleLayout(String),

    /// Permission denied (no additional detail is carried).
    #[error("Permission denied")]
    PermissionDenied,

    /// Coordination timeout during startup; the payload carries the detail message.
    #[error("Coordination timeout: {0}")]
    CoordinationTimeout(String),
}

/// Result type for multi-process operations, with [`MultiProcessError`] as the error.
pub type MultiProcessResult<T> = Result<T, MultiProcessError>;
182
/// Configuration for shared memory segments.
///
/// Implements `PartialEq`/`Eq` so two configurations (for example a creator's
/// and an attacher's) can be compared field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SharedMemoryConfig {
    /// Name of the shared memory segment
    pub name: String,
    /// Size of the ring buffer (must be power of 2)
    pub buffer_size: usize,
    /// Element size in bytes
    pub element_size: usize,
    /// Whether to create the segment (producer) or attach to existing (consumer)
    pub create: bool,
}

impl fmt::Display for SharedMemoryConfig {
    /// Writes `SharedMemory(name=…, size=…, element_size=…)`.
    ///
    /// The `create` flag is intentionally left out, matching the established
    /// output format.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            name,
            buffer_size,
            element_size,
            create: _,
        } = self;
        write!(
            f,
            "SharedMemory(name={}, size={}, element_size={})",
            name, buffer_size, element_size
        )
    }
}
205
/// Configuration for file-backed mmap segments.
///
/// Implements `PartialEq`/`Eq` so two configurations can be compared field by
/// field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MmapFileConfig {
    /// Path to the backing file used for the shared mapping.
    pub path: PathBuf,
    /// Size of the ring buffer (must be power of 2).
    pub buffer_size: usize,
    /// Element size in bytes.
    pub element_size: usize,
    /// Whether to create the backing file or attach to an existing one.
    pub create: bool,
}

impl fmt::Display for MmapFileConfig {
    /// Writes `MmapFile(path=…, size=…, element_size=…)`.
    ///
    /// The path is rendered with `Path::display`; the `create` flag is left
    /// out, matching the established output format.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let shown_path = self.path.display();
        write!(
            f,
            "MmapFile(path={}, size={}, element_size={})",
            shown_path, self.buffer_size, self.element_size
        )
    }
}
230
/// Configuration for file-backed mmap cursor segments.
///
/// Implements `PartialEq`/`Eq` so two configurations can be compared field by
/// field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MmapCursorConfig {
    /// Path to the backing file used for the shared mapping.
    pub path: PathBuf,
    /// Whether to create the backing file or attach to an existing one.
    pub create: bool,
}

impl fmt::Display for MmapCursorConfig {
    /// Writes `MmapCursor(path=…)`, rendering the path with `Path::display`.
    ///
    /// The `create` flag is left out, matching the established output format.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let shown_path = self.path.display();
        write!(f, "MmapCursor(path={})", shown_path)
    }
}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249    use crate::MissingFreeSlots;
250    use crate::RequiredConsumerError;
251    use crate::RequiredConsumerLivenessConfig;
252    use crate::RingBufferFull;
253    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
254    use std::sync::Arc;
255    use std::thread;
256    use std::time::{Duration, Instant};
257
258    fn unique_test_segment(prefix: &str) -> String {
259        let name = crate::portable_shm_segment_name(prefix);
260        assert!(
261            name.len() <= 14,
262            "test segment name '{}' exceeds macOS-safe budget",
263            name
264        );
265        name
266    }
267
268    #[test]
269    fn test_unique_test_segment_stays_within_macos_budget() {
270        assert!(unique_test_segment("process_available_blocking_batch").len() <= 14);
271        assert!(unique_test_segment("race_condition_fix").len() <= 14);
272    }
273
274    #[derive(Debug, Copy, Clone, Default, PartialEq)]
275    struct TestEvent {
276        sequence: i64,
277        data: i64,
278    }
279
280    fn attach_named_consumer(
281        name: &str,
282        buffer_size: usize,
283        consumer_id: &str,
284    ) -> SharedConsumer<TestEvent> {
285        let config = SharedMemoryConfig {
286            name: name.to_string(),
287            buffer_size,
288            element_size: std::mem::size_of::<TestEvent>(),
289            create: false,
290        };
291
292        SharedDisruptorBuilder::new(config)
293            .with_consumer_id(consumer_id)
294            .build_consumer()
295            .unwrap()
296    }
297
298    #[test]
299    fn managed_publish_reports_missing_required_consumer_at_startup() {
300        let name = unique_test_segment("req_cons_start");
301        let buffer_size = 8;
302
303        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
304            .enable_discovery(2)
305            .with_coordination(CoordinationMode::Immediate)
306            .build_producer(TestEvent::default)
307            .unwrap();
308        producer.enable_required_consumer_liveness(
309            RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
310                .with_startup_wait_timeout(Duration::from_millis(50))
311                .with_progress_timeout(Duration::from_millis(20))
312                .with_progress_check_interval(Duration::from_millis(1))
313                .with_shutdown_grace_period(Duration::from_millis(10)),
314        );
315
316        let _consumer1 = attach_named_consumer(&name, buffer_size, "c1");
317
318        let error = producer
319            .publish_managed(|event| {
320                event.sequence = 1;
321                event.data = 100;
322            })
323            .expect_err("managed publish should fail when c2 never appears");
324
325        match error {
326            RequiredConsumerError::StartupTimeout { missing } => {
327                assert_eq!(missing, vec!["c2".to_string()]);
328            }
329            other => panic!("unexpected error: {other:?}"),
330        }
331    }
332
333    #[test]
334    fn managed_batch_publish_reports_missing_required_consumer_at_startup() {
335        let name = unique_test_segment("req_cons_batch_start");
336        let buffer_size = 8;
337
338        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
339            .enable_discovery(2)
340            .with_coordination(CoordinationMode::Immediate)
341            .build_producer(TestEvent::default)
342            .unwrap();
343        producer.enable_required_consumer_liveness(
344            RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
345                .with_startup_wait_timeout(Duration::from_millis(50))
346                .with_progress_timeout(Duration::from_millis(20))
347                .with_progress_check_interval(Duration::from_millis(1))
348                .with_shutdown_grace_period(Duration::from_millis(10)),
349        );
350
351        let _consumer1 = attach_named_consumer(&name, buffer_size, "c1");
352
353        let error = producer
354            .publish_batch_managed(2, |event, index| {
355                event.sequence = index as i64;
356                event.data = (index as i64) * 10;
357            })
358            .expect_err("managed batch publish should fail when c2 never appears");
359
360        match error {
361            RequiredConsumerError::StartupTimeout { missing } => {
362                assert_eq!(missing, vec!["c2".to_string()]);
363            }
364            other => panic!("unexpected error: {other:?}"),
365        }
366    }
367
368    #[test]
369    fn managed_publish_shuts_down_when_required_consumer_stalls() {
370        let name = unique_test_segment("req_cons_fail");
371        let buffer_size = 4;
372
373        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
374            .enable_discovery(2)
375            .with_coordination(CoordinationMode::Immediate)
376            .build_producer(TestEvent::default)
377            .unwrap();
378        producer.enable_required_consumer_liveness(
379            RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
380                .with_startup_wait_timeout(Duration::from_millis(100))
381                .with_progress_timeout(Duration::from_millis(20))
382                .with_progress_check_interval(Duration::from_millis(1))
383                .with_shutdown_grace_period(Duration::from_millis(20)),
384        );
385
386        let stop_consumer1 = Arc::new(AtomicBool::new(false));
387        let stop_consumer1_thread = Arc::clone(&stop_consumer1);
388        let name_for_thread = name.clone();
389        let consumer1_thread = thread::spawn(move || {
390            let mut consumer1 = attach_named_consumer(&name_for_thread, buffer_size, "c1");
391            while !stop_consumer1_thread.load(Ordering::Acquire) {
392                if consumer1.try_consume_next().is_none() {
393                    thread::sleep(Duration::from_millis(1));
394                }
395            }
396        });
397
398        let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
399
400        producer
401            .publish_managed(|event| {
402                event.sequence = 0;
403                event.data = 0;
404            })
405            .unwrap();
406        let _ = consumer2.consume_next();
407        drop(consumer2);
408
409        for i in 1..=buffer_size {
410            producer
411                .publish_managed(|event| {
412                    event.sequence = i as i64;
413                    event.data = (i as i64) * 10;
414                })
415                .unwrap();
416        }
417
418        let error = producer
419            .publish_managed(|event| {
420                event.sequence = 99;
421                event.data = 990;
422            })
423            .expect_err("managed publish should fail once c2 stops advancing");
424
425        stop_consumer1.store(true, Ordering::Release);
426        consumer1_thread.join().unwrap();
427
428        match error {
429            RequiredConsumerError::GracefulShutdownTriggered { consumer_id, .. } => {
430                assert_eq!(consumer_id, "c2");
431            }
432            other => panic!("unexpected error: {other:?}"),
433        }
434    }
435
436    #[test]
437    fn managed_publish_recovers_when_same_consumer_id_rejoins() {
438        let name = unique_test_segment("req_cons_rejn");
439        let buffer_size = 4;
440
441        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
442            .enable_discovery(2)
443            .with_coordination(CoordinationMode::Immediate)
444            .build_producer(TestEvent::default)
445            .unwrap();
446        producer.enable_required_consumer_liveness(
447            RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
448                .with_startup_wait_timeout(Duration::from_millis(100))
449                .with_progress_timeout(Duration::from_millis(20))
450                .with_progress_check_interval(Duration::from_millis(1))
451                .with_shutdown_grace_period(Duration::from_millis(200)),
452        );
453
454        let stop_consumer1 = Arc::new(AtomicBool::new(false));
455        let stop_consumer1_thread = Arc::clone(&stop_consumer1);
456        let name_for_thread = name.clone();
457        let consumer1_thread = thread::spawn(move || {
458            let mut consumer1 = attach_named_consumer(&name_for_thread, buffer_size, "c1");
459            while !stop_consumer1_thread.load(Ordering::Acquire) {
460                if consumer1.try_consume_next().is_none() {
461                    thread::sleep(Duration::from_millis(1));
462                }
463            }
464        });
465
466        let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
467
468        producer
469            .publish_managed(|event| {
470                event.sequence = 0;
471                event.data = 0;
472            })
473            .unwrap();
474        let _ = consumer2.consume_next();
475        drop(consumer2);
476
477        for i in 1..=buffer_size {
478            producer
479                .publish_managed(|event| {
480                    event.sequence = i as i64;
481                    event.data = (i as i64) * 10;
482                })
483                .unwrap();
484        }
485
486        let name_for_rejoin = name.clone();
487        let rejoin_thread = thread::spawn(move || {
488            thread::sleep(Duration::from_millis(40));
489            let mut rejoined = attach_named_consumer(&name_for_rejoin, buffer_size, "c2");
490            let deadline = Instant::now() + Duration::from_millis(500);
491            let mut consumed = 0usize;
492            while Instant::now() < deadline && consumed < buffer_size + 2 {
493                if rejoined.try_consume_next().is_some() {
494                    consumed += 1;
495                } else {
496                    thread::sleep(Duration::from_millis(1));
497                }
498            }
499            consumed
500        });
501
502        let sequence = producer
503            .publish_managed(|event| {
504                event.sequence = 99;
505                event.data = 990;
506            })
507            .expect("same-id rejoin should recover before shutdown");
508
509        stop_consumer1.store(true, Ordering::Release);
510        consumer1_thread.join().unwrap();
511        let rejoined_consumed = rejoin_thread.join().unwrap();
512
513        assert!(sequence >= buffer_size as i64);
514        assert!(rejoined_consumed > 0, "rejoined c2 should consume backlog");
515    }
516
517    #[test]
518    fn managed_publish_rejects_different_consumer_id_rejoin() {
519        let name = unique_test_segment("req_cons_diff");
520        let buffer_size = 4;
521
522        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
523            .enable_discovery(2)
524            .with_coordination(CoordinationMode::Immediate)
525            .build_producer(TestEvent::default)
526            .unwrap();
527        producer.enable_required_consumer_liveness(
528            RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
529                .with_startup_wait_timeout(Duration::from_millis(100))
530                .with_progress_timeout(Duration::from_millis(20))
531                .with_progress_check_interval(Duration::from_millis(1))
532                .with_shutdown_grace_period(Duration::from_millis(200)),
533        );
534
535        let stop_consumer1 = Arc::new(AtomicBool::new(false));
536        let stop_consumer1_thread = Arc::clone(&stop_consumer1);
537        let name_for_thread = name.clone();
538        let consumer1_thread = thread::spawn(move || {
539            let mut consumer1 = attach_named_consumer(&name_for_thread, buffer_size, "c1");
540            while !stop_consumer1_thread.load(Ordering::Acquire) {
541                if consumer1.try_consume_next().is_none() {
542                    thread::sleep(Duration::from_millis(1));
543                }
544            }
545        });
546
547        let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
548        producer
549            .publish_managed(|event| {
550                event.sequence = 0;
551                event.data = 0;
552            })
553            .unwrap();
554        let _ = consumer2.consume_next();
555        drop(consumer2);
556
557        for i in 1..=buffer_size {
558            producer
559                .publish_managed(|event| {
560                    event.sequence = i as i64;
561                    event.data = (i as i64) * 10;
562                })
563                .unwrap();
564        }
565
566        let name_for_wrong_rejoin = name.clone();
567        let wrong_rejoin_thread = thread::spawn(move || {
568            thread::sleep(Duration::from_millis(40));
569            let mut wrong_consumer =
570                attach_named_consumer(&name_for_wrong_rejoin, buffer_size, "c3");
571            let deadline = Instant::now() + Duration::from_millis(500);
572            let mut consumed = 0usize;
573            while Instant::now() < deadline && consumed < buffer_size + 2 {
574                if wrong_consumer.try_consume_next().is_some() {
575                    consumed += 1;
576                } else {
577                    thread::sleep(Duration::from_millis(1));
578                }
579            }
580            consumed
581        });
582
583        let error = producer
584            .publish_managed(|event| {
585                event.sequence = 99;
586                event.data = 990;
587            })
588            .expect_err("wrong-id rejoin must not clear the c2 stall");
589
590        stop_consumer1.store(true, Ordering::Release);
591        consumer1_thread.join().unwrap();
592        let wrong_rejoin_consumed = wrong_rejoin_thread.join().unwrap();
593
594        assert!(
595            wrong_rejoin_consumed > 0,
596            "a new consumer id may still read the broadcast stream"
597        );
598        match error {
599            RequiredConsumerError::GracefulShutdownTriggered { consumer_id, .. } => {
600                assert_eq!(consumer_id, "c2");
601            }
602            other => panic!("unexpected error: {other:?}"),
603        }
604    }
605
606    #[test]
607    fn managed_publish_rejoin_after_grace_period_still_fails() {
608        let name = unique_test_segment("req_cons_late");
609        let buffer_size = 4;
610
611        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
612            .enable_discovery(2)
613            .with_coordination(CoordinationMode::Immediate)
614            .build_producer(TestEvent::default)
615            .unwrap();
616        producer.enable_required_consumer_liveness(
617            RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
618                .with_startup_wait_timeout(Duration::from_millis(100))
619                .with_progress_timeout(Duration::from_millis(20))
620                .with_progress_check_interval(Duration::from_millis(1))
621                .with_shutdown_grace_period(Duration::from_millis(60)),
622        );
623
624        let stop_consumer1 = Arc::new(AtomicBool::new(false));
625        let stop_consumer1_thread = Arc::clone(&stop_consumer1);
626        let name_for_thread = name.clone();
627        let consumer1_thread = thread::spawn(move || {
628            let mut consumer1 = attach_named_consumer(&name_for_thread, buffer_size, "c1");
629            while !stop_consumer1_thread.load(Ordering::Acquire) {
630                if consumer1.try_consume_next().is_none() {
631                    thread::sleep(Duration::from_millis(1));
632                }
633            }
634        });
635
636        let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
637        producer
638            .publish_managed(|event| {
639                event.sequence = 0;
640                event.data = 0;
641            })
642            .unwrap();
643        let _ = consumer2.consume_next();
644        drop(consumer2);
645
646        for i in 1..=buffer_size {
647            producer
648                .publish_managed(|event| {
649                    event.sequence = i as i64;
650                    event.data = (i as i64) * 10;
651                })
652                .unwrap();
653        }
654
655        let name_for_rejoin = name.clone();
656        let rejoin_thread = thread::spawn(move || {
657            thread::sleep(Duration::from_millis(140));
658            let mut rejoined = attach_named_consumer(&name_for_rejoin, buffer_size, "c2");
659            let deadline = Instant::now() + Duration::from_millis(300);
660            let mut consumed = 0usize;
661            while Instant::now() < deadline && consumed < buffer_size + 2 {
662                if rejoined.try_consume_next().is_some() {
663                    consumed += 1;
664                } else {
665                    thread::sleep(Duration::from_millis(1));
666                }
667            }
668            consumed
669        });
670
671        let error = producer
672            .publish_managed(|event| {
673                event.sequence = 99;
674                event.data = 990;
675            })
676            .expect_err("late same-id rejoin must not rescue the topology after grace expires");
677
678        stop_consumer1.store(true, Ordering::Release);
679        consumer1_thread.join().unwrap();
680        let rejoined_consumed = rejoin_thread.join().unwrap();
681
682        assert!(
683            rejoined_consumed > 0,
684            "late rejoined consumer may still drain retained backlog"
685        );
686        match error {
687            RequiredConsumerError::GracefulShutdownTriggered { consumer_id, .. } => {
688                assert_eq!(consumer_id, "c2");
689            }
690            other => panic!("unexpected error: {other:?}"),
691        }
692    }
693
694    #[test]
695    fn managed_publish_does_not_fail_while_topology_is_idle() {
696        let name = unique_test_segment("req_cons_idle");
697        let buffer_size = 8;
698
699        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
700            .enable_discovery(2)
701            .with_coordination(CoordinationMode::Immediate)
702            .build_producer(TestEvent::default)
703            .unwrap();
704        producer.enable_required_consumer_liveness(
705            RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
706                .with_startup_wait_timeout(Duration::from_millis(100))
707                .with_progress_timeout(Duration::from_millis(20))
708                .with_progress_check_interval(Duration::from_millis(1))
709                .with_shutdown_grace_period(Duration::from_millis(50)),
710        );
711
712        let mut consumer1 = attach_named_consumer(&name, buffer_size, "c1");
713        let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
714
715        producer
716            .publish_managed(|event| {
717                event.sequence = 1;
718                event.data = 10;
719            })
720            .unwrap();
721        assert_eq!(consumer1.consume_next().0, 0);
722        assert_eq!(consumer2.consume_next().0, 0);
723
724        thread::sleep(Duration::from_millis(75));
725
726        producer
727            .publish_managed(|event| {
728                event.sequence = 2;
729                event.data = 20;
730            })
731            .expect("idle topology must not trigger stall shutdown");
732
733        assert_eq!(consumer1.consume_next().0, 1);
734        assert_eq!(consumer2.consume_next().0, 1);
735    }
736
737    // ============================================================================
738    // BASIC FUNCTIONALITY TESTS
739    // ============================================================================
740
741    #[test]
742    fn test_shared_ring_buffer_creation_and_attachment() {
743        let name = unique_test_segment("test_ring_basic");
744        let buffer_size = 8;
745
746        // Producer creates the ring buffer
747        let config_create = SharedMemoryConfig {
748            name: name.clone(),
749            buffer_size,
750            element_size: std::mem::size_of::<TestEvent>(),
751            create: true,
752        };
753
754        let ring_buffer = SharedRingBuffer::new(config_create, TestEvent::default).unwrap();
755        assert_eq!(ring_buffer.size(), buffer_size);
756
757        // Consumer attaches to existing ring buffer
758        let config_attach = SharedMemoryConfig {
759            name,
760            buffer_size,
761            element_size: std::mem::size_of::<TestEvent>(),
762            create: false,
763        };
764
765        let attached_buffer: SharedRingBuffer<TestEvent> =
766            SharedRingBuffer::attach(config_attach).unwrap();
767        assert_eq!(attached_buffer.size(), buffer_size);
768        assert_eq!(ring_buffer.size(), attached_buffer.size());
769    }
770
771    #[test]
772    fn test_basic_producer_consumer_coordination() {
773        let name = unique_test_segment("test_basic_coord");
774        let buffer_size = 8;
775
776        // Create producer
777        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
778            .build_producer(TestEvent::default)
779            .unwrap();
780
781        // Create consumer
782        let config = SharedMemoryConfig {
783            name,
784            buffer_size,
785            element_size: std::mem::size_of::<TestEvent>(),
786            create: false,
787        };
788        let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
789            .build_consumer()
790            .unwrap();
791
792        // Test single event publish/consume cycle
793        producer.publish(|event| {
794            event.sequence = 0;
795            event.data = 42;
796        });
797
798        let mut consumed_events = Vec::new();
799
800        // Use try_consume_next instead of process_available
801        if let Some((seq, event)) = consumer.try_consume_next() {
802            consumed_events.push((seq, event));
803        }
804
805        assert_eq!(consumed_events.len(), 1);
806        assert_eq!(consumed_events[0].0, 0); // sequence
807        assert_eq!(consumed_events[0].1.sequence, 0);
808        assert_eq!(consumed_events[0].1.data, 42);
809    }
810
811    // ============================================================================
812    // SINGLE PRODUCER SINGLE CONSUMER (SPSC) TESTS
813    // ============================================================================
814
815    #[test]
816    fn test_spsc_ring_buffer_full_behavior() {
817        let name = unique_test_segment("spsc_full");
818        let buffer_size = 4;
819
820        // Create consumer first in a thread
821        let name_clone = name.clone();
822        let consumer_handle = thread::spawn(move || {
823            let config = SharedMemoryConfig {
824                name: name_clone,
825                buffer_size,
826                element_size: std::mem::size_of::<TestEvent>(),
827                create: false,
828            };
829
830            // Wait a bit for producer to create segment
831            thread::sleep(Duration::from_millis(50));
832
833            let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
834                .build_consumer()
835                .unwrap();
836
837            // Process events after a delay to test buffer full behavior
838            thread::sleep(Duration::from_millis(200));
839
840            let mut consumed = Vec::new();
841            let processed = consumer.process_available(|event: &TestEvent, _| {
842                consumed.push(event.data);
843            });
844
845            (processed, consumed)
846        });
847
848        // Create producer with discovery enabled
849        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
850            .enable_discovery(1) // Enable discovery to track consumer
851            .build_producer(TestEvent::default)
852            .unwrap();
853
854        // Give time for consumer to attach and be discovered
855        thread::sleep(Duration::from_millis(100));
856
857        // Fill the ring buffer to capacity
858        for i in 0..buffer_size {
859            producer
860                .try_publish(|e| {
861                    e.sequence = i as i64;
862                    e.data = i as i64 * 10;
863                })
864                .expect("Should be able to publish to non-full buffer");
865        }
866
867        // Next publish should fail - ring buffer is full (consumer hasn't processed yet)
868        assert_eq!(
869            producer
870                .try_publish(|e| e.sequence = buffer_size as i64)
871                .err()
872                .unwrap(),
873            RingBufferFull
874        );
875
876        // Wait for consumer thread to process events
877        let (processed, consumed) = consumer_handle.join().unwrap();
878        assert_eq!(processed, buffer_size); // Consumer processes all available events
879
880        // Now should be able to publish again
881        producer
882            .try_publish(|e| {
883                e.sequence = buffer_size as i64;
884                e.data = 999;
885            })
886            .expect("Should be able to publish after consumer freed space");
887
888        // Verify all events were consumed correctly
889        let expected: Vec<i64> = (0..buffer_size).map(|i| i as i64 * 10).collect();
890        assert_eq!(consumed, expected);
891    }
892
893    #[test]
894    fn test_spsc_ordered_event_processing() {
895        let name = unique_test_segment("spsc_ordered");
896        let buffer_size = 16; // Large enough to hold all test events
897        let num_events = 10;
898
899        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
900            .build_producer(TestEvent::default)
901            .unwrap();
902
903        let config = SharedMemoryConfig {
904            name,
905            buffer_size,
906            element_size: std::mem::size_of::<TestEvent>(),
907            create: false,
908        };
909        let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
910            .build_consumer()
911            .unwrap();
912
913        // Publish events
914        for i in 0..num_events {
915            producer.publish(|event| {
916                event.sequence = i as i64;
917                event.data = (i as i64) * (i as i64); // Square for easy verification
918            });
919        }
920
921        // Consume all events
922        let mut consumed_events = Vec::new();
923        let mut total_processed = 0;
924
925        // May need multiple calls to process_available to get all events
926        while total_processed < num_events {
927            let processed = consumer.process_available(|event: &TestEvent, seq| {
928                consumed_events.push((seq, event.sequence, event.data));
929            });
930            total_processed += processed;
931
932            if processed == 0 {
933                // Small yield to allow any pending operations to complete
934                thread::yield_now();
935            }
936        }
937
938        // Verify all events were consumed in order
939        assert_eq!(consumed_events.len(), num_events);
940        for (i, &(seq, event_seq, data)) in consumed_events.iter().enumerate() {
941            assert_eq!(seq, i as i64);
942            assert_eq!(event_seq, i as i64);
943            assert_eq!(data, (i as i64) * (i as i64));
944        }
945    }
946
947    #[test]
948    fn test_process_available_advances_consumer_sequence_after_batch() {
949        let name = unique_test_segment("process_available_batch");
950        let buffer_size = 16;
951        let num_events = 6;
952
953        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
954            .build_producer(TestEvent::default)
955            .unwrap();
956
957        let config = SharedMemoryConfig {
958            name,
959            buffer_size,
960            element_size: std::mem::size_of::<TestEvent>(),
961            create: false,
962        };
963        let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
964            .build_consumer()
965            .unwrap();
966
967        for i in 0..num_events {
968            producer.publish(|event| {
969                event.sequence = i as i64;
970                event.data = i as i64 * 10;
971            });
972        }
973
974        let mut consumed = Vec::new();
975        let processed = consumer.process_available(|event: &TestEvent, seq| {
976            consumed.push((seq, event.sequence, event.data));
977        });
978
979        assert_eq!(processed, num_events);
980        assert_eq!(consumed.len(), num_events);
981        assert_eq!(consumer.current_sequence(), (num_events - 1) as i64);
982        assert_eq!(consumer.producer_sequence(), (num_events - 1) as i64);
983        assert_eq!(consumer.consumer_sequence(), (num_events - 1) as i64);
984    }
985
986    #[test]
987    fn test_process_available_blocking_marks_only_final_event_as_end_of_batch() {
988        let name = unique_test_segment("process_available_blocking_batch");
989        let buffer_size = 16;
990        let num_events = 4;
991
992        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
993            .build_producer(TestEvent::default)
994            .unwrap();
995
996        let config = SharedMemoryConfig {
997            name,
998            buffer_size,
999            element_size: std::mem::size_of::<TestEvent>(),
1000            create: false,
1001        };
1002        let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
1003            .build_consumer()
1004            .unwrap();
1005
1006        for i in 0..num_events {
1007            producer.publish(|event| {
1008                event.sequence = i as i64;
1009                event.data = i as i64;
1010            });
1011        }
1012
1013        let mut observed = Vec::new();
1014        let processed =
1015            consumer.process_available_blocking(|event: &TestEvent, seq, end_of_batch| {
1016                observed.push((seq, event.sequence, end_of_batch));
1017            });
1018
1019        assert_eq!(processed, num_events);
1020        assert_eq!(
1021            observed,
1022            vec![(0, 0, false), (1, 1, false), (2, 2, false), (3, 3, true),]
1023        );
1024        assert_eq!(consumer.current_sequence(), (num_events - 1) as i64);
1025    }
1026
1027    #[test]
1028    fn test_per_consumer_sequences_prevent_race_conditions() {
1029        let name = unique_test_segment("per_consumer_test");
1030        let buffer_size = 64;
1031        let num_events = 10; // Start with fewer events for debugging
1032
1033        // Create producer
1034        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
1035            .build_producer(TestEvent::default)
1036            .unwrap();
1037
1038        // Create two consumers
1039        let config = SharedMemoryConfig {
1040            name: name.clone(),
1041            buffer_size,
1042            element_size: std::mem::size_of::<TestEvent>(),
1043            create: false,
1044        };
1045
1046        let mut consumer1: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config.clone())
1047            .build_consumer()
1048            .unwrap();
1049
1050        let mut consumer2: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
1051            .build_consumer()
1052            .unwrap();
1053
1054        println!("Created two consumers for broadcast test");
1055
1056        // Publish some events
1057        for i in 0..num_events {
1058            producer.publish(|event| {
1059                event.sequence = i as i64;
1060                event.data = i as i64;
1061            });
1062            println!("Published event {}", i);
1063        }
1064
1065        // Check initial state
1066        let (seq1, prod_seq1, consumer_seq1) = consumer1.debug_sequences();
1067        let (seq2, prod_seq2, consumer_seq2) = consumer2.debug_sequences();
1068        println!(
1069            "After publishing - Consumer 1: current={}, producer={}, consumer={}",
1070            seq1, prod_seq1, consumer_seq1
1071        );
1072        println!(
1073            "After publishing - Consumer 2: current={}, producer={}, consumer={}",
1074            seq2, prod_seq2, consumer_seq2
1075        );
1076
1077        // Let each consumer process all events (broadcast semantics)
1078        let mut consumer1_events = Vec::new();
1079        let mut consumer2_events = Vec::new();
1080
1081        // Consumer 1 processes all events
1082        let _processed1 = consumer1.process_available(|event: &TestEvent, _seq| {
1083            consumer1_events.push((event.sequence, event.data));
1084            println!(
1085                "Consumer 1 processed event: seq={}, data={}",
1086                event.sequence, event.data
1087            );
1088        });
1089
1090        // Consumer 2 processes all events
1091        let _processed2 = consumer2.process_available(|event: &TestEvent, _seq| {
1092            consumer2_events.push((event.sequence, event.data));
1093            println!(
1094                "Consumer 2 processed event: seq={}, data={}",
1095                event.sequence, event.data
1096            );
1097        });
1098
1099        println!(
1100            "Consumer 1 processed {} events: {:?}",
1101            consumer1_events.len(),
1102            consumer1_events
1103        );
1104        println!(
1105            "Consumer 2 processed {} events: {:?}",
1106            consumer2_events.len(),
1107            consumer2_events
1108        );
1109
1110        // Check state after processing
1111        let (seq1, prod_seq1, consumer_seq1) = consumer1.debug_sequences();
1112        let (seq2, prod_seq2, consumer_seq2) = consumer2.debug_sequences();
1113        println!(
1114            "After processing - Consumer 1: current={}, producer={}, consumer={}",
1115            seq1, prod_seq1, consumer_seq1
1116        );
1117        println!(
1118            "After processing - Consumer 2: current={}, producer={}, consumer={}",
1119            seq2, prod_seq2, consumer_seq2
1120        );
1121
1122        // Verify broadcast semantics: each consumer should see all events
1123        assert_eq!(
1124            consumer1_events.len(),
1125            num_events,
1126            "Consumer 1 should see all events"
1127        );
1128        assert_eq!(
1129            consumer2_events.len(),
1130            num_events,
1131            "Consumer 2 should see all events"
1132        );
1133
1134        // Verify events are in order and complete
1135        for i in 0..num_events {
1136            assert_eq!(consumer1_events[i], (i as i64, i as i64));
1137            assert_eq!(consumer2_events[i], (i as i64, i as i64));
1138        }
1139
1140        println!("SUCCESS: Both consumers saw all events (broadcast semantics)!");
1141    }
1142
1143    #[test]
1144    fn test_broadcast_consumer_basic() {
1145        let name = unique_test_segment("broadcast_basic");
1146        let buffer_size = 64;
1147        let num_events = 5;
1148
1149        // Create producer
1150        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
1151            .build_producer(TestEvent::default)
1152            .unwrap();
1153
1154        // Create two consumers
1155        let config = SharedMemoryConfig {
1156            name: name.clone(),
1157            buffer_size,
1158            element_size: std::mem::size_of::<TestEvent>(),
1159            create: false,
1160        };
1161
1162        let mut consumer1: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config.clone())
1163            .build_consumer()
1164            .unwrap();
1165
1166        let mut consumer2: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
1167            .build_consumer()
1168            .unwrap();
1169
1170        println!("Created two consumers for basic broadcast test");
1171        println!(
1172            "Consumer 1 ID: {}, Consumer 2 ID: {}",
1173            consumer1.consumer_id(),
1174            consumer2.consumer_id()
1175        );
1176
1177        // Publish events
1178        for i in 0..num_events {
1179            producer.publish(|event| {
1180                event.sequence = i as i64;
1181                event.data = i as i64;
1182            });
1183        }
1184
1185        // Each consumer processes all events
1186        let mut consumer1_events = Vec::new();
1187        let mut consumer2_events = Vec::new();
1188
1189        consumer1.process_available(|event: &TestEvent, _seq| {
1190            consumer1_events.push(event.sequence);
1191        });
1192
1193        consumer2.process_available(|event: &TestEvent, _seq| {
1194            consumer2_events.push(event.sequence);
1195        });
1196
1197        println!("Consumer 1 processed: {:?}", consumer1_events);
1198        println!("Consumer 2 processed: {:?}", consumer2_events);
1199
1200        // Both consumers should see all events
1201        assert_eq!(
1202            consumer1_events.len(),
1203            num_events,
1204            "Consumer 1 should see all events"
1205        );
1206        assert_eq!(
1207            consumer2_events.len(),
1208            num_events,
1209            "Consumer 2 should see all events"
1210        );
1211
1212        // Events should be in order
1213        for i in 0..num_events {
1214            assert_eq!(consumer1_events[i], i as i64);
1215            assert_eq!(consumer2_events[i], i as i64);
1216        }
1217
1218        println!("Both consumers saw all events in order!");
1219    }
1220
1221    // ============================================================================
1222    // STRESS AND PERFORMANCE TESTS
1223    // ============================================================================
1224
1225    // ============================================================================
1226    // ATOMIC OPERATIONS TESTS
1227    // ============================================================================
1228
1229    #[test]
1230    fn test_shared_cursor_operations() {
1231        let name = unique_test_segment("atomic_ops");
1232        let cursor = SharedCursor::new(&name, 0).unwrap();
1233
1234        // Basic operations
1235        assert_eq!(cursor.load(Ordering::Relaxed), 0);
1236
1237        cursor.store(42, Ordering::Relaxed);
1238        assert_eq!(cursor.load(Ordering::Relaxed), 42);
1239
1240        let old = cursor.fetch_add(8, Ordering::Relaxed);
1241        assert_eq!(old, 42);
1242        assert_eq!(cursor.load(Ordering::Relaxed), 50);
1243
1244        // Compare and exchange
1245        let result = cursor.compare_exchange(50, 100, Ordering::Relaxed, Ordering::Relaxed);
1246        assert_eq!(result, Ok(50));
1247        assert_eq!(cursor.load(Ordering::Relaxed), 100);
1248
1249        let result = cursor.compare_exchange(50, 200, Ordering::Relaxed, Ordering::Relaxed);
1250        assert_eq!(result, Err(100));
1251        assert_eq!(cursor.load(Ordering::Relaxed), 100);
1252    }
1253
1254    // ============================================================================
1255    // ERROR HANDLING TESTS
1256    // ============================================================================
1257
1258    #[test]
1259    fn test_consumer_attachment_to_nonexistent_segment() {
1260        let name = "nonexistent".to_string();
1261
1262        let config = SharedMemoryConfig {
1263            name,
1264            buffer_size: 8,
1265            element_size: std::mem::size_of::<TestEvent>(),
1266            create: false,
1267        };
1268
1269        let result: Result<SharedConsumer<TestEvent>, MultiProcessError> =
1270            SharedDisruptorBuilder::new(config).build_consumer();
1271        assert!(result.is_err());
1272
1273        match result.err().unwrap() {
1274            MultiProcessError::SegmentNotFound(_) => {} // Expected
1275            other => panic!("Expected SegmentNotFound, got {:?}", other),
1276        }
1277    }
1278
    /// Documentation-only placeholder: the body contains no statements, so
    /// this test always passes. It exists to keep a feature checklist next to
    /// the test suite.
    #[test]
    fn test_feature_completeness_documentation() {
        // This test serves as living documentation of implemented features

        // Implemented Core Features
        // Basic producer/consumer coordination
        // Single event publish/consume
        // Ring buffer overflow protection
        // Shared memory creation and attachment
        // Atomic sequence coordination
        // Thread-safe operations
        // Proper error handling

        // Not Yet Implemented (Future Work)
        // - Foundation for multi-writer producer topologies
        // - Multiple consumers (SPMC pattern)
        // - Multiple producers (MPSC pattern)
        // - Consumer dependencies and barriers
        // - Wait strategies beyond busy spinning
        // - Consumer thread lifecycle management
        //
        // NOTE(review): parts of this list look stale. Other tests in this
        // module attach two consumers to one segment, and the parent module
        // declares `consumer_barrier` and `wait` submodules. Confirm which
        // items are still future work and update the list.
    }
1300
1301    #[test]
1302    fn test_batch_publish_writes_and_consumes_in_sequence() {
1303        let name = unique_test_segment("batch_sequence");
1304        let buffer_size = 8;
1305
1306        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
1307            .build_producer(TestEvent::default)
1308            .unwrap();
1309
1310        let upper = producer
1311            .try_batch_publish(4, |event, i| {
1312                event.sequence = i as i64;
1313                event.data = 100 + i as i64;
1314            })
1315            .unwrap();
1316        assert_eq!(upper, 3);
1317        assert_eq!(producer.last_published_sequence(), 3);
1318
1319        let config = SharedMemoryConfig {
1320            name,
1321            buffer_size,
1322            element_size: std::mem::size_of::<TestEvent>(),
1323            create: false,
1324        };
1325        let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
1326            .build_consumer()
1327            .unwrap();
1328
1329        let mut consumed = Vec::new();
1330        while consumed.len() < 4 {
1331            let processed = consumer.process_available(|event: &TestEvent, seq| {
1332                consumed.push((seq, event.sequence, event.data));
1333            });
1334            if processed == 0 {
1335                std::thread::yield_now();
1336            }
1337        }
1338
1339        assert_eq!(
1340            consumed,
1341            vec![(0, 0, 100), (1, 1, 101), (2, 2, 102), (3, 3, 103)]
1342        );
1343    }
1344
1345    #[test]
1346    fn test_simple_batch_publish_is_noop_for_zero() {
1347        let name = unique_test_segment("batch_zero");
1348        let buffer_size = 8;
1349
1350        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
1351            .build_producer(TestEvent::default)
1352            .unwrap();
1353
1354        assert_eq!(
1355            producer.try_batch_publish(0, |_event, _| {
1356                panic!("indexed closure must not run for n=0")
1357            }),
1358            Ok(-1)
1359        );
1360
1361        producer
1362            .simple_batch_publish(0, |_event, _| {
1363                panic!("simple_batch_publish no-op closure must not run for n=0")
1364            })
1365            .unwrap();
1366        assert_eq!(producer.last_published_sequence(), -1);
1367    }
1368
1369    #[test]
1370    fn test_try_batch_publish_reports_missing_slots_when_full() {
1371        let name = unique_test_segment("batch_full");
1372        let buffer_size = 4;
1373        let start_consume = Arc::new(AtomicBool::new(false));
1374        let consumed = Arc::new(AtomicUsize::new(0));
1375
1376        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
1377            .discover_consumer_with_prefix(1, "bchk")
1378            .build_producer(TestEvent::default)
1379            .unwrap();
1380
1381        let consumer_handle = {
1382            let name = name.clone();
1383            let start_consume = start_consume.clone();
1384            let consumed = consumed.clone();
1385
1386            std::thread::spawn(move || {
1387                let config = SharedMemoryConfig {
1388                    name,
1389                    buffer_size,
1390                    element_size: std::mem::size_of::<TestEvent>(),
1391                    create: false,
1392                };
1393                let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
1394                    .discover_consumer_with_prefix(1, "bchk")
1395                    .with_consumer_id("bchk_0")
1396                    .build_consumer()
1397                    .unwrap();
1398
1399                while !start_consume.load(Ordering::Acquire) {
1400                    std::thread::yield_now();
1401                }
1402
1403                while consumed.load(Ordering::Acquire) < (buffer_size + 1) {
1404                    let processed = consumer.process_available(|_event, _| {
1405                        consumed.fetch_add(1, Ordering::AcqRel);
1406                    });
1407                    if processed == 0 {
1408                        std::thread::yield_now();
1409                    }
1410                }
1411            })
1412        };
1413
1414        producer
1415            .try_batch_publish(4, |event, i| {
1416                event.sequence = i as i64;
1417                event.data = 10 + i as i64;
1418            })
1419            .expect("full buffer should accept exactly `buffer_size` slots");
1420
1421        // Ensure producer sees the consumer cursor before evaluating full-capacity math.
1422        let producer_seq = producer.last_published_sequence();
1423        let discovery_deadline = Instant::now() + Duration::from_secs(2);
1424        loop {
1425            if producer.min_gating_sequence() != producer_seq {
1426                break;
1427            }
1428            if Instant::now() > discovery_deadline {
1429                panic!("consumer discovery did not reduce gating sequence below producer cursor");
1430            }
1431            std::thread::yield_now();
1432        }
1433
1434        let err = producer
1435            .try_batch_publish(1, |_event, _| {
1436                panic!("second batch must not run when capacity is exhausted")
1437            })
1438            .expect_err("producer must report missing free slots when full");
1439        assert_eq!(err, MissingFreeSlots(1));
1440
1441        start_consume.store(true, Ordering::Release);
1442
1443        let deadline = Instant::now() + Duration::from_secs(2);
1444        while consumed.load(Ordering::Acquire) == 0 {
1445            if Instant::now() > deadline {
1446                panic!("consumer did not start consuming after start signal");
1447            }
1448            std::thread::yield_now();
1449        }
1450
1451        producer
1452            .try_batch_publish(1, |event, i| {
1453                event.sequence = 4 + i as i64;
1454                event.data = 14;
1455            })
1456            .expect("single-slot batch should succeed once one slot is released");
1457
1458        consumer_handle.join().unwrap();
1459        assert_eq!(consumed.load(Ordering::Acquire), buffer_size + 1);
1460    }
1461
1462    #[test]
1463    fn test_fast_slow_consumer_race_condition_fix() {
1464        let name = unique_test_segment("race_condition_fix");
1465        let buffer_size = 8; // Small buffer to force backpressure
1466
1467        // Create producer with discovery to track both consumers
1468        let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
1469            .enable_discovery(2) // Track 2 consumers
1470            .build_producer(TestEvent::default)
1471            .unwrap();
1472
1473        // Create two consumers
1474        let config = SharedMemoryConfig {
1475            name: name.clone(),
1476            buffer_size,
1477            element_size: std::mem::size_of::<TestEvent>(),
1478            create: false,
1479        };
1480
1481        let mut fast_consumer: SharedConsumer<TestEvent> =
1482            SharedDisruptorBuilder::new(config.clone())
1483                .build_consumer()
1484                .unwrap();
1485
1486        let mut slow_consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
1487            .build_consumer()
1488            .unwrap();
1489
1490        // Publish events to fill buffer
1491        for i in 0..buffer_size {
1492            producer.publish(|event| {
1493                event.sequence = i as i64;
1494                event.data = i as i64 * 100; // Use distinctive values
1495            });
1496        }
1497
1498        println!("Published {} events to fill buffer", buffer_size);
1499
1500        // Fast consumer processes all available events
1501        let mut fast_events = Vec::new();
1502        fast_consumer.process_available(|event: &TestEvent, seq| {
1503            fast_events.push((seq, event.sequence, event.data));
1504        });
1505
1506        // Slow consumer processes only some events (simulating slow processing)
1507        let mut slow_events = Vec::new();
1508        let mut slow_processed = 0;
1509        slow_consumer.process_available(|event: &TestEvent, seq| {
1510            if slow_processed < 3 {
1511                // Only process first 3 events
1512                slow_events.push((seq, event.sequence, event.data));
1513                slow_processed += 1;
1514            }
1515        });
1516
1517        println!("Fast consumer processed: {} events", fast_events.len());
1518        println!("Slow consumer processed: {} events", slow_events.len());
1519        println!("Fast consumer events: {:?}", fast_events);
1520        println!("Slow consumer events: {:?}", slow_events);
1521
1522        // The key test: try to publish more events
1523        // With the race condition fix, producer should be blocked by slow consumer
1524        let mut successful_publishes = 0;
1525        for i in buffer_size..(buffer_size + 10) {
1526            match producer.try_publish(|event| {
1527                event.sequence = i as i64;
1528                event.data = i as i64 * 100;
1529            }) {
1530                Ok(_) => {
1531                    successful_publishes += 1;
1532                    println!("Successfully published event {} (data: {})", i, i * 100);
1533                }
1534                Err(_) => {
1535                    println!(
1536                        "Buffer full at event {} - producer correctly blocked by slow consumer",
1537                        i
1538                    );
1539                    break;
1540                }
1541            }
1542        }
1543
1544        // Now let slow consumer process more events
1545        let slow_events_before_catchup = slow_events.len();
1546        slow_consumer.process_available(|event: &TestEvent, seq| {
1547            slow_events.push((seq, event.sequence, event.data));
1548        });
1549
1550        println!("Slow consumer after catchup: {} events", slow_events.len());
1551        println!("Slow consumer all events: {:?}", slow_events);
1552
1553        // Verify the first few events are identical between consumers
1554        // (this proves no data corruption occurred)
1555        let overlap_count = std::cmp::min(fast_events.len(), slow_events_before_catchup);
1556        for i in 0..overlap_count {
1557            assert_eq!(
1558                fast_events[i], slow_events[i],
1559                "Data corruption detected at index {}: fast consumer saw {:?}, slow consumer saw {:?}",
1560                i, fast_events[i], slow_events[i]
1561            );
1562        }
1563
1564        // Verify that producer was properly constrained by slow consumer
1565        // It should not have been able to publish unlimited events
1566        assert!(
1567            successful_publishes < 10,
1568            "Producer should have been blocked by slow consumer, but published {} additional events",
1569            successful_publishes
1570        );
1571
1572        println!("SUCCESS: Producer correctly respected slow consumer position!");
1573        println!(
1574            "   - Fast consumer processed {} events immediately",
1575            fast_events.len()
1576        );
1577        println!(
1578            "   - Slow consumer processed {} events initially",
1579            slow_events_before_catchup
1580        );
1581        println!(
1582            "   - Producer was blocked after {} additional publishes",
1583            successful_publishes
1584        );
1585    }
1586
/// Publishes far more events than the ring has slots while no consumer is
/// attached and discovery is left disabled.
///
/// Regression guard for the buffer-wrapping fix: before it, this scenario
/// would deadlock at 64KB.
#[test]
fn test_buffer_wrapping_without_consumers() {
    const BUFFER_SIZE: usize = 512; // 512 slots (64KB with 128-byte events)
    const NUM_EVENTS: u64 = 150_000; // Way more than buffer size

    println!(
        "Testing: Publishing {} events without consumers (no discovery)",
        NUM_EVENTS
    );

    let segment_name = unique_test_segment("wrap_no_disc");

    // Discovery is deliberately never enabled on this builder chain.
    let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, BUFFER_SIZE)
        .build_producer(TestEvent::default)
        .expect("Failed to create producer");

    println!("Producer created without discovery");

    // Wall-clock budget for the whole publish loop. It is only checked between
    // publishes, so it flags slow progress but cannot interrupt a publish call
    // that never returns.
    let time_budget = Duration::from_secs(5);
    let started_at = Instant::now();

    for seq_no in 0..NUM_EVENTS {
        assert!(
            started_at.elapsed() <= time_budget,
            "Timeout at event {} - buffer not wrapping correctly!",
            seq_no
        );

        producer.publish(|slot| {
            slot.sequence = seq_no as i64;
            slot.data = (seq_no % 1000) as i64;
        });

        // Progress line every 10_000 events, skipping the very first one.
        if seq_no != 0 && seq_no % 10_000 == 0 {
            println!("Published {} events", seq_no);
        }
    }

    println!(
        "✅ Successfully published {} events without consumers!",
        NUM_EVENTS
    );
    println!("Buffer wrapped {} times", NUM_EVENTS / BUFFER_SIZE as u64);
}
1632
/// Publishes exactly 65_536 events into a 512-slot ring with no consumer,
/// targeting the 64KB boundary case that was failing before the fix.
#[test]
fn test_exact_64kb_boundary_no_deadlock() {
    const BUFFER_SIZE: usize = 512; // 512 slots = 64KB with 128-byte events
    const NUM_EVENTS: u64 = 65_536; // Exactly where the old bug occurred

    println!(
        "Testing: Publishing exactly {} events (64KB boundary)",
        NUM_EVENTS
    );

    let segment_name = unique_test_segment("boundary");

    // Producer is built without enabling discovery.
    let mut producer = build_shared_single_producer::<TestEvent>(&segment_name, BUFFER_SIZE)
        .build_producer(TestEvent::default)
        .expect("Failed to create producer");

    // Budget for the whole loop, checked before each publish.
    let time_budget = Duration::from_secs(2);
    let started_at = Instant::now();

    for seq_no in 0..NUM_EVENTS {
        assert!(
            started_at.elapsed() <= time_budget,
            "Deadlock at event {} - this is the old bug!",
            seq_no
        );

        producer.publish(|slot| slot.sequence = seq_no as i64);
    }

    println!(
        "✅ Successfully published {} events - no deadlock at 64KB boundary!",
        NUM_EVENTS
    );
}
1667
/// Runs the consumer-less wrap scenario against several ring sizes to check
/// that the wrapping fix is not specific to one capacity.
///
/// For each size a fresh segment and producer are created and 100x the slot
/// count is published. A per-size wall-clock budget, checked before every
/// publish, fails the test if progress is too slow.
#[test]
fn test_buffer_wrapping_various_sizes() {
    // How many times each ring is filled over.
    const WRAPS_PER_SIZE: usize = 100;
    // Wall-clock budget for publishing into a single ring size.
    const TIME_BUDGET: Duration = Duration::from_secs(5);

    // A fixed list that is only iterated once: a plain array avoids the heap
    // allocation `vec!` would make (Clippy `useless_vec`).
    for buffer_size in [256, 512, 1024, 2048] {
        let num_events = (buffer_size * WRAPS_PER_SIZE) as u64; // 100x the buffer size

        println!(
            "Testing buffer size {} with {} events",
            buffer_size, num_events
        );

        let segment_name = unique_test_segment(&format!("size_{}", buffer_size));

        let mut producer =
            build_shared_single_producer::<TestEvent>(&segment_name, buffer_size)
                .build_producer(TestEvent::default)
                .expect("Failed to create producer");

        let started_at = Instant::now();
        for seq_no in 0..num_events {
            assert!(
                started_at.elapsed() <= TIME_BUDGET,
                "Timeout with buffer size {} at event {}",
                buffer_size,
                seq_no
            );

            producer.publish(|slot| {
                slot.sequence = seq_no as i64;
            });
        }

        println!(
            "✅ Buffer size {} handled {} events correctly",
            buffer_size, num_events
        );
    }
}
1705}