kona_protocol/batch/
reader.rs

1//! Contains the [BatchReader] which is used to iteratively consume batches from raw data.
2
use crate::{Batch, decompress_brotli};
use alloc::vec::Vec;
use alloy_primitives::Bytes;
use alloy_rlp::Decodable;
use kona_genesis::RollupConfig;
use miniz_oxide::inflate::{decompress_to_vec_zlib, decompress_to_vec_zlib_with_limit};
/// ZLIB DEFLATE compression method value (CM field, the low nibble of the
/// first header byte).
const ZLIB_DEFLATE_COMPRESSION_METHOD: u8 = 8;

/// ZLIB reserved compression method value (CM = 15), also accepted by the reader.
const ZLIB_RESERVED_COMPRESSION_METHOD: u8 = 15;

/// Brotli Compression Channel Version.
const CHANNEL_VERSION_BROTLI: u8 = 1;
17
/// Batch Reader provides a function that iteratively consumes batches from raw,
/// compressed channel data.
///
/// Warning: the batch reader can read every batch-type.
/// The caller of the batch-reader should filter the results.
#[derive(Debug)]
pub struct BatchReader {
    /// The raw, still-compressed channel data to decode. Taken (set to `None`)
    /// on the first call to `next_batch`.
    data: Option<Vec<u8>>,
    /// The decompressed channel data that batches are RLP-decoded out of.
    decompressed: Vec<u8>,
    /// The current read offset into `decompressed`.
    cursor: usize,
    /// The maximum allowed length of the decompressed channel RLP, in bytes.
    max_rlp_bytes_per_channel: usize,
}
33
34impl BatchReader {
35    /// Creates a new [BatchReader] from the given data and max decompressed RLP bytes per channel.
36    pub fn new<T>(data: T, max_rlp_bytes_per_channel: usize) -> Self
37    where
38        T: Into<Vec<u8>>,
39    {
40        Self {
41            data: Some(data.into()),
42            decompressed: Vec::new(),
43            cursor: 0,
44            max_rlp_bytes_per_channel,
45        }
46    }
47
48    /// Pulls out the next batch from the reader.
49    pub fn next_batch(&mut self, cfg: &RollupConfig) -> Option<Batch> {
50        // If the data is not already decompressed, decompress it.
51        let mut brotli_used = false;
52
53        if let Some(data) = self.data.take() {
54            // Peek at the data to determine the compression type.
55            if data.is_empty() {
56                return None;
57            }
58
59            let compression_type = data[0];
60            if (compression_type & 0x0F) == ZLIB_DEFLATE_COMPRESSION_METHOD ||
61                (compression_type & 0x0F) == ZLIB_RESERVED_COMPRESSION_METHOD
62            {
63                self.decompressed = decompress_to_vec_zlib(&data).ok()?;
64
65                // Check the size of the decompressed channel RLP.
66                if self.decompressed.len() > self.max_rlp_bytes_per_channel {
67                    return None;
68                }
69            } else if compression_type == CHANNEL_VERSION_BROTLI {
70                brotli_used = true;
71                self.decompressed =
72                    decompress_brotli(&data[1..], self.max_rlp_bytes_per_channel).ok()?;
73            } else {
74                return None;
75            }
76        }
77
78        // Decompress and RLP decode the batch data, before finally decoding the batch itself.
79        let decompressed_reader = &mut self.decompressed.as_slice()[self.cursor..].as_ref();
80        let bytes = Bytes::decode(decompressed_reader).ok()?;
81        let Ok(batch) = Batch::decode(&mut bytes.as_ref(), cfg) else {
82            return None;
83        };
84
85        // Confirm that brotli decompression was performed *after* the Fjord hardfork.
86        if brotli_used && !cfg.is_fjord_active(batch.timestamp()) {
87            return None;
88        }
89
90        // Advance the cursor on the reader.
91        self.cursor = self.decompressed.len() - decompressed_reader.len();
92        Some(batch)
93    }
94}
95
96#[cfg(test)]
97mod test {
98    use super::*;
99    use kona_genesis::{
100        HardForkConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD,
101    };
102
103    fn new_compressed_batch_data() -> Bytes {
104        let file_contents =
105            alloc::string::String::from_utf8_lossy(include_bytes!("../../testdata/batch.hex"));
106        let file_contents = &(&*file_contents)[..file_contents.len() - 1];
107        let data = alloy_primitives::hex::decode(file_contents).unwrap();
108        data.into()
109    }
110
111    #[test]
112    fn test_batch_reader() {
113        let raw = new_compressed_batch_data();
114        let decompressed_len = decompress_to_vec_zlib(&raw).unwrap().len();
115        let mut reader = BatchReader::new(raw, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK as usize);
116        reader.next_batch(&RollupConfig::default()).unwrap();
117        assert_eq!(reader.cursor, decompressed_len);
118    }
119
120    #[test]
121    fn test_batch_reader_fjord() {
122        let raw = new_compressed_batch_data();
123        let decompressed_len = decompress_to_vec_zlib(&raw).unwrap().len();
124        let mut reader = BatchReader::new(raw, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize);
125        reader
126            .next_batch(&RollupConfig {
127                hardforks: HardForkConfig { fjord_time: Some(0), ..Default::default() },
128                ..Default::default()
129            })
130            .unwrap();
131        assert_eq!(reader.cursor, decompressed_len);
132    }
133}