1use crate::{
12 files::{
13 index_file::store_file_index,
14 upload_file::{UploadFileError, store_generated_files},
15 },
16 utils::file::get_file_name_ext,
17};
18use docbox_database::{
19 DbPool, DbResult,
20 models::{
21 file::{CreateFile, FileWithScope},
22 generated_file::{CreateGeneratedFile, GeneratedFile},
23 },
24};
25use docbox_processing::{ProcessingError, ProcessingIndexMetadata, ProcessingLayer, process_file};
26use docbox_search::TenantSearchIndex;
27use docbox_storage::{StorageLayerError, TenantStorageLayer};
28use futures::{StreamExt, future::BoxFuture};
29use mime::Mime;
30use std::ops::DerefMut;
31use thiserror::Error;
32use tracing::Instrument;
33
34pub async fn reprocess_octet_stream_files(
35 db: &DbPool,
36 search: &TenantSearchIndex,
37 storage: &TenantStorageLayer,
38 processing: &ProcessingLayer,
39) -> DbResult<()> {
40 _ = search.create_index().await;
41
42 let files = get_files(db).await?;
43 let mut skipped = Vec::new();
44 let mut processing_files = Vec::new();
45
46 for file in files {
47 let guessed_mime = get_file_name_ext(&file.file.name).and_then(|ext| {
48 let guesses = mime_guess::from_ext(&ext);
49 guesses.first()
50 });
51
52 if let Some(mime) = guessed_mime {
53 processing_files.push((file, mime));
54 } else {
55 skipped.push(file);
56 }
57 }
58
59 let span = tracing::Span::current();
60
61 _ = futures::stream::iter(processing_files)
63 .map(|(file, mime)| -> BoxFuture<'static, ()> {
64 let db = db.clone();
65 let search = search.clone();
66 let storage = storage.clone();
67 let processing = processing.clone();
68 let span = span.clone();
69
70 Box::pin(
71 async move {
72 tracing::debug!(?file, "stating file");
73 if let Err(error) =
74 perform_process_file(db, storage, search, processing, file, mime).await
75 {
76 tracing::error!(?error, "failed to migrate file");
77 };
78 }
79 .instrument(span),
80 )
81 })
82 .buffered(FILE_PROCESS_SIZE)
83 .collect::<Vec<()>>()
84 .await;
85
86 for skipped in skipped {
87 tracing::debug!(file_id = %skipped.file.id, file_name = %skipped.file.name, "skipped file");
88 }
89
90 Ok(())
91}
92
/// Number of file rows requested per page when loading files from the
/// database in `get_files`.
const DATABASE_PAGE_SIZE: u64 = 1_000;

/// Maximum number of files reprocessed concurrently by
/// `reprocess_octet_stream_files`.
const FILE_PROCESS_SIZE: usize = 50;
97
98pub async fn get_files(db: &DbPool) -> DbResult<Vec<FileWithScope>> {
99 let mut page_index = 0;
100 let mut data = Vec::new();
101
102 loop {
103 let mut files = match docbox_database::models::file::File::all_by_mime(
104 db,
105 "application/octet-stream",
106 page_index * DATABASE_PAGE_SIZE,
107 DATABASE_PAGE_SIZE,
108 )
109 .await
110 {
111 Ok(value) => value,
112 Err(error) => {
113 tracing::error!(?error, ?page_index, "failed to load files page");
114 return Err(error);
115 }
116 };
117
118 let is_end = (files.len() as u64) < DATABASE_PAGE_SIZE;
119
120 data.append(&mut files);
121
122 if is_end {
123 break;
124 }
125
126 page_index += 1;
127 }
128
129 Ok(data)
130}
131
132#[derive(Debug, Error)]
133pub enum ProcessFileError {
134 #[error("failed to begin transaction")]
135 BeginTransaction,
136
137 #[error("failed to commit transaction")]
138 CommitTransaction,
139
140 #[error(transparent)]
141 Storage(#[from] StorageLayerError),
142
143 #[error(transparent)]
144 Process(#[from] ProcessingError),
145
146 #[error(transparent)]
147 UploadFile(#[from] UploadFileError),
148
149 #[error("failed to mark file as encrypted")]
150 SetEncrypted,
151
152 #[error("failed to update file mime")]
153 SetMime,
154}
155
156pub async fn perform_process_file(
158 db: DbPool,
159 storage: TenantStorageLayer,
160 search: TenantSearchIndex,
161 processing: ProcessingLayer,
162 mut file: FileWithScope,
163 mime: Mime,
164) -> Result<(), ProcessFileError> {
165 let bytes = storage
166 .get_file(&file.file.file_key)
167 .await
168 .inspect_err(|error| tracing::error!(?error, "Failed to get storage file"))?
169 .collect_bytes()
170 .await
171 .inspect_err(|error| tracing::error!(?error, "Failed to get storage file"))?;
172
173 let processing_output = process_file(&None, &processing, bytes, &mime).await?;
174
175 let mut index_metadata: Option<ProcessingIndexMetadata> = None;
176
177 let file_in = &file.file;
178
179 let created_file = CreateFile {
180 id: file_in.id,
181 parent_id: file_in.parent_id,
182 name: file_in.name.clone(),
183 mime: file_in.mime.to_string(),
184 file_key: file_in.file_key.to_owned(),
185 folder_id: file_in.folder_id,
186 hash: file_in.hash.clone(),
187 size: file_in.size,
188 created_by: file_in.created_by.clone(),
189 created_at: file_in.created_at,
190 encrypted: file_in.encrypted,
191 };
192
193 let mut generated_files: Option<Vec<CreateGeneratedFile>> = None;
194
195 let encrypted = processing_output
197 .as_ref()
198 .map(|output| output.encrypted)
199 .unwrap_or_default();
200
201 if let Some(processing_output) = processing_output {
202 index_metadata = processing_output.index_metadata;
203
204 let mut s3_upload_keys = Vec::new();
205
206 tracing::debug!("uploading generated files");
207 let prepared_files = store_generated_files(
208 &storage,
209 &created_file,
210 &mut s3_upload_keys,
211 processing_output.upload_queue,
212 )
213 .await?;
214 generated_files = Some(prepared_files);
215 }
216
217 tracing::debug!("indexing file contents");
219 store_file_index(&search, &created_file, &file.scope, index_metadata).await?;
220
221 let mut db = db.begin().await.map_err(|cause| {
223 tracing::error!(?cause, "failed to begin transaction");
224 ProcessFileError::BeginTransaction
225 })?;
226
227 if let Some(creates) = generated_files {
229 for create in creates {
230 GeneratedFile::create(db.deref_mut(), create)
231 .await
232 .map_err(UploadFileError::CreateGeneratedFile)?;
233 }
234 }
235
236 if encrypted {
237 tracing::debug!("marking file as encrypted");
239 file.file = file
240 .file
241 .set_encrypted(db.deref_mut(), true)
242 .await
243 .map_err(|error| {
244 tracing::error!(?error, "failed to set file as encrypted");
245 ProcessFileError::SetEncrypted
246 })?;
247 }
248
249 tracing::debug!("updating file mime type");
251 file.file = file
252 .file
253 .set_mime(db.deref_mut(), mime.to_string())
254 .await
255 .map_err(|error| {
256 tracing::error!(?error, "failed to set file mime");
257 ProcessFileError::SetMime
258 })?;
259
260 db.commit().await.map_err(|error| {
261 tracing::error!(?error, "failed to commit transaction");
262 ProcessFileError::CommitTransaction
263 })?;
264
265 Ok(())
266}