sqlx_core_oldapi/sqlite/connection/
mod.rs

1use std::cmp::Ordering;
2use std::fmt::{self, Debug, Formatter};
3use std::ptr::NonNull;
4
5use futures_core::future::BoxFuture;
6use futures_intrusive::sync::MutexGuard;
7use futures_util::future;
8use libsqlite3_sys::sqlite3;
9
10pub(crate) use handle::{ConnectionHandle, ConnectionHandleRaw};
11
12use crate::common::StatementCache;
13use crate::connection::{Connection, LogSettings};
14use crate::error::Error;
15use crate::sqlite::connection::establish::EstablishParams;
16use crate::sqlite::connection::worker::ConnectionWorker;
17use crate::sqlite::statement::VirtualStatement;
18use crate::sqlite::{Sqlite, SqliteConnectOptions};
19use crate::transaction::Transaction;
20
21pub(crate) mod collation;
22pub(crate) mod describe;
23pub(crate) mod establish;
24pub(crate) mod execute;
25mod executor;
26mod explain;
27pub(crate) mod function;
28mod handle;
29
30mod worker;
31
32/// A connection to an open [Sqlite] database.
33///
34/// Because SQLite is an in-process database accessed by blocking API calls, SQLx uses a background
35/// thread and communicates with it via channels to allow non-blocking access to the database.
36///
37/// Dropping this struct will signal the worker thread to quit and close the database, though
38/// if an error occurs there is no way to pass it back to the user this way.
39///
40/// You can explicitly call [`.close()`][Self::close] to ensure the database is closed successfully
41/// or get an error otherwise.
42pub struct SqliteConnection {
43    pub(crate) worker: ConnectionWorker,
44    pub(crate) row_channel_size: usize,
45}
46
47pub struct LockedSqliteHandle<'a> {
48    pub(crate) guard: MutexGuard<'a, ConnectionState>,
49}
50
51pub(crate) struct ConnectionState {
52    pub(crate) handle: ConnectionHandle,
53
54    // transaction status
55    pub(crate) transaction_depth: usize,
56
57    pub(crate) statements: Statements,
58
59    log_settings: LogSettings,
60}
61
62pub(crate) struct Statements {
63    // cache of semi-persistent statements
64    cached: StatementCache<VirtualStatement>,
65    // most recent non-persistent statement
66    temp: Option<VirtualStatement>,
67}
68
69impl SqliteConnection {
70    pub(crate) async fn establish(options: &SqliteConnectOptions) -> Result<Self, Error> {
71        let params = EstablishParams::from_options(options)?;
72        let worker = ConnectionWorker::establish(params).await?;
73        Ok(Self {
74            worker,
75            row_channel_size: options.row_channel_size,
76        })
77    }
78
79    /// Returns the underlying sqlite3* connection handle.
80    ///
81    /// ### Note
82    /// There is no synchronization using this method, beware that the background thread could
83    /// be making SQLite API calls concurrent to use of this method.
84    ///
85    /// You probably want to use [`.lock_handle()`][Self::lock_handle] to ensure that the worker thread is not using
86    /// the database concurrently.
87    #[deprecated = "Unsynchronized access is unsafe. See documentation for details."]
88    pub fn as_raw_handle(&mut self) -> *mut sqlite3 {
89        self.worker.handle_raw.as_ptr()
90    }
91
92    /// Apply a collation to the open database.
93    ///
94    /// See [`SqliteConnectOptions::collation()`] for details.
95    ///
96    /// ### Deprecated
97    /// Due to the rearchitecting of the SQLite driver, this method cannot actually function
98    /// synchronously and return the result directly from `sqlite3_create_collation_v2()`, so
99    /// it instead sends a message to the worker create the collation asynchronously.
100    /// If an error occurs it will simply be logged.
101    ///
102    /// Instead, you should specify the collation during the initial configuration with
103    /// [`SqliteConnectOptions::collation()`]. Then, if the collation fails to apply it will
104    /// return an error during the connection creation. When used with a [Pool][crate::pool::Pool],
105    /// this also ensures that the collation is applied to all connections automatically.
106    ///
107    /// Or if necessary, you can call [`.lock_handle()`][Self::lock_handle]
108    /// and create the collation directly with [`LockedSqliteHandle::create_collation()`].
109    ///
110    /// [`Error::WorkerCrashed`] may still be returned if we could not communicate with the worker.
111    ///
112    /// Note that this may also block if the worker command channel is currently applying
113    /// backpressure.
114    #[deprecated = "Completes asynchronously. See documentation for details."]
115    pub fn create_collation(
116        &mut self,
117        name: &str,
118        compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
119    ) -> Result<(), Error> {
120        self.worker.create_collation(name, compare)
121    }
122
123    /// Lock the SQLite database handle out from the worker thread so direct SQLite API calls can
124    /// be made safely.
125    ///
126    /// Returns an error if the worker thread crashed.
127    pub async fn lock_handle(&mut self) -> Result<LockedSqliteHandle<'_>, Error> {
128        let guard = self.worker.unlock_db().await?;
129
130        Ok(LockedSqliteHandle { guard })
131    }
132}
133
134impl Debug for SqliteConnection {
135    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
136        f.debug_struct("SqliteConnection")
137            .field("row_channel_size", &self.row_channel_size)
138            .field("cached_statements_size", &self.cached_statements_size())
139            .finish()
140    }
141}
142
143impl Connection for SqliteConnection {
144    type Database = Sqlite;
145
146    type Options = SqliteConnectOptions;
147
148    fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
149        Box::pin(async move {
150            let shutdown = self.worker.shutdown();
151            // Drop the statement worker, which should
152            // cover all references to the connection handle outside of the worker thread
153            drop(self);
154            // Ensure the worker thread has terminated
155            shutdown.await
156        })
157    }
158
159    fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
160        Box::pin(async move {
161            drop(self);
162            Ok(())
163        })
164    }
165
166    /// Ensure the background worker thread is alive and accepting commands.
167    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
168        Box::pin(self.worker.ping())
169    }
170
171    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
172    where
173        Self: Sized,
174    {
175        Transaction::begin(self)
176    }
177
178    fn cached_statements_size(&self) -> usize {
179        self.worker
180            .shared
181            .cached_statements_size
182            .load(std::sync::atomic::Ordering::Acquire)
183    }
184
185    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
186        Box::pin(async move {
187            self.worker.clear_cache().await?;
188            Ok(())
189        })
190    }
191
192    #[doc(hidden)]
193    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
194        // For SQLite, FLUSH does effectively nothing...
195        // Well, we could use this to ensure that the command channel has been cleared,
196        // but it would only develop a backlog if a lot of queries are executed and then cancelled
197        // partway through, and then this would only make that situation worse.
198        Box::pin(future::ok(()))
199    }
200
201    #[doc(hidden)]
202    fn should_flush(&self) -> bool {
203        false
204    }
205}
206
207impl LockedSqliteHandle<'_> {
208    /// Returns the underlying sqlite3* connection handle.
209    ///
210    /// As long as this `LockedSqliteHandle` exists, it is guaranteed that the background thread
211    /// is not making FFI calls on this database handle or any of its statements.
212    pub fn as_raw_handle(&mut self) -> NonNull<sqlite3> {
213        self.guard.handle.as_non_null_ptr()
214    }
215
216    /// Apply a collation to the open database.
217    ///
218    /// See [`SqliteConnectOptions::collation()`] for details.
219    pub fn create_collation(
220        &mut self,
221        name: &str,
222        compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
223    ) -> Result<(), Error> {
224        collation::create_collation(&mut self.guard.handle, name, compare)
225    }
226
227    /// Create a user-defined function.
228    /// See [`SqliteConnectOptions::create_function()`] for details.
229    pub fn create_function(&mut self, function: function::Function) -> Result<(), Error> {
230        function.create(&mut self.guard.handle)
231    }
232}
233
234impl Drop for ConnectionState {
235    fn drop(&mut self) {
236        // explicitly drop statements before the connection handle is dropped
237        self.statements.clear();
238    }
239}
240
241impl Statements {
242    fn new(capacity: usize) -> Self {
243        Statements {
244            cached: StatementCache::new(capacity),
245            temp: None,
246        }
247    }
248
249    fn get(&mut self, query: &str, persistent: bool) -> Result<&mut VirtualStatement, Error> {
250        if !persistent || !self.cached.is_enabled() {
251            return Ok(self.temp.insert(VirtualStatement::new(query, false)?));
252        }
253
254        let exists = self.cached.contains_key(query);
255
256        if !exists {
257            let statement = VirtualStatement::new(query, true)?;
258            self.cached.insert(query, statement);
259        }
260
261        let statement = self.cached.get_mut(query).unwrap();
262
263        if exists {
264            // as this statement has been executed before, we reset before continuing
265            statement.reset()?;
266        }
267
268        Ok(statement)
269    }
270
271    fn len(&self) -> usize {
272        self.cached.len()
273    }
274
275    fn clear(&mut self) {
276        self.cached.clear();
277        self.temp = None;
278    }
279}