// usiem_basic_parser/lib.rs

1use metrics::{generate_parser_metrics, ParserMetrics};
2use std::borrow::Cow;
3use std::collections::{BTreeMap, BTreeSet};
4use usiem::components::command::{
5    CommandDefinition, SiemCommandCall, SiemCommandHeader, SiemCommandResponse, SiemFunctionType,
6};
7use usiem::components::command_types::ParserDefinition;
8use usiem::components::common::{SiemComponentCapabilities, SiemMessage, UserRole};
9use usiem::components::dataset::holder::DatasetHolder;
10use usiem::components::parsing::{LogParser, LogParsingError};
11use usiem::components::SiemComponent;
12use usiem::crossbeam_channel::TryRecvError;
13use usiem::crossbeam_channel::{Receiver, Sender};
14use usiem::events::SiemLog;
15
16use usiem::prelude::storage::SiemComponentStateStorage;
17use usiem::prelude::{CommandResult, SiemError, SiemMetricDefinition};
18use usiem::send_message;
19
20mod metrics;
21
22/// Basic component parser
23/// 
24/// # Example
25/// ```ignore
26/// let mut parser_component = BasicParserComponent::new();
27/// parser_component.add_parser(Box::from(parser1));
28/// parser_component.add_parser(Box::from(parser2));
29/// 
30/// kernel.add_component(parser_component);
31/// ```
32#[derive(Clone)]
33pub struct BasicParserComponent {
34    /// Receive actions from other components or the kernel
35    local_chnl_rcv: Receiver<SiemMessage>,
36    /// Send actions to this components
37    local_chnl_snd: Sender<SiemMessage>,
38    /// Receive logs
39    log_receiver: Receiver<SiemLog>,
40    log_sender: Sender<SiemLog>,
41    conn: Option<Box<dyn SiemComponentStateStorage>>,
42    parsers: Vec<Box<dyn LogParser>>,
43    datasets: DatasetHolder,
44    metrics: (Vec<SiemMetricDefinition>, ParserMetrics),
45}
46
47impl BasicParserComponent {
48    pub fn new() -> BasicParserComponent {
49        let (local_chnl_snd, local_chnl_rcv) = usiem::crossbeam_channel::bounded(10_000);
50        let (log_sender, log_receiver) = usiem::crossbeam_channel::unbounded();
51        return BasicParserComponent {
52            local_chnl_rcv,
53            local_chnl_snd,
54            log_receiver,
55            log_sender,
56            parsers: Vec::new(),
57            conn: None,
58            datasets: DatasetHolder::from_datasets(vec![]),
59            metrics: generate_parser_metrics(&[]),
60        };
61    }
62    /// Adds a new parser in the component
63    pub fn add_parser(&mut self, parser: Box<dyn LogParser>) {
64        self.parsers.push(parser);
65        self.metrics = generate_parser_metrics(&self.parsers);
66    }
67
68    fn list_parsers(&self, header: &SiemCommandHeader) -> SiemMessage {
69        let content2 = self
70            .parsers
71            .iter()
72            .map(|x| ParserDefinition {
73                name: x.name().to_string(),
74                description: x.description().to_string(),
75            })
76            .collect::<Vec<ParserDefinition>>();
77        SiemMessage::Response(
78            SiemCommandHeader {
79                comm_id: header.comm_id,
80                comp_id: header.comp_id,
81                user: header.user.clone(),
82            },
83            SiemCommandResponse::LIST_PARSERS(CommandResult::Ok(content2)),
84        )
85    }
86
87    #[cfg(feature="metrics")]
88    fn update_parser_error_metric(&self, parser: &str) {
89        self.metrics
90            .1
91            .parser_bug_error
92            .with_labels(&[("parser", parser)])
93            .and_then(|metric| {
94                metric.inc();
95                Some(metric)
96            });
97    }
98    #[cfg(feature="metrics")]
99    fn update_parser_not_implemented(&self, parser: &str) {
100        self.metrics
101            .1
102            .parser_unimplemented
103            .with_labels(&[("parser", parser)])
104            .and_then(|metric| {
105                metric.inc();
106                Some(metric)
107            });
108    }
109    #[cfg(feature="metrics")]
110    fn update_parser_format_error(&self, parser: &str) {
111        self.metrics
112            .1
113            .parser_format_error
114            .with_labels(&[("parser", parser)])
115            .and_then(|metric| {
116                metric.inc();
117                Some(metric)
118            });
119    }
120    #[cfg(feature="metrics")]
121    fn update_discard_metric(&self, parser: &str) {
122        self.metrics
123            .1
124            .parser_discarded
125            .with_labels(&[("parser", parser)])
126            .and_then(|metric| {
127                metric.inc();
128                Some(metric)
129            });
130    }
131
132    fn parse_log<'a>(
133        &'a self,
134        origin_parser_map: &mut BTreeMap<String, Vec<&'a Box<dyn LogParser>>>,
135        log: SiemLog,
136    ) -> Option<SiemLog> {
137        let mut empty = Vec::with_capacity(128);
138        let origin: String = log.origin().to_string();
139        let selected_parsers = match origin_parser_map.get_mut(&origin) {
140            Some(vc) => vc,
141            None => &mut empty,
142        };
143        let mut tried_parsers = BTreeSet::new();
144        let mut log = log;
145        // Direct search
146        for parser in &(*selected_parsers) {
147            match parser.parse_log(log, &self.datasets) {
148                Ok(lg) => {
149                    return Some(lg);
150                }
151                Err(e) => match e {
152                    LogParsingError::NoValidParser(lg) => {
153                        log = lg;
154                    }
155                    LogParsingError::ParserError(lg, error) => {
156                        usiem::warn!("Cannot parse log {:?}. Error={}", lg, error);
157                        #[cfg(feature="metrics")]
158                        self.update_parser_error_metric(parser.name());
159                        return Some(lg);
160                    }
161                    LogParsingError::NotImplemented(lg) => {
162                        #[cfg(feature="metrics")]
163                        self.update_parser_not_implemented(parser.name());
164                        return Some(lg);
165                    }
166                    LogParsingError::FormatError(lg, _) => {
167                        #[cfg(feature="metrics")]
168                        self.update_parser_format_error(parser.name());
169                        return Some(lg);
170                    }
171                    LogParsingError::Discard => {
172                        #[cfg(feature="metrics")]
173                        self.update_discard_metric(parser.name());
174                        return None;
175                    }
176                },
177            };
178            tried_parsers.insert(parser.name());
179        }
180        // No direct parser found, try indirect
181        for parser in &self.parsers {
182            if !tried_parsers.contains(parser.name()) {
183                log = match parser.parse_log(log, &self.datasets) {
184                    Ok(lg) => {
185                        if !origin_parser_map.contains_key(&origin) {
186                            origin_parser_map.insert(origin.clone(), vec![&parser]);
187                        } else {
188                            let _: Option<String> =
189                                origin_parser_map.get_mut(&origin).and_then(|x| {
190                                    x.push(&parser);
191                                    None
192                                });
193                        }
194                        return Some(lg);
195                    }
196                    Err(e) => match e {
197                        LogParsingError::NoValidParser(lg) => lg,
198                        LogParsingError::ParserError(lg, error) => {
199                            #[cfg(feature="metrics")]
200                            self.update_parser_error_metric(parser.name());
201                            if !origin_parser_map.contains_key(&origin) {
202                                origin_parser_map.insert(origin.clone(), vec![&parser]);
203                            } else {
204                                let _: Option<String> =
205                                    origin_parser_map.get_mut(&origin).and_then(|x| {
206                                        x.push(&parser);
207                                        None
208                                    });
209                            }
210                            usiem::warn!("Cannot parse log {:?}. Error={}", lg, error);
211                            return Some(lg);
212                        }
213                        LogParsingError::NotImplemented(lg) => {
214                            #[cfg(feature="metrics")]
215                            self.update_parser_not_implemented(parser.name());
216                            if !origin_parser_map.contains_key(&origin) {
217                                origin_parser_map.insert(origin.clone(), vec![&parser]);
218                            } else {
219                                let _: Option<String> =
220                                    origin_parser_map.get_mut(&origin).and_then(|x| {
221                                        x.push(&parser);
222                                        None
223                                    });
224                            }
225                            return Some(lg);
226                        }
227                        LogParsingError::FormatError(lg, error) => {
228                            #[cfg(feature="metrics")]
229                            self.update_parser_format_error(parser.name());
230                            usiem::warn!("Cannot process log. Format error: {}", error);
231                            if !origin_parser_map.contains_key(&origin) {
232                                origin_parser_map.insert(origin.clone(), vec![&parser]);
233                            } else {
234                                let _: Option<String> =
235                                    origin_parser_map.get_mut(&origin).and_then(|x| {
236                                        x.push(&parser);
237                                        None
238                                    });
239                            }
240                            return Some(lg);
241                        }
242                        LogParsingError::Discard => {
243                            #[cfg(feature="metrics")]
244                            self.update_discard_metric(parser.name());
245                            if !origin_parser_map.contains_key(&origin) {
246                                origin_parser_map.insert(origin.clone(), vec![&parser]);
247                            } else {
248                                let _: Option<String> =
249                                    origin_parser_map.get_mut(&origin).and_then(|x| {
250                                        x.push(&parser);
251                                        None
252                                    });
253                            }
254                            return None;
255                        }
256                    },
257                };
258            }
259        }
260        Some(log)
261    }
262}
263
264impl SiemComponent for BasicParserComponent {
265    fn name(&self) -> &'static str {
266        "BasicParser"
267    }
268    fn local_channel(&self) -> Sender<SiemMessage> {
269        self.local_chnl_snd.clone()
270    }
271    fn set_log_channel(&mut self, log_sender: Sender<SiemLog>, receiver: Receiver<SiemLog>) {
272        self.log_receiver = receiver;
273        self.log_sender = log_sender;
274    }
275    fn duplicate(&self) -> Box<dyn SiemComponent> {
276        return Box::new(self.clone());
277    }
278    fn set_datasets(&mut self, datasets: DatasetHolder) {
279        self.datasets = datasets;
280    }
281
282    /// Execute the logic of this component in an infinite loop. Must be stopped using Commands sent using the channel.
283    fn run(&mut self) -> Result<(), SiemError> {
284        let local_chnl_rcv = self.local_chnl_rcv.clone();
285        let mut origin_parser_map: BTreeMap<String, Vec<&Box<dyn LogParser>>> = BTreeMap::new();
286        loop {
287            let rcv_action = local_chnl_rcv.try_recv();
288            match rcv_action {
289                Ok(msg) => match msg {
290                    SiemMessage::Command(hdr, cmd) => match cmd {
291                        SiemCommandCall::STOP_COMPONENT(_name) => return Ok(()),
292                        SiemCommandCall::LIST_PARSERS(_pagination) => {
293                            send_message!(self.list_parsers(&hdr)).unwrap();
294                        }
295                        _ => {}
296                    },
297                    SiemMessage::Log(msg) => {
298                        self.parse_log(&mut origin_parser_map, msg);
299                    }
300                    _ => {}
301                },
302                Err(e) => match e {
303                    TryRecvError::Empty => {
304                        std::thread::sleep(std::time::Duration::from_millis(10));
305                    }
306                    TryRecvError::Disconnected => return Ok(()),
307                },
308            }
309
310            let rcv_log = (&self.log_receiver).try_recv();
311            match rcv_log {
312                Ok(log) => {
313                    let log = match self.parse_log(&mut origin_parser_map, log) {
314                        Some(log) => log,
315                        None => continue,
316                    };
317                    match self.log_sender.send(log) {
318                        Ok(v) => v,
319                        Err(err) => usiem::warn!("Cannot send log: {:?}", err.0),
320                    };
321                }
322                Err(e) => match e {
323                    TryRecvError::Empty => {
324                        continue;
325                    }
326                    TryRecvError::Disconnected => return Ok(()),
327                },
328            }
329        }
330    }
331
332    /// Allow to store information about this component like the state or conigurations.
333    fn set_storage(&mut self, conn: Box<dyn SiemComponentStateStorage>) {
334        self.conn = Some(conn);
335    }
336
337    /// Capabilities and actions that can be performed on this component
338    fn capabilities(&self) -> SiemComponentCapabilities {
339        let datasets = Vec::new();
340        let mut commands = Vec::new();
341
342        let stop_component = CommandDefinition::new(SiemFunctionType::STOP_COMPONENT,Cow::Borrowed("Stop BasicParser") ,Cow::Borrowed("This allows stopping all Basic Parser components.\nUse only when really needed, like when there is a bug in the parsing process.") , UserRole::Administrator);
343        commands.push(stop_component);
344        let start_component = CommandDefinition::new(
345            SiemFunctionType::START_COMPONENT, // Must be added by default by the KERNEL and only used by him
346            Cow::Borrowed("Start Basic Parser"),
347            Cow::Borrowed("This allows processing logs."),
348            UserRole::Administrator,
349        );
350        commands.push(start_component);
351
352        let list_parsers = CommandDefinition::new(
353            SiemFunctionType::LIST_PARSERS,
354            Cow::Borrowed("List log parsers"),
355            Cow::Borrowed("List all parsers in this component."),
356            UserRole::Administrator,
357        );
358        commands.push(list_parsers);
359
360        SiemComponentCapabilities::new(
361            Cow::Borrowed("BasicParser"),
362            Cow::Borrowed("Parse logs using multiple diferent parsers"),
363            Cow::Borrowed(""), // No HTML
364            datasets,
365            commands,
366            vec![],
367            self.metrics.0.clone(),
368        )
369    }
370}
371
#[cfg(test)]
mod parser_test {
    #[cfg(feature = "metrics")]
    use std::convert::TryInto;
    use std::time::Duration;

