1use crate::SearchError;
2use crate::models::FileSearchRequest;
3use crate::opensearch::models::{OsSearchIndexData, OsUpdateSearchIndexData, SearchResponse};
4
5use super::models::{FlattenedItemResult, PageResult, SearchScore};
6use super::{
7 SearchIndex,
8 models::{
9 FileSearchResults, SearchIndexData, SearchRequest, SearchResults, UpdateSearchIndexData,
10 },
11};
12use aws_config::SdkConfig;
13use docbox_database::DbTransaction;
14use docbox_database::models::document_box::DocumentBoxScopeRawRef;
15use docbox_database::models::file::FileId;
16use docbox_database::models::{
17 document_box::DocumentBoxScopeRaw, folder::FolderId, tenant::Tenant,
18};
19use opensearch::indices::IndicesGetParts;
20use opensearch::{
21 DeleteByQueryParts, OpenSearch, SearchParts,
22 http::{
23 Url,
24 request::JsonBody,
25 transport::{SingleNodeConnectionPool, TransportBuilder},
26 },
27 indices::{IndicesCreateParts, IndicesDeleteParts},
28};
29use reqwest::StatusCode;
30use serde::{Deserialize, Serialize};
31use serde_json::json;
32use serde_with::skip_serializing_none;
33use uuid::Uuid;
34
35pub use error::{OpenSearchIndexFactoryError, OpenSearchSearchError};
36
37pub mod error;
38mod models;
39
/// Configuration for connecting to an OpenSearch cluster.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct OpenSearchConfig {
    /// Base URL of the OpenSearch server; parsed and validated in
    /// [`OpenSearchIndexFactory::from_config`].
    pub url: String,
}
45
46impl OpenSearchConfig {
47 pub fn from_env() -> Result<Self, OpenSearchIndexFactoryError> {
48 let url =
49 std::env::var("OPENSEARCH_URL").map_err(|_| OpenSearchIndexFactoryError::MissingUrl)?;
50 Ok(Self { url })
51 }
52}
53
/// Factory owning a shared OpenSearch client that hands out per-tenant
/// [`OpenSearchIndex`] handles.
#[derive(Clone)]
pub struct OpenSearchIndexFactory {
    /// Shared client; cloned into every index handle this factory creates.
    client: OpenSearch,
}
58
59impl OpenSearchIndexFactory {
60 pub fn from_config(
61 aws_config: &SdkConfig,
62 config: OpenSearchConfig,
63 ) -> Result<Self, OpenSearchIndexFactoryError> {
64 let url = reqwest::Url::parse(&config.url).map_err(|error| {
65 tracing::error!(?error, "failed to parse opensearch url");
66 OpenSearchIndexFactoryError::InvalidUrl
67 })?;
68 let client = create_open_search(aws_config, url)?;
69 Ok(Self { client })
70 }
71
72 pub fn create_search_index(&self, search_index: TenantSearchIndexName) -> OpenSearchIndex {
73 OpenSearchIndex {
74 client: self.client.clone(),
75 search_index,
76 }
77 }
78}
79
/// Handle to a single tenant's OpenSearch index.
#[derive(Clone)]
pub struct OpenSearchIndex {
    /// Client used to issue requests against the cluster.
    client: OpenSearch,
    /// Name of the tenant index that all operations target.
    search_index: TenantSearchIndexName,
}
85
/// Newtype wrapper for a tenant's OpenSearch index name.
#[derive(Clone, Debug)]
pub struct TenantSearchIndexName(String);
89
90impl TenantSearchIndexName {
91 pub fn from_tenant(tenant: &Tenant) -> Self {
92 Self(tenant.os_index_name.clone())
93 }
94}
95
96pub fn create_open_search(
98 aws_config: &SdkConfig,
99 url: Url,
100) -> Result<OpenSearch, OpenSearchIndexFactoryError> {
101 if cfg!(debug_assertions) {
102 create_open_search_dev(url)
103 } else {
104 create_open_search_prod(aws_config, url)
105 }
106}
107
108pub fn create_open_search_dev(url: Url) -> Result<OpenSearch, OpenSearchIndexFactoryError> {
110 let conn_pool = SingleNodeConnectionPool::new(url);
111
112 let transport = TransportBuilder::new(conn_pool)
113 .disable_proxy()
115 .cert_validation(opensearch::cert::CertificateValidation::None)
117 .build()
118 .map_err(|error| {
119 tracing::error!(?error, "failed to build opensearch transport");
120 OpenSearchIndexFactoryError::BuildTransport
121 })?;
122
123 let open_search = OpenSearch::new(transport);
124
125 Ok(open_search)
126}
127
128pub fn create_open_search_prod(
130 aws_config: &SdkConfig,
131 url: Url,
132) -> Result<OpenSearch, OpenSearchIndexFactoryError> {
133 let conn_pool = SingleNodeConnectionPool::new(url);
135
136 let transport = TransportBuilder::new(conn_pool)
137 .disable_proxy()
139 .auth(aws_config.clone().try_into().map_err(|error| {
140 tracing::error!(?error, "failed to create opensearch transport auth config");
141 OpenSearchIndexFactoryError::CreateAuthConfig
142 })?)
143 .service_name("es")
144 .build()
145 .map_err(|error| {
146 tracing::error!(?error, "failed to build opensearch transport");
147 OpenSearchIndexFactoryError::BuildTransport
148 })?;
149
150 let open_search = OpenSearch::new(transport);
151
152 Ok(open_search)
153}
154
impl SearchIndex for OpenSearchIndex {
    /// Creates the tenant's search index.
    ///
    /// Configures an `edge_ngram` analyzer (1-25 chars, letters and digits)
    /// used by the `name` field for prefix/partial matching, plus explicit
    /// field mappings including the nested `pages` documents that hold
    /// per-page extracted content.
    ///
    /// NOTE(review): the HTTP status of the create response is only logged,
    /// not checked — a non-2xx create (e.g. index already exists) still
    /// returns `Ok(())`; confirm this is intended.
    async fn create_index(&self) -> Result<(), SearchError> {
        let response = self
            .client
            .indices()
            .create(IndicesCreateParts::Index(&self.search_index.0))
            .body(json!({
                "settings": {
                    "analysis": {
                        "tokenizer": {
                            "edge_ngram_tokenizer": {
                                "type": "edge_ngram",
                                "min_gram": 1,
                                "max_gram": 25,
                                "token_chars": [
                                    "letter",
                                    "digit"
                                ]
                            }
                        },
                        "analyzer": {
                            "edge_ngram_analyzer": {
                                "type": "custom",
                                "tokenizer": "edge_ngram_tokenizer"
                            }
                        }
                    }
                },
                "mappings" : {
                    "properties" : {
                        "item_id": { "type": "keyword" },
                        "item_type": { "type": "keyword" },
                        "mime": { "type": "keyword" },
                        "name" : { "type" : "text", "analyzer": "edge_ngram_analyzer" },
                        "content" : { "type" : "text" },
                        "created_at": { "type": "date", "format": "rfc3339_lenient" },
                        "created_by": { "type": "keyword" },
                        "folder_id": { "type": "keyword" },
                        "document_box": { "type": "keyword" },
                        "version": {
                            "type": "keyword"
                        },
                        "pages": {
                            "type": "nested",
                            "properties": {
                                "content" : { "type" : "text" },
                                "page": { "type": "integer" },
                            }
                        }
                    }
                }
            }))
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to create index");
                OpenSearchSearchError::CreateIndex
            })?;

        tracing::debug!("open search response {response:?}");

        Ok(())
    }

    /// Reports whether the tenant's search index currently exists.
    ///
    /// A 404 from the GET-index API maps to `Ok(false)`; any other error
    /// status surfaces as [`OpenSearchSearchError::GetIndex`].
    async fn index_exists(&self) -> Result<bool, SearchError> {
        let response = self
            .client
            .indices()
            .get(IndicesGetParts::Index(&[&self.search_index.0]))
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to get index");
                OpenSearchSearchError::GetIndex
            })?;

        // A missing index is a normal outcome, not an error.
        if response.status_code() == StatusCode::NOT_FOUND {
            return Ok(false);
        }

        response.error_for_status_code().map_err(|error| {
            tracing::error!(?error, "failed to get index");
            OpenSearchSearchError::GetIndex
        })?;

        Ok(true)
    }

    /// Deletes the tenant's search index.
    ///
    /// Deleting an index that does not exist (404) is treated as success so
    /// the operation is idempotent.
    async fn delete_index(&self) -> Result<(), SearchError> {
        let response = self
            .client
            .indices()
            .delete(IndicesDeleteParts::Index(&[&self.search_index.0]))
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to delete index");
                OpenSearchSearchError::DeleteIndex
            })?;

        if response.status_code() == StatusCode::NOT_FOUND {
            return Ok(());
        }

        response.error_for_status_code().map_err(|error| {
            tracing::error!(?error, "failed to delete search index (response)");
            OpenSearchSearchError::DeleteIndex
        })?;

        Ok(())
    }

    /// Searches the page content of a single file within one document box
    /// scope.
    ///
    /// Only the first top-level hit is inspected (the query body limits the
    /// result set to one document); its nested `pages` inner hits supply the
    /// per-page highlight matches. Returns empty results when nothing
    /// matched.
    ///
    /// NOTE(review): `offset` is applied both as the request-level `from`
    /// here and as the inner-hits `from` inside
    /// [`create_opensearch_file_query`] — confirm both are intended.
    async fn search_index_file(
        &self,
        scope: &DocumentBoxScopeRaw,
        file_id: docbox_database::models::file::FileId,
        query: super::models::FileSearchRequest,
    ) -> Result<FileSearchResults, SearchError> {
        let offset = query.offset;
        let query = create_opensearch_file_query(query, scope, file_id);

        tracing::debug!(%query, "searching with query");

        let response = self
            .client
            .search(SearchParts::Index(&[&self.search_index.0]))
            .from(offset.unwrap_or(0) as i64)
            .body(query)
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to search index file");
                OpenSearchSearchError::SearchIndex
            })?;

        // Deserialize to a raw value first so the full payload can be logged
        // before the typed parse.
        let response: serde_json::Value = response.json().await.map_err(|error| {
            tracing::error!(?error, "failed to get file search response");
            OpenSearchSearchError::SearchIndex
        })?;

        tracing::debug!(%response, "search response");

        let response: SearchResponse = serde_json::from_value(response).map_err(|error| {
            tracing::error!(?error, "failed to parse file search response");
            OpenSearchSearchError::SearchIndex
        })?;

        // Take only the first hit's nested page matches; default to
        // (0, empty) when there are no hits or no inner hits.
        let (total_hits, results) = response
            .hits
            .hits
            .into_iter()
            .next()
            .and_then(|item| item.inner_hits)
            .map(|inner_hits| {
                let total_hits = inner_hits.pages.hits.total.value;
                let page_matches: Vec<PageResult> = inner_hits
                    .pages
                    .hits
                    .hits
                    .into_iter()
                    .map(|value| PageResult {
                        page: value._source.page,
                        matches: value.highlight.content,
                    })
                    .collect();
                (total_hits, page_matches)
            })
            .unwrap_or_default();

        Ok(FileSearchResults {
            total_hits,
            results,
        })
    }

    /// Searches across one or more document box scopes, optionally
    /// restricted to a set of folders.
    ///
    /// Flattens each hit into a [`FlattenedItemResult`], marking `name_match`
    /// when the hit matched one of the named name queries and
    /// `content_match` when any nested page matched.
    async fn search_index(
        &self,
        scope: &[DocumentBoxScopeRaw],
        query: SearchRequest,
        folder_children: Option<Vec<FolderId>>,
    ) -> Result<SearchResults, SearchError> {
        let offset = query.offset;
        let query = create_opensearch_query(query, scope, folder_children);

        tracing::debug!(%query, "searching with query");

        let response = self
            .client
            .search(SearchParts::Index(&[&self.search_index.0]))
            .from(offset.unwrap_or(0) as i64)
            .body(query)
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to search index");
                OpenSearchSearchError::SearchIndex
            })?;

        let response: serde_json::Value = response.json().await.map_err(|error| {
            tracing::error!(?error, "failed to get search response");
            OpenSearchSearchError::SearchIndex
        })?;

        tracing::debug!(%response);

        let response: SearchResponse = serde_json::from_value(response).map_err(|error| {
            tracing::error!(?error, "failed to parse search response");
            OpenSearchSearchError::SearchIndex
        })?;
        let total_hits = response.hits.total.value;

        // Named queries set by create_opensearch_query that indicate the
        // item's name (rather than its content) matched.
        const NAME_MATCH_KEYS: [&str; 2] = ["name_match_exact", "name_match_wildcard"];

        let results: Vec<FlattenedItemResult> = response
            .hits
            .hits
            .into_iter()
            .map(|item| {
                let (total_hits, page_matches) = match item.inner_hits {
                    Some(inner_hits) => {
                        let total_hits = inner_hits.pages.hits.total.value;
                        let page_matches: Vec<PageResult> = inner_hits
                            .pages
                            .hits
                            .hits
                            .into_iter()
                            .map(|value| PageResult {
                                page: value._source.page,
                                matches: value.highlight.content,
                            })
                            .collect();
                        (total_hits, page_matches)
                    }
                    None => (0, vec![]),
                };

                let name_match = item.matched_queries.is_some_and(|matches| {
                    matches
                        .iter()
                        .any(|value| NAME_MATCH_KEYS.contains(&value.as_str()))
                });
                let content_match = !page_matches.is_empty();

                FlattenedItemResult {
                    item_ty: item._source.item_type,
                    item_id: item._source.item_id,
                    document_box: item._source.document_box,
                    score: SearchScore::Float(item._score),
                    page_matches,
                    total_hits,
                    name_match,
                    content_match,
                }
            })
            .collect();

        Ok(SearchResults {
            total_hits,
            results,
        })
    }

    /// Bulk-indexes a batch of items into the tenant index.
    ///
    /// NOTE(review): only the overall HTTP status is checked; per-item
    /// failures reported inside a 200 bulk response body are not inspected —
    /// confirm partial failures are acceptable here.
    async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError> {
        // Map to the OpenSearch wire representation (dates as RFC 3339).
        let mapped_data: Vec<JsonBody<OsSearchIndexData>> = data
            .into_iter()
            .map(|data| {
                JsonBody::new(OsSearchIndexData {
                    ty: data.ty,
                    folder_id: data.folder_id,
                    document_box: data.document_box,
                    item_id: data.item_id,
                    name: data.name,
                    mime: data.mime,
                    content: data.content,
                    created_at: data.created_at.to_rfc3339(),
                    created_by: data.created_by,
                    pages: data.pages,
                })
            })
            .collect();

        let result = self
            .client
            .bulk(opensearch::BulkParts::Index(&self.search_index.0))
            .body(mapped_data)
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to bulk add data");
                OpenSearchSearchError::AddData
            })?;

        let status_code = result.status_code();

        let response = result.text().await.map_err(|error| {
            tracing::error!(?error, "failed to get bulk add response");
            OpenSearchSearchError::AddData
        })?;

        if status_code.is_client_error() || status_code.is_server_error() {
            tracing::error!(?response, "bulk add error response");
            return Err(OpenSearchSearchError::AddData.into());
        }
        Ok(())
    }

    /// Updates every indexed document whose `item_id` matches, via a bulk
    /// update request of alternating action/doc entry pairs.
    ///
    /// Succeeds as a no-op when no matching documents are found.
    ///
    /// NOTE(review): `get_by_item_id` caps its lookup at 10 documents, so
    /// more than 10 entries sharing an `item_id` would be partially
    /// updated — confirm that bound is safe.
    async fn update_data(
        &self,
        item_id: Uuid,
        data: UpdateSearchIndexData,
    ) -> Result<(), SearchError> {
        let data = OsUpdateSearchIndexData {
            folder_id: data.folder_id,
            name: data.name,
            content: data.content,
            pages: data.pages,
        };

        // Resolve the OpenSearch document ids for this logical item.
        let items = self.get_by_item_id(item_id).await.map_err(|error| {
            tracing::error!(?error, "failed to find items to update");
            OpenSearchSearchError::UpdateData
        })?;

        if items.is_empty() {
            return Ok(());
        }

        // Bulk NDJSON entries: an "update" action line followed by a "doc"
        // payload line for each document id.
        #[derive(Serialize)]
        enum BulkUpdateEntry<'a> {
            #[serde(rename = "update")]
            Update {
                _id: String,
            },
            #[serde(rename = "doc")]
            Document {
                #[serde(flatten)]
                data: &'a OsUpdateSearchIndexData,
            },
        }

        let updates: Vec<JsonBody<BulkUpdateEntry<'_>>> = items
            .into_iter()
            .flat_map(|_id| {
                [
                    BulkUpdateEntry::Update { _id },
                    BulkUpdateEntry::Document { data: &data },
                ]
            })
            .map(JsonBody::new)
            .collect();

        let result = self
            .client
            .bulk(opensearch::BulkParts::Index(&self.search_index.0))
            .body(updates)
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to update data (request)");
                OpenSearchSearchError::UpdateData
            })?;

        let status_code = result.status_code();
        let response: serde_json::Value = result.json().await.map_err(|error| {
            tracing::error!(?error, "failed to update data (response)");
            OpenSearchSearchError::UpdateData
        })?;

        tracing::debug!(?response, "search index update response");

        if status_code.is_client_error() || status_code.is_server_error() {
            tracing::error!(?response, "update data error response");
            return Err(OpenSearchSearchError::UpdateData.into());
        }

        Ok(())
    }

    /// Deletes every indexed document whose `item_id` matches, using
    /// delete-by-query.
    ///
    /// NOTE(review): the response status is not checked — a failed
    /// delete-by-query that still returns a response yields `Ok(())`.
    async fn delete_data(&self, item_id: Uuid) -> Result<(), SearchError> {
        self.client
            .delete_by_query(DeleteByQueryParts::Index(&[&self.search_index.0]))
            .body(json!({
                "query": {
                    "term": { "item_id": item_id }
                }
            }))
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to delete data");
                OpenSearchSearchError::DeleteData
            })?;

        Ok(())
    }

    /// Deletes every indexed document belonging to the given document box
    /// scope, using delete-by-query.
    ///
    /// NOTE(review): as with `delete_data`, the response status is not
    /// checked.
    async fn delete_by_scope(&self, scope: DocumentBoxScopeRawRef<'_>) -> Result<(), SearchError> {
        self.client
            .delete_by_query(DeleteByQueryParts::Index(&[&self.search_index.0]))
            .body(json!({
                "query": {
                    "term": { "document_box": scope }
                }
            }))
            .send()
            .await
            .map_err(|error| {
                tracing::error!(?error, "failed to delete data by scope");
                OpenSearchSearchError::DeleteData
            })?;

        Ok(())
    }

    /// This backend defines no migrations; always reports none pending.
    async fn get_pending_migrations(
        &self,
        _applied_names: Vec<String>,
    ) -> Result<Vec<String>, SearchError> {
        Ok(Vec::new())
    }

    /// This backend defines no migrations; applying is a no-op.
    async fn apply_migration(
        &self,
        _tenant: &Tenant,
        _root_t: &mut DbTransaction<'_>,
        _t: &mut DbTransaction<'_>,
        _name: &str,
    ) -> Result<(), SearchError> {
        Ok(())
    }
}
614
615impl OpenSearchIndex {
616 async fn get_by_item_id(&self, item_id: Uuid) -> Result<Vec<String>, OpenSearchSearchError> {
618 #[derive(Debug, Deserialize, Serialize)]
619 struct Response {
620 hits: Hits,
621 }
622
623 #[derive(Debug, Deserialize, Serialize)]
624 struct Hits {
625 hits: Vec<Hit>,
626 }
627
628 #[derive(Debug, Deserialize, Serialize)]
629 struct Hit {
630 _id: String,
631 }
632
633 let response = self
635 .client
636 .search(SearchParts::Index(&[&self.search_index.0]))
637 .from(0)
638 .size(10)
639 .body(json!({
640 "query": {
641 "term": { "item_id": item_id }
642 },
643 }))
644 .send()
645 .await
646 .map_err(|error| {
647 tracing::error!(?error, "failed to get search item by id");
648 OpenSearchSearchError::SearchIndex
649 })?;
650
651 let response: Response = response.json().await.map_err(|error| {
652 tracing::error!(?error, "failed to parse search item by id response");
653 OpenSearchSearchError::SearchIndex
654 })?;
655
656 Ok(response.hits.hits.into_iter().map(|hit| hit._id).collect())
657 }
658}
659
/// Serializable bounds for a `range` filter on the `created_at` field.
///
/// `skip_serializing_none` omits absent bounds so the emitted range clause
/// contains only the endpoints that were provided.
#[skip_serializing_none]
#[derive(Serialize)]
struct DateRange {
    /// Inclusive lower bound (RFC 3339 timestamp).
    gte: Option<String>,
    /// Inclusive upper bound (RFC 3339 timestamp).
    lte: Option<String>,
}
667
/// Builds the OpenSearch query body for a general (multi-item) search.
///
/// The query is a `bool` of:
/// - `filter`: document box scopes, and optionally folder ids, mime type,
///   a created-at range, and creator — these constrain without scoring.
/// - `should`: name matches (exact term plus wildcard, both `_name`d so
///   callers can detect a name hit via `matched_queries`) and content
///   matches on the top-level `content` field and the nested
///   `pages.content` field, the latter with highlighted inner hits for the
///   matching pages.
///
/// `minimum_should_match` is 1 whenever any `should` clause was added, so
/// filter-only requests (no query text) still return scoped results.
pub fn create_opensearch_query(
    req: SearchRequest,
    scopes: &[DocumentBoxScopeRaw],
    folder_children: Option<Vec<FolderId>>,
) -> serde_json::Value {
    let mut filters = vec![];
    let mut should = Vec::new();

    // Always restrict results to the requested document box scopes.
    filters.push(json!({
        "terms": { "document_box": scopes }
    }));

    // Treat an empty query string the same as no query at all.
    let query = req
        .query
        .filter(|value| !value.is_empty());

    if let Some(ref query) = query {
        if req.include_name {
            // Exact (case-insensitive) name match, boosted above wildcard.
            should.push(json!({
                "term": {
                    "name": {
                        "value": query,
                        "boost": 2,
                        "_name": "name_match_exact",
                        "case_insensitive": true
                    }
                }
            }));
            // Substring name match via wildcard.
            should.push(json!({
                "wildcard": {
                    "name": {
                        "value": format!("*{query}*"),
                        "boost": 1.5,
                        "_name": "name_match_wildcard",
                        "case_insensitive": true
                    }
                }
            }));
        }

        if req.include_content {
            // Match against the item's top-level content field.
            should.push(json!({
                "match": {
                    "content": {
                        "query": query,
                        "_name": "content_match"
                    },
                }
            }));

            // Match against nested per-page content, returning highlighted
            // inner hits for up to `max_pages` best-scoring pages.
            should.push(json!({
                "nested": {
                    "path": "pages",
                    "query": {
                        "match": {
                            "pages.content": {
                                "query": query,
                                "_name": "content_match"
                            },
                        }
                    },
                    "inner_hits": {
                        "_source": ["pages.page"],
                        "highlight": {
                            "fields": {
                                "pages.content": {
                                    "fragment_size": 150,
                                    "number_of_fragments": 3,
                                    "type": "unified"
                                }
                            }
                        },
                        "sort": [
                            {
                                "_score": {
                                    "order": "desc"
                                }
                            }
                        ],
                        "size": req.max_pages.unwrap_or(3),
                    }
                }
            }));
        }
    }

    if let Some(folder_children) = folder_children {
        filters.push(json!({
            "terms": { "folder_id": folder_children }
        }));
    }

    if let Some(ref mime) = req.mime {
        filters.push(json!({
            "term": { "mime": mime }
        }));
    }

    if let Some(ref created_at) = req.created_at {
        let start = created_at.start.map(|value| value.to_rfc3339());
        let end = created_at.end.map(|value| value.to_rfc3339());

        // Only emit the range clause when at least one bound is present.
        if start.is_some() || end.is_some() {
            filters.push(json!({
                "range": {
                    "created_at": DateRange {
                        gte: start,
                        lte: end
                    }
                }
            }));
        }
    }

    if let Some(ref created_by) = req.created_by {
        filters.push(json!({
            "term": { "created_by": created_by }
        }));
    }

    // Require a should-clause hit only when there are should clauses.
    let minimum_should_match = if !should.is_empty() { 1 } else { 0 };

    json!({
        "query": {
            "bool": {
                "filter": filters,
                "should": should,
                "minimum_should_match": minimum_should_match
            },
        },

        "size": req.size.unwrap_or(50),
        "from": req.offset.unwrap_or(0),

        "_source": [
            "item_id",
            "item_type",
            "document_box"
        ],

        "sort": [
            {
                "_score": {
                    "order": "desc"
                }
            }
        ]
    })
}
834
/// Builds the OpenSearch query body for searching the pages of one file.
///
/// Filters to the exact document box scope and file `item_id`, then matches
/// the nested `pages.content` field, returning highlighted inner hits paged
/// by `req.offset`/`req.limit`. The top-level result set is fixed to a
/// single document (`size: 1`) since at most one indexed document matches
/// the id filter.
pub fn create_opensearch_file_query(
    req: FileSearchRequest,
    scope: &DocumentBoxScopeRaw,
    file_id: FileId,
) -> serde_json::Value {
    // An absent query becomes the empty string.
    let query = req.query.unwrap_or_default();

    json!({
        "query": {
            "bool": {
                "filter": [
                    {
                        "term": { "document_box": scope }
                    },
                    {
                        "term": { "item_id": file_id }
                    }
                ],
                "should": [
                    {
                        "nested": {
                            "path": "pages",
                            "query": {
                                "match": {
                                    "pages.content": {
                                        "query": query,
                                        "_name": "content_match"
                                    },
                                }
                            },
                            "inner_hits": {
                                "_source": ["pages.page"],
                                "highlight": {
                                    "fields": {
                                        "pages.content": {
                                            "fragment_size": 150,
                                            "number_of_fragments": 1,
                                            "type": "unified"
                                        }
                                    }
                                },
                                "sort": [
                                    {
                                        "_score": {
                                            "order": "desc"
                                        }
                                    }
                                ],
                                "size": req.limit.unwrap_or(3),
                                "from": req.offset.unwrap_or(0),
                            }
                        }
                    }
                ],
                "minimum_should_match": 1
            },
        },

        "size": 1,
        "from": 0,

        "_source": [
            "item_id",
            "item_type",
            "document_box"
        ],

        "sort": [
            {
                "_score": {
                    "order": "desc"
                }
            }
        ]
    })
}