sqlsrv/lib.rs
1#![allow(clippy::doc_markdown)]
2
3//! A library for implementing an in-process SQLite database server.
4//!
5//! # Connection pooling
6//! sqlsrv implements connection pooling that reflects the concurrency model
7//! of SQLite: It supports multiple parallel readers, but only one writer.
8//!
9//! # Thread pooling
10//! In addition to pooling connections, the library supports optionally using
//! a thread pool for dispatching database operations onto threads.
12//!
13//! # Incremental auto-clean
14//! The connection pool has built-in support for setting up incremental
15//! autovacuum, and can be configured to implicitly run incremental vacuuming.
16//!
17//! To use this feature, a "maximum dirt" value is configured on the connection
18//! pool. Whenever the writer connection performs changes to the database it
19//! can add "dirt" to the connection. When the writer connection is returned
20//! to the connection pool it checks to see if the amount of dirt is equal to
21//! or greater than the configured "maximum dirt" threshold. If the threshold
22//! has been reached, an incremental autovacuum is performed.
23//!
24//! # Features
25//! | Feature | Function
26//! |----------|----------
27//! | `tpool` | Enable functions/methods that use a thread pool.
28
29#![cfg_attr(docsrs, feature(doc_cfg))]
30
31mod changehook;
32mod err;
33mod rawhook;
34mod wrconn;
35
36pub mod utils;
37
38use std::{
39 fmt, mem::ManuallyDrop, num::NonZeroUsize, path::Path, str::FromStr,
40 sync::Arc
41};
42
43use parking_lot::{Condvar, Mutex};
44
45use r2d2::{CustomizeConnection, PooledConnection};
46
47pub use {r2d2, r2d2_sqlite::SqliteConnectionManager, rusqlite};
48
49use rusqlite::{Connection, OpenFlags, params};
50
51#[cfg(feature = "tpool")]
52use threadpool::ThreadPool;
53
54pub use changehook::ChangeLogHook;
55pub use err::Error;
56pub use rawhook::{Action, Hook};
57pub use wrconn::WrConn;
58
59
60/// Wrapper around a SQL functions registration callback used to select which
61/// connection types to perform registrations on.
62pub enum RegOn<F>
63where
64 F: Fn(&Connection) -> Result<(), rusqlite::Error>
65{
66 /// This registration callback should only be called for read-only
67 /// connections.
68 RO(F),
69
70 /// This registration callback should only be called for the read/write
71 /// connections.
72 RW(F),
73
74 /// This registration callback should be called for both the read-only and
75 /// read/write connections.
76 Both(F)
77}
78
79
80type RegCb = dyn Fn(&Connection) -> Result<(), rusqlite::Error> + Send + Sync;
81
82enum CbType {
83 Ro(Box<RegCb>),
84 Rw(Box<RegCb>),
85 Both(Box<RegCb>)
86}
87
88
89/// Used to register application callbacks to set up database schema.
90pub trait SchemaMgr {
91 /// Called just after the writer connection has been created and is intended
92 /// to perform database initialization (create tables, add predefined rows,
93 /// etc).
94 ///
95 /// `newdb` will be `true` if the database file did not exist prior to
96 /// initialization.
97 ///
98 /// While this method can be used to perform schema upgrades, there are two
99 /// specialized methods (`need_upgrade()` and `upgrade()`) that can be used
100 /// for this purpose instead.
101 ///
102 /// The default implementation does nothing but returns `Ok(())`.
103 ///
104 /// # Errors
105 /// Application-specific error.
106 #[allow(unused_variables)]
107 fn init(&self, conn: &mut Connection, newdb: bool) -> Result<(), Error> {
108 Ok(())
109 }
110
111 /// Application callback used to determine if the database schema is out of
112 /// date and needs to be updated.
113 ///
114 /// The default implementation does nothing but returns `Ok(false)`.
115 ///
116 /// # Errors
117 /// Application-specific error.
118 #[allow(unused_variables)]
119 fn need_upgrade(&self, conn: &Connection) -> Result<bool, Error> {
120 Ok(false)
121 }
122
123 /// Upgrade the database schema.
124 ///
125 /// This is called if [`SchemaMgr::need_upgrade()`] returns `Ok(true)`.
126 ///
127 /// The default implementation does nothing but returns `Ok(())`.
128 ///
129 /// # Errors
130 /// Application-specific error.
131 #[allow(unused_variables)]
132 fn upgrade(&self, conn: &mut Connection) -> Result<(), Error> {
133 Ok(())
134 }
135}
136
137
/// Incremental autovacuum ("autoclean") configuration.
#[derive(Debug, Clone)]
struct AutoClean {
  /// Amount of accumulated dirt required before an incremental vacuum is
  /// triggered.
  ///
  /// A value of `0` means the incremental vacuum is run at every
  /// opportunity.
  dirt_threshold: usize,

  /// Number of freelist pages to process each time an incremental
  /// autovacuum is triggered.
  ///
  /// `None` means all pages are processed.
  npages: Option<NonZeroUsize>
}
153
154
155/// Read-only connection.
156struct RoConn {
157 regfuncs: Vec<CbType>
158}
159
160impl fmt::Debug for RoConn {
161 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
162 write!(f, "RoConn {{}}")
163 }
164}
165
166
167impl CustomizeConnection<rusqlite::Connection, rusqlite::Error> for RoConn {
168 fn on_acquire(
169 &self,
170 conn: &mut rusqlite::Connection
171 ) -> Result<(), rusqlite::Error> {
172 conn.pragma_update(None, "foreign_keys", "ON")?;
173
174 for rf in &self.regfuncs {
175 match rf {
176 CbType::Ro(f) | CbType::Both(f) => {
177 f(conn)?;
178 }
179 CbType::Rw(_) => {}
180 }
181 }
182
183 Ok(())
184 }
185
186 fn on_release(&self, _conn: rusqlite::Connection) {}
187}
188
189
190/// Builder for constructing a [`ConnPool`] object.
191pub struct Builder {
192 schmgr: Box<dyn SchemaMgr>,
193 full_vacuum: bool,
194 max_readers: usize,
195 autoclean: Option<AutoClean>,
196 hook: Option<Arc<dyn Hook + Send + Sync>>,
197 regfuncs: Option<Vec<CbType>>,
198 #[cfg(feature = "tpool")]
199 tpool: Option<Arc<ThreadPool>>
200}
201
202/// Internal methods.
203impl Builder {
204 /// Open the writer connection.
205 fn open_writer(&self, fname: &Path) -> Result<Connection, rusqlite::Error> {
206 let conn = Connection::open(fname)?;
207 conn.pragma_update(None, "journal_mode", "WAL")?;
208 conn.pragma_update(None, "foreign_keys", "ON")?;
209
210 // Only enable incremental auto vacuum if a dirt watermark has been
211 // configured.
212 if self.autoclean.is_some() {
213 conn.pragma_update(None, "auto_vacuum", "INCREMENTAL")?;
214 }
215
216 Ok(conn)
217 }
218
219 /// Run a full vacuum.
220 ///
221 /// This is an internal function that may be called by `build()` if a full
222 /// vacuum has been requested.
223 fn full_vacuum(conn: &Connection) -> Result<(), rusqlite::Error> {
224 conn.execute("VACUUM;", params![])?;
225 Ok(())
226 }
227
228 fn create_ro_pool(
229 &self,
230 fname: &Path,
231 regfuncs: Vec<CbType>
232 ) -> Result<r2d2::Pool<SqliteConnectionManager>, r2d2::Error> {
233 let fl =
234 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX;
235 let manager = SqliteConnectionManager::file(fname).with_flags(fl);
236 let roconn_initterm = RoConn { regfuncs };
237 let max_readers = u32::try_from(self.max_readers).unwrap();
238 r2d2::Pool::builder()
239 .max_size(max_readers)
240 .connection_customizer(Box::new(roconn_initterm))
241 .build(manager)
242 }
243}
244
245
246impl Builder {
247 /// Create a new `Builder` for constructing a [`ConnPool`] object.
248 ///
249 /// Default to not run a full vacuum of the database on initialization and
250 /// create 2 read-only connections for the pool.
251 /// No workers thread pool will be used.
252 #[must_use]
253 pub fn new(schmgr: Box<dyn SchemaMgr>) -> Self {
254 Self {
255 schmgr,
256 full_vacuum: false,
257 max_readers: 2,
258 autoclean: None,
259 hook: None,
260 regfuncs: None,
261 #[cfg(feature = "tpool")]
262 tpool: None
263 }
264 }
265
266 /// Trigger a full vacuum when initializing the connection pool.
267 ///
268 /// Operates on an owned `Builder` object.
269 #[must_use]
270 pub const fn init_vacuum(mut self) -> Self {
271 self.full_vacuum = true;
272 self
273 }
274
275 /// Trigger a full vacuum when initializing the connection pool.
276 ///
277 /// Operates on a borrowed `Builder` object.
278 pub const fn init_vacuum_r(&mut self) -> &mut Self {
279 self.full_vacuum = true;
280 self
281 }
282
283 /// Set maximum number of readers in the connection pool.
284 ///
285 /// Operates on an owned `Builder` object.
286 #[must_use]
287 pub const fn max_readers(mut self, n: usize) -> Self {
288 self.max_readers = n;
289 self
290 }
291
292 /// Set maximum number of readers in the connection pool.
293 ///
294 /// Operates on a borrowed `Builder` object.
295 pub const fn max_readers_r(&mut self, n: usize) -> &mut Self {
296 self.max_readers = n;
297 self
298 }
299
300 /// Request that a "raw" update hook be added to the writer connection.
301 ///
302 /// Operates on an owned `Builder` object.
303 #[must_use]
304 pub fn hook(mut self, hook: Arc<dyn Hook + Send + Sync>) -> Self {
305 self.hook = Some(hook);
306 self
307 }
308
309 /// Request that a "raw" update hook be added to the writer connection.
310 ///
311 /// Operates on a borrowed `Builder` object.
312 pub fn hook_r(&mut self, hook: Arc<dyn Hook + Send + Sync>) -> &mut Self {
313 self.hook = Some(hook);
314 self
315 }
316
317 /// Enable incremental autovacuum.
318 ///
319 /// `dirt_watermark` is used to set what amount of "dirt" is required in
320 /// order to trigger an autoclean. `nfree` is the number of blocks in the
321 /// freelists to process each time the autoclean is run.
322 #[must_use]
323 pub const fn incremental_autoclean(
324 mut self,
325 dirt_watermark: usize,
326 npages: Option<NonZeroUsize>
327 ) -> Self {
328 self.autoclean = Some(AutoClean {
329 dirt_threshold: dirt_watermark,
330 npages
331 });
332 self
333 }
334
335 /// Add a callback to register one or more scalar SQL functions.
336 ///
337 /// The closure should be wrapped in a `RegOn::RO()` if the function should
338 /// only be registered on read-only connections. `RegOn::RW()` is used to
339 /// register the function on the read/write connection. Use `RegOn::Both()`
340 /// to register in both read-only and the read/write connection.
341 #[must_use]
342 pub fn reg_scalar_fn<F>(mut self, r: RegOn<F>) -> Self
343 where
344 F: Fn(&Connection) -> Result<(), rusqlite::Error> + Send + Sync + 'static
345 {
346 self.reg_scalar_fn_r(r);
347 self
348 }
349
350 /// Add a callback to register one or more scalar SQL functions.
351 ///
352 /// This is the same as [`Builder::reg_scalar_fn()`], but it operates on
353 /// `&mut Builder` rather than passing ownership.
354 pub fn reg_scalar_fn_r<F>(&mut self, r: RegOn<F>) -> &mut Self
355 where
356 F: Fn(&Connection) -> Result<(), rusqlite::Error> + Send + Sync + 'static
357 {
358 match r {
359 RegOn::RO(f) => {
360 self
361 .regfuncs
362 .get_or_insert(Vec::new())
363 .push(CbType::Ro(Box::new(f)));
364 }
365 RegOn::RW(f) => {
366 self
367 .regfuncs
368 .get_or_insert(Vec::new())
369 .push(CbType::Rw(Box::new(f)));
370 }
371 RegOn::Both(f) => {
372 self
373 .regfuncs
374 .get_or_insert(Vec::new())
375 .push(CbType::Both(Box::new(f)));
376 }
377 }
378 self
379 }
380
381 #[cfg(feature = "tpool")]
382 #[must_use]
383 pub fn thread_pool(mut self, tpool: Arc<ThreadPool>) -> Self {
384 self.tpool = Some(tpool);
385 self
386 }
387
388 #[cfg(feature = "tpool")]
389 pub fn thread_pool_r(&mut self, tpool: Arc<ThreadPool>) -> &mut Self {
390 self.tpool = Some(tpool);
391 self
392 }
393
394 /// Construct a connection pool.
395 ///
396 /// # Errors
397 /// [`Error::Sqlite`] will be returned if a database error occurred.
398 pub fn build<P>(mut self, fname: P) -> Result<ConnPool, Error>
399 where
400 P: AsRef<Path>
401 {
402 // ToDo: Use std::path::absolute() once stabilized
403 let fname = fname.as_ref();
404 let db_exists = fname.exists();
405
406 //
407 // Set up the read/write connection
408 //
409 // This must be done before creating the read-only connection pool, because
410 // at that point the database file must already exist.
411 //
412 let mut conn = self.open_writer(fname)?;
413
414 //
415 // Register read/write connection functions
416 //
417
418 // Option<Vec<T>> --> Vec<T>
419 let regfuncs = self.regfuncs.take().unwrap_or_default();
420
421 // Call SQL function registration callbacks for read/write connection.
422 for rf in ®funcs {
423 match rf {
424 CbType::Rw(f) | CbType::Both(f) => {
425 f(&conn)?;
426 }
427 CbType::Ro(_) => {}
428 }
429 }
430
431 //
432 // Perform schema initialization.
433 //
434 // This must be done after auto_vacuum is set, because auto_vacuum requires
435 // configuration before any tables have been created.
436 // See: https://www.sqlite.org/pragma.html#pragma_auto_vacuum
437 //
438 self.schmgr.init(&mut conn, !db_exists)?;
439 if self.schmgr.need_upgrade(&conn)? {
440 self.schmgr.upgrade(&mut conn)?;
441 }
442
443 //
444 // Perform a full vacuum if requested to do so.
445 //
446 if self.full_vacuum {
447 Self::full_vacuum(&conn)?;
448 }
449
450 //
451 // Register a callback hook
452 //
453 if let Some(ref hook) = self.hook {
454 rawhook::hook(&conn, hook);
455 }
456
457 //
458 // Set up connection pool for read-only connections.
459 //
460 let rpool = self.create_ro_pool(fname, regfuncs)?;
461
462 //
463 // Prepare shared data
464 //
465 let iconn = InnerWrConn { conn, dirt: 0 };
466 let inner = Inner { conn: Some(iconn) };
467 let sh = Arc::new(Shared {
468 inner: Mutex::new(inner),
469 signal: Condvar::new(),
470 autoclean: self.autoclean.clone()
471 });
472
473 Ok(ConnPool {
474 rpool,
475 sh,
476 #[cfg(feature = "tpool")]
477 tpool: self.tpool
478 })
479 }
480
481
482 /// Construct a connection pool.
483 ///
484 /// Same as [`Builder::build()`], but register a change log callback on the
485 /// writer as well.
486 ///
487 /// This method should not be called if the application has requested to add
488 /// a raw update hook.
489 ///
490 /// # Errors
491 /// [`Error::Sqlite`] is returned for database errors.
492 ///
493 /// # Panics
494 /// This method will panic if a hook has been added to the Builder.
495 pub fn build_with_changelog_hook<P, D, T>(
496 mut self,
497 fname: P,
498 hook: Box<dyn ChangeLogHook<Database = D, Table = T> + Send>
499 ) -> Result<ConnPool, Error>
500 where
501 P: AsRef<Path>,
502 D: FromStr + Send + Sized + 'static,
503 T: FromStr + Send + Sized + 'static
504 {
505 assert!(
506 self.hook.is_some(),
507 "Can't build a connection pool with both a raw and changelog hook"
508 );
509
510 // ToDo: Use std::path::absolute() once stabilized
511 let fname = fname.as_ref();
512 let db_exists = fname.exists();
513
514 //
515 // Set up the read/write connection
516 //
517 // This must be done before creating the read-only connection pool, because
518 // at that point the database file must already exist.
519 //
520 let mut conn = self.open_writer(fname)?;
521
522 //
523 // Register read/write connection functions
524 //
525
526 // Option<Vec<T>> --> Vec<T>
527 let regfuncs = self.regfuncs.take().unwrap_or_default();
528
529 // Call SQL function registration callbacks for read/write connection.
530 for rf in ®funcs {
531 match rf {
532 CbType::Rw(f) | CbType::Both(f) => {
533 f(&conn)?;
534 }
535 CbType::Ro(_) => {}
536 }
537 }
538
539
540 //
541 // Perform schema initialization.
542 //
543 // This must be done after auto_vacuum is set, because auto_vacuum requires
544 // configuration before any tables have been created.
545 // See: https://www.sqlite.org/pragma.html#pragma_auto_vacuum
546 //
547 self.schmgr.init(&mut conn, !db_exists)?;
548 if self.schmgr.need_upgrade(&conn)? {
549 self.schmgr.upgrade(&mut conn)?;
550 }
551
552 //
553 // Perform a full vacuum if requested to do so.
554 //
555 if self.full_vacuum {
556 Self::full_vacuum(&conn)?;
557 }
558
559 //
560 // Register a callback hook
561 //
562 changehook::hook(&conn, hook);
563
564 //
565 // Set up connection pool for read-only connections.
566 //
567 let rpool = self.create_ro_pool(fname, regfuncs)?;
568
569 //
570 // Prepare shared data
571 //
572 let iconn = InnerWrConn { conn, dirt: 0 };
573 let inner = Inner { conn: Some(iconn) };
574 let sh = Arc::new(Shared {
575 inner: Mutex::new(inner),
576 signal: Condvar::new(),
577 autoclean: self.autoclean.clone()
578 });
579
580 Ok(ConnPool {
581 rpool,
582 sh,
583 #[cfg(feature = "tpool")]
584 tpool: self.tpool
585 })
586 }
587}
588
589
590/// Inner writer connection object.
591///
592/// When the writer is acquired from the connection pool it passes an instance
593/// of this struct to the WrConn object.
594struct InnerWrConn {
595 /// The writer connection.
596 conn: Connection,
597
598 /// Amount of accumulated dirt.
599 ///
600 /// Only used when the autoclean feature is used.
601 dirt: usize
602}
603
604
605struct Inner {
606 /// The writer connection.
607 ///
608 /// This is `None` when a `WrConn` exists, and is set to `Some()` by
609 /// `WrConn`'s Drop implementation.
610 conn: Option<InnerWrConn>
611}
612
613struct Shared {
614 inner: Mutex<Inner>,
615 signal: Condvar,
616 autoclean: Option<AutoClean>
617}
618
619
620/// SQLite connection pool.
621///
622/// This is a specialized connection pool that is defined specifically for
623/// sqlite, and only allows a single writer but multiple readers.
624// Note: In Rust the drop order of struct fields is in the order of
625// declaration. If the writer is dropped before the readers, sqlite
626// will not clean up its wal files when the writet closes (presumably
627// because the readers are keeping the files locked). Therefore it is
628// important that the r2d2 connection pool is declared before the
629// `Shared` buffer (since it contains the writer).
630#[derive(Clone)]
631pub struct ConnPool {
632 rpool: r2d2::Pool<SqliteConnectionManager>,
633 sh: Arc<Shared>,
634 #[cfg(feature = "tpool")]
635 tpool: Option<Arc<ThreadPool>>
636}
637
638impl ConnPool {
639 /// Return the pool size.
640 ///
641 /// In effect, this is the size of the read-only pool plus one (for the
642 /// read/write connection).
643 #[must_use]
644 pub fn size(&self) -> usize {
645 (self.rpool.max_size() + 1) as usize
646 }
647
648 /// Acquire a read-only connection.
649 ///
650 /// # Errors
651 /// [`r2d2::Error`] will be returned if a read-only connection could not be
652 /// acquired.
653 pub fn reader(
654 &self
655 ) -> Result<PooledConnection<SqliteConnectionManager>, r2d2::Error> {
656 self.rpool.get()
657 }
658
659 /// Acquire the read/write connection.
660 ///
661 /// If the writer is already taken, then block and wait for it to become
662 /// available.
663 #[must_use]
664 #[allow(clippy::significant_drop_tightening)]
665 pub fn writer(&self) -> WrConn {
666 let mut g = self.sh.inner.lock();
667 let conn = loop {
668 if let Some(conn) = g.conn.take() {
669 break conn;
670 }
671 self.sh.signal.wait(&mut g);
672 };
673
674 WrConn {
675 sh: Arc::clone(&self.sh),
676 inner: ManuallyDrop::new(conn)
677 }
678 }
679
680 /// Attempt to acquire the writer connection.
681 ///
682 /// Returns `Some(conn)` if the writer connection was available at the time
683 /// of the request. Returns `None` if the writer has already been taken.
684 #[must_use]
685 pub fn try_writer(&self) -> Option<WrConn> {
686 let conn = self.sh.inner.lock().conn.take()?;
687 Some(WrConn {
688 sh: Arc::clone(&self.sh),
689 inner: ManuallyDrop::new(conn)
690 })
691 }
692}
693
694
695/// Special queries.
696impl ConnPool {
697 /// Return the number of unused pages.
698 ///
699 /// # Errors
700 /// [`Error::R2D2`] indicates that it wasn't possible to acquire a read-only
701 /// connection from the connection pool. [`Error::Sqlite`] means it was not
702 /// possible to query the free page list count.
703 pub fn freelist_count(&self) -> Result<usize, Error> {
704 Ok(self.reader()?.query_row_and_then(
705 "PRAGMA freelist_count;'",
706 [],
707 |row| row.get(0)
708 )?)
709 }
710}
711
712
713/// Read-only connection processing.
714impl ConnPool {
715 /// Run a read-only database operation.
716 ///
717 /// # Errors
718 /// The error type `E` is used to return application-defined errors, though
719 /// it must be possible to convert a `r2d2::Error` into `E` using the `From`
720 /// trait.
721 pub fn run_ro<T, F, E>(&self, f: F) -> Result<T, E>
722 where
723 T: Send + 'static,
724 F: FnOnce(&Connection) -> Result<T, E> + Send + 'static,
725 E: From<r2d2::Error>
726 {
727 // Acquire a read-only connection from the pool
728 let conn = self.reader()?;
729
730 // Run caller-provided closure.
731 f(&conn)
732 }
733
734 /// Run a read-only database operation on a thread.
735 ///
736 /// # Errors
737 /// [`r2d2::Error`] is returned if it wasn't possible to acquire a read-only
738 /// connection from the connection pool.
739 ///
740 /// # Panics
741 /// A thread pool must be associated with the [`ConnPool`] or this method
742 /// will panic.
743 #[cfg(feature = "tpool")]
744 #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
745 pub fn run_ro_thrd<F>(&self, f: F) -> Result<(), r2d2::Error>
746 where
747 F: FnOnce(&Connection) + Send + 'static
748 {
749 let Some(ref tpool) = self.tpool else {
750 panic!("ConnPool does to have a thread pool");
751 };
752
753 // Acquire a read-only connection from the pool and then run the provided
754 // closure on a thread from the thread pool.
755 let conn = self.reader()?;
756 tpool.execute(move || {
757 f(&conn);
758 });
759 Ok(())
760 }
761
762 /// Run a read-only database operation on a thread, allowing the caller to
763 /// receive the `Result<T, E>` of the supplied closure using a
764 /// one-shot channel.
765 ///
766 /// The supplied closure in `f` should return a `Result<T, E>` where the `Ok`
767 /// case will be passed as a "set" value through the `swctx` channel, and the
768 /// `Err` case will be passed as a "fail" value.
769 ///
770 /// # Errors
771 /// [`r2d2::Error`] is returned if it wasn't possible to acquire a read-only
772 /// connection from the connection pool.
773 ///
774 /// # Panics
775 /// A thread pool must be associated with the [`ConnPool`] or this method
776 /// will panic.
777 #[cfg(feature = "tpool")]
778 #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
779 pub fn run_ro_thrd_result<T, E, F>(
780 &self,
781 f: F
782 ) -> Result<swctx::WaitCtx<T, (), E>, r2d2::Error>
783 where
784 T: Send + 'static,
785 E: fmt::Debug + Send + 'static,
786 F: FnOnce(&Connection) -> Result<T, E> + Send + 'static
787 {
788 let Some(ref tpool) = self.tpool else {
789 panic!("ConnPool does to have a thread pool");
790 };
791
792 let conn = self.reader()?;
793
794 let (sctx, wctx) = swctx::mkpair();
795
796 // Ignore errors relating to pass the results back
797 tpool.execute(move || match f(&conn) {
798 Ok(t) => {
799 let _ = sctx.set(t);
800 }
801 Err(e) => {
802 let _ = sctx.fail(e);
803 }
804 });
805
806 Ok(wctx)
807 }
808}
809
810/// Read/Write connection processing.
811impl ConnPool {
812 /// Run a read/write database operation.
813 ///
814 /// # Errors
815 /// Returns an application-specific type `E` on error.
816 pub fn run_rw<T, E, F>(&self, f: F) -> Result<T, E>
817 where
818 T: Send + 'static,
819 E: fmt::Debug + Send + 'static,
820 F: FnOnce(&mut WrConn) -> Result<T, E> + Send + 'static
821 {
822 let mut conn = self.writer();
823 f(&mut conn)
824 }
825
826 /// Run a read/write database operation on a thread.
827 ///
828 /// The supplied closure should return an `Option<usize>`, where the `Some()`
829 /// case denotes the specified amount of "dirt" should be added to the write
830 /// connection. `None` means no dirt should be added.
831 ///
832 /// # Panics
833 /// A thread pool must be associated with the [`ConnPool`] or this method
834 /// will panic.
835 #[cfg(feature = "tpool")]
836 #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
837 pub fn run_rw_thrd<F>(&self, f: F)
838 where
839 F: FnOnce(&mut WrConn) -> Option<usize> + Send + 'static
840 {
841 let Some(ref tpool) = self.tpool else {
842 panic!("ConnPool does to have a thread pool");
843 };
844
845 let mut conn = self.writer();
846 tpool.execute(move || {
847 let dirt = f(&mut conn);
848 if let Some(dirt) = dirt {
849 conn.add_dirt(dirt);
850 }
851 });
852 }
853
854 /// Run a read/write database operation on a thread, allowing the
855 /// caller to receive the `Result<T, E>` of the supplied closure using a
856 /// one-shot channel.
857 ///
858 /// The supplied closure in `f` should return a `Result<T, E>` where the `Ok`
859 /// case will be passed as a "set" value through the `swctx` channel, and the
860 /// `Err` case will be passed as a "fail" value.
861 ///
862 /// # Panics
863 /// A thread pool must be associated with the [`ConnPool`] or this method
864 /// will panic.
865 #[cfg(feature = "tpool")]
866 #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
867 pub fn run_rw_thrd_result<T, E, F>(&self, f: F) -> swctx::WaitCtx<T, (), E>
868 where
869 T: Send + 'static,
870 E: fmt::Debug + Send + 'static,
871 F: FnOnce(&mut WrConn) -> Result<T, E> + Send + 'static
872 {
873 let Some(ref tpool) = self.tpool else {
874 panic!("ConnPool does to have a thread pool");
875 };
876
877 let mut conn = self.writer();
878
879 let (sctx, wctx) = swctx::mkpair();
880
881 tpool.execute(move || match f(&mut conn) {
882 Ok(t) => {
883 let _ = sctx.set(t);
884 }
885 Err(e) => {
886 let _ = sctx.fail(e);
887 }
888 });
889
890 wctx
891 }
892}
893
894
895impl ConnPool {
896 #[cfg(feature = "tpool")]
897 #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
898 #[must_use]
899 pub fn incremental_vacuum(
900 &self,
901 n: Option<usize>
902 ) -> swctx::WaitCtx<(), (), rusqlite::Error> {
903 self.run_rw_thrd_result(move |conn| conn.incremental_vacuum(n))
904 }
905}
906
907// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :