use lapin::{
message::{BasicReturnMessage, Delivery, DeliveryResult},
options::*,
protocol::{AMQPErrorKind, AMQPSoftError},
types::FieldTable,
BasicProperties, Connection, ConnectionProperties,
};
use log::info;
fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}
env_logger::init();
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
async_global_executor::block_on(async {
let conn = Connection::connect(&addr, ConnectionProperties::default())
.await
.expect("connection error");
info!("CONNECTED");
let channel_a = conn.create_channel().await.expect("create_channel");
let channel_b = conn.create_channel().await.expect("create_channel");
info!("[{}] state: {:?}", line!(), conn.status().state());
let queue = channel_a
.queue_declare(
"hello",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.expect("queue_declare");
info!("[{}] state: {:?}", line!(), conn.status().state());
info!("[{}] declared queue: {:?}", line!(), queue);
channel_a
.confirm_select(ConfirmSelectOptions::default())
.await
.expect("confirm_select");
info!("[{}] state: {:?}", line!(), conn.status().state());
info!("Enabled publisher-confirms");
info!("will consume");
channel_b
.basic_consume(
"hello",
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("basic_consume")
.set_delegate(move |delivery: DeliveryResult| async move {
info!("received message: {:?}", delivery);
if let Ok(Some((_, delivery))) = delivery {
delivery
.ack(BasicAckOptions::default())
.await
.expect("basic_ack");
}
})
.expect("set_delegate");
info!("[{}] state: {:?}", line!(), conn.status().state());
info!("will publish");
let payload = b"Hello world!";
let confirm = channel_a
.basic_publish(
"",
"hello",
BasicPublishOptions::default(),
payload.to_vec(),
BasicProperties::default(),
)
.await
.expect("basic_publish")
.await .expect("publisher-confirms");
assert!(confirm.is_ack());
assert_eq!(confirm.take_message(), None);
info!("[{}] state: {:?}", line!(), conn.status().state());
for _ in 1..=2 {
channel_a
.basic_publish(
"",
"hello",
BasicPublishOptions::default(),
payload.to_vec(),
BasicProperties::default(),
)
.await
.expect("basic_publish"); }
let returned = channel_a
.wait_for_confirms()
.await
.expect("wait for confirms");
assert!(returned.is_empty());
let confirm = channel_a
.basic_publish(
"",
"unroutable-routing-key-for-tests",
BasicPublishOptions {
mandatory: true,
..BasicPublishOptions::default()
},
payload.to_vec(),
BasicProperties::default().with_priority(42),
)
.await
.expect("basic_publish")
.await .expect("publisher-confirms");
assert!(confirm.is_ack());
let message = confirm.take_message().unwrap();
assert_eq!(
message,
BasicReturnMessage {
delivery: Delivery {
delivery_tag: 0,
exchange: "".into(),
routing_key: "unroutable-routing-key-for-tests".into(),
redelivered: false,
properties: BasicProperties::default().with_priority(42),
data: payload.to_vec(),
acker: Default::default(),
},
reply_code: 312,
reply_text: "NO_ROUTE".into(),
}
);
let error = message.error().unwrap();
assert_eq!(error.kind(), &AMQPErrorKind::Soft(AMQPSoftError::NOROUTE));
let _ = channel_a;
})
}