sqlsrv/lib.rs
1#![allow(clippy::doc_markdown)]
2
3//! A library for implementing an in-process SQLite database server.
4//!
5//! # Connection pooling
6//! sqlsrv implements connection pooling that reflects the concurrency model
7//! of SQLite: It supports multiple parallel readers, but only one writer.
8//!
9//! # Thread pooling
10//! In addition to pooling connections, the library supports optionally using
11//! a thread pool for diaptching database operations onto threads.
12//!
13//! # Incremental auto-clean
14//! The connection pool has built-in support for setting up incremental
15//! autovacuum, and can be configured to implicitly run incremental vacuuming.
16//!
17//! To use this feature, a "maximum dirt" value is configured on the connection
18//! pool. Whenever the writer connection performs changes to the database it
19//! can add "dirt" to the connection. When the writer connection is returned
20//! to the connection pool it checks to see if the amount of dirt is equal to
21//! or greater than the configured "maximum dirt" threshold. If the threshold
22//! has been reached, an incremental autovacuum is performed.
23//!
24//! # Features
25//! | Feature | Function
26//! |----------|----------
27//! | `tpool` | Enable functions/methods that use a thread pool.
28
29#![cfg_attr(docsrs, feature(doc_cfg))]
30
31mod changehook;
32mod err;
33mod rawhook;
34mod wrconn;
35
36pub mod utils;
37
38use std::{
39 fmt, mem::ManuallyDrop, num::NonZeroUsize, path::Path, str::FromStr,
40 sync::Arc
41};
42
43use parking_lot::{Condvar, Mutex};
44
45use r2d2::{CustomizeConnection, PooledConnection};
46
47pub use r2d2_sqlite::SqliteConnectionManager;
48
49pub use r2d2;
50pub use rusqlite;
51
52use rusqlite::{Connection, OpenFlags, params};
53
54#[cfg(feature = "tpool")]
55use threadpool::ThreadPool;
56
57pub use changehook::ChangeLogHook;
58pub use err::Error;
59pub use rawhook::{Action, Hook};
60pub use wrconn::WrConn;
61
62
63/// Wrapper around a SQL functions registration callback used to select which
64/// connection types to perform registrations on.
65pub enum RegOn<F>
66where
67 F: Fn(&Connection) -> Result<(), rusqlite::Error>
68{
69 /// This registration callback should only be called for read-only
70 /// connections.
71 RO(F),
72
73 /// This registration callback should only be called for the read/write
74 /// connections.
75 RW(F),
76
77 /// This registration callback should be called for both the read-only and
78 /// read/write connections.
79 Both(F)
80}
81
82
83type RegCb = dyn Fn(&Connection) -> Result<(), rusqlite::Error> + Send + Sync;
84
85enum CbType {
86 Ro(Box<RegCb>),
87 Rw(Box<RegCb>),
88 Both(Box<RegCb>)
89}
90
91
92/// Used to register application callbacks to set up database schema.
93pub trait SchemaMgr {
94 /// Called just after the writer connection has been created and is intended
95 /// to perform database initialization (create tables, add predefined rows,
96 /// etc).
97 ///
98 /// `newdb` will be `true` if the database file did not exist prior to
99 /// initialization.
100 ///
101 /// While this method can be used to perform schema upgrades, there are two
102 /// specialized methods (`need_upgrade()` and `upgrade()`) that can be used
103 /// for this purpose instead.
104 ///
105 /// The default implementation does nothing but returns `Ok(())`.
106 ///
107 /// # Errors
108 /// Application-specific error.
109 #[allow(unused_variables)]
110 fn init(&self, conn: &mut Connection, newdb: bool) -> Result<(), Error> {
111 Ok(())
112 }
113
114 /// Application callback used to determine if the database schema is out of
115 /// date and needs to be updated.
116 ///
117 /// The default implementation does nothing but returns `Ok(false)`.
118 ///
119 /// # Errors
120 /// Application-specific error.
121 #[allow(unused_variables)]
122 fn need_upgrade(&self, conn: &Connection) -> Result<bool, Error> {
123 Ok(false)
124 }
125
126 /// Upgrade the database schema.
127 ///
128 /// This is called if [`SchemaMgr::need_upgrade()`] returns `Ok(true)`.
129 ///
130 /// The default implementation does nothing but returns `Ok(())`.
131 ///
132 /// # Errors
133 /// Application-specific error.
134 #[allow(unused_variables)]
135 fn upgrade(&self, conn: &mut Connection) -> Result<(), Error> {
136 Ok(())
137 }
138}
139
140
141#[derive(Clone, Debug)]
142struct AutoClean {
143 /// The amount of dirt that must have accumulated before an incremental
144 /// vacuum is triggered.
145 ///
146 /// If this is set to `0` the incremental vacuum will be run at every
147 /// opportunity.
148 dirt_threshold: usize,
149
150 /// The number of pages to process from the freelist each time an
151 /// incremental autovacuum is triggered.
152 ///
153 /// If this is `None` then all pages will be processed.
154 npages: Option<NonZeroUsize>
155}
156
157
158/// Read-only connection.
159struct RoConn {
160 regfuncs: Vec<CbType>
161}
162
163impl fmt::Debug for RoConn {
164 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
165 write!(f, "RoConn {{}}")
166 }
167}
168
169
170impl CustomizeConnection<rusqlite::Connection, rusqlite::Error> for RoConn {
171 fn on_acquire(
172 &self,
173 conn: &mut rusqlite::Connection
174 ) -> Result<(), rusqlite::Error> {
175 conn.pragma_update(None, "foreign_keys", "ON")?;
176
177 for rf in &self.regfuncs {
178 match rf {
179 CbType::Ro(f) | CbType::Both(f) => {
180 f(conn)?;
181 }
182 CbType::Rw(_) => {}
183 }
184 }
185
186 Ok(())
187 }
188
189 fn on_release(&self, _conn: rusqlite::Connection) {}
190}
191
192
193/// Builder for constructing a [`ConnPool`] object.
194pub struct Builder {
195 schmgr: Box<dyn SchemaMgr>,
196 full_vacuum: bool,
197 max_readers: usize,
198 autoclean: Option<AutoClean>,
199 hook: Option<Arc<dyn Hook + Send + Sync>>,
200 regfuncs: Option<Vec<CbType>>,
201 #[cfg(feature = "tpool")]
202 tpool: Option<Arc<ThreadPool>>
203}
204
205/// Internal methods.
206impl Builder {
207 /// Open the writer connection.
208 fn open_writer(&self, fname: &Path) -> Result<Connection, rusqlite::Error> {
209 let conn = Connection::open(fname)?;
210 conn.pragma_update(None, "journal_mode", "WAL")?;
211 conn.pragma_update(None, "foreign_keys", "ON")?;
212
213 // Only enable incremental auto vacuum if a dirt watermark has been
214 // configured.
215 if self.autoclean.is_some() {
216 conn.pragma_update(None, "auto_vacuum", "INCREMENTAL")?;
217 }
218
219 Ok(conn)
220 }
221
222 /// Run a full vacuum.
223 ///
224 /// This is an internal function that may be called by `build()` if a full
225 /// vacuum has been requested.
226 fn full_vacuum(conn: &Connection) -> Result<(), rusqlite::Error> {
227 conn.execute("VACUUM;", params![])?;
228 Ok(())
229 }
230
231 fn create_ro_pool(
232 &self,
233 fname: &Path,
234 regfuncs: Vec<CbType>
235 ) -> Result<r2d2::Pool<SqliteConnectionManager>, r2d2::Error> {
236 let fl =
237 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX;
238 let manager = SqliteConnectionManager::file(fname).with_flags(fl);
239 let roconn_initterm = RoConn { regfuncs };
240 let max_readers = u32::try_from(self.max_readers).unwrap();
241 r2d2::Pool::builder()
242 .max_size(max_readers)
243 .connection_customizer(Box::new(roconn_initterm))
244 .build(manager)
245 }
246}
247
248
249impl Builder {
250 /// Create a new `Builder` for constructing a [`ConnPool`] object.
251 ///
252 /// Default to not run a full vacuum of the database on initialization and
253 /// create 2 read-only connections for the pool.
254 /// No workers thread pool will be used.
255 #[must_use]
256 pub fn new(schmgr: Box<dyn SchemaMgr>) -> Self {
257 Self {
258 schmgr,
259 full_vacuum: false,
260 max_readers: 2,
261 autoclean: None,
262 hook: None,
263 regfuncs: None,
264 #[cfg(feature = "tpool")]
265 tpool: None
266 }
267 }
268
269 /// Trigger a full vacuum when initializing the connection pool.
270 ///
271 /// Operates on an owned `Builder` object.
272 #[must_use]
273 pub const fn init_vacuum(mut self) -> Self {
274 self.full_vacuum = true;
275 self
276 }
277
278 /// Trigger a full vacuum when initializing the connection pool.
279 ///
280 /// Operates on a borrowed `Builder` object.
281 pub const fn init_vacuum_r(&mut self) -> &mut Self {
282 self.full_vacuum = true;
283 self
284 }
285
286 /// Set maximum number of readers in the connection pool.
287 ///
288 /// Operates on an owned `Builder` object.
289 #[must_use]
290 pub const fn max_readers(mut self, n: usize) -> Self {
291 self.max_readers = n;
292 self
293 }
294
295 /// Set maximum number of readers in the connection pool.
296 ///
297 /// Operates on a borrowed `Builder` object.
298 pub const fn max_readers_r(&mut self, n: usize) -> &mut Self {
299 self.max_readers = n;
300 self
301 }
302
303 /// Request that a "raw" update hook be added to the writer connection.
304 ///
305 /// Operates on an owned `Builder` object.
306 #[must_use]
307 pub fn hook(mut self, hook: Arc<dyn Hook + Send + Sync>) -> Self {
308 self.hook = Some(hook);
309 self
310 }
311
312 /// Request that a "raw" update hook be added to the writer connection.
313 ///
314 /// Operates on a borrowed `Builder` object.
315 pub fn hook_r(&mut self, hook: Arc<dyn Hook + Send + Sync>) -> &mut Self {
316 self.hook = Some(hook);
317 self
318 }
319
320 /// Enable incremental autovacuum.
321 ///
322 /// `dirt_watermark` is used to set what amount of "dirt" is required in
323 /// order to trigger an autoclean. `nfree` is the number of blocks in the
324 /// freelists to process each time the autoclean is run.
325 #[must_use]
326 pub const fn incremental_autoclean(
327 mut self,
328 dirt_watermark: usize,
329 npages: Option<NonZeroUsize>
330 ) -> Self {
331 self.autoclean = Some(AutoClean {
332 dirt_threshold: dirt_watermark,
333 npages
334 });
335 self
336 }
337
338 /// Add a callback to register one or more scalar SQL functions.
339 ///
340 /// The closure should be wrapped in a `RegOn::RO()` if the function should
341 /// only be registered on read-only connections. `RegOn::RW()` is used to
342 /// register the function on the read/write connection. Use `RegOn::Both()`
343 /// to register in both read-only and the read/write connection.
344 #[must_use]
345 pub fn reg_scalar_fn<F>(mut self, r: RegOn<F>) -> Self
346 where
347 F: Fn(&Connection) -> Result<(), rusqlite::Error> + Send + Sync + 'static
348 {
349 self.reg_scalar_fn_r(r);
350 self
351 }
352
353 /// Add a callback to register one or more scalar SQL functions.
354 ///
355 /// This is the same as [`Builder::reg_scalar_fn()`], but it operates on
356 /// `&mut Builder` rather than passing ownership.
357 pub fn reg_scalar_fn_r<F>(&mut self, r: RegOn<F>) -> &mut Self
358 where
359 F: Fn(&Connection) -> Result<(), rusqlite::Error> + Send + Sync + 'static
360 {
361 match r {
362 RegOn::RO(f) => {
363 self
364 .regfuncs
365 .get_or_insert(Vec::new())
366 .push(CbType::Ro(Box::new(f)));
367 }
368 RegOn::RW(f) => {
369 self
370 .regfuncs
371 .get_or_insert(Vec::new())
372 .push(CbType::Rw(Box::new(f)));
373 }
374 RegOn::Both(f) => {
375 self
376 .regfuncs
377 .get_or_insert(Vec::new())
378 .push(CbType::Both(Box::new(f)));
379 }
380 }
381 self
382 }
383
384 #[cfg(feature = "tpool")]
385 #[must_use]
386 pub fn thread_pool(mut self, tpool: Arc<ThreadPool>) -> Self {
387 self.tpool = Some(tpool);
388 self
389 }
390
391 #[cfg(feature = "tpool")]
392 pub fn thread_pool_r(&mut self, tpool: Arc<ThreadPool>) -> &mut Self {
393 self.tpool = Some(tpool);
394 self
395 }
396
397 /// Construct a connection pool.
398 ///
399 /// # Errors
400 /// [`Error::Sqlite`] will be returned if a database error occurred.
401 pub fn build<P>(mut self, fname: P) -> Result<ConnPool, Error>
402 where
403 P: AsRef<Path>
404 {
405 // ToDo: Use std::path::absolute() once stabilized
406 let fname = fname.as_ref();
407 let db_exists = fname.exists();
408
409 //
410 // Set up the read/write connection
411 //
412 // This must be done before creating the read-only connection pool, because
413 // at that point the database file must already exist.
414 //
415 let mut conn = self.open_writer(fname)?;
416
417 //
418 // Register read/write connection functions
419 //
420
421 // Option<Vec<T>> --> Vec<T>
422 let regfuncs = self.regfuncs.take().unwrap_or_default();
423
424 // Call SQL function registration callbacks for read/write connection.
425 for rf in ®funcs {
426 match rf {
427 CbType::Rw(f) | CbType::Both(f) => {
428 f(&conn)?;
429 }
430 CbType::Ro(_) => {}
431 }
432 }
433
434 //
435 // Perform schema initialization.
436 //
437 // This must be done after auto_vacuum is set, because auto_vacuum requires
438 // configuration before any tables have been created.
439 // See: https://www.sqlite.org/pragma.html#pragma_auto_vacuum
440 //
441 self.schmgr.init(&mut conn, !db_exists)?;
442 if self.schmgr.need_upgrade(&conn)? {
443 self.schmgr.upgrade(&mut conn)?;
444 }
445
446 //
447 // Perform a full vacuum if requested to do so.
448 //
449 if self.full_vacuum {
450 Self::full_vacuum(&conn)?;
451 }
452
453 //
454 // Register a callback hook
455 //
456 if let Some(ref hook) = self.hook {
457 rawhook::hook(&conn, hook);
458 }
459
460 //
461 // Set up connection pool for read-only connections.
462 //
463 let rpool = self.create_ro_pool(fname, regfuncs)?;
464
465 //
466 // Prepare shared data
467 //
468 let iconn = InnerWrConn { conn, dirt: 0 };
469 let inner = Inner { conn: Some(iconn) };
470 let sh = Arc::new(Shared {
471 inner: Mutex::new(inner),
472 signal: Condvar::new(),
473 autoclean: self.autoclean.clone()
474 });
475
476 Ok(ConnPool {
477 rpool,
478 sh,
479 #[cfg(feature = "tpool")]
480 tpool: self.tpool
481 })
482 }
483
484
485 /// Construct a connection pool.
486 ///
487 /// Same as [`Builder::build()`], but register a change log callback on the
488 /// writer as well.
489 ///
490 /// This method should not be called if the application has requested to add
491 /// a raw update hook.
492 ///
493 /// # Errors
494 /// [`Error::Sqlite`] is returned for database errors.
495 ///
496 /// # Panics
497 /// This method will panic if a hook has been added to the Builder.
498 pub fn build_with_changelog_hook<P, D, T>(
499 mut self,
500 fname: P,
501 hook: Box<dyn ChangeLogHook<Database = D, Table = T> + Send>
502 ) -> Result<ConnPool, Error>
503 where
504 P: AsRef<Path>,
505 D: FromStr + Send + Sized + 'static,
506 T: FromStr + Send + Sized + 'static
507 {
508 assert!(
509 self.hook.is_some(),
510 "Can't build a connection pool with both a raw and changelog hook"
511 );
512
513 // ToDo: Use std::path::absolute() once stabilized
514 let fname = fname.as_ref();
515 let db_exists = fname.exists();
516
517 //
518 // Set up the read/write connection
519 //
520 // This must be done before creating the read-only connection pool, because
521 // at that point the database file must already exist.
522 //
523 let mut conn = self.open_writer(fname)?;
524
525 //
526 // Register read/write connection functions
527 //
528
529 // Option<Vec<T>> --> Vec<T>
530 let regfuncs = self.regfuncs.take().unwrap_or_default();
531
532 // Call SQL function registration callbacks for read/write connection.
533 for rf in ®funcs {
534 match rf {
535 CbType::Rw(f) | CbType::Both(f) => {
536 f(&conn)?;
537 }
538 CbType::Ro(_) => {}
539 }
540 }
541
542
543 //
544 // Perform schema initialization.
545 //
546 // This must be done after auto_vacuum is set, because auto_vacuum requires
547 // configuration before any tables have been created.
548 // See: https://www.sqlite.org/pragma.html#pragma_auto_vacuum
549 //
550 self.schmgr.init(&mut conn, !db_exists)?;
551 if self.schmgr.need_upgrade(&conn)? {
552 self.schmgr.upgrade(&mut conn)?;
553 }
554
555 //
556 // Perform a full vacuum if requested to do so.
557 //
558 if self.full_vacuum {
559 Self::full_vacuum(&conn)?;
560 }
561
562 //
563 // Register a callback hook
564 //
565 changehook::hook(&conn, hook);
566
567 //
568 // Set up connection pool for read-only connections.
569 //
570 let rpool = self.create_ro_pool(fname, regfuncs)?;
571
572 //
573 // Prepare shared data
574 //
575 let iconn = InnerWrConn { conn, dirt: 0 };
576 let inner = Inner { conn: Some(iconn) };
577 let sh = Arc::new(Shared {
578 inner: Mutex::new(inner),
579 signal: Condvar::new(),
580 autoclean: self.autoclean.clone()
581 });
582
583 Ok(ConnPool {
584 rpool,
585 sh,
586 #[cfg(feature = "tpool")]
587 tpool: self.tpool
588 })
589 }
590}
591
592
593/// Inner writer connection object.
594///
595/// When the writer is acquired from the connection pool it passes an instance
596/// of this struct to the WrConn object.
597struct InnerWrConn {
598 /// The writer connection.
599 conn: Connection,
600
601 /// Amount of accumulated dirt.
602 ///
603 /// Only used when the autoclean feature is used.
604 dirt: usize
605}
606
607
608struct Inner {
609 /// The writer connection.
610 ///
611 /// This is `None` when a `WrConn` exists, and is set to `Some()` by
612 /// `WrConn`'s Drop implementation.
613 conn: Option<InnerWrConn>
614}
615
616struct Shared {
617 inner: Mutex<Inner>,
618 signal: Condvar,
619 autoclean: Option<AutoClean>
620}
621
622
623/// SQLite connection pool.
624///
625/// This is a specialized connection pool that is defined specifically for
626/// sqlite, and only allows a single writer but multiple readers.
627// Note: In Rust the drop order of struct fields is in the order of
628// declaration. If the writer is dropped before the readers, sqlite
629// will not clean up its wal files when the writet closes (presumably
630// because the readers are keeping the files locked). Therefore it is
631// important that the r2d2 connection pool is declared before the
632// `Shared` buffer (since it contains the writer).
633#[derive(Clone)]
634pub struct ConnPool {
635 rpool: r2d2::Pool<SqliteConnectionManager>,
636 sh: Arc<Shared>,
637 #[cfg(feature = "tpool")]
638 tpool: Option<Arc<ThreadPool>>
639}
640
641impl ConnPool {
642 /// Return the pool size.
643 ///
644 /// In effect, this is the size of the read-only pool plus one (for the
645 /// read/write connection).
646 #[must_use]
647 pub fn size(&self) -> usize {
648 (self.rpool.max_size() + 1) as usize
649 }
650
651 /// Acquire a read-only connection.
652 ///
653 /// # Errors
654 /// [`r2d2::Error`] will be returned if a read-only connection could not be
655 /// acquired.
656 pub fn reader(
657 &self
658 ) -> Result<PooledConnection<SqliteConnectionManager>, r2d2::Error> {
659 self.rpool.get()
660 }
661
662 /// Acquire the read/write connection.
663 ///
664 /// If the writer is already taken, then block and wait for it to become
665 /// available.
666 #[must_use]
667 #[allow(clippy::significant_drop_tightening)]
668 pub fn writer(&self) -> WrConn {
669 let mut g = self.sh.inner.lock();
670 let conn = loop {
671 if let Some(conn) = g.conn.take() {
672 break conn;
673 }
674 self.sh.signal.wait(&mut g);
675 };
676
677 WrConn {
678 sh: Arc::clone(&self.sh),
679 inner: ManuallyDrop::new(conn)
680 }
681 }
682
683 /// Attempt to acquire the writer connection.
684 ///
685 /// Returns `Some(conn)` if the writer connection was available at the time
686 /// of the request. Returns `None` if the writer has already been taken.
687 #[must_use]
688 pub fn try_writer(&self) -> Option<WrConn> {
689 let conn = self.sh.inner.lock().conn.take()?;
690 Some(WrConn {
691 sh: Arc::clone(&self.sh),
692 inner: ManuallyDrop::new(conn)
693 })
694 }
695}
696
697
698/// Special queries.
699impl ConnPool {
700 /// Return the number of unused pages.
701 ///
702 /// # Errors
703 /// [`Error::R2D2`] indicates that it wasn't possible to acquire a read-only
704 /// connection from the connection pool. [`Error::Sqlite`] means it was not
705 /// possible to query the free page list count.
706 pub fn freelist_count(&self) -> Result<usize, Error> {
707 Ok(self.reader()?.query_row_and_then(
708 "PRAGMA freelist_count;'",
709 [],
710 |row| row.get(0)
711 )?)
712 }
713}
714
715
716/// Read-only connection processing.
717impl ConnPool {
718 /// Run a read-only database operation.
719 ///
720 /// # Errors
721 /// The error type `E` is used to return application-defined errors, though
722 /// it must be possible to convert a `r2d2::Error` into `E` using the `From`
723 /// trait.
724 pub fn run_ro<T, F, E>(&self, f: F) -> Result<T, E>
725 where
726 T: Send + 'static,
727 F: FnOnce(&Connection) -> Result<T, E> + Send + 'static,
728 E: From<r2d2::Error>
729 {
730 // Acquire a read-only connection from the pool
731 let conn = self.reader()?;
732
733 // Run caller-provided closure.
734 f(&conn)
735 }
736
737 /// Run a read-only database operation on a thread.
738 ///
739 /// # Errors
740 /// [`r2d2::Error`] is returned if it wasn't possible to acquire a read-only
741 /// connection from the connection pool.
742 ///
743 /// # Panics
744 /// A thread pool must be associated with the [`ConnPool`] or this method
745 /// will panic.
746 #[cfg(feature = "tpool")]
747 #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
748 pub fn run_ro_thrd<F>(&self, f: F) -> Result<(), r2d2::Error>
749 where
750 F: FnOnce(&Connection) + Send + 'static
751 {
752 let Some(ref tpool) = self.tpool else {
753 panic!("ConnPool does to have a thread pool");
754 };
755
756 // Acquire a read-only connection from the pool and then run the provided
757 // closure on a thread from the thread pool.
758 let conn = self.reader()?;
759 tpool.execute(move || {
760 f(&conn);
761 });
762 Ok(())
763 }
764
765 /// Run a read-only database operation on a thread, allowing the caller to
766 /// receive the `Result<T, E>` of the supplied closure using a
767 /// one-shot channel.
768 ///
769 /// The supplied closure in `f` should return a `Result<T, E>` where the `Ok`
770 /// case will be passed as a "set" value through the `swctx` channel, and the
771 /// `Err` case will be passed as a "fail" value.
772 ///
773 /// # Errors
774 /// [`r2d2::Error`] is returned if it wasn't possible to acquire a read-only
775 /// connection from the connection pool.
776 ///
777 /// # Panics
778 /// A thread pool must be associated with the [`ConnPool`] or this method
779 /// will panic.
780 #[cfg(feature = "tpool")]
781 #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
782 pub fn run_ro_thrd_result<T, E, F>(
783 &self,
784 f: F
785 ) -> Result<swctx::WaitCtx<T, (), E>, r2d2::Error>
786 where
787 T: Send + 'static,
788 E: fmt::Debug + Send + 'static,
789 F: FnOnce(&Connection) -> Result<T, E> + Send + 'static
790 {
791 let Some(ref tpool) = self.tpool else {
792 panic!("ConnPool does to have a thread pool");
793 };
794
795 let conn = self.reader()?;
796
797 let (sctx, wctx) = swctx::mkpair();
798
799 // Ignore errors relating to pass the results back
800 tpool.execute(move || match f(&conn) {
801 Ok(t) => {
802 let _ = sctx.set(t);
803 }
804 Err(e) => {
805 let _ = sctx.fail(e);
806 }
807 });
808
809 Ok(wctx)
810 }
811}
812
813/// Read/Write connection processing.
814impl ConnPool {
815 /// Run a read/write database operation.
816 ///
817 /// # Errors
818 /// Returns an application-specific type `E` on error.
819 pub fn run_rw<T, E, F>(&self, f: F) -> Result<T, E>
820 where
821 T: Send + 'static,
822 E: fmt::Debug + Send + 'static,
823 F: FnOnce(&mut WrConn) -> Result<T, E> + Send + 'static
824 {
825 let mut conn = self.writer();
826 f(&mut conn)
827 }
828
829 /// Run a read/write database operation on a thread.
830 ///
831 /// The supplied closure should return an `Option<usize>`, where the `Some()`
832 /// case denotes the specified amount of "dirt" should be added to the write
833 /// connection. `None` means no dirt should be added.
834 ///
835 /// # Panics
836 /// A thread pool must be associated with the [`ConnPool`] or this method
837 /// will panic.
838 #[cfg(feature = "tpool")]
839 #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
840 pub fn run_rw_thrd<F>(&self, f: F)
841 where
842 F: FnOnce(&mut WrConn) -> Option<usize> + Send + 'static
843 {
844 let Some(ref tpool) = self.tpool else {
845 panic!("ConnPool does to have a thread pool");
846 };
847
848 let mut conn = self.writer();
849 tpool.execute(move || {
850 let dirt = f(&mut conn);
851 if let Some(dirt) = dirt {
852 conn.add_dirt(dirt);
853 }
854 });
855 }
856
857 /// Run a read/write database operation on a thread, allowing the
858 /// caller to receive the `Result<T, E>` of the supplied closure using a
859 /// one-shot channel.
860 ///
861 /// The supplied closure in `f` should return a `Result<T, E>` where the `Ok`
862 /// case will be passed as a "set" value through the `swctx` channel, and the
863 /// `Err` case will be passed as a "fail" value.
864 ///
865 /// # Panics
866 /// A thread pool must be associated with the [`ConnPool`] or this method
867 /// will panic.
868 #[cfg(feature = "tpool")]
869 #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
870 pub fn run_rw_thrd_result<T, E, F>(&self, f: F) -> swctx::WaitCtx<T, (), E>
871 where
872 T: Send + 'static,
873 E: fmt::Debug + Send + 'static,
874 F: FnOnce(&mut WrConn) -> Result<T, E> + Send + 'static
875 {
876 let Some(ref tpool) = self.tpool else {
877 panic!("ConnPool does to have a thread pool");
878 };
879
880 let mut conn = self.writer();
881
882 let (sctx, wctx) = swctx::mkpair();
883
884 tpool.execute(move || match f(&mut conn) {
885 Ok(t) => {
886 let _ = sctx.set(t);
887 }
888 Err(e) => {
889 let _ = sctx.fail(e);
890 }
891 });
892
893 wctx
894 }
895}
896
897
898impl ConnPool {
899 #[cfg(feature = "tpool")]
900 #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
901 #[must_use]
902 pub fn incremental_vacuum(
903 &self,
904 n: Option<usize>
905 ) -> swctx::WaitCtx<(), (), rusqlite::Error> {
906 self.run_rw_thrd_result(move |conn| conn.incremental_vacuum(n))
907 }
908}
909
910// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :