tycho_client/
deltas.rs

1//! # Deltas Client
2//!
3//! This module focuses on implementing the Real-Time Deltas client for the Tycho Indexer service.
4//! Utilizing this client facilitates efficient, instant communication with the indexing service,
5//! promoting seamless data synchronization.
6//!
7//! ## Websocket Implementation
8//!
9//! The present WebSocket implementation is clonable, which enables it to be shared
10//! across multiple asynchronous tasks without creating separate instances for each task. This
11//! unique feature boosts efficiency as it:
12//!
13//! - **Reduces Server Load:** By maintaining a single universal client, the load on the server is
14//!   significantly reduced. This is because fewer connections are made to the server, preventing it
15//!   from getting overwhelmed by numerous simultaneous requests.
//! - **Conserves Resource Usage:** A single shared client requires fewer system resources than if
//!   multiple clients were instantiated and used separately, as there is some overhead for
//!   websocket handshakes and messages.
19//!
20//! Therefore, sharing one client among multiple tasks ensures optimal performance, reduces resource
21//! consumption, and enhances overall software scalability.
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::Arc,
25    time::Duration,
26};
27
28use async_trait::async_trait;
29use futures03::{stream::SplitSink, SinkExt, StreamExt};
30use hyper::{
31    header::{
32        AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
33        USER_AGENT,
34    },
35    Uri,
36};
37#[cfg(test)]
38use mockall::automock;
39use thiserror::Error;
40use tokio::{
41    net::TcpStream,
42    sync::{
43        mpsc::{self, error::TrySendError, Receiver, Sender},
44        oneshot, Mutex, Notify,
45    },
46    task::JoinHandle,
47    time::sleep,
48};
49use tokio_tungstenite::{
50    connect_async,
51    tungstenite::{
52        self,
53        handshake::client::{generate_key, Request},
54    },
55    MaybeTlsStream, WebSocketStream,
56};
57use tracing::{debug, error, info, instrument, trace, warn};
58use tycho_common::dto::{BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage};
59use uuid::Uuid;
60
61use crate::TYCHO_SERVER_VERSION;
62
/// Errors returned by the deltas client.
#[derive(Error, Debug)]
pub enum DeltasError {
    /// Failed to parse the provided URI.
    ///
    /// The first field is the offending URI, the second a textual description of the problem.
    #[error("Failed to parse URI: {0}. Error: {1}")]
    UriParsing(String, String),

    /// The requested subscription is already pending and is awaiting confirmation from the server.
    #[error("The requested subscription is already pending")]
    SubscriptionAlreadyPending,

    /// A message failed to send via an internal channel or through the websocket channel.
    ///
    /// Also used when a command could not be serialized before sending. This is typically a fatal
    /// error and might indicate a bug in the implementation.
    #[error("{0}")]
    TransportError(String),

    /// The internal message buffer is full. This likely means that messages are not being consumed
    /// fast enough. If the incoming load emits messages in bursts, consider increasing the buffer
    /// size.
    #[error("The buffer is full!")]
    BufferFull,

    /// The client has no active connections but was accessed (e.g., by calling subscribe).
    /// This typically occurs when trying to use the client before calling connect() or
    /// after the connection has been closed.
    #[error("The client is not connected!")]
    NotConnected,

    /// The connect method was called while the client already had an active connection.
    #[error("The client is already connected!")]
    AlreadyConnected,

    /// The connection was closed in an orderly fashion by the server, e.g. because it restarted.
    #[error("The server closed the connection!")]
    ConnectionClosed,

    /// The connection was closed unexpectedly by the server or encountered a network error.
    ///
    /// Wraps the underlying websocket error (boxed).
    #[error("Connection error: {0}")]
    ConnectionError(#[from] Box<tungstenite::Error>),

    /// A fatal error occurred that cannot be recovered from.
    #[error("Tycho FatalError: {0}")]
    Fatal(String),
}
106
/// Per-subscription settings passed to [`DeltasClient::subscribe`].
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Forwarded verbatim as the `include_state` flag of the subscribe command.
    include_state: bool,
}

impl SubscriptionOptions {
    /// Creates options with `include_state` switched on.
    pub fn new() -> Self {
        SubscriptionOptions { include_state: true }
    }

    /// Builder-style setter: returns the options with `include_state` set to `val`.
    pub fn with_state(self, val: bool) -> Self {
        let mut updated = self;
        updated.include_state = val;
        updated
    }
}

