#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
#![cfg_attr(not(feature = "std"), no_std)]
#![no_builtins]
#![warn(
clippy::all,
clippy::nursery,
clippy::pedantic,
clippy::cargo,
)]
#![allow(
clippy::cargo_common_metadata,
clippy::module_name_repetitions,
clippy::multiple_crate_versions,
)]
use lock_pool::{LockGuard, LockPool, maybe_await};
use core::future::Future;
use core::mem::MaybeUninit;
use core::ops::{Deref, DerefMut};
/// Value supplied as the final const-generic argument of every [`LockPool`] and
/// [`LockGuard`] this crate instantiates.
///
/// NOTE(review): the name suggests it caps how many tasks may queue for a slot
/// at once — confirm against the `lock_pool` documentation.
const MAX_WAITERS: usize = 32;
/// A connection type that [`Pool`] can create on demand and health-check
/// before handing it out.
pub trait Connection: Sized + Send + Sync {
/// Shared state owned by the pool and passed by reference to every method
/// of this trait.
type Context: Send + Sync;
/// Error produced when creating or health-checking a connection.
type Error: Send + Sync;
/// Opens a new connection.
///
/// The pool calls this when the slot it acquired is empty, or when the
/// connection in that slot failed its health check.
fn create(ctx: &Self::Context) -> impl Future<Output = Result<Self, Self::Error>> + Send + Sync;
/// Reports whether [`Connection::health_check`] should run before this
/// connection is handed out. The pool skips the check when this returns
/// `false`.
fn needs_health_check(&mut self, ctx: &Self::Context) -> bool;
/// Verifies that the connection is still usable.
///
/// When this returns `Err`, the pool replaces the connection with a new one
/// obtained from [`Connection::create`].
fn health_check(&mut self, ctx: &Self::Context) -> impl Future<Output = Result<(), Self::Error>> + Send + Sync;
}
/// Shorthand for the [`LockGuard`] over a [`LazyConn`] slot, with the waiter
/// parameter fixed to [`MAX_WAITERS`].
type Guard<'pool, Conn, const SIZE: usize> = LockGuard<'pool, LazyConn<Conn>, SIZE, MAX_WAITERS>;
/// A pool slot that may or may not currently hold a connection.
///
/// This is a thin wrapper around `Option<Conn>`. `Send` and `Sync` are derived
/// automatically from `Conn` through that field, so no `unsafe impl` is
/// required for either auto trait.
#[doc(hidden)]
pub struct LazyConn<Conn> {
    conn: Option<Conn>,
}

impl<Conn> Default for LazyConn<Conn> {
    /// Returns an empty slot.
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}

impl<Conn> LazyConn<Conn> {
    /// Creates an empty slot.
    #[must_use]
    const fn new() -> Self {
        Self { conn: None }
    }

    /// Creates a slot that already holds `conn`.
    const fn init(conn: Conn) -> Self {
        Self { conn: Some(conn) }
    }

    /// Borrows the stored connection, or returns `None` when the slot is empty.
    #[inline]
    pub const fn get(&self) -> Option<&Conn> {
        self.conn.as_ref()
    }

    /// Mutably borrows the stored connection, or returns `None` when the slot
    /// is empty.
    #[inline]
    pub fn get_mut(&mut self) -> Option<&mut Conn> {
        self.conn.as_mut()
    }

    /// Stores `new` in the slot and returns whatever it held before.
    #[inline]
    pub fn replace(&mut self, new: Conn) -> Option<Conn> {
        self.conn.replace(new)
    }

    /// Drops the stored connection, if any, leaving the slot empty.
    #[inline]
    pub fn expire(&mut self) {
        self.conn = None;
    }
}
/// A held pool slot whose [`LazyConn`] is relied upon to contain a connection.
///
/// The `Deref`/`DerefMut`/`AsRef`/`AsMut` impls unwrap the slot without
/// checking, so this type must only be built from a guard whose slot is filled.
#[doc(hidden)]
#[repr(transparent)]
pub struct LiveConn<'pool, Conn, const SIZE: usize> {
// Guard over the slot; kept for as long as the connection is borrowed.
guard: Guard<'pool, Conn, SIZE>
}
impl<'pool, Conn, const SIZE: usize> LiveConn<'pool, Conn, SIZE> {
/// Wraps a slot guard so it can be dereferenced straight to the connection.
///
/// # Safety
///
/// The slot behind `g` must currently hold a connection. The accessor impls
/// on this type call `unwrap_unchecked` on the slot, so an empty slot is
/// undefined behaviour. The precondition is only verified in debug builds.
#[inline]
#[must_use]
pub unsafe fn new(g: Guard<'pool, Conn, SIZE>) -> Self {
// Debug-only check of the caller's obligation; compiled out in release.
debug_assert!(g.conn.is_some());
Self { guard: g }
}
}
impl<'pool, Conn, const SIZE: usize> Deref for LiveConn<'pool, Conn, SIZE> {
    type Target = Conn;

