1use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::Arc,
25 time::Duration,
26};
27
28use async_trait::async_trait;
29use futures03::{stream::SplitSink, SinkExt, StreamExt};
30use hyper::{
31 header::{
32 AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
33 USER_AGENT,
34 },
35 Uri,
36};
37#[cfg(test)]
38use mockall::automock;
39use thiserror::Error;
40use tokio::{
41 net::TcpStream,
42 sync::{
43 mpsc::{self, error::TrySendError, Receiver, Sender},
44 oneshot, Mutex, Notify,
45 },
46 task::JoinHandle,
47 time::sleep,
48};
49use tokio_tungstenite::{
50 connect_async,
51 tungstenite::{
52 self,
53 handshake::client::{generate_key, Request},
54 },
55 MaybeTlsStream, WebSocketStream,
56};
57use tracing::{debug, error, info, instrument, trace, warn};
58use tycho_common::dto::{BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage};
59use uuid::Uuid;
60
61use crate::TYCHO_SERVER_VERSION;
62
63#[derive(Error, Debug)]
64pub enum DeltasError {
65 #[error("Failed to parse URI: {0}. Error: {1}")]
67 UriParsing(String, String),
68
69 #[error("The requested subscription is already pending")]
71 SubscriptionAlreadyPending,
72
73 #[error("{0}")]
76 TransportError(String),
77
78 #[error("The buffer is full!")]
82 BufferFull,
83
84 #[error("The client is not connected!")]
88 NotConnected,
89
90 #[error("The client is already connected!")]
92 AlreadyConnected,
93
94 #[error("The server closed the connection!")]
96 ConnectionClosed,
97
98 #[error("Connection error: {0}")]
100 ConnectionError(#[from] Box<tungstenite::Error>),
101
102 #[error("Tycho FatalError: {0}")]
104 Fatal(String),
105}
106
/// Options that customise a subscription request.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Forwarded as `include_state` in the subscribe command.
    include_state: bool,
}

impl Default for SubscriptionOptions {
    /// State is included unless explicitly disabled.
    fn default() -> Self {
        SubscriptionOptions { include_state: true }
    }
}

