1use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::Arc,
25 time::Duration,
26};
27
28use async_trait::async_trait;
29use futures03::{stream::SplitSink, SinkExt, StreamExt};
30use hyper::{
31 header::{
32 AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
33 USER_AGENT,
34 },
35 Uri,
36};
37#[cfg(test)]
38use mockall::automock;
39use thiserror::Error;
40use tokio::{
41 net::TcpStream,
42 sync::{
43 mpsc::{self, error::TrySendError, Receiver, Sender},
44 oneshot, Mutex, Notify,
45 },
46 task::JoinHandle,
47 time::sleep,
48};
49use tokio_tungstenite::{
50 connect_async,
51 tungstenite::{
52 self,
53 handshake::client::{generate_key, Request},
54 },
55 MaybeTlsStream, WebSocketStream,
56};
57use tracing::{debug, error, info, instrument, trace, warn};
58use tycho_common::dto::{BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage};
59use uuid::Uuid;
60
61use crate::TYCHO_SERVER_VERSION;
62
/// Errors reported by the deltas client.
#[derive(Error, Debug)]
pub enum DeltasError {
    /// The websocket URI could not be parsed. Carries the offending URI and the parser's
    /// error message.
    #[error("Failed to parse URI: {0}. Error: {1}")]
    UriParsing(String, String),
    /// A subscription request for the same extractor is still awaiting confirmation.
    #[error("The requested subscription is already pending")]
    SubscriptionAlreadyPending,
    /// A message could not be passed on, e.g. writing to the websocket failed or the
    /// receiving side of an internal channel has gone away.
    #[error("{0}")]
    TransportError(String),
    /// The channel of a subscription has no free capacity left.
    #[error("The buffer is full!")]
    BufferFull,
    /// The operation requires an established connection, but there is none.
    #[error("The client is not connected!")]
    NotConnected,
    /// `connect` was called while a connection is already established.
    #[error("The client is already connected!")]
    AlreadyConnected,
    /// The server sent a close message, or the reconnection attempts were exhausted.
    #[error("The server closed the connection!")]
    ConnectionClosed,
    /// An error raised by the underlying websocket implementation.
    #[error("ConnectionError {source}")]
    ConnectionError {
        #[from]
        source: tungstenite::Error,
    },
    /// An error the client does not try to recover from.
    #[error("Tycho FatalError: {0}")]
    Fatal(String),
}
101
/// Options applied to a single subscription request.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Forwarded as `include_state` in the subscribe command.
    include_state: bool,
}

impl SubscriptionOptions {
    /// Creates the default options, which request the state to be included.
    pub fn new() -> Self {
        Self { include_state: true }
    }

    /// Returns the options with `include_state` set to `val`.
    pub fn with_state(self, val: bool) -> Self {
        Self { include_state: val }
    }
}

impl Default for SubscriptionOptions {
    /// Same as [`SubscriptionOptions::new`].
    fn default() -> Self {
        Self::new()
    }
}
122
/// Asynchronous client interface for consuming block changes from extractors.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait DeltasClient {
    /// Subscribes to the extractor identified by `extractor_id`.
    ///
    /// On success, returns the id of the new subscription together with the receiving end
    /// of the channel on which its `BlockChanges` are delivered.
    async fn subscribe(
        &self,
        extractor_id: ExtractorIdentity,
        options: SubscriptionOptions,
    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;

    /// Ends the subscription identified by `subscription_id`.
    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;

    /// Establishes the connection.
    ///
    /// On success, returns the handle of the task that drives the connection; the task's
    /// own result is available by awaiting the handle.
    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;

    /// Closes the connection.
    async fn close(&self) -> Result<(), DeltasError>;
}
147
/// Websocket based implementation of [`DeltasClient`].
///
/// Clones share the same connection state through the `Arc`-wrapped fields.
#[derive(Clone)]
pub struct WsDeltasClient {
    /// Base URI of the server; the server version and `/ws` are appended when connecting.
    uri: Uri,
    /// Optional value sent in the `Authorization` header of the handshake request.
    auth_key: Option<String>,
    /// Upper bound for the retry counter of the connection task.
    max_reconnects: u32,
    /// Capacity of the command channel used to ask the connection task to stop.
    ws_buffer_size: usize,
    /// Capacity of the channel created for each subscription.
    subscription_buffer_size: usize,
    /// Notified when a connection was established or the connection attempts ran out.
    conn_notify: Arc<Notify>,
    /// State of the current connection; `None` while disconnected.
    inner: Arc<Mutex<Option<Inner>>>,
}
167
/// Sending half of the websocket stream after it has been split.
type WebSocketSink =
    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
