docbox_core/files/
upload_file.rs

1use crate::files::{create_file_key, index_file::store_file_index};
2use crate::processing::{ProcessingError, ProcessingIndexMetadata, ProcessingLayer, process_file};
3use crate::storage::TenantStorageLayer;
4use crate::utils::error::CompositeError;
5use crate::{
6    events::{TenantEventMessage, TenantEventPublisher},
7    files::generated::{QueuedUpload, upload_generated_files},
8};
9use bytes::Bytes;
10use docbox_database::models::document_box::DocumentBoxScopeRaw;
11use docbox_database::models::folder::FolderId;
12use docbox_database::{
13    DbErr, DbPool, DbTransaction,
14    models::{
15        document_box::WithScope,
16        file::{CreateFile, File, FileId},
17        generated_file::GeneratedFile,
18        user::UserId,
19    },
20};
21use docbox_search::TenantSearchIndex;
22use mime::Mime;
23use serde::{Deserialize, Serialize};
24use std::ops::DerefMut;
25use thiserror::Error;
26use utoipa::ToSchema;
27use uuid::Uuid;
28
29#[derive(Debug, Error)]
30pub enum UploadFileError {
31    /// Failed to create the search index
32    #[error("failed to create file search index: {0}")]
33    CreateIndex(anyhow::Error),
34
35    /// Failed to create the file database row
36    #[error("failed to create file")]
37    CreateFile(DbErr),
38
39    /// Failed to set file encryption state
40    #[error("failed to store file encryption state")]
41    SetEncryption(DbErr),
42
43    /// Failed to process the file
44    #[error("failed to process file: {0}")]
45    Processing(#[from] ProcessingError),
46
47    /// Failed to upload generated file to storage layer
48    #[error("failed to upload generated file to storage layer: {0}")]
49    UploadGeneratedFile(anyhow::Error),
50
51    /// Failed to create the generated file database row
52    #[error("failed to create generated file")]
53    CreateGeneratedFile(DbErr),
54
55    /// Failed to upload file to storage layer
56    #[error("failed to upload file to storage layer: {0}")]
57    UploadFile(anyhow::Error),
58
59    /// Multiple error messages
60    #[error(transparent)]
61    Composite(#[from] CompositeError),
62}
63
/// State keeping track of whats been generated from a file
/// upload, to help with reverted changes on failure
///
/// Populated incrementally by [upload_file]; consumed by
/// [rollback_upload_file] when the upload must be undone
#[derive(Default)]
pub struct UploadFileState {
    /// Keys of objects uploaded to the S3 storage layer that must be
    /// deleted if the upload is rolled back
    pub s3_upload_keys: Vec<String>,
    /// IDs of files added to the search index whose index data must be
    /// removed if the upload is rolled back
    pub search_index_files: Vec<Uuid>,
}
73
/// Request data for uploading a single file, see [upload_file]
/// and [safe_upload_file]
pub struct UploadFile {
    /// Fixed file ID to use instead of a randomly
    /// generated file ID
    pub fixed_id: Option<FileId>,

    /// ID of the parent file if this file is related to another file
    /// (e.g. an attachment extracted while processing an email)
    pub parent_id: Option<FileId>,

    /// ID of the destination folder
    pub folder_id: FolderId,

    /// Document box the file and folder are contained within
    pub document_box: DocumentBoxScopeRaw,

    /// File name
    pub name: String,

    /// File content type
    pub mime: Mime,

    /// File content
    pub file_bytes: Bytes,

    /// User uploading the file
    pub created_by: Option<UserId>,

    /// Key to the file if the file is already uploaded to S3
    /// (when set, [upload_file] skips the S3 upload step)
    pub file_key: Option<String>,

    /// Config that can be used when processing for additional
    /// configuration to how the file is processed
    pub processing_config: Option<ProcessingConfig>,
}
107
/// Optional configuration applied when processing an uploaded file,
/// grouped by the kind of content being processed
#[derive(Debug, Default, Clone, Deserialize, Serialize, ToSchema)]
#[serde(default)]
pub struct ProcessingConfig {
    /// Email specific processing configuration
    pub email: Option<EmailProcessingConfig>,
}
114
/// Email specific processing configuration, see [ProcessingConfig]
#[derive(Debug, Default, Clone, Deserialize, Serialize, ToSchema)]
#[serde(default)]
pub struct EmailProcessingConfig {
    /// Whether to skip extracting attachments when processing an email
    pub skip_attachments: Option<bool>,
}
121
/// Result of a successful file upload, see [upload_file]
pub struct UploadedFileData {
    /// The uploaded file itself
    pub file: File,
    /// Generated data alongside the file
    pub generated: Vec<GeneratedFile>,
    /// Additional files created and uploaded from processing the file
    /// (each with its own nested generated/additional data)
    pub additional_files: Vec<UploadedFileData>,
}
130
131/// Safely performs [upload_file] ensuring that on failure all resources are
132/// properly cleaned up
133pub async fn safe_upload_file(
134    db: DbPool,
135    search: TenantSearchIndex,
136    storage: TenantStorageLayer,
137    events: TenantEventPublisher,
138    processing: ProcessingLayer,
139    upload: UploadFile,
140) -> Result<UploadedFileData, anyhow::Error> {
141    // Start a database transaction
142    let mut db = db.begin().await.map_err(|cause| {
143        tracing::error!(?cause, "failed to begin transaction");
144        anyhow::anyhow!("failed to begin transaction")
145    })?;
146
147    // Create state for tracking allocated resources
148    let mut upload_state = UploadFileState::default();
149
150    let output = match upload_file(
151        &mut db,
152        &search,
153        &storage,
154        &events,
155        &processing,
156        upload,
157        &mut upload_state,
158    )
159    .await
160    {
161        Ok(value) => value,
162        Err(err) => {
163            // Attempt to rollback any allocated resources in the background
164            tokio::spawn(async move {
165                if let Err(cause) = db.rollback().await {
166                    tracing::error!(?cause, "failed to roll back database transaction");
167                }
168
169                rollback_upload_file(&search, &storage, upload_state).await;
170            });
171
172            return Err(anyhow::Error::from(err));
173        }
174    };
175
176    // Commit the transaction
177    if let Err(cause) = db.commit().await {
178        tracing::error!(?cause, "failed to commit transaction");
179
180        // Attempt to rollback any allocated resources in the background
181        tokio::spawn(async move {
182            rollback_upload_file(&search, &storage, upload_state).await;
183        });
184
185        return Err(anyhow::anyhow!("failed to commit transaction"));
186    }
187
188    Ok(output)
189}
190
/// Uploads a single file into a document box
///
/// Performs, in order: content processing, file row creation, storage of
/// generated files, recursive upload of any additional files produced by
/// processing, search indexing, the S3 upload of the file itself (unless
/// a `file_key` was provided), and finally publishing a file-created event
///
/// Every S3 object uploaded and every search index entry created is
/// recorded in `upload_state` so the caller can revert them on failure
/// via [rollback_upload_file]. Database changes are made against the
/// caller-owned transaction `db` and are reverted by the caller
/// rolling that transaction back
pub async fn upload_file(
    db: &mut DbTransaction<'_>,
    search: &TenantSearchIndex,
    storage: &TenantStorageLayer,
    events: &TenantEventPublisher,
    processing: &ProcessingLayer,
    upload: UploadFile,
    upload_state: &mut UploadFileState,
) -> Result<UploadedFileData, UploadFileError> {
    // Determine if we need to upload and what the file key is
    let (s3_upload, file_key) = match upload.file_key.as_ref() {
        // Already have a file key, don't want to upload
        Some(file_key) => (false, file_key.clone()),

        // No existing file key, we are creating one and uploading the file
        None => (
            true,
            create_file_key(
                &upload.document_box,
                &upload.name,
                &upload.mime,
                Uuid::new_v4(),
            ),
        ),
    };

    // Process the file
    let processing_output = process_file(
        &upload.processing_config,
        processing,
        upload.file_bytes.clone(),
        &upload.mime,
    )
    .await?;

    // Get file encryption state
    // (defaults to false when processing produced no output)
    let encrypted = processing_output
        .as_ref()
        .map(|output| output.encrypted)
        .unwrap_or_default();

    // Create database record
    let file = create_file_record(db, &upload, &file_key, &upload.file_bytes, encrypted).await?;

    let mut index_metadata: Option<ProcessingIndexMetadata> = None;
    let mut generated_files: Option<Vec<GeneratedFile>> = None;
    let mut additional_files: Vec<UploadedFileData> = Vec::new();

    if let Some(processing_output) = processing_output {
        index_metadata = processing_output.index_metadata;

        tracing::debug!("uploading generated files");
        let files = store_generated_files(
            db,
            storage,
            &file,
            &mut upload_state.s3_upload_keys,
            processing_output.upload_queue,
        )
        .await?;
        generated_files = Some(files);

        // Process additional files
        // (e.g. attachments extracted from an email by processing)
        for additional_file in processing_output.additional_files {
            let upload = UploadFile {
                parent_id: Some(file.id),
                fixed_id: additional_file.fixed_id,
                folder_id: upload.folder_id,
                document_box: upload.document_box.clone(),
                name: additional_file.name,
                mime: additional_file.mime,
                file_bytes: additional_file.bytes,
                created_by: upload.created_by.clone(),
                file_key: None,
                processing_config: upload.processing_config.clone(),
            };

            // Process the child file (Additional file outputs are ignored)
            // Box::pin is required because the recursive call makes the
            // future's size unknowable at compile time
            let output = Box::pin(upload_file(
                db,
                search,
                storage,
                events,
                processing,
                upload,
                upload_state,
            ))
            .await?;

            additional_files.push(output);
        }
    }

    // Index the file in the search index
    tracing::debug!("indexing file contents");
    store_file_index(search, &file, &upload.document_box, index_metadata).await?;
    upload_state.search_index_files.push(file.id);

    if s3_upload {
        // Upload the file itself to S3
        tracing::debug!("uploading main file");
        storage
            .upload_file(&file_key, file.mime.clone(), upload.file_bytes)
            .await
            .map_err(UploadFileError::UploadFile)?;
        upload_state.s3_upload_keys.push(file_key.clone());
    }

    // Publish an event
    events.publish_event(TenantEventMessage::FileCreated(WithScope::new(
        file.clone(),
        upload.document_box.clone(),
    )));

    Ok(UploadedFileData {
        file,
        generated: generated_files.unwrap_or_default(),
        additional_files,
    })
}
311
312/// Create a file database record
313async fn create_file_record(
314    db: &mut DbTransaction<'_>,
315    upload: &UploadFile,
316    file_key: &str,
317    file_bytes: &Bytes,
318    encrypted: bool,
319) -> Result<File, UploadFileError> {
320    let hash = sha256::digest(file_bytes.as_ref() as &[u8]);
321    let size = file_bytes.len().min(i32::MAX as usize) as i32;
322
323    // Create file to commit against
324    File::create(
325        db.deref_mut(),
326        CreateFile {
327            parent_id: upload.parent_id,
328            fixed_id: upload.fixed_id,
329            name: upload.name.clone(),
330            mime: upload.mime.to_string(),
331            file_key: file_key.to_owned(),
332            folder_id: upload.folder_id,
333            hash,
334            size,
335            created_by: upload.created_by.clone(),
336            encrypted,
337        },
338    )
339    .await
340    .map_err(UploadFileError::CreateFile)
341}
342
343/// Stores the provided queued file uploads as generated files in
344/// the database for a specific file, returns the generated file
345/// database entries
346///
347/// Any uploads that succeed to S3 will have their file key pushed
348/// to `s3_upload_keys` so that it can be rolled back if any errors
349/// occur
350pub async fn store_generated_files(
351    db: &mut DbTransaction<'_>,
352    storage: &TenantStorageLayer,
353    file: &File,
354    s3_upload_keys: &mut Vec<String>,
355    queued_uploads: Vec<QueuedUpload>,
356) -> Result<Vec<GeneratedFile>, UploadFileError> {
357    // Upload the generated files to S3
358    let upload_results = upload_generated_files(
359        storage,
360        &file.file_key,
361        &file.id,
362        &file.hash,
363        queued_uploads,
364    )
365    .await;
366
367    let mut generated_files = Vec::new();
368    let mut upload_errors = Vec::new();
369
370    for result in upload_results {
371        match result {
372            // Successful upload, store generated file
373            Ok(create) => {
374                // Track uploaded file keys
375                s3_upload_keys.push(file.file_key.clone());
376
377                // Store generated file in database
378                let generated_file = match GeneratedFile::create(db.deref_mut(), create)
379                    .await
380                    .map_err(UploadFileError::CreateGeneratedFile)
381                {
382                    Ok(value) => value,
383                    Err(err) => {
384                        upload_errors.push(err);
385                        continue;
386                    }
387                };
388
389                generated_files.push(generated_file);
390            }
391            // Failed upload
392            Err(cause) => {
393                tracing::error!(?cause, "failed to upload generated file");
394                upload_errors.push(UploadFileError::UploadGeneratedFile(cause));
395            }
396        }
397    }
398
399    // Handle any errors in the upload process
400    // (This must occur after so that we can ensure we capture all successful uploads to rollback)
401    if !upload_errors.is_empty() {
402        tracing::warn!("error while uploading generated files, operation failed");
403
404        let error = upload_errors
405            .into_iter()
406            .map(anyhow::Error::from)
407            .collect::<CompositeError>();
408
409        return Err(UploadFileError::Composite(error));
410    }
411
412    Ok(generated_files)
413}
414
415/// Performs the process of rolling back a file upload based
416/// on the current upload state
417pub async fn rollback_upload_file(
418    search: &TenantSearchIndex,
419    storage: &TenantStorageLayer,
420    upload_state: UploadFileState,
421) {
422    // Revert upload S3 files
423    for key in upload_state.s3_upload_keys {
424        if let Err(err) = storage.delete_file(&key).await {
425            tracing::error!(?err, "failed to rollback created tenant s3 file");
426        }
427    }
428
429    // Revert file index data
430    for index in upload_state.search_index_files {
431        if let Err(err) = search.delete_data(index).await {
432            tracing::error!(?index, ?err, "failed to rollback created file search index",);
433        }
434    }
435}