    use super::BasicParserComponent;
    use usiem::components::command::{
        Pagination, SiemCommandCall, SiemCommandHeader, SiemCommandResponse,
    };
    use usiem::components::common::SiemMessage;
    use usiem::components::SiemComponent;
    use usiem::events::field::SiemField;
    use usiem::events::SiemLog;
    #[cfg(feature = "metrics")]
    use usiem::prelude::counter::CounterVec;
    use usiem::prelude::kernel_message::KernelMessager;
    use usiem::prelude::CommandResult;
    #[cfg(feature = "metrics")]
    use usiem::prelude::NotificationLevel;
    #[cfg(feature = "metrics")]
    use usiem::testing::parsers::DummyParserError;
    use usiem::testing::parsers::{DummyParserAll, DummyParserText};

    /// Maximum wait for an expected message, so a regression fails the test
    /// instead of blocking it forever.
    const RECV_TIMEOUT: Duration = Duration::from_secs(5);

    /// Header shared by every command these tests send.
    fn test_header() -> SiemCommandHeader {
        SiemCommandHeader {
            comm_id: 0,
            comp_id: 0,
            user: "Superuser".to_string(),
        }
    }

    /// Command that makes `run` return.
    fn stop_command() -> SiemMessage {
        SiemMessage::Command(
            test_header(),
            SiemCommandCall::STOP_COMPONENT("BasicParserComponent".to_string()),
        )
    }

