uiuifree_elastic/
lib.rs

1pub mod error;
2
3use crate::error::ElasticError;
4use dotenv::dotenv;
5use elastic_parser::{Doc, Hit, SearchResponse, Shards};
6use elastic_query_builder::QueryBuilder;
7use elasticsearch::http::request::JsonBody;
8use elasticsearch::http::response::Response;
9pub use elasticsearch::http::transport::*;
10use elasticsearch::http::transport::{SingleNodeConnectionPool, Transport};
11use elasticsearch::indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsAlias, IndicesExistsAliasParts, IndicesExistsIndexTemplate, IndicesExistsIndexTemplateParts, IndicesExistsParts, IndicesExistsTemplateParts, IndicesGetAliasParts, IndicesPutIndexTemplateParts, IndicesRefreshParts};
12pub use elasticsearch::Elasticsearch;
13use elasticsearch::{
14    BulkParts, DeleteByQueryParts, DeleteParts, Error, GetParts, GetSourceParts, IndexParts,
15    ScrollParts, SearchParts, UpdateByQueryParts, UpdateParts,
16};
17use serde::de::DeserializeOwned;
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Value};
20use std::env;
21
22extern crate serde;
23// #[macro_use]
24extern crate serde_json;
25
26pub use elastic_parser;
27pub use elastic_query_builder;
28pub use elasticsearch::http::Url;
29use elasticsearch::ilm::IlmPutLifecycleParts;
30use elasticsearch::params::Refresh;
31
32pub fn el_client() -> Result<Elasticsearch, ElasticError> {
33    dotenv().ok();
34    let host = env::var("ELASTIC_HOST").unwrap_or("http://localhost:9200".to_string());
35    let transport = Transport::single_node(host.as_str());
36    // let transport = SingleNodeConnectionPool::new(Url::parse(host.as_str()).unwrap());
37
38    return match transport {
39        Ok(v) => Ok(Elasticsearch::new(v)),
40        Err(e) => Err(ElasticError::Connection(e.to_string())),
41    };
42}
43
44pub fn el_single_node(url: &str) -> Elasticsearch {
45    let pool = SingleNodeConnectionPool::new(Url::parse(url).unwrap());
46    Elasticsearch::new(TransportBuilder::new(pool).build().unwrap())
47}
48
49fn bool_to_refresh(value: bool) -> Refresh {
50    match value {
51        true => Refresh::True,
52        false => Refresh::False,
53    }
54}
55
56async fn parse_response<T: for<'de> serde::Deserialize<'de>>(
57    input: Result<Response, Error>,
58) -> Result<T, ElasticError> {
59    if input.is_err() {
60        return Err(ElasticError::Send(input.unwrap_err().to_string()));
61    }
62    let input = input.unwrap();
63    let status_code = input.status_code().as_u16();
64    if status_code != 200 {
65        let c = input.text().await;
66        return Err(ElasticError::Status(status_code, c.unwrap()));
67    }
68    let a = input.json::<Value>().await;
69    if a.is_err() {
70        return Err(ElasticError::JsonParse(a.err().unwrap().to_string()));
71    }
72    let v = a.unwrap();
73    let b = serde_json::from_value(v.clone());
74    if b.is_err() {
75        return Err(ElasticError::JsonParse(v.to_string()));
76    }
77    return Ok(b.unwrap());
78}
79
80pub struct ElasticApi {
81    client: Elasticsearch,
82}
83
84impl ElasticApi {
85    pub fn new(client: Elasticsearch) -> ElasticApi {
86        ElasticApi { client }
87    }
88    pub fn get(&self) -> GetApi {
89        GetApi::new(&self)
90    }
91    pub fn update(&self) -> UpdateApi {
92        UpdateApi::new(&self)
93    }
94    pub fn indices(&self) -> IndicesApi {
95        IndicesApi::new(&self)
96    }
97    pub fn search(&self) -> SearchApi {
98        SearchApi::new(&self)
99    }
100    pub fn bulk(&self) -> BulkApi {
101        BulkApi::new(&self)
102    }
103    pub fn index(&self) -> IndexApi {
104        IndexApi::new(&self)
105    }
106    pub fn delete_by_query(&self) -> DeleteByQueryApi {
107        DeleteByQueryApi::new(&self)
108    }
109    pub fn update_by_query(&self) -> UpdateByQuery {
110        UpdateByQuery::new(&self)
111    }
112    pub fn ilm(&self) -> IlmApi {
113        IlmApi::new(&self)
114    }
115}
116
117pub struct SearchApi<'a> {
118    api: &'a ElasticApi,
119}
120
121impl SearchApi<'_> {
122    pub fn new(api: &ElasticApi) -> SearchApi {
123        SearchApi { api }
124    }
125}
126
127pub struct GetApi<'a> {
128    api: &'a ElasticApi,
129}
130
131impl GetApi<'_> {
132    pub fn new(api: &ElasticApi) -> GetApi {
133        GetApi { api }
134    }
135}
136
137pub struct DeleteApi<'a> {
138    api: &'a ElasticApi,
139}
140
141impl DeleteApi<'_> {
142    pub fn new(api: &ElasticApi) -> DeleteApi {
143        DeleteApi { api }
144    }
145}
146
147pub struct DeleteDocResponse {}
148
149impl DeleteApi<'_> {
150    pub async fn doc(&self, index: &str, id: &str) -> Result<Response, ElasticError> {
151        let res = self
152            .api
153            .client
154            .delete(DeleteParts::IndexId(index, id))
155            .send()
156            .await;
157        if res.is_err() {
158            return Err(ElasticError::Response(res.err().unwrap().to_string()));
159        }
160        return Ok(res.unwrap());
161    }
162}
163
164impl SearchApi<'_> {
165    pub async fn search<T>(
166        &self,
167        index: &[&str],
168        query_builder: &QueryBuilder,
169    ) -> Result<Option<SearchResponse<T>>, ElasticError>
170    where
171        T: DeserializeOwned + 'static + Clone,
172    {
173        if !query_builder.get_scroll().is_empty() {
174            return match self
175                .api
176                .client
177                .search(SearchParts::Index(index))
178                .body(query_builder.build())
179                .from(query_builder.get_from())
180                .size(query_builder.get_size())
181                .scroll(query_builder.get_scroll())
182                .send()
183                .await
184            {
185                Ok(response) => {
186                    return match response.json::<SearchResponse<T>>().await {
187                        Ok(v) => Ok(Some(v)),
188                        Err(v) => {
189                            return Err(ElasticError::JsonParse(v.to_string()));
190                        }
191                    };
192                }
193                Err(_) => Ok(None),
194            };
195        }
196
197        let res = self
198            .api
199            .client
200            .search(SearchParts::Index(&index))
201            .body(query_builder.build())
202            .from(query_builder.get_from())
203            .size(query_builder.get_size())
204            .send()
205            .await;
206        parse_response(res).await
207    }
208    pub async fn scroll<T>(
209        &self,
210        scroll_id: &str,
211        alive: &str,
212    ) -> Result<Option<SearchResponse<T>>, ElasticError>
213    where
214        T: DeserializeOwned + 'static + Clone,
215    {
216        match self
217            .api
218            .client
219            .scroll(ScrollParts::ScrollId(scroll_id))
220            .scroll(alive)
221            .send()
222            .await
223        {
224            Ok(response) => {
225                return match response.json::<SearchResponse<T>>().await {
226                    Ok(v) => Ok(Some(v)),
227                    Err(v) => {
228                        return Err(ElasticError::Response(v.to_string()));
229                    }
230                };
231            }
232            Err(_) => Ok(None),
233        }
234    }
235
236    pub async fn first_search<T>(
237        &self,
238        index: &str,
239        query_builder: QueryBuilder,
240    ) -> Result<Option<Hit<T>>, ElasticError>
241    where
242        T: DeserializeOwned + 'static + Clone,
243    {
244        return match self
245            .api
246            .client
247            .search(SearchParts::Index(&[index]))
248            .body(query_builder.build())
249            .size(1)
250            .send()
251            .await
252        {
253            Ok(response) => {
254                return match response.json::<SearchResponse<T>>().await {
255                    Ok(v) => {
256                        let hits = v.hits;
257                        if hits.is_none() {
258                            return Ok(None);
259                        }
260                        let hits = hits.unwrap();
261                        let value = hits.hits;
262                        if value.is_none() {
263                            return Ok(None);
264                        }
265                        let value = value.unwrap().pop();
266                        if value.is_none() {
267                            return Ok(None);
268                        }
269                        Ok(Some(value.unwrap()))
270                    }
271                    Err(v) => {
272                        return Err(ElasticError::Response(v.to_string()));
273                    }
274                };
275            }
276            Err(_) => Ok(None),
277        };
278    }
279}
280
281pub struct IndicesApi<'a> {
282    api: &'a ElasticApi,
283}
284
285impl IndicesApi<'_> {
286    pub fn new(api: &ElasticApi) -> IndicesApi {
287        IndicesApi { api }
288    }
289}
290
291impl IndicesApi<'_> {
292    pub async fn get_alias(&self, index: &[&str]) -> Result<Value, ElasticError> {
293        let res = self
294            .api
295            .client
296            .indices()
297            .get_alias(IndicesGetAliasParts::Index(index))
298            .send()
299            .await;
300
301        match res {
302            Ok(v) => {
303                if v.status_code() != 200 {
304                    return Err(ElasticError::NotFound(index.join(",")));
305                }
306                Ok(v.json().await.unwrap())
307            }
308            Err(err) => Err(ElasticError::Connection(err.to_string())),
309        }
310    }
311    pub async fn exist_alias(&self, index: &[&str]) -> Result<(), ElasticError> {
312        let res = self
313            .api
314            .client
315            .indices()
316            .exists_alias(IndicesExistsAliasParts::Name(index))
317            .send()
318            .await;
319
320        match res {
321            Ok(v) => {
322                if v.status_code() != 200 {
323                    return Err(ElasticError::NotFound(index.join(",")));
324                }
325                Ok(())
326            }
327            Err(err) => Err(ElasticError::Connection(err.to_string())),
328        }
329    }
330    pub async fn update_alias(&self, value: Value) -> Result<Value, ElasticError> {
331        let res = self
332            .api
333            .client
334            .indices()
335            .update_aliases()
336            .body(value)
337            .send()
338            .await;
339
340        match res {
341            Ok(v) => {
342                if v.status_code() != 200 {
343                    return Err(ElasticError::Status(
344                        v.status_code().as_u16(),
345                        v.text().await.unwrap_or_default(),
346                    ));
347                }
348                Ok(v.json().await.unwrap())
349            }
350            Err(err) => Err(ElasticError::Connection(err.to_string())),
351        }
352    }
353    pub async fn exists(&self, index: &str) -> Result<(), ElasticError> {
354        let res = self
355            .api
356            .client
357            .indices()
358            .exists(IndicesExistsParts::Index(&[index]))
359            .send()
360            .await;
361        // el_client()?.index(IndexParts::IndexId("1","1")).body()
362
363        if res.is_err() {
364            return Err(ElasticError::Connection(res.unwrap_err().to_string()));
365        }
366        if res.unwrap().status_code() != 200 {
367            return Err(ElasticError::NotFound(index.to_string()));
368        }
369        Ok(())
370    }
371    pub async fn refresh(&self, index: &str) -> Result<IndicesRefreshResponse, ElasticError> {
372        let res = self
373            .api
374            .client
375            .indices()
376            .refresh(IndicesRefreshParts::Index(&[index]))
377            .send()
378            .await;
379        if res.is_err() {
380            return Err(ElasticError::Connection(res.err().unwrap().to_string()));
381        }
382
383        parse_response(res).await
384    }
385    pub async fn create<T>(&self, index: &str, json: T) -> Result<bool, ElasticError>
386    where
387        T: Serialize,
388    {
389        return match self
390            .api
391            .client
392            .indices()
393            .create(IndicesCreateParts::Index(index))
394            .body(json)
395            .send()
396            .await
397        {
398            Ok(v) => {
399                let s = v.status_code();
400                Ok(s == 200)
401            }
402            Err(e) => Err(ElasticError::Send(e.to_string())),
403        };
404    }
405    pub async fn put_index_template<T>(&self, index: &str, json: T) -> Result<bool, ElasticError>
406    where
407        T: Serialize,
408    {
409        return match self
410            .api
411            .client
412            .indices()
413            .put_index_template(IndicesPutIndexTemplateParts::Name(index))
414            .body(json)
415            .send()
416            .await
417        {
418            Ok(v) => {
419                let s = v.status_code();
420                Ok(s == 200)
421            }
422            Err(e) => Err(ElasticError::Send(e.to_string())),
423        };
424    }
425    pub async fn exists_index_template<T>(&self, index: &str) -> Result<bool, ElasticError>
426    {
427        return match self
428            .api
429            .client
430            .indices()
431            .exists_index_template(IndicesExistsIndexTemplateParts::Name(index))
432            .send()
433            .await
434        {
435            Ok(v) => {
436                let s = v.status_code();
437                Ok(s == 200)
438            }
439            Err(e) => Err(ElasticError::Send(e.to_string())),
440        };
441    }
442
443    pub async fn delete(&self, index: &str) -> Result<bool, ElasticError> {
444        let res = self
445            .api
446            .client
447            .indices()
448            .delete(IndicesDeleteParts::Index(&[index]))
449            .send()
450            .await;
451        if res.is_err() {
452            return Err(ElasticError::Response(res.err().unwrap().to_string()));
453        }
454        let res = res.unwrap();
455        Ok(res.status_code() == 200)
456    }
457
458    pub async fn recreate<T>(&self, index: &str, json: T) -> Result<bool, ElasticError>
459    where
460        T: Serialize,
461    {
462        if self.api.indices().exists(index).await.is_ok() {
463            let _ = self.api.indices().delete(index).await?;
464        }
465        Ok(self.api.indices().create(index, json).await?)
466    }
467}
468
469#[derive(Debug, Serialize, Deserialize, Clone)]
470pub struct IndicesRefreshResponse {
471    pub _shards: Option<Shards>,
472}
473
474impl GetApi<'_> {
475    pub async fn source<T: for<'de> serde::Deserialize<'de>>(
476        &self,
477        index: &str,
478        id: &str,
479    ) -> Result<T, ElasticError> {
480        let res = self
481            .api
482            .client
483            .get_source(GetSourceParts::IndexId(index, id))
484            .send()
485            .await;
486        parse_response(res).await
487    }
488    pub async fn doc<T: for<'de> serde::Deserialize<'de>>(
489        &self,
490        index: &str,
491        id: &str,
492    ) -> Result<Doc<T>, ElasticError> {
493        let res = self
494            .api
495            .client
496            .get(GetParts::IndexId(index, id))
497            .send()
498            .await;
499
500        parse_response::<Doc<T>>(res).await
501    }
502}
503
504pub struct UpdateApi<'a> {
505    api: &'a ElasticApi,
506}
507
508impl UpdateApi<'_> {
509    pub fn new(api: &ElasticApi) -> UpdateApi {
510        UpdateApi { api }
511    }
512}
513
514impl UpdateApi<'_> {
515    pub async fn doc<T: serde::Serialize>(
516        &self,
517        index: &str,
518        id: &str,
519        source: T,
520        refresh: bool,
521    ) -> Result<(), ElasticError> {
522        let res = self
523            .api
524            .client
525            .update(UpdateParts::IndexId(index, id))
526            .refresh(bool_to_refresh(refresh))
527            .body(json!({ "doc": source }))
528            .send()
529            .await;
530        if res.is_err() {
531            return Err(ElasticError::Response(res.unwrap_err().to_string()));
532        }
533        let code = res.as_ref().unwrap().status_code();
534        if code == 404 {
535            return Err(ElasticError::NotFound(format!("not found entity: {}", id)));
536        }
537        let res = res.unwrap();
538        if res.status_code() != 200 {
539            return Err(ElasticError::Response(res.text().await.unwrap_or_default()));
540        }
541        Ok(())
542    }
543}
544
545pub struct BulkApi<'a> {
546    api: &'a ElasticApi,
547}
548
549impl BulkApi<'_> {
550    pub fn new(api: &ElasticApi) -> BulkApi {
551        BulkApi { api }
552    }
553}
554
555impl BulkApi<'_> {
556    pub async fn bulk<T: serde::Serialize>(
557        &self,
558        sources: Vec<T>,
559        refresh: bool,
560    ) -> Result<Value, ElasticError> {
561        let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
562        for source in sources {
563            body.push(json!(source).into())
564        }
565
566        parse_response(
567            self.api
568                .client
569                .bulk(BulkParts::None)
570                .body(body)
571                .refresh(bool_to_refresh(refresh))
572                .send()
573                .await,
574        )
575            .await
576        // let res = client.bulk(BulkParts::None).body(body).send().await;
577        // if res.is_err() {
578        //     return Err(ElasticError::Response(res.err().unwrap().to_string()));
579        // }
580        //
581        // return Ok(res.unwrap());
582    }
583    pub async fn insert_index<T: serde::Serialize>(
584        &self,
585        index: &str,
586        sources: Vec<T>,
587    ) -> Result<Response, ElasticError> {
588        let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
589        for source in sources {
590            body.push(json!({"index": {}}).into());
591            body.push(json!(source).into())
592        }
593        let res = self
594            .api
595            .client
596            .bulk(BulkParts::Index(index))
597            .body(body)
598            .send()
599            .await;
600        if res.is_err() {
601            return Err(ElasticError::Response(res.err().unwrap().to_string()));
602        }
603        return Ok(res.unwrap());
604    }
605    pub async fn insert_index_by_id<T: serde::Serialize>(
606        &self,
607        index: &str,
608        id: &str,
609        source: T,
610        refresh: bool,
611    ) -> Result<Response, ElasticError> {
612        let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
613        body.push(json!({"index": {"_id":id}}).into());
614        body.push(json!(source).into());
615
616        let res = self
617            .api
618            .client
619            .bulk(BulkParts::Index(index))
620            .body(body)
621            .refresh(bool_to_refresh(refresh))
622            .send()
623            .await;
624        if res.is_err() {
625            return Err(ElasticError::Response(res.err().unwrap().to_string()));
626        }
627        return Ok(res.unwrap());
628    }
629}
630
631/// https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
632pub struct IndexApi<'a> {
633    api: &'a ElasticApi,
634}
635
636impl IndexApi<'_> {
637    pub fn new(api: &ElasticApi) -> IndexApi {
638        IndexApi { api }
639    }
640}
641
642impl IndexApi<'_> {
643    pub async fn create<T: serde::Serialize>(
644        &self,
645        index: &str,
646        source: T,
647        refresh: bool,
648    ) -> Result<(), ElasticError> {
649        let res = self
650            .api
651            .client
652            .index(IndexParts::Index(index))
653            .refresh(bool_to_refresh(refresh))
654            .body(source)
655            .send()
656            .await;
657        if res.is_err() {
658            return Err(ElasticError::Response(res.unwrap_err().to_string()));
659        }
660        let code = res.as_ref().unwrap().status_code();
661        if code == 404 {
662            return Err(ElasticError::NotFound(format!("not found entity")));
663        }
664        let res = res.unwrap();
665        // println!("status: {}",code);
666        if res.status_code() != 200 && res.status_code() != 201 {
667            return Err(ElasticError::Response(res.text().await.unwrap_or_default()));
668        }
669        Ok(())
670    }
671    pub async fn doc<T: serde::Serialize>(
672        &self,
673        index: &str,
674        id: &str,
675        source: T,
676        refresh: bool,
677    ) -> Result<(), ElasticError> {
678        let res = self
679            .api
680            .client
681            .index(IndexParts::IndexId(index, id))
682            .refresh(bool_to_refresh(refresh))
683            .body(source)
684            .send()
685            .await;
686        if res.is_err() {
687            return Err(ElasticError::Response(res.unwrap_err().to_string()));
688        }
689        let code = res.as_ref().unwrap().status_code();
690        if code == 404 {
691            return Err(ElasticError::NotFound(format!("not found entity: {}", id)));
692        }
693        let res = res.unwrap();
694        // println!("status: {}",code);
695        if res.status_code() != 200 && res.status_code() != 201 {
696            return Err(ElasticError::Response(res.text().await.unwrap_or_default()));
697        }
698        Ok(())
699    }
700}
701
702/// https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
703pub struct UpdateByQuery<'a> {
704    api: &'a ElasticApi,
705}
706
707impl UpdateByQuery<'_> {
708    pub fn new(api: &ElasticApi) -> UpdateByQuery {
709        UpdateByQuery { api }
710    }
711}
712
713impl UpdateByQuery<'_> {
714    pub async fn index(
715        &self,
716        index: &str,
717        query_builder: &QueryBuilder,
718        refresh: bool,
719    ) -> Result<(), ElasticError> {
720        let res = self
721            .api
722            .client
723            .update_by_query(UpdateByQueryParts::Index(&[index]))
724            .refresh(refresh)
725            .body(query_builder.build())
726            .send()
727            .await;
728        if res.is_err() {
729            return Err(ElasticError::Response(res.unwrap_err().to_string()));
730        }
731        let code = res.as_ref().unwrap().status_code();
732        if code == 404 {
733            return Err(ElasticError::NotFound(format!("not found entity")));
734        }
735        let res = res.unwrap();
736        if res.status_code() != 200 {
737            return Err(ElasticError::Response(res.text().await.unwrap_or_default()));
738        }
739        Ok(())
740    }
741}
742
743/// https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
744pub struct DeleteByQueryApi<'a> {
745    api: &'a ElasticApi,
746}
747
748impl DeleteByQueryApi<'_> {
749    pub fn new(api: &ElasticApi) -> DeleteByQueryApi {
750        DeleteByQueryApi { api }
751    }
752}
753
754impl DeleteByQueryApi<'_> {
755    pub async fn index(
756        &self,
757        index: &str,
758        query_builder: &QueryBuilder,
759        refresh: bool,
760    ) -> Result<(), ElasticError> {
761        let res = self
762            .api
763            .client
764            .delete_by_query(DeleteByQueryParts::Index(&[index]))
765            .body(query_builder.build())
766            .refresh(refresh)
767            .send()
768            .await;
769        if res.is_err() {
770            return Err(ElasticError::Response(res.unwrap_err().to_string()));
771        }
772        let code = res.as_ref().unwrap().status_code();
773        if code == 404 {
774            return Err(ElasticError::NotFound(format!("not found index")));
775        }
776        let res = res.unwrap();
777        // println!("status: {}",code);
778        if res.status_code() != 200 && res.status_code() != 201 {
779            return Err(ElasticError::Response(res.text().await.unwrap_or_default()));
780        }
781        Ok(())
782    }
783}
784
785/// https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
786pub struct IlmApi<'a> {
787    api: &'a ElasticApi,
788}
789
790impl IlmApi<'_> {
791    pub fn new(api: &ElasticApi) -> IlmApi {
792        IlmApi { api }
793    }
794}
795
796#[derive(Serialize, Deserialize)]
797struct Acknowledged {
798    acknowledged: bool,
799}
800impl IlmApi<'_> {
801    pub async fn put_lifecycle<T: Serialize>(
802        &self,
803        ilm_name: &str,
804        value: T,
805    ) -> Result<bool, ElasticError>
806    {
807        let res = match self
808            .api
809            .client
810            .ilm()
811            .put_lifecycle(IlmPutLifecycleParts::Policy(ilm_name))
812            .body(value)
813            .send()
814            .await {
815            Ok(v) => v,
816            Err(e) => { return Err(ElasticError::Response(e.to_string())) }
817        };
818
819        let code = res.status_code();
820        if code == 404 {
821            return Err(ElasticError::NotFound("not found ILM".to_string()));
822        }
823        if res.status_code() != 200 && res.status_code() != 201 {
824            return Err(ElasticError::Response(res.text().await.unwrap_or_default()));
825        }
826        match res.json::<Acknowledged>().await {
827            Ok(v) => { Ok(v.acknowledged) }
828            Err(_) => { Ok(false) }
829        }
830    }
831}