sql_middleware/sqlite/connection/
tx.rs1use crate::middleware::SqlMiddlewareDbError;
2
3use super::{SqliteConnection, run_blocking};
4use crate::sqlite::config::SharedSqliteConnection;
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8use tokio::time::sleep;
9
10const ROLLBACK_BUSY_RETRIES: &[Duration] = &[
11 Duration::from_millis(10),
12 Duration::from_millis(25),
13 Duration::from_millis(50),
14];
15
16pub(crate) async fn rollback_with_busy_retries(
17 handle: &SharedSqliteConnection,
18) -> Result<(), SqlMiddlewareDbError> {
19 if handle.force_rollback_busy_for_tests() {
20 return Err(SqlMiddlewareDbError::SqliteError(
21 rusqlite::Error::SqliteFailure(
22 rusqlite::ffi::Error {
23 code: rusqlite::ErrorCode::DatabaseBusy,
24 extended_code: rusqlite::ErrorCode::DatabaseBusy as i32,
25 },
26 None,
27 ),
28 ));
29 }
30
31 for (idx, delay) in ROLLBACK_BUSY_RETRIES.iter().copied().enumerate() {
32 let result = run_blocking(Arc::clone(handle), |guard| {
33 guard
34 .execute_batch("ROLLBACK")
35 .map_err(SqlMiddlewareDbError::SqliteError)
36 })
37 .await;
38
39 if result.is_ok() {
40 return result;
41 }
42 if let Err(SqlMiddlewareDbError::SqliteError(rusqlite::Error::SqliteFailure(err, _))) =
43 &result
44 && err.code == rusqlite::ErrorCode::DatabaseBusy
45 && idx + 1 < ROLLBACK_BUSY_RETRIES.len()
46 {
47 sleep(delay).await;
48 continue;
49 }
50 return result;
51 }
52
53 Err(SqlMiddlewareDbError::ExecutionError(
54 "rollback retries exhausted".into(),
55 ))
56}
57
58pub(crate) fn rollback_with_busy_retries_blocking(
59 handle: &SharedSqliteConnection,
60) -> Result<(), SqlMiddlewareDbError> {
61 if handle.force_rollback_busy_for_tests() {
62 return Err(SqlMiddlewareDbError::SqliteError(
63 rusqlite::Error::SqliteFailure(
64 rusqlite::ffi::Error {
65 code: rusqlite::ErrorCode::DatabaseBusy,
66 extended_code: rusqlite::ErrorCode::DatabaseBusy as i32,
67 },
68 None,
69 ),
70 ));
71 }
72
73 for (idx, delay) in ROLLBACK_BUSY_RETRIES.iter().copied().enumerate() {
74 let result = handle.execute_blocking(|guard| {
75 guard
76 .execute_batch("ROLLBACK")
77 .map_err(SqlMiddlewareDbError::SqliteError)
78 });
79
80 if result.is_ok() {
81 return result;
82 }
83 if let Err(SqlMiddlewareDbError::SqliteError(rusqlite::Error::SqliteFailure(err, _))) =
84 &result
85 && err.code == rusqlite::ErrorCode::DatabaseBusy
86 && idx + 1 < ROLLBACK_BUSY_RETRIES.len()
87 {
88 thread::sleep(delay);
89 continue;
90 }
91 return result;
92 }
93
94 Err(SqlMiddlewareDbError::ExecutionError(
95 "rollback retries exhausted".into(),
96 ))
97}
98
99impl SqliteConnection {
100 pub async fn begin(&mut self) -> Result<(), SqlMiddlewareDbError> {
105 if self.in_transaction {
106 return Err(SqlMiddlewareDbError::ExecutionError(
107 "SQLite transaction already in progress".into(),
108 ));
109 }
110 run_blocking(self.conn_handle(), move |guard| {
111 guard
112 .execute_batch("BEGIN")
113 .map_err(SqlMiddlewareDbError::SqliteError)
114 })
115 .await?;
116 self.in_transaction = true;
117 Ok(())
118 }
119
120 pub async fn commit(&mut self) -> Result<(), SqlMiddlewareDbError> {
125 if !self.in_transaction {
126 return Err(SqlMiddlewareDbError::ExecutionError(
127 "SQLite transaction not active".into(),
128 ));
129 }
130 run_blocking(self.conn_handle(), move |guard| {
131 guard
132 .execute_batch("COMMIT")
133 .map_err(SqlMiddlewareDbError::SqliteError)
134 })
135 .await?;
136 self.in_transaction = false;
137 Ok(())
138 }
139
140 pub async fn rollback(&mut self) -> Result<(), SqlMiddlewareDbError> {
145 if !self.in_transaction {
146 return Err(SqlMiddlewareDbError::ExecutionError(
147 "SQLite transaction not active".into(),
148 ));
149 }
150 let result = rollback_with_busy_retries(&self.conn_handle()).await;
151 if result.is_err() {
152 self.mark_broken();
153 return result;
154 }
155 self.in_transaction = false;
156 result
157 }
158}