Skip to main content

coreon_amqp/
lib.rs

1//! amqp:* — RabbitMQ / AMQP 0-9-1 producer and consumer via [`lapin`].
2//!
3//! URI: `amqp:<queue>?url=amqp://user:pass@host:5672/%2F&exchange=&prefetch=32`
4//!
5//! - Producer: publishes `exchange.in.body` bytes to `<exchange>` with
6//!   routing key = `<queue>`. If `exchange` is empty (default), publishes
7//!   directly to the queue.
8//! - Consumer: declares the queue (idempotent), subscribes with the given
9//!   prefetch, emits one Exchange per delivery. ACKs on pipeline success,
10//!   NACKs (with requeue=false) on pipeline error.
11//!
12//! Tests: marked `#[ignore]` unless `AMQP_URL` env var is set.
13//! `docker run -p 5672:5672 rabbitmq:3.13-management` then run
14//! `AMQP_URL=amqp://guest:guest@localhost:5672/%2F cargo test -p camel-amqp -- --ignored`.
15
16use async_trait::async_trait;
17use bytes::Bytes;
18use coreon_core::{
19    message::{Body, Message},
20    uri::CamelUri,
21    CamelError, Component, Consumer, Endpoint, Exchange, Processor, Producer, Result,
22};
23use dashmap::DashMap;
24use futures::StreamExt;
25use lapin::{
26    options::{
27        BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
28        QueueDeclareOptions,
29    },
30    types::FieldTable,
31    BasicProperties, Connection, ConnectionProperties,
32};
33use std::sync::Arc;
34use tokio::{sync::Mutex, task::JoinHandle};
35use tracing::{debug, warn};
36
37pub struct AmqpComponent {
38    endpoints: DashMap<String, Arc<AmqpEndpoint>>,
39}
40
41impl AmqpComponent {
42    pub fn new() -> Arc<Self> {
43        Arc::new(Self {
44            endpoints: DashMap::new(),
45        })
46    }
47}
48
49impl Default for AmqpComponent {
50    fn default() -> Self {
51        Self {
52            endpoints: DashMap::new(),
53        }
54    }
55}
56
57#[async_trait]
58impl Component for AmqpComponent {
59    fn scheme(&self) -> &str {
60        "amqp"
61    }
62
63    async fn create_endpoint(&self, uri: &CamelUri) -> Result<Arc<dyn Endpoint>> {
64        let key = uri.as_str().to_owned();
65        if let Some(ep) = self.endpoints.get(&key) {
66            return Ok(ep.clone());
67        }
68        let url = uri
69            .get_param("url")
70            .ok_or_else(|| CamelError::InvalidUri {
71                uri: key.clone(),
72                reason: "missing 'url' param".into(),
73            })?
74            .to_owned();
75        let exchange = uri.get_param("exchange").unwrap_or("").to_owned();
76        let prefetch: u16 = uri
77            .get_param("prefetch")
78            .map(str::parse::<u16>)
79            .transpose()
80            .map_err(|e| CamelError::InvalidUri {
81                uri: key.clone(),
82                reason: format!("prefetch: {e}"),
83            })?
84            .unwrap_or(32);
85        let ep = Arc::new(AmqpEndpoint {
86            uri: key.clone(),
87            queue: uri.path.clone(),
88            url,
89            exchange,
90            prefetch,
91        });
92        self.endpoints.insert(key, ep.clone());
93        Ok(ep)
94    }
95}
96
/// Parsed configuration of one `amqp:` URI; producers and consumers are
/// created from it.
pub struct AmqpEndpoint {
    /// Full URI string this endpoint was created from (returned by `uri()`).
    uri: String,
    /// Queue name, taken from the URI path. Consumers declare and consume this
    /// queue; producers use it as the routing key.
    queue: String,
    /// Broker URL from the `url` query parameter, used to open connections.
    url: String,
    /// Exchange name from the `exchange` query parameter; empty when absent.
    exchange: String,
    /// Prefetch count from the `prefetch` query parameter (32 when absent),
    /// applied via `basic_qos` on consumer channels.
    prefetch: u16,
}
104
105async fn open_channel(url: &str) -> Result<lapin::Channel> {
106    let conn = Connection::connect(url, ConnectionProperties::default())
107        .await
108        .map_err(|e| CamelError::Endpoint(format!("amqp connect: {e}")))?;
109    conn.create_channel()
110        .await
111        .map_err(|e| CamelError::Endpoint(format!("amqp create_channel: {e}")))
112}
113
114#[async_trait]
115impl Endpoint for AmqpEndpoint {
116    fn uri(&self) -> &str {
117        &self.uri
118    }
119
120    async fn create_producer(&self) -> Result<Arc<dyn Producer>> {
121        let channel = open_channel(&self.url).await?;
122        Ok(Arc::new(AmqpProducer {
123            channel,
124            exchange: self.exchange.clone(),
125            routing_key: self.queue.clone(),
126        }))
127    }
128
129    async fn create_consumer(&self, pipeline: Arc<dyn Processor>) -> Result<Arc<dyn Consumer>> {
130        let channel = open_channel(&self.url).await?;
131        channel
132            .queue_declare(
133                &self.queue,
134                QueueDeclareOptions::default(),
135                FieldTable::default(),
136            )
137            .await
138            .map_err(|e| CamelError::Endpoint(format!("amqp queue_declare: {e}")))?;
139        channel
140            .basic_qos(self.prefetch, lapin::options::BasicQosOptions::default())
141            .await
142            .map_err(|e| CamelError::Endpoint(format!("amqp basic_qos: {e}")))?;
143        Ok(Arc::new(AmqpConsumer {
144            channel,
145            queue: self.queue.clone(),
146            pipeline,
147            task: Mutex::new(None),
148        }))
149    }
150}
151
/// Publishes exchange bodies to an AMQP exchange over a dedicated channel.
pub struct AmqpProducer {
    /// Channel used for every `basic_publish` call.
    channel: lapin::Channel,
    /// Target exchange name; may be the empty string.
    exchange: String,
    /// Routing key sent with each message (the endpoint's queue name).
    routing_key: String,
}
157
158#[async_trait]
159impl Producer for AmqpProducer {
160    async fn send(&self, exchange: &mut Exchange) -> Result<()> {
161        let payload = match &exchange.r#in.body {
162            Body::Empty => Bytes::new(),
163            Body::Text(s) => Bytes::from(s.clone().into_bytes()),
164            Body::Bytes(b) => b.clone(),
165            Body::Custom(_) => {
166                return Err(CamelError::Processor(
167                    "amqp: cannot serialize Body::Custom".into(),
168                ))
169            }
170        };
171        self.channel
172            .basic_publish(
173                &self.exchange,
174                &self.routing_key,
175                BasicPublishOptions::default(),
176                &payload,
177                BasicProperties::default(),
178            )
179            .await
180            .map_err(|e| CamelError::Processor(format!("amqp publish: {e}")))?
181            .await
182            .map_err(|e| CamelError::Processor(format!("amqp confirm: {e}")))?;
183        debug!(
184            exchange = %self.exchange,
185            routing_key = %self.routing_key,
186            bytes = payload.len(),
187            "amqp: published"
188        );
189        Ok(())
190    }
191}
192
/// Consumes deliveries from one queue and runs each through a pipeline.
pub struct AmqpConsumer {
    /// Channel the subscription is made on.
    channel: lapin::Channel,
    /// Name of the queue consumed from; also set as the `CamelAmqpQueue`
    /// header on every exchange.
    queue: String,
    /// Processor invoked once per delivery; its result decides ACK vs NACK.
    pipeline: Arc<dyn Processor>,
    /// Handle of the spawned delivery loop: `None` until `start` runs, and
    /// taken (then aborted) by `stop`.
    task: Mutex<Option<JoinHandle<()>>>,
}
199
200#[async_trait]
201impl Consumer for AmqpConsumer {
202    async fn start(&self) -> Result<()> {
203        let mut stream = self
204            .channel
205            .basic_consume(
206                &self.queue,
207                "camel-rs",
208                BasicConsumeOptions::default(),
209                FieldTable::default(),
210            )
211            .await
212            .map_err(|e| CamelError::Endpoint(format!("amqp basic_consume: {e}")))?;
213        let pipeline = self.pipeline.clone();
214        let queue = self.queue.clone();
215        let handle = tokio::spawn(async move {
216            while let Some(delivery_res) = stream.next().await {
217                match delivery_res {
218                    Err(e) => warn!(queue = %queue, error = %e, "amqp: delivery error"),
219                    Ok(delivery) => {
220                        let msg = Message {
221                            headers: std::iter::once((
222                                "CamelAmqpQueue".to_owned(),
223                                queue.clone(),
224                            ))
225                            .collect(),
226                            body: Body::Bytes(Bytes::from(delivery.data.clone())),
227                        };
228                        let mut ex = Exchange {
229                            r#in: msg,
230                            ..Exchange::default()
231                        };
232                        match pipeline.process(&mut ex).await {
233                            Ok(()) => {
234                                if let Err(e) =
235                                    delivery.ack(BasicAckOptions::default()).await
236                                {
237                                    warn!(queue = %queue, error = %e, "amqp: ack failed");
238                                }
239                            }
240                            Err(e) => {
241                                warn!(queue = %queue, error = %e, "amqp: pipeline failed, NACK");
242                                let _ = delivery
243                                    .nack(BasicNackOptions {
244                                        multiple: false,
245                                        requeue: false,
246                                    })
247                                    .await;
248                            }
249                        }
250                    }
251                }
252            }
253        });
254        *self.task.lock().await = Some(handle);
255        Ok(())
256    }
257
258    async fn stop(&self) -> Result<()> {
259        if let Some(h) = self.task.lock().await.take() {
260            h.abort();
261        }
262        Ok(())
263    }
264}