rusqlite/
unlock_notify.rs1use std::ffi::{c_int, c_void};
4use std::panic::catch_unwind;
5use std::sync::{Condvar, Mutex};
6
7use crate::ffi;
8
/// State shared between a thread blocked in `wait` and the callback that
/// calls `fired`.
struct UnlockNotification {
    /// Condition variable signalled by `fired` and waited on by `wait`.
    cond: Condvar,
    /// Guards the "has fired" flag; `false` until `fired` sets it to `true`.
    mutex: Mutex<bool>,
}
13
14impl UnlockNotification {
15 fn new() -> Self {
16 Self {
17 cond: Condvar::new(),
18 mutex: Mutex::new(false),
19 }
20 }
21
22 fn fired(&self) {
23 let mut flag = unpoison(self.mutex.lock());
24 *flag = true;
25 self.cond.notify_one();
26 }
27
28 fn wait(&self) {
29 let mut fired = unpoison(self.mutex.lock());
30 while !*fired {
31 fired = unpoison(self.cond.wait(fired));
32 }
33 }
34}
35
/// Unwraps a lock result, ignoring poisoning.
///
/// Returns the `Ok` value as-is; on `Err` the value carried inside the
/// [`PoisonError`](std::sync::PoisonError) is extracted and returned instead.
#[inline]
fn unpoison<T>(r: Result<T, std::sync::PoisonError<T>>) -> T {
    match r {
        Ok(value) => value,
        Err(poisoned) => poisoned.into_inner(),
    }
}
40
41unsafe extern "C" fn unlock_notify_cb(ap_arg: *mut *mut c_void, n_arg: c_int) {
43 use std::slice::from_raw_parts;
44 let args = from_raw_parts(ap_arg as *const &UnlockNotification, n_arg as usize);
45 for un in args {
46 drop(catch_unwind(std::panic::AssertUnwindSafe(|| un.fired())));
47 }
48}
49
50pub unsafe fn is_locked(db: *mut ffi::sqlite3, rc: c_int) -> bool {
51 rc == ffi::SQLITE_LOCKED_SHAREDCACHE
52 || (rc & 0xFF) == ffi::SQLITE_LOCKED
53 && ffi::sqlite3_extended_errcode(db) == ffi::SQLITE_LOCKED_SHAREDCACHE
54}
55
/// Registers an unlock-notify callback on `db` and blocks until it fires.
///
/// A fresh `UnlockNotification` is created on this function's stack and its
/// address is passed to `sqlite3_unlock_notify` as the callback context. When
/// registration returns `SQLITE_OK`, the calling thread waits until
/// `unlock_notify_cb` has fired the notification. Any other result is
/// returned without waiting; debug builds assert that it is `SQLITE_LOCKED`
/// or `SQLITE_LOCKED_SHAREDCACHE`.
///
/// Returns the result code of `sqlite3_unlock_notify`.
///
/// # Safety
///
/// `db` is passed unchecked to `sqlite3_unlock_notify`; it must be a handle
/// that call accepts (presumably a valid, open connection).
#[cfg(feature = "unlock_notify")]
pub unsafe fn wait_for_unlock_notify(db: *mut ffi::sqlite3) -> c_int {
    let notification = UnlockNotification::new();
    // The callback receives this address back and casts it to a reference;
    // `notification` stays alive until this function returns.
    let context = &notification as *const UnlockNotification as *mut c_void;
    let rc = ffi::sqlite3_unlock_notify(db, Some(unlock_notify_cb), context);
    debug_assert!(
        rc == ffi::SQLITE_LOCKED || rc == ffi::SQLITE_LOCKED_SHAREDCACHE || rc == ffi::SQLITE_OK
    );
    if rc != ffi::SQLITE_OK {
        return rc;
    }
    notification.wait();
    rc
}
85
#[cfg(all(test, not(miri)))]
mod test {
    #[cfg(all(target_family = "wasm", target_os = "unknown"))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    use crate::{Connection, OpenFlags, Result, Transaction, TransactionBehavior};
    use std::sync::mpsc::sync_channel;
    use std::thread;
    use std::time;

    /// Two connections share one in-memory database. A writer thread inserts
    /// a row inside an immediate transaction, signals the main thread, and
    /// commits 10 ms later; the main thread then reads the table and expects
    /// to see the inserted value (presumably exercising the unlock-notify
    /// wait path while the writer still holds its transaction).
    #[cfg_attr(
        all(target_family = "wasm", target_os = "unknown"),
        ignore = "no thread on this platform"
    )]
    #[test]
    fn test_unlock_notify() -> Result<()> {
        let url = "file::memory:?cache=shared";
        let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_URI;
        let reader = Connection::open_with_flags(url, flags)?;
        reader.execute_batch("CREATE TABLE foo (x)")?;
        // Zero-capacity channel: `send` returns only once the main thread
        // has received the message.
        let (ready_tx, ready_rx) = sync_channel(0);
        let writer_thread = thread::spawn(move || {
            let mut writer = Connection::open_with_flags(url, flags).unwrap();
            let txn = Transaction::new(&mut writer, TransactionBehavior::Immediate).unwrap();
            txn.execute_batch("INSERT INTO foo VALUES (42)").unwrap();
            ready_tx.send(1).unwrap();
            // Keep the transaction open briefly so the reader runs against it.
            thread::sleep(time::Duration::from_millis(10));
            txn.commit().unwrap();
        });
        assert_eq!(ready_rx.recv().unwrap(), 1);
        assert_eq!(42, reader.one_column::<i64, _>("SELECT x FROM foo", [])?);
        writer_thread.join().unwrap();
        Ok(())
    }
}