Skip to main content

docbox_core/notifications/
process.rs

1//! # Processing
2//!
3//! Logic for processing notifications from the notification queue
4
5use super::{AppNotificationQueue, NotificationQueueMessage};
6use crate::{
7    events::EventPublisherFactory,
8    files::upload_file_presigned::{CompletePresigned, safe_complete_presigned},
9};
10use docbox_database::{
11    DatabasePoolCache,
12    models::{folder::Folder, presigned_upload_task::PresignedUploadTask, tenant::Tenant},
13};
14use docbox_processing::ProcessingLayer;
15use docbox_search::SearchIndexFactory;
16use docbox_storage::StorageLayerFactory;
17use std::sync::Arc;
18use tracing::Instrument;
19
20#[derive(Clone)]
21pub struct NotificationQueueData {
22    pub db_cache: Arc<DatabasePoolCache>,
23    pub search: SearchIndexFactory,
24    pub storage: StorageLayerFactory,
25    pub events: EventPublisherFactory,
26    pub processing: ProcessingLayer,
27}
28
29/// Processes events coming from the notification queue. This will be
30/// things like successful file uploads that need to be processed
31pub async fn process_notification_queue(
32    mut notification_queue: AppNotificationQueue,
33    data: NotificationQueueData,
34) {
35    // Process messages from the notification queue
36    while let Some(message) = notification_queue.next_message().await {
37        match message {
38            NotificationQueueMessage::FileCreated {
39                bucket_name,
40                object_key,
41            } => {
42                tokio::spawn(handle_file_uploaded(data.clone(), bucket_name, object_key));
43            }
44        }
45    }
46}
47
48/// Handle file upload notifications
49#[tracing::instrument(skip(data))]
50pub async fn handle_file_uploaded(
51    data: NotificationQueueData,
52    bucket_name: String,
53    object_key: String,
54) {
55    let tenant = {
56        let db = match data.db_cache.get_root_pool().await {
57            Ok(value) => value,
58            Err(error) => {
59                tracing::error!(?error, "failed to acquire root database pool");
60                return;
61            }
62        };
63
64        match Tenant::find_by_bucket(&db, &bucket_name).await {
65            Ok(Some(value)) => value,
66            Ok(None) => {
67                tracing::warn!(
68                    "file was uploaded into a bucket sqs is listening to but there was no matching tenant"
69                );
70                return;
71            }
72            Err(error) => {
73                tracing::error!(?error, "failed to query tenant for bucket");
74                return;
75            }
76        }
77    };
78
79    // Provide a span that contains the tenant metadata
80    let span = tracing::info_span!("tenant", tenant_id = %tenant.id, tenant_env = %tenant.env);
81
82    handle_file_uploaded_tenant(tenant, data, bucket_name, object_key)
83        .instrument(span)
84        .await;
85}
86
87/// Handle file upload notification once the tenant has been identified
88#[tracing::instrument(skip(data))]
89pub async fn handle_file_uploaded_tenant(
90    tenant: Tenant,
91    data: NotificationQueueData,
92    bucket_name: String,
93    object_key: String,
94) {
95    let object_key = match urlencoding::decode(&object_key) {
96        Ok(value) => value.to_string(),
97        Err(error) => {
98            tracing::warn!(
99                ?error,
100                "file was uploaded into a bucket but had an invalid file name"
101            );
102            return;
103        }
104    };
105
106    let db = match data.db_cache.get_tenant_pool(&tenant).await {
107        Ok(value) => value,
108        Err(error) => {
109            tracing::error!(?error, "failed to get tenant database pool");
110            return;
111        }
112    };
113
114    // Locate a pending upload task for the uploaded file
115    let task = match PresignedUploadTask::find_by_file_key(&db, &object_key).await {
116        Ok(Some(task)) => task,
117        // Ignore files that aren't attached to a presigned upload task
118        // (Things like generated files will show up here)
119        Ok(None) => {
120            tracing::debug!("uploaded file was not a presigned upload");
121            return;
122        }
123        Err(error) => {
124            tracing::error!(?error, "unable to query presigned upload");
125            return;
126        }
127    };
128
129    let scope = task.document_box.clone();
130
131    // Retrieve the target folder
132    let folder = match Folder::find_by_id(&db, &scope, task.folder_id).await {
133        Ok(Some(value)) => value,
134        Ok(None) => {
135            tracing::error!("presigned upload folder no longer exists");
136            return;
137        }
138        Err(error) => {
139            tracing::error!(?error, "unable to query folder");
140            return;
141        }
142    };
143
144    // Update stored editing user data
145    let complete = CompletePresigned { task, folder };
146
147    let search = data.search.create_search_index(&tenant);
148    let storage = data.storage.create_storage_layer(&tenant);
149    let events = data.events.create_event_publisher(&tenant);
150
151    // Create task future that performs the file upload
152    if let Err(error) =
153        safe_complete_presigned(db, search, storage, events, data.processing, complete).await
154    {
155        tracing::error!(?error, "failed to complete presigned file upload");
156    }
157}