kona_protocol/batch/
reader.rs1use crate::{Batch, BrotliDecompressionError, decompress_brotli};
4use alloc::vec::Vec;
5use alloy_primitives::Bytes;
6use alloy_rlp::Decodable;
7use kona_genesis::RollupConfig;
8use miniz_oxide::inflate::decompress_to_vec_zlib;
9
10#[derive(Debug, thiserror::Error)]
12pub enum DecompressionError {
13 #[error("the data to decompress was empty")]
15 EmptyData,
16 #[error("the compression type {0} is not supported")]
18 UnsupportedType(u8),
19 #[error("brotli decompression error: {0}")]
21 BrotliError(#[from] BrotliDecompressionError),
22 #[error("zlib decompression error")]
24 ZlibError,
25 #[error("the RLP data is too large: {0} bytes, maximum allowed: {1} bytes")]
27 RlpTooLarge(usize, usize),
28}
29
30#[derive(Debug)]
35pub struct BatchReader {
36 data: Option<Vec<u8>>,
38 pub decompressed: Vec<u8>,
40 cursor: usize,
42 max_rlp_bytes_per_channel: usize,
44 pub brotli_used: bool,
46}
47
48impl BatchReader {
49 pub const ZLIB_DEFLATE_COMPRESSION_METHOD: u8 = 8;
51
52 pub const ZLIB_RESERVED_COMPRESSION_METHOD: u8 = 15;
54
55 pub const CHANNEL_VERSION_BROTLI: u8 = 1;
57
58 pub fn new<T>(data: T, max_rlp_bytes_per_channel: usize) -> Self
61 where
62 T: Into<Vec<u8>>,
63 {
64 Self {
65 data: Some(data.into()),
66 decompressed: Vec::new(),
67 cursor: 0,
68 max_rlp_bytes_per_channel,
69 brotli_used: false,
70 }
71 }
72
73 pub fn decompress(&mut self) -> Result<(), DecompressionError> {
75 if let Some(data) = self.data.take() {
76 if data.is_empty() {
78 return Err(DecompressionError::EmptyData);
79 }
80
81 let compression_type = data[0];
82 if (compression_type & 0x0F) == Self::ZLIB_DEFLATE_COMPRESSION_METHOD ||
83 (compression_type & 0x0F) == Self::ZLIB_RESERVED_COMPRESSION_METHOD
84 {
85 self.decompressed =
86 decompress_to_vec_zlib(&data).map_err(|_| DecompressionError::ZlibError)?;
87
88 if self.decompressed.len() > self.max_rlp_bytes_per_channel {
90 return Err(DecompressionError::RlpTooLarge(
91 self.decompressed.len(),
92 self.max_rlp_bytes_per_channel,
93 ));
94 }
95 } else if compression_type == Self::CHANNEL_VERSION_BROTLI {
96 self.brotli_used = true;
97 self.decompressed = decompress_brotli(&data[1..], self.max_rlp_bytes_per_channel)?;
98 } else {
99 return Err(DecompressionError::UnsupportedType(compression_type));
100 }
101 }
102 Ok(())
103 }
104
105 pub fn next_batch(&mut self, cfg: &RollupConfig) -> Option<Batch> {
107 self.decompress().ok()?;
109
110 let decompressed_reader = &mut self.decompressed.as_slice()[self.cursor..].as_ref();
112 let bytes = Bytes::decode(decompressed_reader).ok()?;
113 let Ok(batch) = Batch::decode(&mut bytes.as_ref(), cfg) else {
114 return None;
115 };
116
117 if self.brotli_used && !cfg.is_fjord_active(batch.timestamp()) {
119 return None;
120 }
121
122 self.cursor = self.decompressed.len() - decompressed_reader.len();
124 Some(batch)
125 }
126}
127
128#[cfg(test)]
129mod test {
130 use super::*;
131 use kona_genesis::{
132 HardForkConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD,
133 };
134
135 fn new_compressed_batch_data() -> Bytes {
136 let file_contents =
137 alloc::string::String::from_utf8_lossy(include_bytes!("../../testdata/batch.hex"));
138 let file_contents = &(&*file_contents)[..file_contents.len() - 1];
139 let data = alloy_primitives::hex::decode(file_contents).unwrap();
140 data.into()
141 }
142
143 #[test]
144 fn test_batch_reader() {
145 let raw = new_compressed_batch_data();
146 let decompressed_len = decompress_to_vec_zlib(&raw).unwrap().len();
147 let mut reader = BatchReader::new(raw, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK as usize);
148 reader.next_batch(&RollupConfig::default()).unwrap();
149 assert_eq!(reader.cursor, decompressed_len);
150 }
151
152 #[test]
153 fn test_batch_reader_fjord() {
154 let raw = new_compressed_batch_data();
155 let decompressed_len = decompress_to_vec_zlib(&raw).unwrap().len();
156 let mut reader = BatchReader::new(raw, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize);
157 reader
158 .next_batch(&RollupConfig {
159 hardforks: HardForkConfig { fjord_time: Some(0), ..Default::default() },
160 ..Default::default()
161 })
162 .unwrap();
163 assert_eq!(reader.cursor, decompressed_len);
164 }
165}