mini_io_queue/ring.rs
1use crate::cache_padded::CachePadded;
2use core::ops::Range;
3use core::sync::atomic::{AtomicUsize, Ordering};
4
/// A low-level atomic ring-buffer building block.
///
/// This type implements logic for managing the two non-overlapping regions of a ring buffer mapped
/// to linear storage. This is useful for implementing safe, higher-level queues such as
/// [`nonblocking`] and [`asyncio`].
///
/// `Ring` models two "regions", named the left and right region, of a conceptual fixed-length ring
/// array. Each region can advance itself, moving data at the start of the region to the end of the
/// other region. This allows data in each region to be read and written independently, then fed to
/// the other region without synchronization.
///
/// When a region is mapped to a linear array, which has a start and end index, it might end up in
/// two "chunks" where it overlaps the edge of the array. `Ring` provides access to the indices
/// of the first chunks in the left and right region, as well as logic to advance each region. It
/// does not contain the buffer itself.
///
/// Note that the derived [`Default`] implementation produces a ring whose capacity is zero.
///
/// [`nonblocking`]: crate::nonblocking
/// [`asyncio`]: crate::asyncio
#[derive(Debug, Default)]
pub struct Ring {
    /// Cursor for the start of the left region. It is kept reduced modulo `capacity * 2` (rather
    /// than modulo `capacity`) so that an empty left region (`left == right`) can be told apart
    /// from a full one. The buffer index is obtained by reducing it modulo `capacity`.
    left: CachePadded<AtomicUsize>,
    /// Cursor for the start of the right region, which is also the end of the left region. It is
    /// kept reduced modulo `capacity * 2`, like `left`.
    right: CachePadded<AtomicUsize>,
    /// Total number of slots shared between the left and right regions.
    capacity: usize,
}
29
30impl Ring {
31 /// Creates a ring with a specific capacity.
32 ///
33 /// The ring starts with an empty left region and a right range from index 0 to the capacity.
34 pub fn new(capacity: usize) -> Self {
35 Ring {
36 left: CachePadded::new(AtomicUsize::new(0)),
37 right: CachePadded::new(AtomicUsize::new(0)),
38 capacity,
39 }
40 }
41
42 /// Gets the capacity of the ring.
43 #[inline]
44 pub fn capacity(&self) -> usize {
45 self.capacity
46 }
47
48 /// Returns an object that implements [`Display`] with specific formatting. This is useful for
49 /// debugging the stage of the ring.
50 ///
51 /// # Example
52 /// ```
53 /// use mini_io_queue::Ring;
54 ///
55 /// let ring = Ring::new(16);
56 /// ring.advance_right(4);
57 ///
58 /// let s = format!("{}", ring.display(16, 'L', 'R'));
59 /// assert_eq!(s, "LLLLRRRRRRRRRRRR");
60 /// ```
61 #[inline]
62 pub fn display(&self, width: usize, left_char: char, right_char: char) -> Display {
63 Display {
64 ring: self,
65 width,
66 left_char,
67 right_char,
68 }
69 }
70
71 /// Gets the range of indices in both chunks of the left region. Both or one range can be empty.
72 /// The ranges will never overlap.
73 ///
74 /// The function guarantees that the start index of both ranges will stay the same across calls,
75 /// and the end index will only increase, until the range is invalidated.
76 ///
77 /// This function also guarantees that the returned ranges will never overlap with a range
78 /// returned by [`right_ranges`], as long as the ranges are not invalidated.
79 ///
80 /// The ranges are invalidated by advancing the region by calling [`advance_left`] or
81 /// [`advance_left_unchecked`]. Any ranges read before advancing are effectively meaningless
82 /// after advancing. Invalidated ranges must not be used to slice an underlying array, or you
83 /// may end up with overlapping left and right slices.
84 ///
85 /// [`advance_left`]: Ring::advance_left
86 /// [`advance_left_unchecked`]: Ring::advance_left_unchecked
87 /// [`right_ranges`]: Ring::right_ranges
88 pub fn left_ranges(&self) -> (Range<usize>, Range<usize>) {
89 let left = self.left.load(Ordering::Acquire);
90 let right = self.right.load(Ordering::Acquire);
91
92 let left_offset = left % self.capacity;
93 let right_offset = right % self.capacity;
94
95 debug_assert!(left_offset <= self.capacity);
96 debug_assert!(right_offset <= self.capacity);
97
98 // Left is empty if read == write
99 if left == right {
100 return (left_offset..left_offset, left_offset..left_offset);
101 }
102
103 if left_offset >= right_offset {
104 // V chunk 2 V chunk 1
105 // [LLLRRRRRRRRRRRRLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLL]
106 // ^-right ^-left
107
108 (left_offset..self.capacity, 0..right_offset)
109 } else {
110 // V chunk 1
111 // [RRRLLLLLLLLLLLLRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR]
112 // ^-left ^-right
113
114 (left_offset..right_offset, 0..0)
115 }
116 }
117
118 /// Gets the total length of the left region. This will always match the combined lengths of the
119 /// slices returned by [`left_ranges`]. This will always be less than or equal to the ring's
120 /// capacity.
121 ///
122 /// [`left_ranges`]: Ring::left_ranges
123 pub fn left_len(&self) -> usize {
124 let left = self.left.load(Ordering::Acquire);
125 let right = self.right.load(Ordering::Acquire);
126
127 right.wrapping_sub(left) % (self.capacity * 2)
128 }
129
130 /// Advances the left region, conceptually moving `len` elements at the start of the left region
131 /// to the end of the right region and shrinking the left region as a result.
132 ///
133 /// # Panics
134 /// Panics if `len` is larger than the current size of the left region.
135 pub fn advance_left(&self, len: usize) {
136 assert!(
137 len <= self.left_len(),
138 "len was larger than left region length"
139 );
140
141 // Safety: max length is ensured by the assert.
142 unsafe { self.advance_left_unchecked(len) }
143 }
144
145 /// Advances the left region, conceptually moving `len` elements at the start of the left region
146 /// to the end of the right region and shrinking the left region as a result.
147 ///
148 /// # Safety
149 /// `len` must be less than or equal to the length of the left region returned by [`left_len`].
150 ///
151 /// [`left_len`]: Ring::left_len
152 #[inline]
153 pub unsafe fn advance_left_unchecked(&self, len: usize) {
154 self.left
155 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |left| {
156 Some(left.wrapping_add(len) % (self.capacity * 2))
157 })
158 .unwrap();
159 }
160
161 /// Gets the range of indices in the both chunks of the right region. Both or one range can be
162 /// empty. The ranges will never overlap.
163 ///
164 /// The function guarantees that the start index of both ranges will stay the same across calls,
165 /// and the end index will only increase, until the range is invalidated.
166 ///
167 /// This function also guarantees that the returned rangse will never overlap with a range
168 /// returned by [`left_ranges`], as long as the ranges are not invalidated.
169 ///
170 /// The ranges are invalidated by advancing the region by calling [`advance_right`] or
171 /// [`advance_right_unchecked`]. Any ranges read before advancing are effectively meaningless
172 /// after advancing. Invalidated ranges must not be used to slice an underlying array, or you
173 /// may end up with overlapping left and right slices.
174 ///
175 /// [`advance_right`]: Ring::advance_right
176 /// [`advance_right_unchecked`]: Ring::advance_right_unchecked
177 /// [`left_ranges`]: Ring::left_ranges
178 pub fn right_ranges(&self) -> (Range<usize>, Range<usize>) {
179 let left = self.left.load(Ordering::Acquire);
180 let right = self.right.load(Ordering::Acquire);
181
182 let left_size = right.wrapping_sub(left) % (self.capacity * 2);
183
184 let left_offset = left % self.capacity;
185 let right_offset = right % self.capacity;
186
187 debug_assert!(left_offset <= self.capacity);
188 debug_assert!(right_offset <= self.capacity);
189
190 // Right is empty if left_size == capacity
191 if left_size == self.capacity {
192 return (right_offset..right_offset, right_offset..right_offset);
193 }
194
195 if left_offset <= right_offset {
196 // V chunk 2 V chunk 1
197 // [RRRLLLLLLLLLLLLRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR]
198 // ^-left ^-right
199
200 (right_offset..self.capacity, 0..left_offset)
201 } else {
202 // V chunk 1
203 // [LLLLLRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRLLLLLLLLLLLLLLLLLLLLLLLLLLLLLLL]
204 // ^-right ^-left
205
206 (right_offset..left_offset, 0..0)
207 }
208 }
209
210 /// Gets the total length of the right region. This will always match the combined lengths of
211 /// the slices returned by [`right_ranges`]. This will always be less than or equal to the
212 /// ring's capacity.
213 ///
214 /// [`right_ranges`]: Ring::right_ranges
215 pub fn right_len(&self) -> usize {
216 self.capacity - self.left_len()
217 }
218
219 /// Advances the right region, conceptually moving `len` elements at the start of the right
220 /// region to the end of the left region and shrinking the right region as a result.
221 ///
222 /// # Panics
223 /// Panics if `len` is larger than the current size of the right region.
224 pub fn advance_right(&self, len: usize) {
225 assert!(
226 len <= self.right_len(),
227 "len was larger than right region length"
228 );
229
230 // Safety: max length is ensured by the assert.
231 unsafe { self.advance_right_unchecked(len) }
232 }
233
234 /// Advances the right region, conceptually moving `len` elements at the start of the right
235 /// region to the end of the left region and shrinking the right region as a result.
236 ///
237 /// # Safety
238 /// `len` must be less than or equal to the length of the left region returned by [`right_len`].
239 ///
240 /// [`right_len`]: Ring::right_len
241 #[inline]
242 pub unsafe fn advance_right_unchecked(&self, len: usize) {
243 self.right
244 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |right| {
245 Some(right.wrapping_add(len) % (self.capacity * 2))
246 })
247 .unwrap();
248 }
249}
250
/// Helper struct for printing formatted rings with [`format!`] and `{}`.
#[derive(Debug)]
pub struct Display<'a> {
    /// The ring whose cursors are read when formatting.
    ring: &'a Ring,
    /// Number of characters to print.
    width: usize,
    /// Character printed for the part of the output covered by the left region.
    left_char: char,
    /// Character printed for the part of the output covered by the right region.
    right_char: char,
}
259
260impl core::fmt::Display for Display<'_> {
261 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
262 let left = self.ring.left.load(Ordering::Acquire);
263 let right = self.ring.right.load(Ordering::Acquire);
264
265 let left_offset = left % self.ring.capacity;
266 let right_offset = right % self.ring.capacity;
267
268 let cap_f32 = self.ring.capacity as f32;
269 let width_f32 = self.width as f32;
270 let left_index = ((left_offset as f32 / cap_f32) * width_f32) as usize;
271 let right_index = ((right_offset as f32 / cap_f32) * width_f32) as usize;
272
273 // left is empty if left == right
274 if left == right {
275 for _ in 0..self.width {
276 write!(f, "{}", self.right_char)?;
277 }
278 return Ok(());
279 }
280
281 let (outer_char, inner_char, inner_start, inner_end) = if left_offset >= right_offset {
282 (self.left_char, self.right_char, right_index, left_index)
283 } else {
284 (self.right_char, self.left_char, left_index, right_index)
285 };
286
287 for _ in 0..inner_start {
288 write!(f, "{outer_char}")?;
289 }
290 for _ in inner_start..inner_end {
291 write!(f, "{inner_char}")?;
292 }
293 for _ in inner_end..self.width {
294 write!(f, "{outer_char}")?;
295 }
296
297 Ok(())
298 }
299}