Skip to main content

kraken_api_client/spot/ws/
client.rs

1//! WebSocket client implementation.
2
3use std::time::Duration;
4
5use crate::error::KrakenError;
6use crate::spot::ws::stream::KrakenStream;
7
8/// WebSocket endpoint URLs.
9pub mod endpoints {
10    /// Public WebSocket endpoint.
11    pub const WS_PUBLIC: &str = "wss://ws.kraken.com/v2";
12    /// Private (authenticated) WebSocket endpoint.
13    pub const WS_AUTH: &str = "wss://ws-auth.kraken.com/v2";
14}
15
16/// Configuration for WebSocket connections.
17#[derive(Debug, Clone)]
18pub struct WsConfig {
19    /// Initial backoff duration for reconnection.
20    pub initial_backoff: Duration,
21    /// Maximum backoff duration for reconnection.
22    pub max_backoff: Duration,
23    /// Maximum number of reconnection attempts (None = infinite).
24    pub max_reconnect_attempts: Option<u32>,
25    /// Ping interval for connection health checks.
26    pub ping_interval: Duration,
27    /// Pong timeout - disconnect if no pong received.
28    pub pong_timeout: Duration,
29}
30
31impl Default for WsConfig {
32    fn default() -> Self {
33        Self {
34            initial_backoff: Duration::from_secs(1),
35            max_backoff: Duration::from_secs(60),
36            max_reconnect_attempts: None, // Infinite
37            ping_interval: Duration::from_secs(30),
38            pong_timeout: Duration::from_secs(10),
39        }
40    }
41}
42
43impl WsConfig {
44    /// Create a new configuration builder.
45    pub fn builder() -> WsConfigBuilder {
46        WsConfigBuilder::new()
47    }
48}
49
50/// Builder for [`WsConfig`].
51#[derive(Debug, Clone, Default)]
52pub struct WsConfigBuilder {
53    config: WsConfig,
54}
55
56impl WsConfigBuilder {
57    /// Create a new builder with default settings.
58    pub fn new() -> Self {
59        Self {
60            config: WsConfig::default(),
61        }
62    }
63
64    /// Set the reconnection backoff parameters.
65    pub fn reconnect_backoff(mut self, initial: Duration, max: Duration) -> Self {
66        self.config.initial_backoff = initial;
67        self.config.max_backoff = max;
68        self
69    }
70
71    /// Set maximum reconnection attempts.
72    pub fn max_reconnect_attempts(mut self, attempts: u32) -> Self {
73        self.config.max_reconnect_attempts = Some(attempts);
74        self
75    }
76
77    /// Set ping interval.
78    pub fn ping_interval(mut self, interval: Duration) -> Self {
79        self.config.ping_interval = interval;
80        self
81    }
82
83    /// Build the configuration.
84    pub fn build(self) -> WsConfig {
85        self.config
86    }
87}
88
89/// Kraken Spot WebSocket client.
90///
91/// Provides methods to connect to public and private WebSocket channels
92/// with automatic reconnection and subscription restoration.
93#[derive(Debug, Clone)]
94pub struct SpotWsClient {
95    /// Public WebSocket URL.
96    public_url: String,
97    /// Private WebSocket URL.
98    auth_url: String,
99    /// Connection configuration.
100    config: WsConfig,
101}
102
103impl SpotWsClient {
104    /// Create a new WebSocket client with default settings.
105    pub fn new() -> Self {
106        Self::with_config(WsConfig::default())
107    }
108
109    /// Create a new WebSocket client with custom configuration.
110    pub fn with_config(config: WsConfig) -> Self {
111        Self {
112            public_url: endpoints::WS_PUBLIC.to_string(),
113            auth_url: endpoints::WS_AUTH.to_string(),
114            config,
115        }
116    }
117
118    /// Create a client with custom URLs (useful for testing).
119    pub fn with_urls(public_url: impl Into<String>, auth_url: impl Into<String>) -> Self {
120        Self {
121            public_url: public_url.into(),
122            auth_url: auth_url.into(),
123            config: WsConfig::default(),
124        }
125    }
126
127    /// Get the public WebSocket URL.
128    pub fn public_url(&self) -> &str {
129        &self.public_url
130    }
131
132    /// Get the private WebSocket URL.
133    pub fn auth_url(&self) -> &str {
134        &self.auth_url
135    }
136
137    /// Get the configuration.
138    pub fn config(&self) -> &WsConfig {
139        &self.config
140    }
141
142    /// Connect to the public WebSocket endpoint.
143    ///
144    /// Returns a stream of market data messages.
145    ///
146    /// # Example
147    ///
148    /// ```rust,ignore
149    /// use kraken_api_client::spot::ws::SpotWsClient;
150    /// use kraken_api_client::spot::ws::messages::{SubscribeParams, channels};
151    /// use futures_util::StreamExt;
152    ///
153    /// let client = SpotWsClient::new();
154    /// let mut stream = client.connect_public().await?;
155    ///
156    /// // Subscribe to ticker updates
157    /// stream.subscribe(SubscribeParams::public(channels::TICKER, vec!["BTC/USD".into()])).await?;
158    ///
159    /// while let Some(msg) = stream.next().await {
160    ///     println!("Message: {:?}", msg);
161    /// }
162    /// ```
163    pub async fn connect_public(&self) -> Result<KrakenStream, KrakenError> {
164        KrakenStream::connect_public(&self.public_url, self.config.clone()).await
165    }
166
167    /// Connect to the public WebSocket endpoint with custom configuration.
168    pub async fn connect_public_with_config(
169        &self,
170        config: WsConfig,
171    ) -> Result<KrakenStream, KrakenError> {
172        KrakenStream::connect_public(&self.public_url, config).await
173    }
174
175    /// Connect to the private (authenticated) WebSocket endpoint.
176    ///
177    /// Requires a valid WebSocket token obtained from the REST API.
178    ///
179    /// # Example
180    ///
181    /// ```rust,ignore
182    /// use kraken_api_client::spot::ws::SpotWsClient;
183    /// use kraken_api_client::spot::rest::SpotRestClient;
184    /// use kraken_api_client::auth::StaticCredentials;
185    /// use kraken_api_client::spot::ws::messages::{SubscribeParams, channels};
186    /// use futures_util::StreamExt;
187    /// use std::sync::Arc;
188    ///
189    /// // First, get a WebSocket token from the REST API
190    /// let credentials = Arc::new(StaticCredentials::new("api_key", "api_secret"));
191    /// let rest_client = SpotRestClient::builder().credentials(credentials).build();
192    /// let token_response = rest_client.get_websocket_token().await?;
193    ///
194    /// // Then connect to the private WebSocket
195    /// let ws_client = SpotWsClient::new();
196    /// let mut stream = ws_client.connect_private(token_response.token).await?;
197    ///
198    /// // Subscribe to execution updates
199    /// stream.subscribe(SubscribeParams::private(channels::EXECUTIONS, &token_response.token)).await?;
200    ///
201    /// while let Some(msg) = stream.next().await {
202    ///     println!("Message: {:?}", msg);
203    /// }
204    /// ```
205    pub async fn connect_private(&self, token: impl Into<String>) -> Result<KrakenStream, KrakenError> {
206        KrakenStream::connect_private(&self.auth_url, self.config.clone(), token.into()).await
207    }
208
209    /// Connect to the private WebSocket endpoint with custom configuration.
210    pub async fn connect_private_with_config(
211        &self,
212        token: impl Into<String>,
213        config: WsConfig,
214    ) -> Result<KrakenStream, KrakenError> {
215        KrakenStream::connect_private(&self.auth_url, config, token.into()).await
216    }
217}
218
219impl Default for SpotWsClient {
220    fn default() -> Self {
221        Self::new()
222    }
223}