1use metrics::{generate_parser_metrics, ParserMetrics};
2use std::borrow::Cow;
3use std::collections::{BTreeMap, BTreeSet};
4use usiem::components::command::{
5 CommandDefinition, SiemCommandCall, SiemCommandHeader, SiemCommandResponse, SiemFunctionType,
6};
7use usiem::components::command_types::ParserDefinition;
8use usiem::components::common::{SiemComponentCapabilities, SiemMessage, UserRole};
9use usiem::components::dataset::holder::DatasetHolder;
10use usiem::components::parsing::{LogParser, LogParsingError};
11use usiem::components::SiemComponent;
12use usiem::crossbeam_channel::TryRecvError;
13use usiem::crossbeam_channel::{Receiver, Sender};
14use usiem::events::SiemLog;
15
16use usiem::prelude::storage::SiemComponentStateStorage;
17use usiem::prelude::{CommandResult, SiemError, SiemMetricDefinition};
18use usiem::send_message;
19
20mod metrics;
21
/// Component that runs incoming logs through a list of registered
/// [`LogParser`]s and forwards the result through its log channel.
#[derive(Clone)]
pub struct BasicParserComponent {
    /// Receiving end of the component's own message channel; polled in `run`.
    local_chnl_rcv: Receiver<SiemMessage>,
    /// Sending end of the component's own message channel; handed out by
    /// `local_channel`.
    local_chnl_snd: Sender<SiemMessage>,
    /// Source of the logs to be parsed.
    log_receiver: Receiver<SiemLog>,
    /// Destination of the logs once they went through the parsers.
    log_sender: Sender<SiemLog>,
    /// Optional state storage, set through `set_storage`.
    conn: Option<Box<dyn SiemComponentStateStorage>>,
    /// Registered parsers, in registration order.
    parsers: Vec<Box<dyn LogParser>>,
    /// Datasets handed to every parser call.
    datasets: DatasetHolder,
    /// Metric definitions (exposed through `capabilities`) together with the
    /// counters updated while parsing.
    metrics: (Vec<SiemMetricDefinition>, ParserMetrics),
}
46
47impl BasicParserComponent {
48 pub fn new() -> BasicParserComponent {
49 let (local_chnl_snd, local_chnl_rcv) = usiem::crossbeam_channel::bounded(10_000);
50 let (log_sender, log_receiver) = usiem::crossbeam_channel::unbounded();
51 return BasicParserComponent {
52 local_chnl_rcv,
53 local_chnl_snd,
54 log_receiver,
55 log_sender,
56 parsers: Vec::new(),
57 conn: None,
58 datasets: DatasetHolder::from_datasets(vec![]),
59 metrics: generate_parser_metrics(&[]),
60 };
61 }
    /// Registers `parser` after the ones already present.
    ///
    /// The metrics are regenerated from the full parser list on every call, so
    /// the previously held metric set is replaced.
    pub fn add_parser(&mut self, parser: Box<dyn LogParser>) {
        self.parsers.push(parser);
        self.metrics = generate_parser_metrics(&self.parsers);
    }
67
68 fn list_parsers(&self, header: &SiemCommandHeader) -> SiemMessage {
69 let content2 = self
70 .parsers
71 .iter()
72 .map(|x| ParserDefinition {
73 name: x.name().to_string(),
74 description: x.description().to_string(),
75 })
76 .collect::<Vec<ParserDefinition>>();
77 SiemMessage::Response(
78 SiemCommandHeader {
79 comm_id: header.comm_id,
80 comp_id: header.comp_id,
81 user: header.user.clone(),
82 },
83 SiemCommandResponse::LIST_PARSERS(CommandResult::Ok(content2)),
84 )
85 }
86
87 #[cfg(feature="metrics")]
88 fn update_parser_error_metric(&self, parser: &str) {
89 self.metrics
90 .1
91 .parser_bug_error
92 .with_labels(&[("parser", parser)])
93 .and_then(|metric| {
94 metric.inc();
95 Some(metric)
96 });
97 }
98 #[cfg(feature="metrics")]
99 fn update_parser_not_implemented(&self, parser: &str) {
100 self.metrics
101 .1
102 .parser_unimplemented
103 .with_labels(&[("parser", parser)])
104 .and_then(|metric| {
105 metric.inc();
106 Some(metric)
107 });
108 }
109 #[cfg(feature="metrics")]
110 fn update_parser_format_error(&self, parser: &str) {
111 self.metrics
112 .1
113 .parser_format_error
114 .with_labels(&[("parser", parser)])
115 .and_then(|metric| {
116 metric.inc();
117 Some(metric)
118 });
119 }
120 #[cfg(feature="metrics")]
121 fn update_discard_metric(&self, parser: &str) {
122 self.metrics
123 .1
124 .parser_discarded
125 .with_labels(&[("parser", parser)])
126 .and_then(|metric| {
127 metric.inc();
128 Some(metric)
129 });
130 }
131
132 fn parse_log<'a>(
133 &'a self,
134 origin_parser_map: &mut BTreeMap<String, Vec<&'a Box<dyn LogParser>>>,
135 log: SiemLog,
136 ) -> Option<SiemLog> {
137 let mut empty = Vec::with_capacity(128);
138 let origin: String = log.origin().to_string();
139 let selected_parsers = match origin_parser_map.get_mut(&origin) {
140 Some(vc) => vc,
141 None => &mut empty,
142 };
143 let mut tried_parsers = BTreeSet::new();
144 let mut log = log;
145 for parser in &(*selected_parsers) {
147 match parser.parse_log(log, &self.datasets) {
148 Ok(lg) => {
149 return Some(lg);
150 }
151 Err(e) => match e {
152 LogParsingError::NoValidParser(lg) => {
153 log = lg;
154 }
155 LogParsingError::ParserError(lg, error) => {
156 usiem::warn!("Cannot parse log {:?}. Error={}", lg, error);
157 #[cfg(feature="metrics")]
158 self.update_parser_error_metric(parser.name());
159 return Some(lg);
160 }
161 LogParsingError::NotImplemented(lg) => {
162 #[cfg(feature="metrics")]
163 self.update_parser_not_implemented(parser.name());
164 return Some(lg);
165 }
166 LogParsingError::FormatError(lg, _) => {
167 #[cfg(feature="metrics")]
168 self.update_parser_format_error(parser.name());
169 return Some(lg);
170 }
171 LogParsingError::Discard => {
172 #[cfg(feature="metrics")]
173 self.update_discard_metric(parser.name());
174 return None;
175 }
176 },
177 };
178 tried_parsers.insert(parser.name());
179 }
180 for parser in &self.parsers {
182 if !tried_parsers.contains(parser.name()) {
183 log = match parser.parse_log(log, &self.datasets) {
184 Ok(lg) => {
185 if !origin_parser_map.contains_key(&origin) {
186 origin_parser_map.insert(origin.clone(), vec![&parser]);
187 } else {
188 let _: Option<String> =
189 origin_parser_map.get_mut(&origin).and_then(|x| {
190 x.push(&parser);
191 None
192 });
193 }
194 return Some(lg);
195 }
196 Err(e) => match e {
197 LogParsingError::NoValidParser(lg) => lg,
198 LogParsingError::ParserError(lg, error) => {
199 #[cfg(feature="metrics")]
200 self.update_parser_error_metric(parser.name());
201 if !origin_parser_map.contains_key(&origin) {
202 origin_parser_map.insert(origin.clone(), vec![&parser]);
203 } else {
204 let _: Option<String> =
205 origin_parser_map.get_mut(&origin).and_then(|x| {
206 x.push(&parser);
207 None
208 });
209 }
210 usiem::warn!("Cannot parse log {:?}. Error={}", lg, error);
211 return Some(lg);
212 }
213 LogParsingError::NotImplemented(lg) => {
214 #[cfg(feature="metrics")]
215 self.update_parser_not_implemented(parser.name());
216 if !origin_parser_map.contains_key(&origin) {
217 origin_parser_map.insert(origin.clone(), vec![&parser]);
218 } else {
219 let _: Option<String> =
220 origin_parser_map.get_mut(&origin).and_then(|x| {
221 x.push(&parser);
222 None
223 });
224 }
225 return Some(lg);
226 }
227 LogParsingError::FormatError(lg, error) => {
228 #[cfg(feature="metrics")]
229 self.update_parser_format_error(parser.name());
230 usiem::warn!("Cannot process log. Format error: {}", error);
231 if !origin_parser_map.contains_key(&origin) {
232 origin_parser_map.insert(origin.clone(), vec![&parser]);
233 } else {
234 let _: Option<String> =
235 origin_parser_map.get_mut(&origin).and_then(|x| {
236 x.push(&parser);
237 None
238 });
239 }
240 return Some(lg);
241 }
242 LogParsingError::Discard => {
243 #[cfg(feature="metrics")]
244 self.update_discard_metric(parser.name());
245 if !origin_parser_map.contains_key(&origin) {
246 origin_parser_map.insert(origin.clone(), vec![&parser]);
247 } else {
248 let _: Option<String> =
249 origin_parser_map.get_mut(&origin).and_then(|x| {
250 x.push(&parser);
251 None
252 });
253 }
254 return None;
255 }
256 },
257 };
258 }
259 }
260 Some(log)
261 }
262}
263
264impl SiemComponent for BasicParserComponent {
    /// Returns the fixed name of this component, `"BasicParser"`.
    fn name(&self) -> &'static str {
        "BasicParser"
    }
