1use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::{
25 atomic::{AtomicBool, Ordering},
26 Arc,
27 },
28 time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34 header::{
35 AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36 USER_AGENT,
37 },
38 Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44 net::TcpStream,
45 sync::{
46 mpsc::{self, error::TrySendError, Receiver, Sender},
47 oneshot, Mutex, Notify,
48 },
49 task::JoinHandle,
50 time::sleep,
51};
52use tokio_tungstenite::{
53 connect_async,
54 tungstenite::{
55 self,
56 handshake::client::{generate_key, Request},
57 },
58 MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage};
62use uuid::Uuid;
63
64use crate::TYCHO_SERVER_VERSION;
65
/// Errors reported by the deltas client.
#[derive(Error, Debug)]
pub enum DeltasError {
    /// The configured URI could not be used. Carries the offending URI and a description of
    /// the problem (a parse failure, or a URI without a host).
    #[error("Failed to parse URI: {0}. Error: {1}")]
    UriParsing(String, String),

    /// A subscription request for the same extractor is still awaiting confirmation.
    #[error("The requested subscription is already pending")]
    SubscriptionAlreadyPending,

    /// A message could not be serialized, sent or delivered; the payload describes the failure.
    #[error("{0}")]
    TransportError(String),

    /// The channel towards a subscriber had no free capacity for another message.
    #[error("The buffer is full!")]
    BufferFull,

    /// The operation requires an established connection, but there is none.
    #[error("The client is not connected!")]
    NotConnected,

    /// `connect` was called while a connection was already established.
    #[error("The client is already connected!")]
    AlreadyConnected,

    /// The connection was closed (close frame, closed socket, or reconnect budget exhausted).
    #[error("The server closed the connection!")]
    ConnectionClosed,

    /// A websocket level error; wraps the underlying (boxed) tungstenite error.
    #[error("Connection error: {0}")]
    ConnectionError(#[from] Box<tungstenite::Error>),

    /// An unrecoverable error, e.g. inconsistent internal state or an unexpected websocket error.
    #[error("Tycho FatalError: {0}")]
    Fatal(String),
}
109
/// Options applied when subscribing to an extractor.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Value forwarded as `include_state` in the subscribe command.
    include_state: bool,
}

impl Default for SubscriptionOptions {
    /// The default options request state to be included.
    fn default() -> Self {
        SubscriptionOptions { include_state: true }
    }
}

impl SubscriptionOptions {
    /// Creates the default set of options (same as [`Default::default`]).
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the options with `include_state` replaced by `val`.
    pub fn with_state(self, val: bool) -> Self {
        let mut updated = self;
        updated.include_state = val;
        updated
    }
}
130
/// Client interface for subscribing to block changes of extractors.
///
/// A mock implementation is generated for tests via `automock`.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait DeltasClient {
    /// Subscribes to the given extractor.
    ///
    /// On success returns the id of the new subscription together with the receiving end of
    /// the channel on which its `BlockChanges` are delivered.
    async fn subscribe(
        &self,
        extractor_id: ExtractorIdentity,
        options: SubscriptionOptions,
    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;

    /// Ends the subscription identified by `subscription_id`.
    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;

    /// Establishes the connection.
    ///
    /// On success returns the handle of the spawned task; joining it yields that task's
    /// final result.
    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;

    /// Closes the connection.
    async fn close(&self) -> Result<(), DeltasError>;
}
155
/// Websocket based implementation of [`DeltasClient`].
///
/// Cloning is cheap: the connection state, notifier and dead flag are shared through `Arc`s.
#[derive(Clone)]
pub struct WsDeltasClient {
    /// Base URI of the server; `connect` appends the server version and `/ws` to it.
    uri: Uri,
    /// Optional key sent in the `AUTHORIZATION` header of the connection request.
    auth_key: Option<String>,
    /// The connection task keeps (re)connecting while its retry counter is below this value.
    max_reconnects: u32,
    /// Capacity of the command channel created in `connect` (used by `close`).
    ws_buffer_size: usize,
    /// Capacity of each per-subscription channel towards the subscriber.
    subscription_buffer_size: usize,
    /// Notified when a connection is established and when the client is marked dead.
    conn_notify: Arc<Notify>,
    /// State of the current connection; `None` while disconnected.
    inner: Arc<Mutex<Option<Inner>>>,
    /// Set once the connection task gave up after reaching `max_reconnects`.
    dead: Arc<AtomicBool>,
}
177
/// Write half of the split websocket stream, used to send messages to the server.
type WebSocketSink =
    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
180
/// Lifecycle state of a single subscription as tracked by `Inner`.
#[derive(Debug)]
enum SubscriptionInfo {
    /// A subscribe command was sent; the sender hands the subscription id and the message
    /// receiver to the waiting caller once the server confirms.
    RequestedSubscription(oneshot::Sender<(Uuid, Receiver<BlockChanges>)>),
    /// The server confirmed the subscription.
    Active,
    /// An unsubscribe was requested; the sender is fired once the subscription is removed.
    RequestedUnsubscription(oneshot::Sender<()>),
}
201
/// State tied to one established websocket connection.
struct Inner {
    /// Write half of the websocket.
    sink: WebSocketSink,
    /// Command channel towards the connection task; `close` sends `()` on it.
    cmd_tx: Sender<()>,
    /// Subscription requests awaiting confirmation, keyed by extractor.
    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
    /// State of confirmed subscriptions, keyed by subscription id.
    subscriptions: HashMap<Uuid, SubscriptionInfo>,
    /// Channels delivering block changes to the subscribers, keyed by subscription id.
    sender: HashMap<Uuid, Sender<BlockChanges>>,
    /// Capacity used for every channel created in `mark_active`.
    buffer_size: usize,
}
218
219impl Inner {
223 fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
224 Self {
225 sink,
226 cmd_tx,
227 pending: HashMap::new(),
228 subscriptions: HashMap::new(),
229 sender: HashMap::new(),
230 buffer_size,
231 }
232 }
233
234 #[allow(clippy::result_large_err)]
236 fn new_subscription(
237 &mut self,
238 id: &ExtractorIdentity,
239 ready_tx: oneshot::Sender<(Uuid, Receiver<BlockChanges>)>,
240 ) -> Result<(), DeltasError> {
241 if self.pending.contains_key(id) {
242 return Err(DeltasError::SubscriptionAlreadyPending);
243 }
244 self.pending
245 .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
246 Ok(())
247 }
248
249 fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
253 if let Some(info) = self.pending.remove(extractor_id) {
254 if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
255 let (tx, rx) = mpsc::channel(self.buffer_size);
256 self.sender.insert(subscription_id, tx);
257 self.subscriptions
258 .insert(subscription_id, SubscriptionInfo::Active);
259 let _ = ready_tx
260 .send((subscription_id, rx))
261 .map_err(|_| {
262 warn!(
263 ?extractor_id,
264 ?subscription_id,
265 "Subscriber for has gone away. Ignoring."
266 )
267 });
268 } else {
269 error!(
270 ?extractor_id,
271 ?subscription_id,
272 "Pending subscription was not in the correct state to
273 transition to active. Ignoring!"
274 )
275 }
276 } else {
277 error!(
278 ?extractor_id,
279 ?subscription_id,
280 "Tried to mark an unkown subscription as active. Ignoring!"
281 );
282 }
283 }
284
285 #[allow(clippy::result_large_err)]
287 fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
288 if let Some(sender) = self.sender.get_mut(id) {
289 sender
290 .try_send(msg)
291 .map_err(|e| match e {
292 TrySendError::Full(_) => DeltasError::BufferFull,
293 TrySendError::Closed(_) => {
294 DeltasError::TransportError("The subscriber has gone away".to_string())
295 }
296 })?;
297 }
298 Ok(())
299 }
300
301 fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
306 if let Some(info) = self
307 .subscriptions
308 .get_mut(subscription_id)
309 {
310 if let SubscriptionInfo::Active = info {
311 *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
312 }
313 } else {
314 debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
316 }
317 }
318
319 fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
326 if let Entry::Occupied(e) = self
327 .subscriptions
328 .entry(subscription_id)
329 {
330 let info = e.remove();
331 if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
332 let _ = tx.send(()).map_err(|_| {
333 warn!(?subscription_id, "failed to notify about removed subscription")
334 });
335 self.sender
336 .remove(&subscription_id)
337 .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
338 } else {
339 warn!(?subscription_id, "Subscription ended unexpectedly!");
340 self.sender
341 .remove(&subscription_id)
342 .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
343 }
344 } else {
345 error!(
346 ?subscription_id,
347 "Received `SubscriptionEnded`, but was never subscribed
348 to it. This is likely a bug!"
349 );
350 }
351
352 Ok(())
353 }
354
355 async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
357 self.sink.send(msg).await.map_err(|e| {
358 DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
359 })
360 }
361}
362
363impl WsDeltasClient {
365 #[allow(clippy::result_large_err)]
367 pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
368 let uri = ws_uri
369 .parse::<Uri>()
370 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
371 Ok(Self {
372 uri,
373 auth_key: auth_key.map(|s| s.to_string()),
374 inner: Arc::new(Mutex::new(None)),
375 ws_buffer_size: 128,
376 subscription_buffer_size: 128,
377 conn_notify: Arc::new(Notify::new()),
378 max_reconnects: 5,
379 dead: Arc::new(AtomicBool::new(false)),
380 })
381 }
382
383 #[allow(clippy::result_large_err)]
385 pub fn new_with_reconnects(
386 ws_uri: &str,
387 max_reconnects: u32,
388 auth_key: Option<&str>,
389 ) -> Result<Self, DeltasError> {
390 let uri = ws_uri
391 .parse::<Uri>()
392 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
393
394 Ok(Self {
395 uri,
396 auth_key: auth_key.map(|s| s.to_string()),
397 inner: Arc::new(Mutex::new(None)),
398 ws_buffer_size: 128,
399 subscription_buffer_size: 128,
400 conn_notify: Arc::new(Notify::new()),
401 max_reconnects,
402 dead: Arc::new(AtomicBool::new(false)),
403 })
404 }
405
406 async fn is_connected(&self) -> bool {
410 let guard = self.inner.as_ref().lock().await;
411 guard.is_some()
412 }
413
414 async fn ensure_connection(&self) -> Result<(), DeltasError> {
419 if self.dead.load(Ordering::SeqCst) {
420 return Err(DeltasError::NotConnected)
421 };
422 if !self.is_connected().await {
423 self.conn_notify.notified().await;
424 };
425 Ok(())
426 }
427
428 #[instrument(skip(self, msg))]
433 async fn handle_msg(
434 &self,
435 msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
436 ) -> Result<(), DeltasError> {
437 let mut guard = self.inner.lock().await;
438
439 match msg {
440 Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
445 serde_json::Value,
446 >(&text)
447 {
448 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
449 Ok(ws_message) => match ws_message {
450 WebSocketMessage::BlockChanges { subscription_id, deltas } => {
451 trace!(?deltas, "Received a block state change, sending to channel");
452 let inner = guard
453 .as_mut()
454 .ok_or_else(|| DeltasError::NotConnected)?;
455 match inner.send(&subscription_id, deltas) {
456 Err(DeltasError::BufferFull) => {
457 error!(?subscription_id, "Buffer full, message dropped!");
458 }
459 Err(_) => {
460 warn!(
461 ?subscription_id,
462 "Receiver for has gone away, unsubscribing!"
463 );
464 let (tx, rx) = oneshot::channel();
465 if let Err(e) = WsDeltasClient::unsubscribe_inner(
466 inner,
467 subscription_id,
468 tx,
469 )
470 .await
471 {
472 warn!(
473 ?e,
474 ?subscription_id,
475 "Failed to send unsubscribe command"
476 );
477 } else {
478 match tokio::time::timeout(Duration::from_secs(5), rx).await
480 {
481 Ok(_) => {
482 debug!(
483 ?subscription_id,
484 "Unsubscribe completed successfully"
485 );
486 }
487 Err(_) => {
488 warn!(
489 ?subscription_id,
490 "Unsubscribe completion timed out"
491 );
492 }
493 }
494 }
495 }
496 _ => { }
497 }
498 }
499 WebSocketMessage::Response(Response::NewSubscription {
500 extractor_id,
501 subscription_id,
502 }) => {
503 info!(?extractor_id, ?subscription_id, "Received a new subscription");
504 let inner = guard
505 .as_mut()
506 .ok_or_else(|| DeltasError::NotConnected)?;
507 inner.mark_active(&extractor_id, subscription_id);
508 }
509 WebSocketMessage::Response(Response::SubscriptionEnded {
510 subscription_id,
511 }) => {
512 info!(?subscription_id, "Received a subscription ended");
513 let inner = guard
514 .as_mut()
515 .ok_or_else(|| DeltasError::NotConnected)?;
516 inner.remove_subscription(subscription_id)?;
517 }
518 },
519 Err(e) => {
520 error!(
521 "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
522 e, text
523 );
524 }
525 },
526 Err(e) => {
527 error!(
528 "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
529 e, text
530 );
531 }
532 },
533 Ok(tungstenite::protocol::Message::Ping(_)) => {
534 let inner = guard
536 .as_mut()
537 .ok_or_else(|| DeltasError::NotConnected)?;
538 if let Err(error) = inner
539 .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
540 .await
541 {
542 debug!(?error, "Failed to send pong!");
543 }
544 }
545 Ok(tungstenite::protocol::Message::Pong(_)) => {
546 }
548 Ok(tungstenite::protocol::Message::Close(_)) => {
549 return Err(DeltasError::ConnectionClosed);
550 }
551 Ok(unknown_msg) => {
552 info!("Received an unknown message type: {:?}", unknown_msg);
553 }
554 Err(error) => {
555 error!(?error, "Websocket error");
556 return Err(match error {
557 tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
558 tungstenite::Error::AlreadyClosed => {
559 warn!("Received AlreadyClosed error which is indicative of a bug!");
560 DeltasError::ConnectionError(Box::new(error))
561 }
562 tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
563 DeltasError::ConnectionError(Box::new(error))
564 }
565 _ => DeltasError::Fatal(error.to_string()),
566 });
567 }
568 };
569 Ok(())
570 }
571
572 async fn unsubscribe_inner(
578 inner: &mut Inner,
579 subscription_id: Uuid,
580 ready_tx: oneshot::Sender<()>,
581 ) -> Result<(), DeltasError> {
582 inner.end_subscription(&subscription_id, ready_tx);
583 let cmd = Command::Unsubscribe { subscription_id };
584 inner
585 .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
586 |e| {
587 DeltasError::TransportError(format!(
588 "Failed to serialize unsubscribe command: {e}"
589 ))
590 },
591 )?))
592 .await?;
593 Ok(())
594 }
595}
596
597#[async_trait]
598impl DeltasClient for WsDeltasClient {
599 #[instrument(skip(self))]
600 async fn subscribe(
601 &self,
602 extractor_id: ExtractorIdentity,
603 options: SubscriptionOptions,
604 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
605 trace!("Starting subscribe");
606 self.ensure_connection().await?;
607 let (ready_tx, ready_rx) = oneshot::channel();
608 {
609 let mut guard = self.inner.lock().await;
610 let inner = guard
611 .as_mut()
612 .ok_or_else(|| DeltasError::NotConnected)?;
613 trace!("Sending subscribe command");
614 inner.new_subscription(&extractor_id, ready_tx)?;
615 let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
616 inner
617 .ws_send(tungstenite::protocol::Message::Text(
618 serde_json::to_string(&cmd).map_err(|e| {
619 DeltasError::TransportError(format!(
620 "Failed to serialize subscribe command: {e}"
621 ))
622 })?,
623 ))
624 .await?;
625 }
626 trace!("Waiting for subscription response");
627 let rx = ready_rx.await.map_err(|_| {
628 DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
629 })?;
630 trace!("Subscription successful");
631 Ok(rx)
632 }
633
634 #[instrument(skip(self))]
635 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
636 self.ensure_connection().await?;
637 let (ready_tx, ready_rx) = oneshot::channel();
638 {
639 let mut guard = self.inner.lock().await;
640 let inner = guard
641 .as_mut()
642 .ok_or_else(|| DeltasError::NotConnected)?;
643
644 WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
645 }
646 ready_rx.await.map_err(|_| {
647 DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
648 })?;
649
650 Ok(())
651 }
652
653 #[instrument(skip(self))]
654 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
655 if self.is_connected().await {
656 return Err(DeltasError::AlreadyConnected);
657 }
658 let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
659 info!(?ws_uri, "Starting TychoWebsocketClient");
660
661 let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
662 {
663 let mut guard = self.inner.as_ref().lock().await;
664 *guard = None;
665 }
666 let this = self.clone();
667 let jh = tokio::spawn(async move {
668 let mut retry_count = 0;
669 let mut result = Err(DeltasError::NotConnected);
670
671 'retry: while retry_count < this.max_reconnects {
672 info!(?ws_uri, retry_count, "Connecting to WebSocket server");
673
674 let mut request_builder = Request::builder()
676 .uri(&ws_uri)
677 .header(SEC_WEBSOCKET_KEY, generate_key())
678 .header(SEC_WEBSOCKET_VERSION, 13)
679 .header(CONNECTION, "Upgrade")
680 .header(UPGRADE, "websocket")
681 .header(
682 HOST,
683 this.uri.host().ok_or_else(|| {
684 DeltasError::UriParsing(
685 ws_uri.clone(),
686 "No host found in tycho url".to_string(),
687 )
688 })?,
689 )
690 .header(
691 USER_AGENT,
692 format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
693 );
694
695 if let Some(ref key) = this.auth_key {
697 request_builder = request_builder.header(AUTHORIZATION, key);
698 }
699
700 let request = request_builder.body(()).map_err(|e| {
701 DeltasError::TransportError(format!("Failed to build connection request: {e}"))
702 })?;
703 let (conn, _) = match connect_async(request).await {
704 Ok(conn) => conn,
705 Err(e) => {
706 retry_count += 1;
708 let mut guard = this.inner.as_ref().lock().await;
709 *guard = None;
710
711 warn!(
712 e = e.to_string(),
713 "Failed to connect to WebSocket server; Reconnecting"
714 );
715 sleep(Duration::from_millis(500)).await;
716
717 continue 'retry;
718 }
719 };
720
721 let (ws_tx_new, ws_rx_new) = conn.split();
722 {
723 let mut guard = this.inner.as_ref().lock().await;
724 *guard =
725 Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
726 }
727 let mut msg_rx = ws_rx_new.boxed();
728
729 info!("Connection Successful: TychoWebsocketClient started");
730 this.conn_notify.notify_waiters();
731 result = Ok(());
732
733 loop {
734 let res = tokio::select! {
735 msg = msg_rx.next() => match msg {
736 Some(msg) => this.handle_msg(msg).await,
737 None => { break 'retry } },
739 _ = cmd_rx.recv() => {break 'retry},
740 };
741 if let Err(error) = res {
742 debug!(?error, "WsError");
743 if matches!(
744 error,
745 DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
746 ) {
747 retry_count += 1;
749 let mut guard = this.inner.as_ref().lock().await;
750 *guard = None;
751
752 warn!(
753 ?error,
754 ?retry_count,
755 "Connection dropped unexpectedly; Reconnecting..."
756 );
757 break;
758 } else {
759 error!(?error, "Fatal error; Exiting");
761 result = Err(error);
762 break 'retry;
763 }
764 }
765 }
766 }
767 debug!(
768 retry_count,
769 max_reconnects=?this.max_reconnects,
770 "Reconnection loop ended"
771 );
772 let mut guard = this.inner.as_ref().lock().await;
774 *guard = None;
775
776 if retry_count >= this.max_reconnects {
778 error!("Max reconnection attempts reached; Exiting");
779 this.dead.store(true, Ordering::SeqCst);
780 this.conn_notify.notify_waiters(); result = Err(DeltasError::ConnectionClosed);
782 }
783
784 result
785 });
786
787 self.conn_notify.notified().await;
788
789 if self.is_connected().await {
790 Ok(jh)
791 } else {
792 Err(DeltasError::NotConnected)
793 }
794 }
795
796 #[instrument(skip(self))]
797 async fn close(&self) -> Result<(), DeltasError> {
798 info!("Closing TychoWebsocketClient");
799 let mut guard = self.inner.lock().await;
800 let inner = guard
801 .as_mut()
802 .ok_or_else(|| DeltasError::NotConnected)?;
803 inner
804 .cmd_tx
805 .send(())
806 .await
807 .map_err(|e| DeltasError::TransportError(e.to_string()))?;
808 Ok(())
809 }
810}
811
812#[cfg(test)]
813mod tests {
814 use std::net::SocketAddr;
815
816 use test_log::test;
817 use tokio::{net::TcpListener, time::timeout};
818 use tycho_common::dto::Chain;
819
820 use super::*;
821
    /// One scripted step of the mock server.
    #[derive(Clone)]
    enum ExpectedComm {
        /// Wait up to the given number of milliseconds for a client message and assert that
        /// it equals the given message.
        Receive(u64, tungstenite::protocol::Message),
        /// Send the given message to the client.
        Send(tungstenite::protocol::Message),
    }
827
828 async fn mock_tycho_ws(
829 messages: &[ExpectedComm],
830 reconnects: usize,
831 ) -> (SocketAddr, JoinHandle<()>) {
832 info!("Starting mock webserver");
833 let server = TcpListener::bind("127.0.0.1:0")
835 .await
836 .expect("localhost bind failed");
837 let addr = server.local_addr().unwrap();
838 let messages = messages.to_vec();
839
840 let jh = tokio::spawn(async move {
841 info!("mock webserver started");
842 for _ in 0..(reconnects + 1) {
843 if let Ok((stream, _)) = server.accept().await {
844 let mut websocket = tokio_tungstenite::accept_async(stream)
845 .await
846 .unwrap();
847
848 info!("Handling messages..");
849 for c in messages.iter().cloned() {
850 match c {
851 ExpectedComm::Receive(t, exp) => {
852 info!("Awaiting message...");
853 let msg = timeout(Duration::from_millis(t), websocket.next())
854 .await
855 .expect("Receive timeout")
856 .expect("Stream exhausted")
857 .expect("Failed to receive message.");
858 info!("Message received");
859 assert_eq!(msg, exp)
860 }
861 ExpectedComm::Send(data) => {
862 info!("Sending message");
863 websocket
864 .send(data)
865 .await
866 .expect("Failed to send message");
867 info!("Message sent");
868 }
869 };
870 }
871 sleep(Duration::from_millis(100)).await;
872 let _ = websocket.close(None).await;
874 }
875 }
876 });
877 (addr, jh)
878 }
879
    /// Subscribes against the mock server, receives one block changes message and closes
    /// the client cleanly.
    #[tokio::test]
    async fn test_subscribe_receive() {
        // Script: expect the subscribe command, confirm it, then push one block update.
        let exp_comm = [
            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
                {
                    "method":"subscribe",
                    "extractor_id":{
                        "chain":"ethereum",
                        "name":"vm:ambient"
                    },
                    "include_state": true
                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
                {
                    "method":"newsubscription",
                    "extractor_id":{
                    "chain":"ethereum",
                    "name":"vm:ambient"
                    },
                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
                {
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
                    "deltas": {
                        "extractor": "vm:ambient",
                        "chain": "ethereum",
                        "block": {
                            "number": 123,
                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "chain": "ethereum",
                            "ts": "2023-09-14T00:00:00"
                        },
                        "finalized_block_height": 0,
                        "revert": false,
                        "new_tokens": {},
                        "account_updates": {
                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                "chain": "ethereum",
                                "slots": {},
                                "balance": "0x01f4",
                                "code": "",
                                "change": "Update"
                            }
                        },
                        "state_updates": {
                            "component_1": {
                                "component_id": "component_1",
                                "updated_attributes": {"attr1": "0x01"},
                                "deleted_attributes": ["attr2"]
                            }
                        },
                        "new_protocol_components":
                            { "protocol_1": {
                                    "id": "protocol_1",
                                    "protocol_system": "system_1",
                                    "protocol_type_name": "type_1",
                                    "chain": "ethereum",
                                    "tokens": ["0x01", "0x02"],
                                    "contract_ids": ["0x01", "0x02"],
                                    "static_attributes": {"attr1": "0x01f4"},
                                    "change": "Update",
                                    "creation_tx": "0x01",
                                    "created_at": "2023-09-14T00:00:00"
                                }
                            },
                        "deleted_protocol_components": {},
                        "component_balances": {
                            "protocol_1":
                                {
                                    "0x01": {
                                        "token": "0x01",
                                        "balance": "0x01f4",
                                        "balance_float": 0.0,
                                        "modify_tx": "0x01",
                                        "component_id": "protocol_1"
                                    }
                                }
                        },
                        "account_balances": {
                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                    "balance": "0x01f4",
                                    "modify_tx": "0x01"
                                }
                            }
                        },
                        "component_tvl": {
                            "protocol_1": 1000.0
                        },
                        "dci_update": {
                            "new_entrypoints": {},
                            "new_entrypoint_params": {},
                            "trace_results": {}
                        }
                    }
                }
                "#.to_owned()
            ))
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (_, mut rx) = timeout(
            Duration::from_millis(100),
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed");
        // The scripted block update must arrive on the subscription channel.
        let _ = timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("awaiting message timeout out")
            .expect("receiving message failed");
        timeout(Duration::from_millis(100), client.close())
            .await
            .expect("close timed out")
            .expect("close failed");
        // After a requested close the connection task must finish without an error.
        jh.await
            .expect("ws loop errored")
            .unwrap();
        server_thread.await.unwrap();
    }
