Skip to main content

tycho_client/
deltas.rs

1//! # Deltas Client
2//!
3//! This module focuses on implementing the Real-Time Deltas client for the Tycho Indexer service.
4//! Utilizing this client facilitates efficient, instant communication with the indexing service,
5//! promoting seamless data synchronization.
6//!
7//! ## Websocket Implementation
8//!
9//! The present WebSocket implementation is cloneable, which enables it to be shared
10//! across multiple asynchronous tasks without creating separate instances for each task. This
11//! unique feature boosts efficiency as it:
12//!
13//! - **Reduces Server Load:** By maintaining a single universal client, the load on the server is
14//!   significantly reduced. This is because fewer connections are made to the server, preventing it
15//!   from getting overwhelmed by numerous simultaneous requests.
//! - **Conserves Resource Usage:** A single shared client requires fewer system resources than
//!   multiple clients instantiated and used separately, as there is some overhead for websocket
//!   handshakes and messages.
19//!
20//! Therefore, sharing one client among multiple tasks ensures optimal performance, reduces resource
21//! consumption, and enhances overall software scalability.
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::{
25        atomic::{AtomicBool, Ordering},
26        Arc,
27    },
28    time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34    header::{
35        AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36        USER_AGENT,
37    },
38    Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44    net::TcpStream,
45    sync::{
46        mpsc::{self, error::TrySendError, Receiver, Sender},
47        oneshot, Mutex, MutexGuard, Notify,
48    },
49    task::JoinHandle,
50    time::sleep,
51};
52use tokio_tungstenite::{
53    connect_async,
54    tungstenite::{
55        self,
56        handshake::client::{generate_key, Request},
57    },
58    MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{
62    BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage, WebsocketError,
63};
64use uuid::Uuid;
65use zstd;
66
67use crate::TYCHO_SERVER_VERSION;
68
/// Errors that can be returned by the deltas client.
#[derive(Error, Debug)]
pub enum DeltasError {
    /// Failed to parse the provided URI.
    ///
    /// Carries the offending URI string and the parser's error message.
    #[error("Failed to parse URI: {0}. Error: {1}")]
    UriParsing(String, String),

    /// The requested subscription is already pending and is awaiting confirmation from the server.
    #[error("The requested subscription is already pending")]
    SubscriptionAlreadyPending,

    /// The server replied with an error.
    ///
    /// The first field is a human-readable description, the second field is the error as
    /// reported by the server (exposed as the error source).
    #[error("The server replied with an error: {0}")]
    ServerError(String, #[source] WebsocketError),

    /// A message failed to send via an internal channel or through the websocket channel.
    /// This is typically a fatal error and might indicate a bug in the implementation.
    #[error("{0}")]
    TransportError(String),

    /// The internal message buffer is full. This likely means that messages are not being consumed
    /// fast enough. If the incoming load emits messages in bursts, consider increasing the buffer
    /// size.
    #[error("The buffer is full!")]
    BufferFull,

    /// The client has no active connections but was accessed (e.g., by calling subscribe).
    /// This typically occurs when trying to use the client before calling connect() or
    /// after the connection has been closed.
    #[error("The client is not connected!")]
    NotConnected,

    /// The connect method was called while the client already had an active connection.
    #[error("The client is already connected!")]
    AlreadyConnected,

    /// The connection was closed orderly by the server, e.g. because it restarted.
    #[error("The server closed the connection!")]
    ConnectionClosed,

    /// The connection was closed unexpectedly by the server or encountered a network error.
    #[error("Connection error: {0}")]
    ConnectionError(#[from] Box<tungstenite::Error>),

    /// A fatal error occurred that cannot be recovered from.
    #[error("Tycho FatalError: {0}")]
    Fatal(String),
}
115
/// Options sent along with a subscribe request.
///
/// The defaults include state, enable compression and disable partial blocks. Use the `with_*`
/// builder methods to override individual settings.
#[derive(Clone, Debug)]
pub struct SubscriptionOptions {
    /// Value used for the `include_state` field of the subscribe command.
    include_state: bool,
    /// Value used for the `compression` field of the subscribe command.
    compression: bool,
    /// Value used for the `partial_blocks` field of the subscribe command.
    partial_blocks: bool,
}

impl Default for SubscriptionOptions {
    /// State and compression are on by default, partial blocks are off.
    fn default() -> Self {
        Self {
            partial_blocks: false,
            compression: true,
            include_state: true,
        }
    }
}

impl SubscriptionOptions {
    /// Creates options with the default settings; equivalent to [`Default::default`].
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the options with `include_state` set to `val`.
    pub fn with_state(self, val: bool) -> Self {
        Self { include_state: val, ..self }
    }

    /// Returns the options with `compression` set to `val`.
    pub fn with_compression(self, val: bool) -> Self {
        Self { compression: val, ..self }
    }

    /// Returns the options with `partial_blocks` set to `val`.
    pub fn with_partial_blocks(self, val: bool) -> Self {
        Self { partial_blocks: val, ..self }
    }
}
146
#[cfg_attr(test, automock)]
#[async_trait]
pub trait DeltasClient {
    /// Subscribe to an extractor and receive realtime messages.
    ///
    /// Will request a subscription from tycho and wait for confirmation of it. If the caller
    /// cancels while waiting for confirmation the subscription may still be registered. If the
    /// receiver was deallocated though, the first message from the subscription will remove it
    /// again - since there is no one to inform about these messages.
    ///
    /// On success returns the subscription id together with the receiving end of the channel
    /// on which the subscription's `BlockChanges` messages are delivered.
    async fn subscribe(
        &self,
        extractor_id: ExtractorIdentity,
        options: SubscriptionOptions,
    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;

    /// Unsubscribe from a subscription.
    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;

    /// Start the client's message handling loop.
    ///
    /// On success returns the join handle of the task running the loop.
    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;

    /// Close the client's message handling loop.
    async fn close(&self) -> Result<(), DeltasError>;
}
171
/// Websocket implementation of [`DeltasClient`].
///
/// The struct is `Clone`; the connection state, the connection notifier and the `dead` flag are
/// held behind `Arc`s, so clones share them.
#[derive(Clone)]
pub struct WsDeltasClient {
    /// The tycho indexer websocket uri.
    uri: Uri,
    /// Authorization key for the websocket connection.
    auth_key: Option<String>,
    /// Maximum amount of reconnects to try before giving up.
    max_reconnects: u64,
    /// Duration to wait before attempting to reconnect.
    retry_cooldown: Duration,
    /// The client will buffer this many messages incoming from the websocket
    /// before starting to drop them.
    ws_buffer_size: usize,
    /// The client will buffer that many messages for each subscription before it starts dropping
    /// them.
    subscription_buffer_size: usize,
    /// Notify tasks waiting for a connection to be established.
    conn_notify: Arc<Notify>,
    /// Shared client instance state; `None` while there is no connection state available.
    inner: Arc<Mutex<Option<Inner>>>,
    /// If set, the client has exhausted its reconnection attempts.
    dead: Arc<AtomicBool>,
}
195
/// Sink half of the (possibly TLS-wrapped) TCP websocket stream, used to send websocket messages.
type WebSocketSink =
    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
198
/// Subscription State
///
/// Subscriptions go through a lifecycle:
///
/// ```text
/// O ---> requested subscribe ----> active ----> requested unsub ---> ended
/// ```
///
/// We use oneshot channels to inform the client struct about when these transitions happened.
/// E.g. for `subscribe` to finish, we want the state to have transitioned to `active`, and
/// similarly for `unsubscribe`.
#[derive(Debug)]
enum SubscriptionInfo {
    /// Subscription was requested; we wait for server confirmation and uuid assignment.
    /// The sender is used to hand the subscription id and message receiver (or an error) back
    /// to the task that requested the subscription.
    RequestedSubscription(oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>),
    /// Subscription is active.
    Active,
    /// Unsubscription was requested; we wait for server confirmation.
    /// The sender is used to signal that the subscription has been removed.
    RequestedUnsubscription(oneshot::Sender<()>),
}
219
/// Internal struct containing the state shared between `WsDeltasClient` instances.
struct Inner {
    /// Websocket sender handle.
    sink: WebSocketSink,
    /// Command channel sender handle.
    cmd_tx: Sender<()>,
    /// Currently pending subscriptions, keyed by the extractor they were requested for.
    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
    /// Active subscriptions, keyed by subscription id.
    subscriptions: HashMap<Uuid, SubscriptionInfo>,
    /// For each subscription we keep a sender handle, the receiver is returned to the caller of
    /// subscribe.
    sender: HashMap<Uuid, Sender<BlockChanges>>,
    /// How many messages to buffer per subscription before starting to drop new messages.
    buffer_size: usize,
}
236
/// Shared state between all client instances.
///
/// This state is behind a mutex and requires synchronization to be read or modified.
240impl Inner {
241    fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
242        Self {
243            sink,
244            cmd_tx,
245            pending: HashMap::new(),
246            subscriptions: HashMap::new(),
247            sender: HashMap::new(),
248            buffer_size,
249        }
250    }
251
252    /// Registers a new pending subscription.
253    #[allow(clippy::result_large_err)]
254    fn new_subscription(
255        &mut self,
256        id: &ExtractorIdentity,
257        ready_tx: oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>,
258    ) -> Result<(), DeltasError> {
259        if self.pending.contains_key(id) {
260            return Err(DeltasError::SubscriptionAlreadyPending);
261        }
262        self.pending
263            .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
264        Ok(())
265    }
266
267    /// Transitions a pending subscription to active.
268    ///
269    /// Will ignore any request to do so for subscriptions that are not pending.
270    fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
271        if let Some(info) = self.pending.remove(extractor_id) {
272            if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
273                let (tx, rx) = mpsc::channel(self.buffer_size);
274                self.sender.insert(subscription_id, tx);
275                self.subscriptions
276                    .insert(subscription_id, SubscriptionInfo::Active);
277                let _ = ready_tx
278                    .send(Ok((subscription_id, rx)))
279                    .map_err(|_| {
280                        warn!(
281                            ?extractor_id,
282                            ?subscription_id,
283                            "Subscriber for has gone away. Ignoring."
284                        )
285                    });
286            } else {
287                error!(
288                    ?extractor_id,
289                    ?subscription_id,
290                    "Pending subscription was not in the correct state to 
291                    transition to active. Ignoring!"
292                )
293            }
294        } else {
295            error!(
296                ?extractor_id,
297                ?subscription_id,
298                "Tried to mark an unknown subscription as active. Ignoring!"
299            );
300        }
301    }
302
303    /// Sends a message to a subscription's receiver.
304    #[allow(clippy::result_large_err)]
305    fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
306        if let Some(sender) = self.sender.get_mut(id) {
307            sender
308                .try_send(msg)
309                .map_err(|e| match e {
310                    TrySendError::Full(_) => DeltasError::BufferFull,
311                    TrySendError::Closed(_) => {
312                        DeltasError::TransportError("The subscriber has gone away".to_string())
313                    }
314                })?;
315        }
316        Ok(())
317    }
318
319    /// Requests a subscription to end.
320    ///
321    /// The subscription needs to exist and be active for this to have any effect. Wll use
322    /// `ready_tx` to notify the receiver once the transition to ended completed.
323    fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
324        if let Some(info) = self
325            .subscriptions
326            .get_mut(subscription_id)
327        {
328            if let SubscriptionInfo::Active = info {
329                *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
330            }
331        } else {
332            // no big deal imo so only debug lvl...
333            debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
334        }
335    }
336
337    /// Removes and fully ends a subscription
338    ///
339    /// Any calls for non-existing subscriptions will be simply ignored. May panic on internal state
340    /// inconsistencies: e.g. if the subscription exists but there is no sender for it.
341    /// Will remove a subscription even it was in active or pending state before, this is to support
342    /// any server side failure of the subscription.
343    fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
344        if let Entry::Occupied(e) = self
345            .subscriptions
346            .entry(subscription_id)
347        {
348            let info = e.remove();
349            if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
350                let _ = tx.send(()).map_err(|_| {
351                    debug!(?subscription_id, "failed to notify about removed subscription")
352                });
353                self.sender
354                    .remove(&subscription_id)
355                    .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
356            } else {
357                warn!(?subscription_id, "Subscription ended unexpectedly!");
358                self.sender
359                    .remove(&subscription_id)
360                    .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
361            }
362        } else {
363            // TODO: There is a race condition that can trigger multiple unsubscribes
364            //  if server doesn't respond quickly enough leading to some ugly logs but
365            //  doesn't affect behaviour negatively. E.g. BufferFull and multiple
366            //  messages from the ws connection are queued.
367            trace!(
368                ?subscription_id,
369                "Received `SubscriptionEnded`, but was never subscribed to it. This is likely a bug!"
370            );
371        }
372
373        Ok(())
374    }
375
376    fn cancel_pending(&mut self, extractor_id: &ExtractorIdentity, error: &WebsocketError) {
377        if let Some(sub_info) = self.pending.remove(extractor_id) {
378            match sub_info {
379                SubscriptionInfo::RequestedSubscription(tx) => {
380                    let _ = tx
381                        .send(Err(DeltasError::ServerError(
382                            format!("Subscription failed: {error}"),
383                            error.clone(),
384                        )))
385                        .map_err(|_| debug!("Cancel pending failed: receiver deallocated!"));
386                }
387                _ => {
388                    error!(?extractor_id, "Pending subscription in wrong state")
389                }
390            }
391        } else {
392            debug!(?extractor_id, "Tried cancel on non-existent pending subscription!")
393        }
394    }
395
396    /// Sends a message through the websocket.
397    async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
398        self.sink.send(msg).await.map_err(|e| {
399            DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
400        })
401    }
402}
403
404/// Tycho client websocket implementation.
405impl WsDeltasClient {
406    // Construct a new client with 5 reconnection attempts.
407    #[allow(clippy::result_large_err)]
408    pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
409        let uri = ws_uri
410            .parse::<Uri>()
411            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
412        Ok(Self {
413            uri,
414            auth_key: auth_key.map(|s| s.to_string()),
415            inner: Arc::new(Mutex::new(None)),
416            ws_buffer_size: 128,
417            subscription_buffer_size: 128,
418            conn_notify: Arc::new(Notify::new()),
419            max_reconnects: 5,
420            retry_cooldown: Duration::from_millis(500),
421            dead: Arc::new(AtomicBool::new(false)),
422        })
423    }
424
425    // Construct a new client with a custom number of reconnection attempts.
426    #[allow(clippy::result_large_err)]
427    pub fn new_with_reconnects(
428        ws_uri: &str,
429        auth_key: Option<&str>,
430        max_reconnects: u64,
431        retry_cooldown: Duration,
432    ) -> Result<Self, DeltasError> {
433        let uri = ws_uri
434            .parse::<Uri>()
435            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
436
437        Ok(Self {
438            uri,
439            auth_key: auth_key.map(|s| s.to_string()),
440            inner: Arc::new(Mutex::new(None)),
441            ws_buffer_size: 128,
442            subscription_buffer_size: 128,
443            conn_notify: Arc::new(Notify::new()),
444            max_reconnects,
445            retry_cooldown,
446            dead: Arc::new(AtomicBool::new(false)),
447        })
448    }
449
450    // Construct a new client with custom buffer sizes (for testing)
451    #[cfg(test)]
452    #[allow(clippy::result_large_err)]
453    pub fn new_with_custom_buffers(
454        ws_uri: &str,
455        auth_key: Option<&str>,
456        ws_buffer_size: usize,
457        subscription_buffer_size: usize,
458    ) -> Result<Self, DeltasError> {
459        let uri = ws_uri
460            .parse::<Uri>()
461            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
462        Ok(Self {
463            uri,
464            auth_key: auth_key.map(|s| s.to_string()),
465            inner: Arc::new(Mutex::new(None)),
466            ws_buffer_size,
467            subscription_buffer_size,
468            conn_notify: Arc::new(Notify::new()),
469            max_reconnects: 5,
470            retry_cooldown: Duration::from_millis(0),
471            dead: Arc::new(AtomicBool::new(false)),
472        })
473    }
474
475    /// Ensures that the client is connected.
476    ///
477    /// This method will acquire the lock for inner.
478    async fn is_connected(&self) -> bool {
479        let guard = self.inner.as_ref().lock().await;
480        guard.is_some()
481    }
482
483    /// Waits for the client to be connected
484    ///
485    /// This method acquires the lock for inner for a short period, then waits until the  
486    /// connection is established if not already connected.
487    async fn ensure_connection(&self) -> Result<(), DeltasError> {
488        if self.dead.load(Ordering::SeqCst) {
489            return Err(DeltasError::NotConnected)
490        };
491        if !self.is_connected().await {
492            self.conn_notify.notified().await;
493        };
494        Ok(())
495    }
496
497    /// Main message handling logic
498    ///
499    /// If the message returns an error, a reconnect attempt may be considered depending on the
500    /// error type.
501    #[instrument(skip(self, msg))]
502    async fn handle_msg(
503        &self,
504        msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
505    ) -> Result<(), DeltasError> {
506        let mut guard = self.inner.lock().await;
507
508        match msg {
509            // We do not deserialize the message directly into a WebSocketMessage. This is because
510            // the serde arbitrary_precision feature (often included in many
511            // dependencies we use) breaks some untagged enum deserializations. Instead,
512            // we deserialize the message into a serde_json::Value and convert that into a WebSocketMessage. For more info on this issue, see: https://github.com/serde-rs/json/issues/740
513            Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
514                serde_json::Value,
515            >(&text)
516            {
517                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
518                    Ok(ws_message) => match ws_message {
519                        WebSocketMessage::BlockChanges { subscription_id, deltas } => {
520                            Self::handle_block_changes_msg(&mut guard, subscription_id, deltas).await?;
521                        }
522                        WebSocketMessage::Response(Response::NewSubscription {
523                            extractor_id,
524                            subscription_id,
525                        }) => {
526                            info!(?extractor_id, ?subscription_id, "Received a new subscription");
527                            let inner = guard
528                                .as_mut()
529                                .ok_or_else(|| DeltasError::NotConnected)?;
530                            inner.mark_active(&extractor_id, subscription_id);
531                        }
532                        WebSocketMessage::Response(Response::SubscriptionEnded {
533                            subscription_id,
534                        }) => {
535                            info!(?subscription_id, "Received a subscription ended");
536                            let inner = guard
537                                .as_mut()
538                                .ok_or_else(|| DeltasError::NotConnected)?;
539                            inner.remove_subscription(subscription_id)?;
540                        }
541                        WebSocketMessage::Response(Response::Error(error)) => match &error {
542                            WebsocketError::ExtractorNotFound(extractor_id) => {
543                                let inner = guard
544                                    .as_mut()
545                                    .ok_or_else(|| DeltasError::NotConnected)?;
546                                inner.cancel_pending(extractor_id, &error);
547                            }
548                            WebsocketError::SubscriptionNotFound(subscription_id) => {
549                                debug!("Received subscription not found, removing subscription");
550                                let inner = guard
551                                    .as_mut()
552                                    .ok_or_else(|| DeltasError::NotConnected)?;
553                                inner.remove_subscription(*subscription_id)?;
554                            }
555                            WebsocketError::ParseError(raw, e) => {
556                                return Err(DeltasError::ServerError(
557                                    format!(
558                                        "Server failed to parse client message: {e}, msg: {raw}"
559                                    ),
560                                    error.clone(),
561                                ))
562                            }
563                            WebsocketError::CompressionError(subscription_id, e) => {
564                                return Err(DeltasError::ServerError(
565                                    format!(
566                                        "Server failed to compress message for subscription: {subscription_id}, error: {e}"
567                                    ),
568                                    error.clone(),
569                                ))
570                            }
571                            WebsocketError::SubscribeError(extractor_id) => {
572                                let inner = guard
573                                    .as_mut()
574                                    .ok_or_else(|| DeltasError::NotConnected)?;
575                                inner.cancel_pending(extractor_id, &error);
576                            }
577                        },
578                    },
579                    Err(e) => {
580                        error!(
581                            "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
582                            e, text
583                        );
584                    }
585                },
586                Err(e) => {
587                    error!(
588                        "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
589                        e, text
590                    );
591                }
592            },
593            Ok(tungstenite::protocol::Message::Binary(data)) => {
594                // Decompress the zstd-compressed data,
595                // Note that we only support compressed BlockChanges messages for now.
596                match zstd::decode_all(data.as_slice()) {
597                    Ok(decompressed) => match serde_json::from_slice::<serde_json::Value>(decompressed.as_slice()) {
598                                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value.clone()) {
599                                    Ok(ws_message) => match ws_message {
600                                        WebSocketMessage::BlockChanges { subscription_id, deltas } => {
601                                            Self::handle_block_changes_msg(&mut guard, subscription_id, deltas).await?;
602                                        }
603                                        _ => {
604                                            error!(
605                                                "Received unsupported compressed WebSocketMessage variant. \nMessage: {ws_message:?}",
606                                            );
607                                        }
608
609                                    },
610                                    Err(e) => {
611                                        error!(
612                                            "Failed to deserialize compressed WebSocketMessage: {e}. \nMessage: {value:?}",
613                                        );
614                                    }
615                                },
616                                Err(e) => {
617                                    error!(
618                                        "Failed to deserialize compressed message: invalid JSON. {e}",
619                                    );
620                                }
621                            },
622                    Err(e) => {
623                        error!("Failed to decompress zstd data: {}", e);
624                    }
625                }
626            },
627            Ok(tungstenite::protocol::Message::Ping(_)) => {
628                // Respond to pings with pongs.
629                let inner = guard
630                    .as_mut()
631                    .ok_or_else(|| DeltasError::NotConnected)?;
632                if let Err(error) = inner
633                    .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
634                    .await
635                {
636                    debug!(?error, "Failed to send pong!");
637                }
638            }
639            Ok(tungstenite::protocol::Message::Pong(_)) => {
640                // Do nothing.
641            }
642            Ok(tungstenite::protocol::Message::Close(_)) => {
643                return Err(DeltasError::ConnectionClosed);
644            }
645            Ok(unknown_msg) => {
646                info!("Received an unknown message type: {:?}", unknown_msg);
647            }
648            Err(error) => {
649                error!(?error, "Websocket error");
650                return Err(match error {
651                    tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
652                    tungstenite::Error::AlreadyClosed => {
653                        warn!("Received AlreadyClosed error which is indicative of a bug!");
654                        DeltasError::ConnectionError(Box::new(error))
655                    }
656                    tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
657                        DeltasError::ConnectionError(Box::new(error))
658                    }
659                    _ => DeltasError::Fatal(error.to_string()),
660                });
661            }
662        };
663        Ok(())
664    }
665
666    async fn handle_block_changes_msg(
667        guard: &mut MutexGuard<'_, Option<Inner>>,
668        subscription_id: Uuid,
669        deltas: BlockChanges,
670    ) -> Result<(), DeltasError> {
671        trace!(?deltas, "Received a block state change, sending to channel");
672        let inner = guard
673            .as_mut()
674            .ok_or_else(|| DeltasError::NotConnected)?;
675        match inner.send(&subscription_id, deltas) {
676            Err(DeltasError::BufferFull) => {
677                error!(?subscription_id, "Buffer full, unsubscribing!");
678                Self::force_unsubscribe(subscription_id, inner).await;
679            }
680            Err(_) => {
681                warn!(?subscription_id, "Receiver for has gone away, unsubscribing!");
682                Self::force_unsubscribe(subscription_id, inner).await;
683            }
684            _ => { /* Do nothing */ }
685        }
686        Ok(())
687    }
688
689    /// Forcefully ends a (client) stream by unsubscribing.
690    ///
691    /// Is used only if the message can't be processed due to an error that might resolve
692    /// itself by resubscribing.
693    async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
694        // avoid unsubscribing multiple times
695        if let Some(SubscriptionInfo::RequestedUnsubscription(_)) = inner
696            .subscriptions
697            .get(&subscription_id)
698        {
699            return
700        }
701
702        let (tx, rx) = oneshot::channel();
703        if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
704            warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
705        } else {
706            // Wait for unsubscribe completion with timeout
707            match tokio::time::timeout(Duration::from_secs(5), rx).await {
708                Ok(_) => {
709                    debug!(?subscription_id, "Unsubscribe completed successfully");
710                }
711                Err(_) => {
712                    warn!(?subscription_id, "Unsubscribe completion timed out");
713                }
714            }
715        }
716    }
717
718    /// Helper method to force an unsubscription
719    ///
720    /// This method expects to receive a mutable reference to `Inner` so it does not acquire a
721    /// lock. Used for normal unsubscribes as well to remove any subscriptions with deallocated
722    /// receivers.
723    async fn unsubscribe_inner(
724        inner: &mut Inner,
725        subscription_id: Uuid,
726        ready_tx: oneshot::Sender<()>,
727    ) -> Result<(), DeltasError> {
728        debug!(?subscription_id, "Unsubscribing");
729        inner.end_subscription(&subscription_id, ready_tx);
730        let cmd = Command::Unsubscribe { subscription_id };
731        inner
732            .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
733                |e| {
734                    DeltasError::TransportError(format!(
735                        "Failed to serialize unsubscribe command: {e}"
736                    ))
737                },
738            )?))
739            .await?;
740        Ok(())
741    }
742}
743
744#[async_trait]
745impl DeltasClient for WsDeltasClient {
746    #[instrument(skip(self))]
747    async fn subscribe(
748        &self,
749        extractor_id: ExtractorIdentity,
750        options: SubscriptionOptions,
751    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
752        trace!("Starting subscribe");
753        self.ensure_connection().await?;
754        let (ready_tx, ready_rx) = oneshot::channel();
755        {
756            let mut guard = self.inner.lock().await;
757            let inner = guard
758                .as_mut()
759                .ok_or_else(|| DeltasError::NotConnected)?;
760            trace!("Sending subscribe command");
761            inner.new_subscription(&extractor_id, ready_tx)?;
762            let cmd = Command::Subscribe {
763                extractor_id,
764                include_state: options.include_state,
765                compression: options.compression,
766                partial_blocks: options.partial_blocks,
767            };
768            inner
769                .ws_send(tungstenite::protocol::Message::Text(
770                    serde_json::to_string(&cmd).map_err(|e| {
771                        DeltasError::TransportError(format!(
772                            "Failed to serialize subscribe command: {e}"
773                        ))
774                    })?,
775                ))
776                .await?;
777        }
778        trace!("Waiting for subscription response");
779        let res = ready_rx.await.map_err(|_| {
780            DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
781        })??;
782        trace!("Subscription successful");
783        Ok(res)
784    }
785
786    #[instrument(skip(self))]
787    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
788        self.ensure_connection().await?;
789        let (ready_tx, ready_rx) = oneshot::channel();
790        {
791            let mut guard = self.inner.lock().await;
792            let inner = guard
793                .as_mut()
794                .ok_or_else(|| DeltasError::NotConnected)?;
795
796            WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
797        }
798        ready_rx.await.map_err(|_| {
799            DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
800        })?;
801
802        Ok(())
803    }
804
805    #[instrument(skip(self))]
806    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
807        if self.is_connected().await {
808            return Err(DeltasError::AlreadyConnected);
809        }
810        let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
811        info!(?ws_uri, "Starting TychoWebsocketClient");
812
813        let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
814        {
815            let mut guard = self.inner.as_ref().lock().await;
816            *guard = None;
817        }
818        let this = self.clone();
819        let jh = tokio::spawn(async move {
820            let mut retry_count = 0;
821            let mut result = Err(DeltasError::NotConnected);
822
823            'retry: while retry_count < this.max_reconnects {
824                info!(?ws_uri, retry_count, "Connecting to WebSocket server");
825                if retry_count > 0 {
826                    sleep(this.retry_cooldown).await;
827                }
828
829                // Create a WebSocket request
830                let mut request_builder = Request::builder()
831                    .uri(&ws_uri)
832                    .header(SEC_WEBSOCKET_KEY, generate_key())
833                    .header(SEC_WEBSOCKET_VERSION, 13)
834                    .header(CONNECTION, "Upgrade")
835                    .header(UPGRADE, "websocket")
836                    .header(
837                        HOST,
838                        this.uri.host().ok_or_else(|| {
839                            DeltasError::UriParsing(
840                                ws_uri.clone(),
841                                "No host found in tycho url".to_string(),
842                            )
843                        })?,
844                    )
845                    .header(
846                        USER_AGENT,
847                        format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
848                    );
849
850                // Add Authorization if one is given
851                if let Some(ref key) = this.auth_key {
852                    request_builder = request_builder.header(AUTHORIZATION, key);
853                }
854
855                let request = request_builder.body(()).map_err(|e| {
856                    DeltasError::TransportError(format!("Failed to build connection request: {e}"))
857                })?;
858                let (conn, _) = match connect_async(request).await {
859                    Ok(conn) => conn,
860                    Err(e) => {
861                        // Prepare for reconnection
862                        retry_count += 1;
863                        let mut guard = this.inner.as_ref().lock().await;
864                        *guard = None;
865
866                        warn!(
867                            e = e.to_string(),
868                            "Failed to connect to WebSocket server; Reconnecting"
869                        );
870                        continue 'retry;
871                    }
872                };
873
874                let (ws_tx_new, ws_rx_new) = conn.split();
875                {
876                    let mut guard = this.inner.as_ref().lock().await;
877                    *guard =
878                        Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
879                }
880                let mut msg_rx = ws_rx_new.boxed();
881
882                info!("Connection Successful: TychoWebsocketClient started");
883                this.conn_notify.notify_waiters();
884                result = Ok(());
885
886                loop {
887                    let res = tokio::select! {
888                        msg = msg_rx.next() => match msg {
889                            Some(msg) => this.handle_msg(msg).await,
890                            None => {
891                                // This code should not be reachable since the stream
892                                // should return ConnectionClosed in the case above
893                                // before it returns None here.
894                                warn!("Websocket connection silently closed, giving up!");
895                                break 'retry
896                            }
897                        },
898                        _ = cmd_rx.recv() => {break 'retry},
899                    };
900                    if let Err(error) = res {
901                        debug!(?error, "WsError");
902                        if matches!(
903                            error,
904                            DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
905                        ) {
906                            // Prepare for reconnection
907                            retry_count += 1;
908                            let mut guard = this.inner.as_ref().lock().await;
909                            *guard = None;
910
911                            warn!(
912                                ?error,
913                                ?retry_count,
914                                "Connection dropped unexpectedly; Reconnecting..."
915                            );
916                            break;
917                        } else {
918                            // Other errors are considered fatal
919                            error!(?error, "Fatal error; Exiting");
920                            result = Err(error);
921                            break 'retry;
922                        }
923                    }
924                }
925            }
926            debug!(
927                retry_count,
928                max_reconnects=?this.max_reconnects,
929                "Reconnection loop ended"
930            );
931            // Clean up before exiting
932            let mut guard = this.inner.as_ref().lock().await;
933            *guard = None;
934
935            // Check if max retries has been reached.
936            if retry_count >= this.max_reconnects {
937                error!("Max reconnection attempts reached; Exiting");
938                this.dead.store(true, Ordering::SeqCst);
939                this.conn_notify.notify_waiters(); // Notify that the task is done
940                result = Err(DeltasError::ConnectionClosed);
941            }
942
943            result
944        });
945
946        self.conn_notify.notified().await;
947
948        if self.is_connected().await {
949            Ok(jh)
950        } else {
951            Err(DeltasError::NotConnected)
952        }
953    }
954
955    #[instrument(skip(self))]
956    async fn close(&self) -> Result<(), DeltasError> {
957        info!("Closing TychoWebsocketClient");
958        let mut guard = self.inner.lock().await;
959        let inner = guard
960            .as_mut()
961            .ok_or_else(|| DeltasError::NotConnected)?;
962        inner
963            .cmd_tx
964            .send(())
965            .await
966            .map_err(|e| DeltasError::TransportError(e.to_string()))?;
967        Ok(())
968    }
969}
970
971#[cfg(test)]
972mod tests {
973    use std::{net::SocketAddr, str::FromStr};
974
975    use tokio::{net::TcpListener, time::timeout};
976    use tycho_common::dto::Chain;
977
978    use super::*;
979
    /// One scripted step of the conversation between the mock server and the client.
    #[derive(Clone)]
    enum ExpectedComm {
        /// The mock server waits for a client message and asserts that it equals the given
        /// message; the `u64` is the receive timeout in milliseconds.
        Receive(u64, tungstenite::protocol::Message),
        /// The mock server sends the given message to the client.
        Send(tungstenite::protocol::Message),
    }
985
986    async fn mock_tycho_ws(
987        messages: &[ExpectedComm],
988        reconnects: usize,
989    ) -> (SocketAddr, JoinHandle<()>) {
990        info!("Starting mock webserver");
991        // zero port here means the OS chooses an open port
992        let server = TcpListener::bind("127.0.0.1:0")
993            .await
994            .expect("localhost bind failed");
995        let addr = server.local_addr().unwrap();
996        let messages = messages.to_vec();
997
998        let jh = tokio::spawn(async move {
999            info!("mock webserver started");
1000            for _ in 0..(reconnects + 1) {
1001                info!("Awaiting client connections");
1002                if let Ok((stream, _)) = server.accept().await {
1003                    info!("Client connected");
1004                    let mut websocket = tokio_tungstenite::accept_async(stream)
1005                        .await
1006                        .unwrap();
1007
1008                    info!("Handling messages..");
1009                    for c in messages.iter().cloned() {
1010                        match c {
1011                            ExpectedComm::Receive(t, exp) => {
1012                                info!("Awaiting message...");
1013                                let msg = timeout(Duration::from_millis(t), websocket.next())
1014                                    .await
1015                                    .expect("Receive timeout")
1016                                    .expect("Stream exhausted")
1017                                    .expect("Failed to receive message.");
1018                                info!("Message received");
1019                                assert_eq!(msg, exp)
1020                            }
1021                            ExpectedComm::Send(data) => {
1022                                info!("Sending message");
1023                                websocket
1024                                    .send(data)
1025                                    .await
1026                                    .expect("Failed to send message");
1027                                info!("Message sent");
1028                            }
1029                        };
1030                    }
1031                    info!("Mock communication completed");
1032                    sleep(Duration::from_millis(100)).await;
1033                    // Close the WebSocket connection
1034                    let _ = websocket.close(None).await;
1035                    info!("Mock server closed connection");
1036                }
1037            }
1038            info!("mock server ended");
1039        });
1040        (addr, jh)
1041    }
1042
    /// Subscription id used by the test fixtures; the same value is hard-coded in the canned
    /// server messages of this module.
    const SUBSCRIPTION_ID: &str = "30b740d1-cf09-4e0e-8cfe-b1434d447ece";
1044
1045    fn subscribe() -> String {
1046        subscribe_with_compression(false)
1047    }
1048
1049    fn subscribe_with_compression(compression: bool) -> String {
1050        serde_json::json!({
1051            "method": "subscribe",
1052            "extractor_id": {
1053                "chain": "ethereum",
1054                "name": "vm:ambient"
1055            },
1056            "include_state": true,
1057            "compression": compression,
1058            "partial_blocks": false
1059        })
1060        .to_string()
1061    }
1062
1063    fn subscription_confirmation() -> String {
1064        r#"
1065        {
1066            "method": "newsubscription",
1067            "extractor_id":{
1068                "chain": "ethereum",
1069                "name": "vm:ambient"
1070            },
1071            "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1072        }
1073        "#
1074        .replace(|c: char| c.is_whitespace(), "")
1075    }
1076
1077    fn block_deltas() -> String {
1078        r#"
1079        {
1080            "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1081            "deltas": {
1082                "extractor": "vm:ambient",
1083                "chain": "ethereum",
1084                "block": {
1085                    "number": 123,
1086                    "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1087                    "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1088                    "chain": "ethereum",
1089                    "ts": "2023-09-14T00:00:00"
1090                },
1091                "finalized_block_height": 0,
1092                "revert": false,
1093                "new_tokens": {},
1094                "account_updates": {
1095                    "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1096                        "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1097                        "chain": "ethereum",
1098                        "slots": {},
1099                        "balance": "0x01f4",
1100                        "code": "",
1101                        "change": "Update"
1102                    }
1103                },
1104                "state_updates": {
1105                    "component_1": {
1106                        "component_id": "component_1",
1107                        "updated_attributes": {"attr1": "0x01"},
1108                        "deleted_attributes": ["attr2"]
1109                    }
1110                },
1111                "new_protocol_components":
1112                    { "protocol_1": {
1113                            "id": "protocol_1",
1114                            "protocol_system": "system_1",
1115                            "protocol_type_name": "type_1",
1116                            "chain": "ethereum",
1117                            "tokens": ["0x01", "0x02"],
1118                            "contract_ids": ["0x01", "0x02"],
1119                            "static_attributes": {"attr1": "0x01f4"},
1120                            "change": "Update",
1121                            "creation_tx": "0x01",
1122                            "created_at": "2023-09-14T00:00:00"
1123                        }
1124                    },
1125                "deleted_protocol_components": {},
1126                "component_balances": {
1127                    "protocol_1":
1128                        {
1129                            "0x01": {
1130                                "token": "0x01",
1131                                "balance": "0x01f4",
1132                                "balance_float": 0.0,
1133                                "modify_tx": "0x01",
1134                                "component_id": "protocol_1"
1135                            }
1136                        }
1137                },
1138                "account_balances": {
1139                    "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1140                        "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1141                            "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1142                            "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1143                            "balance": "0x01f4",
1144                            "modify_tx": "0x01"
1145                        }
1146                    }
1147                },
1148                "component_tvl": {
1149                    "protocol_1": 1000.0
1150                },
1151                "dci_update": {
1152                    "new_entrypoints": {},
1153                    "new_entrypoint_params": {},
1154                    "trace_results": {}
1155                }
1156            }
1157        }
1158        "#.replace(|c: char| c.is_whitespace(), "")
1159    }
1160
1161    fn unsubscribe() -> String {
1162        r#"
1163        {
1164            "method": "unsubscribe",
1165            "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1166        }
1167        "#
1168        .replace(|c: char| c.is_whitespace(), "")
1169    }
1170
1171    fn subscription_ended() -> String {
1172        r#"
1173        {
1174            "method": "subscriptionended",
1175            "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1176        }
1177        "#
1178        .replace(|c: char| c.is_whitespace(), "")
1179    }
1180
1181    #[tokio::test]
1182    async fn test_uncompressed_subscribe_receive() {
1183        let exp_comm = [
1184            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1185            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1186            ExpectedComm::Send(tungstenite::protocol::Message::Text(block_deltas())),
1187        ];
1188        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1189
1190        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1191        let jh = client
1192            .connect()
1193            .await
1194            .expect("connect failed");
1195        let (_, mut rx) = timeout(
1196            Duration::from_millis(100),
1197            client.subscribe(
1198                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1199                SubscriptionOptions::new().with_compression(false),
1200            ),
1201        )
1202        .await
1203        .expect("subscription timed out")
1204        .expect("subscription failed");
1205        let _ = timeout(Duration::from_millis(100), rx.recv())
1206            .await
1207            .expect("awaiting message timeout out")
1208            .expect("receiving message failed");
1209        timeout(Duration::from_millis(100), client.close())
1210            .await
1211            .expect("close timed out")
1212            .expect("close failed");
1213        jh.await
1214            .expect("ws loop errored")
1215            .unwrap();
1216        server_thread.await.unwrap();
1217    }
1218
1219    #[tokio::test]
1220    async fn test_compressed_subscribe_receive() {
1221        let compressed_block_deltas = zstd::encode_all(
1222            block_deltas().as_bytes(),
1223            0, // default compression level
1224        )
1225        .expect("Failed to compress block deltas message");
1226
1227        let exp_comm = [
1228            ExpectedComm::Receive(
1229                100,
1230                tungstenite::protocol::Message::Text(subscribe_with_compression(true)),
1231            ),
1232            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1233            ExpectedComm::Send(tungstenite::protocol::Message::Binary(compressed_block_deltas)),
1234        ];
1235        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1236
1237        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1238        let jh = client
1239            .connect()
1240            .await
1241            .expect("connect failed");
1242        let (_, mut rx) = timeout(
1243            Duration::from_millis(100),
1244            client.subscribe(
1245                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1246                SubscriptionOptions::new().with_compression(true),
1247            ),
1248        )
1249        .await
1250        .expect("subscription timed out")
1251        .expect("subscription failed");
1252        let _ = timeout(Duration::from_millis(100), rx.recv())
1253            .await
1254            .expect("awaiting message timeout out")
1255            .expect("receiving message failed");
1256        timeout(Duration::from_millis(100), client.close())
1257            .await
1258            .expect("close timed out")
1259            .expect("close failed");
1260        jh.await
1261            .expect("ws loop errored")
1262            .unwrap();
1263        server_thread.await.unwrap();
1264    }
1265
1266    #[tokio::test]
1267    async fn test_unsubscribe() {
1268        let exp_comm = [
1269            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1270            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1271            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
1272            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
1273        ];
1274        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1275
1276        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1277        let jh = client
1278            .connect()
1279            .await
1280            .expect("connect failed");
1281        let (sub_id, mut rx) = timeout(
1282            Duration::from_millis(100),
1283            client.subscribe(
1284                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1285                SubscriptionOptions::new().with_compression(false),
1286            ),
1287        )
1288        .await
1289        .expect("subscription timed out")
1290        .expect("subscription failed");
1291
1292        timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1293            .await
1294            .expect("unsubscribe timed out")
1295            .expect("unsubscribe failed");
1296        let res = timeout(Duration::from_millis(100), rx.recv())
1297            .await
1298            .expect("awaiting message timeout out");
1299
1300        // If the subscription ended, the channel should have been closed.
1301        assert!(res.is_none());
1302
1303        timeout(Duration::from_millis(100), client.close())
1304            .await
1305            .expect("close timed out")
1306            .expect("close failed");
1307        jh.await
1308            .expect("ws loop errored")
1309            .unwrap();
1310        server_thread.await.unwrap();
1311    }
1312
1313    #[tokio::test]
1314    async fn test_subscription_unexpected_end() {
1315        let exp_comm = [
1316            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1317            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1318            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
1319        ];
1320        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1321
1322        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1323        let jh = client
1324            .connect()
1325            .await
1326            .expect("connect failed");
1327        let (_, mut rx) = timeout(
1328            Duration::from_millis(100),
1329            client.subscribe(
1330                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1331                SubscriptionOptions::new().with_compression(false),
1332            ),
1333        )
1334        .await
1335        .expect("subscription timed out")
1336        .expect("subscription failed");
1337        let res = timeout(Duration::from_millis(100), rx.recv())
1338            .await
1339            .expect("awaiting message timeout out");
1340
1341        // If the subscription ended, the channel should have been closed.
1342        assert!(res.is_none());
1343
1344        timeout(Duration::from_millis(100), client.close())
1345            .await
1346            .expect("close timed out")
1347            .expect("close failed");
1348        jh.await
1349            .expect("ws loop errored")
1350            .unwrap();
1351        server_thread.await.unwrap();
1352    }
1353
1354    #[test_log::test(tokio::test)]
1355    async fn test_reconnect() {
1356        let exp_comm = [
1357            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe()
1358            )),
1359            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1360                subscription_confirmation()
1361            )),
1362            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1363                {
1364                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1365                    "deltas": {
1366                        "extractor": "vm:ambient",
1367                        "chain": "ethereum",
1368                        "block": {
1369                            "number": 123,
1370                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1371                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1372                            "chain": "ethereum",             
1373                            "ts": "2023-09-14T00:00:00"
1374                        },
1375                        "finalized_block_height": 0,
1376                        "revert": false,
1377                        "new_tokens": {},
1378                        "account_updates": {
1379                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1380                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1381                                "chain": "ethereum",
1382                                "slots": {},
1383                                "balance": "0x01f4",
1384                                "code": "",
1385                                "change": "Update"
1386                            }
1387                        },
1388                        "state_updates": {
1389                            "component_1": {
1390                                "component_id": "component_1",
1391                                "updated_attributes": {"attr1": "0x01"},
1392                                "deleted_attributes": ["attr2"]
1393                            }
1394                        },
1395                        "new_protocol_components": {
1396                            "protocol_1":
1397                                {
1398                                    "id": "protocol_1",
1399                                    "protocol_system": "system_1",
1400                                    "protocol_type_name": "type_1",
1401                                    "chain": "ethereum",
1402                                    "tokens": ["0x01", "0x02"],
1403                                    "contract_ids": ["0x01", "0x02"],
1404                                    "static_attributes": {"attr1": "0x01f4"},
1405                                    "change": "Update",
1406                                    "creation_tx": "0x01",
1407                                    "created_at": "2023-09-14T00:00:00"
1408                                }
1409                            },
1410                        "deleted_protocol_components": {},
1411                        "component_balances": {
1412                            "protocol_1": {
1413                                "0x01": {
1414                                    "token": "0x01",
1415                                    "balance": "0x01f4",
1416                                    "balance_float": 1000.0,
1417                                    "modify_tx": "0x01",
1418                                    "component_id": "protocol_1"
1419                                }
1420                            }
1421                        },
1422                        "account_balances": {
1423                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1424                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1425                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1426                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1427                                    "balance": "0x01f4",
1428                                    "modify_tx": "0x01"
1429                                }
1430                            }
1431                        },
1432                        "component_tvl": {
1433                            "protocol_1": 1000.0
1434                        },
1435                        "dci_update": {
1436                            "new_entrypoints": {},
1437                            "new_entrypoint_params": {},
1438                            "trace_results": {}
1439                        }
1440                    }
1441                }
1442                "#.to_owned()
1443            ))
1444        ];
1445        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1446        let client = WsDeltasClient::new_with_reconnects(
1447            &format!("ws://{addr}"),
1448            None,
1449            3,
1450            // server stays down for 100ms on connection drop
1451            Duration::from_millis(110),
1452        )
1453        .unwrap();
1454
1455        let jh: JoinHandle<Result<(), DeltasError>> = client
1456            .connect()
1457            .await
1458            .expect("connect failed");
1459
1460        for _ in 0..2 {
1461            dbg!("loop");
1462            let (_, mut rx) = timeout(
1463                Duration::from_millis(200),
1464                client.subscribe(
1465                    ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1466                    SubscriptionOptions::new().with_compression(false),
1467                ),
1468            )
1469            .await
1470            .expect("subscription timed out")
1471            .expect("subscription failed");
1472
1473            let _ = timeout(Duration::from_millis(100), rx.recv())
1474                .await
1475                .expect("awaiting message timeout out")
1476                .expect("receiving message failed");
1477
1478            // wait for the connection to drop
1479            let res = timeout(Duration::from_millis(200), rx.recv())
1480                .await
1481                .expect("awaiting closed connection timeout out");
1482            assert!(res.is_none());
1483        }
1484        let res = jh.await.expect("ws client join failed");
1485        // 5th client reconnect attempt should fail
1486        assert!(res.is_err());
1487        server_thread
1488            .await
1489            .expect("ws server loop errored");
1490    }
1491
1492    async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1493        let server = TcpListener::bind("127.0.0.1:0")
1494            .await
1495            .expect("localhost bind failed");
1496        let addr = server.local_addr().unwrap();
1497        let jh = tokio::spawn(async move {
1498            while let Ok((stream, _)) = server.accept().await {
1499                if accept_first {
1500                    // Send connection handshake to accept the connection (fail later)
1501                    let stream = tokio_tungstenite::accept_async(stream)
1502                        .await
1503                        .unwrap();
1504                    sleep(Duration::from_millis(10)).await;
1505                    drop(stream)
1506                } else {
1507                    // Close the connection to simulate a failure
1508                    drop(stream);
1509                }
1510            }
1511        });
1512        (addr, jh)
1513    }
1514
1515    #[test_log::test(tokio::test)]
1516    async fn test_subscribe_dead_client_after_max_attempts() {
1517        let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1518        let client = WsDeltasClient::new_with_reconnects(
1519            &format!("ws://{addr}"),
1520            None,
1521            3,
1522            Duration::from_secs(0),
1523        )
1524        .unwrap();
1525
1526        let join_handle = client.connect().await.unwrap();
1527        let handle_res = join_handle.await.unwrap();
1528        assert!(handle_res.is_err());
1529        assert!(!client.is_connected().await);
1530
1531        let subscription_res = timeout(
1532            Duration::from_millis(10),
1533            client.subscribe(
1534                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1535                SubscriptionOptions::new(),
1536            ),
1537        )
1538        .await
1539        .unwrap();
1540        assert!(subscription_res.is_err());
1541    }
1542
1543    #[test_log::test(tokio::test)]
1544    async fn test_ws_client_retry_cooldown() {
1545        let start = std::time::Instant::now();
1546        let (addr, _) = mock_bad_connection_tycho_ws(false).await;
1547
1548        // Use the mock server that immediately drops connections
1549        let client = WsDeltasClient::new_with_reconnects(
1550            &format!("ws://{addr}"),
1551            None,
1552            3,                         // 3 attempts total (so 2 retries with cooldowns)
1553            Duration::from_millis(50), // 50ms cooldown
1554        )
1555        .unwrap();
1556
1557        // Try to connect - this should fail after retries but still measure the time
1558        let connect_result = client.connect().await;
1559        let elapsed = start.elapsed();
1560
1561        // Connection should fail after exhausting retries
1562        assert!(connect_result.is_err(), "Expected connection to fail after retries");
1563
1564        // Should have waited at least 100ms total (2 retries × 50ms cooldown each)
1565        assert!(
1566            elapsed >= Duration::from_millis(100),
1567            "Expected at least 100ms elapsed, got {:?}",
1568            elapsed
1569        );
1570
1571        // Should not take too long (max ~300ms for 3 attempts with some tolerance)
1572        assert!(elapsed < Duration::from_millis(500), "Took too long: {:?}", elapsed);
1573    }
1574
    // Exercises the buffer-full path: the client is built with a subscription buffer of size 1
    // while the mock server pushes two delta messages before anything is read. The mock then
    // expects an unsubscribe command from the client and answers with a subscription-ended
    // message.
    #[test_log::test(tokio::test)]
    async fn test_buffer_full_triggers_unsubscribe() {
        // Expected communication sequence for buffer full scenario
        let exp_comm = {
            [
            // 1. Client subscribes
            ExpectedComm::Receive(
                100,
                tungstenite::protocol::Message::Text(
                    subscribe(),
                ),
            ),
            // 2. Server confirms subscription
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                subscription_confirmation(),
            )),
            // 3. Server sends first message (fills buffer)
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
                    "deltas": {
                        "extractor": "vm:ambient",
                        "chain": "ethereum",
                        "block": {
                            "number": 123,
                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "chain": "ethereum",
                            "ts": "2023-09-14T00:00:00"
                        },
                        "finalized_block_height": 0,
                        "revert": false,
                        "new_tokens": {},
                        "account_updates": {},
                        "state_updates": {},
                        "new_protocol_components": {},
                        "deleted_protocol_components": {},
                        "component_balances": {},
                        "account_balances": {},
                        "component_tvl": {},
                        "dci_update": {
                            "new_entrypoints": {},
                            "new_entrypoint_params": {},
                            "trace_results": {}
                        }
                    }
                }
                "#.to_owned()
            )),
            // 4. Server sends second message (triggers buffer overflow and force unsubscribe)
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                r#"
                {
                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
                    "deltas": {
                        "extractor": "vm:ambient",
                        "chain": "ethereum",
                        "block": {
                            "number": 124,
                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                            "chain": "ethereum",
                            "ts": "2023-09-14T00:00:01"
                        },
                        "finalized_block_height": 0,
                        "revert": false,
                        "new_tokens": {},
                        "account_updates": {},
                        "state_updates": {},
                        "new_protocol_components": {},
                        "deleted_protocol_components": {},
                        "component_balances": {},
                        "account_balances": {},
                        "component_tvl": {},
                        "dci_update": {
                            "new_entrypoints": {},
                            "new_entrypoint_params": {},
                            "trace_results": {}
                        }
                    }
                }
                "#.to_owned()
            )),
            // 5. Expect unsubscribe command due to buffer full
            ExpectedComm::Receive(
                100,
                tungstenite::protocol::Message::Text(
                    unsubscribe(),
                ),
            ),
            // 6. Server confirms unsubscription
            ExpectedComm::Send(tungstenite::protocol::Message::Text(
                subscription_ended(),
            )),
        ]
        };

        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        // Create client with very small buffer size (1) to easily trigger BufferFull
        let client = WsDeltasClient::new_with_custom_buffers(
            &format!("ws://{addr}"),
            None,
            128, // ws_buffer_size
            1,   // subscription_buffer_size - this will trigger BufferFull easily
        )
        .unwrap();

        let jh = client
            .connect()
            .await
            .expect("connect failed");

        let (_sub_id, mut rx) = timeout(
            Duration::from_millis(100),
            client.subscribe(
                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
                SubscriptionOptions::new().with_compression(false),
            ),
        )
        .await
        .expect("subscription timed out")
        .expect("subscription failed");

        // Deliberately do not read from `rx` yet: give the server time to push both messages
        // so the single-slot buffer fills up
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Collect whatever made it into the channel
        let mut received_msgs = Vec::new();

        // Every `recv` gets its own 200ms timeout; the loop stops on channel close, on the
        // first timeout, or once 3 messages were collected (a safety cap)
        while received_msgs.len() < 3 {
            match timeout(Duration::from_millis(200), rx.recv()).await {
                Ok(Some(msg)) => {
                    received_msgs.push(msg);
                }
                Ok(None) => {
                    // Channel closed - this is what we expect after buffer overflow
                    break;
                }
                Err(_) => {
                    // Timeout - no more messages coming
                    break;
                }
            }
        }

        // Verify the key behavior: at most one message may get through the size-1 buffer.
        // NOTE(review): this also passes when zero messages arrive, and the channel closing
        // is not asserted separately.
        assert!(
            received_msgs.len() <= 1,
            "Expected buffer overflow to limit messages to at most 1, got {}",
            received_msgs.len()
        );

        // If a message did arrive, it must be the first one the server sent (block 123)
        if let Some(first_msg) = received_msgs.first() {
            assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
        }

        // The behavior under test (buffer full causes a force unsubscribe) has been checked
        // at this point. The client is not closed explicitly; the remaining steps only tear
        // down the background tasks.

        // Release the receiver, then give the tasks a moment to settle
        drop(rx); // Explicitly drop the receiver
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Abort the tasks to clean up
        jh.abort();
        server_thread.abort();

        // Join results are intentionally ignored: both tasks were just aborted
        let _ = jh.await;
        let _ = server_thread.await;
    }
