use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use nautilus_dialect::Sql;
use crate::error::{ConnectorError as Error, Result};
use crate::row_stream::RowStream;
use crate::{Executor, Row};
#[derive(Debug, Clone)]
pub struct TransactionOptions {
pub timeout: Duration,
pub isolation_level: Option<IsolationLevel>,
}
impl Default for TransactionOptions {
fn default() -> Self {
Self {
timeout: Duration::from_secs(5),
isolation_level: None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
ReadUncommitted,
ReadCommitted,
RepeatableRead,
Serializable,
}
impl IsolationLevel {
pub fn as_sql(&self) -> &'static str {
match self {
IsolationLevel::ReadUncommitted => "READ UNCOMMITTED",
IsolationLevel::ReadCommitted => "READ COMMITTED",
IsolationLevel::RepeatableRead => "REPEATABLE READ",
IsolationLevel::Serializable => "SERIALIZABLE",
}
}
}
enum TransactionInner {
Postgres(Arc<Mutex<Option<sqlx::Transaction<'static, sqlx::Postgres>>>>),
Mysql(Arc<Mutex<Option<sqlx::Transaction<'static, sqlx::MySql>>>>),
Sqlite(Arc<Mutex<Option<sqlx::Transaction<'static, sqlx::Sqlite>>>>),
}
pub struct TransactionExecutor {
inner: TransactionInner,
}
impl TransactionExecutor {
pub fn postgres(tx: sqlx::Transaction<'static, sqlx::Postgres>) -> Self {
Self {
inner: TransactionInner::Postgres(Arc::new(Mutex::new(Some(tx)))),
}
}
pub fn mysql(tx: sqlx::Transaction<'static, sqlx::MySql>) -> Self {
Self {
inner: TransactionInner::Mysql(Arc::new(Mutex::new(Some(tx)))),
}
}
pub fn sqlite(tx: sqlx::Transaction<'static, sqlx::Sqlite>) -> Self {
Self {
inner: TransactionInner::Sqlite(Arc::new(Mutex::new(Some(tx)))),
}
}
pub async fn commit(&self) -> Result<()> {
match &self.inner {
TransactionInner::Postgres(mx) => {
let tx = mx
.lock()
.await
.take()
.ok_or_else(|| Error::database_msg("Transaction already closed"))?;
tx.commit()
.await
.map_err(|e| Error::database(e, "Commit failed"))
}
TransactionInner::Mysql(mx) => {
let tx = mx
.lock()
.await
.take()
.ok_or_else(|| Error::database_msg("Transaction already closed"))?;
tx.commit()
.await
.map_err(|e| Error::database(e, "Commit failed"))
}
TransactionInner::Sqlite(mx) => {
let tx = mx
.lock()
.await
.take()
.ok_or_else(|| Error::database_msg("Transaction already closed"))?;
tx.commit()
.await
.map_err(|e| Error::database(e, "Commit failed"))
}
}
}
pub async fn rollback(&self) -> Result<()> {
match &self.inner {
TransactionInner::Postgres(mx) => {
let tx = mx
.lock()
.await
.take()
.ok_or_else(|| Error::database_msg("Transaction already closed"))?;
tx.rollback()
.await
.map_err(|e| Error::database(e, "Rollback failed"))
}
TransactionInner::Mysql(mx) => {
let tx = mx
.lock()
.await
.take()
.ok_or_else(|| Error::database_msg("Transaction already closed"))?;
tx.rollback()
.await
.map_err(|e| Error::database(e, "Rollback failed"))
}
TransactionInner::Sqlite(mx) => {
let tx = mx
.lock()
.await
.take()
.ok_or_else(|| Error::database_msg("Transaction already closed"))?;
tx.rollback()
.await
.map_err(|e| Error::database(e, "Rollback failed"))
}
}
}
pub async fn is_open(&self) -> bool {
match &self.inner {
TransactionInner::Postgres(mx) => mx.lock().await.is_some(),
TransactionInner::Mysql(mx) => mx.lock().await.is_some(),
TransactionInner::Sqlite(mx) => mx.lock().await.is_some(),
}
}
pub async fn execute_affected(&self, sql: &Sql) -> Result<usize> {
match &self.inner {
TransactionInner::Postgres(tx_arc) => {
let mut guard = tx_arc.lock().await;
let tx = guard
.as_mut()
.ok_or_else(|| Error::database_msg("Transaction already closed"))?;
let mut query = sqlx::query(&sql.text);
for param in &sql.params {
query = crate::postgres::bind_value(query, param)?;
}
use sqlx::Executor as _;
let result = (&mut **tx)
.execute(query)
.await
.map_err(|e| Error::database(e, "Mutation failed"))?;
Ok(result.rows_affected() as usize)
}
TransactionInner::Mysql(tx_arc) => {
let mut guard = tx_arc.lock().await;
let tx = guard
.as_mut()
.ok_or_else(|| Error::database_msg("Transaction already closed"))?;
let mut query = sqlx::query(&sql.text);
for param in &sql.params {
query = crate::mysql::bind_value(query, param)?;
}
use sqlx::Executor as _;
let result = (&mut **tx)
.execute(query)
.await
.map_err(|e| Error::database(e, "Mutation failed"))?;
Ok(result.rows_affected() as usize)
}
TransactionInner::Sqlite(tx_arc) => {
let mut guard = tx_arc.lock().await;
let tx = guard
.as_mut()
.ok_or_else(|| Error::database_msg("Transaction already closed"))?;
let mut query = sqlx::query(&sql.text);
for param in &sql.params {
query = crate::sqlite::bind_value(query, param)?;
}
use sqlx::Executor as _;
let result = (&mut **tx)
.execute(query)
.await
.map_err(|e| Error::database(e, "Mutation failed"))?;
Ok(result.rows_affected() as usize)
}
}
}
}
impl Executor for TransactionExecutor {
type Row<'conn>
= Row
where
Self: 'conn;
type RowStream<'conn>
= RowStream
where
Self: 'conn;
fn execute<'conn>(&'conn self, sql: &'conn Sql) -> Self::RowStream<'conn> {
let sql_text = sql.text.clone();
let params = sql.params.clone();
match &self.inner {
TransactionInner::Postgres(tx_arc) => {
let tx_arc = Arc::clone(tx_arc);
let stream = async_stream::stream! {
let mut guard = tx_arc.lock().await;
let tx = match guard.as_mut() {
Some(tx) => tx,
None => { yield Err(Error::database_msg("Transaction already closed")); return; }
};
let mut query = sqlx::query(&sql_text);
for param in ¶ms {
query = match crate::postgres::bind_value(query, param) {
Ok(q) => q,
Err(e) => { yield Err(e); return; }
};
}
use sqlx::Executor as _;
let rows = match (&mut **tx).fetch_all(query).await {
Ok(rows) => rows,
Err(e) => { yield Err(Error::database(e, "Query failed")); return; }
};
drop(guard);
for row in rows {
match crate::postgres_stream::decode_row_internal(row) {
Ok(r) => yield Ok(r),
Err(e) => yield Err(e),
}
}
};
RowStream::new_from_stream(Box::pin(stream))
}
TransactionInner::Mysql(tx_arc) => {
let tx_arc = Arc::clone(tx_arc);
let stream = async_stream::stream! {
let mut guard = tx_arc.lock().await;
let tx = match guard.as_mut() {
Some(tx) => tx,
None => { yield Err(Error::database_msg("Transaction already closed")); return; }
};
let mut query = sqlx::query(&sql_text);
for param in ¶ms {
query = match crate::mysql::bind_value(query, param) {
Ok(q) => q,
Err(e) => { yield Err(e); return; }
};
}
use sqlx::Executor as _;
let rows = match (&mut **tx).fetch_all(query).await {
Ok(rows) => rows,
Err(e) => { yield Err(Error::database(e, "Query failed")); return; }
};
drop(guard);
for row in rows {
match crate::mysql_stream::decode_row_internal(row) {
Ok(r) => yield Ok(r),
Err(e) => yield Err(e),
}
}
};
RowStream::new_from_stream(Box::pin(stream))
}
TransactionInner::Sqlite(tx_arc) => {
let tx_arc = Arc::clone(tx_arc);
let stream = async_stream::stream! {
let mut guard = tx_arc.lock().await;
let tx = match guard.as_mut() {
Some(tx) => tx,
None => { yield Err(Error::database_msg("Transaction already closed")); return; }
};
let mut query = sqlx::query(&sql_text);
for param in ¶ms {
query = match crate::sqlite::bind_value(query, param) {
Ok(q) => q,
Err(e) => { yield Err(e); return; }
};
}
use sqlx::Executor as _;
let rows = match (&mut **tx).fetch_all(query).await {
Ok(rows) => rows,
Err(e) => { yield Err(Error::database(e, "Query failed")); return; }
};
drop(guard);
for row in rows {
match crate::sqlite_stream::decode_row_internal(row) {
Ok(r) => yield Ok(r),
Err(e) => yield Err(e),
}
}
};
RowStream::new_from_stream(Box::pin(stream))
}
}
}
fn execute_and_fetch<'conn>(
&'conn self,
mutation: &'conn Sql,
fetch: &'conn Sql,
) -> Self::RowStream<'conn> {
let mutation_text = mutation.text.clone();
let mutation_params = mutation.params.clone();
let fetch_text = fetch.text.clone();
let fetch_params = fetch.params.clone();
match &self.inner {
TransactionInner::Postgres(tx_arc) => {
let tx_arc = Arc::clone(tx_arc);
let stream = async_stream::stream! {
let mut guard = tx_arc.lock().await;
let tx = match guard.as_mut() {
Some(tx) => tx,
None => { yield Err(Error::database_msg("Transaction already closed")); return; }
};
let mut mq = sqlx::query(&mutation_text);
for param in &mutation_params {
mq = match crate::postgres::bind_value(mq, param) {
Ok(q) => q,
Err(e) => { yield Err(e); return; }
};
}
use sqlx::Executor as _;
if let Err(e) = (&mut **tx).execute(mq).await {
yield Err(Error::database(e, "Mutation failed")); return;
}
let mut fq = sqlx::query(&fetch_text);
for param in &fetch_params {
fq = match crate::postgres::bind_value(fq, param) {
Ok(q) => q,
Err(e) => { yield Err(e); return; }
};
}
let rows = match (&mut **tx).fetch_all(fq).await {
Ok(rows) => rows,
Err(e) => { yield Err(Error::database(e, "Fetch failed")); return; }
};
drop(guard);
for row in rows {
match crate::postgres_stream::decode_row_internal(row) {
Ok(r) => yield Ok(r),
Err(e) => yield Err(e),
}
}
};
RowStream::new_from_stream(Box::pin(stream))
}
TransactionInner::Mysql(tx_arc) => {
let tx_arc = Arc::clone(tx_arc);
let stream = async_stream::stream! {
let mut guard = tx_arc.lock().await;
let tx = match guard.as_mut() {
Some(tx) => tx,
None => { yield Err(Error::database_msg("Transaction already closed")); return; }
};
let mut mq = sqlx::query(&mutation_text);
for param in &mutation_params {
mq = match crate::mysql::bind_value(mq, param) {
Ok(q) => q,
Err(e) => { yield Err(e); return; }
};
}
use sqlx::Executor as _;
if let Err(e) = (&mut **tx).execute(mq).await {
yield Err(Error::database(e, "Mutation failed")); return;
}
let mut fq = sqlx::query(&fetch_text);
for param in &fetch_params {
fq = match crate::mysql::bind_value(fq, param) {
Ok(q) => q,
Err(e) => { yield Err(e); return; }
};
}
let rows = match (&mut **tx).fetch_all(fq).await {
Ok(rows) => rows,
Err(e) => { yield Err(Error::database(e, "Fetch failed")); return; }
};
drop(guard);
for row in rows {
match crate::mysql_stream::decode_row_internal(row) {
Ok(r) => yield Ok(r),
Err(e) => yield Err(e),
}
}
};
RowStream::new_from_stream(Box::pin(stream))
}
TransactionInner::Sqlite(tx_arc) => {
let tx_arc = Arc::clone(tx_arc);
let stream = async_stream::stream! {
let mut guard = tx_arc.lock().await;
let tx = match guard.as_mut() {
Some(tx) => tx,
None => { yield Err(Error::database_msg("Transaction already closed")); return; }
};
let mut mq = sqlx::query(&mutation_text);
for param in &mutation_params {
mq = match crate::sqlite::bind_value(mq, param) {
Ok(q) => q,
Err(e) => { yield Err(e); return; }
};
}
use sqlx::Executor as _;
if let Err(e) = (&mut **tx).execute(mq).await {
yield Err(Error::database(e, "Mutation failed")); return;
}
let mut fq = sqlx::query(&fetch_text);
for param in &fetch_params {
fq = match crate::sqlite::bind_value(fq, param) {
Ok(q) => q,
Err(e) => { yield Err(e); return; }
};
}
let rows = match (&mut **tx).fetch_all(fq).await {
Ok(rows) => rows,
Err(e) => { yield Err(Error::database(e, "Fetch failed")); return; }
};
drop(guard);
for row in rows {
match crate::sqlite_stream::decode_row_internal(row) {
Ok(r) => yield Ok(r),
Err(e) => yield Err(e),
}
}
};
RowStream::new_from_stream(Box::pin(stream))
}
}
}
}