deadpool_libsql/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![deny(
4    nonstandard_style,
5    rust_2018_idioms,
6    rustdoc::broken_intra_doc_links,
7    rustdoc::private_intra_doc_links
8)]
9#![forbid(non_ascii_idents, unsafe_code)]
10#![warn(
11    deprecated_in_future,
12    missing_copy_implementations,
13    missing_debug_implementations,
14    missing_docs,
15    unreachable_pub,
16    unused_import_braces,
17    unused_labels,
18    unused_lifetimes,
19    unused_qualifications,
20    unused_results
21)]
22#![allow(clippy::uninlined_format_args)]
23
24use std::sync::atomic::{AtomicU64, Ordering};
25
26use deadpool::managed::{self, RecycleError};
27
28pub mod config;
29pub use config::Config;
30mod errors;
31
32pub use libsql;
33
34pub use deadpool::managed::reexports::*;
35pub use errors::ConnectionError;
36deadpool::managed_reexports!(
37    "libsql",
38    Manager,
39    Connection,
40    ConnectionError,
41    config::ConfigError
42);
43
44/// Type alias for ['Object']
45pub type Connection = managed::Object<Manager>;
46
47/// [`Manager`] for creating and recycling [`libsql::Connection`].
48///
49/// [`Manager`]: managed::Manager
50#[derive(Debug)]
51pub struct Manager {
52    database: libsql::Database,
53    test_query_count: AtomicU64,
54}
55
56impl Manager {
57    /// Creates a new [`Manager`] using the given [`libsql::Database`].
58    pub fn from_libsql_database(database: libsql::Database) -> Self {
59        Self {
60            database,
61            test_query_count: AtomicU64::new(0),
62        }
63    }
64
65    /// Creates a new [`Manager`] using the given [`config::Config`].
66    pub async fn from_config(config: Config) -> Result<Self, libsql::Error> {
67        config
68            .database
69            .libsql_database()
70            .await
71            .map(Self::from_libsql_database)
72    }
73
74    async fn run_test_query(&self, conn: &libsql::Connection) -> Result<(), ConnectionError> {
75        let test_query_count = self.test_query_count.fetch_add(1, Ordering::Relaxed);
76        // A call to the database to check that it is accessible
77        let row = conn
78            .query("SELECT ?", [test_query_count])
79            .await?
80            .next()
81            .await?
82            .ok_or(ConnectionError::TestQueryFailed(
83                "No rows returned from database for test query",
84            ))?;
85        let value: u64 = row.get(0)?;
86
87        if value == test_query_count {
88            Ok(())
89        } else {
90            Err(ConnectionError::TestQueryFailed(
91                "Unexpected value returned for test query",
92            ))
93        }
94    }
95}
96
97impl managed::Manager for Manager {
98    type Type = libsql::Connection;
99    type Error = ConnectionError;
100
101    async fn create(&self) -> Result<Self::Type, Self::Error> {
102        let conn = self.database.connect()?;
103        // Libsql establishes the database connection lazily. Thus the
104        // only way to check if the connection is in a useable state is
105        // to run a test query.
106        self.run_test_query(&conn).await?;
107        Ok(conn)
108    }
109
110    async fn recycle(
111        &self,
112        conn: &mut Self::Type,
113        _: &Metrics,
114    ) -> managed::RecycleResult<Self::Error> {
115        self.run_test_query(conn)
116            .await
117            .map_err(RecycleError::Backend)
118    }
119}