tendermint_rpc/client/transport/
websocket.rs

1//! WebSocket-based clients for accessing Tendermint RPC functionality.
2
3use alloc::{borrow::Cow, collections::BTreeMap as HashMap, fmt};
4use core::{ops::Add, str::FromStr};
5
6use async_trait::async_trait;
7use async_tungstenite::{
8    tokio::ConnectStream,
9    tungstenite::{
10        protocol::{frame::coding::CloseCode, CloseFrame},
11        Message,
12    },
13    WebSocketStream,
14};
15use futures::{SinkExt, StreamExt};
16use serde::{Deserialize, Serialize};
17use tokio::time::{Duration, Instant};
18use tracing::{debug, error};
19
20use tendermint::{block::Height, Hash};
21use tendermint_config::net;
22
23use super::router::{SubscriptionId, SubscriptionIdRef};
24use crate::{
25    client::{
26        subscription::SubscriptionTx,
27        sync::{ChannelRx, ChannelTx},
28        transport::router::{PublishResult, SubscriptionRouter},
29        Client, CompatMode,
30    },
31    dialect::{v0_34, Dialect, LatestDialect},
32    endpoint::{self, subscribe, unsubscribe},
33    error::Error,
34    event::{self, Event},
35    prelude::*,
36    query::Query,
37    request::Wrapper,
38    response, Id, Order, Request, Response, Scheme, SimpleRequest, Subscription,
39    SubscriptionClient, Url,
40};
41
// WebSocket connection times out if we haven't heard anything at all from the
// server in this long (expressed in whole seconds).
//
// Taken from https://github.com/tendermint/tendermint/blob/309e29c245a01825fc9630103311fd04de99fa5e/rpc/jsonrpc/server/ws_handler.go#L27
const RECV_TIMEOUT_SECONDS: u64 = 30;

// The receive timeout above, as a `Duration`.
const RECV_TIMEOUT: Duration = Duration::from_secs(RECV_TIMEOUT_SECONDS);

