cp_microservice/impl/api/server/input/
amqp_input.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use futures_util::TryStreamExt;
5use lapin::message::Delivery;
6use lapin::options::{BasicAckOptions, BasicPublishOptions, BasicRejectOptions};
7use lapin::types::ShortString;
8use lapin::{BasicProperties, Channel, Consumer};
9use uuid::Uuid;
10
11use crate::api::server::input::input::Input;
12use crate::api::server::input::input_data::InputData;
13use crate::api::server::input::replier::Replier;
14use crate::api::shared::request::Request;
15use crate::core::error::{Error, ErrorKind};
16use crate::r#impl::api::shared::amqp_queue_consumer::AmqpQueueConsumer;
17
18pub struct AmqpInput {
19    channel: Arc<Channel>,
20    consumer: Consumer,
21    reject_options: BasicRejectOptions,
22    ack_options: BasicAckOptions,
23}
24
25impl AmqpInput {
26    pub async fn try_new(
27        channel: Arc<Channel>,
28        queue_consumer: AmqpQueueConsumer,
29    ) -> Result<AmqpInput, Error> {
30        let _queue = match channel
31            .queue_declare(
32                queue_consumer.queue().name(),
33                *queue_consumer.queue().declare().options(),
34                queue_consumer.queue().declare().arguments().clone(),
35            )
36            .await
37        {
38            Ok(queue) => queue,
39            Err(error) => {
40                return Err(Error::new(
41                    ErrorKind::ApiError,
42                    format!("failed to declare queue: {}", error),
43                ));
44            }
45        };
46
47        match channel
48            .basic_qos(
49                queue_consumer.qos().prefetch_count(),
50                *queue_consumer.qos().options(),
51            )
52            .await
53        {
54            Ok(()) => (),
55            Err(error) => {
56                return Err(Error::new(
57                    ErrorKind::ApiError,
58                    format!("failure basic qos: {}", error),
59                ));
60            }
61        }
62
63        let consumer = match AmqpInput::try_get_consumer(&channel, &queue_consumer).await {
64            Ok(consumer) => consumer,
65            Err(error) => {
66                return Err(Error::new(
67                    ErrorKind::ApiError,
68                    format!("failed to create consumer: {}", error),
69                ));
70            }
71        };
72
73        let reject_options = *queue_consumer.reject();
74        let ack_options = *queue_consumer.acknowledge();
75
76        Ok(Self {
77            channel,
78            consumer,
79            reject_options,
80            ack_options,
81        })
82    }
83
84    async fn try_get_consumer(
85        channel: &Arc<Channel>,
86        queue_consumer: &AmqpQueueConsumer,
87    ) -> Result<Consumer, Error> {
88        let consumer_tag = format!("{}#{}", queue_consumer.queue().name(), Uuid::new_v4());
89        let consumer = match channel
90            .basic_consume(
91                queue_consumer.queue().name(),
92                consumer_tag.as_str(),
93                *queue_consumer.consume().options(),
94                queue_consumer.consume().arguments().clone(),
95            )
96            .await
97        {
98            Ok(consumer) => consumer,
99            Err(error) => {
100                return Err(Error::new(
101                    ErrorKind::ApiError,
102                    format!("failure basic consume: {}", error),
103                ));
104            }
105        };
106
107        Ok(consumer)
108    }
109
110    async fn reject_delivery(&self, delivery: Delivery, rejection_error: Error) -> Error {
111        match delivery.reject(self.reject_options).await {
112            Ok(_) => rejection_error,
113            Err(error) => Error::new(
114                ErrorKind::ApiError,
115                format!("failed to reject delivery: {}", error),
116            ),
117        }
118    }
119}
120
121#[async_trait]
122impl Input for AmqpInput {
123    async fn receive(&mut self) -> Result<InputData, Error> {
124        let delivery = match self.consumer.try_next().await {
125            Ok(optional_delivery) => match optional_delivery {
126                Some(delivery) => delivery,
127                None => {
128                    return Err(Error::new(
129                        ErrorKind::ApiError,
130                        "consumer got an empty delivery",
131                    ));
132                }
133            },
134            Err(error) => {
135                return Err(Error::new(
136                    ErrorKind::ApiError,
137                    format!("consumer got an error: {}", error),
138                ));
139            }
140        };
141
142        let json_request = match std::str::from_utf8(delivery.data.as_slice()) {
143            Ok(json_request) => json_request,
144            Err(error) => {
145                return Err(self
146                    .reject_delivery(
147                        delivery,
148                        Error::new(
149                            ErrorKind::RequestError,
150                            format!("delivery is not an utf8 string: {}", error),
151                        ),
152                    )
153                    .await);
154            }
155        };
156
157        let request: Request = match serde_json::from_str(json_request) {
158            Ok(request) => request,
159            Err(error) => {
160                return Err(self
161                    .reject_delivery(
162                        delivery,
163                        Error::new(
164                            ErrorKind::RequestError,
165                            format!("failed to deserialize request: {}", error),
166                        ),
167                    )
168                    .await);
169            }
170        };
171
172        if let Err(error) = delivery.ack(self.ack_options).await {
173            log::warn!("failed to acknowledge delivery: {}", error);
174        }
175
176        let channel = self.channel.clone();
177        let properties: BasicProperties = delivery.properties;
178
179        let replier: Replier = Arc::new(move |value| {
180            let channel = channel.clone();
181            let properties = properties.clone();
182
183            Box::pin(async move {
184                let request_properties = properties;
185
186                let reply_to = match request_properties.reply_to() {
187                    Some(reply_to) => reply_to,
188                    None => return Ok(()),
189                };
190
191                let mut response_properties = BasicProperties::default()
192                    .with_content_type(ShortString::from("application/json"));
193
194                if let Some(correlation_id) = request_properties.correlation_id() {
195                    response_properties =
196                        response_properties.with_correlation_id(correlation_id.clone());
197                }
198
199                let publish_options = BasicPublishOptions::default();
200                let payload = match serde_json::to_vec(&value) {
201                    Ok(payload) => payload,
202                    Err(error) => {
203                        return Err(Error::new(
204                            ErrorKind::ApiError,
205                            format!("failed to serialize result: {}", error),
206                        ));
207                    }
208                };
209
210                match channel
211                    .basic_publish(
212                        "",
213                        reply_to.as_str(),
214                        publish_options,
215                        payload.as_slice(),
216                        response_properties,
217                    )
218                    .await
219                {
220                    Ok(_) => (),
221                    Err(error) => {
222                        return Err(Error::new(
223                            ErrorKind::ApiError,
224                            format!("failed to send reply: {}", error),
225                        ));
226                    }
227                }
228
229                Ok(())
230            })
231        });
232
233        Ok(InputData::new(request, replier))
234    }
235}