tycho_client/
deltas.rs

1//! # Deltas Client
2//!
3//! This module focuses on implementing the Real-Time Deltas client for the Tycho Indexer service.
4//! Utilizing this client facilitates efficient, instant communication with the indexing service,
5//! promoting seamless data synchronization.
6//!
7//! ## Websocket Implementation
8//!
9//! The present WebSocket implementation is clonable, which enables it to be shared
10//! across multiple asynchronous tasks without creating separate instances for each task. This
11//! unique feature boosts efficiency as it:
12//!
13//! - **Reduces Server Load:** By maintaining a single universal client, the load on the server is
14//!   significantly reduced. This is because fewer connections are made to the server, preventing it
15//!   from getting overwhelmed by numerous simultaneous requests.
//! - **Conserves Resource Usage:** A single shared client requires fewer system resources than if
//!   multiple clients were instantiated and used separately, as there is some overhead for
//!   websocket handshakes and messages.
19//!
20//! Therefore, sharing one client among multiple tasks ensures optimal performance, reduces resource
21//! consumption, and enhances overall software scalability.
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::Arc,
25    time::Duration,
26};
27
28use async_trait::async_trait;
29use futures03::{stream::SplitSink, SinkExt, StreamExt};
30use hyper::{
31    header::{
32        AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
33        USER_AGENT,
34    },
35    Uri,
36};
37#[cfg(test)]
38use mockall::automock;
39use thiserror::Error;
40use tokio::{
41    net::TcpStream,
42    sync::{
43        mpsc::{self, error::TrySendError, Receiver, Sender},
44        oneshot, Mutex, Notify,
45    },
46    task::JoinHandle,
47    time::sleep,
48};
49use tokio_tungstenite::{
50    connect_async,
51    tungstenite::{
52        self,
53        handshake::client::{generate_key, Request},
54    },
55    MaybeTlsStream, WebSocketStream,
56};
57use tracing::{debug, error, info, instrument, trace, warn};
58use tycho_common::dto::{BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage};
59use uuid::Uuid;
60
61use crate::TYCHO_SERVER_VERSION;
62
/// Errors returned by the deltas client.
///
/// The message handling loop started by `connect` treats `ConnectionClosed` and
/// `ConnectionError` as reasons to attempt a reconnect; every other variant surfacing from
/// message handling ends the loop.
#[derive(Error, Debug)]
pub enum DeltasError {
    /// Failed to parse the provided URI.
    ///
    /// Carries the offending URI and a textual description of the problem.
    #[error("Failed to parse URI: {0}. Error: {1}")]
    UriParsing(String, String),

    /// The requested subscription is already pending and is awaiting confirmation from the server.
    #[error("The requested subscription is already pending")]
    SubscriptionAlreadyPending,

    /// A message failed to send via an internal channel or through the websocket channel.
    /// This is typically a fatal error and might indicate a bug in the implementation.
    #[error("{0}")]
    TransportError(String),

    /// The internal message buffer is full. This likely means that messages are not being consumed
    /// fast enough. If the incoming load emits messages in bursts, consider increasing the buffer
    /// size.
    #[error("The buffer is full!")]
    BufferFull,

    /// The client has no active connections but was accessed (e.g., by calling subscribe).
    /// This typically occurs when trying to use the client before calling connect() or
    /// after the connection has been closed.
    #[error("The client is not connected!")]
    NotConnected,

    /// The connect method was called while the client already had an active connection.
    #[error("The client is already connected!")]
    AlreadyConnected,

    /// The connection was closed orderly by the server, e.g. because it restarted.
    #[error("The server closed the connection!")]
    ConnectionClosed,

    /// The connection was closed unexpectedly by the server or encountered a network error.
    ///
    /// The underlying websocket error is boxed.
    #[error("Connection error: {0}")]
    ConnectionError(#[from] Box<tungstenite::Error>),

    /// A fatal error occurred that cannot be recovered from.
    #[error("Tycho FatalError: {0}")]
    Fatal(String),
}
106
/// Options that can be supplied when subscribing to an extractor.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Value forwarded as `include_state` in the subscribe command.
    include_state: bool,
}

impl Default for SubscriptionOptions {
    /// By default `include_state` is enabled.
    fn default() -> Self {
        SubscriptionOptions { include_state: true }
    }
}

