docbox_core/notifications/
process.rs

1//! # Processing
2//!
3//! Logic for processing notifications from the notification queue
4
5use crate::notifications::{AppNotificationQueue, NotificationQueueMessage};
6use crate::processing::ProcessingLayer;
7use crate::secrets::AppSecretManager;
8use crate::storage::StorageLayerFactory;
9use crate::{
10    events::EventPublisherFactory,
11    files::upload_file_presigned::{CompletePresigned, safe_complete_presigned},
12};
13use docbox_database::{
14    DatabasePoolCache,
15    models::{folder::Folder, presigned_upload_task::PresignedUploadTask, tenant::Tenant},
16};
17use docbox_search::SearchIndexFactory;
18use std::sync::Arc;
19use tracing::Instrument;
20
21#[derive(Clone)]
22pub struct NotificationQueueData {
23    pub db_cache: Arc<DatabasePoolCache<AppSecretManager>>,
24    pub search: SearchIndexFactory,
25    pub storage: StorageLayerFactory,
26    pub events: EventPublisherFactory,
27    pub processing: ProcessingLayer,
28}
29
30/// Processes events coming from the notification queue. This will be
31/// things like successful file uploads that need to be processed
32pub async fn process_notification_queue(
33    mut notification_queue: AppNotificationQueue,
34    data: NotificationQueueData,
35) {
36    // Process messages from the notification queue
37    while let Some(message) = notification_queue.next_message().await {
38        match message {
39            NotificationQueueMessage::FileCreated {
40                bucket_name,
41                object_key,
42            } => {
43                tokio::spawn(safe_handle_file_uploaded(
44                    data.clone(),
45                    bucket_name,
46                    object_key,
47                ));
48            }
49        }
50    }
51}
52
53pub async fn safe_handle_file_uploaded(
54    data: NotificationQueueData,
55    bucket_name: String,
56    object_key: String,
57) {
58    if let Err(cause) = handle_file_uploaded(data, bucket_name, object_key).await {
59        tracing::error!(?cause, "failed to handle sqs file upload");
60    }
61}
62
63pub async fn handle_file_uploaded(
64    data: NotificationQueueData,
65    bucket_name: String,
66    object_key: String,
67) -> anyhow::Result<()> {
68    let tenant = {
69        let db = data.db_cache.get_root_pool().await?;
70        match Tenant::find_by_bucket(&db, &bucket_name).await? {
71            Some(value) => value,
72            None => {
73                tracing::warn!(
74                    ?bucket_name,
75                    ?object_key,
76                    "file was uploaded into a bucket sqs is listening to but there was no matching tenant"
77                );
78                return Ok(());
79            }
80        }
81    };
82
83    // Provide a span that contains the tenant metadata
84    let span = tracing::info_span!("tenant", tenant_id = %tenant.id, tenant_env = %tenant.env);
85
86    handle_file_uploaded_tenant(tenant, data, bucket_name, object_key)
87        .instrument(span)
88        .await
89}
90
91pub async fn handle_file_uploaded_tenant(
92    tenant: Tenant,
93    data: NotificationQueueData,
94    bucket_name: String,
95    object_key: String,
96) -> anyhow::Result<()> {
97    let object_key = match urlencoding::decode(&object_key) {
98        Ok(value) => value.to_string(),
99        Err(err) => {
100            tracing::warn!(
101                ?err,
102                ?bucket_name,
103                ?object_key,
104                "file was uploaded into a bucket but had an invalid file name"
105            );
106            return Ok(());
107        }
108    };
109
110    let db = data.db_cache.get_tenant_pool(&tenant).await?;
111
112    // Locate a pending upload task for the uploaded file
113    let task = match PresignedUploadTask::find_by_file_key(&db, &object_key).await {
114        Ok(Some(task)) => task,
115        Ok(None) => {
116            tracing::debug!("uploaded file was not a presigned upload");
117            return Ok(());
118        }
119        Err(cause) => {
120            tracing::error!(?cause, "unable to query presigned upload");
121            anyhow::bail!("unable to query presigned upload");
122        }
123    };
124
125    let scope = task.document_box.clone();
126
127    // Retrieve the target folder
128    let folder = match Folder::find_by_id(&db, &scope, task.folder_id).await {
129        Ok(Some(value)) => value,
130        Ok(None) => {
131            tracing::error!("presigned upload folder no longer exists");
132            anyhow::bail!("presigned upload folder no longer exists");
133        }
134        Err(cause) => {
135            tracing::error!(?cause, "unable to query folder");
136            anyhow::bail!("unable to query folder");
137        }
138    };
139
140    // Update stored editing user data
141    let complete = CompletePresigned { task, folder };
142
143    let search = data.search.create_search_index(&tenant);
144    let storage = data.storage.create_storage_layer(&tenant);
145    let events = data.events.create_event_publisher(&tenant);
146
147    // Create task future that performs the file upload
148    safe_complete_presigned(db, search, storage, events, data.processing, complete).await?;
149
150    Ok(())
151}