mobc_lapin/lib.rs
1pub use lapin;
2use lapin::{Connection, ConnectionProperties, ConnectionState, Error as LapinError};
3pub use mobc;
4use mobc::async_trait;
5use mobc::Manager;
6
7/// A `mobc::Manager` for `lapin::Connection`s.
8///
9/// ## Example
10///
11/// ```no_run
12/// use mobc::Pool;
13/// use mobc_lapin::RMQConnectionManager;
14/// use tokio_amqp::*;
15/// use futures::StreamExt;
16/// use std::time::Duration;
17/// use lapin::{
18/// options::*, types::FieldTable, BasicProperties, publisher_confirm::Confirmation,
19/// ConnectionProperties,
20/// };
21///
22/// const PAYLOAD: &[u8;13] = b"Hello, World!";
23/// const QUEUE_NAME: &str = "test";
24///
25/// #[tokio::main]
26/// async fn main() {
27/// let addr = "amqp://rmq:rmq@127.0.0.1:5672/%2f";
28/// let manager = RMQConnectionManager::new(
29/// addr.to_owned(),
30/// ConnectionProperties::default().with_executor(tokio_executor_trait::Tokio::current()),
31/// );
32/// let pool = Pool::<RMQConnectionManager>::builder()
33/// .max_open(5)
34/// .build(manager);
35///
36/// let conn = pool.get().await.unwrap();
37/// let channel = conn.create_channel().await.unwrap();
38/// let _ = channel
39/// .queue_declare(
40/// QUEUE_NAME,
41/// QueueDeclareOptions::default(),
42/// FieldTable::default(),
43/// )
44/// .await.unwrap();
45///
46/// // send messages to the queue
47/// println!("spawning senders...");
48/// for i in 0..50 {
49/// let send_pool = pool.clone();
50/// let send_props = BasicProperties::default().with_kind(format!("Sender: {}", i).into());
51/// tokio::spawn(async move {
52/// let mut interval = tokio::time::interval(Duration::from_millis(200));
53/// loop {
54/// interval.tick().await;
55/// let send_conn = send_pool.get().await.unwrap();
56/// let send_channel = send_conn.create_channel().await.unwrap();
57/// let confirm = send_channel
58/// .basic_publish(
59/// "",
60/// QUEUE_NAME,
61/// BasicPublishOptions::default(),
62/// PAYLOAD.as_ref(),
63/// send_props.clone(),
64/// )
65/// .await.unwrap()
66/// .await.unwrap();
67/// assert_eq!(confirm, Confirmation::NotRequested);
68/// }
69///
70/// });
71/// }
72///
73/// // listen for incoming messages from the queue
74/// let mut consumer = channel
75/// .basic_consume(
76/// QUEUE_NAME,
77/// "my_consumer",
78/// BasicConsumeOptions::default(),
79/// FieldTable::default(),
80/// )
81/// .await.unwrap();
82///
83/// println!("listening to messages...");
84/// while let Some(delivery) = consumer.next().await {
85/// let delivery = delivery.expect("error in consumer");
86/// println!("incoming message from: {:?}", delivery.properties.kind());
87/// channel
88/// .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
89/// .await
90/// .expect("ack");
91/// }
92/// }
93/// ```
94#[derive(Clone)]
95pub struct RMQConnectionManager {
96 addr: String,
97 connection_properties: ConnectionProperties,
98}
99
100impl RMQConnectionManager {
101 pub fn new(addr: String, connection_properties: ConnectionProperties) -> Self {
102 Self {
103 addr,
104 connection_properties,
105 }
106 }
107}
108
109#[async_trait]
110impl Manager for RMQConnectionManager {
111 type Connection = Connection;
112 type Error = LapinError;
113
114 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
115 let c = Connection::connect(self.addr.as_str(), self.connection_properties.clone()).await?;
116 Ok(c)
117 }
118
119 async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
120 match conn.status().state() {
121 ConnectionState::Connected => Ok(conn),
122 other_state => Err(LapinError::InvalidConnectionState(other_state)),
123 }
124 }
125}