// ansa/handles.rs
1use crate::ringbuffer::RingBuffer;
2use crate::wait::{TryWaitStrategy, WaitStrategy};
3use std::collections::Bound;
4use std::ops::RangeBounds;
5use std::sync::atomic::{fence, AtomicI64, Ordering};
6use std::sync::Arc;
7
8/// A handle with mutable access to events on the ring buffer.
9///
10/// `MultiProducer`s can be cloned to enable distributed writes. Clones coordinate by claiming
/// non-overlapping ranges of sequence values, which can be written to in parallel.
12///
13/// Clones of this `MultiProducer` share this producer's cursor.
14///
15/// See: **\[INSERT LINK]** for alternative methods of parallelizing producers without using
16/// `MultiProducer`.
17///
18/// # Examples
19/// ```
20/// use ansa::{DisruptorBuilder, Follows, Handle};
21///
22/// let mut handles = DisruptorBuilder::new(64, || 0)
23/// .add_handle(0, Handle::Producer, Follows::LeadProducer)
24/// .build()?;
25///
26/// // lead and trailing are separate handles with separate cursors and clones
27/// let lead = handles.take_lead().unwrap().into_multi();
28/// let trailing = handles.take_producer(0).unwrap().into_multi();
29///
30/// let lead_clone = lead.clone();
31///
32/// assert_eq!(lead.count(), 2);
33/// assert_eq!(trailing.count(), 1);
34/// # Ok::<(), ansa::BuildError>(())
35/// ```
36///
37/// # Limitations
38///
39/// The `MultiProducer` API is quite limited in comparison to either [`Producers`](Producer) or
40/// [`Consumers`](Consumer). This is due to the mechanism used to coordinate `MultiProducer` clones.
41///
42/// TL;DR, the `MultiProducer` limitations are:
43/// 1) `wait_range` method cannot be implemented.
44/// 2) Separate `wait` and `apply` methods cannot be conveniently provided.
45/// 3) The `try_wait_apply` method will always move the cursor up by the requested batch size,
46/// even if an error occurs.
47///
/// As soon as a `MultiProducer` clone claims and waits for available sequences, it is committed to
49/// moving the shared cursor up to the end of the claimed batch, regardless of whether either
50/// waiting for or processing the batch is successful or not.
51///
52/// This commitment requires that batches always be completed (and the cursor moved), even for
53/// fallible methods where errors might occur. Importantly, this means that handles which follow a
54/// `MultiProducer` cannot assume, e.g., that an event was successfully written based only on its
55/// sequence becoming available.
56///
57/// Another consequence is that batches cannot be sized dynamically depending on what sequences are
58/// actually available, as the bounds of the claim must be used. Hence, `wait_range` cannot be
59/// implemented.
60///
61/// Lastly, separate `wait` methods cannot be provided, because [`EventsMut`] structs cannot be
62/// guaranteed to operate correctly in user code. If an `EventsMut` is dropped without an apply
63/// method being called, then the claim it was generated from will go unused, ultimately causing
64/// permanent disruptor stalls. Instead, combined wait and apply methods are provided.
#[derive(Debug)]
pub struct MultiProducer<E, W, const LEAD: bool> {
    // Per-clone handle state; cursor, barrier, and buffer are shared across clones via `Arc`s.
    inner: HandleInner<E, W, LEAD>,
    claim: Arc<Cursor>, // shared between all clones of this multi producer
}
70
71impl<E, W, const LEAD: bool> Clone for MultiProducer<E, W, LEAD>
72where
73 W: Clone,
74{
75 fn clone(&self) -> Self {
76 MultiProducer {
77 inner: HandleInner {
78 cursor: Arc::clone(&self.inner.cursor),
79 barrier: self.inner.barrier.clone(),
80 buffer: Arc::clone(&self.inner.buffer),
81 wait_strategy: self.inner.wait_strategy.clone(),
82 available: self.inner.available,
83 },
84 claim: Arc::clone(&self.claim),
85 }
86 }
87}
88
impl<E, W, const LEAD: bool> MultiProducer<E, W, LEAD> {
    /// Returns `true` if this producer is the lead producer.
    ///
    /// # Examples
    /// ```
    /// use ansa::*;
    ///
    /// let mut handles = DisruptorBuilder::new(64, || 0)
    ///     .add_handle(0, Handle::Producer, Follows::LeadProducer)
    ///     .build()?;
    ///
    /// let lead_multi = handles.take_lead().unwrap().into_multi();
    /// let follow_multi = handles.take_producer(0).unwrap().into_multi();
    ///
    /// assert_eq!(lead_multi.is_lead(), true);
    /// assert_eq!(follow_multi.is_lead(), false);
    /// # Ok::<(), BuildError>(())
    /// ```
    pub const fn is_lead(&self) -> bool {
        LEAD
    }

    /// Returns the count of [`MultiProducer`]s associated with this cursor.
    ///
    /// **Important:** Care should be taken when performing actions based upon this number, as any
    /// thread which holds an associated [`MultiProducer`] may clone it at any time, thereby
    /// changing the count.
    ///
    /// # Examples
    /// ```
    /// let (multi, _) = ansa::mpsc(64, || 0);
    /// assert_eq!(multi.count(), 1);
    ///
    /// let multi_2 = multi.clone();
    /// assert_eq!(multi.count(), 2);
    /// // consume a `MultiProducer` by attempting the conversion into a `Producer`
    /// assert!(matches!(multi.into_producer(), None));
    /// assert_eq!(multi_2.count(), 1);
    /// ```
    #[inline]
    pub fn count(&self) -> usize {
        // `claim` is shared by exactly the clones of this producer, so its strong
        // count equals the number of live clones.
        Arc::strong_count(&self.claim)
    }

    /// Returns the current sequence value for this producer's cursor.
    ///
    /// **Important:** The cursor for a `MultiProducer` is shared by all of its clones. Any of
    /// these clones could alter this value at any time, if they are writing to the buffer.
    ///
    /// # Examples
    /// ```
    /// let (mut multi, _) = ansa::mpsc(64, || 0);
    /// // sequences start at -1, but accesses always occur at the next sequence,
    /// // so the first accessed sequence will be 0
    /// assert_eq!(multi.sequence(), -1);
    ///
    /// // move the producer cursor up by 10
    /// multi.wait_for_each(10, |_, _, _| ());
    /// assert_eq!(multi.sequence(), 9);
    ///
    /// // clone and move only the clone
    /// let mut clone = multi.clone();
    /// clone.wait_for_each(10, |_, _, _| ());
    ///
    /// // Both have moved because all clones share the same cursor
    /// assert_eq!(multi.sequence(), 19);
    /// assert_eq!(clone.sequence(), 19);
    /// ```
    #[inline]
    pub fn sequence(&self) -> i64 {
        self.inner.cursor.sequence.load(Ordering::Relaxed)
    }

    /// Returns the size of the ring buffer.
    ///
    /// # Examples
    /// ```
    /// let (producer, _) = ansa::mpsc(64, || 0);
    /// assert_eq!(producer.buffer_size(), 64);
    /// ```
    #[inline]
    pub fn buffer_size(&self) -> usize {
        self.inner.buffer_size()
    }

