1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
pub mod input;
pub mod output;

#[cfg(test)]
mod elastic_test {
    use super::input::ElasticSearchInput;
    use super::output::{ElasticOuputConfig, ElasticSearchOutput};
    use std::thread;
    use usiem::chrono;
    use usiem::components::command::{SiemCommandCall, SiemCommandHeader};
    use usiem::components::common::SiemMessage;
    use usiem::components::SiemComponent;
    use usiem::crossbeam_channel;
    use usiem::events::{SiemEvent, SiemLog};

    #[test]
    fn test_elasticsearch_in_out() {
        let (k_sender, _receiver) = crossbeam_channel::unbounded();

        let (log_sender_for_in, log_rec_for_in) = crossbeam_channel::unbounded();
        let (log_sender_for_out, log_rec_for_out) = crossbeam_channel::unbounded();

        let mut es_in = ElasticSearchInput::new("127.0.0.1:9200".to_string(), 1);
        es_in.set_kernel_sender(k_sender.clone());
        es_in.set_log_channel(log_sender_for_in.clone(), log_rec_for_in.clone());

        let config = ElasticOuputConfig {
            commit_max_messages: 30,
            commit_timeout: 1,
            commit_time: 1,
            cache_size: 50,
            elastic_address: String::from("http://127.0.0.1:9200"),
            elastic_stream: String::from("log-integration-test"),
            bearer_token: None,
        };

        let mut es_output = ElasticSearchOutput::new(config);
        es_output.set_log_channel(log_sender_for_out.clone(), log_rec_for_out);
        es_output.set_kernel_sender(k_sender);
        let out_channel = es_output.local_channel();
        let in_channel = es_in.local_channel();

        thread::spawn(move || {
            thread::sleep(std::time::Duration::from_millis(50));

            for i in 0..100 {
                let log = SiemLog::new(
                    format!("Log for testing only {}", i),
                    chrono::Utc::now().timestamp_millis(),
                    "123.45.67.89",
                );
                log_sender_for_out.send(log).unwrap();
                println!("Sended log {}", i);
            }
            thread::sleep(std::time::Duration::from_millis(3000));
            out_channel
                .send(SiemMessage::Command(
                    SiemCommandHeader {
                        comm_id: 0,
                        comp_id: 0,
                        user: String::from("None"),
                    },
                    SiemCommandCall::STOP_COMPONENT("Component for testing".to_string()),
                ))
                .unwrap();
            in_channel
                .send(SiemMessage::Command(
                    SiemCommandHeader {
                        comm_id: 0,
                        comp_id: 0,
                        user: String::from("None"),
                    },
                    SiemCommandCall::STOP_COMPONENT("Component for testing".to_string()),
                ))
                .unwrap();
            println!("Ended sending logs");
        });
        thread::spawn(move || {
            es_in.run();
        });
        thread::sleep(std::time::Duration::from_millis(100));
        thread::spawn(move || {
            es_output.run();
        });
        thread::sleep(std::time::Duration::from_millis(3000));
        let mut log_n = 0;
        loop {
            match log_rec_for_in.try_recv() {
                Ok(log) => {
                    log_n += 1;
                    match log.event() {
                        SiemEvent::Json(value) => {
                            assert!(value
                                .get("message")
                                .unwrap()
                                .as_str()
                                .unwrap()
                                .contains("Log for testing only"))
                        }
                        _ => {
                            panic!("Invalid log body")
                        }
                    }
                }
                Err(_e) => break,
            };
        }
        assert_eq!(log_n, 100);
    }
}