1749
1750    #[tokio::test]
1751    async fn test_server_error_handling() {
1752        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1753
1754        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1755
1756        // Test ExtractorNotFound error
1757        let error_response = WebSocketMessage::Response(Response::Error(
1758            WebsocketError::ExtractorNotFound(extractor_id.clone()),
1759        ));
1760        let error_json = serde_json::to_string(&error_response).unwrap();
1761
1762        let exp_comm = [
1763            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1764            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1765        ];
1766
1767        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1768
1769        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1770        let jh = client
1771            .connect()
1772            .await
1773            .expect("connect failed");
1774
1775        let result = timeout(
1776            Duration::from_millis(100),
1777            client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1778        )
1779        .await
1780        .expect("subscription timed out");
1781
1782        // Verify that we get a ServerError
1783        assert!(result.is_err());
1784        if let Err(DeltasError::ServerError(msg, _)) = result {
1785            assert!(msg.contains("Subscription failed"));
1786            assert!(msg.contains("Extractor not found"));
1787        } else {
1788            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1789        }
1790
1791        timeout(Duration::from_millis(100), client.close())
1792            .await
1793            .expect("close timed out")
1794            .expect("close failed");
1795        jh.await
1796            .expect("ws loop errored")
1797            .unwrap();
1798        server_thread.await.unwrap();
1799    }
1800
1801    #[test_log::test(tokio::test)]
1802    async fn test_subscription_not_found_error() {
1803        // Test scenario: Server restart causes subscription loss
1804        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1805
1806        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1807        let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
1808
1809        let error_response = WebSocketMessage::Response(Response::Error(
1810            WebsocketError::SubscriptionNotFound(subscription_id),
1811        ));
1812        let error_json = serde_json::to_string(&error_response).unwrap();
1813
1814        let exp_comm = [
1815            // 1. Client subscribes successfully
1816            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1817            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1818            // 2. Client tries to unsubscribe (server has "restarted" and lost subscription)
1819            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
1820            // 3. Server responds with SubscriptionNotFound (simulating server restart)
1821            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1822        ];
1823
1824        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1825
1826        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1827        let jh = client
1828            .connect()
1829            .await
1830            .expect("connect failed");
1831
1832        // Subscribe successfully
1833        let (received_sub_id, _rx) = timeout(
1834            Duration::from_millis(100),
1835            client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1836        )
1837        .await
1838        .expect("subscription timed out")
1839        .expect("subscription failed");
1840
1841        assert_eq!(received_sub_id, subscription_id);
1842
1843        // Now try to unsubscribe - this should fail because server "restarted"
1844        let unsubscribe_result =
1845            timeout(Duration::from_millis(100), client.unsubscribe(subscription_id))
1846                .await
1847                .expect("unsubscribe timed out");
1848
1849        // The unsubscribe should handle the SubscriptionNotFound error gracefully
1850        // In this case, the client should treat it as successful since the subscription
1851        // is effectively gone (whether due to server restart or other reasons)
1852        unsubscribe_result
1853            .expect("Unsubscribe should succeed even if server says subscription not found");
1854
1855        timeout(Duration::from_millis(100), client.close())
1856            .await
1857            .expect("close timed out")
1858            .expect("close failed");
1859        jh.await
1860            .expect("ws loop errored")
1861            .unwrap();
1862        server_thread.await.unwrap();
1863    }
1864
1865    #[test_log::test(tokio::test)]
1866    async fn test_parse_error_handling() {
1867        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1868
1869        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1870        let error_response = WebSocketMessage::Response(Response::Error(
1871            WebsocketError::ParseError("}2sdf".to_string(), "malformed JSON".to_string()),
1872        ));
1873        let error_json = serde_json::to_string(&error_response).unwrap();
1874
1875        let exp_comm = [
1876            // subscribe first so connect can finish successfully
1877            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1878            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1879        ];
1880
1881        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1882
1883        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1884        let jh = client
1885            .connect()
1886            .await
1887            .expect("connect failed");
1888
1889        // Subscribe successfully
1890        let _ = timeout(
1891            Duration::from_millis(100),
1892            client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1893        )
1894        .await
1895        .expect("subscription timed out");
1896
1897        // The client should receive the parse error and close the connection
1898        let result = jh
1899            .await
1900            .expect("ws loop should complete");
1901        assert!(result.is_err());
1902        if let Err(DeltasError::ServerError(message, _)) = result {
1903            assert!(message.contains("Server failed to parse client message"));
1904        } else {
1905            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1906        }
1907
1908        server_thread.await.unwrap();
1909    }
1910
1911    #[test_log::test(tokio::test)]
1912    async fn test_compression_error_handling() {
1913        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1914
1915        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1916        let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
1917        let error_response = WebSocketMessage::Response(Response::Error(
1918            WebsocketError::CompressionError(subscription_id, "Compression failed".to_string()),
1919        ));
1920        let error_json = serde_json::to_string(&error_response).unwrap();
1921
1922        let exp_comm = [
1923            // subscribe first so connect can finish successfully
1924            ExpectedComm::Receive(
1925                100,
1926                tungstenite::protocol::Message::Text(subscribe_with_compression(true)),
1927            ),
1928            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1929        ];
1930
1931        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1932
1933        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1934        let jh = client
1935            .connect()
1936            .await
1937            .expect("connect failed");
1938
1939        // Subscribe successfully with compression disabled
1940        let _ = timeout(
1941            Duration::from_millis(100),
1942            client.subscribe(extractor_id, SubscriptionOptions::new()),
1943        )
1944        .await
1945        .expect("subscription timed out");
1946
1947        // The client should receive the parse error
1948        let result = jh
1949            .await
1950            .expect("ws loop should complete");
1951        assert!(result.is_err());
1952        if let Err(DeltasError::ServerError(message, _)) = result {
1953            assert!(message.contains("Server failed to compress message for subscription"));
1954        } else {
1955            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1956        }
1957
1958        server_thread.await.unwrap();
1959    }
1960
1961    #[tokio::test]
1962    async fn test_subscribe_error_handling() {
1963        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1964
1965        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1966
1967        let error_response = WebSocketMessage::Response(Response::Error(
1968            WebsocketError::SubscribeError(extractor_id.clone()),
1969        ));
1970        let error_json = serde_json::to_string(&error_response).unwrap();
1971
1972        let exp_comm = [
1973            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1974            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1975        ];
1976
1977        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1978
1979        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1980        let jh = client
1981            .connect()
1982            .await
1983            .expect("connect failed");
1984
1985        let result = timeout(
1986            Duration::from_millis(100),
1987            client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1988        )
1989        .await
1990        .expect("subscription timed out");
1991
1992        // Verify that we get a ServerError for subscribe failure
1993        assert!(result.is_err());
1994        if let Err(DeltasError::ServerError(msg, _)) = result {
1995            assert!(msg.contains("Subscription failed"));
1996            assert!(msg.contains("Failed to subscribe to extractor"));
1997        } else {
1998            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1999        }
2000
2001        timeout(Duration::from_millis(100), client.close())
2002            .await
2003            .expect("close timed out")
2004            .expect("close failed");
2005        jh.await
2006            .expect("ws loop errored")
2007            .unwrap();
2008        server_thread.await.unwrap();
2009    }
2010
    // Fires two concurrent subscribe calls for the same extractor against a mock server that
    // answers the single expected subscribe request with ExtractorNotFound, then checks that
    // the calls end in an error and that the client can still be closed cleanly.
    #[tokio::test]
    async fn test_cancel_pending_subscription() {
        // This test verifies that pending subscriptions are properly cancelled when errors occur
        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};

        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");

        // Canned server reply for the subscribe request
        let error_response = WebSocketMessage::Response(Response::Error(
            WebsocketError::ExtractorNotFound(extractor_id.clone()),
        ));
        let error_json = serde_json::to_string(&error_response).unwrap();

        // The mock only scripts ONE subscribe request followed by the error reply
        let exp_comm = [
            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
        ];

        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;

        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
        let jh = client
            .connect()
            .await
            .expect("connect failed");

        // Start two subscription attempts simultaneously
        let client_clone = client.clone();
        let extractor_id_clone = extractor_id.clone();

        // First attempt: compression explicitly disabled
        let subscription1 = tokio::spawn({
            let client_for_spawn = client.clone();
            async move {
                client_for_spawn
                    .subscribe(extractor_id, SubscriptionOptions::new().with_compression(false))
                    .await
            }
        });

        // Second attempt: same extractor, default options
        let subscription2 = tokio::spawn(async move {
            // This should fail because there's already a pending subscription
            client_clone
                .subscribe(extractor_id_clone, SubscriptionOptions::new())
                .await
        });

        // Wait for both tasks; the outer `unwrap`s below only fail if a task panicked
        let (result1, result2) = tokio::join!(subscription1, subscription2);

        let result1 = result1.unwrap();
        let result2 = result2.unwrap();

        // One should fail due to ExtractorNotFound error from server
        // The other should fail due to SubscriptionAlreadyPending
        assert!(result1.is_err() || result2.is_err());

        // NOTE(review): this chain is satisfied as soon as EITHER condition holds, so it is
        // weaker than the comment above suggests. It also presumes task 1 reaches the
        // client first; the spawn order does not strictly guarantee that - confirm.
        if let Err(DeltasError::SubscriptionAlreadyPending) = result2 {
            // This is expected for the second subscription
        } else if let Err(DeltasError::ServerError(_, _)) = result1 {
            // This is expected for the first subscription that gets the server error
        } else {
            panic!("Expected one SubscriptionAlreadyPending and one ServerError");
        }

        // The client must still shut down cleanly after the failed subscriptions
        timeout(Duration::from_millis(100), client.close())
            .await
            .expect("close timed out")
            .expect("close failed");
        jh.await
            .expect("ws loop errored")
            .unwrap();
        server_thread.await.unwrap();
    }
