docbox_core/tenant/rebuild_tenant_index.rs

use docbox_database::{
    DbErr, DbPool, DbResult,
    models::{
        document_box::DocumentBoxScopeRaw,
        file::{File, FileWithScope},
        folder::Folder,
        generated_file::{GeneratedFile, GeneratedFileType},
        link::{Link, LinkWithScope},
    },
};
use docbox_processing::{office::is_pdf_compatible, pdf::split_pdf_text_pages};
use docbox_search::{
    SearchError, TenantSearchIndex,
    models::{DocumentPage, SearchIndexData, SearchIndexType},
};
use docbox_storage::{StorageLayerError, TenantStorageLayer};
use futures::{StreamExt, future::LocalBoxFuture, stream::FuturesUnordered};
use itertools::Itertools;
use std::{str::FromStr, string::FromUtf8Error};
use thiserror::Error;

#[derive(Debug, Error)]
pub enum RebuildTenantIndexError {
    #[error(transparent)]
    Database(#[from] DbErr),
    #[error(transparent)]
    Search(#[from] SearchError),
    #[error(transparent)]
    WriteIndexData(std::io::Error),
    #[error(transparent)]
    SerializeIndexData(serde_json::Error),
}

/// Rebuild the search index for the tenant based on the
/// data stored in the database and the content stored in S3
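///
/// A snapshot of the rebuilt data is written to a local `index_data.json`
/// file before it is applied to the search index.
///
/// Minimal usage sketch (assuming `db`, `search`, and `storage` are
/// tenant-scoped handles resolved elsewhere):
///
/// ```ignore
/// rebuild_tenant_index(&db, &search, &storage).await?;
/// ```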
pub async fn rebuild_tenant_index(
    db: &DbPool,
    search: &TenantSearchIndex,
    storage: &TenantStorageLayer,
) -> Result<(), RebuildTenantIndexError> {
    tracing::info!("started re-indexing tenant");

    let index_data = recreate_search_index_data(db, storage).await?;
    tracing::debug!("all data loaded: {}", index_data.len());

    {
        let serialized = serde_json::to_string(&index_data)
            .map_err(RebuildTenantIndexError::SerializeIndexData)?;
        tokio::fs::write("index_data.json", serialized)
            .await
            .map_err(RebuildTenantIndexError::WriteIndexData)?;
    }

    apply_rebuilt_tenant_index(search, index_data).await?;

    Ok(())
}

/// Apply the rebuilt tenant index data to the search index
pub async fn apply_rebuilt_tenant_index(
    search: &TenantSearchIndex,
    data: Vec<SearchIndexData>,
) -> Result<(), SearchError> {
    // Ensure the index exists (the result is intentionally ignored)
    _ = search.create_index().await;

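    // Submit the data in fixed-size chunks so each individual request to the
    // search index stays bounded in size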
    let index_data_chunks = data.into_iter().chunks(INDEX_CHUNK_SIZE);
    let index_data_chunks = index_data_chunks.into_iter();

    for data in index_data_chunks {
        let chunk = data.collect::<Vec<_>>();
        search.add_data(chunk).await?;
    }

    Ok(())
}

/// Recreate the [SearchIndexData] for the entire tenant from its links,
/// folders, and files
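///
/// Minimal sketch of the two-step flow this enables, recreating the data
/// first and applying it separately (hypothetical caller):
///
/// ```ignore
/// let index_data = recreate_search_index_data(&db, &storage).await?;
/// apply_rebuilt_tenant_index(&search, index_data).await?;
/// ```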
pub async fn recreate_search_index_data(
    db: &DbPool,
    storage: &TenantStorageLayer,
) -> DbResult<Vec<SearchIndexData>> {
    let links = create_links_index_data(db).await?;
    let folders = create_folders_index_data(db).await?;
    let files = create_files_index_data(db, storage).await?;

    let index_data = links
        .into_iter()
        .chain(folders)
        .chain(files)
        .collect::<Vec<SearchIndexData>>();

    Ok(index_data)
}

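/// Number of index entries to submit to the search index in a single request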
const INDEX_CHUNK_SIZE: usize = 5000;
/// Size of each page to request from the database
const DATABASE_PAGE_SIZE: u64 = 1000;
/// Number of files to process in parallel
const FILE_PROCESS_SIZE: usize = 500;

/// Collects all stored links and creates the [SearchIndexData] for them
pub async fn create_links_index_data(db: &DbPool) -> DbResult<Vec<SearchIndexData>> {
    let mut page_index = 0;
    let mut data = Vec::new();

    loop {
        let links = match Link::all(db, page_index * DATABASE_PAGE_SIZE, DATABASE_PAGE_SIZE).await {
            Ok(value) => value,
            Err(error) => {
                tracing::error!(?error, ?page_index, "failed to load links page");
                return Err(error);
            }
        };
        let is_end = (links.len() as u64) < DATABASE_PAGE_SIZE;

        for LinkWithScope { link, scope } in links {
            data.push(SearchIndexData {
                ty: SearchIndexType::Link,
                item_id: link.id,
                folder_id: link.folder_id,
                name: link.name.to_string(),
                mime: None,
                content: Some(link.value.clone()),
                pages: None,
                created_at: link.created_at,
                created_by: link.created_by.clone(),
                document_box: scope.clone(),
            })
        }

        if is_end {
            break;
        }

        page_index += 1;
    }

    Ok(data)
}

/// Collects all stored non-root folders and creates the [SearchIndexData] for them
pub async fn create_folders_index_data(db: &DbPool) -> DbResult<Vec<SearchIndexData>> {
    let mut page_index = 0;
    let mut data = Vec::new();

    loop {
        let folders =
            match Folder::all_non_root(db, page_index * DATABASE_PAGE_SIZE, DATABASE_PAGE_SIZE)
                .await
            {
                Ok(value) => value,
                Err(error) => {
                    tracing::error!(?error, ?page_index, "failed to load folders page");
                    return Err(error);
                }
            };
        let is_end = (folders.len() as u64) < DATABASE_PAGE_SIZE;

        for folder in folders {
            let folder_id = match folder.folder_id {
                Some(value) => value,
                // Root folders are not included in the index
                None => continue,
            };

            data.push(SearchIndexData {
                ty: SearchIndexType::Folder,
                item_id: folder.id,
                folder_id,
                name: folder.name.to_string(),
                mime: None,
                content: None,
                pages: None,
                created_at: folder.created_at,
                created_by: folder.created_by.clone(),
                document_box: folder.document_box.clone(),
            })
        }

        if is_end {
            break;
        }

        page_index += 1;
    }

    Ok(data)
}

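/// Collects all stored files and creates the [SearchIndexData] for them,
/// recovering page content for non-encrypted, PDF-compatible files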
pub async fn create_files_index_data(
    db: &DbPool,
    storage: &TenantStorageLayer,
) -> DbResult<Vec<SearchIndexData>> {
    let mut page_index = 0;
    let mut data = Vec::new();
    let mut files_for_processing = Vec::new();

    loop {
        let files = match File::all(db, page_index * DATABASE_PAGE_SIZE, DATABASE_PAGE_SIZE).await {
            Ok(value) => value,
            Err(error) => {
                tracing::error!(?error, ?page_index, "failed to load files page");
                return Err(error);
            }
        };
        let is_end = (files.len() as u64) < DATABASE_PAGE_SIZE;

        for FileWithScope { file, scope } in files {
            let mime = match mime::Mime::from_str(&file.mime) {
                Ok(value) => value,
                Err(error) => {
                    tracing::error!(?error, ?file, "file has an invalid mime type");
                    continue;
                }
            };

            if file.encrypted || !is_pdf_compatible(&mime) {
                // These files don't require any processing
                data.push(SearchIndexData {
                    ty: SearchIndexType::File,
                    item_id: file.id,
                    folder_id: file.folder_id,
                    name: file.name,
                    mime: Some(file.mime),
                    content: None,
                    created_at: file.created_at,
                    created_by: file.created_by,
                    document_box: scope,
                    pages: None,
                })
            } else {
                // File needs additional processing
                files_for_processing.push((file, scope));
            }
        }

        if is_end {
            break;
        }

        page_index += 1;
    }

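    // Process the PDF-compatible files concurrently in batches of FILE_PROCESS_SIZE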
    for chunk in files_for_processing.chunks(FILE_PROCESS_SIZE) {
        let mut results: Vec<SearchIndexData> = chunk
            .iter()
            .map(|(file, scope)| -> LocalBoxFuture<'_, SearchIndexData> {
                Box::pin(async move {
                    let pages =
                        match try_pdf_compatible_document_pages(db, storage, scope, file).await {
                            Ok(value) => value,
                            Err(cause) => {
                                tracing::error!(?cause, "failed to re-create pdf index data pages");
                                // Fall back to indexing the file without page content
                                return SearchIndexData {
                                    ty: SearchIndexType::File,
                                    item_id: file.id,
                                    folder_id: file.folder_id,
                                    name: file.name.clone(),
                                    mime: Some(file.mime.clone()),
                                    content: None,
                                    created_at: file.created_at,
                                    created_by: file.created_by.clone(),
                                    document_box: scope.clone(),
                                    pages: None,
                                };
                            }
                        };

                    SearchIndexData {
                        ty: SearchIndexType::File,
                        item_id: file.id,
                        folder_id: file.folder_id,
                        name: file.name.clone(),
                        mime: Some(file.mime.clone()),
                        content: None,
                        created_at: file.created_at,
                        created_by: file.created_by.clone(),
                        document_box: scope.clone(),
                        pages: Some(pages),
                    }
                })
            })
            .collect::<FuturesUnordered<LocalBoxFuture<'_, SearchIndexData>>>()
            .collect()
            .await;

        data.append(&mut results);
    }

    Ok(data)
}

#[derive(Debug, Error)]
pub enum PdfCompatibleRebuildError {
    #[error("file is missing text content")]
    MissingTextContent,

    #[error(transparent)]
    Database(#[from] DbErr),

    #[error(transparent)]
    Storage(#[from] StorageLayerError),

    #[error(transparent)]
    InvalidTextContent(#[from] FromUtf8Error),
}

/// Attempts to obtain the [DocumentPage] collection for a PDF-compatible file
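/// by loading its previously extracted text content from storage and
/// splitting it back into individual pages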
pub async fn try_pdf_compatible_document_pages(
    db: &DbPool,
    storage: &TenantStorageLayer,
    scope: &DocumentBoxScopeRaw,
    file: &File,
) -> Result<Vec<DocumentPage>, PdfCompatibleRebuildError> {
    // Load the extracted text content for the file
    let text_file = GeneratedFile::find(db, scope, file.id, GeneratedFileType::TextContent)
        .await?
        .ok_or(PdfCompatibleRebuildError::MissingTextContent)?;

    tracing::debug!(?text_file, "loaded file generated text content");

    // Read the extracted text content file from S3
    let text_content = storage
        .get_file(&text_file.file_key)
        .await?
        .collect_bytes()
        .await
        .inspect_err(|cause| {
            tracing::error!(?cause, "failed to load text content bytes from s3 stream");
        })?;

    // Decode the text content as UTF-8
    let text_content = text_content.to_vec();
    let text_content =
        String::from_utf8(text_content).map_err(PdfCompatibleRebuildError::InvalidTextContent)?;

    // Split the content back into pages
    let pages = split_pdf_text_pages(&text_content);

    // Create the pages data (page numbers are zero-based)
    let pages = pages
        .into_iter()
        .enumerate()
        .map(|(page, content)| DocumentPage {
            page: page as u64,
            content: content.to_string(),
        })
        .collect();

    Ok(pages)
}