Skip to main content

rusqlite/
unlock_notify.rs

1//! [Unlock Notification](http://sqlite.org/unlock_notify.html)
2
3use std::ffi::{c_int, c_void};
4use std::panic::catch_unwind;
5use std::sync::{Condvar, Mutex};
6
7use crate::ffi;
8
/// State shared between a thread blocked in `wait()` and the unlock-notify
/// callback that wakes it up through `fired()`.
struct UnlockNotification {
    /// Protects the "notification has fired" flag.
    mutex: Mutex<bool>,
    /// Condition variable the waiting thread sleeps on.
    cond: Condvar,
}
13
14impl UnlockNotification {
15    fn new() -> Self {
16        Self {
17            cond: Condvar::new(),
18            mutex: Mutex::new(false),
19        }
20    }
21
22    fn fired(&self) {
23        let mut flag = unpoison(self.mutex.lock());
24        *flag = true;
25        self.cond.notify_one();
26    }
27
28    fn wait(&self) {
29        let mut fired = unpoison(self.mutex.lock());
30        while !*fired {
31            fired = unpoison(self.cond.wait(fired));
32        }
33    }
34}
35
/// Unwraps a lock result, ignoring poisoning.
///
/// Returns the `Ok` value as is; for a `PoisonError` the value carried by the
/// error is extracted and returned instead of panicking.
#[inline]
fn unpoison<T>(r: Result<T, std::sync::PoisonError<T>>) -> T {
    match r {
        Ok(value) => value,
        Err(poisoned) => poisoned.into_inner(),
    }
}
40
41/// This function is an unlock-notify callback
42unsafe extern "C" fn unlock_notify_cb(ap_arg: *mut *mut c_void, n_arg: c_int) {
43    use std::slice::from_raw_parts;
44    let args = from_raw_parts(ap_arg as *const &UnlockNotification, n_arg as usize);
45    for un in args {
46        drop(catch_unwind(std::panic::AssertUnwindSafe(|| un.fired())));
47    }
48}
49
50pub unsafe fn is_locked(db: *mut ffi::sqlite3, rc: c_int) -> bool {
51    rc == ffi::SQLITE_LOCKED_SHAREDCACHE
52        || (rc & 0xFF) == ffi::SQLITE_LOCKED
53            && ffi::sqlite3_extended_errcode(db) == ffi::SQLITE_LOCKED_SHAREDCACHE
54}
55
/// Blocks the calling thread until the unlock-notify callback registered for
/// `db` has been delivered.
///
/// Meant to be called right after an SQLite API call (either
/// `sqlite3_prepare_v2()` or `sqlite3_step()`) has returned `SQLITE_LOCKED`
/// for the connection `db`.
///
/// The function registers an unlock-notify callback through
/// `sqlite3_unlock_notify()`. When registration succeeds it waits for the
/// callback to run and then returns `SQLITE_OK`; the caller should retry the
/// failed operation.
///
/// When `sqlite3_unlock_notify()` instead indicates that blocking would
/// deadlock the system, `SQLITE_LOCKED` is returned immediately. In that case
/// the caller should not retry the operation and should roll back the
/// current transaction (if any).
///
/// # Safety
///
/// `db` is passed to `sqlite3_unlock_notify()` unchanged, so it must be a
/// connection handle that call accepts.
#[cfg(feature = "unlock_notify")]
pub unsafe fn wait_for_unlock_notify(db: *mut ffi::sqlite3) -> c_int {
    let notification = UnlockNotification::new();
    // Context pointer that SQLite hands back to `unlock_notify_cb`;
    // `notification` stays alive on this stack frame until after the wait.
    let context = &notification as *const UnlockNotification as *mut c_void;
    let rc = ffi::sqlite3_unlock_notify(db, Some(unlock_notify_cb), context);
    debug_assert!(matches!(
        rc,
        ffi::SQLITE_OK | ffi::SQLITE_LOCKED | ffi::SQLITE_LOCKED_SHAREDCACHE
    ));
    if rc != ffi::SQLITE_OK {
        // Registration was refused: report the code without blocking.
        return rc;
    }
    notification.wait();
    rc
}
85
#[cfg(all(test, not(miri)))]
mod test {
    #[cfg(all(target_family = "wasm", target_os = "unknown"))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    use crate::{Connection, OpenFlags, Result, Transaction, TransactionBehavior};
    use std::sync::mpsc::sync_channel;
    use std::thread;
    use std::time;

    /// A reader on a shared-cache in-memory database still sees the row
    /// inserted by a writer thread whose transaction is open when the read
    /// starts.
    #[cfg_attr(
        all(target_family = "wasm", target_os = "unknown"),
        ignore = "no thread on this platform"
    )]
    #[test]
    fn test_unlock_notify() -> Result<()> {
        let url = "file::memory:?cache=shared";
        let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_URI;
        let reader = Connection::open_with_flags(url, flags)?;
        reader.execute_batch("CREATE TABLE foo (x)")?;
        // Zero-capacity channel: `send` only returns once the main thread
        // has received the message.
        let (ready_tx, ready_rx) = sync_channel(0);
        let writer_thread = thread::spawn(move || {
            let mut writer = Connection::open_with_flags(url, flags).unwrap();
            let txn = Transaction::new(&mut writer, TransactionBehavior::Immediate).unwrap();
            txn.execute_batch("INSERT INTO foo VALUES (42)").unwrap();
            ready_tx.send(1).unwrap();
            // Keep the transaction open a little longer; presumably this makes
            // the reader's query run while the table is still locked.
            thread::sleep(time::Duration::from_millis(10));
            txn.commit().unwrap();
        });
        assert_eq!(ready_rx.recv().unwrap(), 1);
        assert_eq!(42, reader.one_column::<i64, _>("SELECT x FROM foo", [])?);
        writer_thread.join().unwrap();
        Ok(())
    }
}