kona_node_service/actors/
rpc.rs1use crate::{NodeActor, actors::CancellableContext};
4use async_trait::async_trait;
5use kona_p2p::P2pRpcRequest;
6use kona_rpc::{
7 AdminApiServer, AdminRpc, DevEngineApiServer, DevEngineRpc, HealthzResponse, NetworkAdminQuery,
8 OpP2PApiServer, RollupNodeApiServer, SequencerAdminQuery, WsRPC, WsServer,
9};
10use std::time::Duration;
11
12use jsonrpsee::{
13 RpcModule,
14 core::RegisterMethodError,
15 server::{Server, ServerHandle, middleware::http::ProxyGetRequestLayer},
16};
17use kona_engine::EngineQueries;
18use kona_rpc::{L1WatcherQueries, P2pRpc, RollupRpc, RpcBuilder};
19use tokio::sync::mpsc;
20use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
21
22#[derive(Debug, thiserror::Error)]
24pub enum RpcActorError {
25 #[error("Failed to register the healthz endpoint")]
27 RegisterHealthz(#[from] RegisterMethodError),
28 #[error(transparent)]
30 LaunchFailed(#[from] std::io::Error),
31 #[error("RPC server stopped unexpectedly")]
33 ServerStopped,
34 #[error("Failed to stop the RPC server")]
36 StopFailed,
37}
38
39#[derive(Debug)]
41pub struct RpcActor {
42 config: RpcBuilder,
44}
45
46impl RpcActor {
47 pub const fn new(config: RpcBuilder) -> Self {
49 Self { config }
50 }
51}
52
53#[derive(Debug)]
55pub struct RpcContext {
56 pub p2p_network: mpsc::Sender<P2pRpcRequest>,
58 pub network_admin: mpsc::Sender<NetworkAdminQuery>,
60 pub sequencer_admin: Option<mpsc::Sender<SequencerAdminQuery>>,
62 pub l1_watcher_queries: mpsc::Sender<L1WatcherQueries>,
64 pub engine_query: mpsc::Sender<EngineQueries>,
66 pub cancellation: CancellationToken,
68}
69
70impl CancellableContext for RpcContext {
71 fn cancelled(&self) -> WaitForCancellationFuture<'_> {
72 self.cancellation.cancelled()
73 }
74}
75
76async fn launch(
84 config: &RpcBuilder,
85 module: RpcModule<()>,
86) -> Result<ServerHandle, std::io::Error> {
87 let middleware = tower::ServiceBuilder::new()
88 .layer(
89 ProxyGetRequestLayer::new([("/healthz", "healthz")])
90 .expect("Critical: Failed to build GET method proxy"),
91 )
92 .timeout(Duration::from_secs(2));
93 let server = Server::builder().set_http_middleware(middleware).build(config.socket).await?;
94
95 if let Ok(addr) = server.local_addr() {
96 info!(target: "rpc", addr = ?addr, "RPC server bound to address");
97 } else {
98 error!(target: "rpc", "Failed to get local address for RPC server");
99 }
100
101 Ok(server.start(module))
102}
103
104#[async_trait]
105impl NodeActor for RpcActor {
106 type Error = RpcActorError;
107 type OutboundData = RpcContext;
108 type InboundData = ();
109 type Builder = RpcBuilder;
110
111 fn build(config: Self::Builder) -> (Self::InboundData, Self) {
112 ((), Self::new(config))
113 }
114
115 async fn start(
116 mut self,
117 RpcContext {
118 cancellation,
119 p2p_network,
120 l1_watcher_queries,
121 engine_query,
122 network_admin,
123 sequencer_admin,
124 }: Self::OutboundData,
125 ) -> Result<(), Self::Error> {
126 let mut modules = RpcModule::new(());
127
128 modules.register_method("healthz", |_, _, _| {
129 let response = HealthzResponse { version: std::env!("CARGO_PKG_VERSION").to_string() };
130 jsonrpsee::core::RpcResult::Ok(response)
131 })?;
132
133 modules.merge(P2pRpc::new(p2p_network).into_rpc())?;
135
136 modules.merge(
138 AdminRpc { sequencer_sender: sequencer_admin, network_sender: network_admin }
139 .into_rpc(),
140 )?;
141
142 let rollup_rpc = RollupRpc::new(engine_query.clone(), l1_watcher_queries);
144 modules.merge(rollup_rpc.into_rpc())?;
145
146 if self.config.dev_enabled() {
148 let dev_rpc = DevEngineRpc::new(engine_query.clone());
149 modules.merge(dev_rpc.into_rpc())?;
150 }
151
152 if self.config.ws_enabled() {
153 modules.merge(WsRPC::new(engine_query).into_rpc())?;
154 }
155
156 let restarts = self.config.restart_count();
157
158 let mut handle = launch(&self.config, modules.clone()).await?;
159
160 for _ in 0..=restarts {
161 tokio::select! {
162 _ = handle.clone().stopped() => {
163 match launch(&self.config, modules.clone()).await {
164 Ok(h) => handle = h,
165 Err(err) => {
166 error!(target: "rpc", ?err, "Failed to launch rpc server");
167 cancellation.cancel();
168 return Err(RpcActorError::ServerStopped);
169 }
170 }
171 }
172 _ = cancellation.cancelled() => {
173 handle.stop().map_err(|_| RpcActorError::StopFailed)?;
175 return Ok(());
177 }
178 }
179 }
180
181 cancellation.cancel();
183 return Err(RpcActorError::ServerStopped);
184 }
185}
186
187#[cfg(test)]
188mod tests {
189 use std::net::SocketAddr;
190
191 use super::*;
192
193 #[tokio::test]
194 async fn test_launch_no_modules() {
195 let launcher = RpcBuilder {
196 socket: SocketAddr::from(([127, 0, 0, 1], 8080)),
197 no_restart: false,
198 enable_admin: false,
199 admin_persistence: None,
200 ws_enabled: false,
201 dev_enabled: false,
202 };
203 let result = launch(&launcher, RpcModule::new(())).await;
204 assert!(result.is_ok());
205 }
206
207 #[tokio::test]
208 async fn test_launch_with_modules() {
209 let launcher = RpcBuilder {
210 socket: SocketAddr::from(([127, 0, 0, 1], 8081)),
211 no_restart: false,
212 enable_admin: false,
213 admin_persistence: None,
214 ws_enabled: false,
215 dev_enabled: false,
216 };
217 let mut modules = RpcModule::new(());
218
219 modules.merge(RpcModule::new(())).expect("module merge");
220 modules.merge(RpcModule::new(())).expect("module merge");
221 modules.merge(RpcModule::new(())).expect("module merge");
222
223 let result = launch(&launcher, modules).await;
224 assert!(result.is_ok());
225 }
226}