tycho_client/
deltas.rs

1//! # Deltas Client
2//!
3//! This module focuses on implementing the Real-Time Deltas client for the Tycho Indexer service.
4//! Utilizing this client facilitates efficient, instant communication with the indexing service,
5//! promoting seamless data synchronization.
6//!
7//! ## Websocket Implementation
8//!
9//! The present WebSocket implementation is cloneable, which enables it to be shared
10//! across multiple asynchronous tasks without creating separate instances for each task. This
//! design choice improves efficiency because it:
12//!
13//! - **Reduces Server Load:** By maintaining a single universal client, the load on the server is
14//!   significantly reduced. This is because fewer connections are made to the server, preventing it
15//!   from getting overwhelmed by numerous simultaneous requests.
16//! - **Conserves Resource Usage:** A single shared client requires fewer system resources than if
17//!   multiple clients were instantiated and used separately as there is some overhead for websocket
//!   handshakes and message handling.
19//!
20//! Therefore, sharing one client among multiple tasks ensures optimal performance, reduces resource
21//! consumption, and enhances overall software scalability.
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::{
25        atomic::{AtomicBool, Ordering},
26        Arc,
27    },
28    time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34    header::{
35        AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36        USER_AGENT,
37    },
38    Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44    net::TcpStream,
45    sync::{
46        mpsc::{self, error::TrySendError, Receiver, Sender},
47        oneshot, Mutex, Notify,
48    },
49    task::JoinHandle,
50    time::sleep,
51};
52use tokio_tungstenite::{
53    connect_async,
54    tungstenite::{
55        self,
56        handshake::client::{generate_key, Request},
57    },
58    MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage};
62use uuid::Uuid;
63
64use crate::TYCHO_SERVER_VERSION;
65
/// Errors produced by the deltas client.
#[derive(Error, Debug)]
pub enum DeltasError {
    /// Failed to parse the provided URI.
    ///
    /// Holds the offending URI and a description of what is wrong with it.
    #[error("Failed to parse URI: {0}. Error: {1}")]
    UriParsing(String, String),

    /// The requested subscription is already pending and is awaiting confirmation from the server.
    #[error("The requested subscription is already pending")]
    SubscriptionAlreadyPending,

    /// A message failed to send via an internal channel or through the websocket channel.
    /// This is typically a fatal error and might indicate a bug in the implementation.
    #[error("{0}")]
    TransportError(String),

    /// The internal message buffer is full. This likely means that messages are not being consumed
    /// fast enough. If the incoming load emits messages in bursts, consider increasing the buffer
    /// size.
    #[error("The buffer is full!")]
    BufferFull,

    /// The client has no active connections but was accessed (e.g., by calling subscribe).
    /// This typically occurs when trying to use the client before calling connect() or
    /// after the connection has been closed.
    #[error("The client is not connected!")]
    NotConnected,

    /// The connect method was called while the client already had an active connection.
    #[error("The client is already connected!")]
    AlreadyConnected,

    /// The connection was closed orderly by the server, e.g. because it restarted.
    ///
    /// The connection task also reports this once it has exhausted its reconnection attempts.
    #[error("The server closed the connection!")]
    ConnectionClosed,

    /// The connection was closed unexpectedly by the server or encountered a network error.
    ///
    /// Like `ConnectionClosed`, this error makes the connection task attempt to reconnect.
    #[error("Connection error: {0}")]
    ConnectionError(#[from] Box<tungstenite::Error>),

    /// A fatal error occurred that cannot be recovered from.
    ///
    /// Used for unexpected websocket errors and for inconsistencies in the client's internal
    /// state.
    #[error("Tycho FatalError: {0}")]
    Fatal(String),
}
109
/// Options controlling what a subscription delivers.
///
/// Start from [`SubscriptionOptions::new`] (equivalent to `Default`) and refine it with the
/// builder methods.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SubscriptionOptions {
    /// Forwarded as `include_state` in the subscribe command sent to the server.
    include_state: bool,
}

impl Default for SubscriptionOptions {
    /// Returns options with `include_state` enabled.
    fn default() -> Self {
        Self { include_state: true }
    }
}