    /// Set the wait strategy for this `MultiProducer` clone.
    ///
    /// Does not alter the wait strategy for any other handle, _including_ other clones of this
    /// `MultiProducer`.
    ///
    /// # Examples
    /// ```
    /// use ansa::MultiProducer;
    /// use ansa::wait::{WaitBusy, WaitPhased, WaitSleep};
    ///
    /// let (multi, _) = ansa::mpsc(64, || 0u8);
    /// let clone = multi.clone();
    /// // multi changes its strategy
    /// let _: MultiProducer<u8, WaitBusy, true> = multi.set_wait_strategy(WaitBusy);
    /// // clone keeps the original wait strategy
    /// let _: MultiProducer<u8, WaitPhased<WaitSleep>, true> = clone;
    /// ```
    #[inline]
    pub fn set_wait_strategy<W2>(self, wait_strategy: W2) -> MultiProducer<E, W2, LEAD> {
        MultiProducer {
            inner: self.inner.set_wait_strategy(wait_strategy),
            claim: self.claim,
        }
    }

    /// Return a [`Producer`] if exactly one `MultiProducer` exists for this cursor.
    ///
    /// Otherwise, return `None` and drops this producer.
    ///
    /// If this function is called when only one `MultiProducer` exists, then it is guaranteed to
    /// return a `Producer`.
    ///
    /// # Examples
    /// ```
    /// let (multi, _) = ansa::mpsc(64, || 0);
    /// let multi_clone = multi.clone();
    ///
    /// assert!(matches!(multi.into_producer(), None));
    /// assert!(matches!(multi_clone.into_producer(), Some(ansa::Producer { .. })));
    /// ```
    #[inline]
    pub fn into_producer(self) -> Option<Producer<E, W, LEAD>> {
        // `Arc::into_inner` succeeds only for the last clone, making this conversion
        // race-free: two clones cannot both obtain a `Producer`.
        Arc::into_inner(self.claim).map(|_| self.inner.into_producer())
    }

    /// Atomically claim the next `size` sequences for this clone.
    ///
    /// Returns `(claim_start, desired_seq)`, where `claim_start` is the sequence just before
    /// the claimed batch begins, and `desired_seq` is the barrier sequence which must be
    /// reached before the batch may be processed.
    #[inline]
    fn wait_bounds(&self, size: i64) -> (i64, i64) {
        // CAS loop: advance the shared claim by `size`, retrying with the freshly
        // observed value whenever another clone wins the race.
        let mut current_claim = self.claim.sequence.load(Ordering::Relaxed);
        let mut claim_end = current_claim + size;
        while let Err(new_current) = self.claim.sequence.compare_exchange(
            current_claim,
            claim_end,
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            current_claim = new_current;
            claim_end = new_current + size;
        }
        // The lead producer waits on sequences one buffer-length behind its claim end,
        // so that it cannot lap the slowest trailing handle.
        let desired_seq = if LEAD {
            claim_end - saturate_i64(self.inner.buffer.size())
        } else {
            claim_end
        };
        (current_claim, desired_seq)
    }

