1use std::{fmt, panic};
11
12use cookie::Cookie;
13use jsonrpsee::server::{middleware::rpc::RpcServiceBuilder, Server, ServerHandle};
14use tokio::task::JoinHandle;
15use tracing::*;
16
17use zebra_chain::{chain_sync_status::ChainSyncStatus, chain_tip::ChainTip, parameters::Network};
18use zebra_consensus::router::service_trait::BlockVerifierService;
19use zebra_network::AddressBookPeers;
20use zebra_node_services::mempool::MempoolService;
21use zebra_state::{ReadState as ReadStateService, State as StateService};
22
23use crate::{
24 config,
25 methods::{RpcImpl, RpcServer as _},
26 server::{
27 http_request_compatibility::HttpRequestMiddlewareLayer,
28 rpc_call_compatibility::FixRpcResponseMiddleware,
29 },
30};
31
32pub mod cookie;
33pub mod error;
34pub mod http_request_compatibility;
35pub mod rpc_call_compatibility;
36
37#[cfg(test)]
38mod tests;
39
40#[derive(Clone)]
42pub struct RpcServer {
43 config: config::rpc::Config,
45
46 network: Network,
48
49 build_version: String,
51
52 close_handle: ServerHandle,
54}
55
56impl fmt::Debug for RpcServer {
57 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
58 f.debug_struct("RpcServer")
59 .field("config", &self.config)
60 .field("network", &self.network)
61 .field("build_version", &self.build_version)
62 .field(
63 "close_handle",
64 &"ServerHandle",
66 )
67 .finish()
68 }
69}
70
71pub const OPENED_RPC_ENDPOINT_MSG: &str = "Opened RPC endpoint at ";
73
74type ServerTask = JoinHandle<Result<(), tower::BoxError>>;
75
76impl RpcServer {
77 #[allow(clippy::too_many_arguments)]
92 pub async fn start<
93 Mempool,
94 State,
95 ReadState,
96 Tip,
97 BlockVerifierRouter,
98 SyncStatus,
99 AddressBook,
100 >(
101 rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
102 conf: config::rpc::Config,
103 ) -> Result<ServerTask, tower::BoxError>
104 where
105 Mempool: MempoolService,
106 State: StateService,
107 ReadState: ReadStateService,
108 Tip: ChainTip + Clone + Send + Sync + 'static,
109 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
110 BlockVerifierRouter: BlockVerifierService,
111 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
112 {
113 let listen_addr = conf
114 .listen_addr
115 .expect("caller should make sure listen_addr is set");
116
117 let http_middleware_layer = if conf.enable_cookie_auth {
118 let cookie = Cookie::default();
119 cookie::write_to_disk(&cookie, &conf.cookie_dir)
120 .expect("Zebra must be able to write the auth cookie to the disk");
121 HttpRequestMiddlewareLayer::new(Some(cookie))
122 } else {
123 HttpRequestMiddlewareLayer::new(None)
124 };
125
126 let http_middleware = tower::ServiceBuilder::new().layer(http_middleware_layer);
127
128 let rpc_middleware = RpcServiceBuilder::new()
129 .rpc_logger(1024)
130 .layer_fn(FixRpcResponseMiddleware::new);
131
132 let server = Server::builder()
133 .http_only()
134 .set_http_middleware(http_middleware)
135 .set_rpc_middleware(rpc_middleware)
136 .build(listen_addr)
137 .await?;
138
139 info!("{OPENED_RPC_ENDPOINT_MSG}{}", server.local_addr()?);
140
141 Ok(tokio::spawn(async move {
142 server.start(rpc.into_rpc()).stopped().await;
143 Ok(())
144 }))
145 }
146
147 pub fn shutdown_blocking(&self) {
152 Self::shutdown_blocking_inner(self.close_handle.clone(), self.config.clone())
153 }
154
155 pub fn shutdown(&self) -> JoinHandle<()> {
158 let close_handle = self.close_handle.clone();
159 let config = self.config.clone();
160 let span = Span::current();
161
162 tokio::task::spawn_blocking(move || {
163 span.in_scope(|| Self::shutdown_blocking_inner(close_handle, config))
164 })
165 }
166
167 fn shutdown_blocking_inner(close_handle: ServerHandle, config: config::rpc::Config) {
171 let span = Span::current();
174 let wait_on_shutdown = move || {
175 span.in_scope(|| {
176 if config.enable_cookie_auth {
177 if let Err(err) = cookie::remove_from_disk(&config.cookie_dir) {
178 warn!(
179 ?err,
180 "unexpectedly could not remove the rpc auth cookie from the disk"
181 )
182 }
183 }
184
185 info!("Stopping RPC server");
186 let _ = close_handle.stop();
187 debug!("Stopped RPC server");
188 })
189 };
190
191 let span = Span::current();
192 let thread_handle = std::thread::spawn(wait_on_shutdown);
193
194 span.in_scope(|| match thread_handle.join() {
196 Ok(()) => (),
197 Err(panic_object) => panic::resume_unwind(panic_object),
198 })
199 }
200}
201
202impl Drop for RpcServer {
203 fn drop(&mut self) {
204 self.shutdown_blocking();
209 }
210}