1use docbox_database::{
2 DbErr, DbPool, DbResult,
3 models::{
4 document_box::DocumentBoxScopeRaw,
5 file::{File, FileWithScope},
6 folder::Folder,
7 generated_file::{GeneratedFile, GeneratedFileType},
8 link::{Link, LinkWithScope},
9 },
10};
11use docbox_processing::{office::is_pdf_compatible, pdf::split_pdf_text_pages};
12use docbox_search::{
13 SearchError, TenantSearchIndex,
14 models::{DocumentPage, SearchIndexData, SearchIndexType},
15};
16use docbox_storage::{StorageLayerError, TenantStorageLayer};
17use futures::{StreamExt, future::BoxFuture, stream::FuturesUnordered};
18use std::{str::FromStr, string::FromUtf8Error};
19use thiserror::Error;
20
/// Errors that can occur while fully rebuilding a tenant's search index.
#[derive(Debug, Error)]
pub enum RebuildTenantIndexError {
    /// Database error while loading the links/folders/files to index
    #[error(transparent)]
    Database(#[from] DbErr),
    /// Search layer error while recreating the index or inserting entries
    #[error(transparent)]
    Search(#[from] SearchError),
    /// Failed to write the local `index_data.json` backup file
    /// (no `#[from]`: plain `std::io::Error` would be too broad to auto-convert)
    #[error(transparent)]
    WriteIndexData(std::io::Error),
    /// Failed to serialize the rebuilt index data to JSON for the backup file
    #[error(transparent)]
    SerializeIndexData(serde_json::Error),
}
32
33pub async fn rebuild_tenant_index(
36 db: &DbPool,
37 search: &TenantSearchIndex,
38 storage: &TenantStorageLayer,
39) -> Result<(), RebuildTenantIndexError> {
40 tracing::info!("started re-indexing tenant");
41
42 let index_data = recreate_search_index_data(db, storage).await?;
43 tracing::debug!("all data loaded: {}", index_data.len());
44
45 {
46 let serialized = serde_json::to_string(&index_data)
47 .map_err(RebuildTenantIndexError::SerializeIndexData)?;
48 tokio::fs::write("index_data.json", serialized)
49 .await
50 .map_err(RebuildTenantIndexError::WriteIndexData)?;
51 }
52
53 apply_rebuilt_tenant_index(search, index_data).await?;
54
55 Ok(())
56}
57
58pub async fn apply_rebuilt_tenant_index(
60 search: &TenantSearchIndex,
61 data: Vec<SearchIndexData>,
62) -> Result<(), SearchError> {
63 _ = search.create_index().await;
65
66 let mut iter = data.into_iter();
67
68 loop {
69 let chunk: Vec<_> = iter.by_ref().take(INDEX_CHUNK_SIZE).collect();
70 if chunk.is_empty() {
71 break;
72 }
73 search.add_data(chunk).await?;
74 }
75
76 Ok(())
77}
78
79pub async fn recreate_search_index_data(
81 db: &DbPool,
82 storage: &TenantStorageLayer,
83) -> DbResult<Vec<SearchIndexData>> {
84 let links = create_links_index_data(db).await?;
85 let folders = create_folders_index_data(db).await?;
86 let files = create_files_index_data(db, storage).await?;
87
88 let index_data = links
89 .into_iter()
90 .chain(folders.into_iter())
91 .chain(files.into_iter())
92 .collect::<Vec<SearchIndexData>>();
93
94 Ok(index_data)
95}
96
/// Number of entries submitted to the search index per `add_data` call
const INDEX_CHUNK_SIZE: usize = 5000;
/// Number of database rows fetched per page when loading links/folders/files
const DATABASE_PAGE_SIZE: u64 = 1000;
/// Number of files whose text content is processed concurrently per batch
const FILE_PROCESS_SIZE: usize = 500;
102
103pub async fn create_links_index_data(db: &DbPool) -> DbResult<Vec<SearchIndexData>> {
105 let mut page_index = 0;
106 let mut data = Vec::new();
107
108 loop {
109 let links = match Link::all(db, page_index * DATABASE_PAGE_SIZE, DATABASE_PAGE_SIZE).await {
110 Ok(value) => value,
111 Err(error) => {
112 tracing::error!(?error, ?page_index, "failed to load links page");
113 return Err(error);
114 }
115 };
116 let is_end = (links.len() as u64) < DATABASE_PAGE_SIZE;
117
118 for LinkWithScope { link, scope } in links {
119 data.push(SearchIndexData {
120 ty: SearchIndexType::Link,
121 item_id: link.id,
122 folder_id: link.folder_id,
123 name: link.name.to_string(),
124 mime: None,
125 content: Some(link.value.clone()),
126 pages: None,
127 created_at: link.created_at,
128 created_by: link.created_by.clone(),
129 document_box: scope.clone(),
130 })
131 }
132
133 if is_end {
134 break;
135 }
136
137 page_index += 1;
138 }
139
140 Ok(data)
141}
142
143pub async fn create_folders_index_data(db: &DbPool) -> DbResult<Vec<SearchIndexData>> {
145 let mut page_index = 0;
146 let mut data = Vec::new();
147
148 loop {
149 let folders =
150 match Folder::all_non_root(db, page_index * DATABASE_PAGE_SIZE, DATABASE_PAGE_SIZE)
151 .await
152 {
153 Ok(value) => value,
154 Err(error) => {
155 tracing::error!(?error, ?page_index, "failed to load folders page");
156 return Err(error);
157 }
158 };
159 let is_end = (folders.len() as u64) < DATABASE_PAGE_SIZE;
160
161 for folder in folders {
162 let folder_id = match folder.folder_id {
163 Some(value) => value,
164 None => continue,
166 };
167
168 data.push(SearchIndexData {
169 ty: SearchIndexType::Folder,
170 item_id: folder.id,
171 folder_id,
172 name: folder.name.to_string(),
173 mime: None,
174 content: None,
175 pages: None,
176 created_at: folder.created_at,
177 created_by: folder.created_by.clone(),
178 document_box: folder.document_box.clone(),
179 })
180 }
181
182 if is_end {
183 break;
184 }
185
186 page_index += 1;
187 }
188
189 Ok(data)
190}
191
192pub async fn create_files_index_data(
193 db: &DbPool,
194 storage: &TenantStorageLayer,
195) -> DbResult<Vec<SearchIndexData>> {
196 let mut page_index = 0;
197 let mut data = Vec::new();
198 let mut files_for_processing = Vec::new();
199
200 loop {
201 let files = match File::all(db, page_index * DATABASE_PAGE_SIZE, DATABASE_PAGE_SIZE).await {
202 Ok(value) => value,
203 Err(error) => {
204 tracing::error!(?error, ?page_index, "failed to load files page");
205 return Err(error);
206 }
207 };
208 let is_end = (files.len() as u64) < DATABASE_PAGE_SIZE;
209
210 for FileWithScope { file, scope } in files {
211 let mime = match mime::Mime::from_str(&file.mime) {
212 Ok(value) => value,
213 Err(error) => {
214 tracing::error!(?error, ?file, "file has an invalid mime type");
215 continue;
216 }
217 };
218
219 if file.encrypted || !is_pdf_compatible(&mime) {
220 data.push(SearchIndexData {
222 ty: SearchIndexType::File,
223 item_id: file.id,
224 folder_id: file.folder_id,
225 name: file.name,
226 mime: Some(file.mime),
227 content: None,
228 created_at: file.created_at,
229 created_by: file.created_by,
230 document_box: scope,
231 pages: None,
232 })
233 } else {
234 files_for_processing.push((file, scope));
236 }
237 }
238
239 if is_end {
240 break;
241 }
242
243 page_index += 1;
244 }
245
246 for chunk in files_for_processing.chunks(FILE_PROCESS_SIZE) {
247 let mut results: Vec<SearchIndexData> = chunk
248 .iter()
249 .map(|(file, scope)| -> BoxFuture<'_, SearchIndexData> {
250 Box::pin(async move {
251 let pages =
252 match try_pdf_compatible_document_pages(db, storage, scope, file).await {
253 Ok(value) => value,
254 Err(error) => {
255 tracing::error!(?error, "failed to re-create pdf index data pages");
256 return SearchIndexData {
257 ty: SearchIndexType::File,
258 item_id: file.id,
259 folder_id: file.folder_id,
260 name: file.name.clone(),
261 mime: Some(file.mime.clone()),
262 content: None,
263 created_at: file.created_at,
264 created_by: file.created_by.clone(),
265 document_box: scope.clone(),
266 pages: None,
267 };
268 }
269 };
270
271 SearchIndexData {
272 ty: SearchIndexType::File,
273 item_id: file.id,
274 folder_id: file.folder_id,
275 name: file.name.clone(),
276 mime: Some(file.mime.clone()),
277 content: None,
278 created_at: file.created_at,
279 created_by: file.created_by.clone(),
280 document_box: scope.clone(),
281 pages: Some(pages),
282 }
283 })
284 })
285 .collect::<FuturesUnordered<BoxFuture<'_, SearchIndexData>>>()
286 .collect()
287 .await;
288
289 data.append(&mut results);
290 }
291
292 Ok(data)
293}
294
/// Errors that can occur while rebuilding the document pages for a
/// single PDF-compatible file.
#[derive(Debug, Error)]
pub enum PdfCompatibleRebuildError {
    /// No generated text-content file exists for the file
    #[error("file is missing text content")]
    MissingTextContent,

