1use channels::ComponentChannels;
2use comp_store::SiemComponentStore;
3use comp_tracking::{ComponentBuildingInfo, ComponentTracking};
4use std::thread;
5use usiem::components::command::SiemCommandCall;
6use usiem::components::common::SiemMessage;
7use usiem::components::metrics::SiemMetricDefinition;
8use usiem::components::{SiemComponent, SiemDatasetManager};
9use usiem::crossbeam_channel;
10use usiem::crossbeam_channel::TryRecvError;
11use usiem::prelude::kernel_message::KernelMessager;
12use usiem::prelude::storage::SiemComponentStateStorage;
13use usiem::prelude::{CommandResult, SiemCommandHeader, SiemCommandResponse};
14use usiem::utilities::types::LogString;
15use utils::ComponentNames;
16
17use crate::metrics::{generate_kernel_metrics, KernelMetrics};
18use crate::{channels, comp_store, comp_tracking, utils};
19
/// Basic kernel that owns the registered components, the channels connecting them
/// and the kernel metrics, and drives everything from [`SiemBasicKernel::run`].
pub struct SiemBasicKernel {
    /// Channels shared between the kernel and the components.
    channels: ComponentChannels,
    /// Store that receives every component passed to the `register_*` methods.
    components: SiemComponentStore,
    /// Metric definitions (returned by `get_metrics`) paired with the handles the kernel updates.
    metrics: (Vec<SiemMetricDefinition>, KernelMetrics),
    /// Copied into `ComponentTracking::command_timeout` when `run` starts.
    /// NOTE(review): the unit is not visible in this file — presumably milliseconds; confirm.
    pub command_response_timeout: i64,
    /// Upper bound on running instances of a component when scaling up.
    max_threads: usize,
    /// Dataset manager; `run` takes it out of this field and panics if it is `None`.
    dataset_manager: Option<Box<dyn SiemDatasetManager>>,
}
28
29impl SiemBasicKernel {
30 pub fn new(channel_size: usize, max_threads: usize, command_timeout: i64) -> SiemBasicKernel {
31 let metrics = generate_kernel_metrics();
32 return SiemBasicKernel {
33 channels: ComponentChannels::new(channel_size, max_threads as f64, metrics.1.clone()),
34 components: SiemComponentStore::default(),
35 command_response_timeout: command_timeout,
36 max_threads,
37 dataset_manager: None,
38 metrics,
39 };
40 }
41
    /// Registers `component` as the WAL component by forwarding it to the component store.
    pub fn register_wal_component(&mut self, component: Box<dyn SiemComponent>) {
        self.components.register_wal_component(component);
    }
45
    /// Registers `component` as an input component by forwarding it to the component store.
    pub fn register_input_component(&mut self, component: Box<dyn SiemComponent>) {
        self.components.register_input_component(component);
    }
    /// Registers `component` as the rule engine by forwarding it to the component store.
    pub fn register_rule_engine_component(&mut self, component: Box<dyn SiemComponent>) {
        self.components.register_rule_engine_component(component);
    }
    /// Registers `component` as the output component by forwarding it to the component store.
    pub fn register_output_component(&mut self, component: Box<dyn SiemComponent>) {
        self.components.register_output_component(component);
    }
    /// Registers `component` as an "other" component by forwarding it to the component store.
    pub fn register_other_component(&mut self, component: Box<dyn SiemComponent>) {
        self.components.register_other_component(component);
    }
    /// Registers `component` as the parser component by forwarding it to the component store.
    pub fn register_parser_component(&mut self, component: Box<dyn SiemComponent>) {
        self.components.register_parser_component(component);
    }
    /// Registers `component` as the enricher component by forwarding it to the component store.
    pub fn register_enricher_component(&mut self, component: Box<dyn SiemComponent>) {
        self.components.register_enricher_component(component);
    }
    /// Registers `component` as a "norun" component by forwarding it to the component store.
    /// NOTE(review): presumably these components are never started by the kernel — confirm
    /// against `SiemComponentStore`.
    pub fn register_norun_component(&mut self, component: Box<dyn SiemComponent>) {
        self.components.register_norun_component(component);
    }
    /// Stores `component` as the dataset manager, replacing any previously registered one.
    ///
    /// `run` panics when no dataset manager has been registered.
    pub fn register_dataset_manager(&mut self, component: Box<dyn SiemDatasetManager>) {
        self.dataset_manager = Some(component);
    }
    /// Registers `component` as the alert component by forwarding it to the component store.
    pub fn register_alert_component(&mut self, component: Box<dyn SiemComponent>) {
        self.components.register_alert_component(component);
    }
    /// Registers the component state storage by forwarding it to the component store.
    pub fn register_state_storage(&mut self, state_storage: Box<dyn SiemComponentStateStorage>) {
        self.components.register_state_storage(state_storage);
    }
76
77 fn increase_processed_messages(&self, value: u64) {
78 self.metrics
79 .1
80 .total_messages_processed_by_kernel
81 .inc_by(value as i64);
82 }
83
84 pub fn run(&mut self) {
85 let mut component_tracking = ComponentTracking::new();
86 component_tracking.command_timeout = self.command_response_timeout;
87
88 let dataset_holder = match &self.dataset_manager {
89 Some(dataset_manager) => dataset_manager.get_datasets(),
90 None => {
91 panic!("No DatasetManager!")
92 }
93 };
94
95 match self.dataset_manager.take() {
97 Some(mut comp) => {
98 let msngr = KernelMessager::new(
99 1,
100 comp.name().to_string(),
101 self.channels.kernel_channel.1.clone(),
102 );
103 let local_channel = comp.local_channel();
104 let thread_join = thread::spawn(move || {
105 usiem::logging::initialize_component_logger(msngr);
106 comp.run()
107 });
108 component_tracking.register_external_component(1, (thread_join, local_channel));
109 }
110 None => {
111 panic!("Kernel needs a ParserComponent!!")
112 }
113 };
114 let guard = dataset_holder.lock().unwrap();
115 let datasets = (*guard).clone();
116 drop(guard);
117
118 let mut build_info = ComponentBuildingInfo {
119 store: self.components.clone(),
120 channels: self.channels.clone(),
121 datasets: datasets,
122 };
123 component_tracking.run_all_components(&build_info);
124
125 let names = ComponentNames {
126 parser: build_info.store.get_parser_name().to_string(),
127 enricher: build_info.store.get_enricher_name().to_string(),
128 rule_engine: build_info.store.get_rule_engine_name().to_string(),
129 output: build_info.store.get_output_name().to_string(),
130 };
131
132 let mut iterations = 0;
133 let mut total_messages_processed = 0;
134
135 loop {
136 iterations += 1;
137 #[cfg(feature = "metrics")]
138 {
139 self.channels.update_metrics();
140 self.increase_processed_messages(total_messages_processed);
141 total_messages_processed = 0;
142 }
143 if iterations % 1024 == 0 {
144 self.scale_components(&mut component_tracking, &names, &build_info);
145 }
146
147 if iterations % 64 == 0 {
148 component_tracking.check_tasks();
149 }
150 for _ in 0..50 {
152 match self.channels.kernel_channel.0.try_recv() {
153 Ok(msg) => {
154 total_messages_processed += 1;
155 if self.is_message_for_kernel(&msg) {
156 match self.process_message_for_kernel(
157 msg,
158 &mut component_tracking,
159 &mut build_info,
160 ) {
161 Ok(_) => {}
162 Err(v) => {
163 println!("{}", v);
164 return;
165 }
166 }
167 } else {
168 let _ = component_tracking.route_message(msg, &build_info);
169 }
170 }
171 Err(err) => match err {
172 TryRecvError::Empty => {}
173 TryRecvError::Disconnected => {
174 panic!("Kernel channel disconnected!!!")
175 }
176 },
177 }
178 }
179 }
180 }
181
182 fn process_message_for_kernel(
183 &mut self,
184 message: SiemMessage,
185 component_tracking: &mut ComponentTracking,
186 build_info: &mut ComponentBuildingInfo,
187 ) -> Result<(), &'static str> {
188 match message {
189 SiemMessage::Command(header, command) => match command {
190 SiemCommandCall::START_COMPONENT(_comp_name) => {}
191 SiemCommandCall::STOP_COMPONENT(comp_name) => {
192 if comp_name == "KERNEL" {
193 return Err("Kernel received shutdown command");
194 }
195 }
196 SiemCommandCall::OTHER(name, _params) => {
197 if name == "COMPONENT_FINISHED" {
198 let _ = component_tracking.clean_comp_id(header.comp_id);
199 }
200 }
201 SiemCommandCall::GET_TASK_RESULT(task_id) => {
202 let result = component_tracking.get_task_result(task_id);
203 let result = if let Some(result) = result {
204 CommandResult::Ok(result)
205 } else {
206 CommandResult::Err(usiem::prelude::CommandError::NotFound(
207 LogString::Borrowed("Task has not finished"),
208 ))
209 };
210 component_tracking.send_message_to_component(
211 header.comp_id,
212 SiemMessage::Response(
213 SiemCommandHeader {
214 comm_id: header.comm_id,
215 comp_id: 0,
216 user: header.user,
217 },
218 SiemCommandResponse::GET_TASK_RESULT(result),
219 ),
220 );
221 }
222 _ => {}
223 },
224 SiemMessage::Notification(_) => {}
225 SiemMessage::Dataset(dataset) => {
226 component_tracking.update_dataset(dataset.clone());
227 build_info.datasets.insert(dataset);
228 }
229 _ => {}
230 }
231 Ok(())
232 }
233
234 fn is_message_for_kernel(&self, message: &SiemMessage) -> bool {
235 match message {
236 SiemMessage::Command(_header, command) => match command {
237 SiemCommandCall::START_COMPONENT(_comp_name) => true,
238 SiemCommandCall::STOP_COMPONENT(_comp_name) => true,
239 SiemCommandCall::OTHER(name, _params) => name == "COMPONENT_FINISHED",
240 _ => false,
241 },
242 SiemMessage::Notification(_) => true,
243 SiemMessage::Dataset(_dataset) => true,
244 _ => false,
245 }
246 }
247
248 fn scale_components(
249 &self,
250 tracking: &mut ComponentTracking,
251 names: &ComponentNames,
252 build_info: &ComponentBuildingInfo,
253 ) {
254 match self.channels.scale_parser() {
255 channels::ScaleAction::ScaleUp => {
256 if tracking.running_instances_of_component(&names.parser) < self.max_threads {
257 let _ = tracking.run_parser(&build_info);
258 }
259 }
260 _ => {}
261 };
262 match self.channels.scale_enricher() {
263 channels::ScaleAction::ScaleUp => {
264 if tracking.running_instances_of_component(&names.enricher) < self.max_threads {
265 let _ = tracking.run_enricher(&build_info);
266 }
267 }
268 _ => {}
269 };
270 match self.channels.scale_rules() {
271 channels::ScaleAction::ScaleUp => {
272 if tracking.running_instances_of_component(&names.rule_engine) < self.max_threads {
273 let _ = tracking.run_rule_engine(&build_info);
274 }
275 }
276 _ => {}
277 };
278 match self.channels.scale_output() {
279 channels::ScaleAction::ScaleUp => {
280 if tracking.running_instances_of_component(&names.output) < self.max_threads {
281 let _ = tracking.run_output(&build_info);
282 }
283 }
284 _ => {}
285 };
286 }
287
    /// Returns a copy of the kernel's metric definitions.
    pub fn get_metrics(&self) -> Vec<SiemMetricDefinition> {
        self.metrics.0.clone()
    }
