bothan_lib/worker/
websocket.rs

1//! WebSocket-based asset information providers and streaming mechanisms.
2//!
3//! This module provides functionality for streaming asset information in real-time
4//! from WebSocket APIs. It defines traits for WebSocket connectors and providers,
5//! as well as a function for starting a listening loop.
6//!
7//! The module provides:
8//!
9//! - The [`AssetInfoProviderConnector`] trait for creating WebSocket connections
10//! - The [`AssetInfoProvider`] trait for managing WebSocket subscriptions and data
11//! - The [`Data`] enum for different types of received data
12//! - The [`start_listening`] function which implements the WebSocket listener loop
13//!
14//! # WebSocket Strategy
15//!
16//! The WebSocket strategy follows these principles:
17//!
18//! 1. **Persistent Connections**: Maintains persistent WebSocket connections for real-time data
19//! 2. **Automatic Reconnection**: Automatically reconnects with exponential backoff if the connection is lost
20//! 3. **Subscription Management**: Provides a standard way to subscribe to asset updates
21//! 4. **Error Resilience**: Handles connection failures and data errors gracefully
22//!
23//! When implementing new WebSocket-based asset providers, implement both the
24//! [`AssetInfoProviderConnector`] and [`AssetInfoProvider`] traits, and use the
25//! [`start_listening`] function to handle the connection lifecycle.
26
27use std::fmt::Display;
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use tokio::select;
32use tokio::time::{sleep, timeout};
33use tokio_util::sync::CancellationToken;
34use tracing::{debug, error, info};
35
36use crate::metrics::websocket::{ConnectionResult, MessageType, Metrics};
37use crate::store::{Store, WorkerStore};
38use crate::types::AssetInfo;
39
/// Represents different types of data that can be received from a WebSocket connection.
///
/// Providers return this from [`AssetInfoProvider::next`] so that the listener loop
/// in [`start_listening`] can tell apart messages that carry asset updates from
/// messages that only need to be counted.
///
/// # Variants
///
/// * `AssetInfo` - Contains asset information updates to be stored
/// * `Ping` - Represents a ping message from the server
/// * `Unused` - Represents data that is not relevant to asset information updates
pub enum Data {
    /// Asset information updates to be stored in the system.
    ///
    /// This variant contains a vector of [`AssetInfo`] structures.
    /// [`start_listening`] passes the whole vector to the store's
    /// `set_batch_asset_info` and logs the outcome.
    AssetInfo(Vec<AssetInfo>),

    /// A ping message sent by the server.
    ///
    /// [`start_listening`] only records these in the metrics; nothing is stored.
    Ping,

    /// Data that is not relevant to asset information updates.
    ///
    /// This variant is used for messages that should be acknowledged but don't
    /// contain asset information, such as heartbeats or subscription acknowledgments.
    /// [`start_listening`] counts them in the metrics and emits a debug log.
    Unused,
}
66
/// Trait for factory objects that can establish connections to WebSocket providers.
///
/// This trait separates the connection establishment logic from the provider itself,
/// allowing for cleaner error handling and reconnection strategies. Implementors
/// should handle the initial connection setup and return a provider that's ready
/// to subscribe to asset information.
///
/// The connector is borrowed (`&self`) for every attempt, so a single connector
/// instance can be reused to create a fresh provider each time a connection
/// has to be re-established.
#[async_trait::async_trait]
pub trait AssetInfoProviderConnector: Send + Sync {
    /// The type of provider that this connector creates.
    ///
    /// This should be a type that implements the [`AssetInfoProvider`] trait.
    type Provider: AssetInfoProvider;

    /// The type returned in the event of a connection failure.
    ///
    /// This should be a custom error type that implements the [`Display`] trait
    /// and captures all possible error conditions during connection.
    type Error: Display;

    /// Establishes a connection to the WebSocket and returns a provider.
    ///
    /// This method should handle the initial WebSocket connection setup,
    /// including any authentication or handshaking required by the API.
    /// Subscribing to assets is not part of this step; it is done afterwards
    /// through [`AssetInfoProvider::subscribe`] on the returned provider.
    ///
    /// # Errors
    ///
    /// Returns a connector-specific error if the connection fails, such as when
    /// the API is unavailable, authentication fails, or the connection can't be
    /// established for any other reason.
    async fn connect(&self) -> Result<Self::Provider, Self::Error>;
}
98
/// Trait for providers that can stream asset information from WebSocket APIs.
///
/// This trait defines the interface that WebSocket-based asset information providers
/// must implement. Providers are responsible for subscribing to asset updates,
/// receiving and parsing WebSocket messages, and converting them into [`Data`] structures.
#[async_trait::async_trait]
pub trait AssetInfoProvider: Send + Sync {
    /// The type returned in the event of a subscription failure.
    ///
    /// This should be a custom error type that implements the [`Display`] trait
    /// and captures all possible error conditions during subscription.
    type SubscriptionError: Display;

    /// The type returned in the event of a message reception failure.
    ///
    /// This should be a custom error type that implements the [`Display`] trait
    /// and captures all possible error conditions during message reception.
    type ListeningError: Display;

    /// Subscribes to asset updates for the specified asset IDs.
    ///
    /// This method should send subscription requests to the WebSocket API
    /// for each of the specified asset IDs, configuring the connection to
    /// receive updates for these assets.
    ///
    /// # Errors
    ///
    /// Returns a subscription-specific error if the operation fails, such as when
    /// the API rejects the subscription request or the request can't be sent.
    async fn subscribe(&mut self, ids: &[String]) -> Result<(), Self::SubscriptionError>;