1016
    /// Subscribes, unsubscribes again and checks that the subscription channel is closed
    /// afterwards.
    #[tokio::test]
    async fn test_unsubscribe() {
        // Script: subscribe -> confirmation -> unsubscribe -> subscription ended.
        let exp_comm = [
            ExpectedComm::Receive(
                100,
                tungstenite::protocol::Message::Text(
                    r#"
                {
                    "method": "subscribe",
                    "extractor_id":{
                        "chain": "ethereum",
                        "name": "vm:ambient"
                    },
                    "include_state": true
                }"#
                    .to_owned()
                    .replace(|c: char| c.is_whitespace(), ""),
                ),
            ),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "method": "newsubscription",
                    "extractor_id":{
                        "chain": "ethereum",
                        "name": "vm:ambient"
                    },
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#
                .to_owned()
                .replace(|c: char| c.is_whitespace(), ""),
            )),
            ExpectedComm::Receive(
                100,
                tungstenite::protocol::Message::Text(
                    r#"
                {
                    "method": "unsubscribe",
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }
                "#
                    .to_owned()
                    .replace(|c: char| c.is_whitespace(), ""),
                ),
            ),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "method": "subscriptionended",
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }
                "#
                .to_owned()
                .replace(|c: char| c.is_whitespace(), ""),
            )),
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (sub_id, mut rx) = timeout(
            Duration::from_millis(100),
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed");

        timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
            .await
            .expect("unsubscribe timed out")
            .expect("unsubscribe failed");
        let res = timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("awaiting message timeout out");

        // `None` means the sending side of the subscription channel was dropped.
        assert!(res.is_none());

        timeout(Duration::from_millis(100), client.close())
            .await
            .expect("close timed out")
            .expect("close failed");
        jh.await
            .expect("ws loop errored")
            .unwrap();
        server_thread.await.unwrap();
    }
