pyth_hermes_client_rust/client.rs
1//! # Hermes Client
2//!
3//! This module provides a high-level client for connecting to Hermes data streams.
4//! The client maintains multiple WebSocket connections for redundancy and provides
5//! automatic deduplication of messages.
6//!
7//! ## Features
8//!
9//! - Multiple redundant WebSocket connections
10//! - Automatic message deduplication
11//! - Exponential backoff for reconnections
12//! - Configurable timeouts and channel capacities
13//! - Builder pattern for easy configuration
14//!
15//! ## Basic Usage
16//!
17//! ```rust,ignore
18//! use hermes_client::{HermesClientBuilder, HermesClientMessageSubscribe};
19//!
20//! #[tokio::main]
21//! async fn main() -> anyhow::Result<()> {
22//! let mut client = HermesClientBuilder::default()
23//! .with_num_connections(2)
24//! .build()?;
25//!
26//! let mut receiver = client.start().await?;
27//!
28//! // Subscribe to price feeds
29//! let subscribe_request = HermesClientMessageSubscribe {
30//! // ... configure subscription
31//! };
32//! client.subscribe(subscribe_request).await?;
33//!
34//! // Process incoming messages
35//! while let Some(response) = receiver.recv().await {
36//! println!("Received: {:?}", response);
37//! }
38//!
39//! Ok(())
40//! }
41//! ```
42
43use std::time::Duration;
44
45use crate::{
46 backoff::{HermesExponentialBackoff, HermesExponentialBackoffBuilder},
47 resilient_ws_connection::HermesResilientWSConnection,
48 ws_connection::{
49 HermesClientMessage, HermesClientMessageSubscribe, HermesClientMessageUnsubscribe,
50 HermesServerMessage,
51 },
52 CHANNEL_CAPACITY,
53};
54use anyhow::{bail, Result};
55use backoff::ExponentialBackoff;
56use tokio::sync::mpsc::{self, error::TrySendError};
57use tracing::{error, warn};
58use ttl_cache::TtlCache;
59use url::Url;
60
/// Maximum number of entries held by the deduplication cache.
const DEDUP_CACHE_SIZE: usize = 100_000;
/// How long a message key is remembered for deduplication purposes.
const DEDUP_TTL: Duration = Duration::from_secs(10);

