pyth_lazer_client/
client.rs

1//! # Pyth Lazer Client
2//!
3//! This module provides a high-level client for connecting to Pyth Lazer data streams.
4//! The client maintains multiple WebSocket connections for redundancy and provides
5//! automatic deduplication of messages.
6//!
7//! ## Features
8//!
9//! - Multiple redundant WebSocket connections
10//! - Automatic message deduplication
11//! - Exponential backoff for reconnections
12//! - Configurable timeouts and channel capacities
13//! - Builder pattern for easy configuration
14//!
15//! ## Basic Usage
16//!
17//! ```rust,ignore
18//! use pyth_lazer_client::PythLazerClientBuilder;
//! use pyth_lazer_protocol::api::SubscribeRequest;
20//!
21//! #[tokio::main]
22//! async fn main() -> anyhow::Result<()> {
23//!     let mut client = PythLazerClientBuilder::new("your_access_token".to_string())
24//!         .with_num_connections(2)
25//!         .build()?;
26//!
27//!     let mut receiver = client.start().await?;
28//!
29//!     // Subscribe to price feeds
30//!     let subscribe_request = SubscribeRequest {
31//!         // ... configure subscription
32//!     };
33//!     client.subscribe(subscribe_request).await?;
34//!
35//!     // Process incoming messages
36//!     while let Some(response) = receiver.recv().await {
37//!         println!("Received: {:?}", response);
38//!     }
39//!
40//!     Ok(())
41//! }
42//! ```
43
44use std::time::Duration;
45
46use crate::{
47    backoff::{PythLazerExponentialBackoff, PythLazerExponentialBackoffBuilder},
48    resilient_ws_connection::PythLazerResilientWSConnection,
49    ws_connection::AnyResponse,
50    CHANNEL_CAPACITY,
51};
52use anyhow::{bail, Result};
53use backoff::ExponentialBackoff;
54use pyth_lazer_protocol::api::{SubscribeRequest, SubscriptionId};
55use tokio::sync::mpsc::{self, error::TrySendError};
56use tracing::{error, warn};
57use ttl_cache::TtlCache;
58use url::Url;
59
/// Capacity, in entries, of the TTL cache used to deduplicate messages.
const DEDUP_CACHE_SIZE: usize = 100_000;
/// Time a message key stays in the deduplication cache before it expires.
const DEDUP_TTL: Duration = Duration::from_secs(10);
62
/// WebSocket endpoints the builder falls back to when none are supplied.
const DEFAULT_ENDPOINTS: [&str; 2] = [
    "wss://pyth-lazer-0.dourolabs.app/v1/stream",
    "wss://pyth-lazer-1.dourolabs.app/v1/stream",
];
/// Number of WebSocket connections the builder configures by default.
const DEFAULT_NUM_CONNECTIONS: usize = 4;
/// Timeout the builder configures by default for WebSocket operations.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
69
70/// A high-performance client for connecting to Pyth Lazer data streams.
71///
72/// The `PythLazerClient` maintains multiple WebSocket connections to Pyth Lazer endpoints
73/// for redundancy. It automatically handles connection management,
74/// message deduplication, and provides a unified stream of price updates.
75///
76/// ## Architecture
77///
78/// - Maintains multiple WebSocket connections to different endpoints
79/// - Uses a TTL cache for deduplicating messages across connections
80/// - Provides a single channel for consuming deduplicated messages
81/// - Handles connection failures with exponential backoff
82pub struct PythLazerClient {
83    endpoints: Vec<Url>,
84    access_token: String,
85    num_connections: usize,
86    ws_connections: Vec<PythLazerResilientWSConnection>,
87    backoff: ExponentialBackoff,
88    timeout: Duration,
89    channel_capacity: usize,
90}
91
92impl PythLazerClient {
93    /// Creates a new Pyth Lazer client instance.
94    ///
95    /// This is a low-level constructor. Consider using [`PythLazerClientBuilder`] for a more
96    /// convenient way to create clients with sensible defaults.
97    ///
98    /// # Arguments
99    ///
100    /// * `endpoints` - A vector of WebSocket endpoint URLs to connect to. Must not be empty.
101    /// * `access_token` - The authentication token for accessing Pyth Lazer services
102    /// * `num_connections` - The number of WebSocket connections to maintain for redundancy
103    /// * `backoff` - The exponential backoff configuration for connection retries
104    /// * `timeout` - The timeout duration for WebSocket operations
105    /// * `channel_capacity` - The capacity of the message channel
106    ///
107    /// # Returns
108    ///
109    /// Returns `Ok(PythLazerClient)` on success, or an error if the configuration is invalid.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if:
114    /// - The `endpoints` vector is empty
115    ///
116    pub fn new(
117        endpoints: Vec<Url>,
118        access_token: String,
119        num_connections: usize,
120        backoff: PythLazerExponentialBackoff,
121        timeout: Duration,
122        channel_capacity: usize,
123    ) -> Result<Self> {
124        if endpoints.is_empty() {
125            bail!("At least one endpoint must be provided");
126        }
127        Ok(Self {
128            endpoints,
129            access_token,
130            num_connections,
131            ws_connections: Vec::with_capacity(num_connections),
132            backoff: backoff.into(),
133            timeout,
134            channel_capacity,
135        })
136    }
137
138    /// Starts the client and begins establishing WebSocket connections.
139    ///
140    /// This method initializes all WebSocket connections and starts the message processing
141    /// loop. It returns a receiver channel that will yield deduplicated messages from
142    /// all connections.
143    ///
144    /// # Returns
145    ///
146    /// Returns a `Receiver<AnyResponse>` that yields deduplicated messages from all
147    /// WebSocket connections. The receiver will continue to yield messages until
148    /// all connections are closed or the client is dropped.
149    ///
150    /// # Errors
151    ///
152    /// This method itself doesn't return errors, but individual connection failures
153    /// are handled internally with automatic reconnection using the configured backoff
154    /// strategy.
155    ///
156    /// # Message Deduplication
157    ///
158    /// Messages are deduplicated using a TTL cache with a 10-second window. This ensures
159    /// that identical messages received from multiple connections are only delivered once.
160    ///
161    pub async fn start(&mut self) -> Result<mpsc::Receiver<AnyResponse>> {
162        let (sender, receiver) = mpsc::channel::<AnyResponse>(self.channel_capacity);
163        let (ws_connection_sender, mut ws_connection_receiver) =
164            mpsc::channel::<AnyResponse>(CHANNEL_CAPACITY);
165
166        for i in 0..self.num_connections {
167            let endpoint = self.endpoints[i % self.endpoints.len()].clone();
168            let connection = PythLazerResilientWSConnection::new(
169                endpoint,
170                self.access_token.clone(),
171                self.backoff.clone(),
172                self.timeout,
173                ws_connection_sender.clone(),
174            );
175            self.ws_connections.push(connection);
176        }
177
178        let mut seen_updates = TtlCache::new(DEDUP_CACHE_SIZE);
179
180        tokio::spawn(async move {
181            while let Some(response) = ws_connection_receiver.recv().await {
182                let cache_key = response.cache_key();
183                if seen_updates.contains_key(&cache_key) {
184                    continue;
185                }
186                seen_updates.insert(cache_key, response.clone(), DEDUP_TTL);
187
188                match sender.try_send(response) {
189                    Ok(_) => (),
190                    Err(TrySendError::Full(r)) => {
191                        warn!("Sender channel is full, responses will be delayed");
192                        if sender.send(r).await.is_err() {
193                            error!("Sender channel is closed, stopping client");
194                        }
195                    }
196                    Err(TrySendError::Closed(_)) => {
197                        error!("Sender channel is closed, stopping client");
198                    }
199                }
200            }
201        });
202
203        Ok(receiver)
204    }
205
206    /// Subscribes to data streams across all WebSocket connections.
207    ///
208    /// This method sends the subscription request to all active WebSocket connections,
209    /// ensuring redundancy. If any connection fails to subscribe,
210    /// an error is returned, but other connections may still be subscribed.
211    ///
212    /// # Arguments
213    ///
214    /// * `subscribe_request` - The subscription request specifying which data streams to subscribe to
215    ///
216    /// # Returns
217    ///
218    /// Returns `Ok(())` if the subscription was successfully sent to all connections,
219    /// or an error if any connection failed to process the subscription.
220    ///
221    pub async fn subscribe(&mut self, subscribe_request: SubscribeRequest) -> Result<()> {
222        for connection in &mut self.ws_connections {
223            connection.subscribe(subscribe_request.clone()).await?;
224        }
225        Ok(())
226    }
227
228    /// Unsubscribes from a specific data stream across all WebSocket connections.
229    ///
230    /// This method sends an unsubscribe request for the specified subscription ID
231    /// to all active WebSocket connections.
232    ///
233    /// # Arguments
234    ///
235    /// * `subscription_id` - The ID of the subscription to cancel
236    ///
237    /// # Returns
238    ///
239    /// Returns `Ok(())` if the unsubscribe request was successfully sent to all connections,
240    /// or an error if any connection failed to process the request.
241    ///
242    pub async fn unsubscribe(&mut self, subscription_id: SubscriptionId) -> Result<()> {
243        for connection in &mut self.ws_connections {
244            connection.unsubscribe(subscription_id).await?;
245        }
246        Ok(())
247    }
248}
249
250/// A builder for creating [`PythLazerClient`] instances with customizable configuration.
251///
252/// The builder provides a convenient way to configure a Pyth Lazer client with sensible
253/// defaults while allowing customization of all parameters. It follows the builder pattern
254/// for a fluent API.
255///
256/// ## Default Configuration
257///
258/// - **Endpoints**: Uses Pyth Lazer's default production endpoints
259/// - **Connections**: 4 concurrent WebSocket connections
260/// - **Timeout**: 5 seconds for WebSocket operations
261/// - **Backoff**: Exponential backoff with default settings
262/// - **Channel Capacity**: Uses the default 1000
263///
264pub struct PythLazerClientBuilder {
265    endpoints: Vec<Url>,
266    access_token: String,
267    num_connections: usize,
268    backoff: PythLazerExponentialBackoff,
269    timeout: Duration,
270    channel_capacity: usize,
271}
272
273impl PythLazerClientBuilder {
274    /// Creates a new builder with default configuration.
275    ///
276    /// This initializes the builder with sensible defaults for production use:
277    /// - Default Pyth Lazer endpoints
278    /// - 4 WebSocket connections
279    /// - 5-second timeout
280    ///
281    /// # Arguments
282    ///
283    /// * `access_token` - The authentication token for accessing Pyth Lazer services
284    ///
285    pub fn new(access_token: String) -> Self {
286        Self {
287            endpoints: DEFAULT_ENDPOINTS
288                .iter()
289                .map(|&s| s.parse().unwrap())
290                .collect(),
291            access_token,
292            num_connections: DEFAULT_NUM_CONNECTIONS,
293            backoff: PythLazerExponentialBackoffBuilder::default().build(),
294            timeout: DEFAULT_TIMEOUT,
295            channel_capacity: CHANNEL_CAPACITY,
296        }
297    }
298
299    /// Sets custom WebSocket endpoints for the client.
300    ///
301    /// By default, the client uses Pyth Lazer's production endpoints. Use this method
302    /// to connect to different environments (staging, local development) or to use
303    /// custom endpoint configurations.
304    ///
305    /// # Arguments
306    ///
307    /// * `endpoints` - A vector of WebSocket endpoint URLs. Must not be empty.
308    ///
309    pub fn with_endpoints(mut self, endpoints: Vec<Url>) -> Self {
310        self.endpoints = endpoints;
311        self
312    }
313
314    /// Sets the number of concurrent WebSocket connections to maintain.
315    ///
316    /// More connections provide better redundancy and can improve throughput,
317    /// but also consume more resources.
318    ///
319    /// # Arguments
320    ///
321    /// * `num_connections` - The number of WebSocket connections (must be > 0)
322    ///
323    pub fn with_num_connections(mut self, num_connections: usize) -> Self {
324        self.num_connections = num_connections;
325        self
326    }
327
328    /// Sets the exponential backoff configuration for connection retries.
329    ///
330    /// The backoff strategy determines how the client handles connection failures
331    /// and retries.
332    ///
333    /// # Arguments
334    ///
335    /// * `backoff` - The exponential backoff configuration
336    ///
337    pub fn with_backoff(mut self, backoff: PythLazerExponentialBackoff) -> Self {
338        self.backoff = backoff;
339        self
340    }
341
342    /// Sets the timeout duration for WebSocket operations.
343    ///
344    /// This timeout applies to each WebSocket connection,
345    /// if no response is received within this duration,
346    /// the connection will be considered failed and retried.
347    ///
348    /// # Arguments
349    ///
350    /// * `timeout` - The timeout duration for each WebSocket
351    ///
352    pub fn with_timeout(mut self, timeout: Duration) -> Self {
353        self.timeout = timeout;
354        self
355    }
356
357    /// Sets the capacity of the internal message channel.
358    ///
359    /// This determines how many messages can be buffered internally before
360    /// the client starts applying backpressure.
361    ///
362    /// # Arguments
363    ///
364    /// * `channel_capacity` - The channel capacity (number of messages)
365    ///
366    pub fn with_channel_capacity(mut self, channel_capacity: usize) -> Self {
367        self.channel_capacity = channel_capacity;
368        self
369    }
370
371    /// Builds the configured [`PythLazerClient`] instance.
372    ///
373    /// This consumes the builder and creates a new client with the specified
374    /// configuration. The client is ready to use but connections are not
375    /// established until [`PythLazerClient::start`] is called.
376    ///
377    /// # Returns
378    ///
379    /// Returns `Ok(PythLazerClient)` on success, or an error if the configuration
380    /// is invalid.
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if:
385    /// - No endpoints are configured
386    /// - Any configuration parameter is invalid
387    ///
388    pub fn build(self) -> Result<PythLazerClient> {
389        PythLazerClient::new(
390            self.endpoints,
391            self.access_token,
392            self.num_connections,
393            self.backoff,
394            self.timeout,
395            self.channel_capacity,
396        )
397    }
398}