1use super::{
4 DelayedL1OriginSelectorProvider, L1OriginSelector, L1OriginSelectorError, SequencerConfig,
5};
6use crate::{CancellableContext, NodeActor, actors::sequencer::conductor::ConductorClient};
7use alloy_provider::RootProvider;
8use async_trait::async_trait;
9use kona_derive::{AttributesBuilder, PipelineErrorKind, StatefulAttributesBuilder};
10use kona_genesis::RollupConfig;
11use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent};
12use kona_providers_alloy::{AlloyChainProvider, AlloyL2ChainProvider};
13use kona_rpc::SequencerAdminQuery;
14use op_alloy_network::Optimism;
15use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope;
16use std::{
17 sync::Arc,
18 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
19};
20use tokio::{
21 select,
22 sync::{mpsc, watch},
23};
24use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
25
/// The sequencer actor: an attributes-builder configuration bundled with the receiving ends of
/// the actor's inbound channels.
///
/// The matching sending ends are handed out as [`SequencerInboundData`] by
/// [`SequencerActor::new`].
#[derive(Debug)]
pub struct SequencerActor<AB: AttributesBuilderConfig> {
    /// Configuration from which the attributes builder is constructed.
    pub builder: AB,
    /// Watch-channel receiver from which the current unsafe L2 head is read.
    pub unsafe_head_rx: watch::Receiver<L2BlockInfo>,
    /// Receiver for admin queries addressed to the sequencer.
    pub admin_query_rx: mpsc::Receiver<SequencerAdminQuery>,
}
38
/// Mutable state owned by the running sequencer task.
#[derive(Debug)]
pub(super) struct SequencerActorState<AB: AttributesBuilder> {
    /// The rollup configuration.
    pub cfg: Arc<RollupConfig>,
    /// Attributes builder used to prepare the payload attributes of each new block.
    pub builder: AB,
    /// Selects the L1 origin of the next L2 block.
    pub origin_selector: L1OriginSelector<DelayedL1OriginSelectorProvider>,
    /// Interval ticker whose ticks trigger block building.
    pub build_ticker: tokio::time::Interval,
    /// Optional conductor client; when present, built payloads are committed to it.
    pub conductor: Option<ConductorClient>,
    /// Whether the sequencer is active. Blocks are only built while this is `true`.
    pub is_active: bool,
    /// Whether the sequencer is in recovery mode. The flag is forwarded to L1 origin selection
    /// and to block building.
    pub is_recovery_mode: bool,
}
64
/// A configuration value that can be turned into an [`AttributesBuilder`].
pub trait AttributesBuilderConfig {
    /// The attributes builder type produced by this configuration.
    type AB: AttributesBuilder;

