1#![deny(missing_docs, missing_debug_implementations)]
3
4pub use bb8;
5pub use lapin;
6
7pub mod prelude;
9
10use lapin::protocol::{AMQPError, AMQPErrorKind, AMQPHardError};
11use lapin::types::ShortString;
12use lapin::{ConnectionProperties, ConnectionState};
13use std::fmt;
14
15pub struct LapinConnectionManager {
40 amqp_address: String,
41 conn_properties: ConnectionProperties,
42}
43
44impl LapinConnectionManager {
45 pub fn new(amqp_address: &str, conn_properties: ConnectionProperties) -> Self {
54 Self {
55 amqp_address: amqp_address.to_string(),
56 conn_properties,
57 }
58 }
59}
60
61impl bb8::ManageConnection for LapinConnectionManager {
62 type Connection = lapin::Connection;
63 type Error = lapin::ErrorKind;
64
65 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
66 lapin::Connection::connect(&self.amqp_address, self.conn_properties.clone())
67 .await
68 .map_err(|e| e.kind().to_owned())
69 }
70
71 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
72 let valid_states = [ConnectionState::Initial, ConnectionState::Connecting, ConnectionState::Connected];
73 if valid_states.contains(&conn.status().state()) {
74 Ok(())
75 } else {
76 Err(lapin::ErrorKind::ProtocolError(AMQPError::new(
77 AMQPErrorKind::Hard(AMQPHardError::CONNECTIONFORCED),
78 ShortString::from("Invalid connection"),
79 )))
80 }
81 }
82
83 fn has_broken(&self, conn: &mut Self::Connection) -> bool {
84 let broken_states = [ConnectionState::Closed, ConnectionState::Error];
85 broken_states.contains(&conn.status().state())
86 }
87}
88
89impl fmt::Debug for LapinConnectionManager {
90 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91 f.debug_struct("LapinConnectionManager")
92 .field("amqp_address", &self.amqp_address)
93 .finish()
94 }
95}