sql_middleware/sqlite/connection/
tx.rs1use crate::middleware::SqlMiddlewareDbError;
2
3use super::{SqliteConnection, run_blocking};
4use crate::sqlite::config::SharedSqliteConnection;
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8use tokio::time::sleep;
9
10const ROLLBACK_BUSY_RETRIES: &[Duration] =
11 &[Duration::from_millis(10), Duration::from_millis(25), Duration::from_millis(50)];
12
13pub(crate) async fn rollback_with_busy_retries(
14 handle: &SharedSqliteConnection,
15) -> Result<(), SqlMiddlewareDbError> {
16 if handle.force_rollback_busy_for_tests() {
17 return Err(SqlMiddlewareDbError::SqliteError(
18 rusqlite::Error::SqliteFailure(
19 rusqlite::ffi::Error {
20 code: rusqlite::ErrorCode::DatabaseBusy,
21 extended_code: rusqlite::ErrorCode::DatabaseBusy as i32,
22 },
23 None,
24 ),
25 ));
26 }
27
28 for (idx, delay) in ROLLBACK_BUSY_RETRIES.iter().copied().enumerate() {
29 let result = run_blocking(Arc::clone(handle), |guard| {
30 guard
31 .execute_batch("ROLLBACK")
32 .map_err(SqlMiddlewareDbError::SqliteError)
33 })
34 .await;
35
36 if result.is_ok() {
37 return result;
38 }
39 if let Err(SqlMiddlewareDbError::SqliteError(rusqlite::Error::SqliteFailure(err, _))) =
40 &result
41 && err.code == rusqlite::ErrorCode::DatabaseBusy
42 && idx + 1 < ROLLBACK_BUSY_RETRIES.len()
43 {
44 sleep(delay).await;
45 continue;
46 }
47 return result;
48 }
49
50 Err(SqlMiddlewareDbError::ExecutionError(
51 "rollback retries exhausted".into(),
52 ))
53}
54
55pub(crate) fn rollback_with_busy_retries_blocking(
56 handle: &SharedSqliteConnection,
57) -> Result<(), SqlMiddlewareDbError> {
58 if handle.force_rollback_busy_for_tests() {
59 return Err(SqlMiddlewareDbError::SqliteError(
60 rusqlite::Error::SqliteFailure(
61 rusqlite::ffi::Error {
62 code: rusqlite::ErrorCode::DatabaseBusy,
63 extended_code: rusqlite::ErrorCode::DatabaseBusy as i32,
64 },
65 None,
66 ),
67 ));
68 }
69
70 for (idx, delay) in ROLLBACK_BUSY_RETRIES.iter().copied().enumerate() {
71 let result = handle.execute_blocking(|guard| {
72 guard
73 .execute_batch("ROLLBACK")
74 .map_err(SqlMiddlewareDbError::SqliteError)
75 });
76
77 if result.is_ok() {
78 return result;
79 }
80 if let Err(SqlMiddlewareDbError::SqliteError(rusqlite::Error::SqliteFailure(err, _))) =
81 &result
82 && err.code == rusqlite::ErrorCode::DatabaseBusy
83 && idx + 1 < ROLLBACK_BUSY_RETRIES.len()
84 {
85 thread::sleep(delay);
86 continue;
87 }
88 return result;
89 }
90
91 Err(SqlMiddlewareDbError::ExecutionError(
92 "rollback retries exhausted".into(),
93 ))
94}
95
96impl SqliteConnection {
97 pub async fn begin(&mut self) -> Result<(), SqlMiddlewareDbError> {
102 if self.in_transaction {
103 return Err(SqlMiddlewareDbError::ExecutionError(
104 "SQLite transaction already in progress".into(),
105 ));
106 }
107 run_blocking(self.conn_handle(), move |guard| {
108 guard
109 .execute_batch("BEGIN")
110 .map_err(SqlMiddlewareDbError::SqliteError)
111 })
112 .await?;
113 self.in_transaction = true;
114 Ok(())
115 }
116
117 pub async fn commit(&mut self) -> Result<(), SqlMiddlewareDbError> {
122 if !self.in_transaction {
123 return Err(SqlMiddlewareDbError::ExecutionError(
124 "SQLite transaction not active".into(),
125 ));
126 }
127 run_blocking(self.conn_handle(), move |guard| {
128 guard
129 .execute_batch("COMMIT")
130 .map_err(SqlMiddlewareDbError::SqliteError)
131 })
132 .await?;
133 self.in_transaction = false;
134 Ok(())
135 }
136
137 pub async fn rollback(&mut self) -> Result<(), SqlMiddlewareDbError> {
142 if !self.in_transaction {
143 return Err(SqlMiddlewareDbError::ExecutionError(
144 "SQLite transaction not active".into(),
145 ));
146 }
147 let result = rollback_with_busy_retries(&self.conn_handle()).await;
148 if result.is_err() {
149 self.mark_broken();
150 return result;
151 }
152 self.in_transaction = false;
153 result
154 }
155}