bb8_redis_cluster/
lib.rs

1//! Redis Cluster support for the `bb8` connection pool.
2//!
3//! # Example
4//!
5//! ```
6//! use futures_util::future::join_all;
7//! use bb8_redis_cluster::{
8//!     bb8,
9//!     redis_cluster_async::redis::{cmd, AsyncCommands},
10//!     RedisConnectionManager
11//! };
12//!
13//! #[tokio::main]
14//! async fn main() {
15//!     let manager = RedisConnectionManager::new(vec!["redis://localhost"]).unwrap();
16//!     let pool = bb8::Pool::builder().build(manager).await.unwrap();
17//!
18//!     let mut handles = vec![];
19//!
20//!     for _i in 0..10 {
21//!         let pool = pool.clone();
22//!
23//!         handles.push(tokio::spawn(async move {
24//!             let mut conn = pool.get().await.unwrap();
25//!
26//!             let reply: String = cmd("PING").query_async(&mut *conn).await.unwrap();
27//!
28//!             assert_eq!("PONG", reply);
29//!         }));
30//!     }
31//!
32//!     join_all(handles).await;
33//! }
34//! ```
35#![allow(clippy::needless_doctest_main)]
36#![deny(missing_docs, missing_debug_implementations)]
37
38pub use bb8;
39pub use redis_cluster_async;
40
41use async_trait::async_trait;
42use redis_cluster_async::{
43    redis::{ErrorKind, IntoConnectionInfo, RedisError},
44    Client, Connection,
45};
46
47/// A `bb8::ManageConnection` for `redis_cluster_async::Client::get_connection`.
48#[derive(Clone)]
49pub struct RedisConnectionManager {
50    client: Client,
51}
52
53// Because redis_cluster_async::Client does not support Debug derive.
54impl std::fmt::Debug for RedisConnectionManager {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("RedisConnectionManager")
57            .field("client", &format!("pointer({:p})", &self.client))
58            .finish()
59    }
60}
61
62impl RedisConnectionManager {
63    /// Create a new `RedisConnectionManager`.
64    /// See `redis_cluster_async::Client::open` for a description of the parameter types.
65    pub fn new<T: IntoConnectionInfo>(infos: Vec<T>) -> Result<RedisConnectionManager, RedisError> {
66        Ok(RedisConnectionManager {
67            client: Client::open(infos.into_iter().collect())?,
68        })
69    }
70}
71
72#[async_trait]
73impl bb8::ManageConnection for RedisConnectionManager {
74    type Connection = Connection;
75    type Error = RedisError;
76
77    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
78        self.client.get_connection().await
79    }
80
81    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
82        let pong: String = redis::cmd("PING").query_async(conn).await?;
83        match pong.as_str() {
84            "PONG" => Ok(()),
85            _ => Err((ErrorKind::ResponseError, "ping request").into()),
86        }
87    }
88
89    fn has_broken(&self, _: &mut Self::Connection) -> bool {
90        false
91    }
92}