amqp_api_server/api/input/
amqp_request_replier.rs

1use std::sync::Arc;
2use amqp_api_shared::request_result::RequestResult;
3
4use lapin::message::Delivery;
5use lapin::options::BasicPublishOptions;
6use lapin::types::ShortString;
7use lapin::{BasicProperties, Channel};
8
9use crate::error::{Error, ErrorKind};
10
11pub struct AmqpRequestReplier<'reply> {
12    channel: &'reply Arc<Channel>,
13    reply_to: &'reply str,
14    response_properties: BasicProperties,
15}
16
17impl<'reply> AmqpRequestReplier<'reply> {
18    fn new(
19        channel: &'reply Arc<Channel>,
20        reply_to: &'reply str,
21        response_properties: BasicProperties,
22    ) -> AmqpRequestReplier<'reply> {
23        AmqpRequestReplier {
24            channel,
25            reply_to,
26            response_properties,
27        }
28    }
29
30    pub async fn reply(&'reply self, result: RequestResult) -> Result<(), Error> {
31        let options = BasicPublishOptions::default();
32        let payload = match serde_json::to_vec(&result) {
33            Ok(payload) => payload,
34            Err(error) => {
35                return Err(Error::new(
36                    ErrorKind::InternalFailure,
37                    format!("failed to serialize result: {}", error),
38                ));
39            }
40        };
41
42        match self
43            .channel
44            .basic_publish(
45                "",
46                self.reply_to,
47                options,
48                payload.as_slice(),
49                self.response_properties.clone(),
50            )
51            .await
52        {
53            Ok(_) => Ok(()),
54            Err(error) => Err(Error::new(
55                ErrorKind::AmqpFailure,
56                format!("failed to send reply: {}", error),
57            )),
58        }
59    }
60}
61
62pub fn try_generate_replier<'reply>(
63    channel: &'reply Arc<Channel>,
64    delivery: &'reply Delivery,
65) -> Option<AmqpRequestReplier<'reply>> {
66    let request_properties = &delivery.properties;
67
68    let reply_to = match request_properties.reply_to() {
69        Some(reply_to) => reply_to,
70        None => return None,
71    };
72
73    let mut properties =
74        BasicProperties::default().with_content_type(ShortString::from("application/json"));
75
76    if let Some(correlation_id) = request_properties.correlation_id() {
77        properties = properties.with_correlation_id(correlation_id.clone());
78    }
79
80    Some(AmqpRequestReplier::new(
81        channel,
82        reply_to.as_str(),
83        properties,
84    ))
85}