    /// Publish the batch ending at `end` by moving the shared cursor.
    ///
    /// Spins until the cursor reaches `start`, which serialises publication so that
    /// batches claimed by different clones are published in claim order.
    #[inline]
    fn update_cursor(&self, start: i64, end: i64) {
        while self.inner.cursor.sequence.load(Ordering::Acquire) != start {
            std::hint::spin_loop();
        }
        self.inner.cursor.sequence.store(end, Ordering::Release)
    }
}
248
impl<E, W, const LEAD: bool> MultiProducer<E, W, LEAD>
where
    W: WaitStrategy,
{
    /// Wait for and process a batch of exactly `size` number of events.
    ///
    /// The parameters of `f` are: a mutable reference to the event, its sequence, and a flag
    /// marking the last event of the batch.
    ///
    /// `size` values larger than the buffer will cause permanent stalls.
    #[inline]
    pub fn wait_for_each<F>(&mut self, size: usize, mut f: F)
    where
        F: FnMut(&mut E, i64, bool),
    {
        debug_assert!(size <= self.inner.buffer.size());
        // Claiming commits this clone to moving the shared cursor to the end of the
        // claim; `update_cursor` below must therefore always run.
        let (from_seq, till_seq) = self.wait_bounds(saturate_i64(size));
        // `available` caches the last barrier read; only wait when it falls short
        if self.inner.available < till_seq {
            self.inner.available = self.inner.wait_strategy.wait(till_seq, &self.inner.barrier);
        }
        debug_assert!(self.inner.available >= till_seq);
        fence(Ordering::Acquire);
        // SAFETY: Acquire-Release barrier ensures following handles cannot access this sequence
        // before this handle has finished interacting with it. Construction of the disruptor
        // guarantees producers don't overlap with other handles, thus no mutable aliasing.
        unsafe {
            self.inner.buffer.apply(from_seq + 1, size, |ptr, seq, end| {
                // SAFETY: deref guaranteed safe by ringbuffer initialisation
                let event: &mut E = &mut *ptr;
                f(event, seq, end)
            })
        };
        // Ordering::Release performed by this method
        self.update_cursor(from_seq, from_seq + saturate_i64(size))
    }
}
282
impl<E, W, const LEAD: bool> MultiProducer<E, W, LEAD>
where
    W: TryWaitStrategy,
{
    /// Wait for and process a batch of exactly `size` number of events.
    ///
    /// If waiting fails, returns the wait error, `W::Error`, which must be convertible to `Err`.
    ///
    /// If processing the batch fails, returns `Err`.
    ///
    /// Note: the shared cursor is advanced by `size` even when an error occurs, since a claim,
    /// once made, must always be honoured (see the `MultiProducer` docs).
    ///
    /// `size` values larger than the buffer will cause permanent stalls.
    #[inline]
    pub fn try_wait_for_each<F, Err>(&mut self, size: usize, mut f: F) -> Result<(), Err>
    where
        F: FnMut(&mut E, i64, bool) -> Result<(), Err>,
        Err: From<W::Error>,
    {
        debug_assert!(size <= self.inner.buffer.size());
        let (from_seq, till_seq) = self.wait_bounds(saturate_i64(size));
        // NOTE(review): this fence is issued before waiting, whereas `wait_for_each`
        // fences after the wait completes — confirm the placement is intentional.
        fence(Ordering::Acquire);
        if self.inner.available < till_seq {
            self.inner.available = self
                .inner
                .wait_strategy
                .try_wait(till_seq, &self.inner.barrier)
                // the claim must still be published on error, so move the cursor before returning
                .inspect_err(|_| self.update_cursor(from_seq, from_seq + saturate_i64(size)))?;
        }
        debug_assert!(self.inner.available >= till_seq);
        // SAFETY: Acquire-Release barrier ensures following handles cannot access this sequence
        // before this handle has finished interacting with it. Construction of the disruptor
        // guarantees producers don't overlap with other handles, thus no mutable aliasing.
        let result = unsafe {
            self.inner.buffer.try_apply(from_seq + 1, size, |ptr, seq, end| {
                // SAFETY: deref guaranteed safe by ringbuffer initialisation
                let event: &mut E = &mut *ptr;
                f(event, seq, end)
            })
        };
        // Ordering::Release performed by this method
        self.update_cursor(from_seq, from_seq + saturate_i64(size));
        result
    }
}
326
/// A handle with mutable access to events on the ring buffer.
///
/// Cannot access events concurrently with other handles.
#[derive(Debug)]
#[repr(transparent)]
pub struct Producer<E, W, const LEAD: bool> {
    // All handle state; `Producer` is a thin newtype over `HandleInner`.
    inner: HandleInner<E, W, LEAD>,
}
335
336impl<E, W, const LEAD: bool> Producer<E, W, LEAD> {
337 /// Returns `true` if this producer is the lead producer.
338 pub const fn is_lead(&self) -> bool {
339 LEAD
340 }
341
342 /// Returns the current sequence value for this producer's cursor.
343 ///
344 /// # Examples
345 /// ```
346 /// let (mut producer, _) = ansa::spsc(64, || 0);
347 /// // sequences start at -1, but accesses always occur at the next sequence,
348 /// // so the first accessed sequence will be 0
349 /// assert_eq!(producer.sequence(), -1);
350 ///
351 /// // move the producer up by 10
352 /// producer.wait(10).for_each(|_, _, _| ());
353 /// assert_eq!(producer.sequence(), 9);
354 /// ```
355 #[inline]
356 pub fn sequence(&self) -> i64 {
357 self.inner.cursor.sequence.load(Ordering::Relaxed)
358 }
359
360 /// Returns the size of the ring buffer.
361 ///
362 /// # Examples
363 /// ```
364 /// let (producer, _) = ansa::spsc(64, || 0);
365 /// assert_eq!(producer.buffer_size(), 64);
366 /// ```
367 #[inline]
368 pub fn buffer_size(&self) -> usize {
369 self.inner.buffer_size()
370 }
371
372 /// Returns a new handle with the given `wait_strategy`, consuming this handle.
373 ///
374 /// All other properties of the original handle remain unchanged in the new handle.
375 ///
376 /// Does not alter the wait strategy for any other handle.
377 ///
378 /// # Examples
379 /// ```
380 /// use ansa::Producer;
381 /// use ansa::wait::WaitBusy;
382 ///
383 /// let (producer, _) = ansa::spsc(64, || 0u8);
384 /// let _: Producer<u8, WaitBusy, true> = producer.set_wait_strategy(WaitBusy);
385 /// ```
386 #[inline]
387 pub fn set_wait_strategy<W2>(self, wait_strategy: W2) -> Producer<E, W2, LEAD> {
388 self.inner.set_wait_strategy(wait_strategy).into_producer()
389 }
390
391 /// Converts this `Producer` into a [`MultiProducer`].
392 ///
393 /// # Examples
394 /// ```
395 /// let (producer, _) = ansa::spsc(64, || 0);
396 ///
397 /// assert!(matches!(producer.into_multi(), ansa::MultiProducer { .. }))
398 /// ```
399 #[inline]
400 pub fn into_multi(self) -> MultiProducer<E, W, LEAD> {
401 let producer_seq = self.sequence();
402 MultiProducer {
403 inner: self.inner,
404 claim: Arc::new(Cursor::new(producer_seq)),
405 }
406 }
407}
408
409impl<E, W, const LEAD: bool> Producer<E, W, LEAD>
410where
411 W: WaitStrategy,
412{
413 /// Wait until exactly `size` number of events are available.
414 ///
415 /// `size` values larger than the buffer will cause permanent stalls.
416 #[inline]
417 pub fn wait(&mut self, size: usize) -> EventsMut<'_, E> {
418 EventsMut(self.inner.wait(size))
419 }
420
421 /// Wait until a number of events within the given range are available.
422 ///
423 /// If a lower bound is provided, it must be less than the buffer size.
424 ///
425 /// # Examples
426 ///
427 /// ```
428 /// let (mut producer, _) = ansa::spsc(64, || 0);
429 ///
430 /// producer.wait_range(1..=10); // waits for a batch of at most 10 events
431 /// producer.wait_range(10..); // waits for a batch of at least 10 events
432 /// producer.wait_range(1..); // waits for any number of events
433 /// ```
434 #[inline]
435 pub fn wait_range<R>(&mut self, range: R) -> EventsMut<'_, E>
436 where
437 R: RangeBounds<usize>,
438 {
439 EventsMut(self.inner.wait_range(range))
440 }
441}
442
443impl<E, W, const LEAD: bool> Producer<E, W, LEAD>
444where
445 W: TryWaitStrategy,
446{
447 /// Wait until exactly `size` number of events are available.
448 ///
449 /// Otherwise, return the wait strategy error.
450 ///
451 /// `size` values larger than the buffer will cause permanent stalls.
452 #[inline]
453 pub fn try_wait(&mut self, size: usize) -> Result<EventsMut<'_, E>, W::Error> {
454 self.inner.try_wait(size).map(EventsMut)
455 }
456
457 /// Wait until a number of events within the given range are available.
458 ///
459 /// If a lower bound is provided, it must be less than the buffer size.
460 ///
461 /// # Examples
462 ///
463 /// ```
464 /// use std::time::Duration;
465 /// use ansa::wait::{Timeout, WaitBusy};
466 ///
467 /// let (mut producer, _) = ansa::spsc(64, || 0);
468 /// let timeout = Timeout::new(Duration::from_millis(1), WaitBusy);
469 /// let mut producer = producer.set_wait_strategy(timeout);
470 ///
471 /// producer.try_wait_range(1..=10)?; // waits for a batch of at most 10 events
472 /// producer.try_wait_range(10..)?; // waits for a batch of at least 10 events
473 /// producer.try_wait_range(1..)?; // waits for any number of events
474 /// # Ok::<(), ansa::wait::TimedOut>(())
475 /// ```
476 #[inline]
477 pub fn try_wait_range<R>(&mut self, range: R) -> Result<EventsMut<'_, E>, W::Error>
478 where
479 R: RangeBounds<usize>,
480 {
481 self.inner.try_wait_range(range).map(EventsMut)
482 }
483}
484
/// A handle with immutable access to events on the ring buffer.
///
/// Can access events concurrently to other handles with immutable access.
#[derive(Debug)]
#[repr(transparent)]
pub struct Consumer<E, W> {
    // All handle state; `Consumer` is a thin newtype over a non-lead `HandleInner`.
    inner: HandleInner<E, W, false>,
}
493
494impl<E, W> Consumer<E, W> {
495 /// Returns the current sequence value for this consumer's cursor.
496 ///
497 /// # Examples
498 /// ```
499 /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
500 /// // sequences start at -1, but accesses always occur at the next sequence,
501 /// // so the first accessed sequence will be 0
502 /// assert_eq!(producer.sequence(), -1);
503 ///
504 /// // move the producer up by 10, otherwise the consumer will block the
505 /// // thread while waiting for available events
506 /// producer.wait(10).for_each(|_, _, _| ());
507 /// assert_eq!(producer.sequence(), 9);
508 ///
509 /// // now we can move the consumer
510 /// consumer.wait(5).for_each(|_, _, _| ());
511 /// assert_eq!(consumer.sequence(), 4);
512 /// ```
513 #[inline]
514 pub fn sequence(&self) -> i64 {
515 self.inner.cursor.sequence.load(Ordering::Relaxed)
516 }
517
518 /// Returns the size of the ring buffer.
519 ///
520 /// # Examples
521 /// ```
522 /// let (_, consumer) = ansa::spsc(64, || 0);
523 /// assert_eq!(consumer.buffer_size(), 64);
524 /// ```
525 #[inline]
526 pub fn buffer_size(&self) -> usize {
527 self.inner.buffer_size()
528 }
529
530 /// Returns a new handle with the given `wait_strategy`, consuming this handle.
531 ///
532 /// All other properties of the original handle remain unchanged in the new handle.
533 ///
534 /// Does not alter the wait strategy for any other handle.
535 ///
536 /// # Examples
537 /// ```
538 /// use ansa::Consumer;
539 /// use ansa::wait::WaitBusy;
540 ///
541 /// let (_, consumer) = ansa::spsc(64, || 0u8);
542 /// let _: Consumer<u8, WaitBusy> = consumer.set_wait_strategy(WaitBusy);
543 /// ```
544 #[inline]
545 pub fn set_wait_strategy<W2>(self, wait_strategy: W2) -> Consumer<E, W2> {
546 self.inner.set_wait_strategy(wait_strategy).into_consumer()
547 }
548}
549
impl<E, W> Consumer<E, W>
where
    W: WaitStrategy,
{
    /// Wait until exactly `size` number of events are available.
    ///
    /// `size` values larger than the buffer will cause permanent stalls.
    #[inline]
    pub fn wait(&mut self, size: usize) -> Events<'_, E> {
        Events(self.inner.wait(size))
    }

    /// Wait until a number of events within the given range are available.
    ///
    /// If a lower bound is provided, it must be less than the buffer size.
    ///
    /// Any range which starts from 0 or is unbounded enables non-blocking waits for
    /// [`well-behaved`] [`WaitStrategy`] implementations; such a wait returns immediately
    /// even when no events are available, as shown below.
    ///
    /// # Examples
    ///
    /// ```
    /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
    ///
    /// let events = consumer.wait_range(..); // wait for any number of events
    /// assert_eq!(events.size(), 0);
    ///
    /// // Whereas this would block the thread waiting for the producer to move.
    /// // consumer.wait_range(1..);
    ///
    /// // move the lead producer
    /// producer.wait(20).for_each(|_, _, _| ());
    ///
    /// let events = consumer.wait_range(..=10); // wait for a batch of at most 10 events
    /// assert_eq!(events.size(), 10);
    ///
    /// let events = consumer.wait_range(10..); // wait for a batch of at least 10 events
    /// assert_eq!(events.size(), 20);
    ///
    /// let events = consumer.wait_range(..);
    /// assert_eq!(events.size(), 20);
    /// ```
    #[inline]
    pub fn wait_range<R>(&mut self, range: R) -> Events<'_, E>
    where
        R: RangeBounds<usize>,
    {
        Events(self.inner.wait_range(range))
    }
}
601
602impl<E, W> Consumer<E, W>
603where
604 W: TryWaitStrategy,
605{
606 /// Wait until exactly `size` number of events are available.
607 ///
608 /// Otherwise, return the wait strategy error.
609 ///
610 /// `size` values larger than the buffer will cause permanent stalls.
611 ///
612 /// # Examples
613 ///
614 /// ```
615 /// use std::time::Duration;
616 /// use ansa::wait::{Timeout, WaitBusy};
617 ///
618 /// let (mut producer, consumer) = ansa::spsc(64, || 0u32);
619 /// let timeout = Timeout::new(Duration::from_millis(10), WaitBusy);
620 /// let mut consumer = consumer.set_wait_strategy(timeout);
621 ///
622 /// // Try to wait for 5 events, but timeout if they're not available
623 /// match consumer.try_wait(5) {
624 /// Ok(events) => {
625 /// // Events were available within the timeout
626 /// events.for_each(|event, seq, _| println!("Event {}: {}", seq, event));
627 /// }
628 /// Err(_timeout) => {
629 /// // Timeout occurred - no events were available in time
630 /// println!("No events available within timeout");
631 /// }
632 /// }
633 /// ```
634 #[inline]
635 pub fn try_wait(&mut self, size: usize) -> Result<Events<'_, E>, W::Error> {
636 self.inner.try_wait(size).map(Events)
637 }
638
639 /// Wait until a number of events within the given range are available.
640 ///
641 /// If a lower bound is provided, it must be less than the buffer size.
642 ///
643 /// # Examples
644 ///
645 /// ```
646 /// use std::time::Duration;
647 /// use ansa::wait::{Timeout, WaitBusy};
648 ///
649 /// let (mut producer, consumer) = ansa::spsc(64, || 0);
650 /// // move the lead producer
651 /// producer.wait(20).for_each(|_, _, _| ());
652 ///
653 /// let timeout = Timeout::new(Duration::from_millis(1), WaitBusy);
654 /// let mut consumer = consumer.set_wait_strategy(timeout);
655 ///
656 /// consumer.try_wait_range(1..=10)?; // waits for a batch of at most 10 events
657 /// consumer.try_wait_range(10..)?; // waits for a batch of at least 10 events
658 /// consumer.try_wait_range(1..)?; // waits for any number of events
659 /// # Ok::<(), ansa::wait::TimedOut>(())
660 /// ```
661 #[inline]
662 pub fn try_wait_range<R>(&mut self, range: R) -> Result<Events<'_, E>, W::Error>
663 where
664 R: RangeBounds<usize>,
665 {
666 self.inner.try_wait_range(range).map(Events)
667 }
668}
669
/// State common to every handle type (producers and consumers).
#[derive(Debug)]
pub(crate) struct HandleInner<E, W, const LEAD: bool> {
    /// This handle's published position in the sequence.
    pub(crate) cursor: Arc<Cursor>,
    /// Barrier this handle waits on before accessing sequences.
    pub(crate) barrier: Barrier,
    /// The shared ring buffer of events.
    pub(crate) buffer: Arc<RingBuffer<E>>,
    /// Strategy used while waiting for the barrier to advance.
    pub(crate) wait_strategy: W,
    /// Highest sequence known to be available; caches prior barrier reads to avoid re-waiting.
    pub(crate) available: i64,
}
678
// Only non-lead handles (LEAD = false) may become consumers.
impl<E, W> HandleInner<E, W, false> {
    /// Wraps this inner state as a `Consumer`.
    #[inline]
    pub(crate) fn into_consumer(self) -> Consumer<E, W> {
        Consumer { inner: self }
    }
}
685
impl<E, W, const LEAD: bool> HandleInner<E, W, LEAD> {
    /// Creates the inner state for a handle.
    ///
    /// The lead producer starts `available` one buffer-length behind the cursor start; see
    /// `wait_bounds`, which applies the same offset to barrier targets.
    pub(crate) fn new(
        cursor: Arc<Cursor>,
        barrier: Barrier,
        buffer: Arc<RingBuffer<E>>,
        wait_strategy: W,
    ) -> Self {
        let available = if LEAD {
            // buffer size can always be cast to i64, as allocations cannot be larger than i64::MAX
            CURSOR_START - (buffer.size() as i64)
        } else {
            CURSOR_START
        };
        HandleInner {
            cursor,
            barrier,
            buffer,
            wait_strategy,
            available,
        }
    }

