kaspa_wrpc_server/
server.rs

1use crate::{
2    collector::{WrpcServiceCollector, WrpcServiceConverter},
3    connection::Connection,
4    result::Result,
5    service::Options,
6};
7use kaspa_grpc_client::GrpcClient;
8use kaspa_notify::{
9    connection::ChannelType,
10    events::EVENT_TYPE_ARRAY,
11    listener::ListenerLifespan,
12    notifier::Notifier,
13    scope::Scope,
14    subscriber::Subscriber,
15    subscription::{MutationPolicies, UtxosChangedMutationPolicy},
16};
17use kaspa_rpc_core::{
18    api::rpc::{DynRpcService, RpcApi},
19    notify::{channel::NotificationChannel, connection::ChannelConnection, mode::NotificationMode},
20    Notification, RpcResult,
21};
22use kaspa_rpc_service::service::RpcCoreService;
23use std::{
24    collections::HashMap,
25    sync::{
26        atomic::{AtomicU64, Ordering},
27        Arc, Mutex,
28    },
29};
30use workflow_log::*;
31use workflow_rpc::server::prelude::*;
32
33pub type WrpcNotifier = Notifier<Notification, Connection>;
34
35struct RpcCore {
36    pub service: Arc<RpcCoreService>,
37    pub wrpc_notifier: Arc<WrpcNotifier>,
38}
39
40struct ServerInner {
41    pub next_connection_id: AtomicU64,
42    pub _encoding: Encoding,
43    pub sockets: Mutex<HashMap<u64, Connection>>,
44    pub rpc_core: Option<RpcCore>,
45    pub options: Arc<Options>,
46}
47
48#[derive(Clone)]
49pub struct Server {
50    inner: Arc<ServerInner>,
51}
52
53const WRPC_SERVER: &str = "wrpc-server";
54
55impl Server {
56    pub fn new(tasks: usize, encoding: Encoding, core_service: Option<Arc<RpcCoreService>>, options: Arc<Options>) -> Self {
57        // This notifier UTXOs subscription granularity to rpc-core notifier
58        let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AddressSet);
59
60        // Either get a core service or be called from the proxy and rely each connection having its own gRPC client
61        assert_eq!(
62            core_service.is_none(),
63            options.grpc_proxy_address.is_some(),
64            "invalid setup: Server must exclusively get either a core service or a gRPC server address"
65        );
66
67        let rpc_core = if let Some(service) = core_service {
68            // Prepare rpc service objects
69            let notification_channel = NotificationChannel::default();
70            let listener_id = service.notifier().register_new_listener(
71                ChannelConnection::new(WRPC_SERVER, notification_channel.sender(), ChannelType::Closable),
72                ListenerLifespan::Static(policies),
73            );
74
75            // Prepare notification internals
76            let enabled_events = EVENT_TYPE_ARRAY[..].into();
77            let converter = Arc::new(WrpcServiceConverter::new());
78            let collector = Arc::new(WrpcServiceCollector::new(WRPC_SERVER, notification_channel.receiver(), converter));
79            let subscriber = Arc::new(Subscriber::new(WRPC_SERVER, enabled_events, service.notifier(), listener_id));
80            let wrpc_notifier = Arc::new(Notifier::new(
81                WRPC_SERVER,
82                enabled_events,
83                vec![collector],
84                vec![subscriber],
85                service.subscription_context(),
86                tasks,
87                policies,
88            ));
89            Some(RpcCore { service, wrpc_notifier })
90        } else {
91            None
92        };
93
94        Server {
95            inner: Arc::new(ServerInner {
96                next_connection_id: AtomicU64::new(0),
97                _encoding: encoding,
98                sockets: Mutex::new(HashMap::new()),
99                rpc_core,
100                options,
101            }),
102        }
103    }
104
105    pub fn start(&self) {
106        if let Some(rpc_core) = &self.inner.rpc_core {
107            // Start the internal notifier
108            rpc_core.wrpc_notifier.clone().start();
109        }
110    }
111
112    pub async fn connect(&self, peer: &SocketAddr, messenger: Arc<Messenger>) -> Result<Connection> {
113        // log_trace!("WebSocket connected: {}", peer);
114        let id = self.inner.next_connection_id.fetch_add(1, Ordering::SeqCst);
115
116        let grpc_client = if let Some(grpc_proxy_address) = &self.inner.options.grpc_proxy_address {
117            // Provider::GrpcClient
118
119            log_info!("Routing wrpc://{peer} -> {grpc_proxy_address}");
120            let grpc_client = GrpcClient::connect_with_args(
121                NotificationMode::Direct,
122                grpc_proxy_address.to_owned(),
123                None,
124                false,
125                None,
126                true,
127                None,
128                Default::default(),
129            )
130            .await
131            .map_err(|e| WebSocketError::Other(e.to_string()))?;
132            // log_trace!("Creating proxy relay...");
133            Some(Arc::new(grpc_client))
134        } else {
135            None
136        };
137        let connection = Connection::new(id, peer, messenger, grpc_client);
138        if self.inner.options.grpc_proxy_address.is_some() {
139            // log_trace!("starting gRPC");
140            connection.grpc_client().start(Some(connection.grpc_client_notify_target())).await;
141            // log_trace!("gRPC started...");
142        }
143        self.inner.sockets.lock()?.insert(id, connection.clone());
144        Ok(connection)
145    }
146
147    pub async fn disconnect(&self, connection: Connection) {
148        // log_info!("WebSocket disconnected: {}", connection.peer());
149        if let Some(rpc_core) = &self.inner.rpc_core {
150            if let Some(listener_id) = connection.listener_id() {
151                rpc_core.wrpc_notifier.unregister_listener(listener_id).unwrap_or_else(|err| {
152                    log_error!("WebSocket {} (disconnected) error unregistering the notification listener: {err}", connection.peer());
153                });
154            }
155        } else {
156            let _ = connection.grpc_client().disconnect().await;
157            let _ = connection.grpc_client().join().await;
158        }
159
160        self.inner.sockets.lock().unwrap().remove(&connection.id());
161
162        // FIXME: determine if messenger should be closed explicitly
163        // connection.close();
164    }
165
166    #[inline(always)]
167    pub fn notifier(&self) -> Option<Arc<WrpcNotifier>> {
168        self.inner.rpc_core.as_ref().map(|x| x.wrpc_notifier.clone())
169    }
170
171    pub fn rpc_service(&self, connection: &Connection) -> DynRpcService {
172        if let Some(rpc_core) = &self.inner.rpc_core {
173            rpc_core.service.clone()
174        } else {
175            connection.grpc_client()
176        }
177    }
178
179    pub async fn start_notify(&self, connection: &Connection, scope: Scope) -> RpcResult<()> {
180        let listener_id = if let Some(listener_id) = connection.listener_id() {
181            listener_id
182        } else {
183            // The only possible case here is a server connected to rpc core.
184            // If the proxy is used, the connection has a gRPC client and the listener id
185            // is always set to Some(ListenerId::default()) by the connection ctor.
186            let notifier =
187                self.notifier().unwrap_or_else(|| panic!("Incorrect use: `server::Server` does not carry an internal notifier"));
188            let listener_id = notifier.register_new_listener(connection.clone(), ListenerLifespan::Dynamic);
189            connection.register_notification_listener(listener_id);
190            listener_id
191        };
192        workflow_log::log_trace!("notification subscribe[0x{listener_id:x}] {scope:?}");
193        if let Some(rpc_core) = &self.inner.rpc_core {
194            rpc_core.wrpc_notifier.clone().try_start_notify(listener_id, scope)?;
195        } else {
196            connection.grpc_client().start_notify(listener_id, scope).await?;
197        }
198        Ok(())
199    }
200
201    pub async fn stop_notify(&self, connection: &Connection, scope: Scope) -> RpcResult<()> {
202        if let Some(listener_id) = connection.listener_id() {
203            workflow_log::log_trace!("notification unsubscribe[0x{listener_id:x}] {scope:?}");
204            if let Some(rpc_core) = &self.inner.rpc_core {
205                rpc_core.wrpc_notifier.clone().try_stop_notify(listener_id, scope)?;
206            } else {
207                connection.grpc_client().stop_notify(listener_id, scope).await?;
208            }
209        } else {
210            workflow_log::log_trace!("notification unsubscribe[N/A] {scope:?}");
211        }
212        Ok(())
213    }
214
215    pub fn verbose(&self) -> bool {
216        self.inner.options.verbose
217    }
218
219    pub async fn join(&self) -> Result<()> {
220        if let Some(rpc_core) = &self.inner.rpc_core {
221            // Wait for the internal notifier to stop
222            rpc_core.wrpc_notifier.join().await?;
223        } else {
224            // FIXME: check if all existing connections are actually getting a call to self.disconnect(connection)
225            //        else do it here
226        }
227        Ok(())
228    }
229}