1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
use crate::Storage;
use std::fmt::Formatter;
use tokio::{
sync::{mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, oneshot},
task::JoinHandle,
};
/// Persists records in the background.
///
/// Use this instead of calling your persistence backend directly when the handler which created
/// a record should not have to wait until that record is persisted. To achieve this, `Recorder`
/// spawns an actor to which every record is handed over immediately. The actor then uses the
/// [`Storage`] trait to talk to your persistence backend.
///
/// A `Recorder` owns both the actor and the spawned task the actor is running in.
pub struct Recorder<T>
where
    T: Storage,
{
    /// Handle of the spawned actor task. Awaiting it in `close` waits for the actor to finish and
    /// yields the storage it owned.
    join_handle: JoinHandle<T>,
    /// Sending half of the command channel. It is unbounded so that synchronous code can hand
    /// records to the asynchronous actor without waiting for the persistence backend to catch up.
    sender: UnboundedSender<Command<T::Record>>,
}
impl<T> Recorder<T>
where
    T: Storage + 'static + Send,
    T::Record: Send,
{
    /// Creates a recorder which persists its records using `storage`.
    ///
    /// The actor owning `storage` is started with [`tokio::spawn`]; the command channel between
    /// the recorder and the actor is created here as well.
    pub async fn new(storage: T) -> Self {
        let (sender, receiver) = unbounded_channel();
        Self {
            join_handle: tokio::spawn(Actor::new(storage, receiver).run()),
            sender,
        }
    }

    /// Hands `record` to the internal actor for storage.
    ///
    /// This is fire and forget: the record is only placed in the channel for the actor to pick
    /// up, nobody waits for it to actually be persisted. That is why this method is synchronous
    /// and does not block.
    ///
    /// # Panics
    ///
    /// Panics if the receiving end of the channel (held by the actor) is already closed.
    pub fn save(&self, record: T::Record) {
        let command = Command::Save(record);
        self.sender
            .send(command)
            .expect("Receiver must not be closed.")
    }

    /// Stops accepting new records and persists the ones sent so far.
    ///
    /// Gives back ownership of the underlying storage.
    ///
    /// # Panics
    ///
    /// Panics if the actor task cannot be joined.
    pub async fn close(self) -> T {
        let Self {
            join_handle,
            sender,
        } = self;
        // Dropping the sender closes the channel. Once the actor has worked through the commands
        // still queued, its receive call yields `None` and `Actor::run` returns.
        drop(sender);
        // The actor now knows it should terminate, so wait for it and take the storage back.
        join_handle
            .await
            .expect("Recorder actor thread must always be able to join")
    }

    /// All the records held by the internal storage.
    ///
    /// Sends a load request to the actor and waits for its answer.
    ///
    /// # Panics
    ///
    /// Panics if the actor is no longer receiving commands, or if it drops the reply channel
    /// without answering.
    pub async fn records(&self) -> Vec<T::Record> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = Command::Load(reply_tx);
        self.sender
            .send(request)
            .expect("Receiver must not be closed");
        reply_rx.await.expect("The sender must not be dropped")
    }
}
/// Spawned as an asynchronous task by [`Recorder`] in order to persist records.
struct Actor<T>
where
    T: Storage,
{
    /// Persistence backend; handed back to the caller once `run` finishes.
    storage: T,
    /// Receiving half of the command channel fed by the [`Recorder`].
    receiver: UnboundedReceiver<Command<T::Record>>,
}
impl<T> Actor<T>
where
T: Storage,
{
pub fn new(storage: T, receiver: UnboundedReceiver<Command<T::Record>>) -> Self {
Self { storage, receiver }
}
pub async fn run(mut self) -> T {
// If messages come in fast, we do not send them one by one, but rather collect all since
// the last call to save in one bulk;
let mut bulk = Vec::new();
let mut current = self.receiver.recv().await;
while let Some(command) = current.take() {
let next = match command {
Command::Save(record) => {
bulk.push(record);
// Push all immediatly available records into the next bulk, until it would
// block again, or we would have to serve a load command.
let next = loop {
match self.receiver.try_recv() {
Ok(Command::Save(record)) => bulk.push(record),
Ok(other) => break Some(other),
Err(_) => break None,
}
};
self.storage.save(&mut bulk).await;
bulk.clear();
next
},
Command::Load(sender) => {
// Fetch records ...
let records = self.storage.load().await;
// ... and answer sender. This might fail, but if the sender is dropped and
// stopped, caring, so do we. Let's drop the result.
let _ = sender.send(records);
// We did not peek ahead, so we do not know the next command.
None
},
};
// Use next or wait for next event
current = if next.is_none() {
// Wait for the next event, can block. If none this means recorder has been dropped
// and we terminate this loop.
self.receiver.recv().await
} else {
// We already know the next event to process, since we had to peek ahead.
next
};
}
self.storage
}
}
/// Message sent from the recorder to the actor. A dedicated type allows for a custom `Debug`
/// implementation, lifting the limitation that `T` has to be `Debug`.
enum Command<T> {
/// Save record `T` to the storage backend.
Save(T),
/// Load all records from the storage. Use the sender to return them back to the caller.
Load(oneshot::Sender<Vec<T>>),
}
/// Hand-written `Debug` for [`Command`], which does not rely on the record type `T` being `Debug`
/// itself. Only the name of the variant is printed, never its payload.
impl<T> std::fmt::Debug for Command<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            Command::Save(_) => "Save",
            Command::Load(_) => "Load",
        };
        f.write_str(variant)
    }
}