use crate::Storage;
use std::{fmt::Formatter, future::Future};
use tokio::{
sync::{mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, oneshot},
task::JoinHandle,
};
/// Handle to a background actor task that owns a [`Storage`] and processes
/// commands sent to it over an unbounded channel.
pub struct Recorder<T>
where
    T: Storage,
{
    /// Handle of the spawned actor task; it resolves to the storage once the
    /// actor's command loop has finished.
    join_handle: JoinHandle<T>,
    /// Sending half of the command channel consumed by the actor.
    sender: UnboundedSender<Command<T>>,
}
impl<T> Recorder<T>
where
    T: Storage + 'static + Send,
    T::Record: Send,
    T::Query: Send,
{
    /// Creates a recorder backed by `storage`.
    ///
    /// The storage is moved into an actor that is spawned onto the tokio
    /// runtime right away, so this must be called from within a runtime.
    pub fn new(storage: T) -> Self {
        let (sender, commands) = unbounded_channel();
        let join_handle = tokio::spawn(Actor::new(storage, commands).run());
        Self {
            join_handle,
            sender,
        }
    }

    /// Creates a recorder whose storage is produced by the `storage` future.
    ///
    /// The future is awaited inside the spawned task. Commands can be sent to
    /// the returned recorder immediately; they wait in the channel until the
    /// storage is available and the actor starts receiving.
    pub fn from_delayed_storage(storage: impl Future<Output = T> + Send + 'static) -> Self {
        let (sender, commands) = unbounded_channel();
        let join_handle = tokio::spawn(async move {
            let ready_storage = storage.await;
            Actor::new(ready_storage, commands).run().await
        });
        Self {
            join_handle,
            sender,
        }
    }

    /// Queues `record` to be saved by the actor.
    ///
    /// This only enqueues the command; it does not wait for the storage to
    /// perform the save.
    ///
    /// # Panics
    ///
    /// Panics if the actor's receiving half of the channel has been closed.
    pub fn save(&self, record: T::Record) {
        let outcome = self.sender.send(Command::Save(record));
        outcome.expect("Receiver must not be closed.")
    }

    /// Shuts the recorder down and hands back the underlying storage.
    ///
    /// The sender is dropped first so the actor's receive loop can end, then
    /// the actor task is awaited.
    ///
    /// # Panics
    ///
    /// Panics if joining the actor task fails.
    pub async fn close(self) -> T {
        let Self {
            join_handle,
            sender,
        } = self;
        drop(sender);
        join_handle
            .await
            .expect("Recorder actor thread must always be able to join")
    }

    /// Asks the actor to load the records matching `query` and waits for the
    /// reply.
    ///
    /// # Panics
    ///
    /// Panics if the command cannot be sent because the actor's receiver is
    /// closed, or if the actor drops the reply channel without answering.
    pub async fn records(&self, query: T::Query) -> Vec<T::Record> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = Command::Load(reply_tx, query);
        self.sender
            .send(request)
            .expect("Receiver must not be closed");
        reply_rx.await.expect("The sender must not be dropped")
    }
}
/// Owns the storage and the receiving end of the command channel; driven by
/// [`Actor::run`].
struct Actor<T>
where
    T: Storage,
{
    /// Storage that save and load commands are executed against.
    storage: T,
    /// Receiving half of the command channel.
    receiver: UnboundedReceiver<Command<T>>,
}
impl<T> Actor<T>
where
    T: Storage,
{
    /// Bundles `storage` with the `receiver` it will take commands from.
    pub fn new(storage: T, receiver: UnboundedReceiver<Command<T>>) -> Self {
        Self { storage, receiver }
    }

    /// Processes commands until the channel yields no more, then returns the
    /// storage.
    ///
    /// `Save` commands that are already queued back-to-back are collected into
    /// one batch and passed to a single `storage.save` call. A non-`Save`
    /// command encountered while collecting is set aside and handled right
    /// after the batch has been saved, so command order is preserved.
    pub async fn run(mut self) -> T {
        let mut batch = Vec::new();
        // Command pulled off the channel while batching that still needs handling.
        let mut pending: Option<Command<T>> = None;

        loop {
            // Prefer the set-aside command; otherwise wait for the next one.
            let command = match pending.take() {
                Some(held) => held,
                None => match self.receiver.recv().await {
                    Some(received) => received,
                    None => break,
                },
            };

            match command {
                Command::Save(first) => {
                    batch.push(first);
                    // Drain whatever is immediately available without awaiting.
                    while let Ok(queued) = self.receiver.try_recv() {
                        match queued {
                            Command::Save(record) => batch.push(record),
                            other => {
                                pending = Some(other);
                                break;
                            }
                        }
                    }
                    self.storage.save(&mut batch).await;
                    batch.clear();
                }
                Command::Load(reply, query) => {
                    let records = self.storage.load(query).await;
                    // The requester may have stopped waiting; that is not an error here.
                    let _ = reply.send(records);
                }
            }
        }

        self.storage
    }
}
/// Messages sent from a `Recorder` to its actor.
enum Command<T>
where
    T: Storage,
{
    /// Save the given record.
    Save(T::Record),
    /// Load the records matching the query and send them back through the
    /// one-shot sender.
    Load(oneshot::Sender<Vec<T::Record>>, T::Query),
}
/// Prints only the variant name, so no `Debug` bound is needed on the record,
/// query, or reply-channel payloads.
impl<T> std::fmt::Debug for Command<T>
where
    T: Storage,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            Command::Save(_) => "Save",
            Command::Load(..) => "Load",
        };
        f.write_str(variant)
    }
}