    /// Database error while looking up the generated text-content file
    #[error(transparent)]
    Database(#[from] DbErr),

    /// Storage error while downloading the text content
    #[error(transparent)]
    Storage(#[from] StorageLayerError),

    /// The stored text content was not valid UTF-8
    #[error(transparent)]
    InvalidTextContent(#[from] FromUtf8Error),
}
309
310pub async fn try_pdf_compatible_document_pages(
312 db: &DbPool,
313 storage: &TenantStorageLayer,
314 scope: &DocumentBoxScopeRaw,
315 file: &File,
316) -> Result<Vec<DocumentPage>, PdfCompatibleRebuildError> {
317 let text_file = GeneratedFile::find(db, scope, file.id, GeneratedFileType::TextContent)
319 .await?
320 .ok_or(PdfCompatibleRebuildError::MissingTextContent)?;
321
322 tracing::debug!(?text_file, "loaded file generated text content");
323
324 let text_content = storage
326 .get_file(&text_file.file_key)
327 .await?
328 .collect_bytes()
329 .await
330 .inspect_err(|error| {
331 tracing::error!(?error, "failed to load pdf bytes from s3 stream");
332 })?;
333
334 let text_content = text_content.to_vec();
336 let text_content =
337 String::from_utf8(text_content).map_err(PdfCompatibleRebuildError::InvalidTextContent)?;
338
339 let pages = split_pdf_text_pages(&text_content);
341
342 let pages = pages
344 .into_iter()
345 .enumerate()
346 .map(|(page, content)| DocumentPage {
347 page: page as u64,
348 content: content.to_string(),
349 })
350 .collect();
351
352 Ok(pages)
353}