amqp_api_server/api/input/
amqp_request_replier.rs1use std::sync::Arc;
2use amqp_api_shared::request_result::RequestResult;
3
4use lapin::message::Delivery;
5use lapin::options::BasicPublishOptions;
6use lapin::types::ShortString;
7use lapin::{BasicProperties, Channel};
8
9use crate::error::{Error, ErrorKind};
10
11pub struct AmqpRequestReplier<'reply> {
12 channel: &'reply Arc<Channel>,
13 reply_to: &'reply str,
14 response_properties: BasicProperties,
15}
16
17impl<'reply> AmqpRequestReplier<'reply> {
18 fn new(
19 channel: &'reply Arc<Channel>,
20 reply_to: &'reply str,
21 response_properties: BasicProperties,
22 ) -> AmqpRequestReplier<'reply> {
23 AmqpRequestReplier {
24 channel,
25 reply_to,
26 response_properties,
27 }
28 }
29
30 pub async fn reply(&'reply self, result: RequestResult) -> Result<(), Error> {
31 let options = BasicPublishOptions::default();
32 let payload = match serde_json::to_vec(&result) {
33 Ok(payload) => payload,
34 Err(error) => {
35 return Err(Error::new(
36 ErrorKind::InternalFailure,
37 format!("failed to serialize result: {}", error),
38 ));
39 }
40 };
41
42 match self
43 .channel
44 .basic_publish(
45 "",
46 self.reply_to,
47 options,
48 payload.as_slice(),
49 self.response_properties.clone(),
50 )
51 .await
52 {
53 Ok(_) => Ok(()),
54 Err(error) => Err(Error::new(
55 ErrorKind::AmqpFailure,
56 format!("failed to send reply: {}", error),
57 )),
58 }
59 }
60}
61
62pub fn try_generate_replier<'reply>(
63 channel: &'reply Arc<Channel>,
64 delivery: &'reply Delivery,
65) -> Option<AmqpRequestReplier<'reply>> {
66 let request_properties = &delivery.properties;
67
68 let reply_to = match request_properties.reply_to() {
69 Some(reply_to) => reply_to,
70 None => return None,
71 };
72
73 let mut properties =
74 BasicProperties::default().with_content_type(ShortString::from("application/json"));
75
76 if let Some(correlation_id) = request_properties.correlation_id() {
77 properties = properties.with_correlation_id(correlation_id.clone());
78 }
79
80 Some(AmqpRequestReplier::new(
81 channel,
82 reply_to.as_str(),
83 properties,
84 ))
85}