Skip to main content

docbox_core/notifications/
process.rs

1//! # Processing
2//!
3//! Logic for processing notifications from the notification queue
4
5use super::{AppNotificationQueue, NotificationQueueMessage};
6use crate::{
7    events::EventPublisherFactory,
8    files::upload_file_presigned::{CompletePresigned, safe_complete_presigned},
9    tenant::tenant_options_ext::TenantOptionsExt,
10};
11use docbox_database::{
12    DatabasePoolCache,
13    models::{folder::Folder, presigned_upload_task::PresignedUploadTask, tenant::Tenant},
14};
15use docbox_processing::ProcessingLayer;
16use docbox_search::SearchIndexFactory;
17use docbox_storage::StorageLayerFactory;
18use std::sync::Arc;
19use tracing::Instrument;
20
21#[derive(Clone)]
22pub struct NotificationQueueData {
23    pub db_cache: Arc<DatabasePoolCache>,
24    pub search: SearchIndexFactory,
25    pub storage: StorageLayerFactory,
26    pub events: EventPublisherFactory,
27    pub processing: ProcessingLayer,
28}
29
30/// Processes events coming from the notification queue. This will be
31/// things like successful file uploads that need to be processed
32pub async fn process_notification_queue(
33    mut notification_queue: AppNotificationQueue,
34    data: NotificationQueueData,
35) {
36    // Process messages from the notification queue
37    while let Some(message) = notification_queue.next_message().await {
38        match message {
39            NotificationQueueMessage::FileCreated {
40                bucket_name,
41                object_key,
42            } => {
43                tokio::spawn(handle_file_uploaded(data.clone(), bucket_name, object_key));
44            }
45        }
46    }
47}
48
49/// Handle file upload notifications
50#[tracing::instrument(skip(data))]
51pub async fn handle_file_uploaded(
52    data: NotificationQueueData,
53    bucket_name: String,
54    object_key: String,
55) {
56    let tenant = {
57        let db = match data.db_cache.get_root_pool().await {
58            Ok(value) => value,
59            Err(error) => {
60                tracing::error!(?error, "failed to acquire root database pool");
61                return;
62            }
63        };
64
65        match Tenant::find_by_bucket(&db, &bucket_name).await {
66            Ok(Some(value)) => value,
67            Ok(None) => {
68                tracing::warn!(
69                    "file was uploaded into a bucket sqs is listening to but there was no matching tenant"
70                );
71                return;
72            }
73            Err(error) => {
74                tracing::error!(?error, "failed to query tenant for bucket");
75                return;
76            }
77        }
78    };
79
80    // Provide a span that contains the tenant metadata
81    let span = tracing::info_span!("tenant", tenant_id = %tenant.id, tenant_env = %tenant.env);
82
83    handle_file_uploaded_tenant(tenant, data, bucket_name, object_key)
84        .instrument(span)
85        .await;
86}
87
88/// Handle file upload notification once the tenant has been identified
89#[tracing::instrument(skip(data))]
90pub async fn handle_file_uploaded_tenant(
91    tenant: Tenant,
92    data: NotificationQueueData,
93    bucket_name: String,
94    object_key: String,
95) {
96    let object_key = match urlencoding::decode(&object_key) {
97        Ok(value) => value.to_string(),
98        Err(error) => {
99            tracing::warn!(
100                ?error,
101                "file was uploaded into a bucket but had an invalid file name"
102            );
103            return;
104        }
105    };
106
107    let db = match data.db_cache.get_tenant_pool(&tenant).await {
108        Ok(value) => value,
109        Err(error) => {
110            tracing::error!(?error, "failed to get tenant database pool");
111            return;
112        }
113    };
114
115    // Locate a pending upload task for the uploaded file
116    let task = match PresignedUploadTask::find_by_file_key(&db, &object_key).await {
117        Ok(Some(task)) => task,
118        // Ignore files that aren't attached to a presigned upload task
119        // (Things like generated files will show up here)
120        Ok(None) => {
121            tracing::debug!("uploaded file was not a presigned upload");
122            return;
123        }
124        Err(error) => {
125            tracing::error!(?error, "unable to query presigned upload");
126            return;
127        }
128    };
129
130    let scope = task.document_box.clone();
131
132    // Retrieve the target folder
133    let folder = match Folder::find_by_id(&db, &scope, task.folder_id).await {
134        Ok(Some(value)) => value,
135        Ok(None) => {
136            tracing::error!("presigned upload folder no longer exists");
137            return;
138        }
139        Err(error) => {
140            tracing::error!(?error, "unable to query folder");
141            return;
142        }
143    };
144
145    // Update stored editing user data
146    let complete = CompletePresigned { task, folder };
147
148    let search = data.search.create_search_index(&tenant);
149    let storage = data.storage.create_layer(tenant.storage_layer_options());
150    let events = data.events.create_event_publisher(&tenant);
151
152    // Create task future that performs the file upload
153    if let Err(error) =
154        safe_complete_presigned(db, search, storage, events, data.processing, complete).await
155    {
156        tracing::error!(?error, "failed to complete presigned file upload");
157    }
158}