268 fn local_channel(&self) -> Sender<SiemMessage> {
269 self.local_chnl_snd.clone()
270 }
    /// Replaces both ends of the log channel: `receiver` becomes the source of
    /// logs to parse and `log_sender` the destination of processed logs.
    fn set_log_channel(&mut self, log_sender: Sender<SiemLog>, receiver: Receiver<SiemLog>) {
        self.log_receiver = receiver;
        self.log_sender = log_sender;
    }
275 fn duplicate(&self) -> Box<dyn SiemComponent> {
276 return Box::new(self.clone());
277 }
    /// Replaces the datasets that are handed to the parsers on every call.
    fn set_datasets(&mut self, datasets: DatasetHolder) {
        self.datasets = datasets;
    }
281
282 fn run(&mut self) -> Result<(), SiemError> {
284 let local_chnl_rcv = self.local_chnl_rcv.clone();
285 let mut origin_parser_map: BTreeMap<String, Vec<&Box<dyn LogParser>>> = BTreeMap::new();
286 loop {
287 let rcv_action = local_chnl_rcv.try_recv();
288 match rcv_action {
289 Ok(msg) => match msg {
290 SiemMessage::Command(hdr, cmd) => match cmd {
291 SiemCommandCall::STOP_COMPONENT(_name) => return Ok(()),
292 SiemCommandCall::LIST_PARSERS(_pagination) => {
293 send_message!(self.list_parsers(&hdr)).unwrap();
294 }
295 _ => {}
296 },
297 SiemMessage::Log(msg) => {
298 self.parse_log(&mut origin_parser_map, msg);
299 }
300 _ => {}
301 },
302 Err(e) => match e {
303 TryRecvError::Empty => {
304 std::thread::sleep(std::time::Duration::from_millis(10));
305 }
306 TryRecvError::Disconnected => return Ok(()),
307 },
308 }
309
310 let rcv_log = (&self.log_receiver).try_recv();
311 match rcv_log {
312 Ok(log) => {
313 let log = match self.parse_log(&mut origin_parser_map, log) {
314 Some(log) => log,
315 None => continue,
316 };
317 match self.log_sender.send(log) {
318 Ok(v) => v,
319 Err(err) => usiem::warn!("Cannot send log: {:?}", err.0),
320 };
321 }
322 Err(e) => match e {
323 TryRecvError::Empty => {
324 continue;
325 }
326 TryRecvError::Disconnected => return Ok(()),
327 },
328 }
329 }
330 }
331
    /// Stores the state storage connection handed to this component.
    fn set_storage(&mut self, conn: Box<dyn SiemComponentStateStorage>) {
        self.conn = Some(conn);
    }
