1use std::convert::TryInto;
3use std::mem;
4use std::os::raw::{c_int, c_void};
5use std::panic::catch_unwind;
6use std::ptr;
7use std::time::Duration;
8
9use crate::ffi;
10use crate::{Connection, InnerConnection, Result};
11
12impl Connection {
13 pub fn busy_timeout(&self, timeout: Duration) -> Result<()> {
28 let ms: i32 = timeout
29 .as_secs()
30 .checked_mul(1000)
31 .and_then(|t| t.checked_add(timeout.subsec_millis().into()))
32 .and_then(|t| t.try_into().ok())
33 .expect("too big");
34 self.db.borrow_mut().busy_timeout(ms)
35 }
36
37 pub fn busy_handler(&self, callback: Option<fn(i32) -> bool>) -> Result<()> {
59 unsafe extern "C" fn busy_handler_callback(p_arg: *mut c_void, count: c_int) -> c_int {
60 let handler_fn: fn(i32) -> bool = mem::transmute(p_arg);
61 if let Ok(true) = catch_unwind(|| handler_fn(count)) {
62 1
63 } else {
64 0
65 }
66 }
67 let c = self.db.borrow_mut();
68 let r = match callback {
69 Some(f) => unsafe {
70 ffi::sqlite3_busy_handler(c.db(), Some(busy_handler_callback), f as *mut c_void)
71 },
72 None => unsafe { ffi::sqlite3_busy_handler(c.db(), None, ptr::null_mut()) },
73 };
74 c.decode_result(r)
75 }
76}
77
78impl InnerConnection {
79 #[inline]
80 fn busy_timeout(&mut self, timeout: c_int) -> Result<()> {
81 let r = unsafe { ffi::sqlite3_busy_timeout(self.db, timeout) };
82 self.decode_result(r)
83 }
84}
85
#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::sync_channel;
    use std::thread;
    use std::time::Duration;

    use crate::{Connection, ErrorCode, Result, TransactionBehavior};

    /// Spawns a thread that opens its own connection to `db_path`, begins an
    /// exclusive transaction, signals `ready` with `1`, keeps the
    /// transaction open for 100 ms and then rolls it back.
    fn spawn_exclusive_holder(
        db_path: std::path::PathBuf,
        ready: std::sync::mpsc::SyncSender<i32>,
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            let mut holder = Connection::open(&db_path).unwrap();
            let exclusive = holder
                .transaction_with_behavior(TransactionBehavior::Exclusive)
                .unwrap();
            ready.send(1).unwrap();
            thread::sleep(Duration::from_millis(100));
            exclusive.rollback().unwrap();
        })
    }

    /// With no busy timeout or handler configured in the test, a query on a
    /// second connection must fail with `DatabaseBusy` while the first
    /// connection holds an exclusive transaction.
    #[test]
    fn test_default_busy() -> Result<()> {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("test.db3");

        let mut writer = Connection::open(&db_path)?;
        let exclusive = writer.transaction_with_behavior(TransactionBehavior::Exclusive)?;
        let reader = Connection::open(&db_path)?;
        let outcome: Result<()> =
            reader.query_row("PRAGMA schema_version", [], |_| unreachable!());
        let code = outcome.unwrap_err().sqlite_error_code();
        assert_eq!(code, Some(ErrorCode::DatabaseBusy));
        exclusive.rollback()
    }

    /// A one-second busy timeout must let the query succeed even though
    /// another thread holds an exclusive transaction for 100 ms.
    /// Skipped unless ignored tests are explicitly requested.
    #[test]
    #[ignore]
    fn test_busy_timeout() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("test.db3");

        let waiter = Connection::open(&db_path).unwrap();
        waiter.busy_timeout(Duration::from_secs(1)).unwrap();

        // Zero-capacity channel: the send completes only once this thread
        // receives, so the lock is already held when the query starts.
        let (ready_tx, ready_rx) = sync_channel(0);
        let holder_thread = spawn_exclusive_holder(db_path, ready_tx);

        assert_eq!(ready_rx.recv().unwrap(), 1);
        let _ = waiter
            .query_row("PRAGMA schema_version", [], |row| row.get::<_, i32>(0))
            .expect("unexpected error");

        holder_thread.join().unwrap();
    }

    /// A custom busy handler that sleeps and asks for a retry must be
    /// invoked, and the query must eventually succeed.
    /// Skipped unless ignored tests are explicitly requested.
    #[test]
    #[ignore]
    fn test_busy_handler() {
        // Records that the handler ran at least once.
        static CALLED: AtomicBool = AtomicBool::new(false);
        fn busy_handler(_: i32) -> bool {
            CALLED.store(true, Ordering::Relaxed);
            thread::sleep(Duration::from_millis(100));
            true
        }

        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("test.db3");

        let waiter = Connection::open(&db_path).unwrap();
        waiter.busy_handler(Some(busy_handler)).unwrap();

        // Zero-capacity channel: the send completes only once this thread
        // receives, so the lock is already held when the query starts.
        let (ready_tx, ready_rx) = sync_channel(0);
        let holder_thread = spawn_exclusive_holder(db_path, ready_tx);

        assert_eq!(ready_rx.recv().unwrap(), 1);
        let _ = waiter
            .query_row("PRAGMA schema_version", [], |row| row.get::<_, i32>(0))
            .expect("unexpected error");
        assert!(CALLED.load(Ordering::Relaxed));

        holder_thread.join().unwrap();
    }
}