1use crate::err::{ChunkError, EvtxError, InputError, Result};
2
3use crate::evtx_chunk::EvtxChunkData;
4use crate::evtx_file_header::EvtxFileHeader;
5use crate::evtx_record::SerializedEvtxRecord;
6use bumpalo::Bump;
7#[cfg(feature = "multithreading")]
8use rayon::prelude::*;
9
10use log::trace;
11#[cfg(not(feature = "multithreading"))]
12use log::warn;
13
14use log::{debug, info};
15use std::fs::File;
16use std::io::{self, Cursor, Read, Seek, SeekFrom};
17
18use crate::EvtxRecord;
19use encoding::EncodingRef;
20use encoding::all::WINDOWS_1252;
21use std::cmp::max;
22use std::fmt;
23use std::fmt::Debug;
24use std::iter::{IntoIterator, Iterator};
25use std::path::Path;
26use std::sync::Arc;
27
/// Size in bytes of a single EVTX chunk (64 KiB); chunks are read from the stream in units of
/// exactly this many bytes.
pub const EVTX_CHUNK_SIZE: usize = 64 * 1024;
/// Size in bytes of the EVTX file header region (4 KiB) that precedes the first chunk.
pub const EVTX_FILE_HEADER_SIZE: usize = 4 * 1024;
30
/// Convenience trait for anything that is both readable and seekable.
///
/// It is implemented automatically for every `Read + Seek` type and adds two helpers that only
/// rely on seeking.
pub trait ReadSeek: Read + Seek {
    /// Returns the current position of the stream, measured from its start.
    fn tell(&mut self) -> io::Result<u64> {
        self.stream_position()
    }

    /// Returns the total length of the stream in bytes.
    ///
    /// The length is found by seeking to the end; the original position is restored afterwards
    /// (the extra seek is skipped when the stream was already positioned at its end).
    ///
    /// # Errors
    ///
    /// Propagates any I/O error raised by the underlying seeks.
    fn stream_len(&mut self) -> io::Result<u64> {
        let saved_position = self.tell()?;
        let end = self.seek(SeekFrom::End(0))?;

        // Already at the end: nothing to restore.
        if saved_position == end {
            return Ok(end);
        }

        self.seek(SeekFrom::Start(saved_position)).map(|_| end)
    }
}

