1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![deny(
4 nonstandard_style,
5 rust_2018_idioms,
6 rustdoc::broken_intra_doc_links,
7 rustdoc::private_intra_doc_links
8)]
9#![forbid(non_ascii_idents, unsafe_code)]
10#![warn(
11 deprecated_in_future,
12 missing_copy_implementations,
13 missing_debug_implementations,
14 missing_docs,
15 unreachable_pub,
16 unused_import_braces,
17 unused_labels,
18 unused_lifetimes,
19 unused_qualifications,
20 unused_results
21)]
22#![allow(clippy::uninlined_format_args)]
23
24mod config;
25
26use std::sync::atomic::{AtomicU64, Ordering};
27
28use apexbase::embedded::{ApexDB, Row};
29use deadpool::managed::{RecycleError, RecycleResult};
30use deadpool_sync::SyncWrapper;
31
32pub use crate::config::{Config, ConfigError};
33pub use apexbase;
34pub use deadpool::managed::reexports::*;
35pub use deadpool_sync::reexports::*;
36
37deadpool::managed_reexports!(
38 "apexbase",
39 Manager,
40 Object,
41 apexbase::ApexError,
42 ConfigError
43);
44
45pub type Connection = Object;
47
48#[derive(Debug)]
52pub struct Manager {
53 config: Config,
54 runtime: Runtime,
55 test_query_count: AtomicU64,
56}
57
58impl Manager {
59 #[must_use]
62 pub fn from_config(config: &Config, runtime: Runtime) -> Self {
63 Self {
64 config: config.clone(),
65 runtime,
66 test_query_count: AtomicU64::new(0),
67 }
68 }
69}
70
71impl deadpool::managed::Manager for Manager {
72 type Type = SyncWrapper<ApexDB>;
73 type Error = apexbase::ApexError;
74
75 async fn create(&self) -> Result<Self::Type, Self::Error> {
76 let path = self.config.path.clone();
77 let durability = self.config.durability;
78 let drop_if_exists = self.config.drop_if_exists;
79 SyncWrapper::new(self.runtime, move || {
80 ApexDB::builder(&path)
81 .durability(durability)
82 .drop_if_exists(drop_if_exists)
83 .build()
84 })
85 .await
86 }
87
88 async fn recycle(
89 &self,
90 conn: &mut Self::Type,
91 _metrics: &Metrics,
92 ) -> RecycleResult<Self::Error> {
93 if conn.is_mutex_poisoned() {
94 return Err(RecycleError::Message(
95 "Mutex is poisoned. Connection is considered unusable.".into(),
96 ));
97 }
98 let test_query_count = self.test_query_count.fetch_add(1, Ordering::Relaxed);
99 conn.interact(move |apexdb| {
100 let tbl_name = format!("__deadpool_test_{}", test_query_count);
103 let table = apexdb
104 .create_table(&tbl_name)
105 .map_err(|e| RecycleError::message(format!("create: {}", e)))?;
106 let mut r = Row::new();
107 let _ = r.insert(
108 "val".to_string(),
109 apexbase::data::Value::UInt64(test_query_count),
110 );
111 let _ = table
112 .insert(r)
113 .map_err(|e| RecycleError::message(format!("insert: {}", e)))?;
114 let cnt = table
115 .count()
116 .map_err(|e| RecycleError::message(format!("count: {}", e)))?;
117 if cnt != 1 {
118 return Err(RecycleError::Message(
119 "Expected 1 row in test table".into(),
120 ));
121 }
122 apexdb
123 .drop_table(&tbl_name)
124 .map_err(|e| RecycleError::message(format!("drop: {}", e)))?;
125 Ok(())
126 })
127 .await
128 .map_err(|e| RecycleError::Message(format!("interact error: {}", e).into()))?
129 }
130}