impl SubscriptionOptions {
    /// Creates options carrying the default values.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Builder-style setter for the `include_state` flag; consumes and returns the options.
    pub fn with_state(self, val: bool) -> Self {
        let mut updated = self;
        updated.include_state = val;
        updated
    }
}
127
/// Client interface for receiving realtime deltas from Tycho.
///
/// A mock implementation is generated via `automock` in test builds.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait DeltasClient {
    /// Subscribe to an extractor and receive realtime messages.
    ///
    /// Will request a subscription from tycho and wait for confirmation of it. If the caller
    /// cancels while waiting for confirmation the subscription may still be registered. If the
    /// receiver was deallocated though, the first message from the subscription will remove it
    /// again - since there is no one to inform about these messages.
    ///
    /// On success, returns the subscription id together with the receiving end of the channel
    /// on which the subscription's `BlockChanges` messages are delivered.
    async fn subscribe(
        &self,
        extractor_id: ExtractorIdentity,
        options: SubscriptionOptions,
    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;

    /// Unsubscribe from a subscription, identified by the id returned from `subscribe`.
    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;

    /// Start the client's message handling loop.
    ///
    /// On success, returns a handle to the spawned task; the task itself resolves to a
    /// `Result<(), DeltasError>`.
    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;

    /// Close the client's message handling loop.
    async fn close(&self) -> Result<(), DeltasError>;
}
152
/// Websocket based implementation of [`DeltasClient`].
///
/// The struct is `Clone`; `conn_notify` and `inner` are reference counted, so all clones share
/// the same notification handle and connection state.
#[derive(Clone)]
pub struct WsDeltasClient {
    /// The tycho indexer websocket uri.
    uri: Uri,
    /// Authorization key for the websocket connection.
    auth_key: Option<String>,
    /// Maximum amount of reconnects to try before giving up.
    max_reconnects: u32,
    /// The client will buffer this many messages incoming from the websocket
    /// before starting to drop them.
    ws_buffer_size: usize,
    /// The client will buffer that many messages for each subscription before it starts dropping
    /// them.
    subscription_buffer_size: usize,
    /// Notify tasks waiting for a connection to be established.
    conn_notify: Arc<Notify>,
    /// Shared client instance state. `None` while there is no established connection.
    inner: Arc<Mutex<Option<Inner>>>,
}
172
/// Sending half of a split websocket stream, accepting tungstenite protocol messages.
type WebSocketSink =
    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
