use crate::{Batch, BrotliDecompressionError, decompress_brotli};
use alloc::vec::Vec;
use alloy_primitives::Bytes;
use alloy_rlp::Decodable;
use kona_genesis::RollupConfig;
use miniz_oxide::inflate::decompress_to_vec_zlib;
/// Errors that can occur while decompressing a channel's batch payload.
#[derive(Debug, thiserror::Error)]
pub enum DecompressionError {
/// The payload handed to the reader contained no bytes.
#[error("the data to decompress was empty")]
EmptyData,
/// The first byte of the payload did not select a known compression scheme.
#[error("the compression type {0} is not supported")]
UnsupportedType(u8),
/// The brotli decoder failed; wraps the underlying [`BrotliDecompressionError`].
#[error("brotli decompression error: {0}")]
BrotliError(#[from] BrotliDecompressionError),
/// The zlib decoder failed (the underlying miniz_oxide error is discarded).
#[error("zlib decompression error")]
ZlibError,
/// The decompressed payload exceeded the configured per-channel cap:
/// `(actual size in bytes, maximum allowed in bytes)`.
#[error("the RLP data is too large: {0} bytes, maximum allowed: {1} bytes")]
RlpTooLarge(usize, usize),
}
/// Reads [`Batch`]es out of a compressed channel payload, decompressing lazily
/// on the first read and tracking a cursor into the decompressed bytes.
#[derive(Debug)]
pub struct BatchReader {
// Raw compressed channel data; taken (left as `None`) once decompression has run.
data: Option<Vec<u8>>,
// The decompressed channel payload that batches are decoded from.
pub decompressed: Vec<u8>,
// Byte offset into `decompressed` marking how far batch decoding has advanced.
cursor: usize,
// Upper bound on the decompressed payload size, in bytes.
max_rlp_bytes_per_channel: usize,
// Whether the payload was brotli-compressed; `next_batch` rejects brotli
// batches whose timestamp predates Fjord activation.
pub brotli_used: bool,
}
impl BatchReader {
    /// The `CM` (compression method) nibble of a zlib header byte selecting DEFLATE.
    pub const ZLIB_DEFLATE_COMPRESSION_METHOD: u8 = 8;
    /// The reserved `CM` nibble of a zlib header byte.
    pub const ZLIB_RESERVED_COMPRESSION_METHOD: u8 = 15;
    /// The channel version byte that selects brotli compression.
    pub const CHANNEL_VERSION_BROTLI: u8 = 1;

    /// Creates a new [`BatchReader`] over the given compressed channel payload.
    ///
    /// `max_rlp_bytes_per_channel` caps the size of the decompressed payload.
    pub fn new<T>(data: T, max_rlp_bytes_per_channel: usize) -> Self
    where
        T: Into<Vec<u8>>,
    {
        Self {
            data: Some(data.into()),
            decompressed: Vec::default(),
            cursor: 0,
            brotli_used: false,
            max_rlp_bytes_per_channel,
        }
    }

