sockudo-queue 4.6.0

Queue manager implementations for Sockudo
Documentation
use crate::ArcJobProcessorFn;
use async_nats::jetstream;
use async_nats::jetstream::consumer::pull;
use async_nats::jetstream::stream;
use async_trait::async_trait;
use futures_util::TryStreamExt;
use sockudo_core::error::{Error, Result};
use sockudo_core::options::NatsAdapterConfig;
use sockudo_core::queue::QueueInterface;
use sockudo_core::webhook_types::{JobData, JobProcessorFnAsync};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Notify;
use tracing::{error, info, warn};

pub struct NatsJetStreamQueueManager {
    client: async_nats::Client,
    jetstream: jetstream::Context,
    prefix: String,
    shutdown: Arc<Notify>,
    running: Arc<AtomicBool>,
}

impl NatsJetStreamQueueManager {
    pub async fn new(config: NatsAdapterConfig) -> Result<Self> {
        let mut options = async_nats::ConnectOptions::new();
        if let (Some(username), Some(password)) =
            (config.username.as_deref(), config.password.as_deref())
        {
            options = options.user_and_password(username.to_string(), password.to_string());
        } else if let Some(token) = config.token.as_deref() {
            options = options.token(token.to_string());
        }
        options = options.connection_timeout(std::time::Duration::from_millis(
            config.connection_timeout_ms,
        ));
        let client = options
            .connect(&config.servers)
            .await
            .map_err(|e| Error::Queue(format!("Failed to connect to NATS JetStream: {e}")))?;
        let jetstream = jetstream::new(client.clone());

        Ok(Self {
            client,
            jetstream,
            prefix: config.prefix,
            shutdown: Arc::new(Notify::new()),
            running: Arc::new(AtomicBool::new(true)),
        })
    }

    fn normalize(value: &str) -> String {
        value
            .chars()
            .map(|c| {
                if c.is_ascii_alphanumeric() || matches!(c, '_' | '-') {
                    c.to_ascii_lowercase()
                } else {
                    '-'
                }
            })
            .collect()
    }

    fn stream_name(&self, queue_name: &str) -> String {
        format!(
            "{}_q_{}",
            Self::normalize(&self.prefix),
            Self::normalize(queue_name)
        )
    }

    fn subject_name(&self, queue_name: &str) -> String {
        format!("{}.queue.{}", self.prefix, queue_name)
    }

    async fn ensure_stream(&self, queue_name: &str) -> Result<stream::Stream> {
        let stream_name = self.stream_name(queue_name);
        let subject = self.subject_name(queue_name);
        self.jetstream
            .get_or_create_stream(stream::Config {
                name: stream_name,
                subjects: vec![subject],
                max_messages: 100_000,
                ..Default::default()
            })
            .await
            .map_err(|e| Error::Queue(format!("Failed to ensure NATS JetStream queue stream: {e}")))
    }
}

#[async_trait]
impl QueueInterface for NatsJetStreamQueueManager {
    async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
        let _stream = self.ensure_stream(queue_name).await?;
        let subject = self.subject_name(queue_name);
        let payload = sonic_rs::to_vec(&data)
            .map_err(|e| Error::Queue(format!("Failed to serialize NATS queue job: {e}")))?;
        self.jetstream
            .publish(subject, payload.into())
            .await
            .map_err(|e| Error::Queue(format!("Failed to publish NATS queue job: {e}")))?
            .await
            .map_err(|e| Error::Queue(format!("Failed to await NATS queue publish ack: {e}")))?;
        Ok(())
    }

    async fn process_queue(&self, queue_name: &str, callback: JobProcessorFnAsync) -> Result<()> {
        let stream = self.ensure_stream(queue_name).await?;
        let consumer_name = format!("{}_consumer", self.stream_name(queue_name));
        let consumer = stream
            .get_or_create_consumer(
                consumer_name.as_str(),
                pull::Config {
                    durable_name: Some(consumer_name.clone()),
                    ack_policy: jetstream::consumer::AckPolicy::Explicit,
                    ack_wait: std::time::Duration::from_secs(30),
                    max_deliver: 10,
                    ..Default::default()
                },
            )
            .await
            .map_err(|e| Error::Queue(format!("Failed to create NATS queue consumer: {e}")))?;

        let callback: ArcJobProcessorFn = Arc::from(callback);
        let shutdown = self.shutdown.clone();
        let running = self.running.clone();

        tokio::spawn(async move {
            loop {
                if !running.load(Ordering::Relaxed) {
                    break;
                }
                let messages = match consumer.messages().await {
                    Ok(messages) => messages,
                    Err(e) => {
                        error!("Failed to open NATS JetStream message stream: {}", e);
                        break;
                    }
                };
                tokio::pin!(messages);
                loop {
                    if !running.load(Ordering::Relaxed) {
                        break;
                    }
                    let message = tokio::select! {
                        _ = shutdown.notified() => break,
                        message = messages.try_next() => message,
                    };
                    match message {
                        Ok(Some(message)) => {
                            match sonic_rs::from_slice::<JobData>(&message.payload) {
                                Ok(job) => {
                                    if callback(job).await.is_ok() {
                                        if let Err(e) = message.ack().await {
                                            error!("Failed to ack NATS queue job: {}", e);
                                        }
                                    } else {
                                        warn!(
                                            "NATS queue job failed; message left unacked for redelivery"
                                        );
                                    }
                                }
                                Err(e) => {
                                    error!("Failed to deserialize NATS queue job: {}", e);
                                    let _ = message.ack().await;
                                }
                            }
                        }
                        Ok(None) => break,
                        Err(e) => {
                            error!("NATS queue consumer error: {}", e);
                            break;
                        }
                    }
                }
            }
            info!("NATS JetStream queue consumer stopped");
        });

        Ok(())
    }

    async fn disconnect(&self) -> Result<()> {
        self.running.store(false, Ordering::Relaxed);
        self.shutdown.notify_waiters();
        self.client
            .drain()
            .await
            .map_err(|e| Error::Queue(format!("Failed to drain NATS queue client: {e}")))?;
        Ok(())
    }

    async fn check_health(&self) -> Result<()> {
        self.jetstream
            .query_account()
            .await
            .map(|_| ())
            .map_err(|e| Error::Queue(format!("NATS JetStream queue health check failed: {e}")))
    }
}