// docbox_search/database/mod.rs
//! # Database
//!
//! Database backed search, uses the postgres backend directly as a search index.
//!
//! When using this search type additional tables and indexes are added in order to store the
//! page text contents for files in the database, it also adds additional columns to
//! other tables to provide tsvector variants to allow fast full text search.
//!
//! This is a good backend to choose if you don't wish to have a dedicated search service
//! running to manage a copy of your data; you can instead store it alongside the metadata
//! inside your postgres database.
use crate::{
    SearchError, SearchIndex,
    models::{
        FileSearchRequest, FileSearchResults, FlattenedItemResult, PageResult, SearchIndexData,
        SearchIndexType, SearchRequest, SearchResults, SearchScore,
    },
};
use docbox_database::{
    DatabasePoolCache, DbPool,
    models::{
        document_box::{DocumentBoxScopeRaw, DocumentBoxScopeRawRef},
        file::FileId,
        folder::FolderId,
        search::{
            DocboxSearchDateRange, DocboxSearchFilters, DocboxSearchItemType,
            DocboxSearchMatchRanked, DocboxSearchPageMatch, SearchOptions,
            delete_file_pages_by_file_id, delete_file_pages_by_scope, search, search_file_pages,
        },
        tenant::Tenant,
    },
    sqlx,
};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, vec};

pub use error::{DatabaseSearchError, DatabaseSearchIndexFactoryError};

pub mod error;
mod migrations;
44/// Configuration for a database backend search index
45#[derive(Default, Debug, Clone, Deserialize, Serialize)]
46pub struct DatabaseSearchConfig {}
47
48impl DatabaseSearchConfig {
49    /// Load the configuration from environment variables
50    pub fn from_env() -> Result<Self, DatabaseSearchIndexFactoryError> {
51        Ok(Self {})
52    }
53}
54
55/// Factory for producing [DatabaseSearchIndex]'s for tenants
56#[derive(Clone)]
57pub struct DatabaseSearchIndexFactory {
58    db: Arc<DatabasePoolCache>,
59}
60
61impl DatabaseSearchIndexFactory {
62    // Create a [DatabaseSearchIndexFactory] from a `db` pool cache and
63    // the provided configuration `config`
64    pub fn from_config(
65        db: Arc<DatabasePoolCache>,
66        config: DatabaseSearchConfig,
67    ) -> Result<Self, DatabaseSearchIndexFactoryError> {
68        _ = config;
69
70        Ok(Self { db })
71    }
72
73    /// Create a search index for the provided `tenant`
74    pub fn create_search_index(&self, tenant: &Tenant) -> DatabaseSearchIndex {
75        DatabaseSearchIndex {
76            db: IndexDatabaseSource::Pools {
77                db: self.db.clone(),
78                tenant: Arc::new(tenant.clone()),
79            },
80        }
81    }
82}
83
/// Database backend search index
///
/// Executes all search operations directly against the tenant's postgres
/// database rather than an external search service.
#[derive(Clone)]
pub struct DatabaseSearchIndex {
    /// Underlying database source
    db: IndexDatabaseSource,
}
90
/// Source a [DatabaseSearchIndex] obtains its database pool from
#[derive(Clone)]
pub enum IndexDatabaseSource {
    /// Database source backed by the database pool cache
    Pools {
        /// The cache for producing databases
        db: Arc<DatabasePoolCache>,
        /// The tenant the search index is for
        tenant: Arc<Tenant>,
    },
    /// Singular database pool backed implementation for testing
    Pool(DbPool),
}
103
104impl DatabaseSearchIndex {
105    /// Create a search index from a db pool
106    pub fn from_pool(db: DbPool) -> Self {
107        Self {
108            db: IndexDatabaseSource::Pool(db),
109        }
110    }
111
112    /// Acquire a database connection
113    async fn acquire_db(&self) -> Result<DbPool, SearchError> {
114        match &self.db {
115            IndexDatabaseSource::Pools { db, tenant } => {
116                let db = db
117                    .get_tenant_pool(tenant)
118                    .await
119                    .inspect_err(|error| {
120                        tracing::error!(?error, "failed to acquire database for searching")
121                    })
122                    .map_err(DatabaseSearchError::AcquireDatabase)?;
123                Ok(db)
124            }
125            IndexDatabaseSource::Pool(db) => Ok(db.clone()),
126        }
127    }
128
129    /// Close the associated tenant database pool
130    pub async fn close(&self) {
131        match &self.db {
132            IndexDatabaseSource::Pools { db, tenant } => {
133                db.close_tenant_pool(tenant).await;
134            }
135            IndexDatabaseSource::Pool(pool) => {
136                pool.close().await;
137            }
138        }
139    }
140}
141
impl SearchIndex for DatabaseSearchIndex {
    async fn create_index(&self) -> Result<(), SearchError> {
        // No-op, creation is handled in the migration running phase
        Ok(())
    }

