use std::borrow::Borrow;
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::ops::Deref;
use std::rc::Rc;
use std::result;
use std::sync::Arc;
use atomic_refcell::AtomicRefCell;
use super::eager_result::EagerResult;
use super::io::bolt::message_parameters::{BeginParameters, RunParameters};
use super::io::bolt::ResponseCallbacks;
use super::io::PooledBolt;
use super::record_stream::{GetSingleRecordError, RecordStream, SharedErrorPropagator};
use super::Record;
use crate::error_::{Neo4jError, Result};
use crate::summary::Summary;
use crate::value::{ValueReceive, ValueSend};
/// Handle to an open transaction.
///
/// Queries are started through [`Transaction::query`]; the transaction is
/// finished by consuming the handle with [`Transaction::commit`] or
/// [`Transaction::rollback`].
#[derive(Debug)]
pub struct Transaction<'driver, 'tx> {
/// Crate-internal transaction state that every operation is delegated to.
inner_tx: &'tx mut InnerTransaction<'driver>,
/// First error reported while a `TransactionRecordStream` was being dropped;
/// `Ok(())` as long as no such error occurred.
drop_result: RefCell<Result<()>>,
}
impl<'driver, 'tx> Transaction<'driver, 'tx> {
    /// Wraps `inner` in a transaction handle that has no drop error recorded yet.
    pub(crate) fn new(inner: &'tx mut InnerTransaction<'driver>) -> Self {
        let drop_result = RefCell::new(Ok(()));
        Self {
            inner_tx: inner,
            drop_result,
        }
    }

    /// Starts building a query to run inside this transaction.
    ///
    /// The returned builder starts out with an empty parameter map.
    pub fn query<Q: AsRef<str>>(
        &'tx self,
        query: Q,
    ) -> TransactionQueryBuilder<'driver, 'tx, Q, DefaultKey, DefaultParameters> {
        TransactionQueryBuilder::new(self, query)
    }

    /// Runs the query described by `builder` and wraps the resulting stream so
    /// that it keeps a reference back to this transaction.
    ///
    /// # Errors
    /// Propagates any error returned by `InnerTransaction::run`.
    fn run<Q: AsRef<str>, K: Borrow<str> + Debug, M: Borrow<HashMap<K, ValueSend>>>(
        &'tx self,
        builder: TransactionQueryBuilder<'driver, 'tx, Q, K, M>,
    ) -> Result<TransactionRecordStream<'driver, 'tx>> {
        let query_text: &str = builder.query.as_ref();
        let parameters: &HashMap<K, ValueSend> = builder.parameters.borrow();
        let stream = self.inner_tx.run(query_text, parameters)?;
        Ok(TransactionRecordStream(stream, self))
    }

    /// Commits the transaction, consuming the handle.
    ///
    /// # Errors
    /// Returns the error recorded while dropping a record stream, if any,
    /// without attempting the commit. Otherwise returns whatever
    /// `InnerTransaction::commit` returns.
    pub fn commit(self) -> Result<()> {
        let Self {
            inner_tx,
            drop_result,
        } = self;
        drop_result.into_inner()?;
        inner_tx.commit()
    }