    /// Command asking for the list of registered parsers.
    fn list_parsers_command() -> SiemMessage {
        SiemMessage::Command(
            test_header(),
            SiemCommandCall::LIST_PARSERS(Pagination {
                offset: 0,
                limit: 1000,
            }),
        )
    }

    #[test]
    fn test_parser() {
        let (log_to_component, log_receiver) = usiem::crossbeam_channel::unbounded();
        let (log_sender, next_log_receiver) = usiem::crossbeam_channel::unbounded();

        let mut parser = BasicParserComponent::new();
        let component_channel = parser.local_channel();
        parser.add_parser(Box::from(DummyParserText::new()));
        parser.add_parser(Box::from(DummyParserAll::new()));
        parser.set_log_channel(log_sender, log_receiver);

        let log1 = SiemLog::new(
            "This is a DUMY log for DummyParserTextDUMMY",
            0,
            "localhost1",
        );
        let log2 = SiemLog::new(
            "This is NOT a DUmmY log for DummyParserTextDUmmY",
            0,
            "localhost2",
        );
        let _r = log_to_component.send(log1);
        let _r = log_to_component.send(log2);

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(200));
            // STOP parser component to finish testing
            let _r = component_channel.send(stop_command());
        });
        parser.run().expect("Should not end with errors");

        let log = next_log_receiver
            .recv_timeout(RECV_TIMEOUT)
            .expect("Log must be received");
        assert_eq!(
            log.field("parser"),
            Some(&SiemField::from_str("DummyParserText"))
        );
        let log = next_log_receiver
            .recv_timeout(RECV_TIMEOUT)
            .expect("Log must be received");
        assert_eq!(
            log.field("parser"),
            Some(&SiemField::from_str("DummyParserAll"))
        );
    }

    #[test]
    fn should_list_all_parsers() {
        let (kernel_sender, kernel_receiver) = usiem::crossbeam_channel::unbounded();
        let msngr = KernelMessager::new(1, String::new(), kernel_sender);

        let mut parser = BasicParserComponent::new();
        let component_channel = parser.local_channel();
        parser.add_parser(Box::from(DummyParserText::new()));
        parser.add_parser(Box::from(DummyParserAll::new()));
        let parser_thd = std::thread::spawn(move || {
            usiem::logging::initialize_component_logger(msngr);
            parser.run().expect("Should end without errors");
        });
        std::thread::spawn(move || {
            let _r = component_channel.send(list_parsers_command());
            std::thread::sleep(Duration::from_millis(200));
            // STOP parser component to finish testing
            let _r = component_channel.send(stop_command());
        });
        parser_thd.join().unwrap();

        let msg: SiemMessage = kernel_receiver
            .recv_timeout(RECV_TIMEOUT)
            .expect("The LIST_PARSERS response must be received");
        if let SiemMessage::Response(_, SiemCommandResponse::LIST_PARSERS(CommandResult::Ok(res))) =
            msg
        {
            assert_eq!(2, res.len());
            assert_eq!("DummyParserText", res.get(0).unwrap().name);
            assert_eq!("DummyParserAll", res.get(1).unwrap().name);
        } else {
            panic!("Must not be error")
        }
    }

    #[test]
    #[cfg(feature = "metrics")]
    fn should_update_metrics() {
        let (kernel_sender, kernel_receiver) = usiem::crossbeam_channel::unbounded();
        let msngr = KernelMessager::new(1, String::new(), kernel_sender);

        let mut parser = BasicParserComponent::new();
        let component_channel = parser.local_channel();
        // DummyParserError goes first so it claims the log before DummyParserText.
        parser.add_parser(Box::from(DummyParserError::new()));
        parser.add_parser(Box::from(DummyParserText::new()));

        let capa = parser.capabilities();
        let mut metrics = std::collections::BTreeMap::new();
        capa.metrics().iter().for_each(|v| {
            metrics.insert(v.name().to_string(), v.metric().clone());
        });

        let parser_unimplemented: CounterVec = metrics
            .get("parser_unimplemented")
            .unwrap()
            .try_into()
            .unwrap();
        assert_eq!(0, parser_unimplemented.with_labels(&[]).unwrap().get());
        let parser_format_error: CounterVec = metrics
            .get("parser_format_error")
            .unwrap()
            .try_into()
            .unwrap();
        assert_eq!(0, parser_format_error.with_labels(&[]).unwrap().get());
        let parser_bug_error: CounterVec =
            metrics.get("parser_bug_error").unwrap().try_into().unwrap();
        assert_eq!(0, parser_bug_error.with_labels(&[]).unwrap().get());
        assert_eq!(
            0,
            parser_bug_error
                .with_labels(&[("parser", "DummyParserError")])
                .unwrap()
                .get()
        );
        assert_eq!(
            0,
            parser_bug_error
                .with_labels(&[("parser", "DummyParserText")])
                .unwrap()
                .get()
        );

        let parser_thd = std::thread::spawn(move || {
            usiem::logging::set_max_level(NotificationLevel::Debug);
            usiem::logging::initialize_component_logger(msngr);
            parser.run().expect("Should end without errors");
        });

        // Print kernel notifications to help diagnose failures.
        std::thread::spawn(move || loop {
            let msg = match kernel_receiver.recv() {
                Ok(v) => v,
                Err(_) => return,
            };
            if let SiemMessage::Notification(msg) = msg {
                println!("{}", msg.log);
            }
        });

        let log1 = SiemLog::new("This is a DUMMY log for DummyParserText", 0, "localhost1");
        let _r = component_channel.send(SiemMessage::Log(log1));

        std::thread::spawn(move || {
            let _r = component_channel.send(list_parsers_command());
            std::thread::sleep(Duration::from_millis(200));
            // STOP parser component to finish testing
            let _r = component_channel.send(stop_command());
            std::thread::sleep(Duration::from_millis(200));
        });

        parser_thd.join().unwrap();
        assert_eq!(
            1,
            parser_bug_error
                .with_labels(&[("parser", "DummyParserError")])
                .unwrap()
                .get()
        );
        assert_eq!(
            0,
            parser_bug_error
                .with_labels(&[("parser", "DummyParserText")])
                .unwrap()
                .get()
        );
    }
}