//! docbox_core/files/upload_file.rs

1use crate::{
2    events::{TenantEventMessage, TenantEventPublisher},
3    files::{
4        create_file_key,
5        generated::{make_create_generated_files, upload_generated_files},
6        index_file::store_file_index,
7    },
8};
9use bytes::Bytes;
10use chrono::Utc;
11use docbox_database::models::{
12    document_box::DocumentBoxScopeRaw, generated_file::CreateGeneratedFile,
13};
14use docbox_database::models::{document_box::DocumentBoxScopeRawRef, folder::FolderId};
15use docbox_database::{
16    DbErr, DbPool, DbTransaction,
17    models::{
18        document_box::WithScope,
19        file::{CreateFile, File, FileId},
20        generated_file::GeneratedFile,
21        user::UserId,
22    },
23};
24use docbox_processing::{
25    ProcessingConfig, ProcessingError, ProcessingIndexMetadata, ProcessingLayer, QueuedUpload,
26    process_file,
27};
28use docbox_search::{SearchError, TenantSearchIndex};
29use docbox_storage::{StorageLayerError, TenantStorageLayer};
30use mime::Mime;
31use std::ops::DerefMut;
32use thiserror::Error;
33use tracing::Instrument;
34use uuid::Uuid;
35
/// Errors that can occur while uploading a file.
///
/// Error messages from this are user-facing so any data included should ensure
/// it does not expose any information that it shouldn't
#[derive(Debug, Error)]
pub enum UploadFileError {
    /// Failed to create the search index
    #[error("failed to create file search index: {0}")]
    CreateIndex(SearchError),

    /// Failed to create the file database row
    #[error("failed to create file entry")]
    CreateFile(DbErr),

    /// Failed to process the file
    #[error("failed to process file: {0}")]
    Processing(#[from] ProcessingError),

    /// Failed to upload generated file to storage layer
    #[error("failed to upload generated file to storage layer: {0}")]
    UploadGeneratedFile(StorageLayerError),

    /// Failed to create the generated file database row
    #[error("failed to create generated file")]
    CreateGeneratedFile(DbErr),

    /// Failed to upload file to storage layer
    #[error("failed to upload file to storage layer: {0}")]
    UploadFile(StorageLayerError),

    /// One or more generated file uploads failed; contains the individual errors
    #[error("failed to upload files")]
    FailedFileUploads(Vec<UploadFileError>),

    /// Failed to begin the transaction
    #[error("failed to perform operation (start)")]
    BeginTransaction(DbErr),

    /// Failed to commit the transaction
    #[error("failed to perform operation (end)")]
    CommitTransaction(DbErr),
}
76
/// State keeping track of what's been generated from a file
/// upload, to help with reverting changes on failure
///
/// Populated incrementally as the upload progresses and consumed by
/// the rollback logic when any step of the upload fails
#[derive(Default)]
pub struct UploadFileState {
    /// Keys of objects uploaded to the storage layer so far
    pub storage_upload_keys: Vec<String>,
    /// IDs of files added to the search index so far
    pub search_index_files: Vec<Uuid>,
}
86
/// Request data describing a single file to upload
pub struct UploadFile {
    /// Fixed file ID to use instead of a randomly
    /// generated file ID
    pub fixed_id: Option<FileId>,

    /// ID of the parent file if this file is related to another file
    /// (e.g. it was unpacked from that file during processing)
    pub parent_id: Option<FileId>,

    /// ID of the destination folder
    pub folder_id: FolderId,

    /// Document box the file and folder are contained within
    pub document_box: DocumentBoxScopeRaw,

    /// File name
    pub name: String,

    /// File content type
    pub mime: Mime,

    /// File content
    pub file_bytes: Bytes,

    /// User uploading the file, if known
    pub created_by: Option<UserId>,

    /// Key to the file if the file is already uploaded to S3
    /// (when set the storage upload step is skipped)
    pub file_key: Option<String>,

