docbox_core/files/reprocess_octet_stream_files.rs

use crate::{
    files::{index_file::store_file_index, upload_file::store_generated_files},
    processing::{ProcessingIndexMetadata, ProcessingLayer, process_file},
    utils::file::get_file_name_ext,
};
use anyhow::Context;
use docbox_database::{DbPool, models::file::FileWithScope};
use docbox_search::TenantSearchIndex;
use docbox_storage::TenantStorageLayer;
use futures::{StreamExt, future::BoxFuture};
use mime::Mime;
use std::ops::DerefMut;
use tracing::Instrument;

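/// Re-processes every file currently stored as `application/octet-stream`.
///
/// A more specific mime type is guessed from each file's extension; files
/// with a usable guess are run through [`perform_process_file`] with bounded
/// concurrency, and files without a guess are skipped and logged.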
pub async fn reprocess_octet_stream_files(
    db: &DbPool,
    search: &TenantSearchIndex,
    storage: &TenantStorageLayer,
    processing: &ProcessingLayer,
) -> anyhow::Result<()> {
    _ = search.create_index().await;

    let files = get_files(db).await?;
    let mut skipped = Vec::new();
    let mut processing_files = Vec::new();

    for file in files {
        let guessed_mime = get_file_name_ext(&file.file.name).and_then(|ext| {
            let guesses = mime_guess::from_ext(&ext);
            guesses.first()
        });

        if let Some(mime) = guessed_mime {
            processing_files.push((file, mime));
        } else {
            skipped.push(file);
        }
    }

    let span = tracing::Span::current();

    _ = futures::stream::iter(processing_files)
        .map(|(file, mime)| -> BoxFuture<'static, ()> {
            let db = db.clone();
            let search = search.clone();
            let storage = storage.clone();
            let processing = processing.clone();
            let span = span.clone();

            Box::pin(
                async move {
                    tracing::debug!(?file, "starting file");
                    if let Err(error) =
                        perform_process_file(db, storage, search, processing, file, mime).await
                    {
                        tracing::error!(?error, "failed to migrate file");
                    };
                }
                .instrument(span),
            )
        })
        .buffered(FILE_PROCESS_SIZE)
        .collect::<Vec<()>>()
        .await;

    for skipped in skipped {
        tracing::debug!(file_id = %skipped.file.id, file_name = %skipped.file.name, "skipped file");
    }

    Ok(())
}

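/// Number of file rows fetched per database page in [`get_files`].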
const DATABASE_PAGE_SIZE: u64 = 1000;
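/// Maximum number of files processed concurrently.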
const FILE_PROCESS_SIZE: usize = 50;

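/// Loads all files stored with the `application/octet-stream` mime type,
/// paging through the database until a partially filled page marks the end.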
pub async fn get_files(db: &DbPool) -> anyhow::Result<Vec<FileWithScope>> {
    let mut page_index = 0;
    let mut data = Vec::new();

    loop {
        let mut files = docbox_database::models::file::File::all_by_mime(
            db,
            "application/octet-stream",
            page_index * DATABASE_PAGE_SIZE,
            DATABASE_PAGE_SIZE,
        )
        .await
        .with_context(|| format!("failed to load files page: {page_index}"))?;

        let is_end = (files.len() as u64) < DATABASE_PAGE_SIZE;

        data.append(&mut files);

        if is_end {
            break;
        }

        page_index += 1;
    }

    Ok(data)
}

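/// Re-processes a single file: loads its bytes from storage, runs the
/// processing layer against the guessed mime type, stores any generated
/// files, indexes the file contents in search, and finally updates the
/// stored mime type. All database changes happen within one transaction.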
pub async fn perform_process_file(
    db: DbPool,
    storage: TenantStorageLayer,
    search: TenantSearchIndex,
    processing: ProcessingLayer,
    mut file: FileWithScope,
    mime: Mime,
) -> anyhow::Result<()> {
    let mut db = db.begin().await.map_err(|cause| {
        tracing::error!(?cause, "failed to begin transaction");
        anyhow::anyhow!("failed to begin transaction")
    })?;

    let bytes = storage
        .get_file(&file.file.file_key)
        .await?
        .collect_bytes()
        .await?;

    let processing_output = process_file(&None, &processing, bytes, &mime).await?;

    let mut index_metadata: Option<ProcessingIndexMetadata> = None;

    if let Some(processing_output) = processing_output {
        if processing_output.encrypted {
            tracing::debug!("marking file as encrypted");
            file.file = file.file.set_encrypted(db.deref_mut(), true).await?;
        }

        index_metadata = processing_output.index_metadata;

        let mut s3_upload_keys = Vec::new();

        tracing::debug!("uploading generated files");
        store_generated_files(
            &mut db,
            &storage,
            &file.file,
            &mut s3_upload_keys,
            processing_output.upload_queue,
        )
        .await?;
    }

    tracing::debug!("indexing file contents");
    store_file_index(&search, &file.file, &file.scope, index_metadata).await?;

    file.file = file.file.set_mime(db.deref_mut(), mime.to_string()).await?;

    db.commit().await?;

    Ok(())
}