1use alloc::{borrow::Cow, collections::BTreeMap as HashMap, fmt};
4use core::{ops::Add, str::FromStr};
5
6use async_trait::async_trait;
7use async_tungstenite::{
8 tokio::ConnectStream,
9 tungstenite::{
10 protocol::{frame::coding::CloseCode, CloseFrame},
11 Message,
12 },
13 WebSocketStream,
14};
15use futures::{SinkExt, StreamExt};
16use serde::{Deserialize, Serialize};
17use tokio::time::{Duration, Instant};
18use tracing::{debug, error};
19
20use tendermint::{block::Height, Hash};
21use tendermint_config::net;
22
23use super::router::{SubscriptionId, SubscriptionIdRef};
24use crate::{
25 client::{
26 subscription::SubscriptionTx,
27 sync::{ChannelRx, ChannelTx},
28 transport::router::{PublishResult, SubscriptionRouter},
29 Client, CompatMode,
30 },
31 dialect::{v0_34, Dialect, LatestDialect},
32 endpoint::{self, subscribe, unsubscribe},
33 error::Error,
34 event::{self, Event},
35 prelude::*,
36 query::Query,
37 request::Wrapper,
38 response, Id, Order, Request, Response, Scheme, SimpleRequest, Subscription,
39 SubscriptionClient, Url,
40};
41
42const RECV_TIMEOUT_SECONDS: u64 = 30;
47
48const RECV_TIMEOUT: Duration = Duration::from_secs(RECV_TIMEOUT_SECONDS);
49
50const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / 10);
54
55pub use async_tungstenite::tungstenite::protocol::WebSocketConfig;
57
58#[derive(Debug, Clone)]
138pub struct WebSocketClient {
139 inner: sealed::WebSocketClient,
140 compat: CompatMode,
141}
142
143pub struct Builder {
145 url: WebSocketClientUrl,
146 compat: CompatMode,
147 transport_config: Option<WebSocketConfig>,
148}
149
150impl Builder {
151 pub fn compat_mode(mut self, mode: CompatMode) -> Self {
155 self.compat = mode;
156 self
157 }
158
159 pub fn config(mut self, config: WebSocketConfig) -> Self {
161 self.transport_config = Some(config);
162 self
163 }
164
165 pub async fn build(self) -> Result<(WebSocketClient, WebSocketClientDriver), Error> {
167 let url = self.url.0;
168 let compat = self.compat;
169 let (inner, driver) = if url.is_secure() {
170 sealed::WebSocketClient::new_secure(url, compat, self.transport_config).await?
171 } else {
172 sealed::WebSocketClient::new_unsecure(url, compat, self.transport_config).await?
173 };
174
175 Ok((WebSocketClient { inner, compat }, driver))
176 }
177}
178
179impl WebSocketClient {
180 pub async fn new<U>(url: U) -> Result<(Self, WebSocketClientDriver), Error>
185 where
186 U: TryInto<WebSocketClientUrl, Error = Error>,
187 {
188 let url = url.try_into()?;
189 Self::builder(url).build().await
190 }
191
192 pub async fn new_with_config<U>(
197 url: U,
198 config: WebSocketConfig,
199 ) -> Result<(Self, WebSocketClientDriver), Error>
200 where
201 U: TryInto<WebSocketClientUrl, Error = Error>,
202 {
203 let url = url.try_into()?;
204 Self::builder(url).config(config).build().await
205 }
206
207 pub fn builder(url: WebSocketClientUrl) -> Builder {
212 Builder {
213 url,
214 compat: Default::default(),
215 transport_config: Default::default(),
216 }
217 }
218
219 async fn perform_with_dialect<R, S>(&self, request: R, dialect: S) -> Result<R::Output, Error>
220 where
221 R: SimpleRequest<S>,
222 S: Dialect,
223 {
224 self.inner.perform(request, dialect).await
225 }
226}
227
228#[async_trait]
229impl Client for WebSocketClient {
230 async fn perform<R>(&self, request: R) -> Result<R::Output, Error>
231 where
232 R: SimpleRequest,
233 {
234 self.perform_with_dialect(request, LatestDialect).await
235 }
236
237 async fn block<H>(&self, height: H) -> Result<endpoint::block::Response, Error>
238 where
239 H: Into<Height> + Send,
240 {
241 perform_with_compat!(self, endpoint::block::Request::new(height.into()))
242 }
243
244 async fn block_by_hash(
245 &self,
246 hash: tendermint::Hash,
247 ) -> Result<endpoint::block_by_hash::Response, Error> {
248 perform_with_compat!(self, endpoint::block_by_hash::Request::new(hash))
249 }
250
251 async fn latest_block(&self) -> Result<endpoint::block::Response, Error> {
252 perform_with_compat!(self, endpoint::block::Request::default())
253 }
254
255 async fn block_results<H>(&self, height: H) -> Result<endpoint::block_results::Response, Error>
256 where
257 H: Into<Height> + Send,
258 {
259 perform_with_compat!(self, endpoint::block_results::Request::new(height.into()))
260 }
261
262 async fn latest_block_results(&self) -> Result<endpoint::block_results::Response, Error> {
263 perform_with_compat!(self, endpoint::block_results::Request::default())
264 }
265
266 async fn block_search(
267 &self,
268 query: Query,
269 page: u32,
270 per_page: u8,
271 order: Order,
272 ) -> Result<endpoint::block_search::Response, Error> {
273 perform_with_compat!(
274 self,
275 endpoint::block_search::Request::new(query, page, per_page, order)
276 )
277 }
278
279 async fn header<H>(&self, height: H) -> Result<endpoint::header::Response, Error>
280 where
281 H: Into<Height> + Send,
282 {
283 let height = height.into();
284 match self.compat {
285 CompatMode::V0_38 => self.perform(endpoint::header::Request::new(height)).await,
286 CompatMode::V0_37 => self.perform(endpoint::header::Request::new(height)).await,
287 CompatMode::V0_34 => {
288 let resp = self
291 .perform_with_dialect(endpoint::block::Request::new(height), v0_34::Dialect)
292 .await?;
293 Ok(resp.into())
294 },
295 }
296 }
297
298 async fn header_by_hash(
299 &self,
300 hash: Hash,
301 ) -> Result<endpoint::header_by_hash::Response, Error> {
302 match self.compat {
303 CompatMode::V0_38 => {
304 self.perform(endpoint::header_by_hash::Request::new(hash))
305 .await
306 },
307 CompatMode::V0_37 => {
308 self.perform(endpoint::header_by_hash::Request::new(hash))
309 .await
310 },
311 CompatMode::V0_34 => {
312 let resp = self
315 .perform_with_dialect(
316 endpoint::block_by_hash::Request::new(hash),
317 v0_34::Dialect,
318 )
319 .await?;
320 Ok(resp.into())
321 },
322 }
323 }
324
325 async fn tx(&self, hash: Hash, prove: bool) -> Result<endpoint::tx::Response, Error> {
326 perform_with_compat!(self, endpoint::tx::Request::new(hash, prove))
327 }
328
329 async fn tx_search(
330 &self,
331 query: Query,
332 prove: bool,
333 page: u32,
334 per_page: u8,
335 order: Order,
336 ) -> Result<endpoint::tx_search::Response, Error> {
337 perform_with_compat!(
338 self,
339 endpoint::tx_search::Request::new(query, prove, page, per_page, order)
340 )
341 }
342
343 async fn broadcast_tx_commit<T>(
344 &self,
345 tx: T,
346 ) -> Result<endpoint::broadcast::tx_commit::Response, Error>
347 where
348 T: Into<Vec<u8>> + Send,
349 {
350 perform_with_compat!(self, endpoint::broadcast::tx_commit::Request::new(tx))
351 }
352}
353
354#[async_trait]
355impl SubscriptionClient for WebSocketClient {
356 async fn subscribe(&self, query: Query) -> Result<Subscription, Error> {
357 self.inner.subscribe(query).await
358 }
359
360 async fn unsubscribe(&self, query: Query) -> Result<(), Error> {
361 self.inner.unsubscribe(query).await
362 }
363
364 fn close(self) -> Result<(), Error> {
365 self.inner.close()
366 }
367}
368
369#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
373#[serde(transparent)]
374pub struct WebSocketClientUrl(Url);
375
376impl TryFrom<Url> for WebSocketClientUrl {
377 type Error = Error;
378
379 fn try_from(value: Url) -> Result<Self, Error> {
380 match value.scheme() {
381 Scheme::WebSocket | Scheme::SecureWebSocket => Ok(Self(value)),
382 _ => Err(Error::invalid_params(format!(
383 "cannot use URL {value} with WebSocket clients"
384 ))),
385 }
386 }
387}
388
389impl FromStr for WebSocketClientUrl {
390 type Err = Error;
391
392 fn from_str(s: &str) -> Result<Self, Error> {
393 let url: Url = s.parse()?;
394 url.try_into()
395 }
396}
397
398impl fmt::Display for WebSocketClientUrl {
399 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
400 self.0.fmt(f)
401 }
402}
403
404impl TryFrom<&str> for WebSocketClientUrl {
405 type Error = Error;
406
407 fn try_from(value: &str) -> Result<Self, Error> {
408 value.parse()
409 }
410}
411
412impl TryFrom<net::Address> for WebSocketClientUrl {
413 type Error = Error;
414
415 fn try_from(value: net::Address) -> Result<Self, Error> {
416 match value {
417 net::Address::Tcp {
418 peer_id: _,
419 host,
420 port,
421 } => format!("ws://{host}:{port}/websocket").parse(),
422 net::Address::Unix { .. } => Err(Error::invalid_params(
423 "only TCP-based node addresses are supported".to_string(),
424 )),
425 }
426 }
427}
428
429impl From<WebSocketClientUrl> for Url {
430 fn from(url: WebSocketClientUrl) -> Self {
431 url.0
432 }
433}
434
435mod sealed {
436 use async_tungstenite::{
437 tokio::{connect_async_with_config, connect_async_with_tls_connector_and_config},
438 tungstenite::client::IntoClientRequest,
439 };
440 use tracing::debug;
441
442 use super::{
443 DriverCommand, SimpleRequestCommand, SubscribeCommand, UnsubscribeCommand,
444 WebSocketClientDriver, WebSocketConfig,
445 };
446 use crate::{
447 client::{
448 sync::{unbounded, ChannelTx},
449 transport::auth::authorize,
450 CompatMode,
451 },
452 dialect::Dialect,
453 prelude::*,
454 query::Query,
455 request::Wrapper,
456 utils::uuid_str,
457 Error, Response, SimpleRequest, Subscription, Url,
458 };
459
460 #[derive(Debug, Clone)]
463 pub struct Unsecure;
464
465 #[derive(Debug, Clone)]
468 pub struct Secure;
469
470 #[derive(Debug, Clone)]
477 pub struct AsyncTungsteniteClient<C> {
478 cmd_tx: ChannelTx<DriverCommand>,
479 _client_type: core::marker::PhantomData<C>,
480 }
481
482 impl AsyncTungsteniteClient<Unsecure> {
483 pub async fn new(
492 url: Url,
493 compat: CompatMode,
494 config: Option<WebSocketConfig>,
495 ) -> Result<(Self, WebSocketClientDriver), Error> {
496 debug!("Connecting to unsecure WebSocket endpoint: {}", url);
497
498 let (stream, _response) = connect_async_with_config(url, config)
499 .await
500 .map_err(Error::tungstenite)?;
501
502 let (cmd_tx, cmd_rx) = unbounded();
503 let driver = WebSocketClientDriver::new(stream, cmd_rx, compat);
504 let client = Self {
505 cmd_tx,
506 _client_type: Default::default(),
507 };
508
509 Ok((client, driver))
510 }
511 }
512
513 impl AsyncTungsteniteClient<Secure> {
514 pub async fn new(
524 url: Url,
525 compat: CompatMode,
526 config: Option<WebSocketConfig>,
527 ) -> Result<(Self, WebSocketClientDriver), Error> {
528 debug!("Connecting to secure WebSocket endpoint: {}", url);
529
530 let (stream, _response) =
533 connect_async_with_tls_connector_and_config(url, None, config)
534 .await
535 .map_err(Error::tungstenite)?;
536
537 let (cmd_tx, cmd_rx) = unbounded();
538 let driver = WebSocketClientDriver::new(stream, cmd_rx, compat);
539 let client = Self {
540 cmd_tx,
541 _client_type: Default::default(),
542 };
543
544 Ok((client, driver))
545 }
546 }
547
548 impl<C> AsyncTungsteniteClient<C> {
549 fn send_cmd(&self, cmd: DriverCommand) -> Result<(), Error> {
550 self.cmd_tx.send(cmd)
551 }
552
553 pub fn close(self) -> Result<(), Error> {
555 self.send_cmd(DriverCommand::Terminate)
556 }
557 }
558
559 impl<C> AsyncTungsteniteClient<C> {
560 pub async fn perform<R, S>(&self, request: R) -> Result<R::Output, Error>
561 where
562 R: SimpleRequest<S>,
563 S: Dialect,
564 {
565 let wrapper = Wrapper::new(request);
566 let id = wrapper.id().to_string();
567 let wrapped_request = wrapper.into_json();
568
569 tracing::debug!("Outgoing request: {}", wrapped_request);
570
571 let (response_tx, mut response_rx) = unbounded();
572
573 self.send_cmd(DriverCommand::SimpleRequest(SimpleRequestCommand {
574 id,
575 wrapped_request,
576 response_tx,
577 }))?;
578
579 let response = response_rx.recv().await.ok_or_else(|| {
580 Error::client_internal("failed to hear back from WebSocket driver".to_string())
581 })??;
582
583 tracing::debug!("Incoming response: {}", response);
584
585 R::Response::from_string(response).map(Into::into)
586 }
587
588 pub async fn subscribe(&self, query: Query) -> Result<Subscription, Error> {
589 let (subscription_tx, subscription_rx) = unbounded();
590 let (response_tx, mut response_rx) = unbounded();
591 let id = uuid_str();
593 self.send_cmd(DriverCommand::Subscribe(SubscribeCommand {
594 id: id.to_string(),
595 query: query.to_string(),
596 subscription_tx,
597 response_tx,
598 }))?;
599 response_rx.recv().await.ok_or_else(|| {
601 Error::client_internal("failed to hear back from WebSocket driver".to_string())
602 })??;
603 Ok(Subscription::new(id, query, subscription_rx))
604 }
605
606 pub async fn unsubscribe(&self, query: Query) -> Result<(), Error> {
607 let (response_tx, mut response_rx) = unbounded();
608 self.send_cmd(DriverCommand::Unsubscribe(UnsubscribeCommand {
609 query: query.to_string(),
610 response_tx,
611 }))?;
612 response_rx.recv().await.ok_or_else(|| {
613 Error::client_internal("failed to hear back from WebSocket driver".to_string())
614 })??;
615 Ok(())
616 }
617 }
618
619 #[derive(Debug, Clone)]
622 pub enum WebSocketClient {
623 Unsecure(AsyncTungsteniteClient<Unsecure>),
624 Secure(AsyncTungsteniteClient<Secure>),
625 }
626
627 impl WebSocketClient {
628 pub async fn new_unsecure(
629 url: Url,
630 compat: CompatMode,
631 config: Option<WebSocketConfig>,
632 ) -> Result<(Self, WebSocketClientDriver), Error> {
633 let (client, driver) =
634 AsyncTungsteniteClient::<Unsecure>::new(url, compat, config).await?;
635 Ok((Self::Unsecure(client), driver))
636 }
637
638 pub async fn new_secure(
639 url: Url,
640 compat: CompatMode,
641 config: Option<WebSocketConfig>,
642 ) -> Result<(Self, WebSocketClientDriver), Error> {
643 let (client, driver) =
644 AsyncTungsteniteClient::<Secure>::new(url, compat, config).await?;
645 Ok((Self::Secure(client), driver))
646 }
647
648 pub fn close(self) -> Result<(), Error> {
649 match self {
650 WebSocketClient::Unsecure(c) => c.close(),
651 WebSocketClient::Secure(c) => c.close(),
652 }
653 }
654 }
655
656 impl WebSocketClient {
657 pub async fn perform<R, S>(&self, request: R, _dialect: S) -> Result<R::Output, Error>
658 where
659 R: SimpleRequest<S>,
660 S: Dialect,
661 {
662 match self {
663 WebSocketClient::Unsecure(c) => c.perform(request).await,
664 WebSocketClient::Secure(c) => c.perform(request).await,
665 }
666 }
667
668 pub async fn subscribe(&self, query: Query) -> Result<Subscription, Error> {
669 match self {
670 WebSocketClient::Unsecure(c) => c.subscribe(query).await,
671 WebSocketClient::Secure(c) => c.subscribe(query).await,
672 }
673 }
674
675 pub async fn unsubscribe(&self, query: Query) -> Result<(), Error> {
676 match self {
677 WebSocketClient::Unsecure(c) => c.unsubscribe(query).await,
678 WebSocketClient::Secure(c) => c.unsubscribe(query).await,
679 }
680 }
681 }
682
683 use async_tungstenite::tungstenite;
684
685 impl IntoClientRequest for Url {
686 fn into_client_request(
687 self,
688 ) -> tungstenite::Result<tungstenite::handshake::client::Request> {
689 let builder = tungstenite::handshake::client::Request::builder()
690 .method("GET")
691 .header("Host", self.host())
692 .header("Connection", "Upgrade")
693 .header("Upgrade", "websocket")
694 .header("Sec-WebSocket-Version", "13")
695 .header(
696 "Sec-WebSocket-Key",
697 tungstenite::handshake::client::generate_key(),
698 );
699
700 let builder = if let Some(auth) = authorize(self.as_ref()) {
701 builder.header("Authorization", auth.to_string())
702 } else {
703 builder
704 };
705
706 builder
707 .uri(self.to_string())
708 .body(())
709 .map_err(tungstenite::error::Error::HttpFormat)
710 }
711 }
712}
713
714#[derive(Debug, Clone)]
717enum DriverCommand {
718 Subscribe(SubscribeCommand),
720 Unsubscribe(UnsubscribeCommand),
722 SimpleRequest(SimpleRequestCommand),
724 Terminate,
725}
726
727#[derive(Debug, Clone)]
728struct SubscribeCommand {
729 id: String,
731 query: String,
733 subscription_tx: SubscriptionTx,
735 response_tx: ChannelTx<Result<(), Error>>,
737}
738
739#[derive(Debug, Clone)]
740struct UnsubscribeCommand {
741 query: String,
743 response_tx: ChannelTx<Result<(), Error>>,
745}
746
747#[derive(Debug, Clone)]
748struct SimpleRequestCommand {
749 id: String,
753 wrapped_request: String,
755 response_tx: ChannelTx<Result<String, Error>>,
757}
758
759#[derive(Serialize, Deserialize, Debug, Clone)]
760struct GenericJsonResponse(serde_json::Value);
761
762impl Response for GenericJsonResponse {}
763
764pub struct WebSocketClientDriver {
769 stream: WebSocketStream<ConnectStream>,
771 router: SubscriptionRouter,
773 cmd_rx: ChannelRx<DriverCommand>,
775 pending_commands: HashMap<SubscriptionId, DriverCommand>,
778 compat: CompatMode,
780}
781
782impl WebSocketClientDriver {
783 fn new(
784 stream: WebSocketStream<ConnectStream>,
785 cmd_rx: ChannelRx<DriverCommand>,
786 compat: CompatMode,
787 ) -> Self {
788 Self {
789 stream,
790 router: SubscriptionRouter::default(),
791 cmd_rx,
792 pending_commands: HashMap::new(),
793 compat,
794 }
795 }
796
797 async fn send_msg(&mut self, msg: Message) -> Result<(), Error> {
798 self.stream.send(msg).await.map_err(|e| {
799 Error::web_socket("failed to write to WebSocket connection".to_string(), e)
800 })
801 }
802
803 async fn simple_request(&mut self, cmd: SimpleRequestCommand) -> Result<(), Error> {
804 if let Err(e) = self
805 .send_msg(Message::Text(cmd.wrapped_request.clone()))
806 .await
807 {
808 cmd.response_tx.send(Err(e.clone()))?;
809 return Err(e);
810 }
811 self.pending_commands
812 .insert(cmd.id.clone(), DriverCommand::SimpleRequest(cmd));
813 Ok(())
814 }
815
816 pub async fn run(mut self) -> Result<(), Error> {
819 let mut ping_interval =
820 tokio::time::interval_at(Instant::now().add(PING_INTERVAL), PING_INTERVAL);
821
822 let recv_timeout = tokio::time::sleep(RECV_TIMEOUT);
823 tokio::pin!(recv_timeout);
824
825 loop {
826 tokio::select! {
827 Some(res) = self.stream.next() => match res {
828 Ok(msg) => {
829 recv_timeout.as_mut().reset(Instant::now().add(RECV_TIMEOUT));
832 self.handle_incoming_msg(msg).await?
833 },
834 Err(e) => return Err(
835 Error::web_socket(
836 "failed to read from WebSocket connection".to_string(),
837 e
838 ),
839 ),
840 },
841 Some(cmd) = self.cmd_rx.recv() => match cmd {
842 DriverCommand::Subscribe(subs_cmd) => self.subscribe(subs_cmd).await?,
843 DriverCommand::Unsubscribe(unsubs_cmd) => self.unsubscribe(unsubs_cmd).await?,
844 DriverCommand::SimpleRequest(req_cmd) => self.simple_request(req_cmd).await?,
845 DriverCommand::Terminate => return self.close().await,
846 },
847 _ = ping_interval.tick() => self.ping().await?,
848 _ = &mut recv_timeout => {
849 return Err(Error::web_socket_timeout(RECV_TIMEOUT));
850 }
851 }
852 }
853 }
854
855 async fn send_request<R>(&mut self, wrapper: Wrapper<R>) -> Result<(), Error>
856 where
857 R: Request,
858 {
859 self.send_msg(Message::Text(
860 serde_json::to_string_pretty(&wrapper).unwrap(),
861 ))
862 .await
863 }
864
865 async fn subscribe(&mut self, cmd: SubscribeCommand) -> Result<(), Error> {
866 if self.router.num_subscriptions_for_query(cmd.query.clone()) > 0 {
870 let (id, query, subscription_tx, response_tx) =
871 (cmd.id, cmd.query, cmd.subscription_tx, cmd.response_tx);
872 self.router.add(id, query, subscription_tx);
873 return response_tx.send(Ok(()));
874 }
875
876 let wrapper = Wrapper::new_with_id(
878 Id::Str(cmd.id.clone()),
879 subscribe::Request::new(cmd.query.clone()),
880 );
881 if let Err(e) = self.send_request(wrapper).await {
882 cmd.response_tx.send(Err(e.clone()))?;
883 return Err(e);
884 }
885 self.pending_commands
886 .insert(cmd.id.clone(), DriverCommand::Subscribe(cmd));
887 Ok(())
888 }
889
890 async fn unsubscribe(&mut self, cmd: UnsubscribeCommand) -> Result<(), Error> {
891 if self.router.remove_by_query(cmd.query.clone()) == 0 {
895 cmd.response_tx.send(Ok(()))?;
898 return Ok(());
899 }
900
901 let wrapper = Wrapper::new(unsubscribe::Request::new(cmd.query.clone()));
904 let req_id = wrapper.id().clone();
905 if let Err(e) = self.send_request(wrapper).await {
906 cmd.response_tx.send(Err(e.clone()))?;
907 return Err(e);
908 }
909 self.pending_commands
910 .insert(req_id.to_string(), DriverCommand::Unsubscribe(cmd));
911 Ok(())
912 }
913
914 async fn handle_incoming_msg(&mut self, msg: Message) -> Result<(), Error> {
915 match msg {
916 Message::Text(s) => self.handle_text_msg(s).await,
917 Message::Ping(v) => self.pong(v).await,
918 _ => Ok(()),
919 }
920 }
921
922 async fn handle_text_msg(&mut self, msg: String) -> Result<(), Error> {
923 let parse_res = match self.compat {
924 CompatMode::V0_38 => event::v0_38::DeEvent::from_string(&msg).map(Into::into),
925 CompatMode::V0_37 => event::v0_37::DeEvent::from_string(&msg).map(Into::into),
926 CompatMode::V0_34 => event::v0_34::DeEvent::from_string(&msg).map(Into::into),
927 };
928
929 if let Ok(ev) = parse_res {
930 debug!("JSON-RPC event: {}", msg);
931 self.publish_event(ev).await;
932 return Ok(());
933 }
934
935 let wrapper: response::Wrapper<GenericJsonResponse> = match serde_json::from_str(&msg) {
936 Ok(w) => w,
937 Err(e) => {
938 error!(
939 "Failed to deserialize incoming message as a JSON-RPC message: {}",
940 e
941 );
942
943 debug!("JSON-RPC message: {}", msg);
944
945 return Ok(());
946 },
947 };
948
949 debug!("Generic JSON-RPC message: {:?}", wrapper);
950
951 let id = wrapper.id().to_string();
952
953 if let Some(e) = wrapper.into_error() {
954 self.publish_error(&id, e).await;
955 }
956
957 if let Some(pending_cmd) = self.pending_commands.remove(&id) {
958 self.respond_to_pending_command(pending_cmd, msg).await?;
959 };
960
961 Ok(())
965 }
966
967 async fn publish_error(&mut self, id: SubscriptionIdRef<'_>, err: Error) {
968 if let PublishResult::AllDisconnected(query) = self.router.publish_error(id, err) {
969 debug!(
970 "All subscribers for query \"{}\" have disconnected. Unsubscribing from query...",
971 query
972 );
973
974 if let Err(e) = self
978 .send_request(Wrapper::new(unsubscribe::Request::new(query)))
979 .await
980 {
981 error!("Failed to send unsubscribe request: {}", e);
982 }
983 }
984 }
985
986 async fn publish_event(&mut self, ev: Event) {
987 if let PublishResult::AllDisconnected(query) = self.router.publish_event(ev) {
988 debug!(
989 "All subscribers for query \"{}\" have disconnected. Unsubscribing from query...",
990 query
991 );
992
993 if let Err(e) = self
997 .send_request(Wrapper::new(unsubscribe::Request::new(query)))
998 .await
999 {
1000 error!("Failed to send unsubscribe request: {}", e);
1001 }
1002 }
1003 }
1004
1005 async fn respond_to_pending_command(
1006 &mut self,
1007 pending_cmd: DriverCommand,
1008 response: String,
1009 ) -> Result<(), Error> {
1010 match pending_cmd {
1011 DriverCommand::Subscribe(cmd) => {
1012 let (id, query, subscription_tx, response_tx) =
1013 (cmd.id, cmd.query, cmd.subscription_tx, cmd.response_tx);
1014 self.router.add(id, query, subscription_tx);
1015 response_tx.send(Ok(()))
1016 },
1017 DriverCommand::Unsubscribe(cmd) => cmd.response_tx.send(Ok(())),
1018 DriverCommand::SimpleRequest(cmd) => cmd.response_tx.send(Ok(response)),
1019 _ => Ok(()),
1020 }
1021 }
1022
1023 async fn pong(&mut self, v: Vec<u8>) -> Result<(), Error> {
1024 self.send_msg(Message::Pong(v)).await
1025 }
1026
1027 async fn ping(&mut self) -> Result<(), Error> {
1028 self.send_msg(Message::Ping(Vec::new())).await
1029 }
1030
1031 async fn close(mut self) -> Result<(), Error> {
1032 self.send_msg(Message::Close(Some(CloseFrame {
1033 code: CloseCode::Normal,
1034 reason: Cow::from("client closed WebSocket connection"),
1035 })))
1036 .await?;
1037
1038 while let Some(res) = self.stream.next().await {
1039 if res.is_err() {
1040 return Ok(());
1041 }
1042 }
1043 Ok(())
1044 }
1045}
1046
1047#[cfg(test)]
1048mod test {
1049 use alloc::collections::BTreeMap as HashMap;
1050 use std::{path::PathBuf, println};
1051
1052 use async_tungstenite::{
1053 tokio::{accept_async, TokioAdapter},
1054 tungstenite::client::IntoClientRequest,
1055 };
1056 use http::{header::AUTHORIZATION, Uri};
1057 use tokio::{
1058 fs,
1059 net::{TcpListener, TcpStream},
1060 task::JoinHandle,
1061 };
1062
1063 use super::*;
1064 use crate::{client::sync::unbounded, query::EventType, request, Method};
1065
1066 struct TestServer {
1068 node_addr: net::Address,
1069 driver_hdl: JoinHandle<Result<(), Error>>,
1070 terminate_tx: ChannelTx<Result<(), Error>>,
1071 event_tx: ChannelTx<Event>,
1072 }
1073
1074 #[derive(Copy, Clone)]
1077 enum TestRpcVersion {
1078 V0_34,
1079 V0_37,
1080 V0_38,
1081 }
1082
1083 impl TestServer {
1084 async fn new(addr: &str, version: TestRpcVersion) -> Self {
1085 let listener = TcpListener::bind(addr).await.unwrap();
1086 let local_addr = listener.local_addr().unwrap();
1087 let node_addr = net::Address::Tcp {
1088 peer_id: None,
1089 host: local_addr.ip().to_string(),
1090 port: local_addr.port(),
1091 };
1092 let (terminate_tx, terminate_rx) = unbounded();
1093 let (event_tx, event_rx) = unbounded();
1094 let driver = TestServerDriver::new(listener, version, event_rx, terminate_rx);
1095 let driver_hdl = tokio::spawn(async move { driver.run().await });
1096 Self {
1097 node_addr,
1098 driver_hdl,
1099 terminate_tx,
1100 event_tx,
1101 }
1102 }
1103
1104 fn publish_event(&mut self, ev: Event) -> Result<(), Error> {
1105 self.event_tx.send(ev)
1106 }
1107
1108 async fn terminate(self) -> Result<(), Error> {
1109 self.terminate_tx.send(Ok(())).unwrap();
1110 self.driver_hdl.await.unwrap()
1111 }
1112 }
1113
1114 struct TestServerDriver {
1116 listener: TcpListener,
1117 version: TestRpcVersion,
1118 event_rx: ChannelRx<Event>,
1119 terminate_rx: ChannelRx<Result<(), Error>>,
1120 handlers: Vec<TestServerHandler>,
1121 }
1122
1123 impl TestServerDriver {
1124 fn new(
1125 listener: TcpListener,
1126 version: TestRpcVersion,
1127 event_rx: ChannelRx<Event>,
1128 terminate_rx: ChannelRx<Result<(), Error>>,
1129 ) -> Self {
1130 Self {
1131 listener,
1132 version,
1133 event_rx,
1134 terminate_rx,
1135 handlers: Vec::new(),
1136 }
1137 }
1138
1139 async fn run(mut self) -> Result<(), Error> {
1140 loop {
1141 tokio::select! {
1142 Some(ev) = self.event_rx.recv() => self.publish_event(ev),
1143 res = self.listener.accept() => {
1144 let (stream, _) = res.unwrap();
1145 self.handle_incoming(stream).await
1146 }
1147 Some(res) = self.terminate_rx.recv() => {
1148 self.terminate().await;
1149 return res;
1150 },
1151 }
1152 }
1153 }
1154
1155 fn publish_event(&mut self, ev: Event) {
1158 for handler in &mut self.handlers {
1159 handler.publish_event(ev.clone());
1160 }
1161 }
1162
1163 async fn handle_incoming(&mut self, stream: TcpStream) {
1164 self.handlers
1165 .push(TestServerHandler::new(stream, self.version).await);
1166 }
1167
1168 async fn terminate(&mut self) {
1169 while !self.handlers.is_empty() {
1170 let handler = match self.handlers.pop() {
1171 Some(h) => h,
1172 None => break,
1173 };
1174 let _ = handler.terminate().await;
1175 }
1176 }
1177 }
1178
1179 struct TestServerHandler {
1182 driver_hdl: JoinHandle<Result<(), Error>>,
1183 terminate_tx: ChannelTx<Result<(), Error>>,
1184 event_tx: ChannelTx<Event>,
1185 }
1186
1187 impl TestServerHandler {
1188 async fn new(stream: TcpStream, version: TestRpcVersion) -> Self {
1189 let conn: WebSocketStream<TokioAdapter<TcpStream>> =
1190 accept_async(stream).await.unwrap();
1191 let (terminate_tx, terminate_rx) = unbounded();
1192 let (event_tx, event_rx) = unbounded();
1193 let driver = TestServerHandlerDriver::new(conn, version, event_rx, terminate_rx);
1194 let driver_hdl = tokio::spawn(async move { driver.run().await });
1195 Self {
1196 driver_hdl,
1197 terminate_tx,
1198 event_tx,
1199 }
1200 }
1201
1202 fn publish_event(&mut self, ev: Event) {
1203 let _ = self.event_tx.send(ev);
1204 }
1205
1206 async fn terminate(self) -> Result<(), Error> {
1207 self.terminate_tx.send(Ok(()))?;
1208 self.driver_hdl.await.unwrap()
1209 }
1210 }
1211
1212 struct TestServerHandlerDriver {
1214 conn: WebSocketStream<TokioAdapter<TcpStream>>,
1215 version: TestRpcVersion,
1216 event_rx: ChannelRx<Event>,
1217 terminate_rx: ChannelRx<Result<(), Error>>,
1218 subscriptions: HashMap<String, String>,
1221 }
1222
1223 impl TestServerHandlerDriver {
1224 fn new(
1225 conn: WebSocketStream<TokioAdapter<TcpStream>>,
1226 version: TestRpcVersion,
1227 event_rx: ChannelRx<Event>,
1228 terminate_rx: ChannelRx<Result<(), Error>>,
1229 ) -> Self {
1230 Self {
1231 conn,
1232 version,
1233 event_rx,
1234 terminate_rx,
1235 subscriptions: HashMap::new(),
1236 }
1237 }
1238
1239 async fn run(mut self) -> Result<(), Error> {
1240 loop {
1241 tokio::select! {
1242 Some(msg) = self.conn.next() => {
1243 if let Some(ret) = self.handle_incoming_msg(msg.unwrap()).await {
1244 return ret;
1245 }
1246 }
1247 Some(ev) = self.event_rx.recv() => self.publish_event(ev).await,
1248 Some(res) = self.terminate_rx.recv() => {
1249 self.terminate().await;
1250 return res;
1251 },
1252 }
1253 }
1254 }
1255
1256 async fn publish_event(&mut self, ev: Event) {
1257 let subs_id = match self.subscriptions.get(&ev.query) {
1258 Some(id) => Id::Str(id.clone()),
1259 None => return,
1260 };
1261 match self.version {
1262 TestRpcVersion::V0_38 => {
1263 let ev: event::v0_38::SerEvent = ev.into();
1264 self.send(subs_id, ev).await;
1265 },
1266 TestRpcVersion::V0_37 => {
1267 let ev: event::v0_37::SerEvent = ev.into();
1268 self.send(subs_id, ev).await;
1269 },
1270 TestRpcVersion::V0_34 => {
1271 let ev: event::v0_34::SerEvent = ev.into();
1272 self.send(subs_id, ev).await;
1273 },
1274 }
1275 }
1276
1277 async fn handle_incoming_msg(&mut self, msg: Message) -> Option<Result<(), Error>> {
1278 match msg {
1279 Message::Text(s) => self.handle_incoming_text_msg(s).await,
1280 Message::Ping(v) => {
1281 let _ = self.conn.send(Message::Pong(v)).await;
1282 None
1283 },
1284 Message::Close(_) => {
1285 self.terminate().await;
1286 Some(Ok(()))
1287 },
1288 _ => None,
1289 }
1290 }
1291
1292 async fn handle_incoming_text_msg(&mut self, msg: String) -> Option<Result<(), Error>> {
1293 match serde_json::from_str::<serde_json::Value>(&msg) {
1294 Ok(json_msg) => {
1295 if let Some(json_method) = json_msg.get("method") {
1296 match Method::from_str(json_method.as_str().unwrap()) {
1297 Ok(method) => match method {
1298 Method::Subscribe => {
1299 let req = serde_json::from_str::<
1300 request::Wrapper<subscribe::Request>,
1301 >(&msg)
1302 .unwrap();
1303
1304 self.add_subscription(
1305 req.params().query.clone(),
1306 req.id().to_string(),
1307 );
1308 self.send(req.id().clone(), subscribe::Response {}).await;
1309 },
1310 Method::Unsubscribe => {
1311 let req = serde_json::from_str::<
1312 request::Wrapper<unsubscribe::Request>,
1313 >(&msg)
1314 .unwrap();
1315
1316 self.remove_subscription(req.params().query.clone());
1317 self.send(req.id().clone(), unsubscribe::Response {}).await;
1318 },
1319 _ => {
1320 println!("Unsupported method in incoming request: {}", &method);
1321 },
1322 },
1323 Err(e) => {
1324 println!(
1325 "Unexpected method in incoming request: {json_method} ({e})"
1326 );
1327 },
1328 }
1329 }
1330 },
1331 Err(e) => {
1332 println!("Failed to parse incoming request: {} ({})", &msg, e);
1333 },
1334 }
1335 None
1336 }
1337
1338 fn add_subscription(&mut self, query: String, id: String) {
1339 println!("Adding subscription with ID {} for query: {}", &id, &query);
1340 self.subscriptions.insert(query, id);
1341 }
1342
1343 fn remove_subscription(&mut self, query: String) {
1344 if let Some(id) = self.subscriptions.remove(&query) {
1345 println!("Removed subscription {id} for query: {query}");
1346 }
1347 }
1348
1349 async fn send<R>(&mut self, id: Id, res: R)
1350 where
1351 R: Serialize,
1352 {
1353 self.conn
1354 .send(Message::Text(
1355 serde_json::to_string(&response::Wrapper::new_with_id(id, Some(res), None))
1356 .unwrap(),
1357 ))
1358 .await
1359 .unwrap();
1360 }
1361
1362 async fn terminate(&mut self) {
1363 let _ = self
1364 .conn
1365 .close(Some(CloseFrame {
1366 code: CloseCode::Normal,
1367 reason: Default::default(),
1368 }))
1369 .await;
1370 }
1371 }
1372
1373 async fn read_json_fixture(version: &str, name: &str) -> String {
1374 fs::read_to_string(
1375 PathBuf::from("./tests/kvstore_fixtures")
1376 .join(version)
1377 .join("incoming")
1378 .join(name.to_owned() + ".json"),
1379 )
1380 .await
1381 .unwrap()
1382 }
1383
1384 mod v0_34 {
1385 use super::*;
1386 use crate::event::v0_34::DeEvent;
1387
1388 async fn read_event(name: &str) -> Event {
1389 DeEvent::from_string(read_json_fixture("v0_34", name).await)
1390 .unwrap()
1391 .into()
1392 }
1393
1394 #[tokio::test]
1395 async fn websocket_client_happy_path() {
1396 let event1 = read_event("subscribe_newblock_0").await;
1397 let event2 = read_event("subscribe_newblock_1").await;
1398 let event3 = read_event("subscribe_newblock_2").await;
1399 let test_events = vec![event1, event2, event3];
1400
1401 println!("Starting WebSocket server...");
1402 let mut server = TestServer::new("127.0.0.1:0", TestRpcVersion::V0_34).await;
1403 println!("Creating client RPC WebSocket connection...");
1404 let url = server.node_addr.clone().try_into().unwrap();
1405 let (client, driver) = WebSocketClient::builder(url)
1406 .compat_mode(CompatMode::V0_34)
1407 .build()
1408 .await
1409 .unwrap();
1410 let driver_handle = tokio::spawn(async move { driver.run().await });
1411
1412 println!("Initiating subscription for new blocks...");
1413 let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap();
1414
1415 let subs_collector_hdl = tokio::spawn(async move {
1417 let mut results = Vec::new();
1418 while let Some(res) = subs.next().await {
1419 results.push(res);
1420 if results.len() == 3 {
1421 break;
1422 }
1423 }
1424 results
1425 });
1426
1427 println!("Publishing events");
1428 for ev in &test_events {
1430 server.publish_event(ev.clone()).unwrap();
1431 }
1432
1433 println!("Collecting results from subscription...");
1434 let collected_results = subs_collector_hdl.await.unwrap();
1435
1436 client.close().unwrap();
1437 server.terminate().await.unwrap();
1438 let _ = driver_handle.await.unwrap();
1439 println!("Closed client and terminated server");
1440
1441 assert_eq!(3, collected_results.len());
1442 for i in 0..3 {
1443 assert_eq!(
1444 test_events[i],
1445 collected_results[i].as_ref().unwrap().clone()
1446 );
1447 }
1448 }
1449 }
1450
1451 mod v0_37 {
1452 use super::*;
1453 use crate::event::v0_37::DeEvent;
1454
1455 async fn read_event(name: &str) -> Event {
1456 DeEvent::from_string(read_json_fixture("v0_37", name).await)
1457 .unwrap()
1458 .into()
1459 }
1460
1461 #[tokio::test]
1462 async fn websocket_client_happy_path() {
1463 let event1 = read_event("subscribe_newblock_0").await;
1464 let event2 = read_event("subscribe_newblock_1").await;
1465 let event3 = read_event("subscribe_newblock_2").await;
1466 let test_events = vec![event1, event2, event3];
1467
1468 println!("Starting WebSocket server...");
1469 let mut server = TestServer::new("127.0.0.1:0", TestRpcVersion::V0_37).await;
1470 println!("Creating client RPC WebSocket connection...");
1471 let url = server.node_addr.clone().try_into().unwrap();
1472 let (client, driver) = WebSocketClient::builder(url)
1473 .compat_mode(CompatMode::V0_37)
1474 .build()
1475 .await
1476 .unwrap();
1477 let driver_handle = tokio::spawn(async move { driver.run().await });
1478
1479 println!("Initiating subscription for new blocks...");
1480 let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap();
1481
1482 let subs_collector_hdl = tokio::spawn(async move {
1484 let mut results = Vec::new();
1485 while let Some(res) = subs.next().await {
1486 results.push(res);
1487 if results.len() == 3 {
1488 break;
1489 }
1490 }
1491 results
1492 });
1493
1494 println!("Publishing events");
1495 for ev in &test_events {
1497 server.publish_event(ev.clone()).unwrap();
1498 }
1499
1500 println!("Collecting results from subscription...");
1501 let collected_results = subs_collector_hdl.await.unwrap();
1502
1503 client.close().unwrap();
1504 server.terminate().await.unwrap();
1505 let _ = driver_handle.await.unwrap();
1506 println!("Closed client and terminated server");
1507
1508 assert_eq!(3, collected_results.len());
1509 for i in 0..3 {
1510 assert_eq!(
1511 test_events[i],
1512 collected_results[i].as_ref().unwrap().clone()
1513 );
1514 }
1515 }
1516 }
1517
1518 mod v0_38 {
1519 use super::*;
1520 use crate::event::v0_38::DeEvent;
1521
1522 async fn read_event(name: &str) -> Event {
1523 DeEvent::from_string(read_json_fixture("v0_38", name).await)
1524 .unwrap()
1525 .into()
1526 }
1527
1528 #[tokio::test]
1529 async fn websocket_client_happy_path() {
1530 let event1 = read_event("subscribe_newblock_0").await;
1531 let event2 = read_event("subscribe_newblock_1").await;
1532 let event3 = read_event("subscribe_newblock_2").await;
1533 let test_events = vec![event1, event2, event3];
1534
1535 println!("Starting WebSocket server...");
1536 let mut server = TestServer::new("127.0.0.1:0", TestRpcVersion::V0_38).await;
1537 println!("Creating client RPC WebSocket connection...");
1538 let url = server.node_addr.clone().try_into().unwrap();
1539 let (client, driver) = WebSocketClient::builder(url)
1540 .compat_mode(CompatMode::V0_37)
1541 .build()
1542 .await
1543 .unwrap();
1544 let driver_handle = tokio::spawn(async move { driver.run().await });
1545
1546 println!("Initiating subscription for new blocks...");
1547 let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap();
1548
1549 let subs_collector_hdl = tokio::spawn(async move {
1551 let mut results = Vec::new();
1552 while let Some(res) = subs.next().await {
1553 results.push(res);
1554 if results.len() == 3 {
1555 break;
1556 }
1557 }
1558 results
1559 });
1560
1561 println!("Publishing events");
1562 for ev in &test_events {
1564 server.publish_event(ev.clone()).unwrap();
1565 }
1566
1567 println!("Collecting results from subscription...");
1568 let collected_results = subs_collector_hdl.await.unwrap();
1569
1570 client.close().unwrap();
1571 server.terminate().await.unwrap();
1572 let _ = driver_handle.await.unwrap();
1573 println!("Closed client and terminated server");
1574
1575 assert_eq!(3, collected_results.len());
1576 for i in 0..3 {
1577 assert_eq!(
1578 test_events[i],
1579 collected_results[i].as_ref().unwrap().clone()
1580 );
1581 }
1582 }
1583 }
1584
1585 fn authorization(req: &http::Request<()>) -> Option<&str> {
1586 req.headers()
1587 .get(AUTHORIZATION)
1588 .map(|h| h.to_str().unwrap())
1589 }
1590
1591 #[test]
1592 fn without_basic_auth() {
1593 let uri = Uri::from_str("http://example.com").unwrap();
1594 let req = uri.into_client_request().unwrap();
1595
1596 assert_eq!(authorization(&req), None);
1597 }
1598
1599 #[test]
1600 fn with_basic_auth() {
1601 let uri = Uri::from_str("http://toto:tata@example.com").unwrap();
1602 let req = uri.into_client_request().unwrap();
1603
1604 assert_eq!(authorization(&req), None);
1605 }
1606}