impl SubscriptionOptions {
    /// Creates the default options; equivalent to [`SubscriptionOptions::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the options with `include_state` replaced by `val`.
    pub fn with_state(self, val: bool) -> Self {
        Self { include_state: val }
    }
}
127
128#[cfg_attr(test, automock)]
129#[async_trait]
130pub trait DeltasClient {
131 async fn subscribe(
138 &self,
139 extractor_id: ExtractorIdentity,
140 options: SubscriptionOptions,
141 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;
142
143 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
145
146 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
148
149 async fn close(&self) -> Result<(), DeltasError>;
151}
152
153#[derive(Clone)]
154pub struct WsDeltasClient {
155 uri: Uri,
157 auth_key: Option<String>,
159 max_reconnects: u32,
161 ws_buffer_size: usize,
164 subscription_buffer_size: usize,
167 conn_notify: Arc<Notify>,
169 inner: Arc<Mutex<Option<Inner>>>,
171}
172
173type WebSocketSink =
174 SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
175
176#[derive(Debug)]
188enum SubscriptionInfo {
189 RequestedSubscription(oneshot::Sender<(Uuid, Receiver<BlockChanges>)>),
191 Active,
193 RequestedUnsubscription(oneshot::Sender<()>),
195}
196
197struct Inner {
199 sink: WebSocketSink,
201 cmd_tx: Sender<()>,
203 pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
205 subscriptions: HashMap<Uuid, SubscriptionInfo>,
207 sender: HashMap<Uuid, Sender<BlockChanges>>,
210 buffer_size: usize,
212}
213
214impl Inner {
218 fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
219 Self {
220 sink,
221 cmd_tx,
222 pending: HashMap::new(),
223 subscriptions: HashMap::new(),
224 sender: HashMap::new(),
225 buffer_size,
226 }
227 }
228
229 #[allow(clippy::result_large_err)]
231 fn new_subscription(
232 &mut self,
233 id: &ExtractorIdentity,
234 ready_tx: oneshot::Sender<(Uuid, Receiver<BlockChanges>)>,
235 ) -> Result<(), DeltasError> {
236 if self.pending.contains_key(id) {
237 return Err(DeltasError::SubscriptionAlreadyPending);
238 }
239 self.pending
240 .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
241 Ok(())
242 }
243
244 fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
248 if let Some(info) = self.pending.remove(extractor_id) {
249 if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
250 let (tx, rx) = mpsc::channel(self.buffer_size);
251 self.sender.insert(subscription_id, tx);
252 self.subscriptions
253 .insert(subscription_id, SubscriptionInfo::Active);
254 let _ = ready_tx
255 .send((subscription_id, rx))
256 .map_err(|_| {
257 warn!(
258 ?extractor_id,
259 ?subscription_id,
260 "Subscriber for has gone away. Ignoring."
261 )
262 });
263 } else {
264 error!(
265 ?extractor_id,
266 ?subscription_id,
267 "Pending subscription was not in the correct state to
268 transition to active. Ignoring!"
269 )
270 }
271 } else {
272 error!(
273 ?extractor_id,
274 ?subscription_id,
275 "Tried to mark an unkown subscription as active. Ignoring!"
276 );
277 }
278 }
279
280 #[allow(clippy::result_large_err)]
282 fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
283 if let Some(sender) = self.sender.get_mut(id) {
284 sender
285 .try_send(msg)
286 .map_err(|e| match e {
287 TrySendError::Full(_) => DeltasError::BufferFull,
288 TrySendError::Closed(_) => {
289 DeltasError::TransportError("The subscriber has gone away".to_string())
290 }
291 })?;
292 }
293 Ok(())
294 }
295
296 fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
301 if let Some(info) = self
302 .subscriptions
303 .get_mut(subscription_id)
304 {
305 if let SubscriptionInfo::Active = info {
306 *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
307 }
308 } else {
309 debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
311 }
312 }
313
314 fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
321 if let Entry::Occupied(e) = self
322 .subscriptions
323 .entry(subscription_id)
324 {
325 let info = e.remove();
326 if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
327 let _ = tx.send(()).map_err(|_| {
328 warn!(?subscription_id, "failed to notify about removed subscription")
329 });
330 self.sender
331 .remove(&subscription_id)
332 .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
333 } else {
334 warn!(?subscription_id, "Subscription ended unexpectedly!");
335 self.sender
336 .remove(&subscription_id)
337 .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
338 }
339 } else {
340 error!(
341 ?subscription_id,
342 "Received `SubscriptionEnded`, but was never subscribed
343 to it. This is likely a bug!"
344 );
345 }
346
347 Ok(())
348 }
349
350 async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
352 self.sink.send(msg).await.map_err(|e| {
353 DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
354 })
355 }
356}
357
358impl WsDeltasClient {
360 #[allow(clippy::result_large_err)]
362 pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
363 let uri = ws_uri
364 .parse::<Uri>()
365 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
366 Ok(Self {
367 uri,
368 auth_key: auth_key.map(|s| s.to_string()),
369 inner: Arc::new(Mutex::new(None)),
370 ws_buffer_size: 128,
371 subscription_buffer_size: 128,
372 conn_notify: Arc::new(Notify::new()),
373 max_reconnects: 5,
374 })
375 }
376
377 #[allow(clippy::result_large_err)]
379 pub fn new_with_reconnects(
380 ws_uri: &str,
381 max_reconnects: u32,
382 auth_key: Option<&str>,
383 ) -> Result<Self, DeltasError> {
384 let uri = ws_uri
385 .parse::<Uri>()
386 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
387
388 Ok(Self {
389 uri,
390 auth_key: auth_key.map(|s| s.to_string()),
391 inner: Arc::new(Mutex::new(None)),
392 ws_buffer_size: 128,
393 subscription_buffer_size: 128,
394 conn_notify: Arc::new(Notify::new()),
395 max_reconnects,
396 })
397 }
398
399 async fn is_connected(&self) -> bool {
403 let guard = self.inner.as_ref().lock().await;
404 guard.is_some()
405 }
406
407 async fn ensure_connection(&self) {
412 if !self.is_connected().await {
413 self.conn_notify.notified().await;
414 }
415 }
416
417 #[instrument(skip(self, msg))]
422 async fn handle_msg(
423 &self,
424 msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
425 ) -> Result<(), DeltasError> {
426 let mut guard = self.inner.lock().await;
427
428 match msg {
429 Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
434 serde_json::Value,
435 >(&text)
436 {
437 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
438 Ok(ws_message) => match ws_message {
439 WebSocketMessage::BlockChanges { subscription_id, deltas } => {
440 trace!(?deltas, "Received a block state change, sending to channel");
441 let inner = guard
442 .as_mut()
443 .ok_or_else(|| DeltasError::NotConnected)?;
444 match inner.send(&subscription_id, deltas) {
445 Err(DeltasError::BufferFull) => {
446 error!(?subscription_id, "Buffer full, message dropped!");
447 }
448 Err(_) => {
449 warn!(
450 ?subscription_id,
451 "Receiver for has gone away, unsubscribing!"
452 );
453 let (tx, rx) = oneshot::channel();
454 if let Err(e) = WsDeltasClient::unsubscribe_inner(
455 inner,
456 subscription_id,
457 tx,
458 )
459 .await
460 {
461 warn!(
462 ?e,
463 ?subscription_id,
464 "Failed to send unsubscribe command"
465 );
466 } else {
467 match tokio::time::timeout(Duration::from_secs(5), rx).await
469 {
470 Ok(_) => {
471 debug!(
472 ?subscription_id,
473 "Unsubscribe completed successfully"
474 );
475 }
476 Err(_) => {
477 warn!(
478 ?subscription_id,
479 "Unsubscribe completion timed out"
480 );
481 }
482 }
483 }
484 }
485 _ => { }
486 }
487 }
488 WebSocketMessage::Response(Response::NewSubscription {
489 extractor_id,
490 subscription_id,
491 }) => {
492 info!(?extractor_id, ?subscription_id, "Received a new subscription");
493 let inner = guard
494 .as_mut()
495 .ok_or_else(|| DeltasError::NotConnected)?;
496 inner.mark_active(&extractor_id, subscription_id);
497 }
498 WebSocketMessage::Response(Response::SubscriptionEnded {
499 subscription_id,
500 }) => {
501 info!(?subscription_id, "Received a subscription ended");
502 let inner = guard
503 .as_mut()
504 .ok_or_else(|| DeltasError::NotConnected)?;
505 inner.remove_subscription(subscription_id)?;
506 }
507 },
508 Err(e) => {
509 error!(
510 "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
511 e, text
512 );
513 }
514 },
515 Err(e) => {
516 error!(
517 "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
518 e, text
519 );
520 }
521 },
522 Ok(tungstenite::protocol::Message::Ping(_)) => {
523 let inner = guard
525 .as_mut()
526 .ok_or_else(|| DeltasError::NotConnected)?;
527 if let Err(error) = inner
528 .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
529 .await
530 {
531 debug!(?error, "Failed to send pong!");
532 }
533 }
534 Ok(tungstenite::protocol::Message::Pong(_)) => {
535 }
537 Ok(tungstenite::protocol::Message::Close(_)) => {
538 return Err(DeltasError::ConnectionClosed);
539 }
540 Ok(unknown_msg) => {
541 info!("Received an unknown message type: {:?}", unknown_msg);
542 }
543 Err(error) => {
544 error!(?error, "Websocket error");
545 return Err(match error {
546 tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
547 tungstenite::Error::AlreadyClosed => {
548 warn!("Received AlreadyClosed error which is indicative of a bug!");
549 DeltasError::ConnectionError(Box::new(error))
550 }
551 tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
552 DeltasError::ConnectionError(Box::new(error))
553 }
554 _ => DeltasError::Fatal(error.to_string()),
555 });
556 }
557 };
558 Ok(())
559 }
560
561 async fn unsubscribe_inner(
567 inner: &mut Inner,
568 subscription_id: Uuid,
569 ready_tx: oneshot::Sender<()>,
570 ) -> Result<(), DeltasError> {
571 inner.end_subscription(&subscription_id, ready_tx);
572 let cmd = Command::Unsubscribe { subscription_id };
573 inner
574 .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
575 |e| {
576 DeltasError::TransportError(format!(
577 "Failed to serialize unsubscribe command: {e}"
578 ))
579 },
580 )?))
581 .await?;
582 Ok(())
583 }
584}
585
586#[async_trait]
587impl DeltasClient for WsDeltasClient {
588 #[instrument(skip(self))]
589 async fn subscribe(
590 &self,
591 extractor_id: ExtractorIdentity,
592 options: SubscriptionOptions,
593 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
594 trace!("Starting subscribe");
595 self.ensure_connection().await;
596 let (ready_tx, ready_rx) = oneshot::channel();
597 {
598 let mut guard = self.inner.lock().await;
599 let inner = guard
600 .as_mut()
601 .ok_or_else(|| DeltasError::NotConnected)?;
602 trace!("Sending subscribe command");
603 inner.new_subscription(&extractor_id, ready_tx)?;
604 let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
605 inner
606 .ws_send(tungstenite::protocol::Message::Text(
607 serde_json::to_string(&cmd).map_err(|e| {
608 DeltasError::TransportError(format!(
609 "Failed to serialize subscribe command: {e}"
610 ))
611 })?,
612 ))
613 .await?;
614 }
615 trace!("Waiting for subscription response");
616 let rx = ready_rx.await.map_err(|_| {
617 DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
618 })?;
619 trace!("Subscription successful");
620 Ok(rx)
621 }
622
623 #[instrument(skip(self))]
624 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
625 self.ensure_connection().await;
626 let (ready_tx, ready_rx) = oneshot::channel();
627 {
628 let mut guard = self.inner.lock().await;
629 let inner = guard
630 .as_mut()
631 .ok_or_else(|| DeltasError::NotConnected)?;
632
633 WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
634 }
635 ready_rx.await.map_err(|_| {
636 DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
637 })?;
638
639 Ok(())
640 }
641
642 #[instrument(skip(self))]
643 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
644 if self.is_connected().await {
645 return Err(DeltasError::AlreadyConnected);
646 }
647 let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
648 info!(?ws_uri, "Starting TychoWebsocketClient");
649
650 let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
651 {
652 let mut guard = self.inner.as_ref().lock().await;
653 *guard = None;
654 }
655 let this = self.clone();
656 let jh = tokio::spawn(async move {
657 let mut retry_count = 0;
658 let mut result = Err(DeltasError::NotConnected);
659
660 'retry: while retry_count < this.max_reconnects {
661 info!(?ws_uri, "Connecting to WebSocket server");
662
663 let mut request_builder = Request::builder()
665 .uri(&ws_uri)
666 .header(SEC_WEBSOCKET_KEY, generate_key())
667 .header(SEC_WEBSOCKET_VERSION, 13)
668 .header(CONNECTION, "Upgrade")
669 .header(UPGRADE, "websocket")
670 .header(
671 HOST,
672 this.uri.host().ok_or_else(|| {
673 DeltasError::UriParsing(
674 ws_uri.clone(),
675 "No host found in tycho url".to_string(),
676 )
677 })?,
678 )
679 .header(
680 USER_AGENT,
681 format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
682 );
683
684 if let Some(ref key) = this.auth_key {
686 request_builder = request_builder.header(AUTHORIZATION, key);
687 }
688
689 let request = request_builder.body(()).map_err(|e| {
690 DeltasError::TransportError(format!("Failed to build connection request: {e}"))
691 })?;
692 let (conn, _) = match connect_async(request).await {
693 Ok(conn) => conn,
694 Err(e) => {
695 retry_count += 1;
697 let mut guard = this.inner.as_ref().lock().await;
698 *guard = None;
699
700 warn!(
701 e = e.to_string(),
702 "Failed to connect to WebSocket server; Reconnecting"
703 );
704 sleep(Duration::from_millis(500)).await;
705
706 continue 'retry;
707 }
708 };
709
710 let (ws_tx_new, ws_rx_new) = conn.split();
711 {
712 let mut guard = this.inner.as_ref().lock().await;
713 *guard =
714 Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
715 }
716 let mut msg_rx = ws_rx_new.boxed();
717
718 info!("Connection Successful: TychoWebsocketClient started");
719 this.conn_notify.notify_waiters();
720 result = Ok(());
721
722 loop {
723 let res = tokio::select! {
724 msg = msg_rx.next() => match msg {
725 Some(msg) => this.handle_msg(msg).await,
726 None => { break 'retry } },
728 _ = cmd_rx.recv() => {break 'retry},
729 };
730 if let Err(error) = res {
731 if matches!(
732 error,
733 DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
734 ) {
735 retry_count += 1;
737 let mut guard = this.inner.as_ref().lock().await;
738 *guard = None;
739
740 warn!(
741 ?error,
742 ?retry_count,
743 "Connection dropped unexpectedly; Reconnecting..."
744 );
745 break;
746 } else {
747 error!(?error, "Fatal error; Exiting");
749 result = Err(error);
750 break 'retry;
751 }
752 }
753 }
754 }
755
756 let mut guard = this.inner.as_ref().lock().await;
758 *guard = None;
759
760 if retry_count >= this.max_reconnects {
762 error!("Max reconnection attempts reached; Exiting");
763 this.conn_notify.notify_waiters(); result = Err(DeltasError::ConnectionClosed);
765 }
766
767 result
768 });
769
770 self.conn_notify.notified().await;
771
772 if self.is_connected().await {
773 Ok(jh)
774 } else {
775 Err(DeltasError::NotConnected)
776 }
777 }
778
779 #[instrument(skip(self))]
780 async fn close(&self) -> Result<(), DeltasError> {
781 info!("Closing TychoWebsocketClient");
782 let mut guard = self.inner.lock().await;
783 let inner = guard
784 .as_mut()
785 .ok_or_else(|| DeltasError::NotConnected)?;
786 inner
787 .cmd_tx
788 .send(())
789 .await
790 .map_err(|e| DeltasError::TransportError(e.to_string()))?;
791 Ok(())
792 }
793}
794
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use tokio::{net::TcpListener, time::timeout};
    use tycho_common::dto::Chain;

    use super::*;

    /// One step of the scripted conversation played back by the mock server.
    #[derive(Clone)]
    enum ExpectedComm {
        /// Expect this message from the client within the given number of milliseconds.
        Receive(u64, tungstenite::protocol::Message),
        /// Send this message to the client.
        Send(tungstenite::protocol::Message),
    }

    /// Starts a mock server that plays `messages` for `reconnects + 1` consecutive
    /// connections and closes each connection afterwards.
    async fn mock_tycho_ws(
        messages: &[ExpectedComm],
        reconnects: usize,
    ) -> (SocketAddr, JoinHandle<()>) {
        info!("Starting mock webserver");
        // Port 0 lets the OS pick a free port.
        let server = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("localhost bind failed");
        let addr = server.local_addr().unwrap();
        let messages = messages.to_vec();

        let jh = tokio::spawn(async move {
            info!("mock webserver started");
            for _ in 0..(reconnects + 1) {
                if let Ok((stream, _)) = server.accept().await {
                    let mut websocket = tokio_tungstenite::accept_async(stream)
                        .await
                        .unwrap();

                    info!("Handling messages..");
                    for c in messages.iter().cloned() {
                        match c {
                            ExpectedComm::Receive(t, exp) => {
                                info!("Awaiting message...");
                                let msg = timeout(Duration::from_millis(t), websocket.next())
                                    .await
                                    .expect("Receive timeout")
                                    .expect("Stream exhausted")
                                    .expect("Failed to receive message.");
                                info!("Message received");
                                assert_eq!(msg, exp)
                            }
                            ExpectedComm::Send(data) => {
                                info!("Sending message");
                                websocket
                                    .send(data)
                                    .await
                                    .expect("Failed to send message");
                                info!("Message sent");
                            }
                        };
                    }
                    sleep(Duration::from_millis(100)).await;
                    let _ = websocket.close(None).await;
                }
            }
        });
        (addr, jh)
    }

    /// Builds a text message from `json` with all whitespace removed.
    fn compact_text(json: &str) -> tungstenite::protocol::Message {
        tungstenite::protocol::Message::Text(json.replace(|c: char| c.is_whitespace(), ""))
    }

    /// The subscribe command the client is expected to send for `vm:ambient`.
    fn expect_subscribe() -> ExpectedComm {
        ExpectedComm::Receive(
            100,
            compact_text(
                r#"
                {
                    "method":"subscribe",
                    "extractor_id":{
                        "chain":"ethereum",
                        "name":"vm:ambient"
                    },
                    "include_state": true
                }"#,
            ),
        )
    }

    /// The server's confirmation of the `vm:ambient` subscription.
    fn send_new_subscription() -> ExpectedComm {
        ExpectedComm::Send(compact_text(
            r#"
            {
                "method":"newsubscription",
                "extractor_id":{
                    "chain":"ethereum",
                    "name":"vm:ambient"
                },
                "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
            }"#,
        ))
    }

    /// The unsubscribe command the client is expected to send.
    fn expect_unsubscribe() -> ExpectedComm {
        ExpectedComm::Receive(
            100,
            compact_text(
                r#"
                {
                    "method": "unsubscribe",
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#,
            ),
        )
    }

    /// The server's notice that the subscription ended.
    fn send_subscription_ended() -> ExpectedComm {
        ExpectedComm::Send(compact_text(
            r#"
            {
                "method": "subscriptionended",
                "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
            }"#,
        ))
    }

    /// A block changes message for the subscription; `balance_float` is spliced in verbatim
    /// as the JSON number of the component balance.
    fn send_block_changes(balance_float: &str) -> ExpectedComm {
        let json = r#"
        {
            "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
            "deltas": {
                "extractor": "vm:ambient",
                "chain": "ethereum",
                "block": {
                    "number": 123,
                    "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                    "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                    "chain": "ethereum",
                    "ts": "2023-09-14T00:00:00"
                },
                "finalized_block_height": 0,
                "revert": false,
                "new_tokens": {},
                "account_updates": {
                    "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                        "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                        "chain": "ethereum",
                        "slots": {},
                        "balance": "0x01f4",
                        "code": "",
                        "change": "Update"
                    }
                },
                "state_updates": {
                    "component_1": {
                        "component_id": "component_1",
                        "updated_attributes": {"attr1": "0x01"},
                        "deleted_attributes": ["attr2"]
                    }
                },
                "new_protocol_components": {
                    "protocol_1": {
                        "id": "protocol_1",
                        "protocol_system": "system_1",
                        "protocol_type_name": "type_1",
                        "chain": "ethereum",
                        "tokens": ["0x01", "0x02"],
                        "contract_ids": ["0x01", "0x02"],
                        "static_attributes": {"attr1": "0x01f4"},
                        "change": "Update",
                        "creation_tx": "0x01",
                        "created_at": "2023-09-14T00:00:00"
                    }
                },
                "deleted_protocol_components": {},
                "component_balances": {
                    "protocol_1": {
                        "0x01": {
                            "token": "0x01",
                            "balance": "0x01f4",
                            "balance_float": @BALANCE_FLOAT@,
                            "modify_tx": "0x01",
                            "component_id": "protocol_1"
                        }
                    }
                },
                "account_balances": {
                    "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                        "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                            "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                            "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                            "balance": "0x01f4",
                            "modify_tx": "0x01"
                        }
                    }
                },
                "component_tvl": {
                    "protocol_1": 1000.0
                },
                "dci_update": {
                    "new_entrypoints": {},
                    "new_entrypoint_params": {},
                    "trace_results": {}
                }
            }
        }
        "#
        .replace("@BALANCE_FLOAT@", balance_float);
        ExpectedComm::Send(tungstenite::protocol::Message::Text(json))
    }

    /// Subscribes to `vm:ambient` on ethereum, failing the test if that takes over 100ms.
    async fn subscribe_ambient(client: &WsDeltasClient) -> (Uuid, Receiver<BlockChanges>) {
        timeout(
            Duration::from_millis(100),
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed")
    }

    /// Closes the client and waits for both the client task and the mock server to finish.
    async fn shutdown(
        client: &WsDeltasClient,
        jh: JoinHandle<Result<(), DeltasError>>,
        server_thread: JoinHandle<()>,
    ) {
        timeout(Duration::from_millis(100), client.close())
            .await
            .expect("close timed out")
            .expect("close failed");
        jh.await
            .expect("ws loop errored")
            .unwrap();
        server_thread.await.unwrap();
    }

    #[tokio::test]
    async fn test_subscribe_receive() {
        let exp_comm = [expect_subscribe(), send_new_subscription(), send_block_changes("0.0")];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (_, mut rx) = subscribe_ambient(&client).await;
        let _ = timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("awaiting message timeout out")
            .expect("receiving message failed");
        shutdown(&client, jh, server_thread).await;
    }

    #[tokio::test]
    async fn test_unsubscribe() {
        let exp_comm = [
            expect_subscribe(),
            send_new_subscription(),
            expect_unsubscribe(),
            send_subscription_ended(),
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (sub_id, mut rx) = subscribe_ambient(&client).await;

        timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
            .await
            .expect("unsubscribe timed out")
            .expect("unsubscribe failed");
        let res = timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("awaiting message timeout out");

        // Unsubscribing closes the channel of the subscription.
        assert!(res.is_none());

        shutdown(&client, jh, server_thread).await;
    }

    #[tokio::test]
    async fn test_subscription_unexpected_end() {
        let exp_comm = [expect_subscribe(), send_new_subscription(), send_subscription_ended()];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (_, mut rx) = subscribe_ambient(&client).await;
        let res = timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("awaiting message timeout out");

        // A subscription ended by the server closes the channel as well.
        assert!(res.is_none());

        shutdown(&client, jh, server_thread).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_reconnect() {
        let exp_comm =
            [expect_subscribe(), send_new_subscription(), send_block_changes("1000.0")];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh: JoinHandle<Result<(), DeltasError>> = client
            .connect()
            .await
            .expect("connect failed");

        for _ in 0..2 {
            let (_, mut rx) = subscribe_ambient(&client).await;

            let _ = timeout(Duration::from_millis(100), rx.recv())
                .await
                .expect("awaiting message timeout out")
                .expect("receiving message failed");

            // The server closes the connection, which closes the subscription's channel.
            let res = timeout(Duration::from_millis(200), rx.recv())
                .await
                .expect("awaiting closed connection timeout out");
            assert!(res.is_none());
        }
        let res = jh.await.expect("ws client join failed");
        // The server is gone after the second connection, so the client gives up eventually.
        assert!(res.is_err());
        server_thread
            .await
            .expect("ws server loop errored");
    }

    /// Starts a server that drops every connection right after accepting it.
    async fn mock_bad_connection_tycho_ws() -> (SocketAddr, JoinHandle<()>) {
        let server = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("localhost bind failed");
        let addr = server.local_addr().unwrap();
        let jh = tokio::spawn(async move {
            while let Ok((stream, _)) = server.accept().await {
                drop(stream);
            }
        });
        (addr, jh)
    }

    #[tokio::test]
    async fn test_connect_max_attempts() {
        let (addr, _) = mock_bad_connection_tycho_ws().await;
        let client = WsDeltasClient::new_with_reconnects(&format!("ws://{addr}"), 3, None).unwrap();

        let join_handle = client.connect().await;

        assert!(join_handle.is_err());
        assert_eq!(join_handle.unwrap_err().to_string(), DeltasError::NotConnected.to_string());
    }
}