impl<T: Read + Seek> ReadSeek for T {}
52
53pub struct EvtxParser<T: ReadSeek> {
94 data: T,
95 header: EvtxFileHeader,
96 config: Arc<ParserSettings>,
97 calculated_chunk_count: u64,
101}
102impl<T: ReadSeek> Debug for EvtxParser<T> {
103 fn fmt(&self, f: &mut fmt::Formatter) -> ::std::fmt::Result {
104 f.debug_struct("EvtxParser")
105 .field("header", &self.header)
106 .field("config", &self.config)
107 .finish()
108 }
109}
110
111#[derive(Clone)]
112pub struct ParserSettings {
113 num_threads: usize,
115 validate_checksums: bool,
117 separate_json_attributes: bool,
136 indent: bool,
138 ansi_codec: EncodingRef,
140 #[cfg(feature = "wevt_templates")]
143 wevt_cache: Option<Arc<crate::wevt_templates::WevtCache>>,
144}
145
146impl Debug for ParserSettings {
147 fn fmt(&self, f: &mut fmt::Formatter) -> ::std::fmt::Result {
148 let mut ds = f.debug_struct("ParserSettings");
149 ds.field("num_threads", &self.num_threads)
150 .field("validate_checksums", &self.validate_checksums)
151 .field("separate_json_attributes", &self.separate_json_attributes)
152 .field("indent", &self.indent)
153 .field("ansi_codec", &self.ansi_codec.name());
154
155 #[cfg(feature = "wevt_templates")]
156 ds.field("wevt_cache", &self.wevt_cache.is_some());
157
158 ds.finish()
159 }
160}
161
162impl PartialEq for ParserSettings {
163 fn eq(&self, other: &ParserSettings) -> bool {
164 self.ansi_codec.name() == other.ansi_codec.name()
165 && self.num_threads == other.num_threads
166 && self.validate_checksums == other.validate_checksums
167 && self.separate_json_attributes == other.separate_json_attributes
168 && self.indent == other.indent
169 }
170}
171
172impl Default for ParserSettings {
173 fn default() -> Self {
174 ParserSettings {
175 num_threads: 0,
176 validate_checksums: false,
177 separate_json_attributes: false,
178 indent: true,
179 ansi_codec: WINDOWS_1252,
180 #[cfg(feature = "wevt_templates")]
181 wevt_cache: None,
182 }
183 }
184}
185
186impl ParserSettings {
187 pub fn new() -> Self {
188 ParserSettings::default()
189 }
190
191 #[cfg(feature = "multithreading")]
195 pub fn num_threads(mut self, num_threads: usize) -> Self {
196 self.num_threads = if num_threads == 0 {
197 rayon::current_num_threads()
198 } else {
199 num_threads
200 };
201 self
202 }
203
204 #[cfg(not(feature = "multithreading"))]
206 pub fn num_threads(mut self, _num_threads: usize) -> Self {
207 warn!("Setting num_threads has no effect when compiling without multithreading support.");
208
209 self.num_threads = 1;
210 self
211 }
212
213 pub fn ansi_codec(mut self, ansi_codec: EncodingRef) -> Self {
215 self.ansi_codec = ansi_codec;
216
217 self
218 }
219
220 #[cfg(feature = "wevt_templates")]
222 pub fn wevt_cache(mut self, cache: Option<Arc<crate::wevt_templates::WevtCache>>) -> Self {
223 self.wevt_cache = cache;
224 self
225 }
226
227 pub fn validate_checksums(mut self, validate_checksums: bool) -> Self {
228 self.validate_checksums = validate_checksums;
229
230 self
231 }
232
233 pub fn separate_json_attributes(mut self, separate: bool) -> Self {
234 self.separate_json_attributes = separate;
235
236 self
237 }
238
239 pub fn indent(mut self, pretty: bool) -> Self {
240 self.indent = pretty;
241
242 self
243 }
244
245 pub fn get_ansi_codec(&self) -> EncodingRef {
247 self.ansi_codec
248 }
249
250 #[cfg(feature = "wevt_templates")]
251 pub(crate) fn get_wevt_cache(&self) -> Option<&Arc<crate::wevt_templates::WevtCache>> {
252 self.wevt_cache.as_ref()
253 }
254
255 pub fn should_separate_json_attributes(&self) -> bool {
256 self.separate_json_attributes
257 }
258
259 pub fn should_indent(&self) -> bool {
260 self.indent
261 }
262
263 pub fn should_validate_checksums(&self) -> bool {
264 self.validate_checksums
265 }
266
267 pub fn get_num_threads(&self) -> &usize {
268 &self.num_threads
269 }
270}
271
272impl EvtxParser<File> {
273 pub fn from_path(path: impl AsRef<Path>) -> Result<Self> {
276 let path = path
277 .as_ref()
278 .canonicalize()
279 .map_err(|e| InputError::failed_to_open_file(e, &path))?;
280
281 let f = File::open(&path).map_err(|e| InputError::failed_to_open_file(e, &path))?;
282
283 let cursor = f;
284 Self::from_read_seek(cursor)
285 }
286}
287
288impl EvtxParser<Cursor<Vec<u8>>> {
289 pub fn from_buffer(buffer: Vec<u8>) -> Result<Self> {
291 let cursor = Cursor::new(buffer);
292 Self::from_read_seek(cursor)
293 }
294}
295
296impl<T: ReadSeek> EvtxParser<T> {
297 pub fn from_read_seek(mut read_seek: T) -> Result<Self> {
298 let evtx_header = EvtxFileHeader::from_stream(&mut read_seek)?;
299
300 let stream_size = ReadSeek::stream_len(&mut read_seek)?;
305 let chunk_data_size: u64 =
306 match stream_size.checked_sub(evtx_header.header_block_size.into()) {
307 Some(c) => c,
308 None => {
309 return Err(EvtxError::calculation_error(format!(
310 "Could not calculate valid chunk count because stream size is less \
311 than evtx header block size. (stream_size: {}, header_block_size: {})",
312 stream_size, evtx_header.header_block_size
313 )));
314 }
315 };
316 let chunk_count = chunk_data_size / EVTX_CHUNK_SIZE as u64;
317
318 debug!("EVTX Header: {:#?}", evtx_header);
319 Ok(EvtxParser {
320 data: read_seek,
321 header: evtx_header,
322 config: Arc::new(ParserSettings::default()),
323 calculated_chunk_count: chunk_count,
324 })
325 }
326
327 pub fn with_configuration(mut self, configuration: ParserSettings) -> Self {
328 self.config = Arc::new(configuration);
329 self
330 }
331
332 fn allocate_chunk(
338 data: &mut T,
339 chunk_number: u64,
340 validate_checksum: bool,
341 ) -> Result<Option<EvtxChunkData>> {
342 let mut chunk_data = Vec::with_capacity(EVTX_CHUNK_SIZE);
343 let chunk_offset = EVTX_FILE_HEADER_SIZE + chunk_number as usize * EVTX_CHUNK_SIZE;
344
345 trace!(
346 "Offset `0x{:08x} ({})` - Reading chunk number `{}`",
347 chunk_offset, chunk_offset, chunk_number
348 );
349
350 data.seek(SeekFrom::Start(chunk_offset as u64))
351 .map_err(|e| EvtxError::FailedToParseChunk {
352 chunk_id: chunk_number,
353 source: Box::new(ChunkError::FailedToSeekToChunk(e)),
354 })?;
355
356 let amount_read = data
357 .take(EVTX_CHUNK_SIZE as u64)
358 .read_to_end(&mut chunk_data)
359 .map_err(|_| EvtxError::incomplete_chunk(chunk_number))?;
360
361 if amount_read != EVTX_CHUNK_SIZE {
362 return Err(EvtxError::incomplete_chunk(chunk_number));
363 }
364
365 if chunk_data.iter().all(|x| *x == 0) {
367 return Ok(None);
368 }
369
370 EvtxChunkData::new(chunk_data, validate_checksum)
371 .map(Some)
372 .map_err(|e| EvtxError::FailedToParseChunk {
373 chunk_id: chunk_number,
374 source: Box::new(e),
375 })
376 }
377
378 pub fn find_next_chunk(
382 &mut self,
383 mut chunk_number: u64,
384 ) -> Option<(Result<EvtxChunkData>, u64)> {
385 loop {
386 match EvtxParser::allocate_chunk(
387 &mut self.data,
388 chunk_number,
389 self.config.validate_checksums,
390 ) {
391 Err(err) => {
392 if chunk_number >= self.calculated_chunk_count {
395 return None;
396 } else {
397 return Some((Err(err), chunk_number));
398 }
399 }
400 Ok(None) => {
401 chunk_number = chunk_number.checked_add(1)?
405 }
406 Ok(Some(chunk)) => {
407 return Some((Ok(chunk), chunk_number));
408 }
409 };
410 }
411 }
412
413 pub fn chunks(&mut self) -> IterChunks<'_, T> {
417 IterChunks {
418 parser: self,
419 current_chunk_number: 0,
420 }
421 }
422
423 pub fn into_chunks(self) -> IntoIterChunks<T> {
427 IntoIterChunks {
428 parser: self,
429 current_chunk_number: 0,
430 }
431 }
432 pub fn serialized_records<'a, U: Send>(
435 &'a mut self,
436 f: impl FnMut(Result<EvtxRecord<'_>>) -> Result<U> + Send + Sync + Clone + 'a,
437 ) -> impl Iterator<Item = Result<U>> + 'a {
438 struct ChunkBatch<U> {
439 results: Vec<Result<U>>,
440 arena: Bump,
441 }
442
443 let num_threads = max(self.config.num_threads, 1);
445 let chunk_settings = Arc::clone(&self.config);
446
447 let mut chunks = self.chunks();
449 let mut arena_pool: Vec<Bump> = (0..num_threads)
450 .map(|_| Bump::with_capacity(EVTX_CHUNK_SIZE))
451 .collect();
452
453 let records_per_chunk = std::iter::from_fn(move || {
454 let mut chunk_of_chunks = Vec::with_capacity(num_threads);
456
457 for _ in 0..num_threads {
458 if let Some(chunk) = chunks.next() {
459 let arena = arena_pool.pop().unwrap_or_default();
460 chunk_of_chunks.push((chunk, arena));
461 };
462 }
463
464 if chunk_of_chunks.is_empty() {
466 None
467 } else {
468 #[cfg(feature = "multithreading")]
469 let chunk_iter = chunk_of_chunks.into_par_iter();
470
471 #[cfg(not(feature = "multithreading"))]
472 let chunk_iter = chunk_of_chunks.into_iter();
473
474 let iterators: Vec<ChunkBatch<U>> = chunk_iter
476 .enumerate()
477 .map(|(i, (chunk_res, arena))| match chunk_res {
478 Err(err) => ChunkBatch {
479 results: vec![Err(err)],
480 arena,
481 },
482 Ok(mut chunk) => {
483 let chunk_records_res =
484 chunk.parse_with_arena(chunk_settings.clone(), arena);
485
486 match chunk_records_res {
487 Err(err) => ChunkBatch {
488 results: vec![Err(EvtxError::FailedToParseChunk {
489 chunk_id: i as u64,
490 source: Box::new(err),
491 })],
492 arena: Bump::new(),
493 },
494 Ok(mut chunk_records) => {
495 let results = {
496 let records = chunk_records.iter();
497 records.map(f.clone()).collect()
498 };
499 let arena = chunk_records.into_arena();
500 ChunkBatch { results, arena }
501 }
502 }
503 }
504 })
505 .collect();
506
507 let mut flattened = Vec::new();
508 for batch in iterators {
509 arena_pool.push(batch.arena);
510 flattened.extend(batch.results);
511 }
512
513 Some(flattened.into_iter())
514 }
515 });
516
517 records_per_chunk.flatten()
518 }
519
520 pub fn records(&mut self) -> impl Iterator<Item = Result<SerializedEvtxRecord<String>>> + '_ {
523 self.serialized_records(|record| record.and_then(|record| record.into_xml()))
525 }
526
527 pub fn records_json(
530 &mut self,
531 ) -> impl Iterator<Item = Result<SerializedEvtxRecord<String>>> + '_ {
532 self.serialized_records(|record| record.and_then(|record| record.into_json()))
533 }
534
535 pub fn records_json_value(
538 &mut self,
539 ) -> impl Iterator<Item = Result<SerializedEvtxRecord<serde_json::Value>>> + '_ {
540 self.serialized_records(|record| record.and_then(|record| record.into_json_value()))
541 }
542}
543
544pub struct IterChunks<'c, T: ReadSeek> {
545 parser: &'c mut EvtxParser<T>,
546 current_chunk_number: u64,
547}
548
549impl<T: ReadSeek> Iterator for IterChunks<'_, T> {
550 type Item = Result<EvtxChunkData>;
551 fn next(&mut self) -> Option<<Self as Iterator>::Item> {
552 match self.parser.find_next_chunk(self.current_chunk_number) {
553 None => None,
554 Some((chunk, chunk_number)) => {
555 self.current_chunk_number = chunk_number.checked_add(1)?;
556
557 Some(chunk)
558 }
559 }
560 }
561}
562
563pub struct IntoIterChunks<T: ReadSeek> {
564 parser: EvtxParser<T>,
565 current_chunk_number: u64,
566}
567
568impl<T: ReadSeek> Iterator for IntoIterChunks<T> {
569 type Item = Result<EvtxChunkData>;
570 fn next(&mut self) -> Option<<Self as Iterator>::Item> {
571 info!("Chunk {}", self.current_chunk_number);
572 match self.parser.find_next_chunk(self.current_chunk_number) {
573 None => None,
574 Some((chunk, chunk_number)) => {
575 self.current_chunk_number = chunk_number.checked_add(1)?;
576
577 Some(chunk)
578 }
579 }
580 }
581}
582
#[cfg(test)]
mod tests {
    #![allow(unused_variables)]

