kona_node_service/actors/sequencer/
actor.rs

1//! The [`SequencerActor`].
2
3use super::{
4    DelayedL1OriginSelectorProvider, L1OriginSelector, L1OriginSelectorError, SequencerConfig,
5};
6use crate::{CancellableContext, NodeActor, actors::sequencer::conductor::ConductorClient};
7use alloy_provider::RootProvider;
8use async_trait::async_trait;
9use kona_derive::{AttributesBuilder, PipelineErrorKind, StatefulAttributesBuilder};
10use kona_genesis::RollupConfig;
11use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent};
12use kona_providers_alloy::{AlloyChainProvider, AlloyL2ChainProvider};
13use kona_rpc::SequencerAdminQuery;
14use op_alloy_network::Optimism;
15use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope;
16use std::{
17    sync::Arc,
18    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
19};
20use tokio::{
21    select,
22    sync::{mpsc, watch},
23};
24use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
25
/// The [`SequencerActor`] is responsible for building L2 blocks on top of the current unsafe head
/// and scheduling them to be signed and gossipped by the P2P layer, extending the L2 chain with new
/// blocks.
///
/// The actor itself only holds its inbound channels and the builder configuration; the mutable
/// sequencing state is created from `builder` when the actor is started.
#[derive(Debug)]
pub struct SequencerActor<AB: AttributesBuilderConfig> {
    /// The [`AttributesBuilderConfig`], consumed at startup to construct the sequencer state.
    pub builder: AB,
    /// Receiver half of the watch channel used to observe the unsafe head of the engine.
    pub unsafe_head_rx: watch::Receiver<L2BlockInfo>,
    /// Channel on which admin queries addressed to the sequencer actor are received.
    pub admin_query_rx: mpsc::Receiver<SequencerAdminQuery>,
}
38
/// The state of the [`SequencerActor`].
///
/// Holds everything the block building loop mutates: the attributes builder, the L1 origin
/// selector, the build ticker and the activity / recovery flags.
#[derive(Debug)]
pub(super) struct SequencerActorState<AB: AttributesBuilder> {
    /// The [`RollupConfig`] for the chain being sequenced.
    pub cfg: Arc<RollupConfig>,
    /// The [`AttributesBuilder`] used to prepare payload attributes for each new block.
    pub builder: AB,
    /// The [`L1OriginSelector`], backed by a [`DelayedL1OriginSelectorProvider`].
    pub origin_selector: L1OriginSelector<DelayedL1OriginSelectorProvider>,
    /// The ticker for building new blocks. It is created with a period of the rollup
    /// configuration's block time (in seconds).
    pub build_ticker: tokio::time::Interval,
    /// The conductor RPC client, present only when a conductor RPC URL is configured.
    pub conductor: Option<ConductorClient>,
    /// Whether the sequencer is active. This is used inside communications between the sequencer
    /// and the op-conductor to activate/deactivate the sequencer when leader election occurs.
    ///
    /// ## Default value
    /// At startup, the sequencer is active unless the `sequencer_stopped` flag of the
    /// [`SequencerConfig`] is set.
    pub is_active: bool,
    /// Whether the sequencer is in recovery mode.
    ///
    /// ## Default value
    /// At startup, this mirrors the `sequencer_recovery_mode` flag of the [`SequencerConfig`].
    pub is_recovery_mode: bool,
}
64
/// A trait for configuration types that can be turned into an [`AttributesBuilder`].
///
/// Implementors are consumed by [`AttributesBuilderConfig::build`], so any data needed after the
/// builder has been constructed must be copied out beforehand.
pub trait AttributesBuilderConfig {
    /// The type of [`AttributesBuilder`] to build.
    type AB: AttributesBuilder;

