docbox_core/notifications/
process.rs1use super::{AppNotificationQueue, NotificationQueueMessage};
6use crate::{
7 events::EventPublisherFactory,
8 files::upload_file_presigned::{CompletePresigned, safe_complete_presigned},
9 tenant::tenant_options_ext::TenantOptionsExt,
10};
11use docbox_database::{
12 DatabasePoolCache,
13 models::{folder::Folder, presigned_upload_task::PresignedUploadTask, tenant::Tenant},
14};
15use docbox_processing::ProcessingLayer;
16use docbox_search::SearchIndexFactory;
17use docbox_storage::StorageLayerFactory;
18use std::sync::Arc;
19use tracing::Instrument;
20
21#[derive(Clone)]
22pub struct NotificationQueueData {
23 pub db_cache: Arc<DatabasePoolCache>,
24 pub search: SearchIndexFactory,
25 pub storage: StorageLayerFactory,
26 pub events: EventPublisherFactory,
27 pub processing: ProcessingLayer,
28}
29
30pub async fn process_notification_queue(
33 mut notification_queue: AppNotificationQueue,
34 data: NotificationQueueData,
35) {
36 while let Some(message) = notification_queue.next_message().await {
38 match message {
39 NotificationQueueMessage::FileCreated {
40 bucket_name,
41 object_key,
42 } => {
43 tokio::spawn(handle_file_uploaded(data.clone(), bucket_name, object_key));
44 }
45 }
46 }
47}
48
49#[tracing::instrument(skip(data))]
51pub async fn handle_file_uploaded(
52 data: NotificationQueueData,
53 bucket_name: String,
54 object_key: String,
55) {
56 let tenant = {
57 let db = match data.db_cache.get_root_pool().await {
58 Ok(value) => value,
59 Err(error) => {
60 tracing::error!(?error, "failed to acquire root database pool");
61 return;
62 }
63 };
64
65 match Tenant::find_by_bucket(&db, &bucket_name).await {
66 Ok(Some(value)) => value,
67 Ok(None) => {
68 tracing::warn!(
69 "file was uploaded into a bucket sqs is listening to but there was no matching tenant"
70 );
71 return;
72 }
73 Err(error) => {
74 tracing::error!(?error, "failed to query tenant for bucket");
75 return;
76 }
77 }
78 };
79
80 let span = tracing::info_span!("tenant", tenant_id = %tenant.id, tenant_env = %tenant.env);
82
83 handle_file_uploaded_tenant(tenant, data, bucket_name, object_key)
84 .instrument(span)
85 .await;
86}
87
88#[tracing::instrument(skip(data))]
90pub async fn handle_file_uploaded_tenant(
91 tenant: Tenant,
92 data: NotificationQueueData,
93 bucket_name: String,
94 object_key: String,
95) {
96 let object_key = match urlencoding::decode(&object_key) {
97 Ok(value) => value.to_string(),
98 Err(error) => {
99 tracing::warn!(
100 ?error,
101 "file was uploaded into a bucket but had an invalid file name"
102 );
103 return;
104 }
105 };
106
107 let db = match data.db_cache.get_tenant_pool(&tenant).await {
108 Ok(value) => value,
109 Err(error) => {
110 tracing::error!(?error, "failed to get tenant database pool");
111 return;
112 }
113 };
114
115 let task = match PresignedUploadTask::find_by_file_key(&db, &object_key).await {
117 Ok(Some(task)) => task,
118 Ok(None) => {
121 tracing::debug!("uploaded file was not a presigned upload");
122 return;
123 }
124 Err(error) => {
125 tracing::error!(?error, "unable to query presigned upload");
126 return;
127 }
128 };
129
130 let scope = task.document_box.clone();
131
132 let folder = match Folder::find_by_id(&db, &scope, task.folder_id).await {
134 Ok(Some(value)) => value,
135 Ok(None) => {
136 tracing::error!("presigned upload folder no longer exists");
137 return;
138 }
139 Err(error) => {
140 tracing::error!(?error, "unable to query folder");
141 return;
142 }
143 };
144
145 let complete = CompletePresigned { task, folder };
147
148 let search = data.search.create_search_index(&tenant);
149 let storage = data.storage.create_layer(tenant.storage_layer_options());
150 let events = data.events.create_event_publisher(&tenant);
151
152 if let Err(error) =
154 safe_complete_presigned(db, search, storage, events, data.processing, complete).await
155 {
156 tracing::error!(?error, "failed to complete presigned file upload");
157 }
158}