rusty_chunkenc/encoder/
chunk_encoder.rs

1use crate::{
2    chunk::{Chunk, ChunkType},
3    crc32c::write_crc32c,
4};
5
6use super::uvarint_encoder::write_uvarint;
7
8fn write_chunk_type<W: std::io::Write>(
9    chunk_type: ChunkType,
10    writer: &mut W,
11) -> std::io::Result<()> {
12    match chunk_type {
13        ChunkType::XOR => {
14            writer.write_all(&[1u8])?;
15        }
16        ChunkType::Histogram => {
17            writer.write_all(&[2u8])?;
18        }
19        ChunkType::FloatHistogram => {
20            writer.write_all(&[3u8])?;
21        }
22    }
23    Ok(())
24}
25
26impl Chunk {
27    /// Writes the chunk to the writer in the Prometheus format.
28    pub fn write<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
29        // A chunk starts by its size, which we don't know yet
30        // So… we can't stream the chunk writing, we have to
31        // write it all in memory first.
32
33        let mut buffer: Vec<u8> = Vec::with_capacity(32);
34
35        match self {
36            Chunk::XOR(xor_chunk) => {
37                write_chunk_type(ChunkType::XOR, &mut buffer)?;
38                xor_chunk.write(&mut buffer)?;
39            }
40            Chunk::Histogram(histogram_chunk) => {
41                write_chunk_type(ChunkType::Histogram, &mut buffer)?;
42                histogram_chunk.write(&mut buffer)?;
43            }
44            Chunk::FloatHistogram(float_histogram_chunk) => {
45                write_chunk_type(ChunkType::FloatHistogram, &mut buffer)?;
46                float_histogram_chunk.write(&mut buffer)?;
47            }
48        }
49
50        let chunk_len = buffer.len() as u64 - 1;
51
52        write_uvarint(chunk_len, writer)?;
53        writer.write_all(&buffer)?;
54        write_crc32c(&buffer, writer)?;
55
56        Ok(())
57    }
58}
59
#[cfg(test)]
mod tests {

    use crate::{
        chunk::read_chunk,
        xor::{XORChunk, XORSample},
    };

    use super::*;
    use rand::{Rng, SeedableRng};

    /// Builds `count` pseudo-random XOR chunks from a generator seeded with
    /// `seed`, so the same seed always yields the same chunks.
    ///
    /// Each chunk holds between 1 and 128 samples with strictly increasing
    /// timestamps; a value either stays the same, grows by one, or is
    /// replaced by a fresh random number.
    fn generate_random_test_data(seed: u64, count: usize) -> Vec<Chunk> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(seed);

        (0..count)
            .map(|_| {
                let mut ts: i64 = rng.random_range(1234567890..1357908642);
                let sample_count: usize = rng.random_range(1..129);
                let mut samples = Vec::with_capacity(sample_count);

                // Half of the chunks start from a wide value range, the
                // other half from a narrow one.
                let mut val: f64 = match rng.random_bool(0.5) {
                    true => rng.random_range(-100000000.0..1000000.0),
                    false => rng.random_range(-10000.0..10000.0),
                };
                samples.push(XORSample {
                    timestamp: ts,
                    value: val,
                });

                for _ in 1..sample_count {
                    ts += rng.random_range(1..30);
                    if rng.random_bool(0.33) {
                        val += 1.0;
                    } else if rng.random_bool(0.33) {
                        val = rng.random();
                    }
                    samples.push(XORSample {
                        timestamp: ts,
                        value: val,
                    });
                }

                Chunk::XOR(XORChunk::new(samples))
            })
            .collect()
    }

    /// Chunks written back to back must be read back in order, unchanged.
    #[test]
    fn test_write_chunk() {
        let chunks = generate_random_test_data(1234, 128);

        // Serialize every chunk into one contiguous byte stream.
        let mut encoded: Vec<u8> = Vec::new();
        chunks
            .iter()
            .for_each(|chunk| chunk.write(&mut encoded).unwrap());

        // Decode the stream sequentially and compare against the inputs.
        let mut remaining: &[u8] = &encoded;
        for expected in &chunks {
            let (rest, decoded) = read_chunk(remaining).unwrap();
            assert_eq!(&decoded, expected);
            remaining = rest;
        }
    }

    /// Flipping a byte of the trailing checksum must make reading fail.
    #[test]
    fn test_wrong_crc32c() {
        let chunks = generate_random_test_data(1234, 1);
        let chunk = &chunks[0];
        let mut encoded: Vec<u8> = Vec::new();
        chunk.write(&mut encoded).unwrap();

        // Sanity check: the untouched bytes decode to the original chunk.
        let (_, decoded) = read_chunk(&encoded).unwrap();
        assert_eq!(&decoded, chunk);

        // Invert the first of the four trailing checksum bytes.
        let crc_start = encoded.len() - 4;
        encoded[crc_start] ^= 0xFF;

        let error = read_chunk(&encoded).unwrap_err();
        assert!(error.to_string().contains("Verify"));
    }
}