1use crate::files::{create_file_key, index_file::store_file_index};
2use crate::processing::{ProcessingError, ProcessingIndexMetadata, ProcessingLayer, process_file};
3use crate::utils::error::CompositeError;
4use crate::{
5 events::{TenantEventMessage, TenantEventPublisher},
6 files::generated::{QueuedUpload, upload_generated_files},
7};
8use bytes::Bytes;
9use docbox_database::models::document_box::DocumentBoxScopeRaw;
10use docbox_database::models::folder::FolderId;
11use docbox_database::{
12 DbErr, DbPool, DbTransaction,
13 models::{
14 document_box::WithScope,
15 file::{CreateFile, File, FileId},
16 generated_file::GeneratedFile,
17 user::UserId,
18 },
19};
20use docbox_search::TenantSearchIndex;
21use docbox_storage::TenantStorageLayer;
22use mime::Mime;
23use serde::{Deserialize, Serialize};
24use std::ops::DerefMut;
25use thiserror::Error;
26use tracing::Instrument;
27use utoipa::ToSchema;
28use uuid::Uuid;
29
/// Errors that can occur while uploading a file.
#[derive(Debug, Error)]
pub enum UploadFileError {
    /// Failed to create the search index entry for the file
    #[error("failed to create file search index: {0}")]
    CreateIndex(anyhow::Error),

    /// Failed to insert the file record into the database
    #[error("failed to create file")]
    CreateFile(DbErr),

    /// Failed to persist the file encryption state
    // NOTE(review): not constructed anywhere in this file — presumably used
    // by code elsewhere; confirm before removing
    #[error("failed to store file encryption state")]
    SetEncryption(DbErr),

