tycho_client/
deltas.rs

1//! # Deltas Client
2//!
3//! This module focuses on implementing the Real-Time Deltas client for the Tycho Indexer service.
4//! Utilizing this client facilitates efficient, instant communication with the indexing service,
5//! promoting seamless data synchronization.
6//!
7//! ## Websocket Implementation
8//!
9//! The present WebSocket implementation is cloneable, which enables it to be shared
10//! across multiple asynchronous tasks without creating separate instances for each task. This
11//! unique feature boosts efficiency as it:
12//!
13//! - **Reduces Server Load:** By maintaining a single universal client, the load on the server is
14//!   significantly reduced. This is because fewer connections are made to the server, preventing it
15//!   from getting overwhelmed by numerous simultaneous requests.
16//! - **Conserves Resource Usage:** A single shared client requires fewer system resources than if
17//!   multiple clients were instantiated and used separately as there is some overhead for websocket
18//!   handshakes and message.
19//!
20//! Therefore, sharing one client among multiple tasks ensures optimal performance, reduces resource
21//! consumption, and enhances overall software scalability.
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::{
25        atomic::{AtomicBool, Ordering},
26        Arc,
27    },
28    time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34    header::{
35        AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36        USER_AGENT,
37    },
38    Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44    net::TcpStream,
45    sync::{
46        mpsc::{self, error::TrySendError, Receiver, Sender},
47        oneshot, Mutex, MutexGuard, Notify,
48    },
49    task::JoinHandle,
50    time::sleep,
51};
52use tokio_tungstenite::{
53    connect_async,
54    tungstenite::{
55        self,
56        handshake::client::{generate_key, Request},
57    },
58    MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{
62    BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage, WebsocketError,
63};
64use uuid::Uuid;
65use zstd;
66
67use crate::TYCHO_SERVER_VERSION;
68
69#[derive(Error, Debug)]
70pub enum DeltasError {
71    /// Failed to parse the provided URI.
72    #[error("Failed to parse URI: {0}. Error: {1}")]
73    UriParsing(String, String),
74
75    /// The requested subscription is already pending and is awaiting confirmation from the server.
76    #[error("The requested subscription is already pending")]
77    SubscriptionAlreadyPending,
78
79    #[error("The server replied with an error: {0}")]
80    ServerError(String, #[source] WebsocketError),
81
82    /// A message failed to send via an internal channel or through the websocket channel.
83    /// This is typically a fatal error and might indicate a bug in the implementation.
84    #[error("{0}")]
85    TransportError(String),
86
87    /// The internal message buffer is full. This likely means that messages are not being consumed
88    /// fast enough. If the incoming load emits messages in bursts, consider increasing the buffer
89    /// size.
90    #[error("The buffer is full!")]
91    BufferFull,
92
93    /// The client has no active connections but was accessed (e.g., by calling subscribe).
94    /// This typically occurs when trying to use the client before calling connect() or
95    /// after the connection has been closed.
96    #[error("The client is not connected!")]
97    NotConnected,
98
99    /// The connect method was called while the client already had an active connection.
100    #[error("The client is already connected!")]
101    AlreadyConnected,
102
103    /// The connection was closed orderly by the server, e.g. because it restarted.
104    #[error("The server closed the connection!")]
105    ConnectionClosed,
106
107    /// The connection was closed unexpectedly by the server or encountered a network error.
108    #[error("Connection error: {0}")]
109    ConnectionError(#[from] Box<tungstenite::Error>),
110
111    /// A fatal error occurred that cannot be recovered from.
112    #[error("Tycho FatalError: {0}")]
113    Fatal(String),
114}
115
116#[derive(Clone, Debug)]
117pub struct SubscriptionOptions {
118    include_state: bool,
119    compression: bool,
120}
121
122impl Default for SubscriptionOptions {
123    fn default() -> Self {
124        Self { include_state: true, compression: true }
125    }
126}
127
128impl SubscriptionOptions {
129    pub fn new() -> Self {
130        Self::default()
131    }
132    pub fn with_state(mut self, val: bool) -> Self {
133        self.include_state = val;
134        self
135    }
136    pub fn with_compression(mut self, val: bool) -> Self {
137        self.compression = val;
138        self
139    }
140}
141
142#[cfg_attr(test, automock)]
143#[async_trait]
144pub trait DeltasClient {
145    /// Subscribe to an extractor and receive realtime messages
146    ///
147    /// Will request a subscription from tycho and wait for confirmation of it. If the caller
148    /// cancels while waiting for confirmation the subscription may still be registered. If the
149    /// receiver was deallocated though, the first message from the subscription will remove it
150    /// again - since there is no one to inform about these messages.
151    async fn subscribe(
152        &self,
153        extractor_id: ExtractorIdentity,
154        options: SubscriptionOptions,
155    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;
156
157    /// Unsubscribe from an subscription
158    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
159
160    /// Start the clients message handling loop.
161    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
162
163    /// Close the clients message handling loop.
164    async fn close(&self) -> Result<(), DeltasError>;
165}
166
167#[derive(Clone)]
168pub struct WsDeltasClient {
169    /// The tycho indexer websocket uri.
170    uri: Uri,
171    /// Authorization key for the websocket connection.
172    auth_key: Option<String>,
173    /// Maximum amount of reconnects to try before giving up.
174    max_reconnects: u64,
175    /// Duration to wait before attempting to reconnect
176    retry_cooldown: Duration,
177    /// The client will buffer this many messages incoming from the websocket
178    /// before starting to drop them.
179    ws_buffer_size: usize,
180    /// The client will buffer that many messages for each subscription before it starts dropping
181    /// them.
182    subscription_buffer_size: usize,
183    /// Notify tasks waiting for a connection to be established.
184    conn_notify: Arc<Notify>,
185    /// Shared client instance state.
186    inner: Arc<Mutex<Option<Inner>>>,
187    /// If set the client has exhausted its reconnection attempts
188    dead: Arc<AtomicBool>,
189}
190
191type WebSocketSink =
192    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
193
194/// Subscription State
195///
196/// Subscription go through a lifecycle:
197///
198/// ```text
199/// O ---> requested subscribe ----> active ----> requested unsub ---> ended
200/// ```
201///
202/// We use oneshot channels to inform the client struct about when these transition happened. E.g.
203/// because for `subscribe`` to finish, we want the state to have transition to `active` and similar
204/// for `unsubscribe`.
205#[derive(Debug)]
206enum SubscriptionInfo {
207    /// Subscription was requested we wait for server confirmation and uuid assignment.
208    RequestedSubscription(oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>),
209    /// Subscription is active.
210    Active,
211    /// Unsubscription was requested, we wait for server confirmation.
212    RequestedUnsubscription(oneshot::Sender<()>),
213}
214
215/// Internal struct containing shared state between of WsDeltaClient instances.
216struct Inner {
217    /// Websocket sender handle.
218    sink: WebSocketSink,
219    /// Command channel sender handle.
220    cmd_tx: Sender<()>,
221    /// Currently pending subscriptions.
222    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
223    /// Active subscriptions.
224    subscriptions: HashMap<Uuid, SubscriptionInfo>,
225    /// For eachs subscription we keep a sender handle, the receiver is returned to the caller of
226    /// subscribe.
227    sender: HashMap<Uuid, Sender<BlockChanges>>,
228    /// How many messages to buffer per subscription before starting to drop new messages.
229    buffer_size: usize,
230}
231
232/// Shared state between all client instances.
233///
234/// This state is behind a mutex and requires synchronization to be read of modified.
235impl Inner {
236    fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
237        Self {
238            sink,
239            cmd_tx,
240            pending: HashMap::new(),
241            subscriptions: HashMap::new(),
242            sender: HashMap::new(),
243            buffer_size,
244        }
245    }
246
247    /// Registers a new pending subscription.
248    #[allow(clippy::result_large_err)]
249    fn new_subscription(
250        &mut self,
251        id: &ExtractorIdentity,
252        ready_tx: oneshot::Sender<Result<(Uuid, Receiver<BlockChanges>), DeltasError>>,
253    ) -> Result<(), DeltasError> {
254        if self.pending.contains_key(id) {
255            return Err(DeltasError::SubscriptionAlreadyPending);
256        }
257        self.pending
258            .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
259        Ok(())
260    }
261
262    /// Transitions a pending subscription to active.
263    ///
264    /// Will ignore any request to do so for subscriptions that are not pending.
265    fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
266        if let Some(info) = self.pending.remove(extractor_id) {
267            if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
268                let (tx, rx) = mpsc::channel(self.buffer_size);
269                self.sender.insert(subscription_id, tx);
270                self.subscriptions
271                    .insert(subscription_id, SubscriptionInfo::Active);
272                let _ = ready_tx
273                    .send(Ok((subscription_id, rx)))
274                    .map_err(|_| {
275                        warn!(
276                            ?extractor_id,
277                            ?subscription_id,
278                            "Subscriber for has gone away. Ignoring."
279                        )
280                    });
281            } else {
282                error!(
283                    ?extractor_id,
284                    ?subscription_id,
285                    "Pending subscription was not in the correct state to 
286                    transition to active. Ignoring!"
287                )
288            }
289        } else {
290            error!(
291                ?extractor_id,
292                ?subscription_id,
293                "Tried to mark an unknown subscription as active. Ignoring!"
294            );
295        }
296    }
297
298    /// Sends a message to a subscription's receiver.
299    #[allow(clippy::result_large_err)]
300    fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
301        if let Some(sender) = self.sender.get_mut(id) {
302            sender
303                .try_send(msg)
304                .map_err(|e| match e {
305                    TrySendError::Full(_) => DeltasError::BufferFull,
306                    TrySendError::Closed(_) => {
307                        DeltasError::TransportError("The subscriber has gone away".to_string())
308                    }
309                })?;
310        }
311        Ok(())
312    }
313
314    /// Requests a subscription to end.
315    ///
316    /// The subscription needs to exist and be active for this to have any effect. Wll use
317    /// `ready_tx` to notify the receiver once the transition to ended completed.
318    fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
319        if let Some(info) = self
320            .subscriptions
321            .get_mut(subscription_id)
322        {
323            if let SubscriptionInfo::Active = info {
324                *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
325            }
326        } else {
327            // no big deal imo so only debug lvl...
328            debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
329        }
330    }
331
332    /// Removes and fully ends a subscription
333    ///
334    /// Any calls for non-existing subscriptions will be simply ignored. May panic on internal state
335    /// inconsistencies: e.g. if the subscription exists but there is no sender for it.
336    /// Will remove a subscription even it was in active or pending state before, this is to support
337    /// any server side failure of the subscription.
338    fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
339        if let Entry::Occupied(e) = self
340            .subscriptions
341            .entry(subscription_id)
342        {
343            let info = e.remove();
344            if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
345                let _ = tx.send(()).map_err(|_| {
346                    debug!(?subscription_id, "failed to notify about removed subscription")
347                });
348                self.sender
349                    .remove(&subscription_id)
350                    .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
351            } else {
352                warn!(?subscription_id, "Subscription ended unexpectedly!");
353                self.sender
354                    .remove(&subscription_id)
355                    .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
356            }
357        } else {
358            // TODO: There is a race condition that can trigger multiple unsubscribes
359            //  if server doesn't respond quickly enough leading to some ugly logs but
360            //  doesn't affect behaviour negatively. E.g. BufferFull and multiple
361            //  messages from the ws connection are queued.
362            trace!(
363                ?subscription_id,
364                "Received `SubscriptionEnded`, but was never subscribed to it. This is likely a bug!"
365            );
366        }
367
368        Ok(())
369    }
370
371    fn cancel_pending(&mut self, extractor_id: &ExtractorIdentity, error: &WebsocketError) {
372        if let Some(sub_info) = self.pending.remove(extractor_id) {
373            match sub_info {
374                SubscriptionInfo::RequestedSubscription(tx) => {
375                    let _ = tx
376                        .send(Err(DeltasError::ServerError(
377                            format!("Subscription failed: {error}"),
378                            error.clone(),
379                        )))
380                        .map_err(|_| debug!("Cancel pending failed: receiver deallocated!"));
381                }
382                _ => {
383                    error!(?extractor_id, "Pending subscription in wrong state")
384                }
385            }
386        } else {
387            debug!(?extractor_id, "Tried cancel on non-existent pending subscription!")
388        }
389    }
390
391    /// Sends a message through the websocket.
392    async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
393        self.sink.send(msg).await.map_err(|e| {
394            DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
395        })
396    }
397}
398
399/// Tycho client websocket implementation.
400impl WsDeltasClient {
401    // Construct a new client with 5 reconnection attempts.
402    #[allow(clippy::result_large_err)]
403    pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
404        let uri = ws_uri
405            .parse::<Uri>()
406            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
407        Ok(Self {
408            uri,
409            auth_key: auth_key.map(|s| s.to_string()),
410            inner: Arc::new(Mutex::new(None)),
411            ws_buffer_size: 128,
412            subscription_buffer_size: 128,
413            conn_notify: Arc::new(Notify::new()),
414            max_reconnects: 5,
415            retry_cooldown: Duration::from_millis(500),
416            dead: Arc::new(AtomicBool::new(false)),
417        })
418    }
419
420    // Construct a new client with a custom number of reconnection attempts.
421    #[allow(clippy::result_large_err)]
422    pub fn new_with_reconnects(
423        ws_uri: &str,
424        auth_key: Option<&str>,
425        max_reconnects: u64,
426        retry_cooldown: Duration,
427    ) -> Result<Self, DeltasError> {
428        let uri = ws_uri
429            .parse::<Uri>()
430            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
431
432        Ok(Self {
433            uri,
434            auth_key: auth_key.map(|s| s.to_string()),
435            inner: Arc::new(Mutex::new(None)),
436            ws_buffer_size: 128,
437            subscription_buffer_size: 128,
438            conn_notify: Arc::new(Notify::new()),
439            max_reconnects,
440            retry_cooldown,
441            dead: Arc::new(AtomicBool::new(false)),
442        })
443    }
444
445    // Construct a new client with custom buffer sizes (for testing)
446    #[cfg(test)]
447    #[allow(clippy::result_large_err)]
448    pub fn new_with_custom_buffers(
449        ws_uri: &str,
450        auth_key: Option<&str>,
451        ws_buffer_size: usize,
452        subscription_buffer_size: usize,
453    ) -> Result<Self, DeltasError> {
454        let uri = ws_uri
455            .parse::<Uri>()
456            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
457        Ok(Self {
458            uri,
459            auth_key: auth_key.map(|s| s.to_string()),
460            inner: Arc::new(Mutex::new(None)),
461            ws_buffer_size,
462            subscription_buffer_size,
463            conn_notify: Arc::new(Notify::new()),
464            max_reconnects: 5,
465            retry_cooldown: Duration::from_millis(0),
466            dead: Arc::new(AtomicBool::new(false)),
467        })
468    }
469
470    /// Ensures that the client is connected.
471    ///
472    /// This method will acquire the lock for inner.
473    async fn is_connected(&self) -> bool {
474        let guard = self.inner.as_ref().lock().await;
475        guard.is_some()
476    }
477
478    /// Waits for the client to be connected
479    ///
480    /// This method acquires the lock for inner for a short period, then waits until the  
481    /// connection is established if not already connected.
482    async fn ensure_connection(&self) -> Result<(), DeltasError> {
483        if self.dead.load(Ordering::SeqCst) {
484            return Err(DeltasError::NotConnected)
485        };
486        if !self.is_connected().await {
487            self.conn_notify.notified().await;
488        };
489        Ok(())
490    }
491
492    /// Main message handling logic
493    ///
494    /// If the message returns an error, a reconnect attempt may be considered depending on the
495    /// error type.
496    #[instrument(skip(self, msg))]
497    async fn handle_msg(
498        &self,
499        msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
500    ) -> Result<(), DeltasError> {
501        let mut guard = self.inner.lock().await;
502
503        match msg {
504            // We do not deserialize the message directly into a WebSocketMessage. This is because
505            // the serde arbitrary_precision feature (often included in many
506            // dependencies we use) breaks some untagged enum deserializations. Instead,
507            // we deserialize the message into a serde_json::Value and convert that into a WebSocketMessage. For more info on this issue, see: https://github.com/serde-rs/json/issues/740
508            Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
509                serde_json::Value,
510            >(&text)
511            {
512                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
513                    Ok(ws_message) => match ws_message {
514                        WebSocketMessage::BlockChanges { subscription_id, deltas } => {
515                            Self::handle_block_changes_msg(&mut guard, subscription_id, deltas).await?;
516                        }
517                        WebSocketMessage::Response(Response::NewSubscription {
518                            extractor_id,
519                            subscription_id,
520                        }) => {
521                            info!(?extractor_id, ?subscription_id, "Received a new subscription");
522                            let inner = guard
523                                .as_mut()
524                                .ok_or_else(|| DeltasError::NotConnected)?;
525                            inner.mark_active(&extractor_id, subscription_id);
526                        }
527                        WebSocketMessage::Response(Response::SubscriptionEnded {
528                            subscription_id,
529                        }) => {
530                            info!(?subscription_id, "Received a subscription ended");
531                            let inner = guard
532                                .as_mut()
533                                .ok_or_else(|| DeltasError::NotConnected)?;
534                            inner.remove_subscription(subscription_id)?;
535                        }
536                        WebSocketMessage::Response(Response::Error(error)) => match &error {
537                            WebsocketError::ExtractorNotFound(extractor_id) => {
538                                let inner = guard
539                                    .as_mut()
540                                    .ok_or_else(|| DeltasError::NotConnected)?;
541                                inner.cancel_pending(extractor_id, &error);
542                            }
543                            WebsocketError::SubscriptionNotFound(subscription_id) => {
544                                debug!("Received subscription not found, removing subscription");
545                                let inner = guard
546                                    .as_mut()
547                                    .ok_or_else(|| DeltasError::NotConnected)?;
548                                inner.remove_subscription(*subscription_id)?;
549                            }
550                            WebsocketError::ParseError(raw, e) => {
551                                return Err(DeltasError::ServerError(
552                                    format!(
553                                        "Server failed to parse client message: {e}, msg: {raw}"
554                                    ),
555                                    error.clone(),
556                                ))
557                            }
558                            WebsocketError::CompressionError(subscription_id, e) => {
559                                return Err(DeltasError::ServerError(
560                                    format!(
561                                        "Server failed to compress message for subscription: {subscription_id}, error: {e}"
562                                    ),
563                                    error.clone(),
564                                ))
565                            }
566                            WebsocketError::SubscribeError(extractor_id) => {
567                                let inner = guard
568                                    .as_mut()
569                                    .ok_or_else(|| DeltasError::NotConnected)?;
570                                inner.cancel_pending(extractor_id, &error);
571                            }
572                        },
573                    },
574                    Err(e) => {
575                        error!(
576                            "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
577                            e, text
578                        );
579                    }
580                },
581                Err(e) => {
582                    error!(
583                        "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
584                        e, text
585                    );
586                }
587            },
588            Ok(tungstenite::protocol::Message::Binary(data)) => {
589                // Decompress the zstd-compressed data,
590                // Note that we only support compressed BlockChanges messages for now.
591                match zstd::decode_all(data.as_slice()) {
592                    Ok(decompressed) => match serde_json::from_slice::<serde_json::Value>(decompressed.as_slice()) {
593                                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value.clone()) {
594                                    Ok(ws_message) => match ws_message {
595                                        WebSocketMessage::BlockChanges { subscription_id, deltas } => {
596                                            Self::handle_block_changes_msg(&mut guard, subscription_id, deltas).await?;
597                                        }
598                                        _ => {
599                                            error!(
600                                                "Received unsupported compressed WebSocketMessage variant. \nMessage: {ws_message:?}",
601                                            );
602                                        }
603
604                                    },
605                                    Err(e) => {
606                                        error!(
607                                            "Failed to deserialize compressed WebSocketMessage: {e}. \nMessage: {value:?}",
608                                        );
609                                    }
610                                },
611                                Err(e) => {
612                                    error!(
613                                        "Failed to deserialize compressed message: invalid JSON. {e}",
614                                    );
615                                }
616                            },
617                    Err(e) => {
618                        error!("Failed to decompress zstd data: {}", e);
619                    }
620                }
621            },
622            Ok(tungstenite::protocol::Message::Ping(_)) => {
623                // Respond to pings with pongs.
624                let inner = guard
625                    .as_mut()
626                    .ok_or_else(|| DeltasError::NotConnected)?;
627                if let Err(error) = inner
628                    .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
629                    .await
630                {
631                    debug!(?error, "Failed to send pong!");
632                }
633            }
634            Ok(tungstenite::protocol::Message::Pong(_)) => {
635                // Do nothing.
636            }
637            Ok(tungstenite::protocol::Message::Close(_)) => {
638                return Err(DeltasError::ConnectionClosed);
639            }
640            Ok(unknown_msg) => {
641                info!("Received an unknown message type: {:?}", unknown_msg);
642            }
643            Err(error) => {
644                error!(?error, "Websocket error");
645                return Err(match error {
646                    tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
647                    tungstenite::Error::AlreadyClosed => {
648                        warn!("Received AlreadyClosed error which is indicative of a bug!");
649                        DeltasError::ConnectionError(Box::new(error))
650                    }
651                    tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
652                        DeltasError::ConnectionError(Box::new(error))
653                    }
654                    _ => DeltasError::Fatal(error.to_string()),
655                });
656            }
657        };
658        Ok(())
659    }
660
661    async fn handle_block_changes_msg(
662        guard: &mut MutexGuard<'_, Option<Inner>>,
663        subscription_id: Uuid,
664        deltas: BlockChanges,
665    ) -> Result<(), DeltasError> {
666        trace!(?deltas, "Received a block state change, sending to channel");
667        let inner = guard
668            .as_mut()
669            .ok_or_else(|| DeltasError::NotConnected)?;
670        match inner.send(&subscription_id, deltas) {
671            Err(DeltasError::BufferFull) => {
672                error!(?subscription_id, "Buffer full, unsubscribing!");
673                Self::force_unsubscribe(subscription_id, inner).await;
674            }
675            Err(_) => {
676                warn!(?subscription_id, "Receiver for has gone away, unsubscribing!");
677                Self::force_unsubscribe(subscription_id, inner).await;
678            }
679            _ => { /* Do nothing */ }
680        }
681        Ok(())
682    }
683
684    /// Forcefully ends a (client) stream by unsubscribing.
685    ///
686    /// Is used only if the message can't be processed due to an error that might resolve
687    /// itself by resubscribing.
688    async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
689        // avoid unsubscribing multiple times
690        if let Some(SubscriptionInfo::RequestedUnsubscription(_)) = inner
691            .subscriptions
692            .get(&subscription_id)
693        {
694            return
695        }
696
697        let (tx, rx) = oneshot::channel();
698        if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
699            warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
700        } else {
701            // Wait for unsubscribe completion with timeout
702            match tokio::time::timeout(Duration::from_secs(5), rx).await {
703                Ok(_) => {
704                    debug!(?subscription_id, "Unsubscribe completed successfully");
705                }
706                Err(_) => {
707                    warn!(?subscription_id, "Unsubscribe completion timed out");
708                }
709            }
710        }
711    }
712
713    /// Helper method to force an unsubscription
714    ///
715    /// This method expects to receive a mutable reference to `Inner` so it does not acquire a
716    /// lock. Used for normal unsubscribes as well to remove any subscriptions with deallocated
717    /// receivers.
718    async fn unsubscribe_inner(
719        inner: &mut Inner,
720        subscription_id: Uuid,
721        ready_tx: oneshot::Sender<()>,
722    ) -> Result<(), DeltasError> {
723        debug!(?subscription_id, "Unsubscribing");
724        inner.end_subscription(&subscription_id, ready_tx);
725        let cmd = Command::Unsubscribe { subscription_id };
726        inner
727            .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
728                |e| {
729                    DeltasError::TransportError(format!(
730                        "Failed to serialize unsubscribe command: {e}"
731                    ))
732                },
733            )?))
734            .await?;
735        Ok(())
736    }
737}
738
739#[async_trait]
740impl DeltasClient for WsDeltasClient {
741    #[instrument(skip(self))]
742    async fn subscribe(
743        &self,
744        extractor_id: ExtractorIdentity,
745        options: SubscriptionOptions,
746    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
747        trace!("Starting subscribe");
748        self.ensure_connection().await?;
749        let (ready_tx, ready_rx) = oneshot::channel();
750        {
751            let mut guard = self.inner.lock().await;
752            let inner = guard
753                .as_mut()
754                .ok_or_else(|| DeltasError::NotConnected)?;
755            trace!("Sending subscribe command");
756            inner.new_subscription(&extractor_id, ready_tx)?;
757            let cmd = Command::Subscribe {
758                extractor_id,
759                include_state: options.include_state,
760                compression: options.compression,
761            };
762            inner
763                .ws_send(tungstenite::protocol::Message::Text(
764                    serde_json::to_string(&cmd).map_err(|e| {
765                        DeltasError::TransportError(format!(
766                            "Failed to serialize subscribe command: {e}"
767                        ))
768                    })?,
769                ))
770                .await?;
771        }
772        trace!("Waiting for subscription response");
773        let res = ready_rx.await.map_err(|_| {
774            DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
775        })??;
776        trace!("Subscription successful");
777        Ok(res)
778    }
779
780    #[instrument(skip(self))]
781    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
782        self.ensure_connection().await?;
783        let (ready_tx, ready_rx) = oneshot::channel();
784        {
785            let mut guard = self.inner.lock().await;
786            let inner = guard
787                .as_mut()
788                .ok_or_else(|| DeltasError::NotConnected)?;
789
790            WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
791        }
792        ready_rx.await.map_err(|_| {
793            DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
794        })?;
795
796        Ok(())
797    }
798
799    #[instrument(skip(self))]
800    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
801        if self.is_connected().await {
802            return Err(DeltasError::AlreadyConnected);
803        }
804        let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
805        info!(?ws_uri, "Starting TychoWebsocketClient");
806
807        let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
808        {
809            let mut guard = self.inner.as_ref().lock().await;
810            *guard = None;
811        }
812        let this = self.clone();
813        let jh = tokio::spawn(async move {
814            let mut retry_count = 0;
815            let mut result = Err(DeltasError::NotConnected);
816
817            'retry: while retry_count < this.max_reconnects {
818                info!(?ws_uri, retry_count, "Connecting to WebSocket server");
819                if retry_count > 0 {
820                    sleep(this.retry_cooldown).await;
821                }
822
823                // Create a WebSocket request
824                let mut request_builder = Request::builder()
825                    .uri(&ws_uri)
826                    .header(SEC_WEBSOCKET_KEY, generate_key())
827                    .header(SEC_WEBSOCKET_VERSION, 13)
828                    .header(CONNECTION, "Upgrade")
829                    .header(UPGRADE, "websocket")
830                    .header(
831                        HOST,
832                        this.uri.host().ok_or_else(|| {
833                            DeltasError::UriParsing(
834                                ws_uri.clone(),
835                                "No host found in tycho url".to_string(),
836                            )
837                        })?,
838                    )
839                    .header(
840                        USER_AGENT,
841                        format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
842                    );
843
844                // Add Authorization if one is given
845                if let Some(ref key) = this.auth_key {
846                    request_builder = request_builder.header(AUTHORIZATION, key);
847                }
848
849                let request = request_builder.body(()).map_err(|e| {
850                    DeltasError::TransportError(format!("Failed to build connection request: {e}"))
851                })?;
852                let (conn, _) = match connect_async(request).await {
853                    Ok(conn) => conn,
854                    Err(e) => {
855                        // Prepare for reconnection
856                        retry_count += 1;
857                        let mut guard = this.inner.as_ref().lock().await;
858                        *guard = None;
859
860                        warn!(
861                            e = e.to_string(),
862                            "Failed to connect to WebSocket server; Reconnecting"
863                        );
864                        continue 'retry;
865                    }
866                };
867
868                let (ws_tx_new, ws_rx_new) = conn.split();
869                {
870                    let mut guard = this.inner.as_ref().lock().await;
871                    *guard =
872                        Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
873                }
874                let mut msg_rx = ws_rx_new.boxed();
875
876                info!("Connection Successful: TychoWebsocketClient started");
877                this.conn_notify.notify_waiters();
878                result = Ok(());
879
880                loop {
881                    let res = tokio::select! {
882                        msg = msg_rx.next() => match msg {
883                            Some(msg) => this.handle_msg(msg).await,
884                            None => {
885                                // This code should not be reachable since the stream
886                                // should return ConnectionClosed in the case above
887                                // before it returns None here.
888                                warn!("Websocket connection silently closed, giving up!");
889                                break 'retry
890                            }
891                        },
892                        _ = cmd_rx.recv() => {break 'retry},
893                    };
894                    if let Err(error) = res {
895                        debug!(?error, "WsError");
896                        if matches!(
897                            error,
898                            DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
899                        ) {
900                            // Prepare for reconnection
901                            retry_count += 1;
902                            let mut guard = this.inner.as_ref().lock().await;
903                            *guard = None;
904
905                            warn!(
906                                ?error,
907                                ?retry_count,
908                                "Connection dropped unexpectedly; Reconnecting..."
909                            );
910                            break;
911                        } else {
912                            // Other errors are considered fatal
913                            error!(?error, "Fatal error; Exiting");
914                            result = Err(error);
915                            break 'retry;
916                        }
917                    }
918                }
919            }
920            debug!(
921                retry_count,
922                max_reconnects=?this.max_reconnects,
923                "Reconnection loop ended"
924            );
925            // Clean up before exiting
926            let mut guard = this.inner.as_ref().lock().await;
927            *guard = None;
928
929            // Check if max retries has been reached.
930            if retry_count >= this.max_reconnects {
931                error!("Max reconnection attempts reached; Exiting");
932                this.dead.store(true, Ordering::SeqCst);
933                this.conn_notify.notify_waiters(); // Notify that the task is done
934                result = Err(DeltasError::ConnectionClosed);
935            }
936
937            result
938        });
939
940        self.conn_notify.notified().await;
941
942        if self.is_connected().await {
943            Ok(jh)
944        } else {
945            Err(DeltasError::NotConnected)
946        }
947    }
948
949    #[instrument(skip(self))]
950    async fn close(&self) -> Result<(), DeltasError> {
951        info!("Closing TychoWebsocketClient");
952        let mut guard = self.inner.lock().await;
953        let inner = guard
954            .as_mut()
955            .ok_or_else(|| DeltasError::NotConnected)?;
956        inner
957            .cmd_tx
958            .send(())
959            .await
960            .map_err(|e| DeltasError::TransportError(e.to_string()))?;
961        Ok(())
962    }
963}
964
965#[cfg(test)]
966mod tests {
967    use std::net::SocketAddr;
968
969    use test_log::test;
970    use tokio::{net::TcpListener, time::timeout};
971    use tycho_common::dto::Chain;
972
973    use super::*;
974
975    #[derive(Clone)]
976    enum ExpectedComm {
977        Receive(u64, tungstenite::protocol::Message),
978        Send(tungstenite::protocol::Message),
979    }
980
981    async fn mock_tycho_ws(
982        messages: &[ExpectedComm],
983        reconnects: usize,
984    ) -> (SocketAddr, JoinHandle<()>) {
985        info!("Starting mock webserver");
986        // zero port here means the OS chooses an open port
987        let server = TcpListener::bind("127.0.0.1:0")
988            .await
989            .expect("localhost bind failed");
990        let addr = server.local_addr().unwrap();
991        let messages = messages.to_vec();
992
993        let jh = tokio::spawn(async move {
994            info!("mock webserver started");
995            for _ in 0..(reconnects + 1) {
996                info!("Awaiting client connections");
997                if let Ok((stream, _)) = server.accept().await {
998                    info!("Client connected");
999                    let mut websocket = tokio_tungstenite::accept_async(stream)
1000                        .await
1001                        .unwrap();
1002
1003                    info!("Handling messages..");
1004                    for c in messages.iter().cloned() {
1005                        match c {
1006                            ExpectedComm::Receive(t, exp) => {
1007                                info!("Awaiting message...");
1008                                let msg = timeout(Duration::from_millis(t), websocket.next())
1009                                    .await
1010                                    .expect("Receive timeout")
1011                                    .expect("Stream exhausted")
1012                                    .expect("Failed to receive message.");
1013                                info!("Message received");
1014                                assert_eq!(msg, exp)
1015                            }
1016                            ExpectedComm::Send(data) => {
1017                                info!("Sending message");
1018                                websocket
1019                                    .send(data)
1020                                    .await
1021                                    .expect("Failed to send message");
1022                                info!("Message sent");
1023                            }
1024                        };
1025                    }
1026                    info!("Mock communication completed");
1027                    sleep(Duration::from_millis(100)).await;
1028                    // Close the WebSocket connection
1029                    let _ = websocket.close(None).await;
1030                    info!("Mock server closed connection");
1031                }
1032            }
1033            info!("mock server ended");
1034        });
1035        (addr, jh)
1036    }
1037
1038    const SUBSCRIBE: &str = r#"
1039        {
1040            "method":"subscribe",
1041            "extractor_id":{
1042                "chain":"ethereum",
1043                "name":"vm:ambient"
1044            },
1045            "include_state": true,
1046            "compression": false
1047        }"#;
1048
1049    const SUBSCRIPTION_CONFIRMATION: &str = r#"
1050        {
1051            "method": "newsubscription",
1052            "extractor_id":{
1053                "chain": "ethereum",
1054                "name": "vm:ambient"
1055            },
1056            "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1057        }"#;
1058
1059    const BLOCK_DELTAS: &str = r#"
1060        {
1061            "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1062            "deltas": {
1063                "extractor": "vm:ambient",
1064                "chain": "ethereum",
1065                "block": {
1066                    "number": 123,
1067                    "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1068                    "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1069                    "chain": "ethereum",
1070                    "ts": "2023-09-14T00:00:00"
1071                },
1072                "finalized_block_height": 0,
1073                "revert": false,
1074                "new_tokens": {},
1075                "account_updates": {
1076                    "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1077                        "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1078                        "chain": "ethereum",
1079                        "slots": {},
1080                        "balance": "0x01f4",
1081                        "code": "",
1082                        "change": "Update"
1083                    }
1084                },
1085                "state_updates": {
1086                    "component_1": {
1087                        "component_id": "component_1",
1088                        "updated_attributes": {"attr1": "0x01"},
1089                        "deleted_attributes": ["attr2"]
1090                    }
1091                },
1092                "new_protocol_components":
1093                    { "protocol_1": {
1094                            "id": "protocol_1",
1095                            "protocol_system": "system_1",
1096                            "protocol_type_name": "type_1",
1097                            "chain": "ethereum",
1098                            "tokens": ["0x01", "0x02"],
1099                            "contract_ids": ["0x01", "0x02"],
1100                            "static_attributes": {"attr1": "0x01f4"},
1101                            "change": "Update",
1102                            "creation_tx": "0x01",
1103                            "created_at": "2023-09-14T00:00:00"
1104                        }
1105                    },
1106                "deleted_protocol_components": {},
1107                "component_balances": {
1108                    "protocol_1":
1109                        {
1110                            "0x01": {
1111                                "token": "0x01",
1112                                "balance": "0x01f4",
1113                                "balance_float": 0.0,
1114                                "modify_tx": "0x01",
1115                                "component_id": "protocol_1"
1116                            }
1117                        }
1118                },
1119                "account_balances": {
1120                    "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1121                        "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1122                            "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1123                            "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1124                            "balance": "0x01f4",
1125                            "modify_tx": "0x01"
1126                        }
1127                    }
1128                },
1129                "component_tvl": {
1130                    "protocol_1": 1000.0
1131                },
1132                "dci_update": {
1133                    "new_entrypoints": {},
1134                    "new_entrypoint_params": {},
1135                    "trace_results": {}
1136                }
1137            }
1138        }
1139        "#;
1140
1141    const UNSUBSCRIBE: &str = r#"
1142        {
1143            "method": "unsubscribe",
1144            "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1145        }
1146        "#;
1147
1148    const SUBSCRIPTION_ENDED: &str = r#"
1149        {
1150            "method": "subscriptionended",
1151            "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1152        }
1153        "#;
1154
1155    #[tokio::test]
1156    async fn test_uncompressed_subscribe_receive() {
1157        let exp_comm = [
1158            ExpectedComm::Receive(
1159                100,
1160                tungstenite::protocol::Message::Text(
1161                    SUBSCRIBE
1162                        .to_owned()
1163                        .replace(|c: char| c.is_whitespace(), ""),
1164                ),
1165            ),
1166            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1167                SUBSCRIPTION_CONFIRMATION
1168                    .to_owned()
1169                    .replace(|c: char| c.is_whitespace(), ""),
1170            )),
1171            ExpectedComm::Send(tungstenite::protocol::Message::Text(BLOCK_DELTAS.to_owned())),
1172        ];
1173        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1174
1175        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1176        let jh = client
1177            .connect()
1178            .await
1179            .expect("connect failed");
1180        let (_, mut rx) = timeout(
1181            Duration::from_millis(100),
1182            client.subscribe(
1183                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1184                SubscriptionOptions::new().with_compression(false),
1185            ),
1186        )
1187        .await
1188        .expect("subscription timed out")
1189        .expect("subscription failed");
1190        let _ = timeout(Duration::from_millis(100), rx.recv())
1191            .await
1192            .expect("awaiting message timeout out")
1193            .expect("receiving message failed");
1194        timeout(Duration::from_millis(100), client.close())
1195            .await
1196            .expect("close timed out")
1197            .expect("close failed");
1198        jh.await
1199            .expect("ws loop errored")
1200            .unwrap();
1201        server_thread.await.unwrap();
1202    }
1203
1204    #[tokio::test]
1205    async fn test_compressed_subscribe_receive() {
1206        let compressed_block_deltas = zstd::encode_all(
1207            BLOCK_DELTAS.as_bytes(),
1208            0, // default compression level
1209        )
1210        .expect("Failed to compress block deltas message");
1211
1212        let exp_comm = [
1213            ExpectedComm::Receive(
1214                100,
1215                tungstenite::protocol::Message::Text(
1216                    r#"
1217                {
1218                    "method":"subscribe",
1219                    "extractor_id":{
1220                        "chain":"ethereum",
1221                        "name":"vm:ambient"
1222                    },
1223                    "include_state": true,
1224                    "compression": true
1225                }"#
1226                    .to_owned()
1227                    .replace(|c: char| c.is_whitespace(), ""),
1228                ),
1229            ),
1230            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1231                SUBSCRIPTION_CONFIRMATION
1232                    .to_owned()
1233                    .replace(|c: char| c.is_whitespace(), ""),
1234            )),
1235            ExpectedComm::Send(tungstenite::protocol::Message::Binary(compressed_block_deltas)),
1236        ];
1237        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1238
1239        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1240        let jh = client
1241            .connect()
1242            .await
1243            .expect("connect failed");
1244        let (_, mut rx) = timeout(
1245            Duration::from_millis(100),
1246            client.subscribe(
1247                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1248                SubscriptionOptions::new().with_compression(true),
1249            ),
1250        )
1251        .await
1252        .expect("subscription timed out")
1253        .expect("subscription failed");
1254        let _ = timeout(Duration::from_millis(100), rx.recv())
1255            .await
1256            .expect("awaiting message timeout out")
1257            .expect("receiving message failed");
1258        timeout(Duration::from_millis(100), client.close())
1259            .await
1260            .expect("close timed out")
1261            .expect("close failed");
1262        jh.await
1263            .expect("ws loop errored")
1264            .unwrap();
1265        server_thread.await.unwrap();
1266    }
1267
1268    #[tokio::test]
1269    async fn test_unsubscribe() {
1270        let exp_comm = [
1271            ExpectedComm::Receive(
1272                100,
1273                tungstenite::protocol::Message::Text(
1274                    SUBSCRIBE
1275                        .to_owned()
1276                        .replace(|c: char| c.is_whitespace(), ""),
1277                ),
1278            ),
1279            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1280                SUBSCRIPTION_CONFIRMATION
1281                    .to_owned()
1282                    .replace(|c: char| c.is_whitespace(), ""),
1283            )),
1284            ExpectedComm::Receive(
1285                100,
1286                tungstenite::protocol::Message::Text(
1287                    UNSUBSCRIBE
1288                        .to_owned()
1289                        .replace(|c: char| c.is_whitespace(), ""),
1290                ),
1291            ),
1292            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1293                SUBSCRIPTION_ENDED
1294                    .to_owned()
1295                    .replace(|c: char| c.is_whitespace(), ""),
1296            )),
1297        ];
1298        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1299
1300        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1301        let jh = client
1302            .connect()
1303            .await
1304            .expect("connect failed");
1305        let (sub_id, mut rx) = timeout(
1306            Duration::from_millis(100),
1307            client.subscribe(
1308                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1309                SubscriptionOptions::new().with_compression(false),
1310            ),
1311        )
1312        .await
1313        .expect("subscription timed out")
1314        .expect("subscription failed");
1315
1316        timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1317            .await
1318            .expect("unsubscribe timed out")
1319            .expect("unsubscribe failed");
1320        let res = timeout(Duration::from_millis(100), rx.recv())
1321            .await
1322            .expect("awaiting message timeout out");
1323
1324        // If the subscription ended, the channel should have been closed.
1325        assert!(res.is_none());
1326
1327        timeout(Duration::from_millis(100), client.close())
1328            .await
1329            .expect("close timed out")
1330            .expect("close failed");
1331        jh.await
1332            .expect("ws loop errored")
1333            .unwrap();
1334        server_thread.await.unwrap();
1335    }
1336
1337    #[tokio::test]
1338    async fn test_subscription_unexpected_end() {
1339        let exp_comm = [
1340            ExpectedComm::Receive(
1341                100,
1342                tungstenite::protocol::Message::Text(
1343                    SUBSCRIBE
1344                        .to_owned()
1345                        .replace(|c: char| c.is_whitespace(), ""),
1346                ),
1347            ),
1348            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1349                SUBSCRIPTION_CONFIRMATION
1350                    .to_owned()
1351                    .replace(|c: char| c.is_whitespace(), ""),
1352            )),
1353            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1354                SUBSCRIPTION_ENDED
1355                    .to_owned()
1356                    .replace(|c: char| c.is_whitespace(), ""),
1357            )),
1358        ];
1359        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1360
1361        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1362        let jh = client
1363            .connect()
1364            .await
1365            .expect("connect failed");
1366        let (_, mut rx) = timeout(
1367            Duration::from_millis(100),
1368            client.subscribe(
1369                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1370                SubscriptionOptions::new().with_compression(false),
1371            ),
1372        )
1373        .await
1374        .expect("subscription timed out")
1375        .expect("subscription failed");
1376        let res = timeout(Duration::from_millis(100), rx.recv())
1377            .await
1378            .expect("awaiting message timeout out");
1379
1380        // If the subscription ended, the channel should have been closed.
1381        assert!(res.is_none());
1382
1383        timeout(Duration::from_millis(100), client.close())
1384            .await
1385            .expect("close timed out")
1386            .expect("close failed");
1387        jh.await
1388            .expect("ws loop errored")
1389            .unwrap();
1390        server_thread.await.unwrap();
1391    }
1392
1393    #[test_log::test(tokio::test)]
1394    async fn test_reconnect() {
1395        let exp_comm = [
1396            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(SUBSCRIBE.to_owned().replace(|c: char| c.is_whitespace(), "")
1397            )),
1398            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1399                SUBSCRIPTION_CONFIRMATION.to_owned().replace(|c: char| c.is_whitespace(), "")
1400            )),
1401            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1402                {
1403                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1404                    "deltas": {
1405                        "extractor": "vm:ambient",
1406                        "chain": "ethereum",
1407                        "block": {
1408                            "number": 123,
1409                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1410                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1411                            "chain": "ethereum",             
1412                            "ts": "2023-09-14T00:00:00"
1413                        },
1414                        "finalized_block_height": 0,
1415                        "revert": false,
1416                        "new_tokens": {},
1417                        "account_updates": {
1418                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1419                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1420                                "chain": "ethereum",
1421                                "slots": {},
1422                                "balance": "0x01f4",
1423                                "code": "",
1424                                "change": "Update"
1425                            }
1426                        },
1427                        "state_updates": {
1428                            "component_1": {
1429                                "component_id": "component_1",
1430                                "updated_attributes": {"attr1": "0x01"},
1431                                "deleted_attributes": ["attr2"]
1432                            }
1433                        },
1434                        "new_protocol_components": {
1435                            "protocol_1":
1436                                {
1437                                    "id": "protocol_1",
1438                                    "protocol_system": "system_1",
1439                                    "protocol_type_name": "type_1",
1440                                    "chain": "ethereum",
1441                                    "tokens": ["0x01", "0x02"],
1442                                    "contract_ids": ["0x01", "0x02"],
1443                                    "static_attributes": {"attr1": "0x01f4"},
1444                                    "change": "Update",
1445                                    "creation_tx": "0x01",
1446                                    "created_at": "2023-09-14T00:00:00"
1447                                }
1448                            },
1449                        "deleted_protocol_components": {},
1450                        "component_balances": {
1451                            "protocol_1": {
1452                                "0x01": {
1453                                    "token": "0x01",
1454                                    "balance": "0x01f4",
1455                                    "balance_float": 1000.0,
1456                                    "modify_tx": "0x01",
1457                                    "component_id": "protocol_1"
1458                                }
1459                            }
1460                        },
1461                        "account_balances": {
1462                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1463                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1464                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1465                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1466                                    "balance": "0x01f4",
1467                                    "modify_tx": "0x01"
1468                                }
1469                            }
1470                        },
1471                        "component_tvl": {
1472                            "protocol_1": 1000.0
1473                        },
1474                        "dci_update": {
1475                            "new_entrypoints": {},
1476                            "new_entrypoint_params": {},
1477                            "trace_results": {}
1478                        }
1479                    }
1480                }
1481                "#.to_owned()
1482            ))
1483        ];
1484        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1485        let client = WsDeltasClient::new_with_reconnects(
1486            &format!("ws://{addr}"),
1487            None,
1488            3,
1489            // server stays down for 100ms on connection drop
1490            Duration::from_millis(110),
1491        )
1492        .unwrap();
1493
1494        let jh: JoinHandle<Result<(), DeltasError>> = client
1495            .connect()
1496            .await
1497            .expect("connect failed");
1498
1499        for _ in 0..2 {
1500            dbg!("loop");
1501            let (_, mut rx) = timeout(
1502                Duration::from_millis(200),
1503                client.subscribe(
1504                    ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1505                    SubscriptionOptions::new().with_compression(false),
1506                ),
1507            )
1508            .await
1509            .expect("subscription timed out")
1510            .expect("subscription failed");
1511
1512            let _ = timeout(Duration::from_millis(100), rx.recv())
1513                .await
1514                .expect("awaiting message timeout out")
1515                .expect("receiving message failed");
1516
1517            // wait for the connection to drop
1518            let res = timeout(Duration::from_millis(200), rx.recv())
1519                .await
1520                .expect("awaiting closed connection timeout out");
1521            assert!(res.is_none());
1522        }
1523        let res = jh.await.expect("ws client join failed");
1524        // 5th client reconnect attempt should fail
1525        assert!(res.is_err());
1526        server_thread
1527            .await
1528            .expect("ws server loop errored");
1529    }
1530
1531    async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1532        let server = TcpListener::bind("127.0.0.1:0")
1533            .await
1534            .expect("localhost bind failed");
1535        let addr = server.local_addr().unwrap();
1536        let jh = tokio::spawn(async move {
1537            while let Ok((stream, _)) = server.accept().await {
1538                if accept_first {
1539                    // Send connection handshake to accept the connection (fail later)
1540                    let stream = tokio_tungstenite::accept_async(stream)
1541                        .await
1542                        .unwrap();
1543                    sleep(Duration::from_millis(10)).await;
1544                    drop(stream)
1545                } else {
1546                    // Close the connection to simulate a failure
1547                    drop(stream);
1548                }
1549            }
1550        });
1551        (addr, jh)
1552    }
1553
1554    #[test(tokio::test)]
1555    async fn test_subscribe_dead_client_after_max_attempts() {
1556        let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1557        let client = WsDeltasClient::new_with_reconnects(
1558            &format!("ws://{addr}"),
1559            None,
1560            3,
1561            Duration::from_secs(0),
1562        )
1563        .unwrap();
1564
1565        let join_handle = client.connect().await.unwrap();
1566        let handle_res = join_handle.await.unwrap();
1567        assert!(handle_res.is_err());
1568        assert!(!client.is_connected().await);
1569
1570        let subscription_res = timeout(
1571            Duration::from_millis(10),
1572            client.subscribe(
1573                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1574                SubscriptionOptions::new(),
1575            ),
1576        )
1577        .await
1578        .unwrap();
1579        assert!(subscription_res.is_err());
1580    }
1581
1582    #[test(tokio::test)]
1583    async fn test_ws_client_retry_cooldown() {
1584        let start = std::time::Instant::now();
1585        let (addr, _) = mock_bad_connection_tycho_ws(false).await;
1586
1587        // Use the mock server that immediately drops connections
1588        let client = WsDeltasClient::new_with_reconnects(
1589            &format!("ws://{addr}"),
1590            None,
1591            3,                         // 3 attempts total (so 2 retries with cooldowns)
1592            Duration::from_millis(50), // 50ms cooldown
1593        )
1594        .unwrap();
1595
1596        // Try to connect - this should fail after retries but still measure the time
1597        let connect_result = client.connect().await;
1598        let elapsed = start.elapsed();
1599
1600        // Connection should fail after exhausting retries
1601        assert!(connect_result.is_err(), "Expected connection to fail after retries");
1602
1603        // Should have waited at least 100ms total (2 retries × 50ms cooldown each)
1604        assert!(
1605            elapsed >= Duration::from_millis(100),
1606            "Expected at least 100ms elapsed, got {:?}",
1607            elapsed
1608        );
1609
1610        // Should not take too long (max ~300ms for 3 attempts with some tolerance)
1611        assert!(elapsed < Duration::from_millis(500), "Took too long: {:?}", elapsed);
1612    }
1613
1614    #[test_log::test(tokio::test)]
1615    async fn test_buffer_full_triggers_unsubscribe() {
1616        // Expected communication sequence for buffer full scenario
1617        let exp_comm = {
1618            [
1619            // 1. Client subscribes
1620            ExpectedComm::Receive(
1621                100,
1622                tungstenite::protocol::Message::Text(
1623                    SUBSCRIBE
1624                    .to_owned()
1625                    .replace(|c: char| c.is_whitespace(), ""),
1626                ),
1627            ),
1628            // 2. Server confirms subscription
1629            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1630                SUBSCRIPTION_CONFIRMATION
1631                .to_owned()
1632                .replace(|c: char| c.is_whitespace(), ""),
1633            )),
1634            // 3. Server sends first message (fills buffer)
1635            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1636                r#"
1637                {
1638                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1639                    "deltas": {
1640                        "extractor": "vm:ambient",
1641                        "chain": "ethereum",
1642                        "block": {
1643                            "number": 123,
1644                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1645                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1646                            "chain": "ethereum",
1647                            "ts": "2023-09-14T00:00:00"
1648                        },
1649                        "finalized_block_height": 0,
1650                        "revert": false,
1651                        "new_tokens": {},
1652                        "account_updates": {},
1653                        "state_updates": {},
1654                        "new_protocol_components": {},
1655                        "deleted_protocol_components": {},
1656                        "component_balances": {},
1657                        "account_balances": {},
1658                        "component_tvl": {},
1659                        "dci_update": {
1660                            "new_entrypoints": {},
1661                            "new_entrypoint_params": {},
1662                            "trace_results": {}
1663                        }
1664                    }
1665                }
1666                "#.to_owned()
1667            )),
1668            // 4. Server sends second message (triggers buffer overflow and force unsubscribe)
1669            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1670                r#"
1671                {
1672                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1673                    "deltas": {
1674                        "extractor": "vm:ambient",
1675                        "chain": "ethereum",
1676                        "block": {
1677                            "number": 124,
1678                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
1679                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1680                            "chain": "ethereum",
1681                            "ts": "2023-09-14T00:00:01"
1682                        },
1683                        "finalized_block_height": 0,
1684                        "revert": false,
1685                        "new_tokens": {},
1686                        "account_updates": {},
1687                        "state_updates": {},
1688                        "new_protocol_components": {},
1689                        "deleted_protocol_components": {},
1690                        "component_balances": {},
1691                        "account_balances": {},
1692                        "component_tvl": {},
1693                        "dci_update": {
1694                            "new_entrypoints": {},
1695                            "new_entrypoint_params": {},
1696                            "trace_results": {}
1697                        }
1698                    }
1699                }
1700                "#.to_owned()
1701            )),
1702            // 5. Expect unsubscribe command due to buffer full
1703            ExpectedComm::Receive(
1704                100,
1705                tungstenite::protocol::Message::Text(
1706                    UNSUBSCRIBE
1707                    .to_owned()
1708                    .replace(|c: char| c.is_whitespace(), ""),
1709                ),
1710            ),
1711            // 6. Server confirms unsubscription
1712            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1713                SUBSCRIPTION_ENDED
1714                .to_owned()
1715                .replace(|c: char| c.is_whitespace(), ""),
1716            )),
1717        ]
1718        };
1719
1720        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1721
1722        // Create client with very small buffer size (1) to easily trigger BufferFull
1723        let client = WsDeltasClient::new_with_custom_buffers(
1724            &format!("ws://{addr}"),
1725            None,
1726            128, // ws_buffer_size
1727            1,   // subscription_buffer_size - this will trigger BufferFull easily
1728        )
1729        .unwrap();
1730
1731        let jh = client
1732            .connect()
1733            .await
1734            .expect("connect failed");
1735
1736        let (_sub_id, mut rx) = timeout(
1737            Duration::from_millis(100),
1738            client.subscribe(
1739                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1740                SubscriptionOptions::new().with_compression(false),
1741            ),
1742        )
1743        .await
1744        .expect("subscription timed out")
1745        .expect("subscription failed");
1746
1747        // Allow time for messages to be processed and buffer to fill up
1748        tokio::time::sleep(Duration::from_millis(100)).await;
1749
1750        // Collect all messages until channel closes or we get a reasonable number
1751        let mut received_msgs = Vec::new();
1752
1753        // Use a single longer timeout to collect messages until channel closes
1754        while received_msgs.len() < 3 {
1755            match timeout(Duration::from_millis(200), rx.recv()).await {
1756                Ok(Some(msg)) => {
1757                    received_msgs.push(msg);
1758                }
1759                Ok(None) => {
1760                    // Channel closed - this is what we expect after buffer overflow
1761                    break;
1762                }
1763                Err(_) => {
1764                    // Timeout - no more messages coming
1765                    break;
1766                }
1767            }
1768        }
1769
1770        // Verify the key behavior: buffer overflow should limit messages and close channel
1771        assert!(
1772            received_msgs.len() <= 1,
1773            "Expected buffer overflow to limit messages to at most 1, got {}",
1774            received_msgs.len()
1775        );
1776
1777        if let Some(first_msg) = received_msgs.first() {
1778            assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
1779        }
1780
1781        // Test passed! The key behavior we're testing (buffer full causes force unsubscribe) has
1782        // been verified We don't need to explicitly close the client as it will be cleaned
1783        // up when dropped
1784
1785        // Just wait for the tasks to finish cleanly
1786        drop(rx); // Explicitly drop the receiver
1787        tokio::time::sleep(Duration::from_millis(50)).await;
1788
1789        // Abort the tasks to clean up
1790        jh.abort();
1791        server_thread.abort();
1792
1793        let _ = jh.await;
1794        let _ = server_thread.await;
1795    }
1796
1797    #[tokio::test]
1798    async fn test_server_error_handling() {
1799        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1800
1801        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1802
1803        // Test ExtractorNotFound error
1804        let error_response = WebSocketMessage::Response(Response::Error(
1805            WebsocketError::ExtractorNotFound(extractor_id.clone()),
1806        ));
1807        let error_json = serde_json::to_string(&error_response).unwrap();
1808
1809        let exp_comm = [
1810            ExpectedComm::Receive(
1811                100,
1812                tungstenite::protocol::Message::Text(
1813                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":true}"#.to_string()
1814                ),
1815            ),
1816            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1817        ];
1818
1819        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1820
1821        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1822        let jh = client
1823            .connect()
1824            .await
1825            .expect("connect failed");
1826
1827        let result = timeout(
1828            Duration::from_millis(100),
1829            client.subscribe(extractor_id, SubscriptionOptions::new()),
1830        )
1831        .await
1832        .expect("subscription timed out");
1833
1834        // Verify that we get a ServerError
1835        assert!(result.is_err());
1836        if let Err(DeltasError::ServerError(msg, _)) = result {
1837            assert!(msg.contains("Subscription failed"));
1838            assert!(msg.contains("Extractor not found"));
1839        } else {
1840            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1841        }
1842
1843        timeout(Duration::from_millis(100), client.close())
1844            .await
1845            .expect("close timed out")
1846            .expect("close failed");
1847        jh.await
1848            .expect("ws loop errored")
1849            .unwrap();
1850        server_thread.await.unwrap();
1851    }
1852
1853    #[test_log::test(tokio::test)]
1854    async fn test_subscription_not_found_error() {
1855        // Test scenario: Server restart causes subscription loss
1856        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1857
1858        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1859        let subscription_id = Uuid::new_v4();
1860
1861        let error_response = WebSocketMessage::Response(Response::Error(
1862            WebsocketError::SubscriptionNotFound(subscription_id),
1863        ));
1864        let error_json = serde_json::to_string(&error_response).unwrap();
1865
1866        let exp_comm = [
1867            // 1. Client subscribes successfully
1868            ExpectedComm::Receive(
1869                100,
1870                tungstenite::protocol::Message::Text(
1871                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":true}"#.to_string()
1872                ),
1873            ),
1874            ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
1875                r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"test_extractor"}},"subscription_id":"{}"}}"#,
1876                subscription_id
1877            ))),
1878            // 2. Client tries to unsubscribe (server has "restarted" and lost subscription)
1879            ExpectedComm::Receive(
1880                100,
1881                tungstenite::protocol::Message::Text(format!(
1882                    r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
1883                    subscription_id
1884                )),
1885            ),
1886            // 3. Server responds with SubscriptionNotFound (simulating server restart)
1887            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
1888        ];
1889
1890        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1891
1892        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1893        let jh = client
1894            .connect()
1895            .await
1896            .expect("connect failed");
1897
1898        // Subscribe successfully
1899        let (received_sub_id, _rx) = timeout(
1900            Duration::from_millis(100),
1901            client.subscribe(extractor_id, SubscriptionOptions::new()),
1902        )
1903        .await
1904        .expect("subscription timed out")
1905        .expect("subscription failed");
1906
1907        assert_eq!(received_sub_id, subscription_id);
1908
1909        // Now try to unsubscribe - this should fail because server "restarted"
1910        let unsubscribe_result =
1911            timeout(Duration::from_millis(100), client.unsubscribe(subscription_id))
1912                .await
1913                .expect("unsubscribe timed out");
1914
1915        // The unsubscribe should handle the SubscriptionNotFound error gracefully
1916        // In this case, the client should treat it as successful since the subscription
1917        // is effectively gone (whether due to server restart or other reasons)
1918        unsubscribe_result
1919            .expect("Unsubscribe should succeed even if server says subscription not found");
1920
1921        timeout(Duration::from_millis(100), client.close())
1922            .await
1923            .expect("close timed out")
1924            .expect("close failed");
1925        jh.await
1926            .expect("ws loop errored")
1927            .unwrap();
1928        server_thread.await.unwrap();
1929    }
1930
1931    #[test_log::test(tokio::test)]
1932    async fn test_parse_error_handling() {
1933        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1934
1935        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1936        let error_response = WebSocketMessage::Response(Response::Error(
1937            WebsocketError::ParseError("}2sdf".to_string(), "malformed JSON".to_string()),
1938        ));
1939        let error_json = serde_json::to_string(&error_response).unwrap();
1940
1941        let exp_comm = [
1942            // subscribe first so connect can finish successfully
1943            ExpectedComm::Receive(
1944                100,
1945                tungstenite::protocol::Message::Text(
1946                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":true}"#.to_string()
1947                ),
1948            ),
1949            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json))
1950        ];
1951
1952        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1953
1954        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1955        let jh = client
1956            .connect()
1957            .await
1958            .expect("connect failed");
1959
1960        // Subscribe successfully
1961        let _ = timeout(
1962            Duration::from_millis(100),
1963            client.subscribe(extractor_id, SubscriptionOptions::new()),
1964        )
1965        .await
1966        .expect("subscription timed out");
1967
1968        // The client should receive the parse error and close the connection
1969        let result = jh
1970            .await
1971            .expect("ws loop should complete");
1972        assert!(result.is_err());
1973        if let Err(DeltasError::ServerError(message, _)) = result {
1974            assert!(message.contains("Server failed to parse client message"));
1975        } else {
1976            panic!("Expected DeltasError::ServerError, got: {:?}", result);
1977        }
1978
1979        server_thread.await.unwrap();
1980    }
1981
1982    #[test_log::test(tokio::test)]
1983    async fn test_compression_error_handling() {
1984        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
1985
1986        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
1987        let subscription_id = Uuid::new_v4();
1988        let error_response = WebSocketMessage::Response(Response::Error(
1989            WebsocketError::CompressionError(subscription_id, "Compression failed".to_string()),
1990        ));
1991        let error_json = serde_json::to_string(&error_response).unwrap();
1992
1993        let exp_comm = [
1994            // subscribe first so connect can finish successfully
1995            ExpectedComm::Receive(
1996                100,
1997                tungstenite::protocol::Message::Text(
1998                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":true}"#.to_string()
1999                ),
2000            ),
2001            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json))
2002        ];
2003
2004        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2005
2006        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2007        let jh = client
2008            .connect()
2009            .await
2010            .expect("connect failed");
2011
2012        // Subscribe successfully with compression disabled
2013        let _ = timeout(
2014            Duration::from_millis(100),
2015            client.subscribe(extractor_id, SubscriptionOptions::new()),
2016        )
2017        .await
2018        .expect("subscription timed out");
2019
2020        // The client should receive the parse error
2021        let result = jh
2022            .await
2023            .expect("ws loop should complete");
2024        assert!(result.is_err());
2025        if let Err(DeltasError::ServerError(message, _)) = result {
2026            assert!(message.contains("Server failed to compress message for subscription"));
2027        } else {
2028            panic!("Expected DeltasError::ServerError, got: {:?}", result);
2029        }
2030
2031        server_thread.await.unwrap();
2032    }
2033
2034    #[tokio::test]
2035    async fn test_subscribe_error_handling() {
2036        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
2037
2038        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "failing_extractor");
2039
2040        let error_response = WebSocketMessage::Response(Response::Error(
2041            WebsocketError::SubscribeError(extractor_id.clone()),
2042        ));
2043        let error_json = serde_json::to_string(&error_response).unwrap();
2044
2045        let exp_comm = [
2046            ExpectedComm::Receive(
2047                100,
2048                tungstenite::protocol::Message::Text(
2049                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"failing_extractor"},"include_state":true,"compression":true}"#.to_string()
2050                ),
2051            ),
2052            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2053        ];
2054
2055        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2056
2057        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2058        let jh = client
2059            .connect()
2060            .await
2061            .expect("connect failed");
2062
2063        let result = timeout(
2064            Duration::from_millis(100),
2065            client.subscribe(extractor_id, SubscriptionOptions::new()),
2066        )
2067        .await
2068        .expect("subscription timed out");
2069
2070        // Verify that we get a ServerError for subscribe failure
2071        assert!(result.is_err());
2072        if let Err(DeltasError::ServerError(msg, _)) = result {
2073            assert!(msg.contains("Subscription failed"));
2074            assert!(msg.contains("Failed to subscribe to extractor"));
2075        } else {
2076            panic!("Expected DeltasError::ServerError, got: {:?}", result);
2077        }
2078
2079        timeout(Duration::from_millis(100), client.close())
2080            .await
2081            .expect("close timed out")
2082            .expect("close failed");
2083        jh.await
2084            .expect("ws loop errored")
2085            .unwrap();
2086        server_thread.await.unwrap();
2087    }
2088
2089    #[tokio::test]
2090    async fn test_cancel_pending_subscription() {
2091        // This test verifies that pending subscriptions are properly cancelled when errors occur
2092        use tycho_common::dto::{Response, WebSocketMessage, WebsocketError};
2093
2094        let extractor_id = ExtractorIdentity::new(Chain::Ethereum, "test_extractor");
2095
2096        let error_response = WebSocketMessage::Response(Response::Error(
2097            WebsocketError::ExtractorNotFound(extractor_id.clone()),
2098        ));
2099        let error_json = serde_json::to_string(&error_response).unwrap();
2100
2101        let exp_comm = [
2102            ExpectedComm::Receive(
2103                100,
2104                tungstenite::protocol::Message::Text(
2105                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"test_extractor"},"include_state":true,"compression":true}"#.to_string()
2106                ),
2107            ),
2108            ExpectedComm::Send(tungstenite::protocol::Message::Text(error_json)),
2109        ];
2110
2111        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2112
2113        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2114        let jh = client
2115            .connect()
2116            .await
2117            .expect("connect failed");
2118
2119        // Start two subscription attempts simultaneously
2120        let client_clone = client.clone();
2121        let extractor_id_clone = extractor_id.clone();
2122
2123        let subscription1 = tokio::spawn({
2124            let client_for_spawn = client.clone();
2125            async move {
2126                client_for_spawn
2127                    .subscribe(extractor_id, SubscriptionOptions::new())
2128                    .await
2129            }
2130        });
2131
2132        let subscription2 = tokio::spawn(async move {
2133            // This should fail because there's already a pending subscription
2134            client_clone
2135                .subscribe(extractor_id_clone, SubscriptionOptions::new())
2136                .await
2137        });
2138
2139        let (result1, result2) = tokio::join!(subscription1, subscription2);
2140
2141        let result1 = result1.unwrap();
2142        let result2 = result2.unwrap();
2143
2144        // One should fail due to ExtractorNotFound error from server
2145        // The other should fail due to SubscriptionAlreadyPending
2146        assert!(result1.is_err() || result2.is_err());
2147
2148        if let Err(DeltasError::SubscriptionAlreadyPending) = result2 {
2149            // This is expected for the second subscription
2150        } else if let Err(DeltasError::ServerError(_, _)) = result1 {
2151            // This is expected for the first subscription that gets the server error
2152        } else {
2153            panic!("Expected one SubscriptionAlreadyPending and one ServerError");
2154        }
2155
2156        timeout(Duration::from_millis(100), client.close())
2157            .await
2158            .expect("close timed out")
2159            .expect("close failed");
2160        jh.await
2161            .expect("ws loop errored")
2162            .unwrap();
2163        server_thread.await.unwrap();
2164    }
2165
2166    #[tokio::test]
2167    async fn test_force_unsubscribe_prevents_multiple_calls() {
2168        // Test that force_unsubscribe prevents sending duplicate unsubscribe commands
2169        // when called multiple times for the same subscription_id
2170
2171        let subscription_id = Uuid::new_v4();
2172
2173        let exp_comm = [
2174            ExpectedComm::Receive(
2175                100,
2176                tungstenite::protocol::Message::Text(
2177                    r#"{"method":"subscribe","extractor_id":{"chain":"ethereum","name":"vm:ambient"},"include_state":true,"compression":true}"#.to_string()
2178                ),
2179            ),
2180            ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
2181                r#"{{"method":"newsubscription","extractor_id":{{"chain":"ethereum","name":"vm:ambient"}},"subscription_id":"{}"}}"#,
2182                subscription_id
2183            ))),
2184            // Expect only ONE unsubscribe message, even though force_unsubscribe is called twice
2185            ExpectedComm::Receive(
2186                100,
2187                tungstenite::protocol::Message::Text(format!(
2188                    r#"{{"method":"unsubscribe","subscription_id":"{}"}}"#,
2189                    subscription_id
2190                )),
2191            ),
2192            ExpectedComm::Send(tungstenite::protocol::Message::Text(format!(
2193                r#"{{"method":"subscriptionended","subscription_id":"{}"}}"#,
2194                subscription_id
2195            ))),
2196        ];
2197
2198        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
2199
2200        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
2201        let jh = client
2202            .connect()
2203            .await
2204            .expect("connect failed");
2205
2206        let (received_sub_id, _rx) = timeout(
2207            Duration::from_millis(100),
2208            client.subscribe(
2209                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
2210                SubscriptionOptions::new(),
2211            ),
2212        )
2213        .await
2214        .expect("subscription timed out")
2215        .expect("subscription failed");
2216
2217        assert_eq!(received_sub_id, subscription_id);
2218
2219        // Access the inner state to call force_unsubscribe directly
2220        {
2221            let mut inner_guard = client.inner.lock().await;
2222            let inner = inner_guard
2223                .as_mut()
2224                .expect("client should be connected");
2225
2226            // Call force_unsubscribe twice - only the first should send an unsubscribe message
2227            WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2228            WsDeltasClient::force_unsubscribe(subscription_id, inner).await;
2229        }
2230
2231        // Give time for messages to be processed
2232        tokio::time::sleep(Duration::from_millis(50)).await;
2233
2234        // Close may fail if client disconnected after unsubscribe, which is fine
2235        let _ = timeout(Duration::from_millis(100), client.close()).await;
2236
2237        // Wait for tasks to complete
2238        let _ = jh.await;
2239        let _ = server_thread.await;
2240    }
2241}