2082
2083    #[tokio::test]
2084    async fn test_force_unsubscribe_prevents_multiple_calls() {
2085        // Test that force_unsubscribe prevents sending duplicate unsubscribe commands
2086        // when called multiple times for the same subscription_id
2087
2088        let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
2089
2090        let exp_comm = [
2091            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
2092            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
2093            // Expect only ONE unsubscribe message, even though force_unsubscribe is called twice
2094            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
2095            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
2096        ];
2097
2098        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2099
2100        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2101        let jh = client
2102            .connect()
2103            .await
2104            .expect("connect failed");
2105
2106        let (received_sub_id, _rx) = timeout(
2107            Duration::from_millis(100),
2108            client.subscribe(
2109                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
2110                SubscriptionOptions::new().with_compression(false),
2111            ),
2112        )
2113        .await
2114        .expect("subscription timed out")
2115        .expect("subscription failed");
2116
2117        assert_eq!(received_sub_id, subscription_id);
2118
2119        // Access the inner state to call force_unsubscribe directly
2120        {
2121            let mut inner_guard = client.inner.lock().await;
2122            let inner = inner_guard
2123                .as_mut()
2124                .expect("client should be connected");
2125
2126            // Call force_unsubscribe twice - only the first should send an unsubscribe message
2127            WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2128            WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2129        }
2130
2131        // Give time for messages to be processed
2132        tokio::time::sleep(Duration::from_millis(50)).await;
2133
2134        // Close may fail if client disconnected after unsubscribe, which is fine
2135        let _ = timeout(Duration::from_millis(100), client.close()).await;
2136
2137        // Wait for tasks to complete
2138        let _ = jh.await;
2139        let _ = server_thread.await;
2140    }
2141}