Skip to main content

deadpool_citadeldb/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![deny(
4    nonstandard_style,
5    rust_2018_idioms,
6    rustdoc::broken_intra_doc_links,
7    rustdoc::private_intra_doc_links
8)]
9#![forbid(non_ascii_idents, unsafe_code)]
10#![warn(
11    deprecated_in_future,
12    missing_copy_implementations,
13    missing_debug_implementations,
14    missing_docs,
15    unreachable_pub,
16    unused_import_braces,
17    unused_labels,
18    unused_lifetimes,
19    unused_qualifications,
20    unused_results
21)]
22#![allow(clippy::uninlined_format_args)]
23
24use std::sync::atomic::{AtomicIsize, Ordering};
25
26use deadpool::managed::{self, RecycleError};
27use deadpool_sync::SyncWrapper;
28
29/// Configuration support.
30pub mod config;
31pub use config::{Config, ConfigError};
32
33pub use citadel;
34pub use citadel_sql;
35
36pub use deadpool::managed::reexports::*;
37pub use deadpool_sync::reexports::*;
38
39deadpool::managed_reexports!(
40    "citadeldb",
41    Manager,
42    managed::Object<Manager>,
43    Error,
44    ConfigError
45);
46
47/// Type alias for [`Object`]
48pub type Connection = Object;
49
50/// [`Manager`] for creating and recycling CitadelDB connections.
51///
52/// [`Manager`]: managed::Manager
53#[derive(Debug)]
54pub struct Manager {
55    db: &'static citadel::Database,
56    recycle_count: AtomicIsize,
57    runtime: Runtime,
58}
59
60impl Manager {
61    /// Creates a new [`Manager`] using the given [`Config`] backed by the
62    /// specified [`Runtime`].
63    #[must_use]
64    pub fn from_config(config: &Config, runtime: Runtime) -> Self {
65        Self {
66            db: Box::leak(Box::new(config.create_database().unwrap())),
67            recycle_count: AtomicIsize::new(0),
68            runtime,
69        }
70    }
71}
72
73/// Error type for [`deadpool-citadeldb`](crate).
74#[derive(Debug, thiserror::Error)]
75pub enum Error {
76    /// The error was reported by the CitadelDB storage engine.
77    #[error("CitadelDB error: {0}")]
78    Citadel(#[from] citadel::Error),
79    /// The test query was executed but the database returned
80    /// an unexpected response.
81    #[error("Test query failed: {0}")]
82    TestQueryFailed(String),
83}
84
85impl managed::Manager for Manager {
86    type Type = SyncWrapper<citadel_sql::Connection<'static>>;
87    type Error = citadel_sql::error::SqlError;
88
89    async fn create(&self) -> Result<Self::Type, Self::Error> {
90        // Box::leak gives us a 'static reference, satisfying SyncWrapper's T: 'static bound.
91        let db: &'static citadel::Database = self.db;
92        SyncWrapper::new(self.runtime, move || citadel_sql::Connection::open(db)).await
93    }
94
95    async fn recycle(
96        &self,
97        conn: &mut Self::Type,
98        _: &Metrics,
99    ) -> managed::RecycleResult<Self::Error> {
100        if conn.is_mutex_poisoned() {
101            return Err(RecycleError::Message(
102                "Mutex is poisoned. Connection is considered unusable.".into(),
103            ));
104        }
105        let recycle_count = self.recycle_count.fetch_add(1, Ordering::Relaxed);
106        let n: isize = conn
107            .interact(move |conn| {
108                let result = conn.query_params(
109                    "SELECT $1",
110                    &[citadel_sql::Value::Integer(recycle_count as i64)],
111                )?;
112                result
113                    .rows
114                    .first()
115                    .and_then(|row| row.first())
116                    .and_then(|val| match val {
117                        citadel_sql::Value::Integer(i) => Some(*i as isize),
118                        _ => None,
119                    })
120                    .ok_or_else(|| {
121                        citadel_sql::error::SqlError::InvalidValue("expected integer".into())
122                    })
123            })
124            .await
125            .map_err(|e| RecycleError::message(format!("{}", e)))??;
126        if n == recycle_count {
127            Ok(())
128        } else {
129            Err(RecycleError::message("Recycle count mismatch"))
130        }
131    }
132}