impl SubscriptionOptions {
    /// Creates options with the default settings (`include_state` enabled).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets whether the subscription requests state (`include_state`) from the server.
    ///
    /// Consumes and returns `self` so calls can be chained; the result must be used.
    #[must_use]
    pub fn with_state(mut self, val: bool) -> Self {
        self.include_state = val;
        self
    }
}
130
/// Client interface for real-time block deltas from the Tycho indexer.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait DeltasClient {
    /// Subscribe to an extractor and receive realtime messages.
    ///
    /// Will request a subscription from tycho and wait for confirmation of it. If the caller
    /// cancels while waiting for confirmation the subscription may still be registered. If the
    /// receiver was deallocated though, the first message from the subscription will remove it
    /// again - since there is no one to inform about these messages.
    ///
    /// On success returns the subscription id together with the receiver for its messages.
    async fn subscribe(
        &self,
        extractor_id: ExtractorIdentity,
        options: SubscriptionOptions,
    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;

    /// Unsubscribe from a subscription.
    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;

    /// Start the client's message handling loop.
    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;

    /// Close the client's message handling loop.
    async fn close(&self) -> Result<(), DeltasError>;
}
155
/// Websocket-based implementation of [`DeltasClient`].
///
/// Clones share the connection state (`inner`), the connection notifier and the `dead` flag
/// through `Arc`s, so one connection can serve many tasks.
#[derive(Clone)]
pub struct WsDeltasClient {
    /// The tycho indexer websocket uri.
    uri: Uri,
    /// Authorization key for the websocket connection.
    auth_key: Option<String>,
    /// Maximum amount of reconnects to try before giving up.
    max_reconnects: u32,
    /// Capacity of the internal command channel created by `connect`.
    ///
    /// NOTE(review): this used to be described as the number of incoming websocket messages
    /// buffered before dropping them; the visible code only uses it as the command channel's
    /// capacity — confirm the intended meaning.
    ws_buffer_size: usize,
    /// The client will buffer that many messages for each subscription before it starts dropping
    /// them.
    subscription_buffer_size: usize,
    /// Notifies tasks waiting for a connection to be established; also signalled when the
    /// client gives up after exhausting its reconnection attempts.
    conn_notify: Arc<Notify>,
    /// Shared client instance state; `None` while no connection is established.
    inner: Arc<Mutex<Option<Inner>>>,
    /// If set, the client has exhausted its reconnection attempts.
    dead: Arc<AtomicBool>,
}
177
/// Write half of the websocket connection, obtained by splitting the stream in `connect`.
type WebSocketSink =
    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
