use std::sync::{
LazyLock,
mpsc::{Receiver, Sender, channel},
};
use lunar_lib::{trace, warn};
use sled::transaction::TransactionError;
use crate::database::{CompareAndSwapTransaction, DatabaseError, apply_cas_tx};
/// Boxed transaction body handed to the writer thread.
///
/// The closure receives a fresh [`CompareAndSwapTransaction`] to fill in and
/// reports failure through [`DatabaseError`]. It is `FnMut` because the writer
/// may invoke it more than once when an attempt has to be retried, and
/// `Send + 'static` because it is moved across a channel to another thread.
pub type TxFn =
Box<dyn FnMut(&mut CompareAndSwapTransaction) -> Result<(), DatabaseError> + Send + 'static>;
/// One unit of work queued for the writer thread.
struct SyncTx {
// Closure that populates the transaction; may be called once per attempt.
func: TxFn,
// Forwarded unchanged as the second argument of `apply_cas_tx`.
flush: bool,
// Channel on which the writer reports the final outcome to the submitter.
callback: Sender<Result<(), DatabaseError>>,
}
/// Sending half of the queue feeding the writer thread.
///
/// Initialised on first access: a channel is created, a thread running
/// [`writer_thread`] is spawned with the receiving half, and the sending half
/// is kept here for submitters to use.
static WRITER: LazyLock<Sender<SyncTx>> = LazyLock::new(|| {
    let (job_sender, job_receiver) = channel();
    // The join handle is dropped on purpose; the thread runs detached.
    std::thread::spawn(move || writer_thread(job_receiver));
    job_sender
});
fn writer_thread(rx: Receiver<SyncTx>) {
'rx_loop: while let Ok(SyncTx {
mut func,
flush,
callback,
}) = rx.recv()
{
for _ in 0..10 {
let mut cas_tx = CompareAndSwapTransaction::new();
if let Err(err) = func(&mut cas_tx) {
let _ = callback.send(Err(err));
continue 'rx_loop;
}
match apply_cas_tx(cas_tx, flush) {
Ok(()) => {
trace!("Completed sync transaction");
let _ = callback.send(Ok(()));
continue 'rx_loop;
}
Err(TransactionError::Abort(DatabaseError::CompareAndSwapError(_))) => {
trace!("Sync transaction ran into a CAS error and is retrying.");
}
Err(err) => {
let err = DatabaseError::from(err);
let _ = callback.send(Err(err));
continue 'rx_loop;
}
}
}
let _ = callback.send(Err(DatabaseError::TooManyRetries));
}
warn!("Writer shutting down because the channel closed");
}
/// Submits `f` to the writer thread and blocks until its outcome is reported.
///
/// `f` is boxed and queued together with `flush` on [`WRITER`]; the calling
/// thread then waits on a private reply channel for the result the writer
/// sends back.
///
/// # Errors
///
/// Returns whatever [`DatabaseError`] the writer thread reports for this
/// transaction.
///
/// # Panics
///
/// Panics if the job cannot be queued because the writer's receiver is gone,
/// or if the reply channel is closed before a result arrives.
pub fn db_sync_transaction<F>(f: F, flush: bool) -> Result<(), DatabaseError>
where
    F: FnMut(&mut CompareAndSwapTransaction) -> Result<(), DatabaseError> + Send + 'static,
{
    let (reply_sender, reply_receiver) = channel();

    WRITER
        .send(SyncTx {
            func: Box::new(f),
            flush,
            callback: reply_sender,
        })
        .expect("Writer sender died");

    // Block until the writer thread answers for this specific job.
    reply_receiver
        .recv()
        .expect("Writer died while holding reciever")
}