pyth_hermes_client_rust/
client.rs

1//! # Hermes Client
2//!
3//! This module provides a high-level client for connecting to Hermes data streams.
4//! The client maintains multiple WebSocket connections for redundancy and provides
5//! automatic deduplication of messages.
6//!
7//! ## Features
8//!
9//! - Multiple redundant WebSocket connections
10//! - Automatic message deduplication
11//! - Exponential backoff for reconnections
12//! - Configurable timeouts and channel capacities
13//! - Builder pattern for easy configuration
14//!
15//! ## Basic Usage
16//!
17//! ```rust,ignore
18//! use hermes_client::{HermesClientBuilder, HermesClientMessageSubscribe};
19//!
20//! #[tokio::main]
21//! async fn main() -> anyhow::Result<()> {
22//!     let mut client = HermesClientBuilder::default()
23//!         .with_num_connections(2)
24//!         .build()?;
25//!
26//!     let mut receiver = client.start().await?;
27//!
28//!     // Subscribe to price feeds
29//!     let subscribe_request = HermesClientMessageSubscribe {
30//!         // ... configure subscription
31//!     };
32//!     client.subscribe(subscribe_request).await?;
33//!
34//!     // Process incoming messages
35//!     while let Some(response) = receiver.recv().await {
36//!         println!("Received: {:?}", response);
37//!     }
38//!
39//!     Ok(())
40//! }
41//! ```
42
43use std::time::Duration;
44
45use crate::{
46    backoff::{HermesExponentialBackoff, HermesExponentialBackoffBuilder},
47    resilient_ws_connection::HermesResilientWSConnection,
48    ws_connection::{
49        HermesClientMessage, HermesClientMessageSubscribe, HermesClientMessageUnsubscribe,
50        HermesServerMessage,
51    },
52    CHANNEL_CAPACITY,
53};
54use anyhow::{bail, Result};
55use backoff::ExponentialBackoff;
56use tokio::sync::mpsc::{self, error::TrySendError};
57use tracing::{error, warn};
58use ttl_cache::TtlCache;
59use url::Url;
60
// Capacity handed to the `TtlCache` that `HermesClient::start` uses for deduplication.
const DEDUP_CACHE_SIZE: usize = 100_000;
// Time-to-live given to every deduplication cache entry inserted by `HermesClient::start`.
const DEDUP_TTL: Duration = Duration::from_secs(10);