    use super::*;
    use crate::ensure_env_logger_initialized;

    /// Builds a parser over an in-memory copy of `bytes`, panicking if that fails.
    fn parser_for(bytes: &[u8]) -> EvtxParser<Cursor<Vec<u8>>> {
        EvtxParser::from_buffer(bytes.to_vec()).unwrap()
    }

    /// Parses every record of `bytes` as a JSON value and checks that each one is an object
    /// with an `Event` key.
    fn assert_records_are_event_objects(bytes: &[u8]) {
        let mut parser = parser_for(bytes);
        let records = parser.records_json_value().collect::<Vec<_>>();

        for record in records.into_iter().map(Result::unwrap) {
            assert!(record.data.is_object());
            assert!(record.data.as_object().unwrap().contains_key("Event"));
        }
    }

    /// Reads the first 90 records of `buffer` and checks that their ids count up from 1.
    fn process_90_records(buffer: &'static [u8]) -> crate::err::Result<()> {
        let mut parser = EvtxParser::from_buffer(buffer.to_vec())?;

        for (expected_id, record) in (1u64..).zip(parser.records().take(90)) {
            assert_eq!(record?.event_record_id, expected_id);
        }

        Ok(())
    }

    #[test]
    fn test_process_single_chunk() -> crate::err::Result<()> {
        ensure_env_logger_initialized();
        process_90_records(include_bytes!("../samples/security.evtx"))
    }

