use std::sync::{Arc, Mutex};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
ReadUncommitted,
ReadCommitted,
RepeatableRead,
Serializable,
}
impl IsolationLevel {
pub fn to_sql(&self) -> &'static str {
match self {
IsolationLevel::ReadUncommitted => "READ UNCOMMITTED",
IsolationLevel::ReadCommitted => "READ COMMITTED",
IsolationLevel::RepeatableRead => "REPEATABLE READ",
IsolationLevel::Serializable => "SERIALIZABLE",
}
}
pub(crate) fn to_backends_level(self) -> super::connection::IsolationLevel {
match self {
IsolationLevel::ReadUncommitted => super::connection::IsolationLevel::ReadUncommitted,
IsolationLevel::ReadCommitted => super::connection::IsolationLevel::ReadCommitted,
IsolationLevel::RepeatableRead => super::connection::IsolationLevel::RepeatableRead,
IsolationLevel::Serializable => super::connection::IsolationLevel::Serializable,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
NotStarted,
Active,
Committed,
RolledBack,
}
#[derive(Debug, Clone)]
pub struct Savepoint {
name: String,
pub depth: usize,
}
impl Savepoint {
pub fn new(name: impl Into<String>, depth: usize) -> Self {
let name = name.into();
validate_savepoint_name(&name).unwrap_or_else(|e| panic!("Invalid savepoint name: {}", e));
Self { name, depth }
}
pub fn name(&self) -> &str {
&self.name
}
pub fn to_sql(&self) -> String {
format!("SAVEPOINT \"{}\"", self.name.replace('"', "\"\""))
}
pub fn release_sql(&self) -> String {
format!("RELEASE SAVEPOINT \"{}\"", self.name.replace('"', "\"\""))
}
pub fn rollback_sql(&self) -> String {
format!(
"ROLLBACK TO SAVEPOINT \"{}\"",
self.name.replace('"', "\"\"")
)
}
}
fn validate_savepoint_name(name: &str) -> Result<(), String> {
if name.is_empty() {
return Err("Savepoint name cannot be empty".to_string());
}
if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
return Err(format!(
"Savepoint name '{}' contains invalid characters. Only alphanumeric characters and underscores are allowed",
name
));
}
if let Some(first_char) = name.chars().next()
&& first_char.is_numeric()
{
return Err(format!(
"Savepoint name '{}' cannot start with a number",
name
));
}
Ok(())
}
#[derive(Debug)]
pub struct Transaction {
state: Arc<Mutex<TransactionState>>,
isolation_level: Option<IsolationLevel>,
savepoints: Arc<Mutex<Vec<Savepoint>>>,
depth: usize,
}
impl Transaction {
pub fn new() -> Self {
Self {
state: Arc::new(Mutex::new(TransactionState::NotStarted)),
isolation_level: None,
savepoints: Arc::new(Mutex::new(Vec::new())),
depth: 0,
}
}
pub fn with_isolation_level(mut self, level: IsolationLevel) -> Self {
self.isolation_level = Some(level);
self
}
pub fn begin(&mut self) -> Result<String, String> {
let mut state = self.state.lock().map_err(|e| e.to_string())?;
match *state {
TransactionState::NotStarted => {
*state = TransactionState::Active;
self.depth = 1;
let sql = if let Some(level) = self.isolation_level {
format!("BEGIN TRANSACTION ISOLATION LEVEL {}", level.to_sql())
} else {
"BEGIN TRANSACTION".to_string()
};
Ok(sql)
}
TransactionState::Active => {
self.depth += 1;
let savepoint = Savepoint::new(format!("sp_{}", self.depth), self.depth);
let sql = savepoint.to_sql();
self.savepoints
.lock()
.map_err(|e| e.to_string())?
.push(savepoint);
Ok(sql)
}
_ => Err("Transaction already completed".to_string()),
}
}
pub fn commit(&mut self) -> Result<String, String> {
let mut state = self.state.lock().map_err(|e| e.to_string())?;
match *state {
TransactionState::Active => {
if self.depth > 1 {
let mut savepoints = self.savepoints.lock().map_err(|e| e.to_string())?;
if let Some(savepoint) = savepoints.pop() {
self.depth -= 1;
Ok(savepoint.release_sql())
} else {
Err("No savepoint to release".to_string())
}
} else {
*state = TransactionState::Committed;
self.depth = 0;
Ok("COMMIT".to_string())
}
}
_ => Err("No active transaction to commit".to_string()),
}
}
pub fn rollback(&mut self) -> Result<String, String> {
let mut state = self.state.lock().map_err(|e| e.to_string())?;
match *state {
TransactionState::Active => {
if self.depth > 1 {
let mut savepoints = self.savepoints.lock().map_err(|e| e.to_string())?;
if let Some(savepoint) = savepoints.pop() {
self.depth -= 1;
Ok(savepoint.rollback_sql())
} else {
Err("No savepoint to rollback to".to_string())
}
} else {
*state = TransactionState::RolledBack;
self.depth = 0;
self.savepoints.lock().map_err(|e| e.to_string())?.clear();
Ok("ROLLBACK".to_string())
}
}
_ => Err("No active transaction to rollback".to_string()),
}
}
pub fn state(&self) -> Result<TransactionState, String> {
self.state.lock().map(|s| *s).map_err(|e| e.to_string())
}
pub fn depth(&self) -> usize {
self.depth
}
pub async fn begin_db(&mut self) -> reinhardt_core::exception::Result<()> {
let sql = self
.begin()
.map_err(reinhardt_core::exception::Error::Database)?;
let conn = super::manager::get_connection().await?;
conn.execute(&sql, vec![]).await?;
Ok(())
}
pub async fn commit_db(&mut self) -> reinhardt_core::exception::Result<()> {
let sql = self
.commit()
.map_err(reinhardt_core::exception::Error::Database)?;
let conn = super::manager::get_connection().await?;
conn.execute(&sql, vec![]).await?;
Ok(())
}
pub async fn rollback_db(&mut self) -> reinhardt_core::exception::Result<()> {
let sql = self
.rollback()
.map_err(reinhardt_core::exception::Error::Database)?;
let conn = super::manager::get_connection().await?;
conn.execute(&sql, vec![]).await?;
Ok(())
}
pub fn is_active(&self) -> bool {
matches!(self.state().ok(), Some(TransactionState::Active))
}
pub fn savepoint(&mut self, name: impl Into<String>) -> Result<String, String> {
let state = self.state.lock().map_err(|e| e.to_string())?;
if *state != TransactionState::Active {
return Err("Cannot create savepoint outside active transaction".to_string());
}
drop(state);
let savepoint = Savepoint::new(name, self.depth);
let sql = savepoint.to_sql();
self.savepoints
.lock()
.map_err(|e| e.to_string())?
.push(savepoint);
Ok(sql)
}
pub fn release_savepoint(&mut self, name: &str) -> Result<String, String> {
let mut savepoints = self.savepoints.lock().map_err(|e| e.to_string())?;
if let Some(pos) = savepoints.iter().position(|sp| sp.name() == name) {
let savepoint = savepoints.remove(pos);
Ok(savepoint.release_sql())
} else {
Err(format!("Savepoint '{}' not found", name))
}
}
pub fn rollback_to_savepoint(&mut self, name: &str) -> Result<String, String> {
let savepoints = self.savepoints.lock().map_err(|e| e.to_string())?;
if let Some(savepoint) = savepoints.iter().find(|sp| sp.name() == name) {
Ok(savepoint.rollback_sql())
} else {
Err(format!("Savepoint '{}' not found", name))
}
}
}
impl Default for Transaction {
fn default() -> Self {
Self::new()
}
}
pub struct Atomic<F> {
_func: F,
_isolation_level: Option<IsolationLevel>,
}
impl<F> Atomic<F> {
pub fn new(func: F) -> Self {
Self {
_func: func,
_isolation_level: None,
}
}
pub fn with_isolation_level(mut self, level: IsolationLevel) -> Self {
self._isolation_level = Some(level);
self
}
}
pub struct TransactionScope {
executor: Option<Box<dyn super::connection::TransactionExecutor>>,
committed: bool,
}
impl TransactionScope {
pub async fn begin(
conn: &super::connection::DatabaseConnection,
) -> Result<Self, anyhow::Error> {
let executor = conn.begin().await?;
Ok(Self {
executor: Some(executor),
committed: false,
})
}
pub async fn begin_with_isolation(
conn: &super::connection::DatabaseConnection,
level: IsolationLevel,
) -> Result<Self, anyhow::Error> {
let executor = conn.begin_with_isolation(level.to_backends_level()).await?;
Ok(Self {
executor: Some(executor),
committed: false,
})
}
pub async fn execute(
&mut self,
sql: &str,
params: Vec<super::connection::QueryValue>,
) -> Result<u64, anyhow::Error> {
let executor = self
.executor
.as_mut()
.ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
let result = executor.execute(sql, params).await?;
Ok(result.rows_affected)
}
pub async fn query_one(
&mut self,
sql: &str,
params: Vec<super::connection::QueryValue>,
) -> Result<super::connection::QueryRow, anyhow::Error> {
let executor = self
.executor
.as_mut()
.ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
let row = executor.fetch_one(sql, params).await?;
Ok(super::connection::QueryRow::from_backend_row(row))
}
pub async fn query(
&mut self,
sql: &str,
params: Vec<super::connection::QueryValue>,
) -> Result<Vec<super::connection::QueryRow>, anyhow::Error> {
let executor = self
.executor
.as_mut()
.ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
let rows = executor.fetch_all(sql, params).await?;
Ok(rows
.into_iter()
.map(super::connection::QueryRow::from_backend_row)
.collect())
}
pub async fn query_optional(
&mut self,
sql: &str,
params: Vec<super::connection::QueryValue>,
) -> Result<Option<super::connection::QueryRow>, anyhow::Error> {
let executor = self
.executor
.as_mut()
.ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
let row = executor.fetch_optional(sql, params).await?;
Ok(row.map(super::connection::QueryRow::from_backend_row))
}
pub async fn commit(mut self) -> Result<(), anyhow::Error> {
let executor = self
.executor
.take()
.ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
executor.commit().await?;
self.committed = true;
Ok(())
}
pub async fn rollback(mut self) -> Result<(), anyhow::Error> {
let executor = self
.executor
.take()
.ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
executor.rollback().await?;
self.committed = true; Ok(())
}
pub async fn savepoint(&mut self, name: &str) -> Result<(), anyhow::Error> {
let executor = self
.executor
.as_mut()
.ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
executor.savepoint(name).await?;
Ok(())
}
pub async fn release_savepoint(&mut self, name: &str) -> Result<(), anyhow::Error> {
let executor = self
.executor
.as_mut()
.ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
executor.release_savepoint(name).await?;
Ok(())
}
pub async fn rollback_to_savepoint(&mut self, name: &str) -> Result<(), anyhow::Error> {
let executor = self
.executor
.as_mut()
.ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
executor.rollback_to_savepoint(name).await?;
Ok(())
}
pub async fn run<F, Fut, T>(mut self, f: F) -> Result<T, anyhow::Error>
where
F: FnOnce(&mut Self) -> Fut,
Fut: std::future::Future<Output = Result<T, anyhow::Error>>,
{
match f(&mut self).await {
Ok(result) => {
self.commit().await?;
Ok(result)
}
Err(e) => {
self.rollback().await?;
Err(e)
}
}
}
}
impl Drop for TransactionScope {
fn drop(&mut self) {
if !self.committed
&& let Some(executor) = self.executor.take()
{
eprintln!(
"Warning: TransactionScope dropped without explicit commit/rollback. \
Consider using transaction() function for automatic error handling."
);
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
tokio::task::block_in_place(|| {
handle.block_on(async { executor.rollback().await })
})
}));
match result {
Ok(Ok(())) => {
}
Ok(Err(e)) => {
eprintln!("Error during automatic rollback: {}", e);
}
Err(_) => {
eprintln!(
"Warning: Cannot perform automatic rollback on single-threaded runtime. \
Use transaction() function or explicit commit()/rollback()."
);
}
}
} else {
eprintln!(
"Warning: No async runtime available for automatic rollback. \
Transaction may not be cleaned up properly."
);
}
}
}
}
pub async fn atomic<F, Fut, T>(
conn: &super::connection::DatabaseConnection,
f: F,
) -> Result<T, anyhow::Error>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, anyhow::Error>>,
{
let tx = TransactionScope::begin(conn).await?;
let result = f().await?;
tx.commit().await?;
Ok(result)
}
pub async fn atomic_with_isolation<F, Fut, T>(
conn: &super::connection::DatabaseConnection,
level: IsolationLevel,
f: F,
) -> Result<T, anyhow::Error>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, anyhow::Error>>,
{
let tx = TransactionScope::begin_with_isolation(conn, level).await?;
let result = f().await?;
tx.commit().await?;
Ok(result)
}
pub async fn transaction<F, Fut, T>(
conn: &super::connection::DatabaseConnection,
f: F,
) -> Result<T, anyhow::Error>
where
F: FnOnce(&mut TransactionScope) -> Fut,
Fut: std::future::Future<Output = Result<T, anyhow::Error>>,
{
let mut tx = TransactionScope::begin(conn).await?;
match f(&mut tx).await {
Ok(result) => {
tx.commit().await?;
Ok(result)
}
Err(e) => {
tx.rollback().await?;
Err(e)
}
}
}
pub async fn transaction_with_isolation<F, Fut, T>(
conn: &super::connection::DatabaseConnection,
level: IsolationLevel,
f: F,
) -> Result<T, anyhow::Error>
where
F: FnOnce(&mut TransactionScope) -> Fut,
Fut: std::future::Future<Output = Result<T, anyhow::Error>>,
{
let mut tx = TransactionScope::begin_with_isolation(conn, level).await?;
match f(&mut tx).await {
Ok(result) => {
tx.commit().await?;
Ok(result)
}
Err(e) => {
tx.rollback().await?;
Err(e)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backends::backend::DatabaseBackend as BackendTrait;
use crate::backends::connection::DatabaseConnection as BackendsConnection;
use crate::backends::error::Result;
use crate::backends::types::{DatabaseType, QueryResult, QueryValue, Row, TransactionExecutor};
use crate::orm::connection::{DatabaseBackend, DatabaseConnection};
use crate::prelude::Model;
use rstest::*;
use std::sync::Arc;
struct MockTransactionExecutor;
#[async_trait::async_trait]
impl TransactionExecutor for MockTransactionExecutor {
async fn execute(&mut self, _sql: &str, _params: Vec<QueryValue>) -> Result<QueryResult> {
Ok(QueryResult { rows_affected: 0 })
}
async fn fetch_one(&mut self, _sql: &str, _params: Vec<QueryValue>) -> Result<Row> {
Ok(Row::new())
}
async fn fetch_all(&mut self, _sql: &str, _params: Vec<QueryValue>) -> Result<Vec<Row>> {
Ok(Vec::new())
}
async fn fetch_optional(
&mut self,
_sql: &str,
_params: Vec<QueryValue>,
) -> Result<Option<Row>> {
Ok(None)
}
async fn commit(self: Box<Self>) -> Result<()> {
Ok(())
}
async fn rollback(self: Box<Self>) -> Result<()> {
Ok(())
}
}
struct MockBackend;
#[async_trait::async_trait]
impl BackendTrait for MockBackend {
fn database_type(&self) -> DatabaseType {
DatabaseType::Postgres
}
fn placeholder(&self, index: usize) -> String {
format!("${}", index)
}
fn supports_returning(&self) -> bool {
true
}
fn supports_on_conflict(&self) -> bool {
true
}
async fn execute(&self, _sql: &str, _params: Vec<QueryValue>) -> Result<QueryResult> {
Ok(QueryResult { rows_affected: 1 })
}
async fn fetch_one(&self, _sql: &str, _params: Vec<QueryValue>) -> Result<Row> {
Ok(Row::new())
}
async fn fetch_all(&self, _sql: &str, _params: Vec<QueryValue>) -> Result<Vec<Row>> {
Ok(Vec::new())
}
async fn fetch_optional(
&self,
_sql: &str,
_params: Vec<QueryValue>,
) -> Result<Option<Row>> {
Ok(None)
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn begin(&self) -> Result<Box<dyn TransactionExecutor>> {
Ok(Box::new(MockTransactionExecutor))
}
}
#[fixture]
fn mock_connection() -> DatabaseConnection {
let mock_backend = Arc::new(MockBackend);
let backends_conn = BackendsConnection::new(mock_backend);
DatabaseConnection::new(DatabaseBackend::Postgres, backends_conn)
}
#[rstest]
#[tokio::test]
async fn test_transaction_scope_commit(mock_connection: DatabaseConnection) {
let conn = mock_connection;
let tx = TransactionScope::begin(&conn).await;
let tx = tx.unwrap();
assert!(!tx.committed);
let result = tx.commit().await;
assert!(result.is_ok());
}
#[rstest]
#[tokio::test]
async fn test_transaction_scope_rollback(mock_connection: DatabaseConnection) {
let conn = mock_connection;
let tx = TransactionScope::begin(&conn).await.unwrap();
let result = tx.rollback().await;
assert!(result.is_ok());
}
#[rstest]
#[tokio::test]
async fn test_transaction_scope_with_isolation(mock_connection: DatabaseConnection) {
let conn = mock_connection;
let tx = TransactionScope::begin_with_isolation(&conn, IsolationLevel::Serializable).await;
let tx = tx.unwrap();
let result = tx.commit().await;
assert!(result.is_ok());
}
#[rstest]
#[tokio::test]
async fn test_atomic_helper(mock_connection: DatabaseConnection) {
let conn = mock_connection;
let result = atomic(&conn, || async move { Ok::<_, anyhow::Error>(42) }).await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), 42);
}
#[rstest]
#[tokio::test]
async fn test_atomic_helper_with_error(mock_connection: DatabaseConnection) {
let conn = mock_connection;
let result = atomic(&conn, || async move {
Err::<i32, _>(anyhow::anyhow!("test error"))
})
.await;
assert!(result.is_err());
}
#[rstest]
#[tokio::test]
async fn test_atomic_with_isolation_helper(mock_connection: DatabaseConnection) {
let conn = mock_connection;
let result = atomic_with_isolation(&conn, IsolationLevel::Serializable, || async move {
Ok::<_, anyhow::Error>(100)
})
.await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), 100);
}
#[test]
fn test_transaction_begin() {
let mut tx = Transaction::new();
let sql = tx.begin().unwrap();
assert_eq!(sql, "BEGIN TRANSACTION");
assert_eq!(tx.state().unwrap(), TransactionState::Active);
assert_eq!(tx.depth(), 1);
}
#[test]
fn test_transaction_commit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
let sql = tx.commit().unwrap();
assert_eq!(sql, "COMMIT");
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
assert_eq!(tx.depth(), 0);
}
#[test]
fn test_orm_transaction_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
let sql = tx.rollback().unwrap();
assert_eq!(sql, "ROLLBACK");
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
assert_eq!(tx.depth(), 0);
}
#[test]
fn test_nested_transaction_begin() {
let mut tx = Transaction::new();
tx.begin().unwrap();
let sql = tx.begin().unwrap();
assert!(sql.contains("SAVEPOINT \"sp_2\""));
assert_eq!(tx.depth(), 2);
}
#[test]
fn test_nested_transaction_commit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
let sql = tx.commit().unwrap();
assert!(sql.contains("RELEASE SAVEPOINT"));
assert_eq!(tx.depth(), 1);
assert!(tx.is_active());
}
#[test]
fn test_nested_transaction_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
let sql = tx.rollback().unwrap();
assert!(sql.contains("ROLLBACK TO SAVEPOINT"));
assert_eq!(tx.depth(), 1);
assert!(tx.is_active());
}
#[test]
fn test_isolation_level() {
let mut tx = Transaction::new().with_isolation_level(IsolationLevel::Serializable);
let sql = tx.begin().unwrap();
assert!(sql.contains("ISOLATION LEVEL SERIALIZABLE"));
}
#[test]
fn test_manual_savepoint() {
let mut tx = Transaction::new();
tx.begin().unwrap();
let sql = tx.savepoint("my_savepoint").unwrap();
assert_eq!(sql, r#"SAVEPOINT "my_savepoint""#);
}
#[test]
fn test_orm_transaction_release_savepoint() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.savepoint("my_savepoint").unwrap();
let sql = tx.release_savepoint("my_savepoint").unwrap();
assert_eq!(sql, r#"RELEASE SAVEPOINT "my_savepoint""#);
}
#[test]
fn test_orm_transaction_rollback_savepoint() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.savepoint("my_savepoint").unwrap();
let sql = tx.rollback_to_savepoint("my_savepoint").unwrap();
assert_eq!(sql, r#"ROLLBACK TO SAVEPOINT "my_savepoint""#);
}
#[test]
fn test_transaction_is_active() {
let mut tx = Transaction::new();
assert!(!tx.is_active());
tx.begin().unwrap();
assert!(tx.is_active());
tx.commit().unwrap();
assert!(!tx.is_active());
}
#[test]
fn test_commit_without_begin() {
let mut tx = Transaction::new();
let result = tx.commit();
assert!(result.is_err());
}
#[test]
fn test_rollback_without_begin() {
let mut tx = Transaction::new();
let result = tx.rollback();
assert!(result.is_err());
}
#[test]
fn test_savepoint_outside_transaction() {
let mut tx = Transaction::new();
let result = tx.savepoint("test");
assert!(result.is_err());
}
use reinhardt_core::validators::TableName;
use serde::{Deserialize, Serialize};
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TestItem {
id: Option<i64>,
name: String,
value: i32,
}
#[derive(Clone)]
struct TestItemFields;
impl crate::orm::model::FieldSelector for TestItemFields {
fn with_alias(self, _alias: &str) -> Self {
self
}
}
#[allow(dead_code)]
const TEST_ITEM_TABLE: TableName = TableName::new_const("test_items");
impl Model for TestItem {
type PrimaryKey = i64;
type Fields = TestItemFields;
fn table_name() -> &'static str {
TEST_ITEM_TABLE.as_str()
}
fn new_fields() -> Self::Fields {
TestItemFields
}
fn primary_key(&self) -> Option<Self::PrimaryKey> {
self.id
}
fn set_primary_key(&mut self, value: Self::PrimaryKey) {
self.id = Some(value);
}
}
async fn setup_transaction_test_db() -> reinhardt_core::exception::Result<()> {
use sqlx::SqlitePool;
use tokio::sync::OnceCell;
static POOL: OnceCell<SqlitePool> = OnceCell::const_new();
let pool = POOL
.get_or_init(|| async {
SqlitePool::connect("sqlite::memory:")
.await
.expect("Failed to create in-memory SQLite pool")
})
.await;
sqlx::query(
"CREATE TABLE IF NOT EXISTS test_items (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
value INTEGER NOT NULL
)",
)
.execute(pool)
.await
.map_err(|e| {
reinhardt_core::exception::Error::Database(format!("Create table failed: {}", e))
})?;
sqlx::query("DELETE FROM test_items")
.execute(pool)
.await
.map_err(|e| {
reinhardt_core::exception::Error::Database(format!(
"Clear table data failed: {}",
e
))
})?;
Ok(())
}
#[tokio::test]
async fn test_begin_db_execution() {
let mut tx = Transaction::new();
let sql = tx.begin().unwrap();
assert_eq!(
sql, "BEGIN TRANSACTION",
"Should generate BEGIN TRANSACTION SQL"
);
assert!(tx.is_active(), "Transaction should be active after begin()");
assert_eq!(tx.depth(), 1, "Transaction depth should be 1");
}
#[tokio::test]
async fn test_commit_db_sql_generation() {
setup_transaction_test_db().await.unwrap();
let mut tx = Transaction::new();
let begin_sql = tx.begin().unwrap();
assert_eq!(begin_sql, "BEGIN TRANSACTION");
assert!(tx.is_active());
assert_eq!(tx.depth(), 1);
let commit_sql = tx.commit().unwrap();
assert_eq!(commit_sql, "COMMIT");
assert!(!tx.is_active());
assert_eq!(tx.depth(), 0);
}
#[tokio::test]
async fn test_rollback_db_sql_generation() {
setup_transaction_test_db().await.unwrap();
let mut tx = Transaction::new();
let begin_sql = tx.begin().unwrap();
assert_eq!(begin_sql, "BEGIN TRANSACTION");
assert!(tx.is_active());
let rollback_sql = tx.rollback().unwrap();
assert_eq!(rollback_sql, "ROLLBACK");
assert!(!tx.is_active());
assert_eq!(tx.depth(), 0);
}
#[tokio::test]
async fn test_nested_transaction_sql_generation() {
setup_transaction_test_db().await.unwrap();
let mut tx = Transaction::new();
let begin_sql = tx.begin().unwrap();
assert_eq!(begin_sql, "BEGIN TRANSACTION");
assert_eq!(tx.depth(), 1);
let savepoint_sql = tx.begin().unwrap();
assert!(savepoint_sql.contains("SAVEPOINT \"sp_2\""));
assert_eq!(tx.depth(), 2);
let rollback_sql = tx.rollback().unwrap();
assert!(rollback_sql.contains("ROLLBACK TO SAVEPOINT"));
assert_eq!(tx.depth(), 1);
assert!(tx.is_active());
let commit_sql = tx.commit().unwrap();
assert_eq!(commit_sql, "COMMIT");
assert_eq!(tx.depth(), 0);
assert!(!tx.is_active());
}
#[tokio::test]
async fn test_transaction_isolation_level_sql() {
setup_transaction_test_db().await.unwrap();
let mut tx = Transaction::new().with_isolation_level(IsolationLevel::Serializable);
let begin_sql = tx.begin().unwrap();
assert!(begin_sql.contains("ISOLATION LEVEL SERIALIZABLE"));
assert!(tx.is_active());
}
}
#[cfg(test)]
mod transaction_extended_tests {
use super::*;
use crate::orm::connection::{DatabaseBackend, DatabaseConnection};
use crate::backends::backend::DatabaseBackend as BackendTrait;
use crate::backends::connection::DatabaseConnection as BackendsConnection;
use crate::backends::error::Result;
use crate::backends::types::{DatabaseType, QueryResult, QueryValue, Row, TransactionExecutor};
use rstest::*;
use std::sync::Arc;
struct MockTransactionExecutor;
#[async_trait::async_trait]
impl TransactionExecutor for MockTransactionExecutor {
async fn execute(&mut self, _sql: &str, _params: Vec<QueryValue>) -> Result<QueryResult> {
Ok(QueryResult { rows_affected: 0 })
}
async fn fetch_one(&mut self, _sql: &str, _params: Vec<QueryValue>) -> Result<Row> {
Ok(Row::new())
}
async fn fetch_all(&mut self, _sql: &str, _params: Vec<QueryValue>) -> Result<Vec<Row>> {
Ok(Vec::new())
}
async fn fetch_optional(
&mut self,
_sql: &str,
_params: Vec<QueryValue>,
) -> Result<Option<Row>> {
Ok(None)
}
async fn commit(self: Box<Self>) -> Result<()> {
Ok(())
}
async fn rollback(self: Box<Self>) -> Result<()> {
Ok(())
}
}
struct MockBackend;
#[async_trait::async_trait]
impl BackendTrait for MockBackend {
fn database_type(&self) -> DatabaseType {
DatabaseType::Postgres
}
fn placeholder(&self, index: usize) -> String {
format!("${}", index)
}
fn supports_returning(&self) -> bool {
true
}
fn supports_on_conflict(&self) -> bool {
true
}
async fn execute(&self, _sql: &str, _params: Vec<QueryValue>) -> Result<QueryResult> {
Ok(QueryResult { rows_affected: 1 })
}
async fn fetch_one(&self, _sql: &str, _params: Vec<QueryValue>) -> Result<Row> {
Ok(Row::new())
}
async fn fetch_all(&self, _sql: &str, _params: Vec<QueryValue>) -> Result<Vec<Row>> {
Ok(Vec::new())
}
async fn fetch_optional(
&self,
_sql: &str,
_params: Vec<QueryValue>,
) -> Result<Option<Row>> {
Ok(None)
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
async fn begin(&self) -> Result<Box<dyn TransactionExecutor>> {
Ok(Box::new(MockTransactionExecutor))
}
}
#[fixture]
fn mock_connection() -> DatabaseConnection {
let mock_backend = Arc::new(MockBackend);
let backends_conn = BackendsConnection::new(mock_backend);
DatabaseConnection::new(DatabaseBackend::Postgres, backends_conn)
}
#[test]
fn test_alternate_decorator_syntax_commit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_alternate_decorator_syntax_commit_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_alternate_decorator_syntax_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_alternate_decorator_syntax_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_atomic_allows_queries_after_fixing_transaction() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert!(!tx.is_active());
}
#[test]
fn test_atomic_allows_queries_after_fixing_transaction_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert!(!tx.is_active());
}
#[test]
fn test_atomic_does_not_leak_savepoints_on_failure() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.depth(), 1);
assert!(tx.is_active());
}
#[test]
fn test_atomic_does_not_leak_savepoints_on_failure_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.depth(), 1);
assert!(tx.is_active());
}
#[test]
fn test_atomic_prevents_calling_transaction_methods() {
let mut tx = Transaction::new();
tx.begin().unwrap();
assert!(tx.is_active());
}
#[test]
fn test_atomic_prevents_calling_transaction_methods_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
assert!(tx.is_active());
}
#[test]
fn test_atomic_prevents_queries_in_broken_transaction() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_atomic_prevents_queries_in_broken_transaction_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_atomic_prevents_queries_in_broken_transaction_after_client_close() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert!(!tx.is_active());
}
#[test]
fn test_atomic_prevents_queries_in_broken_transaction_after_client_close_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert!(!tx.is_active());
}
#[test]
fn test_atomic_prevents_setting_autocommit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
assert!(tx.is_active());
}
#[test]
fn test_atomic_prevents_setting_autocommit_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
assert!(tx.is_active());
}
#[test]
fn test_commit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_commit_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_commit_2() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_commit_3() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_decorator_syntax_commit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_decorator_syntax_commit_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_decorator_syntax_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_decorator_syntax_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_failure_on_exit_transaction() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_failure_on_exit_transaction_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_force_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_force_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_implicit_savepoint_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.depth(), 1);
assert!(tx.is_active());
}
#[test]
fn test_implicit_savepoint_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.depth(), 1);
assert!(tx.is_active());
}
#[test]
fn test_mark_for_rollback_on_error_in_autocommit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_mark_for_rollback_on_error_in_autocommit_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_mark_for_rollback_on_error_in_transaction() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state(), Ok(TransactionState::RolledBack));
}
#[test]
fn test_mark_for_rollback_on_error_in_transaction_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state(), Ok(TransactionState::RolledBack));
}
#[test]
fn test_merged_commit_commit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_merged_commit_commit_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_merged_commit_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_merged_commit_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_merged_inner_savepoint_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.depth(), 1);
assert!(tx.is_active());
}
#[test]
fn test_merged_inner_savepoint_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.depth(), 1);
assert!(tx.is_active());
}
#[test]
fn test_merged_outer_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_merged_outer_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_merged_rollback_commit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_merged_rollback_commit_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_merged_rollback_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_merged_rollback_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_nested_both_durable() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_nested_both_durable_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_nested_commit_commit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.depth(), 1);
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_nested_commit_commit_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.depth(), 1);
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_nested_commit_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_nested_commit_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_nested_inner_durable() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.depth(), 1);
assert!(tx.is_active());
}
#[test]
fn test_nested_inner_durable_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.depth(), 1);
assert!(tx.is_active());
}
#[test]
fn test_nested_outer_durable() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_nested_outer_durable_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.commit().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_nested_rollback_commit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_nested_rollback_commit_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_nested_rollback_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_nested_rollback_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.begin().unwrap();
tx.rollback().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_orm_query_after_error_and_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_orm_query_after_error_and_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_orm_query_without_autocommit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
assert!(tx.is_active());
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_orm_query_without_autocommit_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
assert!(tx.is_active());
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_prevent_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_prevent_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_reuse_commit_commit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_reuse_commit_commit_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_reuse_commit_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_reuse_commit_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_reuse_rollback_commit() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_reuse_rollback_commit_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_reuse_rollback_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_reuse_rollback_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_rollback() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_rollback_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.rollback().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
}
#[test]
fn test_sequence_of_durables() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_sequence_of_durables_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_wrap_callable_instance() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[test]
fn test_wrap_callable_instance_1() {
let mut tx = Transaction::new();
tx.begin().unwrap();
tx.commit().unwrap();
assert_eq!(tx.state().unwrap(), TransactionState::Committed);
}
#[rstest]
#[tokio::test]
async fn test_transaction_closure_success(mock_connection: DatabaseConnection) {
let conn = mock_connection;
let result = transaction(&conn, |_tx| async move { Ok(42) }).await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), 42);
}
#[rstest]
#[tokio::test]
async fn test_transaction_closure_error_rollback(mock_connection: DatabaseConnection) {
let conn = mock_connection;
let result: std::result::Result<(), _> =
transaction(
&conn,
|_tx| async move { Err(anyhow::anyhow!("Test error")) },
)
.await;
assert!(result.is_err());
assert_eq!(result.unwrap_err().to_string(), "Test error");
}
#[rstest]
#[tokio::test]
async fn test_transaction_with_isolation_level(mock_connection: DatabaseConnection) {
let conn = mock_connection;
let result = transaction_with_isolation(
&conn,
IsolationLevel::Serializable,
|_tx| async move { Ok(()) },
)
.await;
assert!(result.is_ok());
}
}