coreon-amqp 0.1.0

amqp:* component for camel-rs (RabbitMQ via lapin).
Documentation
//! amqp:* — RabbitMQ / AMQP 0-9-1 producer and consumer via [`lapin`].
//!
//! URI: `amqp:<queue>?url=amqp://user:pass@host:5672/%2F&exchange=&prefetch=32`
//!
//! - Producer: publishes `exchange.in.body` bytes to `<exchange>` with
//!   routing key = `<queue>`. If `exchange` is empty (default), publishes
//!   directly to the queue.
//! - Consumer: declares the queue (idempotent), subscribes with the given
//!   prefetch, emits one Exchange per delivery. ACKs on pipeline success,
//!   NACKs (with requeue=false) on pipeline error.
//!
//! Tests: the integration tests are marked `#[ignore]` because they need a
//! live broker addressed by the `AMQP_URL` env var. Start one with
//! `docker run -p 5672:5672 rabbitmq:3.13-management`, then run
//! `AMQP_URL=amqp://guest:guest@localhost:5672/%2F cargo test -p coreon-amqp -- --ignored`.

use async_trait::async_trait;
use bytes::Bytes;
use coreon_core::{
    message::{Body, Message},
    uri::CamelUri,
    CamelError, Component, Consumer, Endpoint, Exchange, Processor, Producer, Result,
};
use dashmap::DashMap;
use futures::StreamExt;
use lapin::{
    options::{
        BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
        QueueDeclareOptions,
    },
    types::FieldTable,
    BasicProperties, Connection, ConnectionProperties,
};
use std::sync::Arc;
use tokio::{sync::Mutex, task::JoinHandle};
use tracing::{debug, warn};

/// Component registered under the `amqp` scheme.
///
/// Keeps a cache of the endpoints it has created, keyed by the full endpoint
/// URI string, so resolving the same URI again returns the cached
/// [`AmqpEndpoint`] instead of building a new one.
pub struct AmqpComponent {
    /// Endpoint cache: full URI string -> shared endpoint.
    endpoints: DashMap<String, Arc<AmqpEndpoint>>,
}

impl AmqpComponent {
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            endpoints: DashMap::new(),
        })
    }
}

impl Default for AmqpComponent {
    /// Creates a component whose endpoint cache starts out empty.
    fn default() -> Self {
        let endpoints = DashMap::new();
        Self { endpoints }
    }
}

#[async_trait]
impl Component for AmqpComponent {
    fn scheme(&self) -> &str {
        "amqp"
    }

    async fn create_endpoint(&self, uri: &CamelUri) -> Result<Arc<dyn Endpoint>> {
        let key = uri.as_str().to_owned();
        if let Some(ep) = self.endpoints.get(&key) {
            return Ok(ep.clone());
        }
        let url = uri
            .get_param("url")
            .ok_or_else(|| CamelError::InvalidUri {
                uri: key.clone(),
                reason: "missing 'url' param".into(),
            })?
            .to_owned();
        let exchange = uri.get_param("exchange").unwrap_or("").to_owned();
        let prefetch: u16 = uri
            .get_param("prefetch")
            .map(str::parse::<u16>)
            .transpose()
            .map_err(|e| CamelError::InvalidUri {
                uri: key.clone(),
                reason: format!("prefetch: {e}"),
            })?
            .unwrap_or(32);
        let ep = Arc::new(AmqpEndpoint {
            uri: key.clone(),
            queue: uri.path.clone(),
            url,
            exchange,
            prefetch,
        });
        self.endpoints.insert(key, ep.clone());
        Ok(ep)
    }
}

/// Parsed configuration of one `amqp:` endpoint URI.
///
/// Holds no live connection itself; producers and consumers created from it
/// each open their own channel.
pub struct AmqpEndpoint {
    /// Full endpoint URI string as given by the caller (also the cache key).
    uri: String,
    /// Queue name taken from the URI path; used as the routing key by
    /// producers and as the queue to declare and consume from by consumers.
    queue: String,
    /// Broker connection URL from the `url` query parameter.
    url: String,
    /// Exchange to publish to, from the `exchange` query parameter
    /// (empty string when not given).
    exchange: String,
    /// Prefetch count applied to consumer channels (`prefetch` parameter,
    /// 32 when not given).
    prefetch: u16,
}

