1use crate::{SharedCursor, SharedRingBuffer};
8use disruptor_core::Sequence;
9use std::ops::Deref;
10use std::sync::atomic::Ordering;
11
/// Consumer side of a ring buffer that is coordinated through shared cursors.
///
/// Progress is tracked twice: locally in `last_processed_sequence`, and in the
/// `consumer_sequence` cursor, which receives a `Release` store every time an
/// event (or a whole batch) has been consumed.
pub struct SharedConsumer<E> {
    /// Storage the events are read from via `get(sequence)`.
    ring_buffer: SharedRingBuffer<E>,
    /// Cursor loaded with `Acquire` to learn the highest sequence that may be consumed.
    producer_sequence: SharedCursor,
    /// Cursor this consumer stores its last consumed sequence into.
    consumer_sequence: SharedCursor,
    /// Non-empty identifier supplied at construction; exposed by `consumer_id()`.
    consumer_id: String,
    /// Local copy of the last consumed sequence; initialised from `consumer_sequence`.
    last_processed_sequence: Sequence,
    /// Optional readiness cursor (attached under the name `"<base_name>_cr"`) that
    /// `signal_readiness` increments.
    consumers_ready: Option<SharedCursor>,
    /// Optional observability handles; every field is `None` until counters are attached.
    counters: ConsumerCounters,
}
29
/// Optional counter handles updated by `SharedConsumer`.
///
/// A `None` field means the counter is not registered and updates to it are skipped.
#[derive(Default, Debug)]
pub struct ConsumerCounters {
    /// Bumped once per consumed event (by the batch size when a batch is processed).
    pub events_consumed: Option<crate::observability::CounterHandle>,
    /// Bumped when a non-blocking consume attempt finds nothing available.
    pub consumer_empty_spins: Option<crate::observability::CounterHandle>,
    /// Fed through `record_max` with the distance between the newest published
    /// sequence and the sequence being consumed.
    pub consumer_lag_max: Option<crate::observability::CounterHandle>,
}
42
/// Chooses which consumer counters `attach_counters_selected` should register.
///
/// Each flag corresponds to the field of the same name in `ConsumerCounters`.
#[derive(Clone, Copy, Debug)]
pub struct ConsumerCounterSelection {
    /// Register the `events_consumed` counter.
    pub events_consumed: bool,
    /// Register the `consumer_empty_spins` counter.
    pub consumer_empty_spins: bool,
    /// Register the `consumer_lag_max` counter.
    pub consumer_lag_max: bool,
}

impl ConsumerCounterSelection {
    /// Preset with all three counters enabled.
    pub const FULL: Self = Self::with_optional_counters(true);

    /// Preset that keeps only `events_consumed` and disables the other two counters.
    pub const LITE: Self = Self::with_optional_counters(false);