    #[test]
    fn test_sample_2() {
        let mut parser = parser_for(include_bytes!("../samples/system.evtx"));

        let records = parser.records().take(10).collect::<Vec<_>>();

        for (i, record) in records.iter().enumerate() {
            let r = record
                .as_ref()
                .unwrap_or_else(|e| panic!("Error while reading record {}, {:?}", i, e));
            assert_eq!(
                r.event_record_id,
                i as u64 + 1,
                "Parser is skipping records!"
            );
        }

        // The first record has an empty binary payload, the second a known one.
        let first = records[0].as_ref().unwrap();
        assert!(first.data.contains("<Binary></Binary>"));

        let second = records[1].as_ref().unwrap();
        assert!(
            second
                .data
                .contains("<Binary>E107070003000C00110010001C00D6000000000000000000</Binary>")
        );
    }

    #[test]
    fn test_parses_first_10_records() {
        ensure_env_logger_initialized();
        let mut parser = parser_for(include_bytes!("../samples/security.evtx"));

        for (i, record) in parser.records().take(10).enumerate() {
            let r =
                record.unwrap_or_else(|e| panic!("Error while reading record {}, {:?}", i, e));
            assert_eq!(
                r.event_record_id,
                i as u64 + 1,
                "Parser is skipping records!"
            );
        }
    }

