kaspa_wrpc_server/
service.rs

1use crate::{connection::*, router::*, server::*};
2use async_trait::async_trait;
3use kaspa_core::{
4    info,
5    task::service::{AsyncService, AsyncServiceError, AsyncServiceFuture},
6    trace, warn,
7};
8use kaspa_rpc_core::api::ops::RpcApiOps;
9use kaspa_rpc_service::service::RpcCoreService;
10use kaspa_utils::triggers::SingleTrigger;
11use std::sync::Arc;
12use tokio::sync::oneshot::{channel as oneshot_channel, Sender as OneshotSender};
13use workflow_rpc::server::prelude::*;
14pub use workflow_rpc::server::{Encoding as WrpcEncoding, WebSocketConfig, WebSocketCounters};
15
/// Upper bound, in bytes, on the size of a single wRPC WebSocket message (128 MiB).
static MAX_WRPC_MESSAGE_SIZE: usize = 128 * 1024 * 1024;
17
/// Options for configuring the wRPC server
#[derive(Debug, Clone)]
pub struct Options {
    /// Socket address (`host:port`) the wRPC server binds to and listens on.
    pub listen_address: String,
    /// Optional gRPC proxy address.
    // NOTE(review): not read anywhere in this file; presumably consumed by the connection server - confirm.
    pub grpc_proxy_address: Option<String>,
    /// Verbosity flag.
    // NOTE(review): not read anywhere in this file; confirm where it takes effect.
    pub verbose: bool,
}
24
25impl Default for Options {
26    fn default() -> Self {
27        Options { listen_address: "127.0.0.1:17110".to_owned(), verbose: false, grpc_proxy_address: None }
28    }
29}
30
31/// ### KaspaRpcHandler
32///
33/// [`KaspaRpcHandler`] is a handler struct that implements the [`RpcHandler`] trait
34/// allowing it to receive [`connect()`](RpcHandler::connect),
35/// [`disconnect()`](RpcHandler::disconnect) and [`handshake()`](RpcHandler::handshake)
36/// calls invoked by the [`RpcServer`].
37///
38/// [`RpcHandler::handshake`] is called by the [`RpcServer`] supplying the [`Messenger`]
39/// and expecting user to return a `ServerContext` struct (or an `Arc` of) where
40/// this struct will be supplied to each RPC method call.  Each RPC method call receives
41/// 3 arguments - `ServerContext`, `ConnectionContext` and `Request`. Upon completion
42/// the method should return a `Result`.
43///
44/// RPC method handling is implemented in the [`Router`].
45///
46pub struct KaspaRpcHandler {
47    pub server: Server,
48    pub options: Arc<Options>,
49}
50
51impl KaspaRpcHandler {
52    pub fn new(
53        tasks: usize,
54        encoding: WrpcEncoding,
55        core_service: Option<Arc<RpcCoreService>>,
56        options: Arc<Options>,
57    ) -> KaspaRpcHandler {
58        KaspaRpcHandler { server: Server::new(tasks, encoding, core_service, options.clone()), options }
59    }
60}
61
62#[async_trait]
63impl RpcHandler for KaspaRpcHandler {
64    type Context = Connection;
65
66    async fn handshake(
67        self: Arc<Self>,
68        peer: &SocketAddr,
69        _sender: &mut WebSocketSender,
70        _receiver: &mut WebSocketReceiver,
71        messenger: Arc<Messenger>,
72    ) -> WebSocketResult<Connection> {
73        // TODO - discuss and implement handshake
74        // handshake::greeting(
75        //     std::time::Duration::from_millis(3000),
76        //     sender,
77        //     receiver,
78        //     Box::pin(|msg| if msg != "kaspa" { Err(WebSocketError::NegotiationFailure) } else { Ok(()) }),
79        // )
80        // .await
81
82        let connection = self.server.connect(peer, messenger).await.map_err(|err| err.to_string())?;
83        Ok(connection)
84    }
85
86    /// Disconnect the websocket. Receives `Connection` (a.k.a `Self::Context`)
87    /// before dropping it. This is the last chance to cleanup and resources owned by
88    /// this connection. Delegate to Server.
89    async fn disconnect(self: Arc<Self>, ctx: Self::Context, _result: WebSocketResult<()>) {
90        self.server.disconnect(ctx).await;
91    }
92}
93
94///
95///  wRPC Server - A wrapper around and an initializer of the RpcServer
96///
97pub struct WrpcService {
98    // TODO: see if tha Adapter/ConnectionHandler design of P2P and gRPC can be applied here too
99    options: Arc<Options>,
100    server: RpcServer,
101    rpc_handler: Arc<KaspaRpcHandler>,
102    shutdown: SingleTrigger,
103}
104
105impl WrpcService {
106    /// Create and initialize RpcServer
107    pub fn new(
108        tasks: usize,
109        core_service: Option<Arc<RpcCoreService>>,
110        encoding: &Encoding,
111        counters: Arc<WebSocketCounters>,
112        options: Options,
113    ) -> Self {
114        let options = Arc::new(options);
115        // Create handle to manage connections
116        let rpc_handler = Arc::new(KaspaRpcHandler::new(tasks, *encoding, core_service, options.clone()));
117
118        // Create router (initializes Interface registering RPC method and notification handlers)
119        let router = Arc::new(Router::new(rpc_handler.server.clone()));
120        // Create a server
121        let server = RpcServer::new_with_encoding::<Server, Connection, RpcApiOps, Id64>(
122            *encoding,
123            rpc_handler.clone(),
124            router.interface.clone(),
125            Some(counters),
126            false,
127        );
128
129        WrpcService { options, server, rpc_handler, shutdown: SingleTrigger::default() }
130    }
131
132    /// Start listening on the configured address (will panic if the socket listen() fails)
133    pub fn serve(self: Arc<Self>) -> OneshotSender<()> {
134        let (termination_sender, termination_receiver) = oneshot_channel::<()>();
135        let listen_address = self.options.listen_address.clone();
136        self.rpc_handler.server.start();
137
138        // Spawn a task stopping the server on termination signal
139        let service = self.clone();
140        tokio::spawn(async move {
141            let _ = termination_receiver.await;
142            service.server.stop().unwrap_or_else(|err| warn!("wRPC unable to signal shutdown: `{err}`"));
143            service.server.join().await.unwrap_or_else(|err| warn!("wRPC error: `{err}"));
144        });
145
146        // Spawn a task running the server
147        info!("WRPC Server starting on: {}", listen_address);
148        tokio::spawn(async move {
149            let config = WebSocketConfig { max_message_size: Some(MAX_WRPC_MESSAGE_SIZE), ..Default::default() };
150            match self.server.bind(&listen_address).await {
151                Ok(listener) => {
152                    let serve_result = self.server.listen(listener, Some(config)).await;
153                    match serve_result {
154                        Ok(_) => info!("WRPC Server stopped on: {}", listen_address),
155                        Err(err) => panic!("WRPC Server {listen_address} stopped with error: {err:?}"),
156                    }
157                }
158                Err(err) => panic!("WRPC Server bind error on {listen_address}: {err:?}"),
159            }
160        });
161
162        termination_sender
163    }
164}
165
/// Identifier under which this service reports itself and writes its trace messages.
const WRPC_SERVER: &str = "wrpc-service";
167
168impl AsyncService for WrpcService {
169    fn ident(self: Arc<Self>) -> &'static str {
170        WRPC_SERVER
171    }
172
173    fn start(self: Arc<Self>) -> AsyncServiceFuture {
174        trace!("{} starting", WRPC_SERVER);
175
176        // Prepare a shutdown signal receiver
177        let shutdown_signal = self.shutdown.listener.clone();
178
179        // Run the server
180        trace!("{} running the wRPC server", WRPC_SERVER);
181        let terminate_server = self.clone().serve();
182
183        Box::pin(async move {
184            // Keep the gRPC server running until a service shutdown signal is received
185            shutdown_signal.await;
186
187            // Wait for the notifier to shutdown
188            self.clone()
189                .rpc_handler
190                .server
191                .join()
192                .await
193                .map_err(|err| AsyncServiceError::Service(format!("Notification system error: `{err}`")))?;
194
195            // Signal server termination
196            drop(terminate_server);
197
198            Ok(())
199        })
200    }
201
202    fn signal_exit(self: Arc<Self>) {
203        trace!("sending an exit signal to {}", WRPC_SERVER);
204        self.shutdown.trigger.trigger();
205    }
206
207    fn stop(self: Arc<Self>) -> AsyncServiceFuture {
208        Box::pin(async move {
209            trace!("{} stopped", WRPC_SERVER);
210            Ok(())
211        })
212    }
213}