use crate::{Batch, ChannelCompressor, ChannelId, CompressorError, Frame};
use alloc::{vec, vec::Vec};
use maili_genesis::RollupConfig;
use rand::{rngs::SmallRng, RngCore, SeedableRng};
const FRAME_V0_OVERHEAD: usize = 23;
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum ChannelOutError {
#[error("The channel is already closed")]
ChannelClosed,
#[error("The max frame size is too small")]
MaxFrameSizeTooSmall,
#[error("Missing compressed batch data")]
MissingData,
#[error("Error from compression")]
Compression(#[from] CompressorError),
#[error("Error encoding the batch")]
BatchEncoding,
#[error("The encoded batch exceeds the max RLP bytes per channel")]
ExceedsMaxRlpBytesPerChannel,
}
#[allow(missing_debug_implementations)]
pub struct ChannelOut<'a, C>
where
C: ChannelCompressor,
{
pub id: ChannelId,
pub config: &'a RollupConfig,
pub rlp_length: u64,
pub closed: bool,
pub frame_number: u16,
pub compressor: C,
}
impl<'a, C> ChannelOut<'a, C>
where
C: ChannelCompressor,
{
pub const fn new(id: ChannelId, config: &'a RollupConfig, compressor: C) -> Self {
Self { id, config, rlp_length: 0, frame_number: 0, closed: false, compressor }
}
pub fn reset(&mut self) {
self.rlp_length = 0;
self.frame_number = 0;
self.closed = false;
self.compressor.reset();
let mut small_rng = SmallRng::seed_from_u64(43);
SmallRng::fill_bytes(&mut small_rng, &mut self.id);
}
pub fn add_batch(&mut self, batch: Batch) -> Result<(), ChannelOutError> {
if self.closed {
return Err(ChannelOutError::ChannelClosed);
}
let mut buf = vec![];
batch.encode(&mut buf).map_err(|_| ChannelOutError::BatchEncoding)?;
let max_rlp_bytes_per_channel = self.config.max_rlp_bytes_per_channel(batch.timestamp());
if self.rlp_length + buf.len() as u64 > max_rlp_bytes_per_channel {
return Err(ChannelOutError::ExceedsMaxRlpBytesPerChannel);
}
self.compressor.write(&buf)?;
Ok(())
}
pub const fn input_bytes(&self) -> u64 {
self.rlp_length
}
pub fn ready_bytes(&self) -> usize {
self.compressor.len()
}
pub fn flush(&mut self) -> Result<(), ChannelOutError> {
self.compressor.flush()?;
Ok(())
}
pub fn close(&mut self) {
self.closed = true;
}
pub fn output_frame(&mut self, max_size: usize) -> Result<Frame, ChannelOutError> {
if max_size < FRAME_V0_OVERHEAD {
return Err(ChannelOutError::MaxFrameSizeTooSmall);
}
let mut frame =
Frame { id: self.id, number: self.frame_number, is_last: self.closed, data: vec![] };
let mut max_size = max_size - FRAME_V0_OVERHEAD;
if max_size > self.ready_bytes() {
max_size = self.ready_bytes();
}
let mut data = Vec::with_capacity(max_size);
self.compressor.read(&mut data).map_err(ChannelOutError::Compression)?;
frame.data.extend_from_slice(data.as_slice());
self.frame_number += 1;
Ok(frame)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{test_utils::MockCompressor, CompressorWriter, SingleBatch, SpanBatch};
#[test]
fn test_output_frame_max_size_too_small() {
let config = RollupConfig::default();
let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
assert_eq!(channel.output_frame(0), Err(ChannelOutError::MaxFrameSizeTooSmall));
}
#[test]
fn test_channel_out_output_frame_no_data() {
let config = RollupConfig::default();
let mut channel = ChannelOut::new(
ChannelId::default(),
&config,
MockCompressor { read_error: true, compressed: Some(Default::default()) },
);
let err = channel.output_frame(FRAME_V0_OVERHEAD).unwrap_err();
assert_eq!(err, ChannelOutError::Compression(CompressorError::Full));
}
#[test]
fn test_channel_out_output() {
let config = RollupConfig::default();
let mut channel = ChannelOut::new(
ChannelId::default(),
&config,
MockCompressor { compressed: Some(Default::default()), ..Default::default() },
);
let frame = channel.output_frame(FRAME_V0_OVERHEAD).unwrap();
assert_eq!(frame.id, ChannelId::default());
assert_eq!(frame.number, 0);
assert!(!frame.is_last);
}
#[test]
fn test_channel_out_reset() {
let config = RollupConfig::default();
let mut channel = ChannelOut {
id: ChannelId::default(),
config: &config,
rlp_length: 10,
closed: true,
frame_number: 11,
compressor: MockCompressor::default(),
};
channel.reset();
assert_eq!(channel.rlp_length, 0);
assert_eq!(channel.frame_number, 0);
assert!(channel.id != ChannelId::default());
assert!(!channel.closed);
}
#[test]
fn test_channel_out_ready_bytes_empty() {
let config = RollupConfig::default();
let channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
assert_eq!(channel.ready_bytes(), 0);
}
#[test]
fn test_channel_out_ready_bytes_some() {
let config = RollupConfig::default();
let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
channel.compressor.write(&[1, 2, 3]).unwrap();
assert_eq!(channel.ready_bytes(), 3);
}
#[test]
fn test_channel_out_close() {
let config = RollupConfig::default();
let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
assert!(!channel.closed);
channel.close();
assert!(channel.closed);
}
#[test]
fn test_channel_out_add_batch_closed() {
let config = RollupConfig::default();
let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
channel.close();
let batch = Batch::Single(SingleBatch::default());
assert_eq!(channel.add_batch(batch), Err(ChannelOutError::ChannelClosed));
}
#[test]
fn test_channel_out_empty_span_batch_decode_error() {
let config = RollupConfig::default();
let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
let batch = Batch::Span(SpanBatch::default());
assert_eq!(channel.add_batch(batch), Err(ChannelOutError::BatchEncoding));
}
#[test]
fn test_channel_out_max_rlp_bytes_per_channel() {
let config = RollupConfig::default();
let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
let batch = Batch::Single(SingleBatch::default());
channel.rlp_length = config.max_rlp_bytes_per_channel(batch.timestamp());
assert_eq!(channel.add_batch(batch), Err(ChannelOutError::ExceedsMaxRlpBytesPerChannel));
}
#[test]
fn test_channel_out_add_batch() {
let config = RollupConfig::default();
let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
let batch = Batch::Single(SingleBatch::default());
assert_eq!(channel.add_batch(batch), Ok(()));
}
}