// Endpoint URLs the builder parses and uses when the caller does not override them.
const DEFAULT_ENDPOINTS: [&str; 1] = ["wss://hermes.pyth.network/ws"];
// Builder default for the number of WebSocket connections.
const DEFAULT_NUM_CONNECTIONS: usize = 3;
// Builder default for the timeout passed to each WebSocket connection.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
67
/// A high-performance client for connecting to Hermes data streams.
///
/// The `HermesClient` maintains multiple WebSocket connections to Hermes endpoints
/// for redundancy. It automatically handles connection management,
/// message deduplication, and provides a unified stream of price updates.
///
/// ## Architecture
///
/// - Maintains multiple WebSocket connections, assigned round-robin over the
///   configured endpoints (several connections may share one endpoint)
/// - Uses a TTL cache for deduplicating messages across connections
/// - Provides a single channel for consuming deduplicated messages
/// - Handles connection failures with exponential backoff
pub struct HermesClient {
    /// Endpoint URLs; `start` assigns them to connections round-robin.
    endpoints: Vec<Url>,
    /// Number of connections `start` creates.
    num_connections: usize,
    /// Connections created by `start`; `subscribe`/`unsubscribe` send to each of them.
    ws_connections: Vec<HermesResilientWSConnection>,
    /// Backoff configuration cloned into every connection.
    backoff: ExponentialBackoff,
    /// Timeout passed to every connection.
    timeout: Duration,
    /// Capacity of the channel whose receiver `start` returns.
    channel_capacity: usize,
}
88
89impl HermesClient {
90    /// Creates a new Hermes client instance.
91    ///
92    /// This is a low-level constructor. Consider using [`HermesClientBuilder`] for a more
93    /// convenient way to create clients with sensible defaults.
94    ///
95    /// # Arguments
96    ///
97    /// * `endpoints` - A vector of WebSocket endpoint URLs to connect to. Must not be empty.
98    /// * `num_connections` - The number of WebSocket connections to maintain for redundancy
99    /// * `backoff` - The exponential backoff configuration for connection retries
100    /// * `timeout` - The timeout duration for WebSocket operations
101    /// * `channel_capacity` - The capacity of the message channel
102    ///
103    /// # Returns
104    ///
105    /// Returns `Ok(HermesClient)` on success, or an error if the configuration is invalid.
106    ///
107    /// # Errors
108    ///
109    /// Returns an error if:
110    /// - The `endpoints` vector is empty
111    ///
112    pub fn new(
113        endpoints: Vec<Url>,
114        num_connections: usize,
115        backoff: HermesExponentialBackoff,
116        timeout: Duration,
117        channel_capacity: usize,
118    ) -> Result<Self> {
119        if endpoints.is_empty() {
120            bail!("At least one endpoint must be provided");
121        }
122        Ok(Self {
123            endpoints,
124            num_connections,
125            ws_connections: Vec::with_capacity(num_connections),
126            backoff: backoff.into(),
127            timeout,
128            channel_capacity,
129        })
130    }
131
132    /// Starts the client and begins establishing WebSocket connections.
133    ///
134    /// This method initializes all WebSocket connections and starts the message processing
135    /// loop. It returns a receiver channel that will yield deduplicated messages from
136    /// all connections.
137    ///
138    /// # Returns
139    ///
140    /// Returns a `Receiver<HermesServerMessage>` that yields deduplicated messages from all
141    /// WebSocket connections. The receiver will continue to yield messages until
142    /// all connections are closed or the client is dropped.
143    ///
144    /// # Errors
145    ///
146    /// This method itself doesn't return errors, but individual connection failures
147    /// are handled internally with automatic reconnection using the configured backoff
148    /// strategy.
149    ///
150    /// # Message Deduplication
151    ///
152    /// Messages are deduplicated using a TTL cache with a 10-second window. This ensures
153    /// that identical messages received from multiple connections are only delivered once.
154    ///
155    pub async fn start(&mut self) -> Result<mpsc::Receiver<HermesServerMessage>> {
156        let (sender, receiver) = mpsc::channel::<HermesServerMessage>(self.channel_capacity);
157        let (ws_connection_sender, mut ws_connection_receiver) =
158            mpsc::channel::<HermesServerMessage>(CHANNEL_CAPACITY);
159
160        for i in 0..self.num_connections {
161            let endpoint = self.endpoints[i % self.endpoints.len()].clone();
162            let connection = HermesResilientWSConnection::new(
163                endpoint,
164                self.backoff.clone(),
165                self.timeout,
166                ws_connection_sender.clone(),
167            );
168            self.ws_connections.push(connection);
169        }
170
171        let mut seen_updates = TtlCache::new(DEDUP_CACHE_SIZE);
172
173        tokio::spawn(async move {
174            while let Some(response) = ws_connection_receiver.recv().await {
175                let cache_key = response.cache_key();
176                if seen_updates.contains_key(&cache_key) {
177                    continue;
178                }
179                seen_updates.insert(cache_key, response.clone(), DEDUP_TTL);
180
181                match sender.try_send(response) {
182                    Ok(_) => (),
183                    Err(TrySendError::Full(r)) => {
184                        warn!("Sender channel is full, responses will be delayed");
185                        if sender.send(r).await.is_err() {
186                            error!("Sender channel is closed, stopping client");
187                        }
188                    }
189                    Err(TrySendError::Closed(_)) => {
190                        error!("Sender channel is closed, stopping client");
191                    }
192                }
193            }
194        });
195
196        Ok(receiver)
197    }
198
199    /// Subscribes to data streams across all WebSocket connections.
200    ///
201    /// This method sends the subscription request to all active WebSocket connections,
202    /// ensuring redundancy. If any connection fails to subscribe,
203    /// an error is returned, but other connections may still be subscribed.
204    ///
205    /// # Arguments
206    ///
207    /// * `subscribe_request` - The subscription request specifying which data streams to subscribe to
208    ///
209    /// # Returns
210    ///
211    /// Returns `Ok(())` if the subscription was successfully sent to all connections,
212    /// or an error if any connection failed to process the subscription.
213    ///
214    pub async fn subscribe(
215        &mut self,
216        subscribe_request: HermesClientMessageSubscribe,
217    ) -> Result<()> {
218        for connection in &mut self.ws_connections {
219            connection
220                .send_request(HermesClientMessage::Subscribe(subscribe_request.clone()))
221                .await?;
222        }
223        Ok(())
224    }
225
226    /// Unsubscribes from a specific data stream across all WebSocket connections.
227    ///
228    /// This method sends an unsubscribe request for the specified subscription ID
229    /// to all active WebSocket connections.
230    ///
231    /// # Arguments
232    ///
233    /// * `unsubscribe_request` - The unsubscribe request specifying which data streams to unsubscribe from
234    ///
235    /// # Returns
236    ///
237    /// Returns `Ok(())` if the unsubscribe request was successfully sent to all connections,
238    /// or an error if any connection failed to process the request.
239    ///
240    pub async fn unsubscribe(
241        &mut self,
242        unsubscribe_request: HermesClientMessageUnsubscribe,
243    ) -> Result<()> {
244        for connection in &mut self.ws_connections {
245            connection
246                .send_request(HermesClientMessage::Unsubscribe(
247                    unsubscribe_request.clone(),
248                ))
249                .await?;
250        }
251        Ok(())
252    }
253}
254
/// A builder for creating [`HermesClient`] instances with customizable configuration.
///
/// The builder provides a convenient way to configure a Hermes client with sensible
/// defaults while allowing customization of all parameters. It follows the builder pattern
/// for a fluent API.
///
/// ## Default Configuration
///
/// - **Endpoints**: The single endpoint `wss://hermes.pyth.network/ws`
/// - **Connections**: 3 concurrent WebSocket connections
/// - **Timeout**: 5 seconds for WebSocket operations
/// - **Backoff**: Exponential backoff with default settings
/// - **Channel Capacity**: The crate-wide `CHANNEL_CAPACITY` constant
///
pub struct HermesClientBuilder {
    /// Endpoint URLs handed to the client.
    endpoints: Vec<Url>,
    /// Number of WebSocket connections the client will create.
    num_connections: usize,
    /// Backoff configuration for connection retries.
    backoff: HermesExponentialBackoff,
    /// Timeout passed to each WebSocket connection.
    timeout: Duration,
    /// Capacity of the channel returned by [`HermesClient::start`].
    channel_capacity: usize,
}
276
277impl Default for HermesClientBuilder {
278    fn default() -> Self {
279        Self {
280            endpoints: DEFAULT_ENDPOINTS
281                .iter()
282                .map(|&s| s.parse().unwrap())
283                .collect(),
284            num_connections: DEFAULT_NUM_CONNECTIONS,
285            backoff: HermesExponentialBackoffBuilder::default().build(),
286            timeout: DEFAULT_TIMEOUT,
287            channel_capacity: CHANNEL_CAPACITY,
288        }
289    }
290}
291
292impl HermesClientBuilder {
293    /// Sets custom WebSocket endpoints for the client.
294    ///
295    /// By default, the client uses Hermes's production endpoints. Use this method
296    /// to connect to different environments (staging, local development) or to use
297    /// custom endpoint configurations.
298    ///
299    /// # Arguments
300    ///
301    /// * `endpoints` - A vector of WebSocket endpoint URLs. Must not be empty.
302    ///
303    pub fn with_endpoints(mut self, endpoints: Vec<Url>) -> Self {
304        self.endpoints = endpoints;
305        self
306    }
307
308    /// Sets the number of concurrent WebSocket connections to maintain.
309    ///
310    /// More connections provide better redundancy and can improve throughput,
311    /// but also consume more resources.
312    ///
313    /// # Arguments
314    ///
315    /// * `num_connections` - The number of WebSocket connections (must be > 0)
316    ///
317    pub fn with_num_connections(mut self, num_connections: usize) -> Self {
318        self.num_connections = num_connections;
319        self
320    }
321
322    /// Sets the exponential backoff configuration for connection retries.
323    ///
324    /// The backoff strategy determines how the client handles connection failures
325    /// and retries.
326    ///
327    /// # Arguments
328    ///
329    /// * `backoff` - The exponential backoff configuration
330    ///
331    pub fn with_backoff(mut self, backoff: HermesExponentialBackoff) -> Self {
332        self.backoff = backoff;
333        self
334    }
335
336    /// Sets the timeout duration for WebSocket operations.
337    ///
338    /// This timeout applies to each WebSocket connection,
339    /// if no response is received within this duration,
340    /// the connection will be considered failed and retried.
341    ///
342    /// # Arguments
343    ///
344    /// * `timeout` - The timeout duration for each WebSocket
345    ///
346    pub fn with_timeout(mut self, timeout: Duration) -> Self {
347        self.timeout = timeout;
348        self
349    }
350
351    /// Sets the capacity of the internal message channel.
352    ///
353    /// This determines how many messages can be buffered internally before
354    /// the client starts applying backpressure.
355    ///
356    /// # Arguments
357    ///
358    /// * `channel_capacity` - The channel capacity (number of messages)
359    ///
360    pub fn with_channel_capacity(mut self, channel_capacity: usize) -> Self {
361        self.channel_capacity = channel_capacity;
362        self
363    }
364
365    /// Builds the configured [`HermesClient`] instance.
366    ///
367    /// This consumes the builder and creates a new client with the specified
368    /// configuration. The client is ready to use but connections are not
369    /// established until [`HermesClient::start`] is called.
370    ///
371    /// # Returns
372    ///
373    /// Returns `Ok(HermesClient)` on success, or an error if the configuration
374    /// is invalid.
375    ///
376    /// # Errors
377    ///
378    /// Returns an error if:
379    /// - No endpoints are configured
380    /// - Any configuration parameter is invalid
381    ///
382    pub fn build(self) -> Result<HermesClient> {
383        HermesClient::new(
384            self.endpoints,
385            self.num_connections,
386            self.backoff,
387            self.timeout,
388            self.channel_capacity,
389        )
390    }
391}