1#[allow(rustdoc::private_doc_tests)]
21#[path = "builder.rs"]
22mod builder;
23#[path = "consumer.rs"]
24mod consumer;
25#[path = "lock_free/consumer_barrier.rs"]
26mod consumer_barrier;
27#[path = "lock_free/cursor.rs"]
28mod cursor;
29#[path = "producer.rs"]
30mod producer;
31#[path = "backend/shared_memory/ringbuffer.rs"]
32mod ringbuffer;
33#[path = "runtime/wait.rs"]
34mod wait;
35
36pub mod shared_memory {
40 pub use super::ringbuffer::SharedRingBuffer;
41 pub use super::SharedMemoryConfig;
42
43 pub type ShmRingBuffer<E> = SharedRingBuffer<E>;
45}
46
47pub mod backend {
52 pub mod shared_memory {
54 pub use super::super::ringbuffer::SharedRingBuffer;
55 pub use super::super::SharedMemoryConfig;
56
57 pub type ShmRingBuffer<E> = SharedRingBuffer<E>;
59 }
60
61 pub mod mmap {
63 pub use super::super::MmapConsumerBarrier;
64 pub use super::super::MmapCursorConfig;
65 pub use super::super::MmapFileConfig;
66 pub use super::super::MmapTransportLayout;
67 pub use crate::mmap_consumer::MmapConsumer;
68 pub use crate::mmap_cursor::MmapCursor;
69 pub use crate::mmap_producer::MmapProducer;
70 pub use crate::mmap_ringbuffer::MmapRingBuffer;
71 }
72}
73
74pub mod lock_free {
76 pub use super::consumer_barrier::{ConsumerBarrier, DiscoveryMode, SharedConsumerBarrier};
77 pub use super::cursor::{SharedCursor, SharedCursorTrait};
78
79 pub type ProducerBarrier = super::cursor::SharedCursor;
81}
82
83#[inline]
85pub fn default_block_strategy_duration() -> std::time::Duration {
86 wait::default_block_strategy_duration()
87}
88
89#[inline]
91pub fn default_consume_sleep_duration() -> std::time::Duration {
92 wait::default_consume_sleep_duration()
93}
94
95#[inline]
97pub fn default_discovery_poll_duration() -> std::time::Duration {
98 wait::default_discovery_poll_duration()
99}
100
101#[inline]
103pub fn perform_default_block_wait() {
104 wait::perform_default_block_wait()
105}
106
107#[inline]
109pub fn perform_default_consume_sleep_wait() {
110 wait::perform_default_consume_sleep_wait()
111}
112
113#[inline]
115pub fn perform_default_discovery_poll_wait() {
116 wait::perform_default_discovery_poll_wait()
117}
118
119#[inline]
121pub fn perform_sleep_wait(duration: std::time::Duration) {
122 wait::perform_sleep_wait(duration)
123}
124
125pub use crate::mmap_barrier::MmapConsumerBarrier;
126pub use crate::mmap_consumer::MmapConsumer;
127pub use crate::mmap_cursor::MmapCursor;
128pub use crate::mmap_producer::MmapProducer;
129pub use crate::mmap_ringbuffer::MmapRingBuffer;
130pub use crate::mmap_transport::MmapTransportLayout;
131pub use builder::{
132 attach_shared_consumer, build_shared_single_producer, AutoConsumer, AutoWaitStrategy,
133 SharedDisruptorBuilder,
134};
135pub use consumer::{ConsumerCounterSelection, SharedConsumer};
136pub use consumer_barrier::{ConsumerBarrier, DiscoveryMode, SharedConsumerBarrier};
137pub use cursor::{SharedCursor, SharedCursorTrait};
138pub use producer::{CoordinationMode, ProducerCounterSelection, SharedProducer};
139pub use ringbuffer::SharedRingBuffer;
140pub use shared_memory::ShmRingBuffer;
141
142use std::{fmt, path::PathBuf};
144
/// Default value for the maximum number of consumers.
// NOTE(review): where this limit is applied is not visible in this file —
// confirm against the builder / consumer-barrier code.
pub const DEFAULT_MAX_CONSUMERS: usize = 64;
151
/// Errors reported by the multi-process transport.
///
/// `Display` text comes from the `#[error(...)]` attribute on each variant;
/// `String` payloads are interpolated into that text.
#[derive(Debug, thiserror::Error)]
pub enum MultiProcessError {
    /// Shared memory could not be created; the payload carries the detail.
    #[error("Failed to create shared memory: {0}")]
    SharedMemoryError(String),

    /// Mapping memory failed; the payload carries the detail.
    #[error("Failed to map memory: {0}")]
    MemoryMapError(String),

    /// The requested shared segment does not exist; the payload is included in the message.
    #[error("Shared segment not found: {0}")]
    SegmentNotFound(String),

    /// The data layout is incompatible; the payload carries the detail.
    #[error("Incompatible data layout: {0}")]
    IncompatibleLayout(String),

    /// Access was refused.
    #[error("Permission denied")]
    PermissionDenied,

    /// Coordination did not complete in time; the payload carries the detail.
    #[error("Coordination timeout: {0}")]
    CoordinationTimeout(String),
}
179
180pub type MultiProcessResult<T> = Result<T, MultiProcessError>;
182
/// Settings used to create or attach to a named shared-memory ring buffer.
#[derive(Debug, Clone)]
pub struct SharedMemoryConfig {
    /// Name of the shared segment.
    pub name: String,
    /// Ring buffer size (printed as `size` by `Display`).
    pub buffer_size: usize,
    /// Size of one element; callers in this file pass `std::mem::size_of::<E>()`.
    pub element_size: usize,
    /// `true` in the configs this file uses to create a segment, `false` in the
    /// ones it uses to attach to an existing one.
    pub create: bool,
}

/// Renders as `SharedMemory(name=…, size=…, element_size=…)`; `create` is not printed.
impl fmt::Display for SharedMemoryConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            name,
            buffer_size,
            element_size,
            ..
        } = self;
        write!(
            f,
            "SharedMemory(name={}, size={}, element_size={})",
            name, buffer_size, element_size
        )
    }
}
205
/// Settings for a ring buffer stored in a memory-mapped file.
#[derive(Debug, Clone)]
pub struct MmapFileConfig {
    /// Location of the backing file.
    pub path: PathBuf,
    /// Ring buffer size (printed as `size` by `Display`).
    pub buffer_size: usize,
    /// Size of one element.
    pub element_size: usize,
    // NOTE(review): presumably create-vs-open, mirroring `SharedMemoryConfig::create` — confirm.
    pub create: bool,
}