    /// Borrows the connection stored in the guarded slot.
    #[inline]
    fn deref(&self) -> &Self::Target {
        let Some(live) = self.guard.conn.as_ref() else {
            // SAFETY: the contract of `LiveConn::new` requires the slot to
            // hold a connection, so the empty case cannot be reached.
            unsafe { core::hint::unreachable_unchecked() }
        };
        live
    }
}
impl<'pool, Conn, const SIZE: usize> DerefMut for LiveConn<'pool, Conn, SIZE> {
    /// Mutably borrows the connection stored in the guarded slot.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Some(live) = self.guard.conn.as_mut() else {
            // SAFETY: the contract of `LiveConn::new` requires the slot to
            // hold a connection, so the empty case cannot be reached.
            unsafe { core::hint::unreachable_unchecked() }
        };
        live
    }
}
impl<'pool, Conn, const SIZE: usize> AsRef<Conn> for LiveConn<'pool, Conn, SIZE> {
    /// Borrows the connection; same result as dereferencing the wrapper.
    #[inline]
    fn as_ref(&self) -> &Conn {
        // Goes through the `Deref` impl, which performs the unchecked unwrap
        // of the slot.
        &**self
    }
}
impl<'pool, Conn, const SIZE: usize> AsMut<Conn> for LiveConn<'pool, Conn, SIZE> {
    /// Mutably borrows the connection; same result as mutably dereferencing
    /// the wrapper.
    #[inline]
    fn as_mut(&mut self) -> &mut Conn {
        // Goes through the `DerefMut` impl, which performs the unchecked
        // unwrap of the slot.
        &mut **self
    }
}
/// A pool of `SIZE` connection slots.
///
/// Slots are filled with [`Connection::create`] when they are found empty and
/// health-checked through the [`Connection`] trait before being handed out.
pub struct Pool<Conn: Connection, const SIZE: usize> {
// Slot storage and locking, delegated to `lock_pool`.
pool: LockPool<LazyConn<Conn>, SIZE, MAX_WAITERS>,
// Passed by reference to every `Connection` method the pool calls.
ctx: Conn::Context
}
impl<Conn, const SIZE: usize> Default for Pool<Conn, SIZE>
where
    Conn: Connection,
    Conn::Context: Default,
{
    /// Builds a pool with empty slots around a default-constructed context.
    fn default() -> Self {
        let ctx = <Conn::Context as Default>::default();
        Self::new(ctx)
    }
}
impl<Conn: Connection + Send + Sync, const SIZE: usize> Pool<Conn, SIZE> {
#[must_use] #[inline]
pub fn new(ctx: Conn::Context) -> Self {
Self {
pool: LockPool::new(),
ctx
}
}
pub async fn new_eager<Fut, F, E>(ctx: Conn::Context, f: F) -> Result<Self, E>
where
Fut: Future<Output = Result<Conn, E>> + Send + Sync,
F: Fn(usize) -> Fut + Send + Sync,
E: Send + Sync
{
let mut temp: [MaybeUninit<LazyConn<Conn>>; SIZE] = unsafe {
MaybeUninit::uninit().assume_init()
};
for i in 0..SIZE {
match f(i).await {
Ok(conn) => {
let slot = unsafe { temp.get_unchecked_mut(i) };
slot.write(LazyConn::init(conn))
},
Err(err) if core::mem::needs_drop::<Conn>() => {
for needs_drop in &mut temp[0..i] {
unsafe { needs_drop.assume_init_drop() }
}
return Err(err);
}
Err(err) => return Err(err)
};
}
Ok(Self {
pool: LockPool::from(unsafe {
core::mem::transmute_copy::<[MaybeUninit<LazyConn<Conn>>; SIZE], [LazyConn<Conn>; SIZE]>(&temp)
}),
ctx
})
}
pub async fn insert(&self, conn: Conn) -> Option<Conn> {
let mut maybe_conn = maybe_await!(self.pool.get());
maybe_conn.replace(conn)
}
pub async fn get(&self) -> Result<LiveConn<Conn, SIZE>, Conn::Error> {
let mut maybe_conn = maybe_await!(self.pool.get());
if let Some(conn) = maybe_conn.get_mut() {
if conn.needs_health_check(&self.ctx) && conn.health_check(&self.ctx).await.is_err() {
*maybe_conn = LazyConn::init(Conn::create(&self.ctx).await?);
}
} else {
*maybe_conn = LazyConn::init(Conn::create(&self.ctx).await?);
}
Ok(unsafe { LiveConn::new(maybe_conn) })
}
pub async fn try_get(&self) -> Option<Result<LiveConn<Conn, SIZE>, Conn::Error>> {
let mut maybe_conn = self.pool.try_get()?;
if let Some(conn) = maybe_conn.get_mut() {
if conn.needs_health_check(&self.ctx) && conn.health_check(&self.ctx).await.is_err() {
match Conn::create(&self.ctx).await {
Ok(conn) => *maybe_conn = LazyConn::init(conn),
Err(e) => return Some(Err(e))
};
}
} else {
match Conn::create(&self.ctx).await {
Ok(conn) => *maybe_conn = LazyConn::init(conn),
Err(e) => return Some(Err(e))
}
}
Some(Ok(unsafe { LiveConn::new(maybe_conn) }))
}
#[inline]
pub async fn get_or_create(&self) -> Result<FreshOrPooled<Conn, SIZE>, Conn::Error> {
match self.try_get().await {
Some(res) => res.map(FreshOrPooled::Pooled),
None => Conn::create(&self.ctx).await.map(FreshOrPooled::Fresh)
}
}
}
/// The connection returned by [`Pool::get_or_create`]: either one created just
/// for the caller or one borrowed from a pool slot.
pub enum FreshOrPooled<'pool, Conn, const SIZE: usize> {
/// A connection created outside the pool and owned by this value.
Fresh(Conn),
/// A connection held in a pool slot, accessed through its guard.
Pooled(LiveConn<'pool, Conn, SIZE>)
}
// Evaluates the same expression for either `FreshOrPooled` variant.
//
// `$binding` names the variant's payload inside `$body`. The body is expanded
// once per arm, so it must type-check for both payload types.
macro_rules! match_fresh_or_pooled {
    ($value:expr, |$binding:ident| $body:expr) => {
        match $value {
            $crate::FreshOrPooled::Fresh($binding) => $body,
            $crate::FreshOrPooled::Pooled($binding) => $body,
        }
    };
}
impl<'pool, Conn, const SIZE: usize> Deref for FreshOrPooled<'pool, Conn, SIZE> {
type Target = Conn;
/// Borrows the connection regardless of which variant holds it.
#[inline]
fn deref(&self) -> &Self::Target {
// The `Fresh` arm yields `&Conn` directly; the `Pooled` arm yields
// `&LiveConn`, which is coerced to `&Conn` through its `Deref` impl.
match_fresh_or_pooled!(self, |conn| conn)
}
}
impl<'pool, Conn, const SIZE: usize> DerefMut for FreshOrPooled<'pool, Conn, SIZE> {
/// Mutably borrows the connection regardless of which variant holds it.
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
// The `Fresh` arm yields `&mut Conn` directly; the `Pooled` arm yields
// `&mut LiveConn`, which is coerced through its `DerefMut` impl.
match_fresh_or_pooled!(self, |conn| conn)
}
}
impl<'pool, Conn, const SIZE: usize> FreshOrPooled<'pool, Conn, SIZE> {
    /// Returns `true` when the connection is held in a pool slot.
    #[inline]
    pub const fn is_pooled(&self) -> bool {
        match self {
            Self::Pooled(_) => true,
            Self::Fresh(_) => false,
        }
    }

