docbox_core/notifications/
mod.rs1use crate::aws::SqsClient;
6mod mpsc;
7mod noop;
8pub mod process;
9mod sqs;
10
11pub use mpsc::MpscNotificationQueueSender;
12
13use serde::Deserialize;
14
15pub use sqs::parse_bucket_message;
17
18#[derive(Debug, Clone, Deserialize)]
19#[serde(tag = "provider", rename_all = "snake_case")]
20pub enum NotificationConfig {
21 Sqs { queue_url: String },
22 Noop,
23 Mpsc,
24}
25
26impl NotificationConfig {
27 pub fn from_env() -> anyhow::Result<Self> {
28 Ok(
29 match (
30 std::env::var("DOCBOX_MPSC_QUEUE"),
31 std::env::var("DOCBOX_SQS_URL"),
32 ) {
33 (Ok(_), _) => NotificationConfig::Mpsc,
34 (_, Ok(queue_url)) => NotificationConfig::Sqs { queue_url },
35 _ => NotificationConfig::Noop,
36 },
37 )
38 }
39}
40
41pub enum AppNotificationQueue {
42 Sqs(sqs::SqsNotificationQueue),
43 Noop(noop::NoopNotificationQueue),
44 Mpsc(mpsc::MpscNotificationQueue),
45}
46
47impl AppNotificationQueue {
48 pub fn from_config(sqs_client: SqsClient, config: NotificationConfig) -> anyhow::Result<Self> {
49 match config {
50 NotificationConfig::Sqs { queue_url } => {
51 tracing::debug!(%queue_url, "using SQS notification queue");
52 Ok(AppNotificationQueue::Sqs(
53 sqs::SqsNotificationQueue::create(sqs_client, queue_url),
54 ))
55 }
56 NotificationConfig::Noop => {
57 tracing::warn!("queue not specified, falling back to no-op queue");
58 Ok(AppNotificationQueue::Noop(noop::NoopNotificationQueue))
59 }
60 NotificationConfig::Mpsc => {
61 tracing::debug!("DOCBOX_MPSC_QUEUE is set using local webhook notification queue");
62 Ok(AppNotificationQueue::Mpsc(
63 mpsc::MpscNotificationQueue::create(),
64 ))
65 }
66 }
67 }
68
69 pub async fn next_message(&mut self) -> Option<NotificationQueueMessage> {
70 match self {
71 AppNotificationQueue::Sqs(queue) => queue.next_message().await,
72 AppNotificationQueue::Noop(queue) => queue.next_message().await,
73 AppNotificationQueue::Mpsc(queue) => queue.next_message().await,
74 }
75 }
76}
77
78pub enum NotificationQueueMessage {
80 FileCreated {
81 bucket_name: String,
82 object_key: String,
83 },
84}
85
86pub(crate) trait NotificationQueue: Send + Sync + 'static {
87 async fn next_message(&mut self) -> Option<NotificationQueueMessage>;
89}