kona_protocol/
channel.rs

1//! Channel Types
2
3use alloc::vec::Vec;
4use alloy_primitives::{Bytes, map::HashMap};
5
6use crate::{BlockInfo, Frame};
7
8/// [`CHANNEL_ID_LENGTH`] is the length of the channel ID.
9pub const CHANNEL_ID_LENGTH: usize = 16;
10
11/// [`ChannelId`] is an opaque identifier for a channel.
12pub type ChannelId = [u8; CHANNEL_ID_LENGTH];
13
14/// [`MAX_RLP_BYTES_PER_CHANNEL`] is the maximum amount of bytes that will be read from
15/// a channel. This limit is set when decoding the RLP.
16pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000;
17
18/// [`FJORD_MAX_RLP_BYTES_PER_CHANNEL`] is the maximum amount of bytes that will be read from
19/// a channel when the Fjord Hardfork is activated. This limit is set when decoding the RLP.
20pub const FJORD_MAX_RLP_BYTES_PER_CHANNEL: u64 = 100_000_000;
21
22/// An error returned when adding a frame to a channel.
23#[derive(Debug, thiserror::Error, Clone, Copy, PartialEq, Eq, Hash)]
24pub enum ChannelError {
25    /// The frame id does not match the channel id.
26    #[error("Frame id does not match channel id")]
27    FrameIdMismatch,
28    /// The channel is closed.
29    #[error("Channel is closed")]
30    ChannelClosed,
31    /// The frame number is already in the channel.
32    #[error("Frame number {0} already exists")]
33    FrameNumberExists(usize),
34    /// The frame number is beyond the end frame.
35    #[error("Frame number {0} is beyond end frame")]
36    FrameBeyondEndFrame(usize),
37}
38
39/// A Channel is a set of batches that are split into at least one, but possibly multiple frames.
40///
41/// Frames are allowed to be ingested out of order.
42/// Each frame is ingested one by one. Once a frame with `closed` is added to the channel, the
43/// channel may mark itself as ready for reading once all intervening frames have been added
44#[derive(Debug, Clone, Default)]
45pub struct Channel {
46    /// The unique identifier for this channel
47    id: ChannelId,
48    /// The block that the channel is currently open at
49    open_block: BlockInfo,
50    /// Estimated memory size, used to drop the channel if we have too much data
51    estimated_size: usize,
52    /// True if the last frame has been buffered
53    closed: bool,
54    /// The highest frame number that has been ingested
55    highest_frame_number: u16,
56    /// The frame number of the frame where `is_last` is true
57    /// No other frame number may be higher than this
58    last_frame_number: u16,
59    /// Store a map of frame number to frame for constant time ordering
60    inputs: HashMap<u16, Frame>,
61    /// The highest L1 inclusion block that a frame was included in
62    highest_l1_inclusion_block: BlockInfo,
63}
64
65impl Channel {
66    /// Create a new [`Channel`] with the given [`ChannelId`] and [`BlockInfo`].
67    pub fn new(id: ChannelId, open_block: BlockInfo) -> Self {
68        Self { id, open_block, inputs: HashMap::default(), ..Default::default() }
69    }
70
71    /// Returns the current [`ChannelId`] for the channel.
72    pub const fn id(&self) -> ChannelId {
73        self.id
74    }
75
76    /// Returns the number of frames ingested.
77    pub fn len(&self) -> usize {
78        self.inputs.len()
79    }
80
81    /// Returns if the channel is empty.
82    pub fn is_empty(&self) -> bool {
83        self.inputs.is_empty()
84    }
85
86    /// Add a frame to the channel.
87    ///
88    /// ## Takes
89    /// - `frame`: The frame to add to the channel
90    /// - `l1_inclusion_block`: The block that the frame was included in
91    ///
92    /// ## Returns
93    /// - `Ok(()):` If the frame was successfully buffered
94    /// - `Err(_):` If the frame was invalid
95    pub fn add_frame(
96        &mut self,
97        frame: Frame,
98        l1_inclusion_block: BlockInfo,
99    ) -> Result<(), ChannelError> {
100        // Ensure that the frame ID is equal to the channel ID.
101        if frame.id != self.id {
102            return Err(ChannelError::FrameIdMismatch);
103        }
104        if frame.is_last && self.closed {
105            return Err(ChannelError::ChannelClosed);
106        }
107        if self.inputs.contains_key(&frame.number) {
108            return Err(ChannelError::FrameNumberExists(frame.number as usize));
109        }
110        if self.closed && frame.number >= self.last_frame_number {
111            return Err(ChannelError::FrameBeyondEndFrame(frame.number as usize));
112        }
113
114        // Guaranteed to succeed at this point. Update the channel state.
115        if frame.is_last {
116            self.last_frame_number = frame.number;
117            self.closed = true;
118
119            // Prune frames with a higher number than the last frame number when we receive a
120            // closing frame.
121            if self.last_frame_number < self.highest_frame_number {
122                self.inputs.retain(|id, frame| {
123                    self.estimated_size -= frame.size();
124                    *id < self.last_frame_number
125                });
126                self.highest_frame_number = self.last_frame_number;
127            }
128        }
129
130        // Update the highest frame number.
131        if frame.number > self.highest_frame_number {
132            self.highest_frame_number = frame.number;
133        }
134
135        if self.highest_l1_inclusion_block.number < l1_inclusion_block.number {
136            self.highest_l1_inclusion_block = l1_inclusion_block;
137        }
138
139        self.estimated_size += frame.size();
140        self.inputs.insert(frame.number, frame);
141        Ok(())
142    }
143
144    /// Returns the block number of the L1 block that contained the first [`Frame`] in this channel.
145    pub const fn open_block_number(&self) -> u64 {
146        self.open_block.number
147    }
148
149    /// Returns the estimated size of the channel including [`Frame`] overhead.
150    pub const fn size(&self) -> usize {
151        self.estimated_size
152    }
153
154    /// Returns `true` if the channel is ready to be read.
155    pub fn is_ready(&self) -> bool {
156        // Must have buffered the last frame before the channel is ready.
157        if !self.closed {
158            return false;
159        }
160
161        // Must have the possibility of contiguous frames.
162        if self.inputs.len() != (self.last_frame_number + 1) as usize {
163            return false;
164        }
165
166        // Check for contiguous frames.
167        for i in 0..=self.last_frame_number {
168            if !self.inputs.contains_key(&i) {
169                return false;
170            }
171        }
172
173        true
174    }
175
176    /// Returns all of the channel's [`Frame`]s concatenated together.
177    ///
178    /// ## Returns
179    ///
180    /// - `Some(Bytes)`: The concatenated frame data
181    /// - `None`: If the channel is missing frames
182    pub fn frame_data(&self) -> Option<Bytes> {
183        if self.is_empty() {
184            return None;
185        }
186        let mut data = Vec::with_capacity(self.size());
187        (0..=self.last_frame_number).try_for_each(|i| {
188            let frame = self.inputs.get(&i)?;
189            data.extend_from_slice(&frame.data);
190            Some(())
191        })?;
192        Some(data.into())
193    }
194}
195
196#[cfg(test)]
197mod test {
198    use super::*;
199    use alloc::{
200        string::{String, ToString},
201        vec,
202    };
203
204    struct FrameValidityTestCase {
205        #[allow(dead_code)]
206        name: String,
207        frames: Vec<Frame>,
208        should_error: Vec<bool>,
209        sizes: Vec<u64>,
210        frame_data: Option<Bytes>,
211    }
212
213    fn run_frame_validity_test(test_case: FrameValidityTestCase) {
214        // #[cfg(feature = "std")]
215        // println!("Running test: {}", test_case.name);
216
217        let id = [0xFF; 16];
218        let block = BlockInfo::default();
219        let mut channel = Channel::new(id, block);
220
221        if test_case.frames.len() != test_case.should_error.len() ||
222            test_case.frames.len() != test_case.sizes.len()
223        {
224            panic!("Test case length mismatch");
225        }
226
227        for (i, frame) in test_case.frames.iter().enumerate() {
228            let result = channel.add_frame(frame.clone(), block);
229            if test_case.should_error[i] {
230                assert!(result.is_err());
231            } else {
232                assert!(result.is_ok());
233            }
234            assert_eq!(channel.size(), test_case.sizes[i] as usize);
235        }
236
237        if test_case.frame_data.is_some() {
238            assert_eq!(channel.frame_data().unwrap(), test_case.frame_data.unwrap());
239        }
240    }
241
242    #[test]
243    fn test_channel_accessors() {
244        let id = [0xFF; 16];
245        let block = BlockInfo { number: 42, timestamp: 0, ..Default::default() };
246        let channel = Channel::new(id, block);
247
248        assert_eq!(channel.id(), id);
249        assert_eq!(channel.open_block_number(), block.number);
250        assert_eq!(channel.size(), 0);
251        assert_eq!(channel.len(), 0);
252        assert!(channel.is_empty());
253        assert!(!channel.is_ready());
254    }
255
256    #[test]
257    fn test_frame_validity() {
258        let id = [0xFF; 16];
259        let test_cases = [
260            FrameValidityTestCase {
261                name: "wrong channel".to_string(),
262                frames: vec![Frame { id: [0xEE; 16], ..Default::default() }],
263                should_error: vec![true],
264                sizes: vec![0],
265                frame_data: None,
266            },
267            FrameValidityTestCase {
268                name: "double close".to_string(),
269                frames: vec![
270                    Frame { id, is_last: true, number: 2, data: b"four".to_vec() },
271                    Frame { id, is_last: true, number: 1, ..Default::default() },
272                ],
273                should_error: vec![false, true],
274                sizes: vec![204, 204],
275                frame_data: None,
276            },
277            FrameValidityTestCase {
278                name: "duplicate frame".to_string(),
279                frames: vec![
280                    Frame { id, number: 2, data: b"four".to_vec(), ..Default::default() },
281                    Frame { id, number: 2, data: b"seven".to_vec(), ..Default::default() },
282                ],
283                should_error: vec![false, true],
284                sizes: vec![204, 204],
285                frame_data: None,
286            },
287            FrameValidityTestCase {
288                name: "duplicate closing frames".to_string(),
289                frames: vec![
290                    Frame { id, number: 2, is_last: true, data: b"four".to_vec() },
291                    Frame { id, number: 2, is_last: true, data: b"seven".to_vec() },
292                ],
293                should_error: vec![false, true],
294                sizes: vec![204, 204],
295                frame_data: None,
296            },
297            FrameValidityTestCase {
298                name: "frame past closing".to_string(),
299                frames: vec![
300                    Frame { id, number: 2, is_last: true, data: b"four".to_vec() },
301                    Frame { id, number: 10, data: b"seven".to_vec(), ..Default::default() },
302                ],
303                should_error: vec![false, true],
304                sizes: vec![204, 204],
305                frame_data: None,
306            },
307            FrameValidityTestCase {
308                name: "prune after close frame".to_string(),
309                frames: vec![
310                    Frame { id, number: 0, is_last: false, data: b"seven".to_vec() },
311                    Frame { id, number: 1, is_last: true, data: b"four".to_vec() },
312                ],
313                should_error: vec![false, false],
314                sizes: vec![205, 409],
315                frame_data: Some(b"sevenfour".to_vec().into()),
316            },
317            FrameValidityTestCase {
318                name: "multiple valid frames, no data".to_string(),
319                frames: vec![
320                    Frame { id, number: 1, data: b"seven__".to_vec(), ..Default::default() },
321                    Frame { id, number: 2, data: b"four".to_vec(), ..Default::default() },
322                ],
323                should_error: vec![false, false],
324                sizes: vec![207, 411],
325                // Notice: this is none because there is no frame at index 0,
326                //         which causes the frame_data to short-circuit to None.
327                frame_data: None,
328            },
329            FrameValidityTestCase {
330                name: "multiple valid frames".to_string(),
331                frames: vec![
332                    Frame { id, number: 0, data: b"seven__".to_vec(), ..Default::default() },
333                    Frame { id, number: 1, data: b"four".to_vec(), ..Default::default() },
334                ],
335                should_error: vec![false, false],
336                sizes: vec![207, 411],
337                frame_data: Some(b"seven__".to_vec().into()),
338            },
339        ];
340
341        test_cases.into_iter().for_each(run_frame_validity_test);
342    }
343}