Skip to main content

Crate disruptor_mp

Crate disruptor_mp 

Source
Expand description

Multiprocess shared-memory ring buffers for Disruptor-style publication.

disruptor-mp exists for one job: move fixed-size events between OS processes with as little coordination overhead as possible.

It extends the upstream single-process disruptor crate with a cross-process data plane. Producers and consumers in different OS processes coordinate through a shared-memory segment or a memory-mapped file with cache-line-padded cursors and a fixed-size ring buffer.

Use this crate directly when you want the raw substrate only. If you need framing, codec-backed typed transport, or typed zero-copy, use myelon, which re-exports the relevant public surface from this crate and builds higher layers on top.

§What this crate exposes

ConcernTypePurpose
Raw ring (SHM)SharedProducer<E>, SharedConsumer<E>Cross-process publish/consume of fixed-size events over a POSIX shared-memory segment.
Raw ring (mmap)MmapProducer<E>, MmapConsumer<E>Same, backed by a memory-mapped file.
Buildersbuild_shared_single_producer, attach_shared_consumerConstruct a producer/consumer with discovery and coordination wired up.
CoordinationCoordinationModeWhen does the producer consider its peers attached?
LivenessRequiredConsumerLivenessConfig, RequiredConsumerFailureActionA stalled required consumer is treated as a failure or alert, not silent backpressure.
Namingportable_shm_segment_nameDerive a macOS-safe SHM segment name from an arbitrary label.
Observabilityobservability (RFC-0040)Hot-path counters file plus metrics-rs / Prometheus / OTLP exporters behind feature flags.

E is your event type: anything Copy + Default + 'static with a stable layout.

What this crate deliberately does not do:

  • variable-length framing
  • typed serialization layers
  • zero-copy archived reads over serialized payloads
  • inference-specific topology helpers

§Feature flags

  • metrics (default): wire the observability counters into the metrics façade so any metrics-rs recorder can ingest them.
  • metrics-prometheus: pull in metrics-exporter-prometheus.
  • metrics-otel: pull in opentelemetry-otlp for OTLP export.
  • RUSTFLAGS="--cfg dst": compile deterministic-simulation hooks used by the internal myelon-dst harness and DST integration tests.

§Required-consumer liveness

The base model is strict broadcast — the slowest consumer gates capacity, so a stalled required consumer backpressures the producer indefinitely. The liveness layer is opt-in and turns that silent stall into a producer-observable, time-bounded event.

Enable it via SharedProducer::enable_required_consumer_liveness or MmapProducer::enable_required_consumer_liveness, then use the parallel *_managed publish methods (publish_managed, publish_batch_managed) instead of the unmanaged ones. Existing unmanaged calls keep their current semantics for callers that don’t opt in.

CaseOutcome
Required consumer doesn’t appear within startup_wait_timeout.RequiredConsumerError::StartupTimeout
Required consumer stalls past progress_timeout while gating the producer.One stderr alert + optional RequiredConsumerAlertHook callback.
Same consumer ID rejoins before shutdown_grace_period expires.Producer recovers, alert state clears, publishing resumes.
Stall persists past shutdown_grace_period.RequiredConsumerError::GracefulShutdownTriggered

The check is cold-path only — it runs only while the producer is blocked on a gating consumer, so steady-state publish cost is unchanged. There is no consumer-side heartbeat; progress is observed from the cursor data the producer already needs for gating. By design the liveness layer does not add dead-consumer eviction, quorum, or degraded broadcast — the system stays strict-broadcast.

The liveness check is cold-path only: it runs while the producer is blocked on a required consumer, not on the steady-state fast path.

§Quick start

Producer side:

use disruptor_mp::{
    build_shared_single_producer, portable_shm_segment_name, CoordinationMode,
};
let segment = portable_shm_segment_name("demo");
let mut producer = build_shared_single_producer::<Event>(&segment, 4096)
    .discover_consumer_with_prefix(1, "cp")
    .with_coordination(CoordinationMode::Immediate)
    .build_producer(Event::default)
    .expect("build producer");
producer.publish(|slot| slot.sequence = 42);

Consumer side, in a different process:

use disruptor_mp::attach_shared_consumer;
let mut consumer = attach_shared_consumer::<Event>(segment, 4096)
    .with_consumer_id("cp_0")
    .build_consumer()
    .expect("attach consumer");
