#![doc = include_str!("../README.md")]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(
nonstandard_style,
rust_2018_idioms,
rustdoc::broken_intra_doc_links,
rustdoc::private_intra_doc_links
)]
#![forbid(non_ascii_idents, unsafe_code)]
#![warn(
deprecated_in_future,
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
unreachable_pub,
unused_import_braces,
unused_labels,
unused_lifetimes,
unused_qualifications,
unused_results
)]
#![allow(clippy::uninlined_format_args)]
mod config;
use std::sync::atomic::{AtomicU64, Ordering};
use apexbase::embedded::{ApexDB, Row};
use deadpool::managed::{RecycleError, RecycleResult};
use deadpool_sync::SyncWrapper;
pub use crate::config::{Config, ConfigError};
pub use apexbase;
pub use deadpool::managed::reexports::*;
pub use deadpool_sync::reexports::*;
// Invokes deadpool's `managed_reexports!` macro for this crate, passing (in
// order) the backend name, the manager type, the pooled object type, the
// backend error type and the configuration error type.
// NOTE(review): presumably this is what defines the `Object` alias used below
// (together with the usual pool-related aliases) — confirm against the macro
// documentation.
deadpool::managed_reexports!(
"apexbase",
Manager,
Object,
apexbase::ApexError,
ConfigError
);
/// Type alias for [`Object`].
pub type Connection = Object;
/// Manager that creates and recycles `ApexDB` connections for a pool.
///
/// It implements `deadpool::managed::Manager`, producing connections of type
/// `SyncWrapper<ApexDB>`.
#[derive(Debug)]
pub struct Manager {
// Clone of the configuration handed to `from_config`; read by `create`.
config: Config,
// Runtime passed to `SyncWrapper::new` whenever a connection is created.
runtime: Runtime,
// Incremented on every `recycle` call; the previous value names the
// temporary test table used for that health check.
test_query_count: AtomicU64,
}
impl Manager {
    /// Creates a new `Manager` from the given `config` and `runtime`.
    ///
    /// The configuration is cloned, so the caller keeps ownership of `config`.
    /// The internal recycle counter starts at zero.
    #[must_use]
    pub fn from_config(config: &Config, runtime: Runtime) -> Self {
        let config = config.clone();
        let test_query_count = AtomicU64::default();
        Self {
            config,
            runtime,
            test_query_count,
        }
    }
}
impl deadpool::managed::Manager for Manager {
    type Type = SyncWrapper<ApexDB>;
    type Error = apexbase::ApexError;

    /// Creates a new connection.
    ///
    /// Builds an `ApexDB` from the configured `path`, `durability` and
    /// `drop_if_exists` settings. The builder closure is handed to
    /// `SyncWrapper::new` together with the manager's runtime.
    ///
    /// # Errors
    ///
    /// Returns whatever error `SyncWrapper::new` yields for the builder
    /// closure.
    async fn create(&self) -> Result<Self::Type, Self::Error> {
        // Pull the settings out of `self` so the `move` closure owns them.
        let path = self.config.path.clone();
        let durability = self.config.durability;
        let drop_if_exists = self.config.drop_if_exists;
        SyncWrapper::new(self.runtime, move || {
            ApexDB::builder(&path)
                .durability(durability)
                .drop_if_exists(drop_if_exists)
                .build()
        })
        .await
    }

    /// Checks that a connection is still usable before it is handed out again.
    ///
    /// The check creates a uniquely named table `__deadpool_test_<n>`, inserts
    /// one row, verifies that the table reports exactly one row and drops the
    /// table again.
    ///
    /// Once the table has been created, the drop is attempted even if the
    /// insert, the count or the row-count comparison fails, so a failing check
    /// does not leave its test table behind.
    ///
    /// # Errors
    ///
    /// Returns a `RecycleError` message if the wrapper's mutex is poisoned, if
    /// any step of the check fails, or if `interact` itself fails. When both
    /// the check and the cleanup fail, the check's error is the one reported.
    async fn recycle(
        &self,
        conn: &mut Self::Type,
        _metrics: &Metrics,
    ) -> RecycleResult<Self::Error> {
        if conn.is_mutex_poisoned() {
            return Err(RecycleError::Message(
                "Mutex is poisoned. Connection is considered unusable.".into(),
            ));
        }
        // `Relaxed` is enough: the value is only used as a unique suffix and
        // `fetch_add` is atomic regardless of the ordering.
        let test_query_count = self.test_query_count.fetch_add(1, Ordering::Relaxed);
        conn.interact(move |apexdb| {
            let tbl_name = format!("__deadpool_test_{}", test_query_count);
            let table = apexdb
                .create_table(&tbl_name)
                .map_err(|e| RecycleError::message(format!("create: {}", e)))?;

            let mut row = Row::new();
            let _ = row.insert(
                "val".to_string(),
                apexbase::data::Value::UInt64(test_query_count),
            );

            // Collect the outcome of the probe instead of returning early, so
            // the table can be dropped on every path below.
            let probe = match table.insert(row) {
                Err(e) => Err(RecycleError::message(format!("insert: {}", e))),
                Ok(_) => match table.count() {
                    Err(e) => Err(RecycleError::message(format!("count: {}", e))),
                    Ok(cnt) if cnt != 1 => Err(RecycleError::Message(
                        "Expected 1 row in test table".into(),
                    )),
                    Ok(_) => Ok(()),
                },
            };

            // Always attempt the cleanup; the probe's error takes precedence
            // over a cleanup error.
            let dropped = apexdb.drop_table(&tbl_name);
            probe?;
            dropped.map_err(|e| RecycleError::message(format!("drop: {}", e)))?;
            Ok(())
        })
        .await
        .map_err(|e| RecycleError::Message(format!("interact error: {}", e).into()))?
    }
}