docbox_core/files/
reprocess_octet_stream_files.rs

//! # Reprocess application/octet-stream
//!
//! This is a migration helper script, used to handle the case where file
//! types were not known at the time of ingest and were taken in as simply
//! application/octet-stream files.
//!
//! This migration takes all of the files matching that mime type, attempts
//! to infer each file's mime type from its extension, runs the processing
//! step to generate its processed variants, and updates the stored mime type.
10
11use crate::{
12    files::{
13        index_file::store_file_index,
14        upload_file::{UploadFileError, store_generated_files},
15    },
16    utils::file::get_file_name_ext,
17};
18use docbox_database::{
19    DbPool, DbResult,
20    models::{
21        file::{CreateFile, FileWithScope},
22        generated_file::{CreateGeneratedFile, GeneratedFile},
23    },
24};
25use docbox_processing::{ProcessingError, ProcessingIndexMetadata, ProcessingLayer, process_file};
26use docbox_search::TenantSearchIndex;
27use docbox_storage::{StorageLayerError, TenantStorageLayer};
28use futures::{StreamExt, future::BoxFuture};
29use mime::Mime;
30use std::ops::DerefMut;
31use thiserror::Error;
32use tracing::Instrument;
33
34pub async fn reprocess_octet_stream_files(
35    db: &DbPool,
36    search: &TenantSearchIndex,
37    storage: &TenantStorageLayer,
38    processing: &ProcessingLayer,
39) -> DbResult<()> {
40    _ = search.create_index().await;
41
42    let files = get_files(db).await?;
43    let mut skipped = Vec::new();
44    let mut processing_files = Vec::new();
45
46    for file in files {
47        let guessed_mime = get_file_name_ext(&file.file.name).and_then(|ext| {
48            let guesses = mime_guess::from_ext(&ext);
49            guesses.first()
50        });
51
52        if let Some(mime) = guessed_mime {
53            processing_files.push((file, mime));
54        } else {
55            skipped.push(file);
56        }
57    }
58
59    let span = tracing::Span::current();
60
61    // Process all the files
62    _ = futures::stream::iter(processing_files)
63        .map(|(file, mime)| -> BoxFuture<'static, ()> {
64            let db = db.clone();
65            let search = search.clone();
66            let storage = storage.clone();
67            let processing = processing.clone();
68            let span = span.clone();
69
70            Box::pin(
71                async move {
72                    tracing::debug!(?file, "stating file");
73                    if let Err(error) =
74                        perform_process_file(db, storage, search, processing, file, mime).await
75                    {
76                        tracing::error!(?error, "failed to migrate file");
77                    };
78                }
79                .instrument(span),
80            )
81        })
82        .buffered(FILE_PROCESS_SIZE)
83        .collect::<Vec<()>>()
84        .await;
85
86    for skipped in skipped {
87        tracing::debug!(file_id = %skipped.file.id, file_name = %skipped.file.name, "skipped file");
88    }
89
90    Ok(())
91}
92
/// Number of file rows requested per query while paging through the
/// `application/octet-stream` files in [`get_files`]
const DATABASE_PAGE_SIZE: u64 = 1_000;
/// Upper bound on how many files are reprocessed concurrently
const FILE_PROCESS_SIZE: usize = 50;
97
98pub async fn get_files(db: &DbPool) -> DbResult<Vec<FileWithScope>> {
99    let mut page_index = 0;
100    let mut data = Vec::new();
101
102    loop {
103        let mut files = match docbox_database::models::file::File::all_by_mime(
104            db,
105            "application/octet-stream",
106            page_index * DATABASE_PAGE_SIZE,
107            DATABASE_PAGE_SIZE,
108        )
109        .await
110        {
111            Ok(value) => value,
112            Err(error) => {
113                tracing::error!(?error, ?page_index, "failed to load files page");
114                return Err(error);
115            }
116        };
117
118        let is_end = (files.len() as u64) < DATABASE_PAGE_SIZE;
119
120        data.append(&mut files);
121
122        if is_end {
123            break;
124        }
125
126        page_index += 1;
127    }
128
129    Ok(data)
130}
131
132#[derive(Debug, Error)]
133pub enum ProcessFileError {
134    #[error("failed to begin transaction")]
135    BeginTransaction,
136
137    #[error("failed to commit transaction")]
138    CommitTransaction,
139
140    #[error(transparent)]
141    Storage(#[from] StorageLayerError),
142
143    #[error(transparent)]
144    Process(#[from] ProcessingError),
145
146    #[error(transparent)]
147    UploadFile(#[from] UploadFileError),
148
149    #[error("failed to mark file as encrypted")]
150    SetEncrypted,
151
152    #[error("failed to update file mime")]
153    SetMime,
154}
155
156/// TODO: Handle rollback for failure
157pub async fn perform_process_file(
158    db: DbPool,
159    storage: TenantStorageLayer,
160    search: TenantSearchIndex,
161    processing: ProcessingLayer,
162    mut file: FileWithScope,
163    mime: Mime,
164) -> Result<(), ProcessFileError> {
165    let bytes = storage
166        .get_file(&file.file.file_key)
167        .await
168        .inspect_err(|error| tracing::error!(?error, "Failed to get storage file"))?
169        .collect_bytes()
170        .await
171        .inspect_err(|error| tracing::error!(?error, "Failed to get storage file"))?;
172
173    let processing_output = process_file(&None, &processing, bytes, &mime).await?;
174
175    let mut index_metadata: Option<ProcessingIndexMetadata> = None;
176
177    let file_in = &file.file;
178
179    let created_file = CreateFile {
180        id: file_in.id,
181        parent_id: file_in.parent_id,
182        name: file_in.name.clone(),
183        mime: file_in.mime.to_string(),
184        file_key: file_in.file_key.to_owned(),
185        folder_id: file_in.folder_id,
186        hash: file_in.hash.clone(),
187        size: file_in.size,
188        created_by: file_in.created_by.clone(),
189        created_at: file_in.created_at,
190        encrypted: file_in.encrypted,
191    };
192
193    let mut generated_files: Option<Vec<CreateGeneratedFile>> = None;
194
195    // Get file encryption state
196    let encrypted = processing_output
197        .as_ref()
198        .map(|output| output.encrypted)
199        .unwrap_or_default();
200
201    if let Some(processing_output) = processing_output {
202        index_metadata = processing_output.index_metadata;
203
204        let mut s3_upload_keys = Vec::new();
205
206        tracing::debug!("uploading generated files");
207        let prepared_files = store_generated_files(
208            &storage,
209            &created_file,
210            &mut s3_upload_keys,
211            processing_output.upload_queue,
212        )
213        .await?;
214        generated_files = Some(prepared_files);
215    }
216
217    // Index the file in the search index
218    tracing::debug!("indexing file contents");
219    store_file_index(&search, &created_file, &file.scope, index_metadata).await?;
220
221    // Start a database transaction
222    let mut db = db.begin().await.map_err(|cause| {
223        tracing::error!(?cause, "failed to begin transaction");
224        ProcessFileError::BeginTransaction
225    })?;
226
227    // Create generated file records
228    if let Some(creates) = generated_files {
229        for create in creates {
230            GeneratedFile::create(db.deref_mut(), create)
231                .await
232                .map_err(UploadFileError::CreateGeneratedFile)?;
233        }
234    }
235
236    if encrypted {
237        // Mark the file as encrypted
238        tracing::debug!("marking file as encrypted");
239        file.file = file
240            .file
241            .set_encrypted(db.deref_mut(), true)
242            .await
243            .map_err(|error| {
244                tracing::error!(?error, "failed to set file as encrypted");
245                ProcessFileError::SetEncrypted
246            })?;
247    }
248
249    // Update the file mime type
250    tracing::debug!("updating file mime type");
251    file.file = file
252        .file
253        .set_mime(db.deref_mut(), mime.to_string())
254        .await
255        .map_err(|error| {
256            tracing::error!(?error, "failed to set file mime");
257            ProcessFileError::SetMime
258        })?;
259
260    db.commit().await.map_err(|error| {
261        tracing::error!(?error, "failed to commit transaction");
262        ProcessFileError::CommitTransaction
263    })?;
264
265    Ok(())
266}