    /// Wraps this inner state as a `Producer`.
    #[inline]
    pub(crate) fn into_producer(self) -> Producer<E, W, LEAD> {
        Producer { inner: self }
    }

    /// Returns the size of the ring buffer.
    #[inline]
    fn buffer_size(&self) -> usize {
        self.buffer.size()
    }

    /// Rebuilds this state with a different wait strategy; all other fields carry over.
    #[inline]
    fn set_wait_strategy<W2>(self, wait_strategy: W2) -> HandleInner<E, W2, LEAD> {
        HandleInner {
            cursor: self.cursor,
            barrier: self.barrier,
            buffer: self.buffer,
            wait_strategy,
            available: self.available,
        }
    }

    /// Returns `(from_sequence, till_sequence)` for a batch of `size`: the cursor's current
    /// position, and the barrier sequence which must be reached before the batch may be
    /// processed.
    ///
    /// For the lead producer, the barrier target sits one buffer-length behind the batch end,
    /// preventing it from lapping trailing handles.
    #[inline]
    fn wait_bounds(&self, size: i64) -> (i64, i64) {
        let from_sequence = self.cursor.sequence.load(Ordering::Relaxed);
        let batch_end = from_sequence + size;
        let till_sequence = if LEAD {
            batch_end - saturate_i64(self.buffer.size())
        } else {
            batch_end
        };
        (from_sequence, till_sequence)
    }

