use crate::{
    events::{TenantEventMessage, TenantEventPublisher},
    files::{
        create_file_key,
        generated::{make_create_generated_files, upload_generated_files},
        index_file::store_file_index,
    },
};
use bytes::Bytes;
use chrono::Utc;
use docbox_database::{
    DbErr, DbPool, DbTransaction,
    models::{
        document_box::{DocumentBoxScopeRaw, DocumentBoxScopeRawRef, WithScope},
        file::{CreateFile, File, FileId},
        folder::FolderId,
        generated_file::{CreateGeneratedFile, GeneratedFile},
        user::UserId,
    },
};
use docbox_processing::{
    ProcessingConfig, ProcessingError, ProcessingIndexMetadata, ProcessingLayer, QueuedUpload,
    process_file,
};
use docbox_search::{SearchError, TenantSearchIndex};
use docbox_storage::{StorageLayerError, TenantStorageLayer};
use mime::Mime;
use std::ops::DerefMut;
use thiserror::Error;
use tracing::Instrument;
use uuid::Uuid;

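/// Errors that can occur while uploading a file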
#[derive(Debug, Error)]
pub enum UploadFileError {
    #[error("failed to create file search index: {0}")]
    CreateIndex(SearchError),

    #[error("failed to create file entry")]
    CreateFile(DbErr),

    #[error("failed to process file: {0}")]
    Processing(#[from] ProcessingError),

    #[error("failed to upload generated file to storage layer: {0}")]
    UploadGeneratedFile(StorageLayerError),

    #[error("failed to create generated file entry")]
    CreateGeneratedFile(DbErr),

    #[error("failed to upload file to storage layer: {0}")]
    UploadFile(StorageLayerError),

    #[error("failed to upload files")]
    FailedFileUploads(Vec<UploadFileError>),

    #[error("failed to begin database transaction")]
    BeginTransaction(DbErr),

    #[error("failed to commit database transaction")]
    CommitTransaction(DbErr),
}

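/// State tracked across an upload so that partially completed work can be
/// rolled back if the upload fails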
#[derive(Default)]
pub struct UploadFileState {
    /// Keys of files that have been uploaded to the storage layer
    pub storage_upload_keys: Vec<String>,
    /// IDs of files that have been added to the search index
    pub search_index_files: Vec<Uuid>,
}

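/// Request to upload a new file into a document box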
pub struct UploadFile {
    /// Fixed ID to use for the created file instead of generating one
    pub fixed_id: Option<FileId>,

    /// ID of the parent file when this file was extracted from another file
    pub parent_id: Option<FileId>,

    /// ID of the folder to store the file in
    pub folder_id: FolderId,

    /// Scope of the document box the file belongs to
    pub document_box: DocumentBoxScopeRaw,

    /// Name of the file
    pub name: String,

    /// Mime type of the file contents
    pub mime: Mime,

    /// Raw bytes of the file
    pub file_bytes: Bytes,

    /// ID of the user that created the file, if known
    pub created_by: Option<UserId>,

    /// Existing storage key for the file; when set, uploading the raw bytes
    /// to the storage layer is skipped
    pub file_key: Option<String>,

    /// Processing configuration for this upload
    pub processing_config: Option<ProcessingConfig>,
}

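/// Outcome of a completed file upload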
#[derive(Debug)]
pub struct UploadedFileData {
    /// The created file
    pub file: File,
    /// Files generated while processing the file
    pub generated: Vec<GeneratedFile>,
    /// Additional files extracted from the file during processing
    pub additional_files: Vec<UploadedFileData>,
}

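/// Uploads a file into a document box.
///
/// Processes the file, stores the raw bytes and any generated files, indexes
/// the contents for search, then persists the records within a single
/// database transaction. If any step fails, the storage uploads and search
/// index entries created so far are rolled back in a background task.
///
/// A minimal usage sketch; the tenant handles (`db`, `search`, `storage`,
/// `processing`, `events`) and the target `folder_id`/`document_box` are
/// assumed to come from the surrounding tenant setup:
///
/// ```ignore
/// let upload = UploadFile {
///     fixed_id: None,
///     parent_id: None,
///     folder_id,
///     document_box: document_box.clone(),
///     name: "report.pdf".to_string(),
///     mime: mime::APPLICATION_PDF,
///     file_bytes: Bytes::from(raw_bytes),
///     created_by: None,
///     file_key: None,
///     processing_config: None,
/// };
/// let uploaded = upload_file(&db, &search, &storage, &processing, &events, upload).await?;
/// ```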
pub async fn upload_file(
    db: &DbPool,
    search: &TenantSearchIndex,
    storage: &TenantStorageLayer,
    processing: &ProcessingLayer,
    events: &TenantEventPublisher,
    upload: UploadFile,
) -> Result<UploadedFileData, UploadFileError> {
    let document_box = upload.document_box.clone();
    let mut upload_state = UploadFileState::default();

    // Process, store, and index the file before touching the database
    let data =
        match upload_file_inner(search, storage, processing, upload, &mut upload_state, 0).await {
            Ok(value) => value,
            Err(error) => {
                tracing::error!(?error, "failed to complete inner file processing");
                background_rollback_upload_file(search.clone(), storage.clone(), upload_state);
                return Err(error);
            }
        };

    let mut db = db.begin().await.map_err(|error| {
        tracing::error!(?error, "failed to begin transaction");
        UploadFileError::BeginTransaction(error)
    })?;

    // Persist the prepared upload within a single transaction
    let output = match persist_file_upload(&mut db, data).await {
        Ok(value) => value,
        Err(error) => {
            if let Err(cause) = db.rollback().await {
                tracing::error!(?cause, "failed to roll back database transaction");
            }

            tracing::error!(?error, "failed to persist file upload");
            background_rollback_upload_file(search.clone(), storage.clone(), upload_state);
            return Err(error);
        }
    };

    if let Err(error) = db.commit().await {
        tracing::error!(?error, "failed to commit transaction");
        background_rollback_upload_file(search.clone(), storage.clone(), upload_state);
        return Err(UploadFileError::CommitTransaction(error));
    }

    publish_file_creation_events(events, &document_box, &output);

    Ok(output)
}

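/// Publishes a [TenantEventMessage::FileCreated] event for the uploaded file
/// and, recursively, for every additional file uploaded alongside it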
pub fn publish_file_creation_events(
    events: &TenantEventPublisher,
    document_box: DocumentBoxScopeRawRef<'_>,
    output: &UploadedFileData,
) {
    events.publish_event(TenantEventMessage::FileCreated(WithScope::new(
        output.file.clone(),
        document_box.to_string(),
    )));

    for additional_file in &output.additional_files {
        publish_file_creation_events(events, document_box, additional_file);
    }
}

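/// Upload data that has been processed and stored but not yet persisted to
/// the database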
#[derive(Debug)]
pub struct PreparedUploadData {
    /// File entry to create in the database
    file: CreateFile,

    /// Generated file entries to create, when processing produced any
    generated_files: Option<Vec<CreateGeneratedFile>>,

    /// Additional files extracted from the file during processing
    additional_files: Vec<PreparedUploadData>,
}

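/// Performs the storage and search portions of a file upload: generates a
/// file key, runs the processing layer, stores any generated files, indexes
/// the file contents, and uploads the raw bytes. Recurses (up to the unpack
/// iteration limit) for any additional files produced by processing.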
async fn upload_file_inner(
    search: &TenantSearchIndex,
    storage: &TenantStorageLayer,
    processing: &ProcessingLayer,
    upload: UploadFile,
    upload_state: &mut UploadFileState,
    iteration: usize,
) -> Result<PreparedUploadData, UploadFileError> {
    // Clamp the requested unpack iterations to the server-configured maximum
    let server_max_iterations = processing.config.max_unpack_iterations.unwrap_or(1);
    let max_iterations = upload
        .processing_config
        .as_ref()
        .and_then(|value| value.max_unpack_iterations)
        .unwrap_or(1)
        .min(server_max_iterations);

    // Use the provided file key if one exists, otherwise create a new key and
    // upload the raw bytes once processing is complete
    let (s3_upload, file_key) = match upload.file_key.as_ref() {
        Some(file_key) => (false, file_key.clone()),
        None => (
            true,
            create_file_key(
                &upload.document_box,
                &upload.name,
                &upload.mime,
                Uuid::new_v4(),
            ),
        ),
    };

    let processing_output = process_file(
        &upload.processing_config,
        processing,
        upload.file_bytes.clone(),
        &upload.mime,
    )
    .await?;

    let encrypted = processing_output
        .as_ref()
        .map(|output| output.encrypted)
        .unwrap_or_default();

    let file_record = make_file_record(&upload, &file_key, &upload.file_bytes, encrypted);

    let mut index_metadata: Option<ProcessingIndexMetadata> = None;
    let mut generated_files: Option<Vec<CreateGeneratedFile>> = None;
    let mut additional_files: Vec<PreparedUploadData> = Vec::new();

    if let Some(processing_output) = processing_output {
        index_metadata = processing_output.index_metadata;

        tracing::debug!("uploading generated files");
        let prepared_files = store_generated_files(
            storage,
            &file_record,
            &mut upload_state.storage_upload_keys,
            processing_output.upload_queue,
        )
        .await?;
        generated_files = Some(prepared_files);

        let next_iteration = iteration + 1;
        if next_iteration > max_iterations {
            tracing::debug!(
                ?iteration,
                "additional files were present but the max unpack iterations for this request was exceeded"
            );
        } else {
            // Recursively upload any files extracted from this file
            for additional_file in processing_output.additional_files {
                let upload = UploadFile {
                    parent_id: Some(file_record.id),
                    fixed_id: additional_file.fixed_id,
                    folder_id: upload.folder_id,
                    document_box: upload.document_box.clone(),
                    name: additional_file.name,
                    mime: additional_file.mime,
                    file_bytes: additional_file.bytes,
                    created_by: upload.created_by.clone(),
                    file_key: None,
                    processing_config: upload.processing_config.clone(),
                };

                let output = Box::pin(upload_file_inner(
                    search,
                    storage,
                    processing,
                    upload,
                    upload_state,
                    next_iteration,
                ))
                .await?;

                additional_files.push(output);
            }
        }
    }

    tracing::debug!("indexing file contents");
    store_file_index(search, &file_record, &upload.document_box, index_metadata).await?;
    upload_state.search_index_files.push(file_record.id);

    if s3_upload {
        tracing::debug!("uploading main file");
        storage
            .upload_file(&file_key, file_record.mime.clone(), upload.file_bytes)
            .await
            .map_err(UploadFileError::UploadFile)?;
        upload_state.storage_upload_keys.push(file_key.clone());
    }

    Ok(PreparedUploadData {
        file: file_record,
        generated_files,
        additional_files,
    })
}

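/// Persists a prepared upload to the database: creates the file entry, its
/// generated file entries, and recursively any additional files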
async fn persist_file_upload(
    db: &mut DbTransaction<'_>,
    data: PreparedUploadData,
) -> Result<UploadedFileData, UploadFileError> {
    let file = File::create(db.deref_mut(), data.file)
        .await
        .map_err(UploadFileError::CreateFile)?;

    let mut generated_files = Vec::new();
    if let Some(creates) = data.generated_files {
        for create in creates {
            let generated_file = GeneratedFile::create(db.deref_mut(), create)
                .await
                .map_err(UploadFileError::CreateGeneratedFile)?;

            generated_files.push(generated_file);
        }
    }

    // Recursively persist any additional files extracted from this file
    let mut additional_files: Vec<UploadedFileData> = Vec::new();
    for additional_file in data.additional_files {
        let inner = Box::pin(persist_file_upload(db, additional_file)).await?;
        additional_files.push(inner);
    }

    Ok(UploadedFileData {
        file,
        generated: generated_files,
        additional_files,
    })
}

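/// Builds the [CreateFile] database record for an upload, computing the
/// content hash and clamped size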
fn make_file_record(
    upload: &UploadFile,
    file_key: &str,
    file_bytes: &Bytes,
    encrypted: bool,
) -> CreateFile {
    let id = upload.fixed_id.unwrap_or_else(Uuid::new_v4);
    let hash = sha256::digest(file_bytes.as_ref() as &[u8]);
    // Clamp the file size to fit within the i32 database column
    let size = file_bytes.len().min(i32::MAX as usize) as i32;
    let created_at = Utc::now();

    CreateFile {
        id,
        parent_id: upload.parent_id,
        name: upload.name.clone(),
        mime: upload.mime.to_string(),
        file_key: file_key.to_owned(),
        folder_id: upload.folder_id,
        hash,
        size,
        created_by: upload.created_by.clone(),
        created_at,
        encrypted,
    }
}

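/// Uploads the queued generated files to the storage layer, recording each
/// successful upload for rollback, and returns the [CreateGeneratedFile]
/// entries to persist. Fails if any individual upload failed.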
pub async fn store_generated_files(
    storage: &TenantStorageLayer,
    file: &CreateFile,
    storage_upload_keys: &mut Vec<String>,
    queued_uploads: Vec<QueuedUpload>,
) -> Result<Vec<CreateGeneratedFile>, UploadFileError> {
    let prepared_uploads =
        make_create_generated_files(&file.file_key, &file.id, &file.hash, queued_uploads);

    let upload_results = upload_generated_files(storage, prepared_uploads).await;

    let mut generated_files = Vec::new();
    let mut upload_errors = Vec::new();

    for result in upload_results {
        match result {
            Ok(create) => {
                // Track the upload so it can be rolled back on failure
                storage_upload_keys.push(file.file_key.clone());
                generated_files.push(create);
            }
            Err(cause) => {
                tracing::error!(?cause, "failed to upload generated file");
                upload_errors.push(UploadFileError::UploadGeneratedFile(cause));
            }
        }
    }

    // Fail the whole operation if any individual upload failed
    if !upload_errors.is_empty() {
        tracing::warn!(
            ?upload_errors,
            "error while uploading generated files, operation failed"
        );

        return Err(UploadFileError::FailedFileUploads(upload_errors));
    }

    Ok(generated_files)
}

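/// Spawns a background task to roll back the storage uploads and search
/// index entries tracked in `upload_state`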
fn background_rollback_upload_file(
    search: TenantSearchIndex,
    storage: TenantStorageLayer,
    upload_state: UploadFileState,
) {
    // Carry the current tracing span into the spawned task
    let span = tracing::Span::current();

    tokio::spawn(
        async move {
            rollback_upload_file(&search, &storage, upload_state).await;
        }
        .instrument(span),
    );
}

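/// Rolls back an upload by deleting the uploaded storage files and removing
/// the created search index entries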
async fn rollback_upload_file(
    search: &TenantSearchIndex,
    storage: &TenantStorageLayer,
    upload_state: UploadFileState,
) {
    for key in upload_state.storage_upload_keys {
        if let Err(err) = storage.delete_file(&key).await {
            tracing::error!(?err, "failed to roll back created tenant s3 file");
        }
    }

    for index in upload_state.search_index_files {
        if let Err(err) = search.delete_data(index).await {
            tracing::error!(?index, ?err, "failed to roll back created file search index");
        }
    }
}