//! docbox_core/tenant/rebuild_tenant_index.rs
//!
//! Rebuilds a tenant's search index from the database records and the
//! content stored in S3.
1use docbox_database::{
2    DbErr, DbPool, DbResult,
3    models::{
4        document_box::DocumentBoxScopeRaw,
5        file::{File, FileWithScope},
6        folder::Folder,
7        generated_file::{GeneratedFile, GeneratedFileType},
8        link::{Link, LinkWithScope},
9    },
10};
11use docbox_processing::{office::is_pdf_compatible, pdf::split_pdf_text_pages};
12use docbox_search::{
13    SearchError, TenantSearchIndex,
14    models::{DocumentPage, SearchIndexData, SearchIndexType},
15};
16use docbox_storage::{StorageLayerError, TenantStorageLayer};
17use futures::{StreamExt, future::BoxFuture, stream::FuturesUnordered};
18use std::{str::FromStr, string::FromUtf8Error};
19use thiserror::Error;
20
/// Errors that can occur while rebuilding a tenant search index
#[derive(Debug, Error)]
pub enum RebuildTenantIndexError {
    /// Error while loading data from the database
    #[error(transparent)]
    Database(#[from] DbErr),
    /// Error reported by the search index
    #[error(transparent)]
    Search(#[from] SearchError),
    /// Failed to write the index data snapshot file to disk
    #[error(transparent)]
    WriteIndexData(std::io::Error),
    /// Failed to serialize the index data snapshot to JSON
    #[error(transparent)]
    SerializeIndexData(serde_json::Error),
}
32
33/// Rebuild the search index for the tenant based on that
34/// data stored in the database and the content stored in S3
35pub async fn rebuild_tenant_index(
36    db: &DbPool,
37    search: &TenantSearchIndex,
38    storage: &TenantStorageLayer,
39) -> Result<(), RebuildTenantIndexError> {
40    tracing::info!("started re-indexing tenant");
41
42    let index_data = recreate_search_index_data(db, storage).await?;
43    tracing::debug!("all data loaded: {}", index_data.len());
44
45    {
46        let serialized = serde_json::to_string(&index_data)
47            .map_err(RebuildTenantIndexError::SerializeIndexData)?;
48        tokio::fs::write("index_data.json", serialized)
49            .await
50            .map_err(RebuildTenantIndexError::WriteIndexData)?;
51    }
52
53    apply_rebuilt_tenant_index(search, index_data).await?;
54
55    Ok(())
56}
57
58/// Apply the rebuilt tenant index
59pub async fn apply_rebuilt_tenant_index(
60    search: &TenantSearchIndex,
61    data: Vec<SearchIndexData>,
62) -> Result<(), SearchError> {
63    // Ensure the index exists
64    _ = search.create_index().await;
65
66    let mut iter = data.into_iter();
67
68    loop {
69        let chunk: Vec<_> = iter.by_ref().take(INDEX_CHUNK_SIZE).collect();
70        if chunk.is_empty() {
71            break;
72        }
73        search.add_data(chunk).await?;
74    }
75
76    Ok(())
77}
78
79/// Rebuild the entire tenant search index
80pub async fn recreate_search_index_data(
81    db: &DbPool,
82    storage: &TenantStorageLayer,
83) -> DbResult<Vec<SearchIndexData>> {
84    let links = create_links_index_data(db).await?;
85    let folders = create_folders_index_data(db).await?;
86    let files = create_files_index_data(db, storage).await?;
87
88    let index_data = links
89        .into_iter()
90        .chain(folders.into_iter())
91        .chain(files.into_iter())
92        .collect::<Vec<SearchIndexData>>();
93
94    Ok(index_data)
95}
96
/// Number of index entries to push to the search index per batch
const INDEX_CHUNK_SIZE: usize = 5000;
/// Size of each page to request from the database
const DATABASE_PAGE_SIZE: u64 = 1000;
/// Number of files to process in parallel
const FILE_PROCESS_SIZE: usize = 500;
102
103/// Collects all stored links and creates the [SearchIndexData] for them
104pub async fn create_links_index_data(db: &DbPool) -> DbResult<Vec<SearchIndexData>> {
105    let mut page_index = 0;
106    let mut data = Vec::new();
107
108    loop {
109        let links = match Link::all(db, page_index * DATABASE_PAGE_SIZE, DATABASE_PAGE_SIZE).await {
110            Ok(value) => value,
111            Err(error) => {
112                tracing::error!(?error, ?page_index, "failed to load links page");
113                return Err(error);
114            }
115        };
116        let is_end = (links.len() as u64) < DATABASE_PAGE_SIZE;
117
118        for LinkWithScope { link, scope } in links {
119            data.push(SearchIndexData {
120                ty: SearchIndexType::Link,
121                item_id: link.id,
122                folder_id: link.folder_id,
123                name: link.name.to_string(),
124                mime: None,
125                content: Some(link.value.clone()),
126                pages: None,
127                created_at: link.created_at,
128                created_by: link.created_by.clone(),
129                document_box: scope.clone(),
130            })
131        }
132
133        if is_end {
134            break;
135        }
136
137        page_index += 1;
138    }
139
140    Ok(data)
141}
142
143/// Collects all stored non-root folders and creates the [SearchIndexData] for them
144pub async fn create_folders_index_data(db: &DbPool) -> DbResult<Vec<SearchIndexData>> {
145    let mut page_index = 0;
146    let mut data = Vec::new();
147
148    loop {
149        let folders =
150            match Folder::all_non_root(db, page_index * DATABASE_PAGE_SIZE, DATABASE_PAGE_SIZE)
151                .await
152            {
153                Ok(value) => value,
154                Err(error) => {
155                    tracing::error!(?error, ?page_index, "failed to load folders page");
156                    return Err(error);
157                }
158            };
159        let is_end = (folders.len() as u64) < DATABASE_PAGE_SIZE;
160
161        for folder in folders {
162            let folder_id = match folder.folder_id {
163                Some(value) => value,
164                // Root folders are not included in the index
165                None => continue,
166            };
167
168            data.push(SearchIndexData {
169                ty: SearchIndexType::Folder,
170                item_id: folder.id,
171                folder_id,
172                name: folder.name.to_string(),
173                mime: None,
174                content: None,
175                pages: None,
176                created_at: folder.created_at,
177                created_by: folder.created_by.clone(),
178                document_box: folder.document_box.clone(),
179            })
180        }
181
182        if is_end {
183            break;
184        }
185
186        page_index += 1;
187    }
188
189    Ok(data)
190}
191
192pub async fn create_files_index_data(
193    db: &DbPool,
194    storage: &TenantStorageLayer,
195) -> DbResult<Vec<SearchIndexData>> {
196    let mut page_index = 0;
197    let mut data = Vec::new();
198    let mut files_for_processing = Vec::new();
199
200    loop {
201        let files = match File::all(db, page_index * DATABASE_PAGE_SIZE, DATABASE_PAGE_SIZE).await {
202            Ok(value) => value,
203            Err(error) => {
204                tracing::error!(?error, ?page_index, "failed to load files page");
205                return Err(error);
206            }
207        };
208        let is_end = (files.len() as u64) < DATABASE_PAGE_SIZE;
209
210        for FileWithScope { file, scope } in files {
211            let mime = match mime::Mime::from_str(&file.mime) {
212                Ok(value) => value,
213                Err(error) => {
214                    tracing::error!(?error, ?file, "file has an invalid mime type");
215                    continue;
216                }
217            };
218
219            if file.encrypted || !is_pdf_compatible(&mime) {
220                // These files don't require any processing
221                data.push(SearchIndexData {
222                    ty: SearchIndexType::File,
223                    item_id: file.id,
224                    folder_id: file.folder_id,
225                    name: file.name,
226                    mime: Some(file.mime),
227                    content: None,
228                    created_at: file.created_at,
229                    created_by: file.created_by,
230                    document_box: scope,
231                    pages: None,
232                })
233            } else {
234                // File needs additional processing
235                files_for_processing.push((file, scope));
236            }
237        }
238
239        if is_end {
240            break;
241        }
242
243        page_index += 1;
244    }
245
246    for chunk in files_for_processing.chunks(FILE_PROCESS_SIZE) {
247        let mut results: Vec<SearchIndexData> = chunk
248            .iter()
249            .map(|(file, scope)| -> BoxFuture<'_, SearchIndexData> {
250                Box::pin(async move {
251                    let pages =
252                        match try_pdf_compatible_document_pages(db, storage, scope, file).await {
253                            Ok(value) => value,
254                            Err(error) => {
255                                tracing::error!(?error, "failed to re-create pdf index data pages");
256                                return SearchIndexData {
257                                    ty: SearchIndexType::File,
258                                    item_id: file.id,
259                                    folder_id: file.folder_id,
260                                    name: file.name.clone(),
261                                    mime: Some(file.mime.clone()),
262                                    content: None,
263                                    created_at: file.created_at,
264                                    created_by: file.created_by.clone(),
265                                    document_box: scope.clone(),
266                                    pages: None,
267                                };
268                            }
269                        };
270
271                    SearchIndexData {
272                        ty: SearchIndexType::File,
273                        item_id: file.id,
274                        folder_id: file.folder_id,
275                        name: file.name.clone(),
276                        mime: Some(file.mime.clone()),
277                        content: None,
278                        created_at: file.created_at,
279                        created_by: file.created_by.clone(),
280                        document_box: scope.clone(),
281                        pages: Some(pages),
282                    }
283                })
284            })
285            .collect::<FuturesUnordered<BoxFuture<'_, SearchIndexData>>>()
286            .collect()
287            .await;
288
289        data.append(&mut results);
290    }
291
292    Ok(data)
293}
294
/// Errors that can occur while recreating the document pages
/// for a PDF compatible file (see [try_pdf_compatible_document_pages])
#[derive(Debug, Error)]
pub enum PdfCompatibleRebuildError {
    /// No extracted text content generated file exists for the file
    #[error("file is missing text content")]
    MissingTextContent,