1111
    /// The server ends a subscription that the client never unsubscribed from; the
    /// subscription channel must be closed as a result.
    #[tokio::test]
    async fn test_subscription_unexpected_end() {
        // Script: subscribe -> confirmation -> unsolicited subscription ended.
        let exp_comm = [
            ExpectedComm::Receive(
                100,
                tungstenite::protocol::Message::Text(
                    r#"
                {
                    "method":"subscribe",
                    "extractor_id":{
                        "chain":"ethereum",
                        "name":"vm:ambient"
                    },
                    "include_state": true
                }"#
                    .to_owned()
                    .replace(|c: char| c.is_whitespace(), ""),
                ),
            ),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "method":"newsubscription",
                    "extractor_id":{
                        "chain":"ethereum",
                        "name":"vm:ambient"
                    },
                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#
                .to_owned()
                .replace(|c: char| c.is_whitespace(), ""),
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "method": "subscriptionended",
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#
                .to_owned()
                .replace(|c: char| c.is_whitespace(), ""),
            )),
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (_, mut rx) = timeout(
            Duration::from_millis(100),
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed");
        let res = timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("awaiting message timeout out");

        // `None` means the sending side of the subscription channel was dropped.
        assert!(res.is_none());

        timeout(Duration::from_millis(100), client.close())
            .await
            .expect("close timed out")
            .expect("close failed");
        jh.await
            .expect("ws loop errored")
            .unwrap();
        server_thread.await.unwrap();
    }
