kona_rpc/
ws.rs

1//! Custom RPC subscription endpoints to for the kona node to stream internal state/data.
2
3use jsonrpsee::{
4    PendingSubscriptionSink, SubscriptionSink, core::SubscriptionResult, tracing::warn,
5};
6use kona_engine::{EngineQueries, EngineQuerySender, EngineState};
7use kona_protocol::L2BlockInfo;
8
9use jsonrpsee::core::to_json_raw_value;
10
11use crate::jsonrpsee::WsServer;
12
13/// An RPC server that handles subscriptions to the node's state.
14#[derive(Debug)]
15pub struct WsRPC {
16    /// The engine query sender.
17    engine_query_sender: EngineQuerySender,
18}
19
20impl WsRPC {
21    /// Constructs a new [`WsRPC`] instance.
22    pub const fn new(engine_query_sender: EngineQuerySender) -> Self {
23        Self { engine_query_sender }
24    }
25
26    async fn engine_state_watcher(
27        &self,
28    ) -> Result<tokio::sync::watch::Receiver<EngineState>, jsonrpsee::core::SubscriptionError> {
29        let (query_tx, query_rx) = tokio::sync::oneshot::channel();
30
31        if let Err(e) = self.engine_query_sender.send(EngineQueries::StateReceiver(query_tx)).await
32        {
33            warn!(target: "rpc::ws", ?e, "Failed to send engine state receiver query. The engine query handler is likely closed.");
34            return Err(jsonrpsee::core::SubscriptionError::from(
35                "Internal error. Failed to send engine state receiver query. The engine query handler is likely closed.",
36            ));
37        }
38
39        query_rx.await.map_err(|_| jsonrpsee::core::SubscriptionError::from("Internal error. Failed to receive engine state receiver query. The engine query handler is likely closed."))
40    }
41
42    async fn send_state_update(
43        sink: &SubscriptionSink,
44        state: L2BlockInfo,
45    ) -> Result<(), jsonrpsee::core::SubscriptionError> {
46        sink.send(to_json_raw_value(&state).map_err(|_| {
47            jsonrpsee::core::SubscriptionError::from(
48                "Internal error. Impossible to convert l2 block info to json",
49            )
50        })?)
51        .await
52        .map_err(|_| {
53            jsonrpsee::core::SubscriptionError::from(
54                "Failed to send head update. Subscription likely dropped.",
55            )
56        })
57    }
58}
59
60#[async_trait::async_trait]
61impl WsServer for WsRPC {
62    async fn ws_safe_head_updates(&self, sink: PendingSubscriptionSink) -> SubscriptionResult {
63        let sink = sink.accept().await?;
64
65        let mut subscription = self.engine_state_watcher().await?;
66
67        let mut current_safe_head = subscription.borrow().sync_state.safe_head();
68
69        Self::send_state_update(&sink, current_safe_head).await?;
70
71        while let Ok(new_state) = subscription
72            .wait_for(|state| state.sync_state.safe_head() != current_safe_head)
73            .await
74            .map(|state| *state)
75        {
76            current_safe_head = new_state.sync_state.safe_head();
77            Self::send_state_update(&sink, current_safe_head).await?;
78        }
79
80        warn!(target: "rpc::ws", "Subscription to safe head updates has been closed.");
81        Ok(())
82    }
83
84    async fn ws_finalized_head_updates(&self, sink: PendingSubscriptionSink) -> SubscriptionResult {
85        let sink = sink.accept().await?;
86
87        let mut subscription = self.engine_state_watcher().await?;
88
89        let mut current_finalized_head = subscription.borrow().sync_state.finalized_head();
90
91        Self::send_state_update(&sink, current_finalized_head).await?;
92
93        while let Ok(new_state) = subscription
94            .wait_for(|state| state.sync_state.finalized_head() != current_finalized_head)
95            .await
96            .map(|state| *state)
97        {
98            current_finalized_head = new_state.sync_state.finalized_head();
99            Self::send_state_update(&sink, current_finalized_head).await?;
100        }
101
102        warn!(target: "rpc::ws", "Subscription to finalized head updates has been closed.");
103        Ok(())
104    }
105
106    async fn ws_unsafe_head_updates(&self, sink: PendingSubscriptionSink) -> SubscriptionResult {
107        let sink = sink.accept().await?;
108
109        let mut subscription = self.engine_state_watcher().await?;
110
111        let mut current_unsafe_head = subscription.borrow().sync_state.unsafe_head();
112
113        Self::send_state_update(&sink, current_unsafe_head).await?;
114
115        while let Ok(new_state) = subscription
116            .wait_for(|state| state.sync_state.unsafe_head() != current_unsafe_head)
117            .await
118            .map(|state| *state)
119        {
120            current_unsafe_head = new_state.sync_state.unsafe_head();
121            Self::send_state_update(&sink, current_unsafe_head).await?;
122        }
123
124        warn!(target: "rpc::ws", "Subscription to unsafe head updates has been closed.");
125        Ok(())
126    }
127}