    /// Creates a batch of `batch_size` events beginning just after `from_seq`.
    #[inline]
    fn as_batch(&mut self, from_seq: i64, batch_size: usize) -> Batch<'_, E> {
        Batch {
            cursor: &mut self.cursor,
            buffer: &self.buffer,
            current: from_seq,
            size: batch_size,
        }
    }

    /// Returns the number of sequences actually available after `from_seq`, clamped to the
    /// range's end bound.
    #[inline]
    fn range_batch_size(&self, from_seq: i64, end_bound: Bound<&usize>) -> usize {
        let from = if LEAD {
            from_seq - saturate_i64(self.buffer.size())
        } else {
            from_seq
        };
        // NOTE(review): `unsigned_abs` assumes `self.available >= from`; were `available`
        // ever to trail `from`, the distance would be inverted rather than zero — confirm.
        let available_batch = (self.available - from).unsigned_abs() as usize;
        match end_bound {
            Bound::Included(b) => available_batch.min(*b),
            // an excluded end bound caps the batch at one less than the bound
            Bound::Excluded(b) => available_batch.min(b.saturating_sub(1)),
            Bound::Unbounded => available_batch,
        }
    }
}
766
impl<E, W, const LEAD: bool> HandleInner<E, W, LEAD>
where
    W: WaitStrategy,
{
    /// Block until exactly `size` sequences are available, returning them as a batch.
    #[inline]
    fn wait(&mut self, size: usize) -> Batch<'_, E> {
        debug_assert!(self.buffer.size() >= size);
        let (from_seq, till_seq) = self.wait_bounds(saturate_i64(size));
        // `available` caches the last barrier read; only wait when it falls short
        if self.available < till_seq {
            self.available = self.wait_strategy.wait(till_seq, &self.barrier);
        }
        debug_assert!(self.available >= till_seq);
        self.as_batch(from_seq, size)
    }

    // todo: check that range: 0.. always returns immediately if no sequence available
    /// Block until a number of sequences permitted by `range` are available, returning the
    /// largest batch allowed by the range's end bound.
    #[inline]
    fn wait_range<R>(&mut self, range: R) -> Batch<'_, E>
    where
        R: RangeBounds<usize>,
    {
        // minimum batch size implied by the range's start bound
        let batch_min = match range.start_bound() {
            Bound::Included(b) => *b,
            Bound::Excluded(b) => b.saturating_add(1),
            Bound::Unbounded => 0,
        };
        debug_assert!(self.buffer.size() >= batch_min);
        let (from_seq, till_seq) = self.wait_bounds(saturate_i64(batch_min));
        // NOTE(review): `.max(1)` forces a barrier read even when `batch_min` is already
        // satisfied, presumably to refresh `available` for dynamic batch sizing — confirm.
        if self.available < till_seq.max(1) {
            self.available = self.wait_strategy.wait(till_seq, &self.barrier);
        }
        debug_assert!(self.available >= till_seq);
        // size the batch by what is actually available, clamped to the range's end bound
        let batch_max = self.range_batch_size(from_seq, range.end_bound());
        self.as_batch(from_seq, batch_max)
    }
}
803
impl<E, W, const LEAD: bool> HandleInner<E, W, LEAD>
where
    W: TryWaitStrategy,
{
    /// Fallible counterpart of `wait`: returns the wait strategy's error instead of
    /// waiting indefinitely.
    #[inline]
    fn try_wait(&mut self, size: usize) -> Result<Batch<'_, E>, W::Error> {
        debug_assert!(self.buffer.size() >= size);
        let (from_seq, till_seq) = self.wait_bounds(saturate_i64(size));
        // `available` caches the last barrier read; only wait when it falls short
        if self.available < till_seq {
            self.available = self.wait_strategy.try_wait(till_seq, &self.barrier)?;
        }
        debug_assert!(self.available >= till_seq);
        Ok(self.as_batch(from_seq, size))
    }

    /// Fallible counterpart of `wait_range`.
    #[inline]
    fn try_wait_range<R>(&mut self, range: R) -> Result<Batch<'_, E>, W::Error>
    where
        R: RangeBounds<usize>,
    {
        // minimum batch size implied by the range's start bound
        let batch_min = match range.start_bound() {
            Bound::Included(b) => *b,
            Bound::Excluded(b) => b.saturating_add(1),
            Bound::Unbounded => 0,
        };
        debug_assert!(self.buffer.size() >= batch_min);
        let (from_seq, till_seq) = self.wait_bounds(saturate_i64(batch_min));
        // `.max(1)` forces a barrier poll to refresh `available` even when `batch_min`
        // is already satisfied
        if self.available < till_seq.max(1) {
            self.available = self.wait_strategy.try_wait(till_seq, &self.barrier)?;
        }
        debug_assert!(self.available >= till_seq);
        let batch_max = self.range_batch_size(from_seq, range.end_bound());
        Ok(self.as_batch(from_seq, batch_max))
    }
}
839
/// A claimed span of sequences, ready to be processed and then published via the cursor.
struct Batch<'a, E> {
    cursor: &'a mut Arc<Cursor>, // mutable ref ensures handle cannot run another overlapping wait
    buffer: &'a Arc<RingBuffer<E>>,
    current: i64, // sequence immediately before the first event of the batch
    size: usize,  // number of events in the batch
}
846
impl<E> Batch<'_, E> {
    /// Process the whole batch with `f`, then publish it by storing the batch end to the cursor.
    #[inline]
    fn apply<F>(self, f: F)
    where
        F: FnMut(*mut E, i64, bool),
    {
        fence(Ordering::Acquire);
        // SAFETY: Acquire-Release barrier ensures following handles cannot access this sequence
        // before this handle has finished interacting with it. Construction of the disruptor
        // guarantees producers don't overlap with other handles, thus no mutable aliasing.
        unsafe { self.buffer.apply(self.current + 1, self.size, f) };
        let seq_end = self.current + saturate_i64(self.size);
        self.cursor.sequence.store(seq_end, Ordering::Release);
    }

    /// Process the whole batch with `f`, publishing it only if every call succeeds.
    ///
    /// On error the `?` returns before the store below, leaving the cursor unchanged.
    #[inline]
    fn try_apply<F, Err>(self, f: F) -> Result<(), Err>
    where
        F: FnMut(*mut E, i64, bool) -> Result<(), Err>,
    {
        fence(Ordering::Acquire);
        // SAFETY: see Batch::apply
        unsafe { self.buffer.try_apply(self.current + 1, self.size, f)? };
        let seq_end = self.current + saturate_i64(self.size);
        self.cursor.sequence.store(seq_end, Ordering::Release);
        Ok(())
    }

    /// Process the batch with `f`, committing as many events as succeed.
    ///
    /// On error, the cursor is moved to the last successfully processed sequence (`seq - 1`),
    /// publishing only the successful prefix, and the full publish below is skipped via `?`.
    #[inline]
    fn try_commit<F, Err>(self, mut f: F) -> Result<(), Err>
    where
        F: FnMut(*mut E, i64, bool) -> Result<(), Err>,
    {
        fence(Ordering::Acquire);
        // SAFETY: see Batch::apply
        unsafe {
            self.buffer.try_apply(self.current + 1, self.size, |ptr, seq, end| {
                f(ptr, seq, end)
                    .inspect_err(|_| self.cursor.sequence.store(seq - 1, Ordering::Release))
            })?;
        }
        let seq_end = self.current + saturate_i64(self.size);
        self.cursor.sequence.store(seq_end, Ordering::Release);
        Ok(())
    }
}
893
894// todo: elucidate docs with 'in another process' statements, eg. for describing mut alias possibility
895
/// A batch of events which may be mutably accessed.
#[repr(transparent)]
pub struct EventsMut<'a, E>(Batch<'a, E>); // newtype over a claimed batch; consumed on use
899
900impl<E> EventsMut<'_, E> {
    /// Returns the size of this batch.
    #[inline]
    pub fn size(&self) -> usize {
        self.0.size
    }