    /// Rolls the transaction back, consuming the handle.
    ///
    /// If an error was recorded while dropping a record stream, no rollback is
    /// attempted and `Ok(())` is returned (presumably because the transaction
    /// has already failed at that point).
    ///
    /// # Errors
    /// Returns whatever `InnerTransaction::rollback` returns.
    pub fn rollback(self) -> Result<()> {
        let Self {
            inner_tx,
            drop_result,
        } = self;
        if drop_result.into_inner().is_err() {
            // A dropped stream already reported a failure; nothing is sent.
            return Ok(());
        }
        inner_tx.rollback()
    }
}
/// Stream of records produced by a query that was run inside a [`Transaction`].
///
/// Field `0` is the underlying record stream; field `1` is the transaction the
/// stream belongs to. The `Drop` implementation uses field `1` to record an
/// error that occurs while consuming the stream on drop.
#[derive(Debug)]
pub struct TransactionRecordStream<'driver, 'tx>(
RecordStream<'driver>,
&'tx Transaction<'driver, 'tx>,
);
impl Drop for TransactionRecordStream<'_, '_> {
    /// Consumes the underlying stream and, if that fails, stores the error in
    /// the owning transaction's `drop_result` unless an earlier error is
    /// already stored there.
    fn drop(&mut self) {
        match self.0.consume() {
            Ok(_) => {}
            Err(err) => {
                // The `Ref` guard is released at the end of this statement, so
                // the `replace` below cannot hit an outstanding borrow.
                let nothing_recorded_yet = self.1.drop_result.borrow().is_ok();
                if nothing_recorded_yet {
                    let _ = self.1.drop_result.replace(Err(err));
                }
            }
        }
    }
}
impl<'driver> TransactionRecordStream<'driver, '_> {
pub fn consume(mut self) -> Result<Option<Summary>> {
self.0.consume()
}
pub fn keys(&self) -> Vec<Arc<String>> {
self.0.keys()
}
pub fn single(&mut self) -> result::Result<Result<Record>, GetSingleRecordError> {
self.0.single()
}
pub fn try_as_eager_result(&mut self) -> Result<Option<EagerResult>> {
self.0.try_as_eager_result()
}
pub(crate) fn raw_stream_mut(&mut self) -> &mut RecordStream<'driver> {
&mut self.0
}
}
impl Iterator for TransactionRecordStream<'_, '_> {
type Item = Result<Record>;
fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}
/// Crate-internal state of a transaction that runs on a single connection.
#[derive(Debug)]
pub(crate) struct InnerTransaction<'driver> {
/// Connection the transaction runs on; a clone of this `Rc` is handed to
/// every record stream created by `run`.
connection: Rc<RefCell<PooledBolt<'driver>>>,
/// Bookmark stored by the commit success callback; taken out by
/// `into_bookmark`.
bookmark: Arc<AtomicRefCell<Option<String>>>,
/// Error state shared with the record streams created by `run`; inspected
/// by `check_error`.
error_propagator: SharedErrorPropagator,
/// Fetch size passed to every record stream created by `run`.
fetch_size: i64,
/// Set once `commit` or `rollback` has been attempted, or when `close`
/// finds a propagated error or a closed connection.
closed: bool,
}
impl<'driver> InnerTransaction<'driver> {
    /// Creates the state for a transaction on `connection`.
    ///
    /// The transaction starts out not closed and without a bookmark.
    pub(crate) fn new(
        connection: PooledBolt<'driver>,
        fetch_size: i64,
        error_propagator: SharedErrorPropagator,
    ) -> Self {
        let connection = Rc::new(RefCell::new(connection));
        Self {
            connection,
            bookmark: Arc::default(),
            error_propagator,
            fetch_size,
            closed: false,
        }
    }

    /// Calls `begin` on the connection with `parameters` and `callbacks`.
    ///
    /// When `eager` is `true`, the connection is additionally flushed with
    /// `write_all`, read with `read_all`, and any propagated error is
    /// returned. When it is `false`, the function returns right after `begin`.
    ///
    /// # Errors
    /// Propagates errors from the connection calls and, in eager mode, the
    /// error held by the error propagator.
    pub(crate) fn begin<K: Borrow<str> + Debug>(
        &mut self,
        parameters: BeginParameters<K>,
        eager: bool,
        callbacks: ResponseCallbacks,
    ) -> Result<()> {
        let mut conn = self.connection.borrow_mut();
        conn.begin(parameters, callbacks)?;
        if !eager {
            return Ok(());
        }
        conn.write_all(None)?;
        conn.read_all(None)?;
        self.check_error()
    }

    /// Commits the transaction and marks it as closed.
    ///
    /// The transaction is marked closed before anything else happens. Pending
    /// traffic is flushed and read first; then the commit is sent and its
    /// response is read. If the commit response metadata contains a string
    /// under `"bookmark"`, it is stored for `into_bookmark`.
    ///
    /// # Errors
    /// Returns the propagated error if one is already present, any error from
    /// the connection calls, and the result of the final read passed through
    /// `Neo4jError::wrap_commit`.
    pub(crate) fn commit(&mut self) -> Result<()> {
        self.closed = true;
        self.check_error()?;

        let mut conn = self.connection.borrow_mut();
        // Flush and read everything that is still outstanding before committing.
        conn.write_all(None)?;
        conn.read_all(None)?;

        let bookmark = Arc::clone(&self.bookmark);
        let store_bookmark = ResponseCallbacks::new().with_on_success(move |mut meta| {
            if let Some(ValueReceive::String(value)) = meta.remove("bookmark") {
                *bookmark.borrow_mut() = Some(value);
            }
            Ok(())
        });
        conn.commit(store_bookmark)?;
        conn.write_all(None)?;
        let commit_response = conn.read_all(None);
        Neo4jError::wrap_commit(commit_response)
    }