    /// Error while loading data from the database
    #[error(transparent)]
    Database(#[from] DbErr),

    /// Error while loading the text content from storage
    #[error(transparent)]
    Storage(#[from] StorageLayerError),

    /// Stored text content was not valid UTF-8
    #[error(transparent)]
    InvalidTextContent(#[from] FromUtf8Error),
}
309
310/// Attempts to obtain the [DocumentPage] collection for a PDF compatible file
311pub async fn try_pdf_compatible_document_pages(
312    db: &DbPool,
313    storage: &TenantStorageLayer,
314    scope: &DocumentBoxScopeRaw,
315    file: &File,
316) -> Result<Vec<DocumentPage>, PdfCompatibleRebuildError> {
317    // Load the extracted text content for the file
318    let text_file = GeneratedFile::find(db, scope, file.id, GeneratedFileType::TextContent)
319        .await?
320        .ok_or(PdfCompatibleRebuildError::MissingTextContent)?;
321
322    tracing::debug!(?text_file, "loaded file generated text content");
323
324    // Read the PDF file from S3
325    let text_content = storage
326        .get_file(&text_file.file_key)
327        .await?
328        .collect_bytes()
329        .await
330        .inspect_err(|error| {
331            tracing::error!(?error, "failed to load pdf bytes from s3 stream");
332        })?;
333
334    // Load the text content
335    let text_content = text_content.to_vec();
336    let text_content =
337        String::from_utf8(text_content).map_err(PdfCompatibleRebuildError::InvalidTextContent)?;
338
339    // Split the content back into pages
340    let pages = split_pdf_text_pages(&text_content);
341
342    // Create the pages data
343    let pages = pages
344        .into_iter()
345        .enumerate()
346        .map(|(page, content)| DocumentPage {
347            page: page as u64,
348            content: content.to_string(),
349        })
350        .collect();
351
352    Ok(pages)
353}