    /// Consumes the configuration and constructs the attributes builder.
    fn build(self) -> Self::AB;
}
73
74impl SequencerActorState<StatefulAttributesBuilder<AlloyChainProvider, AlloyL2ChainProvider>> {
75 fn new(
76 seq_builder: SequencerBuilder,
77 l1_head_watcher: watch::Receiver<Option<BlockInfo>>,
78 ) -> Self {
79 let SequencerConfig {
80 sequencer_stopped,
81 sequencer_recovery_mode,
82 conductor_rpc_url,
83 l1_conf_delay,
84 } = seq_builder.seq_cfg.clone();
85
86 let cfg = seq_builder.rollup_cfg.clone();
87 let l1_provider = DelayedL1OriginSelectorProvider::new(
88 seq_builder.l1_provider.clone(),
89 l1_head_watcher,
90 l1_conf_delay,
91 );
92 let conductor = conductor_rpc_url.map(ConductorClient::new_http);
93
94 let builder = seq_builder.build();
95 let build_ticker = tokio::time::interval(Duration::from_secs(cfg.block_time));
96
97 let origin_selector = L1OriginSelector::new(cfg.clone(), l1_provider);
98
99 Self {
100 cfg,
101 builder,
102 origin_selector,
103 build_ticker,
104 conductor,
105 is_active: !sequencer_stopped,
106 is_recovery_mode: sequencer_recovery_mode,
107 }
108 }
109}
110
/// Cache size handed to both the L1 and the L2 derivation providers built for the sequencer.
const DERIVATION_PROVIDER_CACHE_SIZE: usize = 1024;
112
/// The inputs required to build the sequencer's attributes builder and runtime state.
#[derive(Debug)]
pub struct SequencerBuilder {
    /// The sequencer-specific configuration.
    pub seq_cfg: SequencerConfig,
    /// The rollup configuration.
    pub rollup_cfg: Arc<RollupConfig>,
    /// The L1 RPC provider.
    pub l1_provider: RootProvider,
    /// The L2 RPC provider.
    pub l2_provider: RootProvider<Optimism>,
}
125
126impl AttributesBuilderConfig for SequencerBuilder {
127 type AB = StatefulAttributesBuilder<AlloyChainProvider, AlloyL2ChainProvider>;
128
129 fn build(self) -> Self::AB {
130 let l1_derivation_provider =
131 AlloyChainProvider::new(self.l1_provider.clone(), DERIVATION_PROVIDER_CACHE_SIZE);
132 let l2_derivation_provider = AlloyL2ChainProvider::new(
133 self.l2_provider.clone(),
134 self.rollup_cfg.clone(),
135 DERIVATION_PROVIDER_CACHE_SIZE,
136 );
137 StatefulAttributesBuilder::new(
138 self.rollup_cfg,
139 l2_derivation_provider,
140 l1_derivation_provider,
141 )
142 }
143}
144
/// The sending ends of the channels through which other components feed the sequencer actor.
#[derive(Debug)]
pub struct SequencerInboundData {
    /// Watch-channel sender used to publish the current unsafe L2 head to the sequencer.
    pub unsafe_head_tx: watch::Sender<L2BlockInfo>,
    /// Sender used to submit admin queries to the sequencer.
    pub admin_query_tx: mpsc::Sender<SequencerAdminQuery>,
}
154
/// The outbound handles the sequencer actor uses to talk to the rest of the node.
#[derive(Debug)]
pub struct SequencerContext {
    /// Cancellation token; the sequencer exits when it is cancelled and cancels it itself on
    /// fatal errors.
    pub cancellation: CancellationToken,
    /// Watch-channel receiver for the L1 head; a clone is handed to the L1 origin selector's
    /// provider.
    pub l1_head_rx: watch::Receiver<Option<BlockInfo>>,
    /// Sender used to request an engine reset.
    pub reset_request_tx: mpsc::Sender<()>,
    /// Sender used to request a block build. Each request carries the payload attributes and a
    /// sender on which the built payload envelope is returned.
    pub build_request_tx:
        mpsc::Sender<(OpAttributesWithParent, mpsc::Sender<OpExecutionPayloadEnvelope>)>,
    /// Sender used to forward built payloads so they can be signed and gossipped.
    pub gossip_payload_tx: mpsc::Sender<OpExecutionPayloadEnvelope>,
}
172
173impl CancellableContext for SequencerContext {
174 fn cancelled(&self) -> WaitForCancellationFuture<'_> {
175 self.cancellation.cancelled()
176 }
177}
178
/// Errors that terminate the sequencer actor.
#[derive(Debug, thiserror::Error)]
pub enum SequencerActorError {
    /// An error raised by the attributes builder while preparing payload attributes.
    #[error(transparent)]
    AttributesBuilder(#[from] PipelineErrorKind),
    /// An error raised by the L1 origin selector.
    #[error(transparent)]
    L1OriginSelector(#[from] L1OriginSelectorError),
    /// One of the actor's channels was closed while the actor still needed it.
    #[error("Channel closed unexpectedly")]
    ChannelClosed,
}
192
193impl<AB: AttributesBuilderConfig> SequencerActor<AB> {
194 pub fn new(state: AB) -> (SequencerInboundData, Self) {
196 let (unsafe_head_tx, unsafe_head_rx) = watch::channel(L2BlockInfo::default());
197 let (admin_query_tx, admin_query_rx) = mpsc::channel(1024);
198 let actor = Self { builder: state, unsafe_head_rx, admin_query_rx };
199
200 (SequencerInboundData { unsafe_head_tx, admin_query_tx }, actor)
201 }
202}
203
204impl<AB: AttributesBuilder> SequencerActorState<AB> {
205 async fn build_block(
207 &mut self,
208 ctx: &mut SequencerContext,
209 unsafe_head_rx: &mut watch::Receiver<L2BlockInfo>,
210 in_recovery_mode: bool,
211 ) -> Result<(), SequencerActorError> {
212 let unsafe_head = *unsafe_head_rx.borrow();
213 let l1_origin = match self
214 .origin_selector
215 .next_l1_origin(unsafe_head, self.is_recovery_mode)
216 .await
217 {
218 Ok(l1_origin) => l1_origin,
219 Err(err) => {
220 warn!(
221 target: "sequencer",
222 ?err,
223 "Temporary error occurred while selecting next L1 origin. Re-attempting on next tick."
224 );
225 return Ok(());
226 }
227 };
228
229 if unsafe_head.l1_origin.hash != l1_origin.parent_hash &&
230 unsafe_head.l1_origin.hash != l1_origin.hash
231 {
232 warn!(
233 target: "sequencer",
234 l1_origin = ?l1_origin,
235 unsafe_head_hash = %unsafe_head.l1_origin.hash,
236 unsafe_head_l1_origin = ?unsafe_head.l1_origin,
237 "Cannot build new L2 block on inconsistent L1 origin, resetting engine"
238 );
239 if let Err(err) = ctx.reset_request_tx.send(()).await {
240 error!(target: "sequencer", ?err, "Failed to reset engine");
241 ctx.cancellation.cancel();
242 return Err(SequencerActorError::ChannelClosed);
243 }
244 return Ok(());
245 }
246
247 info!(
248 target: "sequencer",
249 parent_num = unsafe_head.block_info.number,
250 l1_origin_num = l1_origin.number,
251 "Started sequencing new block"
252 );
253
254 let _attributes_build_start = Instant::now();
256 let mut attributes =
257 match self.builder.prepare_payload_attributes(unsafe_head, l1_origin.id()).await {
258 Ok(attrs) => attrs,
259 Err(PipelineErrorKind::Temporary(_)) => {
260 return Ok(());
261 }
263 Err(PipelineErrorKind::Reset(_)) => {
264 if let Err(err) = ctx.reset_request_tx.send(()).await {
265 error!(target: "sequencer", ?err, "Failed to reset engine");
266 ctx.cancellation.cancel();
267 return Err(SequencerActorError::ChannelClosed);
268 }
269
270 warn!(
271 target: "sequencer",
272 "Resetting engine due to pipeline error while preparing payload attributes"
273 );
274 return Ok(());
275 }
276 Err(err @ PipelineErrorKind::Critical(_)) => {
277 error!(target: "sequencer", ?err, "Failed to prepare payload attributes");
278 ctx.cancellation.cancel();
279 return Err(err.into());
280 }
281 };
282
283 if in_recovery_mode {
284 attributes.no_tx_pool = Some(true);
285 }
286
287 attributes.no_tx_pool = (attributes.payload_attributes.timestamp >
290 l1_origin.timestamp + self.cfg.max_sequencer_drift(l1_origin.timestamp))
291 .then_some(true);
292
293 if self.cfg.is_first_ecotone_block(attributes.payload_attributes.timestamp) {
295 info!(target: "sequencer", "Sequencing ecotone upgrade block");
296 attributes.no_tx_pool = Some(true);
297 }
298
299 if self.cfg.is_first_fjord_block(attributes.payload_attributes.timestamp) {
301 info!(target: "sequencer", "Sequencing fjord upgrade block");
302 attributes.no_tx_pool = Some(true);
303 }
304
305 if self.cfg.is_first_granite_block(attributes.payload_attributes.timestamp) {
307 info!(target: "sequencer", "Sequencing granite upgrade block");
308 attributes.no_tx_pool = Some(true);
309 }
310
311 if self.cfg.is_first_holocene_block(attributes.payload_attributes.timestamp) {
313 info!(target: "sequencer", "Sequencing holocene upgrade block");
314 attributes.no_tx_pool = Some(true);
315 }
316
317 if self.cfg.is_first_isthmus_block(attributes.payload_attributes.timestamp) {
319 info!(target: "sequencer", "Sequencing isthmus upgrade block");
320 attributes.no_tx_pool = Some(true);
321 }
322
323 if self.cfg.is_first_interop_block(attributes.payload_attributes.timestamp) {
325 info!(target: "sequencer", "Sequencing interop upgrade block");
326 attributes.no_tx_pool = Some(true);
327 }
328
329 let attrs_with_parent = OpAttributesWithParent::new(attributes, unsafe_head, None, false);
330
331 kona_macros::set!(
333 gauge,
334 crate::Metrics::SEQUENCER_ATTRIBUTES_BUILDER_DURATION,
335 _attributes_build_start.elapsed()
336 );
337
338 let (payload_tx, payload_rx) = mpsc::channel(1);
340
341 let _build_request_start = Instant::now();
343 if let Err(err) = ctx.build_request_tx.send((attrs_with_parent, payload_tx)).await {
344 error!(target: "sequencer", ?err, "Failed to send built attributes to engine");
345 ctx.cancellation.cancel();
346 return Err(SequencerActorError::ChannelClosed);
347 }
348
349 let payload = self.try_wait_for_payload(ctx, payload_rx).await?;
350
351 kona_macros::set!(
353 gauge,
354 crate::Metrics::SEQUENCER_BLOCK_BUILDING_JOB_DURATION,
355 _build_request_start.elapsed()
356 );
357
358 if let Some(conductor) = &self.conductor {
360 let _conductor_commitment_start = Instant::now();
361 if let Err(err) = conductor.commit_unsafe_payload(&payload).await {
362 error!(target: "sequencer", ?err, "Failed to commit unsafe payload to conductor");
363 }
364
365 kona_macros::set!(
366 gauge,
367 crate::Metrics::SEQUENCER_CONDUCTOR_COMMITMENT_DURATION,
368 _conductor_commitment_start.elapsed()
369 );
370 }
371
372 let now =
373 SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs();
374 let then = payload.execution_payload.timestamp() + self.cfg.block_time;
375 if then.saturating_sub(now) <= self.cfg.block_time {
376 warn!(
377 target: "sequencer",
378 "Next block timestamp is more than a block time away from now, building immediately"
379 );
380 self.build_ticker.reset_immediately();
381 }
382
383 self.schedule_gossip(ctx, payload).await
384 }
385
386 async fn try_wait_for_payload(
389 &mut self,
390 ctx: &mut SequencerContext,
391 mut payload_rx: mpsc::Receiver<OpExecutionPayloadEnvelope>,
392 ) -> Result<OpExecutionPayloadEnvelope, SequencerActorError> {
393 payload_rx.recv().await.ok_or_else(|| {
394 error!(target: "sequencer", "Failed to receive built payload");
395 ctx.cancellation.cancel();
396 SequencerActorError::ChannelClosed
397 })
398 }
399
400 async fn schedule_gossip(
402 &mut self,
403 ctx: &mut SequencerContext,
404 payload: OpExecutionPayloadEnvelope,
405 ) -> Result<(), SequencerActorError> {
406 if let Err(err) = ctx.gossip_payload_tx.send(payload).await {
408 error!(target: "sequencer", ?err, "Failed to send payload to be signed and gossipped");
409 ctx.cancellation.cancel();
410 return Err(SequencerActorError::ChannelClosed);
411 }
412
413 Ok(())
414 }
415
416 async fn schedule_initial_reset(
418 &mut self,
419 ctx: &mut SequencerContext,
420 unsafe_head_rx: &mut watch::Receiver<L2BlockInfo>,
421 ) -> Result<(), SequencerActorError> {
422 if let Err(err) = ctx.reset_request_tx.send(()).await {
424 error!(target: "sequencer", ?err, "Failed to send reset request to engine");
425 ctx.cancellation.cancel();
426 return Err(SequencerActorError::ChannelClosed);
427 }
428
429 if unsafe_head_rx.changed().await.is_err() {
433 error!(target: "sequencer", "Failed to receive unsafe head update after reset request");
434 ctx.cancellation.cancel();
435 return Err(SequencerActorError::ChannelClosed);
436 }
437
438 Ok(())
439 }
440
441 #[cfg(feature = "metrics")]
443 fn update_metrics(&self) {
444 let state_flags: [(&str, String); 2] = [
445 ("active", self.is_active.to_string()),
446 ("recovery", self.is_recovery_mode.to_string()),
447 ];
448
449 let gauge = metrics::gauge!(crate::Metrics::SEQUENCER_STATE, &state_flags);
450 gauge.set(1);
451 }
452}
453
/// [`NodeActor`] implementation that runs the sequencer's main loop.
#[async_trait]
impl NodeActor for SequencerActor<SequencerBuilder> {
    type Error = SequencerActorError;
    type OutboundData = SequencerContext;
    type Builder = SequencerBuilder;
    type InboundData = SequencerInboundData;

    /// Builds the actor and its inbound channel handles from the given configuration.
    fn build(config: Self::Builder) -> (Self::InboundData, Self) {
        Self::new(config)
    }

    /// Runs the sequencer until the cancellation token fires or a fatal error occurs.
    ///
    /// An engine reset is requested once up front. Afterwards the loop reacts to shutdown,
    /// admin queries and build-ticker ticks.
    async fn start(mut self, mut ctx: Self::OutboundData) -> Result<(), Self::Error> {
        let mut state = SequencerActorState::new(self.builder, ctx.l1_head_rx.clone());

        // Publish the initial sequencer state when metrics are enabled.
        #[cfg(feature = "metrics")]
        state.update_metrics();

        // Request an engine reset and wait for an unsafe head update before sequencing.
        state.schedule_initial_reset(&mut ctx, &mut self.unsafe_head_rx).await?;

        loop {
            select! {
                // `biased` polls the branches in declaration order, so shutdown is checked first.
                biased;
                _ = ctx.cancellation.cancelled() => {
                    info!(
                        target: "sequencer",
                        "Received shutdown signal. Exiting sequencer task."
                    );
                    return Ok(());
                }
                // Admin queries are only polled while the admin channel is open.
                Some(admin_query) = self.admin_query_rx.recv(), if !self.admin_query_rx.is_closed() => {
                    let is_sequencer_active = state.is_active;

                    if let Err(e) = state.handle_admin_query(admin_query, &mut self.unsafe_head_rx).await {
                        error!(target: "sequencer", err = ?e, "Failed to handle admin query");
                    }

                    // If the query toggled the active flag, make the build ticker fire right away.
                    if is_sequencer_active != state.is_active {
                        state.build_ticker.reset_immediately();
                    }

                    // Republish the sequencer state when metrics are enabled.
                    #[cfg(feature = "metrics")]
                    state.update_metrics();
                }
                // Blocks are only built while the sequencer is active.
                _ = state.build_ticker.tick(), if state.is_active => {
                    state.build_block(&mut ctx, &mut self.unsafe_head_rx, state.is_recovery_mode).await?;
                }
            }
        }
    }
}