docbox_core/notifications/
process.rs

//! # Processing
//!
//! Logic for processing notifications from the notification queue
4
5use crate::notifications::{AppNotificationQueue, NotificationQueueMessage};
6use crate::processing::ProcessingLayer;
7use crate::{
8    events::EventPublisherFactory,
9    files::upload_file_presigned::{CompletePresigned, safe_complete_presigned},
10};
11use docbox_database::{
12    DatabasePoolCache,
13    models::{folder::Folder, presigned_upload_task::PresignedUploadTask, tenant::Tenant},
14};
15use docbox_search::SearchIndexFactory;
16use docbox_storage::StorageLayerFactory;
17use std::sync::Arc;
18use tracing::Instrument;
19
20#[derive(Clone)]
21pub struct NotificationQueueData {
22    pub db_cache: Arc<DatabasePoolCache>,
23    pub search: SearchIndexFactory,
24    pub storage: StorageLayerFactory,
25    pub events: EventPublisherFactory,
26    pub processing: ProcessingLayer,
27}
28
29/// Processes events coming from the notification queue. This will be
30/// things like successful file uploads that need to be processed
31pub async fn process_notification_queue(
32    mut notification_queue: AppNotificationQueue,
33    data: NotificationQueueData,
34) {
35    // Process messages from the notification queue
36    while let Some(message) = notification_queue.next_message().await {
37        match message {
38            NotificationQueueMessage::FileCreated {
39                bucket_name,
40                object_key,
41            } => {
42                tokio::spawn(safe_handle_file_uploaded(
43                    data.clone(),
44                    bucket_name,
45                    object_key,
46                ));
47            }
48        }
49    }
50}
51
52pub async fn safe_handle_file_uploaded(
53    data: NotificationQueueData,
54    bucket_name: String,
55    object_key: String,
56) {
57    if let Err(cause) = handle_file_uploaded(data, bucket_name, object_key).await {
58        tracing::error!(?cause, "failed to handle sqs file upload");
59    }
60}
61
62pub async fn handle_file_uploaded(
63    data: NotificationQueueData,
64    bucket_name: String,
65    object_key: String,
66) -> anyhow::Result<()> {
67    let tenant = {
68        let db = data.db_cache.get_root_pool().await?;
69        match Tenant::find_by_bucket(&db, &bucket_name).await? {
70            Some(value) => value,
71            None => {
72                tracing::warn!(
73                    ?bucket_name,
74                    ?object_key,
75                    "file was uploaded into a bucket sqs is listening to but there was no matching tenant"
76                );
77                return Ok(());
78            }
79        }
80    };
81
82    // Provide a span that contains the tenant metadata
83    let span = tracing::info_span!("tenant", tenant_id = %tenant.id, tenant_env = %tenant.env);
84
85    handle_file_uploaded_tenant(tenant, data, bucket_name, object_key)
86        .instrument(span)
87        .await
88}
89
90pub async fn handle_file_uploaded_tenant(
91    tenant: Tenant,
92    data: NotificationQueueData,
93    bucket_name: String,
94    object_key: String,
95) -> anyhow::Result<()> {
96    let object_key = match urlencoding::decode(&object_key) {
97        Ok(value) => value.to_string(),
98        Err(err) => {
99            tracing::warn!(
100                ?err,
101                ?bucket_name,
102                ?object_key,
103                "file was uploaded into a bucket but had an invalid file name"
104            );
105            return Ok(());
106        }
107    };
108
109    let db = data.db_cache.get_tenant_pool(&tenant).await?;
110
111    // Locate a pending upload task for the uploaded file
112    let task = match PresignedUploadTask::find_by_file_key(&db, &object_key).await {
113        Ok(Some(task)) => task,
114        Ok(None) => {
115            tracing::debug!("uploaded file was not a presigned upload");
116            return Ok(());
117        }
118        Err(cause) => {
119            tracing::error!(?cause, "unable to query presigned upload");
120            anyhow::bail!("unable to query presigned upload");
121        }
122    };
123
124    let scope = task.document_box.clone();
125
126    // Retrieve the target folder
127    let folder = match Folder::find_by_id(&db, &scope, task.folder_id).await {
128        Ok(Some(value)) => value,
129        Ok(None) => {
130            tracing::error!("presigned upload folder no longer exists");
131            anyhow::bail!("presigned upload folder no longer exists");
132        }
133        Err(cause) => {
134            tracing::error!(?cause, "unable to query folder");
135            anyhow::bail!("unable to query folder");
136        }
137    };
138
139    // Update stored editing user data
140    let complete = CompletePresigned { task, folder };
141
142    let search = data.search.create_search_index(&tenant);
143    let storage = data.storage.create_storage_layer(&tenant);
144    let events = data.events.create_event_publisher(&tenant);
145
146    // Create task future that performs the file upload
147    safe_complete_presigned(db, search, storage, events, data.processing, complete).await?;
148
149    Ok(())
150}