/// Connects to the broker at `url` with default connection properties and
/// opens a single channel on that connection.
///
/// # Errors
///
/// Returns [`CamelError::Endpoint`] if either the connection attempt or the
/// channel creation fails.
async fn open_channel(url: &str) -> Result<lapin::Channel> {
    let connection = match Connection::connect(url, ConnectionProperties::default()).await {
        Ok(connection) => connection,
        Err(e) => return Err(CamelError::Endpoint(format!("amqp connect: {e}"))),
    };
    // NOTE(review): `connection` goes out of scope when this function returns;
    // presumably the returned channel keeps the underlying connection alive —
    // confirm against lapin's drop semantics.
    match connection.create_channel().await {
        Ok(channel) => Ok(channel),
        Err(e) => Err(CamelError::Endpoint(format!("amqp create_channel: {e}"))),
    }
}

#[async_trait]
impl Endpoint for AmqpEndpoint {
    /// Full endpoint URI string this endpoint was created from.
    fn uri(&self) -> &str {
        &self.uri
    }

    /// Opens a dedicated channel and returns a producer that publishes to the
    /// configured exchange with the queue name as routing key.
    ///
    /// The channel is switched into publisher-confirm mode. The producer
    /// awaits the confirmation of every publish; without confirm mode that
    /// await resolves immediately and a message the broker never accepted
    /// would still be reported as sent.
    ///
    /// # Errors
    ///
    /// Returns [`CamelError::Endpoint`] if connecting, opening the channel or
    /// enabling confirm mode fails.
    async fn create_producer(&self) -> Result<Arc<dyn Producer>> {
        let channel = open_channel(&self.url).await?;
        channel
            .confirm_select(lapin::options::ConfirmSelectOptions::default())
            .await
            .map_err(|e| CamelError::Endpoint(format!("amqp confirm_select: {e}")))?;
        Ok(Arc::new(AmqpProducer {
            channel,
            exchange: self.exchange.clone(),
            routing_key: self.queue.clone(),
        }))
    }

    /// Opens a dedicated channel, declares the queue with default options,
    /// applies the configured prefetch and returns a consumer that feeds
    /// deliveries into `pipeline`.
    ///
    /// The returned consumer is not started; no deliveries are received until
    /// its `start` method is called.
    ///
    /// # Errors
    ///
    /// Returns [`CamelError::Endpoint`] if connecting, opening the channel,
    /// declaring the queue or setting the prefetch fails.
    async fn create_consumer(&self, pipeline: Arc<dyn Processor>) -> Result<Arc<dyn Consumer>> {
        let channel = open_channel(&self.url).await?;
        channel
            .queue_declare(
                &self.queue,
                QueueDeclareOptions::default(),
                FieldTable::default(),
            )
            .await
            .map_err(|e| CamelError::Endpoint(format!("amqp queue_declare: {e}")))?;
        channel
            .basic_qos(self.prefetch, lapin::options::BasicQosOptions::default())
            .await
            .map_err(|e| CamelError::Endpoint(format!("amqp basic_qos: {e}")))?;
        Ok(Arc::new(AmqpConsumer {
            channel,
            queue: self.queue.clone(),
            pipeline,
            task: Mutex::new(None),
        }))
    }
}

/// Producer that publishes exchange bodies to an AMQP broker.
pub struct AmqpProducer {
    /// Channel all publishes of this producer go through.
    channel: lapin::Channel,
    /// Target exchange name; may be the empty string.
    exchange: String,
    /// Routing key attached to every published message.
    routing_key: String,
}