impl Default for SubscriptionOptions {
    /// Same as [`SubscriptionOptions::new`].
    fn default() -> Self {
        Self::new()
    }
}
127
/// Client interface for subscribing to realtime delta messages from Tycho.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait DeltasClient {
    /// Subscribe to an extractor and receive realtime messages.
    ///
    /// Will request a subscription from tycho and wait for confirmation of it. If the caller
    /// cancels while waiting for confirmation the subscription may still be registered. If the
    /// receiver was deallocated though, the first message from the subscription will remove it
    /// again - since there is no one to inform about these messages.
    ///
    /// On success returns the subscription id together with the receiving end of the channel on
    /// which the subscription's `BlockChanges` messages are delivered.
    async fn subscribe(
        &self,
        extractor_id: ExtractorIdentity,
        options: SubscriptionOptions,
    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;

    /// Unsubscribe from a subscription, identified by the id returned from `subscribe`.
    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;

    /// Start the client's message handling loop.
    ///
    /// Returns a handle to the task running the loop; the task's result reports how the loop
    /// ended.
    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;

    /// Close the client's message handling loop.
    async fn close(&self) -> Result<(), DeltasError>;
}
152
/// Websocket-backed deltas client.
///
/// Cloning is cheap: clones share the same connection state (`inner`) and connection
/// notifier (`conn_notify`) through `Arc`s.
#[derive(Clone)]
pub struct WsDeltasClient {
    /// The tycho indexer websocket uri.
    uri: Uri,
    /// Authorization key for the websocket connection.
    auth_key: Option<String>,
    /// Maximum amount of reconnects to try before giving up.
    max_reconnects: u32,
    /// The client will buffer this many messages incoming from the websocket
    /// before starting to drop them.
    ///
    /// NOTE(review): in `connect` this value is used as the capacity of the internal command
    /// channel - confirm that the description above still matches the intended use.
    ws_buffer_size: usize,
    /// The client will buffer that many messages for each subscription before it starts dropping
    /// them.
    subscription_buffer_size: usize,
    /// Notify tasks waiting for a connection to be established.
    conn_notify: Arc<Notify>,
    /// Shared client instance state. `None` while there is no live connection.
    inner: Arc<Mutex<Option<Inner>>>,
}
172
/// Write half of the (possibly TLS-wrapped) websocket stream, as produced by splitting the
/// connection.
type WebSocketSink =
    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
