Skip to main content

docbox_search/database/
mod.rs

1//! # Database
2//!
3//! Database backed search, uses the postgres backend directly as a search index.
4//!
//! When using this search type, additional tables and indexes are added in order to store the
//! page text contents for files in the database; it also adds additional columns to
//! other tables to provide tsvector variants that allow fast full text search.
8//!
//! This is a good backend to choose if you don't wish to have a dedicated search service
//! running to manage a copy of your data; you can instead store it alongside the metadata
//! inside your postgres database.
12
13use crate::{
14    SearchError, SearchIndex,
15    models::{
16        FileSearchRequest, FileSearchResults, FlattenedItemResult, PageResult, SearchIndexData,
17        SearchIndexType, SearchRequest, SearchResults, SearchScore,
18    },
19};
20use docbox_database::{
21    DatabasePoolCache, DbPool,
22    models::{
23        document_box::{DocumentBoxScopeRaw, DocumentBoxScopeRawRef},
24        file::FileId,
25        folder::FolderId,
26        search::{
27            DocboxSearchDateRange, DocboxSearchFilters, DocboxSearchMatchRanked,
28            count_search_file_pages, delete_file_pages_by_file_id, delete_file_pages_by_scope,
29            search_file_pages,
30        },
31        tenant::Tenant,
32    },
33    sqlx,
34};
35use itertools::Itertools;
36use serde::{Deserialize, Serialize};
37use std::{sync::Arc, vec};
38
39pub use error::{DatabaseSearchError, DatabaseSearchIndexFactoryError};
40
41pub mod error;
42
/// Configuration for the database backed search index.
///
/// Currently empty: this backend needs no settings beyond the database
/// connection itself, but the type exists so configuration can be added
/// later without changing the factory interface.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DatabaseSearchConfig {}
45
46impl DatabaseSearchConfig {
47    pub fn from_env() -> Result<Self, DatabaseSearchIndexFactoryError> {
48        Ok(Self {})
49    }
50}
51
/// Factory that produces per-tenant [`DatabaseSearchIndex`] instances.
#[derive(Clone)]
pub struct DatabaseSearchIndexFactory {
    /// Cache used to obtain tenant specific database pools
    db: Arc<DatabasePoolCache>,
}
56
57impl DatabaseSearchIndexFactory {
58    pub fn from_config(
59        db: Arc<DatabasePoolCache>,
60        _config: DatabaseSearchConfig,
61    ) -> Result<Self, DatabaseSearchIndexFactoryError> {
62        Ok(Self { db })
63    }
64
65    pub fn create_search_index(&self, tenant: &Tenant) -> DatabaseSearchIndex {
66        DatabaseSearchIndex {
67            db: IndexDatabaseSource::Pools {
68                db: self.db.clone(),
69                tenant: Arc::new(tenant.clone()),
70            },
71        }
72    }
73}
74
/// Search index implementation backed directly by the postgres database.
#[derive(Clone)]
pub struct DatabaseSearchIndex {
    /// Where to obtain the database pool that queries run against
    db: IndexDatabaseSource,
}
79
/// Source the search index resolves its database pool from.
#[derive(Clone)]
pub enum IndexDatabaseSource {
    /// Database source backed by the database pool cache
    Pools {
        /// The cache for producing databases
        db: Arc<DatabasePoolCache>,
        /// The tenant the search index is for
        tenant: Arc<Tenant>,
    },
    /// Singular database pool backed implementation for testing
    Pool(DbPool),
}
92
/// Tenant database migrations required by this search backend, as ordered
/// (migration name, SQL contents) pairs. The names are matched against the
/// already-applied list in `get_pending_migrations` and looked up again in
/// `apply_migration`, so they must remain stable.
const TENANT_MIGRATIONS: &[(&str, &str)] = &[
    (
        "m1_create_additional_indexes",
        include_str!("./migrations/m1_create_additional_indexes.sql"),
    ),
    (
        "m2_search_create_files_pages_table",
        include_str!("./migrations/m2_search_create_files_pages_table.sql"),
    ),
    (
        "m3_create_tsvector_columns",
        include_str!("./migrations/m3_create_tsvector_columns.sql"),
    ),
    (
        "m4_search_functions_and_types",
        include_str!("./migrations/m4_search_functions_and_types.sql"),
    ),
];
111
112impl DatabaseSearchIndex {
113    pub fn from_pool(db: DbPool) -> Self {
114        Self {
115            db: IndexDatabaseSource::Pool(db),
116        }
117    }
118
119    pub async fn acquire_db(&self) -> Result<DbPool, SearchError> {
120        match &self.db {
121            IndexDatabaseSource::Pools { db, tenant } => {
122                let db = db.get_tenant_pool(tenant).await.map_err(|error| {
123                    tracing::error!(?error, "failed to acquire database for searching");
124                    DatabaseSearchError::AcquireDatabase
125                })?;
126                Ok(db)
127            }
128            IndexDatabaseSource::Pool(db) => Ok(db.clone()),
129        }
130    }
131
132    /// Close the associated tenant database pool
133    pub async fn close(&self) {
134        if let IndexDatabaseSource::Pools { db, tenant } = &self.db {
135            db.close_tenant_pool(tenant).await;
136        }
137    }
138}
139
impl SearchIndex for DatabaseSearchIndex {
    /// No-op: the backing tables and indexes are created by the tenant
    /// migrations in `TENANT_MIGRATIONS`, not at index-creation time.
    async fn create_index(&self) -> Result<(), SearchError> {
        // No-op, creation is handled in the migration running phase
        Ok(())
    }