1187
    /// The mock server serves the same script on two consecutive connections and closes
    /// each one; the client must reconnect in between and finally give up with an error.
    #[test_log::test(tokio::test)]
    async fn test_reconnect() {
        let exp_comm = [
            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
                {
                    "method":"subscribe",
                    "extractor_id":{
                        "chain":"ethereum",
                        "name":"vm:ambient"
                    },
                    "include_state": true
                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
                {
                    "method":"newsubscription",
                    "extractor_id":{
                    "chain":"ethereum",
                    "name":"vm:ambient"
                    },
                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
                {
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
                    "deltas": {
                        "extractor": "vm:ambient",
                        "chain": "ethereum",
                        "block": {
                            "number": 123,
                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "chain": "ethereum",
                            "ts": "2023-09-14T00:00:00"
                        },
                        "finalized_block_height": 0,
                        "revert": false,
                        "new_tokens": {},
                        "account_updates": {
                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                "chain": "ethereum",
                                "slots": {},
                                "balance": "0x01f4",
                                "code": "",
                                "change": "Update"
                            }
                        },
                        "state_updates": {
                            "component_1": {
                                "component_id": "component_1",
                                "updated_attributes": {"attr1": "0x01"},
                                "deleted_attributes": ["attr2"]
                            }
                        },
                        "new_protocol_components": {
                            "protocol_1":
                                {
                                    "id": "protocol_1",
                                    "protocol_system": "system_1",
                                    "protocol_type_name": "type_1",
                                    "chain": "ethereum",
                                    "tokens": ["0x01", "0x02"],
                                    "contract_ids": ["0x01", "0x02"],
                                    "static_attributes": {"attr1": "0x01f4"},
                                    "change": "Update",
                                    "creation_tx": "0x01",
                                    "created_at": "2023-09-14T00:00:00"
                                }
                            },
                        "deleted_protocol_components": {},
                        "component_balances": {
                            "protocol_1": {
                                "0x01": {
                                    "token": "0x01",
                                    "balance": "0x01f4",
                                    "balance_float": 1000.0,
                                    "modify_tx": "0x01",
                                    "component_id": "protocol_1"
                                }
                            }
                        },
                        "account_balances": {
                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                    "balance": "0x01f4",
                                    "modify_tx": "0x01"
                                }
                            }
                        },
                        "component_tvl": {
                            "protocol_1": 1000.0
                        },
                        "dci_update": {
                            "new_entrypoints": {},
                            "new_entrypoint_params": {},
                            "trace_results": {}
                        }
                    }
                }
                "#.to_owned()
            ))
        ];
        // One reconnect: the server plays the script on two connections in total.
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh: JoinHandle<Result<(), DeltasError>> = client
            .connect()
            .await
            .expect("connect failed");

        // Same expectations for the initial connection and for the reconnected one.
        for _ in 0..2 {
            let (_, mut rx) = timeout(
                Duration::from_millis(100),
                client.subscribe(
                    ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                    SubscriptionOptions::new(),
                ),
            )
            .await
            .expect("subscription timed out")
            .expect("subscription failed");

            let _ = timeout(Duration::from_millis(100), rx.recv())
                .await
                .expect("awaiting message timeout out")
                .expect("receiving message failed");

            // Once the server closes the connection the subscription channel must close.
            let res = timeout(Duration::from_millis(200), rx.recv())
                .await
                .expect("awaiting closed connection timeout out");
            assert!(res.is_none());
        }
        let res = jh.await.expect("ws client join failed");
        // The connection task is expected to end with an error in this scenario.
        assert!(res.is_err());
        server_thread
            .await
            .expect("ws server loop errored");
    }
