1use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::{
25 atomic::{AtomicBool, Ordering},
26 Arc,
27 },
28 time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34 header::{
35 AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36 USER_AGENT,
37 },
38 Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44 net::TcpStream,
45 sync::{
46 mpsc::{self, error::TrySendError, Receiver, Sender},
47 oneshot, Mutex, MutexGuard, Notify,
48 },
49 task::JoinHandle,
50 time::sleep,
51};
52use tokio_tungstenite::{
53 connect_async,
54 tungstenite::{
55 self,
56 handshake::client::{generate_key, Request},
57 },
58 MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{
62 BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage, WebsocketError,
63};
64use uuid::Uuid;
65use zstd;
66
67use crate::TYCHO_SERVER_VERSION;
68
/// Errors reported by the deltas client.
#[derive(Error, Debug)]
pub enum DeltasError {
    /// The server URI is unusable. Carries the offending URI string and a
    /// description of the problem (a parse error, or a missing host).
    #[error("Failed to parse URI: {0}. Error: {1}")]
    UriParsing(String, String),

    /// A subscription request for the same extractor is already awaiting the
    /// server's answer.
    #[error("The requested subscription is already pending")]
    SubscriptionAlreadyPending,

    /// The server answered with an error. Carries a human readable message and
    /// the original server error as the source.
    #[error("The server replied with an error: {0}")]
    ServerError(String, #[source] WebsocketError),

    /// Generic transport-level failure, e.g. a message could not be serialized
    /// or written to the websocket, or an internal channel was closed.
    #[error("{0}")]
    TransportError(String),

    /// The bounded channel of a subscription had no capacity left for a new
    /// message.
    #[error("The buffer is full!")]
    BufferFull,

    /// The operation needs a live connection but none is available.
    #[error("The client is not connected!")]
    NotConnected,

    /// `connect` was called while a connection is already established.
    #[error("The client is already connected!")]
    AlreadyConnected,

    /// The connection was closed by the server.
    #[error("The server closed the connection!")]
    ConnectionClosed,

    /// A websocket level error; the message loop treats it as a reason to
    /// reconnect.
    #[error("Connection error: {0}")]
    ConnectionError(#[from] Box<tungstenite::Error>),

    /// An unrecoverable error; the message loop exits when it encounters one.
    #[error("Tycho FatalError: {0}")]
    Fatal(String),
}
115
/// Flags that accompany a subscribe command.
///
/// Start from [`SubscriptionOptions::new`] (or `Default::default()`) and chain
/// the `with_*` methods to override individual flags. The defaults are:
/// state included, compression enabled, partial blocks disabled.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Forwarded as `include_state` in the subscribe command.
    include_state: bool,
    /// Forwarded as `compression` in the subscribe command.
    compression: bool,
    /// Forwarded as `partial_blocks` in the subscribe command.
    partial_blocks: bool,
}

impl Default for SubscriptionOptions {
    fn default() -> Self {
        Self {
            include_state: true,
            compression: true,
            partial_blocks: false,
        }
    }
}

impl SubscriptionOptions {
    /// Returns the default set of options.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the options with `include_state` replaced by `val`.
    pub fn with_state(self, val: bool) -> Self {
        Self { include_state: val, ..self }
    }

    /// Returns the options with `compression` replaced by `val`.
    pub fn with_compression(self, val: bool) -> Self {
        Self { compression: val, ..self }
    }

    /// Returns the options with `partial_blocks` replaced by `val`.
    pub fn with_partial_blocks(self, val: bool) -> Self {
        Self { partial_blocks: val, ..self }
    }
}
146
/// Client interface for subscribing to streams of [`BlockChanges`].
///
/// A mock implementation is generated for tests through `automock`.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait DeltasClient {
    /// Subscribes to the extractor identified by `extractor_id`.
    ///
    /// On success returns the id of the new subscription together with the
    /// receiving end of the channel on which its [`BlockChanges`] arrive.
    async fn subscribe(
        &self,
        extractor_id: ExtractorIdentity,
        options: SubscriptionOptions,
    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;

    /// Ends the subscription identified by `subscription_id`.
    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;

    /// Establishes the connection.
    ///
    /// On success returns a [`JoinHandle`] whose task resolves to a `Result`
    /// once it finishes. In the websocket implementation in this file that task
    /// is the background message loop.
    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;

    /// Closes the connection.
    async fn close(&self) -> Result<(), DeltasError>;
}
171
/// Websocket based implementation of [`DeltasClient`].
///
/// Cloning is cheap with respect to shared state: the connection state, the
/// notifier and the `dead` flag are behind `Arc`s and therefore shared between
/// clones.
#[derive(Clone)]
pub struct WsDeltasClient {
    /// Base URI of the server; the versioned `/ws` path is appended on connect.
    uri: Uri,
    /// Optional value sent in the `Authorization` header of the handshake.
    auth_key: Option<String>,
    /// Upper bound for the retry counter of the connection loop.
    max_reconnects: u64,
    /// Pause inserted before every connection attempt after the first one.
    retry_cooldown: Duration,
    /// Capacity of the internal command channel created in `connect`.
    ws_buffer_size: usize,
    /// Capacity of the channel created for each subscription.
    subscription_buffer_size: usize,
    /// Notified when a connection has been established and when the client is
    /// marked as dead.
    conn_notify: Arc<Notify>,
    /// Connection state; `Some` while a websocket connection is established.
    inner: Arc<Mutex<Option<Inner>>>,
    /// Set once the maximum number of reconnection attempts has been reached.
    dead: Arc<AtomicBool>,
}
195
/// Write half of the (optionally TLS wrapped) websocket stream, obtained by
/// splitting the connection.
type WebSocketSink =
    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
198
/// Lifecycle state of a subscription as tracked by the client.
#[derive(Debug)]
enum SubscriptionInfo {
    /// A subscribe command was sent; the sender is used to hand the result
    /// (subscription id and receiver, or an error) back to the caller.
    RequestedSubscription(oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>),
    /// The server confirmed the subscription.
    Active,
    /// An unsubscribe was requested; the sender is signalled once the
    /// subscription has been removed.
    RequestedUnsubscription(oneshot::Sender<()>),
}
219
/// State that belongs to one established websocket connection.
struct Inner {
    /// Write half of the websocket.
    sink: WebSocketSink,
    /// Sender of the command channel; `close` sends on it to stop the message
    /// loop.
    cmd_tx: Sender<()>,
    /// Subscriptions that were requested but not yet confirmed, keyed by
    /// extractor.
    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
    /// State of confirmed subscriptions, keyed by subscription id.
    subscriptions: HashMap<Uuid, SubscriptionInfo>,
    /// Channel used to forward messages of each confirmed subscription.
    sender: HashMap<Uuid, Sender<BlockChanges>>,
    /// Capacity used when creating a subscription's channel.
    buffer_size: usize,
}
236
237impl Inner {
241 fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
242 Self {
243 sink,
244 cmd_tx,
245 pending: HashMap::new(),
246 subscriptions: HashMap::new(),
247 sender: HashMap::new(),
248 buffer_size,
249 }
250 }
251
252 #[allow(clippy::result_large_err)]
254 fn new_subscription(
255 &mut self,
256 id: &ExtractorIdentity,
257 ready_tx: oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>,
258 ) -> Result<(), DeltasError> {
259 if self.pending.contains_key(id) {
260 return Err(DeltasError::SubscriptionAlreadyPending);
261 }
262 self.pending
263 .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
264 Ok(())
265 }
266
267 fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
271 if let Some(info) = self.pending.remove(extractor_id) {
272 if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
273 let (tx, rx) = mpsc::channel(self.buffer_size);
274 self.sender.insert(subscription_id, tx);
275 self.subscriptions
276 .insert(subscription_id, SubscriptionInfo::Active);
277 let _ = ready_tx
278 .send(Ok((subscription_id, rx)))
279 .map_err(|_| {
280 warn!(
281 ?extractor_id,
282 ?subscription_id,
283 "Subscriber for has gone away. Ignoring."
284 )
285 });
286 } else {
287 error!(
288 ?extractor_id,
289 ?subscription_id,
290 "Pending subscription was not in the correct state to
291 transition to active. Ignoring!"
292 )
293 }
294 } else {
295 error!(
296 ?extractor_id,
297 ?subscription_id,
298 "Tried to mark an unknown subscription as active. Ignoring!"
299 );
300 }
301 }
302
303 #[allow(clippy::result_large_err)]
305 fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
306 if let Some(sender) = self.sender.get_mut(id) {
307 sender
308 .try_send(msg)
309 .map_err(|e| match e {
310 TrySendError::Full(_) => DeltasError::BufferFull,
311 TrySendError::Closed(_) => {
312 DeltasError::TransportError("The subscriber has gone away".to_string())
313 }
314 })?;
315 }
316 Ok(())
317 }
318
319 fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
324 if let Some(info) = self
325 .subscriptions
326 .get_mut(subscription_id)
327 {
328 if let SubscriptionInfo::Active = info {
329 *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
330 }
331 } else {
332 debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
334 }
335 }
336
337 fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
344 if let Entry::Occupied(e) = self
345 .subscriptions
346 .entry(subscription_id)
347 {
348 let info = e.remove();
349 if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
350 let _ = tx.send(()).map_err(|_| {
351 debug!(?subscription_id, "failed to notify about removed subscription")
352 });
353 self.sender
354 .remove(&subscription_id)
355 .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
356 } else {
357 warn!(?subscription_id, "Subscription ended unexpectedly!");
358 self.sender
359 .remove(&subscription_id)
360 .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
361 }
362 } else {
363 trace!(
368 ?subscription_id,
369 "Received `SubscriptionEnded`, but was never subscribed to it. This is likely a bug!"
370 );
371 }
372
373 Ok(())
374 }
375
376 fn cancel_pending(&mut self, extractor_id: &ExtractorIdentity, error: &WebsocketError) {
377 if let Some(sub_info) = self.pending.remove(extractor_id) {
378 match sub_info {
379 SubscriptionInfo::RequestedSubscription(tx) => {
380 let _ = tx
381 .send(Err(DeltasError::ServerError(
382 format!("Subscription failed: {error}"),
383 error.clone(),
384 )))
385 .map_err(|_| debug!("Cancel pending failed: receiver deallocated!"));
386 }
387 _ => {
388 error!(?extractor_id, "Pending subscription in wrong state")
389 }
390 }
391 } else {
392 debug!(?extractor_id, "Tried cancel on non-existent pending subscription!")
393 }
394 }
395
396 async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
398 self.sink.send(msg).await.map_err(|e| {
399 DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
400 })
401 }
402}
403
404impl WsDeltasClient {
406 #[allow(clippy::result_large_err)]
408 pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
409 let uri = ws_uri
410 .parse::<Uri>()
411 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
412 Ok(Self {
413 uri,
414 auth_key: auth_key.map(|s| s.to_string()),
415 inner: Arc::new(Mutex::new(None)),
416 ws_buffer_size: 128,
417 subscription_buffer_size: 128,
418 conn_notify: Arc::new(Notify::new()),
419 max_reconnects: 5,
420 retry_cooldown: Duration::from_millis(500),
421 dead: Arc::new(AtomicBool::new(false)),
422 })
423 }
424
425 #[allow(clippy::result_large_err)]
427 pub fn new_with_reconnects(
428 ws_uri: &str,
429 auth_key: Option<&str>,
430 max_reconnects: u64,
431 retry_cooldown: Duration,
432 ) -> Result<Self, DeltasError> {
433 let uri = ws_uri
434 .parse::<Uri>()
435 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
436
437 Ok(Self {
438 uri,
439 auth_key: auth_key.map(|s| s.to_string()),
440 inner: Arc::new(Mutex::new(None)),
441 ws_buffer_size: 128,
442 subscription_buffer_size: 128,
443 conn_notify: Arc::new(Notify::new()),
444 max_reconnects,
445 retry_cooldown,
446 dead: Arc::new(AtomicBool::new(false)),
447 })
448 }
449
450 #[cfg(test)]
452 #[allow(clippy::result_large_err)]
453 pub fn new_with_custom_buffers(
454 ws_uri: &str,
455 auth_key: Option<&str>,
456 ws_buffer_size: usize,
457 subscription_buffer_size: usize,
458 ) -> Result<Self, DeltasError> {
459 let uri = ws_uri
460 .parse::<Uri>()
461 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
462 Ok(Self {
463 uri,
464 auth_key: auth_key.map(|s| s.to_string()),
465 inner: Arc::new(Mutex::new(None)),
466 ws_buffer_size,
467 subscription_buffer_size,
468 conn_notify: Arc::new(Notify::new()),
469 max_reconnects: 5,
470 retry_cooldown: Duration::from_millis(0),
471 dead: Arc::new(AtomicBool::new(false)),
472 })
473 }
474
475 async fn is_connected(&self) -> bool {
479 let guard = self.inner.as_ref().lock().await;
480 guard.is_some()
481 }
482
483 async fn ensure_connection(&self) -> Result<(), DeltasError> {
488 if self.dead.load(Ordering::SeqCst) {
489 return Err(DeltasError::NotConnected)
490 };
491 if !self.is_connected().await {
492 self.conn_notify.notified().await;
493 };
494 Ok(())
495 }
496
497 #[instrument(skip(self, msg))]
502 async fn handle_msg(
503 &self,
504 msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
505 ) -> Result<(), DeltasError> {
506 let mut guard = self.inner.lock().await;
507
508 match msg {
509 Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
514 serde_json::Value,
515 >(&text)
516 {
517 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
518 Ok(ws_message) => match ws_message {
519 WebSocketMessage::BlockChanges { subscription_id, deltas } => {
520 Self::handle_block_changes_msg(&mut guard, subscription_id, deltas).await?;
521 }
522 WebSocketMessage::Response(Response::NewSubscription {
523 extractor_id,
524 subscription_id,
525 }) => {
526 info!(?extractor_id, ?subscription_id, "Received a new subscription");
527 let inner = guard
528 .as_mut()
529 .ok_or_else(|| DeltasError::NotConnected)?;
530 inner.mark_active(&extractor_id, subscription_id);
531 }
532 WebSocketMessage::Response(Response::SubscriptionEnded {
533 subscription_id,
534 }) => {
535 info!(?subscription_id, "Received a subscription ended");
536 let inner = guard
537 .as_mut()
538 .ok_or_else(|| DeltasError::NotConnected)?;
539 inner.remove_subscription(subscription_id)?;
540 }
541 WebSocketMessage::Response(Response::Error(error)) => match &error {
542 WebsocketError::ExtractorNotFound(extractor_id) => {
543 let inner = guard
544 .as_mut()
545 .ok_or_else(|| DeltasError::NotConnected)?;
546 inner.cancel_pending(extractor_id, &error);
547 }
548 WebsocketError::SubscriptionNotFound(subscription_id) => {
549 debug!("Received subscription not found, removing subscription");
550 let inner = guard
551 .as_mut()
552 .ok_or_else(|| DeltasError::NotConnected)?;
553 inner.remove_subscription(*subscription_id)?;
554 }
555 WebsocketError::ParseError(raw, e) => {
556 return Err(DeltasError::ServerError(
557 format!(
558 "Server failed to parse client message: {e}, msg: {raw}"
559 ),
560 error.clone(),
561 ))
562 }
563 WebsocketError::CompressionError(subscription_id, e) => {
564 return Err(DeltasError::ServerError(
565 format!(
566 "Server failed to compress message for subscription: {subscription_id}, error: {e}"
567 ),
568 error.clone(),
569 ))
570 }
571 WebsocketError::SubscribeError(extractor_id) => {
572 let inner = guard
573 .as_mut()
574 .ok_or_else(|| DeltasError::NotConnected)?;
575 inner.cancel_pending(extractor_id, &error);
576 }
577 },
578 },
579 Err(e) => {
580 error!(
581 "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
582 e, text
583 );
584 }
585 },
586 Err(e) => {
587 error!(
588 "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
589 e, text
590 );
591 }
592 },
593 Ok(tungstenite::protocol::Message::Binary(data)) => {
594 match zstd::decode_all(data.as_slice()) {
597 Ok(decompressed) => match serde_json::from_slice::<serde_json::Value>(decompressed.as_slice()) {
598 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value.clone()) {
599 Ok(ws_message) => match ws_message {
600 WebSocketMessage::BlockChanges { subscription_id, deltas } => {
601 Self::handle_block_changes_msg(&mut guard, subscription_id, deltas).await?;
602 }
603 _ => {
604 error!(
605 "Received unsupported compressed WebSocketMessage variant. \nMessage: {ws_message:?}",
606 );
607 }
608
609 },
610 Err(e) => {
611 error!(
612 "Failed to deserialize compressed WebSocketMessage: {e}. \nMessage: {value:?}",
613 );
614 }
615 },
616 Err(e) => {
617 error!(
618 "Failed to deserialize compressed message: invalid JSON. {e}",
619 );
620 }
621 },
622 Err(e) => {
623 error!("Failed to decompress zstd data: {}", e);
624 }
625 }
626 },
627 Ok(tungstenite::protocol::Message::Ping(_)) => {
628 let inner = guard
630 .as_mut()
631 .ok_or_else(|| DeltasError::NotConnected)?;
632 if let Err(error) = inner
633 .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
634 .await
635 {
636 debug!(?error, "Failed to send pong!");
637 }
638 }
639 Ok(tungstenite::protocol::Message::Pong(_)) => {
640 }
642 Ok(tungstenite::protocol::Message::Close(_)) => {
643 return Err(DeltasError::ConnectionClosed);
644 }
645 Ok(unknown_msg) => {
646 info!("Received an unknown message type: {:?}", unknown_msg);
647 }
648 Err(error) => {
649 error!(?error, "Websocket error");
650 return Err(match error {
651 tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
652 tungstenite::Error::AlreadyClosed => {
653 warn!("Received AlreadyClosed error which is indicative of a bug!");
654 DeltasError::ConnectionError(Box::new(error))
655 }
656 tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
657 DeltasError::ConnectionError(Box::new(error))
658 }
659 _ => DeltasError::Fatal(error.to_string()),
660 });
661 }
662 };
663 Ok(())
664 }
665
666 async fn handle_block_changes_msg(
667 guard: &mut MutexGuard<'_, Option<Inner>>,
668 subscription_id: Uuid,
669 deltas: BlockChanges,
670 ) -> Result<(), DeltasError> {
671 trace!(?deltas, "Received a block state change, sending to channel");
672 let inner = guard
673 .as_mut()
674 .ok_or_else(|| DeltasError::NotConnected)?;
675 match inner.send(&subscription_id, deltas) {
676 Err(DeltasError::BufferFull) => {
677 error!(?subscription_id, "Buffer full, unsubscribing!");
678 Self::force_unsubscribe(subscription_id, inner).await;
679 }
680 Err(_) => {
681 warn!(?subscription_id, "Receiver for has gone away, unsubscribing!");
682 Self::force_unsubscribe(subscription_id, inner).await;
683 }
684 _ => { }
685 }
686 Ok(())
687 }
688
689 async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
694 if let Some(SubscriptionInfo::RequestedUnsubscription(_)) = inner
696 .subscriptions
697 .get(&subscription_id)
698 {
699 return
700 }
701
702 let (tx, rx) = oneshot::channel();
703 if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
704 warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
705 } else {
706 match tokio::time::timeout(Duration::from_secs(5), rx).await {
708 Ok(_) => {
709 debug!(?subscription_id, "Unsubscribe completed successfully");
710 }
711 Err(_) => {
712 warn!(?subscription_id, "Unsubscribe completion timed out");
713 }
714 }
715 }
716 }
717
718 async fn unsubscribe_inner(
724 inner: &mut Inner,
725 subscription_id: Uuid,
726 ready_tx: oneshot::Sender<()>,
727 ) -> Result<(), DeltasError> {
728 debug!(?subscription_id, "Unsubscribing");
729 inner.end_subscription(&subscription_id, ready_tx);
730 let cmd = Command::Unsubscribe { subscription_id };
731 inner
732 .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
733 |e| {
734 DeltasError::TransportError(format!(
735 "Failed to serialize unsubscribe command: {e}"
736 ))
737 },
738 )?))
739 .await?;
740 Ok(())
741 }
742}
743
744#[async_trait]
745impl DeltasClient for WsDeltasClient {
746 #[instrument(skip(self))]
747 async fn subscribe(
748 &self,
749 extractor_id: ExtractorIdentity,
750 options: SubscriptionOptions,
751 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
752 trace!("Starting subscribe");
753 self.ensure_connection().await?;
754 let (ready_tx, ready_rx) = oneshot::channel();
755 {
756 let mut guard = self.inner.lock().await;
757 let inner = guard
758 .as_mut()
759 .ok_or_else(|| DeltasError::NotConnected)?;
760 trace!("Sending subscribe command");
761 inner.new_subscription(&extractor_id, ready_tx)?;
762 let cmd = Command::Subscribe {
763 extractor_id,
764 include_state: options.include_state,
765 compression: options.compression,
766 partial_blocks: options.partial_blocks,
767 };
768 inner
769 .ws_send(tungstenite::protocol::Message::Text(
770 serde_json::to_string(&cmd).map_err(|e| {
771 DeltasError::TransportError(format!(
772 "Failed to serialize subscribe command: {e}"
773 ))
774 })?,
775 ))
776 .await?;
777 }
778 trace!("Waiting for subscription response");
779 let res = ready_rx.await.map_err(|_| {
780 DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
781 })??;
782 trace!("Subscription successful");
783 Ok(res)
784 }
785
786 #[instrument(skip(self))]
787 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
788 self.ensure_connection().await?;
789 let (ready_tx, ready_rx) = oneshot::channel();
790 {
791 let mut guard = self.inner.lock().await;
792 let inner = guard
793 .as_mut()
794 .ok_or_else(|| DeltasError::NotConnected)?;
795
796 WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
797 }
798 ready_rx.await.map_err(|_| {
799 DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
800 })?;
801
802 Ok(())
803 }
804
805 #[instrument(skip(self))]
806 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
807 if self.is_connected().await {
808 return Err(DeltasError::AlreadyConnected);
809 }
810 let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
811 info!(?ws_uri, "Starting TychoWebsocketClient");
812
813 let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
814 {
815 let mut guard = self.inner.as_ref().lock().await;
816 *guard = None;
817 }
818 let this = self.clone();
819 let jh = tokio::spawn(async move {
820 let mut retry_count = 0;
821 let mut result = Err(DeltasError::NotConnected);
822
823 'retry: while retry_count < this.max_reconnects {
824 info!(?ws_uri, retry_count, "Connecting to WebSocket server");
825 if retry_count > 0 {
826 sleep(this.retry_cooldown).await;
827 }
828
829 let mut request_builder = Request::builder()
831 .uri(&ws_uri)
832 .header(SEC_WEBSOCKET_KEY, generate_key())
833 .header(SEC_WEBSOCKET_VERSION, 13)
834 .header(CONNECTION, "Upgrade")
835 .header(UPGRADE, "websocket")
836 .header(
837 HOST,
838 this.uri.host().ok_or_else(|| {
839 DeltasError::UriParsing(
840 ws_uri.clone(),
841 "No host found in tycho url".to_string(),
842 )
843 })?,
844 )
845 .header(
846 USER_AGENT,
847 format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
848 );
849
850 if let Some(ref key) = this.auth_key {
852 request_builder = request_builder.header(AUTHORIZATION, key);
853 }
854
855 let request = request_builder.body(()).map_err(|e| {
856 DeltasError::TransportError(format!("Failed to build connection request: {e}"))
857 })?;
858 let (conn, _) = match connect_async(request).await {
859 Ok(conn) => conn,
860 Err(e) => {
861 retry_count += 1;
863 let mut guard = this.inner.as_ref().lock().await;
864 *guard = None;
865
866 warn!(
867 e = e.to_string(),
868 "Failed to connect to WebSocket server; Reconnecting"
869 );
870 continue 'retry;
871 }
872 };
873
874 let (ws_tx_new, ws_rx_new) = conn.split();
875 {
876 let mut guard = this.inner.as_ref().lock().await;
877 *guard =
878 Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
879 }
880 let mut msg_rx = ws_rx_new.boxed();
881
882 info!("Connection Successful: TychoWebsocketClient started");
883 this.conn_notify.notify_waiters();
884 result = Ok(());
885
886 loop {
887 let res = tokio::select! {
888 msg = msg_rx.next() => match msg {
889 Some(msg) => this.handle_msg(msg).await,
890 None => {
891 warn!("Websocket connection silently closed, giving up!");
895 break 'retry
896 }
897 },
898 _ = cmd_rx.recv() => {break 'retry},
899 };
900 if let Err(error) = res {
901 debug!(?error, "WsError");
902 if matches!(
903 error,
904 DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
905 ) {
906 retry_count += 1;
908 let mut guard = this.inner.as_ref().lock().await;
909 *guard = None;
910
911 warn!(
912 ?error,
913 ?retry_count,
914 "Connection dropped unexpectedly; Reconnecting..."
915 );
916 break;
917 } else {
918 error!(?error, "Fatal error; Exiting");
920 result = Err(error);
921 break 'retry;
922 }
923 }
924 }
925 }
926 debug!(
927 retry_count,
928 max_reconnects=?this.max_reconnects,
929 "Reconnection loop ended"
930 );
931 let mut guard = this.inner.as_ref().lock().await;
933 *guard = None;
934
935 if retry_count >= this.max_reconnects {
937 error!("Max reconnection attempts reached; Exiting");
938 this.dead.store(true, Ordering::SeqCst);
939 this.conn_notify.notify_waiters(); result = Err(DeltasError::ConnectionClosed);
941 }
942
943 result
944 });
945
946 self.conn_notify.notified().await;
947
948 if self.is_connected().await {
949 Ok(jh)
950 } else {
951 Err(DeltasError::NotConnected)
952 }
953 }
954
955 #[instrument(skip(self))]
956 async fn close(&self) -> Result<(), DeltasError> {
957 info!("Closing TychoWebsocketClient");
958 let mut guard = self.inner.lock().await;
959 let inner = guard
960 .as_mut()
961 .ok_or_else(|| DeltasError::NotConnected)?;
962 inner
963 .cmd_tx
964 .send(())
965 .await
966 .map_err(|e| DeltasError::TransportError(e.to_string()))?;
967 Ok(())
968 }
969}
970
971#[cfg(test)]
972mod tests {
973 use std::{net::SocketAddr, str::FromStr};
974
975 use tokio::{net::TcpListener, time::timeout};
976 use tycho_common::dto::Chain;
977
978 use super::*;
979
    /// One step of the scripted conversation played by the mock server.
    #[derive(Clone)]
    enum ExpectedComm {
        /// Wait up to the given number of milliseconds for a message from the
        /// client and assert that it equals the given message.
        Receive(u64, tungstenite::protocol::Message),
        /// Send the given message to the client.
        Send(tungstenite::protocol::Message),
    }
985
986 async fn mock_tycho_ws(
987 messages: &[ExpectedComm],
988 reconnects: usize,
989 ) -> (SocketAddr, JoinHandle<()>) {
990 info!("Starting mock webserver");
991 let server = TcpListener::bind("127.0.0.1:0")
993 .await
994 .expect("localhost bind failed");
995 let addr = server.local_addr().unwrap();
996 let messages = messages.to_vec();
997
998 let jh = tokio::spawn(async move {
999 info!("mock webserver started");
1000 for _ in 0..(reconnects + 1) {
1001 info!("Awaiting client connections");
1002 if let Ok((stream, _)) = server.accept().await {
1003 info!("Client connected");
1004 let mut websocket = tokio_tungstenite::accept_async(stream)
1005 .await
1006 .unwrap();
1007
1008 info!("Handling messages..");
1009 for c in messages.iter().cloned() {
1010 match c {
1011 ExpectedComm::Receive(t, exp) => {
1012 info!("Awaiting message...");
1013 let msg = timeout(Duration::from_millis(t), websocket.next())
1014 .await
1015 .expect("Receive timeout")
1016 .expect("Stream exhausted")
1017 .expect("Failed to receive message.");
1018 info!("Message received");
1019 assert_eq!(msg, exp)
1020 }
1021 ExpectedComm::Send(data) => {
1022 info!("Sending message");
1023 websocket
1024 .send(data)
1025 .await
1026 .expect("Failed to send message");
1027 info!("Message sent");
1028 }
1029 };
1030 }
1031 info!("Mock communication completed");
1032 sleep(Duration::from_millis(100)).await;
1033 let _ = websocket.close(None).await;
1035 info!("Mock server closed connection");
1036 }
1037 }
1038 info!("mock server ended");
1039 });
1040 (addr, jh)
1041 }
1042
    /// Subscription id used by the test fixtures; the JSON fixture helpers in
    /// this module embed the same UUID literal.
    // NOTE(review): not referenced in the visible part of this module — confirm it is used
    // further down before removing.
    const SUBSCRIPTION_ID: &str = "30b740d1-cf09-4e0e-8cfe-b1434d447ece";
1044
1045 fn subscribe() -> String {
1046 subscribe_with_compression(false)
1047 }
1048
1049 fn subscribe_with_compression(compression: bool) -> String {
1050 serde_json::json!({
1051 "method": "subscribe",
1052 "extractor_id": {
1053 "chain": "ethereum",
1054 "name": "vm:ambient"
1055 },
1056 "include_state": true,
1057 "compression": compression,
1058 "partial_blocks": false
1059 })
1060 .to_string()
1061 }
1062
1063 fn subscription_confirmation() -> String {
1064 r#"
1065 {
1066 "method": "newsubscription",
1067 "extractor_id":{
1068 "chain": "ethereum",
1069 "name": "vm:ambient"
1070 },
1071 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1072 }
1073 "#
1074 .replace(|c: char| c.is_whitespace(), "")
1075 }
1076
1077 fn block_deltas() -> String {
1078 r#"
1079 {
1080 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1081 "deltas": {
1082 "extractor": "vm:ambient",
1083 "chain": "ethereum",
1084 "block": {
1085 "number": 123,
1086 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1087 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1088 "chain": "ethereum",
1089 "ts": "2023-09-14T00:00:00"
1090 },
1091 "finalized_block_height": 0,
1092 "revert": false,
1093 "new_tokens": {},
1094 "account_updates": {
1095 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1096 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1097 "chain": "ethereum",
1098 "slots": {},
1099 "balance": "0x01f4",
1100 "code": "",
1101 "change": "Update"
1102 }
1103 },
1104 "state_updates": {
1105 "component_1": {
1106 "component_id": "component_1",
1107 "updated_attributes": {"attr1": "0x01"},
1108 "deleted_attributes": ["attr2"]
1109 }
1110 },
1111 "new_protocol_components":
1112 { "protocol_1": {
1113 "id": "protocol_1",
1114 "protocol_system": "system_1",
1115 "protocol_type_name": "type_1",
1116 "chain": "ethereum",
1117 "tokens": ["0x01", "0x02"],
1118 "contract_ids": ["0x01", "0x02"],
1119 "static_attributes": {"attr1": "0x01f4"},
1120 "change": "Update",
1121 "creation_tx": "0x01",
1122 "created_at": "2023-09-14T00:00:00"
1123 }
1124 },
1125 "deleted_protocol_components": {},
1126 "component_balances": {
1127 "protocol_1":
1128 {
1129 "0x01": {
1130 "token": "0x01",
1131 "balance": "0x01f4",
1132 "balance_float": 0.0,
1133 "modify_tx": "0x01",
1134 "component_id": "protocol_1"
1135 }
1136 }
1137 },
1138 "account_balances": {
1139 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1140 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1141 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1142 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1143 "balance": "0x01f4",
1144 "modify_tx": "0x01"
1145 }
1146 }
1147 },
1148 "component_tvl": {
1149 "protocol_1": 1000.0
1150 },
1151 "dci_update": {
1152 "new_entrypoints": {},
1153 "new_entrypoint_params": {},
1154 "trace_results": {}
1155 }
1156 }
1157 }
1158 "#.replace(|c: char| c.is_whitespace(), "")
1159 }
1160
1161 fn unsubscribe() -> String {
1162 r#"
1163 {
1164 "method": "unsubscribe",
1165 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1166 }
1167 "#
1168 .replace(|c: char| c.is_whitespace(), "")
1169 }
1170
1171 fn subscription_ended() -> String {
1172 r#"
1173 {
1174 "method": "subscriptionended",
1175 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1176 }
1177 "#
1178 .replace(|c: char| c.is_whitespace(), "")
1179 }
1180
1181 #[tokio::test]
1182 async fn test_uncompressed_subscribe_receive() {
1183 let exp_comm = [
1184 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1185 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1186 ExpectedComm::Send(tungstenite::protocol::Message::Text(block_deltas())),
1187 ];
1188 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1189
1190 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1191 let jh = client
1192 .connect()
1193 .await
1194 .expect("connect failed");
1195 let (_, mut rx) = timeout(
1196 Duration::from_millis(100),
1197 client.subscribe(
1198 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1199 SubscriptionOptions::new().with_compression(false),
1200 ),
1201 )
1202 .await
1203 .expect("subscription timed out")
1204 .expect("subscription failed");
1205 let _ = timeout(Duration::from_millis(100), rx.recv())
1206 .await
1207 .expect("awaiting message timeout out")
1208 .expect("receiving message failed");
1209 timeout(Duration::from_millis(100), client.close())
1210 .await
1211 .expect("close timed out")
1212 .expect("close failed");
1213 jh.await
1214 .expect("ws loop errored")
1215 .unwrap();
1216 server_thread.await.unwrap();
1217 }
1218
1219 #[tokio::test]
1220 async fn test_compressed_subscribe_receive() {
1221 let compressed_block_deltas = zstd::encode_all(
1222 block_deltas().as_bytes(),
1223 0, )
1225 .expect("Failed to compress block deltas message");
1226
1227 let exp_comm = [
1228 ExpectedComm::Receive(
1229 100,
1230 tungstenite::protocol::Message::Text(subscribe_with_compression(true)),
1231 ),
1232 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1233 ExpectedComm::Send(tungstenite::protocol::Message::Binary(compressed_block_deltas)),
1234 ];
1235 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1236
1237 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1238 let jh = client
1239 .connect()
1240 .await
1241 .expect("connect failed");
1242 let (_, mut rx) = timeout(
1243 Duration::from_millis(100),
1244 client.subscribe(
1245 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1246 SubscriptionOptions::new().with_compression(true),
1247 ),
1248 )
1249 .await
1250 .expect("subscription timed out")
1251 .expect("subscription failed");
1252 let _ = timeout(Duration::from_millis(100), rx.recv())
1253 .await
1254 .expect("awaiting message timeout out")
1255 .expect("receiving message failed");
1256 timeout(Duration::from_millis(100), client.close())
1257 .await
1258 .expect("close timed out")
1259 .expect("close failed");
1260 jh.await
1261 .expect("ws loop errored")
1262 .unwrap();
1263 server_thread.await.unwrap();
1264 }
1265
1266 #[tokio::test]
1267 async fn test_unsubscribe() {
1268 let exp_comm = [
1269 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1270 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1271 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
1272 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
1273 ];
1274 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1275
1276 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1277 let jh = client
1278 .connect()
1279 .await
1280 .expect("connect failed");
1281 let (sub_id, mut rx) = timeout(
1282 Duration::from_millis(100),
1283 client.subscribe(
1284 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1285 SubscriptionOptions::new().with_compression(false),
1286 ),
1287 )
1288 .await
1289 .expect("subscription timed out")
1290 .expect("subscription failed");
1291
1292 timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1293 .await
1294 .expect("unsubscribe timed out")
1295 .expect("unsubscribe failed");
1296 let res = timeout(Duration::from_millis(100), rx.recv())
1297 .await
1298 .expect("awaiting message timeout out");
1299
1300 assert!(res.is_none());
1302
1303 timeout(Duration::from_millis(100), client.close())
1304 .await
1305 .expect("close timed out")
1306 .expect("close failed");
1307 jh.await
1308 .expect("ws loop errored")
1309 .unwrap();
1310 server_thread.await.unwrap();
1311 }
1312
1313 #[tokio::test]
1314 async fn test_subscription_unexpected_end() {
1315 let exp_comm = [
1316 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1317 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1318 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
1319 ];
1320 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1321
1322 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1323 let jh = client
1324 .connect()
1325 .await
1326 .expect("connect failed");
1327 let (_, mut rx) = timeout(
1328 Duration::from_millis(100),
1329 client.subscribe(
1330 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1331 SubscriptionOptions::new().with_compression(false),
1332 ),
1333 )
1334 .await
1335 .expect("subscription timed out")
1336 .expect("subscription failed");
1337 let res = timeout(Duration::from_millis(100), rx.recv())
1338 .await
1339 .expect("awaiting message timeout out");
1340
1341 assert!(res.is_none());
1343
1344 timeout(Duration::from_millis(100), client.close())
1345 .await
1346 .expect("close timed out")
1347 .expect("close failed");
1348 jh.await
1349 .expect("ws loop errored")
1350 .unwrap();
1351 server_thread.await.unwrap();
1352 }
1353
1354 #[test_log::test(tokio::test)]
1355 async fn test_reconnect() {
1356 let exp_comm = [
1357 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe()
1358 )),
1359 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1360 subscription_confirmation()
1361 )),
1362 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1363 {
1364 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1365 "deltas": {
1366 "extractor": "vm:ambient",
1367 "chain": "ethereum",
1368 "block": {
1369 "number": 123,
1370 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1371 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1372 "chain": "ethereum",
1373 "ts": "2023-09-14T00:00:00"
1374 },
1375 "finalized_block_height": 0,
1376 "revert": false,
1377 "new_tokens": {},
1378 "account_updates": {
1379 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1380 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1381 "chain": "ethereum",
1382 "slots": {},
1383 "balance": "0x01f4",
1384 "code": "",
1385 "change": "Update"
1386 }
1387 },
1388 "state_updates": {
1389 "component_1": {
1390 "component_id": "component_1",
1391 "updated_attributes": {"attr1": "0x01"},
1392 "deleted_attributes": ["attr2"]
1393 }
1394 },
1395 "new_protocol_components": {
1396 "protocol_1":
1397 {
1398 "id": "protocol_1",
1399 "protocol_system": "system_1",
1400 "protocol_type_name": "type_1",
1401 "chain": "ethereum",
1402 "tokens": ["0x01", "0x02"],
1403 "contract_ids": ["0x01", "0x02"],
1404 "static_attributes": {"attr1": "0x01f4"},
1405 "change": "Update",
1406 "creation_tx": "0x01",
1407 "created_at": "2023-09-14T00:00:00"
1408 }
1409 },
1410 "deleted_protocol_components": {},
1411 "component_balances": {
1412 "protocol_1": {
1413 "0x01": {
1414 "token": "0x01",
1415 "balance": "0x01f4",
1416 "balance_float": 1000.0,
1417 "modify_tx": "0x01",
1418 "component_id": "protocol_1"
1419 }
1420 }
1421 },
1422 "account_balances": {
1423 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1424 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1425 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1426 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1427 "balance": "0x01f4",
1428 "modify_tx": "0x01"
1429 }
1430 }
1431 },
1432 "component_tvl": {
1433 "protocol_1": 1000.0
1434 },
1435 "dci_update": {
1436 "new_entrypoints": {},
1437 "new_entrypoint_params": {},
1438 "trace_results": {}
1439 }
1440 }
1441 }
1442 "#.to_owned()
1443 ))
1444 ];
1445 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1446 let client = WsDeltasClient::new_with_reconnects(
1447 &format!("ws://{addr}"),
1448 None,
1449 3,
1450 Duration::from_millis(110),
1452 )
1453 .unwrap();
1454
1455 let jh: JoinHandle<Result<(), DeltasError>> = client
1456 .connect()
1457 .await
1458 .expect("connect failed");
1459
1460 for _ in 0..2 {
1461 dbg!("loop");
1462 let (_, mut rx) = timeout(
1463 Duration::from_millis(200),
1464 client.subscribe(
1465 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1466 SubscriptionOptions::new().with_compression(false),
1467 ),
1468 )
1469 .await
1470 .expect("subscription timed out")
1471 .expect("subscription failed");
1472
1473 let _ = timeout(Duration::from_millis(100), rx.recv())
1474 .await
1475 .expect("awaiting message timeout out")
1476 .expect("receiving message failed");
1477
1478 let res = timeout(Duration::from_millis(200), rx.recv())
1480 .await
1481 .expect("awaiting closed connection timeout out");
1482 assert!(res.is_none());
1483 }
1484 let res = jh.await.expect("ws client join failed");
1485 assert!(res.is_err());
1487 server_thread
1488 .await
1489 .expect("ws server loop errored");
1490 }
1491
1492 async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1493 let server = TcpListener::bind("127.0.0.1:0")
1494 .await
1495 .expect("localhost bind failed");
1496 let addr = server.local_addr().unwrap();
1497 let jh = tokio::spawn(async move {
1498 while let Ok((stream, _)) = server.accept().await {
1499 if accept_first {
1500 let stream = tokio_tungstenite::accept_async(stream)
1502 .await
1503 .unwrap();
1504 sleep(Duration::from_millis(10)).await;
1505 drop(stream)
1506 } else {
1507 drop(stream);
1509 }
1510 }
1511 });
1512 (addr, jh)
1513 }
1514
1515 #[test_log::test(tokio::test)]
1516 async fn test_subscribe_dead_client_after_max_attempts() {
1517 let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1518 let client = WsDeltasClient::new_with_reconnects(
1519 &format!("ws://{addr}"),
1520 None,
1521 3,
1522 Duration::from_secs(0),
1523 )
1524 .unwrap();
1525
1526 let join_handle = client.connect().await.unwrap();
1527 let handle_res = join_handle.await.unwrap();
1528 assert!(handle_res.is_err());
1529 assert!(!client.is_connected().await);
1530
1531 let subscription_res = timeout(
1532 Duration::from_millis(10),
1533 client.subscribe(
1534 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1535 SubscriptionOptions::new(),
1536 ),
1537 )
1538 .await
1539 .unwrap();
1540 assert!(subscription_res.is_err());
1541 }
1542
1543 #[test_log::test(tokio::test)]
1544 async fn test_ws_client_retry_cooldown() {
1545 let start = std::time::Instant::now();
1546 let (addr, _) = mock_bad_connection_tycho_ws(false).await;
1547
1548 let client = WsDeltasClient::new_with_reconnects(
1550 &format!("ws://{addr}"),
1551 None,
1552 3, Duration::from_millis(50), )
1555 .unwrap();
1556
1557 let connect_result = client.connect().await;
1559 let elapsed = start.elapsed();
1560
1561 assert!(connect_result.is_err(), "Expected connection to fail after retries");
1563
1564 assert!(
1566 elapsed >= Duration::from_millis(100),
1567 "Expected at least 100ms elapsed, got {:?}",
1568 elapsed
1569 );
1570
1571 assert!(elapsed < Duration::from_millis(500), "Took too long: {:?}", elapsed);
1573 }
1574
1575 #[test_log::test(tokio::test)]
1576 async fn test_buffer_full_triggers_unsubscribe() {
1577 let exp_comm = {
1579 [
1580 ExpectedComm::Receive(
1582 100,
1583 tungstenite::protocol::Message::Text(
1584 subscribe(),
1585 ),
1586 ),
1587 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1589 subscription_confirmation(),
1590 )),
1591 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1593 r#"
1594 {
1595 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1596 "deltas": {
1597 "extractor": "vm:ambient",
1598 "chain": "ethereum",
1599 "block": {
1600 "number": 123,
1601 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1602 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1603 "chain": "ethereum",
1604 "ts": "2023-09-14T00:00:00"
1605 },
1606 "finalized_block_height": 0,
1607 "revert": false,
1608 "new_tokens": {},
1609 "account_updates": {},
1610 "state_updates": {},
1611 "new_protocol_components": {},
1612 "deleted_protocol_components": {},
1613 "component_balances": {},
1614 "account_balances": {},
1615 "component_tvl": {},
1616 "dci_update": {
1617 "new_entrypoints": {},
1618 "new_entrypoint_params": {},
1619 "trace_results": {}
1620 }
1621 }
1622 }
1623 "#.to_owned()
1624 )),
1625 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1627 r#"
1628 {
1629 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1630 "deltas": {
1631 "extractor": "vm:ambient",
1632 "chain": "ethereum",
1633 "block": {
1634 "number": 124,
1635 "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
1636 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1637 "chain": "ethereum",
1638 "ts": "2023-09-14T00:00:01"
1639 },
1640 "finalized_block_height": 0,
1641 "revert": false,
1642 "new_tokens": {},
1643 "account_updates": {},
1644 "state_updates": {},
1645 "new_protocol_components": {},
1646 "deleted_protocol_components": {},
1647 "component_balances": {},
1648 "account_balances": {},
1649 "component_tvl": {},
1650 "dci_update": {
1651 "new_entrypoints": {},
1652 "new_entrypoint_params": {},
1653 "trace_results": {}
1654 }
1655 }
1656 }
1657 "#.to_owned()
1658 )),
1659 ExpectedComm::Receive(
1661 100,
1662 tungstenite::protocol::Message::Text(
1663 unsubscribe(),
1664 ),
1665 ),
1666 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1668 subscription_ended(),
1669 )),
1670 ]
1671 };
1672
1673 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1674
1675 let client = WsDeltasClient::new_with_custom_buffers(
1677 &format!("ws://{addr}"),
1678 None,
1679 128, 1, )
1682 .unwrap();
1683
1684 let jh = client
1685 .connect()
1686 .await
1687 .expect("connect failed");
1688
1689 let (_sub_id, mut rx) = timeout(
1690 Duration::from_millis(100),
1691 client.subscribe(
1692 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1693 SubscriptionOptions::new().with_compression(false),
1694 ),
1695 )
1696 .await
1697 .expect("subscription timed out")
1698 .expect("subscription failed");
1699
1700 tokio::time::sleep(Duration::from_millis(100)).await;
1702
1703 let mut received_msgs = Vec::new();
1705
1706 while received_msgs.len() < 3 {
1708 match timeout(Duration::from_millis(200), rx.recv()).await {
1709 Ok(Some(msg)) => {
1710 received_msgs.push(msg);
1711 }
1712 Ok(None) => {
1713 break;
1715 }
1716 Err(_) => {
1717 break;
1719 }
1720 }
1721 }
1722
1723 assert!(
1725 received_msgs.len() <= 1,
1726 "Expected buffer overflow to limit messages to at most 1, got {}",
1727 received_msgs.len()
1728 );
1729
1730 if let Some(first_msg) = received_msgs.first() {
1731 assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
1732 }
1733
1734 drop(rx); tokio::time::sleep(Duration::from_millis(50)).await;
1741
1742 jh.abort();
1744 server_thread.abort();
1745
1746 let _ = jh.await;
1747 let _ = server_thread.await;
1748 }
1749
1750 #[tokio::test]
1751 async fn test_server_error_handling() {
1752 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1753
1754 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1755
1756 let error_response = WebSocketMessage::Response(Response::Error(
1758 WebsocketError::ExtractorNotFound(extractor_id.clone()),
1759 ));
1760 let error_json = serde_json::to_string(&error_response).unwrap();
1761
1762 let exp_comm = [
1763 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1764 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1765 ];
1766
1767 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1768
1769 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1770 let jh = client
1771 .connect()
1772 .await
1773 .expect("connect failed");
1774
1775 let result = timeout(
1776 Duration::from_millis(100),
1777 client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1778 )
1779 .await
1780 .expect("subscription timed out");
1781
1782 assert!(result.is_err());
1784 if let Err(DeltasError::ServerError(msg, _)) = result {
1785 assert!(msg.contains("Subscription failed"));
1786 assert!(msg.contains("Extractor not found"));
1787 } else {
1788 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1789 }
1790
1791 timeout(Duration::from_millis(100), client.close())
1792 .await
1793 .expect("close timed out")
1794 .expect("close failed");
1795 jh.await
1796 .expect("ws loop errored")
1797 .unwrap();
1798 server_thread.await.unwrap();
1799 }
1800
1801 #[test_log::test(tokio::test)]
1802 async fn test_subscription_not_found_error() {
1803 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1805
1806 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1807 let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
1808
1809 let error_response = WebSocketMessage::Response(Response::Error(
1810 WebsocketError::SubscriptionNotFound(subscription_id),
1811 ));
1812 let error_json = serde_json::to_string(&error_response).unwrap();
1813
1814 let exp_comm = [
1815 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1817 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1818 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
1820 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1822 ];
1823
1824 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1825
1826 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1827 let jh = client
1828 .connect()
1829 .await
1830 .expect("connect failed");
1831
1832 let (received_sub_id, _rx) = timeout(
1834 Duration::from_millis(100),
1835 client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1836 )
1837 .await
1838 .expect("subscription timed out")
1839 .expect("subscription failed");
1840
1841 assert_eq!(received_sub_id, subscription_id);
1842
1843 let unsubscribe_result =
1845 timeout(Duration::from_millis(100), client.unsubscribe(subscription_id))
1846 .await
1847 .expect("unsubscribe timed out");
1848
1849 unsubscribe_result
1853 .expect("Unsubscribe should succeed even if server says subscription not found");
1854
1855 timeout(Duration::from_millis(100), client.close())
1856 .await
1857 .expect("close timed out")
1858 .expect("close failed");
1859 jh.await
1860 .expect("ws loop errored")
1861 .unwrap();
1862 server_thread.await.unwrap();
1863 }
1864
1865 #[test_log::test(tokio::test)]
1866 async fn test_parse_error_handling() {
1867 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1868
1869 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1870 let error_response = WebSocketMessage::Response(Response::Error(
1871 WebsocketError::ParseError("}2sdf".to_string(), "malformed JSON".to_string()),
1872 ));
1873 let error_json = serde_json::to_string(&error_response).unwrap();
1874
1875 let exp_comm = [
1876 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1878 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1879 ];
1880
1881 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1882
1883 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1884 let jh = client
1885 .connect()
1886 .await
1887 .expect("connect failed");
1888
1889 let _ = timeout(
1891 Duration::from_millis(100),
1892 client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1893 )
1894 .await
1895 .expect("subscription timed out");
1896
1897 let result = jh
1899 .await
1900 .expect("ws loop should complete");
1901 assert!(result.is_err());
1902 if let Err(DeltasError::ServerError(message, _)) = result {
1903 assert!(message.contains("Server failed to parse client message"));
1904 } else {
1905 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1906 }
1907
1908 server_thread.await.unwrap();
1909 }
1910
1911 #[test_log::test(tokio::test)]
1912 async fn test_compression_error_handling() {
1913 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1914
1915 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1916 let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
1917 let error_response = WebSocketMessage::Response(Response::Error(
1918 WebsocketError::CompressionError(subscription_id, "Compression failed".to_string()),
1919 ));
1920 let error_json = serde_json::to_string(&error_response).unwrap();
1921
1922 let exp_comm = [
1923 ExpectedComm::Receive(
1925 100,
1926 tungstenite::protocol::Message::Text(subscribe_with_compression(true)),
1927 ),
1928 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1929 ];
1930
1931 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1932
1933 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1934 let jh = client
1935 .connect()
1936 .await
1937 .expect("connect failed");
1938
1939 let _ = timeout(
1941 Duration::from_millis(100),
1942 client.subscribe(extractor_id, SubscriptionOptions::new()),
1943 )
1944 .await
1945 .expect("subscription timed out");
1946
1947 let result = jh
1949 .await
1950 .expect("ws loop should complete");
1951 assert!(result.is_err());
1952 if let Err(DeltasError::ServerError(message, _)) = result {
1953 assert!(message.contains("Server failed to compress message for subscription"));
1954 } else {
1955 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1956 }
1957
1958 server_thread.await.unwrap();
1959 }
1960
1961 #[tokio::test]
1962 async fn test_subscribe_error_handling() {
1963 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1964
1965 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1966
1967 let error_response = WebSocketMessage::Response(Response::Error(
1968 WebsocketError::SubscribeError(extractor_id.clone()),
1969 ));
1970 let error_json = serde_json::to_string(&error_response).unwrap();
1971
1972 let exp_comm = [
1973 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1974 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1975 ];
1976
1977 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1978
1979 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1980 let jh = client
1981 .connect()
1982 .await
1983 .expect("connect failed");
1984
1985 let result = timeout(
1986 Duration::from_millis(100),
1987 client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1988 )
1989 .await
1990 .expect("subscription timed out");
1991
1992 assert!(result.is_err());
1994 if let Err(DeltasError::ServerError(msg, _)) = result {
1995 assert!(msg.contains("Subscription failed"));
1996 assert!(msg.contains("Failed to subscribe to extractor"));
1997 } else {
1998 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1999 }
2000
2001 timeout(Duration::from_millis(100), client.close())
2002 .await
2003 .expect("close timed out")
2004 .expect("close failed");
2005 jh.await
2006 .expect("ws loop errored")
2007 .unwrap();
2008 server_thread.await.unwrap();
2009 }
2010
2011 #[tokio::test]
2012 async fn test_cancel_pending_subscription() {
2013 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
2015
2016 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
2017
2018 let error_response = WebSocketMessage::Response(Response::Error(
2019 WebsocketError::ExtractorNotFound(extractor_id.clone()),
2020 ));
2021 let error_json = serde_json::to_string(&error_response).unwrap();
2022
2023 let exp_comm = [
2024 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
2025 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2026 ];
2027
2028 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2029
2030 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2031 let jh = client
2032 .connect()
2033 .await
2034 .expect("connect failed");
2035
2036 let client_clone = client.clone();
2038 let extractor_id_clone = extractor_id.clone();
2039
2040 let subscription1 = tokio::spawn({
2041 let client_for_spawn = client.clone();
2042 async move {
2043 client_for_spawn
2044 .subscribe(extractor_id, SubscriptionOptions::new().with_compression(false))
2045 .await
2046 }
2047 });
2048
2049 let subscription2 = tokio::spawn(async move {
2050 client_clone
2052 .subscribe(extractor_id_clone, SubscriptionOptions::new())
2053 .await
2054 });
2055
2056 let (result1, result2) = tokio::join!(subscription1, subscription2);
2057
2058 let result1 = result1.unwrap();
2059 let result2 = result2.unwrap();
2060
2061 assert!(result1.is_err() || result2.is_err());
2064
2065 if let Err(DeltasError::SubscriptionAlreadyPending) = result2 {
2066 } else if let Err(DeltasError::ServerError(_, _)) = result1 {
2068 } else {
2070 panic!("Expected one SubscriptionAlreadyPending and one ServerError");
2071 }
2072
2073 timeout(Duration::from_millis(100), client.close())
2074 .await
2075 .expect("close timed out")
2076 .expect("close failed");
2077 jh.await
2078 .expect("ws loop errored")
2079 .unwrap();
2080 server_thread.await.unwrap();
2081 }
2082
2083 #[tokio::test]
2084 async fn test_force_unsubscribe_prevents_multiple_calls() {
2085 let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
2089
2090 let exp_comm = [
2091 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
2092 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
2093 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
2095 ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
2096 ];
2097
2098 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2099
2100 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2101 let jh = client
2102 .connect()
2103 .await
2104 .expect("connect failed");
2105
2106 let (received_sub_id, _rx) = timeout(
2107 Duration::from_millis(100),
2108 client.subscribe(
2109 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
2110 SubscriptionOptions::new().with_compression(false),
2111 ),
2112 )
2113 .await
2114 .expect("subscription timed out")
2115 .expect("subscription failed");
2116
2117 assert_eq!(received_sub_id, subscription_id);
2118
2119 {
2121 let mut inner_guard = client.inner.lock().await;
2122 let inner = inner_guard
2123 .as_mut()
2124 .expect("client should be connected");
2125
2126 WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2128 WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2129 }
2130
2131 tokio::time::sleep(Duration::from_millis(50)).await;
2133
2134 let _ = timeout(Duration::from_millis(100), client.close()).await;
2136
2137 let _ = jh.await;
2139 let _ = server_thread.await;
2140 }
2141}