906
    /// Process a batch of mutable events on the buffer using the closure `f`.
    ///
    /// The cursor is moved to the end of the batch once every event has been processed.
    ///
    /// The parameters of `f` are:
    ///
    /// - `event: &mut E`, a reference to the buffer element being accessed.
    /// - `sequence: i64`, the position of this event in the sequence.
    /// - `batch_end: bool`, indicating whether this is the last event in the requested batch.
    ///
    /// # Examples
    /// ```
    /// let (mut producer, _) = ansa::spsc(64, || 0);
    ///
    /// producer.wait(10).for_each(|event, seq, _| *event = seq);
    /// ```
    #[inline]
    pub fn for_each<F>(self, mut f: F)
    where
        F: FnMut(&mut E, i64, bool),
    {
        self.0.apply(|ptr, seq, end| {
            // SAFETY: deref guaranteed safe by ringbuffer initialisation
            let event: &mut E = unsafe { &mut *ptr };
            f(event, seq, end)
        })
    }
932
    /// Try to process a batch of mutable events on the buffer using the closure `f`.
    ///
    /// If an error occurs, leaves the cursor sequence unchanged and returns the error.
    ///
    /// **Important**: does _not_ undo any mutations to events if an error occurs.
    ///
    /// The parameters of `f` are:
    ///
    /// - `event: &mut E`, a reference to the buffer element being accessed.
    /// - `sequence: i64`, the position of this event in the sequence.
    /// - `batch_end: bool`, indicating whether this is the last event in the requested batch.
    ///
    /// # Examples
    /// ```
    /// let (mut producer, _) = ansa::spsc(64, || 0);
    ///
    /// producer.wait(10).try_for_each(|_, seq, _| {
    ///     match seq {
    ///         100 => Err(seq),
    ///         _ => Ok(())
    ///     }
    /// })?;
    /// assert_eq!(producer.sequence(), 9);
    ///
    /// let result = producer.wait(10).try_for_each(|_, seq, _| {
    ///     match seq {
    ///         15 => Err(seq),
    ///         _ => Ok(())
    ///     }
    /// });
    /// assert_eq!(result, Err(15));
    /// // If an error is returned, the cursor will not be moved.
    /// assert_eq!(producer.sequence(), 9);
    /// # Ok::<(), i64>(())
    /// ```
    #[inline]
    pub fn try_for_each<F, Err>(self, mut f: F) -> Result<(), Err>
    where
        F: FnMut(&mut E, i64, bool) -> Result<(), Err>,
    {
        // all-or-nothing publication: see `Batch::try_apply`
        self.0.try_apply(|ptr, seq, end| {
            // SAFETY: deref guaranteed safe by ringbuffer initialisation
            let event: &mut E = unsafe { &mut *ptr };
            f(event, seq, end)
        })
    }
