easy_odm/
lib.rs

1use async_trait::async_trait;
2use elasticsearch::{Elasticsearch, Error as EsError, SearchParts};
3use serde::{Deserialize, Serialize, de::DeserializeOwned};
4use serde_json::Value;
5use std::collections::HashMap;
6
7/// Common error type for the ES ORM
8#[derive(thiserror::Error, Debug)]
9pub enum EsOrmError {
10    #[error("HTTP error: {0}")]
11    Http(#[from] EsError),
12    #[error("Serialization error: {0}")]
13    Serde(#[from] serde_json::Error),
14    #[error("Document not found")]
15    NotFound,
16    #[error("Elasticsearch error: {0}")]
17    EsError(Value),
18}
19
20/// Minimal response for index operations
21#[derive(Debug, Deserialize)]
22pub struct IndexResponse {
23    pub result: String,
24    #[serde(flatten)]
25    pub raw: Value,
26}
27
28/// Minimal response for delete operations
29#[derive(Debug, Deserialize)]
30pub struct DeleteResponse {
31    pub result: String,
32    #[serde(flatten)]
33    pub raw: Value,
34}
35
36/// Response for update operations
37#[derive(Debug, Deserialize)]
38pub struct UpdateResponse {
39    pub result: String,
40    #[serde(rename = "_version")]
41    pub version: Option<u64>,
42    #[serde(flatten)]
43    pub raw: Value,
44}
45
46/// Query parameters for filtering documents
47#[derive(Debug, Clone)]
48pub struct QueryParams {
49    /// Simple field-value filters (term queries)
50    pub filters: HashMap<String, Value>,
51    /// Range queries (e.g., {"age": {"gte": 18, "lte": 65}})
52    pub ranges: HashMap<String, HashMap<String, Value>>,
53    /// Match queries for text search
54    pub matches: HashMap<String, String>,
55    /// Sort parameters
56    pub sort: Vec<HashMap<String, Value>>,
57    /// Pagination
58    pub from: Option<usize>,
59    pub size: Option<usize>,
60}
61
62impl Default for QueryParams {
63    fn default() -> Self {
64        Self {
65            filters: HashMap::new(),
66            ranges: HashMap::new(),
67            matches: HashMap::new(),
68            sort: Vec::new(),
69            from: None,
70            size: Some(10), // Default to 10 results
71        }
72    }
73}
74
75impl QueryParams {
76    pub fn new() -> Self {
77        Self::default()
78    }
79
80    /// Add a term filter
81    pub fn filter(mut self, field: &str, value: Value) -> Self {
82        self.filters.insert(field.to_string(), value);
83        self
84    }
85
86    /// Add a range filter
87    pub fn range(mut self, field: &str, range: HashMap<String, Value>) -> Self {
88        self.ranges.insert(field.to_string(), range);
89        self
90    }
91
92    /// Add a match query
93    pub fn match_query(mut self, field: &str, value: &str) -> Self {
94        self.matches.insert(field.to_string(), value.to_string());
95        self
96    }
97
98    /// Add sorting
99    pub fn sort_by(mut self, field: &str, order: &str) -> Self {
100        let mut sort_item = HashMap::new();
101        sort_item.insert(field.to_string(), serde_json::json!({"order": order}));
102        self.sort.push(sort_item);
103        self
104    }
105
106    /// Set pagination
107    pub fn paginate(mut self, from: usize, size: usize) -> Self {
108        self.from = Some(from);
109        self.size = Some(size);
110        self
111    }
112
113    /// Convert to Elasticsearch query DSL
114    pub fn to_es_query(&self) -> Value {
115        let mut query = serde_json::json!({
116            "bool": {
117                "must": []
118            }
119        });
120
121        let must = query["bool"]["must"].as_array_mut().unwrap();
122
123        // Add term filters
124        for (field, value) in &self.filters {
125            must.push(serde_json::json!({
126                "term": {
127                    field: value
128                }
129            }));
130        }
131
132        // Add range queries
133        for (field, range) in &self.ranges {
134            must.push(serde_json::json!({
135                "range": {
136                    field: range
137                }
138            }));
139        }
140
141        // Add match queries
142        for (field, value) in &self.matches {
143            must.push(serde_json::json!({
144                "match": {
145                    field: value
146                }
147            }));
148        }
149
150        // If no conditions, use match_all
151        if must.is_empty() {
152            return serde_json::json!({
153                "match_all": {}
154            });
155        }
156
157        query
158    }
159
160    /// Convert to full search body
161    pub fn to_search_body(&self) -> Value {
162        let mut body = serde_json::json!({
163            "query": self.to_es_query()
164        });
165
166        if !self.sort.is_empty() {
167            body["sort"] = serde_json::json!(self.sort);
168        }
169
170        if let Some(from) = self.from {
171            body["from"] = serde_json::json!(from);
172        }
173
174        if let Some(size) = self.size {
175            body["size"] = serde_json::json!(size);
176        }
177
178        body
179    }
180}
181
182/// Search results wrapper
183#[derive(Debug)]
184pub struct SearchResults<T> {
185    pub documents: Vec<T>,
186    pub total: u64,
187    pub took: u64,
188}
189
190#[async_trait]
191pub trait Document: Serialize + DeserializeOwned + Sized {
192    /// The Elasticsearch index name where documents of this type live
193    fn index_name() -> &'static str;
194    /// Return the unique ID of this document
195    fn id(&self) -> &str;
196
197    /// Create or update this document in Elasticsearch and return the saved document
198    async fn save(&self, client: &Elasticsearch) -> Result<Self, EsOrmError> {
199        let doc_id = self.id().to_string();
200        let doc_body = serde_json::to_value(self)?;
201
202        let response = client
203            .index(elasticsearch::IndexParts::IndexId(
204                Self::index_name(),
205                &doc_id,
206            ))
207            .body(&doc_body)
208            .send()
209            .await?;
210        let status = response.status_code();
211        let body: Value = response.json().await?;
212        if status.is_success() {
213            // Return the document we just saved (we already have it)
214            Ok(serde_json::from_value(doc_body)?)
215        } else {
216            Err(EsOrmError::EsError(body))
217        }
218    }
219
220    /// Fetch a document by ID
221    async fn get(client: &Elasticsearch, id: &str) -> Result<Self, EsOrmError> {
222        let response = client
223            .get(elasticsearch::GetParts::IndexId(Self::index_name(), id))
224            .send()
225            .await?;
226        let status = response.status_code();
227        let body: Value = response.json().await?;
228        if status.is_success() {
229            // ES wraps source under `_source`
230            let source = body.get("_source").ok_or(EsOrmError::NotFound)?;
231            Ok(serde_json::from_value(source.clone())?)
232        } else if status.as_u16() == 404 {
233            Err(EsOrmError::NotFound)
234        } else {
235            Err(EsOrmError::EsError(body))
236        }
237    }
238
239    /// Delete a document by ID
240    async fn delete(client: &Elasticsearch, id: &str) -> Result<DeleteResponse, EsOrmError> {
241        let response = client
242            .delete(elasticsearch::DeleteParts::IndexId(Self::index_name(), id))
243            .send()
244            .await?;
245        let status = response.status_code();
246        let body: Value = response.json().await?;
247        if status.is_success() {
248            Ok(
249                serde_json::from_value(body.clone()).unwrap_or(DeleteResponse {
250                    result: "deleted".into(),
251                    raw: body,
252                }),
253            )
254        } else if status.as_u16() == 404 {
255            Err(EsOrmError::NotFound)
256        } else {
257            Err(EsOrmError::EsError(body))
258        }
259    }
260
261    /// Update a document with partial data and return the updated document
262    async fn update(
263        client: &Elasticsearch,
264        id: &str,
265        partial_doc: &Value,
266    ) -> Result<Self, EsOrmError> {
267        let update_body = serde_json::json!({
268            "doc": partial_doc,
269            "doc_as_upsert": false
270        });
271
272        let response = client
273            .update(elasticsearch::UpdateParts::IndexId(Self::index_name(), id))
274            .body(update_body)
275            .send()
276            .await?;
277
278        let status = response.status_code();
279        let body: Value = response.json().await?;
280
281        if status.is_success() {
282            // After successful update, fetch the updated document
283            Self::get(client, id).await
284        } else if status.as_u16() == 404 {
285            Err(EsOrmError::NotFound)
286        } else {
287            Err(EsOrmError::EsError(body))
288        }
289    }
290
291    /// Refresh this document instance with the latest data from Elasticsearch
292    async fn refresh(&mut self, client: &Elasticsearch) -> Result<(), EsOrmError> {
293        let updated_doc = Self::get(client, self.id()).await?;
294        *self = updated_doc;
295        Ok(())
296    }
297
298    /// Find documents using query parameters
299    async fn find(
300        client: &Elasticsearch,
301        params: QueryParams,
302    ) -> Result<SearchResults<Self>, EsOrmError> {
303        let search_body = params.to_search_body();
304
305        let response = client
306            .search(SearchParts::Index(&[Self::index_name()]))
307            .body(search_body)
308            .send()
309            .await?;
310
311        let status = response.status_code();
312        let body: Value = response.json().await?;
313
314        if !status.is_success() {
315            return Err(EsOrmError::EsError(body));
316        }
317
318        let hits = body["hits"]["hits"]
319            .as_array()
320            .ok_or(EsOrmError::NotFound)?;
321        let mut documents = Vec::new();
322
323        for hit in hits {
324            let source = hit["_source"].clone();
325            let doc: Self = serde_json::from_value(source)?;
326            documents.push(doc);
327        }
328
329        let total = body["hits"]["total"]["value"].as_u64().unwrap_or(0);
330        let took = body["took"].as_u64().unwrap_or(0);
331
332        Ok(SearchResults {
333            documents,
334            total,
335            took,
336        })
337    }
338
339    /// Find all documents (with optional limit)
340    async fn find_all(
341        client: &Elasticsearch,
342        limit: Option<usize>,
343    ) -> Result<SearchResults<Self>, EsOrmError> {
344        let mut params = QueryParams::new();
345        if let Some(size) = limit {
346            params.size = Some(size);
347        }
348        Self::find(client, params).await
349    }
350
351    /// Find one document matching the query parameters
352    async fn find_one(
353        client: &Elasticsearch,
354        params: QueryParams,
355    ) -> Result<Option<Self>, EsOrmError> {
356        let mut single_params = params;
357        single_params.size = Some(1);
358
359        let results = Self::find(client, single_params).await?;
360        Ok(results.documents.into_iter().next())
361    }
362
363    /// Execute a custom query and return raw response
364    async fn custom_query(client: &Elasticsearch, query_body: Value) -> Result<Value, EsOrmError> {
365        let response = client
366            .search(SearchParts::Index(&[Self::index_name()]))
367            .body(query_body)
368            .send()
369            .await?;
370
371        let status = response.status_code();
372        let body: Value = response.json().await?;
373
374        if status.is_success() {
375            Ok(body)
376        } else {
377            Err(EsOrmError::EsError(body))
378        }
379    }
380
381    /// Execute an aggregation query
382    async fn aggregate(
383        client: &Elasticsearch,
384        aggs: Value,
385        query: Option<Value>,
386        size: Option<u64>,
387    ) -> Result<Value, EsOrmError> {
388        let mut body = serde_json::json!({
389        "aggs": aggs,
390        "size": size.unwrap_or(0)
391    });
392
393        if let Some(q) = query {
394            body["query"] = q;
395        }
396
397        Self::custom_query(client, body).await
398    }
399
400    /// Count documents matching the query
401    async fn count(client: &Elasticsearch, params: Option<QueryParams>) -> Result<u64, EsOrmError> {
402        let query = params
403            .map(|p| p.to_es_query())
404            .unwrap_or(serde_json::json!({"match_all": {}}));
405
406        let response = client
407            .count(elasticsearch::CountParts::Index(&[Self::index_name()]))
408            .body(serde_json::json!({"query": query}))
409            .send()
410            .await?;
411
412        let status = response.status_code();
413        let body: Value = response.json().await?;
414
415        if status.is_success() {
416            Ok(body["count"].as_u64().unwrap_or(0))
417        } else {
418            Err(EsOrmError::EsError(body))
419        }
420    }
421}
422
423#[cfg(test)]
424#[path = "tests/mod.rs"]
425mod tests;