1use crate::{Batch, ChannelCompressor, ChannelId, CompressorError, Frame};
4use alloc::{vec, vec::Vec};
5use kona_genesis::RollupConfig;
6use rand::{RngCore, SeedableRng, rngs::SmallRng};
7
8const FRAME_V0_OVERHEAD: usize = 23;
10
11#[derive(Debug, Clone, PartialEq, thiserror::Error)]
13pub enum ChannelOutError {
14 #[error("The channel is already closed")]
16 ChannelClosed,
17 #[error("The max frame size is too small")]
19 MaxFrameSizeTooSmall,
20 #[error("Missing compressed batch data")]
22 MissingData,
23 #[error("Error from compression")]
25 Compression(#[from] CompressorError),
26 #[error("Error encoding the batch")]
28 BatchEncoding,
29 #[error("The encoded batch exceeds the max RLP bytes per channel")]
31 ExceedsMaxRlpBytesPerChannel,
32}
33
34#[allow(missing_debug_implementations)]
36pub struct ChannelOut<'a, C>
37where
38 C: ChannelCompressor,
39{
40 pub id: ChannelId,
42 pub config: &'a RollupConfig,
46 pub rlp_length: u64,
48 pub closed: bool,
50 pub frame_number: u16,
52 pub compressor: C,
54}
55
56impl<'a, C> ChannelOut<'a, C>
57where
58 C: ChannelCompressor,
59{
60 pub const fn new(id: ChannelId, config: &'a RollupConfig, compressor: C) -> Self {
62 Self { id, config, rlp_length: 0, frame_number: 0, closed: false, compressor }
63 }
64
65 pub fn reset(&mut self) {
67 self.rlp_length = 0;
68 self.frame_number = 0;
69 self.closed = false;
70 self.compressor.reset();
71 let mut small_rng = SmallRng::seed_from_u64(43);
75 SmallRng::fill_bytes(&mut small_rng, &mut self.id);
76 }
77
78 pub fn add_batch(&mut self, batch: Batch) -> Result<(), ChannelOutError> {
81 if self.closed {
82 return Err(ChannelOutError::ChannelClosed);
83 }
84
85 let mut buf = vec![];
87 batch.encode(&mut buf).map_err(|_| ChannelOutError::BatchEncoding)?;
88
89 let max_rlp_bytes_per_channel = self.config.max_rlp_bytes_per_channel(batch.timestamp());
91 if self.rlp_length + buf.len() as u64 > max_rlp_bytes_per_channel {
92 return Err(ChannelOutError::ExceedsMaxRlpBytesPerChannel);
93 }
94
95 self.compressor.write(&buf)?;
96
97 Ok(())
98 }
99
100 pub const fn input_bytes(&self) -> u64 {
102 self.rlp_length
103 }
104
105 pub fn ready_bytes(&self) -> usize {
107 self.compressor.len()
108 }
109
110 pub fn flush(&mut self) -> Result<(), ChannelOutError> {
112 self.compressor.flush()?;
113 Ok(())
114 }
115
116 pub fn close(&mut self) {
118 self.closed = true;
119 }
120
121 pub fn output_frame(&mut self, max_size: usize) -> Result<Frame, ChannelOutError> {
123 if max_size < FRAME_V0_OVERHEAD {
124 return Err(ChannelOutError::MaxFrameSizeTooSmall);
125 }
126
127 let mut frame =
129 Frame { id: self.id, number: self.frame_number, is_last: self.closed, data: vec![] };
130
131 let mut max_size = max_size - FRAME_V0_OVERHEAD;
132 if max_size > self.ready_bytes() {
133 max_size = self.ready_bytes();
134 }
135
136 let mut data = Vec::with_capacity(max_size);
138 self.compressor.read(&mut data).map_err(ChannelOutError::Compression)?;
139 frame.data.extend_from_slice(data.as_slice());
140
141 self.frame_number += 1;
143 Ok(frame)
144 }
145}
146
147#[cfg(test)]
148mod tests {
149 use super::*;
150 use crate::{CompressorWriter, SingleBatch, SpanBatch, test_utils::MockCompressor};
151
152 #[test]
153 fn test_output_frame_max_size_too_small() {
154 let config = RollupConfig::default();
155 let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
156 assert_eq!(channel.output_frame(0), Err(ChannelOutError::MaxFrameSizeTooSmall));
157 }
158
159 #[test]
160 fn test_channel_out_output_frame_no_data() {
161 let config = RollupConfig::default();
162 let mut channel = ChannelOut::new(
163 ChannelId::default(),
164 &config,
165 MockCompressor { read_error: true, compressed: Some(Default::default()) },
166 );
167 let err = channel.output_frame(FRAME_V0_OVERHEAD).unwrap_err();
168 assert_eq!(err, ChannelOutError::Compression(CompressorError::Full));
169 }
170
171 #[test]
172 fn test_channel_out_output() {
173 let config = RollupConfig::default();
174 let mut channel = ChannelOut::new(
175 ChannelId::default(),
176 &config,
177 MockCompressor { compressed: Some(Default::default()), ..Default::default() },
178 );
179 let frame = channel.output_frame(FRAME_V0_OVERHEAD).unwrap();
180 assert_eq!(frame.id, ChannelId::default());
181 assert_eq!(frame.number, 0);
182 assert!(!frame.is_last);
183 }
184
185 #[test]
186 fn test_channel_out_reset() {
187 let config = RollupConfig::default();
188 let mut channel = ChannelOut {
189 id: ChannelId::default(),
190 config: &config,
191 rlp_length: 10,
192 closed: true,
193 frame_number: 11,
194 compressor: MockCompressor::default(),
195 };
196 channel.reset();
197 assert_eq!(channel.rlp_length, 0);
198 assert_eq!(channel.frame_number, 0);
199 assert!(channel.id != ChannelId::default());
203 assert!(!channel.closed);
204 }
205
206 #[test]
207 fn test_channel_out_ready_bytes_empty() {
208 let config = RollupConfig::default();
209 let channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
210 assert_eq!(channel.ready_bytes(), 0);
211 }
212
213 #[test]
214 fn test_channel_out_ready_bytes_some() {
215 let config = RollupConfig::default();
216 let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
217 channel.compressor.write(&[1, 2, 3]).unwrap();
218 assert_eq!(channel.ready_bytes(), 3);
219 }
220
221 #[test]
222 fn test_channel_out_close() {
223 let config = RollupConfig::default();
224 let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
225 assert!(!channel.closed);
226
227 channel.close();
228 assert!(channel.closed);
229 }
230
231 #[test]
232 fn test_channel_out_add_batch_closed() {
233 let config = RollupConfig::default();
234 let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
235 channel.close();
236
237 let batch = Batch::Single(SingleBatch::default());
238 assert_eq!(channel.add_batch(batch), Err(ChannelOutError::ChannelClosed));
239 }
240
241 #[test]
242 fn test_channel_out_empty_span_batch_decode_error() {
243 let config = RollupConfig::default();
244 let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
245
246 let batch = Batch::Span(SpanBatch::default());
247 assert_eq!(channel.add_batch(batch), Err(ChannelOutError::BatchEncoding));
248 }
249
250 #[test]
251 fn test_channel_out_max_rlp_bytes_per_channel() {
252 let config = RollupConfig::default();
253 let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
254
255 let batch = Batch::Single(SingleBatch::default());
256 channel.rlp_length = config.max_rlp_bytes_per_channel(batch.timestamp());
257
258 assert_eq!(channel.add_batch(batch), Err(ChannelOutError::ExceedsMaxRlpBytesPerChannel));
259 }
260
261 #[test]
262 fn test_channel_out_add_batch() {
263 let config = RollupConfig::default();
264 let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default());
265
266 let batch = Batch::Single(SingleBatch::default());
267 assert_eq!(channel.add_batch(batch), Ok(()));
268 }
269}