sockudo-queue 4.5.1

Queue manager implementations for Sockudo
Documentation
use crate::ArcJobProcessorFn;
use async_trait::async_trait;
use futures_util::StreamExt;
use lapin::options::{
    BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions,
};
use lapin::types::FieldTable;
use lapin::{BasicProperties, Channel, Connection, ConnectionProperties};
use sockudo_core::error::{Error, Result};
use sockudo_core::options::RabbitMqAdapterConfig;
use sockudo_core::queue::QueueInterface;
use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Notify;
use tracing::{error, info, warn};

pub struct RabbitMqQueueManager {
    connection: Arc<Connection>,
    publish_channel: Channel,
    prefix: String,
    shutdown: Arc<Notify>,
    running: Arc<AtomicBool>,
}

impl RabbitMqQueueManager {
    pub async fn new(config: RabbitMqAdapterConfig) -> Result<Self> {
        let connection = Connection::connect(&config.url, ConnectionProperties::default())
            .await
            .map_err(|e| Error::Queue(format!("Failed to connect to RabbitMQ: {e}")))?;
        let connection = Arc::new(connection);
        let publish_channel = connection
            .create_channel()
            .await
            .map_err(|e| Error::Queue(format!("Failed to create RabbitMQ publish channel: {e}")))?;

        Ok(Self {
            connection,
            publish_channel,
            prefix: config.prefix,
            shutdown: Arc::new(Notify::new()),
            running: Arc::new(AtomicBool::new(true)),
        })
    }

    fn queue_name(&self, queue_name: &str) -> String {
        format!("{}.queue.{}", self.prefix, queue_name)
    }

    async fn ensure_queue(&self, channel: &Channel, queue_name: &str) -> Result<()> {
        channel
            .queue_declare(
                queue_name.into(),
                QueueDeclareOptions {
                    durable: true,
                    ..Default::default()
                },
                FieldTable::default(),
            )
            .await
            .map_err(|e| {
                Error::Queue(format!(
                    "Failed to declare RabbitMQ queue {queue_name}: {e}"
                ))
            })?;
        Ok(())
    }
}

#[async_trait]
impl QueueInterface for RabbitMqQueueManager {
    async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
        let queue_name = self.queue_name(queue_name);
        self.ensure_queue(&self.publish_channel, &queue_name)
            .await?;
        let payload = sonic_rs::to_vec(&data)
            .map_err(|e| Error::Queue(format!("Failed to serialize RabbitMQ job: {e}")))?;

        self.publish_channel
            .basic_publish(
                "".into(),
                queue_name.as_str().into(),
                BasicPublishOptions::default(),
                &payload,
                BasicProperties::default().with_delivery_mode(2),
            )
            .await
            .map_err(|e| Error::Queue(format!("Failed to publish RabbitMQ job: {e}")))?
            .await
            .map_err(|e| Error::Queue(format!("RabbitMQ publish confirmation failed: {e}")))?;

        Ok(())
    }

    async fn process_queue(&self, queue_name: &str, callback: JobProcessorFnAsync) -> Result<()> {
        let queue_name = self.queue_name(queue_name);
        let consumer_channel = self.connection.create_channel().await.map_err(|e| {
            Error::Queue(format!("Failed to create RabbitMQ consumer channel: {e}"))
        })?;
        self.ensure_queue(&consumer_channel, &queue_name).await?;

        let mut consumer = consumer_channel
            .basic_consume(
                queue_name.as_str().into(),
                format!("sockudo-queue-{}", uuid::Uuid::new_v4()).into(),
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await
            .map_err(|e| Error::Queue(format!("Failed to start RabbitMQ consumer: {e}")))?;

        let callback: ArcJobProcessorFn = Arc::from(callback);
        let shutdown = self.shutdown.clone();
        let running = self.running.clone();

        tokio::spawn(async move {
            loop {
                if !running.load(Ordering::Relaxed) {
                    break;
                }
                let delivery = tokio::select! {
                    _ = shutdown.notified() => break,
                    delivery = consumer.next() => delivery,
                };
                let Some(delivery) = delivery else {
                    break;
                };
                match delivery {
                    Ok(delivery) => match sonic_rs::from_slice::<JobData>(&delivery.data) {
                        Ok(job) => {
                            if callback(job).await.is_ok() {
                                if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
                                    error!("Failed to ack RabbitMQ queue delivery: {}", e);
                                }
                            } else {
                                warn!(
                                    "RabbitMQ queue job failed; leaving delivery unacked for broker retry"
                                );
                            }
                        }
                        Err(e) => {
                            error!("Failed to deserialize RabbitMQ queue job: {}", e);
                            let _ = delivery.ack(BasicAckOptions::default()).await;
                        }
                    },
                    Err(e) => {
                        error!("RabbitMQ queue consumer error: {}", e);
                        break;
                    }
                }
            }
            info!("RabbitMQ queue consumer stopped");
        });

        Ok(())
    }

    async fn disconnect(&self) -> Result<()> {
        self.running.store(false, Ordering::Relaxed);
        self.shutdown.notify_waiters();
        Ok(())
    }

    async fn check_health(&self) -> Result<()> {
        if self.connection.status().connected() {
            Ok(())
        } else {
            Err(Error::Queue(
                "RabbitMQ queue connection is not connected".to_string(),
            ))
        }
    }
}