Skip to main content

docbox_search/database/
mod.rs

//! # Database
//!
//! Database backed search, uses the postgres backend directly as a search index.
//!
//! When using this search type additional tables and indexes are added in order to store the
//! page text contents for files in the database, it also adds additional columns to
//! other tables to provide tsvector variants to allow fast full text search.
//!
//! This is a good backend to choose if you don't wish to have a dedicated search service
//! running to manage a copy of your data, you can instead store it alongside the metadata
//! inside your postgres database.
12
13use crate::{
14    SearchError, SearchIndex,
15    models::{
16        FileSearchRequest, FileSearchResults, FlattenedItemResult, PageResult, SearchIndexData,
17        SearchIndexType, SearchRequest, SearchResults, SearchScore,
18    },
19};
20use docbox_database::{
21    DatabasePoolCache, DbPool,
22    models::{
23        document_box::{DocumentBoxScopeRaw, DocumentBoxScopeRawRef},
24        file::FileId,
25        folder::FolderId,
26        search::{
27            DbSearchResult, count_search_file_pages, delete_file_pages_by_file_id,
28            delete_file_pages_by_scope, search_file_pages,
29        },
30        tenant::Tenant,
31    },
32    sqlx,
33};
34use itertools::Itertools;
35use serde::{Deserialize, Serialize};
36use std::{sync::Arc, vec};
37
38pub use error::{DatabaseSearchError, DatabaseSearchIndexFactoryError};
39
40pub mod error;
41
/// Configuration for the database backed search index.
///
/// Currently empty: this backend needs no configuration beyond the tenant
/// database connection itself, but the type is kept so config handling stays
/// uniform across search backends.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DatabaseSearchConfig {}
44
45impl DatabaseSearchConfig {
46    pub fn from_env() -> Result<Self, DatabaseSearchIndexFactoryError> {
47        Ok(Self {})
48    }
49}
50
/// Factory producing [`DatabaseSearchIndex`] instances for individual tenants.
#[derive(Clone)]
pub struct DatabaseSearchIndexFactory {
    // Shared cache of per-tenant database pools, handed to every created index
    db: Arc<DatabasePoolCache>,
}
55
56impl DatabaseSearchIndexFactory {
57    pub fn from_config(
58        db: Arc<DatabasePoolCache>,
59        _config: DatabaseSearchConfig,
60    ) -> Result<Self, DatabaseSearchIndexFactoryError> {
61        Ok(Self { db })
62    }
63
64    pub fn create_search_index(&self, tenant: &Tenant) -> DatabaseSearchIndex {
65        DatabaseSearchIndex {
66            db: self.db.clone(),
67            tenant: Arc::new(tenant.clone()),
68        }
69    }
70}
71
/// Search index implementation backed directly by the tenant's postgres
/// database (see the module docs for the schema additions it relies on).
#[derive(Clone)]
pub struct DatabaseSearchIndex {
    // Pool cache used to lazily acquire this tenant's database pool
    db: Arc<DatabasePoolCache>,
    // Tenant this index searches on behalf of
    tenant: Arc<Tenant>,
}
77
/// Tenant database migrations required by this search backend, in apply
/// order. Each entry is `(migration_name, migration_sql)`; the name is what
/// gets recorded as "applied" and matched in `get_pending_migrations` /
/// `apply_migration` below.
const TENANT_MIGRATIONS: &[(&str, &str)] = &[
    (
        "m1_create_additional_indexes",
        include_str!("./migrations/m1_create_additional_indexes.sql"),
    ),
    (
        "m2_search_create_files_pages_table",
        include_str!("./migrations/m2_search_create_files_pages_table.sql"),
    ),
    (
        "m3_create_tsvector_columns",
        include_str!("./migrations/m3_create_tsvector_columns.sql"),
    ),
];
92
93impl DatabaseSearchIndex {
94    pub async fn acquire_db(&self) -> Result<DbPool, SearchError> {
95        let db = self
96            .db
97            .get_tenant_pool(&self.tenant)
98            .await
99            .map_err(|error| {
100                tracing::error!(?error, "failed to acquire database for searching");
101                DatabaseSearchError::AcquireDatabase
102            })?;
103        Ok(db)
104    }
105
106    /// Close the associated tenant database pool
107    pub async fn close(&self) {
108        self.db.close_tenant_pool(&self.tenant).await;
109    }
110}
111
impl SearchIndex for DatabaseSearchIndex {
    /// No-op for this backend: the search tables are created by the tenant
    /// migrations in `TENANT_MIGRATIONS`, not by an explicit create step.
    async fn create_index(&self) -> Result<(), SearchError> {
        // No-op, creation is handled in the migration running phase
        Ok(())
    }

