pyth_lazer_client/client.rs
1//! # Pyth Lazer Client
2//!
3//! This module provides a high-level client for connecting to Pyth Lazer data streams.
4//! The client maintains multiple WebSocket connections for redundancy and provides
5//! automatic deduplication of messages.
6//!
7//! ## Features
8//!
9//! - Multiple redundant WebSocket connections
10//! - Automatic message deduplication
11//! - Exponential backoff for reconnections
12//! - Configurable timeouts and channel capacities
13//! - Builder pattern for easy configuration
14//!
15//! ## Basic Usage
16//!
//! ```rust,ignore
//! use pyth_lazer_client::PythLazerClientBuilder;
//! use pyth_lazer_protocol::api::SubscribeRequest;
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//!     let mut client = PythLazerClientBuilder::new("your_access_token".to_string())
//!         .with_num_connections(2)
//!         .build()?;
//!
//!     let mut receiver = client.start().await?;
//!
//!     // Subscribe to price feeds
//!     let subscribe_request = SubscribeRequest {
//!         // ... configure subscription
//!     };
//!     client.subscribe(subscribe_request).await?;
//!
//!     // Process incoming messages
//!     while let Some(response) = receiver.recv().await {
//!         println!("Received: {:?}", response);
//!     }
//!
//!     Ok(())
//! }
//! ```
43
44use std::time::Duration;
45
46use crate::{
47 backoff::{PythLazerExponentialBackoff, PythLazerExponentialBackoffBuilder},
48 resilient_ws_connection::PythLazerResilientWSConnection,
49 ws_connection::AnyResponse,
50 CHANNEL_CAPACITY,
51};
52use anyhow::{bail, Result};
53use backoff::ExponentialBackoff;
54use pyth_lazer_protocol::api::{SubscribeRequest, SubscriptionId};
55use tokio::sync::mpsc::{self, error::TrySendError};
56use tracing::{error, warn};
57use ttl_cache::TtlCache;
58use url::Url;
59
/// Capacity passed to the TTL cache that `PythLazerClient::start` uses for deduplication.
const DEDUP_CACHE_SIZE: usize = 100_000;
/// Time-to-live given to every entry inserted into the deduplication cache.
const DEDUP_TTL: Duration = Duration::from_secs(10);

