// disk_chan/lib.rs
#![doc = include_str!("../README.md")]
use std::{path::Path, sync::Arc};
mod atomic_union;
mod disk_chan_page;
use disk_chan_page::*;
mod disk_chan;
use disk_chan::DiskChan;
/// Thread-safe Consumer type with a group number representing the consumer group.
///
/// Use [try_clone](Self::try_clone()) to make copies of the [Consumer] in the same group or
/// use [subscribe](Producer::subscribe()) on a producer to create new Consumers.
///
/// Messages are read with [recv](Self::recv()); once it returns [None], call
/// [next_page](Self::next_page()) before receiving again.
///
/// ```no_run
/// # async fn consumer_example() -> Result<(), std::io::Error> {
/// let mut tx = disk_chan::new("foo", 2_usize.pow(24), 1).await?;
/// let mut rx = tx.subscribe(0).await?;
/// let mut rx2 = rx.try_clone().await?;
///
/// let msg = loop {
///     match rx.recv().await {
///         Some(msg) => break msg,
///         None => rx.next_page().await?,
///     }
/// };
/// # Ok(())
/// # }
/// ```
pub struct Consumer {
// Page index reported by the channel for the page held in `local`.
current_page: usize,
// Consumer group handed to the page on every `recv`.
group: usize,
// Page that `recv` currently reads from.
local: Arc<ChanPage>,
// Shared channel used to look up pages by index.
chan: Arc<DiskChan>,
}
impl std::fmt::Debug for Consumer {
    /// Writes a diagnostic representation of the consumer.
    ///
    /// Shows `current_page`, `group` and `chan`. The `local` page handle is left
    /// out, so the output is marked as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            current_page,
            group,
            chan,
            ..
        } = self;

        let mut out = f.debug_struct("Consumer");
        out.field("current_page", current_page);
        out.field("group", group);
        out.field("chan", chan);
        out.finish_non_exhaustive()
    }
}
impl Consumer {
/// Attempts to clone the current [Consumer] with the same group.
///
/// This can potentially fail to do interactions with the file system, but
/// should be near impossible with correct usage.
pub async fn try_clone(&self) -> Result<Self, std::io::Error> {
let (current_page, local) = self.chan.get_page(self.current_page).await?;
Ok(Consumer {
current_page,
group: self.group,
local,
chan: self.chan.clone(),
})
}
/// Asynchronously receives the next message in the consumer group.
///
/// Returns [None] when the consumer has read all messages from the current page,
/// and the user must call [next_page](Consumer::next_page()) to continue reading.
///
/// This is necessary due to the message's lifetime being tied to the consumer's current page so it must
/// not be accessed after a call to [next_page](Consumer::next_page()). Thus, it is up to the
/// user to ensure the compiler is happy with lifetimes.
///
/// ```no_run
/// # async fn consumer_example() -> Result<(), std::io::Error> {
/// let mut tx = disk_chan::new("foo", 2_usize.pow(24), 1).await?;
/// let mut rx = tx.subscribe(0).await?;
/// let mut rx2 = rx.try_clone().await?;
///
/// let msg = loop {
/// match rx.recv().await {
/// Some(msg) => break msg,
/// None => rx.next_page().await?,
/// }
/// };
/// # Ok(())
/// # }
/// ```
pub async fn recv(&self) -> Option<&[u8]> {
match self.local.pop(self.group).await {
Ok(data) => Some(data),
Err(_) => None,
}
}
/// Sets the internal page to the next available one.
///
/// Should be called after [recv](Self::recv()) returns [None].
///
/// ```no_run
/// # async fn consumer_example() -> Result<(), std::io::Error> {
/// let mut tx = disk_chan::new("foo", 2_usize.pow(24), 1).await?;
/// let mut rx = tx.subscribe(0).await?;
/// let mut rx2 = rx.try_clone().await?;
///
/// let msg = loop {
/// match rx.recv().await {
/// Some(msg) => break msg,
/// None => rx.next_page().await?,
/// }
/// };
/// # Ok(())
/// # }
/// ```
pub async fn next_page(&mut self) -> Result<(), std::io::Error> {
let (current_page, local) = self.chan.get_page(self.current_page + 1).await?;
self.current_page = current_page;
self.local = local;
Ok(())
}
}
/// Thread-safe Producer type.
///
/// Use [disk_chan::new](new) to create a new [Producer] for a channel. This will lock
/// the channel to the process as only one process should own the channel at any time.
///
/// Use [clone](Self::clone()) to make copies of the Producer. Note that
/// [try_clone](Self::try_clone) also exists but is solely for consistency with the
/// [Consumer] API and cannot actually fail.
///
/// ```no_run
/// # async fn producer_example() -> Result<(), std::io::Error> {
/// let mut tx = disk_chan::new("foo", 2_usize.pow(24), 1).await?;
///
/// tx.send("test").await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct Producer {
// Page index reported by the channel for the page held in `local`.
current_page: usize,
// Page that `send` currently pushes into.
local: Arc<ChanPage>,
// Shared channel used to look up pages by index and to create consumers.
chan: Arc<DiskChan>,
}
impl std::fmt::Debug for Producer {
    /// Writes a diagnostic representation of the producer.
    ///
    /// Shows `current_page` and `chan`. The `local` page handle is left out, so
    /// the output is marked as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            current_page, chan, ..
        } = self;

        let mut out = f.debug_struct("Producer");
        out.field("current_page", current_page);
        out.field("chan", chan);
        out.finish_non_exhaustive()
    }
}
/// Creates a new channel and returns the [Producer] for that channel.
///
/// Call [Producer::subscribe()] to create a [Consumer] for the channel.
///
/// Takes a `page_size` representing the maximum number of bytes per page and
/// a `max_pages` for the number of pages to have on disk at any moment.
///
/// For example,
/// if you want the channel to hold 4GB of data (~ 2^32 bytes) you can create a channel
/// with a `page_size` of 2^28 bytes and 16 `max_pages`.
///
/// Note that all pages have a maximum number of `2^16 - 1` messages per page so you
/// want to optimize the `page_size` to be approximately `average message size * (2^16 - 1)`
/// and adjust `max_pages` to tune the amount of data stored.
pub async fn new<P: AsRef<Path>>(
path: P,
page_size: usize,
max_pages: usize,
) -> Result<Producer, std::io::Error> {
let chan = DiskChan::new(path, page_size, max_pages).await?;
let chan = Arc::new(chan);
let (current_page, local) = chan.get_page(0).await?;
Ok(Producer {
current_page,
local,
chan,
})
}
impl Producer {
/// Clone the [Producer]. This is actually infallible, but exists
/// to have consistency with the [Consumer] API.
pub async fn try_clone(&self) -> Result<Self, std::io::Error> {
Ok(self.clone())
}
/// Creates a new [Consumer] with the given group.
///
/// This is thread safe and can be called multiple times with the same group, but it
/// is generally recommend to clone existing consumers.
pub async fn subscribe(&self, group: usize) -> Result<Consumer, std::io::Error> {
let (current_page, local) = self.chan.get_page(0).await?;
let chan = self.chan.clone();
Ok(Consumer {
current_page,
group,
local,
chan,
})
}
/// Asynchronously sends a message to the channel.
///
/// ```no_run
/// # async fn producer_example() -> Result<(), std::io::Error> {
/// let mut tx = disk_chan::new("foo", 2_usize.pow(24), 1).await?;
///
/// tx.send("test").await?;
/// # Ok(())
/// # }
/// ```
pub async fn send<V: AsRef<[u8]>>(&mut self, val: V) -> Result<(), std::io::Error> {
loop {
match self.local.push(&val) {
Ok(()) => return Ok(()),
Err(_) => {}
}
let (current_page, local) = self.chan.get_page(self.current_page + 1).await?;
self.current_page = current_page;
self.local = local;
}
}
}