//! docbox_core/files/upload_file.rs

1use crate::files::{create_file_key, index_file::store_file_index};
2use crate::processing::{ProcessingError, ProcessingIndexMetadata, ProcessingLayer, process_file};
3use crate::utils::error::CompositeError;
4use crate::{
5    events::{TenantEventMessage, TenantEventPublisher},
6    files::generated::{QueuedUpload, upload_generated_files},
7};
8use bytes::Bytes;
9use docbox_database::models::document_box::DocumentBoxScopeRaw;
10use docbox_database::models::folder::FolderId;
11use docbox_database::{
12    DbErr, DbPool, DbTransaction,
13    models::{
14        document_box::WithScope,
15        file::{CreateFile, File, FileId},
16        generated_file::GeneratedFile,
17        user::UserId,
18    },
19};
20use docbox_search::TenantSearchIndex;
21use docbox_storage::TenantStorageLayer;
22use mime::Mime;
23use serde::{Deserialize, Serialize};
24use std::ops::DerefMut;
25use thiserror::Error;
26use tracing::Instrument;
27use utoipa::ToSchema;
28use uuid::Uuid;
29
30#[derive(Debug, Error)]
31pub enum UploadFileError {
32    /// Failed to create the search index
33    #[error("failed to create file search index: {0}")]
34    CreateIndex(anyhow::Error),
35
36    /// Failed to create the file database row
37    #[error("failed to create file")]
38    CreateFile(DbErr),
39
40    /// Failed to set file encryption state
41    #[error("failed to store file encryption state")]
42    SetEncryption(DbErr),
43
44    /// Failed to process the file
45    #[error("failed to process file: {0}")]
46    Processing(#[from] ProcessingError),
47
48    /// Failed to upload generated file to storage layer
49    #[error("failed to upload generated file to storage layer: {0}")]
50    UploadGeneratedFile(anyhow::Error),
51
52    /// Failed to create the generated file database row
53    #[error("failed to create generated file")]
54    CreateGeneratedFile(DbErr),
55
56    /// Failed to upload file to storage layer
57    #[error("failed to upload file to storage layer: {0}")]
58    UploadFile(anyhow::Error),
59
60    /// Multiple error messages
61    #[error(transparent)]
62    Composite(#[from] CompositeError),
63}
64
65/// State keeping track of whats been generated from a file
66/// upload, to help with reverted changes on failure
/// State keeping track of what's been created while uploading a file,
/// used to revert those changes on failure (see [rollback_upload_file])
#[derive(Default)]
pub struct UploadFileState {
    /// Keys of files uploaded to the storage layer (S3)
    pub s3_upload_keys: Vec<String>,
    /// IDs of files that were added to the search index
    pub search_index_files: Vec<Uuid>,
}
74
/// Request describing a single file to upload into a document box
pub struct UploadFile {
    /// Fixed file ID to use instead of a randomly
    /// generated file ID
    pub fixed_id: Option<FileId>,

    /// ID of the parent file if this file is related to another file
    /// (e.g. an attachment extracted from an uploaded email)
    pub parent_id: Option<FileId>,

    /// ID of the destination folder
    pub folder_id: FolderId,

    /// Document box the file and folder are contained within
    pub document_box: DocumentBoxScopeRaw,

    /// File name
    pub name: String,

    /// File content type
    pub mime: Mime,

    /// File content
    pub file_bytes: Bytes,

    /// User uploading the file, [None] when not attributed to a user
    pub created_by: Option<UserId>,

    /// Key to the file if the file is already uploaded to S3
    /// (when set the upload step is skipped)
    pub file_key: Option<String>,

    /// Config that can be used when processing for additional
    /// configuration to how the file is processed
    pub processing_config: Option<ProcessingConfig>,
}
108
/// Additional configuration applied when processing an uploaded file
#[derive(Debug, Default, Clone, Deserialize, Serialize, ToSchema)]
#[serde(default)]
pub struct ProcessingConfig {
    /// Email specific processing configuration
    pub email: Option<EmailProcessingConfig>,
}
115
/// Email specific processing configuration
#[derive(Debug, Default, Clone, Deserialize, Serialize, ToSchema)]
#[serde(default)]
pub struct EmailProcessingConfig {
    /// Whether to skip extracting attachments when processing an email
    pub skip_attachments: Option<bool>,
}
122
/// Result of a successful file upload
pub struct UploadedFileData {
    /// The uploaded file itself
    pub file: File,
    /// Generated data alongside the file (e.g. previews/conversions)
    pub generated: Vec<GeneratedFile>,
    /// Additional files created and uploaded from processing the file,
    /// each with their own nested upload results
    pub additional_files: Vec<UploadedFileData>,
}
131
132/// Safely performs [upload_file] ensuring that on failure all resources are
133/// properly cleaned up
134pub async fn safe_upload_file(
135    db: DbPool,
136    search: TenantSearchIndex,
137    storage: TenantStorageLayer,
138    events: TenantEventPublisher,
139    processing: ProcessingLayer,
140    upload: UploadFile,
141) -> Result<UploadedFileData, anyhow::Error> {
142    // Start a database transaction
143    let mut db = db.begin().await.map_err(|cause| {
144        tracing::error!(?cause, "failed to begin transaction");
145        anyhow::anyhow!("failed to begin transaction")
146    })?;
147
148    // Create state for tracking allocated resources
149    let mut upload_state = UploadFileState::default();
150
151    let output = match upload_file(
152        &mut db,
153        &search,
154        &storage,
155        &events,
156        &processing,
157        upload,
158        &mut upload_state,
159    )
160    .await
161    {
162        Ok(value) => value,
163        Err(err) => {
164            let span = tracing::Span::current();
165
166            // Attempt to rollback any allocated resources in the background
167            tokio::spawn(
168                async move {
169                    if let Err(cause) = db.rollback().await {
170                        tracing::error!(?cause, "failed to roll back database transaction");
171                    }
172
173                    rollback_upload_file(&search, &storage, upload_state).await;
174                }
175                .instrument(span),
176            );
177
178            return Err(anyhow::Error::from(err));
179        }
180    };
181
182    // Commit the transaction
183    if let Err(cause) = db.commit().await {
184        tracing::error!(?cause, "failed to commit transaction");
185        let span = tracing::Span::current();
186
187        // Attempt to rollback any allocated resources in the background
188        tokio::spawn(
189            async move {
190                rollback_upload_file(&search, &storage, upload_state).await;
191            }
192            .instrument(span),
193        );
194
195        return Err(anyhow::anyhow!("failed to commit transaction"));
196    }
197
198    Ok(output)
199}
200
/// Uploads a single file into a document box folder.
///
/// Processes the file contents, creates the database row, stores any
/// generated files, recursively uploads additional files produced by
/// processing (e.g. email attachments), indexes the file in the search
/// index, uploads the bytes to the storage layer (unless `upload.file_key`
/// indicates it is already stored), and publishes a
/// [TenantEventMessage::FileCreated] event.
///
/// Every storage key and search index entry created along the way is
/// recorded in `upload_state` so the caller can revert on failure
/// (see [safe_upload_file] / [rollback_upload_file]).
pub async fn upload_file(
    db: &mut DbTransaction<'_>,
    search: &TenantSearchIndex,
    storage: &TenantStorageLayer,
    events: &TenantEventPublisher,
    processing: &ProcessingLayer,
    upload: UploadFile,
    upload_state: &mut UploadFileState,
) -> Result<UploadedFileData, UploadFileError> {
    // Determine if we need to upload and what the file key is
    let (s3_upload, file_key) = match upload.file_key.as_ref() {
        // Already have a file key, don't want to upload
        Some(file_key) => (false, file_key.clone()),

        // No existing file key, we are creating one and uploading the file
        None => (
            true,
            create_file_key(
                &upload.document_box,
                &upload.name,
                &upload.mime,
                Uuid::new_v4(),
            ),
        ),
    };

    // Process the file (produces index metadata, generated files and
    // additional files when the mime type has a processor)
    let processing_output = process_file(
        &upload.processing_config,
        processing,
        upload.file_bytes.clone(),
        &upload.mime,
    )
    .await?;

    // Get file encryption state (defaults to false when no processing occurred)
    let encrypted = processing_output
        .as_ref()
        .map(|output| output.encrypted)
        .unwrap_or_default();

    // Create database record
    let file = create_file_record(db, &upload, &file_key, &upload.file_bytes, encrypted).await?;

    let mut index_metadata: Option<ProcessingIndexMetadata> = None;
    let mut generated_files: Option<Vec<GeneratedFile>> = None;
    let mut additional_files: Vec<UploadedFileData> = Vec::new();

    if let Some(processing_output) = processing_output {
        index_metadata = processing_output.index_metadata;

        tracing::debug!("uploading generated files");
        let files = store_generated_files(
            db,
            storage,
            &file,
            &mut upload_state.s3_upload_keys,
            processing_output.upload_queue,
        )
        .await?;
        generated_files = Some(files);

        // Process additional files, uploading each as a child of this file
        for additional_file in processing_output.additional_files {
            let upload = UploadFile {
                parent_id: Some(file.id),
                fixed_id: additional_file.fixed_id,
                folder_id: upload.folder_id,
                document_box: upload.document_box.clone(),
                name: additional_file.name,
                mime: additional_file.mime,
                file_bytes: additional_file.bytes,
                created_by: upload.created_by.clone(),
                file_key: None,
                processing_config: upload.processing_config.clone(),
            };

            // Process the child file (Additional file outputs are ignored)
            // Box::pin is required because the recursion makes the future
            // size unknowable at compile time
            let output = Box::pin(upload_file(
                db,
                search,
                storage,
                events,
                processing,
                upload,
                upload_state,
            ))
            .await?;

            additional_files.push(output);
        }
    }

    // Index the file in the search index, tracking it for rollback
    tracing::debug!("indexing file contents");
    store_file_index(search, &file, &upload.document_box, index_metadata).await?;
    upload_state.search_index_files.push(file.id);

    if s3_upload {
        // Upload the file itself to S3, tracking the key for rollback
        tracing::debug!("uploading main file");
        storage
            .upload_file(&file_key, file.mime.clone(), upload.file_bytes)
            .await
            .map_err(UploadFileError::UploadFile)?;
        upload_state.s3_upload_keys.push(file_key.clone());
    }

    // Publish an event
    events.publish_event(TenantEventMessage::FileCreated(WithScope::new(
        file.clone(),
        upload.document_box.clone(),
    )));

    Ok(UploadedFileData {
        file,
        generated: generated_files.unwrap_or_default(),
        additional_files,
    })
}
321
322/// Create a file database record
323async fn create_file_record(
324    db: &mut DbTransaction<'_>,
325    upload: &UploadFile,
326    file_key: &str,
327    file_bytes: &Bytes,
328    encrypted: bool,
329) -> Result<File, UploadFileError> {
330    let hash = sha256::digest(file_bytes.as_ref() as &[u8]);
331    let size = file_bytes.len().min(i32::MAX as usize) as i32;
332
333    // Create file to commit against
334    File::create(
335        db.deref_mut(),
336        CreateFile {
337            parent_id: upload.parent_id,
338            fixed_id: upload.fixed_id,
339            name: upload.name.clone(),
340            mime: upload.mime.to_string(),
341            file_key: file_key.to_owned(),
342            folder_id: upload.folder_id,
343            hash,
344            size,
345            created_by: upload.created_by.clone(),
346            encrypted,
347        },
348    )
349    .await
350    .map_err(UploadFileError::CreateFile)
351}
352
/// Stores the provided queued file uploads as generated files in
/// the database for a specific file, returns the generated file
/// database entries
///
/// Any uploads that succeed to S3 will have their file key pushed
/// to `s3_upload_keys` so that it can be rolled back if any errors
/// occur
///
/// Errors are collected rather than short-circuiting so that every
/// successful upload is still tracked for rollback before returning
pub async fn store_generated_files(
    db: &mut DbTransaction<'_>,
    storage: &TenantStorageLayer,
    file: &File,
    s3_upload_keys: &mut Vec<String>,
    queued_uploads: Vec<QueuedUpload>,
) -> Result<Vec<GeneratedFile>, UploadFileError> {
    // Upload the generated files to S3
    let upload_results = upload_generated_files(
        storage,
        &file.file_key,
        &file.id,
        &file.hash,
        queued_uploads,
    )
    .await;

    let mut generated_files = Vec::new();
    let mut upload_errors = Vec::new();

    for result in upload_results {
        match result {
            // Successful upload, store generated file
            Ok(create) => {
                // Track uploaded file keys
                // NOTE(review): this pushes the *parent* file's key once per
                // successful generated upload; if generated files are stored
                // under their own distinct keys, rollback will not delete
                // them — verify against `upload_generated_files` and the
                // `CreateGeneratedFile` key field
                s3_upload_keys.push(file.file_key.clone());

                // Store generated file in database
                let generated_file = match GeneratedFile::create(db.deref_mut(), create)
                    .await
                    .map_err(UploadFileError::CreateGeneratedFile)
                {
                    Ok(value) => value,
                    // Collect the error and continue so remaining successful
                    // uploads are still tracked for rollback
                    Err(err) => {
                        upload_errors.push(err);
                        continue;
                    }
                };

                generated_files.push(generated_file);
            }
            // Failed upload
            Err(cause) => {
                tracing::error!(?cause, "failed to upload generated file");
                upload_errors.push(UploadFileError::UploadGeneratedFile(cause));
            }
        }
    }

    // Handle any errors in the upload process
    // (This must occur after so that we can ensure we capture all successful uploads to rollback)
    if !upload_errors.is_empty() {
        tracing::warn!("error while uploading generated files, operation failed");

        let error = upload_errors
            .into_iter()
            .map(anyhow::Error::from)
            .collect::<CompositeError>();

        return Err(UploadFileError::Composite(error));
    }

    Ok(generated_files)
}
424
425/// Performs the process of rolling back a file upload based
426/// on the current upload state
427pub async fn rollback_upload_file(
428    search: &TenantSearchIndex,
429    storage: &TenantStorageLayer,
430    upload_state: UploadFileState,
431) {
432    // Revert upload S3 files
433    for key in upload_state.s3_upload_keys {
434        if let Err(err) = storage.delete_file(&key).await {
435            tracing::error!(?err, "failed to rollback created tenant s3 file");
436        }
437    }
438
439    // Revert file index data
440    for index in upload_state.search_index_files {
441        if let Err(err) = search.delete_data(index).await {
442            tracing::error!(?index, ?err, "failed to rollback created file search index",);
443        }
444    }
445}