// lunar-lib 0.6.1
//
// Common utilities for lunar applications
// Documentation
use std::{
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
        mpsc::{Receiver, Sender, channel},
    },
    thread::{self, JoinHandle},
    time::Duration,
};

use crate::{
    database::{CompareAndSwapTransaction, Database, TransactionError, apply_cas_tx},
    trace, warn,
};

/// A boxed, sendable closure that stages work on a [`CompareAndSwapTransaction`].
///
/// The closure is `FnMut` because the writer thread may invoke it more than
/// once: the whole batch is re-run on every compare-and-swap retry.
type TransactionFn<D> = Box<
    dyn FnMut(&mut CompareAndSwapTransaction<D>) -> Result<(), TransactionError> + Send + 'static,
>;

/// Handle to a background thread that batches queued transaction closures and
/// applies them to the database.
///
/// Closures are handed to the thread over an mpsc channel; dropping the
/// sender is what tells the thread to finish.
pub struct DatabaseWriter<D: Database> {
    /// Sending half of the work channel. Wrapped in `Option` so it can be
    /// taken and dropped to disconnect the channel before joining the thread.
    sender: Option<Sender<TransactionFn<D>>>,
    /// Shared flag the writer thread raises once it has failed; read by
    /// `is_closed`.
    closed: Arc<AtomicBool>,
    /// Join handle of the writer thread, carrying the thread's final result.
    /// Wrapped in `Option` so it can be taken and joined exactly once.
    thread: Option<JoinHandle<Result<(), TransactionError>>>,
}

impl<D: Database + 'static> DatabaseWriter<D> {
    /// How long the writer waits after the first queued closure so that
    /// further closures can pile up and be applied in the same transaction.
    const BATCH_WINDOW: Duration = Duration::from_millis(250);

    /// How many times one batch is attempted before giving up with
    /// [`TransactionError::TooManyRetries`].
    const MAX_ATTEMPTS: usize = 10;

    /// Starts the background writer thread and returns a handle to it.
    ///
    /// The thread runs [`Self::run`] until the channel is disconnected or a
    /// batch fails.
    pub fn spawn() -> Self {
        /// Raises the shared `closed` flag when the writer thread exits,
        /// whether it returned or unwound from a panic.
        struct CloseOnExit(Arc<AtomicBool>);

        impl Drop for CloseOnExit {
            fn drop(&mut self) {
                self.0.store(true, Ordering::Relaxed);
            }
        }

        let (sender, rx) = channel();
        let closed = Arc::new(AtomicBool::new(false));

        let guard = CloseOnExit(Arc::clone(&closed));
        let thread = thread::spawn(move || {
            // Bound for the whole closure so the flag is raised on every exit
            // path, including a panic inside a queued closure. A clean exit
            // only happens after the sender was dropped by `finish`/`Drop`,
            // at which point nobody can observe the flag any more.
            let _guard = guard;
            Self::run(rx)
        });

        Self {
            sender: Some(sender),
            closed,
            thread: Some(thread),
        }
    }

    /// Receives transaction closures from `rx` and applies them in batches.
    ///
    /// Blocks for the first closure, waits a short batching window, drains
    /// whatever else has been queued meanwhile and applies everything inside
    /// one compare-and-swap transaction. Returns `Ok(())` once the channel is
    /// disconnected and empty.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while creating the transaction, by a
    /// queued closure, or by applying the transaction, and
    /// [`TransactionError::TooManyRetries`] when a batch keeps hitting
    /// compare-and-swap conflicts.
    pub fn run(rx: Receiver<TransactionFn<D>>) -> Result<(), TransactionError> {
        // `recv` only fails once every sender is gone and the queue is empty.
        while let Ok(first) = rx.recv() {
            let mut queue = vec![first];

            thread::sleep(Self::BATCH_WINDOW);

            // Non-blocking drain of everything that arrived during the window.
            queue.extend(rx.try_iter());

            Self::apply_batch(&mut queue)?;
        }

        Ok(())
    }

    /// Applies one batch, re-running every closure on a fresh transaction
    /// each time the apply step reports a compare-and-swap conflict.
    fn apply_batch(queue: &mut [TransactionFn<D>]) -> Result<(), TransactionError> {
        for _ in 0..Self::MAX_ATTEMPTS {
            let mut cas_tx = CompareAndSwapTransaction::new()?;
            for func in queue.iter_mut() {
                func(&mut cas_tx)?;
            }

            match apply_cas_tx(cas_tx, false) {
                Ok(()) => return Ok(()),
                Err(TransactionError::CompareAndSwapError) => {
                    trace!("Transaction ran into a CAS error and is retrying.");
                }
                Err(err) => return Err(err),
            }
        }

        Err(TransactionError::TooManyRetries)
    }

    /// Reports whether the writer thread has stopped and can no longer
    /// accept work.
    ///
    /// The flag is advisory: it is read with relaxed ordering and may lag
    /// slightly behind the thread's actual state.
    pub fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Relaxed)
    }

    /// Queues `func` to be executed by the writer thread.
    ///
    /// `func` may be called several times, because a batch is re-run on every
    /// compare-and-swap retry. If the writer is already closed the closure is
    /// dropped without being run; check [`Self::is_closed`] or the result of
    /// [`Self::finish`] to detect that.
    pub fn transaction<F>(&self, func: F)
    where
        F: FnMut(&mut CompareAndSwapTransaction<D>) -> Result<(), TransactionError>
            + Send
            + 'static,
    {
        if self.is_closed() {
            return;
        }

        let Some(sender) = &self.sender else {
            return;
        };

        let boxed: TransactionFn<D> = Box::new(func);

        // A failed send means the receiving thread is gone; remember that so
        // later calls bail out early and `is_closed` tells the truth.
        if sender.send(boxed).is_err() {
            self.closed.store(true, Ordering::Relaxed);
        }
    }

    /// Disconnects the channel, waits for the writer thread to drain its
    /// queue and returns the thread's final result.
    ///
    /// # Errors
    ///
    /// Returns the error the writer thread stopped with, if any.
    ///
    /// # Panics
    ///
    /// Panics if the writer thread itself panicked.
    pub fn finish(mut self) -> Result<(), TransactionError> {
        // Dropping the only sender makes `recv` fail once the queue is empty,
        // which is the thread's signal to return.
        drop(self.sender.take());

        self.thread
            .take()
            .expect("Writer thread was lost?")
            .join()
            .expect("Writer thread panicked")
    }
}

impl<D: Database> Drop for DatabaseWriter<D> {
    fn drop(&mut self) {
        drop(self.sender.take());

        if let Some(take) = self.thread.take() {
            if let Err(err) = take
                .join()
                .expect("Writer thread panicked after being dropped")
            {
                warn!("A database writer was dropped with an error: {err}")
            }
        }
    }
}