use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
mpsc::{Receiver, Sender, channel},
},
thread::{self, JoinHandle},
time::Duration,
};
use crate::{
database::{CompareAndSwapTransaction, Database, TransactionError, apply_cas_tx},
trace, warn,
};
type TransactionFn<D> = Box<
dyn FnMut(&mut CompareAndSwapTransaction<D>) -> Result<(), TransactionError> + Send + 'static,
>;
pub struct DatabaseWriter<D: Database> {
sender: Option<Sender<TransactionFn<D>>>,
closed: Arc<AtomicBool>,
thread: Option<JoinHandle<Result<(), TransactionError>>>,
}
impl<D: Database + 'static> DatabaseWriter<D> {
pub fn spawn() -> Self {
let (sender, rx) = channel();
let closed = Arc::new(AtomicBool::new(false));
let run_closed = closed.clone();
let thread = thread::spawn(move || {
if let Err(err) = Self::run(rx) {
run_closed.store(true, Ordering::Relaxed);
return Err(err);
}
Ok(())
});
Self {
sender: Some(sender),
closed,
thread: Some(thread),
}
}
pub fn run(rx: Receiver<TransactionFn<D>>) -> Result<(), TransactionError> {
'run_loop: loop {
let mut queue = Vec::new();
match rx.recv() {
Ok(func) => queue.push(func),
Err(_) => return Ok(()),
}
thread::sleep(Duration::from_millis(250));
loop {
match rx.try_recv() {
Ok(func) => {
queue.push(func);
}
Err(_) => break,
}
}
for _ in 0..10 {
let mut cas_tx = CompareAndSwapTransaction::new()?;
for func in &mut queue {
func(&mut cas_tx)?;
}
match apply_cas_tx(cas_tx, false) {
Ok(()) => continue 'run_loop,
Err(TransactionError::CompareAndSwapError) => {
trace!("Transaction ran into a CAS error and is retrying.");
}
Err(err) => return Err(err.into()),
}
}
return Err(TransactionError::TooManyRetries);
}
}
pub fn is_closed(&self) -> bool {
self.closed.load(Ordering::Relaxed)
}
pub fn transaction<F>(&self, mut func: F)
where
F: FnMut(&mut CompareAndSwapTransaction<D>) -> Result<(), TransactionError>
+ Send
+ 'static,
{
if self.is_closed() {
return;
}
let boxed: TransactionFn<D> = Box::new(move |cas_tx| func(cas_tx));
if let Some(sender) = &self.sender {
let _ = sender.send(boxed);
}
}
pub fn finish(mut self) -> Result<(), TransactionError> {
drop(self.sender.take());
self.thread
.take()
.expect("Writer thread was lost?")
.join()
.expect("Writer thread panicked")?;
Ok(())
}
}
impl<D: Database> Drop for DatabaseWriter<D> {
fn drop(&mut self) {
drop(self.sender.take());
if let Some(take) = self.thread.take() {
if let Err(err) = take
.join()
.expect("Writer thread panicked after being dropped")
{
warn!("A database writer was dropped with an error: {err}")
}
}
}
}