1pub mod error;
2
3use crate::error::ElasticError;
4use dotenv::dotenv;
5use elastic_parser::{Doc, Hit, SearchResponse, Shards};
6use elastic_query_builder::QueryBuilder;
7use elasticsearch::http::request::JsonBody;
8use elasticsearch::http::response::Response;
9pub use elasticsearch::http::transport::*;
10use elasticsearch::http::transport::{SingleNodeConnectionPool, Transport};
11use elasticsearch::indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsAlias, IndicesExistsAliasParts, IndicesExistsIndexTemplate, IndicesExistsIndexTemplateParts, IndicesExistsParts, IndicesExistsTemplateParts, IndicesGetAliasParts, IndicesPutIndexTemplateParts, IndicesRefreshParts};
12pub use elasticsearch::Elasticsearch;
13use elasticsearch::{
14 BulkParts, DeleteByQueryParts, DeleteParts, Error, GetParts, GetSourceParts, IndexParts,
15 ScrollParts, SearchParts, UpdateByQueryParts, UpdateParts,
16};
17use serde::de::DeserializeOwned;
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Value};
20use std::env;
21
22extern crate serde;
23extern crate serde_json;
25
26pub use elastic_parser;
27pub use elastic_query_builder;
28pub use elasticsearch::http::Url;
29use elasticsearch::ilm::IlmPutLifecycleParts;
30use elasticsearch::params::Refresh;
31
32pub fn el_client() -> Result<Elasticsearch, ElasticError> {
33 dotenv().ok();
34 let host = env::var("ELASTIC_HOST").unwrap_or("http://localhost:9200".to_string());
35 let transport = Transport::single_node(host.as_str());
36 return match transport {
39 Ok(v) => Ok(Elasticsearch::new(v)),
40 Err(e) => Err(ElasticError::Connection(e.to_string())),
41 };
42}
43
44pub fn el_single_node(url: &str) -> Elasticsearch {
45 let pool = SingleNodeConnectionPool::new(Url::parse(url).unwrap());
46 Elasticsearch::new(TransportBuilder::new(pool).build().unwrap())
47}
48
49fn bool_to_refresh(value: bool) -> Refresh {
50 match value {
51 true => Refresh::True,
52 false => Refresh::False,
53 }
54}
55
56async fn parse_response<T: for<'de> serde::Deserialize<'de>>(
57 input: Result<Response, Error>,
58) -> Result<T, ElasticError> {
59 if input.is_err() {
60 return Err(ElasticError::Send(input.unwrap_err().to_string()));
61 }
62 let input = input.unwrap();
63 let status_code = input.status_code().as_u16();
64 if status_code != 200 {
65 let c = input.text().await;
66 return Err(ElasticError::Status(status_code, c.unwrap()));
67 }
68 let a = input.json::<Value>().await;
69 if a.is_err() {
70 return Err(ElasticError::JsonParse(a.err().unwrap().to_string()));
71 }
72 let v = a.unwrap();
73 let b = serde_json::from_value(v.clone());
74 if b.is_err() {
75 return Err(ElasticError::JsonParse(v.to_string()));
76 }
77 return Ok(b.unwrap());
78}
79
80pub struct ElasticApi {
81 client: Elasticsearch,
82}
83
84impl ElasticApi {
85 pub fn new(client: Elasticsearch) -> ElasticApi {
86 ElasticApi { client }
87 }
88 pub fn get(&self) -> GetApi {
89 GetApi::new(&self)
90 }
91 pub fn update(&self) -> UpdateApi {
92 UpdateApi::new(&self)
93 }
94 pub fn indices(&self) -> IndicesApi {
95 IndicesApi::new(&self)
96 }
97 pub fn search(&self) -> SearchApi {
98 SearchApi::new(&self)
99 }
100 pub fn bulk(&self) -> BulkApi {
101 BulkApi::new(&self)
102 }
103 pub fn index(&self) -> IndexApi {
104 IndexApi::new(&self)
105 }
106 pub fn delete_by_query(&self) -> DeleteByQueryApi {
107 DeleteByQueryApi::new(&self)
108 }
109 pub fn update_by_query(&self) -> UpdateByQuery {
110 UpdateByQuery::new(&self)
111 }
112 pub fn ilm(&self) -> IlmApi {
113 IlmApi::new(&self)
114 }
115}
116
117pub struct SearchApi<'a> {
118 api: &'a ElasticApi,
119}
120
121impl SearchApi<'_> {
122 pub fn new(api: &ElasticApi) -> SearchApi {
123 SearchApi { api }
124 }
125}
126
127pub struct GetApi<'a> {
128 api: &'a ElasticApi,
129}
130
131impl GetApi<'_> {
132 pub fn new(api: &ElasticApi) -> GetApi {
133 GetApi { api }
134 }
135}
136
137pub struct DeleteApi<'a> {
138 api: &'a ElasticApi,
139}
140
141impl DeleteApi<'_> {
142 pub fn new(api: &ElasticApi) -> DeleteApi {
143 DeleteApi { api }
144 }
145}
146
147pub struct DeleteDocResponse {}
148
149impl DeleteApi<'_> {
150 pub async fn doc(&self, index: &str, id: &str) -> Result<Response, ElasticError> {
151 let res = self
152 .api
153 .client
154 .delete(DeleteParts::IndexId(index, id))
155 .send()
156 .await;
157 if res.is_err() {
158 return Err(ElasticError::Response(res.err().unwrap().to_string()));
159 }
160 return Ok(res.unwrap());
161 }
162}
163
164impl SearchApi<'_> {
165 pub async fn search<T>(
166 &self,
167 index: &[&str],
168 query_builder: &QueryBuilder,
169 ) -> Result<Option<SearchResponse<T>>, ElasticError>
170 where
171 T: DeserializeOwned + 'static + Clone,
172 {
173 if !query_builder.get_scroll().is_empty() {
174 return match self
175 .api
176 .client
177 .search(SearchParts::Index(index))
178 .body(query_builder.build())
179 .from(query_builder.get_from())
180 .size(query_builder.get_size())
181 .scroll(query_builder.get_scroll())
182 .send()
183 .await
184 {
185 Ok(response) => {
186 return match response.json::<SearchResponse<T>>().await {
187 Ok(v) => Ok(Some(v)),
188 Err(v) => {
189 return Err(ElasticError::JsonParse(v.to_string()));
190 }
191 };
192 }
193 Err(_) => Ok(None),
194 };
195 }
196
197 let res = self
198 .api
199 .client
200 .search(SearchParts::Index(&index))
201 .body(query_builder.build())
202 .from(query_builder.get_from())
203 .size(query_builder.get_size())
204 .send()
205 .await;
206 parse_response(res).await
207 }
208 pub async fn scroll<T>(
209 &self,
210 scroll_id: &str,
211 alive: &str,
212 ) -> Result<Option<SearchResponse<T>>, ElasticError>
213 where
214 T: DeserializeOwned + 'static + Clone,
215 {
216 match self
217 .api
218 .client
219 .scroll(ScrollParts::ScrollId(scroll_id))
220 .scroll(alive)
221 .send()
222 .await
223 {
224 Ok(response) => {
225 return match response.json::<SearchResponse<T>>().await {
226 Ok(v) => Ok(Some(v)),
227 Err(v) => {
228 return Err(ElasticError::Response(v.to_string()));
229 }
230 };
231 }
232 Err(_) => Ok(None),
233 }
234 }
235
236 pub async fn first_search<T>(
237 &self,
238 index: &str,
239 query_builder: QueryBuilder,
240 ) -> Result<Option<Hit<T>>, ElasticError>
241 where
242 T: DeserializeOwned + 'static + Clone,
243 {
244 return match self
245 .api
246 .client
247 .search(SearchParts::Index(&[index]))
248 .body(query_builder.build())
249 .size(1)
250 .send()
251 .await
252 {
253 Ok(response) => {
254 return match response.json::<SearchResponse<T>>().await {
255 Ok(v) => {
256 let hits = v.hits;
257 if hits.is_none() {
258 return Ok(None);
259 }
260 let hits = hits.unwrap();
261 let value = hits.hits;
262 if value.is_none() {
263 return Ok(None);
264 }
265 let value = value.unwrap().pop();
266 if value.is_none() {
267 return Ok(None);
268 }
269 Ok(Some(value.unwrap()))
270 }
271 Err(v) => {
272 return Err(ElasticError::Response(v.to_string()));
273 }
274 };
275 }
276 Err(_) => Ok(None),
277 };
278 }
279}
280
281pub struct IndicesApi<'a> {
282 api: &'a ElasticApi,
283}
284
285impl IndicesApi<'_> {
286 pub fn new(api: &ElasticApi) -> IndicesApi {
287 IndicesApi { api }
288 }
289}
290
291impl IndicesApi<'_> {
292 pub async fn get_alias(&self, index: &[&str]) -> Result<Value, ElasticError> {
293 let res = self
294 .api
295 .client
296 .indices()
297 .get_alias(IndicesGetAliasParts::Index(index))
298 .send()
299 .await;
300
301 match res {
302 Ok(v) => {
303 if v.status_code() != 200 {
304 return Err(ElasticError::NotFound(index.join(",")));
305 }
306 Ok(v.json().await.unwrap())
307 }
308 Err(err) => Err(ElasticError::Connection(err.to_string())),
309 }
310 }
311 pub async fn exist_alias(&self, index: &[&str]) -> Result<(), ElasticError> {
312 let res = self
313 .api
314 .client
315 .indices()
316 .exists_alias(IndicesExistsAliasParts::Name(index))
317 .send()
318 .await;
319
320 match res {
321 Ok(v) => {
322 if v.status_code() != 200 {
323 return Err(ElasticError::NotFound(index.join(",")));
324 }
325 Ok(())
326 }
327 Err(err) => Err(ElasticError::Connection(err.to_string())),
328 }
329 }
330 pub async fn update_alias(&self, value: Value) -> Result<Value, ElasticError> {
331 let res = self
332 .api
333 .client
334 .indices()
335 .update_aliases()
336 .body(value)
337 .send()
338 .await;
339
340 match res {
341 Ok(v) => {
342 if v.status_code() != 200 {
343 return Err(ElasticError::Status(
344 v.status_code().as_u16(),
345 v.text().await.unwrap_or_default(),
346 ));
347 }
348 Ok(v.json().await.unwrap())
349 }
350 Err(err) => Err(ElasticError::Connection(err.to_string())),
351 }
352 }
353 pub async fn exists(&self, index: &str) -> Result<(), ElasticError> {
354 let res = self
355 .api
356 .client
357 .indices()
358 .exists(IndicesExistsParts::Index(&[index]))
359 .send()
360 .await;
361 if res.is_err() {
364 return Err(ElasticError::Connection(res.unwrap_err().to_string()));
365 }
366 if res.unwrap().status_code() != 200 {
367 return Err(ElasticError::NotFound(index.to_string()));
368 }
369 Ok(())
370 }
371 pub async fn refresh(&self, index: &str) -> Result<IndicesRefreshResponse, ElasticError> {
372 let res = self
373 .api
374 .client
375 .indices()
376 .refresh(IndicesRefreshParts::Index(&[index]))
377 .send()
378 .await;
379 if res.is_err() {
380 return Err(ElasticError::Connection(res.err().unwrap().to_string()));
381 }
382
383 parse_response(res).await
384 }
385 pub async fn create<T>(&self, index: &str, json: T) -> Result<bool, ElasticError>
386 where
387 T: Serialize,
388 {
389 return match self
390 .api
391 .client
392 .indices()
393 .create(IndicesCreateParts::Index(index))
394 .body(json)
395 .send()
396 .await
397 {
398 Ok(v) => {
399 let s = v.status_code();
400 Ok(s == 200)
401 }
402 Err(e) => Err(ElasticError::Send(e.to_string())),
403 };
404 }
405 pub async fn put_index_template<T>(&self, index: &str, json: T) -> Result<bool, ElasticError>
406 where
407 T: Serialize,
408 {
409 return match self
410 .api
411 .client
412 .indices()
413 .put_index_template(IndicesPutIndexTemplateParts::Name(index))
414 .body(json)
415 .send()
416 .await
417 {
418 Ok(v) => {
419 let s = v.status_code();
420 Ok(s == 200)
421 }
422 Err(e) => Err(ElasticError::Send(e.to_string())),
423 };
424 }
425 pub async fn exists_index_template<T>(&self, index: &str) -> Result<bool, ElasticError>
426 {
427 return match self
428 .api
429 .client
430 .indices()
431 .exists_index_template(IndicesExistsIndexTemplateParts::Name(index))
432 .send()
433 .await
434 {
435 Ok(v) => {
436 let s = v.status_code();
437 Ok(s == 200)
438 }
439 Err(e) => Err(ElasticError::Send(e.to_string())),
440 };
441 }
442
443 pub async fn delete(&self, index: &str) -> Result<bool, ElasticError> {
444 let res = self
445 .api
446 .client
447 .indices()
448 .delete(IndicesDeleteParts::Index(&[index]))
449 .send()
450 .await;
451 if res.is_err() {
452 return Err(ElasticError::Response(res.err().unwrap().to_string()));
453 }
454 let res = res.unwrap();
455 Ok(res.status_code() == 200)
456 }
457
458 pub async fn recreate<T>(&self, index: &str, json: T) -> Result<bool, ElasticError>
459 where
460 T: Serialize,
461 {
462 if self.api.indices().exists(index).await.is_ok() {
463 let _ = self.api.indices().delete(index).await?;
464 }
465 Ok(self.api.indices().create(index, json).await?)
466 }
467}
468
469#[derive(Debug, Serialize, Deserialize, Clone)]
470pub struct IndicesRefreshResponse {
471 pub _shards: Option<Shards>,
472}
473
474impl GetApi<'_> {
475 pub async fn source<T: for<'de> serde::Deserialize<'de>>(
476 &self,
477 index: &str,
478 id: &str,
479 ) -> Result<T, ElasticError> {
480 let res = self
481 .api
482 .client
483 .get_source(GetSourceParts::IndexId(index, id))
484 .send()
485 .await;
486 parse_response(res).await
487 }
488 pub async fn doc<T: for<'de> serde::Deserialize<'de>>(
489 &self,
490 index: &str,
491 id: &str,
492 ) -> Result<Doc<T>, ElasticError> {
493 let res = self
494 .api
495 .client
496 .get(GetParts::IndexId(index, id))
497 .send()
498 .await;
499
500 parse_response::<Doc<T>>(res).await
501 }
502}
503
504pub struct UpdateApi<'a> {
505 api: &'a ElasticApi,
506}
507
508impl UpdateApi<'_> {
509 pub fn new(api: &ElasticApi) -> UpdateApi {
510 UpdateApi { api }
511 }
512}
513
514impl UpdateApi<'_> {
515 pub async fn doc<T: serde::Serialize>(
516 &self,
517 index: &str,
518 id: &str,
519 source: T,
520 refresh: bool,
521 ) -> Result<(), ElasticError> {
522 let res = self
523 .api
524 .client
525 .update(UpdateParts::IndexId(index, id))
526 .refresh(bool_to_refresh(refresh))
527 .body(json!({ "doc": source }))
528 .send()
529 .await;
530 if res.is_err() {
531 return Err(ElasticError::Response(res.unwrap_err().to_string()));
532 }
533 let code = res.as_ref().unwrap().status_code();
534 if code == 404 {
535 return Err(ElasticError::NotFound(format!("not found entity: {}", id)));
536 }
537 let res = res.unwrap();
538 if res.status_code() != 200 {
539 return Err(ElasticError::Response(res.text().await.unwrap_or_default()));
540 }
541 Ok(())
542 }
543}
544
545pub struct BulkApi<'a> {
546 api: &'a ElasticApi,
547}
548
549impl BulkApi<'_> {
550 pub fn new(api: &ElasticApi) -> BulkApi {
551 BulkApi { api }
552 }
553}
554
555impl BulkApi<'_> {
556 pub async fn bulk<T: serde::Serialize>(
557 &self,
558 sources: Vec<T>,
559 refresh: bool,
560 ) -> Result<Value, ElasticError> {
561 let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
562 for source in sources {
563 body.push(json!(source).into())
564 }
565
566 parse_response(
567 self.api
568 .client
569 .bulk(BulkParts::None)
570 .body(body)
571 .refresh(bool_to_refresh(refresh))
572 .send()
573 .await,
574 )
575 .await
576 }
583 pub async fn insert_index<T: serde::Serialize>(
584 &self,
585 index: &str,
586 sources: Vec<T>,
587 ) -> Result<Response, ElasticError> {
588 let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
589 for source in sources {
590 body.push(json!({"index": {}}).into());
591 body.push(json!(source).into())
592 }
593 let res = self
594 .api
595 .client
596 .bulk(BulkParts::Index(index))
597 .body(body)
598 .send()
599 .await;
600 if res.is_err() {
601 return Err(ElasticError::Response(res.err().unwrap().to_string()));
602 }
603 return Ok(res.unwrap());
604 }
605 pub async fn insert_index_by_id<T: serde::Serialize>(
606 &self,
607 index: &str,
608 id: &str,
609 source: T,
610 refresh: bool,
611 ) -> Result<Response, ElasticError> {
612 let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
613 body.push(json!({"index": {"_id":id}}).into());
614 body.push(json!(source).into());
615
616 let res = self
617 .api
618 .client
619 .bulk(BulkParts::Index(index))
620 .body(body)
621 .refresh(bool_to_refresh(refresh))
622 .send()
623 .await;
624 if res.is_err() {
625 return Err(ElasticError::Response(res.err().unwrap().to_string()));
626 }
627 return Ok(res.unwrap());
628 }
629}
630
631pub struct IndexApi<'a> {
633 api: &'a ElasticApi,
634}
635
636impl IndexApi<'_> {
637 pub fn new(api: &ElasticApi) -> IndexApi {
638 IndexApi { api }
639 }
640}
641
642impl IndexApi<'_> {
643 pub async fn create<T: serde::Serialize>(
644 &self,
645 index: &str,
646 source: T,
647 refresh: bool,
648 ) -> Result<(), ElasticError> {
649 let res = self
650 .api
651 .client
652 .index(IndexParts::Index(index))
653 .refresh(bool_to_refresh(refresh))
654 .body(source)
655 .send()
656 .await;
657 if res.is_err() {
658 return Err(ElasticError::Response(res.unwrap_err().to_string()));
659 }
660 let code = res.as_ref().unwrap().status_code();
661 if code == 404 {
662 return Err(ElasticError::NotFound(format!("not found entity")));
663 }
664 let res = res.unwrap();
665 if res.status_code() != 200 && res.status_code() != 201 {
667 return Err(ElasticError::Response(res.text().await.unwrap_or_default()));
668 }
669 Ok(())
670 }
671 pub async fn doc<T: serde::Serialize>(
672 &self,
673 index: &str,
674 id: &str,
675 source: T,
676 refresh: bool,
677 ) -> Result<(), ElasticError> {
678 let res = self
679 .api
680 .client
681 .index(IndexParts::IndexId(index, id))
682 .refresh(bool_to_refresh(refresh))
683 .body(source)
684 .send()
685 .await;
686 if res.is_err() {
687 return Err(ElasticError::Response(res.unwrap_err().to_string()));
688 }
689 let code = res.as_ref().unwrap().status_code();
690 if code == 404 {
691 return Err(ElasticError::NotFound(format!("not found entity: {}", id)));
692 }
693 let res = res.unwrap();
694 if res.status_code() != 200 && res.status_code() != 201 {
696 return Err(ElasticError::Response(res.text().await.unwrap_or_default()));
697 }
698 Ok(())
699 }
700}
701
702pub struct UpdateByQuery<'a> {
704 api: &'a ElasticApi,
705}
706
707impl UpdateByQuery<'_> {
708 pub fn new(api: &ElasticApi) -> UpdateByQuery {
709 UpdateByQuery { api }
710 }
711}
712
713impl UpdateByQuery<'_> {
714 pub async fn index(
715 &self,
716 index: &str,
717 query_builder: &QueryBuilder,
718 refresh: bool,
719 ) -> Result<(), ElasticError> {
720 let res = self
721 .api
722 .client
723 .update_by_query(UpdateByQueryParts::Index(&[index]))
724 .refresh(refresh)
725 .body(query_builder.build())
726 .send()
727 .await;
728 if res.is_err() {
729 return Err(ElasticError::Response(res.unwrap_err().to_string()));
730 }
731 let code = res.as_ref().unwrap().status_code();
732 if code == 404 {
733 return Err(ElasticError::NotFound(format!("not found entity")));
734 }
735 let res = res.unwrap();
736 if res.status_code() != 200 {
737 return Err(ElasticError::Response(res.text().await.unwrap_or_default()));
738 }
739 Ok(())
740 }
741}
742
743pub struct DeleteByQueryApi<'a> {
745 api: &'a ElasticApi,
746}
747
748impl DeleteByQueryApi<'_> {
749 pub fn new(api: &ElasticApi) -> DeleteByQueryApi {
750 DeleteByQueryApi { api }
751 }
752}
753
754impl DeleteByQueryApi<'_> {
755 pub async fn index(
756 &self,
757 index: &str,
758 query_builder: &QueryBuilder,
759 refresh: bool,
760 ) -> Result<(), ElasticError> {
761 let res = self
762 .api
763 .client
764 .delete_by_query(DeleteByQueryParts::Index(&[index]))
765 .body(query_builder.build())
766 .refresh(refresh)
767 .send()
768 .await;
769 if res.is_err() {
770 return Err(ElasticError::Response(res.unwrap_err().to_string()));
771 }
772 let code = res.as_ref().unwrap().status_code();
773 if code == 404 {
774 return Err(ElasticError::NotFound(format!("not found index")));
775 }
776 let res = res.unwrap();
777 if res.status_code() != 200 && res.status_code() != 201 {
779 return Err(ElasticError::Response(res.text().await.unwrap_or_default()));
780 }
781 Ok(())
782 }
783}
784
785pub struct IlmApi<'a> {
787 api: &'a ElasticApi,
788}
789
790impl IlmApi<'_> {
791 pub fn new(api: &ElasticApi) -> IlmApi {
792 IlmApi { api }
793 }
794}
795
796#[derive(Serialize, Deserialize)]
797struct Acknowledged {
798 acknowledged: bool,
799}
800impl IlmApi<'_> {
801 pub async fn put_lifecycle<T: Serialize>(
802 &self,
803 ilm_name: &str,
804 value: T,
805 ) -> Result<bool, ElasticError>
806 {
807 let res = match self
808 .api
809 .client
810 .ilm()
811 .put_lifecycle(IlmPutLifecycleParts::Policy(ilm_name))
812 .body(value)
813 .send()
814 .await {
815 Ok(v) => v,
816 Err(e) => { return Err(ElasticError::Response(e.to_string())) }
817 };
818
819 let code = res.status_code();
820 if code == 404 {
821 return Err(ElasticError::NotFound("not found ILM".to_string()));
822 }
823 if res.status_code() != 200 && res.status_code() != 201 {
824 return Err(ElasticError::Response(res.text().await.unwrap_or_default()));
825 }
826 match res.json::<Acknowledged>().await {
827 Ok(v) => { Ok(v.acknowledged) }
828 Err(_) => { Ok(false) }
829 }
830 }
831}