tycho_client/
deltas.rs

1//! # Deltas Client
2//!
3//! This module focuses on implementing the Real-Time Deltas client for the Tycho Indexer service.
4//! Utilizing this client facilitates efficient, instant communication with the indexing service,
5//! promoting seamless data synchronization.
6//!
7//! ## Websocket Implementation
8//!
9//! The present WebSocket implementation is cloneable, which enables it to be shared
10//! across multiple asynchronous tasks without creating separate instances for each task. This
11//! unique feature boosts efficiency as it:
12//!
13//! - **Reduces Server Load:** By maintaining a single universal client, the load on the server is
14//!   significantly reduced. This is because fewer connections are made to the server, preventing it
15//!   from getting overwhelmed by numerous simultaneous requests.
//! - **Conserves Resource Usage:** A single shared client requires fewer system resources than
//!   multiple clients instantiated and used separately, as each websocket connection adds
//!   overhead for handshakes and messages.
19//!
20//! Therefore, sharing one client among multiple tasks ensures optimal performance, reduces resource
21//! consumption, and enhances overall software scalability.
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::{
25        atomic::{AtomicBool, Ordering},
26        Arc,
27    },
28    time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34    header::{
35        AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36        USER_AGENT,
37    },
38    Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44    net::TcpStream,
45    sync::{
46        mpsc::{self, error::TrySendError, Receiver, Sender},
47        oneshot, Mutex, Notify,
48    },
49    task::JoinHandle,
50    time::sleep,
51};
52use tokio_tungstenite::{
53    connect_async,
54    tungstenite::{
55        self,
56        handshake::client::{generate_key, Request},
57    },
58    MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{
62    BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage, WebsocketError,
63};
64use uuid::Uuid;
65
66use crate::TYCHO_SERVER_VERSION;
67
/// Errors returned by the deltas client.
#[derive(Error, Debug)]
pub enum DeltasError {
    /// Failed to parse the provided URI.
    ///
    /// Holds the offending URI string followed by a description of the parse failure.
    #[error("Failed to parse URI: {0}. Error: {1}")]
    UriParsing(String, String),

    /// The requested subscription is already pending and is awaiting confirmation from the server.
    #[error("The requested subscription is already pending")]
    SubscriptionAlreadyPending,

    /// The server replied with an error message.
    ///
    /// The first field is a human-readable description, the second is the error received from
    /// the server, exposed as this error's source.
    #[error("The server replied with an error: {0}")]
    ServerError(String, #[source] WebsocketError),

    /// A message failed to send via an internal channel or through the websocket channel.
    /// This is typically a fatal error and might indicate a bug in the implementation.
    #[error("{0}")]
    TransportError(String),

    /// The internal message buffer is full. This likely means that messages are not being consumed
    /// fast enough. If the incoming load emits messages in bursts, consider increasing the buffer
    /// size.
    #[error("The buffer is full!")]
    BufferFull,

    /// The client has no active connections but was accessed (e.g., by calling subscribe).
    /// This typically occurs when trying to use the client before calling connect() or
    /// after the connection has been closed.
    #[error("The client is not connected!")]
    NotConnected,

    /// The connect method was called while the client already had an active connection.
    #[error("The client is already connected!")]
    AlreadyConnected,

    /// The connection was closed orderly by the server, e.g. because it restarted.
    #[error("The server closed the connection!")]
    ConnectionClosed,

    /// The connection was closed unexpectedly by the server or encountered a network error.
    #[error("Connection error: {0}")]
    ConnectionError(#[from] Box<tungstenite::Error>),

    /// A fatal error occurred that cannot be recovered from.
    #[error("Tycho FatalError: {0}")]
    Fatal(String),
}
114
/// Options that tune a single subscription request.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Value forwarded as `include_state` in the subscribe command. Defaults to `true`.
    include_state: bool,
}

impl Default for SubscriptionOptions {
    /// Returns options with `include_state` enabled.
    fn default() -> Self {
        SubscriptionOptions { include_state: true }
    }
}

impl SubscriptionOptions {
    /// Creates the default set of options; equivalent to [`SubscriptionOptions::default`].
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Consumes the options and returns them with `include_state` set to `val`.
    pub fn with_state(self, val: bool) -> Self {
        let mut updated = self;
        updated.include_state = val;
        updated
    }
}
135
/// Interface of a client that streams realtime block changes from Tycho.
///
/// A mock implementation is generated for tests via `automock`.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait DeltasClient {
    /// Subscribe to an extractor and receive realtime messages.
    ///
    /// Will request a subscription from tycho and wait for confirmation of it. If the caller
    /// cancels while waiting for confirmation the subscription may still be registered. If the
    /// receiver was deallocated though, the first message from the subscription will remove it
    /// again - since there is no one to inform about these messages.
    ///
    /// On success returns the subscription id together with the receiving end of the channel
    /// on which the subscription's `BlockChanges` messages are delivered.
    async fn subscribe(
        &self,
        extractor_id: ExtractorIdentity,
        options: SubscriptionOptions,
    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;

    /// Unsubscribe from a subscription identified by `subscription_id`.
    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;

    /// Start the client's message handling loop.
    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;

    /// Close the client's message handling loop.
    async fn close(&self) -> Result<(), DeltasError>;
}
160
/// Websocket implementation of the deltas client.
///
/// The struct is `Clone`; clones share `conn_notify`, `inner` and `dead` through `Arc`s, so all
/// clones operate on the same connection state.
#[derive(Clone)]
pub struct WsDeltasClient {
    /// The tycho indexer websocket uri.
    uri: Uri,
    /// Authorization key for the websocket connection.
    auth_key: Option<String>,
    /// Maximum amount of reconnects to try before giving up.
    max_reconnects: u64,
    /// Duration to wait before attempting to reconnect.
    retry_cooldown: Duration,
    /// The client will buffer this many messages incoming from the websocket
    /// before starting to drop them.
    ws_buffer_size: usize,
    /// The client will buffer this many messages for each subscription before it starts dropping
    /// them.
    subscription_buffer_size: usize,
    /// Notify tasks waiting for a connection to be established.
    conn_notify: Arc<Notify>,
    /// Shared client instance state. `None` while there is no connection.
    inner: Arc<Mutex<Option<Inner>>>,
    /// If set the client has exhausted its reconnection attempts.
    dead: Arc<AtomicBool>,
}
184
/// Write half of the split websocket stream, used to send messages to the server.
type WebSocketSink =
    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
187
/// Subscription State
///
/// Subscriptions go through a lifecycle:
///
/// ```text
/// O ---> requested subscribe ----> active ----> requested unsub ---> ended
/// ```
///
/// We use oneshot channels to inform the client struct about when these transitions happened.
/// E.g. for `subscribe` to finish, we want the state to have transitioned to `active`, and
/// similarly for `unsubscribe`.
#[derive(Debug)]
enum SubscriptionInfo {
    /// Subscription was requested; we wait for server confirmation and uuid assignment.
    ///
    /// The sender is used to deliver either the assigned id and message receiver, or the error
    /// that caused the request to fail.
    RequestedSubscription(oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>),
    /// Subscription is active.
    Active,
    /// Unsubscription was requested; we wait for server confirmation.
    ///
    /// The sender is signalled once the subscription has been removed.
    RequestedUnsubscription(oneshot::Sender<()>),
}
208
/// Internal struct containing the state shared between `WsDeltasClient` instances.
struct Inner {
    /// Websocket sender handle.
    sink: WebSocketSink,
    /// Command channel sender handle.
    cmd_tx: Sender<()>,
    /// Currently pending subscriptions, keyed by the extractor they were requested for.
    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
    /// Active subscriptions, keyed by the subscription id assigned by the server.
    subscriptions: HashMap<Uuid, SubscriptionInfo>,
    /// For each subscription we keep a sender handle, the receiver is returned to the caller of
    /// subscribe.
    sender: HashMap<Uuid, Sender<BlockChanges>>,
    /// How many messages to buffer per subscription before starting to drop new messages.
    buffer_size: usize,
}
225
/// Shared state between all client instances.
///
/// This state is behind a mutex and requires synchronization to be read or modified.
229impl Inner {
230    fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
231        Self {
232            sink,
233            cmd_tx,
234            pending: HashMap::new(),
235            subscriptions: HashMap::new(),
236            sender: HashMap::new(),
237            buffer_size,
238        }
239    }
240
241    /// Registers a new pending subscription.
242    #[allow(clippy::result_large_err)]
243    fn new_subscription(
244        &mut self,
245        id: &ExtractorIdentity,
246        ready_tx: oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>,
247    ) -> Result<(), DeltasError> {
248        if self.pending.contains_key(id) {
249            return Err(DeltasError::SubscriptionAlreadyPending);
250        }
251        self.pending
252            .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
253        Ok(())
254    }
255
256    /// Transitions a pending subscription to active.
257    ///
258    /// Will ignore any request to do so for subscriptions that are not pending.
259    fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
260        if let Some(info) = self.pending.remove(extractor_id) {
261            if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
262                let (tx, rx) = mpsc::channel(self.buffer_size);
263                self.sender.insert(subscription_id, tx);
264                self.subscriptions
265                    .insert(subscription_id, SubscriptionInfo::Active);
266                let _ = ready_tx
267                    .send(Ok((subscription_id, rx)))
268                    .map_err(|_| {
269                        warn!(
270                            ?extractor_id,
271                            ?subscription_id,
272                            "Subscriber for has gone away. Ignoring."
273                        )
274                    });
275            } else {
276                error!(
277                    ?extractor_id,
278                    ?subscription_id,
279                    "Pending subscription was not in the correct state to 
280                    transition to active. Ignoring!"
281                )
282            }
283        } else {
284            error!(
285                ?extractor_id,
286                ?subscription_id,
287                "Tried to mark an unknown subscription as active. Ignoring!"
288            );
289        }
290    }
291
292    /// Sends a message to a subscription's receiver.
293    #[allow(clippy::result_large_err)]
294    fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
295        if let Some(sender) = self.sender.get_mut(id) {
296            sender
297                .try_send(msg)
298                .map_err(|e| match e {
299                    TrySendError::Full(_) => DeltasError::BufferFull,
300                    TrySendError::Closed(_) => {
301                        DeltasError::TransportError("The subscriber has gone away".to_string())
302                    }
303                })?;
304        }
305        Ok(())
306    }
307
308    /// Requests a subscription to end.
309    ///
310    /// The subscription needs to exist and be active for this to have any effect. Wll use
311    /// `ready_tx` to notify the receiver once the transition to ended completed.
312    fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
313        if let Some(info) = self
314            .subscriptions
315            .get_mut(subscription_id)
316        {
317            if let SubscriptionInfo::Active = info {
318                *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
319            }
320        } else {
321            // no big deal imo so only debug lvl...
322            debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
323        }
324    }
325
326    /// Removes and fully ends a subscription
327    ///
328    /// Any calls for non-existing subscriptions will be simply ignored. May panic on internal state
329    /// inconsistencies: e.g. if the subscription exists but there is no sender for it.
330    /// Will remove a subscription even it was in active or pending state before, this is to support
331    /// any server side failure of the subscription.
332    fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
333        if let Entry::Occupied(e) = self
334            .subscriptions
335            .entry(subscription_id)
336        {
337            let info = e.remove();
338            if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
339                let _ = tx.send(()).map_err(|_| {
340                    debug!(?subscription_id, "failed to notify about removed subscription")
341                });
342                self.sender
343                    .remove(&subscription_id)
344                    .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
345            } else {
346                warn!(?subscription_id, "Subscription ended unexpectedly!");
347                self.sender
348                    .remove(&subscription_id)
349                    .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
350            }
351        } else {
352            // TODO: There is a race condition that can trigger multiple unsubscribes
353            //  if server doesn't respond quickly enough leading to some ugly logs but
354            //  doesn't affect behaviour negatively. E.g. BufferFull and multiple
355            //  messages from the ws connection are queued.
356            trace!(
357                ?subscription_id,
358                "Received `SubscriptionEnded`, but was never subscribed to it. This is likely a bug!"
359            );
360        }
361
362        Ok(())
363    }
364
365    fn cancel_pending(&mut self, extractor_id: &ExtractorIdentity, error: &WebsocketError) {
366        if let Some(sub_info) = self.pending.remove(extractor_id) {
367            match sub_info {
368                SubscriptionInfo::RequestedSubscription(tx) => {
369                    let _ = tx
370                        .send(Err(DeltasError::ServerError(
371                            format!("Subscription failed: {error}"),
372                            error.clone(),
373                        )))
374                        .map_err(|_| debug!("Cancel pending failed: receiver deallocated!"));
375                }
376                _ => {
377                    error!(?extractor_id, "Pending subscription in wrong state")
378                }
379            }
380        } else {
381            debug!(?extractor_id, "Tried cancel on non-existent pending subscription!")
382        }
383    }
384
385    /// Sends a message through the websocket.
386    async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
387        self.sink.send(msg).await.map_err(|e| {
388            DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
389        })
390    }
391}
392
393/// Tycho client websocket implementation.
394impl WsDeltasClient {
395    // Construct a new client with 5 reconnection attempts.
396    #[allow(clippy::result_large_err)]
397    pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
398        let uri = ws_uri
399            .parse::<Uri>()
400            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
401        Ok(Self {
402            uri,
403            auth_key: auth_key.map(|s| s.to_string()),
404            inner: Arc::new(Mutex::new(None)),
405            ws_buffer_size: 128,
406            subscription_buffer_size: 128,
407            conn_notify: Arc::new(Notify::new()),
408            max_reconnects: 5,
409            retry_cooldown: Duration::from_millis(500),
410            dead: Arc::new(AtomicBool::new(false)),
411        })
412    }
413
414    // Construct a new client with a custom number of reconnection attempts.
415    #[allow(clippy::result_large_err)]
416    pub fn new_with_reconnects(
417        ws_uri: &str,
418        auth_key: Option<&str>,
419        max_reconnects: u64,
420        retry_cooldown: Duration,
421    ) -> Result<Self, DeltasError> {
422        let uri = ws_uri
423            .parse::<Uri>()
424            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
425
426        Ok(Self {
427            uri,
428            auth_key: auth_key.map(|s| s.to_string()),
429            inner: Arc::new(Mutex::new(None)),
430            ws_buffer_size: 128,
431            subscription_buffer_size: 128,
432            conn_notify: Arc::new(Notify::new()),
433            max_reconnects,
434            retry_cooldown,
435            dead: Arc::new(AtomicBool::new(false)),
436        })
437    }
438
439    // Construct a new client with custom buffer sizes (for testing)
440    #[cfg(test)]
441    #[allow(clippy::result_large_err)]
442    pub fn new_with_custom_buffers(
443        ws_uri: &str,
444        auth_key: Option<&str>,
445        ws_buffer_size: usize,
446        subscription_buffer_size: usize,
447    ) -> Result<Self, DeltasError> {
448        let uri = ws_uri
449            .parse::<Uri>()
450            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
451        Ok(Self {
452            uri,
453            auth_key: auth_key.map(|s| s.to_string()),
454            inner: Arc::new(Mutex::new(None)),
455            ws_buffer_size,
456            subscription_buffer_size,
457            conn_notify: Arc::new(Notify::new()),
458            max_reconnects: 5,
459            retry_cooldown: Duration::from_millis(0),
460            dead: Arc::new(AtomicBool::new(false)),
461        })
462    }
463
464    /// Ensures that the client is connected.
465    ///
466    /// This method will acquire the lock for inner.
467    async fn is_connected(&self) -> bool {
468        let guard = self.inner.as_ref().lock().await;
469        guard.is_some()
470    }
471
472    /// Waits for the client to be connected
473    ///
474    /// This method acquires the lock for inner for a short period, then waits until the  
475    /// connection is established if not already connected.
476    async fn ensure_connection(&self) -> Result<(), DeltasError> {
477        if self.dead.load(Ordering::SeqCst) {
478            return Err(DeltasError::NotConnected)
479        };
480        if !self.is_connected().await {
481            self.conn_notify.notified().await;
482        };
483        Ok(())
484    }
485
    /// Main message handling logic
    ///
    /// Processes a single item received from the websocket while holding the lock on `inner`
    /// for the whole call:
    ///
    /// * Text frames are decoded into a `WebSocketMessage` and dispatched: block changes are
    ///   forwarded to the matching subscription, subscription confirmations / terminations
    ///   update the subscription bookkeeping and server errors cancel or remove the affected
    ///   subscription. Frames that fail to decode are logged and skipped.
    /// * Ping frames are answered with a pong; pong frames are ignored.
    /// * A close frame yields `DeltasError::ConnectionClosed`.
    /// * Transport errors are mapped to `ConnectionClosed`, `ConnectionError` or `Fatal`.
    ///
    /// Returns `DeltasError::NotConnected` if a message requires the shared state but it is
    /// not set.
    ///
    /// If the message returns an error, a reconnect attempt may be considered depending on the
    /// error type.
    #[instrument(skip(self, msg))]
    async fn handle_msg(
        &self,
        msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
    ) -> Result<(), DeltasError> {
        // The guard lives until the end of the function, so the shared state stays locked while
        // the message is processed.
        let mut guard = self.inner.lock().await;

        match msg {
            // We do not deserialize the message directly into a WebSocketMessage. This is because
            // the serde arbitrary_precision feature (often included in many
            // dependencies we use) breaks some untagged enum deserializations. Instead,
            // we deserialize the message into a serde_json::Value and convert that into a
            // WebSocketMessage. For more info on this issue, see:
            // https://github.com/serde-rs/json/issues/740
            Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
                serde_json::Value,
            >(&text)
            {
                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
                    Ok(ws_message) => match ws_message {
                        WebSocketMessage::BlockChanges { subscription_id, deltas } => {
                            trace!(?deltas, "Received a block state change, sending to channel");
                            let inner = guard
                                .as_mut()
                                .ok_or_else(|| DeltasError::NotConnected)?;
                            // Any delivery failure ends the subscription: either the consumer is
                            // too slow (buffer full) or its receiver no longer exists.
                            match inner.send(&subscription_id, deltas) {
                                Err(DeltasError::BufferFull) => {
                                    error!(?subscription_id, "Buffer full, unsubscribing!");
                                    Self::force_unsubscribe(subscription_id, inner).await;
                                }
                                Err(_) => {
                                    warn!(
                                        ?subscription_id,
                                        "Receiver for has gone away, unsubscribing!"
                                    );
                                    Self::force_unsubscribe(subscription_id, inner).await;
                                }
                                _ => { /* Do nothing */ }
                            }
                        }
                        WebSocketMessage::Response(Response::NewSubscription {
                            extractor_id,
                            subscription_id,
                        }) => {
                            info!(?extractor_id, ?subscription_id, "Received a new subscription");
                            let inner = guard
                                .as_mut()
                                .ok_or_else(|| DeltasError::NotConnected)?;
                            inner.mark_active(&extractor_id, subscription_id);
                        }
                        WebSocketMessage::Response(Response::SubscriptionEnded {
                            subscription_id,
                        }) => {
                            info!(?subscription_id, "Received a subscription ended");
                            let inner = guard
                                .as_mut()
                                .ok_or_else(|| DeltasError::NotConnected)?;
                            inner.remove_subscription(subscription_id)?;
                        }
                        WebSocketMessage::Response(Response::Error(error)) => match &error {
                            // The requested extractor does not exist: fail the pending request.
                            WebsocketError::ExtractorNotFound(extractor_id) => {
                                let inner = guard
                                    .as_mut()
                                    .ok_or_else(|| DeltasError::NotConnected)?;
                                inner.cancel_pending(extractor_id, &error);
                            }
                            WebsocketError::SubscriptionNotFound(subscription_id) => {
                                debug!("Received subscription not found, removing subscription");
                                let inner = guard
                                    .as_mut()
                                    .ok_or_else(|| DeltasError::NotConnected)?;
                                inner.remove_subscription(*subscription_id)?;
                            }
                            // The server could not parse something we sent; surfaced to the
                            // caller as an error.
                            WebsocketError::ParseError(raw, e) => {
                                return Err(DeltasError::ServerError(
                                    format!(
                                        "Server failed to parse client message: {e}, msg: {raw}"
                                    ),
                                    error.clone(),
                                ))
                            }
                            WebsocketError::SubscribeError(extractor_id) => {
                                let inner = guard
                                    .as_mut()
                                    .ok_or_else(|| DeltasError::NotConnected)?;
                                inner.cancel_pending(extractor_id, &error);
                            }
                        },
                    },
                    // Valid JSON that is not a known message: log and carry on.
                    Err(e) => {
                        error!(
                            "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
                            e, text
                        );
                    }
                },
                // Not valid JSON at all: log and carry on.
                Err(e) => {
                    error!(
                        "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
                        e, text
                    );
                }
            },
            Ok(tungstenite::protocol::Message::Ping(_)) => {
                // Respond to pings with pongs.
                let inner = guard
                    .as_mut()
                    .ok_or_else(|| DeltasError::NotConnected)?;
                // A failed pong is only logged; it does not fail message handling.
                if let Err(error) = inner
                    .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
                    .await
                {
                    debug!(?error, "Failed to send pong!");
                }
            }
            Ok(tungstenite::protocol::Message::Pong(_)) => {
                // Do nothing.
            }
            Ok(tungstenite::protocol::Message::Close(_)) => {
                return Err(DeltasError::ConnectionClosed);
            }
            Ok(unknown_msg) => {
                info!("Received an unknown message type: {:?}", unknown_msg);
            }
            Err(error) => {
                error!(?error, "Websocket error");
                // Translate the transport error into the client's error type.
                return Err(match error {
                    tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
                    tungstenite::Error::AlreadyClosed => {
                        warn!("Received AlreadyClosed error which is indicative of a bug!");
                        DeltasError::ConnectionError(Box::new(error))
                    }
                    tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
                        DeltasError::ConnectionError(Box::new(error))
                    }
                    _ => DeltasError::Fatal(error.to_string()),
                });
            }
        };
        Ok(())
    }
