1use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::{
25 atomic::{AtomicBool, Ordering},
26 Arc,
27 },
28 time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34 header::{
35 AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36 USER_AGENT,
37 },
38 Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44 net::TcpStream,
45 sync::{
46 mpsc::{self, error::TrySendError, Receiver, Sender},
47 oneshot, Mutex, MutexGuard, Notify,
48 },
49 task::JoinHandle,
50 time::sleep,
51};
52use tokio_tungstenite::{
53 connect_async,
54 tungstenite::{
55 self,
56 handshake::client::{generate_key, Request},
57 },
58 MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{
62 BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage, WebsocketError,
63};
64use uuid::Uuid;
65use zstd;
66
67use crate::TYCHO_SERVER_VERSION;
68
69#[derive(Error, Debug)]
70pub enum DeltasError {
71 #[error("Failed to parse URI: {0}. Error: {1}")]
73 UriParsing(String, String),
74
75 #[error("The requested subscription is already pending")]
77 SubscriptionAlreadyPending,
78
79 #[error("The server replied with an error: {0}")]
80 ServerError(String, #[source] WebsocketError),
81
82 #[error("{0}")]
85 TransportError(String),
86
87 #[error("The buffer is full!")]
91 BufferFull,
92
93 #[error("The client is not connected!")]
97 NotConnected,
98
99 #[error("The client is already connected!")]
101 AlreadyConnected,
102
103 #[error("The server closed the connection!")]
105 ConnectionClosed,
106
107 #[error("Connection error: {0}")]
109 ConnectionError(#[from] Box<tungstenite::Error>),
110
111 #[error("Tycho FatalError: {0}")]
113 Fatal(String),
114}
115
116#[derive(Clone, Debug)]
117pub struct SubscriptionOptions {
118 include_state: bool,
119 compression: bool,
120}
121
122impl Default for SubscriptionOptions {
123 fn default() -> Self {
124 Self { include_state: true, compression: true }
125 }
126}
127
128impl SubscriptionOptions {
129 pub fn new() -> Self {
130 Self::default()
131 }
132 pub fn with_state(mut self, val: bool) -> Self {
133 self.include_state = val;
134 self
135 }
136 pub fn with_compression(mut self, val: bool) -> Self {
137 self.compression = val;
138 self
139 }
140}
141
142#[cfg_attr(test, automock)]
143#[async_trait]
144pub trait DeltasClient {
145 async fn subscribe(
152 &self,
153 extractor_id: ExtractorIdentity,
154 options: SubscriptionOptions,
155 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;
156
157 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
159
160 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
162
163 async fn close(&self) -> Result<(), DeltasError>;
165}
166
167#[derive(Clone)]
168pub struct WsDeltasClient {
169 uri: Uri,
171 auth_key: Option<String>,
173 max_reconnects: u64,
175 retry_cooldown: Duration,
177 ws_buffer_size: usize,
180 subscription_buffer_size: usize,
183 conn_notify: Arc<Notify>,
185 inner: Arc<Mutex<Option<Inner>>>,
187 dead: Arc<AtomicBool>,
189}
190
191type WebSocketSink =
192 SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
193
194#[derive(Debug)]
206enum SubscriptionInfo {
207 RequestedSubscription(oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>),
209 Active,
211 RequestedUnsubscription(oneshot::Sender<()>),
213}
214
215struct Inner {
217 sink: WebSocketSink,
219 cmd_tx: Sender<()>,
221 pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
223 subscriptions: HashMap<Uuid, SubscriptionInfo>,
225 sender: HashMap<Uuid, Sender<BlockChanges>>,
228 buffer_size: usize,
230}
231
232impl Inner {
236 fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
237 Self {
238 sink,
239 cmd_tx,
240 pending: HashMap::new(),
241 subscriptions: HashMap::new(),
242 sender: HashMap::new(),
243 buffer_size,
244 }
245 }
246
247 #[allow(clippy::result_large_err)]
249 fn new_subscription(
250 &mut self,
251 id: &ExtractorIdentity,
252 ready_tx: oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>,
253 ) -> Result<(), DeltasError> {
254 if self.pending.contains_key(id) {
255 return Err(DeltasError::SubscriptionAlreadyPending);
256 }
257 self.pending
258 .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
259 Ok(())
260 }
261
262 fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
266 if let Some(info) = self.pending.remove(extractor_id) {
267 if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
268 let (tx, rx) = mpsc::channel(self.buffer_size);
269 self.sender.insert(subscription_id, tx);
270 self.subscriptions
271 .insert(subscription_id, SubscriptionInfo::Active);
272 let _ = ready_tx
273 .send(Ok((subscription_id, rx)))
274 .map_err(|_| {
275 warn!(
276 ?extractor_id,
277 ?subscription_id,
278 "Subscriber for has gone away. Ignoring."
279 )
280 });
281 } else {
282 error!(
283 ?extractor_id,
284 ?subscription_id,
285 "Pending subscription was not in the correct state to
286 transition to active. Ignoring!"
287 )
288 }
289 } else {
290 error!(
291 ?extractor_id,
292 ?subscription_id,
293 "Tried to mark an unknown subscription as active. Ignoring!"
294 );
295 }
296 }
297
298 #[allow(clippy::result_large_err)]
300 fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
301 if let Some(sender) = self.sender.get_mut(id) {
302 sender
303 .try_send(msg)
304 .map_err(|e| match e {
305 TrySendError::Full(_) => DeltasError::BufferFull,
306 TrySendError::Closed(_) => {
307 DeltasError::TransportError("The subscriber has gone away".to_string())
308 }
309 })?;
310 }
311 Ok(())
312 }
313
314 fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
319 if let Some(info) = self
320 .subscriptions
321 .get_mut(subscription_id)
322 {
323 if let SubscriptionInfo::Active = info {
324 *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
325 }
326 } else {
327 debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
329 }
330 }
331
332 fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
339 if let Entry::Occupied(e) = self
340 .subscriptions
341 .entry(subscription_id)
342 {
343 let info = e.remove();
344 if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
345 let _ = tx.send(()).map_err(|_| {
346 debug!(?subscription_id, "failed to notify about removed subscription")
347 });
348 self.sender
349 .remove(&subscription_id)
350 .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
351 } else {
352 warn!(?subscription_id, "Subscription ended unexpectedly!");
353 self.sender
354 .remove(&subscription_id)
355 .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
356 }
357 } else {
358 trace!(
363 ?subscription_id,
364 "Received `SubscriptionEnded`, but was never subscribed to it. This is likely a bug!"
365 );
366 }
367
368 Ok(())
369 }
370
371 fn cancel_pending(&mut self, extractor_id: &ExtractorIdentity, error: &WebsocketError) {
372 if let Some(sub_info) = self.pending.remove(extractor_id) {
373 match sub_info {
374 SubscriptionInfo::RequestedSubscription(tx) => {
375 let _ = tx
376 .send(Err(DeltasError::ServerError(
377 format!("Subscription failed: {error}"),
378 error.clone(),
379 )))
380 .map_err(|_| debug!("Cancel pending failed: receiver deallocated!"));
381 }
382 _ => {
383 error!(?extractor_id, "Pending subscription in wrong state")
384 }
385 }
386 } else {
387 debug!(?extractor_id, "Tried cancel on non-existent pending subscription!")
388 }
389 }
390
391 async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
393 self.sink.send(msg).await.map_err(|e| {
394 DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
395 })
396 }
397}
398
399impl WsDeltasClient {
401 #[allow(clippy::result_large_err)]
403 pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
404 let uri = ws_uri
405 .parse::<Uri>()
406 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
407 Ok(Self {
408 uri,
409 auth_key: auth_key.map(|s| s.to_string()),
410 inner: Arc::new(Mutex::new(None)),
411 ws_buffer_size: 128,
412 subscription_buffer_size: 128,
413 conn_notify: Arc::new(Notify::new()),
414 max_reconnects: 5,
415 retry_cooldown: Duration::from_millis(500),
416 dead: Arc::new(AtomicBool::new(false)),
417 })
418 }
419
420 #[allow(clippy::result_large_err)]
422 pub fn new_with_reconnects(
423 ws_uri: &str,
424 auth_key: Option<&str>,
425 max_reconnects: u64,
426 retry_cooldown: Duration,
427 ) -> Result<Self, DeltasError> {
428 let uri = ws_uri
429 .parse::<Uri>()
430 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
431
432 Ok(Self {
433 uri,
434 auth_key: auth_key.map(|s| s.to_string()),
435 inner: Arc::new(Mutex::new(None)),
436 ws_buffer_size: 128,
437 subscription_buffer_size: 128,
438 conn_notify: Arc::new(Notify::new()),
439 max_reconnects,
440 retry_cooldown,
441 dead: Arc::new(AtomicBool::new(false)),
442 })
443 }
444
445 #[cfg(test)]
447 #[allow(clippy::result_large_err)]
448 pub fn new_with_custom_buffers(
449 ws_uri: &str,
450 auth_key: Option<&str>,
451 ws_buffer_size: usize,
452 subscription_buffer_size: usize,
453 ) -> Result<Self, DeltasError> {
454 let uri = ws_uri
455 .parse::<Uri>()
456 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
457 Ok(Self {
458 uri,
459 auth_key: auth_key.map(|s| s.to_string()),
460 inner: Arc::new(Mutex::new(None)),
461 ws_buffer_size,
462 subscription_buffer_size,
463 conn_notify: Arc::new(Notify::new()),
464 max_reconnects: 5,
465 retry_cooldown: Duration::from_millis(0),
466 dead: Arc::new(AtomicBool::new(false)),
467 })
468 }
469
470 async fn is_connected(&self) -> bool {
474 let guard = self.inner.as_ref().lock().await;
475 guard.is_some()
476 }
477
478 async fn ensure_connection(&self) -> Result<(), DeltasError> {
483 if self.dead.load(Ordering::SeqCst) {
484 return Err(DeltasError::NotConnected)
485 };
486 if !self.is_connected().await {
487 self.conn_notify.notified().await;
488 };
489 Ok(())
490 }
491
492 #[instrument(skip(self, msg))]
497 async fn handle_msg(
498 &self,
499 msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
500 ) -> Result<(), DeltasError> {
501 let mut guard = self.inner.lock().await;
502
503 match msg {
504 Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
509 serde_json::Value,
510 >(&text)
511 {
512 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
513 Ok(ws_message) => match ws_message {
514 WebSocketMessage::BlockChanges { subscription_id, deltas } => {
515 Self::handle_block_changes_msg(&mut guard, subscription_id, deltas).await?;
516 }
517 WebSocketMessage::Response(Response::NewSubscription {
518 extractor_id,
519 subscription_id,
520 }) => {
521 info!(?extractor_id, ?subscription_id, "Received a new subscription");
522 let inner = guard
523 .as_mut()
524 .ok_or_else(|| DeltasError::NotConnected)?;
525 inner.mark_active(&extractor_id, subscription_id);
526 }
527 WebSocketMessage::Response(Response::SubscriptionEnded {
528 subscription_id,
529 }) => {
530 info!(?subscription_id, "Received a subscription ended");
531 let inner = guard
532 .as_mut()
533 .ok_or_else(|| DeltasError::NotConnected)?;
534 inner.remove_subscription(subscription_id)?;
535 }
536 WebSocketMessage::Response(Response::Error(error)) => match &error {
537 WebsocketError::ExtractorNotFound(extractor_id) => {
538 let inner = guard
539 .as_mut()
540 .ok_or_else(|| DeltasError::NotConnected)?;
541 inner.cancel_pending(extractor_id, &error);
542 }
543 WebsocketError::SubscriptionNotFound(subscription_id) => {
544 debug!("Received subscription not found, removing subscription");
545 let inner = guard
546 .as_mut()
547 .ok_or_else(|| DeltasError::NotConnected)?;
548 inner.remove_subscription(*subscription_id)?;
549 }
550 WebsocketError::ParseError(raw, e) => {
551 return Err(DeltasError::ServerError(
552 format!(
553 "Server failed to parse client message: {e}, msg: {raw}"
554 ),
555 error.clone(),
556 ))
557 }
558 WebsocketError::CompressionError(subscription_id, e) => {
559 return Err(DeltasError::ServerError(
560 format!(
561 "Server failed to compress message for subscription: {subscription_id}, error: {e}"
562 ),
563 error.clone(),
564 ))
565 }
566 WebsocketError::SubscribeError(extractor_id) => {
567 let inner = guard
568 .as_mut()
569 .ok_or_else(|| DeltasError::NotConnected)?;
570 inner.cancel_pending(extractor_id, &error);
571 }
572 },
573 },
574 Err(e) => {
575 error!(
576 "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
577 e, text
578 );
579 }
580 },
581 Err(e) => {
582 error!(
583 "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
584 e, text
585 );
586 }
587 },
588 Ok(tungstenite::protocol::Message::Binary(data)) => {
589 match zstd::decode_all(data.as_slice()) {
592 Ok(decompressed) => match serde_json::from_slice::<serde_json::Value>(decompressed.as_slice()) {
593 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value.clone()) {
594 Ok(ws_message) => match ws_message {
595 WebSocketMessage::BlockChanges { subscription_id, deltas } => {
596 Self::handle_block_changes_msg(&mut guard, subscription_id, deltas).await?;
597 }
598 _ => {
599 error!(
600 "Received unsupported compressed WebSocketMessage variant. \nMessage: {ws_message:?}",
601 );
602 }
603
604 },
605 Err(e) => {
606 error!(
607 "Failed to deserialize compressed WebSocketMessage: {e}. \nMessage: {value:?}",
608 );
609 }
610 },
611 Err(e) => {
612 error!(
613 "Failed to deserialize compressed message: invalid JSON. {e}",
614 );
615 }
616 },
617 Err(e) => {
618 error!("Failed to decompress zstd data: {}", e);
619 }
620 }
621 },
622 Ok(tungstenite::protocol::Message::Ping(_)) => {
623 let inner = guard
625 .as_mut()
626 .ok_or_else(|| DeltasError::NotConnected)?;
627 if let Err(error) = inner
628 .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
629 .await
630 {
631 debug!(?error, "Failed to send pong!");
632 }
633 }
634 Ok(tungstenite::protocol::Message::Pong(_)) => {
635 }
637 Ok(tungstenite::protocol::Message::Close(_)) => {
638 return Err(DeltasError::ConnectionClosed);
639 }
640 Ok(unknown_msg) => {
641 info!("Received an unknown message type: {:?}", unknown_msg);
642 }
643 Err(error) => {
644 error!(?error, "Websocket error");
645 return Err(match error {
646 tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
647 tungstenite::Error::AlreadyClosed => {
648 warn!("Received AlreadyClosed error which is indicative of a bug!");
649 DeltasError::ConnectionError(Box::new(error))
650 }
651 tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
652 DeltasError::ConnectionError(Box::new(error))
653 }
654 _ => DeltasError::Fatal(error.to_string()),
655 });
656 }
657 };
658 Ok(())
659 }
660
661 async fn handle_block_changes_msg(
662 guard: &mut MutexGuard<'_, Option<Inner>>,
663 subscription_id: Uuid,
664 deltas: BlockChanges,
665 ) -> Result<(), DeltasError> {
666 trace!(?deltas, "Received a block state change, sending to channel");
667 let inner = guard
668 .as_mut()
669 .ok_or_else(|| DeltasError::NotConnected)?;
670 match inner.send(&subscription_id, deltas) {
671 Err(DeltasError::BufferFull) => {
672 error!(?subscription_id, "Buffer full, unsubscribing!");
673 Self::force_unsubscribe(subscription_id, inner).await;
674 }
675 Err(_) => {
676 warn!(?subscription_id, "Receiver for has gone away, unsubscribing!");
677 Self::force_unsubscribe(subscription_id, inner).await;
678 }
679 _ => { }
680 }
681 Ok(())
682 }
683
684 async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
689 if let Some(SubscriptionInfo::RequestedUnsubscription(_)) = inner
691 .subscriptions
692 .get(&subscription_id)
693 {
694 return
695 }
696
697 let (tx, rx) = oneshot::channel();
698 if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
699 warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
700 } else {
701 match tokio::time::timeout(Duration::from_secs(5), rx).await {
703 Ok(_) => {
704 debug!(?subscription_id, "Unsubscribe completed successfully");
705 }
706 Err(_) => {
707 warn!(?subscription_id, "Unsubscribe completion timed out");
708 }
709 }
710 }
711 }
712
713 async fn unsubscribe_inner(
719 inner: &mut Inner,
720 subscription_id: Uuid,
721 ready_tx: oneshot::Sender<()>,
722 ) -> Result<(), DeltasError> {
723 debug!(?subscription_id, "Unsubscribing");
724 inner.end_subscription(&subscription_id, ready_tx);
725 let cmd = Command::Unsubscribe { subscription_id };
726 inner
727 .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
728 |e| {
729 DeltasError::TransportError(format!(
730 "Failed to serialize unsubscribe command: {e}"
731 ))
732 },
733 )?))
734 .await?;
735 Ok(())
736 }
737}
738
739#[async_trait]
740impl DeltasClient for WsDeltasClient {
741 #[instrument(skip(self))]
742 async fn subscribe(
743 &self,
744 extractor_id: ExtractorIdentity,
745 options: SubscriptionOptions,
746 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
747 trace!("Starting subscribe");
748 self.ensure_connection().await?;
749 let (ready_tx, ready_rx) = oneshot::channel();
750 {
751 let mut guard = self.inner.lock().await;
752 let inner = guard
753 .as_mut()
754 .ok_or_else(|| DeltasError::NotConnected)?;
755 trace!("Sending subscribe command");
756 inner.new_subscription(&extractor_id, ready_tx)?;
757 let cmd = Command::Subscribe {
758 extractor_id,
759 include_state: options.include_state,
760 compression: options.compression,
761 };
762 inner
763 .ws_send(tungstenite::protocol::Message::Text(
764 serde_json::to_string(&cmd).map_err(|e| {
765 DeltasError::TransportError(format!(
766 "Failed to serialize subscribe command: {e}"
767 ))
768 })?,
769 ))
770 .await?;
771 }
772 trace!("Waiting for subscription response");
773 let res = ready_rx.await.map_err(|_| {
774 DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
775 })??;
776 trace!("Subscription successful");
777 Ok(res)
778 }
779
780 #[instrument(skip(self))]
781 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
782 self.ensure_connection().await?;
783 let (ready_tx, ready_rx) = oneshot::channel();
784 {
785 let mut guard = self.inner.lock().await;
786 let inner = guard
787 .as_mut()
788 .ok_or_else(|| DeltasError::NotConnected)?;
789
790 WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
791 }
792 ready_rx.await.map_err(|_| {
793 DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
794 })?;
795
796 Ok(())
797 }
798
799 #[instrument(skip(self))]
800 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
801 if self.is_connected().await {
802 return Err(DeltasError::AlreadyConnected);
803 }
804 let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
805 info!(?ws_uri, "Starting TychoWebsocketClient");
806
807 let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
808 {
809 let mut guard = self.inner.as_ref().lock().await;
810 *guard = None;
811 }
812 let this = self.clone();
813 let jh = tokio::spawn(async move {
814 let mut retry_count = 0;
815 let mut result = Err(DeltasError::NotConnected);
816
817 'retry: while retry_count < this.max_reconnects {
818 info!(?ws_uri, retry_count, "Connecting to WebSocket server");
819 if retry_count > 0 {
820 sleep(this.retry_cooldown).await;
821 }
822
823 let mut request_builder = Request::builder()
825 .uri(&ws_uri)
826 .header(SEC_WEBSOCKET_KEY, generate_key())
827 .header(SEC_WEBSOCKET_VERSION, 13)
828 .header(CONNECTION, "Upgrade")
829 .header(UPGRADE, "websocket")
830 .header(
831 HOST,
832 this.uri.host().ok_or_else(|| {
833 DeltasError::UriParsing(
834 ws_uri.clone(),
835 "No host found in tycho url".to_string(),
836 )
837 })?,
838 )
839 .header(
840 USER_AGENT,
841 format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
842 );
843
844 if let Some(ref key) = this.auth_key {
846 request_builder = request_builder.header(AUTHORIZATION, key);
847 }
848
849 let request = request_builder.body(()).map_err(|e| {
850 DeltasError::TransportError(format!("Failed to build connection request: {e}"))
851 })?;
852 let (conn, _) = match connect_async(request).await {
853 Ok(conn) => conn,
854 Err(e) => {
855 retry_count += 1;
857 let mut guard = this.inner.as_ref().lock().await;
858 *guard = None;
859
860 warn!(
861 e = e.to_string(),
862 "Failed to connect to WebSocket server; Reconnecting"
863 );
864 continue 'retry;
865 }
866 };
867
868 let (ws_tx_new, ws_rx_new) = conn.split();
869 {
870 let mut guard = this.inner.as_ref().lock().await;
871 *guard =
872 Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
873 }
874 let mut msg_rx = ws_rx_new.boxed();
875
876 info!("Connection Successful: TychoWebsocketClient started");
877 this.conn_notify.notify_waiters();
878 result = Ok(());
879
880 loop {
881 let res = tokio::select! {
882 msg = msg_rx.next() => match msg {
883 Some(msg) => this.handle_msg(msg).await,
884 None => {
885 warn!("Websocket connection silently closed, giving up!");
889 break 'retry
890 }
891 },
892 _ = cmd_rx.recv() => {break 'retry},
893 };
894 if let Err(error) = res {
895 debug!(?error, "WsError");
896 if matches!(
897 error,
898 DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
899 ) {
900 retry_count += 1;
902 let mut guard = this.inner.as_ref().lock().await;
903 *guard = None;
904
905 warn!(
906 ?error,
907 ?retry_count,
908 "Connection dropped unexpectedly; Reconnecting..."
909 );
910 break;
911 } else {
912 error!(?error, "Fatal error; Exiting");
914 result = Err(error);
915 break 'retry;
916 }
917 }
918 }
919 }
920 debug!(
921 retry_count,
922 max_reconnects=?this.max_reconnects,
923 "Reconnection loop ended"
924 );
925 let mut guard = this.inner.as_ref().lock().await;
927 *guard = None;
928
929 if retry_count >= this.max_reconnects {
931 error!("Max reconnection attempts reached; Exiting");
932 this.dead.store(true, Ordering::SeqCst);
933 this.conn_notify.notify_waiters(); result = Err(DeltasError::ConnectionClosed);
935 }
936
937 result
938 });
939
940 self.conn_notify.notified().await;
941
942 if self.is_connected().await {
943 Ok(jh)
944 } else {
945 Err(DeltasError::NotConnected)
946 }
947 }
948
949 #[instrument(skip(self))]
950 async fn close(&self) -> Result<(), DeltasError> {
951 info!("Closing TychoWebsocketClient");
952 let mut guard = self.inner.lock().await;
953 let inner = guard
954 .as_mut()
955 .ok_or_else(|| DeltasError::NotConnected)?;
956 inner
957 .cmd_tx
958 .send(())
959 .await
960 .map_err(|e| DeltasError::TransportError(e.to_string()))?;
961 Ok(())
962 }
963}
964
965#[cfg(test)]
966mod tests {
967 use std::net::SocketAddr;
968
969 use test_log::test;
970 use tokio::{net::TcpListener, time::timeout};
971 use tycho_common::dto::Chain;
972
973 use super::*;
974
975 #[derive(Clone)]
976 enum ExpectedComm {
977 Receive(u64, tungstenite::protocol::Message),
978 Send(tungstenite::protocol::Message),
979 }
980
981 async fn mock_tycho_ws(
982 messages: &[ExpectedComm],
983 reconnects: usize,
984 ) -> (SocketAddr, JoinHandle<()>) {
985 info!("Starting mock webserver");
986 let server = TcpListener::bind("127.0.0.1:0")
988 .await
989 .expect("localhost bind failed");
990 let addr = server.local_addr().unwrap();
991 let messages = messages.to_vec();
992
993 let jh = tokio::spawn(async move {
994 info!("mock webserver started");
995 for _ in 0..(reconnects + 1) {
996 info!("Awaiting client connections");
997 if let Ok((stream, _)) = server.accept().await {
998 info!("Client connected");
999 let mut websocket = tokio_tungstenite::accept_async(stream)
1000 .await
1001 .unwrap();
1002
1003 info!("Handling messages..");
1004 for c in messages.iter().cloned() {
1005 match c {
1006 ExpectedComm::Receive(t, exp) => {
1007 info!("Awaiting message...");
1008 let msg = timeout(Duration::from_millis(t), websocket.next())
1009 .await
1010 .expect("Receive timeout")
1011 .expect("Stream exhausted")
1012 .expect("Failed to receive message.");
1013 info!("Message received");
1014 assert_eq!(msg, exp)
1015 }
1016 ExpectedComm::Send(data) => {
1017 info!("Sending message");
1018 websocket
1019 .send(data)
1020 .await
1021 .expect("Failed to send message");
1022 info!("Message sent");
1023 }
1024 };
1025 }
1026 info!("Mock communication completed");
1027 sleep(Duration::from_millis(100)).await;
1028 let _ = websocket.close(None).await;
1030 info!("Mock server closed connection");
1031 }
1032 }
1033 info!("mock server ended");
1034 });
1035 (addr, jh)
1036 }
1037
1038 const SUBSCRIBE: &str = r#"
1039 {
1040 "method":"subscribe",
1041 "extractor_id":{
1042 "chain":"ethereum",
1043 "name":"vm:ambient"
1044 },
1045 "include_state": true,
1046 "compression": false
1047 }"#;
1048
1049 const SUBSCRIPTION_CONFIRMATION: &str = r#"
1050 {
1051 "method": "newsubscription",
1052 "extractor_id":{
1053 "chain": "ethereum",
1054 "name": "vm:ambient"
1055 },
1056 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1057 }"#;
1058
1059 const BLOCK_DELTAS: &str = r#"
1060 {
1061 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1062 "deltas": {
1063 "extractor": "vm:ambient",
1064 "chain": "ethereum",
1065 "block": {
1066 "number": 123,
1067 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1068 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1069 "chain": "ethereum",
1070 "ts": "2023-09-14T00:00:00"
1071 },
1072 "finalized_block_height": 0,
1073 "revert": false,
1074 "new_tokens": {},
1075 "account_updates": {
1076 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1077 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1078 "chain": "ethereum",
1079 "slots": {},
1080 "balance": "0x01f4",
1081 "code": "",
1082 "change": "Update"
1083 }
1084 },
1085 "state_updates": {
1086 "component_1": {
1087 "component_id": "component_1",
1088 "updated_attributes": {"attr1": "0x01"},
1089 "deleted_attributes": ["attr2"]
1090 }
1091 },
1092 "new_protocol_components":
1093 { "protocol_1": {
1094 "id": "protocol_1",
1095 "protocol_system": "system_1",
1096 "protocol_type_name": "type_1",
1097 "chain": "ethereum",
1098 "tokens": ["0x01", "0x02"],
1099 "contract_ids": ["0x01", "0x02"],
1100 "static_attributes": {"attr1": "0x01f4"},
1101 "change": "Update",
1102 "creation_tx": "0x01",
1103 "created_at": "2023-09-14T00:00:00"
1104 }
1105 },
1106 "deleted_protocol_components": {},
1107 "component_balances": {
1108 "protocol_1":
1109 {
1110 "0x01": {
1111 "token": "0x01",
1112 "balance": "0x01f4",
1113 "balance_float": 0.0,
1114 "modify_tx": "0x01",
1115 "component_id": "protocol_1"
1116 }
1117 }
1118 },
1119 "account_balances": {
1120 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1121 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1122 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1123 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1124 "balance": "0x01f4",
1125 "modify_tx": "0x01"
1126 }
1127 }
1128 },
1129 "component_tvl": {
1130 "protocol_1": 1000.0
1131 },
1132 "dci_update": {
1133 "new_entrypoints": {},
1134 "new_entrypoint_params": {},
1135 "trace_results": {}
1136 }
1137 }
1138 }
1139 "#;
1140
1141 const UNSUBSCRIBE: &str = r#"
1142 {
1143 "method": "unsubscribe",
1144 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1145 }
1146 "#;
1147
1148 const SUBSCRIPTION_ENDED: &str = r#"
1149 {
1150 "method": "subscriptionended",
1151 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1152 }
1153 "#;
1154
1155 #[tokio::test]
1156 async fn test_uncompressed_subscribe_receive() {
1157 let exp_comm = [
1158 ExpectedComm::Receive(
1159 100,
1160 tungstenite::protocol::Message::Text(
1161 SUBSCRIBE
1162 .to_owned()
1163 .replace(|c: char| c.is_whitespace(), ""),
1164 ),
1165 ),
1166 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1167 SUBSCRIPTION_CONFIRMATION
1168 .to_owned()
1169 .replace(|c: char| c.is_whitespace(), ""),
1170 )),
1171 ExpectedComm::Send(tungstenite::protocol::Message::Text(BLOCK_DELTAS.to_owned())),
1172 ];
1173 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1174
1175 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1176 let jh = client
1177 .connect()
1178 .await
1179 .expect("connect failed");
1180 let (_, mut rx) = timeout(
1181 Duration::from_millis(100),
1182 client.subscribe(
1183 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1184 SubscriptionOptions::new().with_compression(false),
1185 ),
1186 )
1187 .await
1188 .expect("subscription timed out")
1189 .expect("subscription failed");
1190 let _ = timeout(Duration::from_millis(100), rx.recv())
1191 .await
1192 .expect("awaiting message timeout out")
1193 .expect("receiving message failed");
1194 timeout(Duration::from_millis(100), client.close())
1195 .await
1196 .expect("close timed out")
1197 .expect("close failed");
1198 jh.await
1199 .expect("ws loop errored")
1200 .unwrap();
1201 server_thread.await.unwrap();
1202 }
1203
1204 #[tokio::test]
1205 async fn test_compressed_subscribe_receive() {
1206 let compressed_block_deltas = zstd::encode_all(
1207 BLOCK_DELTAS.as_bytes(),
1208 0, )
1210 .expect("Failed to compress block deltas message");
1211
1212 let exp_comm = [
1213 ExpectedComm::Receive(
1214 100,
1215 tungstenite::protocol::Message::Text(
1216 r#"
1217 {
1218 "method":"subscribe",
1219 "extractor_id":{
1220 "chain":"ethereum",
1221 "name":"vm:ambient"
1222 },
1223 "include_state": true,
1224 "compression": true
1225 }"#
1226 .to_owned()
1227 .replace(|c: char| c.is_whitespace(), ""),
1228 ),
1229 ),
1230 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1231 SUBSCRIPTION_CONFIRMATION
1232 .to_owned()
1233 .replace(|c: char| c.is_whitespace(), ""),
1234 )),
1235 ExpectedComm::Send(tungstenite::protocol::Message::Binary(compressed_block_deltas)),
1236 ];
1237 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1238
1239 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1240 let jh = client
1241 .connect()
1242 .await
1243 .expect("connect failed");
1244 let (_, mut rx) = timeout(
1245 Duration::from_millis(100),
1246 client.subscribe(
1247 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1248 SubscriptionOptions::new().with_compression(true),
1249 ),
1250 )
1251 .await
1252 .expect("subscription timed out")
1253 .expect("subscription failed");
1254 let _ = timeout(Duration::from_millis(100), rx.recv())
1255 .await
1256 .expect("awaiting message timeout out")
1257 .expect("receiving message failed");
1258 timeout(Duration::from_millis(100), client.close())
1259 .await
1260 .expect("close timed out")
1261 .expect("close failed");
1262 jh.await
1263 .expect("ws loop errored")
1264 .unwrap();
1265 server_thread.await.unwrap();
1266 }
1267
1268 #[tokio::test]
1269 async fn test_unsubscribe() {
1270 let exp_comm = [
1271 ExpectedComm::Receive(
1272 100,
1273 tungstenite::protocol::Message::Text(
1274 SUBSCRIBE
1275 .to_owned()
1276 .replace(|c: char| c.is_whitespace(), ""),
1277 ),
1278 ),
1279 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1280 SUBSCRIPTION_CONFIRMATION
1281 .to_owned()
1282 .replace(|c: char| c.is_whitespace(), ""),
1283 )),
1284 ExpectedComm::Receive(
1285 100,
1286 tungstenite::protocol::Message::Text(
1287 UNSUBSCRIBE
1288 .to_owned()
1289 .replace(|c: char| c.is_whitespace(), ""),
1290 ),
1291 ),
1292 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1293 SUBSCRIPTION_ENDED
1294 .to_owned()
1295 .replace(|c: char| c.is_whitespace(), ""),
1296 )),
1297 ];
1298 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1299
1300 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1301 let jh = client
1302 .connect()
1303 .await
1304 .expect("connect failed");
1305 let (sub_id, mut rx) = timeout(
1306 Duration::from_millis(100),
1307 client.subscribe(
1308 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1309 SubscriptionOptions::new().with_compression(false),
1310 ),
1311 )
1312 .await
1313 .expect("subscription timed out")
1314 .expect("subscription failed");
1315
1316 timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1317 .await
1318 .expect("unsubscribe timed out")
1319 .expect("unsubscribe failed");
1320 let res = timeout(Duration::from_millis(100), rx.recv())
1321 .await
1322 .expect("awaiting message timeout out");
1323
1324 assert!(res.is_none());
1326
1327 timeout(Duration::from_millis(100), client.close())
1328 .await
1329 .expect("close timed out")
1330 .expect("close failed");
1331 jh.await
1332 .expect("ws loop errored")
1333 .unwrap();
1334 server_thread.await.unwrap();
1335 }
1336
1337 #[tokio::test]
1338 async fn test_subscription_unexpected_end() {
1339 let exp_comm = [
1340 ExpectedComm::Receive(
1341 100,
1342 tungstenite::protocol::Message::Text(
1343 SUBSCRIBE
1344 .to_owned()
1345 .replace(|c: char| c.is_whitespace(), ""),
1346 ),
1347 ),
1348 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1349 SUBSCRIPTION_CONFIRMATION
1350 .to_owned()
1351 .replace(|c: char| c.is_whitespace(), ""),
1352 )),
1353 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1354 SUBSCRIPTION_ENDED
1355 .to_owned()
1356 .replace(|c: char| c.is_whitespace(), ""),
1357 )),
1358 ];
1359 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1360
1361 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1362 let jh = client
1363 .connect()
1364 .await
1365 .expect("connect failed");
1366 let (_, mut rx) = timeout(
1367 Duration::from_millis(100),
1368 client.subscribe(
1369 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1370 SubscriptionOptions::new().with_compression(false),
1371 ),
1372 )
1373 .await
1374 .expect("subscription timed out")
1375 .expect("subscription failed");
1376 let res = timeout(Duration::from_millis(100), rx.recv())
1377 .await
1378 .expect("awaiting message timeout out");
1379
1380 assert!(res.is_none());
1382
1383 timeout(Duration::from_millis(100), client.close())
1384 .await
1385 .expect("close timed out")
1386 .expect("close failed");
1387 jh.await
1388 .expect("ws loop errored")
1389 .unwrap();
1390 server_thread.await.unwrap();
1391 }
1392
1393 #[test_log::test(tokio::test)]
1394 async fn test_reconnect() {
1395 let exp_comm = [
1396 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(SUBSCRIBE.to_owned().replace(|c: char| c.is_whitespace(), "")
1397 )),
1398 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1399 SUBSCRIPTION_CONFIRMATION.to_owned().replace(|c: char| c.is_whitespace(), "")
1400 )),
1401 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1402 {
1403 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1404 "deltas": {
1405 "extractor": "vm:ambient",
1406 "chain": "ethereum",
1407 "block": {
1408 "number": 123,
1409 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1410 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1411 "chain": "ethereum",
1412 "ts": "2023-09-14T00:00:00"
1413 },
1414 "finalized_block_height": 0,
1415 "revert": false,
1416 "new_tokens": {},
1417 "account_updates": {
1418 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1419 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1420 "chain": "ethereum",
1421 "slots": {},
1422 "balance": "0x01f4",
1423 "code": "",
1424 "change": "Update"
1425 }
1426 },
1427 "state_updates": {
1428 "component_1": {
1429 "component_id": "component_1",
1430 "updated_attributes": {"attr1": "0x01"},
1431 "deleted_attributes": ["attr2"]
1432 }
1433 },
1434 "new_protocol_components": {
1435 "protocol_1":
1436 {
1437 "id": "protocol_1",
1438 "protocol_system": "system_1",
1439 "protocol_type_name": "type_1",
1440 "chain": "ethereum",
1441 "tokens": ["0x01", "0x02"],
1442 "contract_ids": ["0x01", "0x02"],
1443 "static_attributes": {"attr1": "0x01f4"},
1444 "change": "Update",
1445 "creation_tx": "0x01",
1446 "created_at": "2023-09-14T00:00:00"
1447 }
1448 },
1449 "deleted_protocol_components": {},
1450 "component_balances": {
1451 "protocol_1": {
1452 "0x01": {
1453 "token": "0x01",
1454 "balance": "0x01f4",
1455 "balance_float": 1000.0,
1456 "modify_tx": "0x01",
1457 "component_id": "protocol_1"
1458 }
1459 }
1460 },
1461 "account_balances": {
1462 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1463 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1464 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1465 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1466 "balance": "0x01f4",
1467 "modify_tx": "0x01"
1468 }
1469 }
1470 },
1471 "component_tvl": {
1472 "protocol_1": 1000.0
1473 },
1474 "dci_update": {
1475 "new_entrypoints": {},
1476 "new_entrypoint_params": {},
1477 "trace_results": {}
1478 }
1479 }
1480 }
1481 "#.to_owned()
1482 ))
1483 ];
1484 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1485 let client = WsDeltasClient::new_with_reconnects(
1486 &format!("ws://{addr}"),
1487 None,
1488 3,
1489 Duration::from_millis(110),
1491 )
1492 .unwrap();
1493
1494 let jh: JoinHandle<Result<(), DeltasError>> = client
1495 .connect()
1496 .await
1497 .expect("connect failed");
1498
1499 for _ in 0..2 {
1500 dbg!("loop");
1501 let (_, mut rx) = timeout(
1502 Duration::from_millis(200),
1503 client.subscribe(
1504 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1505 SubscriptionOptions::new().with_compression(false),
1506 ),
1507 )
1508 .await
1509 .expect("subscription timed out")
1510 .expect("subscription failed");
1511
1512 let _ = timeout(Duration::from_millis(100), rx.recv())
1513 .await
1514 .expect("awaiting message timeout out")
1515 .expect("receiving message failed");
1516
1517 let res = timeout(Duration::from_millis(200), rx.recv())
1519 .await
1520 .expect("awaiting closed connection timeout out");
1521 assert!(res.is_none());
1522 }
1523 let res = jh.await.expect("ws client join failed");
1524 assert!(res.is_err());
1526 server_thread
1527 .await
1528 .expect("ws server loop errored");
1529 }
1530
1531 async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1532 let server = TcpListener::bind("127.0.0.1:0")
1533 .await
1534 .expect("localhost bind failed");
1535 let addr = server.local_addr().unwrap();
1536 let jh = tokio::spawn(async move {
1537 while let Ok((stream, _)) = server.accept().await {
1538 if accept_first {
1539 let stream = tokio_tungstenite::accept_async(stream)
1541 .await
1542 .unwrap();
1543 sleep(Duration::from_millis(10)).await;
1544 drop(stream)
1545 } else {
1546 drop(stream);
1548 }
1549 }
1550 });
1551 (addr, jh)
1552 }
1553
1554 #[test(tokio::test)]
1555 async fn test_subscribe_dead_client_after_max_attempts() {
1556 let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1557 let client = WsDeltasClient::new_with_reconnects(
1558 &format!("ws://{addr}"),
1559 None,
1560 3,
1561 Duration::from_secs(0),
1562 )
1563 .unwrap();
1564
1565 let join_handle = client.connect().await.unwrap();
1566 let handle_res = join_handle.await.unwrap();
1567 assert!(handle_res.is_err());
1568 assert!(!client.is_connected().await);
1569
1570 let subscription_res = timeout(
1571 Duration::from_millis(10),
1572 client.subscribe(
1573 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1574 SubscriptionOptions::new(),
1575 ),
1576 )
1577 .await
1578 .unwrap();
1579 assert!(subscription_res.is_err());
1580 }
1581
1582 #[test(tokio::test)]
1583 async fn test_ws_client_retry_cooldown() {
1584 let start = std::time::Instant::now();
1585 let (addr, _) = mock_bad_connection_tycho_ws(false).await;
1586
1587 let client = WsDeltasClient::new_with_reconnects(
1589 &format!("ws://{addr}"),
1590 None,
1591 3, Duration::from_millis(50), )
1594 .unwrap();
1595
1596 let connect_result = client.connect().await;
1598 let elapsed = start.elapsed();
1599
1600 assert!(connect_result.is_err(), "Expected connection to fail after retries");
1602
1603 assert!(
1605 elapsed >= Duration::from_millis(100),
1606 "Expected at least 100ms elapsed, got {:?}",
1607 elapsed
1608 );
1609
1610 assert!(elapsed < Duration::from_millis(500), "Took too long: {:?}", elapsed);
1612 }
1613
1614 #[test_log::test(tokio::test)]
1615 async fn test_buffer_full_triggers_unsubscribe() {
1616 let exp_comm = {
1618 [
1619 ExpectedComm::Receive(
1621 100,
1622 tungstenite::protocol::Message::Text(
1623 SUBSCRIBE
1624 .to_owned()
1625 .replace(|c: char| c.is_whitespace(), ""),
1626 ),
1627 ),
1628 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1630 SUBSCRIPTION_CONFIRMATION
1631 .to_owned()
1632 .replace(|c: char| c.is_whitespace(), ""),
1633 )),
1634 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1636 r#"
1637 {
1638 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1639 "deltas": {
1640 "extractor": "vm:ambient",
1641 "chain": "ethereum",
1642 "block": {
1643 "number": 123,
1644 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1645 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1646 "chain": "ethereum",
1647 "ts": "2023-09-14T00:00:00"
1648 },
1649 "finalized_block_height": 0,
1650 "revert": false,
1651 "new_tokens": {},
1652 "account_updates": {},
1653 "state_updates": {},
1654 "new_protocol_components": {},
1655 "deleted_protocol_components": {},
1656 "component_balances": {},
1657 "account_balances": {},
1658 "component_tvl": {},
1659 "dci_update": {
1660 "new_entrypoints": {},
1661 "new_entrypoint_params": {},
1662 "trace_results": {}
1663 }
1664 }
1665 }
1666 "#.to_owned()
1667 )),
1668 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1670 r#"
1671 {
1672 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1673 "deltas": {
1674 "extractor": "vm:ambient",
1675 "chain": "ethereum",
1676 "block": {
1677 "number": 124,
1678 "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
1679 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1680 "chain": "ethereum",
1681 "ts": "2023-09-14T00:00:01"
1682 },
1683 "finalized_block_height": 0,
1684 "revert": false,
1685 "new_tokens": {},
1686 "account_updates": {},
1687 "state_updates": {},
1688 "new_protocol_components": {},
1689 "deleted_protocol_components": {},
1690 "component_balances": {},
1691 "account_balances": {},
1692 "component_tvl": {},
1693 "dci_update": {
1694 "new_entrypoints": {},
1695 "new_entrypoint_params": {},
1696 "trace_results": {}
1697 }
1698 }
1699 }
1700 "#.to_owned()
1701 )),
1702 ExpectedComm::Receive(
1704 100,
1705 tungstenite::protocol::Message::Text(
1706 UNSUBSCRIBE
1707 .to_owned()
1708 .replace(|c: char| c.is_whitespace(), ""),
1709 ),
1710 ),
1711 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1713 SUBSCRIPTION_ENDED
1714 .to_owned()
1715 .replace(|c: char| c.is_whitespace(), ""),
1716 )),
1717 ]
1718 };
1719
1720 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1721
1722 let client = WsDeltasClient::new_with_custom_buffers(
1724 &format!("ws://{addr}"),
1725 None,
1726 128, 1, )
1729 .unwrap();
1730
1731 let jh = client
1732 .connect()
1733 .await
1734 .expect("connect failed");
1735
1736 let (_sub_id, mut rx) = timeout(
1737 Duration::from_millis(100),
1738 client.subscribe(
1739 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1740 SubscriptionOptions::new().with_compression(false),
1741 ),
1742 )
1743 .await
1744 .expect("subscription timed out")
1745 .expect("subscription failed");
1746
1747 tokio::time::sleep(Duration::from_millis(100)).await;
1749
1750 let mut received_msgs = Vec::new();
1752
1753 while received_msgs.len() < 3 {
1755 match timeout(Duration::from_millis(200), rx.recv()).await {
1756 Ok(Some(msg)) => {
1757 received_msgs.push(msg);
1758 }
1759 Ok(None) => {
1760 break;
1762 }
1763 Err(_) => {
1764 break;
1766 }
1767 }
1768 }
1769
1770 assert!(
1772 received_msgs.len() <= 1,
1773 "Expected buffer overflow to limit messages to at most 1, got {}",
1774 received_msgs.len()
1775 );
1776
1777 if let Some(first_msg) = received_msgs.first() {
1778 assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
1779 }
1780
1781 drop(rx); tokio::time::sleep(Duration::from_millis(50)).await;
1788
1789 jh.abort();
1791 server_thread.abort();
1792
1793 let _ = jh.await;
1794 let _ = server_thread.await;
1795 }
1796
1797 #[tokio::test]
1798 async fn test_server_error_handling() {
1799 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1800
1801 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1802
1803 let error_response = WebSocketMessage::Response(Response::Error(
1805 WebsocketError::ExtractorNotFound(extractor_id.clone()),
1806 ));
1807 let error_json = serde_json::to_string(&error_response).unwrap();
1808
1809 let exp_comm = [
1810 ExpectedComm::Receive(
1811 100,
1812 tungstenite::protocol::Message::Text(
1813 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":true}"#.to_string()
1814 ),
1815 ),
1816 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1817 ];
1818
1819 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1820
1821 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1822 let jh = client
1823 .connect()
1824 .await
1825 .expect("connect failed");
1826
1827 let result = timeout(
1828 Duration::from_millis(100),
1829 client.subscribe(extractor_id, SubscriptionOptions::new()),
1830 )
1831 .await
1832 .expect("subscription timed out");
1833
1834 assert!(result.is_err());
1836 if let Err(DeltasError::ServerError(msg, _)) = result {
1837 assert!(msg.contains("Subscription failed"));
1838 assert!(msg.contains("Extractor not found"));
1839 } else {
1840 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1841 }
1842
1843 timeout(Duration::from_millis(100), client.close())
1844 .await
1845 .expect("close timed out")
1846 .expect("close failed");
1847 jh.await
1848 .expect("ws loop errored")
1849 .unwrap();
1850 server_thread.await.unwrap();
1851 }
1852
1853 #[test_log::test(tokio::test)]
1854 async fn test_subscription_not_found_error() {
1855 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1857
1858 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1859 let subscription_id = Uuid::new_v4();
1860
1861 let error_response = WebSocketMessage::Response(Response::Error(
1862 WebsocketError::SubscriptionNotFound(subscription_id),
1863 ));
1864 let error_json = serde_json::to_string(&error_response).unwrap();
1865
1866 let exp_comm = [
1867 ExpectedComm::Receive(
1869 100,
1870 tungstenite::protocol::Message::Text(
1871 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":true}"#.to_string()
1872 ),
1873 ),
1874 ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
1875 r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"test_extractor"}},"subscription_id":"{}"}}"#,
1876 subscription_id
1877 ))),
1878 ExpectedComm::Receive(
1880 100,
1881 tungstenite::protocol::Message::Text(format!(
1882 r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
1883 subscription_id
1884 )),
1885 ),
1886 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1888 ];
1889
1890 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1891
1892 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1893 let jh = client
1894 .connect()
1895 .await
1896 .expect("connect failed");
1897
1898 let (received_sub_id, _rx) = timeout(
1900 Duration::from_millis(100),
1901 client.subscribe(extractor_id, SubscriptionOptions::new()),
1902 )
1903 .await
1904 .expect("subscription timed out")
1905 .expect("subscription failed");
1906
1907 assert_eq!(received_sub_id, subscription_id);
1908
1909 let unsubscribe_result =
1911 timeout(Duration::from_millis(100), client.unsubscribe(subscription_id))
1912 .await
1913 .expect("unsubscribe timed out");
1914
1915 unsubscribe_result
1919 .expect("Unsubscribe should succeed even if server says subscription not found");
1920
1921 timeout(Duration::from_millis(100), client.close())
1922 .await
1923 .expect("close timed out")
1924 .expect("close failed");
1925 jh.await
1926 .expect("ws loop errored")
1927 .unwrap();
1928 server_thread.await.unwrap();
1929 }
1930
1931 #[test_log::test(tokio::test)]
1932 async fn test_parse_error_handling() {
1933 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1934
1935 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1936 let error_response = WebSocketMessage::Response(Response::Error(
1937 WebsocketError::ParseError("}2sdf".to_string(), "malformed JSON".to_string()),
1938 ));
1939 let error_json = serde_json::to_string(&error_response).unwrap();
1940
1941 let exp_comm = [
1942 ExpectedComm::Receive(
1944 100,
1945 tungstenite::protocol::Message::Text(
1946 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":true}"#.to_string()
1947 ),
1948 ),
1949 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json))
1950 ];
1951
1952 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1953
1954 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1955 let jh = client
1956 .connect()
1957 .await
1958 .expect("connect failed");
1959
1960 let _ = timeout(
1962 Duration::from_millis(100),
1963 client.subscribe(extractor_id, SubscriptionOptions::new()),
1964 )
1965 .await
1966 .expect("subscription timed out");
1967
1968 let result = jh
1970 .await
1971 .expect("ws loop should complete");
1972 assert!(result.is_err());
1973 if let Err(DeltasError::ServerError(message, _)) = result {
1974 assert!(message.contains("Server failed to parse client message"));
1975 } else {
1976 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1977 }
1978
1979 server_thread.await.unwrap();
1980 }
1981
1982 #[test_log::test(tokio::test)]
1983 async fn test_compression_error_handling() {
1984 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1985
1986 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1987 let subscription_id = Uuid::new_v4();
1988 let error_response = WebSocketMessage::Response(Response::Error(
1989 WebsocketError::CompressionError(subscription_id, "Compression failed".to_string()),
1990 ));
1991 let error_json = serde_json::to_string(&error_response).unwrap();
1992
1993 let exp_comm = [
1994 ExpectedComm::Receive(
1996 100,
1997 tungstenite::protocol::Message::Text(
1998 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":true}"#.to_string()
1999 ),
2000 ),
2001 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json))
2002 ];
2003
2004 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2005
2006 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2007 let jh = client
2008 .connect()
2009 .await
2010 .expect("connect failed");
2011
2012 let _ = timeout(
2014 Duration::from_millis(100),
2015 client.subscribe(extractor_id, SubscriptionOptions::new()),
2016 )
2017 .await
2018 .expect("subscription timed out");
2019
2020 let result = jh
2022 .await
2023 .expect("ws loop should complete");
2024 assert!(result.is_err());
2025 if let Err(DeltasError::ServerError(message, _)) = result {
2026 assert!(message.contains("Server failed to compress message for subscription"));
2027 } else {
2028 panic!("Expected DeltasError::ServerError, got: {:?}", result);
2029 }
2030
2031 server_thread.await.unwrap();
2032 }
2033
2034 #[tokio::test]
2035 async fn test_subscribe_error_handling() {
2036 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
2037
2038 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "failing_extractor");
2039
2040 let error_response = WebSocketMessage::Response(Response::Error(
2041 WebsocketError::SubscribeError(extractor_id.clone()),
2042 ));
2043 let error_json = serde_json::to_string(&error_response).unwrap();
2044
2045 let exp_comm = [
2046 ExpectedComm::Receive(
2047 100,
2048 tungstenite::protocol::Message::Text(
2049 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"failing_extractor"},"include_state":true,"compression":true}"#.to_string()
2050 ),
2051 ),
2052 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2053 ];
2054
2055 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2056
2057 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2058 let jh = client
2059 .connect()
2060 .await
2061 .expect("connect failed");
2062
2063 let result = timeout(
2064 Duration::from_millis(100),
2065 client.subscribe(extractor_id, SubscriptionOptions::new()),
2066 )
2067 .await
2068 .expect("subscription timed out");
2069
2070 assert!(result.is_err());
2072 if let Err(DeltasError::ServerError(msg, _)) = result {
2073 assert!(msg.contains("Subscription failed"));
2074 assert!(msg.contains("Failed to subscribe to extractor"));
2075 } else {
2076 panic!("Expected DeltasError::ServerError, got: {:?}", result);
2077 }
2078
2079 timeout(Duration::from_millis(100), client.close())
2080 .await
2081 .expect("close timed out")
2082 .expect("close failed");
2083 jh.await
2084 .expect("ws loop errored")
2085 .unwrap();
2086 server_thread.await.unwrap();
2087 }
2088
2089 #[tokio::test]
2090 async fn test_cancel_pending_subscription() {
2091 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
2093
2094 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
2095
2096 let error_response = WebSocketMessage::Response(Response::Error(
2097 WebsocketError::ExtractorNotFound(extractor_id.clone()),
2098 ));
2099 let error_json = serde_json::to_string(&error_response).unwrap();
2100
2101 let exp_comm = [
2102 ExpectedComm::Receive(
2103 100,
2104 tungstenite::protocol::Message::Text(
2105 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":true}"#.to_string()
2106 ),
2107 ),
2108 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2109 ];
2110
2111 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2112
2113 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2114 let jh = client
2115 .connect()
2116 .await
2117 .expect("connect failed");
2118
2119 let client_clone = client.clone();
2121 let extractor_id_clone = extractor_id.clone();
2122
2123 let subscription1 = tokio::spawn({
2124 let client_for_spawn = client.clone();
2125 async move {
2126 client_for_spawn
2127 .subscribe(extractor_id, SubscriptionOptions::new())
2128 .await
2129 }
2130 });
2131
2132 let subscription2 = tokio::spawn(async move {
2133 client_clone
2135 .subscribe(extractor_id_clone, SubscriptionOptions::new())
2136 .await
2137 });
2138
2139 let (result1, result2) = tokio::join!(subscription1, subscription2);
2140
2141 let result1 = result1.unwrap();
2142 let result2 = result2.unwrap();
2143
2144 assert!(result1.is_err() || result2.is_err());
2147
2148 if let Err(DeltasError::SubscriptionAlreadyPending) = result2 {
2149 } else if let Err(DeltasError::ServerError(_, _)) = result1 {
2151 } else {
2153 panic!("Expected one SubscriptionAlreadyPending and one ServerError");
2154 }
2155
2156 timeout(Duration::from_millis(100), client.close())
2157 .await
2158 .expect("close timed out")
2159 .expect("close failed");
2160 jh.await
2161 .expect("ws loop errored")
2162 .unwrap();
2163 server_thread.await.unwrap();
2164 }
2165
2166 #[tokio::test]
2167 async fn test_force_unsubscribe_prevents_multiple_calls() {
2168 let subscription_id = Uuid::new_v4();
2172
2173 let exp_comm = [
2174 ExpectedComm::Receive(
2175 100,
2176 tungstenite::protocol::Message::Text(
2177 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"vm:ambient"},"include_state":true,"compression":true}"#.to_string()
2178 ),
2179 ),
2180 ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
2181 r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"vm:ambient"}},"subscription_id":"{}"}}"#,
2182 subscription_id
2183 ))),
2184 ExpectedComm::Receive(
2186 100,
2187 tungstenite::protocol::Message::Text(format!(
2188 r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
2189 subscription_id
2190 )),
2191 ),
2192 ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
2193 r#"{{"method":"subscriptionended","subscription_id":"{}"}}"#,
2194 subscription_id
2195 ))),
2196 ];
2197
2198 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2199
2200 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2201 let jh = client
2202 .connect()
2203 .await
2204 .expect("connect failed");
2205
2206 let (received_sub_id, _rx) = timeout(
2207 Duration::from_millis(100),
2208 client.subscribe(
2209 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
2210 SubscriptionOptions::new(),
2211 ),
2212 )
2213 .await
2214 .expect("subscription timed out")
2215 .expect("subscription failed");
2216
2217 assert_eq!(received_sub_id, subscription_id);
2218
2219 {
2221 let mut inner_guard = client.inner.lock().await;
2222 let inner = inner_guard
2223 .as_mut()
2224 .expect("client should be connected");
2225
2226 WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2228 WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2229 }
2230
2231 tokio::time::sleep(Duration::from_millis(50)).await;
2233
2234 let _ = timeout(Duration::from_millis(100), client.close()).await;
2236
2237 let _ = jh.await;
2239 let _ = server_thread.await;
2240 }
2241}