1use async_trait::async_trait;
17use bytes::Bytes;
18use coreon_core::{
19 message::{Body, Message},
20 uri::CamelUri,
21 CamelError, Component, Consumer, Endpoint, Exchange, Processor, Producer, Result,
22};
23use dashmap::DashMap;
24use futures::StreamExt;
25use lapin::{
26 options::{
27 BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
28 QueueDeclareOptions,
29 },
30 types::FieldTable,
31 BasicProperties, Connection, ConnectionProperties,
32};
33use std::sync::Arc;
34use tokio::{sync::Mutex, task::JoinHandle};
35use tracing::{debug, warn};
36
37pub struct AmqpComponent {
38 endpoints: DashMap<String, Arc<AmqpEndpoint>>,
39}
40
41impl AmqpComponent {
42 pub fn new() -> Arc<Self> {
43 Arc::new(Self {
44 endpoints: DashMap::new(),
45 })
46 }
47}
48
49impl Default for AmqpComponent {
50 fn default() -> Self {
51 Self {
52 endpoints: DashMap::new(),
53 }
54 }
55}
56
57#[async_trait]
58impl Component for AmqpComponent {
59 fn scheme(&self) -> &str {
60 "amqp"
61 }
62
63 async fn create_endpoint(&self, uri: &CamelUri) -> Result<Arc<dyn Endpoint>> {
64 let key = uri.as_str().to_owned();
65 if let Some(ep) = self.endpoints.get(&key) {
66 return Ok(ep.clone());
67 }
68 let url = uri
69 .get_param("url")
70 .ok_or_else(|| CamelError::InvalidUri {
71 uri: key.clone(),
72 reason: "missing 'url' param".into(),
73 })?
74 .to_owned();
75 let exchange = uri.get_param("exchange").unwrap_or("").to_owned();
76 let prefetch: u16 = uri
77 .get_param("prefetch")
78 .map(str::parse::<u16>)
79 .transpose()
80 .map_err(|e| CamelError::InvalidUri {
81 uri: key.clone(),
82 reason: format!("prefetch: {e}"),
83 })?
84 .unwrap_or(32);
85 let ep = Arc::new(AmqpEndpoint {
86 uri: key.clone(),
87 queue: uri.path.clone(),
88 url,
89 exchange,
90 prefetch,
91 });
92 self.endpoints.insert(key, ep.clone());
93 Ok(ep)
94 }
95}
96
97pub struct AmqpEndpoint {
98 uri: String,
99 queue: String,
100 url: String,
101 exchange: String,
102 prefetch: u16,
103}
104
105async fn open_channel(url: &str) -> Result<lapin::Channel> {
106 let conn = Connection::connect(url, ConnectionProperties::default())
107 .await
108 .map_err(|e| CamelError::Endpoint(format!("amqp connect: {e}")))?;
109 conn.create_channel()
110 .await
111 .map_err(|e| CamelError::Endpoint(format!("amqp create_channel: {e}")))
112}
113
114#[async_trait]
115impl Endpoint for AmqpEndpoint {
116 fn uri(&self) -> &str {
117 &self.uri
118 }
119
120 async fn create_producer(&self) -> Result<Arc<dyn Producer>> {
121 let channel = open_channel(&self.url).await?;
122 Ok(Arc::new(AmqpProducer {
123 channel,
124 exchange: self.exchange.clone(),
125 routing_key: self.queue.clone(),
126 }))
127 }
128
129 async fn create_consumer(&self, pipeline: Arc<dyn Processor>) -> Result<Arc<dyn Consumer>> {
130 let channel = open_channel(&self.url).await?;
131 channel
132 .queue_declare(
133 &self.queue,
134 QueueDeclareOptions::default(),
135 FieldTable::default(),
136 )
137 .await
138 .map_err(|e| CamelError::Endpoint(format!("amqp queue_declare: {e}")))?;
139 channel
140 .basic_qos(self.prefetch, lapin::options::BasicQosOptions::default())
141 .await
142 .map_err(|e| CamelError::Endpoint(format!("amqp basic_qos: {e}")))?;
143 Ok(Arc::new(AmqpConsumer {
144 channel,
145 queue: self.queue.clone(),
146 pipeline,
147 task: Mutex::new(None),
148 }))
149 }
150}
151
152pub struct AmqpProducer {
153 channel: lapin::Channel,
154 exchange: String,
155 routing_key: String,
156}
157
158#[async_trait]
159impl Producer for AmqpProducer {
160 async fn send(&self, exchange: &mut Exchange) -> Result<()> {
161 let payload = match &exchange.r#in.body {
162 Body::Empty => Bytes::new(),
163 Body::Text(s) => Bytes::from(s.clone().into_bytes()),
164 Body::Bytes(b) => b.clone(),
165 Body::Custom(_) => {
166 return Err(CamelError::Processor(
167 "amqp: cannot serialize Body::Custom".into(),
168 ))
169 }
170 };
171 self.channel
172 .basic_publish(
173 &self.exchange,
174 &self.routing_key,
175 BasicPublishOptions::default(),
176 &payload,
177 BasicProperties::default(),
178 )
179 .await
180 .map_err(|e| CamelError::Processor(format!("amqp publish: {e}")))?
181 .await
182 .map_err(|e| CamelError::Processor(format!("amqp confirm: {e}")))?;
183 debug!(
184 exchange = %self.exchange,
185 routing_key = %self.routing_key,
186 bytes = payload.len(),
187 "amqp: published"
188 );
189 Ok(())
190 }
191}
192
193pub struct AmqpConsumer {
194 channel: lapin::Channel,
195 queue: String,
196 pipeline: Arc<dyn Processor>,
197 task: Mutex<Option<JoinHandle<()>>>,
198}
199
200#[async_trait]
201impl Consumer for AmqpConsumer {
202 async fn start(&self) -> Result<()> {
203 let mut stream = self
204 .channel
205 .basic_consume(
206 &self.queue,
207 "camel-rs",
208 BasicConsumeOptions::default(),
209 FieldTable::default(),
210 )
211 .await
212 .map_err(|e| CamelError::Endpoint(format!("amqp basic_consume: {e}")))?;
213 let pipeline = self.pipeline.clone();
214 let queue = self.queue.clone();
215 let handle = tokio::spawn(async move {
216 while let Some(delivery_res) = stream.next().await {
217 match delivery_res {
218 Err(e) => warn!(queue = %queue, error = %e, "amqp: delivery error"),
219 Ok(delivery) => {
220 let msg = Message {
221 headers: std::iter::once((
222 "CamelAmqpQueue".to_owned(),
223 queue.clone(),
224 ))
225 .collect(),
226 body: Body::Bytes(Bytes::from(delivery.data.clone())),
227 };
228 let mut ex = Exchange {
229 r#in: msg,
230 ..Exchange::default()
231 };
232 match pipeline.process(&mut ex).await {
233 Ok(()) => {
234 if let Err(e) =
235 delivery.ack(BasicAckOptions::default()).await
236 {
237 warn!(queue = %queue, error = %e, "amqp: ack failed");
238 }
239 }
240 Err(e) => {
241 warn!(queue = %queue, error = %e, "amqp: pipeline failed, NACK");
242 let _ = delivery
243 .nack(BasicNackOptions {
244 multiple: false,
245 requeue: false,
246 })
247 .await;
248 }
249 }
250 }
251 }
252 }
253 });
254 *self.task.lock().await = Some(handle);
255 Ok(())
256 }
257
258 async fn stop(&self) -> Result<()> {
259 if let Some(h) = self.task.lock().await.take() {
260 h.abort();
261 }
262 Ok(())
263 }
264}