flow_record/
serializer.rs

1use std::{collections::HashSet, io::Write};
2
3use binrw::{io::NoSeek, BinResult, BinWrite};
4use flow_record_common::{Error, FlowRecord, RecordPack};
5use rmpv::Value;
6
7use crate::prelude::RawFlowRecord;
8
9pub const RECORDSTREAM_MAGIC: &[u8] = b"RECORDSTREAM\n";
10
11pub struct DfirSerializer<W: Write> {
12    writer: NoSeek<W>,
13    has_header_written: bool,
14    written_descriptor_hashes: HashSet<u32>,
15}
16
17impl<W> DfirSerializer<W>
18where
19    W: Write,
20{
21    pub fn new(writer: W) -> Self {
22        let writer = NoSeek::new(writer);
23        Self {
24            writer,
25            has_header_written: false,
26            written_descriptor_hashes: HashSet::new(),
27        }
28    }
29
30    pub fn into_inner(self) -> W {
31        self.writer.into_inner()
32    }
33
34    pub fn without_header(mut self) -> Self {
35        self.has_header_written = true;
36        self
37    }
38
39    pub fn serialize<R>(&mut self, record: R) -> Result<(), Error>
40    where
41        R: FlowRecord,
42    {
43        if !self.has_header_written {
44            self.write_header()?;
45        }
46
47        if !self
48            .written_descriptor_hashes
49            .contains(&R::descriptor_hash())
50        {
51            self.write_descriptor::<R>()?;
52        }
53
54        self.write_flow_record(Value::try_from(RecordPack::with_record(record))?.into())?;
55
56        Ok(())
57    }
58
59    fn write_descriptor<R>(&mut self) -> Result<(), Error>
60    where
61        R: FlowRecord,
62    {
63        self.write_flow_record(
64            Value::try_from(RecordPack::with_descriptor(R::descriptor().clone()))?.into(),
65        )?;
66        self.written_descriptor_hashes.insert(R::descriptor_hash());
67
68        for (hash, descriptor) in R::child_descriptors() {
69            if !self.written_descriptor_hashes.contains(hash) {
70                self.write_flow_record(
71                    Value::try_from(RecordPack::with_descriptor(descriptor.clone()))?.into(),
72                )?;
73                self.written_descriptor_hashes.insert(*hash);
74            }
75        }
76        Ok(())
77    }
78
79    fn write_header(&mut self) -> Result<(), Error> {
80        self.write_flow_record(Value::Binary(RECORDSTREAM_MAGIC.to_vec()).into())?;
81        self.has_header_written = true;
82        Ok(())
83    }
84
85    fn write_flow_record(&mut self, fr: RawFlowRecord) -> BinResult<()> {
86        fr.write_be(&mut self.writer)
87    }
88}