// kona_protocol/batch/reader.rs

1//! Contains the [`BatchReader`] which is used to iteratively consume batches from raw data.
2
3use crate::{Batch, BrotliDecompressionError, decompress_brotli};
4use alloc::vec::Vec;
5use alloy_primitives::Bytes;
6use alloy_rlp::Decodable;
7use kona_genesis::RollupConfig;
8use miniz_oxide::inflate::decompress_to_vec_zlib;
9
/// Error type for decompression failures.
#[derive(Debug, thiserror::Error)]
pub enum DecompressionError {
    /// The data to decompress was empty.
    #[error("the data to decompress was empty")]
    EmptyData,
    /// The compression type is not supported.
    ///
    /// Carries the unrecognized first byte of the channel data.
    #[error("the compression type {0} is not supported")]
    UnsupportedType(u8),
    /// A brotli decompression error.
    #[error("brotli decompression error: {0}")]
    BrotliError(#[from] BrotliDecompressionError),
    /// A zlib decompression error.
    #[error("zlib decompression error")]
    ZlibError,
    /// The RLP data is too large for the configured maximum.
    ///
    /// Fields are (actual decompressed size, maximum allowed), both in bytes.
    #[error("the RLP data is too large: {0} bytes, maximum allowed: {1} bytes")]
    RlpTooLarge(usize, usize),
}
29
/// Batch Reader provides a function that iteratively consumes batches from the reader.
/// The L1Inclusion block is also provided at creation time.
/// Warning: the batch reader can read every batch-type.
/// The caller of the batch-reader should filter the results.
#[derive(Debug)]
pub struct BatchReader {
    /// The raw data to decode. `Some` until decompression consumes it via `take()`,
    /// then `None` so subsequent decompression calls are no-ops.
    data: Option<Vec<u8>>,
    /// Decompressed data.
    pub decompressed: Vec<u8>,
    /// The current cursor in the `decompressed` data.
    cursor: usize,
    /// The maximum RLP bytes per channel (upper bound on the decompressed size).
    max_rlp_bytes_per_channel: usize,
    /// Whether brotli decompression was used. Batches from brotli channels are
    /// rejected unless the Fjord hardfork is active at the batch timestamp.
    pub brotli_used: bool,
}
47
48impl BatchReader {
49    /// ZLIB Deflate Compression Method.
50    pub const ZLIB_DEFLATE_COMPRESSION_METHOD: u8 = 8;
51
52    /// ZLIB Reserved Compression Info.
53    pub const ZLIB_RESERVED_COMPRESSION_METHOD: u8 = 15;
54
55    /// Brotli Compression Channel Version.
56    pub const CHANNEL_VERSION_BROTLI: u8 = 1;
57
58    /// Creates a new [`BatchReader`] from the given data and max decompressed RLP bytes per
59    /// channel.
60    pub fn new<T>(data: T, max_rlp_bytes_per_channel: usize) -> Self
61    where
62        T: Into<Vec<u8>>,
63    {
64        Self {
65            data: Some(data.into()),
66            decompressed: Vec::new(),
67            cursor: 0,
68            max_rlp_bytes_per_channel,
69            brotli_used: false,
70        }
71    }
72
73    /// Helper method to decompress the data contained in the reader.
74    pub fn decompress(&mut self) -> Result<(), DecompressionError> {
75        if let Some(data) = self.data.take() {
76            // Peek at the data to determine the compression type.
77            if data.is_empty() {
78                return Err(DecompressionError::EmptyData);
79            }
80
81            let compression_type = data[0];
82            if (compression_type & 0x0F) == Self::ZLIB_DEFLATE_COMPRESSION_METHOD ||
83                (compression_type & 0x0F) == Self::ZLIB_RESERVED_COMPRESSION_METHOD
84            {
85                self.decompressed =
86                    decompress_to_vec_zlib(&data).map_err(|_| DecompressionError::ZlibError)?;
87
88                // Check the size of the decompressed channel RLP.
89                if self.decompressed.len() > self.max_rlp_bytes_per_channel {
90                    return Err(DecompressionError::RlpTooLarge(
91                        self.decompressed.len(),
92                        self.max_rlp_bytes_per_channel,
93                    ));
94                }
95            } else if compression_type == Self::CHANNEL_VERSION_BROTLI {
96                self.brotli_used = true;
97                self.decompressed = decompress_brotli(&data[1..], self.max_rlp_bytes_per_channel)?;
98            } else {
99                return Err(DecompressionError::UnsupportedType(compression_type));
100            }
101        }
102        Ok(())
103    }
104
105    /// Pulls out the next batch from the reader.
106    pub fn next_batch(&mut self, cfg: &RollupConfig) -> Option<Batch> {
107        // Ensure the data is decompressed.
108        self.decompress().ok()?;
109
110        // Decompress and RLP decode the batch data, before finally decoding the batch itself.
111        let decompressed_reader = &mut self.decompressed.as_slice()[self.cursor..].as_ref();
112        let bytes = Bytes::decode(decompressed_reader).ok()?;
113        let Ok(batch) = Batch::decode(&mut bytes.as_ref(), cfg) else {
114            return None;
115        };
116
117        // Confirm that brotli decompression was performed *after* the Fjord hardfork.
118        if self.brotli_used && !cfg.is_fjord_active(batch.timestamp()) {
119            return None;
120        }
121
122        // Advance the cursor on the reader.
123        self.cursor = self.decompressed.len() - decompressed_reader.len();
124        Some(batch)
125    }
126}
127
#[cfg(test)]
mod test {
    use super::*;
    use kona_genesis::{
        HardForkConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD,
    };

    /// Loads the hex-encoded compressed batch fixture and decodes it to raw bytes.
    fn new_compressed_batch_data() -> Bytes {
        let hex_text =
            alloc::string::String::from_utf8_lossy(include_bytes!("../../testdata/batch.hex"));
        // Drop the trailing newline before hex-decoding.
        let trimmed = &hex_text[..hex_text.len() - 1];
        alloy_primitives::hex::decode(trimmed).unwrap().into()
    }

    #[test]
    fn test_batch_reader() {
        let compressed = new_compressed_batch_data();
        let expected_cursor = decompress_to_vec_zlib(&compressed).unwrap().len();
        let mut reader = BatchReader::new(compressed, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK as usize);
        assert!(reader.next_batch(&RollupConfig::default()).is_some());
        // The single batch spans the entire decompressed channel.
        assert_eq!(reader.cursor, expected_cursor);
    }

    #[test]
    fn test_batch_reader_fjord() {
        let compressed = new_compressed_batch_data();
        let expected_cursor = decompress_to_vec_zlib(&compressed).unwrap().len();
        let mut reader = BatchReader::new(compressed, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize);
        // Activate Fjord from genesis so the Fjord channel limit applies.
        let cfg = RollupConfig {
            hardforks: HardForkConfig { fjord_time: Some(0), ..Default::default() },
            ..Default::default()
        };
        assert!(reader.next_batch(&cfg).is_some());
        assert_eq!(reader.cursor, expected_cursor);
    }
}
165}