use crate::{
error::{MdbxResult, mdbx_result},
sys::EnvPtr,
};
use std::{
ptr,
sync::mpsc::{Receiver, SyncSender, sync_channel},
};
#[derive(Copy, Clone, Debug)]
pub(crate) struct RawTxPtr(pub(crate) *mut ffi::MDBX_txn);
unsafe impl Send for RawTxPtr {}
unsafe impl Sync for RawTxPtr {}
#[derive(Debug, Clone, Copy)]
pub(crate) struct CommitLatencyPtr(pub(crate) *mut ffi::MDBX_commit_latency);
unsafe impl Send for CommitLatencyPtr {}
unsafe impl Sync for CommitLatencyPtr {}
pub(crate) struct Begin {
pub(crate) parent: RawTxPtr,
pub(crate) flags: ffi::MDBX_txn_flags_t,
pub(crate) sender: SyncSender<MdbxResult<RawTxPtr>>,
pub(crate) span: tracing::Span,
}
pub(crate) struct Abort {
pub(crate) tx: RawTxPtr,
pub(crate) sender: SyncSender<MdbxResult<bool>>,
pub(crate) span: tracing::Span,
}
pub(crate) struct Commit {
pub(crate) tx: RawTxPtr,
pub(crate) latency: CommitLatencyPtr,
pub(crate) sender: SyncSender<MdbxResult<bool>>,
pub(crate) span: tracing::Span,
}
pub(crate) enum LifecycleEvent {
Begin(Begin),
Abort(Abort),
Commit(Commit),
}
impl From<Begin> for LifecycleEvent {
fn from(begin: Begin) -> Self {
LifecycleEvent::Begin(begin)
}
}
impl From<Abort> for LifecycleEvent {
fn from(abort: Abort) -> Self {
LifecycleEvent::Abort(abort)
}
}
impl From<Commit> for LifecycleEvent {
fn from(commit: Commit) -> Self {
LifecycleEvent::Commit(commit)
}
}
pub(crate) struct LifecycleHandle {
sender: SyncSender<LifecycleEvent>,
}
impl LifecycleHandle {
#[track_caller]
#[inline(always)]
pub(crate) fn send<T: Into<LifecycleEvent>>(&self, msg: T) {
self.sender.send(msg.into()).unwrap();
}
}
impl From<SyncSender<LifecycleEvent>> for LifecycleHandle {
fn from(sender: SyncSender<LifecycleEvent>) -> Self {
Self { sender }
}
}
#[derive(Debug)]
pub(crate) struct RwSyncLifecycle {
env: EnvPtr,
rx: Receiver<LifecycleEvent>,
}
impl RwSyncLifecycle {
pub(crate) fn spawn(env: EnvPtr) -> LifecycleHandle {
let (tx, rx) = sync_channel(0);
let txn_manager = Self { env, rx };
txn_manager.start_message_listener();
tx.into()
}
fn handle_begin(&self, Begin { parent, flags, sender, span }: Begin) {
let _guard = span.entered();
let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
let res = mdbx_result(unsafe {
ffi::mdbx_txn_begin_ex(self.env.0, parent.0, flags, &mut txn, ptr::null_mut())
})
.map(|_| RawTxPtr(txn));
sender.send(res).unwrap();
}
fn handle_abort(&self, Abort { tx, sender, span }: Abort) {
let _guard = span.entered();
sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
}
fn handle_commit(&self, Commit { tx, sender, latency, span }: Commit) {
let _guard = span.entered();
sender.send(mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(tx.0, latency.0) })).unwrap();
}
fn start_message_listener(self) {
let task = move || {
loop {
match self.rx.recv() {
Ok(msg) => match msg {
LifecycleEvent::Begin(begin) => {
self.handle_begin(begin);
}
LifecycleEvent::Abort(abort) => {
self.handle_abort(abort);
}
LifecycleEvent::Commit(commit) => {
self.handle_commit(commit);
}
},
Err(_) => return,
}
}
};
std::thread::Builder::new().name("mdbx-rs-txn-manager".to_string()).spawn(task).unwrap();
}
}