cp_microservice/impl/api/server/input/
amqp_input.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use futures_util::TryStreamExt;
5use lapin::message::Delivery;
6use lapin::options::{BasicAckOptions, BasicPublishOptions, BasicRejectOptions};
7use lapin::types::ShortString;
8use lapin::{BasicProperties, Channel, Consumer};
9use uuid::Uuid;
10
11use crate::api::server::input::input::Input;
12use crate::api::server::input::input_data::InputData;
13use crate::api::server::input::replier::Replier;
14use crate::api::shared::request::Request;
15use crate::core::error::{Error, ErrorKind};
16use crate::r#impl::api::shared::amqp_queue_consumer::AmqpQueueConsumer;
17
18pub struct AmqpInput {
19 channel: Arc<Channel>,
20 consumer: Consumer,
21 reject_options: BasicRejectOptions,
22 ack_options: BasicAckOptions,
23}
24
25impl AmqpInput {
26 pub async fn try_new(
27 channel: Arc<Channel>,
28 queue_consumer: AmqpQueueConsumer,
29 ) -> Result<AmqpInput, Error> {
30 let _queue = match channel
31 .queue_declare(
32 queue_consumer.queue().name(),
33 *queue_consumer.queue().declare().options(),
34 queue_consumer.queue().declare().arguments().clone(),
35 )
36 .await
37 {
38 Ok(queue) => queue,
39 Err(error) => {
40 return Err(Error::new(
41 ErrorKind::ApiError,
42 format!("failed to declare queue: {}", error),
43 ));
44 }
45 };
46
47 match channel
48 .basic_qos(
49 queue_consumer.qos().prefetch_count(),
50 *queue_consumer.qos().options(),
51 )
52 .await
53 {
54 Ok(()) => (),
55 Err(error) => {
56 return Err(Error::new(
57 ErrorKind::ApiError,
58 format!("failure basic qos: {}", error),
59 ));
60 }
61 }
62
63 let consumer = match AmqpInput::try_get_consumer(&channel, &queue_consumer).await {
64 Ok(consumer) => consumer,
65 Err(error) => {
66 return Err(Error::new(
67 ErrorKind::ApiError,
68 format!("failed to create consumer: {}", error),
69 ));
70 }
71 };
72
73 let reject_options = *queue_consumer.reject();
74 let ack_options = *queue_consumer.acknowledge();
75
76 Ok(Self {
77 channel,
78 consumer,
79 reject_options,
80 ack_options,
81 })
82 }
83
84 async fn try_get_consumer(
85 channel: &Arc<Channel>,
86 queue_consumer: &AmqpQueueConsumer,
87 ) -> Result<Consumer, Error> {
88 let consumer_tag = format!("{}#{}", queue_consumer.queue().name(), Uuid::new_v4());
89 let consumer = match channel
90 .basic_consume(
91 queue_consumer.queue().name(),
92 consumer_tag.as_str(),
93 *queue_consumer.consume().options(),
94 queue_consumer.consume().arguments().clone(),
95 )
96 .await
97 {
98 Ok(consumer) => consumer,
99 Err(error) => {
100 return Err(Error::new(
101 ErrorKind::ApiError,
102 format!("failure basic consume: {}", error),
103 ));
104 }
105 };
106
107 Ok(consumer)
108 }
109
110 async fn reject_delivery(&self, delivery: Delivery, rejection_error: Error) -> Error {
111 match delivery.reject(self.reject_options).await {
112 Ok(_) => rejection_error,
113 Err(error) => Error::new(
114 ErrorKind::ApiError,
115 format!("failed to reject delivery: {}", error),
116 ),
117 }
118 }
119}
120
121#[async_trait]
122impl Input for AmqpInput {
123 async fn receive(&mut self) -> Result<InputData, Error> {
124 let delivery = match self.consumer.try_next().await {
125 Ok(optional_delivery) => match optional_delivery {
126 Some(delivery) => delivery,
127 None => {
128 return Err(Error::new(
129 ErrorKind::ApiError,
130 "consumer got an empty delivery",
131 ));
132 }
133 },
134 Err(error) => {
135 return Err(Error::new(
136 ErrorKind::ApiError,
137 format!("consumer got an error: {}", error),
138 ));
139 }
140 };
141
142 let json_request = match std::str::from_utf8(delivery.data.as_slice()) {
143 Ok(json_request) => json_request,
144 Err(error) => {
145 return Err(self
146 .reject_delivery(
147 delivery,
148 Error::new(
149 ErrorKind::RequestError,
150 format!("delivery is not an utf8 string: {}", error),
151 ),
152 )
153 .await);
154 }
155 };
156
157 let request: Request = match serde_json::from_str(json_request) {
158 Ok(request) => request,
159 Err(error) => {
160 return Err(self
161 .reject_delivery(
162 delivery,
163 Error::new(
164 ErrorKind::RequestError,
165 format!("failed to deserialize request: {}", error),
166 ),
167 )
168 .await);
169 }
170 };
171
172 if let Err(error) = delivery.ack(self.ack_options).await {
173 log::warn!("failed to acknowledge delivery: {}", error);
174 }
175
176 let channel = self.channel.clone();
177 let properties: BasicProperties = delivery.properties;
178
179 let replier: Replier = Arc::new(move |value| {
180 let channel = channel.clone();
181 let properties = properties.clone();
182
183 Box::pin(async move {
184 let request_properties = properties;
185
186 let reply_to = match request_properties.reply_to() {
187 Some(reply_to) => reply_to,
188 None => return Ok(()),
189 };
190
191 let mut response_properties = BasicProperties::default()
192 .with_content_type(ShortString::from("application/json"));
193
194 if let Some(correlation_id) = request_properties.correlation_id() {
195 response_properties =
196 response_properties.with_correlation_id(correlation_id.clone());
197 }
198
199 let publish_options = BasicPublishOptions::default();
200 let payload = match serde_json::to_vec(&value) {
201 Ok(payload) => payload,
202 Err(error) => {
203 return Err(Error::new(
204 ErrorKind::ApiError,
205 format!("failed to serialize result: {}", error),
206 ));
207 }
208 };
209
210 match channel
211 .basic_publish(
212 "",
213 reply_to.as_str(),
214 publish_options,
215 payload.as_slice(),
216 response_properties,
217 )
218 .await
219 {
220 Ok(_) => (),
221 Err(error) => {
222 return Err(Error::new(
223 ErrorKind::ApiError,
224 format!("failed to send reply: {}", error),
225 ));
226 }
227 }
228
229 Ok(())
230 })
231 });
232
233 Ok(InputData::new(request, replier))
234 }
235}