1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
use std::{collections::HashSet, io::Write};

use flow_record_common::Object;
use serde::Serialize;

use crate::Record;

const RECORDSTREAM_MAGIC: &[u8] = b"RECORDSTREAM\n";

pub struct DfirSerializer<W: Write> {
    writer: W,
    has_header_written: bool,
    buffer: Vec<u8>,
    written_descriptor_hashes: HashSet<u32>,
}

impl<W> DfirSerializer<W>
where
    W: Write,
{
    pub fn new(writer: W) -> Self {
        Self {
            writer,
            has_header_written: false,
            buffer: Vec::new(),
            written_descriptor_hashes: HashSet::new(),
        }
    }

    pub fn into_inner(self) -> W {
        self.writer
    }

    pub fn without_header(mut self) -> Self {
        self.has_header_written = true;
        self
    }

    pub fn serialize<R>(&mut self, record: &R) -> Result<(), rmp_serde::encode::Error>
    where
        R: Record,
    {
        if !self.has_header_written {
            self.print_magic()?;
            self.has_header_written = true;
        }

        let descriptor_hash = R::descriptor_hash();
        if !self.written_descriptor_hashes.contains(&descriptor_hash) {
            self.buffer.extend(R::descriptor());
            self.flush_buffer();

            self.written_descriptor_hashes.insert(descriptor_hash);
        }

        Object::with_record(record).serialize(&mut self.serializer())?;

        self.flush_buffer();

        Ok(())
    }

    fn serializer(&mut self) -> rmp_serde::Serializer<&mut Vec<u8>> {
        rmp_serde::Serializer::new(&mut self.buffer)
            .with_bytes(rmp_serde::config::BytesMode::ForceAll)
    }

    fn flush_buffer(&mut self) {
        let size = (self.buffer.len() as u32).to_be_bytes();
        self.writer.write_all(&size).unwrap();
        self.writer.write_all(&self.buffer).unwrap();
        self.buffer.clear();
    }

    pub fn print_magic(&mut self) -> Result<(), rmp_serde::encode::Error> {
        RECORDSTREAM_MAGIC.serialize(&mut self.serializer())?;
        self.flush_buffer();
        Ok(())
    }
}