1use crate::{
2 collector::{WrpcServiceCollector, WrpcServiceConverter},
3 connection::Connection,
4 result::Result,
5 service::Options,
6};
7use kaspa_grpc_client::GrpcClient;
8use kaspa_notify::{
9 connection::ChannelType,
10 events::EVENT_TYPE_ARRAY,
11 listener::ListenerLifespan,
12 notifier::Notifier,
13 scope::Scope,
14 subscriber::Subscriber,
15 subscription::{MutationPolicies, UtxosChangedMutationPolicy},
16};
17use kaspa_rpc_core::{
18 api::rpc::{DynRpcService, RpcApi},
19 notify::{channel::NotificationChannel, connection::ChannelConnection, mode::NotificationMode},
20 Notification, RpcResult,
21};
22use kaspa_rpc_service::service::RpcCoreService;
23use std::{
24 collections::HashMap,
25 sync::{
26 atomic::{AtomicU64, Ordering},
27 Arc, Mutex,
28 },
29};
30use workflow_log::*;
31use workflow_rpc::server::prelude::*;
32
/// The [`Notifier`] specialization used by the wRPC server: it carries RPC
/// [`Notification`]s and uses wRPC [`Connection`]s as its listener connections.
pub type WrpcNotifier = Notifier<Notification, Connection>;
34
/// Resources held by the server when it is backed by an in-process RPC core
/// service (as opposed to proxying to a gRPC server).
struct RpcCore {
    /// The RPC core service that requests are served from.
    pub service: Arc<RpcCoreService>,
    /// The notifier through which wRPC connections register listeners and
    /// start/stop notification scopes.
    pub wrpc_notifier: Arc<WrpcNotifier>,
}
39
/// Shared state behind a [`Server`] handle.
struct ServerInner {
    /// Counter from which each new connection draws its id.
    pub next_connection_id: AtomicU64,
    /// Encoding the server was constructed with; it is stored but not read by
    /// any code visible in this file.
    pub _encoding: Encoding,
    /// Currently registered connections, keyed by connection id.
    pub sockets: Mutex<HashMap<u64, Connection>>,
    /// `Some` when the server was constructed with an RPC core service,
    /// `None` when it relies on a gRPC proxy address instead.
    pub rpc_core: Option<RpcCore>,
    /// Server options (e.g. `grpc_proxy_address`, `verbose`).
    pub options: Arc<Options>,
}
47
/// Handle to the wRPC server state.
///
/// Cloning produces another handle to the same shared [`ServerInner`], since
/// the state is held behind an [`Arc`].
#[derive(Clone)]
pub struct Server {
    inner: Arc<ServerInner>,
}
52
/// Name passed to the channel connection, collector, subscriber and notifier
/// created for the wRPC server.
const WRPC_SERVER: &str = "wrpc-server";
54
55impl Server {
56 pub fn new(tasks: usize, encoding: Encoding, core_service: Option<Arc<RpcCoreService>>, options: Arc<Options>) -> Self {
57 let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AddressSet);
59
60 assert_eq!(
62 core_service.is_none(),
63 options.grpc_proxy_address.is_some(),
64 "invalid setup: Server must exclusively get either a core service or a gRPC server address"
65 );
66
67 let rpc_core = if let Some(service) = core_service {
68 let notification_channel = NotificationChannel::default();
70 let listener_id = service.notifier().register_new_listener(
71 ChannelConnection::new(WRPC_SERVER, notification_channel.sender(), ChannelType::Closable),
72 ListenerLifespan::Static(policies),
73 );
74
75 let enabled_events = EVENT_TYPE_ARRAY[..].into();
77 let converter = Arc::new(WrpcServiceConverter::new());
78 let collector = Arc::new(WrpcServiceCollector::new(WRPC_SERVER, notification_channel.receiver(), converter));
79 let subscriber = Arc::new(Subscriber::new(WRPC_SERVER, enabled_events, service.notifier(), listener_id));
80 let wrpc_notifier = Arc::new(Notifier::new(
81 WRPC_SERVER,
82 enabled_events,
83 vec![collector],
84 vec![subscriber],
85 service.subscription_context(),
86 tasks,
87 policies,
88 ));
89 Some(RpcCore { service, wrpc_notifier })
90 } else {
91 None
92 };
93
94 Server {
95 inner: Arc::new(ServerInner {
96 next_connection_id: AtomicU64::new(0),
97 _encoding: encoding,
98 sockets: Mutex::new(HashMap::new()),
99 rpc_core,
100 options,
101 }),
102 }
103 }
104
105 pub fn start(&self) {
106 if let Some(rpc_core) = &self.inner.rpc_core {
107 rpc_core.wrpc_notifier.clone().start();
109 }
110 }
111
112 pub async fn connect(&self, peer: &SocketAddr, messenger: Arc<Messenger>) -> Result<Connection> {
113 let id = self.inner.next_connection_id.fetch_add(1, Ordering::SeqCst);
115
116 let grpc_client = if let Some(grpc_proxy_address) = &self.inner.options.grpc_proxy_address {
117 log_info!("Routing wrpc://{peer} -> {grpc_proxy_address}");
120 let grpc_client = GrpcClient::connect_with_args(
121 NotificationMode::Direct,
122 grpc_proxy_address.to_owned(),
123 None,
124 false,
125 None,
126 true,
127 None,
128 Default::default(),
129 )
130 .await
131 .map_err(|e| WebSocketError::Other(e.to_string()))?;
132 Some(Arc::new(grpc_client))
134 } else {
135 None
136 };
137 let connection = Connection::new(id, peer, messenger, grpc_client);
138 if self.inner.options.grpc_proxy_address.is_some() {
139 connection.grpc_client().start(Some(connection.grpc_client_notify_target())).await;
141 }
143 self.inner.sockets.lock()?.insert(id, connection.clone());
144 Ok(connection)
145 }
146
147 pub async fn disconnect(&self, connection: Connection) {
148 if let Some(rpc_core) = &self.inner.rpc_core {
150 if let Some(listener_id) = connection.listener_id() {
151 rpc_core.wrpc_notifier.unregister_listener(listener_id).unwrap_or_else(|err| {
152 log_error!("WebSocket {} (disconnected) error unregistering the notification listener: {err}", connection.peer());
153 });
154 }
155 } else {
156 let _ = connection.grpc_client().disconnect().await;
157 let _ = connection.grpc_client().join().await;
158 }
159
160 self.inner.sockets.lock().unwrap().remove(&connection.id());
161
162 }
165
166 #[inline(always)]
167 pub fn notifier(&self) -> Option<Arc<WrpcNotifier>> {
168 self.inner.rpc_core.as_ref().map(|x| x.wrpc_notifier.clone())
169 }
170
171 pub fn rpc_service(&self, connection: &Connection) -> DynRpcService {
172 if let Some(rpc_core) = &self.inner.rpc_core {
173 rpc_core.service.clone()
174 } else {
175 connection.grpc_client()
176 }
177 }
178
179 pub async fn start_notify(&self, connection: &Connection, scope: Scope) -> RpcResult<()> {
180 let listener_id = if let Some(listener_id) = connection.listener_id() {
181 listener_id
182 } else {
183 let notifier =
187 self.notifier().unwrap_or_else(|| panic!("Incorrect use: `server::Server` does not carry an internal notifier"));
188 let listener_id = notifier.register_new_listener(connection.clone(), ListenerLifespan::Dynamic);
189 connection.register_notification_listener(listener_id);
190 listener_id
191 };
192 workflow_log::log_trace!("notification subscribe[0x{listener_id:x}] {scope:?}");
193 if let Some(rpc_core) = &self.inner.rpc_core {
194 rpc_core.wrpc_notifier.clone().try_start_notify(listener_id, scope)?;
195 } else {
196 connection.grpc_client().start_notify(listener_id, scope).await?;
197 }
198 Ok(())
199 }
200
201 pub async fn stop_notify(&self, connection: &Connection, scope: Scope) -> RpcResult<()> {
202 if let Some(listener_id) = connection.listener_id() {
203 workflow_log::log_trace!("notification unsubscribe[0x{listener_id:x}] {scope:?}");
204 if let Some(rpc_core) = &self.inner.rpc_core {
205 rpc_core.wrpc_notifier.clone().try_stop_notify(listener_id, scope)?;
206 } else {
207 connection.grpc_client().stop_notify(listener_id, scope).await?;
208 }
209 } else {
210 workflow_log::log_trace!("notification unsubscribe[N/A] {scope:?}");
211 }
212 Ok(())
213 }
214
215 pub fn verbose(&self) -> bool {
216 self.inner.options.verbose
217 }
218
219 pub async fn join(&self) -> Result<()> {
220 if let Some(rpc_core) = &self.inner.rpc_core {
221 rpc_core.wrpc_notifier.join().await?;
223 } else {
224 }
227 Ok(())
228 }
229}