175
/// Subscription State
///
/// Subscriptions go through a lifecycle:
///
/// ```text
/// O ---> requested subscribe ----> active ----> requested unsub ---> ended
/// ```
///
/// We use oneshot channels to inform the client struct about when these transitions happened.
/// E.g. because for `subscribe` to finish, we want the state to have transitioned to `active`,
/// and similarly for `unsubscribe`.
#[derive(Debug)]
enum SubscriptionInfo {
    /// Subscription was requested, we wait for server confirmation and uuid assignment.
    ///
    /// The sender is used to hand the subscription id and the message receiver back to the
    /// caller of `subscribe` once the subscription becomes active.
    RequestedSubscription(oneshot::Sender<(Uuid, Receiver<BlockChanges>)>),
    /// Subscription is active.
    Active,
    /// Unsubscription was requested, we wait for server confirmation.
    ///
    /// The sender is used to signal the waiting caller once the subscription has been removed.
    RequestedUnsubscription(oneshot::Sender<()>),
}
196
/// Internal struct containing the state shared between `WsDeltasClient` instances.
///
/// An instance exists only while a websocket connection is established.
struct Inner {
    /// Websocket sender handle.
    sink: WebSocketSink,
    /// Command channel sender handle. Sending `()` on it asks the message handling loop to stop.
    cmd_tx: Sender<()>,
    /// Currently pending subscriptions, keyed by the extractor they were requested for.
    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
    /// Active subscriptions, keyed by the subscription id.
    subscriptions: HashMap<Uuid, SubscriptionInfo>,
    /// For each subscription we keep a sender handle, the receiver is returned to the caller of
    /// subscribe.
    sender: HashMap<Uuid, Sender<BlockChanges>>,
    /// How many messages to buffer per subscription before starting to drop new messages.
    buffer_size: usize,
}
213
/// Shared state between all client instances.
///
/// This state is behind a mutex and requires synchronization to be read or modified.
217impl Inner {
218    fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
219        Self {
220            sink,
221            cmd_tx,
222            pending: HashMap::new(),
223            subscriptions: HashMap::new(),
224            sender: HashMap::new(),
225            buffer_size,
226        }
227    }
228
229    /// Registers a new pending subscription.
230    #[allow(clippy::result_large_err)]
231    fn new_subscription(
232        &mut self,
233        id: &ExtractorIdentity,
234        ready_tx: oneshot::Sender<(Uuid, Receiver<BlockChanges>)>,
235    ) -> Result<(), DeltasError> {
236        if self.pending.contains_key(id) {
237            return Err(DeltasError::SubscriptionAlreadyPending);
238        }
239        self.pending
240            .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
241        Ok(())
242    }
243
244    /// Transitions a pending subscription to active.
245    ///
246    /// Will ignore any request to do so for subscriptions that are not pending.
247    fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
248        if let Some(info) = self.pending.remove(extractor_id) {
249            if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
250                let (tx, rx) = mpsc::channel(self.buffer_size);
251                self.sender.insert(subscription_id, tx);
252                self.subscriptions
253                    .insert(subscription_id, SubscriptionInfo::Active);
254                let _ = ready_tx
255                    .send((subscription_id, rx))
256                    .map_err(|_| {
257                        warn!(
258                            ?extractor_id,
259                            ?subscription_id,
260                            "Subscriber for has gone away. Ignoring."
261                        )
262                    });
263            } else {
264                error!(
265                    ?extractor_id,
266                    ?subscription_id,
267                    "Pending subscription was not in the correct state to 
268                    transition to active. Ignoring!"
269                )
270            }
271        } else {
272            error!(
273                ?extractor_id,
274                ?subscription_id,
275                "Tried to mark an unkown subscription as active. Ignoring!"
276            );
277        }
278    }
279
280    /// Sends a message to a subscription's receiver.
281    #[allow(clippy::result_large_err)]
282    fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
283        if let Some(sender) = self.sender.get_mut(id) {
284            sender
285                .try_send(msg)
286                .map_err(|e| match e {
287                    TrySendError::Full(_) => DeltasError::BufferFull,
288                    TrySendError::Closed(_) => {
289                        DeltasError::TransportError("The subscriber has gone away".to_string())
290                    }
291                })?;
292        }
293        Ok(())
294    }
295
296    /// Requests a subscription to end.
297    ///
298    /// The subscription needs to exist and be active for this to have any effect. Wll use
299    /// `ready_tx` to notify the receiver once the transition to ended completed.
300    fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
301        if let Some(info) = self
302            .subscriptions
303            .get_mut(subscription_id)
304        {
305            if let SubscriptionInfo::Active = info {
306                *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
307            }
308        } else {
309            // no big deal imo so only debug lvl..
310            debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
311        }
312    }
313
314    /// Removes and fully ends a subscription
315    ///
316    /// Any calls for non-existing subscriptions will be simply ignored. May panic on internal state
317    /// inconsistencies: e.g. if the subscription exists but there is no sender for it.
318    /// Will remove a subscription even it was in active or pending state before, this is to support
319    /// any server side failure of the subscription.
320    fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
321        if let Entry::Occupied(e) = self
322            .subscriptions
323            .entry(subscription_id)
324        {
325            let info = e.remove();
326            if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
327                let _ = tx.send(()).map_err(|_| {
328                    warn!(?subscription_id, "failed to notify about removed subscription")
329                });
330                self.sender
331                    .remove(&subscription_id)
332                    .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
333            } else {
334                warn!(?subscription_id, "Subscription ended unexpectedly!");
335                self.sender
336                    .remove(&subscription_id)
337                    .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
338            }
339        } else {
340            error!(
341                ?subscription_id,
342                "Received `SubscriptionEnded`, but was never subscribed 
343                to it. This is likely a bug!"
344            );
345        }
346
347        Ok(())
348    }
349
350    /// Sends a message through the websocket.
351    async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
352        self.sink.send(msg).await.map_err(|e| {
353            DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
354        })
355    }
356}
357
358/// Tycho client websocket implementation.
359impl WsDeltasClient {
360    // Construct a new client with 5 reconnection attempts.
361    #[allow(clippy::result_large_err)]
362    pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
363        let uri = ws_uri
364            .parse::<Uri>()
365            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
366        Ok(Self {
367            uri,
368            auth_key: auth_key.map(|s| s.to_string()),
369            inner: Arc::new(Mutex::new(None)),
370            ws_buffer_size: 128,
371            subscription_buffer_size: 128,
372            conn_notify: Arc::new(Notify::new()),
373            max_reconnects: 5,
374        })
375    }
376
377    // Construct a new client with a custom number of reconnection attempts.
378    #[allow(clippy::result_large_err)]
379    pub fn new_with_reconnects(
380        ws_uri: &str,
381        max_reconnects: u32,
382        auth_key: Option<&str>,
383    ) -> Result<Self, DeltasError> {
384        let uri = ws_uri
385            .parse::<Uri>()
386            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
387
388        Ok(Self {
389            uri,
390            auth_key: auth_key.map(|s| s.to_string()),
391            inner: Arc::new(Mutex::new(None)),
392            ws_buffer_size: 128,
393            subscription_buffer_size: 128,
394            conn_notify: Arc::new(Notify::new()),
395            max_reconnects,
396        })
397    }
398
399    /// Ensures that the client is connected.
400    ///
401    /// This method will acquire the lock for inner.
402    async fn is_connected(&self) -> bool {
403        let guard = self.inner.as_ref().lock().await;
404        guard.is_some()
405    }
406
407    /// Waits for the client to be connected
408    ///
409    /// This method acquires the lock for inner for a short period, then waits until the  
410    /// connection is established if not already connected.
411    async fn ensure_connection(&self) {
412        if !self.is_connected().await {
413            self.conn_notify.notified().await;
414        }
415    }
416
417    /// Main message handling logic
418    ///
419    /// If the message returns an error, a reconnect attempt may be considered depending on the
420    /// error type.
421    #[instrument(skip(self, msg))]
422    async fn handle_msg(
423        &self,
424        msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
425    ) -> Result<(), DeltasError> {
426        let mut guard = self.inner.lock().await;
427
428        match msg {
429            // We do not deserialize the message directly into a WebSocketMessage. This is because
430            // the serde arbitrary_precision feature (often included in many
431            // dependencies we use) breaks some untagged enum deserializations. Instead,
432            // we deserialize the message into a serde_json::Value and convert that into a WebSocketMessage. For more info on this issue, see: https://github.com/serde-rs/json/issues/740
433            Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
434                serde_json::Value,
435            >(&text)
436            {
437                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
438                    Ok(ws_message) => match ws_message {
439                        WebSocketMessage::BlockChanges { subscription_id, deltas } => {
440                            trace!(?deltas, "Received a block state change, sending to channel");
441                            let inner = guard
442                                .as_mut()
443                                .ok_or_else(|| DeltasError::NotConnected)?;
444                            match inner.send(&subscription_id, deltas) {
445                                Err(DeltasError::BufferFull) => {
446                                    error!(?subscription_id, "Buffer full, message dropped!");
447                                }
448                                Err(_) => {
449                                    warn!(
450                                        ?subscription_id,
451                                        "Receiver for has gone away, unsubscribing!"
452                                    );
453                                    let (tx, rx) = oneshot::channel();
454                                    if let Err(e) = WsDeltasClient::unsubscribe_inner(
455                                        inner,
456                                        subscription_id,
457                                        tx,
458                                    )
459                                    .await
460                                    {
461                                        warn!(
462                                            ?e,
463                                            ?subscription_id,
464                                            "Failed to send unsubscribe command"
465                                        );
466                                    } else {
467                                        // Wait for unsubscribe completion with timeout
468                                        match tokio::time::timeout(Duration::from_secs(5), rx).await
469                                        {
470                                            Ok(_) => {
471                                                debug!(
472                                                    ?subscription_id,
473                                                    "Unsubscribe completed successfully"
474                                                );
475                                            }
476                                            Err(_) => {
477                                                warn!(
478                                                    ?subscription_id,
479                                                    "Unsubscribe completion timed out"
480                                                );
481                                            }
482                                        }
483                                    }
484                                }
485                                _ => { /* Do nothing */ }
486                            }
487                        }
488                        WebSocketMessage::Response(Response::NewSubscription {
489                            extractor_id,
490                            subscription_id,
491                        }) => {
492                            info!(?extractor_id, ?subscription_id, "Received a new subscription");
493                            let inner = guard
494                                .as_mut()
495                                .ok_or_else(|| DeltasError::NotConnected)?;
496                            inner.mark_active(&extractor_id, subscription_id);
497                        }
498                        WebSocketMessage::Response(Response::SubscriptionEnded {
499                            subscription_id,
500                        }) => {
501                            info!(?subscription_id, "Received a subscription ended");
502                            let inner = guard
503                                .as_mut()
504                                .ok_or_else(|| DeltasError::NotConnected)?;
505                            inner.remove_subscription(subscription_id)?;
506                        }
507                    },
508                    Err(e) => {
509                        error!(
510                            "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
511                            e, text
512                        );
513                    }
514                },
515                Err(e) => {
516                    error!(
517                        "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
518                        e, text
519                    );
520                }
521            },
522            Ok(tungstenite::protocol::Message::Ping(_)) => {
523                // Respond to pings with pongs.
524                let inner = guard
525                    .as_mut()
526                    .ok_or_else(|| DeltasError::NotConnected)?;
527                if let Err(error) = inner
528                    .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
529                    .await
530                {
531                    debug!(?error, "Failed to send pong!");
532                }
533            }
534            Ok(tungstenite::protocol::Message::Pong(_)) => {
535                // Do nothing.
536            }
537            Ok(tungstenite::protocol::Message::Close(_)) => {
538                return Err(DeltasError::ConnectionClosed);
539            }
540            Ok(unknown_msg) => {
541                info!("Received an unknown message type: {:?}", unknown_msg);
542            }
543            Err(error) => {
544                error!(?error, "Websocket error");
545                return Err(match error {
546                    tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
547                    tungstenite::Error::AlreadyClosed => {
548                        warn!("Received AlreadyClosed error which is indicative of a bug!");
549                        DeltasError::ConnectionError(Box::new(error))
550                    }
551                    tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
552                        DeltasError::ConnectionError(Box::new(error))
553                    }
554                    _ => DeltasError::Fatal(error.to_string()),
555                });
556            }
557        };
558        Ok(())
559    }
560
561    /// Helper method to force request a unsubscribe of a subscription
562    ///
563    /// This method expects to receive a mutable reference to `Inner` so it does not acquire a
564    /// lock. Used for normal unsubscribes as well to remove any subscriptions with deallocated
565    /// receivers.
566    async fn unsubscribe_inner(
567        inner: &mut Inner,
568        subscription_id: Uuid,
569        ready_tx: oneshot::Sender<()>,
570    ) -> Result<(), DeltasError> {
571        inner.end_subscription(&subscription_id, ready_tx);
572        let cmd = Command::Unsubscribe { subscription_id };
573        inner
574            .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
575                |e| {
576                    DeltasError::TransportError(format!(
577                        "Failed to serialize unsubscribe command: {e}"
578                    ))
579                },
580            )?))
581            .await?;
582        Ok(())
583    }
584}
585
586#[async_trait]
587impl DeltasClient for WsDeltasClient {
588    #[instrument(skip(self))]
589    async fn subscribe(
590        &self,
591        extractor_id: ExtractorIdentity,
592        options: SubscriptionOptions,
593    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
594        trace!("Starting subscribe");
595        self.ensure_connection().await;
596        let (ready_tx, ready_rx) = oneshot::channel();
597        {
598            let mut guard = self.inner.lock().await;
599            let inner = guard
600                .as_mut()
601                .ok_or_else(|| DeltasError::NotConnected)?;
602            trace!("Sending subscribe command");
603            inner.new_subscription(&extractor_id, ready_tx)?;
604            let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
605            inner
606                .ws_send(tungstenite::protocol::Message::Text(
607                    serde_json::to_string(&cmd).map_err(|e| {
608                        DeltasError::TransportError(format!(
609                            "Failed to serialize subscribe command: {e}"
610                        ))
611                    })?,
612                ))
613                .await?;
614        }
615        trace!("Waiting for subscription response");
616        let rx = ready_rx.await.map_err(|_| {
617            DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
618        })?;
619        trace!("Subscription successful");
620        Ok(rx)
621    }
622
623    #[instrument(skip(self))]
624    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
625        self.ensure_connection().await;
626        let (ready_tx, ready_rx) = oneshot::channel();
627        {
628            let mut guard = self.inner.lock().await;
629            let inner = guard
630                .as_mut()
631                .ok_or_else(|| DeltasError::NotConnected)?;
632
633            WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
634        }
635        ready_rx.await.map_err(|_| {
636            DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
637        })?;
638
639        Ok(())
640    }
641
642    #[instrument(skip(self))]
643    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
644        if self.is_connected().await {
645            return Err(DeltasError::AlreadyConnected);
646        }
647        let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
648        info!(?ws_uri, "Starting TychoWebsocketClient");
649
650        let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
651        {
652            let mut guard = self.inner.as_ref().lock().await;
653            *guard = None;
654        }
655        let this = self.clone();
656        let jh = tokio::spawn(async move {
657            let mut retry_count = 0;
658            let mut result = Err(DeltasError::NotConnected);
659
660            'retry: while retry_count < this.max_reconnects {
661                info!(?ws_uri, "Connecting to WebSocket server");
662
663                // Create a WebSocket request
664                let mut request_builder = Request::builder()
665                    .uri(&ws_uri)
666                    .header(SEC_WEBSOCKET_KEY, generate_key())
667                    .header(SEC_WEBSOCKET_VERSION, 13)
668                    .header(CONNECTION, "Upgrade")
669                    .header(UPGRADE, "websocket")
670                    .header(
671                        HOST,
672                        this.uri.host().ok_or_else(|| {
673                            DeltasError::UriParsing(
674                                ws_uri.clone(),
675                                "No host found in tycho url".to_string(),
676                            )
677                        })?,
678                    )
679                    .header(
680                        USER_AGENT,
681                        format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
682                    );
683
684                // Add Authorization if one is given
685                if let Some(ref key) = this.auth_key {
686                    request_builder = request_builder.header(AUTHORIZATION, key);
687                }
688
689                let request = request_builder.body(()).map_err(|e| {
690                    DeltasError::TransportError(format!("Failed to build connection request: {e}"))
691                })?;
692                let (conn, _) = match connect_async(request).await {
693                    Ok(conn) => conn,
694                    Err(e) => {
695                        // Prepare for reconnection
696                        retry_count += 1;
697                        let mut guard = this.inner.as_ref().lock().await;
698                        *guard = None;
699
700                        warn!(
701                            e = e.to_string(),
702                            "Failed to connect to WebSocket server; Reconnecting"
703                        );
704                        sleep(Duration::from_millis(500)).await;
705
706                        continue 'retry;
707                    }
708                };
709
710                let (ws_tx_new, ws_rx_new) = conn.split();
711                {
712                    let mut guard = this.inner.as_ref().lock().await;
713                    *guard =
714                        Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
715                }
716                let mut msg_rx = ws_rx_new.boxed();
717
718                info!("Connection Successful: TychoWebsocketClient started");
719                this.conn_notify.notify_waiters();
720                result = Ok(());
721
722                loop {
723                    let res = tokio::select! {
724                        msg = msg_rx.next() => match msg {
725                            Some(msg) => this.handle_msg(msg).await,
726                            None => { break 'retry } // ws connection silently closed
727                        },
728                        _ = cmd_rx.recv() => {break 'retry},
729                    };
730                    if let Err(error) = res {
731                        if matches!(
732                            error,
733                            DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
734                        ) {
735                            // Prepare for reconnection
736                            retry_count += 1;
737                            let mut guard = this.inner.as_ref().lock().await;
738                            *guard = None;
739
740                            warn!(
741                                ?error,
742                                ?retry_count,
743                                "Connection dropped unexpectedly; Reconnecting..."
744                            );
745                            break;
746                        } else {
747                            // Other errors are considered fatal
748                            error!(?error, "Fatal error; Exiting");
749                            result = Err(error);
750                            break 'retry;
751                        }
752                    }
753                }
754            }
755
756            // Clean up before exiting
757            let mut guard = this.inner.as_ref().lock().await;
758            *guard = None;
759
760            // Check if max retries has been reached.
761            if retry_count >= this.max_reconnects {
762                error!("Max reconnection attempts reached; Exiting");
763                this.conn_notify.notify_waiters(); // Notify that the task is done
764                result = Err(DeltasError::ConnectionClosed);
765            }
766
767            result
768        });
769
770        self.conn_notify.notified().await;
771
772        if self.is_connected().await {
773            Ok(jh)
774        } else {
775            Err(DeltasError::NotConnected)
776        }
777    }
778
779    #[instrument(skip(self))]
780    async fn close(&self) -> Result<(), DeltasError> {
781        info!("Closing TychoWebsocketClient");
782        let mut guard = self.inner.lock().await;
783        let inner = guard
784            .as_mut()
785            .ok_or_else(|| DeltasError::NotConnected)?;
786        inner
787            .cmd_tx
788            .send(())
789            .await
790            .map_err(|e| DeltasError::TransportError(e.to_string()))?;
791        Ok(())
792    }
793}
794
795#[cfg(test)]
796mod tests {
797    use std::net::SocketAddr;
798
799    use tokio::{net::TcpListener, time::timeout};
800    use tycho_common::dto::Chain;
801
802    use super::*;
803
    /// One scripted step in the conversation the mock websocket server has with a client.
    #[derive(Clone)]
    enum ExpectedComm {
        /// The mock server waits for the next client message, giving up after the given
        /// number of milliseconds, and asserts that it equals the given message.
        Receive(u64, tungstenite::protocol::Message),
        /// The mock server sends the given message to the client.
        Send(tungstenite::protocol::Message),
    }
809
810    async fn mock_tycho_ws(
811        messages: &[ExpectedComm],
812        reconnects: usize,
813    ) -> (SocketAddr, JoinHandle<()>) {
814        info!("Starting mock webserver");
815        // zero port here means the OS chooses an open port
816        let server = TcpListener::bind("127.0.0.1:0")
817            .await
818            .expect("localhost bind failed");
819        let addr = server.local_addr().unwrap();
820        let messages = messages.to_vec();
821
822        let jh = tokio::spawn(async move {
823            info!("mock webserver started");
824            for _ in 0..(reconnects + 1) {
825                if let Ok((stream, _)) = server.accept().await {
826                    let mut websocket = tokio_tungstenite::accept_async(stream)
827                        .await
828                        .unwrap();
829
830                    info!("Handling messages..");
831                    for c in messages.iter().cloned() {
832                        match c {
833                            ExpectedComm::Receive(t, exp) => {
834                                info!("Awaiting message...");
835                                let msg = timeout(Duration::from_millis(t), websocket.next())
836                                    .await
837                                    .expect("Receive timeout")
838                                    .expect("Stream exhausted")
839                                    .expect("Failed to receive message.");
840                                info!("Message received");
841                                assert_eq!(msg, exp)
842                            }
843                            ExpectedComm::Send(data) => {
844                                info!("Sending message");
845                                websocket
846                                    .send(data)
847                                    .await
848                                    .expect("Failed to send message");
849                                info!("Message sent");
850                            }
851                        };
852                    }
853                    sleep(Duration::from_millis(100)).await;
854                    // Close the WebSocket connection
855                    let _ = websocket.close(None).await;
856                }
857            }
858        });
859        (addr, jh)
860    }
861
862    #[tokio::test]
863    async fn test_subscribe_receive() {
864        let exp_comm = [
865            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
866                {
867                    "method":"subscribe",
868                    "extractor_id":{
869                        "chain":"ethereum",
870                        "name":"vm:ambient"
871                    },
872                    "include_state": true
873                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
874            )),
875            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
876                {
877                    "method":"newsubscription",
878                    "extractor_id":{
879                    "chain":"ethereum",
880                    "name":"vm:ambient"
881                    },
882                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
883                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
884            )),
885            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
886                {
887                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
888                    "deltas": {
889                        "extractor": "vm:ambient",
890                        "chain": "ethereum",
891                        "block": {
892                            "number": 123,
893                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
894                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
895                            "chain": "ethereum",             
896                            "ts": "2023-09-14T00:00:00"
897                        },
898                        "finalized_block_height": 0,
899                        "revert": false,
900                        "new_tokens": {},
901                        "account_updates": {
902                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
903                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
904                                "chain": "ethereum",
905                                "slots": {},
906                                "balance": "0x01f4",
907                                "code": "",
908                                "change": "Update"
909                            }
910                        },
911                        "state_updates": {
912                            "component_1": {
913                                "component_id": "component_1",
914                                "updated_attributes": {"attr1": "0x01"},
915                                "deleted_attributes": ["attr2"]
916                            }
917                        },
918                        "new_protocol_components": 
919                            { "protocol_1": {
920                                    "id": "protocol_1",
921                                    "protocol_system": "system_1",
922                                    "protocol_type_name": "type_1",
923                                    "chain": "ethereum",
924                                    "tokens": ["0x01", "0x02"],
925                                    "contract_ids": ["0x01", "0x02"],
926                                    "static_attributes": {"attr1": "0x01f4"},
927                                    "change": "Update",
928                                    "creation_tx": "0x01",
929                                    "created_at": "2023-09-14T00:00:00"
930                                }
931                            },
932                        "deleted_protocol_components": {},
933                        "component_balances": {
934                            "protocol_1":
935                                {
936                                    "0x01": {
937                                        "token": "0x01",
938                                        "balance": "0x01f4",
939                                        "balance_float": 0.0,
940                                        "modify_tx": "0x01",
941                                        "component_id": "protocol_1"
942                                    }
943                                }
944                        },
945                        "account_balances": {
946                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
947                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
948                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
949                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
950                                    "balance": "0x01f4",
951                                    "modify_tx": "0x01"
952                                }
953                            }
954                        },
955                        "component_tvl": {
956                            "protocol_1": 1000.0
957                        },
958                        "dci_update": {
959                            "new_entrypoints": {},
960                            "new_entrypoint_params": {},
961                            "trace_results": {}
962                        }
963                    }
964                }
965                "#.to_owned()
966            ))
967        ];
968        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
969
970        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
971        let jh = client
972            .connect()
973            .await
974            .expect("connect failed");
975        let (_, mut rx) = timeout(
976            Duration::from_millis(100),
977            client.subscribe(
978                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
979                SubscriptionOptions::new(),
980            ),
981        )
982        .await
983        .expect("subscription timed out")
984        .expect("subscription failed");
985        let _ = timeout(Duration::from_millis(100), rx.recv())
986            .await
987            .expect("awaiting message timeout out")
988            .expect("receiving message failed");
989        timeout(Duration::from_millis(100), client.close())
990            .await
991            .expect("close timed out")
992            .expect("close failed");
993        jh.await
994            .expect("ws loop errored")
995            .unwrap();
996        server_thread.await.unwrap();
997    }
998
999    #[tokio::test]
1000    async fn test_unsubscribe() {
1001        let exp_comm = [
1002            ExpectedComm::Receive(
1003                100,
1004                tungstenite::protocol::Message::Text(
1005                    r#"
1006                {
1007                    "method": "subscribe",
1008                    "extractor_id":{
1009                        "chain": "ethereum",
1010                        "name": "vm:ambient"
1011                    },
1012                    "include_state": true
1013                }"#
1014                    .to_owned()
1015                    .replace(|c: char| c.is_whitespace(), ""),
1016                ),
1017            ),
1018            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1019                r#"
1020                {
1021                    "method": "newsubscription",
1022                    "extractor_id":{
1023                        "chain": "ethereum",
1024                        "name": "vm:ambient"
1025                    },
1026                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1027                }"#
1028                .to_owned()
1029                .replace(|c: char| c.is_whitespace(), ""),
1030            )),
1031            ExpectedComm::Receive(
1032                100,
1033                tungstenite::protocol::Message::Text(
1034                    r#"
1035                {
1036                    "method": "unsubscribe",
1037                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1038                }
1039                "#
1040                    .to_owned()
1041                    .replace(|c: char| c.is_whitespace(), ""),
1042                ),
1043            ),
1044            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1045                r#"
1046                {
1047                    "method": "subscriptionended",
1048                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1049                }
1050                "#
1051                .to_owned()
1052                .replace(|c: char| c.is_whitespace(), ""),
1053            )),
1054        ];
1055        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1056
1057        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1058        let jh = client
1059            .connect()
1060            .await
1061            .expect("connect failed");
1062        let (sub_id, mut rx) = timeout(
1063            Duration::from_millis(100),
1064            client.subscribe(
1065                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1066                SubscriptionOptions::new(),
1067            ),
1068        )
1069        .await
1070        .expect("subscription timed out")
1071        .expect("subscription failed");
1072
1073        timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1074            .await
1075            .expect("unsubscribe timed out")
1076            .expect("unsubscribe failed");
1077        let res = timeout(Duration::from_millis(100), rx.recv())
1078            .await
1079            .expect("awaiting message timeout out");
1080
1081        // If the subscription ended, the channel should have been closed.
1082        assert!(res.is_none());
1083
1084        timeout(Duration::from_millis(100), client.close())
1085            .await
1086            .expect("close timed out")
1087            .expect("close failed");
1088        jh.await
1089            .expect("ws loop errored")
1090            .unwrap();
1091        server_thread.await.unwrap();
1092    }
1093
1094    #[tokio::test]
1095    async fn test_subscription_unexpected_end() {
1096        let exp_comm = [
1097            ExpectedComm::Receive(
1098                100,
1099                tungstenite::protocol::Message::Text(
1100                    r#"
1101                {
1102                    "method":"subscribe",
1103                    "extractor_id":{
1104                        "chain":"ethereum",
1105                        "name":"vm:ambient"
1106                    },
1107                    "include_state": true
1108                }"#
1109                    .to_owned()
1110                    .replace(|c: char| c.is_whitespace(), ""),
1111                ),
1112            ),
1113            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1114                r#"
1115                {
1116                    "method":"newsubscription",
1117                    "extractor_id":{
1118                        "chain":"ethereum",
1119                        "name":"vm:ambient"
1120                    },
1121                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1122                }"#
1123                .to_owned()
1124                .replace(|c: char| c.is_whitespace(), ""),
1125            )),
1126            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1127                r#"
1128                {
1129                    "method": "subscriptionended",
1130                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1131                }"#
1132                .to_owned()
1133                .replace(|c: char| c.is_whitespace(), ""),
1134            )),
1135        ];
1136        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1137
1138        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1139        let jh = client
1140            .connect()
1141            .await
1142            .expect("connect failed");
1143        let (_, mut rx) = timeout(
1144            Duration::from_millis(100),
1145            client.subscribe(
1146                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1147                SubscriptionOptions::new(),
1148            ),
1149        )
1150        .await
1151        .expect("subscription timed out")
1152        .expect("subscription failed");
1153        let res = timeout(Duration::from_millis(100), rx.recv())
1154            .await
1155            .expect("awaiting message timeout out");
1156
1157        // If the subscription ended, the channel should have been closed.
1158        assert!(res.is_none());
1159
1160        timeout(Duration::from_millis(100), client.close())
1161            .await
1162            .expect("close timed out")
1163            .expect("close failed");
1164        jh.await
1165            .expect("ws loop errored")
1166            .unwrap();
1167        server_thread.await.unwrap();
1168    }
1169
1170    #[test_log::test(tokio::test)]
1171    async fn test_reconnect() {
1172        let exp_comm = [
1173            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
1174                {
1175                    "method":"subscribe",
1176                    "extractor_id":{
1177                        "chain":"ethereum",
1178                        "name":"vm:ambient"
1179                    },
1180                    "include_state": true
1181                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1182            )),
1183            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1184                {
1185                    "method":"newsubscription",
1186                    "extractor_id":{
1187                    "chain":"ethereum",
1188                    "name":"vm:ambient"
1189                    },
1190                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1191                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1192            )),
1193            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1194                {
1195                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1196                    "deltas": {
1197                        "extractor": "vm:ambient",
1198                        "chain": "ethereum",
1199                        "block": {
1200                            "number": 123,
1201                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1202                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1203                            "chain": "ethereum",             
1204                            "ts": "2023-09-14T00:00:00"
1205                        },
1206                        "finalized_block_height": 0,
1207                        "revert": false,
1208                        "new_tokens": {},
1209                        "account_updates": {
1210                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1211                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1212                                "chain": "ethereum",
1213                                "slots": {},
1214                                "balance": "0x01f4",
1215                                "code": "",
1216                                "change": "Update"
1217                            }
1218                        },
1219                        "state_updates": {
1220                            "component_1": {
1221                                "component_id": "component_1",
1222                                "updated_attributes": {"attr1": "0x01"},
1223                                "deleted_attributes": ["attr2"]
1224                            }
1225                        },
1226                        "new_protocol_components": {
1227                            "protocol_1":
1228                                {
1229                                    "id": "protocol_1",
1230                                    "protocol_system": "system_1",
1231                                    "protocol_type_name": "type_1",
1232                                    "chain": "ethereum",
1233                                    "tokens": ["0x01", "0x02"],
1234                                    "contract_ids": ["0x01", "0x02"],
1235                                    "static_attributes": {"attr1": "0x01f4"},
1236                                    "change": "Update",
1237                                    "creation_tx": "0x01",
1238                                    "created_at": "2023-09-14T00:00:00"
1239                                }
1240                            },
1241                        "deleted_protocol_components": {},
1242                        "component_balances": {
1243                            "protocol_1": {
1244                                "0x01": {
1245                                    "token": "0x01",
1246                                    "balance": "0x01f4",
1247                                    "balance_float": 1000.0,
1248                                    "modify_tx": "0x01",
1249                                    "component_id": "protocol_1"
1250                                }
1251                            }
1252                        },
1253                        "account_balances": {
1254                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1255                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1256                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1257                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1258                                    "balance": "0x01f4",
1259                                    "modify_tx": "0x01"
1260                                }
1261                            }
1262                        },
1263                        "component_tvl": {
1264                            "protocol_1": 1000.0
1265                        },
1266                        "dci_update": {
1267                            "new_entrypoints": {},
1268                            "new_entrypoint_params": {},
1269                            "trace_results": {}
1270                        }
1271                    }
1272                }
1273                "#.to_owned()
1274            ))
1275        ];
1276        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1277        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1278        let jh: JoinHandle<Result<(), DeltasError>> = client
1279            .connect()
1280            .await
1281            .expect("connect failed");
1282
1283        for _ in 0..2 {
1284            let (_, mut rx) = timeout(
1285                Duration::from_millis(100),
1286                client.subscribe(
1287                    ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1288                    SubscriptionOptions::new(),
1289                ),
1290            )
1291            .await
1292            .expect("subscription timed out")
1293            .expect("subscription failed");
1294
1295            let _ = timeout(Duration::from_millis(100), rx.recv())
1296                .await
1297                .expect("awaiting message timeout out")
1298                .expect("receiving message failed");
1299
1300            // wait for the connection to drop
1301            let res = timeout(Duration::from_millis(200), rx.recv())
1302                .await
1303                .expect("awaiting closed connection timeout out");
1304            assert!(res.is_none());
1305        }
1306        let res = jh.await.expect("ws client join failed");
1307        // 5th client reconnect attempt should fail
1308        assert!(res.is_err());
1309        server_thread
1310            .await
1311            .expect("ws server loop errored");
1312    }
1313
1314    async fn mock_bad_connection_tycho_ws() -> (SocketAddr, JoinHandle<()>) {
1315        let server = TcpListener::bind("127.0.0.1:0")
1316            .await
1317            .expect("localhost bind failed");
1318        let addr = server.local_addr().unwrap();
1319        let jh = tokio::spawn(async move {
1320            while let Ok((stream, _)) = server.accept().await {
1321                // Immediately close the connection to simulate a failure
1322                drop(stream);
1323            }
1324        });
1325        (addr, jh)
1326    }
1327
1328    #[tokio::test]
1329    async fn test_connect_max_attempts() {
1330        let (addr, _) = mock_bad_connection_tycho_ws().await;
1331        let client = WsDeltasClient::new_with_reconnects(&format!("ws://{addr}"), 3, None).unwrap();
1332
1333        let join_handle = client.connect().await;
1334
1335        assert!(join_handle.is_err());
1336        assert_eq!(join_handle.unwrap_err().to_string(), DeltasError::NotConnected.to_string());
1337    }
1338}