    /// Returns `true` when the connection was created outside the pool.
    #[inline]
    pub const fn is_fresh(&self) -> bool {
        match self {
            Self::Fresh(_) => true,
            Self::Pooled(_) => false,
        }
    }
}
// The nursery lint group is switched off for this test-only module.
#[allow(clippy::nursery)]
#[cfg(test)]
mod tests {
    use super::*;
    extern crate alloc;
    use alloc::string::String;
    use futures::executor::block_on;

    /// Payload carried by the mock connection.
    ///
    /// The `Send + Sync` supertraits let `Conn<T>` pick up both auto traits on
    /// its own, so no `unsafe impl` is needed to satisfy `Connection`.
    trait Inner: Send + Sync {
        fn new() -> Self;
    }

    /// Mock connection whose health-check behaviour is driven by two flags.
    struct Conn<T: Inner> {
        inner: T,
        needs_health_check: bool,
        is_healthy: bool,
    }

    impl<T: Inner> Conn<T> {
        /// A healthy connection that asks to be health-checked.
        fn new() -> Self {
            Self {
                inner: T::new(),
                needs_health_check: true,
                is_healthy: true,
            }
        }
    }

    impl<T: Inner> Connection for Conn<T> {
        type Context = ();
        type Error = ();

        async fn create(_ctx: &()) -> Result<Self, Self::Error> {
            Ok(Self::new())
        }