// How frequently to send ping messages to the WebSocket server: nine tenths
// of the receive timeout, i.e. 27 seconds for a 30-second timeout.
//
// Taken from https://github.com/tendermint/tendermint/blob/309e29c245a01825fc9630103311fd04de99fa5e/rpc/jsonrpc/server/ws_handler.go#L28
const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / 10);
54
55/// Low-level WebSocket configuration
56pub use async_tungstenite::tungstenite::protocol::WebSocketConfig;
57
58/// Tendermint RPC client that provides access to all RPC functionality
59/// (including [`Event`] subscription) over a WebSocket connection.
60///
/// The `WebSocketClient` itself is effectively just a handle to its driver.
62/// The driver is the component of the client that actually interacts with the
63/// remote RPC over the WebSocket connection. The `WebSocketClient` can
64/// therefore be cloned into different asynchronous contexts, effectively
65/// allowing for asynchronous access to the driver.
66///
67/// It is the caller's responsibility to spawn an asynchronous task in which to
68/// execute the [`WebSocketClientDriver::run`] method. See the example below.
69///
70/// Dropping [`Subscription`]s will automatically terminate them (the
71/// `WebSocketClientDriver` detects a disconnected channel and removes the
72/// subscription from its internal routing table). When all subscriptions to a
73/// particular query have disconnected, the driver will automatically issue an
74/// unsubscribe request to the remote RPC endpoint.
75///
76/// ### Timeouts
77///
78/// The WebSocket client connection times out after 30 seconds if it does not
79/// receive anything at all from the server. This will automatically return
80/// errors to all active subscriptions and terminate them.
81///
82/// This is not configurable at present.
83///
84/// ### Keep-Alive
85///
86/// The WebSocket client implements a keep-alive mechanism whereby it sends a
87/// PING message to the server every 27 seconds, matching the PING cadence of
88/// the Tendermint server (see [this code][tendermint-websocket-ping] for
89/// details).
90///
91/// This is not configurable at present.
92///
93/// ## Examples
94///
95/// ```rust,ignore
96/// use tendermint::abci::Transaction;
97/// use tendermint_rpc::{WebSocketClient, SubscriptionClient, Client};
98/// use tendermint_rpc::query::EventType;
99/// use futures::StreamExt;
100///
101/// #[tokio::main]
102/// async fn main() {
103///     let (client, driver) = WebSocketClient::new("ws://127.0.0.1:26657/websocket")
104///         .await
105///         .unwrap();
106///     let driver_handle = tokio::spawn(async move { driver.run().await });
107///
108///     // Standard client functionality
109///     let tx = format!("some-key=some-value");
110///     client.broadcast_tx_async(Transaction::from(tx.into_bytes())).await.unwrap();
111///
112///     // Subscription functionality
113///     let mut subs = client.subscribe(EventType::NewBlock.into())
114///         .await
115///         .unwrap();
116///
117///     // Grab 5 NewBlock events
118///     let mut ev_count = 5_i32;
119///
120///     while let Some(res) = subs.next().await {
121///         let ev = res.unwrap();
122///         println!("Got event: {:?}", ev);
123///         ev_count -= 1;
///         if ev_count <= 0 {
125///             break;
126///         }
127///     }
128///
129///     // Signal to the driver to terminate.
130///     client.close().unwrap();
131///     // Await the driver's termination to ensure proper connection closure.
132///     let _ = driver_handle.await.unwrap();
133/// }
134/// ```
135///
136/// [tendermint-websocket-ping]: https://github.com/tendermint/tendermint/blob/309e29c245a01825fc9630103311fd04de99fa5e/rpc/jsonrpc/server/ws_handler.go#L28
#[derive(Debug, Clone)]
pub struct WebSocketClient {
    // Type-erased (secure or unsecure) handle through which requests and
    // subscription commands are delegated.
    inner: sealed::WebSocketClient,
    // Compatibility mode chosen when the client was built; consulted when
    // selecting the dialect for a request.
    compat: CompatMode,
}
142
/// The builder pattern constructor for [`WebSocketClient`].
///
/// Obtain one through [`WebSocketClient::builder`], adjust it with
/// [`Builder::compat_mode`] and/or [`Builder::config`], then call
/// [`Builder::build`] to open the connection.
pub struct Builder {
    // Endpoint to connect to; already validated as a `ws://`/`wss://` URL.
    url: WebSocketClientUrl,
    // Protocol compatibility mode handed to both the client and its driver.
    compat: CompatMode,
    // Optional low-level WebSocket settings; `None` means none were supplied.
    transport_config: Option<WebSocketConfig>,
}
149
150impl Builder {
151    /// Use the specified compatibility mode for the Tendermint RPC protocol.
152    ///
153    /// The default is the latest protocol version supported by this crate.
154    pub fn compat_mode(mut self, mode: CompatMode) -> Self {
155        self.compat = mode;
156        self
157    }
158
159    /// Use the specified low-level WebSocket configuration options.
160    pub fn config(mut self, config: WebSocketConfig) -> Self {
161        self.transport_config = Some(config);
162        self
163    }
164
165    /// Try to create a client with the options specified for this builder.
166    pub async fn build(self) -> Result<(WebSocketClient, WebSocketClientDriver), Error> {
167        let url = self.url.0;
168        let compat = self.compat;
169        let (inner, driver) = if url.is_secure() {
170            sealed::WebSocketClient::new_secure(url, compat, self.transport_config).await?
171        } else {
172            sealed::WebSocketClient::new_unsecure(url, compat, self.transport_config).await?
173        };
174
175        Ok((WebSocketClient { inner, compat }, driver))
176    }
177}
178
179impl WebSocketClient {
180    /// Construct a new WebSocket-based client connecting to the given
181    /// Tendermint node's RPC endpoint.
182    ///
183    /// Supports both `ws://` and `wss://` protocols.
184    pub async fn new<U>(url: U) -> Result<(Self, WebSocketClientDriver), Error>
185    where
186        U: TryInto<WebSocketClientUrl, Error = Error>,
187    {
188        let url = url.try_into()?;
189        Self::builder(url).build().await
190    }
191
192    /// Construct a new WebSocket-based client connecting to the given
193    /// Tendermint node's RPC endpoint.
194    ///
195    /// Supports both `ws://` and `wss://` protocols.
196    pub async fn new_with_config<U>(
197        url: U,
198        config: WebSocketConfig,
199    ) -> Result<(Self, WebSocketClientDriver), Error>
200    where
201        U: TryInto<WebSocketClientUrl, Error = Error>,
202    {
203        let url = url.try_into()?;
204        Self::builder(url).config(config).build().await
205    }
206
207    /// Initiate a builder for a WebSocket-based client connecting to the given
208    /// Tendermint node's RPC endpoint.
209    ///
210    /// Supports both `ws://` and `wss://` protocols.
211    pub fn builder(url: WebSocketClientUrl) -> Builder {
212        Builder {
213            url,
214            compat: Default::default(),
215            transport_config: Default::default(),
216        }
217    }
218
219    async fn perform_with_dialect<R, S>(&self, request: R, dialect: S) -> Result<R::Output, Error>
220    where
221        R: SimpleRequest<S>,
222        S: Dialect,
223    {
224        self.inner.perform(request, dialect).await
225    }
226}
227
228#[async_trait]
229impl Client for WebSocketClient {
230    async fn perform<R>(&self, request: R) -> Result<R::Output, Error>
231    where
232        R: SimpleRequest,
233    {
234        self.perform_with_dialect(request, LatestDialect).await
235    }
236
237    async fn block<H>(&self, height: H) -> Result<endpoint::block::Response, Error>
238    where
239        H: Into<Height> + Send,
240    {
241        perform_with_compat!(self, endpoint::block::Request::new(height.into()))
242    }
243
244    async fn block_by_hash(
245        &self,
246        hash: tendermint::Hash,
247    ) -> Result<endpoint::block_by_hash::Response, Error> {
248        perform_with_compat!(self, endpoint::block_by_hash::Request::new(hash))
249    }
250
251    async fn latest_block(&self) -> Result<endpoint::block::Response, Error> {
252        perform_with_compat!(self, endpoint::block::Request::default())
253    }
254
255    async fn block_results<H>(&self, height: H) -> Result<endpoint::block_results::Response, Error>
256    where
257        H: Into<Height> + Send,
258    {
259        perform_with_compat!(self, endpoint::block_results::Request::new(height.into()))
260    }
261
262    async fn latest_block_results(&self) -> Result<endpoint::block_results::Response, Error> {
263        perform_with_compat!(self, endpoint::block_results::Request::default())
264    }
265
266    async fn block_search(
267        &self,
268        query: Query,
269        page: u32,
270        per_page: u8,
271        order: Order,
272    ) -> Result<endpoint::block_search::Response, Error> {
273        perform_with_compat!(
274            self,
275            endpoint::block_search::Request::new(query, page, per_page, order)
276        )
277    }
278
279    async fn header<H>(&self, height: H) -> Result<endpoint::header::Response, Error>
280    where
281        H: Into<Height> + Send,
282    {
283        let height = height.into();
284        match self.compat {
285            CompatMode::V0_38 => self.perform(endpoint::header::Request::new(height)).await,
286            CompatMode::V0_37 => self.perform(endpoint::header::Request::new(height)).await,
287            CompatMode::V0_34 => {
288                // Back-fill with a request to /block endpoint and
289                // taking just the header from the response.
290                let resp = self
291                    .perform_with_dialect(endpoint::block::Request::new(height), v0_34::Dialect)
292                    .await?;
293                Ok(resp.into())
294            },
295        }
296    }
297
298    async fn header_by_hash(
299        &self,
300        hash: Hash,
301    ) -> Result<endpoint::header_by_hash::Response, Error> {
302        match self.compat {
303            CompatMode::V0_38 => {
304                self.perform(endpoint::header_by_hash::Request::new(hash))
305                    .await
306            },
307            CompatMode::V0_37 => {
308                self.perform(endpoint::header_by_hash::Request::new(hash))
309                    .await
310            },
311            CompatMode::V0_34 => {
312                // Back-fill with a request to /block_by_hash endpoint and
313                // taking just the header from the response.
314                let resp = self
315                    .perform_with_dialect(
316                        endpoint::block_by_hash::Request::new(hash),
317                        v0_34::Dialect,
318                    )
319                    .await?;
320                Ok(resp.into())
321            },
322        }
323    }
324
325    async fn tx(&self, hash: Hash, prove: bool) -> Result<endpoint::tx::Response, Error> {
326        perform_with_compat!(self, endpoint::tx::Request::new(hash, prove))
327    }
328
329    async fn tx_search(
330        &self,
331        query: Query,
332        prove: bool,
333        page: u32,
334        per_page: u8,
335        order: Order,
336    ) -> Result<endpoint::tx_search::Response, Error> {
337        perform_with_compat!(
338            self,
339            endpoint::tx_search::Request::new(query, prove, page, per_page, order)
340        )
341    }
342
343    async fn broadcast_tx_commit<T>(
344        &self,
345        tx: T,
346    ) -> Result<endpoint::broadcast::tx_commit::Response, Error>
347    where
348        T: Into<Vec<u8>> + Send,
349    {
350        perform_with_compat!(self, endpoint::broadcast::tx_commit::Request::new(tx))
351    }
352}
353
354#[async_trait]
355impl SubscriptionClient for WebSocketClient {
356    async fn subscribe(&self, query: Query) -> Result<Subscription, Error> {
357        self.inner.subscribe(query).await
358    }
359
360    async fn unsubscribe(&self, query: Query) -> Result<(), Error> {
361        self.inner.unsubscribe(query).await
362    }
363
364    fn close(self) -> Result<(), Error> {
365        self.inner.close()
366    }
367}
368
/// A URL limited to use with WebSocket clients.
///
/// Facilitates useful type conversions and inferences. Instances are created
/// through the `TryFrom`/`FromStr` conversions, which reject URLs whose scheme
/// is not a WebSocket one.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct WebSocketClientUrl(Url);
375
376impl TryFrom<Url> for WebSocketClientUrl {
377    type Error = Error;
378
379    fn try_from(value: Url) -> Result<Self, Error> {
380        match value.scheme() {
381            Scheme::WebSocket | Scheme::SecureWebSocket => Ok(Self(value)),
382            _ => Err(Error::invalid_params(format!(
383                "cannot use URL {value} with WebSocket clients"
384            ))),
385        }
386    }
387}
388
389impl FromStr for WebSocketClientUrl {
390    type Err = Error;
391
392    fn from_str(s: &str) -> Result<Self, Error> {
393        let url: Url = s.parse()?;
394        url.try_into()
395    }
396}
397
398impl fmt::Display for WebSocketClientUrl {
399    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
400        self.0.fmt(f)
401    }
402}
403
404impl TryFrom<&str> for WebSocketClientUrl {
405    type Error = Error;
406
407    fn try_from(value: &str) -> Result<Self, Error> {
408        value.parse()
409    }
410}
411
412impl TryFrom<net::Address> for WebSocketClientUrl {
413    type Error = Error;
414
415    fn try_from(value: net::Address) -> Result<Self, Error> {
416        match value {
417            net::Address::Tcp {
418                peer_id: _,
419                host,
420                port,
421            } => format!("ws://{host}:{port}/websocket").parse(),
422            net::Address::Unix { .. } => Err(Error::invalid_params(
423                "only TCP-based node addresses are supported".to_string(),
424            )),
425        }
426    }
427}
428
429impl From<WebSocketClientUrl> for Url {
430    fn from(url: WebSocketClientUrl) -> Self {
431        url.0
432    }
433}
434
435mod sealed {
436    use async_tungstenite::{
437        tokio::{connect_async_with_config, connect_async_with_tls_connector_and_config},
438        tungstenite::client::IntoClientRequest,
439    };
440    use tracing::debug;
441
442    use super::{
443        DriverCommand, SimpleRequestCommand, SubscribeCommand, UnsubscribeCommand,
444        WebSocketClientDriver, WebSocketConfig,
445    };
446    use crate::{
447        client::{
448            sync::{unbounded, ChannelTx},
449            transport::auth::authorize,
450            CompatMode,
451        },
452        dialect::Dialect,
453        prelude::*,
454        query::Query,
455        request::Wrapper,
456        utils::uuid_str,
457        Error, Response, SimpleRequest, Subscription, Url,
458    };
459
    /// Marker for the [`AsyncTungsteniteClient`] for clients operating over
    /// unsecure connections.
    ///
    /// Carries no data; it is only used as the client's type parameter.
    #[derive(Debug, Clone)]
    pub struct Unsecure;
464
    /// Marker for the [`AsyncTungsteniteClient`] for clients operating over
    /// secure connections.
    ///
    /// Carries no data; it is only used as the client's type parameter.
    #[derive(Debug, Clone)]
    pub struct Secure;
469
    /// An [`async-tungstenite`]-based WebSocket client.
    ///
    /// Different modes of operation (secure and unsecure) are facilitated by
    /// different variants of this type.
    ///
    /// [`async-tungstenite`]: https://crates.io/crates/async-tungstenite
    #[derive(Debug, Clone)]
    pub struct AsyncTungsteniteClient<C> {
        // Sending half of the command channel read by the driver.
        cmd_tx: ChannelTx<DriverCommand>,
        // Records the connection mode (`Unsecure`/`Secure`) at the type level
        // without storing a value of that type.
        _client_type: core::marker::PhantomData<C>,
    }
481
482    impl AsyncTungsteniteClient<Unsecure> {
483        /// Construct a WebSocket client. Immediately attempts to open a WebSocket
484        /// connection to the node with the given address.
485        ///
486        /// On success, this returns both a client handle (a `WebSocketClient`
487        /// instance) as well as the WebSocket connection driver. The execution of
488        /// this driver becomes the responsibility of the client owner, and must be
489        /// executed in a separate asynchronous context to the client to ensure it
490        /// doesn't block the client.
491        pub async fn new(
492            url: Url,
493            compat: CompatMode,
494            config: Option<WebSocketConfig>,
495        ) -> Result<(Self, WebSocketClientDriver), Error> {
496            debug!("Connecting to unsecure WebSocket endpoint: {}", url);
497
498            let (stream, _response) = connect_async_with_config(url, config)
499                .await
500                .map_err(Error::tungstenite)?;
501
502            let (cmd_tx, cmd_rx) = unbounded();
503            let driver = WebSocketClientDriver::new(stream, cmd_rx, compat);
504            let client = Self {
505                cmd_tx,
506                _client_type: Default::default(),
507            };
508
509            Ok((client, driver))
510        }
511    }
512
513    impl AsyncTungsteniteClient<Secure> {
514        /// Construct a WebSocket client. Immediately attempts to open a WebSocket
515        /// connection to the node with the given address, but over a secure
516        /// connection.
517        ///
518        /// On success, this returns both a client handle (a `WebSocketClient`
519        /// instance) as well as the WebSocket connection driver. The execution of
520        /// this driver becomes the responsibility of the client owner, and must be
521        /// executed in a separate asynchronous context to the client to ensure it
522        /// doesn't block the client.
523        pub async fn new(
524            url: Url,
525            compat: CompatMode,
526            config: Option<WebSocketConfig>,
527        ) -> Result<(Self, WebSocketClientDriver), Error> {
528            debug!("Connecting to secure WebSocket endpoint: {}", url);
529
530            // Not supplying a connector means async_tungstenite will create the
531            // connector for us.
532            let (stream, _response) =
533                connect_async_with_tls_connector_and_config(url, None, config)
534                    .await
535                    .map_err(Error::tungstenite)?;
536
537            let (cmd_tx, cmd_rx) = unbounded();
538            let driver = WebSocketClientDriver::new(stream, cmd_rx, compat);
539            let client = Self {
540                cmd_tx,
541                _client_type: Default::default(),
542            };
543
544            Ok((client, driver))
545        }
546    }
547
548    impl<C> AsyncTungsteniteClient<C> {
549        fn send_cmd(&self, cmd: DriverCommand) -> Result<(), Error> {
550            self.cmd_tx.send(cmd)
551        }
552
553        /// Signals to the driver that it must terminate.
554        pub fn close(self) -> Result<(), Error> {
555            self.send_cmd(DriverCommand::Terminate)
556        }
557    }
558
559    impl<C> AsyncTungsteniteClient<C> {
560        pub async fn perform<R, S>(&self, request: R) -> Result<R::Output, Error>
561        where
562            R: SimpleRequest<S>,
563            S: Dialect,
564        {
565            let wrapper = Wrapper::new(request);
566            let id = wrapper.id().to_string();
567            let wrapped_request = wrapper.into_json();
568
569            tracing::debug!("Outgoing request: {}", wrapped_request);
570
571            let (response_tx, mut response_rx) = unbounded();
572
573            self.send_cmd(DriverCommand::SimpleRequest(SimpleRequestCommand {
574                id,
575                wrapped_request,
576                response_tx,
577            }))?;
578
579            let response = response_rx.recv().await.ok_or_else(|| {
580                Error::client_internal("failed to hear back from WebSocket driver".to_string())
581            })??;
582
583            tracing::debug!("Incoming response: {}", response);
584
585            R::Response::from_string(response).map(Into::into)
586        }
587
588        pub async fn subscribe(&self, query: Query) -> Result<Subscription, Error> {
589            let (subscription_tx, subscription_rx) = unbounded();
590            let (response_tx, mut response_rx) = unbounded();
591            // By default we use UUIDs to differentiate subscriptions
592            let id = uuid_str();
593            self.send_cmd(DriverCommand::Subscribe(SubscribeCommand {
594                id: id.to_string(),
595                query: query.to_string(),
596                subscription_tx,
597                response_tx,
598            }))?;
599            // Make sure our subscription request went through successfully.
600            response_rx.recv().await.ok_or_else(|| {
601                Error::client_internal("failed to hear back from WebSocket driver".to_string())
602            })??;
603            Ok(Subscription::new(id, query, subscription_rx))
604        }
605
606        pub async fn unsubscribe(&self, query: Query) -> Result<(), Error> {
607            let (response_tx, mut response_rx) = unbounded();
608            self.send_cmd(DriverCommand::Unsubscribe(UnsubscribeCommand {
609                query: query.to_string(),
610                response_tx,
611            }))?;
612            response_rx.recv().await.ok_or_else(|| {
613                Error::client_internal("failed to hear back from WebSocket driver".to_string())
614            })??;
615            Ok(())
616        }
617    }
618
    /// Allows us to erase the type signatures associated with the different
    /// WebSocket client variants.
    ///
    /// Every method simply forwards to whichever variant is held.
    #[derive(Debug, Clone)]
    pub enum WebSocketClient {
        // Client created by `new_unsecure`.
        Unsecure(AsyncTungsteniteClient<Unsecure>),
        // Client created by `new_secure`.
        Secure(AsyncTungsteniteClient<Secure>),
    }
626
627    impl WebSocketClient {
628        pub async fn new_unsecure(
629            url: Url,
630            compat: CompatMode,
631            config: Option<WebSocketConfig>,
632        ) -> Result<(Self, WebSocketClientDriver), Error> {
633            let (client, driver) =
634                AsyncTungsteniteClient::<Unsecure>::new(url, compat, config).await?;
635            Ok((Self::Unsecure(client), driver))
636        }
637
638        pub async fn new_secure(
639            url: Url,
640            compat: CompatMode,
641            config: Option<WebSocketConfig>,
642        ) -> Result<(Self, WebSocketClientDriver), Error> {
643            let (client, driver) =
644                AsyncTungsteniteClient::<Secure>::new(url, compat, config).await?;
645            Ok((Self::Secure(client), driver))
646        }
647
648        pub fn close(self) -> Result<(), Error> {
649            match self {
650                WebSocketClient::Unsecure(c) => c.close(),
651                WebSocketClient::Secure(c) => c.close(),
652            }
653        }
654    }
655
656    impl WebSocketClient {
657        pub async fn perform<R, S>(&self, request: R, _dialect: S) -> Result<R::Output, Error>
658        where
659            R: SimpleRequest<S>,
660            S: Dialect,
661        {
662            match self {
663                WebSocketClient::Unsecure(c) => c.perform(request).await,
664                WebSocketClient::Secure(c) => c.perform(request).await,
665            }
666        }
667
668        pub async fn subscribe(&self, query: Query) -> Result<Subscription, Error> {
669            match self {
670                WebSocketClient::Unsecure(c) => c.subscribe(query).await,
671                WebSocketClient::Secure(c) => c.subscribe(query).await,
672            }
673        }
674
675        pub async fn unsubscribe(&self, query: Query) -> Result<(), Error> {
676            match self {
677                WebSocketClient::Unsecure(c) => c.unsubscribe(query).await,
678                WebSocketClient::Secure(c) => c.unsubscribe(query).await,
679            }
680        }
681    }
682
683    use async_tungstenite::tungstenite;
684
685    impl IntoClientRequest for Url {
686        fn into_client_request(
687            self,
688        ) -> tungstenite::Result<tungstenite::handshake::client::Request> {
689            let builder = tungstenite::handshake::client::Request::builder()
690                .method("GET")
691                .header("Host", self.host())
692                .header("Connection", "Upgrade")
693                .header("Upgrade", "websocket")
694                .header("Sec-WebSocket-Version", "13")
695                .header(
696                    "Sec-WebSocket-Key",
697                    tungstenite::handshake::client::generate_key(),
698                );
699
700            let builder = if let Some(auth) = authorize(self.as_ref()) {
701                builder.header("Authorization", auth.to_string())
702            } else {
703                builder
704            };
705
706            builder
707                .uri(self.to_string())
708                .body(())
709                .map_err(tungstenite::error::Error::HttpFormat)
710        }
711    }
712}
713
// The different types of commands that can be sent from the WebSocketClient to
// the driver.
#[derive(Debug, Clone)]
enum DriverCommand {
    // Initiate a subscription request.
    Subscribe(SubscribeCommand),
    // Initiate an unsubscribe request.
    Unsubscribe(UnsubscribeCommand),
    // For non-subscription-related requests.
    SimpleRequest(SimpleRequestCommand),
    // Ask the driver to close the connection and stop running.
    Terminate,
}
726
// Payload of `DriverCommand::Subscribe`.
#[derive(Debug, Clone)]
struct SubscribeCommand {
    // The desired ID for the outgoing JSON-RPC request.
    id: String,
    // The query for which we want to receive events.
    query: String,
    // Where to send subscription events.
    subscription_tx: SubscriptionTx,
    // Where to send the result of the subscription request.
    response_tx: ChannelTx<Result<(), Error>>,
}
738
// Payload of `DriverCommand::Unsubscribe`.
#[derive(Debug, Clone)]
struct UnsubscribeCommand {
    // The query from which to unsubscribe.
    query: String,
    // Where to send the result of the unsubscribe request.
    response_tx: ChannelTx<Result<(), Error>>,
}
746
// Payload of `DriverCommand::SimpleRequest`.
#[derive(Debug, Clone)]
struct SimpleRequestCommand {
    // The desired ID for the outgoing JSON-RPC request. Technically we
    // could extract this from the wrapped request, but that would mean
    // additional unnecessary computational resources for deserialization.
    id: String,
    // The wrapped and serialized JSON-RPC request.
    wrapped_request: String,
    // Where to send the result of the simple request: the raw response text
    // on success, or the error that occurred.
    response_tx: ChannelTx<Result<String, Error>>,
}
758
// A response wrapping an arbitrary JSON value, with no endpoint-specific
// structure imposed on it.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct GenericJsonResponse(serde_json::Value);