    /// Consumes the configuration and builds the [`AttributesBuilder`].
    fn build(self) -> Self::AB;
}
73
74impl SequencerActorState<StatefulAttributesBuilder<AlloyChainProvider, AlloyL2ChainProvider>> {
75    fn new(
76        seq_builder: SequencerBuilder,
77        l1_head_watcher: watch::Receiver<Option<BlockInfo>>,
78    ) -> Self {
79        let SequencerConfig {
80            sequencer_stopped,
81            sequencer_recovery_mode,
82            conductor_rpc_url,
83            l1_conf_delay,
84        } = seq_builder.seq_cfg.clone();
85
86        let cfg = seq_builder.rollup_cfg.clone();
87        let l1_provider = DelayedL1OriginSelectorProvider::new(
88            seq_builder.l1_provider.clone(),
89            l1_head_watcher,
90            l1_conf_delay,
91        );
92        let conductor = conductor_rpc_url.map(ConductorClient::new_http);
93
94        let builder = seq_builder.build();
95        let build_ticker = tokio::time::interval(Duration::from_secs(cfg.block_time));
96
97        let origin_selector = L1OriginSelector::new(cfg.clone(), l1_provider);
98
99        Self {
100            cfg,
101            builder,
102            origin_selector,
103            build_ticker,
104            conductor,
105            is_active: !sequencer_stopped,
106            is_recovery_mode: sequencer_recovery_mode,
107        }
108    }
109}
110
/// Cache size handed to both the L1 and L2 chain providers that back the sequencer's
/// attributes builder.
const DERIVATION_PROVIDER_CACHE_SIZE: usize = 1024;
112
/// The builder for the [`SequencerActor`].
///
/// Bundles the sequencer and rollup configuration with the L1 / L2 RPC providers; it also acts
/// as the [`AttributesBuilderConfig`] from which the attributes builder is constructed.
#[derive(Debug)]
pub struct SequencerBuilder {
    /// The [`SequencerConfig`].
    pub seq_cfg: SequencerConfig,
    /// The [`RollupConfig`] for the chain being sequenced.
    pub rollup_cfg: Arc<RollupConfig>,
    /// The L1 provider.
    pub l1_provider: RootProvider,
    /// The L2 provider.
    pub l2_provider: RootProvider<Optimism>,
}
125
126impl AttributesBuilderConfig for SequencerBuilder {
127    type AB = StatefulAttributesBuilder<AlloyChainProvider, AlloyL2ChainProvider>;
128
129    fn build(self) -> Self::AB {
130        let l1_derivation_provider =
131            AlloyChainProvider::new(self.l1_provider.clone(), DERIVATION_PROVIDER_CACHE_SIZE);
132        let l2_derivation_provider = AlloyL2ChainProvider::new(
133            self.l2_provider.clone(),
134            self.rollup_cfg.clone(),
135            DERIVATION_PROVIDER_CACHE_SIZE,
136        );
137        StatefulAttributesBuilder::new(
138            self.rollup_cfg,
139            l2_derivation_provider,
140            l1_derivation_provider,
141        )
142    }
143}
144
/// The inbound channels for the [`SequencerActor`].
/// These channels are used by external actors to send messages to the sequencer actor.
#[derive(Debug)]
pub struct SequencerInboundData {
    /// Sender half of the watch channel through which the unsafe head of the engine is
    /// published to the sequencer actor.
    pub unsafe_head_tx: watch::Sender<L2BlockInfo>,
    /// Channel to send admin queries to the sequencer actor.
    pub admin_query_tx: mpsc::Sender<SequencerAdminQuery>,
}
154
/// The communication context used by the [`SequencerActor`].
///
/// Groups the outbound channels the sequencer uses to talk to the other actors, together with
/// the shared cancellation token.
#[derive(Debug)]
pub struct SequencerContext {
    /// The cancellation token, shared between all tasks.
    pub cancellation: CancellationToken,
    /// Watch channel to observe the L1 head of the chain.
    pub l1_head_rx: watch::Receiver<Option<BlockInfo>>,
    /// Sender to request the engine to reset.
    pub reset_request_tx: mpsc::Sender<()>,
    /// Sender to request that a payload be built from the given attributes on top of the
    /// current unsafe head. The accompanying sender is the channel on which the built
    /// [`OpExecutionPayloadEnvelope`] is returned to the sequencer.
    pub build_request_tx:
        mpsc::Sender<(OpAttributesWithParent, mpsc::Sender<OpExecutionPayloadEnvelope>)>,
    /// A sender to asynchronously sign and gossip built [`OpExecutionPayloadEnvelope`]s to the
    /// network actor.
    pub gossip_payload_tx: mpsc::Sender<OpExecutionPayloadEnvelope>,
}
172
impl CancellableContext for SequencerContext {
    /// Returns the future that waits for the shared cancellation token to be cancelled.
    fn cancelled(&self) -> WaitForCancellationFuture<'_> {
        self.cancellation.cancelled()
    }
}
178
/// An error produced by the [`SequencerActor`].
#[derive(Debug, thiserror::Error)]
pub enum SequencerActorError {
    /// An error occurred while building payload attributes.
    #[error(transparent)]
    AttributesBuilder(#[from] PipelineErrorKind),
    /// An error occurred while selecting the next L1 origin.
    #[error(transparent)]
    L1OriginSelector(#[from] L1OriginSelectorError),
    /// A channel was unexpectedly closed. Returned when sending to, or receiving from, one of
    /// the actor's channels fails.
    #[error("Channel closed unexpectedly")]
    ChannelClosed,
}
192
193impl<AB: AttributesBuilderConfig> SequencerActor<AB> {
194    /// Creates a new instance of the [`SequencerActor`].
195    pub fn new(state: AB) -> (SequencerInboundData, Self) {
196        let (unsafe_head_tx, unsafe_head_rx) = watch::channel(L2BlockInfo::default());
197        let (admin_query_tx, admin_query_rx) = mpsc::channel(1024);
198        let actor = Self { builder: state, unsafe_head_rx, admin_query_rx };
199
200        (SequencerInboundData { unsafe_head_tx, admin_query_tx }, actor)
201    }
202}
203
204impl<AB: AttributesBuilder> SequencerActorState<AB> {
205    /// Starts the build job for the next L2 block, on top of the current unsafe head.
206    async fn build_block(
207        &mut self,
208        ctx: &mut SequencerContext,
209        unsafe_head_rx: &mut watch::Receiver<L2BlockInfo>,
210        in_recovery_mode: bool,
211    ) -> Result<(), SequencerActorError> {
212        let unsafe_head = *unsafe_head_rx.borrow();
213        let l1_origin = match self
214            .origin_selector
215            .next_l1_origin(unsafe_head, self.is_recovery_mode)
216            .await
217        {
218            Ok(l1_origin) => l1_origin,
219            Err(err) => {
220                warn!(
221                    target: "sequencer",
222                    ?err,
223                    "Temporary error occurred while selecting next L1 origin. Re-attempting on next tick."
224                );
225                return Ok(());
226            }
227        };
228
229        if unsafe_head.l1_origin.hash != l1_origin.parent_hash &&
230            unsafe_head.l1_origin.hash != l1_origin.hash
231        {
232            warn!(
233                target: "sequencer",
234                l1_origin = ?l1_origin,
235                unsafe_head_hash = %unsafe_head.l1_origin.hash,
236                unsafe_head_l1_origin = ?unsafe_head.l1_origin,
237                "Cannot build new L2 block on inconsistent L1 origin, resetting engine"
238            );
239            if let Err(err) = ctx.reset_request_tx.send(()).await {
240                error!(target: "sequencer", ?err, "Failed to reset engine");
241                ctx.cancellation.cancel();
242                return Err(SequencerActorError::ChannelClosed);
243            }
244            return Ok(());
245        }
246
247        info!(
248            target: "sequencer",
249            parent_num = unsafe_head.block_info.number,
250            l1_origin_num = l1_origin.number,
251            "Started sequencing new block"
252        );
253
254        // Build the payload attributes for the next block.
255        let _attributes_build_start = Instant::now();
256        let mut attributes =
257            match self.builder.prepare_payload_attributes(unsafe_head, l1_origin.id()).await {
258                Ok(attrs) => attrs,
259                Err(PipelineErrorKind::Temporary(_)) => {
260                    return Ok(());
261                    // Do nothing and allow a retry.
262                }
263                Err(PipelineErrorKind::Reset(_)) => {
264                    if let Err(err) = ctx.reset_request_tx.send(()).await {
265                        error!(target: "sequencer", ?err, "Failed to reset engine");
266                        ctx.cancellation.cancel();
267                        return Err(SequencerActorError::ChannelClosed);
268                    }
269
270                    warn!(
271                        target: "sequencer",
272                        "Resetting engine due to pipeline error while preparing payload attributes"
273                    );
274                    return Ok(());
275                }
276                Err(err @ PipelineErrorKind::Critical(_)) => {
277                    error!(target: "sequencer", ?err, "Failed to prepare payload attributes");
278                    ctx.cancellation.cancel();
279                    return Err(err.into());
280                }
281            };
282
283        if in_recovery_mode {
284            attributes.no_tx_pool = Some(true);
285        }
286
287        // If the next L2 block is beyond the sequencer drift threshold, we must produce an empty
288        // block.
289        attributes.no_tx_pool = (attributes.payload_attributes.timestamp >
290            l1_origin.timestamp + self.cfg.max_sequencer_drift(l1_origin.timestamp))
291        .then_some(true);
292
293        // Do not include transactions in the first Ecotone block.
294        if self.cfg.is_first_ecotone_block(attributes.payload_attributes.timestamp) {
295            info!(target: "sequencer", "Sequencing ecotone upgrade block");
296            attributes.no_tx_pool = Some(true);
297        }
298
299        // Do not include transactions in the first Fjord block.
300        if self.cfg.is_first_fjord_block(attributes.payload_attributes.timestamp) {
301            info!(target: "sequencer", "Sequencing fjord upgrade block");
302            attributes.no_tx_pool = Some(true);
303        }
304
305        // Do not include transactions in the first Granite block.
306        if self.cfg.is_first_granite_block(attributes.payload_attributes.timestamp) {
307            info!(target: "sequencer", "Sequencing granite upgrade block");
308            attributes.no_tx_pool = Some(true);
309        }
310
311        // Do not include transactions in the first Holocene block.
312        if self.cfg.is_first_holocene_block(attributes.payload_attributes.timestamp) {
313            info!(target: "sequencer", "Sequencing holocene upgrade block");
314            attributes.no_tx_pool = Some(true);
315        }
316
317        // Do not include transactions in the first Isthmus block.
318        if self.cfg.is_first_isthmus_block(attributes.payload_attributes.timestamp) {
319            info!(target: "sequencer", "Sequencing isthmus upgrade block");
320            attributes.no_tx_pool = Some(true);
321        }
322
323        // Do not include transactions in the first Interop block.
324        if self.cfg.is_first_interop_block(attributes.payload_attributes.timestamp) {
325            info!(target: "sequencer", "Sequencing interop upgrade block");
326            attributes.no_tx_pool = Some(true);
327        }
328
329        let attrs_with_parent = OpAttributesWithParent::new(attributes, unsafe_head, None, false);
330
331        // Log the attributes build duration, if metrics are enabled.
332        kona_macros::set!(
333            gauge,
334            crate::Metrics::SEQUENCER_ATTRIBUTES_BUILDER_DURATION,
335            _attributes_build_start.elapsed()
336        );
337
338        // Create a new channel to receive the built payload.
339        let (payload_tx, payload_rx) = mpsc::channel(1);
340
341        // Send the built attributes to the engine to be built.
342        let _build_request_start = Instant::now();
343        if let Err(err) = ctx.build_request_tx.send((attrs_with_parent, payload_tx)).await {
344            error!(target: "sequencer", ?err, "Failed to send built attributes to engine");
345            ctx.cancellation.cancel();
346            return Err(SequencerActorError::ChannelClosed);
347        }
348
349        let payload = self.try_wait_for_payload(ctx, payload_rx).await?;
350
351        // Log the block building job duration, if metrics are enabled.
352        kona_macros::set!(
353            gauge,
354            crate::Metrics::SEQUENCER_BLOCK_BUILDING_JOB_DURATION,
355            _build_request_start.elapsed()
356        );
357
358        // If the conductor is available, commit the payload to it.
359        if let Some(conductor) = &self.conductor {
360            let _conductor_commitment_start = Instant::now();
361            if let Err(err) = conductor.commit_unsafe_payload(&payload).await {
362                error!(target: "sequencer", ?err, "Failed to commit unsafe payload to conductor");
363            }
364
365            kona_macros::set!(
366                gauge,
367                crate::Metrics::SEQUENCER_CONDUCTOR_COMMITMENT_DURATION,
368                _conductor_commitment_start.elapsed()
369            );
370        }
371
372        let now =
373            SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs();
374        let then = payload.execution_payload.timestamp() + self.cfg.block_time;
375        if then.saturating_sub(now) <= self.cfg.block_time {
376            warn!(
377                target: "sequencer",
378                "Next block timestamp is more than a block time away from now, building immediately"
379            );
380            self.build_ticker.reset_immediately();
381        }
382
383        self.schedule_gossip(ctx, payload).await
384    }
385
386    /// Waits for the next payload to be built and returns it, if there is a payload receiver
387    /// present.
388    async fn try_wait_for_payload(
389        &mut self,
390        ctx: &mut SequencerContext,
391        mut payload_rx: mpsc::Receiver<OpExecutionPayloadEnvelope>,
392    ) -> Result<OpExecutionPayloadEnvelope, SequencerActorError> {
393        payload_rx.recv().await.ok_or_else(|| {
394            error!(target: "sequencer", "Failed to receive built payload");
395            ctx.cancellation.cancel();
396            SequencerActorError::ChannelClosed
397        })
398    }
399
400    /// Schedules a built [`OpExecutionPayloadEnvelope`] to be signed and gossipped.
401    async fn schedule_gossip(
402        &mut self,
403        ctx: &mut SequencerContext,
404        payload: OpExecutionPayloadEnvelope,
405    ) -> Result<(), SequencerActorError> {
406        // Send the payload to the P2P layer to be signed and gossipped.
407        if let Err(err) = ctx.gossip_payload_tx.send(payload).await {
408            error!(target: "sequencer", ?err, "Failed to send payload to be signed and gossipped");
409            ctx.cancellation.cancel();
410            return Err(SequencerActorError::ChannelClosed);
411        }
412
413        Ok(())
414    }
415
416    /// Schedules the initial engine reset request and waits for the unsafe head to be updated.
417    async fn schedule_initial_reset(
418        &mut self,
419        ctx: &mut SequencerContext,
420        unsafe_head_rx: &mut watch::Receiver<L2BlockInfo>,
421    ) -> Result<(), SequencerActorError> {
422        // Schedule a reset of the engine, in order to initialize the engine state.
423        if let Err(err) = ctx.reset_request_tx.send(()).await {
424            error!(target: "sequencer", ?err, "Failed to send reset request to engine");
425            ctx.cancellation.cancel();
426            return Err(SequencerActorError::ChannelClosed);
427        }
428
429        // Wait for the reset request to be processed before starting the block building loop.
430        //
431        // We know that the reset has concluded when the unsafe head watch channel is updated.
432        if unsafe_head_rx.changed().await.is_err() {
433            error!(target: "sequencer", "Failed to receive unsafe head update after reset request");
434            ctx.cancellation.cancel();
435            return Err(SequencerActorError::ChannelClosed);
436        }
437
438        Ok(())
439    }
440
441    /// Updates the metrics for the sequencer actor.
442    #[cfg(feature = "metrics")]
443    fn update_metrics(&self) {
444        let state_flags: [(&str, String); 2] = [
445            ("active", self.is_active.to_string()),
446            ("recovery", self.is_recovery_mode.to_string()),
447        ];
448
449        let gauge = metrics::gauge!(crate::Metrics::SEQUENCER_STATE, &state_flags);
450        gauge.set(1);
451    }
452}
453
454#[async_trait]
455impl NodeActor for SequencerActor<SequencerBuilder> {
456    type Error = SequencerActorError;
457    type OutboundData = SequencerContext;
458    type Builder = SequencerBuilder;
459    type InboundData = SequencerInboundData;
460
461    fn build(config: Self::Builder) -> (Self::InboundData, Self) {
462        Self::new(config)
463    }
464
465    async fn start(mut self, mut ctx: Self::OutboundData) -> Result<(), Self::Error> {
466        let mut state = SequencerActorState::new(self.builder, ctx.l1_head_rx.clone());
467
468        // Initialize metrics, if configured.
469        #[cfg(feature = "metrics")]
470        state.update_metrics();
471
472        // Reset the engine state prior to beginning block building.
473        state.schedule_initial_reset(&mut ctx, &mut self.unsafe_head_rx).await?;
474
475        loop {
476            select! {
477                // We are using a biased select here to ensure that the admin queries are given priority over the block building task.
478                // This is important to limit the occurrence of race conditions where a stopped query is received when a sequencer is building a new block.
479                biased;
480                _ = ctx.cancellation.cancelled() => {
481                    info!(
482                        target: "sequencer",
483                        "Received shutdown signal. Exiting sequencer task."
484                    );
485                    return Ok(());
486                }
487                // Handle admin queries.
488                Some(admin_query) = self.admin_query_rx.recv(), if !self.admin_query_rx.is_closed() => {
489                    let is_sequencer_active = state.is_active;
490
491                    if let Err(e) = state.handle_admin_query(admin_query, &mut self.unsafe_head_rx).await {
492                        error!(target: "sequencer", err = ?e, "Failed to handle admin query");
493                    }
494
495                    // Reset the build ticker if the sequencer's activity state has changed.
496                    if is_sequencer_active != state.is_active {
497                        state.build_ticker.reset_immediately();
498                    }
499
500                    // Update metrics, if configured.
501                    #[cfg(feature = "metrics")]
502                    state.update_metrics();
503                }
504                // The sequencer must be active to build new blocks.
505                _ = state.build_ticker.tick(), if state.is_active => {
506                    state.build_block(&mut ctx, &mut self.unsafe_head_rx, state.is_recovery_mode).await?;
507                }
508            }
509        }
510    }
511}