mobc_lapin/
lib.rs

1pub use lapin;
2use lapin::{Connection, ConnectionProperties, ConnectionState, Error as LapinError};
3pub use mobc;
4use mobc::async_trait;
5use mobc::Manager;
6
7/// A `mobc::Manager` for `lapin::Connection`s.
8///
9/// ## Example
10///
11/// ```no_run
12/// use mobc::Pool;
13/// use mobc_lapin::RMQConnectionManager;
14/// use tokio_amqp::*;
15/// use futures::StreamExt;
16/// use std::time::Duration;
17/// use lapin::{
18///     options::*, types::FieldTable, BasicProperties, publisher_confirm::Confirmation,
19///     ConnectionProperties,
20/// };
21///
22/// const PAYLOAD: &[u8;13] = b"Hello, World!";
23/// const QUEUE_NAME: &str = "test";
24///
25/// #[tokio::main]
26/// async fn main() {
27///     let addr = "amqp://rmq:rmq@127.0.0.1:5672/%2f";
28///     let manager = RMQConnectionManager::new(
29///         addr.to_owned(),
30///         ConnectionProperties::default().with_executor(tokio_executor_trait::Tokio::current()),
31///     );
32///     let pool = Pool::<RMQConnectionManager>::builder()
33///         .max_open(5)
34///         .build(manager);
35///
36///     let conn = pool.get().await.unwrap();
37///     let channel = conn.create_channel().await.unwrap();
38///     let _ = channel
39///         .queue_declare(
40///             QUEUE_NAME,
41///             QueueDeclareOptions::default(),
42///             FieldTable::default(),
43///         )
44///         .await.unwrap();
45///
46///     // send messages to the queue
47///     println!("spawning senders...");
48///     for i in 0..50 {
49///         let send_pool = pool.clone();
50///         let send_props = BasicProperties::default().with_kind(format!("Sender: {}", i).into());
51///         tokio::spawn(async move {
52///             let mut interval = tokio::time::interval(Duration::from_millis(200));
53///             loop {
54///                 interval.tick().await;
55///                 let send_conn = send_pool.get().await.unwrap();
56///                 let send_channel = send_conn.create_channel().await.unwrap();
57///                 let confirm = send_channel
58///                     .basic_publish(
59///                         "",
60///                         QUEUE_NAME,
61///                         BasicPublishOptions::default(),
62///                         PAYLOAD.as_ref(),
63///                         send_props.clone(),
64///                     )
65///                     .await.unwrap()
66///                     .await.unwrap();
67///                 assert_eq!(confirm, Confirmation::NotRequested);
68///             }
69///
70///         });
71///     }
72///
73///     // listen for incoming messages from the queue
74///     let mut consumer = channel
75///         .basic_consume(
76///             QUEUE_NAME,
77///             "my_consumer",
78///             BasicConsumeOptions::default(),
79///             FieldTable::default(),
80///         )
81///         .await.unwrap();
82///
83///     println!("listening to messages...");
84///     while let Some(delivery) = consumer.next().await {
85///         let delivery = delivery.expect("error in consumer");
86///         println!("incoming message from: {:?}", delivery.properties.kind());
87///         channel
88///             .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
89///             .await
90///             .expect("ack");
91///         }
92/// }
93/// ```
94#[derive(Clone)]
95pub struct RMQConnectionManager {
96    addr: String,
97    connection_properties: ConnectionProperties,
98}
99
100impl RMQConnectionManager {
101    pub fn new(addr: String, connection_properties: ConnectionProperties) -> Self {
102        Self {
103            addr,
104            connection_properties,
105        }
106    }
107}
108
109#[async_trait]
110impl Manager for RMQConnectionManager {
111    type Connection = Connection;
112    type Error = LapinError;
113
114    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
115        let c = Connection::connect(self.addr.as_str(), self.connection_properties.clone()).await?;
116        Ok(c)
117    }
118
119    async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
120        match conn.status().state() {
121            ConnectionState::Connected => Ok(conn),
122            other_state => Err(LapinError::InvalidConnectionState(other_state)),
123        }
124    }
125}