    /// Rolls the transaction back and marks it as closed.
    ///
    /// If the error propagator already holds an error, nothing is sent and
    /// `Ok(())` is returned.
    ///
    /// # Errors
    /// Propagates errors from the connection calls.
    pub(crate) fn rollback(&mut self) -> Result<()> {
        self.closed = true;
        // The explicit `deref()` reaches the inner cell before `borrow()` is
        // resolved; presumably needed because `std::borrow::Borrow` is in
        // scope and would otherwise match on the outer pointer.
        let already_failed = self.error_propagator.deref().borrow().error().is_some();
        if already_failed {
            return Ok(());
        }
        let mut conn = self.connection.borrow_mut();
        conn.rollback()?;
        conn.write_all(None)?;
        conn.read_all(None)
    }

    /// Closes the transaction, rolling it back if it is still open.
    ///
    /// A transaction with a propagated error, or whose connection reports
    /// `closed()`, is marked closed without sending a rollback.
    ///
    /// # Errors
    /// Propagates the result of `rollback` when one is performed.
    pub(crate) fn close(&mut self) -> Result<()> {
        let unusable = self.check_error().is_err() || self.connection.borrow_mut().closed();
        if unusable {
            self.closed = true;
        }
        if self.closed {
            Ok(())
        } else {
            self.rollback()
        }
    }

    /// Consumes the transaction state and returns the stored bookmark, if any.
    pub(crate) fn into_bookmark(self) -> Option<String> {
        let mut slot = self.bookmark.borrow_mut();
        slot.take()
    }

    /// Creates a record stream on this transaction's connection and runs
    /// `query` with `parameters` on it.
    ///
    /// The stream shares the connection and the error propagator with this
    /// transaction.
    ///
    /// # Errors
    /// Propagates the error returned by the stream's `run`.
    pub(crate) fn run<K: Borrow<str> + Debug>(
        &self,
        query: &str,
        parameters: &HashMap<K, ValueSend>,
    ) -> Result<RecordStream<'driver>> {
        let propagator = Some(Arc::clone(&self.error_propagator));
        // NOTE(review): the meaning of the `false` flag is defined by
        // `RecordStream::new`, which is not visible here - confirm.
        let mut stream = RecordStream::new(
            Rc::clone(&self.connection),
            self.fetch_size,
            false,
            propagator,
        );
        let run_parameters = RunParameters::new_transaction_run(query, Some(parameters));
        stream.run(run_parameters, None)?;
        Ok(stream)
    }

    /// Returns a `Neo4jError::ServerError` holding a clone of the propagated
    /// error, or `Ok(())` when the propagator holds none.
    fn check_error(&self) -> Result<()> {
        if let Some(err) = self.error_propagator.deref().borrow().error() {
            Err(Neo4jError::ServerError {
                error: err.deref().clone(),
            })
        } else {
            Ok(())
        }
    }
}
/// Builder for a query that is to be run inside a [`Transaction`].
///
/// Created by `Transaction::query`. The parameters can be replaced with
/// `with_parameters` or reset with `without_parameters` before the query is
/// executed with `run`.
pub struct TransactionQueryBuilder<
'driver,
'tx,
Q: AsRef<str>,
K: Borrow<str> + Debug,
M: Borrow<HashMap<K, ValueSend>>,
> {
/// Transaction the query will be run in.
tx: &'tx Transaction<'driver, 'tx>,
/// Query text.
query: Q,
/// Ties the parameter key type `K` to the builder without storing a value.
_k: PhantomData<K>,
/// Query parameters (or something that borrows as a parameter map).
parameters: M,
}
/// Parameter key type of a builder that has not been given custom parameters.
type DefaultKey = String;
/// Parameter map type of a builder that has not been given custom parameters.
type DefaultParameters = HashMap<DefaultKey, ValueSend>;
impl<'driver, 'tx, Q: AsRef<str>>
TransactionQueryBuilder<'driver, 'tx, Q, DefaultKey, DefaultParameters>
{
fn new(tx: &'tx Transaction<'driver, 'tx>, query: Q) -> Self {
Self {
tx,
query,
_k: PhantomData,
parameters: Default::default(),
}
}
}
impl<'driver, 'tx, Q: AsRef<str>, K: Borrow<str> + Debug, M: Borrow<HashMap<K, ValueSend>>>
TransactionQueryBuilder<'driver, 'tx, Q, K, M>
{
pub fn with_parameters<K_: Borrow<str> + Debug, M_: Borrow<HashMap<K_, ValueSend>>>(
self,
parameters: M_,
) -> TransactionQueryBuilder<'driver, 'tx, Q, K_, M_> {
let Self {
tx,
query,
_k: _,
parameters: _,
} = self;
TransactionQueryBuilder {
tx,
query,
_k: PhantomData,
parameters,
}
}
pub fn without_parameters(
self,
) -> TransactionQueryBuilder<'driver, 'tx, Q, DefaultKey, DefaultParameters> {
let Self {
tx,
query,
_k: _,
parameters: _,
} = self;
TransactionQueryBuilder {
tx,
query,
_k: PhantomData,
parameters: Default::default(),
}
}
pub fn run(self) -> Result<TransactionRecordStream<'driver, 'tx>> {
self.tx.run(self)
}
}
impl<Q: AsRef<str>, K: Borrow<str> + Debug, M: Borrow<HashMap<K, ValueSend>>> Debug
    for TransactionQueryBuilder<'_, '_, Q, K, M>
{
    /// Formats the builder as a struct with the transaction, the query text
    /// and the parameter map. The `_k` marker field is omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("TransactionQueryBuilder");
        // NOTE(review): the label reads "inner_tx" although the field is named
        // `tx`; kept as is so the debug output does not change.
        out.field("inner_tx", &self.tx);
        let query: &str = self.query.as_ref();
        out.field("query", &query);
        let parameters: &HashMap<K, ValueSend> = self.parameters.borrow();
        out.field("parameters", parameters);
        out.finish()
    }
}
/// Timeout setting for a transaction.
///
/// Construct it with [`TransactionTimeout::from_millis`],
/// [`TransactionTimeout::none`] or [`Default::default`].
#[derive(Debug, Clone, Copy)]
pub struct TransactionTimeout {
    timeout: InternalTransactionTimeout,
}

