kona_protocol/
channel_out.rs

1//! Contains the `ChannelOut` primitive for Optimism.
2
3use crate::{Batch, ChannelCompressor, ChannelId, CompressorError, Frame};
4use alloc::{vec, vec::Vec};
5use kona_genesis::RollupConfig;
6use rand::{RngCore, SeedableRng, rngs::SmallRng};
7
/// The number of bytes a version-0 frame occupies on top of its data payload.
///
/// Per the OP Stack frame format this is the sum of the fixed-size frame fields:
/// the channel id (16 bytes), the frame number (2 bytes), the frame data length
/// (4 bytes), and the `is_last` flag (1 byte).
const FRAME_V0_OVERHEAD: usize = 16 + 2 + 4 + 1;
10
11/// An error returned by the [ChannelOut] when adding single batches.
12#[derive(Debug, Clone, PartialEq, thiserror::Error)]
13pub enum ChannelOutError {
14    /// The channel is closed.
15    #[error("The channel is already closed")]
16    ChannelClosed,
17    /// The max frame size is too small.
18    #[error("The max frame size is too small")]
19    MaxFrameSizeTooSmall,
20    /// Missing compressed batch data.
21    #[error("Missing compressed batch data")]
22    MissingData,
23    /// An error from compression.
24    #[error("Error from compression")]
25    Compression(#[from] CompressorError),
26    /// An error encoding the `Batch`.
27    #[error("Error encoding the batch")]
28    BatchEncoding,
29    /// The encoded batch exceeds the max RLP bytes per channel.
30    #[error("The encoded batch exceeds the max RLP bytes per channel")]
31    ExceedsMaxRlpBytesPerChannel,
32}
33
34/// [ChannelOut] constructs a channel from compressed, encoded batch data.
35#[allow(missing_debug_implementations)]
36pub struct ChannelOut<'a, C>
37where
38    C: ChannelCompressor,
39{
40    /// The unique identifier for the channel.
41    pub id: ChannelId,
42    /// A reference to the [RollupConfig] used to
43    /// check the max RLP bytes per channel when
44    /// encoding and accepting the
45    pub config: &'a RollupConfig,
46    /// The rlp length of the channel.
47    pub rlp_length: u64,
48    /// Whether the channel is closed.
49    pub closed: bool,
50    /// The frame number.
51    pub frame_number: u16,
52    /// The compressor.
53    pub compressor: C,
54}
55
56impl<'a, C> ChannelOut<'a, C>
57where
58    C: ChannelCompressor,
59{
60    /// Creates a new [ChannelOut] with the given [ChannelId].
61    pub const fn new(id: ChannelId, config: &'a RollupConfig, compressor: C) -> Self {
62        Self { id, config, rlp_length: 0, frame_number: 0, closed: false, compressor }
63    }
64
65    /// Resets the [ChannelOut] to its initial state.
66    pub fn reset(&mut self) {
67        self.rlp_length = 0;
68        self.frame_number = 0;
69        self.closed = false;
70        self.compressor.reset();
71        // `getrandom` isn't available for wasm and risc targets
72        // Thread-based RNGs are not available for no_std
73        // So we must use a seeded RNG.
74        let mut small_rng = SmallRng::seed_from_u64(43);
75        SmallRng::fill_bytes(&mut small_rng, &mut self.id);
76    }
77
78    /// Accepts the given [crate::Batch] data into the [ChannelOut], compressing it
79    /// into frames.
80    pub fn add_batch(&mut self, batch: Batch) -> Result<(), ChannelOutError> {
81        if self.closed {
82            return Err(ChannelOutError::ChannelClosed);
83        }
84
85        // Encode the batch.
86        let mut buf = vec![];
87        batch.encode(&mut buf).map_err(|_| ChannelOutError::BatchEncoding)?;
88
89        // Validate that the RLP length is within the channel's limits.
90        let max_rlp_bytes_per_channel = self.config.max_rlp_bytes_per_channel(batch.timestamp());
91        if self.rlp_length + buf.len() as u64 > max_rlp_bytes_per_channel {
92            return Err(ChannelOutError::ExceedsMaxRlpBytesPerChannel);
93        }
94
95        self.compressor.write(&buf)?;
96
97        Ok(())
98    }
99
100    /// Returns the total amount of rlp-encoded input bytes.
101    pub const fn input_bytes(&self) -> u64 {
102        self.rlp_length
103    }
104
105    /// Returns the number of bytes ready to be output to a frame.
106    pub fn ready_bytes(&self) -> usize {
107        self.compressor.len()
108    }
109
110    /// Flush the internal compressor.
111    pub fn flush(&mut self) -> Result<(), ChannelOutError> {
112        self.compressor.flush()?;
113        Ok(())
114    }
115
116    /// Closes the channel if not already closed.
117    pub fn close(&mut self) {
118        self.closed = true;
119    }
120
121    /// Outputs a [Frame] from the [ChannelOut].
122    pub fn output_frame(&mut self, max_size: usize) -> Result<Frame, ChannelOutError> {
123        if max_size < FRAME_V0_OVERHEAD {
124            return Err(ChannelOutError::MaxFrameSizeTooSmall);
125        }
126
127        // Construct an empty frame.
128        let mut frame =
129            Frame { id: self.id, number: self.frame_number, is_last: self.closed, data: vec![] };
130
131        let mut max_size = max_size - FRAME_V0_OVERHEAD;
132        if max_size > self.ready_bytes() {
133            max_size = self.ready_bytes();
134        }
135
136        // Read `max_size` bytes from the compressed data.
137        let mut data = Vec::with_capacity(max_size);
138        self.compressor.read(&mut data).map_err(ChannelOutError::Compression)?;
139        frame.data.extend_from_slice(data.as_slice());
140
141        // Update the compressed data.
142        self.frame_number += 1;
143        Ok(frame)
144    }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{CompressorWriter, SingleBatch, SpanBatch, test_utils::MockCompressor};

    /// Builds an open channel with the default id around the given compressor.
    fn channel_with(
        config: &RollupConfig,
        compressor: MockCompressor,
    ) -> ChannelOut<'_, MockCompressor> {
        ChannelOut::new(ChannelId::default(), config, compressor)
    }

    /// Builds an open channel with the default id and a default mock compressor.
    fn default_channel(config: &RollupConfig) -> ChannelOut<'_, MockCompressor> {
        channel_with(config, MockCompressor::default())
    }

    #[test]
    fn test_output_frame_max_size_too_small() {
        let config = RollupConfig::default();
        let mut channel = default_channel(&config);

        let result = channel.output_frame(0);
        assert_eq!(result, Err(ChannelOutError::MaxFrameSizeTooSmall));
    }

    #[test]
    fn test_channel_out_output_frame_no_data() {
        let config = RollupConfig::default();
        let compressor = MockCompressor { read_error: true, compressed: Some(Default::default()) };
        let mut channel = channel_with(&config, compressor);

        // The mock is configured to fail on read; the failure must surface wrapped.
        let result = channel.output_frame(FRAME_V0_OVERHEAD);
        assert_eq!(result.unwrap_err(), ChannelOutError::Compression(CompressorError::Full));
    }

    #[test]
    fn test_channel_out_output() {
        let config = RollupConfig::default();
        let compressor =
            MockCompressor { compressed: Some(Default::default()), ..Default::default() };
        let mut channel = channel_with(&config, compressor);

        let frame = channel.output_frame(FRAME_V0_OVERHEAD).unwrap();
        assert_eq!(frame.id, ChannelId::default());
        assert_eq!(frame.number, 0);
        // The channel was never closed, so the frame must not be flagged as last.
        assert!(!frame.is_last);
    }

    #[test]
    fn test_channel_out_reset() {
        let config = RollupConfig::default();
        let mut channel = ChannelOut {
            id: ChannelId::default(),
            config: &config,
            rlp_length: 10,
            closed: true,
            frame_number: 11,
            compressor: MockCompressor::default(),
        };

        channel.reset();

        assert_eq!(channel.rlp_length, 0);
        assert_eq!(channel.frame_number, 0);
        assert!(!channel.closed);
        // `reset` refills the id from a seeded RNG, so the outcome is deterministic;
        // this pins that the id is no longer the default one.
        assert!(channel.id != ChannelId::default());
    }

    #[test]
    fn test_channel_out_ready_bytes_empty() {
        let config = RollupConfig::default();
        let channel = default_channel(&config);

        assert_eq!(channel.ready_bytes(), 0);
    }

    #[test]
    fn test_channel_out_ready_bytes_some() {
        let config = RollupConfig::default();
        let mut channel = default_channel(&config);

        channel.compressor.write(&[1, 2, 3]).unwrap();
        assert_eq!(channel.ready_bytes(), 3);
    }

    #[test]
    fn test_channel_out_close() {
        let config = RollupConfig::default();
        let mut channel = default_channel(&config);
        assert!(!channel.closed);

        channel.close();
        assert!(channel.closed);
    }

    #[test]
    fn test_channel_out_add_batch_closed() {
        let config = RollupConfig::default();
        let mut channel = default_channel(&config);
        channel.close();

        let result = channel.add_batch(Batch::Single(SingleBatch::default()));
        assert_eq!(result, Err(ChannelOutError::ChannelClosed));
    }

    #[test]
    fn test_channel_out_empty_span_batch_decode_error() {
        let config = RollupConfig::default();
        let mut channel = default_channel(&config);

        // A default span batch is expected to fail encoding.
        let result = channel.add_batch(Batch::Span(SpanBatch::default()));
        assert_eq!(result, Err(ChannelOutError::BatchEncoding));
    }

    #[test]
    fn test_channel_out_max_rlp_bytes_per_channel() {
        let config = RollupConfig::default();
        let mut channel = default_channel(&config);

        // Start the channel exactly at its limit so that any further batch overflows it.
        let batch = Batch::Single(SingleBatch::default());
        channel.rlp_length = config.max_rlp_bytes_per_channel(batch.timestamp());

        let result = channel.add_batch(batch);
        assert_eq!(result, Err(ChannelOutError::ExceedsMaxRlpBytesPerChannel));
    }

    #[test]
    fn test_channel_out_add_batch() {
        let config = RollupConfig::default();
        let mut channel = default_channel(&config);

        let result = channel.add_batch(Batch::Single(SingleBatch::default()));
        assert_eq!(result, Ok(()));
    }
}