// Relies entirely on the `Response` trait's provided methods.
impl Response for GenericJsonResponse {}
763
/// Drives the WebSocket connection for a `WebSocketClient` instance.
///
/// This is the primary component responsible for transport-level interaction
/// with the remote WebSocket endpoint. It does nothing until its `run` method
/// is executed.
pub struct WebSocketClientDriver {
    // The underlying WebSocket network connection.
    stream: WebSocketStream<ConnectStream>,
    // Facilitates routing of events to their respective subscriptions.
    router: SubscriptionRouter,
    // How we receive incoming commands from the WebSocketClient.
    cmd_rx: ChannelRx<DriverCommand>,
    // Commands we've received but have not yet completed, indexed by their ID.
    // A Terminate command is executed immediately, so it is never stored here.
    pending_commands: HashMap<SubscriptionId, DriverCommand>,
    // The compatibility mode directing how to parse subscription events.
    compat: CompatMode,
}
781
782impl WebSocketClientDriver {
783    fn new(
784        stream: WebSocketStream<ConnectStream>,
785        cmd_rx: ChannelRx<DriverCommand>,
786        compat: CompatMode,
787    ) -> Self {
788        Self {
789            stream,
790            router: SubscriptionRouter::default(),
791            cmd_rx,
792            pending_commands: HashMap::new(),
793            compat,
794        }
795    }
796
797    async fn send_msg(&mut self, msg: Message) -> Result<(), Error> {
798        self.stream.send(msg).await.map_err(|e| {
799            Error::web_socket("failed to write to WebSocket connection".to_string(), e)
800        })
801    }
802
803    async fn simple_request(&mut self, cmd: SimpleRequestCommand) -> Result<(), Error> {
804        if let Err(e) = self
805            .send_msg(Message::Text(cmd.wrapped_request.clone()))
806            .await
807        {
808            cmd.response_tx.send(Err(e.clone()))?;
809            return Err(e);
810        }
811        self.pending_commands
812            .insert(cmd.id.clone(), DriverCommand::SimpleRequest(cmd));
813        Ok(())
814    }
815
    /// Executes the WebSocket driver, which manages the underlying WebSocket
    /// transport.
    ///
    /// Each loop iteration waits for whichever happens first: an incoming
    /// WebSocket message, a command from a client handle, a ping tick, or
    /// the receive timeout.
    ///
    /// # Errors
    ///
    /// Returns an error when reading from the connection fails, when
    /// handling a message or command fails, when a ping fails, or when
    /// nothing has been received for `RECV_TIMEOUT`. A `Terminate` command
    /// ends the loop with the result of closing the connection.
    pub async fn run(mut self) -> Result<(), Error> {
        // The first tick is scheduled one full interval from now.
        let mut ping_interval =
            tokio::time::interval_at(Instant::now().add(PING_INTERVAL), PING_INTERVAL);

        // Pinned so the same timer can be polled and reset across iterations.
        let recv_timeout = tokio::time::sleep(RECV_TIMEOUT);
        tokio::pin!(recv_timeout);

        loop {
            tokio::select! {
                Some(res) = self.stream.next() => match res {
                    Ok(msg) => {
                        // Reset the receive timeout every time we successfully
                        // receive a message from the remote endpoint.
                        recv_timeout.as_mut().reset(Instant::now().add(RECV_TIMEOUT));
                        self.handle_incoming_msg(msg).await?
                    },
                    Err(e) => return Err(
                        Error::web_socket(
                            "failed to read from WebSocket connection".to_string(),
                            e
                        ),
                    ),
                },
                Some(cmd) = self.cmd_rx.recv() => match cmd {
                    DriverCommand::Subscribe(subs_cmd) => self.subscribe(subs_cmd).await?,
                    DriverCommand::Unsubscribe(unsubs_cmd) => self.unsubscribe(unsubs_cmd).await?,
                    DriverCommand::SimpleRequest(req_cmd) => self.simple_request(req_cmd).await?,
                    DriverCommand::Terminate => return self.close().await,
                },
                _ = ping_interval.tick() => self.ping().await?,
                _ = &mut recv_timeout => {
                    return Err(Error::web_socket_timeout(RECV_TIMEOUT));
                }
            }
        }
    }
854
855    async fn send_request<R>(&mut self, wrapper: Wrapper<R>) -> Result<(), Error>
856    where
857        R: Request,
858    {
859        self.send_msg(Message::Text(
860            serde_json::to_string_pretty(&wrapper).unwrap(),
861        ))
862        .await
863    }
864
865    async fn subscribe(&mut self, cmd: SubscribeCommand) -> Result<(), Error> {
866        // If we already have an active subscription for the given query,
867        // there's no need to initiate another one. Just add this subscription
868        // to the router.
869        if self.router.num_subscriptions_for_query(cmd.query.clone()) > 0 {
870            let (id, query, subscription_tx, response_tx) =
871                (cmd.id, cmd.query, cmd.subscription_tx, cmd.response_tx);
872            self.router.add(id, query, subscription_tx);
873            return response_tx.send(Ok(()));
874        }
875
876        // Otherwise, we need to initiate a subscription request.
877        let wrapper = Wrapper::new_with_id(
878            Id::Str(cmd.id.clone()),
879            subscribe::Request::new(cmd.query.clone()),
880        );
881        if let Err(e) = self.send_request(wrapper).await {
882            cmd.response_tx.send(Err(e.clone()))?;
883            return Err(e);
884        }
885        self.pending_commands
886            .insert(cmd.id.clone(), DriverCommand::Subscribe(cmd));
887        Ok(())
888    }
889
890    async fn unsubscribe(&mut self, cmd: UnsubscribeCommand) -> Result<(), Error> {
891        // Terminate all subscriptions for this query immediately. This
892        // prioritizes acknowledgement of the caller's wishes over networking
893        // problems.
894        if self.router.remove_by_query(cmd.query.clone()) == 0 {
895            // If there were no subscriptions for this query, respond
896            // immediately.
897            cmd.response_tx.send(Ok(()))?;
898            return Ok(());
899        }
900
901        // Unsubscribe requests can (and probably should) have distinct
902        // JSON-RPC IDs as compared to their subscription IDs.
903        let wrapper = Wrapper::new(unsubscribe::Request::new(cmd.query.clone()));
904        let req_id = wrapper.id().clone();
905        if let Err(e) = self.send_request(wrapper).await {
906            cmd.response_tx.send(Err(e.clone()))?;
907            return Err(e);
908        }
909        self.pending_commands
910            .insert(req_id.to_string(), DriverCommand::Unsubscribe(cmd));
911        Ok(())
912    }
913
914    async fn handle_incoming_msg(&mut self, msg: Message) -> Result<(), Error> {
915        match msg {
916            Message::Text(s) => self.handle_text_msg(s).await,
917            Message::Ping(v) => self.pong(v).await,
918            _ => Ok(()),
919        }
920    }
921
922    async fn handle_text_msg(&mut self, msg: String) -> Result<(), Error> {
923        let parse_res = match self.compat {
924            CompatMode::V0_38 => event::v0_38::DeEvent::from_string(&msg).map(Into::into),
925            CompatMode::V0_37 => event::v0_37::DeEvent::from_string(&msg).map(Into::into),
926            CompatMode::V0_34 => event::v0_34::DeEvent::from_string(&msg).map(Into::into),
927        };
928
929        if let Ok(ev) = parse_res {
930            debug!("JSON-RPC event: {}", msg);
931            self.publish_event(ev).await;
932            return Ok(());
933        }
934
935        let wrapper: response::Wrapper<GenericJsonResponse> = match serde_json::from_str(&msg) {
936            Ok(w) => w,
937            Err(e) => {
938                error!(
939                    "Failed to deserialize incoming message as a JSON-RPC message: {}",
940                    e
941                );
942
943                debug!("JSON-RPC message: {}", msg);
944
945                return Ok(());
946            },
947        };
948
949        debug!("Generic JSON-RPC message: {:?}", wrapper);
950
951        let id = wrapper.id().to_string();
952
953        if let Some(e) = wrapper.into_error() {
954            self.publish_error(&id, e).await;
955        }
956
957        if let Some(pending_cmd) = self.pending_commands.remove(&id) {
958            self.respond_to_pending_command(pending_cmd, msg).await?;
959        };
960
961        // We ignore incoming messages whose ID we don't recognize (could be
962        // relating to a fire-and-forget unsubscribe request - see the
963        // publish_event() method below).
964        Ok(())
965    }
966
967    async fn publish_error(&mut self, id: SubscriptionIdRef<'_>, err: Error) {
968        if let PublishResult::AllDisconnected(query) = self.router.publish_error(id, err) {
969            debug!(
970                "All subscribers for query \"{}\" have disconnected. Unsubscribing from query...",
971                query
972            );
973
974            // If all subscribers have disconnected for this query, we need to
975            // unsubscribe from it. We issue a fire-and-forget unsubscribe
976            // message.
977            if let Err(e) = self
978                .send_request(Wrapper::new(unsubscribe::Request::new(query)))
979                .await
980            {
981                error!("Failed to send unsubscribe request: {}", e);
982            }
983        }
984    }
985
986    async fn publish_event(&mut self, ev: Event) {
987        if let PublishResult::AllDisconnected(query) = self.router.publish_event(ev) {
988            debug!(
989                "All subscribers for query \"{}\" have disconnected. Unsubscribing from query...",
990                query
991            );
992
993            // If all subscribers have disconnected for this query, we need to
994            // unsubscribe from it. We issue a fire-and-forget unsubscribe
995            // message.
996            if let Err(e) = self
997                .send_request(Wrapper::new(unsubscribe::Request::new(query)))
998                .await
999            {
1000                error!("Failed to send unsubscribe request: {}", e);
1001            }
1002        }
1003    }
1004
1005    async fn respond_to_pending_command(
1006        &mut self,
1007        pending_cmd: DriverCommand,
1008        response: String,
1009    ) -> Result<(), Error> {
1010        match pending_cmd {
1011            DriverCommand::Subscribe(cmd) => {
1012                let (id, query, subscription_tx, response_tx) =
1013                    (cmd.id, cmd.query, cmd.subscription_tx, cmd.response_tx);
1014                self.router.add(id, query, subscription_tx);
1015                response_tx.send(Ok(()))
1016            },
1017            DriverCommand::Unsubscribe(cmd) => cmd.response_tx.send(Ok(())),
1018            DriverCommand::SimpleRequest(cmd) => cmd.response_tx.send(Ok(response)),
1019            _ => Ok(()),
1020        }
1021    }
1022
1023    async fn pong(&mut self, v: Vec<u8>) -> Result<(), Error> {
1024        self.send_msg(Message::Pong(v)).await
1025    }
1026
1027    async fn ping(&mut self) -> Result<(), Error> {
1028        self.send_msg(Message::Ping(Vec::new())).await
1029    }
1030
1031    async fn close(mut self) -> Result<(), Error> {
1032        self.send_msg(Message::Close(Some(CloseFrame {
1033            code: CloseCode::Normal,
1034            reason: Cow::from("client closed WebSocket connection"),
1035        })))
1036        .await?;
1037
1038        while let Some(res) = self.stream.next().await {
1039            if res.is_err() {
1040                return Ok(());
1041            }
1042        }
1043        Ok(())
1044    }
1045}
1046
1047#[cfg(test)]
1048mod test {
1049    use alloc::collections::BTreeMap as HashMap;
1050    use std::{path::PathBuf, println};
1051
1052    use async_tungstenite::{
1053        tokio::{accept_async, TokioAdapter},
1054        tungstenite::client::IntoClientRequest,
1055    };
1056    use http::{header::AUTHORIZATION, Uri};
1057    use tokio::{
1058        fs,
1059        net::{TcpListener, TcpStream},
1060        task::JoinHandle,
1061    };
1062
1063    use super::*;
1064    use crate::{client::sync::unbounded, query::EventType, request, Method};
1065
    // Interface to a driver that manages all incoming WebSocket connections.
    //
    // The driver itself runs in a spawned task; this handle is what tests use
    // to publish events through it and to shut it down.
    struct TestServer {
        // Address the server's listener is bound to.
        node_addr: net::Address,
        // Join handle of the spawned driver task.
        driver_hdl: JoinHandle<Result<(), Error>>,
        // Used to tell the driver to terminate.
        terminate_tx: ChannelTx<Result<(), Error>>,
        // Used to hand events to the driver for publication.
        event_tx: ChannelTx<Event>,
    }
1073
    // A setting telling which of the CometBFT server versions to emulate
    // with the test server.
    //
    // The variant selects which versioned `SerEvent` type the test server
    // converts events into before sending them to a client.
    #[derive(Copy, Clone)]
    enum TestRpcVersion {
        // Events are sent as `event::v0_34::SerEvent`.
        V0_34,
        // Events are sent as `event::v0_37::SerEvent`.
        V0_37,
        // Events are sent as `event::v0_38::SerEvent`.
        V0_38,
    }
1082
1083    impl TestServer {
1084        async fn new(addr: &str, version: TestRpcVersion) -> Self {
1085            let listener = TcpListener::bind(addr).await.unwrap();
1086            let local_addr = listener.local_addr().unwrap();
1087            let node_addr = net::Address::Tcp {
1088                peer_id: None,
1089                host: local_addr.ip().to_string(),
1090                port: local_addr.port(),
1091            };
1092            let (terminate_tx, terminate_rx) = unbounded();
1093            let (event_tx, event_rx) = unbounded();
1094            let driver = TestServerDriver::new(listener, version, event_rx, terminate_rx);
1095            let driver_hdl = tokio::spawn(async move { driver.run().await });
1096            Self {
1097                node_addr,
1098                driver_hdl,
1099                terminate_tx,
1100                event_tx,
1101            }
1102        }
1103
1104        fn publish_event(&mut self, ev: Event) -> Result<(), Error> {
1105            self.event_tx.send(ev)
1106        }
1107
1108        async fn terminate(self) -> Result<(), Error> {
1109            self.terminate_tx.send(Ok(())).unwrap();
1110            self.driver_hdl.await.unwrap()
1111        }
1112    }
1113
    // Manages all incoming WebSocket connections.
    //
    // Each accepted connection gets its own `TestServerHandler`; events
    // received on `event_rx` are forwarded to every handler.
    struct TestServerDriver {
        // Listener on which new TCP connections are accepted.
        listener: TcpListener,
        // Server version to emulate; passed on to every handler.
        version: TestRpcVersion,
        // Events to publish to the connected clients.
        event_rx: ChannelRx<Event>,
        // Receives the termination signal, whose payload becomes the result
        // of `run`.
        terminate_rx: ChannelRx<Result<(), Error>>,
        // One handler per accepted connection.
        handlers: Vec<TestServerHandler>,
    }
1122
1123    impl TestServerDriver {
1124        fn new(
1125            listener: TcpListener,
1126            version: TestRpcVersion,
1127            event_rx: ChannelRx<Event>,
1128            terminate_rx: ChannelRx<Result<(), Error>>,
1129        ) -> Self {
1130            Self {
1131                listener,
1132                version,
1133                event_rx,
1134                terminate_rx,
1135                handlers: Vec::new(),
1136            }
1137        }
1138
1139        async fn run(mut self) -> Result<(), Error> {
1140            loop {
1141                tokio::select! {
1142                    Some(ev) = self.event_rx.recv() => self.publish_event(ev),
1143                    res = self.listener.accept() => {
1144                        let (stream, _) = res.unwrap();
1145                        self.handle_incoming(stream).await
1146                    }
1147                    Some(res) = self.terminate_rx.recv() => {
1148                        self.terminate().await;
1149                        return res;
1150                    },
1151                }
1152            }
1153        }
1154
1155        // Publishes the given event to all subscribers for the query relating
1156        // to the event.
1157        fn publish_event(&mut self, ev: Event) {
1158            for handler in &mut self.handlers {
1159                handler.publish_event(ev.clone());
1160            }
1161        }
1162
1163        async fn handle_incoming(&mut self, stream: TcpStream) {
1164            self.handlers
1165                .push(TestServerHandler::new(stream, self.version).await);
1166        }
1167
1168        async fn terminate(&mut self) {
1169            while !self.handlers.is_empty() {
1170                let handler = match self.handlers.pop() {
1171                    Some(h) => h,
1172                    None => break,
1173                };
1174                let _ = handler.terminate().await;
1175            }
1176        }
1177    }
1178
    // Interface to a driver that manages a single incoming WebSocket
    // connection.
    //
    // The driver itself runs in a spawned task; this handle is used to pass
    // events to it and to shut it down.
    struct TestServerHandler {
        // Join handle of the spawned per-connection driver task.
        driver_hdl: JoinHandle<Result<(), Error>>,
        // Used to tell the per-connection driver to terminate.
        terminate_tx: ChannelTx<Result<(), Error>>,
        // Used to hand events to the per-connection driver.
        event_tx: ChannelTx<Event>,
    }
1186
1187    impl TestServerHandler {
1188        async fn new(stream: TcpStream, version: TestRpcVersion) -> Self {
1189            let conn: WebSocketStream<TokioAdapter<TcpStream>> =
1190                accept_async(stream).await.unwrap();
1191            let (terminate_tx, terminate_rx) = unbounded();
1192            let (event_tx, event_rx) = unbounded();
1193            let driver = TestServerHandlerDriver::new(conn, version, event_rx, terminate_rx);
1194            let driver_hdl = tokio::spawn(async move { driver.run().await });
1195            Self {
1196                driver_hdl,
1197                terminate_tx,
1198                event_tx,
1199            }
1200        }
1201
1202        fn publish_event(&mut self, ev: Event) {
1203            let _ = self.event_tx.send(ev);
1204        }
1205
1206        async fn terminate(self) -> Result<(), Error> {
1207            self.terminate_tx.send(Ok(()))?;
1208            self.driver_hdl.await.unwrap()
1209        }
1210    }
1211
    // Manages interaction with a single incoming WebSocket connection.
    //
    // Answers subscribe/unsubscribe requests from the client and sends it the
    // events whose query it is subscribed to.
    struct TestServerHandlerDriver {
        // The server side of the WebSocket connection.
        conn: WebSocketStream<TokioAdapter<TcpStream>>,
        // Server version to emulate when serializing events.
        version: TestRpcVersion,
        // Events that may need to be sent to this connection's client.
        event_rx: ChannelRx<Event>,
        // Receives the termination signal, whose payload becomes the result
        // of `run`.
        terminate_rx: ChannelRx<Result<(), Error>>,
        // A mapping of subscription queries to subscription IDs for this
        // connection.
        subscriptions: HashMap<String, String>,
    }
1222
1223    impl TestServerHandlerDriver {
1224        fn new(
1225            conn: WebSocketStream<TokioAdapter<TcpStream>>,
1226            version: TestRpcVersion,
1227            event_rx: ChannelRx<Event>,
1228            terminate_rx: ChannelRx<Result<(), Error>>,
1229        ) -> Self {
1230            Self {
1231                conn,
1232                version,
1233                event_rx,
1234                terminate_rx,
1235                subscriptions: HashMap::new(),
1236            }
1237        }
1238
1239        async fn run(mut self) -> Result<(), Error> {
1240            loop {
1241                tokio::select! {
1242                    Some(msg) = self.conn.next() => {
1243                        if let Some(ret) = self.handle_incoming_msg(msg.unwrap()).await {
1244                            return ret;
1245                        }
1246                    }
1247                    Some(ev) = self.event_rx.recv() => self.publish_event(ev).await,
1248                    Some(res) = self.terminate_rx.recv() => {
1249                        self.terminate().await;
1250                        return res;
1251                    },
1252                }
1253            }
1254        }
1255
1256        async fn publish_event(&mut self, ev: Event) {
1257            let subs_id = match self.subscriptions.get(&ev.query) {
1258                Some(id) => Id::Str(id.clone()),
1259                None => return,
1260            };
1261            match self.version {
1262                TestRpcVersion::V0_38 => {
1263                    let ev: event::v0_38::SerEvent = ev.into();
1264                    self.send(subs_id, ev).await;
1265                },
1266                TestRpcVersion::V0_37 => {
1267                    let ev: event::v0_37::SerEvent = ev.into();
1268                    self.send(subs_id, ev).await;
1269                },
1270                TestRpcVersion::V0_34 => {
1271                    let ev: event::v0_34::SerEvent = ev.into();
1272                    self.send(subs_id, ev).await;
1273                },
1274            }
1275        }
1276
1277        async fn handle_incoming_msg(&mut self, msg: Message) -> Option<Result<(), Error>> {
1278            match msg {
1279                Message::Text(s) => self.handle_incoming_text_msg(s).await,
1280                Message::Ping(v) => {
1281                    let _ = self.conn.send(Message::Pong(v)).await;
1282                    None
1283                },
1284                Message::Close(_) => {
1285                    self.terminate().await;
1286                    Some(Ok(()))
1287                },
1288                _ => None,
1289            }
1290        }
1291
1292        async fn handle_incoming_text_msg(&mut self, msg: String) -> Option<Result<(), Error>> {
1293            match serde_json::from_str::<serde_json::Value>(&msg) {
1294                Ok(json_msg) => {
1295                    if let Some(json_method) = json_msg.get("method") {
1296                        match Method::from_str(json_method.as_str().unwrap()) {
1297                            Ok(method) => match method {
1298                                Method::Subscribe => {
1299                                    let req = serde_json::from_str::<
1300                                        request::Wrapper<subscribe::Request>,
1301                                    >(&msg)
1302                                    .unwrap();
1303
1304                                    self.add_subscription(
1305                                        req.params().query.clone(),
1306                                        req.id().to_string(),
1307                                    );
1308                                    self.send(req.id().clone(), subscribe::Response {}).await;
1309                                },
1310                                Method::Unsubscribe => {
1311                                    let req = serde_json::from_str::<
1312                                        request::Wrapper<unsubscribe::Request>,
1313                                    >(&msg)
1314                                    .unwrap();
1315
1316                                    self.remove_subscription(req.params().query.clone());
1317                                    self.send(req.id().clone(), unsubscribe::Response {}).await;
1318                                },
1319                                _ => {
1320                                    println!("Unsupported method in incoming request: {}", &method);
1321                                },
1322                            },
1323                            Err(e) => {
1324                                println!(
1325                                    "Unexpected method in incoming request: {json_method} ({e})"
1326                                );
1327                            },
1328                        }
1329                    }
1330                },
1331                Err(e) => {
1332                    println!("Failed to parse incoming request: {} ({})", &msg, e);
1333                },
1334            }
1335            None
1336        }
1337
1338        fn add_subscription(&mut self, query: String, id: String) {
1339            println!("Adding subscription with ID {} for query: {}", &id, &query);
1340            self.subscriptions.insert(query, id);
1341        }
1342
1343        fn remove_subscription(&mut self, query: String) {
1344            if let Some(id) = self.subscriptions.remove(&query) {
1345                println!("Removed subscription {id} for query: {query}");
1346            }
1347        }
1348
1349        async fn send<R>(&mut self, id: Id, res: R)
1350        where
1351            R: Serialize,
1352        {
1353            self.conn
1354                .send(Message::Text(
1355                    serde_json::to_string(&response::Wrapper::new_with_id(id, Some(res), None))
1356                        .unwrap(),
1357                ))
1358                .await
1359                .unwrap();
1360        }
1361
1362        async fn terminate(&mut self) {
1363            let _ = self
1364                .conn
1365                .close(Some(CloseFrame {
1366                    code: CloseCode::Normal,
1367                    reason: Default::default(),
1368                }))
1369                .await;
1370        }
1371    }
1372
1373    async fn read_json_fixture(version: &str, name: &str) -> String {
1374        fs::read_to_string(
1375            PathBuf::from("./tests/kvstore_fixtures")
1376                .join(version)
1377                .join("incoming")
1378                .join(name.to_owned() + ".json"),
1379        )
1380        .await
1381        .unwrap()
1382    }
1383
1384    mod v0_34 {
1385        use super::*;
1386        use crate::event::v0_34::DeEvent;
1387
1388        async fn read_event(name: &str) -> Event {
1389            DeEvent::from_string(read_json_fixture("v0_34", name).await)
1390                .unwrap()
1391                .into()
1392        }
1393
1394        #[tokio::test]
1395        async fn websocket_client_happy_path() {
1396            let event1 = read_event("subscribe_newblock_0").await;
1397            let event2 = read_event("subscribe_newblock_1").await;
1398            let event3 = read_event("subscribe_newblock_2").await;
1399            let test_events = vec![event1, event2, event3];
1400
1401            println!("Starting WebSocket server...");
1402            let mut server = TestServer::new("127.0.0.1:0", TestRpcVersion::V0_34).await;
1403            println!("Creating client RPC WebSocket connection...");
1404            let url = server.node_addr.clone().try_into().unwrap();
1405            let (client, driver) = WebSocketClient::builder(url)
1406                .compat_mode(CompatMode::V0_34)
1407                .build()
1408                .await
1409                .unwrap();
1410            let driver_handle = tokio::spawn(async move { driver.run().await });
1411
1412            println!("Initiating subscription for new blocks...");
1413            let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap();
1414
1415            // Collect all the events from the subscription.
1416            let subs_collector_hdl = tokio::spawn(async move {
1417                let mut results = Vec::new();
1418                while let Some(res) = subs.next().await {
1419                    results.push(res);
1420                    if results.len() == 3 {
1421                        break;
1422                    }
1423                }
1424                results
1425            });
1426
1427            println!("Publishing events");
1428            // Publish the events from this context
1429            for ev in &test_events {
1430                server.publish_event(ev.clone()).unwrap();
1431            }
1432
1433            println!("Collecting results from subscription...");
1434            let collected_results = subs_collector_hdl.await.unwrap();
1435
1436            client.close().unwrap();
1437            server.terminate().await.unwrap();
1438            let _ = driver_handle.await.unwrap();
1439            println!("Closed client and terminated server");
1440
1441            assert_eq!(3, collected_results.len());
1442            for i in 0..3 {
1443                assert_eq!(
1444                    test_events[i],
1445                    collected_results[i].as_ref().unwrap().clone()
1446                );
1447            }
1448        }
1449    }
1450
1451    mod v0_37 {
1452        use super::*;
1453        use crate::event::v0_37::DeEvent;
1454
1455        async fn read_event(name: &str) -> Event {
1456            DeEvent::from_string(read_json_fixture("v0_37", name).await)
1457                .unwrap()
1458                .into()
1459        }
1460
1461        #[tokio::test]
1462        async fn websocket_client_happy_path() {
1463            let event1 = read_event("subscribe_newblock_0").await;
1464            let event2 = read_event("subscribe_newblock_1").await;
1465            let event3 = read_event("subscribe_newblock_2").await;
1466            let test_events = vec![event1, event2, event3];
1467
1468            println!("Starting WebSocket server...");
1469            let mut server = TestServer::new("127.0.0.1:0", TestRpcVersion::V0_37).await;
1470            println!("Creating client RPC WebSocket connection...");
1471            let url = server.node_addr.clone().try_into().unwrap();
1472            let (client, driver) = WebSocketClient::builder(url)
1473                .compat_mode(CompatMode::V0_37)
1474                .build()
1475                .await
1476                .unwrap();
1477            let driver_handle = tokio::spawn(async move { driver.run().await });
1478
1479            println!("Initiating subscription for new blocks...");
1480            let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap();
1481
1482            // Collect all the events from the subscription.
1483            let subs_collector_hdl = tokio::spawn(async move {
1484                let mut results = Vec::new();
1485                while let Some(res) = subs.next().await {
1486                    results.push(res);
1487                    if results.len() == 3 {
1488                        break;
1489                    }
1490                }
1491                results
1492            });
1493
1494            println!("Publishing events");
1495            // Publish the events from this context
1496            for ev in &test_events {
1497                server.publish_event(ev.clone()).unwrap();
1498            }
1499
1500            println!("Collecting results from subscription...");
1501            let collected_results = subs_collector_hdl.await.unwrap();
1502
1503            client.close().unwrap();
1504            server.terminate().await.unwrap();
1505            let _ = driver_handle.await.unwrap();
1506            println!("Closed client and terminated server");
1507
1508            assert_eq!(3, collected_results.len());
1509            for i in 0..3 {
1510                assert_eq!(
1511                    test_events[i],
1512                    collected_results[i].as_ref().unwrap().clone()
1513                );
1514            }
1515        }
1516    }
1517
1518    mod v0_38 {
1519        use super::*;
1520        use crate::event::v0_38::DeEvent;
1521
1522        async fn read_event(name: &str) -> Event {
1523            DeEvent::from_string(read_json_fixture("v0_38", name).await)
1524                .unwrap()
1525                .into()
1526        }
1527
1528        #[tokio::test]
1529        async fn websocket_client_happy_path() {
1530            let event1 = read_event("subscribe_newblock_0").await;
1531            let event2 = read_event("subscribe_newblock_1").await;
1532            let event3 = read_event("subscribe_newblock_2").await;
1533            let test_events = vec![event1, event2, event3];
1534
1535            println!("Starting WebSocket server...");
1536            let mut server = TestServer::new("127.0.0.1:0", TestRpcVersion::V0_38).await;
1537            println!("Creating client RPC WebSocket connection...");
1538            let url = server.node_addr.clone().try_into().unwrap();
1539            let (client, driver) = WebSocketClient::builder(url)
1540                .compat_mode(CompatMode::V0_37)
1541                .build()
1542                .await
1543                .unwrap();
1544            let driver_handle = tokio::spawn(async move { driver.run().await });
1545
1546            println!("Initiating subscription for new blocks...");
1547            let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap();
1548
1549            // Collect all the events from the subscription.
1550            let subs_collector_hdl = tokio::spawn(async move {
1551                let mut results = Vec::new();
1552                while let Some(res) = subs.next().await {
1553                    results.push(res);
1554                    if results.len() == 3 {
1555                        break;
1556                    }
1557                }
1558                results
1559            });
1560
1561            println!("Publishing events");
1562            // Publish the events from this context
1563            for ev in &test_events {
1564                server.publish_event(ev.clone()).unwrap();
1565            }
1566
1567            println!("Collecting results from subscription...");
1568            let collected_results = subs_collector_hdl.await.unwrap();
1569
1570            client.close().unwrap();
1571            server.terminate().await.unwrap();
1572            let _ = driver_handle.await.unwrap();
1573            println!("Closed client and terminated server");
1574
1575            assert_eq!(3, collected_results.len());
1576            for i in 0..3 {
1577                assert_eq!(
1578                    test_events[i],
1579                    collected_results[i].as_ref().unwrap().clone()
1580                );
1581            }
1582        }
1583    }
1584
1585    fn authorization(req: &http::Request<()>) -> Option<&str> {
1586        req.headers()
1587            .get(AUTHORIZATION)
1588            .map(|h| h.to_str().unwrap())
1589    }
1590
1591    #[test]
1592    fn without_basic_auth() {
1593        let uri = Uri::from_str("http://example.com").unwrap();
1594        let req = uri.into_client_request().unwrap();
1595
1596        assert_eq!(authorization(&req), None);
1597    }
1598
1599    #[test]
1600    fn with_basic_auth() {
1601        let uri = Uri::from_str("http://toto:tata@example.com").unwrap();
1602        let req = uri.into_client_request().unwrap();
1603
1604        assert_eq!(authorization(&req), None);
1605    }
1606}