kona_node_service/actors/
derivation.rs

1//! [NodeActor] implementation for the derivation sub-routine.
2
3use std::sync::Arc;
4
5use crate::{InteropMode, Metrics, NodeActor, actors::CancellableContext};
6use alloy_provider::RootProvider;
7use async_trait::async_trait;
8use kona_derive::{
9    ActivationSignal, Pipeline, PipelineError, PipelineErrorKind, ResetError, ResetSignal, Signal,
10    SignalReceiver, StepResult,
11};
12use kona_genesis::RollupConfig;
13use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent};
14use kona_providers_alloy::{
15    AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProvider,
16    OnlinePipeline,
17};
18use op_alloy_network::Optimism;
19use thiserror::Error;
20use tokio::{
21    select,
22    sync::{mpsc, oneshot, watch},
23};
24use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
25
26/// The [NodeActor] for the derivation sub-routine.
27///
28/// This actor is responsible for receiving messages from [NodeActor]s and stepping the
29/// derivation pipeline forward to produce new payload attributes. The actor then sends the payload
30/// to the [NodeActor] responsible for the execution sub-routine.
31#[derive(Debug)]
32pub struct DerivationActor<B>
33where
34    B: PipelineBuilder,
35{
36    /// The state for the derivation actor.
37    state: B,
38
39    /// The receiver for L1 head update notifications.
40    l1_head_updates: watch::Receiver<Option<BlockInfo>>,
41    /// The receiver for L2 safe head update notifications.
42    engine_l2_safe_head: watch::Receiver<L2BlockInfo>,
43    /// A receiver used by the engine to signal derivation to begin. Completing EL sync consumes
44    /// the instance.
45    el_sync_complete_rx: oneshot::Receiver<()>,
46    /// A receiver that sends a [`Signal`] to the derivation pipeline.
47    ///
48    /// The derivation actor steps over the derivation pipeline to generate
49    /// [`OpAttributesWithParent`]. These attributes then need to be executed
50    /// via the engine api, which is done by sending them through the
51    /// [`DerivationContext::derived_attributes_tx`] channel.
52    ///
53    /// When the engine api receives an `INVALID` response for a new block (
54    /// the new [`OpAttributesWithParent`]) during block building, the payload
55    /// is reduced to "deposits-only". When this happens, the channel and
56    /// remaining buffered batches need to be flushed out of the derivation
57    /// pipeline.
58    ///
59    /// This channel allows the engine to send a [`Signal::FlushChannel`]
60    /// message back to the derivation pipeline when an `INVALID` response
61    /// occurs.
62    ///
63    /// Specs: <https://specs.optimism.io/protocol/derivation.html#l1-sync-payload-attributes-processing>
64    derivation_signal_rx: mpsc::Receiver<Signal>,
65}
66
67/// The state for the derivation actor.
68#[derive(Debug)]
69pub struct DerivationState<P>
70where
71    P: Pipeline + SignalReceiver,
72{
73    /// The derivation pipeline.
74    pub pipeline: P,
75    /// A flag indicating whether or not derivation is idle. Derivation is considered idle when it
76    /// has yielded to wait for more data on the DAL.
77    pub derivation_idle: bool,
78    /// A flag indicating whether or not derivation is waiting for a signal. When waiting for a
79    /// signal, derivation cannot process any incoming events.
80    pub waiting_for_signal: bool,
81}
82
83/// The size of the cache used in the derivation pipeline's providers.
84const DERIVATION_PROVIDER_CACHE_SIZE: usize = 1024;
85
86/// A trait for building derivation pipelines.
87#[async_trait]
88pub trait PipelineBuilder: Send + Sync + 'static {
89    /// The type of pipeline to build.
90    type Pipeline: Pipeline + SignalReceiver + Send + Sync + 'static;
91
92    /// Builds the derivation pipeline.
93    async fn build(self) -> DerivationState<Self::Pipeline>;
94}
95
96/// The configuration necessary to build the derivation actor.
97#[derive(Debug)]
98pub struct DerivationBuilder {
99    /// The L1 provider.
100    pub l1_provider: RootProvider,
101    /// The L1 beacon client.
102    pub l1_beacon: OnlineBeaconClient,
103    /// The L2 provider.
104    pub l2_provider: RootProvider<Optimism>,
105    /// The rollup config.
106    pub rollup_config: Arc<RollupConfig>,
107    /// The interop mode.
108    pub interop_mode: InteropMode,
109}
110
111#[async_trait]
112impl PipelineBuilder for DerivationBuilder {
113    type Pipeline = OnlinePipeline;
114
115    async fn build(self) -> DerivationState<OnlinePipeline> {
116        // Create the caching L1/L2 EL providers for derivation.
117        let l1_derivation_provider =
118            AlloyChainProvider::new(self.l1_provider.clone(), DERIVATION_PROVIDER_CACHE_SIZE);
119        let l2_derivation_provider = AlloyL2ChainProvider::new(
120            self.l2_provider.clone(),
121            self.rollup_config.clone(),
122            DERIVATION_PROVIDER_CACHE_SIZE,
123        );
124
125        let pipeline = match self.interop_mode {
126            InteropMode::Polled => OnlinePipeline::new_polled(
127                self.rollup_config.clone(),
128                OnlineBlobProvider::init(self.l1_beacon.clone()).await,
129                l1_derivation_provider,
130                l2_derivation_provider,
131            ),
132            InteropMode::Indexed => OnlinePipeline::new_indexed(
133                self.rollup_config.clone(),
134                OnlineBlobProvider::init(self.l1_beacon.clone()).await,
135                l1_derivation_provider,
136                l2_derivation_provider,
137            ),
138        };
139
140        DerivationState::new(pipeline)
141    }
142}
143
144/// The inbound channels for the derivation actor.
145/// These channels are used to send messages to the derivation actor by other actors.
146#[derive(Debug)]
147pub struct DerivationInboundChannels {
148    /// The sender for L1 head update notifications.
149    pub l1_head_updates_tx: watch::Sender<Option<BlockInfo>>,
150    /// The sender for L2 safe head update notifications.
151    pub engine_l2_safe_head_tx: watch::Sender<L2BlockInfo>,
152    /// A sender used by the engine to signal derivation to begin. Completing EL sync consumes the
153    /// instance.
154    pub el_sync_complete_tx: oneshot::Sender<()>,
155    /// A sender that sends a [`Signal`] to the derivation pipeline.
156    ///
157    /// This channel should be used by the engine actor to send [`Signal`]s to the derivation
158    /// pipeline. The signals are received by `DerivationActor::derivation_signal_rx`.
159    pub derivation_signal_tx: mpsc::Sender<Signal>,
160}
161
162/// The communication context used by the derivation actor.
163#[derive(Debug)]
164pub struct DerivationContext {
165    /// The cancellation token, shared between all tasks.
166    pub cancellation: CancellationToken,
167    /// Sends the derived [`OpAttributesWithParent`]s produced by the actor.
168    pub derived_attributes_tx: mpsc::Sender<OpAttributesWithParent>,
169    /// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward
170    /// them to the engine.
171    pub reset_request_tx: mpsc::Sender<()>,
172}
173
174impl CancellableContext for DerivationContext {
175    fn cancelled(&self) -> WaitForCancellationFuture<'_> {
176        self.cancellation.cancelled()
177    }
178}
179
180impl<P> DerivationState<P>
181where
182    P: Pipeline + SignalReceiver,
183{
184    /// Creates a new instance of the [DerivationState].
185    pub const fn new(pipeline: P) -> Self {
186        Self { pipeline, derivation_idle: true, waiting_for_signal: false }
187    }
188
189    /// Handles a [`Signal`] received over the derivation signal receiver channel.
190    async fn signal(&mut self, signal: Signal) {
191        if let Signal::Reset(ResetSignal { l1_origin, .. }) = signal {
192            kona_macros::set!(counter, Metrics::DERIVATION_L1_ORIGIN, l1_origin.number);
193        }
194
195        match self.pipeline.signal(signal).await {
196            Ok(_) => info!(target: "derivation", ?signal, "[SIGNAL] Executed Successfully"),
197            Err(e) => {
198                error!(target: "derivation", ?e, ?signal, "Failed to signal derivation pipeline")
199            }
200        }
201    }
202
203    /// Attempts to step the derivation pipeline forward as much as possible in order to produce the
204    /// next safe payload.
205    async fn produce_next_attributes(
206        &mut self,
207        engine_l2_safe_head: &watch::Receiver<L2BlockInfo>,
208        reset_request_tx: &mpsc::Sender<()>,
209    ) -> Result<OpAttributesWithParent, DerivationError> {
210        // As we start the safe head at the disputed block's parent, we step the pipeline until the
211        // first attributes are produced. All batches at and before the safe head will be
212        // dropped, so the first payload will always be the disputed one.
213        loop {
214            let l2_safe_head = *engine_l2_safe_head.borrow();
215            match self.pipeline.step(l2_safe_head).await {
216                StepResult::PreparedAttributes => { /* continue; attributes will be sent off. */ }
217                StepResult::AdvancedOrigin => {
218                    let origin =
219                        self.pipeline.origin().ok_or(PipelineError::MissingOrigin.crit())?.number;
220
221                    kona_macros::set!(counter, Metrics::DERIVATION_L1_ORIGIN, origin);
222                    debug!(target: "derivation", l1_block = origin, "Advanced L1 origin");
223                }
224                StepResult::OriginAdvanceErr(e) | StepResult::StepFailed(e) => {
225                    match e {
226                        PipelineErrorKind::Temporary(e) => {
227                            // NotEnoughData is transient, and doesn't imply we need to wait for
228                            // more data. We can continue stepping until we receive an Eof.
229                            if matches!(e, PipelineError::NotEnoughData) {
230                                continue;
231                            }
232
233                            debug!(
234                                target: "derivation",
235                                "Exhausted data source for now; Yielding until the chain has extended."
236                            );
237                            return Err(DerivationError::Yield);
238                        }
239                        PipelineErrorKind::Reset(e) => {
240                            warn!(target: "derivation", "Derivation pipeline is being reset: {e}");
241
242                            let system_config = self
243                                .pipeline
244                                .system_config_by_number(l2_safe_head.block_info.number)
245                                .await?;
246
247                            if matches!(e, ResetError::HoloceneActivation) {
248                                let l1_origin = self
249                                    .pipeline
250                                    .origin()
251                                    .ok_or(PipelineError::MissingOrigin.crit())?;
252
253                                self.pipeline
254                                    .signal(
255                                        ActivationSignal {
256                                            l2_safe_head,
257                                            l1_origin,
258                                            system_config: Some(system_config),
259                                        }
260                                        .signal(),
261                                    )
262                                    .await?;
263                            } else {
264                                if let ResetError::ReorgDetected(expected, new) = e {
265                                    warn!(
266                                        target: "derivation",
267                                        "L1 reorg detected! Expected: {expected} | New: {new}"
268                                    );
269
270                                    kona_macros::inc!(counter, Metrics::L1_REORG_COUNT);
271                                }
272                                // send the `reset` signal to the engine actor only when interop is
273                                // not active.
274                                if !self
275                                    .pipeline
276                                    .rollup_config()
277                                    .is_interop_active(l2_safe_head.block_info.timestamp)
278                                {
279                                    reset_request_tx.send(()).await.map_err(|e| {
280                                        error!(target: "derivation", ?e, "Failed to send reset request");
281                                        DerivationError::Sender(Box::new(e))
282                                    })?;
283                                }
284                                self.waiting_for_signal = true;
285                                return Err(DerivationError::Yield);
286                            }
287                        }
288                        PipelineErrorKind::Critical(_) => {
289                            error!(target: "derivation", "Critical derivation error: {e}");
290                            kona_macros::inc!(counter, Metrics::DERIVATION_CRITICAL_ERROR);
291                            return Err(e.into());
292                        }
293                    }
294                }
295            }
296
297            // If there are any new attributes, send them to the execution actor.
298            if let Some(attrs) = self.pipeline.next() {
299                return Ok(attrs);
300            }
301        }
302    }
303
304    /// Attempts to process the next payload attributes.
305    ///
306    /// There are a few constraints around stepping on the derivation pipeline.
307    /// - The l2 safe head ([`L2BlockInfo`]) must not be the zero hash.
308    /// - The pipeline must not be stepped on with the same L2 safe head twice.
309    /// - Errors must be bubbled up to the caller.
310    ///
311    /// In order to achieve this, the channel to receive the L2 safe head
312    /// [`L2BlockInfo`] from the engine is *only* marked as _seen_ after payload
313    /// attributes are successfully produced. If the pipeline step errors,
314    /// the same [`L2BlockInfo`] is used again. If the [`L2BlockInfo`] is the
315    /// zero hash, the pipeline is not stepped on.
316    async fn process(
317        &mut self,
318        msg: InboundDerivationMessage,
319        engine_l2_safe_head: &mut watch::Receiver<L2BlockInfo>,
320        el_sync_complete_rx: &oneshot::Receiver<()>,
321        derived_attributes_tx: &mpsc::Sender<OpAttributesWithParent>,
322        reset_request_tx: &mpsc::Sender<()>,
323    ) -> Result<(), DerivationError> {
324        // Only attempt derivation once the engine finishes syncing.
325        if !el_sync_complete_rx.is_terminated() {
326            trace!(target: "derivation", "Engine not ready, skipping derivation");
327            return Ok(());
328        } else if self.waiting_for_signal {
329            trace!(target: "derivation", "Waiting to receive a signal, skipping derivation");
330            return Ok(());
331        }
332
333        // If derivation isn't idle and the message hasn't observed a safe head update already,
334        // check if the safe head has changed before continuing. This is to prevent attempts to
335        // progress the pipeline while it is in the middle of processing a channel.
336        if !(self.derivation_idle || msg == InboundDerivationMessage::SafeHeadUpdated) {
337            match engine_l2_safe_head.has_changed() {
338                Ok(true) => { /* Proceed to produce next payload attributes. */ }
339                Ok(false) => {
340                    trace!(target: "derivation", "Safe head hasn't changed, skipping derivation.");
341                    return Ok(());
342                }
343                Err(e) => {
344                    error!(target: "derivation", ?e, "Failed to check if safe head has changed");
345                    return Err(DerivationError::L2SafeHeadReceiveFailed);
346                }
347            }
348        }
349
350        // Wait for the engine to initialize unknowns prior to kicking off derivation.
351        let engine_safe_head = *engine_l2_safe_head.borrow();
352        if engine_safe_head.block_info.hash.is_zero() {
353            warn!(target: "derivation", engine_safe_head = ?engine_safe_head.block_info.number, "Waiting for engine to initialize state prior to derivation.");
354            return Ok(());
355        }
356
357        // Advance the pipeline as much as possible, new data may be available or there still may be
358        // payloads in the attributes queue.
359        let payload_attrs =
360            match self.produce_next_attributes(engine_l2_safe_head, reset_request_tx).await {
361                Ok(attrs) => attrs,
362                Err(DerivationError::Yield) => {
363                    // Yield until more data is available.
364                    self.derivation_idle = true;
365                    return Ok(());
366                }
367                Err(e) => {
368                    return Err(e);
369                }
370            };
371
372        // Mark derivation as busy.
373        self.derivation_idle = false;
374
375        // Mark the L2 safe head as seen.
376        engine_l2_safe_head.borrow_and_update();
377
378        // Send payload attributes out for processing.
379        derived_attributes_tx
380            .send(payload_attrs)
381            .await
382            .map_err(|e| DerivationError::Sender(Box::new(e)))?;
383
384        Ok(())
385    }
386}
387
388impl<B> DerivationActor<B>
389where
390    B: PipelineBuilder,
391{
392    /// Creates a new instance of the [DerivationActor].
393    pub fn new(state: B) -> (DerivationInboundChannels, Self) {
394        let (l1_head_updates_tx, l1_head_updates_rx) = watch::channel(None);
395        let (engine_l2_safe_head_tx, engine_l2_safe_head_rx) =
396            watch::channel(L2BlockInfo::default());
397        let (el_sync_complete_tx, el_sync_complete_rx) = oneshot::channel();
398        let (derivation_signal_tx, derivation_signal_rx) = mpsc::channel(16);
399        let actor = Self {
400            state,
401            l1_head_updates: l1_head_updates_rx,
402            engine_l2_safe_head: engine_l2_safe_head_rx,
403            el_sync_complete_rx,
404            derivation_signal_rx,
405        };
406
407        (
408            DerivationInboundChannels {
409                l1_head_updates_tx,
410                engine_l2_safe_head_tx,
411                el_sync_complete_tx,
412                derivation_signal_tx,
413            },
414            actor,
415        )
416    }
417}
418
419#[async_trait]
420impl<B> NodeActor for DerivationActor<B>
421where
422    B: PipelineBuilder,
423{
424    type Error = DerivationError;
425    type OutboundData = DerivationContext;
426    type Builder = B;
427    type InboundData = DerivationInboundChannels;
428
429    fn build(config: Self::Builder) -> (Self::InboundData, Self) {
430        Self::new(config)
431    }
432
433    async fn start(
434        mut self,
435        DerivationContext {
436            derived_attributes_tx,
437            reset_request_tx,
438            cancellation,
439        }: Self::OutboundData,
440    ) -> Result<(), Self::Error> {
441        let mut state = self.state.build().await;
442
443        loop {
444            select! {
445                biased;
446
447                _ = cancellation.cancelled() => {
448                    info!(
449                        target: "derivation",
450                        "Received shutdown signal. Exiting derivation task."
451                    );
452                    return Ok(());
453                }
454                signal = self.derivation_signal_rx.recv() => {
455                    let Some(signal) = signal else {
456                        error!(
457                            target: "derivation",
458                            ?signal,
459                            "DerivationActor failed to receive signal"
460                        );
461                        return Err(DerivationError::SignalReceiveFailed);
462                    };
463
464                    state.signal(signal).await;
465                    state.waiting_for_signal = false;
466                }
467                msg = self.l1_head_updates.changed() => {
468                    if let Err(err) = msg {
469                        error!(
470                            target: "derivation",
471                            ?err,
472                            "L1 head update stream closed without cancellation. Exiting derivation task."
473                        );
474                        return Ok(());
475                    }
476
477                    state.process(InboundDerivationMessage::NewDataAvailable, &mut self.engine_l2_safe_head, &self.el_sync_complete_rx, &derived_attributes_tx, &reset_request_tx).await?;
478                }
479                _ = self.engine_l2_safe_head.changed() => {
480                    state.process(InboundDerivationMessage::SafeHeadUpdated, &mut self.engine_l2_safe_head, &self.el_sync_complete_rx, &derived_attributes_tx, &reset_request_tx).await?;
481                }
482                _ = &mut self.el_sync_complete_rx, if !self.el_sync_complete_rx.is_terminated() => {
483                    info!(target: "derivation", "Engine finished syncing, starting derivation.");
484                    // Optimistically process the first message.
485                    state.process(InboundDerivationMessage::NewDataAvailable, &mut self.engine_l2_safe_head, &self.el_sync_complete_rx, &derived_attributes_tx, &reset_request_tx).await?;
486                }
487            }
488        }
489    }
490}
491
492/// Messages that the [DerivationActor] can receive from other actors.
493#[derive(Debug, Clone, Copy, PartialEq, Eq)]
494pub enum InboundDerivationMessage {
495    /// New data is potentially available for processing on the data availability layer.
496    NewDataAvailable,
497    /// The engine has updated its safe head. An attempt to process the next payload attributes can
498    /// be made.
499    SafeHeadUpdated,
500}
501
502/// An error from the [DerivationActor].
503#[derive(Error, Debug)]
504pub enum DerivationError {
505    /// An error originating from the derivation pipeline.
506    #[error(transparent)]
507    Pipeline(#[from] PipelineErrorKind),
508    /// Waiting for more data to be available.
509    #[error("Waiting for more data to be available")]
510    Yield,
511    /// An error originating from the broadcast sender.
512    #[error("Failed to send event to broadcast sender: {0}")]
513    Sender(Box<dyn std::error::Error>),
514    /// An error from the signal receiver.
515    #[error("Failed to receive signal")]
516    SignalReceiveFailed,
517    /// Unable to receive the L2 safe head to step on the pipeline.
518    #[error("Failed to receive L2 safe head")]
519    L2SafeHeadReceiveFailed,
520}