rusty_chunkenc/
chunk.rs

1use std::num::NonZeroUsize;
2
3use nom::{
4    branch::alt,
5    bytes::complete::tag,
6    combinator::{consumed, value},
7    IResult, Input, Parser, ToUsize,
8};
9
10use crate::{
11    crc32c::{assert_crc32c_on_data, read_crc32c},
12    histogram::{
13        read_float_histogram_chunk_data, read_histogram_chunk_data, FloatHistogramChunk,
14        HistogramChunk,
15    },
16    uvarint::read_uvarint,
17    xor::{read_xor_chunk_data, XORChunk, XORSample},
18};
19
/// Kind of chunk, as identified by the type byte of the chunk header.
///
/// `read_chunk_type` maps the header byte to a variant: `1` is `XOR`,
/// `2` is `Histogram` and `3` is `FloatHistogram`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) enum ChunkType {
    /// XOR-encoded chunk (header type byte `1`).
    // "XOR" is kept fully upper-case on purpose, hence the lint exemption.
    #[allow(clippy::upper_case_acronyms)]
    XOR,
    /// Histogram chunk (header type byte `2`).
    Histogram,
    /// Float histogram chunk (header type byte `3`).
    FloatHistogram,
}
27
28struct ChunkHeader {
29    chunk_size: u64,
30    chunk_type: ChunkType,
31}
32
/// Implemented by chunk types that can carry a block chunk reference.
pub(crate) trait ChunkWithBlockChunkRef {
    /// Returns the block chunk reference, or `None` when there is none.
    // NOTE(review): presumably `None` until `compute_block_chunk_ref` has been
    // called — confirm against the implementors.
    fn block_chunk_ref(&self) -> Option<u64>;

