usiem_kernel/
kernel.rs

1use channels::ComponentChannels;
2use comp_store::SiemComponentStore;
3use comp_tracking::{ComponentBuildingInfo, ComponentTracking};
4use std::thread;
5use usiem::components::command::SiemCommandCall;
6use usiem::components::common::SiemMessage;
7use usiem::components::metrics::SiemMetricDefinition;
8use usiem::components::{SiemComponent, SiemDatasetManager};
9use usiem::crossbeam_channel;
10use usiem::crossbeam_channel::TryRecvError;
11use usiem::prelude::kernel_message::KernelMessager;
12use usiem::prelude::storage::SiemComponentStateStorage;
13use usiem::prelude::{CommandResult, SiemCommandHeader, SiemCommandResponse};
14use usiem::utilities::types::LogString;
15use utils::ComponentNames;
16
17use crate::metrics::{generate_kernel_metrics, KernelMetrics};
18use crate::{channels, comp_store, comp_tracking, utils};
19
20pub struct SiemBasicKernel {
21    channels: ComponentChannels,
22    components: SiemComponentStore,
23    metrics: (Vec<SiemMetricDefinition>, KernelMetrics),
24    pub command_response_timeout: i64,
25    max_threads: usize,
26    dataset_manager: Option<Box<dyn SiemDatasetManager>>,
27}
28
29impl SiemBasicKernel {
30    pub fn new(channel_size: usize, max_threads: usize, command_timeout: i64) -> SiemBasicKernel {
31        let metrics = generate_kernel_metrics();
32        return SiemBasicKernel {
33            channels: ComponentChannels::new(channel_size, max_threads as f64, metrics.1.clone()),
34            components: SiemComponentStore::default(),
35            command_response_timeout: command_timeout,
36            max_threads,
37            dataset_manager: None,
38            metrics,
39        };
40    }
41
42    pub fn register_wal_component(&mut self, component: Box<dyn SiemComponent>) {
43        self.components.register_wal_component(component);
44    }
45
46    pub fn register_input_component(&mut self, component: Box<dyn SiemComponent>) {
47        self.components.register_input_component(component);
48    }
49    pub fn register_rule_engine_component(&mut self, component: Box<dyn SiemComponent>) {
50        self.components.register_rule_engine_component(component);
51    }
52    pub fn register_output_component(&mut self, component: Box<dyn SiemComponent>) {
53        self.components.register_output_component(component);
54    }
55    pub fn register_other_component(&mut self, component: Box<dyn SiemComponent>) {
56        self.components.register_other_component(component);
57    }
58    pub fn register_parser_component(&mut self, component: Box<dyn SiemComponent>) {
59        self.components.register_parser_component(component);
60    }
61    pub fn register_enricher_component(&mut self, component: Box<dyn SiemComponent>) {
62        self.components.register_enricher_component(component);
63    }
64    pub fn register_norun_component(&mut self, component: Box<dyn SiemComponent>) {
65        self.components.register_norun_component(component);
66    }
67    pub fn register_dataset_manager(&mut self, component: Box<dyn SiemDatasetManager>) {
68        self.dataset_manager = Some(component);
69    }
70    pub fn register_alert_component(&mut self, component: Box<dyn SiemComponent>) {
71        self.components.register_alert_component(component);
72    }
73    pub fn register_state_storage(&mut self, state_storage: Box<dyn SiemComponentStateStorage>) {
74        self.components.register_state_storage(state_storage);
75    }
76
77    fn increase_processed_messages(&self, value: u64) {
78        self.metrics
79            .1
80            .total_messages_processed_by_kernel
81            .inc_by(value as i64);
82    }
83
84    pub fn run(&mut self) {
85        let mut component_tracking = ComponentTracking::new();
86        component_tracking.command_timeout = self.command_response_timeout;
87
88        let dataset_holder = match &self.dataset_manager {
89            Some(dataset_manager) => dataset_manager.get_datasets(),
90            None => {
91                panic!("No DatasetManager!")
92            }
93        };
94
95        // We can take the dataset manager. If for whatever reason the thread ends, then we will end the execution of the entire program.
96        match self.dataset_manager.take() {
97            Some(mut comp) => {
98                let msngr = KernelMessager::new(
99                    1,
100                    comp.name().to_string(),
101                    self.channels.kernel_channel.1.clone(),
102                );
103                let local_channel = comp.local_channel();
104                let thread_join = thread::spawn(move || {
105                    usiem::logging::initialize_component_logger(msngr);
106                    comp.run()
107                });
108                component_tracking.register_external_component(1, (thread_join, local_channel));
109            }
110            None => {
111                panic!("Kernel needs a ParserComponent!!")
112            }
113        };
114        let guard = dataset_holder.lock().unwrap();
115        let datasets = (*guard).clone();
116        drop(guard);
117
118        let mut build_info = ComponentBuildingInfo {
119            store: self.components.clone(),
120            channels: self.channels.clone(),
121            datasets: datasets,
122        };
123        component_tracking.run_all_components(&build_info);
124
125        let names = ComponentNames {
126            parser: build_info.store.get_parser_name().to_string(),
127            enricher: build_info.store.get_enricher_name().to_string(),
128            rule_engine: build_info.store.get_rule_engine_name().to_string(),
129            output: build_info.store.get_output_name().to_string(),
130        };
131
132        let mut iterations = 0;
133        let mut total_messages_processed = 0;
134
135        loop {
136            iterations += 1;
137            #[cfg(feature = "metrics")]
138            {
139                self.channels.update_metrics();
140                self.increase_processed_messages(total_messages_processed);
141                total_messages_processed = 0;
142            }
143            if iterations % 1024 == 0 {
144                self.scale_components(&mut component_tracking, &names, &build_info);
145            }
146
147            if iterations % 64 == 0 {
148                component_tracking.check_tasks();
149            }
150            // Prioritize receiving messages
151            for _ in 0..50 {
152                match self.channels.kernel_channel.0.try_recv() {
153                    Ok(msg) => {
154                        total_messages_processed += 1;
155                        if self.is_message_for_kernel(&msg) {
156                            match self.process_message_for_kernel(
157                                msg,
158                                &mut component_tracking,
159                                &mut build_info,
160                            ) {
161                                Ok(_) => {}
162                                Err(v) => {
163                                    println!("{}", v);
164                                    return;
165                                }
166                            }
167                        } else {
168                            let _ = component_tracking.route_message(msg, &build_info);
169                        }
170                    }
171                    Err(err) => match err {
172                        TryRecvError::Empty => {}
173                        TryRecvError::Disconnected => {
174                            panic!("Kernel channel disconnected!!!")
175                        }
176                    },
177                }
178            }
179        }
180    }
181
182    fn process_message_for_kernel(
183        &mut self,
184        message: SiemMessage,
185        component_tracking: &mut ComponentTracking,
186        build_info: &mut ComponentBuildingInfo,
187    ) -> Result<(), &'static str> {
188        match message {
189            SiemMessage::Command(header, command) => match command {
190                SiemCommandCall::START_COMPONENT(_comp_name) => {}
191                SiemCommandCall::STOP_COMPONENT(comp_name) => {
192                    if comp_name == "KERNEL" {
193                        return Err("Kernel received shutdown command");
194                    }
195                }
196                SiemCommandCall::OTHER(name, _params) => {
197                    if name == "COMPONENT_FINISHED" {
198                        let _ = component_tracking.clean_comp_id(header.comp_id);
199                    }
200                }
201                SiemCommandCall::GET_TASK_RESULT(task_id) => {
202                    let result = component_tracking.get_task_result(task_id);
203                    let result = if let Some(result) = result {
204                        CommandResult::Ok(result)
205                    } else {
206                        CommandResult::Err(usiem::prelude::CommandError::NotFound(
207                            LogString::Borrowed("Task has not finished"),
208                        ))
209                    };
210                    component_tracking.send_message_to_component(
211                        header.comp_id,
212                        SiemMessage::Response(
213                            SiemCommandHeader {
214                                comm_id: header.comm_id,
215                                comp_id: 0,
216                                user: header.user,
217                            },
218                            SiemCommandResponse::GET_TASK_RESULT(result),
219                        ),
220                    );
221                }
222                _ => {}
223            },
224            SiemMessage::Notification(_) => {}
225            SiemMessage::Dataset(dataset) => {
226                component_tracking.update_dataset(dataset.clone());
227                build_info.datasets.insert(dataset);
228            }
229            _ => {}
230        }
231        Ok(())
232    }
233
234    fn is_message_for_kernel(&self, message: &SiemMessage) -> bool {
235        match message {
236            SiemMessage::Command(_header, command) => match command {
237                SiemCommandCall::START_COMPONENT(_comp_name) => true,
238                SiemCommandCall::STOP_COMPONENT(_comp_name) => true,
239                SiemCommandCall::OTHER(name, _params) => name == "COMPONENT_FINISHED",
240                _ => false,
241            },
242            SiemMessage::Notification(_) => true,
243            SiemMessage::Dataset(_dataset) => true,
244            _ => false,
245        }
246    }
247
248    fn scale_components(
249        &self,
250        tracking: &mut ComponentTracking,
251        names: &ComponentNames,
252        build_info: &ComponentBuildingInfo,
253    ) {
254        match self.channels.scale_parser() {
255            channels::ScaleAction::ScaleUp => {
256                if tracking.running_instances_of_component(&names.parser) < self.max_threads {
257                    let _ = tracking.run_parser(&build_info);
258                }
259            }
260            _ => {}
261        };
262        match self.channels.scale_enricher() {
263            channels::ScaleAction::ScaleUp => {
264                if tracking.running_instances_of_component(&names.enricher) < self.max_threads {
265                    let _ = tracking.run_enricher(&build_info);
266                }
267            }
268            _ => {}
269        };
270        match self.channels.scale_rules() {
271            channels::ScaleAction::ScaleUp => {
272                if tracking.running_instances_of_component(&names.rule_engine) < self.max_threads {
273                    let _ = tracking.run_rule_engine(&build_info);
274                }
275            }
276            _ => {}
277        };
278        match self.channels.scale_output() {
279            channels::ScaleAction::ScaleUp => {
280                if tracking.running_instances_of_component(&names.output) < self.max_threads {
281                    let _ = tracking.run_output(&build_info);
282                }
283            }
284            _ => {}
285        };
286    }
287
288    pub fn get_metrics(&self) -> Vec<SiemMetricDefinition> {
289        self.metrics.0.clone()
290    }
291    pub fn configure_channels_in_components(&mut self) {
292        match self.components.wal_component.as_mut() {
293            Some(comp) => {
294                let r = self.channels.wal_log.0.clone();
295                let s = self.channels.parser_channel.1.clone();
296                comp.set_log_channel(s, r);
297            }
298            None => {}
299        }
300        match self.components.enricher_component.as_mut() {
301            Some(comp) => {
302                let r = self.channels.enricher_channel.0.clone();
303                let s = self.channels.rule_engine_channel.1.clone();
304                comp.set_log_channel(s, r);
305            }
306            None => {}
307        }
308        match self.components.parser_component.as_mut() {
309            Some(comp) => {
310                let r = self.channels.parser_channel.0.clone();
311                let s = self.channels.enricher_channel.1.clone();
312                comp.set_log_channel(s, r);
313            }
314            None => {}
315        }
316        match self.components.rule_engine_component.as_mut() {
317            Some(comp) => {
318                let r = self.channels.rule_engine_channel.0.clone();
319                let s = self.channels.output_channel.1.clone();
320                comp.set_log_channel(s, r);
321            }
322            None => {}
323        }
324        match self.components.output_component.as_mut() {
325            Some(comp) => {
326                let r = self.channels.rule_engine_channel.0.clone();
327                let s = if let Some(_) = self.components.wal_component {
328                    self.channels.wal_log.1.clone()
329                } else {
330                    let (s, _r) = crossbeam_channel::bounded(0);
331                    s
332                };
333                comp.set_log_channel(s, r);
334            }
335            None => {}
336        }
337        for comp in self.components.input_components.iter_mut() {
338            let (_s, r) = crossbeam_channel::bounded(1);
339            let s = if let Some(_) = &self.components.wal_component {
340                self.channels.wal_log.1.clone()
341            } else {
342                self.channels.parser_channel.1.clone()
343            };
344            comp.set_log_channel(s, r);
345        }
346    }
347}
348
#[cfg(test)]
mod tests {

