1use std::sync::Arc;
4
5use crate::{InteropMode, Metrics, NodeActor, actors::CancellableContext};
6use alloy_provider::RootProvider;
7use async_trait::async_trait;
8use kona_derive::{
9 ActivationSignal, Pipeline, PipelineError, PipelineErrorKind, ResetError, ResetSignal, Signal,
10 SignalReceiver, StepResult,
11};
12use kona_genesis::RollupConfig;
13use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent};
14use kona_providers_alloy::{
15 AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProvider,
16 OnlinePipeline,
17};
18use op_alloy_network::Optimism;
19use thiserror::Error;
20use tokio::{
21 select,
22 sync::{mpsc, oneshot, watch},
23};
24use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
25
26#[derive(Debug)]
32pub struct DerivationActor<B>
33where
34 B: PipelineBuilder,
35{
36 state: B,
38
39 l1_head_updates: watch::Receiver<Option<BlockInfo>>,
41 engine_l2_safe_head: watch::Receiver<L2BlockInfo>,
43 el_sync_complete_rx: oneshot::Receiver<()>,
46 derivation_signal_rx: mpsc::Receiver<Signal>,
65}
66
67#[derive(Debug)]
69pub struct DerivationState<P>
70where
71 P: Pipeline + SignalReceiver,
72{
73 pub pipeline: P,
75 pub derivation_idle: bool,
78 pub waiting_for_signal: bool,
81}
82
83const DERIVATION_PROVIDER_CACHE_SIZE: usize = 1024;
85
86#[async_trait]
88pub trait PipelineBuilder: Send + Sync + 'static {
89 type Pipeline: Pipeline + SignalReceiver + Send + Sync + 'static;
91
92 async fn build(self) -> DerivationState<Self::Pipeline>;
94}
95
96#[derive(Debug)]
98pub struct DerivationBuilder {
99 pub l1_provider: RootProvider,
101 pub l1_beacon: OnlineBeaconClient,
103 pub l2_provider: RootProvider<Optimism>,
105 pub rollup_config: Arc<RollupConfig>,
107 pub interop_mode: InteropMode,
109}
110
111#[async_trait]
112impl PipelineBuilder for DerivationBuilder {
113 type Pipeline = OnlinePipeline;
114
115 async fn build(self) -> DerivationState<OnlinePipeline> {
116 let l1_derivation_provider =
118 AlloyChainProvider::new(self.l1_provider.clone(), DERIVATION_PROVIDER_CACHE_SIZE);
119 let l2_derivation_provider = AlloyL2ChainProvider::new(
120 self.l2_provider.clone(),
121 self.rollup_config.clone(),
122 DERIVATION_PROVIDER_CACHE_SIZE,
123 );
124
125 let pipeline = match self.interop_mode {
126 InteropMode::Polled => OnlinePipeline::new_polled(
127 self.rollup_config.clone(),
128 OnlineBlobProvider::init(self.l1_beacon.clone()).await,
129 l1_derivation_provider,
130 l2_derivation_provider,
131 ),
132 InteropMode::Indexed => OnlinePipeline::new_indexed(
133 self.rollup_config.clone(),
134 OnlineBlobProvider::init(self.l1_beacon.clone()).await,
135 l1_derivation_provider,
136 l2_derivation_provider,
137 ),
138 };
139
140 DerivationState::new(pipeline)
141 }
142}
143
144#[derive(Debug)]
147pub struct DerivationInboundChannels {
148 pub l1_head_updates_tx: watch::Sender<Option<BlockInfo>>,
150 pub engine_l2_safe_head_tx: watch::Sender<L2BlockInfo>,
152 pub el_sync_complete_tx: oneshot::Sender<()>,
155 pub derivation_signal_tx: mpsc::Sender<Signal>,
160}
161
162#[derive(Debug)]
164pub struct DerivationContext {
165 pub cancellation: CancellationToken,
167 pub derived_attributes_tx: mpsc::Sender<OpAttributesWithParent>,
169 pub reset_request_tx: mpsc::Sender<()>,
172}
173
174impl CancellableContext for DerivationContext {
175 fn cancelled(&self) -> WaitForCancellationFuture<'_> {
176 self.cancellation.cancelled()
177 }
178}
179
180impl<P> DerivationState<P>
181where
182 P: Pipeline + SignalReceiver,
183{
184 pub const fn new(pipeline: P) -> Self {
186 Self { pipeline, derivation_idle: true, waiting_for_signal: false }
187 }
188
189 async fn signal(&mut self, signal: Signal) {
191 if let Signal::Reset(ResetSignal { l1_origin, .. }) = signal {
192 kona_macros::set!(counter, Metrics::DERIVATION_L1_ORIGIN, l1_origin.number);
193 }
194
195 match self.pipeline.signal(signal).await {
196 Ok(_) => info!(target: "derivation", ?signal, "[SIGNAL] Executed Successfully"),
197 Err(e) => {
198 error!(target: "derivation", ?e, ?signal, "Failed to signal derivation pipeline")
199 }
200 }
201 }
202
203 async fn produce_next_attributes(
206 &mut self,
207 engine_l2_safe_head: &watch::Receiver<L2BlockInfo>,
208 reset_request_tx: &mpsc::Sender<()>,
209 ) -> Result<OpAttributesWithParent, DerivationError> {
210 loop {
214 let l2_safe_head = *engine_l2_safe_head.borrow();
215 match self.pipeline.step(l2_safe_head).await {
216 StepResult::PreparedAttributes => { }
217 StepResult::AdvancedOrigin => {
218 let origin =
219 self.pipeline.origin().ok_or(PipelineError::MissingOrigin.crit())?.number;
220
221 kona_macros::set!(counter, Metrics::DERIVATION_L1_ORIGIN, origin);
222 debug!(target: "derivation", l1_block = origin, "Advanced L1 origin");
223 }
224 StepResult::OriginAdvanceErr(e) | StepResult::StepFailed(e) => {
225 match e {
226 PipelineErrorKind::Temporary(e) => {
227 if matches!(e, PipelineError::NotEnoughData) {
230 continue;
231 }
232
233 debug!(
234 target: "derivation",
235 "Exhausted data source for now; Yielding until the chain has extended."
236 );
237 return Err(DerivationError::Yield);
238 }
239 PipelineErrorKind::Reset(e) => {
240 warn!(target: "derivation", "Derivation pipeline is being reset: {e}");
241
242 let system_config = self
243 .pipeline
244 .system_config_by_number(l2_safe_head.block_info.number)
245 .await?;
246
247 if matches!(e, ResetError::HoloceneActivation) {
248 let l1_origin = self
249 .pipeline
250 .origin()
251 .ok_or(PipelineError::MissingOrigin.crit())?;
252
253 self.pipeline
254 .signal(
255 ActivationSignal {
256 l2_safe_head,
257 l1_origin,
258 system_config: Some(system_config),
259 }
260 .signal(),
261 )
262 .await?;
263 } else {
264 if let ResetError::ReorgDetected(expected, new) = e {
265 warn!(
266 target: "derivation",
267 "L1 reorg detected! Expected: {expected} | New: {new}"
268 );
269
270 kona_macros::inc!(counter, Metrics::L1_REORG_COUNT);
271 }
272 if !self
275 .pipeline
276 .rollup_config()
277 .is_interop_active(l2_safe_head.block_info.timestamp)
278 {
279 reset_request_tx.send(()).await.map_err(|e| {
280 error!(target: "derivation", ?e, "Failed to send reset request");
281 DerivationError::Sender(Box::new(e))
282 })?;
283 }
284 self.waiting_for_signal = true;
285 return Err(DerivationError::Yield);
286 }
287 }
288 PipelineErrorKind::Critical(_) => {
289 error!(target: "derivation", "Critical derivation error: {e}");
290 kona_macros::inc!(counter, Metrics::DERIVATION_CRITICAL_ERROR);
291 return Err(e.into());
292 }
293 }
294 }
295 }
296
297 if let Some(attrs) = self.pipeline.next() {
299 return Ok(attrs);
300 }
301 }
302 }
303
304 async fn process(
317 &mut self,
318 msg: InboundDerivationMessage,
319 engine_l2_safe_head: &mut watch::Receiver<L2BlockInfo>,
320 el_sync_complete_rx: &oneshot::Receiver<()>,
321 derived_attributes_tx: &mpsc::Sender<OpAttributesWithParent>,
322 reset_request_tx: &mpsc::Sender<()>,
323 ) -> Result<(), DerivationError> {
324 if !el_sync_complete_rx.is_terminated() {
326 trace!(target: "derivation", "Engine not ready, skipping derivation");
327 return Ok(());
328 } else if self.waiting_for_signal {
329 trace!(target: "derivation", "Waiting to receive a signal, skipping derivation");
330 return Ok(());
331 }
332
333 if !(self.derivation_idle || msg == InboundDerivationMessage::SafeHeadUpdated) {
337 match engine_l2_safe_head.has_changed() {
338 Ok(true) => { }
339 Ok(false) => {
340 trace!(target: "derivation", "Safe head hasn't changed, skipping derivation.");
341 return Ok(());
342 }
343 Err(e) => {
344 error!(target: "derivation", ?e, "Failed to check if safe head has changed");
345 return Err(DerivationError::L2SafeHeadReceiveFailed);
346 }
347 }
348 }
349
350 let engine_safe_head = *engine_l2_safe_head.borrow();
352 if engine_safe_head.block_info.hash.is_zero() {
353 warn!(target: "derivation", engine_safe_head = ?engine_safe_head.block_info.number, "Waiting for engine to initialize state prior to derivation.");
354 return Ok(());
355 }
356
357 let payload_attrs =
360 match self.produce_next_attributes(engine_l2_safe_head, reset_request_tx).await {
361 Ok(attrs) => attrs,
362 Err(DerivationError::Yield) => {
363 self.derivation_idle = true;
365 return Ok(());
366 }
367 Err(e) => {
368 return Err(e);
369 }
370 };
371
372 self.derivation_idle = false;
374
375 engine_l2_safe_head.borrow_and_update();
377
378 derived_attributes_tx
380 .send(payload_attrs)
381 .await
382 .map_err(|e| DerivationError::Sender(Box::new(e)))?;
383
384 Ok(())
385 }
386}
387
388impl<B> DerivationActor<B>
389where
390 B: PipelineBuilder,
391{
392 pub fn new(state: B) -> (DerivationInboundChannels, Self) {
394 let (l1_head_updates_tx, l1_head_updates_rx) = watch::channel(None);
395 let (engine_l2_safe_head_tx, engine_l2_safe_head_rx) =
396 watch::channel(L2BlockInfo::default());
397 let (el_sync_complete_tx, el_sync_complete_rx) = oneshot::channel();
398 let (derivation_signal_tx, derivation_signal_rx) = mpsc::channel(16);
399 let actor = Self {
400 state,
401 l1_head_updates: l1_head_updates_rx,
402 engine_l2_safe_head: engine_l2_safe_head_rx,
403 el_sync_complete_rx,
404 derivation_signal_rx,
405 };
406
407 (
408 DerivationInboundChannels {
409 l1_head_updates_tx,
410 engine_l2_safe_head_tx,
411 el_sync_complete_tx,
412 derivation_signal_tx,
413 },
414 actor,
415 )
416 }
417}
418
419#[async_trait]
420impl<B> NodeActor for DerivationActor<B>
421where
422 B: PipelineBuilder,
423{
424 type Error = DerivationError;
425 type OutboundData = DerivationContext;
426 type Builder = B;
427 type InboundData = DerivationInboundChannels;
428
429 fn build(config: Self::Builder) -> (Self::InboundData, Self) {
430 Self::new(config)
431 }
432
433 async fn start(
434 mut self,
435 DerivationContext {
436 derived_attributes_tx,
437 reset_request_tx,
438 cancellation,
439 }: Self::OutboundData,
440 ) -> Result<(), Self::Error> {
441 let mut state = self.state.build().await;
442
443 loop {
444 select! {
445 biased;
446
447 _ = cancellation.cancelled() => {
448 info!(
449 target: "derivation",
450 "Received shutdown signal. Exiting derivation task."
451 );
452 return Ok(());
453 }
454 signal = self.derivation_signal_rx.recv() => {
455 let Some(signal) = signal else {
456 error!(
457 target: "derivation",
458 ?signal,
459 "DerivationActor failed to receive signal"
460 );
461 return Err(DerivationError::SignalReceiveFailed);
462 };
463
464 state.signal(signal).await;
465 state.waiting_for_signal = false;
466 }
467 msg = self.l1_head_updates.changed() => {
468 if let Err(err) = msg {
469 error!(
470 target: "derivation",
471 ?err,
472 "L1 head update stream closed without cancellation. Exiting derivation task."
473 );
474 return Ok(());
475 }
476
477 state.process(InboundDerivationMessage::NewDataAvailable, &mut self.engine_l2_safe_head, &self.el_sync_complete_rx, &derived_attributes_tx, &reset_request_tx).await?;
478 }
479 _ = self.engine_l2_safe_head.changed() => {
480 state.process(InboundDerivationMessage::SafeHeadUpdated, &mut self.engine_l2_safe_head, &self.el_sync_complete_rx, &derived_attributes_tx, &reset_request_tx).await?;
481 }
482 _ = &mut self.el_sync_complete_rx, if !self.el_sync_complete_rx.is_terminated() => {
483 info!(target: "derivation", "Engine finished syncing, starting derivation.");
484 state.process(InboundDerivationMessage::NewDataAvailable, &mut self.engine_l2_safe_head, &self.el_sync_complete_rx, &derived_attributes_tx, &reset_request_tx).await?;
486 }
487 }
488 }
489 }
490}
491
492#[derive(Debug, Clone, Copy, PartialEq, Eq)]
494pub enum InboundDerivationMessage {
495 NewDataAvailable,
497 SafeHeadUpdated,
500}
501
502#[derive(Error, Debug)]
504pub enum DerivationError {
505 #[error(transparent)]
507 Pipeline(#[from] PipelineErrorKind),
508 #[error("Waiting for more data to be available")]
510 Yield,
511 #[error("Failed to send event to broadcast sender: {0}")]
513 Sender(Box<dyn std::error::Error>),
514 #[error("Failed to receive signal")]
516 SignalReceiveFailed,
517 #[error("Failed to receive L2 safe head")]
519 L2SafeHeadReceiveFailed,
520}