336
337 fn capabilities(&self) -> SiemComponentCapabilities {
339 let datasets = Vec::new();
340 let mut commands = Vec::new();
341
342 let stop_component = CommandDefinition::new(SiemFunctionType::STOP_COMPONENT,Cow::Borrowed("Stop BasicParser") ,Cow::Borrowed("This allows stopping all Basic Parser components.\nUse only when really needed, like when there is a bug in the parsing process.") , UserRole::Administrator);
343 commands.push(stop_component);
344 let start_component = CommandDefinition::new(
345 SiemFunctionType::START_COMPONENT, Cow::Borrowed("Start Basic Parser"),
347 Cow::Borrowed("This allows processing logs."),
348 UserRole::Administrator,
349 );
350 commands.push(start_component);
351
352 let list_parsers = CommandDefinition::new(
353 SiemFunctionType::LIST_PARSERS,
354 Cow::Borrowed("List log parsers"),
355 Cow::Borrowed("List all parsers in this component."),
356 UserRole::Administrator,
357 );
358 commands.push(list_parsers);
359
360 SiemComponentCapabilities::new(
361 Cow::Borrowed("BasicParser"),
362 Cow::Borrowed("Parse logs using multiple diferent parsers"),
363 Cow::Borrowed(""), datasets,
365 commands,
366 vec![],
367 self.metrics.0.clone(),
368 )
369 }
370}
371
372#[cfg(test)]
373mod parser_test {
374 use std::convert::TryInto;
375
376 use super::BasicParserComponent;
377 use usiem::components::command::{
378 Pagination, SiemCommandCall, SiemCommandHeader, SiemCommandResponse,
379 };
380 use usiem::components::common::SiemMessage;
381 use usiem::components::SiemComponent;
382 use usiem::events::field::SiemField;
383 use usiem::events::SiemLog;
384 use usiem::prelude::counter::CounterVec;
385 use usiem::prelude::kernel_message::KernelMessager;
386 use usiem::prelude::{CommandResult, NotificationLevel};
387 use usiem::testing::parsers::{DummyParserAll, DummyParserError, DummyParserText};
388
    /// Sends two logs from different origins through a component holding
    /// `DummyParserText` and `DummyParserAll`, stops it after 200 ms and checks
    /// the `parser` field of each forwarded log.
    #[test]
    fn test_parser() {
        // Channel feeding the component and channel collecting its output.
        let (log_to_component, log_receiver) = usiem::crossbeam_channel::unbounded();
        let (log_sender, next_log_receiver) = usiem::crossbeam_channel::unbounded();

        let parser1 = DummyParserText::new();
        let parser2 = DummyParserAll::new();

        let mut parser = BasicParserComponent::new();
        let component_channel = parser.local_channel();
        parser.add_parser(Box::from(parser1));
        parser.add_parser(Box::from(parser2));
        parser.set_log_channel(log_sender, log_receiver);

        // NOTE(review): presumably only the first text is accepted by
        // DummyParserText — confirm against the dummy parser implementation.
        let log1 = SiemLog::new(
            "This is a DUMY log for DummyParserTextDUMMY",
            0,
            "localhost1",
        );
        let log2 = SiemLog::new(
            "This is NOT a DUmmY log for DummyParserTextDUmmY",
            0,
            "localhost2",
        );
        let _r = log_to_component.send(log1);
        let _r = log_to_component.send(log2);

        // `run` blocks the test thread, so the stop command is sent from a
        // helper thread once the logs had time to be processed.
        std::thread::spawn(move || {
            std::thread::sleep(std::time::Duration::from_millis(200));
            let _r = component_channel.send(SiemMessage::Command(
                SiemCommandHeader {
                    comm_id: 0,
                    comp_id: 0,
                    user: "Superuser".to_string(),
                },
                SiemCommandCall::STOP_COMPONENT("BasicParserComponent".to_string()),
            ));
        });
        parser.run().expect("Should not end with errors");
        // Logs are expected in the order they were sent.
        let log = next_log_receiver.recv().expect("Log must be received");
        assert_eq!(
            log.field("parser"),
            Some(&SiemField::from_str("DummyParserText"))
        );
        let log = next_log_receiver.recv().expect("Log must be received");
        assert_eq!(
            log.field("parser"),
            Some(&SiemField::from_str("DummyParserAll"))
        );
    }
440
    /// Sends a `LIST_PARSERS` command to a running component and checks that
    /// the response received on the kernel channel lists both registered
    /// parsers in registration order.
    #[test]
    fn should_list_all_parsers() {
        let (kernel_sender, kernel_receiver) = usiem::crossbeam_channel::unbounded();

        let msngr = KernelMessager::new(1, String::new(), kernel_sender);

        let parser1 = DummyParserText::new();
        let parser2 = DummyParserAll::new();

        let mut parser = BasicParserComponent::new();
        let component_channel = parser.local_channel();
        parser.add_parser(Box::from(parser1));
        parser.add_parser(Box::from(parser2));
        // The component runs in its own thread, with the logger initialised
        // there from the kernel messenger.
        let parser_thd = std::thread::spawn(move || {
            usiem::logging::initialize_component_logger(msngr);
            parser.run().expect("Should end without errors");
        });
        // Command sender: list the parsers, then stop the component 200 ms later.
        std::thread::spawn(move || {
            let _r = component_channel.send(SiemMessage::Command(
                SiemCommandHeader {
                    comm_id: 0,
                    comp_id: 0,
                    user: "Superuser".to_string(),
                },
                SiemCommandCall::LIST_PARSERS(Pagination {
                    offset: 0,
                    limit: 1000,
                }),
            ));
            std::thread::sleep(std::time::Duration::from_millis(200));
            let _r = component_channel.send(SiemMessage::Command(
                SiemCommandHeader {
                    comm_id: 0,
                    comp_id: 0,
                    user: "Superuser".to_string(),
                },
                SiemCommandCall::STOP_COMPONENT("BasicParserComponent".to_string()),
            ));
        });
        parser_thd.join().unwrap();

        // The first message on the kernel channel must be the list response.
        let response = kernel_receiver.recv();
        let msg: SiemMessage = response.unwrap();
        if let SiemMessage::Response(_, SiemCommandResponse::LIST_PARSERS(CommandResult::Ok(res))) =
            msg
        {
            assert_eq!(2, res.len());
            assert_eq!("DummyParserText", res.get(0).unwrap().name);
            assert_eq!("DummyParserAll", res.get(1).unwrap().name);
        } else {
            panic!("Must not be error")
        }
    }
495
    /// Checks that the `parser_bug_error` counter of `DummyParserError` goes
    /// from 0 to 1 after one log is sent to the component, while the counter
    /// of `DummyParserText` stays at 0. Only built with the `metrics` feature.
    #[test]
    #[cfg(feature="metrics")]
    fn should_update_metrics() {
        let (kernel_sender, kernel_receiver) = usiem::crossbeam_channel::unbounded();

        let msngr = KernelMessager::new(1, String::new(), kernel_sender);
        let parser1 = DummyParserText::new();
        let parser2 = DummyParserError::new();

        let mut parser = BasicParserComponent::new();

        let component_channel = parser.local_channel();
        // The failing parser is registered first, so it is tried first.
        parser.add_parser(Box::from(parser2));
        parser.add_parser(Box::from(parser1));

        // Index the metrics exposed through the capabilities by name.
        let capa = parser.capabilities();
        let mut metrics = std::collections::BTreeMap::new();
        capa.metrics().iter().for_each(|v| {
            metrics.insert(v.name().to_string(), v.metric().clone());
        });

        // Every counter must start at zero.
        let parser_unimplemented: CounterVec = metrics
            .get("parser_unimplemented")
            .unwrap()
            .try_into()
            .unwrap();
        assert_eq!(0, parser_unimplemented.with_labels(&[]).unwrap().get());
        let parser_format_error: CounterVec = metrics
            .get("parser_format_error")
            .unwrap()
            .try_into()
            .unwrap();
        assert_eq!(0, parser_format_error.with_labels(&[]).unwrap().get());
        let parser_bug_error: CounterVec =
            metrics.get("parser_bug_error").unwrap().try_into().unwrap();
        assert_eq!(0, parser_bug_error.with_labels(&[]).unwrap().get());
        assert_eq!(
            0,
            parser_bug_error
                .with_labels(&[("parser", "DummyParserError")])
                .unwrap()
                .get()
        );
        assert_eq!(
            0,
            parser_bug_error
                .with_labels(&[("parser", "DummyParserText")])
                .unwrap()
                .get()
        );

        let parser_thd = std::thread::spawn(move || {
            usiem::logging::set_max_level(NotificationLevel::Debug);
            usiem::logging::initialize_component_logger(msngr);
            parser.run().expect("Should end without errors");
        });

        // Prints the notifications received on the kernel channel until it
        // is closed.
        std::thread::spawn(move || loop {
            let msg = match kernel_receiver.recv() {
                Ok(v) => v,
                Err(_) => return,
            };
            if let SiemMessage::Notification(msg) = msg {
                println!("{}", msg.log);
            }
        });

        // The log is delivered through the component's message channel, not
        // through the log channel.
        let log1 = SiemLog::new("This is a DUMMY log for DummyParserText", 0, "localhost1");
        let _r = component_channel.send(SiemMessage::Log(log1));

        // Command sender: list the parsers, then stop the component 200 ms later.
        std::thread::spawn(move || {
            let _r = component_channel.send(SiemMessage::Command(
                SiemCommandHeader {
                    comm_id: 0,
                    comp_id: 0,
                    user: "Superuser".to_string(),
                },
                SiemCommandCall::LIST_PARSERS(Pagination {
                    offset: 0,
                    limit: 1000,
                }),
            ));
            std::thread::sleep(std::time::Duration::from_millis(200));
            let _r = component_channel.send(SiemMessage::Command(
                SiemCommandHeader {
                    comm_id: 0,
                    comp_id: 0,
                    user: "Superuser".to_string(),
                },
                SiemCommandCall::STOP_COMPONENT("BasicParserComponent".to_string()),
            ));
            std::thread::sleep(std::time::Duration::from_millis(200));
        });

        parser_thd.join().unwrap();
        println!("{:?}", parser_bug_error);
        // Exactly one bug error is expected, attributed to the failing parser.
        assert_eq!(
            1,
            parser_bug_error
                .with_labels(&[("parser", "DummyParserError")])
                .unwrap()
                .get()
        );
        assert_eq!(
            0,
            parser_bug_error
                .with_labels(&[("parser", "DummyParserText")])
                .unwrap()
                .get()
        );
    }
608}