docbox_core/files/
upload_file_presigned.rs

1use crate::{
2    events::TenantEventPublisher,
3    files::{
4        create_file_key,
5        upload_file::{
6            ProcessingConfig, UploadFile, UploadFileError, UploadFileState, rollback_upload_file,
7            upload_file,
8        },
9    },
10    processing::{ProcessingError, ProcessingLayer},
11    storage::TenantStorageLayer,
12};
13use docbox_database::{
14    DbErr, DbPool, DbTransaction,
15    models::{
16        document_box::DocumentBoxScopeRaw,
17        file::FileId,
18        folder::Folder,
19        presigned_upload_task::{
20            CreatePresignedUploadTask, PresignedTaskStatus, PresignedUploadTask,
21            PresignedUploadTaskId,
22        },
23        user::UserId,
24    },
25};
26use docbox_search::TenantSearchIndex;
27use mime::Mime;
28use serde::Serialize;
29use std::{collections::HashMap, ops::DerefMut, str::FromStr};
30use thiserror::Error;
31use uuid::Uuid;
32
33#[derive(Serialize)]
34pub struct PresignedUploadOutcome {
35    pub task_id: PresignedUploadTaskId,
36    pub method: String,
37    pub uri: String,
38    pub headers: HashMap<String, String>,
39}
40
41#[derive(Debug, Error)]
42pub enum PresignedUploadError {
43    /// Error when uploading files
44    #[error(transparent)]
45    UploadFile(#[from] UploadFileError),
46
47    /// Error loading the file from S3
48    #[error("failed to load file from s3")]
49    LoadFile(anyhow::Error),
50
51    /// Stored file metadata mime type was invalid
52    #[error("file had an invalid mime type")]
53    InvalidMimeType(mime::FromStrError),
54
55    /// Failed to create the file database row
56    #[error("failed to create file")]
57    CreateFile(DbErr),
58
59    /// Failed to process the file
60    #[error("failed to process file: {0}")]
61    Processing(#[from] ProcessingError),
62
63    /// Failed to update the task status
64    #[error("failed to update task status")]
65    UpdateTaskStatus(DbErr),
66}
67
68/// State keeping track of whats been generated from a file
69/// upload, to help with reverted changes on failure
70#[derive(Default)]
71pub struct PresignedUploadState {
72    /// File upload state
73    file: UploadFileState,
74}
75
76pub struct CreatePresigned {
77    /// Name of the file being uploaded
78    pub name: String,
79
80    /// The document box scope to store within
81    pub document_box: DocumentBoxScopeRaw,
82
83    /// Folder to store the file in
84    pub folder: Folder,
85
86    /// Size of the file being uploaded
87    pub size: i32,
88
89    /// Mime type of the file
90    pub mime: Mime,
91
92    /// User uploading the file
93    pub created_by: Option<UserId>,
94
95    /// Optional parent file ID
96    pub parent_id: Option<FileId>,
97
98    /// Config for processing step
99    pub processing_config: Option<ProcessingConfig>,
100}
101
102/// Create a new presigned file upload request
103pub async fn create_presigned_upload(
104    db: &DbPool,
105    storage: &TenantStorageLayer,
106    create: CreatePresigned,
107) -> anyhow::Result<PresignedUploadOutcome> {
108    let file_key = create_file_key(
109        &create.folder.document_box,
110        &create.name,
111        &create.mime,
112        Uuid::new_v4(),
113    );
114    let (signed_request, expires_at) = storage
115        .create_presigned(&file_key, create.size as i64)
116        .await?;
117
118    // Encode the processing config for the database
119    let processing_config = match &create.processing_config {
120        Some(config) => {
121            let value = serde_json::to_value(config).map_err(|err| DbErr::Encode(Box::new(err)))?;
122            Some(value)
123        }
124        None => None,
125    };
126
127    let task = PresignedUploadTask::create(
128        db,
129        CreatePresignedUploadTask {
130            name: create.name,
131            mime: create.mime.to_string(),
132            document_box: create.document_box,
133            folder_id: create.folder.id,
134            size: create.size,
135            file_key,
136            created_by: create.created_by,
137            expires_at,
138            parent_id: create.parent_id,
139            processing_config,
140        },
141    )
142    .await?;
143
144    Ok(PresignedUploadOutcome {
145        task_id: task.id,
146        method: signed_request.method().to_string(),
147        uri: signed_request.uri().to_string(),
148        headers: signed_request
149            .headers()
150            .map(|(key, value)| (key.to_string(), value.to_string()))
151            .collect(),
152    })
153}
154
155pub struct CompletePresigned {
156    pub task: PresignedUploadTask,
157    pub folder: Folder,
158}
159
160/// Safely performs [upload_file] ensuring that on failure all resources are
161/// properly cleaned up
162pub async fn safe_complete_presigned(
163    db_pool: DbPool,
164    search: TenantSearchIndex,
165    storage: TenantStorageLayer,
166    events: TenantEventPublisher,
167    processing: ProcessingLayer,
168    mut complete: CompletePresigned,
169) -> Result<(), anyhow::Error> {
170    // Start a database transaction
171    let mut db = db_pool.begin().await.map_err(|cause| {
172        tracing::error!(?cause, "failed to begin transaction");
173        anyhow::anyhow!("failed to begin transaction")
174    })?;
175
176    let mut upload_state = PresignedUploadState::default();
177
178    match complete_presigned(
179        &mut db,
180        &search,
181        &storage,
182        &events,
183        &processing,
184        &mut complete,
185        &mut upload_state,
186    )
187    .await
188    {
189        Ok(value) => value,
190        Err(err) => {
191            let error_message = err.to_string();
192
193            // Attempt to rollback any allocated resources in the background
194            tokio::spawn(async move {
195                if let Err(cause) = db.rollback().await {
196                    tracing::error!(?cause, "failed to roll back database transaction");
197                }
198
199                // Update the task status
200                if let Err(cause) = complete
201                    .task
202                    .set_status(
203                        &db_pool,
204                        PresignedTaskStatus::Failed {
205                            error: error_message,
206                        },
207                    )
208                    .await
209                {
210                    tracing::error!(?cause, "failed to set presigned task status to failure");
211                }
212
213                rollback_presigned_upload_file(&search, &storage, upload_state).await;
214            });
215
216            return Err(anyhow::Error::from(err));
217        }
218    };
219
220    // Commit the transaction
221    if let Err(cause) = db.commit().await {
222        tracing::error!(?cause, "failed to commit transaction");
223
224        // Update the task status
225        if let Err(cause) = complete
226            .task
227            .set_status(
228                &db_pool,
229                PresignedTaskStatus::Failed {
230                    error: "Internal server error".to_string(),
231                },
232            )
233            .await
234        {
235            tracing::error!(?cause, "failed to set presigned task status to failure");
236        }
237
238        // Attempt to rollback any allocated resources in the background
239        tokio::spawn(async move {
240            rollback_presigned_upload_file(&search, &storage, upload_state).await;
241        });
242
243        return Err(anyhow::anyhow!("failed to commit transaction"));
244    }
245
246    Ok(())
247}
248
249/// Completes a presigned file upload
250pub async fn complete_presigned(
251    db: &mut DbTransaction<'_>,
252    search: &TenantSearchIndex,
253    storage: &TenantStorageLayer,
254    events: &TenantEventPublisher,
255    processing: &ProcessingLayer,
256    complete: &mut CompletePresigned,
257    upload_state: &mut PresignedUploadState,
258) -> Result<(), PresignedUploadError> {
259    let task = &mut complete.task;
260
261    // Load the file from S3
262    let file_bytes = storage
263        .get_file(&task.file_key)
264        .await
265        .map_err(PresignedUploadError::LoadFile)?
266        .collect_bytes()
267        .await
268        .map_err(PresignedUploadError::LoadFile)?;
269
270    // Get the mime type from the task
271    let mime = mime::Mime::from_str(&task.mime).map_err(PresignedUploadError::InvalidMimeType)?;
272
273    // Parse task processing config
274    let processing_config: Option<ProcessingConfig> = match &task.processing_config {
275        Some(value) => match serde_json::from_value(value.0.clone()) {
276            Ok(value) => value,
277            Err(cause) => {
278                tracing::error!(?cause, "failed to deserialize processing config");
279                None
280            }
281        },
282        None => None,
283    };
284
285    let upload = UploadFile {
286        fixed_id: None,
287        parent_id: task.parent_id,
288        folder_id: complete.folder.id,
289        document_box: complete.folder.document_box.clone(),
290        name: task.name.clone(),
291        mime,
292        file_bytes,
293        created_by: task.created_by.clone(),
294        file_key: Some(task.file_key.clone()),
295        processing_config,
296    };
297
298    // Perform the upload
299    let output = upload_file(
300        db,
301        search,
302        storage,
303        events,
304        processing,
305        upload,
306        &mut upload_state.file,
307    )
308    .await?;
309
310    // Update the task status
311    task.set_status(
312        db.deref_mut(),
313        PresignedTaskStatus::Completed {
314            file_id: output.file.id,
315        },
316    )
317    .await
318    .map_err(PresignedUploadError::UpdateTaskStatus)?;
319
320    Ok(())
321}
322
323/// Performs the process of rolling back a file upload based
324/// on the current upload state
325pub async fn rollback_presigned_upload_file(
326    search: &TenantSearchIndex,
327    storage: &TenantStorageLayer,
328    upload_state: PresignedUploadState,
329) {
330    // Revert file state
331    rollback_upload_file(search, storage, upload_state.file).await;
332}