1use crate::{
14 SearchError, SearchIndex,
15 models::{
16 FileSearchRequest, FileSearchResults, FlattenedItemResult, PageResult, SearchIndexData,
17 SearchIndexType, SearchRequest, SearchResults, SearchScore,
18 },
19};
20use docbox_database::{
21 DatabasePoolCache, DbPool,
22 models::{
23 document_box::{DocumentBoxScopeRaw, DocumentBoxScopeRawRef},
24 file::FileId,
25 folder::FolderId,
26 search::{
27 DocboxSearchDateRange, DocboxSearchFilters, DocboxSearchMatchRanked,
28 count_search_file_pages, delete_file_pages_by_file_id, delete_file_pages_by_scope,
29 search_file_pages,
30 },
31 tenant::Tenant,
32 },
33 sqlx,
34};
35use itertools::Itertools;
36use serde::{Deserialize, Serialize};
37use std::{sync::Arc, vec};
38
39pub use error::{DatabaseSearchError, DatabaseSearchIndexFactoryError};
40
41pub mod error;
42
43#[derive(Debug, Clone, Deserialize, Serialize)]
44pub struct DatabaseSearchConfig {}
45
46impl DatabaseSearchConfig {
47 pub fn from_env() -> Result<Self, DatabaseSearchIndexFactoryError> {
48 Ok(Self {})
49 }
50}
51
52#[derive(Clone)]
53pub struct DatabaseSearchIndexFactory {
54 db: Arc<DatabasePoolCache>,
55}
56
57impl DatabaseSearchIndexFactory {
58 pub fn from_config(
59 db: Arc<DatabasePoolCache>,
60 _config: DatabaseSearchConfig,
61 ) -> Result<Self, DatabaseSearchIndexFactoryError> {
62 Ok(Self { db })
63 }
64
65 pub fn create_search_index(&self, tenant: &Tenant) -> DatabaseSearchIndex {
66 DatabaseSearchIndex {
67 db: IndexDatabaseSource::Pools {
68 db: self.db.clone(),
69 tenant: Arc::new(tenant.clone()),
70 },
71 }
72 }
73}
74
75#[derive(Clone)]
76pub struct DatabaseSearchIndex {
77 db: IndexDatabaseSource,
78}
79
80#[derive(Clone)]
81pub enum IndexDatabaseSource {
82 Pools {
84 db: Arc<DatabasePoolCache>,
86 tenant: Arc<Tenant>,
88 },
89 Pool(DbPool),
91}
92
/// Tenant database migrations required by the database search backend, as
/// `(name, sql)` pairs.
///
/// The SQL is embedded at compile time via `include_str!`. Migrations are
/// looked up by name in `apply_migration`, and `get_pending_migrations` reports
/// the names that have not been applied yet, in the order listed here.
const TENANT_MIGRATIONS: &[(&str, &str)] = &[
    (
        "m1_create_additional_indexes",
        include_str!("./migrations/m1_create_additional_indexes.sql"),
    ),
    (
        "m2_search_create_files_pages_table",
        include_str!("./migrations/m2_search_create_files_pages_table.sql"),
    ),
    (
        "m3_create_tsvector_columns",
        include_str!("./migrations/m3_create_tsvector_columns.sql"),
    ),
    (
        "m4_search_functions_and_types",
        include_str!("./migrations/m4_search_functions_and_types.sql"),
    ),
];
111
112impl DatabaseSearchIndex {
113 pub fn from_pool(db: DbPool) -> Self {
114 Self {
115 db: IndexDatabaseSource::Pool(db),
116 }
117 }
118
119 pub async fn acquire_db(&self) -> Result<DbPool, SearchError> {
120 match &self.db {
121 IndexDatabaseSource::Pools { db, tenant } => {
122 let db = db.get_tenant_pool(tenant).await.map_err(|error| {
123 tracing::error!(?error, "failed to acquire database for searching");
124 DatabaseSearchError::AcquireDatabase
125 })?;
126 Ok(db)
127 }
128 IndexDatabaseSource::Pool(db) => Ok(db.clone()),
129 }
130 }
131
132 pub async fn close(&self) {
134 if let IndexDatabaseSource::Pools { db, tenant } = &self.db {
135 db.close_tenant_pool(tenant).await;
136 }
137 }
138}
139
140impl SearchIndex for DatabaseSearchIndex {
141 async fn create_index(&self) -> Result<(), SearchError> {
142 Ok(())
144 }
145
146 async fn index_exists(&self) -> Result<bool, SearchError> {
147 Ok(false)
152 }
153
154 async fn delete_index(&self) -> Result<(), SearchError> {
155 Ok(())
157 }
158
159 async fn search_index(
160 &self,
161 scopes: &[DocumentBoxScopeRaw],
162 query: SearchRequest,
163 folder_children: Option<Vec<FolderId>>,
164 ) -> Result<crate::models::SearchResults, SearchError> {
165 let db = self.acquire_db().await?;
166
167 let query_text = query.query.unwrap_or_default();
168
169 let results: Vec<DocboxSearchMatchRanked> = sqlx::query_as(
170 r#"
171 SELECT *
172 FROM resolve_search_results(
173 $1,
174 plainto_tsquery('english', $1),
175 $2,
176 $3,
177 $4,
178 $5
179 )
180 LIMIT $6
181 OFFSET $7"#,
182 )
183 .bind(query_text)
184 .bind(DocboxSearchFilters {
185 document_boxes: scopes.to_vec(),
186 folder_children,
187 include_name: query.include_name,
188 include_content: query.include_content,
189 created_at: query.created_at.map(|value| DocboxSearchDateRange {
190 start: value.start,
191 end: value.end,
192 }),
193 created_by: query.created_by,
194 })
195 .bind(query.mime.map(|value| value.0.to_string()))
196 .bind(query.max_pages.unwrap_or(3) as i32)
197 .bind(query.pages_offset.unwrap_or_default() as i32)
198 .bind(query.size.unwrap_or(50) as i32)
199 .bind(query.offset.unwrap_or(0) as i32)
200 .fetch_all(&db)
201 .await
202 .map_err(|error| {
203 tracing::error!(?error, "failed to search index");
204 DatabaseSearchError::SearchIndex
205 })?;
206
207 let total_hits = results
208 .first()
209 .map(|result| result.total_count)
210 .unwrap_or_default();
211
212 let results = results
213 .into_iter()
214 .filter_map(|result| {
215 let rank = result.rank;
216 let result = result.search_match;
217
218 let item_ty = match result.item_type.as_str() {
219 "File" => SearchIndexType::File,
220 "Folder" => SearchIndexType::Folder,
221 "Link" => SearchIndexType::Link,
222 _ => return None,
224 };
225
226 Some(FlattenedItemResult {
227 item_ty,
228 item_id: result.item_id,
229 document_box: result.document_box,
230 page_matches: result
231 .page_matches
232 .into_iter()
233 .map(|result| PageResult {
234 matches: vec![result.matched],
235 page: result.page as u64,
236 })
237 .collect(),
238 total_hits: result.total_hits as u64,
239 score: SearchScore::Float(rank as f32),
240 name_match: result.name_match,
241 content_match: result.content_match,
242 })
243 })
244 .collect();
245
246 Ok(SearchResults {
247 total_hits: total_hits as u64,
248 results,
249 })
250 }
251
252 async fn search_index_file(
253 &self,
254 scope: &DocumentBoxScopeRaw,
255 file_id: FileId,
256 query: FileSearchRequest,
257 ) -> Result<crate::models::FileSearchResults, SearchError> {
258 let db = self.acquire_db().await?;
259 let query_text = query.query.unwrap_or_default();
260 let total_pages = count_search_file_pages(&db, scope, file_id, &query_text)
261 .await
262 .map_err(|error| {
263 tracing::error!(?error, "failed to count search file pages");
264 DatabaseSearchError::CountFilePages
265 })?;
266 let pages = search_file_pages(
267 &db,
268 scope,
269 file_id,
270 &query_text,
271 query.limit.unwrap_or(50) as i64,
272 query.offset.unwrap_or(0) as i64,
273 )
274 .await
275 .map_err(|error| {
276 tracing::error!(?error, "failed to search file pages");
277 DatabaseSearchError::SearchFilePages
278 })?;
279
280 Ok(FileSearchResults {
281 total_hits: total_pages.count as u64,
282 results: pages
283 .into_iter()
284 .map(|page| PageResult {
285 page: page.page as u64,
286 matches: vec![page.highlighted_content],
287 })
288 .collect(),
289 })
290 }
291
292 async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError> {
293 let db = self.acquire_db().await?;
294
295 for item in data {
296 let pages = match item.pages {
297 Some(value) => value,
298 None => continue,
300 };
301
302 if pages.is_empty() {
303 continue;
304 }
305
306 let values = pages
307 .iter()
308 .enumerate()
309 .map(|(index, _page)| format!("($1, ${}, ${})", 2 + index * 2, 3 + index * 2))
310 .join(",");
311
312 let query = format!(
313 r#"INSERT INTO "docbox_files_pages" ("file_id", "page", "content") VALUES {values}"#
314 );
315
316 let mut query = sqlx::query(&query)
317 .bind(item.item_id);
319
320 for page in pages {
321 query = query.bind(page.page as i32).bind(page.content);
322 }
323
324 if let Err(error) = query.execute(&db).await {
325 tracing::error!(?error, "failed to add search data");
326 return Err(DatabaseSearchError::AddData.into());
327 }
328 }
329
330 Ok(())
331 }
332
333 async fn update_data(
334 &self,
335 _item_id: uuid::Uuid,
336 _data: crate::models::UpdateSearchIndexData,
337 ) -> Result<(), SearchError> {
338 Ok(())
341 }
342
343 async fn delete_data(&self, id: uuid::Uuid) -> Result<(), SearchError> {
344 let db = self.acquire_db().await?;
345 delete_file_pages_by_file_id(&db, id)
346 .await
347 .map_err(|error| {
348 tracing::error!(?error, "failed to delete search data by id");
349 DatabaseSearchError::DeleteData
350 })?;
351 Ok(())
352 }
353
354 async fn delete_by_scope(&self, scope: DocumentBoxScopeRawRef<'_>) -> Result<(), SearchError> {
355 let db = self.acquire_db().await?;
356 delete_file_pages_by_scope(&db, scope)
357 .await
358 .map_err(|error| {
359 tracing::error!(?error, "failed to delete search data by scope");
360 DatabaseSearchError::DeleteData
361 })?;
362 Ok(())
363 }
364
365 async fn get_pending_migrations(
366 &self,
367 applied_names: Vec<String>,
368 ) -> Result<Vec<String>, SearchError> {
369 let pending = TENANT_MIGRATIONS
370 .iter()
371 .filter(|(migration_name, _migration)| {
372 !applied_names
374 .iter()
375 .any(|applied_migration| applied_migration.eq(migration_name))
376 })
377 .map(|(migration_name, _migration)| migration_name.to_string())
378 .collect();
379
380 Ok(pending)
381 }
382
383 async fn apply_migration(
384 &self,
385 _tenant: &docbox_database::models::tenant::Tenant,
386 _root_t: &mut docbox_database::DbTransaction<'_>,
387 t: &mut docbox_database::DbTransaction<'_>,
388 name: &str,
389 ) -> Result<(), SearchError> {
390 let (_, migration) = TENANT_MIGRATIONS
391 .iter()
392 .find(|(migration_name, _)| name.eq(*migration_name))
393 .ok_or(DatabaseSearchError::MigrationNotFound)?;
394
395 docbox_database::migrations::apply_migration(t, name, migration)
397 .await
398 .map_err(|error| {
399 tracing::error!(?error, "failed to apply migration");
400 DatabaseSearchError::ApplyMigration
401 })?;
402
403 Ok(())
404 }
405}