sqlx_core_guts/sqlite/connection/
mod.rs1use std::cmp::Ordering;
2use std::fmt::{self, Debug, Formatter};
3use std::ptr::NonNull;
4
5use futures_core::future::BoxFuture;
6use futures_intrusive::sync::MutexGuard;
7use futures_util::future;
8use libsqlite3_sys::sqlite3;
9
10pub(crate) use handle::{ConnectionHandle, ConnectionHandleRaw};
11
12use crate::common::StatementCache;
13use crate::connection::{Connection, LogSettings};
14use crate::error::Error;
15use crate::sqlite::connection::establish::EstablishParams;
16use crate::sqlite::connection::worker::ConnectionWorker;
17use crate::sqlite::statement::VirtualStatement;
18use crate::sqlite::{Sqlite, SqliteConnectOptions};
19use crate::transaction::Transaction;
20
21pub(crate) mod collation;
22mod describe;
23mod establish;
24mod execute;
25mod executor;
26mod explain;
27mod handle;
28
29mod worker;
30
31pub struct SqliteConnection {
42 pub(crate) worker: ConnectionWorker,
43 pub(crate) row_channel_size: usize,
44}
45
46pub struct LockedSqliteHandle<'a> {
47 pub(crate) guard: MutexGuard<'a, ConnectionState>,
48}
49
50pub(crate) struct ConnectionState {
51 pub(crate) handle: ConnectionHandle,
52
53 pub(crate) transaction_depth: usize,
55
56 pub(crate) statements: Statements,
57
58 log_settings: LogSettings,
59}
60
61pub(crate) struct Statements {
62 cached: StatementCache<VirtualStatement>,
64 temp: Option<VirtualStatement>,
66}
67
68impl SqliteConnection {
69 pub(crate) async fn establish(options: &SqliteConnectOptions) -> Result<Self, Error> {
70 let params = EstablishParams::from_options(options)?;
71 let worker = ConnectionWorker::establish(params).await?;
72 Ok(Self {
73 worker,
74 row_channel_size: options.row_channel_size,
75 })
76 }
77
78 #[deprecated = "Unsynchronized access is unsafe. See documentation for details."]
87 pub fn as_raw_handle(&mut self) -> *mut sqlite3 {
88 self.worker.handle_raw.as_ptr()
89 }
90
91 #[deprecated = "Completes asynchronously. See documentation for details."]
114 pub fn create_collation(
115 &mut self,
116 name: &str,
117 compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
118 ) -> Result<(), Error> {
119 self.worker.create_collation(name, compare)
120 }
121
122 pub async fn lock_handle(&mut self) -> Result<LockedSqliteHandle<'_>, Error> {
127 let guard = self.worker.unlock_db().await?;
128
129 Ok(LockedSqliteHandle { guard })
130 }
131}
132
133impl Debug for SqliteConnection {
134 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
135 f.debug_struct("SqliteConnection")
136 .field("row_channel_size", &self.row_channel_size)
137 .field("cached_statements_size", &self.cached_statements_size())
138 .finish()
139 }
140}
141
142impl Connection for SqliteConnection {
143 type Database = Sqlite;
144
145 type Options = SqliteConnectOptions;
146
147 fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
148 Box::pin(async move {
149 let shutdown = self.worker.shutdown();
150 drop(self);
153 shutdown.await
155 })
156 }
157
158 fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
159 Box::pin(async move {
160 drop(self);
161 Ok(())
162 })
163 }
164
165 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
167 Box::pin(self.worker.ping())
168 }
169
170 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
171 where
172 Self: Sized,
173 {
174 Transaction::begin(self)
175 }
176
177 fn cached_statements_size(&self) -> usize {
178 self.worker
179 .shared
180 .cached_statements_size
181 .load(std::sync::atomic::Ordering::Acquire)
182 }
183
184 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
185 Box::pin(async move {
186 self.worker.clear_cache().await?;
187 Ok(())
188 })
189 }
190
191 #[doc(hidden)]
192 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
193 Box::pin(future::ok(()))
198 }
199
200 #[doc(hidden)]
201 fn should_flush(&self) -> bool {
202 false
203 }
204}
205
206impl LockedSqliteHandle<'_> {
207 pub fn as_raw_handle(&mut self) -> NonNull<sqlite3> {
212 self.guard.handle.as_non_null_ptr()
213 }
214
215 pub fn create_collation(
219 &mut self,
220 name: &str,
221 compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
222 ) -> Result<(), Error> {
223 collation::create_collation(&mut self.guard.handle, name, compare)
224 }
225}
226
227impl Drop for ConnectionState {
228 fn drop(&mut self) {
229 self.statements.clear();
231 }
232}
233
234impl Statements {
235 fn new(capacity: usize) -> Self {
236 Statements {
237 cached: StatementCache::new(capacity),
238 temp: None,
239 }
240 }
241
242 fn get(&mut self, query: &str, persistent: bool) -> Result<&mut VirtualStatement, Error> {
243 if !persistent || !self.cached.is_enabled() {
244 return Ok(self.temp.insert(VirtualStatement::new(query, false)?));
245 }
246
247 let exists = self.cached.contains_key(query);
248
249 if !exists {
250 let statement = VirtualStatement::new(query, true)?;
251 self.cached.insert(query, statement);
252 }
253
254 let statement = self.cached.get_mut(query).unwrap();
255
256 if exists {
257 statement.reset()?;
259 }
260
261 Ok(statement)
262 }
263
264 fn len(&self) -> usize {
265 self.cached.len()
266 }
267
268 fn clear(&mut self) {
269 self.cached.clear();
270 self.temp = None;
271 }
272}