docbox_core/notifications/
process.rs1use super::{AppNotificationQueue, NotificationQueueMessage};
6use crate::{
7 events::EventPublisherFactory,
8 files::upload_file_presigned::{CompletePresigned, safe_complete_presigned},
9};
10use docbox_database::{
11 DatabasePoolCache,
12 models::{folder::Folder, presigned_upload_task::PresignedUploadTask, tenant::Tenant},
13};
14use docbox_processing::ProcessingLayer;
15use docbox_search::SearchIndexFactory;
16use docbox_storage::StorageLayerFactory;
17use std::sync::Arc;
18use tracing::Instrument;
19
20#[derive(Clone)]
21pub struct NotificationQueueData {
22 pub db_cache: Arc<DatabasePoolCache>,
23 pub search: SearchIndexFactory,
24 pub storage: StorageLayerFactory,
25 pub events: EventPublisherFactory,
26 pub processing: ProcessingLayer,
27}
28
29pub async fn process_notification_queue(
32 mut notification_queue: AppNotificationQueue,
33 data: NotificationQueueData,
34) {
35 while let Some(message) = notification_queue.next_message().await {
37 match message {
38 NotificationQueueMessage::FileCreated {
39 bucket_name,
40 object_key,
41 } => {
42 tokio::spawn(handle_file_uploaded(data.clone(), bucket_name, object_key));
43 }
44 }
45 }
46}
47
48#[tracing::instrument(skip(data))]
50pub async fn handle_file_uploaded(
51 data: NotificationQueueData,
52 bucket_name: String,
53 object_key: String,
54) {
55 let tenant = {
56 let db = match data.db_cache.get_root_pool().await {
57 Ok(value) => value,
58 Err(error) => {
59 tracing::error!(?error, "failed to acquire root database pool");
60 return;
61 }
62 };
63
64 match Tenant::find_by_bucket(&db, &bucket_name).await {
65 Ok(Some(value)) => value,
66 Ok(None) => {
67 tracing::warn!(
68 "file was uploaded into a bucket sqs is listening to but there was no matching tenant"
69 );
70 return;
71 }
72 Err(error) => {
73 tracing::error!(?error, "failed to query tenant for bucket");
74 return;
75 }
76 }
77 };
78
79 let span = tracing::info_span!("tenant", tenant_id = %tenant.id, tenant_env = %tenant.env);
81
82 handle_file_uploaded_tenant(tenant, data, bucket_name, object_key)
83 .instrument(span)
84 .await;
85}
86
87#[tracing::instrument(skip(data))]
89pub async fn handle_file_uploaded_tenant(
90 tenant: Tenant,
91 data: NotificationQueueData,
92 bucket_name: String,
93 object_key: String,
94) {
95 let object_key = match urlencoding::decode(&object_key) {
96 Ok(value) => value.to_string(),
97 Err(error) => {
98 tracing::warn!(
99 ?error,
100 "file was uploaded into a bucket but had an invalid file name"
101 );
102 return;
103 }
104 };
105
106 let db = match data.db_cache.get_tenant_pool(&tenant).await {
107 Ok(value) => value,
108 Err(error) => {
109 tracing::error!(?error, "failed to get tenant database pool");
110 return;
111 }
112 };
113
114 let task = match PresignedUploadTask::find_by_file_key(&db, &object_key).await {
116 Ok(Some(task)) => task,
117 Ok(None) => {
120 tracing::debug!("uploaded file was not a presigned upload");
121 return;
122 }
123 Err(error) => {
124 tracing::error!(?error, "unable to query presigned upload");
125 return;
126 }
127 };
128
129 let scope = task.document_box.clone();
130
131 let folder = match Folder::find_by_id(&db, &scope, task.folder_id).await {
133 Ok(Some(value)) => value,
134 Ok(None) => {
135 tracing::error!("presigned upload folder no longer exists");
136 return;
137 }
138 Err(error) => {
139 tracing::error!(?error, "unable to query folder");
140 return;
141 }
142 };
143
144 let complete = CompletePresigned { task, folder };
146
147 let search = data.search.create_search_index(&tenant);
148 let storage = data.storage.create_storage_layer(&tenant);
149 let events = data.events.create_event_publisher(&tenant);
150
151 if let Err(error) =
153 safe_complete_presigned(db, search, storage, events, data.processing, complete).await
154 {
155 tracing::error!(?error, "failed to complete presigned file upload");
156 }
157}