180
/// Subscription State
///
/// Subscriptions go through a lifecycle:
///
/// ```text
/// O ---> requested subscribe ----> active ----> requested unsub ---> ended
/// ```
///
/// We use oneshot channels to inform the client struct when these transitions happen, e.g.
/// because for `subscribe` to finish, we want the state to have transitioned to `active`, and
/// similarly for `unsubscribe`.
#[derive(Debug)]
enum SubscriptionInfo {
    /// Subscription was requested; we wait for server confirmation and uuid assignment.
    RequestedSubscription(oneshot::Sender<(Uuid, Receiver<BlockChanges>)>),
    /// Subscription is active.
    Active,
    /// Unsubscription was requested; we wait for server confirmation.
    RequestedUnsubscription(oneshot::Sender<()>),
}
201
/// Internal per-connection state shared between all `WsDeltasClient` clones.
///
/// A fresh `Inner` is created each time `connect` establishes a websocket connection.
struct Inner {
    /// Websocket sender handle.
    sink: WebSocketSink,
    /// Command channel sender handle.
    ///
    /// The connection loop in `connect` leaves its retry loop as soon as the matching receiver
    /// yields.
    cmd_tx: Sender<()>,
    /// Subscriptions still awaiting server confirmation, keyed by extractor.
    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
    /// Confirmed subscriptions (active or awaiting unsubscription), keyed by subscription id.
    subscriptions: HashMap<Uuid, SubscriptionInfo>,
    /// For each subscription we keep a sender handle; the receiver is returned to the caller of
    /// subscribe.
    sender: HashMap<Uuid, Sender<BlockChanges>>,
    /// How many messages to buffer per subscription before starting to drop new messages.
    buffer_size: usize,
}
218
219/// Shared state between all client instances.
220///
221/// This state is behind a mutex and requires synchronization to be read of modified.
222impl Inner {
223    fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
224        Self {
225            sink,
226            cmd_tx,
227            pending: HashMap::new(),
228            subscriptions: HashMap::new(),
229            sender: HashMap::new(),
230            buffer_size,
231        }
232    }
233
234    /// Registers a new pending subscription.
235    #[allow(clippy::result_large_err)]
236    fn new_subscription(
237        &mut self,
238        id: &ExtractorIdentity,
239        ready_tx: oneshot::Sender<(Uuid, Receiver<BlockChanges>)>,
240    ) -> Result<(), DeltasError> {
241        if self.pending.contains_key(id) {
242            return Err(DeltasError::SubscriptionAlreadyPending);
243        }
244        self.pending
245            .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
246        Ok(())
247    }
248
249    /// Transitions a pending subscription to active.
250    ///
251    /// Will ignore any request to do so for subscriptions that are not pending.
252    fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
253        if let Some(info) = self.pending.remove(extractor_id) {
254            if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
255                let (tx, rx) = mpsc::channel(self.buffer_size);
256                self.sender.insert(subscription_id, tx);
257                self.subscriptions
258                    .insert(subscription_id, SubscriptionInfo::Active);
259                let _ = ready_tx
260                    .send((subscription_id, rx))
261                    .map_err(|_| {
262                        warn!(
263                            ?extractor_id,
264                            ?subscription_id,
265                            "Subscriber for has gone away. Ignoring."
266                        )
267                    });
268            } else {
269                error!(
270                    ?extractor_id,
271                    ?subscription_id,
272                    "Pending subscription was not in the correct state to 
273                    transition to active. Ignoring!"
274                )
275            }
276        } else {
277            error!(
278                ?extractor_id,
279                ?subscription_id,
280                "Tried to mark an unkown subscription as active. Ignoring!"
281            );
282        }
283    }
284
285    /// Sends a message to a subscription's receiver.
286    #[allow(clippy::result_large_err)]
287    fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
288        if let Some(sender) = self.sender.get_mut(id) {
289            sender
290                .try_send(msg)
291                .map_err(|e| match e {
292                    TrySendError::Full(_) => DeltasError::BufferFull,
293                    TrySendError::Closed(_) => {
294                        DeltasError::TransportError("The subscriber has gone away".to_string())
295                    }
296                })?;
297        }
298        Ok(())
299    }
300
301    /// Requests a subscription to end.
302    ///
303    /// The subscription needs to exist and be active for this to have any effect. Wll use
304    /// `ready_tx` to notify the receiver once the transition to ended completed.
305    fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
306        if let Some(info) = self
307            .subscriptions
308            .get_mut(subscription_id)
309        {
310            if let SubscriptionInfo::Active = info {
311                *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
312            }
313        } else {
314            // no big deal imo so only debug lvl..
315            debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
316        }
317    }
318
319    /// Removes and fully ends a subscription
320    ///
321    /// Any calls for non-existing subscriptions will be simply ignored. May panic on internal state
322    /// inconsistencies: e.g. if the subscription exists but there is no sender for it.
323    /// Will remove a subscription even it was in active or pending state before, this is to support
324    /// any server side failure of the subscription.
325    fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
326        if let Entry::Occupied(e) = self
327            .subscriptions
328            .entry(subscription_id)
329        {
330            let info = e.remove();
331            if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
332                let _ = tx.send(()).map_err(|_| {
333                    warn!(?subscription_id, "failed to notify about removed subscription")
334                });
335                self.sender
336                    .remove(&subscription_id)
337                    .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
338            } else {
339                warn!(?subscription_id, "Subscription ended unexpectedly!");
340                self.sender
341                    .remove(&subscription_id)
342                    .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
343            }
344        } else {
345            error!(
346                ?subscription_id,
347                "Received `SubscriptionEnded`, but was never subscribed 
348                to it. This is likely a bug!"
349            );
350        }
351
352        Ok(())
353    }
354
355    /// Sends a message through the websocket.
356    async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
357        self.sink.send(msg).await.map_err(|e| {
358            DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
359        })
360    }
361}
362
363/// Tycho client websocket implementation.
364impl WsDeltasClient {
365    // Construct a new client with 5 reconnection attempts.
366    #[allow(clippy::result_large_err)]
367    pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
368        let uri = ws_uri
369            .parse::<Uri>()
370            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
371        Ok(Self {
372            uri,
373            auth_key: auth_key.map(|s| s.to_string()),
374            inner: Arc::new(Mutex::new(None)),
375            ws_buffer_size: 128,
376            subscription_buffer_size: 128,
377            conn_notify: Arc::new(Notify::new()),
378            max_reconnects: 5,
379            dead: Arc::new(AtomicBool::new(false)),
380        })
381    }
382
383    // Construct a new client with a custom number of reconnection attempts.
384    #[allow(clippy::result_large_err)]
385    pub fn new_with_reconnects(
386        ws_uri: &str,
387        max_reconnects: u32,
388        auth_key: Option<&str>,
389    ) -> Result<Self, DeltasError> {
390        let uri = ws_uri
391            .parse::<Uri>()
392            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
393
394        Ok(Self {
395            uri,
396            auth_key: auth_key.map(|s| s.to_string()),
397            inner: Arc::new(Mutex::new(None)),
398            ws_buffer_size: 128,
399            subscription_buffer_size: 128,
400            conn_notify: Arc::new(Notify::new()),
401            max_reconnects,
402            dead: Arc::new(AtomicBool::new(false)),
403        })
404    }
405
406    /// Ensures that the client is connected.
407    ///
408    /// This method will acquire the lock for inner.
409    async fn is_connected(&self) -> bool {
410        let guard = self.inner.as_ref().lock().await;
411        guard.is_some()
412    }
413
414    /// Waits for the client to be connected
415    ///
416    /// This method acquires the lock for inner for a short period, then waits until the  
417    /// connection is established if not already connected.
418    async fn ensure_connection(&self) -> Result<(), DeltasError> {
419        if self.dead.load(Ordering::SeqCst) {
420            return Err(DeltasError::NotConnected)
421        };
422        if !self.is_connected().await {
423            self.conn_notify.notified().await;
424        };
425        Ok(())
426    }
427
428    /// Main message handling logic
429    ///
430    /// If the message returns an error, a reconnect attempt may be considered depending on the
431    /// error type.
432    #[instrument(skip(self, msg))]
433    async fn handle_msg(
434        &self,
435        msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
436    ) -> Result<(), DeltasError> {
437        let mut guard = self.inner.lock().await;
438
439        match msg {
440            // We do not deserialize the message directly into a WebSocketMessage. This is because
441            // the serde arbitrary_precision feature (often included in many
442            // dependencies we use) breaks some untagged enum deserializations. Instead,
443            // we deserialize the message into a serde_json::Value and convert that into a WebSocketMessage. For more info on this issue, see: https://github.com/serde-rs/json/issues/740
444            Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
445                serde_json::Value,
446            >(&text)
447            {
448                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
449                    Ok(ws_message) => match ws_message {
450                        WebSocketMessage::BlockChanges { subscription_id, deltas } => {
451                            trace!(?deltas, "Received a block state change, sending to channel");
452                            let inner = guard
453                                .as_mut()
454                                .ok_or_else(|| DeltasError::NotConnected)?;
455                            match inner.send(&subscription_id, deltas) {
456                                Err(DeltasError::BufferFull) => {
457                                    error!(?subscription_id, "Buffer full, message dropped!");
458                                }
459                                Err(_) => {
460                                    warn!(
461                                        ?subscription_id,
462                                        "Receiver for has gone away, unsubscribing!"
463                                    );
464                                    let (tx, rx) = oneshot::channel();
465                                    if let Err(e) = WsDeltasClient::unsubscribe_inner(
466                                        inner,
467                                        subscription_id,
468                                        tx,
469                                    )
470                                    .await
471                                    {
472                                        warn!(
473                                            ?e,
474                                            ?subscription_id,
475                                            "Failed to send unsubscribe command"
476                                        );
477                                    } else {
478                                        // Wait for unsubscribe completion with timeout
479                                        match tokio::time::timeout(Duration::from_secs(5), rx).await
480                                        {
481                                            Ok(_) => {
482                                                debug!(
483                                                    ?subscription_id,
484                                                    "Unsubscribe completed successfully"
485                                                );
486                                            }
487                                            Err(_) => {
488                                                warn!(
489                                                    ?subscription_id,
490                                                    "Unsubscribe completion timed out"
491                                                );
492                                            }
493                                        }
494                                    }
495                                }
496                                _ => { /* Do nothing */ }
497                            }
498                        }
499                        WebSocketMessage::Response(Response::NewSubscription {
500                            extractor_id,
501                            subscription_id,
502                        }) => {
503                            info!(?extractor_id, ?subscription_id, "Received a new subscription");
504                            let inner = guard
505                                .as_mut()
506                                .ok_or_else(|| DeltasError::NotConnected)?;
507                            inner.mark_active(&extractor_id, subscription_id);
508                        }
509                        WebSocketMessage::Response(Response::SubscriptionEnded {
510                            subscription_id,
511                        }) => {
512                            info!(?subscription_id, "Received a subscription ended");
513                            let inner = guard
514                                .as_mut()
515                                .ok_or_else(|| DeltasError::NotConnected)?;
516                            inner.remove_subscription(subscription_id)?;
517                        }
518                    },
519                    Err(e) => {
520                        error!(
521                            "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
522                            e, text
523                        );
524                    }
525                },
526                Err(e) => {
527                    error!(
528                        "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
529                        e, text
530                    );
531                }
532            },
533            Ok(tungstenite::protocol::Message::Ping(_)) => {
534                // Respond to pings with pongs.
535                let inner = guard
536                    .as_mut()
537                    .ok_or_else(|| DeltasError::NotConnected)?;
538                if let Err(error) = inner
539                    .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
540                    .await
541                {
542                    debug!(?error, "Failed to send pong!");
543                }
544            }
545            Ok(tungstenite::protocol::Message::Pong(_)) => {
546                // Do nothing.
547            }
548            Ok(tungstenite::protocol::Message::Close(_)) => {
549                return Err(DeltasError::ConnectionClosed);
550            }
551            Ok(unknown_msg) => {
552                info!("Received an unknown message type: {:?}", unknown_msg);
553            }
554            Err(error) => {
555                error!(?error, "Websocket error");
556                return Err(match error {
557                    tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
558                    tungstenite::Error::AlreadyClosed => {
559                        warn!("Received AlreadyClosed error which is indicative of a bug!");
560                        DeltasError::ConnectionError(Box::new(error))
561                    }
562                    tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
563                        DeltasError::ConnectionError(Box::new(error))
564                    }
565                    _ => DeltasError::Fatal(error.to_string()),
566                });
567            }
568        };
569        Ok(())
570    }
571
572    /// Helper method to force an unsubscription
573    ///
574    /// This method expects to receive a mutable reference to `Inner` so it does not acquire a
575    /// lock. Used for normal unsubscribes as well to remove any subscriptions with deallocated
576    /// receivers.
577    async fn unsubscribe_inner(
578        inner: &mut Inner,
579        subscription_id: Uuid,
580        ready_tx: oneshot::Sender<()>,
581    ) -> Result<(), DeltasError> {
582        inner.end_subscription(&subscription_id, ready_tx);
583        let cmd = Command::Unsubscribe { subscription_id };
584        inner
585            .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
586                |e| {
587                    DeltasError::TransportError(format!(
588                        "Failed to serialize unsubscribe command: {e}"
589                    ))
590                },
591            )?))
592            .await?;
593        Ok(())
594    }
595}
596
597#[async_trait]
598impl DeltasClient for WsDeltasClient {
599    #[instrument(skip(self))]
600    async fn subscribe(
601        &self,
602        extractor_id: ExtractorIdentity,
603        options: SubscriptionOptions,
604    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
605        trace!("Starting subscribe");
606        self.ensure_connection().await?;
607        let (ready_tx, ready_rx) = oneshot::channel();
608        {
609            let mut guard = self.inner.lock().await;
610            let inner = guard
611                .as_mut()
612                .ok_or_else(|| DeltasError::NotConnected)?;
613            trace!("Sending subscribe command");
614            inner.new_subscription(&extractor_id, ready_tx)?;
615            let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
616            inner
617                .ws_send(tungstenite::protocol::Message::Text(
618                    serde_json::to_string(&cmd).map_err(|e| {
619                        DeltasError::TransportError(format!(
620                            "Failed to serialize subscribe command: {e}"
621                        ))
622                    })?,
623                ))
624                .await?;
625        }
626        trace!("Waiting for subscription response");
627        let rx = ready_rx.await.map_err(|_| {
628            DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
629        })?;
630        trace!("Subscription successful");
631        Ok(rx)
632    }
633
634    #[instrument(skip(self))]
635    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
636        self.ensure_connection().await?;
637        let (ready_tx, ready_rx) = oneshot::channel();
638        {
639            let mut guard = self.inner.lock().await;
640            let inner = guard
641                .as_mut()
642                .ok_or_else(|| DeltasError::NotConnected)?;
643
644            WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
645        }
646        ready_rx.await.map_err(|_| {
647            DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
648        })?;
649
650        Ok(())
651    }
652
653    #[instrument(skip(self))]
654    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
655        if self.is_connected().await {
656            return Err(DeltasError::AlreadyConnected);
657        }
658        let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
659        info!(?ws_uri, "Starting TychoWebsocketClient");
660
661        let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
662        {
663            let mut guard = self.inner.as_ref().lock().await;
664            *guard = None;
665        }
666        let this = self.clone();
667        let jh = tokio::spawn(async move {
668            let mut retry_count = 0;
669            let mut result = Err(DeltasError::NotConnected);
670
671            'retry: while retry_count < this.max_reconnects {
672                info!(?ws_uri, retry_count, "Connecting to WebSocket server");
673
674                // Create a WebSocket request
675                let mut request_builder = Request::builder()
676                    .uri(&ws_uri)
677                    .header(SEC_WEBSOCKET_KEY, generate_key())
678                    .header(SEC_WEBSOCKET_VERSION, 13)
679                    .header(CONNECTION, "Upgrade")
680                    .header(UPGRADE, "websocket")
681                    .header(
682                        HOST,
683                        this.uri.host().ok_or_else(|| {
684                            DeltasError::UriParsing(
685                                ws_uri.clone(),
686                                "No host found in tycho url".to_string(),
687                            )
688                        })?,
689                    )
690                    .header(
691                        USER_AGENT,
692                        format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
693                    );
694
695                // Add Authorization if one is given
696                if let Some(ref key) = this.auth_key {
697                    request_builder = request_builder.header(AUTHORIZATION, key);
698                }
699
700                let request = request_builder.body(()).map_err(|e| {
701                    DeltasError::TransportError(format!("Failed to build connection request: {e}"))
702                })?;
703                let (conn, _) = match connect_async(request).await {
704                    Ok(conn) => conn,
705                    Err(e) => {
706                        // Prepare for reconnection
707                        retry_count += 1;
708                        let mut guard = this.inner.as_ref().lock().await;
709                        *guard = None;
710
711                        warn!(
712                            e = e.to_string(),
713                            "Failed to connect to WebSocket server; Reconnecting"
714                        );
715                        sleep(Duration::from_millis(500)).await;
716
717                        continue 'retry;
718                    }
719                };
720
721                let (ws_tx_new, ws_rx_new) = conn.split();
722                {
723                    let mut guard = this.inner.as_ref().lock().await;
724                    *guard =
725                        Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
726                }
727                let mut msg_rx = ws_rx_new.boxed();
728
729                info!("Connection Successful: TychoWebsocketClient started");
730                this.conn_notify.notify_waiters();
731                result = Ok(());
732
733                loop {
734                    let res = tokio::select! {
735                        msg = msg_rx.next() => match msg {
736                            Some(msg) => this.handle_msg(msg).await,
737                            None => { break 'retry } // ws connection silently closed
738                        },
739                        _ = cmd_rx.recv() => {break 'retry},
740                    };
741                    if let Err(error) = res {
742                        debug!(?error, "WsError");
743                        if matches!(
744                            error,
745                            DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
746                        ) {
747                            // Prepare for reconnection
748                            retry_count += 1;
749                            let mut guard = this.inner.as_ref().lock().await;
750                            *guard = None;
751
752                            warn!(
753                                ?error,
754                                ?retry_count,
755                                "Connection dropped unexpectedly; Reconnecting..."
756                            );
757                            break;
758                        } else {
759                            // Other errors are considered fatal
760                            error!(?error, "Fatal error; Exiting");
761                            result = Err(error);
762                            break 'retry;
763                        }
764                    }
765                }
766            }
767            debug!(
768                retry_count,
769                max_reconnects=?this.max_reconnects,
770                "Reconnection loop ended"
771            );
772            // Clean up before exiting
773            let mut guard = this.inner.as_ref().lock().await;
774            *guard = None;
775
776            // Check if max retries has been reached.
777            if retry_count >= this.max_reconnects {
778                error!("Max reconnection attempts reached; Exiting");
779                this.dead.store(true, Ordering::SeqCst);
780                this.conn_notify.notify_waiters(); // Notify that the task is done
781                result = Err(DeltasError::ConnectionClosed);
782            }
783
784            result
785        });
786
787        self.conn_notify.notified().await;
788
789        if self.is_connected().await {
790            Ok(jh)
791        } else {
792            Err(DeltasError::NotConnected)
793        }
794    }
795
796    #[instrument(skip(self))]
797    async fn close(&self) -> Result<(), DeltasError> {
798        info!("Closing TychoWebsocketClient");
799        let mut guard = self.inner.lock().await;
800        let inner = guard
801            .as_mut()
802            .ok_or_else(|| DeltasError::NotConnected)?;
803        inner
804            .cmd_tx
805            .send(())
806            .await
807            .map_err(|e| DeltasError::TransportError(e.to_string()))?;
808        Ok(())
809    }
810}
811
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use test_log::test;
    use tokio::{net::TcpListener, time::timeout};
    use tycho_common::dto::Chain;

    use super::*;

    /// One scripted step of the mock websocket server.
    #[derive(Clone)]
    enum ExpectedComm {
        /// Wait up to the given number of milliseconds for the next client message and assert
        /// that it equals the provided message.
        Receive(u64, tungstenite::protocol::Message),
        /// Send the provided message to the client.
        Send(tungstenite::protocol::Message),
    }

    /// Spawns a mock Tycho websocket server bound to an OS-assigned localhost port.
    ///
    /// The server accepts up to `reconnects + 1` connections, one after another. A failed TCP
    /// accept is skipped and still counts towards that limit. On each connection it replays
    /// `messages` in order, waits 100ms, and then closes the websocket.
    ///
    /// Any mismatch, timeout or transport failure panics inside the server task. That panic
    /// surfaces when the returned handle is awaited. Once all connections have been served the
    /// task returns and the listener is dropped, so further connection attempts are refused.
    ///
    /// Returns the bound address and the server task's handle.
    async fn mock_tycho_ws(
        messages: &[ExpectedComm],
        reconnects: usize,
    ) -> (SocketAddr, JoinHandle<()>) {
        info!("Starting mock webserver");
        // zero port here means the OS chooses an open port
        let server = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("localhost bind failed");
        let addr = server.local_addr().unwrap();
        let messages = messages.to_vec();

        let jh = tokio::spawn(async move {
            info!("mock webserver started");
            for _ in 0..(reconnects + 1) {
                if let Ok((stream, _)) = server.accept().await {
                    let mut websocket = tokio_tungstenite::accept_async(stream)
                        .await
                        .unwrap();

                    info!("Handling messages..");
                    for c in messages.iter().cloned() {
                        match c {
                            ExpectedComm::Receive(t, exp) => {
                                info!("Awaiting message...");
                                let msg = timeout(Duration::from_millis(t), websocket.next())
                                    .await
                                    .expect("Receive timeout")
                                    .expect("Stream exhausted")
                                    .expect("Failed to receive message.");
                                info!("Message received");
                                assert_eq!(msg, exp)
                            }
                            ExpectedComm::Send(data) => {
                                info!("Sending message");
                                websocket
                                    .send(data)
                                    .await
                                    .expect("Failed to send message");
                                info!("Message sent");
                            }
                        };
                    }
                    sleep(Duration::from_millis(100)).await;
                    // Close the WebSocket connection
                    let _ = websocket.close(None).await;
                }
            }
        });
        (addr, jh)
    }

    /// Subscribing yields a subscription whose channel receives the delta pushed by the server.
    #[test(tokio::test)]
    async fn test_subscribe_receive() {
        let exp_comm = [
            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
                {
                    "method":"subscribe",
                    "extractor_id":{
                        "chain":"ethereum",
                        "name":"vm:ambient"
                    },
                    "include_state": true
                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
                {
                    "method":"newsubscription",
                    "extractor_id":{
                    "chain":"ethereum",
                    "name":"vm:ambient"
                    },
                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
                {
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
                    "deltas": {
                        "extractor": "vm:ambient",
                        "chain": "ethereum",
                        "block": {
                            "number": 123,
                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "chain": "ethereum",             
                            "ts": "2023-09-14T00:00:00"
                        },
                        "finalized_block_height": 0,
                        "revert": false,
                        "new_tokens": {},
                        "account_updates": {
                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                "chain": "ethereum",
                                "slots": {},
                                "balance": "0x01f4",
                                "code": "",
                                "change": "Update"
                            }
                        },
                        "state_updates": {
                            "component_1": {
                                "component_id": "component_1",
                                "updated_attributes": {"attr1": "0x01"},
                                "deleted_attributes": ["attr2"]
                            }
                        },
                        "new_protocol_components": 
                            { "protocol_1": {
                                    "id": "protocol_1",
                                    "protocol_system": "system_1",
                                    "protocol_type_name": "type_1",
                                    "chain": "ethereum",
                                    "tokens": ["0x01", "0x02"],
                                    "contract_ids": ["0x01", "0x02"],
                                    "static_attributes": {"attr1": "0x01f4"},
                                    "change": "Update",
                                    "creation_tx": "0x01",
                                    "created_at": "2023-09-14T00:00:00"
                                }
                            },
                        "deleted_protocol_components": {},
                        "component_balances": {
                            "protocol_1":
                                {
                                    "0x01": {
                                        "token": "0x01",
                                        "balance": "0x01f4",
                                        "balance_float": 0.0,
                                        "modify_tx": "0x01",
                                        "component_id": "protocol_1"
                                    }
                                }
                        },
                        "account_balances": {
                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                    "balance": "0x01f4",
                                    "modify_tx": "0x01"
                                }
                            }
                        },
                        "component_tvl": {
                            "protocol_1": 1000.0
                        },
                        "dci_update": {
                            "new_entrypoints": {},
                            "new_entrypoint_params": {},
                            "trace_results": {}
                        }
                    }
                }
                "#.to_owned()
            ))
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (_, mut rx) = timeout(
            Duration::from_millis(100),
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed");
        let _ = timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("awaiting message timed out")
            .expect("receiving message failed");
        timeout(Duration::from_millis(100), client.close())
            .await
            .expect("close timed out")
            .expect("close failed");
        jh.await
            .expect("ws loop errored")
            .unwrap();
        server_thread.await.unwrap();
    }

    /// Unsubscribing sends an `unsubscribe` request. Once the server confirms with
    /// `subscriptionended`, the subscription channel is closed.
    #[test(tokio::test)]
    async fn test_unsubscribe() {
        let exp_comm = [
            ExpectedComm::Receive(
                100,
                tungstenite::protocol::Message::Text(
                    r#"
                {
                    "method": "subscribe",
                    "extractor_id":{
                        "chain": "ethereum",
                        "name": "vm:ambient"
                    },
                    "include_state": true
                }"#
                    .to_owned()
                    .replace(|c: char| c.is_whitespace(), ""),
                ),
            ),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "method": "newsubscription",
                    "extractor_id":{
                        "chain": "ethereum",
                        "name": "vm:ambient"
                    },
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#
                .to_owned()
                .replace(|c: char| c.is_whitespace(), ""),
            )),
            ExpectedComm::Receive(
                100,
                tungstenite::protocol::Message::Text(
                    r#"
                {
                    "method": "unsubscribe",
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }
                "#
                    .to_owned()
                    .replace(|c: char| c.is_whitespace(), ""),
                ),
            ),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "method": "subscriptionended",
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }
                "#
                .to_owned()
                .replace(|c: char| c.is_whitespace(), ""),
            )),
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (sub_id, mut rx) = timeout(
            Duration::from_millis(100),
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed");

        timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
            .await
            .expect("unsubscribe timed out")
            .expect("unsubscribe failed");
        let res = timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("awaiting message timed out");

        // If the subscription ended, the channel should have been closed.
        assert!(res.is_none());

        timeout(Duration::from_millis(100), client.close())
            .await
            .expect("close timed out")
            .expect("close failed");
        jh.await
            .expect("ws loop errored")
            .unwrap();
        server_thread.await.unwrap();
    }

    /// A server-initiated `subscriptionended` (without an unsubscribe request) closes the
    /// subscription channel.
    #[test(tokio::test)]
    async fn test_subscription_unexpected_end() {
        let exp_comm = [
            ExpectedComm::Receive(
                100,
                tungstenite::protocol::Message::Text(
                    r#"
                {
                    "method":"subscribe",
                    "extractor_id":{
                        "chain":"ethereum",
                        "name":"vm:ambient"
                    },
                    "include_state": true
                }"#
                    .to_owned()
                    .replace(|c: char| c.is_whitespace(), ""),
                ),
            ),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "method":"newsubscription",
                    "extractor_id":{
                        "chain":"ethereum",
                        "name":"vm:ambient"
                    },
                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#
                .to_owned()
                .replace(|c: char| c.is_whitespace(), ""),
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "method": "subscriptionended",
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#
                .to_owned()
                .replace(|c: char| c.is_whitespace(), ""),
            )),
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");
        let (_, mut rx) = timeout(
            Duration::from_millis(100),
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed");
        let res = timeout(Duration::from_millis(100), rx.recv())
            .await
            .expect("awaiting message timed out");

        // If the subscription ended, the channel should have been closed.
        assert!(res.is_none());

        timeout(Duration::from_millis(100), client.close())
            .await
            .expect("close timed out")
            .expect("close failed");
        jh.await
            .expect("ws loop errored")
            .unwrap();
        server_thread.await.unwrap();
    }

    /// After the server drops a connection, the client reconnects and can subscribe again.
    /// When the server stops accepting connections, the ws loop eventually returns an error.
    #[test(tokio::test)]
    async fn test_reconnect() {
        let exp_comm = [
            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
                {
                    "method":"subscribe",
                    "extractor_id":{
                        "chain":"ethereum",
                        "name":"vm:ambient"
                    },
                    "include_state": true
                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
                {
                    "method":"newsubscription",
                    "extractor_id":{
                    "chain":"ethereum",
                    "name":"vm:ambient"
                    },
                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
            )),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
                {
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
                    "deltas": {
                        "extractor": "vm:ambient",
                        "chain": "ethereum",
                        "block": {
                            "number": 123,
                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "chain": "ethereum",             
                            "ts": "2023-09-14T00:00:00"
                        },
                        "finalized_block_height": 0,
                        "revert": false,
                        "new_tokens": {},
                        "account_updates": {
                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                "chain": "ethereum",
                                "slots": {},
                                "balance": "0x01f4",
                                "code": "",
                                "change": "Update"
                            }
                        },
                        "state_updates": {
                            "component_1": {
                                "component_id": "component_1",
                                "updated_attributes": {"attr1": "0x01"},
                                "deleted_attributes": ["attr2"]
                            }
                        },
                        "new_protocol_components": {
                            "protocol_1":
                                {
                                    "id": "protocol_1",
                                    "protocol_system": "system_1",
                                    "protocol_type_name": "type_1",
                                    "chain": "ethereum",
                                    "tokens": ["0x01", "0x02"],
                                    "contract_ids": ["0x01", "0x02"],
                                    "static_attributes": {"attr1": "0x01f4"},
                                    "change": "Update",
                                    "creation_tx": "0x01",
                                    "created_at": "2023-09-14T00:00:00"
                                }
                            },
                        "deleted_protocol_components": {},
                        "component_balances": {
                            "protocol_1": {
                                "0x01": {
                                    "token": "0x01",
                                    "balance": "0x01f4",
                                    "balance_float": 1000.0,
                                    "modify_tx": "0x01",
                                    "component_id": "protocol_1"
                                }
                            }
                        },
                        "account_balances": {
                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
                                    "balance": "0x01f4",
                                    "modify_tx": "0x01"
                                }
                            }
                        },
                        "component_tvl": {
                            "protocol_1": 1000.0
                        },
                        "dci_update": {
                            "new_entrypoints": {},
                            "new_entrypoint_params": {},
                            "trace_results": {}
                        }
                    }
                }
                "#.to_owned()
            ))
        ];
        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh: JoinHandle<Result<(), DeltasError>> = client
            .connect()
            .await
            .expect("connect failed");

        for _ in 0..2 {
            let (_, mut rx) = timeout(
                Duration::from_millis(100),
                client.subscribe(
                    ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                    SubscriptionOptions::new(),
                ),
            )
            .await
            .expect("subscription timed out")
            .expect("subscription failed");

            let _ = timeout(Duration::from_millis(100), rx.recv())
                .await
                .expect("awaiting message timed out")
                .expect("receiving message failed");

            // wait for the connection to drop
            let res = timeout(Duration::from_millis(200), rx.recv())
                .await
                .expect("awaiting closed connection timed out");
            assert!(res.is_none());
        }
        let res = jh.await.expect("ws client join failed");
        // The mock server only serves `reconnects + 1` connections and then drops its listener.
        // Every further reconnection attempt is refused until the client's reconnection budget
        // is exhausted and the ws loop returns an error.
        assert!(res.is_err());
        server_thread
            .await
            .expect("ws server loop errored");
    }

    /// Spawns a mock server that keeps accepting TCP connections and then drops each one.
    ///
    /// With `accept_first`, it completes the websocket handshake and drops the connection
    /// after 10ms. Otherwise it drops the raw TCP stream immediately, before any handshake.
    /// Returns the bound address and the (never-ending) server task's handle.
    async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
        let server = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("localhost bind failed");
        let addr = server.local_addr().unwrap();
        let jh = tokio::spawn(async move {
            while let Ok((stream, _)) = server.accept().await {
                if accept_first {
                    // Send connection handshake to accept the connection (fail later)
                    let stream = tokio_tungstenite::accept_async(stream)
                        .await
                        .unwrap();
                    sleep(Duration::from_millis(10)).await;
                    drop(stream)
                } else {
                    // Close the connection to simulate a failure
                    drop(stream);
                }
            }
        });
        (addr, jh)
    }

    /// When every connection is dropped by the server, the client exhausts its reconnection
    /// attempts. It then reports itself as disconnected, and new subscriptions are rejected.
    #[test(tokio::test)]
    async fn test_subscribe_dead_client_after_max_attempts() {
        let (addr, _) = mock_bad_connection_tycho_ws(true).await;
        let client = WsDeltasClient::new_with_reconnects(&format!("ws://{addr}"), 3, None).unwrap();

        let join_handle = client.connect().await.unwrap();
        let handle_res = join_handle.await.unwrap();
        assert!(handle_res.is_err());
        assert!(!client.is_connected().await);

        let subscription_res = timeout(
            Duration::from_millis(10),
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new(),
            ),
        )
        .await
        .unwrap();
        assert!(subscription_res.is_err());
    }
}