kona_driver/
core.rs

//! The driver of the kona derivation pipeline.
2
3use crate::{DriverError, DriverPipeline, DriverResult, Executor, PipelineCursor, TipCursor};
4use alloc::{sync::Arc, vec::Vec};
5use alloy_consensus::BlockBody;
6use alloy_primitives::{B256, Bytes};
7use alloy_rlp::Decodable;
8use core::fmt::Debug;
9use kona_derive::{Pipeline, PipelineError, PipelineErrorKind, Signal, SignalReceiver};
10use kona_executor::BlockBuildingOutcome;
11use kona_genesis::RollupConfig;
12use kona_protocol::L2BlockInfo;
13use op_alloy_consensus::{OpBlock, OpTxEnvelope, OpTxType};
14use spin::RwLock;
15
16/// The Rollup Driver entrypoint.
17#[derive(Debug)]
18pub struct Driver<E, DP, P>
19where
20    E: Executor + Send + Sync + Debug,
21    DP: DriverPipeline<P> + Send + Sync + Debug,
22    P: Pipeline + SignalReceiver + Send + Sync + Debug,
23{
24    /// Marker for the pipeline.
25    _marker: core::marker::PhantomData<P>,
26    /// Cursor to keep track of the L2 tip
27    pub cursor: Arc<RwLock<PipelineCursor>>,
28    /// The Executor.
29    pub executor: E,
30    /// A pipeline abstraction.
31    pub pipeline: DP,
32    /// The safe head's execution artifacts + Transactions
33    pub safe_head_artifacts: Option<(BlockBuildingOutcome, Vec<Bytes>)>,
34}
35
36impl<E, DP, P> Driver<E, DP, P>
37where
38    E: Executor + Send + Sync + Debug,
39    DP: DriverPipeline<P> + Send + Sync + Debug,
40    P: Pipeline + SignalReceiver + Send + Sync + Debug,
41{
42    /// Creates a new [`Driver`].
43    pub const fn new(cursor: Arc<RwLock<PipelineCursor>>, executor: E, pipeline: DP) -> Self {
44        Self {
45            _marker: core::marker::PhantomData,
46            cursor,
47            executor,
48            pipeline,
49            safe_head_artifacts: None,
50        }
51    }
52
53    /// Waits until the executor is ready.
54    pub async fn wait_for_executor(&mut self) {
55        self.executor.wait_until_ready().await;
56    }
57
58    /// Advances the derivation pipeline to the target block number.
59    ///
60    /// ## Takes
61    /// - `cfg`: The rollup configuration.
62    /// - `target`: The target block number.
63    ///
64    /// ## Returns
65    /// - `Ok((l2_safe_head, output_root))` - A tuple containing the [`L2BlockInfo`] of the produced
66    ///   block and the output root.
67    /// - `Err(e)` - An error if the block could not be produced.
68    pub async fn advance_to_target(
69        &mut self,
70        cfg: &RollupConfig,
71        mut target: Option<u64>,
72    ) -> DriverResult<(L2BlockInfo, B256), E::Error> {
73        loop {
74            // Check if we have reached the target block number.
75            let pipeline_cursor = self.cursor.read();
76            let tip_cursor = pipeline_cursor.tip();
77            if let Some(tb) = target {
78                if tip_cursor.l2_safe_head.block_info.number >= tb {
79                    info!(target: "client", "Derivation complete, reached L2 safe head.");
80                    return Ok((tip_cursor.l2_safe_head, tip_cursor.l2_safe_head_output_root));
81                }
82            }
83
84            let mut attributes = match self.pipeline.produce_payload(tip_cursor.l2_safe_head).await
85            {
86                Ok(attrs) => attrs.take_inner(),
87                Err(PipelineErrorKind::Critical(PipelineError::EndOfSource)) => {
88                    warn!(target: "client", "Exhausted data source; Halting derivation and using current safe head.");
89
90                    // Adjust the target block number to the current safe head, as no more blocks
91                    // can be produced.
92                    if target.is_some() {
93                        target = Some(tip_cursor.l2_safe_head.block_info.number);
94                    };
95
96                    // If we are in interop mode, this error must be handled by the caller.
97                    // Otherwise, we continue the loop to halt derivation on the next iteration.
98                    if cfg.is_interop_active(self.cursor.read().l2_safe_head().block_info.number) {
99                        return Err(PipelineError::EndOfSource.crit().into());
100                    } else {
101                        continue;
102                    }
103                }
104                Err(e) => {
105                    error!(target: "client", "Failed to produce payload: {:?}", e);
106                    return Err(DriverError::Pipeline(e));
107                }
108            };
109
110            self.executor.update_safe_head(tip_cursor.l2_safe_head_header.clone());
111            let outcome = match self.executor.execute_payload(attributes.clone()).await {
112                Ok(outcome) => outcome,
113                Err(e) => {
114                    error!(target: "client", "Failed to execute L2 block: {}", e);
115
116                    if cfg.is_holocene_active(attributes.payload_attributes.timestamp) {
117                        // Retry with a deposit-only block.
118                        warn!(target: "client", "Flushing current channel and retrying deposit only block");
119
120                        // Flush the current batch and channel - if a block was replaced with a
121                        // deposit-only block due to execution failure, the
122                        // batch and channel it is contained in is forwards
123                        // invalidated.
124                        self.pipeline.signal(Signal::FlushChannel).await?;
125
126                        // Strip out all transactions that are not deposits.
127                        attributes.transactions = attributes.transactions.map(|txs| {
128                            txs.into_iter()
129                                .filter(|tx| (!tx.is_empty() && tx[0] == OpTxType::Deposit as u8))
130                                .collect::<Vec<_>>()
131                        });
132
133                        // Retry the execution.
134                        self.executor.update_safe_head(tip_cursor.l2_safe_head_header.clone());
135                        match self.executor.execute_payload(attributes.clone()).await {
136                            Ok(header) => header,
137                            Err(e) => {
138                                error!(
139                                    target: "client",
140                                    "Critical - Failed to execute deposit-only block: {e}",
141                                );
142                                return Err(DriverError::Executor(e));
143                            }
144                        }
145                    } else {
146                        // Pre-Holocene, discard the block if execution fails.
147                        continue;
148                    }
149                }
150            };
151
152            // Construct the block.
153            let block = OpBlock {
154                header: outcome.header.inner().clone(),
155                body: BlockBody {
156                    transactions: attributes
157                        .transactions
158                        .as_ref()
159                        .unwrap_or(&Vec::new())
160                        .iter()
161                        .map(|tx| OpTxEnvelope::decode(&mut tx.as_ref()).map_err(DriverError::Rlp))
162                        .collect::<DriverResult<Vec<OpTxEnvelope>, E::Error>>()?,
163                    ommers: Vec::new(),
164                    withdrawals: None,
165                },
166            };
167
168            // Get the pipeline origin and update the tip cursor.
169            let origin = self.pipeline.origin().ok_or(PipelineError::MissingOrigin.crit())?;
170            let l2_info = L2BlockInfo::from_block_and_genesis(
171                &block,
172                &self.pipeline.rollup_config().genesis,
173            )?;
174            let tip_cursor = TipCursor::new(
175                l2_info,
176                outcome.header.clone(),
177                self.executor.compute_output_root().map_err(DriverError::Executor)?,
178            );
179
180            // Advance the derivation pipeline cursor
181            drop(pipeline_cursor);
182            self.cursor.write().advance(origin, tip_cursor);
183
184            // Update the latest safe head artifacts.
185            self.safe_head_artifacts = Some((outcome, attributes.transactions.unwrap_or_default()));
186        }
187    }
188}