while let Some(event) = consumer.try_consume_next_leased() {
    // process &event
}

See the crate README and examples/ directory for fuller end-to-end programs and operational guidance.

Modules§

backend
Backend namespaces for storage implementations.
env
Environment variable contract for the multiprocess substrate.
lock_free
Lock-free coordination primitives.
observability
Aeron-style counters file for low-overhead observability.
shared_memory
Shared-memory data-plane primitives.

Structs§

AutoConsumer
Automatic consumer that runs in a background thread.
ConsumerCounterSelection
Select which consumer-side counters are attached to a shared counters file.
MissingFreeSlots
The Ring Buffer was missing a number of free slots for doing the batch publication.
MmapConsumer
Consumer for the mmap transport.
MmapConsumerBarrier
Consumer barrier for the mmap transport.
MmapCursor
File-backed mmap cursor with cache-line padding.
MmapCursorConfig
Configuration for file-backed mmap cursor segments.
MmapFileConfig
Configuration for file-backed mmap segments.
MmapProducer
Producer for the mmap transport.
MmapRingBuffer
Ring buffer stored in a file-backed mmap region for multi-process access.
MmapTransportLayout
Stable file layout for one mmap-backed disruptor transport instance.
ProducerCounterSelection
Select which producer-side counters are attached to a shared counters file.
RequiredConsumerAlert
Producer-side alert emitted when a required consumer stops advancing while gating progress.
RequiredConsumerLivenessConfig
Producer-side liveness policy for a required consumer set.
RingBufferFull
Error indicating that the ring buffer is full.
SharedConsumer
Consumer for multi-process disruptor with broadcast semantics Each consumer maintains its own sequence and sees all events
SharedConsumerBarrier
Barrier for tracking consumers in multiprocess shared-memory topologies.
SharedCursor
A shared cursor that can be accessed across processes
SharedDisruptorBuilder
Builder for creating multi-process disruptors.
SharedMemoryConfig
Configuration for shared memory segments
SharedProducer
Multi-process producer for publishing events to shared memory ring buffer.
SharedRingBuffer
Ring buffer stored in shared memory for multi-process access

Enums§

AutoWaitStrategy
Wait strategy for automatic event handlers
CoordinationMode
Coordination mode for multiprocess producer startup
DiscoveryMode
Discovery mode for consumer detection.
MultiProcessError
Errors that can occur during multi-process setup
RequiredConsumerError
Producer-visible failure when required-consumer liveness cannot be maintained.
RequiredConsumerFailureAction
Action to take when a required consumer stops making progress.

Constants§

DEFAULT_MAX_CONSUMERS
Default maximum number of consumers that can be registered with a single shared ring buffer.
PORTABLE_SHM_SEGMENT_NAME_MAX_LEN
Conservative shared-memory segment-name budget that stays portable on macOS.

Traits§

Producer
Producer used for publishing into the Disruptor.
SharedCursorTrait
Generic shared cursor trait

Functions§

attach_shared_consumer
Attach to an existing shared disruptor as a consumer
build_shared_single_producer
Create a shared single producer for multi-process communication
default_block_strategy_duration
Return the configured backoff used by AutoWaitStrategy::Block.
default_consume_sleep_duration
Return the configured sleep used by consume_next_with_sleep and similar helpers.
default_discovery_poll_duration
Return the configured poll interval used by discovery/startup coordination loops.
is_multiple_of_u64
Returns true when value is an exact multiple of divisor.
perform_default_block_wait
Apply the configured AutoWaitStrategy::Block wait policy.
perform_default_consume_sleep_wait
Apply the configured consumer sleep wait policy.
perform_default_discovery_poll_wait
Apply the configured discovery/startup polling wait policy.
perform_sleep_wait
Apply the explicit AutoWaitStrategy::Sleep(duration) wait policy.
portable_shm_segment_name
Generate a shared-memory segment name that fits the portable macOS-safe budget.

Type Aliases§

ConsumerBarrier
Concise alias for the shared consumer barrier type.
MultiProcessResult
Result type for multi-process operations
RequiredConsumerAlertHook
Optional embedding hook invoked when producer-side stall alerting fires.
Sequence
The type for Sequence numbers in the Ring Buffer (i64).
ShmRingBuffer
Short alias for shared-memory ring buffer.