kona_derive/stages/
frame_queue.rs

1//! This module contains the [FrameQueue] stage of the derivation pipeline.
2
3use crate::{
4    NextFrameProvider, OriginAdvancer, OriginProvider, PipelineError, PipelineResult, Signal,
5    SignalReceiver,
6};
7use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
8use alloy_primitives::Bytes;
9use async_trait::async_trait;
10use core::fmt::Debug;
11use kona_genesis::RollupConfig;
12use kona_protocol::{BlockInfo, Frame};
13
14/// Provides data frames for the [`FrameQueue`] stage.
15#[async_trait]
16pub trait FrameQueueProvider {
17    /// An item that can be converted into a byte array.
18    type Item: Into<Bytes>;
19
20    /// Retrieves the next data item from the L1 retrieval stage.
21    /// If there is data, it pushes it into the next stage.
22    /// If there is no data, it returns an error.
23    async fn next_data(&mut self) -> PipelineResult<Self::Item>;
24}
25
26/// The [`FrameQueue`] stage of the derivation pipeline.
27/// This stage takes the output of the [`L1Retrieval`] stage and parses it into frames.
28///
29/// [`L1Retrieval`]: crate::stages::L1Retrieval
30#[derive(Debug)]
31pub struct FrameQueue<P>
32where
33    P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
34{
35    /// The previous stage in the pipeline.
36    pub prev: P,
37    /// The current frame queue.
38    queue: VecDeque<Frame>,
39    /// The rollup config.
40    rollup_config: Arc<RollupConfig>,
41}
42
43impl<P> FrameQueue<P>
44where
45    P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
46{
47    /// Create a new [`FrameQueue`] stage with the given previous [`L1Retrieval`] stage.
48    ///
49    /// [`L1Retrieval`]: crate::stages::L1Retrieval
50    pub const fn new(prev: P, cfg: Arc<RollupConfig>) -> Self {
51        Self { prev, queue: VecDeque::new(), rollup_config: cfg }
52    }
53
54    /// Returns if holocene is active.
55    pub fn is_holocene_active(&self, origin: BlockInfo) -> bool {
56        self.rollup_config.is_holocene_active(origin.timestamp)
57    }
58
59    /// Prunes frames if Holocene is active.
60    pub fn prune(&mut self, origin: BlockInfo) {
61        if !self.is_holocene_active(origin) {
62            return;
63        }
64
65        let mut i = 0;
66        while i < self.queue.len() - 1 {
67            let prev_frame = &self.queue[i];
68            let next_frame = &self.queue[i + 1];
69            let extends_channel = prev_frame.id == next_frame.id;
70
71            // If the frames are in the same channel, and the frame numbers are not sequential,
72            // drop the next frame.
73            if extends_channel && prev_frame.number + 1 != next_frame.number {
74                self.queue.remove(i + 1);
75                continue;
76            }
77
78            // If the frames are in the same channel, and the previous is last, drop the next frame.
79            if extends_channel && prev_frame.is_last {
80                self.queue.remove(i + 1);
81                continue;
82            }
83
84            // If the frames are in different channels, the next frame must be first.
85            if !extends_channel && next_frame.number != 0 {
86                self.queue.remove(i + 1);
87                continue;
88            }
89
90            // If the frames are in different channels, and the current channel is not last, walk
91            // back the channel and drop all prev frames.
92            if !extends_channel && !prev_frame.is_last && next_frame.number == 0 {
93                // Find the index of the first frame in the queue with the same channel ID
94                // as the previous frame.
95                let first_frame =
96                    self.queue.iter().position(|f| f.id == prev_frame.id).expect("infallible");
97
98                // Drain all frames from the previous channel.
99                let drained = self.queue.drain(first_frame..=i);
100                i = i.saturating_sub(drained.len());
101                continue;
102            }
103
104            i += 1;
105        }
106    }
107
108    /// Loads more frames into the [`FrameQueue`].
109    pub async fn load_frames(&mut self) -> PipelineResult<()> {
110        // Skip loading frames if the queue is not empty.
111        if !self.queue.is_empty() {
112            return Ok(());
113        }
114
115        let data = match self.prev.next_data().await {
116            Ok(data) => data,
117            Err(e) => {
118                debug!(target: "frame_queue", "Failed to retrieve data: {:?}", e);
119                // SAFETY: Bubble up potential EOF error without wrapping.
120                return Err(e);
121            }
122        };
123
124        let Ok(frames) = Frame::parse_frames(&data.into()) else {
125            // There may be more frames in the queue for the
126            // pipeline to advance, so don't return an error here.
127            error!(target: "frame_queue", "Failed to parse frames from data.");
128            return Ok(());
129        };
130
131        // Optimistically extend the queue with the new frames.
132        self.queue.extend(frames);
133
134        // Update metrics with last frame count
135        kona_macros::set!(
136            gauge,
137            crate::metrics::Metrics::PIPELINE_FRAME_QUEUE_BUFFER,
138            self.queue.len() as f64
139        );
140        let queue_size = self.queue.iter().map(|f| f.size()).sum::<usize>() as f64;
141        kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_FRAME_QUEUE_MEM, queue_size);
142
143        // Prune frames if Holocene is active.
144        let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;
145        self.prune(origin);
146
147        Ok(())
148    }
149}
150
151#[async_trait]
152impl<P> OriginAdvancer for FrameQueue<P>
153where
154    P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
155{
156    async fn advance_origin(&mut self) -> PipelineResult<()> {
157        self.prev.advance_origin().await
158    }
159}
160
161#[async_trait]
162impl<P> NextFrameProvider for FrameQueue<P>
163where
164    P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
165{
166    async fn next_frame(&mut self) -> PipelineResult<Frame> {
167        self.load_frames().await?;
168
169        // If we did not add more frames but still have more data, retry this function.
170        if self.queue.is_empty() {
171            trace!(target: "frame_queue", "Queue is empty after fetching data. Retrying next_frame.");
172            return Err(PipelineError::NotEnoughData.temp());
173        }
174
175        Ok(self.queue.pop_front().expect("Frame queue impossibly empty"))
176    }
177}
178
179impl<P> OriginProvider for FrameQueue<P>
180where
181    P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
182{
183    fn origin(&self) -> Option<BlockInfo> {
184        self.prev.origin()
185    }
186}
187
188#[async_trait]
189impl<P> SignalReceiver for FrameQueue<P>
190where
191    P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
192{
193    async fn signal(&mut self, signal: Signal) -> PipelineResult<()> {
194        self.prev.signal(signal).await?;
195        self.queue = VecDeque::default();
196        Ok(())
197    }
198}
199
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::{test_utils::TestFrameQueueProvider, types::ResetSignal};
    use alloc::vec;
    use kona_genesis::HardForkConfig;

    /// Builds a [`RollupConfig`] whose Holocene activation time is `0`.
    fn holocene_config() -> RollupConfig {
        RollupConfig {
            hardforks: HardForkConfig { holocene_time: Some(0), ..Default::default() },
            ..Default::default()
        }
    }

    /// Shared driver for the Holocene tests: builds a frame queue over `input` with the
    /// Holocene config and a default origin, registers `expected` as the expected frames,
    /// then runs the builder's `holocene_active(true)` and `next_frames` checks.
    async fn assert_holocene_output(input: &[Frame], expected: &[Frame]) {
        let cfg = holocene_config();
        let harness = crate::test_utils::FrameQueueBuilder::new()
            .with_rollup_config(&cfg)
            .with_origin(BlockInfo::default())
            .with_expected_frames(expected)
            .with_frames(input)
            .build();
        harness.holocene_active(true);
        harness.next_frames().await;
    }

    #[tokio::test]
    async fn test_frame_queue_reset() {
        let provider = TestFrameQueueProvider::new(vec![]);
        let mut stage = FrameQueue::new(provider, Default::default());
        assert!(!stage.prev.reset);
        stage.signal(ResetSignal::default().signal()).await.unwrap();
        assert_eq!(stage.queue.len(), 0);
        assert!(stage.prev.reset);
    }

    #[tokio::test]
    async fn test_frame_queue_empty_bytes() {
        let mut provider = TestFrameQueueProvider::new(vec![Ok(Bytes::from(vec![0x00]))]);
        provider.set_origin(BlockInfo::default());
        let mut stage = FrameQueue::new(provider, Default::default());
        assert!(!stage.is_holocene_active(BlockInfo::default()));
        let err = stage.next_frame().await.unwrap_err();
        assert_eq!(err, PipelineError::NotEnoughData.temp());
    }

    #[tokio::test]
    async fn test_frame_queue_no_frames_decoded() {
        let mut provider = TestFrameQueueProvider::new(vec![
            Err(PipelineError::Eof.temp()),
            Ok(Bytes::default()),
        ]);
        provider.set_origin(BlockInfo::default());
        let mut stage = FrameQueue::new(provider, Default::default());
        assert!(!stage.is_holocene_active(BlockInfo::default()));
        let err = stage.next_frame().await.unwrap_err();
        assert_eq!(err, PipelineError::NotEnoughData.temp());
    }

    #[tokio::test]
    async fn test_frame_queue_wrong_derivation_version() {
        let harness = crate::test_utils::FrameQueueBuilder::new()
            .with_origin(BlockInfo::default())
            .with_raw_frames(Bytes::from(vec![0x01]))
            .with_expected_err(PipelineError::NotEnoughData.temp())
            .build();
        harness.holocene_active(false);
        harness.next_frames().await;
    }

    #[tokio::test]
    async fn test_frame_queue_frame_too_short() {
        let harness = crate::test_utils::FrameQueueBuilder::new()
            .with_origin(BlockInfo::default())
            .with_raw_frames(Bytes::from(vec![0x00, 0x01]))
            .with_expected_err(PipelineError::NotEnoughData.temp())
            .build();
        harness.holocene_active(false);
        harness.next_frames().await;
    }

    #[tokio::test]
    async fn test_frame_queue_single_frame() {
        let frames = [crate::frame!(0xFF, 0, vec![0xDD; 50], true)];
        let harness = crate::test_utils::FrameQueueBuilder::new()
            .with_expected_frames(&frames)
            .with_origin(BlockInfo::default())
            .with_frames(&frames)
            .build();
        harness.holocene_active(false);
        harness.next_frames().await;
    }

    #[tokio::test]
    async fn test_frame_queue_multiple_frames() {
        let frames = [
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], false),
            crate::frame!(0xFF, 2, vec![0xDD; 50], true),
        ];
        let harness = crate::test_utils::FrameQueueBuilder::new()
            .with_expected_frames(&frames)
            .with_origin(BlockInfo::default())
            .with_frames(&frames)
            .build();
        harness.holocene_active(false);
        harness.next_frames().await;
    }

    #[tokio::test]
    async fn test_frame_queue_missing_origin() {
        let frames = [crate::frame!(0xFF, 0, vec![0xDD; 50], true)];
        let harness = crate::test_utils::FrameQueueBuilder::new()
            .with_expected_frames(&frames)
            .with_frames(&frames)
            .build();
        harness.holocene_active(false);
        harness.missing_origin().await;
    }

    #[tokio::test]
    async fn test_holocene_valid_frames() {
        let frames = [
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], false),
            crate::frame!(0xFF, 2, vec![0xDD; 50], true),
        ];
        assert_holocene_output(&frames, &frames).await;
    }

    #[tokio::test]
    async fn test_holocene_single_frame() {
        let frames = [crate::frame!(0xFF, 1, vec![0xDD; 50], true)];
        assert_holocene_output(&frames, &frames).await;
    }

    #[tokio::test]
    async fn test_holocene_unordered_frames() {
        let frames = [
            // -- First Channel --
            crate::frame!(0xEE, 0, vec![0xDD; 50], false),
            crate::frame!(0xEE, 1, vec![0xDD; 50], false),
            crate::frame!(0xEE, 2, vec![0xDD; 50], true),
            crate::frame!(0xEE, 3, vec![0xDD; 50], false), // Dropped
            // -- Next Channel --
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], true),
        ];
        let expected = [&frames[0..3], &frames[4..]].concat();
        assert_holocene_output(&frames, &expected).await;
    }

    #[tokio::test]
    async fn test_holocene_non_sequential_frames() {
        let frames = [
            // -- First Channel --
            crate::frame!(0xEE, 0, vec![0xDD; 50], false),
            crate::frame!(0xEE, 1, vec![0xDD; 50], false),
            crate::frame!(0xEE, 3, vec![0xDD; 50], true), // Dropped
            crate::frame!(0xEE, 4, vec![0xDD; 50], false), // Dropped
        ];
        assert_holocene_output(&frames, &frames[0..2]).await;
    }

    #[tokio::test]
    async fn test_holocene_unclosed_channel() {
        let frames = [
            // -- First Channel --
            crate::frame!(0xEE, 0, vec![0xDD; 50], false),
            crate::frame!(0xEE, 1, vec![0xDD; 50], false),
            crate::frame!(0xEE, 2, vec![0xDD; 50], false),
            crate::frame!(0xEE, 3, vec![0xDD; 50], false),
            // -- Next Channel --
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], true),
        ];
        assert_holocene_output(&frames, &frames[4..]).await;
    }

    #[tokio::test]
    async fn test_holocene_unstarted_channel() {
        let frames = [
            // -- First Channel --
            crate::frame!(0xDD, 0, vec![0xDD; 50], false),
            crate::frame!(0xDD, 1, vec![0xDD; 50], false),
            crate::frame!(0xDD, 2, vec![0xDD; 50], false),
            crate::frame!(0xDD, 3, vec![0xDD; 50], true),
            // -- Second Channel --
            crate::frame!(0xEE, 1, vec![0xDD; 50], false), // Dropped
            crate::frame!(0xEE, 2, vec![0xDD; 50], true),  // Dropped
            // -- Third Channel --
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], true),
        ];
        let expected = [&frames[0..4], &frames[6..]].concat();
        assert_holocene_output(&frames, &expected).await;
    }

    #[tokio::test]
    async fn test_holocene_unclosed_channel_with_invalid_start() {
        let frames = [
            // -- First Channel --
            crate::frame!(0xEE, 0, vec![0xDD; 50], false),
            crate::frame!(0xEE, 1, vec![0xDD; 50], false),
            crate::frame!(0xEE, 2, vec![0xDD; 50], false),
            crate::frame!(0xEE, 3, vec![0xDD; 50], false),
            // -- Next Channel --
            crate::frame!(0xFF, 1, vec![0xDD; 50], false), // Dropped
            crate::frame!(0xFF, 2, vec![0xDD; 50], true),  // Dropped
        ];
        assert_holocene_output(&frames, &frames[0..4]).await;
    }

    #[tokio::test]
    async fn test_holocene_replace_channel() {
        let frames = [
            // -- First Channel - VALID & CLOSED --
            crate::frame!(0xDD, 0, vec![0xDD; 50], false),
            crate::frame!(0xDD, 1, vec![0xDD; 50], true),
            // -- Second Channel - VALID & NOT CLOSED / DROPPED --
            crate::frame!(0xEE, 0, vec![0xDD; 50], false),
            crate::frame!(0xEE, 1, vec![0xDD; 50], false),
            // -- Third Channel - VALID & CLOSED / REPLACES CHANNEL #2 --
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], true),
        ];
        let expected = [&frames[0..2], &frames[4..]].concat();
        assert_holocene_output(&frames, &expected).await;
    }

    #[tokio::test]
    async fn test_holocene_interleaved_invalid_channel() {
        let frames = [
            // -- First channel is dropped since it is replaced by the second channel --
            // -- Second channel is dropped since it isn't closed --
            crate::frame!(0x01, 0, vec![0xDD; 50], false),
            crate::frame!(0x02, 0, vec![0xDD; 50], false),
            crate::frame!(0x01, 1, vec![0xDD; 50], true),
            crate::frame!(0x02, 1, vec![0xDD; 50], false),
            // -- Third Channel - VALID & CLOSED --
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], true),
        ];
        assert_holocene_output(&frames, &frames[4..]).await;
    }

    #[tokio::test]
    async fn test_holocene_interleaved_valid_channel() {
        let frames = [
            // -- First channel is dropped since it is replaced by the second channel --
            // -- Second channel is successfully closed so it's valid --
            crate::frame!(0x01, 0, vec![0xDD; 50], false),
            crate::frame!(0x02, 0, vec![0xDD; 50], false),
            crate::frame!(0x01, 1, vec![0xDD; 50], true),
            crate::frame!(0x02, 1, vec![0xDD; 50], true),
            // -- Third Channel - VALID & CLOSED --
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], true),
        ];
        let expected = [&frames[1..2], &frames[3..]].concat();
        assert_holocene_output(&frames, &expected).await;
    }
}