op_alloy_protocol/batch/
reader.rsuse crate::{decompress_brotli, Batch};
use alloc::vec::Vec;
use alloy_primitives::Bytes;
use alloy_rlp::Decodable;
use miniz_oxide::inflate::decompress_to_vec_zlib;
use op_alloy_genesis::RollupConfig;
const ZLIB_DEFLATE_COMPRESSION_METHOD: u8 = 8;
const ZLIB_RESERVED_COMPRESSION_METHOD: u8 = 15;
const CHANNEL_VERSION_BROTLI: u8 = 1;
#[derive(Debug)]
pub struct BatchReader {
data: Option<Vec<u8>>,
decompressed: Vec<u8>,
cursor: usize,
max_rlp_bytes_per_channel: usize,
}
impl BatchReader {
pub fn new<T>(data: T, max_rlp_bytes_per_channel: usize) -> Self
where
T: Into<Vec<u8>>,
{
Self {
data: Some(data.into()),
decompressed: Vec::new(),
cursor: 0,
max_rlp_bytes_per_channel,
}
}
pub fn next_batch(&mut self, cfg: &RollupConfig) -> Option<Batch> {
let mut brotli_used = false;
if let Some(data) = self.data.take() {
if data.is_empty() {
return None;
}
let compression_type = data[0];
if (compression_type & 0x0F) == ZLIB_DEFLATE_COMPRESSION_METHOD
|| (compression_type & 0x0F) == ZLIB_RESERVED_COMPRESSION_METHOD
{
self.decompressed = decompress_to_vec_zlib(&data).ok()?;
if self.decompressed.len() > self.max_rlp_bytes_per_channel {
return None;
}
} else if compression_type == CHANNEL_VERSION_BROTLI {
brotli_used = true;
self.decompressed =
decompress_brotli(&data[1..], self.max_rlp_bytes_per_channel).ok()?;
} else {
return None;
}
}
let decompressed_reader = &mut self.decompressed.as_slice()[self.cursor..].as_ref();
let bytes = Bytes::decode(decompressed_reader).ok()?;
let Ok(batch) = Batch::decode(&mut bytes.as_ref(), cfg) else {
return None;
};
if brotli_used && !cfg.is_fjord_active(batch.timestamp()) {
return None;
}
self.cursor = self.decompressed.len() - decompressed_reader.len();
Some(batch)
}
}
#[cfg(test)]
mod test {
use super::*;
use op_alloy_genesis::{MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD};
fn new_compressed_batch_data() -> Bytes {
let file_contents =
alloc::string::String::from_utf8_lossy(include_bytes!("../../testdata/batch.hex"));
let file_contents = &(&*file_contents)[..file_contents.len() - 1];
let data = alloy_primitives::hex::decode(file_contents).unwrap();
data.into()
}
#[test]
fn test_batch_reader() {
let raw = new_compressed_batch_data();
let decompressed_len = decompress_to_vec_zlib(&raw).unwrap().len();
let mut reader = BatchReader::new(raw, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK as usize);
reader.next_batch(&RollupConfig::default()).unwrap();
assert_eq!(reader.cursor, decompressed_len);
}
#[test]
fn test_batch_reader_fjord() {
let raw = new_compressed_batch_data();
let decompressed_len = decompress_to_vec_zlib(&raw).unwrap().len();
let mut reader = BatchReader::new(raw, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize);
reader.next_batch(&RollupConfig { fjord_time: Some(0), ..Default::default() }).unwrap();
assert_eq!(reader.cursor, decompressed_len);
}
}