    /// Always reports `false` — deliberate, see the comment below.
    async fn index_exists(&self) -> Result<bool, SearchError> {
        // Since "index_exists" is used by the management interface to detect
        // if the index has been already created, in this case we want to always
        // report false so that it doesn't think the index exists
        // (Even though if the tenant exists then the index exists)
        Ok(false)
    }

    /// No-op for this backend: index storage lives inside the tenant
    /// database itself, there is nothing separate to tear down here.
    async fn delete_index(&self) -> Result<(), SearchError> {
        // No-op
        Ok(())
    }

    /// Search links, folders, and files across the provided document box
    /// `scopes` with a single SQL query that combines postgres full text
    /// search (tsvector/tsquery) with ILIKE substring matching, ranks the
    /// combined results, and paginates them.
    ///
    /// Bind parameter layout for the query below:
    ///   $1  query text            $2  document box scopes
    ///   $3  include_name flag     $4  include_content flag
    ///   $5  mime filter           $6/$7 created_at start/end
    ///   $8  created_by filter     $9  folder id filter
    ///   $10/$11 per-file page limit/offset
    ///   $12/$13 overall result limit/offset
    async fn search_index(
        &self,
        scopes: &[DocumentBoxScopeRaw],
        query: SearchRequest,
        folder_children: Option<Vec<FolderId>>,
    ) -> Result<crate::models::SearchResults, SearchError> {
        let db = self.acquire_db().await?;

        // Empty query text matches nothing via tsquery/ILIKE boolean columns
        let query_text = query.query.unwrap_or_default();

        let results: Vec<DbSearchResult> = sqlx::query_as(
            r#"
WITH
    "query_data" AS (
        SELECT plainto_tsquery('english', $1) AS "ts_query"
    ),

    -- Search links
    "link_matches" AS (
        SELECT
            'Link' AS "item_type",
            "link"."id" AS "item_id",
            "folder"."document_box" AS "document_box",
            ($3::BOOLEAN AND "link"."name_tsv" @@ "query_data"."ts_query") AS "name_match_tsv",
            ts_rank("link"."name_tsv", "query_data"."ts_query") AS "name_match_tsv_rank",
            ($3::BOOLEAN AND "link"."name" ILIKE '%' || $1 || '%') AS "name_match",
            ($4::BOOLEAN AND "link"."value" ILIKE '%' || $1 || '%') AS "content_match",
            0::FLOAT8 as "content_rank",
            0::INT AS "total_hits",
            '[]'::json AS "page_matches",
            "link"."created_at" AS "created_at"
        FROM "docbox_links" "link"
        CROSS JOIN "query_data"
        LEFT JOIN "docbox_folders" "folder" ON "link"."folder_id" = "folder"."id"
        WHERE "folder"."document_box" = ANY($2)
            AND ($6 IS NULL OR "link"."created_at" >= $6)
            AND ($7 IS NULL OR "link"."created_at" <= $7)
            AND ($8 IS NULL OR "link"."created_by" = $8)
            AND ($9 IS NULL OR "link"."folder_id" = ANY($9))
    ),

    -- Search folders
    "folder_matches" AS (
        SELECT
            'Folder' AS "item_type",
            "folder"."id" AS "item_id",
            "folder"."document_box" AS "document_box",
            ($3::BOOLEAN AND "folder"."name_tsv" @@ "query_data"."ts_query") AS "name_match_tsv",
            ts_rank("folder"."name_tsv", "query_data"."ts_query") AS "name_match_tsv_rank",
            ($3::BOOLEAN AND "folder"."name" ILIKE '%' || $1 || '%') AS "name_match",
            FALSE as "content_match",
            0::FLOAT8 as "content_rank",
            0::INT AS "total_hits",
            '[]'::json AS "page_matches",
            "folder"."created_at" AS "created_at"
        FROM "docbox_folders" "folder"
        CROSS JOIN "query_data"
        WHERE "folder"."document_box" = ANY($2)
            AND ($6 IS NULL OR "folder"."created_at" >= $6)
            AND ($7 IS NULL OR "folder"."created_at" <= $7)
            AND ($8 IS NULL OR "folder"."created_by" = $8)
            AND ($9 IS NULL OR "folder"."folder_id" = ANY($9))
    ),

    -- Search files
    "file_matches" AS (
        SELECT
            'File' AS "item_type",
            "file"."id" AS "item_id",
            "folder"."document_box" AS "document_box",
            ($3::BOOLEAN AND "file"."name_tsv" @@ "query_data"."ts_query") AS "name_match_tsv",
            ts_rank("file"."name_tsv", "query_data"."ts_query") AS "name_match_tsv_rank",
            ($3::BOOLEAN AND "file"."name" ILIKE '%' || $1 || '%') AS "name_match",
            ($4::BOOLEAN AND COUNT("pages"."page") > 0) AS "content_match",
            COALESCE(AVG("pages"."content_match_rank"), 0) as "content_rank",
            COALESCE(MAX("pages"."total_hits"), 0) AS "total_hits",
            (COALESCE(
                json_agg(
                    json_build_object(
                        'page', "pages"."page",
                        'matched', ts_headline('english', "pages"."content", "query_data"."ts_query", 'StartSel=<em>, StopSel=</em>')
                    )
                    ORDER BY "pages"."content_match_rank" DESC, "pages"."page" ASC
                )  FILTER (WHERE "pages"."page" IS NOT NULL),
                '[]'::json
            )) AS "page_matches",
            "file"."created_at" AS "created_at"
        FROM "docbox_files" "file"
        CROSS JOIN "query_data"
        LEFT JOIN "docbox_folders" "folder"
            ON "file"."folder_id" = "folder"."id" AND "folder"."document_box" = ANY($2)
        LEFT JOIN LATERAL (
            -- Match the page content
            WITH "page_data" AS (
                SELECT
                    "p".*,
                    "p"."content_tsv" @@ "query_data"."ts_query" AS "content_match_tsv",
                    "p"."content" ILIKE '%' || $1 || '%' AS "content_match",
                    COUNT(*) OVER () AS "total_hits",
                    (ts_rank("p"."content_tsv", "query_data"."ts_query")
                     + CASE WHEN "p"."content" ILIKE '%' || $1 || '%' THEN 1.0 ELSE 0 END -- Boost result for ILIKE content matches
                    ) AS "content_match_rank"
                FROM "docbox_files_pages" "p"
                WHERE "p"."file_id" = "file"."id"
            )
            SELECT *
            FROM "page_data"
            WHERE "content_match" OR "content_match_tsv"
            ORDER BY "content_match_rank" DESC, "page" ASC
            LIMIT $10::INT
            OFFSET $11::INT
        ) "pages" ON $4::BOOLEAN
        WHERE "folder"."document_box" = ANY($2)
            AND ($5 IS NULL OR "file"."mime" = $5)
            AND ($6 IS NULL OR "file"."created_at" >= $6)
            AND ($7 IS NULL OR "file"."created_at" <= $7)
            AND ($8 IS NULL OR "file"."created_by" = $8)
            AND ($9 IS NULL OR "file"."folder_id" = ANY($9))

        GROUP BY "file"."id", "folder"."document_box", "query_data"."ts_query"
    ),

    "results" AS (
        SELECT *
        FROM "link_matches"
        WHERE "name_match" OR "name_match_tsv" OR "content_match"

        UNION ALL

        SELECT *
        FROM "folder_matches"
        WHERE "name_match" OR "name_match_tsv" OR "content_match"

        UNION ALL

        SELECT *
        FROM "file_matches"
        WHERE "name_match" OR "name_match_tsv" OR "content_match"
    )

    (
        SELECT *,
        ("name_match_tsv_rank"
         + "content_rank"
         + CASE WHEN "name_match" THEN 1.0 ELSE 0 END -- Boost result for ILIKE name matches
         + CASE WHEN "item_type" = 'Link' AND "content_match" THEN 1.0 ELSE 0 END -- Boost link content matches
        ) AS "rank",
        COUNT("item_id") OVER() as "total_count"
        FROM "results"
        WHERE "name_match" OR "name_match_tsv" OR "content_match"
    )
    ORDER BY "rank" DESC, "created_at" DESC
    LIMIT $12
    OFFSET $13"#,
        )
        // Bind order must match the $n placeholders documented above
        .bind(query_text) // $1
        .bind(scopes) // $2
        .bind(query.include_name) // $3
        .bind(query.include_content) // $4
        .bind(query.mime.map(|value| value.0.to_string())) // $5
        .bind(query.created_at.as_ref().map(|created_at| created_at.start)) // $6
        .bind(query.created_at.as_ref().map(|created_at| created_at.end)) // $7
        .bind(query.created_by) // $8
        .bind(folder_children) // $9
        .bind(query.max_pages.unwrap_or(3) as i32) // $10, default: 3 page matches per file
        .bind(query.pages_offset.unwrap_or_default() as i32) // $11
        .bind(query.size.unwrap_or(50) as i32) // $12, default: 50 results
        .bind(query.offset.unwrap_or(0) as i32) // $13
        .fetch_all(&db)
        .await
        .map_err(|error| {
            tracing::error!(?error, "failed to search index");
            DatabaseSearchError::SearchIndex
        })?;

        // Every row carries the same COUNT(...) OVER() window value, so the
        // first row (if any) provides the overall hit count
        let total_hits = results
            .first()
            .map(|result| result.total_count)
            .unwrap_or_default();

        let results = results
            .into_iter()
            .filter_map(|result| {
                // Map the textual item_type column back onto the enum
                let item_ty = match result.item_type.as_str() {
                    "File" => SearchIndexType::File,
                    "Folder" => SearchIndexType::Folder,
                    "Link" => SearchIndexType::Link,
                    // Unknown type error, should never occur but must be handled
                    _ => return None,
                };

                Some(FlattenedItemResult {
                    item_ty,
                    item_id: result.item_id,
                    document_box: result.document_box,
                    page_matches: result
                        .page_matches
                        .into_iter()
                        .map(|result| PageResult {
                            matches: vec![result.matched],
                            page: result.page as u64,
                        })
                        .collect(),
                    total_hits: result.total_hits as u64,
                    score: SearchScore::Float(result.rank as f32),
                    name_match: result.name_match,
                    content_match: result.content_match,
                })
            })
            .collect();

        Ok(SearchResults {
            total_hits: total_hits as u64,
            results,
        })
    }

