1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
pub use lapin;
use lapin::{Connection, ConnectionProperties, ConnectionState, Error as LapinError};
pub use mobc;
use mobc::async_trait;
use mobc::Manager;
/// A `mobc::Manager` for `lapin::Connection`s.
///
/// ## Example
///
/// ```no_run
/// use mobc::Pool;
/// use mobc_lapin::RMQConnectionManager;
/// use tokio_amqp::*;
/// use futures::StreamExt;
/// use std::time::Duration;
/// use lapin::{
/// options::*, types::FieldTable, BasicProperties, publisher_confirm::Confirmation,
/// ConnectionProperties,
/// };
///
/// const PAYLOAD: &[u8;13] = b"Hello, World!";
/// const QUEUE_NAME: &str = "test";
///
/// #[tokio::main]
/// async fn main() {
/// let addr = "amqp://rmq:rmq@127.0.0.1:5672/%2f";
/// let manager = RMQConnectionManager::new(
/// addr.to_owned(),
/// ConnectionProperties::default().with_executor(tokio_executor_trait::Tokio::current()),
/// );
/// let pool = Pool::<RMQConnectionManager>::builder()
/// .max_open(5)
/// .build(manager);
///
/// let conn = pool.get().await.unwrap();
/// let channel = conn.create_channel().await.unwrap();
/// let _ = channel
/// .queue_declare(
/// QUEUE_NAME,
/// QueueDeclareOptions::default(),
/// FieldTable::default(),
/// )
/// .await.unwrap();
///
/// // send messages to the queue
/// println!("spawning senders...");
/// for i in 0..50 {
/// let send_pool = pool.clone();
/// let send_props = BasicProperties::default().with_kind(format!("Sender: {}", i).into());
/// tokio::spawn(async move {
/// let mut interval = tokio::time::interval(Duration::from_millis(200));
/// loop {
/// interval.tick().await;
/// let send_conn = send_pool.get().await.unwrap();
/// let send_channel = send_conn.create_channel().await.unwrap();
/// let confirm = send_channel
/// .basic_publish(
/// "",
/// QUEUE_NAME,
/// BasicPublishOptions::default(),
/// PAYLOAD.as_ref(),
/// send_props.clone(),
/// )
/// .await.unwrap()
/// .await.unwrap();
/// assert_eq!(confirm, Confirmation::NotRequested);
/// }
///
/// });
/// }
///
/// // listen for incoming messages from the queue
/// let mut consumer = channel
/// .basic_consume(
/// QUEUE_NAME,
/// "my_consumer",
/// BasicConsumeOptions::default(),
/// FieldTable::default(),
/// )
/// .await.unwrap();
///
/// println!("listening to messages...");
/// while let Some(delivery) = consumer.next().await {
/// let delivery = delivery.expect("error in consumer");
/// println!("incoming message from: {:?}", delivery.properties.kind());
/// channel
/// .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
/// .await
/// .expect("ack");
/// }
/// }
/// ```
#[derive(Clone)]
pub struct RMQConnectionManager {
addr: String,
connection_properties: ConnectionProperties,
}
impl RMQConnectionManager {
pub fn new(addr: String, connection_properties: ConnectionProperties) -> Self {
Self {
addr,
connection_properties,
}
}
}
#[async_trait]
impl Manager for RMQConnectionManager {
type Connection = Connection;
type Error = LapinError;
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
let c = Connection::connect(self.addr.as_str(), self.connection_properties.clone()).await?;
Ok(c)
}
async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
match conn.status().state() {
ConnectionState::Connected => Ok(conn),
other_state => Err(LapinError::InvalidConnectionState(other_state)),
}
}
}