1use crate::{
4 NextFrameProvider, OriginAdvancer, OriginProvider, PipelineError, PipelineResult, Signal,
5 SignalReceiver,
6};
7use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
8use alloy_primitives::Bytes;
9use async_trait::async_trait;
10use core::fmt::Debug;
11use kona_genesis::RollupConfig;
12use kona_protocol::{BlockInfo, Frame};
13
14#[async_trait]
16pub trait FrameQueueProvider {
17 type Item: Into<Bytes>;
19
20 async fn next_data(&mut self) -> PipelineResult<Self::Item>;
24}
25
26#[derive(Debug)]
31pub struct FrameQueue<P>
32where
33 P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
34{
35 pub prev: P,
37 queue: VecDeque<Frame>,
39 rollup_config: Arc<RollupConfig>,
41}
42
43impl<P> FrameQueue<P>
44where
45 P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
46{
47 pub const fn new(prev: P, cfg: Arc<RollupConfig>) -> Self {
51 Self { prev, queue: VecDeque::new(), rollup_config: cfg }
52 }
53
54 pub fn is_holocene_active(&self, origin: BlockInfo) -> bool {
56 self.rollup_config.is_holocene_active(origin.timestamp)
57 }
58
59 pub fn prune(&mut self, origin: BlockInfo) {
61 if !self.is_holocene_active(origin) {
62 return;
63 }
64
65 let mut i = 0;
66 while i < self.queue.len() - 1 {
67 let prev_frame = &self.queue[i];
68 let next_frame = &self.queue[i + 1];
69 let extends_channel = prev_frame.id == next_frame.id;
70
71 if extends_channel && prev_frame.number + 1 != next_frame.number {
74 self.queue.remove(i + 1);
75 continue;
76 }
77
78 if extends_channel && prev_frame.is_last {
80 self.queue.remove(i + 1);
81 continue;
82 }
83
84 if !extends_channel && next_frame.number != 0 {
86 self.queue.remove(i + 1);
87 continue;
88 }
89
90 if !extends_channel && !prev_frame.is_last && next_frame.number == 0 {
93 let first_frame =
96 self.queue.iter().position(|f| f.id == prev_frame.id).expect("infallible");
97
98 let drained = self.queue.drain(first_frame..=i);
100 i = i.saturating_sub(drained.len());
101 continue;
102 }
103
104 i += 1;
105 }
106 }
107
108 pub async fn load_frames(&mut self) -> PipelineResult<()> {
110 if !self.queue.is_empty() {
112 return Ok(());
113 }
114
115 let data = match self.prev.next_data().await {
116 Ok(data) => data,
117 Err(e) => {
118 debug!(target: "frame_queue", "Failed to retrieve data: {:?}", e);
119 return Err(e);
121 }
122 };
123
124 let Ok(frames) = Frame::parse_frames(&data.into()) else {
125 error!(target: "frame_queue", "Failed to parse frames from data.");
128 return Ok(());
129 };
130
131 self.queue.extend(frames);
133
134 kona_macros::set!(
136 gauge,
137 crate::metrics::Metrics::PIPELINE_FRAME_QUEUE_BUFFER,
138 self.queue.len() as f64
139 );
140 let queue_size = self.queue.iter().map(|f| f.size()).sum::<usize>() as f64;
141 kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_FRAME_QUEUE_MEM, queue_size);
142
143 let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;
145 self.prune(origin);
146
147 Ok(())
148 }
149}
150
151#[async_trait]
152impl<P> OriginAdvancer for FrameQueue<P>
153where
154 P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
155{
156 async fn advance_origin(&mut self) -> PipelineResult<()> {
157 self.prev.advance_origin().await
158 }
159}
160
161#[async_trait]
162impl<P> NextFrameProvider for FrameQueue<P>
163where
164 P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
165{
166 async fn next_frame(&mut self) -> PipelineResult<Frame> {
167 self.load_frames().await?;
168
169 if self.queue.is_empty() {
171 trace!(target: "frame_queue", "Queue is empty after fetching data. Retrying next_frame.");
172 return Err(PipelineError::NotEnoughData.temp());
173 }
174
175 Ok(self.queue.pop_front().expect("Frame queue impossibly empty"))
176 }
177}
178
179impl<P> OriginProvider for FrameQueue<P>
180where
181 P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
182{
183 fn origin(&self) -> Option<BlockInfo> {
184 self.prev.origin()
185 }
186}
187
188#[async_trait]
189impl<P> SignalReceiver for FrameQueue<P>
190where
191 P: FrameQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
192{
193 async fn signal(&mut self, signal: Signal) -> PipelineResult<()> {
194 self.prev.signal(signal).await?;
195 self.queue = VecDeque::default();
196 Ok(())
197 }
198}
199
/// Unit tests for [`FrameQueue`]: signal handling, undecodable input, plain
/// pass-through before Holocene, and frame pruning once Holocene is active.
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::{
        test_utils::{FrameQueueBuilder, TestFrameQueueProvider},
        types::ResetSignal,
    };
    use alloc::vec;
    use kona_genesis::HardForkConfig;

    /// Rollup configuration with Holocene activating at timestamp `0`.
    fn holocene_config() -> RollupConfig {
        RollupConfig {
            hardforks: HardForkConfig { holocene_time: Some(0), ..Default::default() },
            ..Default::default()
        }
    }

    /// Feeds `input` through a Holocene-enabled queue at the default origin and
    /// checks that exactly `expected` comes out.
    async fn check_holocene(input: &[Frame], expected: &[Frame]) {
        let cfg = holocene_config();
        let harness = FrameQueueBuilder::new()
            .with_rollup_config(&cfg)
            .with_origin(BlockInfo::default())
            .with_expected_frames(expected)
            .with_frames(input)
            .build();
        harness.holocene_active(true);
        harness.next_frames().await;
    }

    /// Feeds `frames` through a queue with the default (non-Holocene)
    /// configuration and checks that all of them come out unchanged.
    async fn check_pre_holocene(frames: &[Frame]) {
        let harness = FrameQueueBuilder::new()
            .with_expected_frames(frames)
            .with_origin(BlockInfo::default())
            .with_frames(frames)
            .build();
        harness.holocene_active(false);
        harness.next_frames().await;
    }

    /// Feeds raw bytes that do not decode into frames and checks that the queue
    /// answers with a temporary `NotEnoughData` error.
    async fn check_undecodable(raw: Vec<u8>) {
        let harness = FrameQueueBuilder::new()
            .with_origin(BlockInfo::default())
            .with_raw_frames(Bytes::from(raw))
            .with_expected_err(PipelineError::NotEnoughData.temp())
            .build();
        harness.holocene_active(false);
        harness.next_frames().await;
    }

    /// A reset signal reaches the previous stage and leaves the queue empty.
    #[tokio::test]
    async fn test_frame_queue_reset() {
        let provider = TestFrameQueueProvider::new(vec![]);
        let mut stage = FrameQueue::new(provider, Arc::default());
        assert!(!stage.prev.reset);

        stage.signal(ResetSignal::default().signal()).await.unwrap();

        assert!(stage.queue.is_empty());
        assert!(stage.prev.reset);
    }

    /// A lone version byte yields no frames, hence `NotEnoughData`.
    #[tokio::test]
    async fn test_frame_queue_empty_bytes() {
        let mut provider = TestFrameQueueProvider::new(vec![Ok(Bytes::from(vec![0x00]))]);
        provider.set_origin(BlockInfo::default());
        let mut stage = FrameQueue::new(provider, Arc::default());
        assert!(!stage.is_holocene_active(BlockInfo::default()));

        let result = stage.next_frame().await;

        assert_eq!(result.unwrap_err(), PipelineError::NotEnoughData.temp());
    }

    /// Empty data decodes to no frames, hence `NotEnoughData`.
    #[tokio::test]
    async fn test_frame_queue_no_frames_decoded() {
        let responses = vec![Err(PipelineError::Eof.temp()), Ok(Bytes::default())];
        let mut provider = TestFrameQueueProvider::new(responses);
        provider.set_origin(BlockInfo::default());
        let mut stage = FrameQueue::new(provider, Arc::default());
        assert!(!stage.is_holocene_active(BlockInfo::default()));

        let result = stage.next_frame().await;

        assert_eq!(result.unwrap_err(), PipelineError::NotEnoughData.temp());
    }

    #[tokio::test]
    async fn test_frame_queue_wrong_derivation_version() {
        check_undecodable(vec![0x01]).await;
    }

    #[tokio::test]
    async fn test_frame_queue_frame_too_short() {
        check_undecodable(vec![0x00, 0x01]).await;
    }

    #[tokio::test]
    async fn test_frame_queue_single_frame() {
        let frames = [crate::frame!(0xFF, 0, vec![0xDD; 50], true)];
        check_pre_holocene(&frames).await;
    }

    #[tokio::test]
    async fn test_frame_queue_multiple_frames() {
        let frames = [
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], false),
            crate::frame!(0xFF, 2, vec![0xDD; 50], true),
        ];
        check_pre_holocene(&frames).await;
    }

    /// No origin is configured on the builder for this case.
    #[tokio::test]
    async fn test_frame_queue_missing_origin() {
        let frames = [crate::frame!(0xFF, 0, vec![0xDD; 50], true)];
        let harness =
            FrameQueueBuilder::new().with_expected_frames(&frames).with_frames(&frames).build();
        harness.holocene_active(false);
        harness.missing_origin().await;
    }

    /// A well-formed channel passes through pruning untouched.
    #[tokio::test]
    async fn test_holocene_valid_frames() {
        let frames = [
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], false),
            crate::frame!(0xFF, 2, vec![0xDD; 50], true),
        ];
        check_holocene(&frames, &frames).await;
    }

    /// Pruning only compares adjacent frames, so a single frame is kept as is.
    #[tokio::test]
    async fn test_holocene_single_frame() {
        let frames = [crate::frame!(0xFF, 1, vec![0xDD; 50], true)];
        check_holocene(&frames, &frames).await;
    }

    /// A frame following the closing frame of its own channel is dropped.
    #[tokio::test]
    async fn test_holocene_unordered_frames() {
        let frames = [
            crate::frame!(0xEE, 0, vec![0xDD; 50], false),
            crate::frame!(0xEE, 1, vec![0xDD; 50], false),
            crate::frame!(0xEE, 2, vec![0xDD; 50], true),
            // Dropped: channel 0xEE was already closed by the frame above.
            crate::frame!(0xEE, 3, vec![0xDD; 50], false),
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], true),
        ];
        let expected = [&frames[0..3], &frames[4..]].concat();
        check_holocene(&frames, &expected).await;
    }

    /// Frames whose number does not directly follow the previous one are dropped.
    #[tokio::test]
    async fn test_holocene_non_sequential_frames() {
        let frames = [
            crate::frame!(0xEE, 0, vec![0xDD; 50], false),
            crate::frame!(0xEE, 1, vec![0xDD; 50], false),
            // Dropped: frame number 2 is missing.
            crate::frame!(0xEE, 3, vec![0xDD; 50], true),
            // Dropped: does not follow frame number 1 either.
            crate::frame!(0xEE, 4, vec![0xDD; 50], false),
        ];
        check_holocene(&frames, &frames[0..2]).await;
    }

    /// A channel that is never closed is discarded once a new channel starts.
    #[tokio::test]
    async fn test_holocene_unclosed_channel() {
        let frames = [
            // Dropped: channel 0xEE is still open when 0xFF begins.
            crate::frame!(0xEE, 0, vec![0xDD; 50], false),
            crate::frame!(0xEE, 1, vec![0xDD; 50], false),
            crate::frame!(0xEE, 2, vec![0xDD; 50], false),
            crate::frame!(0xEE, 3, vec![0xDD; 50], false),
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], true),
        ];
        check_holocene(&frames, &frames[4..]).await;
    }

    /// Frames of a channel whose first frame never arrived are dropped.
    #[tokio::test]
    async fn test_holocene_unstarted_channel() {
        let frames = [
            crate::frame!(0xDD, 0, vec![0xDD; 50], false),
            crate::frame!(0xDD, 1, vec![0xDD; 50], false),
            crate::frame!(0xDD, 2, vec![0xDD; 50], false),
            crate::frame!(0xDD, 3, vec![0xDD; 50], true),
            // Dropped: channel 0xEE does not begin with frame number 0.
            crate::frame!(0xEE, 1, vec![0xDD; 50], false),
            crate::frame!(0xEE, 2, vec![0xDD; 50], true),
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], true),
        ];
        let expected = [&frames[0..4], &frames[6..]].concat();
        check_holocene(&frames, &expected).await;
    }

    /// An open channel survives when the following channel never validly starts.
    #[tokio::test]
    async fn test_holocene_unclosed_channel_with_invalid_start() {
        let frames = [
            crate::frame!(0xEE, 0, vec![0xDD; 50], false),
            crate::frame!(0xEE, 1, vec![0xDD; 50], false),
            crate::frame!(0xEE, 2, vec![0xDD; 50], false),
            crate::frame!(0xEE, 3, vec![0xDD; 50], false),
            // Dropped: channel 0xFF does not begin with frame number 0.
            crate::frame!(0xFF, 1, vec![0xDD; 50], false),
            crate::frame!(0xFF, 2, vec![0xDD; 50], true),
        ];
        check_holocene(&frames, &frames[0..4]).await;
    }

    /// An unclosed channel in the middle is replaced by the one that follows it.
    #[tokio::test]
    async fn test_holocene_replace_channel() {
        let frames = [
            crate::frame!(0xDD, 0, vec![0xDD; 50], false),
            crate::frame!(0xDD, 1, vec![0xDD; 50], true),
            // Dropped: channel 0xEE is still open when 0xFF begins.
            crate::frame!(0xEE, 0, vec![0xDD; 50], false),
            crate::frame!(0xEE, 1, vec![0xDD; 50], false),
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], true),
        ];
        let expected = [&frames[0..2], &frames[4..]].concat();
        check_holocene(&frames, &expected).await;
    }

    /// Interleaved channels where the surviving one is never closed: only the
    /// trailing channel remains.
    #[tokio::test]
    async fn test_holocene_interleaved_invalid_channel() {
        let frames = [
            // Dropped: channel 0x01 is still open when 0x02 begins.
            crate::frame!(0x01, 0, vec![0xDD; 50], false),
            // Dropped later: channel 0x02 is still open when 0xFF begins.
            crate::frame!(0x02, 0, vec![0xDD; 50], false),
            // Dropped: follows a frame of channel 0x02 and is not frame number 0.
            crate::frame!(0x01, 1, vec![0xDD; 50], true),
            crate::frame!(0x02, 1, vec![0xDD; 50], false),
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], true),
        ];
        check_holocene(&frames, &frames[4..]).await;
    }

    /// Interleaved channels where the second one is properly closed: it is
    /// kept together with the trailing channel.
    #[tokio::test]
    async fn test_holocene_interleaved_valid_channel() {
        let frames = [
            // Dropped: channel 0x01 is still open when 0x02 begins.
            crate::frame!(0x01, 0, vec![0xDD; 50], false),
            crate::frame!(0x02, 0, vec![0xDD; 50], false),
            // Dropped: follows a frame of channel 0x02 and is not frame number 0.
            crate::frame!(0x01, 1, vec![0xDD; 50], true),
            crate::frame!(0x02, 1, vec![0xDD; 50], true),
            crate::frame!(0xFF, 0, vec![0xDD; 50], false),
            crate::frame!(0xFF, 1, vec![0xDD; 50], true),
        ];
        let expected = [&frames[1..2], &frames[3..]].concat();
        check_holocene(&frames, &expected).await;
    }
}