impl TransactionTimeout {
    /// Creates a custom timeout of `timeout` milliseconds.
    ///
    /// Returns `None` when `timeout` is zero or negative.
    #[inline]
    pub fn from_millis(timeout: i64) -> Option<Self> {
        (timeout > 0).then(|| Self {
            timeout: InternalTransactionTimeout::Custom(timeout),
        })
    }

    /// Creates the "none" setting, whose raw value is `Some(0)`.
    ///
    /// NOTE(review): presumably a raw value of 0 means "no timeout" to the
    /// receiver - confirm against the protocol handling.
    #[inline]
    pub fn none() -> Self {
        Self {
            timeout: InternalTransactionTimeout::None,
        }
    }

    /// Returns the raw value of the wrapped setting; see
    /// `InternalTransactionTimeout::raw`.
    #[inline]
    pub(crate) fn raw(&self) -> Option<i64> {
        let Self { timeout } = self;
        timeout.raw()
    }
}

impl Default for TransactionTimeout {
    /// Creates the default setting, whose raw value is `None`.
    #[inline]
    fn default() -> Self {
        Self {
            timeout: InternalTransactionTimeout::default(),
        }
    }
}

/// Crate-internal representation of a transaction timeout.
#[derive(Debug, Copy, Clone)]
pub(crate) enum InternalTransactionTimeout {
    /// Raw value `Some(0)`.
    None,
    /// No raw value (`None`).
    Default,
    /// Custom timeout; `TransactionTimeout::from_millis` stores milliseconds
    /// here.
    Custom(i64),
}

impl Default for InternalTransactionTimeout {
    /// Returns the `Default` variant.
    fn default() -> Self {
        InternalTransactionTimeout::Default
    }
}

impl InternalTransactionTimeout {
    /// Maps the setting to its raw value: `None` for `Default`, `Some(0)` for
    /// `None`, and `Some(value)` for `Custom(value)`.
    #[inline]
    pub(crate) fn raw(&self) -> Option<i64> {
        match *self {
            Self::Default => None,
            Self::None => Some(0),
            Self::Custom(millis) => Some(millis),
        }
    }
}