sombra 0.3.6

High-performance graph database with ACID transactions, single-file storage, and bindings for Rust, TypeScript, and Python
Documentation
use std::path::PathBuf;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

use crate::error::{acquire_lock, Result};

pub type TxId = u64;

pub(crate) enum ControlMessage {
    Commit(CommitRequest),
    #[allow(dead_code)]
    Shutdown,
}

pub(crate) struct CommitRequest {
    #[allow(dead_code)]
    pub tx_id: TxId,
    pub notifier: Arc<(Mutex<bool>, Condvar)>,
}

impl std::fmt::Debug for GroupCommitState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GroupCommitState").finish()
    }
}

pub(crate) struct GroupCommitState {
    pub sender: Sender<ControlMessage>,
    pub _committer_thread: Option<thread::JoinHandle<()>>,
}

impl GroupCommitState {
    pub fn spawn(db_path: PathBuf, timeout_ms: u64) -> Result<Arc<Mutex<Self>>> {
        let (sender, receiver): (Sender<ControlMessage>, Receiver<ControlMessage>) =
            mpsc::channel();

        let committer_thread = thread::spawn(move || {
            Self::group_commit_loop(db_path, receiver, timeout_ms);
        });

        Ok(Arc::new(Mutex::new(GroupCommitState {
            sender,
            _committer_thread: Some(committer_thread),
        })))
    }

    #[allow(dead_code)]
    pub fn shutdown(&self) -> Result<()> {
        self.sender
            .send(ControlMessage::Shutdown)
            .map_err(|_| crate::error::GraphError::Corruption("control channel closed".into()))
    }

    fn group_commit_loop(db_path: PathBuf, receiver: Receiver<ControlMessage>, timeout_ms: u64) {
        let wal_path = db_path.with_extension("wal");

        loop {
            let mut pending_commits = Vec::new();

            match receiver.recv_timeout(Duration::from_millis(timeout_ms)) {
                Ok(ControlMessage::Commit(first_commit)) => {
                    pending_commits.push(first_commit);

                    while let Ok(msg) = receiver.try_recv() {
                        match msg {
                            ControlMessage::Commit(commit_req) => {
                                pending_commits.push(commit_req);
                            }
                            ControlMessage::Shutdown => {
                                Self::flush_pending_commits(&wal_path, pending_commits);
                                return;
                            }
                        }
                    }
                }
                Ok(ControlMessage::Shutdown) => {
                    Self::flush_pending_commits(&wal_path, Vec::new());
                    break;
                }
                Err(mpsc::RecvTimeoutError::Timeout) => {
                    continue;
                }
                Err(mpsc::RecvTimeoutError::Disconnected) => {
                    break;
                }
            }

            Self::flush_pending_commits(&wal_path, pending_commits);
        }
    }

    fn flush_pending_commits(wal_path: &PathBuf, pending_commits: Vec<CommitRequest>) {
        if pending_commits.is_empty() {
            return;
        }

        if let Ok(file) = std::fs::OpenOptions::new().write(true).open(wal_path) {
            let _ = file.sync_data();
        }

        for commit_req in pending_commits {
            let (lock, cvar) = &*commit_req.notifier;
            if let Ok(mut done) = acquire_lock(lock) {
                *done = true;
                cvar.notify_one();
            }
        }
    }
}