kona_protocol/batch/reader.rs

use crate::{Batch, decompress_brotli};
use alloc::vec::Vec;
use alloy_primitives::Bytes;
use alloy_rlp::Decodable;
use kona_genesis::RollupConfig;
use miniz_oxide::inflate::decompress_to_vec_zlib;

/// The zlib deflate compression method.
const ZLIB_DEFLATE_COMPRESSION_METHOD: u8 = 8;

/// The reserved zlib compression method.
const ZLIB_RESERVED_COMPRESSION_METHOD: u8 = 15;

/// The channel version byte indicating brotli compression.
const CHANNEL_VERSION_BROTLI: u8 = 1;
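
// Added illustration (assumption, not part of the original file): the first byte of a
// zlib-compressed channel is the CMF byte, whose low nibble is the compression method.
// A typical zlib header byte such as 0x78 therefore matches the deflate method above.
const _: () = assert!(0x78u8 & 0x0F == ZLIB_DEFLATE_COMPRESSION_METHOD);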

/// A reader that iteratively decodes [Batch]es from compressed channel data.
#[derive(Debug)]
pub struct BatchReader {
    /// The raw channel data, consumed (and decompressed) on the first call to
    /// [BatchReader::next_batch].
    data: Option<Vec<u8>>,
    /// The decompressed channel data.
    decompressed: Vec<u8>,
    /// The current read offset into the decompressed data.
    cursor: usize,
    /// The maximum number of decompressed RLP bytes allowed per channel.
    max_rlp_bytes_per_channel: usize,
}

impl BatchReader {
    /// Creates a new [BatchReader] from the given channel data and the maximum number of
    /// decompressed RLP bytes allowed per channel.
    pub fn new<T>(data: T, max_rlp_bytes_per_channel: usize) -> Self
    where
        T: Into<Vec<u8>>,
    {
        Self {
            data: Some(data.into()),
            decompressed: Vec::new(),
            cursor: 0,
            max_rlp_bytes_per_channel,
        }
    }

    /// Pulls the next [Batch] out of the channel data, decompressing the raw data on the
    /// first call. Returns [None] once the data is exhausted or when the data is invalid.
    pub fn next_batch(&mut self, cfg: &RollupConfig) -> Option<Batch> {
        let mut brotli_used = false;

        // If the raw data has not been decompressed yet, do it now.
        if let Some(data) = self.data.take() {
            if data.is_empty() {
                return None;
            }

            // The first byte of the channel data determines the compression scheme.
            let compression_type = data[0];
            if (compression_type & 0x0F) == ZLIB_DEFLATE_COMPRESSION_METHOD ||
                (compression_type & 0x0F) == ZLIB_RESERVED_COMPRESSION_METHOD
            {
                self.decompressed = decompress_to_vec_zlib(&data).ok()?;

                // Enforce the maximum decompressed RLP size for the channel.
                if self.decompressed.len() > self.max_rlp_bytes_per_channel {
                    return None;
                }
            } else if compression_type == CHANNEL_VERSION_BROTLI {
                brotli_used = true;
                self.decompressed =
                    decompress_brotli(&data[1..], self.max_rlp_bytes_per_channel).ok()?;
            } else {
                // Unknown compression type.
                return None;
            }
        }

        // RLP decode the next batch from the decompressed data, starting at the cursor.
        let decompressed_reader = &mut self.decompressed.as_slice()[self.cursor..].as_ref();
        let bytes = Bytes::decode(decompressed_reader).ok()?;
        let Ok(batch) = Batch::decode(&mut bytes.as_ref(), cfg) else {
            return None;
        };

        // Brotli compression is only valid once the Fjord hardfork is active.
        if brotli_used && !cfg.is_fjord_active(batch.timestamp()) {
            return None;
        }

        // Advance the cursor past the bytes consumed by this batch.
        self.cursor = self.decompressed.len() - decompressed_reader.len();
        Some(batch)
    }
}
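
// A minimal usage sketch (not part of the original file): a caller would typically drain
// the reader, collecting every batch the channel contains. The helper below is
// hypothetical and only illustrates how `next_batch` is meant to be called in a loop;
// real pipelines additionally filter batches by type and validity.
#[allow(dead_code)]
fn drain_batches(channel_data: Vec<u8>, cfg: &RollupConfig, max_rlp_bytes: usize) -> Vec<Batch> {
    let mut reader = BatchReader::new(channel_data, max_rlp_bytes);
    let mut batches = Vec::new();
    // `next_batch` returns `None` once the decompressed data is exhausted (or invalid).
    while let Some(batch) = reader.next_batch(cfg) {
        batches.push(batch);
    }
    batches
}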

#[cfg(test)]
mod test {
    use super::*;
    use kona_genesis::{
        HardForkConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD,
    };

    /// Loads the hex-encoded, zlib-compressed batch fixture from `testdata/batch.hex`.
    fn new_compressed_batch_data() -> Bytes {
        let file_contents =
            alloc::string::String::from_utf8_lossy(include_bytes!("../../testdata/batch.hex"));
        // Trim the trailing newline before hex-decoding the fixture.
        let file_contents = &(&*file_contents)[..file_contents.len() - 1];
        let data = alloy_primitives::hex::decode(file_contents).unwrap();
        data.into()
    }

    #[test]
    fn test_batch_reader() {
        let raw = new_compressed_batch_data();
        let decompressed_len = decompress_to_vec_zlib(&raw).unwrap().len();
        let mut reader = BatchReader::new(raw, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK as usize);
        reader.next_batch(&RollupConfig::default()).unwrap();
        assert_eq!(reader.cursor, decompressed_len);
    }

    #[test]
    fn test_batch_reader_fjord() {
        let raw = new_compressed_batch_data();
        let decompressed_len = decompress_to_vec_zlib(&raw).unwrap().len();
        let mut reader = BatchReader::new(raw, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize);
        reader
            .next_batch(&RollupConfig {
                hardforks: HardForkConfig { fjord_time: Some(0), ..Default::default() },
                ..Default::default()
            })
            .unwrap();
        assert_eq!(reader.cursor, decompressed_len);
    }
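
    // A hedged addition (not in the original test suite): a channel whose first byte is
    // neither a zlib header nor the brotli version byte is rejected, so the reader
    // yields no batch at all.
    #[test]
    fn test_batch_reader_unknown_compression_type() {
        // Low nibble of 0x02 is neither 8 (deflate) nor 15 (reserved), and the byte is
        // not CHANNEL_VERSION_BROTLI, so `next_batch` hits the fallthrough and returns None.
        let mut reader = BatchReader::new(
            Vec::from([0x02u8, 0x00, 0x01]),
            MAX_RLP_BYTES_PER_CHANNEL_BEDROCK as usize,
        );
        assert!(reader.next_batch(&RollupConfig::default()).is_none());
    }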
}