1use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::{
25 atomic::{AtomicBool, Ordering},
26 Arc,
27 },
28 time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34 header::{
35 AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36 USER_AGENT,
37 },
38 Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44 net::TcpStream,
45 sync::{
46 mpsc::{self, error::TrySendError, Receiver, Sender},
47 oneshot, Mutex, Notify,
48 },
49 task::JoinHandle,
50 time::sleep,
51};
52use tokio_tungstenite::{
53 connect_async,
54 tungstenite::{
55 self,
56 handshake::client::{generate_key, Request},
57 },
58 MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{
62 BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage, WebsocketError,
63};
64use uuid::Uuid;
65
66use crate::TYCHO_SERVER_VERSION;
67
68#[derive(Error, Debug)]
69pub enum DeltasError {
70 #[error("Failed to parse URI: {0}. Error: {1}")]
72 UriParsing(String, String),
73
74 #[error("The requested subscription is already pending")]
76 SubscriptionAlreadyPending,
77
78 #[error("The server replied with an error: {0}")]
79 ServerError(String, #[source] WebsocketError),
80
81 #[error("{0}")]
84 TransportError(String),
85
86 #[error("The buffer is full!")]
90 BufferFull,
91
92 #[error("The client is not connected!")]
96 NotConnected,
97
98 #[error("The client is already connected!")]
100 AlreadyConnected,
101
102 #[error("The server closed the connection!")]
104 ConnectionClosed,
105
106 #[error("Connection error: {0}")]
108 ConnectionError(#[from] Box<tungstenite::Error>),
109
110 #[error("Tycho FatalError: {0}")]
112 Fatal(String),
113}
114
/// Options that can be attached to a subscription request.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Value sent as `include_state` with the subscribe command.
    include_state: bool,
}

impl Default for SubscriptionOptions {
    /// Same as [`SubscriptionOptions::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl SubscriptionOptions {
    /// Creates the default options: state is included.
    pub fn new() -> Self {
        Self { include_state: true }
    }

    /// Returns the options with `include_state` replaced by `val`.
    pub fn with_state(self, val: bool) -> Self {
        let mut updated = self;
        updated.include_state = val;
        updated
    }
}
135
136#[cfg_attr(test, automock)]
137#[async_trait]
138pub trait DeltasClient {
139 async fn subscribe(
146 &self,
147 extractor_id: ExtractorIdentity,
148 options: SubscriptionOptions,
149 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;
150
151 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
153
154 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
156
157 async fn close(&self) -> Result<(), DeltasError>;
159}
160
161#[derive(Clone)]
162pub struct WsDeltasClient {
163 uri: Uri,
165 auth_key: Option<String>,
167 max_reconnects: u64,
169 retry_cooldown: Duration,
171 ws_buffer_size: usize,
174 subscription_buffer_size: usize,
177 conn_notify: Arc<Notify>,
179 inner: Arc<Mutex<Option<Inner>>>,
181 dead: Arc<AtomicBool>,
183}
184
185type WebSocketSink =
186 SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
187
188#[derive(Debug)]
200enum SubscriptionInfo {
201 RequestedSubscription(oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>),
203 Active,
205 RequestedUnsubscription(oneshot::Sender<()>),
207}
208
209struct Inner {
211 sink: WebSocketSink,
213 cmd_tx: Sender<()>,
215 pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
217 subscriptions: HashMap<Uuid, SubscriptionInfo>,
219 sender: HashMap<Uuid, Sender<BlockChanges>>,
222 buffer_size: usize,
224}
225
226impl Inner {
230 fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
231 Self {
232 sink,
233 cmd_tx,
234 pending: HashMap::new(),
235 subscriptions: HashMap::new(),
236 sender: HashMap::new(),
237 buffer_size,
238 }
239 }
240
241 #[allow(clippy::result_large_err)]
243 fn new_subscription(
244 &mut self,
245 id: &ExtractorIdentity,
246 ready_tx: oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>,
247 ) -> Result<(), DeltasError> {
248 if self.pending.contains_key(id) {
249 return Err(DeltasError::SubscriptionAlreadyPending);
250 }
251 self.pending
252 .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
253 Ok(())
254 }
255
256 fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
260 if let Some(info) = self.pending.remove(extractor_id) {
261 if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
262 let (tx, rx) = mpsc::channel(self.buffer_size);
263 self.sender.insert(subscription_id, tx);
264 self.subscriptions
265 .insert(subscription_id, SubscriptionInfo::Active);
266 let _ = ready_tx
267 .send(Ok((subscription_id, rx)))
268 .map_err(|_| {
269 warn!(
270 ?extractor_id,
271 ?subscription_id,
272 "Subscriber for has gone away. Ignoring."
273 )
274 });
275 } else {
276 error!(
277 ?extractor_id,
278 ?subscription_id,
279 "Pending subscription was not in the correct state to
280 transition to active. Ignoring!"
281 )
282 }
283 } else {
284 error!(
285 ?extractor_id,
286 ?subscription_id,
287 "Tried to mark an unknown subscription as active. Ignoring!"
288 );
289 }
290 }
291
292 #[allow(clippy::result_large_err)]
294 fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
295 if let Some(sender) = self.sender.get_mut(id) {
296 sender
297 .try_send(msg)
298 .map_err(|e| match e {
299 TrySendError::Full(_) => DeltasError::BufferFull,
300 TrySendError::Closed(_) => {
301 DeltasError::TransportError("The subscriber has gone away".to_string())
302 }
303 })?;
304 }
305 Ok(())
306 }
307
308 fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
313 if let Some(info) = self
314 .subscriptions
315 .get_mut(subscription_id)
316 {
317 if let SubscriptionInfo::Active = info {
318 *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
319 }
320 } else {
321 debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
323 }
324 }
325
326 fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
333 if let Entry::Occupied(e) = self
334 .subscriptions
335 .entry(subscription_id)
336 {
337 let info = e.remove();
338 if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
339 let _ = tx.send(()).map_err(|_| {
340 warn!(?subscription_id, "failed to notify about removed subscription")
341 });
342 self.sender
343 .remove(&subscription_id)
344 .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
345 } else {
346 warn!(?subscription_id, "Subscription ended unexpectedly!");
347 self.sender
348 .remove(&subscription_id)
349 .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
350 }
351 } else {
352 error!(
353 ?subscription_id,
354 "Received `SubscriptionEnded`, but was never subscribed to it. This is likely a bug!"
355 );
356 }
357
358 Ok(())
359 }
360
361 fn cancel_pending(&mut self, extractor_id: &ExtractorIdentity, error: &WebsocketError) {
362 if let Some(sub_info) = self.pending.remove(extractor_id) {
363 match sub_info {
364 SubscriptionInfo::RequestedSubscription(tx) => {
365 let _ = tx
366 .send(Err(DeltasError::ServerError(
367 format!("Subscription failed: {error}"),
368 error.clone(),
369 )))
370 .map_err(|_| debug!("Cancel pending failed: receiver deallocated!"));
371 }
372 _ => {
373 error!(?extractor_id, "Pending subscription in wrong state")
374 }
375 }
376 } else {
377 debug!(?extractor_id, "Tried cancel on non-existent pending subscription!")
378 }
379 }
380
381 async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
383 self.sink.send(msg).await.map_err(|e| {
384 DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
385 })
386 }
387}
388
389impl WsDeltasClient {
391 #[allow(clippy::result_large_err)]
393 pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
394 let uri = ws_uri
395 .parse::<Uri>()
396 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
397 Ok(Self {
398 uri,
399 auth_key: auth_key.map(|s| s.to_string()),
400 inner: Arc::new(Mutex::new(None)),
401 ws_buffer_size: 128,
402 subscription_buffer_size: 128,
403 conn_notify: Arc::new(Notify::new()),
404 max_reconnects: 5,
405 retry_cooldown: Duration::from_millis(500),
406 dead: Arc::new(AtomicBool::new(false)),
407 })
408 }
409
410 #[allow(clippy::result_large_err)]
412 pub fn new_with_reconnects(
413 ws_uri: &str,
414 auth_key: Option<&str>,
415 max_reconnects: u64,
416 retry_cooldown: Duration,
417 ) -> Result<Self, DeltasError> {
418 let uri = ws_uri
419 .parse::<Uri>()
420 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
421
422 Ok(Self {
423 uri,
424 auth_key: auth_key.map(|s| s.to_string()),
425 inner: Arc::new(Mutex::new(None)),
426 ws_buffer_size: 128,
427 subscription_buffer_size: 128,
428 conn_notify: Arc::new(Notify::new()),
429 max_reconnects,
430 retry_cooldown,
431 dead: Arc::new(AtomicBool::new(false)),
432 })
433 }
434
435 #[cfg(test)]
437 #[allow(clippy::result_large_err)]
438 pub fn new_with_custom_buffers(
439 ws_uri: &str,
440 auth_key: Option<&str>,
441 ws_buffer_size: usize,
442 subscription_buffer_size: usize,
443 ) -> Result<Self, DeltasError> {
444 let uri = ws_uri
445 .parse::<Uri>()
446 .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
447 Ok(Self {
448 uri,
449 auth_key: auth_key.map(|s| s.to_string()),
450 inner: Arc::new(Mutex::new(None)),
451 ws_buffer_size,
452 subscription_buffer_size,
453 conn_notify: Arc::new(Notify::new()),
454 max_reconnects: 5,
455 retry_cooldown: Duration::from_millis(0),
456 dead: Arc::new(AtomicBool::new(false)),
457 })
458 }
459
460 async fn is_connected(&self) -> bool {
464 let guard = self.inner.as_ref().lock().await;
465 guard.is_some()
466 }
467
468 async fn ensure_connection(&self) -> Result<(), DeltasError> {
473 if self.dead.load(Ordering::SeqCst) {
474 return Err(DeltasError::NotConnected)
475 };
476 if !self.is_connected().await {
477 self.conn_notify.notified().await;
478 };
479 Ok(())
480 }
481
482 #[instrument(skip(self, msg))]
487 async fn handle_msg(
488 &self,
489 msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
490 ) -> Result<(), DeltasError> {
491 let mut guard = self.inner.lock().await;
492
493 match msg {
494 Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
499 serde_json::Value,
500 >(&text)
501 {
502 Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
503 Ok(ws_message) => match ws_message {
504 WebSocketMessage::BlockChanges { subscription_id, deltas } => {
505 trace!(?deltas, "Received a block state change, sending to channel");
506 let inner = guard
507 .as_mut()
508 .ok_or_else(|| DeltasError::NotConnected)?;
509 match inner.send(&subscription_id, deltas) {
510 Err(DeltasError::BufferFull) => {
511 error!(?subscription_id, "Buffer full, unsubscribing!");
512 Self::force_unsubscribe(subscription_id, inner).await;
513 }
514 Err(_) => {
515 warn!(
516 ?subscription_id,
517 "Receiver for has gone away, unsubscribing!"
518 );
519 Self::force_unsubscribe(subscription_id, inner).await;
520 }
521 _ => { }
522 }
523 }
524 WebSocketMessage::Response(Response::NewSubscription {
525 extractor_id,
526 subscription_id,
527 }) => {
528 info!(?extractor_id, ?subscription_id, "Received a new subscription");
529 let inner = guard
530 .as_mut()
531 .ok_or_else(|| DeltasError::NotConnected)?;
532 inner.mark_active(&extractor_id, subscription_id);
533 }
534 WebSocketMessage::Response(Response::SubscriptionEnded {
535 subscription_id,
536 }) => {
537 info!(?subscription_id, "Received a subscription ended");
538 let inner = guard
539 .as_mut()
540 .ok_or_else(|| DeltasError::NotConnected)?;
541 inner.remove_subscription(subscription_id)?;
542 }
543 WebSocketMessage::Response(Response::Error(error)) => match &error {
544 WebsocketError::ExtractorNotFound(extractor_id) => {
545 let inner = guard
546 .as_mut()
547 .ok_or_else(|| DeltasError::NotConnected)?;
548 inner.cancel_pending(extractor_id, &error);
549 }
550 WebsocketError::SubscriptionNotFound(subscription_id) => {
551 debug!("Received subscription not found, removing subscription");
552 let inner = guard
553 .as_mut()
554 .ok_or_else(|| DeltasError::NotConnected)?;
555 inner.remove_subscription(*subscription_id)?;
556 }
557 WebsocketError::ParseError(raw, e) => {
558 return Err(DeltasError::ServerError(
559 format!(
560 "Server failed to parse client message: {e}, msg: {raw}"
561 ),
562 error.clone(),
563 ))
564 }
565 WebsocketError::SubscribeError(extractor_id) => {
566 let inner = guard
567 .as_mut()
568 .ok_or_else(|| DeltasError::NotConnected)?;
569 inner.cancel_pending(extractor_id, &error);
570 }
571 },
572 },
573 Err(e) => {
574 error!(
575 "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
576 e, text
577 );
578 }
579 },
580 Err(e) => {
581 error!(
582 "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
583 e, text
584 );
585 }
586 },
587 Ok(tungstenite::protocol::Message::Ping(_)) => {
588 let inner = guard
590 .as_mut()
591 .ok_or_else(|| DeltasError::NotConnected)?;
592 if let Err(error) = inner
593 .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
594 .await
595 {
596 debug!(?error, "Failed to send pong!");
597 }
598 }
599 Ok(tungstenite::protocol::Message::Pong(_)) => {
600 }
602 Ok(tungstenite::protocol::Message::Close(_)) => {
603 return Err(DeltasError::ConnectionClosed);
604 }
605 Ok(unknown_msg) => {
606 info!("Received an unknown message type: {:?}", unknown_msg);
607 }
608 Err(error) => {
609 error!(?error, "Websocket error");
610 return Err(match error {
611 tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
612 tungstenite::Error::AlreadyClosed => {
613 warn!("Received AlreadyClosed error which is indicative of a bug!");
614 DeltasError::ConnectionError(Box::new(error))
615 }
616 tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
617 DeltasError::ConnectionError(Box::new(error))
618 }
619 _ => DeltasError::Fatal(error.to_string()),
620 });
621 }
622 };
623 Ok(())
624 }
625
626 async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
631 let (tx, rx) = oneshot::channel();
632 if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
633 warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
634 } else {
635 match tokio::time::timeout(Duration::from_secs(5), rx).await {
637 Ok(_) => {
638 debug!(?subscription_id, "Unsubscribe completed successfully");
639 }
640 Err(_) => {
641 warn!(?subscription_id, "Unsubscribe completion timed out");
642 }
643 }
644 }
645 }
646
647 async fn unsubscribe_inner(
653 inner: &mut Inner,
654 subscription_id: Uuid,
655 ready_tx: oneshot::Sender<()>,
656 ) -> Result<(), DeltasError> {
657 debug!(?subscription_id, "Unsubscribing");
658 inner.end_subscription(&subscription_id, ready_tx);
659 let cmd = Command::Unsubscribe { subscription_id };
660 inner
661 .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
662 |e| {
663 DeltasError::TransportError(format!(
664 "Failed to serialize unsubscribe command: {e}"
665 ))
666 },
667 )?))
668 .await?;
669 Ok(())
670 }
671}
672
673#[async_trait]
674impl DeltasClient for WsDeltasClient {
675 #[instrument(skip(self))]
676 async fn subscribe(
677 &self,
678 extractor_id: ExtractorIdentity,
679 options: SubscriptionOptions,
680 ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
681 trace!("Starting subscribe");
682 self.ensure_connection().await?;
683 let (ready_tx, ready_rx) = oneshot::channel();
684 {
685 let mut guard = self.inner.lock().await;
686 let inner = guard
687 .as_mut()
688 .ok_or_else(|| DeltasError::NotConnected)?;
689 trace!("Sending subscribe command");
690 inner.new_subscription(&extractor_id, ready_tx)?;
691 let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
692 inner
693 .ws_send(tungstenite::protocol::Message::Text(
694 serde_json::to_string(&cmd).map_err(|e| {
695 DeltasError::TransportError(format!(
696 "Failed to serialize subscribe command: {e}"
697 ))
698 })?,
699 ))
700 .await?;
701 }
702 trace!("Waiting for subscription response");
703 let res = ready_rx.await.map_err(|_| {
704 DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
705 })??;
706 trace!("Subscription successful");
707 Ok(res)
708 }
709
710 #[instrument(skip(self))]
711 async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
712 self.ensure_connection().await?;
713 let (ready_tx, ready_rx) = oneshot::channel();
714 {
715 let mut guard = self.inner.lock().await;
716 let inner = guard
717 .as_mut()
718 .ok_or_else(|| DeltasError::NotConnected)?;
719
720 WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
721 }
722 ready_rx.await.map_err(|_| {
723 DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
724 })?;
725
726 Ok(())
727 }
728
729 #[instrument(skip(self))]
730 async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
731 if self.is_connected().await {
732 return Err(DeltasError::AlreadyConnected);
733 }
734 let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
735 info!(?ws_uri, "Starting TychoWebsocketClient");
736
737 let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
738 {
739 let mut guard = self.inner.as_ref().lock().await;
740 *guard = None;
741 }
742 let this = self.clone();
743 let jh = tokio::spawn(async move {
744 let mut retry_count = 0;
745 let mut result = Err(DeltasError::NotConnected);
746
747 'retry: while retry_count < this.max_reconnects {
748 info!(?ws_uri, retry_count, "Connecting to WebSocket server");
749 if retry_count > 0 {
750 sleep(this.retry_cooldown).await;
751 }
752
753 let mut request_builder = Request::builder()
755 .uri(&ws_uri)
756 .header(SEC_WEBSOCKET_KEY, generate_key())
757 .header(SEC_WEBSOCKET_VERSION, 13)
758 .header(CONNECTION, "Upgrade")
759 .header(UPGRADE, "websocket")
760 .header(
761 HOST,
762 this.uri.host().ok_or_else(|| {
763 DeltasError::UriParsing(
764 ws_uri.clone(),
765 "No host found in tycho url".to_string(),
766 )
767 })?,
768 )
769 .header(
770 USER_AGENT,
771 format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
772 );
773
774 if let Some(ref key) = this.auth_key {
776 request_builder = request_builder.header(AUTHORIZATION, key);
777 }
778
779 let request = request_builder.body(()).map_err(|e| {
780 DeltasError::TransportError(format!("Failed to build connection request: {e}"))
781 })?;
782 let (conn, _) = match connect_async(request).await {
783 Ok(conn) => conn,
784 Err(e) => {
785 retry_count += 1;
787 let mut guard = this.inner.as_ref().lock().await;
788 *guard = None;
789
790 warn!(
791 e = e.to_string(),
792 "Failed to connect to WebSocket server; Reconnecting"
793 );
794 continue 'retry;
795 }
796 };
797
798 let (ws_tx_new, ws_rx_new) = conn.split();
799 {
800 let mut guard = this.inner.as_ref().lock().await;
801 *guard =
802 Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
803 }
804 let mut msg_rx = ws_rx_new.boxed();
805
806 info!("Connection Successful: TychoWebsocketClient started");
807 this.conn_notify.notify_waiters();
808 result = Ok(());
809
810 loop {
811 let res = tokio::select! {
812 msg = msg_rx.next() => match msg {
813 Some(msg) => this.handle_msg(msg).await,
814 None => {
815 warn!("Websocket connection silently closed, giving up!");
819 break 'retry
820 }
821 },
822 _ = cmd_rx.recv() => {break 'retry},
823 };
824 if let Err(error) = res {
825 debug!(?error, "WsError");
826 if matches!(
827 error,
828 DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
829 ) {
830 retry_count += 1;
832 let mut guard = this.inner.as_ref().lock().await;
833 *guard = None;
834
835 warn!(
836 ?error,
837 ?retry_count,
838 "Connection dropped unexpectedly; Reconnecting..."
839 );
840 break;
841 } else {
842 error!(?error, "Fatal error; Exiting");
844 result = Err(error);
845 break 'retry;
846 }
847 }
848 }
849 }
850 debug!(
851 retry_count,
852 max_reconnects=?this.max_reconnects,
853 "Reconnection loop ended"
854 );
855 let mut guard = this.inner.as_ref().lock().await;
857 *guard = None;
858
859 if retry_count >= this.max_reconnects {
861 error!("Max reconnection attempts reached; Exiting");
862 this.dead.store(true, Ordering::SeqCst);
863 this.conn_notify.notify_waiters(); result = Err(DeltasError::ConnectionClosed);
865 }
866
867 result
868 });
869
870 self.conn_notify.notified().await;
871
872 if self.is_connected().await {
873 Ok(jh)
874 } else {
875 Err(DeltasError::NotConnected)
876 }
877 }
878
879 #[instrument(skip(self))]
880 async fn close(&self) -> Result<(), DeltasError> {
881 info!("Closing TychoWebsocketClient");
882 let mut guard = self.inner.lock().await;
883 let inner = guard
884 .as_mut()
885 .ok_or_else(|| DeltasError::NotConnected)?;
886 inner
887 .cmd_tx
888 .send(())
889 .await
890 .map_err(|e| DeltasError::TransportError(e.to_string()))?;
891 Ok(())
892 }
893}
894
895#[cfg(test)]
896mod tests {
897 use std::net::SocketAddr;
898
899 use test_log::test;
900 use tokio::{net::TcpListener, time::timeout};
901 use tycho_common::dto::Chain;
902
903 use super::*;
904
905 #[derive(Clone)]
906 enum ExpectedComm {
907 Receive(u64, tungstenite::protocol::Message),
908 Send(tungstenite::protocol::Message),
909 }
910
911 async fn mock_tycho_ws(
912 messages: &[ExpectedComm],
913 reconnects: usize,
914 ) -> (SocketAddr, JoinHandle<()>) {
915 info!("Starting mock webserver");
916 let server = TcpListener::bind("127.0.0.1:0")
918 .await
919 .expect("localhost bind failed");
920 let addr = server.local_addr().unwrap();
921 let messages = messages.to_vec();
922
923 let jh = tokio::spawn(async move {
924 info!("mock webserver started");
925 for _ in 0..(reconnects + 1) {
926 info!("Awaiting client connections");
927 if let Ok((stream, _)) = server.accept().await {
928 info!("Client connected");
929 let mut websocket = tokio_tungstenite::accept_async(stream)
930 .await
931 .unwrap();
932
933 info!("Handling messages..");
934 for c in messages.iter().cloned() {
935 match c {
936 ExpectedComm::Receive(t, exp) => {
937 info!("Awaiting message...");
938 let msg = timeout(Duration::from_millis(t), websocket.next())
939 .await
940 .expect("Receive timeout")
941 .expect("Stream exhausted")
942 .expect("Failed to receive message.");
943 info!("Message received");
944 assert_eq!(msg, exp)
945 }
946 ExpectedComm::Send(data) => {
947 info!("Sending message");
948 websocket
949 .send(data)
950 .await
951 .expect("Failed to send message");
952 info!("Message sent");
953 }
954 };
955 }
956 info!("Mock communication completed");
957 sleep(Duration::from_millis(100)).await;
958 let _ = websocket.close(None).await;
960 info!("Mock server closed connection");
961 }
962 }
963 info!("mock server ended");
964 });
965 (addr, jh)
966 }
967
968 #[tokio::test]
969 async fn test_subscribe_receive() {
970 let exp_comm = [
971 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
972 {
973 "method":"subscribe",
974 "extractor_id":{
975 "chain":"ethereum",
976 "name":"vm:ambient"
977 },
978 "include_state": true
979 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
980 )),
981 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
982 {
983 "method":"newsubscription",
984 "extractor_id":{
985 "chain":"ethereum",
986 "name":"vm:ambient"
987 },
988 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
989 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
990 )),
991 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
992 {
993 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
994 "deltas": {
995 "extractor": "vm:ambient",
996 "chain": "ethereum",
997 "block": {
998 "number": 123,
999 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1000 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1001 "chain": "ethereum",
1002 "ts": "2023-09-14T00:00:00"
1003 },
1004 "finalized_block_height": 0,
1005 "revert": false,
1006 "new_tokens": {},
1007 "account_updates": {
1008 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1009 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1010 "chain": "ethereum",
1011 "slots": {},
1012 "balance": "0x01f4",
1013 "code": "",
1014 "change": "Update"
1015 }
1016 },
1017 "state_updates": {
1018 "component_1": {
1019 "component_id": "component_1",
1020 "updated_attributes": {"attr1": "0x01"},
1021 "deleted_attributes": ["attr2"]
1022 }
1023 },
1024 "new_protocol_components":
1025 { "protocol_1": {
1026 "id": "protocol_1",
1027 "protocol_system": "system_1",
1028 "protocol_type_name": "type_1",
1029 "chain": "ethereum",
1030 "tokens": ["0x01", "0x02"],
1031 "contract_ids": ["0x01", "0x02"],
1032 "static_attributes": {"attr1": "0x01f4"},
1033 "change": "Update",
1034 "creation_tx": "0x01",
1035 "created_at": "2023-09-14T00:00:00"
1036 }
1037 },
1038 "deleted_protocol_components": {},
1039 "component_balances": {
1040 "protocol_1":
1041 {
1042 "0x01": {
1043 "token": "0x01",
1044 "balance": "0x01f4",
1045 "balance_float": 0.0,
1046 "modify_tx": "0x01",
1047 "component_id": "protocol_1"
1048 }
1049 }
1050 },
1051 "account_balances": {
1052 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1053 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1054 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1055 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1056 "balance": "0x01f4",
1057 "modify_tx": "0x01"
1058 }
1059 }
1060 },
1061 "component_tvl": {
1062 "protocol_1": 1000.0
1063 },
1064 "dci_update": {
1065 "new_entrypoints": {},
1066 "new_entrypoint_params": {},
1067 "trace_results": {}
1068 }
1069 }
1070 }
1071 "#.to_owned()
1072 ))
1073 ];
1074 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1075
1076 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1077 let jh = client
1078 .connect()
1079 .await
1080 .expect("connect failed");
1081 let (_, mut rx) = timeout(
1082 Duration::from_millis(100),
1083 client.subscribe(
1084 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1085 SubscriptionOptions::new(),
1086 ),
1087 )
1088 .await
1089 .expect("subscription timed out")
1090 .expect("subscription failed");
1091 let _ = timeout(Duration::from_millis(100), rx.recv())
1092 .await
1093 .expect("awaiting message timeout out")
1094 .expect("receiving message failed");
1095 timeout(Duration::from_millis(100), client.close())
1096 .await
1097 .expect("close timed out")
1098 .expect("close failed");
1099 jh.await
1100 .expect("ws loop errored")
1101 .unwrap();
1102 server_thread.await.unwrap();
1103 }
1104
1105 #[tokio::test]
1106 async fn test_unsubscribe() {
1107 let exp_comm = [
1108 ExpectedComm::Receive(
1109 100,
1110 tungstenite::protocol::Message::Text(
1111 r#"
1112 {
1113 "method": "subscribe",
1114 "extractor_id":{
1115 "chain": "ethereum",
1116 "name": "vm:ambient"
1117 },
1118 "include_state": true
1119 }"#
1120 .to_owned()
1121 .replace(|c: char| c.is_whitespace(), ""),
1122 ),
1123 ),
1124 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1125 r#"
1126 {
1127 "method": "newsubscription",
1128 "extractor_id":{
1129 "chain": "ethereum",
1130 "name": "vm:ambient"
1131 },
1132 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1133 }"#
1134 .to_owned()
1135 .replace(|c: char| c.is_whitespace(), ""),
1136 )),
1137 ExpectedComm::Receive(
1138 100,
1139 tungstenite::protocol::Message::Text(
1140 r#"
1141 {
1142 "method": "unsubscribe",
1143 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1144 }
1145 "#
1146 .to_owned()
1147 .replace(|c: char| c.is_whitespace(), ""),
1148 ),
1149 ),
1150 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1151 r#"
1152 {
1153 "method": "subscriptionended",
1154 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1155 }
1156 "#
1157 .to_owned()
1158 .replace(|c: char| c.is_whitespace(), ""),
1159 )),
1160 ];
1161 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1162
1163 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1164 let jh = client
1165 .connect()
1166 .await
1167 .expect("connect failed");
1168 let (sub_id, mut rx) = timeout(
1169 Duration::from_millis(100),
1170 client.subscribe(
1171 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1172 SubscriptionOptions::new(),
1173 ),
1174 )
1175 .await
1176 .expect("subscription timed out")
1177 .expect("subscription failed");
1178
1179 timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1180 .await
1181 .expect("unsubscribe timed out")
1182 .expect("unsubscribe failed");
1183 let res = timeout(Duration::from_millis(100), rx.recv())
1184 .await
1185 .expect("awaiting message timeout out");
1186
1187 assert!(res.is_none());
1189
1190 timeout(Duration::from_millis(100), client.close())
1191 .await
1192 .expect("close timed out")
1193 .expect("close failed");
1194 jh.await
1195 .expect("ws loop errored")
1196 .unwrap();
1197 server_thread.await.unwrap();
1198 }
1199
1200 #[tokio::test]
1201 async fn test_subscription_unexpected_end() {
1202 let exp_comm = [
1203 ExpectedComm::Receive(
1204 100,
1205 tungstenite::protocol::Message::Text(
1206 r#"
1207 {
1208 "method":"subscribe",
1209 "extractor_id":{
1210 "chain":"ethereum",
1211 "name":"vm:ambient"
1212 },
1213 "include_state": true
1214 }"#
1215 .to_owned()
1216 .replace(|c: char| c.is_whitespace(), ""),
1217 ),
1218 ),
1219 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1220 r#"
1221 {
1222 "method":"newsubscription",
1223 "extractor_id":{
1224 "chain":"ethereum",
1225 "name":"vm:ambient"
1226 },
1227 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1228 }"#
1229 .to_owned()
1230 .replace(|c: char| c.is_whitespace(), ""),
1231 )),
1232 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1233 r#"
1234 {
1235 "method": "subscriptionended",
1236 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1237 }"#
1238 .to_owned()
1239 .replace(|c: char| c.is_whitespace(), ""),
1240 )),
1241 ];
1242 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1243
1244 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1245 let jh = client
1246 .connect()
1247 .await
1248 .expect("connect failed");
1249 let (_, mut rx) = timeout(
1250 Duration::from_millis(100),
1251 client.subscribe(
1252 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1253 SubscriptionOptions::new(),
1254 ),
1255 )
1256 .await
1257 .expect("subscription timed out")
1258 .expect("subscription failed");
1259 let res = timeout(Duration::from_millis(100), rx.recv())
1260 .await
1261 .expect("awaiting message timeout out");
1262
1263 assert!(res.is_none());
1265
1266 timeout(Duration::from_millis(100), client.close())
1267 .await
1268 .expect("close timed out")
1269 .expect("close failed");
1270 jh.await
1271 .expect("ws loop errored")
1272 .unwrap();
1273 server_thread.await.unwrap();
1274 }
1275
1276 #[test_log::test(tokio::test)]
1277 async fn test_reconnect() {
1278 let exp_comm = [
1279 ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
1280 {
1281 "method":"subscribe",
1282 "extractor_id":{
1283 "chain":"ethereum",
1284 "name":"vm:ambient"
1285 },
1286 "include_state": true
1287 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1288 )),
1289 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1290 {
1291 "method":"newsubscription",
1292 "extractor_id":{
1293 "chain":"ethereum",
1294 "name":"vm:ambient"
1295 },
1296 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1297 }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1298 )),
1299 ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1300 {
1301 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1302 "deltas": {
1303 "extractor": "vm:ambient",
1304 "chain": "ethereum",
1305 "block": {
1306 "number": 123,
1307 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1308 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1309 "chain": "ethereum",
1310 "ts": "2023-09-14T00:00:00"
1311 },
1312 "finalized_block_height": 0,
1313 "revert": false,
1314 "new_tokens": {},
1315 "account_updates": {
1316 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1317 "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1318 "chain": "ethereum",
1319 "slots": {},
1320 "balance": "0x01f4",
1321 "code": "",
1322 "change": "Update"
1323 }
1324 },
1325 "state_updates": {
1326 "component_1": {
1327 "component_id": "component_1",
1328 "updated_attributes": {"attr1": "0x01"},
1329 "deleted_attributes": ["attr2"]
1330 }
1331 },
1332 "new_protocol_components": {
1333 "protocol_1":
1334 {
1335 "id": "protocol_1",
1336 "protocol_system": "system_1",
1337 "protocol_type_name": "type_1",
1338 "chain": "ethereum",
1339 "tokens": ["0x01", "0x02"],
1340 "contract_ids": ["0x01", "0x02"],
1341 "static_attributes": {"attr1": "0x01f4"},
1342 "change": "Update",
1343 "creation_tx": "0x01",
1344 "created_at": "2023-09-14T00:00:00"
1345 }
1346 },
1347 "deleted_protocol_components": {},
1348 "component_balances": {
1349 "protocol_1": {
1350 "0x01": {
1351 "token": "0x01",
1352 "balance": "0x01f4",
1353 "balance_float": 1000.0,
1354 "modify_tx": "0x01",
1355 "component_id": "protocol_1"
1356 }
1357 }
1358 },
1359 "account_balances": {
1360 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1361 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1362 "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1363 "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1364 "balance": "0x01f4",
1365 "modify_tx": "0x01"
1366 }
1367 }
1368 },
1369 "component_tvl": {
1370 "protocol_1": 1000.0
1371 },
1372 "dci_update": {
1373 "new_entrypoints": {},
1374 "new_entrypoint_params": {},
1375 "trace_results": {}
1376 }
1377 }
1378 }
1379 "#.to_owned()
1380 ))
1381 ];
1382 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1383 let client = WsDeltasClient::new_with_reconnects(
1384 &format!("ws://{addr}"),
1385 None,
1386 3,
1387 Duration::from_millis(110),
1389 )
1390 .unwrap();
1391
1392 let jh: JoinHandle<Result<(), DeltasError>> = client
1393 .connect()
1394 .await
1395 .expect("connect failed");
1396
1397 for _ in 0..2 {
1398 dbg!("loop");
1399 let (_, mut rx) = timeout(
1400 Duration::from_millis(200),
1401 client.subscribe(
1402 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1403 SubscriptionOptions::new(),
1404 ),
1405 )
1406 .await
1407 .expect("subscription timed out")
1408 .expect("subscription failed");
1409
1410 let _ = timeout(Duration::from_millis(100), rx.recv())
1411 .await
1412 .expect("awaiting message timeout out")
1413 .expect("receiving message failed");
1414
1415 let res = timeout(Duration::from_millis(200), rx.recv())
1417 .await
1418 .expect("awaiting closed connection timeout out");
1419 assert!(res.is_none());
1420 }
1421 let res = jh.await.expect("ws client join failed");
1422 assert!(res.is_err());
1424 server_thread
1425 .await
1426 .expect("ws server loop errored");
1427 }
1428
1429 async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1430 let server = TcpListener::bind("127.0.0.1:0")
1431 .await
1432 .expect("localhost bind failed");
1433 let addr = server.local_addr().unwrap();
1434 let jh = tokio::spawn(async move {
1435 while let Ok((stream, _)) = server.accept().await {
1436 if accept_first {
1437 let stream = tokio_tungstenite::accept_async(stream)
1439 .await
1440 .unwrap();
1441 sleep(Duration::from_millis(10)).await;
1442 drop(stream)
1443 } else {
1444 drop(stream);
1446 }
1447 }
1448 });
1449 (addr, jh)
1450 }
1451
1452 #[test(tokio::test)]
1453 async fn test_subscribe_dead_client_after_max_attempts() {
1454 let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1455 let client = WsDeltasClient::new_with_reconnects(
1456 &format!("ws://{addr}"),
1457 None,
1458 3,
1459 Duration::from_secs(0),
1460 )
1461 .unwrap();
1462
1463 let join_handle = client.connect().await.unwrap();
1464 let handle_res = join_handle.await.unwrap();
1465 assert!(handle_res.is_err());
1466 assert!(!client.is_connected().await);
1467
1468 let subscription_res = timeout(
1469 Duration::from_millis(10),
1470 client.subscribe(
1471 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1472 SubscriptionOptions::new(),
1473 ),
1474 )
1475 .await
1476 .unwrap();
1477 assert!(subscription_res.is_err());
1478 }
1479
1480 #[test(tokio::test)]
1481 async fn test_ws_client_retry_cooldown() {
1482 let start = std::time::Instant::now();
1483 let (addr, _) = mock_bad_connection_tycho_ws(false).await;
1484
1485 let client = WsDeltasClient::new_with_reconnects(
1487 &format!("ws://{addr}"),
1488 None,
1489 3, Duration::from_millis(50), )
1492 .unwrap();
1493
1494 let connect_result = client.connect().await;
1496 let elapsed = start.elapsed();
1497
1498 assert!(connect_result.is_err(), "Expected connection to fail after retries");
1500
1501 assert!(
1503 elapsed >= Duration::from_millis(100),
1504 "Expected at least 100ms elapsed, got {:?}",
1505 elapsed
1506 );
1507
1508 assert!(elapsed < Duration::from_millis(500), "Took too long: {:?}", elapsed);
1510 }
1511
1512 #[test_log::test(tokio::test)]
1513 async fn test_buffer_full_triggers_unsubscribe() {
1514 let exp_comm = {
1516 [
1517 ExpectedComm::Receive(
1519 100,
1520 tungstenite::protocol::Message::Text(
1521 r#"
1522 {
1523 "method":"subscribe",
1524 "extractor_id":{
1525 "chain":"ethereum",
1526 "name":"vm:ambient"
1527 },
1528 "include_state": true
1529 }"#
1530 .to_owned()
1531 .replace(|c: char| c.is_whitespace(), ""),
1532 ),
1533 ),
1534 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1536 r#"
1537 {
1538 "method":"newsubscription",
1539 "extractor_id":{
1540 "chain":"ethereum",
1541 "name":"vm:ambient"
1542 },
1543 "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1544 }"#
1545 .to_owned()
1546 .replace(|c: char| c.is_whitespace(), ""),
1547 )),
1548 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1550 r#"
1551 {
1552 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1553 "deltas": {
1554 "extractor": "vm:ambient",
1555 "chain": "ethereum",
1556 "block": {
1557 "number": 123,
1558 "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1559 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1560 "chain": "ethereum",
1561 "ts": "2023-09-14T00:00:00"
1562 },
1563 "finalized_block_height": 0,
1564 "revert": false,
1565 "new_tokens": {},
1566 "account_updates": {},
1567 "state_updates": {},
1568 "new_protocol_components": {},
1569 "deleted_protocol_components": {},
1570 "component_balances": {},
1571 "account_balances": {},
1572 "component_tvl": {},
1573 "dci_update": {
1574 "new_entrypoints": {},
1575 "new_entrypoint_params": {},
1576 "trace_results": {}
1577 }
1578 }
1579 }
1580 "#.to_owned()
1581 )),
1582 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1584 r#"
1585 {
1586 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1587 "deltas": {
1588 "extractor": "vm:ambient",
1589 "chain": "ethereum",
1590 "block": {
1591 "number": 124,
1592 "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
1593 "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1594 "chain": "ethereum",
1595 "ts": "2023-09-14T00:00:01"
1596 },
1597 "finalized_block_height": 0,
1598 "revert": false,
1599 "new_tokens": {},
1600 "account_updates": {},
1601 "state_updates": {},
1602 "new_protocol_components": {},
1603 "deleted_protocol_components": {},
1604 "component_balances": {},
1605 "account_balances": {},
1606 "component_tvl": {},
1607 "dci_update": {
1608 "new_entrypoints": {},
1609 "new_entrypoint_params": {},
1610 "trace_results": {}
1611 }
1612 }
1613 }
1614 "#.to_owned()
1615 )),
1616 ExpectedComm::Receive(
1618 100,
1619 tungstenite::protocol::Message::Text(
1620 r#"
1621 {
1622 "method": "unsubscribe",
1623 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1624 }
1625 "#
1626 .to_owned()
1627 .replace(|c: char| c.is_whitespace(), ""),
1628 ),
1629 ),
1630 ExpectedComm::Send(tungstenite::protocol::Message::Text(
1632 r#"
1633 {
1634 "method": "subscriptionended",
1635 "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1636 }
1637 "#
1638 .to_owned()
1639 .replace(|c: char| c.is_whitespace(), ""),
1640 )),
1641 ]
1642 };
1643
1644 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1645
1646 let client = WsDeltasClient::new_with_custom_buffers(
1648 &format!("ws://{addr}"),
1649 None,
1650 128, 1, )
1653 .unwrap();
1654
1655 let jh = client
1656 .connect()
1657 .await
1658 .expect("connect failed");
1659
1660 let (_sub_id, mut rx) = timeout(
1661 Duration::from_millis(100),
1662 client.subscribe(
1663 ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1664 SubscriptionOptions::new(),
1665 ),
1666 )
1667 .await
1668 .expect("subscription timed out")
1669 .expect("subscription failed");
1670
1671 tokio::time::sleep(Duration::from_millis(100)).await;
1673
1674 let mut received_msgs = Vec::new();
1676
1677 while received_msgs.len() < 3 {
1679 match timeout(Duration::from_millis(200), rx.recv()).await {
1680 Ok(Some(msg)) => {
1681 received_msgs.push(msg);
1682 }
1683 Ok(None) => {
1684 break;
1686 }
1687 Err(_) => {
1688 break;
1690 }
1691 }
1692 }
1693
1694 assert!(
1696 received_msgs.len() <= 1,
1697 "Expected buffer overflow to limit messages to at most 1, got {}",
1698 received_msgs.len()
1699 );
1700
1701 if let Some(first_msg) = received_msgs.first() {
1702 assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
1703 }
1704
1705 drop(rx); tokio::time::sleep(Duration::from_millis(50)).await;
1712
1713 jh.abort();
1715 server_thread.abort();
1716
1717 let _ = jh.await;
1718 let _ = server_thread.await;
1719 }
1720
1721 #[tokio::test]
1722 async fn test_server_error_handling() {
1723 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1724
1725 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1726
1727 let error_response = WebSocketMessage::Response(Response::Error(
1729 WebsocketError::ExtractorNotFound(extractor_id.clone()),
1730 ));
1731 let error_json = serde_json::to_string(&error_response).unwrap();
1732
1733 let exp_comm = [
1734 ExpectedComm::Receive(
1735 100,
1736 tungstenite::protocol::Message::Text(
1737 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1738 ),
1739 ),
1740 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1741 ];
1742
1743 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1744
1745 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1746 let jh = client
1747 .connect()
1748 .await
1749 .expect("connect failed");
1750
1751 let result = timeout(
1752 Duration::from_millis(100),
1753 client.subscribe(extractor_id, SubscriptionOptions::new()),
1754 )
1755 .await
1756 .expect("subscription timed out");
1757
1758 assert!(result.is_err());
1760 if let Err(DeltasError::ServerError(msg, _)) = result {
1761 assert!(msg.contains("Subscription failed"));
1762 assert!(msg.contains("Extractor not found"));
1763 } else {
1764 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1765 }
1766
1767 timeout(Duration::from_millis(100), client.close())
1768 .await
1769 .expect("close timed out")
1770 .expect("close failed");
1771 jh.await
1772 .expect("ws loop errored")
1773 .unwrap();
1774 server_thread.await.unwrap();
1775 }
1776
1777 #[test_log::test(tokio::test)]
1778 async fn test_subscription_not_found_error() {
1779 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1781
1782 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1783 let subscription_id = Uuid::new_v4();
1784
1785 let error_response = WebSocketMessage::Response(Response::Error(
1786 WebsocketError::SubscriptionNotFound(subscription_id),
1787 ));
1788 let error_json = serde_json::to_string(&error_response).unwrap();
1789
1790 let exp_comm = [
1791 ExpectedComm::Receive(
1793 100,
1794 tungstenite::protocol::Message::Text(
1795 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1796 ),
1797 ),
1798 ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
1799 r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"test_extractor"}},"subscription_id":"{}"}}"#,
1800 subscription_id
1801 ))),
1802 ExpectedComm::Receive(
1804 100,
1805 tungstenite::protocol::Message::Text(format!(
1806 r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
1807 subscription_id
1808 )),
1809 ),
1810 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1812 ];
1813
1814 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1815
1816 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1817 let jh = client
1818 .connect()
1819 .await
1820 .expect("connect failed");
1821
1822 let (received_sub_id, _rx) = timeout(
1824 Duration::from_millis(100),
1825 client.subscribe(extractor_id, SubscriptionOptions::new()),
1826 )
1827 .await
1828 .expect("subscription timed out")
1829 .expect("subscription failed");
1830
1831 assert_eq!(received_sub_id, subscription_id);
1832
1833 let unsubscribe_result =
1835 timeout(Duration::from_millis(100), client.unsubscribe(subscription_id))
1836 .await
1837 .expect("unsubscribe timed out");
1838
1839 unsubscribe_result
1843 .expect("Unsubscribe should succeed even if server says subscription not found");
1844
1845 timeout(Duration::from_millis(100), client.close())
1846 .await
1847 .expect("close timed out")
1848 .expect("close failed");
1849 jh.await
1850 .expect("ws loop errored")
1851 .unwrap();
1852 server_thread.await.unwrap();
1853 }
1854
1855 #[test_log::test(tokio::test)]
1856 async fn test_parse_error_handling() {
1857 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1858
1859 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1860 let error_response = WebSocketMessage::Response(Response::Error(
1861 WebsocketError::ParseError("}2sdf".to_string(), "malformed JSON".to_string()),
1862 ));
1863 let error_json = serde_json::to_string(&error_response).unwrap();
1864
1865 let exp_comm = [
1866 ExpectedComm::Receive(
1868 100,
1869 tungstenite::protocol::Message::Text(
1870 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1871 ),
1872 ),
1873 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json))
1874 ];
1875
1876 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1877
1878 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1879 let jh = client
1880 .connect()
1881 .await
1882 .expect("connect failed");
1883
1884 let _ = timeout(
1886 Duration::from_millis(100),
1887 client.subscribe(extractor_id, SubscriptionOptions::new()),
1888 )
1889 .await
1890 .expect("subscription timed out");
1891
1892 let result = jh
1894 .await
1895 .expect("ws loop should complete");
1896 assert!(result.is_err());
1897 if let Err(DeltasError::ServerError(message, _)) = result {
1898 assert!(message.contains("Server failed to parse client message"));
1899 } else {
1900 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1901 }
1902
1903 server_thread.await.unwrap();
1904 }
1905
1906 #[tokio::test]
1907 async fn test_subscribe_error_handling() {
1908 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1909
1910 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "failing_extractor");
1911
1912 let error_response = WebSocketMessage::Response(Response::Error(
1913 WebsocketError::SubscribeError(extractor_id.clone()),
1914 ));
1915 let error_json = serde_json::to_string(&error_response).unwrap();
1916
1917 let exp_comm = [
1918 ExpectedComm::Receive(
1919 100,
1920 tungstenite::protocol::Message::Text(
1921 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"failing_extractor"},"include_state":true}"#.to_string()
1922 ),
1923 ),
1924 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1925 ];
1926
1927 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1928
1929 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1930 let jh = client
1931 .connect()
1932 .await
1933 .expect("connect failed");
1934
1935 let result = timeout(
1936 Duration::from_millis(100),
1937 client.subscribe(extractor_id, SubscriptionOptions::new()),
1938 )
1939 .await
1940 .expect("subscription timed out");
1941
1942 assert!(result.is_err());
1944 if let Err(DeltasError::ServerError(msg, _)) = result {
1945 assert!(msg.contains("Subscription failed"));
1946 assert!(msg.contains("Failed to subscribe to extractor"));
1947 } else {
1948 panic!("Expected DeltasError::ServerError, got: {:?}", result);
1949 }
1950
1951 timeout(Duration::from_millis(100), client.close())
1952 .await
1953 .expect("close timed out")
1954 .expect("close failed");
1955 jh.await
1956 .expect("ws loop errored")
1957 .unwrap();
1958 server_thread.await.unwrap();
1959 }
1960
1961 #[tokio::test]
1962 async fn test_cancel_pending_subscription() {
1963 use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1965
1966 let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1967
1968 let error_response = WebSocketMessage::Response(Response::Error(
1969 WebsocketError::ExtractorNotFound(extractor_id.clone()),
1970 ));
1971 let error_json = serde_json::to_string(&error_response).unwrap();
1972
1973 let exp_comm = [
1974 ExpectedComm::Receive(
1975 100,
1976 tungstenite::protocol::Message::Text(
1977 r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1978 ),
1979 ),
1980 ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1981 ];
1982
1983 let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1984
1985 let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1986 let jh = client
1987 .connect()
1988 .await
1989 .expect("connect failed");
1990
1991 let client_clone = client.clone();
1993 let extractor_id_clone = extractor_id.clone();
1994
1995 let subscription1 = tokio::spawn({
1996 let client_for_spawn = client.clone();
1997 async move {
1998 client_for_spawn
1999 .subscribe(extractor_id, SubscriptionOptions::new())
2000 .await
2001 }
2002 });
2003
2004 let subscription2 = tokio::spawn(async move {
2005 client_clone
2007 .subscribe(extractor_id_clone, SubscriptionOptions::new())
2008 .await
2009 });
2010
2011 let (result1, result2) = tokio::join!(subscription1, subscription2);
2012
2013 let result1 = result1.unwrap();
2014 let result2 = result2.unwrap();
2015
2016 assert!(result1.is_err() || result2.is_err());
2019
2020 if let Err(DeltasError::SubscriptionAlreadyPending) = result2 {
2021 } else if let Err(DeltasError::ServerError(_, _)) = result1 {
2023 } else {
2025 panic!("Expected one SubscriptionAlreadyPending and one ServerError");
2026 }
2027
2028 timeout(Duration::from_millis(100), client.close())
2029 .await
2030 .expect("close timed out")
2031 .expect("close failed");
2032 jh.await
2033 .expect("ws loop errored")
2034 .unwrap();
2035 server_thread.await.unwrap();
2036 }
2037}