tycho_client/
deltas.rs

1//! # Deltas Client
2//!
3//! This module focuses on implementing the Real-Time Deltas client for the Tycho Indexer service.
4//! Utilizing this client facilitates efficient, instant communication with the indexing service,
5//! promoting seamless data synchronization.
6//!
7//! ## Websocket Implementation
8//!
//! The WebSocket client is cloneable: all clones share the same underlying connection, so a
//! single client can be used from multiple asynchronous tasks without opening a separate
//! connection for each task. This improves efficiency because it:
12//!
13//! - **Reduces Server Load:** By maintaining a single universal client, the load on the server is
14//!   significantly reduced. This is because fewer connections are made to the server, preventing it
15//!   from getting overwhelmed by numerous simultaneous requests.
//! - **Conserves Resource Usage:** A single shared client requires fewer system resources than
//!   multiple separately instantiated clients, since every websocket connection carries some
//!   overhead for its handshake and message handling.
19//!
20//! Therefore, sharing one client among multiple tasks ensures optimal performance, reduces resource
21//! consumption, and enhances overall software scalability.
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::{
25        atomic::{AtomicBool, Ordering},
26        Arc,
27    },
28    time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34    header::{
35        AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36        USER_AGENT,
37    },
38    Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44    net::TcpStream,
45    sync::{
46        mpsc::{self, error::TrySendError, Receiver, Sender},
47        oneshot, Mutex, Notify,
48    },
49    task::JoinHandle,
50    time::sleep,
51};
52use tokio_tungstenite::{
53    connect_async,
54    tungstenite::{
55        self,
56        handshake::client::{generate_key, Request},
57    },
58    MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{
62    BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage, WebsocketError,
63};
64use uuid::Uuid;
65
66use crate::TYCHO_SERVER_VERSION;
67
/// Errors returned by the deltas client.
#[derive(Error, Debug)]
pub enum DeltasError {
    /// Failed to parse the provided URI.
    #[error("Failed to parse URI: {0}. Error: {1}")]
    UriParsing(String, String),

    /// The requested subscription is already pending and is awaiting confirmation from the server.
    #[error("The requested subscription is already pending")]
    SubscriptionAlreadyPending,

    /// The server replied with an error.
    ///
    /// The first field is a human-readable description; the second is the error reported by
    /// the server, which is also exposed as this error's `source`.
    #[error("The server replied with an error: {0}")]
    ServerError(String, #[source] WebsocketError),

    /// A message failed to send via an internal channel or through the websocket channel.
    /// This is typically a fatal error and might indicate a bug in the implementation.
    #[error("{0}")]
    TransportError(String),

    /// The internal message buffer is full. This likely means that messages are not being consumed
    /// fast enough. If the incoming load emits messages in bursts, consider increasing the buffer
    /// size.
    #[error("The buffer is full!")]
    BufferFull,

    /// The client has no active connections but was accessed (e.g., by calling subscribe).
    /// This typically occurs when trying to use the client before calling connect() or
    /// after the connection has been closed.
    #[error("The client is not connected!")]
    NotConnected,

    /// The connect method was called while the client already had an active connection.
    #[error("The client is already connected!")]
    AlreadyConnected,

    /// The connection was closed orderly by the server, e.g. because it restarted.
    #[error("The server closed the connection!")]
    ConnectionClosed,

    /// The connection was closed unexpectedly by the server or encountered a network error.
    #[error("Connection error: {0}")]
    ConnectionError(#[from] Box<tungstenite::Error>),

    /// A fatal error occurred that cannot be recovered from.
    #[error("Tycho FatalError: {0}")]
    Fatal(String),
}
114
/// Options controlling how a subscription is requested from the server.
///
/// Create with [`SubscriptionOptions::new`] (or [`Default::default`]) and customise with the
/// `with_*` builder methods.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SubscriptionOptions {
    /// Forwarded as `include_state` in the `Subscribe` command. Defaults to `true`.
    include_state: bool,
}

impl Default for SubscriptionOptions {
    fn default() -> Self {
        Self { include_state: true }
    }
}

impl SubscriptionOptions {
    /// Creates options with default values (`include_state = true`).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets whether state should be requested for the subscription.
    ///
    /// Consumes and returns `self`, so the result must be used.
    #[must_use]
    pub fn with_state(mut self, val: bool) -> Self {
        self.include_state = val;
        self
    }

    /// Returns whether state will be requested for the subscription.
    pub fn include_state(&self) -> bool {
        self.include_state
    }
}
135
/// Client interface for Tycho's real-time deltas (block changes) stream.
///
/// In test builds `automock` generates a mock implementation of this trait.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait DeltasClient {
    /// Subscribe to an extractor and receive realtime messages.
    ///
    /// Will request a subscription from tycho and wait for confirmation of it. If the caller
    /// cancels while waiting for confirmation the subscription may still be registered. If the
    /// receiver was deallocated though, the first message from the subscription will remove it
    /// again - since there is no one to inform about these messages.
    ///
    /// On success returns the server-assigned subscription id and the receiving end of the
    /// subscription's message channel.
    async fn subscribe(
        &self,
        extractor_id: ExtractorIdentity,
        options: SubscriptionOptions,
    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;

    /// Unsubscribe from a subscription.
    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;

    /// Start the client's message handling loop.
    ///
    /// Returns a handle to the spawned loop; the handle resolves to the loop's final result.
    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;

    /// Close the client's message handling loop.
    async fn close(&self) -> Result<(), DeltasError>;
}
160
/// Websocket implementation of [`DeltasClient`].
///
/// Cloning is cheap: `inner`, `conn_notify` and `dead` are `Arc`s, so every clone shares the
/// same connection state.
#[derive(Clone)]
pub struct WsDeltasClient {
    /// The tycho indexer websocket uri.
    uri: Uri,
    /// Authorization key for the websocket connection (sent as the `Authorization` header).
    auth_key: Option<String>,
    /// Maximum number of connection attempts before giving up; the `connect` loop runs while
    /// `retry_count < max_reconnects`.
    max_reconnects: u64,
    /// Duration to wait before attempting to reconnect.
    retry_cooldown: Duration,
    /// The client will buffer this many messages incoming from the websocket
    /// before starting to drop them.
    /// NOTE(review): in the code visible here this only sizes the internal command channel
    /// created in `connect`; confirm the websocket-buffering behaviour.
    ws_buffer_size: usize,
    /// The client will buffer that many messages for each subscription before it starts
    /// dropping them (a full subscription buffer leads to a forced unsubscribe).
    subscription_buffer_size: usize,
    /// Notify tasks waiting for a connection to be established.
    conn_notify: Arc<Notify>,
    /// Shared client instance state; `None` while no connection is established.
    inner: Arc<Mutex<Option<Inner>>>,
    /// If set the client has exhausted its reconnection attempts; `ensure_connection` then
    /// fails fast with `NotConnected`.
    dead: Arc<AtomicBool>,
}
184
/// Write half of the websocket connection (plain TCP or TLS), obtained by splitting the stream.
type WebSocketSink =
    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
187
/// Subscription State
///
/// Subscriptions go through a lifecycle:
///
/// ```text
/// O ---> requested subscribe ----> active ----> requested unsub ---> ended
/// ```
///
/// We use oneshot channels to inform the client struct when these transitions happen. E.g.
/// for `subscribe` to finish, we want the state to have transitioned to `active`, and similarly
/// for `unsubscribe`.
#[derive(Debug)]
enum SubscriptionInfo {
    /// Subscription was requested; we wait for server confirmation and uuid assignment.
    /// The sender resolves the waiting `subscribe` call.
    RequestedSubscription(oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>),
    /// Subscription is active.
    Active,
    /// Unsubscription was requested; we wait for server confirmation. The sender is fired
    /// once the subscription has been removed.
    RequestedUnsubscription(oneshot::Sender<()>),
}
208
/// Internal struct containing state shared between `WsDeltasClient` instances.
///
/// It lives in `WsDeltasClient::inner` behind an `Arc<Mutex<Option<..>>>`, so all clones of a
/// client observe the same connection state.
struct Inner {
    /// Websocket sender handle.
    sink: WebSocketSink,
    /// Command channel sender handle.
    /// NOTE(review): carries only `()`, presumably to signal the message loop (e.g. on close);
    /// confirm in `connect`/`close`.
    cmd_tx: Sender<()>,
    /// Subscriptions awaiting server confirmation, keyed by extractor (`new_subscription` only
    /// inserts `RequestedSubscription` here).
    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
    /// Confirmed subscriptions, either `Active` or `RequestedUnsubscription`.
    subscriptions: HashMap<Uuid, SubscriptionInfo>,
    /// For each subscription we keep a sender handle; the receiver is returned to the caller of
    /// subscribe.
    sender: HashMap<Uuid, Sender<BlockChanges>>,
    /// Capacity of each per-subscription channel; when a channel is full, delivery fails with
    /// `BufferFull` and the message loop force-unsubscribes.
    buffer_size: usize,
}
225
226/// Shared state between all client instances.
227///
228/// This state is behind a mutex and requires synchronization to be read of modified.
229impl Inner {
230    fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
231        Self {
232            sink,
233            cmd_tx,
234            pending: HashMap::new(),
235            subscriptions: HashMap::new(),
236            sender: HashMap::new(),
237            buffer_size,
238        }
239    }
240
241    /// Registers a new pending subscription.
242    #[allow(clippy::result_large_err)]
243    fn new_subscription(
244        &mut self,
245        id: &ExtractorIdentity,
246        ready_tx: oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>,
247    ) -> Result<(), DeltasError> {
248        if self.pending.contains_key(id) {
249            return Err(DeltasError::SubscriptionAlreadyPending);
250        }
251        self.pending
252            .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
253        Ok(())
254    }
255
256    /// Transitions a pending subscription to active.
257    ///
258    /// Will ignore any request to do so for subscriptions that are not pending.
259    fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
260        if let Some(info) = self.pending.remove(extractor_id) {
261            if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
262                let (tx, rx) = mpsc::channel(self.buffer_size);
263                self.sender.insert(subscription_id, tx);
264                self.subscriptions
265                    .insert(subscription_id, SubscriptionInfo::Active);
266                let _ = ready_tx
267                    .send(Ok((subscription_id, rx)))
268                    .map_err(|_| {
269                        warn!(
270                            ?extractor_id,
271                            ?subscription_id,
272                            "Subscriber for has gone away. Ignoring."
273                        )
274                    });
275            } else {
276                error!(
277                    ?extractor_id,
278                    ?subscription_id,
279                    "Pending subscription was not in the correct state to 
280                    transition to active. Ignoring!"
281                )
282            }
283        } else {
284            error!(
285                ?extractor_id,
286                ?subscription_id,
287                "Tried to mark an unknown subscription as active. Ignoring!"
288            );
289        }
290    }
291
292    /// Sends a message to a subscription's receiver.
293    #[allow(clippy::result_large_err)]
294    fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
295        if let Some(sender) = self.sender.get_mut(id) {
296            sender
297                .try_send(msg)
298                .map_err(|e| match e {
299                    TrySendError::Full(_) => DeltasError::BufferFull,
300                    TrySendError::Closed(_) => {
301                        DeltasError::TransportError("The subscriber has gone away".to_string())
302                    }
303                })?;
304        }
305        Ok(())
306    }
307
308    /// Requests a subscription to end.
309    ///
310    /// The subscription needs to exist and be active for this to have any effect. Wll use
311    /// `ready_tx` to notify the receiver once the transition to ended completed.
312    fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
313        if let Some(info) = self
314            .subscriptions
315            .get_mut(subscription_id)
316        {
317            if let SubscriptionInfo::Active = info {
318                *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
319            }
320        } else {
321            // no big deal imo so only debug lvl...
322            debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
323        }
324    }
325
326    /// Removes and fully ends a subscription
327    ///
328    /// Any calls for non-existing subscriptions will be simply ignored. May panic on internal state
329    /// inconsistencies: e.g. if the subscription exists but there is no sender for it.
330    /// Will remove a subscription even it was in active or pending state before, this is to support
331    /// any server side failure of the subscription.
332    fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
333        if let Entry::Occupied(e) = self
334            .subscriptions
335            .entry(subscription_id)
336        {
337            let info = e.remove();
338            if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
339                let _ = tx.send(()).map_err(|_| {
340                    warn!(?subscription_id, "failed to notify about removed subscription")
341                });
342                self.sender
343                    .remove(&subscription_id)
344                    .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
345            } else {
346                warn!(?subscription_id, "Subscription ended unexpectedly!");
347                self.sender
348                    .remove(&subscription_id)
349                    .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
350            }
351        } else {
352            error!(
353                ?subscription_id,
354                "Received `SubscriptionEnded`, but was never subscribed to it. This is likely a bug!"
355            );
356        }
357
358        Ok(())
359    }
360
361    fn cancel_pending(&mut self, extractor_id: &ExtractorIdentity, error: &WebsocketError) {
362        if let Some(sub_info) = self.pending.remove(extractor_id) {
363            match sub_info {
364                SubscriptionInfo::RequestedSubscription(tx) => {
365                    let _ = tx
366                        .send(Err(DeltasError::ServerError(
367                            format!("Subscription failed: {error}"),
368                            error.clone(),
369                        )))
370                        .map_err(|_| debug!("Cancel pending failed: receiver deallocated!"));
371                }
372                _ => {
373                    error!(?extractor_id, "Pending subscription in wrong state")
374                }
375            }
376        } else {
377            debug!(?extractor_id, "Tried cancel on non-existent pending subscription!")
378        }
379    }
380
381    /// Sends a message through the websocket.
382    async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
383        self.sink.send(msg).await.map_err(|e| {
384            DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
385        })
386    }
387}
388
389/// Tycho client websocket implementation.
390impl WsDeltasClient {
391    // Construct a new client with 5 reconnection attempts.
392    #[allow(clippy::result_large_err)]
393    pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
394        let uri = ws_uri
395            .parse::<Uri>()
396            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
397        Ok(Self {
398            uri,
399            auth_key: auth_key.map(|s| s.to_string()),
400            inner: Arc::new(Mutex::new(None)),
401            ws_buffer_size: 128,
402            subscription_buffer_size: 128,
403            conn_notify: Arc::new(Notify::new()),
404            max_reconnects: 5,
405            retry_cooldown: Duration::from_millis(500),
406            dead: Arc::new(AtomicBool::new(false)),
407        })
408    }
409
410    // Construct a new client with a custom number of reconnection attempts.
411    #[allow(clippy::result_large_err)]
412    pub fn new_with_reconnects(
413        ws_uri: &str,
414        auth_key: Option<&str>,
415        max_reconnects: u64,
416        retry_cooldown: Duration,
417    ) -> Result<Self, DeltasError> {
418        let uri = ws_uri
419            .parse::<Uri>()
420            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
421
422        Ok(Self {
423            uri,
424            auth_key: auth_key.map(|s| s.to_string()),
425            inner: Arc::new(Mutex::new(None)),
426            ws_buffer_size: 128,
427            subscription_buffer_size: 128,
428            conn_notify: Arc::new(Notify::new()),
429            max_reconnects,
430            retry_cooldown,
431            dead: Arc::new(AtomicBool::new(false)),
432        })
433    }
434
435    // Construct a new client with custom buffer sizes (for testing)
436    #[cfg(test)]
437    #[allow(clippy::result_large_err)]
438    pub fn new_with_custom_buffers(
439        ws_uri: &str,
440        auth_key: Option<&str>,
441        ws_buffer_size: usize,
442        subscription_buffer_size: usize,
443    ) -> Result<Self, DeltasError> {
444        let uri = ws_uri
445            .parse::<Uri>()
446            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
447        Ok(Self {
448            uri,
449            auth_key: auth_key.map(|s| s.to_string()),
450            inner: Arc::new(Mutex::new(None)),
451            ws_buffer_size,
452            subscription_buffer_size,
453            conn_notify: Arc::new(Notify::new()),
454            max_reconnects: 5,
455            retry_cooldown: Duration::from_millis(0),
456            dead: Arc::new(AtomicBool::new(false)),
457        })
458    }
459
460    /// Ensures that the client is connected.
461    ///
462    /// This method will acquire the lock for inner.
463    async fn is_connected(&self) -> bool {
464        let guard = self.inner.as_ref().lock().await;
465        guard.is_some()
466    }
467
468    /// Waits for the client to be connected
469    ///
470    /// This method acquires the lock for inner for a short period, then waits until the  
471    /// connection is established if not already connected.
472    async fn ensure_connection(&self) -> Result<(), DeltasError> {
473        if self.dead.load(Ordering::SeqCst) {
474            return Err(DeltasError::NotConnected)
475        };
476        if !self.is_connected().await {
477            self.conn_notify.notified().await;
478        };
479        Ok(())
480    }
481
482    /// Main message handling logic
483    ///
484    /// If the message returns an error, a reconnect attempt may be considered depending on the
485    /// error type.
486    #[instrument(skip(self, msg))]
487    async fn handle_msg(
488        &self,
489        msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
490    ) -> Result<(), DeltasError> {
491        let mut guard = self.inner.lock().await;
492
493        match msg {
494            // We do not deserialize the message directly into a WebSocketMessage. This is because
495            // the serde arbitrary_precision feature (often included in many
496            // dependencies we use) breaks some untagged enum deserializations. Instead,
497            // we deserialize the message into a serde_json::Value and convert that into a WebSocketMessage. For more info on this issue, see: https://github.com/serde-rs/json/issues/740
498            Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
499                serde_json::Value,
500            >(&text)
501            {
502                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
503                    Ok(ws_message) => match ws_message {
504                        WebSocketMessage::BlockChanges { subscription_id, deltas } => {
505                            trace!(?deltas, "Received a block state change, sending to channel");
506                            let inner = guard
507                                .as_mut()
508                                .ok_or_else(|| DeltasError::NotConnected)?;
509                            match inner.send(&subscription_id, deltas) {
510                                Err(DeltasError::BufferFull) => {
511                                    error!(?subscription_id, "Buffer full, unsubscribing!");
512                                    Self::force_unsubscribe(subscription_id, inner).await;
513                                }
514                                Err(_) => {
515                                    warn!(
516                                        ?subscription_id,
517                                        "Receiver for has gone away, unsubscribing!"
518                                    );
519                                    Self::force_unsubscribe(subscription_id, inner).await;
520                                }
521                                _ => { /* Do nothing */ }
522                            }
523                        }
524                        WebSocketMessage::Response(Response::NewSubscription {
525                            extractor_id,
526                            subscription_id,
527                        }) => {
528                            info!(?extractor_id, ?subscription_id, "Received a new subscription");
529                            let inner = guard
530                                .as_mut()
531                                .ok_or_else(|| DeltasError::NotConnected)?;
532                            inner.mark_active(&extractor_id, subscription_id);
533                        }
534                        WebSocketMessage::Response(Response::SubscriptionEnded {
535                            subscription_id,
536                        }) => {
537                            info!(?subscription_id, "Received a subscription ended");
538                            let inner = guard
539                                .as_mut()
540                                .ok_or_else(|| DeltasError::NotConnected)?;
541                            inner.remove_subscription(subscription_id)?;
542                        }
543                        WebSocketMessage::Response(Response::Error(error)) => match &error {
544                            WebsocketError::ExtractorNotFound(extractor_id) => {
545                                let inner = guard
546                                    .as_mut()
547                                    .ok_or_else(|| DeltasError::NotConnected)?;
548                                inner.cancel_pending(extractor_id, &error);
549                            }
550                            WebsocketError::SubscriptionNotFound(subscription_id) => {
551                                debug!("Received subscription not found, removing subscription");
552                                let inner = guard
553                                    .as_mut()
554                                    .ok_or_else(|| DeltasError::NotConnected)?;
555                                inner.remove_subscription(*subscription_id)?;
556                            }
557                            WebsocketError::ParseError(raw, e) => {
558                                return Err(DeltasError::ServerError(
559                                    format!(
560                                        "Server failed to parse client message: {e}, msg: {raw}"
561                                    ),
562                                    error.clone(),
563                                ))
564                            }
565                            WebsocketError::SubscribeError(extractor_id) => {
566                                let inner = guard
567                                    .as_mut()
568                                    .ok_or_else(|| DeltasError::NotConnected)?;
569                                inner.cancel_pending(extractor_id, &error);
570                            }
571                        },
572                    },
573                    Err(e) => {
574                        error!(
575                            "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
576                            e, text
577                        );
578                    }
579                },
580                Err(e) => {
581                    error!(
582                        "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
583                        e, text
584                    );
585                }
586            },
587            Ok(tungstenite::protocol::Message::Ping(_)) => {
588                // Respond to pings with pongs.
589                let inner = guard
590                    .as_mut()
591                    .ok_or_else(|| DeltasError::NotConnected)?;
592                if let Err(error) = inner
593                    .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
594                    .await
595                {
596                    debug!(?error, "Failed to send pong!");
597                }
598            }
599            Ok(tungstenite::protocol::Message::Pong(_)) => {
600                // Do nothing.
601            }
602            Ok(tungstenite::protocol::Message::Close(_)) => {
603                return Err(DeltasError::ConnectionClosed);
604            }
605            Ok(unknown_msg) => {
606                info!("Received an unknown message type: {:?}", unknown_msg);
607            }
608            Err(error) => {
609                error!(?error, "Websocket error");
610                return Err(match error {
611                    tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
612                    tungstenite::Error::AlreadyClosed => {
613                        warn!("Received AlreadyClosed error which is indicative of a bug!");
614                        DeltasError::ConnectionError(Box::new(error))
615                    }
616                    tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
617                        DeltasError::ConnectionError(Box::new(error))
618                    }
619                    _ => DeltasError::Fatal(error.to_string()),
620                });
621            }
622        };
623        Ok(())
624    }
625
626    /// Forcefully ends a (client) stream by unsubscribing.
627    ///
628    /// Is used only if the message can't be processed due to an error that might resolve
629    /// itself by resubscribing.
630    async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
631        let (tx, rx) = oneshot::channel();
632        if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
633            warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
634        } else {
635            // Wait for unsubscribe completion with timeout
636            match tokio::time::timeout(Duration::from_secs(5), rx).await {
637                Ok(_) => {
638                    debug!(?subscription_id, "Unsubscribe completed successfully");
639                }
640                Err(_) => {
641                    warn!(?subscription_id, "Unsubscribe completion timed out");
642                }
643            }
644        }
645    }
646
647    /// Helper method to force an unsubscription
648    ///
649    /// This method expects to receive a mutable reference to `Inner` so it does not acquire a
650    /// lock. Used for normal unsubscribes as well to remove any subscriptions with deallocated
651    /// receivers.
652    async fn unsubscribe_inner(
653        inner: &mut Inner,
654        subscription_id: Uuid,
655        ready_tx: oneshot::Sender<()>,
656    ) -> Result<(), DeltasError> {
657        debug!(?subscription_id, "Unsubscribing");
658        inner.end_subscription(&subscription_id, ready_tx);
659        let cmd = Command::Unsubscribe { subscription_id };
660        inner
661            .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
662                |e| {
663                    DeltasError::TransportError(format!(
664                        "Failed to serialize unsubscribe command: {e}"
665                    ))
666                },
667            )?))
668            .await?;
669        Ok(())
670    }
671}
672
673#[async_trait]
674impl DeltasClient for WsDeltasClient {
675    #[instrument(skip(self))]
676    async fn subscribe(
677        &self,
678        extractor_id: ExtractorIdentity,
679        options: SubscriptionOptions,
680    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
681        trace!("Starting subscribe");
682        self.ensure_connection().await?;
683        let (ready_tx, ready_rx) = oneshot::channel();
684        {
685            let mut guard = self.inner.lock().await;
686            let inner = guard
687                .as_mut()
688                .ok_or_else(|| DeltasError::NotConnected)?;
689            trace!("Sending subscribe command");
690            inner.new_subscription(&extractor_id, ready_tx)?;
691            let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
692            inner
693                .ws_send(tungstenite::protocol::Message::Text(
694                    serde_json::to_string(&cmd).map_err(|e| {
695                        DeltasError::TransportError(format!(
696                            "Failed to serialize subscribe command: {e}"
697                        ))
698                    })?,
699                ))
700                .await?;
701        }
702        trace!("Waiting for subscription response");
703        let res = ready_rx.await.map_err(|_| {
704            DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
705        })??;
706        trace!("Subscription successful");
707        Ok(res)
708    }
709
710    #[instrument(skip(self))]
711    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
712        self.ensure_connection().await?;
713        let (ready_tx, ready_rx) = oneshot::channel();
714        {
715            let mut guard = self.inner.lock().await;
716            let inner = guard
717                .as_mut()
718                .ok_or_else(|| DeltasError::NotConnected)?;
719
720            WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
721        }
722        ready_rx.await.map_err(|_| {
723            DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
724        })?;
725
726        Ok(())
727    }
728
729    #[instrument(skip(self))]
730    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
731        if self.is_connected().await {
732            return Err(DeltasError::AlreadyConnected);
733        }
734        let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
735        info!(?ws_uri, "Starting TychoWebsocketClient");
736
737        let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
738        {
739            let mut guard = self.inner.as_ref().lock().await;
740            *guard = None;
741        }
742        let this = self.clone();
743        let jh = tokio::spawn(async move {
744            let mut retry_count = 0;
745            let mut result = Err(DeltasError::NotConnected);
746
747            'retry: while retry_count < this.max_reconnects {
748                info!(?ws_uri, retry_count, "Connecting to WebSocket server");
749                if retry_count > 0 {
750                    sleep(this.retry_cooldown).await;
751                }
752
753                // Create a WebSocket request
754                let mut request_builder = Request::builder()
755                    .uri(&ws_uri)
756                    .header(SEC_WEBSOCKET_KEY, generate_key())
757                    .header(SEC_WEBSOCKET_VERSION, 13)
758                    .header(CONNECTION, "Upgrade")
759                    .header(UPGRADE, "websocket")
760                    .header(
761                        HOST,
762                        this.uri.host().ok_or_else(|| {
763                            DeltasError::UriParsing(
764                                ws_uri.clone(),
765                                "No host found in tycho url".to_string(),
766                            )
767                        })?,
768                    )
769                    .header(
770                        USER_AGENT,
771                        format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
772                    );
773
774                // Add Authorization if one is given
775                if let Some(ref key) = this.auth_key {
776                    request_builder = request_builder.header(AUTHORIZATION, key);
777                }
778
779                let request = request_builder.body(()).map_err(|e| {
780                    DeltasError::TransportError(format!("Failed to build connection request: {e}"))
781                })?;
782                let (conn, _) = match connect_async(request).await {
783                    Ok(conn) => conn,
784                    Err(e) => {
785                        // Prepare for reconnection
786                        retry_count += 1;
787                        let mut guard = this.inner.as_ref().lock().await;
788                        *guard = None;
789
790                        warn!(
791                            e = e.to_string(),
792                            "Failed to connect to WebSocket server; Reconnecting"
793                        );
794                        continue 'retry;
795                    }
796                };
797
798                let (ws_tx_new, ws_rx_new) = conn.split();
799                {
800                    let mut guard = this.inner.as_ref().lock().await;
801                    *guard =
802                        Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
803                }
804                let mut msg_rx = ws_rx_new.boxed();
805
806                info!("Connection Successful: TychoWebsocketClient started");
807                this.conn_notify.notify_waiters();
808                result = Ok(());
809
810                loop {
811                    let res = tokio::select! {
812                        msg = msg_rx.next() => match msg {
813                            Some(msg) => this.handle_msg(msg).await,
814                            None => {
815                                // This code should not be reachable since the stream
816                                // should return ConnectionClosed in the case above
817                                // before it returns None here.
818                                warn!("Websocket connection silently closed, giving up!");
819                                break 'retry
820                            }
821                        },
822                        _ = cmd_rx.recv() => {break 'retry},
823                    };
824                    if let Err(error) = res {
825                        debug!(?error, "WsError");
826                        if matches!(
827                            error,
828                            DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
829                        ) {
830                            // Prepare for reconnection
831                            retry_count += 1;
832                            let mut guard = this.inner.as_ref().lock().await;
833                            *guard = None;
834
835                            warn!(
836                                ?error,
837                                ?retry_count,
838                                "Connection dropped unexpectedly; Reconnecting..."
839                            );
840                            break;
841                        } else {
842                            // Other errors are considered fatal
843                            error!(?error, "Fatal error; Exiting");
844                            result = Err(error);
845                            break 'retry;
846                        }
847                    }
848                }
849            }
850            debug!(
851                retry_count,
852                max_reconnects=?this.max_reconnects,
853                "Reconnection loop ended"
854            );
855            // Clean up before exiting
856            let mut guard = this.inner.as_ref().lock().await;
857            *guard = None;
858
859            // Check if max retries has been reached.
860            if retry_count >= this.max_reconnects {
861                error!("Max reconnection attempts reached; Exiting");
862                this.dead.store(true, Ordering::SeqCst);
863                this.conn_notify.notify_waiters(); // Notify that the task is done
864                result = Err(DeltasError::ConnectionClosed);
865            }
866
867            result
868        });
869
870        self.conn_notify.notified().await;
871
872        if self.is_connected().await {
873            Ok(jh)
874        } else {
875            Err(DeltasError::NotConnected)
876        }
877    }
878
879    #[instrument(skip(self))]
880    async fn close(&self) -> Result<(), DeltasError> {
881        info!("Closing TychoWebsocketClient");
882        let mut guard = self.inner.lock().await;
883        let inner = guard
884            .as_mut()
885            .ok_or_else(|| DeltasError::NotConnected)?;
886        inner
887            .cmd_tx
888            .send(())
889            .await
890            .map_err(|e| DeltasError::TransportError(e.to_string()))?;
891        Ok(())
892    }
893}
894
895#[cfg(test)]
896mod tests {
897    use std::net::SocketAddr;
898
899    use test_log::test;
900    use tokio::{net::TcpListener, time::timeout};
901    use tycho_common::dto::Chain;
902
903    use super::*;
904
    /// One scripted step of the mock server's conversation with the client.
    #[derive(Clone)]
    enum ExpectedComm {
        /// Wait up to the given number of milliseconds for the client to send a frame, then
        /// assert it equals the given message.
        Receive(u64, tungstenite::protocol::Message),
        /// Send the given message to the client.
        Send(tungstenite::protocol::Message),
    }
910
911    async fn mock_tycho_ws(
912        messages: &[ExpectedComm],
913        reconnects: usize,
914    ) -> (SocketAddr, JoinHandle<()>) {
915        info!("Starting mock webserver");
916        // zero port here means the OS chooses an open port
917        let server = TcpListener::bind("127.0.0.1:0")
918            .await
919            .expect("localhost bind failed");
920        let addr = server.local_addr().unwrap();
921        let messages = messages.to_vec();
922
923        let jh = tokio::spawn(async move {
924            info!("mock webserver started");
925            for _ in 0..(reconnects + 1) {
926                info!("Awaiting client connections");
927                if let Ok((stream, _)) = server.accept().await {
928                    info!("Client connected");
929                    let mut websocket = tokio_tungstenite::accept_async(stream)
930                        .await
931                        .unwrap();
932
933                    info!("Handling messages..");
934                    for c in messages.iter().cloned() {
935                        match c {
936                            ExpectedComm::Receive(t, exp) => {
937                                info!("Awaiting message...");
938                                let msg = timeout(Duration::from_millis(t), websocket.next())
939                                    .await
940                                    .expect("Receive timeout")
941                                    .expect("Stream exhausted")
942                                    .expect("Failed to receive message.");
943                                info!("Message received");
944                                assert_eq!(msg, exp)
945                            }
946                            ExpectedComm::Send(data) => {
947                                info!("Sending message");
948                                websocket
949                                    .send(data)
950                                    .await
951                                    .expect("Failed to send message");
952                                info!("Message sent");
953                            }
954                        };
955                    }
956                    info!("Mock communication completed");
957                    sleep(Duration::from_millis(100)).await;
958                    // Close the WebSocket connection
959                    let _ = websocket.close(None).await;
960                    info!("Mock server closed connection");
961                }
962            }
963            info!("mock server ended");
964        });
965        (addr, jh)
966    }
967
968    #[tokio::test]
969    async fn test_subscribe_receive() {
970        let exp_comm = [
971            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
972                {
973                    "method":"subscribe",
974                    "extractor_id":{
975                        "chain":"ethereum",
976                        "name":"vm:ambient"
977                    },
978                    "include_state": true
979                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
980            )),
981            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
982                {
983                    "method":"newsubscription",
984                    "extractor_id":{
985                    "chain":"ethereum",
986                    "name":"vm:ambient"
987                    },
988                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
989                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
990            )),
991            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
992                {
993                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
994                    "deltas": {
995                        "extractor": "vm:ambient",
996                        "chain": "ethereum",
997                        "block": {
998                            "number": 123,
999                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1000                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1001                            "chain": "ethereum",             
1002                            "ts": "2023-09-14T00:00:00"
1003                        },
1004                        "finalized_block_height": 0,
1005                        "revert": false,
1006                        "new_tokens": {},
1007                        "account_updates": {
1008                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1009                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1010                                "chain": "ethereum",
1011                                "slots": {},
1012                                "balance": "0x01f4",
1013                                "code": "",
1014                                "change": "Update"
1015                            }
1016                        },
1017                        "state_updates": {
1018                            "component_1": {
1019                                "component_id": "component_1",
1020                                "updated_attributes": {"attr1": "0x01"},
1021                                "deleted_attributes": ["attr2"]
1022                            }
1023                        },
1024                        "new_protocol_components": 
1025                            { "protocol_1": {
1026                                    "id": "protocol_1",
1027                                    "protocol_system": "system_1",
1028                                    "protocol_type_name": "type_1",
1029                                    "chain": "ethereum",
1030                                    "tokens": ["0x01", "0x02"],
1031                                    "contract_ids": ["0x01", "0x02"],
1032                                    "static_attributes": {"attr1": "0x01f4"},
1033                                    "change": "Update",
1034                                    "creation_tx": "0x01",
1035                                    "created_at": "2023-09-14T00:00:00"
1036                                }
1037                            },
1038                        "deleted_protocol_components": {},
1039                        "component_balances": {
1040                            "protocol_1":
1041                                {
1042                                    "0x01": {
1043                                        "token": "0x01",
1044                                        "balance": "0x01f4",
1045                                        "balance_float": 0.0,
1046                                        "modify_tx": "0x01",
1047                                        "component_id": "protocol_1"
1048                                    }
1049                                }
1050                        },
1051                        "account_balances": {
1052                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1053                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1054                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1055                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1056                                    "balance": "0x01f4",
1057                                    "modify_tx": "0x01"
1058                                }
1059                            }
1060                        },
1061                        "component_tvl": {
1062                            "protocol_1": 1000.0
1063                        },
1064                        "dci_update": {
1065                            "new_entrypoints": {},
1066                            "new_entrypoint_params": {},
1067                            "trace_results": {}
1068                        }
1069                    }
1070                }
1071                "#.to_owned()
1072            ))
1073        ];
1074        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1075
1076        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1077        let jh = client
1078            .connect()
1079            .await
1080            .expect("connect failed");
1081        let (_, mut rx) = timeout(
1082            Duration::from_millis(100),
1083            client.subscribe(
1084                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1085                SubscriptionOptions::new(),
1086            ),
1087        )
1088        .await
1089        .expect("subscription timed out")
1090        .expect("subscription failed");
1091        let _ = timeout(Duration::from_millis(100), rx.recv())
1092            .await
1093            .expect("awaiting message timeout out")
1094            .expect("receiving message failed");
1095        timeout(Duration::from_millis(100), client.close())
1096            .await
1097            .expect("close timed out")
1098            .expect("close failed");
1099        jh.await
1100            .expect("ws loop errored")
1101            .unwrap();
1102        server_thread.await.unwrap();
1103    }
1104
1105    #[tokio::test]
1106    async fn test_unsubscribe() {
1107        let exp_comm = [
1108            ExpectedComm::Receive(
1109                100,
1110                tungstenite::protocol::Message::Text(
1111                    r#"
1112                {
1113                    "method": "subscribe",
1114                    "extractor_id":{
1115                        "chain": "ethereum",
1116                        "name": "vm:ambient"
1117                    },
1118                    "include_state": true
1119                }"#
1120                    .to_owned()
1121                    .replace(|c: char| c.is_whitespace(), ""),
1122                ),
1123            ),
1124            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1125                r#"
1126                {
1127                    "method": "newsubscription",
1128                    "extractor_id":{
1129                        "chain": "ethereum",
1130                        "name": "vm:ambient"
1131                    },
1132                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1133                }"#
1134                .to_owned()
1135                .replace(|c: char| c.is_whitespace(), ""),
1136            )),
1137            ExpectedComm::Receive(
1138                100,
1139                tungstenite::protocol::Message::Text(
1140                    r#"
1141                {
1142                    "method": "unsubscribe",
1143                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1144                }
1145                "#
1146                    .to_owned()
1147                    .replace(|c: char| c.is_whitespace(), ""),
1148                ),
1149            ),
1150            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1151                r#"
1152                {
1153                    "method": "subscriptionended",
1154                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1155                }
1156                "#
1157                .to_owned()
1158                .replace(|c: char| c.is_whitespace(), ""),
1159            )),
1160        ];
1161        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1162
1163        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1164        let jh = client
1165            .connect()
1166            .await
1167            .expect("connect failed");
1168        let (sub_id, mut rx) = timeout(
1169            Duration::from_millis(100),
1170            client.subscribe(
1171                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1172                SubscriptionOptions::new(),
1173            ),
1174        )
1175        .await
1176        .expect("subscription timed out")
1177        .expect("subscription failed");
1178
1179        timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1180            .await
1181            .expect("unsubscribe timed out")
1182            .expect("unsubscribe failed");
1183        let res = timeout(Duration::from_millis(100), rx.recv())
1184            .await
1185            .expect("awaiting message timeout out");
1186
1187        // If the subscription ended, the channel should have been closed.
1188        assert!(res.is_none());
1189
1190        timeout(Duration::from_millis(100), client.close())
1191            .await
1192            .expect("close timed out")
1193            .expect("close failed");
1194        jh.await
1195            .expect("ws loop errored")
1196            .unwrap();
1197        server_thread.await.unwrap();
1198    }
1199
1200    #[tokio::test]
1201    async fn test_subscription_unexpected_end() {
1202        let exp_comm = [
1203            ExpectedComm::Receive(
1204                100,
1205                tungstenite::protocol::Message::Text(
1206                    r#"
1207                {
1208                    "method":"subscribe",
1209                    "extractor_id":{
1210                        "chain":"ethereum",
1211                        "name":"vm:ambient"
1212                    },
1213                    "include_state": true
1214                }"#
1215                    .to_owned()
1216                    .replace(|c: char| c.is_whitespace(), ""),
1217                ),
1218            ),
1219            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1220                r#"
1221                {
1222                    "method":"newsubscription",
1223                    "extractor_id":{
1224                        "chain":"ethereum",
1225                        "name":"vm:ambient"
1226                    },
1227                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1228                }"#
1229                .to_owned()
1230                .replace(|c: char| c.is_whitespace(), ""),
1231            )),
1232            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1233                r#"
1234                {
1235                    "method": "subscriptionended",
1236                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1237                }"#
1238                .to_owned()
1239                .replace(|c: char| c.is_whitespace(), ""),
1240            )),
1241        ];
1242        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1243
1244        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1245        let jh = client
1246            .connect()
1247            .await
1248            .expect("connect failed");
1249        let (_, mut rx) = timeout(
1250            Duration::from_millis(100),
1251            client.subscribe(
1252                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1253                SubscriptionOptions::new(),
1254            ),
1255        )
1256        .await
1257        .expect("subscription timed out")
1258        .expect("subscription failed");
1259        let res = timeout(Duration::from_millis(100), rx.recv())
1260            .await
1261            .expect("awaiting message timeout out");
1262
1263        // If the subscription ended, the channel should have been closed.
1264        assert!(res.is_none());
1265
1266        timeout(Duration::from_millis(100), client.close())
1267            .await
1268            .expect("close timed out")
1269            .expect("close failed");
1270        jh.await
1271            .expect("ws loop errored")
1272            .unwrap();
1273        server_thread.await.unwrap();
1274    }
1275
1276    #[test_log::test(tokio::test)]
1277    async fn test_reconnect() {
1278        let exp_comm = [
1279            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
1280                {
1281                    "method":"subscribe",
1282                    "extractor_id":{
1283                        "chain":"ethereum",
1284                        "name":"vm:ambient"
1285                    },
1286                    "include_state": true
1287                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1288            )),
1289            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1290                {
1291                    "method":"newsubscription",
1292                    "extractor_id":{
1293                    "chain":"ethereum",
1294                    "name":"vm:ambient"
1295                    },
1296                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1297                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1298            )),
1299            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1300                {
1301                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1302                    "deltas": {
1303                        "extractor": "vm:ambient",
1304                        "chain": "ethereum",
1305                        "block": {
1306                            "number": 123,
1307                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1308                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1309                            "chain": "ethereum",             
1310                            "ts": "2023-09-14T00:00:00"
1311                        },
1312                        "finalized_block_height": 0,
1313                        "revert": false,
1314                        "new_tokens": {},
1315                        "account_updates": {
1316                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1317                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1318                                "chain": "ethereum",
1319                                "slots": {},
1320                                "balance": "0x01f4",
1321                                "code": "",
1322                                "change": "Update"
1323                            }
1324                        },
1325                        "state_updates": {
1326                            "component_1": {
1327                                "component_id": "component_1",
1328                                "updated_attributes": {"attr1": "0x01"},
1329                                "deleted_attributes": ["attr2"]
1330                            }
1331                        },
1332                        "new_protocol_components": {
1333                            "protocol_1":
1334                                {
1335                                    "id": "protocol_1",
1336                                    "protocol_system": "system_1",
1337                                    "protocol_type_name": "type_1",
1338                                    "chain": "ethereum",
1339                                    "tokens": ["0x01", "0x02"],
1340                                    "contract_ids": ["0x01", "0x02"],
1341                                    "static_attributes": {"attr1": "0x01f4"},
1342                                    "change": "Update",
1343                                    "creation_tx": "0x01",
1344                                    "created_at": "2023-09-14T00:00:00"
1345                                }
1346                            },
1347                        "deleted_protocol_components": {},
1348                        "component_balances": {
1349                            "protocol_1": {
1350                                "0x01": {
1351                                    "token": "0x01",
1352                                    "balance": "0x01f4",
1353                                    "balance_float": 1000.0,
1354                                    "modify_tx": "0x01",
1355                                    "component_id": "protocol_1"
1356                                }
1357                            }
1358                        },
1359                        "account_balances": {
1360                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1361                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1362                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1363                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1364                                    "balance": "0x01f4",
1365                                    "modify_tx": "0x01"
1366                                }
1367                            }
1368                        },
1369                        "component_tvl": {
1370                            "protocol_1": 1000.0
1371                        },
1372                        "dci_update": {
1373                            "new_entrypoints": {},
1374                            "new_entrypoint_params": {},
1375                            "trace_results": {}
1376                        }
1377                    }
1378                }
1379                "#.to_owned()
1380            ))
1381        ];
1382        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1383        let client = WsDeltasClient::new_with_reconnects(
1384            &format!("ws://{addr}"),
1385            None,
1386            3,
1387            // server stays down for 100ms on connection drop
1388            Duration::from_millis(110),
1389        )
1390        .unwrap();
1391
1392        let jh: JoinHandle<Result<(), DeltasError>> = client
1393            .connect()
1394            .await
1395            .expect("connect failed");
1396
1397        for _ in 0..2 {
1398            dbg!("loop");
1399            let (_, mut rx) = timeout(
1400                Duration::from_millis(200),
1401                client.subscribe(
1402                    ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1403                    SubscriptionOptions::new(),
1404                ),
1405            )
1406            .await
1407            .expect("subscription timed out")
1408            .expect("subscription failed");
1409
1410            let _ = timeout(Duration::from_millis(100), rx.recv())
1411                .await
1412                .expect("awaiting message timeout out")
1413                .expect("receiving message failed");
1414
1415            // wait for the connection to drop
1416            let res = timeout(Duration::from_millis(200), rx.recv())
1417                .await
1418                .expect("awaiting closed connection timeout out");
1419            assert!(res.is_none());
1420        }
1421        let res = jh.await.expect("ws client join failed");
1422        // 5th client reconnect attempt should fail
1423        assert!(res.is_err());
1424        server_thread
1425            .await
1426            .expect("ws server loop errored");
1427    }
1428
1429    async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1430        let server = TcpListener::bind("127.0.0.1:0")
1431            .await
1432            .expect("localhost bind failed");
1433        let addr = server.local_addr().unwrap();
1434        let jh = tokio::spawn(async move {
1435            while let Ok((stream, _)) = server.accept().await {
1436                if accept_first {
1437                    // Send connection handshake to accept the connection (fail later)
1438                    let stream = tokio_tungstenite::accept_async(stream)
1439                        .await
1440                        .unwrap();
1441                    sleep(Duration::from_millis(10)).await;
1442                    drop(stream)
1443                } else {
1444                    // Close the connection to simulate a failure
1445                    drop(stream);
1446                }
1447            }
1448        });
1449        (addr, jh)
1450    }
1451
1452    #[test(tokio::test)]
1453    async fn test_subscribe_dead_client_after_max_attempts() {
1454        let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1455        let client = WsDeltasClient::new_with_reconnects(
1456            &format!("ws://{addr}"),
1457            None,
1458            3,
1459            Duration::from_secs(0),
1460        )
1461        .unwrap();
1462
1463        let join_handle = client.connect().await.unwrap();
1464        let handle_res = join_handle.await.unwrap();
1465        assert!(handle_res.is_err());
1466        assert!(!client.is_connected().await);
1467
1468        let subscription_res = timeout(
1469            Duration::from_millis(10),
1470            client.subscribe(
1471                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1472                SubscriptionOptions::new(),
1473            ),
1474        )
1475        .await
1476        .unwrap();
1477        assert!(subscription_res.is_err());
1478    }
1479
1480    #[test(tokio::test)]
1481    async fn test_ws_client_retry_cooldown() {
1482        let start = std::time::Instant::now();
1483        let (addr, _) = mock_bad_connection_tycho_ws(false).await;
1484
1485        // Use the mock server that immediately drops connections
1486        let client = WsDeltasClient::new_with_reconnects(
1487            &format!("ws://{addr}"),
1488            None,
1489            3,                         // 3 attempts total (so 2 retries with cooldowns)
1490            Duration::from_millis(50), // 50ms cooldown
1491        )
1492        .unwrap();
1493
1494        // Try to connect - this should fail after retries but still measure the time
1495        let connect_result = client.connect().await;
1496        let elapsed = start.elapsed();
1497
1498        // Connection should fail after exhausting retries
1499        assert!(connect_result.is_err(), "Expected connection to fail after retries");
1500
1501        // Should have waited at least 100ms total (2 retries × 50ms cooldown each)
1502        assert!(
1503            elapsed >= Duration::from_millis(100),
1504            "Expected at least 100ms elapsed, got {:?}",
1505            elapsed
1506        );
1507
1508        // Should not take too long (max ~300ms for 3 attempts with some tolerance)
1509        assert!(elapsed < Duration::from_millis(500), "Took too long: {:?}", elapsed);
1510    }
1511
1512    #[test_log::test(tokio::test)]
1513    async fn test_buffer_full_triggers_unsubscribe() {
1514        // Expected communication sequence for buffer full scenario
1515        let exp_comm = {
1516            [
1517            // 1. Client subscribes
1518            ExpectedComm::Receive(
1519                100,
1520                tungstenite::protocol::Message::Text(
1521                    r#"
1522                {
1523                    "method":"subscribe",
1524                    "extractor_id":{
1525                        "chain":"ethereum",
1526                        "name":"vm:ambient"
1527                    },
1528                    "include_state": true
1529                }"#
1530                    .to_owned()
1531                    .replace(|c: char| c.is_whitespace(), ""),
1532                ),
1533            ),
1534            // 2. Server confirms subscription
1535            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1536                r#"
1537                {
1538                    "method":"newsubscription",
1539                    "extractor_id":{
1540                        "chain":"ethereum",
1541                        "name":"vm:ambient"
1542                    },
1543                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1544                }"#
1545                .to_owned()
1546                .replace(|c: char| c.is_whitespace(), ""),
1547            )),
1548            // 3. Server sends first message (fills buffer)
1549            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1550                r#"
1551                {
1552                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1553                    "deltas": {
1554                        "extractor": "vm:ambient",
1555                        "chain": "ethereum",
1556                        "block": {
1557                            "number": 123,
1558                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1559                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1560                            "chain": "ethereum",
1561                            "ts": "2023-09-14T00:00:00"
1562                        },
1563                        "finalized_block_height": 0,
1564                        "revert": false,
1565                        "new_tokens": {},
1566                        "account_updates": {},
1567                        "state_updates": {},
1568                        "new_protocol_components": {},
1569                        "deleted_protocol_components": {},
1570                        "component_balances": {},
1571                        "account_balances": {},
1572                        "component_tvl": {},
1573                        "dci_update": {
1574                            "new_entrypoints": {},
1575                            "new_entrypoint_params": {},
1576                            "trace_results": {}
1577                        }
1578                    }
1579                }
1580                "#.to_owned()
1581            )),
1582            // 4. Server sends second message (triggers buffer overflow and force unsubscribe)
1583            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1584                r#"
1585                {
1586                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1587                    "deltas": {
1588                        "extractor": "vm:ambient",
1589                        "chain": "ethereum",
1590                        "block": {
1591                            "number": 124,
1592                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
1593                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1594                            "chain": "ethereum",
1595                            "ts": "2023-09-14T00:00:01"
1596                        },
1597                        "finalized_block_height": 0,
1598                        "revert": false,
1599                        "new_tokens": {},
1600                        "account_updates": {},
1601                        "state_updates": {},
1602                        "new_protocol_components": {},
1603                        "deleted_protocol_components": {},
1604                        "component_balances": {},
1605                        "account_balances": {},
1606                        "component_tvl": {},
1607                        "dci_update": {
1608                            "new_entrypoints": {},
1609                            "new_entrypoint_params": {},
1610                            "trace_results": {}
1611                        }
1612                    }
1613                }
1614                "#.to_owned()
1615            )),
1616            // 5. Expect unsubscribe command due to buffer full
1617            ExpectedComm::Receive(
1618                100,
1619                tungstenite::protocol::Message::Text(
1620                    r#"
1621                {
1622                    "method": "unsubscribe",
1623                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1624                }
1625                "#
1626                    .to_owned()
1627                    .replace(|c: char| c.is_whitespace(), ""),
1628                ),
1629            ),
1630            // 6. Server confirms unsubscription
1631            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1632                r#"
1633                {
1634                    "method": "subscriptionended",
1635                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1636                }
1637                "#
1638                .to_owned()
1639                .replace(|c: char| c.is_whitespace(), ""),
1640            )),
1641        ]
1642        };
1643
1644        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1645
1646        // Create client with very small buffer size (1) to easily trigger BufferFull
1647        let client = WsDeltasClient::new_with_custom_buffers(
1648            &format!("ws://{addr}"),
1649            None,
1650            128, // ws_buffer_size
1651            1,   // subscription_buffer_size - this will trigger BufferFull easily
1652        )
1653        .unwrap();
1654
1655        let jh = client
1656            .connect()
1657            .await
1658            .expect("connect failed");
1659
1660        let (_sub_id, mut rx) = timeout(
1661            Duration::from_millis(100),
1662            client.subscribe(
1663                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1664                SubscriptionOptions::new(),
1665            ),
1666        )
1667        .await
1668        .expect("subscription timed out")
1669        .expect("subscription failed");
1670
1671        // Allow time for messages to be processed and buffer to fill up
1672        tokio::time::sleep(Duration::from_millis(100)).await;
1673
1674        // Collect all messages until channel closes or we get a reasonable number
1675        let mut received_msgs = Vec::new();
1676
1677        // Use a single longer timeout to collect messages until channel closes
1678        while received_msgs.len() < 3 {
1679            match timeout(Duration::from_millis(200), rx.recv()).await {
1680                Ok(Some(msg)) => {
1681                    received_msgs.push(msg);
1682                }
1683                Ok(None) => {
1684                    // Channel closed - this is what we expect after buffer overflow
1685                    break;
1686                }
1687                Err(_) => {
1688                    // Timeout - no more messages coming
1689                    break;
1690                }
1691            }
1692        }
1693
1694        // Verify the key behavior: buffer overflow should limit messages and close channel
1695        assert!(
1696            received_msgs.len() <= 1,
1697            "Expected buffer overflow to limit messages to at most 1, got {}",
1698            received_msgs.len()
1699        );
1700
1701        if let Some(first_msg) = received_msgs.first() {
1702            assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
1703        }
1704
1705        // Test passed! The key behavior we're testing (buffer full causes force unsubscribe) has
1706        // been verified We don't need to explicitly close the client as it will be cleaned
1707        // up when dropped
1708
1709        // Just wait for the tasks to finish cleanly
1710        drop(rx); // Explicitly drop the receiver
1711        tokio::time::sleep(Duration::from_millis(50)).await;
1712
1713        // Abort the tasks to clean up
1714        jh.abort();
1715        server_thread.abort();
1716
1717        let _ = jh.await;
1718        let _ = server_thread.await;
1719    }
1720
1721    #[tokio::test]
1722    async fn test_server_error_handling() {
1723        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1724
1725        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1726
1727        // Test ExtractorNotFound error
1728        let error_response = WebSocketMessage::Response(Response::Error(
1729            WebsocketError::ExtractorNotFound(extractor_id.clone()),
1730        ));
1731        let error_json = serde_json::to_string(&error_response).unwrap();
1732
1733        let exp_comm = [
1734            ExpectedComm::Receive(
1735                100,
1736                tungstenite::protocol::Message::Text(
1737                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1738                ),
1739            ),
1740            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1741        ];
1742
1743        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1744
1745        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1746        let jh = client
1747            .connect()
1748            .await
1749            .expect("connect failed");
1750
1751        let result = timeout(
1752            Duration::from_millis(100),
1753            client.subscribe(extractor_id, SubscriptionOptions::new()),
1754        )
1755        .await
1756        .expect("subscription timed out");
1757
1758        // Verify that we get a ServerError
1759        assert!(result.is_err());
1760        if let Err(DeltasError::ServerError(msg, _)) = result {
1761            assert!(msg.contains("Subscription failed"));
1762            assert!(msg.contains("Extractor not found"));
1763        } else {
1764            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1765        }
1766
1767        timeout(Duration::from_millis(100), client.close())
1768            .await
1769            .expect("close timed out")
1770            .expect("close failed");
1771        jh.await
1772            .expect("ws loop errored")
1773            .unwrap();
1774        server_thread.await.unwrap();
1775    }
1776
1777    #[test_log::test(tokio::test)]
1778    async fn test_subscription_not_found_error() {
1779        // Test scenario: Server restart causes subscription loss
1780        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1781
1782        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1783        let subscription_id = Uuid::new_v4();
1784
1785        let error_response = WebSocketMessage::Response(Response::Error(
1786            WebsocketError::SubscriptionNotFound(subscription_id),
1787        ));
1788        let error_json = serde_json::to_string(&error_response).unwrap();
1789
1790        let exp_comm = [
1791            // 1. Client subscribes successfully
1792            ExpectedComm::Receive(
1793                100,
1794                tungstenite::protocol::Message::Text(
1795                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1796                ),
1797            ),
1798            ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
1799                r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"test_extractor"}},"subscription_id":"{}"}}"#,
1800                subscription_id
1801            ))),
1802            // 2. Client tries to unsubscribe (server has "restarted" and lost subscription)
1803            ExpectedComm::Receive(
1804                100,
1805                tungstenite::protocol::Message::Text(format!(
1806                    r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
1807                    subscription_id
1808                )),
1809            ),
1810            // 3. Server responds with SubscriptionNotFound (simulating server restart)
1811            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1812        ];
1813
1814        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1815
1816        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1817        let jh = client
1818            .connect()
1819            .await
1820            .expect("connect failed");
1821
1822        // Subscribe successfully
1823        let (received_sub_id, _rx) = timeout(
1824            Duration::from_millis(100),
1825            client.subscribe(extractor_id, SubscriptionOptions::new()),
1826        )
1827        .await
1828        .expect("subscription timed out")
1829        .expect("subscription failed");
1830
1831        assert_eq!(received_sub_id, subscription_id);
1832
1833        // Now try to unsubscribe - this should fail because server "restarted"
1834        let unsubscribe_result =
1835            timeout(Duration::from_millis(100), client.unsubscribe(subscription_id))
1836                .await
1837                .expect("unsubscribe timed out");
1838
1839        // The unsubscribe should handle the SubscriptionNotFound error gracefully
1840        // In this case, the client should treat it as successful since the subscription
1841        // is effectively gone (whether due to server restart or other reasons)
1842        unsubscribe_result
1843            .expect("Unsubscribe should succeed even if server says subscription not found");
1844
1845        timeout(Duration::from_millis(100), client.close())
1846            .await
1847            .expect("close timed out")
1848            .expect("close failed");
1849        jh.await
1850            .expect("ws loop errored")
1851            .unwrap();
1852        server_thread.await.unwrap();
1853    }
1854
1855    #[test_log::test(tokio::test)]
1856    async fn test_parse_error_handling() {
1857        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1858
1859        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1860        let error_response = WebSocketMessage::Response(Response::Error(
1861            WebsocketError::ParseError("}2sdf".to_string(), "malformed JSON".to_string()),
1862        ));
1863        let error_json = serde_json::to_string(&error_response).unwrap();
1864
1865        let exp_comm = [
1866            // subscribe first so connect can finish successfully
1867            ExpectedComm::Receive(
1868                100,
1869                tungstenite::protocol::Message::Text(
1870                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1871                ),
1872            ),
1873            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json))
1874        ];
1875
1876        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1877
1878        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1879        let jh = client
1880            .connect()
1881            .await
1882            .expect("connect failed");
1883
1884        // Subscribe successfully
1885        let _ = timeout(
1886            Duration::from_millis(100),
1887            client.subscribe(extractor_id, SubscriptionOptions::new()),
1888        )
1889        .await
1890        .expect("subscription timed out");
1891
1892        // The client should receive the parse error and close the connection
1893        let result = jh
1894            .await
1895            .expect("ws loop should complete");
1896        assert!(result.is_err());
1897        if let Err(DeltasError::ServerError(message, _)) = result {
1898            assert!(message.contains("Server failed to parse client message"));
1899        } else {
1900            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1901        }
1902
1903        server_thread.await.unwrap();
1904    }
1905
1906    #[tokio::test]
1907    async fn test_subscribe_error_handling() {
1908        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1909
1910        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "failing_extractor");
1911
1912        let error_response = WebSocketMessage::Response(Response::Error(
1913            WebsocketError::SubscribeError(extractor_id.clone()),
1914        ));
1915        let error_json = serde_json::to_string(&error_response).unwrap();
1916
1917        let exp_comm = [
1918            ExpectedComm::Receive(
1919                100,
1920                tungstenite::protocol::Message::Text(
1921                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"failing_extractor"},"include_state":true}"#.to_string()
1922                ),
1923            ),
1924            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1925        ];
1926
1927        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1928
1929        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1930        let jh = client
1931            .connect()
1932            .await
1933            .expect("connect failed");
1934
1935        let result = timeout(
1936            Duration::from_millis(100),
1937            client.subscribe(extractor_id, SubscriptionOptions::new()),
1938        )
1939        .await
1940        .expect("subscription timed out");
1941
1942        // Verify that we get a ServerError for subscribe failure
1943        assert!(result.is_err());
1944        if let Err(DeltasError::ServerError(msg, _)) = result {
1945            assert!(msg.contains("Subscription failed"));
1946            assert!(msg.contains("Failed to subscribe to extractor"));
1947        } else {
1948            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1949        }
1950
1951        timeout(Duration::from_millis(100), client.close())
1952            .await
1953            .expect("close timed out")
1954            .expect("close failed");
1955        jh.await
1956            .expect("ws loop errored")
1957            .unwrap();
1958        server_thread.await.unwrap();
1959    }
1960
1961    #[tokio::test]
1962    async fn test_cancel_pending_subscription() {
1963        // This test verifies that pending subscriptions are properly cancelled when errors occur
1964        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1965
1966        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1967
1968        let error_response = WebSocketMessage::Response(Response::Error(
1969            WebsocketError::ExtractorNotFound(extractor_id.clone()),
1970        ));
1971        let error_json = serde_json::to_string(&error_response).unwrap();
1972
1973        let exp_comm = [
1974            ExpectedComm::Receive(
1975                100,
1976                tungstenite::protocol::Message::Text(
1977                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1978                ),
1979            ),
1980            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1981        ];
1982
1983        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1984
1985        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1986        let jh = client
1987            .connect()
1988            .await
1989            .expect("connect failed");
1990
1991        // Start two subscription attempts simultaneously
1992        let client_clone = client.clone();
1993        let extractor_id_clone = extractor_id.clone();
1994
1995        let subscription1 = tokio::spawn({
1996            let client_for_spawn = client.clone();
1997            async move {
1998                client_for_spawn
1999                    .subscribe(extractor_id, SubscriptionOptions::new())
2000                    .await
2001            }
2002        });
2003
2004        let subscription2 = tokio::spawn(async move {
2005            // This should fail because there's already a pending subscription
2006            client_clone
2007                .subscribe(extractor_id_clone, SubscriptionOptions::new())
2008                .await
2009        });
2010
2011        let (result1, result2) = tokio::join!(subscription1, subscription2);
2012
2013        let result1 = result1.unwrap();
2014        let result2 = result2.unwrap();
2015
2016        // One should fail due to ExtractorNotFound error from server
2017        // The other should fail due to SubscriptionAlreadyPending
2018        assert!(result1.is_err() || result2.is_err());
2019
2020        if let Err(DeltasError::SubscriptionAlreadyPending) = result2 {
2021            // This is expected for the second subscription
2022        } else if let Err(DeltasError::ServerError(_, _)) = result1 {
2023            // This is expected for the first subscription that gets the server error
2024        } else {
2025            panic!("Expected one SubscriptionAlreadyPending and one ServerError");
2026        }
2027
2028        timeout(Duration::from_millis(100), client.close())
2029            .await
2030            .expect("close timed out")
2031            .expect("close failed");
2032        jh.await
2033            .expect("ws loop errored")
2034            .unwrap();
2035        server_thread.await.unwrap();
2036    }
2037}