#![deny(missing_docs, missing_debug_implementations)]
pub use bb8;
pub use lapin;
pub mod prelude;
use async_rs::traits::RuntimeKit;
use lapin::protocol::{AMQPError, AMQPErrorKind, AMQPHardError};
use lapin::types::ShortString;
use lapin::ConnectionBuilder;
#[derive(Debug)]
pub struct LapinConnectionManager<RK: RuntimeKit + Send + Sync + Clone + 'static> {
conn_builder: ConnectionBuilder<RK>,
}
impl<RK: RuntimeKit + Send + Sync + Clone + 'static> LapinConnectionManager<RK> {
pub fn new(conn_builder: ConnectionBuilder<RK>) -> Self {
Self { conn_builder }
}
}
impl<RK: RuntimeKit + Send + Sync + Clone + 'static> bb8::ManageConnection for LapinConnectionManager<RK> {
type Connection = lapin::Connection;
type Error = lapin::ErrorKind;
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
self.conn_builder.connect().await.map_err(|e| e.kind().to_owned())
}
async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
let conn_status = conn.status();
if !conn_status.closing() && !conn_status.closed() && !conn_status.errored() {
Ok(())
} else {
Err(lapin::ErrorKind::ProtocolError(AMQPError::new(
AMQPErrorKind::Hard(AMQPHardError::CONNECTIONFORCED),
ShortString::from("Invalid connection"),
)))
}
}
fn has_broken(&self, conn: &mut Self::Connection) -> bool {
let conn_status = conn.status();
conn_status.closed() || conn_status.errored()
}
}