selene-core 0.4.2

selene-core is the backend for Selene, a local-first music player
Documentation
use std::sync::{
    LazyLock,
    mpsc::{Receiver, Sender, channel},
};

use lunar_lib::{trace, warn};
use sled::transaction::TransactionError;

use crate::database::{CompareAndSwapTransaction, DatabaseError, apply_cas_tx};

/// Boxed closure that stages work on a [`CompareAndSwapTransaction`].
///
/// The closure is `FnMut` because the writer thread may call it more than once: every
/// retry of a transaction hands it a fresh `CompareAndSwapTransaction` to fill in again.
/// `Send + 'static` is required because the closure is moved to the writer thread through
/// a channel.
pub type TxFn =
    Box<dyn FnMut(&mut CompareAndSwapTransaction) -> Result<(), DatabaseError> + Send + 'static>;

/// One unit of work queued for the writer thread.
struct SyncTx {
    /// Closure that populates the transaction; re-run on every attempt.
    func: TxFn,
    /// Forwarded unchanged to `apply_cas_tx`.
    /// NOTE(review): presumably requests a flush after applying — confirm in `apply_cas_tx`.
    flush: bool,
    /// Channel on which the writer thread reports the final outcome of this request.
    callback: Sender<Result<(), DatabaseError>>,
}

/// Process-wide handle used to submit [`SyncTx`] requests to the writer thread.
///
/// The first access creates the request channel and spawns a thread running
/// [`writer_thread`] on its receiving half; the sending half is what this static holds.
static WRITER: LazyLock<Sender<SyncTx>> = LazyLock::new(|| {
    let (request_tx, request_rx) = channel::<SyncTx>();
    // The join handle is dropped on purpose: the thread is never joined and simply runs
    // until the request channel closes.
    std::thread::spawn(move || writer_thread(request_rx));
    request_tx
});

fn writer_thread(rx: Receiver<SyncTx>) {
    'rx_loop: while let Ok(SyncTx {
        mut func,
        flush,
        callback,
    }) = rx.recv()
    {
        for _ in 0..10 {
            let mut cas_tx = CompareAndSwapTransaction::new();
            if let Err(err) = func(&mut cas_tx) {
                let _ = callback.send(Err(err));
                continue 'rx_loop;
            }

            match apply_cas_tx(cas_tx, flush) {
                Ok(()) => {
                    trace!("Completed sync transaction");
                    let _ = callback.send(Ok(()));
                    continue 'rx_loop;
                }
                Err(TransactionError::Abort(DatabaseError::CompareAndSwapError(_))) => {
                    trace!("Sync transaction ran into a CAS error and is retrying.");
                }
                Err(err) => {
                    let err = DatabaseError::from(err);
                    let _ = callback.send(Err(err));
                    continue 'rx_loop;
                }
            }
        }
        let _ = callback.send(Err(DatabaseError::TooManyRetries));
    }

    warn!("Writer shutting down because the channel closed");
}

/// Runs `f` as a transaction on the writer thread and blocks until the outcome is known.
///
/// `f` receives a fresh [`CompareAndSwapTransaction`] to fill in. It may be called more
/// than once, because the writer thread re-runs it when applying the transaction hits a
/// compare-and-swap error. `flush` is passed through to the writer thread together with
/// `f`.
///
/// # Errors
///
/// Returns whatever the writer thread reports for this request: the error returned by
/// `f`, a database error raised while applying the transaction, or
/// `DatabaseError::TooManyRetries` once the retry limit is exhausted.
///
/// # Panics
///
/// Panics if the request cannot be handed to the writer thread, or if the writer thread
/// goes away before replying.
pub fn db_sync_transaction<F>(f: F, flush: bool) -> Result<(), DatabaseError>
where
    F: FnMut(&mut CompareAndSwapTransaction) -> Result<(), DatabaseError> + Send + 'static,
{
    // One-shot reply channel: the writer thread sends a single result back on it.
    let (reply_tx, reply_rx) = channel();

    let request = SyncTx {
        func: Box::new(f),
        flush,
        callback: reply_tx,
    };
    WRITER.send(request).expect("Writer sender died");

    // Block until the writer thread has finished (or given up on) this request.
    let outcome = reply_rx.recv();
    outcome.expect("Writer died while holding reciever")
}