docbox_core/notifications/
mod.rs

1//! # Notifications
2//!
3//! Notification queue system that handles notifications for the app
4
5use crate::aws::SqsClient;
6mod mpsc;
7mod noop;
8pub mod process;
9mod sqs;
10
11pub use mpsc::MpscNotificationQueueSender;
12
13use serde::Deserialize;
14
15// Pretty common utility function
16pub use sqs::parse_bucket_message;
17
18#[derive(Debug, Clone, Deserialize)]
19#[serde(tag = "provider", rename_all = "snake_case")]
20pub enum NotificationConfig {
21    Sqs { queue_url: String },
22    Noop,
23    Mpsc,
24}
25
26impl NotificationConfig {
27    pub fn from_env() -> anyhow::Result<Self> {
28        Ok(
29            match (
30                std::env::var("DOCBOX_MPSC_QUEUE"),
31                std::env::var("DOCBOX_SQS_URL"),
32            ) {
33                (Ok(_), _) => NotificationConfig::Mpsc,
34                (_, Ok(queue_url)) => NotificationConfig::Sqs { queue_url },
35                _ => NotificationConfig::Noop,
36            },
37        )
38    }
39}
40
41pub enum AppNotificationQueue {
42    Sqs(sqs::SqsNotificationQueue),
43    Noop(noop::NoopNotificationQueue),
44    Mpsc(mpsc::MpscNotificationQueue),
45}
46
47impl AppNotificationQueue {
48    pub fn from_config(sqs_client: SqsClient, config: NotificationConfig) -> anyhow::Result<Self> {
49        match config {
50            NotificationConfig::Sqs { queue_url } => {
51                tracing::debug!(%queue_url, "using SQS notification queue");
52                Ok(AppNotificationQueue::Sqs(
53                    sqs::SqsNotificationQueue::create(sqs_client, queue_url),
54                ))
55            }
56            NotificationConfig::Noop => {
57                tracing::warn!("queue not specified, falling back to no-op queue");
58                Ok(AppNotificationQueue::Noop(noop::NoopNotificationQueue))
59            }
60            NotificationConfig::Mpsc => {
61                tracing::debug!("DOCBOX_MPSC_QUEUE is set using local webhook notification queue");
62                Ok(AppNotificationQueue::Mpsc(
63                    mpsc::MpscNotificationQueue::create(),
64                ))
65            }
66        }
67    }
68
69    pub async fn next_message(&mut self) -> Option<NotificationQueueMessage> {
70        match self {
71            AppNotificationQueue::Sqs(queue) => queue.next_message().await,
72            AppNotificationQueue::Noop(queue) => queue.next_message().await,
73            AppNotificationQueue::Mpsc(queue) => queue.next_message().await,
74        }
75    }
76}
77
/// Type of message from the notification queue
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so messages can be logged,
/// duplicated and compared (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NotificationQueueMessage {
    /// Notification that a file was created
    FileCreated {
        /// Name of the bucket associated with the event
        bucket_name: String,
        /// Key of the object associated with the event
        object_key: String,
    },
}
85
86pub(crate) trait NotificationQueue: Send + Sync + 'static {
87    /// Request the next message from the notification queue
88    async fn next_message(&mut self) -> Option<NotificationQueueMessage>;
89}