es4forensics/index.rs
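//! Buffered bulk indexing of timeline documents into a single Elasticsearch
//! index: documents are cached locally and written with the `_bulk` API once
//! the cache reaches a configurable size.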

use anyhow::{bail, Result};
use base64::{encode_config, URL_SAFE_NO_PAD};
use elasticsearch::{BulkOperation, BulkParts, Elasticsearch};
use serde_json::Value;
use sha2::{Digest, Sha256};

use crate::ecs::TimelineObject;

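/// A document prepared for indexing, carrying a content-derived identifier
/// alongside its JSON body.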
struct ElasticDocument {
    id: String,
    content: Value,
}

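// Decomposes a cached document into the `(id, content)` pair used when
// building a bulk operation.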
impl From<ElasticDocument> for (String, Value) {
    fn from(me: ElasticDocument) -> Self {
        (me.id, me.content)
    }
}

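// Derives a stable document id from the content itself: the serialized JSON
// is hashed with SHA-256 and the digest is encoded as URL-safe base64 without
// padding, so identical documents always map to the same id.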
impl From<Value> for ElasticDocument {
    fn from(val: Value) -> Self {
        let mut hasher = Sha256::new();
        hasher.update(val.to_string());
        let result = hasher.finalize();
        Self {
            id: encode_config(result, URL_SAFE_NO_PAD),
            content: val,
        }
    }
}

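/// A handle to a single Elasticsearch index that caches documents locally and
/// writes them to the server in bulk once `cache_size` entries have
/// accumulated. Call [`flush`](Index::flush) before dropping the handle,
/// because `Drop` cannot run the asynchronous flush.
///
/// A minimal usage sketch (not compiled as a doctest; it assumes a cluster
/// reachable through the client's default transport on localhost:9200):
///
/// ```ignore
/// let client = Elasticsearch::default();
/// let mut index = Index::new("timeline".to_string(), client);
/// index.add_bulk_document(serde_json::json!({"message": "hello"})).await?;
/// index.flush().await?;
/// ```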
pub struct Index {
    name: String,
    client: Elasticsearch,

    cache_size: usize,
    document_cache: Option<Vec<ElasticDocument>>,
}

impl Index {
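    /// Creates a handle for the named index with a default cache size of
    /// 10,000 documents.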
    pub fn new(name: String, client: Elasticsearch) -> Self {
        Self {
            name,
            client,
            cache_size: 10000,
            document_cache: Some(Vec::new()),
        }
    }

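    /// Converts every record produced by a [`TimelineObject`] into a JSON
    /// document and queues it for bulk indexing. Conversion failures are
    /// logged and skipped; indexing failures abort with an error.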
    #[allow(dead_code)]
    pub async fn add_timeline_object<Obj>(&mut self, object: Obj) -> Result<()>
    where
        Obj: TimelineObject,
    {
        for builder_res in object {
            match builder_res {
                Err(why) => {
                    log::error!("error while creating JSON value: {why}")
                }
                Ok(builder) => {
                    let (_, value) = builder.into();
                    self.add_bulk_document(value).await?;
                }
            }
        }
        Ok(())
    }

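    /// Queues a single JSON document, flushing the cache to Elasticsearch
    /// once it has grown to `cache_size` entries.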
    pub(crate) async fn add_bulk_document(&mut self, document: Value) -> Result<()> {
        // Push into the cache and note its new size; without a cache there is
        // nothing to do (flush() treats the missing cache the same way, and
        // this avoids the panic the former unwrap() would have caused here).
        let cached_documents = match self.document_cache.as_mut() {
            None => return Ok(()),
            Some(cache) => {
                cache.push(document.into());
                cache.len()
            }
        };

        if cached_documents >= self.cache_size {
            self.flush().await
        } else {
            Ok(())
        }
    }

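    /// Sends all cached documents to Elasticsearch in one bulk request and
    /// empties the cache. Fails if the request cannot be sent or the server
    /// answers with a non-success status code; per-item failures reported in
    /// the response body are only logged.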
    pub async fn flush(&mut self) -> Result<()> {
        match self.document_cache.as_ref() {
            None => log::trace!("there is no document cache"),

            Some(document_cache) => {
                log::info!(
                    "flushing document cache with {} entries",
                    document_cache.len()
                );
                if document_cache.is_empty() {
                    log::trace!("document cache is empty");
                } else {
                    let parts = BulkParts::Index(&self.name);
                    let item_count = document_cache.len();

                    // Swap in a fresh cache and turn the old entries into
                    // bulk `create` operations keyed by their content hash.
                    let items: Vec<BulkOperation<Value>> = self
                        .document_cache
                        .replace(Vec::new())
                        .unwrap()
                        .into_iter()
                        .map(|v| {
                            let (id, val) = v.into();
                            BulkOperation::create(id, val).into()
                        })
                        .collect();

                    let response = self.client.bulk(parts).body(items).send().await?;

                    if !response.status_code().is_success() {
                        log::error!(
                            "error {} while sending bulk operation",
                            response.status_code()
                        );
                        log::error!("{}", response.text().await?);
                        bail!("error while sending bulk operation");
                    } else {
                        // A 2xx status does not guarantee that every item
                        // succeeded; the response body carries a separate
                        // `errors` flag that must be checked as well.
                        let json: Value = response.json().await?;
                        if json["errors"].as_bool().unwrap_or(false) {
                            log::error!("error while writing to elasticsearch: {json}");
                        } else {
                            log::trace!("successfully wrote {item_count} items");
                        }
                    }
                }
            }
        }
        Ok(())
    }
127
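    /// Adjusts the cache size. When shrinking, the currently cached documents
    /// are flushed first so the new limit is not exceeded.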
    pub async fn set_cache_size(&mut self, cache_size: usize) -> Result<()> {
        if self.cache_size > cache_size {
            self.flush().await?;
        }
        self.cache_size = cache_size;
        Ok(())
    }
}

impl Drop for Index {
    fn drop(&mut self) {
        // `flush` is async; calling it here would only construct a future and
        // drop it unawaited, so nothing would actually be sent. `Drop` cannot
        // await, so warn about unflushed documents instead and rely on callers
        // to flush explicitly before the index goes out of scope.
        if let Some(cache) = self.document_cache.as_ref() {
            if !cache.is_empty() {
                log::warn!(
                    "dropping index '{}' with {} unflushed documents; call flush() first",
                    self.name,
                    cache.len()
                );
            }
        }
    }
}