291 pub fn configure_channels_in_components(&mut self) {
292 match self.components.wal_component.as_mut() {
293 Some(comp) => {
294 let r = self.channels.wal_log.0.clone();
295 let s = self.channels.parser_channel.1.clone();
296 comp.set_log_channel(s, r);
297 }
298 None => {}
299 }
300 match self.components.enricher_component.as_mut() {
301 Some(comp) => {
302 let r = self.channels.enricher_channel.0.clone();
303 let s = self.channels.rule_engine_channel.1.clone();
304 comp.set_log_channel(s, r);
305 }
306 None => {}
307 }
308 match self.components.parser_component.as_mut() {
309 Some(comp) => {
310 let r = self.channels.parser_channel.0.clone();
311 let s = self.channels.enricher_channel.1.clone();
312 comp.set_log_channel(s, r);
313 }
314 None => {}
315 }
316 match self.components.rule_engine_component.as_mut() {
317 Some(comp) => {
318 let r = self.channels.rule_engine_channel.0.clone();
319 let s = self.channels.output_channel.1.clone();
320 comp.set_log_channel(s, r);
321 }
322 None => {}
323 }
324 match self.components.output_component.as_mut() {
325 Some(comp) => {
326 let r = self.channels.rule_engine_channel.0.clone();
327 let s = if let Some(_) = self.components.wal_component {
328 self.channels.wal_log.1.clone()
329 } else {
330 let (s, _r) = crossbeam_channel::bounded(0);
331 s
332 };
333 comp.set_log_channel(s, r);
334 }
335 None => {}
336 }
337 for comp in self.components.input_components.iter_mut() {
338 let (_s, r) = crossbeam_channel::bounded(1);
339 let s = if let Some(_) = &self.components.wal_component {
340 self.channels.wal_log.1.clone()
341 } else {
342 self.channels.parser_channel.1.clone()
343 };
344 comp.set_log_channel(s, r);
345 }
346 }
347}
348
349#[cfg(test)]
350mod tests {
351
352 use std::collections::BTreeMap;
353
354 use usiem::prelude::metrics::SiemMetric;
355 use usiem::prelude::storage::DummyStateStorage;
356 use usiem::prelude::SiemCommandHeader;
357
358 use super::*;
359 use crate::test_comp::{BasicComponent, BasicDatasetManager};
360
361 fn setup_dummy_kernel() -> SiemBasicKernel {
362 let mut kernel = SiemBasicKernel::new(1000, 4, 5000);
363 let comp = BasicComponent::new();
364 let dm = BasicDatasetManager::new();
365 let ic = BasicComponent::new();
366 let pc = BasicComponent::new();
367 let ec = BasicComponent::new();
368 let oc = BasicComponent::new();
369 let re = BasicComponent::new();
370 let ac = BasicComponent::new();
371 kernel.register_other_component(Box::new(comp));
372 kernel.register_dataset_manager(Box::new(dm));
373 kernel.register_input_component(Box::new(ic));
374 kernel.register_output_component(Box::new(oc));
375 kernel.register_parser_component(Box::new(pc));
376 kernel.register_rule_engine_component(Box::new(re));
377 kernel.register_enricher_component(Box::new(ec));
378 kernel.register_alert_component(Box::new(ac));
379 kernel.register_state_storage(Box::new(DummyStateStorage {}));
380 kernel
381 }
382
383 #[test]
384 fn test_kernel_instance() {
385 let mut kernel = setup_dummy_kernel();
386 let sender = kernel.channels.kernel_channel.1.clone();
387 std::thread::spawn(move || {
388 std::thread::sleep(std::time::Duration::from_millis(1_000));
389 for _ in 0..20 {
391 let _r = sender.send(SiemMessage::Command(
392 SiemCommandHeader {
393 comp_id: 0,
394 comm_id: 0,
395 user: String::from("kernel"),
396 },
397 SiemCommandCall::GET_RULE(String::from("no_exists_rule")),
398 ));
399 }
400 std::thread::sleep(std::time::Duration::from_millis(1_000));
401 let _r = sender.send(SiemMessage::Command(
402 SiemCommandHeader {
403 comp_id: 0,
404 comm_id: 0,
405 user: String::from("kernel"),
406 },
407 SiemCommandCall::STOP_COMPONENT("KERNEL".to_string()),
408 ));
409 });
410 kernel.run();
411 let mut metrics = BTreeMap::new();
412 kernel.get_metrics().iter().for_each(|v| {
413 metrics.insert(v.name().to_string(), v.metric().clone());
414 });
415 if let SiemMetric::Counter(val) = metrics.get("total_messages_processed_by_kernel").unwrap()
417 {
418 assert!(val.with_labels(&[]).unwrap().get() >= 20i64);
420 } else {
421 unreachable!("Must be a counter")
422 }
423 }
424}