kraken_api_client/spot/ws/client.rs
1//! WebSocket client implementation.
2
3use std::time::Duration;
4
5use crate::error::KrakenError;
6use crate::spot::ws::stream::KrakenStream;
7
/// WebSocket endpoint URLs for the Kraken Spot API.
///
/// Both constants point at the `/v2` path of their respective hosts.
pub mod endpoints {
    /// URL of the public WebSocket endpoint.
    pub const WS_PUBLIC: &str = "wss://ws.kraken.com/v2";
    /// URL of the private (authenticated) WebSocket endpoint.
    pub const WS_AUTH: &str = "wss://ws-auth.kraken.com/v2";
}
15
/// Configuration for WebSocket connections.
///
/// All fields are public, so a value can be created with
/// [`WsConfig::default`], with a struct literal, or fluently through
/// [`WsConfig::builder`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WsConfig {
    /// Initial backoff duration for reconnection.
    pub initial_backoff: Duration,
    /// Maximum backoff duration for reconnection.
    pub max_backoff: Duration,
    /// Maximum number of reconnection attempts (`None` = infinite).
    pub max_reconnect_attempts: Option<u32>,
    /// Ping interval for connection health checks.
    pub ping_interval: Duration,
    /// Pong timeout - disconnect if no pong received.
    pub pong_timeout: Duration,
}

impl Default for WsConfig {
    /// Returns the default configuration:
    ///
    /// | field                    | value          |
    /// |--------------------------|----------------|
    /// | `initial_backoff`        | 1 second       |
    /// | `max_backoff`            | 60 seconds     |
    /// | `max_reconnect_attempts` | `None`         |
    /// | `ping_interval`          | 30 seconds     |
    /// | `pong_timeout`           | 10 seconds     |
    fn default() -> Self {
        Self {
            initial_backoff: Duration::from_secs(1),
            max_backoff: Duration::from_secs(60),
            max_reconnect_attempts: None, // Infinite
            ping_interval: Duration::from_secs(30),
            pong_timeout: Duration::from_secs(10),
        }
    }
}

impl WsConfig {
    /// Create a new configuration builder pre-populated with the defaults.
    #[must_use]
    pub fn builder() -> WsConfigBuilder {
        WsConfigBuilder::new()
    }
}

/// Builder for [`WsConfig`].
///
/// Starts from [`WsConfig::default`]; each setter consumes the builder and
/// returns it so calls can be chained, and [`WsConfigBuilder::build`] yields
/// the finished configuration.
#[derive(Debug, Clone, Default)]
pub struct WsConfigBuilder {
    /// The configuration being assembled.
    config: WsConfig,
}

impl WsConfigBuilder {
    /// Create a new builder with default settings.
    #[must_use]
    pub fn new() -> Self {
        Self {
            config: WsConfig::default(),
        }
    }

    /// Set the reconnection backoff parameters.
    ///
    /// Both values are stored exactly as given; this method does not check
    /// that `initial` is less than or equal to `max`.
    #[must_use]
    pub fn reconnect_backoff(mut self, initial: Duration, max: Duration) -> Self {
        self.config.initial_backoff = initial;
        self.config.max_backoff = max;
        self
    }

    /// Set maximum reconnection attempts.
    ///
    /// Stores `Some(attempts)`; a builder on which this is never called keeps
    /// the default of `None` (infinite).
    #[must_use]
    pub fn max_reconnect_attempts(mut self, attempts: u32) -> Self {
        self.config.max_reconnect_attempts = Some(attempts);
        self
    }

    /// Set ping interval.
    #[must_use]
    pub fn ping_interval(mut self, interval: Duration) -> Self {
        self.config.ping_interval = interval;
        self
    }

    /// Set pong timeout.
    ///
    /// Without this setter the `pong_timeout` field could only be changed by
    /// mutating the built [`WsConfig`] directly.
    #[must_use]
    pub fn pong_timeout(mut self, timeout: Duration) -> Self {
        self.config.pong_timeout = timeout;
        self
    }