    /// Config that can be used when processing for additional
    /// configuration to how the file is processed
    pub processing_config: Option<ProcessingConfig>,
}
120
/// Result of a completed file upload
#[derive(Debug)]
pub struct UploadedFileData {
    /// The uploaded file itself
    pub file: File,
    /// Generated file rows created alongside the file
    pub generated: Vec<GeneratedFile>,
    /// Additional files created and uploaded from processing the file
    /// (each with its own generated / additional children)
    pub additional_files: Vec<UploadedFileData>,
}
130
131pub async fn upload_file(
132    db: &DbPool,
133    search: &TenantSearchIndex,
134    storage: &TenantStorageLayer,
135    processing: &ProcessingLayer,
136    events: &TenantEventPublisher,
137    upload: UploadFile,
138) -> Result<UploadedFileData, UploadFileError> {
139    let document_box = upload.document_box.clone();
140    let mut upload_state = UploadFileState::default();
141
142    // Perform the creation of resources and processing
143    let data =
144        match upload_file_inner(search, storage, processing, upload, &mut upload_state, 0).await {
145            Ok(value) => value,
146            Err(error) => {
147                tracing::error!(?error, "failed to complete inner file processing");
148                background_rollback_upload_file(search.clone(), storage.clone(), upload_state);
149                return Err(error);
150            }
151        };
152
153    // Persist records to the database
154    let mut db = db.begin().await.map_err(|error| {
155        tracing::error!(?error, "failed to begin transaction");
156        UploadFileError::BeginTransaction(error)
157    })?;
158
159    let output = match persist_file_upload(&mut db, data).await {
160        Ok(value) => value,
161        Err(error) => {
162            if let Err(cause) = db.rollback().await {
163                tracing::error!(?cause, "failed to roll back database transaction");
164            }
165
166            tracing::error!(?error, "failed to complete inner file processing");
167            background_rollback_upload_file(search.clone(), storage.clone(), upload_state);
168            return Err(error);
169        }
170    };
171
172    if let Err(error) = db.commit().await {
173        tracing::error!(?error, "failed to commit transaction");
174        background_rollback_upload_file(search.clone(), storage.clone(), upload_state);
175        return Err(UploadFileError::CommitTransaction(error));
176    }
177
178    // Publish creation events
179    publish_file_creation_events(events, &document_box, &output);
180
181    Ok(output)
182}
183
184/// Publish file creation events for all created files
185pub fn publish_file_creation_events(
186    events: &TenantEventPublisher,
187    document_box: DocumentBoxScopeRawRef<'_>,
188    output: &UploadedFileData,
189) {
190    // Publish file creation events
191    events.publish_event(TenantEventMessage::FileCreated(WithScope::new(
192        output.file.clone(),
193        document_box.to_string(),
194    )));
195
196    for additional_file in &output.additional_files {
197        publish_file_creation_events(events, document_box, additional_file);
198    }
199}
200
/// File upload data fully prepared (processed, stored, indexed) and
/// ready to be persisted to the database in a later step
#[derive(Debug)]
pub struct PreparedUploadData {
    /// Main file record to create
    file: CreateFile,

    /// Generated file records to create (`None` when the file produced
    /// no processing output)
    generated_files: Option<Vec<CreateGeneratedFile>>,

