Skip to main content

docbox_core/notifications/
mod.rs

1//! # Notifications
2//!
3//! Notifications queue system handling notifications for the app
4
5mod mpsc;
6mod noop;
7pub mod process;
8mod sqs;
9
10use crate::aws::SqsClient;
11pub use mpsc::MpscNotificationQueueSender;
12
13use serde::Deserialize;
14
15// Pretty common utility function
16pub use sqs::parse_bucket_message;
17
18#[derive(Debug, Clone, Deserialize)]
19#[serde(tag = "provider", rename_all = "snake_case")]
20pub enum NotificationConfig {
21    Sqs { queue_url: String },
22    Noop,
23    Mpsc,
24}
25
26impl NotificationConfig {
27    pub fn from_env() -> Self {
28        match (
29            std::env::var("DOCBOX_MPSC_QUEUE"),
30            std::env::var("DOCBOX_SQS_URL"),
31        ) {
32            (Ok(_), _) => NotificationConfig::Mpsc,
33            (_, Ok(queue_url)) => NotificationConfig::Sqs { queue_url },
34            _ => NotificationConfig::Noop,
35        }
36    }
37}
38
39pub enum AppNotificationQueue {
40    Sqs(sqs::SqsNotificationQueue),
41    Noop(noop::NoopNotificationQueue),
42    Mpsc(mpsc::MpscNotificationQueue),
43}
44
45impl AppNotificationQueue {
46    pub fn from_config(sqs_client: SqsClient, config: NotificationConfig) -> Self {
47        match config {
48            NotificationConfig::Sqs { queue_url } => {
49                tracing::debug!(%queue_url, "using SQS notification queue");
50                AppNotificationQueue::Sqs(sqs::SqsNotificationQueue::create(sqs_client, queue_url))
51            }
52            NotificationConfig::Noop => {
53                tracing::warn!("queue not specified, falling back to no-op queue");
54                AppNotificationQueue::Noop(noop::NoopNotificationQueue)
55            }
56            NotificationConfig::Mpsc => {
57                tracing::debug!("DOCBOX_MPSC_QUEUE is set using local webhook notification queue");
58                AppNotificationQueue::Mpsc(mpsc::MpscNotificationQueue::create())
59            }
60        }
61    }
62
63    pub async fn next_message(&mut self) -> Option<NotificationQueueMessage> {
64        match self {
65            AppNotificationQueue::Sqs(queue) => queue.next_message().await,
66            AppNotificationQueue::Noop(queue) => queue.next_message().await,
67            AppNotificationQueue::Mpsc(queue) => queue.next_message().await,
68        }
69    }
70}
71
/// Type of message from the notification queue
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so messages can be logged,
/// duplicated and compared (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NotificationQueueMessage {
    /// A file-created notification.
    FileCreated {
        /// Bucket name carried by the notification.
        bucket_name: String,
        /// Object key carried by the notification.
        object_key: String,
    },
}
79
80pub(crate) trait NotificationQueue: Send + Sync + 'static {
81    /// Request the next message from the notification queue
82    async fn next_message(&mut self) -> Option<NotificationQueueMessage>;
83}