rbdc_sqlite/connection/mod.rs
1use futures_core::future::BoxFuture;
2use futures_intrusive::sync::MutexGuard;
3use futures_util::future;
4use libsqlite3_sys::sqlite3;
5use std::cmp::Ordering;
6use std::fmt::{self, Debug, Formatter};
7use std::ptr::NonNull;
8
9pub(crate) use handle::{ConnectionHandle, ConnectionHandleRaw};
10
11use crate::connection::establish::EstablishParams;
12use crate::connection::worker::ConnectionWorker;
13use crate::statement::VirtualStatement;
14use crate::SqliteConnectOptions;
15use rbdc::error::Error;
16use rbdc::StatementCache;
17
18pub(crate) mod collation;
19mod establish;
20mod execute;
21mod executor;
22mod handle;
23
24mod worker;
25pub use worker::Command;
26
27/// A connection to an open [Sqlite] database.
28///
29/// Because SQLite is an in-process database accessed by blocking API calls, rbdc uses a background
30/// thread and communicates with it via channels to allow non-blocking access to the database.
31///
32/// Dropping this struct will signal the worker thread to quit and close the database, though
33/// if an error occurs there is no way to pass it back to the user this way.
34///
35/// You can explicitly call [`.close()`][Self::close] to ensure the database is closed successfully
36/// or get an error otherwise.
37pub struct SqliteConnection {
38 pub(crate) worker: ConnectionWorker,
39 pub(crate) row_channel_size: usize,
40}
41
42// SAFETY: SqliteConnection is safe to share between threads because:
43// 1. The `worker` field is marked as `pub(crate)`, preventing external users from directly accessing it
44// 2. All SQLite operations are sent through thread-safe flume channels to a dedicated worker thread
45// 3. The `worker.handle_raw` pointer (raw SQLite connection) is encapsulated within the crate
46// 4. Any operation requiring direct access to the SQLite connection uses the `lock_handle()` method
47// which acquires a mutex lock, ensuring synchronized access
48// 5. The only potentially unsafe method `as_raw_handle()` is marked as deprecated with warnings
49// about thread safety and should be marked as `unsafe` to properly indicate its risks
50unsafe impl Sync for SqliteConnection {}
51
52pub struct LockedSqliteHandle<'a> {
53 pub(crate) guard: MutexGuard<'a, ConnectionState>,
54}
55
56pub struct ConnectionState {
57 pub(crate) handle: ConnectionHandle,
58
59 pub(crate) statements: Statements,
60}
61
62pub(crate) struct Statements {
63 // cache of semi-persistent statements
64 cached: StatementCache<VirtualStatement>,
65 // most recent non-persistent statement
66 temp: Option<VirtualStatement>,
67}
68
69impl SqliteConnection {
70 pub(crate) async fn establish(options: &SqliteConnectOptions) -> Result<Self, Error> {
71 let params = EstablishParams::from_options(options)?;
72 let worker = ConnectionWorker::establish(params).await?;
73 Ok(Self {
74 worker,
75 row_channel_size: options.row_channel_size,
76 })
77 }
78
79 /// Returns the underlying sqlite3* connection handle.
80 ///
81 /// ### Note
82 /// There is no synchronization using this method, beware that the background thread could
83 /// be making SQLite API calls concurrent to use of this method.
84 ///
85 /// You probably want to use [`.lock_handle()`][Self::lock_handle] to ensure that the worker thread is not using
86 /// the database concurrently.
87 #[deprecated(note = "Unsynchronized access is unsafe. See documentation for details.")]
88 pub unsafe fn as_raw_handle(&mut self) -> *mut sqlite3 {
89 self.worker.handle_raw.as_ptr()
90 }
91
92 /// Apply a collation to the open database.
93 ///
94 /// See [`SqliteConnectOptions::collation()`] for details.
95 ///
96 /// ### Deprecated
97 /// Due to the rearchitecting of the SQLite driver, this method cannot actually function
98 /// synchronously and return the result directly from `sqlite3_create_collation_v2()`, so
99 /// it instead sends a message to the worker create the collation asynchronously.
100 /// If an error occurs it will simply be logged.
101 ///
102 /// Instead, you should specify the collation during the initial configuration with
103 /// [`SqliteConnectOptions::collation()`]. Then, if the collation fails to apply it will
104 /// return an error during the connection creation. When used with a [Pool][crate::pool::Pool],
105 /// this also ensures that the collation is applied to all connections automatically.
106 ///
107 /// Or if necessary, you can call [`.lock_handle()`][Self::lock_handle]
108 /// and create the collation directly with [`LockedSqliteHandle::create_collation()`].
109 ///
110 /// [`Error::from("WorkerCrashed")`] may still be returned if we could not communicate with the worker.
111 ///
112 /// Note that this may also block if the worker command channel is currently applying
113 /// backpressure.
114 #[deprecated(note = "Completes asynchronously. See documentation for details.")]
115 pub fn create_collation(
116 &mut self,
117 name: &str,
118 compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
119 ) -> Result<(), Error> {
120 self.worker.create_collation(name, compare)
121 }
122
123 /// Lock the SQLite database handle out from the worker thread so direct SQLite API calls can
124 /// be made safely.
125 ///
126 /// Returns an error if the worker thread crashed.
127 pub async fn lock_handle(&mut self) -> Result<LockedSqliteHandle<'_>, Error> {
128 let guard = self.worker.unlock_db().await?;
129
130 Ok(LockedSqliteHandle { guard })
131 }
132}
133
134impl Debug for SqliteConnection {
135 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
136 f.debug_struct("SqliteConnection")
137 .field("row_channel_size", &self.row_channel_size)
138 .field("cached_statements_size", &self.cached_statements_size())
139 .finish()
140 }
141}
142
143impl SqliteConnection {
144 pub async fn do_close(&mut self) -> Result<(), Error> {
145 // Drop the statement worker, which should
146 // cover all references to the connection handle outside of the worker thread
147 // Ensure the worker thread has terminated
148 self.worker.shutdown().await
149 }
150
151 /// Ensure the background worker thread is alive and accepting commands.
152 pub fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
153 Box::pin(self.worker.ping())
154 }
155
156 pub fn cached_statements_size(&self) -> usize {
157 self.worker
158 .shared
159 .cached_statements_size
160 .load(std::sync::atomic::Ordering::Acquire)
161 }
162
163 pub fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
164 Box::pin(async move {
165 self.worker.clear_cache().await?;
166 Ok(())
167 })
168 }
169
170 #[doc(hidden)]
171 pub fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
172 // For SQLite, FLUSH does effectively nothing...
173 // Well, we could use this to ensure that the command channel has been cleared,
174 // but it would only develop a backlog if a lot of queries are executed and then cancelled
175 // partway through, and then this would only make that situation worse.
176 Box::pin(future::ok(()))
177 }
178
179 #[doc(hidden)]
180 pub fn should_flush(&self) -> bool {
181 false
182 }
183}
184
185impl LockedSqliteHandle<'_> {
186 /// Returns the underlying sqlite3* connection handle.
187 ///
188 /// As long as this `LockedSqliteHandle` exists, it is guaranteed that the background thread
189 /// is not making FFI calls on this database handle or any of its statements.
190 pub fn as_raw_handle(&mut self) -> NonNull<sqlite3> {
191 self.guard.handle.as_non_null_ptr()
192 }
193
194 /// Apply a collation to the open database.
195 ///
196 /// See [`SqliteConnectOptions::collation()`] for details.
197 pub fn create_collation(
198 &mut self,
199 name: &str,
200 compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
201 ) -> Result<(), Error> {
202 collation::create_collation(&mut self.guard.handle, name, compare)
203 }
204}
205
206impl Drop for ConnectionState {
207 fn drop(&mut self) {
208 // explicitly drop statements before the connection handle is dropped
209 self.statements.clear();
210 }
211}
212
213impl Statements {
214 fn new(capacity: usize) -> Self {
215 Statements {
216 cached: StatementCache::new(capacity),
217 temp: None,
218 }
219 }
220
221 fn get(&mut self, query: &str, persistent: bool) -> Result<&mut VirtualStatement, Error> {
222 if !persistent || !self.cached.is_enabled() {
223 return Ok(self.temp.insert(VirtualStatement::new(query, false)?));
224 }
225
226 let exists = self.cached.contains_key(query);
227
228 if !exists {
229 let statement = VirtualStatement::new(query, true)?;
230 self.cached.insert(query, statement);
231 }
232 //.unwrap() is safe
233 let statement = self.cached.get_mut(query).unwrap();
234
235 if exists {
236 // as this statement has been executed before, we reset before continuing
237 statement.reset()?;
238 }
239
240 Ok(statement)
241 }
242
243 fn len(&self) -> usize {
244 self.cached.len()
245 }
246
247 fn clear(&mut self) {
248 self.cached.clear();
249 self.temp = None;
250 }
251}