async_fifo/fifo/
mod.rs

1//! First-in, first-out shared buffer
2
3use core::sync::atomic::{AtomicUsize, AtomicU32};
4use core::sync::atomic::Ordering::{SeqCst, Relaxed};
5use core::array::from_fn;
6
7use alloc::boxed::Box;
8use alloc::vec::Vec;
9
10use crate::slot::Slot;
11
12#[doc(hidden)]
13pub use storage::TmpArray;
14
15pub use storage::{Storage, InternalStorageApi};
16pub use block_size::{BlockSize, SmallBlockSize, DefaultBlockSize, LargeBlockSize, HugeBlockSize};
17use block_ptr::{BlockPointer, CollectedBlock};
18
19mod block;
20mod block_ptr;
21mod block_size;
22mod storage;
23
/// Number of per-revision visitor counters kept by a [`Fifo`].
///
/// Revisions are mapped onto counters with `rev % REV_CAP`, so a counter is
/// shared by revisions that are `REV_CAP` apart.
const REV_CAP: usize = 8;

/// Growable list of blocks that were collected from the FIFO and are waiting
/// to be recycled.
type RecycleBin<const L: usize, const F: usize, T> = Vec<CollectedBlock<L, F, T>>;
27
/// Atomically replaces the value of `atomic_int` with `new`, but only if it
/// currently holds `old`.
///
/// Returns `true` when the replacement happened and `false` when the stored
/// value was different from `old` (in which case nothing is written).
fn try_swap_int(atomic_int: &AtomicUsize, old: usize, new: usize) -> bool {
    match atomic_int.compare_exchange(old, new, SeqCst, Relaxed) {
        Ok(_previous) => true,
        Err(_actual) => false,
    }
}
31
/// Shared First-In/First-Out Buffer
///
/// # Principle
///
/// The methods on this object work by negotiating how
/// many items can be pushed or pulled based on atomic
/// integer operations.
///
/// When you push N items, a reservation of N slots will be
/// made in the buffer. Once the reservation is made, the items
/// are written into the slots and they are marked as "produced".
///
/// When you try to pull some items from the buffer, first
/// a negotiation takes place based on constraints specified
/// by the `Storage` implementation you provide. If the number
/// of available items meets these constraints, these items are
/// extracted from the buffer.
///
/// # Fifo vs Channel
///
/// Realistically, someone would use channels instead of this.
/// Channels provide distinct producer/consumer handles for one
/// FIFO, which makes code easier to understand.
///
/// # Memory Footprint
///
/// - This structure has a size of 68 bytes.
/// - It has a boxed 24-byte block recycle bin.
/// - The recycle bin has a growable buffer of 16-byte elements.
/// - Each block has a size of `16 + F x (2 + 8 x sz(item))` bytes.
///
/// For basic uses, the overall memory footprint of this structure
/// should be in the hundreds of bytes.
pub struct Fifo<const L: usize, const F: usize, T> {
    // head of the block chain; updated by consumers when they
    // collect fully consumed first blocks
    first_block: BlockPointer<L, F, T>,
    // global index of the next slot to reserve for production;
    // updated by producers when they take slots
    prod_cursor: AtomicUsize,
    // global index of the next slot to reserve for consumption;
    // updated by consumers when they take slots
    cons_cursor: AtomicUsize,
    // current revision; incremented after blocks have been collected
    revision: AtomicU32,
    // shared recycle bin for collected blocks
    recycle_bin: Slot<RecycleBin<L, F, T>>,
    // ring buffer of visitor counts, indexed by `revision % REV_CAP`
    visitors: [AtomicU32; REV_CAP],
}
79
80impl<const L: usize, const F: usize, T> Default for Fifo<L, F, T> {
81    fn default() -> Self {
82        assert_eq!(F * 8, L);
83        let recycle_bin = Box::new(RecycleBin::new());
84
85        Self {
86            first_block: BlockPointer::new(),
87            prod_cursor: AtomicUsize::new(0),
88            cons_cursor: AtomicUsize::new(0),
89            revision: AtomicU32::new(0),
90            recycle_bin: Slot::new(recycle_bin),
91            visitors: from_fn(|_| AtomicU32::new(0)),
92        }
93    }
94}
95
96impl<const L: usize, const F: usize, T> Fifo<L, F, T> {
97    fn init_visit(&self) -> usize {
98        loop {
99            let rev = self.revision.load(SeqCst) as usize;
100            let rev_refcount = &self.visitors[rev % REV_CAP];
101            rev_refcount.fetch_add(1, SeqCst);
102
103            let new_rev = self.revision.load(SeqCst) as usize;
104            match (new_rev - rev) < REV_CAP {
105                true => break rev,
106                // we have written into an already re-used refcount
107                false => _ = rev_refcount.fetch_sub(1, SeqCst),
108            }
109        }
110    }
111
112    fn stop_visit(&self, rev: usize) {
113        self.visitors[rev % REV_CAP].fetch_sub(1, SeqCst);
114    }
115
116    fn try_maintain(&self) {
117        let Some(mut bin) = self.recycle_bin.try_take(false) else {
118            // another thread is already trying to recycle the first block
119            return;
120        };
121
122        let current_rev = self.revision.load(SeqCst) as usize;
123        let oldest_rev = current_rev.saturating_sub(REV_CAP - 1);
124
125        // find the oldest revision that still has at least one visitor
126        let mut oldest_visited_rev = current_rev;
127        for rev in oldest_rev..current_rev {
128            let rc_slot = rev % REV_CAP;
129            if self.visitors[rc_slot].load(SeqCst) != 0 {
130                oldest_visited_rev = rev;
131                break;
132            }
133        }
134
135        let oldest_used_slot = oldest_visited_rev % REV_CAP;
136
137        let next_rev = current_rev + 1;
138        let next_slot = next_rev % REV_CAP;
139
140        // is no one visiting this revision?
141        let can_increment = next_slot != oldest_used_slot;
142
143        let mut i = 0;
144        while i < bin.len() {
145            if bin[i].revision < oldest_visited_rev {
146                // quick, recycle these blocks, before we switch
147                // to that refcount slot
148                let block = bin.remove(i);
149                self.first_block.recycle(block);
150            } else {
151                i += 1;
152            }
153        }
154
155        if can_increment {
156            let mut has_collected = false;
157
158            while let Some(block) = self.first_block.try_collect(current_rev) {
159                bin.push(block);
160                has_collected = true;
161            }
162
163            if has_collected {
164                self.revision.store(next_rev as u32, SeqCst);
165            }
166        }
167
168        assert!(self.recycle_bin.try_insert(bin).is_ok());
169    }
170
171    // visit must be ongoing
172    fn produced(&self) -> usize {
173        let mut is_first_block = true;
174        let mut maybe_block = &self.first_block;
175        let mut total_produced = 0;
176
177        'outer: while let Some(block) = maybe_block.load() {
178            if is_first_block {
179                total_produced = block.offset.load(SeqCst);
180                is_first_block = false;
181            }
182
183            for i in 0..L {
184                match block.is_produced(i) {
185                    false => break 'outer,
186                    true => total_produced += 1,
187                }
188            }
189
190            maybe_block = &block.next;
191        }
192
193        total_produced
194    }
195}
196
// SAFETY: TODO confirm. These impls assert that every field of `Fifo` can be
// moved to and shared between threads; the visible methods only mutate the
// structure through atomics, `BlockPointer` and `Slot`, whose own guarantees
// are defined outside this file.
//
// NOTE(review): there is no bound on `T`. `push` and `pull` take `&self` and
// move `T` values in and out, so a shared `Fifo` can carry a `T` from one
// thread to another even when `T` is not `Send`. Consider requiring
// `T: Send` here (and on the `FifoApi` impl, which needs `Send + Sync`).
unsafe impl<const L: usize, const F: usize, T> Send for Fifo<L, F, T> {}
unsafe impl<const L: usize, const F: usize, T> Sync for Fifo<L, F, T> {}
199
/// Dyn-Compatible subset of [`Fifo`] methods
///
/// All methods take `&self`, so the FIFO can be used through a shared
/// reference or a `dyn FifoApi<T>` trait object.
pub trait FifoApi<T>: Send + Sync {
    /// Inserts every item yielded by `iter` into the FIFO, in order.
    fn push(&self, iter: &mut dyn ExactSizeIterator<Item = T>);
    /// Moves available items into `storage`, within the bounds that
    /// `storage` reports, and returns how many items were moved.
    fn pull(&self, storage: &mut dyn InternalStorageApi<T>) -> usize;
    /// Returns an iterator over items taken from the FIFO.
    fn iter(&self) -> PullIter<'_, T>;
    /// Takes the item at global index `i` out of the FIFO.
    ///
    /// Used by [`PullIter`] to extract the items it has been given.
    #[doc(hidden)]
    fn consume_item(&self, i: usize) -> T;
    /// Called by [`PullIter`] when it is dropped, after its remaining items
    /// have been consumed.
    #[doc(hidden)]
    fn iter_drop(&self);
}
210
211impl<const L: usize, const F: usize, T> FifoApi<T> for Fifo<L, F, T> {
212    /// Inserts all the items from an iterator into the FIFO, atomically
213    ///
214    /// The order of the items is preserved, and they will be inserted
215    /// consecutively; other pushes from other tasks will not interfere.
216    ///
217    /// This method doesn't spin, yield or sleeps; it should complete
218    /// rather immediately. The only think that can take time here is
219    /// an occasional memory allocation.
220    fn push(&self, iter: &mut dyn ExactSizeIterator<Item = T>) {
221        let revision = self.init_visit();
222
223        let mut remaining = iter.len();
224        let mut i = self.prod_cursor.fetch_add(remaining, SeqCst);
225
226        let mut is_first_block = true;
227        let mut block_offset = 0;
228        let mut maybe_block = &self.first_block;
229
230        while remaining > 0 {
231            let Some(block) = maybe_block.load() else {
232                maybe_block.append_new();
233                continue;
234            };
235
236            if is_first_block {
237                block_offset = block.offset.load(SeqCst);
238                is_first_block = false;
239            }
240
241            let next_block_offset = block_offset + L;
242            let block_range = block_offset..next_block_offset;
243
244            // do we have slots here?
245            while block_range.contains(&i) && remaining > 0 {
246                let item = iter.next().unwrap();
247
248                let slot_i = i - block_offset;
249                block.produce(slot_i, item);
250
251                i += 1;
252                remaining -= 1;
253            }
254
255            block_offset = next_block_offset;
256            maybe_block = &block.next;
257        }
258
259        self.stop_visit(revision);
260        self.try_maintain();
261    }
262
263    /// Retrieves some elements from the FIFO, atomically
264    ///
265    /// If the number of available items doesn't meet the constraints
266    /// of the `storage` parameter, this returns zero.
267    ///
268    /// This method doesn't spin, yield or sleeps; it should complete
269    /// rather immediately. The only think that can take time here is
270    /// an occasional memory allocation.
271    fn pull(&self, storage: &mut dyn InternalStorageApi<T>) -> usize {
272        let (min, max) = storage.bounds();
273        let max = max.unwrap_or(usize::MAX);
274        let min = min.unwrap_or(1);
275        let revision = self.init_visit();
276
277        let mut success = false;
278        let mut i = 0;
279        let mut negotiated = 0;
280
281        while !success {
282            let produced = self.produced();
283            i = self.cons_cursor.load(SeqCst);
284            negotiated = match produced.checked_sub(i) {
285                Some(available) => available.min(max),
286                None => continue,
287            };
288
289            if negotiated < min {
290                negotiated = 0;
291                break;
292            }
293
294            success = try_swap_int(&self.cons_cursor, i, i + negotiated);
295        }
296
297        storage.reserve(negotiated);
298        let mut remaining = negotiated;
299        let mut is_first_block = true;
300        let mut block_offset = 0;
301        let mut maybe_block = &self.first_block;
302
303        while remaining > 0 {
304            let Some(block) = maybe_block.load() else {
305                maybe_block.append_new();
306                continue;
307            };
308
309            if is_first_block {
310                block_offset = block.offset.load(SeqCst);
311                is_first_block = false;
312            }
313
314            let next_block_offset = block_offset + L;
315            let block_range = block_offset..next_block_offset;
316
317            // do we have slots here?
318            while block_range.contains(&i) && remaining > 0 {
319                let slot_i = i - block_offset;
320                let item = block.consume(slot_i);
321
322                let storage_index = negotiated - remaining;
323                storage.insert(storage_index, item);
324
325                i += 1;
326                remaining -= 1;
327            }
328
329            block_offset = next_block_offset;
330            maybe_block = &block.next;
331        }
332
333        self.stop_visit(revision);
334        self.try_maintain();
335
336        negotiated
337    }
338
339    fn iter(&self) -> PullIter<'_, T> {
340        let revision = self.init_visit();
341        let mut success = false;
342        let mut i = 0;
343        let mut negotiated = 0;
344
345        while !success {
346            let produced = self.produced();
347            i = self.cons_cursor.load(SeqCst);
348
349            negotiated = match produced.checked_sub(i) {
350                Some(available) => available,
351                None => continue,
352            };
353
354            success = try_swap_int(&self.cons_cursor, i, i + negotiated);
355        }
356
357        self.stop_visit(revision);
358
359        PullIter {
360            fifo: self,
361            i,
362            remaining: negotiated,
363        }
364    }
365
366    fn consume_item(&self, i: usize) -> T {
367        let revision = self.init_visit();
368        let mut is_first_block = true;
369        let mut block_offset = 0;
370        let mut maybe_block = &self.first_block;
371
372        let item = loop {
373            let block = maybe_block.load().unwrap();
374
375            if is_first_block {
376                block_offset = block.offset.load(SeqCst);
377                is_first_block = false;
378            }
379
380            let next_block_offset = block_offset + L;
381            let block_range = block_offset..next_block_offset;
382
383            // do we have slots here?
384            if block_range.contains(&i) {
385                let slot_i = i - block_offset;
386                break block.consume(slot_i);
387            }
388
389            block_offset = next_block_offset;
390            maybe_block = &block.next;
391        };
392
393        self.stop_visit(revision);
394
395        item
396    }
397
    /// Runs a maintenance pass (`try_maintain`); called when a
    /// [`PullIter`] is dropped.
    fn iter_drop(&self) {
        self.try_maintain();
    }
