tycho_client/
deltas.rs

1//! # Deltas Client
2//!
3//! This module focuses on implementing the Real-Time Deltas client for the Tycho Indexer service.
4//! Utilizing this client facilitates efficient, instant communication with the indexing service,
5//! promoting seamless data synchronization.
6//!
7//! ## Websocket Implementation
8//!
//! The WebSocket client is cloneable: all clones share the same underlying connection state, so a
//! single client can be shared across multiple asynchronous tasks without creating separate
//! instances for each task. This improves efficiency because it:
//!
//! - **Reduces Server Load:** By maintaining a single universal client, the load on the server is
//!   significantly reduced. This is because fewer connections are made to the server, preventing it
//!   from getting overwhelmed by numerous simultaneous requests.
//! - **Conserves Resource Usage:** A single shared client requires fewer system resources than if
//!   multiple clients were instantiated and used separately, as there is some overhead for
//!   websocket handshakes and messages.
19//!
20//! Therefore, sharing one client among multiple tasks ensures optimal performance, reduces resource
21//! consumption, and enhances overall software scalability.
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::Arc,
25    time::Duration,
26};
27
28use async_trait::async_trait;
29use futures03::{stream::SplitSink, SinkExt, StreamExt};
30use hyper::{
31    header::{
32        AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
33        USER_AGENT,
34    },
35    Uri,
36};
37#[cfg(test)]
38use mockall::automock;
39use thiserror::Error;
40use tokio::{
41    net::TcpStream,
42    sync::{
43        mpsc::{self, error::TrySendError, Receiver, Sender},
44        oneshot, Mutex, Notify,
45    },
46    task::JoinHandle,
47    time::sleep,
48};
49use tokio_tungstenite::{
50    connect_async,
51    tungstenite::{
52        self,
53        handshake::client::{generate_key, Request},
54    },
55    MaybeTlsStream, WebSocketStream,
56};
57use tracing::{debug, error, info, instrument, trace, warn};
58use tycho_common::dto::{BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage};
59use uuid::Uuid;
60
61use crate::TYCHO_SERVER_VERSION;
62
63#[derive(Error, Debug)]
64pub enum DeltasError {
65    /// The passed tycho url failed to parse.
66    #[error("Failed to parse URI: {0}. Error: {1}")]
67    UriParsing(String, String),
68    /// Informs you about a subscription being already pending and is awaiting conformation from
69    /// the server.
70    #[error("The requested subscription is already pending")]
71    SubscriptionAlreadyPending,
72    /// Informs that an message failed to send via an internal channel or throuh the websocket
73    /// channel. This is most likely fatal and might mean that the implementation is buggy under
74    /// certain conditions.
75    #[error("{0}")]
76    TransportError(String),
77    /// An internal buffer is full. This likely means that messages are not being consumed fast
78    /// enough. If the incoming load emits messages in bursts, consider increasing the buffer size.
79    #[error("The buffer is full!")]
80    BufferFull,
81    /// The client has currently no active connection but it was accessed e.g. by calling
82    /// subscribe.
83    #[error("The client is not connected!")]
84    NotConnected,
85    /// The connect method was called while the client already had an active connection.
86    #[error("The client is already connected!")]
87    AlreadyConnected,
88    /// The connection was closed orderly by the server, e.g. because it restarted.
89    #[error("The server closed the connection!")]
90    ConnectionClosed,
91    /// The connection was closed unexpectedly by the server.
92    #[error("ConnectionError {source}")]
93    ConnectionError {
94        #[from]
95        source: tungstenite::Error,
96    },
97    /// Other fatal errors: e.g. if the underlying websockets buffer is full.
98    #[error("Tycho FatalError: {0}")]
99    Fatal(String),
100}
101
/// Options controlling a subscription request.
///
/// `include_state` is forwarded as-is in the subscribe command sent to the server; it defaults
/// to `true`.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    include_state: bool,
}

impl SubscriptionOptions {
    /// Creates the default options (`include_state == true`).
    pub fn new() -> Self {
        Self { include_state: true }
    }

    /// Returns these options with `include_state` set to `val`.
    pub fn with_state(self, val: bool) -> Self {
        let mut options = self;
        options.include_state = val;
        options
    }
}

