use anyhow::{bail, Result};
use base64::{encode_config, URL_SAFE_NO_PAD};
use elasticsearch::{BulkOperation, BulkParts, Elasticsearch};
use serde_json::Value;
use sha2::{Digest, Sha256};

use crate::ecs::TimelineObject;

/// A JSON document paired with the id under which it will be indexed.
struct ElasticDocument {
    /// document id, derived from a SHA-256 hash of the serialized content
    id: String,
    /// the JSON body that is written to the index
    content: Value,
}

impl From<ElasticDocument> for (String, Value) {
    fn from(me: ElasticDocument) -> Self {
        (me.id, me.content)
    }
}

/// Derive the document id from the document itself: the id is the URL-safe,
/// unpadded base64 encoding of the SHA-256 hash of the serialized JSON, so
/// identical documents always receive identical ids.
impl From<Value> for ElasticDocument {
    fn from(val: Value) -> Self {
        let mut hasher = Sha256::new();
        hasher.update(val.to_string());
        let result = hasher.finalize();
        Self {
            id: encode_config(result, URL_SAFE_NO_PAD),
            content: val,
        }
    }
}

/// Buffers JSON documents and writes them to a single Elasticsearch index
/// using the bulk API.
pub struct Index {
    name: String,
    client: Elasticsearch,

    /// number of documents to buffer before an automatic flush
    cache_size: usize,
    /// buffered documents; always `Some` after construction
    document_cache: Option<Vec<ElasticDocument>>,
}

impl Index {
    /// Create a new index writer with a default cache size of 10,000
    /// documents.
    pub fn new(name: String, client: Elasticsearch) -> Self {
        Self {
            name,
            client,
            cache_size: 10_000,
            document_cache: Some(Vec::new()),
        }
    }

    /// Convert every entry of a timeline object into a JSON document and
    /// add it to the bulk cache. Entries that fail to convert are logged
    /// and skipped.
    #[allow(dead_code)]
    pub async fn add_timeline_object<Obj>(&mut self, object: Obj) -> Result<()>
    where
        Obj: TimelineObject,
    {
        for builder_res in object {
            match builder_res {
                Err(why) => {
                    log::error!("error while creating JSON value: {why}")
                }
                Ok(builder) => {
                    let (_, value) = builder.into();
                    self.add_bulk_document(value).await?;
                }
            }
        }
        Ok(())
    }

    /// Add a single JSON document to the cache, flushing the cache once it
    /// holds at least `cache_size` entries.
    pub(crate) async fn add_bulk_document(&mut self, document: Value) -> Result<()> {
        if let Some(cache) = self.document_cache.as_mut() {
            cache.push(document.into());
            if cache.len() >= self.cache_size {
                return self.flush().await;
            }
        }
        Ok(())
    }

    /// Send all cached documents to Elasticsearch in a single bulk request.
    ///
    /// Documents are submitted as `create` operations keyed by their content
    /// hash, so re-sending an identical document is rejected by Elasticsearch
    /// instead of producing a duplicate.
    pub async fn flush(&mut self) -> Result<()> {
        match self.document_cache.replace(Vec::new()) {
            None => log::trace!("there is no document cache"),

            Some(documents) if documents.is_empty() => {
                log::trace!("document cache is empty")
            }

            Some(documents) => {
                log::info!("flushing document cache with {} entries", documents.len());

                let item_count = documents.len();
                let items: Vec<BulkOperation<Value>> = documents
                    .into_iter()
                    .map(|doc| {
                        let (id, value) = doc.into();
                        BulkOperation::create(id, value).into()
                    })
                    .collect();

                let response = self
                    .client
                    .bulk(BulkParts::Index(&self.name))
                    .body(items)
                    .send()
                    .await?;

                if !response.status_code().is_success() {
                    log::error!(
                        "error {} while sending bulk operation",
                        response.status_code()
                    );
                    log::error!("{}", response.text().await?);
                    bail!("error while sending bulk operation");
                }

                // a successful status code may still hide per-item failures,
                // which the bulk API reports in the `errors` field of the body
                let json: Value = response.json().await?;
                if json["errors"].as_bool().unwrap_or(false) {
                    log::error!("error while writing to elasticsearch: {json}");
                } else {
                    log::trace!("successfully wrote {item_count} items");
                }
            }
        }
        Ok(())
    }

    /// Change the cache size. When the cache is being shrunk, flush first so
    /// that the buffer never holds more than the new `cache_size` documents.
    pub async fn set_cache_size(&mut self, cache_size: usize) -> Result<()> {
        if self.cache_size > cache_size {
            self.flush().await?;
        }
        self.cache_size = cache_size;
        Ok(())
    }
}
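
// A minimal usage sketch, not part of the original module: it assumes an
// Elasticsearch instance reachable through `Elasticsearch::default()`
// (http://localhost:9200) and uses a hypothetical index name. `flush` is
// awaited explicitly, because `Drop` below cannot run async code.
#[allow(dead_code)]
async fn example_usage() -> Result<()> {
    let mut index = Index::new("timeline".to_string(), Elasticsearch::default());
    index
        .add_bulk_document(serde_json::json!({"message": "hello"}))
        .await?;
    index.flush().await
}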

impl Drop for Index {
    fn drop(&mut self) {
        // `flush` is async: `let _ = self.flush();` would only create a
        // future that is never polled, so nothing would actually be sent.
        // `Drop` cannot await, so warn instead; callers must await `flush()`
        // themselves before the index goes out of scope.
        if self.document_cache.as_ref().map_or(false, |c| !c.is_empty()) {
            log::warn!("index '{}' dropped with unflushed documents", self.name);
        }
    }
}
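
// A small test sketch, not in the original file, for the id derivation in
// `From<Value> for ElasticDocument`: identical JSON values must map to the
// same id, different values to different ids, and ids must stay URL-safe.
// It assumes `serde_json`'s serialization of `Value` is deterministic for
// values constructed the same way, which holds for the `json!` macro.
#[cfg(test)]
mod tests {
    use super::ElasticDocument;
    use serde_json::json;

    #[test]
    fn document_ids_are_deterministic_and_url_safe() {
        let a = ElasticDocument::from(json!({"event": {"code": 4624}}));
        let b = ElasticDocument::from(json!({"event": {"code": 4624}}));
        let c = ElasticDocument::from(json!({"event": {"code": 4625}}));

        assert_eq!(a.id, b.id, "same content must yield the same id");
        assert_ne!(a.id, c.id, "different content must yield different ids");

        // URL_SAFE_NO_PAD uses only [A-Za-z0-9_-] and no '=' padding
        assert!(a
            .id
            .chars()
            .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_'));
    }
}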