Skip to main content

easy_rmq_rs/
pool.rs

1use crate::error::{AmqpError, Result};
2use deadpool::managed::{Manager, Pool, RecycleError, RecycleResult};
3use lapin::{Channel, Connection, ConnectionProperties};
4use std::future::Future;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7
/// Connection settings used to open AMQP connections for a pool.
pub struct AmqpConnectionManager {
    /// Broker URI handed to the AMQP client when a connection is opened.
    uri: String,
    /// Name attached to each connection through its connection properties.
    connection_name: String,
}

impl AmqpConnectionManager {
    /// Builds a manager from a broker URI and a connection name.
    ///
    /// Both values are stored as-is; no connection is opened here.
    pub fn new(uri: String, connection_name: String) -> Self {
        AmqpConnectionManager {
            uri,
            connection_name,
        }
    }
}
18
19impl Manager for AmqpConnectionManager {
20    type Type = Connection;
21    type Error = lapin::Error;
22
23    fn create(&self) -> impl Future<Output = std::result::Result<Self::Type, Self::Error>> + Send {
24        let uri = self.uri.clone();
25        let connection_name = self.connection_name.clone();
26        async move {
27            let opts = ConnectionProperties::default()
28                .with_connection_name(connection_name.clone().into());
29
30            Connection::connect(&uri, opts).await
31        }
32    }
33
34    #[allow(clippy::manual_async_fn)]
35    fn recycle(&self, conn: &mut Self::Type, _metrics: &deadpool::managed::Metrics) -> impl Future<Output = RecycleResult<Self::Error>> + Send {
36        async move {
37            if conn.status().connected() {
38                Ok(())
39            } else {
40                Err(RecycleError::Backend(
41                    std::io::Error::new(std::io::ErrorKind::ConnectionReset, "Connection not connected").into(),
42                ))
43            }
44        }
45    }
46}
47
48pub type AmqpPool = Pool<AmqpConnectionManager>;
49
50pub fn create_pool(uri: String, connection_name: String, max_size: usize) -> Result<AmqpPool> {
51    let manager = AmqpConnectionManager::new(uri, connection_name);
52    let pool = Pool::builder(manager)
53        .max_size(max_size)
54        .build()
55        .map_err(|e| AmqpError::PoolError(e.to_string()))?;
56
57    Ok(pool)
58}
59
60#[derive(Clone)]
61pub struct ChannelPool {
62    pool: Arc<AmqpPool>,
63    channel: Arc<Mutex<Option<Channel>>>,
64}
65
66impl ChannelPool {
67    pub fn new(pool: Arc<AmqpPool>) -> Self {
68        Self {
69            pool,
70            channel: Arc::new(Mutex::new(None)),
71        }
72    }
73
74    pub async fn get_channel(&self) -> Result<Channel> {
75        let mut cached = self.channel.lock().await;
76
77        if let Some(channel) = cached.as_ref()
78            && channel.status().connected()
79        {
80            return Ok(channel.clone());
81        }
82
83        let conn = self
84            .pool
85            .get()
86            .await
87            .map_err(|e| AmqpError::PoolError(e.to_string()))?;
88
89        let channel = conn
90            .create_channel()
91            .await
92            .map_err(AmqpError::ConnectionError)?;
93        *cached = Some(channel.clone());
94
95        Ok(channel)
96    }
97}