1use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::{
25 atomic::{AtomicBool, Ordering},
26 Arc,
27 },
28 time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34 header::{
35 AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36 USER_AGENT,
37 },
38 Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44 net::TcpStream,
45 sync::{
46 mpsc::{self, error::TrySendError, Receiver, Sender},
47 oneshot, Mutex, MutexGuard, Notify,
48 },
49 task::JoinHandle,
50 time::sleep,
51};
52use tokio_tungstenite::{
53 connect_async,
54 tungstenite::{
55 self,
56 handshake::client::{generate_key, Request},
57 },
58 MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::{
62 dto::{self, Command, Response, WebSocketMessage, WebsocketError},
63 models::{blockchain::BlockAggregatedChanges, ExtractorIdentity},
64};
65use uuid::Uuid;
66use zstd;
67
68use crate::TYCHO_SERVER_VERSION;
69
70#[derive(Error, Debug)]
71pub enum DeltasError {
72 #[error("Failed to parse URI: {0}. Error: {1}")]
74 UriParsing(String, String),
75
76 #[error("The requested subscription is already pending")]
78 SubscriptionAlreadyPending,
79
80 #[error("The server replied with an error: {0}")]
81 ServerError(String),
82
83 #[error("{0}")]
86 TransportError(String),
87
88 #[error("The buffer is full!")]
92 BufferFull,
93
94 #[error("The client is not connected!")]
98 NotConnected,
99
100 #[error("The client is already connected!")]
102 AlreadyConnected,
103
104 #[error("The server closed the connection!")]
106 ConnectionClosed,
107
108 #[error("Connection error: {0}")]
110 ConnectionError(#[from] Box<tungstenite::Error>),
111
112 #[error("Tycho FatalError: {0}")]
114 Fatal(String),
115}
116
117#[derive(Clone, Debug)]
118pub struct SubscriptionOptions {
119 include_state: bool,
120 compression: bool,
121 partial_blocks: bool,
122}
123
124impl Default for SubscriptionOptions {
125 fn default() -> Self {
126 Self { include_state: true, compression: true, partial_blocks: false }
127 }
128}
129
130impl SubscriptionOptions {
131 pub fn new() -> Self {
132 Self::default()
133 }
134 pub fn with_state(mut self, val: bool) -> Self {
135 self.include_state = val;
136 self
137 }
138 pub fn with_compression(mut self, val: bool) -> Self {
139 self.compression = val;
140 self
141 }
142 pub fn with_partial_blocks(mut self, val: bool) -> Self {
143 self.partial_blocks = val;
144 self
145 }
146}
147
148#[cfg_attr(test, automock)]
149#[async_trait]
150pub trait DeltasClient {
151 async fn subscribe(
158 &self,
159 extractor_id: ExtractorIdentity,
160 options: SubscriptionOptions,
161 ) -> Result<(Uuid, Receiver<BlockAggregatedChanges>), DeltasError>;
162
163 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
165
166 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
168
169 async fn close(&self) -> Result<(), DeltasError>;
171}
172
173#[derive(Clone)]
174pub struct WsDeltasClient {
175 uri: Uri,
177 auth_key: Option<String>,
179 max_reconnects: u64,
181 retry_cooldown: Duration,
183 ws_buffer_size: usize,
186 subscription_buffer_size: usize,
189 conn_notify: Arc<Notify>,
191 inner: Arc<Mutex<Option<Inner>>>,
193 dead: Arc<AtomicBool>,
195}
196
197type WebSocketSink =
198 SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
199
200#[derive(Debug)]
212enum SubscriptionInfo {
213 RequestedSubscription(
215 oneshot::Sender<Result<(Uuid, Receiver<BlockAggregatedChanges>), DeltasError>>,
216 ),
217 Active,
219 RequestedUnsubscription(oneshot::Sender<()>),
221}
222
223struct Inner {
225 sink: WebSocketSink,
227 cmd_tx: Sender<()>,
229 pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
231 subscriptions: HashMap<Uuid, SubscriptionInfo>,
233 sender: HashMap<Uuid, Sender<BlockAggregatedChanges>>,
236 buffer_size: usize,
238}
239
240impl Inner {
244 fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
245 Self {
246 sink,
247 cmd_tx,
248 pending: HashMap::new(),
249 subscriptions: HashMap::new(),
250 sender: HashMap::new(),
251 buffer_size,
252 }
253 }
254
255 #[allow(clippy::result_large_err)]
257 fn new_subscription(
258 &mut self,
259 id: &ExtractorIdentity,
260 ready_tx: oneshot::Sender<Result<(Uuid, Receiver<BlockAggregatedChanges>), DeltasError>>,
261 ) -> Result<(), DeltasError> {
262 if self.pending.contains_key(id) {
263 return Err(DeltasError::SubscriptionAlreadyPending);
264 }
265 self.pending
266 .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
267 Ok(())
268 }
269
270 fn mark_active(&mut self, extractor_id: ExtractorIdentity, subscription_id: Uuid) {
274 if let Some(info) = self.pending.remove(&extractor_id) {
275 if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
276 let (tx, rx) = mpsc::channel(self.buffer_size);
277 self.sender.insert(subscription_id, tx);
278 self.subscriptions
279 .insert(subscription_id, SubscriptionInfo::Active);
280 let _ = ready_tx
281 .send(Ok((subscription_id, rx)))
282 .map_err(|_| {
283 warn!(
284 ?extractor_id,
285 ?subscription_id,
286 "Subscriber for has gone away. Ignoring."
287 )
288 });
289 } else {
290 error!(
291 ?extractor_id,
292 ?subscription_id,
293 "Pending subscription was not in the correct state to
294 transition to active. Ignoring!"
295 )
296 }
297 } else {
298 error!(
299 ?extractor_id,
300 ?subscription_id,
301 "Tried to mark an unknown subscription as active. Ignoring!"
302 );
303 }
304 }
305
306 #[allow(clippy::result_large_err)]
308 fn send(&mut self, id: &Uuid, msg: BlockAggregatedChanges) -> Result<(), DeltasError> {
309 if let Some(sender) = self.sender.get_mut(id) {
310 sender
311 .try_send(msg)
312 .map_err(|e| match e {
313 TrySendError::Full(_) => DeltasError::BufferFull,
314 TrySendError::Closed(_) => {
315 DeltasError::TransportError("The subscriber has gone away".to_string())
316 }
317 })?;
318 }
319 Ok(())
320 }
321
322 fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
327 if let Some(info) = self
328 .subscriptions
329 .get_mut(subscription_id)
330 {
331 if let SubscriptionInfo::Active = info {
332 *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
333 }
334 } else {
335 debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
337 }
338 }
339
340 fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
347 if let Entry::Occupied(e) = self
348 .subscriptions
349 .entry(subscription_id)
350 {
351 let info = e.remove();
352 if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
353 let _ = tx.send(()).map_err(|_| {
354 debug!(?subscription_id, "failed to notify about removed subscription")
355 });
356 self.sender
357 .remove(&subscription_id)
358 .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
359 } else {
360 warn!(?subscription_id, "Subscription ended unexpectedly!");
361 self.sender
362 .remove(&subscription_id)
363 .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
364 }
365 } else {
366 trace!(
371 ?subscription_id,
372 "Received `SubscriptionEnded`, but was never subscribed to it. This is likely a bug!"
373 );
374 }
375
376 Ok(())
377 }
378
379 fn cancel_pending(&mut self, extractor_id: ExtractorIdentity, error: &WebsocketError) {
380 if let Some(sub_info) = self.pending.remove(&extractor_id) {
381 match sub_info {
382 SubscriptionInfo::RequestedSubscription(tx) => {
383 let _ = tx
384 .send(Err(DeltasError::ServerError(format!(
385 "Subscription failed: {error}"
386 ))))
387 .map_err(|_| debug!("Cancel pending failed: receiver deallocated!"));
388 }
389 _ => {
390 error!(?extractor_id, "Pending subscription in wrong state")
391 }
392 }
393 } else {
394 debug!(?extractor_id, "Tried cancel on non-existent pending subscription!")
395 }
396 }
397
398 async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
400 self.sink.send(msg).await.map_err(|e| {
401 DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
402 })
403 }
404}
405
406impl WsDeltasClient {
408 #[allow(clippy::result_large_err)]
410 pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
411 let uri = ws_uri
412 .parse::<Uri>()
413 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
414 Ok(Self {
415 uri,
416 auth_key: auth_key.map(|s| s.to_string()),
417 inner: Arc::new(Mutex::new(None)),
418 ws_buffer_size: 128,
419 subscription_buffer_size: 128,
420 conn_notify: Arc::new(Notify::new()),
421 max_reconnects: 5,
422 retry_cooldown: Duration::from_millis(500),
423 dead: Arc::new(AtomicBool::new(false)),
424 })
425 }
426
427 #[allow(clippy::result_large_err)]
429 pub fn new_with_reconnects(
430 ws_uri: &str,
431 auth_key: Option<&str>,
432 max_reconnects: u64,
433 retry_cooldown: Duration,
434 ) -> Result<Self, DeltasError> {
435 let uri = ws_uri
436 .parse::<Uri>()
437 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
438
439 Ok(Self {
440 uri,
441 auth_key: auth_key.map(|s| s.to_string()),
442 inner: Arc::new(Mutex::new(None)),
443 ws_buffer_size: 128,
444 subscription_buffer_size: 128,
445 conn_notify: Arc::new(Notify::new()),
446 max_reconnects,
447 retry_cooldown,
448 dead: Arc::new(AtomicBool::new(false)),
449 })
450 }
451
452 #[cfg(test)]
454 #[allow(clippy::result_large_err)]
455 pub fn new_with_custom_buffers(
456 ws_uri: &str,
457 auth_key: Option<&str>,
458 ws_buffer_size: usize,
459 subscription_buffer_size: usize,
460 ) -> Result<Self, DeltasError> {
461 let uri = ws_uri
462 .parse::<Uri>()
463 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
464 Ok(Self {
465 uri,
466 auth_key: auth_key.map(|s| s.to_string()),
467 inner: Arc::new(Mutex::new(None)),
468 ws_buffer_size,
469 subscription_buffer_size,
470 conn_notify: Arc::new(Notify::new()),
471 max_reconnects: 5,
472 retry_cooldown: Duration::from_millis(0),
473 dead: Arc::new(AtomicBool::new(false)),
474 })
475 }
476
477 async fn is_connected(&self) -> bool {
481 let guard = self.inner.as_ref().lock().await;
482 guard.is_some()
483 }
484
485 async fn ensure_connection(&self) -> Result<(), DeltasError> {
490 if self.dead.load(Ordering::SeqCst) {
491 return Err(DeltasError::NotConnected);
492 }
493 if self.is_connected().await {
494 return Ok(());
495 }
496 let notified = self.conn_notify.notified();
501 tokio::pin!(notified);
502 notified.as_mut().enable();
503 if !self.is_connected().await {
504 notified.await;
505 }
506 if self.dead.load(Ordering::SeqCst) {
507 return Err(DeltasError::NotConnected);
508 }
509 if !self.is_connected().await {
510 return Err(DeltasError::NotConnected);
511 }
512 Ok(())
513 }
514
515 #[instrument(skip(self, msg))]
520 async fn handle_msg(
521 &self,
522 msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
523 ) -> Result<(), DeltasError> {
524 let mut guard = self.inner.lock().await;
525
526 match msg {
527 Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
532 serde_json::Value,
533 >(&text)
534 {
535 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
536 Ok(ws_message) => match ws_message {
537 WebSocketMessage::BlockAggregatedChanges { subscription_id, deltas } => {
538 Self::handle_block_changes_msg(&mut guard, subscription_id, deltas)
539 .await?;
540 }
541 WebSocketMessage::Response(Response::NewSubscription {
542 extractor_id,
543 subscription_id,
544 }) => {
545 info!(?extractor_id, ?subscription_id, "Received a new subscription");
546 let inner = guard
547 .as_mut()
548 .ok_or_else(|| DeltasError::NotConnected)?;
549 inner.mark_active(extractor_id.into(), subscription_id);
550 }
551 WebSocketMessage::Response(Response::SubscriptionEnded {
552 subscription_id,
553 }) => {
554 info!(?subscription_id, "Received a subscription ended");
555 let inner = guard
556 .as_mut()
557 .ok_or_else(|| DeltasError::NotConnected)?;
558 inner.remove_subscription(subscription_id)?;
559 }
560 WebSocketMessage::Response(Response::Error(error)) => match &error {
561 WebsocketError::ExtractorNotFound(extractor_id) => {
562 let inner = guard
563 .as_mut()
564 .ok_or_else(|| DeltasError::NotConnected)?;
565 inner.cancel_pending(extractor_id.clone().into(), &error);
566 }
567 WebsocketError::SubscriptionNotFound(subscription_id) => {
568 debug!("Received subscription not found, removing subscription");
569 let inner = guard
570 .as_mut()
571 .ok_or_else(|| DeltasError::NotConnected)?;
572 inner.remove_subscription(*subscription_id)?;
573 }
574 WebsocketError::ParseError(raw, e) => {
575 return Err(DeltasError::ServerError(format!(
576 "Server failed to parse client message: {e}, msg: {raw}"
577 )))
578 }
579 WebsocketError::CompressionError(subscription_id, e) => {
580 return Err(DeltasError::ServerError(format!(
581 "Server failed to compress message for subscription: \
582 {subscription_id}, error: {e}"
583 )))
584 }
585 WebsocketError::SubscribeError(extractor_id) => {
586 let inner = guard
587 .as_mut()
588 .ok_or_else(|| DeltasError::NotConnected)?;
589 inner.cancel_pending(extractor_id.clone().into(), &error);
590 }
591 },
592 },
593 Err(e) => {
594 error!(
595 "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
596 e, text
597 );
598 }
599 },
600 Err(e) => {
601 error!(
602 "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
603 e, text
604 );
605 }
606 },
607 Ok(tungstenite::protocol::Message::Binary(data)) => {
608 match zstd::decode_all(data.as_slice()) {
611 Ok(decompressed) => {
612 match serde_json::from_slice::<serde_json::Value>(decompressed.as_slice()) {
613 Ok(value) => {
614 match serde_json::from_value::<WebSocketMessage>(value.clone()) {
615 Ok(ws_message) => match ws_message {
616 WebSocketMessage::BlockAggregatedChanges {
617 subscription_id,
618 deltas,
619 } => {
620 Self::handle_block_changes_msg(
621 &mut guard,
622 subscription_id,
623 deltas,
624 )
625 .await?;
626 }
627 _ => {
628 error!(
629 "Received unsupported compressed WebSocketMessage variant. \nMessage: {ws_message:?}",
630 );
631 }
632 },
633 Err(e) => {
634 error!(
635 "Failed to deserialize compressed WebSocketMessage: {e}. \nMessage: {value:?}",
636 );
637 }
638 }
639 }
640 Err(e) => {
641 error!(
642 "Failed to deserialize compressed message: invalid JSON. {e}",
643 );
644 }
645 }
646 }
647 Err(e) => {
648 error!("Failed to decompress zstd data: {}", e);
649 }
650 }
651 }
652 Ok(tungstenite::protocol::Message::Ping(_)) => {
653 let inner = guard
655 .as_mut()
656 .ok_or_else(|| DeltasError::NotConnected)?;
657 if let Err(error) = inner
658 .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
659 .await
660 {
661 debug!(?error, "Failed to send pong!");
662 }
663 }
664 Ok(tungstenite::protocol::Message::Pong(_)) => {
665 }
667 Ok(tungstenite::protocol::Message::Close(frame)) => {
668 match &frame {
669 Some(f) => {
670 warn!(code = ?f.code, reason = %f.reason, "WebSocket closed by server")
671 }
672 None => warn!("WebSocket closed by server (no close frame)"),
673 }
674 return Err(DeltasError::ConnectionClosed);
675 }
676 Ok(unknown_msg) => {
677 info!("Received an unknown message type: {:?}", unknown_msg);
678 }
679 Err(error) => {
680 error!(?error, "Websocket error");
681 return Err(match error {
682 tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
683 tungstenite::Error::AlreadyClosed => {
684 warn!("Received AlreadyClosed error which is indicative of a bug!");
685 DeltasError::ConnectionError(Box::new(error))
686 }
687 tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
688 DeltasError::ConnectionError(Box::new(error))
689 }
690 _ => DeltasError::Fatal(error.to_string()),
691 });
692 }
693 };
694 Ok(())
695 }
696
697 async fn handle_block_changes_msg(
698 guard: &mut MutexGuard<'_, Option<Inner>>,
699 subscription_id: Uuid,
700 deltas: dto::BlockAggregatedChanges,
701 ) -> Result<(), DeltasError> {
702 trace!(?deltas, "Received a block state change, sending to channel");
703 let inner = guard
704 .as_mut()
705 .ok_or_else(|| DeltasError::NotConnected)?;
706 match inner.send(&subscription_id, BlockAggregatedChanges::from(deltas)) {
707 Err(DeltasError::BufferFull) => {
708 error!(?subscription_id, "Buffer full, unsubscribing!");
709 Self::force_unsubscribe(subscription_id, inner).await;
710 }
711 Err(_) => {
712 warn!(?subscription_id, "Receiver for has gone away, unsubscribing!");
713 Self::force_unsubscribe(subscription_id, inner).await;
714 }
715 _ => { }
716 }
717 Ok(())
718 }
719
720 async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
725 if let Some(SubscriptionInfo::RequestedUnsubscription(_)) = inner
727 .subscriptions
728 .get(&subscription_id)
729 {
730 return;
731 }
732
733 let (tx, rx) = oneshot::channel();
734 if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
735 warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
736 } else {
737 match tokio::time::timeout(Duration::from_secs(5), rx).await {
739 Ok(_) => {
740 debug!(?subscription_id, "Unsubscribe completed successfully");
741 }
742 Err(_) => {
743 warn!(?subscription_id, "Unsubscribe completion timed out");
744 }
745 }
746 }
747 }
748
749 async fn unsubscribe_inner(
755 inner: &mut Inner,
756 subscription_id: Uuid,
757 ready_tx: oneshot::Sender<()>,
758 ) -> Result<(), DeltasError> {
759 debug!(?subscription_id, "Unsubscribing");
760 inner.end_subscription(&subscription_id, ready_tx);
761 let cmd = Command::Unsubscribe { subscription_id };
762 inner
763 .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
764 |e| {
765 DeltasError::TransportError(format!(
766 "Failed to serialize unsubscribe command: {e}"
767 ))
768 },
769 )?))
770 .await?;
771 Ok(())
772 }
773}
774
775#[async_trait]
776impl DeltasClient for WsDeltasClient {
777 #[instrument(skip(self))]
778 async fn subscribe(
779 &self,
780 extractor_id: ExtractorIdentity,
781 options: SubscriptionOptions,
782 ) -> Result<(Uuid, Receiver<BlockAggregatedChanges>), DeltasError> {
783 trace!("Starting subscribe");
784 self.ensure_connection().await?;
785 let (ready_tx, ready_rx) = oneshot::channel();
786 {
787 let mut guard = self.inner.lock().await;
788 let inner = guard
789 .as_mut()
790 .ok_or_else(|| DeltasError::NotConnected)?;
791 trace!("Sending subscribe command");
792 inner.new_subscription(&extractor_id, ready_tx)?;
793 let cmd = Command::Subscribe {
794 extractor_id: extractor_id.into(),
795 include_state: options.include_state,
796 compression: options.compression,
797 partial_blocks: options.partial_blocks,
798 };
799 inner
800 .ws_send(tungstenite::protocol::Message::Text(
801 serde_json::to_string(&cmd).map_err(|e| {
802 DeltasError::TransportError(format!(
803 "Failed to serialize subscribe command: {e}"
804 ))
805 })?,
806 ))
807 .await?;
808 }
809 trace!("Waiting for subscription response");
810 let res = tokio::time::timeout(Duration::from_secs(30), ready_rx)
811 .await
812 .map_err(|_| {
813 DeltasError::TransportError(
814 "Subscribe confirmation timed out after 30s".to_string(),
815 )
816 })?
817 .map_err(|_| {
818 DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
819 })??;
820 trace!("Subscription successful");
821 Ok(res)
822 }
823
824 #[instrument(skip(self))]
825 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
826 self.ensure_connection().await?;
827 let (ready_tx, ready_rx) = oneshot::channel();
828 {
829 let mut guard = self.inner.lock().await;
830 let inner = guard
831 .as_mut()
832 .ok_or_else(|| DeltasError::NotConnected)?;
833
834 WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
835 }
836 tokio::time::timeout(Duration::from_secs(5), ready_rx)
837 .await
838 .map_err(|_| {
839 warn!(?subscription_id, "Unsubscribe confirmation timed out after 5s");
840 DeltasError::TransportError(
841 "Unsubscribe confirmation timed out after 5s".to_string(),
842 )
843 })?
844 .map_err(|_| {
845 DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
846 })?;
847
848 Ok(())
849 }
850
851 #[instrument(skip(self))]
852 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
853 if self.is_connected().await {
854 return Err(DeltasError::AlreadyConnected);
855 }
856 let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
857 info!(?ws_uri, "Starting TychoWebsocketClient");
858
859 let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
860 {
861 let mut guard = self.inner.as_ref().lock().await;
862 *guard = None;
863 }
864 let this = self.clone();
865 let jh = tokio::spawn(async move {
866 let mut retry_count = 0;
867 let mut result = Err(DeltasError::NotConnected);
868
869 'retry: while retry_count < this.max_reconnects {
870 info!(?ws_uri, retry_count, "Connecting to WebSocket server");
871 if retry_count > 0 {
872 sleep(this.retry_cooldown).await;
873 }
874
875 let mut request_builder = Request::builder()
877 .uri(&ws_uri)
878 .header(SEC_WEBSOCKET_KEY, generate_key())
879 .header(SEC_WEBSOCKET_VERSION, 13)
880 .header(CONNECTION, "Upgrade")
881 .header(UPGRADE, "websocket")
882 .header(
883 HOST,
884 this.uri.host().ok_or_else(|| {
885 DeltasError::UriParsing(
886 ws_uri.clone(),
887 "No host found in tycho url".to_string(),
888 )
889 })?,
890 )
891 .header(
892 USER_AGENT,
893 format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
894 );
895
896 if let Some(ref key) = this.auth_key {
898 request_builder = request_builder.header(AUTHORIZATION, key);
899 }
900
901 let request = request_builder.body(()).map_err(|e| {
902 DeltasError::TransportError(format!("Failed to build connection request: {e}"))
903 })?;
904 let (conn, _) = match connect_async(request).await {
905 Ok(conn) => conn,
906 Err(e) => {
907 retry_count += 1;
909 let mut guard = this.inner.as_ref().lock().await;
910 *guard = None;
911
912 if let tungstenite::Error::Http(response) = &e {
913 if response.status() == tungstenite::http::StatusCode::TOO_MANY_REQUESTS
914 {
915 let reason = response
916 .body()
917 .as_deref()
918 .and_then(|b| std::str::from_utf8(b).ok())
919 .unwrap_or("")
920 .to_string();
921 warn!(reason, "WebSocket connection rejected: rate limited");
922 continue 'retry;
923 }
924 }
925
926 warn!(
927 e = e.to_string(),
928 "Failed to connect to WebSocket server; Reconnecting"
929 );
930 continue 'retry;
931 }
932 };
933
934 let (ws_tx_new, ws_rx_new) = conn.split();
935 {
936 let mut guard = this.inner.as_ref().lock().await;
937 *guard =
938 Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
939 }
940 let mut msg_rx = ws_rx_new.boxed();
941
942 info!("Connection Successful: TychoWebsocketClient started");
943 this.conn_notify.notify_waiters();
944 result = Ok(());
945
946 const IDLE_TIMEOUT: Duration = Duration::from_secs(60);
951 loop {
952 let res = tokio::select! {
953 msg_result = tokio::time::timeout(IDLE_TIMEOUT, msg_rx.next()) => {
954 match msg_result {
955 Err(_elapsed) => {
956 warn!("No WS frame received for {IDLE_TIMEOUT:?}, \
957 treating connection as stalled; Reconnecting...");
958 retry_count += 1;
959 let mut guard = this.inner.as_ref().lock().await;
960 *guard = None;
961 break; }
963 Ok(Some(msg)) => this.handle_msg(msg).await,
964 Ok(None) => {
965 warn!("Websocket connection silently closed, giving up!");
969 break 'retry
970 }
971 }
972 },
973 _ = cmd_rx.recv() => {break 'retry},
974 };
975 if let Err(error) = res {
976 debug!(?error, "WsError");
977 if matches!(
978 error,
979 DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
980 ) {
981 retry_count += 1;
983 let mut guard = this.inner.as_ref().lock().await;
984 *guard = None;
985
986 warn!(
987 ?error,
988 ?retry_count,
989 "Connection dropped unexpectedly; Reconnecting..."
990 );
991 break;
992 } else {
993 error!(?error, "Fatal error; Exiting");
995 result = Err(error);
996 break 'retry;
997 }
998 }
999 }
1000 }
1001 debug!(
1002 retry_count,
1003 max_reconnects=?this.max_reconnects,
1004 "Reconnection loop ended"
1005 );
1006 let mut guard = this.inner.as_ref().lock().await;
1008 *guard = None;
1009
1010 if retry_count >= this.max_reconnects {
1012 error!("Max reconnection attempts reached; Exiting");
1013 this.dead.store(true, Ordering::SeqCst);
1014 this.conn_notify.notify_waiters(); result = Err(DeltasError::ConnectionClosed);
1016 }
1017
1018 result
1019 });
1020
1021 self.conn_notify.notified().await;
1022
1023 if self.is_connected().await {
1024 Ok(jh)
1025 } else {
1026 Err(DeltasError::NotConnected)
1027 }
1028 }
1029
1030 #[instrument(skip(self))]
1031 async fn close(&self) -> Result<(), DeltasError> {
1032 info!("Closing TychoWebsocketClient");
1033 let mut guard = self.inner.lock().await;
1034 let inner = guard
1035 .as_mut()
1036 .ok_or_else(|| DeltasError::NotConnected)?;
1037 inner
1038 .cmd_tx
1039 .send(())
1040 .await
1041 .map_err(|e| DeltasError::TransportError(e.to_string()))?;
1042 Ok(())
1043 }
1044}
1045
1046#[cfg(test)]
1047mod tests {
1048 use std::{net::SocketAddr, str::FromStr};
1049
1050 use tokio::{net::TcpListener, time::timeout};
1051 use tycho_common::models::Chain;
1052
1053 use super::*;
1054
1055 #[derive(Clone)]
1056 enum ExpectedComm {
1057 Receive(u64, tungstenite::protocol::Message),
1058 Send(tungstenite::protocol::Message),
1059 }
1060
1061 async fn mock_tycho_ws(
1062 messages: &[ExpectedComm],
1063 reconnects: usize,
1064 ) -> (SocketAddr, JoinHandle<()>) {
1065 info!("Starting mock webserver");
1066 let server = TcpListener::bind("127.0.0.1:0")
1068 .await
1069 .expect("localhost bind failed");
1070 let addr = server.local_addr().unwrap();
1071 let messages = messages.to_vec();
1072
1073 let jh = tokio::spawn(async move {
1074 info!("mock webserver started");
1075 for _ in 0..(reconnects + 1) {
1076 info!("Awaiting client connections");
1077 if let Ok((stream, _)) = server.accept().await {
1078 info!("Client connected");
1079 let mut websocket = tokio_tungstenite::accept_async(stream)
1080 .await
1081 .unwrap();
1082
1083 info!("Handling messages..");
1084 for c in messages.iter().cloned() {
1085 match c {
1086 ExpectedComm::Receive(t, exp) => {
1087 info!("Awaiting message...");
1088 let msg = timeout(Duration::from_millis(t), websocket.next())
1089 .await
1090 .expect("Receive timeout")
1091 .expect("Stream exhausted")
1092 .expect("Failed to receive message.");
1093 info!("Message received");
1094 assert_eq!(msg, exp)
1095 }
1096 ExpectedComm::Send(data) => {
1097 info!("Sending message");
1098 websocket
1099 .send(data)
1100 .await
1101 .expect("Failed to send message");
1102 info!("Message sent");
1103 }
1104 };
1105 }
1106 info!("Mock communication completed");
1107 sleep(Duration::from_millis(100)).await;
1108 let _ = websocket.close(None).await;
1110 info!("Mock server closed connection");
1111 }
1112 }
1113 info!("mock server ended");
1114 });
1115 (addr, jh)
1116 }
1117
1118 const SUBSCRIPTION_ID: &str = "30b740d1-cf09-4e0e-8cfe-b1434d447ece";
1119
1120 fn subscribe() -> String {
1121 subscribe_with_compression(false)
1122 }
1123
1124 fn subscribe_with_compression(compression: bool) -> String {
1125 serde_json::json!({
1126 "method": "subscribe",
1127 "extractor_id": {
1128 "chain": "ethereum",
1129 "name": "vm:ambient"
1130 },
1131 "include_state": true,
1132 "compression": compression,
1133 "partial_blocks": false
1134 })
1135 .to_string()
1136 }
1137
1138 fn subscription_confirmation() -> String {
1139 r#"
1140 {
1141 "method": "newsubscription",
1142 "extractor_id":{
1143 "chain": "ethereum",
1144 "name": "vm:ambient"
1145 },
1146 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1147 }
1148 "#
1149 .replace(|c: char| c.is_whitespace(), "")
1150 }
1151
1152 fn block_deltas() -> String {
1153 r#"
1154 {
1155 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1156 "deltas": {
1157 "extractor": "vm:ambient",
1158 "chain": "ethereum",
1159 "block": {
1160 "number": 123,
1161 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1162 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1163 "chain": "ethereum",
1164 "ts": "2023-09-14T00:00:00"
1165 },
1166 "finalized_block_height": 0,
1167 "revert": false,
1168 "new_tokens": {},
1169 "account_updates": {
1170 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1171 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1172 "chain": "ethereum",
1173 "slots": {},
1174 "balance": "0x01f4",
1175 "code": "",
1176 "change": "Update"
1177 }
1178 },
1179 "state_updates": {
1180 "component_1": {
1181 "component_id": "component_1",
1182 "updated_attributes": {"attr1": "0x01"},
1183 "deleted_attributes": ["attr2"]
1184 }
1185 },
1186 "new_protocol_components":
1187 { "protocol_1": {
1188 "id": "protocol_1",
1189 "protocol_system": "system_1",
1190 "protocol_type_name": "type_1",
1191 "chain": "ethereum",
1192 "tokens": ["0x01", "0x02"],
1193 "contract_ids": ["0x01", "0x02"],
1194 "static_attributes": {"attr1": "0x01f4"},
1195 "change": "Update",
1196 "creation_tx": "0x01",
1197 "created_at": "2023-09-14T00:00:00"
1198 }
1199 },
1200 "deleted_protocol_components": {},
1201 "component_balances": {
1202 "protocol_1":
1203 {
1204 "0x01": {
1205 "token": "0x01",
1206 "balance": "0x01f4",
1207 "balance_float": 0.0,
1208 "modify_tx": "0x01",
1209 "component_id": "protocol_1"
1210 }
1211 }
1212 },
1213 "account_balances": {
1214 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1215 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1216 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1217 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1218 "balance": "0x01f4",
1219 "modify_tx": "0x01"
1220 }
1221 }
1222 },
1223 "component_tvl": {
1224 "protocol_1": 1000.0
1225 },
1226 "dci_update": {
1227 "new_entrypoints": {},
1228 "new_entrypoint_params": {},
1229 "trace_results": {}
1230 }
1231 }
1232 }
1233 "#.replace(|c: char| c.is_whitespace(), "")
1234 }
1235
1236 fn unsubscribe() -> String {
1237 r#"
1238 {
1239 "method": "unsubscribe",
1240 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1241 }
1242 "#
1243 .replace(|c: char| c.is_whitespace(), "")
1244 }
1245
1246 fn subscription_ended() -> String {
1247 r#"
1248 {
1249 "method": "subscriptionended",
1250 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1251 }
1252 "#
1253 .replace(|c: char| c.is_whitespace(), "")
1254 }
1255
1256 #[tokio::test]
1257 async fn test_uncompressed_subscribe_receive() {
1258 let exp_comm = [
1259 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1260 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1261 ExpectedComm::Send(tungstenite::protocol::Message::Text(block_deltas())),
1262 ];
1263 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1264
1265 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1266 let jh = client
1267 .connect()
1268 .await
1269 .expect("connect failed");
1270 let (_, mut rx) = timeout(
1271 Duration::from_millis(100),
1272 client.subscribe(
1273 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1274 SubscriptionOptions::new().with_compression(false),
1275 ),
1276 )
1277 .await
1278 .expect("subscription timed out")
1279 .expect("subscription failed");
1280 let _ = timeout(Duration::from_millis(100), rx.recv())
1281 .await
1282 .expect("awaiting message timeout out")
1283 .expect("receiving message failed");
1284 timeout(Duration::from_millis(100), client.close())
1285 .await
1286 .expect("close timed out")
1287 .expect("close failed");
1288 jh.await
1289 .expect("ws loop errored")
1290 .unwrap();
1291 server_thread.await.unwrap();
1292 }
1293
1294 #[tokio::test]
1295 async fn test_compressed_subscribe_receive() {
1296 let compressed_block_deltas = zstd::encode_all(
1297 block_deltas().as_bytes(),
1298 0, )
1300 .expect("Failed to compress block deltas message");
1301
1302 let exp_comm = [
1303 ExpectedComm::Receive(
1304 100,
1305 tungstenite::protocol::Message::Text(subscribe_with_compression(true)),
1306 ),
1307 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1308 ExpectedComm::Send(tungstenite::protocol::Message::Binary(compressed_block_deltas)),
1309 ];
1310 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1311
1312 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1313 let jh = client
1314 .connect()
1315 .await
1316 .expect("connect failed");
1317 let (_, mut rx) = timeout(
1318 Duration::from_millis(100),
1319 client.subscribe(
1320 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1321 SubscriptionOptions::new().with_compression(true),
1322 ),
1323 )
1324 .await
1325 .expect("subscription timed out")
1326 .expect("subscription failed");
1327 let _ = timeout(Duration::from_millis(100), rx.recv())
1328 .await
1329 .expect("awaiting message timeout out")
1330 .expect("receiving message failed");
1331 timeout(Duration::from_millis(100), client.close())
1332 .await
1333 .expect("close timed out")
1334 .expect("close failed");
1335 jh.await
1336 .expect("ws loop errored")
1337 .unwrap();
1338 server_thread.await.unwrap();
1339 }
1340
1341 #[tokio::test]
1342 async fn test_unsubscribe() {
1343 let exp_comm = [
1344 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1345 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1346 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
1347 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
1348 ];
1349 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1350
1351 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1352 let jh = client
1353 .connect()
1354 .await
1355 .expect("connect failed");
1356 let (sub_id, mut rx) = timeout(
1357 Duration::from_millis(100),
1358 client.subscribe(
1359 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1360 SubscriptionOptions::new().with_compression(false),
1361 ),
1362 )
1363 .await
1364 .expect("subscription timed out")
1365 .expect("subscription failed");
1366
1367 timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1368 .await
1369 .expect("unsubscribe timed out")
1370 .expect("unsubscribe failed");
1371 let res = timeout(Duration::from_millis(100), rx.recv())
1372 .await
1373 .expect("awaiting message timeout out");
1374
1375 assert!(res.is_none());
1377
1378 timeout(Duration::from_millis(100), client.close())
1379 .await
1380 .expect("close timed out")
1381 .expect("close failed");
1382 jh.await
1383 .expect("ws loop errored")
1384 .unwrap();
1385 server_thread.await.unwrap();
1386 }
1387
1388 #[tokio::test]
1389 async fn test_subscription_unexpected_end() {
1390 let exp_comm = [
1391 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1392 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1393 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
1394 ];
1395 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1396
1397 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1398 let jh = client
1399 .connect()
1400 .await
1401 .expect("connect failed");
1402 let (_, mut rx) = timeout(
1403 Duration::from_millis(100),
1404 client.subscribe(
1405 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1406 SubscriptionOptions::new().with_compression(false),
1407 ),
1408 )
1409 .await
1410 .expect("subscription timed out")
1411 .expect("subscription failed");
1412 let res = timeout(Duration::from_millis(100), rx.recv())
1413 .await
1414 .expect("awaiting message timeout out");
1415
1416 assert!(res.is_none());
1418
1419 timeout(Duration::from_millis(100), client.close())
1420 .await
1421 .expect("close timed out")
1422 .expect("close failed");
1423 jh.await
1424 .expect("ws loop errored")
1425 .unwrap();
1426 server_thread.await.unwrap();
1427 }
1428
1429 #[test_log::test(tokio::test)]
1430 async fn test_reconnect() {
1431 let exp_comm = [
1432 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe()
1433 )),
1434 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1435 subscription_confirmation()
1436 )),
1437 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1438 {
1439 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1440 "deltas": {
1441 "extractor": "vm:ambient",
1442 "chain": "ethereum",
1443 "block": {
1444 "number": 123,
1445 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1446 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1447 "chain": "ethereum",
1448 "ts": "2023-09-14T00:00:00"
1449 },
1450 "finalized_block_height": 0,
1451 "revert": false,
1452 "new_tokens": {},
1453 "account_updates": {
1454 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1455 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1456 "chain": "ethereum",
1457 "slots": {},
1458 "balance": "0x01f4",
1459 "code": "",
1460 "change": "Update"
1461 }
1462 },
1463 "state_updates": {
1464 "component_1": {
1465 "component_id": "component_1",
1466 "updated_attributes": {"attr1": "0x01"},
1467 "deleted_attributes": ["attr2"]
1468 }
1469 },
1470 "new_protocol_components": {
1471 "protocol_1":
1472 {
1473 "id": "protocol_1",
1474 "protocol_system": "system_1",
1475 "protocol_type_name": "type_1",
1476 "chain": "ethereum",
1477 "tokens": ["0x01", "0x02"],
1478 "contract_ids": ["0x01", "0x02"],
1479 "static_attributes": {"attr1": "0x01f4"},
1480 "change": "Update",
1481 "creation_tx": "0x01",
1482 "created_at": "2023-09-14T00:00:00"
1483 }
1484 },
1485 "deleted_protocol_components": {},
1486 "component_balances": {
1487 "protocol_1": {
1488 "0x01": {
1489 "token": "0x01",
1490 "balance": "0x01f4",
1491 "balance_float": 1000.0,
1492 "modify_tx": "0x01",
1493 "component_id": "protocol_1"
1494 }
1495 }
1496 },
1497 "account_balances": {
1498 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1499 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1500 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1501 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1502 "balance": "0x01f4",
1503 "modify_tx": "0x01"
1504 }
1505 }
1506 },
1507 "component_tvl": {
1508 "protocol_1": 1000.0
1509 },
1510 "dci_update": {
1511 "new_entrypoints": {},
1512 "new_entrypoint_params": {},
1513 "trace_results": {}
1514 }
1515 }
1516 }
1517 "#.to_owned()
1518 ))
1519 ];
1520 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1521 let client = WsDeltasClient::new_with_reconnects(
1522 &format!("ws://{addr}"),
1523 None,
1524 3,
1525 Duration::from_millis(110),
1527 )
1528 .unwrap();
1529
1530 let jh: JoinHandle<Result<(), DeltasError>> = client
1531 .connect()
1532 .await
1533 .expect("connect failed");
1534
1535 for _ in 0..2 {
1536 dbg!("loop");
1537 let (_, mut rx) = timeout(
1538 Duration::from_millis(200),
1539 client.subscribe(
1540 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1541 SubscriptionOptions::new().with_compression(false),
1542 ),
1543 )
1544 .await
1545 .expect("subscription timed out")
1546 .expect("subscription failed");
1547
1548 let _ = timeout(Duration::from_millis(100), rx.recv())
1549 .await
1550 .expect("awaiting message timeout out")
1551 .expect("receiving message failed");
1552
1553 let res = timeout(Duration::from_millis(200), rx.recv())
1555 .await
1556 .expect("awaiting closed connection timeout out");
1557 assert!(res.is_none());
1558 }
1559 let res = jh.await.expect("ws client join failed");
1560 assert!(res.is_err());
1562 server_thread
1563 .await
1564 .expect("ws server loop errored");
1565 }
1566
1567 async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1568 let server = TcpListener::bind("127.0.0.1:0")
1569 .await
1570 .expect("localhost bind failed");
1571 let addr = server.local_addr().unwrap();
1572 let jh = tokio::spawn(async move {
1573 while let Ok((stream, _)) = server.accept().await {
1574 if accept_first {
1575 let stream = tokio_tungstenite::accept_async(stream)
1577 .await
1578 .unwrap();
1579 sleep(Duration::from_millis(10)).await;
1580 drop(stream)
1581 } else {
1582 drop(stream);
1584 }
1585 }
1586 });
1587 (addr, jh)
1588 }
1589
1590 #[test_log::test(tokio::test)]
1591 async fn test_subscribe_dead_client_after_max_attempts() {
1592 let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1593 let client = WsDeltasClient::new_with_reconnects(
1594 &format!("ws://{addr}"),
1595 None,
1596 3,
1597 Duration::from_secs(0),
1598 )
1599 .unwrap();
1600
1601 let join_handle = client.connect().await.unwrap();
1602 let handle_res = join_handle.await.unwrap();
1603 assert!(handle_res.is_err());
1604 assert!(!client.is_connected().await);
1605
1606 let subscription_res = timeout(
1607 Duration::from_millis(10),
1608 client.subscribe(
1609 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1610 SubscriptionOptions::new(),
1611 ),
1612 )
1613 .await
1614 .unwrap();
1615 assert!(subscription_res.is_err());
1616 }
1617
1618 #[test_log::test(tokio::test)]
1619 async fn test_ws_client_retry_cooldown() {
1620 let start = std::time::Instant::now();
1621 let (addr, _) = mock_bad_connection_tycho_ws(false).await;
1622
1623 let client = WsDeltasClient::new_with_reconnects(
1625 &format!("ws://{addr}"),
1626 None,
1627 3, Duration::from_millis(50), )
1630 .unwrap();
1631
1632 let connect_result = client.connect().await;
1634 let elapsed = start.elapsed();
1635
1636 assert!(connect_result.is_err(), "Expected connection to fail after retries");
1638
1639 assert!(
1641 elapsed >= Duration::from_millis(100),
1642 "Expected at least 100ms elapsed, got {:?}",
1643 elapsed
1644 );
1645
1646 assert!(elapsed < Duration::from_millis(500), "Took too long: {:?}", elapsed);
1648 }
1649
1650 #[test_log::test(tokio::test)]
1651 async fn test_buffer_full_triggers_unsubscribe() {
1652 let exp_comm = {
1654 [
1655 ExpectedComm::Receive(
1657 100,
1658 tungstenite::protocol::Message::Text(
1659 subscribe(),
1660 ),
1661 ),
1662 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1664 subscription_confirmation(),
1665 )),
1666 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1668 r#"
1669 {
1670 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1671 "deltas": {
1672 "extractor": "vm:ambient",
1673 "chain": "ethereum",
1674 "block": {
1675 "number": 123,
1676 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1677 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1678 "chain": "ethereum",
1679 "ts": "2023-09-14T00:00:00"
1680 },
1681 "finalized_block_height": 0,
1682 "revert": false,
1683 "new_tokens": {},
1684 "account_updates": {},
1685 "state_updates": {},
1686 "new_protocol_components": {},
1687 "deleted_protocol_components": {},
1688 "component_balances": {},
1689 "account_balances": {},
1690 "component_tvl": {},
1691 "dci_update": {
1692 "new_entrypoints": {},
1693 "new_entrypoint_params": {},
1694 "trace_results": {}
1695 }
1696 }
1697 }
1698 "#.to_owned()
1699 )),
1700 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1702 r#"
1703 {
1704 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1705 "deltas": {
1706 "extractor": "vm:ambient",
1707 "chain": "ethereum",
1708 "block": {
1709 "number": 124,
1710 "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
1711 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1712 "chain": "ethereum",
1713 "ts": "2023-09-14T00:00:01"
1714 },
1715 "finalized_block_height": 0,
1716 "revert": false,
1717 "new_tokens": {},
1718 "account_updates": {},
1719 "state_updates": {},
1720 "new_protocol_components": {},
1721 "deleted_protocol_components": {},
1722 "component_balances": {},
1723 "account_balances": {},
1724 "component_tvl": {},
1725 "dci_update": {
1726 "new_entrypoints": {},
1727 "new_entrypoint_params": {},
1728 "trace_results": {}
1729 }
1730 }
1731 }
1732 "#.to_owned()
1733 )),
1734 ExpectedComm::Receive(
1736 100,
1737 tungstenite::protocol::Message::Text(
1738 unsubscribe(),
1739 ),
1740 ),
1741 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1743 subscription_ended(),
1744 )),
1745 ]
1746 };
1747
1748 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1749
1750 let client = WsDeltasClient::new_with_custom_buffers(
1752 &format!("ws://{addr}"),
1753 None,
1754 128, 1, )
1757 .unwrap();
1758
1759 let jh = client
1760 .connect()
1761 .await
1762 .expect("connect failed");
1763
1764 let (_sub_id, mut rx) = timeout(
1765 Duration::from_millis(100),
1766 client.subscribe(
1767 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1768 SubscriptionOptions::new().with_compression(false),
1769 ),
1770 )
1771 .await
1772 .expect("subscription timed out")
1773 .expect("subscription failed");
1774
1775 tokio::time::sleep(Duration::from_millis(100)).await;
1777
1778 let mut received_msgs = Vec::new();
1780
1781 while received_msgs.len() < 3 {
1783 match timeout(Duration::from_millis(200), rx.recv()).await {
1784 Ok(Some(msg)) => {
1785 received_msgs.push(msg);
1786 }
1787 Ok(None) => {
1788 break;
1790 }
1791 Err(_) => {
1792 break;
1794 }
1795 }
1796 }
1797
1798 assert!(
1800 received_msgs.len() <= 1,
1801 "Expected buffer overflow to limit messages to at most 1, got {}",
1802 received_msgs.len()
1803 );
1804
1805 if let Some(first_msg) = received_msgs.first() {
1806 assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
1807 }
1808
1809 drop(rx); tokio::time::sleep(Duration::from_millis(50)).await;
1816
1817 jh.abort();
1819 server_thread.abort();
1820
1821 let _ = jh.await;
1822 let _ = server_thread.await;
1823 }
1824
1825 #[tokio::test]
1826 async fn test_server_error_handling() {
1827 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1828
1829 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1830
1831 let error_response = WebSocketMessage::Response(Response::Error(
1833 WebsocketError::ExtractorNotFound(extractor_id.clone().into()),
1834 ));
1835 let error_json = serde_json::to_string(&error_response).unwrap();
1836
1837 let exp_comm = [
1838 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1839 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1840 ];
1841
1842 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1843
1844 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1845 let jh = client
1846 .connect()
1847 .await
1848 .expect("connect failed");
1849
1850 let result = timeout(
1851 Duration::from_millis(100),
1852 client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1853 )
1854 .await
1855 .expect("subscription timed out");
1856
1857 assert!(result.is_err());
1859 if let Err(DeltasError::ServerError(msg)) = result {
1860 assert!(msg.contains("Subscription failed"));
1861 assert!(msg.contains("Extractor not found"));
1862 } else {
1863 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1864 }
1865
1866 timeout(Duration::from_millis(100), client.close())
1867 .await
1868 .expect("close timed out")
1869 .expect("close failed");
1870 jh.await
1871 .expect("ws loop errored")
1872 .unwrap();
1873 server_thread.await.unwrap();
1874 }
1875
1876 #[test_log::test(tokio::test)]
1877 async fn test_subscription_not_found_error() {
1878 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1880
1881 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1882 let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
1883
1884 let error_response = WebSocketMessage::Response(Response::Error(
1885 WebsocketError::SubscriptionNotFound(subscription_id),
1886 ));
1887 let error_json = serde_json::to_string(&error_response).unwrap();
1888
1889 let exp_comm = [
1890 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1892 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1893 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
1895 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1897 ];
1898
1899 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1900
1901 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1902 let jh = client
1903 .connect()
1904 .await
1905 .expect("connect failed");
1906
1907 let (received_sub_id, _rx) = timeout(
1909 Duration::from_millis(100),
1910 client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1911 )
1912 .await
1913 .expect("subscription timed out")
1914 .expect("subscription failed");
1915
1916 assert_eq!(received_sub_id, subscription_id);
1917
1918 let unsubscribe_result =
1920 timeout(Duration::from_millis(100), client.unsubscribe(subscription_id))
1921 .await
1922 .expect("unsubscribe timed out");
1923
1924 unsubscribe_result
1928 .expect("Unsubscribe should succeed even if server says subscription not found");
1929
1930 timeout(Duration::from_millis(100), client.close())
1931 .await
1932 .expect("close timed out")
1933 .expect("close failed");
1934 jh.await
1935 .expect("ws loop errored")
1936 .unwrap();
1937 server_thread.await.unwrap();
1938 }
1939
1940 #[test_log::test(tokio::test)]
1941 async fn test_parse_error_handling() {
1942 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1943
1944 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1945 let error_response = WebSocketMessage::Response(Response::Error(
1946 WebsocketError::ParseError("}2sdf".to_string(), "malformed JSON".to_string()),
1947 ));
1948 let error_json = serde_json::to_string(&error_response).unwrap();
1949
1950 let exp_comm = [
1951 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1953 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1954 ];
1955
1956 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1957
1958 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1959 let jh = client
1960 .connect()
1961 .await
1962 .expect("connect failed");
1963
1964 let _ = timeout(
1966 Duration::from_millis(100),
1967 client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1968 )
1969 .await
1970 .expect("subscription timed out");
1971
1972 let result = jh
1974 .await
1975 .expect("ws loop should complete");
1976 assert!(result.is_err());
1977 if let Err(DeltasError::ServerError(message)) = result {
1978 assert!(message.contains("Server failed to parse client message"));
1979 } else {
1980 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1981 }
1982
1983 server_thread.await.unwrap();
1984 }
1985
1986 #[test_log::test(tokio::test)]
1987 async fn test_compression_error_handling() {
1988 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1989
1990 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1991 let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
1992 let error_response = WebSocketMessage::Response(Response::Error(
1993 WebsocketError::CompressionError(subscription_id, "Compression failed".to_string()),
1994 ));
1995 let error_json = serde_json::to_string(&error_response).unwrap();
1996
1997 let exp_comm = [
1998 ExpectedComm::Receive(
2000 100,
2001 tungstenite::protocol::Message::Text(subscribe_with_compression(true)),
2002 ),
2003 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2004 ];
2005
2006 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2007
2008 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2009 let jh = client
2010 .connect()
2011 .await
2012 .expect("connect failed");
2013
2014 let _ = timeout(
2016 Duration::from_millis(100),
2017 client.subscribe(extractor_id, SubscriptionOptions::new()),
2018 )
2019 .await
2020 .expect("subscription timed out");
2021
2022 let result = jh
2024 .await
2025 .expect("ws loop should complete");
2026 assert!(result.is_err());
2027 if let Err(DeltasError::ServerError(message)) = result {
2028 assert!(message.contains("Server failed to compress message for subscription"));
2029 } else {
2030 panic!("Expected DeltasError::ServerError, got: {:?}", result);
2031 }
2032
2033 server_thread.await.unwrap();
2034 }
2035
2036 #[tokio::test]
2037 async fn test_subscribe_error_handling() {
2038 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
2039
2040 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
2041
2042 let error_response = WebSocketMessage::Response(Response::Error(
2043 WebsocketError::SubscribeError(extractor_id.clone().into()),
2044 ));
2045 let error_json = serde_json::to_string(&error_response).unwrap();
2046
2047 let exp_comm = [
2048 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
2049 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2050 ];
2051
2052 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2053
2054 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2055 let jh = client
2056 .connect()
2057 .await
2058 .expect("connect failed");
2059
2060 let result = timeout(
2061 Duration::from_millis(100),
2062 client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
2063 )
2064 .await
2065 .expect("subscription timed out");
2066
2067 assert!(result.is_err());
2069 if let Err(DeltasError::ServerError(msg)) = result {
2070 assert!(msg.contains("Subscription failed"));
2071 assert!(msg.contains("Failed to subscribe to extractor"));
2072 } else {
2073 panic!("Expected DeltasError::ServerError, got: {:?}", result);
2074 }
2075
2076 timeout(Duration::from_millis(100), client.close())
2077 .await
2078 .expect("close timed out")
2079 .expect("close failed");
2080 jh.await
2081 .expect("ws loop errored")
2082 .unwrap();
2083 server_thread.await.unwrap();
2084 }
2085
2086 #[tokio::test]
2087 async fn test_cancel_pending_subscription() {
2088 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
2090
2091 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
2092
2093 let error_response = WebSocketMessage::Response(Response::Error(
2094 WebsocketError::ExtractorNotFound(extractor_id.clone().into()),
2095 ));
2096 let error_json = serde_json::to_string(&error_response).unwrap();
2097
2098 let exp_comm = [
2099 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
2100 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2101 ];
2102
2103 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2104
2105 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2106 let jh = client
2107 .connect()
2108 .await
2109 .expect("connect failed");
2110
2111 let client_clone = client.clone();
2113 let extractor_id_clone = extractor_id.clone();
2114
2115 let subscription1 = tokio::spawn({
2116 let client_for_spawn = client.clone();
2117 async move {
2118 client_for_spawn
2119 .subscribe(extractor_id, SubscriptionOptions::new().with_compression(false))
2120 .await
2121 }
2122 });
2123
2124 let subscription2 = tokio::spawn(async move {
2125 client_clone
2127 .subscribe(extractor_id_clone, SubscriptionOptions::new())
2128 .await
2129 });
2130
2131 let (result1, result2) = tokio::join!(subscription1, subscription2);
2132
2133 let result1 = result1.unwrap();
2134 let result2 = result2.unwrap();
2135
2136 assert!(result1.is_err() || result2.is_err());
2139
2140 if let Err(DeltasError::SubscriptionAlreadyPending) = result2 {
2141 } else if let Err(DeltasError::ServerError(_)) = result1 {
2143 } else {
2145 panic!("Expected one SubscriptionAlreadyPending and one ServerError");
2146 }
2147
2148 timeout(Duration::from_millis(100), client.close())
2149 .await
2150 .expect("close timed out")
2151 .expect("close failed");
2152 jh.await
2153 .expect("ws loop errored")
2154 .unwrap();
2155 server_thread.await.unwrap();
2156 }
2157
2158 #[tokio::test]
2159 async fn test_force_unsubscribe_prevents_multiple_calls() {
2160 let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
2164
2165 let exp_comm = [
2166 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
2167 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
2168 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
2170 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
2171 ];
2172
2173 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2174
2175 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2176 let jh = client
2177 .connect()
2178 .await
2179 .expect("connect failed");
2180
2181 let (received_sub_id, _rx) = timeout(
2182 Duration::from_millis(100),
2183 client.subscribe(
2184 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
2185 SubscriptionOptions::new().with_compression(false),
2186 ),
2187 )
2188 .await
2189 .expect("subscription timed out")
2190 .expect("subscription failed");
2191
2192 assert_eq!(received_sub_id, subscription_id);
2193
2194 {
2196 let mut inner_guard = client.inner.lock().await;
2197 let inner = inner_guard
2198 .as_mut()
2199 .expect("client should be connected");
2200
2201 WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2203 WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2204 }
2205
2206 tokio::time::sleep(Duration::from_millis(50)).await;
2208
2209 let _ = timeout(Duration::from_millis(100), client.close()).await;
2211
2212 let _ = jh.await;
2214 let _ = server_thread.await;
2215 }
2216}