docbox_core/notifications/
process.rs1use crate::notifications::{AppNotificationQueue, NotificationQueueMessage};
6use crate::processing::ProcessingLayer;
7use crate::secrets::AppSecretManager;
8use crate::storage::StorageLayerFactory;
9use crate::{
10 events::EventPublisherFactory,
11 files::upload_file_presigned::{CompletePresigned, safe_complete_presigned},
12};
13use docbox_database::{
14 DatabasePoolCache,
15 models::{folder::Folder, presigned_upload_task::PresignedUploadTask, tenant::Tenant},
16};
17use docbox_search::SearchIndexFactory;
18use std::sync::Arc;
19use tracing::Instrument;
20
21#[derive(Clone)]
22pub struct NotificationQueueData {
23 pub db_cache: Arc<DatabasePoolCache<AppSecretManager>>,
24 pub search: SearchIndexFactory,
25 pub storage: StorageLayerFactory,
26 pub events: EventPublisherFactory,
27 pub processing: ProcessingLayer,
28}
29
30pub async fn process_notification_queue(
33 mut notification_queue: AppNotificationQueue,
34 data: NotificationQueueData,
35) {
36 while let Some(message) = notification_queue.next_message().await {
38 match message {
39 NotificationQueueMessage::FileCreated {
40 bucket_name,
41 object_key,
42 } => {
43 tokio::spawn(safe_handle_file_uploaded(
44 data.clone(),
45 bucket_name,
46 object_key,
47 ));
48 }
49 }
50 }
51}
52
53pub async fn safe_handle_file_uploaded(
54 data: NotificationQueueData,
55 bucket_name: String,
56 object_key: String,
57) {
58 if let Err(cause) = handle_file_uploaded(data, bucket_name, object_key).await {
59 tracing::error!(?cause, "failed to handle sqs file upload");
60 }
61}
62
63pub async fn handle_file_uploaded(
64 data: NotificationQueueData,
65 bucket_name: String,
66 object_key: String,
67) -> anyhow::Result<()> {
68 let tenant = {
69 let db = data.db_cache.get_root_pool().await?;
70 match Tenant::find_by_bucket(&db, &bucket_name).await? {
71 Some(value) => value,
72 None => {
73 tracing::warn!(
74 ?bucket_name,
75 ?object_key,
76 "file was uploaded into a bucket sqs is listening to but there was no matching tenant"
77 );
78 return Ok(());
79 }
80 }
81 };
82
83 let span = tracing::info_span!("tenant", tenant_id = %tenant.id, tenant_env = %tenant.env);
85
86 handle_file_uploaded_tenant(tenant, data, bucket_name, object_key)
87 .instrument(span)
88 .await
89}
90
91pub async fn handle_file_uploaded_tenant(
92 tenant: Tenant,
93 data: NotificationQueueData,
94 bucket_name: String,
95 object_key: String,
96) -> anyhow::Result<()> {
97 let object_key = match urlencoding::decode(&object_key) {
98 Ok(value) => value.to_string(),
99 Err(err) => {
100 tracing::warn!(
101 ?err,
102 ?bucket_name,
103 ?object_key,
104 "file was uploaded into a bucket but had an invalid file name"
105 );
106 return Ok(());
107 }
108 };
109
110 let db = data.db_cache.get_tenant_pool(&tenant).await?;
111
112 let task = match PresignedUploadTask::find_by_file_key(&db, &object_key).await {
114 Ok(Some(task)) => task,
115 Ok(None) => {
116 tracing::debug!("uploaded file was not a presigned upload");
117 return Ok(());
118 }
119 Err(cause) => {
120 tracing::error!(?cause, "unable to query presigned upload");
121 anyhow::bail!("unable to query presigned upload");
122 }
123 };
124
125 let scope = task.document_box.clone();
126
127 let folder = match Folder::find_by_id(&db, &scope, task.folder_id).await {
129 Ok(Some(value)) => value,
130 Ok(None) => {
131 tracing::error!("presigned upload folder no longer exists");
132 anyhow::bail!("presigned upload folder no longer exists");
133 }
134 Err(cause) => {
135 tracing::error!(?cause, "unable to query folder");
136 anyhow::bail!("unable to query folder");
137 }
138 };
139
140 let complete = CompletePresigned { task, folder };
142
143 let search = data.search.create_search_index(&tenant);
144 let storage = data.storage.create_storage_layer(&tenant);
145 let events = data.events.create_event_publisher(&tenant);
146
147 safe_complete_presigned(db, search, storage, events, data.processing, complete).await?;
149
150 Ok(())
151}