1use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::{
25 atomic::{AtomicBool, Ordering},
26 Arc,
27 },
28 time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34 header::{
35 AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36 USER_AGENT,
37 },
38 Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44 net::TcpStream,
45 sync::{
46 mpsc::{self, error::TrySendError, Receiver, Sender},
47 oneshot, Mutex, Notify,
48 },
49 task::JoinHandle,
50 time::sleep,
51};
52use tokio_tungstenite::{
53 connect_async,
54 tungstenite::{
55 self,
56 handshake::client::{generate_key, Request},
57 },
58 MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{
62 BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage, WebsocketError,
63};
64use uuid::Uuid;
65
66use crate::TYCHO_SERVER_VERSION;
67
/// Errors reported by the deltas client.
#[derive(Error, Debug)]
pub enum DeltasError {
    /// The websocket URI could not be used. Holds the offending URI and a description of the
    /// problem.
    #[error("Failed to parse URI: {0}. Error: {1}")]
    UriParsing(String, String),

    /// A subscription request for the same extractor is still waiting for the server's answer.
    #[error("The requested subscription is already pending")]
    SubscriptionAlreadyPending,

    /// The server answered with an error. Holds a description and the server's error, which is
    /// also exposed as the error source.
    #[error("The server replied with an error: {0}")]
    ServerError(String, #[source] WebsocketError),

    /// A message could not be serialized or sent, or an internal channel was closed before a
    /// result arrived.
    #[error("{0}")]
    TransportError(String),

    /// The bounded channel of a subscription had no room for another message.
    #[error("The buffer is full!")]
    BufferFull,

    /// No connection state is available for the requested operation.
    #[error("The client is not connected!")]
    NotConnected,

    /// `connect` was called while connection state already exists.
    #[error("The client is already connected!")]
    AlreadyConnected,

    /// The connection was closed by the server, or the client gave up reconnecting.
    #[error("The server closed the connection!")]
    ConnectionClosed,

    /// The websocket layer reported a connection level error.
    #[error("Connection error: {0}")]
    ConnectionError(#[from] Box<tungstenite::Error>),

    /// An error after which the message loop exits instead of reconnecting.
    #[error("Tycho FatalError: {0}")]
    Fatal(String),
}
114
/// Options that tune a single subscription request.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Value of the `include_state` flag; enabled unless changed via `with_state`.
    include_state: bool,
}

impl SubscriptionOptions {
    /// Creates the default options, which have `include_state` enabled.
    pub fn new() -> Self {
        Self { include_state: true }
    }

    /// Consumes the options and returns them with `include_state` set to `val`.
    pub fn with_state(self, val: bool) -> Self {
        Self { include_state: val }
    }
}

impl Default for SubscriptionOptions {
    /// Same as [`SubscriptionOptions::new`].
    fn default() -> Self {
        Self::new()
    }
}
135
/// Interface of a client that delivers [`BlockChanges`] for extractor subscriptions.
///
/// In test builds the `automock` attribute is applied to this trait.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait DeltasClient {
    /// Subscribes to the extractor identified by `extractor_id`, using `options`.
    ///
    /// On success returns the id of the new subscription together with the receiving end of
    /// the channel on which its block changes are delivered.
    async fn subscribe(
        &self,
        extractor_id: ExtractorIdentity,
        options: SubscriptionOptions,
    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;

    /// Ends the subscription identified by `subscription_id`.
    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;

    /// Connects the client.
    ///
    /// On success returns the handle of a spawned task; awaiting the handle yields that task's
    /// own result.
    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;

    /// Closes the client.
    async fn close(&self) -> Result<(), DeltasError>;
}
160
/// Websocket based implementation of [`DeltasClient`].
///
/// Clones share the connection state, the connection notifier and the `dead` flag through
/// `Arc`s.
#[derive(Clone)]
pub struct WsDeltasClient {
    /// Base URI of the server; the server version and `/ws` are appended when connecting.
    uri: Uri,
    /// Optional value sent in the `Authorization` header of the handshake request.
    auth_key: Option<String>,
    /// Upper bound for the retry counter of the connection loop.
    max_reconnects: u64,
    /// Pause before every connection attempt except the first one.
    retry_cooldown: Duration,
    /// Capacity of the command channel that `close` uses to stop the message loop.
    ws_buffer_size: usize,
    /// Capacity of the channel created for each activated subscription.
    subscription_buffer_size: usize,
    /// Woken when a connection was established and when the client gave up reconnecting.
    conn_notify: Arc<Notify>,
    /// State of the current connection; `None` while there is no connection.
    inner: Arc<Mutex<Option<Inner>>>,
    /// Set once the maximum number of reconnection attempts has been reached.
    dead: Arc<AtomicBool>,
}
184
/// Write half of the (optionally TLS wrapped) websocket stream, obtained by splitting it.
type WebSocketSink =
    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
187
/// Lifecycle state of a subscription as tracked by the client.
#[derive(Debug)]
enum SubscriptionInfo {
    /// A subscribe command was issued; the sender delivers the outcome (subscription id and
    /// message receiver, or an error) to the waiting caller.
    RequestedSubscription(oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>),
    /// The server confirmed the subscription.
    Active,
    /// An unsubscribe was requested; the sender is signalled once the subscription is removed.
    RequestedUnsubscription(oneshot::Sender<()>),
}
208
/// State that belongs to one established websocket connection.
struct Inner {
    /// Write half of the websocket.
    sink: WebSocketSink,
    /// Sender of the command channel; `close` sends on it to stop the message loop.
    cmd_tx: Sender<()>,
    /// Subscriptions that were requested but not yet confirmed, keyed by extractor.
    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
    /// Confirmed subscriptions and their state, keyed by subscription id.
    subscriptions: HashMap<Uuid, SubscriptionInfo>,
    /// Channel senders used to forward block changes, keyed by subscription id.
    sender: HashMap<Uuid, Sender<BlockChanges>>,
    /// Capacity used when creating the channel of a newly activated subscription.
    buffer_size: usize,
}
225
226impl Inner {
230 fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
231 Self {
232 sink,
233 cmd_tx,
234 pending: HashMap::new(),
235 subscriptions: HashMap::new(),
236 sender: HashMap::new(),
237 buffer_size,
238 }
239 }
240
241 #[allow(clippy::result_large_err)]
243 fn new_subscription(
244 &mut self,
245 id: &ExtractorIdentity,
246 ready_tx: oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>,
247 ) -> Result<(), DeltasError> {
248 if self.pending.contains_key(id) {
249 return Err(DeltasError::SubscriptionAlreadyPending);
250 }
251 self.pending
252 .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
253 Ok(())
254 }
255
256 fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
260 if let Some(info) = self.pending.remove(extractor_id) {
261 if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
262 let (tx, rx) = mpsc::channel(self.buffer_size);
263 self.sender.insert(subscription_id, tx);
264 self.subscriptions
265 .insert(subscription_id, SubscriptionInfo::Active);
266 let _ = ready_tx
267 .send(Ok((subscription_id, rx)))
268 .map_err(|_| {
269 warn!(
270 ?extractor_id,
271 ?subscription_id,
272 "Subscriber for has gone away. Ignoring."
273 )
274 });
275 } else {
276 error!(
277 ?extractor_id,
278 ?subscription_id,
279 "Pending subscription was not in the correct state to
280 transition to active. Ignoring!"
281 )
282 }
283 } else {
284 error!(
285 ?extractor_id,
286 ?subscription_id,
287 "Tried to mark an unknown subscription as active. Ignoring!"
288 );
289 }
290 }
291
292 #[allow(clippy::result_large_err)]
294 fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
295 if let Some(sender) = self.sender.get_mut(id) {
296 sender
297 .try_send(msg)
298 .map_err(|e| match e {
299 TrySendError::Full(_) => DeltasError::BufferFull,
300 TrySendError::Closed(_) => {
301 DeltasError::TransportError("The subscriber has gone away".to_string())
302 }
303 })?;
304 }
305 Ok(())
306 }
307
308 fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
313 if let Some(info) = self
314 .subscriptions
315 .get_mut(subscription_id)
316 {
317 if let SubscriptionInfo::Active = info {
318 *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
319 }
320 } else {
321 debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
323 }
324 }
325
326 fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
333 if let Entry::Occupied(e) = self
334 .subscriptions
335 .entry(subscription_id)
336 {
337 let info = e.remove();
338 if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
339 let _ = tx.send(()).map_err(|_| {
340 debug!(?subscription_id, "failed to notify about removed subscription")
341 });
342 self.sender
343 .remove(&subscription_id)
344 .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
345 } else {
346 warn!(?subscription_id, "Subscription ended unexpectedly!");
347 self.sender
348 .remove(&subscription_id)
349 .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
350 }
351 } else {
352 trace!(
357 ?subscription_id,
358 "Received `SubscriptionEnded`, but was never subscribed to it. This is likely a bug!"
359 );
360 }
361
362 Ok(())
363 }
364
365 fn cancel_pending(&mut self, extractor_id: &ExtractorIdentity, error: &WebsocketError) {
366 if let Some(sub_info) = self.pending.remove(extractor_id) {
367 match sub_info {
368 SubscriptionInfo::RequestedSubscription(tx) => {
369 let _ = tx
370 .send(Err(DeltasError::ServerError(
371 format!("Subscription failed: {error}"),
372 error.clone(),
373 )))
374 .map_err(|_| debug!("Cancel pending failed: receiver deallocated!"));
375 }
376 _ => {
377 error!(?extractor_id, "Pending subscription in wrong state")
378 }
379 }
380 } else {
381 debug!(?extractor_id, "Tried cancel on non-existent pending subscription!")
382 }
383 }
384
385 async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
387 self.sink.send(msg).await.map_err(|e| {
388 DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
389 })
390 }
391}
392
393impl WsDeltasClient {
395 #[allow(clippy::result_large_err)]
397 pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
398 let uri = ws_uri
399 .parse::<Uri>()
400 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
401 Ok(Self {
402 uri,
403 auth_key: auth_key.map(|s| s.to_string()),
404 inner: Arc::new(Mutex::new(None)),
405 ws_buffer_size: 128,
406 subscription_buffer_size: 128,
407 conn_notify: Arc::new(Notify::new()),
408 max_reconnects: 5,
409 retry_cooldown: Duration::from_millis(500),
410 dead: Arc::new(AtomicBool::new(false)),
411 })
412 }
413
414 #[allow(clippy::result_large_err)]
416 pub fn new_with_reconnects(
417 ws_uri: &str,
418 auth_key: Option<&str>,
419 max_reconnects: u64,
420 retry_cooldown: Duration,
421 ) -> Result<Self, DeltasError> {
422 let uri = ws_uri
423 .parse::<Uri>()
424 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
425
426 Ok(Self {
427 uri,
428 auth_key: auth_key.map(|s| s.to_string()),
429 inner: Arc::new(Mutex::new(None)),
430 ws_buffer_size: 128,
431 subscription_buffer_size: 128,
432 conn_notify: Arc::new(Notify::new()),
433 max_reconnects,
434 retry_cooldown,
435 dead: Arc::new(AtomicBool::new(false)),
436 })
437 }
438
439 #[cfg(test)]
441 #[allow(clippy::result_large_err)]
442 pub fn new_with_custom_buffers(
443 ws_uri: &str,
444 auth_key: Option<&str>,
445 ws_buffer_size: usize,
446 subscription_buffer_size: usize,
447 ) -> Result<Self, DeltasError> {
448 let uri = ws_uri
449 .parse::<Uri>()
450 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
451 Ok(Self {
452 uri,
453 auth_key: auth_key.map(|s| s.to_string()),
454 inner: Arc::new(Mutex::new(None)),
455 ws_buffer_size,
456 subscription_buffer_size,
457 conn_notify: Arc::new(Notify::new()),
458 max_reconnects: 5,
459 retry_cooldown: Duration::from_millis(0),
460 dead: Arc::new(AtomicBool::new(false)),
461 })
462 }
463
464 async fn is_connected(&self) -> bool {
468 let guard = self.inner.as_ref().lock().await;
469 guard.is_some()
470 }
471
472 async fn ensure_connection(&self) -> Result<(), DeltasError> {
477 if self.dead.load(Ordering::SeqCst) {
478 return Err(DeltasError::NotConnected)
479 };
480 if !self.is_connected().await {
481 self.conn_notify.notified().await;
482 };
483 Ok(())
484 }
485
486 #[instrument(skip(self, msg))]
491 async fn handle_msg(
492 &self,
493 msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
494 ) -> Result<(), DeltasError> {
495 let mut guard = self.inner.lock().await;
496
497 match msg {
498 Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
503 serde_json::Value,
504 >(&text)
505 {
506 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
507 Ok(ws_message) => match ws_message {
508 WebSocketMessage::BlockChanges { subscription_id, deltas } => {
509 trace!(?deltas, "Received a block state change, sending to channel");
510 let inner = guard
511 .as_mut()
512 .ok_or_else(|| DeltasError::NotConnected)?;
513 match inner.send(&subscription_id, deltas) {
514 Err(DeltasError::BufferFull) => {
515 error!(?subscription_id, "Buffer full, unsubscribing!");
516 Self::force_unsubscribe(subscription_id, inner).await;
517 }
518 Err(_) => {
519 warn!(
520 ?subscription_id,
521 "Receiver for has gone away, unsubscribing!"
522 );
523 Self::force_unsubscribe(subscription_id, inner).await;
524 }
525 _ => { }
526 }
527 }
528 WebSocketMessage::Response(Response::NewSubscription {
529 extractor_id,
530 subscription_id,
531 }) => {
532 info!(?extractor_id, ?subscription_id, "Received a new subscription");
533 let inner = guard
534 .as_mut()
535 .ok_or_else(|| DeltasError::NotConnected)?;
536 inner.mark_active(&extractor_id, subscription_id);
537 }
538 WebSocketMessage::Response(Response::SubscriptionEnded {
539 subscription_id,
540 }) => {
541 info!(?subscription_id, "Received a subscription ended");
542 let inner = guard
543 .as_mut()
544 .ok_or_else(|| DeltasError::NotConnected)?;
545 inner.remove_subscription(subscription_id)?;
546 }
547 WebSocketMessage::Response(Response::Error(error)) => match &error {
548 WebsocketError::ExtractorNotFound(extractor_id) => {
549 let inner = guard
550 .as_mut()
551 .ok_or_else(|| DeltasError::NotConnected)?;
552 inner.cancel_pending(extractor_id, &error);
553 }
554 WebsocketError::SubscriptionNotFound(subscription_id) => {
555 debug!("Received subscription not found, removing subscription");
556 let inner = guard
557 .as_mut()
558 .ok_or_else(|| DeltasError::NotConnected)?;
559 inner.remove_subscription(*subscription_id)?;
560 }
561 WebsocketError::ParseError(raw, e) => {
562 return Err(DeltasError::ServerError(
563 format!(
564 "Server failed to parse client message: {e}, msg: {raw}"
565 ),
566 error.clone(),
567 ))
568 }
569 WebsocketError::SubscribeError(extractor_id) => {
570 let inner = guard
571 .as_mut()
572 .ok_or_else(|| DeltasError::NotConnected)?;
573 inner.cancel_pending(extractor_id, &error);
574 }
575 },
576 },
577 Err(e) => {
578 error!(
579 "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
580 e, text
581 );
582 }
583 },
584 Err(e) => {
585 error!(
586 "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
587 e, text
588 );
589 }
590 },
591 Ok(tungstenite::protocol::Message::Ping(_)) => {
592 let inner = guard
594 .as_mut()
595 .ok_or_else(|| DeltasError::NotConnected)?;
596 if let Err(error) = inner
597 .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
598 .await
599 {
600 debug!(?error, "Failed to send pong!");
601 }
602 }
603 Ok(tungstenite::protocol::Message::Pong(_)) => {
604 }
606 Ok(tungstenite::protocol::Message::Close(_)) => {
607 return Err(DeltasError::ConnectionClosed);
608 }
609 Ok(unknown_msg) => {
610 info!("Received an unknown message type: {:?}", unknown_msg);
611 }
612 Err(error) => {
613 error!(?error, "Websocket error");
614 return Err(match error {
615 tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
616 tungstenite::Error::AlreadyClosed => {
617 warn!("Received AlreadyClosed error which is indicative of a bug!");
618 DeltasError::ConnectionError(Box::new(error))
619 }
620 tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
621 DeltasError::ConnectionError(Box::new(error))
622 }
623 _ => DeltasError::Fatal(error.to_string()),
624 });
625 }
626 };
627 Ok(())
628 }
629
630 async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
635 if let Some(SubscriptionInfo::RequestedUnsubscription(_)) = inner
637 .subscriptions
638 .get(&subscription_id)
639 {
640 return
641 }
642
643 let (tx, rx) = oneshot::channel();
644 if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
645 warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
646 } else {
647 match tokio::time::timeout(Duration::from_secs(5), rx).await {
649 Ok(_) => {
650 debug!(?subscription_id, "Unsubscribe completed successfully");
651 }
652 Err(_) => {
653 warn!(?subscription_id, "Unsubscribe completion timed out");
654 }
655 }
656 }
657 }
658
659 async fn unsubscribe_inner(
665 inner: &mut Inner,
666 subscription_id: Uuid,
667 ready_tx: oneshot::Sender<()>,
668 ) -> Result<(), DeltasError> {
669 debug!(?subscription_id, "Unsubscribing");
670 inner.end_subscription(&subscription_id, ready_tx);
671 let cmd = Command::Unsubscribe { subscription_id };
672 inner
673 .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
674 |e| {
675 DeltasError::TransportError(format!(
676 "Failed to serialize unsubscribe command: {e}"
677 ))
678 },
679 )?))
680 .await?;
681 Ok(())
682 }
683}
684
685#[async_trait]
686impl DeltasClient for WsDeltasClient {
687 #[instrument(skip(self))]
688 async fn subscribe(
689 &self,
690 extractor_id: ExtractorIdentity,
691 options: SubscriptionOptions,
692 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
693 trace!("Starting subscribe");
694 self.ensure_connection().await?;
695 let (ready_tx, ready_rx) = oneshot::channel();
696 {
697 let mut guard = self.inner.lock().await;
698 let inner = guard
699 .as_mut()
700 .ok_or_else(|| DeltasError::NotConnected)?;
701 trace!("Sending subscribe command");
702 inner.new_subscription(&extractor_id, ready_tx)?;
703 let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
704 inner
705 .ws_send(tungstenite::protocol::Message::Text(
706 serde_json::to_string(&cmd).map_err(|e| {
707 DeltasError::TransportError(format!(
708 "Failed to serialize subscribe command: {e}"
709 ))
710 })?,
711 ))
712 .await?;
713 }
714 trace!("Waiting for subscription response");
715 let res = ready_rx.await.map_err(|_| {
716 DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
717 })??;
718 trace!("Subscription successful");
719 Ok(res)
720 }
721
722 #[instrument(skip(self))]
723 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
724 self.ensure_connection().await?;
725 let (ready_tx, ready_rx) = oneshot::channel();
726 {
727 let mut guard = self.inner.lock().await;
728 let inner = guard
729 .as_mut()
730 .ok_or_else(|| DeltasError::NotConnected)?;
731
732 WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
733 }
734 ready_rx.await.map_err(|_| {
735 DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
736 })?;
737
738 Ok(())
739 }
740
741 #[instrument(skip(self))]
742 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
743 if self.is_connected().await {
744 return Err(DeltasError::AlreadyConnected);
745 }
746 let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
747 info!(?ws_uri, "Starting TychoWebsocketClient");
748
749 let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
750 {
751 let mut guard = self.inner.as_ref().lock().await;
752 *guard = None;
753 }
754 let this = self.clone();
755 let jh = tokio::spawn(async move {
756 let mut retry_count = 0;
757 let mut result = Err(DeltasError::NotConnected);
758
759 'retry: while retry_count < this.max_reconnects {
760 info!(?ws_uri, retry_count, "Connecting to WebSocket server");
761 if retry_count > 0 {
762 sleep(this.retry_cooldown).await;
763 }
764
765 let mut request_builder = Request::builder()
767 .uri(&ws_uri)
768 .header(SEC_WEBSOCKET_KEY, generate_key())
769 .header(SEC_WEBSOCKET_VERSION, 13)
770 .header(CONNECTION, "Upgrade")
771 .header(UPGRADE, "websocket")
772 .header(
773 HOST,
774 this.uri.host().ok_or_else(|| {
775 DeltasError::UriParsing(
776 ws_uri.clone(),
777 "No host found in tycho url".to_string(),
778 )
779 })?,
780 )
781 .header(
782 USER_AGENT,
783 format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
784 );
785
786 if let Some(ref key) = this.auth_key {
788 request_builder = request_builder.header(AUTHORIZATION, key);
789 }
790
791 let request = request_builder.body(()).map_err(|e| {
792 DeltasError::TransportError(format!("Failed to build connection request: {e}"))
793 })?;
794 let (conn, _) = match connect_async(request).await {
795 Ok(conn) => conn,
796 Err(e) => {
797 retry_count += 1;
799 let mut guard = this.inner.as_ref().lock().await;
800 *guard = None;
801
802 warn!(
803 e = e.to_string(),
804 "Failed to connect to WebSocket server; Reconnecting"
805 );
806 continue 'retry;
807 }
808 };
809
810 let (ws_tx_new, ws_rx_new) = conn.split();
811 {
812 let mut guard = this.inner.as_ref().lock().await;
813 *guard =
814 Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
815 }
816 let mut msg_rx = ws_rx_new.boxed();
817
818 info!("Connection Successful: TychoWebsocketClient started");
819 this.conn_notify.notify_waiters();
820 result = Ok(());
821
822 loop {
823 let res = tokio::select! {
824 msg = msg_rx.next() => match msg {
825 Some(msg) => this.handle_msg(msg).await,
826 None => {
827 warn!("Websocket connection silently closed, giving up!");
831 break 'retry
832 }
833 },
834 _ = cmd_rx.recv() => {break 'retry},
835 };
836 if let Err(error) = res {
837 debug!(?error, "WsError");
838 if matches!(
839 error,
840 DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
841 ) {
842 retry_count += 1;
844 let mut guard = this.inner.as_ref().lock().await;
845 *guard = None;
846
847 warn!(
848 ?error,
849 ?retry_count,
850 "Connection dropped unexpectedly; Reconnecting..."
851 );
852 break;
853 } else {
854 error!(?error, "Fatal error; Exiting");
856 result = Err(error);
857 break 'retry;
858 }
859 }
860 }
861 }
862 debug!(
863 retry_count,
864 max_reconnects=?this.max_reconnects,
865 "Reconnection loop ended"
866 );
867 let mut guard = this.inner.as_ref().lock().await;
869 *guard = None;
870
871 if retry_count >= this.max_reconnects {
873 error!("Max reconnection attempts reached; Exiting");
874 this.dead.store(true, Ordering::SeqCst);
875 this.conn_notify.notify_waiters(); result = Err(DeltasError::ConnectionClosed);
877 }
878
879 result
880 });
881
882 self.conn_notify.notified().await;
883
884 if self.is_connected().await {
885 Ok(jh)
886 } else {
887 Err(DeltasError::NotConnected)
888 }
889 }
890
891 #[instrument(skip(self))]
892 async fn close(&self) -> Result<(), DeltasError> {
893 info!("Closing TychoWebsocketClient");
894 let mut guard = self.inner.lock().await;
895 let inner = guard
896 .as_mut()
897 .ok_or_else(|| DeltasError::NotConnected)?;
898 inner
899 .cmd_tx
900 .send(())
901 .await
902 .map_err(|e| DeltasError::TransportError(e.to_string()))?;
903 Ok(())
904 }
905}
906
907#[cfg(test)]
908mod tests {
909 use std::net::SocketAddr;
910
911 use test_log::test;
912 use tokio::{net::TcpListener, time::timeout};
913 use tycho_common::dto::Chain;
914
915 use super::*;
916
    /// One step of the scripted conversation played by the mock server.
    #[derive(Clone)]
    enum ExpectedComm {
        /// The server must receive exactly this message within the given number of
        /// milliseconds.
        Receive(u64, tungstenite::protocol::Message),
        /// The server sends this message to the client.
        Send(tungstenite::protocol::Message),
    }
922
    /// Starts a mock websocket server that plays back the scripted `messages`.
    ///
    /// The server accepts `reconnects + 1` connections one after another and runs the whole
    /// script for each of them. A `Receive` step panics if the message does not arrive in
    /// time or differs from the expected one. After the script the server waits 100
    /// milliseconds and closes the connection.
    ///
    /// Returns the address the server listens on and the handle of the server task.
    async fn mock_tycho_ws(
        messages: &[ExpectedComm],
        reconnects: usize,
    ) -> (SocketAddr, JoinHandle<()>) {
        info!("Starting mock webserver");
        // Port 0 requests an unused port; the actual address is read back from the listener.
        let server = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("localhost bind failed");
        let addr = server.local_addr().unwrap();
        // Own the script so it can be moved into the spawned task.
        let messages = messages.to_vec();

        let jh = tokio::spawn(async move {
            info!("mock webserver started");
            for _ in 0..(reconnects + 1) {
                info!("Awaiting client connections");
                if let Ok((stream, _)) = server.accept().await {
                    info!("Client connected");
                    let mut websocket = tokio_tungstenite::accept_async(stream)
                        .await
                        .unwrap();

                    info!("Handling messages..");
                    for c in messages.iter().cloned() {
                        match c {
                            ExpectedComm::Receive(t, exp) => {
                                info!("Awaiting message...");
                                let msg = timeout(Duration::from_millis(t), websocket.next())
                                    .await
                                    .expect("Receive timeout")
                                    .expect("Stream exhausted")
                                    .expect("Failed to receive message.");
                                info!("Message received");
                                assert_eq!(msg, exp)
                            }
                            ExpectedComm::Send(data) => {
                                info!("Sending message");
                                websocket
                                    .send(data)
                                    .await
                                    .expect("Failed to send message");
                                info!("Message sent");
                            }
                        };
                    }
                    info!("Mock communication completed");
                    sleep(Duration::from_millis(100)).await;
                    // The result of closing is ignored.
                    let _ = websocket.close(None).await;
                    info!("Mock server closed connection");
                }
            }
            info!("mock server ended");
        });
        (addr, jh)
    }
979
980 #[tokio::test]
981 async fn test_subscribe_receive() {
982 let exp_comm = [
983 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
984 {
985 "method":"subscribe",
986 "extractor_id":{
987 "chain":"ethereum",
988 "name":"vm:ambient"
989 },
990 "include_state": true
991 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
992 )),
993 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
994 {
995 "method":"newsubscription",
996 "extractor_id":{
997 "chain":"ethereum",
998 "name":"vm:ambient"
999 },
1000 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1001 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1002 )),
1003 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1004 {
1005 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1006 "deltas": {
1007 "extractor": "vm:ambient",
1008 "chain": "ethereum",
1009 "block": {
1010 "number": 123,
1011 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1012 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1013 "chain": "ethereum",
1014 "ts": "2023-09-14T00:00:00"
1015 },
1016 "finalized_block_height": 0,
1017 "revert": false,
1018 "new_tokens": {},
1019 "account_updates": {
1020 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1021 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1022 "chain": "ethereum",
1023 "slots": {},
1024 "balance": "0x01f4",
1025 "code": "",
1026 "change": "Update"
1027 }
1028 },
1029 "state_updates": {
1030 "component_1": {
1031 "component_id": "component_1",
1032 "updated_attributes": {"attr1": "0x01"},
1033 "deleted_attributes": ["attr2"]
1034 }
1035 },
1036 "new_protocol_components":
1037 { "protocol_1": {
1038 "id": "protocol_1",
1039 "protocol_system": "system_1",
1040 "protocol_type_name": "type_1",
1041 "chain": "ethereum",
1042 "tokens": ["0x01", "0x02"],
1043 "contract_ids": ["0x01", "0x02"],
1044 "static_attributes": {"attr1": "0x01f4"},
1045 "change": "Update",
1046 "creation_tx": "0x01",
1047 "created_at": "2023-09-14T00:00:00"
1048 }
1049 },
1050 "deleted_protocol_components": {},
1051 "component_balances": {
1052 "protocol_1":
1053 {
1054 "0x01": {
1055 "token": "0x01",
1056 "balance": "0x01f4",
1057 "balance_float": 0.0,
1058 "modify_tx": "0x01",
1059 "component_id": "protocol_1"
1060 }
1061 }
1062 },
1063 "account_balances": {
1064 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1065 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1066 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1067 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1068 "balance": "0x01f4",
1069 "modify_tx": "0x01"
1070 }
1071 }
1072 },
1073 "component_tvl": {
1074 "protocol_1": 1000.0
1075 },
1076 "dci_update": {
1077 "new_entrypoints": {},
1078 "new_entrypoint_params": {},
1079 "trace_results": {}
1080 }
1081 }
1082 }
1083 "#.to_owned()
1084 ))
1085 ];
1086 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1087
1088 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1089 let jh = client
1090 .connect()
1091 .await
1092 .expect("connect failed");
1093 let (_, mut rx) = timeout(
1094 Duration::from_millis(100),
1095 client.subscribe(
1096 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1097 SubscriptionOptions::new(),
1098 ),
1099 )
1100 .await
1101 .expect("subscription timed out")
1102 .expect("subscription failed");
1103 let _ = timeout(Duration::from_millis(100), rx.recv())
1104 .await
1105 .expect("awaiting message timeout out")
1106 .expect("receiving message failed");
1107 timeout(Duration::from_millis(100), client.close())
1108 .await
1109 .expect("close timed out")
1110 .expect("close failed");
1111 jh.await
1112 .expect("ws loop errored")
1113 .unwrap();
1114 server_thread.await.unwrap();
1115 }
1116
1117 #[tokio::test]
1118 async fn test_unsubscribe() {
1119 let exp_comm = [
1120 ExpectedComm::Receive(
1121 100,
1122 tungstenite::protocol::Message::Text(
1123 r#"
1124 {
1125 "method": "subscribe",
1126 "extractor_id":{
1127 "chain": "ethereum",
1128 "name": "vm:ambient"
1129 },
1130 "include_state": true
1131 }"#
1132 .to_owned()
1133 .replace(|c: char| c.is_whitespace(), ""),
1134 ),
1135 ),
1136 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1137 r#"
1138 {
1139 "method": "newsubscription",
1140 "extractor_id":{
1141 "chain": "ethereum",
1142 "name": "vm:ambient"
1143 },
1144 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1145 }"#
1146 .to_owned()
1147 .replace(|c: char| c.is_whitespace(), ""),
1148 )),
1149 ExpectedComm::Receive(
1150 100,
1151 tungstenite::protocol::Message::Text(
1152 r#"
1153 {
1154 "method": "unsubscribe",
1155 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1156 }
1157 "#
1158 .to_owned()
1159 .replace(|c: char| c.is_whitespace(), ""),
1160 ),
1161 ),
1162 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1163 r#"
1164 {
1165 "method": "subscriptionended",
1166 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1167 }
1168 "#
1169 .to_owned()
1170 .replace(|c: char| c.is_whitespace(), ""),
1171 )),
1172 ];
1173 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1174
1175 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1176 let jh = client
1177 .connect()
1178 .await
1179 .expect("connect failed");
1180 let (sub_id, mut rx) = timeout(
1181 Duration::from_millis(100),
1182 client.subscribe(
1183 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1184 SubscriptionOptions::new(),
1185 ),
1186 )
1187 .await
1188 .expect("subscription timed out")
1189 .expect("subscription failed");
1190
1191 timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1192 .await
1193 .expect("unsubscribe timed out")
1194 .expect("unsubscribe failed");
1195 let res = timeout(Duration::from_millis(100), rx.recv())
1196 .await
1197 .expect("awaiting message timeout out");
1198
1199 assert!(res.is_none());
1201
1202 timeout(Duration::from_millis(100), client.close())
1203 .await
1204 .expect("close timed out")
1205 .expect("close failed");
1206 jh.await
1207 .expect("ws loop errored")
1208 .unwrap();
1209 server_thread.await.unwrap();
1210 }
1211
1212 #[tokio::test]
1213 async fn test_subscription_unexpected_end() {
1214 let exp_comm = [
1215 ExpectedComm::Receive(
1216 100,
1217 tungstenite::protocol::Message::Text(
1218 r#"
1219 {
1220 "method":"subscribe",
1221 "extractor_id":{
1222 "chain":"ethereum",
1223 "name":"vm:ambient"
1224 },
1225 "include_state": true
1226 }"#
1227 .to_owned()
1228 .replace(|c: char| c.is_whitespace(), ""),
1229 ),
1230 ),
1231 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1232 r#"
1233 {
1234 "method":"newsubscription",
1235 "extractor_id":{
1236 "chain":"ethereum",
1237 "name":"vm:ambient"
1238 },
1239 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1240 }"#
1241 .to_owned()
1242 .replace(|c: char| c.is_whitespace(), ""),
1243 )),
1244 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1245 r#"
1246 {
1247 "method": "subscriptionended",
1248 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1249 }"#
1250 .to_owned()
1251 .replace(|c: char| c.is_whitespace(), ""),
1252 )),
1253 ];
1254 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1255
1256 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1257 let jh = client
1258 .connect()
1259 .await
1260 .expect("connect failed");
1261 let (_, mut rx) = timeout(
1262 Duration::from_millis(100),
1263 client.subscribe(
1264 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1265 SubscriptionOptions::new(),
1266 ),
1267 )
1268 .await
1269 .expect("subscription timed out")
1270 .expect("subscription failed");
1271 let res = timeout(Duration::from_millis(100), rx.recv())
1272 .await
1273 .expect("awaiting message timeout out");
1274
1275 assert!(res.is_none());
1277
1278 timeout(Duration::from_millis(100), client.close())
1279 .await
1280 .expect("close timed out")
1281 .expect("close failed");
1282 jh.await
1283 .expect("ws loop errored")
1284 .unwrap();
1285 server_thread.await.unwrap();
1286 }
1287
1288 #[test_log::test(tokio::test)]
1289 async fn test_reconnect() {
1290 let exp_comm = [
1291 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
1292 {
1293 "method":"subscribe",
1294 "extractor_id":{
1295 "chain":"ethereum",
1296 "name":"vm:ambient"
1297 },
1298 "include_state": true
1299 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1300 )),
1301 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1302 {
1303 "method":"newsubscription",
1304 "extractor_id":{
1305 "chain":"ethereum",
1306 "name":"vm:ambient"
1307 },
1308 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1309 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1310 )),
1311 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1312 {
1313 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1314 "deltas": {
1315 "extractor": "vm:ambient",
1316 "chain": "ethereum",
1317 "block": {
1318 "number": 123,
1319 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1320 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1321 "chain": "ethereum",
1322 "ts": "2023-09-14T00:00:00"
1323 },
1324 "finalized_block_height": 0,
1325 "revert": false,
1326 "new_tokens": {},
1327 "account_updates": {
1328 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1329 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1330 "chain": "ethereum",
1331 "slots": {},
1332 "balance": "0x01f4",
1333 "code": "",
1334 "change": "Update"
1335 }
1336 },
1337 "state_updates": {
1338 "component_1": {
1339 "component_id": "component_1",
1340 "updated_attributes": {"attr1": "0x01"},
1341 "deleted_attributes": ["attr2"]
1342 }
1343 },
1344 "new_protocol_components": {
1345 "protocol_1":
1346 {
1347 "id": "protocol_1",
1348 "protocol_system": "system_1",
1349 "protocol_type_name": "type_1",
1350 "chain": "ethereum",
1351 "tokens": ["0x01", "0x02"],
1352 "contract_ids": ["0x01", "0x02"],
1353 "static_attributes": {"attr1": "0x01f4"},
1354 "change": "Update",
1355 "creation_tx": "0x01",
1356 "created_at": "2023-09-14T00:00:00"
1357 }
1358 },
1359 "deleted_protocol_components": {},
1360 "component_balances": {
1361 "protocol_1": {
1362 "0x01": {
1363 "token": "0x01",
1364 "balance": "0x01f4",
1365 "balance_float": 1000.0,
1366 "modify_tx": "0x01",
1367 "component_id": "protocol_1"
1368 }
1369 }
1370 },
1371 "account_balances": {
1372 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1373 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1374 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1375 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1376 "balance": "0x01f4",
1377 "modify_tx": "0x01"
1378 }
1379 }
1380 },
1381 "component_tvl": {
1382 "protocol_1": 1000.0
1383 },
1384 "dci_update": {
1385 "new_entrypoints": {},
1386 "new_entrypoint_params": {},
1387 "trace_results": {}
1388 }
1389 }
1390 }
1391 "#.to_owned()
1392 ))
1393 ];
1394 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1395 let client = WsDeltasClient::new_with_reconnects(
1396 &format!("ws://{addr}"),
1397 None,
1398 3,
1399 Duration::from_millis(110),
1401 )
1402 .unwrap();
1403
1404 let jh: JoinHandle<Result<(), DeltasError>> = client
1405 .connect()
1406 .await
1407 .expect("connect failed");
1408
1409 for _ in 0..2 {
1410 dbg!("loop");
1411 let (_, mut rx) = timeout(
1412 Duration::from_millis(200),
1413 client.subscribe(
1414 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1415 SubscriptionOptions::new(),
1416 ),
1417 )
1418 .await
1419 .expect("subscription timed out")
1420 .expect("subscription failed");
1421
1422 let _ = timeout(Duration::from_millis(100), rx.recv())
1423 .await
1424 .expect("awaiting message timeout out")
1425 .expect("receiving message failed");
1426
1427 let res = timeout(Duration::from_millis(200), rx.recv())
1429 .await
1430 .expect("awaiting closed connection timeout out");
1431 assert!(res.is_none());
1432 }
1433 let res = jh.await.expect("ws client join failed");
1434 assert!(res.is_err());
1436 server_thread
1437 .await
1438 .expect("ws server loop errored");
1439 }
1440
1441 async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1442 let server = TcpListener::bind("127.0.0.1:0")
1443 .await
1444 .expect("localhost bind failed");
1445 let addr = server.local_addr().unwrap();
1446 let jh = tokio::spawn(async move {
1447 while let Ok((stream, _)) = server.accept().await {
1448 if accept_first {
1449 let stream = tokio_tungstenite::accept_async(stream)
1451 .await
1452 .unwrap();
1453 sleep(Duration::from_millis(10)).await;
1454 drop(stream)
1455 } else {
1456 drop(stream);
1458 }
1459 }
1460 });
1461 (addr, jh)
1462 }
1463
1464 #[test(tokio::test)]
1465 async fn test_subscribe_dead_client_after_max_attempts() {
1466 let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1467 let client = WsDeltasClient::new_with_reconnects(
1468 &format!("ws://{addr}"),
1469 None,
1470 3,
1471 Duration::from_secs(0),
1472 )
1473 .unwrap();
1474
1475 let join_handle = client.connect().await.unwrap();
1476 let handle_res = join_handle.await.unwrap();
1477 assert!(handle_res.is_err());
1478 assert!(!client.is_connected().await);
1479
1480 let subscription_res = timeout(
1481 Duration::from_millis(10),
1482 client.subscribe(
1483 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1484 SubscriptionOptions::new(),
1485 ),
1486 )
1487 .await
1488 .unwrap();
1489 assert!(subscription_res.is_err());
1490 }
1491
1492 #[test(tokio::test)]
1493 async fn test_ws_client_retry_cooldown() {
1494 let start = std::time::Instant::now();
1495 let (addr, _) = mock_bad_connection_tycho_ws(false).await;
1496
1497 let client = WsDeltasClient::new_with_reconnects(
1499 &format!("ws://{addr}"),
1500 None,
1501 3, Duration::from_millis(50), )
1504 .unwrap();
1505
1506 let connect_result = client.connect().await;
1508 let elapsed = start.elapsed();
1509
1510 assert!(connect_result.is_err(), "Expected connection to fail after retries");
1512
1513 assert!(
1515 elapsed >= Duration::from_millis(100),
1516 "Expected at least 100ms elapsed, got {:?}",
1517 elapsed
1518 );
1519
1520 assert!(elapsed < Duration::from_millis(500), "Took too long: {:?}", elapsed);
1522 }
1523
1524 #[test_log::test(tokio::test)]
1525 async fn test_buffer_full_triggers_unsubscribe() {
1526 let exp_comm = {
1528 [
1529 ExpectedComm::Receive(
1531 100,
1532 tungstenite::protocol::Message::Text(
1533 r#"
1534 {
1535 "method":"subscribe",
1536 "extractor_id":{
1537 "chain":"ethereum",
1538 "name":"vm:ambient"
1539 },
1540 "include_state": true
1541 }"#
1542 .to_owned()
1543 .replace(|c: char| c.is_whitespace(), ""),
1544 ),
1545 ),
1546 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1548 r#"
1549 {
1550 "method":"newsubscription",
1551 "extractor_id":{
1552 "chain":"ethereum",
1553 "name":"vm:ambient"
1554 },
1555 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1556 }"#
1557 .to_owned()
1558 .replace(|c: char| c.is_whitespace(), ""),
1559 )),
1560 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1562 r#"
1563 {
1564 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1565 "deltas": {
1566 "extractor": "vm:ambient",
1567 "chain": "ethereum",
1568 "block": {
1569 "number": 123,
1570 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1571 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1572 "chain": "ethereum",
1573 "ts": "2023-09-14T00:00:00"
1574 },
1575 "finalized_block_height": 0,
1576 "revert": false,
1577 "new_tokens": {},
1578 "account_updates": {},
1579 "state_updates": {},
1580 "new_protocol_components": {},
1581 "deleted_protocol_components": {},
1582 "component_balances": {},
1583 "account_balances": {},
1584 "component_tvl": {},
1585 "dci_update": {
1586 "new_entrypoints": {},
1587 "new_entrypoint_params": {},
1588 "trace_results": {}
1589 }
1590 }
1591 }
1592 "#.to_owned()
1593 )),
1594 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1596 r#"
1597 {
1598 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1599 "deltas": {
1600 "extractor": "vm:ambient",
1601 "chain": "ethereum",
1602 "block": {
1603 "number": 124,
1604 "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
1605 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1606 "chain": "ethereum",
1607 "ts": "2023-09-14T00:00:01"
1608 },
1609 "finalized_block_height": 0,
1610 "revert": false,
1611 "new_tokens": {},
1612 "account_updates": {},
1613 "state_updates": {},
1614 "new_protocol_components": {},
1615 "deleted_protocol_components": {},
1616 "component_balances": {},
1617 "account_balances": {},
1618 "component_tvl": {},
1619 "dci_update": {
1620 "new_entrypoints": {},
1621 "new_entrypoint_params": {},
1622 "trace_results": {}
1623 }
1624 }
1625 }
1626 "#.to_owned()
1627 )),
1628 ExpectedComm::Receive(
1630 100,
1631 tungstenite::protocol::Message::Text(
1632 r#"
1633 {
1634 "method": "unsubscribe",
1635 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1636 }
1637 "#
1638 .to_owned()
1639 .replace(|c: char| c.is_whitespace(), ""),
1640 ),
1641 ),
1642 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1644 r#"
1645 {
1646 "method": "subscriptionended",
1647 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1648 }
1649 "#
1650 .to_owned()
1651 .replace(|c: char| c.is_whitespace(), ""),
1652 )),
1653 ]
1654 };
1655
1656 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1657
1658 let client = WsDeltasClient::new_with_custom_buffers(
1660 &format!("ws://{addr}"),
1661 None,
1662 128, 1, )
1665 .unwrap();
1666
1667 let jh = client
1668 .connect()
1669 .await
1670 .expect("connect failed");
1671
1672 let (_sub_id, mut rx) = timeout(
1673 Duration::from_millis(100),
1674 client.subscribe(
1675 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1676 SubscriptionOptions::new(),
1677 ),
1678 )
1679 .await
1680 .expect("subscription timed out")
1681 .expect("subscription failed");
1682
1683 tokio::time::sleep(Duration::from_millis(100)).await;
1685
1686 let mut received_msgs = Vec::new();
1688
1689 while received_msgs.len() < 3 {
1691 match timeout(Duration::from_millis(200), rx.recv()).await {
1692 Ok(Some(msg)) => {
1693 received_msgs.push(msg);
1694 }
1695 Ok(None) => {
1696 break;
1698 }
1699 Err(_) => {
1700 break;
1702 }
1703 }
1704 }
1705
1706 assert!(
1708 received_msgs.len() <= 1,
1709 "Expected buffer overflow to limit messages to at most 1, got {}",
1710 received_msgs.len()
1711 );
1712
1713 if let Some(first_msg) = received_msgs.first() {
1714 assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
1715 }
1716
1717 drop(rx); tokio::time::sleep(Duration::from_millis(50)).await;
1724
1725 jh.abort();
1727 server_thread.abort();
1728
1729 let _ = jh.await;
1730 let _ = server_thread.await;
1731 }
1732
1733 #[tokio::test]
1734 async fn test_server_error_handling() {
1735 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1736
1737 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1738
1739 let error_response = WebSocketMessage::Response(Response::Error(
1741 WebsocketError::ExtractorNotFound(extractor_id.clone()),
1742 ));
1743 let error_json = serde_json::to_string(&error_response).unwrap();
1744
1745 let exp_comm = [
1746 ExpectedComm::Receive(
1747 100,
1748 tungstenite::protocol::Message::Text(
1749 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1750 ),
1751 ),
1752 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1753 ];
1754
1755 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1756
1757 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1758 let jh = client
1759 .connect()
1760 .await
1761 .expect("connect failed");
1762
1763 let result = timeout(
1764 Duration::from_millis(100),
1765 client.subscribe(extractor_id, SubscriptionOptions::new()),
1766 )
1767 .await
1768 .expect("subscription timed out");
1769
1770 assert!(result.is_err());
1772 if let Err(DeltasError::ServerError(msg, _)) = result {
1773 assert!(msg.contains("Subscription failed"));
1774 assert!(msg.contains("Extractor not found"));
1775 } else {
1776 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1777 }
1778
1779 timeout(Duration::from_millis(100), client.close())
1780 .await
1781 .expect("close timed out")
1782 .expect("close failed");
1783 jh.await
1784 .expect("ws loop errored")
1785 .unwrap();
1786 server_thread.await.unwrap();
1787 }
1788
1789 #[test_log::test(tokio::test)]
1790 async fn test_subscription_not_found_error() {
1791 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1793
1794 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1795 let subscription_id = Uuid::new_v4();
1796
1797 let error_response = WebSocketMessage::Response(Response::Error(
1798 WebsocketError::SubscriptionNotFound(subscription_id),
1799 ));
1800 let error_json = serde_json::to_string(&error_response).unwrap();
1801
1802 let exp_comm = [
1803 ExpectedComm::Receive(
1805 100,
1806 tungstenite::protocol::Message::Text(
1807 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1808 ),
1809 ),
1810 ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
1811 r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"test_extractor"}},"subscription_id":"{}"}}"#,
1812 subscription_id
1813 ))),
1814 ExpectedComm::Receive(
1816 100,
1817 tungstenite::protocol::Message::Text(format!(
1818 r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
1819 subscription_id
1820 )),
1821 ),
1822 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1824 ];
1825
1826 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1827
1828 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1829 let jh = client
1830 .connect()
1831 .await
1832 .expect("connect failed");
1833
1834 let (received_sub_id, _rx) = timeout(
1836 Duration::from_millis(100),
1837 client.subscribe(extractor_id, SubscriptionOptions::new()),
1838 )
1839 .await
1840 .expect("subscription timed out")
1841 .expect("subscription failed");
1842
1843 assert_eq!(received_sub_id, subscription_id);
1844
1845 let unsubscribe_result =
1847 timeout(Duration::from_millis(100), client.unsubscribe(subscription_id))
1848 .await
1849 .expect("unsubscribe timed out");
1850
1851 unsubscribe_result
1855 .expect("Unsubscribe should succeed even if server says subscription not found");
1856
1857 timeout(Duration::from_millis(100), client.close())
1858 .await
1859 .expect("close timed out")
1860 .expect("close failed");
1861 jh.await
1862 .expect("ws loop errored")
1863 .unwrap();
1864 server_thread.await.unwrap();
1865 }
1866
1867 #[test_log::test(tokio::test)]
1868 async fn test_parse_error_handling() {
1869 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1870
1871 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1872 let error_response = WebSocketMessage::Response(Response::Error(
1873 WebsocketError::ParseError("}2sdf".to_string(), "malformed JSON".to_string()),
1874 ));
1875 let error_json = serde_json::to_string(&error_response).unwrap();
1876
1877 let exp_comm = [
1878 ExpectedComm::Receive(
1880 100,
1881 tungstenite::protocol::Message::Text(
1882 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1883 ),
1884 ),
1885 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json))
1886 ];
1887
1888 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1889
1890 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1891 let jh = client
1892 .connect()
1893 .await
1894 .expect("connect failed");
1895
1896 let _ = timeout(
1898 Duration::from_millis(100),
1899 client.subscribe(extractor_id, SubscriptionOptions::new()),
1900 )
1901 .await
1902 .expect("subscription timed out");
1903
1904 let result = jh
1906 .await
1907 .expect("ws loop should complete");
1908 assert!(result.is_err());
1909 if let Err(DeltasError::ServerError(message, _)) = result {
1910 assert!(message.contains("Server failed to parse client message"));
1911 } else {
1912 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1913 }
1914
1915 server_thread.await.unwrap();
1916 }
1917
1918 #[tokio::test]
1919 async fn test_subscribe_error_handling() {
1920 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1921
1922 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "failing_extractor");
1923
1924 let error_response = WebSocketMessage::Response(Response::Error(
1925 WebsocketError::SubscribeError(extractor_id.clone()),
1926 ));
1927 let error_json = serde_json::to_string(&error_response).unwrap();
1928
1929 let exp_comm = [
1930 ExpectedComm::Receive(
1931 100,
1932 tungstenite::protocol::Message::Text(
1933 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"failing_extractor"},"include_state":true}"#.to_string()
1934 ),
1935 ),
1936 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1937 ];
1938
1939 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1940
1941 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1942 let jh = client
1943 .connect()
1944 .await
1945 .expect("connect failed");
1946
1947 let result = timeout(
1948 Duration::from_millis(100),
1949 client.subscribe(extractor_id, SubscriptionOptions::new()),
1950 )
1951 .await
1952 .expect("subscription timed out");
1953
1954 assert!(result.is_err());
1956 if let Err(DeltasError::ServerError(msg, _)) = result {
1957 assert!(msg.contains("Subscription failed"));
1958 assert!(msg.contains("Failed to subscribe to extractor"));
1959 } else {
1960 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1961 }
1962
1963 timeout(Duration::from_millis(100), client.close())
1964 .await
1965 .expect("close timed out")
1966 .expect("close failed");
1967 jh.await
1968 .expect("ws loop errored")
1969 .unwrap();
1970 server_thread.await.unwrap();
1971 }
1972
1973 #[tokio::test]
1974 async fn test_cancel_pending_subscription() {
1975 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1977
1978 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1979
1980 let error_response = WebSocketMessage::Response(Response::Error(
1981 WebsocketError::ExtractorNotFound(extractor_id.clone()),
1982 ));
1983 let error_json = serde_json::to_string(&error_response).unwrap();
1984
1985 let exp_comm = [
1986 ExpectedComm::Receive(
1987 100,
1988 tungstenite::protocol::Message::Text(
1989 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1990 ),
1991 ),
1992 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1993 ];
1994
1995 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1996
1997 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1998 let jh = client
1999 .connect()
2000 .await
2001 .expect("connect failed");
2002
2003 let client_clone = client.clone();
2005 let extractor_id_clone = extractor_id.clone();
2006
2007 let subscription1 = tokio::spawn({
2008 let client_for_spawn = client.clone();
2009 async move {
2010 client_for_spawn
2011 .subscribe(extractor_id, SubscriptionOptions::new())
2012 .await
2013 }
2014 });
2015
2016 let subscription2 = tokio::spawn(async move {
2017 client_clone
2019 .subscribe(extractor_id_clone, SubscriptionOptions::new())
2020 .await
2021 });
2022
2023 let (result1, result2) = tokio::join!(subscription1, subscription2);
2024
2025 let result1 = result1.unwrap();
2026 let result2 = result2.unwrap();
2027
2028 assert!(result1.is_err() || result2.is_err());
2031
2032 if let Err(DeltasError::SubscriptionAlreadyPending) = result2 {
2033 } else if let Err(DeltasError::ServerError(_, _)) = result1 {
2035 } else {
2037 panic!("Expected one SubscriptionAlreadyPending and one ServerError");
2038 }
2039
2040 timeout(Duration::from_millis(100), client.close())
2041 .await
2042 .expect("close timed out")
2043 .expect("close failed");
2044 jh.await
2045 .expect("ws loop errored")
2046 .unwrap();
2047 server_thread.await.unwrap();
2048 }
2049
2050 #[tokio::test]
2051 async fn test_force_unsubscribe_prevents_multiple_calls() {
2052 let subscription_id = Uuid::new_v4();
2056
2057 let exp_comm = [
2058 ExpectedComm::Receive(
2059 100,
2060 tungstenite::protocol::Message::Text(
2061 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"vm:ambient"},"include_state":true}"#.to_string()
2062 ),
2063 ),
2064 ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
2065 r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"vm:ambient"}},"subscription_id":"{}"}}"#,
2066 subscription_id
2067 ))),
2068 ExpectedComm::Receive(
2070 100,
2071 tungstenite::protocol::Message::Text(format!(
2072 r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
2073 subscription_id
2074 )),
2075 ),
2076 ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
2077 r#"{{"method":"subscriptionended","subscription_id":"{}"}}"#,
2078 subscription_id
2079 ))),
2080 ];
2081
2082 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2083
2084 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2085 let jh = client
2086 .connect()
2087 .await
2088 .expect("connect failed");
2089
2090 let (received_sub_id, _rx) = timeout(
2091 Duration::from_millis(100),
2092 client.subscribe(
2093 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
2094 SubscriptionOptions::new(),
2095 ),
2096 )
2097 .await
2098 .expect("subscription timed out")
2099 .expect("subscription failed");
2100
2101 assert_eq!(received_sub_id, subscription_id);
2102
2103 {
2105 let mut inner_guard = client.inner.lock().await;
2106 let inner = inner_guard
2107 .as_mut()
2108 .expect("client should be connected");
2109
2110 WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2112 WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2113 }
2114
2115 tokio::time::sleep(Duration::from_millis(50)).await;
2117
2118 let _ = timeout(Duration::from_millis(100), client.close()).await;
2120
2121 let _ = jh.await;
2123 let _ = server_thread.await;
2124 }
2125}