1#![forbid(unsafe_code)]
2
3use aws_config::SdkConfig;
4use chrono::Utc;
5use docbox_database::{
6 DatabasePoolCache, DbTransaction,
7 models::{
8 document_box::{DocumentBoxScopeRaw, DocumentBoxScopeRawRef},
9 file::FileId,
10 folder::FolderId,
11 tenant::Tenant,
12 tenant_migration::{CreateTenantMigration, TenantMigration},
13 },
14};
15use docbox_secrets::SecretManager;
16use models::{
17 FileSearchRequest, FileSearchResults, SearchIndexData, SearchRequest, SearchResults,
18 UpdateSearchIndexData,
19};
20use serde::{Deserialize, Serialize};
21use std::{ops::DerefMut, sync::Arc};
22use thiserror::Error;
23use uuid::Uuid;
24
25pub mod models;
26
27pub use database::{
28 DatabaseSearchConfig, DatabaseSearchError, DatabaseSearchIndex, DatabaseSearchIndexFactory,
29 DatabaseSearchIndexFactoryError,
30};
31pub use opensearch::{
32 OpenSearchConfig, OpenSearchIndex, OpenSearchIndexFactory, OpenSearchIndexFactoryError,
33 OpenSearchSearchError,
34};
35pub use typesense::{
36 TypesenseApiKey, TypesenseApiKeyProvider, TypesenseApiKeySecret, TypesenseIndex,
37 TypesenseIndexFactory, TypesenseIndexFactoryError, TypesenseSearchConfig, TypesenseSearchError,
38};
39
40mod database;
41mod opensearch;
42mod typesense;
43
44#[derive(Debug, Clone, Deserialize, Serialize)]
45#[serde(tag = "provider", rename_all = "snake_case")]
46pub enum SearchIndexFactoryConfig {
47 Typesense(typesense::TypesenseSearchConfig),
48 OpenSearch(opensearch::OpenSearchConfig),
49 Database(database::DatabaseSearchConfig),
50}
51
52#[derive(Debug, Error)]
53pub enum SearchIndexFactoryError {
54 #[error(transparent)]
55 Typesense(#[from] typesense::TypesenseIndexFactoryError),
56 #[error(transparent)]
57 OpenSearch(#[from] opensearch::OpenSearchIndexFactoryError),
58 #[error(transparent)]
59 Database(#[from] database::DatabaseSearchIndexFactoryError),
60 #[error("unknown search index factory type requested")]
61 UnknownIndexFactory,
62}
63
64impl SearchIndexFactoryConfig {
65 pub fn from_env() -> Result<Self, SearchIndexFactoryError> {
66 let variant = std::env::var("DOCBOX_SEARCH_INDEX_FACTORY")
67 .unwrap_or_else(|_| "database".to_string())
68 .to_lowercase();
69 match variant.as_str() {
70 "open_search" | "opensearch" => opensearch::OpenSearchConfig::from_env()
71 .map(Self::OpenSearch)
72 .map_err(SearchIndexFactoryError::OpenSearch),
73
74 "typesense" => typesense::TypesenseSearchConfig::from_env()
75 .map(Self::Typesense)
76 .map_err(SearchIndexFactoryError::Typesense),
77
78 "database" => database::DatabaseSearchConfig::from_env()
79 .map(Self::Database)
80 .map_err(SearchIndexFactoryError::Database),
81
82 _ => Err(SearchIndexFactoryError::UnknownIndexFactory),
84 }
85 }
86}
87
88#[derive(Clone)]
89pub enum SearchIndexFactory {
90 Typesense(typesense::TypesenseIndexFactory),
91 OpenSearch(opensearch::OpenSearchIndexFactory),
92 Database(database::DatabaseSearchIndexFactory),
93}
94
95impl SearchIndexFactory {
96 pub fn from_config(
98 aws_config: &SdkConfig,
99 secrets: SecretManager,
100 db: Arc<DatabasePoolCache>,
101 config: SearchIndexFactoryConfig,
102 ) -> Result<Self, SearchIndexFactoryError> {
103 match config {
104 SearchIndexFactoryConfig::Typesense(config) => {
105 tracing::debug!("using typesense search index");
106 typesense::TypesenseIndexFactory::from_config(secrets, config)
107 .map(SearchIndexFactory::Typesense)
108 .map_err(SearchIndexFactoryError::Typesense)
109 }
110
111 SearchIndexFactoryConfig::OpenSearch(config) => {
112 tracing::debug!("using opensearch search index");
113 opensearch::OpenSearchIndexFactory::from_config(aws_config, config)
114 .map(SearchIndexFactory::OpenSearch)
115 .map_err(SearchIndexFactoryError::OpenSearch)
116 }
117
118 SearchIndexFactoryConfig::Database(config) => {
119 tracing::debug!("using opensearch search index");
120 database::DatabaseSearchIndexFactory::from_config(db, config)
121 .map(SearchIndexFactory::Database)
122 .map_err(SearchIndexFactoryError::Database)
123 }
124 }
125 }
126
127 pub fn create_search_index(&self, tenant: &Tenant) -> TenantSearchIndex {
129 match self {
130 SearchIndexFactory::Typesense(factory) => {
131 let search_index = tenant.os_index_name.clone();
132 TenantSearchIndex::Typesense(factory.create_search_index(search_index))
133 }
134
135 SearchIndexFactory::OpenSearch(factory) => {
136 let search_index = opensearch::TenantSearchIndexName::from_tenant(tenant);
137 TenantSearchIndex::OpenSearch(factory.create_search_index(search_index))
138 }
139
140 SearchIndexFactory::Database(factory) => {
141 TenantSearchIndex::Database(factory.create_search_index(tenant))
142 }
143 }
144 }
145}
146
147#[derive(Clone)]
148pub enum TenantSearchIndex {
149 Typesense(typesense::TypesenseIndex),
150 OpenSearch(opensearch::OpenSearchIndex),
151 Database(database::DatabaseSearchIndex),
152}
153
154#[derive(Debug, Error)]
155pub enum SearchError {
156 #[error(transparent)]
157 Typesense(#[from] typesense::TypesenseSearchError),
158 #[error(transparent)]
159 OpenSearch(#[from] opensearch::OpenSearchSearchError),
160 #[error(transparent)]
161 Database(#[from] database::DatabaseSearchError),
162 #[error("failed to perform migration")]
163 Migration,
164}
165
166impl TenantSearchIndex {
167 #[tracing::instrument(skip(self))]
169 pub async fn create_index(&self) -> Result<(), SearchError> {
170 match self {
171 TenantSearchIndex::Typesense(index) => index.create_index().await,
172 TenantSearchIndex::OpenSearch(index) => index.create_index().await,
173 TenantSearchIndex::Database(index) => index.create_index().await,
174 }
175 }
176
177 #[tracing::instrument(skip(self))]
179 pub async fn index_exists(&self) -> Result<bool, SearchError> {
180 match self {
181 TenantSearchIndex::Typesense(index) => index.index_exists().await,
182 TenantSearchIndex::OpenSearch(index) => index.index_exists().await,
183 TenantSearchIndex::Database(index) => index.index_exists().await,
184 }
185 }
186
187 #[tracing::instrument(skip(self))]
189 pub async fn delete_index(&self) -> Result<(), SearchError> {
190 match self {
191 TenantSearchIndex::Typesense(index) => index.delete_index().await,
192 TenantSearchIndex::OpenSearch(index) => index.delete_index().await,
193 TenantSearchIndex::Database(index) => index.delete_index().await,
194 }
195 }
196
197 #[tracing::instrument(skip(self))]
199 pub async fn search_index(
200 &self,
201 scope: &[DocumentBoxScopeRaw],
202 query: SearchRequest,
203 folder_children: Option<Vec<FolderId>>,
204 ) -> Result<SearchResults, SearchError> {
205 match self {
206 TenantSearchIndex::Typesense(index) => {
207 index.search_index(scope, query, folder_children).await
208 }
209 TenantSearchIndex::OpenSearch(index) => {
210 index.search_index(scope, query, folder_children).await
211 }
212 TenantSearchIndex::Database(index) => {
213 index.search_index(scope, query, folder_children).await
214 }
215 }
216 }
217
218 #[tracing::instrument(skip(self))]
220 pub async fn search_index_file(
221 &self,
222 scope: &DocumentBoxScopeRaw,
223 file_id: FileId,
224 query: FileSearchRequest,
225 ) -> Result<FileSearchResults, SearchError> {
226 match self {
227 TenantSearchIndex::Typesense(index) => {
228 index.search_index_file(scope, file_id, query).await
229 }
230 TenantSearchIndex::OpenSearch(index) => {
231 index.search_index_file(scope, file_id, query).await
232 }
233 TenantSearchIndex::Database(index) => {
234 index.search_index_file(scope, file_id, query).await
235 }
236 }
237 }
238
239 #[tracing::instrument(skip(self))]
241 pub async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError> {
242 match self {
243 TenantSearchIndex::Typesense(index) => index.add_data(data).await,
244 TenantSearchIndex::OpenSearch(index) => index.add_data(data).await,
245 TenantSearchIndex::Database(index) => index.add_data(data).await,
246 }
247 }
248
249 #[tracing::instrument(skip(self))]
251 pub async fn update_data(
252 &self,
253 item_id: Uuid,
254 data: UpdateSearchIndexData,
255 ) -> Result<(), SearchError> {
256 match self {
257 TenantSearchIndex::Typesense(index) => index.update_data(item_id, data).await,
258 TenantSearchIndex::OpenSearch(index) => index.update_data(item_id, data).await,
259 TenantSearchIndex::Database(index) => index.update_data(item_id, data).await,
260 }
261 }
262
263 #[tracing::instrument(skip(self))]
265 pub async fn delete_data(&self, id: Uuid) -> Result<(), SearchError> {
266 match self {
267 TenantSearchIndex::Typesense(index) => index.delete_data(id).await,
268 TenantSearchIndex::OpenSearch(index) => index.delete_data(id).await,
269 TenantSearchIndex::Database(index) => index.delete_data(id).await,
270 }
271 }
272
273 #[tracing::instrument(skip(self))]
275 pub async fn delete_by_scope(
276 &self,
277 scope: DocumentBoxScopeRawRef<'_>,
278 ) -> Result<(), SearchError> {
279 match self {
280 TenantSearchIndex::Typesense(index) => index.delete_by_scope(scope).await,
281 TenantSearchIndex::OpenSearch(index) => index.delete_by_scope(scope).await,
282 TenantSearchIndex::Database(index) => index.delete_by_scope(scope).await,
283 }
284 }
285
286 #[tracing::instrument(skip(self))]
288 pub async fn get_pending_migrations(
289 &self,
290 applied_names: Vec<String>,
291 ) -> Result<Vec<String>, SearchError> {
292 match self {
293 TenantSearchIndex::Typesense(index) => {
294 index.get_pending_migrations(applied_names).await
295 }
296 TenantSearchIndex::OpenSearch(index) => {
297 index.get_pending_migrations(applied_names).await
298 }
299 TenantSearchIndex::Database(index) => index.get_pending_migrations(applied_names).await,
300 }
301 }
302
303 #[tracing::instrument(skip(self))]
305 pub async fn apply_migration(
306 &self,
307 tenant: &Tenant,
308 root_t: &mut DbTransaction<'_>,
309 tenant_t: &mut DbTransaction<'_>,
310 name: &str,
311 ) -> Result<(), SearchError> {
312 match self {
314 TenantSearchIndex::Typesense(index) => {
315 index
316 .apply_migration(tenant, root_t, tenant_t, name)
317 .await?
318 }
319
320 TenantSearchIndex::OpenSearch(index) => {
321 index
322 .apply_migration(tenant, root_t, tenant_t, name)
323 .await?
324 }
325
326 TenantSearchIndex::Database(index) => {
327 index
328 .apply_migration(tenant, root_t, tenant_t, name)
329 .await?
330 }
331 }
332
333 TenantMigration::create(
335 root_t.deref_mut(),
336 CreateTenantMigration {
337 tenant_id: tenant.id,
338 env: tenant.env.clone(),
339 name: name.to_string(),
340 applied_at: Utc::now(),
341 },
342 )
343 .await
344 .map_err(|error| {
345 tracing::error!(?error, "failed to create tenant migration");
346 SearchError::Migration
347 })?;
348
349 Ok(())
350 }
351
352 #[tracing::instrument(skip_all, fields(?tenant, ?target_migration_name))]
357 pub async fn apply_migrations(
358 &self,
359 tenant: &Tenant,
360 root_t: &mut DbTransaction<'_>,
361 tenant_t: &mut DbTransaction<'_>,
362 target_migration_name: Option<&str>,
363 ) -> Result<(), SearchError> {
364 let applied_migrations =
365 TenantMigration::find_by_tenant(root_t.deref_mut(), tenant.id, &tenant.env)
366 .await
367 .map_err(|error| {
368 tracing::error!(?error, "failed to query tenant migrations");
369 SearchError::Migration
370 })?;
371 let pending_migrations = self
372 .get_pending_migrations(
373 applied_migrations
374 .into_iter()
375 .map(|value| value.name)
376 .collect(),
377 )
378 .await?;
379
380 for migration_name in pending_migrations {
381 if target_migration_name
383 .is_some_and(|target_migration_name| target_migration_name.ne(&migration_name))
384 {
385 continue;
386 }
387
388 if let Err(error) = self
390 .apply_migration(tenant, root_t, tenant_t, &migration_name)
391 .await
392 {
393 tracing::error!(%migration_name, ?error, "failed to apply migration");
394 return Err(error);
395 }
396 }
397
398 Ok(())
399 }
400}
401
402pub(crate) trait SearchIndex: Send + Sync + 'static {
403 async fn create_index(&self) -> Result<(), SearchError>;
404
405 async fn index_exists(&self) -> Result<bool, SearchError>;
406
407 async fn delete_index(&self) -> Result<(), SearchError>;
408
409 async fn search_index(
410 &self,
411 scope: &[DocumentBoxScopeRaw],
412 query: SearchRequest,
413 folder_children: Option<Vec<FolderId>>,
414 ) -> Result<SearchResults, SearchError>;
415
416 async fn search_index_file(
417 &self,
418 scope: &DocumentBoxScopeRaw,
419 file_id: FileId,
420 query: FileSearchRequest,
421 ) -> Result<FileSearchResults, SearchError>;
422
423 async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError>;
424
425 async fn update_data(
426 &self,
427 item_id: Uuid,
428 data: UpdateSearchIndexData,
429 ) -> Result<(), SearchError>;
430
431 async fn delete_data(&self, id: Uuid) -> Result<(), SearchError>;
432
433 async fn delete_by_scope(&self, scope: DocumentBoxScopeRawRef<'_>) -> Result<(), SearchError>;
434
435 async fn get_pending_migrations(
436 &self,
437 applied_names: Vec<String>,
438 ) -> Result<Vec<String>, SearchError>;
439
440 async fn apply_migration(
441 &self,
442 tenant: &Tenant,
443 root_t: &mut DbTransaction<'_>,
444 t: &mut DbTransaction<'_>,
445 name: &str,
446 ) -> Result<(), SearchError>;
447}