docbox_core/notifications/
mod.rs1mod mpsc;
6mod noop;
7pub mod process;
8mod sqs;
9
10use crate::aws::SqsClient;
11pub use mpsc::MpscNotificationQueueSender;
12
13use serde::Deserialize;
14
15pub use sqs::parse_bucket_message;
17
18#[derive(Debug, Clone, Deserialize)]
19#[serde(tag = "provider", rename_all = "snake_case")]
20pub enum NotificationConfig {
21 Sqs { queue_url: String },
22 Noop,
23 Mpsc,
24}
25
26impl NotificationConfig {
27 pub fn from_env() -> Self {
28 match (
29 std::env::var("DOCBOX_MPSC_QUEUE"),
30 std::env::var("DOCBOX_SQS_URL"),
31 ) {
32 (Ok(_), _) => NotificationConfig::Mpsc,
33 (_, Ok(queue_url)) => NotificationConfig::Sqs { queue_url },
34 _ => NotificationConfig::Noop,
35 }
36 }
37}
38
39pub enum AppNotificationQueue {
40 Sqs(sqs::SqsNotificationQueue),
41 Noop(noop::NoopNotificationQueue),
42 Mpsc(mpsc::MpscNotificationQueue),
43}
44
45impl AppNotificationQueue {
46 pub fn from_config(sqs_client: SqsClient, config: NotificationConfig) -> Self {
47 match config {
48 NotificationConfig::Sqs { queue_url } => {
49 tracing::debug!(%queue_url, "using SQS notification queue");
50 AppNotificationQueue::Sqs(sqs::SqsNotificationQueue::create(sqs_client, queue_url))
51 }
52 NotificationConfig::Noop => {
53 tracing::warn!("queue not specified, falling back to no-op queue");
54 AppNotificationQueue::Noop(noop::NoopNotificationQueue)
55 }
56 NotificationConfig::Mpsc => {
57 tracing::debug!("DOCBOX_MPSC_QUEUE is set using local webhook notification queue");
58 AppNotificationQueue::Mpsc(mpsc::MpscNotificationQueue::create())
59 }
60 }
61 }
62
63 pub async fn next_message(&mut self) -> Option<NotificationQueueMessage> {
64 match self {
65 AppNotificationQueue::Sqs(queue) => queue.next_message().await,
66 AppNotificationQueue::Noop(queue) => queue.next_message().await,
67 AppNotificationQueue::Mpsc(queue) => queue.next_message().await,
68 }
69 }
70}
71
72pub enum NotificationQueueMessage {
74 FileCreated {
75 bucket_name: String,
76 object_key: String,
77 },
78}
79
80pub(crate) trait NotificationQueue: Send + Sync + 'static {
81 async fn next_message(&mut self) -> Option<NotificationQueueMessage>;
83}