// kona_node_service/actors/l1_watcher_rpc.rs
use crate::{NodeActor, actors::CancellableContext};
use alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_primitives::{Address, B256};
use alloy_provider::{Provider, RootProvider};
use alloy_rpc_client::PollerBuilder;
use alloy_rpc_types_eth::{Block, Log};
use alloy_transport::TransportError;
use async_stream::stream;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use kona_genesis::{RollupConfig, SystemConfigLog, SystemConfigUpdate, UnsafeBlockSignerUpdate};
use kona_protocol::BlockInfo;
use kona_rpc::{L1State, L1WatcherQueries};
use std::{sync::Arc, time::Duration};
use thiserror::Error;
use tokio::{
    select,
    sync::{
        mpsc::{self, error::SendError},
        watch,
    },
    task::JoinHandle,
};
use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
28
/// An L1 watcher actor that polls the L1 chain over RPC and serves inbound queries.
#[derive(Debug)]
pub struct L1WatcherRpc {
    // Shared state (rollup config + L1 provider) used by the polling loop and the
    // query-processor task.
    state: L1WatcherRpcState,
    /// Receiver for inbound [`L1WatcherQueries`], drained by the query-processor task
    /// spawned in `start`.
    pub inbound_queries: tokio::sync::mpsc::Receiver<L1WatcherQueries>,
}
36
/// Shared configuration and handles for the [`L1WatcherRpc`] actor.
#[derive(Debug)]
pub struct L1WatcherRpcState {
    /// The rollup configuration, shared cheaply via [`Arc`].
    pub rollup: Arc<RollupConfig>,
    /// The RPC provider used for all L1 queries.
    pub l1_provider: RootProvider,
}
46
47impl L1WatcherRpcState {
48 async fn fetch_logs(&self, block_hash: B256) -> Result<Vec<Log>, L1WatcherRpcError<BlockInfo>> {
50 let logs = self
51 .l1_provider
52 .get_logs(&alloy_rpc_types_eth::Filter::new().select(block_hash))
53 .await?;
54
55 Ok(logs)
56 }
57
58 fn start_query_processor(
60 &self,
61 mut inbound_queries: tokio::sync::mpsc::Receiver<L1WatcherQueries>,
62 head_updates_recv: watch::Receiver<Option<BlockInfo>>,
63 ) -> JoinHandle<()> {
64 let l1_provider = self.l1_provider.clone();
67 let rollup_config = self.rollup.clone();
68
69 tokio::spawn(async move {
70 while let Some(query) = inbound_queries.recv().await {
71 match query {
72 L1WatcherQueries::Config(sender) => {
73 if let Err(e) = sender.send((*rollup_config).clone()) {
74 warn!(target: "l1_watcher", error = ?e, "Failed to send L1 config to the query sender");
75 }
76 }
77 L1WatcherQueries::L1State(sender) => {
78 let current_l1 = *head_updates_recv.borrow();
79
80 let head_l1 = match l1_provider.get_block(BlockId::latest()).await {
81 Ok(block) => block,
82 Err(e) => {
83 warn!(target: "l1_watcher", error = ?e, "failed to query l1 provider for latest head block");
84 None
85 }}.map(|block| block.into_consensus().into());
86
87 let finalized_l1 = match l1_provider.get_block(BlockId::finalized()).await {
88 Ok(block) => block,
89 Err(e) => {
90 warn!(target: "l1_watcher", error = ?e, "failed to query l1 provider for latest finalized block");
91 None
92 }}.map(|block| block.into_consensus().into());
93
94 let safe_l1 = match l1_provider.get_block(BlockId::safe()).await {
95 Ok(block) => block,
96 Err(e) => {
97 warn!(target: "l1_watcher", error = ?e, "failed to query l1 provider for latest safe block");
98 None
99 }}.map(|block| block.into_consensus().into());
100
101 if let Err(e) = sender.send(L1State {
102 current_l1,
103 current_l1_finalized: finalized_l1,
104 head_l1,
105 safe_l1,
106 finalized_l1,
107 }) {
108 warn!(target: "l1_watcher", error = ?e, "Failed to send L1 state to the query sender");
109 }
110 }
111 }
112 }
113
114 error!(target: "l1_watcher", "L1 watcher query channel closed unexpectedly, exiting query processor task.");
115 })
116 }
117}
118
/// Sender handles used by other components to submit queries to the [`L1WatcherRpc`] actor.
#[derive(Debug)]
pub struct L1WatcherRpcInboundChannels {
    /// Sender half of the actor's bounded query channel.
    pub inbound_queries: tokio::sync::mpsc::Sender<L1WatcherQueries>,
}
125
/// Outbound channels and the cancellation token handed to the [`L1WatcherRpc`] actor at start.
#[derive(Debug)]
pub struct L1WatcherRpcContext {
    /// Broadcasts the latest observed L1 head block to subscribers.
    pub latest_head: watch::Sender<Option<BlockInfo>>,
    /// Broadcasts the latest observed finalized L1 block to subscribers.
    pub latest_finalized: watch::Sender<Option<BlockInfo>>,
    /// Forwards unsafe-block-signer updates parsed from SystemConfig logs.
    pub block_signer_sender: mpsc::Sender<Address>,
    /// Token used to signal the actor to shut down.
    pub cancellation: CancellationToken,
}
138
impl CancellableContext for L1WatcherRpcContext {
    /// Returns a future that resolves once [`Self::cancellation`] is cancelled.
    fn cancelled(&self) -> WaitForCancellationFuture<'_> {
        self.cancellation.cancelled()
    }
}
144
145impl L1WatcherRpc {
146 pub fn new(config: L1WatcherRpcState) -> (L1WatcherRpcInboundChannels, Self) {
148 let (l1_watcher_queries_sender, l1_watcher_queries_recv) = mpsc::channel(1024);
149
150 let actor = Self { state: config, inbound_queries: l1_watcher_queries_recv };
151 (L1WatcherRpcInboundChannels { inbound_queries: l1_watcher_queries_sender }, actor)
152 }
153}
154
155#[async_trait]
156impl NodeActor for L1WatcherRpc {
157 type Error = L1WatcherRpcError<BlockInfo>;
158 type InboundData = L1WatcherRpcInboundChannels;
159 type OutboundData = L1WatcherRpcContext;
160 type Builder = L1WatcherRpcState;
161
162 fn build(config: Self::Builder) -> (Self::InboundData, Self) {
163 Self::new(config)
164 }
165
166 async fn start(
167 mut self,
168 L1WatcherRpcContext { latest_head, latest_finalized, block_signer_sender, cancellation }: Self::OutboundData,
169 ) -> Result<(), Self::Error> {
170 let mut head_stream = BlockStream::new(
171 &self.state.l1_provider,
172 BlockNumberOrTag::Latest,
173 Duration::from_secs(13),
174 )
175 .into_stream();
176 let mut finalized_stream = BlockStream::new(
177 &self.state.l1_provider,
178 BlockNumberOrTag::Finalized,
179 Duration::from_secs(60),
180 )
181 .into_stream();
182
183 let inbound_query_processor =
184 self.state.start_query_processor(self.inbound_queries, latest_head.subscribe());
185
186 loop {
188 select! {
189 _ = cancellation.cancelled() => {
190 info!(
192 target: "l1_watcher",
193 "Received shutdown signal. Exiting L1 watcher task."
194 );
195
196 inbound_query_processor.abort();
198
199 return Ok(());
200 },
201 new_head = head_stream.next() => match new_head {
202 None => {
203 return Err(L1WatcherRpcError::StreamEnded);
204 }
205 Some(head_block_info) => {
206 latest_head.send_replace(Some(head_block_info));
208
209 let logs = self.state.fetch_logs(head_block_info.hash).await?;
214 let ecotone_active = self.state.rollup.is_ecotone_active(head_block_info.timestamp);
215 for log in logs {
216 if log.address() != self.state.rollup.l1_system_config_address {
217 continue; }
219
220 let sys_cfg_log = SystemConfigLog::new(log.into(), ecotone_active);
221 if let Ok(SystemConfigUpdate::UnsafeBlockSigner(UnsafeBlockSignerUpdate { unsafe_block_signer })) = sys_cfg_log.build() {
222 info!(
223 target: "l1_watcher",
224 "Unsafe block signer update: {unsafe_block_signer}"
225 );
226 if let Err(e) = block_signer_sender.send(unsafe_block_signer).await {
227 error!(
228 target: "l1_watcher",
229 "Error sending unsafe block signer update: {e}"
230 );
231 }
232 }
233 }
234 },
235 },
236 new_finalized = finalized_stream.next() => match new_finalized {
237 None => {
238 return Err(L1WatcherRpcError::StreamEnded);
239 }
240 Some(finalized_block_info) => {
241 latest_finalized.send_replace(Some(finalized_block_info));
242 }
243 }
244 }
245 }
246 }
247}
248
// A polling stream over L1 blocks identified by a block tag (latest/finalized/...).
struct BlockStream<'a> {
    // The provider used to poll for blocks.
    l1_provider: &'a RootProvider,
    // The block tag to poll; must not be a concrete number (enforced in `new`).
    tag: BlockNumberOrTag,
    // The interval between polls.
    poll_interval: Duration,
}
262
263impl<'a> BlockStream<'a> {
264 fn new(l1_provider: &'a RootProvider, tag: BlockNumberOrTag, poll_interval: Duration) -> Self {
269 if matches!(tag, BlockNumberOrTag::Number(_)) {
270 panic!("Invalid BlockNumberOrTag variant - Must be a tag");
271 }
272 Self { l1_provider, tag, poll_interval }
273 }
274
275 fn into_stream(self) -> impl Stream<Item = BlockInfo> + Unpin {
277 let mut poll_stream = PollerBuilder::<_, Block>::new(
278 self.l1_provider.weak_client(),
279 "eth_getBlockByNumber",
280 (self.tag, false),
281 )
282 .with_poll_interval(self.poll_interval)
283 .into_stream();
284
285 Box::pin(stream! {
286 let mut last_block = None;
287 while let Some(next) = poll_stream.next().await {
288 let info: BlockInfo = next.into_consensus().into();
289
290 if last_block.map(|b| b != info).unwrap_or(true) {
291 last_block = Some(info);
292 yield info;
293 }
294 }
295 })
296 }
297}
298
/// Errors produced by the [`L1WatcherRpc`] actor.
#[derive(Error, Debug)]
pub enum L1WatcherRpcError<T> {
    /// Failed to send an update event over an mpsc channel.
    #[error("Error sending the head update event: {0}")]
    SendError(#[from] SendError<T>),
    /// An RPC transport failure from the underlying provider.
    #[error("Transport error: {0}")]
    Transport(#[from] TransportError),
    /// The requested L1 block could not be found.
    #[error("L1 block not found: {0}")]
    L1BlockNotFound(BlockId),
    /// A polling stream terminated unexpectedly.
    #[error("Stream ended unexpectedly")]
    StreamEnded,
}