tycho_client/
deltas.rs

1//! # Deltas Client
2//!
3//! This module focuses on implementing the Real-Time Deltas client for the Tycho Indexer service.
4//! Utilizing this client facilitates efficient, instant communication with the indexing service,
5//! promoting seamless data synchronization.
6//!
7//! ## Websocket Implementation
8//!
9//! The present WebSocket implementation is cloneable, which enables it to be shared
10//! across multiple asynchronous tasks without creating separate instances for each task. This
11//! unique feature boosts efficiency as it:
12//!
13//! - **Reduces Server Load:** By maintaining a single universal client, the load on the server is
14//!   significantly reduced. This is because fewer connections are made to the server, preventing it
15//!   from getting overwhelmed by numerous simultaneous requests.
//! - **Conserves Resource Usage:** A single shared client requires fewer system resources than
//!   multiple separately instantiated clients, because every websocket connection carries its own
//!   handshake and per-message overhead.
19//!
20//! Therefore, sharing one client among multiple tasks ensures optimal performance, reduces resource
21//! consumption, and enhances overall software scalability.
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::{
25        atomic::{AtomicBool, Ordering},
26        Arc,
27    },
28    time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34    header::{
35        AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36        USER_AGENT,
37    },
38    Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44    net::TcpStream,
45    sync::{
46        mpsc::{self, error::TrySendError, Receiver, Sender},
47        oneshot, Mutex, Notify,
48    },
49    task::JoinHandle,
50    time::sleep,
51};
52use tokio_tungstenite::{
53    connect_async,
54    tungstenite::{
55        self,
56        handshake::client::{generate_key, Request},
57    },
58    MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{
62    BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage, WebsocketError,
63};
64use uuid::Uuid;
65
66use crate::TYCHO_SERVER_VERSION;
67
68#[derive(Error, Debug)]
69pub enum DeltasError {
70    /// Failed to parse the provided URI.
71    #[error("Failed to parse URI: {0}. Error: {1}")]
72    UriParsing(String, String),
73
74    /// The requested subscription is already pending and is awaiting confirmation from the server.
75    #[error("The requested subscription is already pending")]
76    SubscriptionAlreadyPending,
77
78    #[error("The server replied with an error: {0}")]
79    ServerError(String, #[source] WebsocketError),
80
81    /// A message failed to send via an internal channel or through the websocket channel.
82    /// This is typically a fatal error and might indicate a bug in the implementation.
83    #[error("{0}")]
84    TransportError(String),
85
86    /// The internal message buffer is full. This likely means that messages are not being consumed
87    /// fast enough. If the incoming load emits messages in bursts, consider increasing the buffer
88    /// size.
89    #[error("The buffer is full!")]
90    BufferFull,
91
92    /// The client has no active connections but was accessed (e.g., by calling subscribe).
93    /// This typically occurs when trying to use the client before calling connect() or
94    /// after the connection has been closed.
95    #[error("The client is not connected!")]
96    NotConnected,
97
98    /// The connect method was called while the client already had an active connection.
99    #[error("The client is already connected!")]
100    AlreadyConnected,
101
102    /// The connection was closed orderly by the server, e.g. because it restarted.
103    #[error("The server closed the connection!")]
104    ConnectionClosed,
105
106    /// The connection was closed unexpectedly by the server or encountered a network error.
107    #[error("Connection error: {0}")]
108    ConnectionError(#[from] Box<tungstenite::Error>),
109
110    /// A fatal error occurred that cannot be recovered from.
111    #[error("Tycho FatalError: {0}")]
112    Fatal(String),
113}
114
/// Options that can be passed along with a subscription request.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Forwarded as the `include_state` flag of the subscribe command; enabled by default.
    include_state: bool,
}

impl Default for SubscriptionOptions {
    fn default() -> Self {
        Self::new()
    }
}

impl SubscriptionOptions {
    /// Creates the default options, which have `include_state` enabled.
    pub fn new() -> Self {
        Self { include_state: true }
    }

    /// Returns the options with `include_state` set to `val`.
    pub fn with_state(self, val: bool) -> Self {
        Self { include_state: val }
    }
}
135
136#[cfg_attr(test, automock)]
137#[async_trait]
138pub trait DeltasClient {
139    /// Subscribe to an extractor and receive realtime messages
140    ///
141    /// Will request a subscription from tycho and wait for confirmation of it. If the caller
142    /// cancels while waiting for confirmation the subscription may still be registered. If the
143    /// receiver was deallocated though, the first message from the subscription will remove it
144    /// again - since there is no one to inform about these messages.
145    async fn subscribe(
146        &self,
147        extractor_id: ExtractorIdentity,
148        options: SubscriptionOptions,
149    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;
150
151    /// Unsubscribe from an subscription
152    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
153
154    /// Start the clients message handling loop.
155    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
156
157    /// Close the clients message handling loop.
158    async fn close(&self) -> Result<(), DeltasError>;
159}
160
161#[derive(Clone)]
162pub struct WsDeltasClient {
163    /// The tycho indexer websocket uri.
164    uri: Uri,
165    /// Authorization key for the websocket connection.
166    auth_key: Option<String>,
167    /// Maximum amount of reconnects to try before giving up.
168    max_reconnects: u64,
169    /// Duration to wait before attempting to reconnect
170    retry_cooldown: Duration,
171    /// The client will buffer this many messages incoming from the websocket
172    /// before starting to drop them.
173    ws_buffer_size: usize,
174    /// The client will buffer that many messages for each subscription before it starts dropping
175    /// them.
176    subscription_buffer_size: usize,
177    /// Notify tasks waiting for a connection to be established.
178    conn_notify: Arc<Notify>,
179    /// Shared client instance state.
180    inner: Arc<Mutex<Option<Inner>>>,
181    /// If set the client has exhausted its reconnection attempts
182    dead: Arc<AtomicBool>,
183}
184
185type WebSocketSink =
186    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
187
188/// Subscription State
189///
190/// Subscription go through a lifecycle:
191///
192/// ```text
193/// O ---> requested subscribe ----> active ----> requested unsub ---> ended
194/// ```
195///
196/// We use oneshot channels to inform the client struct about when these transition happened. E.g.
197/// because for `subscribe`` to finish, we want the state to have transition to `active` and similar
198/// for `unsubscribe`.
199#[derive(Debug)]
200enum SubscriptionInfo {
201    /// Subscription was requested we wait for server confirmation and uuid assignment.
202    RequestedSubscription(oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>),
203    /// Subscription is active.
204    Active,
205    /// Unsubscription was requested, we wait for server confirmation.
206    RequestedUnsubscription(oneshot::Sender<()>),
207}
208
209/// Internal struct containing shared state between of WsDeltaClient instances.
210struct Inner {
211    /// Websocket sender handle.
212    sink: WebSocketSink,
213    /// Command channel sender handle.
214    cmd_tx: Sender<()>,
215    /// Currently pending subscriptions.
216    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
217    /// Active subscriptions.
218    subscriptions: HashMap<Uuid, SubscriptionInfo>,
219    /// For eachs subscription we keep a sender handle, the receiver is returned to the caller of
220    /// subscribe.
221    sender: HashMap<Uuid, Sender<BlockChanges>>,
222    /// How many messages to buffer per subscription before starting to drop new messages.
223    buffer_size: usize,
224}
225
226/// Shared state between all client instances.
227///
/// This state is behind a mutex and requires synchronization to be read or modified.
229impl Inner {
230    fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
231        Self {
232            sink,
233            cmd_tx,
234            pending: HashMap::new(),
235            subscriptions: HashMap::new(),
236            sender: HashMap::new(),
237            buffer_size,
238        }
239    }
240
241    /// Registers a new pending subscription.
242    #[allow(clippy::result_large_err)]
243    fn new_subscription(
244        &mut self,
245        id: &ExtractorIdentity,
246        ready_tx: oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>,
247    ) -> Result<(), DeltasError> {
248        if self.pending.contains_key(id) {
249            return Err(DeltasError::SubscriptionAlreadyPending);
250        }
251        self.pending
252            .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
253        Ok(())
254    }
255
256    /// Transitions a pending subscription to active.
257    ///
258    /// Will ignore any request to do so for subscriptions that are not pending.
259    fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
260        if let Some(info) = self.pending.remove(extractor_id) {
261            if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
262                let (tx, rx) = mpsc::channel(self.buffer_size);
263                self.sender.insert(subscription_id, tx);
264                self.subscriptions
265                    .insert(subscription_id, SubscriptionInfo::Active);
266                let _ = ready_tx
267                    .send(Ok((subscription_id, rx)))
268                    .map_err(|_| {
269                        warn!(
270                            ?extractor_id,
271                            ?subscription_id,
272                            "Subscriber for has gone away. Ignoring."
273                        )
274                    });
275            } else {
276                error!(
277                    ?extractor_id,
278                    ?subscription_id,
279                    "Pending subscription was not in the correct state to 
280                    transition to active. Ignoring!"
281                )
282            }
283        } else {
284            error!(
285                ?extractor_id,
286                ?subscription_id,
287                "Tried to mark an unknown subscription as active. Ignoring!"
288            );
289        }
290    }
291
292    /// Sends a message to a subscription's receiver.
293    #[allow(clippy::result_large_err)]
294    fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
295        if let Some(sender) = self.sender.get_mut(id) {
296            sender
297                .try_send(msg)
298                .map_err(|e| match e {
299                    TrySendError::Full(_) => DeltasError::BufferFull,
300                    TrySendError::Closed(_) => {
301                        DeltasError::TransportError("The subscriber has gone away".to_string())
302                    }
303                })?;
304        }
305        Ok(())
306    }
307
308    /// Requests a subscription to end.
309    ///
310    /// The subscription needs to exist and be active for this to have any effect. Wll use
311    /// `ready_tx` to notify the receiver once the transition to ended completed.
312    fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
313        if let Some(info) = self
314            .subscriptions
315            .get_mut(subscription_id)
316        {
317            if let SubscriptionInfo::Active = info {
318                *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
319            }
320        } else {
321            // no big deal imo so only debug lvl...
322            debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
323        }
324    }
325
326    /// Removes and fully ends a subscription
327    ///
328    /// Any calls for non-existing subscriptions will be simply ignored. May panic on internal state
329    /// inconsistencies: e.g. if the subscription exists but there is no sender for it.
330    /// Will remove a subscription even it was in active or pending state before, this is to support
331    /// any server side failure of the subscription.
332    fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
333        if let Entry::Occupied(e) = self
334            .subscriptions
335            .entry(subscription_id)
336        {
337            let info = e.remove();
338            if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
339                let _ = tx.send(()).map_err(|_| {
340                    debug!(?subscription_id, "failed to notify about removed subscription")
341                });
342                self.sender
343                    .remove(&subscription_id)
344                    .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
345            } else {
346                warn!(?subscription_id, "Subscription ended unexpectedly!");
347                self.sender
348                    .remove(&subscription_id)
349                    .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
350            }
351        } else {
352            // TODO: There is a race condition that can trigger multiple unsubscribes
353            //  if server doesn't respond quickly enough leading to some ugly logs but
354            //  doesn't affect behaviour negatively. E.g. BufferFull and multiple
355            //  messages from the ws connection are queued.
356            trace!(
357                ?subscription_id,
358                "Received `SubscriptionEnded`, but was never subscribed to it. This is likely a bug!"
359            );
360        }
361
362        Ok(())
363    }
364
365    fn cancel_pending(&mut self, extractor_id: &ExtractorIdentity, error: &WebsocketError) {
366        if let Some(sub_info) = self.pending.remove(extractor_id) {
367            match sub_info {
368                SubscriptionInfo::RequestedSubscription(tx) => {
369                    let _ = tx
370                        .send(Err(DeltasError::ServerError(
371                            format!("Subscription failed: {error}"),
372                            error.clone(),
373                        )))
374                        .map_err(|_| debug!("Cancel pending failed: receiver deallocated!"));
375                }
376                _ => {
377                    error!(?extractor_id, "Pending subscription in wrong state")
378                }
379            }
380        } else {
381            debug!(?extractor_id, "Tried cancel on non-existent pending subscription!")
382        }
383    }
384
385    /// Sends a message through the websocket.
386    async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
387        self.sink.send(msg).await.map_err(|e| {
388            DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
389        })
390    }
391}
392
393/// Tycho client websocket implementation.
394impl WsDeltasClient {
395    // Construct a new client with 5 reconnection attempts.
396    #[allow(clippy::result_large_err)]
397    pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
398        let uri = ws_uri
399            .parse::<Uri>()
400            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
401        Ok(Self {
402            uri,
403            auth_key: auth_key.map(|s| s.to_string()),
404            inner: Arc::new(Mutex::new(None)),
405            ws_buffer_size: 128,
406            subscription_buffer_size: 128,
407            conn_notify: Arc::new(Notify::new()),
408            max_reconnects: 5,
409            retry_cooldown: Duration::from_millis(500),
410            dead: Arc::new(AtomicBool::new(false)),
411        })
412    }
413
414    // Construct a new client with a custom number of reconnection attempts.
415    #[allow(clippy::result_large_err)]
416    pub fn new_with_reconnects(
417        ws_uri: &str,
418        auth_key: Option<&str>,
419        max_reconnects: u64,
420        retry_cooldown: Duration,
421    ) -> Result<Self, DeltasError> {
422        let uri = ws_uri
423            .parse::<Uri>()
424            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
425
426        Ok(Self {
427            uri,
428            auth_key: auth_key.map(|s| s.to_string()),
429            inner: Arc::new(Mutex::new(None)),
430            ws_buffer_size: 128,
431            subscription_buffer_size: 128,
432            conn_notify: Arc::new(Notify::new()),
433            max_reconnects,
434            retry_cooldown,
435            dead: Arc::new(AtomicBool::new(false)),
436        })
437    }
438
439    // Construct a new client with custom buffer sizes (for testing)
440    #[cfg(test)]
441    #[allow(clippy::result_large_err)]
442    pub fn new_with_custom_buffers(
443        ws_uri: &str,
444        auth_key: Option<&str>,
445        ws_buffer_size: usize,
446        subscription_buffer_size: usize,
447    ) -> Result<Self, DeltasError> {
448        let uri = ws_uri
449            .parse::<Uri>()
450            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
451        Ok(Self {
452            uri,
453            auth_key: auth_key.map(|s| s.to_string()),
454            inner: Arc::new(Mutex::new(None)),
455            ws_buffer_size,
456            subscription_buffer_size,
457            conn_notify: Arc::new(Notify::new()),
458            max_reconnects: 5,
459            retry_cooldown: Duration::from_millis(0),
460            dead: Arc::new(AtomicBool::new(false)),
461        })
462    }
463
464    /// Ensures that the client is connected.
465    ///
466    /// This method will acquire the lock for inner.
467    async fn is_connected(&self) -> bool {
468        let guard = self.inner.as_ref().lock().await;
469        guard.is_some()
470    }
471
472    /// Waits for the client to be connected
473    ///
474    /// This method acquires the lock for inner for a short period, then waits until the  
475    /// connection is established if not already connected.
476    async fn ensure_connection(&self) -> Result<(), DeltasError> {
477        if self.dead.load(Ordering::SeqCst) {
478            return Err(DeltasError::NotConnected)
479        };
480        if !self.is_connected().await {
481            self.conn_notify.notified().await;
482        };
483        Ok(())
484    }
485
486    /// Main message handling logic
487    ///
488    /// If the message returns an error, a reconnect attempt may be considered depending on the
489    /// error type.
490    #[instrument(skip(self, msg))]
491    async fn handle_msg(
492        &self,
493        msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
494    ) -> Result<(), DeltasError> {
495        let mut guard = self.inner.lock().await;
496
497        match msg {
498            // We do not deserialize the message directly into a WebSocketMessage. This is because
499            // the serde arbitrary_precision feature (often included in many
500            // dependencies we use) breaks some untagged enum deserializations. Instead,
501            // we deserialize the message into a serde_json::Value and convert that into a WebSocketMessage. For more info on this issue, see: https://github.com/serde-rs/json/issues/740
502            Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
503                serde_json::Value,
504            >(&text)
505            {
506                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
507                    Ok(ws_message) => match ws_message {
508                        WebSocketMessage::BlockChanges { subscription_id, deltas } => {
509                            trace!(?deltas, "Received a block state change, sending to channel");
510                            let inner = guard
511                                .as_mut()
512                                .ok_or_else(|| DeltasError::NotConnected)?;
513                            match inner.send(&subscription_id, deltas) {
514                                Err(DeltasError::BufferFull) => {
515                                    error!(?subscription_id, "Buffer full, unsubscribing!");
516                                    Self::force_unsubscribe(subscription_id, inner).await;
517                                }
518                                Err(_) => {
519                                    warn!(
520                                        ?subscription_id,
521                                        "Receiver for has gone away, unsubscribing!"
522                                    );
523                                    Self::force_unsubscribe(subscription_id, inner).await;
524                                }
525                                _ => { /* Do nothing */ }
526                            }
527                        }
528                        WebSocketMessage::Response(Response::NewSubscription {
529                            extractor_id,
530                            subscription_id,
531                        }) => {
532                            info!(?extractor_id, ?subscription_id, "Received a new subscription");
533                            let inner = guard
534                                .as_mut()
535                                .ok_or_else(|| DeltasError::NotConnected)?;
536                            inner.mark_active(&extractor_id, subscription_id);
537                        }
538                        WebSocketMessage::Response(Response::SubscriptionEnded {
539                            subscription_id,
540                        }) => {
541                            info!(?subscription_id, "Received a subscription ended");
542                            let inner = guard
543                                .as_mut()
544                                .ok_or_else(|| DeltasError::NotConnected)?;
545                            inner.remove_subscription(subscription_id)?;
546                        }
547                        WebSocketMessage::Response(Response::Error(error)) => match &error {
548                            WebsocketError::ExtractorNotFound(extractor_id) => {
549                                let inner = guard
550                                    .as_mut()
551                                    .ok_or_else(|| DeltasError::NotConnected)?;
552                                inner.cancel_pending(extractor_id, &error);
553                            }
554                            WebsocketError::SubscriptionNotFound(subscription_id) => {
555                                debug!("Received subscription not found, removing subscription");
556                                let inner = guard
557                                    .as_mut()
558                                    .ok_or_else(|| DeltasError::NotConnected)?;
559                                inner.remove_subscription(*subscription_id)?;
560                            }
561                            WebsocketError::ParseError(raw, e) => {
562                                return Err(DeltasError::ServerError(
563                                    format!(
564                                        "Server failed to parse client message: {e}, msg: {raw}"
565                                    ),
566                                    error.clone(),
567                                ))
568                            }
569                            WebsocketError::CompressionError(subscription_id, e) => {
570                                return Err(DeltasError::ServerError(
571                                    format!(
572                                        "Server failed to compress message for subscription: {subscription_id}, error: {e}"
573                                    ),
574                                    error.clone(),
575                                ))
576                            }
577                            WebsocketError::SubscribeError(extractor_id) => {
578                                let inner = guard
579                                    .as_mut()
580                                    .ok_or_else(|| DeltasError::NotConnected)?;
581                                inner.cancel_pending(extractor_id, &error);
582                            }
583                        },
584                    },
585                    Err(e) => {
586                        error!(
587                            "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
588                            e, text
589                        );
590                    }
591                },
592                Err(e) => {
593                    error!(
594                        "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
595                        e, text
596                    );
597                }
598            },
599            Ok(tungstenite::protocol::Message::Ping(_)) => {
600                // Respond to pings with pongs.
601                let inner = guard
602                    .as_mut()
603                    .ok_or_else(|| DeltasError::NotConnected)?;
604                if let Err(error) = inner
605                    .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
606                    .await
607                {
608                    debug!(?error, "Failed to send pong!");
609                }
610            }
611            Ok(tungstenite::protocol::Message::Pong(_)) => {
612                // Do nothing.
613            }
614            Ok(tungstenite::protocol::Message::Close(_)) => {
615                return Err(DeltasError::ConnectionClosed);
616            }
617            Ok(unknown_msg) => {
618                info!("Received an unknown message type: {:?}", unknown_msg);
619            }
620            Err(error) => {
621                error!(?error, "Websocket error");
622                return Err(match error {
623                    tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
624                    tungstenite::Error::AlreadyClosed => {
625                        warn!("Received AlreadyClosed error which is indicative of a bug!");
626                        DeltasError::ConnectionError(Box::new(error))
627                    }
628                    tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
629                        DeltasError::ConnectionError(Box::new(error))
630                    }
631                    _ => DeltasError::Fatal(error.to_string()),
632                });
633            }
634        };
635        Ok(())
636    }
637
638    /// Forcefully ends a (client) stream by unsubscribing.
639    ///
640    /// Is used only if the message can't be processed due to an error that might resolve
641    /// itself by resubscribing.
642    async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
643        // avoid unsubscribing multiple times
644        if let Some(SubscriptionInfo::RequestedUnsubscription(_)) = inner
645            .subscriptions
646            .get(&subscription_id)
647        {
648            return
649        }
650
651        let (tx, rx) = oneshot::channel();
652        if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
653            warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
654        } else {
655            // Wait for unsubscribe completion with timeout
656            match tokio::time::timeout(Duration::from_secs(5), rx).await {
657                Ok(_) => {
658                    debug!(?subscription_id, "Unsubscribe completed successfully");
659                }
660                Err(_) => {
661                    warn!(?subscription_id, "Unsubscribe completion timed out");
662                }
663            }
664        }
665    }
666
667    /// Helper method to force an unsubscription
668    ///
669    /// This method expects to receive a mutable reference to `Inner` so it does not acquire a
670    /// lock. Used for normal unsubscribes as well to remove any subscriptions with deallocated
671    /// receivers.
672    async fn unsubscribe_inner(
673        inner: &mut Inner,
674        subscription_id: Uuid,
675        ready_tx: oneshot::Sender<()>,
676    ) -> Result<(), DeltasError> {
677        debug!(?subscription_id, "Unsubscribing");
678        inner.end_subscription(&subscription_id, ready_tx);
679        let cmd = Command::Unsubscribe { subscription_id };
680        inner
681            .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
682                |e| {
683                    DeltasError::TransportError(format!(
684                        "Failed to serialize unsubscribe command: {e}"
685                    ))
686                },
687            )?))
688            .await?;
689        Ok(())
690    }
691}
692
693#[async_trait]
694impl DeltasClient for WsDeltasClient {
695    #[instrument(skip(self))]
696    async fn subscribe(
697        &self,
698        extractor_id: ExtractorIdentity,
699        options: SubscriptionOptions,
700    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
701        trace!("Starting subscribe");
702        self.ensure_connection().await?;
703        let (ready_tx, ready_rx) = oneshot::channel();
704        {
705            let mut guard = self.inner.lock().await;
706            let inner = guard
707                .as_mut()
708                .ok_or_else(|| DeltasError::NotConnected)?;
709            trace!("Sending subscribe command");
710            inner.new_subscription(&extractor_id, ready_tx)?;
711            let cmd = Command::Subscribe {
712                extractor_id,
713                include_state: options.include_state,
714                compression: false,
715            };
716            inner
717                .ws_send(tungstenite::protocol::Message::Text(
718                    serde_json::to_string(&cmd).map_err(|e| {
719                        DeltasError::TransportError(format!(
720                            "Failed to serialize subscribe command: {e}"
721                        ))
722                    })?,
723                ))
724                .await?;
725        }
726        trace!("Waiting for subscription response");
727        let res = ready_rx.await.map_err(|_| {
728            DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
729        })??;
730        trace!("Subscription successful");
731        Ok(res)
732    }
733
734    #[instrument(skip(self))]
735    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
736        self.ensure_connection().await?;
737        let (ready_tx, ready_rx) = oneshot::channel();
738        {
739            let mut guard = self.inner.lock().await;
740            let inner = guard
741                .as_mut()
742                .ok_or_else(|| DeltasError::NotConnected)?;
743
744            WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
745        }
746        ready_rx.await.map_err(|_| {
747            DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
748        })?;
749
750        Ok(())
751    }
752
753    #[instrument(skip(self))]
754    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
755        if self.is_connected().await {
756            return Err(DeltasError::AlreadyConnected);
757        }
758        let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
759        info!(?ws_uri, "Starting TychoWebsocketClient");
760
761        let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
762        {
763            let mut guard = self.inner.as_ref().lock().await;
764            *guard = None;
765        }
766        let this = self.clone();
767        let jh = tokio::spawn(async move {
768            let mut retry_count = 0;
769            let mut result = Err(DeltasError::NotConnected);
770
771            'retry: while retry_count < this.max_reconnects {
772                info!(?ws_uri, retry_count, "Connecting to WebSocket server");
773                if retry_count > 0 {
774                    sleep(this.retry_cooldown).await;
775                }
776
777                // Create a WebSocket request
778                let mut request_builder = Request::builder()
779                    .uri(&ws_uri)
780                    .header(SEC_WEBSOCKET_KEY, generate_key())
781                    .header(SEC_WEBSOCKET_VERSION, 13)
782                    .header(CONNECTION, "Upgrade")
783                    .header(UPGRADE, "websocket")
784                    .header(
785                        HOST,
786                        this.uri.host().ok_or_else(|| {
787                            DeltasError::UriParsing(
788                                ws_uri.clone(),
789                                "No host found in tycho url".to_string(),
790                            )
791                        })?,
792                    )
793                    .header(
794                        USER_AGENT,
795                        format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
796                    );
797
798                // Add Authorization if one is given
799                if let Some(ref key) = this.auth_key {
800                    request_builder = request_builder.header(AUTHORIZATION, key);
801                }
802
803                let request = request_builder.body(()).map_err(|e| {
804                    DeltasError::TransportError(format!("Failed to build connection request: {e}"))
805                })?;
806                let (conn, _) = match connect_async(request).await {
807                    Ok(conn) => conn,
808                    Err(e) => {
809                        // Prepare for reconnection
810                        retry_count += 1;
811                        let mut guard = this.inner.as_ref().lock().await;
812                        *guard = None;
813
814                        warn!(
815                            e = e.to_string(),
816                            "Failed to connect to WebSocket server; Reconnecting"
817                        );
818                        continue 'retry;
819                    }
820                };
821
822                let (ws_tx_new, ws_rx_new) = conn.split();
823                {
824                    let mut guard = this.inner.as_ref().lock().await;
825                    *guard =
826                        Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
827                }
828                let mut msg_rx = ws_rx_new.boxed();
829
830                info!("Connection Successful: TychoWebsocketClient started");
831                this.conn_notify.notify_waiters();
832                result = Ok(());
833
834                loop {
835                    let res = tokio::select! {
836                        msg = msg_rx.next() => match msg {
837                            Some(msg) => this.handle_msg(msg).await,
838                            None => {
839                                // This code should not be reachable since the stream
840                                // should return ConnectionClosed in the case above
841                                // before it returns None here.
842                                warn!("Websocket connection silently closed, giving up!");
843                                break 'retry
844                            }
845                        },
846                        _ = cmd_rx.recv() => {break 'retry},
847                    };
848                    if let Err(error) = res {
849                        debug!(?error, "WsError");
850                        if matches!(
851                            error,
852                            DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
853                        ) {
854                            // Prepare for reconnection
855                            retry_count += 1;
856                            let mut guard = this.inner.as_ref().lock().await;
857                            *guard = None;
858
859                            warn!(
860                                ?error,
861                                ?retry_count,
862                                "Connection dropped unexpectedly; Reconnecting..."
863                            );
864                            break;
865                        } else {
866                            // Other errors are considered fatal
867                            error!(?error, "Fatal error; Exiting");
868                            result = Err(error);
869                            break 'retry;
870                        }
871                    }
872                }
873            }
874            debug!(
875                retry_count,
876                max_reconnects=?this.max_reconnects,
877                "Reconnection loop ended"
878            );
879            // Clean up before exiting
880            let mut guard = this.inner.as_ref().lock().await;
881            *guard = None;
882
883            // Check if max retries has been reached.
884            if retry_count >= this.max_reconnects {
885                error!("Max reconnection attempts reached; Exiting");
886                this.dead.store(true, Ordering::SeqCst);
887                this.conn_notify.notify_waiters(); // Notify that the task is done
888                result = Err(DeltasError::ConnectionClosed);
889            }
890
891            result
892        });
893
894        self.conn_notify.notified().await;
895
896        if self.is_connected().await {
897            Ok(jh)
898        } else {
899            Err(DeltasError::NotConnected)
900        }
901    }
902
903    #[instrument(skip(self))]
904    async fn close(&self) -> Result<(), DeltasError> {
905        info!("Closing TychoWebsocketClient");
906        let mut guard = self.inner.lock().await;
907        let inner = guard
908            .as_mut()
909            .ok_or_else(|| DeltasError::NotConnected)?;
910        inner
911            .cmd_tx
912            .send(())
913            .await
914            .map_err(|e| DeltasError::TransportError(e.to_string()))?;
915        Ok(())
916    }
917}
918
919#[cfg(test)]
920mod tests {
921    use std::net::SocketAddr;
922
923    use test_log::test;
924    use tokio::{net::TcpListener, time::timeout};
925    use tycho_common::dto::Chain;
926
927    use super::*;
928
929    #[derive(Clone)]
930    enum ExpectedComm {
931        Receive(u64, tungstenite::protocol::Message),
932        Send(tungstenite::protocol::Message),
933    }
934
935    async fn mock_tycho_ws(
936        messages: &[ExpectedComm],
937        reconnects: usize,
938    ) -> (SocketAddr, JoinHandle<()>) {
939        info!("Starting mock webserver");
940        // zero port here means the OS chooses an open port
941        let server = TcpListener::bind("127.0.0.1:0")
942            .await
943            .expect("localhost bind failed");
944        let addr = server.local_addr().unwrap();
945        let messages = messages.to_vec();
946
947        let jh = tokio::spawn(async move {
948            info!("mock webserver started");
949            for _ in 0..(reconnects + 1) {
950                info!("Awaiting client connections");
951                if let Ok((stream, _)) = server.accept().await {
952                    info!("Client connected");
953                    let mut websocket = tokio_tungstenite::accept_async(stream)
954                        .await
955                        .unwrap();
956
957                    info!("Handling messages..");
958                    for c in messages.iter().cloned() {
959                        match c {
960                            ExpectedComm::Receive(t, exp) => {
961                                info!("Awaiting message...");
962                                let msg = timeout(Duration::from_millis(t), websocket.next())
963                                    .await
964                                    .expect("Receive timeout")
965                                    .expect("Stream exhausted")
966                                    .expect("Failed to receive message.");
967                                info!("Message received");
968                                assert_eq!(msg, exp)
969                            }
970                            ExpectedComm::Send(data) => {
971                                info!("Sending message");
972                                websocket
973                                    .send(data)
974                                    .await
975                                    .expect("Failed to send message");
976                                info!("Message sent");
977                            }
978                        };
979                    }
980                    info!("Mock communication completed");
981                    sleep(Duration::from_millis(100)).await;
982                    // Close the WebSocket connection
983                    let _ = websocket.close(None).await;
984                    info!("Mock server closed connection");
985                }
986            }
987            info!("mock server ended");
988        });
989        (addr, jh)
990    }
991
992    #[tokio::test]
993    async fn test_subscribe_receive() {
994        let exp_comm = [
995            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
996                {
997                    "method":"subscribe",
998                    "extractor_id":{
999                        "chain":"ethereum",
1000                        "name":"vm:ambient"
1001                    },
1002                    "include_state": true,
1003                    "compression": false
1004                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1005            )),
1006            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1007                {
1008                    "method":"newsubscription",
1009                    "extractor_id":{
1010                    "chain":"ethereum",
1011                    "name":"vm:ambient"
1012                    },
1013                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1014                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1015            )),
1016            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1017                {
1018                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1019                    "deltas": {
1020                        "extractor": "vm:ambient",
1021                        "chain": "ethereum",
1022                        "block": {
1023                            "number": 123,
1024                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1025                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1026                            "chain": "ethereum",             
1027                            "ts": "2023-09-14T00:00:00"
1028                        },
1029                        "finalized_block_height": 0,
1030                        "revert": false,
1031                        "new_tokens": {},
1032                        "account_updates": {
1033                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1034                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1035                                "chain": "ethereum",
1036                                "slots": {},
1037                                "balance": "0x01f4",
1038                                "code": "",
1039                                "change": "Update"
1040                            }
1041                        },
1042                        "state_updates": {
1043                            "component_1": {
1044                                "component_id": "component_1",
1045                                "updated_attributes": {"attr1": "0x01"},
1046                                "deleted_attributes": ["attr2"]
1047                            }
1048                        },
1049                        "new_protocol_components": 
1050                            { "protocol_1": {
1051                                    "id": "protocol_1",
1052                                    "protocol_system": "system_1",
1053                                    "protocol_type_name": "type_1",
1054                                    "chain": "ethereum",
1055                                    "tokens": ["0x01", "0x02"],
1056                                    "contract_ids": ["0x01", "0x02"],
1057                                    "static_attributes": {"attr1": "0x01f4"},
1058                                    "change": "Update",
1059                                    "creation_tx": "0x01",
1060                                    "created_at": "2023-09-14T00:00:00"
1061                                }
1062                            },
1063                        "deleted_protocol_components": {},
1064                        "component_balances": {
1065                            "protocol_1":
1066                                {
1067                                    "0x01": {
1068                                        "token": "0x01",
1069                                        "balance": "0x01f4",
1070                                        "balance_float": 0.0,
1071                                        "modify_tx": "0x01",
1072                                        "component_id": "protocol_1"
1073                                    }
1074                                }
1075                        },
1076                        "account_balances": {
1077                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1078                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1079                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1080                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1081                                    "balance": "0x01f4",
1082                                    "modify_tx": "0x01"
1083                                }
1084                            }
1085                        },
1086                        "component_tvl": {
1087                            "protocol_1": 1000.0
1088                        },
1089                        "dci_update": {
1090                            "new_entrypoints": {},
1091                            "new_entrypoint_params": {},
1092                            "trace_results": {}
1093                        }
1094                    }
1095                }
1096                "#.to_owned()
1097            ))
1098        ];
1099        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1100
1101        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1102        let jh = client
1103            .connect()
1104            .await
1105            .expect("connect failed");
1106        let (_, mut rx) = timeout(
1107            Duration::from_millis(100),
1108            client.subscribe(
1109                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1110                SubscriptionOptions::new(),
1111            ),
1112        )
1113        .await
1114        .expect("subscription timed out")
1115        .expect("subscription failed");
1116        let _ = timeout(Duration::from_millis(100), rx.recv())
1117            .await
1118            .expect("awaiting message timeout out")
1119            .expect("receiving message failed");
1120        timeout(Duration::from_millis(100), client.close())
1121            .await
1122            .expect("close timed out")
1123            .expect("close failed");
1124        jh.await
1125            .expect("ws loop errored")
1126            .unwrap();
1127        server_thread.await.unwrap();
1128    }
1129
1130    #[tokio::test]
1131    async fn test_unsubscribe() {
1132        let exp_comm = [
1133            ExpectedComm::Receive(
1134                100,
1135                tungstenite::protocol::Message::Text(
1136                    r#"
1137                {
1138                    "method": "subscribe",
1139                    "extractor_id":{
1140                        "chain": "ethereum",
1141                        "name": "vm:ambient"
1142                    },
1143                    "include_state": true,
1144                    "compression": false
1145                }"#
1146                    .to_owned()
1147                    .replace(|c: char| c.is_whitespace(), ""),
1148                ),
1149            ),
1150            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1151                r#"
1152                {
1153                    "method": "newsubscription",
1154                    "extractor_id":{
1155                        "chain": "ethereum",
1156                        "name": "vm:ambient"
1157                    },
1158                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1159                }"#
1160                .to_owned()
1161                .replace(|c: char| c.is_whitespace(), ""),
1162            )),
1163            ExpectedComm::Receive(
1164                100,
1165                tungstenite::protocol::Message::Text(
1166                    r#"
1167                {
1168                    "method": "unsubscribe",
1169                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1170                }
1171                "#
1172                    .to_owned()
1173                    .replace(|c: char| c.is_whitespace(), ""),
1174                ),
1175            ),
1176            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1177                r#"
1178                {
1179                    "method": "subscriptionended",
1180                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1181                }
1182                "#
1183                .to_owned()
1184                .replace(|c: char| c.is_whitespace(), ""),
1185            )),
1186        ];
1187        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1188
1189        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1190        let jh = client
1191            .connect()
1192            .await
1193            .expect("connect failed");
1194        let (sub_id, mut rx) = timeout(
1195            Duration::from_millis(100),
1196            client.subscribe(
1197                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1198                SubscriptionOptions::new(),
1199            ),
1200        )
1201        .await
1202        .expect("subscription timed out")
1203        .expect("subscription failed");
1204
1205        timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1206            .await
1207            .expect("unsubscribe timed out")
1208            .expect("unsubscribe failed");
1209        let res = timeout(Duration::from_millis(100), rx.recv())
1210            .await
1211            .expect("awaiting message timeout out");
1212
1213        // If the subscription ended, the channel should have been closed.
1214        assert!(res.is_none());
1215
1216        timeout(Duration::from_millis(100), client.close())
1217            .await
1218            .expect("close timed out")
1219            .expect("close failed");
1220        jh.await
1221            .expect("ws loop errored")
1222            .unwrap();
1223        server_thread.await.unwrap();
1224    }
1225
1226    #[tokio::test]
1227    async fn test_subscription_unexpected_end() {
1228        let exp_comm = [
1229            ExpectedComm::Receive(
1230                100,
1231                tungstenite::protocol::Message::Text(
1232                    r#"
1233                {
1234                    "method":"subscribe",
1235                    "extractor_id":{
1236                        "chain":"ethereum",
1237                        "name":"vm:ambient"
1238                    },
1239                    "include_state": true,
1240                    "compression": false
1241                }"#
1242                    .to_owned()
1243                    .replace(|c: char| c.is_whitespace(), ""),
1244                ),
1245            ),
1246            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1247                r#"
1248                {
1249                    "method":"newsubscription",
1250                    "extractor_id":{
1251                        "chain":"ethereum",
1252                        "name":"vm:ambient"
1253                    },
1254                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1255                }"#
1256                .to_owned()
1257                .replace(|c: char| c.is_whitespace(), ""),
1258            )),
1259            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1260                r#"
1261                {
1262                    "method": "subscriptionended",
1263                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1264                }"#
1265                .to_owned()
1266                .replace(|c: char| c.is_whitespace(), ""),
1267            )),
1268        ];
1269        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1270
1271        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1272        let jh = client
1273            .connect()
1274            .await
1275            .expect("connect failed");
1276        let (_, mut rx) = timeout(
1277            Duration::from_millis(100),
1278            client.subscribe(
1279                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1280                SubscriptionOptions::new(),
1281            ),
1282        )
1283        .await
1284        .expect("subscription timed out")
1285        .expect("subscription failed");
1286        let res = timeout(Duration::from_millis(100), rx.recv())
1287            .await
1288            .expect("awaiting message timeout out");
1289
1290        // If the subscription ended, the channel should have been closed.
1291        assert!(res.is_none());
1292
1293        timeout(Duration::from_millis(100), client.close())
1294            .await
1295            .expect("close timed out")
1296            .expect("close failed");
1297        jh.await
1298            .expect("ws loop errored")
1299            .unwrap();
1300        server_thread.await.unwrap();
1301    }
1302
1303    #[test_log::test(tokio::test)]
1304    async fn test_reconnect() {
1305        let exp_comm = [
1306            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
1307                {
1308                    "method":"subscribe",
1309                    "extractor_id":{
1310                        "chain":"ethereum",
1311                        "name":"vm:ambient"
1312                    },
1313                    "include_state": true,
1314                    "compression": false
1315                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1316            )),
1317            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1318                {
1319                    "method":"newsubscription",
1320                    "extractor_id":{
1321                    "chain":"ethereum",
1322                    "name":"vm:ambient"
1323                    },
1324                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1325                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1326            )),
1327            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1328                {
1329                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1330                    "deltas": {
1331                        "extractor": "vm:ambient",
1332                        "chain": "ethereum",
1333                        "block": {
1334                            "number": 123,
1335                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1336                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1337                            "chain": "ethereum",             
1338                            "ts": "2023-09-14T00:00:00"
1339                        },
1340                        "finalized_block_height": 0,
1341                        "revert": false,
1342                        "new_tokens": {},
1343                        "account_updates": {
1344                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1345                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1346                                "chain": "ethereum",
1347                                "slots": {},
1348                                "balance": "0x01f4",
1349                                "code": "",
1350                                "change": "Update"
1351                            }
1352                        },
1353                        "state_updates": {
1354                            "component_1": {
1355                                "component_id": "component_1",
1356                                "updated_attributes": {"attr1": "0x01"},
1357                                "deleted_attributes": ["attr2"]
1358                            }
1359                        },
1360                        "new_protocol_components": {
1361                            "protocol_1":
1362                                {
1363                                    "id": "protocol_1",
1364                                    "protocol_system": "system_1",
1365                                    "protocol_type_name": "type_1",
1366                                    "chain": "ethereum",
1367                                    "tokens": ["0x01", "0x02"],
1368                                    "contract_ids": ["0x01", "0x02"],
1369                                    "static_attributes": {"attr1": "0x01f4"},
1370                                    "change": "Update",
1371                                    "creation_tx": "0x01",
1372                                    "created_at": "2023-09-14T00:00:00"
1373                                }
1374                            },
1375                        "deleted_protocol_components": {},
1376                        "component_balances": {
1377                            "protocol_1": {
1378                                "0x01": {
1379                                    "token": "0x01",
1380                                    "balance": "0x01f4",
1381                                    "balance_float": 1000.0,
1382                                    "modify_tx": "0x01",
1383                                    "component_id": "protocol_1"
1384                                }
1385                            }
1386                        },
1387                        "account_balances": {
1388                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1389                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1390                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1391                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1392                                    "balance": "0x01f4",
1393                                    "modify_tx": "0x01"
1394                                }
1395                            }
1396                        },
1397                        "component_tvl": {
1398                            "protocol_1": 1000.0
1399                        },
1400                        "dci_update": {
1401                            "new_entrypoints": {},
1402                            "new_entrypoint_params": {},
1403                            "trace_results": {}
1404                        }
1405                    }
1406                }
1407                "#.to_owned()
1408            ))
1409        ];
1410        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1411        let client = WsDeltasClient::new_with_reconnects(
1412            &format!("ws://{addr}"),
1413            None,
1414            3,
1415            // server stays down for 100ms on connection drop
1416            Duration::from_millis(110),
1417        )
1418        .unwrap();
1419
1420        let jh: JoinHandle<Result<(), DeltasError>> = client
1421            .connect()
1422            .await
1423            .expect("connect failed");
1424
1425        for _ in 0..2 {
1426            dbg!("loop");
1427            let (_, mut rx) = timeout(
1428                Duration::from_millis(200),
1429                client.subscribe(
1430                    ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1431                    SubscriptionOptions::new(),
1432                ),
1433            )
1434            .await
1435            .expect("subscription timed out")
1436            .expect("subscription failed");
1437
1438            let _ = timeout(Duration::from_millis(100), rx.recv())
1439                .await
1440                .expect("awaiting message timeout out")
1441                .expect("receiving message failed");
1442
1443            // wait for the connection to drop
1444            let res = timeout(Duration::from_millis(200), rx.recv())
1445                .await
1446                .expect("awaiting closed connection timeout out");
1447            assert!(res.is_none());
1448        }
1449        let res = jh.await.expect("ws client join failed");
1450        // 5th client reconnect attempt should fail
1451        assert!(res.is_err());
1452        server_thread
1453            .await
1454            .expect("ws server loop errored");
1455    }
1456
1457    async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1458        let server = TcpListener::bind("127.0.0.1:0")
1459            .await
1460            .expect("localhost bind failed");
1461        let addr = server.local_addr().unwrap();
1462        let jh = tokio::spawn(async move {
1463            while let Ok((stream, _)) = server.accept().await {
1464                if accept_first {
1465                    // Send connection handshake to accept the connection (fail later)
1466                    let stream = tokio_tungstenite::accept_async(stream)
1467                        .await
1468                        .unwrap();
1469                    sleep(Duration::from_millis(10)).await;
1470                    drop(stream)
1471                } else {
1472                    // Close the connection to simulate a failure
1473                    drop(stream);
1474                }
1475            }
1476        });
1477        (addr, jh)
1478    }
1479
1480    #[test(tokio::test)]
1481    async fn test_subscribe_dead_client_after_max_attempts() {
1482        let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1483        let client = WsDeltasClient::new_with_reconnects(
1484            &format!("ws://{addr}"),
1485            None,
1486            3,
1487            Duration::from_secs(0),
1488        )
1489        .unwrap();
1490
1491        let join_handle = client.connect().await.unwrap();
1492        let handle_res = join_handle.await.unwrap();
1493        assert!(handle_res.is_err());
1494        assert!(!client.is_connected().await);
1495
1496        let subscription_res = timeout(
1497            Duration::from_millis(10),
1498            client.subscribe(
1499                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1500                SubscriptionOptions::new(),
1501            ),
1502        )
1503        .await
1504        .unwrap();
1505        assert!(subscription_res.is_err());
1506    }
1507
1508    #[test(tokio::test)]
1509    async fn test_ws_client_retry_cooldown() {
1510        let start = std::time::Instant::now();
1511        let (addr, _) = mock_bad_connection_tycho_ws(false).await;
1512
1513        // Use the mock server that immediately drops connections
1514        let client = WsDeltasClient::new_with_reconnects(
1515            &format!("ws://{addr}"),
1516            None,
1517            3,                         // 3 attempts total (so 2 retries with cooldowns)
1518            Duration::from_millis(50), // 50ms cooldown
1519        )
1520        .unwrap();
1521
1522        // Try to connect - this should fail after retries but still measure the time
1523        let connect_result = client.connect().await;
1524        let elapsed = start.elapsed();
1525
1526        // Connection should fail after exhausting retries
1527        assert!(connect_result.is_err(), "Expected connection to fail after retries");
1528
1529        // Should have waited at least 100ms total (2 retries × 50ms cooldown each)
1530        assert!(
1531            elapsed >= Duration::from_millis(100),
1532            "Expected at least 100ms elapsed, got {:?}",
1533            elapsed
1534        );
1535
1536        // Should not take too long (max ~300ms for 3 attempts with some tolerance)
1537        assert!(elapsed < Duration::from_millis(500), "Took too long: {:?}", elapsed);
1538    }
1539
1540    #[test_log::test(tokio::test)]
1541    async fn test_buffer_full_triggers_unsubscribe() {
1542        // Expected communication sequence for buffer full scenario
1543        let exp_comm = {
1544            [
1545            // 1. Client subscribes
1546            ExpectedComm::Receive(
1547                100,
1548                tungstenite::protocol::Message::Text(
1549                    r#"
1550                {
1551                    "method":"subscribe",
1552                    "extractor_id":{
1553                        "chain":"ethereum",
1554                        "name":"vm:ambient"
1555                    },
1556                    "include_state": true,
1557                    "compression": false
1558                }"#
1559                    .to_owned()
1560                    .replace(|c: char| c.is_whitespace(), ""),
1561                ),
1562            ),
1563            // 2. Server confirms subscription
1564            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1565                r#"
1566                {
1567                    "method":"newsubscription",
1568                    "extractor_id":{
1569                        "chain":"ethereum",
1570                        "name":"vm:ambient"
1571                    },
1572                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1573                }"#
1574                .to_owned()
1575                .replace(|c: char| c.is_whitespace(), ""),
1576            )),
1577            // 3. Server sends first message (fills buffer)
1578            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1579                r#"
1580                {
1581                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1582                    "deltas": {
1583                        "extractor": "vm:ambient",
1584                        "chain": "ethereum",
1585                        "block": {
1586                            "number": 123,
1587                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1588                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1589                            "chain": "ethereum",
1590                            "ts": "2023-09-14T00:00:00"
1591                        },
1592                        "finalized_block_height": 0,
1593                        "revert": false,
1594                        "new_tokens": {},
1595                        "account_updates": {},
1596                        "state_updates": {},
1597                        "new_protocol_components": {},
1598                        "deleted_protocol_components": {},
1599                        "component_balances": {},
1600                        "account_balances": {},
1601                        "component_tvl": {},
1602                        "dci_update": {
1603                            "new_entrypoints": {},
1604                            "new_entrypoint_params": {},
1605                            "trace_results": {}
1606                        }
1607                    }
1608                }
1609                "#.to_owned()
1610            )),
1611            // 4. Server sends second message (triggers buffer overflow and force unsubscribe)
1612            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1613                r#"
1614                {
1615                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1616                    "deltas": {
1617                        "extractor": "vm:ambient",
1618                        "chain": "ethereum",
1619                        "block": {
1620                            "number": 124,
1621                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
1622                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1623                            "chain": "ethereum",
1624                            "ts": "2023-09-14T00:00:01"
1625                        },
1626                        "finalized_block_height": 0,
1627                        "revert": false,
1628                        "new_tokens": {},
1629                        "account_updates": {},
1630                        "state_updates": {},
1631                        "new_protocol_components": {},
1632                        "deleted_protocol_components": {},
1633                        "component_balances": {},
1634                        "account_balances": {},
1635                        "component_tvl": {},
1636                        "dci_update": {
1637                            "new_entrypoints": {},
1638                            "new_entrypoint_params": {},
1639                            "trace_results": {}
1640                        }
1641                    }
1642                }
1643                "#.to_owned()
1644            )),
1645            // 5. Expect unsubscribe command due to buffer full
1646            ExpectedComm::Receive(
1647                100,
1648                tungstenite::protocol::Message::Text(
1649                    r#"
1650                {
1651                    "method": "unsubscribe",
1652                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1653                }
1654                "#
1655                    .to_owned()
1656                    .replace(|c: char| c.is_whitespace(), ""),
1657                ),
1658            ),
1659            // 6. Server confirms unsubscription
1660            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1661                r#"
1662                {
1663                    "method": "subscriptionended",
1664                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1665                }
1666                "#
1667                .to_owned()
1668                .replace(|c: char| c.is_whitespace(), ""),
1669            )),
1670        ]
1671        };
1672
1673        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1674
1675        // Create client with very small buffer size (1) to easily trigger BufferFull
1676        let client = WsDeltasClient::new_with_custom_buffers(
1677            &format!("ws://{addr}"),
1678            None,
1679            128, // ws_buffer_size
1680            1,   // subscription_buffer_size - this will trigger BufferFull easily
1681        )
1682        .unwrap();
1683
1684        let jh = client
1685            .connect()
1686            .await
1687            .expect("connect failed");
1688
1689        let (_sub_id, mut rx) = timeout(
1690            Duration::from_millis(100),
1691            client.subscribe(
1692                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1693                SubscriptionOptions::new(),
1694            ),
1695        )
1696        .await
1697        .expect("subscription timed out")
1698        .expect("subscription failed");
1699
1700        // Allow time for messages to be processed and buffer to fill up
1701        tokio::time::sleep(Duration::from_millis(100)).await;
1702
1703        // Collect all messages until channel closes or we get a reasonable number
1704        let mut received_msgs = Vec::new();
1705
1706        // Use a single longer timeout to collect messages until channel closes
1707        while received_msgs.len() < 3 {
1708            match timeout(Duration::from_millis(200), rx.recv()).await {
1709                Ok(Some(msg)) => {
1710                    received_msgs.push(msg);
1711                }
1712                Ok(None) => {
1713                    // Channel closed - this is what we expect after buffer overflow
1714                    break;
1715                }
1716                Err(_) => {
1717                    // Timeout - no more messages coming
1718                    break;
1719                }
1720            }
1721        }
1722
1723        // Verify the key behavior: buffer overflow should limit messages and close channel
1724        assert!(
1725            received_msgs.len() <= 1,
1726            "Expected buffer overflow to limit messages to at most 1, got {}",
1727            received_msgs.len()
1728        );
1729
1730        if let Some(first_msg) = received_msgs.first() {
1731            assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
1732        }
1733
1734        // Test passed! The key behavior we're testing (buffer full causes force unsubscribe) has
1735        // been verified We don't need to explicitly close the client as it will be cleaned
1736        // up when dropped
1737
1738        // Just wait for the tasks to finish cleanly
1739        drop(rx); // Explicitly drop the receiver
1740        tokio::time::sleep(Duration::from_millis(50)).await;
1741
1742        // Abort the tasks to clean up
1743        jh.abort();
1744        server_thread.abort();
1745
1746        let _ = jh.await;
1747        let _ = server_thread.await;
1748    }
1749
1750    #[tokio::test]
1751    async fn test_server_error_handling() {
1752        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1753
1754        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1755
1756        // Test ExtractorNotFound error
1757        let error_response = WebSocketMessage::Response(Response::Error(
1758            WebsocketError::ExtractorNotFound(extractor_id.clone()),
1759        ));
1760        let error_json = serde_json::to_string(&error_response).unwrap();
1761
1762        let exp_comm = [
1763            ExpectedComm::Receive(
1764                100,
1765                tungstenite::protocol::Message::Text(
1766                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":false}"#.to_string()
1767                ),
1768            ),
1769            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1770        ];
1771
1772        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1773
1774        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1775        let jh = client
1776            .connect()
1777            .await
1778            .expect("connect failed");
1779
1780        let result = timeout(
1781            Duration::from_millis(100),
1782            client.subscribe(extractor_id, SubscriptionOptions::new()),
1783        )
1784        .await
1785        .expect("subscription timed out");
1786
1787        // Verify that we get a ServerError
1788        assert!(result.is_err());
1789        if let Err(DeltasError::ServerError(msg, _)) = result {
1790            assert!(msg.contains("Subscription failed"));
1791            assert!(msg.contains("Extractor not found"));
1792        } else {
1793            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1794        }
1795
1796        timeout(Duration::from_millis(100), client.close())
1797            .await
1798            .expect("close timed out")
1799            .expect("close failed");
1800        jh.await
1801            .expect("ws loop errored")
1802            .unwrap();
1803        server_thread.await.unwrap();
1804    }
1805
1806    #[test_log::test(tokio::test)]
1807    async fn test_subscription_not_found_error() {
1808        // Test scenario: Server restart causes subscription loss
1809        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1810
1811        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1812        let subscription_id = Uuid::new_v4();
1813
1814        let error_response = WebSocketMessage::Response(Response::Error(
1815            WebsocketError::SubscriptionNotFound(subscription_id),
1816        ));
1817        let error_json = serde_json::to_string(&error_response).unwrap();
1818
1819        let exp_comm = [
1820            // 1. Client subscribes successfully
1821            ExpectedComm::Receive(
1822                100,
1823                tungstenite::protocol::Message::Text(
1824                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":false}"#.to_string()
1825                ),
1826            ),
1827            ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
1828                r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"test_extractor"}},"subscription_id":"{}"}}"#,
1829                subscription_id
1830            ))),
1831            // 2. Client tries to unsubscribe (server has "restarted" and lost subscription)
1832            ExpectedComm::Receive(
1833                100,
1834                tungstenite::protocol::Message::Text(format!(
1835                    r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
1836                    subscription_id
1837                )),
1838            ),
1839            // 3. Server responds with SubscriptionNotFound (simulating server restart)
1840            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1841        ];
1842
1843        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1844
1845        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1846        let jh = client
1847            .connect()
1848            .await
1849            .expect("connect failed");
1850
1851        // Subscribe successfully
1852        let (received_sub_id, _rx) = timeout(
1853            Duration::from_millis(100),
1854            client.subscribe(extractor_id, SubscriptionOptions::new()),
1855        )
1856        .await
1857        .expect("subscription timed out")
1858        .expect("subscription failed");
1859
1860        assert_eq!(received_sub_id, subscription_id);
1861
1862        // Now try to unsubscribe - this should fail because server "restarted"
1863        let unsubscribe_result =
1864            timeout(Duration::from_millis(100), client.unsubscribe(subscription_id))
1865                .await
1866                .expect("unsubscribe timed out");
1867
1868        // The unsubscribe should handle the SubscriptionNotFound error gracefully
1869        // In this case, the client should treat it as successful since the subscription
1870        // is effectively gone (whether due to server restart or other reasons)
1871        unsubscribe_result
1872            .expect("Unsubscribe should succeed even if server says subscription not found");
1873
1874        timeout(Duration::from_millis(100), client.close())
1875            .await
1876            .expect("close timed out")
1877            .expect("close failed");
1878        jh.await
1879            .expect("ws loop errored")
1880            .unwrap();
1881        server_thread.await.unwrap();
1882    }
1883
1884    #[test_log::test(tokio::test)]
1885    async fn test_parse_error_handling() {
1886        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1887
1888        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1889        let error_response = WebSocketMessage::Response(Response::Error(
1890            WebsocketError::ParseError("}2sdf".to_string(), "malformed JSON".to_string()),
1891        ));
1892        let error_json = serde_json::to_string(&error_response).unwrap();
1893
1894        let exp_comm = [
1895            // subscribe first so connect can finish successfully
1896            ExpectedComm::Receive(
1897                100,
1898                tungstenite::protocol::Message::Text(
1899                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":false}"#.to_string()
1900                ),
1901            ),
1902            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json))
1903        ];
1904
1905        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1906
1907        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1908        let jh = client
1909            .connect()
1910            .await
1911            .expect("connect failed");
1912
1913        // Subscribe successfully
1914        let _ = timeout(
1915            Duration::from_millis(100),
1916            client.subscribe(extractor_id, SubscriptionOptions::new()),
1917        )
1918        .await
1919        .expect("subscription timed out");
1920
1921        // The client should receive the parse error and close the connection
1922        let result = jh
1923            .await
1924            .expect("ws loop should complete");
1925        assert!(result.is_err());
1926        if let Err(DeltasError::ServerError(message, _)) = result {
1927            assert!(message.contains("Server failed to parse client message"));
1928        } else {
1929            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1930        }
1931
1932        server_thread.await.unwrap();
1933    }
1934
1935    #[test_log::test(tokio::test)]
1936    async fn test_compression_error_handling() {
1937        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1938
1939        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1940        let subscription_id = Uuid::new_v4();
1941        let error_response = WebSocketMessage::Response(Response::Error(
1942            WebsocketError::CompressionError(subscription_id, "Compression failed".to_string()),
1943        ));
1944        let error_json = serde_json::to_string(&error_response).unwrap();
1945
1946        let exp_comm = [
1947            // subscribe first so connect can finish successfully
1948            ExpectedComm::Receive(
1949                100,
1950                tungstenite::protocol::Message::Text(
1951                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":false}"#.to_string()
1952                ),
1953            ),
1954            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json))
1955        ];
1956
1957        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1958
1959        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1960        let jh = client
1961            .connect()
1962            .await
1963            .expect("connect failed");
1964
1965        // Subscribe successfully
1966        let _ = timeout(
1967            Duration::from_millis(100),
1968            client.subscribe(extractor_id, SubscriptionOptions::new()),
1969        )
1970        .await
1971        .expect("subscription timed out");
1972
1973        // The client should receive the parse error
1974        let result = jh
1975            .await
1976            .expect("ws loop should complete");
1977        assert!(result.is_err());
1978        if let Err(DeltasError::ServerError(message, _)) = result {
1979            assert!(message.contains("Server failed to compress message for subscription"));
1980        } else {
1981            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1982        }
1983
1984        server_thread.await.unwrap();
1985    }
1986
1987    #[tokio::test]
1988    async fn test_subscribe_error_handling() {
1989        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1990
1991        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "failing_extractor");
1992
1993        let error_response = WebSocketMessage::Response(Response::Error(
1994            WebsocketError::SubscribeError(extractor_id.clone()),
1995        ));
1996        let error_json = serde_json::to_string(&error_response).unwrap();
1997
1998        let exp_comm = [
1999            ExpectedComm::Receive(
2000                100,
2001                tungstenite::protocol::Message::Text(
2002                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"failing_extractor"},"include_state":true,"compression":false}"#.to_string()
2003                ),
2004            ),
2005            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2006        ];
2007
2008        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2009
2010        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2011        let jh = client
2012            .connect()
2013            .await
2014            .expect("connect failed");
2015
2016        let result = timeout(
2017            Duration::from_millis(100),
2018            client.subscribe(extractor_id, SubscriptionOptions::new()),
2019        )
2020        .await
2021        .expect("subscription timed out");
2022
2023        // Verify that we get a ServerError for subscribe failure
2024        assert!(result.is_err());
2025        if let Err(DeltasError::ServerError(msg, _)) = result {
2026            assert!(msg.contains("Subscription failed"));
2027            assert!(msg.contains("Failed to subscribe to extractor"));
2028        } else {
2029            panic!("Expected DeltasError::ServerError, got: {:?}", result);
2030        }
2031
2032        timeout(Duration::from_millis(100), client.close())
2033            .await
2034            .expect("close timed out")
2035            .expect("close failed");
2036        jh.await
2037            .expect("ws loop errored")
2038            .unwrap();
2039        server_thread.await.unwrap();
2040    }
2041
2042    #[tokio::test]
2043    async fn test_cancel_pending_subscription() {
2044        // This test verifies that pending subscriptions are properly cancelled when errors occur
2045        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
2046
2047        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
2048
2049        let error_response = WebSocketMessage::Response(Response::Error(
2050            WebsocketError::ExtractorNotFound(extractor_id.clone()),
2051        ));
2052        let error_json = serde_json::to_string(&error_response).unwrap();
2053
2054        let exp_comm = [
2055            ExpectedComm::Receive(
2056                100,
2057                tungstenite::protocol::Message::Text(
2058                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":false}"#.to_string()
2059                ),
2060            ),
2061            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2062        ];
2063
2064        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2065
2066        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2067        let jh = client
2068            .connect()
2069            .await
2070            .expect("connect failed");
2071
2072        // Start two subscription attempts simultaneously
2073        let client_clone = client.clone();
2074        let extractor_id_clone = extractor_id.clone();
2075
2076        let subscription1 = tokio::spawn({
2077            let client_for_spawn = client.clone();
2078            async move {
2079                client_for_spawn
2080                    .subscribe(extractor_id, SubscriptionOptions::new())
2081                    .await
2082            }
2083        });
2084
2085        let subscription2 = tokio::spawn(async move {
2086            // This should fail because there's already a pending subscription
2087            client_clone
2088                .subscribe(extractor_id_clone, SubscriptionOptions::new())
2089                .await
2090        });
2091
2092        let (result1, result2) = tokio::join!(subscription1, subscription2);
2093
2094        let result1 = result1.unwrap();
2095        let result2 = result2.unwrap();
2096
2097        // One should fail due to ExtractorNotFound error from server
2098        // The other should fail due to SubscriptionAlreadyPending
2099        assert!(result1.is_err() || result2.is_err());
2100
2101        if let Err(DeltasError::SubscriptionAlreadyPending) = result2 {
2102            // This is expected for the second subscription
2103        } else if let Err(DeltasError::ServerError(_, _)) = result1 {
2104            // This is expected for the first subscription that gets the server error
2105        } else {
2106            panic!("Expected one SubscriptionAlreadyPending and one ServerError");
2107        }
2108
2109        timeout(Duration::from_millis(100), client.close())
2110            .await
2111            .expect("close timed out")
2112            .expect("close failed");
2113        jh.await
2114            .expect("ws loop errored")
2115            .unwrap();
2116        server_thread.await.unwrap();
2117    }
2118
2119    #[tokio::test]
2120    async fn test_force_unsubscribe_prevents_multiple_calls() {
2121        // Test that force_unsubscribe prevents sending duplicate unsubscribe commands
2122        // when called multiple times for the same subscription_id
2123
2124        let subscription_id = Uuid::new_v4();
2125
2126        let exp_comm = [
2127            ExpectedComm::Receive(
2128                100,
2129                tungstenite::protocol::Message::Text(
2130                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"vm:ambient"},"include_state":true,"compression":false}"#.to_string()
2131                ),
2132            ),
2133            ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
2134                r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"vm:ambient"}},"subscription_id":"{}"}}"#,
2135                subscription_id
2136            ))),
2137            // Expect only ONE unsubscribe message, even though force_unsubscribe is called twice
2138            ExpectedComm::Receive(
2139                100,
2140                tungstenite::protocol::Message::Text(format!(
2141                    r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
2142                    subscription_id
2143                )),
2144            ),
2145            ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
2146                r#"{{"method":"subscriptionended","subscription_id":"{}"}}"#,
2147                subscription_id
2148            ))),
2149        ];
2150
2151        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2152
2153        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2154        let jh = client
2155            .connect()
2156            .await
2157            .expect("connect failed");
2158
2159        let (received_sub_id, _rx) = timeout(
2160            Duration::from_millis(100),
2161            client.subscribe(
2162                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
2163                SubscriptionOptions::new(),
2164            ),
2165        )
2166        .await
2167        .expect("subscription timed out")
2168        .expect("subscription failed");
2169
2170        assert_eq!(received_sub_id, subscription_id);
2171
2172        // Access the inner state to call force_unsubscribe directly
2173        {
2174            let mut inner_guard = client.inner.lock().await;
2175            let inner = inner_guard
2176                .as_mut()
2177                .expect("client should be connected");
2178
2179            // Call force_unsubscribe twice - only the first should send an unsubscribe message
2180            WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2181            WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2182        }
2183
2184        // Give time for messages to be processed
2185        tokio::time::sleep(Duration::from_millis(50)).await;
2186
2187        // Close may fail if client disconnected after unsubscribe, which is fine
2188        let _ = timeout(Duration::from_millis(100), client.close()).await;
2189
2190        // Wait for tasks to complete
2191        let _ = jh.await;
2192        let _ = server_thread.await;
2193    }
2194}