kona_derive/stages/
attributes_queue.rs

1//! Contains the logic for the `AttributesQueue` stage.
2
3use crate::{
4    errors::{PipelineError, ResetError},
5    traits::{
6        AttributesBuilder, AttributesProvider, NextAttributes, OriginAdvancer, OriginProvider,
7        SignalReceiver,
8    },
9    types::{PipelineResult, Signal},
10};
11use alloc::{boxed::Box, sync::Arc};
12use async_trait::async_trait;
13use core::fmt::Debug;
14use kona_genesis::RollupConfig;
15use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent, SingleBatch};
16use op_alloy_rpc_types_engine::OpPayloadAttributes;
17
18/// [`AttributesQueue`] accepts batches from the [`BatchQueue`] stage
19/// and transforms them into [`OpPayloadAttributes`].
20///
21/// The outputted payload attributes cannot be buffered because each batch->attributes
22/// transformation pulls in data about the current L2 safe head.
23///
24/// [`AttributesQueue`] also buffers batches that have been output because
25/// multiple batches can be created at once.
26///
27/// This stage can be reset by clearing its batch buffer.
28/// This stage does not need to retain any references to L1 blocks.
29///
30/// [`BatchQueue`]: crate::stages::BatchQueue
31#[derive(Debug)]
32pub struct AttributesQueue<P, AB>
33where
34    P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
35    AB: AttributesBuilder + Debug,
36{
37    /// The rollup config.
38    cfg: Arc<RollupConfig>,
39    /// The previous stage of the derivation pipeline.
40    prev: P,
41    /// Whether the current batch is the last in its span.
42    is_last_in_span: bool,
43    /// The current batch being processed.
44    batch: Option<SingleBatch>,
45    /// The attributes builder.
46    builder: AB,
47}
48
49impl<P, AB> AttributesQueue<P, AB>
50where
51    P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
52    AB: AttributesBuilder + Debug,
53{
54    /// Create a new [`AttributesQueue`] stage.
55    pub const fn new(cfg: Arc<RollupConfig>, prev: P, builder: AB) -> Self {
56        Self { cfg, prev, is_last_in_span: false, batch: None, builder }
57    }
58
59    /// Loads a [`SingleBatch`] from the [`AttributesProvider`] if needed.
60    pub async fn load_batch(&mut self, parent: L2BlockInfo) -> PipelineResult<SingleBatch> {
61        if self.batch.is_none() {
62            let batch = self.prev.next_batch(parent).await?;
63            self.batch = Some(batch);
64            self.is_last_in_span = self.prev.is_last_in_span();
65        }
66        self.batch.as_ref().cloned().ok_or(PipelineError::Eof.temp())
67    }
68
69    /// Returns the next [`OpAttributesWithParent`] from the current batch.
70    pub async fn next_attributes(
71        &mut self,
72        parent: L2BlockInfo,
73    ) -> PipelineResult<OpAttributesWithParent> {
74        let batch = match self.load_batch(parent).await {
75            Ok(batch) => batch,
76            Err(e) => {
77                return Err(e);
78            }
79        };
80
81        // Construct the payload attributes from the loaded batch.
82        #[cfg(feature = "metrics")]
83        let start = std::time::Instant::now();
84        let attributes = match self.create_next_attributes(batch, parent).await {
85            Ok(attributes) => attributes,
86            Err(e) => {
87                return Err(e);
88            }
89        };
90        let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;
91        let populated_attributes =
92            OpAttributesWithParent::new(attributes, parent, Some(origin), self.is_last_in_span);
93        kona_macros::record!(
94            histogram,
95            crate::metrics::Metrics::PIPELINE_ATTRIBUTES_BUILD_DURATION,
96            start.elapsed().as_secs_f64()
97        );
98
99        // Clear out the local state once payload attributes are prepared.
100        self.batch = None;
101        self.is_last_in_span = false;
102        Ok(populated_attributes)
103    }
104
105    /// Creates the next attributes, transforming a [`SingleBatch`] into [`OpPayloadAttributes`].
106    /// This sets `no_tx_pool` and appends the batched txs to the attributes tx list.
107    pub async fn create_next_attributes(
108        &mut self,
109        batch: SingleBatch,
110        parent: L2BlockInfo,
111    ) -> PipelineResult<OpPayloadAttributes> {
112        // Sanity check parent hash
113        if batch.parent_hash != parent.block_info.hash {
114            return Err(ResetError::BadParentHash(batch.parent_hash, parent.block_info.hash).into());
115        }
116
117        // Sanity check timestamp
118        let actual = parent.block_info.timestamp + self.cfg.block_time;
119        if actual != batch.timestamp {
120            return Err(ResetError::BadTimestamp(batch.timestamp, actual).into());
121        }
122
123        // Prepare the payload attributes
124        let tx_count = batch.transactions.len();
125        let mut attributes = self.builder.prepare_payload_attributes(parent, batch.epoch()).await?;
126        attributes.no_tx_pool = Some(true);
127        match attributes.transactions {
128            Some(ref mut txs) => txs.extend(batch.transactions),
129            None => {
130                if !batch.transactions.is_empty() {
131                    attributes.transactions = Some(batch.transactions);
132                }
133            }
134        }
135
136        info!(
137            target: "attributes_queue",
138            txs = tx_count,
139            timestamp = batch.timestamp,
140            "generated attributes in payload queue",
141        );
142
143        Ok(attributes)
144    }
145}
146
147#[async_trait]
148impl<P, AB> OriginAdvancer for AttributesQueue<P, AB>
149where
150    P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug + Send,
151    AB: AttributesBuilder + Debug + Send,
152{
153    async fn advance_origin(&mut self) -> PipelineResult<()> {
154        self.prev.advance_origin().await
155    }
156}
157
158#[async_trait]
159impl<P, AB> NextAttributes for AttributesQueue<P, AB>
160where
161    P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug + Send,
162    AB: AttributesBuilder + Debug + Send,
163{
164    async fn next_attributes(
165        &mut self,
166        parent: L2BlockInfo,
167    ) -> PipelineResult<OpAttributesWithParent> {
168        self.next_attributes(parent).await
169    }
170}
171
172impl<P, AB> OriginProvider for AttributesQueue<P, AB>
173where
174    P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
175    AB: AttributesBuilder + Debug,
176{
177    fn origin(&self) -> Option<BlockInfo> {
178        self.prev.origin()
179    }
180}
181
182#[async_trait]
183impl<P, AB> SignalReceiver for AttributesQueue<P, AB>
184where
185    P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
186    AB: AttributesBuilder + Send + Debug,
187{
188    async fn signal(&mut self, signal: Signal) -> PipelineResult<()> {
189        match signal {
190            s @ Signal::Reset(_) | s @ Signal::Activation(_) => {
191                self.prev.signal(s).await?;
192                self.batch = None;
193                self.is_last_in_span = false;
194            }
195            s @ Signal::FlushChannel => {
196                self.batch = None;
197                self.prev.signal(s).await?;
198            }
199            s @ Signal::ProvideBlock(_) => {
200                self.prev.signal(s).await?;
201            }
202        }
203        Ok(())
204    }
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        errors::{BuilderError, PipelineErrorKind},
        test_utils::{TestAttributesBuilder, TestAttributesProvider, new_test_attributes_provider},
        types::ResetSignal,
    };
    use alloc::{sync::Arc, vec, vec::Vec};
    use alloy_primitives::{Address, B256, Bytes, b256};
    use alloy_rpc_types_engine::PayloadAttributes;

    /// Baseline payload attributes handed to the mock builder: everything zeroed or unset,
    /// with `no_tx_pool` explicitly `Some(false)` so tests can observe it being flipped.
    fn default_optimism_payload_attributes() -> OpPayloadAttributes {
        let payload_attributes = PayloadAttributes {
            timestamp: 0,
            suggested_fee_recipient: Address::default(),
            prev_randao: B256::default(),
            withdrawals: None,
            parent_beacon_block_root: None,
        };
        OpPayloadAttributes {
            payload_attributes,
            no_tx_pool: Some(false),
            transactions: None,
            gas_limit: None,
            eip_1559_params: None,
        }
    }

    /// Builds an [`AttributesQueue`] over a mock provider seeded with `origin` and
    /// `batches`, a default mock builder, and `cfg` (or the default rollup config).
    fn new_attributes_queue(
        cfg: Option<RollupConfig>,
        origin: Option<BlockInfo>,
        batches: Vec<PipelineResult<SingleBatch>>,
    ) -> AttributesQueue<TestAttributesProvider, TestAttributesBuilder> {
        let provider = new_test_attributes_provider(origin, batches);
        let builder = TestAttributesBuilder::default();
        let rollup_config = Arc::new(cfg.unwrap_or_default());
        AttributesQueue::new(rollup_config, provider, builder)
    }

    #[tokio::test]
    async fn test_attributes_queue_flush() {
        let mut aq = new_attributes_queue(None, None, vec![]);
        aq.batch = Some(SingleBatch::default());
        assert!(!aq.prev.flushed);

        aq.signal(Signal::FlushChannel).await.unwrap();

        // The flush must reach the previous stage and drop the buffered batch.
        assert!(aq.prev.flushed);
        assert!(aq.batch.is_none());
    }

    #[tokio::test]
    async fn test_attributes_queue_reset() {
        let mut aq = new_attributes_queue(None, None, vec![]);
        aq.batch = Some(SingleBatch::default());
        assert!(!aq.prev.reset);

        aq.signal(ResetSignal::default().signal()).await.unwrap();

        // The reset must drop the buffered batch and reach the previous stage.
        assert!(aq.batch.is_none());
        assert!(aq.prev.reset);
    }

    #[tokio::test]
    async fn test_load_batch_eof() {
        // A provider with no batches queued makes `load_batch` fail with a temporary EOF.
        let mut aq = new_attributes_queue(None, None, vec![]);
        let err = aq.load_batch(L2BlockInfo::default()).await.unwrap_err();
        assert_eq!(err, PipelineError::Eof.temp());
    }

    #[tokio::test]
    async fn test_load_batch_last_in_span() {
        let mut aq = new_attributes_queue(None, None, vec![Ok(SingleBatch::default())]);
        let loaded = aq.load_batch(L2BlockInfo::default()).await.unwrap();
        assert_eq!(loaded, SingleBatch::default());
        assert!(aq.is_last_in_span);
    }

    #[tokio::test]
    async fn test_create_next_attributes_bad_parent_hash() {
        let mut aq = new_attributes_queue(None, None, vec![]);
        let bad_hash = b256!("6666666666666666666666666666666666666666666666666666666666666666");
        let block_info = BlockInfo { hash: bad_hash, ..Default::default() };
        let parent = L2BlockInfo { block_info, ..Default::default() };

        // The default batch has a zero parent hash, which cannot match `bad_hash`.
        let err = aq.create_next_attributes(SingleBatch::default(), parent).await.unwrap_err();
        let expected =
            PipelineErrorKind::Reset(ResetError::BadParentHash(B256::default(), bad_hash));
        assert_eq!(err, expected);
    }

    #[tokio::test]
    async fn test_create_next_attributes_bad_timestamp() {
        let mut aq = new_attributes_queue(None, None, vec![]);
        let batch = SingleBatch { timestamp: 1, ..Default::default() };

        // Parent timestamp 0 + block time 0 == 0, but the batch claims 1.
        let err = aq.create_next_attributes(batch, L2BlockInfo::default()).await.unwrap_err();
        assert_eq!(err, PipelineErrorKind::Reset(ResetError::BadTimestamp(1, 0)));
    }

    #[tokio::test]
    async fn test_create_next_attributes_bad_parent_timestamp() {
        let mut aq = new_attributes_queue(None, None, vec![]);
        let block_info = BlockInfo { timestamp: 2, ..Default::default() };
        let parent = L2BlockInfo { block_info, ..Default::default() };
        let batch = SingleBatch { timestamp: 1, ..Default::default() };

        // Parent timestamp 2 + block time 0 == 2, but the batch claims 1.
        let err = aq.create_next_attributes(batch, parent).await.unwrap_err();
        assert_eq!(err, PipelineErrorKind::Reset(ResetError::BadTimestamp(1, 2)));
    }

    #[tokio::test]
    async fn test_create_next_attributes_bad_config_timestamp() {
        let cfg = RollupConfig { block_time: 1, ..Default::default() };
        let mut aq = new_attributes_queue(Some(cfg), None, vec![]);
        let block_info = BlockInfo { timestamp: 1, ..Default::default() };
        let parent = L2BlockInfo { block_info, ..Default::default() };
        let batch = SingleBatch { timestamp: 1, ..Default::default() };

        // Parent timestamp 1 + block time 1 == 2, but the batch claims 1.
        let err = aq.create_next_attributes(batch, parent).await.unwrap_err();
        assert_eq!(err, PipelineErrorKind::Reset(ResetError::BadTimestamp(1, 2)));
    }

    #[tokio::test]
    async fn test_create_next_attributes_preparation_fails() {
        // The default mock builder has no attributes queued, so preparation must fail.
        let mut aq = new_attributes_queue(None, None, vec![]);
        let err = aq
            .create_next_attributes(SingleBatch::default(), L2BlockInfo::default())
            .await
            .unwrap_err();
        let expected = PipelineError::AttributesBuilder(BuilderError::AttributesUnavailable).crit();
        assert_eq!(err, expected);
    }

    #[tokio::test]
    async fn test_create_next_attributes_success() {
        let base = default_optimism_payload_attributes();
        let provider = new_test_attributes_provider(None, vec![]);
        let builder = TestAttributesBuilder { attributes: vec![Ok(base.clone())] };
        let mut aq = AttributesQueue::new(Arc::new(RollupConfig::default()), provider, builder);

        let txs = vec![Bytes::default(), Bytes::default()];
        let batch = SingleBatch { transactions: txs.clone(), ..Default::default() };
        let produced = aq.create_next_attributes(batch, L2BlockInfo::default()).await.unwrap();

        // The base attributes carry no transactions, so the result holds exactly the
        // batch's transactions, with the tx pool disabled.
        let expected =
            OpPayloadAttributes { no_tx_pool: Some(true), transactions: Some(txs), ..base };
        assert_eq!(produced, expected);
    }

    #[tokio::test]
    async fn test_next_attributes_load_batch_eof() {
        let mut aq = new_attributes_queue(None, None, vec![]);
        let err = aq.next_attributes(L2BlockInfo::default()).await.unwrap_err();
        assert_eq!(err, PipelineError::Eof.temp());
    }

    #[tokio::test]
    async fn test_next_attributes_load_batch_last_in_span() {
        let base = default_optimism_payload_attributes();
        let provider = new_test_attributes_provider(
            Some(BlockInfo::default()),
            vec![Ok(SingleBatch::default())],
        );
        let builder = TestAttributesBuilder { attributes: vec![Ok(base.clone())] };
        let mut aq = AttributesQueue::new(Arc::new(RollupConfig::default()), provider, builder);

        // Loading the batch buffers it (and the span flag) without consuming it, so it is
        // still available to the `next_attributes` call below.
        let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap();
        assert!(aq.is_last_in_span);
        assert!(aq.batch.is_some());

        // Building the attributes succeeds, carries the span flag through, and then
        // clears both the flag and the buffered batch.
        let produced = aq.next_attributes(L2BlockInfo::default()).await.unwrap();
        let expected = OpAttributesWithParent {
            inner: OpPayloadAttributes { no_tx_pool: Some(true), ..base },
            parent: L2BlockInfo::default(),
            derived_from: Some(BlockInfo::default()),
            is_last_in_span: true,
        };
        assert_eq!(produced, expected);
        assert!(!aq.is_last_in_span);
        assert!(aq.batch.is_none());
    }
}