    /// Builds a selection in which `events_consumed` is always on and the two
    /// remaining counters share the `enabled` setting.
    const fn with_optional_counters(enabled: bool) -> Self {
        Self {
            events_consumed: true,
            consumer_empty_spins: enabled,
            consumer_lag_max: enabled,
        }
    }
}
73
/// Borrowed view of one event that has been claimed but not yet marked as consumed.
///
/// The lease dereferences to the event. Its `Drop` implementation publishes
/// `sequence` as consumed on the owning consumer and increments `events_consumed`
/// when that counter is registered.
pub struct SharedConsumerLease<'a, E>
where
    E: Copy + Default,
{
    /// Consumer that issued the lease; held exclusively for the lease's lifetime.
    consumer: &'a mut SharedConsumer<E>,
    /// Sequence number of the leased event.
    sequence: Sequence,
    /// Raw pointer to the event slot, obtained from the ring buffer's `get(sequence)`.
    event_ptr: *const E,
}
82
83impl<E> SharedConsumerLease<'_, E>
84where
85 E: Copy + Default,
86{
87 pub fn sequence(&self) -> Sequence {
88 self.sequence
89 }
90}
91
92impl<E> Deref for SharedConsumerLease<'_, E>
93where
94 E: Copy + Default,
95{
96 type Target = E;
97
98 fn deref(&self) -> &Self::Target {
99 unsafe { &*self.event_ptr }
102 }
103}
104
105impl<E> Drop for SharedConsumerLease<'_, E>
106where
107 E: Copy + Default,
108{
109 fn drop(&mut self) {
110 self.consumer.publish_consumed_sequence(self.sequence);
111 if let Some(h) = &self.consumer.counters.events_consumed {
112 h.inc();
113 }
114 }
115}
116
117impl<E> SharedConsumer<E>
118where
119 E: Copy + Default,
120{
121 pub(crate) fn new_with_coordination(
122 ring_buffer: SharedRingBuffer<E>,
123 producer_sequence: SharedCursor,
124 consumer_sequence: SharedCursor,
125 consumer_id: String,
126 base_name: Option<String>,
127 ) -> Self {
128 assert!(!consumer_id.is_empty(), "consumer_id must not be empty");
129
130 let consumers_ready = base_name.as_ref().and_then(|name| {
132 let coordination_name = format!("{}_cr", name);
134 SharedCursor::attach(&coordination_name).ok()
135 });
136
137 let mut consumer = Self {
138 ring_buffer,
139 producer_sequence,
140 consumer_sequence,
141 consumer_id,
142 last_processed_sequence: -1,
143 consumers_ready,
144 counters: ConsumerCounters::default(),
145 };
146
147 consumer.last_processed_sequence = consumer.consumer_sequence.load(Ordering::Acquire);
148
149 consumer.signal_readiness();
151
152 consumer
153 }
154
155 pub fn signal_readiness(&self) {
158 if let Some(consumers_ready) = &self.consumers_ready {
159 consumers_ready.fetch_add(1, Ordering::AcqRel);
161 }
162 }
163
164 pub fn try_attach_coordination(&mut self, base_name: &str) -> bool {
166 assert!(!base_name.is_empty(), "base_name must not be empty");
167
168 if self.consumers_ready.is_some() {
169 return true; }
171
172 let coordination_name = format!("{}_cr", base_name);
174 if let Ok(cursor) = SharedCursor::attach(&coordination_name) {
175 self.consumers_ready = Some(cursor);
176 self.signal_readiness(); return true;
178 }
179 false
180 }
181
182 pub fn has_coordination_support(&self) -> bool {
184 self.consumers_ready.is_some()
185 }
186
187 pub fn attach_counters(&mut self, file: &crate::observability::CountersFile) {
192 self.attach_counters_selected(file, ConsumerCounterSelection::FULL);
193 }
194
195 pub fn attach_counters_selected(
197 &mut self,
198 file: &crate::observability::CountersFile,
199 selection: ConsumerCounterSelection,
200 ) {
201 use crate::observability::{ids, COUNTER_FLAG_CONSUMER};
202 self.counters.events_consumed = if selection.events_consumed {
203 file.register(
204 ids::EVENTS_CONSUMED,
205 COUNTER_FLAG_CONSUMER,
206 "events_consumed",
207 )
208 } else {
209 None
210 };
211 self.counters.consumer_empty_spins = if selection.consumer_empty_spins {
212 file.register(
213 ids::CONSUMER_EMPTY_SPINS,
214 COUNTER_FLAG_CONSUMER,
215 "consumer_empty_spins",
216 )
217 } else {
218 None
219 };
220 self.counters.consumer_lag_max = if selection.consumer_lag_max {
221 file.register(
222 ids::CONSUMER_LAG_MAX,
223 COUNTER_FLAG_CONSUMER,
224 "consumer_lag_max",
225 )
226 } else {
227 None
228 };
229 }
230
231 pub fn counters(&self) -> &ConsumerCounters {
233 &self.counters
234 }
235
/// Records one consume-latency sample, given in nanoseconds.
///
/// With the `metrics` feature enabled the value is recorded into the
/// `disruptor_mp_consume_latency_ns` histogram; without it the call does nothing.
#[inline]
pub fn record_consume_latency_ns(&self, ns: u64) {
    #[cfg(feature = "metrics")]
    metrics::histogram!("disruptor_mp_consume_latency_ns").record(ns as f64);
    // Keeps `ns` used so builds without the feature do not warn about it.
    #[cfg(not(feature = "metrics"))]
    let _ = ns;
}
254
255 pub fn try_consume_next(&mut self) -> Option<(Sequence, E)> {
258 let Some((next_sequence, upper)) = self.available_batch_bounds() else {
261 if let Some(h) = &self.counters.consumer_empty_spins {
262 h.inc();
263 }
264 return None;
265 };
266
267 let event_ptr = self.ring_buffer.get(next_sequence);
268 let event = unsafe { *event_ptr }; self.publish_consumed_sequence(next_sequence);
271 if let Some(h) = &self.counters.events_consumed {
272 h.inc();
273 }
274 if let Some(h) = &self.counters.consumer_lag_max {
275 let lag = (upper - next_sequence).max(0) as u64;
277 h.record_max(lag);
278 }
279 Some((next_sequence, event))
280 }
281
282 pub fn try_consume_next_leased(&mut self) -> Option<SharedConsumerLease<'_, E>> {
287 let Some((next_sequence, upper)) = self.available_batch_bounds() else {
288 if let Some(h) = &self.counters.consumer_empty_spins {
289 h.inc();
290 }
291 return None;
292 };
293 if let Some(h) = &self.counters.consumer_lag_max {
294 let lag = (upper - next_sequence).max(0) as u64;
295 h.record_max(lag);
296 }
297 let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
298 Some(SharedConsumerLease {
302 consumer: self,
303 sequence: next_sequence,
304 event_ptr,
305 })
306 }
307
308 #[inline]
309 fn available_batch_bounds(&self) -> Option<(Sequence, Sequence)> {
310 assert!(
311 self.last_processed_sequence >= -1,
312 "consumer sequence must not be lower than -1"
313 );
314
315 let producer_seq = self.producer_sequence.load(Ordering::Acquire);
316 let next_sequence = self.last_processed_sequence + 1;
317
318 if next_sequence > producer_seq {
319 return None;
320 }
321
322 Some((next_sequence, producer_seq))
323 }
324
325 #[inline]
326 fn publish_consumed_sequence(&mut self, sequence: Sequence) {
327 self.consumer_sequence.store(sequence, Ordering::Release);
332 self.last_processed_sequence = sequence;
333 }
334
335 #[inline]
336 fn is_end_of_batch(&self) -> bool {
337 self.last_processed_sequence >= self.producer_sequence.load(Ordering::Acquire)
338 }
339
340 #[inline]
341 fn process_snapshot_batch<F>(
342 &mut self,
343 lower: Sequence,
344 upper: Sequence,
345 processor: &mut F,
346 ) -> usize
347 where
348 F: FnMut(&E, Sequence),
349 {
350 let mut processed = 0usize;
351 if let Some(h) = &self.counters.consumer_lag_max {
352 let lag = (upper - lower).max(0) as u64;
353 h.record_max(lag);
354 }
355
356 for sequence in lower..=upper {
357 let event_ptr = self.ring_buffer.get(sequence);
358 let event = unsafe { &*event_ptr };
359 processor(event, sequence);
360 processed += 1;
361 }
362
363 self.publish_consumed_sequence(upper);
364 if let Some(h) = &self.counters.events_consumed {
365 h.add(processed as u64);
366 }
367 processed
368 }
369
370 #[inline]
371 fn process_snapshot_batch_with_eob<F>(
372 &mut self,
373 lower: Sequence,
374 upper: Sequence,
375 processor: &mut F,
376 ) -> usize
377 where
378 F: FnMut(&E, Sequence, bool),
379 {
380 let mut processed = 0usize;
381 if let Some(h) = &self.counters.consumer_lag_max {
382 let lag = (upper - lower).max(0) as u64;
383 h.record_max(lag);
384 }
385
386 for sequence in lower..=upper {
387 let event_ptr = self.ring_buffer.get(sequence);
388 let event = unsafe { &*event_ptr };
389 let end_of_batch = sequence == upper;
390 processor(event, sequence, end_of_batch);
391 processed += 1;
392 }
393
394 self.publish_consumed_sequence(upper);
395 if let Some(h) = &self.counters.events_consumed {
396 h.add(processed as u64);
397 }
398 processed
399 }
400
401 pub fn consume_next(&mut self) -> (Sequence, E) {
404 loop {
405 if let Some((seq, event)) = self.try_consume_next() {
406 return (seq, event);
407 }
408 #[cfg(dst)]
409 if crate::dst::buggify(file!(), line!()) {
410 std::thread::yield_now();
411 }
412 std::hint::spin_loop();
415 }
416 }
417
418 pub fn consume_next_leased(&mut self) -> SharedConsumerLease<'_, E> {
420 loop {
421 if let Some((next_sequence, _)) = self.available_batch_bounds() {
422 let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
423 return SharedConsumerLease {
424 consumer: self,
425 sequence: next_sequence,
426 event_ptr,
427 };
428 }
429 std::hint::spin_loop();
430 }
431 }
432
433 pub fn consume_next_with_sleep(&mut self) -> (Sequence, E) {
437 loop {
438 if let Some((seq, event)) = self.try_consume_next() {
439 return (seq, event);
440 }
441 super::wait::perform_default_consume_sleep_wait();
444 }
445 }
446
447 pub fn consume_next_leased_with_sleep(&mut self) -> SharedConsumerLease<'_, E> {
449 loop {
450 if let Some((next_sequence, _)) = self.available_batch_bounds() {
451 let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
452 return SharedConsumerLease {
453 consumer: self,
454 sequence: next_sequence,
455 event_ptr,
456 };
457 }
458 super::wait::perform_default_consume_sleep_wait();
459 }
460 }
461
462 pub fn process_next_blocking<F>(&mut self, mut processor: F) -> (Sequence, bool)
467 where
468 F: FnMut(&E, Sequence, bool),
469 {
470 let (sequence, event) = self.consume_next();
472
473 let end_of_batch = self.is_end_of_batch();
475
476 processor(&event, sequence, end_of_batch);
478
479 (sequence, end_of_batch)
480 }
481
482 pub fn process_next_blocking_with_sleep<F>(&mut self, mut processor: F) -> (Sequence, bool)
487 where
488 F: FnMut(&E, Sequence, bool),
489 {
490 let (sequence, event) = self.consume_next_with_sleep();
492
493 let end_of_batch = self.is_end_of_batch();
495
496 processor(&event, sequence, end_of_batch);
498
499 (sequence, end_of_batch)
500 }
501
502 pub fn process_available_blocking<F>(&mut self, mut processor: F) -> usize
507 where
508 F: FnMut(&E, Sequence, bool),
509 {
510 let mut processed = 0usize;
511
512 loop {
513 if let Some((lower, upper)) = self.available_batch_bounds() {
514 processed += self.process_snapshot_batch_with_eob(lower, upper, &mut processor);
515 break;
516 }
517
518 std::hint::spin_loop();
520 }
521
522 while let Some((lower, upper)) = self.available_batch_bounds() {
523 processed += self.process_snapshot_batch_with_eob(lower, upper, &mut processor);
524 }
525
526 processed
527 }
528
529 pub fn process_available<F>(&mut self, mut processor: F) -> usize
532 where
533 F: FnMut(&E, Sequence),
534 {
535 #[cfg(dst)]
536 if crate::dst::buggify(file!(), line!()) {
537 return 0;
538 }
539
540 let mut processed = 0usize;
541 let mut observed_batch = false;
542
543 while let Some((lower, upper)) = self.available_batch_bounds() {
544 observed_batch = true;
545 processed += self.process_snapshot_batch(lower, upper, &mut processor);
546 }
547
548 if !observed_batch {
549 if let Some(h) = &self.counters.consumer_empty_spins {
550 h.inc();
551 }
552 }
553
554 processed
555 }
556
557 pub fn current_sequence(&self) -> Sequence {
559 self.last_processed_sequence
560 }
561
562 pub fn producer_sequence(&self) -> Sequence {
564 self.producer_sequence.load(Ordering::Acquire)
566 }
567
568 pub fn consumer_sequence(&self) -> Sequence {
570 self.consumer_sequence.load(Ordering::Acquire)
572 }
573
574 pub fn debug_sequences(&self) -> (Sequence, Sequence, Sequence) {
576 let producer_seq = self.producer_sequence.load(Ordering::Acquire);
577 let consumer_seq = self.consumer_sequence.load(Ordering::Acquire);
578 (self.last_processed_sequence, producer_seq, consumer_seq)
579 }
580
581 pub fn consumer_id(&self) -> &str {
583 &self.consumer_id
584 }
585}
586
587