    /// Search the extracted page contents of a single file, returning
    /// highlighted page matches plus the total number of matching pages
    /// (counted separately so pagination has an overall total).
    async fn search_index_file(
        &self,
        scope: &DocumentBoxScopeRaw,
        file_id: FileId,
        query: FileSearchRequest,
    ) -> Result<crate::models::FileSearchResults, SearchError> {
        let db = self.acquire_db().await?;
        let query_text = query.query.unwrap_or_default();
        let total_pages = count_search_file_pages(&db, scope, file_id, &query_text)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to count search file pages");
                DatabaseSearchError::CountFilePages
            })?;
        let pages = search_file_pages(
            &db,
            scope,
            file_id,
            &query_text,
            // Defaults: 50 pages per request starting from the beginning
            query.limit.unwrap_or(50) as i64,
            query.offset.unwrap_or(0) as i64,
        )
        .await
        .map_err(|error| {
            tracing::error!(?error, "failed to search file pages");
            DatabaseSearchError::SearchFilePages
        })?;

        Ok(FileSearchResults {
            total_hits: total_pages.count as u64,
            results: pages
                .into_iter()
                .map(|page| PageResult {
                    page: page.page as u64,
                    matches: vec![page.highlighted_content],
                })
                .collect(),
        })
    }

    /// Store page text content for the provided items in the
    /// "docbox_files_pages" table, one multi-row INSERT per item. Items with
    /// no pages are skipped (only file page content is stored here; all
    /// other searchable data is read from the existing tables at query time).
    async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError> {
        let db = self.acquire_db().await?;

        for item in data {
            let pages = match item.pages {
                Some(value) => value,
                // Skip anything without pages
                None => continue,
            };

            if pages.is_empty() {
                continue;
            }

            // Build one "($1, $n, $n+1)" tuple per page: $1 is the shared
            // file id, then each page consumes two placeholders (page, content)
            // starting at $2/$3
            let values = pages
                .iter()
                .enumerate()
                .map(|(index, _page)| format!("($1, ${}, ${})", 2 + index * 2, 3 + index * 2))
                .join(",");

            let query = format!(
                r#"INSERT INTO "docbox_files_pages" ("file_id", "page", "content") VALUES {values}"#
            );

            let mut query = sqlx::query(&query)
                // Shared amongst all values
                .bind(item.item_id);

            // Bind the per-page placeholders in the same order they were
            // generated above
            for page in pages {
                query = query.bind(page.page as i32).bind(page.content);
            }

            if let Err(error) = query.execute(&db).await {
                tracing::error!(?error, "failed to add search data");
                return Err(DatabaseSearchError::AddData.into());
            }
        }

        Ok(())
    }

    /// No-op for this backend — see the comment below.
    async fn update_data(
        &self,
        _item_id: uuid::Uuid,
        _data: crate::models::UpdateSearchIndexData,
    ) -> Result<(), SearchError> {
        // No-op: Currently page data is never updated, and since this search implementation sources all other
        // data directly from the database it already has a copy of everything it needs so no changes need to be made
        Ok(())
    }

    /// Delete all stored page content for the file with the provided id.
    async fn delete_data(&self, id: uuid::Uuid) -> Result<(), SearchError> {
        let db = self.acquire_db().await?;
        delete_file_pages_by_file_id(&db, id)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to delete search data by id");
                DatabaseSearchError::DeleteData
            })?;
        Ok(())
    }

    /// Delete all stored page content belonging to a document box scope.
    async fn delete_by_scope(&self, scope: DocumentBoxScopeRawRef<'_>) -> Result<(), SearchError> {
        let db = self.acquire_db().await?;
        delete_file_pages_by_scope(&db, scope)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to delete search data by scope");
                DatabaseSearchError::DeleteData
            })?;
        Ok(())
    }

    /// Report which entries of `TENANT_MIGRATIONS` have not yet been applied,
    /// comparing by migration name against `applied_names`.
    async fn get_pending_migrations(
        &self,
        applied_names: Vec<String>,
    ) -> Result<Vec<String>, SearchError> {
        let pending = TENANT_MIGRATIONS
            .iter()
            .filter(|(migration_name, _migration)| {
                // Skip already applied migrations
                !applied_names
                    .iter()
                    .any(|applied_migration| applied_migration.eq(migration_name))
            })
            .map(|(migration_name, _migration)| migration_name.to_string())
            .collect();

        Ok(pending)
    }

    /// Apply the named migration from `TENANT_MIGRATIONS` within the tenant
    /// transaction `t` (the root transaction is unused by this backend).
    ///
    /// # Errors
    /// `MigrationNotFound` when `name` does not match any known migration;
    /// `ApplyMigration` when executing the migration SQL fails.
    async fn apply_migration(
        &self,
        _tenant: &docbox_database::models::tenant::Tenant,
        _root_t: &mut docbox_database::DbTransaction<'_>,
        t: &mut docbox_database::DbTransaction<'_>,
        name: &str,
    ) -> Result<(), SearchError> {
        let (_, migration) = TENANT_MIGRATIONS
            .iter()
            .find(|(migration_name, _)| name.eq(*migration_name))
            .ok_or(DatabaseSearchError::MigrationNotFound)?;

        // Apply the migration
        docbox_database::migrations::apply_migration(t, name, migration)
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to apply migration");
                DatabaseSearchError::ApplyMigration
            })?;

        Ok(())
    }
}
501}