cdbc_sqlite/connection/
mod.rs1
2use cdbc::connection::{Connection};
3use cdbc::error::Error;
4use crate::statement::{VirtualStatement};
5use crate::{Sqlite, SqliteConnectOptions};
6use cdbc::transaction::Transaction;
7use libsqlite3_sys::sqlite3;
8use std::cmp::Ordering;
9use std::fmt::{self, Debug, Formatter};
10use std::ptr::NonNull;
11use mco::std::sync::MutexGuard;
12use either::Either;
13use cdbc::database::{Database, HasStatement};
14use cdbc::{Execute, Executor};
15use cdbc::describe::Describe;
16use cdbc::io::chan_stream::ChanStream;
17use cdbc::utils::statement_cache::StatementCache;
18
19pub mod collation;
20mod describe;
21mod establish;
22mod execute;
23mod executor;
24mod explain;
25mod handle;
26
27mod worker;
28mod executor_mut;
29
30pub use handle::{ConnectionHandle, ConnectionHandleRaw};
31use crate::connection::establish::EstablishParams;
32use crate::connection::worker::ConnectionWorker;
33
34pub struct SqliteConnection {
45 pub worker: ConnectionWorker,
46 pub row_channel_size: usize,
47}
48
49pub struct LockedSqliteHandle<'a> {
50 pub guard: MutexGuard<'a, ConnectionState>,
51}
52
53pub struct ConnectionState {
54 pub handle: ConnectionHandle,
55
56 pub transaction_depth: usize,
58
59 pub statements: Statements,
60}
61
62pub struct Statements {
63 cached: StatementCache<VirtualStatement>,
65 temp: Option<VirtualStatement>,
67}
68
69impl SqliteConnection {
70 pub fn establish(options: &SqliteConnectOptions) -> Result<Self, Error> {
71 let params = EstablishParams::from_options(options)?;
72 let worker = ConnectionWorker::establish(params)?;
73 Ok(Self {
74 worker,
75 row_channel_size: options.row_channel_size,
76 })
77 }
78
79 #[deprecated = "Unsynchronized access is unsafe. See documentation for details."]
88 pub fn as_raw_handle(&mut self) -> *mut sqlite3 {
89 self.worker.handle_raw.as_ptr()
90 }
91
92 #[deprecated = "Completes asynchronously. See documentation for details."]
115 pub fn create_collation(
116 &mut self,
117 name: &str,
118 compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
119 ) -> Result<(), Error> {
120 self.worker.create_collation(name, compare)
121 }
122
123 pub fn lock_handle(&mut self) -> Result<LockedSqliteHandle<'_>, Error> {
128 let guard = self.worker.unlock_db()?;
129
130 Ok(LockedSqliteHandle { guard })
131 }
132}
133
134impl Debug for SqliteConnection {
135 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
136 f.debug_struct("SqliteConnection")
137 .field("row_channel_size", &self.row_channel_size)
138 .field("cached_statements_size", &self.cached_statements_size())
139 .finish()
140 }
141}
142
143impl Connection for SqliteConnection {
144
145 type Options = SqliteConnectOptions;
146
147 fn close(mut self) -> Result<(), Error> {
148 let shutdown = self.worker.shutdown();
149 drop(self);
152 shutdown
154 }
155
156 fn ping(&mut self) -> Result<(), Error> {
158 self.worker.ping()
159 }
160
161 fn begin(&mut self) -> Result<Transaction<'_, Self::Database>, Error>
162 where
163 Self: Sized,
164 {
165 Transaction::begin(self)
166 }
167
168 fn cached_statements_size(&self) -> usize {
169 self.worker
170 .shared
171 .cached_statements_size
172 .load(std::sync::atomic::Ordering::Acquire)
173 }
174
175 fn clear_cached_statements(&mut self) -> Result<(), Error> {
176 self.worker.clear_cache()?;
177 Ok(())
178 }
179
180 #[doc(hidden)]
181 fn flush(&mut self) -> Result<(), Error> {
182 Ok(())
187 }
188
189 #[doc(hidden)]
190 fn should_flush(&self) -> bool {
191 false
192 }
193}
194
195impl LockedSqliteHandle<'_> {
196 pub fn as_raw_handle(&mut self) -> NonNull<sqlite3> {
201 self.guard.handle.as_non_null_ptr()
202 }
203
204 pub fn create_collation(
208 &mut self,
209 name: &str,
210 compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
211 ) -> Result<(), Error> {
212 collation::create_collation(&mut self.guard.handle, name, compare)
213 }
214}
215
216impl Drop for ConnectionState {
217 fn drop(&mut self) {
218 self.statements.clear();
220 }
221}
222
223impl Statements {
224 fn new(capacity: usize) -> Self {
225 Statements {
226 cached: StatementCache::new(capacity),
227 temp: None,
228 }
229 }
230
231 fn get(&mut self, query: &str, persistent: bool) -> Result<&mut VirtualStatement, Error> {
232 if !persistent || !self.cached.is_enabled() {
233 return Ok(self.temp.insert(VirtualStatement::new(query, false)?));
234 }
235
236 let exists = self.cached.contains_key(query);
237
238 if !exists {
239 let statement = VirtualStatement::new(query, true)?;
240 self.cached.insert(query, statement);
241 }
242
243 let statement = self.cached.get_mut(query).unwrap();
244
245 if exists {
246 statement.reset()?;
248 }
249
250 Ok(statement)
251 }
252
253 fn len(&self) -> usize {
254 self.cached.len()
255 }
256
257 fn clear(&mut self) {
258 self.cached.clear();
259 self.temp = None;
260 }
261}