disk_chan/lib.rs
1#![doc = include_str!("../README.md")]
2
3use std::{path::Path, sync::Arc};
4
5mod atomic_union;
6
7mod disk_chan_page;
8use disk_chan_page::{ChanPage, IdxUsize};
9
10mod disk_chan;
11use disk_chan::DiskChan;
12
13/// Thread-safe Consumer type with a group number representing the consumer group.
14///
15/// Use [try_clone](Self::try_clone()) to make copies of the [Consumer] in the same group or
16/// use [subscribe](Producer::subscribe()) on a producer to create new Consumers.
17///
18/// ```no_run
19/// # async fn consumer_example() -> Result<(), std::io::Error> {
20/// let mut tx = disk_chan::new("foo", 2_u32.pow(24), 1).await?;
21/// let mut rx = tx.subscribe(0).await?;
22/// let mut rx2 = rx.try_clone().await?;
23///
24/// let msg = loop {
25/// match rx.recv().await {
26/// Some(msg) => break msg,
27/// None => rx.next_page().await?,
28/// }
29/// };
30/// # Ok(())
31/// # }
32/// ```
33pub struct Consumer {
34 current_page: usize,
35 group: usize,
36 local: Arc<ChanPage>,
37 chan: Arc<DiskChan>,
38}
39
40impl std::fmt::Debug for Consumer {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("Consumer")
43 .field("current_page", &self.current_page)
44 .field("group", &self.group)
45 .field("chan", &self.chan)
46 .finish_non_exhaustive()
47 }
48}
49
50impl Consumer {
51 /// Attempts to clone the current [Consumer] with the same group.
52 ///
53 /// This can potentially fail to do interactions with the file system, but
54 /// should be near impossible with correct usage.
55 pub async fn try_clone(&self) -> Result<Self, std::io::Error> {
56 let (current_page, local) = self.chan.get_page(self.current_page).await?;
57
58 Ok(Consumer {
59 current_page,
60 group: self.group,
61 local,
62 chan: self.chan.clone(),
63 })
64 }
65
66 /// Asynchronously receives the next message in the consumer group.
67 ///
68 /// Returns [None] when the consumer has read all messages from the current page,
69 /// and the user must call [next_page](Consumer::next_page()) to continue reading.
70 ///
71 /// This is necessary due to the message's lifetime being tied to the consumer's current page so it must
72 /// not be accessed after a call to [next_page](Consumer::next_page()). Thus, it is up to the
73 /// user to ensure the compiler is happy with lifetimes.
74 ///
75 /// ```no_run
76 /// # async fn consumer_example() -> Result<(), std::io::Error> {
77 /// let mut tx = disk_chan::new("foo", 2_u32.pow(24), 1).await?;
78 /// let mut rx = tx.subscribe(0).await?;
79 /// let mut rx2 = rx.try_clone().await?;
80 ///
81 /// let msg = loop {
82 /// match rx.recv().await {
83 /// Some(msg) => break msg,
84 /// None => rx.next_page().await?,
85 /// }
86 /// };
87 /// # Ok(())
88 /// # }
89 /// ```
90 pub async fn recv(&self) -> Option<&[u8]> {
91 match self.local.pop(self.group).await {
92 Ok(data) => Some(data),
93 Err(_) => None,
94 }
95 }
96
97 /// Sets the internal page to the next available one.
98 ///
99 /// Should be called after [recv](Self::recv()) returns [None].
100 ///
101 /// ```no_run
102 /// # async fn consumer_example() -> Result<(), std::io::Error> {
103 /// let mut tx = disk_chan::new("foo", 2_u32.pow(24), 1).await?;
104 /// let mut rx = tx.subscribe(0).await?;
105 /// let mut rx2 = rx.try_clone().await?;
106 ///
107 /// let msg = loop {
108 /// match rx.recv().await {
109 /// Some(msg) => break msg,
110 /// None => rx.next_page().await?,
111 /// }
112 /// };
113 /// # Ok(())
114 /// # }
115 /// ```
116 pub async fn next_page(&mut self) -> Result<(), std::io::Error> {
117 let (current_page, local) = self.chan.get_page(self.current_page + 1).await?;
118 self.current_page = current_page;
119 self.local = local;
120 Ok(())
121 }
122}
123
124/// Thread-safe Producer type.
125///
126/// use [disk_chan::new](new) to create a new [Producer] for a channel. This will lock
127/// the channel to the process as only one process should own the channel at any time.
128///
129/// Use [clone](Self::clone()) to make copies of the Producer. Note that
130/// [try_clone](Self::try_clone) also exists but is solely for consistency with the
131/// [Consumer] api and cannot actually fail.
132///
133/// ```no_run
134/// # async fn producer_example() -> Result<(), std::io::Error> {
135/// let mut tx = disk_chan::new("foo", 2_u32.pow(24), 1).await?;
136///
137/// tx.send("test").await?;
138/// # Ok(())
139/// # }
140/// ```
141#[derive(Clone)]
142pub struct Producer {
143 current_page: usize,
144 local: Arc<ChanPage>,
145 chan: Arc<DiskChan>,
146}
147
148impl std::fmt::Debug for Producer {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct("Producer")
151 .field("current_page", &self.current_page)
152 .field("chan", &self.chan)
153 .finish_non_exhaustive()
154 }
155}
156
157/// Creates a new channel and returns the [Producer] for that channel.
158///
159/// Call [Producer::subscribe()] to create a [Consumer] for the channel.
160///
161/// Takes a `page_size` representing the maximum number of bytes per page and
162/// a `max_pages` for the number of pages to have on disk at any moment.
163///
164/// For example,
165/// if you want the channel to hold 4GB of data (~ 2^32 bytes) you can create a channel
166/// with a `page_size` of 2^28 bytes and 16 `max_pages`.
167///
168/// Note that all pages have a maximum number of `2^16 - 1` messages per page so you
169/// want to optimize the `page_size` to be approximately `average message size * (2^16 - 1)`
170/// and adjust `max_pages` to tune the amount of data stored.
171pub async fn new<P: AsRef<Path>>(
172 path: P,
173 page_size: IdxUsize,
174 max_pages: usize,
175) -> Result<Producer, std::io::Error> {
176 let chan = DiskChan::new(path, page_size, max_pages).await?;
177 let chan = Arc::new(chan);
178 let (current_page, local) = chan.get_page(0).await?;
179
180 Ok(Producer {
181 current_page,
182 local,
183 chan,
184 })
185}
186
187impl Producer {
188 /// Clone the [Producer]. This is actually infallible, but exists
189 /// to have consistency with the [Consumer] API.
190 pub async fn try_clone(&self) -> Result<Self, std::io::Error> {
191 Ok(self.clone())
192 }
193
194 /// Creates a new [Consumer] with the given group.
195 ///
196 /// This is thread safe and can be called multiple times with the same group, but it
197 /// is generally recommend to clone existing consumers.
198 pub async fn subscribe(&self, group: usize) -> Result<Consumer, std::io::Error> {
199 let (current_page, local) = self.chan.get_page(0).await?;
200 let chan = self.chan.clone();
201
202 Ok(Consumer {
203 current_page,
204 group,
205 local,
206 chan,
207 })
208 }
209
210 /// Asynchronously sends a message to the channel.
211 ///
212 /// ```no_run
213 /// # async fn producer_example() -> Result<(), std::io::Error> {
214 /// let mut tx = disk_chan::new("foo", 2_u32.pow(24), 1).await?;
215 ///
216 /// tx.send("test").await?;
217 /// # Ok(())
218 /// # }
219 /// ```
220 pub async fn send<V: AsRef<[u8]>>(&mut self, val: V) -> Result<(), std::io::Error> {
221 loop {
222 #[allow(clippy::single_match)]
223 match self.local.push(&val) {
224 Ok(()) => return Ok(()),
225 Err(_) => {}
226 }
227
228 let (current_page, local) = self.chan.get_page(self.current_page + 1).await?;
229 self.current_page = current_page;
230 self.local = local;
231 }
232 }
233}