mini_io_queue/
ring.rs

1use crate::cache_padded::CachePadded;
2use core::ops::Range;
3use core::sync::atomic::{AtomicUsize, Ordering};
4
5/// A low-level atomic ring-buffer building block.
6///
7/// This type implements logic for managing the two non-overlapping regions of a ring buffer mapped
8/// to linear storage. This is useful for implementing safe, higher-level queues such as
9/// [`nonblocking`] and [`asyncio`].
10///
11/// `Ring` models two "regions", named the left and right region, of a conceptual fixed-length ring
12/// array. Each region can advance itself, moving data at the start of the region to the end of the
13/// other region. This allows data in each region to be read and written independently, then fed to
14/// the other region without synchronization.
15///
16/// When a region is mapped to a linear array, which has a start and end index, it might end up in
17/// two "chunks" where it overlaps the edge of the array. `Ring` provides access to the indices
18/// of the first chunks in the left and right region, as well as logic to advance each region. It
19/// does not contain the buffer itself.
20///
21/// [`nonblocking`]: crate::nonblocking
22/// [`asyncio`]: crate::asyncio
23#[derive(Debug, Default)]
24pub struct Ring {
25    left: CachePadded<AtomicUsize>,
26    right: CachePadded<AtomicUsize>,
27    capacity: usize,
28}
29
30impl Ring {
31    /// Creates a ring with a specific capacity.
32    ///
33    /// The ring starts with an empty left region and a right range from index 0 to the capacity.
34    pub fn new(capacity: usize) -> Self {
35        Ring {
36            left: CachePadded::new(AtomicUsize::new(0)),
37            right: CachePadded::new(AtomicUsize::new(0)),
38            capacity,
39        }
40    }
41
42    /// Gets the capacity of the ring.
43    #[inline]
44    pub fn capacity(&self) -> usize {
45        self.capacity
46    }
47
48    /// Returns an object that implements [`Display`] with specific formatting. This is useful for
49    /// debugging the stage of the ring.
50    ///
51    /// # Example
52    /// ```
53    /// use mini_io_queue::Ring;
54    ///
55    /// let ring = Ring::new(16);
56    /// ring.advance_right(4);
57    ///
58    /// let s = format!("{}", ring.display(16, 'L', 'R'));
59    /// assert_eq!(s, "LLLLRRRRRRRRRRRR");
60    /// ```
61    #[inline]
62    pub fn display(&self, width: usize, left_char: char, right_char: char) -> Display {
63        Display {
64            ring: self,
65            width,
66            left_char,
67            right_char,
68        }
69    }
70
71    /// Gets the range of indices in both chunks of the left region. Both or one range can be empty.
72    /// The ranges will never overlap.
73    ///
74    /// The function guarantees that the start index of both ranges will stay the same across calls,
75    /// and the end index will only increase, until the range is invalidated.
76    ///
77    /// This function also guarantees that the returned ranges will never overlap with a range
78    /// returned by [`right_ranges`], as long as the ranges are not invalidated.
79    ///
80    /// The ranges are invalidated by advancing the region by calling [`advance_left`] or
81    /// [`advance_left_unchecked`]. Any ranges read before advancing are effectively meaningless
82    /// after advancing. Invalidated ranges must not be used to slice an underlying array, or you
83    /// may end up with overlapping left and right slices.
84    ///
85    /// [`advance_left`]: Ring::advance_left
86    /// [`advance_left_unchecked`]: Ring::advance_left_unchecked
87    /// [`right_ranges`]: Ring::right_ranges
88    pub fn left_ranges(&self) -> (Range<usize>, Range<usize>) {
89        let left = self.left.load(Ordering::Acquire);
90        let right = self.right.load(Ordering::Acquire);
91
92        let left_offset = left % self.capacity;
93        let right_offset = right % self.capacity;
94
95        debug_assert!(left_offset <= self.capacity);
96        debug_assert!(right_offset <= self.capacity);
97
98        // Left is empty if read == write
99        if left == right {
100            return (left_offset..left_offset, left_offset..left_offset);
101        }
102
103        if left_offset >= right_offset {
104            //  V chunk 2      V chunk 1
105            // [LLLRRRRRRRRRRRRLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLL]
106            //     ^-right     ^-left
107
108            (left_offset..self.capacity, 0..right_offset)
109        } else {
110            //     V chunk 1
111            // [RRRLLLLLLLLLLLLRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR]
112            //     ^-left      ^-right
113
114            (left_offset..right_offset, 0..0)
115        }
116    }
117
118    /// Gets the total length of the left region. This will always match the combined lengths of the
119    /// slices returned by [`left_ranges`]. This will always be less than or equal to the ring's
120    /// capacity.
121    ///
122    /// [`left_ranges`]: Ring::left_ranges
123    pub fn left_len(&self) -> usize {
124        let left = self.left.load(Ordering::Acquire);
125        let right = self.right.load(Ordering::Acquire);
126
127        right.wrapping_sub(left) % (self.capacity * 2)
128    }
129
130    /// Advances the left region, conceptually moving `len` elements at the start of the left region
131    /// to the end of the right region and shrinking the left region as a result.
132    ///
133    /// # Panics
134    /// Panics if `len` is larger than the current size of the left region.
135    pub fn advance_left(&self, len: usize) {
136        assert!(
137            len <= self.left_len(),
138            "len was larger than left region length"
139        );
140
141        // Safety: max length is ensured by the assert.
142        unsafe { self.advance_left_unchecked(len) }
143    }
144
145    /// Advances the left region, conceptually moving `len` elements at the start of the left region
146    /// to the end of the right region and shrinking the left region as a result.
147    ///
148    /// # Safety
149    /// `len` must be less than or equal to the length of the left region returned by [`left_len`].
150    ///
151    /// [`left_len`]: Ring::left_len
152    #[inline]
153    pub unsafe fn advance_left_unchecked(&self, len: usize) {
154        self.left
155            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |left| {
156                Some(left.wrapping_add(len) % (self.capacity * 2))
157            })
158            .unwrap();
159    }
160
161    /// Gets the range of indices in the both chunks of the right region. Both or one range can be
162    /// empty. The ranges will never overlap.
163    ///
164    /// The function guarantees that the start index of both ranges will stay the same across calls,
165    /// and the end index will only increase, until the range is invalidated.
166    ///
167    /// This function also guarantees that the returned rangse will never overlap with a range
168    /// returned by [`left_ranges`], as long as the ranges are not invalidated.
169    ///
170    /// The ranges are invalidated by advancing the region by calling [`advance_right`] or
171    /// [`advance_right_unchecked`]. Any ranges read before advancing are effectively meaningless
172    /// after advancing. Invalidated ranges must not be used to slice an underlying array, or you
173    /// may end up with overlapping left and right slices.
174    ///
175    /// [`advance_right`]: Ring::advance_right
176    /// [`advance_right_unchecked`]: Ring::advance_right_unchecked
177    /// [`left_ranges`]: Ring::left_ranges
178    pub fn right_ranges(&self) -> (Range<usize>, Range<usize>) {
179        let left = self.left.load(Ordering::Acquire);
180        let right = self.right.load(Ordering::Acquire);
181
182        let left_size = right.wrapping_sub(left) % (self.capacity * 2);
183
184        let left_offset = left % self.capacity;
185        let right_offset = right % self.capacity;
186
187        debug_assert!(left_offset <= self.capacity);
188        debug_assert!(right_offset <= self.capacity);
189
190        // Right is empty if left_size == capacity
191        if left_size == self.capacity {
192            return (right_offset..right_offset, right_offset..right_offset);
193        }
194
195        if left_offset <= right_offset {
196            //  V chunk 2      V chunk 1
197            // [RRRLLLLLLLLLLLLRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR]
198            //     ^-left      ^-right
199
200            (right_offset..self.capacity, 0..left_offset)
201        } else {
202            //       V chunk 1
203            // [LLLLLRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLL]
204            //       ^-right                               ^-left
205
206            (right_offset..left_offset, 0..0)
207        }
208    }
209
210    /// Gets the total length of the right region. This will always match the combined lengths of
211    /// the slices returned by [`right_ranges`]. This will always be less than or equal to the
212    /// ring's capacity.
213    ///
214    /// [`right_ranges`]: Ring::right_ranges
215    pub fn right_len(&self) -> usize {
216        self.capacity - self.left_len()
217    }
218
219    /// Advances the right region, conceptually moving `len` elements at the start of the right
220    /// region to the end of the left region and shrinking the right region as a result.
221    ///
222    /// # Panics
223    /// Panics if `len` is larger than the current size of the right region.
224    pub fn advance_right(&self, len: usize) {
225        assert!(
226            len <= self.right_len(),
227            "len was larger than right region length"
228        );
229
230        // Safety: max length is ensured by the assert.
231        unsafe { self.advance_right_unchecked(len) }
232    }
233
234    /// Advances the right region, conceptually moving `len` elements at the start of the right
235    /// region to the end of the left region and shrinking the right region as a result.
236    ///
237    /// # Safety
238    /// `len` must be less than or equal to the length of the left region returned by [`right_len`].
239    ///
240    /// [`right_len`]: Ring::right_len
241    #[inline]
242    pub unsafe fn advance_right_unchecked(&self, len: usize) {
243        self.right
244            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |right| {
245                Some(right.wrapping_add(len) % (self.capacity * 2))
246            })
247            .unwrap();
248    }
249}
250
251/// Helper struct for printing formatted rings with [`format!`] and `{}`.
252#[derive(Debug)]
253pub struct Display<'a> {
254    ring: &'a Ring,
255    width: usize,
256    left_char: char,
257    right_char: char,
258}
259
260impl core::fmt::Display for Display<'_> {
261    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
262        let left = self.ring.left.load(Ordering::Acquire);
263        let right = self.ring.right.load(Ordering::Acquire);
264
265        let left_offset = left % self.ring.capacity;
266        let right_offset = right % self.ring.capacity;
267
268        let cap_f32 = self.ring.capacity as f32;
269        let width_f32 = self.width as f32;
270        let left_index = ((left_offset as f32 / cap_f32) * width_f32) as usize;
271        let right_index = ((right_offset as f32 / cap_f32) * width_f32) as usize;
272
273        // left is empty if left == right
274        if left == right {
275            for _ in 0..self.width {
276                write!(f, "{}", self.right_char)?;
277            }
278            return Ok(());
279        }
280
281        let (outer_char, inner_char, inner_start, inner_end) = if left_offset >= right_offset {
282            (self.left_char, self.right_char, right_index, left_index)
283        } else {
284            (self.right_char, self.left_char, left_index, right_index)
285        };
286
287        for _ in 0..inner_start {
288            write!(f, "{outer_char}")?;
289        }
290        for _ in inner_start..inner_end {
291            write!(f, "{inner_char}")?;
292        }
293        for _ in inner_end..self.width {
294            write!(f, "{outer_char}")?;
295        }
296
297        Ok(())
298    }
299}