    /// Build the configuration.
    #[must_use]
    pub fn build(self) -> WsConfig {
        self.config
    }
}
88
89/// Kraken Spot WebSocket client.
90///
91/// Provides methods to connect to public and private WebSocket channels
92/// with automatic reconnection and subscription restoration.
93#[derive(Debug, Clone)]
94pub struct SpotWsClient {
95 /// Public WebSocket URL.
96 public_url: String,
97 /// Private WebSocket URL.
98 auth_url: String,
99 /// Connection configuration.
100 config: WsConfig,
101}
102
103impl SpotWsClient {
104 /// Create a new WebSocket client with default settings.
105 pub fn new() -> Self {
106 Self::with_config(WsConfig::default())
107 }
108
109 /// Create a new WebSocket client with custom configuration.
110 pub fn with_config(config: WsConfig) -> Self {
111 Self {
112 public_url: endpoints::WS_PUBLIC.to_string(),
113 auth_url: endpoints::WS_AUTH.to_string(),
114 config,
115 }
116 }
117
118 /// Create a client with custom URLs (useful for testing).
119 pub fn with_urls(public_url: impl Into<String>, auth_url: impl Into<String>) -> Self {
120 Self {
121 public_url: public_url.into(),
122 auth_url: auth_url.into(),
123 config: WsConfig::default(),
124 }
125 }
126
127 /// Get the public WebSocket URL.
128 pub fn public_url(&self) -> &str {
129 &self.public_url
130 }
131
132 /// Get the private WebSocket URL.
133 pub fn auth_url(&self) -> &str {
134 &self.auth_url
135 }
136
137 /// Get the configuration.
138 pub fn config(&self) -> &WsConfig {
139 &self.config
140 }
141
142 /// Connect to the public WebSocket endpoint.
143 ///
144 /// Returns a stream of market data messages.
145 ///
146 /// # Example
147 ///
148 /// ```rust,ignore
149 /// use kraken_api_client::spot::ws::SpotWsClient;
150 /// use kraken_api_client::spot::ws::messages::{SubscribeParams, channels};
151 /// use futures_util::StreamExt;
152 ///
153 /// let client = SpotWsClient::new();
154 /// let mut stream = client.connect_public().await?;
155 ///
156 /// // Subscribe to ticker updates
157 /// stream.subscribe(SubscribeParams::public(channels::TICKER, vec!["BTC/USD".into()])).await?;
158 ///
159 /// while let Some(msg) = stream.next().await {
160 /// println!("Message: {:?}", msg);
161 /// }
162 /// ```
163 pub async fn connect_public(&self) -> Result<KrakenStream, KrakenError> {
164 KrakenStream::connect_public(&self.public_url, self.config.clone()).await
165 }
166
167 /// Connect to the public WebSocket endpoint with custom configuration.
168 pub async fn connect_public_with_config(
169 &self,
170 config: WsConfig,
171 ) -> Result<KrakenStream, KrakenError> {
172 KrakenStream::connect_public(&self.public_url, config).await
173 }
174
175 /// Connect to the private (authenticated) WebSocket endpoint.
176 ///
177 /// Requires a valid WebSocket token obtained from the REST API.
178 ///
179 /// # Example
180 ///
181 /// ```rust,ignore
182 /// use kraken_api_client::spot::ws::SpotWsClient;
183 /// use kraken_api_client::spot::rest::SpotRestClient;
184 /// use kraken_api_client::auth::StaticCredentials;
185 /// use kraken_api_client::spot::ws::messages::{SubscribeParams, channels};
186 /// use futures_util::StreamExt;
187 /// use std::sync::Arc;
188 ///
189 /// // First, get a WebSocket token from the REST API
190 /// let credentials = Arc::new(StaticCredentials::new("api_key", "api_secret"));
191 /// let rest_client = SpotRestClient::builder().credentials(credentials).build();
192 /// let token_response = rest_client.get_websocket_token().await?;
193 ///
194 /// // Then connect to the private WebSocket
195 /// let ws_client = SpotWsClient::new();
196 /// let mut stream = ws_client.connect_private(token_response.token).await?;
197 ///
198 /// // Subscribe to execution updates
199 /// stream.subscribe(SubscribeParams::private(channels::EXECUTIONS, &token_response.token)).await?;
200 ///
201 /// while let Some(msg) = stream.next().await {
202 /// println!("Message: {:?}", msg);
203 /// }
204 /// ```
205 pub async fn connect_private(&self, token: impl Into<String>) -> Result<KrakenStream, KrakenError> {
206 KrakenStream::connect_private(&self.auth_url, self.config.clone(), token.into()).await
207 }
208
209 /// Connect to the private WebSocket endpoint with custom configuration.
210 pub async fn connect_private_with_config(
211 &self,
212 token: impl Into<String>,
213 config: WsConfig,
214 ) -> Result<KrakenStream, KrakenError> {
215 KrakenStream::connect_private(&self.auth_url, config, token.into()).await
216 }
217}
218
219impl Default for SpotWsClient {
220 fn default() -> Self {
221 Self::new()
222 }
223}