docbox_core/files/
upload_file_presigned.rs

1use crate::{
2    events::TenantEventPublisher,
3    files::{
4        create_file_key,
5        upload_file::{
6            ProcessingConfig, UploadFile, UploadFileError, UploadFileState, rollback_upload_file,
7            upload_file,
8        },
9    },
10    processing::{ProcessingError, ProcessingLayer},
11};
12use docbox_database::{
13    DbErr, DbPool, DbTransaction,
14    models::{
15        document_box::DocumentBoxScopeRaw,
16        file::FileId,
17        folder::Folder,
18        presigned_upload_task::{
19            CreatePresignedUploadTask, PresignedTaskStatus, PresignedUploadTask,
20            PresignedUploadTaskId,
21        },
22        user::UserId,
23    },
24};
25use docbox_search::TenantSearchIndex;
26use docbox_storage::TenantStorageLayer;
27use mime::Mime;
28use serde::Serialize;
29use std::{collections::HashMap, ops::DerefMut, str::FromStr};
30use thiserror::Error;
31use tracing::Instrument;
32use uuid::Uuid;
33
34#[derive(Serialize)]
35pub struct PresignedUploadOutcome {
36    pub task_id: PresignedUploadTaskId,
37    pub method: String,
38    pub uri: String,
39    pub headers: HashMap<String, String>,
40}
41
42#[derive(Debug, Error)]
43pub enum PresignedUploadError {
44    /// Error when uploading files
45    #[error(transparent)]
46    UploadFile(#[from] UploadFileError),
47
48    /// Error loading the file from S3
49    #[error("failed to load file from s3")]
50    LoadFile(anyhow::Error),
51
52    /// Stored file metadata mime type was invalid
53    #[error("file had an invalid mime type")]
54    InvalidMimeType(mime::FromStrError),
55
56    /// Failed to create the file database row
57    #[error("failed to create file")]
58    CreateFile(DbErr),
59
60    /// Failed to process the file
61    #[error("failed to process file: {0}")]
62    Processing(#[from] ProcessingError),
63
64    /// Failed to update the task status
65    #[error("failed to update task status")]
66    UpdateTaskStatus(DbErr),
67}
68
69/// State keeping track of whats been generated from a file
70/// upload, to help with reverted changes on failure
71#[derive(Default)]
72pub struct PresignedUploadState {
73    /// File upload state
74    file: UploadFileState,
75}
76
77pub struct CreatePresigned {
78    /// Name of the file being uploaded
79    pub name: String,
80
81    /// The document box scope to store within
82    pub document_box: DocumentBoxScopeRaw,
83
84    /// Folder to store the file in
85    pub folder: Folder,
86
87    /// Size of the file being uploaded
88    pub size: i32,
89
90    /// Mime type of the file
91    pub mime: Mime,
92
93    /// User uploading the file
94    pub created_by: Option<UserId>,
95
96    /// Optional parent file ID
97    pub parent_id: Option<FileId>,
98
99    /// Config for processing step
100    pub processing_config: Option<ProcessingConfig>,
101}
102
103/// Create a new presigned file upload request
104pub async fn create_presigned_upload(
105    db: &DbPool,
106    storage: &TenantStorageLayer,
107    create: CreatePresigned,
108) -> anyhow::Result<PresignedUploadOutcome> {
109    let file_key = create_file_key(
110        &create.folder.document_box,
111        &create.name,
112        &create.mime,
113        Uuid::new_v4(),
114    );
115    let (signed_request, expires_at) = storage
116        .create_presigned(&file_key, create.size as i64)
117        .await?;
118
119    // Encode the processing config for the database
120    let processing_config = match &create.processing_config {
121        Some(config) => {
122            let value = serde_json::to_value(config).map_err(|err| DbErr::Encode(Box::new(err)))?;
123            Some(value)
124        }
125        None => None,
126    };
127
128    let task = PresignedUploadTask::create(
129        db,
130        CreatePresignedUploadTask {
131            name: create.name,
132            mime: create.mime.to_string(),
133            document_box: create.document_box,
134            folder_id: create.folder.id,
135            size: create.size,
136            file_key,
137            created_by: create.created_by,
138            expires_at,
139            parent_id: create.parent_id,
140            processing_config,
141        },
142    )
143    .await?;
144
145    Ok(PresignedUploadOutcome {
146        task_id: task.id,
147        method: signed_request.method().to_string(),
148        uri: signed_request.uri().to_string(),
149        headers: signed_request
150            .headers()
151            .map(|(key, value)| (key.to_string(), value.to_string()))
152            .collect(),
153    })
154}
155
156pub struct CompletePresigned {
157    pub task: PresignedUploadTask,
158    pub folder: Folder,
159}
160
161/// Safely performs [upload_file] ensuring that on failure all resources are
162/// properly cleaned up
163pub async fn safe_complete_presigned(
164    db_pool: DbPool,
165    search: TenantSearchIndex,
166    storage: TenantStorageLayer,
167    events: TenantEventPublisher,
168    processing: ProcessingLayer,
169    mut complete: CompletePresigned,
170) -> Result<(), anyhow::Error> {
171    // Start a database transaction
172    let mut db = db_pool.begin().await.map_err(|cause| {
173        tracing::error!(?cause, "failed to begin transaction");
174        anyhow::anyhow!("failed to begin transaction")
175    })?;
176
177    let mut upload_state = PresignedUploadState::default();
178
179    match complete_presigned(
180        &mut db,
181        &search,
182        &storage,
183        &events,
184        &processing,
185        &mut complete,
186        &mut upload_state,
187    )
188    .await
189    {
190        Ok(value) => value,
191        Err(err) => {
192            let error_message = err.to_string();
193
194            let span = tracing::Span::current();
195
196            // Attempt to rollback any allocated resources in the background
197            tokio::spawn(
198                async move {
199                    if let Err(cause) = db.rollback().await {
200                        tracing::error!(?cause, "failed to roll back database transaction");
201                    }
202
203                    // Update the task status
204                    if let Err(cause) = complete
205                        .task
206                        .set_status(
207                            &db_pool,
208                            PresignedTaskStatus::Failed {
209                                error: error_message,
210                            },
211                        )
212                        .await
213                    {
214                        tracing::error!(?cause, "failed to set presigned task status to failure");
215                    }
216
217                    rollback_presigned_upload_file(&search, &storage, upload_state).await;
218                }
219                .instrument(span),
220            );
221
222            return Err(anyhow::Error::from(err));
223        }
224    };
225
226    // Commit the transaction
227    if let Err(cause) = db.commit().await {
228        tracing::error!(?cause, "failed to commit transaction");
229
230        // Update the task status
231        if let Err(cause) = complete
232            .task
233            .set_status(
234                &db_pool,
235                PresignedTaskStatus::Failed {
236                    error: "Internal server error".to_string(),
237                },
238            )
239            .await
240        {
241            tracing::error!(?cause, "failed to set presigned task status to failure");
242        }
243
244        let span = tracing::Span::current();
245
246        // Attempt to rollback any allocated resources in the background
247        tokio::spawn(
248            async move {
249                rollback_presigned_upload_file(&search, &storage, upload_state).await;
250            }
251            .instrument(span),
252        );
253
254        return Err(anyhow::anyhow!("failed to commit transaction"));
255    }
256
257    Ok(())
258}
259
260/// Completes a presigned file upload
261pub async fn complete_presigned(
262    db: &mut DbTransaction<'_>,
263    search: &TenantSearchIndex,
264    storage: &TenantStorageLayer,
265    events: &TenantEventPublisher,
266    processing: &ProcessingLayer,
267    complete: &mut CompletePresigned,
268    upload_state: &mut PresignedUploadState,
269) -> Result<(), PresignedUploadError> {
270    let task = &mut complete.task;
271
272    // Load the file from S3
273    let file_bytes = storage
274        .get_file(&task.file_key)
275        .await
276        .map_err(PresignedUploadError::LoadFile)?
277        .collect_bytes()
278        .await
279        .map_err(PresignedUploadError::LoadFile)?;
280
281    // Get the mime type from the task
282    let mime = mime::Mime::from_str(&task.mime).map_err(PresignedUploadError::InvalidMimeType)?;
283
284    // Parse task processing config
285    let processing_config: Option<ProcessingConfig> = match &task.processing_config {
286        Some(value) => match serde_json::from_value(value.0.clone()) {
287            Ok(value) => value,
288            Err(cause) => {
289                tracing::error!(?cause, "failed to deserialize processing config");
290                None
291            }
292        },
293        None => None,
294    };
295
296    let upload = UploadFile {
297        fixed_id: None,
298        parent_id: task.parent_id,
299        folder_id: complete.folder.id,
300        document_box: complete.folder.document_box.clone(),
301        name: task.name.clone(),
302        mime,
303        file_bytes,
304        created_by: task.created_by.clone(),
305        file_key: Some(task.file_key.clone()),
306        processing_config,
307    };
308
309    // Perform the upload
310    let output = upload_file(
311        db,
312        search,
313        storage,
314        events,
315        processing,
316        upload,
317        &mut upload_state.file,
318    )
319    .await?;
320
321    // Update the task status
322    task.set_status(
323        db.deref_mut(),
324        PresignedTaskStatus::Completed {
325            file_id: output.file.id,
326        },
327    )
328    .await
329    .map_err(PresignedUploadError::UpdateTaskStatus)?;
330
331    Ok(())
332}
333
334/// Performs the process of rolling back a file upload based
335/// on the current upload state
336pub async fn rollback_presigned_upload_file(
337    search: &TenantSearchIndex,
338    storage: &TenantStorageLayer,
339    upload_state: PresignedUploadState,
340) {
341    // Revert file state
342    rollback_upload_file(search, storage, upload_state.file).await;
343}