tycho_client/
deltas.rs

1//! # Deltas Client
2//!
3//! This module focuses on implementing the Real-Time Deltas client for the Tycho Indexer service.
4//! Utilizing this client facilitates efficient, instant communication with the indexing service,
5//! promoting seamless data synchronization.
6//!
7//! ## Websocket Implementation
8//!
9//! The present WebSocket implementation is cloneable, which enables it to be shared
10//! across multiple asynchronous tasks without creating separate instances for each task. This
11//! unique feature boosts efficiency as it:
12//!
13//! - **Reduces Server Load:** By maintaining a single universal client, the load on the server is
14//!   significantly reduced. This is because fewer connections are made to the server, preventing it
15//!   from getting overwhelmed by numerous simultaneous requests.
16//! - **Conserves Resource Usage:** A single shared client requires fewer system resources than if
17//!   multiple clients were instantiated and used separately as there is some overhead for websocket
18//!   handshakes and message.
19//!
20//! Therefore, sharing one client among multiple tasks ensures optimal performance, reduces resource
21//! consumption, and enhances overall software scalability.
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::{
25        atomic::{AtomicBool, Ordering},
26        Arc,
27    },
28    time::Duration,
29};
30
31use async_trait::async_trait;
32use futures03::{stream::SplitSink, SinkExt, StreamExt};
33use hyper::{
34    header::{
35        AUTHORIZATION, CONNECTION, HOST, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
36        USER_AGENT,
37    },
38    Uri,
39};
40#[cfg(test)]
41use mockall::automock;
42use thiserror::Error;
43use tokio::{
44    net::TcpStream,
45    sync::{
46        mpsc::{self, error::TrySendError, Receiver, Sender},
47        oneshot, Mutex, Notify,
48    },
49    task::JoinHandle,
50    time::sleep,
51};
52use tokio_tungstenite::{
53    connect_async,
54    tungstenite::{
55        self,
56        handshake::client::{generate_key, Request},
57    },
58    MaybeTlsStream, WebSocketStream,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use tycho_common::dto::{BlockChanges, Command, ExtractorIdentity, Response, WebSocketMessage};
62use uuid::Uuid;
63
64use crate::TYCHO_SERVER_VERSION;
65
66#[derive(Error, Debug)]
67pub enum DeltasError {
68    /// Failed to parse the provided URI.
69    #[error("Failed to parse URI: {0}. Error: {1}")]
70    UriParsing(String, String),
71
72    /// The requested subscription is already pending and is awaiting confirmation from the server.
73    #[error("The requested subscription is already pending")]
74    SubscriptionAlreadyPending,
75
76    /// A message failed to send via an internal channel or through the websocket channel.
77    /// This is typically a fatal error and might indicate a bug in the implementation.
78    #[error("{0}")]
79    TransportError(String),
80
81    /// The internal message buffer is full. This likely means that messages are not being consumed
82    /// fast enough. If the incoming load emits messages in bursts, consider increasing the buffer
83    /// size.
84    #[error("The buffer is full!")]
85    BufferFull,
86
87    /// The client has no active connections but was accessed (e.g., by calling subscribe).
88    /// This typically occurs when trying to use the client before calling connect() or
89    /// after the connection has been closed.
90    #[error("The client is not connected!")]
91    NotConnected,
92
93    /// The connect method was called while the client already had an active connection.
94    #[error("The client is already connected!")]
95    AlreadyConnected,
96
97    /// The connection was closed orderly by the server, e.g. because it restarted.
98    #[error("The server closed the connection!")]
99    ConnectionClosed,
100
101    /// The connection was closed unexpectedly by the server or encountered a network error.
102    #[error("Connection error: {0}")]
103    ConnectionError(#[from] Box<tungstenite::Error>),
104
105    /// A fatal error occurred that cannot be recovered from.
106    #[error("Tycho FatalError: {0}")]
107    Fatal(String),
108}
109
110#[derive(Clone, Debug)]
111pub struct SubscriptionOptions {
112    include_state: bool,
113}
114
115impl Default for SubscriptionOptions {
116    fn default() -> Self {
117        Self { include_state: true }
118    }
119}
120
121impl SubscriptionOptions {
122    pub fn new() -> Self {
123        Self::default()
124    }
125    pub fn with_state(mut self, val: bool) -> Self {
126        self.include_state = val;
127        self
128    }
129}
130
131#[cfg_attr(test, automock)]
132#[async_trait]
133pub trait DeltasClient {
134    /// Subscribe to an extractor and receive realtime messages
135    ///
136    /// Will request a subscription from tycho and wait for confirmation of it. If the caller
137    /// cancels while waiting for confirmation the subscription may still be registered. If the
138    /// receiver was deallocated though, the first message from the subscription will remove it
139    /// again - since there is no one to inform about these messages.
140    async fn subscribe(
141        &self,
142        extractor_id: ExtractorIdentity,
143        options: SubscriptionOptions,
144    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError>;
145
146    /// Unsubscribe from an subscription
147    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError>;
148
149    /// Start the clients message handling loop.
150    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError>;
151
152    /// Close the clients message handling loop.
153    async fn close(&self) -> Result<(), DeltasError>;
154}
155
156#[derive(Clone)]
157pub struct WsDeltasClient {
158    /// The tycho indexer websocket uri.
159    uri: Uri,
160    /// Authorization key for the websocket connection.
161    auth_key: Option<String>,
162    /// Maximum amount of reconnects to try before giving up.
163    max_reconnects: u32,
164    /// The client will buffer this many messages incoming from the websocket
165    /// before starting to drop them.
166    ws_buffer_size: usize,
167    /// The client will buffer that many messages for each subscription before it starts droppping
168    /// them.
169    subscription_buffer_size: usize,
170    /// Notify tasks waiting for a connection to be established.
171    conn_notify: Arc<Notify>,
172    /// Shared client instance state.
173    inner: Arc<Mutex<Option<Inner>>>,
174    /// If set the client has exhausted it's reconnection attemtpts
175    dead: Arc<AtomicBool>,
176}
177
178type WebSocketSink =
179    SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::protocol::Message>;
180
181/// Subscription State
182///
183/// Subscription go through a lifecycle:
184///
185/// ```text
186/// O ---> requested subscribe ----> active ----> requested unsub ---> ended
187/// ```
188///
189/// We use oneshot channels to inform the client struct about when these transition happened. E.g.
190/// because for `subscribe`` to finish, we want the state to have transition to `active` and similar
191/// for `unsubscribe`.
192#[derive(Debug)]
193enum SubscriptionInfo {
194    /// Subscription was requested we wait for server confirmation and uuid assignment.
195    RequestedSubscription(oneshot::Sender<(Uuid, Receiver<BlockChanges>)>),
196    /// Subscription is active.
197    Active,
198    /// Unsubscription was requested, we wait for server confirmation.
199    RequestedUnsubscription(oneshot::Sender<()>),
200}
201
202/// Internal struct containing shared state between of WsDeltaClient instances.
203struct Inner {
204    /// Websocket sender handle.
205    sink: WebSocketSink,
206    /// Command channel sender handle.
207    cmd_tx: Sender<()>,
208    /// Currently pending subscriptions.
209    pending: HashMap<ExtractorIdentity, SubscriptionInfo>,
210    /// Active subscriptions.
211    subscriptions: HashMap<Uuid, SubscriptionInfo>,
212    /// For eachs subscription we keep a sender handle, the receiver is returned to the caller of
213    /// subscribe.
214    sender: HashMap<Uuid, Sender<BlockChanges>>,
215    /// How many messages to buffer per subscription before starting to drop new messages.
216    buffer_size: usize,
217}
218
219/// Shared state between all client instances.
220///
221/// This state is behind a mutex and requires synchronization to be read of modified.
222impl Inner {
223    fn new(cmd_tx: Sender<()>, sink: WebSocketSink, buffer_size: usize) -> Self {
224        Self {
225            sink,
226            cmd_tx,
227            pending: HashMap::new(),
228            subscriptions: HashMap::new(),
229            sender: HashMap::new(),
230            buffer_size,
231        }
232    }
233
234    /// Registers a new pending subscription.
235    #[allow(clippy::result_large_err)]
236    fn new_subscription(
237        &mut self,
238        id: &ExtractorIdentity,
239        ready_tx: oneshot::Sender<(Uuid, Receiver<BlockChanges>)>,
240    ) -> Result<(), DeltasError> {
241        if self.pending.contains_key(id) {
242            return Err(DeltasError::SubscriptionAlreadyPending);
243        }
244        self.pending
245            .insert(id.clone(), SubscriptionInfo::RequestedSubscription(ready_tx));
246        Ok(())
247    }
248
249    /// Transitions a pending subscription to active.
250    ///
251    /// Will ignore any request to do so for subscriptions that are not pending.
252    fn mark_active(&mut self, extractor_id: &ExtractorIdentity, subscription_id: Uuid) {
253        if let Some(info) = self.pending.remove(extractor_id) {
254            if let SubscriptionInfo::RequestedSubscription(ready_tx) = info {
255                let (tx, rx) = mpsc::channel(self.buffer_size);
256                self.sender.insert(subscription_id, tx);
257                self.subscriptions
258                    .insert(subscription_id, SubscriptionInfo::Active);
259                let _ = ready_tx
260                    .send((subscription_id, rx))
261                    .map_err(|_| {
262                        warn!(
263                            ?extractor_id,
264                            ?subscription_id,
265                            "Subscriber for has gone away. Ignoring."
266                        )
267                    });
268            } else {
269                error!(
270                    ?extractor_id,
271                    ?subscription_id,
272                    "Pending subscription was not in the correct state to 
273                    transition to active. Ignoring!"
274                )
275            }
276        } else {
277            error!(
278                ?extractor_id,
279                ?subscription_id,
280                "Tried to mark an unkown subscription as active. Ignoring!"
281            );
282        }
283    }
284
285    /// Sends a message to a subscription's receiver.
286    #[allow(clippy::result_large_err)]
287    fn send(&mut self, id: &Uuid, msg: BlockChanges) -> Result<(), DeltasError> {
288        if let Some(sender) = self.sender.get_mut(id) {
289            sender
290                .try_send(msg)
291                .map_err(|e| match e {
292                    TrySendError::Full(_) => DeltasError::BufferFull,
293                    TrySendError::Closed(_) => {
294                        DeltasError::TransportError("The subscriber has gone away".to_string())
295                    }
296                })?;
297        }
298        Ok(())
299    }
300
301    /// Requests a subscription to end.
302    ///
303    /// The subscription needs to exist and be active for this to have any effect. Wll use
304    /// `ready_tx` to notify the receiver once the transition to ended completed.
305    fn end_subscription(&mut self, subscription_id: &Uuid, ready_tx: oneshot::Sender<()>) {
306        if let Some(info) = self
307            .subscriptions
308            .get_mut(subscription_id)
309        {
310            if let SubscriptionInfo::Active = info {
311                *info = SubscriptionInfo::RequestedUnsubscription(ready_tx);
312            }
313        } else {
314            // no big deal imo so only debug lvl..
315            debug!(?subscription_id, "Tried unsubscribing from a non existent subscription");
316        }
317    }
318
319    /// Removes and fully ends a subscription
320    ///
321    /// Any calls for non-existing subscriptions will be simply ignored. May panic on internal state
322    /// inconsistencies: e.g. if the subscription exists but there is no sender for it.
323    /// Will remove a subscription even it was in active or pending state before, this is to support
324    /// any server side failure of the subscription.
325    fn remove_subscription(&mut self, subscription_id: Uuid) -> Result<(), DeltasError> {
326        if let Entry::Occupied(e) = self
327            .subscriptions
328            .entry(subscription_id)
329        {
330            let info = e.remove();
331            if let SubscriptionInfo::RequestedUnsubscription(tx) = info {
332                let _ = tx.send(()).map_err(|_| {
333                    warn!(?subscription_id, "failed to notify about removed subscription")
334                });
335                self.sender
336                    .remove(&subscription_id)
337                    .ok_or_else(|| DeltasError::Fatal("Inconsistent internal client state: `sender` state drifted from `info` while removing a subscription.".to_string()))?;
338            } else {
339                warn!(?subscription_id, "Subscription ended unexpectedly!");
340                self.sender
341                    .remove(&subscription_id)
342                    .ok_or_else(|| DeltasError::Fatal("sender channel missing".to_string()))?;
343            }
344        } else {
345            error!(
346                ?subscription_id,
347                "Received `SubscriptionEnded`, but was never subscribed 
348                to it. This is likely a bug!"
349            );
350        }
351
352        Ok(())
353    }
354
355    /// Sends a message through the websocket.
356    async fn ws_send(&mut self, msg: tungstenite::protocol::Message) -> Result<(), DeltasError> {
357        self.sink.send(msg).await.map_err(|e| {
358            DeltasError::TransportError(format!("Failed to send message to websocket: {e}"))
359        })
360    }
361}
362
363/// Tycho client websocket implementation.
364impl WsDeltasClient {
365    // Construct a new client with 5 reconnection attempts.
366    #[allow(clippy::result_large_err)]
367    pub fn new(ws_uri: &str, auth_key: Option<&str>) -> Result<Self, DeltasError> {
368        let uri = ws_uri
369            .parse::<Uri>()
370            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
371        Ok(Self {
372            uri,
373            auth_key: auth_key.map(|s| s.to_string()),
374            inner: Arc::new(Mutex::new(None)),
375            ws_buffer_size: 128,
376            subscription_buffer_size: 128,
377            conn_notify: Arc::new(Notify::new()),
378            max_reconnects: 5,
379            dead: Arc::new(AtomicBool::new(false)),
380        })
381    }
382
383    // Construct a new client with a custom number of reconnection attempts.
384    #[allow(clippy::result_large_err)]
385    pub fn new_with_reconnects(
386        ws_uri: &str,
387        max_reconnects: u32,
388        auth_key: Option<&str>,
389    ) -> Result<Self, DeltasError> {
390        let uri = ws_uri
391            .parse::<Uri>()
392            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
393
394        Ok(Self {
395            uri,
396            auth_key: auth_key.map(|s| s.to_string()),
397            inner: Arc::new(Mutex::new(None)),
398            ws_buffer_size: 128,
399            subscription_buffer_size: 128,
400            conn_notify: Arc::new(Notify::new()),
401            max_reconnects,
402            dead: Arc::new(AtomicBool::new(false)),
403        })
404    }
405
406    // Construct a new client with custom buffer sizes (for testing)
407    #[cfg(test)]
408    #[allow(clippy::result_large_err)]
409    pub fn new_with_custom_buffers(
410        ws_uri: &str,
411        auth_key: Option<&str>,
412        ws_buffer_size: usize,
413        subscription_buffer_size: usize,
414    ) -> Result<Self, DeltasError> {
415        let uri = ws_uri
416            .parse::<Uri>()
417            .map_err(|e| DeltasError::UriParsing(ws_uri.to_string(), e.to_string()))?;
418        Ok(Self {
419            uri,
420            auth_key: auth_key.map(|s| s.to_string()),
421            inner: Arc::new(Mutex::new(None)),
422            ws_buffer_size,
423            subscription_buffer_size,
424            conn_notify: Arc::new(Notify::new()),
425            max_reconnects: 5,
426            dead: Arc::new(AtomicBool::new(false)),
427        })
428    }
429
430    /// Ensures that the client is connected.
431    ///
432    /// This method will acquire the lock for inner.
433    async fn is_connected(&self) -> bool {
434        let guard = self.inner.as_ref().lock().await;
435        guard.is_some()
436    }
437
438    /// Waits for the client to be connected
439    ///
440    /// This method acquires the lock for inner for a short period, then waits until the  
441    /// connection is established if not already connected.
442    async fn ensure_connection(&self) -> Result<(), DeltasError> {
443        if self.dead.load(Ordering::SeqCst) {
444            return Err(DeltasError::NotConnected)
445        };
446        if !self.is_connected().await {
447            self.conn_notify.notified().await;
448        };
449        Ok(())
450    }
451
452    /// Main message handling logic
453    ///
454    /// If the message returns an error, a reconnect attempt may be considered depending on the
455    /// error type.
456    #[instrument(skip(self, msg))]
457    async fn handle_msg(
458        &self,
459        msg: Result<tungstenite::protocol::Message, tokio_tungstenite::tungstenite::error::Error>,
460    ) -> Result<(), DeltasError> {
461        let mut guard = self.inner.lock().await;
462
463        match msg {
464            // We do not deserialize the message directly into a WebSocketMessage. This is because
465            // the serde arbitrary_precision feature (often included in many
466            // dependencies we use) breaks some untagged enum deserializations. Instead,
467            // we deserialize the message into a serde_json::Value and convert that into a WebSocketMessage. For more info on this issue, see: https://github.com/serde-rs/json/issues/740
468            Ok(tungstenite::protocol::Message::Text(text)) => match serde_json::from_str::<
469                serde_json::Value,
470            >(&text)
471            {
472                Ok(value) => match serde_json::from_value::<WebSocketMessage>(value) {
473                    Ok(ws_message) => match ws_message {
474                        WebSocketMessage::BlockChanges { subscription_id, deltas } => {
475                            trace!(?deltas, "Received a block state change, sending to channel");
476                            let inner = guard
477                                .as_mut()
478                                .ok_or_else(|| DeltasError::NotConnected)?;
479                            match inner.send(&subscription_id, deltas) {
480                                Err(DeltasError::BufferFull) => {
481                                    error!(?subscription_id, "Buffer full, unsubscribing!");
482                                    Self::force_unsubscribe(subscription_id, inner).await;
483                                }
484                                Err(_) => {
485                                    warn!(
486                                        ?subscription_id,
487                                        "Receiver for has gone away, unsubscribing!"
488                                    );
489                                    Self::force_unsubscribe(subscription_id, inner).await;
490                                }
491                                _ => { /* Do nothing */ }
492                            }
493                        }
494                        WebSocketMessage::Response(Response::NewSubscription {
495                            extractor_id,
496                            subscription_id,
497                        }) => {
498                            info!(?extractor_id, ?subscription_id, "Received a new subscription");
499                            let inner = guard
500                                .as_mut()
501                                .ok_or_else(|| DeltasError::NotConnected)?;
502                            inner.mark_active(&extractor_id, subscription_id);
503                        }
504                        WebSocketMessage::Response(Response::SubscriptionEnded {
505                            subscription_id,
506                        }) => {
507                            info!(?subscription_id, "Received a subscription ended");
508                            let inner = guard
509                                .as_mut()
510                                .ok_or_else(|| DeltasError::NotConnected)?;
511                            inner.remove_subscription(subscription_id)?;
512                        }
513                    },
514                    Err(e) => {
515                        error!(
516                            "Failed to deserialize WebSocketMessage: {}. \nMessage: {}",
517                            e, text
518                        );
519                    }
520                },
521                Err(e) => {
522                    error!(
523                        "Failed to deserialize message: invalid JSON. {} \nMessage: {}",
524                        e, text
525                    );
526                }
527            },
528            Ok(tungstenite::protocol::Message::Ping(_)) => {
529                // Respond to pings with pongs.
530                let inner = guard
531                    .as_mut()
532                    .ok_or_else(|| DeltasError::NotConnected)?;
533                if let Err(error) = inner
534                    .ws_send(tungstenite::protocol::Message::Pong(Vec::new()))
535                    .await
536                {
537                    debug!(?error, "Failed to send pong!");
538                }
539            }
540            Ok(tungstenite::protocol::Message::Pong(_)) => {
541                // Do nothing.
542            }
543            Ok(tungstenite::protocol::Message::Close(_)) => {
544                return Err(DeltasError::ConnectionClosed);
545            }
546            Ok(unknown_msg) => {
547                info!("Received an unknown message type: {:?}", unknown_msg);
548            }
549            Err(error) => {
550                error!(?error, "Websocket error");
551                return Err(match error {
552                    tungstenite::Error::ConnectionClosed => DeltasError::ConnectionClosed,
553                    tungstenite::Error::AlreadyClosed => {
554                        warn!("Received AlreadyClosed error which is indicative of a bug!");
555                        DeltasError::ConnectionError(Box::new(error))
556                    }
557                    tungstenite::Error::Io(_) | tungstenite::Error::Protocol(_) => {
558                        DeltasError::ConnectionError(Box::new(error))
559                    }
560                    _ => DeltasError::Fatal(error.to_string()),
561                });
562            }
563        };
564        Ok(())
565    }
566
567    /// Forcefully ends a (client) stream by unsubscribing.
568    ///
569    /// Is used only if the message can't be processed due to an error that might resolve
570    /// itself by resubscribing.
571    async fn force_unsubscribe(subscription_id: Uuid, inner: &mut Inner) {
572        let (tx, rx) = oneshot::channel();
573        if let Err(e) = WsDeltasClient::unsubscribe_inner(inner, subscription_id, tx).await {
574            warn!(?e, ?subscription_id, "Failed to send unsubscribe command");
575        } else {
576            // Wait for unsubscribe completion with timeout
577            match tokio::time::timeout(Duration::from_secs(5), rx).await {
578                Ok(_) => {
579                    debug!(?subscription_id, "Unsubscribe completed successfully");
580                }
581                Err(_) => {
582                    warn!(?subscription_id, "Unsubscribe completion timed out");
583                }
584            }
585        }
586    }
587
588    /// Helper method to force an unsubscription
589    ///
590    /// This method expects to receive a mutable reference to `Inner` so it does not acquire a
591    /// lock. Used for normal unsubscribes as well to remove any subscriptions with deallocated
592    /// receivers.
593    async fn unsubscribe_inner(
594        inner: &mut Inner,
595        subscription_id: Uuid,
596        ready_tx: oneshot::Sender<()>,
597    ) -> Result<(), DeltasError> {
598        debug!(?subscription_id, "Unsubscribing");
599        inner.end_subscription(&subscription_id, ready_tx);
600        let cmd = Command::Unsubscribe { subscription_id };
601        inner
602            .ws_send(tungstenite::protocol::Message::Text(serde_json::to_string(&cmd).map_err(
603                |e| {
604                    DeltasError::TransportError(format!(
605                        "Failed to serialize unsubscribe command: {e}"
606                    ))
607                },
608            )?))
609            .await?;
610        Ok(())
611    }
612}
613
614#[async_trait]
615impl DeltasClient for WsDeltasClient {
616    #[instrument(skip(self))]
617    async fn subscribe(
618        &self,
619        extractor_id: ExtractorIdentity,
620        options: SubscriptionOptions,
621    ) -> Result<(Uuid, Receiver<BlockChanges>), DeltasError> {
622        trace!("Starting subscribe");
623        self.ensure_connection().await?;
624        let (ready_tx, ready_rx) = oneshot::channel();
625        {
626            let mut guard = self.inner.lock().await;
627            let inner = guard
628                .as_mut()
629                .ok_or_else(|| DeltasError::NotConnected)?;
630            trace!("Sending subscribe command");
631            inner.new_subscription(&extractor_id, ready_tx)?;
632            let cmd = Command::Subscribe { extractor_id, include_state: options.include_state };
633            inner
634                .ws_send(tungstenite::protocol::Message::Text(
635                    serde_json::to_string(&cmd).map_err(|e| {
636                        DeltasError::TransportError(format!(
637                            "Failed to serialize subscribe command: {e}"
638                        ))
639                    })?,
640                ))
641                .await?;
642        }
643        trace!("Waiting for subscription response");
644        let rx = ready_rx.await.map_err(|_| {
645            DeltasError::TransportError("Subscription channel closed unexpectedly".to_string())
646        })?;
647        trace!("Subscription successful");
648        Ok(rx)
649    }
650
651    #[instrument(skip(self))]
652    async fn unsubscribe(&self, subscription_id: Uuid) -> Result<(), DeltasError> {
653        self.ensure_connection().await?;
654        let (ready_tx, ready_rx) = oneshot::channel();
655        {
656            let mut guard = self.inner.lock().await;
657            let inner = guard
658                .as_mut()
659                .ok_or_else(|| DeltasError::NotConnected)?;
660
661            WsDeltasClient::unsubscribe_inner(inner, subscription_id, ready_tx).await?;
662        }
663        ready_rx.await.map_err(|_| {
664            DeltasError::TransportError("Unsubscribe channel closed unexpectedly".to_string())
665        })?;
666
667        Ok(())
668    }
669
670    #[instrument(skip(self))]
671    async fn connect(&self) -> Result<JoinHandle<Result<(), DeltasError>>, DeltasError> {
672        if self.is_connected().await {
673            return Err(DeltasError::AlreadyConnected);
674        }
675        let ws_uri = format!("{uri}{TYCHO_SERVER_VERSION}/ws", uri = self.uri);
676        info!(?ws_uri, "Starting TychoWebsocketClient");
677
678        let (cmd_tx, mut cmd_rx) = mpsc::channel(self.ws_buffer_size);
679        {
680            let mut guard = self.inner.as_ref().lock().await;
681            *guard = None;
682        }
683        let this = self.clone();
684        let jh = tokio::spawn(async move {
685            let mut retry_count = 0;
686            let mut result = Err(DeltasError::NotConnected);
687
688            'retry: while retry_count < this.max_reconnects {
689                info!(?ws_uri, retry_count, "Connecting to WebSocket server");
690
691                // Create a WebSocket request
692                let mut request_builder = Request::builder()
693                    .uri(&ws_uri)
694                    .header(SEC_WEBSOCKET_KEY, generate_key())
695                    .header(SEC_WEBSOCKET_VERSION, 13)
696                    .header(CONNECTION, "Upgrade")
697                    .header(UPGRADE, "websocket")
698                    .header(
699                        HOST,
700                        this.uri.host().ok_or_else(|| {
701                            DeltasError::UriParsing(
702                                ws_uri.clone(),
703                                "No host found in tycho url".to_string(),
704                            )
705                        })?,
706                    )
707                    .header(
708                        USER_AGENT,
709                        format!("tycho-client-{version}", version = env!("CARGO_PKG_VERSION")),
710                    );
711
712                // Add Authorization if one is given
713                if let Some(ref key) = this.auth_key {
714                    request_builder = request_builder.header(AUTHORIZATION, key);
715                }
716
717                let request = request_builder.body(()).map_err(|e| {
718                    DeltasError::TransportError(format!("Failed to build connection request: {e}"))
719                })?;
720                let (conn, _) = match connect_async(request).await {
721                    Ok(conn) => conn,
722                    Err(e) => {
723                        // Prepare for reconnection
724                        retry_count += 1;
725                        let mut guard = this.inner.as_ref().lock().await;
726                        *guard = None;
727
728                        warn!(
729                            e = e.to_string(),
730                            "Failed to connect to WebSocket server; Reconnecting"
731                        );
732                        sleep(Duration::from_millis(500)).await;
733
734                        continue 'retry;
735                    }
736                };
737
738                let (ws_tx_new, ws_rx_new) = conn.split();
739                {
740                    let mut guard = this.inner.as_ref().lock().await;
741                    *guard =
742                        Some(Inner::new(cmd_tx.clone(), ws_tx_new, this.subscription_buffer_size));
743                }
744                let mut msg_rx = ws_rx_new.boxed();
745
746                info!("Connection Successful: TychoWebsocketClient started");
747                this.conn_notify.notify_waiters();
748                result = Ok(());
749
750                loop {
751                    let res = tokio::select! {
752                        msg = msg_rx.next() => match msg {
753                            Some(msg) => this.handle_msg(msg).await,
754                            None => { break 'retry } // ws connection silently closed
755                        },
756                        _ = cmd_rx.recv() => {break 'retry},
757                    };
758                    if let Err(error) = res {
759                        debug!(?error, "WsError");
760                        if matches!(
761                            error,
762                            DeltasError::ConnectionClosed | DeltasError::ConnectionError { .. }
763                        ) {
764                            // Prepare for reconnection
765                            retry_count += 1;
766                            let mut guard = this.inner.as_ref().lock().await;
767                            *guard = None;
768
769                            warn!(
770                                ?error,
771                                ?retry_count,
772                                "Connection dropped unexpectedly; Reconnecting..."
773                            );
774                            break;
775                        } else {
776                            // Other errors are considered fatal
777                            error!(?error, "Fatal error; Exiting");
778                            result = Err(error);
779                            break 'retry;
780                        }
781                    }
782                }
783            }
784            debug!(
785                retry_count,
786                max_reconnects=?this.max_reconnects,
787                "Reconnection loop ended"
788            );
789            // Clean up before exiting
790            let mut guard = this.inner.as_ref().lock().await;
791            *guard = None;
792
793            // Check if max retries has been reached.
794            if retry_count >= this.max_reconnects {
795                error!("Max reconnection attempts reached; Exiting");
796                this.dead.store(true, Ordering::SeqCst);
797                this.conn_notify.notify_waiters(); // Notify that the task is done
798                result = Err(DeltasError::ConnectionClosed);
799            }
800
801            result
802        });
803
804        self.conn_notify.notified().await;
805
806        if self.is_connected().await {
807            Ok(jh)
808        } else {
809            Err(DeltasError::NotConnected)
810        }
811    }
812
813    #[instrument(skip(self))]
814    async fn close(&self) -> Result<(), DeltasError> {
815        info!("Closing TychoWebsocketClient");
816        let mut guard = self.inner.lock().await;
817        let inner = guard
818            .as_mut()
819            .ok_or_else(|| DeltasError::NotConnected)?;
820        inner
821            .cmd_tx
822            .send(())
823            .await
824            .map_err(|e| DeltasError::TransportError(e.to_string()))?;
825        Ok(())
826    }
827}
828
829#[cfg(test)]
830mod tests {
831    use std::net::SocketAddr;
832
833    use test_log::test;
834    use tokio::{net::TcpListener, time::timeout};
835    use tycho_common::dto::Chain;
836
837    use super::*;
838
839    #[derive(Clone)]
840    enum ExpectedComm {
841        Receive(u64, tungstenite::protocol::Message),
842        Send(tungstenite::protocol::Message),
843    }
844
845    async fn mock_tycho_ws(
846        messages: &[ExpectedComm],
847        reconnects: usize,
848    ) -> (SocketAddr, JoinHandle<()>) {
849        info!("Starting mock webserver");
850        // zero port here means the OS chooses an open port
851        let server = TcpListener::bind("127.0.0.1:0")
852            .await
853            .expect("localhost bind failed");
854        let addr = server.local_addr().unwrap();
855        let messages = messages.to_vec();
856
857        let jh = tokio::spawn(async move {
858            info!("mock webserver started");
859            for _ in 0..(reconnects + 1) {
860                if let Ok((stream, _)) = server.accept().await {
861                    let mut websocket = tokio_tungstenite::accept_async(stream)
862                        .await
863                        .unwrap();
864
865                    info!("Handling messages..");
866                    for c in messages.iter().cloned() {
867                        match c {
868                            ExpectedComm::Receive(t, exp) => {
869                                info!("Awaiting message...");
870                                let msg = timeout(Duration::from_millis(t), websocket.next())
871                                    .await
872                                    .expect("Receive timeout")
873                                    .expect("Stream exhausted")
874                                    .expect("Failed to receive message.");
875                                info!("Message received");
876                                assert_eq!(msg, exp)
877                            }
878                            ExpectedComm::Send(data) => {
879                                info!("Sending message");
880                                websocket
881                                    .send(data)
882                                    .await
883                                    .expect("Failed to send message");
884                                info!("Message sent");
885                            }
886                        };
887                    }
888                    sleep(Duration::from_millis(100)).await;
889                    // Close the WebSocket connection
890                    let _ = websocket.close(None).await;
891                }
892            }
893        });
894        (addr, jh)
895    }
896
897    #[tokio::test]
898    async fn test_subscribe_receive() {
899        let exp_comm = [
900            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
901                {
902                    "method":"subscribe",
903                    "extractor_id":{
904                        "chain":"ethereum",
905                        "name":"vm:ambient"
906                    },
907                    "include_state": true
908                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
909            )),
910            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
911                {
912                    "method":"newsubscription",
913                    "extractor_id":{
914                    "chain":"ethereum",
915                    "name":"vm:ambient"
916                    },
917                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
918                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
919            )),
920            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
921                {
922                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
923                    "deltas": {
924                        "extractor": "vm:ambient",
925                        "chain": "ethereum",
926                        "block": {
927                            "number": 123,
928                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
929                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
930                            "chain": "ethereum",             
931                            "ts": "2023-09-14T00:00:00"
932                        },
933                        "finalized_block_height": 0,
934                        "revert": false,
935                        "new_tokens": {},
936                        "account_updates": {
937                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
938                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
939                                "chain": "ethereum",
940                                "slots": {},
941                                "balance": "0x01f4",
942                                "code": "",
943                                "change": "Update"
944                            }
945                        },
946                        "state_updates": {
947                            "component_1": {
948                                "component_id": "component_1",
949                                "updated_attributes": {"attr1": "0x01"},
950                                "deleted_attributes": ["attr2"]
951                            }
952                        },
953                        "new_protocol_components": 
954                            { "protocol_1": {
955                                    "id": "protocol_1",
956                                    "protocol_system": "system_1",
957                                    "protocol_type_name": "type_1",
958                                    "chain": "ethereum",
959                                    "tokens": ["0x01", "0x02"],
960                                    "contract_ids": ["0x01", "0x02"],
961                                    "static_attributes": {"attr1": "0x01f4"},
962                                    "change": "Update",
963                                    "creation_tx": "0x01",
964                                    "created_at": "2023-09-14T00:00:00"
965                                }
966                            },
967                        "deleted_protocol_components": {},
968                        "component_balances": {
969                            "protocol_1":
970                                {
971                                    "0x01": {
972                                        "token": "0x01",
973                                        "balance": "0x01f4",
974                                        "balance_float": 0.0,
975                                        "modify_tx": "0x01",
976                                        "component_id": "protocol_1"
977                                    }
978                                }
979                        },
980                        "account_balances": {
981                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
982                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
983                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
984                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
985                                    "balance": "0x01f4",
986                                    "modify_tx": "0x01"
987                                }
988                            }
989                        },
990                        "component_tvl": {
991                            "protocol_1": 1000.0
992                        },
993                        "dci_update": {
994                            "new_entrypoints": {},
995                            "new_entrypoint_params": {},
996                            "trace_results": {}
997                        }
998                    }
999                }
1000                "#.to_owned()
1001            ))
1002        ];
1003        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1004
1005        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1006        let jh = client
1007            .connect()
1008            .await
1009            .expect("connect failed");
1010        let (_, mut rx) = timeout(
1011            Duration::from_millis(100),
1012            client.subscribe(
1013                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1014                SubscriptionOptions::new(),
1015            ),
1016        )
1017        .await
1018        .expect("subscription timed out")
1019        .expect("subscription failed");
1020        let _ = timeout(Duration::from_millis(100), rx.recv())
1021            .await
1022            .expect("awaiting message timeout out")
1023            .expect("receiving message failed");
1024        timeout(Duration::from_millis(100), client.close())
1025            .await
1026            .expect("close timed out")
1027            .expect("close failed");
1028        jh.await
1029            .expect("ws loop errored")
1030            .unwrap();
1031        server_thread.await.unwrap();
1032    }
1033
1034    #[tokio::test]
1035    async fn test_unsubscribe() {
1036        let exp_comm = [
1037            ExpectedComm::Receive(
1038                100,
1039                tungstenite::protocol::Message::Text(
1040                    r#"
1041                {
1042                    "method": "subscribe",
1043                    "extractor_id":{
1044                        "chain": "ethereum",
1045                        "name": "vm:ambient"
1046                    },
1047                    "include_state": true
1048                }"#
1049                    .to_owned()
1050                    .replace(|c: char| c.is_whitespace(), ""),
1051                ),
1052            ),
1053            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1054                r#"
1055                {
1056                    "method": "newsubscription",
1057                    "extractor_id":{
1058                        "chain": "ethereum",
1059                        "name": "vm:ambient"
1060                    },
1061                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1062                }"#
1063                .to_owned()
1064                .replace(|c: char| c.is_whitespace(), ""),
1065            )),
1066            ExpectedComm::Receive(
1067                100,
1068                tungstenite::protocol::Message::Text(
1069                    r#"
1070                {
1071                    "method": "unsubscribe",
1072                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1073                }
1074                "#
1075                    .to_owned()
1076                    .replace(|c: char| c.is_whitespace(), ""),
1077                ),
1078            ),
1079            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1080                r#"
1081                {
1082                    "method": "subscriptionended",
1083                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1084                }
1085                "#
1086                .to_owned()
1087                .replace(|c: char| c.is_whitespace(), ""),
1088            )),
1089        ];
1090        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1091
1092        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1093        let jh = client
1094            .connect()
1095            .await
1096            .expect("connect failed");
1097        let (sub_id, mut rx) = timeout(
1098            Duration::from_millis(100),
1099            client.subscribe(
1100                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1101                SubscriptionOptions::new(),
1102            ),
1103        )
1104        .await
1105        .expect("subscription timed out")
1106        .expect("subscription failed");
1107
1108        timeout(Duration::from_millis(100), client.unsubscribe(sub_id))
1109            .await
1110            .expect("unsubscribe timed out")
1111            .expect("unsubscribe failed");
1112        let res = timeout(Duration::from_millis(100), rx.recv())
1113            .await
1114            .expect("awaiting message timeout out");
1115
1116        // If the subscription ended, the channel should have been closed.
1117        assert!(res.is_none());
1118
1119        timeout(Duration::from_millis(100), client.close())
1120            .await
1121            .expect("close timed out")
1122            .expect("close failed");
1123        jh.await
1124            .expect("ws loop errored")
1125            .unwrap();
1126        server_thread.await.unwrap();
1127    }
1128
1129    #[tokio::test]
1130    async fn test_subscription_unexpected_end() {
1131        let exp_comm = [
1132            ExpectedComm::Receive(
1133                100,
1134                tungstenite::protocol::Message::Text(
1135                    r#"
1136                {
1137                    "method":"subscribe",
1138                    "extractor_id":{
1139                        "chain":"ethereum",
1140                        "name":"vm:ambient"
1141                    },
1142                    "include_state": true
1143                }"#
1144                    .to_owned()
1145                    .replace(|c: char| c.is_whitespace(), ""),
1146                ),
1147            ),
1148            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1149                r#"
1150                {
1151                    "method":"newsubscription",
1152                    "extractor_id":{
1153                        "chain":"ethereum",
1154                        "name":"vm:ambient"
1155                    },
1156                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1157                }"#
1158                .to_owned()
1159                .replace(|c: char| c.is_whitespace(), ""),
1160            )),
1161            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1162                r#"
1163                {
1164                    "method": "subscriptionended",
1165                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1166                }"#
1167                .to_owned()
1168                .replace(|c: char| c.is_whitespace(), ""),
1169            )),
1170        ];
1171        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1172
1173        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1174        let jh = client
1175            .connect()
1176            .await
1177            .expect("connect failed");
1178        let (_, mut rx) = timeout(
1179            Duration::from_millis(100),
1180            client.subscribe(
1181                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1182                SubscriptionOptions::new(),
1183            ),
1184        )
1185        .await
1186        .expect("subscription timed out")
1187        .expect("subscription failed");
1188        let res = timeout(Duration::from_millis(100), rx.recv())
1189            .await
1190            .expect("awaiting message timeout out");
1191
1192        // If the subscription ended, the channel should have been closed.
1193        assert!(res.is_none());
1194
1195        timeout(Duration::from_millis(100), client.close())
1196            .await
1197            .expect("close timed out")
1198            .expect("close failed");
1199        jh.await
1200            .expect("ws loop errored")
1201            .unwrap();
1202        server_thread.await.unwrap();
1203    }
1204
1205    #[test_log::test(tokio::test)]
1206    async fn test_reconnect() {
1207        let exp_comm = [
1208            ExpectedComm::Receive(100, tungstenite::protocol::Message::Text(r#"
1209                {
1210                    "method":"subscribe",
1211                    "extractor_id":{
1212                        "chain":"ethereum",
1213                        "name":"vm:ambient"
1214                    },
1215                    "include_state": true
1216                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1217            )),
1218            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1219                {
1220                    "method":"newsubscription",
1221                    "extractor_id":{
1222                    "chain":"ethereum",
1223                    "name":"vm:ambient"
1224                    },
1225                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1226                }"#.to_owned().replace(|c: char| c.is_whitespace(), "")
1227            )),
1228            ExpectedComm::Send(tungstenite::protocol::Message::Text(r#"
1229                {
1230                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1231                    "deltas": {
1232                        "extractor": "vm:ambient",
1233                        "chain": "ethereum",
1234                        "block": {
1235                            "number": 123,
1236                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1237                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1238                            "chain": "ethereum",             
1239                            "ts": "2023-09-14T00:00:00"
1240                        },
1241                        "finalized_block_height": 0,
1242                        "revert": false,
1243                        "new_tokens": {},
1244                        "account_updates": {
1245                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1246                                "address": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1247                                "chain": "ethereum",
1248                                "slots": {},
1249                                "balance": "0x01f4",
1250                                "code": "",
1251                                "change": "Update"
1252                            }
1253                        },
1254                        "state_updates": {
1255                            "component_1": {
1256                                "component_id": "component_1",
1257                                "updated_attributes": {"attr1": "0x01"},
1258                                "deleted_attributes": ["attr2"]
1259                            }
1260                        },
1261                        "new_protocol_components": {
1262                            "protocol_1":
1263                                {
1264                                    "id": "protocol_1",
1265                                    "protocol_system": "system_1",
1266                                    "protocol_type_name": "type_1",
1267                                    "chain": "ethereum",
1268                                    "tokens": ["0x01", "0x02"],
1269                                    "contract_ids": ["0x01", "0x02"],
1270                                    "static_attributes": {"attr1": "0x01f4"},
1271                                    "change": "Update",
1272                                    "creation_tx": "0x01",
1273                                    "created_at": "2023-09-14T00:00:00"
1274                                }
1275                            },
1276                        "deleted_protocol_components": {},
1277                        "component_balances": {
1278                            "protocol_1": {
1279                                "0x01": {
1280                                    "token": "0x01",
1281                                    "balance": "0x01f4",
1282                                    "balance_float": 1000.0,
1283                                    "modify_tx": "0x01",
1284                                    "component_id": "protocol_1"
1285                                }
1286                            }
1287                        },
1288                        "account_balances": {
1289                            "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1290                                "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": {
1291                                    "account": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1292                                    "token": "0x7a250d5630b4cf539739df2c5dacb4c659f2488d",
1293                                    "balance": "0x01f4",
1294                                    "modify_tx": "0x01"
1295                                }
1296                            }
1297                        },
1298                        "component_tvl": {
1299                            "protocol_1": 1000.0
1300                        },
1301                        "dci_update": {
1302                            "new_entrypoints": {},
1303                            "new_entrypoint_params": {},
1304                            "trace_results": {}
1305                        }
1306                    }
1307                }
1308                "#.to_owned()
1309            ))
1310        ];
1311        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 1).await;
1312        let client = WsDeltasClient::new(&format!("ws://{addr}"), None).unwrap();
1313        let jh: JoinHandle<Result<(), DeltasError>> = client
1314            .connect()
1315            .await
1316            .expect("connect failed");
1317
1318        for _ in 0..2 {
1319            let (_, mut rx) = timeout(
1320                Duration::from_millis(100),
1321                client.subscribe(
1322                    ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1323                    SubscriptionOptions::new(),
1324                ),
1325            )
1326            .await
1327            .expect("subscription timed out")
1328            .expect("subscription failed");
1329
1330            let _ = timeout(Duration::from_millis(100), rx.recv())
1331                .await
1332                .expect("awaiting message timeout out")
1333                .expect("receiving message failed");
1334
1335            // wait for the connection to drop
1336            let res = timeout(Duration::from_millis(200), rx.recv())
1337                .await
1338                .expect("awaiting closed connection timeout out");
1339            assert!(res.is_none());
1340        }
1341        let res = jh.await.expect("ws client join failed");
1342        // 5th client reconnect attempt should fail
1343        assert!(res.is_err());
1344        server_thread
1345            .await
1346            .expect("ws server loop errored");
1347    }
1348
1349    async fn mock_bad_connection_tycho_ws(accept_first: bool) -> (SocketAddr, JoinHandle<()>) {
1350        let server = TcpListener::bind("127.0.0.1:0")
1351            .await
1352            .expect("localhost bind failed");
1353        let addr = server.local_addr().unwrap();
1354        let jh = tokio::spawn(async move {
1355            while let Ok((stream, _)) = server.accept().await {
1356                if accept_first {
1357                    // Send connection handshake to accept the connection (fail later)
1358                    let stream = tokio_tungstenite::accept_async(stream)
1359                        .await
1360                        .unwrap();
1361                    sleep(Duration::from_millis(10)).await;
1362                    drop(stream)
1363                } else {
1364                    // Close the connection to simulate a failure
1365                    drop(stream);
1366                }
1367            }
1368        });
1369        (addr, jh)
1370    }
1371
1372    #[test(tokio::test)]
1373    async fn test_subscribe_dead_client_after_max_attempts() {
1374        let (addr, _) = mock_bad_connection_tycho_ws(true).await;
1375        let client = WsDeltasClient::new_with_reconnects(&format!("ws://{addr}"), 3, None).unwrap();
1376
1377        let join_handle = client.connect().await.unwrap();
1378        let handle_res = join_handle.await.unwrap();
1379        assert!(handle_res.is_err());
1380        assert!(!client.is_connected().await);
1381
1382        let subscription_res = timeout(
1383            Duration::from_millis(10),
1384            client.subscribe(
1385                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1386                SubscriptionOptions::new(),
1387            ),
1388        )
1389        .await
1390        .unwrap();
1391        assert!(subscription_res.is_err());
1392    }
1393
1394    #[test_log::test(tokio::test)]
1395    async fn test_buffer_full_triggers_unsubscribe() {
1396        // Expected communication sequence for buffer full scenario
1397        let exp_comm = {
1398            [
1399            // 1. Client subscribes
1400            ExpectedComm::Receive(
1401                100,
1402                tungstenite::protocol::Message::Text(
1403                    r#"
1404                {
1405                    "method":"subscribe",
1406                    "extractor_id":{
1407                        "chain":"ethereum",
1408                        "name":"vm:ambient"
1409                    },
1410                    "include_state": true
1411                }"#
1412                    .to_owned()
1413                    .replace(|c: char| c.is_whitespace(), ""),
1414                ),
1415            ),
1416            // 2. Server confirms subscription
1417            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1418                r#"
1419                {
1420                    "method":"newsubscription",
1421                    "extractor_id":{
1422                        "chain":"ethereum",
1423                        "name":"vm:ambient"
1424                    },
1425                    "subscription_id":"30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1426                }"#
1427                .to_owned()
1428                .replace(|c: char| c.is_whitespace(), ""),
1429            )),
1430            // 3. Server sends first message (fills buffer)
1431            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1432                r#"
1433                {
1434                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1435                    "deltas": {
1436                        "extractor": "vm:ambient",
1437                        "chain": "ethereum",
1438                        "block": {
1439                            "number": 123,
1440                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1441                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1442                            "chain": "ethereum",
1443                            "ts": "2023-09-14T00:00:00"
1444                        },
1445                        "finalized_block_height": 0,
1446                        "revert": false,
1447                        "new_tokens": {},
1448                        "account_updates": {},
1449                        "state_updates": {},
1450                        "new_protocol_components": {},
1451                        "deleted_protocol_components": {},
1452                        "component_balances": {},
1453                        "account_balances": {},
1454                        "component_tvl": {},
1455                        "dci_update": {
1456                            "new_entrypoints": {},
1457                            "new_entrypoint_params": {},
1458                            "trace_results": {}
1459                        }
1460                    }
1461                }
1462                "#.to_owned()
1463            )),
1464            // 4. Server sends second message (triggers buffer overflow and force unsubscribe)
1465            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1466                r#"
1467                {
1468                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece",
1469                    "deltas": {
1470                        "extractor": "vm:ambient",
1471                        "chain": "ethereum",
1472                        "block": {
1473                            "number": 124,
1474                            "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
1475                            "parent_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
1476                            "chain": "ethereum",
1477                            "ts": "2023-09-14T00:00:01"
1478                        },
1479                        "finalized_block_height": 0,
1480                        "revert": false,
1481                        "new_tokens": {},
1482                        "account_updates": {},
1483                        "state_updates": {},
1484                        "new_protocol_components": {},
1485                        "deleted_protocol_components": {},
1486                        "component_balances": {},
1487                        "account_balances": {},
1488                        "component_tvl": {},
1489                        "dci_update": {
1490                            "new_entrypoints": {},
1491                            "new_entrypoint_params": {},
1492                            "trace_results": {}
1493                        }
1494                    }
1495                }
1496                "#.to_owned()
1497            )),
1498            // 5. Expect unsubscribe command due to buffer full
1499            ExpectedComm::Receive(
1500                100,
1501                tungstenite::protocol::Message::Text(
1502                    r#"
1503                {
1504                    "method": "unsubscribe",
1505                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1506                }
1507                "#
1508                    .to_owned()
1509                    .replace(|c: char| c.is_whitespace(), ""),
1510                ),
1511            ),
1512            // 6. Server confirms unsubscription
1513            ExpectedComm::Send(tungstenite::protocol::Message::Text(
1514                r#"
1515                {
1516                    "method": "subscriptionended",
1517                    "subscription_id": "30b740d1-cf09-4e0e-8cfe-b1434d447ece"
1518                }
1519                "#
1520                .to_owned()
1521                .replace(|c: char| c.is_whitespace(), ""),
1522            )),
1523        ]
1524        };
1525
1526        let (addr, server_thread) = mock_tycho_ws(&exp_comm, 0).await;
1527
1528        // Create client with very small buffer size (1) to easily trigger BufferFull
1529        let client = WsDeltasClient::new_with_custom_buffers(
1530            &format!("ws://{addr}"),
1531            None,
1532            128, // ws_buffer_size
1533            1,   // subscription_buffer_size - this will trigger BufferFull easily
1534        )
1535        .unwrap();
1536
1537        let jh = client
1538            .connect()
1539            .await
1540            .expect("connect failed");
1541
1542        let (_sub_id, mut rx) = timeout(
1543            Duration::from_millis(100),
1544            client.subscribe(
1545                ExtractorIdentity::new(Chain::Ethereum, "vm:ambient"),
1546                SubscriptionOptions::new(),
1547            ),
1548        )
1549        .await
1550        .expect("subscription timed out")
1551        .expect("subscription failed");
1552
1553        // Allow time for messages to be processed and buffer to fill up
1554        tokio::time::sleep(Duration::from_millis(100)).await;
1555
1556        // Collect all messages until channel closes or we get a reasonable number
1557        let mut received_msgs = Vec::new();
1558
1559        // Use a single longer timeout to collect messages until channel closes
1560        while received_msgs.len() < 3 {
1561            match timeout(Duration::from_millis(200), rx.recv()).await {
1562                Ok(Some(msg)) => {
1563                    received_msgs.push(msg);
1564                }
1565                Ok(None) => {
1566                    // Channel closed - this is what we expect after buffer overflow
1567                    break;
1568                }
1569                Err(_) => {
1570                    // Timeout - no more messages coming
1571                    break;
1572                }
1573            }
1574        }
1575
1576        // Verify the key behavior: buffer overflow should limit messages and close channel
1577        assert!(
1578            received_msgs.len() <= 1,
1579            "Expected buffer overflow to limit messages to at most 1, got {}",
1580            received_msgs.len()
1581        );
1582
1583        if let Some(first_msg) = received_msgs.first() {
1584            assert_eq!(first_msg.block.number, 123, "Expected first message with block 123");
1585        }
1586
1587        // Test passed! The key behavior we're testing (buffer full causes force unsubscribe) has
1588        // been verified We don't need to explicitly close the client as it will be cleaned
1589        // up when dropped
1590
1591        // Just wait for the tasks to finish cleanly
1592        drop(rx); // Explicitly drop the receiver
1593        tokio::time::sleep(Duration::from_millis(50)).await;
1594
1595        // Abort the tasks to clean up
1596        jh.abort();
1597        server_thread.abort();
1598
1599        let _ = jh.await;
1600        let _ = server_thread.await;
1601    }
1602}