/// Endpoint URLs that `PythLazerClientBuilder::new` parses and uses unless the caller
/// overrides them with `with_endpoints`.
const DEFAULT_ENDPOINTS: [&str; 2] = [
    "wss://pyth-lazer-0.dourolabs.app/v1/stream",
    "wss://pyth-lazer-1.dourolabs.app/v1/stream",
];
/// Number of connections a freshly created builder is configured with.
const DEFAULT_NUM_CONNECTIONS: usize = 4;
/// Timeout a freshly created builder is configured with.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
69
/// A client for connecting to Pyth Lazer data streams.
///
/// The `PythLazerClient` holds the configuration for, and the handles to, a set of
/// redundant WebSocket connections. Responses from all connections are merged,
/// deduplicated, and exposed through the single receiver returned by
/// [`PythLazerClient::start`].
///
/// ## Architecture
///
/// - One `PythLazerResilientWSConnection` per configured connection, assigned to the
///   configured endpoints in round-robin order
/// - A TTL cache keyed by each response's `cache_key()` for deduplicating messages
///   across connections
/// - A single channel for consuming deduplicated messages
/// - The backoff configuration and timeout are handed to every connection
///   (NOTE(review): the retry behaviour itself lives in `PythLazerResilientWSConnection`,
///   which is not visible from this file)
pub struct PythLazerClient {
    /// Endpoint URLs; validated as non-empty by `PythLazerClient::new`.
    endpoints: Vec<Url>,
    /// Access token cloned into every connection created by `start`.
    access_token: String,
    /// Number of connections `start` creates.
    num_connections: usize,
    /// Connection handles; empty until `start` is called, which appends to it.
    ws_connections: Vec<PythLazerResilientWSConnection>,
    /// Backoff configuration cloned into every connection.
    backoff: ExponentialBackoff,
    /// Timeout passed to every connection.
    timeout: Duration,
    /// Capacity of the channel whose receiver `start` returns.
    channel_capacity: usize,
}
91
92impl PythLazerClient {
93 /// Creates a new Pyth Lazer client instance.
94 ///
95 /// This is a low-level constructor. Consider using [`PythLazerClientBuilder`] for a more
96 /// convenient way to create clients with sensible defaults.
97 ///
98 /// # Arguments
99 ///
100 /// * `endpoints` - A vector of WebSocket endpoint URLs to connect to. Must not be empty.
101 /// * `access_token` - The authentication token for accessing Pyth Lazer services
102 /// * `num_connections` - The number of WebSocket connections to maintain for redundancy
103 /// * `backoff` - The exponential backoff configuration for connection retries
104 /// * `timeout` - The timeout duration for WebSocket operations
105 /// * `channel_capacity` - The capacity of the message channel
106 ///
107 /// # Returns
108 ///
109 /// Returns `Ok(PythLazerClient)` on success, or an error if the configuration is invalid.
110 ///
111 /// # Errors
112 ///
113 /// Returns an error if:
114 /// - The `endpoints` vector is empty
115 ///
116 pub fn new(
117 endpoints: Vec<Url>,
118 access_token: String,
119 num_connections: usize,
120 backoff: PythLazerExponentialBackoff,
121 timeout: Duration,
122 channel_capacity: usize,
123 ) -> Result<Self> {
124 if endpoints.is_empty() {
125 bail!("At least one endpoint must be provided");
126 }
127 Ok(Self {
128 endpoints,
129 access_token,
130 num_connections,
131 ws_connections: Vec::with_capacity(num_connections),
132 backoff: backoff.into(),
133 timeout,
134 channel_capacity,
135 })
136 }
137
138 /// Starts the client and begins establishing WebSocket connections.
139 ///
140 /// This method initializes all WebSocket connections and starts the message processing
141 /// loop. It returns a receiver channel that will yield deduplicated messages from
142 /// all connections.
143 ///
144 /// # Returns
145 ///
146 /// Returns a `Receiver<AnyResponse>` that yields deduplicated messages from all
147 /// WebSocket connections. The receiver will continue to yield messages until
148 /// all connections are closed or the client is dropped.
149 ///
150 /// # Errors
151 ///
152 /// This method itself doesn't return errors, but individual connection failures
153 /// are handled internally with automatic reconnection using the configured backoff
154 /// strategy.
155 ///
156 /// # Message Deduplication
157 ///
158 /// Messages are deduplicated using a TTL cache with a 10-second window. This ensures
159 /// that identical messages received from multiple connections are only delivered once.
160 ///
161 pub async fn start(&mut self) -> Result<mpsc::Receiver<AnyResponse>> {
162 let (sender, receiver) = mpsc::channel::<AnyResponse>(self.channel_capacity);
163 let (ws_connection_sender, mut ws_connection_receiver) =
164 mpsc::channel::<AnyResponse>(CHANNEL_CAPACITY);
165
166 for i in 0..self.num_connections {
167 let endpoint = self.endpoints[i % self.endpoints.len()].clone();
168 let connection = PythLazerResilientWSConnection::new(
169 endpoint,
170 self.access_token.clone(),
171 self.backoff.clone(),
172 self.timeout,
173 ws_connection_sender.clone(),
174 );
175 self.ws_connections.push(connection);
176 }
177
178 let mut seen_updates = TtlCache::new(DEDUP_CACHE_SIZE);
179
180 tokio::spawn(async move {
181 while let Some(response) = ws_connection_receiver.recv().await {
182 let cache_key = response.cache_key();
183 if seen_updates.contains_key(&cache_key) {
184 continue;
185 }
186 seen_updates.insert(cache_key, response.clone(), DEDUP_TTL);
187
188 match sender.try_send(response) {
189 Ok(_) => (),
190 Err(TrySendError::Full(r)) => {
191 warn!("Sender channel is full, responses will be delayed");
192 if sender.send(r).await.is_err() {
193 error!("Sender channel is closed, stopping client");
194 }
195 }
196 Err(TrySendError::Closed(_)) => {
197 error!("Sender channel is closed, stopping client");
198 }
199 }
200 }
201 });
202
203 Ok(receiver)
204 }
205
206 /// Subscribes to data streams across all WebSocket connections.
207 ///
208 /// This method sends the subscription request to all active WebSocket connections,
209 /// ensuring redundancy. If any connection fails to subscribe,
210 /// an error is returned, but other connections may still be subscribed.
211 ///
212 /// # Arguments
213 ///
214 /// * `subscribe_request` - The subscription request specifying which data streams to subscribe to
215 ///
216 /// # Returns
217 ///
218 /// Returns `Ok(())` if the subscription was successfully sent to all connections,
219 /// or an error if any connection failed to process the subscription.
220 ///
221 pub async fn subscribe(&mut self, subscribe_request: SubscribeRequest) -> Result<()> {
222 for connection in &mut self.ws_connections {
223 connection.subscribe(subscribe_request.clone()).await?;
224 }
225 Ok(())
226 }
227
228 /// Unsubscribes from a specific data stream across all WebSocket connections.
229 ///
230 /// This method sends an unsubscribe request for the specified subscription ID
231 /// to all active WebSocket connections.
232 ///
233 /// # Arguments
234 ///
235 /// * `subscription_id` - The ID of the subscription to cancel
236 ///
237 /// # Returns
238 ///
239 /// Returns `Ok(())` if the unsubscribe request was successfully sent to all connections,
240 /// or an error if any connection failed to process the request.
241 ///
242 pub async fn unsubscribe(&mut self, subscription_id: SubscriptionId) -> Result<()> {
243 for connection in &mut self.ws_connections {
244 connection.unsubscribe(subscription_id).await?;
245 }
246 Ok(())
247 }
248}
249
/// A builder for creating [`PythLazerClient`] instances with customizable configuration.
///
/// The builder starts from defaults and lets each parameter be overridden through
/// chained `with_*` calls before [`PythLazerClientBuilder::build`] hands the values to
/// [`PythLazerClient::new`].
///
/// ## Default Configuration
///
/// - **Endpoints**: the URLs listed in `DEFAULT_ENDPOINTS`
/// - **Connections**: `DEFAULT_NUM_CONNECTIONS` (4) WebSocket connections
/// - **Timeout**: `DEFAULT_TIMEOUT` (5 seconds)
/// - **Backoff**: `PythLazerExponentialBackoffBuilder::default().build()`
/// - **Channel Capacity**: the crate-level `CHANNEL_CAPACITY` constant
///
pub struct PythLazerClientBuilder {
    /// Endpoint URLs the built client will connect to.
    endpoints: Vec<Url>,
    /// Access token passed through to the built client.
    access_token: String,
    /// Number of connections the built client will create.
    num_connections: usize,
    /// Backoff configuration passed through to the built client.
    backoff: PythLazerExponentialBackoff,
    /// Timeout passed through to the built client.
    timeout: Duration,
    /// Capacity of the channel the built client returns from `start`.
    channel_capacity: usize,
}
272
273impl PythLazerClientBuilder {
274 /// Creates a new builder with default configuration.
275 ///
276 /// This initializes the builder with sensible defaults for production use:
277 /// - Default Pyth Lazer endpoints
278 /// - 4 WebSocket connections
279 /// - 5-second timeout
280 ///
281 /// # Arguments
282 ///
283 /// * `access_token` - The authentication token for accessing Pyth Lazer services
284 ///
285 pub fn new(access_token: String) -> Self {
286 Self {
287 endpoints: DEFAULT_ENDPOINTS
288 .iter()
289 .map(|&s| s.parse().unwrap())
290 .collect(),
291 access_token,
292 num_connections: DEFAULT_NUM_CONNECTIONS,
293 backoff: PythLazerExponentialBackoffBuilder::default().build(),
294 timeout: DEFAULT_TIMEOUT,
295 channel_capacity: CHANNEL_CAPACITY,
296 }
297 }
298
299 /// Sets custom WebSocket endpoints for the client.
300 ///
301 /// By default, the client uses Pyth Lazer's production endpoints. Use this method
302 /// to connect to different environments (staging, local development) or to use
303 /// custom endpoint configurations.
304 ///
305 /// # Arguments
306 ///
307 /// * `endpoints` - A vector of WebSocket endpoint URLs. Must not be empty.
308 ///
309 pub fn with_endpoints(mut self, endpoints: Vec<Url>) -> Self {
310 self.endpoints = endpoints;
311 self
312 }
313
314 /// Sets the number of concurrent WebSocket connections to maintain.
315 ///
316 /// More connections provide better redundancy and can improve throughput,
317 /// but also consume more resources.
318 ///
319 /// # Arguments
320 ///
321 /// * `num_connections` - The number of WebSocket connections (must be > 0)
322 ///
323 pub fn with_num_connections(mut self, num_connections: usize) -> Self {
324 self.num_connections = num_connections;
325 self
326 }
327
328 /// Sets the exponential backoff configuration for connection retries.
329 ///
330 /// The backoff strategy determines how the client handles connection failures
331 /// and retries.
332 ///
333 /// # Arguments
334 ///
335 /// * `backoff` - The exponential backoff configuration
336 ///
337 pub fn with_backoff(mut self, backoff: PythLazerExponentialBackoff) -> Self {
338 self.backoff = backoff;
339 self
340 }
341
342 /// Sets the timeout duration for WebSocket operations.
343 ///
344 /// This timeout applies to each WebSocket connection,
345 /// if no response is received within this duration,
346 /// the connection will be considered failed and retried.
347 ///
348 /// # Arguments
349 ///
350 /// * `timeout` - The timeout duration for each WebSocket
351 ///
352 pub fn with_timeout(mut self, timeout: Duration) -> Self {
353 self.timeout = timeout;
354 self
355 }
356
357 /// Sets the capacity of the internal message channel.
358 ///
359 /// This determines how many messages can be buffered internally before
360 /// the client starts applying backpressure.
361 ///
362 /// # Arguments
363 ///
364 /// * `channel_capacity` - The channel capacity (number of messages)
365 ///
366 pub fn with_channel_capacity(mut self, channel_capacity: usize) -> Self {
367 self.channel_capacity = channel_capacity;
368 self
369 }
370
371 /// Builds the configured [`PythLazerClient`] instance.
372 ///
373 /// This consumes the builder and creates a new client with the specified
374 /// configuration. The client is ready to use but connections are not
375 /// established until [`PythLazerClient::start`] is called.
376 ///
377 /// # Returns
378 ///
379 /// Returns `Ok(PythLazerClient)` on success, or an error if the configuration
380 /// is invalid.
381 ///
382 /// # Errors
383 ///
384 /// Returns an error if:
385 /// - No endpoints are configured
386 /// - Any configuration parameter is invalid
387 ///
388 pub fn build(self) -> Result<PythLazerClient> {
389 PythLazerClient::new(
390 self.endpoints,
391 self.access_token,
392 self.num_connections,
393 self.backoff,
394 self.timeout,
395 self.channel_capacity,
396 )
397 }
398}