    use std::collections::BTreeMap;

    use usiem::prelude::metrics::SiemMetric;
    use usiem::prelude::storage::DummyStateStorage;
    use usiem::prelude::SiemCommandHeader;

    use super::*;
    use crate::test_comp::{BasicComponent, BasicDatasetManager};

    /// Builds a kernel with one dummy component registered for every role,
    /// a dummy dataset manager and a dummy state storage.
    fn setup_dummy_kernel() -> SiemBasicKernel {
        let mut kernel = SiemBasicKernel::new(1000, 4, 5000);
        let other = BasicComponent::new();
        let dataset_manager = BasicDatasetManager::new();
        let input = BasicComponent::new();
        let parser = BasicComponent::new();
        let enricher = BasicComponent::new();
        let output = BasicComponent::new();
        let rule_engine = BasicComponent::new();
        let alert = BasicComponent::new();
        kernel.register_other_component(Box::new(other));
        kernel.register_dataset_manager(Box::new(dataset_manager));
        kernel.register_input_component(Box::new(input));
        kernel.register_output_component(Box::new(output));
        kernel.register_parser_component(Box::new(parser));
        kernel.register_rule_engine_component(Box::new(rule_engine));
        kernel.register_enricher_component(Box::new(enricher));
        kernel.register_alert_component(Box::new(alert));
        kernel.register_state_storage(Box::new(DummyStateStorage {}));
        kernel
    }

