Skip to main content

disruptor_mp/backend/mmap/
consumer.rs

1//! Mmap-backed consumer implementation.
2
3use crate::{MmapCursor, MmapRingBuffer, MmapTransportLayout, MultiProcessResult};
4use disruptor_core::Sequence;
5use std::ops::Deref;
6use std::sync::atomic::Ordering;
7
/// Consumer for the mmap transport.
///
/// Holds the ring buffer that events are read from, the producer cursor that
/// bounds what may be read, and this consumer's own progress cursor.
pub struct MmapConsumer<E> {
    /// Ring buffer that events are read from.
    ring_buffer: MmapRingBuffer<E>,
    /// Producer cursor; loaded to find the highest sequence available to read.
    producer_sequence: MmapCursor,
    /// This consumer's cursor; each finished sequence is stored here.
    consumer_sequence: MmapCursor,
    /// Identifier supplied to `attach`, returned by `consumer_id`.
    consumer_id: String,
    /// Local copy of the last value stored to `consumer_sequence`.
    last_processed_sequence: Sequence,
    /// Cursor incremented by `signal_readiness`; `None` when it could not be
    /// attached.
    readiness_cursor: Option<MmapCursor>,
}
17
18pub struct MmapConsumerLease<'a, E>
19where
20    E: Copy + Default,
21{
22    consumer: &'a mut MmapConsumer<E>,
23    sequence: Sequence,
24    event_ptr: *const E,
25}
26
27impl<E> MmapConsumerLease<'_, E>
28where
29    E: Copy + Default,
30{
31    pub fn sequence(&self) -> Sequence {
32        self.sequence
33    }
34}
35
36impl<E> Deref for MmapConsumerLease<'_, E>
37where
38    E: Copy + Default,
39{
40    type Target = E;
41
42    fn deref(&self) -> &Self::Target {
43        // Safety: the lease keeps the consumer sequence unpublished until drop,
44        // so the producer cannot reuse the backing ring slot.
45        unsafe { &*self.event_ptr }
46    }
47}
48
49impl<E> Drop for MmapConsumerLease<'_, E>
50where
51    E: Copy + Default,
52{
53    fn drop(&mut self) {
54        self.consumer
55            .consumer_sequence
56            .store(self.sequence, Ordering::Release);
57        self.consumer.last_processed_sequence = self.sequence;
58    }
59}
60
61impl<E> MmapConsumer<E>
62where
63    E: Copy + Default,
64{
65    #[inline]
66    fn available_batch_bounds(&self) -> Option<(Sequence, Sequence)> {
67        let producer_sequence = self.producer_sequence.load(Ordering::Acquire);
68        let next_sequence = self.last_processed_sequence + 1;
69        if next_sequence > producer_sequence {
70            return None;
71        }
72        Some((next_sequence, producer_sequence))
73    }
74
75    #[inline]
76    fn publish_consumed_sequence(&mut self, sequence: Sequence) {
77        self.consumer_sequence.store(sequence, Ordering::Release);
78        self.last_processed_sequence = sequence;
79    }
80
81    #[inline]
82    fn process_snapshot_batch<F>(
83        &mut self,
84        lower: Sequence,
85        upper: Sequence,
86        processor: &mut F,
87    ) -> usize
88    where
89        F: FnMut(&E, Sequence),
90    {
91        let mut processed = 0usize;
92        for sequence in lower..=upper {
93            let event_ptr = self.ring_buffer.get(sequence);
94            let event = unsafe { &*event_ptr };
95            processor(event, sequence);
96            processed += 1;
97        }
98        self.publish_consumed_sequence(upper);
99        processed
100    }
101
102    /// Attach a consumer to an existing mmap transport.
103    pub fn attach(
104        layout: MmapTransportLayout,
105        buffer_size: usize,
106        consumer_id: &str,
107    ) -> MultiProcessResult<Self> {
108        let ring_buffer = MmapRingBuffer::attach(layout.ring_config(
109            buffer_size,
110            std::mem::size_of::<E>(),
111            false,
112        ))?;
113        let producer_sequence = MmapCursor::attach(layout.producer_cursor_config(false))?;
114        let consumer_sequence =
115            MmapCursor::new_or_attach(layout.consumer_cursor_config(consumer_id, true)?, -1)?;
116        let is_new_consumer = consumer_sequence.is_owner();
117        let readiness_cursor = MmapCursor::attach(layout.readiness_cursor_config(false)).ok();
118
119        let mut consumer = Self {
120            ring_buffer,
121            producer_sequence,
122            consumer_sequence,
123            consumer_id: consumer_id.to_string(),
124            last_processed_sequence: -1,
125            readiness_cursor,
126        };
127
128        consumer.last_processed_sequence = consumer.consumer_sequence.load(Ordering::Acquire);
129        if is_new_consumer {
130            consumer.signal_readiness();
131        }
132        Ok(consumer)
133    }
134
135    /// Signal that this consumer is ready.
136    pub fn signal_readiness(&self) {
137        if let Some(readiness_cursor) = &self.readiness_cursor {
138            readiness_cursor.fetch_add(1, Ordering::AcqRel);
139        }
140    }
141
142    /// Return whether this consumer has a readiness cursor attached.
143    pub fn has_coordination_support(&self) -> bool {
144        self.readiness_cursor.is_some()
145    }
146
147    /// Try to consume the next available event.
148    pub fn try_consume_next(&mut self) -> Option<(Sequence, E)> {
149        let (next_sequence, _upper) = self.available_batch_bounds()?;
150
151        let event_ptr = self.ring_buffer.get(next_sequence);
152        let event = unsafe { *event_ptr };
153
154        self.publish_consumed_sequence(next_sequence);
155        Some((next_sequence, event))
156    }
157
158    /// Try to lease the next available event without copying it out of the ring slot.
159    pub fn try_consume_next_leased(&mut self) -> Option<MmapConsumerLease<'_, E>> {
160        let (next_sequence, _upper) = self.available_batch_bounds()?;
161
162        let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
163        Some(MmapConsumerLease {
164            consumer: self,
165            sequence: next_sequence,
166            event_ptr,
167        })
168    }
169
170    /// Process all currently available events.
171    pub fn process_available<F>(&mut self, mut processor: F) -> usize
172    where
173        F: FnMut(&E, Sequence),
174    {
175        let Some((lower, upper)) = self.available_batch_bounds() else {
176            return 0;
177        };
178        self.process_snapshot_batch(lower, upper, &mut processor)
179    }
180
181    /// Block until one event is available, then consume it.
182    pub fn consume_next(&mut self) -> (Sequence, E) {
183        loop {
184            if let Some(result) = self.try_consume_next() {
185                return result;
186            }
187            std::hint::spin_loop();
188        }
189    }
190
191    /// Block until one event is available, then consume it with the configured sleep policy.
192    pub fn consume_next_with_sleep(&mut self) -> (Sequence, E) {
193        loop {
194            if let Some(result) = self.try_consume_next() {
195                return result;
196            }
197            crate::perform_default_consume_sleep_wait();
198        }
199    }
200
201    /// Block until one event is available, then lease it.
202    pub fn consume_next_leased(&mut self) -> MmapConsumerLease<'_, E> {
203        loop {
204            if let Some((next_sequence, _upper)) = self.available_batch_bounds() {
205                let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
206                return MmapConsumerLease {
207                    consumer: self,
208                    sequence: next_sequence,
209                    event_ptr,
210                };
211            }
212            std::hint::spin_loop();
213        }
214    }
215
216    /// Block until one event is available, then lease it with the configured sleep policy.
217    pub fn consume_next_leased_with_sleep(&mut self) -> MmapConsumerLease<'_, E> {
218        loop {
219            if let Some((next_sequence, _upper)) = self.available_batch_bounds() {
220                let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
221                return MmapConsumerLease {
222                    consumer: self,
223                    sequence: next_sequence,
224                    event_ptr,
225                };
226            }
227            crate::perform_default_consume_sleep_wait();
228        }
229    }
230
231    /// Return the last processed sequence.
232    pub fn current_sequence(&self) -> Sequence {
233        self.last_processed_sequence
234    }
235
236    /// Return the logical consumer id.
237    pub fn consumer_id(&self) -> &str {
238        &self.consumer_id
239    }
240}
241
#[cfg(test)]
mod tests {
    use super::*;
    use crate::MmapProducer;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Small `Copy` payload used to exercise the consumer.
    #[derive(Debug, Copy, Clone, Default, PartialEq)]
    struct TestEvent {
        sequence: i64,
        data: i64,
    }