#[async_trait]
impl Producer for AmqpProducer {
    /// Publishes the `in` body of `exchange` with default publish options and
    /// default message properties, then waits for the publish confirmation.
    ///
    /// `Body::Empty` is sent as a zero-length payload, `Body::Text` as its
    /// UTF-8 bytes and `Body::Bytes` unchanged.
    ///
    /// # Errors
    ///
    /// Returns [`CamelError::Processor`] when the body is `Body::Custom`, when
    /// publishing or awaiting the confirmation fails, or when the broker
    /// negatively acknowledges the message.
    async fn send(&self, exchange: &mut Exchange) -> Result<()> {
        let payload = match &exchange.r#in.body {
            Body::Empty => Bytes::new(),
            Body::Text(s) => Bytes::from(s.clone().into_bytes()),
            Body::Bytes(b) => b.clone(),
            Body::Custom(_) => {
                return Err(CamelError::Processor(
                    "amqp: cannot serialize Body::Custom".into(),
                ))
            }
        };
        let confirmation = self
            .channel
            .basic_publish(
                &self.exchange,
                &self.routing_key,
                BasicPublishOptions::default(),
                &payload,
                BasicProperties::default(),
            )
            .await
            .map_err(|e| CamelError::Processor(format!("amqp publish: {e}")))?
            .await
            .map_err(|e| CamelError::Processor(format!("amqp confirm: {e}")))?;
        // A nack means the broker refused responsibility for the message;
        // reporting success here would lose it silently. When publisher
        // confirms are not enabled on the channel the confirmation is
        // `NotRequested`, so this check never fires.
        if confirmation.is_nack() {
            return Err(CamelError::Processor(format!(
                "amqp confirm: broker nacked message (exchange '{}', routing key '{}')",
                self.exchange, self.routing_key
            )));
        }
        debug!(
            exchange = %self.exchange,
            routing_key = %self.routing_key,
            bytes = payload.len(),
            "amqp: published"
        );
        Ok(())
    }
}

/// Consumer that turns deliveries from one queue into exchanges and runs
/// them through a processing pipeline.
pub struct AmqpConsumer {
    /// Channel the consumer subscribes on and acknowledges through.
    channel: lapin::Channel,
    /// Name of the queue to consume from.
    queue: String,
    /// Pipeline invoked once per delivery.
    pipeline: Arc<dyn Processor>,
    /// Handle of the spawned delivery loop: `None` until `start` has run,
    /// taken out again by `stop`.
    task: Mutex<Option<JoinHandle<()>>>,
}

#[async_trait]
impl Consumer for AmqpConsumer {
    async fn start(&self) -> Result<()> {
        let mut stream = self
            .channel
            .basic_consume(
                &self.queue,
                "camel-rs",
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await
            .map_err(|e| CamelError::Endpoint(format!("amqp basic_consume: {e}")))?;
        let pipeline = self.pipeline.clone();
        let queue = self.queue.clone();
        let handle = tokio::spawn(async move {
            while let Some(delivery_res) = stream.next().await {
                match delivery_res {
                    Err(e) => warn!(queue = %queue, error = %e, "amqp: delivery error"),
                    Ok(delivery) => {
                        let msg = Message {
                            headers: std::iter::once((
                                "CamelAmqpQueue".to_owned(),
                                queue.clone(),
                            ))
                            .collect(),
                            body: Body::Bytes(Bytes::from(delivery.data.clone())),
                        };
                        let mut ex = Exchange {
                            r#in: msg,
                            ..Exchange::default()
                        };
                        match pipeline.process(&mut ex).await {
                            Ok(()) => {
                                if let Err(e) =
                                    delivery.ack(BasicAckOptions::default()).await
                                {
                                    warn!(queue = %queue, error = %e, "amqp: ack failed");
                                }
                            }
                            Err(e) => {
                                warn!(queue = %queue, error = %e, "amqp: pipeline failed, NACK");
                                let _ = delivery
                                    .nack(BasicNackOptions {
                                        multiple: false,
                                        requeue: false,
                                    })
                                    .await;
                            }
                        }
                    }
                }
            }
        });
        *self.task.lock().await = Some(handle);
        Ok(())
    }

    async fn stop(&self) -> Result<()> {
        if let Some(h) = self.task.lock().await.take() {
            h.abort();
        }
        Ok(())
    }
}