opensearch_client/
document.rs1use opensearch_dsl::Query;
2use serde::{de::DeserializeOwned, Deserialize, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5
6use crate::{
7 common, common::DocumentDeleteResponse, get_opensearch, Error, IndexResponse, UpdateActionBody,
8};
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct Field {
12 pub name: String,
13 pub field_type: String,
14 pub os_type: String,
15 pub aggregatable: bool,
16 pub searchable: bool,
17 pub sub_fields: Vec<Box<Field>>,
18}
19
20#[async_trait::async_trait]
21pub trait Document: Serialize + DeserializeOwned + Sized + std::clone::Clone {
22 fn index_name() -> String;
24 fn id(&self) -> String;
26
27 fn columns() -> Vec<Field>;
28
29 async fn save(&self) -> Result<IndexResponse, Error> {
31 let doc_id = self.id().to_string();
32 let doc_body = serde_json::to_value(self)?;
33
34 get_opensearch()
35 .index_document(&Self::index_name(), &doc_body, Some(doc_id))
36 .await
37 }
38
39 async fn get(id: &str) -> Result<Self, Error> {
41 let index = Self::index_name();
42 let result = get_opensearch().get_typed::<Self>(&index, id).await?;
43 match result.source {
44 None => {
45 return Err(Error::DocumentNotFoundError(
46 index.to_owned(),
47 id.to_owned(),
48 ))
49 }
50 Some(value) => Ok(value),
51 }
52 }
53
54 async fn delete(id: &str) -> Result<DocumentDeleteResponse, Error> {
56 let index = Self::index_name();
57 get_opensearch().delete().index(index).id(id).call().await
58 }
59
60 async fn update(id: &str, partial_doc: &Value) -> Result<IndexResponse, Error> {
62 let index = Self::index_name();
63 let update_body = UpdateActionBody {
64 doc: Some(partial_doc.clone()),
65 doc_as_upsert: Some(false),
66 ..Default::default()
67 };
68
69 get_opensearch()
70 .update_document(&index, id, &update_body)
71 .await
72 }
73
74 async fn upsert_doc(&self) -> Result<IndexResponse, Error> {
75 let index = Self::index_name();
76 let body = serde_json::to_value(self)?;
77 let update_body = UpdateActionBody {
78 doc: Some(body.clone()),
79 doc_as_upsert: Some(true),
80 ..Default::default()
81 };
82 let id = self.id().to_string();
83
84 get_opensearch()
85 .update_document(&index, &id, &update_body)
86 .await
87 }
88
89 async fn upsert_value(&self, partial_doc: &Value) -> Result<IndexResponse, Error> {
90 let index = Self::index_name();
91 let body = partial_doc.clone();
92 let update_body = UpdateActionBody {
93 doc: Some(body),
94 doc_as_upsert: Some(true),
95 ..Default::default()
96 };
97 let id = self.id().to_string();
98
99 get_opensearch()
100 .update_document(&index, &id, &update_body)
101 .await
102 }
103
104 async fn refresh(&mut self) -> Result<(), Error> {
106 let updated_doc = Self::get(&self.id()).await?;
107 *self = updated_doc;
108 Ok(())
109 }
110
111 async fn find(search: common::Search) -> Result<crate::search::TypedSearchResult<Self>, Error> {
113 let index = Self::index_name();
114 get_opensearch().search_typed::<Self>(&index, search).await
115 }
116
117 async fn find_all(
119 limit: Option<usize>,
120 ) -> Result<crate::search::TypedSearchResult<Self>, Error> {
121 let mut params = common::Search::default();
122 if let Some(l) = limit {
123 params = params.size(l as u64);
124 }
125 Self::find(params).await
126 }
127
128 async fn find_one(params: common::Search) -> Result<Option<Self>, Error> {
130 let single_params = params.size(1);
131
132 let results = Self::find(single_params).await?;
133 Ok(results
134 .hits
135 .hits
136 .into_iter()
137 .next()
138 .and_then(|f| f.source.clone()))
139 }
140
141 async fn count(params: Option<Query>) -> Result<u32, Error> {
143 let index = Self::index_name();
144 let query: Query = params.unwrap_or(Query::MatchAll(Query::match_all()));
145
146 let response = get_opensearch()
147 .count()
148 .index(index)
149 .query(query)
150 .call()
151 .await?;
152 Ok(response.count)
153 }
154}