170
/// Lifecycle state of a subscription as tracked by the client.
#[derive(Debug)]
enum SubscriptionInfo {
    /// A subscribe command was sent; the sender hands the subscription id and the receiver
    /// to the waiting caller once the server confirms.
    RequestedSubscription(oneshot::Sender<(Uuid, Receiver<BlockChanges>)>),
    /// The server confirmed the subscription.
    Active,
    /// An unsubscribe command was sent; the sender notifies the waiting caller once the
    /// server reports that the subscription ended.
    RequestedUnsubscription(oneshot::Sender<()>),
}
191
/// State that only exists while a websocket connection is established.
struct Inner {
    /// Sending half of the websocket.
    sink: WebSocketSink,
    /// Sender used to tell the connection task to stop.
    cmd_tx: Sender<()>,
    /// Subscription requests awaiting confirmation, keyed by extractor.
    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
    /// Confirmed subscriptions, keyed by subscription id.
    subscriptions: HashMap<Uuid, SubscriptionInfo>,
    /// Channels on which block changes are forwarded, keyed by subscription id.
    sender: HashMap<Uuid, Sender<BlockChanges>>,
    /// Capacity used for each newly created subscription channel.
    buffer_size: usize,
}
208
209impl Inner {
213 fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
214 Self {
215 sink,
216 cmd_tx,
217 pending: HashMap::new(),
218 subscriptions: HashMap::new(),
219 sender: HashMap::new(),
220 buffer_size,
221 }
222 }
223
224 #[allow(clippy::result_large_err)]
226 fn new_subscription(
227 &mut self,
228 id: &ExtractorIdentity,
229 ready_tx: oneshot::Sender<(Uuid, Receiver<BlockChanges>)>,
230 ) -> Result<(), DeltasError> {
231 if self.pending.contains_key(id) {
232 return Err(DeltasError::SubscriptionAlreadyPending);
233 }
234 self.pending
235 .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
236 Ok(())
237 }
238
239 fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
243 if let Some(info) = self.pending.remove(extractor_id) {
244 if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
245 let (tx, rx) = mpsc::channel(self.buffer_size);
246 self.sender.insert(subscription_id, tx);
247 self.subscriptions
248 .insert(subscription_id, SubscriptionInfo::Active);
249 let _ = ready_tx
250 .send((subscription_id, rx))
251 .map_err(|_| {
252 warn!(
253 ?extractor_id,
254 ?subscription_id,
255 "Subscriber for has gone away. Ignoring."
256 )
257 });
258 } else {
259 error!(
260 ?extractor_id,
261 ?subscription_id,
262 "Pending subscription was not in the correct state to
263 transition to active. Ignoring!"
264 )
265 }
266 } else {
267 error!(
268 ?extractor_id,
269 ?subscription_id,
270 "Tried to mark an unkown subscription as active. Ignoring!"
271 );
272 }
273 }
274
275 #[allow(clippy::result_large_err)]
277 fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
278 if let Some(sender) = self.sender.get_mut(id) {
279 sender
280 .try_send(msg)
281 .map_err(|e| match e {
282 TrySendError::Full(_) => DeltasError::BufferFull,
283 TrySendError::Closed(_) => {
284 DeltasError::TransportError("The subscriber has gone away".to_string())
285 }
286 })?;
287 }
288 Ok(())
289 }
290
291 fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
296 if let Some(info) = self
297 .subscriptions
298 .get_mut(subscription_id)
299 {
300 if let SubscriptionInfo::Active = info {
301 *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
302 }
303 } else {
304 debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
306 }
307 }
308
309 fn remove_subscription(&mut self, subscription_id: Uuid) {
316 if let Entry::Occupied(e) = self
317 .subscriptions
318 .entry(subscription_id)
319 {
320 let info = e.remove();
321 if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
322 let _ = tx.send(()).map_err(|_| {
323 warn!(?subscription_id, "failed to notify about removed subscription")
324 });
325 self.sender
326 .remove(&subscription_id)
327 .expect(
328 "Inconsistent internal client state: `sender` state
329 drifted from `info` while removing a subscription.",
330 );
331 } else {
332 warn!(?subscription_id, "Subscription ended unexpectedly!");
333 self.sender
334 .remove(&subscription_id)
335 .expect("sender channel missing");
336 }
337 } else {
338 error!(
339 ?subscription_id,
340 "Received `SubscriptionEnded`, but was never subscribed
341 to it. This is likely a bug!"
342 );
343 }
344 }
345
346 async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
348 self.sink.send(msg).await.map_err(|e| {
349 DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
350 })
351 }
352}
353
354impl WsDeltasClient {
356 #[allow(clippy::result_large_err)]
358 pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
359 let uri = ws_uri
360 .parse::<Uri>()
361 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
362 Ok(Self {
363 uri,
364 auth_key: auth_key.map(|s| s.to_string()),
365 inner: Arc::new(Mutex::new(None)),
366 ws_buffer_size: 128,
367 subscription_buffer_size: 128,
368 conn_notify: Arc::new(Notify::new()),
369 max_reconnects: 5,
370 })
371 }
372
373 #[allow(clippy::result_large_err)]
375 pub fn new_with_reconnects(
376 ws_uri: &str,
377 max_reconnects: u32,
378 auth_key: Option<&str>,
379 ) -> Result<Self, DeltasError> {
380 let uri = ws_uri
381 .parse::<Uri>()
382 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
383
384 Ok(Self {
385 uri,
386 auth_key: auth_key.map(|s| s.to_string()),
387 inner: Arc::new(Mutex::new(None)),
388 ws_buffer_size: 128,
389 subscription_buffer_size: 128,
390 conn_notify: Arc::new(Notify::new()),
391 max_reconnects,
392 })
393 }
394
395 async fn is_connected(&self) -> bool {
399 let guard = self.inner.as_ref().lock().await;
400 guard.is_some()
401 }
402
403 async fn ensure_connection(&self) {
408 if !self.is_connected().await {
409 self.conn_notify.notified().await;
410 }
411 }
412
413 #[instrument(skip(self, msg))]
418 async fn handle_msg(
419 &self,
420 msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
421 ) -> Result<(), DeltasError> {
422 let mut guard = self.inner.lock().await;
423
424 match msg {
425 Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
430 serde_json::Value,
431 >(&text)
432 {
433 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
434 Ok(ws_message) => match ws_message {
435 WebSocketMessage::BlockChanges { subscription_id, deltas } => {
436 trace!(?deltas, "Received a block state change, sending to channel");
437 let inner = guard
438 .as_mut()
439 .ok_or_else(|| DeltasError::NotConnected)?;
440 match inner.send(&subscription_id, deltas) {
441 Err(DeltasError::BufferFull) => {
442 error!(?subscription_id, "Buffer full, message dropped!");
443 }
444 Err(_) => {
445 warn!(
446 ?subscription_id,
447 "Receiver for has gone away, unsubscribing!"
448 );
449 let (tx, _) = oneshot::channel();
450 let _ = WsDeltasClient::unsubscribe_inner(
451 inner,
452 subscription_id,
453 tx,
454 )
455 .await;
456 }
457 _ => { }
458 }
459 }
460 WebSocketMessage::Response(Response::NewSubscription {
461 extractor_id,
462 subscription_id,
463 }) => {
464 info!(?extractor_id, ?subscription_id, "Received a new subscription");
465 let inner = guard
466 .as_mut()
467 .ok_or_else(|| DeltasError::NotConnected)?;
468 inner.mark_active(&extractor_id, subscription_id);
469 }
470 WebSocketMessage::Response(Response::SubscriptionEnded {
471 subscription_id,
472 }) => {
473 info!(?subscription_id, "Received a subscription ended");
474 let inner = guard
475 .as_mut()
476 .ok_or_else(|| DeltasError::NotConnected)?;
477 inner.remove_subscription(subscription_id);
478 }
479 },
480 Err(e) => {
481 error!(error = %e, message=text, "Failed to deserialize WebSocketMessage: message does not match expected structs");
482 }
483 },
484 Err(e) => {
485 error!(error = %e, message=text, "Failed to deserialize message: invalid json");
486 }
487 },
488 Ok(tungstenite::protocol::Message::Ping(_)) => {
489 let inner = guard
491 .as_mut()
492 .ok_or_else(|| DeltasError::NotConnected)?;
493 if let Err(error) = inner
494 .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
495 .await
496 {
497 debug!(?error, "Failed to send pong!");
498 }
499 }
500 Ok(tungstenite::protocol::Message::Pong(_)) => {
501 }
503 Ok(tungstenite::protocol::Message::Close(_)) => {
504 return Err(DeltasError::ConnectionClosed);
505 }
506 Ok(unknown_msg) => {
507 info!("Received an unknown message type: {:?}", unknown_msg);
508 }
509 Err(error) => {
510 error!(?error, "Websocket error");
511 return Err(match &error {
512 tungstenite::Error::ConnectionClosed => DeltasError::from(error),
513 tungstenite::Error::AlreadyClosed => {
514 warn!("Received AlreadyClosed error which is indicative of a bug!");
515 DeltasError::from(error)
516 }
517 tungstenite::Error::Io(_) => DeltasError::from(error),
518 tungstenite::Error::Protocol(_) => DeltasError::from(error),
519 _ => DeltasError::Fatal(error.to_string()),
520 });
521 }
522 };
523 Ok(())
524 }
525
526 async fn unsubscribe_inner(
532 inner: &mut Inner,
533 subscription_id: Uuid,
534 ready_tx: oneshot::Sender<()>,
535 ) -> Result<(), DeltasError> {
536 inner.end_subscription(&subscription_id, ready_tx);
537 let cmd = Command::Unsubscribe { subscription_id };
538 inner
539 .ws_send(tungstenite::protocol::Message::Text(
540 serde_json::to_string(&cmd).expect("serialize cmd encode error"),
541 ))
542 .await?;
543 Ok(())
544 }
545}
546
547#[async_trait]
548impl DeltasClient for WsDeltasClient {
549 #[instrument(skip(self))]
550 async fn subscribe(
551 &self,
552 extractor_id: ExtractorIdentity,
553 options: SubscriptionOptions,
554 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
555 trace!("Starting subscribe");
556 self.ensure_connection().await;
557 let (ready_tx, ready_rx) = oneshot::channel();
558 {
559 let mut guard = self.inner.lock().await;
560 let inner = guard
561 .as_mut()
562 .expect("ws not connected");
563 trace!("Sending subscribe command");
564 inner.new_subscription(&extractor_id, ready_tx)?;
565 let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
566 inner
567 .ws_send(tungstenite::protocol::Message::Text(
568 serde_json::to_string(&cmd).expect("serialize cmd encode error"),
569 ))
570 .await?;
571 }
572 trace!("Waiting for subscription response");
573 let rx = ready_rx
574 .await
575 .expect("ready channel closed");
576 trace!("Subscription successfull");
577 Ok(rx)
578 }
579
580 #[instrument(skip(self))]
581 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
582 self.ensure_connection().await;
583 let (ready_tx, ready_rx) = oneshot::channel();
584 {
585 let mut guard = self.inner.lock().await;
586 let inner = guard
587 .as_mut()
588 .expect("ws not connected");
589
590 WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
591 }
592 ready_rx
593 .await
594 .expect("ready channel closed");
595
596 Ok(())
597 }
598
599 #[instrument(skip(self))]
600 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
601 if self.is_connected().await {
602 return Err(DeltasError::AlreadyConnected);
603 }
604 let ws_uri = format!("{}{}/ws", self.uri, TYCHO_SERVER_VERSION);
605 info!(?ws_uri, "Starting TychoWebsocketClient");
606
607 let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
608 {
609 let mut guard = self.inner.as_ref().lock().await;
610 *guard = None;
611 }
612 let this = self.clone();
613 let jh = tokio::spawn(async move {
614 let mut retry_count = 0;
615 let mut result = Err(DeltasError::NotConnected);
616
617 'retry: while retry_count < this.max_reconnects {
618 info!(?ws_uri, "Connecting to WebSocket server");
619
620 let mut request_builder = Request::builder()
622 .uri(&ws_uri)
623 .header(SEC_WEBSOCKET_KEY, generate_key())
624 .header(SEC_WEBSOCKET_VERSION, 13)
625 .header(CONNECTION, "Upgrade")
626 .header(UPGRADE, "websocket")
627 .header(
628 HOST,
629 this.uri
630 .host()
631 .expect("no host found in tycho url"),
632 )
633 .header(USER_AGENT, format!("tycho-client-{}", env!("CARGO_PKG_VERSION")));
634
635 if let Some(ref key) = this.auth_key {
637 request_builder = request_builder.header(AUTHORIZATION, key);
638 }
639
640 let request = request_builder.body(()).unwrap();
641 let (conn, _) = match connect_async(request).await {
642 Ok(conn) => conn,
643 Err(e) => {
644 retry_count += 1;
646 let mut guard = this.inner.as_ref().lock().await;
647 *guard = None;
648
649 warn!(
650 e = e.to_string(),
651 "Failed to connect to WebSocket server; Reconnecting"
652 );
653 sleep(Duration::from_millis(500)).await;
654
655 continue 'retry;
656 }
657 };
658
659 let (ws_tx_new, ws_rx_new) = conn.split();
660 {
661 let mut guard = this.inner.as_ref().lock().await;
662 *guard =
663 Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
664 }
665 let mut msg_rx = ws_rx_new.boxed();
666
667 info!("Connection Successful: TychoWebsocketClient started");
668 this.conn_notify.notify_waiters();
669 result = Ok(());
670
671 loop {
672 let res = tokio::select! {
673 msg = msg_rx.next() => match msg {
674 Some(msg) => this.handle_msg(msg).await,
675 None => { break 'retry } },
677 _ = cmd_rx.recv() => {break 'retry},
678 };
679 if let Err(error) = res {
680 if matches!(
681 error,
682 DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
683 ) {
684 retry_count += 1;
686 let mut guard = this.inner.as_ref().lock().await;
687 *guard = None;
688
689 warn!(
690 ?error,
691 ?retry_count,
692 "Connection dropped unexpectedly; Reconnecting"
693 );
694 break;
695 } else {
696 error!(?error, "Fatal error; Exiting");
698 result = Err(error);
699 break 'retry;
700 }
701 }
702 }
703 }
704
705 let mut guard = this.inner.as_ref().lock().await;
707 *guard = None;
708
709 if retry_count >= this.max_reconnects {
711 error!("Max reconnection attempts reached; Exiting");
712 this.conn_notify.notify_waiters(); result = Err(DeltasError::ConnectionClosed);
714 }
715
716 result
717 });
718
719 self.conn_notify.notified().await;
720
721 if self.is_connected().await {
722 Ok(jh)
723 } else {
724 Err(DeltasError::NotConnected)
725 }
726 }
727
728 #[instrument(skip(self))]
729 async fn close(&self) -> Result<(), DeltasError> {
730 info!("Closing TychoWebsocketClient");
731 let mut guard = self.inner.lock().await;
732 let inner = guard
733 .as_mut()
734 .ok_or_else(|| DeltasError::NotConnected)?;
735 inner
736 .cmd_tx
737 .send(())
738 .await
739 .map_err(|e| DeltasError::TransportError(e.to_string()))?;
740 Ok(())
741 }
742}
743
744#[cfg(test)]
745mod tests {
746 use std::net::SocketAddr;
747
748 use tokio::{net::TcpListener, time::timeout};
749 use tycho_common::dto::Chain;
750
751 use super::*;
752
    /// One step of the scripted exchange played by the mock server.
    #[derive(Clone)]
    enum ExpectedComm {
        /// Wait up to the given number of milliseconds for a message from the client and
        /// assert that it equals the given message.
        Receive(u64, tungstenite::protocol::Message),
        /// Send the given message to the client.
        Send(tungstenite::protocol::Message),
    }
758
759 async fn mock_tycho_ws(
760 messages: &[ExpectedComm],
761 reconnects: usize,
762 ) -> (SocketAddr, JoinHandle<()>) {
763 info!("Starting mock webserver");
764 let server = TcpListener::bind("127.0.0.1:0")
766 .await
767 .expect("localhost bind failed");
768 let addr = server.local_addr().unwrap();
769 let messages = messages.to_vec();
770
771 let jh = tokio::spawn(async move {
772 info!("mock webserver started");
773 for _ in 0..(reconnects + 1) {
774 if let Ok((stream, _)) = server.accept().await {
775 let mut websocket = tokio_tungstenite::accept_async(stream)
776 .await
777 .unwrap();
778
779 info!("Handling messages..");
780 for c in messages.iter().cloned() {
781 match c {
782 ExpectedComm::Receive(t, exp) => {
783 info!("Awaiting message...");
784 let msg = timeout(Duration::from_millis(t), websocket.next())
785 .await
786 .expect("Receive timeout")
787 .expect("Stream exhausted")
788 .expect("Failed to receive message.");
789 info!("Message received");
790 assert_eq!(msg, exp)
791 }
792 ExpectedComm::Send(data) => {
793 info!("Sending message");
794 websocket
795 .send(data)
796 .await
797 .expect("Failed to send message");
798 info!("Message sent");
799 }
800 };
801 }
802 sleep(Duration::from_millis(100)).await;
803 let _ = websocket.close(None).await;
805 }
806 }
807 });
808 (addr, jh)
809 }
810
811 #[tokio::test]
812 async fn test_subscribe_receive() {
813 let exp_comm = [
814 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
815 {
816 "method":"subscribe",
817 "extractor_id":{
818 "chain":"ethereum",
819 "name":"vm:ambient"
820 },
821 "include_state": true
822 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
823 )),
824 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
825 {
826 "method":"newsubscription",
827 "extractor_id":{
828 "chain":"ethereum",
829 "name":"vm:ambient"
830 },
831 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
832 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
833 )),
834 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
835 {
836 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
837 "deltas": {
838 "extractor": "vm:ambient",
839 "chain": "ethereum",
840 "block": {
841 "number": 123,
842 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
843 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
844 "chain": "ethereum",
845 "ts": "2023-09-14T00:00:00"
846 },
847 "finalized_block_height": 0,
848 "revert": false,
849 "new_tokens": {},
850 "account_updates": {
851 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
852 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
853 "chain": "ethereum",
854 "slots": {},
855 "balance": "0x01f4",
856 "code": "",
857 "change": "Update"
858 }
859 },
860 "state_updates": {
861 "component_1": {
862 "component_id": "component_1",
863 "updated_attributes": {"attr1": "0x01"},
864 "deleted_attributes": ["attr2"]
865 }
866 },
867 "new_protocol_components":
868 { "protocol_1": {
869 "id": "protocol_1",
870 "protocol_system": "system_1",
871 "protocol_type_name": "type_1",
872 "chain": "ethereum",
873 "tokens": ["0x01", "0x02"],
874 "contract_ids": ["0x01", "0x02"],
875 "static_attributes": {"attr1": "0x01f4"},
876 "change": "Update",
877 "creation_tx": "0x01",
878 "created_at": "2023-09-14T00:00:00"
879 }
880 },
881 "deleted_protocol_components": {},
882 "component_balances": {
883 "protocol_1":
884 {
885 "0x01": {
886 "token": "0x01",
887 "balance": "0x01f4",
888 "balance_float": 0.0,
889 "modify_tx": "0x01",
890 "component_id": "protocol_1"
891 }
892 }
893 },
894 "account_balances": {
895 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
896 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
897 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
898 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
899 "balance": "0x01f4",
900 "modify_tx": "0x01"
901 }
902 }
903 },
904 "component_tvl": {
905 "protocol_1": 1000.0
906 }
907 }
908 }
909 "#.to_owned()
910 ))
911 ];
912 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
913
914 let client = WsDeltasClient::new(&format!("ws://{}", addr), None).unwrap();
915 let jh = client
916 .connect()
917 .await
918 .expect("connect failed");
919 let (_, mut rx) = timeout(
920 Duration::from_millis(100),
921 client.subscribe(
922 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
923 SubscriptionOptions::new(),
924 ),
925 )
926 .await
927 .expect("subscription timed out")
928 .expect("subscription failed");
929 let _ = timeout(Duration::from_millis(100), rx.recv())
930 .await
931 .expect("awaiting message timeout out")
932 .expect("receiving message failed");
933 timeout(Duration::from_millis(100), client.close())
934 .await
935 .expect("close timed out")
936 .expect("close failed");
937 jh.await
938 .expect("ws loop errored")
939 .unwrap();
940 server_thread.await.unwrap();
941 }
942
943 #[tokio::test]
944 async fn test_unsubscribe() {
945 let exp_comm = [
946 ExpectedComm::Receive(
947 100,
948 tungstenite::protocol::Message::Text(
949 r#"
950 {
951 "method": "subscribe",
952 "extractor_id":{
953 "chain": "ethereum",
954 "name": "vm:ambient"
955 },
956 "include_state": true
957 }"#
958 .to_owned()
959 .replace(|c: char| c.is_whitespace(), ""),
960 ),
961 ),
962 ExpectedComm::Send(tungstenite::protocol::Message::Text(
963 r#"
964 {
965 "method": "newsubscription",
966 "extractor_id":{
967 "chain": "ethereum",
968 "name": "vm:ambient"
969 },
970 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
971 }"#
972 .to_owned()
973 .replace(|c: char| c.is_whitespace(), ""),
974 )),
975 ExpectedComm::Receive(
976 100,
977 tungstenite::protocol::Message::Text(
978 r#"
979 {
980 "method": "unsubscribe",
981 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
982 }
983 "#
984 .to_owned()
985 .replace(|c: char| c.is_whitespace(), ""),
986 ),
987 ),
988 ExpectedComm::Send(tungstenite::protocol::Message::Text(
989 r#"
990 {
991 "method": "subscriptionended",
992 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
993 }
994 "#
995 .to_owned()
996 .replace(|c: char| c.is_whitespace(), ""),
997 )),
998 ];
999 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1000
1001 let client = WsDeltasClient::new(&format!("ws://{}", addr), None).unwrap();
1002 let jh = client
1003 .connect()
1004 .await
1005 .expect("connect failed");
1006 let (sub_id, mut rx) = timeout(
1007 Duration::from_millis(100),
1008 client.subscribe(
1009 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1010 SubscriptionOptions::new(),
1011 ),
1012 )
1013 .await
1014 .expect("subscription timed out")
1015 .expect("subscription failed");
1016
1017 timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1018 .await
1019 .expect("unsubscribe timed out")
1020 .expect("unsubscribe failed");
1021 let res = timeout(Duration::from_millis(100), rx.recv())
1022 .await
1023 .expect("awaiting message timeout out");
1024
1025 assert!(res.is_none());
1027
1028 timeout(Duration::from_millis(100), client.close())
1029 .await
1030 .expect("close timed out")
1031 .expect("close failed");
1032 jh.await
1033 .expect("ws loop errored")
1034 .unwrap();
1035 server_thread.await.unwrap();
1036 }
1037
1038 #[tokio::test]
1039 async fn test_subscription_unexpected_end() {
1040 let exp_comm = [
1041 ExpectedComm::Receive(
1042 100,
1043 tungstenite::protocol::Message::Text(
1044 r#"
1045 {
1046 "method":"subscribe",
1047 "extractor_id":{
1048 "chain":"ethereum",
1049 "name":"vm:ambient"
1050 },
1051 "include_state": true
1052 }"#
1053 .to_owned()
1054 .replace(|c: char| c.is_whitespace(), ""),
1055 ),
1056 ),
1057 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1058 r#"
1059 {
1060 "method":"newsubscription",
1061 "extractor_id":{
1062 "chain":"ethereum",
1063 "name":"vm:ambient"
1064 },
1065 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1066 }"#
1067 .to_owned()
1068 .replace(|c: char| c.is_whitespace(), ""),
1069 )),
1070 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1071 r#"
1072 {
1073 "method": "subscriptionended",
1074 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1075 }"#
1076 .to_owned()
1077 .replace(|c: char| c.is_whitespace(), ""),
1078 )),
1079 ];
1080 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1081
1082 let client = WsDeltasClient::new(&format!("ws://{}", addr), None).unwrap();
1083 let jh = client
1084 .connect()
1085 .await
1086 .expect("connect failed");
1087 let (_, mut rx) = timeout(
1088 Duration::from_millis(100),
1089 client.subscribe(
1090 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1091 SubscriptionOptions::new(),
1092 ),
1093 )
1094 .await
1095 .expect("subscription timed out")
1096 .expect("subscription failed");
1097 let res = timeout(Duration::from_millis(100), rx.recv())
1098 .await
1099 .expect("awaiting message timeout out");
1100
1101 assert!(res.is_none());
1103
1104 timeout(Duration::from_millis(100), client.close())
1105 .await
1106 .expect("close timed out")
1107 .expect("close failed");
1108 jh.await
1109 .expect("ws loop errored")
1110 .unwrap();
1111 server_thread.await.unwrap();
1112 }
1113
1114 #[test_log::test(tokio::test)]
1115 async fn test_reconnect() {
1116 let exp_comm = [
1117 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
1118 {
1119 "method":"subscribe",
1120 "extractor_id":{
1121 "chain":"ethereum",
1122 "name":"vm:ambient"
1123 },
1124 "include_state": true
1125 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1126 )),
1127 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1128 {
1129 "method":"newsubscription",
1130 "extractor_id":{
1131 "chain":"ethereum",
1132 "name":"vm:ambient"
1133 },
1134 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1135 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1136 )),
1137 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1138 {
1139 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1140 "deltas": {
1141 "extractor": "vm:ambient",
1142 "chain": "ethereum",
1143 "block": {
1144 "number": 123,
1145 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1146 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1147 "chain": "ethereum",
1148 "ts": "2023-09-14T00:00:00"
1149 },
1150 "finalized_block_height": 0,
1151 "revert": false,
1152 "new_tokens": {},
1153 "account_updates": {
1154 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1155 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1156 "chain": "ethereum",
1157 "slots": {},
1158 "balance": "0x01f4",
1159 "code": "",
1160 "change": "Update"
1161 }
1162 },
1163 "state_updates": {
1164 "component_1": {
1165 "component_id": "component_1",
1166 "updated_attributes": {"attr1": "0x01"},
1167 "deleted_attributes": ["attr2"]
1168 }
1169 },
1170 "new_protocol_components": {
1171 "protocol_1":
1172 {
1173 "id": "protocol_1",
1174 "protocol_system": "system_1",
1175 "protocol_type_name": "type_1",
1176 "chain": "ethereum",
1177 "tokens": ["0x01", "0x02"],
1178 "contract_ids": ["0x01", "0x02"],
1179 "static_attributes": {"attr1": "0x01f4"},
1180 "change": "Update",
1181 "creation_tx": "0x01",
1182 "created_at": "2023-09-14T00:00:00"
1183 }
1184 },
1185 "deleted_protocol_components": {},
1186 "component_balances": {
1187 "protocol_1": {
1188 "0x01": {
1189 "token": "0x01",
1190 "balance": "0x01f4",
1191 "balance_float": 1000.0,
1192 "modify_tx": "0x01",
1193 "component_id": "protocol_1"
1194 }
1195 }
1196 },
1197 "account_balances": {
1198 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1199 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1200 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1201 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1202 "balance": "0x01f4",
1203 "modify_tx": "0x01"
1204 }
1205 }
1206 },
1207 "component_tvl": {
1208 "protocol_1": 1000.0
1209 }
1210 }
1211 }
1212 "#.to_owned()
1213 ))
1214 ];
1215 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1216 let client = WsDeltasClient::new(&format!("ws://{}", addr), None).unwrap();
1217 let jh: JoinHandle<Result<(), DeltasError>> = client
1218 .connect()
1219 .await
1220 .expect("connect failed");
1221
1222 for _ in 0..2 {
1223 let (_, mut rx) = timeout(
1224 Duration::from_millis(100),
1225 client.subscribe(
1226 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1227 SubscriptionOptions::new(),
1228 ),
1229 )
1230 .await
1231 .expect("subscription timed out")
1232 .expect("subscription failed");
1233
1234 let _ = timeout(Duration::from_millis(100), rx.recv())
1235 .await
1236 .expect("awaiting message timeout out")
1237 .expect("receiving message failed");
1238
1239 let res = timeout(Duration::from_millis(200), rx.recv())
1241 .await
1242 .expect("awaiting closed connection timeout out");
1243 assert!(res.is_none());
1244 }
1245 let res = jh.await.expect("ws client join failed");
1246 assert!(res.is_err());
1248 server_thread
1249 .await
1250 .expect("ws server loop errored");
1251 }
1252
1253 async fn mock_bad_connection_tycho_ws() -> (SocketAddr, JoinHandle<()>) {
1254 let server = TcpListener::bind("127.0.0.1:0")
1255 .await
1256 .expect("localhost bind failed");
1257 let addr = server.local_addr().unwrap();
1258 let jh = tokio::spawn(async move {
1259 while let Ok((stream, _)) = server.accept().await {
1260 drop(stream);
1262 }
1263 });
1264 (addr, jh)
1265 }
1266
1267 #[tokio::test]
1268 async fn test_connect_max_attempts() {
1269 let (addr, _) = mock_bad_connection_tycho_ws().await;
1270 let client =
1271 WsDeltasClient::new_with_reconnects(&format!("ws://{}", addr), 3, None).unwrap();
1272
1273 let join_handle = client.connect().await;
1274
1275 assert!(join_handle.is_err());
1276 assert_eq!(join_handle.unwrap_err().to_string(), DeltasError::NotConnected.to_string());
1277 }
1278}