use docbox_database::{
    DbErr, DbPool, DbResult,
    models::{
        document_box::DocumentBoxScopeRaw,
        file::{File, FileWithScope},
        folder::Folder,
        generated_file::{GeneratedFile, GeneratedFileType},
        link::{Link, LinkWithScope},
    },
};
use docbox_processing::{office::is_pdf_compatible, pdf::split_pdf_text_pages};
use docbox_search::{
    SearchError, TenantSearchIndex,
    models::{DocumentPage, SearchIndexData, SearchIndexType},
};
use docbox_storage::{StorageLayerError, TenantStorageLayer};
use futures::{StreamExt, future::LocalBoxFuture, stream::FuturesUnordered};
use itertools::Itertools;
use std::{str::FromStr, string::FromUtf8Error};
use thiserror::Error;

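/// Errors that can occur while rebuilding a tenant search index.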
#[derive(Debug, Error)]
pub enum RebuildTenantIndexError {
    #[error(transparent)]
    Database(#[from] DbErr),
    #[error(transparent)]
    Search(#[from] SearchError),
    #[error(transparent)]
    WriteIndexData(std::io::Error),
    #[error(transparent)]
    SerializeIndexData(serde_json::Error),
}

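/// Rebuilds the search index for a tenant from its database and storage
/// layer, snapshotting the collected index data to disk before applying it.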
pub async fn rebuild_tenant_index(
    db: &DbPool,
    search: &TenantSearchIndex,
    storage: &TenantStorageLayer,
) -> Result<(), RebuildTenantIndexError> {
    tracing::info!("started re-indexing tenant");

    let index_data = recreate_search_index_data(db, storage).await?;
    tracing::debug!("all data loaded: {}", index_data.len());

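    // Snapshot the collected index data to disk so it can be inspected, or
    // re-applied, if pushing it to the search index fails part way through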
    {
        let serialized = serde_json::to_string(&index_data)
            .map_err(RebuildTenantIndexError::SerializeIndexData)?;
        tokio::fs::write("index_data.json", serialized)
            .await
            .map_err(RebuildTenantIndexError::WriteIndexData)?;
    }

    apply_rebuilt_tenant_index(search, index_data).await?;

    Ok(())
}

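/// Recreates the search index then pushes `data` into it in chunks of
/// [`INDEX_CHUNK_SIZE`] entries.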
pub async fn apply_rebuilt_tenant_index(
    search: &TenantSearchIndex,
    data: Vec<SearchIndexData>,
) -> Result<(), SearchError> {
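    // Ensure the index exists before adding data; the result is deliberately
    // discarded since the index may already be present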
    _ = search.create_index().await;

    let index_data_chunks = data.into_iter().chunks(INDEX_CHUNK_SIZE);
    let index_data_chunks = index_data_chunks.into_iter();

    for data in index_data_chunks {
        let chunk = data.collect::<Vec<_>>();
        search.add_data(chunk).await?;
    }

    Ok(())
}

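/// Collects the search index data for every link, folder, and file stored
/// for the tenant.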
pub async fn recreate_search_index_data(
    db: &DbPool,
    storage: &TenantStorageLayer,
) -> DbResult<Vec<SearchIndexData>> {
    let links = create_links_index_data(db).await?;
    let folders = create_folders_index_data(db).await?;
    let files = create_files_index_data(db, storage).await?;

    let index_data = links
        .into_iter()
        .chain(folders)
        .chain(files)
        .collect::<Vec<SearchIndexData>>();

    Ok(index_data)
}

/// Number of entries to send to the search index per request
const INDEX_CHUNK_SIZE: usize = 5000;
/// Number of rows to request from the database per page
const DATABASE_PAGE_SIZE: u64 = 1000;
/// Number of files to load text content for within a single batch
const FILE_PROCESS_SIZE: usize = 500;

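/// Builds the search index data for every link, paging through the database
/// [`DATABASE_PAGE_SIZE`] rows at a time.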
pub async fn create_links_index_data(db: &DbPool) -> DbResult<Vec<SearchIndexData>> {
    let mut page_index = 0;
    let mut data = Vec::new();

    loop {
        let links = match Link::all(db, page_index * DATABASE_PAGE_SIZE, DATABASE_PAGE_SIZE).await {
            Ok(value) => value,
            Err(error) => {
                tracing::error!(?error, ?page_index, "failed to load links page");
                return Err(error);
            }
        };
        let is_end = (links.len() as u64) < DATABASE_PAGE_SIZE;

        for LinkWithScope { link, scope } in links {
            data.push(SearchIndexData {
                ty: SearchIndexType::Link,
                item_id: link.id,
                folder_id: link.folder_id,
                name: link.name.to_string(),
                mime: None,
                content: Some(link.value.clone()),
                pages: None,
                created_at: link.created_at,
                created_by: link.created_by.clone(),
                document_box: scope.clone(),
            })
        }

        if is_end {
            break;
        }

        page_index += 1;
    }

    Ok(data)
}

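/// Builds the search index data for every non-root folder, paging through
/// the database [`DATABASE_PAGE_SIZE`] rows at a time.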
pub async fn create_folders_index_data(db: &DbPool) -> DbResult<Vec<SearchIndexData>> {
    let mut page_index = 0;
    let mut data = Vec::new();

    loop {
        let folders =
            match Folder::all_non_root(db, page_index * DATABASE_PAGE_SIZE, DATABASE_PAGE_SIZE)
                .await
            {
                Ok(value) => value,
                Err(error) => {
                    tracing::error!(?error, ?page_index, "failed to load folders page");
                    return Err(error);
                }
            };
        let is_end = (folders.len() as u64) < DATABASE_PAGE_SIZE;

        for folder in folders {
            // Skip folders without a parent (root folders are not indexed)
            let folder_id = match folder.folder_id {
                Some(value) => value,
                None => continue,
            };

            data.push(SearchIndexData {
                ty: SearchIndexType::Folder,
                item_id: folder.id,
                folder_id,
                name: folder.name.to_string(),
                mime: None,
                content: None,
                pages: None,
                created_at: folder.created_at,
                created_by: folder.created_by.clone(),
                document_box: folder.document_box.clone(),
            })
        }

        if is_end {
            break;
        }

        page_index += 1;
    }

    Ok(data)
}

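/// Builds the search index data for every file. Encrypted files and files
/// that are not PDF-compatible are indexed without page content; the rest
/// have their previously extracted text content loaded and split into pages.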
pub async fn create_files_index_data(
    db: &DbPool,
    storage: &TenantStorageLayer,
) -> DbResult<Vec<SearchIndexData>> {
    let mut page_index = 0;
    let mut data = Vec::new();
    let mut files_for_processing = Vec::new();

    loop {
        let files = match File::all(db, page_index * DATABASE_PAGE_SIZE, DATABASE_PAGE_SIZE).await {
            Ok(value) => value,
            Err(error) => {
                tracing::error!(?error, ?page_index, "failed to load files page");
                return Err(error);
            }
        };
        let is_end = (files.len() as u64) < DATABASE_PAGE_SIZE;

        for FileWithScope { file, scope } in files {
            let mime = match mime::Mime::from_str(&file.mime) {
                Ok(value) => value,
                Err(error) => {
                    tracing::error!(?error, ?file, "file has an invalid mime type");
                    continue;
                }
            };

            if file.encrypted || !is_pdf_compatible(&mime) {
                data.push(SearchIndexData {
                    ty: SearchIndexType::File,
                    item_id: file.id,
                    folder_id: file.folder_id,
                    name: file.name,
                    mime: Some(file.mime),
                    content: None,
                    created_at: file.created_at,
                    created_by: file.created_by,
                    document_box: scope,
                    pages: None,
                })
            } else {
                files_for_processing.push((file, scope));
            }
        }

        if is_end {
            break;
        }

        page_index += 1;
    }

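    // Load the extracted text for PDF-compatible files in bounded batches,
    // resolving each batch of lookups concurrently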
    for chunk in files_for_processing.chunks(FILE_PROCESS_SIZE) {
        let mut results: Vec<SearchIndexData> = chunk
            .iter()
            .map(|(file, scope)| -> LocalBoxFuture<'_, SearchIndexData> {
                Box::pin(async move {
                    // Fall back to indexing the file without page content if
                    // its text content cannot be recovered
                    let pages =
                        match try_pdf_compatible_document_pages(db, storage, scope, file).await {
                            Ok(value) => Some(value),
                            Err(cause) => {
                                tracing::error!(?cause, "failed to re-create pdf index data pages");
                                None
                            }
                        };

                    SearchIndexData {
                        ty: SearchIndexType::File,
                        item_id: file.id,
                        folder_id: file.folder_id,
                        name: file.name.clone(),
                        mime: Some(file.mime.clone()),
                        content: None,
                        created_at: file.created_at,
                        created_by: file.created_by.clone(),
                        document_box: scope.clone(),
                        pages,
                    }
                })
            })
            .collect::<FuturesUnordered<LocalBoxFuture<'_, SearchIndexData>>>()
            .collect()
            .await;

        data.append(&mut results);
    }

    Ok(data)
}

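/// Errors that can occur while rebuilding the page content for a
/// PDF-compatible file.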
#[derive(Debug, Error)]
pub enum PdfCompatibleRebuildError {
    #[error("file is missing text content")]
    MissingTextContent,

    #[error(transparent)]
    Database(#[from] DbErr),

    #[error(transparent)]
    Storage(#[from] StorageLayerError),

    #[error(transparent)]
    InvalidTextContent(#[from] FromUtf8Error),
}

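/// Loads the previously generated text content for `file` from storage and
/// splits it into per-page [`DocumentPage`] entries.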
pub async fn try_pdf_compatible_document_pages(
    db: &DbPool,
    storage: &TenantStorageLayer,
    scope: &DocumentBoxScopeRaw,
    file: &File,
) -> Result<Vec<DocumentPage>, PdfCompatibleRebuildError> {
    let text_file = GeneratedFile::find(db, scope, file.id, GeneratedFileType::TextContent)
        .await?
        .ok_or(PdfCompatibleRebuildError::MissingTextContent)?;

    tracing::debug!(?text_file, "loaded file generated text content");

    let text_content = storage
        .get_file(&text_file.file_key)
        .await?
        .collect_bytes()
        .await
        .inspect_err(|cause| {
            tracing::error!(?cause, "failed to load pdf bytes from s3 stream");
        })?;

    let text_content = text_content.to_vec();
    let text_content =
        String::from_utf8(text_content).map_err(PdfCompatibleRebuildError::InvalidTextContent)?;

    let pages = split_pdf_text_pages(&text_content);

    let pages = pages
        .into_iter()
        .enumerate()
        .map(|(page, content)| DocumentPage {
            page: page as u64,
            content: content.to_string(),
        })
        .collect();

    Ok(pages)
}