Skip to main content

docbox_search/
lib.rs

1#![forbid(unsafe_code)]
2
3use aws_config::SdkConfig;
4use chrono::Utc;
5use docbox_database::{
6    DatabasePoolCache, DbTransaction,
7    models::{
8        document_box::{DocumentBoxScopeRaw, DocumentBoxScopeRawRef},
9        file::FileId,
10        folder::FolderId,
11        tenant::Tenant,
12        tenant_migration::{CreateTenantMigration, TenantMigration},
13    },
14};
15use docbox_secrets::SecretManager;
16use models::{
17    FileSearchRequest, FileSearchResults, SearchIndexData, SearchRequest, SearchResults,
18    UpdateSearchIndexData,
19};
20use serde::{Deserialize, Serialize};
21use std::{ops::DerefMut, sync::Arc};
22use thiserror::Error;
23use uuid::Uuid;
24
25pub mod models;
26
27pub use database::{
28    DatabaseSearchConfig, DatabaseSearchError, DatabaseSearchIndex, DatabaseSearchIndexFactory,
29    DatabaseSearchIndexFactoryError,
30};
31pub use opensearch::{
32    OpenSearchConfig, OpenSearchIndex, OpenSearchIndexFactory, OpenSearchIndexFactoryError,
33    OpenSearchSearchError,
34};
35pub use typesense::{
36    TypesenseApiKey, TypesenseApiKeyProvider, TypesenseApiKeySecret, TypesenseIndex,
37    TypesenseIndexFactory, TypesenseIndexFactoryError, TypesenseSearchConfig, TypesenseSearchError,
38};
39
40mod database;
41mod opensearch;
42mod typesense;
43
44#[derive(Debug, Clone, Deserialize, Serialize)]
45#[serde(tag = "provider", rename_all = "snake_case")]
46pub enum SearchIndexFactoryConfig {
47    Typesense(typesense::TypesenseSearchConfig),
48    OpenSearch(opensearch::OpenSearchConfig),
49    Database(database::DatabaseSearchConfig),
50}
51
52#[derive(Debug, Error)]
53pub enum SearchIndexFactoryError {
54    #[error(transparent)]
55    Typesense(#[from] typesense::TypesenseIndexFactoryError),
56    #[error(transparent)]
57    OpenSearch(#[from] opensearch::OpenSearchIndexFactoryError),
58    #[error(transparent)]
59    Database(#[from] database::DatabaseSearchIndexFactoryError),
60    #[error("unknown search index factory type requested")]
61    UnknownIndexFactory,
62}
63
64impl SearchIndexFactoryConfig {
65    pub fn from_env() -> Result<Self, SearchIndexFactoryError> {
66        let variant = std::env::var("DOCBOX_SEARCH_INDEX_FACTORY")
67            .unwrap_or_else(|_| "database".to_string())
68            .to_lowercase();
69        match variant.as_str() {
70            "open_search" | "opensearch" => opensearch::OpenSearchConfig::from_env()
71                .map(Self::OpenSearch)
72                .map_err(SearchIndexFactoryError::OpenSearch),
73
74            "typesense" => typesense::TypesenseSearchConfig::from_env()
75                .map(Self::Typesense)
76                .map_err(SearchIndexFactoryError::Typesense),
77
78            "database" => database::DatabaseSearchConfig::from_env()
79                .map(Self::Database)
80                .map_err(SearchIndexFactoryError::Database),
81
82            // Unknown type requested
83            _ => Err(SearchIndexFactoryError::UnknownIndexFactory),
84        }
85    }
86}
87
88#[derive(Clone)]
89pub enum SearchIndexFactory {
90    Typesense(typesense::TypesenseIndexFactory),
91    OpenSearch(opensearch::OpenSearchIndexFactory),
92    Database(database::DatabaseSearchIndexFactory),
93}
94
95impl SearchIndexFactory {
96    /// Create a search index factory from the provided `config`
97    pub fn from_config(
98        aws_config: &SdkConfig,
99        secrets: SecretManager,
100        db: Arc<DatabasePoolCache>,
101        config: SearchIndexFactoryConfig,
102    ) -> Result<Self, SearchIndexFactoryError> {
103        match config {
104            SearchIndexFactoryConfig::Typesense(config) => {
105                tracing::debug!("using typesense search index");
106                typesense::TypesenseIndexFactory::from_config(secrets, config)
107                    .map(SearchIndexFactory::Typesense)
108                    .map_err(SearchIndexFactoryError::Typesense)
109            }
110
111            SearchIndexFactoryConfig::OpenSearch(config) => {
112                tracing::debug!("using opensearch search index");
113                opensearch::OpenSearchIndexFactory::from_config(aws_config, config)
114                    .map(SearchIndexFactory::OpenSearch)
115                    .map_err(SearchIndexFactoryError::OpenSearch)
116            }
117
118            SearchIndexFactoryConfig::Database(config) => {
119                tracing::debug!("using opensearch search index");
120                database::DatabaseSearchIndexFactory::from_config(db, config)
121                    .map(SearchIndexFactory::Database)
122                    .map_err(SearchIndexFactoryError::Database)
123            }
124        }
125    }
126
127    /// Create a new "OpenSearch" search index for the tenant
128    pub fn create_search_index(&self, tenant: &Tenant) -> TenantSearchIndex {
129        match self {
130            SearchIndexFactory::Typesense(factory) => {
131                let search_index = tenant.os_index_name.clone();
132                TenantSearchIndex::Typesense(factory.create_search_index(search_index))
133            }
134
135            SearchIndexFactory::OpenSearch(factory) => {
136                let search_index = opensearch::TenantSearchIndexName::from_tenant(tenant);
137                TenantSearchIndex::OpenSearch(factory.create_search_index(search_index))
138            }
139
140            SearchIndexFactory::Database(factory) => {
141                TenantSearchIndex::Database(factory.create_search_index(tenant))
142            }
143        }
144    }
145}
146
147#[derive(Clone)]
148pub enum TenantSearchIndex {
149    Typesense(typesense::TypesenseIndex),
150    OpenSearch(opensearch::OpenSearchIndex),
151    Database(database::DatabaseSearchIndex),
152}
153
154#[derive(Debug, Error)]
155pub enum SearchError {
156    #[error(transparent)]
157    Typesense(#[from] typesense::TypesenseSearchError),
158    #[error(transparent)]
159    OpenSearch(#[from] opensearch::OpenSearchSearchError),
160    #[error(transparent)]
161    Database(#[from] database::DatabaseSearchError),
162    #[error("failed to perform migration")]
163    Migration,
164}
165
166impl TenantSearchIndex {
167    /// Creates a search index for the tenant
168    #[tracing::instrument(skip(self))]
169    pub async fn create_index(&self) -> Result<(), SearchError> {
170        match self {
171            TenantSearchIndex::Typesense(index) => index.create_index().await,
172            TenantSearchIndex::OpenSearch(index) => index.create_index().await,
173            TenantSearchIndex::Database(index) => index.create_index().await,
174        }
175    }
176
177    /// Checks if the tenant search index exists
178    #[tracing::instrument(skip(self))]
179    pub async fn index_exists(&self) -> Result<bool, SearchError> {
180        match self {
181            TenantSearchIndex::Typesense(index) => index.index_exists().await,
182            TenantSearchIndex::OpenSearch(index) => index.index_exists().await,
183            TenantSearchIndex::Database(index) => index.index_exists().await,
184        }
185    }
186
187    /// Deletes the search index for the tenant
188    #[tracing::instrument(skip(self))]
189    pub async fn delete_index(&self) -> Result<(), SearchError> {
190        match self {
191            TenantSearchIndex::Typesense(index) => index.delete_index().await,
192            TenantSearchIndex::OpenSearch(index) => index.delete_index().await,
193            TenantSearchIndex::Database(index) => index.delete_index().await,
194        }
195    }
196
197    /// Searches the search index with the provided query
198    #[tracing::instrument(skip(self))]
199    pub async fn search_index(
200        &self,
201        scope: &[DocumentBoxScopeRaw],
202        query: SearchRequest,
203        folder_children: Option<Vec<FolderId>>,
204    ) -> Result<SearchResults, SearchError> {
205        match self {
206            TenantSearchIndex::Typesense(index) => {
207                index.search_index(scope, query, folder_children).await
208            }
209            TenantSearchIndex::OpenSearch(index) => {
210                index.search_index(scope, query, folder_children).await
211            }
212            TenantSearchIndex::Database(index) => {
213                index.search_index(scope, query, folder_children).await
214            }
215        }
216    }
217
218    /// Searches the index for matches scoped to a specific file
219    #[tracing::instrument(skip(self))]
220    pub async fn search_index_file(
221        &self,
222        scope: &DocumentBoxScopeRaw,
223        file_id: FileId,
224        query: FileSearchRequest,
225    ) -> Result<FileSearchResults, SearchError> {
226        match self {
227            TenantSearchIndex::Typesense(index) => {
228                index.search_index_file(scope, file_id, query).await
229            }
230            TenantSearchIndex::OpenSearch(index) => {
231                index.search_index_file(scope, file_id, query).await
232            }
233            TenantSearchIndex::Database(index) => {
234                index.search_index_file(scope, file_id, query).await
235            }
236        }
237    }
238
239    /// Adds the provided data to the search index
240    #[tracing::instrument(skip(self))]
241    pub async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError> {
242        match self {
243            TenantSearchIndex::Typesense(index) => index.add_data(data).await,
244            TenantSearchIndex::OpenSearch(index) => index.add_data(data).await,
245            TenantSearchIndex::Database(index) => index.add_data(data).await,
246        }
247    }
248
249    /// Updates the provided data in the search index
250    #[tracing::instrument(skip(self))]
251    pub async fn update_data(
252        &self,
253        item_id: Uuid,
254        data: UpdateSearchIndexData,
255    ) -> Result<(), SearchError> {
256        match self {
257            TenantSearchIndex::Typesense(index) => index.update_data(item_id, data).await,
258            TenantSearchIndex::OpenSearch(index) => index.update_data(item_id, data).await,
259            TenantSearchIndex::Database(index) => index.update_data(item_id, data).await,
260        }
261    }
262
263    /// Deletes the provided data from the search index by `id`
264    #[tracing::instrument(skip(self))]
265    pub async fn delete_data(&self, id: Uuid) -> Result<(), SearchError> {
266        match self {
267            TenantSearchIndex::Typesense(index) => index.delete_data(id).await,
268            TenantSearchIndex::OpenSearch(index) => index.delete_data(id).await,
269            TenantSearchIndex::Database(index) => index.delete_data(id).await,
270        }
271    }
272
273    /// Deletes all data contained within the specified `scope`
274    #[tracing::instrument(skip(self))]
275    pub async fn delete_by_scope(
276        &self,
277        scope: DocumentBoxScopeRawRef<'_>,
278    ) -> Result<(), SearchError> {
279        match self {
280            TenantSearchIndex::Typesense(index) => index.delete_by_scope(scope).await,
281            TenantSearchIndex::OpenSearch(index) => index.delete_by_scope(scope).await,
282            TenantSearchIndex::Database(index) => index.delete_by_scope(scope).await,
283        }
284    }
285
286    /// Get all pending migrations based on the `applied_names` list of applied migrations
287    #[tracing::instrument(skip(self))]
288    pub async fn get_pending_migrations(
289        &self,
290        applied_names: Vec<String>,
291    ) -> Result<Vec<String>, SearchError> {
292        match self {
293            TenantSearchIndex::Typesense(index) => {
294                index.get_pending_migrations(applied_names).await
295            }
296            TenantSearchIndex::OpenSearch(index) => {
297                index.get_pending_migrations(applied_names).await
298            }
299            TenantSearchIndex::Database(index) => index.get_pending_migrations(applied_names).await,
300        }
301    }
302
303    /// Apply a specific migration for a `tenant` by `name`
304    #[tracing::instrument(skip(self))]
305    pub async fn apply_migration(
306        &self,
307        tenant: &Tenant,
308        root_t: &mut DbTransaction<'_>,
309        tenant_t: &mut DbTransaction<'_>,
310        name: &str,
311    ) -> Result<(), SearchError> {
312        // Apply migration logic
313        match self {
314            TenantSearchIndex::Typesense(index) => {
315                index
316                    .apply_migration(tenant, root_t, tenant_t, name)
317                    .await?
318            }
319
320            TenantSearchIndex::OpenSearch(index) => {
321                index
322                    .apply_migration(tenant, root_t, tenant_t, name)
323                    .await?
324            }
325
326            TenantSearchIndex::Database(index) => {
327                index
328                    .apply_migration(tenant, root_t, tenant_t, name)
329                    .await?
330            }
331        }
332
333        // Store the applied migration
334        TenantMigration::create(
335            root_t.deref_mut(),
336            CreateTenantMigration {
337                tenant_id: tenant.id,
338                env: tenant.env.clone(),
339                name: name.to_string(),
340                applied_at: Utc::now(),
341            },
342        )
343        .await
344        .map_err(|error| {
345            tracing::error!(?error, "failed to create tenant migration");
346            SearchError::Migration
347        })?;
348
349        Ok(())
350    }
351
352    /// Apply all pending migrations for a `tenant`
353    ///
354    /// When `target_migration_name` is specified only that target migration will
355    /// be run
356    #[tracing::instrument(skip_all, fields(?tenant, ?target_migration_name))]
357    pub async fn apply_migrations(
358        &self,
359        tenant: &Tenant,
360        root_t: &mut DbTransaction<'_>,
361        tenant_t: &mut DbTransaction<'_>,
362        target_migration_name: Option<&str>,
363    ) -> Result<(), SearchError> {
364        let applied_migrations =
365            TenantMigration::find_by_tenant(root_t.deref_mut(), tenant.id, &tenant.env)
366                .await
367                .map_err(|error| {
368                    tracing::error!(?error, "failed to query tenant migrations");
369                    SearchError::Migration
370                })?;
371        let pending_migrations = self
372            .get_pending_migrations(
373                applied_migrations
374                    .into_iter()
375                    .map(|value| value.name)
376                    .collect(),
377            )
378            .await?;
379
380        for migration_name in pending_migrations {
381            // If targeting a specific migration only apply the target one
382            if target_migration_name
383                .is_some_and(|target_migration_name| target_migration_name.ne(&migration_name))
384            {
385                continue;
386            }
387
388            // Apply the migration
389            if let Err(error) = self
390                .apply_migration(tenant, root_t, tenant_t, &migration_name)
391                .await
392            {
393                tracing::error!(%migration_name, ?error, "failed to apply migration");
394                return Err(error);
395            }
396        }
397
398        Ok(())
399    }
400}
401
402pub(crate) trait SearchIndex: Send + Sync + 'static {
403    async fn create_index(&self) -> Result<(), SearchError>;
404
405    async fn index_exists(&self) -> Result<bool, SearchError>;
406
407    async fn delete_index(&self) -> Result<(), SearchError>;
408
409    async fn search_index(
410        &self,
411        scope: &[DocumentBoxScopeRaw],
412        query: SearchRequest,
413        folder_children: Option<Vec<FolderId>>,
414    ) -> Result<SearchResults, SearchError>;
415
416    async fn search_index_file(
417        &self,
418        scope: &DocumentBoxScopeRaw,
419        file_id: FileId,
420        query: FileSearchRequest,
421    ) -> Result<FileSearchResults, SearchError>;
422
423    async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError>;
424
425    async fn update_data(
426        &self,
427        item_id: Uuid,
428        data: UpdateSearchIndexData,
429    ) -> Result<(), SearchError>;
430
431    async fn delete_data(&self, id: Uuid) -> Result<(), SearchError>;
432
433    async fn delete_by_scope(&self, scope: DocumentBoxScopeRawRef<'_>) -> Result<(), SearchError>;
434
435    async fn get_pending_migrations(
436        &self,
437        applied_names: Vec<String>,
438    ) -> Result<Vec<String>, SearchError>;
439
440    async fn apply_migration(
441        &self,
442        tenant: &Tenant,
443        root_t: &mut DbTransaction<'_>,
444        t: &mut DbTransaction<'_>,
445        name: &str,
446    ) -> Result<(), SearchError>;
447}