1use super::{EngineError, L2Finalizer};
4use alloy_rpc_types_engine::JwtSecret;
5use async_trait::async_trait;
6use futures::future::OptionFuture;
7use kona_derive::{ResetSignal, Signal};
8use kona_engine::{
9 BuildTask, ConsolidateTask, Engine, EngineClient, EngineQueries,
10 EngineState as InnerEngineState, EngineTask, EngineTaskError, EngineTaskErrorSeverity,
11 InsertTask,
12};
13use kona_genesis::RollupConfig;
14use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent};
15use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope;
16use std::sync::Arc;
17use tokio::{
18 sync::{mpsc, oneshot, watch},
19 task::JoinHandle,
20};
21use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
22use url::Url;
23
24use crate::{NodeActor, NodeMode, actors::CancellableContext};
25
26#[derive(Debug)]
30pub struct EngineActor {
31 builder: EngineBuilder,
33 attributes_rx: mpsc::Receiver<OpAttributesWithParent>,
35 unsafe_block_rx: mpsc::Receiver<OpExecutionPayloadEnvelope>,
37 reset_request_rx: mpsc::Receiver<()>,
39 inbound_queries: mpsc::Receiver<EngineQueries>,
41 build_request_rx:
47 Option<mpsc::Receiver<(OpAttributesWithParent, mpsc::Sender<OpExecutionPayloadEnvelope>)>>,
48 finalizer: L2Finalizer,
50}
51
52#[derive(Debug)]
54pub struct EngineInboundData {
55 pub build_request_tx:
61 Option<mpsc::Sender<(OpAttributesWithParent, mpsc::Sender<OpExecutionPayloadEnvelope>)>>,
62 pub attributes_tx: mpsc::Sender<OpAttributesWithParent>,
64 pub unsafe_block_tx: mpsc::Sender<OpExecutionPayloadEnvelope>,
72 pub reset_request_tx: mpsc::Sender<()>,
74 pub inbound_queries_tx: mpsc::Sender<EngineQueries>,
76 pub finalized_l1_block_tx: watch::Sender<Option<BlockInfo>>,
78}
79
80#[derive(Debug, Clone)]
82pub struct EngineBuilder {
83 pub config: Arc<RollupConfig>,
85 pub engine_url: Url,
87 pub l1_rpc_url: Url,
89 pub jwt_secret: JwtSecret,
91 pub mode: NodeMode,
95}
96
97impl EngineBuilder {
98 fn build_state(self) -> EngineActorState {
101 let client = self.client();
102 let state = InnerEngineState::default();
103 let (engine_state_send, _) = tokio::sync::watch::channel(state);
104 let (engine_queue_length_send, _) = tokio::sync::watch::channel(0);
105
106 EngineActorState {
107 rollup: self.config,
108 client,
109 engine: Engine::new(state, engine_state_send, engine_queue_length_send),
110 }
111 }
112
113 pub fn client(&self) -> Arc<EngineClient> {
115 EngineClient::new_http(
116 self.engine_url.clone(),
117 self.l1_rpc_url.clone(),
118 self.config.clone(),
119 self.jwt_secret,
120 )
121 .into()
122 }
123}
124
125#[derive(Debug)]
127pub(super) struct EngineActorState {
128 pub(super) rollup: Arc<RollupConfig>,
130 pub(super) client: Arc<EngineClient>,
132 pub(super) engine: Engine,
134}
135
136#[derive(Debug)]
138pub struct EngineContext {
139 pub cancellation: CancellationToken,
141 pub engine_unsafe_head_tx: Option<watch::Sender<L2BlockInfo>>,
144 pub engine_l2_safe_head_tx: watch::Sender<L2BlockInfo>,
146 pub sync_complete_tx: oneshot::Sender<()>,
151 pub derivation_signal_tx: mpsc::Sender<Signal>,
153}
154
155impl CancellableContext for EngineContext {
156 fn cancelled(&self) -> WaitForCancellationFuture<'_> {
157 self.cancellation.cancelled()
158 }
159}
160
161impl EngineActor {
162 pub fn new(config: EngineBuilder) -> (EngineInboundData, Self) {
164 let (finalized_l1_block_tx, finalized_l1_block_rx) = watch::channel(None);
165 let (inbound_queries_tx, inbound_queries_rx) = mpsc::channel(1024);
166 let (attributes_tx, attributes_rx) = mpsc::channel(1024);
167 let (unsafe_block_tx, unsafe_block_rx) = mpsc::channel(1024);
168 let (reset_request_tx, reset_request_rx) = mpsc::channel(1024);
169
170 let (build_request_tx, build_request_rx) = if config.mode.is_sequencer() {
171 let (tx, rx) = mpsc::channel(1024);
172 (Some(tx), Some(rx))
173 } else {
174 (None, None)
175 };
176
177 let actor = Self {
178 builder: config,
179 attributes_rx,
180 unsafe_block_rx,
181 reset_request_rx,
182 inbound_queries: inbound_queries_rx,
183 build_request_rx,
184 finalizer: L2Finalizer::new(finalized_l1_block_rx),
185 };
186
187 let outbound_data = EngineInboundData {
188 build_request_tx,
189 finalized_l1_block_tx,
190 inbound_queries_tx,
191 attributes_tx,
192 unsafe_block_tx,
193 reset_request_tx,
194 };
195
196 (outbound_data, actor)
197 }
198}
199
200impl EngineActorState {
201 fn start_query_task(
203 &self,
204 mut inbound_query_channel: tokio::sync::mpsc::Receiver<EngineQueries>,
205 ) -> JoinHandle<()> {
206 let state_recv = self.engine.state_subscribe();
207 let queue_length_recv = self.engine.queue_length_subscribe();
208 let engine_client = self.client.clone();
209 let rollup_config = self.rollup.clone();
210
211 tokio::spawn(async move {
212 while let Some(req) = inbound_query_channel.recv().await {
213 {
214 trace!(target: "engine", ?req, "Received engine query request.");
215
216 if let Err(e) = req
217 .handle(&state_recv, &queue_length_recv, &engine_client, &rollup_config)
218 .await
219 {
220 warn!(target: "engine", err = ?e, "Failed to handle engine query request.");
221 }
222 }
223 }
224 })
225 }
226
227 pub(super) async fn reset(
229 &mut self,
230 derivation_signal_tx: &mpsc::Sender<Signal>,
231 engine_l2_safe_head_tx: &watch::Sender<L2BlockInfo>,
232 finalizer: &mut L2Finalizer,
233 ) -> Result<(), EngineError> {
234 let (l2_safe_head, l1_origin, system_config) =
236 self.engine.reset(self.client.clone(), self.rollup.clone()).await?;
237
238 let signal = ResetSignal { l2_safe_head, l1_origin, system_config: Some(system_config) };
240 match derivation_signal_tx.send(signal.signal()).await {
241 Ok(_) => debug!(target: "engine", "Sent reset signal to derivation actor"),
242 Err(err) => {
243 error!(target: "engine", ?err, "Failed to send reset signal to the derivation actor");
244 return Err(EngineError::ChannelClosed);
245 }
246 }
247
248 self.maybe_update_safe_head(engine_l2_safe_head_tx);
250
251 finalizer.clear();
253
254 Ok(())
255 }
256
257 async fn drain(
259 &mut self,
260 derivation_signal_tx: &mpsc::Sender<Signal>,
261 sync_complete_tx: &mut Option<oneshot::Sender<()>>,
262 engine_l2_safe_head_tx: &watch::Sender<L2BlockInfo>,
263 finalizer: &mut L2Finalizer,
264 ) -> Result<(), EngineError> {
265 match self.engine.drain().await {
266 Ok(_) => {
267 trace!(target: "engine", "[ENGINE] tasks drained");
268 }
269 Err(err) => {
270 match err.severity() {
271 EngineTaskErrorSeverity::Critical => {
272 error!(target: "engine", ?err, "Critical error draining engine tasks");
273 return Err(err.into());
274 }
275 EngineTaskErrorSeverity::Reset => {
276 warn!(target: "engine", ?err, "Received reset request");
277 self.reset(derivation_signal_tx, engine_l2_safe_head_tx, finalizer).await?;
278 }
279 EngineTaskErrorSeverity::Flush => {
280 warn!(target: "engine", ?err, "Invalid payload, Flushing derivation pipeline.");
285 match derivation_signal_tx.send(Signal::FlushChannel).await {
286 Ok(_) => {
287 debug!(target: "engine", "Sent flush signal to derivation actor")
288 }
289 Err(err) => {
290 error!(target: "engine", ?err, "Failed to send flush signal to the derivation actor.");
291 return Err(EngineError::ChannelClosed);
292 }
293 }
294 }
295 EngineTaskErrorSeverity::Temporary => {
296 trace!(target: "engine", ?err, "Temporary error draining engine tasks");
297 }
298 }
299 }
300 }
301
302 self.maybe_update_safe_head(engine_l2_safe_head_tx);
303 self.check_el_sync(
304 derivation_signal_tx,
305 engine_l2_safe_head_tx,
306 sync_complete_tx,
307 finalizer,
308 )
309 .await?;
310
311 Ok(())
312 }
313
314 async fn check_el_sync(
316 &mut self,
317 derivation_signal_tx: &mpsc::Sender<Signal>,
318 engine_l2_safe_head_tx: &watch::Sender<L2BlockInfo>,
319 sync_complete_tx: &mut Option<oneshot::Sender<()>>,
320 finalizer: &mut L2Finalizer,
321 ) -> Result<(), EngineError> {
322 if self.engine.state().el_sync_finished {
323 let Some(sync_complete_tx) = std::mem::take(sync_complete_tx) else {
324 return Ok(());
325 };
326
327 if self.engine.state().sync_state.finalized_head() != L2BlockInfo::default() {
330 return Ok(());
331 }
332
333 info!(target: "engine", "Performing initial engine reset");
335 self.reset(derivation_signal_tx, engine_l2_safe_head_tx, finalizer).await?;
336 sync_complete_tx.send(()).ok();
337 }
338
339 Ok(())
340 }
341
342 fn maybe_update_safe_head(&self, engine_l2_safe_head_tx: &watch::Sender<L2BlockInfo>) {
344 let state_safe_head = self.engine.state().sync_state.safe_head();
345 let update = |head: &mut L2BlockInfo| {
346 if head != &state_safe_head {
347 *head = state_safe_head;
348 return true;
349 }
350 false
351 };
352 let sent = engine_l2_safe_head_tx.send_if_modified(update);
353 trace!(target: "engine", ?sent, "Attempted L2 Safe Head Update");
354 }
355}
356
357#[async_trait]
358impl NodeActor for EngineActor {
359 type Error = EngineError;
360 type OutboundData = EngineContext;
361 type InboundData = EngineInboundData;
362 type Builder = EngineBuilder;
363
364 fn build(config: Self::Builder) -> (Self::InboundData, Self) {
365 Self::new(config)
366 }
367
368 async fn start(
369 mut self,
370 EngineContext {
371 cancellation,
372 engine_l2_safe_head_tx,
373 sync_complete_tx,
374 derivation_signal_tx,
375 mut engine_unsafe_head_tx,
376 }: Self::OutboundData,
377 ) -> Result<(), Self::Error> {
378 let mut state = self.builder.build_state();
379
380 let handle = state.start_query_task(self.inbound_queries);
382
383 let mut sync_complete_tx = Some(sync_complete_tx);
386
387 loop {
388 state
390 .drain(
391 &derivation_signal_tx,
392 &mut sync_complete_tx,
393 &engine_l2_safe_head_tx,
394 &mut self.finalizer,
395 )
396 .await?;
397
398 if let Some(unsafe_head_tx) = engine_unsafe_head_tx.as_mut() {
400 unsafe_head_tx.send_if_modified(|val| {
401 let new_head = state.engine.state().sync_state.unsafe_head();
402 (*val != new_head).then(|| *val = new_head).is_some()
403 });
404 }
405
406 tokio::select! {
407 biased;
408
409 _ = cancellation.cancelled() => {
410 warn!(target: "engine", "EngineActor received shutdown signal. Aborting engine query task.");
411
412 handle.abort();
413
414 return Ok(());
415 }
416 reset = self.reset_request_rx.recv() => {
417 if reset.is_none() {
418 error!(target: "engine", "Reset request receiver closed unexpectedly");
419 cancellation.cancel();
420 return Err(EngineError::ChannelClosed);
421 }
422 warn!(target: "engine", "Received reset request");
423 state
424 .reset(&derivation_signal_tx, &engine_l2_safe_head_tx, &mut self.finalizer)
425 .await?;
426 }
427 Some(res) = OptionFuture::from(self.build_request_rx.as_mut().map(|rx| rx.recv())), if self.build_request_rx.is_some() => {
428 let Some((attributes, response_tx)) = res else {
429 error!(target: "engine", "Build request receiver closed unexpectedly while in sequencer mode");
430 cancellation.cancel();
431 return Err(EngineError::ChannelClosed);
432 };
433
434 let task = EngineTask::Build(BuildTask::new(
435 state.client.clone(),
436 state.rollup.clone(),
437 attributes,
438 false,
440 Some(response_tx),
441 ));
442 state.engine.enqueue(task);
443 }
444 unsafe_block = self.unsafe_block_rx.recv() => {
445 let Some(envelope) = unsafe_block else {
446 error!(target: "engine", "Unsafe block receiver closed unexpectedly");
447 cancellation.cancel();
448 return Err(EngineError::ChannelClosed);
449 };
450 let task = EngineTask::Insert(InsertTask::new(
451 state.client.clone(),
452 state.rollup.clone(),
453 envelope,
454 false, ));
456 state.engine.enqueue(task);
457 }
458 attributes = self.attributes_rx.recv() => {
459 let Some(attributes) = attributes else {
460 error!(target: "engine", "Attributes receiver closed unexpectedly");
461 cancellation.cancel();
462 return Err(EngineError::ChannelClosed);
463 };
464 self.finalizer.enqueue_for_finalization(&attributes);
465
466 let task = EngineTask::Consolidate(ConsolidateTask::new(
467 state.client.clone(),
468 state.rollup.clone(),
469 attributes,
470 true,
471 ));
472 state.engine.enqueue(task);
473 }
474 msg = self.finalizer.new_finalized_block() => {
475 if let Err(err) = msg {
476 error!(target: "engine", ?err, "L1 finalized block receiver closed unexpectedly");
477 cancellation.cancel();
478 return Err(EngineError::ChannelClosed);
479 }
480 self.finalizer.try_finalize_next(&mut state).await;
483 }
484 }
485 }
486 }
487}