        fn needs_health_check(&mut self, _ctx: &()) -> bool {
            self.needs_health_check
        }

        async fn health_check(&mut self, _ctx: &()) -> Result<(), Self::Error> {
            if self.is_healthy {
                Ok(())
            } else {
                Err(())
            }
        }
    }

    // Heap-allocating payload: exercises the paths where `Conn` needs dropping.
    impl Inner for String {
        fn new() -> Self {
            String::from("Hello World")
        }
    }

    // Plain-data payload: exercises the paths where nothing needs dropping.
    impl Inner for usize {
        fn new() -> Self {
            7
        }
    }

    /// `new_eager` factory that succeeds for every slot index.
    fn create_conn<C: Connection<Context = ()>>(
        _idx: usize,
    ) -> impl Future<Output = Result<C, <C as Connection>::Error>> {
        C::create(&())
    }

    // `new_eager` factory that fails on slot index `$fail_on` and succeeds on
    // every other index.
    macro_rules! create_conn_fail {
        ($fail_on:literal, $c:ty) => {
            |idx| async move {
                if idx == $fail_on {
                    Err(())
                } else {
                    <$c>::create(&()).await
                }
            }
        };
    }

    macro_rules! assert_string_inner {
        ($live_conn:ident) => {
            assert_eq!($live_conn.inner, String::from("Hello World"))
        };
    }

    macro_rules! assert_usize_inner {
        ($live_conn:ident) => {
            assert_eq!($live_conn.inner, 7)
        };
    }

    #[test]
    fn new_eager_success_alloc() {
        let fut = async {
            let pool: Pool<Conn<String>, 5> = Pool::new_eager((), create_conn)
                .await
                .unwrap();
            let c0 = pool.get().await.unwrap();
            assert_string_inner!(c0);
            let c1 = pool.get().await.unwrap();
            assert_string_inner!(c1);
            let c2 = pool.get().await.unwrap();
            assert_string_inner!(c2);
            let c3 = pool.get().await.unwrap();
            assert_string_inner!(c3);
            let c4 = pool.get().await.unwrap();
            assert_string_inner!(c4);
        };
        block_on(fut);
    }

    #[test]
    fn new_eager_failure_alloc() {
        let fut = async {
            assert!(
                Pool::<Conn<String>, 5>::new_eager((), create_conn_fail!(3, Conn<String>))
                    .await
                    .is_err()
            );
        };
        block_on(fut);
    }

    #[test]
    fn new_eager_success() {
        let fut = async {
            let pool: Pool<Conn<usize>, 5> = Pool::new_eager((), create_conn)
                .await
                .unwrap();
            let c0 = pool.get().await.unwrap();
            assert_usize_inner!(c0);
            let c1 = pool.get().await.unwrap();
            assert_usize_inner!(c1);
            let c2 = pool.get().await.unwrap();
            assert_usize_inner!(c2);
            let c3 = pool.get().await.unwrap();
            assert_usize_inner!(c3);
            let c4 = pool.get().await.unwrap();
            assert_usize_inner!(c4);
        };
        block_on(fut);
    }

