// kona_node_service/actors/l1_watcher_rpc.rs
//! [`NodeActor`] implementation for an L1 chain watcher that polls for L1 block updates over HTTP
//! RPC.

4use crate::{NodeActor, actors::CancellableContext};
5use alloy_eips::{BlockId, BlockNumberOrTag};
6use alloy_primitives::{Address, B256};
7use alloy_provider::{Provider, RootProvider};
8use alloy_rpc_client::PollerBuilder;
9use alloy_rpc_types_eth::{Block, Log};
10use alloy_transport::TransportError;
11use async_stream::stream;
12use async_trait::async_trait;
13use futures::{Stream, StreamExt};
14use kona_genesis::{RollupConfig, SystemConfigLog, SystemConfigUpdate, UnsafeBlockSignerUpdate};
15use kona_protocol::BlockInfo;
16use kona_rpc::{L1State, L1WatcherQueries};
17use std::{sync::Arc, time::Duration};
18use thiserror::Error;
19use tokio::{
20    select,
21    sync::{
22        mpsc::{self, error::SendError},
23        watch,
24    },
25    task::JoinHandle,
26};
27use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
28
/// An L1 chain watcher that checks for L1 block updates over RPC.
///
/// Constructed via [`L1WatcherRpc::new`], which also returns the inbound query channel handles.
#[derive(Debug)]
pub struct L1WatcherRpc {
    /// The watcher state: the rollup config and the L1 provider used for polling.
    state: L1WatcherRpcState,
    /// The inbound queries to the L1 watcher.
    pub inbound_queries: tokio::sync::mpsc::Receiver<L1WatcherQueries>,
}
36
/// The configuration for the L1 watcher actor.
#[derive(Debug)]
pub struct L1WatcherRpcState {
    /// The [`RollupConfig`] to tell if ecotone is active.
    /// This is used to determine if the L1 watcher should check for unsafe block signer updates.
    /// It also supplies the `l1_system_config_address` used to filter system-config logs.
    pub rollup: Arc<RollupConfig>,
    /// The L1 provider used for log queries and block lookups.
    pub l1_provider: RootProvider,
}
46
47impl L1WatcherRpcState {
48    /// Fetches logs for the given block hash.
49    async fn fetch_logs(&self, block_hash: B256) -> Result<Vec<Log>, L1WatcherRpcError<BlockInfo>> {
50        let logs = self
51            .l1_provider
52            .get_logs(&alloy_rpc_types_eth::Filter::new().select(block_hash))
53            .await?;
54
55        Ok(logs)
56    }
57
58    /// Spins up a task to process inbound queries.
59    fn start_query_processor(
60        &self,
61        mut inbound_queries: tokio::sync::mpsc::Receiver<L1WatcherQueries>,
62        head_updates_recv: watch::Receiver<Option<BlockInfo>>,
63    ) -> JoinHandle<()> {
64        // Start the inbound query processor in a separate task to avoid blocking the main task.
65        // We can cheaply clone the l1 provider here because it is an Arc.
66        let l1_provider = self.l1_provider.clone();
67        let rollup_config = self.rollup.clone();
68
69        tokio::spawn(async move {
70            while let Some(query) = inbound_queries.recv().await {
71                match query {
72                    L1WatcherQueries::Config(sender) => {
73                        if let Err(e) = sender.send((*rollup_config).clone()) {
74                            warn!(target: "l1_watcher", error = ?e, "Failed to send L1 config to the query sender");
75                        }
76                    }
77                    L1WatcherQueries::L1State(sender) => {
78                        let current_l1 = *head_updates_recv.borrow();
79
80                        let head_l1 = match l1_provider.get_block(BlockId::latest()).await {
81                                Ok(block) => block,
82                                Err(e) => {
83                                    warn!(target: "l1_watcher", error = ?e, "failed to query l1 provider for latest head block");
84                                    None
85                                }}.map(|block| block.into_consensus().into());
86
87                        let finalized_l1 = match l1_provider.get_block(BlockId::finalized()).await {
88                                Ok(block) => block,
89                                Err(e) => {
90                                    warn!(target: "l1_watcher", error = ?e, "failed to query l1 provider for latest finalized block");
91                                    None
92                                }}.map(|block| block.into_consensus().into());
93
94                        let safe_l1 = match l1_provider.get_block(BlockId::safe()).await {
95                                Ok(block) => block,
96                                Err(e) => {
97                                    warn!(target: "l1_watcher", error = ?e, "failed to query l1 provider for latest safe block");
98                                    None
99                                }}.map(|block| block.into_consensus().into());
100
101                        if let Err(e) = sender.send(L1State {
102                            current_l1,
103                            current_l1_finalized: finalized_l1,
104                            head_l1,
105                            safe_l1,
106                            finalized_l1,
107                        }) {
108                            warn!(target: "l1_watcher", error = ?e, "Failed to send L1 state to the query sender");
109                        }
110                    }
111                }
112            }
113
114            error!(target: "l1_watcher", "L1 watcher query channel closed unexpectedly, exiting query processor task.");
115        })
116    }
117}
118
/// The inbound channels for the L1 watcher actor.
#[derive(Debug)]
pub struct L1WatcherRpcInboundChannels {
    /// The inbound queries to the L1 watcher.
    /// Backed by a bounded mpsc channel (capacity 1024, see [`L1WatcherRpc::new`]).
    pub inbound_queries: tokio::sync::mpsc::Sender<L1WatcherQueries>,
}
125
/// The communication context used by the L1 watcher actor.
///
/// Holds the outbound channels the actor publishes to, plus the shared cancellation token.
#[derive(Debug)]
pub struct L1WatcherRpcContext {
    /// The latest L1 head block, broadcast to all watchers of the channel.
    pub latest_head: watch::Sender<Option<BlockInfo>>,
    /// The latest L1 finalized block.
    pub latest_finalized: watch::Sender<Option<BlockInfo>>,
    /// The block signer sender, notified on unsafe block signer updates.
    pub block_signer_sender: mpsc::Sender<Address>,
    /// The cancellation token, shared between all tasks.
    pub cancellation: CancellationToken,
}
138
impl CancellableContext for L1WatcherRpcContext {
    /// Returns a future that resolves once the shared [`CancellationToken`] is cancelled.
    fn cancelled(&self) -> WaitForCancellationFuture<'_> {
        self.cancellation.cancelled()
    }
}
144
145impl L1WatcherRpc {
146    /// Creates a new [`L1WatcherRpc`] instance.
147    pub fn new(config: L1WatcherRpcState) -> (L1WatcherRpcInboundChannels, Self) {
148        let (l1_watcher_queries_sender, l1_watcher_queries_recv) = mpsc::channel(1024);
149
150        let actor = Self { state: config, inbound_queries: l1_watcher_queries_recv };
151        (L1WatcherRpcInboundChannels { inbound_queries: l1_watcher_queries_sender }, actor)
152    }
153}
154
155#[async_trait]
156impl NodeActor for L1WatcherRpc {
157    type Error = L1WatcherRpcError<BlockInfo>;
158    type InboundData = L1WatcherRpcInboundChannels;
159    type OutboundData = L1WatcherRpcContext;
160    type Builder = L1WatcherRpcState;
161
162    fn build(config: Self::Builder) -> (Self::InboundData, Self) {
163        Self::new(config)
164    }
165
166    async fn start(
167        mut self,
168        L1WatcherRpcContext { latest_head, latest_finalized, block_signer_sender, cancellation }: Self::OutboundData,
169    ) -> Result<(), Self::Error> {
170        let mut head_stream = BlockStream::new(
171            &self.state.l1_provider,
172            BlockNumberOrTag::Latest,
173            Duration::from_secs(13),
174        )
175        .into_stream();
176        let mut finalized_stream = BlockStream::new(
177            &self.state.l1_provider,
178            BlockNumberOrTag::Finalized,
179            Duration::from_secs(60),
180        )
181        .into_stream();
182
183        let inbound_query_processor =
184            self.state.start_query_processor(self.inbound_queries, latest_head.subscribe());
185
186        // Start the main processing loop.
187        loop {
188            select! {
189                _ = cancellation.cancelled() => {
190                    // Exit the task on cancellation.
191                    info!(
192                        target: "l1_watcher",
193                        "Received shutdown signal. Exiting L1 watcher task."
194                    );
195
196                    // Kill the inbound query processor.
197                    inbound_query_processor.abort();
198
199                    return Ok(());
200                },
201                new_head = head_stream.next() => match new_head {
202                    None => {
203                        return Err(L1WatcherRpcError::StreamEnded);
204                    }
205                    Some(head_block_info) => {
206                        // Send the head update event to all consumers.
207                        latest_head.send_replace(Some(head_block_info));
208
209                        // For each log, attempt to construct a `SystemConfigLog`.
210                        // Build the `SystemConfigUpdate` from the log.
211                        // If the update is an Unsafe block signer update, send the address
212                        // to the block signer sender.
213                        let logs = self.state.fetch_logs(head_block_info.hash).await?;
214                        let ecotone_active = self.state.rollup.is_ecotone_active(head_block_info.timestamp);
215                        for log in logs {
216                            if log.address() != self.state.rollup.l1_system_config_address {
217                                continue; // Skip logs not related to the system config.
218                            }
219
220                            let sys_cfg_log = SystemConfigLog::new(log.into(), ecotone_active);
221                            if let Ok(SystemConfigUpdate::UnsafeBlockSigner(UnsafeBlockSignerUpdate { unsafe_block_signer })) = sys_cfg_log.build() {
222                                info!(
223                                    target: "l1_watcher",
224                                    "Unsafe block signer update: {unsafe_block_signer}"
225                                );
226                                if let Err(e) = block_signer_sender.send(unsafe_block_signer).await {
227                                    error!(
228                                        target: "l1_watcher",
229                                        "Error sending unsafe block signer update: {e}"
230                                    );
231                                }
232                            }
233                        }
234                    },
235                },
236                new_finalized = finalized_stream.next() => match new_finalized {
237                    None => {
238                        return Err(L1WatcherRpcError::StreamEnded);
239                    }
240                    Some(finalized_block_info) => {
241                        latest_finalized.send_replace(Some(finalized_block_info));
242                    }
243                }
244            }
245        }
246    }
247}
248
/// A wrapper around a [`PollerBuilder`] that observes [`BlockId`] updates on a [`RootProvider`].
///
/// Note that this stream is not guaranteed to be contiguous. It may miss certain blocks, and
/// yielded items should only be considered to be the latest block matching the given
/// [`BlockNumberOrTag`].
struct BlockStream<'a> {
    /// The inner [`RootProvider`].
    l1_provider: &'a RootProvider,
    /// The block tag to poll for.
    tag: BlockNumberOrTag,
    /// The interval at which to poll (a [`Duration`], not a raw second count).
    poll_interval: Duration,
}
262
263impl<'a> BlockStream<'a> {
264    /// Creates a new [`BlockStream`] instance.
265    ///
266    /// ## Panics
267    /// Panics if the passed [`BlockNumberOrTag`] is of the [`BlockNumberOrTag::Number`] variant.
268    fn new(l1_provider: &'a RootProvider, tag: BlockNumberOrTag, poll_interval: Duration) -> Self {
269        if matches!(tag, BlockNumberOrTag::Number(_)) {
270            panic!("Invalid BlockNumberOrTag variant - Must be a tag");
271        }
272        Self { l1_provider, tag, poll_interval }
273    }
274
275    /// Transforms the watcher into a [`Stream`].
276    fn into_stream(self) -> impl Stream<Item = BlockInfo> + Unpin {
277        let mut poll_stream = PollerBuilder::<_, Block>::new(
278            self.l1_provider.weak_client(),
279            "eth_getBlockByNumber",
280            (self.tag, false),
281        )
282        .with_poll_interval(self.poll_interval)
283        .into_stream();
284
285        Box::pin(stream! {
286            let mut last_block = None;
287            while let Some(next) = poll_stream.next().await {
288                let info: BlockInfo = next.into_consensus().into();
289
290                if last_block.map(|b| b != info).unwrap_or(true) {
291                    last_block = Some(info);
292                    yield info;
293                }
294            }
295        })
296    }
297}
298
/// The error type for the [`L1WatcherRpc`].
///
/// `T` is the payload type of the channel whose send may fail; the actor instantiates it
/// as [`BlockInfo`].
#[derive(Error, Debug)]
pub enum L1WatcherRpcError<T> {
    /// Error sending the head update event.
    #[error("Error sending the head update event: {0}")]
    SendError(#[from] SendError<T>),
    /// Error in the transport layer.
    #[error("Transport error: {0}")]
    Transport(#[from] TransportError),
    /// The L1 block was not found.
    #[error("L1 block not found: {0}")]
    L1BlockNotFound(BlockId),
    /// Stream ended unexpectedly.
    #[error("Stream ended unexpectedly")]
    StreamEnded,
}