bothan_lib/worker/websocket.rs
1//! WebSocket-based asset information providers and streaming mechanisms.
2//!
3//! This module provides functionality for streaming asset information in real-time
4//! from WebSocket APIs. It defines traits for WebSocket connectors and providers,
5//! as well as a function for starting a listening loop.
6//!
7//! The module provides:
8//!
9//! - The [`AssetInfoProviderConnector`] trait for creating WebSocket connections
10//! - The [`AssetInfoProvider`] trait for managing WebSocket subscriptions and data
11//! - The [`Data`] enum for different types of received data
12//! - The [`start_listening`] function which implements the WebSocket listener loop
13//!
14//! # WebSocket Strategy
15//!
16//! The WebSocket strategy follows these principles:
17//!
18//! 1. **Persistent Connections**: Maintains persistent WebSocket connections for real-time data
19//! 2. **Automatic Reconnection**: Automatically reconnects with exponential backoff if the connection is lost
20//! 3. **Subscription Management**: Provides a standard way to subscribe to asset updates
21//! 4. **Error Resilience**: Handles connection failures and data errors gracefully
22//!
23//! When implementing new WebSocket-based asset providers, implement both the
24//! [`AssetInfoProviderConnector`] and [`AssetInfoProvider`] traits, and use the
25//! [`start_listening`] function to handle the connection lifecycle.
26
27use std::fmt::Display;
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use tokio::select;
32use tokio::time::{sleep, timeout};
33use tokio_util::sync::CancellationToken;
34use tracing::{debug, error, info};
35
36use crate::metrics::websocket::{ConnectionResult, MessageType, Metrics};
37use crate::store::{Store, WorkerStore};
38use crate::types::AssetInfo;
39
40/// Represents different types of data that can be received from a WebSocket connection.
41///
42/// This enum allows WebSocket providers to distinguish between different types of
43/// messages they might receive, handling each appropriately.
44///
45/// # Variants
46///
47/// * `AssetInfo` - Contains asset information updates to be stored
48/// * `Ping` - Represents a ping message from the server
49/// * `Unused` - Represents data that is not relevant to asset information updates
50pub enum Data {
51 /// Asset information updates to be stored in the system.
52 ///
53 /// This variant contains a vector of [`AssetInfo`] structures that should be
54 /// saved to the store.
55 AssetInfo(Vec<AssetInfo>),
56
57 /// Serverside Ping messages.
58 Ping,
59
60 /// Data that is not relevant to asset information updates.
61 ///
62 /// This variant is used for messages that should be acknowledged but don’t
63 /// contain asset information, such as heartbeats or subscription acknowledgments.
64 Unused,
65}
66
67/// Trait for factory objects that can establish connections to WebSocket providers.
68///
69/// This trait separates the connection establishment logic from the provider itself,
70/// allowing for cleaner error handling and reconnection strategies. Implementors
71/// should handle the initial connection setup and return a provider that's ready
72/// to subscribe to asset information.
73#[async_trait::async_trait]
74pub trait AssetInfoProviderConnector: Send + Sync {
75 /// The type of provider that this connector creates.
76 ///
77 /// This should be a type that implements the [`AssetInfoProvider`] trait.
78 type Provider: AssetInfoProvider;
79
80 /// The type returned in the event of a connection failure.
81 ///
82 /// This should be a custom error type that implements the Display trait
83 /// and captures all possible error conditions during connection.
84 type Error: Display;
85
86 /// Establishes a connection to the WebSocket and returns a provider.
87 ///
88 /// This method should handle the initial WebSocket connection setup,
89 /// including any authentication or handshaking required by the API.
90 ///
91 /// # Errors
92 ///
93 /// Returns a connector-specific error if the connection fails, such as when
94 /// the API is unavailable, authentication fails, or the connection can’t be
95 /// established for any other reason.
96 async fn connect(&self) -> Result<Self::Provider, Self::Error>;
97}
98
99/// Trait for providers that can stream asset information from WebSocket APIs.
100///
101/// This trait defines the interface that WebSocket-based asset information providers
102/// must implement. Providers are responsible for subscribing to asset updates,
103/// receiving and parsing WebSocket messages, and converting them into [`Data`] structures.
104#[async_trait::async_trait]
105pub trait AssetInfoProvider: Send + Sync {
106 /// The type returned in the event of a subscription failure.
107 ///
108 /// This should be a custom error type that implements the Display trait
109 /// and captures all possible error conditions during subscription.
110 type SubscriptionError: Display;
111
112 /// The type returned in the event of a message reception failure.
113 ///
114 /// This should be a custom error type that implements the Display trait
115 /// and captures all possible error conditions during message reception.
116 type ListeningError: Display;
117
118 /// Subscribes to asset updates for the specified asset IDs.
119 ///
120 /// This method should send subscription requests to the WebSocket API
121 /// for each of the specified asset IDs, configuring the connection to
122 /// receive updates for these assets.
123 ///
124 /// # Errors
125 ///
126 /// Returns a subscription-specific error if the operation fails, such as when
127 /// the API rejects the subscription request or the request can’t be sent.
128 async fn subscribe(&mut self, ids: &[String]) -> Result<(), Self::SubscriptionError>;
129
130 /// Waits for and returns the next data update from the WebSocket.
131 ///
132 /// This method should wait for the next WebSocket message, parse it,
133 /// and return the appropriate data structure. It returns None if the
134 /// connection has been closed.
135 ///
136 /// # Returns
137 ///
138 /// * `Some(Ok(Data))` - If a message was successfully received and parsed
139 /// * `Some(Err(ListeningError))` - If there was an error receiving or parsing the message
140 /// * `None` - If the connection has been closed
141 async fn next(&mut self) -> Option<Result<Data, Self::ListeningError>>;
142
143 /// Attempts to gracefully close the WebSocket connection.
144 ///
145 /// This method should send a close frame to the WebSocket server
146 /// and perform any necessary cleanup. It may fail silently if the
147 /// connection is already closed.
148 async fn try_close(mut self);
149}
150
151/// Starts listening for asset information from a WebSocket provider.
152///
153/// This function implements a listener loop that continuously receives asset
154/// information from a WebSocket provider and stores it using the provided worker store.
155/// The loop continues until the cancellation token is triggered.
156///
157/// # Features
158///
159/// * Maintains persistent WebSocket connections
160/// * Automatically reconnects with exponential backoff if the connection is lost
161/// * Monitors for timeouts to detect connection issues
162/// * Handles errors gracefully by logging them and continuing
163/// * Cancels listening gracefully when requested via the cancellation token
164#[tracing::instrument(skip(
165 cancellation_token,
166 provider_connector,
167 store,
168 ids,
169 connection_timeout
170))]
171pub async fn start_listening<S, E1, E2, P, C>(
172 cancellation_token: CancellationToken,
173 provider_connector: Arc<C>,
174 store: WorkerStore<S>,
175 ids: Vec<String>,
176 connection_timeout: Duration,
177 metrics: Metrics,
178) where
179 E1: Display,
180 E2: Display,
181 S: Store,
182 P: AssetInfoProvider<SubscriptionError = E1, ListeningError = E2>,
183 C: AssetInfoProviderConnector<Provider = P>,
184{
185 let mut connection = connect(provider_connector.as_ref(), &ids, &metrics).await;
186 loop {
187 select! {
188 _ = cancellation_token.cancelled() => break,
189 result = timeout(connection_timeout, connection.next()) => {
190 match result {
191 // If timeout, we assume the connection has been dropped, and we attempt to reconnect
192 Err(_) | Ok(None) => {
193 let new_conn = connect(provider_connector.as_ref(), &ids, &metrics).await;
194 connection = new_conn
195 }
196 Ok(Some(Ok(Data::AssetInfo(assets)))) => {
197 metrics.increment_activity_messages_total(MessageType::AssetInfo);
198 if let Err(e) = store.set_batch_asset_info(assets).await {
199 error!("failed to set asset info with error: {e}")
200 } else {
201 info!("asset info updated successfully");
202 }
203 }
204 Ok(Some(Ok(Data::Ping))) => metrics.increment_activity_messages_total(MessageType::Ping),
205 Ok(Some(Ok(Data::Unused))) => {
206 metrics.increment_activity_messages_total(MessageType::Unused);
207 debug!("received irrelevant data");
208 },
209 Ok(Some(Err(e))) => {
210 metrics.increment_activity_messages_total(MessageType::Error);
211 error!("{}", e);
212 },
213 }
214 }
215 }
216 }
217}
218
219/// Helper function to establish a connection with exponential backoff.
220///
221/// This function attempts to connect to the WebSocket provider and subscribe
222/// to the specified asset IDs. If the connection or subscription fails, it will
223/// retry with an exponential backoff strategy.
224///
225/// # Returns
226///
227/// A connected and subscribed provider ready to receive WebSocket messages
228async fn connect<C, P, E1, E2>(connector: &C, ids: &[String], metrics: &Metrics) -> P
229where
230 P: AssetInfoProvider<SubscriptionError = E1, ListeningError = E2>,
231 C: AssetInfoProviderConnector<Provider = P>,
232{
233 let mut retry_count = 0;
234 let mut backoff = Duration::from_secs(1);
235 let max_backoff = Duration::from_secs(64);
236
237 loop {
238 let start_time = Instant::now();
239
240 if let Ok(mut provider) = connector.connect().await {
241 if provider.subscribe(ids).await.is_ok() {
242 metrics.update_websocket_connection(
243 start_time.elapsed().as_millis(),
244 ConnectionResult::Success,
245 );
246 return provider;
247 }
248 }
249
250 retry_count += 1;
251 if backoff < max_backoff {
252 backoff *= 2;
253 }
254
255 metrics.update_websocket_connection(
256 start_time.elapsed().as_millis(),
257 ConnectionResult::Failed,
258 );
259 error!("failed to reconnect. current attempt: {}", retry_count);
260 sleep(backoff).await;
261 }
262}