1use crate::files::{create_file_key, index_file::store_file_index};
2use crate::processing::{ProcessingError, ProcessingIndexMetadata, ProcessingLayer, process_file};
3use crate::storage::TenantStorageLayer;
4use crate::utils::error::CompositeError;
5use crate::{
6 events::{TenantEventMessage, TenantEventPublisher},
7 files::generated::{QueuedUpload, upload_generated_files},
8};
9use bytes::Bytes;
10use docbox_database::models::document_box::DocumentBoxScopeRaw;
11use docbox_database::models::folder::FolderId;
12use docbox_database::{
13 DbErr, DbPool, DbTransaction,
14 models::{
15 document_box::WithScope,
16 file::{CreateFile, File, FileId},
17 generated_file::GeneratedFile,
18 user::UserId,
19 },
20};
21use docbox_search::TenantSearchIndex;
22use mime::Mime;
23use serde::{Deserialize, Serialize};
24use std::ops::DerefMut;
25use thiserror::Error;
26use utoipa::ToSchema;
27use uuid::Uuid;
28
/// Errors that can occur while uploading a file.
///
/// Each variant corresponds to one step of [`upload_file`]; when any of
/// these is returned the caller is expected to roll back side effects
/// recorded in [`UploadFileState`].
#[derive(Debug, Error)]
pub enum UploadFileError {
    /// Creating the search index entry for the file failed
    #[error("failed to create file search index: {0}")]
    CreateIndex(anyhow::Error),

    /// Inserting the file database record failed
    #[error("failed to create file")]
    CreateFile(DbErr),

    /// Persisting the encryption flag for the file failed
    #[error("failed to store file encryption state")]
    SetEncryption(DbErr),

    /// The processing layer failed to process the file contents
    #[error("failed to process file: {0}")]
    Processing(#[from] ProcessingError),

    /// Uploading a generated file to the storage layer failed
    #[error("failed to upload generated file to storage layer: {0}")]
    UploadGeneratedFile(anyhow::Error),

    /// Inserting a generated file database record failed
    #[error("failed to create generated file")]
    CreateGeneratedFile(DbErr),

    /// Uploading the main file to the storage layer failed
    #[error("failed to upload file to storage layer: {0}")]
    UploadFile(anyhow::Error),

    /// Multiple errors collected from a batch operation
    /// (see [`store_generated_files`])
    #[error(transparent)]
    Composite(#[from] CompositeError),
}
63
/// Tracks the external side effects performed during an upload so they
/// can be reverted by [`rollback_upload_file`] if a later step fails.
#[derive(Default)]
pub struct UploadFileState {
    // Storage keys of objects uploaded to the storage layer
    pub s3_upload_keys: Vec<String>,
    // IDs of files that were added to the search index
    pub search_index_files: Vec<Uuid>,
}
73
/// Input parameters for uploading a single file.
pub struct UploadFile {
    // Specific ID to assign to the created file record, when the caller
    // needs to control it (also used by recursively uploaded files)
    pub fixed_id: Option<FileId>,

    // ID of the parent file; set when this file was produced while
    // processing another file (e.g. an additional file from processing)
    pub parent_id: Option<FileId>,

    // Folder to place the file in
    pub folder_id: FolderId,

    // Scope of the document box the file belongs to
    pub document_box: DocumentBoxScopeRaw,

    // Name for the file
    pub name: String,

    // Mime type of the file contents
    pub mime: Mime,

    // Raw contents of the file
    pub file_bytes: Bytes,

    // User that created the file, if any
    pub created_by: Option<UserId>,

    // Pre-existing storage key to use. When set, `upload_file` does NOT
    // upload `file_bytes` to storage (the content is assumed to already
    // exist under this key — NOTE(review): confirm against callers)
    pub file_key: Option<String>,

    // Optional configuration forwarded to the processing layer
    pub processing_config: Option<ProcessingConfig>,
}
107
/// Configuration controlling how an uploaded file is processed.
#[derive(Debug, Default, Clone, Deserialize, Serialize, ToSchema)]
#[serde(default)]
pub struct ProcessingConfig {
    // Configuration specific to processing email files
    pub email: Option<EmailProcessingConfig>,
}
114
/// Email-specific processing configuration.
#[derive(Debug, Default, Clone, Deserialize, Serialize, ToSchema)]
#[serde(default)]
pub struct EmailProcessingConfig {
    // When `Some(true)`, attachment handling is skipped — presumably
    // suppressing attachment extraction in the processing layer;
    // NOTE(review): confirm exact semantics in `process_file`
    pub skip_attachments: Option<bool>,
}
121
/// Result of a successfully uploaded file.
pub struct UploadedFileData {
    // The created file database record
    pub file: File,
    // Generated files produced while processing this file (empty if none)
    pub generated: Vec<GeneratedFile>,
    // Additional files produced by processing, uploaded recursively
    // as children of this file
    pub additional_files: Vec<UploadedFileData>,
}
130
131pub async fn safe_upload_file(
134 db: DbPool,
135 search: TenantSearchIndex,
136 storage: TenantStorageLayer,
137 events: TenantEventPublisher,
138 processing: ProcessingLayer,
139 upload: UploadFile,
140) -> Result<UploadedFileData, anyhow::Error> {
141 let mut db = db.begin().await.map_err(|cause| {
143 tracing::error!(?cause, "failed to begin transaction");
144 anyhow::anyhow!("failed to begin transaction")
145 })?;
146
147 let mut upload_state = UploadFileState::default();
149
150 let output = match upload_file(
151 &mut db,
152 &search,
153 &storage,
154 &events,
155 &processing,
156 upload,
157 &mut upload_state,
158 )
159 .await
160 {
161 Ok(value) => value,
162 Err(err) => {
163 tokio::spawn(async move {
165 if let Err(cause) = db.rollback().await {
166 tracing::error!(?cause, "failed to roll back database transaction");
167 }
168
169 rollback_upload_file(&search, &storage, upload_state).await;
170 });
171
172 return Err(anyhow::Error::from(err));
173 }
174 };
175
176 if let Err(cause) = db.commit().await {
178 tracing::error!(?cause, "failed to commit transaction");
179
180 tokio::spawn(async move {
182 rollback_upload_file(&search, &storage, upload_state).await;
183 });
184
185 return Err(anyhow::anyhow!("failed to commit transaction"));
186 }
187
188 Ok(output)
189}
190
/// Uploads a single file within an existing database transaction,
/// recording every external side effect in `upload_state` so it can be
/// undone by [`rollback_upload_file`] if a later step fails.
///
/// Steps, in order: derive the storage key, process the file contents,
/// insert the file record, store generated files and recursively upload
/// any additional files produced by processing, index the file contents
/// for search, upload the raw bytes to storage (only when the key was
/// generated here), then publish a `FileCreated` event.
pub async fn upload_file(
    db: &mut DbTransaction<'_>,
    search: &TenantSearchIndex,
    storage: &TenantStorageLayer,
    events: &TenantEventPublisher,
    processing: &ProcessingLayer,
    upload: UploadFile,
    upload_state: &mut UploadFileState,
) -> Result<UploadedFileData, UploadFileError> {
    // Reuse the caller-provided key when present, otherwise generate a
    // fresh one. `s3_upload` records whether WE must upload the bytes:
    // a provided key is assumed to already have content in storage
    // (NOTE(review): confirm this assumption against callers)
    let (s3_upload, file_key) = match upload.file_key.as_ref() {
        Some(file_key) => (false, file_key.clone()),

        None => (
            true,
            create_file_key(
                &upload.document_box,
                &upload.name,
                &upload.mime,
                Uuid::new_v4(),
            ),
        ),
    };

    // Run the processing layer (may produce index metadata, generated
    // files to upload, and additional derived files)
    let processing_output = process_file(
        &upload.processing_config,
        processing,
        upload.file_bytes.clone(),
        &upload.mime,
    )
    .await?;

    // Whether processing determined the file to be encrypted; defaults
    // to false when there was no processing output
    let encrypted = processing_output
        .as_ref()
        .map(|output| output.encrypted)
        .unwrap_or_default();

    let file = create_file_record(db, &upload, &file_key, &upload.file_bytes, encrypted).await?;

    let mut index_metadata: Option<ProcessingIndexMetadata> = None;
    let mut generated_files: Option<Vec<GeneratedFile>> = None;
    let mut additional_files: Vec<UploadedFileData> = Vec::new();

    if let Some(processing_output) = processing_output {
        index_metadata = processing_output.index_metadata;

        tracing::debug!("uploading generated files");
        let files = store_generated_files(
            db,
            storage,
            &file,
            &mut upload_state.s3_upload_keys,
            processing_output.upload_queue,
        )
        .await?;
        generated_files = Some(files);

        // Recursively upload any files derived from this one, parented
        // to the file record created above
        for additional_file in processing_output.additional_files {
            let upload = UploadFile {
                parent_id: Some(file.id),
                fixed_id: additional_file.fixed_id,
                folder_id: upload.folder_id,
                document_box: upload.document_box.clone(),
                name: additional_file.name,
                mime: additional_file.mime,
                file_bytes: additional_file.bytes,
                created_by: upload.created_by.clone(),
                file_key: None,
                processing_config: upload.processing_config.clone(),
            };

            // Box::pin is required: the async recursion would otherwise
            // make this future's size unknowable at compile time
            let output = Box::pin(upload_file(
                db,
                search,
                storage,
                events,
                processing,
                upload,
                upload_state,
            ))
            .await?;

            additional_files.push(output);
        }
    }

    tracing::debug!("indexing file contents");
    store_file_index(search, &file, &upload.document_box, index_metadata).await?;
    // Track the index entry so a failure later can remove it
    upload_state.search_index_files.push(file.id);

    // Only upload the raw bytes when the key was generated in this call
    if s3_upload {
        tracing::debug!("uploading main file");
        storage
            .upload_file(&file_key, file.mime.clone(), upload.file_bytes)
            .await
            .map_err(UploadFileError::UploadFile)?;
        upload_state.s3_upload_keys.push(file_key.clone());
    }

    events.publish_event(TenantEventMessage::FileCreated(WithScope::new(
        file.clone(),
        upload.document_box.clone(),
    )));

    Ok(UploadedFileData {
        file,
        generated: generated_files.unwrap_or_default(),
        additional_files,
    })
}
311
312async fn create_file_record(
314 db: &mut DbTransaction<'_>,
315 upload: &UploadFile,
316 file_key: &str,
317 file_bytes: &Bytes,
318 encrypted: bool,
319) -> Result<File, UploadFileError> {
320 let hash = sha256::digest(file_bytes.as_ref() as &[u8]);
321 let size = file_bytes.len().min(i32::MAX as usize) as i32;
322
323 File::create(
325 db.deref_mut(),
326 CreateFile {
327 parent_id: upload.parent_id,
328 fixed_id: upload.fixed_id,
329 name: upload.name.clone(),
330 mime: upload.mime.to_string(),
331 file_key: file_key.to_owned(),
332 folder_id: upload.folder_id,
333 hash,
334 size,
335 created_by: upload.created_by.clone(),
336 encrypted,
337 },
338 )
339 .await
340 .map_err(UploadFileError::CreateFile)
341}
342
/// Uploads the queued generated files to the storage layer and inserts
/// their database records.
///
/// All uploads are attempted even if some fail; failures are collected
/// and returned together as [`UploadFileError::Composite`] so that
/// successful uploads are still tracked for rollback.
pub async fn store_generated_files(
    db: &mut DbTransaction<'_>,
    storage: &TenantStorageLayer,
    file: &File,
    s3_upload_keys: &mut Vec<String>,
    queued_uploads: Vec<QueuedUpload>,
) -> Result<Vec<GeneratedFile>, UploadFileError> {
    // Per-item results: each queued upload succeeds or fails independently
    let upload_results = upload_generated_files(
        storage,
        &file.file_key,
        &file.id,
        &file.hash,
        queued_uploads,
    )
    .await;

    let mut generated_files = Vec::new();
    let mut upload_errors = Vec::new();

    for result in upload_results {
        match result {
            Ok(create) => {
                // NOTE(review): this pushes the PARENT file's key, not the
                // generated object's own storage key — rolling back would
                // delete the main file (repeatedly) and leave the generated
                // objects in storage. Verify which key should be tracked.
                s3_upload_keys.push(file.file_key.clone());

                let generated_file = match GeneratedFile::create(db.deref_mut(), create)
                    .await
                    .map_err(UploadFileError::CreateGeneratedFile)
                {
                    Ok(value) => value,
                    Err(err) => {
                        // Record the failure but keep processing the rest
                        upload_errors.push(err);
                        continue;
                    }
                };

                generated_files.push(generated_file);
            }
            Err(cause) => {
                tracing::error!(?cause, "failed to upload generated file");
                upload_errors.push(UploadFileError::UploadGeneratedFile(cause));
            }
        }
    }

    // Any failure fails the whole operation, reported as one composite error
    if !upload_errors.is_empty() {
        tracing::warn!("error while uploading generated files, operation failed");

        let error = upload_errors
            .into_iter()
            .map(anyhow::Error::from)
            .collect::<CompositeError>();

        return Err(UploadFileError::Composite(error));
    }

    Ok(generated_files)
}
414
415pub async fn rollback_upload_file(
418 search: &TenantSearchIndex,
419 storage: &TenantStorageLayer,
420 upload_state: UploadFileState,
421) {
422 for key in upload_state.s3_upload_keys {
424 if let Err(err) = storage.delete_file(&key).await {
425 tracing::error!(?err, "failed to rollback created tenant s3 file");
426 }
427 }
428
429 for index in upload_state.search_index_files {
431 if let Err(err) = search.delete_data(index).await {
432 tracing::error!(?index, ?err, "failed to rollback created file search index",);
433 }
434 }
435}