tycho_client/
deltas.rs

1//! # Deltas Client
2//!
3//! This module focuses on implementing the Real-Time Deltas client for the Tycho Indexer service.
4//! Utilizing this client facilitates efficient, instant communication with the indexing service,
5//! promoting seamless data synchronization.
6//!
7//! ## Websocket Implementation
8//!
//! The WebSocket client is cloneable: all clones share a single underlying connection, so one
//! client can be used from multiple asynchronous tasks without opening a separate connection for
//! each task. This improves efficiency because it:
12//!
13//! - **Reduces Server Load:** By maintaining a single universal client, the load on the server is
14//!   significantly reduced. This is because fewer connections are made to the server, preventing it
15//!   from getting overwhelmed by numerous simultaneous requests.
//! - **Conserves Resource Usage:** A single shared client requires fewer system resources than
//!   multiple separately instantiated clients, since every websocket connection carries overhead
//!   for its handshake and message handling.
19//!
20//! Therefore, sharing one client among multiple tasks ensures optimal performance, reduces resource
21//! consumption, and enhances overall software scalability.
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::Arc,
25    time::Duration,
26};
27
28use async_trait::async_trait;
29use futures03::{stream::SplitSink, SinkExt, StreamExt};
30use hyper::{
31    header::{
32        AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
33        USER_AGENT,
34    },
35    Uri,
36};
37#[cfg(test)]
38use mockall::automock;
39use thiserror::Error;
40use tokio::{
41    net::TcpStream,
42    sync::{
43        mpsc::{self, error::TrySendError, Receiver, Sender},
44        oneshot, Mutex, Notify,
45    },
46    task::JoinHandle,
47    time::sleep,
48};
49use tokio_tungstenite::{
50    connect_async,
51    tungstenite::{
52        self,
53        handshake::client::{generate_key, Request},
54    },
55    MaybeTlsStream, WebSocketStream,
56};
57use tracing::{debug, error, info, instrument, trace, warn};
58use tycho_common::dto::{BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage};
59use uuid::Uuid;
60
61use crate::TYCHO_SERVER_VERSION;
62
63#[derive(Error, Debug)]
64pub enum DeltasError {
65    /// Failed to parse the provided URI.
66    #[error("Failed to parse URI: {0}. Error: {1}")]
67    UriParsing(String, String),
68
69    /// The requested subscription is already pending and is awaiting confirmation from the server.
70    #[error("The requested subscription is already pending")]
71    SubscriptionAlreadyPending,
72
73    /// A message failed to send via an internal channel or through the websocket channel.
74    /// This is typically a fatal error and might indicate a bug in the implementation.
75    #[error("{0}")]
76    TransportError(String),
77
78    /// The internal message buffer is full. This likely means that messages are not being consumed
79    /// fast enough. If the incoming load emits messages in bursts, consider increasing the buffer
80    /// size.
81    #[error("The buffer is full!")]
82    BufferFull,
83
84    /// The client has no active connections but was accessed (e.g., by calling subscribe).
85    /// This typically occurs when trying to use the client before calling connect() or
86    /// after the connection has been closed.
87    #[error("The client is not connected!")]
88    NotConnected,
89
90    /// The connect method was called while the client already had an active connection.
91    #[error("The client is already connected!")]
92    AlreadyConnected,
93
94    /// The connection was closed orderly by the server, e.g. because it restarted.
95    #[error("The server closed the connection!")]
96    ConnectionClosed,
97
98    /// The connection was closed unexpectedly by the server or encountered a network error.
99    #[error("Connection error: {0}")]
100    ConnectionError(#[from] Box<tungstenite::Error>),
101
102    /// A fatal error occurred that cannot be recovered from.
103    #[error("Tycho FatalError: {0}")]
104    Fatal(String),
105}
106
107#[derive(Clone, Debug)]
108pub struct SubscriptionOptions {
109    include_state: bool,
110}
111
112impl Default for SubscriptionOptions {
113    fn default() -> Self {
114        Self { include_state: true }
115    }
116}
117
118impl SubscriptionOptions {
119    pub fn new() -> Self {
120        Self::default()
121    }
122    pub fn with_state(mut self, val: bool) -> Self {
123        self.include_state = val;
124        self
125    }
126}
127
128#[cfg_attr(test, automock)]
129#[async_trait]
130pub trait DeltasClient {
131    /// Subscribe to an extractor and receive realtime messages
132    ///
133    /// Will request a subscription from tycho and wait for confirmation of it. If the caller
134    /// cancels while waiting for confirmation the subscription may still be registered. If the
135    /// receiver was deallocated though, the first message from the subscription will remove it
136    /// again - since there is no one to inform about these messages.
137    async fn subscribe(
138        &self,
139        extractor_id: ExtractorIdentity,
140        options: SubscriptionOptions,
141    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;
142
143    /// Unsubscribe from an subscription
144    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
145
146    /// Start the clients message handling loop.
147    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
148
149    /// Close the clients message handling loop.
150    async fn close(&self) -> Result<(), DeltasError>;
151}
152
153#[derive(Clone)]
154pub struct WsDeltasClient {
155    /// The tycho indexer websocket uri.
156    uri: Uri,
157    /// Authorization key for the websocket connection.
158    auth_key: Option<String>,
159    /// Maximum amount of reconnects to try before giving up.
160    max_reconnects: u32,
161    /// The client will buffer this many messages incoming from the websocket
162    /// before starting to drop them.
163    ws_buffer_size: usize,
164    /// The client will buffer that many messages for each subscription before it starts droppping
165    /// them.
166    subscription_buffer_size: usize,
167    /// Notify tasks waiting for a connection to be established.
168    conn_notify: Arc<Notify>,
169    /// Shared client instance state.
170    inner: Arc<Mutex<Option<Inner>>>,
171}
172
173type WebSocketSink =
174    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
175
176/// Subscription State
177///
178/// Subscription go through a lifecycle:
179///
180/// ```text
181/// O ---> requested subscribe ----> active ----> requested unsub ---> ended
182/// ```
183///
184/// We use oneshot channels to inform the client struct about when these transition happened. E.g.
185/// because for `subscribe`` to finish, we want the state to have transition to `active` and similar
186/// for `unsubscribe`.
187#[derive(Debug)]
188enum SubscriptionInfo {
189    /// Subscription was requested we wait for server confirmation and uuid assignment.
190    RequestedSubscription(oneshot::Sender<(Uuid, Receiver<BlockChanges>)>),
191    /// Subscription is active.
192    Active,
193    /// Unsubscription was requested, we wait for server confirmation.
194    RequestedUnsubscription(oneshot::Sender<()>),
195}
196
197/// Internal struct containing shared state between of WsDeltaClient instances.
198struct Inner {
199    /// Websocket sender handle.
200    sink: WebSocketSink,
201    /// Command channel sender handle.
202    cmd_tx: Sender<()>,
203    /// Currently pending subscriptions.
204    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
205    /// Active subscriptions.
206    subscriptions: HashMap<Uuid, SubscriptionInfo>,
207    /// For eachs subscription we keep a sender handle, the receiver is returned to the caller of
208    /// subscribe.
209    sender: HashMap<Uuid, Sender<BlockChanges>>,
210    /// How many messages to buffer per subscription before starting to drop new messages.
211    buffer_size: usize,
212}
213
214/// Shared state betweeen all client instances.
215///
216/// This state is behind a mutex and requires synchronization to be read of modified.
217impl Inner {
218    fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
219        Self {
220            sink,
221            cmd_tx,
222            pending: HashMap::new(),
223            subscriptions: HashMap::new(),
224            sender: HashMap::new(),
225            buffer_size,
226        }
227    }
228
229    /// Registers a new pending subscription.
230    #[allow(clippy::result_large_err)]
231    fn new_subscription(
232        &mut self,
233        id: &ExtractorIdentity,
234        ready_tx: oneshot::Sender<(Uuid, Receiver<BlockChanges>)>,
235    ) -> Result<(), DeltasError> {
236        if self.pending.contains_key(id) {
237            return Err(DeltasError::SubscriptionAlreadyPending);
238        }
239        self.pending
240            .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
241        Ok(())
242    }
243
244    /// Transitions a pending subscription to active.
245    ///
246    /// Will ignore any request to do so for subscriptions that are not pending.
247    fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
248        if let Some(info) = self.pending.remove(extractor_id) {
249            if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
250                let (tx, rx) = mpsc::channel(self.buffer_size);
251                self.sender.insert(subscription_id, tx);
252                self.subscriptions
253                    .insert(subscription_id, SubscriptionInfo::Active);
254                let _ = ready_tx
255                    .send((subscription_id, rx))
256                    .map_err(|_| {
257                        warn!(
258                            ?extractor_id,
259                            ?subscription_id,
260                            "Subscriber for has gone away. Ignoring."
261                        )
262                    });
263            } else {
264                error!(
265                    ?extractor_id,
266                    ?subscription_id,
267                    "Pending subscription was not in the correct state to 
268                    transition to active. Ignoring!"
269                )
270            }
271        } else {
272            error!(
273                ?extractor_id,
274                ?subscription_id,
275                "Tried to mark an unkown subscription as active. Ignoring!"
276            );
277        }
278    }
279
280    /// Sends a message to a subscription's receiver.
281    #[allow(clippy::result_large_err)]
282    fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
283        if let Some(sender) = self.sender.get_mut(id) {
284            sender
285                .try_send(msg)
286                .map_err(|e| match e {
287                    TrySendError::Full(_) => DeltasError::BufferFull,
288                    TrySendError::Closed(_) => {
289                        DeltasError::TransportError("The subscriber has gone away".to_string())
290                    }
291                })?;
292        }
293        Ok(())
294    }
295
296    /// Requests a subscription to end.
297    ///
298    /// The subscription needs to exist and be active for this to have any effect. Wll use
299    /// `ready_tx` to notify the receiver once the transition to ended completed.
300    fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
301        if let Some(info) = self
302            .subscriptions
303            .get_mut(subscription_id)
304        {
305            if let SubscriptionInfo::Active = info {
306                *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
307            }
308        } else {
309            // no big deal imo so only debug lvl..
310            debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
311        }
312    }
313
314    /// Removes and fully ends a subscription
315    ///
316    /// Any calls for non-existing subscriptions will be simply ignored. May panic on internal state
317    /// inconsistencies: e.g. if the subscription exists but there is no sender for it.
318    /// Will remove a subscription even it was in active or pending state before, this is to support
319    /// any server side failure of the subscription.
320    fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
321        if let Entry::Occupied(e) = self
322            .subscriptions
323            .entry(subscription_id)
324        {
325            let info = e.remove();
326            if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
327                let _ = tx.send(()).map_err(|_| {
328                    warn!(?subscription_id, "failed to notify about removed subscription")
329                });
330                self.sender
331                    .remove(&subscription_id)
332                    .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
333            } else {
334                warn!(?subscription_id, "Subscription ended unexpectedly!");
335                self.sender
336                    .remove(&subscription_id)
337                    .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
338            }
339        } else {
340            error!(
341                ?subscription_id,
342                "Received `SubscriptionEnded`, but was never subscribed 
343                to it. This is likely a bug!"
344            );
345        }
346
347        Ok(())
348    }
349
350    /// Sends a message through the websocket.
351    async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
352        self.sink.send(msg).await.map_err(|e| {
353            DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
354        })
355    }
356}
357
358/// Tycho client websocket implementation.
359impl WsDeltasClient {
360    // Construct a new client with 5 reconnection attempts.
361    #[allow(clippy::result_large_err)]
362    pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
363        let uri = ws_uri
364            .parse::<Uri>()
365            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
366        Ok(Self {
367            uri,
368            auth_key: auth_key.map(|s| s.to_string()),
369            inner: Arc::new(Mutex::new(None)),
370            ws_buffer_size: 128,
371            subscription_buffer_size: 128,
372            conn_notify: Arc::new(Notify::new()),
373            max_reconnects: 5,
374        })
375    }
376
377    // Construct a new client with a custom number of reconnection attempts.
378    #[allow(clippy::result_large_err)]
379    pub fn new_with_reconnects(
380        ws_uri: &str,
381        max_reconnects: u32,
382        auth_key: Option<&str>,
383    ) -> Result<Self, DeltasError> {
384        let uri = ws_uri
385            .parse::<Uri>()
386            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
387
388        Ok(Self {
389            uri,
390            auth_key: auth_key.map(|s| s.to_string()),
391            inner: Arc::new(Mutex::new(None)),
392            ws_buffer_size: 128,
393            subscription_buffer_size: 128,
394            conn_notify: Arc::new(Notify::new()),
395            max_reconnects,
396        })
397    }
398
399    /// Ensures that the client is connected.
400    ///
401    /// This method will acquire the lock for inner.
402    async fn is_connected(&self) -> bool {
403        let guard = self.inner.as_ref().lock().await;
404        guard.is_some()
405    }
406
407    /// Waits for the client to be connected
408    ///
409    /// This method acquires the lock for inner for a short period, then waits until the  
410    /// connection is established if not already connected.
411    async fn ensure_connection(&self) {
412        if !self.is_connected().await {
413            self.conn_notify.notified().await;
414        }
415    }
416
417    /// Main message handling logic
418    ///
419    /// If the message returns an error, a reconnect attempt may be considered depending on the
420    /// error type.
421    #[instrument(skip(self, msg))]
422    async fn handle_msg(
423        &self,
424        msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
425    ) -> Result<(), DeltasError> {
426        let mut guard = self.inner.lock().await;
427
428        match msg {
429            // We do not deserialize the message directly into a WebSocketMessage. This is because
430            // the serde arbitrary_precision feature (often included in many
431            // dependencies we use) breaks some untagged enum deserializations. Instead,
432            // we deserialize the message into a serde_json::Value and convert that into a WebSocketMessage. For more info on this issue, see: https://github.com/serde-rs/json/issues/740
433            Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
434                serde_json::Value,
435            >(&text)
436            {
437                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
438                    Ok(ws_message) => match ws_message {
439                        WebSocketMessage::BlockChanges { subscription_id, deltas } => {
440                            trace!(?deltas, "Received a block state change, sending to channel");
441                            let inner = guard
442                                .as_mut()
443                                .ok_or_else(|| DeltasError::NotConnected)?;
444                            match inner.send(&subscription_id, deltas) {
445                                Err(DeltasError::BufferFull) => {
446                                    error!(?subscription_id, "Buffer full, message dropped!");
447                                }
448                                Err(_) => {
449                                    warn!(
450                                        ?subscription_id,
451                                        "Receiver for has gone away, unsubscribing!"
452                                    );
453                                    let (tx, _) = oneshot::channel();
454                                    let _ = WsDeltasClient::unsubscribe_inner(
455                                        inner,
456                                        subscription_id,
457                                        tx,
458                                    )
459                                    .await;
460                                }
461                                _ => { /* Do nothing */ }
462                            }
463                        }
464                        WebSocketMessage::Response(Response::NewSubscription {
465                            extractor_id,
466                            subscription_id,
467                        }) => {
468                            info!(?extractor_id, ?subscription_id, "Received a new subscription");
469                            let inner = guard
470                                .as_mut()
471                                .ok_or_else(|| DeltasError::NotConnected)?;
472                            inner.mark_active(&extractor_id, subscription_id);
473                        }
474                        WebSocketMessage::Response(Response::SubscriptionEnded {
475                            subscription_id,
476                        }) => {
477                            info!(?subscription_id, "Received a subscription ended");
478                            let inner = guard
479                                .as_mut()
480                                .ok_or_else(|| DeltasError::NotConnected)?;
481                            inner.remove_subscription(subscription_id)?;
482                        }
483                    },
484                    Err(e) => {
485                        error!(
486                            "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
487                            e, text
488                        );
489                    }
490                },
491                Err(e) => {
492                    error!(
493                        "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
494                        e, text
495                    );
496                }
497            },
498            Ok(tungstenite::protocol::Message::Ping(_)) => {
499                // Respond to pings with pongs.
500                let inner = guard
501                    .as_mut()
502                    .ok_or_else(|| DeltasError::NotConnected)?;
503                if let Err(error) = inner
504                    .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
505                    .await
506                {
507                    debug!(?error, "Failed to send pong!");
508                }
509            }
510            Ok(tungstenite::protocol::Message::Pong(_)) => {
511                // Do nothing.
512            }
513            Ok(tungstenite::protocol::Message::Close(_)) => {
514                return Err(DeltasError::ConnectionClosed);
515            }
516            Ok(unknown_msg) => {
517                info!("Received an unknown message type: {:?}", unknown_msg);
518            }
519            Err(error) => {
520                error!(?error, "Websocket error");
521                return Err(match error {
522                    tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
523                    tungstenite::Error::AlreadyClosed => {
524                        warn!("Received AlreadyClosed error which is indicative of a bug!");
525                        DeltasError::ConnectionError(Box::new(error))
526                    }
527                    tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
528                        DeltasError::ConnectionError(Box::new(error))
529                    }
530                    _ => DeltasError::Fatal(error.to_string()),
531                });
532            }
533        };
534        Ok(())
535    }
536
537    /// Helper method to force request a unsubscribe of a subscription
538    ///
539    /// This method expects to receive a mutable reference to `Inner` so it does not acquire a
540    /// lock. Used for normal unsubscribes as well to remove any subscriptions with deallocated
541    /// receivers.
542    async fn unsubscribe_inner(
543        inner: &mut Inner,
544        subscription_id: Uuid,
545        ready_tx: oneshot::Sender<()>,
546    ) -> Result<(), DeltasError> {
547        inner.end_subscription(&subscription_id, ready_tx);
548        let cmd = Command::Unsubscribe { subscription_id };
549        inner
550            .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
551                |e| {
552                    DeltasError::TransportError(format!(
553                        "Failed to serialize unsubscribe command: {e}"
554                    ))
555                },
556            )?))
557            .await?;
558        Ok(())
559    }
560}
561
562#[async_trait]
563impl DeltasClient for WsDeltasClient {
564    #[instrument(skip(self))]
565    async fn subscribe(
566        &self,
567        extractor_id: ExtractorIdentity,
568        options: SubscriptionOptions,
569    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
570        trace!("Starting subscribe");
571        self.ensure_connection().await;
572        let (ready_tx, ready_rx) = oneshot::channel();
573        {
574            let mut guard = self.inner.lock().await;
575            let inner = guard
576                .as_mut()
577                .ok_or_else(|| DeltasError::NotConnected)?;
578            trace!("Sending subscribe command");
579            inner.new_subscription(&extractor_id, ready_tx)?;
580            let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
581            inner
582                .ws_send(tungstenite::protocol::Message::Text(
583                    serde_json::to_string(&cmd).map_err(|e| {
584                        DeltasError::TransportError(format!(
585                            "Failed to serialize subscribe command: {e}"
586                        ))
587                    })?,
588                ))
589                .await?;
590        }
591        trace!("Waiting for subscription response");
592        let rx = ready_rx.await.map_err(|_| {
593            DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
594        })?;
595        trace!("Subscription successful");
596        Ok(rx)
597    }
598
599    #[instrument(skip(self))]
600    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
601        self.ensure_connection().await;
602        let (ready_tx, ready_rx) = oneshot::channel();
603        {
604            let mut guard = self.inner.lock().await;
605            let inner = guard
606                .as_mut()
607                .ok_or_else(|| DeltasError::NotConnected)?;
608
609            WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
610        }
611        ready_rx.await.map_err(|_| {
612            DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
613        })?;
614
615        Ok(())
616    }
617
618    #[instrument(skip(self))]
619    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
620        if self.is_connected().await {
621            return Err(DeltasError::AlreadyConnected);
622        }
623        let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
624        info!(?ws_uri, "Starting TychoWebsocketClient");
625
626        let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
627        {
628            let mut guard = self.inner.as_ref().lock().await;
629            *guard = None;
630        }
631        let this = self.clone();
632        let jh = tokio::spawn(async move {
633            let mut retry_count = 0;
634            let mut result = Err(DeltasError::NotConnected);
635
636            'retry: while retry_count < this.max_reconnects {
637                info!(?ws_uri, "Connecting to WebSocket server");
638
639                // Create a WebSocket request
640                let mut request_builder = Request::builder()
641                    .uri(&ws_uri)
642                    .header(SEC_WEBSOCKET_KEY, generate_key())
643                    .header(SEC_WEBSOCKET_VERSION, 13)
644                    .header(CONNECTION, "Upgrade")
645                    .header(UPGRADE, "websocket")
646                    .header(
647                        HOST,
648                        this.uri.host().ok_or_else(|| {
649                            DeltasError::UriParsing(
650                                ws_uri.clone(),
651                                "No host found in tycho url".to_string(),
652                            )
653                        })?,
654                    )
655                    .header(
656                        USER_AGENT,
657                        format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
658                    );
659
660                // Add Authorization if one is given
661                if let Some(ref key) = this.auth_key {
662                    request_builder = request_builder.header(AUTHORIZATION, key);
663                }
664
665                let request = request_builder.body(()).map_err(|e| {
666                    DeltasError::TransportError(format!("Failed to build connection request: {e}"))
667                })?;
668                let (conn, _) = match connect_async(request).await {
669                    Ok(conn) => conn,
670                    Err(e) => {
671                        // Prepare for reconnection
672                        retry_count += 1;
673                        let mut guard = this.inner.as_ref().lock().await;
674                        *guard = None;
675
676                        warn!(
677                            e = e.to_string(),
678                            "Failed to connect to WebSocket server; Reconnecting"
679                        );
680                        sleep(Duration::from_millis(500)).await;
681
682                        continue 'retry;
683                    }
684                };
685
686                let (ws_tx_new, ws_rx_new) = conn.split();
687                {
688                    let mut guard = this.inner.as_ref().lock().await;
689                    *guard =
690                        Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
691                }
692                let mut msg_rx = ws_rx_new.boxed();
693
694                info!("Connection Successful: TychoWebsocketClient started");
695                this.conn_notify.notify_waiters();
696                result = Ok(());
697
698                loop {
699                    let res = tokio::select! {
700                        msg = msg_rx.next() => match msg {
701                            Some(msg) => this.handle_msg(msg).await,
702                            None => { break 'retry } // ws connection silently closed
703                        },
704                        _ = cmd_rx.recv() => {break 'retry},
705                    };
706                    if let Err(error) = res {
707                        if matches!(
708                            error,
709                            DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
710                        ) {
711                            // Prepare for reconnection
712                            retry_count += 1;
713                            let mut guard = this.inner.as_ref().lock().await;
714                            *guard = None;
715
716                            warn!(
717                                ?error,
718                                ?retry_count,
719                                "Connection dropped unexpectedly; Reconnecting..."
720                            );
721                            break;
722                        } else {
723                            // Other errors are considered fatal
724                            error!(?error, "Fatal error; Exiting");
725                            result = Err(error);
726                            break 'retry;
727                        }
728                    }
729                }
730            }
731
732            // Clean up before exiting
733            let mut guard = this.inner.as_ref().lock().await;
734            *guard = None;
735
736            // Check if max retries has been reached.
737            if retry_count >= this.max_reconnects {
738                error!("Max reconnection attempts reached; Exiting");
739                this.conn_notify.notify_waiters(); // Notify that the task is done
740                result = Err(DeltasError::ConnectionClosed);
741            }
742
743            result
744        });
745
746        self.conn_notify.notified().await;
747
748        if self.is_connected().await {
749            Ok(jh)
750        } else {
751            Err(DeltasError::NotConnected)
752        }
753    }
754
755    #[instrument(skip(self))]
756    async fn close(&self) -> Result<(), DeltasError> {
757        info!("Closing TychoWebsocketClient");
758        let mut guard = self.inner.lock().await;
759        let inner = guard
760            .as_mut()
761            .ok_or_else(|| DeltasError::NotConnected)?;
762        inner
763            .cmd_tx
764            .send(())
765            .await
766            .map_err(|e| DeltasError::TransportError(e.to_string()))?;
767        Ok(())
768    }
769}
770
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use tokio::{net::TcpListener, time::timeout};
    use tycho_common::dto::Chain;

    use super::*;

    /// One step of the scripted conversation the mock server holds with the client.
    #[derive(Clone)]
    enum ExpectedComm {
        /// Wait at most the given number of milliseconds for the client to send a message and
        /// assert that it equals the expected message.
        Receive(u64, tungstenite::protocol::Message),
        /// Send the given message to the client.
        Send(tungstenite::protocol::Message),
    }

    /// Starts a mock Tycho websocket server on an OS-assigned localhost port.
    ///
    /// The server serves up to `reconnects + 1` connections one after another. On each connection
    /// it plays the whole `messages` script, waits 100ms, then closes the websocket. When the loop
    /// finishes the task returns and the listener is dropped. Later connection attempts then
    /// fail.
    ///
    /// Any deviation from the script (receive timeout, exhausted stream, unexpected message)
    /// panics inside the server task. Awaiting the returned handle then yields an error.
    ///
    /// Returns the bound address and the server task's join handle.
    async fn mock_tycho_ws(
        messages: &[ExpectedComm],
        reconnects: usize,
    ) -> (SocketAddr, JoinHandle<()>) {
        info!("Starting mock webserver");
        // zero port here means the OS chooses an open port
        let server = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("localhost bind failed");
        let addr = server.local_addr().unwrap();
        let messages = messages.to_vec();

        let jh = tokio::spawn(async move {
            info!("mock webserver started");
            for _ in 0..(reconnects + 1) {
                // NOTE: a failed `accept` still consumes one of the connection slots.
                if let Ok((stream, _)) = server.accept().await {
                    let mut websocket = tokio_tungstenite::accept_async(stream)
                        .await
                        .unwrap();

                    info!("Handling messages..");
                    // The full script is replayed on every connection, so a reconnecting
                    // client sees the same conversation again.
                    for c in messages.iter().cloned() {
                        match c {
                            ExpectedComm::Receive(t, exp) => {
                                info!("Awaiting message...");
                                let msg = timeout(Duration::from_millis(t), websocket.next())
                                    .await
                                    .expect("Receive timeout")
                                    .expect("Stream exhausted")
                                    .expect("Failed to receive message.");
                                info!("Message received");
                                assert_eq!(msg, exp)
                            }
                            ExpectedComm::Send(data) => {
                                info!("Sending message");
                                websocket
                                    .send(data)
                                    .await
                                    .expect("Failed to send message");
                                info!("Message sent");
                            }
                        };
                    }
                    // Give the client time to process the last message before the drop.
                    sleep(Duration::from_millis(100)).await;
                    // Close the WebSocket connection
                    let _ = websocket.close(None).await;
                }
            }
        });
        (addr, jh)
    }

    /// Subscribes, receives one deltas message, then closes the client.
    /// Checks that the ws loop and the mock server both finish without error.
    #[tokio::test]
    async fn test_subscribe_receive() {
        let exp_comm = [
            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
                {
                    "method":"subscribe",
                    "extractor_id":{
                        "chain":"ethereum",
                        "name":"vm:ambient"
                    },
                    "include_state": true
                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
                {
                    "method":"newsubscription",
                    "extractor_id":{
                    "chain":"ethereum",
                    "name":"vm:ambient"
                    },
                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
                {
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
                    "deltas": {
                        "extractor": "vm:ambient",
                        "chain": "ethereum",
                        "block": {
                            "number": 123,
                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "chain": "ethereum",             
                            "ts": "2023-09-14T00:00:00"
                        },
                        "finalized_block_height": 0,
                        "revert": false,
                        "new_tokens": {},
                        "account_updates": {
                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                "chain": "ethereum",
                                "slots": {},
                                "balance": "0x01f4",
                                "code": "",
                                "change": "Update"
                            }
                        },
                        "state_updates": {
                            "component_1": {
                                "component_id": "component_1",
                                "updated_attributes": {"attr1": "0x01"},
                                "deleted_attributes": ["attr2"]
                            }
                        },
                        "new_protocol_components": 
                            { "protocol_1": {
                                    "id": "protocol_1",
                                    "protocol_system": "system_1",
                                    "protocol_type_name": "type_1",
                                    "chain": "ethereum",
                                    "tokens": ["0x01", "0x02"],
                                    "contract_ids": ["0x01", "0x02"],
                                    "static_attributes": {"attr1": "0x01f4"},
                                    "change": "Update",
                                    "creation_tx": "0x01",
                                    "created_at": "2023-09-14T00:00:00"
                                }
                            },
                        "deleted_protocol_components": {},
                        "component_balances": {
                            "protocol_1":
                                {
                                    "0x01": {
                                        "token": "0x01",
                                        "balance": "0x01f4",
                                        "balance_float": 0.0,
                                        "modify_tx": "0x01",
                                        "component_id": "protocol_1"
                                    }
                                }
                        },
                        "account_balances": {
                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                    "balance": "0x01f4",
                                    "modify_tx": "0x01"
                                }
                            }
                        },
                        "component_tvl": {
                            "protocol_1": 1000.0
                        },
                        "dci_update": {
                            "new_entrypoints": {},
                            "new_entrypoint_params": {},
                            "trace_results": {}
                        }
                    }
                }
                "#.to_owned()
            ))
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (_, mut rx) = timeout(
            Duration::from_millis(100),
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed");
        let _ = timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("awaiting message timeout out")
            .expect("receiving message failed");
        timeout(Duration::from_millis(100), client.close())
            .await
            .expect("close timed out")
            .expect("close failed");
        jh.await
            .expect("ws loop errored")
            .unwrap();
        server_thread.await.unwrap();
    }

    /// Tests a client-initiated unsubscribe.
    /// After the server confirms with `subscriptionended`, the subscription's channel should close.
    #[tokio::test]
    async fn test_unsubscribe() {
        let exp_comm = [
            ExpectedComm::Receive(
                100,
                tungstenite::protocol::Message::Text(
                    r#"
                {
                    "method": "subscribe",
                    "extractor_id":{
                        "chain": "ethereum",
                        "name": "vm:ambient"
                    },
                    "include_state": true
                }"#
                    .to_owned()
                    .replace(|c: char| c.is_whitespace(), ""),
                ),
            ),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "method": "newsubscription",
                    "extractor_id":{
                        "chain": "ethereum",
                        "name": "vm:ambient"
                    },
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#
                .to_owned()
                .replace(|c: char| c.is_whitespace(), ""),
            )),
            ExpectedComm::Receive(
                100,
                tungstenite::protocol::Message::Text(
                    r#"
                {
                    "method": "unsubscribe",
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }
                "#
                    .to_owned()
                    .replace(|c: char| c.is_whitespace(), ""),
                ),
            ),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "method": "subscriptionended",
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }
                "#
                .to_owned()
                .replace(|c: char| c.is_whitespace(), ""),
            )),
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (sub_id, mut rx) = timeout(
            Duration::from_millis(100),
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed");

        timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
            .await
            .expect("unsubscribe timed out")
            .expect("unsubscribe failed");
        let res = timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("awaiting message timeout out");

        // If the subscription ended, the channel should have been closed.
        assert!(res.is_none());

        timeout(Duration::from_millis(100), client.close())
            .await
            .expect("close timed out")
            .expect("close failed");
        jh.await
            .expect("ws loop errored")
            .unwrap();
        server_thread.await.unwrap();
    }

    /// Tests a server-initiated `subscriptionended`, sent without any unsubscribe request.
    /// The subscription's channel should still be closed.
    #[tokio::test]
    async fn test_subscription_unexpected_end() {
        let exp_comm = [
            ExpectedComm::Receive(
                100,
                tungstenite::protocol::Message::Text(
                    r#"
                {
                    "method":"subscribe",
                    "extractor_id":{
                        "chain":"ethereum",
                        "name":"vm:ambient"
                    },
                    "include_state": true
                }"#
                    .to_owned()
                    .replace(|c: char| c.is_whitespace(), ""),
                ),
            ),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "method":"newsubscription",
                    "extractor_id":{
                        "chain":"ethereum",
                        "name":"vm:ambient"
                    },
                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#
                .to_owned()
                .replace(|c: char| c.is_whitespace(), ""),
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "method": "subscriptionended",
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#
                .to_owned()
                .replace(|c: char| c.is_whitespace(), ""),
            )),
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (_, mut rx) = timeout(
            Duration::from_millis(100),
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed");
        let res = timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("awaiting message timeout out");

        // If the subscription ended, the channel should have been closed.
        assert!(res.is_none());

        timeout(Duration::from_millis(100), client.close())
            .await
            .expect("close timed out")
            .expect("close failed");
        jh.await
            .expect("ws loop errored")
            .unwrap();
        server_thread.await.unwrap();
    }

    /// Tests reconnect behaviour.
    ///
    /// The mock server serves two connections (`reconnects = 1`) and drops each one after its
    /// script. The client must transparently reconnect once so a second subscription succeeds.
    /// Once the server stops accepting connections, the ws loop must terminate with an error.
    #[test_log::test(tokio::test)]
    async fn test_reconnect() {
        let exp_comm = [
            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
                {
                    "method":"subscribe",
                    "extractor_id":{
                        "chain":"ethereum",
                        "name":"vm:ambient"
                    },
                    "include_state": true
                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
                {
                    "method":"newsubscription",
                    "extractor_id":{
                    "chain":"ethereum",
                    "name":"vm:ambient"
                    },
                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
                {
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
                    "deltas": {
                        "extractor": "vm:ambient",
                        "chain": "ethereum",
                        "block": {
                            "number": 123,
                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "chain": "ethereum",             
                            "ts": "2023-09-14T00:00:00"
                        },
                        "finalized_block_height": 0,
                        "revert": false,
                        "new_tokens": {},
                        "account_updates": {
                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                "chain": "ethereum",
                                "slots": {},
                                "balance": "0x01f4",
                                "code": "",
                                "change": "Update"
                            }
                        },
                        "state_updates": {
                            "component_1": {
                                "component_id": "component_1",
                                "updated_attributes": {"attr1": "0x01"},
                                "deleted_attributes": ["attr2"]
                            }
                        },
                        "new_protocol_components": {
                            "protocol_1":
                                {
                                    "id": "protocol_1",
                                    "protocol_system": "system_1",
                                    "protocol_type_name": "type_1",
                                    "chain": "ethereum",
                                    "tokens": ["0x01", "0x02"],
                                    "contract_ids": ["0x01", "0x02"],
                                    "static_attributes": {"attr1": "0x01f4"},
                                    "change": "Update",
                                    "creation_tx": "0x01",
                                    "created_at": "2023-09-14T00:00:00"
                                }
                            },
                        "deleted_protocol_components": {},
                        "component_balances": {
                            "protocol_1": {
                                "0x01": {
                                    "token": "0x01",
                                    "balance": "0x01f4",
                                    "balance_float": 1000.0,
                                    "modify_tx": "0x01",
                                    "component_id": "protocol_1"
                                }
                            }
                        },
                        "account_balances": {
                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                    "balance": "0x01f4",
                                    "modify_tx": "0x01"
                                }
                            }
                        },
                        "component_tvl": {
                            "protocol_1": 1000.0
                        },
                        "dci_update": {
                            "new_entrypoints": {},
                            "new_entrypoint_params": {},
                            "trace_results": {}
                        }
                    }
                }
                "#.to_owned()
            ))
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh: JoinHandle<Result<(), DeltasError>> = client
            .connect()
            .await
            .expect("connect failed");

        // One iteration per connection the mock server serves.
        for _ in 0..2 {
            let (_, mut rx) = timeout(
                Duration::from_millis(100),
                client.subscribe(
                    ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                    SubscriptionOptions::new(),
                ),
            )
            .await
            .expect("subscription timed out")
            .expect("subscription failed");

            let _ = timeout(Duration::from_millis(100), rx.recv())
                .await
                .expect("awaiting message timeout out")
                .expect("receiving message failed");

            // wait for the connection to drop
            let res = timeout(Duration::from_millis(200), rx.recv())
                .await
                .expect("awaiting closed connection timeout out");
            assert!(res.is_none());
        }
        let res = jh.await.expect("ws client join failed");
        // The mock server has shut down, so every further reconnect attempt fails until the
        // client exhausts its reconnect budget; the ws loop must then return an error.
        assert!(res.is_err());
        server_thread
            .await
            .expect("ws server loop errored");
    }

    /// Starts a TCP server that accepts connections and drops them immediately, so every
    /// websocket handshake against it fails.
    ///
    /// The task keeps accepting until `accept` returns an error. Callers should abort the
    /// returned handle once they are done.
    async fn mock_bad_connection_tycho_ws() -> (SocketAddr, JoinHandle<()>) {
        let server = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("localhost bind failed");
        let addr = server.local_addr().unwrap();
        let jh = tokio::spawn(async move {
            while let Ok((stream, _)) = server.accept().await {
                // Immediately close the connection to simulate a failure
                drop(stream);
            }
        });
        (addr, jh)
    }

    /// Tests `connect` against a server that never completes a handshake.
    /// It should give up after the configured number of reconnect attempts and report
    /// `NotConnected`.
    #[tokio::test]
    async fn test_connect_max_attempts() {
        let (addr, server_thread) = mock_bad_connection_tycho_ws().await;
        let client = WsDeltasClient::new_with_reconnects(&format!("ws://{addr}"), 3, None).unwrap();

        let res = client.connect().await;

        // Match on the variant rather than comparing `Display` strings. This pins the exact
        // error, and a failure shows which error was actually returned.
        match res {
            Err(DeltasError::NotConnected) => {}
            Err(other) => panic!("expected DeltasError::NotConnected, got {other:?}"),
            Ok(_) => panic!("connect unexpectedly succeeded"),
        }

        // The mock server loops on `accept` indefinitely. Stop it explicitly rather than relying
        // on runtime shutdown.
        server_thread.abort();
    }
}