sql_middleware/sqlite/connection/
tx.rs

1use crate::middleware::SqlMiddlewareDbError;
2
3use super::{SqliteConnection, run_blocking};
4use crate::sqlite::config::SharedSqliteConnection;
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8use tokio::time::sleep;
9
10const ROLLBACK_BUSY_RETRIES: &[Duration] =
11    &[Duration::from_millis(10), Duration::from_millis(25), Duration::from_millis(50)];
12
13pub(crate) async fn rollback_with_busy_retries(
14    handle: &SharedSqliteConnection,
15) -> Result<(), SqlMiddlewareDbError> {
16    if handle.force_rollback_busy_for_tests() {
17        return Err(SqlMiddlewareDbError::SqliteError(
18            rusqlite::Error::SqliteFailure(
19                rusqlite::ffi::Error {
20                    code: rusqlite::ErrorCode::DatabaseBusy,
21                    extended_code: rusqlite::ErrorCode::DatabaseBusy as i32,
22                },
23                None,
24            ),
25        ));
26    }
27
28    for (idx, delay) in ROLLBACK_BUSY_RETRIES.iter().copied().enumerate() {
29        let result = run_blocking(Arc::clone(handle), |guard| {
30            guard
31                .execute_batch("ROLLBACK")
32                .map_err(SqlMiddlewareDbError::SqliteError)
33        })
34        .await;
35
36        if result.is_ok() {
37            return result;
38        }
39        if let Err(SqlMiddlewareDbError::SqliteError(rusqlite::Error::SqliteFailure(err, _))) =
40            &result
41            && err.code == rusqlite::ErrorCode::DatabaseBusy
42            && idx + 1 < ROLLBACK_BUSY_RETRIES.len()
43        {
44            sleep(delay).await;
45            continue;
46        }
47        return result;
48    }
49
50    Err(SqlMiddlewareDbError::ExecutionError(
51        "rollback retries exhausted".into(),
52    ))
53}
54
55pub(crate) fn rollback_with_busy_retries_blocking(
56    handle: &SharedSqliteConnection,
57) -> Result<(), SqlMiddlewareDbError> {
58    if handle.force_rollback_busy_for_tests() {
59        return Err(SqlMiddlewareDbError::SqliteError(
60            rusqlite::Error::SqliteFailure(
61                rusqlite::ffi::Error {
62                    code: rusqlite::ErrorCode::DatabaseBusy,
63                    extended_code: rusqlite::ErrorCode::DatabaseBusy as i32,
64                },
65                None,
66            ),
67        ));
68    }
69
70    for (idx, delay) in ROLLBACK_BUSY_RETRIES.iter().copied().enumerate() {
71        let result = handle.execute_blocking(|guard| {
72            guard
73                .execute_batch("ROLLBACK")
74                .map_err(SqlMiddlewareDbError::SqliteError)
75        });
76
77        if result.is_ok() {
78            return result;
79        }
80        if let Err(SqlMiddlewareDbError::SqliteError(rusqlite::Error::SqliteFailure(err, _))) =
81            &result
82            && err.code == rusqlite::ErrorCode::DatabaseBusy
83            && idx + 1 < ROLLBACK_BUSY_RETRIES.len()
84        {
85            thread::sleep(delay);
86            continue;
87        }
88        return result;
89    }
90
91    Err(SqlMiddlewareDbError::ExecutionError(
92        "rollback retries exhausted".into(),
93    ))
94}
95
96impl SqliteConnection {
97    /// Begin a transaction, transitioning this connection into transactional mode.
98    ///
99    /// # Errors
100    /// Returns `SqlMiddlewareDbError` if the transaction cannot be started or is already active.
101    pub async fn begin(&mut self) -> Result<(), SqlMiddlewareDbError> {
102        if self.in_transaction {
103            return Err(SqlMiddlewareDbError::ExecutionError(
104                "SQLite transaction already in progress".into(),
105            ));
106        }
107        run_blocking(self.conn_handle(), move |guard| {
108            guard
109                .execute_batch("BEGIN")
110                .map_err(SqlMiddlewareDbError::SqliteError)
111        })
112        .await?;
113        self.in_transaction = true;
114        Ok(())
115    }
116
117    /// Commit an open transaction.
118    ///
119    /// # Errors
120    /// Returns `SqlMiddlewareDbError` if committing fails or no transaction is active.
121    pub async fn commit(&mut self) -> Result<(), SqlMiddlewareDbError> {
122        if !self.in_transaction {
123            return Err(SqlMiddlewareDbError::ExecutionError(
124                "SQLite transaction not active".into(),
125            ));
126        }
127        run_blocking(self.conn_handle(), move |guard| {
128            guard
129                .execute_batch("COMMIT")
130                .map_err(SqlMiddlewareDbError::SqliteError)
131        })
132        .await?;
133        self.in_transaction = false;
134        Ok(())
135    }
136
137    /// Roll back an open transaction.
138    ///
139    /// # Errors
140    /// Returns `SqlMiddlewareDbError` if rolling back fails or no transaction is active.
141    pub async fn rollback(&mut self) -> Result<(), SqlMiddlewareDbError> {
142        if !self.in_transaction {
143            return Err(SqlMiddlewareDbError::ExecutionError(
144                "SQLite transaction not active".into(),
145            ));
146        }
147        let result = rollback_with_busy_retries(&self.conn_handle()).await;
148        if result.is_err() {
149            self.mark_broken();
150            return result;
151        }
152        self.in_transaction = false;
153        result
154    }
155}