// deadpool-apexbase 0.1.0
//
// Dead simple pool for ApexBase embedded database
// Documentation
// Crate-level documentation is taken verbatim from the README one directory up.
#![doc = include_str!("../README.md")]
// Turns on the unstable `doc_cfg` feature only when the `docsrs` cfg is set
// (presumably by documentation builds — confirm against the build config).
#![cfg_attr(docsrs, feature(doc_cfg))]
// Lints in this group are hard errors.
#![deny(
    nonstandard_style,
    rust_2018_idioms,
    rustdoc::broken_intra_doc_links,
    rustdoc::private_intra_doc_links
)]
// `forbid` is stricter than `deny`: these lints cannot be re-allowed anywhere
// further down in the crate.
#![forbid(non_ascii_idents, unsafe_code)]
// Lints in this group are reported as warnings.
#![warn(
    deprecated_in_future,
    missing_copy_implementations,
    missing_debug_implementations,
    missing_docs,
    unreachable_pub,
    unused_import_braces,
    unused_labels,
    unused_lifetimes,
    unused_qualifications,
    unused_results
)]
// This crate writes positional format arguments (e.g. `format!("create: {}", e)`),
// so the Clippy lint asking for inlined arguments is silenced crate-wide.
#![allow(clippy::uninlined_format_args)]

mod config;

use std::sync::atomic::{AtomicU64, Ordering};

use apexbase::embedded::{ApexDB, Row};
use deadpool::managed::{RecycleError, RecycleResult};
use deadpool_sync::SyncWrapper;

pub use crate::config::{Config, ConfigError};
pub use apexbase;
pub use deadpool::managed::reexports::*;
pub use deadpool_sync::reexports::*;

// Expands deadpool's re-export macro for this crate, parameterised with the
// crate label, the manager type, the pooled object type, the backend error type
// and the configuration error type.
// NOTE(review): `Object` is not defined anywhere else in this file, so it is
// presumably one of the items this macro generates — confirm against the
// deadpool documentation.
deadpool::managed_reexports!(
    "apexbase",
    Manager,
    Object,
    apexbase::ApexError,
    ConfigError
);

/// Type alias for [`Object`], exposing the pooled object under the name
/// `Connection`.
pub type Connection = Object;

/// [`Manager`] for creating and recycling ApexBase [`ApexDB`] connections.
///
/// [`Manager`]: deadpool::managed::Manager
#[derive(Debug)]
pub struct Manager {
    /// Private copy of the configuration; its `path`, `durability` and
    /// `drop_if_exists` values are read every time a connection is created.
    config: Config,
    /// Runtime passed to `SyncWrapper::new` when a connection is created.
    runtime: Runtime,
    /// Counter incremented once per recycle; the previous value is used to name
    /// the temporary test table and is stored as the test row's value.
    test_query_count: AtomicU64,
}

impl Manager {
    /// Builds a [`Manager`] from `config` and `runtime`.
    ///
    /// The configuration is cloned, so the caller keeps ownership of its own
    /// [`Config`]. The recycle counter starts at zero.
    #[must_use]
    pub fn from_config(config: &Config, runtime: Runtime) -> Self {
        let config = config.clone();
        let test_query_count = AtomicU64::default();
        Self {
            config,
            runtime,
            test_query_count,
        }
    }
}

impl deadpool::managed::Manager for Manager {
    type Type = SyncWrapper<ApexDB>;
    type Error = apexbase::ApexError;

    /// Opens a new [`ApexDB`] and wraps it in a [`SyncWrapper`].
    ///
    /// The database is built from the configured `path`, `durability` and
    /// `drop_if_exists` values inside the closure handed to
    /// `SyncWrapper::new`, together with the manager's runtime.
    ///
    /// # Errors
    ///
    /// Returns the [`apexbase::ApexError`] produced while building the
    /// database.
    async fn create(&self) -> Result<Self::Type, Self::Error> {
        // The closure must own its inputs, so copy them out of `self` first.
        let path = self.config.path.clone();
        let durability = self.config.durability;
        // NOTE(review): `drop_if_exists` is forwarded on every call, i.e. for
        // every connection the pool opens — confirm this cannot wipe data that
        // other pooled connections are still using.
        let drop_if_exists = self.config.drop_if_exists;
        SyncWrapper::new(self.runtime, move || {
            ApexDB::builder(&path)
                .durability(durability)
                .drop_if_exists(drop_if_exists)
                .build()
        })
        .await
    }

    /// Checks that `conn` is still usable before it is handed out again.
    ///
    /// A connection whose mutex is poisoned is rejected immediately. Otherwise
    /// a uniquely named test table is created, one row is inserted, the row
    /// count is verified to be exactly one, and the table is dropped again.
    /// The table is dropped even when the insert or the count check fails, so
    /// a failed health check does not leave test tables behind.
    ///
    /// # Errors
    ///
    /// Returns a [`RecycleError`] message when the mutex is poisoned, when any
    /// step of the test query fails, or when the interaction itself fails. If
    /// both the test query and the cleanup fail, the test query error is the
    /// one reported.
    async fn recycle(
        &self,
        conn: &mut Self::Type,
        _metrics: &Metrics,
    ) -> RecycleResult<Self::Error> {
        if conn.is_mutex_poisoned() {
            return Err(RecycleError::Message(
                "Mutex is poisoned. Connection is considered unusable.".into(),
            ));
        }
        // The counter value makes the table name unique per recycle call of
        // this manager; `Relaxed` suffices because only uniqueness matters.
        let test_query_count = self.test_query_count.fetch_add(1, Ordering::Relaxed);
        conn.interact(move |apexdb| {
            // Run a test query to verify the connection is usable.
            // Create a temp table, insert a test value, then drop it.
            let tbl_name = format!("__deadpool_test_{}", test_query_count);
            let table = apexdb
                .create_table(&tbl_name)
                .map_err(|e| RecycleError::message(format!("create: {}", e)))?;

            let mut r = Row::new();
            let _ = r.insert(
                "val".to_string(),
                apexbase::data::Value::UInt64(test_query_count),
            );

            // Evaluate the probe into a value instead of returning early, so
            // the cleanup below runs no matter which step failed.
            let probe: RecycleResult<apexbase::ApexError> = table
                .insert(r)
                .map_err(|e| RecycleError::message(format!("insert: {}", e)))
                .and_then(|_| {
                    table
                        .count()
                        .map_err(|e| RecycleError::message(format!("count: {}", e)))
                })
                .and_then(|cnt| {
                    if cnt == 1 {
                        Ok(())
                    } else {
                        Err(RecycleError::Message(
                            "Expected 1 row in test table".into(),
                        ))
                    }
                });

            // Always attempt to remove the test table once it has been created.
            let dropped = apexdb.drop_table(&tbl_name);

            // A probe failure is the more informative error, so it wins over a
            // cleanup failure.
            probe?;
            dropped.map_err(|e| RecycleError::message(format!("drop: {}", e)))?;
            Ok(())
        })
        .await
        .map_err(|e| RecycleError::Message(format!("interact error: {}", e).into()))?
    }
}