    async fn index_exists(&self) -> Result<bool, SearchError> {
        // Since "index_exists" is used by the management interface to detect
        // if the index has been already created, in this case we want to always
        // report false so that it doesn't think the index exists
        // (Even though if the tenant exists then the index exists)
        Ok(false)
    }

    async fn delete_index(&self) -> Result<(), SearchError> {
        // No-op
        Ok(())
    }

    /// Search across the provided document box `scopes`, optionally restricted
    /// to the given `folder_children`, returning flattened item results.
    #[tracing::instrument(skip(self))]
    async fn search_index(
        &self,
        scopes: &[DocumentBoxScopeRaw],
        query: SearchRequest,
        folder_children: Option<Vec<FolderId>>,
    ) -> Result<crate::models::SearchResults, SearchError> {
        let db = self.acquire_db().await?;

        // Missing query text becomes an empty string
        let query_text = query.query.unwrap_or_default();

        let mime = query.mime.map(|value| value.0.to_string());

        // Pagination defaults, widened to i64 for the SQL layer
        let max_pages = query.max_pages.unwrap_or(3) as i64;
        let pages_offset = query.pages_offset.unwrap_or_default() as i64;

        let limit = query.size.unwrap_or(50) as i64;
        let offset = query.offset.unwrap_or_default() as i64;

        let filters = DocboxSearchFilters {
            document_boxes: scopes.to_vec(),
            folder_children,
            include_name: query.include_name,
            include_content: query.include_content,
            created_at: query.created_at.map(|value| DocboxSearchDateRange {
                start: value.start,
                end: value.end,
            }),
            created_by: query.created_by,
            mime,
        };

        let results = search(
            &db,
            SearchOptions {
                query: query_text,
                filters,
                max_pages,
                pages_offset,
                limit,
                offset,
            },
        )
        .await
        .inspect_err(|error| tracing::error!(?error, "failed to search index"))
        .map_err(DatabaseSearchError::SearchIndex)?;

        // Read the overall hit count from the first result row
        // (0 when no rows matched)
        let total_hits = results
            .first()
            .map(|result| result.total_count)
            .unwrap_or_default() as u64;

        let results = results.into_iter().map(FlattenedItemResult::from).collect();

        Ok(SearchResults {
            total_hits,
            results,
        })
    }

