Skip to main content

deadpool_apexbase/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![deny(
4    nonstandard_style,
5    rust_2018_idioms,
6    rustdoc::broken_intra_doc_links,
7    rustdoc::private_intra_doc_links
8)]
9#![forbid(non_ascii_idents, unsafe_code)]
10#![warn(
11    deprecated_in_future,
12    missing_copy_implementations,
13    missing_debug_implementations,
14    missing_docs,
15    unreachable_pub,
16    unused_import_braces,
17    unused_labels,
18    unused_lifetimes,
19    unused_qualifications,
20    unused_results
21)]
22#![allow(clippy::uninlined_format_args)]
23
24mod config;
25
26use std::sync::atomic::{AtomicU64, Ordering};
27
28use apexbase::embedded::{ApexDB, Row};
29use deadpool::managed::{RecycleError, RecycleResult};
30use deadpool_sync::SyncWrapper;
31
32pub use crate::config::{Config, ConfigError};
33pub use apexbase;
34pub use deadpool::managed::reexports::*;
35pub use deadpool_sync::reexports::*;
36
37deadpool::managed_reexports!(
38    "apexbase",
39    Manager,
40    Object,
41    apexbase::ApexError,
42    ConfigError
43);
44
45/// Type alias for [`Object`]
46pub type Connection = Object;
47
48/// [`Manager`] for creating and recycling ApexBase [`ApexDB`] connections.
49///
50/// [`Manager`]: deadpool::managed::Manager
51#[derive(Debug)]
52pub struct Manager {
53    config: Config,
54    runtime: Runtime,
55    test_query_count: AtomicU64,
56}
57
58impl Manager {
59    /// Creates a new [`Manager`] using the given [`Config`] backed by the
60    /// specified [`Runtime`].
61    #[must_use]
62    pub fn from_config(config: &Config, runtime: Runtime) -> Self {
63        Self {
64            config: config.clone(),
65            runtime,
66            test_query_count: AtomicU64::new(0),
67        }
68    }
69}
70
71impl deadpool::managed::Manager for Manager {
72    type Type = SyncWrapper<ApexDB>;
73    type Error = apexbase::ApexError;
74
75    async fn create(&self) -> Result<Self::Type, Self::Error> {
76        let path = self.config.path.clone();
77        let durability = self.config.durability;
78        let drop_if_exists = self.config.drop_if_exists;
79        SyncWrapper::new(self.runtime, move || {
80            ApexDB::builder(&path)
81                .durability(durability)
82                .drop_if_exists(drop_if_exists)
83                .build()
84        })
85        .await
86    }
87
88    async fn recycle(
89        &self,
90        conn: &mut Self::Type,
91        _metrics: &Metrics,
92    ) -> RecycleResult<Self::Error> {
93        if conn.is_mutex_poisoned() {
94            return Err(RecycleError::Message(
95                "Mutex is poisoned. Connection is considered unusable.".into(),
96            ));
97        }
98        let test_query_count = self.test_query_count.fetch_add(1, Ordering::Relaxed);
99        conn.interact(move |apexdb| {
100            // Run a test query to verify the connection is usable.
101            // Create a temp table, insert a test value, then drop it.
102            let tbl_name = format!("__deadpool_test_{}", test_query_count);
103            let table = apexdb
104                .create_table(&tbl_name)
105                .map_err(|e| RecycleError::message(format!("create: {}", e)))?;
106            let mut r = Row::new();
107            let _ = r.insert(
108                "val".to_string(),
109                apexbase::data::Value::UInt64(test_query_count),
110            );
111            let _ = table
112                .insert(r)
113                .map_err(|e| RecycleError::message(format!("insert: {}", e)))?;
114            let cnt = table
115                .count()
116                .map_err(|e| RecycleError::message(format!("count: {}", e)))?;
117            if cnt != 1 {
118                return Err(RecycleError::Message(
119                    "Expected 1 row in test table".into(),
120                ));
121            }
122            apexdb
123                .drop_table(&tbl_name)
124                .map_err(|e| RecycleError::message(format!("drop: {}", e)))?;
125            Ok(())
126        })
127        .await
128        .map_err(|e| RecycleError::Message(format!("interact error: {}", e).into()))?
129    }
130}