Skip to main content

docbox_core/files/
reprocess_octet_stream_files.rs

1//! # Reprocess application/octet-stream
2//!
3//! This is a migration helper script, used to handle the case where file
4//! types were not known at the time of ingest and were taken in as simply
5//! application/octet-stream files.
6//!
//! This migration takes all of the files matching that mime type, attempts
//! to infer each file's actual mime type from its extension, and performs the
//! processing step to generate its processed variants and update the stored
//! file mime type.
10
11use crate::{
12    files::{
13        index_file::store_file_index,
14        upload_file::{UploadFileError, store_generated_files},
15    },
16    utils::{file::get_file_name_ext, timing::handle_slow_future},
17};
18use docbox_database::{
19    DbPool, DbResult,
20    models::{
21        file::{CreateFile, FileWithScope},
22        generated_file::{CreateGeneratedFile, GeneratedFile},
23    },
24};
25use docbox_processing::{
26    DEFAULT_PROCESS_TIMEOUT, ProcessingError, ProcessingIndexMetadata, ProcessingLayer,
27    process_file,
28};
29use docbox_search::TenantSearchIndex;
30use docbox_storage::{StorageLayer, StorageLayerError};
31use futures::{StreamExt, future::BoxFuture};
32use mime::Mime;
33use std::{ops::DerefMut, time::Duration};
34use thiserror::Error;
35use tokio::time::timeout;
36use tracing::Instrument;
37
38pub async fn reprocess_octet_stream_files(
39    db: &DbPool,
40    search: &TenantSearchIndex,
41    storage: &StorageLayer,
42    processing: &ProcessingLayer,
43) -> DbResult<()> {
44    _ = search.create_index().await;
45
46    let files = get_files(db).await?;
47    let mut skipped = Vec::new();
48    let mut processing_files = Vec::new();
49
50    for file in files {
51        let guessed_mime = get_file_name_ext(&file.file.name).and_then(|ext| {
52            let guesses = mime_guess::from_ext(&ext);
53            guesses.first()
54        });
55
56        if let Some(mime) = guessed_mime {
57            processing_files.push((file, mime));
58        } else {
59            skipped.push(file);
60        }
61    }
62
63    let span = tracing::Span::current();
64
65    // Process all the files
66    _ = futures::stream::iter(processing_files)
67        .map(|(file, mime)| -> BoxFuture<'static, ()> {
68            let db = db.clone();
69            let search = search.clone();
70            let storage = storage.clone();
71            let processing = processing.clone();
72            let span = span.clone();
73
74            Box::pin(
75                async move {
76                    tracing::debug!(?file, "stating file");
77                    if let Err(error) =
78                        perform_process_file(db, storage, search, processing, file, mime).await
79                    {
80                        tracing::error!(?error, "failed to migrate file");
81                    };
82                }
83                .instrument(span),
84            )
85        })
86        .buffered(FILE_PROCESS_SIZE)
87        .collect::<Vec<()>>()
88        .await;
89
90    for skipped in skipped {
91        tracing::debug!(file_id = %skipped.file.id, file_name = %skipped.file.name, "skipped file");
92    }
93
94    Ok(())
95}
96
/// Size of each page to request from the database when enumerating
/// the octet-stream files (see [`get_files`])
const DATABASE_PAGE_SIZE: u64 = 1000;
/// Maximum number of files to process in parallel while migrating
const FILE_PROCESS_SIZE: usize = 50;
101
102pub async fn get_files(db: &DbPool) -> DbResult<Vec<FileWithScope>> {
103    let mut page_index = 0;
104    let mut data = Vec::new();
105
106    loop {
107        let mut files = match docbox_database::models::file::File::all_by_mime(
108            db,
109            "application/octet-stream",
110            page_index * DATABASE_PAGE_SIZE,
111            DATABASE_PAGE_SIZE,
112        )
113        .await
114        {
115            Ok(value) => value,
116            Err(error) => {
117                tracing::error!(?error, ?page_index, "failed to load files page");
118                return Err(error);
119            }
120        };
121
122        let is_end = (files.len() as u64) < DATABASE_PAGE_SIZE;
123
124        data.append(&mut files);
125
126        if is_end {
127            break;
128        }
129
130        page_index += 1;
131    }
132
133    Ok(data)
134}
135
/// Errors that can occur while reprocessing a single
/// `application/octet-stream` file in [`perform_process_file`].
#[derive(Debug, Error)]
pub enum ProcessFileError {
    /// Could not open a database transaction for the record updates
    #[error("failed to begin transaction")]
    BeginTransaction,

    /// Could not commit the database transaction
    #[error("failed to commit transaction")]
    CommitTransaction,

