1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
pub use lapin;
use lapin::{Connection, ConnectionProperties, ConnectionState, Error as LapinError};
pub use mobc;
use mobc::async_trait;
use mobc::Manager;

/// A `mobc::Manager` for `lapin::Connection`s.
///
/// The manager stores the AMQP address and the `ConnectionProperties` it was
/// constructed with. Its `Manager::connect` implementation passes both to
/// `lapin::Connection::connect` whenever a new connection is requested, and
/// its `Manager::check` implementation only accepts connections whose state
/// is `ConnectionState::Connected`.
///
/// # Examples
///
/// The example is marked `no_run`: it connects to the broker address shown
/// below, spawns 50 publisher tasks that each publish on a 200 ms interval,
/// and then consumes from the queue until the consumer stream ends.
///
/// ```no_run
/// use mobc::Pool;
/// use mobc_lapin::RMQConnectionManager;
/// use tokio_amqp::*;
/// use futures::StreamExt;
/// use std::time::Duration;
/// use lapin::{
///     options::*, types::FieldTable, BasicProperties, publisher_confirm::Confirmation,
///     ConnectionProperties,
/// };
///
/// const PAYLOAD: &[u8;13] = b"Hello, World!";
/// const QUEUE_NAME: &str = "test";
///
/// #[tokio::main]
/// async fn main() {
///     let addr = "amqp://rmq:rmq@127.0.0.1:5672/%2f";
///     let manager = RMQConnectionManager::new(
///         addr.to_owned(),
///         ConnectionProperties::default().with_executor(tokio_executor_trait::Tokio::current()),
///     );
///     let pool = Pool::<RMQConnectionManager>::builder()
///         .max_open(5)
///         .build(manager);
///
///     let conn = pool.get().await.unwrap();
///     let channel = conn.create_channel().await.unwrap();
///     let _ = channel
///         .queue_declare(
///             QUEUE_NAME,
///             QueueDeclareOptions::default(),
///             FieldTable::default(),
///         )
///         .await.unwrap();
///
///     // send messages to the queue
///     println!("spawning senders...");
///     for i in 0..50 {
///         let send_pool = pool.clone();
///         let send_props = BasicProperties::default().with_kind(format!("Sender: {}", i).into());
///         tokio::spawn(async move {
///             let mut interval = tokio::time::interval(Duration::from_millis(200));
///             loop {
///                 interval.tick().await;
///                 let send_conn = send_pool.get().await.unwrap();
///                 let send_channel = send_conn.create_channel().await.unwrap();
///                 let confirm = send_channel
///                     .basic_publish(
///                         "",
///                         QUEUE_NAME,
///                         BasicPublishOptions::default(),
///                         PAYLOAD.as_ref(),
///                         send_props.clone(),
///                     )
///                     .await.unwrap()
///                     .await.unwrap();
///                 assert_eq!(confirm, Confirmation::NotRequested);
///             }
///
///         });
///     }
///
///     // listen for incoming messages from the queue
///     let mut consumer = channel
///         .basic_consume(
///             QUEUE_NAME,
///             "my_consumer",
///             BasicConsumeOptions::default(),
///             FieldTable::default(),
///         )
///         .await.unwrap();
///
///     println!("listening to messages...");
///     while let Some(delivery) = consumer.next().await {
///         let delivery = delivery.expect("error in consumer");
///         println!("incoming message from: {:?}", delivery.properties.kind());
///         channel
///             .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
///             .await
///             .expect("ack");
///     }
/// }
/// ```
#[derive(Clone)]
pub struct RMQConnectionManager {
    /// Address string handed to `Connection::connect` for every new connection.
    addr: String,
    /// Connection properties; a clone is passed to each `Connection::connect` call.
    connection_properties: ConnectionProperties,
}

impl RMQConnectionManager {
    pub fn new(addr: String, connection_properties: ConnectionProperties) -> Self {
        Self {
            addr,
            connection_properties,
        }
    }
}

#[async_trait]
impl Manager for RMQConnectionManager {
    type Connection = Connection;
    type Error = LapinError;

    /// Opens a new `lapin::Connection` to the stored address.
    ///
    /// A clone of the stored `ConnectionProperties` is passed along with the
    /// address; any error from `Connection::connect` is returned unchanged.
    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        let properties = self.connection_properties.clone();
        Connection::connect(self.addr.as_str(), properties).await
    }

    /// Validates a pooled connection by inspecting its reported state.
    ///
    /// Returns the connection back when its state is
    /// `ConnectionState::Connected`; otherwise returns
    /// `LapinError::InvalidConnectionState` carrying the observed state.
    async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
        let state = conn.status().state();
        if matches!(state, ConnectionState::Connected) {
            return Ok(conn);
        }
        Err(LapinError::InvalidConnectionState(state))
    }
}