175
/// Subscription State
///
/// Subscriptions go through a lifecycle:
///
/// ```text
/// O ---> requested subscribe ----> active ----> requested unsub ---> ended
/// ```
///
/// We use oneshot channels to inform the client struct about when these transitions happened.
/// E.g. for `subscribe` to finish, we want the state to have transitioned to `active`, and
/// similarly for `unsubscribe`.
#[derive(Debug)]
enum SubscriptionInfo {
    /// Subscription was requested; we wait for server confirmation and uuid assignment.
    /// The sender hands the subscription id and message receiver back to the `subscribe` caller.
    RequestedSubscription(oneshot::Sender<(Uuid, Receiver<BlockChanges>)>),
    /// Subscription is active.
    Active,
    /// Unsubscription was requested; we wait for server confirmation.
    /// The sender notifies the `unsubscribe` caller once the subscription has been removed.
    RequestedUnsubscription(oneshot::Sender<()>),
}
196
/// Internal struct containing the state shared between `WsDeltasClient` instances.
struct Inner {
    /// Websocket sender handle.
    sink: WebSocketSink,
    /// Command channel sender handle; a message on this channel stops the message loop.
    cmd_tx: Sender<()>,
    /// Currently pending subscriptions, keyed by the extractor they were requested for.
    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
    /// Active subscriptions, keyed by the server-assigned subscription id.
    subscriptions: HashMap<Uuid, SubscriptionInfo>,
    /// For each subscription we keep a sender handle, the receiver is returned to the caller of
    /// subscribe.
    sender: HashMap<Uuid, Sender<BlockChanges>>,
    /// How many messages to buffer per subscription before starting to drop new messages.
    buffer_size: usize,
}
213
/// Shared state between all client instances.
///
/// This state is behind a mutex and requires synchronization to be read or modified.
217impl Inner {
218    fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
219        Self {
220            sink,
221            cmd_tx,
222            pending: HashMap::new(),
223            subscriptions: HashMap::new(),
224            sender: HashMap::new(),
225            buffer_size,
226        }
227    }
228
229    /// Registers a new pending subscription.
230    #[allow(clippy::result_large_err)]
231    fn new_subscription(
232        &mut self,
233        id: &ExtractorIdentity,
234        ready_tx: oneshot::Sender<(Uuid, Receiver<BlockChanges>)>,
235    ) -> Result<(), DeltasError> {
236        if self.pending.contains_key(id) {
237            return Err(DeltasError::SubscriptionAlreadyPending);
238        }
239        self.pending
240            .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
241        Ok(())
242    }
243
244    /// Transitions a pending subscription to active.
245    ///
246    /// Will ignore any request to do so for subscriptions that are not pending.
247    fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
248        if let Some(info) = self.pending.remove(extractor_id) {
249            if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
250                let (tx, rx) = mpsc::channel(self.buffer_size);
251                self.sender.insert(subscription_id, tx);
252                self.subscriptions
253                    .insert(subscription_id, SubscriptionInfo::Active);
254                let _ = ready_tx
255                    .send((subscription_id, rx))
256                    .map_err(|_| {
257                        warn!(
258                            ?extractor_id,
259                            ?subscription_id,
260                            "Subscriber for has gone away. Ignoring."
261                        )
262                    });
263            } else {
264                error!(
265                    ?extractor_id,
266                    ?subscription_id,
267                    "Pending subscription was not in the correct state to 
268                    transition to active. Ignoring!"
269                )
270            }
271        } else {
272            error!(
273                ?extractor_id,
274                ?subscription_id,
275                "Tried to mark an unkown subscription as active. Ignoring!"
276            );
277        }
278    }
279
280    /// Sends a message to a subscription's receiver.
281    #[allow(clippy::result_large_err)]
282    fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
283        if let Some(sender) = self.sender.get_mut(id) {
284            sender
285                .try_send(msg)
286                .map_err(|e| match e {
287                    TrySendError::Full(_) => DeltasError::BufferFull,
288                    TrySendError::Closed(_) => {
289                        DeltasError::TransportError("The subscriber has gone away".to_string())
290                    }
291                })?;
292        }
293        Ok(())
294    }
295
296    /// Requests a subscription to end.
297    ///
298    /// The subscription needs to exist and be active for this to have any effect. Wll use
299    /// `ready_tx` to notify the receiver once the transition to ended completed.
300    fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
301        if let Some(info) = self
302            .subscriptions
303            .get_mut(subscription_id)
304        {
305            if let SubscriptionInfo::Active = info {
306                *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
307            }
308        } else {
309            // no big deal imo so only debug lvl..
310            debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
311        }
312    }
313
314    /// Removes and fully ends a subscription
315    ///
316    /// Any calls for non-existing subscriptions will be simply ignored. May panic on internal state
317    /// inconsistencies: e.g. if the subscription exists but there is no sender for it.
318    /// Will remove a subscription even it was in active or pending state before, this is to support
319    /// any server side failure of the subscription.
320    fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
321        if let Entry::Occupied(e) = self
322            .subscriptions
323            .entry(subscription_id)
324        {
325            let info = e.remove();
326            if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
327                let _ = tx.send(()).map_err(|_| {
328                    warn!(?subscription_id, "failed to notify about removed subscription")
329                });
330                self.sender
331                    .remove(&subscription_id)
332                    .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
333            } else {
334                warn!(?subscription_id, "Subscription ended unexpectedly!");
335                self.sender
336                    .remove(&subscription_id)
337                    .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
338            }
339        } else {
340            error!(
341                ?subscription_id,
342                "Received `SubscriptionEnded`, but was never subscribed 
343                to it. This is likely a bug!"
344            );
345        }
346
347        Ok(())
348    }
349
350    /// Sends a message through the websocket.
351    async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
352        self.sink.send(msg).await.map_err(|e| {
353            DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
354        })
355    }
356}
357
358/// Tycho client websocket implementation.
359impl WsDeltasClient {
360    // Construct a new client with 5 reconnection attempts.
361    #[allow(clippy::result_large_err)]
362    pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
363        let uri = ws_uri
364            .parse::<Uri>()
365            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
366        Ok(Self {
367            uri,
368            auth_key: auth_key.map(|s| s.to_string()),
369            inner: Arc::new(Mutex::new(None)),
370            ws_buffer_size: 128,
371            subscription_buffer_size: 128,
372            conn_notify: Arc::new(Notify::new()),
373            max_reconnects: 5,
374        })
375    }
376
377    // Construct a new client with a custom number of reconnection attempts.
378    #[allow(clippy::result_large_err)]
379    pub fn new_with_reconnects(
380        ws_uri: &str,
381        max_reconnects: u32,
382        auth_key: Option<&str>,
383    ) -> Result<Self, DeltasError> {
384        let uri = ws_uri
385            .parse::<Uri>()
386            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
387
388        Ok(Self {
389            uri,
390            auth_key: auth_key.map(|s| s.to_string()),
391            inner: Arc::new(Mutex::new(None)),
392            ws_buffer_size: 128,
393            subscription_buffer_size: 128,
394            conn_notify: Arc::new(Notify::new()),
395            max_reconnects,
396        })
397    }
398
399    /// Ensures that the client is connected.
400    ///
401    /// This method will acquire the lock for inner.
402    async fn is_connected(&self) -> bool {
403        let guard = self.inner.as_ref().lock().await;
404        guard.is_some()
405    }
406
407    /// Waits for the client to be connected
408    ///
409    /// This method acquires the lock for inner for a short period, then waits until the  
410    /// connection is established if not already connected.
411    async fn ensure_connection(&self) {
412        if !self.is_connected().await {
413            self.conn_notify.notified().await;
414        }
415    }
416
417    /// Main message handling logic
418    ///
419    /// If the message returns an error, a reconnect attempt may be considered depending on the
420    /// error type.
421    #[instrument(skip(self, msg))]
422    async fn handle_msg(
423        &self,
424        msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
425    ) -> Result<(), DeltasError> {
426        let mut guard = self.inner.lock().await;
427
428        match msg {
429            // We do not deserialize the message directly into a WebSocketMessage. This is because
430            // the serde arbitrary_precision feature (often included in many
431            // dependencies we use) breaks some untagged enum deserializations. Instead,
432            // we deserialize the message into a serde_json::Value and convert that into a WebSocketMessage. For more info on this issue, see: https://github.com/serde-rs/json/issues/740
433            Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
434                serde_json::Value,
435            >(&text)
436            {
437                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
438                    Ok(ws_message) => match ws_message {
439                        WebSocketMessage::BlockChanges { subscription_id, deltas } => {
440                            trace!(?deltas, "Received a block state change, sending to channel");
441                            let inner = guard
442                                .as_mut()
443                                .ok_or_else(|| DeltasError::NotConnected)?;
444                            match inner.send(&subscription_id, deltas) {
445                                Err(DeltasError::BufferFull) => {
446                                    error!(?subscription_id, "Buffer full, message dropped!");
447                                }
448                                Err(_) => {
449                                    warn!(
450                                        ?subscription_id,
451                                        "Receiver for has gone away, unsubscribing!"
452                                    );
453                                    let (tx, _) = oneshot::channel();
454                                    let _ = WsDeltasClient::unsubscribe_inner(
455                                        inner,
456                                        subscription_id,
457                                        tx,
458                                    )
459                                    .await;
460                                }
461                                _ => { /* Do nothing */ }
462                            }
463                        }
464                        WebSocketMessage::Response(Response::NewSubscription {
465                            extractor_id,
466                            subscription_id,
467                        }) => {
468                            info!(?extractor_id, ?subscription_id, "Received a new subscription");
469                            let inner = guard
470                                .as_mut()
471                                .ok_or_else(|| DeltasError::NotConnected)?;
472                            inner.mark_active(&extractor_id, subscription_id);
473                        }
474                        WebSocketMessage::Response(Response::SubscriptionEnded {
475                            subscription_id,
476                        }) => {
477                            info!(?subscription_id, "Received a subscription ended");
478                            let inner = guard
479                                .as_mut()
480                                .ok_or_else(|| DeltasError::NotConnected)?;
481                            inner.remove_subscription(subscription_id)?;
482                        }
483                    },
484                    Err(e) => {
485                        error!(error = %e, message=text, "Failed to deserialize WebSocketMessage: message does not match expected structs");
486                    }
487                },
488                Err(e) => {
489                    error!(error = %e, message=text, "Failed to deserialize message: invalid json");
490                }
491            },
492            Ok(tungstenite::protocol::Message::Ping(_)) => {
493                // Respond to pings with pongs.
494                let inner = guard
495                    .as_mut()
496                    .ok_or_else(|| DeltasError::NotConnected)?;
497                if let Err(error) = inner
498                    .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
499                    .await
500                {
501                    debug!(?error, "Failed to send pong!");
502                }
503            }
504            Ok(tungstenite::protocol::Message::Pong(_)) => {
505                // Do nothing.
506            }
507            Ok(tungstenite::protocol::Message::Close(_)) => {
508                return Err(DeltasError::ConnectionClosed);
509            }
510            Ok(unknown_msg) => {
511                info!("Received an unknown message type: {:?}", unknown_msg);
512            }
513            Err(error) => {
514                error!(?error, "Websocket error");
515                return Err(match error {
516                    tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
517                    tungstenite::Error::AlreadyClosed => {
518                        warn!("Received AlreadyClosed error which is indicative of a bug!");
519                        DeltasError::ConnectionError(Box::new(error))
520                    }
521                    tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
522                        DeltasError::ConnectionError(Box::new(error))
523                    }
524                    _ => DeltasError::Fatal(error.to_string()),
525                });
526            }
527        };
528        Ok(())
529    }
530
531    /// Helper method to force request a unsubscribe of a subscription
532    ///
533    /// This method expects to receive a mutable reference to `Inner` so it does not acquire a
534    /// lock. Used for normal unsubscribes as well to remove any subscriptions with deallocated
535    /// receivers.
536    async fn unsubscribe_inner(
537        inner: &mut Inner,
538        subscription_id: Uuid,
539        ready_tx: oneshot::Sender<()>,
540    ) -> Result<(), DeltasError> {
541        inner.end_subscription(&subscription_id, ready_tx);
542        let cmd = Command::Unsubscribe { subscription_id };
543        inner
544            .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
545                |e| {
546                    DeltasError::TransportError(format!(
547                        "Failed to serialize unsubscribe command: {e}"
548                    ))
549                },
550            )?))
551            .await?;
552        Ok(())
553    }
554}
555
556#[async_trait]
557impl DeltasClient for WsDeltasClient {
558    #[instrument(skip(self))]
559    async fn subscribe(
560        &self,
561        extractor_id: ExtractorIdentity,
562        options: SubscriptionOptions,
563    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
564        trace!("Starting subscribe");
565        self.ensure_connection().await;
566        let (ready_tx, ready_rx) = oneshot::channel();
567        {
568            let mut guard = self.inner.lock().await;
569            let inner = guard
570                .as_mut()
571                .ok_or_else(|| DeltasError::NotConnected)?;
572            trace!("Sending subscribe command");
573            inner.new_subscription(&extractor_id, ready_tx)?;
574            let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
575            inner
576                .ws_send(tungstenite::protocol::Message::Text(
577                    serde_json::to_string(&cmd).map_err(|e| {
578                        DeltasError::TransportError(format!(
579                            "Failed to serialize subscribe command: {e}"
580                        ))
581                    })?,
582                ))
583                .await?;
584        }
585        trace!("Waiting for subscription response");
586        let rx = ready_rx.await.map_err(|_| {
587            DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
588        })?;
589        trace!("Subscription successful");
590        Ok(rx)
591    }
592
593    #[instrument(skip(self))]
594    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
595        self.ensure_connection().await;
596        let (ready_tx, ready_rx) = oneshot::channel();
597        {
598            let mut guard = self.inner.lock().await;
599            let inner = guard
600                .as_mut()
601                .ok_or_else(|| DeltasError::NotConnected)?;
602
603            WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
604        }
605        ready_rx.await.map_err(|_| {
606            DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
607        })?;
608
609        Ok(())
610    }
611
612    #[instrument(skip(self))]
613    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
614        if self.is_connected().await {
615            return Err(DeltasError::AlreadyConnected);
616        }
617        let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
618        info!(?ws_uri, "Starting TychoWebsocketClient");
619
620        let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
621        {
622            let mut guard = self.inner.as_ref().lock().await;
623            *guard = None;
624        }
625        let this = self.clone();
626        let jh = tokio::spawn(async move {
627            let mut retry_count = 0;
628            let mut result = Err(DeltasError::NotConnected);
629
630            'retry: while retry_count < this.max_reconnects {
631                info!(?ws_uri, "Connecting to WebSocket server");
632
633                // Create a WebSocket request
634                let mut request_builder = Request::builder()
635                    .uri(&ws_uri)
636                    .header(SEC_WEBSOCKET_KEY, generate_key())
637                    .header(SEC_WEBSOCKET_VERSION, 13)
638                    .header(CONNECTION, "Upgrade")
639                    .header(UPGRADE, "websocket")
640                    .header(
641                        HOST,
642                        this.uri.host().ok_or_else(|| {
643                            DeltasError::UriParsing(
644                                ws_uri.clone(),
645                                "No host found in tycho url".to_string(),
646                            )
647                        })?,
648                    )
649                    .header(
650                        USER_AGENT,
651                        format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
652                    );
653
654                // Add Authorization if one is given
655                if let Some(ref key) = this.auth_key {
656                    request_builder = request_builder.header(AUTHORIZATION, key);
657                }
658
659                let request = request_builder.body(()).map_err(|e| {
660                    DeltasError::TransportError(format!("Failed to build connection request: {e}"))
661                })?;
662                let (conn, _) = match connect_async(request).await {
663                    Ok(conn) => conn,
664                    Err(e) => {
665                        // Prepare for reconnection
666                        retry_count += 1;
667                        let mut guard = this.inner.as_ref().lock().await;
668                        *guard = None;
669
670                        warn!(
671                            e = e.to_string(),
672                            "Failed to connect to WebSocket server; Reconnecting"
673                        );
674                        sleep(Duration::from_millis(500)).await;
675
676                        continue 'retry;
677                    }
678                };
679
680                let (ws_tx_new, ws_rx_new) = conn.split();
681                {
682                    let mut guard = this.inner.as_ref().lock().await;
683                    *guard =
684                        Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
685                }
686                let mut msg_rx = ws_rx_new.boxed();
687
688                info!("Connection Successful: TychoWebsocketClient started");
689                this.conn_notify.notify_waiters();
690                result = Ok(());
691
692                loop {
693                    let res = tokio::select! {
694                        msg = msg_rx.next() => match msg {
695                            Some(msg) => this.handle_msg(msg).await,
696                            None => { break 'retry } // ws connection silently closed
697                        },
698                        _ = cmd_rx.recv() => {break 'retry},
699                    };
700                    if let Err(error) = res {
701                        if matches!(
702                            error,
703                            DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
704                        ) {
705                            // Prepare for reconnection
706                            retry_count += 1;
707                            let mut guard = this.inner.as_ref().lock().await;
708                            *guard = None;
709
710                            warn!(
711                                ?error,
712                                ?retry_count,
713                                "Connection dropped unexpectedly; Reconnecting..."
714                            );
715                            break;
716                        } else {
717                            // Other errors are considered fatal
718                            error!(?error, "Fatal error; Exiting");
719                            result = Err(error);
720                            break 'retry;
721                        }
722                    }
723                }
724            }
725
726            // Clean up before exiting
727            let mut guard = this.inner.as_ref().lock().await;
728            *guard = None;
729
730            // Check if max retries has been reached.
731            if retry_count >= this.max_reconnects {
732                error!("Max reconnection attempts reached; Exiting");
733                this.conn_notify.notify_waiters(); // Notify that the task is done
734                result = Err(DeltasError::ConnectionClosed);
735            }
736
737            result
738        });
739
740        self.conn_notify.notified().await;
741
742        if self.is_connected().await {
743            Ok(jh)
744        } else {
745            Err(DeltasError::NotConnected)
746        }
747    }
748
749    #[instrument(skip(self))]
750    async fn close(&self) -> Result<(), DeltasError> {
751        info!("Closing TychoWebsocketClient");
752        let mut guard = self.inner.lock().await;
753        let inner = guard
754            .as_mut()
755            .ok_or_else(|| DeltasError::NotConnected)?;
756        inner
757            .cmd_tx
758            .send(())
759            .await
760            .map_err(|e| DeltasError::TransportError(e.to_string()))?;
761        Ok(())
762    }
763}
764
765#[cfg(test)]
766mod tests {
767    use std::net::SocketAddr;
768
769    use tokio::{net::TcpListener, time::timeout};
770    use tycho_common::dto::Chain;
771
772    use super::*;
773
    /// One scripted step of the mock websocket server used by the tests in this module.
    #[derive(Clone)]
    enum ExpectedComm {
        /// The server waits for the next client message. The first field is the timeout in
        /// milliseconds, the second is the exact message that must arrive.
        Receive(u64, tungstenite::protocol::Message),
        /// The server sends this message to the client.
        Send(tungstenite::protocol::Message),
    }
779
780    async fn mock_tycho_ws(
781        messages: &[ExpectedComm],
782        reconnects: usize,
783    ) -> (SocketAddr, JoinHandle<()>) {
784        info!("Starting mock webserver");
785        // zero port here means the OS chooses an open port
786        let server = TcpListener::bind("127.0.0.1:0")
787            .await
788            .expect("localhost bind failed");
789        let addr = server.local_addr().unwrap();
790        let messages = messages.to_vec();
791
792        let jh = tokio::spawn(async move {
793            info!("mock webserver started");
794            for _ in 0..(reconnects + 1) {
795                if let Ok((stream, _)) = server.accept().await {
796                    let mut websocket = tokio_tungstenite::accept_async(stream)
797                        .await
798                        .unwrap();
799
800                    info!("Handling messages..");
801                    for c in messages.iter().cloned() {
802                        match c {
803                            ExpectedComm::Receive(t, exp) => {
804                                info!("Awaiting message...");
805                                let msg = timeout(Duration::from_millis(t), websocket.next())
806                                    .await
807                                    .expect("Receive timeout")
808                                    .expect("Stream exhausted")
809                                    .expect("Failed to receive message.");
810                                info!("Message received");
811                                assert_eq!(msg, exp)
812                            }
813                            ExpectedComm::Send(data) => {
814                                info!("Sending message");
815                                websocket
816                                    .send(data)
817                                    .await
818                                    .expect("Failed to send message");
819                                info!("Message sent");
820                            }
821                        };
822                    }
823                    sleep(Duration::from_millis(100)).await;
824                    // Close the WebSocket connection
825                    let _ = websocket.close(None).await;
826                }
827            }
828        });
829        (addr, jh)
830    }
831
832    #[tokio::test]
833    async fn test_subscribe_receive() {
834        let exp_comm = [
835            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
836                {
837                    "method":"subscribe",
838                    "extractor_id":{
839                        "chain":"ethereum",
840                        "name":"vm:ambient"
841                    },
842                    "include_state": true
843                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
844            )),
845            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
846                {
847                    "method":"newsubscription",
848                    "extractor_id":{
849                    "chain":"ethereum",
850                    "name":"vm:ambient"
851                    },
852                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
853                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
854            )),
855            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
856                {
857                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
858                    "deltas": {
859                        "extractor": "vm:ambient",
860                        "chain": "ethereum",
861                        "block": {
862                            "number": 123,
863                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
864                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
865                            "chain": "ethereum",             
866                            "ts": "2023-09-14T00:00:00"
867                        },
868                        "finalized_block_height": 0,
869                        "revert": false,
870                        "new_tokens": {},
871                        "account_updates": {
872                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
873                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
874                                "chain": "ethereum",
875                                "slots": {},
876                                "balance": "0x01f4",
877                                "code": "",
878                                "change": "Update"
879                            }
880                        },
881                        "state_updates": {
882                            "component_1": {
883                                "component_id": "component_1",
884                                "updated_attributes": {"attr1": "0x01"},
885                                "deleted_attributes": ["attr2"]
886                            }
887                        },
888                        "new_protocol_components": 
889                            { "protocol_1": {
890                                    "id": "protocol_1",
891                                    "protocol_system": "system_1",
892                                    "protocol_type_name": "type_1",
893                                    "chain": "ethereum",
894                                    "tokens": ["0x01", "0x02"],
895                                    "contract_ids": ["0x01", "0x02"],
896                                    "static_attributes": {"attr1": "0x01f4"},
897                                    "change": "Update",
898                                    "creation_tx": "0x01",
899                                    "created_at": "2023-09-14T00:00:00"
900                                }
901                            },
902                        "deleted_protocol_components": {},
903                        "component_balances": {
904                            "protocol_1":
905                                {
906                                    "0x01": {
907                                        "token": "0x01",
908                                        "balance": "0x01f4",
909                                        "balance_float": 0.0,
910                                        "modify_tx": "0x01",
911                                        "component_id": "protocol_1"
912                                    }
913                                }
914                        },
915                        "account_balances": {
916                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
917                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
918                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
919                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
920                                    "balance": "0x01f4",
921                                    "modify_tx": "0x01"
922                                }
923                            }
924                        },
925                        "component_tvl": {
926                            "protocol_1": 1000.0
927                        }
928                    }
929                }
930                "#.to_owned()
931            ))
932        ];
933        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
934
935        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
936        let jh = client
937            .connect()
938            .await
939            .expect("connect failed");
940        let (_, mut rx) = timeout(
941            Duration::from_millis(100),
942            client.subscribe(
943                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
944                SubscriptionOptions::new(),
945            ),
946        )
947        .await
948        .expect("subscription timed out")
949        .expect("subscription failed");
950        let _ = timeout(Duration::from_millis(100), rx.recv())
951            .await
952            .expect("awaiting message timeout out")
953            .expect("receiving message failed");
954        timeout(Duration::from_millis(100), client.close())
955            .await
956            .expect("close timed out")
957            .expect("close failed");
958        jh.await
959            .expect("ws loop errored")
960            .unwrap();
961        server_thread.await.unwrap();
962    }
963
964    #[tokio::test]
965    async fn test_unsubscribe() {
966        let exp_comm = [
967            ExpectedComm::Receive(
968                100,
969                tungstenite::protocol::Message::Text(
970                    r#"
971                {
972                    "method": "subscribe",
973                    "extractor_id":{
974                        "chain": "ethereum",
975                        "name": "vm:ambient"
976                    },
977                    "include_state": true
978                }"#
979                    .to_owned()
980                    .replace(|c: char| c.is_whitespace(), ""),
981                ),
982            ),
983            ExpectedComm::Send(tungstenite::protocol::Message::Text(
984                r#"
985                {
986                    "method": "newsubscription",
987                    "extractor_id":{
988                        "chain": "ethereum",
989                        "name": "vm:ambient"
990                    },
991                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
992                }"#
993                .to_owned()
994                .replace(|c: char| c.is_whitespace(), ""),
995            )),
996            ExpectedComm::Receive(
997                100,
998                tungstenite::protocol::Message::Text(
999                    r#"
1000                {
1001                    "method": "unsubscribe",
1002                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1003                }
1004                "#
1005                    .to_owned()
1006                    .replace(|c: char| c.is_whitespace(), ""),
1007                ),
1008            ),
1009            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1010                r#"
1011                {
1012                    "method": "subscriptionended",
1013                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1014                }
1015                "#
1016                .to_owned()
1017                .replace(|c: char| c.is_whitespace(), ""),
1018            )),
1019        ];
1020        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1021
1022        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1023        let jh = client
1024            .connect()
1025            .await
1026            .expect("connect failed");
1027        let (sub_id, mut rx) = timeout(
1028            Duration::from_millis(100),
1029            client.subscribe(
1030                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1031                SubscriptionOptions::new(),
1032            ),
1033        )
1034        .await
1035        .expect("subscription timed out")
1036        .expect("subscription failed");
1037
1038        timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1039            .await
1040            .expect("unsubscribe timed out")
1041            .expect("unsubscribe failed");
1042        let res = timeout(Duration::from_millis(100), rx.recv())
1043            .await
1044            .expect("awaiting message timeout out");
1045
1046        // If the subscription ended, the channel should have been closed.
1047        assert!(res.is_none());
1048
1049        timeout(Duration::from_millis(100), client.close())
1050            .await
1051            .expect("close timed out")
1052            .expect("close failed");
1053        jh.await
1054            .expect("ws loop errored")
1055            .unwrap();
1056        server_thread.await.unwrap();
1057    }
1058
1059    #[tokio::test]
1060    async fn test_subscription_unexpected_end() {
1061        let exp_comm = [
1062            ExpectedComm::Receive(
1063                100,
1064                tungstenite::protocol::Message::Text(
1065                    r#"
1066                {
1067                    "method":"subscribe",
1068                    "extractor_id":{
1069                        "chain":"ethereum",
1070                        "name":"vm:ambient"
1071                    },
1072                    "include_state": true
1073                }"#
1074                    .to_owned()
1075                    .replace(|c: char| c.is_whitespace(), ""),
1076                ),
1077            ),
1078            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1079                r#"
1080                {
1081                    "method":"newsubscription",
1082                    "extractor_id":{
1083                        "chain":"ethereum",
1084                        "name":"vm:ambient"
1085                    },
1086                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1087                }"#
1088                .to_owned()
1089                .replace(|c: char| c.is_whitespace(), ""),
1090            )),
1091            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1092                r#"
1093                {
1094                    "method": "subscriptionended",
1095                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1096                }"#
1097                .to_owned()
1098                .replace(|c: char| c.is_whitespace(), ""),
1099            )),
1100        ];
1101        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1102
1103        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1104        let jh = client
1105            .connect()
1106            .await
1107            .expect("connect failed");
1108        let (_, mut rx) = timeout(
1109            Duration::from_millis(100),
1110            client.subscribe(
1111                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1112                SubscriptionOptions::new(),
1113            ),
1114        )
1115        .await
1116        .expect("subscription timed out")
1117        .expect("subscription failed");
1118        let res = timeout(Duration::from_millis(100), rx.recv())
1119            .await
1120            .expect("awaiting message timeout out");
1121
1122        // If the subscription ended, the channel should have been closed.
1123        assert!(res.is_none());
1124
1125        timeout(Duration::from_millis(100), client.close())
1126            .await
1127            .expect("close timed out")
1128            .expect("close failed");
1129        jh.await
1130            .expect("ws loop errored")
1131            .unwrap();
1132        server_thread.await.unwrap();
1133    }
1134
1135    #[test_log::test(tokio::test)]
1136    async fn test_reconnect() {
1137        let exp_comm = [
1138            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
1139                {
1140                    "method":"subscribe",
1141                    "extractor_id":{
1142                        "chain":"ethereum",
1143                        "name":"vm:ambient"
1144                    },
1145                    "include_state": true
1146                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1147            )),
1148            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1149                {
1150                    "method":"newsubscription",
1151                    "extractor_id":{
1152                    "chain":"ethereum",
1153                    "name":"vm:ambient"
1154                    },
1155                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1156                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1157            )),
1158            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1159                {
1160                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1161                    "deltas": {
1162                        "extractor": "vm:ambient",
1163                        "chain": "ethereum",
1164                        "block": {
1165                            "number": 123,
1166                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1167                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1168                            "chain": "ethereum",             
1169                            "ts": "2023-09-14T00:00:00"
1170                        },
1171                        "finalized_block_height": 0,
1172                        "revert": false,
1173                        "new_tokens": {},
1174                        "account_updates": {
1175                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1176                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1177                                "chain": "ethereum",
1178                                "slots": {},
1179                                "balance": "0x01f4",
1180                                "code": "",
1181                                "change": "Update"
1182                            }
1183                        },
1184                        "state_updates": {
1185                            "component_1": {
1186                                "component_id": "component_1",
1187                                "updated_attributes": {"attr1": "0x01"},
1188                                "deleted_attributes": ["attr2"]
1189                            }
1190                        },
1191                        "new_protocol_components": {
1192                            "protocol_1":
1193                                {
1194                                    "id": "protocol_1",
1195                                    "protocol_system": "system_1",
1196                                    "protocol_type_name": "type_1",
1197                                    "chain": "ethereum",
1198                                    "tokens": ["0x01", "0x02"],
1199                                    "contract_ids": ["0x01", "0x02"],
1200                                    "static_attributes": {"attr1": "0x01f4"},
1201                                    "change": "Update",
1202                                    "creation_tx": "0x01",
1203                                    "created_at": "2023-09-14T00:00:00"
1204                                }
1205                            },
1206                        "deleted_protocol_components": {},
1207                        "component_balances": {
1208                            "protocol_1": {
1209                                "0x01": {
1210                                    "token": "0x01",
1211                                    "balance": "0x01f4",
1212                                    "balance_float": 1000.0,
1213                                    "modify_tx": "0x01",
1214                                    "component_id": "protocol_1"
1215                                }
1216                            }
1217                        },
1218                        "account_balances": {
1219                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1220                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1221                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1222                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1223                                    "balance": "0x01f4",
1224                                    "modify_tx": "0x01"
1225                                }
1226                            }
1227                        },
1228                        "component_tvl": {
1229                            "protocol_1": 1000.0
1230                        }
1231                    }
1232                }
1233                "#.to_owned()
1234            ))
1235        ];
1236        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1237        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1238        let jh: JoinHandle<Result<(), DeltasError>> = client
1239            .connect()
1240            .await
1241            .expect("connect failed");
1242
1243        for _ in 0..2 {
1244            let (_, mut rx) = timeout(
1245                Duration::from_millis(100),
1246                client.subscribe(
1247                    ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1248                    SubscriptionOptions::new(),
1249                ),
1250            )
1251            .await
1252            .expect("subscription timed out")
1253            .expect("subscription failed");
1254
1255            let _ = timeout(Duration::from_millis(100), rx.recv())
1256                .await
1257                .expect("awaiting message timeout out")
1258                .expect("receiving message failed");
1259
1260            // wait for the connection to drop
1261            let res = timeout(Duration::from_millis(200), rx.recv())
1262                .await
1263                .expect("awaiting closed connection timeout out");
1264            assert!(res.is_none());
1265        }
1266        let res = jh.await.expect("ws client join failed");
1267        // 5th client reconnect attempt should fail
1268        assert!(res.is_err());
1269        server_thread
1270            .await
1271            .expect("ws server loop errored");
1272    }
1273
1274    async fn mock_bad_connection_tycho_ws() -> (SocketAddr, JoinHandle<()>) {
1275        let server = TcpListener::bind("127.0.0.1:0")
1276            .await
1277            .expect("localhost bind failed");
1278        let addr = server.local_addr().unwrap();
1279        let jh = tokio::spawn(async move {
1280            while let Ok((stream, _)) = server.accept().await {
1281                // Immediately close the connection to simulate a failure
1282                drop(stream);
1283            }
1284        });
1285        (addr, jh)
1286    }
1287
1288    #[tokio::test]
1289    async fn test_connect_max_attempts() {
1290        let (addr, _) = mock_bad_connection_tycho_ws().await;
1291        let client = WsDeltasClient::new_with_reconnects(&format!("ws://{addr}"), 3, None).unwrap();
1292
1293        let join_handle = client.connect().await;
1294
1295        assert!(join_handle.is_err());
1296        assert_eq!(join_handle.unwrap_err().to_string(), DeltasError::NotConnected.to_string());
1297    }
1298}