    /// File content processing failed
    #[error("failed to process file: {0}")]
    Processing(#[from] ProcessingError),

    /// Failed to upload a generated (derived) file to the storage layer
    #[error("failed to upload generated file to storage layer: {0}")]
    UploadGeneratedFile(anyhow::Error),

    /// Failed to insert a generated file record into the database
    #[error("failed to create generated file")]
    CreateGeneratedFile(DbErr),

    /// Failed to upload the main file to the storage layer
    #[error("failed to upload file to storage layer: {0}")]
    UploadFile(anyhow::Error),

    /// Multiple collected errors (used when several generated file
    /// uploads/inserts fail in `store_generated_files`)
    #[error(transparent)]
    Composite(#[from] CompositeError),
}
64
/// Tracks side effects performed outside the database transaction
/// (storage uploads and search index writes) so they can be undone by
/// [`rollback_upload_file`] if the overall upload fails.
#[derive(Default)]
pub struct UploadFileState {
    /// Storage (S3) keys uploaded during this operation
    pub s3_upload_keys: Vec<String>,
    /// IDs of files whose contents were added to the search index
    pub search_index_files: Vec<Uuid>,
}
74
/// Request data describing a file to upload.
pub struct UploadFile {
    /// Use a specific pre-determined file ID instead of generating one
    pub fixed_id: Option<FileId>,
    /// Parent file; set for additional files extracted during processing
    /// (see the recursive upload in `upload_file`)
    pub parent_id: Option<FileId>,
    /// Folder the file will be placed within
    pub folder_id: FolderId,
    /// Scope of the document box the file belongs to
    pub document_box: DocumentBoxScopeRaw,
    /// Name of the file
    pub name: String,
    /// Mime type of the file contents
    pub mime: Mime,
    /// Raw bytes of the file
    pub file_bytes: Bytes,
    /// User that created the file, if known
    pub created_by: Option<UserId>,
    /// Pre-existing storage key; when set, `upload_file` skips the storage
    /// upload and records this key on the file instead
    pub file_key: Option<String>,
    /// Optional configuration controlling file processing
    pub processing_config: Option<ProcessingConfig>,
}
108
/// Configuration options applied when processing an uploaded file.
#[derive(Debug, Default, Clone, Deserialize, Serialize, ToSchema)]
#[serde(default)]
pub struct ProcessingConfig {
    /// Options specific to processing email files
    pub email: Option<EmailProcessingConfig>,
}
115
/// Processing options specific to email files.
#[derive(Debug, Default, Clone, Deserialize, Serialize, ToSchema)]
#[serde(default)]
pub struct EmailProcessingConfig {
    /// When set, controls whether email attachment extraction is skipped
    // NOTE(review): semantics inferred from the field name — confirm against
    // the processing layer implementation
    pub skip_attachments: Option<bool>,
}
122
/// Result of a successfully uploaded file.
pub struct UploadedFileData {
    /// The created file record
    pub file: File,
    /// Generated (derived) files produced by processing
    pub generated: Vec<GeneratedFile>,
    /// Additional files extracted during processing and uploaded alongside,
    /// each parented to `file`
    pub additional_files: Vec<UploadedFileData>,
}
131
132pub async fn safe_upload_file(
135 db: DbPool,
136 search: TenantSearchIndex,
137 storage: TenantStorageLayer,
138 events: TenantEventPublisher,
139 processing: ProcessingLayer,
140 upload: UploadFile,
141) -> Result<UploadedFileData, anyhow::Error> {
142 let mut db = db.begin().await.map_err(|cause| {
144 tracing::error!(?cause, "failed to begin transaction");
145 anyhow::anyhow!("failed to begin transaction")
146 })?;
147
148 let mut upload_state = UploadFileState::default();
150
151 let output = match upload_file(
152 &mut db,
153 &search,
154 &storage,
155 &events,
156 &processing,
157 upload,
158 &mut upload_state,
159 )
160 .await
161 {
162 Ok(value) => value,
163 Err(err) => {
164 let span = tracing::Span::current();
165
166 tokio::spawn(
168 async move {
169 if let Err(cause) = db.rollback().await {
170 tracing::error!(?cause, "failed to roll back database transaction");
171 }
172
173 rollback_upload_file(&search, &storage, upload_state).await;
174 }
175 .instrument(span),
176 );
177
178 return Err(anyhow::Error::from(err));
179 }
180 };
181
182 if let Err(cause) = db.commit().await {
184 tracing::error!(?cause, "failed to commit transaction");
185 let span = tracing::Span::current();
186
187 tokio::spawn(
189 async move {
190 rollback_upload_file(&search, &storage, upload_state).await;
191 }
192 .instrument(span),
193 );
194
195 return Err(anyhow::anyhow!("failed to commit transaction"));
196 }
197
198 Ok(output)
199}
200
/// Uploads a single file within the provided database transaction.
///
/// Steps, in order: derive the storage key, run content processing, insert
/// the file record, store any generated files, recursively upload any
/// additional files produced by processing, index the file contents for
/// search, upload the raw bytes to storage (unless a pre-existing key was
/// supplied), and publish a `FileCreated` event.
///
/// Side effects performed outside the transaction (storage uploads, search
/// index writes) are appended to `upload_state` *after* each succeeds so the
/// caller can undo them with [`rollback_upload_file`] on failure.
pub async fn upload_file(
    db: &mut DbTransaction<'_>,
    search: &TenantSearchIndex,
    storage: &TenantStorageLayer,
    events: &TenantEventPublisher,
    processing: &ProcessingLayer,
    upload: UploadFile,
    upload_state: &mut UploadFileState,
) -> Result<UploadedFileData, UploadFileError> {
    // Reuse a caller-provided key (no storage upload performed in that case),
    // otherwise generate a fresh key the bytes will be uploaded under below
    let (s3_upload, file_key) = match upload.file_key.as_ref() {
        Some(file_key) => (false, file_key.clone()),
        None => (
            true,
            create_file_key(
                &upload.document_box,
                &upload.name,
                &upload.mime,
                Uuid::new_v4(),
            ),
        ),
    };

    // Run content processing; `None` output means no processing applied
    // to this file (exact behavior is decided by the processing layer)
    let processing_output = process_file(
        &upload.processing_config,
        processing,
        upload.file_bytes.clone(),
        &upload.mime,
    )
    .await?;

    // Encryption flag reported by processing; false when nothing was produced
    let encrypted = processing_output
        .as_ref()
        .map(|output| output.encrypted)
        .unwrap_or_default();

    let file = create_file_record(db, &upload, &file_key, &upload.file_bytes, encrypted).await?;

    let mut index_metadata: Option<ProcessingIndexMetadata> = None;
    let mut generated_files: Option<Vec<GeneratedFile>> = None;
    let mut additional_files: Vec<UploadedFileData> = Vec::new();

    if let Some(processing_output) = processing_output {
        index_metadata = processing_output.index_metadata;

        tracing::debug!("uploading generated files");
        let files = store_generated_files(
            db,
            storage,
            &file,
            &mut upload_state.s3_upload_keys,
            processing_output.upload_queue,
        )
        .await?;
        generated_files = Some(files);

        // Recursively upload additional files produced by processing,
        // parented to the file just created. `Box::pin` is required because
        // the recursive async call would otherwise make this future unsized.
        for additional_file in processing_output.additional_files {
            let upload = UploadFile {
                parent_id: Some(file.id),
                fixed_id: additional_file.fixed_id,
                folder_id: upload.folder_id,
                document_box: upload.document_box.clone(),
                name: additional_file.name,
                mime: additional_file.mime,
                file_bytes: additional_file.bytes,
                created_by: upload.created_by.clone(),
                file_key: None,
                processing_config: upload.processing_config.clone(),
            };

            let output = Box::pin(upload_file(
                db,
                search,
                storage,
                events,
                processing,
                upload,
                upload_state,
            ))
            .await?;

            additional_files.push(output);
        }
    }

    tracing::debug!("indexing file contents");
    store_file_index(search, &file, &upload.document_box, index_metadata).await?;
    // Record for rollback only after the index write succeeded
    upload_state.search_index_files.push(file.id);

    if s3_upload {
        tracing::debug!("uploading main file");
        storage
            .upload_file(&file_key, file.mime.clone(), upload.file_bytes)
            .await
            .map_err(UploadFileError::UploadFile)?;
        // Record for rollback only after the upload succeeded
        upload_state.s3_upload_keys.push(file_key.clone());
    }

    events.publish_event(TenantEventMessage::FileCreated(WithScope::new(
        file.clone(),
        upload.document_box.clone(),
    )));

    Ok(UploadedFileData {
        file,
        generated: generated_files.unwrap_or_default(),
        additional_files,
    })
}
321
322async fn create_file_record(
324 db: &mut DbTransaction<'_>,
325 upload: &UploadFile,
326 file_key: &str,
327 file_bytes: &Bytes,
328 encrypted: bool,
329) -> Result<File, UploadFileError> {
330 let hash = sha256::digest(file_bytes.as_ref() as &[u8]);
331 let size = file_bytes.len().min(i32::MAX as usize) as i32;
332
333 File::create(
335 db.deref_mut(),
336 CreateFile {
337 parent_id: upload.parent_id,
338 fixed_id: upload.fixed_id,
339 name: upload.name.clone(),
340 mime: upload.mime.to_string(),
341 file_key: file_key.to_owned(),
342 folder_id: upload.folder_id,
343 hash,
344 size,
345 created_by: upload.created_by.clone(),
346 encrypted,
347 },
348 )
349 .await
350 .map_err(UploadFileError::CreateFile)
351}
352
/// Uploads the queued generated files to storage and inserts their database
/// records, collecting all failures into a single composite error.
///
/// Successful uploads are recorded in `s3_upload_keys` for rollback. All
/// queued uploads are attempted even after a failure; if any failed, the
/// accumulated errors are returned as [`UploadFileError::Composite`].
pub async fn store_generated_files(
    db: &mut DbTransaction<'_>,
    storage: &TenantStorageLayer,
    file: &File,
    s3_upload_keys: &mut Vec<String>,
    queued_uploads: Vec<QueuedUpload>,
) -> Result<Vec<GeneratedFile>, UploadFileError> {
    let upload_results = upload_generated_files(
        storage,
        &file.file_key,
        &file.id,
        &file.hash,
        queued_uploads,
    )
    .await;

    let mut generated_files = Vec::new();
    let mut upload_errors = Vec::new();

    for result in upload_results {
        match result {
            Ok(create) => {
                // NOTE(review): this pushes the *parent* file's key, not the
                // generated file's storage key, so rollback would delete the
                // parent object repeatedly and never the generated object.
                // If `create` carries the generated file's key, that is
                // likely what should be tracked here — confirm.
                s3_upload_keys.push(file.file_key.clone());

                let generated_file = match GeneratedFile::create(db.deref_mut(), create)
                    .await
                    .map_err(UploadFileError::CreateGeneratedFile)
                {
                    Ok(value) => value,
                    // Collect the error and continue with the remaining files
                    Err(err) => {
                        upload_errors.push(err);
                        continue;
                    }
                };

                generated_files.push(generated_file);
            }
            Err(cause) => {
                tracing::error!(?cause, "failed to upload generated file");
                upload_errors.push(UploadFileError::UploadGeneratedFile(cause));
            }
        }
    }

    if !upload_errors.is_empty() {
        tracing::warn!("error while uploading generated files, operation failed");

        // Fold every collected failure into one composite error
        let error = upload_errors
            .into_iter()
            .map(anyhow::Error::from)
            .collect::<CompositeError>();

        return Err(UploadFileError::Composite(error));
    }

    Ok(generated_files)
}
424
425pub async fn rollback_upload_file(
428 search: &TenantSearchIndex,
429 storage: &TenantStorageLayer,
430 upload_state: UploadFileState,
431) {
432 for key in upload_state.s3_upload_keys {
434 if let Err(err) = storage.delete_file(&key).await {
435 tracing::error!(?err, "failed to rollback created tenant s3 file");
436 }
437 }
438
439 for index in upload_state.search_index_files {
441 if let Err(err) = search.delete_data(index).await {
442 tracing::error!(?index, ?err, "failed to rollback created file search index",);
443 }
444 }
445}