    /// Build a layout rooted in a temp directory whose name combines `prefix`,
    /// the process id and the current time in nanoseconds.
    fn unique_layout(prefix: &str) -> MmapTransportLayout {
        let pid = std::process::id();
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time should be valid");
        let nanos = since_epoch.as_nanos();
        let dir_name = format!("{prefix}_{pid}_{nanos}");
        let root = std::env::temp_dir().join(dir_name);
        MmapTransportLayout::new(root, "queue01").unwrap()
    }

    /// The callback must see references into the ring itself, not copies.
    #[test]
    fn process_available_borrows_ring_slots_instead_of_stack_copies() {
        let layout = unique_layout("mmap_process_available_ref");
        let mut producer =
            MmapProducer::<TestEvent>::create(layout.clone(), 8, TestEvent::default).unwrap();
        let mut consumer = MmapConsumer::<TestEvent>::attach(layout.clone(), 8, "c0001").unwrap();

        for i in 0..4 {
            producer.publish(|slot| {
                slot.sequence = i;
                slot.data = i * 10;
            });
        }

        // Addresses of the ring slots the four published events live in.
        let mut slot_addrs: Vec<usize> = Vec::new();
        for seq in 0..4 {
            slot_addrs.push(consumer.ring_buffer.get(seq) as *const TestEvent as usize);
        }

        let mut callback_addrs = Vec::new();
        let handled = consumer.process_available(|event, _seq| {
            callback_addrs.push(event as *const TestEvent as usize);
        });

        assert_eq!(handled, 4);
        assert_eq!(callback_addrs, slot_addrs);

        let _ = std::fs::remove_dir_all(layout.root_dir());
    }

    /// The consumer cursor must stay put during the batch and move only after it.
    #[test]
    fn process_available_publishes_consumer_sequence_after_batch_completion() {
        let layout = unique_layout("mmap_process_available_cursor");
        let mut producer =
            MmapProducer::<TestEvent>::create(layout.clone(), 8, TestEvent::default).unwrap();
        let mut consumer = MmapConsumer::<TestEvent>::attach(layout.clone(), 8, "c0001").unwrap();

        for i in 0..6 {
            producer.publish(|slot| {
                slot.sequence = i;
                slot.data = i * 100;
            });
        }

        let cursor_before = producer
            .consumer_sequence("c0001")
            .expect("consumer cursor should be discoverable");
        let mut records = Vec::new();
        let handled = consumer.process_available(|event, seq| {
            let cursor_during = producer
                .consumer_sequence("c0001")
                .expect("consumer cursor should stay readable during callback");
            assert_eq!(cursor_during, cursor_before);
            records.push((seq, event.sequence, event.data));
        });

        assert_eq!(handled, 6);
        assert_eq!(records.len(), 6);
        let last_sequence = (handled - 1) as i64;
        assert_eq!(producer.consumer_sequence("c0001"), Some(last_sequence));
        assert_eq!(consumer.current_sequence(), last_sequence);

        let _ = std::fs::remove_dir_all(layout.root_dir());
    }
}
328}