    /// Wraps `call` in a command message sent by user "kernel" with
    /// component id 0 and command id 0.
    fn kernel_command(call: SiemCommandCall) -> SiemMessage {
        SiemMessage::Command(
            SiemCommandHeader {
                comp_id: 0,
                comm_id: 0,
                user: String::from("kernel"),
            },
            call,
        )
    }

    #[test]
    fn test_kernel_instance() {
        let mut kernel = setup_dummy_kernel();
        let sender = kernel.channels.kernel_channel.1.clone();
        std::thread::spawn(move || {
            std::thread::sleep(std::time::Duration::from_millis(1_000));
            // Feed the kernel 20 commands so that the processed-messages
            // counter has something to count.
            for _ in 0..20 {
                let _r = sender.send(kernel_command(SiemCommandCall::GET_RULE(String::from(
                    "no_exists_rule",
                ))));
            }
            std::thread::sleep(std::time::Duration::from_millis(1_000));
            // Stop the kernel so that `run` returns and the test can finish.
            let _r = sender.send(kernel_command(SiemCommandCall::STOP_COMPONENT(
                "KERNEL".to_string(),
            )));
        });
        kernel.run();
        let metrics: BTreeMap<_, _> = kernel
            .get_metrics()
            .iter()
            .map(|definition| (definition.name().to_string(), definition.metric().clone()))
            .collect();
        // Test metrics are working
        match metrics.get("total_messages_processed_by_kernel").unwrap() {
            SiemMetric::Counter(counter) => {
                // 20 messages of get rule + Notifications of components
                assert!(counter.with_labels(&[]).unwrap().get() >= 20i64);
            }
            _ => unreachable!("Must be a counter"),
        }
    }
}
424}