1use std::sync::{Arc, Mutex};
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
87pub enum IsolationLevel {
88 ReadUncommitted,
90 ReadCommitted,
92 RepeatableRead,
94 Serializable,
96}
97
98impl IsolationLevel {
99 pub fn to_sql(&self) -> &'static str {
113 match self {
114 IsolationLevel::ReadUncommitted => "READ UNCOMMITTED",
115 IsolationLevel::ReadCommitted => "READ COMMITTED",
116 IsolationLevel::RepeatableRead => "REPEATABLE READ",
117 IsolationLevel::Serializable => "SERIALIZABLE",
118 }
119 }
120
121 pub(crate) fn to_backends_level(self) -> super::connection::IsolationLevel {
123 match self {
124 IsolationLevel::ReadUncommitted => super::connection::IsolationLevel::ReadUncommitted,
125 IsolationLevel::ReadCommitted => super::connection::IsolationLevel::ReadCommitted,
126 IsolationLevel::RepeatableRead => super::connection::IsolationLevel::RepeatableRead,
127 IsolationLevel::Serializable => super::connection::IsolationLevel::Serializable,
128 }
129 }
130}
131
132#[derive(Debug, Clone, Copy, PartialEq, Eq)]
134pub enum TransactionState {
135 NotStarted,
137 Active,
139 Committed,
141 RolledBack,
143}
144
145#[derive(Debug, Clone)]
151pub struct Savepoint {
152 name: String,
153 pub depth: usize,
155}
156
157impl Savepoint {
158 pub fn new(name: impl Into<String>, depth: usize) -> Self {
175 let name = name.into();
176 validate_savepoint_name(&name).unwrap_or_else(|e| panic!("Invalid savepoint name: {}", e));
177 Self { name, depth }
178 }
179
180 pub fn name(&self) -> &str {
182 &self.name
183 }
184
185 pub fn to_sql(&self) -> String {
196 format!("SAVEPOINT \"{}\"", self.name.replace('"', "\"\""))
197 }
198
199 pub fn release_sql(&self) -> String {
210 format!("RELEASE SAVEPOINT \"{}\"", self.name.replace('"', "\"\""))
211 }
212
213 pub fn rollback_sql(&self) -> String {
224 format!(
225 "ROLLBACK TO SAVEPOINT \"{}\"",
226 self.name.replace('"', "\"\"")
227 )
228 }
229}
230
231fn validate_savepoint_name(name: &str) -> Result<(), String> {
236 if name.is_empty() {
237 return Err("Savepoint name cannot be empty".to_string());
238 }
239
240 if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
241 return Err(format!(
242 "Savepoint name '{}' contains invalid characters. Only alphanumeric characters and underscores are allowed",
243 name
244 ));
245 }
246
247 if let Some(first_char) = name.chars().next()
248 && first_char.is_numeric()
249 {
250 return Err(format!(
251 "Savepoint name '{}' cannot start with a number",
252 name
253 ));
254 }
255
256 Ok(())
257}
258
259#[derive(Debug)]
261pub struct Transaction {
262 state: Arc<Mutex<TransactionState>>,
263 isolation_level: Option<IsolationLevel>,
264 savepoints: Arc<Mutex<Vec<Savepoint>>>,
265 depth: usize,
266}
267
268impl Transaction {
269 pub fn new() -> Self {
281 Self {
282 state: Arc::new(Mutex::new(TransactionState::NotStarted)),
283 isolation_level: None,
284 savepoints: Arc::new(Mutex::new(Vec::new())),
285 depth: 0,
286 }
287 }
288 pub fn with_isolation_level(mut self, level: IsolationLevel) -> Self {
300 self.isolation_level = Some(level);
301 self
302 }
303 pub fn begin(&mut self) -> Result<String, String> {
320 let mut state = self.state.lock().map_err(|e| e.to_string())?;
321
322 match *state {
323 TransactionState::NotStarted => {
324 *state = TransactionState::Active;
325 self.depth = 1;
326
327 let sql = if let Some(level) = self.isolation_level {
328 format!("BEGIN TRANSACTION ISOLATION LEVEL {}", level.to_sql())
329 } else {
330 "BEGIN TRANSACTION".to_string()
331 };
332
333 Ok(sql)
334 }
335 TransactionState::Active => {
336 self.depth += 1;
338 let savepoint = Savepoint::new(format!("sp_{}", self.depth), self.depth);
339 let sql = savepoint.to_sql();
340
341 self.savepoints
342 .lock()
343 .map_err(|e| e.to_string())?
344 .push(savepoint);
345
346 Ok(sql)
347 }
348 _ => Err("Transaction already completed".to_string()),
349 }
350 }
351 pub fn commit(&mut self) -> Result<String, String> {
365 let mut state = self.state.lock().map_err(|e| e.to_string())?;
366
367 match *state {
368 TransactionState::Active => {
369 if self.depth > 1 {
370 let mut savepoints = self.savepoints.lock().map_err(|e| e.to_string())?;
372 if let Some(savepoint) = savepoints.pop() {
373 self.depth -= 1;
374 Ok(savepoint.release_sql())
375 } else {
376 Err("No savepoint to release".to_string())
377 }
378 } else {
379 *state = TransactionState::Committed;
381 self.depth = 0;
382 Ok("COMMIT".to_string())
383 }
384 }
385 _ => Err("No active transaction to commit".to_string()),
386 }
387 }
388 pub fn rollback(&mut self) -> Result<String, String> {
402 let mut state = self.state.lock().map_err(|e| e.to_string())?;
403
404 match *state {
405 TransactionState::Active => {
406 if self.depth > 1 {
407 let mut savepoints = self.savepoints.lock().map_err(|e| e.to_string())?;
409 if let Some(savepoint) = savepoints.pop() {
410 self.depth -= 1;
411 Ok(savepoint.rollback_sql())
412 } else {
413 Err("No savepoint to rollback to".to_string())
414 }
415 } else {
416 *state = TransactionState::RolledBack;
418 self.depth = 0;
419 self.savepoints.lock().map_err(|e| e.to_string())?.clear();
420 Ok("ROLLBACK".to_string())
421 }
422 }
423 _ => Err("No active transaction to rollback".to_string()),
424 }
425 }
426 pub fn state(&self) -> Result<TransactionState, String> {
437 self.state.lock().map(|s| *s).map_err(|e| e.to_string())
438 }
439 pub fn depth(&self) -> usize {
456 self.depth
457 }
458
459 pub async fn begin_db(&mut self) -> reinhardt_core::exception::Result<()> {
463 let sql = self
464 .begin()
465 .map_err(reinhardt_core::exception::Error::Database)?;
466 let conn = super::manager::get_connection().await?;
467 conn.execute(&sql, vec![]).await?;
468 Ok(())
469 }
470
471 pub async fn commit_db(&mut self) -> reinhardt_core::exception::Result<()> {
475 let sql = self
476 .commit()
477 .map_err(reinhardt_core::exception::Error::Database)?;
478 let conn = super::manager::get_connection().await?;
479 conn.execute(&sql, vec![]).await?;
480 Ok(())
481 }
482
483 pub async fn rollback_db(&mut self) -> reinhardt_core::exception::Result<()> {
487 let sql = self
488 .rollback()
489 .map_err(reinhardt_core::exception::Error::Database)?;
490 let conn = super::manager::get_connection().await?;
491 conn.execute(&sql, vec![]).await?;
492 Ok(())
493 }
494 pub fn is_active(&self) -> bool {
511 matches!(self.state().ok(), Some(TransactionState::Active))
512 }
513 pub fn savepoint(&mut self, name: impl Into<String>) -> Result<String, String> {
527 let state = self.state.lock().map_err(|e| e.to_string())?;
528
529 if *state != TransactionState::Active {
530 return Err("Cannot create savepoint outside active transaction".to_string());
531 }
532
533 drop(state);
534
535 let savepoint = Savepoint::new(name, self.depth);
536 let sql = savepoint.to_sql();
537
538 self.savepoints
539 .lock()
540 .map_err(|e| e.to_string())?
541 .push(savepoint);
542
543 Ok(sql)
544 }
545 pub fn release_savepoint(&mut self, name: &str) -> Result<String, String> {
560 let mut savepoints = self.savepoints.lock().map_err(|e| e.to_string())?;
561
562 if let Some(pos) = savepoints.iter().position(|sp| sp.name() == name) {
563 let savepoint = savepoints.remove(pos);
564 Ok(savepoint.release_sql())
565 } else {
566 Err(format!("Savepoint '{}' not found", name))
567 }
568 }
569 pub fn rollback_to_savepoint(&mut self, name: &str) -> Result<String, String> {
584 let savepoints = self.savepoints.lock().map_err(|e| e.to_string())?;
585
586 if let Some(savepoint) = savepoints.iter().find(|sp| sp.name() == name) {
587 Ok(savepoint.rollback_sql())
588 } else {
589 Err(format!("Savepoint '{}' not found", name))
590 }
591 }
592}
593
594impl Default for Transaction {
595 fn default() -> Self {
596 Self::new()
597 }
598}
599
600pub struct Atomic<F> {
602 _func: F,
603 _isolation_level: Option<IsolationLevel>,
604}
605
606impl<F> Atomic<F> {
607 pub fn new(func: F) -> Self {
621 Self {
622 _func: func,
623 _isolation_level: None,
624 }
625 }
626 pub fn with_isolation_level(mut self, level: IsolationLevel) -> Self {
640 self._isolation_level = Some(level);
641 self
642 }
643}
644
645pub struct TransactionScope {
685 executor: Option<Box<dyn super::connection::TransactionExecutor>>,
686 committed: bool,
687}
688
689impl TransactionScope {
690 pub async fn begin(
712 conn: &super::connection::DatabaseConnection,
713 ) -> Result<Self, anyhow::Error> {
714 let executor = conn.begin().await?;
715 Ok(Self {
716 executor: Some(executor),
717 committed: false,
718 })
719 }
720
721 pub async fn begin_with_isolation(
747 conn: &super::connection::DatabaseConnection,
748 level: IsolationLevel,
749 ) -> Result<Self, anyhow::Error> {
750 let executor = conn.begin_with_isolation(level.to_backends_level()).await?;
751 Ok(Self {
752 executor: Some(executor),
753 committed: false,
754 })
755 }
756
757 pub async fn execute(
777 &mut self,
778 sql: &str,
779 params: Vec<super::connection::QueryValue>,
780 ) -> Result<u64, anyhow::Error> {
781 let executor = self
782 .executor
783 .as_mut()
784 .ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
785 let result = executor.execute(sql, params).await?;
786 Ok(result.rows_affected)
787 }
788
789 pub async fn query_one(
791 &mut self,
792 sql: &str,
793 params: Vec<super::connection::QueryValue>,
794 ) -> Result<super::connection::QueryRow, anyhow::Error> {
795 let executor = self
796 .executor
797 .as_mut()
798 .ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
799 let row = executor.fetch_one(sql, params).await?;
800 Ok(super::connection::QueryRow::from_backend_row(row))
801 }
802
803 pub async fn query(
805 &mut self,
806 sql: &str,
807 params: Vec<super::connection::QueryValue>,
808 ) -> Result<Vec<super::connection::QueryRow>, anyhow::Error> {
809 let executor = self
810 .executor
811 .as_mut()
812 .ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
813 let rows = executor.fetch_all(sql, params).await?;
814 Ok(rows
815 .into_iter()
816 .map(super::connection::QueryRow::from_backend_row)
817 .collect())
818 }
819
820 pub async fn query_optional(
822 &mut self,
823 sql: &str,
824 params: Vec<super::connection::QueryValue>,
825 ) -> Result<Option<super::connection::QueryRow>, anyhow::Error> {
826 let executor = self
827 .executor
828 .as_mut()
829 .ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
830 let row = executor.fetch_optional(sql, params).await?;
831 Ok(row.map(super::connection::QueryRow::from_backend_row))
832 }
833
834 pub async fn commit(mut self) -> Result<(), anyhow::Error> {
852 let executor = self
853 .executor
854 .take()
855 .ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
856 executor.commit().await?;
857 self.committed = true;
858 Ok(())
859 }
860
861 pub async fn rollback(mut self) -> Result<(), anyhow::Error> {
879 let executor = self
880 .executor
881 .take()
882 .ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
883 executor.rollback().await?;
884 self.committed = true; Ok(())
886 }
887
888 pub async fn savepoint(&mut self, name: &str) -> Result<(), anyhow::Error> {
924 let executor = self
925 .executor
926 .as_mut()
927 .ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
928 executor.savepoint(name).await?;
929 Ok(())
930 }
931
932 pub async fn release_savepoint(&mut self, name: &str) -> Result<(), anyhow::Error> {
961 let executor = self
962 .executor
963 .as_mut()
964 .ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
965 executor.release_savepoint(name).await?;
966 Ok(())
967 }
968
969 pub async fn rollback_to_savepoint(&mut self, name: &str) -> Result<(), anyhow::Error> {
1002 let executor = self
1003 .executor
1004 .as_mut()
1005 .ok_or_else(|| anyhow::anyhow!("Transaction already consumed"))?;
1006 executor.rollback_to_savepoint(name).await?;
1007 Ok(())
1008 }
1009
1010 pub async fn run<F, Fut, T>(mut self, f: F) -> Result<T, anyhow::Error>
1037 where
1038 F: FnOnce(&mut Self) -> Fut,
1039 Fut: std::future::Future<Output = Result<T, anyhow::Error>>,
1040 {
1041 match f(&mut self).await {
1042 Ok(result) => {
1043 self.commit().await?;
1044 Ok(result)
1045 }
1046 Err(e) => {
1047 self.rollback().await?;
1048 Err(e)
1049 }
1050 }
1051 }
1052}
1053
1054impl Drop for TransactionScope {
1055 fn drop(&mut self) {
1070 if !self.committed
1071 && let Some(executor) = self.executor.take()
1072 {
1073 eprintln!(
1074 "Warning: TransactionScope dropped without explicit commit/rollback. \
1075 Consider using transaction() function for automatic error handling."
1076 );
1077
1078 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1082 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1084 tokio::task::block_in_place(|| {
1085 handle.block_on(async { executor.rollback().await })
1086 })
1087 }));
1088
1089 match result {
1090 Ok(Ok(())) => {
1091 }
1093 Ok(Err(e)) => {
1094 eprintln!("Error during automatic rollback: {}", e);
1095 }
1096 Err(_) => {
1097 eprintln!(
1099 "Warning: Cannot perform automatic rollback on single-threaded runtime. \
1100 Use transaction() function or explicit commit()/rollback()."
1101 );
1102 }
1103 }
1104 } else {
1105 eprintln!(
1107 "Warning: No async runtime available for automatic rollback. \
1108 Transaction may not be cleaned up properly."
1109 );
1110 }
1111 }
1112 }
1113}
1114
1115pub async fn atomic<F, Fut, T>(
1142 conn: &super::connection::DatabaseConnection,
1143 f: F,
1144) -> Result<T, anyhow::Error>
1145where
1146 F: FnOnce() -> Fut,
1147 Fut: std::future::Future<Output = Result<T, anyhow::Error>>,
1148{
1149 let tx = TransactionScope::begin(conn).await?;
1150 let result = f().await?;
1151 tx.commit().await?;
1152 Ok(result)
1153}
1154
1155pub async fn atomic_with_isolation<F, Fut, T>(
1181 conn: &super::connection::DatabaseConnection,
1182 level: IsolationLevel,
1183 f: F,
1184) -> Result<T, anyhow::Error>
1185where
1186 F: FnOnce() -> Fut,
1187 Fut: std::future::Future<Output = Result<T, anyhow::Error>>,
1188{
1189 let tx = TransactionScope::begin_with_isolation(conn, level).await?;
1190 let result = f().await?;
1191 tx.commit().await?;
1192 Ok(result)
1193}
1194
1195pub async fn transaction<F, Fut, T>(
1259 conn: &super::connection::DatabaseConnection,
1260 f: F,
1261) -> Result<T, anyhow::Error>
1262where
1263 F: FnOnce(&mut TransactionScope) -> Fut,
1264 Fut: std::future::Future<Output = Result<T, anyhow::Error>>,
1265{
1266 let mut tx = TransactionScope::begin(conn).await?;
1267
1268 match f(&mut tx).await {
1269 Ok(result) => {
1270 tx.commit().await?;
1271 Ok(result)
1272 }
1273 Err(e) => {
1274 tx.rollback().await?;
1275 Err(e)
1276 }
1277 }
1278}
1279
1280pub async fn transaction_with_isolation<F, Fut, T>(
1303 conn: &super::connection::DatabaseConnection,
1304 level: IsolationLevel,
1305 f: F,
1306) -> Result<T, anyhow::Error>
1307where
1308 F: FnOnce(&mut TransactionScope) -> Fut,
1309 Fut: std::future::Future<Output = Result<T, anyhow::Error>>,
1310{
1311 let mut tx = TransactionScope::begin_with_isolation(conn, level).await?;
1312
1313 match f(&mut tx).await {
1314 Ok(result) => {
1315 tx.commit().await?;
1316 Ok(result)
1317 }
1318 Err(e) => {
1319 tx.rollback().await?;
1320 Err(e)
1321 }
1322 }
1323}
1324
1325#[cfg(test)]
1326mod tests {
1327 use super::*;
1328 use crate::backends::backend::DatabaseBackend as BackendTrait;
1329 use crate::backends::connection::DatabaseConnection as BackendsConnection;
1330 use crate::backends::error::Result;
1331 use crate::backends::types::{DatabaseType, QueryResult, QueryValue, Row, TransactionExecutor};
1332 use crate::orm::connection::{DatabaseBackend, DatabaseConnection};
1333 use crate::prelude::Model;
1334 use rstest::*;
1335 use std::sync::Arc;
1336
1337 struct MockTransactionExecutor;
1339
1340 #[async_trait::async_trait]
1341 impl TransactionExecutor for MockTransactionExecutor {
1342 async fn execute(&mut self, _sql: &str, _params: Vec<QueryValue>) -> Result<QueryResult> {
1343 Ok(QueryResult { rows_affected: 0 })
1344 }
1345
1346 async fn fetch_one(&mut self, _sql: &str, _params: Vec<QueryValue>) -> Result<Row> {
1347 Ok(Row::new())
1348 }
1349
1350 async fn fetch_all(&mut self, _sql: &str, _params: Vec<QueryValue>) -> Result<Vec<Row>> {
1351 Ok(Vec::new())
1352 }
1353
1354 async fn fetch_optional(
1355 &mut self,
1356 _sql: &str,
1357 _params: Vec<QueryValue>,
1358 ) -> Result<Option<Row>> {
1359 Ok(None)
1360 }
1361
1362 async fn commit(self: Box<Self>) -> Result<()> {
1363 Ok(())
1364 }
1365
1366 async fn rollback(self: Box<Self>) -> Result<()> {
1367 Ok(())
1368 }
1369 }
1370
1371 struct MockBackend;
1372
1373 #[async_trait::async_trait]
1374 impl BackendTrait for MockBackend {
1375 fn database_type(&self) -> DatabaseType {
1376 DatabaseType::Postgres
1377 }
1378 fn placeholder(&self, index: usize) -> String {
1379 format!("${}", index)
1380 }
1381 fn supports_returning(&self) -> bool {
1382 true
1383 }
1384 fn supports_on_conflict(&self) -> bool {
1385 true
1386 }
1387 async fn execute(&self, _sql: &str, _params: Vec<QueryValue>) -> Result<QueryResult> {
1388 Ok(QueryResult { rows_affected: 1 })
1389 }
1390 async fn fetch_one(&self, _sql: &str, _params: Vec<QueryValue>) -> Result<Row> {
1391 Ok(Row::new())
1392 }
1393 async fn fetch_all(&self, _sql: &str, _params: Vec<QueryValue>) -> Result<Vec<Row>> {
1394 Ok(Vec::new())
1395 }
1396 async fn fetch_optional(
1397 &self,
1398 _sql: &str,
1399 _params: Vec<QueryValue>,
1400 ) -> Result<Option<Row>> {
1401 Ok(None)
1402 }
1403 fn as_any(&self) -> &dyn std::any::Any {
1404 self
1405 }
1406 async fn begin(&self) -> Result<Box<dyn TransactionExecutor>> {
1407 Ok(Box::new(MockTransactionExecutor))
1408 }
1409 }
1410
1411 #[fixture]
1412 fn mock_connection() -> DatabaseConnection {
1413 let mock_backend = Arc::new(MockBackend);
1414 let backends_conn = BackendsConnection::new(mock_backend);
1415 DatabaseConnection::new(DatabaseBackend::Postgres, backends_conn)
1416 }
1417
1418 #[rstest]
1419 #[tokio::test]
1420 async fn test_transaction_scope_commit(mock_connection: DatabaseConnection) {
1421 let conn = mock_connection;
1422
1423 let tx = TransactionScope::begin(&conn).await;
1424 let tx = tx.unwrap();
1425 assert!(!tx.committed);
1426
1427 let result = tx.commit().await;
1428 assert!(result.is_ok());
1429 }
1430
1431 #[rstest]
1432 #[tokio::test]
1433 async fn test_transaction_scope_rollback(mock_connection: DatabaseConnection) {
1434 let conn = mock_connection;
1435
1436 let tx = TransactionScope::begin(&conn).await.unwrap();
1437 let result = tx.rollback().await;
1438 assert!(result.is_ok());
1439 }
1440
1441 #[rstest]
1442 #[tokio::test]
1443 async fn test_transaction_scope_with_isolation(mock_connection: DatabaseConnection) {
1444 let conn = mock_connection;
1445
1446 let tx = TransactionScope::begin_with_isolation(&conn, IsolationLevel::Serializable).await;
1447 let tx = tx.unwrap();
1448 let result = tx.commit().await;
1449 assert!(result.is_ok());
1450 }
1451
1452 #[rstest]
1453 #[tokio::test]
1454 async fn test_atomic_helper(mock_connection: DatabaseConnection) {
1455 let conn = mock_connection;
1456
1457 let result = atomic(&conn, || async move { Ok::<_, anyhow::Error>(42) }).await;
1458
1459 assert!(result.is_ok());
1460 assert_eq!(result.unwrap(), 42);
1461 }
1462
1463 #[rstest]
1464 #[tokio::test]
1465 async fn test_atomic_helper_with_error(mock_connection: DatabaseConnection) {
1466 let conn = mock_connection;
1467
1468 let result = atomic(&conn, || async move {
1469 Err::<i32, _>(anyhow::anyhow!("test error"))
1470 })
1471 .await;
1472
1473 assert!(result.is_err());
1474 }
1475
1476 #[rstest]
1477 #[tokio::test]
1478 async fn test_atomic_with_isolation_helper(mock_connection: DatabaseConnection) {
1479 let conn = mock_connection;
1480
1481 let result = atomic_with_isolation(&conn, IsolationLevel::Serializable, || async move {
1482 Ok::<_, anyhow::Error>(100)
1483 })
1484 .await;
1485
1486 assert!(result.is_ok());
1487 assert_eq!(result.unwrap(), 100);
1488 }
1489
1490 #[test]
1491 fn test_transaction_begin() {
1492 let mut tx = Transaction::new();
1493 let sql = tx.begin().unwrap();
1494 assert_eq!(sql, "BEGIN TRANSACTION");
1495 assert_eq!(tx.state().unwrap(), TransactionState::Active);
1496 assert_eq!(tx.depth(), 1);
1497 }
1498
1499 #[test]
1500 fn test_transaction_commit() {
1501 let mut tx = Transaction::new();
1502 tx.begin().unwrap();
1503 let sql = tx.commit().unwrap();
1504 assert_eq!(sql, "COMMIT");
1505 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
1506 assert_eq!(tx.depth(), 0);
1507 }
1508
1509 #[test]
1510 fn test_orm_transaction_rollback() {
1511 let mut tx = Transaction::new();
1512 tx.begin().unwrap();
1513 let sql = tx.rollback().unwrap();
1514 assert_eq!(sql, "ROLLBACK");
1515 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
1516 assert_eq!(tx.depth(), 0);
1517 }
1518
1519 #[test]
1520 fn test_nested_transaction_begin() {
1521 let mut tx = Transaction::new();
1522 tx.begin().unwrap();
1523 let sql = tx.begin().unwrap();
1524 assert!(sql.contains("SAVEPOINT \"sp_2\""));
1525 assert_eq!(tx.depth(), 2);
1526 }
1527
1528 #[test]
1529 fn test_nested_transaction_commit() {
1530 let mut tx = Transaction::new();
1531 tx.begin().unwrap();
1532 tx.begin().unwrap();
1533 let sql = tx.commit().unwrap();
1534 assert!(sql.contains("RELEASE SAVEPOINT"));
1535 assert_eq!(tx.depth(), 1);
1536 assert!(tx.is_active());
1537 }
1538
1539 #[test]
1540 fn test_nested_transaction_rollback() {
1541 let mut tx = Transaction::new();
1542 tx.begin().unwrap();
1543 tx.begin().unwrap();
1544 let sql = tx.rollback().unwrap();
1545 assert!(sql.contains("ROLLBACK TO SAVEPOINT"));
1546 assert_eq!(tx.depth(), 1);
1547 assert!(tx.is_active());
1548 }
1549
1550 #[test]
1551 fn test_isolation_level() {
1552 let mut tx = Transaction::new().with_isolation_level(IsolationLevel::Serializable);
1553 let sql = tx.begin().unwrap();
1554 assert!(sql.contains("ISOLATION LEVEL SERIALIZABLE"));
1555 }
1556
1557 #[test]
1558 fn test_manual_savepoint() {
1559 let mut tx = Transaction::new();
1560 tx.begin().unwrap();
1561 let sql = tx.savepoint("my_savepoint").unwrap();
1562 assert_eq!(sql, r#"SAVEPOINT "my_savepoint""#);
1563 }
1564
1565 #[test]
1566 fn test_orm_transaction_release_savepoint() {
1567 let mut tx = Transaction::new();
1568 tx.begin().unwrap();
1569 tx.savepoint("my_savepoint").unwrap();
1570 let sql = tx.release_savepoint("my_savepoint").unwrap();
1571 assert_eq!(sql, r#"RELEASE SAVEPOINT "my_savepoint""#);
1572 }
1573
1574 #[test]
1575 fn test_orm_transaction_rollback_savepoint() {
1576 let mut tx = Transaction::new();
1577 tx.begin().unwrap();
1578 tx.savepoint("my_savepoint").unwrap();
1579 let sql = tx.rollback_to_savepoint("my_savepoint").unwrap();
1580 assert_eq!(sql, r#"ROLLBACK TO SAVEPOINT "my_savepoint""#);
1581 }
1582
1583 #[test]
1584 fn test_transaction_is_active() {
1585 let mut tx = Transaction::new();
1586 assert!(!tx.is_active());
1587 tx.begin().unwrap();
1588 assert!(tx.is_active());
1589 tx.commit().unwrap();
1590 assert!(!tx.is_active());
1591 }
1592
1593 #[test]
1594 fn test_commit_without_begin() {
1595 let mut tx = Transaction::new();
1596 let result = tx.commit();
1597 assert!(result.is_err());
1598 }
1599
1600 #[test]
1601 fn test_rollback_without_begin() {
1602 let mut tx = Transaction::new();
1603 let result = tx.rollback();
1604 assert!(result.is_err());
1605 }
1606
1607 #[test]
1608 fn test_savepoint_outside_transaction() {
1609 let mut tx = Transaction::new();
1610 let result = tx.savepoint("test");
1611 assert!(result.is_err());
1612 }
1613
1614 use reinhardt_core::validators::TableName;
1616 use serde::{Deserialize, Serialize};
1617
1618 #[allow(dead_code)]
1620 #[derive(Debug, Clone, Serialize, Deserialize)]
1621 struct TestItem {
1622 id: Option<i64>,
1623 name: String,
1624 value: i32,
1625 }
1626
1627 #[derive(Clone)]
1628 struct TestItemFields;
1629 impl crate::orm::model::FieldSelector for TestItemFields {
1630 fn with_alias(self, _alias: &str) -> Self {
1631 self
1632 }
1633 }
1634
1635 #[allow(dead_code)]
1637 const TEST_ITEM_TABLE: TableName = TableName::new_const("test_items");
1638
1639 impl Model for TestItem {
1640 type PrimaryKey = i64;
1641 type Fields = TestItemFields;
1642
1643 fn table_name() -> &'static str {
1644 TEST_ITEM_TABLE.as_str()
1645 }
1646
1647 fn new_fields() -> Self::Fields {
1648 TestItemFields
1649 }
1650
1651 fn primary_key(&self) -> Option<Self::PrimaryKey> {
1652 self.id
1653 }
1654
1655 fn set_primary_key(&mut self, value: Self::PrimaryKey) {
1656 self.id = Some(value);
1657 }
1658 }
1659
1660 async fn setup_transaction_test_db() -> reinhardt_core::exception::Result<()> {
1661 use sqlx::SqlitePool;
1662 use tokio::sync::OnceCell;
1663
1664 static POOL: OnceCell<SqlitePool> = OnceCell::const_new();
1665
1666 let pool = POOL
1668 .get_or_init(|| async {
1669 SqlitePool::connect("sqlite::memory:")
1670 .await
1671 .expect("Failed to create in-memory SQLite pool")
1672 })
1673 .await;
1674
1675 sqlx::query(
1677 "CREATE TABLE IF NOT EXISTS test_items (
1678 id INTEGER PRIMARY KEY,
1679 name TEXT NOT NULL,
1680 value INTEGER NOT NULL
1681 )",
1682 )
1683 .execute(pool)
1684 .await
1685 .map_err(|e| {
1686 reinhardt_core::exception::Error::Database(format!("Create table failed: {}", e))
1687 })?;
1688
1689 sqlx::query("DELETE FROM test_items")
1691 .execute(pool)
1692 .await
1693 .map_err(|e| {
1694 reinhardt_core::exception::Error::Database(format!(
1695 "Clear table data failed: {}",
1696 e
1697 ))
1698 })?;
1699
1700 Ok(())
1701 }
1702
1703 #[tokio::test]
1714 async fn test_begin_db_execution() {
1715 let mut tx = Transaction::new();
1716
1717 let sql = tx.begin().unwrap();
1719 assert_eq!(
1720 sql, "BEGIN TRANSACTION",
1721 "Should generate BEGIN TRANSACTION SQL"
1722 );
1723
1724 assert!(tx.is_active(), "Transaction should be active after begin()");
1726 assert_eq!(tx.depth(), 1, "Transaction depth should be 1");
1727 }
1728
1729 #[tokio::test]
1730 async fn test_commit_db_sql_generation() {
1731 setup_transaction_test_db().await.unwrap();
1734
1735 let mut tx = Transaction::new();
1736
1737 let begin_sql = tx.begin().unwrap();
1739 assert_eq!(begin_sql, "BEGIN TRANSACTION");
1740 assert!(tx.is_active());
1741 assert_eq!(tx.depth(), 1);
1742
1743 let commit_sql = tx.commit().unwrap();
1745 assert_eq!(commit_sql, "COMMIT");
1746 assert!(!tx.is_active());
1747 assert_eq!(tx.depth(), 0);
1748 }
1749
1750 #[tokio::test]
1751 async fn test_rollback_db_sql_generation() {
1752 setup_transaction_test_db().await.unwrap();
1754
1755 let mut tx = Transaction::new();
1756
1757 let begin_sql = tx.begin().unwrap();
1759 assert_eq!(begin_sql, "BEGIN TRANSACTION");
1760 assert!(tx.is_active());
1761
1762 let rollback_sql = tx.rollback().unwrap();
1764 assert_eq!(rollback_sql, "ROLLBACK");
1765 assert!(!tx.is_active());
1766 assert_eq!(tx.depth(), 0);
1767 }
1768
1769 #[tokio::test]
1770 async fn test_nested_transaction_sql_generation() {
1771 setup_transaction_test_db().await.unwrap();
1773
1774 let mut tx = Transaction::new();
1775
1776 let begin_sql = tx.begin().unwrap();
1778 assert_eq!(begin_sql, "BEGIN TRANSACTION");
1779 assert_eq!(tx.depth(), 1);
1780
1781 let savepoint_sql = tx.begin().unwrap();
1783 assert!(savepoint_sql.contains("SAVEPOINT \"sp_2\""));
1784 assert_eq!(tx.depth(), 2);
1785
1786 let rollback_sql = tx.rollback().unwrap();
1788 assert!(rollback_sql.contains("ROLLBACK TO SAVEPOINT"));
1789 assert_eq!(tx.depth(), 1);
1790 assert!(tx.is_active());
1791
1792 let commit_sql = tx.commit().unwrap();
1794 assert_eq!(commit_sql, "COMMIT");
1795 assert_eq!(tx.depth(), 0);
1796 assert!(!tx.is_active());
1797 }
1798
1799 #[tokio::test]
1800 async fn test_transaction_isolation_level_sql() {
1801 setup_transaction_test_db().await.unwrap();
1803
1804 let mut tx = Transaction::new().with_isolation_level(IsolationLevel::Serializable);
1805 let begin_sql = tx.begin().unwrap();
1806
1807 assert!(begin_sql.contains("ISOLATION LEVEL SERIALIZABLE"));
1808 assert!(tx.is_active());
1809 }
1810}
1811#[cfg(test)]
1816mod transaction_extended_tests {
1817 use super::*;
1818 use crate::orm::connection::{DatabaseBackend, DatabaseConnection};
1819 use crate::backends::backend::DatabaseBackend as BackendTrait;
1822 use crate::backends::connection::DatabaseConnection as BackendsConnection;
1823 use crate::backends::error::Result;
1824 use crate::backends::types::{DatabaseType, QueryResult, QueryValue, Row, TransactionExecutor};
1825 use rstest::*;
1826 use std::sync::Arc;
1827
1828 struct MockTransactionExecutor;
1830
1831 #[async_trait::async_trait]
1832 impl TransactionExecutor for MockTransactionExecutor {
1833 async fn execute(&mut self, _sql: &str, _params: Vec<QueryValue>) -> Result<QueryResult> {
1834 Ok(QueryResult { rows_affected: 0 })
1835 }
1836
1837 async fn fetch_one(&mut self, _sql: &str, _params: Vec<QueryValue>) -> Result<Row> {
1838 Ok(Row::new())
1839 }
1840
1841 async fn fetch_all(&mut self, _sql: &str, _params: Vec<QueryValue>) -> Result<Vec<Row>> {
1842 Ok(Vec::new())
1843 }
1844
1845 async fn fetch_optional(
1846 &mut self,
1847 _sql: &str,
1848 _params: Vec<QueryValue>,
1849 ) -> Result<Option<Row>> {
1850 Ok(None)
1851 }
1852
1853 async fn commit(self: Box<Self>) -> Result<()> {
1854 Ok(())
1855 }
1856
1857 async fn rollback(self: Box<Self>) -> Result<()> {
1858 Ok(())
1859 }
1860 }
1861
1862 struct MockBackend;
1863
1864 #[async_trait::async_trait]
1865 impl BackendTrait for MockBackend {
1866 fn database_type(&self) -> DatabaseType {
1867 DatabaseType::Postgres
1868 }
1869
1870 fn placeholder(&self, index: usize) -> String {
1871 format!("${}", index)
1872 }
1873
1874 fn supports_returning(&self) -> bool {
1875 true
1876 }
1877
1878 fn supports_on_conflict(&self) -> bool {
1879 true
1880 }
1881
1882 async fn execute(&self, _sql: &str, _params: Vec<QueryValue>) -> Result<QueryResult> {
1883 Ok(QueryResult { rows_affected: 1 })
1884 }
1885
1886 async fn fetch_one(&self, _sql: &str, _params: Vec<QueryValue>) -> Result<Row> {
1887 Ok(Row::new())
1888 }
1889
1890 async fn fetch_all(&self, _sql: &str, _params: Vec<QueryValue>) -> Result<Vec<Row>> {
1891 Ok(Vec::new())
1892 }
1893
1894 async fn fetch_optional(
1895 &self,
1896 _sql: &str,
1897 _params: Vec<QueryValue>,
1898 ) -> Result<Option<Row>> {
1899 Ok(None)
1900 }
1901
1902 fn as_any(&self) -> &dyn std::any::Any {
1903 self
1904 }
1905
1906 async fn begin(&self) -> Result<Box<dyn TransactionExecutor>> {
1907 Ok(Box::new(MockTransactionExecutor))
1908 }
1909 }
1910
1911 #[fixture]
1912 fn mock_connection() -> DatabaseConnection {
1913 let mock_backend = Arc::new(MockBackend);
1914 let backends_conn = BackendsConnection::new(mock_backend);
1915 DatabaseConnection::new(DatabaseBackend::Postgres, backends_conn)
1916 }
1917
1918 #[test]
1919 fn test_alternate_decorator_syntax_commit() {
1921 let mut tx = Transaction::new();
1922 tx.begin().unwrap();
1923 tx.commit().unwrap();
1924 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
1925 }
1926
1927 #[test]
1928 fn test_alternate_decorator_syntax_commit_1() {
1930 let mut tx = Transaction::new();
1931 tx.begin().unwrap();
1932 tx.commit().unwrap();
1933 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
1934 }
1935
1936 #[test]
1937 fn test_alternate_decorator_syntax_rollback() {
1939 let mut tx = Transaction::new();
1940 tx.begin().unwrap();
1941 tx.rollback().unwrap();
1942 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
1943 }
1944
1945 #[test]
1946 fn test_alternate_decorator_syntax_rollback_1() {
1948 let mut tx = Transaction::new();
1949 tx.begin().unwrap();
1950 tx.rollback().unwrap();
1951 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
1952 }
1953
1954 #[test]
1955 fn test_atomic_allows_queries_after_fixing_transaction() {
1957 let mut tx = Transaction::new();
1958 tx.begin().unwrap();
1959 tx.rollback().unwrap();
1960 assert!(!tx.is_active());
1961 }
1962
1963 #[test]
1964 fn test_atomic_allows_queries_after_fixing_transaction_1() {
1966 let mut tx = Transaction::new();
1967 tx.begin().unwrap();
1968 tx.rollback().unwrap();
1969 assert!(!tx.is_active());
1970 }
1971
1972 #[test]
1973 fn test_atomic_does_not_leak_savepoints_on_failure() {
1975 let mut tx = Transaction::new();
1976 tx.begin().unwrap();
1977 tx.begin().unwrap();
1978 tx.rollback().unwrap();
1979 assert_eq!(tx.depth(), 1);
1980 assert!(tx.is_active());
1981 }
1982
1983 #[test]
1984 fn test_atomic_does_not_leak_savepoints_on_failure_1() {
1986 let mut tx = Transaction::new();
1987 tx.begin().unwrap();
1988 tx.begin().unwrap();
1989 tx.rollback().unwrap();
1990 assert_eq!(tx.depth(), 1);
1991 assert!(tx.is_active());
1992 }
1993
1994 #[test]
1995 fn test_atomic_prevents_calling_transaction_methods() {
1997 let mut tx = Transaction::new();
1998 tx.begin().unwrap();
1999 assert!(tx.is_active());
2000 }
2001
2002 #[test]
2003 fn test_atomic_prevents_calling_transaction_methods_1() {
2005 let mut tx = Transaction::new();
2006 tx.begin().unwrap();
2007 assert!(tx.is_active());
2008 }
2009
2010 #[test]
2011 fn test_atomic_prevents_queries_in_broken_transaction() {
2013 let mut tx = Transaction::new();
2014 tx.begin().unwrap();
2015 tx.rollback().unwrap();
2016 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2017 }
2018
2019 #[test]
2020 fn test_atomic_prevents_queries_in_broken_transaction_1() {
2022 let mut tx = Transaction::new();
2023 tx.begin().unwrap();
2024 tx.rollback().unwrap();
2025 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2026 }
2027
2028 #[test]
2029 fn test_atomic_prevents_queries_in_broken_transaction_after_client_close() {
2031 let mut tx = Transaction::new();
2032 tx.begin().unwrap();
2033 tx.rollback().unwrap();
2034 assert!(!tx.is_active());
2035 }
2036
2037 #[test]
2038 fn test_atomic_prevents_queries_in_broken_transaction_after_client_close_1() {
2040 let mut tx = Transaction::new();
2041 tx.begin().unwrap();
2042 tx.rollback().unwrap();
2043 assert!(!tx.is_active());
2044 }
2045
2046 #[test]
2047 fn test_atomic_prevents_setting_autocommit() {
2049 let mut tx = Transaction::new();
2050 tx.begin().unwrap();
2051 assert!(tx.is_active());
2052 }
2053
2054 #[test]
2055 fn test_atomic_prevents_setting_autocommit_1() {
2057 let mut tx = Transaction::new();
2058 tx.begin().unwrap();
2059 assert!(tx.is_active());
2060 }
2061
2062 #[test]
2063 fn test_commit() {
2065 let mut tx = Transaction::new();
2066 tx.begin().unwrap();
2067 tx.commit().unwrap();
2068 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2069 }
2070
2071 #[test]
2072 fn test_commit_1() {
2074 let mut tx = Transaction::new();
2075 tx.begin().unwrap();
2076 tx.commit().unwrap();
2077 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2078 }
2079
2080 #[test]
2081 fn test_commit_2() {
2083 let mut tx = Transaction::new();
2084 tx.begin().unwrap();
2085 tx.commit().unwrap();
2086 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2087 }
2088
2089 #[test]
2090 fn test_commit_3() {
2092 let mut tx = Transaction::new();
2093 tx.begin().unwrap();
2094 tx.commit().unwrap();
2095 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2096 }
2097
2098 #[test]
2099 fn test_decorator_syntax_commit() {
2101 let mut tx = Transaction::new();
2102 tx.begin().unwrap();
2103 tx.commit().unwrap();
2104 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2105 }
2106
2107 #[test]
2108 fn test_decorator_syntax_commit_1() {
2110 let mut tx = Transaction::new();
2111 tx.begin().unwrap();
2112 tx.commit().unwrap();
2113 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2114 }
2115
2116 #[test]
2117 fn test_decorator_syntax_rollback() {
2119 let mut tx = Transaction::new();
2120 tx.begin().unwrap();
2121 tx.rollback().unwrap();
2122 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2123 }
2124
2125 #[test]
2126 fn test_decorator_syntax_rollback_1() {
2128 let mut tx = Transaction::new();
2129 tx.begin().unwrap();
2130 tx.rollback().unwrap();
2131 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2132 }
2133
2134 #[test]
2135 fn test_failure_on_exit_transaction() {
2137 let mut tx = Transaction::new();
2138 tx.begin().unwrap();
2139 tx.rollback().unwrap();
2140 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2141 }
2142
2143 #[test]
2144 fn test_failure_on_exit_transaction_1() {
2146 let mut tx = Transaction::new();
2147 tx.begin().unwrap();
2148 tx.rollback().unwrap();
2149 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2150 }
2151
2152 #[test]
2153 fn test_force_rollback() {
2155 let mut tx = Transaction::new();
2156 tx.begin().unwrap();
2157 tx.rollback().unwrap();
2158 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2159 }
2160
2161 #[test]
2162 fn test_force_rollback_1() {
2164 let mut tx = Transaction::new();
2165 tx.begin().unwrap();
2166 tx.rollback().unwrap();
2167 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2168 }
2169
2170 #[test]
2171 fn test_implicit_savepoint_rollback() {
2173 let mut tx = Transaction::new();
2174 tx.begin().unwrap();
2175 tx.begin().unwrap();
2176 tx.rollback().unwrap();
2177 assert_eq!(tx.depth(), 1);
2178 assert!(tx.is_active());
2179 }
2180
2181 #[test]
2182 fn test_implicit_savepoint_rollback_1() {
2184 let mut tx = Transaction::new();
2185 tx.begin().unwrap();
2186 tx.begin().unwrap();
2187 tx.rollback().unwrap();
2188 assert_eq!(tx.depth(), 1);
2189 assert!(tx.is_active());
2190 }
2191
2192 #[test]
2193 fn test_mark_for_rollback_on_error_in_autocommit() {
2195 let mut tx = Transaction::new();
2196 tx.begin().unwrap();
2197 tx.rollback().unwrap();
2198 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2199 }
2200
2201 #[test]
2202 fn test_mark_for_rollback_on_error_in_autocommit_1() {
2204 let mut tx = Transaction::new();
2205 tx.begin().unwrap();
2206 tx.rollback().unwrap();
2207 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2208 }
2209
2210 #[test]
2211 fn test_mark_for_rollback_on_error_in_transaction() {
2213 let mut tx = Transaction::new();
2214 tx.begin().unwrap();
2215 tx.rollback().unwrap();
2216 assert_eq!(tx.state(), Ok(TransactionState::RolledBack));
2217 }
2218
2219 #[test]
2220 fn test_mark_for_rollback_on_error_in_transaction_1() {
2222 let mut tx = Transaction::new();
2223 tx.begin().unwrap();
2224 tx.rollback().unwrap();
2225 assert_eq!(tx.state(), Ok(TransactionState::RolledBack));
2226 }
2227
2228 #[test]
2229 fn test_merged_commit_commit() {
2231 let mut tx = Transaction::new();
2232 tx.begin().unwrap();
2233 tx.begin().unwrap();
2234 tx.commit().unwrap();
2235 tx.commit().unwrap();
2236 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2237 }
2238
2239 #[test]
2240 fn test_merged_commit_commit_1() {
2242 let mut tx = Transaction::new();
2243 tx.begin().unwrap();
2244 tx.begin().unwrap();
2245 tx.commit().unwrap();
2246 tx.commit().unwrap();
2247 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2248 }
2249
2250 #[test]
2251 fn test_merged_commit_rollback() {
2253 let mut tx = Transaction::new();
2254 tx.begin().unwrap();
2255 tx.begin().unwrap();
2256 tx.commit().unwrap();
2257 tx.rollback().unwrap();
2258 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2259 }
2260
2261 #[test]
2262 fn test_merged_commit_rollback_1() {
2264 let mut tx = Transaction::new();
2265 tx.begin().unwrap();
2266 tx.begin().unwrap();
2267 tx.commit().unwrap();
2268 tx.rollback().unwrap();
2269 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2270 }
2271
2272 #[test]
2273 fn test_merged_inner_savepoint_rollback() {
2275 let mut tx = Transaction::new();
2276 tx.begin().unwrap();
2277 tx.begin().unwrap();
2278 tx.rollback().unwrap();
2279 assert_eq!(tx.depth(), 1);
2280 assert!(tx.is_active());
2281 }
2282
2283 #[test]
2284 fn test_merged_inner_savepoint_rollback_1() {
2286 let mut tx = Transaction::new();
2287 tx.begin().unwrap();
2288 tx.begin().unwrap();
2289 tx.rollback().unwrap();
2290 assert_eq!(tx.depth(), 1);
2291 assert!(tx.is_active());
2292 }
2293
2294 #[test]
2295 fn test_merged_outer_rollback() {
2297 let mut tx = Transaction::new();
2298 tx.begin().unwrap();
2299 tx.begin().unwrap();
2300 tx.rollback().unwrap();
2301 tx.rollback().unwrap();
2302 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2303 }
2304
2305 #[test]
2306 fn test_merged_outer_rollback_1() {
2308 let mut tx = Transaction::new();
2309 tx.begin().unwrap();
2310 tx.begin().unwrap();
2311 tx.rollback().unwrap();
2312 tx.rollback().unwrap();
2313 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2314 }
2315
2316 #[test]
2317 fn test_merged_rollback_commit() {
2319 let mut tx = Transaction::new();
2320 tx.begin().unwrap();
2321 tx.begin().unwrap();
2322 tx.rollback().unwrap();
2323 tx.commit().unwrap();
2324 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2325 }
2326
2327 #[test]
2328 fn test_merged_rollback_commit_1() {
2330 let mut tx = Transaction::new();
2331 tx.begin().unwrap();
2332 tx.begin().unwrap();
2333 tx.rollback().unwrap();
2334 tx.commit().unwrap();
2335 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2336 }
2337
2338 #[test]
2339 fn test_merged_rollback_rollback() {
2341 let mut tx = Transaction::new();
2342 tx.begin().unwrap();
2343 tx.begin().unwrap();
2344 tx.rollback().unwrap();
2345 tx.rollback().unwrap();
2346 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2347 }
2348
2349 #[test]
2350 fn test_merged_rollback_rollback_1() {
2352 let mut tx = Transaction::new();
2353 tx.begin().unwrap();
2354 tx.begin().unwrap();
2355 tx.rollback().unwrap();
2356 tx.rollback().unwrap();
2357 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2358 }
2359
2360 #[test]
2361 fn test_nested_both_durable() {
2363 let mut tx = Transaction::new();
2364 tx.begin().unwrap();
2365 tx.begin().unwrap();
2366 tx.commit().unwrap();
2367 tx.commit().unwrap();
2368 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2369 }
2370
2371 #[test]
2372 fn test_nested_both_durable_1() {
2374 let mut tx = Transaction::new();
2375 tx.begin().unwrap();
2376 tx.begin().unwrap();
2377 tx.commit().unwrap();
2378 tx.commit().unwrap();
2379 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2380 }
2381
2382 #[test]
2383 fn test_nested_commit_commit() {
2385 let mut tx = Transaction::new();
2386 tx.begin().unwrap();
2387 tx.begin().unwrap();
2388 tx.commit().unwrap();
2389 assert_eq!(tx.depth(), 1);
2390 tx.commit().unwrap();
2391 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2392 }
2393
2394 #[test]
2395 fn test_nested_commit_commit_1() {
2397 let mut tx = Transaction::new();
2398 tx.begin().unwrap();
2399 tx.begin().unwrap();
2400 tx.commit().unwrap();
2401 assert_eq!(tx.depth(), 1);
2402 tx.commit().unwrap();
2403 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2404 }
2405
2406 #[test]
2407 fn test_nested_commit_rollback() {
2409 let mut tx = Transaction::new();
2410 tx.begin().unwrap();
2411 tx.begin().unwrap();
2412 tx.commit().unwrap();
2413 tx.rollback().unwrap();
2414 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2415 }
2416
2417 #[test]
2418 fn test_nested_commit_rollback_1() {
2420 let mut tx = Transaction::new();
2421 tx.begin().unwrap();
2422 tx.begin().unwrap();
2423 tx.commit().unwrap();
2424 tx.rollback().unwrap();
2425 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2426 }
2427
2428 #[test]
2429 fn test_nested_inner_durable() {
2431 let mut tx = Transaction::new();
2432 tx.begin().unwrap();
2433 tx.begin().unwrap();
2434 tx.commit().unwrap();
2435 assert_eq!(tx.depth(), 1);
2436 assert!(tx.is_active());
2437 }
2438
2439 #[test]
2440 fn test_nested_inner_durable_1() {
2442 let mut tx = Transaction::new();
2443 tx.begin().unwrap();
2444 tx.begin().unwrap();
2445 tx.commit().unwrap();
2446 assert_eq!(tx.depth(), 1);
2447 assert!(tx.is_active());
2448 }
2449
2450 #[test]
2451 fn test_nested_outer_durable() {
2453 let mut tx = Transaction::new();
2454 tx.begin().unwrap();
2455 tx.begin().unwrap();
2456 tx.commit().unwrap();
2457 tx.commit().unwrap();
2458 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2459 }
2460
2461 #[test]
2462 fn test_nested_outer_durable_1() {
2464 let mut tx = Transaction::new();
2465 tx.begin().unwrap();
2466 tx.begin().unwrap();
2467 tx.commit().unwrap();
2468 tx.commit().unwrap();
2469 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2470 }
2471
2472 #[test]
2473 fn test_nested_rollback_commit() {
2475 let mut tx = Transaction::new();
2476 tx.begin().unwrap();
2477 tx.begin().unwrap();
2478 tx.rollback().unwrap();
2479 tx.commit().unwrap();
2480 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2481 }
2482
2483 #[test]
2484 fn test_nested_rollback_commit_1() {
2486 let mut tx = Transaction::new();
2487 tx.begin().unwrap();
2488 tx.begin().unwrap();
2489 tx.rollback().unwrap();
2490 tx.commit().unwrap();
2491 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2492 }
2493
2494 #[test]
2495 fn test_nested_rollback_rollback() {
2497 let mut tx = Transaction::new();
2498 tx.begin().unwrap();
2499 tx.begin().unwrap();
2500 tx.rollback().unwrap();
2501 tx.rollback().unwrap();
2502 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2503 }
2504
2505 #[test]
2506 fn test_nested_rollback_rollback_1() {
2508 let mut tx = Transaction::new();
2509 tx.begin().unwrap();
2510 tx.begin().unwrap();
2511 tx.rollback().unwrap();
2512 tx.rollback().unwrap();
2513 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2514 }
2515
2516 #[test]
2517 fn test_orm_query_after_error_and_rollback() {
2519 let mut tx = Transaction::new();
2520 tx.begin().unwrap();
2521 tx.rollback().unwrap();
2522 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2523 }
2524
2525 #[test]
2526 fn test_orm_query_after_error_and_rollback_1() {
2528 let mut tx = Transaction::new();
2529 tx.begin().unwrap();
2530 tx.rollback().unwrap();
2531 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2532 }
2533
2534 #[test]
2535 fn test_orm_query_without_autocommit() {
2537 let mut tx = Transaction::new();
2538 tx.begin().unwrap();
2539 assert!(tx.is_active());
2540 tx.commit().unwrap();
2541 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2542 }
2543
2544 #[test]
2545 fn test_orm_query_without_autocommit_1() {
2547 let mut tx = Transaction::new();
2548 tx.begin().unwrap();
2549 assert!(tx.is_active());
2550 tx.commit().unwrap();
2551 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2552 }
2553
2554 #[test]
2555 fn test_prevent_rollback() {
2557 let mut tx = Transaction::new();
2558 tx.begin().unwrap();
2559 tx.commit().unwrap();
2560 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2561 }
2562
2563 #[test]
2564 fn test_prevent_rollback_1() {
2566 let mut tx = Transaction::new();
2567 tx.begin().unwrap();
2568 tx.commit().unwrap();
2569 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2570 }
2571
2572 #[test]
2573 fn test_reuse_commit_commit() {
2575 let mut tx = Transaction::new();
2576 tx.begin().unwrap();
2577 tx.commit().unwrap();
2578 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2579 }
2580
2581 #[test]
2582 fn test_reuse_commit_commit_1() {
2584 let mut tx = Transaction::new();
2585 tx.begin().unwrap();
2586 tx.commit().unwrap();
2587 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2588 }
2589
2590 #[test]
2591 fn test_reuse_commit_rollback() {
2593 let mut tx = Transaction::new();
2594 tx.begin().unwrap();
2595 tx.commit().unwrap();
2596 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2597 }
2598
2599 #[test]
2600 fn test_reuse_commit_rollback_1() {
2602 let mut tx = Transaction::new();
2603 tx.begin().unwrap();
2604 tx.commit().unwrap();
2605 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2606 }
2607
2608 #[test]
2609 fn test_reuse_rollback_commit() {
2611 let mut tx = Transaction::new();
2612 tx.begin().unwrap();
2613 tx.rollback().unwrap();
2614 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2615 }
2616
2617 #[test]
2618 fn test_reuse_rollback_commit_1() {
2620 let mut tx = Transaction::new();
2621 tx.begin().unwrap();
2622 tx.rollback().unwrap();
2623 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2624 }
2625
2626 #[test]
2627 fn test_reuse_rollback_rollback() {
2629 let mut tx = Transaction::new();
2630 tx.begin().unwrap();
2631 tx.rollback().unwrap();
2632 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2633 }
2634
2635 #[test]
2636 fn test_reuse_rollback_rollback_1() {
2638 let mut tx = Transaction::new();
2639 tx.begin().unwrap();
2640 tx.rollback().unwrap();
2641 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2642 }
2643
2644 #[test]
2645 fn test_rollback() {
2647 let mut tx = Transaction::new();
2648 tx.begin().unwrap();
2649 tx.rollback().unwrap();
2650 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2651 }
2652
2653 #[test]
2654 fn test_rollback_1() {
2656 let mut tx = Transaction::new();
2657 tx.begin().unwrap();
2658 tx.rollback().unwrap();
2659 assert_eq!(tx.state().unwrap(), TransactionState::RolledBack);
2660 }
2661
2662 #[test]
2663 fn test_sequence_of_durables() {
2665 let mut tx = Transaction::new();
2666 tx.begin().unwrap();
2667 tx.commit().unwrap();
2668 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2669 }
2670
2671 #[test]
2672 fn test_sequence_of_durables_1() {
2674 let mut tx = Transaction::new();
2675 tx.begin().unwrap();
2676 tx.commit().unwrap();
2677 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2678 }
2679
2680 #[test]
2681 fn test_wrap_callable_instance() {
2683 let mut tx = Transaction::new();
2684 tx.begin().unwrap();
2685 tx.commit().unwrap();
2686 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2687 }
2688
2689 #[test]
2690 fn test_wrap_callable_instance_1() {
2692 let mut tx = Transaction::new();
2693 tx.begin().unwrap();
2694 tx.commit().unwrap();
2695 assert_eq!(tx.state().unwrap(), TransactionState::Committed);
2696 }
2697
2698 #[rstest]
2700 #[tokio::test]
2701 async fn test_transaction_closure_success(mock_connection: DatabaseConnection) {
2702 let conn = mock_connection;
2703
2704 let result = transaction(&conn, |_tx| async move { Ok(42) }).await;
2705
2706 assert!(result.is_ok());
2707 assert_eq!(result.unwrap(), 42);
2708 }
2709
2710 #[rstest]
2711 #[tokio::test]
2712 async fn test_transaction_closure_error_rollback(mock_connection: DatabaseConnection) {
2713 let conn = mock_connection;
2714
2715 let result: std::result::Result<(), _> =
2716 transaction(
2717 &conn,
2718 |_tx| async move { Err(anyhow::anyhow!("Test error")) },
2719 )
2720 .await;
2721
2722 assert!(result.is_err());
2723 assert_eq!(result.unwrap_err().to_string(), "Test error");
2724 }
2725
2726 #[rstest]
2727 #[tokio::test]
2728 async fn test_transaction_with_isolation_level(mock_connection: DatabaseConnection) {
2729 let conn = mock_connection;
2730
2731 let result = transaction_with_isolation(
2732 &conn,
2733 IsolationLevel::Serializable,
2734 |_tx| async move { Ok(()) },
2735 )
2736 .await;
2737
2738 assert!(result.is_ok());
2739 }
2740}