Skip to main content

opensearch_client/
document.rs

1use opensearch_dsl::Query;
2use serde::{de::DeserializeOwned, Deserialize, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5
6use crate::{
7    common, common::DocumentDeleteResponse, get_opensearch, Error, IndexResponse, UpdateActionBody,
8};
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct Field {
12    pub name: String,
13    pub field_type: String,
14    pub os_type: String,
15    pub aggregatable: bool,
16    pub searchable: bool,
17    pub sub_fields: Vec<Box<Field>>,
18}
19
20#[async_trait::async_trait]
21pub trait Document: Serialize + DeserializeOwned + Sized + std::clone::Clone {
22    /// The Elasticsearch index name where documents of this type live
23    fn index_name() -> String;
24    /// Return the unique ID of this document
25    fn id(&self) -> String;
26
27    fn columns() -> Vec<Field>;
28
29    /// Create or update this document in Elasticsearch
30    async fn save(&self) -> Result<IndexResponse, Error> {
31        let doc_id = self.id().to_string();
32        let doc_body = serde_json::to_value(self)?;
33
34        get_opensearch()
35            .index_document(&Self::index_name(), &doc_body, Some(doc_id))
36            .await
37    }
38
39    /// Fetch a document by ID
40    async fn get(id: &str) -> Result<Self, Error> {
41        let index = Self::index_name();
42        let result = get_opensearch().get_typed::<Self>(&index, id).await?;
43        match result.source {
44            None => {
45                return Err(Error::DocumentNotFoundError(
46                    index.to_owned(),
47                    id.to_owned(),
48                ))
49            }
50            Some(value) => Ok(value),
51        }
52    }
53
54    /// Delete a document by ID
55    async fn delete(id: &str) -> Result<DocumentDeleteResponse, Error> {
56        let index = Self::index_name();
57        get_opensearch().delete().index(index).id(id).call().await
58    }
59
60    /// Update a document with partial data and return the updated document
61    async fn update(id: &str, partial_doc: &Value) -> Result<IndexResponse, Error> {
62        let index = Self::index_name();
63        let update_body = UpdateActionBody {
64            doc: Some(partial_doc.clone()),
65            doc_as_upsert: Some(false),
66            ..Default::default()
67        };
68
69        get_opensearch()
70            .update_document(&index, id, &update_body)
71            .await
72    }
73
74    async fn upsert_doc(&self) -> Result<IndexResponse, Error> {
75        let index = Self::index_name();
76        let body = serde_json::to_value(self)?;
77        let update_body = UpdateActionBody {
78            doc: Some(body.clone()),
79            doc_as_upsert: Some(true),
80            ..Default::default()
81        };
82        let id = self.id().to_string();
83
84        get_opensearch()
85            .update_document(&index, &id, &update_body)
86            .await
87    }
88
89    async fn upsert_value(&self, partial_doc: &Value) -> Result<IndexResponse, Error> {
90        let index = Self::index_name();
91        let body = partial_doc.clone();
92        let update_body = UpdateActionBody {
93            doc: Some(body),
94            doc_as_upsert: Some(true),
95            ..Default::default()
96        };
97        let id = self.id().to_string();
98
99        get_opensearch()
100            .update_document(&index, &id, &update_body)
101            .await
102    }
103
104    /// Refresh this document instance with the latest data from Elasticsearch
105    async fn refresh(&mut self) -> Result<(), Error> {
106        let updated_doc = Self::get(&self.id()).await?;
107        *self = updated_doc;
108        Ok(())
109    }
110
111    /// Find documents using query parameters
112    async fn find(search: common::Search) -> Result<crate::search::TypedSearchResult<Self>, Error> {
113        let index = Self::index_name();
114        get_opensearch().search_typed::<Self>(&index, search).await
115    }
116
117    /// Find all documents (with optional limit)
118    async fn find_all(
119        limit: Option<usize>,
120    ) -> Result<crate::search::TypedSearchResult<Self>, Error> {
121        let mut params = common::Search::default();
122        if let Some(l) = limit {
123            params = params.size(l as u64);
124        }
125        Self::find(params).await
126    }
127
128    /// Find one document matching the query parameters
129    async fn find_one(params: common::Search) -> Result<Option<Self>, Error> {
130        let single_params = params.size(1);
131
132        let results = Self::find(single_params).await?;
133        Ok(results
134            .hits
135            .hits
136            .into_iter()
137            .next()
138            .and_then(|f| f.source.clone()))
139    }
140
141    /// Count documents matching the query
142    async fn count(params: Option<Query>) -> Result<u32, Error> {
143        let index = Self::index_name();
144        let query: Query = params.unwrap_or(Query::MatchAll(Query::match_all()));
145
146        let response = get_opensearch()
147            .count()
148            .index(index)
149            .query(query)
150            .call()
151            .await?;
152        Ok(response.count)
153    }
154}