    /// Child additional file records to create
    additional_files: Vec<PreparedUploadData>,
}
212
/// Performs the file uploading, processing and storage. Prepares the data without
/// persisting data to the database
///
/// Performs the following:
/// - Process the file
/// - Create a prepared file record database metadata
/// - Upload generated files and create their metadata
/// - Perform this function for additional inner files
/// - Store file metadata in the search index
/// - Upload the main file to S3 if not already performed
///
/// `iteration` is the current unpacking depth (0 for the top-level upload);
/// recursion into additional files stops once the next depth would exceed
/// the allowed maximum. Every storage upload and search index entry is
/// recorded in `upload_state` so the caller can roll them back on failure.
async fn upload_file_inner(
    search: &TenantSearchIndex,
    storage: &TenantStorageLayer,
    processing: &ProcessingLayer,
    upload: UploadFile,
    upload_state: &mut UploadFileState,
    iteration: usize,
) -> Result<PreparedUploadData, UploadFileError> {
    // Effective unpack depth is the per-request limit capped by the
    // server-wide limit; both default to 1 when unset
    let server_max_iterations = processing.config.max_unpack_iterations.unwrap_or(1);
    let max_iterations = upload
        .processing_config
        .as_ref()
        .and_then(|value| value.max_unpack_iterations)
        .unwrap_or(1)
        .min(server_max_iterations);

    // Determine if we need to upload and what the file key is
    let (s3_upload, file_key) = match upload.file_key.as_ref() {
        // Already have a file key, don't want to upload
        Some(file_key) => (false, file_key.clone()),

        // No existing file key, we are creating one and uploading the file
        None => (
            true,
            create_file_key(
                &upload.document_box,
                &upload.name,
                &upload.mime,
                Uuid::new_v4(),
            ),
        ),
    };

    // Process the file
    // (NOTE(review): a `None` output appears to mean no processing applied
    // for this mime type -- confirm against `process_file`)
    let processing_output = process_file(
        &upload.processing_config,
        processing,
        upload.file_bytes.clone(),
        &upload.mime,
    )
    .await?;

    // Get file encryption state (defaults to false when there is no
    // processing output)
    let encrypted = processing_output
        .as_ref()
        .map(|output| output.encrypted)
        .unwrap_or_default();

    let file_record = make_file_record(&upload, &file_key, &upload.file_bytes, encrypted);

    let mut index_metadata: Option<ProcessingIndexMetadata> = None;
    let mut generated_files: Option<Vec<CreateGeneratedFile>> = None;
    let mut additional_files: Vec<PreparedUploadData> = Vec::new();

    if let Some(processing_output) = processing_output {
        index_metadata = processing_output.index_metadata;

        // Upload generated files and store the metadata
        tracing::debug!("uploading generated files");
        let prepared_files = store_generated_files(
            storage,
            &file_record,
            &mut upload_state.storage_upload_keys,
            processing_output.upload_queue,
        )
        .await?;
        generated_files = Some(prepared_files);

        let next_iteration = iteration + 1;
        if next_iteration > max_iterations {
            tracing::debug!(
                ?iteration,
                "additional files were present but exceeded the max unpacking iteration for this request"
            );
        } else {
            // Process additional files produced by processing (each is
            // uploaded as its own file, parented to this one)
            for additional_file in processing_output.additional_files {
                let upload = UploadFile {
                    parent_id: Some(file_record.id),
                    fixed_id: additional_file.fixed_id,
                    folder_id: upload.folder_id,
                    document_box: upload.document_box.clone(),
                    name: additional_file.name,
                    mime: additional_file.mime,
                    file_bytes: additional_file.bytes,
                    created_by: upload.created_by.clone(),
                    file_key: None,
                    processing_config: upload.processing_config.clone(),
                };

                // Recursively process the child file and keep its prepared
                // records (Box::pin is required for async recursion)
                let output = Box::pin(upload_file_inner(
                    search,
                    storage,
                    processing,
                    upload,
                    upload_state,
                    next_iteration,
                ))
                .await?;

                additional_files.push(output);
            }
        }
    }

    // Index the file in the search index
    tracing::debug!("indexing file contents");
    store_file_index(search, &file_record, &upload.document_box, index_metadata).await?;
    // Track the index entry so it can be rolled back on failure
    upload_state.search_index_files.push(file_record.id);

    if s3_upload {
        // Upload the file itself to S3
        tracing::debug!("uploading main file");
        storage
            .upload_file(&file_key, file_record.mime.clone(), upload.file_bytes)
            .await
            .map_err(UploadFileError::UploadFile)?;
        // Track the uploaded object so it can be rolled back on failure
        upload_state.storage_upload_keys.push(file_key.clone());
    }

    Ok(PreparedUploadData {
        file: file_record,
        generated_files,
        additional_files,
    })
}
350
351/// Persists the data from [PreparedUploadData] into the database storing any applied changes
352async fn persist_file_upload(
353    db: &mut DbTransaction<'_>,
354    data: PreparedUploadData,
355) -> Result<UploadedFileData, UploadFileError> {
356    // Create file to commit against
357    let file = File::create(db.deref_mut(), data.file)
358        .await
359        .map_err(UploadFileError::CreateFile)?;
360
361    // Create generated file records
362    let mut generated_files = Vec::new();
363    if let Some(creates) = data.generated_files {
364        for create in creates {
365            let generated_file = GeneratedFile::create(db.deref_mut(), create)
366                .await
367                .map_err(UploadFileError::CreateGeneratedFile)?;
368
369            generated_files.push(generated_file);
370        }
371    }
372
373    // Create records for inner additional files
374    let mut additional_files: Vec<UploadedFileData> = Vec::new();
375    for additional_file in data.additional_files {
376        let inner = Box::pin(persist_file_upload(db, additional_file)).await?;
377        additional_files.push(inner);
378    }
379
380    Ok(UploadedFileData {
381        file,
382        generated: generated_files,
383        additional_files,
384    })
385}
386
387/// Creates a file record to be stored in the database
388fn make_file_record(
389    upload: &UploadFile,
390    file_key: &str,
391    file_bytes: &Bytes,
392    encrypted: bool,
393) -> CreateFile {
394    let id = upload.fixed_id.unwrap_or_else(Uuid::new_v4);
395    let hash = sha256::digest(file_bytes.as_ref() as &[u8]);
396    let size = file_bytes.len().min(i32::MAX as usize) as i32;
397    let created_at = Utc::now();
398
399    CreateFile {
400        id,
401        parent_id: upload.parent_id,
402        name: upload.name.clone(),
403        mime: upload.mime.to_string(),
404        file_key: file_key.to_owned(),
405        folder_id: upload.folder_id,
406        hash,
407        size,
408        created_by: upload.created_by.clone(),
409        created_at,
410        encrypted,
411    }
412}
413
/// Creates prepared upload records for generated files, stores the files
/// in S3 and returns the [CreateGeneratedFile] structures to be stored
/// in the database at a later step
///
/// Any uploads that succeed to storage will have their file key pushed
/// to `storage_upload_keys` so that it can be rolled back if any errors
/// occur
pub async fn store_generated_files(
    storage: &TenantStorageLayer,
    file: &CreateFile,
    storage_upload_keys: &mut Vec<String>,
    queued_uploads: Vec<QueuedUpload>,
) -> Result<Vec<CreateGeneratedFile>, UploadFileError> {
    // Build the storage records for each queued generated file, derived
    // from the parent file's key, ID, and hash
    let prepared_uploads =
        make_create_generated_files(&file.file_key, &file.id, &file.hash, queued_uploads);

    // Upload the generated files to S3
    let upload_results = upload_generated_files(storage, prepared_uploads).await;

    let mut generated_files = Vec::new();
    let mut upload_errors = Vec::new();

    for result in upload_results {
        match result {
            // Successful upload, store generated file
            Ok(create) => {
                // Track uploaded file keys
                // NOTE(review): this pushes the *parent* file's key once per
                // generated upload, not the generated file's own storage key;
                // if [CreateGeneratedFile] carries its own key, rollback likely
                // needs that key instead -- confirm against
                // `make_create_generated_files` / the rollback path
                storage_upload_keys.push(file.file_key.clone());
                generated_files.push(create);
            }
            // Failed upload -- recorded and reported after all results are
            // drained, so successful uploads are still tracked for rollback
            Err(cause) => {
                tracing::error!(?cause, "failed to upload generated file");
                upload_errors.push(UploadFileError::UploadGeneratedFile(cause));
            }
        }
    }

    // Handle any errors in the upload process
    // (This must occur after so that we can ensure we capture all successful uploads to rollback)
    if !upload_errors.is_empty() {
        tracing::warn!(
            ?upload_errors,
            "error while uploading generated files, operation failed"
        );

        return Err(UploadFileError::FailedFileUploads(upload_errors));
    }

    Ok(generated_files)
}
465
466/// Performs a background rollback task on an uploaded file
467fn background_rollback_upload_file(
468    search: TenantSearchIndex,
469    storage: TenantStorageLayer,
470    upload_state: UploadFileState,
471) {
472    let span = tracing::Span::current();
473
474    // Attempt to rollback any allocated resources in the background
475    tokio::spawn(
476        async move {
477            rollback_upload_file(&search, &storage, upload_state).await;
478        }
479        .instrument(span),
480    );
481}
482
483/// Performs the process of rolling back a file upload based
484/// on the current upload state
485async fn rollback_upload_file(
486    search: &TenantSearchIndex,
487    storage: &TenantStorageLayer,
488    upload_state: UploadFileState,
489) {
490    // Revert upload S3 files
491    for key in upload_state.storage_upload_keys {
492        if let Err(err) = storage.delete_file(&key).await {
493            tracing::error!(?err, "failed to rollback created tenant s3 file");
494        }
495    }
496
497    // Revert file index data
498    for index in upload_state.search_index_files {
499        if let Err(err) = search.delete_data(index).await {
500            tracing::error!(?index, ?err, "failed to rollback created file search index",);
501        }
502    }
503}