    #[test]
    fn new_eager_failure() {
        let fut = async {
            assert!(
                Pool::<Conn<usize>, 5>::new_eager((), create_conn_fail!(3, Conn<usize>))
                    .await
                    .is_err()
            );
        };
        block_on(fut);
    }

    #[test]
    fn get_health_check() {
        let fut = async {
            let pool: Pool<Conn<usize>, 5> = Pool::new_eager((), create_conn)
                .await
                .unwrap();
            let mut g0 = pool.get().await.unwrap();
            g0.needs_health_check = true;
            drop(g0);
            let mut g0 = pool.get().await.unwrap();
            g0.is_healthy = false;
            drop(g0);
            let g0 = pool.get().await.unwrap();
            drop(g0);
        };
        block_on(fut);
    }

    #[test]
    fn try_get_health_check() {
        let fut = async {
            let pool: Pool<Conn<usize>, 5> = Pool::new_eager((), create_conn)
                .await
                .unwrap();
            let mut g0 = pool.try_get().await.unwrap().unwrap();
            g0.needs_health_check = true;
            drop(g0);
            let mut g0 = pool.try_get().await.unwrap().unwrap();
            g0.is_healthy = false;
            drop(g0);
            let g0 = pool.try_get().await.unwrap().unwrap();
            drop(g0);
        };
        block_on(fut);
    }

    #[test]
    fn get_uninitialized() {
        let fut = async {
            let pool: Pool<Conn<usize>, 5> = Pool::new(());
            let c0 = pool.get().await.unwrap();
            assert_usize_inner!(c0);
            let c1 = pool.get().await.unwrap();
            assert_usize_inner!(c1);
            let c2 = pool.get().await.unwrap();
            assert_usize_inner!(c2);
            let c3 = pool.get().await.unwrap();
            assert_usize_inner!(c3);
            let c4 = pool.get().await.unwrap();
            assert_usize_inner!(c4);
        };
        block_on(fut);
    }

    #[test]
    fn try_get_uninitialized() {
        let fut = async {
            let pool: Pool<Conn<usize>, 5> = Pool::new(());
            let c0 = pool.try_get().await.unwrap().unwrap();
            assert_usize_inner!(c0);
            let c1 = pool.try_get().await.unwrap().unwrap();
            assert_usize_inner!(c1);
            let c2 = pool.try_get().await.unwrap().unwrap();
            assert_usize_inner!(c2);
            let c3 = pool.try_get().await.unwrap().unwrap();
            assert_usize_inner!(c3);
            let c4 = pool.try_get().await.unwrap().unwrap();
            assert_usize_inner!(c4);
        };
        block_on(fut);
    }

    #[test]
    fn get_or_create() {
        let fut = async {
            let pool: Pool<Conn<usize>, 1> = Pool::default();
            let pooled = pool.get_or_create().await.unwrap();
            assert_usize_inner!(pooled);
            assert!(pooled.is_pooled());
            // The single slot is held by `pooled`, so this one is created
            // outside the pool. Check the fresh connection's payload, not the
            // pooled one again.
            let fresh = pool.get_or_create().await.unwrap();
            assert_usize_inner!(fresh);
            assert!(fresh.is_fresh());
            drop(pooled);
            let pooled = pool.get_or_create().await.unwrap();
            assert_usize_inner!(pooled);
            assert!(pooled.is_pooled());
            drop(fresh);
            let fresh = pool.get_or_create().await.unwrap();
            assert_usize_inner!(fresh);
            assert!(fresh.is_fresh());
        };
        block_on(fut);
    }

    #[test]
    fn lazy_conn_basic_utils() {
        let conn = LazyConn::<Conn<usize>>::default();
        assert!(conn.get().is_none());
        let mut conn = LazyConn::init(Conn::<usize>::new());
        assert!(conn.get().is_some());
        conn.expire();
        assert!(conn.get().is_none());
    }

    #[test]
    fn insert() {
        let fut = async {
            let pool: Pool<Conn<usize>, 1> = Pool::default();
            let mut conn = Conn::<usize>::new();
            conn.inner = 400;
            pool.insert(conn).await;
            let conn = pool.get().await.unwrap();
            assert_eq!(conn.inner, 400);
        };
        block_on(fut);
    }
}