    /// Search within the page contents of a single file identified by
    /// `file_id` inside the document box `scope`.
    #[tracing::instrument(skip(self))]
    async fn search_index_file(
        &self,
        scope: &DocumentBoxScopeRaw,
        file_id: FileId,
        query: FileSearchRequest,
    ) -> Result<crate::models::FileSearchResults, SearchError> {
        let db = self.acquire_db().await?;
        // Missing query text becomes an empty string
        let query_text = query.query.unwrap_or_default();

        // Pagination defaults, widened to i64 for the SQL layer
        let limit = query.limit.unwrap_or(50) as i64;
        let offset = query.offset.unwrap_or_default() as i64;

        let pages = search_file_pages(&db, scope, file_id, &query_text, limit, offset)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to search file pages");
                DatabaseSearchError::SearchFilePages
            })?;

        // Read the overall hit count from the first result row
        // (0 when no rows matched)
        let total_hits = pages
            .first()
            .map(|value| value.total_hits)
            .unwrap_or_default() as u64;

        Ok(FileSearchResults {
            total_hits,
            results: pages.into_iter().map(PageResult::from).collect(),
        })
    }

    /// Store page text content for the provided index entries.
    ///
    /// Only entries that have non-empty pages are written; everything else
    /// this backend needs already lives in the main database tables.
    #[tracing::instrument(skip_all)]
    async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError> {
        let db = self.acquire_db().await?;

        for item in data {
            let pages = match item.pages {
                Some(value) => value,
                // Skip anything without pages
                None => continue,
            };

            if pages.is_empty() {
                continue;
            }

            // Build a multi-row insert: $1 is the shared file id, then each
            // page row consumes two placeholders (page number, content),
            // i.e. row N uses ($1, $<2+2N>, $<3+2N>)
            let values = pages
                .iter()
                .enumerate()
                .map(|(index, _page)| format!("($1, ${}, ${})", 2 + index * 2, 3 + index * 2))
                .join(",");

            let query = format!(
                r#"INSERT INTO "docbox_files_pages" ("file_id", "page", "content") VALUES {values}"#
            );

            let mut query = sqlx::query(&query)
                // Shared amongst all values
                .bind(item.item_id);

            // Bind order must mirror the placeholder order generated above
            for page in pages {
                query = query.bind(page.page as i32).bind(page.content);
            }

            query
                .execute(&db)
                .await
                .inspect_err(|error| tracing::error!(?error, "failed to add search data"))
                .map_err(DatabaseSearchError::AddData)?;
        }

        Ok(())
    }

    async fn update_data(
        &self,
        _item_id: uuid::Uuid,
        _data: crate::models::UpdateSearchIndexData,
    ) -> Result<(), SearchError> {
        // No-op: Currently page data is never updated, and since this search implementation sources all other
        // data directly from the database it already has a copy of everything it needs so no changes need to be made
        Ok(())
    }

    /// Delete all indexed page content belonging to the file with `id`.
    #[tracing::instrument(skip(self))]
    async fn delete_data(&self, id: uuid::Uuid) -> Result<(), SearchError> {
        let db = self.acquire_db().await?;
        delete_file_pages_by_file_id(&db, id)
            .await
            .inspect_err(|error| tracing::error!(?error, "failed to delete search data by id"))
            .map_err(DatabaseSearchError::DeleteData)?;
        Ok(())
    }

    /// Delete all indexed page content within the document box `scope`.
    #[tracing::instrument(skip(self))]
    async fn delete_by_scope(&self, scope: DocumentBoxScopeRawRef<'_>) -> Result<(), SearchError> {
        let db = self.acquire_db().await?;
        delete_file_pages_by_scope(&db, scope)
            .await
            .inspect_err(|error| tracing::error!(?error, "failed to delete search data by scope"))
            .map_err(DatabaseSearchError::DeleteData)?;
        Ok(())
    }

    /// Determine which migrations have not yet been applied, given the
    /// `applied_names` already recorded.
    #[tracing::instrument(skip(self))]
    async fn get_pending_migrations(
        &self,
        applied_names: Vec<String>,
    ) -> Result<Vec<String>, SearchError> {
        let pending = migrations::get_pending_migrations(applied_names);
        Ok(pending)
    }

    /// Apply the migration named `name` within the tenant transaction `t`.
    #[tracing::instrument(skip(self))]
    async fn apply_migration(
        &self,
        _tenant: &docbox_database::models::tenant::Tenant,
        _root_t: &mut docbox_database::DbTransaction<'_>,
        t: &mut docbox_database::DbTransaction<'_>,
        name: &str,
    ) -> Result<(), SearchError> {
        migrations::apply_migration(t, name).await
    }
}
345
346impl From<DocboxSearchMatchRanked> for FlattenedItemResult {
347    fn from(value: DocboxSearchMatchRanked) -> Self {
348        let DocboxSearchMatchRanked {
349            search_match, rank, ..
350        } = value;
351        FlattenedItemResult {
352            item_ty: search_match.item_type.into(),
353            item_id: search_match.item_id,
354            document_box: search_match.document_box,
355            page_matches: search_match
356                .page_matches
357                .into_iter()
358                .map(PageResult::from)
359                .collect(),
360            total_hits: search_match.total_hits as u64,
361            score: SearchScore::Float(rank as f32),
362            name_match: search_match.name_match,
363            content_match: search_match.content_match,
364        }
365    }
366}
367
368impl From<DocboxSearchPageMatch> for PageResult {
369    fn from(value: DocboxSearchPageMatch) -> Self {
370        PageResult {
371            matches: vec![value.matched],
372            page: value.page as u64,
373        }
374    }
375}
376
377impl From<DocboxSearchItemType> for SearchIndexType {
378    fn from(value: DocboxSearchItemType) -> Self {
379        match value {
380            DocboxSearchItemType::File => SearchIndexType::File,
381            DocboxSearchItemType::Folder => SearchIndexType::Folder,
382            DocboxSearchItemType::Link => SearchIndexType::Link,
383        }
384    }
385}