1use self::{
2 error::{Error, Result},
3 resolver::{id::IdResolver, queue::QueueResolver, DynResolver},
4};
5use async_channel::{Receiver, Sender};
6use async_trait::async_trait;
7pub use client_pool::ClientPool;
8use connection_event::ConnectionEvent;
9use futures::{future::FutureExt, pin_mut, select};
10use kaspa_core::{debug, error, trace};
11use kaspa_grpc_core::{
12 channel::NotificationChannel,
13 ops::KaspadPayloadOps,
14 protowire::{kaspad_request, rpc_client::RpcClient, GetInfoRequestMessage, KaspadRequest, KaspadResponse},
15 RPC_MAX_MESSAGE_SIZE,
16};
17use kaspa_notify::{
18 collector::{Collector, CollectorFrom},
19 error::{Error as NotifyError, Result as NotifyResult},
20 events::{EventArray, EventType, EVENT_TYPE_ARRAY},
21 listener::{ListenerId, ListenerLifespan},
22 notifier::{DynNotify, Notifier},
23 scope::Scope,
24 subscriber::{Subscriber, SubscriptionManager},
25 subscription::{
26 array::ArrayBuilder, context::SubscriptionContext, Command, DynSubscription, MutateSingle, Mutation, MutationPolicies,
27 UtxosChangedMutationPolicy,
28 },
29};
30use kaspa_rpc_core::{
31 api::rpc::RpcApi,
32 error::RpcError,
33 error::RpcResult,
34 model::message::*,
35 notify::{collector::RpcCoreConverter, connection::ChannelConnection, mode::NotificationMode},
36 Notification,
37};
38use kaspa_utils::{channel::Channel, triggers::DuplexTrigger};
39use kaspa_utils_tower::{
40 counters::TowerConnectionCounters,
41 middleware::{measure_request_body_size_layer, CountBytesBody, MapResponseBodyLayer, ServiceBuilder},
42};
43use regex::Regex;
44use std::{
45 sync::{
46 atomic::{AtomicBool, Ordering},
47 Arc,
48 },
49 time::Duration,
50};
51use tokio::sync::Mutex;
52use tonic::codec::CompressionEncoding;
53use tonic::codegen::Body;
54use tonic::Streaming;
55
56mod connection_event;
57pub mod error;
58mod resolver;
59#[macro_use]
60mod route;
61
62mod client_pool;
63
64pub type GrpcClientCollector = CollectorFrom<RpcCoreConverter>;
65pub type GrpcClientNotify = DynNotify<Notification>;
66pub type GrpcClientNotifier = Notifier<Notification, ChannelConnection>;
67
68type DirectSubscriptions = Mutex<EventArray<DynSubscription>>;
69
70#[derive(Debug, Clone)]
71pub struct GrpcClient {
72 inner: Arc<Inner>,
73 notifier: Option<Arc<GrpcClientNotifier>>,
75 collector: Option<Arc<GrpcClientCollector>>,
77 subscriptions: Option<Arc<DirectSubscriptions>>,
78 subscription_context: SubscriptionContext,
79 policies: MutationPolicies,
80 notification_mode: NotificationMode,
81}
82
83const GRPC_CLIENT: &str = "grpc-client";
84
85impl GrpcClient {
86 pub const DIRECT_MODE_LISTENER_ID: ListenerId = 0;
87
88 pub async fn connect(url: String) -> Result<GrpcClient> {
89 Self::connect_with_args(NotificationMode::Direct, url, None, false, None, false, None, Default::default()).await
90 }
91
92 pub async fn connect_with_args(
117 notification_mode: NotificationMode,
118 url: String,
119 subscription_context: Option<SubscriptionContext>,
120 reconnect: bool,
121 connection_event_sender: Option<Sender<ConnectionEvent>>,
122 override_handle_stop_notify: bool,
123 timeout_duration: Option<u64>,
124 counters: Arc<TowerConnectionCounters>,
125 ) -> Result<GrpcClient> {
126 let schema = Regex::new(r"^grpc://").unwrap();
127 if !schema.is_match(&url) {
128 return Err(Error::GrpcAddressSchema(url));
129 }
130 let inner = Inner::connect(
131 url,
132 connection_event_sender,
133 override_handle_stop_notify,
134 timeout_duration.unwrap_or(REQUEST_TIMEOUT_DURATION),
135 counters,
136 )
137 .await?;
138 let converter = Arc::new(RpcCoreConverter::new());
139 let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AddressSet);
140 let subscription_context = subscription_context.unwrap_or_default();
141 let (notifier, collector, subscriptions) = match notification_mode {
142 NotificationMode::MultiListeners => {
143 let enabled_events = EVENT_TYPE_ARRAY[..].into();
144 let collector = Arc::new(GrpcClientCollector::new(GRPC_CLIENT, inner.notification_channel_receiver(), converter));
145 let subscriber = Arc::new(Subscriber::new(GRPC_CLIENT, enabled_events, inner.clone(), 0));
146 let notifier: GrpcClientNotifier = Notifier::new(
147 GRPC_CLIENT,
148 enabled_events,
149 vec![collector],
150 vec![subscriber],
151 subscription_context.clone(),
152 3,
153 policies,
154 );
155 (Some(Arc::new(notifier)), None, None)
156 }
157 NotificationMode::Direct => {
158 let collector = GrpcClientCollector::new(GRPC_CLIENT, inner.notification_channel_receiver(), converter);
159 let subscriptions = ArrayBuilder::single(Self::DIRECT_MODE_LISTENER_ID, None);
160 (None, Some(Arc::new(collector)), Some(Arc::new(Mutex::new(subscriptions))))
161 }
162 };
163
164 if reconnect {
165 inner.clone().spawn_connection_monitor(notifier.clone(), subscriptions.clone(), subscription_context.clone());
167 }
168
169 Ok(Self { inner, notifier, collector, subscriptions, subscription_context, policies, notification_mode })
170 }
171
172 #[inline(always)]
173 pub fn notifier(&self) -> Option<Arc<GrpcClientNotifier>> {
174 self.notifier.clone()
175 }
176
177 pub async fn start(&self, notify: Option<GrpcClientNotify>) {
179 match &self.notification_mode {
180 NotificationMode::MultiListeners => {
181 assert!(notify.is_none(), "client is on multi-listeners mode");
182 self.notifier.clone().unwrap().start();
183 }
184 NotificationMode::Direct => {
185 if let Some(notify) = notify {
186 self.collector.as_ref().unwrap().clone().start(notify);
187 }
188 }
189 }
190 }
191
192 pub async fn join(&self) -> Result<()> {
194 match &self.notification_mode {
195 NotificationMode::MultiListeners => {
196 self.notifier.as_ref().unwrap().join().await?;
197 }
198 NotificationMode::Direct => {
199 if self.collector.as_ref().unwrap().is_started() {
200 self.collector.as_ref().unwrap().clone().join().await?;
201 }
202 }
203 }
204 Ok(())
205 }
206
207 pub fn is_connected(&self) -> bool {
208 self.inner.is_connected()
209 }
210
211 pub fn handle_message_id(&self) -> bool {
212 self.inner.handle_message_id()
213 }
214
215 pub fn handle_stop_notify(&self) -> bool {
216 self.inner.handle_stop_notify()
217 }
218
219 pub async fn disconnect(&self) -> Result<()> {
220 self.inner.disconnect().await?;
221 Ok(())
222 }
223
224 pub fn notification_channel_receiver(&self) -> Receiver<Notification> {
225 self.inner.notification_channel.receiver()
226 }
227
228 pub fn notification_mode(&self) -> NotificationMode {
229 self.notification_mode
230 }
231}
232
233#[async_trait]
234impl RpcApi for GrpcClient {
235 route!(ping_call, Ping);
241 route!(get_sync_status_call, GetSyncStatus);
242 route!(get_server_info_call, GetServerInfo);
243 route!(get_metrics_call, GetMetrics);
244 route!(get_connections_call, GetConnections);
245 route!(get_system_info_call, GetSystemInfo);
246 route!(submit_block_call, SubmitBlock);
247 route!(get_block_template_call, GetBlockTemplate);
248 route!(get_block_call, GetBlock);
249 route!(get_info_call, GetInfo);
250 route!(get_current_network_call, GetCurrentNetwork);
251 route!(get_peer_addresses_call, GetPeerAddresses);
252 route!(get_sink_call, GetSink);
253 route!(get_mempool_entry_call, GetMempoolEntry);
254 route!(get_mempool_entries_call, GetMempoolEntries);
255 route!(get_connected_peer_info_call, GetConnectedPeerInfo);
256 route!(add_peer_call, AddPeer);
257 route!(submit_transaction_call, SubmitTransaction);
258 route!(submit_transaction_replacement_call, SubmitTransactionReplacement);
259 route!(get_subnetwork_call, GetSubnetwork);
260 route!(get_virtual_chain_from_block_call, GetVirtualChainFromBlock);
261 route!(get_blocks_call, GetBlocks);
262 route!(get_block_count_call, GetBlockCount);
263 route!(get_block_dag_info_call, GetBlockDagInfo);
264 route!(resolve_finality_conflict_call, ResolveFinalityConflict);
265 route!(shutdown_call, Shutdown);
266 route!(get_headers_call, GetHeaders);
267 route!(get_utxos_by_addresses_call, GetUtxosByAddresses);
268 route!(get_balance_by_address_call, GetBalanceByAddress);
269 route!(get_balances_by_addresses_call, GetBalancesByAddresses);
270 route!(get_sink_blue_score_call, GetSinkBlueScore);
271 route!(ban_call, Ban);
272 route!(unban_call, Unban);
273 route!(estimate_network_hashes_per_second_call, EstimateNetworkHashesPerSecond);
274 route!(get_mempool_entries_by_addresses_call, GetMempoolEntriesByAddresses);
275 route!(get_coin_supply_call, GetCoinSupply);
276 route!(get_daa_score_timestamp_estimate_call, GetDaaScoreTimestampEstimate);
277 route!(get_fee_estimate_call, GetFeeEstimate);
278 route!(get_fee_estimate_experimental_call, GetFeeEstimateExperimental);
279 route!(get_current_block_color_call, GetCurrentBlockColor);
280
281 fn register_new_listener(&self, connection: ChannelConnection) -> ListenerId {
286 match self.notification_mode {
287 NotificationMode::MultiListeners => {
288 self.notifier.as_ref().unwrap().register_new_listener(connection, ListenerLifespan::Dynamic)
289 }
290 NotificationMode::Direct => Self::DIRECT_MODE_LISTENER_ID,
292 }
293 }
294
295 async fn unregister_listener(&self, id: ListenerId) -> RpcResult<()> {
299 match self.notification_mode {
300 NotificationMode::MultiListeners => {
301 self.notifier.as_ref().unwrap().unregister_listener(id)?;
302 }
303 NotificationMode::Direct => {}
305 }
306 Ok(())
307 }
308
309 async fn start_notify(&self, id: ListenerId, scope: Scope) -> RpcResult<()> {
311 match self.notification_mode {
312 NotificationMode::MultiListeners => {
313 self.notifier.clone().unwrap().try_start_notify(id, scope)?;
314 }
315 NotificationMode::Direct => {
316 if self.inner.will_reconnect() {
317 let event = scope.event_type();
318 self.subscriptions.as_ref().unwrap().lock().await[event].mutate(
319 Mutation::new(Command::Start, scope.clone()),
320 self.policies,
321 &self.subscription_context,
322 )?;
323 }
324 self.inner.start_notify_to_client(scope).await?;
325 }
326 }
327 Ok(())
328 }
329
330 async fn stop_notify(&self, id: ListenerId, scope: Scope) -> RpcResult<()> {
332 if self.handle_stop_notify() {
333 match self.notification_mode {
334 NotificationMode::MultiListeners => {
335 self.notifier.clone().unwrap().try_stop_notify(id, scope)?;
336 }
337 NotificationMode::Direct => {
338 if self.inner.will_reconnect() {
339 let event = scope.event_type();
340 self.subscriptions.as_ref().unwrap().lock().await[event].mutate(
341 Mutation::new(Command::Stop, scope.clone()),
342 self.policies,
343 &self.subscription_context,
344 )?;
345 }
346 self.inner.stop_notify_to_client(scope).await?;
347 }
348 }
349 Ok(())
350 } else {
351 Err(RpcError::UnsupportedFeature)
352 }
353 }
354}
355
356pub const CONNECT_TIMEOUT_DURATION: u64 = 20_000;
357pub const REQUEST_TIMEOUT_DURATION: u64 = 5_000;
358pub const TIMEOUT_MONITORING_INTERVAL: u64 = 10_000;
359pub const RECONNECT_INTERVAL: u64 = 2_000;
360
361type KaspadRequestSender = async_channel::Sender<KaspadRequest>;
362type KaspadRequestReceiver = async_channel::Receiver<KaspadRequest>;
363
364#[derive(Debug, Default)]
365struct ServerFeatures {
366 pub handle_stop_notify: bool,
367 pub handle_message_id: bool,
368}
369
370#[derive(Debug)]
404struct Inner {
405 url: String,
406
407 server_features: ServerFeatures,
408
409 notification_channel: NotificationChannel,
411
412 request_sender: KaspadRequestSender,
414 request_receiver: KaspadRequestReceiver,
415
416 receiver_is_running: AtomicBool,
418 receiver_shutdown: DuplexTrigger,
419
420 resolver: DynResolver,
422
423 timeout_is_running: AtomicBool,
425 timeout_shutdown: DuplexTrigger,
426 timeout_timer_interval: u64,
427 timeout_duration: u64,
428
429 connector_is_running: AtomicBool,
431 connector_shutdown: DuplexTrigger,
432 connector_timer_interval: u64,
433
434 connection_event_sender: Option<Sender<ConnectionEvent>>,
436
437 override_handle_stop_notify: bool,
439
440 counters: Arc<TowerConnectionCounters>,
442}
443
444impl Inner {
445 fn new(
446 url: String,
447 server_features: ServerFeatures,
448 request_sender: KaspadRequestSender,
449 request_receiver: KaspadRequestReceiver,
450 connection_event_sender: Option<Sender<ConnectionEvent>>,
451 override_handle_stop_notify: bool,
452 timeout_duration: u64,
453 counters: Arc<TowerConnectionCounters>,
454 ) -> Self {
455 let resolver: DynResolver = match server_features.handle_message_id {
456 true => Arc::new(IdResolver::new()),
457 false => Arc::new(QueueResolver::new()),
458 };
459 let notification_channel = Channel::default();
460 Self {
461 url,
462 server_features,
463 notification_channel,
464 request_sender,
465 request_receiver,
466 resolver,
467 receiver_is_running: AtomicBool::new(false),
468 receiver_shutdown: DuplexTrigger::new(),
469 timeout_is_running: AtomicBool::new(false),
470 timeout_shutdown: DuplexTrigger::new(),
471 timeout_duration,
472 timeout_timer_interval: TIMEOUT_MONITORING_INTERVAL,
473 connector_is_running: AtomicBool::new(false),
474 connector_shutdown: DuplexTrigger::new(),
475 connector_timer_interval: RECONNECT_INTERVAL,
476 connection_event_sender,
477 override_handle_stop_notify,
478 counters,
479 }
480 }
481
482 async fn connect(
484 url: String,
485 connection_event_sender: Option<Sender<ConnectionEvent>>,
486 override_handle_stop_notify: bool,
487 timeout_duration: u64,
488 counters: Arc<TowerConnectionCounters>,
489 ) -> Result<Arc<Self>> {
490 let (request_sender, request_receiver) = async_channel::unbounded();
492
493 let (stream, server_features) =
495 Inner::try_connect(url.clone(), request_sender.clone(), request_receiver.clone(), timeout_duration, counters.clone())
496 .await?;
497
498 let inner = Arc::new(Inner::new(
500 url,
501 server_features,
502 request_sender,
503 request_receiver,
504 connection_event_sender,
505 override_handle_stop_notify,
506 timeout_duration,
507 counters,
508 ));
509
510 inner.clone().spawn_request_timeout_monitor();
512
513 inner.clone().spawn_response_receiver_task(stream);
515
516 trace!("GRPC client: connected");
517 Ok(inner)
518 }
519
520 #[allow(unused_variables)]
521 async fn try_connect(
522 url: String,
523 request_sender: KaspadRequestSender,
524 request_receiver: KaspadRequestReceiver,
525 request_timeout: u64,
526 counters: Arc<TowerConnectionCounters>,
527 ) -> Result<(Streaming<KaspadResponse>, ServerFeatures)> {
528 #[cfg(not(feature = "heap"))]
530 let channel =
531 tonic::transport::Channel::builder(url.parse::<tonic::transport::Uri>().map_err(|e| Error::String(e.to_string()))?)
532 .timeout(tokio::time::Duration::from_millis(request_timeout))
533 .connect_timeout(tokio::time::Duration::from_millis(CONNECT_TIMEOUT_DURATION))
534 .connect()
535 .await?;
536
537 #[cfg(feature = "heap")]
538 let channel =
539 tonic::transport::Channel::builder(url.parse::<tonic::transport::Uri>().map_err(|e| Error::String(e.to_string()))?)
540 .connect()
541 .await?;
542
543 let bytes_rx = &counters.bytes_rx;
544 let bytes_tx = &counters.bytes_tx;
545 let channel = ServiceBuilder::new()
546 .layer(MapResponseBodyLayer::new(move |body| CountBytesBody::new(body, bytes_rx.clone())))
547 .layer(measure_request_body_size_layer(bytes_tx.clone(), |body| {
548 body.map_err(|e| tonic::Status::from_error(Box::new(e))).boxed_unsync()
549 }))
550 .service(channel);
551
552 #[cfg(not(feature = "heap"))]
554 let request_timeout = tokio::time::Duration::from_millis(request_timeout);
555 #[cfg(not(feature = "heap"))]
556 let mut client = RpcClient::with_interceptor(channel, move |mut req: tonic::Request<()>| {
557 req.set_timeout(request_timeout);
558 Ok(req)
559 });
560
561 #[cfg(feature = "heap")]
562 let mut client = RpcClient::new(channel);
563
564 client = client
565 .send_compressed(CompressionEncoding::Gzip)
566 .accept_compressed(CompressionEncoding::Gzip)
567 .max_decoding_message_size(RPC_MAX_MESSAGE_SIZE);
568
569 let stream_receiver = request_receiver.clone();
571 let request_stream = async_stream::stream! {
572 while let Ok(item) = stream_receiver.recv().await {
573 yield item;
574 }
575 };
576
577 let mut stream: Streaming<KaspadResponse> = client.message_stream(request_stream).await?.into_inner();
579
580 let mut server_features = ServerFeatures::default();
582 request_sender.send(GetInfoRequestMessage {}.into()).await?;
583 match stream.message().await? {
584 Some(ref msg) => {
585 trace!("GRPC client: try_connect - GetInfo got a response");
586 let response: RpcResult<GetInfoResponse> = msg.try_into();
587 if let Ok(response) = response {
588 server_features.handle_stop_notify = response.has_notify_command;
589 server_features.handle_message_id = response.has_message_id;
590 }
591 }
592 None => {
593 debug!("GRPC client: try_connect - stream closed by the server");
594 return Err(Error::String("GRPC stream was closed by the server".to_string()));
595 }
596 }
597
598 Ok((stream, server_features))
599 }
600
601 async fn reconnect(
602 self: Arc<Self>,
603 notifier: Option<Arc<GrpcClientNotifier>>,
604 subscriptions: Option<Arc<DirectSubscriptions>>,
605 subscription_context: &SubscriptionContext,
606 ) -> RpcResult<()> {
607 assert_ne!(
608 notifier.is_some(),
609 subscriptions.is_some(),
610 "exclusively either a notifier in MultiListener mode or subscriptions in Direct mode"
611 );
612 let (stream, _) = Inner::try_connect(
616 self.url.clone(),
617 self.request_sender.clone(),
618 self.request_receiver.clone(),
619 self.timeout_duration,
620 self.counters.clone(),
621 )
622 .await?;
623
624 self.clone().spawn_response_receiver_task(stream);
626
627 if let Some(notifier) = notifier.as_ref() {
629 notifier.try_renew_subscriptions()?;
630 }
631
632 if let Some(subscriptions) = subscriptions.as_ref() {
634 let subscriptions = subscriptions.lock().await;
635 for event in EVENT_TYPE_ARRAY {
636 if subscriptions[event].active() {
637 self.clone().start_notify_to_client(subscriptions[event].scope(subscription_context)).await?;
638 }
639 }
640 }
641
642 debug!("GRPC client: reconnected");
643 Ok(())
644 }
645
646 pub fn notification_channel_receiver(&self) -> Receiver<Notification> {
647 self.notification_channel.receiver()
648 }
649
650 fn send_connection_event(&self, event: ConnectionEvent) {
651 if let Some(ref connection_event_sender) = self.connection_event_sender {
652 if let Err(err) = connection_event_sender.try_send(event) {
653 debug!("Send connection event error: {err}");
654 }
655 }
656 }
657
658 fn is_connected(&self) -> bool {
659 self.receiver_is_running.load(Ordering::SeqCst)
660 }
661
662 fn will_reconnect(&self) -> bool {
663 self.connector_is_running.load(Ordering::SeqCst)
664 }
665
666 #[inline(always)]
667 fn handle_message_id(&self) -> bool {
668 self.server_features.handle_message_id
669 }
670
671 #[inline(always)]
672 fn handle_stop_notify(&self) -> bool {
673 if self.override_handle_stop_notify {
675 true
676 } else {
677 self.server_features.handle_stop_notify
678 }
679 }
680
681 #[inline(always)]
682 fn resolver(&self) -> DynResolver {
683 self.resolver.clone()
684 }
685
686 async fn call(&self, op: KaspadPayloadOps, request: impl Into<KaspadRequest>) -> Result<KaspadResponse> {
687 if self.is_connected() {
689 let id = u64::from_le_bytes(rand::random::<[u8; 8]>());
690 let mut request: KaspadRequest = request.into();
691 request.id = id;
692
693 trace!("GRPC client: resolver call: {:?}", request);
694 if request.payload.is_some() {
695 let receiver = self.resolver().register_request(op, &request);
696 self.request_sender.send(request).await.map_err(|_| Error::ChannelRecvError)?;
697 receiver.await?
698 } else {
699 Err(Error::MissingRequestPayload)
700 }
701 } else {
702 Err(Error::NotConnected)
703 }
704 }
705
706 fn spawn_request_timeout_monitor(self: Arc<Self>) {
709 if self.timeout_is_running.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
713 trace!("GRPC client: timeout task - spawn request ignored since already spawned");
714 return;
715 }
716
717 tokio::spawn(async move {
718 trace!("GRPC client: timeout task - started");
719 let shutdown = self.timeout_shutdown.request.listener.clone().fuse();
720 pin_mut!(shutdown);
721
722 loop {
723 let timeout_timer_interval = Duration::from_millis(self.timeout_timer_interval);
724 let delay = tokio::time::sleep(timeout_timer_interval).fuse();
725 pin_mut!(delay);
726
727 select! {
728 _ = shutdown => { break; },
729 _ = delay => {
730 trace!("GRPC client: timeout task - running");
731 let timeout = Duration::from_millis(self.timeout_duration);
732 self.resolver().remove_expired_requests(timeout);
733 },
734 }
735 }
736 self.timeout_is_running.store(false, Ordering::SeqCst);
737 self.timeout_shutdown.response.trigger.trigger();
738
739 trace!("GRPC client: timeout task - terminated");
740 });
741 }
742
743 fn spawn_response_receiver_task(self: Arc<Self>, mut stream: Streaming<KaspadResponse>) {
745 if self.receiver_is_running.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
749 trace!("GRPC client: response receiver task - spawn ignored since already spawned");
750 return;
751 }
752
753 self.send_connection_event(ConnectionEvent::Connected);
755
756 tokio::spawn(async move {
757 trace!("GRPC client: response receiver task - started");
758 loop {
759 let shutdown = self.receiver_shutdown.request.listener.clone();
760 pin_mut!(shutdown);
761
762 tokio::select! {
763 biased;
764
765 _ = shutdown => {
766 break;
767 }
768
769 message = stream.message() => {
770 match message {
771 Ok(msg) => {
772 match msg {
773 Some(response) => {
774 self.handle_response(response);
775 },
776 None =>{
777 debug!("GRPC client: response receiver task - the connection to the server is closed");
778
779 break;
781 }
782 }
783 },
784 Err(err) => {
785 debug!("GRPC client: response receiver task - the response receiver gets an error from the server: {:?}", err);
786
787 break;
791 }
792 }
793 }
794 }
795 }
796 self.receiver_is_running.store(false, Ordering::SeqCst);
798 self.send_connection_event(ConnectionEvent::Disconnected);
799
800 if !self.will_reconnect() {
802 self.notification_channel.close();
803 }
804
805 if self.receiver_shutdown.request.listener.is_triggered() {
806 self.receiver_shutdown.response.trigger.trigger();
807 }
808
809 trace!("GRPC client: response receiver task - terminated");
810 });
811 }
812
813 fn spawn_connection_monitor(
816 self: Arc<Self>,
817 notifier: Option<Arc<GrpcClientNotifier>>,
818 subscriptions: Option<Arc<DirectSubscriptions>>,
819 subscription_context: SubscriptionContext,
820 ) {
821 if self.connector_is_running.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
825 trace!("GRPC client: connection monitor task - spawn ignored since already spawned");
826 return;
827 }
828
829 tokio::spawn(async move {
830 trace!("GRPC client: connection monitor task - started");
831 let shutdown = self.connector_shutdown.request.listener.clone().fuse();
832 pin_mut!(shutdown);
833 loop {
834 let connector_timer_interval = Duration::from_millis(self.connector_timer_interval);
835 let delay = tokio::time::sleep(connector_timer_interval).fuse();
836 pin_mut!(delay);
837 select! {
838 _ = shutdown => { break; },
839 _ = delay => {
840 trace!("GRPC client: connection monitor task - running");
841 if !self.is_connected() {
842 match self.clone().reconnect(notifier.clone(), subscriptions.clone(), &subscription_context).await {
843 Ok(_) => {
844 trace!("GRPC client: reconnection to server succeeded");
845 },
846 Err(err) => {
847 trace!("GRPC client: reconnection to server failed with error {err:?}");
848 }
849 }
850 }
851 },
852 }
853 }
854 self.connector_is_running.store(false, Ordering::SeqCst);
855 self.connector_shutdown.response.trigger.trigger();
856 trace!("GRPC client: connection monitor task - terminating");
857 });
858 }
859
860 fn handle_response(&self, response: KaspadResponse) {
861 if response.is_notification() {
862 trace!("GRPC client: handle_response received a notification");
863 match Notification::try_from(&response) {
864 Ok(notification) => {
865 let event: EventType = (¬ification).into();
866 trace!("GRPC client: handle_response received notification: {:?}", event);
867
868 match self.notification_channel.try_send(notification) {
870 Ok(_) => {}
871 Err(err) => {
872 error!("GRPC client: error while trying to send a notification to the notifier: {:?}", err);
873 }
874 }
875 }
876 Err(err) => {
877 error!("GRPC client: handle_response error converting response into notification: {:?}", err);
878 }
879 }
880 } else if response.payload.is_some() {
881 self.resolver().handle_response(response);
882 }
883 }
884
885 async fn disconnect(&self) -> Result<()> {
886 self.stop_connector_monitor().await?;
887 self.stop_timeout_monitor().await?;
888 self.stop_response_receiver_task().await?;
889 self.request_receiver.close();
890 trace!("GRPC client: disconnected");
891 Ok(())
892 }
893
894 async fn stop_response_receiver_task(&self) -> Result<()> {
895 if self.receiver_is_running.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
896 self.receiver_shutdown.request.trigger.trigger();
897 self.receiver_shutdown.response.listener.clone().await;
898 }
899 Ok(())
900 }
901
902 async fn stop_timeout_monitor(&self) -> Result<()> {
903 if self.timeout_is_running.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
904 self.timeout_shutdown.request.trigger.trigger();
905 self.timeout_shutdown.response.listener.clone().await;
906 }
907 Ok(())
908 }
909
910 async fn stop_connector_monitor(&self) -> Result<()> {
911 if self.connector_is_running.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
912 self.connector_shutdown.request.trigger.trigger();
913 self.connector_shutdown.response.listener.clone().await;
914 }
915 Ok(())
916 }
917
918 async fn start_notify_to_client(&self, scope: Scope) -> RpcResult<()> {
920 let request = kaspad_request::Payload::from_notification_type(&scope, Command::Start);
921 self.call((&request).into(), request).await?;
922 Ok(())
923 }
924
925 async fn stop_notify_to_client(&self, scope: Scope) -> RpcResult<()> {
927 if self.handle_stop_notify() {
928 let request = kaspad_request::Payload::from_notification_type(&scope, Command::Stop);
929 self.call((&request).into(), request).await?;
930 }
931 Ok(())
932 }
933}
934
935#[async_trait]
936impl SubscriptionManager for Inner {
937 async fn start_notify(&self, _: ListenerId, scope: Scope) -> NotifyResult<()> {
938 trace!("GRPC client: start_notify: {:?}", scope);
939 self.start_notify_to_client(scope).await.map_err(|err| NotifyError::General(err.to_string()))?;
940 Ok(())
941 }
942
943 async fn stop_notify(&self, _: ListenerId, scope: Scope) -> NotifyResult<()> {
944 if self.handle_stop_notify() {
945 trace!("GRPC client: stop_notify: {:?}", scope);
946 self.stop_notify_to_client(scope).await.map_err(|err| NotifyError::General(err.to_string()))?;
947 } else {
948 trace!("GRPC client: stop_notify ignored because not supported by the server: {:?}", scope);
949 }
950 Ok(())
951 }
952}