1use crate::{DriverError, DriverPipeline, DriverResult, Executor, PipelineCursor, TipCursor};
4use alloc::{sync::Arc, vec::Vec};
5use alloy_consensus::BlockBody;
6use alloy_primitives::{B256, Bytes};
7use alloy_rlp::Decodable;
8use core::fmt::Debug;
9use kona_derive::{Pipeline, PipelineError, PipelineErrorKind, Signal, SignalReceiver};
10use kona_executor::BlockBuildingOutcome;
11use kona_genesis::RollupConfig;
12use kona_protocol::L2BlockInfo;
13use op_alloy_consensus::{OpBlock, OpTxEnvelope, OpTxType};
14use spin::RwLock;
15
16#[derive(Debug)]
18pub struct Driver<E, DP, P>
19where
20 E: Executor + Send + Sync + Debug,
21 DP: DriverPipeline<P> + Send + Sync + Debug,
22 P: Pipeline + SignalReceiver + Send + Sync + Debug,
23{
24 _marker: core::marker::PhantomData<P>,
26 pub cursor: Arc<RwLock<PipelineCursor>>,
28 pub executor: E,
30 pub pipeline: DP,
32 pub safe_head_artifacts: Option<(BlockBuildingOutcome, Vec<Bytes>)>,
34}
35
36impl<E, DP, P> Driver<E, DP, P>
37where
38 E: Executor + Send + Sync + Debug,
39 DP: DriverPipeline<P> + Send + Sync + Debug,
40 P: Pipeline + SignalReceiver + Send + Sync + Debug,
41{
42 pub const fn new(cursor: Arc<RwLock<PipelineCursor>>, executor: E, pipeline: DP) -> Self {
44 Self {
45 _marker: core::marker::PhantomData,
46 cursor,
47 executor,
48 pipeline,
49 safe_head_artifacts: None,
50 }
51 }
52
53 pub async fn wait_for_executor(&mut self) {
55 self.executor.wait_until_ready().await;
56 }
57
58 pub async fn advance_to_target(
69 &mut self,
70 cfg: &RollupConfig,
71 mut target: Option<u64>,
72 ) -> DriverResult<(L2BlockInfo, B256), E::Error> {
73 loop {
74 let pipeline_cursor = self.cursor.read();
76 let tip_cursor = pipeline_cursor.tip();
77 if let Some(tb) = target {
78 if tip_cursor.l2_safe_head.block_info.number >= tb {
79 info!(target: "client", "Derivation complete, reached L2 safe head.");
80 return Ok((tip_cursor.l2_safe_head, tip_cursor.l2_safe_head_output_root));
81 }
82 }
83
84 let mut attributes = match self.pipeline.produce_payload(tip_cursor.l2_safe_head).await
85 {
86 Ok(attrs) => attrs.take_inner(),
87 Err(PipelineErrorKind::Critical(PipelineError::EndOfSource)) => {
88 warn!(target: "client", "Exhausted data source; Halting derivation and using current safe head.");
89
90 if target.is_some() {
93 target = Some(tip_cursor.l2_safe_head.block_info.number);
94 };
95
96 if cfg.is_interop_active(self.cursor.read().l2_safe_head().block_info.number) {
99 return Err(PipelineError::EndOfSource.crit().into());
100 } else {
101 continue;
102 }
103 }
104 Err(e) => {
105 error!(target: "client", "Failed to produce payload: {:?}", e);
106 return Err(DriverError::Pipeline(e));
107 }
108 };
109
110 self.executor.update_safe_head(tip_cursor.l2_safe_head_header.clone());
111 let outcome = match self.executor.execute_payload(attributes.clone()).await {
112 Ok(outcome) => outcome,
113 Err(e) => {
114 error!(target: "client", "Failed to execute L2 block: {}", e);
115
116 if cfg.is_holocene_active(attributes.payload_attributes.timestamp) {
117 warn!(target: "client", "Flushing current channel and retrying deposit only block");
119
120 self.pipeline.signal(Signal::FlushChannel).await?;
125
126 attributes.transactions = attributes.transactions.map(|txs| {
128 txs.into_iter()
129 .filter(|tx| (!tx.is_empty() && tx[0] == OpTxType::Deposit as u8))
130 .collect::<Vec<_>>()
131 });
132
133 self.executor.update_safe_head(tip_cursor.l2_safe_head_header.clone());
135 match self.executor.execute_payload(attributes.clone()).await {
136 Ok(header) => header,
137 Err(e) => {
138 error!(
139 target: "client",
140 "Critical - Failed to execute deposit-only block: {e}",
141 );
142 return Err(DriverError::Executor(e));
143 }
144 }
145 } else {
146 continue;
148 }
149 }
150 };
151
152 let block = OpBlock {
154 header: outcome.header.inner().clone(),
155 body: BlockBody {
156 transactions: attributes
157 .transactions
158 .as_ref()
159 .unwrap_or(&Vec::new())
160 .iter()
161 .map(|tx| OpTxEnvelope::decode(&mut tx.as_ref()).map_err(DriverError::Rlp))
162 .collect::<DriverResult<Vec<OpTxEnvelope>, E::Error>>()?,
163 ommers: Vec::new(),
164 withdrawals: None,
165 },
166 };
167
168 let origin = self.pipeline.origin().ok_or(PipelineError::MissingOrigin.crit())?;
170 let l2_info = L2BlockInfo::from_block_and_genesis(
171 &block,
172 &self.pipeline.rollup_config().genesis,
173 )?;
174 let tip_cursor = TipCursor::new(
175 l2_info,
176 outcome.header.clone(),
177 self.executor.compute_output_root().map_err(DriverError::Executor)?,
178 );
179
180 drop(pipeline_cursor);
182 self.cursor.write().advance(origin, tip_cursor);
183
184 self.safe_head_artifacts = Some((outcome, attributes.transactions.unwrap_or_default()));
186 }
187 }
188}