1#![forbid(unsafe_code)]
2
3use aws_config::SdkConfig;
4use chrono::Utc;
5use docbox_database::{
6 DatabasePoolCache, DbTransaction,
7 models::{
8 document_box::{DocumentBoxScopeRaw, DocumentBoxScopeRawRef},
9 file::FileId,
10 folder::FolderId,
11 tenant::Tenant,
12 tenant_migration::{CreateTenantMigration, TenantMigration},
13 },
14};
15use docbox_secrets::SecretManager;
16use models::{
17 FileSearchRequest, FileSearchResults, SearchIndexData, SearchRequest, SearchResults,
18 UpdateSearchIndexData,
19};
20use serde::{Deserialize, Serialize};
21use std::{ops::DerefMut, sync::Arc};
22use thiserror::Error;
23use uuid::Uuid;
24
25pub mod models;
26
27pub use database::{
28 DatabaseSearchConfig, DatabaseSearchError, DatabaseSearchIndexFactory,
29 DatabaseSearchIndexFactoryError,
30};
31pub use opensearch::{
32 OpenSearchConfig, OpenSearchIndexFactory, OpenSearchIndexFactoryError, OpenSearchSearchError,
33};
34pub use typesense::{
35 TypesenseApiKey, TypesenseApiKeyProvider, TypesenseApiKeySecret, TypesenseIndexFactory,
36 TypesenseIndexFactoryError, TypesenseSearchConfig, TypesenseSearchError,
37};
38
39mod database;
40mod opensearch;
41mod typesense;
42
43#[derive(Debug, Clone, Deserialize, Serialize)]
44#[serde(tag = "provider", rename_all = "snake_case")]
45pub enum SearchIndexFactoryConfig {
46 Typesense(typesense::TypesenseSearchConfig),
47 OpenSearch(opensearch::OpenSearchConfig),
48 Database(database::DatabaseSearchConfig),
49}
50
51#[derive(Debug, Error)]
52pub enum SearchIndexFactoryError {
53 #[error(transparent)]
54 Typesense(#[from] typesense::TypesenseIndexFactoryError),
55 #[error(transparent)]
56 OpenSearch(#[from] opensearch::OpenSearchIndexFactoryError),
57 #[error(transparent)]
58 Database(#[from] database::DatabaseSearchIndexFactoryError),
59 #[error("unknown search index factory type requested")]
60 UnknownIndexFactory,
61}
62
63impl SearchIndexFactoryConfig {
64 pub fn from_env() -> Result<Self, SearchIndexFactoryError> {
65 let variant = std::env::var("DOCBOX_SEARCH_INDEX_FACTORY")
66 .unwrap_or_else(|_| "database".to_string())
67 .to_lowercase();
68 match variant.as_str() {
69 "open_search" | "opensearch" => opensearch::OpenSearchConfig::from_env()
70 .map(Self::OpenSearch)
71 .map_err(SearchIndexFactoryError::OpenSearch),
72
73 "typesense" => typesense::TypesenseSearchConfig::from_env()
74 .map(Self::Typesense)
75 .map_err(SearchIndexFactoryError::Typesense),
76
77 "database" => database::DatabaseSearchConfig::from_env()
78 .map(Self::Database)
79 .map_err(SearchIndexFactoryError::Database),
80
81 _ => Err(SearchIndexFactoryError::UnknownIndexFactory),
83 }
84 }
85}
86
87#[derive(Clone)]
88pub enum SearchIndexFactory {
89 Typesense(typesense::TypesenseIndexFactory),
90 OpenSearch(opensearch::OpenSearchIndexFactory),
91 Database(database::DatabaseSearchIndexFactory),
92}
93
94impl SearchIndexFactory {
95 pub fn from_config(
97 aws_config: &SdkConfig,
98 secrets: SecretManager,
99 db: Arc<DatabasePoolCache>,
100 config: SearchIndexFactoryConfig,
101 ) -> Result<Self, SearchIndexFactoryError> {
102 match config {
103 SearchIndexFactoryConfig::Typesense(config) => {
104 tracing::debug!("using typesense search index");
105 typesense::TypesenseIndexFactory::from_config(secrets, config)
106 .map(SearchIndexFactory::Typesense)
107 .map_err(SearchIndexFactoryError::Typesense)
108 }
109
110 SearchIndexFactoryConfig::OpenSearch(config) => {
111 tracing::debug!("using opensearch search index");
112 opensearch::OpenSearchIndexFactory::from_config(aws_config, config)
113 .map(SearchIndexFactory::OpenSearch)
114 .map_err(SearchIndexFactoryError::OpenSearch)
115 }
116
117 SearchIndexFactoryConfig::Database(config) => {
118 tracing::debug!("using opensearch search index");
119 database::DatabaseSearchIndexFactory::from_config(db, config)
120 .map(SearchIndexFactory::Database)
121 .map_err(SearchIndexFactoryError::Database)
122 }
123 }
124 }
125
126 pub fn create_search_index(&self, tenant: &Tenant) -> TenantSearchIndex {
128 match self {
129 SearchIndexFactory::Typesense(factory) => {
130 let search_index = tenant.os_index_name.clone();
131 TenantSearchIndex::Typesense(factory.create_search_index(search_index))
132 }
133
134 SearchIndexFactory::OpenSearch(factory) => {
135 let search_index = opensearch::TenantSearchIndexName::from_tenant(tenant);
136 TenantSearchIndex::OpenSearch(factory.create_search_index(search_index))
137 }
138
139 SearchIndexFactory::Database(factory) => {
140 TenantSearchIndex::Database(factory.create_search_index(tenant))
141 }
142 }
143 }
144}
145
146#[derive(Clone)]
147pub enum TenantSearchIndex {
148 Typesense(typesense::TypesenseIndex),
149 OpenSearch(opensearch::OpenSearchIndex),
150 Database(database::DatabaseSearchIndex),
151}
152
153#[derive(Debug, Error)]
154pub enum SearchError {
155 #[error(transparent)]
156 Typesense(#[from] typesense::TypesenseSearchError),
157 #[error(transparent)]
158 OpenSearch(#[from] opensearch::OpenSearchSearchError),
159 #[error(transparent)]
160 Database(#[from] database::DatabaseSearchError),
161 #[error("failed to perform migration")]
162 Migration,
163}
164
165impl TenantSearchIndex {
166 #[tracing::instrument(skip(self))]
168 pub async fn create_index(&self) -> Result<(), SearchError> {
169 match self {
170 TenantSearchIndex::Typesense(index) => index.create_index().await,
171 TenantSearchIndex::OpenSearch(index) => index.create_index().await,
172 TenantSearchIndex::Database(index) => index.create_index().await,
173 }
174 }
175
176 #[tracing::instrument(skip(self))]
178 pub async fn index_exists(&self) -> Result<bool, SearchError> {
179 match self {
180 TenantSearchIndex::Typesense(index) => index.index_exists().await,
181 TenantSearchIndex::OpenSearch(index) => index.index_exists().await,
182 TenantSearchIndex::Database(index) => index.index_exists().await,
183 }
184 }
185
186 #[tracing::instrument(skip(self))]
188 pub async fn delete_index(&self) -> Result<(), SearchError> {
189 match self {
190 TenantSearchIndex::Typesense(index) => index.delete_index().await,
191 TenantSearchIndex::OpenSearch(index) => index.delete_index().await,
192 TenantSearchIndex::Database(index) => index.delete_index().await,
193 }
194 }
195
196 #[tracing::instrument(skip(self))]
198 pub async fn search_index(
199 &self,
200 scope: &[DocumentBoxScopeRaw],
201 query: SearchRequest,
202 folder_children: Option<Vec<FolderId>>,
203 ) -> Result<SearchResults, SearchError> {
204 match self {
205 TenantSearchIndex::Typesense(index) => {
206 index.search_index(scope, query, folder_children).await
207 }
208 TenantSearchIndex::OpenSearch(index) => {
209 index.search_index(scope, query, folder_children).await
210 }
211 TenantSearchIndex::Database(index) => {
212 index.search_index(scope, query, folder_children).await
213 }
214 }
215 }
216
217 #[tracing::instrument(skip(self))]
219 pub async fn search_index_file(
220 &self,
221 scope: &DocumentBoxScopeRaw,
222 file_id: FileId,
223 query: FileSearchRequest,
224 ) -> Result<FileSearchResults, SearchError> {
225 match self {
226 TenantSearchIndex::Typesense(index) => {
227 index.search_index_file(scope, file_id, query).await
228 }
229 TenantSearchIndex::OpenSearch(index) => {
230 index.search_index_file(scope, file_id, query).await
231 }
232 TenantSearchIndex::Database(index) => {
233 index.search_index_file(scope, file_id, query).await
234 }
235 }
236 }
237
238 #[tracing::instrument(skip(self))]
240 pub async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError> {
241 match self {
242 TenantSearchIndex::Typesense(index) => index.add_data(data).await,
243 TenantSearchIndex::OpenSearch(index) => index.add_data(data).await,
244 TenantSearchIndex::Database(index) => index.add_data(data).await,
245 }
246 }
247
248 #[tracing::instrument(skip(self))]
250 pub async fn update_data(
251 &self,
252 item_id: Uuid,
253 data: UpdateSearchIndexData,
254 ) -> Result<(), SearchError> {
255 match self {
256 TenantSearchIndex::Typesense(index) => index.update_data(item_id, data).await,
257 TenantSearchIndex::OpenSearch(index) => index.update_data(item_id, data).await,
258 TenantSearchIndex::Database(index) => index.update_data(item_id, data).await,
259 }
260 }
261
262 #[tracing::instrument(skip(self))]
264 pub async fn delete_data(&self, id: Uuid) -> Result<(), SearchError> {
265 match self {
266 TenantSearchIndex::Typesense(index) => index.delete_data(id).await,
267 TenantSearchIndex::OpenSearch(index) => index.delete_data(id).await,
268 TenantSearchIndex::Database(index) => index.delete_data(id).await,
269 }
270 }
271
272 #[tracing::instrument(skip(self))]
274 pub async fn delete_by_scope(
275 &self,
276 scope: DocumentBoxScopeRawRef<'_>,
277 ) -> Result<(), SearchError> {
278 match self {
279 TenantSearchIndex::Typesense(index) => index.delete_by_scope(scope).await,
280 TenantSearchIndex::OpenSearch(index) => index.delete_by_scope(scope).await,
281 TenantSearchIndex::Database(index) => index.delete_by_scope(scope).await,
282 }
283 }
284
285 #[tracing::instrument(skip(self))]
287 pub async fn get_pending_migrations(
288 &self,
289 applied_names: Vec<String>,
290 ) -> Result<Vec<String>, SearchError> {
291 match self {
292 TenantSearchIndex::Typesense(index) => {
293 index.get_pending_migrations(applied_names).await
294 }
295 TenantSearchIndex::OpenSearch(index) => {
296 index.get_pending_migrations(applied_names).await
297 }
298 TenantSearchIndex::Database(index) => index.get_pending_migrations(applied_names).await,
299 }
300 }
301
302 #[tracing::instrument(skip(self))]
304 pub async fn apply_migration(
305 &self,
306 tenant: &Tenant,
307 root_t: &mut DbTransaction<'_>,
308 tenant_t: &mut DbTransaction<'_>,
309 name: &str,
310 ) -> Result<(), SearchError> {
311 match self {
313 TenantSearchIndex::Typesense(index) => {
314 index
315 .apply_migration(tenant, root_t, tenant_t, name)
316 .await?
317 }
318
319 TenantSearchIndex::OpenSearch(index) => {
320 index
321 .apply_migration(tenant, root_t, tenant_t, name)
322 .await?
323 }
324
325 TenantSearchIndex::Database(index) => {
326 index
327 .apply_migration(tenant, root_t, tenant_t, name)
328 .await?
329 }
330 }
331
332 TenantMigration::create(
334 root_t.deref_mut(),
335 CreateTenantMigration {
336 tenant_id: tenant.id,
337 env: tenant.env.clone(),
338 name: name.to_string(),
339 applied_at: Utc::now(),
340 },
341 )
342 .await
343 .map_err(|error| {
344 tracing::error!(?error, "failed to create tenant migration");
345 SearchError::Migration
346 })?;
347
348 Ok(())
349 }
350
351 #[tracing::instrument(skip_all, fields(?tenant, ?target_migration_name))]
356 pub async fn apply_migrations(
357 &self,
358 tenant: &Tenant,
359 root_t: &mut DbTransaction<'_>,
360 tenant_t: &mut DbTransaction<'_>,
361 target_migration_name: Option<&str>,
362 ) -> Result<(), SearchError> {
363 let applied_migrations =
364 TenantMigration::find_by_tenant(root_t.deref_mut(), tenant.id, &tenant.env)
365 .await
366 .map_err(|error| {
367 tracing::error!(?error, "failed to query tenant migrations");
368 SearchError::Migration
369 })?;
370 let pending_migrations = self
371 .get_pending_migrations(
372 applied_migrations
373 .into_iter()
374 .map(|value| value.name)
375 .collect(),
376 )
377 .await?;
378
379 for migration_name in pending_migrations {
380 if target_migration_name
382 .is_some_and(|target_migration_name| target_migration_name.ne(&migration_name))
383 {
384 continue;
385 }
386
387 if let Err(error) = self
389 .apply_migration(tenant, root_t, tenant_t, &migration_name)
390 .await
391 {
392 tracing::error!(%migration_name, ?error, "failed to apply migration");
393 return Err(error);
394 }
395 }
396
397 Ok(())
398 }
399}
400
401pub(crate) trait SearchIndex: Send + Sync + 'static {
402 async fn create_index(&self) -> Result<(), SearchError>;
403
404 async fn index_exists(&self) -> Result<bool, SearchError>;
405
406 async fn delete_index(&self) -> Result<(), SearchError>;
407
408 async fn search_index(
409 &self,
410 scope: &[DocumentBoxScopeRaw],
411 query: SearchRequest,
412 folder_children: Option<Vec<FolderId>>,
413 ) -> Result<SearchResults, SearchError>;
414
415 async fn search_index_file(
416 &self,
417 scope: &DocumentBoxScopeRaw,
418 file_id: FileId,
419 query: FileSearchRequest,
420 ) -> Result<FileSearchResults, SearchError>;
421
422 async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError>;
423
424 async fn update_data(
425 &self,
426 item_id: Uuid,
427 data: UpdateSearchIndexData,
428 ) -> Result<(), SearchError>;
429
430 async fn delete_data(&self, id: Uuid) -> Result<(), SearchError>;
431
432 async fn delete_by_scope(&self, scope: DocumentBoxScopeRawRef<'_>) -> Result<(), SearchError>;
433
434 async fn get_pending_migrations(
435 &self,
436 applied_names: Vec<String>,
437 ) -> Result<Vec<String>, SearchError>;
438
439 async fn apply_migration(
440 &self,
441 tenant: &Tenant,
442 root_t: &mut DbTransaction<'_>,
443 t: &mut DbTransaction<'_>,
444 name: &str,
445 ) -> Result<(), SearchError>;
446}