Skip to main content

docbox_search/opensearch/
mod.rs

1use crate::SearchError;
2use crate::models::FileSearchRequest;
3use crate::opensearch::models::{OsSearchIndexData, OsUpdateSearchIndexData, SearchResponse};
4
5use super::models::{FlattenedItemResult, PageResult, SearchScore};
6use super::{
7    SearchIndex,
8    models::{
9        FileSearchResults, SearchIndexData, SearchRequest, SearchResults, UpdateSearchIndexData,
10    },
11};
12use aws_config::SdkConfig;
13use docbox_database::DbTransaction;
14use docbox_database::models::document_box::DocumentBoxScopeRawRef;
15use docbox_database::models::file::FileId;
16use docbox_database::models::{
17    document_box::DocumentBoxScopeRaw, folder::FolderId, tenant::Tenant,
18};
19use opensearch::indices::IndicesGetParts;
20use opensearch::{
21    DeleteByQueryParts, OpenSearch, SearchParts,
22    http::{
23        Url,
24        request::JsonBody,
25        transport::{SingleNodeConnectionPool, TransportBuilder},
26    },
27    indices::{IndicesCreateParts, IndicesDeleteParts},
28};
29use reqwest::StatusCode;
30use serde::{Deserialize, Serialize};
31use serde_json::json;
32use serde_with::skip_serializing_none;
33use uuid::Uuid;
34
35pub use error::{OpenSearchIndexFactoryError, OpenSearchSearchError};
36
37pub mod error;
38mod models;
39
40#[derive(Debug, Clone, Deserialize, Serialize)]
41pub struct OpenSearchConfig {
42    /// URL of the OpenSearch server
43    pub url: String,
44}
45
46impl OpenSearchConfig {
47    pub fn from_env() -> Result<Self, OpenSearchIndexFactoryError> {
48        let url =
49            std::env::var("OPENSEARCH_URL").map_err(|_| OpenSearchIndexFactoryError::MissingUrl)?;
50        Ok(Self { url })
51    }
52}
53
54#[derive(Clone)]
55pub struct OpenSearchIndexFactory {
56    client: OpenSearch,
57}
58
59impl OpenSearchIndexFactory {
60    pub fn from_config(
61        aws_config: &SdkConfig,
62        config: OpenSearchConfig,
63    ) -> Result<Self, OpenSearchIndexFactoryError> {
64        let url = reqwest::Url::parse(&config.url).map_err(|error| {
65            tracing::error!(?error, "failed to parse opensearch url");
66            OpenSearchIndexFactoryError::InvalidUrl
67        })?;
68        let client = create_open_search(aws_config, url)?;
69        Ok(Self { client })
70    }
71
72    pub fn create_search_index(&self, search_index: TenantSearchIndexName) -> OpenSearchIndex {
73        OpenSearchIndex {
74            client: self.client.clone(),
75            search_index,
76        }
77    }
78}
79
/// Search index handle bound to a single tenant index name
#[derive(Clone)]
pub struct OpenSearchIndex {
    /// OpenSearch client used to perform the requests
    client: OpenSearch,
    /// Name of the index the requests are issued against
    search_index: TenantSearchIndexName,
}
85
86/// Represents a search index name for a specific tenant
87#[derive(Clone, Debug)]
88pub struct TenantSearchIndexName(String);
89
90impl TenantSearchIndexName {
91    pub fn from_tenant(tenant: &Tenant) -> Self {
92        Self(tenant.os_index_name.clone())
93    }
94}
95
96/// Create instance of [OpenSearch] from the environment
97pub fn create_open_search(
98    aws_config: &SdkConfig,
99    url: Url,
100) -> Result<OpenSearch, OpenSearchIndexFactoryError> {
101    if cfg!(debug_assertions) {
102        create_open_search_dev(url)
103    } else {
104        create_open_search_prod(aws_config, url)
105    }
106}
107
108/// Create instance of [OpenSearch] from the environment
109pub fn create_open_search_dev(url: Url) -> Result<OpenSearch, OpenSearchIndexFactoryError> {
110    let conn_pool = SingleNodeConnectionPool::new(url);
111
112    let transport = TransportBuilder::new(conn_pool)
113        // We don't want open search trying to access the index through our proxy. It has a direct route
114        .disable_proxy()
115        // Disable certificate validation for local development
116        .cert_validation(opensearch::cert::CertificateValidation::None)
117        .build()
118        .map_err(|error| {
119            tracing::error!(?error, "failed to build opensearch transport");
120            OpenSearchIndexFactoryError::BuildTransport
121        })?;
122
123    let open_search = OpenSearch::new(transport);
124
125    Ok(open_search)
126}
127
128/// Create instance of [OpenSearch] from the environment
129pub fn create_open_search_prod(
130    aws_config: &SdkConfig,
131    url: Url,
132) -> Result<OpenSearch, OpenSearchIndexFactoryError> {
133    // Setup opensearch connection pool
134    let conn_pool = SingleNodeConnectionPool::new(url);
135
136    let transport = TransportBuilder::new(conn_pool)
137        // We don't want open search trying to access the index through our proxy. It has a direct route
138        .disable_proxy()
139        .auth(aws_config.clone().try_into().map_err(|error| {
140            tracing::error!(?error, "failed to create opensearch transport auth config");
141            OpenSearchIndexFactoryError::CreateAuthConfig
142        })?)
143        .service_name("es")
144        .build()
145        .map_err(|error| {
146            tracing::error!(?error, "failed to build opensearch transport");
147            OpenSearchIndexFactoryError::BuildTransport
148        })?;
149
150    let open_search = OpenSearch::new(transport);
151
152    Ok(open_search)
153}
154
155impl SearchIndex for OpenSearchIndex {
156    async fn create_index(&self) -> Result<(), SearchError> {
157        // Create index for files
158        let response = self
159            .client
160            .indices()
161            .create(IndicesCreateParts::Index(&self.search_index.0))
162            .body(json!({
163                "settings": {
164                    "analysis": {
165                        "tokenizer": {
166                            "edge_ngram_tokenizer": {
167                                "type": "edge_ngram",
168                                "min_gram": 1,
169                                "max_gram": 25,
170                                "token_chars": [
171                                    "letter",
172                                    "digit"
173                                ]
174                            }
175                        },
176                        "analyzer": {
177                            "edge_ngram_analyzer": {
178                                "type": "custom",
179                                "tokenizer": "edge_ngram_tokenizer"
180                            }
181                        }
182                    }
183                },
184                "mappings" : {
185                    "properties" : {
186                        // ID of the document / file / link
187                        "item_id": { "type": "keyword" },
188                        // Folder, File, Link
189                        "item_type": { "type": "keyword" },
190                        // Mime type for files
191                        "mime": { "type": "keyword" },
192                        // Full text file/folder/link name search
193                        "name" : { "type" : "text", "analyzer": "edge_ngram_analyzer" },
194                        // Full text file/link value content search
195                        "content" : { "type" : "text" },
196                        // Created at date search
197                        "created_at": { "type": "date", "format": "rfc3339_lenient" },
198                        // Exact user for the creator user ID
199                        "created_by": { "type": "keyword" },
200                        // Exact search for the folder the item is within
201                        "folder_id": { "type": "keyword" },
202                         // Exact search for the document box the item is within
203                        "document_box": { "type": "keyword" },
204                        // Document versioning
205                        "version": {
206                            "type": "keyword"
207                        },
208                        // Document pages for files
209                        "pages": {
210                            "type": "nested",
211                            "properties": {
212                                // Full text file/link value content search
213                                "content" : { "type" : "text" },
214                                // Page number
215                                "page": { "type": "integer" },
216                            }
217                        }
218                    }
219                }
220            }))
221            .send()
222            .await
223            .map_err(|error| {
224                tracing::error!(?error, "failed to create index");
225                OpenSearchSearchError::CreateIndex
226            })?;
227
228        tracing::debug!("open search response {response:?}");
229
230        Ok(())
231    }
232
233    async fn index_exists(&self) -> Result<bool, SearchError> {
234        // Delete index for files
235        let response = self
236            .client
237            .indices()
238            .get(IndicesGetParts::Index(&[&self.search_index.0]))
239            .send()
240            .await
241            .map_err(|error| {
242                tracing::error!(?error, "failed to get index");
243                OpenSearchSearchError::GetIndex
244            })?;
245
246        if response.status_code() == StatusCode::NOT_FOUND {
247            return Ok(false);
248        }
249
250        response.error_for_status_code().map_err(|error| {
251            tracing::error!(?error, "failed to get index");
252            OpenSearchSearchError::GetIndex
253        })?;
254
255        Ok(true)
256    }
257
258    async fn delete_index(&self) -> Result<(), SearchError> {
259        // Delete index for files
260        let response = self
261            .client
262            .indices()
263            .delete(IndicesDeleteParts::Index(&[&self.search_index.0]))
264            .send()
265            .await
266            .map_err(|error| {
267                tracing::error!(?error, "failed to delete index");
268                OpenSearchSearchError::DeleteIndex
269            })?;
270
271        // Gracefully handle the index already not existing
272        if response.status_code() == StatusCode::NOT_FOUND {
273            return Ok(());
274        }
275
276        response.error_for_status_code().map_err(|error| {
277            tracing::error!(?error, "failed to delete search index (response)");
278            OpenSearchSearchError::DeleteIndex
279        })?;
280
281        Ok(())
282    }
283
284    async fn search_index_file(
285        &self,
286        scope: &DocumentBoxScopeRaw,
287        file_id: docbox_database::models::file::FileId,
288        query: super::models::FileSearchRequest,
289    ) -> Result<FileSearchResults, SearchError> {
290        let offset = query.offset;
291        let query = create_opensearch_file_query(query, scope, file_id);
292
293        tracing::debug!(%query, "searching with query");
294
295        // Search for field in content
296        let response = self
297            .client
298            .search(SearchParts::Index(&[&self.search_index.0]))
299            .from(offset.unwrap_or(0) as i64)
300            .body(query)
301            .send()
302            .await
303            .map_err(|error| {
304                tracing::error!(?error, "failed to search index file");
305                OpenSearchSearchError::SearchIndex
306            })?;
307
308        let response: serde_json::Value = response.json().await.map_err(|error| {
309            tracing::error!(?error, "failed to get file search response");
310            OpenSearchSearchError::SearchIndex
311        })?;
312
313        tracing::debug!(%response, "search response");
314
315        let response: SearchResponse = serde_json::from_value(response).map_err(|error| {
316            tracing::error!(?error, "failed to parse file search response");
317            OpenSearchSearchError::SearchIndex
318        })?;
319
320        let (total_hits, results) = response
321            .hits
322            .hits
323            .into_iter()
324            .next()
325            .and_then(|item| item.inner_hits)
326            .map(|inner_hits| {
327                let total_hits = inner_hits.pages.hits.total.value;
328                let page_matches: Vec<PageResult> = inner_hits
329                    .pages
330                    .hits
331                    .hits
332                    .into_iter()
333                    .map(|value| PageResult {
334                        page: value._source.page,
335                        matches: value.highlight.content,
336                    })
337                    .collect();
338                (total_hits, page_matches)
339            })
340            .unwrap_or_default();
341
342        Ok(FileSearchResults {
343            total_hits,
344            results,
345        })
346    }
347
348    async fn search_index(
349        &self,
350        scope: &[DocumentBoxScopeRaw],
351        query: SearchRequest,
352        folder_children: Option<Vec<FolderId>>,
353    ) -> Result<SearchResults, SearchError> {
354        let offset = query.offset;
355        let query = create_opensearch_query(query, scope, folder_children);
356
357        tracing::debug!(%query, "searching with query");
358
359        // Search for field in content
360        let response = self
361            .client
362            .search(SearchParts::Index(&[&self.search_index.0]))
363            .from(offset.unwrap_or(0) as i64)
364            .body(query)
365            .send()
366            .await
367            .map_err(|error| {
368                tracing::error!(?error, "failed to search index");
369                OpenSearchSearchError::SearchIndex
370            })?;
371
372        let response: serde_json::Value = response.json().await.map_err(|error| {
373            tracing::error!(?error, "failed to get search response");
374            OpenSearchSearchError::SearchIndex
375        })?;
376
377        tracing::debug!(%response);
378
379        let response: SearchResponse = serde_json::from_value(response).map_err(|error| {
380            tracing::error!(?error, "failed to parse search response");
381            OpenSearchSearchError::SearchIndex
382        })?;
383        let total_hits = response.hits.total.value;
384
385        const NAME_MATCH_KEYS: [&str; 2] = ["name_match_exact", "name_match_wildcard"];
386
387        let results: Vec<FlattenedItemResult> = response
388            .hits
389            .hits
390            .into_iter()
391            .map(|item| {
392                let (total_hits, page_matches) = match item.inner_hits {
393                    Some(inner_hits) => {
394                        let total_hits = inner_hits.pages.hits.total.value;
395                        let page_matches: Vec<PageResult> = inner_hits
396                            .pages
397                            .hits
398                            .hits
399                            .into_iter()
400                            .map(|value| PageResult {
401                                page: value._source.page,
402                                matches: value.highlight.content,
403                            })
404                            .collect();
405                        (total_hits, page_matches)
406                    }
407                    None => (0, vec![]),
408                };
409
410                let name_match = item.matched_queries.is_some_and(|matches| {
411                    matches
412                        .iter()
413                        .any(|value| NAME_MATCH_KEYS.contains(&value.as_str()))
414                });
415                let content_match = !page_matches.is_empty();
416
417                FlattenedItemResult {
418                    item_ty: item._source.item_type,
419                    item_id: item._source.item_id,
420                    document_box: item._source.document_box,
421                    score: SearchScore::Float(item._score),
422                    page_matches,
423                    total_hits,
424                    name_match,
425                    content_match,
426                }
427            })
428            .collect();
429
430        Ok(SearchResults {
431            total_hits,
432            results,
433        })
434    }
435
436    async fn add_data(&self, data: Vec<SearchIndexData>) -> Result<(), SearchError> {
437        let mapped_data: Vec<JsonBody<OsSearchIndexData>> = data
438            .into_iter()
439            .map(|data| {
440                JsonBody::new(OsSearchIndexData {
441                    ty: data.ty,
442                    folder_id: data.folder_id,
443                    document_box: data.document_box,
444                    item_id: data.item_id,
445                    name: data.name,
446                    mime: data.mime,
447                    content: data.content,
448                    created_at: data.created_at.to_rfc3339(),
449                    created_by: data.created_by,
450                    pages: data.pages,
451                })
452            })
453            .collect();
454
455        // Index a file
456        let result = self
457            .client
458            // Use file.id
459            .bulk(opensearch::BulkParts::Index(&self.search_index.0))
460            .body(mapped_data)
461            .send()
462            .await
463            .map_err(|error| {
464                tracing::error!(?error, "failed to bulk add data");
465                OpenSearchSearchError::AddData
466            })?;
467
468        let status_code = result.status_code();
469
470        let response = result.text().await.map_err(|error| {
471            tracing::error!(?error, "failed to get bulk add response");
472            OpenSearchSearchError::AddData
473        })?;
474
475        if status_code.is_client_error() || status_code.is_server_error() {
476            tracing::error!(?response, "bulk add error response");
477            return Err(OpenSearchSearchError::AddData.into());
478        }
479        Ok(())
480    }
481
482    async fn update_data(
483        &self,
484        item_id: Uuid,
485        data: UpdateSearchIndexData,
486    ) -> Result<(), SearchError> {
487        let data = OsUpdateSearchIndexData {
488            folder_id: data.folder_id,
489            name: data.name,
490            content: data.content,
491            pages: data.pages,
492        };
493
494        let items = self.get_by_item_id(item_id).await.map_err(|error| {
495            tracing::error!(?error, "failed to find items to update");
496            OpenSearchSearchError::UpdateData
497        })?;
498
499        // Nothing to update
500        if items.is_empty() {
501            return Ok(());
502        }
503
504        /// Structure for creating bulk update "update" or "doc" entries for serialization
505        #[derive(Serialize)]
506        enum BulkUpdateEntry<'a> {
507            /// Update query
508            #[serde(rename = "update")]
509            Update {
510                /// ID of the document to update
511                _id: String,
512            },
513            /// Data to update the document with
514            #[serde(rename = "doc")]
515            Document {
516                #[serde(flatten)]
517                data: &'a OsUpdateSearchIndexData,
518            },
519        }
520
521        // Create the updates
522        let updates: Vec<JsonBody<BulkUpdateEntry<'_>>> = items
523            .into_iter()
524            .flat_map(|_id| {
525                [
526                    BulkUpdateEntry::Update { _id },
527                    BulkUpdateEntry::Document { data: &data },
528                ]
529            })
530            .map(JsonBody::new)
531            .collect();
532
533        // Perform the bulk updates
534        let result = self
535            .client
536            .bulk(opensearch::BulkParts::Index(&self.search_index.0))
537            .body(updates)
538            .send()
539            .await
540            .map_err(|error| {
541                tracing::error!(?error, "failed to update data (request)");
542                OpenSearchSearchError::UpdateData
543            })?;
544
545        let status_code = result.status_code();
546        let response: serde_json::Value = result.json().await.map_err(|error| {
547            tracing::error!(?error, "failed to update data (response)");
548            OpenSearchSearchError::UpdateData
549        })?;
550
551        tracing::debug!(?response, "search index update response");
552
553        if status_code.is_client_error() || status_code.is_server_error() {
554            tracing::error!(?response, "update data error response");
555            return Err(OpenSearchSearchError::UpdateData.into());
556        }
557
558        Ok(())
559    }
560
561    async fn delete_data(&self, item_id: Uuid) -> Result<(), SearchError> {
562        self.client
563            .delete_by_query(DeleteByQueryParts::Index(&[&self.search_index.0]))
564            .body(json!({
565                "query": {
566                    "term": { "item_id": item_id }
567                }
568            }))
569            .send()
570            .await
571            .map_err(|error| {
572                tracing::error!(?error, "failed to delete data");
573                OpenSearchSearchError::DeleteData
574            })?;
575
576        Ok(())
577    }
578
579    async fn delete_by_scope(&self, scope: DocumentBoxScopeRawRef<'_>) -> Result<(), SearchError> {
580        self.client
581            .delete_by_query(DeleteByQueryParts::Index(&[&self.search_index.0]))
582            .body(json!({
583                "query": {
584                    "term": { "document_box": scope }
585                }
586            }))
587            .send()
588            .await
589            .map_err(|error| {
590                tracing::error!(?error, "failed to delete data by scope");
591                OpenSearchSearchError::DeleteData
592            })?;
593
594        Ok(())
595    }
596
597    async fn get_pending_migrations(
598        &self,
599        _applied_names: Vec<String>,
600    ) -> Result<Vec<String>, SearchError> {
601        Ok(Vec::new())
602    }
603
    /// Applies the migration with the provided name.
    ///
    /// Every argument is ignored and `Ok(())` is always returned.
    async fn apply_migration(
        &self,
        _tenant: &Tenant,
        _root_t: &mut DbTransaction<'_>,
        _t: &mut DbTransaction<'_>,
        _name: &str,
    ) -> Result<(), SearchError> {
        Ok(())
    }
