1use async_trait::async_trait;
2use elasticsearch::{Elasticsearch, Error as EsError, SearchParts};
3use serde::{Deserialize, Serialize, de::DeserializeOwned};
4use serde_json::Value;
5use std::collections::HashMap;
6
7#[derive(thiserror::Error, Debug)]
9pub enum EsOrmError {
10 #[error("HTTP error: {0}")]
11 Http(#[from] EsError),
12 #[error("Serialization error: {0}")]
13 Serde(#[from] serde_json::Error),
14 #[error("Document not found")]
15 NotFound,
16 #[error("Elasticsearch error: {0}")]
17 EsError(Value),
18}
19
20#[derive(Debug, Deserialize)]
22pub struct IndexResponse {
23 pub result: String,
24 #[serde(flatten)]
25 pub raw: Value,
26}
27
28#[derive(Debug, Deserialize)]
30pub struct DeleteResponse {
31 pub result: String,
32 #[serde(flatten)]
33 pub raw: Value,
34}
35
36#[derive(Debug, Deserialize)]
38pub struct UpdateResponse {
39 pub result: String,
40 #[serde(rename = "_version")]
41 pub version: Option<u64>,
42 #[serde(flatten)]
43 pub raw: Value,
44}
45
46#[derive(Debug, Clone)]
48pub struct QueryParams {
49 pub filters: HashMap<String, Value>,
51 pub ranges: HashMap<String, HashMap<String, Value>>,
53 pub matches: HashMap<String, String>,
55 pub sort: Vec<HashMap<String, Value>>,
57 pub from: Option<usize>,
59 pub size: Option<usize>,
60}
61
62impl Default for QueryParams {
63 fn default() -> Self {
64 Self {
65 filters: HashMap::new(),
66 ranges: HashMap::new(),
67 matches: HashMap::new(),
68 sort: Vec::new(),
69 from: None,
70 size: Some(10), }
72 }
73}
74
75impl QueryParams {
76 pub fn new() -> Self {
77 Self::default()
78 }
79
80 pub fn filter(mut self, field: &str, value: Value) -> Self {
82 self.filters.insert(field.to_string(), value);
83 self
84 }
85
86 pub fn range(mut self, field: &str, range: HashMap<String, Value>) -> Self {
88 self.ranges.insert(field.to_string(), range);
89 self
90 }
91
92 pub fn match_query(mut self, field: &str, value: &str) -> Self {
94 self.matches.insert(field.to_string(), value.to_string());
95 self
96 }
97
98 pub fn sort_by(mut self, field: &str, order: &str) -> Self {
100 let mut sort_item = HashMap::new();
101 sort_item.insert(field.to_string(), serde_json::json!({"order": order}));
102 self.sort.push(sort_item);
103 self
104 }
105
106 pub fn paginate(mut self, from: usize, size: usize) -> Self {
108 self.from = Some(from);
109 self.size = Some(size);
110 self
111 }
112
113 pub fn to_es_query(&self) -> Value {
115 let mut query = serde_json::json!({
116 "bool": {
117 "must": []
118 }
119 });
120
121 let must = query["bool"]["must"].as_array_mut().unwrap();
122
123 for (field, value) in &self.filters {
125 must.push(serde_json::json!({
126 "term": {
127 field: value
128 }
129 }));
130 }
131
132 for (field, range) in &self.ranges {
134 must.push(serde_json::json!({
135 "range": {
136 field: range
137 }
138 }));
139 }
140
141 for (field, value) in &self.matches {
143 must.push(serde_json::json!({
144 "match": {
145 field: value
146 }
147 }));
148 }
149
150 if must.is_empty() {
152 return serde_json::json!({
153 "match_all": {}
154 });
155 }
156
157 query
158 }
159
160 pub fn to_search_body(&self) -> Value {
162 let mut body = serde_json::json!({
163 "query": self.to_es_query()
164 });
165
166 if !self.sort.is_empty() {
167 body["sort"] = serde_json::json!(self.sort);
168 }
169
170 if let Some(from) = self.from {
171 body["from"] = serde_json::json!(from);
172 }
173
174 if let Some(size) = self.size {
175 body["size"] = serde_json::json!(size);
176 }
177
178 body
179 }
180}
181
182#[derive(Debug)]
184pub struct SearchResults<T> {
185 pub documents: Vec<T>,
186 pub total: u64,
187 pub took: u64,
188}
189
190#[async_trait]
191pub trait Document: Serialize + DeserializeOwned + Sized {
192 fn index_name() -> &'static str;
194 fn id(&self) -> &str;
196
197 async fn save(&self, client: &Elasticsearch) -> Result<Self, EsOrmError> {
199 let doc_id = self.id().to_string();
200 let doc_body = serde_json::to_value(self)?;
201
202 let response = client
203 .index(elasticsearch::IndexParts::IndexId(
204 Self::index_name(),
205 &doc_id,
206 ))
207 .body(&doc_body)
208 .send()
209 .await?;
210 let status = response.status_code();
211 let body: Value = response.json().await?;
212 if status.is_success() {
213 Ok(serde_json::from_value(doc_body)?)
215 } else {
216 Err(EsOrmError::EsError(body))
217 }
218 }
219
220 async fn get(client: &Elasticsearch, id: &str) -> Result<Self, EsOrmError> {
222 let response = client
223 .get(elasticsearch::GetParts::IndexId(Self::index_name(), id))
224 .send()
225 .await?;
226 let status = response.status_code();
227 let body: Value = response.json().await?;
228 if status.is_success() {
229 let source = body.get("_source").ok_or(EsOrmError::NotFound)?;
231 Ok(serde_json::from_value(source.clone())?)
232 } else if status.as_u16() == 404 {
233 Err(EsOrmError::NotFound)
234 } else {
235 Err(EsOrmError::EsError(body))
236 }
237 }
238
239 async fn delete(client: &Elasticsearch, id: &str) -> Result<DeleteResponse, EsOrmError> {
241 let response = client
242 .delete(elasticsearch::DeleteParts::IndexId(Self::index_name(), id))
243 .send()
244 .await?;
245 let status = response.status_code();
246 let body: Value = response.json().await?;
247 if status.is_success() {
248 Ok(
249 serde_json::from_value(body.clone()).unwrap_or(DeleteResponse {
250 result: "deleted".into(),
251 raw: body,
252 }),
253 )
254 } else if status.as_u16() == 404 {
255 Err(EsOrmError::NotFound)
256 } else {
257 Err(EsOrmError::EsError(body))
258 }
259 }
260
261 async fn update(
263 client: &Elasticsearch,
264 id: &str,
265 partial_doc: &Value,
266 ) -> Result<Self, EsOrmError> {
267 let update_body = serde_json::json!({
268 "doc": partial_doc,
269 "doc_as_upsert": false
270 });
271
272 let response = client
273 .update(elasticsearch::UpdateParts::IndexId(Self::index_name(), id))
274 .body(update_body)
275 .send()
276 .await?;
277
278 let status = response.status_code();
279 let body: Value = response.json().await?;
280
281 if status.is_success() {
282 Self::get(client, id).await
284 } else if status.as_u16() == 404 {
285 Err(EsOrmError::NotFound)
286 } else {
287 Err(EsOrmError::EsError(body))
288 }
289 }
290
291 async fn refresh(&mut self, client: &Elasticsearch) -> Result<(), EsOrmError> {
293 let updated_doc = Self::get(client, self.id()).await?;
294 *self = updated_doc;
295 Ok(())
296 }
297
298 async fn find(
300 client: &Elasticsearch,
301 params: QueryParams,
302 ) -> Result<SearchResults<Self>, EsOrmError> {
303 let search_body = params.to_search_body();
304
305 let response = client
306 .search(SearchParts::Index(&[Self::index_name()]))
307 .body(search_body)
308 .send()
309 .await?;
310
311 let status = response.status_code();
312 let body: Value = response.json().await?;
313
314 if !status.is_success() {
315 return Err(EsOrmError::EsError(body));
316 }
317
318 let hits = body["hits"]["hits"]
319 .as_array()
320 .ok_or(EsOrmError::NotFound)?;
321 let mut documents = Vec::new();
322
323 for hit in hits {
324 let source = hit["_source"].clone();
325 let doc: Self = serde_json::from_value(source)?;
326 documents.push(doc);
327 }
328
329 let total = body["hits"]["total"]["value"].as_u64().unwrap_or(0);
330 let took = body["took"].as_u64().unwrap_or(0);
331
332 Ok(SearchResults {
333 documents,
334 total,
335 took,
336 })
337 }
338
339 async fn find_all(
341 client: &Elasticsearch,
342 limit: Option<usize>,
343 ) -> Result<SearchResults<Self>, EsOrmError> {
344 let mut params = QueryParams::new();
345 if let Some(size) = limit {
346 params.size = Some(size);
347 }
348 Self::find(client, params).await
349 }
350
351 async fn find_one(
353 client: &Elasticsearch,
354 params: QueryParams,
355 ) -> Result<Option<Self>, EsOrmError> {
356 let mut single_params = params;
357 single_params.size = Some(1);
358
359 let results = Self::find(client, single_params).await?;
360 Ok(results.documents.into_iter().next())
361 }
362
363 async fn custom_query(client: &Elasticsearch, query_body: Value) -> Result<Value, EsOrmError> {
365 let response = client
366 .search(SearchParts::Index(&[Self::index_name()]))
367 .body(query_body)
368 .send()
369 .await?;
370
371 let status = response.status_code();
372 let body: Value = response.json().await?;
373
374 if status.is_success() {
375 Ok(body)
376 } else {
377 Err(EsOrmError::EsError(body))
378 }
379 }
380
381 async fn aggregate(
383 client: &Elasticsearch,
384 aggs: Value,
385 query: Option<Value>,
386 size: Option<u64>,
387 ) -> Result<Value, EsOrmError> {
388 let mut body = serde_json::json!({
389 "aggs": aggs,
390 "size": size.unwrap_or(0)
391 });
392
393 if let Some(q) = query {
394 body["query"] = q;
395 }
396
397 Self::custom_query(client, body).await
398 }
399
400 async fn count(client: &Elasticsearch, params: Option<QueryParams>) -> Result<u64, EsOrmError> {
402 let query = params
403 .map(|p| p.to_es_query())
404 .unwrap_or(serde_json::json!({"match_all": {}}));
405
406 let response = client
407 .count(elasticsearch::CountParts::Index(&[Self::index_name()]))
408 .body(serde_json::json!({"query": query}))
409 .send()
410 .await?;
411
412 let status = response.status_code();
413 let body: Value = response.json().await?;
414
415 if status.is_success() {
416 Ok(body["count"].as_u64().unwrap_or(0))
417 } else {
418 Err(EsOrmError::EsError(body))
419 }
420 }
421}
422
423#[cfg(test)]
424#[path = "tests/mod.rs"]
425mod tests;