docbox_core/files/reprocess_octet_stream_files.rs

1use crate::{
2    files::{index_file::store_file_index, upload_file::store_generated_files},
3    processing::{ProcessingIndexMetadata, ProcessingLayer, process_file},
4    utils::file::get_file_name_ext,
5};
6use anyhow::Context;
7use docbox_database::{DbPool, models::file::FileWithScope};
8use docbox_search::TenantSearchIndex;
9use docbox_storage::TenantStorageLayer;
10use futures::{StreamExt, future::BoxFuture};
11use mime::Mime;
12use std::ops::DerefMut;
13use tracing::Instrument;
14
15pub async fn reprocess_octet_stream_files(
16    db: &DbPool,
17    search: &TenantSearchIndex,
18    storage: &TenantStorageLayer,
19    processing: &ProcessingLayer,
20) -> anyhow::Result<()> {
21    _ = search.create_index().await;
22
23    let files = get_files(db).await?;
24    let mut skipped = Vec::new();
25    let mut processing_files = Vec::new();
26
27    for file in files {
28        let guessed_mime = get_file_name_ext(&file.file.name).and_then(|ext| {
29            let guesses = mime_guess::from_ext(&ext);
30            guesses.first()
31        });
32
33        if let Some(mime) = guessed_mime {
34            processing_files.push((file, mime));
35        } else {
36            skipped.push(file);
37        }
38    }
39
40    let span = tracing::Span::current();
41
42    // Process all the files
43    _ = futures::stream::iter(processing_files)
44        .map(|(file, mime)| -> BoxFuture<'static, ()> {
45            let db = db.clone();
46            let search = search.clone();
47            let storage = storage.clone();
48            let processing = processing.clone();
49            let span = span.clone();
50
51            Box::pin(
52                async move {
53                    tracing::debug!(?file, "stating file");
54                    if let Err(error) =
55                        perform_process_file(db, storage, search, processing, file, mime).await
56                    {
57                        tracing::error!(?error, "failed to migrate file");
58                    };
59                }
60                .instrument(span),
61            )
62        })
63        .buffered(FILE_PROCESS_SIZE)
64        .collect::<Vec<()>>()
65        .await;
66
67    for skipped in skipped {
68        tracing::debug!(file_id = %skipped.file.id, file_name = %skipped.file.name, "skipped file");
69    }
70
71    Ok(())
72}
73
74/// Size of each page to request from the database
75const DATABASE_PAGE_SIZE: u64 = 1000;
76/// Number of files to process in parallel
77const FILE_PROCESS_SIZE: usize = 50;
78
79pub async fn get_files(db: &DbPool) -> anyhow::Result<Vec<FileWithScope>> {
80    let mut page_index = 0;
81    let mut data = Vec::new();
82
83    loop {
84        let mut files = docbox_database::models::file::File::all_by_mime(
85            db,
86            "application/octet-stream",
87            page_index * DATABASE_PAGE_SIZE,
88            DATABASE_PAGE_SIZE,
89        )
90        .await
91        .with_context(|| format!("failed to load files page: {page_index}"))?;
92
93        let is_end = (files.len() as u64) < DATABASE_PAGE_SIZE;
94
95        data.append(&mut files);
96
97        if is_end {
98            break;
99        }
100
101        page_index += 1;
102    }
103
104    Ok(data)
105}
106
107pub async fn perform_process_file(
108    db: DbPool,
109    storage: TenantStorageLayer,
110    search: TenantSearchIndex,
111    processing: ProcessingLayer,
112    mut file: FileWithScope,
113    mime: Mime,
114) -> anyhow::Result<()> {
115    // Start a database transaction
116    let mut db = db.begin().await.map_err(|cause| {
117        tracing::error!(?cause, "failed to begin transaction");
118        anyhow::anyhow!("failed to begin transaction")
119    })?;
120
121    let bytes = storage
122        .get_file(&file.file.file_key)
123        .await?
124        .collect_bytes()
125        .await?;
126
127    let processing_output = process_file(&None, &processing, bytes, &mime).await?;
128
129    let mut index_metadata: Option<ProcessingIndexMetadata> = None;
130
131    if let Some(processing_output) = processing_output {
132        // Store the encryption state for encrypted files
133        if processing_output.encrypted {
134            tracing::debug!("marking file as encrypted");
135            file.file = file.file.set_encrypted(db.deref_mut(), true).await?;
136        }
137
138        index_metadata = processing_output.index_metadata;
139
140        let mut s3_upload_keys = Vec::new();
141
142        tracing::debug!("uploading generated files");
143        store_generated_files(
144            &mut db,
145            &storage,
146            &file.file,
147            &mut s3_upload_keys,
148            processing_output.upload_queue,
149        )
150        .await?;
151    }
152
153    // Index the file in the search index
154    tracing::debug!("indexing file contents");
155    store_file_index(&search, &file.file, &file.scope, index_metadata).await?;
156
157    file.file = file.file.set_mime(db.deref_mut(), mime.to_string()).await?;
158
159    db.commit().await?;
160
161    Ok(())
162}