979
980 /// Try to process a batch of mutable events on the buffer using the closure `f`.
981 ///
982 /// If an error occurs, returns the error and updates the cursor sequence to the position of
983 /// the last successfully processed event. In effect, commits the successful portion of the batch.
984 ///
985 /// **Important**: does _not_ undo any mutations to events if an error occurs.
986 ///
987 /// The parameters of `f` are:
988 ///
989 /// - `event: &mut E`, a reference to the buffer element being accessed.
990 /// - `sequence: i64`, the position of this event in the sequence.
991 /// - `batch_end: bool`, indicating whether this is the last event in the requested batch.
992 ///
993 /// # Examples
994 /// ```
995 /// let (mut producer, _) = ansa::spsc(64, || 0);
996 ///
997 /// producer.wait(10).try_commit_each(|_, seq, _| {
998 /// match seq {
999 /// 100 => Err(seq),
1000 /// _ => Ok(())
1001 /// }
1002 /// })?;
1003 ///
1004 /// assert_eq!(producer.sequence(), 9);
1005 /// # Ok::<(), i64>(())
1006 /// ```
1007 /// On failure, the cursor will be moved to the last successful sequence.
1008 /// ```
1009 /// let (mut producer, _) = ansa::spsc(64, || 0);
1010 ///
1011 /// let result = producer.wait(10).try_commit_each(|_, seq, _| {
1012 /// match seq {
1013 /// 5 => Err(seq),
1014 /// _ => Ok(())
1015 /// }
1016 /// });
1017 ///
1018 /// assert_eq!(result, Err(5));
1019 /// assert_eq!(producer.sequence(), 4);
1020 /// ```
1021 #[inline]
1022 pub fn try_commit_each<F, Err>(self, mut f: F) -> Result<(), Err>
1023 where
1024 F: FnMut(&mut E, i64, bool) -> Result<(), Err>,
1025 {
1026 self.0.try_commit(|ptr, seq, end| {
1027 // SAFETY: deref guaranteed safe by ringbuffer initialisation
1028 let event: &mut E = unsafe { &mut *ptr };
1029 f(event, seq, end)
1030 })
1031 }
1032}
1033
1034/// A batch of events which may be immutably accessed.
1035#[repr(transparent)]
1036pub struct Events<'a, E>(Batch<'a, E>);
1037
1038impl<E> Events<'_, E> {
1039 /// Returns the size of this batch
1040 #[inline]
1041 pub fn size(&self) -> usize {
1042 self.0.size
1043 }
1044
1045 /// Process a batch of immutable events on the buffer using the closure `f`.
1046 ///
1047 /// The parameters of `f` are:
1048 ///
1049 /// - `event: &E`, a reference to the buffer element being accessed.
1050 /// - `sequence: i64`, the position of this event in the sequence.
1051 /// - `batch_end: bool`, indicating whether this is the last event in the requested batch.
1052 ///
1053 /// # Examples
1054 /// ```
1055 /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
1056 ///
1057 /// // move the producer so that events are available to the following consumer
1058 /// producer.wait(20).for_each(|_, _, _| ());
1059 ///
1060 /// consumer.wait(10).for_each(|event, seq, _| println!("{seq}: {event}"));
1061 /// ```
1062 #[inline]
1063 pub fn for_each<F>(self, mut f: F)
1064 where
1065 F: FnMut(&E, i64, bool),
1066 {
1067 self.0.apply(|ptr, seq, end| {
1068 // SAFETY: deref guaranteed safe by ringbuffer initialisation
1069 let event: &E = unsafe { &*ptr };
1070 f(event, seq, end)
1071 })
1072 }
1073
1074 /// Try to process a batch of immutable events on the buffer using the closure `f`.
1075 ///
1076 /// If an error occurs, leaves the cursor sequence unchanged and returns the error.
1077 ///
1078 /// The parameters of `f` are:
1079 ///
1080 /// - `event: &E`, a reference to the buffer element being accessed.
1081 /// - `sequence: i64`, the position of this event in the sequence.
1082 /// - `batch_end: bool`, indicating whether this is the last event in the requested batch.
1083 ///
1084 /// # Examples
1085 /// ```
1086 /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
1087 ///
1088 /// // move the producer so that events are available to the following consumer
1089 /// producer.wait(20).for_each(|_, _, _| ());
1090 ///
1091 /// consumer.wait(10).try_for_each(|_, seq, _| {
1092 /// match seq {
1093 /// 100 => Err(seq),
1094 /// _ => Ok(())
1095 /// }
1096 /// })?;
1097 ///
1098 /// assert_eq!(consumer.sequence(), 9);
1099 /// # Ok::<(), i64>(())
1100 /// ```
1101 /// On failure, the cursor will not be moved.
1102 /// ```
1103 /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
1104 ///
1105 /// // move the producer so that events are available to the following consumer
1106 /// producer.wait(20).for_each(|_, _, _| ());
1107 ///
1108 /// let result = consumer.wait(10).try_for_each(|_, seq, _| {
1109 /// match seq {
1110 /// 5 => Err(seq),
1111 /// _ => Ok(())
1112 /// }
1113 /// });
1114 ///
1115 /// assert_eq!(result, Err(5));
1116 /// // sequence values start at -1, and the first event is at sequence 0
1117 /// assert_eq!(consumer.sequence(), -1);
1118 /// ```
1119 #[inline]
1120 pub fn try_for_each<F, Err>(self, mut f: F) -> Result<(), Err>
1121 where
1122 F: FnMut(&E, i64, bool) -> Result<(), Err>,
1123 {
1124 self.0.try_apply(|ptr, seq, end| {
1125 // SAFETY: deref guaranteed safe by ringbuffer initialisation
1126 let event: &E = unsafe { &*ptr };
1127 f(event, seq, end)
1128 })
1129 }
1130
1131 /// Try to process a batch of immutable events on the buffer using the closure `f`.
1132 ///
1133 /// If an error occurs, returns the error and updates the cursor sequence to the position of
1134 /// the last successfully processed event. In effect, commits the successful portion of the batch.
1135 ///
1136 /// The parameters of `f` are:
1137 ///
1138 /// - `event: &E`, a reference to the buffer element being accessed.
1139 /// - `sequence: i64`, the position of this event in the sequence.
1140 /// - `batch_end: bool`, indicating whether this is the last event in the requested batch.
1141 ///
1142 /// # Examples
1143 /// ```
1144 /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
1145 ///
1146 /// // move the producer so that events are available to the following consumer
1147 /// producer.wait(20).for_each(|_, _, _| ());
1148 ///
1149 /// consumer.wait(10).try_commit_each(|_, seq, _| {
1150 /// match seq {
1151 /// 100 => Err(seq),
1152 /// _ => Ok(())
1153 /// }
1154 /// })?;
1155 ///
1156 /// assert_eq!(consumer.sequence(), 9);
1157 /// # Ok::<(), i64>(())
1158 /// ```
1159 /// On failure, the cursor will be moved to the last successful sequence.
1160 /// ```
1161 /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
1162 ///
1163 /// // move the producer so that events are available to the following consumer
1164 /// producer.wait(20).for_each(|_, _, _| ());
1165 ///
1166 /// let result = consumer.wait(10).try_commit_each(|_, seq, _| {
1167 /// match seq {
1168 /// 5 => Err(seq),
1169 /// _ => Ok(())
1170 /// }
1171 /// });
1172 ///
1173 /// assert_eq!(result, Err(5));
1174 /// assert_eq!(consumer.sequence(), 4);
1175 /// ```
1176 #[inline]
1177 pub fn try_commit_each<F, Err>(self, mut f: F) -> Result<(), Err>
1178 where
1179 F: FnMut(&E, i64, bool) -> Result<(), Err>,
1180 {
1181 self.0.try_commit(|ptr, seq, end| {
1182 // SAFETY: deref guaranteed safe by ringbuffer initialisation
1183 let event: &E = unsafe { &*ptr };
1184 f(event, seq, end)
1185 })
1186 }
1187}
1188
/// An atomic sequence counter marking a handle's position in the ring buffer.
#[derive(Debug)]
#[repr(transparent)]
pub(crate) struct Cursor {
    // The cursor's current sequence value.
    #[cfg(not(feature = "cache-padded"))]
    sequence: AtomicI64,
    // With the `cache-padded` feature, the counter is padded to a cache line (via
    // `crossbeam_utils::CachePadded`) to avoid false sharing between cursors that are
    // updated by different threads.
    #[cfg(feature = "cache-padded")]
    sequence: crossbeam_utils::CachePadded<AtomicI64>,
}

