kaspa_wrpc_server/
service.rs1use crate::{connection::*, router::*, server::*};
2use async_trait::async_trait;
3use kaspa_core::{
4 info,
5 task::service::{AsyncService, AsyncServiceError, AsyncServiceFuture},
6 trace, warn,
7};
8use kaspa_rpc_core::api::ops::RpcApiOps;
9use kaspa_rpc_service::service::RpcCoreService;
10use kaspa_utils::triggers::SingleTrigger;
11use std::sync::Arc;
12use tokio::sync::oneshot::{channel as oneshot_channel, Sender as OneshotSender};
13use workflow_rpc::server::prelude::*;
14pub use workflow_rpc::server::{Encoding as WrpcEncoding, WebSocketConfig, WebSocketCounters};
15
/// Upper bound, in bytes, for a single wRPC WebSocket message (128 MiB).
///
/// Handed to `WebSocketConfig::max_message_size` when the listener is started.
const MAX_WRPC_MESSAGE_SIZE: usize = 128 * 1024 * 1024;

/// Configuration for the wRPC server.
pub struct Options {
    /// Address the wRPC listener binds to, e.g. `"127.0.0.1:17110"`.
    pub listen_address: String,
    /// Optional gRPC proxy address.
    ///
    /// NOTE(review): not read anywhere in this file; presumably consumed by the
    /// server/connection layer that receives these options. Confirm there.
    pub grpc_proxy_address: Option<String>,
    /// Verbosity flag.
    ///
    /// NOTE(review): not read anywhere in this file; confirm its effect in the consumer.
    pub verbose: bool,
}

impl Default for Options {
    /// Returns options that listen on `127.0.0.1:17110`, with no gRPC proxy and
    /// verbose mode switched off.
    fn default() -> Self {
        Self { listen_address: String::from("127.0.0.1:17110"), grpc_proxy_address: None, verbose: false }
    }
}
30
31pub struct KaspaRpcHandler {
47 pub server: Server,
48 pub options: Arc<Options>,
49}
50
51impl KaspaRpcHandler {
52 pub fn new(
53 tasks: usize,
54 encoding: WrpcEncoding,
55 core_service: Option<Arc<RpcCoreService>>,
56 options: Arc<Options>,
57 ) -> KaspaRpcHandler {
58 KaspaRpcHandler { server: Server::new(tasks, encoding, core_service, options.clone()), options }
59 }
60}
61
62#[async_trait]
63impl RpcHandler for KaspaRpcHandler {
64 type Context = Connection;
65
66 async fn handshake(
67 self: Arc<Self>,
68 peer: &SocketAddr,
69 _sender: &mut WebSocketSender,
70 _receiver: &mut WebSocketReceiver,
71 messenger: Arc<Messenger>,
72 ) -> WebSocketResult<Connection> {
73 let connection = self.server.connect(peer, messenger).await.map_err(|err| err.to_string())?;
83 Ok(connection)
84 }
85
86 async fn disconnect(self: Arc<Self>, ctx: Self::Context, _result: WebSocketResult<()>) {
90 self.server.disconnect(ctx).await;
91 }
92}
93
94pub struct WrpcService {
98 options: Arc<Options>,
100 server: RpcServer,
101 rpc_handler: Arc<KaspaRpcHandler>,
102 shutdown: SingleTrigger,
103}
104
105impl WrpcService {
106 pub fn new(
108 tasks: usize,
109 core_service: Option<Arc<RpcCoreService>>,
110 encoding: &Encoding,
111 counters: Arc<WebSocketCounters>,
112 options: Options,
113 ) -> Self {
114 let options = Arc::new(options);
115 let rpc_handler = Arc::new(KaspaRpcHandler::new(tasks, *encoding, core_service, options.clone()));
117
118 let router = Arc::new(Router::new(rpc_handler.server.clone()));
120 let server = RpcServer::new_with_encoding::<Server, Connection, RpcApiOps, Id64>(
122 *encoding,
123 rpc_handler.clone(),
124 router.interface.clone(),
125 Some(counters),
126 false,
127 );
128
129 WrpcService { options, server, rpc_handler, shutdown: SingleTrigger::default() }
130 }
131
132 pub fn serve(self: Arc<Self>) -> OneshotSender<()> {
134 let (termination_sender, termination_receiver) = oneshot_channel::<()>();
135 let listen_address = self.options.listen_address.clone();
136 self.rpc_handler.server.start();
137
138 let service = self.clone();
140 tokio::spawn(async move {
141 let _ = termination_receiver.await;
142 service.server.stop().unwrap_or_else(|err| warn!("wRPC unable to signal shutdown: `{err}`"));
143 service.server.join().await.unwrap_or_else(|err| warn!("wRPC error: `{err}"));
144 });
145
146 info!("WRPC Server starting on: {}", listen_address);
148 tokio::spawn(async move {
149 let config = WebSocketConfig { max_message_size: Some(MAX_WRPC_MESSAGE_SIZE), ..Default::default() };
150 match self.server.bind(&listen_address).await {
151 Ok(listener) => {
152 let serve_result = self.server.listen(listener, Some(config)).await;
153 match serve_result {
154 Ok(_) => info!("WRPC Server stopped on: {}", listen_address),
155 Err(err) => panic!("WRPC Server {listen_address} stopped with error: {err:?}"),
156 }
157 }
158 Err(err) => panic!("WRPC Server bind error on {listen_address}: {err:?}"),
159 }
160 });
161
162 termination_sender
163 }
164}
165
166const WRPC_SERVER: &str = "wrpc-service";
167
168impl AsyncService for WrpcService {
169 fn ident(self: Arc<Self>) -> &'static str {
170 WRPC_SERVER
171 }
172
173 fn start(self: Arc<Self>) -> AsyncServiceFuture {
174 trace!("{} starting", WRPC_SERVER);
175
176 let shutdown_signal = self.shutdown.listener.clone();
178
179 trace!("{} running the wRPC server", WRPC_SERVER);
181 let terminate_server = self.clone().serve();
182
183 Box::pin(async move {
184 shutdown_signal.await;
186
187 self.clone()
189 .rpc_handler
190 .server
191 .join()
192 .await
193 .map_err(|err| AsyncServiceError::Service(format!("Notification system error: `{err}`")))?;
194
195 drop(terminate_server);
197
198 Ok(())
199 })
200 }
201
202 fn signal_exit(self: Arc<Self>) {
203 trace!("sending an exit signal to {}", WRPC_SERVER);
204 self.shutdown.trigger.trigger();
205 }
206
207 fn stop(self: Arc<Self>) -> AsyncServiceFuture {
208 Box::pin(async move {
209 trace!("{} stopped", WRPC_SERVER);
210 Ok(())
211 })
212 }
213}