carrot_cake/pool/
channel.rs

1//! Implements [`Manager`] for [`Channel`].
2use deadpool::managed::{self, Manager};
3use lapin::{options::ConfirmSelectOptions, Channel, ChannelState};
4
5use super::connection::ConnectionPool;
6
7/// `ChannelPool` pools [`Channel`]s.
8///
9/// It maintains an internal pool of connections to the rabbitmq instance.
10pub type ChannelPool = deadpool::managed::Pool<ChannelManager>;
11
12/// `ChannelManager` implements [Manager] to manage a pool of [`Channel`]s.
13///
14/// `ChannelManager` keeps an internal [`ConnectionPool`]
15///  in order to reuse connections across channels.
16pub struct ChannelManager {
17    connection_pool: ConnectionPool,
18    pub(crate) publisher_confirms: bool,
19}
20
21impl ChannelManager {
22    /// Construct `ChannelManager` for a [`Channel`].
23    ///
24    /// By default, all channels will have publisher confirmations enabled,
25    /// but you can opt out using [`ChannelManager::without_publisher_confirmations`]
26    pub fn new(connection_pool: ConnectionPool) -> Self {
27        Self {
28            connection_pool,
29            publisher_confirms: true,
30        }
31    }
32
33    /// Disable publisher confirmations.
34    ///
35    /// By default, we enable publisher confirmations, but you can opt out using this flag
36    pub fn without_publisher_confirmations(mut self) -> Self {
37        self.publisher_confirms = false;
38        self
39    }
40}
41
42/// Implements [`Manager`] for [`Channel`] with publisher confirms enabled.
43#[async_trait::async_trait]
44impl Manager for ChannelManager {
45    type Type = Channel;
46    type Error = super::Error;
47
48    async fn create(&self) -> Result<Channel, super::Error> {
49        let connection = self.connection_pool.get().await?;
50        let channel = connection.create_channel().await?;
51        if self.publisher_confirms {
52            channel
53                .confirm_select(ConfirmSelectOptions { nowait: false })
54                .await?;
55        }
56        Ok(channel)
57    }
58
59    async fn recycle(&self, obj: &mut Channel) -> managed::RecycleResult<super::Error> {
60        match obj.status().state() {
61            ChannelState::Connected => Ok(()),
62            state => Err(managed::RecycleError::Message(format!(
63                "Channel is not in an healthy state {state:?}",
64            ))),
65        }
66    }
67}