    /// Decompresses the channel data held by this reader, if not already done.
    ///
    /// The scheme is selected from the first byte of the payload: a zlib `CM`
    /// nibble of 8 or 15 selects zlib, a version byte equal to
    /// [`Self::CHANNEL_VERSION_BROTLI`] selects brotli, and anything else is
    /// rejected. The raw data is consumed on the first call, so subsequent
    /// calls are no-ops.
    ///
    /// # Errors
    /// - [`DecompressionError::EmptyData`] if the payload has no bytes.
    /// - [`DecompressionError::ZlibError`] / [`DecompressionError::BrotliError`]
    ///   on decoder failure.
    /// - [`DecompressionError::RlpTooLarge`] if the zlib output exceeds the cap
    ///   (the brotli decoder enforces the cap internally).
    /// - [`DecompressionError::UnsupportedType`] for an unknown compression byte.
    pub fn decompress(&mut self) -> Result<(), DecompressionError> {
        // Only the first call observes the raw payload; it is taken here.
        let Some(raw) = self.data.take() else {
            return Ok(());
        };
        let Some(&version_byte) = raw.first() else {
            return Err(DecompressionError::EmptyData);
        };
        if matches!(
            version_byte & 0x0F,
            Self::ZLIB_DEFLATE_COMPRESSION_METHOD | Self::ZLIB_RESERVED_COMPRESSION_METHOD
        ) {
            // The zlib header byte is part of the stream, so decode the full payload.
            self.decompressed =
                decompress_to_vec_zlib(&raw).map_err(|_| DecompressionError::ZlibError)?;
            let size = self.decompressed.len();
            if size > self.max_rlp_bytes_per_channel {
                return Err(DecompressionError::RlpTooLarge(size, self.max_rlp_bytes_per_channel));
            }
        } else if version_byte == Self::CHANNEL_VERSION_BROTLI {
            // The brotli version byte is a prefix, not part of the stream; skip it.
            self.brotli_used = true;
            self.decompressed = decompress_brotli(&raw[1..], self.max_rlp_bytes_per_channel)?;
        } else {
            return Err(DecompressionError::UnsupportedType(version_byte));
        }
        Ok(())
    }

    /// Decodes and returns the next [`Batch`] from the decompressed payload,
    /// advancing the internal cursor past it.
    ///
    /// Returns `None` when decompression fails, when no further batch can be
    /// decoded, or when a brotli-compressed batch is encountered before Fjord
    /// is active (brotli is only valid post-Fjord; the cursor is left untouched
    /// in that case).
    pub fn next_batch(&mut self, cfg: &RollupConfig) -> Option<Batch> {
        self.decompress().ok()?;
        // RLP-decode the next length-prefixed blob; `remaining` is advanced in place.
        let mut remaining: &[u8] = &self.decompressed[self.cursor..];
        let raw_batch = Bytes::decode(&mut remaining).ok()?;
        let batch = Batch::decode(&mut raw_batch.as_ref(), cfg).ok()?;
        // Brotli channels are invalid before the Fjord hardfork.
        if self.brotli_used && !cfg.is_fjord_active(batch.timestamp()) {
            return None;
        }
        // The cursor moves by however many bytes the RLP decode consumed.
        self.cursor = self.decompressed.len() - remaining.len();
        Some(batch)
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use kona_genesis::{
        HardForkConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD,
    };

    /// Loads the hex-encoded zlib-compressed batch fixture and decodes it to raw bytes.
    fn compressed_batch_fixture() -> Bytes {
        let hex_text =
            alloc::string::String::from_utf8_lossy(include_bytes!("../../testdata/batch.hex"));
        // Drop the single trailing newline before hex-decoding.
        let trimmed = &hex_text[..hex_text.len() - 1];
        alloy_primitives::hex::decode(trimmed).unwrap().into()
    }

    #[test]
    fn test_batch_reader() {
        let compressed = compressed_batch_fixture();
        // After one successful read the cursor should sit at the end of the
        // decompressed payload.
        let expected_cursor = decompress_to_vec_zlib(&compressed).unwrap().len();
        let mut reader = BatchReader::new(compressed, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK as usize);
        reader
            .next_batch(&RollupConfig::default())
            .expect("fixture batch should decode");
        assert_eq!(reader.cursor, expected_cursor);
    }

    #[test]
    fn test_batch_reader_fjord() {
        let compressed = compressed_batch_fixture();
        let expected_cursor = decompress_to_vec_zlib(&compressed).unwrap().len();
        let mut reader = BatchReader::new(compressed, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize);
        let cfg = RollupConfig {
            hardforks: HardForkConfig { fjord_time: Some(0), ..Default::default() },
            ..Default::default()
        };
        reader.next_batch(&cfg).expect("fixture batch should decode post-Fjord");
        assert_eq!(reader.cursor, expected_cursor);
    }
}