use super::consumer_barrier::{DiscoveryMode, SharedConsumerBarrier};
use crate::required_consumer::{
RequiredConsumerError, RequiredConsumerLivenessConfig, RequiredConsumerLivenessState,
};
use crate::{SharedCursor, SharedRingBuffer};
use disruptor_core::{MissingFreeSlots, RingBufferFull, Sequence};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Default)]
pub enum CoordinationMode {
#[default]
Immediate,
WaitForConsumers {
min_consumers: i64,
timeout: Duration,
},
BufferUntilConsumers {
max_buffer: usize,
},
}
impl CoordinationMode {
pub fn wait_for_consumers(min_consumers: i64, timeout: Duration) -> Self {
assert!(min_consumers > 0, "min_consumers must be greater than zero");
CoordinationMode::WaitForConsumers {
min_consumers,
timeout,
}
}
pub fn wait_for_single_consumer(timeout: Duration) -> Self {
assert!(!timeout.is_zero(), "timeout must be greater than zero");
CoordinationMode::WaitForConsumers {
min_consumers: 1,
timeout,
}
}
}
pub struct SharedProducer<E> {
ring_buffer: SharedRingBuffer<E>,
producer_sequence: SharedCursor,
pub(crate) consumer_barrier: SharedConsumerBarrier,
sequence: Sequence,
sequence_clear_of_consumers: Sequence,
pub(crate) coordination_completed: bool,
required_consumer_liveness: Option<RequiredConsumerLivenessState>,
counters: ProducerCounters,
}
#[derive(Default, Debug)]
pub struct ProducerCounters {
pub events_published: Option<crate::observability::CounterHandle>,
pub producer_full_events: Option<crate::observability::CounterHandle>,
}
#[derive(Clone, Copy, Debug)]
pub struct ProducerCounterSelection {
pub events_published: bool,
pub producer_full_events: bool,
}
impl ProducerCounterSelection {
pub const FULL: Self = Self {
events_published: true,
producer_full_events: true,
};
pub const LITE: Self = Self {
events_published: true,
producer_full_events: false,
};
}
impl<E> SharedProducer<E>
where
E: Copy + Default,
{
pub(crate) fn new_with_coordination_and_discovery(
ring_buffer: SharedRingBuffer<E>,
producer_sequence: SharedCursor,
base_name: String,
coordination_mode: CoordinationMode,
discovery_mode: DiscoveryMode,
consumer_registration: Option<SharedCursor>,
) -> Self {
producer_sequence.store(-1, std::sync::atomic::Ordering::Release);
let mut consumer_barrier = match &coordination_mode {
CoordinationMode::WaitForConsumers { .. } => {
SharedConsumerBarrier::new_with_coordination_and_discovery(
base_name.clone(),
discovery_mode.clone(),
)
.unwrap_or_else(|_| {
SharedConsumerBarrier::new_with_discovery(base_name, discovery_mode)
})
}
_ => {
SharedConsumerBarrier::new_with_discovery(base_name, discovery_mode)
}
};
if let Some(consumer_registration) = consumer_registration {
consumer_barrier.set_consumer_registration(consumer_registration);
}
consumer_barrier.set_producer_sequence(producer_sequence.clone());
let sequence_clear_of_consumers = ring_buffer.size() as i64 - 1;
Self {
ring_buffer,
producer_sequence,
consumer_barrier,
sequence: 0,
sequence_clear_of_consumers,
coordination_completed: false,
required_consumer_liveness: None,
counters: ProducerCounters::default(),
}
}
pub fn attach_counters(&mut self, file: &crate::observability::CountersFile) {
self.attach_counters_selected(file, ProducerCounterSelection::FULL);
}
pub fn attach_counters_selected(
&mut self,
file: &crate::observability::CountersFile,
selection: ProducerCounterSelection,
) {
use crate::observability::{ids, COUNTER_FLAG_PRODUCER};
self.counters.events_published = if selection.events_published {
file.register(
ids::EVENTS_PUBLISHED,
COUNTER_FLAG_PRODUCER,
"events_published",
)
} else {
None
};
self.counters.producer_full_events = if selection.producer_full_events {
file.register(
ids::PRODUCER_FULL_EVENTS,
COUNTER_FLAG_PRODUCER,
"producer_full_events",
)
} else {
None
};
}
pub fn counters(&self) -> &ProducerCounters {
&self.counters
}
#[inline]
pub fn record_publish_latency_ns(&self, ns: u64) {
#[cfg(feature = "metrics")]
metrics::histogram!("disruptor_mp_publish_latency_ns").record(ns as f64);
#[cfg(not(feature = "metrics"))]
let _ = ns;
}
#[inline]
fn next_sequences(&mut self, n: usize) -> Result<Sequence, MissingFreeSlots> {
let n = i64::try_from(n).map_err(|_| MissingFreeSlots(u64::MAX))?;
assert!(n > 0, "batch size must be greater than zero");
let n_next = self.last_reserved_sequence(n)?;
self.update_min_consumer_clearance_window(n, n_next)?;
Ok(n_next)
}
fn last_reserved_sequence(&self, n: i64) -> Result<Sequence, MissingFreeSlots> {
self.sequence
.checked_sub(1)
.and_then(|current| current.checked_add(n))
.ok_or(MissingFreeSlots(u64::MAX))
}
fn update_min_consumer_clearance_window(
&mut self,
n: i64,
n_next: Sequence,
) -> Result<(), MissingFreeSlots> {
if self.sequence_clear_of_consumers >= n_next {
return Ok(());
}
let last_published = self.sequence - 1;
let rear_sequence_read = self.consumer_barrier.get_min_consumer_sequence();
let free_slots = self
.ring_buffer
.free_slots(last_published, rear_sequence_read);
if free_slots < n {
#[cfg(dst)]
crate::dst::assert_sometimes(
true,
"producer blocked",
format!("free_slots={free_slots} requested={n}"),
);
return Err(MissingFreeSlots((n - free_slots) as u64));
}
self.sequence_clear_of_consumers = last_published + free_slots;
Ok(())
}
#[inline]
fn apply_update<F>(&mut self, update: F) -> Sequence
where
F: FnOnce(&mut E),
{
assert!(
self.sequence >= 0,
"producer sequence must be non-negative while active"
);
let sequence = self.sequence;
let event_ptr = self.ring_buffer.get(sequence);
let event = unsafe { &mut *event_ptr };
update(event);
self.producer_sequence.store(sequence, Ordering::Release);
#[cfg(dst)]
if sequence > 0 && sequence % self.ring_buffer.size() as i64 == 0 {
crate::dst::assert_sometimes(
true,
"ring buffer wraps around",
format!("sequence={sequence} size={}", self.ring_buffer.size()),
);
}
self.sequence += 1;
sequence
}
#[inline]
fn apply_batch_updates<F>(&mut self, n: usize, update_fn: F) -> Sequence
where
F: Fn(&mut E, usize), {
assert!(
n > 0,
"batch publish requires a non-zero number of events to update"
);
let n = i64::try_from(n).expect("batch size must fit in Sequence");
let lower = self.sequence;
let upper_offset = n.checked_sub(1).expect("batch size is positive");
let upper = lower
.checked_add(upper_offset)
.expect("sequence arithmetic must not overflow");
for (i, seq) in (lower..=upper).enumerate() {
let event_ptr = self.ring_buffer.get(seq);
let event = unsafe { &mut *event_ptr };
update_fn(event, i);
}
self.producer_sequence.store(upper, Ordering::Release);
self.sequence += n;
upper
}
}
impl<E> SharedProducer<E>
where
E: Copy + Default,
{
pub fn try_publish<F>(&mut self, update: F) -> Result<Sequence, RingBufferFull>
where
F: FnOnce(&mut E),
{
if self.next_sequences(1).is_err() {
if let Some(h) = &self.counters.producer_full_events {
h.inc();
}
return Err(RingBufferFull);
}
let sequence = self.apply_update(update);
if let Some(h) = &self.counters.events_published {
h.inc();
}
Ok(sequence)
}
pub fn publish<F>(&mut self, update: F)
where
F: FnOnce(&mut E),
{
let mut spun = false;
while self.next_sequences(1).is_err() {
if !spun {
if let Some(h) = &self.counters.producer_full_events {
h.inc();
}
spun = true;
}
std::hint::spin_loop();
}
#[cfg(dst)]
if crate::dst::buggify(file!(), line!()) {
std::thread::yield_now();
}
self.apply_update(update);
if let Some(h) = &self.counters.events_published {
h.inc();
}
}
pub fn try_batch_publish<F>(
&mut self,
n: usize,
update_fn: F,
) -> Result<Sequence, MissingFreeSlots>
where
F: Fn(&mut E, usize),
{
if n == 0 {
return Ok(self.sequence - 1);
}
self.next_sequences(n)?;
let sequence = self.apply_batch_updates(n, update_fn);
Ok(sequence)
}
pub fn batch_publish<F>(&mut self, n: usize, update_fn: F) -> Result<Sequence, MissingFreeSlots>
where
F: Fn(&mut E, usize),
{
self.try_batch_publish(n, update_fn)
}
pub fn simple_batch_publish<F>(
&mut self,
n: usize,
update_fn: F,
) -> Result<Sequence, MissingFreeSlots>
where
F: Fn(&mut E, usize),
{
self.try_batch_publish(n, update_fn)
}
}
impl<E> Drop for SharedProducer<E> {
fn drop(&mut self) {
}
}
#[derive(Debug, thiserror::Error)]
pub enum PublishTimeoutError {
#[error("Publish operation timed out")]
Timeout,
}
impl<E> SharedProducer<E>
where
E: Copy + Default,
{
pub fn enable_required_consumer_liveness(
&mut self,
config: RequiredConsumerLivenessConfig,
) -> &mut Self {
self.required_consumer_liveness = Some(RequiredConsumerLivenessState::new(config));
self
}
#[cold]
#[inline(never)]
fn ensure_required_consumers_ready(&mut self) -> Result<(), RequiredConsumerError> {
let Some(mut state) = self.required_consumer_liveness.take() else {
return Ok(());
};
if state.startup_completed() {
self.required_consumer_liveness = Some(state);
return Ok(());
}
let deadline = Instant::now()
.checked_add(state.startup_wait_timeout())
.expect("startup_wait_timeout does not fit in Instant");
loop {
let missing = state
.missing_required_consumers(|consumer_id| self.discover_consumer_id(consumer_id));
if missing.is_empty() {
let now = Instant::now();
state.seed_progress(now, |consumer_id| self.consumer_sequence(consumer_id));
state.mark_startup_completed(now);
self.required_consumer_liveness = Some(state);
return Ok(());
}
if Instant::now() >= deadline {
let error = RequiredConsumerError::StartupTimeout { missing };
self.required_consumer_liveness = Some(state);
return Err(error);
}
super::wait::perform_default_discovery_poll_wait();
}
}
#[cold]
#[inline(never)]
fn check_required_consumer_liveness(&mut self) -> Result<(), RequiredConsumerError> {
let Some(mut state) = self.required_consumer_liveness.take() else {
return Ok(());
};
let now = Instant::now();
let producer_sequence = self.last_published_sequence();
let failure = state.evaluate_blocked(now, producer_sequence, |consumer_id| {
self.consumer_sequence(consumer_id)
});
self.required_consumer_liveness = Some(state);
if let Some(error) = failure {
return Err(error);
}
Ok(())
}
pub fn publish_with_timeout<F>(
&mut self,
timeout: Duration,
update: F,
) -> Result<Sequence, PublishTimeoutError>
where
F: FnOnce(&mut E),
{
assert!(
timeout > Duration::ZERO,
"timeout must be greater than zero"
);
let deadline = Instant::now()
.checked_add(timeout)
.expect("timeout duration does not fit in Instant");
while self.next_sequences(1).is_err() {
if Instant::now() >= deadline {
return Err(PublishTimeoutError::Timeout);
}
std::hint::spin_loop();
}
let sequence = self.apply_update(update);
Ok(sequence)
}
pub fn publish_managed<F>(&mut self, update: F) -> Result<Sequence, RequiredConsumerError>
where
F: FnOnce(&mut E),
{
self.ensure_required_consumers_ready()?;
let mut update = Some(update);
loop {
if self.next_sequences(1).is_ok() {
#[cfg(dst)]
if crate::dst::buggify(file!(), line!()) {
std::thread::yield_now();
}
let sequence =
self.apply_update(update.take().expect("managed update is consumed once"));
return Ok(sequence);
}
self.check_required_consumer_liveness()?;
std::thread::yield_now();
}
}
pub fn publish_batch_managed<F>(
&mut self,
n: usize,
update_fn: F,
) -> Result<Sequence, RequiredConsumerError>
where
F: Fn(&mut E, usize),
{
if n == 0 {
return Ok(self.sequence - 1);
}
self.ensure_required_consumers_ready()?;
loop {
if self.next_sequences(n).is_ok() {
return Ok(self.apply_batch_updates(n, &update_fn));
}
self.check_required_consumer_liveness()?;
std::thread::yield_now();
}
}
}
impl<E> SharedProducer<E>
where
E: Copy + Default,
{
pub fn last_published_sequence(&self) -> Sequence {
self.producer_sequence
.load(std::sync::atomic::Ordering::Acquire)
}
pub fn min_gating_sequence(&mut self) -> Sequence {
#[cfg(dst)]
if crate::dst::buggify(file!(), line!()) {
return self.last_published_sequence();
}
self.consumer_barrier.get_min_consumer_sequence()
}
pub fn is_consumed(&mut self, seq: Sequence) -> bool {
self.consumer_barrier.get_min_consumer_sequence() >= seq
}
pub fn discover_consumer_id(&mut self, consumer_id: &str) -> bool {
self.consumer_barrier.discover_consumer_id(consumer_id)
}
pub fn wait_for_consumer_id(&mut self, consumer_id: &str, timeout: Duration) -> bool {
assert!(timeout > Duration::ZERO, "timeout must be positive");
let deadline = Instant::now()
.checked_add(timeout)
.expect("timeout duration does not fit in Instant");
loop {
if self.discover_consumer_id(consumer_id) {
return true;
}
if Instant::now() >= deadline {
return false;
}
super::wait::perform_default_discovery_poll_wait();
}
}
pub fn consumer_sequence(&mut self, consumer_id: &str) -> Option<Sequence> {
self.consumer_barrier.consumer_sequence(consumer_id)
}
pub fn get_consumer_count(&mut self) -> usize {
self.consumer_barrier.best_effort_consumer_count()
}
pub fn wait_until_consumed_with_strategy(
&mut self,
seq: Sequence,
timeout: Duration,
strategy: super::builder::AutoWaitStrategy,
) -> bool {
use std::time::Instant;
assert!(timeout >= Duration::ZERO, "timeout must be non-negative");
let deadline = Instant::now()
.checked_add(timeout)
.expect("timeout duration does not fit in Instant");
while Instant::now() < deadline {
if self.is_consumed(seq) {
return true;
}
Self::apply_wait_strategy(&strategy);
}
false
}
fn apply_wait_strategy(strategy: &super::builder::AutoWaitStrategy) {
use super::builder::AutoWaitStrategy as WS;
match strategy {
WS::BusySpin | WS::BusySpinWithSpinLoopHint => {
std::hint::spin_loop();
}
WS::SpinThenYield { spins } => {
for _ in 0..*spins {
std::hint::spin_loop();
}
std::thread::yield_now();
}
WS::Block => {
super::wait::perform_default_block_wait();
}
WS::Sleep(d) => {
super::wait::sleep_or_yield(*d);
}
}
}
}