1use crate::{
2 events::TenantEventPublisher,
3 files::{
4 create_file_key,
5 upload_file::{
6 ProcessingConfig, UploadFile, UploadFileError, UploadFileState, rollback_upload_file,
7 upload_file,
8 },
9 },
10 processing::{ProcessingError, ProcessingLayer},
11};
12use docbox_database::{
13 DbErr, DbPool, DbTransaction,
14 models::{
15 document_box::DocumentBoxScopeRaw,
16 file::FileId,
17 folder::Folder,
18 presigned_upload_task::{
19 CreatePresignedUploadTask, PresignedTaskStatus, PresignedUploadTask,
20 PresignedUploadTaskId,
21 },
22 user::UserId,
23 },
24};
25use docbox_search::TenantSearchIndex;
26use docbox_storage::TenantStorageLayer;
27use mime::Mime;
28use serde::Serialize;
29use std::{collections::HashMap, ops::DerefMut, str::FromStr};
30use thiserror::Error;
31use tracing::Instrument;
32use uuid::Uuid;
33
34#[derive(Serialize)]
35pub struct PresignedUploadOutcome {
36 pub task_id: PresignedUploadTaskId,
37 pub method: String,
38 pub uri: String,
39 pub headers: HashMap<String, String>,
40}
41
42#[derive(Debug, Error)]
43pub enum PresignedUploadError {
44 #[error(transparent)]
46 UploadFile(#[from] UploadFileError),
47
48 #[error("failed to load file from s3")]
50 LoadFile(anyhow::Error),
51
52 #[error("file had an invalid mime type")]
54 InvalidMimeType(mime::FromStrError),
55
56 #[error("failed to create file")]
58 CreateFile(DbErr),
59
60 #[error("failed to process file: {0}")]
62 Processing(#[from] ProcessingError),
63
64 #[error("failed to update task status")]
66 UpdateTaskStatus(DbErr),
67}
68
69#[derive(Default)]
72pub struct PresignedUploadState {
73 file: UploadFileState,
75}
76
77pub struct CreatePresigned {
78 pub name: String,
80
81 pub document_box: DocumentBoxScopeRaw,
83
84 pub folder: Folder,
86
87 pub size: i32,
89
90 pub mime: Mime,
92
93 pub created_by: Option<UserId>,
95
96 pub parent_id: Option<FileId>,
98
99 pub processing_config: Option<ProcessingConfig>,
101}
102
103pub async fn create_presigned_upload(
105 db: &DbPool,
106 storage: &TenantStorageLayer,
107 create: CreatePresigned,
108) -> anyhow::Result<PresignedUploadOutcome> {
109 let file_key = create_file_key(
110 &create.folder.document_box,
111 &create.name,
112 &create.mime,
113 Uuid::new_v4(),
114 );
115 let (signed_request, expires_at) = storage
116 .create_presigned(&file_key, create.size as i64)
117 .await?;
118
119 let processing_config = match &create.processing_config {
121 Some(config) => {
122 let value = serde_json::to_value(config).map_err(|err| DbErr::Encode(Box::new(err)))?;
123 Some(value)
124 }
125 None => None,
126 };
127
128 let task = PresignedUploadTask::create(
129 db,
130 CreatePresignedUploadTask {
131 name: create.name,
132 mime: create.mime.to_string(),
133 document_box: create.document_box,
134 folder_id: create.folder.id,
135 size: create.size,
136 file_key,
137 created_by: create.created_by,
138 expires_at,
139 parent_id: create.parent_id,
140 processing_config,
141 },
142 )
143 .await?;
144
145 Ok(PresignedUploadOutcome {
146 task_id: task.id,
147 method: signed_request.method().to_string(),
148 uri: signed_request.uri().to_string(),
149 headers: signed_request
150 .headers()
151 .map(|(key, value)| (key.to_string(), value.to_string()))
152 .collect(),
153 })
154}
155
156pub struct CompletePresigned {
157 pub task: PresignedUploadTask,
158 pub folder: Folder,
159}
160
161pub async fn safe_complete_presigned(
164 db_pool: DbPool,
165 search: TenantSearchIndex,
166 storage: TenantStorageLayer,
167 events: TenantEventPublisher,
168 processing: ProcessingLayer,
169 mut complete: CompletePresigned,
170) -> Result<(), anyhow::Error> {
171 let mut db = db_pool.begin().await.map_err(|cause| {
173 tracing::error!(?cause, "failed to begin transaction");
174 anyhow::anyhow!("failed to begin transaction")
175 })?;
176
177 let mut upload_state = PresignedUploadState::default();
178
179 match complete_presigned(
180 &mut db,
181 &search,
182 &storage,
183 &events,
184 &processing,
185 &mut complete,
186 &mut upload_state,
187 )
188 .await
189 {
190 Ok(value) => value,
191 Err(err) => {
192 let error_message = err.to_string();
193
194 let span = tracing::Span::current();
195
196 tokio::spawn(
198 async move {
199 if let Err(cause) = db.rollback().await {
200 tracing::error!(?cause, "failed to roll back database transaction");
201 }
202
203 if let Err(cause) = complete
205 .task
206 .set_status(
207 &db_pool,
208 PresignedTaskStatus::Failed {
209 error: error_message,
210 },
211 )
212 .await
213 {
214 tracing::error!(?cause, "failed to set presigned task status to failure");
215 }
216
217 rollback_presigned_upload_file(&search, &storage, upload_state).await;
218 }
219 .instrument(span),
220 );
221
222 return Err(anyhow::Error::from(err));
223 }
224 };
225
226 if let Err(cause) = db.commit().await {
228 tracing::error!(?cause, "failed to commit transaction");
229
230 if let Err(cause) = complete
232 .task
233 .set_status(
234 &db_pool,
235 PresignedTaskStatus::Failed {
236 error: "Internal server error".to_string(),
237 },
238 )
239 .await
240 {
241 tracing::error!(?cause, "failed to set presigned task status to failure");
242 }
243
244 let span = tracing::Span::current();
245
246 tokio::spawn(
248 async move {
249 rollback_presigned_upload_file(&search, &storage, upload_state).await;
250 }
251 .instrument(span),
252 );
253
254 return Err(anyhow::anyhow!("failed to commit transaction"));
255 }
256
257 Ok(())
258}
259
260pub async fn complete_presigned(
262 db: &mut DbTransaction<'_>,
263 search: &TenantSearchIndex,
264 storage: &TenantStorageLayer,
265 events: &TenantEventPublisher,
266 processing: &ProcessingLayer,
267 complete: &mut CompletePresigned,
268 upload_state: &mut PresignedUploadState,
269) -> Result<(), PresignedUploadError> {
270 let task = &mut complete.task;
271
272 let file_bytes = storage
274 .get_file(&task.file_key)
275 .await
276 .map_err(PresignedUploadError::LoadFile)?
277 .collect_bytes()
278 .await
279 .map_err(PresignedUploadError::LoadFile)?;
280
281 let mime = mime::Mime::from_str(&task.mime).map_err(PresignedUploadError::InvalidMimeType)?;
283
284 let processing_config: Option<ProcessingConfig> = match &task.processing_config {
286 Some(value) => match serde_json::from_value(value.0.clone()) {
287 Ok(value) => value,
288 Err(cause) => {
289 tracing::error!(?cause, "failed to deserialize processing config");
290 None
291 }
292 },
293 None => None,
294 };
295
296 let upload = UploadFile {
297 fixed_id: None,
298 parent_id: task.parent_id,
299 folder_id: complete.folder.id,
300 document_box: complete.folder.document_box.clone(),
301 name: task.name.clone(),
302 mime,
303 file_bytes,
304 created_by: task.created_by.clone(),
305 file_key: Some(task.file_key.clone()),
306 processing_config,
307 };
308
309 let output = upload_file(
311 db,
312 search,
313 storage,
314 events,
315 processing,
316 upload,
317 &mut upload_state.file,
318 )
319 .await?;
320
321 task.set_status(
323 db.deref_mut(),
324 PresignedTaskStatus::Completed {
325 file_id: output.file.id,
326 },
327 )
328 .await
329 .map_err(PresignedUploadError::UpdateTaskStatus)?;
330
331 Ok(())
332}
333
334pub async fn rollback_presigned_upload_file(
337 search: &TenantSearchIndex,
338 storage: &TenantStorageLayer,
339 upload_state: PresignedUploadState,
340) {
341 rollback_upload_file(search, storage, upload_state.file).await;
343}