use crate::container::Container;
use crate::error::{DaosError, Result};
use crate::runtime::require_runtime;
use crate::unsafe_inner::ffi::{
daos_tx_abort, daos_tx_close, daos_tx_commit, daos_tx_open, daos_tx_open_snap, daos_tx_restart,
};
use crate::unsafe_inner::handle::{DAOS_HANDLE_NULL, DaosHandle};
use daos::daos_handle_t;
use std::marker::PhantomData;
pub mod flags {
use crate::unsafe_inner::ffi::tx_flags;
pub const TX_RDONLY: u64 = tx_flags::TX_RDONLY;
pub const TX_ZERO_COPY: u64 = tx_flags::TX_ZERO_COPY;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxState {
Open,
Committed,
Aborted,
Closed,
}
impl TxState {
pub fn is_open(&self) -> bool {
matches!(self, TxState::Open)
}
}
#[derive(Debug)]
pub struct Transaction<'c> {
_container_lifetime: PhantomData<&'c Container<'c>>,
handle: Option<DaosHandle>,
state: TxState,
}
impl<'c> Transaction<'c> {
pub fn new(container: &'c Container<'c>, flags: u64) -> Result<Self> {
let coh = container.as_handle()?;
Self::new_with_handle(coh, flags)
}
pub(crate) fn new_with_handle(container: DaosHandle, flags: u64) -> Result<Self> {
require_runtime()?;
let handle = daos_tx_open(container, flags)?;
Ok(Self {
_container_lifetime: PhantomData,
handle: Some(handle),
state: TxState::Open,
})
}
pub fn open_snap(container: &'c Container<'c>, epoch: u64) -> Result<Self> {
let coh = container.as_handle()?;
Self::open_snap_with_handle(coh, epoch)
}
pub(crate) fn open_snap_with_handle(container: DaosHandle, epoch: u64) -> Result<Self> {
require_runtime()?;
let handle = daos_tx_open_snap(container, epoch)?;
Ok(Self {
_container_lifetime: PhantomData,
handle: Some(handle),
state: TxState::Open,
})
}
#[inline]
pub fn as_handle(&self) -> Result<DaosHandle> {
if self.state == TxState::Open {
self.handle.ok_or(DaosError::InvalidArg)
} else {
Err(DaosError::InvalidArg)
}
}
#[inline]
pub fn state(&self) -> TxState {
self.state
}
#[inline]
pub fn is_open(&self) -> bool {
self.state == TxState::Open
}
#[inline]
pub(crate) fn raw_handle(&self) -> daos_handle_t {
self.handle
.map(|h| h.as_raw())
.unwrap_or_else(|| crate::unsafe_inner::handle::DAOS_HANDLE_NULL)
}
pub fn commit(&mut self) -> Result<()> {
if self.state != TxState::Open {
return Err(DaosError::InvalidArg);
}
let handle = self.handle.ok_or(DaosError::InvalidArg)?;
daos_tx_commit(handle)?;
self.state = TxState::Committed;
Ok(())
}
pub fn abort(&mut self) -> Result<()> {
if self.state != TxState::Open {
return Err(DaosError::InvalidArg);
}
let handle = self.handle.ok_or(DaosError::InvalidArg)?;
daos_tx_abort(handle)?;
self.state = TxState::Aborted;
Ok(())
}
pub fn restart(&mut self) -> Result<()> {
if self.state != TxState::Open {
return Err(DaosError::InvalidArg);
}
let handle = self.handle.ok_or(DaosError::InvalidArg)?;
daos_tx_restart(handle)?;
self.state = TxState::Open;
Ok(())
}
pub fn close(&mut self) -> Result<()> {
if let Some(handle) = self.handle.take() {
daos_tx_close(handle)?;
}
self.state = TxState::Closed;
Ok(())
}
}
impl Drop for Transaction<'_> {
fn drop(&mut self) {
if let Some(handle) = self.handle.take() {
if let Err(e) = daos_tx_close(handle) {
eprintln!(
"Transaction::drop: daos_tx_close() failed with {:?}, continuing with drop anyway",
e
);
}
}
self.state = TxState::Closed;
}
}
#[derive(Debug, Default)]
pub enum Tx<'c> {
Some(Transaction<'c>),
#[default]
None,
}
impl<'c> Tx<'c> {
#[inline]
pub fn none() -> Self {
Tx::None
}
#[inline]
pub fn is_none(&self) -> bool {
matches!(self, Tx::None)
}
#[inline]
pub fn is_some(&self) -> bool {
matches!(self, Tx::Some(_))
}
pub fn as_handle(&self) -> Result<DaosHandle> {
match self {
Tx::Some(tx) => tx.as_handle(),
Tx::None => Err(DaosError::InvalidArg),
}
}
#[inline]
pub(crate) fn as_raw_daos_handle(&self) -> daos_handle_t {
match self {
Tx::Some(tx) => tx.raw_handle(),
Tx::None => DAOS_HANDLE_NULL,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::unsafe_inner::handle::DaosHandle;
#[test]
fn test_tx_state_initial_values() {
assert!(TxState::Open != TxState::Committed);
assert!(TxState::Open != TxState::Aborted);
assert!(TxState::Open != TxState::Closed);
}
#[test]
fn test_tx_state_debug() {
assert_eq!(format!("{:?}", TxState::Open), "Open");
assert_eq!(format!("{:?}", TxState::Committed), "Committed");
assert_eq!(format!("{:?}", TxState::Aborted), "Aborted");
assert_eq!(format!("{:?}", TxState::Closed), "Closed");
}
#[test]
fn test_tx_none_default() {
let tx = Tx::None;
assert!(tx.is_none());
assert!(!tx.is_some());
assert!(matches!(tx, Tx::None));
}
#[test]
fn test_tx_none_as_handle_error() {
let tx = Tx::None;
let result = tx.as_handle();
assert!(result.is_err());
}
#[test]
fn test_tx_none_default_impl() {
let tx = Tx::default();
assert!(tx.is_none());
}
#[test]
fn test_transaction_flags_constants() {
assert_eq!(flags::TX_RDONLY, 1);
assert_eq!(flags::TX_ZERO_COPY, 2);
}
#[test]
fn test_tx_enum_debug() {
let tx_none = Tx::None;
assert!(format!("{:?}", tx_none).contains("None"));
}
#[test]
fn test_require_runtime_error_when_not_init() {
while crate::runtime::is_runtime_initialized() {
drop(crate::runtime::DaosRuntime::new());
}
let valid_handle = unsafe { DaosHandle::from_raw(daos::daos_handle_t { cookie: 12345 }) };
let result = Transaction::new_with_handle(valid_handle, 0);
assert!(result.is_err());
}
#[test]
fn test_transaction_state_transitions() {
let state = TxState::Open;
assert!(state.is_open());
let committed = TxState::Committed;
assert!(!committed.is_open());
}
#[test]
fn test_tx_some_creation_requires_valid_container() {
while crate::runtime::is_runtime_initialized() {
drop(crate::runtime::DaosRuntime::new());
}
crate::runtime::DaosRuntime::new().unwrap();
let invalid_handle = unsafe { DaosHandle::from_raw(daos::daos_handle_t { cookie: 0 }) };
let result = Transaction::new_with_handle(invalid_handle, 0);
assert!(result.is_err());
while crate::runtime::is_runtime_initialized() {
drop(crate::runtime::DaosRuntime::new());
}
}
#[test]
fn test_transaction_already_closed_operations() {
while crate::runtime::is_runtime_initialized() {
drop(crate::runtime::DaosRuntime::new());
}
crate::runtime::DaosRuntime::new().unwrap();
let valid_handle = unsafe { DaosHandle::from_raw(daos::daos_handle_t { cookie: 12345 }) };
let _result = Transaction::new_with_handle(valid_handle, 0);
while crate::runtime::is_runtime_initialized() {
drop(crate::runtime::DaosRuntime::new());
}
}
#[test]
fn test_tx_none_passthrough_semantics() {
let tx = Tx::none();
assert!(tx.is_none());
assert!(tx.as_handle().is_err());
}
#[test]
fn test_transaction_drop_closes_handle() {
let tx_none = Tx::none();
assert!(tx_none.is_none());
}
}