1use crate::{
12 files::{
13 index_file::store_file_index,
14 upload_file::{UploadFileError, store_generated_files},
15 },
16 utils::{file::get_file_name_ext, timing::handle_slow_future},
17};
18use docbox_database::{
19 DbPool, DbResult,
20 models::{
21 file::{CreateFile, FileWithScope},
22 generated_file::{CreateGeneratedFile, GeneratedFile},
23 },
24};
25use docbox_processing::{
26 DEFAULT_PROCESS_TIMEOUT, ProcessingError, ProcessingIndexMetadata, ProcessingLayer,
27 process_file,
28};
29use docbox_search::TenantSearchIndex;
30use docbox_storage::{StorageLayerError, TenantStorageLayer};
31use futures::{StreamExt, future::BoxFuture};
32use mime::Mime;
33use std::{ops::DerefMut, time::Duration};
34use thiserror::Error;
35use tokio::time::timeout;
36use tracing::Instrument;
37
38pub async fn reprocess_octet_stream_files(
39 db: &DbPool,
40 search: &TenantSearchIndex,
41 storage: &TenantStorageLayer,
42 processing: &ProcessingLayer,
43) -> DbResult<()> {
44 _ = search.create_index().await;
45
46 let files = get_files(db).await?;
47 let mut skipped = Vec::new();
48 let mut processing_files = Vec::new();
49
50 for file in files {
51 let guessed_mime = get_file_name_ext(&file.file.name).and_then(|ext| {
52 let guesses = mime_guess::from_ext(&ext);
53 guesses.first()
54 });
55
56 if let Some(mime) = guessed_mime {
57 processing_files.push((file, mime));
58 } else {
59 skipped.push(file);
60 }
61 }
62
63 let span = tracing::Span::current();
64
65 _ = futures::stream::iter(processing_files)
67 .map(|(file, mime)| -> BoxFuture<'static, ()> {
68 let db = db.clone();
69 let search = search.clone();
70 let storage = storage.clone();
71 let processing = processing.clone();
72 let span = span.clone();
73
74 Box::pin(
75 async move {
76 tracing::debug!(?file, "stating file");
77 if let Err(error) =
78 perform_process_file(db, storage, search, processing, file, mime).await
79 {
80 tracing::error!(?error, "failed to migrate file");
81 };
82 }
83 .instrument(span),
84 )
85 })
86 .buffered(FILE_PROCESS_SIZE)
87 .collect::<Vec<()>>()
88 .await;
89
90 for skipped in skipped {
91 tracing::debug!(file_id = %skipped.file.id, file_name = %skipped.file.name, "skipped file");
92 }
93
94 Ok(())
95}
96
/// Number of rows requested from the database per page by `get_files`.
const DATABASE_PAGE_SIZE: u64 = 1_000;
/// Upper bound on the number of files processed concurrently by
/// `reprocess_octet_stream_files`.
const FILE_PROCESS_SIZE: usize = 50;
101
102pub async fn get_files(db: &DbPool) -> DbResult<Vec<FileWithScope>> {
103 let mut page_index = 0;
104 let mut data = Vec::new();
105
106 loop {
107 let mut files = match docbox_database::models::file::File::all_by_mime(
108 db,
109 "application/octet-stream",
110 page_index * DATABASE_PAGE_SIZE,
111 DATABASE_PAGE_SIZE,
112 )
113 .await
114 {
115 Ok(value) => value,
116 Err(error) => {
117 tracing::error!(?error, ?page_index, "failed to load files page");
118 return Err(error);
119 }
120 };
121
122 let is_end = (files.len() as u64) < DATABASE_PAGE_SIZE;
123
124 data.append(&mut files);
125
126 if is_end {
127 break;
128 }
129
130 page_index += 1;
131 }
132
133 Ok(data)
134}
135
136#[derive(Debug, Error)]
137pub enum ProcessFileError {
138 #[error("failed to begin transaction")]
139 BeginTransaction,
140
141 #[error("failed to commit transaction")]
142 CommitTransaction,
143
144 #[error(transparent)]
145 Storage(#[from] StorageLayerError),
146
147 #[error(transparent)]
148 Process(#[from] ProcessingError),
149
150 #[error(transparent)]
151 UploadFile(#[from] UploadFileError),
152
153 #[error("failed to mark file as encrypted")]
154 SetEncrypted,
155
156 #[error("failed to update file mime")]
157 SetMime,
158
159 #[error("timeout occurred while processing file")]
160 ConvertTimeout,
161}
162
163pub async fn perform_process_file(
165 db: DbPool,
166 storage: TenantStorageLayer,
167 search: TenantSearchIndex,
168 processing: ProcessingLayer,
169 mut file: FileWithScope,
170 mime: Mime,
171) -> Result<(), ProcessFileError> {
172 let bytes = storage
173 .get_file(&file.file.file_key)
174 .await
175 .inspect_err(|error| tracing::error!(?error, "Failed to get storage file"))?
176 .collect_bytes()
177 .await
178 .inspect_err(|error| tracing::error!(?error, "Failed to get storage file"))?;
179
180 let process_future = process_file(&None, &processing, bytes, &mime);
181
182 let process_timeout = processing
183 .config
184 .process_timeout
185 .unwrap_or(DEFAULT_PROCESS_TIMEOUT);
186
187 let process_future = timeout(process_timeout, process_future);
189
190 let processing_output = handle_slow_future(process_future, Duration::from_secs(25), || {
192 tracing::warn!(
193 ?file,
194 "file upload processing has taken over 25s to complete"
195 )
196 })
197 .await
198 .map_err(|_| ProcessFileError::ConvertTimeout)??;
199
200 let mut index_metadata: Option<ProcessingIndexMetadata> = None;
201
202 let file_in = &file.file;
203
204 let created_file = CreateFile {
205 id: file_in.id,
206 parent_id: file_in.parent_id,
207 name: file_in.name.clone(),
208 mime: file_in.mime.to_string(),
209 file_key: file_in.file_key.to_owned(),
210 folder_id: file_in.folder_id,
211 hash: file_in.hash.clone(),
212 size: file_in.size,
213 created_by: file_in.created_by.clone(),
214 created_at: file_in.created_at,
215 encrypted: file_in.encrypted,
216 };
217
218 let mut generated_files: Option<Vec<CreateGeneratedFile>> = None;
219
220 let encrypted = processing_output
222 .as_ref()
223 .map(|output| output.encrypted)
224 .unwrap_or_default();
225
226 if let Some(processing_output) = processing_output {
227 index_metadata = processing_output.index_metadata;
228
229 let mut s3_upload_keys = Vec::new();
230
231 tracing::debug!("uploading generated files");
232 let prepared_files = store_generated_files(
233 &storage,
234 &created_file,
235 &mut s3_upload_keys,
236 processing_output.upload_queue,
237 )
238 .await?;
239 generated_files = Some(prepared_files);
240 }
241
242 tracing::debug!("indexing file contents");
244 store_file_index(&search, &created_file, &file.scope, index_metadata).await?;
245
246 let mut db = db.begin().await.map_err(|error| {
248 tracing::error!(?error, "failed to begin transaction");
249 ProcessFileError::BeginTransaction
250 })?;
251
252 if let Some(creates) = generated_files {
254 for create in creates {
255 GeneratedFile::create(db.deref_mut(), create)
256 .await
257 .map_err(UploadFileError::CreateGeneratedFile)?;
258 }
259 }
260
261 if encrypted {
262 tracing::debug!("marking file as encrypted");
264 file.file = file
265 .file
266 .set_encrypted(db.deref_mut(), true)
267 .await
268 .map_err(|error| {
269 tracing::error!(?error, "failed to set file as encrypted");
270 ProcessFileError::SetEncrypted
271 })?;
272 }
273
274 tracing::debug!("updating file mime type");
276 file.file = file
277 .file
278 .set_mime(db.deref_mut(), mime.to_string())
279 .await
280 .map_err(|error| {
281 tracing::error!(?error, "failed to set file mime");
282 ProcessFileError::SetMime
283 })?;
284
285 db.commit().await.map_err(|error| {
286 tracing::error!(?error, "failed to commit transaction");
287 ProcessFileError::CommitTransaction
288 })?;
289
290 Ok(())
291}