/// The sequence value all cursors begin at; see [`Cursor::start`] for why it is `-1`.
const CURSOR_START: i64 = -1;

impl Cursor {
    /// Create a cursor positioned at sequence `seq`.
    pub(crate) const fn new(seq: i64) -> Self {
        Cursor {
            #[cfg(not(feature = "cache-padded"))]
            sequence: AtomicI64::new(seq),
            #[cfg(feature = "cache-padded")]
            sequence: crossbeam_utils::CachePadded::new(AtomicI64::new(seq)),
        }
    }

    /// Create a cursor at the start of the sequence. All accesses begin on the _next_ position in
    /// the sequence, thus cursors start at `-1`, so that accesses start at `0`.
    pub(crate) const fn start() -> Self {
        Cursor::new(CURSOR_START)
    }
}
1216
/// A collection of cursors that determine which sequence is available to a handle.
///
/// Every handle has a barrier, and no handle may overtake its barrier.
#[derive(Clone, Debug)]
#[repr(transparent)]
pub struct Barrier(Barrier_);

// Private representation of a `Barrier`, specialised for the common single-cursor case.
#[derive(Clone, Debug)]
enum Barrier_ {
    // Barrier formed by exactly one cursor; skips the slice indirection on the hot path.
    One(Arc<Cursor>),
    // Barrier formed by zero or more cursors.
    Many(Box<[Arc<Cursor>]>),
}
1229
1230impl Barrier {
1231 pub(crate) fn one(cursor: Arc<Cursor>) -> Self {
1232 Barrier(Barrier_::One(cursor))
1233 }
1234
1235 pub(crate) fn many(cursors: Box<[Arc<Cursor>]>) -> Self {
1236 Barrier(Barrier_::Many(cursors))
1237 }
1238
1239 /// The position of the barrier.
1240 ///
1241 /// # Examples
1242 /// ```
1243 /// use ansa::{Barrier, wait::WaitStrategy};
1244 ///
1245 /// struct MyBusyWait;
1246 ///
1247 /// // SAFETY: wait returns once barrier seq >= desired_seq, with barrier seq itself
1248 /// unsafe impl WaitStrategy for MyBusyWait {
1249 /// fn wait(&self, desired_seq: i64, barrier: &Barrier) -> i64 {
1250 /// while barrier.sequence() < desired_seq {} // busy-spin
1251 /// barrier.sequence()
1252 /// }
1253 /// }
1254 /// ```
1255 #[inline]
1256 pub fn sequence(&self) -> i64 {
1257 match &self.0 {
1258 Barrier_::One(cursor) => cursor.sequence.load(Ordering::Relaxed),
1259 Barrier_::Many(cursors) => cursors.iter().fold(i64::MAX, |seq, cursor| {
1260 seq.min(cursor.sequence.load(Ordering::Relaxed))
1261 }),
1262 }
1263 }
1264}
1265
impl Drop for Barrier {
    // We need to break the Arc cycle of barriers. Just get rid of all the Arcs to guarantee this.
    // Swapping in an empty `Many` drops every held `Arc<Cursor>` immediately, so cursors cannot
    // keep one another alive through their barriers.
    fn drop(&mut self) {
        self.0 = Barrier_::Many(Box::new([]));
    }
}
1272
/// Convert `u` to `i64`, saturating at `i64::MAX` for values too large to represent.
///
/// On targets where `usize` is 32 bits or narrower, every `usize` fits in `i64` and the
/// conversion is a plain, lossless cast.
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_possible_wrap)]
#[inline]
const fn saturate_i64(u: usize) -> i64 {
    if const { size_of::<usize>() >= 8 } {
        // usize can hold values above i64::MAX: clamp them. Masking off the top bit (the
        // previous implementation) is NOT saturation — it mapped every value in
        // (i64::MAX, usize::MAX] to `u - 2^63`, e.g. i64::MAX as usize + 1 became 0.
        if u > i64::MAX as usize {
            i64::MAX
        } else {
            u as i64
        }
    } else {
        // all usize values fit in i64
        u as i64
    }
}
1285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::wait::WaitBusy;

    // Check that sizes don't accidentally change. If size change is found and intended, just
    // change the values in this test.
    #[test]
    fn sizes() {
        assert_eq!(size_of::<Consumer<u8, WaitBusy>>(), 40);
        assert_eq!(size_of::<Producer<u8, WaitBusy, true>>(), 40);
        assert_eq!(size_of::<MultiProducer<u8, WaitBusy, true>>(), 48);
    }

    // wait_range should hand back the largest batch available to each handle: the lead handle is
    // limited by buffer capacity minus its barrier's lag, the trailing handle by the barrier
    // cursor itself.
    #[test]
    fn test_wait_range() {
        let buffer = Arc::new(RingBuffer::from_factory(16, || 0));
        let lead_cursor = Arc::new(Cursor::new(8));
        let follows_cursor = Arc::new(Cursor::new(4));

        // lead handle at sequence 8, barred by the trailing cursor at sequence 4
        let mut lead_handle = HandleInner::<_, _, true>::new(
            Arc::clone(&lead_cursor),
            Barrier::one(Arc::clone(&follows_cursor)),
            Arc::clone(&buffer),
            WaitBusy,
        );

        // trailing handle at sequence 4, barred by the lead cursor at sequence 8
        let mut follows_handle = HandleInner::<_, _, false>::new(
            Arc::clone(&follows_cursor),
            Barrier::one(Arc::clone(&lead_cursor)),
            Arc::clone(&buffer),
            WaitBusy,
        );

        // 16 buffer slots minus the 4 sequences the trailing handle has yet to process
        let lead_batch = lead_handle.wait_range(1..);
        assert_eq!(lead_batch.current, 8);
        assert_eq!(lead_batch.size, 12);

        // only the 4 sequences between the trailing cursor (4) and the lead cursor (8)
        let follows_batch = follows_handle.wait_range(1..);
        assert_eq!(follows_batch.current, 4);
        assert_eq!(follows_batch.size, 4);
    }

    // A zero-sized batch must neither move the cursor nor invoke the apply closure.
    #[test]
    fn test_size_zero_apply() {
        let buffer = Arc::new(RingBuffer::from_factory(16, || 0));
        let mut lead_handle = HandleInner::<_, _, true>::new(
            Arc::new(Cursor::new(8)),
            Barrier::one(Arc::new(Cursor::new(4))),
            Arc::clone(&buffer),
            WaitBusy,
        );

        let sequence_before_apply = lead_handle.cursor.sequence.load(Ordering::Relaxed);

        let batch = lead_handle.wait(0);
        assert_eq!(batch.size, 0);

        // the closure fails unconditionally, proving it is never called for an empty batch
        batch.apply(|_, _, _| assert!(false));
        // sequence should not have moved and the apply function should not have been called
        let sequence_after_apply = lead_handle.cursor.sequence.load(Ordering::Relaxed);
        assert_eq!(sequence_before_apply, sequence_after_apply);
    }
}
1349}