1use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::Arc,
25 time::Duration,
26};
27
28use async_trait::async_trait;
29use futures03::{stream::SplitSink, SinkExt, StreamExt};
30use hyper::{
31 header::{
32 AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
33 USER_AGENT,
34 },
35 Uri,
36};
37#[cfg(test)]
38use mockall::automock;
39use thiserror::Error;
40use tokio::{
41 net::TcpStream,
42 sync::{
43 mpsc::{self, error::TrySendError, Receiver, Sender},
44 oneshot, Mutex, Notify,
45 },
46 task::JoinHandle,
47 time::sleep,
48};
49use tokio_tungstenite::{
50 connect_async,
51 tungstenite::{
52 self,
53 handshake::client::{generate_key, Request},
54 },
55 MaybeTlsStream, WebSocketStream,
56};
57use tracing::{debug, error, info, instrument, trace, warn};
58use tycho_common::dto::{BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage};
59use uuid::Uuid;
60
61use crate::TYCHO_SERVER_VERSION;
62
63#[derive(Error, Debug)]
64pub enum DeltasError {
65 #[error("Failed to parse URI: {0}. Error: {1}")]
67 UriParsing(String, String),
68
69 #[error("The requested subscription is already pending")]
71 SubscriptionAlreadyPending,
72
73 #[error("{0}")]
76 TransportError(String),
77
78 #[error("The buffer is full!")]
82 BufferFull,
83
84 #[error("The client is not connected!")]
88 NotConnected,
89
90 #[error("The client is already connected!")]
92 AlreadyConnected,
93
94 #[error("The server closed the connection!")]
96 ConnectionClosed,
97
98 #[error("Connection error: {0}")]
100 ConnectionError(#[from] Box<tungstenite::Error>),
101
102 #[error("Tycho FatalError: {0}")]
104 Fatal(String),
105}
106
107#[derive(Clone, Debug)]
108pub struct SubscriptionOptions {
109 include_state: bool,
110}
111
112impl Default for SubscriptionOptions {
113 fn default() -> Self {
114 Self { include_state: true }
115 }
116}
117
118impl SubscriptionOptions {
119 pub fn new() -> Self {
120 Self::default()
121 }
122 pub fn with_state(mut self, val: bool) -> Self {
123 self.include_state = val;
124 self
125 }
126}
127
128#[cfg_attr(test, automock)]
129#[async_trait]
130pub trait DeltasClient {
131 async fn subscribe(
138 &self,
139 extractor_id: ExtractorIdentity,
140 options: SubscriptionOptions,
141 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;
142
143 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
145
146 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
148
149 async fn close(&self) -> Result<(), DeltasError>;
151}
152
153#[derive(Clone)]
154pub struct WsDeltasClient {
155 uri: Uri,
157 auth_key: Option<String>,
159 max_reconnects: u32,
161 ws_buffer_size: usize,
164 subscription_buffer_size: usize,
167 conn_notify: Arc<Notify>,
169 inner: Arc<Mutex<Option<Inner>>>,
171}
172
173type WebSocketSink =
174 SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
175
176#[derive(Debug)]
188enum SubscriptionInfo {
189 RequestedSubscription(oneshot::Sender<(Uuid, Receiver<BlockChanges>)>),
191 Active,
193 RequestedUnsubscription(oneshot::Sender<()>),
195}
196
197struct Inner {
199 sink: WebSocketSink,
201 cmd_tx: Sender<()>,
203 pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
205 subscriptions: HashMap<Uuid, SubscriptionInfo>,
207 sender: HashMap<Uuid, Sender<BlockChanges>>,
210 buffer_size: usize,
212}
213
214impl Inner {
218 fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
219 Self {
220 sink,
221 cmd_tx,
222 pending: HashMap::new(),
223 subscriptions: HashMap::new(),
224 sender: HashMap::new(),
225 buffer_size,
226 }
227 }
228
229 #[allow(clippy::result_large_err)]
231 fn new_subscription(
232 &mut self,
233 id: &ExtractorIdentity,
234 ready_tx: oneshot::Sender<(Uuid, Receiver<BlockChanges>)>,
235 ) -> Result<(), DeltasError> {
236 if self.pending.contains_key(id) {
237 return Err(DeltasError::SubscriptionAlreadyPending);
238 }
239 self.pending
240 .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
241 Ok(())
242 }
243
244 fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
248 if let Some(info) = self.pending.remove(extractor_id) {
249 if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
250 let (tx, rx) = mpsc::channel(self.buffer_size);
251 self.sender.insert(subscription_id, tx);
252 self.subscriptions
253 .insert(subscription_id, SubscriptionInfo::Active);
254 let _ = ready_tx
255 .send((subscription_id, rx))
256 .map_err(|_| {
257 warn!(
258 ?extractor_id,
259 ?subscription_id,
260 "Subscriber for has gone away. Ignoring."
261 )
262 });
263 } else {
264 error!(
265 ?extractor_id,
266 ?subscription_id,
267 "Pending subscription was not in the correct state to
268 transition to active. Ignoring!"
269 )
270 }
271 } else {
272 error!(
273 ?extractor_id,
274 ?subscription_id,
275 "Tried to mark an unkown subscription as active. Ignoring!"
276 );
277 }
278 }
279
280 #[allow(clippy::result_large_err)]
282 fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
283 if let Some(sender) = self.sender.get_mut(id) {
284 sender
285 .try_send(msg)
286 .map_err(|e| match e {
287 TrySendError::Full(_) => DeltasError::BufferFull,
288 TrySendError::Closed(_) => {
289 DeltasError::TransportError("The subscriber has gone away".to_string())
290 }
291 })?;
292 }
293 Ok(())
294 }
295
296 fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
301 if let Some(info) = self
302 .subscriptions
303 .get_mut(subscription_id)
304 {
305 if let SubscriptionInfo::Active = info {
306 *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
307 }
308 } else {
309 debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
311 }
312 }
313
314 fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
321 if let Entry::Occupied(e) = self
322 .subscriptions
323 .entry(subscription_id)
324 {
325 let info = e.remove();
326 if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
327 let _ = tx.send(()).map_err(|_| {
328 warn!(?subscription_id, "failed to notify about removed subscription")
329 });
330 self.sender
331 .remove(&subscription_id)
332 .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
333 } else {
334 warn!(?subscription_id, "Subscription ended unexpectedly!");
335 self.sender
336 .remove(&subscription_id)
337 .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
338 }
339 } else {
340 error!(
341 ?subscription_id,
342 "Received `SubscriptionEnded`, but was never subscribed
343 to it. This is likely a bug!"
344 );
345 }
346
347 Ok(())
348 }
349
350 async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
352 self.sink.send(msg).await.map_err(|e| {
353 DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
354 })
355 }
356}
357
358impl WsDeltasClient {
360 #[allow(clippy::result_large_err)]
362 pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
363 let uri = ws_uri
364 .parse::<Uri>()
365 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
366 Ok(Self {
367 uri,
368 auth_key: auth_key.map(|s| s.to_string()),
369 inner: Arc::new(Mutex::new(None)),
370 ws_buffer_size: 128,
371 subscription_buffer_size: 128,
372 conn_notify: Arc::new(Notify::new()),
373 max_reconnects: 5,
374 })
375 }
376
377 #[allow(clippy::result_large_err)]
379 pub fn new_with_reconnects(
380 ws_uri: &str,
381 max_reconnects: u32,
382 auth_key: Option<&str>,
383 ) -> Result<Self, DeltasError> {
384 let uri = ws_uri
385 .parse::<Uri>()
386 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
387
388 Ok(Self {
389 uri,
390 auth_key: auth_key.map(|s| s.to_string()),
391 inner: Arc::new(Mutex::new(None)),
392 ws_buffer_size: 128,
393 subscription_buffer_size: 128,
394 conn_notify: Arc::new(Notify::new()),
395 max_reconnects,
396 })
397 }
398
399 async fn is_connected(&self) -> bool {
403 let guard = self.inner.as_ref().lock().await;
404 guard.is_some()
405 }
406
407 async fn ensure_connection(&self) {
412 if !self.is_connected().await {
413 self.conn_notify.notified().await;
414 }
415 }
416
417 #[instrument(skip(self, msg))]
422 async fn handle_msg(
423 &self,
424 msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
425 ) -> Result<(), DeltasError> {
426 let mut guard = self.inner.lock().await;
427
428 match msg {
429 Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
434 serde_json::Value,
435 >(&text)
436 {
437 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
438 Ok(ws_message) => match ws_message {
439 WebSocketMessage::BlockChanges { subscription_id, deltas } => {
440 trace!(?deltas, "Received a block state change, sending to channel");
441 let inner = guard
442 .as_mut()
443 .ok_or_else(|| DeltasError::NotConnected)?;
444 match inner.send(&subscription_id, deltas) {
445 Err(DeltasError::BufferFull) => {
446 error!(?subscription_id, "Buffer full, message dropped!");
447 }
448 Err(_) => {
449 warn!(
450 ?subscription_id,
451 "Receiver for has gone away, unsubscribing!"
452 );
453 let (tx, _) = oneshot::channel();
454 let _ = WsDeltasClient::unsubscribe_inner(
455 inner,
456 subscription_id,
457 tx,
458 )
459 .await;
460 }
461 _ => { }
462 }
463 }
464 WebSocketMessage::Response(Response::NewSubscription {
465 extractor_id,
466 subscription_id,
467 }) => {
468 info!(?extractor_id, ?subscription_id, "Received a new subscription");
469 let inner = guard
470 .as_mut()
471 .ok_or_else(|| DeltasError::NotConnected)?;
472 inner.mark_active(&extractor_id, subscription_id);
473 }
474 WebSocketMessage::Response(Response::SubscriptionEnded {
475 subscription_id,
476 }) => {
477 info!(?subscription_id, "Received a subscription ended");
478 let inner = guard
479 .as_mut()
480 .ok_or_else(|| DeltasError::NotConnected)?;
481 inner.remove_subscription(subscription_id)?;
482 }
483 },
484 Err(e) => {
485 error!(
486 "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
487 e, text
488 );
489 }
490 },
491 Err(e) => {
492 error!(
493 "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
494 e, text
495 );
496 }
497 },
498 Ok(tungstenite::protocol::Message::Ping(_)) => {
499 let inner = guard
501 .as_mut()
502 .ok_or_else(|| DeltasError::NotConnected)?;
503 if let Err(error) = inner
504 .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
505 .await
506 {
507 debug!(?error, "Failed to send pong!");
508 }
509 }
510 Ok(tungstenite::protocol::Message::Pong(_)) => {
511 }
513 Ok(tungstenite::protocol::Message::Close(_)) => {
514 return Err(DeltasError::ConnectionClosed);
515 }
516 Ok(unknown_msg) => {
517 info!("Received an unknown message type: {:?}", unknown_msg);
518 }
519 Err(error) => {
520 error!(?error, "Websocket error");
521 return Err(match error {
522 tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
523 tungstenite::Error::AlreadyClosed => {
524 warn!("Received AlreadyClosed error which is indicative of a bug!");
525 DeltasError::ConnectionError(Box::new(error))
526 }
527 tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
528 DeltasError::ConnectionError(Box::new(error))
529 }
530 _ => DeltasError::Fatal(error.to_string()),
531 });
532 }
533 };
534 Ok(())
535 }
536
537 async fn unsubscribe_inner(
543 inner: &mut Inner,
544 subscription_id: Uuid,
545 ready_tx: oneshot::Sender<()>,
546 ) -> Result<(), DeltasError> {
547 inner.end_subscription(&subscription_id, ready_tx);
548 let cmd = Command::Unsubscribe { subscription_id };
549 inner
550 .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
551 |e| {
552 DeltasError::TransportError(format!(
553 "Failed to serialize unsubscribe command: {e}"
554 ))
555 },
556 )?))
557 .await?;
558 Ok(())
559 }
560}
561
562#[async_trait]
563impl DeltasClient for WsDeltasClient {
564 #[instrument(skip(self))]
565 async fn subscribe(
566 &self,
567 extractor_id: ExtractorIdentity,
568 options: SubscriptionOptions,
569 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
570 trace!("Starting subscribe");
571 self.ensure_connection().await;
572 let (ready_tx, ready_rx) = oneshot::channel();
573 {
574 let mut guard = self.inner.lock().await;
575 let inner = guard
576 .as_mut()
577 .ok_or_else(|| DeltasError::NotConnected)?;
578 trace!("Sending subscribe command");
579 inner.new_subscription(&extractor_id, ready_tx)?;
580 let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
581 inner
582 .ws_send(tungstenite::protocol::Message::Text(
583 serde_json::to_string(&cmd).map_err(|e| {
584 DeltasError::TransportError(format!(
585 "Failed to serialize subscribe command: {e}"
586 ))
587 })?,
588 ))
589 .await?;
590 }
591 trace!("Waiting for subscription response");
592 let rx = ready_rx.await.map_err(|_| {
593 DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
594 })?;
595 trace!("Subscription successful");
596 Ok(rx)
597 }
598
599 #[instrument(skip(self))]
600 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
601 self.ensure_connection().await;
602 let (ready_tx, ready_rx) = oneshot::channel();
603 {
604 let mut guard = self.inner.lock().await;
605 let inner = guard
606 .as_mut()
607 .ok_or_else(|| DeltasError::NotConnected)?;
608
609 WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
610 }
611 ready_rx.await.map_err(|_| {
612 DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
613 })?;
614
615 Ok(())
616 }
617
618 #[instrument(skip(self))]
619 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
620 if self.is_connected().await {
621 return Err(DeltasError::AlreadyConnected);
622 }
623 let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
624 info!(?ws_uri, "Starting TychoWebsocketClient");
625
626 let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
627 {
628 let mut guard = self.inner.as_ref().lock().await;
629 *guard = None;
630 }
631 let this = self.clone();
632 let jh = tokio::spawn(async move {
633 let mut retry_count = 0;
634 let mut result = Err(DeltasError::NotConnected);
635
636 'retry: while retry_count < this.max_reconnects {
637 info!(?ws_uri, "Connecting to WebSocket server");
638
639 let mut request_builder = Request::builder()
641 .uri(&ws_uri)
642 .header(SEC_WEBSOCKET_KEY, generate_key())
643 .header(SEC_WEBSOCKET_VERSION, 13)
644 .header(CONNECTION, "Upgrade")
645 .header(UPGRADE, "websocket")
646 .header(
647 HOST,
648 this.uri.host().ok_or_else(|| {
649 DeltasError::UriParsing(
650 ws_uri.clone(),
651 "No host found in tycho url".to_string(),
652 )
653 })?,
654 )
655 .header(
656 USER_AGENT,
657 format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
658 );
659
660 if let Some(ref key) = this.auth_key {
662 request_builder = request_builder.header(AUTHORIZATION, key);
663 }
664
665 let request = request_builder.body(()).map_err(|e| {
666 DeltasError::TransportError(format!("Failed to build connection request: {e}"))
667 })?;
668 let (conn, _) = match connect_async(request).await {
669 Ok(conn) => conn,
670 Err(e) => {
671 retry_count += 1;
673 let mut guard = this.inner.as_ref().lock().await;
674 *guard = None;
675
676 warn!(
677 e = e.to_string(),
678 "Failed to connect to WebSocket server; Reconnecting"
679 );
680 sleep(Duration::from_millis(500)).await;
681
682 continue 'retry;
683 }
684 };
685
686 let (ws_tx_new, ws_rx_new) = conn.split();
687 {
688 let mut guard = this.inner.as_ref().lock().await;
689 *guard =
690 Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
691 }
692 let mut msg_rx = ws_rx_new.boxed();
693
694 info!("Connection Successful: TychoWebsocketClient started");
695 this.conn_notify.notify_waiters();
696 result = Ok(());
697
698 loop {
699 let res = tokio::select! {
700 msg = msg_rx.next() => match msg {
701 Some(msg) => this.handle_msg(msg).await,
702 None => { break 'retry } },
704 _ = cmd_rx.recv() => {break 'retry},
705 };
706 if let Err(error) = res {
707 if matches!(
708 error,
709 DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
710 ) {
711 retry_count += 1;
713 let mut guard = this.inner.as_ref().lock().await;
714 *guard = None;
715
716 warn!(
717 ?error,
718 ?retry_count,
719 "Connection dropped unexpectedly; Reconnecting..."
720 );
721 break;
722 } else {
723 error!(?error, "Fatal error; Exiting");
725 result = Err(error);
726 break 'retry;
727 }
728 }
729 }
730 }
731
732 let mut guard = this.inner.as_ref().lock().await;
734 *guard = None;
735
736 if retry_count >= this.max_reconnects {
738 error!("Max reconnection attempts reached; Exiting");
739 this.conn_notify.notify_waiters(); result = Err(DeltasError::ConnectionClosed);
741 }
742
743 result
744 });
745
746 self.conn_notify.notified().await;
747
748 if self.is_connected().await {
749 Ok(jh)
750 } else {
751 Err(DeltasError::NotConnected)
752 }
753 }
754
755 #[instrument(skip(self))]
756 async fn close(&self) -> Result<(), DeltasError> {
757 info!("Closing TychoWebsocketClient");
758 let mut guard = self.inner.lock().await;
759 let inner = guard
760 .as_mut()
761 .ok_or_else(|| DeltasError::NotConnected)?;
762 inner
763 .cmd_tx
764 .send(())
765 .await
766 .map_err(|e| DeltasError::TransportError(e.to_string()))?;
767 Ok(())
768 }
769}
770
771#[cfg(test)]
772mod tests {
773 use std::net::SocketAddr;
774
775 use tokio::{net::TcpListener, time::timeout};
776 use tycho_common::dto::Chain;
777
778 use super::*;
779
780 #[derive(Clone)]
781 enum ExpectedComm {
782 Receive(u64, tungstenite::protocol::Message),
783 Send(tungstenite::protocol::Message),
784 }
785
786 async fn mock_tycho_ws(
787 messages: &[ExpectedComm],
788 reconnects: usize,
789 ) -> (SocketAddr, JoinHandle<()>) {
790 info!("Starting mock webserver");
791 let server = TcpListener::bind("127.0.0.1:0")
793 .await
794 .expect("localhost bind failed");
795 let addr = server.local_addr().unwrap();
796 let messages = messages.to_vec();
797
798 let jh = tokio::spawn(async move {
799 info!("mock webserver started");
800 for _ in 0..(reconnects + 1) {
801 if let Ok((stream, _)) = server.accept().await {
802 let mut websocket = tokio_tungstenite::accept_async(stream)
803 .await
804 .unwrap();
805
806 info!("Handling messages..");
807 for c in messages.iter().cloned() {
808 match c {
809 ExpectedComm::Receive(t, exp) => {
810 info!("Awaiting message...");
811 let msg = timeout(Duration::from_millis(t), websocket.next())
812 .await
813 .expect("Receive timeout")
814 .expect("Stream exhausted")
815 .expect("Failed to receive message.");
816 info!("Message received");
817 assert_eq!(msg, exp)
818 }
819 ExpectedComm::Send(data) => {
820 info!("Sending message");
821 websocket
822 .send(data)
823 .await
824 .expect("Failed to send message");
825 info!("Message sent");
826 }
827 };
828 }
829 sleep(Duration::from_millis(100)).await;
830 let _ = websocket.close(None).await;
832 }
833 }
834 });
835 (addr, jh)
836 }
837
838 #[tokio::test]
839 async fn test_subscribe_receive() {
840 let exp_comm = [
841 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
842 {
843 "method":"subscribe",
844 "extractor_id":{
845 "chain":"ethereum",
846 "name":"vm:ambient"
847 },
848 "include_state": true
849 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
850 )),
851 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
852 {
853 "method":"newsubscription",
854 "extractor_id":{
855 "chain":"ethereum",
856 "name":"vm:ambient"
857 },
858 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
859 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
860 )),
861 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
862 {
863 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
864 "deltas": {
865 "extractor": "vm:ambient",
866 "chain": "ethereum",
867 "block": {
868 "number": 123,
869 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
870 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
871 "chain": "ethereum",
872 "ts": "2023-09-14T00:00:00"
873 },
874 "finalized_block_height": 0,
875 "revert": false,
876 "new_tokens": {},
877 "account_updates": {
878 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
879 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
880 "chain": "ethereum",
881 "slots": {},
882 "balance": "0x01f4",
883 "code": "",
884 "change": "Update"
885 }
886 },
887 "state_updates": {
888 "component_1": {
889 "component_id": "component_1",
890 "updated_attributes": {"attr1": "0x01"},
891 "deleted_attributes": ["attr2"]
892 }
893 },
894 "new_protocol_components":
895 { "protocol_1": {
896 "id": "protocol_1",
897 "protocol_system": "system_1",
898 "protocol_type_name": "type_1",
899 "chain": "ethereum",
900 "tokens": ["0x01", "0x02"],
901 "contract_ids": ["0x01", "0x02"],
902 "static_attributes": {"attr1": "0x01f4"},
903 "change": "Update",
904 "creation_tx": "0x01",
905 "created_at": "2023-09-14T00:00:00"
906 }
907 },
908 "deleted_protocol_components": {},
909 "component_balances": {
910 "protocol_1":
911 {
912 "0x01": {
913 "token": "0x01",
914 "balance": "0x01f4",
915 "balance_float": 0.0,
916 "modify_tx": "0x01",
917 "component_id": "protocol_1"
918 }
919 }
920 },
921 "account_balances": {
922 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
923 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
924 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
925 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
926 "balance": "0x01f4",
927 "modify_tx": "0x01"
928 }
929 }
930 },
931 "component_tvl": {
932 "protocol_1": 1000.0
933 },
934 "dci_update": {
935 "new_entrypoints": {},
936 "new_entrypoint_params": {},
937 "trace_results": {}
938 }
939 }
940 }
941 "#.to_owned()
942 ))
943 ];
944 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
945
946 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
947 let jh = client
948 .connect()
949 .await
950 .expect("connect failed");
951 let (_, mut rx) = timeout(
952 Duration::from_millis(100),
953 client.subscribe(
954 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
955 SubscriptionOptions::new(),
956 ),
957 )
958 .await
959 .expect("subscription timed out")
960 .expect("subscription failed");
961 let _ = timeout(Duration::from_millis(100), rx.recv())
962 .await
963 .expect("awaiting message timeout out")
964 .expect("receiving message failed");
965 timeout(Duration::from_millis(100), client.close())
966 .await
967 .expect("close timed out")
968 .expect("close failed");
969 jh.await
970 .expect("ws loop errored")
971 .unwrap();
972 server_thread.await.unwrap();
973 }
974
975 #[tokio::test]
976 async fn test_unsubscribe() {
977 let exp_comm = [
978 ExpectedComm::Receive(
979 100,
980 tungstenite::protocol::Message::Text(
981 r#"
982 {
983 "method": "subscribe",
984 "extractor_id":{
985 "chain": "ethereum",
986 "name": "vm:ambient"
987 },
988 "include_state": true
989 }"#
990 .to_owned()
991 .replace(|c: char| c.is_whitespace(), ""),
992 ),
993 ),
994 ExpectedComm::Send(tungstenite::protocol::Message::Text(
995 r#"
996 {
997 "method": "newsubscription",
998 "extractor_id":{
999 "chain": "ethereum",
1000 "name": "vm:ambient"
1001 },
1002 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1003 }"#
1004 .to_owned()
1005 .replace(|c: char| c.is_whitespace(), ""),
1006 )),
1007 ExpectedComm::Receive(
1008 100,
1009 tungstenite::protocol::Message::Text(
1010 r#"
1011 {
1012 "method": "unsubscribe",
1013 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1014 }
1015 "#
1016 .to_owned()
1017 .replace(|c: char| c.is_whitespace(), ""),
1018 ),
1019 ),
1020 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1021 r#"
1022 {
1023 "method": "subscriptionended",
1024 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1025 }
1026 "#
1027 .to_owned()
1028 .replace(|c: char| c.is_whitespace(), ""),
1029 )),
1030 ];
1031 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1032
1033 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1034 let jh = client
1035 .connect()
1036 .await
1037 .expect("connect failed");
1038 let (sub_id, mut rx) = timeout(
1039 Duration::from_millis(100),
1040 client.subscribe(
1041 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1042 SubscriptionOptions::new(),
1043 ),
1044 )
1045 .await
1046 .expect("subscription timed out")
1047 .expect("subscription failed");
1048
1049 timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1050 .await
1051 .expect("unsubscribe timed out")
1052 .expect("unsubscribe failed");
1053 let res = timeout(Duration::from_millis(100), rx.recv())
1054 .await
1055 .expect("awaiting message timeout out");
1056
1057 assert!(res.is_none());
1059
1060 timeout(Duration::from_millis(100), client.close())
1061 .await
1062 .expect("close timed out")
1063 .expect("close failed");
1064 jh.await
1065 .expect("ws loop errored")
1066 .unwrap();
1067 server_thread.await.unwrap();
1068 }
1069
1070 #[tokio::test]
1071 async fn test_subscription_unexpected_end() {
1072 let exp_comm = [
1073 ExpectedComm::Receive(
1074 100,
1075 tungstenite::protocol::Message::Text(
1076 r#"
1077 {
1078 "method":"subscribe",
1079 "extractor_id":{
1080 "chain":"ethereum",
1081 "name":"vm:ambient"
1082 },
1083 "include_state": true
1084 }"#
1085 .to_owned()
1086 .replace(|c: char| c.is_whitespace(), ""),
1087 ),
1088 ),
1089 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1090 r#"
1091 {
1092 "method":"newsubscription",
1093 "extractor_id":{
1094 "chain":"ethereum",
1095 "name":"vm:ambient"
1096 },
1097 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1098 }"#
1099 .to_owned()
1100 .replace(|c: char| c.is_whitespace(), ""),
1101 )),
1102 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1103 r#"
1104 {
1105 "method": "subscriptionended",
1106 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1107 }"#
1108 .to_owned()
1109 .replace(|c: char| c.is_whitespace(), ""),
1110 )),
1111 ];
1112 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1113
1114 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1115 let jh = client
1116 .connect()
1117 .await
1118 .expect("connect failed");
1119 let (_, mut rx) = timeout(
1120 Duration::from_millis(100),
1121 client.subscribe(
1122 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1123 SubscriptionOptions::new(),
1124 ),
1125 )
1126 .await
1127 .expect("subscription timed out")
1128 .expect("subscription failed");
1129 let res = timeout(Duration::from_millis(100), rx.recv())
1130 .await
1131 .expect("awaiting message timeout out");
1132
1133 assert!(res.is_none());
1135
1136 timeout(Duration::from_millis(100), client.close())
1137 .await
1138 .expect("close timed out")
1139 .expect("close failed");
1140 jh.await
1141 .expect("ws loop errored")
1142 .unwrap();
1143 server_thread.await.unwrap();
1144 }
1145
1146 #[test_log::test(tokio::test)]
1147 async fn test_reconnect() {
1148 let exp_comm = [
1149 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
1150 {
1151 "method":"subscribe",
1152 "extractor_id":{
1153 "chain":"ethereum",
1154 "name":"vm:ambient"
1155 },
1156 "include_state": true
1157 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1158 )),
1159 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1160 {
1161 "method":"newsubscription",
1162 "extractor_id":{
1163 "chain":"ethereum",
1164 "name":"vm:ambient"
1165 },
1166 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1167 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1168 )),
1169 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1170 {
1171 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1172 "deltas": {
1173 "extractor": "vm:ambient",
1174 "chain": "ethereum",
1175 "block": {
1176 "number": 123,
1177 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1178 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1179 "chain": "ethereum",
1180 "ts": "2023-09-14T00:00:00"
1181 },
1182 "finalized_block_height": 0,
1183 "revert": false,
1184 "new_tokens": {},
1185 "account_updates": {
1186 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1187 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1188 "chain": "ethereum",
1189 "slots": {},
1190 "balance": "0x01f4",
1191 "code": "",
1192 "change": "Update"
1193 }
1194 },
1195 "state_updates": {
1196 "component_1": {
1197 "component_id": "component_1",
1198 "updated_attributes": {"attr1": "0x01"},
1199 "deleted_attributes": ["attr2"]
1200 }
1201 },
1202 "new_protocol_components": {
1203 "protocol_1":
1204 {
1205 "id": "protocol_1",
1206 "protocol_system": "system_1",
1207 "protocol_type_name": "type_1",
1208 "chain": "ethereum",
1209 "tokens": ["0x01", "0x02"],
1210 "contract_ids": ["0x01", "0x02"],
1211 "static_attributes": {"attr1": "0x01f4"},
1212 "change": "Update",
1213 "creation_tx": "0x01",
1214 "created_at": "2023-09-14T00:00:00"
1215 }
1216 },
1217 "deleted_protocol_components": {},
1218 "component_balances": {
1219 "protocol_1": {
1220 "0x01": {
1221 "token": "0x01",
1222 "balance": "0x01f4",
1223 "balance_float": 1000.0,
1224 "modify_tx": "0x01",
1225 "component_id": "protocol_1"
1226 }
1227 }
1228 },
1229 "account_balances": {
1230 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1231 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1232 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1233 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1234 "balance": "0x01f4",
1235 "modify_tx": "0x01"
1236 }
1237 }
1238 },
1239 "component_tvl": {
1240 "protocol_1": 1000.0
1241 },
1242 "dci_update": {
1243 "new_entrypoints": {},
1244 "new_entrypoint_params": {},
1245 "trace_results": {}
1246 }
1247 }
1248 }
1249 "#.to_owned()
1250 ))
1251 ];
1252 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1253 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1254 let jh: JoinHandle<Result<(), DeltasError>> = client
1255 .connect()
1256 .await
1257 .expect("connect failed");
1258
1259 for _ in 0..2 {
1260 let (_, mut rx) = timeout(
1261 Duration::from_millis(100),
1262 client.subscribe(
1263 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1264 SubscriptionOptions::new(),
1265 ),
1266 )
1267 .await
1268 .expect("subscription timed out")
1269 .expect("subscription failed");
1270
1271 let _ = timeout(Duration::from_millis(100), rx.recv())
1272 .await
1273 .expect("awaiting message timeout out")
1274 .expect("receiving message failed");
1275
1276 let res = timeout(Duration::from_millis(200), rx.recv())
1278 .await
1279 .expect("awaiting closed connection timeout out");
1280 assert!(res.is_none());
1281 }
1282 let res = jh.await.expect("ws client join failed");
1283 assert!(res.is_err());
1285 server_thread
1286 .await
1287 .expect("ws server loop errored");
1288 }
1289
1290 async fn mock_bad_connection_tycho_ws() -> (SocketAddr, JoinHandle<()>) {
1291 let server = TcpListener::bind("127.0.0.1:0")
1292 .await
1293 .expect("localhost bind failed");
1294 let addr = server.local_addr().unwrap();
1295 let jh = tokio::spawn(async move {
1296 while let Ok((stream, _)) = server.accept().await {
1297 drop(stream);
1299 }
1300 });
1301 (addr, jh)
1302 }
1303
1304 #[tokio::test]
1305 async fn test_connect_max_attempts() {
1306 let (addr, _) = mock_bad_connection_tycho_ws().await;
1307 let client = WsDeltasClient::new_with_reconnects(&format!("ws://{addr}"), 3, None).unwrap();
1308
1309 let join_handle = client.connect().await;
1310
1311 assert!(join_handle.is_err());
1312 assert_eq!(join_handle.unwrap_err().to_string(), DeltasError::NotConnected.to_string());
1313 }
1314}