/// Endpoints used by [`HermesClientBuilder::default`].
const DEFAULT_ENDPOINTS: [&str; 1] = ["wss://hermes.pyth.network/ws"];
/// Number of WebSocket connections used by [`HermesClientBuilder::default`].
const DEFAULT_NUM_CONNECTIONS: usize = 3;
/// Per-connection timeout used by [`HermesClientBuilder::default`].
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
67
68/// A high-performance client for connecting to Hermes data streams.
69///
70/// The `HermesClient` maintains multiple WebSocket connections to Hermes endpoints
71/// for redundancy. It automatically handles connection management,
72/// message deduplication, and provides a unified stream of price updates.
73///
74/// ## Architecture
75///
76/// - Maintains multiple WebSocket connections to different endpoints
77/// - Uses a TTL cache for deduplicating messages across connections
78/// - Provides a single channel for consuming deduplicated messages
79/// - Handles connection failures with exponential backoff
80pub struct HermesClient {
81 endpoints: Vec<Url>,
82 num_connections: usize,
83 ws_connections: Vec<HermesResilientWSConnection>,
84 backoff: ExponentialBackoff,
85 timeout: Duration,
86 channel_capacity: usize,
87}
88
89impl HermesClient {
90 /// Creates a new Hermes client instance.
91 ///
92 /// This is a low-level constructor. Consider using [`HermesClientBuilder`] for a more
93 /// convenient way to create clients with sensible defaults.
94 ///
95 /// # Arguments
96 ///
97 /// * `endpoints` - A vector of WebSocket endpoint URLs to connect to. Must not be empty.
98 /// * `num_connections` - The number of WebSocket connections to maintain for redundancy
99 /// * `backoff` - The exponential backoff configuration for connection retries
100 /// * `timeout` - The timeout duration for WebSocket operations
101 /// * `channel_capacity` - The capacity of the message channel
102 ///
103 /// # Returns
104 ///
105 /// Returns `Ok(HermesClient)` on success, or an error if the configuration is invalid.
106 ///
107 /// # Errors
108 ///
109 /// Returns an error if:
110 /// - The `endpoints` vector is empty
111 ///
112 pub fn new(
113 endpoints: Vec<Url>,
114 num_connections: usize,
115 backoff: HermesExponentialBackoff,
116 timeout: Duration,
117 channel_capacity: usize,
118 ) -> Result<Self> {
119 if endpoints.is_empty() {
120 bail!("At least one endpoint must be provided");
121 }
122 Ok(Self {
123 endpoints,
124 num_connections,
125 ws_connections: Vec::with_capacity(num_connections),
126 backoff: backoff.into(),
127 timeout,
128 channel_capacity,
129 })
130 }
131
132 /// Starts the client and begins establishing WebSocket connections.
133 ///
134 /// This method initializes all WebSocket connections and starts the message processing
135 /// loop. It returns a receiver channel that will yield deduplicated messages from
136 /// all connections.
137 ///
138 /// # Returns
139 ///
140 /// Returns a `Receiver<HermesServerMessage>` that yields deduplicated messages from all
141 /// WebSocket connections. The receiver will continue to yield messages until
142 /// all connections are closed or the client is dropped.
143 ///
144 /// # Errors
145 ///
146 /// This method itself doesn't return errors, but individual connection failures
147 /// are handled internally with automatic reconnection using the configured backoff
148 /// strategy.
149 ///
150 /// # Message Deduplication
151 ///
152 /// Messages are deduplicated using a TTL cache with a 10-second window. This ensures
153 /// that identical messages received from multiple connections are only delivered once.
154 ///
155 pub async fn start(&mut self) -> Result<mpsc::Receiver<HermesServerMessage>> {
156 let (sender, receiver) = mpsc::channel::<HermesServerMessage>(self.channel_capacity);
157 let (ws_connection_sender, mut ws_connection_receiver) =
158 mpsc::channel::<HermesServerMessage>(CHANNEL_CAPACITY);
159
160 for i in 0..self.num_connections {
161 let endpoint = self.endpoints[i % self.endpoints.len()].clone();
162 let connection = HermesResilientWSConnection::new(
163 endpoint,
164 self.backoff.clone(),
165 self.timeout,
166 ws_connection_sender.clone(),
167 );
168 self.ws_connections.push(connection);
169 }
170
171 let mut seen_updates = TtlCache::new(DEDUP_CACHE_SIZE);
172
173 tokio::spawn(async move {
174 while let Some(response) = ws_connection_receiver.recv().await {
175 let cache_key = response.cache_key();
176 if seen_updates.contains_key(&cache_key) {
177 continue;
178 }
179 seen_updates.insert(cache_key, response.clone(), DEDUP_TTL);
180
181 match sender.try_send(response) {
182 Ok(_) => (),
183 Err(TrySendError::Full(r)) => {
184 warn!("Sender channel is full, responses will be delayed");
185 if sender.send(r).await.is_err() {
186 error!("Sender channel is closed, stopping client");
187 }
188 }
189 Err(TrySendError::Closed(_)) => {
190 error!("Sender channel is closed, stopping client");
191 }
192 }
193 }
194 });
195
196 Ok(receiver)
197 }
198
199 /// Subscribes to data streams across all WebSocket connections.
200 ///
201 /// This method sends the subscription request to all active WebSocket connections,
202 /// ensuring redundancy. If any connection fails to subscribe,
203 /// an error is returned, but other connections may still be subscribed.
204 ///
205 /// # Arguments
206 ///
207 /// * `subscribe_request` - The subscription request specifying which data streams to subscribe to
208 ///
209 /// # Returns
210 ///
211 /// Returns `Ok(())` if the subscription was successfully sent to all connections,
212 /// or an error if any connection failed to process the subscription.
213 ///
214 pub async fn subscribe(
215 &mut self,
216 subscribe_request: HermesClientMessageSubscribe,
217 ) -> Result<()> {
218 for connection in &mut self.ws_connections {
219 connection
220 .send_request(HermesClientMessage::Subscribe(subscribe_request.clone()))
221 .await?;
222 }
223 Ok(())
224 }
225
226 /// Unsubscribes from a specific data stream across all WebSocket connections.
227 ///
228 /// This method sends an unsubscribe request for the specified subscription ID
229 /// to all active WebSocket connections.
230 ///
231 /// # Arguments
232 ///
233 /// * `unsubscribe_request` - The unsubscribe request specifying which data streams to unsubscribe from
234 ///
235 /// # Returns
236 ///
237 /// Returns `Ok(())` if the unsubscribe request was successfully sent to all connections,
238 /// or an error if any connection failed to process the request.
239 ///
240 pub async fn unsubscribe(
241 &mut self,
242 unsubscribe_request: HermesClientMessageUnsubscribe,
243 ) -> Result<()> {
244 for connection in &mut self.ws_connections {
245 connection
246 .send_request(HermesClientMessage::Unsubscribe(
247 unsubscribe_request.clone(),
248 ))
249 .await?;
250 }
251 Ok(())
252 }
253}
254
255/// A builder for creating [`HermesClient`] instances with customizable configuration.
256///
257/// The builder provides a convenient way to configure a Hermes client with sensible
258/// defaults while allowing customization of all parameters. It follows the builder pattern
259/// for a fluent API.
260///
261/// ## Default Configuration
262///
263/// - **Endpoints**: Uses Hermes's default production endpoints
264/// - **Connections**: 3 concurrent WebSocket connections
265/// - **Timeout**: 5 seconds for WebSocket operations
266/// - **Backoff**: Exponential backoff with default settings
267/// - **Channel Capacity**: Uses the default 1000
268///
269pub struct HermesClientBuilder {
270 endpoints: Vec<Url>,
271 num_connections: usize,
272 backoff: HermesExponentialBackoff,
273 timeout: Duration,
274 channel_capacity: usize,
275}
276
277impl Default for HermesClientBuilder {
278 fn default() -> Self {
279 Self {
280 endpoints: DEFAULT_ENDPOINTS
281 .iter()
282 .map(|&s| s.parse().unwrap())
283 .collect(),
284 num_connections: DEFAULT_NUM_CONNECTIONS,
285 backoff: HermesExponentialBackoffBuilder::default().build(),
286 timeout: DEFAULT_TIMEOUT,
287 channel_capacity: CHANNEL_CAPACITY,
288 }
289 }
290}
291
292impl HermesClientBuilder {
293 /// Sets custom WebSocket endpoints for the client.
294 ///
295 /// By default, the client uses Hermes's production endpoints. Use this method
296 /// to connect to different environments (staging, local development) or to use
297 /// custom endpoint configurations.
298 ///
299 /// # Arguments
300 ///
301 /// * `endpoints` - A vector of WebSocket endpoint URLs. Must not be empty.
302 ///
303 pub fn with_endpoints(mut self, endpoints: Vec<Url>) -> Self {
304 self.endpoints = endpoints;
305 self
306 }
307
308 /// Sets the number of concurrent WebSocket connections to maintain.
309 ///
310 /// More connections provide better redundancy and can improve throughput,
311 /// but also consume more resources.
312 ///
313 /// # Arguments
314 ///
315 /// * `num_connections` - The number of WebSocket connections (must be > 0)
316 ///
317 pub fn with_num_connections(mut self, num_connections: usize) -> Self {
318 self.num_connections = num_connections;
319 self
320 }
321
322 /// Sets the exponential backoff configuration for connection retries.
323 ///
324 /// The backoff strategy determines how the client handles connection failures
325 /// and retries.
326 ///
327 /// # Arguments
328 ///
329 /// * `backoff` - The exponential backoff configuration
330 ///
331 pub fn with_backoff(mut self, backoff: HermesExponentialBackoff) -> Self {
332 self.backoff = backoff;
333 self
334 }
335
336 /// Sets the timeout duration for WebSocket operations.
337 ///
338 /// This timeout applies to each WebSocket connection,
339 /// if no response is received within this duration,
340 /// the connection will be considered failed and retried.
341 ///
342 /// # Arguments
343 ///
344 /// * `timeout` - The timeout duration for each WebSocket
345 ///
346 pub fn with_timeout(mut self, timeout: Duration) -> Self {
347 self.timeout = timeout;
348 self
349 }
350
351 /// Sets the capacity of the internal message channel.
352 ///
353 /// This determines how many messages can be buffered internally before
354 /// the client starts applying backpressure.
355 ///
356 /// # Arguments
357 ///
358 /// * `channel_capacity` - The channel capacity (number of messages)
359 ///
360 pub fn with_channel_capacity(mut self, channel_capacity: usize) -> Self {
361 self.channel_capacity = channel_capacity;
362 self
363 }
364
365 /// Builds the configured [`HermesClient`] instance.
366 ///
367 /// This consumes the builder and creates a new client with the specified
368 /// configuration. The client is ready to use but connections are not
369 /// established until [`HermesClient::start`] is called.
370 ///
371 /// # Returns
372 ///
373 /// Returns `Ok(HermesClient)` on success, or an error if the configuration
374 /// is invalid.
375 ///
376 /// # Errors
377 ///
378 /// Returns an error if:
379 /// - No endpoints are configured
380 /// - Any configuration parameter is invalid
381 ///
382 pub fn build(self) -> Result<HermesClient> {
383 HermesClient::new(
384 self.endpoints,
385 self.num_connections,
386 self.backoff,
387 self.timeout,
388 self.channel_capacity,
389 )
390 }
391}