docbox_core/notifications/
process.rs1use crate::notifications::{AppNotificationQueue, NotificationQueueMessage};
6use crate::processing::ProcessingLayer;
7use crate::{
8 events::EventPublisherFactory,
9 files::upload_file_presigned::{CompletePresigned, safe_complete_presigned},
10};
11use docbox_database::{
12 DatabasePoolCache,
13 models::{folder::Folder, presigned_upload_task::PresignedUploadTask, tenant::Tenant},
14};
15use docbox_search::SearchIndexFactory;
16use docbox_storage::StorageLayerFactory;
17use std::sync::Arc;
18use tracing::Instrument;
19
20#[derive(Clone)]
21pub struct NotificationQueueData {
22 pub db_cache: Arc<DatabasePoolCache>,
23 pub search: SearchIndexFactory,
24 pub storage: StorageLayerFactory,
25 pub events: EventPublisherFactory,
26 pub processing: ProcessingLayer,
27}
28
29pub async fn process_notification_queue(
32 mut notification_queue: AppNotificationQueue,
33 data: NotificationQueueData,
34) {
35 while let Some(message) = notification_queue.next_message().await {
37 match message {
38 NotificationQueueMessage::FileCreated {
39 bucket_name,
40 object_key,
41 } => {
42 tokio::spawn(safe_handle_file_uploaded(
43 data.clone(),
44 bucket_name,
45 object_key,
46 ));
47 }
48 }
49 }
50}
51
52pub async fn safe_handle_file_uploaded(
53 data: NotificationQueueData,
54 bucket_name: String,
55 object_key: String,
56) {
57 if let Err(cause) = handle_file_uploaded(data, bucket_name, object_key).await {
58 tracing::error!(?cause, "failed to handle sqs file upload");
59 }
60}
61
62pub async fn handle_file_uploaded(
63 data: NotificationQueueData,
64 bucket_name: String,
65 object_key: String,
66) -> anyhow::Result<()> {
67 let tenant = {
68 let db = data.db_cache.get_root_pool().await?;
69 match Tenant::find_by_bucket(&db, &bucket_name).await? {
70 Some(value) => value,
71 None => {
72 tracing::warn!(
73 ?bucket_name,
74 ?object_key,
75 "file was uploaded into a bucket sqs is listening to but there was no matching tenant"
76 );
77 return Ok(());
78 }
79 }
80 };
81
82 let span = tracing::info_span!("tenant", tenant_id = %tenant.id, tenant_env = %tenant.env);
84
85 handle_file_uploaded_tenant(tenant, data, bucket_name, object_key)
86 .instrument(span)
87 .await
88}
89
90pub async fn handle_file_uploaded_tenant(
91 tenant: Tenant,
92 data: NotificationQueueData,
93 bucket_name: String,
94 object_key: String,
95) -> anyhow::Result<()> {
96 let object_key = match urlencoding::decode(&object_key) {
97 Ok(value) => value.to_string(),
98 Err(err) => {
99 tracing::warn!(
100 ?err,
101 ?bucket_name,
102 ?object_key,
103 "file was uploaded into a bucket but had an invalid file name"
104 );
105 return Ok(());
106 }
107 };
108
109 let db = data.db_cache.get_tenant_pool(&tenant).await?;
110
111 let task = match PresignedUploadTask::find_by_file_key(&db, &object_key).await {
113 Ok(Some(task)) => task,
114 Ok(None) => {
115 tracing::debug!("uploaded file was not a presigned upload");
116 return Ok(());
117 }
118 Err(cause) => {
119 tracing::error!(?cause, "unable to query presigned upload");
120 anyhow::bail!("unable to query presigned upload");
121 }
122 };
123
124 let scope = task.document_box.clone();
125
126 let folder = match Folder::find_by_id(&db, &scope, task.folder_id).await {
128 Ok(Some(value)) => value,
129 Ok(None) => {
130 tracing::error!("presigned upload folder no longer exists");
131 anyhow::bail!("presigned upload folder no longer exists");
132 }
133 Err(cause) => {
134 tracing::error!(?cause, "unable to query folder");
135 anyhow::bail!("unable to query folder");
136 }
137 };
138
139 let complete = CompletePresigned { task, folder };
141
142 let search = data.search.create_search_index(&tenant);
143 let storage = data.storage.create_storage_layer(&tenant);
144 let events = data.events.create_event_publisher(&tenant);
145
146 safe_complete_presigned(db, search, storage, events, data.processing, complete).await?;
148
149 Ok(())
150}