1use crate::error::{AmqpError, Result};
2use deadpool::managed::{Manager, Pool, RecycleError, RecycleResult};
3use lapin::{Channel, Connection, ConnectionProperties};
4use std::future::Future;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7
8pub struct AmqpConnectionManager {
9 uri: String,
10 connection_name: String,
11}
12
13impl AmqpConnectionManager {
14 pub fn new(uri: String, connection_name: String) -> Self {
15 Self { uri, connection_name }
16 }
17}
18
19impl Manager for AmqpConnectionManager {
20 type Type = Connection;
21 type Error = lapin::Error;
22
23 fn create(&self) -> impl Future<Output = std::result::Result<Self::Type, Self::Error>> + Send {
24 let uri = self.uri.clone();
25 let connection_name = self.connection_name.clone();
26 async move {
27 let opts = ConnectionProperties::default()
28 .with_connection_name(connection_name.clone().into());
29
30 Connection::connect(&uri, opts).await
31 }
32 }
33
34 #[allow(clippy::manual_async_fn)]
35 fn recycle(&self, conn: &mut Self::Type, _metrics: &deadpool::managed::Metrics) -> impl Future<Output = RecycleResult<Self::Error>> + Send {
36 async move {
37 if conn.status().connected() {
38 Ok(())
39 } else {
40 Err(RecycleError::Backend(
41 std::io::Error::new(std::io::ErrorKind::ConnectionReset, "Connection not connected").into(),
42 ))
43 }
44 }
45 }
46}
47
48pub type AmqpPool = Pool<AmqpConnectionManager>;
49
50pub fn create_pool(uri: String, connection_name: String, max_size: usize) -> Result<AmqpPool> {
51 let manager = AmqpConnectionManager::new(uri, connection_name);
52 let pool = Pool::builder(manager)
53 .max_size(max_size)
54 .build()
55 .map_err(|e| AmqpError::PoolError(e.to_string()))?;
56
57 Ok(pool)
58}
59
60#[derive(Clone)]
61pub struct ChannelPool {
62 pool: Arc<AmqpPool>,
63 channel: Arc<Mutex<Option<Channel>>>,
64}
65
66impl ChannelPool {
67 pub fn new(pool: Arc<AmqpPool>) -> Self {
68 Self {
69 pool,
70 channel: Arc::new(Mutex::new(None)),
71 }
72 }
73
74 pub async fn get_channel(&self) -> Result<Channel> {
75 let mut cached = self.channel.lock().await;
76
77 if let Some(channel) = cached.as_ref()
78 && channel.status().connected()
79 {
80 return Ok(channel.clone());
81 }
82
83 let conn = self
84 .pool
85 .get()
86 .await
87 .map_err(|e| AmqpError::PoolError(e.to_string()))?;
88
89 let channel = conn
90 .create_channel()
91 .await
92 .map_err(AmqpError::ConnectionError)?;
93 *cached = Some(channel.clone());
94
95 Ok(channel)
96 }
97}