    /// Error from the storage layer (download or upload of file data)
    #[error(transparent)]
    Storage(#[from] StorageLayerError),

    /// Error from the file processing pipeline
    #[error(transparent)]
    Process(#[from] ProcessingError),

    /// Error while storing generated files or creating their records
    #[error(transparent)]
    UploadFile(#[from] UploadFileError),

    /// Failed to persist the encrypted flag on the file record
    #[error("failed to mark file as encrypted")]
    SetEncrypted,

    /// Failed to persist the newly guessed mime type on the file record
    #[error("failed to update file mime")]
    SetMime,

    /// Processing exceeded the configured timeout and was aborted
    #[error("timeout occurred while processing file")]
    ConvertTimeout,
}
162
/// Reprocesses a single file: downloads its contents from storage, runs the
/// processing pipeline against the guessed `mime`, stores any generated
/// variants, re-indexes the file in search, then updates the database records
/// (generated file rows, the encrypted flag, and the new mime type) inside a
/// single transaction.
///
/// TODO: Handle rollback for failure — generated files already uploaded to
/// storage (tracked in `s3_upload_keys`) and the search index write are not
/// cleaned up if a later step or the transaction fails.
pub async fn perform_process_file(
    db: DbPool,
    storage: StorageLayer,
    search: TenantSearchIndex,
    processing: ProcessingLayer,
    mut file: FileWithScope,
    mime: Mime,
) -> Result<(), ProcessFileError> {
    // Download the raw file contents from storage
    let bytes = storage
        .get_file(&file.file.file_key)
        .await
        .inspect_err(|error| tracing::error!(?error, "Failed to get storage file"))?
        .collect_bytes()
        .await
        .inspect_err(|error| tracing::error!(?error, "Failed to get storage file"))?;

    let process_future = process_file(&None, &processing, bytes, &mime);

    // Timeout is configurable per-tenant, falling back to the library default
    let process_timeout = processing
        .config
        .process_timeout
        .unwrap_or(DEFAULT_PROCESS_TIMEOUT);

    // Apply the configured timeout to file processing; we can assume it has
    // definitely failed if it has taken that long
    let process_future = timeout(process_timeout, process_future);

    // Apply a slow future warning to the processing future; the first `?`
    // maps the timeout elapse, the second propagates processing errors
    let processing_output = handle_slow_future(process_future, Duration::from_secs(25), || {
        tracing::warn!(
            ?file,
            "file upload processing has taken over 25s to complete"
        )
    })
    .await
    .map_err(|_| ProcessFileError::ConvertTimeout)??;

    let mut index_metadata: Option<ProcessingIndexMetadata> = None;

    let file_in = &file.file;

    // Snapshot of the current file record used for generated-file storage and
    // search indexing. NOTE(review): `mime` here is still the original
    // application/octet-stream value — the guessed mime is only written to the
    // database at the end; confirm the search index intentionally sees the old
    // mime type.
    let created_file = CreateFile {
        id: file_in.id,
        parent_id: file_in.parent_id,
        name: file_in.name.clone(),
        mime: file_in.mime.to_string(),
        file_key: file_in.file_key.to_owned(),
        folder_id: file_in.folder_id,
        hash: file_in.hash.clone(),
        size: file_in.size,
        created_by: file_in.created_by.clone(),
        created_at: file_in.created_at,
        encrypted: file_in.encrypted,
    };

    let mut generated_files: Option<Vec<CreateGeneratedFile>> = None;

    // Get file encryption state (false when processing produced no output)
    let encrypted = processing_output
        .as_ref()
        .map(|output| output.encrypted)
        .unwrap_or_default();

    if let Some(processing_output) = processing_output {
        index_metadata = processing_output.index_metadata;

        // Keys of uploaded objects; intended for rollback, currently unused
        let mut s3_upload_keys = Vec::new();

        tracing::debug!("uploading generated files");
        let prepared_files = store_generated_files(
            &storage,
            &created_file,
            &mut s3_upload_keys,
            processing_output.upload_queue,
        )
        .await?;
        generated_files = Some(prepared_files);
    }

    // Index the file in the search index
    tracing::debug!("indexing file contents");
    store_file_index(&search, &created_file, &file.scope, index_metadata).await?;

    // Start a database transaction covering all record updates below
    let mut db = db.begin().await.map_err(|error| {
        tracing::error!(?error, "failed to begin transaction");
        ProcessFileError::BeginTransaction
    })?;

    // Create generated file records
    if let Some(creates) = generated_files {
        for create in creates {
            GeneratedFile::create(db.deref_mut(), create)
                .await
                .map_err(UploadFileError::CreateGeneratedFile)?;
        }
    }

    if encrypted {
        // Mark the file as encrypted
        tracing::debug!("marking file as encrypted");
        file.file = file
            .file
            .set_encrypted(db.deref_mut(), true)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to set file as encrypted");
                ProcessFileError::SetEncrypted
            })?;
    }

    // Update the file mime type to the newly guessed value
    tracing::debug!("updating file mime type");
    file.file = file
        .file
        .set_mime(db.deref_mut(), mime.to_string())
        .await
        .map_err(|error| {
            tracing::error!(?error, "failed to set file mime");
            ProcessFileError::SetMime
        })?;

    db.commit().await.map_err(|error| {
        tracing::error!(?error, "failed to commit transaction");
        ProcessFileError::CommitTransaction
    })?;

    Ok(())
}
291}