ansa/
handles.rs

1use crate::ringbuffer::RingBuffer;
2use crate::wait::{TryWaitStrategy, WaitStrategy};
3use std::collections::Bound;
4use std::ops::RangeBounds;
5use std::sync::atomic::{fence, AtomicI64, Ordering};
6use std::sync::Arc;
7
/// A handle with mutable access to events on the ring buffer.
///
/// `MultiProducer`s can be cloned to enable distributed writes. Clones coordinate by claiming
/// non-overlapping ranges of sequence values, which can be written to in parallel.
///
/// Clones of this `MultiProducer` share this producer's cursor.
///
/// See: **\[INSERT LINK]** for alternative methods of parallelizing producers without using
/// `MultiProducer`.
///
/// # Examples
/// ```
/// use ansa::{DisruptorBuilder, Follows, Handle};
///
/// let mut handles = DisruptorBuilder::new(64, || 0)
///     .add_handle(0, Handle::Producer, Follows::LeadProducer)
///     .build()?;
///
/// // lead and trailing are separate handles with separate cursors and clones
/// let lead = handles.take_lead().unwrap().into_multi();
/// let trailing = handles.take_producer(0).unwrap().into_multi();
///
/// let lead_clone = lead.clone();
///
/// assert_eq!(lead.count(), 2);
/// assert_eq!(trailing.count(), 1);
/// # Ok::<(), ansa::BuildError>(())
/// ```
///
/// # Limitations
///
/// The `MultiProducer` API is quite limited in comparison to either [`Producers`](Producer) or
/// [`Consumers`](Consumer). This is due to the mechanism used to coordinate `MultiProducer` clones.
///
/// TL;DR, the `MultiProducer` limitations are:
/// 1) `wait_range` method cannot be implemented.
/// 2) Separate `wait` and `apply` methods cannot be conveniently provided.
/// 3) The `try_wait_for_each` method will always move the cursor up by the requested batch size,
///    even if an error occurs.
///
/// As soon as a `MultiProducer` clone claims and waits for available sequences, it is committed to
/// moving the shared cursor up to the end of the claimed batch, regardless of whether either
/// waiting for or processing the batch is successful or not.
///
/// This commitment requires that batches always be completed (and the cursor moved), even for
/// fallible methods where errors might occur. Importantly, this means that handles which follow a
/// `MultiProducer` cannot assume, e.g., that an event was successfully written based only on its
/// sequence becoming available.
///
/// Another consequence is that batches cannot be sized dynamically depending on what sequences are
/// actually available, as the bounds of the claim must be used. Hence, `wait_range` cannot be
/// implemented.
///
/// Lastly, separate `wait` methods cannot be provided, because [`EventsMut`] structs cannot be
/// guaranteed to operate correctly in user code. If an `EventsMut` is dropped without an apply
/// method being called, then the claim it was generated from will go unused, ultimately causing
/// permanent disruptor stalls. Instead, combined wait and apply methods are provided.
#[derive(Debug)]
pub struct MultiProducer<E, W, const LEAD: bool> {
    // Per-clone handle state; the cursor and buffer inside it are `Arc`s shared with every clone.
    inner: HandleInner<E, W, LEAD>,
    // Claim counter used to hand out non-overlapping sequence ranges to clones.
    claim: Arc<Cursor>, // shared between all clones of this multi producer
}
70
71impl<E, W, const LEAD: bool> Clone for MultiProducer<E, W, LEAD>
72where
73    W: Clone,
74{
75    fn clone(&self) -> Self {
76        MultiProducer {
77            inner: HandleInner {
78                cursor: Arc::clone(&self.inner.cursor),
79                barrier: self.inner.barrier.clone(),
80                buffer: Arc::clone(&self.inner.buffer),
81                wait_strategy: self.inner.wait_strategy.clone(),
82                available: self.inner.available,
83            },
84            claim: Arc::clone(&self.claim),
85        }
86    }
87}
88
89impl<E, W, const LEAD: bool> MultiProducer<E, W, LEAD> {
90    /// Returns `true` if this producer is the lead producer.
91    ///
92    /// # Examples
93    /// ```
94    /// use ansa::*;
95    ///
96    /// let mut handles = DisruptorBuilder::new(64, || 0)
97    ///     .add_handle(0, Handle::Producer, Follows::LeadProducer)
98    ///     .build()?;
99    ///
100    /// let lead_multi = handles.take_lead().unwrap().into_multi();
101    /// let follow_multi = handles.take_producer(0).unwrap().into_multi();
102    ///
103    /// assert_eq!(lead_multi.is_lead(), true);
104    /// assert_eq!(follow_multi.is_lead(), false);
105    /// # Ok::<(), BuildError>(())
106    /// ```
107    pub const fn is_lead(&self) -> bool {
108        LEAD
109    }
110
111    /// Returns the count of [`MultiProducer`]s associated with this cursor.
112    ///
113    /// **Important:** Care should be taken when performing actions based upon this number, as any
114    /// thread which holds an associated [`MultiProducer`] may clone it at any time, thereby
115    /// changing the count.
116    ///
117    /// # Examples
118    /// ```
119    /// let (multi, _) = ansa::mpsc(64, || 0);
120    /// assert_eq!(multi.count(), 1);
121    ///
122    /// let multi_2 = multi.clone();
123    /// assert_eq!(multi.count(), 2);
124    /// // consume a `MultiProducer` by attempting the conversion into a `Producer`
125    /// assert!(matches!(multi.into_producer(), None));
126    /// assert_eq!(multi_2.count(), 1);
127    /// ```
128    #[inline]
129    pub fn count(&self) -> usize {
130        Arc::strong_count(&self.claim)
131    }
132
133    /// Returns the current sequence value for this producer's cursor.
134    ///
135    /// **Important:** The cursor for a `MultiProducer` is shared by all of its clones. Any of
136    /// these clones could alter this value at any time, if they are writing to the buffer.
137    ///
138    /// # Examples
139    /// ```
140    /// let (mut multi, _) = ansa::mpsc(64, || 0);
141    /// // sequences start at -1, but accesses always occur at the next sequence,
142    /// // so the first accessed sequence will be 0
143    /// assert_eq!(multi.sequence(), -1);
144    ///
145    /// // move the producer cursor up by 10
146    /// multi.wait_for_each(10, |_, _, _| ());
147    /// assert_eq!(multi.sequence(), 9);
148    ///
149    /// // clone and move only the clone
150    /// let mut clone = multi.clone();
151    /// clone.wait_for_each(10, |_, _, _| ());
152    ///
153    /// // Both have moved because all clones share the same cursor
154    /// assert_eq!(multi.sequence(), 19);
155    /// assert_eq!(clone.sequence(), 19);
156    /// ```
157    #[inline]
158    pub fn sequence(&self) -> i64 {
159        self.inner.cursor.sequence.load(Ordering::Relaxed)
160    }
161
162    /// Returns the size of the ring buffer.
163    ///
164    /// # Examples
165    /// ```
166    /// let (producer, _) = ansa::mpsc(64, || 0);
167    /// assert_eq!(producer.buffer_size(), 64);
168    /// ```
169    #[inline]
170    pub fn buffer_size(&self) -> usize {
171        self.inner.buffer_size()
172    }
173
174    /// Set the wait strategy for this `MultiProducer` clone.
175    ///
176    /// Does not alter the wait strategy for any other handle, _including_ other clones of this
177    /// `MultiProducer`.
178    ///
179    /// # Examples
180    /// ```
181    /// use ansa::MultiProducer;
182    /// use ansa::wait::{WaitBusy, WaitPhased, WaitSleep};
183    ///
184    /// let (multi, _) = ansa::mpsc(64, || 0u8);
185    /// let clone = multi.clone();
186    /// // multi changes its strategy
187    /// let _: MultiProducer<u8, WaitBusy, true> = multi.set_wait_strategy(WaitBusy);
188    /// // clone keeps the original wait strategy
189    /// let _: MultiProducer<u8, WaitPhased<WaitSleep>, true> = clone;
190    /// ```
191    #[inline]
192    pub fn set_wait_strategy<W2>(self, wait_strategy: W2) -> MultiProducer<E, W2, LEAD> {
193        MultiProducer {
194            inner: self.inner.set_wait_strategy(wait_strategy),
195            claim: self.claim,
196        }
197    }
198
199    /// Return a [`Producer`] if exactly one `MultiProducer` exists for this cursor.
200    ///
201    /// Otherwise, return `None` and drops this producer.
202    ///
203    /// If this function is called when only one `MultiProducer` exists, then it is guaranteed to
204    /// return a `Producer`.
205    ///
206    /// # Examples
207    /// ```
208    /// let (multi, _) = ansa::mpsc(64, || 0);
209    /// let multi_clone = multi.clone();
210    ///
211    /// assert!(matches!(multi.into_producer(), None));
212    /// assert!(matches!(multi_clone.into_producer(), Some(ansa::Producer { .. })));
213    /// ```
214    #[inline]
215    pub fn into_producer(self) -> Option<Producer<E, W, LEAD>> {
216        Arc::into_inner(self.claim).map(|_| self.inner.into_producer())
217    }
218
219    #[inline]
220    fn wait_bounds(&self, size: i64) -> (i64, i64) {
221        let mut current_claim = self.claim.sequence.load(Ordering::Relaxed);
222        let mut claim_end = current_claim + size;
223        while let Err(new_current) = self.claim.sequence.compare_exchange(
224            current_claim,
225            claim_end,
226            Ordering::AcqRel,
227            Ordering::Relaxed,
228        ) {
229            current_claim = new_current;
230            claim_end = new_current + size;
231        }
232        let desired_seq = if LEAD {
233            claim_end - saturate_i64(self.inner.buffer.size())
234        } else {
235            claim_end
236        };
237        (current_claim, desired_seq)
238    }
239
240    #[inline]
241    fn update_cursor(&self, start: i64, end: i64) {
242        while self.inner.cursor.sequence.load(Ordering::Acquire) != start {
243            std::hint::spin_loop();
244        }
245        self.inner.cursor.sequence.store(end, Ordering::Release)
246    }
247}
248
249impl<E, W, const LEAD: bool> MultiProducer<E, W, LEAD>
250where
251    W: WaitStrategy,
252{
253    /// Wait for and process a batch of exactly `size` number of events.
254    ///
255    /// `size` values larger than the buffer will cause permanent stalls.
256    #[inline]
257    pub fn wait_for_each<F>(&mut self, size: usize, mut f: F)
258    where
259        F: FnMut(&mut E, i64, bool),
260    {
261        debug_assert!(size <= self.inner.buffer.size());
262        let (from_seq, till_seq) = self.wait_bounds(saturate_i64(size));
263        if self.inner.available < till_seq {
264            self.inner.available = self.inner.wait_strategy.wait(till_seq, &self.inner.barrier);
265        }
266        debug_assert!(self.inner.available >= till_seq);
267        fence(Ordering::Acquire);
268        // SAFETY: Acquire-Release barrier ensures following handles cannot access this sequence
269        // before this handle has finished interacting with it. Construction of the disruptor
270        // guarantees producers don't overlap with other handles, thus no mutable aliasing.
271        unsafe {
272            self.inner.buffer.apply(from_seq + 1, size, |ptr, seq, end| {
273                // SAFETY: deref guaranteed safe by ringbuffer initialisation
274                let event: &mut E = &mut *ptr;
275                f(event, seq, end)
276            })
277        };
278        // Ordering::Release performed by this method
279        self.update_cursor(from_seq, from_seq + saturate_i64(size))
280    }
281}
282
283impl<E, W, const LEAD: bool> MultiProducer<E, W, LEAD>
284where
285    W: TryWaitStrategy,
286{
287    /// Wait for and process a batch of exactly `size` number of events.
288    ///
289    /// If waiting fails, returns the wait error, `W::Error`, which must be convertible to `Err`.
290    ///
291    /// If processing the batch fails, returns `Err`.
292    ///
293    /// `size` values larger than the buffer will cause permanent stalls.
294    #[inline]
295    pub fn try_wait_for_each<F, Err>(&mut self, size: usize, mut f: F) -> Result<(), Err>
296    where
297        F: FnMut(&mut E, i64, bool) -> Result<(), Err>,
298        Err: From<W::Error>,
299    {
300        debug_assert!(size <= self.inner.buffer.size());
301        let (from_seq, till_seq) = self.wait_bounds(saturate_i64(size));
302        fence(Ordering::Acquire);
303        if self.inner.available < till_seq {
304            self.inner.available = self
305                .inner
306                .wait_strategy
307                .try_wait(till_seq, &self.inner.barrier)
308                .inspect_err(|_| self.update_cursor(from_seq, from_seq + saturate_i64(size)))?;
309        }
310        debug_assert!(self.inner.available >= till_seq);
311        // SAFETY: Acquire-Release barrier ensures following handles cannot access this sequence
312        // before this handle has finished interacting with it. Construction of the disruptor
313        // guarantees producers don't overlap with other handles, thus no mutable aliasing.
314        let result = unsafe {
315            self.inner.buffer.try_apply(from_seq + 1, size, |ptr, seq, end| {
316                // SAFETY: deref guaranteed safe by ringbuffer initialisation
317                let event: &mut E = &mut *ptr;
318                f(event, seq, end)
319            })
320        };
321        // Ordering::Release performed by this method
322        self.update_cursor(from_seq, from_seq + saturate_i64(size));
323        result
324    }
325}
326
/// A handle with mutable access to events on the ring buffer.
///
/// Cannot access events concurrently with other handles.
///
/// The `LEAD` const parameter records whether this is the lead producer (see `is_lead`).
#[derive(Debug)]
#[repr(transparent)]
pub struct Producer<E, W, const LEAD: bool> {
    // All handle state (cursor, barrier, buffer, wait strategy) lives in `HandleInner`.
    inner: HandleInner<E, W, LEAD>,
}
335
336impl<E, W, const LEAD: bool> Producer<E, W, LEAD> {
337    /// Returns `true` if this producer is the lead producer.
338    pub const fn is_lead(&self) -> bool {
339        LEAD
340    }
341
342    /// Returns the current sequence value for this producer's cursor.
343    ///
344    /// # Examples
345    /// ```
346    /// let (mut producer, _) = ansa::spsc(64, || 0);
347    /// // sequences start at -1, but accesses always occur at the next sequence,
348    /// // so the first accessed sequence will be 0
349    /// assert_eq!(producer.sequence(), -1);
350    ///
351    /// // move the producer up by 10
352    /// producer.wait(10).for_each(|_, _, _| ());
353    /// assert_eq!(producer.sequence(), 9);
354    /// ```
355    #[inline]
356    pub fn sequence(&self) -> i64 {
357        self.inner.cursor.sequence.load(Ordering::Relaxed)
358    }
359
360    /// Returns the size of the ring buffer.
361    ///
362    /// # Examples
363    /// ```
364    /// let (producer, _) = ansa::spsc(64, || 0);
365    /// assert_eq!(producer.buffer_size(), 64);
366    /// ```
367    #[inline]
368    pub fn buffer_size(&self) -> usize {
369        self.inner.buffer_size()
370    }
371
372    /// Returns a new handle with the given `wait_strategy`, consuming this handle.
373    ///
374    /// All other properties of the original handle remain unchanged in the new handle.
375    ///
376    /// Does not alter the wait strategy for any other handle.
377    ///
378    /// # Examples
379    /// ```
380    /// use ansa::Producer;
381    /// use ansa::wait::WaitBusy;
382    ///
383    /// let (producer, _) = ansa::spsc(64, || 0u8);
384    /// let _: Producer<u8, WaitBusy, true> = producer.set_wait_strategy(WaitBusy);
385    /// ```
386    #[inline]
387    pub fn set_wait_strategy<W2>(self, wait_strategy: W2) -> Producer<E, W2, LEAD> {
388        self.inner.set_wait_strategy(wait_strategy).into_producer()
389    }
390
391    /// Converts this `Producer` into a [`MultiProducer`].
392    ///
393    /// # Examples
394    /// ```
395    /// let (producer, _) = ansa::spsc(64, || 0);
396    ///
397    /// assert!(matches!(producer.into_multi(), ansa::MultiProducer { .. }))
398    /// ```
399    #[inline]
400    pub fn into_multi(self) -> MultiProducer<E, W, LEAD> {
401        let producer_seq = self.sequence();
402        MultiProducer {
403            inner: self.inner,
404            claim: Arc::new(Cursor::new(producer_seq)),
405        }
406    }
407}
408
409impl<E, W, const LEAD: bool> Producer<E, W, LEAD>
410where
411    W: WaitStrategy,
412{
413    /// Wait until exactly `size` number of events are available.
414    ///
415    /// `size` values larger than the buffer will cause permanent stalls.
416    #[inline]
417    pub fn wait(&mut self, size: usize) -> EventsMut<'_, E> {
418        EventsMut(self.inner.wait(size))
419    }
420
421    /// Wait until a number of events within the given range are available.
422    ///
423    /// If a lower bound is provided, it must be less than the buffer size.
424    ///
425    /// # Examples
426    ///
427    /// ```
428    /// let (mut producer, _) = ansa::spsc(64, || 0);
429    ///
430    /// producer.wait_range(1..=10); // waits for a batch of at most 10 events
431    /// producer.wait_range(10..); // waits for a batch of at least 10 events
432    /// producer.wait_range(1..); // waits for any number of events
433    /// ```
434    #[inline]
435    pub fn wait_range<R>(&mut self, range: R) -> EventsMut<'_, E>
436    where
437        R: RangeBounds<usize>,
438    {
439        EventsMut(self.inner.wait_range(range))
440    }
441}
442
443impl<E, W, const LEAD: bool> Producer<E, W, LEAD>
444where
445    W: TryWaitStrategy,
446{
447    /// Wait until exactly `size` number of events are available.
448    ///
449    /// Otherwise, return the wait strategy error.
450    ///
451    /// `size` values larger than the buffer will cause permanent stalls.
452    #[inline]
453    pub fn try_wait(&mut self, size: usize) -> Result<EventsMut<'_, E>, W::Error> {
454        self.inner.try_wait(size).map(EventsMut)
455    }
456
457    /// Wait until a number of events within the given range are available.
458    ///
459    /// If a lower bound is provided, it must be less than the buffer size.
460    ///
461    /// # Examples
462    ///
463    /// ```
464    /// use std::time::Duration;
465    /// use ansa::wait::{Timeout, WaitBusy};
466    ///
467    /// let (mut producer, _) = ansa::spsc(64, || 0);
468    /// let timeout = Timeout::new(Duration::from_millis(1), WaitBusy);
469    /// let mut producer = producer.set_wait_strategy(timeout);
470    ///
471    /// producer.try_wait_range(1..=10)?; // waits for a batch of at most 10 events
472    /// producer.try_wait_range(10..)?; // waits for a batch of at least 10 events
473    /// producer.try_wait_range(1..)?; // waits for any number of events
474    /// # Ok::<(), ansa::wait::TimedOut>(())
475    /// ```
476    #[inline]
477    pub fn try_wait_range<R>(&mut self, range: R) -> Result<EventsMut<'_, E>, W::Error>
478    where
479        R: RangeBounds<usize>,
480    {
481        self.inner.try_wait_range(range).map(EventsMut)
482    }
483}
484
/// A handle with immutable access to events on the ring buffer.
///
/// Can access events concurrently to other handles with immutable access.
#[derive(Debug)]
#[repr(transparent)]
pub struct Consumer<E, W> {
    // Consumers always use `LEAD = false`; all handle state lives in `HandleInner`.
    inner: HandleInner<E, W, false>,
}
493
494impl<E, W> Consumer<E, W> {
495    /// Returns the current sequence value for this consumer's cursor.
496    ///
497    /// # Examples
498    /// ```
499    /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
500    /// // sequences start at -1, but accesses always occur at the next sequence,
501    /// // so the first accessed sequence will be 0
502    /// assert_eq!(producer.sequence(), -1);
503    ///
504    /// // move the producer up by 10, otherwise the consumer will block the
505    /// // thread while waiting for available events
506    /// producer.wait(10).for_each(|_, _, _| ());
507    /// assert_eq!(producer.sequence(), 9);
508    ///
509    /// // now we can move the consumer
510    /// consumer.wait(5).for_each(|_, _, _| ());
511    /// assert_eq!(consumer.sequence(), 4);
512    /// ```
513    #[inline]
514    pub fn sequence(&self) -> i64 {
515        self.inner.cursor.sequence.load(Ordering::Relaxed)
516    }
517
518    /// Returns the size of the ring buffer.
519    ///
520    /// # Examples
521    /// ```
522    /// let (_, consumer) = ansa::spsc(64, || 0);
523    /// assert_eq!(consumer.buffer_size(), 64);
524    /// ```
525    #[inline]
526    pub fn buffer_size(&self) -> usize {
527        self.inner.buffer_size()
528    }
529
530    /// Returns a new handle with the given `wait_strategy`, consuming this handle.
531    ///
532    /// All other properties of the original handle remain unchanged in the new handle.
533    ///
534    /// Does not alter the wait strategy for any other handle.
535    ///
536    /// # Examples
537    /// ```
538    /// use ansa::Consumer;
539    /// use ansa::wait::WaitBusy;
540    ///
541    /// let (_, consumer) = ansa::spsc(64, || 0u8);
542    /// let _: Consumer<u8, WaitBusy> = consumer.set_wait_strategy(WaitBusy);
543    /// ```
544    #[inline]
545    pub fn set_wait_strategy<W2>(self, wait_strategy: W2) -> Consumer<E, W2> {
546        self.inner.set_wait_strategy(wait_strategy).into_consumer()
547    }
548}
549
550impl<E, W> Consumer<E, W>
551where
552    W: WaitStrategy,
553{
554    /// Wait until exactly `size` number of events are available.
555    ///
556    /// `size` values larger than the buffer will cause permanent stalls.
557    #[inline]
558    pub fn wait(&mut self, size: usize) -> Events<'_, E> {
559        Events(self.inner.wait(size))
560    }
561
562    /// Wait until a number of events within the given range are available.
563    ///
564    /// If a lower bound is provided, it must be less than the buffer size.
565    ///
566    /// Any range which starts from 0 or is unbounded enables non-blocking waits for
567    /// [`well-behaved`] [`WaitStrategy`] implementations
568    ///
569    /// # Examples
570    ///
571    /// ```
572    /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
573    ///
574    /// let events = consumer.wait_range(..); // wait for any number of events
575    /// assert_eq!(events.size(), 0);
576    ///
577    /// // Whereas this would block the thread waiting for the producer to move.
578    /// // consumer.wait_range(1..);
579    ///
580    /// // move the lead producer
581    /// producer.wait(20).for_each(|_, _, _| ());
582    ///
583    /// let events = consumer.wait_range(..=10); // wait for a batch of at most 10 events
584    /// assert_eq!(events.size(), 10);
585    ///
586    /// let events = consumer.wait_range(10..); // wait for a batch of at least 10 events
587    /// assert_eq!(events.size(), 20);
588    ///
589    /// let events = consumer.wait_range(..);
590    /// assert_eq!(events.size(), 20);
591    /// ```
592    /// Can be used to for non-blocking waits. .. is always non-blocking
593    #[inline]
594    pub fn wait_range<R>(&mut self, range: R) -> Events<'_, E>
595    where
596        R: RangeBounds<usize>,
597    {
598        Events(self.inner.wait_range(range))
599    }
600}
601
602impl<E, W> Consumer<E, W>
603where
604    W: TryWaitStrategy,
605{
606    /// Wait until exactly `size` number of events are available.
607    ///
608    /// Otherwise, return the wait strategy error.
609    ///
610    /// `size` values larger than the buffer will cause permanent stalls.
611    ///
612    /// # Examples
613    ///
614    /// ```
615    /// use std::time::Duration;
616    /// use ansa::wait::{Timeout, WaitBusy};
617    ///
618    /// let (mut producer, consumer) = ansa::spsc(64, || 0u32);
619    /// let timeout = Timeout::new(Duration::from_millis(10), WaitBusy);
620    /// let mut consumer = consumer.set_wait_strategy(timeout);
621    ///
622    /// // Try to wait for 5 events, but timeout if they're not available
623    /// match consumer.try_wait(5) {
624    ///     Ok(events) => {
625    ///         // Events were available within the timeout
626    ///         events.for_each(|event, seq, _| println!("Event {}: {}", seq, event));
627    ///     }
628    ///     Err(_timeout) => {
629    ///         // Timeout occurred - no events were available in time
630    ///         println!("No events available within timeout");
631    ///     }
632    /// }
633    /// ```
634    #[inline]
635    pub fn try_wait(&mut self, size: usize) -> Result<Events<'_, E>, W::Error> {
636        self.inner.try_wait(size).map(Events)
637    }
638
639    /// Wait until a number of events within the given range are available.
640    ///
641    /// If a lower bound is provided, it must be less than the buffer size.
642    ///
643    /// # Examples
644    ///
645    /// ```
646    /// use std::time::Duration;
647    /// use ansa::wait::{Timeout, WaitBusy};
648    ///
649    /// let (mut producer, consumer) = ansa::spsc(64, || 0);
650    /// // move the lead producer
651    /// producer.wait(20).for_each(|_, _, _| ());
652    ///
653    /// let timeout = Timeout::new(Duration::from_millis(1), WaitBusy);
654    /// let mut consumer = consumer.set_wait_strategy(timeout);
655    ///
656    /// consumer.try_wait_range(1..=10)?; // waits for a batch of at most 10 events
657    /// consumer.try_wait_range(10..)?; // waits for a batch of at least 10 events
658    /// consumer.try_wait_range(1..)?; // waits for any number of events
659    /// # Ok::<(), ansa::wait::TimedOut>(())
660    /// ```
661    #[inline]
662    pub fn try_wait_range<R>(&mut self, range: R) -> Result<Events<'_, E>, W::Error>
663    where
664        R: RangeBounds<usize>,
665    {
666        self.inner.try_wait_range(range).map(Events)
667    }
668}
669
/// State shared by every kind of handle (`Producer`, `MultiProducer` and `Consumer`).
///
/// `LEAD` marks the lead producer, whose wait targets are offset by the buffer size.
#[derive(Debug)]
pub(crate) struct HandleInner<E, W, const LEAD: bool> {
    /// This handle's own cursor; its `sequence` is stored after a batch has been processed.
    pub(crate) cursor: Arc<Cursor>,
    /// Passed to the wait strategy together with the sequence to wait for.
    pub(crate) barrier: Barrier,
    /// The ring buffer holding the events.
    pub(crate) buffer: Arc<RingBuffer<E>>,
    /// Strategy used to wait on `barrier`.
    pub(crate) wait_strategy: W,
    /// Last value returned by the wait strategy, cached so that later waits can be skipped
    /// while it still covers the requested sequence.
    pub(crate) available: i64,
}
678
679impl<E, W> HandleInner<E, W, false> {
680    #[inline]
681    pub(crate) fn into_consumer(self) -> Consumer<E, W> {
682        Consumer { inner: self }
683    }
684}
685
686impl<E, W, const LEAD: bool> HandleInner<E, W, LEAD> {
687    pub(crate) fn new(
688        cursor: Arc<Cursor>,
689        barrier: Barrier,
690        buffer: Arc<RingBuffer<E>>,
691        wait_strategy: W,
692    ) -> Self {
693        let available = if LEAD {
694            // buffer size can always be cast to i64, as allocations cannot be larger than i64::MAX
695            CURSOR_START - (buffer.size() as i64)
696        } else {
697            CURSOR_START
698        };
699        HandleInner {
700            cursor,
701            barrier,
702            buffer,
703            wait_strategy,
704            available,
705        }
706    }
707
708    #[inline]
709    pub(crate) fn into_producer(self) -> Producer<E, W, LEAD> {
710        Producer { inner: self }
711    }
712
713    #[inline]
714    fn buffer_size(&self) -> usize {
715        self.buffer.size()
716    }
717
718    #[inline]
719    fn set_wait_strategy<W2>(self, wait_strategy: W2) -> HandleInner<E, W2, LEAD> {
720        HandleInner {
721            cursor: self.cursor,
722            barrier: self.barrier,
723            buffer: self.buffer,
724            wait_strategy,
725            available: self.available,
726        }
727    }
728
729    #[inline]
730    fn wait_bounds(&self, size: i64) -> (i64, i64) {
731        let from_sequence = self.cursor.sequence.load(Ordering::Relaxed);
732        let batch_end = from_sequence + size;
733        let till_sequence = if LEAD {
734            batch_end - saturate_i64(self.buffer.size())
735        } else {
736            batch_end
737        };
738        (from_sequence, till_sequence)
739    }
740
741    #[inline]
742    fn as_batch(&mut self, from_seq: i64, batch_size: usize) -> Batch<'_, E> {
743        Batch {
744            cursor: &mut self.cursor,
745            buffer: &self.buffer,
746            current: from_seq,
747            size: batch_size,
748        }
749    }
750
751    #[inline]
752    fn range_batch_size(&self, from_seq: i64, end_bound: Bound<&usize>) -> usize {
753        let from = if LEAD {
754            from_seq - saturate_i64(self.buffer.size())
755        } else {
756            from_seq
757        };
758        let available_batch = (self.available - from).unsigned_abs() as usize;
759        match end_bound {
760            Bound::Included(b) => available_batch.min(*b),
761            Bound::Excluded(b) => available_batch.min(b.saturating_sub(1)),
762            Bound::Unbounded => available_batch,
763        }
764    }
765}
766
767impl<E, W, const LEAD: bool> HandleInner<E, W, LEAD>
768where
769    W: WaitStrategy,
770{
771    #[inline]
772    fn wait(&mut self, size: usize) -> Batch<'_, E> {
773        debug_assert!(self.buffer.size() >= size);
774        let (from_seq, till_seq) = self.wait_bounds(saturate_i64(size));
775        if self.available < till_seq {
776            self.available = self.wait_strategy.wait(till_seq, &self.barrier);
777        }
778        debug_assert!(self.available >= till_seq);
779        self.as_batch(from_seq, size)
780    }
781
782    // todo: check that range: 0.. always returns immediately if no sequence available
783    #[inline]
784    fn wait_range<R>(&mut self, range: R) -> Batch<'_, E>
785    where
786        R: RangeBounds<usize>,
787    {
788        let batch_min = match range.start_bound() {
789            Bound::Included(b) => *b,
790            Bound::Excluded(b) => b.saturating_add(1),
791            Bound::Unbounded => 0,
792        };
793        debug_assert!(self.buffer.size() >= batch_min);
794        let (from_seq, till_seq) = self.wait_bounds(saturate_i64(batch_min));
795        if self.available < till_seq.max(1) {
796            self.available = self.wait_strategy.wait(till_seq, &self.barrier);
797        }
798        debug_assert!(self.available >= till_seq);
799        let batch_max = self.range_batch_size(from_seq, range.end_bound());
800        self.as_batch(from_seq, batch_max)
801    }
802}
803
804impl<E, W, const LEAD: bool> HandleInner<E, W, LEAD>
805where
806    W: TryWaitStrategy,
807{
808    #[inline]
809    fn try_wait(&mut self, size: usize) -> Result<Batch<'_, E>, W::Error> {
810        debug_assert!(self.buffer.size() >= size);
811        let (from_seq, till_seq) = self.wait_bounds(saturate_i64(size));
812        if self.available < till_seq {
813            self.available = self.wait_strategy.try_wait(till_seq, &self.barrier)?;
814        }
815        debug_assert!(self.available >= till_seq);
816        Ok(self.as_batch(from_seq, size))
817    }
818
819    #[inline]
820    fn try_wait_range<R>(&mut self, range: R) -> Result<Batch<'_, E>, W::Error>
821    where
822        R: RangeBounds<usize>,
823    {
824        let batch_min = match range.start_bound() {
825            Bound::Included(b) => *b,
826            Bound::Excluded(b) => b.saturating_add(1),
827            Bound::Unbounded => 0,
828        };
829        debug_assert!(self.buffer.size() >= batch_min);
830        let (from_seq, till_seq) = self.wait_bounds(saturate_i64(batch_min));
831        if self.available < till_seq.max(1) {
832            self.available = self.wait_strategy.try_wait(till_seq, &self.barrier)?;
833        }
834        debug_assert!(self.available >= till_seq);
835        let batch_max = self.range_batch_size(from_seq, range.end_bound());
836        Ok(self.as_batch(from_seq, batch_max))
837    }
838}
839
/// A batch of `size` events following the sequence `current`, waiting to be processed.
///
/// Processing a batch consumes it and stores the new sequence on `cursor`.
struct Batch<'a, E> {
    cursor: &'a mut Arc<Cursor>, // mutable ref ensures handle cannot run another overlapping wait
    // Ring buffer holding the events of this batch.
    buffer: &'a Arc<RingBuffer<E>>,
    // Cursor sequence when the batch was created; the first event is at `current + 1`.
    current: i64,
    // Number of events in the batch.
    size: usize,
}
846
847impl<E> Batch<'_, E> {
848    #[inline]
849    fn apply<F>(self, f: F)
850    where
851        F: FnMut(*mut E, i64, bool),
852    {
853        fence(Ordering::Acquire);
854        // SAFETY: Acquire-Release barrier ensures following handles cannot access this sequence
855        // before this handle has finished interacting with it. Construction of the disruptor
856        // guarantees producers don't overlap with other handles, thus no mutable aliasing.
857        unsafe { self.buffer.apply(self.current + 1, self.size, f) };
858        let seq_end = self.current + saturate_i64(self.size);
859        self.cursor.sequence.store(seq_end, Ordering::Release);
860    }
861
862    #[inline]
863    fn try_apply<F, Err>(self, f: F) -> Result<(), Err>
864    where
865        F: FnMut(*mut E, i64, bool) -> Result<(), Err>,
866    {
867        fence(Ordering::Acquire);
868        // SAFETY: see Batch::apply
869        unsafe { self.buffer.try_apply(self.current + 1, self.size, f)? };
870        let seq_end = self.current + saturate_i64(self.size);
871        self.cursor.sequence.store(seq_end, Ordering::Release);
872        Ok(())
873    }
874
875    #[inline]
876    fn try_commit<F, Err>(self, mut f: F) -> Result<(), Err>
877    where
878        F: FnMut(*mut E, i64, bool) -> Result<(), Err>,
879    {
880        fence(Ordering::Acquire);
881        // SAFETY: see Batch::apply
882        unsafe {
883            self.buffer.try_apply(self.current + 1, self.size, |ptr, seq, end| {
884                f(ptr, seq, end)
885                    .inspect_err(|_| self.cursor.sequence.store(seq - 1, Ordering::Release))
886            })?;
887        }
888        let seq_end = self.current + saturate_i64(self.size);
889        self.cursor.sequence.store(seq_end, Ordering::Release);
890        Ok(())
891    }
892}
893
// TODO: clarify the docs with explicit 'in another process' statements, e.g. when describing
// the possibility of mutable aliasing
895
/// A batch of events which may be mutably accessed.
///
/// The batch is processed by consuming this value with [`for_each`](EventsMut::for_each),
/// [`try_for_each`](EventsMut::try_for_each) or [`try_commit_each`](EventsMut::try_commit_each).
#[repr(transparent)]
pub struct EventsMut<'a, E>(Batch<'a, E>);
899
900impl<E> EventsMut<'_, E> {
901    /// Returns the size of this batch
902    #[inline]
903    pub fn size(&self) -> usize {
904        self.0.size
905    }
906
907    /// Process a batch of mutable events on the buffer using the closure `f`.
908    ///
909    /// The parameters of `f` are:
910    ///
911    /// - `event: &mut E`, a reference to the buffer element being accessed.
912    /// - `sequence: i64`, the position of this event in the sequence.
913    /// - `batch_end: bool`, indicating whether this is the last event in the requested batch.
914    ///
915    /// # Examples
916    /// ```
917    /// let (mut producer, _) = ansa::spsc(64, || 0);
918    ///
919    /// producer.wait(10).for_each(|event, seq, _| *event = seq);
920    /// ```
921    #[inline]
922    pub fn for_each<F>(self, mut f: F)
923    where
924        F: FnMut(&mut E, i64, bool),
925    {
926        self.0.apply(|ptr, seq, end| {
927            // SAFETY: deref guaranteed safe by ringbuffer initialisation
928            let event: &mut E = unsafe { &mut *ptr };
929            f(event, seq, end)
930        })
931    }
932
933    /// Try to process a batch of mutable events on the buffer using the closure `f`.
934    ///
935    /// If an error occurs, leaves the cursor sequence unchanged and returns the error.
936    ///
937    /// **Important**: does _not_ undo any mutations to events if an error occurs.
938    ///
939    /// The parameters of `f` are:
940    ///
941    /// - `event: &mut E`, a reference to the buffer element being accessed.
942    /// - `sequence: i64`, the position of this event in the sequence.
943    /// - `batch_end: bool`, indicating whether this is the last event in the requested batch.
944    ///
945    /// # Examples
946    /// ```
947    /// let (mut producer, _) = ansa::spsc(64, || 0);
948    ///
949    /// producer.wait(10).try_for_each(|_, seq, _| {
950    ///     match seq {
951    ///         100 => Err(seq),
952    ///         _ => Ok(())
953    ///     }
954    /// })?;
955    /// assert_eq!(producer.sequence(), 9);
956    ///
957    /// let result = producer.wait(10).try_for_each(|_, seq, _| {
958    ///     match seq {
959    ///         15 => Err(seq),
960    ///         _ => Ok(())
961    ///     }
962    /// });
963    /// assert_eq!(result, Err(15));
964    /// // If an error is returned, the cursor will not be moved.
965    /// assert_eq!(producer.sequence(), 9);
966    /// # Ok::<(), i64>(())
967    /// ```
968    #[inline]
969    pub fn try_for_each<F, Err>(self, mut f: F) -> Result<(), Err>
970    where
971        F: FnMut(&mut E, i64, bool) -> Result<(), Err>,
972    {
973        self.0.try_apply(|ptr, seq, end| {
974            // SAFETY: deref guaranteed safe by ringbuffer initialisation
975            let event: &mut E = unsafe { &mut *ptr };
976            f(event, seq, end)
977        })
978    }
979
980    /// Try to process a batch of mutable events on the buffer using the closure `f`.
981    ///
982    /// If an error occurs, returns the error and updates the cursor sequence to the position of
983    /// the last successfully processed event. In effect, commits the successful portion of the batch.
984    ///
985    /// **Important**: does _not_ undo any mutations to events if an error occurs.
986    ///
987    /// The parameters of `f` are:
988    ///
989    /// - `event: &mut E`, a reference to the buffer element being accessed.
990    /// - `sequence: i64`, the position of this event in the sequence.
991    /// - `batch_end: bool`, indicating whether this is the last event in the requested batch.
992    ///
993    /// # Examples
994    /// ```
995    /// let (mut producer, _) = ansa::spsc(64, || 0);
996    ///
997    /// producer.wait(10).try_commit_each(|_, seq, _| {
998    ///     match seq {
999    ///         100 => Err(seq),
1000    ///         _ => Ok(())
1001    ///     }
1002    /// })?;
1003    ///
1004    /// assert_eq!(producer.sequence(), 9);
1005    /// # Ok::<(), i64>(())
1006    /// ```
1007    /// On failure, the cursor will be moved to the last successful sequence.
1008    /// ```
1009    /// let (mut producer, _) = ansa::spsc(64, || 0);
1010    ///
1011    /// let result = producer.wait(10).try_commit_each(|_, seq, _| {
1012    ///     match seq {
1013    ///         5 => Err(seq),
1014    ///         _ => Ok(())
1015    ///     }
1016    /// });
1017    ///
1018    /// assert_eq!(result, Err(5));
1019    /// assert_eq!(producer.sequence(), 4);
1020    /// ```
1021    #[inline]
1022    pub fn try_commit_each<F, Err>(self, mut f: F) -> Result<(), Err>
1023    where
1024        F: FnMut(&mut E, i64, bool) -> Result<(), Err>,
1025    {
1026        self.0.try_commit(|ptr, seq, end| {
1027            // SAFETY: deref guaranteed safe by ringbuffer initialisation
1028            let event: &mut E = unsafe { &mut *ptr };
1029            f(event, seq, end)
1030        })
1031    }
1032}
1033
/// A batch of events which may be immutably accessed.
///
/// The batch is processed by consuming this value with [`for_each`](Events::for_each),
/// [`try_for_each`](Events::try_for_each) or [`try_commit_each`](Events::try_commit_each).
#[repr(transparent)]
pub struct Events<'a, E>(Batch<'a, E>);
1037
1038impl<E> Events<'_, E> {
1039    /// Returns the size of this batch
1040    #[inline]
1041    pub fn size(&self) -> usize {
1042        self.0.size
1043    }
1044
1045    /// Process a batch of immutable events on the buffer using the closure `f`.
1046    ///
1047    /// The parameters of `f` are:
1048    ///
1049    /// - `event: &E`, a reference to the buffer element being accessed.
1050    /// - `sequence: i64`, the position of this event in the sequence.
1051    /// - `batch_end: bool`, indicating whether this is the last event in the requested batch.
1052    ///
1053    /// # Examples
1054    /// ```
1055    /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
1056    ///
1057    /// // move the producer so that events are available to the following consumer
1058    /// producer.wait(20).for_each(|_, _, _| ());
1059    ///
1060    /// consumer.wait(10).for_each(|event, seq, _| println!("{seq}: {event}"));
1061    /// ```
1062    #[inline]
1063    pub fn for_each<F>(self, mut f: F)
1064    where
1065        F: FnMut(&E, i64, bool),
1066    {
1067        self.0.apply(|ptr, seq, end| {
1068            // SAFETY: deref guaranteed safe by ringbuffer initialisation
1069            let event: &E = unsafe { &*ptr };
1070            f(event, seq, end)
1071        })
1072    }
1073
1074    /// Try to process a batch of immutable events on the buffer using the closure `f`.
1075    ///
1076    /// If an error occurs, leaves the cursor sequence unchanged and returns the error.
1077    ///
1078    /// The parameters of `f` are:
1079    ///
1080    /// - `event: &E`, a reference to the buffer element being accessed.
1081    /// - `sequence: i64`, the position of this event in the sequence.
1082    /// - `batch_end: bool`, indicating whether this is the last event in the requested batch.
1083    ///
1084    /// # Examples
1085    /// ```
1086    /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
1087    ///
1088    /// // move the producer so that events are available to the following consumer
1089    /// producer.wait(20).for_each(|_, _, _| ());
1090    ///
1091    /// consumer.wait(10).try_for_each(|_, seq, _| {
1092    ///     match seq {
1093    ///         100 => Err(seq),
1094    ///         _ => Ok(())
1095    ///     }
1096    /// })?;
1097    ///
1098    /// assert_eq!(consumer.sequence(), 9);
1099    /// # Ok::<(), i64>(())
1100    /// ```
1101    /// On failure, the cursor will not be moved.
1102    /// ```
1103    /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
1104    ///
1105    /// // move the producer so that events are available to the following consumer
1106    /// producer.wait(20).for_each(|_, _, _| ());
1107    ///
1108    /// let result = consumer.wait(10).try_for_each(|_, seq, _| {
1109    ///     match seq {
1110    ///         5 => Err(seq),
1111    ///         _ => Ok(())
1112    ///     }
1113    /// });
1114    ///
1115    /// assert_eq!(result, Err(5));
1116    /// // sequence values start at -1, and the first event is at sequence 0
1117    /// assert_eq!(consumer.sequence(), -1);
1118    /// ```
1119    #[inline]
1120    pub fn try_for_each<F, Err>(self, mut f: F) -> Result<(), Err>
1121    where
1122        F: FnMut(&E, i64, bool) -> Result<(), Err>,
1123    {
1124        self.0.try_apply(|ptr, seq, end| {
1125            // SAFETY: deref guaranteed safe by ringbuffer initialisation
1126            let event: &E = unsafe { &*ptr };
1127            f(event, seq, end)
1128        })
1129    }
1130
1131    /// Try to process a batch of immutable events on the buffer using the closure `f`.
1132    ///
1133    /// If an error occurs, returns the error and updates the cursor sequence to the position of
1134    /// the last successfully processed event. In effect, commits the successful portion of the batch.
1135    ///
1136    /// The parameters of `f` are:
1137    ///
1138    /// - `event: &E`, a reference to the buffer element being accessed.
1139    /// - `sequence: i64`, the position of this event in the sequence.
1140    /// - `batch_end: bool`, indicating whether this is the last event in the requested batch.
1141    ///
1142    /// # Examples
1143    /// ```
1144    /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
1145    ///
1146    /// // move the producer so that events are available to the following consumer
1147    /// producer.wait(20).for_each(|_, _, _| ());
1148    ///
1149    /// consumer.wait(10).try_commit_each(|_, seq, _| {
1150    ///     match seq {
1151    ///         100 => Err(seq),
1152    ///         _ => Ok(())
1153    ///     }
1154    /// })?;
1155    ///
1156    /// assert_eq!(consumer.sequence(), 9);
1157    /// # Ok::<(), i64>(())
1158    /// ```
1159    /// On failure, the cursor will be moved to the last successful sequence.
1160    /// ```
1161    /// let (mut producer, mut consumer) = ansa::spsc(64, || 0);
1162    ///
1163    /// // move the producer so that events are available to the following consumer
1164    /// producer.wait(20).for_each(|_, _, _| ());
1165    ///
1166    /// let result = consumer.wait(10).try_commit_each(|_, seq, _| {
1167    ///     match seq {
1168    ///         5 => Err(seq),
1169    ///         _ => Ok(())
1170    ///     }
1171    /// });
1172    ///
1173    /// assert_eq!(result, Err(5));
1174    /// assert_eq!(consumer.sequence(), 4);
1175    /// ```
1176    #[inline]
1177    pub fn try_commit_each<F, Err>(self, mut f: F) -> Result<(), Err>
1178    where
1179        F: FnMut(&E, i64, bool) -> Result<(), Err>,
1180    {
1181        self.0.try_commit(|ptr, seq, end| {
1182            // SAFETY: deref guaranteed safe by ringbuffer initialisation
1183            let event: &E = unsafe { &*ptr };
1184            f(event, seq, end)
1185        })
1186    }
1187}
1188
/// A position in the sequence, held in an atomic so that it can be read and updated through
/// shared references.
#[derive(Debug)]
#[repr(transparent)]
pub(crate) struct Cursor {
    // plain atomic when the `cache-padded` feature is disabled
    #[cfg(not(feature = "cache-padded"))]
    sequence: AtomicI64,
    // the same atomic wrapped in crossbeam's `CachePadded` when the feature is enabled
    #[cfg(feature = "cache-padded")]
    sequence: crossbeam_utils::CachePadded<AtomicI64>,
}

