1use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::{
25 atomic::{AtomicBool, Ordering},
26 Arc,
27 },
28 time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34 header::{
35 AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36 USER_AGENT,
37 },
38 Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44 net::TcpStream,
45 sync::{
46 mpsc::{self, error::TrySendError, Receiver, Sender},
47 oneshot, Mutex, Notify,
48 },
49 task::JoinHandle,
50 time::sleep,
51};
52use tokio_tungstenite::{
53 connect_async,
54 tungstenite::{
55 self,
56 handshake::client::{generate_key, Request},
57 },
58 MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{
62 BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage, WebsocketError,
63};
64use uuid::Uuid;
65
66use crate::TYCHO_SERVER_VERSION;
67
68#[derive(Error, Debug)]
69pub enum DeltasError {
70 #[error("Failed to parse URI: {0}. Error: {1}")]
72 UriParsing(String, String),
73
74 #[error("The requested subscription is already pending")]
76 SubscriptionAlreadyPending,
77
78 #[error("The server replied with an error: {0}")]
79 ServerError(String, #[source] WebsocketError),
80
81 #[error("{0}")]
84 TransportError(String),
85
86 #[error("The buffer is full!")]
90 BufferFull,
91
92 #[error("The client is not connected!")]
96 NotConnected,
97
98 #[error("The client is already connected!")]
100 AlreadyConnected,
101
102 #[error("The server closed the connection!")]
104 ConnectionClosed,
105
106 #[error("Connection error: {0}")]
108 ConnectionError(#[from] Box<tungstenite::Error>),
109
110 #[error("Tycho FatalError: {0}")]
112 Fatal(String),
113}
114
115#[derive(Clone, Debug)]
116pub struct SubscriptionOptions {
117 include_state: bool,
118}
119
120impl Default for SubscriptionOptions {
121 fn default() -> Self {
122 Self { include_state: true }
123 }
124}
125
126impl SubscriptionOptions {
127 pub fn new() -> Self {
128 Self::default()
129 }
130 pub fn with_state(mut self, val: bool) -> Self {
131 self.include_state = val;
132 self
133 }
134}
135
136#[cfg_attr(test, automock)]
137#[async_trait]
138pub trait DeltasClient {
139 async fn subscribe(
146 &self,
147 extractor_id: ExtractorIdentity,
148 options: SubscriptionOptions,
149 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;
150
151 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
153
154 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
156
157 async fn close(&self) -> Result<(), DeltasError>;
159}
160
161#[derive(Clone)]
162pub struct WsDeltasClient {
163 uri: Uri,
165 auth_key: Option<String>,
167 max_reconnects: u64,
169 retry_cooldown: Duration,
171 ws_buffer_size: usize,
174 subscription_buffer_size: usize,
177 conn_notify: Arc<Notify>,
179 inner: Arc<Mutex<Option<Inner>>>,
181 dead: Arc<AtomicBool>,
183}
184
185type WebSocketSink =
186 SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
187
188#[derive(Debug)]
200enum SubscriptionInfo {
201 RequestedSubscription(oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>),
203 Active,
205 RequestedUnsubscription(oneshot::Sender<()>),
207}
208
209struct Inner {
211 sink: WebSocketSink,
213 cmd_tx: Sender<()>,
215 pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
217 subscriptions: HashMap<Uuid, SubscriptionInfo>,
219 sender: HashMap<Uuid, Sender<BlockChanges>>,
222 buffer_size: usize,
224}
225
226impl Inner {
230 fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
231 Self {
232 sink,
233 cmd_tx,
234 pending: HashMap::new(),
235 subscriptions: HashMap::new(),
236 sender: HashMap::new(),
237 buffer_size,
238 }
239 }
240
241 #[allow(clippy::result_large_err)]
243 fn new_subscription(
244 &mut self,
245 id: &ExtractorIdentity,
246 ready_tx: oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>,
247 ) -> Result<(), DeltasError> {
248 if self.pending.contains_key(id) {
249 return Err(DeltasError::SubscriptionAlreadyPending);
250 }
251 self.pending
252 .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
253 Ok(())
254 }
255
256 fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
260 if let Some(info) = self.pending.remove(extractor_id) {
261 if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
262 let (tx, rx) = mpsc::channel(self.buffer_size);
263 self.sender.insert(subscription_id, tx);
264 self.subscriptions
265 .insert(subscription_id, SubscriptionInfo::Active);
266 let _ = ready_tx
267 .send(Ok((subscription_id, rx)))
268 .map_err(|_| {
269 warn!(
270 ?extractor_id,
271 ?subscription_id,
272 "Subscriber for has gone away. Ignoring."
273 )
274 });
275 } else {
276 error!(
277 ?extractor_id,
278 ?subscription_id,
279 "Pending subscription was not in the correct state to
280 transition to active. Ignoring!"
281 )
282 }
283 } else {
284 error!(
285 ?extractor_id,
286 ?subscription_id,
287 "Tried to mark an unknown subscription as active. Ignoring!"
288 );
289 }
290 }
291
292 #[allow(clippy::result_large_err)]
294 fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
295 if let Some(sender) = self.sender.get_mut(id) {
296 sender
297 .try_send(msg)
298 .map_err(|e| match e {
299 TrySendError::Full(_) => DeltasError::BufferFull,
300 TrySendError::Closed(_) => {
301 DeltasError::TransportError("The subscriber has gone away".to_string())
302 }
303 })?;
304 }
305 Ok(())
306 }
307
308 fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
313 if let Some(info) = self
314 .subscriptions
315 .get_mut(subscription_id)
316 {
317 if let SubscriptionInfo::Active = info {
318 *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
319 }
320 } else {
321 debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
323 }
324 }
325
326 fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
333 if let Entry::Occupied(e) = self
334 .subscriptions
335 .entry(subscription_id)
336 {
337 let info = e.remove();
338 if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
339 let _ = tx.send(()).map_err(|_| {
340 debug!(?subscription_id, "failed to notify about removed subscription")
341 });
342 self.sender
343 .remove(&subscription_id)
344 .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
345 } else {
346 warn!(?subscription_id, "Subscription ended unexpectedly!");
347 self.sender
348 .remove(&subscription_id)
349 .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
350 }
351 } else {
352 trace!(
357 ?subscription_id,
358 "Received `SubscriptionEnded`, but was never subscribed to it. This is likely a bug!"
359 );
360 }
361
362 Ok(())
363 }
364
365 fn cancel_pending(&mut self, extractor_id: &ExtractorIdentity, error: &WebsocketError) {
366 if let Some(sub_info) = self.pending.remove(extractor_id) {
367 match sub_info {
368 SubscriptionInfo::RequestedSubscription(tx) => {
369 let _ = tx
370 .send(Err(DeltasError::ServerError(
371 format!("Subscription failed: {error}"),
372 error.clone(),
373 )))
374 .map_err(|_| debug!("Cancel pending failed: receiver deallocated!"));
375 }
376 _ => {
377 error!(?extractor_id, "Pending subscription in wrong state")
378 }
379 }
380 } else {
381 debug!(?extractor_id, "Tried cancel on non-existent pending subscription!")
382 }
383 }
384
385 async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
387 self.sink.send(msg).await.map_err(|e| {
388 DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
389 })
390 }
391}
392
393impl WsDeltasClient {
395 #[allow(clippy::result_large_err)]
397 pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
398 let uri = ws_uri
399 .parse::<Uri>()
400 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
401 Ok(Self {
402 uri,
403 auth_key: auth_key.map(|s| s.to_string()),
404 inner: Arc::new(Mutex::new(None)),
405 ws_buffer_size: 128,
406 subscription_buffer_size: 128,
407 conn_notify: Arc::new(Notify::new()),
408 max_reconnects: 5,
409 retry_cooldown: Duration::from_millis(500),
410 dead: Arc::new(AtomicBool::new(false)),
411 })
412 }
413
414 #[allow(clippy::result_large_err)]
416 pub fn new_with_reconnects(
417 ws_uri: &str,
418 auth_key: Option<&str>,
419 max_reconnects: u64,
420 retry_cooldown: Duration,
421 ) -> Result<Self, DeltasError> {
422 let uri = ws_uri
423 .parse::<Uri>()
424 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
425
426 Ok(Self {
427 uri,
428 auth_key: auth_key.map(|s| s.to_string()),
429 inner: Arc::new(Mutex::new(None)),
430 ws_buffer_size: 128,
431 subscription_buffer_size: 128,
432 conn_notify: Arc::new(Notify::new()),
433 max_reconnects,
434 retry_cooldown,
435 dead: Arc::new(AtomicBool::new(false)),
436 })
437 }
438
439 #[cfg(test)]
441 #[allow(clippy::result_large_err)]
442 pub fn new_with_custom_buffers(
443 ws_uri: &str,
444 auth_key: Option<&str>,
445 ws_buffer_size: usize,
446 subscription_buffer_size: usize,
447 ) -> Result<Self, DeltasError> {
448 let uri = ws_uri
449 .parse::<Uri>()
450 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
451 Ok(Self {
452 uri,
453 auth_key: auth_key.map(|s| s.to_string()),
454 inner: Arc::new(Mutex::new(None)),
455 ws_buffer_size,
456 subscription_buffer_size,
457 conn_notify: Arc::new(Notify::new()),
458 max_reconnects: 5,
459 retry_cooldown: Duration::from_millis(0),
460 dead: Arc::new(AtomicBool::new(false)),
461 })
462 }
463
464 async fn is_connected(&self) -> bool {
468 let guard = self.inner.as_ref().lock().await;
469 guard.is_some()
470 }
471
472 async fn ensure_connection(&self) -> Result<(), DeltasError> {
477 if self.dead.load(Ordering::SeqCst) {
478 return Err(DeltasError::NotConnected)
479 };
480 if !self.is_connected().await {
481 self.conn_notify.notified().await;
482 };
483 Ok(())
484 }
485
486 #[instrument(skip(self, msg))]
491 async fn handle_msg(
492 &self,
493 msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
494 ) -> Result<(), DeltasError> {
495 let mut guard = self.inner.lock().await;
496
497 match msg {
498 Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
503 serde_json::Value,
504 >(&text)
505 {
506 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
507 Ok(ws_message) => match ws_message {
508 WebSocketMessage::BlockChanges { subscription_id, deltas } => {
509 trace!(?deltas, "Received a block state change, sending to channel");
510 let inner = guard
511 .as_mut()
512 .ok_or_else(|| DeltasError::NotConnected)?;
513 match inner.send(&subscription_id, deltas) {
514 Err(DeltasError::BufferFull) => {
515 error!(?subscription_id, "Buffer full, unsubscribing!");
516 Self::force_unsubscribe(subscription_id, inner).await;
517 }
518 Err(_) => {
519 warn!(
520 ?subscription_id,
521 "Receiver for has gone away, unsubscribing!"
522 );
523 Self::force_unsubscribe(subscription_id, inner).await;
524 }
525 _ => { }
526 }
527 }
528 WebSocketMessage::Response(Response::NewSubscription {
529 extractor_id,
530 subscription_id,
531 }) => {
532 info!(?extractor_id, ?subscription_id, "Received a new subscription");
533 let inner = guard
534 .as_mut()
535 .ok_or_else(|| DeltasError::NotConnected)?;
536 inner.mark_active(&extractor_id, subscription_id);
537 }
538 WebSocketMessage::Response(Response::SubscriptionEnded {
539 subscription_id,
540 }) => {
541 info!(?subscription_id, "Received a subscription ended");
542 let inner = guard
543 .as_mut()
544 .ok_or_else(|| DeltasError::NotConnected)?;
545 inner.remove_subscription(subscription_id)?;
546 }
547 WebSocketMessage::Response(Response::Error(error)) => match &error {
548 WebsocketError::ExtractorNotFound(extractor_id) => {
549 let inner = guard
550 .as_mut()
551 .ok_or_else(|| DeltasError::NotConnected)?;
552 inner.cancel_pending(extractor_id, &error);
553 }
554 WebsocketError::SubscriptionNotFound(subscription_id) => {
555 debug!("Received subscription not found, removing subscription");
556 let inner = guard
557 .as_mut()
558 .ok_or_else(|| DeltasError::NotConnected)?;
559 inner.remove_subscription(*subscription_id)?;
560 }
561 WebsocketError::ParseError(raw, e) => {
562 return Err(DeltasError::ServerError(
563 format!(
564 "Server failed to parse client message: {e}, msg: {raw}"
565 ),
566 error.clone(),
567 ))
568 }
569 WebsocketError::CompressionError(subscription_id, e) => {
570 return Err(DeltasError::ServerError(
571 format!(
572 "Server failed to compress message for subscription: {subscription_id}, error: {e}"
573 ),
574 error.clone(),
575 ))
576 }
577 WebsocketError::SubscribeError(extractor_id) => {
578 let inner = guard
579 .as_mut()
580 .ok_or_else(|| DeltasError::NotConnected)?;
581 inner.cancel_pending(extractor_id, &error);
582 }
583 },
584 },
585 Err(e) => {
586 error!(
587 "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
588 e, text
589 );
590 }
591 },
592 Err(e) => {
593 error!(
594 "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
595 e, text
596 );
597 }
598 },
599 Ok(tungstenite::protocol::Message::Ping(_)) => {
600 let inner = guard
602 .as_mut()
603 .ok_or_else(|| DeltasError::NotConnected)?;
604 if let Err(error) = inner
605 .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
606 .await
607 {
608 debug!(?error, "Failed to send pong!");
609 }
610 }
611 Ok(tungstenite::protocol::Message::Pong(_)) => {
612 }
614 Ok(tungstenite::protocol::Message::Close(_)) => {
615 return Err(DeltasError::ConnectionClosed);
616 }
617 Ok(unknown_msg) => {
618 info!("Received an unknown message type: {:?}", unknown_msg);
619 }
620 Err(error) => {
621 error!(?error, "Websocket error");
622 return Err(match error {
623 tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
624 tungstenite::Error::AlreadyClosed => {
625 warn!("Received AlreadyClosed error which is indicative of a bug!");
626 DeltasError::ConnectionError(Box::new(error))
627 }
628 tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
629 DeltasError::ConnectionError(Box::new(error))
630 }
631 _ => DeltasError::Fatal(error.to_string()),
632 });
633 }
634 };
635 Ok(())
636 }
637
638 async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
643 if let Some(SubscriptionInfo::RequestedUnsubscription(_)) = inner
645 .subscriptions
646 .get(&subscription_id)
647 {
648 return
649 }
650
651 let (tx, rx) = oneshot::channel();
652 if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
653 warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
654 } else {
655 match tokio::time::timeout(Duration::from_secs(5), rx).await {
657 Ok(_) => {
658 debug!(?subscription_id, "Unsubscribe completed successfully");
659 }
660 Err(_) => {
661 warn!(?subscription_id, "Unsubscribe completion timed out");
662 }
663 }
664 }
665 }
666
667 async fn unsubscribe_inner(
673 inner: &mut Inner,
674 subscription_id: Uuid,
675 ready_tx: oneshot::Sender<()>,
676 ) -> Result<(), DeltasError> {
677 debug!(?subscription_id, "Unsubscribing");
678 inner.end_subscription(&subscription_id, ready_tx);
679 let cmd = Command::Unsubscribe { subscription_id };
680 inner
681 .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
682 |e| {
683 DeltasError::TransportError(format!(
684 "Failed to serialize unsubscribe command: {e}"
685 ))
686 },
687 )?))
688 .await?;
689 Ok(())
690 }
691}
692
693#[async_trait]
694impl DeltasClient for WsDeltasClient {
695 #[instrument(skip(self))]
696 async fn subscribe(
697 &self,
698 extractor_id: ExtractorIdentity,
699 options: SubscriptionOptions,
700 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
701 trace!("Starting subscribe");
702 self.ensure_connection().await?;
703 let (ready_tx, ready_rx) = oneshot::channel();
704 {
705 let mut guard = self.inner.lock().await;
706 let inner = guard
707 .as_mut()
708 .ok_or_else(|| DeltasError::NotConnected)?;
709 trace!("Sending subscribe command");
710 inner.new_subscription(&extractor_id, ready_tx)?;
711 let cmd = Command::Subscribe {
712 extractor_id,
713 include_state: options.include_state,
714 compression: false,
715 };
716 inner
717 .ws_send(tungstenite::protocol::Message::Text(
718 serde_json::to_string(&cmd).map_err(|e| {
719 DeltasError::TransportError(format!(
720 "Failed to serialize subscribe command: {e}"
721 ))
722 })?,
723 ))
724 .await?;
725 }
726 trace!("Waiting for subscription response");
727 let res = ready_rx.await.map_err(|_| {
728 DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
729 })??;
730 trace!("Subscription successful");
731 Ok(res)
732 }
733
734 #[instrument(skip(self))]
735 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
736 self.ensure_connection().await?;
737 let (ready_tx, ready_rx) = oneshot::channel();
738 {
739 let mut guard = self.inner.lock().await;
740 let inner = guard
741 .as_mut()
742 .ok_or_else(|| DeltasError::NotConnected)?;
743
744 WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
745 }
746 ready_rx.await.map_err(|_| {
747 DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
748 })?;
749
750 Ok(())
751 }
752
753 #[instrument(skip(self))]
754 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
755 if self.is_connected().await {
756 return Err(DeltasError::AlreadyConnected);
757 }
758 let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
759 info!(?ws_uri, "Starting TychoWebsocketClient");
760
761 let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
762 {
763 let mut guard = self.inner.as_ref().lock().await;
764 *guard = None;
765 }
766 let this = self.clone();
767 let jh = tokio::spawn(async move {
768 let mut retry_count = 0;
769 let mut result = Err(DeltasError::NotConnected);
770
771 'retry: while retry_count < this.max_reconnects {
772 info!(?ws_uri, retry_count, "Connecting to WebSocket server");
773 if retry_count > 0 {
774 sleep(this.retry_cooldown).await;
775 }
776
777 let mut request_builder = Request::builder()
779 .uri(&ws_uri)
780 .header(SEC_WEBSOCKET_KEY, generate_key())
781 .header(SEC_WEBSOCKET_VERSION, 13)
782 .header(CONNECTION, "Upgrade")
783 .header(UPGRADE, "websocket")
784 .header(
785 HOST,
786 this.uri.host().ok_or_else(|| {
787 DeltasError::UriParsing(
788 ws_uri.clone(),
789 "No host found in tycho url".to_string(),
790 )
791 })?,
792 )
793 .header(
794 USER_AGENT,
795 format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
796 );
797
798 if let Some(ref key) = this.auth_key {
800 request_builder = request_builder.header(AUTHORIZATION, key);
801 }
802
803 let request = request_builder.body(()).map_err(|e| {
804 DeltasError::TransportError(format!("Failed to build connection request: {e}"))
805 })?;
806 let (conn, _) = match connect_async(request).await {
807 Ok(conn) => conn,
808 Err(e) => {
809 retry_count += 1;
811 let mut guard = this.inner.as_ref().lock().await;
812 *guard = None;
813
814 warn!(
815 e = e.to_string(),
816 "Failed to connect to WebSocket server; Reconnecting"
817 );
818 continue 'retry;
819 }
820 };
821
822 let (ws_tx_new, ws_rx_new) = conn.split();
823 {
824 let mut guard = this.inner.as_ref().lock().await;
825 *guard =
826 Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
827 }
828 let mut msg_rx = ws_rx_new.boxed();
829
830 info!("Connection Successful: TychoWebsocketClient started");
831 this.conn_notify.notify_waiters();
832 result = Ok(());
833
834 loop {
835 let res = tokio::select! {
836 msg = msg_rx.next() => match msg {
837 Some(msg) => this.handle_msg(msg).await,
838 None => {
839 warn!("Websocket connection silently closed, giving up!");
843 break 'retry
844 }
845 },
846 _ = cmd_rx.recv() => {break 'retry},
847 };
848 if let Err(error) = res {
849 debug!(?error, "WsError");
850 if matches!(
851 error,
852 DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
853 ) {
854 retry_count += 1;
856 let mut guard = this.inner.as_ref().lock().await;
857 *guard = None;
858
859 warn!(
860 ?error,
861 ?retry_count,
862 "Connection dropped unexpectedly; Reconnecting..."
863 );
864 break;
865 } else {
866 error!(?error, "Fatal error; Exiting");
868 result = Err(error);
869 break 'retry;
870 }
871 }
872 }
873 }
874 debug!(
875 retry_count,
876 max_reconnects=?this.max_reconnects,
877 "Reconnection loop ended"
878 );
879 let mut guard = this.inner.as_ref().lock().await;
881 *guard = None;
882
883 if retry_count >= this.max_reconnects {
885 error!("Max reconnection attempts reached; Exiting");
886 this.dead.store(true, Ordering::SeqCst);
887 this.conn_notify.notify_waiters(); result = Err(DeltasError::ConnectionClosed);
889 }
890
891 result
892 });
893
894 self.conn_notify.notified().await;
895
896 if self.is_connected().await {
897 Ok(jh)
898 } else {
899 Err(DeltasError::NotConnected)
900 }
901 }
902
903 #[instrument(skip(self))]
904 async fn close(&self) -> Result<(), DeltasError> {
905 info!("Closing TychoWebsocketClient");
906 let mut guard = self.inner.lock().await;
907 let inner = guard
908 .as_mut()
909 .ok_or_else(|| DeltasError::NotConnected)?;
910 inner
911 .cmd_tx
912 .send(())
913 .await
914 .map_err(|e| DeltasError::TransportError(e.to_string()))?;
915 Ok(())
916 }
917}
918
919#[cfg(test)]
920mod tests {
921 use std::net::SocketAddr;
922
923 use test_log::test;
924 use tokio::{net::TcpListener, time::timeout};
925 use tycho_common::dto::Chain;
926
927 use super::*;
928
929 #[derive(Clone)]
930 enum ExpectedComm {
931 Receive(u64, tungstenite::protocol::Message),
932 Send(tungstenite::protocol::Message),
933 }
934
935 async fn mock_tycho_ws(
936 messages: &[ExpectedComm],
937 reconnects: usize,
938 ) -> (SocketAddr, JoinHandle<()>) {
939 info!("Starting mock webserver");
940 let server = TcpListener::bind("127.0.0.1:0")
942 .await
943 .expect("localhost bind failed");
944 let addr = server.local_addr().unwrap();
945 let messages = messages.to_vec();
946
947 let jh = tokio::spawn(async move {
948 info!("mock webserver started");
949 for _ in 0..(reconnects + 1) {
950 info!("Awaiting client connections");
951 if let Ok((stream, _)) = server.accept().await {
952 info!("Client connected");
953 let mut websocket = tokio_tungstenite::accept_async(stream)
954 .await
955 .unwrap();
956
957 info!("Handling messages..");
958 for c in messages.iter().cloned() {
959 match c {
960 ExpectedComm::Receive(t, exp) => {
961 info!("Awaiting message...");
962 let msg = timeout(Duration::from_millis(t), websocket.next())
963 .await
964 .expect("Receive timeout")
965 .expect("Stream exhausted")
966 .expect("Failed to receive message.");
967 info!("Message received");
968 assert_eq!(msg, exp)
969 }
970 ExpectedComm::Send(data) => {
971 info!("Sending message");
972 websocket
973 .send(data)
974 .await
975 .expect("Failed to send message");
976 info!("Message sent");
977 }
978 };
979 }
980 info!("Mock communication completed");
981 sleep(Duration::from_millis(100)).await;
982 let _ = websocket.close(None).await;
984 info!("Mock server closed connection");
985 }
986 }
987 info!("mock server ended");
988 });
989 (addr, jh)
990 }
991
992 #[tokio::test]
993 async fn test_subscribe_receive() {
994 let exp_comm = [
995 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
996 {
997 "method":"subscribe",
998 "extractor_id":{
999 "chain":"ethereum",
1000 "name":"vm:ambient"
1001 },
1002 "include_state": true,
1003 "compression": false
1004 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1005 )),
1006 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1007 {
1008 "method":"newsubscription",
1009 "extractor_id":{
1010 "chain":"ethereum",
1011 "name":"vm:ambient"
1012 },
1013 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1014 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1015 )),
1016 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1017 {
1018 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1019 "deltas": {
1020 "extractor": "vm:ambient",
1021 "chain": "ethereum",
1022 "block": {
1023 "number": 123,
1024 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1025 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1026 "chain": "ethereum",
1027 "ts": "2023-09-14T00:00:00"
1028 },
1029 "finalized_block_height": 0,
1030 "revert": false,
1031 "new_tokens": {},
1032 "account_updates": {
1033 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1034 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1035 "chain": "ethereum",
1036 "slots": {},
1037 "balance": "0x01f4",
1038 "code": "",
1039 "change": "Update"
1040 }
1041 },
1042 "state_updates": {
1043 "component_1": {
1044 "component_id": "component_1",
1045 "updated_attributes": {"attr1": "0x01"},
1046 "deleted_attributes": ["attr2"]
1047 }
1048 },
1049 "new_protocol_components":
1050 { "protocol_1": {
1051 "id": "protocol_1",
1052 "protocol_system": "system_1",
1053 "protocol_type_name": "type_1",
1054 "chain": "ethereum",
1055 "tokens": ["0x01", "0x02"],
1056 "contract_ids": ["0x01", "0x02"],
1057 "static_attributes": {"attr1": "0x01f4"},
1058 "change": "Update",
1059 "creation_tx": "0x01",
1060 "created_at": "2023-09-14T00:00:00"
1061 }
1062 },
1063 "deleted_protocol_components": {},
1064 "component_balances": {
1065 "protocol_1":
1066 {
1067 "0x01": {
1068 "token": "0x01",
1069 "balance": "0x01f4",
1070 "balance_float": 0.0,
1071 "modify_tx": "0x01",
1072 "component_id": "protocol_1"
1073 }
1074 }
1075 },
1076 "account_balances": {
1077 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1078 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1079 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1080 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1081 "balance": "0x01f4",
1082 "modify_tx": "0x01"
1083 }
1084 }
1085 },
1086 "component_tvl": {
1087 "protocol_1": 1000.0
1088 },
1089 "dci_update": {
1090 "new_entrypoints": {},
1091 "new_entrypoint_params": {},
1092 "trace_results": {}
1093 }
1094 }
1095 }
1096 "#.to_owned()
1097 ))
1098 ];
1099 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1100
1101 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1102 let jh = client
1103 .connect()
1104 .await
1105 .expect("connect failed");
1106 let (_, mut rx) = timeout(
1107 Duration::from_millis(100),
1108 client.subscribe(
1109 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1110 SubscriptionOptions::new(),
1111 ),
1112 )
1113 .await
1114 .expect("subscription timed out")
1115 .expect("subscription failed");
1116 let _ = timeout(Duration::from_millis(100), rx.recv())
1117 .await
1118 .expect("awaiting message timeout out")
1119 .expect("receiving message failed");
1120 timeout(Duration::from_millis(100), client.close())
1121 .await
1122 .expect("close timed out")
1123 .expect("close failed");
1124 jh.await
1125 .expect("ws loop errored")
1126 .unwrap();
1127 server_thread.await.unwrap();
1128 }
1129
1130 #[tokio::test]
1131 async fn test_unsubscribe() {
1132 let exp_comm = [
1133 ExpectedComm::Receive(
1134 100,
1135 tungstenite::protocol::Message::Text(
1136 r#"
1137 {
1138 "method": "subscribe",
1139 "extractor_id":{
1140 "chain": "ethereum",
1141 "name": "vm:ambient"
1142 },
1143 "include_state": true,
1144 "compression": false
1145 }"#
1146 .to_owned()
1147 .replace(|c: char| c.is_whitespace(), ""),
1148 ),
1149 ),
1150 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1151 r#"
1152 {
1153 "method": "newsubscription",
1154 "extractor_id":{
1155 "chain": "ethereum",
1156 "name": "vm:ambient"
1157 },
1158 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1159 }"#
1160 .to_owned()
1161 .replace(|c: char| c.is_whitespace(), ""),
1162 )),
1163 ExpectedComm::Receive(
1164 100,
1165 tungstenite::protocol::Message::Text(
1166 r#"
1167 {
1168 "method": "unsubscribe",
1169 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1170 }
1171 "#
1172 .to_owned()
1173 .replace(|c: char| c.is_whitespace(), ""),
1174 ),
1175 ),
1176 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1177 r#"
1178 {
1179 "method": "subscriptionended",
1180 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1181 }
1182 "#
1183 .to_owned()
1184 .replace(|c: char| c.is_whitespace(), ""),
1185 )),
1186 ];
1187 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1188
1189 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1190 let jh = client
1191 .connect()
1192 .await
1193 .expect("connect failed");
1194 let (sub_id, mut rx) = timeout(
1195 Duration::from_millis(100),
1196 client.subscribe(
1197 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1198 SubscriptionOptions::new(),
1199 ),
1200 )
1201 .await
1202 .expect("subscription timed out")
1203 .expect("subscription failed");
1204
1205 timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1206 .await
1207 .expect("unsubscribe timed out")
1208 .expect("unsubscribe failed");
1209 let res = timeout(Duration::from_millis(100), rx.recv())
1210 .await
1211 .expect("awaiting message timeout out");
1212
1213 assert!(res.is_none());
1215
1216 timeout(Duration::from_millis(100), client.close())
1217 .await
1218 .expect("close timed out")
1219 .expect("close failed");
1220 jh.await
1221 .expect("ws loop errored")
1222 .unwrap();
1223 server_thread.await.unwrap();
1224 }
1225
1226 #[tokio::test]
1227 async fn test_subscription_unexpected_end() {
1228 let exp_comm = [
1229 ExpectedComm::Receive(
1230 100,
1231 tungstenite::protocol::Message::Text(
1232 r#"
1233 {
1234 "method":"subscribe",
1235 "extractor_id":{
1236 "chain":"ethereum",
1237 "name":"vm:ambient"
1238 },
1239 "include_state": true,
1240 "compression": false
1241 }"#
1242 .to_owned()
1243 .replace(|c: char| c.is_whitespace(), ""),
1244 ),
1245 ),
1246 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1247 r#"
1248 {
1249 "method":"newsubscription",
1250 "extractor_id":{
1251 "chain":"ethereum",
1252 "name":"vm:ambient"
1253 },
1254 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1255 }"#
1256 .to_owned()
1257 .replace(|c: char| c.is_whitespace(), ""),
1258 )),
1259 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1260 r#"
1261 {
1262 "method": "subscriptionended",
1263 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1264 }"#
1265 .to_owned()
1266 .replace(|c: char| c.is_whitespace(), ""),
1267 )),
1268 ];
1269 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1270
1271 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1272 let jh = client
1273 .connect()
1274 .await
1275 .expect("connect failed");
1276 let (_, mut rx) = timeout(
1277 Duration::from_millis(100),
1278 client.subscribe(
1279 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1280 SubscriptionOptions::new(),
1281 ),
1282 )
1283 .await
1284 .expect("subscription timed out")
1285 .expect("subscription failed");
1286 let res = timeout(Duration::from_millis(100), rx.recv())
1287 .await
1288 .expect("awaiting message timeout out");
1289
1290 assert!(res.is_none());
1292
1293 timeout(Duration::from_millis(100), client.close())
1294 .await
1295 .expect("close timed out")
1296 .expect("close failed");
1297 jh.await
1298 .expect("ws loop errored")
1299 .unwrap();
1300 server_thread.await.unwrap();
1301 }
1302
1303 #[test_log::test(tokio::test)]
1304 async fn test_reconnect() {
1305 let exp_comm = [
1306 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
1307 {
1308 "method":"subscribe",
1309 "extractor_id":{
1310 "chain":"ethereum",
1311 "name":"vm:ambient"
1312 },
1313 "include_state": true,
1314 "compression": false
1315 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1316 )),
1317 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1318 {
1319 "method":"newsubscription",
1320 "extractor_id":{
1321 "chain":"ethereum",
1322 "name":"vm:ambient"
1323 },
1324 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1325 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1326 )),
1327 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1328 {
1329 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1330 "deltas": {
1331 "extractor": "vm:ambient",
1332 "chain": "ethereum",
1333 "block": {
1334 "number": 123,
1335 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1336 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1337 "chain": "ethereum",
1338 "ts": "2023-09-14T00:00:00"
1339 },
1340 "finalized_block_height": 0,
1341 "revert": false,
1342 "new_tokens": {},
1343 "account_updates": {
1344 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1345 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1346 "chain": "ethereum",
1347 "slots": {},
1348 "balance": "0x01f4",
1349 "code": "",
1350 "change": "Update"
1351 }
1352 },
1353 "state_updates": {
1354 "component_1": {
1355 "component_id": "component_1",
1356 "updated_attributes": {"attr1": "0x01"},
1357 "deleted_attributes": ["attr2"]
1358 }
1359 },
1360 "new_protocol_components": {
1361 "protocol_1":
1362 {
1363 "id": "protocol_1",
1364 "protocol_system": "system_1",
1365 "protocol_type_name": "type_1",
1366 "chain": "ethereum",
1367 "tokens": ["0x01", "0x02"],
1368 "contract_ids": ["0x01", "0x02"],
1369 "static_attributes": {"attr1": "0x01f4"},
1370 "change": "Update",
1371 "creation_tx": "0x01",
1372 "created_at": "2023-09-14T00:00:00"
1373 }
1374 },
1375 "deleted_protocol_components": {},
1376 "component_balances": {
1377 "protocol_1": {
1378 "0x01": {
1379 "token": "0x01",
1380 "balance": "0x01f4",
1381 "balance_float": 1000.0,
1382 "modify_tx": "0x01",
1383 "component_id": "protocol_1"
1384 }
1385 }
1386 },
1387 "account_balances": {
1388 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1389 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1390 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1391 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1392 "balance": "0x01f4",
1393 "modify_tx": "0x01"
1394 }
1395 }
1396 },
1397 "component_tvl": {
1398 "protocol_1": 1000.0
1399 },
1400 "dci_update": {
1401 "new_entrypoints": {},
1402 "new_entrypoint_params": {},
1403 "trace_results": {}
1404 }
1405 }
1406 }
1407 "#.to_owned()
1408 ))
1409 ];
1410 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1411 let client = WsDeltasClient::new_with_reconnects(
1412 &format!("ws://{addr}"),
1413 None,
1414 3,
1415 Duration::from_millis(110),
1417 )
1418 .unwrap();
1419
1420 let jh: JoinHandle<Result<(), DeltasError>> = client
1421 .connect()
1422 .await
1423 .expect("connect failed");
1424
1425 for _ in 0..2 {
1426 dbg!("loop");
1427 let (_, mut rx) = timeout(
1428 Duration::from_millis(200),
1429 client.subscribe(
1430 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1431 SubscriptionOptions::new(),
1432 ),
1433 )
1434 .await
1435 .expect("subscription timed out")
1436 .expect("subscription failed");
1437
1438 let _ = timeout(Duration::from_millis(100), rx.recv())
1439 .await
1440 .expect("awaiting message timeout out")
1441 .expect("receiving message failed");
1442
1443 let res = timeout(Duration::from_millis(200), rx.recv())
1445 .await
1446 .expect("awaiting closed connection timeout out");
1447 assert!(res.is_none());
1448 }
1449 let res = jh.await.expect("ws client join failed");
1450 assert!(res.is_err());
1452 server_thread
1453 .await
1454 .expect("ws server loop errored");
1455 }
1456
1457 async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1458 let server = TcpListener::bind("127.0.0.1:0")
1459 .await
1460 .expect("localhost bind failed");
1461 let addr = server.local_addr().unwrap();
1462 let jh = tokio::spawn(async move {
1463 while let Ok((stream, _)) = server.accept().await {
1464 if accept_first {
1465 let stream = tokio_tungstenite::accept_async(stream)
1467 .await
1468 .unwrap();
1469 sleep(Duration::from_millis(10)).await;
1470 drop(stream)
1471 } else {
1472 drop(stream);
1474 }
1475 }
1476 });
1477 (addr, jh)
1478 }
1479
1480 #[test(tokio::test)]
1481 async fn test_subscribe_dead_client_after_max_attempts() {
1482 let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1483 let client = WsDeltasClient::new_with_reconnects(
1484 &format!("ws://{addr}"),
1485 None,
1486 3,
1487 Duration::from_secs(0),
1488 )
1489 .unwrap();
1490
1491 let join_handle = client.connect().await.unwrap();
1492 let handle_res = join_handle.await.unwrap();
1493 assert!(handle_res.is_err());
1494 assert!(!client.is_connected().await);
1495
1496 let subscription_res = timeout(
1497 Duration::from_millis(10),
1498 client.subscribe(
1499 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1500 SubscriptionOptions::new(),
1501 ),
1502 )
1503 .await
1504 .unwrap();
1505 assert!(subscription_res.is_err());
1506 }
1507
1508 #[test(tokio::test)]
1509 async fn test_ws_client_retry_cooldown() {
1510 let start = std::time::Instant::now();
1511 let (addr, _) = mock_bad_connection_tycho_ws(false).await;
1512
1513 let client = WsDeltasClient::new_with_reconnects(
1515 &format!("ws://{addr}"),
1516 None,
1517 3, Duration::from_millis(50), )
1520 .unwrap();
1521
1522 let connect_result = client.connect().await;
1524 let elapsed = start.elapsed();
1525
1526 assert!(connect_result.is_err(), "Expected connection to fail after retries");
1528
1529 assert!(
1531 elapsed >= Duration::from_millis(100),
1532 "Expected at least 100ms elapsed, got {:?}",
1533 elapsed
1534 );
1535
1536 assert!(elapsed < Duration::from_millis(500), "Took too long: {:?}", elapsed);
1538 }
1539
1540 #[test_log::test(tokio::test)]
1541 async fn test_buffer_full_triggers_unsubscribe() {
1542 let exp_comm = {
1544 [
1545 ExpectedComm::Receive(
1547 100,
1548 tungstenite::protocol::Message::Text(
1549 r#"
1550 {
1551 "method":"subscribe",
1552 "extractor_id":{
1553 "chain":"ethereum",
1554 "name":"vm:ambient"
1555 },
1556 "include_state": true,
1557 "compression": false
1558 }"#
1559 .to_owned()
1560 .replace(|c: char| c.is_whitespace(), ""),
1561 ),
1562 ),
1563 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1565 r#"
1566 {
1567 "method":"newsubscription",
1568 "extractor_id":{
1569 "chain":"ethereum",
1570 "name":"vm:ambient"
1571 },
1572 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1573 }"#
1574 .to_owned()
1575 .replace(|c: char| c.is_whitespace(), ""),
1576 )),
1577 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1579 r#"
1580 {
1581 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1582 "deltas": {
1583 "extractor": "vm:ambient",
1584 "chain": "ethereum",
1585 "block": {
1586 "number": 123,
1587 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1588 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1589 "chain": "ethereum",
1590 "ts": "2023-09-14T00:00:00"
1591 },
1592 "finalized_block_height": 0,
1593 "revert": false,
1594 "new_tokens": {},
1595 "account_updates": {},
1596 "state_updates": {},
1597 "new_protocol_components": {},
1598 "deleted_protocol_components": {},
1599 "component_balances": {},
1600 "account_balances": {},
1601 "component_tvl": {},
1602 "dci_update": {
1603 "new_entrypoints": {},
1604 "new_entrypoint_params": {},
1605 "trace_results": {}
1606 }
1607 }
1608 }
1609 "#.to_owned()
1610 )),
1611 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1613 r#"
1614 {
1615 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1616 "deltas": {
1617 "extractor": "vm:ambient",
1618 "chain": "ethereum",
1619 "block": {
1620 "number": 124,
1621 "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
1622 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1623 "chain": "ethereum",
1624 "ts": "2023-09-14T00:00:01"
1625 },
1626 "finalized_block_height": 0,
1627 "revert": false,
1628 "new_tokens": {},
1629 "account_updates": {},
1630 "state_updates": {},
1631 "new_protocol_components": {},
1632 "deleted_protocol_components": {},
1633 "component_balances": {},
1634 "account_balances": {},
1635 "component_tvl": {},
1636 "dci_update": {
1637 "new_entrypoints": {},
1638 "new_entrypoint_params": {},
1639 "trace_results": {}
1640 }
1641 }
1642 }
1643 "#.to_owned()
1644 )),
1645 ExpectedComm::Receive(
1647 100,
1648 tungstenite::protocol::Message::Text(
1649 r#"
1650 {
1651 "method": "unsubscribe",
1652 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1653 }
1654 "#
1655 .to_owned()
1656 .replace(|c: char| c.is_whitespace(), ""),
1657 ),
1658 ),
1659 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1661 r#"
1662 {
1663 "method": "subscriptionended",
1664 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1665 }
1666 "#
1667 .to_owned()
1668 .replace(|c: char| c.is_whitespace(), ""),
1669 )),
1670 ]
1671 };
1672
1673 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1674
1675 let client = WsDeltasClient::new_with_custom_buffers(
1677 &format!("ws://{addr}"),
1678 None,
1679 128, 1, )
1682 .unwrap();
1683
1684 let jh = client
1685 .connect()
1686 .await
1687 .expect("connect failed");
1688
1689 let (_sub_id, mut rx) = timeout(
1690 Duration::from_millis(100),
1691 client.subscribe(
1692 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1693 SubscriptionOptions::new(),
1694 ),
1695 )
1696 .await
1697 .expect("subscription timed out")
1698 .expect("subscription failed");
1699
1700 tokio::time::sleep(Duration::from_millis(100)).await;
1702
1703 let mut received_msgs = Vec::new();
1705
1706 while received_msgs.len() < 3 {
1708 match timeout(Duration::from_millis(200), rx.recv()).await {
1709 Ok(Some(msg)) => {
1710 received_msgs.push(msg);
1711 }
1712 Ok(None) => {
1713 break;
1715 }
1716 Err(_) => {
1717 break;
1719 }
1720 }
1721 }
1722
1723 assert!(
1725 received_msgs.len() <= 1,
1726 "Expected buffer overflow to limit messages to at most 1, got {}",
1727 received_msgs.len()
1728 );
1729
1730 if let Some(first_msg) = received_msgs.first() {
1731 assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
1732 }
1733
1734 drop(rx); tokio::time::sleep(Duration::from_millis(50)).await;
1741
1742 jh.abort();
1744 server_thread.abort();
1745
1746 let _ = jh.await;
1747 let _ = server_thread.await;
1748 }
1749
1750 #[tokio::test]
1751 async fn test_server_error_handling() {
1752 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1753
1754 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1755
1756 let error_response = WebSocketMessage::Response(Response::Error(
1758 WebsocketError::ExtractorNotFound(extractor_id.clone()),
1759 ));
1760 let error_json = serde_json::to_string(&error_response).unwrap();
1761
1762 let exp_comm = [
1763 ExpectedComm::Receive(
1764 100,
1765 tungstenite::protocol::Message::Text(
1766 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":false}"#.to_string()
1767 ),
1768 ),
1769 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1770 ];
1771
1772 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1773
1774 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1775 let jh = client
1776 .connect()
1777 .await
1778 .expect("connect failed");
1779
1780 let result = timeout(
1781 Duration::from_millis(100),
1782 client.subscribe(extractor_id, SubscriptionOptions::new()),
1783 )
1784 .await
1785 .expect("subscription timed out");
1786
1787 assert!(result.is_err());
1789 if let Err(DeltasError::ServerError(msg, _)) = result {
1790 assert!(msg.contains("Subscription failed"));
1791 assert!(msg.contains("Extractor not found"));
1792 } else {
1793 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1794 }
1795
1796 timeout(Duration::from_millis(100), client.close())
1797 .await
1798 .expect("close timed out")
1799 .expect("close failed");
1800 jh.await
1801 .expect("ws loop errored")
1802 .unwrap();
1803 server_thread.await.unwrap();
1804 }
1805
1806 #[test_log::test(tokio::test)]
1807 async fn test_subscription_not_found_error() {
1808 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1810
1811 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1812 let subscription_id = Uuid::new_v4();
1813
1814 let error_response = WebSocketMessage::Response(Response::Error(
1815 WebsocketError::SubscriptionNotFound(subscription_id),
1816 ));
1817 let error_json = serde_json::to_string(&error_response).unwrap();
1818
1819 let exp_comm = [
1820 ExpectedComm::Receive(
1822 100,
1823 tungstenite::protocol::Message::Text(
1824 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":false}"#.to_string()
1825 ),
1826 ),
1827 ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
1828 r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"test_extractor"}},"subscription_id":"{}"}}"#,
1829 subscription_id
1830 ))),
1831 ExpectedComm::Receive(
1833 100,
1834 tungstenite::protocol::Message::Text(format!(
1835 r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
1836 subscription_id
1837 )),
1838 ),
1839 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1841 ];
1842
1843 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1844
1845 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1846 let jh = client
1847 .connect()
1848 .await
1849 .expect("connect failed");
1850
1851 let (received_sub_id, _rx) = timeout(
1853 Duration::from_millis(100),
1854 client.subscribe(extractor_id, SubscriptionOptions::new()),
1855 )
1856 .await
1857 .expect("subscription timed out")
1858 .expect("subscription failed");
1859
1860 assert_eq!(received_sub_id, subscription_id);
1861
1862 let unsubscribe_result =
1864 timeout(Duration::from_millis(100), client.unsubscribe(subscription_id))
1865 .await
1866 .expect("unsubscribe timed out");
1867
1868 unsubscribe_result
1872 .expect("Unsubscribe should succeed even if server says subscription not found");
1873
1874 timeout(Duration::from_millis(100), client.close())
1875 .await
1876 .expect("close timed out")
1877 .expect("close failed");
1878 jh.await
1879 .expect("ws loop errored")
1880 .unwrap();
1881 server_thread.await.unwrap();
1882 }
1883
1884 #[test_log::test(tokio::test)]
1885 async fn test_parse_error_handling() {
1886 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1887
1888 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1889 let error_response = WebSocketMessage::Response(Response::Error(
1890 WebsocketError::ParseError("}2sdf".to_string(), "malformed JSON".to_string()),
1891 ));
1892 let error_json = serde_json::to_string(&error_response).unwrap();
1893
1894 let exp_comm = [
1895 ExpectedComm::Receive(
1897 100,
1898 tungstenite::protocol::Message::Text(
1899 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":false}"#.to_string()
1900 ),
1901 ),
1902 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json))
1903 ];
1904
1905 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1906
1907 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1908 let jh = client
1909 .connect()
1910 .await
1911 .expect("connect failed");
1912
1913 let _ = timeout(
1915 Duration::from_millis(100),
1916 client.subscribe(extractor_id, SubscriptionOptions::new()),
1917 )
1918 .await
1919 .expect("subscription timed out");
1920
1921 let result = jh
1923 .await
1924 .expect("ws loop should complete");
1925 assert!(result.is_err());
1926 if let Err(DeltasError::ServerError(message, _)) = result {
1927 assert!(message.contains("Server failed to parse client message"));
1928 } else {
1929 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1930 }
1931
1932 server_thread.await.unwrap();
1933 }
1934
1935 #[test_log::test(tokio::test)]
1936 async fn test_compression_error_handling() {
1937 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1938
1939 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1940 let subscription_id = Uuid::new_v4();
1941 let error_response = WebSocketMessage::Response(Response::Error(
1942 WebsocketError::CompressionError(subscription_id, "Compression failed".to_string()),
1943 ));
1944 let error_json = serde_json::to_string(&error_response).unwrap();
1945
1946 let exp_comm = [
1947 ExpectedComm::Receive(
1949 100,
1950 tungstenite::protocol::Message::Text(
1951 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":false}"#.to_string()
1952 ),
1953 ),
1954 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json))
1955 ];
1956
1957 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1958
1959 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1960 let jh = client
1961 .connect()
1962 .await
1963 .expect("connect failed");
1964
1965 let _ = timeout(
1967 Duration::from_millis(100),
1968 client.subscribe(extractor_id, SubscriptionOptions::new()),
1969 )
1970 .await
1971 .expect("subscription timed out");
1972
1973 let result = jh
1975 .await
1976 .expect("ws loop should complete");
1977 assert!(result.is_err());
1978 if let Err(DeltasError::ServerError(message, _)) = result {
1979 assert!(message.contains("Server failed to compress message for subscription"));
1980 } else {
1981 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1982 }
1983
1984 server_thread.await.unwrap();
1985 }
1986
1987 #[tokio::test]
1988 async fn test_subscribe_error_handling() {
1989 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1990
1991 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "failing_extractor");
1992
1993 let error_response = WebSocketMessage::Response(Response::Error(
1994 WebsocketError::SubscribeError(extractor_id.clone()),
1995 ));
1996 let error_json = serde_json::to_string(&error_response).unwrap();
1997
1998 let exp_comm = [
1999 ExpectedComm::Receive(
2000 100,
2001 tungstenite::protocol::Message::Text(
2002 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"failing_extractor"},"include_state":true,"compression":false}"#.to_string()
2003 ),
2004 ),
2005 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2006 ];
2007
2008 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2009
2010 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2011 let jh = client
2012 .connect()
2013 .await
2014 .expect("connect failed");
2015
2016 let result = timeout(
2017 Duration::from_millis(100),
2018 client.subscribe(extractor_id, SubscriptionOptions::new()),
2019 )
2020 .await
2021 .expect("subscription timed out");
2022
2023 assert!(result.is_err());
2025 if let Err(DeltasError::ServerError(msg, _)) = result {
2026 assert!(msg.contains("Subscription failed"));
2027 assert!(msg.contains("Failed to subscribe to extractor"));
2028 } else {
2029 panic!("Expected DeltasError::ServerError, got: {:?}", result);
2030 }
2031
2032 timeout(Duration::from_millis(100), client.close())
2033 .await
2034 .expect("close timed out")
2035 .expect("close failed");
2036 jh.await
2037 .expect("ws loop errored")
2038 .unwrap();
2039 server_thread.await.unwrap();
2040 }
2041
2042 #[tokio::test]
2043 async fn test_cancel_pending_subscription() {
2044 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
2046
2047 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
2048
2049 let error_response = WebSocketMessage::Response(Response::Error(
2050 WebsocketError::ExtractorNotFound(extractor_id.clone()),
2051 ));
2052 let error_json = serde_json::to_string(&error_response).unwrap();
2053
2054 let exp_comm = [
2055 ExpectedComm::Receive(
2056 100,
2057 tungstenite::protocol::Message::Text(
2058 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":false}"#.to_string()
2059 ),
2060 ),
2061 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2062 ];
2063
2064 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2065
2066 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2067 let jh = client
2068 .connect()
2069 .await
2070 .expect("connect failed");
2071
2072 let client_clone = client.clone();
2074 let extractor_id_clone = extractor_id.clone();
2075
2076 let subscription1 = tokio::spawn({
2077 let client_for_spawn = client.clone();
2078 async move {
2079 client_for_spawn
2080 .subscribe(extractor_id, SubscriptionOptions::new())
2081 .await
2082 }
2083 });
2084
2085 let subscription2 = tokio::spawn(async move {
2086 client_clone
2088 .subscribe(extractor_id_clone, SubscriptionOptions::new())
2089 .await
2090 });
2091
2092 let (result1, result2) = tokio::join!(subscription1, subscription2);
2093
2094 let result1 = result1.unwrap();
2095 let result2 = result2.unwrap();
2096
2097 assert!(result1.is_err() || result2.is_err());
2100
2101 if let Err(DeltasError::SubscriptionAlreadyPending) = result2 {
2102 } else if let Err(DeltasError::ServerError(_, _)) = result1 {
2104 } else {
2106 panic!("Expected one SubscriptionAlreadyPending and one ServerError");
2107 }
2108
2109 timeout(Duration::from_millis(100), client.close())
2110 .await
2111 .expect("close timed out")
2112 .expect("close failed");
2113 jh.await
2114 .expect("ws loop errored")
2115 .unwrap();
2116 server_thread.await.unwrap();
2117 }
2118
2119 #[tokio::test]
2120 async fn test_force_unsubscribe_prevents_multiple_calls() {
2121 let subscription_id = Uuid::new_v4();
2125
2126 let exp_comm = [
2127 ExpectedComm::Receive(
2128 100,
2129 tungstenite::protocol::Message::Text(
2130 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"vm:ambient"},"include_state":true,"compression":false}"#.to_string()
2131 ),
2132 ),
2133 ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
2134 r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"vm:ambient"}},"subscription_id":"{}"}}"#,
2135 subscription_id
2136 ))),
2137 ExpectedComm::Receive(
2139 100,
2140 tungstenite::protocol::Message::Text(format!(
2141 r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
2142 subscription_id
2143 )),
2144 ),
2145 ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
2146 r#"{{"method":"subscriptionended","subscription_id":"{}"}}"#,
2147 subscription_id
2148 ))),
2149 ];
2150
2151 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2152
2153 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2154 let jh = client
2155 .connect()
2156 .await
2157 .expect("connect failed");
2158
2159 let (received_sub_id, _rx) = timeout(
2160 Duration::from_millis(100),
2161 client.subscribe(
2162 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
2163 SubscriptionOptions::new(),
2164 ),
2165 )
2166 .await
2167 .expect("subscription timed out")
2168 .expect("subscription failed");
2169
2170 assert_eq!(received_sub_id, subscription_id);
2171
2172 {
2174 let mut inner_guard = client.inner.lock().await;
2175 let inner = inner_guard
2176 .as_mut()
2177 .expect("client should be connected");
2178
2179 WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2181 WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2182 }
2183
2184 tokio::time::sleep(Duration::from_millis(50)).await;
2186
2187 let _ = timeout(Duration::from_millis(100), client.close()).await;
2189
2190 let _ = jh.await;
2192 let _ = server_thread.await;
2193 }
2194}