impl Default for SubscriptionOptions {
    fn default() -> Self {
        Self::new()
    }
}
122
123#[cfg_attr(test, automock)]
124#[async_trait]
125pub trait DeltasClient {
126    /// Subscribe to an extractor and receive realtime messages
127    ///
128    /// Will request a subscription from tycho and wait for confirmation of it. If the caller
129    /// cancels while waiting for confirmation the subscription may still be registered. If the
130    /// receiver was deallocated though, the first message from the subscription will remove it
131    /// again - since there is no one to inform about these messages.
132    async fn subscribe(
133        &self,
134        extractor_id: ExtractorIdentity,
135        options: SubscriptionOptions,
136    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;
137
138    /// Unsubscribe from an subscription
139    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
140
141    /// Start the clients message handling loop.
142    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
143
144    /// Close the clients message handling loop.
145    async fn close(&self) -> Result<(), DeltasError>;
146}
147
148#[derive(Clone)]
149pub struct WsDeltasClient {
150    /// The tycho indexer websocket uri.
151    uri: Uri,
152    /// Authorization key for the websocket connection.
153    auth_key: Option<String>,
154    /// Maximum amount of reconnects to try before giving up.
155    max_reconnects: u32,
156    /// The client will buffer this many messages incoming from the websocket
157    /// before starting to drop them.
158    ws_buffer_size: usize,
159    /// The client will buffer that many messages for each subscription before it starts droppping
160    /// them.
161    subscription_buffer_size: usize,
162    /// Notify tasks waiting for a connection to be established.
163    conn_notify: Arc<Notify>,
164    /// Shared client instance state.
165    inner: Arc<Mutex<Option<Inner>>>,
166}
167
168type WebSocketSink =
169    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
170
171/// Subscription State
172///
173/// Subscription go through a lifecycle:
174///
175/// ```text
176/// O ---> requested subscribe ----> active ----> requested unsub ---> ended
177/// ```
178///
179/// We use oneshot channels to inform the client struct about when these transition happened. E.g.
180/// because for `subscribe`` to finish, we want the state to have transition to `active` and similar
181/// for `unsubscribe`.
182#[derive(Debug)]
183enum SubscriptionInfo {
184    /// Subscription was requested we wait for server confirmation and uuid assignment.
185    RequestedSubscription(oneshot::Sender<(Uuid, Receiver<BlockChanges>)>),
186    /// Subscription is active.
187    Active,
188    /// Unsubscription was requested, we wait for server confirmation.
189    RequestedUnsubscription(oneshot::Sender<()>),
190}
191
192/// Internal struct containing shared state between of WsDeltaClient instances.
193struct Inner {
194    /// Websocket sender handle.
195    sink: WebSocketSink,
196    /// Command channel sender handle.
197    cmd_tx: Sender<()>,
198    /// Currently pending subscriptions.
199    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
200    /// Active subscriptions.
201    subscriptions: HashMap<Uuid, SubscriptionInfo>,
202    /// For eachs subscription we keep a sender handle, the receiver is returned to the caller of
203    /// subscribe.
204    sender: HashMap<Uuid, Sender<BlockChanges>>,
205    /// How many messages to buffer per subscription before starting to drop new messages.
206    buffer_size: usize,
207}
208
209/// Shared state betweeen all client instances.
210///
211/// This state is behind a mutex and requires synchronisation to be read of modified.
212impl Inner {
213    fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
214        Self {
215            sink,
216            cmd_tx,
217            pending: HashMap::new(),
218            subscriptions: HashMap::new(),
219            sender: HashMap::new(),
220            buffer_size,
221        }
222    }
223
224    /// Registers a new pending subscription.
225    #[allow(clippy::result_large_err)]
226    fn new_subscription(
227        &mut self,
228        id: &ExtractorIdentity,
229        ready_tx: oneshot::Sender<(Uuid, Receiver<BlockChanges>)>,
230    ) -> Result<(), DeltasError> {
231        if self.pending.contains_key(id) {
232            return Err(DeltasError::SubscriptionAlreadyPending);
233        }
234        self.pending
235            .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
236        Ok(())
237    }
238
239    /// Transitions a pending subscription to active.
240    ///
241    /// Will ignore any request to do so for subscriptions that are not pending.
242    fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
243        if let Some(info) = self.pending.remove(extractor_id) {
244            if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
245                let (tx, rx) = mpsc::channel(self.buffer_size);
246                self.sender.insert(subscription_id, tx);
247                self.subscriptions
248                    .insert(subscription_id, SubscriptionInfo::Active);
249                let _ = ready_tx
250                    .send((subscription_id, rx))
251                    .map_err(|_| {
252                        warn!(
253                            ?extractor_id,
254                            ?subscription_id,
255                            "Subscriber for has gone away. Ignoring."
256                        )
257                    });
258            } else {
259                error!(
260                    ?extractor_id,
261                    ?subscription_id,
262                    "Pending subscription was not in the correct state to 
263                    transition to active. Ignoring!"
264                )
265            }
266        } else {
267            error!(
268                ?extractor_id,
269                ?subscription_id,
270                "Tried to mark an unkown subscription as active. Ignoring!"
271            );
272        }
273    }
274
275    /// Sends a message to a subscription's receiver.
276    #[allow(clippy::result_large_err)]
277    fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
278        if let Some(sender) = self.sender.get_mut(id) {
279            sender
280                .try_send(msg)
281                .map_err(|e| match e {
282                    TrySendError::Full(_) => DeltasError::BufferFull,
283                    TrySendError::Closed(_) => {
284                        DeltasError::TransportError("The subscriber has gone away".to_string())
285                    }
286                })?;
287        }
288        Ok(())
289    }
290
291    /// Requests a subscription to end.
292    ///
293    /// The subscription needs to exist and be active for this to have any effect. Wll use
294    /// `ready_tx` to notify the receiver once the transition to ended completed.
295    fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
296        if let Some(info) = self
297            .subscriptions
298            .get_mut(subscription_id)
299        {
300            if let SubscriptionInfo::Active = info {
301                *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
302            }
303        } else {
304            // no big deal imo so only debug lvl..
305            debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
306        }
307    }
308
309    /// Removes and fully ends a subscription
310    ///
311    /// Any calls for non-existing subscriptions will be simply ignored. May panic on internal state
312    /// inconsistencies: e.g. if the subscription exists but there is no sender for it.
313    /// Will remove a subscription even it was in active or pending state before, this is to support
314    /// any server side failure of the subscription.
315    fn remove_subscription(&mut self, subscription_id: Uuid) {
316        if let Entry::Occupied(e) = self
317            .subscriptions
318            .entry(subscription_id)
319        {
320            let info = e.remove();
321            if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
322                let _ = tx.send(()).map_err(|_| {
323                    warn!(?subscription_id, "failed to notify about removed subscription")
324                });
325                self.sender
326                    .remove(&subscription_id)
327                    .expect(
328                        "Inconsistent internal client state: `sender` state 
329                        drifted from `info` while removing a subscription.",
330                    );
331            } else {
332                warn!(?subscription_id, "Subscription ended unexpectedly!");
333                self.sender
334                    .remove(&subscription_id)
335                    .expect("sender channel missing");
336            }
337        } else {
338            error!(
339                ?subscription_id,
340                "Received `SubscriptionEnded`, but was never subscribed 
341                to it. This is likely a bug!"
342            );
343        }
344    }
345
346    /// Sends a message through the websocket.
347    async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
348        self.sink.send(msg).await.map_err(|e| {
349            DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
350        })
351    }
352}
353
354/// Tycho client websocket implementation.
355impl WsDeltasClient {
356    // Construct a new client with 5 reconnection attempts.
357    #[allow(clippy::result_large_err)]
358    pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
359        let uri = ws_uri
360            .parse::<Uri>()
361            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
362        Ok(Self {
363            uri,
364            auth_key: auth_key.map(|s| s.to_string()),
365            inner: Arc::new(Mutex::new(None)),
366            ws_buffer_size: 128,
367            subscription_buffer_size: 128,
368            conn_notify: Arc::new(Notify::new()),
369            max_reconnects: 5,
370        })
371    }
372
373    // Construct a new client with a custom number of reconnection attempts.
374    #[allow(clippy::result_large_err)]
375    pub fn new_with_reconnects(
376        ws_uri: &str,
377        max_reconnects: u32,
378        auth_key: Option<&str>,
379    ) -> Result<Self, DeltasError> {
380        let uri = ws_uri
381            .parse::<Uri>()
382            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
383
384        Ok(Self {
385            uri,
386            auth_key: auth_key.map(|s| s.to_string()),
387            inner: Arc::new(Mutex::new(None)),
388            ws_buffer_size: 128,
389            subscription_buffer_size: 128,
390            conn_notify: Arc::new(Notify::new()),
391            max_reconnects,
392        })
393    }
394
395    /// Ensures that the client is connected.
396    ///
397    /// This method will acquire the lock for inner.
398    async fn is_connected(&self) -> bool {
399        let guard = self.inner.as_ref().lock().await;
400        guard.is_some()
401    }
402
403    /// Waits for the client to be connected
404    ///
405    /// This method acquires the lock for inner for a short period, then waits until the  
406    /// connection is established if not already connected.
407    async fn ensure_connection(&self) {
408        if !self.is_connected().await {
409            self.conn_notify.notified().await;
410        }
411    }
412
413    /// Main message handling logic
414    ///
415    /// If the message returns an error, a reconnect attempt may be considered depending on the
416    /// error type.
417    #[instrument(skip(self, msg))]
418    async fn handle_msg(
419        &self,
420        msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
421    ) -> Result<(), DeltasError> {
422        let mut guard = self.inner.lock().await;
423
424        match msg {
425            // We do not deserialize the message directly into a WebSocketMessage. This is because
426            // the serde arbitrary_precision feature (often included in many
427            // dependencies we use) breaks some untagged enum deserializations. Instead,
428            // we deserialize the message into a serde_json::Value and convert that into a WebSocketMessage. For more info on this issue, see: https://github.com/serde-rs/json/issues/740
429            Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
430                serde_json::Value,
431            >(&text)
432            {
433                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
434                    Ok(ws_message) => match ws_message {
435                        WebSocketMessage::BlockChanges { subscription_id, deltas } => {
436                            trace!(?deltas, "Received a block state change, sending to channel");
437                            let inner = guard
438                                .as_mut()
439                                .ok_or_else(|| DeltasError::NotConnected)?;
440                            match inner.send(&subscription_id, deltas) {
441                                Err(DeltasError::BufferFull) => {
442                                    error!(?subscription_id, "Buffer full, message dropped!");
443                                }
444                                Err(_) => {
445                                    warn!(
446                                        ?subscription_id,
447                                        "Receiver for has gone away, unsubscribing!"
448                                    );
449                                    let (tx, _) = oneshot::channel();
450                                    let _ = WsDeltasClient::unsubscribe_inner(
451                                        inner,
452                                        subscription_id,
453                                        tx,
454                                    )
455                                    .await;
456                                }
457                                _ => { /* Do nothing */ }
458                            }
459                        }
460                        WebSocketMessage::Response(Response::NewSubscription {
461                            extractor_id,
462                            subscription_id,
463                        }) => {
464                            info!(?extractor_id, ?subscription_id, "Received a new subscription");
465                            let inner = guard
466                                .as_mut()
467                                .ok_or_else(|| DeltasError::NotConnected)?;
468                            inner.mark_active(&extractor_id, subscription_id);
469                        }
470                        WebSocketMessage::Response(Response::SubscriptionEnded {
471                            subscription_id,
472                        }) => {
473                            info!(?subscription_id, "Received a subscription ended");
474                            let inner = guard
475                                .as_mut()
476                                .ok_or_else(|| DeltasError::NotConnected)?;
477                            inner.remove_subscription(subscription_id);
478                        }
479                    },
480                    Err(e) => {
481                        error!(error = %e, message=text, "Failed to deserialize WebSocketMessage: message does not match expected structs");
482                    }
483                },
484                Err(e) => {
485                    error!(error = %e, message=text, "Failed to deserialize message: invalid json");
486                }
487            },
488            Ok(tungstenite::protocol::Message::Ping(_)) => {
489                // Respond to pings with pongs.
490                let inner = guard
491                    .as_mut()
492                    .ok_or_else(|| DeltasError::NotConnected)?;
493                if let Err(error) = inner
494                    .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
495                    .await
496                {
497                    debug!(?error, "Failed to send pong!");
498                }
499            }
500            Ok(tungstenite::protocol::Message::Pong(_)) => {
501                // Do nothing.
502            }
503            Ok(tungstenite::protocol::Message::Close(_)) => {
504                return Err(DeltasError::ConnectionClosed);
505            }
506            Ok(unknown_msg) => {
507                info!("Received an unknown message type: {:?}", unknown_msg);
508            }
509            Err(error) => {
510                error!(?error, "Websocket error");
511                return Err(match &error {
512                    tungstenite::Error::ConnectionClosed => DeltasError::from(error),
513                    tungstenite::Error::AlreadyClosed => {
514                        warn!("Received AlreadyClosed error which is indicative of a bug!");
515                        DeltasError::from(error)
516                    }
517                    tungstenite::Error::Io(_) => DeltasError::from(error),
518                    tungstenite::Error::Protocol(_) => DeltasError::from(error),
519                    _ => DeltasError::Fatal(error.to_string()),
520                });
521            }
522        };
523        Ok(())
524    }
525
526    /// Helper method to force request a unsubscribe of a subscription
527    ///
528    /// This method expects to receive a mutable reference to `Inner` so it does not acquire a
529    /// lock. Used for normal unsubscribes as well to remove any subscriptions with deallocated
530    /// receivers.
531    async fn unsubscribe_inner(
532        inner: &mut Inner,
533        subscription_id: Uuid,
534        ready_tx: oneshot::Sender<()>,
535    ) -> Result<(), DeltasError> {
536        inner.end_subscription(&subscription_id, ready_tx);
537        let cmd = Command::Unsubscribe { subscription_id };
538        inner
539            .ws_send(tungstenite::protocol::Message::Text(
540                serde_json::to_string(&cmd).expect("serialize cmd encode error"),
541            ))
542            .await?;
543        Ok(())
544    }
545}
546
547#[async_trait]
548impl DeltasClient for WsDeltasClient {
549    #[instrument(skip(self))]
550    async fn subscribe(
551        &self,
552        extractor_id: ExtractorIdentity,
553        options: SubscriptionOptions,
554    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
555        trace!("Starting subscribe");
556        self.ensure_connection().await;
557        let (ready_tx, ready_rx) = oneshot::channel();
558        {
559            let mut guard = self.inner.lock().await;
560            let inner = guard
561                .as_mut()
562                .expect("ws not connected");
563            trace!("Sending subscribe command");
564            inner.new_subscription(&extractor_id, ready_tx)?;
565            let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
566            inner
567                .ws_send(tungstenite::protocol::Message::Text(
568                    serde_json::to_string(&cmd).expect("serialize cmd encode error"),
569                ))
570                .await?;
571        }
572        trace!("Waiting for subscription response");
573        let rx = ready_rx
574            .await
575            .expect("ready channel closed");
576        trace!("Subscription successfull");
577        Ok(rx)
578    }
579
580    #[instrument(skip(self))]
581    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
582        self.ensure_connection().await;
583        let (ready_tx, ready_rx) = oneshot::channel();
584        {
585            let mut guard = self.inner.lock().await;
586            let inner = guard
587                .as_mut()
588                .expect("ws not connected");
589
590            WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
591        }
592        ready_rx
593            .await
594            .expect("ready channel closed");
595
596        Ok(())
597    }
598
599    #[instrument(skip(self))]
600    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
601        if self.is_connected().await {
602            return Err(DeltasError::AlreadyConnected);
603        }
604        let ws_uri = format!("{}{}/ws", self.uri, TYCHO_SERVER_VERSION);
605        info!(?ws_uri, "Starting TychoWebsocketClient");
606
607        let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
608        {
609            let mut guard = self.inner.as_ref().lock().await;
610            *guard = None;
611        }
612        let this = self.clone();
613        let jh = tokio::spawn(async move {
614            let mut retry_count = 0;
615            let mut result = Err(DeltasError::NotConnected);
616
617            'retry: while retry_count < this.max_reconnects {
618                info!(?ws_uri, "Connecting to WebSocket server");
619
620                // Create a WebSocket request
621                let mut request_builder = Request::builder()
622                    .uri(&ws_uri)
623                    .header(SEC_WEBSOCKET_KEY, generate_key())
624                    .header(SEC_WEBSOCKET_VERSION, 13)
625                    .header(CONNECTION, "Upgrade")
626                    .header(UPGRADE, "websocket")
627                    .header(
628                        HOST,
629                        this.uri
630                            .host()
631                            .expect("no host found in tycho url"),
632                    )
633                    .header(USER_AGENT, format!("tycho-client-{}", env!("CARGO_PKG_VERSION")));
634
635                // Add Authorization if one is given
636                if let Some(ref key) = this.auth_key {
637                    request_builder = request_builder.header(AUTHORIZATION, key);
638                }
639
640                let request = request_builder.body(()).unwrap();
641                let (conn, _) = match connect_async(request).await {
642                    Ok(conn) => conn,
643                    Err(e) => {
644                        // Prepare for reconnection
645                        retry_count += 1;
646                        let mut guard = this.inner.as_ref().lock().await;
647                        *guard = None;
648
649                        warn!(
650                            e = e.to_string(),
651                            "Failed to connect to WebSocket server; Reconnecting"
652                        );
653                        sleep(Duration::from_millis(500)).await;
654
655                        continue 'retry;
656                    }
657                };
658
659                let (ws_tx_new, ws_rx_new) = conn.split();
660                {
661                    let mut guard = this.inner.as_ref().lock().await;
662                    *guard =
663                        Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
664                }
665                let mut msg_rx = ws_rx_new.boxed();
666
667                info!("Connection Successful: TychoWebsocketClient started");
668                this.conn_notify.notify_waiters();
669                result = Ok(());
670
671                loop {
672                    let res = tokio::select! {
673                        msg = msg_rx.next() => match msg {
674                            Some(msg) => this.handle_msg(msg).await,
675                            None => { break 'retry } // ws connection silently closed
676                        },
677                        _ = cmd_rx.recv() => {break 'retry},
678                    };
679                    if let Err(error) = res {
680                        if matches!(
681                            error,
682                            DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
683                        ) {
684                            // Prepare for reconnection
685                            retry_count += 1;
686                            let mut guard = this.inner.as_ref().lock().await;
687                            *guard = None;
688
689                            warn!(
690                                ?error,
691                                ?retry_count,
692                                "Connection dropped unexpectedly; Reconnecting"
693                            );
694                            break;
695                        } else {
696                            // Other errors are considered fatal
697                            error!(?error, "Fatal error; Exiting");
698                            result = Err(error);
699                            break 'retry;
700                        }
701                    }
702                }
703            }
704
705            // Clean up before exiting
706            let mut guard = this.inner.as_ref().lock().await;
707            *guard = None;
708
709            // Check if max retries has been reached.
710            if retry_count >= this.max_reconnects {
711                error!("Max reconnection attempts reached; Exiting");
712                this.conn_notify.notify_waiters(); // Notify that the task is done
713                result = Err(DeltasError::ConnectionClosed);
714            }
715
716            result
717        });
718
719        self.conn_notify.notified().await;
720
721        if self.is_connected().await {
722            Ok(jh)
723        } else {
724            Err(DeltasError::NotConnected)
725        }
726    }
727
728    #[instrument(skip(self))]
729    async fn close(&self) -> Result<(), DeltasError> {
730        info!("Closing TychoWebsocketClient");
731        let mut guard = self.inner.lock().await;
732        let inner = guard
733            .as_mut()
734            .ok_or_else(|| DeltasError::NotConnected)?;
735        inner
736            .cmd_tx
737            .send(())
738            .await
739            .map_err(|e| DeltasError::TransportError(e.to_string()))?;
740        Ok(())
741    }
742}
743
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use tokio::{net::TcpListener, time::timeout};
    use tycho_common::dto::Chain;

    use super::*;

    /// Client-side timeout for a single request/response step.
    const STEP_TIMEOUT: Duration = Duration::from_millis(100);

    /// One step of the scripted conversation replayed by [`mock_tycho_ws`].
    #[derive(Clone)]
    enum ExpectedComm {
        /// Wait at most `.0` milliseconds for the next client frame and assert it equals `.1`.
        Receive(u64, tungstenite::protocol::Message),
        /// Send this frame to the client.
        Send(tungstenite::protocol::Message),
    }

    /// Strips all whitespace from a pretty-printed JSON fixture so it matches the compact form
    /// the client sends. Only valid for fixtures whose string values contain no whitespace.
    fn compact(json: &str) -> String {
        json.replace(|c: char| c.is_whitespace(), "")
    }

    /// The `subscribe` request the client is expected to send for `vm:ambient` on Ethereum
    /// when subscribing with `SubscriptionOptions::new()`.
    fn subscribe_request() -> tungstenite::protocol::Message {
        tungstenite::protocol::Message::Text(compact(
            r#"
            {
                "method": "subscribe",
                "extractor_id": {
                    "chain": "ethereum",
                    "name": "vm:ambient"
                },
                "include_state": true
            }"#,
        ))
    }

    /// Server acknowledgement of the `vm:ambient` subscription.
    fn new_subscription_response() -> tungstenite::protocol::Message {
        tungstenite::protocol::Message::Text(compact(
            r#"
            {
                "method": "newsubscription",
                "extractor_id": {
                    "chain": "ethereum",
                    "name": "vm:ambient"
                },
                "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
            }"#,
        ))
    }

    /// The `unsubscribe` request the client is expected to send for the test subscription.
    fn unsubscribe_request() -> tungstenite::protocol::Message {
        tungstenite::protocol::Message::Text(compact(
            r#"
            {
                "method": "unsubscribe",
                "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
            }"#,
        ))
    }

    /// Server notice that the test subscription has ended.
    fn subscription_ended_response() -> tungstenite::protocol::Message {
        tungstenite::protocol::Message::Text(compact(
            r#"
            {
                "method": "subscriptionended",
                "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
            }"#,
        ))
    }

    /// One block of deltas for the test subscription. Left pretty-printed: the client parses
    /// it, so its formatting is irrelevant.
    fn block_deltas_message() -> tungstenite::protocol::Message {
        tungstenite::protocol::Message::Text(
            r#"
            {
                "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
                "deltas": {
                    "extractor": "vm:ambient",
                    "chain": "ethereum",
                    "block": {
                        "number": 123,
                        "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                        "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                        "chain": "ethereum",
                        "ts": "2023-09-14T00:00:00"
                    },
                    "finalized_block_height": 0,
                    "revert": false,
                    "new_tokens": {},
                    "account_updates": {
                        "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                            "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                            "chain": "ethereum",
                            "slots": {},
                            "balance": "0x01f4",
                            "code": "",
                            "change": "Update"
                        }
                    },
                    "state_updates": {
                        "component_1": {
                            "component_id": "component_1",
                            "updated_attributes": {"attr1": "0x01"},
                            "deleted_attributes": ["attr2"]
                        }
                    },
                    "new_protocol_components": {
                        "protocol_1": {
                            "id": "protocol_1",
                            "protocol_system": "system_1",
                            "protocol_type_name": "type_1",
                            "chain": "ethereum",
                            "tokens": ["0x01", "0x02"],
                            "contract_ids": ["0x01", "0x02"],
                            "static_attributes": {"attr1": "0x01f4"},
                            "change": "Update",
                            "creation_tx": "0x01",
                            "created_at": "2023-09-14T00:00:00"
                        }
                    },
                    "deleted_protocol_components": {},
                    "component_balances": {
                        "protocol_1": {
                            "0x01": {
                                "token": "0x01",
                                "balance": "0x01f4",
                                "balance_float": 0.0,
                                "modify_tx": "0x01",
                                "component_id": "protocol_1"
                            }
                        }
                    },
                    "account_balances": {
                        "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                "balance": "0x01f4",
                                "modify_tx": "0x01"
                            }
                        }
                    },
                    "component_tvl": {
                        "protocol_1": 1000.0
                    }
                }
            }
            "#
            .to_owned(),
        )
    }

    /// Spawns a mock Tycho websocket server on an OS-assigned localhost port.
    ///
    /// The server accepts `reconnects + 1` connections in sequence. On each one it replays
    /// `messages` in order, then waits 100 ms and closes the websocket. Any deviation from the
    /// script (failed accept/handshake/send, receive timeout, unexpected frame) panics the
    /// server task, which surfaces when the returned handle is awaited.
    async fn mock_tycho_ws(
        messages: &[ExpectedComm],
        reconnects: usize,
    ) -> (SocketAddr, JoinHandle<()>) {
        info!("Starting mock webserver");
        // Port 0 lets the OS choose an open port.
        let server = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("localhost bind failed");
        let addr = server.local_addr().unwrap();
        let messages = messages.to_vec();

        let jh = tokio::spawn(async move {
            info!("mock webserver started");
            for _ in 0..=reconnects {
                // Fail loudly: silently skipping a failed accept would desynchronise the
                // script from the client and show up as an unrelated timeout.
                let (stream, _) = server
                    .accept()
                    .await
                    .expect("mock server failed to accept connection");
                let mut websocket = tokio_tungstenite::accept_async(stream)
                    .await
                    .unwrap();

                info!("Handling messages..");
                for comm in messages.iter().cloned() {
                    match comm {
                        ExpectedComm::Receive(timeout_ms, expected) => {
                            info!("Awaiting message...");
                            let msg = timeout(Duration::from_millis(timeout_ms), websocket.next())
                                .await
                                .expect("Receive timeout")
                                .expect("Stream exhausted")
                                .expect("Failed to receive message.");
                            info!("Message received");
                            assert_eq!(msg, expected);
                        }
                        ExpectedComm::Send(data) => {
                            info!("Sending message");
                            websocket
                                .send(data)
                                .await
                                .expect("Failed to send message");
                            info!("Message sent");
                        }
                    }
                }
                sleep(Duration::from_millis(100)).await;
                // Best-effort close of the WebSocket connection; the result is ignored.
                let _ = websocket.close(None).await;
            }
        });
        (addr, jh)
    }

    /// Closes `client`, then asserts that the client's connection task and the mock server both
    /// finished without error.
    async fn close_and_join(
        client: &WsDeltasClient,
        ws_loop: JoinHandle<Result<(), DeltasError>>,
        server: JoinHandle<()>,
    ) {
        timeout(STEP_TIMEOUT, client.close())
            .await
            .expect("close timed out")
            .expect("close failed");
        ws_loop
            .await
            .expect("ws loop errored")
            .unwrap();
        server.await.unwrap();
    }

    #[tokio::test]
    async fn test_subscribe_receive() {
        let exp_comm = [
            ExpectedComm::Receive(100, subscribe_request()),
            ExpectedComm::Send(new_subscription_response()),
            ExpectedComm::Send(block_deltas_message()),
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{}", addr), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (_, mut rx) = timeout(
            STEP_TIMEOUT,
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed");
        let _ = timeout(STEP_TIMEOUT, rx.recv())
            .await
            .expect("awaiting message timed out")
            .expect("receiving message failed");

        close_and_join(&client, jh, server_thread).await;
    }

    #[tokio::test]
    async fn test_unsubscribe() {
        let exp_comm = [
            ExpectedComm::Receive(100, subscribe_request()),
            ExpectedComm::Send(new_subscription_response()),
            ExpectedComm::Receive(100, unsubscribe_request()),
            ExpectedComm::Send(subscription_ended_response()),
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{}", addr), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (sub_id, mut rx) = timeout(
            STEP_TIMEOUT,
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed");

        timeout(STEP_TIMEOUT, client.unsubscribe(sub_id))
            .await
            .expect("unsubscribe timed out")
            .expect("unsubscribe failed");
        let res = timeout(STEP_TIMEOUT, rx.recv())
            .await
            .expect("awaiting message timed out");

        // If the subscription ended, the channel should have been closed.
        assert!(res.is_none());

        close_and_join(&client, jh, server_thread).await;
    }

    #[tokio::test]
    async fn test_subscription_unexpected_end() {
        let exp_comm = [
            ExpectedComm::Receive(100, subscribe_request()),
            ExpectedComm::Send(new_subscription_response()),
            ExpectedComm::Send(subscription_ended_response()),
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{}", addr), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (_, mut rx) = timeout(
            STEP_TIMEOUT,
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed");
        let res = timeout(STEP_TIMEOUT, rx.recv())
            .await
            .expect("awaiting message timed out");

        // If the subscription ended, the channel should have been closed.
        assert!(res.is_none());

        close_and_join(&client, jh, server_thread).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_reconnect() {
        let exp_comm = [
            ExpectedComm::Receive(100, subscribe_request()),
            ExpectedComm::Send(new_subscription_response()),
            ExpectedComm::Send(block_deltas_message()),
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
        let client = WsDeltasClient::new(&format!("ws://{}", addr), None).unwrap();
        let jh: JoinHandle<Result<(), DeltasError>> = client
            .connect()
            .await
            .expect("connect failed");

        for _ in 0..2 {
            let (_, mut rx) = timeout(
                STEP_TIMEOUT,
                client.subscribe(
                    ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                    SubscriptionOptions::new(),
                ),
            )
            .await
            .expect("subscription timed out")
            .expect("subscription failed");

            let _ = timeout(STEP_TIMEOUT, rx.recv())
                .await
                .expect("awaiting message timed out")
                .expect("receiving message failed");

            // wait for the connection to drop
            let res = timeout(Duration::from_millis(200), rx.recv())
                .await
                .expect("awaiting closed connection timed out");
            assert!(res.is_none());
        }
        let res = jh.await.expect("ws client join failed");
        // The mock server stops accepting after its two scripted connections, so the client's
        // reconnect attempts keep failing until `max_reconnects` is exhausted and the
        // connection task returns an error.
        assert!(res.is_err());
        server_thread
            .await
            .expect("ws server loop errored");
    }

    /// Spawns a server that accepts TCP connections and immediately drops them, so every
    /// websocket handshake fails.
    async fn mock_bad_connection_tycho_ws() -> (SocketAddr, JoinHandle<()>) {
        let server = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("localhost bind failed");
        let addr = server.local_addr().unwrap();
        let jh = tokio::spawn(async move {
            while let Ok((stream, _)) = server.accept().await {
                // Immediately close the connection to simulate a failure
                drop(stream);
            }
        });
        (addr, jh)
    }

    #[tokio::test]
    async fn test_connect_max_attempts() {
        let (addr, _) = mock_bad_connection_tycho_ws().await;
        let client =
            WsDeltasClient::new_with_reconnects(&format!("ws://{}", addr), 3, None).unwrap();

        let join_handle = client.connect().await;

        assert!(join_handle.is_err());
        assert_eq!(join_handle.unwrap_err().to_string(), DeltasError::NotConnected.to_string());
    }
}