poolx-redis 0.1.2

poolx for redis, with many features like idle connection checking and reaping, healthcheck, and more.
Documentation
use std::io::ErrorKind;
use std::ops::{Deref, DerefMut};
use std::str::FromStr;

use futures_core::future::BoxFuture;
use redis::aio::ConnectionLike;
use redis::{Client, Cmd, Pipeline, RedisFuture, Value};

use poolx::{Connection, ConnectOptions, futures_core, url};
use poolx::url::Url;

#[derive(Debug, Clone)]
pub struct RedisConnectionOption {
    url: Url,
    client: Client,
}

impl RedisConnectionOption {}

impl FromStr for RedisConnectionOption {
    type Err = poolx::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let url = s.parse::<Url>().map_err(|e| poolx::Error::Configuration(Box::new(e)))?;
        Self::from_url(&url)
    }
}

impl ConnectOptions for RedisConnectionOption {
    type Connection = RedisConnection;

    fn from_url(url: &url::Url) -> Result<Self, poolx::Error> {
        let client = Client::open(url.clone()).map_err(|e| poolx::Error::Configuration(Box::new(e)))?;
        Ok(Self {
            url: url.clone(),
            client,
        })
    }

    fn connect(&self) -> BoxFuture<'_, Result<Self::Connection, poolx::Error>> where Self::Connection: Sized {
        Box::pin(async move {
            let conn = self.client.get_async_connection().await.map_err(|e| poolx::Error::Io(std::io::Error::from(ErrorKind::ConnectionReset)))?;
            Ok(RedisConnection { inner: conn })
        })
    }
}

pub struct RedisConnection {
    inner: redis::aio::Connection,
}

impl AsMut<redis::aio::Connection> for RedisConnection {
    fn as_mut(&mut self) -> &mut redis::aio::Connection {
        &mut self.inner
    }
}


impl Connection for RedisConnection {
    type Options = RedisConnectionOption;

    fn close(mut self) -> BoxFuture<'static, Result<(), poolx::Error>> {
        Box::pin(async move {
            self.inner.req_packed_command(&redis::cmd("QUIT")).await.map_err(|e| std::io::Error::new(ErrorKind::ConnectionReset, e.to_string()))?;
            Ok(())
        })
    }

    fn close_hard(self) -> BoxFuture<'static, Result<(), poolx::Error>> {
        Box::pin(async move {
            Ok(())
        })
    }

    fn ping(&mut self) -> BoxFuture<'_, Result<(), poolx::Error>> {
        Box::pin(async move {
            let pong: String = redis::cmd("PING").query_async(&mut self.inner).await.map_err(|e| std::io::Error::new(ErrorKind::ConnectionReset, e.to_string()))?;
            match pong.as_str() {
                "PONG" => Ok(()),
                _ => Err(poolx::Error::ResponseError),
            }
        })
    }
}

impl ConnectionLike for RedisConnection{
    fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
        self.inner.req_packed_command(cmd)
    }

    fn req_packed_commands<'a>(&'a mut self, cmd: &'a Pipeline, offset: usize, count: usize) -> RedisFuture<'a, Vec<Value>> {
        self.inner.req_packed_commands(cmd, offset, count)
    }

    fn get_db(&self) -> i64 {
        self.inner.get_db()
    }
}
#[cfg(test)]
mod tests {
    use redis::cmd;

    use poolx::{Pool, PoolOptions};

    use crate::RedisConnection;

    #[tokio::test]
    async fn test_redis_connection_pool() {
        let url = "redis://:foobared@127.0.0.1:6379";
        let option = url.parse::<super::RedisConnectionOption>().unwrap();

        let pool: Pool<RedisConnection> = PoolOptions::new()
            .test_before_acquire(true)
            .idle_timeout(std::time::Duration::from_secs(3))
            .min_connections(3)
            .max_connections(100)
            .connect_lazy_with(option);

        for i in 0..10 {
            let mut conn = pool.acquire().await.unwrap();
            let reply: String = cmd("PING").query_async(conn.as_mut()).await.unwrap();
            println!("reply: {}", reply);
        }
    }
}