rbdc_sqlite/connection/
mod.rs

1use futures_core::future::BoxFuture;
2use futures_intrusive::sync::MutexGuard;
3use futures_util::future;
4use libsqlite3_sys::sqlite3;
5use std::cmp::Ordering;
6use std::fmt::{self, Debug, Formatter};
7use std::ptr::NonNull;
8
9pub(crate) use handle::{ConnectionHandle, ConnectionHandleRaw};
10
11use crate::connection::establish::EstablishParams;
12use crate::connection::worker::ConnectionWorker;
13use crate::statement::VirtualStatement;
14use crate::SqliteConnectOptions;
15use rbdc::error::Error;
16use rbdc::StatementCache;
17
18pub(crate) mod collation;
19mod establish;
20mod execute;
21mod executor;
22mod handle;
23
24mod worker;
25pub use worker::Command;
26
27/// A connection to an open [Sqlite] database.
28///
29/// Because SQLite is an in-process database accessed by blocking API calls, rbdc uses a background
30/// thread and communicates with it via channels to allow non-blocking access to the database.
31///
32/// Dropping this struct will signal the worker thread to quit and close the database, though
33/// if an error occurs there is no way to pass it back to the user this way.
34///
35/// You can explicitly call [`.close()`][Self::close] to ensure the database is closed successfully
36/// or get an error otherwise.
37pub struct SqliteConnection {
38    pub(crate) worker: ConnectionWorker,
39    pub(crate) row_channel_size: usize,
40}
41
42// SAFETY: SqliteConnection is safe to share between threads because:
43// 1. The `worker` field is marked as `pub(crate)`, preventing external users from directly accessing it
44// 2. All SQLite operations are sent through thread-safe flume channels to a dedicated worker thread
45// 3. The `worker.handle_raw` pointer (raw SQLite connection) is encapsulated within the crate
46// 4. Any operation requiring direct access to the SQLite connection uses the `lock_handle()` method
47//    which acquires a mutex lock, ensuring synchronized access
48// 5. The only potentially unsafe method `as_raw_handle()` is marked as deprecated with warnings
49//    about thread safety and should be marked as `unsafe` to properly indicate its risks
50unsafe impl Sync for SqliteConnection {}
51
52pub struct LockedSqliteHandle<'a> {
53    pub(crate) guard: MutexGuard<'a, ConnectionState>,
54}
55
56pub struct ConnectionState {
57    pub(crate) handle: ConnectionHandle,
58
59    pub(crate) statements: Statements,
60}
61
62pub(crate) struct Statements {
63    // cache of semi-persistent statements
64    cached: StatementCache<VirtualStatement>,
65    // most recent non-persistent statement
66    temp: Option<VirtualStatement>,
67}
68
69impl SqliteConnection {
70    pub(crate) async fn establish(options: &SqliteConnectOptions) -> Result<Self, Error> {
71        let params = EstablishParams::from_options(options)?;
72        let worker = ConnectionWorker::establish(params).await?;
73        Ok(Self {
74            worker,
75            row_channel_size: options.row_channel_size,
76        })
77    }
78
79    /// Returns the underlying sqlite3* connection handle.
80    ///
81    /// ### Note
82    /// There is no synchronization using this method, beware that the background thread could
83    /// be making SQLite API calls concurrent to use of this method.
84    ///
85    /// You probably want to use [`.lock_handle()`][Self::lock_handle] to ensure that the worker thread is not using
86    /// the database concurrently.
87    #[deprecated(note = "Unsynchronized access is unsafe. See documentation for details.")]
88    pub unsafe fn as_raw_handle(&mut self) -> *mut sqlite3 {
89        self.worker.handle_raw.as_ptr()
90    }
91
92    /// Apply a collation to the open database.
93    ///
94    /// See [`SqliteConnectOptions::collation()`] for details.
95    ///
96    /// ### Deprecated
97    /// Due to the rearchitecting of the SQLite driver, this method cannot actually function
98    /// synchronously and return the result directly from `sqlite3_create_collation_v2()`, so
99    /// it instead sends a message to the worker create the collation asynchronously.
100    /// If an error occurs it will simply be logged.
101    ///
102    /// Instead, you should specify the collation during the initial configuration with
103    /// [`SqliteConnectOptions::collation()`]. Then, if the collation fails to apply it will
104    /// return an error during the connection creation. When used with a [Pool][crate::pool::Pool],
105    /// this also ensures that the collation is applied to all connections automatically.
106    ///
107    /// Or if necessary, you can call [`.lock_handle()`][Self::lock_handle]
108    /// and create the collation directly with [`LockedSqliteHandle::create_collation()`].
109    ///
110    /// [`Error::from("WorkerCrashed")`] may still be returned if we could not communicate with the worker.
111    ///
112    /// Note that this may also block if the worker command channel is currently applying
113    /// backpressure.
114    #[deprecated(note = "Completes asynchronously. See documentation for details.")]
115    pub fn create_collation(
116        &mut self,
117        name: &str,
118        compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
119    ) -> Result<(), Error> {
120        self.worker.create_collation(name, compare)
121    }
122
123    /// Lock the SQLite database handle out from the worker thread so direct SQLite API calls can
124    /// be made safely.
125    ///
126    /// Returns an error if the worker thread crashed.
127    pub async fn lock_handle(&mut self) -> Result<LockedSqliteHandle<'_>, Error> {
128        let guard = self.worker.unlock_db().await?;
129
130        Ok(LockedSqliteHandle { guard })
131    }
132}
133
134impl Debug for SqliteConnection {
135    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
136        f.debug_struct("SqliteConnection")
137            .field("row_channel_size", &self.row_channel_size)
138            .field("cached_statements_size", &self.cached_statements_size())
139            .finish()
140    }
141}
142
143impl SqliteConnection {
144    pub async fn do_close(&mut self) -> Result<(), Error> {
145        // Drop the statement worker, which should
146        // cover all references to the connection handle outside of the worker thread
147        // Ensure the worker thread has terminated
148        self.worker.shutdown().await
149    }
150
151    /// Ensure the background worker thread is alive and accepting commands.
152    pub fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
153        Box::pin(self.worker.ping())
154    }
155
156    pub fn cached_statements_size(&self) -> usize {
157        self.worker
158            .shared
159            .cached_statements_size
160            .load(std::sync::atomic::Ordering::Acquire)
161    }
162
163    pub fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
164        Box::pin(async move {
165            self.worker.clear_cache().await?;
166            Ok(())
167        })
168    }
169
170    #[doc(hidden)]
171    pub fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
172        // For SQLite, FLUSH does effectively nothing...
173        // Well, we could use this to ensure that the command channel has been cleared,
174        // but it would only develop a backlog if a lot of queries are executed and then cancelled
175        // partway through, and then this would only make that situation worse.
176        Box::pin(future::ok(()))
177    }
178
179    #[doc(hidden)]
180    pub fn should_flush(&self) -> bool {
181        false
182    }
183}
184
185impl LockedSqliteHandle<'_> {
186    /// Returns the underlying sqlite3* connection handle.
187    ///
188    /// As long as this `LockedSqliteHandle` exists, it is guaranteed that the background thread
189    /// is not making FFI calls on this database handle or any of its statements.
190    pub fn as_raw_handle(&mut self) -> NonNull<sqlite3> {
191        self.guard.handle.as_non_null_ptr()
192    }
193
194    /// Apply a collation to the open database.
195    ///
196    /// See [`SqliteConnectOptions::collation()`] for details.
197    pub fn create_collation(
198        &mut self,
199        name: &str,
200        compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
201    ) -> Result<(), Error> {
202        collation::create_collation(&mut self.guard.handle, name, compare)
203    }
204}
205
206impl Drop for ConnectionState {
207    fn drop(&mut self) {
208        // explicitly drop statements before the connection handle is dropped
209        self.statements.clear();
210    }
211}
212
213impl Statements {
214    fn new(capacity: usize) -> Self {
215        Statements {
216            cached: StatementCache::new(capacity),
217            temp: None,
218        }
219    }
220
221    fn get(&mut self, query: &str, persistent: bool) -> Result<&mut VirtualStatement, Error> {
222        if !persistent || !self.cached.is_enabled() {
223            return Ok(self.temp.insert(VirtualStatement::new(query, false)?));
224        }
225
226        let exists = self.cached.contains_key(query);
227
228        if !exists {
229            let statement = VirtualStatement::new(query, true)?;
230            self.cached.insert(query, statement);
231        }
232        //.unwrap() is safe
233        let statement = self.cached.get_mut(query).unwrap();
234
235        if exists {
236            // as this statement has been executed before, we reset before continuing
237            statement.reset()?;
238        }
239
240        Ok(statement)
241    }
242
243    fn len(&self) -> usize {
244        self.cached.len()
245    }
246
247    fn clear(&mut self) {
248        self.cached.clear();
249        self.temp = None;
250    }
251}