Skip to main content

sql_middleware/sqlite/connection/
tx.rs

1use crate::middleware::SqlMiddlewareDbError;
2
3use super::{SqliteConnection, run_blocking};
4use crate::sqlite::config::SharedSqliteConnection;
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8use tokio::time::sleep;
9
10const ROLLBACK_BUSY_RETRIES: &[Duration] = &[
11    Duration::from_millis(10),
12    Duration::from_millis(25),
13    Duration::from_millis(50),
14];
15
16pub(crate) async fn rollback_with_busy_retries(
17    handle: &SharedSqliteConnection,
18) -> Result<(), SqlMiddlewareDbError> {
19    if handle.force_rollback_busy_for_tests() {
20        return Err(SqlMiddlewareDbError::SqliteError(
21            rusqlite::Error::SqliteFailure(
22                rusqlite::ffi::Error {
23                    code: rusqlite::ErrorCode::DatabaseBusy,
24                    extended_code: rusqlite::ErrorCode::DatabaseBusy as i32,
25                },
26                None,
27            ),
28        ));
29    }
30
31    for (idx, delay) in ROLLBACK_BUSY_RETRIES.iter().copied().enumerate() {
32        let result = run_blocking(Arc::clone(handle), |guard| {
33            guard
34                .execute_batch("ROLLBACK")
35                .map_err(SqlMiddlewareDbError::SqliteError)
36        })
37        .await;
38
39        if result.is_ok() {
40            return result;
41        }
42        if let Err(SqlMiddlewareDbError::SqliteError(rusqlite::Error::SqliteFailure(err, _))) =
43            &result
44            && err.code == rusqlite::ErrorCode::DatabaseBusy
45            && idx + 1 < ROLLBACK_BUSY_RETRIES.len()
46        {
47            sleep(delay).await;
48            continue;
49        }
50        return result;
51    }
52
53    Err(SqlMiddlewareDbError::ExecutionError(
54        "rollback retries exhausted".into(),
55    ))
56}
57
58pub(crate) fn rollback_with_busy_retries_blocking(
59    handle: &SharedSqliteConnection,
60) -> Result<(), SqlMiddlewareDbError> {
61    if handle.force_rollback_busy_for_tests() {
62        return Err(SqlMiddlewareDbError::SqliteError(
63            rusqlite::Error::SqliteFailure(
64                rusqlite::ffi::Error {
65                    code: rusqlite::ErrorCode::DatabaseBusy,
66                    extended_code: rusqlite::ErrorCode::DatabaseBusy as i32,
67                },
68                None,
69            ),
70        ));
71    }
72
73    for (idx, delay) in ROLLBACK_BUSY_RETRIES.iter().copied().enumerate() {
74        let result = handle.execute_blocking(|guard| {
75            guard
76                .execute_batch("ROLLBACK")
77                .map_err(SqlMiddlewareDbError::SqliteError)
78        });
79
80        if result.is_ok() {
81            return result;
82        }
83        if let Err(SqlMiddlewareDbError::SqliteError(rusqlite::Error::SqliteFailure(err, _))) =
84            &result
85            && err.code == rusqlite::ErrorCode::DatabaseBusy
86            && idx + 1 < ROLLBACK_BUSY_RETRIES.len()
87        {
88            thread::sleep(delay);
89            continue;
90        }
91        return result;
92    }
93
94    Err(SqlMiddlewareDbError::ExecutionError(
95        "rollback retries exhausted".into(),
96    ))
97}
98
99impl SqliteConnection {
100    /// Begin a transaction, transitioning this connection into transactional mode.
101    ///
102    /// # Errors
103    /// Returns `SqlMiddlewareDbError` if the transaction cannot be started or is already active.
104    pub async fn begin(&mut self) -> Result<(), SqlMiddlewareDbError> {
105        if self.in_transaction {
106            return Err(SqlMiddlewareDbError::ExecutionError(
107                "SQLite transaction already in progress".into(),
108            ));
109        }
110        run_blocking(self.conn_handle(), move |guard| {
111            guard
112                .execute_batch("BEGIN")
113                .map_err(SqlMiddlewareDbError::SqliteError)
114        })
115        .await?;
116        self.in_transaction = true;
117        Ok(())
118    }
119
120    /// Commit an open transaction.
121    ///
122    /// # Errors
123    /// Returns `SqlMiddlewareDbError` if committing fails or no transaction is active.
124    pub async fn commit(&mut self) -> Result<(), SqlMiddlewareDbError> {
125        if !self.in_transaction {
126            return Err(SqlMiddlewareDbError::ExecutionError(
127                "SQLite transaction not active".into(),
128            ));
129        }
130        run_blocking(self.conn_handle(), move |guard| {
131            guard
132                .execute_batch("COMMIT")
133                .map_err(SqlMiddlewareDbError::SqliteError)
134        })
135        .await?;
136        self.in_transaction = false;
137        Ok(())
138    }
139
140    /// Roll back an open transaction.
141    ///
142    /// # Errors
143    /// Returns `SqlMiddlewareDbError` if rolling back fails or no transaction is active.
144    pub async fn rollback(&mut self) -> Result<(), SqlMiddlewareDbError> {
145        if !self.in_transaction {
146            return Err(SqlMiddlewareDbError::ExecutionError(
147                "SQLite transaction not active".into(),
148            ));
149        }
150        let result = rollback_with_busy_retries(&self.conn_handle()).await;
151        if result.is_err() {
152            self.mark_broken();
153            return result;
154        }
155        self.in_transaction = false;
156        result
157    }
158}