rusqlite/
busy.rs

1//! Busy handler (when the database is locked)
2use std::convert::TryInto;
3use std::mem;
4use std::os::raw::{c_int, c_void};
5use std::panic::catch_unwind;
6use std::ptr;
7use std::time::Duration;
8
9use crate::ffi;
10use crate::{Connection, InnerConnection, Result};
11
12impl Connection {
13    /// Set a busy handler that sleeps for a specified amount of time when a
14    /// table is locked. The handler will sleep multiple times until at
15    /// least "ms" milliseconds of sleeping have accumulated.
16    ///
17    /// Calling this routine with an argument equal to zero turns off all busy
18    /// handlers.
19    ///
20    /// There can only be a single busy handler for a particular database
21    /// connection at any given moment. If another busy handler was defined
22    /// (using [`busy_handler`](Connection::busy_handler)) prior to calling this
23    /// routine, that other busy handler is cleared.
24    ///
25    /// Newly created connections currently have a default busy timeout of
26    /// 5000ms, but this may be subject to change.
27    pub fn busy_timeout(&self, timeout: Duration) -> Result<()> {
28        let ms: i32 = timeout
29            .as_secs()
30            .checked_mul(1000)
31            .and_then(|t| t.checked_add(timeout.subsec_millis().into()))
32            .and_then(|t| t.try_into().ok())
33            .expect("too big");
34        self.db.borrow_mut().busy_timeout(ms)
35    }
36
37    /// Register a callback to handle `SQLITE_BUSY` errors.
38    ///
39    /// If the busy callback is `None`, then `SQLITE_BUSY` is returned
40    /// immediately upon encountering the lock. The argument to the busy
41    /// handler callback is the number of times that the
42    /// busy handler has been invoked previously for the
43    /// same locking event. If the busy callback returns `false`, then no
44    /// additional attempts are made to access the
45    /// database and `SQLITE_BUSY` is returned to the
46    /// application. If the callback returns `true`, then another attempt
47    /// is made to access the database and the cycle repeats.
48    ///
49    /// There can only be a single busy handler defined for each database
50    /// connection. Setting a new busy handler clears any previously set
51    /// handler. Note that calling [`busy_timeout()`](Connection::busy_timeout)
52    /// or evaluating `PRAGMA busy_timeout=N` will change the busy handler
53    /// and thus clear any previously set busy handler.
54    ///
55    /// Newly created connections default to a
56    /// [`busy_timeout()`](Connection::busy_timeout) handler with a timeout
57    /// of 5000ms, although this is subject to change.
58    pub fn busy_handler(&self, callback: Option<fn(i32) -> bool>) -> Result<()> {
59        unsafe extern "C" fn busy_handler_callback(p_arg: *mut c_void, count: c_int) -> c_int {
60            let handler_fn: fn(i32) -> bool = mem::transmute(p_arg);
61            c_int::from(catch_unwind(|| handler_fn(count)).unwrap_or_default())
62        }
63        let c = self.db.borrow_mut();
64        let r = match callback {
65            Some(f) => unsafe {
66                ffi::sqlite3_busy_handler(c.db(), Some(busy_handler_callback), f as *mut c_void)
67            },
68            None => unsafe { ffi::sqlite3_busy_handler(c.db(), None, ptr::null_mut()) },
69        };
70        c.decode_result(r)
71    }
72}
73
74impl InnerConnection {
75    #[inline]
76    fn busy_timeout(&mut self, timeout: c_int) -> Result<()> {
77        let r = unsafe { ffi::sqlite3_busy_timeout(self.db, timeout) };
78        self.decode_result(r)
79    }
80}
81
#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::sync_channel;
    use std::thread;
    use std::time::Duration;

    use crate::{Connection, ErrorCode, Result, TransactionBehavior};

    /// Spawn a thread that opens its own connection to `path`, begins an
    /// exclusive transaction, keeps it open for 100ms and then rolls it back.
    ///
    /// Returns once the thread has signalled that its exclusive transaction
    /// has begun, so the caller is racing against the held transaction.
    fn hold_exclusive_lock(path: std::path::PathBuf) -> thread::JoinHandle<()> {
        // Zero-capacity channel: `send` only completes when the matching
        // `recv` runs, which makes it a rendezvous point for both threads.
        let (locked_tx, locked_rx) = sync_channel(0);
        let holder = thread::spawn(move || {
            let mut locker = Connection::open(&path).unwrap();
            let exclusive = locker
                .transaction_with_behavior(TransactionBehavior::Exclusive)
                .unwrap();
            locked_tx.send(1).unwrap();
            thread::sleep(Duration::from_millis(100));
            exclusive.rollback().unwrap();
        });
        assert_eq!(locked_rx.recv().unwrap(), 1);
        holder
    }

    /// Without any busy configuration applied by the test, a query against
    /// an exclusively locked database must fail with `DatabaseBusy`.
    #[test]
    fn test_default_busy() -> Result<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("test.db3");

        let mut writer = Connection::open(&path)?;
        let exclusive = writer.transaction_with_behavior(TransactionBehavior::Exclusive)?;
        let reader = Connection::open(&path)?;

        // The row callback must never run: the query is expected to fail
        // before producing a row.
        let got: Result<()> = reader.query_row("PRAGMA schema_version", [], |_| unreachable!());
        let code = got.unwrap_err().sqlite_error_code();
        assert_eq!(code, Some(ErrorCode::DatabaseBusy));

        exclusive.rollback()
    }

    /// A one second busy timeout is expected to outlast the 100ms for which
    /// the other thread holds its exclusive transaction.
    #[test]
    #[ignore] // FIXME: unstable
    fn test_busy_timeout() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("test.db3");

        let waiter = Connection::open(&path).unwrap();
        waiter.busy_timeout(Duration::from_secs(1)).unwrap();

        let holder = hold_exclusive_lock(path);

        let _ = waiter
            .query_row("PRAGMA schema_version", [], |row| row.get::<_, i32>(0))
            .expect("unexpected error");

        holder.join().unwrap();
    }

    /// A custom busy handler that always asks for a retry must be invoked
    /// and must let the query succeed in the end.
    #[test]
    #[ignore] // FIXME: unstable
    fn test_busy_handler() {
        // Flipped by the handler so the test can prove it was actually run.
        static CALLED: AtomicBool = AtomicBool::new(false);

        fn busy_handler(_: i32) -> bool {
            CALLED.store(true, Ordering::Relaxed);
            thread::sleep(Duration::from_millis(100));
            true
        }

        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("test.db3");

        let waiter = Connection::open(&path).unwrap();
        waiter.busy_handler(Some(busy_handler)).unwrap();

        let holder = hold_exclusive_lock(path);

        let _ = waiter
            .query_row("PRAGMA schema_version", [], |row| row.get::<_, i32>(0))
            .expect("unexpected error");
        assert!(CALLED.load(Ordering::Relaxed));

        holder.join().unwrap();
    }
}