1use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::{
25 atomic::{AtomicBool, Ordering},
26 Arc,
27 },
28 time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34 header::{
35 AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36 USER_AGENT,
37 },
38 Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44 net::TcpStream,
45 sync::{
46 mpsc::{self, error::TrySendError, Receiver, Sender},
47 oneshot, Mutex, Notify,
48 },
49 task::JoinHandle,
50 time::sleep,
51};
52use tokio_tungstenite::{
53 connect_async,
54 tungstenite::{
55 self,
56 handshake::client::{generate_key, Request},
57 },
58 MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage};
62use uuid::Uuid;
63
64use crate::TYCHO_SERVER_VERSION;
65
66#[derive(Error, Debug)]
67pub enum DeltasError {
68 #[error("Failed to parse URI: {0}. Error: {1}")]
70 UriParsing(String, String),
71
72 #[error("The requested subscription is already pending")]
74 SubscriptionAlreadyPending,
75
76 #[error("{0}")]
79 TransportError(String),
80
81 #[error("The buffer is full!")]
85 BufferFull,
86
87 #[error("The client is not connected!")]
91 NotConnected,
92
93 #[error("The client is already connected!")]
95 AlreadyConnected,
96
97 #[error("The server closed the connection!")]
99 ConnectionClosed,
100
101 #[error("Connection error: {0}")]
103 ConnectionError(#[from] Box<tungstenite::Error>),
104
105 #[error("Tycho FatalError: {0}")]
107 Fatal(String),
108}
109
110#[derive(Clone, Debug)]
111pub struct SubscriptionOptions {
112 include_state: bool,
113}
114
115impl Default for SubscriptionOptions {
116 fn default() -> Self {
117 Self { include_state: true }
118 }
119}
120
121impl SubscriptionOptions {
122 pub fn new() -> Self {
123 Self::default()
124 }
125 pub fn with_state(mut self, val: bool) -> Self {
126 self.include_state = val;
127 self
128 }
129}
130
131#[cfg_attr(test, automock)]
132#[async_trait]
133pub trait DeltasClient {
134 async fn subscribe(
141 &self,
142 extractor_id: ExtractorIdentity,
143 options: SubscriptionOptions,
144 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;
145
146 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
148
149 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
151
152 async fn close(&self) -> Result<(), DeltasError>;
154}
155
156#[derive(Clone)]
157pub struct WsDeltasClient {
158 uri: Uri,
160 auth_key: Option<String>,
162 max_reconnects: u32,
164 ws_buffer_size: usize,
167 subscription_buffer_size: usize,
170 conn_notify: Arc<Notify>,
172 inner: Arc<Mutex<Option<Inner>>>,
174 dead: Arc<AtomicBool>,
176}
177
178type WebSocketSink =
179 SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
180
181#[derive(Debug)]
193enum SubscriptionInfo {
194 RequestedSubscription(oneshot::Sender<(Uuid, Receiver<BlockChanges>)>),
196 Active,
198 RequestedUnsubscription(oneshot::Sender<()>),
200}
201
202struct Inner {
204 sink: WebSocketSink,
206 cmd_tx: Sender<()>,
208 pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
210 subscriptions: HashMap<Uuid, SubscriptionInfo>,
212 sender: HashMap<Uuid, Sender<BlockChanges>>,
215 buffer_size: usize,
217}
218
219impl Inner {
223 fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
224 Self {
225 sink,
226 cmd_tx,
227 pending: HashMap::new(),
228 subscriptions: HashMap::new(),
229 sender: HashMap::new(),
230 buffer_size,
231 }
232 }
233
234 #[allow(clippy::result_large_err)]
236 fn new_subscription(
237 &mut self,
238 id: &ExtractorIdentity,
239 ready_tx: oneshot::Sender<(Uuid, Receiver<BlockChanges>)>,
240 ) -> Result<(), DeltasError> {
241 if self.pending.contains_key(id) {
242 return Err(DeltasError::SubscriptionAlreadyPending);
243 }
244 self.pending
245 .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
246 Ok(())
247 }
248
249 fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
253 if let Some(info) = self.pending.remove(extractor_id) {
254 if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
255 let (tx, rx) = mpsc::channel(self.buffer_size);
256 self.sender.insert(subscription_id, tx);
257 self.subscriptions
258 .insert(subscription_id, SubscriptionInfo::Active);
259 let _ = ready_tx
260 .send((subscription_id, rx))
261 .map_err(|_| {
262 warn!(
263 ?extractor_id,
264 ?subscription_id,
265 "Subscriber for has gone away. Ignoring."
266 )
267 });
268 } else {
269 error!(
270 ?extractor_id,
271 ?subscription_id,
272 "Pending subscription was not in the correct state to
273 transition to active. Ignoring!"
274 )
275 }
276 } else {
277 error!(
278 ?extractor_id,
279 ?subscription_id,
280 "Tried to mark an unkown subscription as active. Ignoring!"
281 );
282 }
283 }
284
285 #[allow(clippy::result_large_err)]
287 fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
288 if let Some(sender) = self.sender.get_mut(id) {
289 sender
290 .try_send(msg)
291 .map_err(|e| match e {
292 TrySendError::Full(_) => DeltasError::BufferFull,
293 TrySendError::Closed(_) => {
294 DeltasError::TransportError("The subscriber has gone away".to_string())
295 }
296 })?;
297 }
298 Ok(())
299 }
300
301 fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
306 if let Some(info) = self
307 .subscriptions
308 .get_mut(subscription_id)
309 {
310 if let SubscriptionInfo::Active = info {
311 *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
312 }
313 } else {
314 debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
316 }
317 }
318
319 fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
326 if let Entry::Occupied(e) = self
327 .subscriptions
328 .entry(subscription_id)
329 {
330 let info = e.remove();
331 if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
332 let _ = tx.send(()).map_err(|_| {
333 warn!(?subscription_id, "failed to notify about removed subscription")
334 });
335 self.sender
336 .remove(&subscription_id)
337 .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
338 } else {
339 warn!(?subscription_id, "Subscription ended unexpectedly!");
340 self.sender
341 .remove(&subscription_id)
342 .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
343 }
344 } else {
345 error!(
346 ?subscription_id,
347 "Received `SubscriptionEnded`, but was never subscribed
348 to it. This is likely a bug!"
349 );
350 }
351
352 Ok(())
353 }
354
355 async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
357 self.sink.send(msg).await.map_err(|e| {
358 DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
359 })
360 }
361}
362
363impl WsDeltasClient {
365 #[allow(clippy::result_large_err)]
367 pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
368 let uri = ws_uri
369 .parse::<Uri>()
370 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
371 Ok(Self {
372 uri,
373 auth_key: auth_key.map(|s| s.to_string()),
374 inner: Arc::new(Mutex::new(None)),
375 ws_buffer_size: 128,
376 subscription_buffer_size: 128,
377 conn_notify: Arc::new(Notify::new()),
378 max_reconnects: 5,
379 dead: Arc::new(AtomicBool::new(false)),
380 })
381 }
382
383 #[allow(clippy::result_large_err)]
385 pub fn new_with_reconnects(
386 ws_uri: &str,
387 max_reconnects: u32,
388 auth_key: Option<&str>,
389 ) -> Result<Self, DeltasError> {
390 let uri = ws_uri
391 .parse::<Uri>()
392 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
393
394 Ok(Self {
395 uri,
396 auth_key: auth_key.map(|s| s.to_string()),
397 inner: Arc::new(Mutex::new(None)),
398 ws_buffer_size: 128,
399 subscription_buffer_size: 128,
400 conn_notify: Arc::new(Notify::new()),
401 max_reconnects,
402 dead: Arc::new(AtomicBool::new(false)),
403 })
404 }
405
406 #[cfg(test)]
408 #[allow(clippy::result_large_err)]
409 pub fn new_with_custom_buffers(
410 ws_uri: &str,
411 auth_key: Option<&str>,
412 ws_buffer_size: usize,
413 subscription_buffer_size: usize,
414 ) -> Result<Self, DeltasError> {
415 let uri = ws_uri
416 .parse::<Uri>()
417 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
418 Ok(Self {
419 uri,
420 auth_key: auth_key.map(|s| s.to_string()),
421 inner: Arc::new(Mutex::new(None)),
422 ws_buffer_size,
423 subscription_buffer_size,
424 conn_notify: Arc::new(Notify::new()),
425 max_reconnects: 5,
426 dead: Arc::new(AtomicBool::new(false)),
427 })
428 }
429
430 async fn is_connected(&self) -> bool {
434 let guard = self.inner.as_ref().lock().await;
435 guard.is_some()
436 }
437
438 async fn ensure_connection(&self) -> Result<(), DeltasError> {
443 if self.dead.load(Ordering::SeqCst) {
444 return Err(DeltasError::NotConnected)
445 };
446 if !self.is_connected().await {
447 self.conn_notify.notified().await;
448 };
449 Ok(())
450 }
451
452 #[instrument(skip(self, msg))]
457 async fn handle_msg(
458 &self,
459 msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
460 ) -> Result<(), DeltasError> {
461 let mut guard = self.inner.lock().await;
462
463 match msg {
464 Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
469 serde_json::Value,
470 >(&text)
471 {
472 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
473 Ok(ws_message) => match ws_message {
474 WebSocketMessage::BlockChanges { subscription_id, deltas } => {
475 trace!(?deltas, "Received a block state change, sending to channel");
476 let inner = guard
477 .as_mut()
478 .ok_or_else(|| DeltasError::NotConnected)?;
479 match inner.send(&subscription_id, deltas) {
480 Err(DeltasError::BufferFull) => {
481 error!(?subscription_id, "Buffer full, unsubscribing!");
482 Self::force_unsubscribe(subscription_id, inner).await;
483 }
484 Err(_) => {
485 warn!(
486 ?subscription_id,
487 "Receiver for has gone away, unsubscribing!"
488 );
489 Self::force_unsubscribe(subscription_id, inner).await;
490 }
491 _ => { }
492 }
493 }
494 WebSocketMessage::Response(Response::NewSubscription {
495 extractor_id,
496 subscription_id,
497 }) => {
498 info!(?extractor_id, ?subscription_id, "Received a new subscription");
499 let inner = guard
500 .as_mut()
501 .ok_or_else(|| DeltasError::NotConnected)?;
502 inner.mark_active(&extractor_id, subscription_id);
503 }
504 WebSocketMessage::Response(Response::SubscriptionEnded {
505 subscription_id,
506 }) => {
507 info!(?subscription_id, "Received a subscription ended");
508 let inner = guard
509 .as_mut()
510 .ok_or_else(|| DeltasError::NotConnected)?;
511 inner.remove_subscription(subscription_id)?;
512 }
513 },
514 Err(e) => {
515 error!(
516 "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
517 e, text
518 );
519 }
520 },
521 Err(e) => {
522 error!(
523 "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
524 e, text
525 );
526 }
527 },
528 Ok(tungstenite::protocol::Message::Ping(_)) => {
529 let inner = guard
531 .as_mut()
532 .ok_or_else(|| DeltasError::NotConnected)?;
533 if let Err(error) = inner
534 .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
535 .await
536 {
537 debug!(?error, "Failed to send pong!");
538 }
539 }
540 Ok(tungstenite::protocol::Message::Pong(_)) => {
541 }
543 Ok(tungstenite::protocol::Message::Close(_)) => {
544 return Err(DeltasError::ConnectionClosed);
545 }
546 Ok(unknown_msg) => {
547 info!("Received an unknown message type: {:?}", unknown_msg);
548 }
549 Err(error) => {
550 error!(?error, "Websocket error");
551 return Err(match error {
552 tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
553 tungstenite::Error::AlreadyClosed => {
554 warn!("Received AlreadyClosed error which is indicative of a bug!");
555 DeltasError::ConnectionError(Box::new(error))
556 }
557 tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
558 DeltasError::ConnectionError(Box::new(error))
559 }
560 _ => DeltasError::Fatal(error.to_string()),
561 });
562 }
563 };
564 Ok(())
565 }
566
567 async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
572 let (tx, rx) = oneshot::channel();
573 if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
574 warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
575 } else {
576 match tokio::time::timeout(Duration::from_secs(5), rx).await {
578 Ok(_) => {
579 debug!(?subscription_id, "Unsubscribe completed successfully");
580 }
581 Err(_) => {
582 warn!(?subscription_id, "Unsubscribe completion timed out");
583 }
584 }
585 }
586 }
587
588 async fn unsubscribe_inner(
594 inner: &mut Inner,
595 subscription_id: Uuid,
596 ready_tx: oneshot::Sender<()>,
597 ) -> Result<(), DeltasError> {
598 debug!(?subscription_id, "Unsubscribing");
599 inner.end_subscription(&subscription_id, ready_tx);
600 let cmd = Command::Unsubscribe { subscription_id };
601 inner
602 .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
603 |e| {
604 DeltasError::TransportError(format!(
605 "Failed to serialize unsubscribe command: {e}"
606 ))
607 },
608 )?))
609 .await?;
610 Ok(())
611 }
612}
613
614#[async_trait]
615impl DeltasClient for WsDeltasClient {
616 #[instrument(skip(self))]
617 async fn subscribe(
618 &self,
619 extractor_id: ExtractorIdentity,
620 options: SubscriptionOptions,
621 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
622 trace!("Starting subscribe");
623 self.ensure_connection().await?;
624 let (ready_tx, ready_rx) = oneshot::channel();
625 {
626 let mut guard = self.inner.lock().await;
627 let inner = guard
628 .as_mut()
629 .ok_or_else(|| DeltasError::NotConnected)?;
630 trace!("Sending subscribe command");
631 inner.new_subscription(&extractor_id, ready_tx)?;
632 let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
633 inner
634 .ws_send(tungstenite::protocol::Message::Text(
635 serde_json::to_string(&cmd).map_err(|e| {
636 DeltasError::TransportError(format!(
637 "Failed to serialize subscribe command: {e}"
638 ))
639 })?,
640 ))
641 .await?;
642 }
643 trace!("Waiting for subscription response");
644 let rx = ready_rx.await.map_err(|_| {
645 DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
646 })?;
647 trace!("Subscription successful");
648 Ok(rx)
649 }
650
651 #[instrument(skip(self))]
652 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
653 self.ensure_connection().await?;
654 let (ready_tx, ready_rx) = oneshot::channel();
655 {
656 let mut guard = self.inner.lock().await;
657 let inner = guard
658 .as_mut()
659 .ok_or_else(|| DeltasError::NotConnected)?;
660
661 WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
662 }
663 ready_rx.await.map_err(|_| {
664 DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
665 })?;
666
667 Ok(())
668 }
669
670 #[instrument(skip(self))]
671 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
672 if self.is_connected().await {
673 return Err(DeltasError::AlreadyConnected);
674 }
675 let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
676 info!(?ws_uri, "Starting TychoWebsocketClient");
677
678 let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
679 {
680 let mut guard = self.inner.as_ref().lock().await;
681 *guard = None;
682 }
683 let this = self.clone();
684 let jh = tokio::spawn(async move {
685 let mut retry_count = 0;
686 let mut result = Err(DeltasError::NotConnected);
687
688 'retry: while retry_count < this.max_reconnects {
689 info!(?ws_uri, retry_count, "Connecting to WebSocket server");
690
691 let mut request_builder = Request::builder()
693 .uri(&ws_uri)
694 .header(SEC_WEBSOCKET_KEY, generate_key())
695 .header(SEC_WEBSOCKET_VERSION, 13)
696 .header(CONNECTION, "Upgrade")
697 .header(UPGRADE, "websocket")
698 .header(
699 HOST,
700 this.uri.host().ok_or_else(|| {
701 DeltasError::UriParsing(
702 ws_uri.clone(),
703 "No host found in tycho url".to_string(),
704 )
705 })?,
706 )
707 .header(
708 USER_AGENT,
709 format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
710 );
711
712 if let Some(ref key) = this.auth_key {
714 request_builder = request_builder.header(AUTHORIZATION, key);
715 }
716
717 let request = request_builder.body(()).map_err(|e| {
718 DeltasError::TransportError(format!("Failed to build connection request: {e}"))
719 })?;
720 let (conn, _) = match connect_async(request).await {
721 Ok(conn) => conn,
722 Err(e) => {
723 retry_count += 1;
725 let mut guard = this.inner.as_ref().lock().await;
726 *guard = None;
727
728 warn!(
729 e = e.to_string(),
730 "Failed to connect to WebSocket server; Reconnecting"
731 );
732 sleep(Duration::from_millis(500)).await;
733
734 continue 'retry;
735 }
736 };
737
738 let (ws_tx_new, ws_rx_new) = conn.split();
739 {
740 let mut guard = this.inner.as_ref().lock().await;
741 *guard =
742 Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
743 }
744 let mut msg_rx = ws_rx_new.boxed();
745
746 info!("Connection Successful: TychoWebsocketClient started");
747 this.conn_notify.notify_waiters();
748 result = Ok(());
749
750 loop {
751 let res = tokio::select! {
752 msg = msg_rx.next() => match msg {
753 Some(msg) => this.handle_msg(msg).await,
754 None => { break 'retry } },
756 _ = cmd_rx.recv() => {break 'retry},
757 };
758 if let Err(error) = res {
759 debug!(?error, "WsError");
760 if matches!(
761 error,
762 DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
763 ) {
764 retry_count += 1;
766 let mut guard = this.inner.as_ref().lock().await;
767 *guard = None;
768
769 warn!(
770 ?error,
771 ?retry_count,
772 "Connection dropped unexpectedly; Reconnecting..."
773 );
774 break;
775 } else {
776 error!(?error, "Fatal error; Exiting");
778 result = Err(error);
779 break 'retry;
780 }
781 }
782 }
783 }
784 debug!(
785 retry_count,
786 max_reconnects=?this.max_reconnects,
787 "Reconnection loop ended"
788 );
789 let mut guard = this.inner.as_ref().lock().await;
791 *guard = None;
792
793 if retry_count >= this.max_reconnects {
795 error!("Max reconnection attempts reached; Exiting");
796 this.dead.store(true, Ordering::SeqCst);
797 this.conn_notify.notify_waiters(); result = Err(DeltasError::ConnectionClosed);
799 }
800
801 result
802 });
803
804 self.conn_notify.notified().await;
805
806 if self.is_connected().await {
807 Ok(jh)
808 } else {
809 Err(DeltasError::NotConnected)
810 }
811 }
812
813 #[instrument(skip(self))]
814 async fn close(&self) -> Result<(), DeltasError> {
815 info!("Closing TychoWebsocketClient");
816 let mut guard = self.inner.lock().await;
817 let inner = guard
818 .as_mut()
819 .ok_or_else(|| DeltasError::NotConnected)?;
820 inner
821 .cmd_tx
822 .send(())
823 .await
824 .map_err(|e| DeltasError::TransportError(e.to_string()))?;
825 Ok(())
826 }
827}
828
829#[cfg(test)]
830mod tests {
831 use std::net::SocketAddr;
832
833 use test_log::test;
834 use tokio::{net::TcpListener, time::timeout};
835 use tycho_common::dto::Chain;
836
837 use super::*;
838
839 #[derive(Clone)]
840 enum ExpectedComm {
841 Receive(u64, tungstenite::protocol::Message),
842 Send(tungstenite::protocol::Message),
843 }
844
845 async fn mock_tycho_ws(
846 messages: &[ExpectedComm],
847 reconnects: usize,
848 ) -> (SocketAddr, JoinHandle<()>) {
849 info!("Starting mock webserver");
850 let server = TcpListener::bind("127.0.0.1:0")
852 .await
853 .expect("localhost bind failed");
854 let addr = server.local_addr().unwrap();
855 let messages = messages.to_vec();
856
857 let jh = tokio::spawn(async move {
858 info!("mock webserver started");
859 for _ in 0..(reconnects + 1) {
860 if let Ok((stream, _)) = server.accept().await {
861 let mut websocket = tokio_tungstenite::accept_async(stream)
862 .await
863 .unwrap();
864
865 info!("Handling messages..");
866 for c in messages.iter().cloned() {
867 match c {
868 ExpectedComm::Receive(t, exp) => {
869 info!("Awaiting message...");
870 let msg = timeout(Duration::from_millis(t), websocket.next())
871 .await
872 .expect("Receive timeout")
873 .expect("Stream exhausted")
874 .expect("Failed to receive message.");
875 info!("Message received");
876 assert_eq!(msg, exp)
877 }
878 ExpectedComm::Send(data) => {
879 info!("Sending message");
880 websocket
881 .send(data)
882 .await
883 .expect("Failed to send message");
884 info!("Message sent");
885 }
886 };
887 }
888 sleep(Duration::from_millis(100)).await;
889 let _ = websocket.close(None).await;
891 }
892 }
893 });
894 (addr, jh)
895 }
896
897 #[tokio::test]
898 async fn test_subscribe_receive() {
899 let exp_comm = [
900 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
901 {
902 "method":"subscribe",
903 "extractor_id":{
904 "chain":"ethereum",
905 "name":"vm:ambient"
906 },
907 "include_state": true
908 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
909 )),
910 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
911 {
912 "method":"newsubscription",
913 "extractor_id":{
914 "chain":"ethereum",
915 "name":"vm:ambient"
916 },
917 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
918 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
919 )),
920 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
921 {
922 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
923 "deltas": {
924 "extractor": "vm:ambient",
925 "chain": "ethereum",
926 "block": {
927 "number": 123,
928 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
929 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
930 "chain": "ethereum",
931 "ts": "2023-09-14T00:00:00"
932 },
933 "finalized_block_height": 0,
934 "revert": false,
935 "new_tokens": {},
936 "account_updates": {
937 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
938 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
939 "chain": "ethereum",
940 "slots": {},
941 "balance": "0x01f4",
942 "code": "",
943 "change": "Update"
944 }
945 },
946 "state_updates": {
947 "component_1": {
948 "component_id": "component_1",
949 "updated_attributes": {"attr1": "0x01"},
950 "deleted_attributes": ["attr2"]
951 }
952 },
953 "new_protocol_components":
954 { "protocol_1": {
955 "id": "protocol_1",
956 "protocol_system": "system_1",
957 "protocol_type_name": "type_1",
958 "chain": "ethereum",
959 "tokens": ["0x01", "0x02"],
960 "contract_ids": ["0x01", "0x02"],
961 "static_attributes": {"attr1": "0x01f4"},
962 "change": "Update",
963 "creation_tx": "0x01",
964 "created_at": "2023-09-14T00:00:00"
965 }
966 },
967 "deleted_protocol_components": {},
968 "component_balances": {
969 "protocol_1":
970 {
971 "0x01": {
972 "token": "0x01",
973 "balance": "0x01f4",
974 "balance_float": 0.0,
975 "modify_tx": "0x01",
976 "component_id": "protocol_1"
977 }
978 }
979 },
980 "account_balances": {
981 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
982 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
983 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
984 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
985 "balance": "0x01f4",
986 "modify_tx": "0x01"
987 }
988 }
989 },
990 "component_tvl": {
991 "protocol_1": 1000.0
992 },
993 "dci_update": {
994 "new_entrypoints": {},
995 "new_entrypoint_params": {},
996 "trace_results": {}
997 }
998 }
999 }
1000 "#.to_owned()
1001 ))
1002 ];
1003 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1004
1005 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1006 let jh = client
1007 .connect()
1008 .await
1009 .expect("connect failed");
1010 let (_, mut rx) = timeout(
1011 Duration::from_millis(100),
1012 client.subscribe(
1013 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1014 SubscriptionOptions::new(),
1015 ),
1016 )
1017 .await
1018 .expect("subscription timed out")
1019 .expect("subscription failed");
1020 let _ = timeout(Duration::from_millis(100), rx.recv())
1021 .await
1022 .expect("awaiting message timeout out")
1023 .expect("receiving message failed");
1024 timeout(Duration::from_millis(100), client.close())
1025 .await
1026 .expect("close timed out")
1027 .expect("close failed");
1028 jh.await
1029 .expect("ws loop errored")
1030 .unwrap();
1031 server_thread.await.unwrap();
1032 }
1033
1034 #[tokio::test]
1035 async fn test_unsubscribe() {
1036 let exp_comm = [
1037 ExpectedComm::Receive(
1038 100,
1039 tungstenite::protocol::Message::Text(
1040 r#"
1041 {
1042 "method": "subscribe",
1043 "extractor_id":{
1044 "chain": "ethereum",
1045 "name": "vm:ambient"
1046 },
1047 "include_state": true
1048 }"#
1049 .to_owned()
1050 .replace(|c: char| c.is_whitespace(), ""),
1051 ),
1052 ),
1053 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1054 r#"
1055 {
1056 "method": "newsubscription",
1057 "extractor_id":{
1058 "chain": "ethereum",
1059 "name": "vm:ambient"
1060 },
1061 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1062 }"#
1063 .to_owned()
1064 .replace(|c: char| c.is_whitespace(), ""),
1065 )),
1066 ExpectedComm::Receive(
1067 100,
1068 tungstenite::protocol::Message::Text(
1069 r#"
1070 {
1071 "method": "unsubscribe",
1072 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1073 }
1074 "#
1075 .to_owned()
1076 .replace(|c: char| c.is_whitespace(), ""),
1077 ),
1078 ),
1079 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1080 r#"
1081 {
1082 "method": "subscriptionended",
1083 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1084 }
1085 "#
1086 .to_owned()
1087 .replace(|c: char| c.is_whitespace(), ""),
1088 )),
1089 ];
1090 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1091
1092 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1093 let jh = client
1094 .connect()
1095 .await
1096 .expect("connect failed");
1097 let (sub_id, mut rx) = timeout(
1098 Duration::from_millis(100),
1099 client.subscribe(
1100 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1101 SubscriptionOptions::new(),
1102 ),
1103 )
1104 .await
1105 .expect("subscription timed out")
1106 .expect("subscription failed");
1107
1108 timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1109 .await
1110 .expect("unsubscribe timed out")
1111 .expect("unsubscribe failed");
1112 let res = timeout(Duration::from_millis(100), rx.recv())
1113 .await
1114 .expect("awaiting message timeout out");
1115
1116 assert!(res.is_none());
1118
1119 timeout(Duration::from_millis(100), client.close())
1120 .await
1121 .expect("close timed out")
1122 .expect("close failed");
1123 jh.await
1124 .expect("ws loop errored")
1125 .unwrap();
1126 server_thread.await.unwrap();
1127 }
1128
1129 #[tokio::test]
1130 async fn test_subscription_unexpected_end() {
1131 let exp_comm = [
1132 ExpectedComm::Receive(
1133 100,
1134 tungstenite::protocol::Message::Text(
1135 r#"
1136 {
1137 "method":"subscribe",
1138 "extractor_id":{
1139 "chain":"ethereum",
1140 "name":"vm:ambient"
1141 },
1142 "include_state": true
1143 }"#
1144 .to_owned()
1145 .replace(|c: char| c.is_whitespace(), ""),
1146 ),
1147 ),
1148 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1149 r#"
1150 {
1151 "method":"newsubscription",
1152 "extractor_id":{
1153 "chain":"ethereum",
1154 "name":"vm:ambient"
1155 },
1156 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1157 }"#
1158 .to_owned()
1159 .replace(|c: char| c.is_whitespace(), ""),
1160 )),
1161 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1162 r#"
1163 {
1164 "method": "subscriptionended",
1165 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1166 }"#
1167 .to_owned()
1168 .replace(|c: char| c.is_whitespace(), ""),
1169 )),
1170 ];
1171 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1172
1173 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1174 let jh = client
1175 .connect()
1176 .await
1177 .expect("connect failed");
1178 let (_, mut rx) = timeout(
1179 Duration::from_millis(100),
1180 client.subscribe(
1181 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1182 SubscriptionOptions::new(),
1183 ),
1184 )
1185 .await
1186 .expect("subscription timed out")
1187 .expect("subscription failed");
1188 let res = timeout(Duration::from_millis(100), rx.recv())
1189 .await
1190 .expect("awaiting message timeout out");
1191
1192 assert!(res.is_none());
1194
1195 timeout(Duration::from_millis(100), client.close())
1196 .await
1197 .expect("close timed out")
1198 .expect("close failed");
1199 jh.await
1200 .expect("ws loop errored")
1201 .unwrap();
1202 server_thread.await.unwrap();
1203 }
1204
1205 #[test_log::test(tokio::test)]
1206 async fn test_reconnect() {
1207 let exp_comm = [
1208 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
1209 {
1210 "method":"subscribe",
1211 "extractor_id":{
1212 "chain":"ethereum",
1213 "name":"vm:ambient"
1214 },
1215 "include_state": true
1216 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1217 )),
1218 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1219 {
1220 "method":"newsubscription",
1221 "extractor_id":{
1222 "chain":"ethereum",
1223 "name":"vm:ambient"
1224 },
1225 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1226 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1227 )),
1228 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1229 {
1230 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1231 "deltas": {
1232 "extractor": "vm:ambient",
1233 "chain": "ethereum",
1234 "block": {
1235 "number": 123,
1236 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1237 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1238 "chain": "ethereum",
1239 "ts": "2023-09-14T00:00:00"
1240 },
1241 "finalized_block_height": 0,
1242 "revert": false,
1243 "new_tokens": {},
1244 "account_updates": {
1245 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1246 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1247 "chain": "ethereum",
1248 "slots": {},
1249 "balance": "0x01f4",
1250 "code": "",
1251 "change": "Update"
1252 }
1253 },
1254 "state_updates": {
1255 "component_1": {
1256 "component_id": "component_1",
1257 "updated_attributes": {"attr1": "0x01"},
1258 "deleted_attributes": ["attr2"]
1259 }
1260 },
1261 "new_protocol_components": {
1262 "protocol_1":
1263 {
1264 "id": "protocol_1",
1265 "protocol_system": "system_1",
1266 "protocol_type_name": "type_1",
1267 "chain": "ethereum",
1268 "tokens": ["0x01", "0x02"],
1269 "contract_ids": ["0x01", "0x02"],
1270 "static_attributes": {"attr1": "0x01f4"},
1271 "change": "Update",
1272 "creation_tx": "0x01",
1273 "created_at": "2023-09-14T00:00:00"
1274 }
1275 },
1276 "deleted_protocol_components": {},
1277 "component_balances": {
1278 "protocol_1": {
1279 "0x01": {
1280 "token": "0x01",
1281 "balance": "0x01f4",
1282 "balance_float": 1000.0,
1283 "modify_tx": "0x01",
1284 "component_id": "protocol_1"
1285 }
1286 }
1287 },
1288 "account_balances": {
1289 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1290 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1291 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1292 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1293 "balance": "0x01f4",
1294 "modify_tx": "0x01"
1295 }
1296 }
1297 },
1298 "component_tvl": {
1299 "protocol_1": 1000.0
1300 },
1301 "dci_update": {
1302 "new_entrypoints": {},
1303 "new_entrypoint_params": {},
1304 "trace_results": {}
1305 }
1306 }
1307 }
1308 "#.to_owned()
1309 ))
1310 ];
1311 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1312 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1313 let jh: JoinHandle<Result<(), DeltasError>> = client
1314 .connect()
1315 .await
1316 .expect("connect failed");
1317
1318 for _ in 0..2 {
1319 let (_, mut rx) = timeout(
1320 Duration::from_millis(100),
1321 client.subscribe(
1322 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1323 SubscriptionOptions::new(),
1324 ),
1325 )
1326 .await
1327 .expect("subscription timed out")
1328 .expect("subscription failed");
1329
1330 let _ = timeout(Duration::from_millis(100), rx.recv())
1331 .await
1332 .expect("awaiting message timeout out")
1333 .expect("receiving message failed");
1334
1335 let res = timeout(Duration::from_millis(200), rx.recv())
1337 .await
1338 .expect("awaiting closed connection timeout out");
1339 assert!(res.is_none());
1340 }
1341 let res = jh.await.expect("ws client join failed");
1342 assert!(res.is_err());
1344 server_thread
1345 .await
1346 .expect("ws server loop errored");
1347 }
1348
1349 async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1350 let server = TcpListener::bind("127.0.0.1:0")
1351 .await
1352 .expect("localhost bind failed");
1353 let addr = server.local_addr().unwrap();
1354 let jh = tokio::spawn(async move {
1355 while let Ok((stream, _)) = server.accept().await {
1356 if accept_first {
1357 let stream = tokio_tungstenite::accept_async(stream)
1359 .await
1360 .unwrap();
1361 sleep(Duration::from_millis(10)).await;
1362 drop(stream)
1363 } else {
1364 drop(stream);
1366 }
1367 }
1368 });
1369 (addr, jh)
1370 }
1371
1372 #[test(tokio::test)]
1373 async fn test_subscribe_dead_client_after_max_attempts() {
1374 let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1375 let client = WsDeltasClient::new_with_reconnects(&format!("ws://{addr}"), 3, None).unwrap();
1376
1377 let join_handle = client.connect().await.unwrap();
1378 let handle_res = join_handle.await.unwrap();
1379 assert!(handle_res.is_err());
1380 assert!(!client.is_connected().await);
1381
1382 let subscription_res = timeout(
1383 Duration::from_millis(10),
1384 client.subscribe(
1385 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1386 SubscriptionOptions::new(),
1387 ),
1388 )
1389 .await
1390 .unwrap();
1391 assert!(subscription_res.is_err());
1392 }
1393
1394 #[test_log::test(tokio::test)]
1395 async fn test_buffer_full_triggers_unsubscribe() {
1396 let exp_comm = {
1398 [
1399 ExpectedComm::Receive(
1401 100,
1402 tungstenite::protocol::Message::Text(
1403 r#"
1404 {
1405 "method":"subscribe",
1406 "extractor_id":{
1407 "chain":"ethereum",
1408 "name":"vm:ambient"
1409 },
1410 "include_state": true
1411 }"#
1412 .to_owned()
1413 .replace(|c: char| c.is_whitespace(), ""),
1414 ),
1415 ),
1416 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1418 r#"
1419 {
1420 "method":"newsubscription",
1421 "extractor_id":{
1422 "chain":"ethereum",
1423 "name":"vm:ambient"
1424 },
1425 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1426 }"#
1427 .to_owned()
1428 .replace(|c: char| c.is_whitespace(), ""),
1429 )),
1430 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1432 r#"
1433 {
1434 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1435 "deltas": {
1436 "extractor": "vm:ambient",
1437 "chain": "ethereum",
1438 "block": {
1439 "number": 123,
1440 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1441 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1442 "chain": "ethereum",
1443 "ts": "2023-09-14T00:00:00"
1444 },
1445 "finalized_block_height": 0,
1446 "revert": false,
1447 "new_tokens": {},
1448 "account_updates": {},
1449 "state_updates": {},
1450 "new_protocol_components": {},
1451 "deleted_protocol_components": {},
1452 "component_balances": {},
1453 "account_balances": {},
1454 "component_tvl": {},
1455 "dci_update": {
1456 "new_entrypoints": {},
1457 "new_entrypoint_params": {},
1458 "trace_results": {}
1459 }
1460 }
1461 }
1462 "#.to_owned()
1463 )),
1464 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1466 r#"
1467 {
1468 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1469 "deltas": {
1470 "extractor": "vm:ambient",
1471 "chain": "ethereum",
1472 "block": {
1473 "number": 124,
1474 "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
1475 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1476 "chain": "ethereum",
1477 "ts": "2023-09-14T00:00:01"
1478 },
1479 "finalized_block_height": 0,
1480 "revert": false,
1481 "new_tokens": {},
1482 "account_updates": {},
1483 "state_updates": {},
1484 "new_protocol_components": {},
1485 "deleted_protocol_components": {},
1486 "component_balances": {},
1487 "account_balances": {},
1488 "component_tvl": {},
1489 "dci_update": {
1490 "new_entrypoints": {},
1491 "new_entrypoint_params": {},
1492 "trace_results": {}
1493 }
1494 }
1495 }
1496 "#.to_owned()
1497 )),
1498 ExpectedComm::Receive(
1500 100,
1501 tungstenite::protocol::Message::Text(
1502 r#"
1503 {
1504 "method": "unsubscribe",
1505 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1506 }
1507 "#
1508 .to_owned()
1509 .replace(|c: char| c.is_whitespace(), ""),
1510 ),
1511 ),
1512 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1514 r#"
1515 {
1516 "method": "subscriptionended",
1517 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1518 }
1519 "#
1520 .to_owned()
1521 .replace(|c: char| c.is_whitespace(), ""),
1522 )),
1523 ]
1524 };
1525
1526 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1527
1528 let client = WsDeltasClient::new_with_custom_buffers(
1530 &format!("ws://{addr}"),
1531 None,
1532 128, 1, )
1535 .unwrap();
1536
1537 let jh = client
1538 .connect()
1539 .await
1540 .expect("connect failed");
1541
1542 let (_sub_id, mut rx) = timeout(
1543 Duration::from_millis(100),
1544 client.subscribe(
1545 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1546 SubscriptionOptions::new(),
1547 ),
1548 )
1549 .await
1550 .expect("subscription timed out")
1551 .expect("subscription failed");
1552
1553 tokio::time::sleep(Duration::from_millis(100)).await;
1555
1556 let mut received_msgs = Vec::new();
1558
1559 while received_msgs.len() < 3 {
1561 match timeout(Duration::from_millis(200), rx.recv()).await {
1562 Ok(Some(msg)) => {
1563 received_msgs.push(msg);
1564 }
1565 Ok(None) => {
1566 break;
1568 }
1569 Err(_) => {
1570 break;
1572 }
1573 }
1574 }
1575
1576 assert!(
1578 received_msgs.len() <= 1,
1579 "Expected buffer overflow to limit messages to at most 1, got {}",
1580 received_msgs.len()
1581 );
1582
1583 if let Some(first_msg) = received_msgs.first() {
1584 assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
1585 }
1586
1587 drop(rx); tokio::time::sleep(Duration::from_millis(50)).await;
1594
1595 jh.abort();
1597 server_thread.abort();
1598
1599 let _ = jh.await;
1600 let _ = server_thread.await;
1601 }
1602}