sqlx_core_guts/sqlite/connection/
mod.rs

1use std::cmp::Ordering;
2use std::fmt::{self, Debug, Formatter};
3use std::ptr::NonNull;
4
5use futures_core::future::BoxFuture;
6use futures_intrusive::sync::MutexGuard;
7use futures_util::future;
8use libsqlite3_sys::sqlite3;
9
10pub(crate) use handle::{ConnectionHandle, ConnectionHandleRaw};
11
12use crate::common::StatementCache;
13use crate::connection::{Connection, LogSettings};
14use crate::error::Error;
15use crate::sqlite::connection::establish::EstablishParams;
16use crate::sqlite::connection::worker::ConnectionWorker;
17use crate::sqlite::statement::VirtualStatement;
18use crate::sqlite::{Sqlite, SqliteConnectOptions};
19use crate::transaction::Transaction;
20
21pub(crate) mod collation;
22mod describe;
23mod establish;
24mod execute;
25mod executor;
26mod explain;
27mod handle;
28
29mod worker;
30
31/// A connection to an open [Sqlite] database.
32///
33/// Because SQLite is an in-process database accessed by blocking API calls, SQLx uses a background
34/// thread and communicates with it via channels to allow non-blocking access to the database.
35///
36/// Dropping this struct will signal the worker thread to quit and close the database, though
37/// if an error occurs there is no way to pass it back to the user this way.
38///
39/// You can explicitly call [`.close()`][Self::close] to ensure the database is closed successfully
40/// or get an error otherwise.
41pub struct SqliteConnection {
42    pub(crate) worker: ConnectionWorker,
43    pub(crate) row_channel_size: usize,
44}
45
46pub struct LockedSqliteHandle<'a> {
47    pub(crate) guard: MutexGuard<'a, ConnectionState>,
48}
49
50pub(crate) struct ConnectionState {
51    pub(crate) handle: ConnectionHandle,
52
53    // transaction status
54    pub(crate) transaction_depth: usize,
55
56    pub(crate) statements: Statements,
57
58    log_settings: LogSettings,
59}
60
61pub(crate) struct Statements {
62    // cache of semi-persistent statements
63    cached: StatementCache<VirtualStatement>,
64    // most recent non-persistent statement
65    temp: Option<VirtualStatement>,
66}
67
68impl SqliteConnection {
69    pub(crate) async fn establish(options: &SqliteConnectOptions) -> Result<Self, Error> {
70        let params = EstablishParams::from_options(options)?;
71        let worker = ConnectionWorker::establish(params).await?;
72        Ok(Self {
73            worker,
74            row_channel_size: options.row_channel_size,
75        })
76    }
77
78    /// Returns the underlying sqlite3* connection handle.
79    ///
80    /// ### Note
81    /// There is no synchronization using this method, beware that the background thread could
82    /// be making SQLite API calls concurrent to use of this method.
83    ///
84    /// You probably want to use [`.lock_handle()`][Self::lock_handle] to ensure that the worker thread is not using
85    /// the database concurrently.
86    #[deprecated = "Unsynchronized access is unsafe. See documentation for details."]
87    pub fn as_raw_handle(&mut self) -> *mut sqlite3 {
88        self.worker.handle_raw.as_ptr()
89    }
90
91    /// Apply a collation to the open database.
92    ///
93    /// See [`SqliteConnectOptions::collation()`] for details.
94    ///
95    /// ### Deprecated
96    /// Due to the rearchitecting of the SQLite driver, this method cannot actually function
97    /// synchronously and return the result directly from `sqlite3_create_collation_v2()`, so
98    /// it instead sends a message to the worker create the collation asynchronously.
99    /// If an error occurs it will simply be logged.
100    ///
101    /// Instead, you should specify the collation during the initial configuration with
102    /// [`SqliteConnectOptions::collation()`]. Then, if the collation fails to apply it will
103    /// return an error during the connection creation. When used with a [Pool][crate::pool::Pool],
104    /// this also ensures that the collation is applied to all connections automatically.
105    ///
106    /// Or if necessary, you can call [`.lock_handle()`][Self::lock_handle]
107    /// and create the collation directly with [`LockedSqliteHandle::create_collation()`].
108    ///
109    /// [`Error::WorkerCrashed`] may still be returned if we could not communicate with the worker.
110    ///
111    /// Note that this may also block if the worker command channel is currently applying
112    /// backpressure.
113    #[deprecated = "Completes asynchronously. See documentation for details."]
114    pub fn create_collation(
115        &mut self,
116        name: &str,
117        compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
118    ) -> Result<(), Error> {
119        self.worker.create_collation(name, compare)
120    }
121
122    /// Lock the SQLite database handle out from the worker thread so direct SQLite API calls can
123    /// be made safely.
124    ///
125    /// Returns an error if the worker thread crashed.
126    pub async fn lock_handle(&mut self) -> Result<LockedSqliteHandle<'_>, Error> {
127        let guard = self.worker.unlock_db().await?;
128
129        Ok(LockedSqliteHandle { guard })
130    }
131}
132
133impl Debug for SqliteConnection {
134    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
135        f.debug_struct("SqliteConnection")
136            .field("row_channel_size", &self.row_channel_size)
137            .field("cached_statements_size", &self.cached_statements_size())
138            .finish()
139    }
140}
141
142impl Connection for SqliteConnection {
143    type Database = Sqlite;
144
145    type Options = SqliteConnectOptions;
146
147    fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
148        Box::pin(async move {
149            let shutdown = self.worker.shutdown();
150            // Drop the statement worker, which should
151            // cover all references to the connection handle outside of the worker thread
152            drop(self);
153            // Ensure the worker thread has terminated
154            shutdown.await
155        })
156    }
157
158    fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
159        Box::pin(async move {
160            drop(self);
161            Ok(())
162        })
163    }
164
165    /// Ensure the background worker thread is alive and accepting commands.
166    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
167        Box::pin(self.worker.ping())
168    }
169
170    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
171    where
172        Self: Sized,
173    {
174        Transaction::begin(self)
175    }
176
177    fn cached_statements_size(&self) -> usize {
178        self.worker
179            .shared
180            .cached_statements_size
181            .load(std::sync::atomic::Ordering::Acquire)
182    }
183
184    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
185        Box::pin(async move {
186            self.worker.clear_cache().await?;
187            Ok(())
188        })
189    }
190
191    #[doc(hidden)]
192    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
193        // For SQLite, FLUSH does effectively nothing...
194        // Well, we could use this to ensure that the command channel has been cleared,
195        // but it would only develop a backlog if a lot of queries are executed and then cancelled
196        // partway through, and then this would only make that situation worse.
197        Box::pin(future::ok(()))
198    }
199
200    #[doc(hidden)]
201    fn should_flush(&self) -> bool {
202        false
203    }
204}
205
206impl LockedSqliteHandle<'_> {
207    /// Returns the underlying sqlite3* connection handle.
208    ///
209    /// As long as this `LockedSqliteHandle` exists, it is guaranteed that the background thread
210    /// is not making FFI calls on this database handle or any of its statements.
211    pub fn as_raw_handle(&mut self) -> NonNull<sqlite3> {
212        self.guard.handle.as_non_null_ptr()
213    }
214
215    /// Apply a collation to the open database.
216    ///
217    /// See [`SqliteConnectOptions::collation()`] for details.
218    pub fn create_collation(
219        &mut self,
220        name: &str,
221        compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
222    ) -> Result<(), Error> {
223        collation::create_collation(&mut self.guard.handle, name, compare)
224    }
225}
226
227impl Drop for ConnectionState {
228    fn drop(&mut self) {
229        // explicitly drop statements before the connection handle is dropped
230        self.statements.clear();
231    }
232}
233
234impl Statements {
235    fn new(capacity: usize) -> Self {
236        Statements {
237            cached: StatementCache::new(capacity),
238            temp: None,
239        }
240    }
241
242    fn get(&mut self, query: &str, persistent: bool) -> Result<&mut VirtualStatement, Error> {
243        if !persistent || !self.cached.is_enabled() {
244            return Ok(self.temp.insert(VirtualStatement::new(query, false)?));
245        }
246
247        let exists = self.cached.contains_key(query);
248
249        if !exists {
250            let statement = VirtualStatement::new(query, true)?;
251            self.cached.insert(query, statement);
252        }
253
254        let statement = self.cached.get_mut(query).unwrap();
255
256        if exists {
257            // as this statement has been executed before, we reset before continuing
258            statement.reset()?;
259        }
260
261        Ok(statement)
262    }
263
264    fn len(&self) -> usize {
265        self.cached.len()
266    }
267
268    fn clear(&mut self) {
269        self.cached.clear();
270        self.temp = None;
271    }
272}