kona_node_service/actors/
rpc.rs

1//! RPC Server Actor
2
3use crate::{NodeActor, actors::CancellableContext};
4use async_trait::async_trait;
5use kona_p2p::P2pRpcRequest;
6use kona_rpc::{
7    AdminApiServer, AdminRpc, DevEngineApiServer, DevEngineRpc, HealthzResponse, NetworkAdminQuery,
8    OpP2PApiServer, RollupNodeApiServer, SequencerAdminQuery, WsRPC, WsServer,
9};
10use std::time::Duration;
11
12use jsonrpsee::{
13    RpcModule,
14    core::RegisterMethodError,
15    server::{Server, ServerHandle, middleware::http::ProxyGetRequestLayer},
16};
17use kona_engine::EngineQueries;
18use kona_rpc::{L1WatcherQueries, P2pRpc, RollupRpc, RpcBuilder};
19use tokio::sync::mpsc;
20use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
21
22/// An error returned by the [`RpcActor`].
23#[derive(Debug, thiserror::Error)]
24pub enum RpcActorError {
25    /// Failed to register the healthz endpoint.
26    #[error("Failed to register the healthz endpoint")]
27    RegisterHealthz(#[from] RegisterMethodError),
28    /// Failed to launch the RPC server.
29    #[error(transparent)]
30    LaunchFailed(#[from] std::io::Error),
31    /// The [`RpcActor`]'s RPC server stopped unexpectedly.
32    #[error("RPC server stopped unexpectedly")]
33    ServerStopped,
34    /// Failed to stop the RPC server.
35    #[error("Failed to stop the RPC server")]
36    StopFailed,
37}
38
39/// An actor that handles the RPC server for the rollup node.
40#[derive(Debug)]
41pub struct RpcActor {
42    /// A launcher for the rpc.
43    config: RpcBuilder,
44}
45
46impl RpcActor {
47    /// Constructs a new [`RpcActor`] given the [`RpcBuilder`].
48    pub const fn new(config: RpcBuilder) -> Self {
49        Self { config }
50    }
51}
52
53/// The communication context used by the RPC actor.
54#[derive(Debug)]
55pub struct RpcContext {
56    /// The network p2p rpc sender.
57    pub p2p_network: mpsc::Sender<P2pRpcRequest>,
58    /// The network admin rpc sender.
59    pub network_admin: mpsc::Sender<NetworkAdminQuery>,
60    /// The sequencer admin rpc sender.
61    pub sequencer_admin: Option<mpsc::Sender<SequencerAdminQuery>>,
62    /// The l1 watcher queries sender.
63    pub l1_watcher_queries: mpsc::Sender<L1WatcherQueries>,
64    /// The engine query sender.
65    pub engine_query: mpsc::Sender<EngineQueries>,
66    /// The cancellation token, shared between all tasks.
67    pub cancellation: CancellationToken,
68}
69
70impl CancellableContext for RpcContext {
71    fn cancelled(&self) -> WaitForCancellationFuture<'_> {
72        self.cancellation.cancelled()
73    }
74}
75
76/// Launches the jsonrpsee [`Server`].
77///
78/// If the RPC server is disabled, this will return `Ok(None)`.
79///
80/// ## Errors
81///
82/// - [`std::io::Error`] if the server fails to start.
83async fn launch(
84    config: &RpcBuilder,
85    module: RpcModule<()>,
86) -> Result<ServerHandle, std::io::Error> {
87    let middleware = tower::ServiceBuilder::new()
88        .layer(
89            ProxyGetRequestLayer::new([("/healthz", "healthz")])
90                .expect("Critical: Failed to build GET method proxy"),
91        )
92        .timeout(Duration::from_secs(2));
93    let server = Server::builder().set_http_middleware(middleware).build(config.socket).await?;
94
95    if let Ok(addr) = server.local_addr() {
96        info!(target: "rpc", addr = ?addr, "RPC server bound to address");
97    } else {
98        error!(target: "rpc", "Failed to get local address for RPC server");
99    }
100
101    Ok(server.start(module))
102}
103
#[async_trait]
impl NodeActor for RpcActor {
    type Error = RpcActorError;
    type OutboundData = RpcContext;
    type InboundData = ();
    type Builder = RpcBuilder;

    fn build(config: Self::Builder) -> (Self::InboundData, Self) {
        ((), Self::new(config))
    }

    async fn start(
        mut self,
        RpcContext {
            cancellation,
            p2p_network,
            l1_watcher_queries,
            engine_query,
            network_admin,
            sequencer_admin,
        }: Self::OutboundData,
    ) -> Result<(), Self::Error> {
        // Root module that aggregates every RPC namespace served by the node.
        let mut modules = RpcModule::new(());

        // Health-check endpoint; reports the crate version. Reachable via
        // `GET /healthz` through the proxy layer installed in `launch`.
        modules.register_method("healthz", |_, _, _| {
            let response = HealthzResponse { version: std::env!("CARGO_PKG_VERSION").to_string() };
            jsonrpsee::core::RpcResult::Ok(response)
        })?;

        // Build the p2p rpc module.
        modules.merge(P2pRpc::new(p2p_network).into_rpc())?;

        // Build the admin rpc module.
        modules.merge(
            AdminRpc { sequencer_sender: sequencer_admin, network_sender: network_admin }
                .into_rpc(),
        )?;

        // Create context for communication between actors.
        let rollup_rpc = RollupRpc::new(engine_query.clone(), l1_watcher_queries);
        modules.merge(rollup_rpc.into_rpc())?;

        // Add development RPC module for engine state introspection if enabled
        if self.config.dev_enabled() {
            let dev_rpc = DevEngineRpc::new(engine_query.clone());
            modules.merge(dev_rpc.into_rpc())?;
        }

        // Websocket RPC surface, only when enabled in the config.
        if self.config.ws_enabled() {
            modules.merge(WsRPC::new(engine_query).into_rpc())?;
        }

        // Maximum number of relaunch attempts after an unexpected server stop
        // before the whole node is shut down.
        let restarts = self.config.restart_count();

        let mut handle = launch(&self.config, modules.clone()).await?;

        // Supervision loop: relaunch the server on unexpected stops (up to
        // `restarts` times) or stop it cleanly on cancellation.
        for _ in 0..=restarts {
            tokio::select! {
                // The handle is cloned because `stopped()` consumes it; the
                // original is kept so the server can still be stopped below.
                _ = handle.clone().stopped() => {
                    match launch(&self.config, modules.clone()).await {
                        Ok(h) => handle = h,
                        Err(err) => {
                            error!(target: "rpc", ?err, "Failed to launch rpc server");
                            // A failed relaunch is fatal: bring the node down.
                            cancellation.cancel();
                            return Err(RpcActorError::ServerStopped);
                        }
                    }
                }
                _ = cancellation.cancelled() => {
                    // The cancellation token has been triggered, so we should stop the server.
                    handle.stop().map_err(|_| RpcActorError::StopFailed)?;
                    // Since the RPC Server didn't originate the error, we should return Ok.
                    return Ok(());
                }
            }
        }

        // The configured restart budget is exhausted: stop the node.
        cancellation.cancel();
        return Err(RpcActorError::ServerStopped);
    }
}
186
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use super::*;

    /// Builds an [`RpcBuilder`] bound to an OS-assigned ephemeral port.
    ///
    /// Port 0 asks the OS for any free port, so tests never collide with
    /// each other or with services already listening on fixed ports.
    fn test_builder() -> RpcBuilder {
        RpcBuilder {
            socket: SocketAddr::from(([127, 0, 0, 1], 0)),
            no_restart: false,
            enable_admin: false,
            admin_persistence: None,
            ws_enabled: false,
            dev_enabled: false,
        }
    }

    #[tokio::test]
    async fn test_launch_no_modules() {
        let launcher = test_builder();
        let result = launch(&launcher, RpcModule::new(())).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_launch_with_modules() {
        let launcher = test_builder();
        let mut modules = RpcModule::new(());

        modules.merge(RpcModule::new(())).expect("module merge");
        modules.merge(RpcModule::new(())).expect("module merge");
        modules.merge(RpcModule::new(())).expect("module merge");

        let result = launch(&launcher, modules).await;
        assert!(result.is_ok());
    }
}