kaspa_grpc_client/
lib.rs

use self::{
    error::{Error, Result},
    resolver::{id::IdResolver, queue::QueueResolver, DynResolver},
};
use async_channel::{Receiver, Sender};
use async_trait::async_trait;
pub use client_pool::ClientPool;
use connection_event::ConnectionEvent;
use futures::{future::FutureExt, pin_mut, select};
use kaspa_core::{debug, error, trace};
use kaspa_grpc_core::{
    channel::NotificationChannel,
    ops::KaspadPayloadOps,
    protowire::{kaspad_request, rpc_client::RpcClient, GetInfoRequestMessage, KaspadRequest, KaspadResponse},
    RPC_MAX_MESSAGE_SIZE,
};
use kaspa_notify::{
    collector::{Collector, CollectorFrom},
    error::{Error as NotifyError, Result as NotifyResult},
    events::{EventArray, EventType, EVENT_TYPE_ARRAY},
    listener::{ListenerId, ListenerLifespan},
    notifier::{DynNotify, Notifier},
    scope::Scope,
    subscriber::{Subscriber, SubscriptionManager},
    subscription::{
        array::ArrayBuilder, context::SubscriptionContext, Command, DynSubscription, MutateSingle, Mutation, MutationPolicies,
        UtxosChangedMutationPolicy,
    },
};
use kaspa_rpc_core::{
    api::rpc::RpcApi,
    error::RpcError,
    error::RpcResult,
    model::message::*,
    notify::{collector::RpcCoreConverter, connection::ChannelConnection, mode::NotificationMode},
    Notification,
};
use kaspa_utils::{channel::Channel, triggers::DuplexTrigger};
use kaspa_utils_tower::{
    counters::TowerConnectionCounters,
    middleware::{measure_request_body_size_layer, CountBytesBody, MapResponseBodyLayer, ServiceBuilder},
};
use regex::Regex;
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    time::Duration,
};
use tokio::sync::Mutex;
use tonic::codec::CompressionEncoding;
use tonic::codegen::Body;
use tonic::Streaming;

mod connection_event;
pub mod error;
mod resolver;
#[macro_use]
mod route;

mod client_pool;

pub type GrpcClientCollector = CollectorFrom<RpcCoreConverter>;
pub type GrpcClientNotify = DynNotify<Notification>;
pub type GrpcClientNotifier = Notifier<Notification, ChannelConnection>;

type DirectSubscriptions = Mutex<EventArray<DynSubscription>>;

#[derive(Debug, Clone)]
pub struct GrpcClient {
    inner: Arc<Inner>,
    /// In multi-listener mode, a full-featured Notifier
    notifier: Option<Arc<GrpcClientNotifier>>,
    /// In direct mode, a Collector relaying incoming notifications via a channel (see `self.notification_channel_receiver()`)
    collector: Option<Arc<GrpcClientCollector>>,
    subscriptions: Option<Arc<DirectSubscriptions>>,
    subscription_context: SubscriptionContext,
    policies: MutationPolicies,
    notification_mode: NotificationMode,
}

const GRPC_CLIENT: &str = "grpc-client";

impl GrpcClient {
    pub const DIRECT_MODE_LISTENER_ID: ListenerId = 0;

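    /// Connects to a gRPC server at `url` in `Direct` mode with default settings.
    ///
    /// A minimal usage sketch (the URL and port are illustrative assumptions):
    ///
    /// ```ignore
    /// let client = GrpcClient::connect("grpc://127.0.0.1:16110".to_string()).await?;
    /// ```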
    pub async fn connect(url: String) -> Result<GrpcClient> {
        Self::connect_with_args(NotificationMode::Direct, url, None, false, None, false, None, Default::default()).await
    }

    /// Connects to a gRPC server.
    ///
    /// `notification_mode` determines how notifications are handled:
    ///
    /// - `MultiListeners` => Multiple listeners are supported via the [`RpcApi`] implementation.
    ///                       Listeners must be registered before subscribing to notifications.
    /// - `Direct` => A single listener receives notifications via a channel (see `self.notification_channel_receiver()`).
    ///               Registering a listener is unnecessary and is ignored.
    ///               Subscribing to notifications ignores the listener ID.
    ///
    /// `url`: the server to connect to
    ///
    /// `subscription_context`: when multiple clients handling `UtxosChangedNotifications` are connected concurrently,
    /// it is advised to provide each of them a clone of the same instance in order to reduce the memory footprint.
    ///
    /// `reconnect`: enables automatic reconnection to the server, reactivating all subscriptions on success.
    ///
    /// `connection_event_sender`: when provided, connection and disconnection events are notified via the channel.
    ///
    /// `override_handle_stop_notify`: legacy; should be removed in the near future; always set to `false`.
    ///
    /// `timeout_duration`: request timeout duration
    ///
    /// `counters`: collects some bandwidth metrics
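    ///
    /// # Example
    ///
    /// A minimal sketch connecting in `Direct` mode with automatic reconnection
    /// (the URL is an illustrative assumption):
    ///
    /// ```ignore
    /// let (event_sender, event_receiver) = async_channel::unbounded();
    /// let client = GrpcClient::connect_with_args(
    ///     NotificationMode::Direct,
    ///     "grpc://127.0.0.1:16110".to_string(),
    ///     None,               // fresh subscription context
    ///     true,               // reconnect automatically
    ///     Some(event_sender), // observe connection/disconnection events
    ///     false,              // legacy flag, always false
    ///     None,               // default request timeout
    ///     Default::default(), // bandwidth counters
    /// )
    /// .await?;
    /// ```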
    pub async fn connect_with_args(
        notification_mode: NotificationMode,
        url: String,
        subscription_context: Option<SubscriptionContext>,
        reconnect: bool,
        connection_event_sender: Option<Sender<ConnectionEvent>>,
        override_handle_stop_notify: bool,
        timeout_duration: Option<u64>,
        counters: Arc<TowerConnectionCounters>,
    ) -> Result<GrpcClient> {
        let schema = Regex::new(r"^grpc://").unwrap();
        if !schema.is_match(&url) {
            return Err(Error::GrpcAddressSchema(url));
        }
        let inner = Inner::connect(
            url,
            connection_event_sender,
            override_handle_stop_notify,
            timeout_duration.unwrap_or(REQUEST_TIMEOUT_DURATION),
            counters,
        )
        .await?;
        let converter = Arc::new(RpcCoreConverter::new());
        let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AddressSet);
        let subscription_context = subscription_context.unwrap_or_default();
        let (notifier, collector, subscriptions) = match notification_mode {
            NotificationMode::MultiListeners => {
                let enabled_events = EVENT_TYPE_ARRAY[..].into();
                let collector = Arc::new(GrpcClientCollector::new(GRPC_CLIENT, inner.notification_channel_receiver(), converter));
                let subscriber = Arc::new(Subscriber::new(GRPC_CLIENT, enabled_events, inner.clone(), 0));
                let notifier: GrpcClientNotifier = Notifier::new(
                    GRPC_CLIENT,
                    enabled_events,
                    vec![collector],
                    vec![subscriber],
                    subscription_context.clone(),
                    3,
                    policies,
                );
                (Some(Arc::new(notifier)), None, None)
            }
            NotificationMode::Direct => {
                let collector = GrpcClientCollector::new(GRPC_CLIENT, inner.notification_channel_receiver(), converter);
                let subscriptions = ArrayBuilder::single(Self::DIRECT_MODE_LISTENER_ID, None);
                (None, Some(Arc::new(collector)), Some(Arc::new(Mutex::new(subscriptions))))
            }
        };

        if reconnect {
            // Start the connection monitor
            inner.clone().spawn_connection_monitor(notifier.clone(), subscriptions.clone(), subscription_context.clone());
        }

        Ok(Self { inner, notifier, collector, subscriptions, subscription_context, policies, notification_mode })
    }

    #[inline(always)]
    pub fn notifier(&self) -> Option<Arc<GrpcClientNotifier>> {
        self.notifier.clone()
    }

    /// Starts RPC services.
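    ///
    /// A minimal lifecycle sketch in `Direct` mode (the `notify` sink is an assumption;
    /// any `DynNotify<Notification>` implementation works):
    ///
    /// ```ignore
    /// client.start(Some(notify)).await;
    /// // ... use the client ...
    /// client.disconnect().await?;
    /// client.join().await?;
    /// ```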
    pub async fn start(&self, notify: Option<GrpcClientNotify>) {
        match &self.notification_mode {
            NotificationMode::MultiListeners => {
                assert!(notify.is_none(), "client is in multi-listeners mode");
                self.notifier.clone().unwrap().start();
            }
            NotificationMode::Direct => {
                if let Some(notify) = notify {
                    self.collector.as_ref().unwrap().clone().start(notify);
                }
            }
        }
    }

    /// Joins on RPC services.
    pub async fn join(&self) -> Result<()> {
        match &self.notification_mode {
            NotificationMode::MultiListeners => {
                self.notifier.as_ref().unwrap().join().await?;
            }
            NotificationMode::Direct => {
                if self.collector.as_ref().unwrap().is_started() {
                    self.collector.as_ref().unwrap().clone().join().await?;
                }
            }
        }
        Ok(())
    }

    pub fn is_connected(&self) -> bool {
        self.inner.is_connected()
    }

    pub fn handle_message_id(&self) -> bool {
        self.inner.handle_message_id()
    }

    pub fn handle_stop_notify(&self) -> bool {
        self.inner.handle_stop_notify()
    }

    pub async fn disconnect(&self) -> Result<()> {
        self.inner.disconnect().await?;
        Ok(())
    }

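    /// Returns the receiver side of the channel carrying incoming notifications in `Direct` mode.
    ///
    /// A minimal consumption sketch, assuming the client was connected in `Direct` mode
    /// and a subscription was started beforehand:
    ///
    /// ```ignore
    /// let receiver = client.notification_channel_receiver();
    /// while let Ok(notification) = receiver.recv().await {
    ///     println!("notification: {notification:?}");
    /// }
    /// ```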
    pub fn notification_channel_receiver(&self) -> Receiver<Notification> {
        self.inner.notification_channel.receiver()
    }

    pub fn notification_mode(&self) -> NotificationMode {
        self.notification_mode
    }
}

#[async_trait]
impl RpcApi for GrpcClient {
    // This example illustrates the body of the function created by the route!() macro
    // async fn submit_block_call(&self, request: SubmitBlockRequest) -> RpcResult<SubmitBlockResponse> {
    //     self.inner.call(KaspadPayloadOps::SubmitBlock, request).await?.as_ref().try_into()
    // }

    route!(ping_call, Ping);
    route!(get_sync_status_call, GetSyncStatus);
    route!(get_server_info_call, GetServerInfo);
    route!(get_metrics_call, GetMetrics);
    route!(get_connections_call, GetConnections);
    route!(get_system_info_call, GetSystemInfo);
    route!(submit_block_call, SubmitBlock);
    route!(get_block_template_call, GetBlockTemplate);
    route!(get_block_call, GetBlock);
    route!(get_info_call, GetInfo);
    route!(get_current_network_call, GetCurrentNetwork);
    route!(get_peer_addresses_call, GetPeerAddresses);
    route!(get_sink_call, GetSink);
    route!(get_mempool_entry_call, GetMempoolEntry);
    route!(get_mempool_entries_call, GetMempoolEntries);
    route!(get_connected_peer_info_call, GetConnectedPeerInfo);
    route!(add_peer_call, AddPeer);
    route!(submit_transaction_call, SubmitTransaction);
    route!(submit_transaction_replacement_call, SubmitTransactionReplacement);
    route!(get_subnetwork_call, GetSubnetwork);
    route!(get_virtual_chain_from_block_call, GetVirtualChainFromBlock);
    route!(get_blocks_call, GetBlocks);
    route!(get_block_count_call, GetBlockCount);
    route!(get_block_dag_info_call, GetBlockDagInfo);
    route!(resolve_finality_conflict_call, ResolveFinalityConflict);
    route!(shutdown_call, Shutdown);
    route!(get_headers_call, GetHeaders);
    route!(get_utxos_by_addresses_call, GetUtxosByAddresses);
    route!(get_balance_by_address_call, GetBalanceByAddress);
    route!(get_balances_by_addresses_call, GetBalancesByAddresses);
    route!(get_sink_blue_score_call, GetSinkBlueScore);
    route!(ban_call, Ban);
    route!(unban_call, Unban);
    route!(estimate_network_hashes_per_second_call, EstimateNetworkHashesPerSecond);
    route!(get_mempool_entries_by_addresses_call, GetMempoolEntriesByAddresses);
    route!(get_coin_supply_call, GetCoinSupply);
    route!(get_daa_score_timestamp_estimate_call, GetDaaScoreTimestampEstimate);
    route!(get_fee_estimate_call, GetFeeEstimate);
    route!(get_fee_estimate_experimental_call, GetFeeEstimateExperimental);
    route!(get_current_block_color_call, GetCurrentBlockColor);

    // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    // Notification API

    /// Registers a new listener and returns an id identifying it.
    fn register_new_listener(&self, connection: ChannelConnection) -> ListenerId {
        match self.notification_mode {
            NotificationMode::MultiListeners => {
                self.notifier.as_ref().unwrap().register_new_listener(connection, ListenerLifespan::Dynamic)
            }
            // In direct mode, listener registration/unregistration is ignored
            NotificationMode::Direct => Self::DIRECT_MODE_LISTENER_ID,
        }
    }

    /// Unregisters an existing listener.
    ///
    /// Stops all notifications for this listener and unregisters the id and its associated connection.
    async fn unregister_listener(&self, id: ListenerId) -> RpcResult<()> {
        match self.notification_mode {
            NotificationMode::MultiListeners => {
                self.notifier.as_ref().unwrap().unregister_listener(id)?;
            }
            // In direct mode, listener registration/unregistration is ignored
            NotificationMode::Direct => {}
        }
        Ok(())
    }

    /// Starts sending notifications of some type to a listener.
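    ///
    /// In `Direct` mode the listener `id` is ignored. A minimal sketch (the scope type is an
    /// assumption based on `kaspa_notify::scope`):
    ///
    /// ```ignore
    /// client.start_notify(GrpcClient::DIRECT_MODE_LISTENER_ID, BlockAddedScope {}.into()).await?;
    /// ```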
    async fn start_notify(&self, id: ListenerId, scope: Scope) -> RpcResult<()> {
        match self.notification_mode {
            NotificationMode::MultiListeners => {
                self.notifier.clone().unwrap().try_start_notify(id, scope)?;
            }
            NotificationMode::Direct => {
                if self.inner.will_reconnect() {
                    let event = scope.event_type();
                    self.subscriptions.as_ref().unwrap().lock().await[event].mutate(
                        Mutation::new(Command::Start, scope.clone()),
                        self.policies,
                        &self.subscription_context,
                    )?;
                }
                self.inner.start_notify_to_client(scope).await?;
            }
        }
        Ok(())
    }

    /// Stops sending notifications of some type to a listener.
    async fn stop_notify(&self, id: ListenerId, scope: Scope) -> RpcResult<()> {
        if self.handle_stop_notify() {
            match self.notification_mode {
                NotificationMode::MultiListeners => {
                    self.notifier.clone().unwrap().try_stop_notify(id, scope)?;
                }
                NotificationMode::Direct => {
                    if self.inner.will_reconnect() {
                        let event = scope.event_type();
                        self.subscriptions.as_ref().unwrap().lock().await[event].mutate(
                            Mutation::new(Command::Stop, scope.clone()),
                            self.policies,
                            &self.subscription_context,
                        )?;
                    }
                    self.inner.stop_notify_to_client(scope).await?;
                }
            }
            Ok(())
        } else {
            Err(RpcError::UnsupportedFeature)
        }
    }
}

// All durations below are expressed in milliseconds
pub const CONNECT_TIMEOUT_DURATION: u64 = 20_000;
pub const REQUEST_TIMEOUT_DURATION: u64 = 5_000;
pub const TIMEOUT_MONITORING_INTERVAL: u64 = 10_000;
pub const RECONNECT_INTERVAL: u64 = 2_000;

type KaspadRequestSender = async_channel::Sender<KaspadRequest>;
type KaspadRequestReceiver = async_channel::Receiver<KaspadRequest>;

#[derive(Debug, Default)]
struct ServerFeatures {
    pub handle_stop_notify: bool,
    pub handle_message_id: bool,
}

/// A struct to handle messages flowing to (requests) and from (responses) a protowire server.
/// Incoming responses are associated with pending requests based on their matching operation
/// type and, for some operations like [`KaspadPayloadOps::GetBlock`], on their properties.
///
/// Data flow:
/// ```
/// //   KaspadRequest -> request_send -> stream -> KaspadResponse
/// ```
///
/// Execution flow:
/// ```
/// // | call ---------------------------------------------------->|
/// //                                  | response_receiver_task ->|
/// ```
///
/// #### Further development
///
/// TODO:
///
/// Carry any subscribe call result up to the initial GrpcClient::start_notify execution.
/// For now, GrpcClient::start_notify only gets a result reflecting the call to
/// Notifier::try_send_dispatch, which is incomplete.
///
/// Investigate a possible bottleneck in handle_response when processing pending requests.
/// If one exists, consider a concurrent alternative.
///
/// Design/flow:
///
/// Currently, `call` blocks until response_receiver_task or the timeout task resolves the
/// pending request, so actual concurrency must happen higher up in the code.
/// Is there a better way to handle the flow?
///
#[derive(Debug)]
struct Inner {
    url: String,

    server_features: ServerFeatures,

    // Pushing incoming notifications forward
    notification_channel: NotificationChannel,

    // Sending to server
    request_sender: KaspadRequestSender,
    request_receiver: KaspadRequestReceiver,

    // Receiving from server
    receiver_is_running: AtomicBool,
    receiver_shutdown: DuplexTrigger,

    // Matching responses with pending requests
    resolver: DynResolver,

    // Pending timeout cleaning task
    timeout_is_running: AtomicBool,
    timeout_shutdown: DuplexTrigger,
    timeout_timer_interval: u64,
    timeout_duration: u64,

    // Connection monitor allowing to reconnect automatically to the server
    connector_is_running: AtomicBool,
    connector_shutdown: DuplexTrigger,
    connector_timer_interval: u64,

    // Connection event channel
    connection_event_sender: Option<Sender<ConnectionEvent>>,

    // Temporary hack to override the handle_stop_notify flag
    override_handle_stop_notify: bool,

    // Bandwidth counters
    counters: Arc<TowerConnectionCounters>,
}

impl Inner {
    fn new(
        url: String,
        server_features: ServerFeatures,
        request_sender: KaspadRequestSender,
        request_receiver: KaspadRequestReceiver,
        connection_event_sender: Option<Sender<ConnectionEvent>>,
        override_handle_stop_notify: bool,
        timeout_duration: u64,
        counters: Arc<TowerConnectionCounters>,
    ) -> Self {
        let resolver: DynResolver = match server_features.handle_message_id {
            true => Arc::new(IdResolver::new()),
            false => Arc::new(QueueResolver::new()),
        };
        let notification_channel = Channel::default();
        Self {
            url,
            server_features,
            notification_channel,
            request_sender,
            request_receiver,
            resolver,
            receiver_is_running: AtomicBool::new(false),
            receiver_shutdown: DuplexTrigger::new(),
            timeout_is_running: AtomicBool::new(false),
            timeout_shutdown: DuplexTrigger::new(),
            timeout_duration,
            timeout_timer_interval: TIMEOUT_MONITORING_INTERVAL,
            connector_is_running: AtomicBool::new(false),
            connector_shutdown: DuplexTrigger::new(),
            connector_timer_interval: RECONNECT_INTERVAL,
            connection_event_sender,
            override_handle_stop_notify,
            counters,
        }
    }

    // TODO - remove the override (discuss how to handle this in relation to the golang client)
    async fn connect(
        url: String,
        connection_event_sender: Option<Sender<ConnectionEvent>>,
        override_handle_stop_notify: bool,
        timeout_duration: u64,
        counters: Arc<TowerConnectionCounters>,
    ) -> Result<Arc<Self>> {
        // Request channel
        let (request_sender, request_receiver) = async_channel::unbounded();

        // Try to connect to the server
        let (stream, server_features) =
            Inner::try_connect(url.clone(), request_sender.clone(), request_receiver.clone(), timeout_duration, counters.clone())
                .await?;

        // Create the inner object
        let inner = Arc::new(Inner::new(
            url,
            server_features,
            request_sender,
            request_receiver,
            connection_event_sender,
            override_handle_stop_notify,
            timeout_duration,
            counters,
        ));

        // Start the request timeout cleaner
        inner.clone().spawn_request_timeout_monitor();

        // Start the response receiving task
        inner.clone().spawn_response_receiver_task(stream);

        trace!("GRPC client: connected");
        Ok(inner)
    }

    #[allow(unused_variables)]
    async fn try_connect(
        url: String,
        request_sender: KaspadRequestSender,
        request_receiver: KaspadRequestReceiver,
        request_timeout: u64,
        counters: Arc<TowerConnectionCounters>,
    ) -> Result<(Streaming<KaspadResponse>, ServerFeatures)> {
        // gRPC endpoint
        #[cfg(not(feature = "heap"))]
        let channel =
            tonic::transport::Channel::builder(url.parse::<tonic::transport::Uri>().map_err(|e| Error::String(e.to_string()))?)
                .timeout(tokio::time::Duration::from_millis(request_timeout))
                .connect_timeout(tokio::time::Duration::from_millis(CONNECT_TIMEOUT_DURATION))
                .connect()
                .await?;

        #[cfg(feature = "heap")]
        let channel =
            tonic::transport::Channel::builder(url.parse::<tonic::transport::Uri>().map_err(|e| Error::String(e.to_string()))?)
                .connect()
                .await?;

        let bytes_rx = &counters.bytes_rx;
        let bytes_tx = &counters.bytes_tx;
        let channel = ServiceBuilder::new()
            .layer(MapResponseBodyLayer::new(move |body| CountBytesBody::new(body, bytes_rx.clone())))
            .layer(measure_request_body_size_layer(bytes_tx.clone(), |body| {
                body.map_err(|e| tonic::Status::from_error(Box::new(e))).boxed_unsync()
            }))
            .service(channel);

        // Build the gRPC client with an interceptor setting the request timeout
        #[cfg(not(feature = "heap"))]
        let request_timeout = tokio::time::Duration::from_millis(request_timeout);
        #[cfg(not(feature = "heap"))]
        let mut client = RpcClient::with_interceptor(channel, move |mut req: tonic::Request<()>| {
            req.set_timeout(request_timeout);
            Ok(req)
        });

        #[cfg(feature = "heap")]
        let mut client = RpcClient::new(channel);

        client = client
            .send_compressed(CompressionEncoding::Gzip)
            .accept_compressed(CompressionEncoding::Gzip)
            .max_decoding_message_size(RPC_MAX_MESSAGE_SIZE);

        // Prepare a request receiver stream
        let stream_receiver = request_receiver.clone();
        let request_stream = async_stream::stream! {
            while let Ok(item) = stream_receiver.recv().await {
                yield item;
            }
        };

        // Actual KaspadRequest to KaspadResponse stream
        let mut stream: Streaming<KaspadResponse> = client.message_stream(request_stream).await?.into_inner();

        // Collect server capabilities as stated in GetInfoResponse
        let mut server_features = ServerFeatures::default();
        request_sender.send(GetInfoRequestMessage {}.into()).await?;
        match stream.message().await? {
            Some(ref msg) => {
                trace!("GRPC client: try_connect - GetInfo got a response");
                let response: RpcResult<GetInfoResponse> = msg.try_into();
                if let Ok(response) = response {
                    server_features.handle_stop_notify = response.has_notify_command;
                    server_features.handle_message_id = response.has_message_id;
                }
            }
            None => {
                debug!("GRPC client: try_connect - stream closed by the server");
                return Err(Error::String("GRPC stream was closed by the server".to_string()));
            }
        }

        Ok((stream, server_features))
    }

    async fn reconnect(
        self: Arc<Self>,
        notifier: Option<Arc<GrpcClientNotifier>>,
        subscriptions: Option<Arc<DirectSubscriptions>>,
        subscription_context: &SubscriptionContext,
    ) -> RpcResult<()> {
        assert_ne!(
            notifier.is_some(),
            subscriptions.is_some(),
            "exclusively either a notifier in MultiListener mode or subscriptions in Direct mode"
        );
        // TODO: verify if server features have changed since the first connection

        // Try to connect to the server
        let (stream, _) = Inner::try_connect(
            self.url.clone(),
            self.request_sender.clone(),
            self.request_receiver.clone(),
            self.timeout_duration,
            self.counters.clone(),
        )
        .await?;

        // Start the response receiving task
        self.clone().spawn_response_receiver_task(stream);

        // Re-register the compounded subscription state of the notifier in MultiListener mode
        if let Some(notifier) = notifier.as_ref() {
            notifier.try_renew_subscriptions()?;
        }

        // Re-register the subscription state in Direct mode
        if let Some(subscriptions) = subscriptions.as_ref() {
            let subscriptions = subscriptions.lock().await;
            for event in EVENT_TYPE_ARRAY {
                if subscriptions[event].active() {
                    self.clone().start_notify_to_client(subscriptions[event].scope(subscription_context)).await?;
                }
            }
        }

        debug!("GRPC client: reconnected");
        Ok(())
    }

    pub fn notification_channel_receiver(&self) -> Receiver<Notification> {
        self.notification_channel.receiver()
    }

    fn send_connection_event(&self, event: ConnectionEvent) {
        if let Some(ref connection_event_sender) = self.connection_event_sender {
            if let Err(err) = connection_event_sender.try_send(event) {
                debug!("Send connection event error: {err}");
            }
        }
    }

    fn is_connected(&self) -> bool {
        self.receiver_is_running.load(Ordering::SeqCst)
    }

    fn will_reconnect(&self) -> bool {
        self.connector_is_running.load(Ordering::SeqCst)
    }

    #[inline(always)]
    fn handle_message_id(&self) -> bool {
        self.server_features.handle_message_id
    }

    #[inline(always)]
    fn handle_stop_notify(&self) -> bool {
        // TODO - remove this
        if self.override_handle_stop_notify {
            true
        } else {
            self.server_features.handle_stop_notify
        }
    }

    #[inline(always)]
    fn resolver(&self) -> DynResolver {
        self.resolver.clone()
    }

    async fn call(&self, op: KaspadPayloadOps, request: impl Into<KaspadRequest>) -> Result<KaspadResponse> {
        // Calls are only allowed if the client is connected to the server
        if self.is_connected() {
            let id = u64::from_le_bytes(rand::random::<[u8; 8]>());
            let mut request: KaspadRequest = request.into();
            request.id = id;

            trace!("GRPC client: resolver call: {:?}", request);
            if request.payload.is_some() {
                let receiver = self.resolver().register_request(op, &request);
                self.request_sender.send(request).await.map_err(|_| Error::ChannelRecvError)?;
                receiver.await?
            } else {
                Err(Error::MissingRequestPayload)
            }
        } else {
            Err(Error::NotConnected)
        }
    }

    /// Launch a task that periodically checks pending requests and deletes those that have
    /// waited longer than a predefined delay.
    fn spawn_request_timeout_monitor(self: Arc<Self>) {
        // Note: self is a cloned Arc here so that it can be used in the spawned task.

        // The task can only be spawned once
        if self.timeout_is_running.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
            trace!("GRPC client: timeout task - spawn request ignored since already spawned");
            return;
        }

        tokio::spawn(async move {
            trace!("GRPC client: timeout task - started");
            let shutdown = self.timeout_shutdown.request.listener.clone().fuse();
            pin_mut!(shutdown);

            loop {
                let timeout_timer_interval = Duration::from_millis(self.timeout_timer_interval);
                let delay = tokio::time::sleep(timeout_timer_interval).fuse();
                pin_mut!(delay);

                select! {
                    _ = shutdown => { break; },
                    _ = delay => {
                        trace!("GRPC client: timeout task - running");
                        let timeout = Duration::from_millis(self.timeout_duration);
                        self.resolver().remove_expired_requests(timeout);
                    },
                }
            }
            self.timeout_is_running.store(false, Ordering::SeqCst);
            self.timeout_shutdown.response.trigger.trigger();

            trace!("GRPC client: timeout task - terminated");
        });
    }

    /// Launch a task receiving and handling response messages sent by the server.
    fn spawn_response_receiver_task(self: Arc<Self>, mut stream: Streaming<KaspadResponse>) {
        // Note: self is a cloned Arc here so that it can be used in the spawned task.

        // The task can only be spawned once
        if self.receiver_is_running.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
            trace!("GRPC client: response receiver task - spawn ignored since already spawned");
            return;
        }

        // Send connection event
        self.send_connection_event(ConnectionEvent::Connected);

        tokio::spawn(async move {
            trace!("GRPC client: response receiver task - started");
            loop {
                let shutdown = self.receiver_shutdown.request.listener.clone();
                pin_mut!(shutdown);

                tokio::select! {
                    biased;

                    _ = shutdown => {
                        break;
                    }

                    message = stream.message() => {
                        match message {
                            Ok(msg) => {
                                match msg {
                                    Some(response) => {
                                        self.handle_response(response);
                                    },
                                    None => {
                                        debug!("GRPC client: response receiver task - the connection to the server is closed");

                                        // A reconnection is needed
                                        break;
                                    }
                                }
                            },
                            Err(err) => {
                                debug!("GRPC client: response receiver task - the response receiver gets an error from the server: {:?}", err);

                                // TODO: ignore cases not requiring a reconnection

                                // A reconnection is needed
                                break;
                            }
                        }
                    }
                }
            }
            // Mark as not connected
            self.receiver_is_running.store(false, Ordering::SeqCst);
            self.send_connection_event(ConnectionEvent::Disconnected);

            // Close the notification channel so that notifiers/collectors/subscribers can be joined on
            if !self.will_reconnect() {
                self.notification_channel.close();
            }

            if self.receiver_shutdown.request.listener.is_triggered() {
                self.receiver_shutdown.response.trigger.trigger();
            }

            trace!("GRPC client: response receiver task - terminated");
        });
    }

    /// Launch a task that periodically checks whether the connection to the server is alive
    /// and, if not, tries to reconnect.
    fn spawn_connection_monitor(
        self: Arc<Self>,
        notifier: Option<Arc<GrpcClientNotifier>>,
        subscriptions: Option<Arc<DirectSubscriptions>>,
        subscription_context: SubscriptionContext,
    ) {
        // Note: self is a cloned Arc here so that it can be used in the spawned task.

        // The task can only be spawned once
        if self.connector_is_running.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
            trace!("GRPC client: connection monitor task - spawn ignored since already spawned");
            return;
        }

        tokio::spawn(async move {
            trace!("GRPC client: connection monitor task - started");
            let shutdown = self.connector_shutdown.request.listener.clone().fuse();
            pin_mut!(shutdown);
            loop {
                let connector_timer_interval = Duration::from_millis(self.connector_timer_interval);
                let delay = tokio::time::sleep(connector_timer_interval).fuse();
                pin_mut!(delay);
                select! {
                    _ = shutdown => { break; },
                    _ = delay => {
                        trace!("GRPC client: connection monitor task - running");
                        if !self.is_connected() {
                            match self.clone().reconnect(notifier.clone(), subscriptions.clone(), &subscription_context).await {
                                Ok(_) => {
                                    trace!("GRPC client: reconnection to server succeeded");
                                },
                                Err(err) => {
                                    trace!("GRPC client: reconnection to server failed with error {err:?}");
                                }
                            }
                        }
                    },
                }
            }
            self.connector_is_running.store(false, Ordering::SeqCst);
            self.connector_shutdown.response.trigger.trigger();
            trace!("GRPC client: connection monitor task - terminated");
        });
    }

    fn handle_response(&self, response: KaspadResponse) {
        if response.is_notification() {
            trace!("GRPC client: handle_response received a notification");
            match Notification::try_from(&response) {
                Ok(notification) => {
                    let event: EventType = (&notification).into();
                    trace!("GRPC client: handle_response received notification: {:?}", event);

                    // Any send error is logged but deliberately not propagated
                    match self.notification_channel.try_send(notification) {
                        Ok(_) => {}
                        Err(err) => {
                            error!("GRPC client: error while trying to send a notification to the notifier: {:?}", err);
                        }
                    }
                }
                Err(err) => {
                    error!("GRPC client: handle_response error converting response into notification: {:?}", err);
                }
            }
        } else if response.payload.is_some() {
            self.resolver().handle_response(response);
        }
    }

    async fn disconnect(&self) -> Result<()> {
        self.stop_connector_monitor().await?;
        self.stop_timeout_monitor().await?;
        self.stop_response_receiver_task().await?;
        self.request_receiver.close();
        trace!("GRPC client: disconnected");
        Ok(())
    }

    async fn stop_response_receiver_task(&self) -> Result<()> {
        if self.receiver_is_running.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
            self.receiver_shutdown.request.trigger.trigger();
            self.receiver_shutdown.response.listener.clone().await;
        }
        Ok(())
    }

    async fn stop_timeout_monitor(&self) -> Result<()> {
        if self.timeout_is_running.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
            self.timeout_shutdown.request.trigger.trigger();
            self.timeout_shutdown.response.listener.clone().await;
        }
        Ok(())
    }

    async fn stop_connector_monitor(&self) -> Result<()> {
        if self.connector_is_running.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
            self.connector_shutdown.request.trigger.trigger();
            self.connector_shutdown.response.listener.clone().await;
        }
        Ok(())
    }

    /// Starts sending notifications of some type to the client.
    async fn start_notify_to_client(&self, scope: Scope) -> RpcResult<()> {
        let request = kaspad_request::Payload::from_notification_type(&scope, Command::Start);
        self.call((&request).into(), request).await?;
        Ok(())
    }

    /// Stops sending notifications of some type to the client.
    async fn stop_notify_to_client(&self, scope: Scope) -> RpcResult<()> {
        if self.handle_stop_notify() {
            let request = kaspad_request::Payload::from_notification_type(&scope, Command::Stop);
            self.call((&request).into(), request).await?;
        }
        Ok(())
    }
}

#[async_trait]
impl SubscriptionManager for Inner {
    async fn start_notify(&self, _: ListenerId, scope: Scope) -> NotifyResult<()> {
        trace!("GRPC client: start_notify: {:?}", scope);
        self.start_notify_to_client(scope).await.map_err(|err| NotifyError::General(err.to_string()))?;
        Ok(())
    }

    async fn stop_notify(&self, _: ListenerId, scope: Scope) -> NotifyResult<()> {
        if self.handle_stop_notify() {
            trace!("GRPC client: stop_notify: {:?}", scope);
            self.stop_notify_to_client(scope).await.map_err(|err| NotifyError::General(err.to_string()))?;
        } else {
            trace!("GRPC client: stop_notify ignored because not supported by the server: {:?}", scope);
        }
        Ok(())
    }
}