401}
402
/// Iterator over items taken from a FIFO, created by [`FifoApi::iter`].
///
/// Items are extracted lazily, one per call to `next`. Dropping the iterator
/// consumes (and drops) the items that were not yielded.
pub struct PullIter<'a, T> {
    // FIFO the items are extracted from
    fifo: &'a dyn FifoApi<T>,
    // global index of the next item to extract
    i: usize,
    // number of items left to extract
    remaining: usize,
}
408
409impl<'a, T> Iterator for PullIter<'a, T> {
410    type Item = T;
411
412    fn next(&mut self) -> Option<Self::Item> {
413        let next_rem = self.remaining.checked_sub(1)?;
414        let item = self.fifo.consume_item(self.i);
415
416        self.remaining = next_rem;
417        self.i += 1;
418
419        Some(item)
420    }
421
422    fn size_hint(&self) -> (usize, Option<usize>) {
423        (self.remaining, Some(self.remaining))
424    }
425}
426
// `size_hint` returns the exact number of remaining items, so the default
// `len` implementation is accurate.
impl<'a, T> ExactSizeIterator for PullIter<'a, T> {}
428
429impl<'a, T> Drop for PullIter<'a, T> {
430    fn drop(&mut self) {
431        // consume remaining items
432        let _ = self.count();
433        self.fifo.iter_drop();
434    }
435}