Skip to main content

kraken_api_client/futures/ws/
client.rs

1//! Futures WebSocket client implementation.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
7use hmac::{Hmac, Mac};
8use sha2::{Digest, Sha256, Sha512};
9
10use crate::auth::{Credentials, CredentialsProvider};
11use crate::error::KrakenError;
12use crate::futures::ws::endpoints;
13use crate::futures::ws::stream::FuturesStream;
14
15type HmacSha512 = Hmac<Sha512>;
16
17/// Configuration for WebSocket connections.
18#[derive(Debug, Clone)]
19pub struct WsConfig {
20    /// Initial backoff duration for reconnection.
21    pub initial_backoff: Duration,
22    /// Maximum backoff duration for reconnection.
23    pub max_backoff: Duration,
24    /// Maximum number of reconnection attempts (None = infinite).
25    pub max_reconnect_attempts: Option<u32>,
26    /// Ping interval for connection health checks.
27    pub ping_interval: Duration,
28    /// Pong timeout - disconnect if no pong received.
29    pub pong_timeout: Duration,
30}
31
32impl Default for WsConfig {
33    fn default() -> Self {
34        Self {
35            initial_backoff: Duration::from_secs(1),
36            max_backoff: Duration::from_secs(60),
37            max_reconnect_attempts: None, // Infinite reconnect attempts.
38            ping_interval: Duration::from_secs(30),
39            pong_timeout: Duration::from_secs(10),
40        }
41    }
42}
43
44impl WsConfig {
45    /// Create a new configuration builder.
46    pub fn builder() -> WsConfigBuilder {
47        WsConfigBuilder::new()
48    }
49}
50
51/// Builder for [`WsConfig`].
52#[derive(Debug, Clone, Default)]
53pub struct WsConfigBuilder {
54    config: WsConfig,
55}
56
57impl WsConfigBuilder {
58    /// Create a new builder with default settings.
59    pub fn new() -> Self {
60        Self {
61            config: WsConfig::default(),
62        }
63    }
64
65    /// Set the reconnection backoff parameters.
66    pub fn reconnect_backoff(mut self, initial: Duration, max: Duration) -> Self {
67        self.config.initial_backoff = initial;
68        self.config.max_backoff = max;
69        self
70    }
71
72    /// Set maximum reconnection attempts.
73    pub fn max_reconnect_attempts(mut self, attempts: u32) -> Self {
74        self.config.max_reconnect_attempts = Some(attempts);
75        self
76    }
77
78    /// Set ping interval.
79    pub fn ping_interval(mut self, interval: Duration) -> Self {
80        self.config.ping_interval = interval;
81        self
82    }
83
84    /// Set pong timeout.
85    pub fn pong_timeout(mut self, timeout: Duration) -> Self {
86        self.config.pong_timeout = timeout;
87        self
88    }
89
90    /// Build the configuration.
91    pub fn build(self) -> WsConfig {
92        self.config
93    }
94}
95
96/// Kraken Futures WebSocket client.
97///
98/// Provides methods to connect to public and private WebSocket feeds
99/// with automatic reconnection and subscription restoration.
100///
101/// # Example
102///
103/// ```rust,ignore
104/// use kraken_api_client::futures::ws::{FuturesWsClient, feeds};
105/// use futures_util::StreamExt;
106///
107/// let client = FuturesWsClient::new();
108/// let mut stream = client.connect_public().await?;
109///
110/// stream.subscribe_public(feeds::TICKER, vec!["PI_XBTUSD"]).await?;
111///
112/// while let Some(msg) = stream.next().await {
113///     println!("{:?}", msg);
114/// }
115/// ```
116#[derive(Debug, Clone)]
117pub struct FuturesWsClient {
118    /// WebSocket URL.
119    url: String,
120    /// Connection configuration.
121    config: WsConfig,
122}
123
124impl FuturesWsClient {
125    /// Create a new WebSocket client with default settings.
126    pub fn new() -> Self {
127        Self::with_config(WsConfig::default())
128    }
129
130    /// Create a new WebSocket client with custom configuration.
131    pub fn with_config(config: WsConfig) -> Self {
132        Self {
133            url: endpoints::WS_PUBLIC.to_string(),
134            config,
135        }
136    }
137
138    /// Create a client for the demo/testnet environment.
139    pub fn demo() -> Self {
140        Self {
141            url: endpoints::WS_DEMO.to_string(),
142            config: WsConfig::default(),
143        }
144    }
145
146    /// Create a client with a custom URL (useful for testing).
147    pub fn with_url(url: impl Into<String>) -> Self {
148        Self {
149            url: url.into(),
150            config: WsConfig::default(),
151        }
152    }
153
154    /// Get the WebSocket URL.
155    pub fn url(&self) -> &str {
156        &self.url
157    }
158
159    /// Get the configuration.
160    pub fn config(&self) -> &WsConfig {
161        &self.config
162    }
163
164    /// Connect to the public WebSocket endpoint.
165    ///
166    /// Returns a stream that can subscribe to public feeds (ticker, book, trades).
167    ///
168    /// # Example
169    ///
170    /// ```rust,ignore
171    /// use kraken_api_client::futures::ws::{FuturesWsClient, feeds};
172    /// use futures_util::StreamExt;
173    ///
174    /// let client = FuturesWsClient::new();
175    /// let mut stream = client.connect_public().await?;
176    ///
177    /// stream.subscribe_public(feeds::BOOK, vec!["PI_XBTUSD"]).await?;
178    ///
179    /// while let Some(msg) = stream.next().await {
180    ///     match msg? {
181    ///         FuturesWsEvent::Book(book) => println!("Book: {:?}", book),
182    ///         _ => {}
183    ///     }
184    /// }
185    /// ```
186    pub async fn connect_public(&self) -> Result<FuturesStream, KrakenError> {
187        FuturesStream::connect_public(&self.url, self.config.clone()).await
188    }
189
190    /// Connect to the private WebSocket endpoint with authentication.
191    ///
192    /// This will perform the challenge/response authentication automatically.
193    ///
194    /// # Example
195    ///
196    /// ```rust,ignore
197    /// use kraken_api_client::futures::ws::{FuturesWsClient, feeds};
198    /// use kraken_api_client::auth::StaticCredentials;
199    /// use futures_util::StreamExt;
200    /// use std::sync::Arc;
201    ///
202    /// let credentials = Arc::new(StaticCredentials::new("api_key", "api_secret"));
203    /// let client = FuturesWsClient::new();
204    /// let mut stream = client.connect_private(credentials).await?;
205    ///
206    /// stream.subscribe_private(feeds::OPEN_ORDERS).await?;
207    /// stream.subscribe_private(feeds::FILLS).await?;
208    ///
209    /// while let Some(msg) = stream.next().await {
210    ///     match msg? {
211    ///         FuturesWsEvent::Order(order) => println!("Order: {:?}", order),
212    ///         FuturesWsEvent::Fill(fill) => println!("Fill: {:?}", fill),
213    ///         _ => {}
214    ///     }
215    /// }
216    /// ```
217    pub async fn connect_private(
218        &self,
219        credentials: Arc<dyn CredentialsProvider>,
220    ) -> Result<FuturesStream, KrakenError> {
221        FuturesStream::connect_private(&self.url, self.config.clone(), credentials).await
222    }
223
224    /// Connect to the public WebSocket endpoint with custom configuration.
225    pub async fn connect_public_with_config(
226        &self,
227        config: WsConfig,
228    ) -> Result<FuturesStream, KrakenError> {
229        FuturesStream::connect_public(&self.url, config).await
230    }
231
232    /// Connect to the private WebSocket endpoint with custom configuration.
233    pub async fn connect_private_with_config(
234        &self,
235        credentials: Arc<dyn CredentialsProvider>,
236        config: WsConfig,
237    ) -> Result<FuturesStream, KrakenError> {
238        FuturesStream::connect_private(&self.url, config, credentials).await
239    }
240}
241
242impl Default for FuturesWsClient {
243    fn default() -> Self {
244        Self::new()
245    }
246}
247
248/// Sign a WebSocket challenge for authentication.
249///
250/// The Futures WebSocket API uses challenge-based authentication:
251/// 1. SHA-256 hash the challenge message
252/// 2. HMAC-SHA-512 with the base64-decoded API secret
253/// 3. Base64 encode the result
254///
255/// # Arguments
256///
257/// * `credentials` - API credentials containing the secret
258/// * `challenge` - The challenge string received from the server
259///
260/// # Returns
261///
262/// Base64-encoded signed challenge.
263pub fn sign_challenge(credentials: &Credentials, challenge: &str) -> Result<String, KrakenError> {
264    // Decode the API secret from base64.
265    let secret_decoded = BASE64
266        .decode(credentials.expose_secret())
267        .map_err(|_| KrakenError::Auth("API secret must be valid base64.".to_string()))?;
268
269    // SHA-256 hash the challenge.
270    let sha256_hash = Sha256::digest(challenge.as_bytes());
271
272    // HMAC-SHA-512 with the decoded secret.
273    let mut hmac = HmacSha512::new_from_slice(&secret_decoded)
274        .map_err(|e| KrakenError::Auth(format!("Invalid HMAC key: {e}")))?;
275    hmac.update(&sha256_hash);
276    let hmac_result = hmac.finalize().into_bytes();
277
278    // Base64 encode the result.
279    Ok(BASE64.encode(hmac_result))
280}
281
282#[cfg(test)]
283mod tests {
284    use super::*;
285
286    #[test]
287    fn test_sign_challenge() {
288        // Test that challenge signing produces correct format
289        let secret = BASE64.encode("test_secret_key");
290        let credentials = Credentials::new("api_key", secret);
291
292        let signed = sign_challenge(&credentials, "123e4567-e89b-12d3-a456-426614174000").unwrap();
293
294        // Should be valid base64
295        assert!(BASE64.decode(&signed).is_ok());
296        // HMAC-SHA512 produces 64 bytes, base64 encoded = 88 chars
297        assert_eq!(signed.len(), 88);
298    }
299
300    #[test]
301    fn test_sign_challenge_consistency() {
302        let secret = BASE64.encode("my_secret");
303        let credentials = Credentials::new("key", secret);
304
305        let sig1 = sign_challenge(&credentials, "test-challenge").unwrap();
306        let sig2 = sign_challenge(&credentials, "test-challenge").unwrap();
307
308        assert_eq!(sig1, sig2);
309    }
310
311    #[test]
312    fn test_sign_challenge_different_challenges() {
313        let secret = BASE64.encode("my_secret");
314        let credentials = Credentials::new("key", secret);
315
316        let sig1 = sign_challenge(&credentials, "challenge-1").unwrap();
317        let sig2 = sign_challenge(&credentials, "challenge-2").unwrap();
318
319        assert_ne!(sig1, sig2);
320    }
321
322    #[test]
323    fn test_config_builder() {
324        let config = WsConfig::builder()
325            .reconnect_backoff(Duration::from_secs(2), Duration::from_secs(120))
326            .max_reconnect_attempts(5)
327            .ping_interval(Duration::from_secs(15))
328            .pong_timeout(Duration::from_secs(5))
329            .build();
330
331        assert_eq!(config.initial_backoff, Duration::from_secs(2));
332        assert_eq!(config.max_backoff, Duration::from_secs(120));
333        assert_eq!(config.max_reconnect_attempts, Some(5));
334        assert_eq!(config.ping_interval, Duration::from_secs(15));
335        assert_eq!(config.pong_timeout, Duration::from_secs(5));
336    }
337
338    #[test]
339    fn test_client_urls() {
340        let client = FuturesWsClient::new();
341        assert_eq!(client.url(), "wss://futures.kraken.com/ws/v1");
342
343        let demo = FuturesWsClient::demo();
344        assert_eq!(demo.url(), "wss://demo-futures.kraken.com/ws/v1");
345
346        let custom = FuturesWsClient::with_url("wss://custom.example.com");
347        assert_eq!(custom.url(), "wss://custom.example.com");
348    }
349}