1use crate::{
2 events::TenantEventPublisher,
3 files::{
4 create_file_key,
5 upload_file::{
6 ProcessingConfig, UploadFile, UploadFileError, UploadFileState, rollback_upload_file,
7 upload_file,
8 },
9 },
10 processing::{ProcessingError, ProcessingLayer},
11 storage::TenantStorageLayer,
12};
13use docbox_database::{
14 DbErr, DbPool, DbTransaction,
15 models::{
16 document_box::DocumentBoxScopeRaw,
17 file::FileId,
18 folder::Folder,
19 presigned_upload_task::{
20 CreatePresignedUploadTask, PresignedTaskStatus, PresignedUploadTask,
21 PresignedUploadTaskId,
22 },
23 user::UserId,
24 },
25};
26use docbox_search::TenantSearchIndex;
27use mime::Mime;
28use serde::Serialize;
29use std::{collections::HashMap, ops::DerefMut, str::FromStr};
30use thiserror::Error;
31use uuid::Uuid;
32
33#[derive(Serialize)]
34pub struct PresignedUploadOutcome {
35 pub task_id: PresignedUploadTaskId,
36 pub method: String,
37 pub uri: String,
38 pub headers: HashMap<String, String>,
39}
40
41#[derive(Debug, Error)]
42pub enum PresignedUploadError {
43 #[error(transparent)]
45 UploadFile(#[from] UploadFileError),
46
47 #[error("failed to load file from s3")]
49 LoadFile(anyhow::Error),
50
51 #[error("file had an invalid mime type")]
53 InvalidMimeType(mime::FromStrError),
54
55 #[error("failed to create file")]
57 CreateFile(DbErr),
58
59 #[error("failed to process file: {0}")]
61 Processing(#[from] ProcessingError),
62
63 #[error("failed to update task status")]
65 UpdateTaskStatus(DbErr),
66}
67
68#[derive(Default)]
71pub struct PresignedUploadState {
72 file: UploadFileState,
74}
75
76pub struct CreatePresigned {
77 pub name: String,
79
80 pub document_box: DocumentBoxScopeRaw,
82
83 pub folder: Folder,
85
86 pub size: i32,
88
89 pub mime: Mime,
91
92 pub created_by: Option<UserId>,
94
95 pub parent_id: Option<FileId>,
97
98 pub processing_config: Option<ProcessingConfig>,
100}
101
102pub async fn create_presigned_upload(
104 db: &DbPool,
105 storage: &TenantStorageLayer,
106 create: CreatePresigned,
107) -> anyhow::Result<PresignedUploadOutcome> {
108 let file_key = create_file_key(
109 &create.folder.document_box,
110 &create.name,
111 &create.mime,
112 Uuid::new_v4(),
113 );
114 let (signed_request, expires_at) = storage
115 .create_presigned(&file_key, create.size as i64)
116 .await?;
117
118 let processing_config = match &create.processing_config {
120 Some(config) => {
121 let value = serde_json::to_value(config).map_err(|err| DbErr::Encode(Box::new(err)))?;
122 Some(value)
123 }
124 None => None,
125 };
126
127 let task = PresignedUploadTask::create(
128 db,
129 CreatePresignedUploadTask {
130 name: create.name,
131 mime: create.mime.to_string(),
132 document_box: create.document_box,
133 folder_id: create.folder.id,
134 size: create.size,
135 file_key,
136 created_by: create.created_by,
137 expires_at,
138 parent_id: create.parent_id,
139 processing_config,
140 },
141 )
142 .await?;
143
144 Ok(PresignedUploadOutcome {
145 task_id: task.id,
146 method: signed_request.method().to_string(),
147 uri: signed_request.uri().to_string(),
148 headers: signed_request
149 .headers()
150 .map(|(key, value)| (key.to_string(), value.to_string()))
151 .collect(),
152 })
153}
154
155pub struct CompletePresigned {
156 pub task: PresignedUploadTask,
157 pub folder: Folder,
158}
159
160pub async fn safe_complete_presigned(
163 db_pool: DbPool,
164 search: TenantSearchIndex,
165 storage: TenantStorageLayer,
166 events: TenantEventPublisher,
167 processing: ProcessingLayer,
168 mut complete: CompletePresigned,
169) -> Result<(), anyhow::Error> {
170 let mut db = db_pool.begin().await.map_err(|cause| {
172 tracing::error!(?cause, "failed to begin transaction");
173 anyhow::anyhow!("failed to begin transaction")
174 })?;
175
176 let mut upload_state = PresignedUploadState::default();
177
178 match complete_presigned(
179 &mut db,
180 &search,
181 &storage,
182 &events,
183 &processing,
184 &mut complete,
185 &mut upload_state,
186 )
187 .await
188 {
189 Ok(value) => value,
190 Err(err) => {
191 let error_message = err.to_string();
192
193 tokio::spawn(async move {
195 if let Err(cause) = db.rollback().await {
196 tracing::error!(?cause, "failed to roll back database transaction");
197 }
198
199 if let Err(cause) = complete
201 .task
202 .set_status(
203 &db_pool,
204 PresignedTaskStatus::Failed {
205 error: error_message,
206 },
207 )
208 .await
209 {
210 tracing::error!(?cause, "failed to set presigned task status to failure");
211 }
212
213 rollback_presigned_upload_file(&search, &storage, upload_state).await;
214 });
215
216 return Err(anyhow::Error::from(err));
217 }
218 };
219
220 if let Err(cause) = db.commit().await {
222 tracing::error!(?cause, "failed to commit transaction");
223
224 if let Err(cause) = complete
226 .task
227 .set_status(
228 &db_pool,
229 PresignedTaskStatus::Failed {
230 error: "Internal server error".to_string(),
231 },
232 )
233 .await
234 {
235 tracing::error!(?cause, "failed to set presigned task status to failure");
236 }
237
238 tokio::spawn(async move {
240 rollback_presigned_upload_file(&search, &storage, upload_state).await;
241 });
242
243 return Err(anyhow::anyhow!("failed to commit transaction"));
244 }
245
246 Ok(())
247}
248
249pub async fn complete_presigned(
251 db: &mut DbTransaction<'_>,
252 search: &TenantSearchIndex,
253 storage: &TenantStorageLayer,
254 events: &TenantEventPublisher,
255 processing: &ProcessingLayer,
256 complete: &mut CompletePresigned,
257 upload_state: &mut PresignedUploadState,
258) -> Result<(), PresignedUploadError> {
259 let task = &mut complete.task;
260
261 let file_bytes = storage
263 .get_file(&task.file_key)
264 .await
265 .map_err(PresignedUploadError::LoadFile)?
266 .collect_bytes()
267 .await
268 .map_err(PresignedUploadError::LoadFile)?;
269
270 let mime = mime::Mime::from_str(&task.mime).map_err(PresignedUploadError::InvalidMimeType)?;
272
273 let processing_config: Option<ProcessingConfig> = match &task.processing_config {
275 Some(value) => match serde_json::from_value(value.0.clone()) {
276 Ok(value) => value,
277 Err(cause) => {
278 tracing::error!(?cause, "failed to deserialize processing config");
279 None
280 }
281 },
282 None => None,
283 };
284
285 let upload = UploadFile {
286 fixed_id: None,
287 parent_id: task.parent_id,
288 folder_id: complete.folder.id,
289 document_box: complete.folder.document_box.clone(),
290 name: task.name.clone(),
291 mime,
292 file_bytes,
293 created_by: task.created_by.clone(),
294 file_key: Some(task.file_key.clone()),
295 processing_config,
296 };
297
298 let output = upload_file(
300 db,
301 search,
302 storage,
303 events,
304 processing,
305 upload,
306 &mut upload_state.file,
307 )
308 .await?;
309
310 task.set_status(
312 db.deref_mut(),
313 PresignedTaskStatus::Completed {
314 file_id: output.file.id,
315 },
316 )
317 .await
318 .map_err(PresignedUploadError::UpdateTaskStatus)?;
319
320 Ok(())
321}
322
323pub async fn rollback_presigned_upload_file(
326 search: &TenantSearchIndex,
327 storage: &TenantStorageLayer,
328 upload_state: PresignedUploadState,
329) {
330 rollback_upload_file(search, storage, upload_state.file).await;
332}