/// Renders as `MmapFile(path=…, size=…, element_size=…)`; `create` is not printed.
impl fmt::Display for MmapFileConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            path,
            buffer_size,
            element_size,
            ..
        } = self;
        write!(
            f,
            "MmapFile(path={}, size={}, element_size={})",
            path.display(),
            buffer_size,
            element_size
        )
    }
}
230
/// Settings for a cursor stored in a memory-mapped file.
#[derive(Debug, Clone)]
pub struct MmapCursorConfig {
    /// Location of the backing file.
    pub path: PathBuf,
    // NOTE(review): presumably create-vs-open, mirroring `SharedMemoryConfig::create` — confirm.
    pub create: bool,
}

/// Renders as `MmapCursor(path=…)`; `create` is not printed.
impl fmt::Display for MmapCursorConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let shown_path = self.path.display();
        write!(f, "MmapCursor(path={})", shown_path)
    }
}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249 use crate::MissingFreeSlots;
250 use crate::RequiredConsumerError;
251 use crate::RequiredConsumerLivenessConfig;
252 use crate::RingBufferFull;
253 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
254 use std::sync::Arc;
255 use std::thread;
256 use std::time::{Duration, Instant};
257
258 fn unique_test_segment(prefix: &str) -> String {
259 let name = crate::portable_shm_segment_name(prefix);
260 assert!(
261 name.len() <= 14,
262 "test segment name '{}' exceeds macOS-safe budget",
263 name
264 );
265 name
266 }
267
268 #[test]
269 fn test_unique_test_segment_stays_within_macos_budget() {
270 assert!(unique_test_segment("process_available_blocking_batch").len() <= 14);
271 assert!(unique_test_segment("race_condition_fix").len() <= 14);
272 }
273
274 #[derive(Debug, Copy, Clone, Default, PartialEq)]
275 struct TestEvent {
276 sequence: i64,
277 data: i64,
278 }
279
280 fn attach_named_consumer(
281 name: &str,
282 buffer_size: usize,
283 consumer_id: &str,
284 ) -> SharedConsumer<TestEvent> {
285 let config = SharedMemoryConfig {
286 name: name.to_string(),
287 buffer_size,
288 element_size: std::mem::size_of::<TestEvent>(),
289 create: false,
290 };
291
292 SharedDisruptorBuilder::new(config)
293 .with_consumer_id(consumer_id)
294 .build_consumer()
295 .unwrap()
296 }
297
298 #[test]
299 fn managed_publish_reports_missing_required_consumer_at_startup() {
300 let name = unique_test_segment("req_cons_start");
301 let buffer_size = 8;
302
303 let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
304 .enable_discovery(2)
305 .with_coordination(CoordinationMode::Immediate)
306 .build_producer(TestEvent::default)
307 .unwrap();
308 producer.enable_required_consumer_liveness(
309 RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
310 .with_startup_wait_timeout(Duration::from_millis(50))
311 .with_progress_timeout(Duration::from_millis(20))
312 .with_progress_check_interval(Duration::from_millis(1))
313 .with_shutdown_grace_period(Duration::from_millis(10)),
314 );
315
316 let _consumer1 = attach_named_consumer(&name, buffer_size, "c1");
317
318 let error = producer
319 .publish_managed(|event| {
320 event.sequence = 1;
321 event.data = 100;
322 })
323 .expect_err("managed publish should fail when c2 never appears");
324
325 match error {
326 RequiredConsumerError::StartupTimeout { missing } => {
327 assert_eq!(missing, vec!["c2".to_string()]);
328 }
329 other => panic!("unexpected error: {other:?}"),
330 }
331 }
332
333 #[test]
334 fn managed_batch_publish_reports_missing_required_consumer_at_startup() {
335 let name = unique_test_segment("req_cons_batch_start");
336 let buffer_size = 8;
337
338 let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
339 .enable_discovery(2)
340 .with_coordination(CoordinationMode::Immediate)
341 .build_producer(TestEvent::default)
342 .unwrap();
343 producer.enable_required_consumer_liveness(
344 RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
345 .with_startup_wait_timeout(Duration::from_millis(50))
346 .with_progress_timeout(Duration::from_millis(20))
347 .with_progress_check_interval(Duration::from_millis(1))
348 .with_shutdown_grace_period(Duration::from_millis(10)),
349 );
350
351 let _consumer1 = attach_named_consumer(&name, buffer_size, "c1");
352
353 let error = producer
354 .publish_batch_managed(2, |event, index| {
355 event.sequence = index as i64;
356 event.data = (index as i64) * 10;
357 })
358 .expect_err("managed batch publish should fail when c2 never appears");
359
360 match error {
361 RequiredConsumerError::StartupTimeout { missing } => {
362 assert_eq!(missing, vec!["c2".to_string()]);
363 }
364 other => panic!("unexpected error: {other:?}"),
365 }
366 }
367
368 #[test]
369 fn managed_publish_shuts_down_when_required_consumer_stalls() {
370 let name = unique_test_segment("req_cons_fail");
371 let buffer_size = 4;
372
373 let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
374 .enable_discovery(2)
375 .with_coordination(CoordinationMode::Immediate)
376 .build_producer(TestEvent::default)
377 .unwrap();
378 producer.enable_required_consumer_liveness(
379 RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
380 .with_startup_wait_timeout(Duration::from_millis(100))
381 .with_progress_timeout(Duration::from_millis(20))
382 .with_progress_check_interval(Duration::from_millis(1))
383 .with_shutdown_grace_period(Duration::from_millis(20)),
384 );
385
386 let stop_consumer1 = Arc::new(AtomicBool::new(false));
387 let stop_consumer1_thread = Arc::clone(&stop_consumer1);
388 let name_for_thread = name.clone();
389 let consumer1_thread = thread::spawn(move || {
390 let mut consumer1 = attach_named_consumer(&name_for_thread, buffer_size, "c1");
391 while !stop_consumer1_thread.load(Ordering::Acquire) {
392 if consumer1.try_consume_next().is_none() {
393 thread::sleep(Duration::from_millis(1));
394 }
395 }
396 });
397
398 let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
399
400 producer
401 .publish_managed(|event| {
402 event.sequence = 0;
403 event.data = 0;
404 })
405 .unwrap();
406 let _ = consumer2.consume_next();
407 drop(consumer2);
408
409 for i in 1..=buffer_size {
410 producer
411 .publish_managed(|event| {
412 event.sequence = i as i64;
413 event.data = (i as i64) * 10;
414 })
415 .unwrap();
416 }
417
418 let error = producer
419 .publish_managed(|event| {
420 event.sequence = 99;
421 event.data = 990;
422 })
423 .expect_err("managed publish should fail once c2 stops advancing");
424
425 stop_consumer1.store(true, Ordering::Release);
426 consumer1_thread.join().unwrap();
427
428 match error {
429 RequiredConsumerError::GracefulShutdownTriggered { consumer_id, .. } => {
430 assert_eq!(consumer_id, "c2");
431 }
432 other => panic!("unexpected error: {other:?}"),
433 }
434 }
435
436 #[test]
437 fn managed_publish_recovers_when_same_consumer_id_rejoins() {
438 let name = unique_test_segment("req_cons_rejn");
439 let buffer_size = 4;
440
441 let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
442 .enable_discovery(2)
443 .with_coordination(CoordinationMode::Immediate)
444 .build_producer(TestEvent::default)
445 .unwrap();
446 producer.enable_required_consumer_liveness(
447 RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
448 .with_startup_wait_timeout(Duration::from_millis(100))
449 .with_progress_timeout(Duration::from_millis(20))
450 .with_progress_check_interval(Duration::from_millis(1))
451 .with_shutdown_grace_period(Duration::from_millis(200)),
452 );
453
454 let stop_consumer1 = Arc::new(AtomicBool::new(false));
455 let stop_consumer1_thread = Arc::clone(&stop_consumer1);
456 let name_for_thread = name.clone();
457 let consumer1_thread = thread::spawn(move || {
458 let mut consumer1 = attach_named_consumer(&name_for_thread, buffer_size, "c1");
459 while !stop_consumer1_thread.load(Ordering::Acquire) {
460 if consumer1.try_consume_next().is_none() {
461 thread::sleep(Duration::from_millis(1));
462 }
463 }
464 });
465
466 let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
467
468 producer
469 .publish_managed(|event| {
470 event.sequence = 0;
471 event.data = 0;
472 })
473 .unwrap();
474 let _ = consumer2.consume_next();
475 drop(consumer2);
476
477 for i in 1..=buffer_size {
478 producer
479 .publish_managed(|event| {
480 event.sequence = i as i64;
481 event.data = (i as i64) * 10;
482 })
483 .unwrap();
484 }
485
486 let name_for_rejoin = name.clone();
487 let rejoin_thread = thread::spawn(move || {
488 thread::sleep(Duration::from_millis(40));
489 let mut rejoined = attach_named_consumer(&name_for_rejoin, buffer_size, "c2");
490 let deadline = Instant::now() + Duration::from_millis(500);
491 let mut consumed = 0usize;
492 while Instant::now() < deadline && consumed < buffer_size + 2 {
493 if rejoined.try_consume_next().is_some() {
494 consumed += 1;
495 } else {
496 thread::sleep(Duration::from_millis(1));
497 }
498 }
499 consumed
500 });
501
502 let sequence = producer
503 .publish_managed(|event| {
504 event.sequence = 99;
505 event.data = 990;
506 })
507 .expect("same-id rejoin should recover before shutdown");
508
509 stop_consumer1.store(true, Ordering::Release);
510 consumer1_thread.join().unwrap();
511 let rejoined_consumed = rejoin_thread.join().unwrap();
512
513 assert!(sequence >= buffer_size as i64);
514 assert!(rejoined_consumed > 0, "rejoined c2 should consume backlog");
515 }
516
517 #[test]
518 fn managed_publish_rejects_different_consumer_id_rejoin() {
519 let name = unique_test_segment("req_cons_diff");
520 let buffer_size = 4;
521
522 let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
523 .enable_discovery(2)
524 .with_coordination(CoordinationMode::Immediate)
525 .build_producer(TestEvent::default)
526 .unwrap();
527 producer.enable_required_consumer_liveness(
528 RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
529 .with_startup_wait_timeout(Duration::from_millis(100))
530 .with_progress_timeout(Duration::from_millis(20))
531 .with_progress_check_interval(Duration::from_millis(1))
532 .with_shutdown_grace_period(Duration::from_millis(200)),
533 );
534
535 let stop_consumer1 = Arc::new(AtomicBool::new(false));
536 let stop_consumer1_thread = Arc::clone(&stop_consumer1);
537 let name_for_thread = name.clone();
538 let consumer1_thread = thread::spawn(move || {
539 let mut consumer1 = attach_named_consumer(&name_for_thread, buffer_size, "c1");
540 while !stop_consumer1_thread.load(Ordering::Acquire) {
541 if consumer1.try_consume_next().is_none() {
542 thread::sleep(Duration::from_millis(1));
543 }
544 }
545 });
546
547 let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
548 producer
549 .publish_managed(|event| {
550 event.sequence = 0;
551 event.data = 0;
552 })
553 .unwrap();
554 let _ = consumer2.consume_next();
555 drop(consumer2);
556
557 for i in 1..=buffer_size {
558 producer
559 .publish_managed(|event| {
560 event.sequence = i as i64;
561 event.data = (i as i64) * 10;
562 })
563 .unwrap();
564 }
565
566 let name_for_wrong_rejoin = name.clone();
567 let wrong_rejoin_thread = thread::spawn(move || {
568 thread::sleep(Duration::from_millis(40));
569 let mut wrong_consumer =
570 attach_named_consumer(&name_for_wrong_rejoin, buffer_size, "c3");
571 let deadline = Instant::now() + Duration::from_millis(500);
572 let mut consumed = 0usize;
573 while Instant::now() < deadline && consumed < buffer_size + 2 {
574 if wrong_consumer.try_consume_next().is_some() {
575 consumed += 1;
576 } else {
577 thread::sleep(Duration::from_millis(1));
578 }
579 }
580 consumed
581 });
582
583 let error = producer
584 .publish_managed(|event| {
585 event.sequence = 99;
586 event.data = 990;
587 })
588 .expect_err("wrong-id rejoin must not clear the c2 stall");
589
590 stop_consumer1.store(true, Ordering::Release);
591 consumer1_thread.join().unwrap();
592 let wrong_rejoin_consumed = wrong_rejoin_thread.join().unwrap();
593
594 assert!(
595 wrong_rejoin_consumed > 0,
596 "a new consumer id may still read the broadcast stream"
597 );
598 match error {
599 RequiredConsumerError::GracefulShutdownTriggered { consumer_id, .. } => {
600 assert_eq!(consumer_id, "c2");
601 }
602 other => panic!("unexpected error: {other:?}"),
603 }
604 }
605
606 #[test]
607 fn managed_publish_rejoin_after_grace_period_still_fails() {
608 let name = unique_test_segment("req_cons_late");
609 let buffer_size = 4;
610
611 let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
612 .enable_discovery(2)
613 .with_coordination(CoordinationMode::Immediate)
614 .build_producer(TestEvent::default)
615 .unwrap();
616 producer.enable_required_consumer_liveness(
617 RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
618 .with_startup_wait_timeout(Duration::from_millis(100))
619 .with_progress_timeout(Duration::from_millis(20))
620 .with_progress_check_interval(Duration::from_millis(1))
621 .with_shutdown_grace_period(Duration::from_millis(60)),
622 );
623
624 let stop_consumer1 = Arc::new(AtomicBool::new(false));
625 let stop_consumer1_thread = Arc::clone(&stop_consumer1);
626 let name_for_thread = name.clone();
627 let consumer1_thread = thread::spawn(move || {
628 let mut consumer1 = attach_named_consumer(&name_for_thread, buffer_size, "c1");
629 while !stop_consumer1_thread.load(Ordering::Acquire) {
630 if consumer1.try_consume_next().is_none() {
631 thread::sleep(Duration::from_millis(1));
632 }
633 }
634 });
635
636 let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
637 producer
638 .publish_managed(|event| {
639 event.sequence = 0;
640 event.data = 0;
641 })
642 .unwrap();
643 let _ = consumer2.consume_next();
644 drop(consumer2);
645
646 for i in 1..=buffer_size {
647 producer
648 .publish_managed(|event| {
649 event.sequence = i as i64;
650 event.data = (i as i64) * 10;
651 })
652 .unwrap();
653 }
654
655 let name_for_rejoin = name.clone();
656 let rejoin_thread = thread::spawn(move || {
657 thread::sleep(Duration::from_millis(140));
658 let mut rejoined = attach_named_consumer(&name_for_rejoin, buffer_size, "c2");
659 let deadline = Instant::now() + Duration::from_millis(300);
660 let mut consumed = 0usize;
661 while Instant::now() < deadline && consumed < buffer_size + 2 {
662 if rejoined.try_consume_next().is_some() {
663 consumed += 1;
664 } else {
665 thread::sleep(Duration::from_millis(1));
666 }
667 }
668 consumed
669 });
670
671 let error = producer
672 .publish_managed(|event| {
673 event.sequence = 99;
674 event.data = 990;
675 })
676 .expect_err("late same-id rejoin must not rescue the topology after grace expires");
677
678 stop_consumer1.store(true, Ordering::Release);
679 consumer1_thread.join().unwrap();
680 let rejoined_consumed = rejoin_thread.join().unwrap();
681
682 assert!(
683 rejoined_consumed > 0,
684 "late rejoined consumer may still drain retained backlog"
685 );
686 match error {
687 RequiredConsumerError::GracefulShutdownTriggered { consumer_id, .. } => {
688 assert_eq!(consumer_id, "c2");
689 }
690 other => panic!("unexpected error: {other:?}"),
691 }
692 }
693
694 #[test]
695 fn managed_publish_does_not_fail_while_topology_is_idle() {
696 let name = unique_test_segment("req_cons_idle");
697 let buffer_size = 8;
698
699 let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
700 .enable_discovery(2)
701 .with_coordination(CoordinationMode::Immediate)
702 .build_producer(TestEvent::default)
703 .unwrap();
704 producer.enable_required_consumer_liveness(
705 RequiredConsumerLivenessConfig::new(vec!["c1".into(), "c2".into()])
706 .with_startup_wait_timeout(Duration::from_millis(100))
707 .with_progress_timeout(Duration::from_millis(20))
708 .with_progress_check_interval(Duration::from_millis(1))
709 .with_shutdown_grace_period(Duration::from_millis(50)),
710 );
711
712 let mut consumer1 = attach_named_consumer(&name, buffer_size, "c1");
713 let mut consumer2 = attach_named_consumer(&name, buffer_size, "c2");
714
715 producer
716 .publish_managed(|event| {
717 event.sequence = 1;
718 event.data = 10;
719 })
720 .unwrap();
721 assert_eq!(consumer1.consume_next().0, 0);
722 assert_eq!(consumer2.consume_next().0, 0);
723
724 thread::sleep(Duration::from_millis(75));
725
726 producer
727 .publish_managed(|event| {
728 event.sequence = 2;
729 event.data = 20;
730 })
731 .expect("idle topology must not trigger stall shutdown");
732
733 assert_eq!(consumer1.consume_next().0, 1);
734 assert_eq!(consumer2.consume_next().0, 1);
735 }
736
737 #[test]
742 fn test_shared_ring_buffer_creation_and_attachment() {
743 let name = unique_test_segment("test_ring_basic");
744 let buffer_size = 8;
745
746 let config_create = SharedMemoryConfig {
748 name: name.clone(),
749 buffer_size,
750 element_size: std::mem::size_of::<TestEvent>(),
751 create: true,
752 };
753
754 let ring_buffer = SharedRingBuffer::new(config_create, TestEvent::default).unwrap();
755 assert_eq!(ring_buffer.size(), buffer_size);
756
757 let config_attach = SharedMemoryConfig {
759 name,
760 buffer_size,
761 element_size: std::mem::size_of::<TestEvent>(),
762 create: false,
763 };
764
765 let attached_buffer: SharedRingBuffer<TestEvent> =
766 SharedRingBuffer::attach(config_attach).unwrap();
767 assert_eq!(attached_buffer.size(), buffer_size);
768 assert_eq!(ring_buffer.size(), attached_buffer.size());
769 }
770
771 #[test]
772 fn test_basic_producer_consumer_coordination() {
773 let name = unique_test_segment("test_basic_coord");
774 let buffer_size = 8;
775
776 let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
778 .build_producer(TestEvent::default)
779 .unwrap();
780
781 let config = SharedMemoryConfig {
783 name,
784 buffer_size,
785 element_size: std::mem::size_of::<TestEvent>(),
786 create: false,
787 };
788 let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
789 .build_consumer()
790 .unwrap();
791
792 producer.publish(|event| {
794 event.sequence = 0;
795 event.data = 42;
796 });
797
798 let mut consumed_events = Vec::new();
799
800 if let Some((seq, event)) = consumer.try_consume_next() {
802 consumed_events.push((seq, event));
803 }
804
805 assert_eq!(consumed_events.len(), 1);
806 assert_eq!(consumed_events[0].0, 0); assert_eq!(consumed_events[0].1.sequence, 0);
808 assert_eq!(consumed_events[0].1.data, 42);
809 }
810
811 #[test]
816 fn test_spsc_ring_buffer_full_behavior() {
817 let name = unique_test_segment("spsc_full");
818 let buffer_size = 4;
819
820 let name_clone = name.clone();
822 let consumer_handle = thread::spawn(move || {
823 let config = SharedMemoryConfig {
824 name: name_clone,
825 buffer_size,
826 element_size: std::mem::size_of::<TestEvent>(),
827 create: false,
828 };
829
830 thread::sleep(Duration::from_millis(50));
832
833 let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
834 .build_consumer()
835 .unwrap();
836
837 thread::sleep(Duration::from_millis(200));
839
840 let mut consumed = Vec::new();
841 let processed = consumer.process_available(|event: &TestEvent, _| {
842 consumed.push(event.data);
843 });
844
845 (processed, consumed)
846 });
847
848 let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
850 .enable_discovery(1) .build_producer(TestEvent::default)
852 .unwrap();
853
854 thread::sleep(Duration::from_millis(100));
856
857 for i in 0..buffer_size {
859 producer
860 .try_publish(|e| {
861 e.sequence = i as i64;
862 e.data = i as i64 * 10;
863 })
864 .expect("Should be able to publish to non-full buffer");
865 }
866
867 assert_eq!(
869 producer
870 .try_publish(|e| e.sequence = buffer_size as i64)
871 .err()
872 .unwrap(),
873 RingBufferFull
874 );
875
876 let (processed, consumed) = consumer_handle.join().unwrap();
878 assert_eq!(processed, buffer_size); producer
882 .try_publish(|e| {
883 e.sequence = buffer_size as i64;
884 e.data = 999;
885 })
886 .expect("Should be able to publish after consumer freed space");
887
888 let expected: Vec<i64> = (0..buffer_size).map(|i| i as i64 * 10).collect();
890 assert_eq!(consumed, expected);
891 }
892
893 #[test]
894 fn test_spsc_ordered_event_processing() {
895 let name = unique_test_segment("spsc_ordered");
896 let buffer_size = 16; let num_events = 10;
898
899 let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
900 .build_producer(TestEvent::default)
901 .unwrap();
902
903 let config = SharedMemoryConfig {
904 name,
905 buffer_size,
906 element_size: std::mem::size_of::<TestEvent>(),
907 create: false,
908 };
909 let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
910 .build_consumer()
911 .unwrap();
912
913 for i in 0..num_events {
915 producer.publish(|event| {
916 event.sequence = i as i64;
917 event.data = (i as i64) * (i as i64); });
919 }
920
921 let mut consumed_events = Vec::new();
923 let mut total_processed = 0;
924
925 while total_processed < num_events {
927 let processed = consumer.process_available(|event: &TestEvent, seq| {
928 consumed_events.push((seq, event.sequence, event.data));
929 });
930 total_processed += processed;
931
932 if processed == 0 {
933 thread::yield_now();
935 }
936 }
937
938 assert_eq!(consumed_events.len(), num_events);
940 for (i, &(seq, event_seq, data)) in consumed_events.iter().enumerate() {
941 assert_eq!(seq, i as i64);
942 assert_eq!(event_seq, i as i64);
943 assert_eq!(data, (i as i64) * (i as i64));
944 }
945 }
946
947 #[test]
948 fn test_process_available_advances_consumer_sequence_after_batch() {
949 let name = unique_test_segment("process_available_batch");
950 let buffer_size = 16;
951 let num_events = 6;
952
953 let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
954 .build_producer(TestEvent::default)
955 .unwrap();
956
957 let config = SharedMemoryConfig {
958 name,
959 buffer_size,
960 element_size: std::mem::size_of::<TestEvent>(),
961 create: false,
962 };
963 let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
964 .build_consumer()
965 .unwrap();
966
967 for i in 0..num_events {
968 producer.publish(|event| {
969 event.sequence = i as i64;
970 event.data = i as i64 * 10;
971 });
972 }
973
974 let mut consumed = Vec::new();
975 let processed = consumer.process_available(|event: &TestEvent, seq| {
976 consumed.push((seq, event.sequence, event.data));
977 });
978
979 assert_eq!(processed, num_events);
980 assert_eq!(consumed.len(), num_events);
981 assert_eq!(consumer.current_sequence(), (num_events - 1) as i64);
982 assert_eq!(consumer.producer_sequence(), (num_events - 1) as i64);
983 assert_eq!(consumer.consumer_sequence(), (num_events - 1) as i64);
984 }
985
986 #[test]
987 fn test_process_available_blocking_marks_only_final_event_as_end_of_batch() {
988 let name = unique_test_segment("process_available_blocking_batch");
989 let buffer_size = 16;
990 let num_events = 4;
991
992 let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
993 .build_producer(TestEvent::default)
994 .unwrap();
995
996 let config = SharedMemoryConfig {
997 name,
998 buffer_size,
999 element_size: std::mem::size_of::<TestEvent>(),
1000 create: false,
1001 };
1002 let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
1003 .build_consumer()
1004 .unwrap();
1005
1006 for i in 0..num_events {
1007 producer.publish(|event| {
1008 event.sequence = i as i64;
1009 event.data = i as i64;
1010 });
1011 }
1012
1013 let mut observed = Vec::new();
1014 let processed =
1015 consumer.process_available_blocking(|event: &TestEvent, seq, end_of_batch| {
1016 observed.push((seq, event.sequence, end_of_batch));
1017 });
1018
1019 assert_eq!(processed, num_events);
1020 assert_eq!(
1021 observed,
1022 vec![(0, 0, false), (1, 1, false), (2, 2, false), (3, 3, true),]
1023 );
1024 assert_eq!(consumer.current_sequence(), (num_events - 1) as i64);
1025 }
1026
1027 #[test]
1028 fn test_per_consumer_sequences_prevent_race_conditions() {
1029 let name = unique_test_segment("per_consumer_test");
1030 let buffer_size = 64;
1031 let num_events = 10; let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
1035 .build_producer(TestEvent::default)
1036 .unwrap();
1037
1038 let config = SharedMemoryConfig {
1040 name: name.clone(),
1041 buffer_size,
1042 element_size: std::mem::size_of::<TestEvent>(),
1043 create: false,
1044 };
1045
1046 let mut consumer1: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config.clone())
1047 .build_consumer()
1048 .unwrap();
1049
1050 let mut consumer2: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
1051 .build_consumer()
1052 .unwrap();
1053
1054 println!("Created two consumers for broadcast test");
1055
1056 for i in 0..num_events {
1058 producer.publish(|event| {
1059 event.sequence = i as i64;
1060 event.data = i as i64;
1061 });
1062 println!("Published event {}", i);
1063 }
1064
1065 let (seq1, prod_seq1, consumer_seq1) = consumer1.debug_sequences();
1067 let (seq2, prod_seq2, consumer_seq2) = consumer2.debug_sequences();
1068 println!(
1069 "After publishing - Consumer 1: current={}, producer={}, consumer={}",
1070 seq1, prod_seq1, consumer_seq1
1071 );
1072 println!(
1073 "After publishing - Consumer 2: current={}, producer={}, consumer={}",
1074 seq2, prod_seq2, consumer_seq2
1075 );
1076
1077 let mut consumer1_events = Vec::new();
1079 let mut consumer2_events = Vec::new();
1080
1081 let _processed1 = consumer1.process_available(|event: &TestEvent, _seq| {
1083 consumer1_events.push((event.sequence, event.data));
1084 println!(
1085 "Consumer 1 processed event: seq={}, data={}",
1086 event.sequence, event.data
1087 );
1088 });
1089
1090 let _processed2 = consumer2.process_available(|event: &TestEvent, _seq| {
1092 consumer2_events.push((event.sequence, event.data));
1093 println!(
1094 "Consumer 2 processed event: seq={}, data={}",
1095 event.sequence, event.data
1096 );
1097 });
1098
1099 println!(
1100 "Consumer 1 processed {} events: {:?}",
1101 consumer1_events.len(),
1102 consumer1_events
1103 );
1104 println!(
1105 "Consumer 2 processed {} events: {:?}",
1106 consumer2_events.len(),
1107 consumer2_events
1108 );
1109
1110 let (seq1, prod_seq1, consumer_seq1) = consumer1.debug_sequences();
1112 let (seq2, prod_seq2, consumer_seq2) = consumer2.debug_sequences();
1113 println!(
1114 "After processing - Consumer 1: current={}, producer={}, consumer={}",
1115 seq1, prod_seq1, consumer_seq1
1116 );
1117 println!(
1118 "After processing - Consumer 2: current={}, producer={}, consumer={}",
1119 seq2, prod_seq2, consumer_seq2
1120 );
1121
1122 assert_eq!(
1124 consumer1_events.len(),
1125 num_events,
1126 "Consumer 1 should see all events"
1127 );
1128 assert_eq!(
1129 consumer2_events.len(),
1130 num_events,
1131 "Consumer 2 should see all events"
1132 );
1133
1134 for i in 0..num_events {
1136 assert_eq!(consumer1_events[i], (i as i64, i as i64));
1137 assert_eq!(consumer2_events[i], (i as i64, i as i64));
1138 }
1139
1140 println!("SUCCESS: Both consumers saw all events (broadcast semantics)!");
1141 }
1142
#[test]
/// Broadcast smoke test: two consumers attached to one segment must each
/// observe every published event, in publication order.
fn test_broadcast_consumer_basic() {
    let name = unique_test_segment("broadcast_basic");
    let buffer_size = 64;
    let num_events = 5;

    let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
        .build_producer(TestEvent::default)
        .unwrap();

    // Both consumers attach to the already-created segment (`create: false`).
    let attach_config = SharedMemoryConfig {
        name: name.clone(),
        buffer_size,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    };

    let mut consumer1: SharedConsumer<TestEvent> =
        SharedDisruptorBuilder::new(attach_config.clone())
            .build_consumer()
            .unwrap();
    let mut consumer2: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(attach_config)
        .build_consumer()
        .unwrap();

    println!("Created two consumers for basic broadcast test");
    println!(
        "Consumer 1 ID: {}, Consumer 2 ID: {}",
        consumer1.consumer_id(),
        consumer2.consumer_id()
    );

    // Publish events whose sequence and payload both equal their index.
    for index in 0..num_events {
        let value = index as i64;
        producer.publish(|event| {
            event.sequence = value;
            event.data = value;
        });
    }

    // Each consumer drains independently into its own list.
    let mut seen_by_first = Vec::new();
    consumer1.process_available(|event: &TestEvent, _seq| {
        seen_by_first.push(event.sequence);
    });

    let mut seen_by_second = Vec::new();
    consumer2.process_available(|event: &TestEvent, _seq| {
        seen_by_second.push(event.sequence);
    });

    println!("Consumer 1 processed: {:?}", seen_by_first);
    println!("Consumer 2 processed: {:?}", seen_by_second);

    assert_eq!(
        seen_by_first.len(),
        num_events,
        "Consumer 1 should see all events"
    );
    assert_eq!(
        seen_by_second.len(),
        num_events,
        "Consumer 2 should see all events"
    );

    // Both lists have exactly `num_events` entries here, so zipping them
    // visits every index once.
    for (position, (from_first, from_second)) in
        seen_by_first.iter().zip(seen_by_second.iter()).enumerate()
    {
        let expected = position as i64;
        assert_eq!(*from_first, expected);
        assert_eq!(*from_second, expected);
    }

    println!("Both consumers saw all events in order!");
}
1220
#[test]
/// Exercises the atomic-style API of `SharedCursor`: load, store, fetch_add,
/// and both the succeeding and failing paths of compare_exchange.
fn test_shared_cursor_operations() {
    let relaxed = Ordering::Relaxed;
    let segment = unique_test_segment("atomic_ops");
    let cursor = SharedCursor::new(&segment, 0).unwrap();

    // A fresh cursor reports the initial value it was created with.
    assert_eq!(cursor.load(relaxed), 0);

    cursor.store(42, relaxed);
    assert_eq!(cursor.load(relaxed), 42);

    // fetch_add hands back the value from before the addition.
    let previous = cursor.fetch_add(8, relaxed);
    assert_eq!(previous, 42);
    assert_eq!(cursor.load(relaxed), 50);

    // Matching expectation: the swap happens and the old value is returned.
    assert_eq!(cursor.compare_exchange(50, 100, relaxed, relaxed), Ok(50));
    assert_eq!(cursor.load(relaxed), 100);

    // Stale expectation: the swap is rejected and the current value reported.
    assert_eq!(
        cursor.compare_exchange(50, 200, relaxed, relaxed),
        Err(100)
    );
    assert_eq!(cursor.load(relaxed), 100);
}
1253
#[test]
/// Attaching a consumer (`create: false`) to a segment name that no producer
/// created must fail with `MultiProcessError::SegmentNotFound`.
fn test_consumer_attachment_to_nonexistent_segment() {
    let config = SharedMemoryConfig {
        name: "nonexistent".to_string(),
        buffer_size: 8,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    };

    let attach_result: Result<SharedConsumer<TestEvent>, MultiProcessError> =
        SharedDisruptorBuilder::new(config).build_consumer();
    assert!(attach_result.is_err());

    // Any other error variant is reported verbatim in the failure message.
    let error = attach_result.err().unwrap();
    if !matches!(&error, MultiProcessError::SegmentNotFound(_)) {
        panic!("Expected SegmentNotFound, got {:?}", error);
    }
}
1278
#[test]
/// Test with an empty body: it executes no statements and makes no
/// assertions, so it passes unconditionally.
// NOTE(review): presumably kept as an anchor for feature documentation —
// confirm it is still wanted, since it verifies nothing at runtime.
fn test_feature_completeness_documentation() {
}
1300
#[test]
/// Publishes one batch of four events and verifies a consumer attached
/// afterwards receives them as sequences 0..=3 with the payloads written by
/// the batch closure.
///
/// The consume loop is bounded by a deadline so that a consumer which never
/// observes the batch fails the test instead of spinning forever.
fn test_batch_publish_writes_and_consumes_in_sequence() {
    let name = unique_test_segment("batch_sequence");
    let buffer_size = 8;

    let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
        .build_producer(TestEvent::default)
        .unwrap();

    // The closure receives the index within the batch; the call returns the
    // highest sequence it published.
    let upper = producer
        .try_batch_publish(4, |event, i| {
            event.sequence = i as i64;
            event.data = 100 + i as i64;
        })
        .unwrap();
    assert_eq!(upper, 3);
    assert_eq!(producer.last_published_sequence(), 3);

    let config = SharedMemoryConfig {
        name,
        buffer_size,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    };
    let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
        .build_consumer()
        .unwrap();

    // Same 2-second budget the other polling loops in this module use.
    let deadline = Instant::now() + Duration::from_secs(2);
    let mut consumed = Vec::new();
    while consumed.len() < 4 {
        let processed = consumer.process_available(|event: &TestEvent, seq| {
            consumed.push((seq, event.sequence, event.data));
        });
        if processed == 0 {
            // Only check the clock on idle polls; progress resets nothing but
            // also never needs the guard.
            if Instant::now() > deadline {
                panic!(
                    "consumer observed only {} of 4 published events before the deadline",
                    consumed.len()
                );
            }
            std::thread::yield_now();
        }
    }

    assert_eq!(
        consumed,
        vec![(0, 0, 100), (1, 1, 101), (2, 2, 102), (3, 3, 103)]
    );
}
1344
#[test]
/// A zero-length batch must be a no-op for both batch APIs: the fill closure
/// is never invoked and the last published sequence stays at -1.
fn test_simple_batch_publish_is_noop_for_zero() {
    let segment = unique_test_segment("batch_zero");
    let capacity = 8;

    let mut producer = build_shared_single_producer::<TestEvent>(&segment, capacity)
        .build_producer(TestEvent::default)
        .unwrap();

    // Indexed variant: reports -1 as the upper sequence for an empty batch.
    let indexed_outcome = producer.try_batch_publish(0, |_event, _| {
        panic!("indexed closure must not run for n=0")
    });
    assert_eq!(indexed_outcome, Ok(-1));

    // Simple variant: must succeed without touching any slot.
    let simple_outcome = producer.simple_batch_publish(0, |_event, _| {
        panic!("simple_batch_publish no-op closure must not run for n=0")
    });
    simple_outcome.unwrap();

    assert_eq!(producer.last_published_sequence(), -1);
}
1368
#[test]
/// Verifies that `try_batch_publish` returns `MissingFreeSlots` while the ring
/// is full and succeeds again after a consumer has started draining it.
///
/// Flow: fill all 4 slots, wait until the producer's gating sequence reflects
/// the (paused) consumer, expect a 1-slot batch to be rejected, release the
/// consumer, then publish one more event and wait for all 5 to be consumed.
fn test_try_batch_publish_reports_missing_slots_when_full() {
    let name = unique_test_segment("batch_full");
    let buffer_size = 4;
    // Gate that keeps the consumer thread idle until the "full" check is done.
    let start_consume = Arc::new(AtomicBool::new(false));
    // Count of events the consumer thread has handled so far.
    let consumed = Arc::new(AtomicUsize::new(0));

    // Producer and consumer are configured with the same count (1) and the
    // same "bchk" prefix.
    let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
        .discover_consumer_with_prefix(1, "bchk")
        .build_producer(TestEvent::default)
        .unwrap();

    let consumer_handle = {
        // Shadowed clones are moved into the thread; the originals stay usable here.
        let name = name.clone();
        let start_consume = start_consume.clone();
        let consumed = consumed.clone();

        std::thread::spawn(move || {
            let config = SharedMemoryConfig {
                name,
                buffer_size,
                element_size: std::mem::size_of::<TestEvent>(),
                create: false,
            };
            let mut consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(config)
                .discover_consumer_with_prefix(1, "bchk")
                .with_consumer_id("bchk_0")
                .build_consumer()
                .unwrap();

            // Stay registered but idle until the main thread gives the signal.
            while !start_consume.load(Ordering::Acquire) {
                std::thread::yield_now();
            }

            // Drain until all `buffer_size + 1` events (initial batch plus the
            // final single publish) have been counted.
            while consumed.load(Ordering::Acquire) < (buffer_size + 1) {
                let processed = consumer.process_available(|_event, _| {
                    consumed.fetch_add(1, Ordering::AcqRel);
                });
                if processed == 0 {
                    std::thread::yield_now();
                }
            }
        })
    };

    // Fill every slot of the ring.
    producer
        .try_batch_publish(4, |event, i| {
            event.sequence = i as i64;
            event.data = 10 + i as i64;
        })
        .expect("full buffer should accept exactly `buffer_size` slots");

    let producer_seq = producer.last_published_sequence();
    // Poll (up to 2 s) until the minimum gating sequence differs from the
    // producer's own sequence; per the panic text below this is taken as the
    // sign that consumer discovery has kicked in.
    let discovery_deadline = Instant::now() + Duration::from_secs(2);
    loop {
        if producer.min_gating_sequence() != producer_seq {
            break;
        }
        if Instant::now() > discovery_deadline {
            panic!("consumer discovery did not reduce gating sequence below producer cursor");
        }
        std::thread::yield_now();
    }

    // Ring is full and the consumer is paused: one more slot must be refused
    // without ever invoking the fill closure.
    let err = producer
        .try_batch_publish(1, |_event, _| {
            panic!("second batch must not run when capacity is exhausted")
        })
        .expect_err("producer must report missing free slots when full");
    assert_eq!(err, MissingFreeSlots(1));

    // Release the consumer thread.
    start_consume.store(true, Ordering::Release);

    // Wait (up to 2 s) for evidence that at least one event was handled.
    let deadline = Instant::now() + Duration::from_secs(2);
    while consumed.load(Ordering::Acquire) == 0 {
        if Instant::now() > deadline {
            panic!("consumer did not start consuming after start signal");
        }
        std::thread::yield_now();
    }

    // NOTE(review): `consumed` is bumped inside the consumer callback; whether
    // the slot is already released to the producer at that moment depends on
    // `process_available` internals not visible here — confirm this single
    // attempt cannot race with the consumer publishing its progress.
    producer
        .try_batch_publish(1, |event, i| {
            event.sequence = 4 + i as i64;
            event.data = 14;
        })
        .expect("single-slot batch should succeed once one slot is released");

    // NOTE(review): this join has no timeout; if the consumer never reaches
    // `buffer_size + 1` events the test blocks here.
    consumer_handle.join().unwrap();
    assert_eq!(consumed.load(Ordering::Acquire), buffer_size + 1);
}
1461
#[test]
/// Regression test for the fast/slow consumer scenario: fills the ring, lets
/// one consumer record everything and the other record only three events,
/// then checks that the producer cannot publish ten more events unhindered
/// and that both consumers recorded identical data for the overlapping prefix.
fn test_fast_slow_consumer_race_condition_fix() {
    let name = unique_test_segment("race_condition_fix");
    let buffer_size = 8;

    let mut producer = build_shared_single_producer::<TestEvent>(&name, buffer_size)
        .enable_discovery(2)
        .build_producer(TestEvent::default)
        .unwrap();

    let attach_config = SharedMemoryConfig {
        name: name.clone(),
        buffer_size,
        element_size: std::mem::size_of::<TestEvent>(),
        create: false,
    };

    let mut fast_consumer: SharedConsumer<TestEvent> =
        SharedDisruptorBuilder::new(attach_config.clone())
            .build_consumer()
            .unwrap();
    let mut slow_consumer: SharedConsumer<TestEvent> = SharedDisruptorBuilder::new(attach_config)
        .build_consumer()
        .unwrap();

    // Fill the ring completely; payload is the index scaled by 100.
    for index in 0..buffer_size {
        let sequence = index as i64;
        producer.publish(|event| {
            event.sequence = sequence;
            event.data = sequence * 100;
        });
    }

    println!("Published {} events to fill buffer", buffer_size);

    // The fast consumer records everything that is available.
    let mut fast_events = Vec::new();
    fast_consumer.process_available(|event: &TestEvent, seq| {
        fast_events.push((seq, event.sequence, event.data));
    });

    // The slow consumer records at most three events.
    // NOTE(review): the closure only limits what is *recorded*; whether
    // `process_available` still advances past the unrecorded events is not
    // visible here — confirm the consumer is genuinely left behind.
    let mut slow_events = Vec::new();
    let mut slow_processed = 0;
    slow_consumer.process_available(|event: &TestEvent, seq| {
        if slow_processed >= 3 {
            return;
        }
        slow_events.push((seq, event.sequence, event.data));
        slow_processed += 1;
    });

    println!("Fast consumer processed: {} events", fast_events.len());
    println!("Slow consumer processed: {} events", slow_events.len());
    println!("Fast consumer events: {:?}", fast_events);
    println!("Slow consumer events: {:?}", slow_events);

    // Try to push ten more events, stopping at the first rejection.
    let mut successful_publishes = 0;
    for index in buffer_size..(buffer_size + 10) {
        let accepted = producer
            .try_publish(|event| {
                event.sequence = index as i64;
                event.data = index as i64 * 100;
            })
            .is_ok();

        if !accepted {
            println!(
                "Buffer full at event {} - producer correctly blocked by slow consumer",
                index
            );
            break;
        }

        successful_publishes += 1;
        println!(
            "Successfully published event {} (data: {})",
            index,
            index * 100
        );
    }

    // Remember how much the slow consumer had before it catches up.
    let slow_events_before_catchup = slow_events.len();
    slow_consumer.process_available(|event: &TestEvent, seq| {
        slow_events.push((seq, event.sequence, event.data));
    });

    println!("Slow consumer after catchup: {} events", slow_events.len());
    println!("Slow consumer all events: {:?}", slow_events);

    // Compare only the prefix both consumers had recorded before catch-up.
    let overlap_count = fast_events.len().min(slow_events_before_catchup);
    let overlapping = fast_events
        .iter()
        .zip(slow_events.iter())
        .take(overlap_count)
        .enumerate();
    for (position, (fast, slow)) in overlapping {
        assert_eq!(
            fast, slow,
            "Data corruption detected at index {}: fast consumer saw {:?}, slow consumer saw {:?}",
            position, fast, slow
        );
    }

    assert!(
        successful_publishes < 10,
        "Producer should have been blocked by slow consumer, but published {} additional events",
        successful_publishes
    );

    println!("SUCCESS: Producer correctly respected slow consumer position!");
    println!(
        " - Fast consumer processed {} events immediately",
        fast_events.len()
    );
    println!(
        " - Slow consumer processed {} events initially",
        slow_events_before_catchup
    );
    println!(
        " - Producer was blocked after {} additional publishes",
        successful_publishes
    );
}
1586
#[test]
/// Publishes far more events than the ring holds, with no consumer attached
/// and no discovery configured, and fails if the loop exceeds five seconds.
///
/// The elapsed-time guard runs between publishes, so it catches slow progress
/// but not a single `publish` call that never returns.
fn test_buffer_wrapping_without_consumers() {
    const BUFFER_SIZE: usize = 512;
    const NUM_EVENTS: u64 = 150_000;

    println!(
        "Testing: Publishing {} events without consumers (no discovery)",
        NUM_EVENTS
    );

    let segment = unique_test_segment("wrap_no_disc");
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, BUFFER_SIZE)
        .build_producer(TestEvent::default)
        .expect("Failed to create producer");

    println!("Producer created without discovery");

    let time_limit = Duration::from_secs(5);
    let began = Instant::now();
    for published in 0..NUM_EVENTS {
        assert!(
            began.elapsed() <= time_limit,
            "Timeout at event {} - buffer not wrapping correctly!",
            published
        );

        producer.publish(|event| {
            event.sequence = published as i64;
            event.data = (published % 1000) as i64;
        });

        // Progress line every 10,000 events, skipping the very first one.
        if published != 0 && published % 10_000 == 0 {
            println!("Published {} events", published);
        }
    }

    println!(
        "✅ Successfully published {} events without consumers!",
        NUM_EVENTS
    );
    let wraps = NUM_EVENTS / (BUFFER_SIZE as u64);
    println!("Buffer wrapped {} times", wraps);
}
1632
#[test]
/// Publishes exactly 65,536 events through a 512-slot ring with no consumer
/// attached and fails if the loop takes longer than two seconds.
///
/// The elapsed-time guard runs between publishes, so it reports slow progress
/// but cannot interrupt a `publish` call that never returns.
fn test_exact_64kb_boundary_no_deadlock() {
    const BUFFER_SIZE: usize = 512;
    const NUM_EVENTS: u64 = 65_536;

    println!(
        "Testing: Publishing exactly {} events (64KB boundary)",
        NUM_EVENTS
    );

    let segment = unique_test_segment("boundary");
    let mut producer = build_shared_single_producer::<TestEvent>(&segment, BUFFER_SIZE)
        .build_producer(TestEvent::default)
        .expect("Failed to create producer");

    let time_limit = Duration::from_secs(2);
    let began = Instant::now();
    for published in 0..NUM_EVENTS {
        assert!(
            began.elapsed() <= time_limit,
            "Deadlock at event {} - this is the old bug!",
            published
        );

        // Only the sequence field is written; the payload is left as-is.
        producer.publish(|event| {
            event.sequence = published as i64;
        });
    }

    println!(
        "✅ Successfully published {} events - no deadlock at 64KB boundary!",
        NUM_EVENTS
    );
}
1667
#[test]
/// Repeats the consumer-less wrap-around check for several ring sizes,
/// publishing 100 times the capacity each time with a five-second budget per
/// size.
fn test_buffer_wrapping_various_sizes() {
    for buffer_size in [256, 512, 1024, 2048] {
        let num_events = (buffer_size * 100) as u64;

        println!(
            "Testing buffer size {} with {} events",
            buffer_size, num_events
        );

        // Each size gets its own segment so the runs do not share state.
        let segment = unique_test_segment(&format!("size_{}", buffer_size));
        let mut producer = build_shared_single_producer::<TestEvent>(&segment, buffer_size)
            .build_producer(TestEvent::default)
            .expect("Failed to create producer");

        let time_limit = Duration::from_secs(5);
        let began = Instant::now();
        for published in 0..num_events {
            assert!(
                began.elapsed() <= time_limit,
                "Timeout with buffer size {} at event {}",
                buffer_size,
                published
            );

            producer.publish(|event| {
                event.sequence = published as i64;
            });
        }

        println!(
            "✅ Buffer size {} handled {} events correctly",
            buffer_size, num_events
        );
    }
}
1705}