1use jsonrpsee::{
4 PendingSubscriptionSink, SubscriptionSink, core::SubscriptionResult, tracing::warn,
5};
6use kona_engine::{EngineQueries, EngineQuerySender, EngineState};
7use kona_protocol::L2BlockInfo;
8
9use jsonrpsee::core::to_json_raw_value;
10
11use crate::jsonrpsee::WsServer;
12
13#[derive(Debug)]
15pub struct WsRPC {
16 engine_query_sender: EngineQuerySender,
18}
19
20impl WsRPC {
21 pub const fn new(engine_query_sender: EngineQuerySender) -> Self {
23 Self { engine_query_sender }
24 }
25
26 async fn engine_state_watcher(
27 &self,
28 ) -> Result<tokio::sync::watch::Receiver<EngineState>, jsonrpsee::core::SubscriptionError> {
29 let (query_tx, query_rx) = tokio::sync::oneshot::channel();
30
31 if let Err(e) = self.engine_query_sender.send(EngineQueries::StateReceiver(query_tx)).await
32 {
33 warn!(target: "rpc::ws", ?e, "Failed to send engine state receiver query. The engine query handler is likely closed.");
34 return Err(jsonrpsee::core::SubscriptionError::from(
35 "Internal error. Failed to send engine state receiver query. The engine query handler is likely closed.",
36 ));
37 }
38
39 query_rx.await.map_err(|_| jsonrpsee::core::SubscriptionError::from("Internal error. Failed to receive engine state receiver query. The engine query handler is likely closed."))
40 }
41
42 async fn send_state_update(
43 sink: &SubscriptionSink,
44 state: L2BlockInfo,
45 ) -> Result<(), jsonrpsee::core::SubscriptionError> {
46 sink.send(to_json_raw_value(&state).map_err(|_| {
47 jsonrpsee::core::SubscriptionError::from(
48 "Internal error. Impossible to convert l2 block info to json",
49 )
50 })?)
51 .await
52 .map_err(|_| {
53 jsonrpsee::core::SubscriptionError::from(
54 "Failed to send head update. Subscription likely dropped.",
55 )
56 })
57 }
58}
59
60#[async_trait::async_trait]
61impl WsServer for WsRPC {
62 async fn ws_safe_head_updates(&self, sink: PendingSubscriptionSink) -> SubscriptionResult {
63 let sink = sink.accept().await?;
64
65 let mut subscription = self.engine_state_watcher().await?;
66
67 let mut current_safe_head = subscription.borrow().sync_state.safe_head();
68
69 Self::send_state_update(&sink, current_safe_head).await?;
70
71 while let Ok(new_state) = subscription
72 .wait_for(|state| state.sync_state.safe_head() != current_safe_head)
73 .await
74 .map(|state| *state)
75 {
76 current_safe_head = new_state.sync_state.safe_head();
77 Self::send_state_update(&sink, current_safe_head).await?;
78 }
79
80 warn!(target: "rpc::ws", "Subscription to safe head updates has been closed.");
81 Ok(())
82 }
83
84 async fn ws_finalized_head_updates(&self, sink: PendingSubscriptionSink) -> SubscriptionResult {
85 let sink = sink.accept().await?;
86
87 let mut subscription = self.engine_state_watcher().await?;
88
89 let mut current_finalized_head = subscription.borrow().sync_state.finalized_head();
90
91 Self::send_state_update(&sink, current_finalized_head).await?;
92
93 while let Ok(new_state) = subscription
94 .wait_for(|state| state.sync_state.finalized_head() != current_finalized_head)
95 .await
96 .map(|state| *state)
97 {
98 current_finalized_head = new_state.sync_state.finalized_head();
99 Self::send_state_update(&sink, current_finalized_head).await?;
100 }
101
102 warn!(target: "rpc::ws", "Subscription to finalized head updates has been closed.");
103 Ok(())
104 }
105
106 async fn ws_unsafe_head_updates(&self, sink: PendingSubscriptionSink) -> SubscriptionResult {
107 let sink = sink.accept().await?;
108
109 let mut subscription = self.engine_state_watcher().await?;
110
111 let mut current_unsafe_head = subscription.borrow().sync_state.unsafe_head();
112
113 Self::send_state_update(&sink, current_unsafe_head).await?;
114
115 while let Ok(new_state) = subscription
116 .wait_for(|state| state.sync_state.unsafe_head() != current_unsafe_head)
117 .await
118 .map(|state| *state)
119 {
120 current_unsafe_head = new_state.sync_state.unsafe_head();
121 Self::send_state_update(&sink, current_unsafe_head).await?;
122 }
123
124 warn!(target: "rpc::ws", "Subscription to unsafe head updates has been closed.");
125 Ok(())
126 }
127}