1use crate::{
4 errors::{PipelineError, ResetError},
5 traits::{
6 AttributesBuilder, AttributesProvider, NextAttributes, OriginAdvancer, OriginProvider,
7 SignalReceiver,
8 },
9 types::{PipelineResult, Signal},
10};
11use alloc::{boxed::Box, sync::Arc};
12use async_trait::async_trait;
13use core::fmt::Debug;
14use kona_genesis::RollupConfig;
15use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent, SingleBatch};
16use op_alloy_rpc_types_engine::OpPayloadAttributes;
17
18#[derive(Debug)]
32pub struct AttributesQueue<P, AB>
33where
34 P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
35 AB: AttributesBuilder + Debug,
36{
37 cfg: Arc<RollupConfig>,
39 prev: P,
41 is_last_in_span: bool,
43 batch: Option<SingleBatch>,
45 builder: AB,
47}
48
49impl<P, AB> AttributesQueue<P, AB>
50where
51 P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
52 AB: AttributesBuilder + Debug,
53{
54 pub const fn new(cfg: Arc<RollupConfig>, prev: P, builder: AB) -> Self {
56 Self { cfg, prev, is_last_in_span: false, batch: None, builder }
57 }
58
59 pub async fn load_batch(&mut self, parent: L2BlockInfo) -> PipelineResult<SingleBatch> {
61 if self.batch.is_none() {
62 let batch = self.prev.next_batch(parent).await?;
63 self.batch = Some(batch);
64 self.is_last_in_span = self.prev.is_last_in_span();
65 }
66 self.batch.as_ref().cloned().ok_or(PipelineError::Eof.temp())
67 }
68
69 pub async fn next_attributes(
71 &mut self,
72 parent: L2BlockInfo,
73 ) -> PipelineResult<OpAttributesWithParent> {
74 let batch = match self.load_batch(parent).await {
75 Ok(batch) => batch,
76 Err(e) => {
77 return Err(e);
78 }
79 };
80
81 #[cfg(feature = "metrics")]
83 let start = std::time::Instant::now();
84 let attributes = match self.create_next_attributes(batch, parent).await {
85 Ok(attributes) => attributes,
86 Err(e) => {
87 return Err(e);
88 }
89 };
90 let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;
91 let populated_attributes =
92 OpAttributesWithParent::new(attributes, parent, Some(origin), self.is_last_in_span);
93 kona_macros::record!(
94 histogram,
95 crate::metrics::Metrics::PIPELINE_ATTRIBUTES_BUILD_DURATION,
96 start.elapsed().as_secs_f64()
97 );
98
99 self.batch = None;
101 self.is_last_in_span = false;
102 Ok(populated_attributes)
103 }
104
105 pub async fn create_next_attributes(
108 &mut self,
109 batch: SingleBatch,
110 parent: L2BlockInfo,
111 ) -> PipelineResult<OpPayloadAttributes> {
112 if batch.parent_hash != parent.block_info.hash {
114 return Err(ResetError::BadParentHash(batch.parent_hash, parent.block_info.hash).into());
115 }
116
117 let actual = parent.block_info.timestamp + self.cfg.block_time;
119 if actual != batch.timestamp {
120 return Err(ResetError::BadTimestamp(batch.timestamp, actual).into());
121 }
122
123 let tx_count = batch.transactions.len();
125 let mut attributes = self.builder.prepare_payload_attributes(parent, batch.epoch()).await?;
126 attributes.no_tx_pool = Some(true);
127 match attributes.transactions {
128 Some(ref mut txs) => txs.extend(batch.transactions),
129 None => {
130 if !batch.transactions.is_empty() {
131 attributes.transactions = Some(batch.transactions);
132 }
133 }
134 }
135
136 info!(
137 target: "attributes_queue",
138 txs = tx_count,
139 timestamp = batch.timestamp,
140 "generated attributes in payload queue",
141 );
142
143 Ok(attributes)
144 }
145}
146
147#[async_trait]
148impl<P, AB> OriginAdvancer for AttributesQueue<P, AB>
149where
150 P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug + Send,
151 AB: AttributesBuilder + Debug + Send,
152{
153 async fn advance_origin(&mut self) -> PipelineResult<()> {
154 self.prev.advance_origin().await
155 }
156}
157
158#[async_trait]
159impl<P, AB> NextAttributes for AttributesQueue<P, AB>
160where
161 P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug + Send,
162 AB: AttributesBuilder + Debug + Send,
163{
164 async fn next_attributes(
165 &mut self,
166 parent: L2BlockInfo,
167 ) -> PipelineResult<OpAttributesWithParent> {
168 self.next_attributes(parent).await
169 }
170}
171
172impl<P, AB> OriginProvider for AttributesQueue<P, AB>
173where
174 P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
175 AB: AttributesBuilder + Debug,
176{
177 fn origin(&self) -> Option<BlockInfo> {
178 self.prev.origin()
179 }
180}
181
182#[async_trait]
183impl<P, AB> SignalReceiver for AttributesQueue<P, AB>
184where
185 P: AttributesProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
186 AB: AttributesBuilder + Send + Debug,
187{
188 async fn signal(&mut self, signal: Signal) -> PipelineResult<()> {
189 match signal {
190 s @ Signal::Reset(_) | s @ Signal::Activation(_) => {
191 self.prev.signal(s).await?;
192 self.batch = None;
193 self.is_last_in_span = false;
194 }
195 s @ Signal::FlushChannel => {
196 self.batch = None;
197 self.prev.signal(s).await?;
198 }
199 s @ Signal::ProvideBlock(_) => {
200 self.prev.signal(s).await?;
201 }
202 }
203 Ok(())
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210 use crate::{
211 errors::{BuilderError, PipelineErrorKind},
212 test_utils::{TestAttributesBuilder, TestAttributesProvider, new_test_attributes_provider},
213 types::ResetSignal,
214 };
215 use alloc::{sync::Arc, vec, vec::Vec};
216 use alloy_primitives::{Address, B256, Bytes, b256};
217 use alloy_rpc_types_engine::PayloadAttributes;
218
219 fn default_optimism_payload_attributes() -> OpPayloadAttributes {
220 OpPayloadAttributes {
221 payload_attributes: PayloadAttributes {
222 timestamp: 0,
223 suggested_fee_recipient: Address::default(),
224 prev_randao: B256::default(),
225 withdrawals: None,
226 parent_beacon_block_root: None,
227 },
228 no_tx_pool: Some(false),
229 transactions: None,
230 gas_limit: None,
231 eip_1559_params: None,
232 }
233 }
234
235 fn new_attributes_queue(
236 cfg: Option<RollupConfig>,
237 origin: Option<BlockInfo>,
238 batches: Vec<PipelineResult<SingleBatch>>,
239 ) -> AttributesQueue<TestAttributesProvider, TestAttributesBuilder> {
240 let cfg = cfg.unwrap_or_default();
241 let mock_batch_queue = new_test_attributes_provider(origin, batches);
242 let mock_attributes_builder = TestAttributesBuilder::default();
243 AttributesQueue::new(Arc::new(cfg), mock_batch_queue, mock_attributes_builder)
244 }
245
246 #[tokio::test]
247 async fn test_attributes_queue_flush() {
248 let mut attributes_queue = new_attributes_queue(None, None, vec![]);
249 attributes_queue.batch = Some(SingleBatch::default());
250 assert!(!attributes_queue.prev.flushed);
251 attributes_queue.signal(Signal::FlushChannel).await.unwrap();
252 assert!(attributes_queue.prev.flushed);
253 assert!(attributes_queue.batch.is_none());
254 }
255
256 #[tokio::test]
257 async fn test_attributes_queue_reset() {
258 let cfg = RollupConfig::default();
259 let mock = new_test_attributes_provider(None, vec![]);
260 let mock_builder = TestAttributesBuilder::default();
261 let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder);
262 aq.batch = Some(SingleBatch::default());
263 assert!(!aq.prev.reset);
264 aq.signal(ResetSignal::default().signal()).await.unwrap();
265 assert!(aq.batch.is_none());
266 assert!(aq.prev.reset);
267 }
268
269 #[tokio::test]
270 async fn test_load_batch_eof() {
271 let mut attributes_queue = new_attributes_queue(None, None, vec![]);
272 let parent = L2BlockInfo::default();
273 let result = attributes_queue.load_batch(parent).await.unwrap_err();
274 assert_eq!(result, PipelineError::Eof.temp());
275 }
276
277 #[tokio::test]
278 async fn test_load_batch_last_in_span() {
279 let mut attributes_queue = new_attributes_queue(None, None, vec![Ok(Default::default())]);
280 let parent = L2BlockInfo::default();
281 let result = attributes_queue.load_batch(parent).await.unwrap();
282 assert_eq!(result, Default::default());
283 assert!(attributes_queue.is_last_in_span);
284 }
285
286 #[tokio::test]
287 async fn test_create_next_attributes_bad_parent_hash() {
288 let mut attributes_queue = new_attributes_queue(None, None, vec![]);
289 let bad_hash = b256!("6666666666666666666666666666666666666666666666666666666666666666");
290 let parent = L2BlockInfo {
291 block_info: BlockInfo { hash: bad_hash, ..Default::default() },
292 ..Default::default()
293 };
294 let batch = SingleBatch::default();
295 let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
296 assert_eq!(
297 result,
298 PipelineErrorKind::Reset(ResetError::BadParentHash(Default::default(), bad_hash))
299 );
300 }
301
302 #[tokio::test]
303 async fn test_create_next_attributes_bad_timestamp() {
304 let mut attributes_queue = new_attributes_queue(None, None, vec![]);
305 let parent = L2BlockInfo::default();
306 let batch = SingleBatch { timestamp: 1, ..Default::default() };
307 let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
308 assert_eq!(result, PipelineErrorKind::Reset(ResetError::BadTimestamp(1, 0)));
309 }
310
311 #[tokio::test]
312 async fn test_create_next_attributes_bad_parent_timestamp() {
313 let mut attributes_queue = new_attributes_queue(None, None, vec![]);
314 let parent = L2BlockInfo {
315 block_info: BlockInfo { timestamp: 2, ..Default::default() },
316 ..Default::default()
317 };
318 let batch = SingleBatch { timestamp: 1, ..Default::default() };
319 let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
320 assert_eq!(result, PipelineErrorKind::Reset(ResetError::BadTimestamp(1, 2)));
321 }
322
323 #[tokio::test]
324 async fn test_create_next_attributes_bad_config_timestamp() {
325 let cfg = RollupConfig { block_time: 1, ..Default::default() };
326 let mut attributes_queue = new_attributes_queue(Some(cfg), None, vec![]);
327 let parent = L2BlockInfo {
328 block_info: BlockInfo { timestamp: 1, ..Default::default() },
329 ..Default::default()
330 };
331 let batch = SingleBatch { timestamp: 1, ..Default::default() };
332 let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
333 assert_eq!(result, PipelineErrorKind::Reset(ResetError::BadTimestamp(1, 2)));
334 }
335
336 #[tokio::test]
337 async fn test_create_next_attributes_preparation_fails() {
338 let mut attributes_queue = new_attributes_queue(None, None, vec![]);
339 let parent = L2BlockInfo::default();
340 let batch = SingleBatch::default();
341 let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err();
342 assert_eq!(
343 result,
344 PipelineError::AttributesBuilder(BuilderError::AttributesUnavailable).crit()
345 );
346 }
347
348 #[tokio::test]
349 async fn test_create_next_attributes_success() {
350 let cfg = RollupConfig::default();
351 let mock = new_test_attributes_provider(None, vec![]);
352 let mut payload_attributes = default_optimism_payload_attributes();
353 let mock_builder =
354 TestAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] };
355 let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder);
356 let parent = L2BlockInfo::default();
357 let txs = vec![Bytes::default(), Bytes::default()];
358 let batch = SingleBatch { transactions: txs.clone(), ..Default::default() };
359 let attributes = aq.create_next_attributes(batch, parent).await.unwrap();
360 payload_attributes.no_tx_pool = Some(true);
362 match payload_attributes.transactions {
363 Some(ref mut t) => t.extend(txs),
364 None => payload_attributes.transactions = Some(txs),
365 }
366 assert_eq!(attributes, payload_attributes);
367 }
368
369 #[tokio::test]
370 async fn test_next_attributes_load_batch_eof() {
371 let mut attributes_queue = new_attributes_queue(None, None, vec![]);
372 let parent = L2BlockInfo::default();
373 let result = attributes_queue.next_attributes(parent).await.unwrap_err();
374 assert_eq!(result, PipelineError::Eof.temp());
375 }
376
377 #[tokio::test]
378 async fn test_next_attributes_load_batch_last_in_span() {
379 let cfg = RollupConfig::default();
380 let mock =
381 new_test_attributes_provider(Some(Default::default()), vec![Ok(Default::default())]);
382 let mut pa = default_optimism_payload_attributes();
383 let mock_builder = TestAttributesBuilder { attributes: vec![Ok(pa.clone())] };
384 let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder);
385 let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap();
388 assert!(aq.is_last_in_span);
389 assert!(aq.batch.is_some());
390 let attributes = aq.next_attributes(L2BlockInfo::default()).await.unwrap();
393 pa.no_tx_pool = Some(true);
394 let populated_attributes = OpAttributesWithParent {
395 inner: pa,
396 parent: L2BlockInfo::default(),
397 derived_from: Some(BlockInfo::default()),
398 is_last_in_span: true,
399 };
400 assert_eq!(attributes, populated_attributes);
401 assert!(!aq.is_last_in_span);
402 assert!(aq.batch.is_none());
403 }
404}