// zebra_rpc/server.rs

//! A JSON-RPC 1.0 & 2.0 endpoint for Zebra.
//!
//! This endpoint is compatible with clients that incorrectly send
//! `"jsonrpc" = 1.0` fields in JSON-RPC 1.0 requests,
//! such as `lightwalletd`.
//!
//! See the full list of
//! [differences between JSON-RPC 1.0 and 2.0](https://www.simple-is-better.org/rpc/#differences-between-1-0-and-2-0).
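//!
//! For example, a noncompliant JSON-RPC 1.0 request that carries the extra
//! `jsonrpc` field (an illustrative request body, not a literal test vector)
//! might look like:
//!
//! ```json
//! { "jsonrpc": "1.0", "id": 0, "method": "getinfo", "params": [] }
//! ```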

use std::{fmt, panic};

use cookie::Cookie;
use jsonrpsee::server::{middleware::rpc::RpcServiceBuilder, Server, ServerHandle};
use tokio::task::JoinHandle;
use tracing::*;

use zebra_chain::{chain_sync_status::ChainSyncStatus, chain_tip::ChainTip, parameters::Network};
use zebra_consensus::router::service_trait::BlockVerifierService;
use zebra_network::AddressBookPeers;
use zebra_node_services::mempool::MempoolService;
use zebra_state::{ReadState as ReadStateService, State as StateService};

use crate::{
    config,
    methods::{RpcImpl, RpcServer as _},
    server::{
        http_request_compatibility::HttpRequestMiddlewareLayer,
        rpc_call_compatibility::FixRpcResponseMiddleware,
    },
};

pub mod cookie;
pub mod error;
pub mod http_request_compatibility;
pub mod rpc_call_compatibility;

#[cfg(test)]
mod tests;

/// Zebra RPC Server
#[derive(Clone)]
pub struct RpcServer {
    /// The RPC config.
    config: config::rpc::Config,

    /// The configured network.
    network: Network,

    /// Zebra's application version, with build metadata.
    build_version: String,

    /// A server handle used to shut down the RPC server.
    close_handle: ServerHandle,
}

impl fmt::Debug for RpcServer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RpcServer")
            .field("config", &self.config)
            .field("network", &self.network)
            .field("build_version", &self.build_version)
            .field(
                "close_handle",
                // TODO: use std::any::type_name_of_val(&self.close_handle), stable since Rust 1.76
65                &"ServerHandle",
66            )
67            .finish()
68    }
69}
70
/// The log message that precedes the RPC server's listen address.
pub const OPENED_RPC_ENDPOINT_MSG: &str = "Opened RPC endpoint at ";

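/// A handle to a spawned RPC server task, which resolves when the server stops.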
type ServerTask = JoinHandle<Result<(), tower::BoxError>>;

impl RpcServer {
    /// Starts the RPC server.
    ///
    /// Returns a [`JoinHandle`] for the spawned RPC server task, which resolves
    /// when the server stops.
    ///
    /// # Panics
    ///
    /// - If [`Config::listen_addr`](config::rpc::Config::listen_addr) is `None`.
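    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here; it assumes an `rpc` implementation
    /// and a `conf` with a listen address have already been built elsewhere):
    ///
    /// ```ignore
    /// let server_task = RpcServer::start(rpc, conf).await?;
    /// // The task runs until the server stops, then yields its final result.
    /// server_task.await??;
    /// ```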
    #[allow(clippy::too_many_arguments)]
    pub async fn start<
        Mempool,
        State,
        ReadState,
        Tip,
        BlockVerifierRouter,
        SyncStatus,
        AddressBook,
    >(
        rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
        conf: config::rpc::Config,
    ) -> Result<ServerTask, tower::BoxError>
    where
        Mempool: MempoolService,
        State: StateService,
        ReadState: ReadStateService,
        Tip: ChainTip + Clone + Send + Sync + 'static,
        AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
        BlockVerifierRouter: BlockVerifierService,
        SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
    {
        let listen_addr = conf
            .listen_addr
            .expect("caller should make sure listen_addr is set");

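        // If cookie-based authentication is enabled, generate a fresh auth
        // cookie and persist it to disk so that local clients can read it.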
        let http_middleware_layer = if conf.enable_cookie_auth {
            let cookie = Cookie::default();
            cookie::write_to_disk(&cookie, &conf.cookie_dir)
                .expect("Zebra must be able to write the auth cookie to the disk");
            HttpRequestMiddlewareLayer::new(Some(cookie))
        } else {
            HttpRequestMiddlewareLayer::new(None)
        };

        let http_middleware = tower::ServiceBuilder::new().layer(http_middleware_layer);

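        // Log each RPC call (long entries are truncated at 1024 characters),
        // and apply Zebra's JSON-RPC compatibility fixes to each response.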
        let rpc_middleware = RpcServiceBuilder::new()
            .rpc_logger(1024)
            .layer_fn(FixRpcResponseMiddleware::new);

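        // Build an HTTP-only JSON-RPC server with both middleware stacks,
        // bound to the configured listen address.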
        let server = Server::builder()
            .http_only()
            .set_http_middleware(http_middleware)
            .set_rpc_middleware(rpc_middleware)
            .build(listen_addr)
            .await?;

        info!("{OPENED_RPC_ENDPOINT_MSG}{}", server.local_addr()?);

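        // Spawn the server as a background task that resolves once the server
        // has stopped.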
        Ok(tokio::spawn(async move {
            server.start(rpc.into_rpc()).stopped().await;
            Ok(())
        }))
    }

    /// Shuts down this RPC server, blocking the current thread.
    ///
    /// This method can be called from within a tokio executor without panicking,
    /// but because it blocks, the asynchronous `shutdown()` should be preferred.
    pub fn shutdown_blocking(&self) {
        Self::shutdown_blocking_inner(self.close_handle.clone(), self.config.clone())
    }

    /// Shuts down this RPC server asynchronously, returning a task that
    /// completes when the server has shut down.
    pub fn shutdown(&self) -> JoinHandle<()> {
        let close_handle = self.close_handle.clone();
        let config = self.config.clone();
        let span = Span::current();

        tokio::task::spawn_blocking(move || {
            span.in_scope(|| Self::shutdown_blocking_inner(close_handle, config))
        })
    }

    /// Shuts down this RPC server using its `close_handle`.
    ///
    /// See `shutdown_blocking()` for details.
    fn shutdown_blocking_inner(close_handle: ServerHandle, config: config::rpc::Config) {
        // Shutdown does blocking work, so `wait_on_shutdown` runs on a
        // dedicated `std::thread` rather than on a tokio async thread.
        let span = Span::current();
        let wait_on_shutdown = move || {
            span.in_scope(|| {
                if config.enable_cookie_auth {
                    if let Err(err) = cookie::remove_from_disk(&config.cookie_dir) {
                        warn!(
                            ?err,
                            "unexpectedly could not remove the rpc auth cookie from the disk"
                        )
                    }
                }

                info!("Stopping RPC server");
                let _ = close_handle.stop();
                debug!("Stopped RPC server");
            })
        };

        let span = Span::current();
        let thread_handle = std::thread::spawn(wait_on_shutdown);

        // Propagate panics from the inner std::thread to the outer tokio blocking task
        span.in_scope(|| match thread_handle.join() {
            Ok(()) => (),
            Err(panic_object) => panic::resume_unwind(panic_object),
        })
    }
}

impl Drop for RpcServer {
    fn drop(&mut self) {
        // Block on shutting down, propagating panics.
        // This can take around 150 seconds.
        //
        // Without this shutdown, Zebra's RPC unit tests sometimes crashed with memory errors.
        self.shutdown_blocking();
    }
}