disruptor_mp/lib.rs
1#![warn(missing_docs)]
2
3//! Multiprocess shared-memory ring buffers for Disruptor-style publication.
4//!
5//! `disruptor-mp` exists for one job: move fixed-size events between
6//! OS processes with as little coordination overhead as possible.
7//!
8//! It extends the upstream single-process `disruptor` crate with a
9//! cross-process data plane. Producers and consumers in different OS
10//! processes coordinate through a shared-memory segment or a
11//! memory-mapped file with cache-line-padded cursors and a fixed-size
12//! ring buffer.
13//!
14//! Use this crate directly when you want the raw substrate only. If
15//! you need framing, codec-backed typed transport, or typed zero-copy,
16//! use `myelon`, which re-exports the relevant public surface from
17//! this crate and builds higher layers on top.
18//!
19//! # What this crate exposes
20//!
21//! | Concern | Type | Purpose |
22//! |---|---|---|
23//! | Raw ring (SHM) | [`SharedProducer<E>`], [`SharedConsumer<E>`] | Cross-process publish/consume of fixed-size events over a POSIX shared-memory segment. |
24//! | Raw ring (mmap) | [`MmapProducer<E>`], [`MmapConsumer<E>`] | Same, backed by a memory-mapped file. |
25//! | Builders | [`build_shared_single_producer`], [`attach_shared_consumer`] | Construct a producer/consumer with discovery and coordination wired up. |
26//! | Coordination | [`CoordinationMode`] | When does the producer consider its peers attached? |
27//! | Liveness | [`RequiredConsumerLivenessConfig`], [`RequiredConsumerFailureAction`] | A stalled required consumer is treated as a failure or alert, not silent backpressure. |
28//! | Naming | [`portable_shm_segment_name`] | Derive a macOS-safe SHM segment name from an arbitrary label. |
29//! | Observability | [`observability`] (RFC-0040) | Hot-path counters file plus `metrics`-rs / Prometheus / OTLP exporters behind feature flags. |
30//!
31//! `E` is your event type: anything `Copy + Default + 'static` with a
32//! stable layout.
33//!
34//! What this crate deliberately does not do:
35//!
36//! - variable-length framing
37//! - typed serialization layers
38//! - zero-copy archived reads over serialized payloads
39//! - inference-specific topology helpers
40//!
41//! # Feature flags
42//!
43//! - `metrics` (default): wire the [`observability`] counters into
44//! the [`metrics`](https://crates.io/crates/metrics) façade so any
45//! `metrics`-rs recorder can ingest them.
46//! - `metrics-prometheus`: pull in `metrics-exporter-prometheus`.
47//! - `metrics-otel`: pull in `opentelemetry-otlp` for OTLP export.
48//! - `RUSTFLAGS="--cfg dst"`: compile deterministic-simulation hooks
49//! used by the internal `myelon-dst` harness and DST integration tests.
50//!
51//! # Required-consumer liveness
52//!
53//! The base model is strict broadcast — the slowest consumer gates
54//! capacity, so a stalled required consumer backpressures the
55//! producer indefinitely. The liveness layer is opt-in and turns
56//! that silent stall into a producer-observable, time-bounded event.
57//!
58//! Enable it via [`SharedProducer::enable_required_consumer_liveness`]
59//! or [`MmapProducer::enable_required_consumer_liveness`], then use
60//! the parallel `*_managed` publish methods (`publish_managed`,
61//! `publish_batch_managed`) instead of the unmanaged ones. Existing
62//! unmanaged calls keep their current semantics for callers that
63//! don't opt in.
64//!
65//! | Case | Outcome |
66//! |---|---|
67//! | Required consumer doesn't appear within `startup_wait_timeout`. | [`RequiredConsumerError::StartupTimeout`] |
68//! | Required consumer stalls past `progress_timeout` while gating the producer. | One stderr alert + optional [`RequiredConsumerAlertHook`] callback. |
69//! | Same consumer ID rejoins before `shutdown_grace_period` expires. | Producer recovers, alert state clears, publishing resumes. |
70//! | Stall persists past `shutdown_grace_period`. | [`RequiredConsumerError::GracefulShutdownTriggered`] |
71//!
72//! The check is **cold-path only** — it runs only while the producer
73//! is blocked on a gating consumer, so steady-state publish cost is
74//! unchanged. There is no consumer-side heartbeat; progress is
75//! observed from the cursor data the producer already needs for
76//! gating. By design the liveness layer does not add dead-consumer
77//! eviction, quorum, or degraded broadcast — the system stays
78//! strict-broadcast.
79//!
80//! The liveness check is cold-path only: it runs while the producer is
81//! blocked on a required consumer, not on the steady-state fast path.
82//!
83//! # Quick start
84//!
85//! Producer side:
86//!
87//! ```no_run
88//! use disruptor_mp::{
89//! build_shared_single_producer, portable_shm_segment_name, CoordinationMode,
90//! };
91//! # #[derive(Copy, Clone, Default)]
92//! # #[repr(C)]
93//! # struct Event { sequence: u64 }
94//! let segment = portable_shm_segment_name("demo");
95//! let mut producer = build_shared_single_producer::<Event>(&segment, 4096)
96//! .discover_consumer_with_prefix(1, "cp")
97//! .with_coordination(CoordinationMode::Immediate)
98//! .build_producer(Event::default)
99//! .expect("build producer");
100//! producer.publish(|slot| slot.sequence = 42);
101//! ```
102//!
103//! Consumer side, in a different process:
104//!
105//! ```no_run
106//! use disruptor_mp::attach_shared_consumer;
107//! # #[derive(Copy, Clone, Default)]
108//! # #[repr(C)]
109//! # struct Event { sequence: u64 }
110//! # let segment = "demo";
111//! let mut consumer = attach_shared_consumer::<Event>(segment, 4096)
112//! .with_consumer_id("cp_0")
113//! .build_consumer()
114//! .expect("attach consumer");
115//! while let Some(event) = consumer.try_consume_next_leased() {
116//! // process &event
117//! # let _ = event;
118//! }
119//! ```
120//!
121//! See the crate README and `examples/` directory for fuller end-to-end
122//! programs and operational guidance.
123
124pub use disruptor_core::{MissingFreeSlots, Producer, RingBufferFull, Sequence};
125
126// `api` and `builder` are private but their public items are re-exported
127// through `pub use api::*;` below. Their doctests reference re-exported
128// types and are valid; suppressing the lint here keeps those examples
129// in place without making the modules `pub`.
130#[allow(rustdoc::private_doc_tests)]
131mod api;
132pub mod env;
133#[path = "backend/mmap/barrier.rs"]
134mod mmap_barrier;
135#[path = "backend/mmap/consumer.rs"]
136mod mmap_consumer;
137#[path = "backend/mmap/cursor.rs"]
138mod mmap_cursor;
139#[path = "backend/mmap/producer.rs"]
140mod mmap_producer;
141#[path = "backend/mmap/ringbuffer.rs"]
142mod mmap_ringbuffer;
143#[path = "backend/mmap/transport.rs"]
144mod mmap_transport;
145pub mod observability;
146mod required_consumer;
147mod segment_name;
148mod shared_memory_layout;
149
150// Deterministic-simulation testing primitives. Available when the
151// build is invoked with `RUSTFLAGS="--cfg dst"`. See the module docs
152// for the FoundationDB/TigerBeetle-style design.
153#[cfg(dst)]
154pub mod dst;
155pub use api::*;
156pub use required_consumer::{
157 RequiredConsumerAlert, RequiredConsumerAlertHook, RequiredConsumerError,
158 RequiredConsumerFailureAction, RequiredConsumerLivenessConfig,
159};
160pub use segment_name::*;
161
162/// Returns true when `value` is an exact multiple of `divisor`.
163///
164/// This uses division/multiplication to avoid unstable and toolchain-dependent
165/// integer helper APIs while staying explicit and compiler-stable.
166#[inline]
167pub fn is_multiple_of_u64(value: u64, divisor: u64) -> bool {
168 divisor != 0 && value / divisor * divisor == value
169}