disk_chan/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::{path::Path, sync::Arc};
4
5mod atomic_union;
6
7mod disk_chan_page;
8use disk_chan_page::{ChanPage, IdxUsize};
9
10mod disk_chan;
11use disk_chan::DiskChan;
12
13/// Thread-safe Consumer type with a group number representing the consumer group.
14///
15/// Use [try_clone](Self::try_clone()) to make copies of the [Consumer] in the same group or
16/// use [subscribe](Producer::subscribe()) on a producer to create new Consumers.
17///
18/// ```no_run
19/// # async fn consumer_example() -> Result<(), std::io::Error> {
20/// let mut tx = disk_chan::new("foo", 2_u32.pow(24), 1).await?;
21/// let mut rx = tx.subscribe(0).await?;
22/// let mut rx2 = rx.try_clone().await?;
23///
24/// let msg = loop {
25///     match rx.recv().await {
26///         Some(msg) => break msg,
27///         None => rx.next_page().await?,
28///     }
29/// };
30/// # Ok(())
31/// # }
32/// ```
33pub struct Consumer {
34    current_page: usize,
35    group: usize,
36    local: Arc<ChanPage>,
37    chan: Arc<DiskChan>,
38}
39
40impl std::fmt::Debug for Consumer {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.debug_struct("Consumer")
43            .field("current_page", &self.current_page)
44            .field("group", &self.group)
45            .field("chan", &self.chan)
46            .finish_non_exhaustive()
47    }
48}
49
50impl Consumer {
51    /// Attempts to clone the current [Consumer] with the same group.
52    ///
53    /// This can potentially fail to do interactions with the file system, but
54    /// should be near impossible with correct usage.
55    pub async fn try_clone(&self) -> Result<Self, std::io::Error> {
56        let (current_page, local) = self.chan.get_page(self.current_page).await?;
57
58        Ok(Consumer {
59            current_page,
60            group: self.group,
61            local,
62            chan: self.chan.clone(),
63        })
64    }
65
66    /// Asynchronously receives the next message in the consumer group.
67    ///
68    /// Returns [None] when the consumer has read all messages from the current page,
69    /// and the user must call [next_page](Consumer::next_page()) to continue reading.
70    ///
71    /// This is necessary due to the message's lifetime being tied to the consumer's current page so it must
72    /// not be accessed after a call to [next_page](Consumer::next_page()). Thus, it is up to the
73    /// user to ensure the compiler is happy with lifetimes.
74    ///
75    /// ```no_run
76    /// # async fn consumer_example() -> Result<(), std::io::Error> {
77    /// let mut tx = disk_chan::new("foo", 2_u32.pow(24), 1).await?;
78    /// let mut rx = tx.subscribe(0).await?;
79    /// let mut rx2 = rx.try_clone().await?;
80    ///
81    /// let msg = loop {
82    ///     match rx.recv().await {
83    ///         Some(msg) => break msg,
84    ///         None => rx.next_page().await?,
85    ///     }
86    /// };
87    /// # Ok(())
88    /// # }
89    /// ```
90    pub async fn recv(&self) -> Option<&[u8]> {
91        match self.local.pop(self.group).await {
92            Ok(data) => Some(data),
93            Err(_) => None,
94        }
95    }
96
97    /// Sets the internal page to the next available one.
98    ///
99    /// Should be called after [recv](Self::recv()) returns [None].
100    ///
101    /// ```no_run
102    /// # async fn consumer_example() -> Result<(), std::io::Error> {
103    /// let mut tx = disk_chan::new("foo", 2_u32.pow(24), 1).await?;
104    /// let mut rx = tx.subscribe(0).await?;
105    /// let mut rx2 = rx.try_clone().await?;
106    ///
107    /// let msg = loop {
108    ///     match rx.recv().await {
109    ///         Some(msg) => break msg,
110    ///         None => rx.next_page().await?,
111    ///     }
112    /// };
113    /// # Ok(())
114    /// # }
115    /// ```
116    pub async fn next_page(&mut self) -> Result<(), std::io::Error> {
117        let (current_page, local) = self.chan.get_page(self.current_page + 1).await?;
118        self.current_page = current_page;
119        self.local = local;
120        Ok(())
121    }
122}
123
124/// Thread-safe Producer type.
125///
126/// use [disk_chan::new](new) to create a new [Producer] for a channel. This will lock
127/// the channel to the process as only one process should own the channel at any time.
128///
129/// Use [clone](Self::clone()) to make copies of the Producer. Note that
130/// [try_clone](Self::try_clone) also exists but is solely for consistency with the
131/// [Consumer] api and cannot actually fail.
132///
133/// ```no_run
134/// # async fn producer_example() -> Result<(), std::io::Error> {
135/// let mut tx = disk_chan::new("foo", 2_u32.pow(24), 1).await?;
136///
137/// tx.send("test").await?;
138/// # Ok(())
139/// # }
140/// ```
141#[derive(Clone)]
142pub struct Producer {
143    current_page: usize,
144    local: Arc<ChanPage>,
145    chan: Arc<DiskChan>,
146}
147
148impl std::fmt::Debug for Producer {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct("Producer")
151            .field("current_page", &self.current_page)
152            .field("chan", &self.chan)
153            .finish_non_exhaustive()
154    }
155}
156
157/// Creates a new channel and returns the [Producer] for that channel.
158///
159/// Call [Producer::subscribe()] to create a [Consumer] for the channel.
160///
161/// Takes a `page_size` representing the maximum number of bytes per page and
162/// a `max_pages` for the number of pages to have on disk at any moment.
163///
164/// For example,
165/// if you want the channel to hold 4GB of data (~ 2^32 bytes) you can create a channel
166/// with a `page_size` of 2^28 bytes and 16 `max_pages`.
167///
168/// Note that all pages have a maximum number of `2^16 - 1` messages per page so you
169/// want to optimize the `page_size` to be approximately `average message size * (2^16 - 1)`
170/// and adjust `max_pages` to tune the amount of data stored.
171pub async fn new<P: AsRef<Path>>(
172    path: P,
173    page_size: IdxUsize,
174    max_pages: usize,
175) -> Result<Producer, std::io::Error> {
176    let chan = DiskChan::new(path, page_size, max_pages).await?;
177    let chan = Arc::new(chan);
178    let (current_page, local) = chan.get_page(0).await?;
179
180    Ok(Producer {
181        current_page,
182        local,
183        chan,
184    })
185}
186
187impl Producer {
188    /// Clone the [Producer]. This is actually infallible, but exists
189    /// to have consistency with the [Consumer] API.
190    pub async fn try_clone(&self) -> Result<Self, std::io::Error> {
191        Ok(self.clone())
192    }
193
194    /// Creates a new [Consumer] with the given group.
195    ///
196    /// This is thread safe and can be called multiple times with the same group, but it
197    /// is generally recommend to clone existing consumers.
198    pub async fn subscribe(&self, group: usize) -> Result<Consumer, std::io::Error> {
199        let (current_page, local) = self.chan.get_page(0).await?;
200        let chan = self.chan.clone();
201
202        Ok(Consumer {
203            current_page,
204            group,
205            local,
206            chan,
207        })
208    }
209
210    /// Asynchronously sends a message to the channel.
211    ///
212    /// ```no_run
213    /// # async fn producer_example() -> Result<(), std::io::Error> {
214    /// let mut tx = disk_chan::new("foo", 2_u32.pow(24), 1).await?;
215    ///
216    /// tx.send("test").await?;
217    /// # Ok(())
218    /// # }
219    /// ```
220    pub async fn send<V: AsRef<[u8]>>(&mut self, val: V) -> Result<(), std::io::Error> {
221        loop {
222            #[allow(clippy::single_match)]
223            match self.local.push(&val) {
224                Ok(()) => return Ok(()),
225                Err(_) => {}
226            }
227
228            let (current_page, local) = self.chan.get_page(self.current_page + 1).await?;
229            self.current_page = current_page;
230            self.local = local;
231        }
232    }
233}