    /// Computes the block chunk reference from `file_index` and `chunks_addr`.
    // NOTE(review): `chunks_addr` looks like the base address of the buffer
    // holding the chunks, used to derive an offset — confirm against the
    // implementors.
    fn compute_block_chunk_ref(&mut self, file_index: u64, chunks_addr: *const u8);
}
37
38/// A Prometheus chunk.
39///
40/// It can be a XOR chunk, a histogram chunk, or a float histogram chunk.
41///
42/// For now, only the XOR chunk type is fully implemented.
43#[derive(Debug, PartialEq)]
44pub enum Chunk {
45    XOR(XORChunk),
46    Histogram(HistogramChunk),
47    FloatHistogram(FloatHistogramChunk),
48}
49
50impl Chunk {
51    /// Creates a Chunk of type XOR.
52    pub fn new_xor(samples: Vec<XORSample>) -> Self {
53        Self::XOR(XORChunk::new(samples))
54    }
55
56    /// Returns the XOR chunk if it's a XOR chunk.
57    pub fn as_xor(self) -> Option<XORChunk> {
58        match self {
59            Chunk::XOR(xor_chunk) => Some(xor_chunk),
60            _ => None,
61        }
62    }
63
64    /// Retuns the block chunk reference.
65    pub fn block_chunk_ref(&self) -> Option<u64> {
66        match self {
67            Chunk::XOR(xor_chunk) => xor_chunk.block_chunk_ref(),
68            Chunk::Histogram(histogram_chunk) => histogram_chunk.block_chunk_ref(),
69            Chunk::FloatHistogram(float_histogram_chunk) => float_histogram_chunk.block_chunk_ref(),
70        }
71    }
72
73    pub(crate) fn compute_chunk_ref(&mut self, file_index: u64, chunks_addr: *const u8) {
74        match self {
75            Chunk::XOR(xor_chunk) => {
76                xor_chunk.compute_block_chunk_ref(file_index, chunks_addr);
77            }
78            Chunk::Histogram(histogram_chunk) => {
79                histogram_chunk.compute_block_chunk_ref(file_index, chunks_addr);
80            }
81            Chunk::FloatHistogram(float_histogram_chunk) => {
82                float_histogram_chunk.compute_block_chunk_ref(file_index, chunks_addr);
83            }
84        }
85    }
86}
87
88fn read_chunk_type(input: &[u8]) -> IResult<&[u8], ChunkType> {
89    alt((
90        value(ChunkType::XOR, tag(&[1u8][..])),
91        value(ChunkType::Histogram, tag(&[2u8][..])),
92        value(ChunkType::FloatHistogram, tag(&[3u8][..])),
93    ))
94    .parse(input)
95}
96
97fn read_chunk_header(input: &[u8]) -> IResult<&[u8], ChunkHeader> {
98    let (remaining_input, (chunk_size, chunk_type)) =
99        (read_uvarint, read_chunk_type).parse(input)?;
100
101    Ok((
102        remaining_input,
103        ChunkHeader {
104            chunk_size,
105            chunk_type,
106        },
107    ))
108}
109
110fn parse_chunk_data(
111    addr: *const u8,
112    chunk_type: ChunkType,
113    chunk_data: &[u8],
114) -> IResult<&[u8], Chunk> {
115    match chunk_type {
116        ChunkType::XOR => {
117            let (remaining_input, mut xor_chunk) = read_xor_chunk_data(chunk_data)?;
118            xor_chunk.set_addr(addr);
119            Ok((remaining_input, Chunk::XOR(xor_chunk)))
120        }
121        ChunkType::Histogram => {
122            let (remaining_input, histogram_chunk) = read_histogram_chunk_data(chunk_data)?;
123            Ok((remaining_input, Chunk::Histogram(histogram_chunk)))
124        }
125        ChunkType::FloatHistogram => {
126            let (remaining_input, float_histogram_chunk) =
127                read_float_histogram_chunk_data(chunk_data)?;
128            Ok((
129                remaining_input,
130                Chunk::FloatHistogram(float_histogram_chunk),
131            ))
132        }
133    }
134}
135
136/// Reads a chunk from the input data.
137///
138/// Returns the remaining input data and the chunk.
139pub fn read_chunk(input: &[u8]) -> IResult<&[u8], Chunk> {
140    let addr = input.as_ptr();
141
142    let (remaining_input, (consumed_header_bytes, chunk_header)) =
143        consumed(read_chunk_header).parse(input)?;
144
145    // Check if there is enough data to read the chunk, the nom way
146    let chunk_size: usize = chunk_header.chunk_size.to_usize();
147    if let Some(needed) = chunk_size
148        .checked_sub(remaining_input.len())
149        .and_then(NonZeroUsize::new)
150    {
151        return Err(nom::Err::Incomplete(nom::Needed::Size(needed)));
152    }
153
154    // Extract the data section
155    let (remaining_input, chunk_data) = remaining_input.take_split(chunk_size);
156
157    // Before we parse the chunk data, we read and check the CRC32 Castagnoli checksum
158    let (remaining_input, chunk_crc32c) = read_crc32c(remaining_input)?;
159
160    // We need to get the size of the header because it has a variable length
161    // and we use the end of the header in the CRC32 calculation.
162    // The CRC32C is computed on the type and the data, but not the size
163    const CHUNK_TYPE_SIZE: usize = 1;
164    let header_length = consumed_header_bytes.len();
165    assert_crc32c_on_data(
166        input,
167        header_length - CHUNK_TYPE_SIZE,
168        chunk_size + CHUNK_TYPE_SIZE,
169        chunk_crc32c,
170    )?;
171
172    // Finaly, we can parse the chunk data
173    let (remaining_chunk_data_input, chunk) =
174        parse_chunk_data(addr, chunk_header.chunk_type, chunk_data)?;
175
176    // https://github.com/prometheus/prometheus/pull/14854
177    if !remaining_chunk_data_input.is_empty() {
178        // The bug is that a whole byte of 0 is used for padding.
179        let (remaining_chunk_data_input, _) =
180            tag(&[0u8; 1][..]).parse(remaining_chunk_data_input)?;
181        assert!(remaining_chunk_data_input.is_empty());
182    }
183
184    // We jungled a bit between the input buffers because we wanted to check the CRC32 checksum
185    // before we parsed the chunk data. Sorry about that.
186
187    Ok((remaining_input, chunk))
188}