/// Sequence value of a cursor that has not yet moved: one before the first position, `0`.
const CURSOR_START: i64 = -1;

impl Cursor {
    /// Builds a cursor positioned at `seq`.
    pub(crate) const fn new(seq: i64) -> Self {
        #[cfg(not(feature = "cache-padded"))]
        let sequence = AtomicI64::new(seq);
        #[cfg(feature = "cache-padded")]
        let sequence = crossbeam_utils::CachePadded::new(AtomicI64::new(seq));
        Self { sequence }
    }

    /// Builds a cursor at the very beginning of the sequence.
    ///
    /// Every access begins on the position _after_ the cursor, so a fresh cursor sits at `-1`
    /// in order for the first access to land on `0`.
    pub(crate) const fn start() -> Self {
        Self::new(CURSOR_START)
    }
}
1216
1217/// A collection of cursors that determine which sequence is available to a handle.
1218///
1219/// Every handle has a barrier, and no handle may overtake its barrier.
1220#[derive(Clone, Debug)]
1221#[repr(transparent)]
1222pub struct Barrier(Barrier_);
1223
1224#[derive(Clone, Debug)]
1225enum Barrier_ {
1226    One(Arc<Cursor>),
1227    Many(Box<[Arc<Cursor>]>),
1228}
1229
1230impl Barrier {
1231    pub(crate) fn one(cursor: Arc<Cursor>) -> Self {
1232        Barrier(Barrier_::One(cursor))
1233    }
1234
1235    pub(crate) fn many(cursors: Box<[Arc<Cursor>]>) -> Self {
1236        Barrier(Barrier_::Many(cursors))
1237    }
1238
1239    /// The position of the barrier.
1240    ///
1241    /// # Examples
1242    /// ```
1243    /// use ansa::{Barrier, wait::WaitStrategy};
1244    ///
1245    /// struct MyBusyWait;
1246    ///
1247    /// // SAFETY: wait returns once barrier seq >= desired_seq, with barrier seq itself
1248    /// unsafe impl WaitStrategy for MyBusyWait {
1249    ///     fn wait(&self, desired_seq: i64, barrier: &Barrier) -> i64 {
1250    ///         while barrier.sequence() < desired_seq {} // busy-spin
1251    ///         barrier.sequence()
1252    ///     }
1253    /// }
1254    /// ```
1255    #[inline]
1256    pub fn sequence(&self) -> i64 {
1257        match &self.0 {
1258            Barrier_::One(cursor) => cursor.sequence.load(Ordering::Relaxed),
1259            Barrier_::Many(cursors) => cursors.iter().fold(i64::MAX, |seq, cursor| {
1260                seq.min(cursor.sequence.load(Ordering::Relaxed))
1261            }),
1262        }
1263    }
1264}
1265
1266impl Drop for Barrier {
1267    // We need to break the Arc cycle of barriers. Just get rid of all the Arcs to guarantee this.
1268    fn drop(&mut self) {
1269        self.0 = Barrier_::Many(Box::new([]));
1270    }
1271}
1272
/// Converts a `usize` to an `i64`, clamping anything above `i64::MAX` to `i64::MAX`.
///
/// The result is never negative. On targets where `usize` is narrower than 64 bits every input
/// fits in an `i64` and is returned unchanged.
// The casts are intentional: `i64::MAX as usize` only truncates on targets where every `usize`
// already fits in an `i64` (making the comparison always false), and `u as i64` is only reached
// once `u` is known not to exceed `i64::MAX`.
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_possible_wrap)]
#[inline]
const fn saturate_i64(u: usize) -> i64 {
    if u > i64::MAX as usize {
        // clamp rather than mask: masking the top bit would turn `1 << 63` into `0`
        i64::MAX
    } else {
        u as i64
    }
}
1285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::wait::WaitBusy;

    // Check that sizes don't accidentally change. If size change is found and intended, just
    // change the values in this test.
    #[test]
    fn sizes() {
        assert_eq!(size_of::<Consumer<u8, WaitBusy>>(), 40);
        assert_eq!(size_of::<Producer<u8, WaitBusy, true>>(), 40);
        assert_eq!(size_of::<MultiProducer<u8, WaitBusy, true>>(), 48);
    }

    // A lead handle and a following handle that use each other's cursor as their barrier:
    // check the start position and size of the batch each one gets from `wait_range(1..)`.
    #[test]
    fn test_wait_range() {
        let buffer = Arc::new(RingBuffer::from_factory(16, || 0));
        let lead_cursor = Arc::new(Cursor::new(8));
        let follows_cursor = Arc::new(Cursor::new(4));

        let mut lead_handle = HandleInner::<_, _, true>::new(
            Arc::clone(&lead_cursor),
            Barrier::one(Arc::clone(&follows_cursor)),
            Arc::clone(&buffer),
            WaitBusy,
        );

        let mut follows_handle = HandleInner::<_, _, false>::new(
            Arc::clone(&follows_cursor),
            Barrier::one(Arc::clone(&lead_cursor)),
            Arc::clone(&buffer),
            WaitBusy,
        );

        let lead_batch = lead_handle.wait_range(1..);
        assert_eq!(lead_batch.current, 8);
        assert_eq!(lead_batch.size, 12);

        let follows_batch = follows_handle.wait_range(1..);
        assert_eq!(follows_batch.current, 4);
        assert_eq!(follows_batch.size, 4);
    }

    // An empty batch must neither call the closure nor move the cursor.
    #[test]
    fn test_size_zero_apply() {
        let buffer = Arc::new(RingBuffer::from_factory(16, || 0));
        let mut lead_handle = HandleInner::<_, _, true>::new(
            Arc::new(Cursor::new(8)),
            Barrier::one(Arc::new(Cursor::new(4))),
            Arc::clone(&buffer),
            WaitBusy,
        );

        let sequence_before_apply = lead_handle.cursor.sequence.load(Ordering::Relaxed);

        let batch = lead_handle.wait(0);
        assert_eq!(batch.size, 0);

        // fail loudly, with a message that explains itself, if the closure is ever invoked
        batch.apply(|_, _, _| panic!("closure must not be called for an empty batch"));
        // sequence should not have moved and the apply function should not have been called
        let sequence_after_apply = lead_handle.cursor.sequence.load(Ordering::Relaxed);
        assert_eq!(sequence_before_apply, sequence_after_apply);
    }
}