    /// Always reports `false` (see comment below for why).
    async fn index_exists(&self) -> Result<bool, SearchError> {
        // Since "index_exists" is used by the management interface to detect
        // if the index has been already created, in this case we want to always
        // report false so that it doesn't think the index exists
        // (Even though if the tenant exists then the index exists)
        Ok(false)
    }

    /// No-op: index data lives in tenant tables and is removed with the tenant.
    async fn delete_index(&self) -> Result<(), SearchError> {
        // No-op
        Ok(())
    }

    /// Performs a full text search across the provided document box scopes.
    ///
    /// Delegates ranking and matching to the `resolve_search_results` SQL
    /// function (presumably installed by the `m4_search_functions_and_types`
    /// migration — confirm against the migration file) and maps the ranked
    /// rows into [`SearchResults`].
    async fn search_index(
        &self,
        scopes: &[DocumentBoxScopeRaw],
        query: SearchRequest,
        folder_children: Option<Vec<FolderId>>,
    ) -> Result<crate::models::SearchResults, SearchError> {
        let db = self.acquire_db().await?;

        // A missing query is treated as an empty search string
        let query_text = query.query.unwrap_or_default();

        // Bind order below must match the placeholders:
        // $1 query text, $2 filters, $3 mime, $4 max pages per item,
        // $5 pages offset, $6 result limit, $7 result offset
        let results: Vec<DocboxSearchMatchRanked> = sqlx::query_as(
            r#"
    SELECT *
    FROM resolve_search_results(
        $1,
        plainto_tsquery('english', $1),
        $2,
        $3,
        $4,
        $5
    )
    LIMIT $6
    OFFSET $7"#,
        )
        .bind(query_text)
        .bind(DocboxSearchFilters {
            document_boxes: scopes.to_vec(),
            folder_children,
            include_name: query.include_name,
            include_content: query.include_content,
            created_at: query.created_at.map(|value| DocboxSearchDateRange {
                start: value.start,
                end: value.end,
            }),
            created_by: query.created_by,
        })
        .bind(query.mime.map(|value| value.0.to_string()))
        // Defaults: 3 page matches per item, 50 results per search
        .bind(query.max_pages.unwrap_or(3) as i32)
        .bind(query.pages_offset.unwrap_or_default() as i32)
        .bind(query.size.unwrap_or(50) as i32)
        .bind(query.offset.unwrap_or(0) as i32)
        .fetch_all(&db)
        .await
        .map_err(|error| {
            tracing::error!(?error, "failed to search index");
            DatabaseSearchError::SearchIndex
        })?;

        // Every row carries the same total_count, so read it from the first.
        // NOTE(review): when OFFSET pages past the final row this reports 0
        // instead of the true total — confirm callers accept that.
        let total_hits = results
            .first()
            .map(|result| result.total_count)
            .unwrap_or_default();

        let results = results
            .into_iter()
            .filter_map(|result| {
                let rank = result.rank;
                let result = result.search_match;

                // Map the textual item type from SQL back onto the enum
                let item_ty = match result.item_type.as_str() {
                    "File" => SearchIndexType::File,
                    "Folder" => SearchIndexType::Folder,
                    "Link" => SearchIndexType::Link,
                    // Unknown type error, should never occur but must be handled
                    _ => return None,
                };

                Some(FlattenedItemResult {
                    item_ty,
                    item_id: result.item_id,
                    document_box: result.document_box,
                    page_matches: result
                        .page_matches
                        .into_iter()
                        .map(|result| PageResult {
                            matches: vec![result.matched],
                            page: result.page as u64,
                        })
                        .collect(),
                    total_hits: result.total_hits as u64,
                    score: SearchScore::Float(rank as f32),
                    name_match: result.name_match,
                    content_match: result.content_match,
                })
            })
            .collect();

        Ok(SearchResults {
            total_hits: total_hits as u64,
            results,
        })
    }

