async_fifo/fifo/
api.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use core::array::from_fn;
use core::task::Waker;

use crate::slot::AtomicSlot;
use alloc::boxed::Box;
use alloc::sync::Arc;
use alloc::vec::Vec;

use super::block::{Fifo, FifoImpl};
use super::{Storage, TmpArray};

/// Builds a Fifo with a caller-chosen block size and returns its handles.
///
/// - `C` is the number of [`Consumer`] handles to create.
/// - `L` is the block size.
/// - `F` must be the block size divided by 8.
///
/// The result is one [`Producer`] plus an array of `C` consumers, all sharing
/// the same underlying Fifo. Each consumer gets its own waker slot, identified
/// by its position in the returned array.
///
/// # Panics
///
/// Panics unless `F * 8 == L`.
pub fn with_block_size<
    const C: usize,
    const L: usize,
    const F: usize,
    T: 'static,
>() -> (Producer<T>, [Consumer<T>; C]) {
    assert_eq!(F * 8, L);

    // One empty waker slot per consumer.
    let slots: Vec<_> = (0..C).map(|_| AtomicSlot::default()).collect();

    let fifo: Fifo<L, F, T> = Fifo::new(slots.into());
    let shared = Arc::new(fifo);

    let producer = Producer {
        fifo: shared.clone(),
    };

    (
        producer,
        // `from_fn` passes each array position, which becomes the slot index.
        from_fn(|waker_index| Consumer {
            fifo: shared.clone(),
            waker_index,
        }),
    )
}

/// Creates a Fifo with the default parameters: a block size of 32 (`L = 32`,
/// `F = 4`).
///
/// Returns the same handles as [`with_block_size`]: one [`Producer`] and an
/// array of `C` [`Consumer`]s.
pub fn new<const C: usize, T: 'static>() -> (Producer<T>, [Consumer<T>; C]) {
    const BLOCK_SIZE: usize = 32;
    // `with_block_size` requires its third parameter to be the block size / 8.
    const BLOCK_SIZE_DIV_8: usize = BLOCK_SIZE / 8;

    with_block_size::<C, BLOCK_SIZE, BLOCK_SIZE_DIV_8, T>()
}

/// Fifo Production Handle (implements `Clone`)
///
/// Cloning is available for every `T`: a clone only duplicates the shared
/// pointer to the Fifo and never clones an item.
pub struct Producer<T> {
    /// Shared handle to the Fifo implementation items are sent into.
    fifo: Arc<dyn FifoImpl<T>>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `T: Clone` bound to the impl, although cloning the handle never clones a `T`.
impl<T> Clone for Producer<T> {
    fn clone(&self) -> Self {
        Self {
            fifo: Arc::clone(&self.fifo),
        }
    }
}

impl<T> Producer<T> {
    /// Pushes every item yielded by `iter` into the channel as one atomic batch.
    ///
    /// Never blocks: the call always succeeds immediately.
    pub fn send_iter<I>(&self, mut iter: I)
    where
        I: ExactSizeIterator<Item = T>,
    {
        self.fifo.send_iter(&mut iter);
    }

    /// Pushes a single item into the channel.
    ///
    /// Never blocks: the call always succeeds immediately.
    pub fn send(&self, item: T) {
        // A one-element batch; `Once` is an `ExactSizeIterator`.
        let mut single = core::iter::once(item);
        self.fifo.send_iter(&mut single);
    }
}

/// Fifo Consumption Handle
///
/// Each consumer holds a shared handle to the Fifo together with the index of
/// the waker slot it uses when inserting or taking a waker.
pub struct Consumer<T> {
    /// Shared handle to the Fifo implementation items are received from.
    fifo: Arc<dyn FifoImpl<T>>,
    /// Index passed to the Fifo's `insert_waker` / `take_waker` calls.
    waker_index: usize,
}

// SAFETY: these handles move values of type `T` between threads (sent through
// a `Producer` on one thread, received through a `Consumer` on another), so
// the impls are only sound when `T: Send`. Without that bound a non-`Send`
// item such as `Rc<_>` could cross threads from safe code.
// NOTE(review): the impls also assume the `FifoImpl` implementation
// synchronizes concurrent access internally — confirm against `super::block`.
unsafe impl<T: Send> Send for Producer<T> {}
unsafe impl<T: Send> Sync for Producer<T> {}

unsafe impl<T: Send> Send for Consumer<T> {}
unsafe impl<T: Send> Sync for Consumer<T> {}

impl<T> Consumer<T> {
    /// Attempts to receive items into caller-provided `storage`.
    ///
    /// Returns the `usize` reported by the Fifo; `try_recv_exact` treats it
    /// as the number of items received.
    pub fn try_recv_into(&self, storage: &mut dyn Storage<T>) -> usize {
        let Self { fifo, .. } = self;
        fifo.try_recv(storage)
    }

    /// Attempts to receive as many items as possible, collected into a vector.
    pub fn try_recv_many(&self) -> Vec<T> {
        let mut received = Vec::new();
        let _count = self.try_recv_into(&mut received);
        received
    }

    /// Attempts to receive exactly `N` items as an array.
    ///
    /// Returns `None` when the number of received items is not `N`.
    pub fn try_recv_exact<const N: usize>(&self) -> Option<[T; N]> {
        // Every slot starts empty; the Fifo fills them through `Storage`.
        let mut slots = TmpArray(from_fn(|_| None));

        if self.try_recv_into(&mut slots) == N {
            Some(slots.0.map(Option::unwrap))
        } else {
            None
        }
    }

    /// Attempts to receive a single item.
    pub fn try_recv(&self) -> Option<T> {
        let [item] = self.try_recv_exact::<1>()?;
        Some(item)
    }

    /// Stores the current task's waker in this consumer's slot, so the task
    /// is woken up when new items are available.
    pub fn insert_waker(&self, waker: Box<Waker>) {
        let Self { fifo, waker_index } = self;
        fifo.insert_waker(waker, *waker_index);
    }

    /// Attempts to take back a waker previously stored in this consumer's slot.
    pub fn take_waker(&self) -> Option<Box<Waker>> {
        let slot = self.waker_index;
        self.fifo.take_waker(slot)
    }
}