629
630    /// Forcefully ends a (client) stream by unsubscribing.
631    ///
632    /// Is used only if the message can't be processed due to an error that might resolve
633    /// itself by resubscribing.
634    async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
635        // avoid unsubscribing multiple times
636        if let Some(SubscriptionInfo::RequestedUnsubscription(_)) = inner
637            .subscriptions
638            .get(&subscription_id)
639        {
640            return
641        }
642
643        let (tx, rx) = oneshot::channel();
644        if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
645            warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
646        } else {
647            // Wait for unsubscribe completion with timeout
648            match tokio::time::timeout(Duration::from_secs(5), rx).await {
649                Ok(_) => {
650                    debug!(?subscription_id, "Unsubscribe completed successfully");
651                }
652                Err(_) => {
653                    warn!(?subscription_id, "Unsubscribe completion timed out");
654                }
655            }
656        }
657    }
658
659    /// Helper method to force an unsubscription
660    ///
661    /// This method expects to receive a mutable reference to `Inner` so it does not acquire a
662    /// lock. Used for normal unsubscribes as well to remove any subscriptions with deallocated
663    /// receivers.
664    async fn unsubscribe_inner(
665        inner: &mut Inner,
666        subscription_id: Uuid,
667        ready_tx: oneshot::Sender<()>,
668    ) -> Result<(), DeltasError> {
669        debug!(?subscription_id, "Unsubscribing");
670        inner.end_subscription(&subscription_id, ready_tx);
671        let cmd = Command::Unsubscribe { subscription_id };
672        inner
673            .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
674                |e| {
675                    DeltasError::TransportError(format!(
676                        "Failed to serialize unsubscribe command: {e}"
677                    ))
678                },
679            )?))
680            .await?;
681        Ok(())
682    }
683}
684
685#[async_trait]
686impl DeltasClient for WsDeltasClient {
687    #[instrument(skip(self))]
688    async fn subscribe(
689        &self,
690        extractor_id: ExtractorIdentity,
691        options: SubscriptionOptions,
692    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
693        trace!("Starting subscribe");
694        self.ensure_connection().await?;
695        let (ready_tx, ready_rx) = oneshot::channel();
696        {
697            let mut guard = self.inner.lock().await;
698            let inner = guard
699                .as_mut()
700                .ok_or_else(|| DeltasError::NotConnected)?;
701            trace!("Sending subscribe command");
702            inner.new_subscription(&extractor_id, ready_tx)?;
703            let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
704            inner
705                .ws_send(tungstenite::protocol::Message::Text(
706                    serde_json::to_string(&cmd).map_err(|e| {
707                        DeltasError::TransportError(format!(
708                            "Failed to serialize subscribe command: {e}"
709                        ))
710                    })?,
711                ))
712                .await?;
713        }
714        trace!("Waiting for subscription response");
715        let res = ready_rx.await.map_err(|_| {
716            DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
717        })??;
718        trace!("Subscription successful");
719        Ok(res)
720    }
721
722    #[instrument(skip(self))]
723    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
724        self.ensure_connection().await?;
725        let (ready_tx, ready_rx) = oneshot::channel();
726        {
727            let mut guard = self.inner.lock().await;
728            let inner = guard
729                .as_mut()
730                .ok_or_else(|| DeltasError::NotConnected)?;
731
732            WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
733        }
734        ready_rx.await.map_err(|_| {
735            DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
736        })?;
737
738        Ok(())
739    }
740
741    #[instrument(skip(self))]
742    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
743        if self.is_connected().await {
744            return Err(DeltasError::AlreadyConnected);
745        }
746        let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
747        info!(?ws_uri, "Starting TychoWebsocketClient");
748
749        let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
750        {
751            let mut guard = self.inner.as_ref().lock().await;
752            *guard = None;
753        }
754        let this = self.clone();
755        let jh = tokio::spawn(async move {
756            let mut retry_count = 0;
757            let mut result = Err(DeltasError::NotConnected);
758
759            'retry: while retry_count < this.max_reconnects {
760                info!(?ws_uri, retry_count, "Connecting to WebSocket server");
761                if retry_count > 0 {
762                    sleep(this.retry_cooldown).await;
763                }
764
765                // Create a WebSocket request
766                let mut request_builder = Request::builder()
767                    .uri(&ws_uri)
768                    .header(SEC_WEBSOCKET_KEY, generate_key())
769                    .header(SEC_WEBSOCKET_VERSION, 13)
770                    .header(CONNECTION, "Upgrade")
771                    .header(UPGRADE, "websocket")
772                    .header(
773                        HOST,
774                        this.uri.host().ok_or_else(|| {
775                            DeltasError::UriParsing(
776                                ws_uri.clone(),
777                                "No host found in tycho url".to_string(),
778                            )
779                        })?,
780                    )
781                    .header(
782                        USER_AGENT,
783                        format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
784                    );
785
786                // Add Authorization if one is given
787                if let Some(ref key) = this.auth_key {
788                    request_builder = request_builder.header(AUTHORIZATION, key);
789                }
790
791                let request = request_builder.body(()).map_err(|e| {
792                    DeltasError::TransportError(format!("Failed to build connection request: {e}"))
793                })?;
794                let (conn, _) = match connect_async(request).await {
795                    Ok(conn) => conn,
796                    Err(e) => {
797                        // Prepare for reconnection
798                        retry_count += 1;
799                        let mut guard = this.inner.as_ref().lock().await;
800                        *guard = None;
801
802                        warn!(
803                            e = e.to_string(),
804                            "Failed to connect to WebSocket server; Reconnecting"
805                        );
806                        continue 'retry;
807                    }
808                };
809
810                let (ws_tx_new, ws_rx_new) = conn.split();
811                {
812                    let mut guard = this.inner.as_ref().lock().await;
813                    *guard =
814                        Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
815                }
816                let mut msg_rx = ws_rx_new.boxed();
817
818                info!("Connection Successful: TychoWebsocketClient started");
819                this.conn_notify.notify_waiters();
820                result = Ok(());
821
822                loop {
823                    let res = tokio::select! {
824                        msg = msg_rx.next() => match msg {
825                            Some(msg) => this.handle_msg(msg).await,
826                            None => {
827                                // This code should not be reachable since the stream
828                                // should return ConnectionClosed in the case above
829                                // before it returns None here.
830                                warn!("Websocket connection silently closed, giving up!");
831                                break 'retry
832                            }
833                        },
834                        _ = cmd_rx.recv() => {break 'retry},
835                    };
836                    if let Err(error) = res {
837                        debug!(?error, "WsError");
838                        if matches!(
839                            error,
840                            DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
841                        ) {
842                            // Prepare for reconnection
843                            retry_count += 1;
844                            let mut guard = this.inner.as_ref().lock().await;
845                            *guard = None;
846
847                            warn!(
848                                ?error,
849                                ?retry_count,
850                                "Connection dropped unexpectedly; Reconnecting..."
851                            );
852                            break;
853                        } else {
854                            // Other errors are considered fatal
855                            error!(?error, "Fatal error; Exiting");
856                            result = Err(error);
857                            break 'retry;
858                        }
859                    }
860                }
861            }
862            debug!(
863                retry_count,
864                max_reconnects=?this.max_reconnects,
865                "Reconnection loop ended"
866            );
867            // Clean up before exiting
868            let mut guard = this.inner.as_ref().lock().await;
869            *guard = None;
870
871            // Check if max retries has been reached.
872            if retry_count >= this.max_reconnects {
873                error!("Max reconnection attempts reached; Exiting");
874                this.dead.store(true, Ordering::SeqCst);
875                this.conn_notify.notify_waiters(); // Notify that the task is done
876                result = Err(DeltasError::ConnectionClosed);
877            }
878
879            result
880        });
881
882        self.conn_notify.notified().await;
883
884        if self.is_connected().await {
885            Ok(jh)
886        } else {
887            Err(DeltasError::NotConnected)
888        }
889    }
890
891    #[instrument(skip(self))]
892    async fn close(&self) -> Result<(), DeltasError> {
893        info!("Closing TychoWebsocketClient");
894        let mut guard = self.inner.lock().await;
895        let inner = guard
896            .as_mut()
897            .ok_or_else(|| DeltasError::NotConnected)?;
898        inner
899            .cmd_tx
900            .send(())
901            .await
902            .map_err(|e| DeltasError::TransportError(e.to_string()))?;
903        Ok(())
904    }
905}
906
907#[cfg(test)]
908mod tests {
909    use std::net::SocketAddr;
910
911    use test_log::test;
912    use tokio::{net::TcpListener, time::timeout};
913    use tycho_common::dto::Chain;
914
915    use super::*;
916
    /// One step of the scripted exchange replayed by the mock websocket server.
    #[derive(Clone)]
    enum ExpectedComm {
        /// The server waits up to the given number of milliseconds for a client message and
        /// asserts that it equals the given message.
        Receive(u64, tungstenite::protocol::Message),
        /// The server sends the given message to the client.
        Send(tungstenite::protocol::Message),
    }
922
923    async fn mock_tycho_ws(
924        messages: &[ExpectedComm],
925        reconnects: usize,
926    ) -> (SocketAddr, JoinHandle<()>) {
927        info!("Starting mock webserver");
928        // zero port here means the OS chooses an open port
929        let server = TcpListener::bind("127.0.0.1:0")
930            .await
931            .expect("localhost bind failed");
932        let addr = server.local_addr().unwrap();
933        let messages = messages.to_vec();
934
935        let jh = tokio::spawn(async move {
936            info!("mock webserver started");
937            for _ in 0..(reconnects + 1) {
938                info!("Awaiting client connections");
939                if let Ok((stream, _)) = server.accept().await {
940                    info!("Client connected");
941                    let mut websocket = tokio_tungstenite::accept_async(stream)
942                        .await
943                        .unwrap();
944
945                    info!("Handling messages..");
946                    for c in messages.iter().cloned() {
947                        match c {
948                            ExpectedComm::Receive(t, exp) => {
949                                info!("Awaiting message...");
950                                let msg = timeout(Duration::from_millis(t), websocket.next())
951                                    .await
952                                    .expect("Receive timeout")
953                                    .expect("Stream exhausted")
954                                    .expect("Failed to receive message.");
955                                info!("Message received");
956                                assert_eq!(msg, exp)
957                            }
958                            ExpectedComm::Send(data) => {
959                                info!("Sending message");
960                                websocket
961                                    .send(data)
962                                    .await
963                                    .expect("Failed to send message");
964                                info!("Message sent");
965                            }
966                        };
967                    }
968                    info!("Mock communication completed");
969                    sleep(Duration::from_millis(100)).await;
970                    // Close the WebSocket connection
971                    let _ = websocket.close(None).await;
972                    info!("Mock server closed connection");
973                }
974            }
975            info!("mock server ended");
976        });
977        (addr, jh)
978    }
979
980    #[tokio::test]
981    async fn test_subscribe_receive() {
982        let exp_comm = [
983            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
984                {
985                    "method":"subscribe",
986                    "extractor_id":{
987                        "chain":"ethereum",
988                        "name":"vm:ambient"
989                    },
990                    "include_state": true
991                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
992            )),
993            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
994                {
995                    "method":"newsubscription",
996                    "extractor_id":{
997                    "chain":"ethereum",
998                    "name":"vm:ambient"
999                    },
1000                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1001                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1002            )),
1003            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1004                {
1005                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1006                    "deltas": {
1007                        "extractor": "vm:ambient",
1008                        "chain": "ethereum",
1009                        "block": {
1010                            "number": 123,
1011                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1012                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1013                            "chain": "ethereum",             
1014                            "ts": "2023-09-14T00:00:00"
1015                        },
1016                        "finalized_block_height": 0,
1017                        "revert": false,
1018                        "new_tokens": {},
1019                        "account_updates": {
1020                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1021                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1022                                "chain": "ethereum",
1023                                "slots": {},
1024                                "balance": "0x01f4",
1025                                "code": "",
1026                                "change": "Update"
1027                            }
1028                        },
1029                        "state_updates": {
1030                            "component_1": {
1031                                "component_id": "component_1",
1032                                "updated_attributes": {"attr1": "0x01"},
1033                                "deleted_attributes": ["attr2"]
1034                            }
1035                        },
1036                        "new_protocol_components": 
1037                            { "protocol_1": {
1038                                    "id": "protocol_1",
1039                                    "protocol_system": "system_1",
1040                                    "protocol_type_name": "type_1",
1041                                    "chain": "ethereum",
1042                                    "tokens": ["0x01", "0x02"],
1043                                    "contract_ids": ["0x01", "0x02"],
1044                                    "static_attributes": {"attr1": "0x01f4"},
1045                                    "change": "Update",
1046                                    "creation_tx": "0x01",
1047                                    "created_at": "2023-09-14T00:00:00"
1048                                }
1049                            },
1050                        "deleted_protocol_components": {},
1051                        "component_balances": {
1052                            "protocol_1":
1053                                {
1054                                    "0x01": {
1055                                        "token": "0x01",
1056                                        "balance": "0x01f4",
1057                                        "balance_float": 0.0,
1058                                        "modify_tx": "0x01",
1059                                        "component_id": "protocol_1"
1060                                    }
1061                                }
1062                        },
1063                        "account_balances": {
1064                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1065                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1066                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1067                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1068                                    "balance": "0x01f4",
1069                                    "modify_tx": "0x01"
1070                                }
1071                            }
1072                        },
1073                        "component_tvl": {
1074                            "protocol_1": 1000.0
1075                        },
1076                        "dci_update": {
1077                            "new_entrypoints": {},
1078                            "new_entrypoint_params": {},
1079                            "trace_results": {}
1080                        }
1081                    }
1082                }
1083                "#.to_owned()
1084            ))
1085        ];
1086        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1087
1088        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1089        let jh = client
1090            .connect()
1091            .await
1092            .expect("connect failed");
1093        let (_, mut rx) = timeout(
1094            Duration::from_millis(100),
1095            client.subscribe(
1096                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1097                SubscriptionOptions::new(),
1098            ),
1099        )
1100        .await
1101        .expect("subscription timed out")
1102        .expect("subscription failed");
1103        let _ = timeout(Duration::from_millis(100), rx.recv())
1104            .await
1105            .expect("awaiting message timeout out")
1106            .expect("receiving message failed");
1107        timeout(Duration::from_millis(100), client.close())
1108            .await
1109            .expect("close timed out")
1110            .expect("close failed");
1111        jh.await
1112            .expect("ws loop errored")
1113            .unwrap();
1114        server_thread.await.unwrap();
1115    }
1116
1117    #[tokio::test]
1118    async fn test_unsubscribe() {
1119        let exp_comm = [
1120            ExpectedComm::Receive(
1121                100,
1122                tungstenite::protocol::Message::Text(
1123                    r#"
1124                {
1125                    "method": "subscribe",
1126                    "extractor_id":{
1127                        "chain": "ethereum",
1128                        "name": "vm:ambient"
1129                    },
1130                    "include_state": true
1131                }"#
1132                    .to_owned()
1133                    .replace(|c: char| c.is_whitespace(), ""),
1134                ),
1135            ),
1136            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1137                r#"
1138                {
1139                    "method": "newsubscription",
1140                    "extractor_id":{
1141                        "chain": "ethereum",
1142                        "name": "vm:ambient"
1143                    },
1144                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1145                }"#
1146                .to_owned()
1147                .replace(|c: char| c.is_whitespace(), ""),
1148            )),
1149            ExpectedComm::Receive(
1150                100,
1151                tungstenite::protocol::Message::Text(
1152                    r#"
1153                {
1154                    "method": "unsubscribe",
1155                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1156                }
1157                "#
1158                    .to_owned()
1159                    .replace(|c: char| c.is_whitespace(), ""),
1160                ),
1161            ),
1162            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1163                r#"
1164                {
1165                    "method": "subscriptionended",
1166                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1167                }
1168                "#
1169                .to_owned()
1170                .replace(|c: char| c.is_whitespace(), ""),
1171            )),
1172        ];
1173        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1174
1175        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1176        let jh = client
1177            .connect()
1178            .await
1179            .expect("connect failed");
1180        let (sub_id, mut rx) = timeout(
1181            Duration::from_millis(100),
1182            client.subscribe(
1183                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1184                SubscriptionOptions::new(),
1185            ),
1186        )
1187        .await
1188        .expect("subscription timed out")
1189        .expect("subscription failed");
1190
1191        timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1192            .await
1193            .expect("unsubscribe timed out")
1194            .expect("unsubscribe failed");
1195        let res = timeout(Duration::from_millis(100), rx.recv())
1196            .await
1197            .expect("awaiting message timeout out");
1198
1199        // If the subscription ended, the channel should have been closed.
1200        assert!(res.is_none());
1201
1202        timeout(Duration::from_millis(100), client.close())
1203            .await
1204            .expect("close timed out")
1205            .expect("close failed");
1206        jh.await
1207            .expect("ws loop errored")
1208            .unwrap();
1209        server_thread.await.unwrap();
1210    }
1211
1212    #[tokio::test]
1213    async fn test_subscription_unexpected_end() {
1214        let exp_comm = [
1215            ExpectedComm::Receive(
1216                100,
1217                tungstenite::protocol::Message::Text(
1218                    r#"
1219                {
1220                    "method":"subscribe",
1221                    "extractor_id":{
1222                        "chain":"ethereum",
1223                        "name":"vm:ambient"
1224                    },
1225                    "include_state": true
1226                }"#
1227                    .to_owned()
1228                    .replace(|c: char| c.is_whitespace(), ""),
1229                ),
1230            ),
1231            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1232                r#"
1233                {
1234                    "method":"newsubscription",
1235                    "extractor_id":{
1236                        "chain":"ethereum",
1237                        "name":"vm:ambient"
1238                    },
1239                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1240                }"#
1241                .to_owned()
1242                .replace(|c: char| c.is_whitespace(), ""),
1243            )),
1244            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1245                r#"
1246                {
1247                    "method": "subscriptionended",
1248                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1249                }"#
1250                .to_owned()
1251                .replace(|c: char| c.is_whitespace(), ""),
1252            )),
1253        ];
1254        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1255
1256        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1257        let jh = client
1258            .connect()
1259            .await
1260            .expect("connect failed");
1261        let (_, mut rx) = timeout(
1262            Duration::from_millis(100),
1263            client.subscribe(
1264                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1265                SubscriptionOptions::new(),
1266            ),
1267        )
1268        .await
1269        .expect("subscription timed out")
1270        .expect("subscription failed");
1271        let res = timeout(Duration::from_millis(100), rx.recv())
1272            .await
1273            .expect("awaiting message timeout out");
1274
1275        // If the subscription ended, the channel should have been closed.
1276        assert!(res.is_none());
1277
1278        timeout(Duration::from_millis(100), client.close())
1279            .await
1280            .expect("close timed out")
1281            .expect("close failed");
1282        jh.await
1283            .expect("ws loop errored")
1284            .unwrap();
1285        server_thread.await.unwrap();
1286    }
1287
1288    #[test_log::test(tokio::test)]
1289    async fn test_reconnect() {
1290        let exp_comm = [
1291            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
1292                {
1293                    "method":"subscribe",
1294                    "extractor_id":{
1295                        "chain":"ethereum",
1296                        "name":"vm:ambient"
1297                    },
1298                    "include_state": true
1299                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1300            )),
1301            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1302                {
1303                    "method":"newsubscription",
1304                    "extractor_id":{
1305                    "chain":"ethereum",
1306                    "name":"vm:ambient"
1307                    },
1308                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1309                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1310            )),
1311            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1312                {
1313                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1314                    "deltas": {
1315                        "extractor": "vm:ambient",
1316                        "chain": "ethereum",
1317                        "block": {
1318                            "number": 123,
1319                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1320                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1321                            "chain": "ethereum",             
1322                            "ts": "2023-09-14T00:00:00"
1323                        },
1324                        "finalized_block_height": 0,
1325                        "revert": false,
1326                        "new_tokens": {},
1327                        "account_updates": {
1328                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1329                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1330                                "chain": "ethereum",
1331                                "slots": {},
1332                                "balance": "0x01f4",
1333                                "code": "",
1334                                "change": "Update"
1335                            }
1336                        },
1337                        "state_updates": {
1338                            "component_1": {
1339                                "component_id": "component_1",
1340                                "updated_attributes": {"attr1": "0x01"},
1341                                "deleted_attributes": ["attr2"]
1342                            }
1343                        },
1344                        "new_protocol_components": {
1345                            "protocol_1":
1346                                {
1347                                    "id": "protocol_1",
1348                                    "protocol_system": "system_1",
1349                                    "protocol_type_name": "type_1",
1350                                    "chain": "ethereum",
1351                                    "tokens": ["0x01", "0x02"],
1352                                    "contract_ids": ["0x01", "0x02"],
1353                                    "static_attributes": {"attr1": "0x01f4"},
1354                                    "change": "Update",
1355                                    "creation_tx": "0x01",
1356                                    "created_at": "2023-09-14T00:00:00"
1357                                }
1358                            },
1359                        "deleted_protocol_components": {},
1360                        "component_balances": {
1361                            "protocol_1": {
1362                                "0x01": {
1363                                    "token": "0x01",
1364                                    "balance": "0x01f4",
1365                                    "balance_float": 1000.0,
1366                                    "modify_tx": "0x01",
1367                                    "component_id": "protocol_1"
1368                                }
1369                            }
1370                        },
1371                        "account_balances": {
1372                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1373                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1374                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1375                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1376                                    "balance": "0x01f4",
1377                                    "modify_tx": "0x01"
1378                                }
1379                            }
1380                        },
1381                        "component_tvl": {
1382                            "protocol_1": 1000.0
1383                        },
1384                        "dci_update": {
1385                            "new_entrypoints": {},
1386                            "new_entrypoint_params": {},
1387                            "trace_results": {}
1388                        }
1389                    }
1390                }
1391                "#.to_owned()
1392            ))
1393        ];
1394        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1395        let client = WsDeltasClient::new_with_reconnects(
1396            &format!("ws://{addr}"),
1397            None,
1398            3,
1399            // server stays down for 100ms on connection drop
1400            Duration::from_millis(110),
1401        )
1402        .unwrap();
1403
1404        let jh: JoinHandle<Result<(), DeltasError>> = client
1405            .connect()
1406            .await
1407            .expect("connect failed");
1408
1409        for _ in 0..2 {
1410            dbg!("loop");
1411            let (_, mut rx) = timeout(
1412                Duration::from_millis(200),
1413                client.subscribe(
1414                    ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1415                    SubscriptionOptions::new(),
1416                ),
1417            )
1418            .await
1419            .expect("subscription timed out")
1420            .expect("subscription failed");
1421
1422            let _ = timeout(Duration::from_millis(100), rx.recv())
1423                .await
1424                .expect("awaiting message timeout out")
1425                .expect("receiving message failed");
1426
1427            // wait for the connection to drop
1428            let res = timeout(Duration::from_millis(200), rx.recv())
1429                .await
1430                .expect("awaiting closed connection timeout out");
1431            assert!(res.is_none());
1432        }
1433        let res = jh.await.expect("ws client join failed");
1434        // 5th client reconnect attempt should fail
1435        assert!(res.is_err());
1436        server_thread
1437            .await
1438            .expect("ws server loop errored");
1439    }
1440
1441    async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1442        let server = TcpListener::bind("127.0.0.1:0")
1443            .await
1444            .expect("localhost bind failed");
1445        let addr = server.local_addr().unwrap();
1446        let jh = tokio::spawn(async move {
1447            while let Ok((stream, _)) = server.accept().await {
1448                if accept_first {
1449                    // Send connection handshake to accept the connection (fail later)
1450                    let stream = tokio_tungstenite::accept_async(stream)
1451                        .await
1452                        .unwrap();
1453                    sleep(Duration::from_millis(10)).await;
1454                    drop(stream)
1455                } else {
1456                    // Close the connection to simulate a failure
1457                    drop(stream);
1458                }
1459            }
1460        });
1461        (addr, jh)
1462    }
1463
1464    #[test(tokio::test)]
1465    async fn test_subscribe_dead_client_after_max_attempts() {
1466        let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1467        let client = WsDeltasClient::new_with_reconnects(
1468            &format!("ws://{addr}"),
1469            None,
1470            3,
1471            Duration::from_secs(0),
1472        )
1473        .unwrap();
1474
1475        let join_handle = client.connect().await.unwrap();
1476        let handle_res = join_handle.await.unwrap();
1477        assert!(handle_res.is_err());
1478        assert!(!client.is_connected().await);
1479
1480        let subscription_res = timeout(
1481            Duration::from_millis(10),
1482            client.subscribe(
1483                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1484                SubscriptionOptions::new(),
1485            ),
1486        )
1487        .await
1488        .unwrap();
1489        assert!(subscription_res.is_err());
1490    }
1491
1492    #[test(tokio::test)]
1493    async fn test_ws_client_retry_cooldown() {
1494        let start = std::time::Instant::now();
1495        let (addr, _) = mock_bad_connection_tycho_ws(false).await;
1496
1497        // Use the mock server that immediately drops connections
1498        let client = WsDeltasClient::new_with_reconnects(
1499            &format!("ws://{addr}"),
1500            None,
1501            3,                         // 3 attempts total (so 2 retries with cooldowns)
1502            Duration::from_millis(50), // 50ms cooldown
1503        )
1504        .unwrap();
1505
1506        // Try to connect - this should fail after retries but still measure the time
1507        let connect_result = client.connect().await;
1508        let elapsed = start.elapsed();
1509
1510        // Connection should fail after exhausting retries
1511        assert!(connect_result.is_err(), "Expected connection to fail after retries");
1512
1513        // Should have waited at least 100ms total (2 retries × 50ms cooldown each)
1514        assert!(
1515            elapsed >= Duration::from_millis(100),
1516            "Expected at least 100ms elapsed, got {:?}",
1517            elapsed
1518        );
1519
1520        // Should not take too long (max ~300ms for 3 attempts with some tolerance)
1521        assert!(elapsed < Duration::from_millis(500), "Took too long: {:?}", elapsed);
1522    }
1523
1524    #[test_log::test(tokio::test)]
1525    async fn test_buffer_full_triggers_unsubscribe() {
1526        // Expected communication sequence for buffer full scenario
1527        let exp_comm = {
1528            [
1529            // 1. Client subscribes
1530            ExpectedComm::Receive(
1531                100,
1532                tungstenite::protocol::Message::Text(
1533                    r#"
1534                {
1535                    "method":"subscribe",
1536                    "extractor_id":{
1537                        "chain":"ethereum",
1538                        "name":"vm:ambient"
1539                    },
1540                    "include_state": true
1541                }"#
1542                    .to_owned()
1543                    .replace(|c: char| c.is_whitespace(), ""),
1544                ),
1545            ),
1546            // 2. Server confirms subscription
1547            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1548                r#"
1549                {
1550                    "method":"newsubscription",
1551                    "extractor_id":{
1552                        "chain":"ethereum",
1553                        "name":"vm:ambient"
1554                    },
1555                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1556                }"#
1557                .to_owned()
1558                .replace(|c: char| c.is_whitespace(), ""),
1559            )),
1560            // 3. Server sends first message (fills buffer)
1561            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1562                r#"
1563                {
1564                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1565                    "deltas": {
1566                        "extractor": "vm:ambient",
1567                        "chain": "ethereum",
1568                        "block": {
1569                            "number": 123,
1570                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1571                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1572                            "chain": "ethereum",
1573                            "ts": "2023-09-14T00:00:00"
1574                        },
1575                        "finalized_block_height": 0,
1576                        "revert": false,
1577                        "new_tokens": {},
1578                        "account_updates": {},
1579                        "state_updates": {},
1580                        "new_protocol_components": {},
1581                        "deleted_protocol_components": {},
1582                        "component_balances": {},
1583                        "account_balances": {},
1584                        "component_tvl": {},
1585                        "dci_update": {
1586                            "new_entrypoints": {},
1587                            "new_entrypoint_params": {},
1588                            "trace_results": {}
1589                        }
1590                    }
1591                }
1592                "#.to_owned()
1593            )),
1594            // 4. Server sends second message (triggers buffer overflow and force unsubscribe)
1595            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1596                r#"
1597                {
1598                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1599                    "deltas": {
1600                        "extractor": "vm:ambient",
1601                        "chain": "ethereum",
1602                        "block": {
1603                            "number": 124,
1604                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
1605                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1606                            "chain": "ethereum",
1607                            "ts": "2023-09-14T00:00:01"
1608                        },
1609                        "finalized_block_height": 0,
1610                        "revert": false,
1611                        "new_tokens": {},
1612                        "account_updates": {},
1613                        "state_updates": {},
1614                        "new_protocol_components": {},
1615                        "deleted_protocol_components": {},
1616                        "component_balances": {},
1617                        "account_balances": {},
1618                        "component_tvl": {},
1619                        "dci_update": {
1620                            "new_entrypoints": {},
1621                            "new_entrypoint_params": {},
1622                            "trace_results": {}
1623                        }
1624                    }
1625                }
1626                "#.to_owned()
1627            )),
1628            // 5. Expect unsubscribe command due to buffer full
1629            ExpectedComm::Receive(
1630                100,
1631                tungstenite::protocol::Message::Text(
1632                    r#"
1633                {
1634                    "method": "unsubscribe",
1635                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1636                }
1637                "#
1638                    .to_owned()
1639                    .replace(|c: char| c.is_whitespace(), ""),
1640                ),
1641            ),
1642            // 6. Server confirms unsubscription
1643            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1644                r#"
1645                {
1646                    "method": "subscriptionended",
1647                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1648                }
1649                "#
1650                .to_owned()
1651                .replace(|c: char| c.is_whitespace(), ""),
1652            )),
1653        ]
1654        };
1655
1656        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1657
1658        // Create client with very small buffer size (1) to easily trigger BufferFull
1659        let client = WsDeltasClient::new_with_custom_buffers(
1660            &format!("ws://{addr}"),
1661            None,
1662            128, // ws_buffer_size
1663            1,   // subscription_buffer_size - this will trigger BufferFull easily
1664        )
1665        .unwrap();
1666
1667        let jh = client
1668            .connect()
1669            .await
1670            .expect("connect failed");
1671
1672        let (_sub_id, mut rx) = timeout(
1673            Duration::from_millis(100),
1674            client.subscribe(
1675                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1676                SubscriptionOptions::new(),
1677            ),
1678        )
1679        .await
1680        .expect("subscription timed out")
1681        .expect("subscription failed");
1682
1683        // Allow time for messages to be processed and buffer to fill up
1684        tokio::time::sleep(Duration::from_millis(100)).await;
1685
1686        // Collect all messages until channel closes or we get a reasonable number
1687        let mut received_msgs = Vec::new();
1688
1689        // Use a single longer timeout to collect messages until channel closes
1690        while received_msgs.len() < 3 {
1691            match timeout(Duration::from_millis(200), rx.recv()).await {
1692                Ok(Some(msg)) => {
1693                    received_msgs.push(msg);
1694                }
1695                Ok(None) => {
1696                    // Channel closed - this is what we expect after buffer overflow
1697                    break;
1698                }
1699                Err(_) => {
1700                    // Timeout - no more messages coming
1701                    break;
1702                }
1703            }
1704        }
1705
1706        // Verify the key behavior: buffer overflow should limit messages and close channel
1707        assert!(
1708            received_msgs.len() <= 1,
1709            "Expected buffer overflow to limit messages to at most 1, got {}",
1710            received_msgs.len()
1711        );
1712
1713        if let Some(first_msg) = received_msgs.first() {
1714            assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
1715        }
1716
1717        // Test passed! The key behavior we're testing (buffer full causes force unsubscribe) has
1718        // been verified We don't need to explicitly close the client as it will be cleaned
1719        // up when dropped
1720
1721        // Just wait for the tasks to finish cleanly
1722        drop(rx); // Explicitly drop the receiver
1723        tokio::time::sleep(Duration::from_millis(50)).await;
1724
1725        // Abort the tasks to clean up
1726        jh.abort();
1727        server_thread.abort();
1728
1729        let _ = jh.await;
1730        let _ = server_thread.await;
1731    }
1732
1733    #[tokio::test]
1734    async fn test_server_error_handling() {
1735        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1736
1737        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1738
1739        // Test ExtractorNotFound error
1740        let error_response = WebSocketMessage::Response(Response::Error(
1741            WebsocketError::ExtractorNotFound(extractor_id.clone()),
1742        ));
1743        let error_json = serde_json::to_string(&error_response).unwrap();
1744
1745        let exp_comm = [
1746            ExpectedComm::Receive(
1747                100,
1748                tungstenite::protocol::Message::Text(
1749                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1750                ),
1751            ),
1752            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1753        ];
1754
1755        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1756
1757        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1758        let jh = client
1759            .connect()
1760            .await
1761            .expect("connect failed");
1762
1763        let result = timeout(
1764            Duration::from_millis(100),
1765            client.subscribe(extractor_id, SubscriptionOptions::new()),
1766        )
1767        .await
1768        .expect("subscription timed out");
1769
1770        // Verify that we get a ServerError
1771        assert!(result.is_err());
1772        if let Err(DeltasError::ServerError(msg, _)) = result {
1773            assert!(msg.contains("Subscription failed"));
1774            assert!(msg.contains("Extractor not found"));
1775        } else {
1776            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1777        }
1778
1779        timeout(Duration::from_millis(100), client.close())
1780            .await
1781            .expect("close timed out")
1782            .expect("close failed");
1783        jh.await
1784            .expect("ws loop errored")
1785            .unwrap();
1786        server_thread.await.unwrap();
1787    }
1788
1789    #[test_log::test(tokio::test)]
1790    async fn test_subscription_not_found_error() {
1791        // Test scenario: Server restart causes subscription loss
1792        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1793
1794        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1795        let subscription_id = Uuid::new_v4();
1796
1797        let error_response = WebSocketMessage::Response(Response::Error(
1798            WebsocketError::SubscriptionNotFound(subscription_id),
1799        ));
1800        let error_json = serde_json::to_string(&error_response).unwrap();
1801
1802        let exp_comm = [
1803            // 1. Client subscribes successfully
1804            ExpectedComm::Receive(
1805                100,
1806                tungstenite::protocol::Message::Text(
1807                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1808                ),
1809            ),
1810            ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
1811                r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"test_extractor"}},"subscription_id":"{}"}}"#,
1812                subscription_id
1813            ))),
1814            // 2. Client tries to unsubscribe (server has "restarted" and lost subscription)
1815            ExpectedComm::Receive(
1816                100,
1817                tungstenite::protocol::Message::Text(format!(
1818                    r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
1819                    subscription_id
1820                )),
1821            ),
1822            // 3. Server responds with SubscriptionNotFound (simulating server restart)
1823            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1824        ];
1825
1826        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1827
1828        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1829        let jh = client
1830            .connect()
1831            .await
1832            .expect("connect failed");
1833
1834        // Subscribe successfully
1835        let (received_sub_id, _rx) = timeout(
1836            Duration::from_millis(100),
1837            client.subscribe(extractor_id, SubscriptionOptions::new()),
1838        )
1839        .await
1840        .expect("subscription timed out")
1841        .expect("subscription failed");
1842
1843        assert_eq!(received_sub_id, subscription_id);
1844
1845        // Now try to unsubscribe - this should fail because server "restarted"
1846        let unsubscribe_result =
1847            timeout(Duration::from_millis(100), client.unsubscribe(subscription_id))
1848                .await
1849                .expect("unsubscribe timed out");
1850
1851        // The unsubscribe should handle the SubscriptionNotFound error gracefully
1852        // In this case, the client should treat it as successful since the subscription
1853        // is effectively gone (whether due to server restart or other reasons)
1854        unsubscribe_result
1855            .expect("Unsubscribe should succeed even if server says subscription not found");
1856
1857        timeout(Duration::from_millis(100), client.close())
1858            .await
1859            .expect("close timed out")
1860            .expect("close failed");
1861        jh.await
1862            .expect("ws loop errored")
1863            .unwrap();
1864        server_thread.await.unwrap();
1865    }
1866
1867    #[test_log::test(tokio::test)]
1868    async fn test_parse_error_handling() {
1869        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1870
1871        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1872        let error_response = WebSocketMessage::Response(Response::Error(
1873            WebsocketError::ParseError("}2sdf".to_string(), "malformed JSON".to_string()),
1874        ));
1875        let error_json = serde_json::to_string(&error_response).unwrap();
1876
1877        let exp_comm = [
1878            // subscribe first so connect can finish successfully
1879            ExpectedComm::Receive(
1880                100,
1881                tungstenite::protocol::Message::Text(
1882                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1883                ),
1884            ),
1885            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json))
1886        ];
1887
1888        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1889
1890        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1891        let jh = client
1892            .connect()
1893            .await
1894            .expect("connect failed");
1895
1896        // Subscribe successfully
1897        let _ = timeout(
1898            Duration::from_millis(100),
1899            client.subscribe(extractor_id, SubscriptionOptions::new()),
1900        )
1901        .await
1902        .expect("subscription timed out");
1903
1904        // The client should receive the parse error and close the connection
1905        let result = jh
1906            .await
1907            .expect("ws loop should complete");
1908        assert!(result.is_err());
1909        if let Err(DeltasError::ServerError(message, _)) = result {
1910            assert!(message.contains("Server failed to parse client message"));
1911        } else {
1912            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1913        }
1914
1915        server_thread.await.unwrap();
1916    }
1917
1918    #[tokio::test]
1919    async fn test_subscribe_error_handling() {
1920        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1921
1922        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "failing_extractor");
1923
1924        let error_response = WebSocketMessage::Response(Response::Error(
1925            WebsocketError::SubscribeError(extractor_id.clone()),
1926        ));
1927        let error_json = serde_json::to_string(&error_response).unwrap();
1928
1929        let exp_comm = [
1930            ExpectedComm::Receive(
1931                100,
1932                tungstenite::protocol::Message::Text(
1933                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"failing_extractor"},"include_state":true}"#.to_string()
1934                ),
1935            ),
1936            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1937        ];
1938
1939        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1940
1941        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1942        let jh = client
1943            .connect()
1944            .await
1945            .expect("connect failed");
1946
1947        let result = timeout(
1948            Duration::from_millis(100),
1949            client.subscribe(extractor_id, SubscriptionOptions::new()),
1950        )
1951        .await
1952        .expect("subscription timed out");
1953
1954        // Verify that we get a ServerError for subscribe failure
1955        assert!(result.is_err());
1956        if let Err(DeltasError::ServerError(msg, _)) = result {
1957            assert!(msg.contains("Subscription failed"));
1958            assert!(msg.contains("Failed to subscribe to extractor"));
1959        } else {
1960            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1961        }
1962
1963        timeout(Duration::from_millis(100), client.close())
1964            .await
1965            .expect("close timed out")
1966            .expect("close failed");
1967        jh.await
1968            .expect("ws loop errored")
1969            .unwrap();
1970        server_thread.await.unwrap();
1971    }
1972
1973    #[tokio::test]
1974    async fn test_cancel_pending_subscription() {
1975        // This test verifies that pending subscriptions are properly cancelled when errors occur
1976        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1977
1978        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1979
1980        let error_response = WebSocketMessage::Response(Response::Error(
1981            WebsocketError::ExtractorNotFound(extractor_id.clone()),
1982        ));
1983        let error_json = serde_json::to_string(&error_response).unwrap();
1984
1985        let exp_comm = [
1986            ExpectedComm::Receive(
1987                100,
1988                tungstenite::protocol::Message::Text(
1989                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true}"#.to_string()
1990                ),
1991            ),
1992            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1993        ];
1994
1995        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1996
1997        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1998        let jh = client
1999            .connect()
2000            .await
2001            .expect("connect failed");
2002
2003        // Start two subscription attempts simultaneously
2004        let client_clone = client.clone();
2005        let extractor_id_clone = extractor_id.clone();
2006
2007        let subscription1 = tokio::spawn({
2008            let client_for_spawn = client.clone();
2009            async move {
2010                client_for_spawn
2011                    .subscribe(extractor_id, SubscriptionOptions::new())
2012                    .await
2013            }
2014        });
2015
2016        let subscription2 = tokio::spawn(async move {
2017            // This should fail because there's already a pending subscription
2018            client_clone
2019                .subscribe(extractor_id_clone, SubscriptionOptions::new())
2020                .await
2021        });
2022
2023        let (result1, result2) = tokio::join!(subscription1, subscription2);
2024
2025        let result1 = result1.unwrap();
2026        let result2 = result2.unwrap();
2027
2028        // One should fail due to ExtractorNotFound error from server
2029        // The other should fail due to SubscriptionAlreadyPending
2030        assert!(result1.is_err() || result2.is_err());
2031
2032        if let Err(DeltasError::SubscriptionAlreadyPending) = result2 {
2033            // This is expected for the second subscription
2034        } else if let Err(DeltasError::ServerError(_, _)) = result1 {
2035            // This is expected for the first subscription that gets the server error
2036        } else {
2037            panic!("Expected one SubscriptionAlreadyPending and one ServerError");
2038        }
2039
2040        timeout(Duration::from_millis(100), client.close())
2041            .await
2042            .expect("close timed out")
2043            .expect("close failed");
2044        jh.await
2045            .expect("ws loop errored")
2046            .unwrap();
2047        server_thread.await.unwrap();
2048    }
2049
2050    #[tokio::test]
2051    async fn test_force_unsubscribe_prevents_multiple_calls() {
2052        // Test that force_unsubscribe prevents sending duplicate unsubscribe commands
2053        // when called multiple times for the same subscription_id
2054
2055        let subscription_id = Uuid::new_v4();
2056
2057        let exp_comm = [
2058            ExpectedComm::Receive(
2059                100,
2060                tungstenite::protocol::Message::Text(
2061                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"vm:ambient"},"include_state":true}"#.to_string()
2062                ),
2063            ),
2064            ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
2065                r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"vm:ambient"}},"subscription_id":"{}"}}"#,
2066                subscription_id
2067            ))),
2068            // Expect only ONE unsubscribe message, even though force_unsubscribe is called twice
2069            ExpectedComm::Receive(
2070                100,
2071                tungstenite::protocol::Message::Text(format!(
2072                    r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
2073                    subscription_id
2074                )),
2075            ),
2076            ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
2077                r#"{{"method":"subscriptionended","subscription_id":"{}"}}"#,
2078                subscription_id
2079            ))),
2080        ];
2081
2082        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2083
2084        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2085        let jh = client
2086            .connect()
2087            .await
2088            .expect("connect failed");
2089
2090        let (received_sub_id, _rx) = timeout(
2091            Duration::from_millis(100),
2092            client.subscribe(
2093                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
2094                SubscriptionOptions::new(),
2095            ),
2096        )
2097        .await
2098        .expect("subscription timed out")
2099        .expect("subscription failed");
2100
2101        assert_eq!(received_sub_id, subscription_id);
2102
2103        // Access the inner state to call force_unsubscribe directly
2104        {
2105            let mut inner_guard = client.inner.lock().await;
2106            let inner = inner_guard
2107                .as_mut()
2108                .expect("client should be connected");
2109
2110            // Call force_unsubscribe twice - only the first should send an unsubscribe message
2111            WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2112            WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2113        }
2114
2115        // Give time for messages to be processed
2116        tokio::time::sleep(Duration::from_millis(50)).await;
2117
2118        // Close may fail if client disconnected after unsubscribe, which is fine
2119        let _ = timeout(Duration::from_millis(100), client.close()).await;
2120
2121        // Wait for tasks to complete
2122        let _ = jh.await;
2123        let _ = server_thread.await;
2124    }
2125}