    /// Searches the stored page text of a single file within a document box
    /// scope, returning highlighted page matches plus the total match count.
    async fn search_index_file(
        &self,
        scope: &DocumentBoxScopeRaw,
        file_id: FileId,
        query: FileSearchRequest,
    ) -> Result<crate::models::FileSearchResults, SearchError> {
        let db = self.acquire_db().await?;
        let query_text = query.query.unwrap_or_default();
        // Total count is queried separately from the paginated page results
        let total_pages = count_search_file_pages(&db, scope, file_id, &query_text)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to count search file pages");
                DatabaseSearchError::CountFilePages
            })?;
        let pages = search_file_pages(
            &db,
            scope,
            file_id,
            &query_text,
            // Default page size of 50 when no limit is requested
            query.limit.unwrap_or(50) as i64,
            query.offset.unwrap_or(0) as i64,
        )
        .await
        .map_err(|error| {
            tracing::error!(?error, "failed to search file pages");
            DatabaseSearchError::SearchFilePages
        })?;

        Ok(FileSearchResults {
            total_hits: total_pages.count as u64,
            results: pages
                .into_iter()
                .map(|page| PageResult {
                    page: page.page as u64,
                    matches: vec![page.highlighted_content],
                })
                .collect(),
        })
    }

    /// Stores page text content for the provided items so it becomes
    /// searchable. Items without pages are skipped entirely: all other
    /// searchable data already lives in the main database tables.
    async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError> {
        let db = self.acquire_db().await?;

        for item in data {
            let pages = match item.pages {
                Some(value) => value,
                // Skip anything without pages
                None => continue,
            };

            if pages.is_empty() {
                continue;
            }

            // Build a multi-row VALUES clause: $1 is the shared file id,
            // then each page contributes a ($1, $page, $content) tuple using
            // placeholder pairs $2/$3, $4/$5, ...
            let values = pages
                .iter()
                .enumerate()
                .map(|(index, _page)| format!("($1, ${}, ${})", 2 + index * 2, 3 + index * 2))
                .join(",");

            let query = format!(
                r#"INSERT INTO "docbox_files_pages" ("file_id", "page", "content") VALUES {values}"#
            );

            let mut query = sqlx::query(&query)
                // Shared amongst all values
                .bind(item.item_id);

            // Bind order must match the ($1, $n, $n+1) tuples built above
            for page in pages {
                query = query.bind(page.page as i32).bind(page.content);
            }

            if let Err(error) = query.execute(&db).await {
                tracing::error!(?error, "failed to add search data");
                return Err(DatabaseSearchError::AddData.into());
            }
        }

        Ok(())
    }

    async fn update_data(
        &self,
        _item_id: uuid::Uuid,
        _data: crate::models::UpdateSearchIndexData,
    ) -> Result<(), SearchError> {
        // No-op: Currently page data is never updated, and since this search implementation sources all other
        // data directly from the database it already has a copy of everything it needs so no changes need to be made
        Ok(())
    }

    /// Removes the stored page content for the file with the given id.
    async fn delete_data(&self, id: uuid::Uuid) -> Result<(), SearchError> {
        let db = self.acquire_db().await?;
        delete_file_pages_by_file_id(&db, id)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to delete search data by id");
                DatabaseSearchError::DeleteData
            })?;
        Ok(())
    }

    /// Removes all stored page content within a document box scope.
    async fn delete_by_scope(&self, scope: DocumentBoxScopeRawRef<'_>) -> Result<(), SearchError> {
        let db = self.acquire_db().await?;
        delete_file_pages_by_scope(&db, scope)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to delete search data by scope");
                DatabaseSearchError::DeleteData
            })?;
        Ok(())
    }

    /// Returns the names of `TENANT_MIGRATIONS` entries not present in
    /// `applied_names`, preserving their declaration order.
    async fn get_pending_migrations(
        &self,
        applied_names: Vec<String>,
    ) -> Result<Vec<String>, SearchError> {
        let pending = TENANT_MIGRATIONS
            .iter()
            .filter(|(migration_name, _migration)| {
                // Skip already applied migrations
                !applied_names
                    .iter()
                    .any(|applied_migration| applied_migration.eq(migration_name))
            })
            .map(|(migration_name, _migration)| migration_name.to_string())
            .collect();

        Ok(pending)
    }

    /// Applies the named migration from `TENANT_MIGRATIONS` within the
    /// provided tenant transaction. Fails with `MigrationNotFound` when the
    /// name doesn't match any known migration, or `ApplyMigration` when the
    /// SQL itself fails (the underlying error is logged).
    async fn apply_migration(
        &self,
        _tenant: &docbox_database::models::tenant::Tenant,
        _root_t: &mut docbox_database::DbTransaction<'_>,
        t: &mut docbox_database::DbTransaction<'_>,
        name: &str,
    ) -> Result<(), SearchError> {
        let (_, migration) = TENANT_MIGRATIONS
            .iter()
            .find(|(migration_name, _)| name.eq(*migration_name))
            .ok_or(DatabaseSearchError::MigrationNotFound)?;

        // Apply the migration
        docbox_database::migrations::apply_migration(t, name, migration)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to apply migration");
                DatabaseSearchError::ApplyMigration
            })?;

        Ok(())
    }
}
405}