carrot_cake/pool/
channel.rs1use deadpool::managed::{self, Manager};
3use lapin::{options::ConfirmSelectOptions, Channel, ChannelState};
4
5use super::connection::ConnectionPool;
6
7pub type ChannelPool = deadpool::managed::Pool<ChannelManager>;
11
12pub struct ChannelManager {
17 connection_pool: ConnectionPool,
18 pub(crate) publisher_confirms: bool,
19}
20
21impl ChannelManager {
22 pub fn new(connection_pool: ConnectionPool) -> Self {
27 Self {
28 connection_pool,
29 publisher_confirms: true,
30 }
31 }
32
33 pub fn without_publisher_confirmations(mut self) -> Self {
37 self.publisher_confirms = false;
38 self
39 }
40}
41
42#[async_trait::async_trait]
44impl Manager for ChannelManager {
45 type Type = Channel;
46 type Error = super::Error;
47
48 async fn create(&self) -> Result<Channel, super::Error> {
49 let connection = self.connection_pool.get().await?;
50 let channel = connection.create_channel().await?;
51 if self.publisher_confirms {
52 channel
53 .confirm_select(ConfirmSelectOptions { nowait: false })
54 .await?;
55 }
56 Ok(channel)
57 }
58
59 async fn recycle(&self, obj: &mut Channel) -> managed::RecycleResult<super::Error> {
60 match obj.status().state() {
61 ChannelState::Connected => Ok(()),
62 state => Err(managed::RecycleError::Message(format!(
63 "Channel is not in an healthy state {state:?}",
64 ))),
65 }
66 }
67}