    #[test]
    fn test_parses_records_from_different_chunks() {
        ensure_env_logger_initialized();
        let mut parser = parser_for(include_bytes!("../samples/security.evtx"));

        for (i, record) in parser.records().take(1000).enumerate() {
            // Failed records are only reported, not treated as a test failure.
            let r = match record {
                Ok(r) => r,
                Err(e) => {
                    println!("Error while reading record {}, {:?}", i, e);
                    continue;
                }
            };
            assert_eq!(r.event_record_id, i as u64 + 1);
        }
    }

    #[test]
    #[cfg(feature = "multithreading")]
    fn test_multithreading() {
        use std::collections::HashSet;

        ensure_env_logger_initialized();
        let mut parser = parser_for(include_bytes!("../samples/security.evtx"));

        // Every one of the first 1000 records must parse and carry a distinct id.
        let record_ids: HashSet<_> = parser
            .records()
            .take(1000)
            .map(|record| match record {
                Ok(r) => r.event_record_id,
                Err(e) => panic!("Error while reading record {:?}", e),
            })
            .collect();

        assert_eq!(record_ids.len(), 1000);
    }

    #[test]
    fn test_file_with_only_a_single_chunk() {
        ensure_env_logger_initialized();
        let mut parser = parser_for(include_bytes!("../samples/new-user-security.evtx"));

        assert_eq!(parser.records().count(), 4);
    }

    #[test]
    fn test_parses_chunk2() {
        ensure_env_logger_initialized();
        let evtx_file = include_bytes!("../samples/security.evtx");

        // Byte range of the second chunk (index 1) inside the file.
        let chunk_start = EVTX_FILE_HEADER_SIZE + EVTX_CHUNK_SIZE;
        let chunk_end = EVTX_FILE_HEADER_SIZE + 2 * EVTX_CHUNK_SIZE;

        let mut chunk =
            EvtxChunkData::new(evtx_file[chunk_start..chunk_end].to_vec(), false).unwrap();

        assert!(chunk.validate_checksum());

        let mut parsed = chunk.parse(Arc::new(ParserSettings::default())).unwrap();
        for record in parsed.iter() {
            record.unwrap();
        }
    }

    #[test]
    fn test_into_chunks() {
        ensure_env_logger_initialized();
        let parser = parser_for(include_bytes!("../samples/new-user-security.evtx"));

        assert_eq!(parser.into_chunks().count(), 1);
    }

    #[test]
    fn test_into_json_value_records() {
        ensure_env_logger_initialized();
        assert_records_are_event_objects(include_bytes!("../samples/new-user-security.evtx"));
    }

    #[test]
    fn test_parse_event_with_zero_() {
        ensure_env_logger_initialized();
        assert_records_are_event_objects(include_bytes!("../samples/new-user-security.evtx"));
    }
}