Skip to main content

tycho_client/
deltas.rs

1//! # Deltas Client
2//!
3//! This module focuses on implementing the Real-Time Deltas client for the Tycho Indexer service.
4//! Utilizing this client facilitates efficient, instant communication with the indexing service,
5//! promoting seamless data synchronization.
6//!
7//! ## Websocket Implementation
8//!
9//! The present WebSocket implementation is cloneable, which enables it to be shared
10//! across multiple asynchronous tasks without creating separate instances for each task. This
11//! unique feature boosts efficiency as it:
12//!
13//! - **Reduces Server Load:** By maintaining a single universal client, the load on the server is
14//!   significantly reduced. This is because fewer connections are made to the server, preventing it
15//!   from getting overwhelmed by numerous simultaneous requests.
16//! - **Conserves Resource Usage:** A single shared client requires fewer system resources than if
17//!   multiple clients were instantiated and used separately as there is some overhead for websocket
18//!   handshakes and message.
19//!
20//! Therefore, sharing one client among multiple tasks ensures optimal performance, reduces resource
21//! consumption, and enhances overall software scalability.
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::{
25        atomic::{AtomicBool, Ordering},
26        Arc,
27    },
28    time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34    header::{
35        AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36        USER_AGENT,
37    },
38    Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44    net::TcpStream,
45    sync::{
46        mpsc::{self, error::TrySendError, Receiver, Sender},
47        oneshot, Mutex, MutexGuard, Notify,
48    },
49    task::JoinHandle,
50    time::sleep,
51};
52use tokio_tungstenite::{
53    connect_async,
54    tungstenite::{
55        self,
56        handshake::client::{generate_key, Request},
57    },
58    MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::{
62    dto::{self, Command, Response, WebSocketMessage, WebsocketError},
63    models::{blockchain::BlockAggregatedChanges, ExtractorIdentity},
64};
65use uuid::Uuid;
66use zstd;
67
68use crate::TYCHO_SERVER_VERSION;
69
70#[derive(Error, Debug)]
71pub enum DeltasError {
72    /// Failed to parse the provided URI.
73    #[error("Failed to parse URI: {0}. Error: {1}")]
74    UriParsing(String, String),
75
76    /// The requested subscription is already pending and is awaiting confirmation from the server.
77    #[error("The requested subscription is already pending")]
78    SubscriptionAlreadyPending,
79
80    #[error("The server replied with an error: {0}")]
81    ServerError(String),
82
83    /// A message failed to send via an internal channel or through the websocket channel.
84    /// This is typically a fatal error and might indicate a bug in the implementation.
85    #[error("{0}")]
86    TransportError(String),
87
88    /// The internal message buffer is full. This likely means that messages are not being consumed
89    /// fast enough. If the incoming load emits messages in bursts, consider increasing the buffer
90    /// size.
91    #[error("The buffer is full!")]
92    BufferFull,
93
94    /// The client has no active connections but was accessed (e.g., by calling subscribe).
95    /// This typically occurs when trying to use the client before calling connect() or
96    /// after the connection has been closed.
97    #[error("The client is not connected!")]
98    NotConnected,
99
100    /// The connect method was called while the client already had an active connection.
101    #[error("The client is already connected!")]
102    AlreadyConnected,
103
104    /// The connection was closed orderly by the server, e.g. because it restarted.
105    #[error("The server closed the connection!")]
106    ConnectionClosed,
107
108    /// The connection was closed unexpectedly by the server or encountered a network error.
109    #[error("Connection error: {0}")]
110    ConnectionError(#[from] Box<tungstenite::Error>),
111
112    /// A fatal error occurred that cannot be recovered from.
113    #[error("Tycho FatalError: {0}")]
114    Fatal(String),
115}
116
117#[derive(Clone, Debug)]
118pub struct SubscriptionOptions {
119    include_state: bool,
120    compression: bool,
121    partial_blocks: bool,
122}
123
124impl Default for SubscriptionOptions {
125    fn default() -> Self {
126        Self { include_state: true, compression: true, partial_blocks: false }
127    }
128}
129
130impl SubscriptionOptions {
131    pub fn new() -> Self {
132        Self::default()
133    }
134    pub fn with_state(mut self, val: bool) -> Self {
135        self.include_state = val;
136        self
137    }
138    pub fn with_compression(mut self, val: bool) -> Self {
139        self.compression = val;
140        self
141    }
142    pub fn with_partial_blocks(mut self, val: bool) -> Self {
143        self.partial_blocks = val;
144        self
145    }
146}
147
148#[cfg_attr(test, automock)]
149#[async_trait]
150pub trait DeltasClient {
151    /// Subscribe to an extractor and receive realtime messages
152    ///
153    /// Will request a subscription from tycho and wait for confirmation of it. If the caller
154    /// cancels while waiting for confirmation the subscription may still be registered. If the
155    /// receiver was deallocated though, the first message from the subscription will remove it
156    /// again - since there is no one to inform about these messages.
157    async fn subscribe(
158        &self,
159        extractor_id: ExtractorIdentity,
160        options: SubscriptionOptions,
161    ) -> Result<(Uuid, Receiver<BlockAggregatedChanges>), DeltasError>;
162
163    /// Unsubscribe from an subscription
164    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
165
166    /// Start the clients message handling loop.
167    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
168
169    /// Close the clients message handling loop.
170    async fn close(&self) -> Result<(), DeltasError>;
171}
172
173#[derive(Clone)]
174pub struct WsDeltasClient {
175    /// The tycho indexer websocket uri.
176    uri: Uri,
177    /// Authorization key for the websocket connection.
178    auth_key: Option<String>,
179    /// Maximum amount of reconnects to try before giving up.
180    max_reconnects: u64,
181    /// Duration to wait before attempting to reconnect
182    retry_cooldown: Duration,
183    /// The client will buffer this many messages incoming from the websocket
184    /// before starting to drop them.
185    ws_buffer_size: usize,
186    /// The client will buffer that many messages for each subscription before it starts dropping
187    /// them.
188    subscription_buffer_size: usize,
189    /// Notify tasks waiting for a connection to be established.
190    conn_notify: Arc<Notify>,
191    /// Shared client instance state.
192    inner: Arc<Mutex<Option<Inner>>>,
193    /// If set the client has exhausted its reconnection attempts
194    dead: Arc<AtomicBool>,
195}
196
197type WebSocketSink =
198    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
199
200/// Subscription State
201///
202/// Subscription go through a lifecycle:
203///
204/// ```text
205/// O ---> requested subscribe ----> active ----> requested unsub ---> ended
206/// ```
207///
208/// We use oneshot channels to inform the client struct about when these transition happened. E.g.
209/// because for `subscribe`` to finish, we want the state to have transition to `active` and similar
210/// for `unsubscribe`.
211#[derive(Debug)]
212enum SubscriptionInfo {
213    /// Subscription was requested we wait for server confirmation and uuid assignment.
214    RequestedSubscription(
215        oneshot::Sender<Result<(Uuid, Receiver<BlockAggregatedChanges>), DeltasError>>,
216    ),
217    /// Subscription is active.
218    Active,
219    /// Unsubscription was requested, we wait for server confirmation.
220    RequestedUnsubscription(oneshot::Sender<()>),
221}
222
223/// Internal struct containing shared state between of WsDeltaClient instances.
224struct Inner {
225    /// Websocket sender handle.
226    sink: WebSocketSink,
227    /// Command channel sender handle.
228    cmd_tx: Sender<()>,
229    /// Currently pending subscriptions, keyed by model-layer extractor identity.
230    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
231    /// Active subscriptions.
232    subscriptions: HashMap<Uuid, SubscriptionInfo>,
233    /// For eachs subscription we keep a sender handle, the receiver is returned to the caller of
234    /// subscribe.
235    sender: HashMap<Uuid, Sender<BlockAggregatedChanges>>,
236    /// How many messages to buffer per subscription before starting to drop new messages.
237    buffer_size: usize,
238}
239
240/// Shared state between all client instances.
241///
242/// This state is behind a mutex and requires synchronization to be read of modified.
243impl Inner {
244    fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
245        Self {
246            sink,
247            cmd_tx,
248            pending: HashMap::new(),
249            subscriptions: HashMap::new(),
250            sender: HashMap::new(),
251            buffer_size,
252        }
253    }
254
255    /// Registers a new pending subscription.
256    #[allow(clippy::result_large_err)]
257    fn new_subscription(
258        &mut self,
259        id: &ExtractorIdentity,
260        ready_tx: oneshot::Sender<Result<(Uuid, Receiver<BlockAggregatedChanges>), DeltasError>>,
261    ) -> Result<(), DeltasError> {
262        if self.pending.contains_key(id) {
263            return Err(DeltasError::SubscriptionAlreadyPending);
264        }
265        self.pending
266            .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
267        Ok(())
268    }
269
270    /// Transitions a pending subscription to active.
271    ///
272    /// Will ignore any request to do so for subscriptions that are not pending.
273    fn mark_active(&mut self, extractor_id: ExtractorIdentity, subscription_id: Uuid) {
274        if let Some(info) = self.pending.remove(&extractor_id) {
275            if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
276                let (tx, rx) = mpsc::channel(self.buffer_size);
277                self.sender.insert(subscription_id, tx);
278                self.subscriptions
279                    .insert(subscription_id, SubscriptionInfo::Active);
280                let _ = ready_tx
281                    .send(Ok((subscription_id, rx)))
282                    .map_err(|_| {
283                        warn!(
284                            ?extractor_id,
285                            ?subscription_id,
286                            "Subscriber for has gone away. Ignoring."
287                        )
288                    });
289            } else {
290                error!(
291                    ?extractor_id,
292                    ?subscription_id,
293                    "Pending subscription was not in the correct state to 
294                    transition to active. Ignoring!"
295                )
296            }
297        } else {
298            error!(
299                ?extractor_id,
300                ?subscription_id,
301                "Tried to mark an unknown subscription as active. Ignoring!"
302            );
303        }
304    }
305
306    /// Sends a message to a subscription's receiver.
307    #[allow(clippy::result_large_err)]
308    fn send(&mut self, id: &Uuid, msg: BlockAggregatedChanges) -> Result<(), DeltasError> {
309        if let Some(sender) = self.sender.get_mut(id) {
310            sender
311                .try_send(msg)
312                .map_err(|e| match e {
313                    TrySendError::Full(_) => DeltasError::BufferFull,
314                    TrySendError::Closed(_) => {
315                        DeltasError::TransportError("The subscriber has gone away".to_string())
316                    }
317                })?;
318        }
319        Ok(())
320    }
321
322    /// Requests a subscription to end.
323    ///
324    /// The subscription needs to exist and be active for this to have any effect. Wll use
325    /// `ready_tx` to notify the receiver once the transition to ended completed.
326    fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
327        if let Some(info) = self
328            .subscriptions
329            .get_mut(subscription_id)
330        {
331            if let SubscriptionInfo::Active = info {
332                *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
333            }
334        } else {
335            // no big deal imo so only debug lvl...
336            debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
337        }
338    }
339
340    /// Removes and fully ends a subscription
341    ///
342    /// Any calls for non-existing subscriptions will be simply ignored. May panic on internal state
343    /// inconsistencies: e.g. if the subscription exists but there is no sender for it.
344    /// Will remove a subscription even it was in active or pending state before, this is to support
345    /// any server side failure of the subscription.
346    fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
347        if let Entry::Occupied(e) = self
348            .subscriptions
349            .entry(subscription_id)
350        {
351            let info = e.remove();
352            if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
353                let _ = tx.send(()).map_err(|_| {
354                    debug!(?subscription_id, "failed to notify about removed subscription")
355                });
356                self.sender
357                    .remove(&subscription_id)
358                    .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
359            } else {
360                warn!(?subscription_id, "Subscription ended unexpectedly!");
361                self.sender
362                    .remove(&subscription_id)
363                    .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
364            }
365        } else {
366            // TODO: There is a race condition that can trigger multiple unsubscribes
367            //  if server doesn't respond quickly enough leading to some ugly logs but
368            //  doesn't affect behaviour negatively. E.g. BufferFull and multiple
369            //  messages from the ws connection are queued.
370            trace!(
371                ?subscription_id,
372                "Received `SubscriptionEnded`, but was never subscribed to it. This is likely a bug!"
373            );
374        }
375
376        Ok(())
377    }
378
379    fn cancel_pending(&mut self, extractor_id: ExtractorIdentity, error: &WebsocketError) {
380        if let Some(sub_info) = self.pending.remove(&extractor_id) {
381            match sub_info {
382                SubscriptionInfo::RequestedSubscription(tx) => {
383                    let _ = tx
384                        .send(Err(DeltasError::ServerError(format!(
385                            "Subscription failed: {error}"
386                        ))))
387                        .map_err(|_| debug!("Cancel pending failed: receiver deallocated!"));
388                }
389                _ => {
390                    error!(?extractor_id, "Pending subscription in wrong state")
391                }
392            }
393        } else {
394            debug!(?extractor_id, "Tried cancel on non-existent pending subscription!")
395        }
396    }
397
398    /// Sends a message through the websocket.
399    async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
400        self.sink.send(msg).await.map_err(|e| {
401            DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
402        })
403    }
404}
405
406/// Tycho client websocket implementation.
407impl WsDeltasClient {
408    // Construct a new client with 5 reconnection attempts.
409    #[allow(clippy::result_large_err)]
410    pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
411        let uri = ws_uri
412            .parse::<Uri>()
413            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
414        Ok(Self {
415            uri,
416            auth_key: auth_key.map(|s| s.to_string()),
417            inner: Arc::new(Mutex::new(None)),
418            ws_buffer_size: 128,
419            subscription_buffer_size: 128,
420            conn_notify: Arc::new(Notify::new()),
421            max_reconnects: 5,
422            retry_cooldown: Duration::from_millis(500),
423            dead: Arc::new(AtomicBool::new(false)),
424        })
425    }
426
427    // Construct a new client with a custom number of reconnection attempts.
428    #[allow(clippy::result_large_err)]
429    pub fn new_with_reconnects(
430        ws_uri: &str,
431        auth_key: Option<&str>,
432        max_reconnects: u64,
433        retry_cooldown: Duration,
434    ) -> Result<Self, DeltasError> {
435        let uri = ws_uri
436            .parse::<Uri>()
437            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
438
439        Ok(Self {
440            uri,
441            auth_key: auth_key.map(|s| s.to_string()),
442            inner: Arc::new(Mutex::new(None)),
443            ws_buffer_size: 128,
444            subscription_buffer_size: 128,
445            conn_notify: Arc::new(Notify::new()),
446            max_reconnects,
447            retry_cooldown,
448            dead: Arc::new(AtomicBool::new(false)),
449        })
450    }
451
452    // Construct a new client with custom buffer sizes (for testing)
453    #[cfg(test)]
454    #[allow(clippy::result_large_err)]
455    pub fn new_with_custom_buffers(
456        ws_uri: &str,
457        auth_key: Option<&str>,
458        ws_buffer_size: usize,
459        subscription_buffer_size: usize,
460    ) -> Result<Self, DeltasError> {
461        let uri = ws_uri
462            .parse::<Uri>()
463            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
464        Ok(Self {
465            uri,
466            auth_key: auth_key.map(|s| s.to_string()),
467            inner: Arc::new(Mutex::new(None)),
468            ws_buffer_size,
469            subscription_buffer_size,
470            conn_notify: Arc::new(Notify::new()),
471            max_reconnects: 5,
472            retry_cooldown: Duration::from_millis(0),
473            dead: Arc::new(AtomicBool::new(false)),
474        })
475    }
476
477    /// Ensures that the client is connected.
478    ///
479    /// This method will acquire the lock for inner.
480    async fn is_connected(&self) -> bool {
481        let guard = self.inner.as_ref().lock().await;
482        guard.is_some()
483    }
484
485    /// Waits for the client to be connected
486    ///
487    /// This method acquires the lock for inner for a short period, then waits until the
488    /// connection is established if not already connected.
489    async fn ensure_connection(&self) -> Result<(), DeltasError> {
490        if self.dead.load(Ordering::SeqCst) {
491            return Err(DeltasError::NotConnected);
492        }
493        if self.is_connected().await {
494            return Ok(());
495        }
496        // Enable the future BEFORE re-checking is_connected to close the race window where
497        // the reconnect task calls notify_waiters() between is_connected() returning false and
498        // notified().await — without enable(), that notification would be lost and this call
499        // would block until the next reconnect.
500        let notified = self.conn_notify.notified();
501        tokio::pin!(notified);
502        notified.as_mut().enable();
503        if !self.is_connected().await {
504            notified.await;
505        }
506        if self.dead.load(Ordering::SeqCst) {
507            return Err(DeltasError::NotConnected);
508        }
509        if !self.is_connected().await {
510            return Err(DeltasError::NotConnected);
511        }
512        Ok(())
513    }
514
515    /// Main message handling logic
516    ///
517    /// If the message returns an error, a reconnect attempt may be considered depending on the
518    /// error type.
519    #[instrument(skip(self, msg))]
520    async fn handle_msg(
521        &self,
522        msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
523    ) -> Result<(), DeltasError> {
524        let mut guard = self.inner.lock().await;
525
526        match msg {
527            // We do not deserialize the message directly into a WebSocketMessage. This is because
528            // the serde arbitrary_precision feature (often included in many
529            // dependencies we use) breaks some untagged enum deserializations. Instead,
530            // we deserialize the message into a serde_json::Value and convert that into a WebSocketMessage. For more info on this issue, see: https://github.com/serde-rs/json/issues/740
531            Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
532                serde_json::Value,
533            >(&text)
534            {
535                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
536                    Ok(ws_message) => match ws_message {
537                        WebSocketMessage::BlockAggregatedChanges { subscription_id, deltas } => {
538                            Self::handle_block_changes_msg(&mut guard, subscription_id, deltas)
539                                .await?;
540                        }
541                        WebSocketMessage::Response(Response::NewSubscription {
542                            extractor_id,
543                            subscription_id,
544                        }) => {
545                            info!(?extractor_id, ?subscription_id, "Received a new subscription");
546                            let inner = guard
547                                .as_mut()
548                                .ok_or_else(|| DeltasError::NotConnected)?;
549                            inner.mark_active(extractor_id.into(), subscription_id);
550                        }
551                        WebSocketMessage::Response(Response::SubscriptionEnded {
552                            subscription_id,
553                        }) => {
554                            info!(?subscription_id, "Received a subscription ended");
555                            let inner = guard
556                                .as_mut()
557                                .ok_or_else(|| DeltasError::NotConnected)?;
558                            inner.remove_subscription(subscription_id)?;
559                        }
560                        WebSocketMessage::Response(Response::Error(error)) => match &error {
561                            WebsocketError::ExtractorNotFound(extractor_id) => {
562                                let inner = guard
563                                    .as_mut()
564                                    .ok_or_else(|| DeltasError::NotConnected)?;
565                                inner.cancel_pending(extractor_id.clone().into(), &error);
566                            }
567                            WebsocketError::SubscriptionNotFound(subscription_id) => {
568                                debug!("Received subscription not found, removing subscription");
569                                let inner = guard
570                                    .as_mut()
571                                    .ok_or_else(|| DeltasError::NotConnected)?;
572                                inner.remove_subscription(*subscription_id)?;
573                            }
574                            WebsocketError::ParseError(raw, e) => {
575                                return Err(DeltasError::ServerError(format!(
576                                    "Server failed to parse client message: {e}, msg: {raw}"
577                                )))
578                            }
579                            WebsocketError::CompressionError(subscription_id, e) => {
580                                return Err(DeltasError::ServerError(format!(
581                                    "Server failed to compress message for subscription: \
582                                     {subscription_id}, error: {e}"
583                                )))
584                            }
585                            WebsocketError::SubscribeError(extractor_id) => {
586                                let inner = guard
587                                    .as_mut()
588                                    .ok_or_else(|| DeltasError::NotConnected)?;
589                                inner.cancel_pending(extractor_id.clone().into(), &error);
590                            }
591                        },
592                    },
593                    Err(e) => {
594                        error!(
595                            "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
596                            e, text
597                        );
598                    }
599                },
600                Err(e) => {
601                    error!(
602                        "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
603                        e, text
604                    );
605                }
606            },
607            Ok(tungstenite::protocol::Message::Binary(data)) => {
608                // Decompress the zstd-compressed data,
609                // Note that we only support compressed BlockAggregatedChanges messages for now.
610                match zstd::decode_all(data.as_slice()) {
611                    Ok(decompressed) => {
612                        match serde_json::from_slice::<serde_json::Value>(decompressed.as_slice()) {
613                            Ok(value) => {
614                                match serde_json::from_value::<WebSocketMessage>(value.clone()) {
615                                    Ok(ws_message) => match ws_message {
616                                        WebSocketMessage::BlockAggregatedChanges {
617                                            subscription_id,
618                                            deltas,
619                                        } => {
620                                            Self::handle_block_changes_msg(
621                                                &mut guard,
622                                                subscription_id,
623                                                deltas,
624                                            )
625                                            .await?;
626                                        }
627                                        _ => {
628                                            error!(
629                                                "Received unsupported compressed WebSocketMessage variant. \nMessage: {ws_message:?}",
630                                            );
631                                        }
632                                    },
633                                    Err(e) => {
634                                        error!(
635                                            "Failed to deserialize compressed WebSocketMessage: {e}. \nMessage: {value:?}",
636                                        );
637                                    }
638                                }
639                            }
640                            Err(e) => {
641                                error!(
642                                    "Failed to deserialize compressed message: invalid JSON. {e}",
643                                );
644                            }
645                        }
646                    }
647                    Err(e) => {
648                        error!("Failed to decompress zstd data: {}", e);
649                    }
650                }
651            }
652            Ok(tungstenite::protocol::Message::Ping(_)) => {
653                // Respond to pings with pongs.
654                let inner = guard
655                    .as_mut()
656                    .ok_or_else(|| DeltasError::NotConnected)?;
657                if let Err(error) = inner
658                    .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
659                    .await
660                {
661                    debug!(?error, "Failed to send pong!");
662                }
663            }
664            Ok(tungstenite::protocol::Message::Pong(_)) => {
665                // Do nothing.
666            }
667            Ok(tungstenite::protocol::Message::Close(frame)) => {
668                match &frame {
669                    Some(f) => {
670                        warn!(code = ?f.code, reason = %f.reason, "WebSocket closed by server")
671                    }
672                    None => warn!("WebSocket closed by server (no close frame)"),
673                }
674                return Err(DeltasError::ConnectionClosed);
675            }
676            Ok(unknown_msg) => {
677                info!("Received an unknown message type: {:?}", unknown_msg);
678            }
679            Err(error) => {
680                error!(?error, "Websocket error");
681                return Err(match error {
682                    tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
683                    tungstenite::Error::AlreadyClosed => {
684                        warn!("Received AlreadyClosed error which is indicative of a bug!");
685                        DeltasError::ConnectionError(Box::new(error))
686                    }
687                    tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
688                        DeltasError::ConnectionError(Box::new(error))
689                    }
690                    _ => DeltasError::Fatal(error.to_string()),
691                });
692            }
693        };
694        Ok(())
695    }
696
697    async fn handle_block_changes_msg(
698        guard: &mut MutexGuard<'_, Option<Inner>>,
699        subscription_id: Uuid,
700        deltas: dto::BlockAggregatedChanges,
701    ) -> Result<(), DeltasError> {
702        trace!(?deltas, "Received a block state change, sending to channel");
703        let inner = guard
704            .as_mut()
705            .ok_or_else(|| DeltasError::NotConnected)?;
706        match inner.send(&subscription_id, BlockAggregatedChanges::from(deltas)) {
707            Err(DeltasError::BufferFull) => {
708                error!(?subscription_id, "Buffer full, unsubscribing!");
709                Self::force_unsubscribe(subscription_id, inner).await;
710            }
711            Err(_) => {
712                warn!(?subscription_id, "Receiver for has gone away, unsubscribing!");
713                Self::force_unsubscribe(subscription_id, inner).await;
714            }
715            _ => { /* Do nothing */ }
716        }
717        Ok(())
718    }
719
720    /// Forcefully ends a (client) stream by unsubscribing.
721    ///
722    /// Is used only if the message can't be processed due to an error that might resolve
723    /// itself by resubscribing.
724    async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
725        // avoid unsubscribing multiple times
726        if let Some(SubscriptionInfo::RequestedUnsubscription(_)) = inner
727            .subscriptions
728            .get(&subscription_id)
729        {
730            return;
731        }
732
733        let (tx, rx) = oneshot::channel();
734        if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
735            warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
736        } else {
737            // Wait for unsubscribe completion with timeout
738            match tokio::time::timeout(Duration::from_secs(5), rx).await {
739                Ok(_) => {
740                    debug!(?subscription_id, "Unsubscribe completed successfully");
741                }
742                Err(_) => {
743                    warn!(?subscription_id, "Unsubscribe completion timed out");
744                }
745            }
746        }
747    }
748
749    /// Helper method to force an unsubscription
750    ///
751    /// This method expects to receive a mutable reference to `Inner` so it does not acquire a
752    /// lock. Used for normal unsubscribes as well to remove any subscriptions with deallocated
753    /// receivers.
754    async fn unsubscribe_inner(
755        inner: &mut Inner,
756        subscription_id: Uuid,
757        ready_tx: oneshot::Sender<()>,
758    ) -> Result<(), DeltasError> {
759        debug!(?subscription_id, "Unsubscribing");
760        inner.end_subscription(&subscription_id, ready_tx);
761        let cmd = Command::Unsubscribe { subscription_id };
762        inner
763            .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
764                |e| {
765                    DeltasError::TransportError(format!(
766                        "Failed to serialize unsubscribe command: {e}"
767                    ))
768                },
769            )?))
770            .await?;
771        Ok(())
772    }
773}
774
775#[async_trait]
776impl DeltasClient for WsDeltasClient {
777    #[instrument(skip(self))]
778    async fn subscribe(
779        &self,
780        extractor_id: ExtractorIdentity,
781        options: SubscriptionOptions,
782    ) -> Result<(Uuid, Receiver<BlockAggregatedChanges>), DeltasError> {
783        trace!("Starting subscribe");
784        self.ensure_connection().await?;
785        let (ready_tx, ready_rx) = oneshot::channel();
786        {
787            let mut guard = self.inner.lock().await;
788            let inner = guard
789                .as_mut()
790                .ok_or_else(|| DeltasError::NotConnected)?;
791            trace!("Sending subscribe command");
792            inner.new_subscription(&extractor_id, ready_tx)?;
793            let cmd = Command::Subscribe {
794                extractor_id: extractor_id.into(),
795                include_state: options.include_state,
796                compression: options.compression,
797                partial_blocks: options.partial_blocks,
798            };
799            inner
800                .ws_send(tungstenite::protocol::Message::Text(
801                    serde_json::to_string(&cmd).map_err(|e| {
802                        DeltasError::TransportError(format!(
803                            "Failed to serialize subscribe command: {e}"
804                        ))
805                    })?,
806                ))
807                .await?;
808        }
809        trace!("Waiting for subscription response");
810        let res = tokio::time::timeout(Duration::from_secs(30), ready_rx)
811            .await
812            .map_err(|_| {
813                DeltasError::TransportError(
814                    "Subscribe confirmation timed out after 30s".to_string(),
815                )
816            })?
817            .map_err(|_| {
818                DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
819            })??;
820        trace!("Subscription successful");
821        Ok(res)
822    }
823
824    #[instrument(skip(self))]
825    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
826        self.ensure_connection().await?;
827        let (ready_tx, ready_rx) = oneshot::channel();
828        {
829            let mut guard = self.inner.lock().await;
830            let inner = guard
831                .as_mut()
832                .ok_or_else(|| DeltasError::NotConnected)?;
833
834            WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
835        }
836        tokio::time::timeout(Duration::from_secs(5), ready_rx)
837            .await
838            .map_err(|_| {
839                warn!(?subscription_id, "Unsubscribe confirmation timed out after 5s");
840                DeltasError::TransportError(
841                    "Unsubscribe confirmation timed out after 5s".to_string(),
842                )
843            })?
844            .map_err(|_| {
845                DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
846            })?;
847
848        Ok(())
849    }
850
851    #[instrument(skip(self))]
852    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
853        if self.is_connected().await {
854            return Err(DeltasError::AlreadyConnected);
855        }
856        let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
857        info!(?ws_uri, "Starting TychoWebsocketClient");
858
859        let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
860        {
861            let mut guard = self.inner.as_ref().lock().await;
862            *guard = None;
863        }
864        let this = self.clone();
865        let jh = tokio::spawn(async move {
866            let mut retry_count = 0;
867            let mut result = Err(DeltasError::NotConnected);
868
869            'retry: while retry_count < this.max_reconnects {
870                info!(?ws_uri, retry_count, "Connecting to WebSocket server");
871                if retry_count > 0 {
872                    sleep(this.retry_cooldown).await;
873                }
874
875                // Create a WebSocket request
876                let mut request_builder = Request::builder()
877                    .uri(&ws_uri)
878                    .header(SEC_WEBSOCKET_KEY, generate_key())
879                    .header(SEC_WEBSOCKET_VERSION, 13)
880                    .header(CONNECTION, "Upgrade")
881                    .header(UPGRADE, "websocket")
882                    .header(
883                        HOST,
884                        this.uri.host().ok_or_else(|| {
885                            DeltasError::UriParsing(
886                                ws_uri.clone(),
887                                "No host found in tycho url".to_string(),
888                            )
889                        })?,
890                    )
891                    .header(
892                        USER_AGENT,
893                        format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
894                    );
895
896                // Add Authorization if one is given
897                if let Some(ref key) = this.auth_key {
898                    request_builder = request_builder.header(AUTHORIZATION, key);
899                }
900
901                let request = request_builder.body(()).map_err(|e| {
902                    DeltasError::TransportError(format!("Failed to build connection request: {e}"))
903                })?;
904                let (conn, _) = match connect_async(request).await {
905                    Ok(conn) => conn,
906                    Err(e) => {
907                        // Prepare for reconnection
908                        retry_count += 1;
909                        let mut guard = this.inner.as_ref().lock().await;
910                        *guard = None;
911
912                        if let tungstenite::Error::Http(response) = &e {
913                            if response.status() == tungstenite::http::StatusCode::TOO_MANY_REQUESTS
914                            {
915                                let reason = response
916                                    .body()
917                                    .as_deref()
918                                    .and_then(|b| std::str::from_utf8(b).ok())
919                                    .unwrap_or("")
920                                    .to_string();
921                                warn!(reason, "WebSocket connection rejected: rate limited");
922                                continue 'retry;
923                            }
924                        }
925
926                        warn!(
927                            e = e.to_string(),
928                            "Failed to connect to WebSocket server; Reconnecting"
929                        );
930                        continue 'retry;
931                    }
932                };
933
934                let (ws_tx_new, ws_rx_new) = conn.split();
935                {
936                    let mut guard = this.inner.as_ref().lock().await;
937                    *guard =
938                        Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
939                }
940                let mut msg_rx = ws_rx_new.boxed();
941
942                info!("Connection Successful: TychoWebsocketClient started");
943                this.conn_notify.notify_waiters();
944                result = Ok(());
945
946                // If no WS frame arrives within this window the TCP connection is
947                // considered stalled (e.g. network cut without a TCP RST/FIN). The OS
948                // default keepalive fires after ~2 hours; this gives us an
949                // application-level detection that is orders of magnitude faster.
950                const IDLE_TIMEOUT: Duration = Duration::from_secs(60);
951                loop {
952                    let res = tokio::select! {
953                        msg_result = tokio::time::timeout(IDLE_TIMEOUT, msg_rx.next()) => {
954                            match msg_result {
955                                Err(_elapsed) => {
956                                    warn!("No WS frame received for {IDLE_TIMEOUT:?}, \
957                                           treating connection as stalled; Reconnecting...");
958                                    retry_count += 1;
959                                    let mut guard = this.inner.as_ref().lock().await;
960                                    *guard = None;
961                                    break; // break inner loop → reconnect
962                                }
963                                Ok(Some(msg)) => this.handle_msg(msg).await,
964                                Ok(None) => {
965                                    // This code should not be reachable since the stream
966                                    // should return ConnectionClosed in the case above
967                                    // before it returns None here.
968                                    warn!("Websocket connection silently closed, giving up!");
969                                    break 'retry
970                                }
971                            }
972                        },
973                        _ = cmd_rx.recv() => {break 'retry},
974                    };
975                    if let Err(error) = res {
976                        debug!(?error, "WsError");
977                        if matches!(
978                            error,
979                            DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
980                        ) {
981                            // Prepare for reconnection
982                            retry_count += 1;
983                            let mut guard = this.inner.as_ref().lock().await;
984                            *guard = None;
985
986                            warn!(
987                                ?error,
988                                ?retry_count,
989                                "Connection dropped unexpectedly; Reconnecting..."
990                            );
991                            break;
992                        } else {
993                            // Other errors are considered fatal
994                            error!(?error, "Fatal error; Exiting");
995                            result = Err(error);
996                            break 'retry;
997                        }
998                    }
999                }
1000            }
1001            debug!(
1002                retry_count,
1003                max_reconnects=?this.max_reconnects,
1004                "Reconnection loop ended"
1005            );
1006            // Clean up before exiting
1007            let mut guard = this.inner.as_ref().lock().await;
1008            *guard = None;
1009
1010            // Check if max retries has been reached.
1011            if retry_count >= this.max_reconnects {
1012                error!("Max reconnection attempts reached; Exiting");
1013                this.dead.store(true, Ordering::SeqCst);
1014                this.conn_notify.notify_waiters(); // Notify that the task is done
1015                result = Err(DeltasError::ConnectionClosed);
1016            }
1017
1018            result
1019        });
1020
1021        self.conn_notify.notified().await;
1022
1023        if self.is_connected().await {
1024            Ok(jh)
1025        } else {
1026            Err(DeltasError::NotConnected)
1027        }
1028    }
1029
1030    #[instrument(skip(self))]
1031    async fn close(&self) -> Result<(), DeltasError> {
1032        info!("Closing TychoWebsocketClient");
1033        let mut guard = self.inner.lock().await;
1034        let inner = guard
1035            .as_mut()
1036            .ok_or_else(|| DeltasError::NotConnected)?;
1037        inner
1038            .cmd_tx
1039            .send(())
1040            .await
1041            .map_err(|e| DeltasError::TransportError(e.to_string()))?;
1042        Ok(())
1043    }
1044}
1045
1046#[cfg(test)]
1047mod tests {
1048    use std::{net::SocketAddr, str::FromStr};
1049
1050    use tokio::{net::TcpListener, time::timeout};
1051    use tycho_common::models::Chain;
1052
1053    use super::*;
1054
1055    #[derive(Clone)]
1056    enum ExpectedComm {
1057        Receive(u64, tungstenite::protocol::Message),
1058        Send(tungstenite::protocol::Message),
1059    }
1060
1061    async fn mock_tycho_ws(
1062        messages: &[ExpectedComm],
1063        reconnects: usize,
1064    ) -> (SocketAddr, JoinHandle<()>) {
1065        info!("Starting mock webserver");
1066        // zero port here means the OS chooses an open port
1067        let server = TcpListener::bind("127.0.0.1:0")
1068            .await
1069            .expect("localhost bind failed");
1070        let addr = server.local_addr().unwrap();
1071        let messages = messages.to_vec();
1072
1073        let jh = tokio::spawn(async move {
1074            info!("mock webserver started");
1075            for _ in 0..(reconnects + 1) {
1076                info!("Awaiting client connections");
1077                if let Ok((stream, _)) = server.accept().await {
1078                    info!("Client connected");
1079                    let mut websocket = tokio_tungstenite::accept_async(stream)
1080                        .await
1081                        .unwrap();
1082
1083                    info!("Handling messages..");
1084                    for c in messages.iter().cloned() {
1085                        match c {
1086                            ExpectedComm::Receive(t, exp) => {
1087                                info!("Awaiting message...");
1088                                let msg = timeout(Duration::from_millis(t), websocket.next())
1089                                    .await
1090                                    .expect("Receive timeout")
1091                                    .expect("Stream exhausted")
1092                                    .expect("Failed to receive message.");
1093                                info!("Message received");
1094                                assert_eq!(msg, exp)
1095                            }
1096                            ExpectedComm::Send(data) => {
1097                                info!("Sending message");
1098                                websocket
1099                                    .send(data)
1100                                    .await
1101                                    .expect("Failed to send message");
1102                                info!("Message sent");
1103                            }
1104                        };
1105                    }
1106                    info!("Mock communication completed");
1107                    sleep(Duration::from_millis(100)).await;
1108                    // Close the WebSocket connection
1109                    let _ = websocket.close(None).await;
1110                    info!("Mock server closed connection");
1111                }
1112            }
1113            info!("mock server ended");
1114        });
1115        (addr, jh)
1116    }
1117
1118    const SUBSCRIPTION_ID: &str = "30b740d1-cf09-4e0e-8cfe-b1434d447ece";
1119
1120    fn subscribe() -> String {
1121        subscribe_with_compression(false)
1122    }
1123
1124    fn subscribe_with_compression(compression: bool) -> String {
1125        serde_json::json!({
1126            "method": "subscribe",
1127            "extractor_id": {
1128                "chain": "ethereum",
1129                "name": "vm:ambient"
1130            },
1131            "include_state": true,
1132            "compression": compression,
1133            "partial_blocks": false
1134        })
1135        .to_string()
1136    }
1137
1138    fn subscription_confirmation() -> String {
1139        r#"
1140        {
1141            "method": "newsubscription",
1142            "extractor_id":{
1143                "chain": "ethereum",
1144                "name": "vm:ambient"
1145            },
1146            "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1147        }
1148        "#
1149        .replace(|c: char| c.is_whitespace(), "")
1150    }
1151
1152    fn block_deltas() -> String {
1153        r#"
1154        {
1155            "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1156            "deltas": {
1157                "extractor": "vm:ambient",
1158                "chain": "ethereum",
1159                "block": {
1160                    "number": 123,
1161                    "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1162                    "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1163                    "chain": "ethereum",
1164                    "ts": "2023-09-14T00:00:00"
1165                },
1166                "finalized_block_height": 0,
1167                "revert": false,
1168                "new_tokens": {},
1169                "account_updates": {
1170                    "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1171                        "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1172                        "chain": "ethereum",
1173                        "slots": {},
1174                        "balance": "0x01f4",
1175                        "code": "",
1176                        "change": "Update"
1177                    }
1178                },
1179                "state_updates": {
1180                    "component_1": {
1181                        "component_id": "component_1",
1182                        "updated_attributes": {"attr1": "0x01"},
1183                        "deleted_attributes": ["attr2"]
1184                    }
1185                },
1186                "new_protocol_components":
1187                    { "protocol_1": {
1188                            "id": "protocol_1",
1189                            "protocol_system": "system_1",
1190                            "protocol_type_name": "type_1",
1191                            "chain": "ethereum",
1192                            "tokens": ["0x01", "0x02"],
1193                            "contract_ids": ["0x01", "0x02"],
1194                            "static_attributes": {"attr1": "0x01f4"},
1195                            "change": "Update",
1196                            "creation_tx": "0x01",
1197                            "created_at": "2023-09-14T00:00:00"
1198                        }
1199                    },
1200                "deleted_protocol_components": {},
1201                "component_balances": {
1202                    "protocol_1":
1203                        {
1204                            "0x01": {
1205                                "token": "0x01",
1206                                "balance": "0x01f4",
1207                                "balance_float": 0.0,
1208                                "modify_tx": "0x01",
1209                                "component_id": "protocol_1"
1210                            }
1211                        }
1212                },
1213                "account_balances": {
1214                    "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1215                        "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1216                            "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1217                            "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1218                            "balance": "0x01f4",
1219                            "modify_tx": "0x01"
1220                        }
1221                    }
1222                },
1223                "component_tvl": {
1224                    "protocol_1": 1000.0
1225                },
1226                "dci_update": {
1227                    "new_entrypoints": {},
1228                    "new_entrypoint_params": {},
1229                    "trace_results": {}
1230                }
1231            }
1232        }
1233        "#.replace(|c: char| c.is_whitespace(), "")
1234    }
1235
1236    fn unsubscribe() -> String {
1237        r#"
1238        {
1239            "method": "unsubscribe",
1240            "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1241        }
1242        "#
1243        .replace(|c: char| c.is_whitespace(), "")
1244    }
1245
1246    fn subscription_ended() -> String {
1247        r#"
1248        {
1249            "method": "subscriptionended",
1250            "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1251        }
1252        "#
1253        .replace(|c: char| c.is_whitespace(), "")
1254    }
1255
1256    #[tokio::test]
1257    async fn test_uncompressed_subscribe_receive() {
1258        let exp_comm = [
1259            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1260            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1261            ExpectedComm::Send(tungstenite::protocol::Message::Text(block_deltas())),
1262        ];
1263        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1264
1265        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1266        let jh = client
1267            .connect()
1268            .await
1269            .expect("connect failed");
1270        let (_, mut rx) = timeout(
1271            Duration::from_millis(100),
1272            client.subscribe(
1273                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1274                SubscriptionOptions::new().with_compression(false),
1275            ),
1276        )
1277        .await
1278        .expect("subscription timed out")
1279        .expect("subscription failed");
1280        let _ = timeout(Duration::from_millis(100), rx.recv())
1281            .await
1282            .expect("awaiting message timeout out")
1283            .expect("receiving message failed");
1284        timeout(Duration::from_millis(100), client.close())
1285            .await
1286            .expect("close timed out")
1287            .expect("close failed");
1288        jh.await
1289            .expect("ws loop errored")
1290            .unwrap();
1291        server_thread.await.unwrap();
1292    }
1293
1294    #[tokio::test]
1295    async fn test_compressed_subscribe_receive() {
1296        let compressed_block_deltas = zstd::encode_all(
1297            block_deltas().as_bytes(),
1298            0, // default compression level
1299        )
1300        .expect("Failed to compress block deltas message");
1301
1302        let exp_comm = [
1303            ExpectedComm::Receive(
1304                100,
1305                tungstenite::protocol::Message::Text(subscribe_with_compression(true)),
1306            ),
1307            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1308            ExpectedComm::Send(tungstenite::protocol::Message::Binary(compressed_block_deltas)),
1309        ];
1310        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1311
1312        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1313        let jh = client
1314            .connect()
1315            .await
1316            .expect("connect failed");
1317        let (_, mut rx) = timeout(
1318            Duration::from_millis(100),
1319            client.subscribe(
1320                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1321                SubscriptionOptions::new().with_compression(true),
1322            ),
1323        )
1324        .await
1325        .expect("subscription timed out")
1326        .expect("subscription failed");
1327        let _ = timeout(Duration::from_millis(100), rx.recv())
1328            .await
1329            .expect("awaiting message timeout out")
1330            .expect("receiving message failed");
1331        timeout(Duration::from_millis(100), client.close())
1332            .await
1333            .expect("close timed out")
1334            .expect("close failed");
1335        jh.await
1336            .expect("ws loop errored")
1337            .unwrap();
1338        server_thread.await.unwrap();
1339    }
1340
1341    #[tokio::test]
1342    async fn test_unsubscribe() {
1343        let exp_comm = [
1344            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1345            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1346            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
1347            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
1348        ];
1349        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1350
1351        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1352        let jh = client
1353            .connect()
1354            .await
1355            .expect("connect failed");
1356        let (sub_id, mut rx) = timeout(
1357            Duration::from_millis(100),
1358            client.subscribe(
1359                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1360                SubscriptionOptions::new().with_compression(false),
1361            ),
1362        )
1363        .await
1364        .expect("subscription timed out")
1365        .expect("subscription failed");
1366
1367        timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1368            .await
1369            .expect("unsubscribe timed out")
1370            .expect("unsubscribe failed");
1371        let res = timeout(Duration::from_millis(100), rx.recv())
1372            .await
1373            .expect("awaiting message timeout out");
1374
1375        // If the subscription ended, the channel should have been closed.
1376        assert!(res.is_none());
1377
1378        timeout(Duration::from_millis(100), client.close())
1379            .await
1380            .expect("close timed out")
1381            .expect("close failed");
1382        jh.await
1383            .expect("ws loop errored")
1384            .unwrap();
1385        server_thread.await.unwrap();
1386    }
1387
1388    #[tokio::test]
1389    async fn test_subscription_unexpected_end() {
1390        let exp_comm = [
1391            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1392            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1393            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
1394        ];
1395        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1396
1397        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1398        let jh = client
1399            .connect()
1400            .await
1401            .expect("connect failed");
1402        let (_, mut rx) = timeout(
1403            Duration::from_millis(100),
1404            client.subscribe(
1405                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1406                SubscriptionOptions::new().with_compression(false),
1407            ),
1408        )
1409        .await
1410        .expect("subscription timed out")
1411        .expect("subscription failed");
1412        let res = timeout(Duration::from_millis(100), rx.recv())
1413            .await
1414            .expect("awaiting message timeout out");
1415
1416        // If the subscription ended, the channel should have been closed.
1417        assert!(res.is_none());
1418
1419        timeout(Duration::from_millis(100), client.close())
1420            .await
1421            .expect("close timed out")
1422            .expect("close failed");
1423        jh.await
1424            .expect("ws loop errored")
1425            .unwrap();
1426        server_thread.await.unwrap();
1427    }
1428
1429    #[test_log::test(tokio::test)]
1430    async fn test_reconnect() {
1431        let exp_comm = [
1432            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe()
1433            )),
1434            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1435                subscription_confirmation()
1436            )),
1437            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1438                {
1439                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1440                    "deltas": {
1441                        "extractor": "vm:ambient",
1442                        "chain": "ethereum",
1443                        "block": {
1444                            "number": 123,
1445                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1446                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1447                            "chain": "ethereum",             
1448                            "ts": "2023-09-14T00:00:00"
1449                        },
1450                        "finalized_block_height": 0,
1451                        "revert": false,
1452                        "new_tokens": {},
1453                        "account_updates": {
1454                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1455                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1456                                "chain": "ethereum",
1457                                "slots": {},
1458                                "balance": "0x01f4",
1459                                "code": "",
1460                                "change": "Update"
1461                            }
1462                        },
1463                        "state_updates": {
1464                            "component_1": {
1465                                "component_id": "component_1",
1466                                "updated_attributes": {"attr1": "0x01"},
1467                                "deleted_attributes": ["attr2"]
1468                            }
1469                        },
1470                        "new_protocol_components": {
1471                            "protocol_1":
1472                                {
1473                                    "id": "protocol_1",
1474                                    "protocol_system": "system_1",
1475                                    "protocol_type_name": "type_1",
1476                                    "chain": "ethereum",
1477                                    "tokens": ["0x01", "0x02"],
1478                                    "contract_ids": ["0x01", "0x02"],
1479                                    "static_attributes": {"attr1": "0x01f4"},
1480                                    "change": "Update",
1481                                    "creation_tx": "0x01",
1482                                    "created_at": "2023-09-14T00:00:00"
1483                                }
1484                            },
1485                        "deleted_protocol_components": {},
1486                        "component_balances": {
1487                            "protocol_1": {
1488                                "0x01": {
1489                                    "token": "0x01",
1490                                    "balance": "0x01f4",
1491                                    "balance_float": 1000.0,
1492                                    "modify_tx": "0x01",
1493                                    "component_id": "protocol_1"
1494                                }
1495                            }
1496                        },
1497                        "account_balances": {
1498                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1499                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1500                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1501                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1502                                    "balance": "0x01f4",
1503                                    "modify_tx": "0x01"
1504                                }
1505                            }
1506                        },
1507                        "component_tvl": {
1508                            "protocol_1": 1000.0
1509                        },
1510                        "dci_update": {
1511                            "new_entrypoints": {},
1512                            "new_entrypoint_params": {},
1513                            "trace_results": {}
1514                        }
1515                    }
1516                }
1517                "#.to_owned()
1518            ))
1519        ];
1520        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1521        let client = WsDeltasClient::new_with_reconnects(
1522            &format!("ws://{addr}"),
1523            None,
1524            3,
1525            // server stays down for 100ms on connection drop
1526            Duration::from_millis(110),
1527        )
1528        .unwrap();
1529
1530        let jh: JoinHandle<Result<(), DeltasError>> = client
1531            .connect()
1532            .await
1533            .expect("connect failed");
1534
1535        for _ in 0..2 {
1536            dbg!("loop");
1537            let (_, mut rx) = timeout(
1538                Duration::from_millis(200),
1539                client.subscribe(
1540                    ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1541                    SubscriptionOptions::new().with_compression(false),
1542                ),
1543            )
1544            .await
1545            .expect("subscription timed out")
1546            .expect("subscription failed");
1547
1548            let _ = timeout(Duration::from_millis(100), rx.recv())
1549                .await
1550                .expect("awaiting message timeout out")
1551                .expect("receiving message failed");
1552
1553            // wait for the connection to drop
1554            let res = timeout(Duration::from_millis(200), rx.recv())
1555                .await
1556                .expect("awaiting closed connection timeout out");
1557            assert!(res.is_none());
1558        }
1559        let res = jh.await.expect("ws client join failed");
1560        // 5th client reconnect attempt should fail
1561        assert!(res.is_err());
1562        server_thread
1563            .await
1564            .expect("ws server loop errored");
1565    }
1566
1567    async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1568        let server = TcpListener::bind("127.0.0.1:0")
1569            .await
1570            .expect("localhost bind failed");
1571        let addr = server.local_addr().unwrap();
1572        let jh = tokio::spawn(async move {
1573            while let Ok((stream, _)) = server.accept().await {
1574                if accept_first {
1575                    // Send connection handshake to accept the connection (fail later)
1576                    let stream = tokio_tungstenite::accept_async(stream)
1577                        .await
1578                        .unwrap();
1579                    sleep(Duration::from_millis(10)).await;
1580                    drop(stream)
1581                } else {
1582                    // Close the connection to simulate a failure
1583                    drop(stream);
1584                }
1585            }
1586        });
1587        (addr, jh)
1588    }
1589
1590    #[test_log::test(tokio::test)]
1591    async fn test_subscribe_dead_client_after_max_attempts() {
1592        let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1593        let client = WsDeltasClient::new_with_reconnects(
1594            &format!("ws://{addr}"),
1595            None,
1596            3,
1597            Duration::from_secs(0),
1598        )
1599        .unwrap();
1600
1601        let join_handle = client.connect().await.unwrap();
1602        let handle_res = join_handle.await.unwrap();
1603        assert!(handle_res.is_err());
1604        assert!(!client.is_connected().await);
1605
1606        let subscription_res = timeout(
1607            Duration::from_millis(10),
1608            client.subscribe(
1609                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1610                SubscriptionOptions::new(),
1611            ),
1612        )
1613        .await
1614        .unwrap();
1615        assert!(subscription_res.is_err());
1616    }
1617
1618    #[test_log::test(tokio::test)]
1619    async fn test_ws_client_retry_cooldown() {
1620        let start = std::time::Instant::now();
1621        let (addr, _) = mock_bad_connection_tycho_ws(false).await;
1622
1623        // Use the mock server that immediately drops connections
1624        let client = WsDeltasClient::new_with_reconnects(
1625            &format!("ws://{addr}"),
1626            None,
1627            3,                         // 3 attempts total (so 2 retries with cooldowns)
1628            Duration::from_millis(50), // 50ms cooldown
1629        )
1630        .unwrap();
1631
1632        // Try to connect - this should fail after retries but still measure the time
1633        let connect_result = client.connect().await;
1634        let elapsed = start.elapsed();
1635
1636        // Connection should fail after exhausting retries
1637        assert!(connect_result.is_err(), "Expected connection to fail after retries");
1638
1639        // Should have waited at least 100ms total (2 retries × 50ms cooldown each)
1640        assert!(
1641            elapsed >= Duration::from_millis(100),
1642            "Expected at least 100ms elapsed, got {:?}",
1643            elapsed
1644        );
1645
1646        // Should not take too long (max ~300ms for 3 attempts with some tolerance)
1647        assert!(elapsed < Duration::from_millis(500), "Took too long: {:?}", elapsed);
1648    }
1649
1650    #[test_log::test(tokio::test)]
1651    async fn test_buffer_full_triggers_unsubscribe() {
1652        // Expected communication sequence for buffer full scenario
1653        let exp_comm = {
1654            [
1655            // 1. Client subscribes
1656            ExpectedComm::Receive(
1657                100,
1658                tungstenite::protocol::Message::Text(
1659                    subscribe(),
1660                ),
1661            ),
1662            // 2. Server confirms subscription
1663            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1664                subscription_confirmation(),
1665            )),
1666            // 3. Server sends first message (fills buffer)
1667            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1668                r#"
1669                {
1670                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1671                    "deltas": {
1672                        "extractor": "vm:ambient",
1673                        "chain": "ethereum",
1674                        "block": {
1675                            "number": 123,
1676                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1677                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1678                            "chain": "ethereum",
1679                            "ts": "2023-09-14T00:00:00"
1680                        },
1681                        "finalized_block_height": 0,
1682                        "revert": false,
1683                        "new_tokens": {},
1684                        "account_updates": {},
1685                        "state_updates": {},
1686                        "new_protocol_components": {},
1687                        "deleted_protocol_components": {},
1688                        "component_balances": {},
1689                        "account_balances": {},
1690                        "component_tvl": {},
1691                        "dci_update": {
1692                            "new_entrypoints": {},
1693                            "new_entrypoint_params": {},
1694                            "trace_results": {}
1695                        }
1696                    }
1697                }
1698                "#.to_owned()
1699            )),
1700            // 4. Server sends second message (triggers buffer overflow and force unsubscribe)
1701            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1702                r#"
1703                {
1704                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1705                    "deltas": {
1706                        "extractor": "vm:ambient",
1707                        "chain": "ethereum",
1708                        "block": {
1709                            "number": 124,
1710                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
1711                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1712                            "chain": "ethereum",
1713                            "ts": "2023-09-14T00:00:01"
1714                        },
1715                        "finalized_block_height": 0,
1716                        "revert": false,
1717                        "new_tokens": {},
1718                        "account_updates": {},
1719                        "state_updates": {},
1720                        "new_protocol_components": {},
1721                        "deleted_protocol_components": {},
1722                        "component_balances": {},
1723                        "account_balances": {},
1724                        "component_tvl": {},
1725                        "dci_update": {
1726                            "new_entrypoints": {},
1727                            "new_entrypoint_params": {},
1728                            "trace_results": {}
1729                        }
1730                    }
1731                }
1732                "#.to_owned()
1733            )),
1734            // 5. Expect unsubscribe command due to buffer full
1735            ExpectedComm::Receive(
1736                100,
1737                tungstenite::protocol::Message::Text(
1738                    unsubscribe(),
1739                ),
1740            ),
1741            // 6. Server confirms unsubscription
1742            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1743                subscription_ended(),
1744            )),
1745        ]
1746        };
1747
1748        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1749
1750        // Create client with very small buffer size (1) to easily trigger BufferFull
1751        let client = WsDeltasClient::new_with_custom_buffers(
1752            &format!("ws://{addr}"),
1753            None,
1754            128, // ws_buffer_size
1755            1,   // subscription_buffer_size - this will trigger BufferFull easily
1756        )
1757        .unwrap();
1758
1759        let jh = client
1760            .connect()
1761            .await
1762            .expect("connect failed");
1763
1764        let (_sub_id, mut rx) = timeout(
1765            Duration::from_millis(100),
1766            client.subscribe(
1767                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1768                SubscriptionOptions::new().with_compression(false),
1769            ),
1770        )
1771        .await
1772        .expect("subscription timed out")
1773        .expect("subscription failed");
1774
1775        // Allow time for messages to be processed and buffer to fill up
1776        tokio::time::sleep(Duration::from_millis(100)).await;
1777
1778        // Collect all messages until channel closes or we get a reasonable number
1779        let mut received_msgs = Vec::new();
1780
1781        // Use a single longer timeout to collect messages until channel closes
1782        while received_msgs.len() < 3 {
1783            match timeout(Duration::from_millis(200), rx.recv()).await {
1784                Ok(Some(msg)) => {
1785                    received_msgs.push(msg);
1786                }
1787                Ok(None) => {
1788                    // Channel closed - this is what we expect after buffer overflow
1789                    break;
1790                }
1791                Err(_) => {
1792                    // Timeout - no more messages coming
1793                    break;
1794                }
1795            }
1796        }
1797
1798        // Verify the key behavior: buffer overflow should limit messages and close channel
1799        assert!(
1800            received_msgs.len() <= 1,
1801            "Expected buffer overflow to limit messages to at most 1, got {}",
1802            received_msgs.len()
1803        );
1804
1805        if let Some(first_msg) = received_msgs.first() {
1806            assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
1807        }
1808
1809        // Test passed! The key behavior we're testing (buffer full causes force unsubscribe) has
1810        // been verified We don't need to explicitly close the client as it will be cleaned
1811        // up when dropped
1812
1813        // Just wait for the tasks to finish cleanly
1814        drop(rx); // Explicitly drop the receiver
1815        tokio::time::sleep(Duration::from_millis(50)).await;
1816
1817        // Abort the tasks to clean up
1818        jh.abort();
1819        server_thread.abort();
1820
1821        let _ = jh.await;
1822        let _ = server_thread.await;
1823    }
1824
1825    #[tokio::test]
1826    async fn test_server_error_handling() {
1827        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1828
1829        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1830
1831        // Test ExtractorNotFound error
1832        let error_response = WebSocketMessage::Response(Response::Error(
1833            WebsocketError::ExtractorNotFound(extractor_id.clone().into()),
1834        ));
1835        let error_json = serde_json::to_string(&error_response).unwrap();
1836
1837        let exp_comm = [
1838            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1839            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1840        ];
1841
1842        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1843
1844        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1845        let jh = client
1846            .connect()
1847            .await
1848            .expect("connect failed");
1849
1850        let result = timeout(
1851            Duration::from_millis(100),
1852            client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1853        )
1854        .await
1855        .expect("subscription timed out");
1856
1857        // Verify that we get a ServerError
1858        assert!(result.is_err());
1859        if let Err(DeltasError::ServerError(msg)) = result {
1860            assert!(msg.contains("Subscription failed"));
1861            assert!(msg.contains("Extractor not found"));
1862        } else {
1863            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1864        }
1865
1866        timeout(Duration::from_millis(100), client.close())
1867            .await
1868            .expect("close timed out")
1869            .expect("close failed");
1870        jh.await
1871            .expect("ws loop errored")
1872            .unwrap();
1873        server_thread.await.unwrap();
1874    }
1875
1876    #[test_log::test(tokio::test)]
1877    async fn test_subscription_not_found_error() {
1878        // Test scenario: Server restart causes subscription loss
1879        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1880
1881        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1882        let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
1883
1884        let error_response = WebSocketMessage::Response(Response::Error(
1885            WebsocketError::SubscriptionNotFound(subscription_id),
1886        ));
1887        let error_json = serde_json::to_string(&error_response).unwrap();
1888
1889        let exp_comm = [
1890            // 1. Client subscribes successfully
1891            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1892            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
1893            // 2. Client tries to unsubscribe (server has "restarted" and lost subscription)
1894            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
1895            // 3. Server responds with SubscriptionNotFound (simulating server restart)
1896            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1897        ];
1898
1899        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1900
1901        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1902        let jh = client
1903            .connect()
1904            .await
1905            .expect("connect failed");
1906
1907        // Subscribe successfully
1908        let (received_sub_id, _rx) = timeout(
1909            Duration::from_millis(100),
1910            client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1911        )
1912        .await
1913        .expect("subscription timed out")
1914        .expect("subscription failed");
1915
1916        assert_eq!(received_sub_id, subscription_id);
1917
1918        // Now try to unsubscribe - this should fail because server "restarted"
1919        let unsubscribe_result =
1920            timeout(Duration::from_millis(100), client.unsubscribe(subscription_id))
1921                .await
1922                .expect("unsubscribe timed out");
1923
1924        // The unsubscribe should handle the SubscriptionNotFound error gracefully
1925        // In this case, the client should treat it as successful since the subscription
1926        // is effectively gone (whether due to server restart or other reasons)
1927        unsubscribe_result
1928            .expect("Unsubscribe should succeed even if server says subscription not found");
1929
1930        timeout(Duration::from_millis(100), client.close())
1931            .await
1932            .expect("close timed out")
1933            .expect("close failed");
1934        jh.await
1935            .expect("ws loop errored")
1936            .unwrap();
1937        server_thread.await.unwrap();
1938    }
1939
1940    #[test_log::test(tokio::test)]
1941    async fn test_parse_error_handling() {
1942        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1943
1944        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1945        let error_response = WebSocketMessage::Response(Response::Error(
1946            WebsocketError::ParseError("}2sdf".to_string(), "malformed JSON".to_string()),
1947        ));
1948        let error_json = serde_json::to_string(&error_response).unwrap();
1949
1950        let exp_comm = [
1951            // subscribe first so connect can finish successfully
1952            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
1953            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1954        ];
1955
1956        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1957
1958        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1959        let jh = client
1960            .connect()
1961            .await
1962            .expect("connect failed");
1963
1964        // Subscribe successfully
1965        let _ = timeout(
1966            Duration::from_millis(100),
1967            client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
1968        )
1969        .await
1970        .expect("subscription timed out");
1971
1972        // The client should receive the parse error and close the connection
1973        let result = jh
1974            .await
1975            .expect("ws loop should complete");
1976        assert!(result.is_err());
1977        if let Err(DeltasError::ServerError(message)) = result {
1978            assert!(message.contains("Server failed to parse client message"));
1979        } else {
1980            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1981        }
1982
1983        server_thread.await.unwrap();
1984    }
1985
1986    #[test_log::test(tokio::test)]
1987    async fn test_compression_error_handling() {
1988        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1989
1990        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
1991        let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
1992        let error_response = WebSocketMessage::Response(Response::Error(
1993            WebsocketError::CompressionError(subscription_id, "Compression failed".to_string()),
1994        ));
1995        let error_json = serde_json::to_string(&error_response).unwrap();
1996
1997        let exp_comm = [
1998            // subscribe first so connect can finish successfully
1999            ExpectedComm::Receive(
2000                100,
2001                tungstenite::protocol::Message::Text(subscribe_with_compression(true)),
2002            ),
2003            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2004        ];
2005
2006        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2007
2008        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2009        let jh = client
2010            .connect()
2011            .await
2012            .expect("connect failed");
2013
2014        // Subscribe successfully with compression disabled
2015        let _ = timeout(
2016            Duration::from_millis(100),
2017            client.subscribe(extractor_id, SubscriptionOptions::new()),
2018        )
2019        .await
2020        .expect("subscription timed out");
2021
2022        // The client should receive the parse error
2023        let result = jh
2024            .await
2025            .expect("ws loop should complete");
2026        assert!(result.is_err());
2027        if let Err(DeltasError::ServerError(message)) = result {
2028            assert!(message.contains("Server failed to compress message for subscription"));
2029        } else {
2030            panic!("Expected DeltasError::ServerError, got: {:?}", result);
2031        }
2032
2033        server_thread.await.unwrap();
2034    }
2035
2036    #[tokio::test]
2037    async fn test_subscribe_error_handling() {
2038        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
2039
2040        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
2041
2042        let error_response = WebSocketMessage::Response(Response::Error(
2043            WebsocketError::SubscribeError(extractor_id.clone().into()),
2044        ));
2045        let error_json = serde_json::to_string(&error_response).unwrap();
2046
2047        let exp_comm = [
2048            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
2049            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2050        ];
2051
2052        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2053
2054        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2055        let jh = client
2056            .connect()
2057            .await
2058            .expect("connect failed");
2059
2060        let result = timeout(
2061            Duration::from_millis(100),
2062            client.subscribe(extractor_id, SubscriptionOptions::new().with_compression(false)),
2063        )
2064        .await
2065        .expect("subscription timed out");
2066
2067        // Verify that we get a ServerError for subscribe failure
2068        assert!(result.is_err());
2069        if let Err(DeltasError::ServerError(msg)) = result {
2070            assert!(msg.contains("Subscription failed"));
2071            assert!(msg.contains("Failed to subscribe to extractor"));
2072        } else {
2073            panic!("Expected DeltasError::ServerError, got: {:?}", result);
2074        }
2075
2076        timeout(Duration::from_millis(100), client.close())
2077            .await
2078            .expect("close timed out")
2079            .expect("close failed");
2080        jh.await
2081            .expect("ws loop errored")
2082            .unwrap();
2083        server_thread.await.unwrap();
2084    }
2085
2086    #[tokio::test]
2087    async fn test_cancel_pending_subscription() {
2088        // This test verifies that pending subscriptions are properly cancelled when errors occur
2089        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
2090
2091        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "vm:ambient");
2092
2093        let error_response = WebSocketMessage::Response(Response::Error(
2094            WebsocketError::ExtractorNotFound(extractor_id.clone().into()),
2095        ));
2096        let error_json = serde_json::to_string(&error_response).unwrap();
2097
2098        let exp_comm = [
2099            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
2100            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2101        ];
2102
2103        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2104
2105        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2106        let jh = client
2107            .connect()
2108            .await
2109            .expect("connect failed");
2110
2111        // Start two subscription attempts simultaneously
2112        let client_clone = client.clone();
2113        let extractor_id_clone = extractor_id.clone();
2114
2115        let subscription1 = tokio::spawn({
2116            let client_for_spawn = client.clone();
2117            async move {
2118                client_for_spawn
2119                    .subscribe(extractor_id, SubscriptionOptions::new().with_compression(false))
2120                    .await
2121            }
2122        });
2123
2124        let subscription2 = tokio::spawn(async move {
2125            // This should fail because there's already a pending subscription
2126            client_clone
2127                .subscribe(extractor_id_clone, SubscriptionOptions::new())
2128                .await
2129        });
2130
2131        let (result1, result2) = tokio::join!(subscription1, subscription2);
2132
2133        let result1 = result1.unwrap();
2134        let result2 = result2.unwrap();
2135
2136        // One should fail due to ExtractorNotFound error from server
2137        // The other should fail due to SubscriptionAlreadyPending
2138        assert!(result1.is_err() || result2.is_err());
2139
2140        if let Err(DeltasError::SubscriptionAlreadyPending) = result2 {
2141            // This is expected for the second subscription
2142        } else if let Err(DeltasError::ServerError(_)) = result1 {
2143            // This is expected for the first subscription that gets the server error
2144        } else {
2145            panic!("Expected one SubscriptionAlreadyPending and one ServerError");
2146        }
2147
2148        timeout(Duration::from_millis(100), client.close())
2149            .await
2150            .expect("close timed out")
2151            .expect("close failed");
2152        jh.await
2153            .expect("ws loop errored")
2154            .unwrap();
2155        server_thread.await.unwrap();
2156    }
2157
2158    #[tokio::test]
2159    async fn test_force_unsubscribe_prevents_multiple_calls() {
2160        // Test that force_unsubscribe prevents sending duplicate unsubscribe commands
2161        // when called multiple times for the same subscription_id
2162
2163        let subscription_id = Uuid::from_str(SUBSCRIPTION_ID).unwrap();
2164
2165        let exp_comm = [
2166            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(subscribe())),
2167            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_confirmation())),
2168            // Expect only ONE unsubscribe message, even though force_unsubscribe is called twice
2169            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(unsubscribe())),
2170            ExpectedComm::Send(tungstenite::protocol::Message::Text(subscription_ended())),
2171        ];
2172
2173        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2174
2175        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2176        let jh = client
2177            .connect()
2178            .await
2179            .expect("connect failed");
2180
2181        let (received_sub_id, _rx) = timeout(
2182            Duration::from_millis(100),
2183            client.subscribe(
2184                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
2185                SubscriptionOptions::new().with_compression(false),
2186            ),
2187        )
2188        .await
2189        .expect("subscription timed out")
2190        .expect("subscription failed");
2191
2192        assert_eq!(received_sub_id, subscription_id);
2193
2194        // Access the inner state to call force_unsubscribe directly
2195        {
2196            let mut inner_guard = client.inner.lock().await;
2197            let inner = inner_guard
2198                .as_mut()
2199                .expect("client should be connected");
2200
2201            // Call force_unsubscribe twice - only the first should send an unsubscribe message
2202            WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2203            WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2204        }
2205
2206        // Give time for messages to be processed
2207        tokio::time::sleep(Duration::from_millis(50)).await;
2208
2209        // Close may fail if client disconnected after unsubscribe, which is fine
2210        let _ = timeout(Duration::from_millis(100), client.close()).await;
2211
2212        // Wait for tasks to complete
2213        let _ = jh.await;
2214        let _ = server_thread.await;
2215    }
2216}