carrot_cake/pool/
connection.rs

1//! Implements [`Manager`] for [`Connection`].
2
3use crate::amqp::ConnectionFactory;
4use deadpool::managed::{self, Manager};
5use lapin::{Connection, ConnectionState};
6
7/// `ConnectionPool` is a connection pool for a `lapin::Channel`.
8pub type ConnectionPool = deadpool::managed::Pool<ConnectionFactory>;
9
10#[async_trait::async_trait]
11impl Manager for ConnectionFactory {
12    type Type = Connection;
13    type Error = super::Error;
14
15    async fn create(&self) -> Result<Connection, super::Error> {
16        Ok(self.new_connection().await?)
17    }
18
19    async fn recycle(&self, obj: &mut Connection) -> managed::RecycleResult<super::Error> {
20        match obj.status().state() {
21            ConnectionState::Connected => Ok(()),
22            state => Err(managed::RecycleError::Message(format!(
23                "Connection is not in an healthy state {state:?}",
24            ))),
25        }
26    }
27}