    /// Waits for and returns the next data update from the WebSocket.
    ///
    /// This method should wait for the next WebSocket message, parse it,
    /// and return the appropriate data structure. It returns `None` if the
    /// connection has been closed.
    ///
    /// [`start_listening`] treats `None` the same way as a receive timeout:
    /// the provider is discarded and a new connection is established. An
    /// `Err` value, by contrast, is only logged and counted; the same
    /// provider keeps being polled afterwards.
    ///
    /// # Returns
    ///
    /// * `Some(Ok(Data))` - If a message was successfully received and parsed
    /// * `Some(Err(ListeningError))` - If there was an error receiving or parsing the message
    /// * `None` - If the connection has been closed
    async fn next(&mut self) -> Option<Result<Data, Self::ListeningError>>;

    /// Attempts to gracefully close the WebSocket connection.
    ///
    /// This method should send a close frame to the WebSocket server
    /// and perform any necessary cleanup. It may fail silently if the
    /// connection is already closed.
    ///
    /// The provider is taken by value, so it cannot be used again after
    /// this call. No result is returned; failures are not reported to the caller.
    async fn try_close(mut self);
}
150
151/// Starts listening for asset information from a WebSocket provider.
152///
153/// This function implements a listener loop that continuously receives asset
154/// information from a WebSocket provider and stores it using the provided worker store.
155/// The loop continues until the cancellation token is triggered.
156///
157/// # Features
158///
159/// * Maintains persistent WebSocket connections
160/// * Automatically reconnects with exponential backoff if the connection is lost
161/// * Monitors for timeouts to detect connection issues
162/// * Handles errors gracefully by logging them and continuing
163/// * Cancels listening gracefully when requested via the cancellation token
164#[tracing::instrument(skip(
165    cancellation_token,
166    provider_connector,
167    store,
168    ids,
169    connection_timeout
170))]
171pub async fn start_listening<S, E1, E2, P, C>(
172    cancellation_token: CancellationToken,
173    provider_connector: Arc<C>,
174    store: WorkerStore<S>,
175    ids: Vec<String>,
176    connection_timeout: Duration,
177    metrics: Metrics,
178) where
179    E1: Display,
180    E2: Display,
181    S: Store,
182    P: AssetInfoProvider<SubscriptionError = E1, ListeningError = E2>,
183    C: AssetInfoProviderConnector<Provider = P>,
184{
185    let mut connection = connect(provider_connector.as_ref(), &ids, &metrics).await;
186    loop {
187        select! {
188            _ = cancellation_token.cancelled() => break,
189            result = timeout(connection_timeout, connection.next()) => {
190                    match result {
191                        // If timeout, we assume the connection has been dropped, and we attempt to reconnect
192                        Err(_) | Ok(None) => {
193                            let new_conn = connect(provider_connector.as_ref(), &ids, &metrics).await;
194                            connection = new_conn
195                        }
196                        Ok(Some(Ok(Data::AssetInfo(assets)))) => {
197                            metrics.increment_activity_messages_total(MessageType::AssetInfo);
198                            if let Err(e) = store.set_batch_asset_info(assets).await {
199                                error!("failed to set asset info with error: {e}")
200                            } else {
201                                info!("asset info updated successfully");
202                            }
203                        }
204                        Ok(Some(Ok(Data::Ping))) => metrics.increment_activity_messages_total(MessageType::Ping),
205                        Ok(Some(Ok(Data::Unused))) => {
206                            metrics.increment_activity_messages_total(MessageType::Unused);
207                            debug!("received irrelevant data");
208                        },
209                        Ok(Some(Err(e))) => {
210                            metrics.increment_activity_messages_total(MessageType::Error);
211                            error!("{}", e);
212                        },
213                    }
214            }
215        }
216    }
217}
218
219/// Helper function to establish a connection with exponential backoff.
220///
221/// This function attempts to connect to the WebSocket provider and subscribe
222/// to the specified asset IDs. If the connection or subscription fails, it will
223/// retry with an exponential backoff strategy.
224///
225/// # Returns
226///
227/// A connected and subscribed provider ready to receive WebSocket messages
228async fn connect<C, P, E1, E2>(connector: &C, ids: &[String], metrics: &Metrics) -> P
229where
230    P: AssetInfoProvider<SubscriptionError = E1, ListeningError = E2>,
231    C: AssetInfoProviderConnector<Provider = P>,
232{
233    let mut retry_count = 0;
234    let mut backoff = Duration::from_secs(1);
235    let max_backoff = Duration::from_secs(64);
236
237    loop {
238        let start_time = Instant::now();
239
240        if let Ok(mut provider) = connector.connect().await {
241            if provider.subscribe(ids).await.is_ok() {
242                metrics.update_websocket_connection(
243                    start_time.elapsed().as_millis(),
244                    ConnectionResult::Success,
245                );
246                return provider;
247            }
248        }
249
250        retry_count += 1;
251        if backoff < max_backoff {
252            backoff *= 2;
253        }
254
255        metrics.update_websocket_connection(
256            start_time.elapsed().as_millis(),
257            ConnectionResult::Failed,
258        );
259        error!("failed to reconnect. current attempt: {}", retry_count);
260        sleep(backoff).await;
261    }
262}