Skip to main content

docbox_search/
lib.rs

1#![forbid(unsafe_code)]
2
3use aws_config::SdkConfig;
4use chrono::Utc;
5use docbox_database::{
6    DatabasePoolCache, DbTransaction,
7    models::{
8        document_box::{DocumentBoxScopeRaw, DocumentBoxScopeRawRef},
9        file::FileId,
10        folder::FolderId,
11        tenant::Tenant,
12        tenant_migration::{CreateTenantMigration, TenantMigration},
13    },
14};
15use docbox_secrets::SecretManager;
16use models::{
17    FileSearchRequest, FileSearchResults, SearchIndexData, SearchRequest, SearchResults,
18    UpdateSearchIndexData,
19};
20use serde::{Deserialize, Serialize};
21use std::{ops::DerefMut, sync::Arc};
22use thiserror::Error;
23use uuid::Uuid;
24
25pub mod models;
26
27pub use database::{
28    DatabaseSearchConfig, DatabaseSearchError, DatabaseSearchIndexFactory,
29    DatabaseSearchIndexFactoryError,
30};
31pub use opensearch::{
32    OpenSearchConfig, OpenSearchIndexFactory, OpenSearchIndexFactoryError, OpenSearchSearchError,
33};
34pub use typesense::{
35    TypesenseApiKey, TypesenseApiKeyProvider, TypesenseApiKeySecret, TypesenseIndexFactory,
36    TypesenseIndexFactoryError, TypesenseSearchConfig, TypesenseSearchError,
37};
38
39mod database;
40mod opensearch;
41mod typesense;
42
43#[derive(Debug, Clone, Deserialize, Serialize)]
44#[serde(tag = "provider", rename_all = "snake_case")]
45pub enum SearchIndexFactoryConfig {
46    Typesense(typesense::TypesenseSearchConfig),
47    OpenSearch(opensearch::OpenSearchConfig),
48    Database(database::DatabaseSearchConfig),
49}
50
51#[derive(Debug, Error)]
52pub enum SearchIndexFactoryError {
53    #[error(transparent)]
54    Typesense(#[from] typesense::TypesenseIndexFactoryError),
55    #[error(transparent)]
56    OpenSearch(#[from] opensearch::OpenSearchIndexFactoryError),
57    #[error(transparent)]
58    Database(#[from] database::DatabaseSearchIndexFactoryError),
59    #[error("unknown search index factory type requested")]
60    UnknownIndexFactory,
61}
62
63impl SearchIndexFactoryConfig {
64    pub fn from_env() -> Result<Self, SearchIndexFactoryError> {
65        let variant = std::env::var("DOCBOX_SEARCH_INDEX_FACTORY")
66            .unwrap_or_else(|_| "database".to_string())
67            .to_lowercase();
68        match variant.as_str() {
69            "open_search" | "opensearch" => opensearch::OpenSearchConfig::from_env()
70                .map(Self::OpenSearch)
71                .map_err(SearchIndexFactoryError::OpenSearch),
72
73            "typesense" => typesense::TypesenseSearchConfig::from_env()
74                .map(Self::Typesense)
75                .map_err(SearchIndexFactoryError::Typesense),
76
77            "database" => database::DatabaseSearchConfig::from_env()
78                .map(Self::Database)
79                .map_err(SearchIndexFactoryError::Database),
80
81            // Unknown type requested
82            _ => Err(SearchIndexFactoryError::UnknownIndexFactory),
83        }
84    }
85}
86
87#[derive(Clone)]
88pub enum SearchIndexFactory {
89    Typesense(typesense::TypesenseIndexFactory),
90    OpenSearch(opensearch::OpenSearchIndexFactory),
91    Database(database::DatabaseSearchIndexFactory),
92}
93
94impl SearchIndexFactory {
95    /// Create a search index factory from the provided `config`
96    pub fn from_config(
97        aws_config: &SdkConfig,
98        secrets: SecretManager,
99        db: Arc<DatabasePoolCache>,
100        config: SearchIndexFactoryConfig,
101    ) -> Result<Self, SearchIndexFactoryError> {
102        match config {
103            SearchIndexFactoryConfig::Typesense(config) => {
104                tracing::debug!("using typesense search index");
105                typesense::TypesenseIndexFactory::from_config(secrets, config)
106                    .map(SearchIndexFactory::Typesense)
107                    .map_err(SearchIndexFactoryError::Typesense)
108            }
109
110            SearchIndexFactoryConfig::OpenSearch(config) => {
111                tracing::debug!("using opensearch search index");
112                opensearch::OpenSearchIndexFactory::from_config(aws_config, config)
113                    .map(SearchIndexFactory::OpenSearch)
114                    .map_err(SearchIndexFactoryError::OpenSearch)
115            }
116
117            SearchIndexFactoryConfig::Database(config) => {
118                tracing::debug!("using opensearch search index");
119                database::DatabaseSearchIndexFactory::from_config(db, config)
120                    .map(SearchIndexFactory::Database)
121                    .map_err(SearchIndexFactoryError::Database)
122            }
123        }
124    }
125
126    /// Create a new "OpenSearch" search index for the tenant
127    pub fn create_search_index(&self, tenant: &Tenant) -> TenantSearchIndex {
128        match self {
129            SearchIndexFactory::Typesense(factory) => {
130                let search_index = tenant.os_index_name.clone();
131                TenantSearchIndex::Typesense(factory.create_search_index(search_index))
132            }
133
134            SearchIndexFactory::OpenSearch(factory) => {
135                let search_index = opensearch::TenantSearchIndexName::from_tenant(tenant);
136                TenantSearchIndex::OpenSearch(factory.create_search_index(search_index))
137            }
138
139            SearchIndexFactory::Database(factory) => {
140                TenantSearchIndex::Database(factory.create_search_index(tenant))
141            }
142        }
143    }
144}
145
146#[derive(Clone)]
147pub enum TenantSearchIndex {
148    Typesense(typesense::TypesenseIndex),
149    OpenSearch(opensearch::OpenSearchIndex),
150    Database(database::DatabaseSearchIndex),
151}
152
153#[derive(Debug, Error)]
154pub enum SearchError {
155    #[error(transparent)]
156    Typesense(#[from] typesense::TypesenseSearchError),
157    #[error(transparent)]
158    OpenSearch(#[from] opensearch::OpenSearchSearchError),
159    #[error(transparent)]
160    Database(#[from] database::DatabaseSearchError),
161    #[error("failed to perform migration")]
162    Migration,
163}
164
165impl TenantSearchIndex {
166    /// Creates a search index for the tenant
167    #[tracing::instrument(skip(self))]
168    pub async fn create_index(&self) -> Result<(), SearchError> {
169        match self {
170            TenantSearchIndex::Typesense(index) => index.create_index().await,
171            TenantSearchIndex::OpenSearch(index) => index.create_index().await,
172            TenantSearchIndex::Database(index) => index.create_index().await,
173        }
174    }
175
176    /// Checks if the tenant search index exists
177    #[tracing::instrument(skip(self))]
178    pub async fn index_exists(&self) -> Result<bool, SearchError> {
179        match self {
180            TenantSearchIndex::Typesense(index) => index.index_exists().await,
181            TenantSearchIndex::OpenSearch(index) => index.index_exists().await,
182            TenantSearchIndex::Database(index) => index.index_exists().await,
183        }
184    }
185
186    /// Deletes the search index for the tenant
187    #[tracing::instrument(skip(self))]
188    pub async fn delete_index(&self) -> Result<(), SearchError> {
189        match self {
190            TenantSearchIndex::Typesense(index) => index.delete_index().await,
191            TenantSearchIndex::OpenSearch(index) => index.delete_index().await,
192            TenantSearchIndex::Database(index) => index.delete_index().await,
193        }
194    }
195
196    /// Searches the search index with the provided query
197    #[tracing::instrument(skip(self))]
198    pub async fn search_index(
199        &self,
200        scope: &[DocumentBoxScopeRaw],
201        query: SearchRequest,
202        folder_children: Option<Vec<FolderId>>,
203    ) -> Result<SearchResults, SearchError> {
204        match self {
205            TenantSearchIndex::Typesense(index) => {
206                index.search_index(scope, query, folder_children).await
207            }
208            TenantSearchIndex::OpenSearch(index) => {
209                index.search_index(scope, query, folder_children).await
210            }
211            TenantSearchIndex::Database(index) => {
212                index.search_index(scope, query, folder_children).await
213            }
214        }
215    }
216
217    /// Searches the index for matches scoped to a specific file
218    #[tracing::instrument(skip(self))]
219    pub async fn search_index_file(
220        &self,
221        scope: &DocumentBoxScopeRaw,
222        file_id: FileId,
223        query: FileSearchRequest,
224    ) -> Result<FileSearchResults, SearchError> {
225        match self {
226            TenantSearchIndex::Typesense(index) => {
227                index.search_index_file(scope, file_id, query).await
228            }
229            TenantSearchIndex::OpenSearch(index) => {
230                index.search_index_file(scope, file_id, query).await
231            }
232            TenantSearchIndex::Database(index) => {
233                index.search_index_file(scope, file_id, query).await
234            }
235        }
236    }
237
238    /// Adds the provided data to the search index
239    #[tracing::instrument(skip(self))]
240    pub async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError> {
241        match self {
242            TenantSearchIndex::Typesense(index) => index.add_data(data).await,
243            TenantSearchIndex::OpenSearch(index) => index.add_data(data).await,
244            TenantSearchIndex::Database(index) => index.add_data(data).await,
245        }
246    }
247
248    /// Updates the provided data in the search index
249    #[tracing::instrument(skip(self))]
250    pub async fn update_data(
251        &self,
252        item_id: Uuid,
253        data: UpdateSearchIndexData,
254    ) -> Result<(), SearchError> {
255        match self {
256            TenantSearchIndex::Typesense(index) => index.update_data(item_id, data).await,
257            TenantSearchIndex::OpenSearch(index) => index.update_data(item_id, data).await,
258            TenantSearchIndex::Database(index) => index.update_data(item_id, data).await,
259        }
260    }
261
262    /// Deletes the provided data from the search index by `id`
263    #[tracing::instrument(skip(self))]
264    pub async fn delete_data(&self, id: Uuid) -> Result<(), SearchError> {
265        match self {
266            TenantSearchIndex::Typesense(index) => index.delete_data(id).await,
267            TenantSearchIndex::OpenSearch(index) => index.delete_data(id).await,
268            TenantSearchIndex::Database(index) => index.delete_data(id).await,
269        }
270    }
271
272    /// Deletes all data contained within the specified `scope`
273    #[tracing::instrument(skip(self))]
274    pub async fn delete_by_scope(
275        &self,
276        scope: DocumentBoxScopeRawRef<'_>,
277    ) -> Result<(), SearchError> {
278        match self {
279            TenantSearchIndex::Typesense(index) => index.delete_by_scope(scope).await,
280            TenantSearchIndex::OpenSearch(index) => index.delete_by_scope(scope).await,
281            TenantSearchIndex::Database(index) => index.delete_by_scope(scope).await,
282        }
283    }
284
285    /// Get all pending migrations based on the `applied_names` list of applied migrations
286    #[tracing::instrument(skip(self))]
287    pub async fn get_pending_migrations(
288        &self,
289        applied_names: Vec<String>,
290    ) -> Result<Vec<String>, SearchError> {
291        match self {
292            TenantSearchIndex::Typesense(index) => {
293                index.get_pending_migrations(applied_names).await
294            }
295            TenantSearchIndex::OpenSearch(index) => {
296                index.get_pending_migrations(applied_names).await
297            }
298            TenantSearchIndex::Database(index) => index.get_pending_migrations(applied_names).await,
299        }
300    }
301
302    /// Apply a specific migration for a `tenant` by `name`
303    #[tracing::instrument(skip(self))]
304    pub async fn apply_migration(
305        &self,
306        tenant: &Tenant,
307        root_t: &mut DbTransaction<'_>,
308        tenant_t: &mut DbTransaction<'_>,
309        name: &str,
310    ) -> Result<(), SearchError> {
311        // Apply migration logic
312        match self {
313            TenantSearchIndex::Typesense(index) => {
314                index
315                    .apply_migration(tenant, root_t, tenant_t, name)
316                    .await?
317            }
318
319            TenantSearchIndex::OpenSearch(index) => {
320                index
321                    .apply_migration(tenant, root_t, tenant_t, name)
322                    .await?
323            }
324
325            TenantSearchIndex::Database(index) => {
326                index
327                    .apply_migration(tenant, root_t, tenant_t, name)
328                    .await?
329            }
330        }
331
332        // Store the applied migration
333        TenantMigration::create(
334            root_t.deref_mut(),
335            CreateTenantMigration {
336                tenant_id: tenant.id,
337                env: tenant.env.clone(),
338                name: name.to_string(),
339                applied_at: Utc::now(),
340            },
341        )
342        .await
343        .map_err(|error| {
344            tracing::error!(?error, "failed to create tenant migration");
345            SearchError::Migration
346        })?;
347
348        Ok(())
349    }
350
351    /// Apply all pending migrations for a `tenant`
352    ///
353    /// When `target_migration_name` is specified only that target migration will
354    /// be run
355    #[tracing::instrument(skip_all, fields(?tenant, ?target_migration_name))]
356    pub async fn apply_migrations(
357        &self,
358        tenant: &Tenant,
359        root_t: &mut DbTransaction<'_>,
360        tenant_t: &mut DbTransaction<'_>,
361        target_migration_name: Option<&str>,
362    ) -> Result<(), SearchError> {
363        let applied_migrations =
364            TenantMigration::find_by_tenant(root_t.deref_mut(), tenant.id, &tenant.env)
365                .await
366                .map_err(|error| {
367                    tracing::error!(?error, "failed to query tenant migrations");
368                    SearchError::Migration
369                })?;
370        let pending_migrations = self
371            .get_pending_migrations(
372                applied_migrations
373                    .into_iter()
374                    .map(|value| value.name)
375                    .collect(),
376            )
377            .await?;
378
379        for migration_name in pending_migrations {
380            // If targeting a specific migration only apply the target one
381            if target_migration_name
382                .is_some_and(|target_migration_name| target_migration_name.ne(&migration_name))
383            {
384                continue;
385            }
386
387            // Apply the migration
388            if let Err(error) = self
389                .apply_migration(tenant, root_t, tenant_t, &migration_name)
390                .await
391            {
392                tracing::error!(%migration_name, ?error, "failed to apply migration");
393                return Err(error);
394            }
395        }
396
397        Ok(())
398    }
399}
400
401pub(crate) trait SearchIndex: Send + Sync + 'static {
402    async fn create_index(&self) -> Result<(), SearchError>;
403
404    async fn index_exists(&self) -> Result<bool, SearchError>;
405
406    async fn delete_index(&self) -> Result<(), SearchError>;
407
408    async fn search_index(
409        &self,
410        scope: &[DocumentBoxScopeRaw],
411        query: SearchRequest,
412        folder_children: Option<Vec<FolderId>>,
413    ) -> Result<SearchResults, SearchError>;
414
415    async fn search_index_file(
416        &self,
417        scope: &DocumentBoxScopeRaw,
418        file_id: FileId,
419        query: FileSearchRequest,
420    ) -> Result<FileSearchResults, SearchError>;
421
422    async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError>;
423
424    async fn update_data(
425        &self,
426        item_id: Uuid,
427        data: UpdateSearchIndexData,
428    ) -> Result<(), SearchError>;
429
430    async fn delete_data(&self, id: Uuid) -> Result<(), SearchError>;
431
432    async fn delete_by_scope(&self, scope: DocumentBoxScopeRawRef<'_>) -> Result<(), SearchError>;
433
434    async fn get_pending_migrations(
435        &self,
436        applied_names: Vec<String>,
437    ) -> Result<Vec<String>, SearchError>;
438
439    async fn apply_migration(
440        &self,
441        tenant: &Tenant,
442        root_t: &mut DbTransaction<'_>,
443        t: &mut DbTransaction<'_>,
444        name: &str,
445    ) -> Result<(), SearchError>;
446}