sqlsrv/
lib.rs

1#![allow(clippy::doc_markdown)]
2
3//! A library for implementing an in-process SQLite database server.
4//!
5//! # Connection pooling
6//! sqlsrv implements connection pooling that reflects the concurrency model
7//! of SQLite:  It supports multiple parallel readers, but only one writer.
8//!
9//! # Thread pooling
10//! In addition to pooling connections, the library supports optionally using
11//! a thread pool for diaptching database operations onto threads.
12//!
13//! # Incremental auto-clean
14//! The connection pool has built-in support for setting up incremental
15//! autovacuum, and can be configured to implicitly run incremental vacuuming.
16//!
17//! To use this feature, a "maximum dirt" value is configured on the connection
18//! pool.  Whenever the writer connection performs changes to the database it
19//! can add "dirt" to the connection.  When the writer connection is returned
20//! to the connection pool it checks to see if the amount of dirt is equal to
21//! or greater than the configured "maximum dirt" threshold.  If the threshold
22//! has been reached, an incremental autovacuum is performed.
23//!
24//! # Features
25//! | Feature  | Function
26//! |----------|----------
27//! | `tpool`  | Enable functions/methods that use a thread pool.
28
29#![cfg_attr(docsrs, feature(doc_cfg))]
30
31mod changehook;
32mod err;
33mod rawhook;
34mod wrconn;
35
36pub mod utils;
37
38use std::{
39  fmt, mem::ManuallyDrop, num::NonZeroUsize, path::Path, str::FromStr,
40  sync::Arc
41};
42
43use parking_lot::{Condvar, Mutex};
44
45use r2d2::{CustomizeConnection, PooledConnection};
46
47pub use {r2d2, r2d2_sqlite::SqliteConnectionManager, rusqlite};
48
49use rusqlite::{Connection, OpenFlags, params};
50
51#[cfg(feature = "tpool")]
52use threadpool::ThreadPool;
53
54pub use changehook::ChangeLogHook;
55pub use err::Error;
56pub use rawhook::{Action, Hook};
57pub use wrconn::WrConn;
58
59
60/// Wrapper around a SQL functions registration callback used to select which
61/// connection types to perform registrations on.
62pub enum RegOn<F>
63where
64  F: Fn(&Connection) -> Result<(), rusqlite::Error>
65{
66  /// This registration callback should only be called for read-only
67  /// connections.
68  RO(F),
69
70  /// This registration callback should only be called for the read/write
71  /// connections.
72  RW(F),
73
74  /// This registration callback should be called for both the read-only and
75  /// read/write connections.
76  Both(F)
77}
78
79
80type RegCb = dyn Fn(&Connection) -> Result<(), rusqlite::Error> + Send + Sync;
81
82enum CbType {
83  Ro(Box<RegCb>),
84  Rw(Box<RegCb>),
85  Both(Box<RegCb>)
86}
87
88
89/// Used to register application callbacks to set up database schema.
90pub trait SchemaMgr {
91  /// Called just after the writer connection has been created and is intended
92  /// to perform database initialization (create tables, add predefined rows,
93  /// etc).
94  ///
95  /// `newdb` will be `true` if the database file did not exist prior to
96  /// initialization.
97  ///
98  /// While this method can be used to perform schema upgrades, there are two
99  /// specialized methods (`need_upgrade()` and `upgrade()`) that can be used
100  /// for this purpose instead.
101  ///
102  /// The default implementation does nothing but returns `Ok(())`.
103  ///
104  /// # Errors
105  /// Application-specific error.
106  #[allow(unused_variables)]
107  fn init(&self, conn: &mut Connection, newdb: bool) -> Result<(), Error> {
108    Ok(())
109  }
110
111  /// Application callback used to determine if the database schema is out of
112  /// date and needs to be updated.
113  ///
114  /// The default implementation does nothing but returns `Ok(false)`.
115  ///
116  /// # Errors
117  /// Application-specific error.
118  #[allow(unused_variables)]
119  fn need_upgrade(&self, conn: &Connection) -> Result<bool, Error> {
120    Ok(false)
121  }
122
123  /// Upgrade the database schema.
124  ///
125  /// This is called if [`SchemaMgr::need_upgrade()`] returns `Ok(true)`.
126  ///
127  /// The default implementation does nothing but returns `Ok(())`.
128  ///
129  /// # Errors
130  /// Application-specific error.
131  #[allow(unused_variables)]
132  fn upgrade(&self, conn: &mut Connection) -> Result<(), Error> {
133    Ok(())
134  }
135}
136
137
138#[derive(Clone, Debug)]
139struct AutoClean {
140  /// The amount of dirt that must have accumulated before an incremental
141  /// vacuum is triggered.
142  ///
143  /// If this is set to `0` the incremental vacuum will be run at every
144  /// opportunity.
145  dirt_threshold: usize,
146
147  /// The number of pages to process from the freelist each time an
148  /// incremental autovacuum is triggered.
149  ///
150  /// If this is `None` then all pages will be processed.
151  npages: Option<NonZeroUsize>
152}
153
154
155/// Read-only connection.
156struct RoConn {
157  regfuncs: Vec<CbType>
158}
159
160impl fmt::Debug for RoConn {
161  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
162    write!(f, "RoConn {{}}")
163  }
164}
165
166
167impl CustomizeConnection<rusqlite::Connection, rusqlite::Error> for RoConn {
168  fn on_acquire(
169    &self,
170    conn: &mut rusqlite::Connection
171  ) -> Result<(), rusqlite::Error> {
172    conn.pragma_update(None, "foreign_keys", "ON")?;
173
174    for rf in &self.regfuncs {
175      match rf {
176        CbType::Ro(f) | CbType::Both(f) => {
177          f(conn)?;
178        }
179        CbType::Rw(_) => {}
180      }
181    }
182
183    Ok(())
184  }
185
186  fn on_release(&self, _conn: rusqlite::Connection) {}
187}
188
189
190/// Builder for constructing a [`ConnPool`] object.
191pub struct Builder {
192  schmgr: Box<dyn SchemaMgr>,
193  full_vacuum: bool,
194  max_readers: usize,
195  autoclean: Option<AutoClean>,
196  hook: Option<Arc<dyn Hook + Send + Sync>>,
197  regfuncs: Option<Vec<CbType>>,
198  #[cfg(feature = "tpool")]
199  tpool: Option<Arc<ThreadPool>>
200}
201
202/// Internal methods.
203impl Builder {
204  /// Open the writer connection.
205  fn open_writer(&self, fname: &Path) -> Result<Connection, rusqlite::Error> {
206    let conn = Connection::open(fname)?;
207    conn.pragma_update(None, "journal_mode", "WAL")?;
208    conn.pragma_update(None, "foreign_keys", "ON")?;
209
210    // Only enable incremental auto vacuum if a dirt watermark has been
211    // configured.
212    if self.autoclean.is_some() {
213      conn.pragma_update(None, "auto_vacuum", "INCREMENTAL")?;
214    }
215
216    Ok(conn)
217  }
218
219  /// Run a full vacuum.
220  ///
221  /// This is an internal function that may be called by `build()` if a full
222  /// vacuum has been requested.
223  fn full_vacuum(conn: &Connection) -> Result<(), rusqlite::Error> {
224    conn.execute("VACUUM;", params![])?;
225    Ok(())
226  }
227
228  fn create_ro_pool(
229    &self,
230    fname: &Path,
231    regfuncs: Vec<CbType>
232  ) -> Result<r2d2::Pool<SqliteConnectionManager>, r2d2::Error> {
233    let fl =
234      OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX;
235    let manager = SqliteConnectionManager::file(fname).with_flags(fl);
236    let roconn_initterm = RoConn { regfuncs };
237    let max_readers = u32::try_from(self.max_readers).unwrap();
238    r2d2::Pool::builder()
239      .max_size(max_readers)
240      .connection_customizer(Box::new(roconn_initterm))
241      .build(manager)
242  }
243}
244
245
246impl Builder {
247  /// Create a new `Builder` for constructing a [`ConnPool`] object.
248  ///
249  /// Default to not run a full vacuum of the database on initialization and
250  /// create 2 read-only connections for the pool.
251  /// No workers thread pool will be used.
252  #[must_use]
253  pub fn new(schmgr: Box<dyn SchemaMgr>) -> Self {
254    Self {
255      schmgr,
256      full_vacuum: false,
257      max_readers: 2,
258      autoclean: None,
259      hook: None,
260      regfuncs: None,
261      #[cfg(feature = "tpool")]
262      tpool: None
263    }
264  }
265
266  /// Trigger a full vacuum when initializing the connection pool.
267  ///
268  /// Operates on an owned `Builder` object.
269  #[must_use]
270  pub const fn init_vacuum(mut self) -> Self {
271    self.full_vacuum = true;
272    self
273  }
274
275  /// Trigger a full vacuum when initializing the connection pool.
276  ///
277  /// Operates on a borrowed `Builder` object.
278  pub const fn init_vacuum_r(&mut self) -> &mut Self {
279    self.full_vacuum = true;
280    self
281  }
282
283  /// Set maximum number of readers in the connection pool.
284  ///
285  /// Operates on an owned `Builder` object.
286  #[must_use]
287  pub const fn max_readers(mut self, n: usize) -> Self {
288    self.max_readers = n;
289    self
290  }
291
292  /// Set maximum number of readers in the connection pool.
293  ///
294  /// Operates on a borrowed `Builder` object.
295  pub const fn max_readers_r(&mut self, n: usize) -> &mut Self {
296    self.max_readers = n;
297    self
298  }
299
300  /// Request that a "raw" update hook be added to the writer connection.
301  ///
302  /// Operates on an owned `Builder` object.
303  #[must_use]
304  pub fn hook(mut self, hook: Arc<dyn Hook + Send + Sync>) -> Self {
305    self.hook = Some(hook);
306    self
307  }
308
309  /// Request that a "raw" update hook be added to the writer connection.
310  ///
311  /// Operates on a borrowed `Builder` object.
312  pub fn hook_r(&mut self, hook: Arc<dyn Hook + Send + Sync>) -> &mut Self {
313    self.hook = Some(hook);
314    self
315  }
316
317  /// Enable incremental autovacuum.
318  ///
319  /// `dirt_watermark` is used to set what amount of "dirt" is required in
320  /// order to trigger an autoclean.  `nfree` is the number of blocks in the
321  /// freelists to process each time the autoclean is run.
322  #[must_use]
323  pub const fn incremental_autoclean(
324    mut self,
325    dirt_watermark: usize,
326    npages: Option<NonZeroUsize>
327  ) -> Self {
328    self.autoclean = Some(AutoClean {
329      dirt_threshold: dirt_watermark,
330      npages
331    });
332    self
333  }
334
335  /// Add a callback to register one or more scalar SQL functions.
336  ///
337  /// The closure should be wrapped in a `RegOn::RO()` if the function should
338  /// only be registered on read-only connections.  `RegOn::RW()` is used to
339  /// register the function on the read/write connection.  Use `RegOn::Both()`
340  /// to register in both read-only and the read/write connection.
341  #[must_use]
342  pub fn reg_scalar_fn<F>(mut self, r: RegOn<F>) -> Self
343  where
344    F: Fn(&Connection) -> Result<(), rusqlite::Error> + Send + Sync + 'static
345  {
346    self.reg_scalar_fn_r(r);
347    self
348  }
349
350  /// Add a callback to register one or more scalar SQL functions.
351  ///
352  /// This is the same as [`Builder::reg_scalar_fn()`], but it operates on
353  /// `&mut Builder` rather than passing ownership.
354  pub fn reg_scalar_fn_r<F>(&mut self, r: RegOn<F>) -> &mut Self
355  where
356    F: Fn(&Connection) -> Result<(), rusqlite::Error> + Send + Sync + 'static
357  {
358    match r {
359      RegOn::RO(f) => {
360        self
361          .regfuncs
362          .get_or_insert(Vec::new())
363          .push(CbType::Ro(Box::new(f)));
364      }
365      RegOn::RW(f) => {
366        self
367          .regfuncs
368          .get_or_insert(Vec::new())
369          .push(CbType::Rw(Box::new(f)));
370      }
371      RegOn::Both(f) => {
372        self
373          .regfuncs
374          .get_or_insert(Vec::new())
375          .push(CbType::Both(Box::new(f)));
376      }
377    }
378    self
379  }
380
381  #[cfg(feature = "tpool")]
382  #[must_use]
383  pub fn thread_pool(mut self, tpool: Arc<ThreadPool>) -> Self {
384    self.tpool = Some(tpool);
385    self
386  }
387
388  #[cfg(feature = "tpool")]
389  pub fn thread_pool_r(&mut self, tpool: Arc<ThreadPool>) -> &mut Self {
390    self.tpool = Some(tpool);
391    self
392  }
393
394  /// Construct a connection pool.
395  ///
396  /// # Errors
397  /// [`Error::Sqlite`] will be returned if a database error occurred.
398  pub fn build<P>(mut self, fname: P) -> Result<ConnPool, Error>
399  where
400    P: AsRef<Path>
401  {
402    // ToDo: Use std::path::absolute() once stabilized
403    let fname = fname.as_ref();
404    let db_exists = fname.exists();
405
406    //
407    // Set up the read/write connection
408    //
409    // This must be done before creating the read-only connection pool, because
410    // at that point the database file must already exist.
411    //
412    let mut conn = self.open_writer(fname)?;
413
414    //
415    // Register read/write connection functions
416    //
417
418    // Option<Vec<T>>  -->  Vec<T>
419    let regfuncs = self.regfuncs.take().unwrap_or_default();
420
421    // Call SQL function registration callbacks for read/write connection.
422    for rf in &regfuncs {
423      match rf {
424        CbType::Rw(f) | CbType::Both(f) => {
425          f(&conn)?;
426        }
427        CbType::Ro(_) => {}
428      }
429    }
430
431    //
432    // Perform schema initialization.
433    //
434    // This must be done after auto_vacuum is set, because auto_vacuum requires
435    // configuration before any tables have been created.
436    // See: https://www.sqlite.org/pragma.html#pragma_auto_vacuum
437    //
438    self.schmgr.init(&mut conn, !db_exists)?;
439    if self.schmgr.need_upgrade(&conn)? {
440      self.schmgr.upgrade(&mut conn)?;
441    }
442
443    //
444    // Perform a full vacuum if requested to do so.
445    //
446    if self.full_vacuum {
447      Self::full_vacuum(&conn)?;
448    }
449
450    //
451    // Register a callback hook
452    //
453    if let Some(ref hook) = self.hook {
454      rawhook::hook(&conn, hook);
455    }
456
457    //
458    // Set up connection pool for read-only connections.
459    //
460    let rpool = self.create_ro_pool(fname, regfuncs)?;
461
462    //
463    // Prepare shared data
464    //
465    let iconn = InnerWrConn { conn, dirt: 0 };
466    let inner = Inner { conn: Some(iconn) };
467    let sh = Arc::new(Shared {
468      inner: Mutex::new(inner),
469      signal: Condvar::new(),
470      autoclean: self.autoclean.clone()
471    });
472
473    Ok(ConnPool {
474      rpool,
475      sh,
476      #[cfg(feature = "tpool")]
477      tpool: self.tpool
478    })
479  }
480
481
482  /// Construct a connection pool.
483  ///
484  /// Same as [`Builder::build()`], but register a change log callback on the
485  /// writer as well.
486  ///
487  /// This method should not be called if the application has requested to add
488  /// a raw update hook.
489  ///
490  /// # Errors
491  /// [`Error::Sqlite`] is returned for database errors.
492  ///
493  /// # Panics
494  /// This method will panic if a hook has been added to the Builder.
495  pub fn build_with_changelog_hook<P, D, T>(
496    mut self,
497    fname: P,
498    hook: Box<dyn ChangeLogHook<Database = D, Table = T> + Send>
499  ) -> Result<ConnPool, Error>
500  where
501    P: AsRef<Path>,
502    D: FromStr + Send + Sized + 'static,
503    T: FromStr + Send + Sized + 'static
504  {
505    assert!(
506      self.hook.is_some(),
507      "Can't build a connection pool with both a raw and changelog hook"
508    );
509
510    // ToDo: Use std::path::absolute() once stabilized
511    let fname = fname.as_ref();
512    let db_exists = fname.exists();
513
514    //
515    // Set up the read/write connection
516    //
517    // This must be done before creating the read-only connection pool, because
518    // at that point the database file must already exist.
519    //
520    let mut conn = self.open_writer(fname)?;
521
522    //
523    // Register read/write connection functions
524    //
525
526    // Option<Vec<T>>  -->  Vec<T>
527    let regfuncs = self.regfuncs.take().unwrap_or_default();
528
529    // Call SQL function registration callbacks for read/write connection.
530    for rf in &regfuncs {
531      match rf {
532        CbType::Rw(f) | CbType::Both(f) => {
533          f(&conn)?;
534        }
535        CbType::Ro(_) => {}
536      }
537    }
538
539
540    //
541    // Perform schema initialization.
542    //
543    // This must be done after auto_vacuum is set, because auto_vacuum requires
544    // configuration before any tables have been created.
545    // See: https://www.sqlite.org/pragma.html#pragma_auto_vacuum
546    //
547    self.schmgr.init(&mut conn, !db_exists)?;
548    if self.schmgr.need_upgrade(&conn)? {
549      self.schmgr.upgrade(&mut conn)?;
550    }
551
552    //
553    // Perform a full vacuum if requested to do so.
554    //
555    if self.full_vacuum {
556      Self::full_vacuum(&conn)?;
557    }
558
559    //
560    // Register a callback hook
561    //
562    changehook::hook(&conn, hook);
563
564    //
565    // Set up connection pool for read-only connections.
566    //
567    let rpool = self.create_ro_pool(fname, regfuncs)?;
568
569    //
570    // Prepare shared data
571    //
572    let iconn = InnerWrConn { conn, dirt: 0 };
573    let inner = Inner { conn: Some(iconn) };
574    let sh = Arc::new(Shared {
575      inner: Mutex::new(inner),
576      signal: Condvar::new(),
577      autoclean: self.autoclean.clone()
578    });
579
580    Ok(ConnPool {
581      rpool,
582      sh,
583      #[cfg(feature = "tpool")]
584      tpool: self.tpool
585    })
586  }
587}
588
589
590/// Inner writer connection object.
591///
592/// When the writer is acquired from the connection pool it passes an instance
593/// of this struct to the WrConn object.
594struct InnerWrConn {
595  /// The writer connection.
596  conn: Connection,
597
598  /// Amount of accumulated dirt.
599  ///
600  /// Only used when the autoclean feature is used.
601  dirt: usize
602}
603
604
605struct Inner {
606  /// The writer connection.
607  ///
608  /// This is `None` when a `WrConn` exists, and is set to `Some()` by
609  /// `WrConn`'s Drop implementation.
610  conn: Option<InnerWrConn>
611}
612
613struct Shared {
614  inner: Mutex<Inner>,
615  signal: Condvar,
616  autoclean: Option<AutoClean>
617}
618
619
620/// SQLite connection pool.
621///
622/// This is a specialized connection pool that is defined specifically for
623/// sqlite, and only allows a single writer but multiple readers.
624// Note:  In Rust the drop order of struct fields is in the order of
625//        declaration.  If the writer is dropped before the readers, sqlite
626//        will not clean up its wal files when the writet closes (presumably
627//        because the readers are keeping the files locked).  Therefore it is
628//        important that the r2d2 connection pool is declared before the
629//        `Shared` buffer (since it contains the writer).
630#[derive(Clone)]
631pub struct ConnPool {
632  rpool: r2d2::Pool<SqliteConnectionManager>,
633  sh: Arc<Shared>,
634  #[cfg(feature = "tpool")]
635  tpool: Option<Arc<ThreadPool>>
636}
637
638impl ConnPool {
639  /// Return the pool size.
640  ///
641  /// In effect, this is the size of the read-only pool plus one (for the
642  /// read/write connection).
643  #[must_use]
644  pub fn size(&self) -> usize {
645    (self.rpool.max_size() + 1) as usize
646  }
647
648  /// Acquire a read-only connection.
649  ///
650  /// # Errors
651  /// [`r2d2::Error`] will be returned if a read-only connection could not be
652  /// acquired.
653  pub fn reader(
654    &self
655  ) -> Result<PooledConnection<SqliteConnectionManager>, r2d2::Error> {
656    self.rpool.get()
657  }
658
659  /// Acquire the read/write connection.
660  ///
661  /// If the writer is already taken, then block and wait for it to become
662  /// available.
663  #[must_use]
664  #[allow(clippy::significant_drop_tightening)]
665  pub fn writer(&self) -> WrConn {
666    let mut g = self.sh.inner.lock();
667    let conn = loop {
668      if let Some(conn) = g.conn.take() {
669        break conn;
670      }
671      self.sh.signal.wait(&mut g);
672    };
673
674    WrConn {
675      sh: Arc::clone(&self.sh),
676      inner: ManuallyDrop::new(conn)
677    }
678  }
679
680  /// Attempt to acquire the writer connection.
681  ///
682  /// Returns `Some(conn)` if the writer connection was available at the time
683  /// of the request.  Returns `None` if the writer has already been taken.
684  #[must_use]
685  pub fn try_writer(&self) -> Option<WrConn> {
686    let conn = self.sh.inner.lock().conn.take()?;
687    Some(WrConn {
688      sh: Arc::clone(&self.sh),
689      inner: ManuallyDrop::new(conn)
690    })
691  }
692}
693
694
695/// Special queries.
696impl ConnPool {
697  /// Return the number of unused pages.
698  ///
699  /// # Errors
700  /// [`Error::R2D2`] indicates that it wasn't possible to acquire a read-only
701  /// connection from the connection pool.  [`Error::Sqlite`] means it was not
702  /// possible to query the free page list count.
703  pub fn freelist_count(&self) -> Result<usize, Error> {
704    Ok(self.reader()?.query_row_and_then(
705      "PRAGMA freelist_count;'",
706      [],
707      |row| row.get(0)
708    )?)
709  }
710}
711
712
713/// Read-only connection processing.
714impl ConnPool {
715  /// Run a read-only database operation.
716  ///
717  /// # Errors
718  /// The error type `E` is used to return application-defined errors, though
719  /// it must be possible to convert a `r2d2::Error` into `E` using the `From`
720  /// trait.
721  pub fn run_ro<T, F, E>(&self, f: F) -> Result<T, E>
722  where
723    T: Send + 'static,
724    F: FnOnce(&Connection) -> Result<T, E> + Send + 'static,
725    E: From<r2d2::Error>
726  {
727    // Acquire a read-only connection from the pool
728    let conn = self.reader()?;
729
730    // Run caller-provided closure.
731    f(&conn)
732  }
733
734  /// Run a read-only database operation on a thread.
735  ///
736  /// # Errors
737  /// [`r2d2::Error`] is returned if it wasn't possible to acquire a read-only
738  /// connection from the connection pool.
739  ///
740  /// # Panics
741  /// A thread pool must be associated with the [`ConnPool`] or this method
742  /// will panic.
743  #[cfg(feature = "tpool")]
744  #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
745  pub fn run_ro_thrd<F>(&self, f: F) -> Result<(), r2d2::Error>
746  where
747    F: FnOnce(&Connection) + Send + 'static
748  {
749    let Some(ref tpool) = self.tpool else {
750      panic!("ConnPool does to have a thread pool");
751    };
752
753    // Acquire a read-only connection from the pool and then run the provided
754    // closure on a thread from the thread pool.
755    let conn = self.reader()?;
756    tpool.execute(move || {
757      f(&conn);
758    });
759    Ok(())
760  }
761
762  /// Run a read-only database operation on a thread, allowing the caller to
763  /// receive the `Result<T, E>` of the supplied closure using a
764  /// one-shot channel.
765  ///
766  /// The supplied closure in `f` should return a `Result<T, E>` where the `Ok`
767  /// case will be passed as a "set" value through the `swctx` channel, and the
768  /// `Err` case will be passed as a "fail" value.
769  ///
770  /// # Errors
771  /// [`r2d2::Error`] is returned if it wasn't possible to acquire a read-only
772  /// connection from the connection pool.
773  ///
774  /// # Panics
775  /// A thread pool must be associated with the [`ConnPool`] or this method
776  /// will panic.
777  #[cfg(feature = "tpool")]
778  #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
779  pub fn run_ro_thrd_result<T, E, F>(
780    &self,
781    f: F
782  ) -> Result<swctx::WaitCtx<T, (), E>, r2d2::Error>
783  where
784    T: Send + 'static,
785    E: fmt::Debug + Send + 'static,
786    F: FnOnce(&Connection) -> Result<T, E> + Send + 'static
787  {
788    let Some(ref tpool) = self.tpool else {
789      panic!("ConnPool does to have a thread pool");
790    };
791
792    let conn = self.reader()?;
793
794    let (sctx, wctx) = swctx::mkpair();
795
796    // Ignore errors relating to pass the results back
797    tpool.execute(move || match f(&conn) {
798      Ok(t) => {
799        let _ = sctx.set(t);
800      }
801      Err(e) => {
802        let _ = sctx.fail(e);
803      }
804    });
805
806    Ok(wctx)
807  }
808}
809
810/// Read/Write connection processing.
811impl ConnPool {
812  /// Run a read/write database operation.
813  ///
814  /// # Errors
815  /// Returns an application-specific type `E` on error.
816  pub fn run_rw<T, E, F>(&self, f: F) -> Result<T, E>
817  where
818    T: Send + 'static,
819    E: fmt::Debug + Send + 'static,
820    F: FnOnce(&mut WrConn) -> Result<T, E> + Send + 'static
821  {
822    let mut conn = self.writer();
823    f(&mut conn)
824  }
825
826  /// Run a read/write database operation on a thread.
827  ///
828  /// The supplied closure should return an `Option<usize>`, where the `Some()`
829  /// case denotes the specified amount of "dirt" should be added to the write
830  /// connection.  `None` means no dirt should be added.
831  ///
832  /// # Panics
833  /// A thread pool must be associated with the [`ConnPool`] or this method
834  /// will panic.
835  #[cfg(feature = "tpool")]
836  #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
837  pub fn run_rw_thrd<F>(&self, f: F)
838  where
839    F: FnOnce(&mut WrConn) -> Option<usize> + Send + 'static
840  {
841    let Some(ref tpool) = self.tpool else {
842      panic!("ConnPool does to have a thread pool");
843    };
844
845    let mut conn = self.writer();
846    tpool.execute(move || {
847      let dirt = f(&mut conn);
848      if let Some(dirt) = dirt {
849        conn.add_dirt(dirt);
850      }
851    });
852  }
853
854  /// Run a read/write database operation on a thread, allowing the
855  /// caller to receive the `Result<T, E>` of the supplied closure using a
856  /// one-shot channel.
857  ///
858  /// The supplied closure in `f` should return a `Result<T, E>` where the `Ok`
859  /// case will be passed as a "set" value through the `swctx` channel, and the
860  /// `Err` case will be passed as a "fail" value.
861  ///
862  /// # Panics
863  /// A thread pool must be associated with the [`ConnPool`] or this method
864  /// will panic.
865  #[cfg(feature = "tpool")]
866  #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
867  pub fn run_rw_thrd_result<T, E, F>(&self, f: F) -> swctx::WaitCtx<T, (), E>
868  where
869    T: Send + 'static,
870    E: fmt::Debug + Send + 'static,
871    F: FnOnce(&mut WrConn) -> Result<T, E> + Send + 'static
872  {
873    let Some(ref tpool) = self.tpool else {
874      panic!("ConnPool does to have a thread pool");
875    };
876
877    let mut conn = self.writer();
878
879    let (sctx, wctx) = swctx::mkpair();
880
881    tpool.execute(move || match f(&mut conn) {
882      Ok(t) => {
883        let _ = sctx.set(t);
884      }
885      Err(e) => {
886        let _ = sctx.fail(e);
887      }
888    });
889
890    wctx
891  }
892}
893
894
895impl ConnPool {
896  #[cfg(feature = "tpool")]
897  #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
898  #[must_use]
899  pub fn incremental_vacuum(
900    &self,
901    n: Option<usize>
902  ) -> swctx::WaitCtx<(), (), rusqlite::Error> {
903    self.run_rw_thrd_result(move |conn| conn.incremental_vacuum(n))
904  }
905}
906
907// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :