sqlsrv/
lib.rs

1#![allow(clippy::doc_markdown)]
2
3//! A library for implementing an in-process SQLite database server.
4//!
5//! # Connection pooling
6//! sqlsrv implements connection pooling that reflects the concurrency model
7//! of SQLite:  It supports multiple parallel readers, but only one writer.
8//!
9//! # Thread pooling
10//! In addition to pooling connections, the library supports optionally using
//! a thread pool for dispatching database operations onto threads.
12//!
13//! # Incremental auto-clean
14//! The connection pool has built-in support for setting up incremental
15//! autovacuum, and can be configured to implicitly run incremental vacuuming.
16//!
17//! To use this feature, a "maximum dirt" value is configured on the connection
18//! pool.  Whenever the writer connection performs changes to the database it
19//! can add "dirt" to the connection.  When the writer connection is returned
20//! to the connection pool it checks to see if the amount of dirt is equal to
21//! or greater than the configured "maximum dirt" threshold.  If the threshold
22//! has been reached, an incremental autovacuum is performed.
23//!
24//! # Features
25//! | Feature  | Function
26//! |----------|----------
27//! | `tpool`  | Enable functions/methods that use a thread pool.
28
29#![cfg_attr(docsrs, feature(doc_cfg))]
30
31mod changehook;
32mod err;
33mod rawhook;
34mod wrconn;
35
36pub mod utils;
37
38use std::{
39  fmt, mem::ManuallyDrop, num::NonZeroUsize, path::Path, str::FromStr,
40  sync::Arc
41};
42
43use parking_lot::{Condvar, Mutex};
44
45use r2d2::{CustomizeConnection, PooledConnection};
46
47pub use r2d2_sqlite::SqliteConnectionManager;
48
49pub use r2d2;
50pub use rusqlite;
51
52use rusqlite::{Connection, OpenFlags, params};
53
54#[cfg(feature = "tpool")]
55use threadpool::ThreadPool;
56
57pub use changehook::ChangeLogHook;
58pub use err::Error;
59pub use rawhook::{Action, Hook};
60pub use wrconn::WrConn;
61
62
/// Wrapper around a SQL functions registration callback used to select which
/// connection types to perform registrations on.
///
/// The wrapped closure is handed the connection to register functions on and
/// reports failures as a [`rusqlite::Error`].  Values of this type are passed
/// to [`Builder::reg_scalar_fn()`] and [`Builder::reg_scalar_fn_r()`].
pub enum RegOn<F>
where
  F: Fn(&Connection) -> Result<(), rusqlite::Error>
{
  /// This registration callback should only be called for read-only
  /// connections.
  RO(F),

  /// This registration callback should only be called for the read/write
  /// connection.
  RW(F),

  /// This registration callback should be called for both the read-only and
  /// read/write connections.
  Both(F)
}
81
82
/// Type-erased SQL function registration callback.
///
/// The `Send + Sync` bounds are part of the trait object so that boxed
/// callbacks can be stored in values that are shared between threads.
type RegCb = dyn Fn(&Connection) -> Result<(), rusqlite::Error> + Send + Sync;
84
/// Internal, boxed counterpart of [`RegOn`].
///
/// Each variant carries a registration callback together with the kind of
/// connection it is to be invoked on.
enum CbType {
  /// Invoke on read-only connections only.
  Ro(Box<RegCb>),
  /// Invoke on the read/write connection only.
  Rw(Box<RegCb>),
  /// Invoke on both read-only and read/write connections.
  Both(Box<RegCb>)
}
90
91
/// Used to register application callbacks to set up database schema.
///
/// When a connection pool is built the callbacks are invoked on the writer
/// connection in this order: `init()`, `need_upgrade()` and, only if that
/// returned `Ok(true)`, `upgrade()`.  An error from any of them aborts the
/// build.
pub trait SchemaMgr {
  /// Called just after the writer connection has been created and is intended
  /// to perform database initialization (create tables, add predefined rows,
  /// etc).
  ///
  /// `newdb` will be `true` if the database file did not exist prior to
  /// initialization.
  ///
  /// While this method can be used to perform schema upgrades, there are two
  /// specialized methods (`need_upgrade()` and `upgrade()`) that can be used
  /// for this purpose instead.
  ///
  /// The default implementation does nothing but returns `Ok(())`.
  ///
  /// # Errors
  /// Application-specific error.
  #[allow(unused_variables)]
  fn init(&self, conn: &mut Connection, newdb: bool) -> Result<(), Error> {
    Ok(())
  }

  /// Application callback used to determine if the database schema is out of
  /// date and needs to be updated.
  ///
  /// Returning `Ok(true)` causes [`SchemaMgr::upgrade()`] to be called.
  ///
  /// The default implementation does nothing but returns `Ok(false)`.
  ///
  /// # Errors
  /// Application-specific error.
  #[allow(unused_variables)]
  fn need_upgrade(&self, conn: &Connection) -> Result<bool, Error> {
    Ok(false)
  }

  /// Upgrade the database schema.
  ///
  /// This is called if [`SchemaMgr::need_upgrade()`] returns `Ok(true)`.
  ///
  /// The default implementation does nothing but returns `Ok(())`.
  ///
  /// # Errors
  /// Application-specific error.
  #[allow(unused_variables)]
  fn upgrade(&self, conn: &mut Connection) -> Result<(), Error> {
    Ok(())
  }
}
139
140
/// Configuration for the incremental autoclean feature.
///
/// Created by [`Builder::incremental_autoclean()`] and handed over to the
/// connection pool's shared state when the pool is built.
#[derive(Clone, Debug)]
struct AutoClean {
  /// The amount of dirt that must have accumulated before an incremental
  /// vacuum is triggered.
  ///
  /// If this is set to `0` the incremental vacuum will be run at every
  /// opportunity.
  dirt_threshold: usize,

  /// The number of pages to process from the freelist each time an
  /// incremental autovacuum is triggered.
  ///
  /// If this is `None` then all pages will be processed.
  npages: Option<NonZeroUsize>
}
156
157
/// Connection customizer for the read-only connection pool.
///
/// An instance is installed on the r2d2 pool when the read-only pool is
/// created, and is used to set up each read-only connection.
struct RoConn {
  /// Registration callbacks supplied to the builder.  Only the entries
  /// intended for read-only connections are invoked by this type.
  regfuncs: Vec<CbType>
}
162
163impl fmt::Debug for RoConn {
164  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
165    write!(f, "RoConn {{}}")
166  }
167}
168
169
170impl CustomizeConnection<rusqlite::Connection, rusqlite::Error> for RoConn {
171  fn on_acquire(
172    &self,
173    conn: &mut rusqlite::Connection
174  ) -> Result<(), rusqlite::Error> {
175    conn.pragma_update(None, "foreign_keys", "ON")?;
176
177    for rf in &self.regfuncs {
178      match rf {
179        CbType::Ro(f) | CbType::Both(f) => {
180          f(conn)?;
181        }
182        CbType::Rw(_) => {}
183      }
184    }
185
186    Ok(())
187  }
188
189  fn on_release(&self, _conn: rusqlite::Connection) {}
190}
191
192
/// Builder for constructing a [`ConnPool`] object.
pub struct Builder {
  /// Application callbacks used to initialize/upgrade the schema.
  schmgr: Box<dyn SchemaMgr>,
  /// Run a full `VACUUM` while building the pool.
  full_vacuum: bool,
  /// Maximum size of the read-only connection pool.
  max_readers: usize,
  /// Incremental autoclean configuration, if enabled.
  autoclean: Option<AutoClean>,
  /// Optional "raw" update hook to install on the writer connection.
  hook: Option<Arc<dyn Hook + Send + Sync>>,
  /// SQL function registration callbacks; `None` until the first callback is
  /// added.
  regfuncs: Option<Vec<CbType>>,
  /// Optional thread pool passed on to the [`ConnPool`].
  #[cfg(feature = "tpool")]
  tpool: Option<Arc<ThreadPool>>
}
204
205/// Internal methods.
206impl Builder {
207  /// Open the writer connection.
208  fn open_writer(&self, fname: &Path) -> Result<Connection, rusqlite::Error> {
209    let conn = Connection::open(fname)?;
210    conn.pragma_update(None, "journal_mode", "WAL")?;
211    conn.pragma_update(None, "foreign_keys", "ON")?;
212
213    // Only enable incremental auto vacuum if a dirt watermark has been
214    // configured.
215    if self.autoclean.is_some() {
216      conn.pragma_update(None, "auto_vacuum", "INCREMENTAL")?;
217    }
218
219    Ok(conn)
220  }
221
222  /// Run a full vacuum.
223  ///
224  /// This is an internal function that may be called by `build()` if a full
225  /// vacuum has been requested.
226  fn full_vacuum(conn: &Connection) -> Result<(), rusqlite::Error> {
227    conn.execute("VACUUM;", params![])?;
228    Ok(())
229  }
230
231  fn create_ro_pool(
232    &self,
233    fname: &Path,
234    regfuncs: Vec<CbType>
235  ) -> Result<r2d2::Pool<SqliteConnectionManager>, r2d2::Error> {
236    let fl =
237      OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX;
238    let manager = SqliteConnectionManager::file(fname).with_flags(fl);
239    let roconn_initterm = RoConn { regfuncs };
240    let max_readers = u32::try_from(self.max_readers).unwrap();
241    r2d2::Pool::builder()
242      .max_size(max_readers)
243      .connection_customizer(Box::new(roconn_initterm))
244      .build(manager)
245  }
246}
247
248
249impl Builder {
  /// Create a new `Builder` for constructing a [`ConnPool`] object.
  ///
  /// `schmgr` supplies the application callbacks used to initialize and
  /// upgrade the database schema when the pool is built.
  ///
  /// Default to not run a full vacuum of the database on initialization and
  /// create 2 read-only connections for the pool.
  /// No incremental autoclean, update hook or SQL function registration
  /// callbacks are configured, and no workers thread pool will be used.
  #[must_use]
  pub fn new(schmgr: Box<dyn SchemaMgr>) -> Self {
    Self {
      schmgr,
      full_vacuum: false,
      max_readers: 2,
      autoclean: None,
      hook: None,
      regfuncs: None,
      #[cfg(feature = "tpool")]
      tpool: None
    }
  }
268
269  /// Trigger a full vacuum when initializing the connection pool.
270  ///
271  /// Operates on an owned `Builder` object.
272  #[must_use]
273  pub const fn init_vacuum(mut self) -> Self {
274    self.full_vacuum = true;
275    self
276  }
277
  /// Trigger a full vacuum when initializing the connection pool.
  ///
  /// Operates on a borrowed `Builder` object, and returns the same borrow to
  /// allow calls to be chained.
  pub const fn init_vacuum_r(&mut self) -> &mut Self {
    self.full_vacuum = true;
    self
  }
285
286  /// Set maximum number of readers in the connection pool.
287  ///
288  /// Operates on an owned `Builder` object.
289  #[must_use]
290  pub const fn max_readers(mut self, n: usize) -> Self {
291    self.max_readers = n;
292    self
293  }
294
  /// Set maximum number of readers in the connection pool.
  ///
  /// The value is converted to a `u32` when the pool is built; building will
  /// panic if `n` does not fit.
  ///
  /// Operates on a borrowed `Builder` object.
  pub const fn max_readers_r(&mut self, n: usize) -> &mut Self {
    self.max_readers = n;
    self
  }
302
303  /// Request that a "raw" update hook be added to the writer connection.
304  ///
305  /// Operates on an owned `Builder` object.
306  #[must_use]
307  pub fn hook(mut self, hook: Arc<dyn Hook + Send + Sync>) -> Self {
308    self.hook = Some(hook);
309    self
310  }
311
  /// Request that a "raw" update hook be added to the writer connection.
  ///
  /// Any previously requested hook is replaced.
  ///
  /// Operates on a borrowed `Builder` object.
  pub fn hook_r(&mut self, hook: Arc<dyn Hook + Send + Sync>) -> &mut Self {
    self.hook = Some(hook);
    self
  }
319
  /// Enable incremental autovacuum.
  ///
  /// `dirt_watermark` is used to set what amount of "dirt" is required in
  /// order to trigger an autoclean.  `npages` is the number of pages in the
  /// freelist to process each time the autoclean is run; `None` means all
  /// pages will be processed.
  ///
  /// Enabling this also causes the writer connection to be opened with
  /// `auto_vacuum` set to `INCREMENTAL`.
  #[must_use]
  pub const fn incremental_autoclean(
    mut self,
    dirt_watermark: usize,
    npages: Option<NonZeroUsize>
  ) -> Self {
    self.autoclean = Some(AutoClean {
      dirt_threshold: dirt_watermark,
      npages
    });
    self
  }
337
  /// Add a callback to register one or more scalar SQL functions.
  ///
  /// The closure should be wrapped in a `RegOn::RO()` if the function should
  /// only be registered on read-only connections.  `RegOn::RW()` is used to
  /// register the function on the read/write connection.  Use `RegOn::Both()`
  /// to register in both read-only and the read/write connection.
  ///
  /// Callbacks are invoked in the order they were added.  This is the same
  /// as [`Builder::reg_scalar_fn_r()`], but it operates on an owned
  /// `Builder`.
  #[must_use]
  pub fn reg_scalar_fn<F>(mut self, r: RegOn<F>) -> Self
  where
    F: Fn(&Connection) -> Result<(), rusqlite::Error> + Send + Sync + 'static
  {
    self.reg_scalar_fn_r(r);
    self
  }
352
353  /// Add a callback to register one or more scalar SQL functions.
354  ///
355  /// This is the same as [`Builder::reg_scalar_fn()`], but it operates on
356  /// `&mut Builder` rather than passing ownership.
357  pub fn reg_scalar_fn_r<F>(&mut self, r: RegOn<F>) -> &mut Self
358  where
359    F: Fn(&Connection) -> Result<(), rusqlite::Error> + Send + Sync + 'static
360  {
361    match r {
362      RegOn::RO(f) => {
363        self
364          .regfuncs
365          .get_or_insert(Vec::new())
366          .push(CbType::Ro(Box::new(f)));
367      }
368      RegOn::RW(f) => {
369        self
370          .regfuncs
371          .get_or_insert(Vec::new())
372          .push(CbType::Rw(Box::new(f)));
373      }
374      RegOn::Both(f) => {
375        self
376          .regfuncs
377          .get_or_insert(Vec::new())
378          .push(CbType::Both(Box::new(f)));
379      }
380    }
381    self
382  }
383
384  #[cfg(feature = "tpool")]
385  #[must_use]
386  pub fn thread_pool(mut self, tpool: Arc<ThreadPool>) -> Self {
387    self.tpool = Some(tpool);
388    self
389  }
390
  /// Associate a thread pool with the connection pool that will be built.
  ///
  /// Any previously set thread pool is replaced.
  ///
  /// Operates on a borrowed `Builder` object.
  #[cfg(feature = "tpool")]
  pub fn thread_pool_r(&mut self, tpool: Arc<ThreadPool>) -> &mut Self {
    self.tpool = Some(tpool);
    self
  }
396
  /// Construct a connection pool.
  ///
  /// The writer connection to `fname` is opened first (creating the database
  /// file if needed), then the read/write SQL function registration
  /// callbacks are run, the schema is initialized/upgraded through the
  /// [`SchemaMgr`], an optional full vacuum is performed, the raw update
  /// hook (if any) is installed and finally the read-only connection pool is
  /// created.
  ///
  /// # Errors
  /// [`Error::Sqlite`] will be returned if a database error occurred.
  /// Errors returned by the [`SchemaMgr`] callbacks and errors from creating
  /// the read-only pool are passed on to the caller as well.
  ///
  /// # Panics
  /// Panics if the configured maximum number of readers does not fit in a
  /// `u32`.
  pub fn build<P>(mut self, fname: P) -> Result<ConnPool, Error>
  where
    P: AsRef<Path>
  {
    // ToDo: Use std::path::absolute() once stabilized
    let fname = fname.as_ref();

    // Must be sampled before the writer is opened, since opening the writer
    // may create the file.
    let db_exists = fname.exists();

    //
    // Set up the read/write connection
    //
    // This must be done before creating the read-only connection pool, because
    // at that point the database file must already exist.
    //
    let mut conn = self.open_writer(fname)?;

    //
    // Register read/write connection functions
    //

    // Option<Vec<T>>  -->  Vec<T>
    let regfuncs = self.regfuncs.take().unwrap_or_default();

    // Call SQL function registration callbacks for read/write connection.
    for rf in &regfuncs {
      match rf {
        CbType::Rw(f) | CbType::Both(f) => {
          f(&conn)?;
        }
        CbType::Ro(_) => {}
      }
    }

    //
    // Perform schema initialization.
    //
    // This must be done after auto_vacuum is set, because auto_vacuum requires
    // configuration before any tables have been created.
    // See: https://www.sqlite.org/pragma.html#pragma_auto_vacuum
    //
    self.schmgr.init(&mut conn, !db_exists)?;
    if self.schmgr.need_upgrade(&conn)? {
      self.schmgr.upgrade(&mut conn)?;
    }

    //
    // Perform a full vacuum if requested to do so.
    //
    if self.full_vacuum {
      Self::full_vacuum(&conn)?;
    }

    //
    // Register a callback hook
    //
    if let Some(ref hook) = self.hook {
      rawhook::hook(&conn, hook);
    }

    //
    // Set up connection pool for read-only connections.
    //
    // The complete callback list is handed over; the read-only customizer
    // skips the entries that are meant for the writer only.
    //
    let rpool = self.create_ro_pool(fname, regfuncs)?;

    //
    // Prepare shared data
    //
    // The writer starts out parked in the shared state, with no dirt.
    //
    let iconn = InnerWrConn { conn, dirt: 0 };
    let inner = Inner { conn: Some(iconn) };
    let sh = Arc::new(Shared {
      inner: Mutex::new(inner),
      signal: Condvar::new(),
      autoclean: self.autoclean.clone()
    });

    Ok(ConnPool {
      rpool,
      sh,
      #[cfg(feature = "tpool")]
      tpool: self.tpool
    })
  }
483
484
485  /// Construct a connection pool.
486  ///
487  /// Same as [`Builder::build()`], but register a change log callback on the
488  /// writer as well.
489  ///
490  /// This method should not be called if the application has requested to add
491  /// a raw update hook.
492  ///
493  /// # Errors
494  /// [`Error::Sqlite`] is returned for database errors.
495  ///
496  /// # Panics
497  /// This method will panic if a hook has been added to the Builder.
498  pub fn build_with_changelog_hook<P, D, T>(
499    mut self,
500    fname: P,
501    hook: Box<dyn ChangeLogHook<Database = D, Table = T> + Send>
502  ) -> Result<ConnPool, Error>
503  where
504    P: AsRef<Path>,
505    D: FromStr + Send + Sized + 'static,
506    T: FromStr + Send + Sized + 'static
507  {
508    assert!(
509      self.hook.is_some(),
510      "Can't build a connection pool with both a raw and changelog hook"
511    );
512
513    // ToDo: Use std::path::absolute() once stabilized
514    let fname = fname.as_ref();
515    let db_exists = fname.exists();
516
517    //
518    // Set up the read/write connection
519    //
520    // This must be done before creating the read-only connection pool, because
521    // at that point the database file must already exist.
522    //
523    let mut conn = self.open_writer(fname)?;
524
525    //
526    // Register read/write connection functions
527    //
528
529    // Option<Vec<T>>  -->  Vec<T>
530    let regfuncs = self.regfuncs.take().unwrap_or_default();
531
532    // Call SQL function registration callbacks for read/write connection.
533    for rf in &regfuncs {
534      match rf {
535        CbType::Rw(f) | CbType::Both(f) => {
536          f(&conn)?;
537        }
538        CbType::Ro(_) => {}
539      }
540    }
541
542
543    //
544    // Perform schema initialization.
545    //
546    // This must be done after auto_vacuum is set, because auto_vacuum requires
547    // configuration before any tables have been created.
548    // See: https://www.sqlite.org/pragma.html#pragma_auto_vacuum
549    //
550    self.schmgr.init(&mut conn, !db_exists)?;
551    if self.schmgr.need_upgrade(&conn)? {
552      self.schmgr.upgrade(&mut conn)?;
553    }
554
555    //
556    // Perform a full vacuum if requested to do so.
557    //
558    if self.full_vacuum {
559      Self::full_vacuum(&conn)?;
560    }
561
562    //
563    // Register a callback hook
564    //
565    changehook::hook(&conn, hook);
566
567    //
568    // Set up connection pool for read-only connections.
569    //
570    let rpool = self.create_ro_pool(fname, regfuncs)?;
571
572    //
573    // Prepare shared data
574    //
575    let iconn = InnerWrConn { conn, dirt: 0 };
576    let inner = Inner { conn: Some(iconn) };
577    let sh = Arc::new(Shared {
578      inner: Mutex::new(inner),
579      signal: Condvar::new(),
580      autoclean: self.autoclean.clone()
581    });
582
583    Ok(ConnPool {
584      rpool,
585      sh,
586      #[cfg(feature = "tpool")]
587      tpool: self.tpool
588    })
589  }
590}
591
592
/// Inner writer connection object.
///
/// When the writer is acquired from the connection pool it passes an instance
/// of this struct to the `WrConn` object.
struct InnerWrConn {
  /// The writer connection.
  conn: Connection,

  /// Amount of accumulated dirt.
  ///
  /// Starts out at `0` when the pool is built.  Only used when the autoclean
  /// feature is used.
  dirt: usize
}
606
607
/// Mutex-protected part of the connection pool's shared state.
struct Inner {
  /// The writer connection.
  ///
  /// This is `None` when a `WrConn` exists, and is set to `Some()` by
  /// `WrConn`'s Drop implementation.
  conn: Option<InnerWrConn>
}
615
/// State shared between a [`ConnPool`], its clones and the `WrConn` handed
/// out by it.
struct Shared {
  /// Slot holding the writer connection while it is not checked out.
  inner: Mutex<Inner>,
  /// Condition variable that threads waiting for the writer block on.
  signal: Condvar,
  /// Incremental autoclean configuration, if enabled on the builder.
  autoclean: Option<AutoClean>
}
621
622
/// SQLite connection pool.
///
/// This is a specialized connection pool that is defined specifically for
/// sqlite, and only allows a single writer but multiple readers.
// Note:  In Rust the drop order of struct fields is in the order of
//        declaration.  If the writer is dropped before the readers, sqlite
//        will not clean up its wal files when the writer closes (presumably
//        because the readers are keeping the files locked).  Therefore it is
//        important that the r2d2 connection pool is declared before the
//        `Shared` buffer (since it contains the writer).
#[derive(Clone)]
pub struct ConnPool {
  /// Pool of read-only connections.
  rpool: r2d2::Pool<SqliteConnectionManager>,
  /// Shared state containing the single writer connection.
  sh: Arc<Shared>,
  /// Optional thread pool used by the `*_thrd*` methods.
  #[cfg(feature = "tpool")]
  tpool: Option<Arc<ThreadPool>>
}
640
641impl ConnPool {
642  /// Return the pool size.
643  ///
644  /// In effect, this is the size of the read-only pool plus one (for the
645  /// read/write connection).
646  #[must_use]
647  pub fn size(&self) -> usize {
648    (self.rpool.max_size() + 1) as usize
649  }
650
  /// Acquire a read-only connection.
  ///
  /// The connection is taken from the r2d2 pool of read-only connections.
  ///
  /// # Errors
  /// [`r2d2::Error`] will be returned if a read-only connection could not be
  /// acquired.
  pub fn reader(
    &self
  ) -> Result<PooledConnection<SqliteConnectionManager>, r2d2::Error> {
    self.rpool.get()
  }
661
662  /// Acquire the read/write connection.
663  ///
664  /// If the writer is already taken, then block and wait for it to become
665  /// available.
666  #[must_use]
667  #[allow(clippy::significant_drop_tightening)]
668  pub fn writer(&self) -> WrConn {
669    let mut g = self.sh.inner.lock();
670    let conn = loop {
671      if let Some(conn) = g.conn.take() {
672        break conn;
673      }
674      self.sh.signal.wait(&mut g);
675    };
676
677    WrConn {
678      sh: Arc::clone(&self.sh),
679      inner: ManuallyDrop::new(conn)
680    }
681  }
682
683  /// Attempt to acquire the writer connection.
684  ///
685  /// Returns `Some(conn)` if the writer connection was available at the time
686  /// of the request.  Returns `None` if the writer has already been taken.
687  #[must_use]
688  pub fn try_writer(&self) -> Option<WrConn> {
689    let conn = self.sh.inner.lock().conn.take()?;
690    Some(WrConn {
691      sh: Arc::clone(&self.sh),
692      inner: ManuallyDrop::new(conn)
693    })
694  }
695}
696
697
698/// Special queries.
699impl ConnPool {
700  /// Return the number of unused pages.
701  ///
702  /// # Errors
703  /// [`Error::R2D2`] indicates that it wasn't possible to acquire a read-only
704  /// connection from the connection pool.  [`Error::Sqlite`] means it was not
705  /// possible to query the free page list count.
706  pub fn freelist_count(&self) -> Result<usize, Error> {
707    Ok(self.reader()?.query_row_and_then(
708      "PRAGMA freelist_count;'",
709      [],
710      |row| row.get(0)
711    )?)
712  }
713}
714
715
716/// Read-only connection processing.
717impl ConnPool {
718  /// Run a read-only database operation.
719  ///
720  /// # Errors
721  /// The error type `E` is used to return application-defined errors, though
722  /// it must be possible to convert a `r2d2::Error` into `E` using the `From`
723  /// trait.
724  pub fn run_ro<T, F, E>(&self, f: F) -> Result<T, E>
725  where
726    T: Send + 'static,
727    F: FnOnce(&Connection) -> Result<T, E> + Send + 'static,
728    E: From<r2d2::Error>
729  {
730    // Acquire a read-only connection from the pool
731    let conn = self.reader()?;
732
733    // Run caller-provided closure.
734    f(&conn)
735  }
736
737  /// Run a read-only database operation on a thread.
738  ///
739  /// # Errors
740  /// [`r2d2::Error`] is returned if it wasn't possible to acquire a read-only
741  /// connection from the connection pool.
742  ///
743  /// # Panics
744  /// A thread pool must be associated with the [`ConnPool`] or this method
745  /// will panic.
746  #[cfg(feature = "tpool")]
747  #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
748  pub fn run_ro_thrd<F>(&self, f: F) -> Result<(), r2d2::Error>
749  where
750    F: FnOnce(&Connection) + Send + 'static
751  {
752    let Some(ref tpool) = self.tpool else {
753      panic!("ConnPool does to have a thread pool");
754    };
755
756    // Acquire a read-only connection from the pool and then run the provided
757    // closure on a thread from the thread pool.
758    let conn = self.reader()?;
759    tpool.execute(move || {
760      f(&conn);
761    });
762    Ok(())
763  }
764
765  /// Run a read-only database operation on a thread, allowing the caller to
766  /// receive the `Result<T, E>` of the supplied closure using a
767  /// one-shot channel.
768  ///
769  /// The supplied closure in `f` should return a `Result<T, E>` where the `Ok`
770  /// case will be passed as a "set" value through the `swctx` channel, and the
771  /// `Err` case will be passed as a "fail" value.
772  ///
773  /// # Errors
774  /// [`r2d2::Error`] is returned if it wasn't possible to acquire a read-only
775  /// connection from the connection pool.
776  ///
777  /// # Panics
778  /// A thread pool must be associated with the [`ConnPool`] or this method
779  /// will panic.
780  #[cfg(feature = "tpool")]
781  #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
782  pub fn run_ro_thrd_result<T, E, F>(
783    &self,
784    f: F
785  ) -> Result<swctx::WaitCtx<T, (), E>, r2d2::Error>
786  where
787    T: Send + 'static,
788    E: fmt::Debug + Send + 'static,
789    F: FnOnce(&Connection) -> Result<T, E> + Send + 'static
790  {
791    let Some(ref tpool) = self.tpool else {
792      panic!("ConnPool does to have a thread pool");
793    };
794
795    let conn = self.reader()?;
796
797    let (sctx, wctx) = swctx::mkpair();
798
799    // Ignore errors relating to pass the results back
800    tpool.execute(move || match f(&conn) {
801      Ok(t) => {
802        let _ = sctx.set(t);
803      }
804      Err(e) => {
805        let _ = sctx.fail(e);
806      }
807    });
808
809    Ok(wctx)
810  }
811}
812
813/// Read/Write connection processing.
814impl ConnPool {
815  /// Run a read/write database operation.
816  ///
817  /// # Errors
818  /// Returns an application-specific type `E` on error.
819  pub fn run_rw<T, E, F>(&self, f: F) -> Result<T, E>
820  where
821    T: Send + 'static,
822    E: fmt::Debug + Send + 'static,
823    F: FnOnce(&mut WrConn) -> Result<T, E> + Send + 'static
824  {
825    let mut conn = self.writer();
826    f(&mut conn)
827  }
828
829  /// Run a read/write database operation on a thread.
830  ///
831  /// The supplied closure should return an `Option<usize>`, where the `Some()`
832  /// case denotes the specified amount of "dirt" should be added to the write
833  /// connection.  `None` means no dirt should be added.
834  ///
835  /// # Panics
836  /// A thread pool must be associated with the [`ConnPool`] or this method
837  /// will panic.
838  #[cfg(feature = "tpool")]
839  #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
840  pub fn run_rw_thrd<F>(&self, f: F)
841  where
842    F: FnOnce(&mut WrConn) -> Option<usize> + Send + 'static
843  {
844    let Some(ref tpool) = self.tpool else {
845      panic!("ConnPool does to have a thread pool");
846    };
847
848    let mut conn = self.writer();
849    tpool.execute(move || {
850      let dirt = f(&mut conn);
851      if let Some(dirt) = dirt {
852        conn.add_dirt(dirt);
853      }
854    });
855  }
856
857  /// Run a read/write database operation on a thread, allowing the
858  /// caller to receive the `Result<T, E>` of the supplied closure using a
859  /// one-shot channel.
860  ///
861  /// The supplied closure in `f` should return a `Result<T, E>` where the `Ok`
862  /// case will be passed as a "set" value through the `swctx` channel, and the
863  /// `Err` case will be passed as a "fail" value.
864  ///
865  /// # Panics
866  /// A thread pool must be associated with the [`ConnPool`] or this method
867  /// will panic.
868  #[cfg(feature = "tpool")]
869  #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
870  pub fn run_rw_thrd_result<T, E, F>(&self, f: F) -> swctx::WaitCtx<T, (), E>
871  where
872    T: Send + 'static,
873    E: fmt::Debug + Send + 'static,
874    F: FnOnce(&mut WrConn) -> Result<T, E> + Send + 'static
875  {
876    let Some(ref tpool) = self.tpool else {
877      panic!("ConnPool does to have a thread pool");
878    };
879
880    let mut conn = self.writer();
881
882    let (sctx, wctx) = swctx::mkpair();
883
884    tpool.execute(move || match f(&mut conn) {
885      Ok(t) => {
886        let _ = sctx.set(t);
887      }
888      Err(e) => {
889        let _ = sctx.fail(e);
890      }
891    });
892
893    wctx
894  }
895}
896
897
impl ConnPool {
  /// Run an incremental vacuum on the writer connection, using the thread
  /// pool.
  ///
  /// The request is dispatched through [`ConnPool::run_rw_thrd_result()`],
  /// which calls `WrConn::incremental_vacuum(n)` on a worker thread.  The
  /// outcome is delivered through the returned wait context.
  ///
  /// NOTE(review): `n` is presumably the maximum number of freelist pages to
  /// process, with `None` meaning all of them -- confirm against
  /// `WrConn::incremental_vacuum()`.
  ///
  /// # Panics
  /// A thread pool must be associated with the [`ConnPool`] or this method
  /// will panic.
  #[cfg(feature = "tpool")]
  #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))]
  #[must_use]
  pub fn incremental_vacuum(
    &self,
    n: Option<usize>
  ) -> swctx::WaitCtx<(), (), rusqlite::Error> {
    self.run_rw_thrd_result(move |conn| conn.incremental_vacuum(n))
  }
}
909
910// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :