disruptor_mp/backend/mmap/
consumer.rs1use crate::{MmapCursor, MmapRingBuffer, MmapTransportLayout, MultiProcessResult};
4use disruptor_core::Sequence;
5use std::ops::Deref;
6use std::sync::atomic::Ordering;
7
8pub struct MmapConsumer<E> {
10 ring_buffer: MmapRingBuffer<E>,
11 producer_sequence: MmapCursor,
12 consumer_sequence: MmapCursor,
13 consumer_id: String,
14 last_processed_sequence: Sequence,
15 readiness_cursor: Option<MmapCursor>,
16}
17
18pub struct MmapConsumerLease<'a, E>
19where
20 E: Copy + Default,
21{
22 consumer: &'a mut MmapConsumer<E>,
23 sequence: Sequence,
24 event_ptr: *const E,
25}
26
27impl<E> MmapConsumerLease<'_, E>
28where
29 E: Copy + Default,
30{
31 pub fn sequence(&self) -> Sequence {
32 self.sequence
33 }
34}
35
36impl<E> Deref for MmapConsumerLease<'_, E>
37where
38 E: Copy + Default,
39{
40 type Target = E;
41
42 fn deref(&self) -> &Self::Target {
43 unsafe { &*self.event_ptr }
46 }
47}
48
49impl<E> Drop for MmapConsumerLease<'_, E>
50where
51 E: Copy + Default,
52{
53 fn drop(&mut self) {
54 self.consumer
55 .consumer_sequence
56 .store(self.sequence, Ordering::Release);
57 self.consumer.last_processed_sequence = self.sequence;
58 }
59}
60
61impl<E> MmapConsumer<E>
62where
63 E: Copy + Default,
64{
65 #[inline]
66 fn available_batch_bounds(&self) -> Option<(Sequence, Sequence)> {
67 let producer_sequence = self.producer_sequence.load(Ordering::Acquire);
68 let next_sequence = self.last_processed_sequence + 1;
69 if next_sequence > producer_sequence {
70 return None;
71 }
72 Some((next_sequence, producer_sequence))
73 }
74
75 #[inline]
76 fn publish_consumed_sequence(&mut self, sequence: Sequence) {
77 self.consumer_sequence.store(sequence, Ordering::Release);
78 self.last_processed_sequence = sequence;
79 }
80
81 #[inline]
82 fn process_snapshot_batch<F>(
83 &mut self,
84 lower: Sequence,
85 upper: Sequence,
86 processor: &mut F,
87 ) -> usize
88 where
89 F: FnMut(&E, Sequence),
90 {
91 let mut processed = 0usize;
92 for sequence in lower..=upper {
93 let event_ptr = self.ring_buffer.get(sequence);
94 let event = unsafe { &*event_ptr };
95 processor(event, sequence);
96 processed += 1;
97 }
98 self.publish_consumed_sequence(upper);
99 processed
100 }
101
102 pub fn attach(
104 layout: MmapTransportLayout,
105 buffer_size: usize,
106 consumer_id: &str,
107 ) -> MultiProcessResult<Self> {
108 let ring_buffer = MmapRingBuffer::attach(layout.ring_config(
109 buffer_size,
110 std::mem::size_of::<E>(),
111 false,
112 ))?;
113 let producer_sequence = MmapCursor::attach(layout.producer_cursor_config(false))?;
114 let consumer_sequence =
115 MmapCursor::new_or_attach(layout.consumer_cursor_config(consumer_id, true)?, -1)?;
116 let is_new_consumer = consumer_sequence.is_owner();
117 let readiness_cursor = MmapCursor::attach(layout.readiness_cursor_config(false)).ok();
118
119 let mut consumer = Self {
120 ring_buffer,
121 producer_sequence,
122 consumer_sequence,
123 consumer_id: consumer_id.to_string(),
124 last_processed_sequence: -1,
125 readiness_cursor,
126 };
127
128 consumer.last_processed_sequence = consumer.consumer_sequence.load(Ordering::Acquire);
129 if is_new_consumer {
130 consumer.signal_readiness();
131 }
132 Ok(consumer)
133 }
134
135 pub fn signal_readiness(&self) {
137 if let Some(readiness_cursor) = &self.readiness_cursor {
138 readiness_cursor.fetch_add(1, Ordering::AcqRel);
139 }
140 }
141
142 pub fn has_coordination_support(&self) -> bool {
144 self.readiness_cursor.is_some()
145 }
146
147 pub fn try_consume_next(&mut self) -> Option<(Sequence, E)> {
149 let (next_sequence, _upper) = self.available_batch_bounds()?;
150
151 let event_ptr = self.ring_buffer.get(next_sequence);
152 let event = unsafe { *event_ptr };
153
154 self.publish_consumed_sequence(next_sequence);
155 Some((next_sequence, event))
156 }
157
158 pub fn try_consume_next_leased(&mut self) -> Option<MmapConsumerLease<'_, E>> {
160 let (next_sequence, _upper) = self.available_batch_bounds()?;
161
162 let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
163 Some(MmapConsumerLease {
164 consumer: self,
165 sequence: next_sequence,
166 event_ptr,
167 })
168 }
169
170 pub fn process_available<F>(&mut self, mut processor: F) -> usize
172 where
173 F: FnMut(&E, Sequence),
174 {
175 let Some((lower, upper)) = self.available_batch_bounds() else {
176 return 0;
177 };
178 self.process_snapshot_batch(lower, upper, &mut processor)
179 }
180
181 pub fn consume_next(&mut self) -> (Sequence, E) {
183 loop {
184 if let Some(result) = self.try_consume_next() {
185 return result;
186 }
187 std::hint::spin_loop();
188 }
189 }
190
191 pub fn consume_next_with_sleep(&mut self) -> (Sequence, E) {
193 loop {
194 if let Some(result) = self.try_consume_next() {
195 return result;
196 }
197 crate::perform_default_consume_sleep_wait();
198 }
199 }
200
201 pub fn consume_next_leased(&mut self) -> MmapConsumerLease<'_, E> {
203 loop {
204 if let Some((next_sequence, _upper)) = self.available_batch_bounds() {
205 let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
206 return MmapConsumerLease {
207 consumer: self,
208 sequence: next_sequence,
209 event_ptr,
210 };
211 }
212 std::hint::spin_loop();
213 }
214 }
215
216 pub fn consume_next_leased_with_sleep(&mut self) -> MmapConsumerLease<'_, E> {
218 loop {
219 if let Some((next_sequence, _upper)) = self.available_batch_bounds() {
220 let event_ptr = self.ring_buffer.get(next_sequence) as *const E;
221 return MmapConsumerLease {
222 consumer: self,
223 sequence: next_sequence,
224 event_ptr,
225 };
226 }
227 crate::perform_default_consume_sleep_wait();
228 }
229 }
230
231 pub fn current_sequence(&self) -> Sequence {
233 self.last_processed_sequence
234 }
235
236 pub fn consumer_id(&self) -> &str {
238 &self.consumer_id
239 }
240}
241
#[cfg(test)]
mod tests {
    use super::*;
    use crate::MmapProducer;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Consumer id shared by the tests below.
    const CONSUMER_ID: &str = "c0001";
    /// Ring capacity used by the tests below.
    const RING_SIZE: usize = 8;

    /// Small `Copy` payload used as the ring-buffer event type.
    #[derive(Debug, Copy, Clone, Default, PartialEq)]
    struct TestEvent {
        sequence: i64,
        data: i64,
    }

    /// Builds a layout rooted in a temp directory whose name combines
    /// `prefix`, the process id and the current time in nanoseconds.
    fn unique_layout(prefix: &str) -> MmapTransportLayout {
        let pid = std::process::id();
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time should be valid");
        let nanos = since_epoch.as_nanos();
        let root = std::env::temp_dir().join(format!("{prefix}_{pid}_{nanos}"));
        MmapTransportLayout::new(root, "queue01").unwrap()
    }

    /// The callback must receive references into the ring buffer itself, not
    /// references to copies.
    #[test]
    fn process_available_borrows_ring_slots_instead_of_stack_copies() {
        let layout = unique_layout("mmap_process_available_ref");
        let mut producer =
            MmapProducer::<TestEvent>::create(layout.clone(), RING_SIZE, TestEvent::default)
                .unwrap();
        let mut consumer =
            MmapConsumer::<TestEvent>::attach(layout.clone(), RING_SIZE, CONSUMER_ID).unwrap();

        for i in 0..4 {
            producer.publish(|slot| {
                slot.sequence = i;
                slot.data = i * 10;
            });
        }

        // Slot addresses as reported by the ring buffer for sequences 0..4.
        let mut slot_addresses: Vec<usize> = Vec::new();
        for seq in 0..4 {
            slot_addresses.push(consumer.ring_buffer.get(seq) as *const TestEvent as usize);
        }

        let mut callback_addresses = Vec::new();
        let handled = consumer.process_available(|event, _seq| {
            callback_addresses.push(event as *const TestEvent as usize);
        });

        assert_eq!(handled, 4);
        assert_eq!(callback_addresses, slot_addresses);

        let _ = std::fs::remove_dir_all(layout.root_dir());
    }

    /// The consumer cursor must stay at its initial value while the batch
    /// callback runs and only advance once the batch is finished.
    #[test]
    fn process_available_publishes_consumer_sequence_after_batch_completion() {
        let layout = unique_layout("mmap_process_available_cursor");
        let mut producer =
            MmapProducer::<TestEvent>::create(layout.clone(), RING_SIZE, TestEvent::default)
                .unwrap();
        let mut consumer =
            MmapConsumer::<TestEvent>::attach(layout.clone(), RING_SIZE, CONSUMER_ID).unwrap();

        for i in 0..6 {
            producer.publish(|slot| {
                slot.sequence = i;
                slot.data = i * 100;
            });
        }

        let cursor_before_batch = producer
            .consumer_sequence(CONSUMER_ID)
            .expect("consumer cursor should be discoverable");
        let mut observed_events = Vec::new();
        let handled = consumer.process_available(|event, seq| {
            let cursor_during_batch = producer
                .consumer_sequence(CONSUMER_ID)
                .expect("consumer cursor should stay readable during callback");
            assert_eq!(cursor_during_batch, cursor_before_batch);
            observed_events.push((seq, event.sequence, event.data));
        });

        assert_eq!(handled, 6);
        assert_eq!(observed_events.len(), 6);

        // Sequences start at 0, so the last consumed one is `handled - 1`.
        let last_sequence = (handled - 1) as i64;
        assert_eq!(producer.consumer_sequence(CONSUMER_ID), Some(last_sequence));
        assert_eq!(consumer.current_sequence(), last_sequence);

        let _ = std::fs::remove_dir_all(layout.root_dir());
    }
}