1use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::Arc,
25 time::Duration,
26};
27
28use async_trait::async_trait;
29use futures03::{stream::SplitSink, SinkExt, StreamExt};
30use hyper::{
31 header::{
32 AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
33 USER_AGENT,
34 },
35 Uri,
36};
37#[cfg(test)]
38use mockall::automock;
39use thiserror::Error;
40use tokio::{
41 net::TcpStream,
42 sync::{
43 mpsc::{self, error::TrySendError, Receiver, Sender},
44 oneshot, Mutex, Notify,
45 },
46 task::JoinHandle,
47 time::sleep,
48};
49use tokio_tungstenite::{
50 connect_async,
51 tungstenite::{
52 self,
53 handshake::client::{generate_key, Request},
54 },
55 MaybeTlsStream, WebSocketStream,
56};
57use tracing::{debug, error, info, instrument, trace, warn};
58use tycho_common::dto::{BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage};
59use uuid::Uuid;
60
61use crate::TYCHO_SERVER_VERSION;
62
63#[derive(Error, Debug)]
64pub enum DeltasError {
65 #[error("Failed to parse URI: {0}. Error: {1}")]
67 UriParsing(String, String),
68
69 #[error("The requested subscription is already pending")]
71 SubscriptionAlreadyPending,
72
73 #[error("{0}")]
76 TransportError(String),
77
78 #[error("The buffer is full!")]
82 BufferFull,
83
84 #[error("The client is not connected!")]
88 NotConnected,
89
90 #[error("The client is already connected!")]
92 AlreadyConnected,
93
94 #[error("The server closed the connection!")]
96 ConnectionClosed,
97
98 #[error("Connection error: {0}")]
100 ConnectionError(#[from] Box<tungstenite::Error>),
101
102 #[error("Tycho FatalError: {0}")]
104 Fatal(String),
105}
106
107#[derive(Clone, Debug)]
108pub struct SubscriptionOptions {
109 include_state: bool,
110}
111
112impl Default for SubscriptionOptions {
113 fn default() -> Self {
114 Self { include_state: true }
115 }
116}
117
118impl SubscriptionOptions {
119 pub fn new() -> Self {
120 Self::default()
121 }
122 pub fn with_state(mut self, val: bool) -> Self {
123 self.include_state = val;
124 self
125 }
126}
127
128#[cfg_attr(test, automock)]
129#[async_trait]
130pub trait DeltasClient {
131 async fn subscribe(
138 &self,
139 extractor_id: ExtractorIdentity,
140 options: SubscriptionOptions,
141 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;
142
143 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
145
146 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
148
149 async fn close(&self) -> Result<(), DeltasError>;
151}
152
153#[derive(Clone)]
154pub struct WsDeltasClient {
155 uri: Uri,
157 auth_key: Option<String>,
159 max_reconnects: u32,
161 ws_buffer_size: usize,
164 subscription_buffer_size: usize,
167 conn_notify: Arc<Notify>,
169 inner: Arc<Mutex<Option<Inner>>>,
171}
172
173type WebSocketSink =
174 SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
175
176#[derive(Debug)]
188enum SubscriptionInfo {
189 RequestedSubscription(oneshot::Sender<(Uuid, Receiver<BlockChanges>)>),
191 Active,
193 RequestedUnsubscription(oneshot::Sender<()>),
195}
196
197struct Inner {
199 sink: WebSocketSink,
201 cmd_tx: Sender<()>,
203 pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
205 subscriptions: HashMap<Uuid, SubscriptionInfo>,
207 sender: HashMap<Uuid, Sender<BlockChanges>>,
210 buffer_size: usize,
212}
213
214impl Inner {
218 fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
219 Self {
220 sink,
221 cmd_tx,
222 pending: HashMap::new(),
223 subscriptions: HashMap::new(),
224 sender: HashMap::new(),
225 buffer_size,
226 }
227 }
228
229 #[allow(clippy::result_large_err)]
231 fn new_subscription(
232 &mut self,
233 id: &ExtractorIdentity,
234 ready_tx: oneshot::Sender<(Uuid, Receiver<BlockChanges>)>,
235 ) -> Result<(), DeltasError> {
236 if self.pending.contains_key(id) {
237 return Err(DeltasError::SubscriptionAlreadyPending);
238 }
239 self.pending
240 .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
241 Ok(())
242 }
243
244 fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
248 if let Some(info) = self.pending.remove(extractor_id) {
249 if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
250 let (tx, rx) = mpsc::channel(self.buffer_size);
251 self.sender.insert(subscription_id, tx);
252 self.subscriptions
253 .insert(subscription_id, SubscriptionInfo::Active);
254 let _ = ready_tx
255 .send((subscription_id, rx))
256 .map_err(|_| {
257 warn!(
258 ?extractor_id,
259 ?subscription_id,
260 "Subscriber for has gone away. Ignoring."
261 )
262 });
263 } else {
264 error!(
265 ?extractor_id,
266 ?subscription_id,
267 "Pending subscription was not in the correct state to
268 transition to active. Ignoring!"
269 )
270 }
271 } else {
272 error!(
273 ?extractor_id,
274 ?subscription_id,
275 "Tried to mark an unkown subscription as active. Ignoring!"
276 );
277 }
278 }
279
280 #[allow(clippy::result_large_err)]
282 fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
283 if let Some(sender) = self.sender.get_mut(id) {
284 sender
285 .try_send(msg)
286 .map_err(|e| match e {
287 TrySendError::Full(_) => DeltasError::BufferFull,
288 TrySendError::Closed(_) => {
289 DeltasError::TransportError("The subscriber has gone away".to_string())
290 }
291 })?;
292 }
293 Ok(())
294 }
295
296 fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
301 if let Some(info) = self
302 .subscriptions
303 .get_mut(subscription_id)
304 {
305 if let SubscriptionInfo::Active = info {
306 *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
307 }
308 } else {
309 debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
311 }
312 }
313
314 fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
321 if let Entry::Occupied(e) = self
322 .subscriptions
323 .entry(subscription_id)
324 {
325 let info = e.remove();
326 if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
327 let _ = tx.send(()).map_err(|_| {
328 warn!(?subscription_id, "failed to notify about removed subscription")
329 });
330 self.sender
331 .remove(&subscription_id)
332 .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
333 } else {
334 warn!(?subscription_id, "Subscription ended unexpectedly!");
335 self.sender
336 .remove(&subscription_id)
337 .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
338 }
339 } else {
340 error!(
341 ?subscription_id,
342 "Received `SubscriptionEnded`, but was never subscribed
343 to it. This is likely a bug!"
344 );
345 }
346
347 Ok(())
348 }
349
350 async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
352 self.sink.send(msg).await.map_err(|e| {
353 DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
354 })
355 }
356}
357
358impl WsDeltasClient {
360 #[allow(clippy::result_large_err)]
362 pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
363 let uri = ws_uri
364 .parse::<Uri>()
365 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
366 Ok(Self {
367 uri,
368 auth_key: auth_key.map(|s| s.to_string()),
369 inner: Arc::new(Mutex::new(None)),
370 ws_buffer_size: 128,
371 subscription_buffer_size: 128,
372 conn_notify: Arc::new(Notify::new()),
373 max_reconnects: 5,
374 })
375 }
376
377 #[allow(clippy::result_large_err)]
379 pub fn new_with_reconnects(
380 ws_uri: &str,
381 max_reconnects: u32,
382 auth_key: Option<&str>,
383 ) -> Result<Self, DeltasError> {
384 let uri = ws_uri
385 .parse::<Uri>()
386 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
387
388 Ok(Self {
389 uri,
390 auth_key: auth_key.map(|s| s.to_string()),
391 inner: Arc::new(Mutex::new(None)),
392 ws_buffer_size: 128,
393 subscription_buffer_size: 128,
394 conn_notify: Arc::new(Notify::new()),
395 max_reconnects,
396 })
397 }
398
399 async fn is_connected(&self) -> bool {
403 let guard = self.inner.as_ref().lock().await;
404 guard.is_some()
405 }
406
407 async fn ensure_connection(&self) {
412 if !self.is_connected().await {
413 self.conn_notify.notified().await;
414 }
415 }
416
417 #[instrument(skip(self, msg))]
422 async fn handle_msg(
423 &self,
424 msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
425 ) -> Result<(), DeltasError> {
426 let mut guard = self.inner.lock().await;
427
428 match msg {
429 Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
434 serde_json::Value,
435 >(&text)
436 {
437 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
438 Ok(ws_message) => match ws_message {
439 WebSocketMessage::BlockChanges { subscription_id, deltas } => {
440 trace!(?deltas, "Received a block state change, sending to channel");
441 let inner = guard
442 .as_mut()
443 .ok_or_else(|| DeltasError::NotConnected)?;
444 match inner.send(&subscription_id, deltas) {
445 Err(DeltasError::BufferFull) => {
446 error!(?subscription_id, "Buffer full, message dropped!");
447 }
448 Err(_) => {
449 warn!(
450 ?subscription_id,
451 "Receiver for has gone away, unsubscribing!"
452 );
453 let (tx, _) = oneshot::channel();
454 let _ = WsDeltasClient::unsubscribe_inner(
455 inner,
456 subscription_id,
457 tx,
458 )
459 .await;
460 }
461 _ => { }
462 }
463 }
464 WebSocketMessage::Response(Response::NewSubscription {
465 extractor_id,
466 subscription_id,
467 }) => {
468 info!(?extractor_id, ?subscription_id, "Received a new subscription");
469 let inner = guard
470 .as_mut()
471 .ok_or_else(|| DeltasError::NotConnected)?;
472 inner.mark_active(&extractor_id, subscription_id);
473 }
474 WebSocketMessage::Response(Response::SubscriptionEnded {
475 subscription_id,
476 }) => {
477 info!(?subscription_id, "Received a subscription ended");
478 let inner = guard
479 .as_mut()
480 .ok_or_else(|| DeltasError::NotConnected)?;
481 inner.remove_subscription(subscription_id)?;
482 }
483 },
484 Err(e) => {
485 error!(error = %e, message=text, "Failed to deserialize WebSocketMessage: message does not match expected structs");
486 }
487 },
488 Err(e) => {
489 error!(error = %e, message=text, "Failed to deserialize message: invalid json");
490 }
491 },
492 Ok(tungstenite::protocol::Message::Ping(_)) => {
493 let inner = guard
495 .as_mut()
496 .ok_or_else(|| DeltasError::NotConnected)?;
497 if let Err(error) = inner
498 .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
499 .await
500 {
501 debug!(?error, "Failed to send pong!");
502 }
503 }
504 Ok(tungstenite::protocol::Message::Pong(_)) => {
505 }
507 Ok(tungstenite::protocol::Message::Close(_)) => {
508 return Err(DeltasError::ConnectionClosed);
509 }
510 Ok(unknown_msg) => {
511 info!("Received an unknown message type: {:?}", unknown_msg);
512 }
513 Err(error) => {
514 error!(?error, "Websocket error");
515 return Err(match error {
516 tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
517 tungstenite::Error::AlreadyClosed => {
518 warn!("Received AlreadyClosed error which is indicative of a bug!");
519 DeltasError::ConnectionError(Box::new(error))
520 }
521 tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
522 DeltasError::ConnectionError(Box::new(error))
523 }
524 _ => DeltasError::Fatal(error.to_string()),
525 });
526 }
527 };
528 Ok(())
529 }
530
531 async fn unsubscribe_inner(
537 inner: &mut Inner,
538 subscription_id: Uuid,
539 ready_tx: oneshot::Sender<()>,
540 ) -> Result<(), DeltasError> {
541 inner.end_subscription(&subscription_id, ready_tx);
542 let cmd = Command::Unsubscribe { subscription_id };
543 inner
544 .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
545 |e| {
546 DeltasError::TransportError(format!(
547 "Failed to serialize unsubscribe command: {e}"
548 ))
549 },
550 )?))
551 .await?;
552 Ok(())
553 }
554}
555
556#[async_trait]
557impl DeltasClient for WsDeltasClient {
558 #[instrument(skip(self))]
559 async fn subscribe(
560 &self,
561 extractor_id: ExtractorIdentity,
562 options: SubscriptionOptions,
563 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
564 trace!("Starting subscribe");
565 self.ensure_connection().await;
566 let (ready_tx, ready_rx) = oneshot::channel();
567 {
568 let mut guard = self.inner.lock().await;
569 let inner = guard
570 .as_mut()
571 .ok_or_else(|| DeltasError::NotConnected)?;
572 trace!("Sending subscribe command");
573 inner.new_subscription(&extractor_id, ready_tx)?;
574 let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
575 inner
576 .ws_send(tungstenite::protocol::Message::Text(
577 serde_json::to_string(&cmd).map_err(|e| {
578 DeltasError::TransportError(format!(
579 "Failed to serialize subscribe command: {e}"
580 ))
581 })?,
582 ))
583 .await?;
584 }
585 trace!("Waiting for subscription response");
586 let rx = ready_rx.await.map_err(|_| {
587 DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
588 })?;
589 trace!("Subscription successful");
590 Ok(rx)
591 }
592
593 #[instrument(skip(self))]
594 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
595 self.ensure_connection().await;
596 let (ready_tx, ready_rx) = oneshot::channel();
597 {
598 let mut guard = self.inner.lock().await;
599 let inner = guard
600 .as_mut()
601 .ok_or_else(|| DeltasError::NotConnected)?;
602
603 WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
604 }
605 ready_rx.await.map_err(|_| {
606 DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
607 })?;
608
609 Ok(())
610 }
611
612 #[instrument(skip(self))]
613 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
614 if self.is_connected().await {
615 return Err(DeltasError::AlreadyConnected);
616 }
617 let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
618 info!(?ws_uri, "Starting TychoWebsocketClient");
619
620 let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
621 {
622 let mut guard = self.inner.as_ref().lock().await;
623 *guard = None;
624 }
625 let this = self.clone();
626 let jh = tokio::spawn(async move {
627 let mut retry_count = 0;
628 let mut result = Err(DeltasError::NotConnected);
629
630 'retry: while retry_count < this.max_reconnects {
631 info!(?ws_uri, "Connecting to WebSocket server");
632
633 let mut request_builder = Request::builder()
635 .uri(&ws_uri)
636 .header(SEC_WEBSOCKET_KEY, generate_key())
637 .header(SEC_WEBSOCKET_VERSION, 13)
638 .header(CONNECTION, "Upgrade")
639 .header(UPGRADE, "websocket")
640 .header(
641 HOST,
642 this.uri.host().ok_or_else(|| {
643 DeltasError::UriParsing(
644 ws_uri.clone(),
645 "No host found in tycho url".to_string(),
646 )
647 })?,
648 )
649 .header(
650 USER_AGENT,
651 format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
652 );
653
654 if let Some(ref key) = this.auth_key {
656 request_builder = request_builder.header(AUTHORIZATION, key);
657 }
658
659 let request = request_builder.body(()).map_err(|e| {
660 DeltasError::TransportError(format!("Failed to build connection request: {e}"))
661 })?;
662 let (conn, _) = match connect_async(request).await {
663 Ok(conn) => conn,
664 Err(e) => {
665 retry_count += 1;
667 let mut guard = this.inner.as_ref().lock().await;
668 *guard = None;
669
670 warn!(
671 e = e.to_string(),
672 "Failed to connect to WebSocket server; Reconnecting"
673 );
674 sleep(Duration::from_millis(500)).await;
675
676 continue 'retry;
677 }
678 };
679
680 let (ws_tx_new, ws_rx_new) = conn.split();
681 {
682 let mut guard = this.inner.as_ref().lock().await;
683 *guard =
684 Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
685 }
686 let mut msg_rx = ws_rx_new.boxed();
687
688 info!("Connection Successful: TychoWebsocketClient started");
689 this.conn_notify.notify_waiters();
690 result = Ok(());
691
692 loop {
693 let res = tokio::select! {
694 msg = msg_rx.next() => match msg {
695 Some(msg) => this.handle_msg(msg).await,
696 None => { break 'retry } },
698 _ = cmd_rx.recv() => {break 'retry},
699 };
700 if let Err(error) = res {
701 if matches!(
702 error,
703 DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
704 ) {
705 retry_count += 1;
707 let mut guard = this.inner.as_ref().lock().await;
708 *guard = None;
709
710 warn!(
711 ?error,
712 ?retry_count,
713 "Connection dropped unexpectedly; Reconnecting..."
714 );
715 break;
716 } else {
717 error!(?error, "Fatal error; Exiting");
719 result = Err(error);
720 break 'retry;
721 }
722 }
723 }
724 }
725
726 let mut guard = this.inner.as_ref().lock().await;
728 *guard = None;
729
730 if retry_count >= this.max_reconnects {
732 error!("Max reconnection attempts reached; Exiting");
733 this.conn_notify.notify_waiters(); result = Err(DeltasError::ConnectionClosed);
735 }
736
737 result
738 });
739
740 self.conn_notify.notified().await;
741
742 if self.is_connected().await {
743 Ok(jh)
744 } else {
745 Err(DeltasError::NotConnected)
746 }
747 }
748
749 #[instrument(skip(self))]
750 async fn close(&self) -> Result<(), DeltasError> {
751 info!("Closing TychoWebsocketClient");
752 let mut guard = self.inner.lock().await;
753 let inner = guard
754 .as_mut()
755 .ok_or_else(|| DeltasError::NotConnected)?;
756 inner
757 .cmd_tx
758 .send(())
759 .await
760 .map_err(|e| DeltasError::TransportError(e.to_string()))?;
761 Ok(())
762 }
763}
764
765#[cfg(test)]
766mod tests {
767 use std::net::SocketAddr;
768
769 use tokio::{net::TcpListener, time::timeout};
770 use tycho_common::dto::Chain;
771
772 use super::*;
773
774 #[derive(Clone)]
775 enum ExpectedComm {
776 Receive(u64, tungstenite::protocol::Message),
777 Send(tungstenite::protocol::Message),
778 }
779
780 async fn mock_tycho_ws(
781 messages: &[ExpectedComm],
782 reconnects: usize,
783 ) -> (SocketAddr, JoinHandle<()>) {
784 info!("Starting mock webserver");
785 let server = TcpListener::bind("127.0.0.1:0")
787 .await
788 .expect("localhost bind failed");
789 let addr = server.local_addr().unwrap();
790 let messages = messages.to_vec();
791
792 let jh = tokio::spawn(async move {
793 info!("mock webserver started");
794 for _ in 0..(reconnects + 1) {
795 if let Ok((stream, _)) = server.accept().await {
796 let mut websocket = tokio_tungstenite::accept_async(stream)
797 .await
798 .unwrap();
799
800 info!("Handling messages..");
801 for c in messages.iter().cloned() {
802 match c {
803 ExpectedComm::Receive(t, exp) => {
804 info!("Awaiting message...");
805 let msg = timeout(Duration::from_millis(t), websocket.next())
806 .await
807 .expect("Receive timeout")
808 .expect("Stream exhausted")
809 .expect("Failed to receive message.");
810 info!("Message received");
811 assert_eq!(msg, exp)
812 }
813 ExpectedComm::Send(data) => {
814 info!("Sending message");
815 websocket
816 .send(data)
817 .await
818 .expect("Failed to send message");
819 info!("Message sent");
820 }
821 };
822 }
823 sleep(Duration::from_millis(100)).await;
824 let _ = websocket.close(None).await;
826 }
827 }
828 });
829 (addr, jh)
830 }
831
832 #[tokio::test]
833 async fn test_subscribe_receive() {
834 let exp_comm = [
835 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
836 {
837 "method":"subscribe",
838 "extractor_id":{
839 "chain":"ethereum",
840 "name":"vm:ambient"
841 },
842 "include_state": true
843 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
844 )),
845 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
846 {
847 "method":"newsubscription",
848 "extractor_id":{
849 "chain":"ethereum",
850 "name":"vm:ambient"
851 },
852 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
853 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
854 )),
855 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
856 {
857 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
858 "deltas": {
859 "extractor": "vm:ambient",
860 "chain": "ethereum",
861 "block": {
862 "number": 123,
863 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
864 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
865 "chain": "ethereum",
866 "ts": "2023-09-14T00:00:00"
867 },
868 "finalized_block_height": 0,
869 "revert": false,
870 "new_tokens": {},
871 "account_updates": {
872 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
873 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
874 "chain": "ethereum",
875 "slots": {},
876 "balance": "0x01f4",
877 "code": "",
878 "change": "Update"
879 }
880 },
881 "state_updates": {
882 "component_1": {
883 "component_id": "component_1",
884 "updated_attributes": {"attr1": "0x01"},
885 "deleted_attributes": ["attr2"]
886 }
887 },
888 "new_protocol_components":
889 { "protocol_1": {
890 "id": "protocol_1",
891 "protocol_system": "system_1",
892 "protocol_type_name": "type_1",
893 "chain": "ethereum",
894 "tokens": ["0x01", "0x02"],
895 "contract_ids": ["0x01", "0x02"],
896 "static_attributes": {"attr1": "0x01f4"},
897 "change": "Update",
898 "creation_tx": "0x01",
899 "created_at": "2023-09-14T00:00:00"
900 }
901 },
902 "deleted_protocol_components": {},
903 "component_balances": {
904 "protocol_1":
905 {
906 "0x01": {
907 "token": "0x01",
908 "balance": "0x01f4",
909 "balance_float": 0.0,
910 "modify_tx": "0x01",
911 "component_id": "protocol_1"
912 }
913 }
914 },
915 "account_balances": {
916 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
917 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
918 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
919 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
920 "balance": "0x01f4",
921 "modify_tx": "0x01"
922 }
923 }
924 },
925 "component_tvl": {
926 "protocol_1": 1000.0
927 }
928 }
929 }
930 "#.to_owned()
931 ))
932 ];
933 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
934
935 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
936 let jh = client
937 .connect()
938 .await
939 .expect("connect failed");
940 let (_, mut rx) = timeout(
941 Duration::from_millis(100),
942 client.subscribe(
943 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
944 SubscriptionOptions::new(),
945 ),
946 )
947 .await
948 .expect("subscription timed out")
949 .expect("subscription failed");
950 let _ = timeout(Duration::from_millis(100), rx.recv())
951 .await
952 .expect("awaiting message timeout out")
953 .expect("receiving message failed");
954 timeout(Duration::from_millis(100), client.close())
955 .await
956 .expect("close timed out")
957 .expect("close failed");
958 jh.await
959 .expect("ws loop errored")
960 .unwrap();
961 server_thread.await.unwrap();
962 }
963
964 #[tokio::test]
965 async fn test_unsubscribe() {
966 let exp_comm = [
967 ExpectedComm::Receive(
968 100,
969 tungstenite::protocol::Message::Text(
970 r#"
971 {
972 "method": "subscribe",
973 "extractor_id":{
974 "chain": "ethereum",
975 "name": "vm:ambient"
976 },
977 "include_state": true
978 }"#
979 .to_owned()
980 .replace(|c: char| c.is_whitespace(), ""),
981 ),
982 ),
983 ExpectedComm::Send(tungstenite::protocol::Message::Text(
984 r#"
985 {
986 "method": "newsubscription",
987 "extractor_id":{
988 "chain": "ethereum",
989 "name": "vm:ambient"
990 },
991 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
992 }"#
993 .to_owned()
994 .replace(|c: char| c.is_whitespace(), ""),
995 )),
996 ExpectedComm::Receive(
997 100,
998 tungstenite::protocol::Message::Text(
999 r#"
1000 {
1001 "method": "unsubscribe",
1002 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1003 }
1004 "#
1005 .to_owned()
1006 .replace(|c: char| c.is_whitespace(), ""),
1007 ),
1008 ),
1009 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1010 r#"
1011 {
1012 "method": "subscriptionended",
1013 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1014 }
1015 "#
1016 .to_owned()
1017 .replace(|c: char| c.is_whitespace(), ""),
1018 )),
1019 ];
1020 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1021
1022 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1023 let jh = client
1024 .connect()
1025 .await
1026 .expect("connect failed");
1027 let (sub_id, mut rx) = timeout(
1028 Duration::from_millis(100),
1029 client.subscribe(
1030 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1031 SubscriptionOptions::new(),
1032 ),
1033 )
1034 .await
1035 .expect("subscription timed out")
1036 .expect("subscription failed");
1037
1038 timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1039 .await
1040 .expect("unsubscribe timed out")
1041 .expect("unsubscribe failed");
1042 let res = timeout(Duration::from_millis(100), rx.recv())
1043 .await
1044 .expect("awaiting message timeout out");
1045
1046 assert!(res.is_none());
1048
1049 timeout(Duration::from_millis(100), client.close())
1050 .await
1051 .expect("close timed out")
1052 .expect("close failed");
1053 jh.await
1054 .expect("ws loop errored")
1055 .unwrap();
1056 server_thread.await.unwrap();
1057 }
1058
1059 #[tokio::test]
1060 async fn test_subscription_unexpected_end() {
1061 let exp_comm = [
1062 ExpectedComm::Receive(
1063 100,
1064 tungstenite::protocol::Message::Text(
1065 r#"
1066 {
1067 "method":"subscribe",
1068 "extractor_id":{
1069 "chain":"ethereum",
1070 "name":"vm:ambient"
1071 },
1072 "include_state": true
1073 }"#
1074 .to_owned()
1075 .replace(|c: char| c.is_whitespace(), ""),
1076 ),
1077 ),
1078 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1079 r#"
1080 {
1081 "method":"newsubscription",
1082 "extractor_id":{
1083 "chain":"ethereum",
1084 "name":"vm:ambient"
1085 },
1086 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1087 }"#
1088 .to_owned()
1089 .replace(|c: char| c.is_whitespace(), ""),
1090 )),
1091 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1092 r#"
1093 {
1094 "method": "subscriptionended",
1095 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1096 }"#
1097 .to_owned()
1098 .replace(|c: char| c.is_whitespace(), ""),
1099 )),
1100 ];
1101 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1102
1103 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1104 let jh = client
1105 .connect()
1106 .await
1107 .expect("connect failed");
1108 let (_, mut rx) = timeout(
1109 Duration::from_millis(100),
1110 client.subscribe(
1111 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1112 SubscriptionOptions::new(),
1113 ),
1114 )
1115 .await
1116 .expect("subscription timed out")
1117 .expect("subscription failed");
1118 let res = timeout(Duration::from_millis(100), rx.recv())
1119 .await
1120 .expect("awaiting message timeout out");
1121
1122 assert!(res.is_none());
1124
1125 timeout(Duration::from_millis(100), client.close())
1126 .await
1127 .expect("close timed out")
1128 .expect("close failed");
1129 jh.await
1130 .expect("ws loop errored")
1131 .unwrap();
1132 server_thread.await.unwrap();
1133 }
1134
1135 #[test_log::test(tokio::test)]
1136 async fn test_reconnect() {
1137 let exp_comm = [
1138 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
1139 {
1140 "method":"subscribe",
1141 "extractor_id":{
1142 "chain":"ethereum",
1143 "name":"vm:ambient"
1144 },
1145 "include_state": true
1146 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1147 )),
1148 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1149 {
1150 "method":"newsubscription",
1151 "extractor_id":{
1152 "chain":"ethereum",
1153 "name":"vm:ambient"
1154 },
1155 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1156 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1157 )),
1158 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1159 {
1160 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1161 "deltas": {
1162 "extractor": "vm:ambient",
1163 "chain": "ethereum",
1164 "block": {
1165 "number": 123,
1166 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1167 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1168 "chain": "ethereum",
1169 "ts": "2023-09-14T00:00:00"
1170 },
1171 "finalized_block_height": 0,
1172 "revert": false,
1173 "new_tokens": {},
1174 "account_updates": {
1175 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1176 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1177 "chain": "ethereum",
1178 "slots": {},
1179 "balance": "0x01f4",
1180 "code": "",
1181 "change": "Update"
1182 }
1183 },
1184 "state_updates": {
1185 "component_1": {
1186 "component_id": "component_1",
1187 "updated_attributes": {"attr1": "0x01"},
1188 "deleted_attributes": ["attr2"]
1189 }
1190 },
1191 "new_protocol_components": {
1192 "protocol_1":
1193 {
1194 "id": "protocol_1",
1195 "protocol_system": "system_1",
1196 "protocol_type_name": "type_1",
1197 "chain": "ethereum",
1198 "tokens": ["0x01", "0x02"],
1199 "contract_ids": ["0x01", "0x02"],
1200 "static_attributes": {"attr1": "0x01f4"},
1201 "change": "Update",
1202 "creation_tx": "0x01",
1203 "created_at": "2023-09-14T00:00:00"
1204 }
1205 },
1206 "deleted_protocol_components": {},
1207 "component_balances": {
1208 "protocol_1": {
1209 "0x01": {
1210 "token": "0x01",
1211 "balance": "0x01f4",
1212 "balance_float": 1000.0,
1213 "modify_tx": "0x01",
1214 "component_id": "protocol_1"
1215 }
1216 }
1217 },
1218 "account_balances": {
1219 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1220 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1221 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1222 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1223 "balance": "0x01f4",
1224 "modify_tx": "0x01"
1225 }
1226 }
1227 },
1228 "component_tvl": {
1229 "protocol_1": 1000.0
1230 }
1231 }
1232 }
1233 "#.to_owned()
1234 ))
1235 ];
1236 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1237 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1238 let jh: JoinHandle<Result<(), DeltasError>> = client
1239 .connect()
1240 .await
1241 .expect("connect failed");
1242
1243 for _ in 0..2 {
1244 let (_, mut rx) = timeout(
1245 Duration::from_millis(100),
1246 client.subscribe(
1247 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1248 SubscriptionOptions::new(),
1249 ),
1250 )
1251 .await
1252 .expect("subscription timed out")
1253 .expect("subscription failed");
1254
1255 let _ = timeout(Duration::from_millis(100), rx.recv())
1256 .await
1257 .expect("awaiting message timeout out")
1258 .expect("receiving message failed");
1259
1260 let res = timeout(Duration::from_millis(200), rx.recv())
1262 .await
1263 .expect("awaiting closed connection timeout out");
1264 assert!(res.is_none());
1265 }
1266 let res = jh.await.expect("ws client join failed");
1267 assert!(res.is_err());
1269 server_thread
1270 .await
1271 .expect("ws server loop errored");
1272 }
1273
1274 async fn mock_bad_connection_tycho_ws() -> (SocketAddr, JoinHandle<()>) {
1275 let server = TcpListener::bind("127.0.0.1:0")
1276 .await
1277 .expect("localhost bind failed");
1278 let addr = server.local_addr().unwrap();
1279 let jh = tokio::spawn(async move {
1280 while let Ok((stream, _)) = server.accept().await {
1281 drop(stream);
1283 }
1284 });
1285 (addr, jh)
1286 }
1287
1288 #[tokio::test]
1289 async fn test_connect_max_attempts() {
1290 let (addr, _) = mock_bad_connection_tycho_ws().await;
1291 let client = WsDeltasClient::new_with_reconnects(&format!("ws://{addr}"), 3, None).unwrap();
1292
1293 let join_handle = client.connect().await;
1294
1295 assert!(join_handle.is_err());
1296 assert_eq!(join_handle.unwrap_err().to_string(), DeltasError::NotConnected.to_string());
1297 }
1298}