use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
mpsc::{Receiver, Sender, channel},
},
thread::{self, JoinHandle},
time::Duration,
};
use crate::{
database::{
CompareAndSwapTransaction, Database, DatabaseError, DbHandle, TransactionError,
apply_cas_tx,
},
trace, warn,
};
type TransactionFn<D> = Box<
dyn FnMut(&mut CompareAndSwapTransaction<D>) -> Result<(), TransactionError> + Send + 'static,
>;
pub struct DatabaseWriter<D: Database> {
sender: Option<Sender<TransactionFn<D>>>,
closed: Arc<AtomicBool>,
thread: Option<JoinHandle<Result<(), DatabaseError>>>,
}
impl<D: Database + 'static> DatabaseWriter<D> {
#[must_use]
pub fn spawn() -> Self {
let (sender, rx) = channel();
let closed = Arc::new(AtomicBool::new(false));
let run_closed = closed.clone();
let thread = thread::spawn(move || {
if let Err(err) = Self::run(rx) {
run_closed.store(true, Ordering::Relaxed);
return Err(err);
}
Ok(())
});
Self {
sender: Some(sender),
closed,
thread: Some(thread),
}
}
pub fn run(rx: Receiver<TransactionFn<D>>) -> Result<(), DatabaseError> {
'run_loop: loop {
let mut queue = Vec::new();
match rx.recv() {
Ok(func) => queue.push(func),
Err(_) => return Ok(()),
}
thread::sleep(Duration::from_millis(250));
while let Ok(func) = rx.try_recv() {
queue.push(func);
}
let database = DbHandle::<D>::open()?;
for _ in 0..10 {
let mut cas_tx = CompareAndSwapTransaction::with_db(database.clone());
for func in &mut queue {
func(&mut cas_tx)?;
}
match apply_cas_tx(cas_tx, false) {
Ok(()) => continue 'run_loop,
Err(TransactionError::CompareAndSwapError) => {
trace!("Transaction ran into a CAS error and is retrying.");
}
Err(err) => return Err(DatabaseError::Transaction(err)),
}
}
return Err(DatabaseError::Transaction(TransactionError::TooManyRetries));
}
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.closed.load(Ordering::Relaxed)
}
pub fn transaction<F>(&self, mut func: F)
where
F: FnMut(&mut CompareAndSwapTransaction<D>) -> Result<(), TransactionError>
+ Send
+ 'static,
{
if self.is_closed() {
return;
}
let boxed: TransactionFn<D> = Box::new(move |cas_tx| func(cas_tx));
if let Some(sender) = &self.sender {
let _ = sender.send(boxed);
}
}
pub fn finish(mut self) -> Result<(), DatabaseError> {
drop(self.sender.take());
self.thread
.take()
.expect("Writer thread was lost?")
.join()
.expect("Writer thread panicked")?;
Ok(())
}
}
impl<D: Database> Drop for DatabaseWriter<D> {
fn drop(&mut self) {
drop(self.sender.take());
if let Some(take) = self.thread.take()
&& let Err(err) = take
.join()
.expect("Writer thread panicked after being dropped")
{
warn!("A database writer was dropped with an error: {err}");
}
}
}