1331
1332 async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1333 let server = TcpListener::bind("127.0.0.1:0")
1334 .await
1335 .expect("localhost bind failed");
1336 let addr = server.local_addr().unwrap();
1337 let jh = tokio::spawn(async move {
1338 while let Ok((stream, _)) = server.accept().await {
1339 if accept_first {
1340 let stream = tokio_tungstenite::accept_async(stream)
1342 .await
1343 .unwrap();
1344 sleep(Duration::from_millis(10)).await;
1345 drop(stream)
1346 } else {
1347 drop(stream);
1349 }
1350 }
1351 });
1352 (addr, jh)
1353 }
1354
    /// After the connection task has given up, the client must report itself as
    /// disconnected and subscribing must fail instead of blocking.
    #[test(tokio::test)]
    async fn test_subscribe_dead_client_after_max_attempts() {
        // The server completes every handshake and drops the connection shortly after.
        let (addr, _) = mock_bad_connection_tycho_ws(true).await;
        let client = WsDeltasClient::new_with_reconnects(&format!("ws://{addr}"), 3, None).unwrap();

        let join_handle = client.connect().await.unwrap();
        // Joining returns once the connection task has stopped.
        let handle_res = join_handle.await.unwrap();
        assert!(handle_res.is_err());
        assert!(!client.is_connected().await);

        // The timeout guards against `subscribe` waiting for a connection that never comes.
        let subscription_res = timeout(
            Duration::from_millis(10),
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .unwrap();
        assert!(subscription_res.is_err());
    }
1376}