613}
614
615impl OpenSearchIndex {
616    /// Collect all records for the provided `item_id`
617    async fn get_by_item_id(&self, item_id: Uuid) -> Result<Vec<String>, OpenSearchSearchError> {
618        #[derive(Debug, Deserialize, Serialize)]
619        struct Response {
620            hits: Hits,
621        }
622
623        #[derive(Debug, Deserialize, Serialize)]
624        struct Hits {
625            hits: Vec<Hit>,
626        }
627
628        #[derive(Debug, Deserialize, Serialize)]
629        struct Hit {
630            _id: String,
631        }
632
633        // Search for field in content
634        let response = self
635            .client
636            .search(SearchParts::Index(&[&self.search_index.0]))
637            .from(0)
638            .size(10)
639            .body(json!({
640                "query": {
641                   "term": { "item_id": item_id }
642                },
643            }))
644            .send()
645            .await
646            .map_err(|error| {
647                tracing::error!(?error, "failed to get search item by id");
648                OpenSearchSearchError::SearchIndex
649            })?;
650
651        let response: Response = response.json().await.map_err(|error| {
652            tracing::error!(?error, "failed to parse search item by id response");
653            OpenSearchSearchError::SearchIndex
654        })?;
655
656        Ok(response.hits.hits.into_iter().map(|hit| hit._id).collect())
657    }
658}
659
/// Bounds of a `range` filter, used for the `created_at` field when building
/// a search query.
///
/// Bounds that are `None` are omitted from the serialized output.
#[skip_serializing_none]
#[derive(Serialize)]
struct DateRange {
    /// Lower bound ("greater than or equal"), an RFC 3339 timestamp
    gte: Option<String>,
    /// Upper bound ("less than or equal"), an RFC 3339 timestamp
    lte: Option<String>,
}
667
668pub fn create_opensearch_query(
669    req: SearchRequest,
670    scopes: &[DocumentBoxScopeRaw],
671    folder_children: Option<Vec<FolderId>>,
672) -> serde_json::Value {
673    let mut filters = vec![];
674    let mut should = Vec::new();
675
676    // Always filter to the specific document box scope
677    filters.push(json!({
678        "terms": { "document_box": scopes }
679    }));
680
681    let query = req
682        .query
683        // Filter out empty queries
684        .filter(|value| !value.is_empty());
685
686    if let Some(ref query) = query {
687        if req.include_name {
688            // Match name of documents
689            should.push(json!({
690                "term": {
691                    "name": {
692                        "value": query,
693                        "boost": 2,
694                        "_name": "name_match_exact",
695                        "case_insensitive": true
696                    }
697                }
698            }));
699            should.push(json!({
700                "wildcard": {
701                    "name": {
702                        "value": format!("*{query}*"),
703                        "boost": 1.5,
704                        "_name": "name_match_wildcard",
705                        "case_insensitive": true
706                    }
707                }
708            }));
709        }
710
711        if req.include_content {
712            // Match content on the document itself (Link value)
713            should.push(json!({
714                "match": {
715                    "content": {
716                        "query": query,
717                        // Name the match for scoring later
718                        "_name": "content_match"
719                    },
720                }
721            }));
722
723            // Match content pages
724            should.push(json!({
725                "nested": {
726                    "path": "pages",
727                    // Match nested page content
728                    "query": {
729                        "match": {
730                            "pages.content": {
731                                "query": query,
732                                // Name the match for scoring later
733                                "_name": "content_match"
734                            },
735                        }
736                    },
737                    "inner_hits": {
738                        "_source": ["pages.page"],
739                        // Highlight
740                        "highlight": {
741                            "fields": {
742                                "pages.content": {
743                                    "fragment_size": 150,
744                                    "number_of_fragments": 3,
745                                    "type": "unified"
746                                }
747                            }
748                        },
749                        // Order results by score
750                        "sort": [
751                            {
752                              "_score": {
753                                "order": "desc"
754                              }
755                            }
756                        ],
757                        // Pagination
758                        "size": req.max_pages.unwrap_or(3),
759                    }
760                }
761            }));
762        }
763    }
764
765    if let Some(folder_children) = folder_children {
766        filters.push(json!({
767            "terms": { "folder_id": folder_children }
768        }));
769    }
770
771    if let Some(ref mime) = req.mime {
772        filters.push(json!({
773            "term": { "mime": mime }
774        }));
775    }
776
777    if let Some(ref created_at) = req.created_at {
778        let start = created_at.start.map(|value| value.to_rfc3339());
779        let end = created_at.end.map(|value| value.to_rfc3339());
780
781        if start.is_some() || end.is_some() {
782            filters.push(json!({
783                "range": {
784                    "created_at": DateRange {
785                        gte: start,
786                        lte: end
787                    }
788                }
789            }));
790        }
791    }
792
793    if let Some(ref created_by) = req.created_by {
794        filters.push(json!({
795            "term": { "created_by": created_by }
796        }));
797    }
798
799    // When a "should" is provided we must at least match one part of it
800    let minimum_should_match = if !should.is_empty() { 1 } else { 0 };
801
802    json!({
803        // Search query itself
804        "query": {
805            "bool": {
806                "filter": filters,
807                "should": should,
808                "minimum_should_match": minimum_should_match
809            },
810        },
811
812        // Maximum number of results to find
813        "size": req.size.unwrap_or(50),
814        // Offset within results
815        "from": req.offset.unwrap_or(0),
816
817        // Only include relevant source fields
818        "_source": [
819            "item_id",
820            "item_type",
821            "document_box"
822        ],
823
824        // Sort results by match score
825        "sort": [
826            {
827                "_score": {
828                    "order": "desc"
829                }
830            }
831        ]
832    })
833}
834
835pub fn create_opensearch_file_query(
836    req: FileSearchRequest,
837    scope: &DocumentBoxScopeRaw,
838    file_id: FileId,
839) -> serde_json::Value {
840    let query = req.query.unwrap_or_default();
841
842    json!({
843        // Search query itself
844        "query": {
845            "bool": {
846                "filter": [
847                    {
848                        "term": { "document_box": scope }
849                    },
850                    {
851                        "term": { "item_id": file_id }
852                    }
853                ],
854                "should": [
855                    {
856                        "nested": {
857                            "path": "pages",
858                            // Match nested page content
859                            "query": {
860                                "match": {
861                                    "pages.content": {
862                                        "query": query,
863                                        // Name the match for scoring later
864                                        "_name": "content_match"
865                                    },
866                                }
867                            },
868                            "inner_hits": {
869                                "_source": ["pages.page"],
870                                // Highlight
871                                "highlight": {
872                                    "fields": {
873                                        "pages.content": {
874                                            "fragment_size": 150,
875                                            "number_of_fragments": 1,
876                                            "type": "unified"
877                                        }
878                                    }
879                                },
880                                // Order results by score
881                                "sort": [
882                                    {
883                                    "_score": {
884                                        "order": "desc"
885                                    }
886                                    }
887                                ],
888                                // Pagination
889                                "size": req.limit.unwrap_or(3),
890                                "from": req.offset.unwrap_or(0),
891                            }
892                        }
893                    }
894                ],
895                "minimum_should_match": 1
896            },
897        },
898
899        "size": 1,
900        "from": 0,
901
902        // Only include relevant source fields
903        "_source": [
904            "item_id",
905            "item_type",
906            "document_box"
907        ],
908
909        // Sort results by match score
910        "sort": [
911            {
912                "_score": {
913                    "order": "desc"
914                }
915            }
916        ]
917    })
918}