exchange-apiws 0.1.10

Exchange REST and WebSocket clients — spot trading, futures, account management, and live data streams
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
//! WebSocket feed runner — connection, ping, reconnect, message dispatch.
//!
//! [`run_feed`] is the single entry point. It drives any [`ExchangeConnector`]
//! through the full session lifecycle:
//!
//! ```text
//! connect → subscribe → recv loop ──► parse → tx.send(DataMessage)
//!     ▲           │ ping tick
//!     │           ▼
//!     └── reconnect (exponential backoff)
//! ```
//!
//! ## KuCoin rate limits enforced here
//!
//! KuCoin enforces a limit of **100 client-to-server messages per 10 seconds**
//! per connection (applies to subscribe, unsubscribe, and ping messages).
//! Exceeding this may cause the server to disconnect the connection.
//! The runner enforces this limit with a sliding window before sending any
//! outbound message.
//!
//! # Example
//!
//! ```no_run
//! use std::sync::Arc;
//! use tokio::sync::{mpsc, watch};
//! use exchange_apiws::{KuCoinClient, Credentials, KucoinEnv};
//! use exchange_apiws::actors::{DataMessage, ExchangeConnector};
//! use exchange_apiws::ws::{KucoinConnector, run_feed, WsRunnerConfig};
//!
//! #[tokio::main]
//! async fn main() -> exchange_apiws::Result<()> {
//!     let client = KuCoinClient::new(Credentials::from_env()?, KucoinEnv::LiveFutures)?;
//!     let token  = client.get_ws_token_public().await?;
//!     let conn   = Arc::new(KucoinConnector::new(&token, KucoinEnv::LiveFutures)?);
//!
//!     let mut subs = vec![];
//!     if let Some(s) = conn.trade_subscription("XBTUSDTM")  { subs.push(s); }
//!     if let Some(s) = conn.ticker_subscription("XBTUSDTM") { subs.push(s); }
//!
//!     let (tx, mut rx)               = mpsc::channel::<DataMessage>(1024);
//!     let (shutdown_tx, shutdown_rx) = watch::channel(false);
//!     let config = WsRunnerConfig::from_ping_interval(conn.ping_interval_secs);
//!
//!     tokio::spawn(run_feed(conn.ws_url().to_string(), subs, conn, tx, config, shutdown_rx));
//!
//!     while let Some(msg) = rx.recv().await {
//!         println!("{msg:?}");
//!     }
//!     let _ = shutdown_tx.send(true);
//!     Ok(())
//! }
//! ```

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures_util::{SinkExt, StreamExt};
use tokio::sync::{mpsc, watch};
use tokio::time::interval;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{debug, error, info, warn};

use crate::actors::{DataMessage, ExchangeConnector};
use crate::error::{ExchangeError, Result};
use crate::ws::types::WsMessage;

// ── Config ────────────────────────────────────────────────────────────────────

/// Tuning parameters for the WS runner.
#[derive(Debug, Clone)]
pub struct WsRunnerConfig {
    /// How often to send an application-level KuCoin ping (seconds).
    pub ping_interval_secs: u64,
    /// Base reconnect delay (seconds). Doubles on each attempt up to
    /// [`max_reconnect_delay_secs`].
    pub reconnect_delay_secs: u64,
    /// Hard ceiling on the per-attempt reconnect delay (seconds).
    ///
    /// Defaults to 80 s (16× the 5 s base). Lower this for latency-sensitive
    /// contexts — e.g. set to 30 s for a futures trading bot so a prolonged
    /// outage re-tries every 30 s rather than every 80 s once the backoff
    /// saturates.
    pub max_reconnect_delay_secs: u64,
    /// Give up and return [`ExchangeError::WsDisconnected`] after this many
    /// consecutive failed reconnect attempts. Set to `u32::MAX` to retry forever.
    pub max_reconnect_attempts: u32,
}

impl Default for WsRunnerConfig {
    fn default() -> Self {
        Self {
            ping_interval_secs: 20,
            reconnect_delay_secs: 5,
            max_reconnect_delay_secs: 80,
            max_reconnect_attempts: 10,
        }
    }
}

impl WsRunnerConfig {
    /// Build from the ping interval advertised by a KuCoin instance server.
    ///
    /// Pass `connector.ping_interval_secs` after calling [`crate::ws::KucoinConnector::new`].
    pub fn from_ping_interval(ping_interval_secs: u64) -> Self {
        Self {
            ping_interval_secs,
            ..Default::default()
        }
    }
}

// ── Rate-limit guard ──────────────────────────────────────────────────────────

/// Sliding-window rate limiter for outbound WS messages.
///
/// KuCoin allows 100 client→server messages per 10 seconds per connection.
/// This tracks send times in a `VecDeque` and sleeps if the window is full.
struct WsMsgGuard {
    window: VecDeque<Instant>,
    max_msgs: usize,
    window_dur: Duration,
}

impl WsMsgGuard {
    fn new() -> Self {
        Self {
            window: VecDeque::with_capacity(100),
            max_msgs: 100,
            window_dur: Duration::from_secs(10),
        }
    }

    /// Call before every outbound send. Sleeps if the 100/10s quota is full.
    async fn check(&mut self) {
        let now = Instant::now();
        // Drop timestamps older than the window.
        while self
            .window
            .front()
            .is_some_and(|t| now - *t > self.window_dur)
        {
            self.window.pop_front();
        }
        if self.window.len() >= self.max_msgs {
            // Sleep until the oldest message falls out of the window.
            if let Some(oldest) = self.window.front() {
                let wait = self.window_dur.saturating_sub(now - *oldest);
                if !wait.is_zero() {
                    warn!(
                        wait_ms = wait.as_millis(),
                        "WS outbound rate limit reached (100/10s) — throttling"
                    );
                    tokio::time::sleep(wait).await;
                }
            }
        }
        self.window.push_back(Instant::now());
    }
}

// ── Public entry point ────────────────────────────────────────────────────────

/// Drive a WebSocket feed for any [`ExchangeConnector`].
///
/// Subscribes to all topics in `subscriptions` on connect, forwards parsed
/// [`DataMessage`]s to `tx`, and reconnects automatically on any disconnect.
///
/// The reconnect attempt counter resets to zero whenever a session ran
/// successfully for at least [`STABLE_SESSION_SECS`] seconds. This means
/// a stable connection that eventually drops is treated the same as a fresh
/// start — it won't exhaust the attempt budget just from normal daily
/// reconnects.
///
/// # Arguments
/// - `ws_url`        — Full WSS URL with token query params.
/// - `subscriptions` — JSON subscription messages (build with the connector's helpers).
/// - `connector`     — Shared connector used to parse incoming frames.
/// - `tx`            — Downstream channel that receives parsed messages.
/// - `config`        — Ping interval, backoff, and max retry settings.
/// - `shutdown`      — Send `true` to request a graceful close.
///
/// # Returns
/// `Ok(())` on clean shutdown.
/// `Err(ExchangeError::WsDisconnected)` if max reconnect attempts are exhausted.
pub async fn run_feed(
    ws_url: impl Into<String>,
    subscriptions: Vec<String>,
    connector: Arc<dyn ExchangeConnector>,
    tx: mpsc::Sender<DataMessage>,
    config: WsRunnerConfig,
    mut shutdown: watch::Receiver<bool>,
) -> Result<()> {
    /// A session that ran at least this long is considered stable.
    /// After a stable session the attempt counter resets so normal
    /// daily reconnects (token expiry, rolling restarts, etc.) don't
    /// burn the retry budget.
    const STABLE_SESSION_SECS: u64 = 60;

    let url = ws_url.into();
    let mut attempts: u32 = 0;

    loop {
        if attempts > 0 {
            // Exponential backoff capped at config.max_reconnect_delay_secs.
            let exp = (attempts - 1).min(63); // guard against overflow on shift
            let delay = config
                .reconnect_delay_secs
                .saturating_mul(1u64 << exp.min(4)) // double each step
                .min(config.max_reconnect_delay_secs);
            warn!(
                attempt = attempts,
                max = config.max_reconnect_attempts,
                delay_secs = delay,
                exchange = connector.exchange_name(),
                "WS reconnecting"
            );
            tokio::time::sleep(Duration::from_secs(delay)).await;
        }

        let session_start = Instant::now();
        let outcome = single_session(
            &url,
            &subscriptions,
            connector.clone(),
            tx.clone(),
            &config,
            &mut shutdown,
            attempts,
        )
        .await;

        match outcome {
            SessionOutcome::ShutdownRequested => {
                info!(
                    exchange = connector.exchange_name(),
                    "WS feed shut down cleanly"
                );
                return Ok(());
            }
            SessionOutcome::ReceiverDropped => {
                info!("DataMessage receiver dropped; stopping WS feed");
                return Ok(());
            }
            SessionOutcome::Disconnected => {
                // If the session was stable for long enough, treat this as
                // a fresh start rather than a retry.  Normal causes: token
                // expiry (KuCoin tokens last ~24 h), rolling server restart,
                // or a clean network handoff.
                if session_start.elapsed().as_secs() >= STABLE_SESSION_SECS {
                    info!(
                        exchange = connector.exchange_name(),
                        uptime_secs = session_start.elapsed().as_secs(),
                        "WS stable session ended — resetting reconnect counter",
                    );
                    attempts = 0;
                } else {
                    attempts += 1;
                    if attempts > config.max_reconnect_attempts {
                        error!(
                            max = config.max_reconnect_attempts,
                            exchange = connector.exchange_name(),
                            "WS max reconnect attempts exhausted"
                        );
                        return Err(ExchangeError::WsDisconnected {
                            url: url.to_string(),
                            attempts,
                        });
                    }
                }
            }
        }
    }
}

// ── Internal session ──────────────────────────────────────────────────────────

enum SessionOutcome {
    ShutdownRequested,
    ReceiverDropped,
    Disconnected,
}

async fn single_session(
    url: &str,
    subscriptions: &[String],
    connector: Arc<dyn ExchangeConnector>,
    tx: mpsc::Sender<DataMessage>,
    config: &WsRunnerConfig,
    shutdown: &mut watch::Receiver<bool>,
    attempt: u32,
) -> SessionOutcome {
    info!(url, exchange = connector.exchange_name(), "WS connecting");

    let ws_stream = match connect_async(url).await {
        Ok((stream, _resp)) => stream,
        Err(e) => {
            warn!(error = %e, "WS connect failed");
            return SessionOutcome::Disconnected;
        }
    };

    let (mut write, mut read) = ws_stream.split();
    let mut guard = WsMsgGuard::new();

    // Send all subscription messages before entering the recv loop.
    for sub in subscriptions {
        guard.check().await;
        if let Err(e) = write.send(Message::Text(sub.clone().into())).await {
            warn!(error = %e, "failed to send subscription");
            return SessionOutcome::Disconnected;
        }
        debug!(topic = ?sub, "subscribed");
    }

    info!(
        exchange = connector.exchange_name(),
        "WS connected and subscribed"
    );

    let mut ping_tick = interval(Duration::from_secs(config.ping_interval_secs));
    ping_tick.tick().await; // discard the immediate first tick

    loop {
        tokio::select! {
            biased; // prioritise shutdown check under high message load

            // ── Shutdown signal ──────────────────────────────────────────────
            Ok(()) = shutdown.changed() => {
                if *shutdown.borrow() {
                    guard.check().await;
                    let _ = write.send(Message::Close(None)).await;
                    return SessionOutcome::ShutdownRequested;
                }
            }

            // ── Incoming WS frame ────────────────────────────────────────────
            frame = read.next() => {
                match frame {
                    Some(Ok(Message::Text(text))) => {
                        match connector.parse_message(&text) {
                            Ok(msgs) => {
                                for msg in msgs {
                                    if tx.send(msg).await.is_err() {
                                        return SessionOutcome::ReceiverDropped;
                                    }
                                }
                            }
                            Err(e) => {
                                warn!(error = %e, raw = %text, "parse_message error — skipping frame");
                            }
                        }
                    }
                    Some(Ok(Message::Ping(data))) => {
                        // Respond to protocol-level pings from the server.
                        if let Err(e) = write.send(Message::Pong(data)).await {
                            warn!(error = %e, "pong send failed");
                            return SessionOutcome::Disconnected;
                        }
                    }
                    Some(Ok(Message::Close(frame))) => {
                        info!(frame = ?frame, "server closed WS connection");
                        return SessionOutcome::Disconnected;
                    }
                    Some(Ok(Message::Binary(_))) => {
                        debug!("unexpected binary frame — ignored");
                    }
                    Some(Ok(_)) => {} // Pong / other frame variants — no action
                    Some(Err(e)) => {
                        // KuCoin periodically resets connections as part of
                        // normal server maintenance. A first-attempt drop is
                        // not worth alarming on — it almost always recovers
                        // immediately.  Only escalate to WARN once we've
                        // already retried at least once, signalling a
                        // persistent problem rather than a routine rotation.
                        if attempt == 0 {
                            debug!(error = %e, exchange = connector.exchange_name(), "WS read error");
                        } else {
                            warn!(error = %e, attempt, exchange = connector.exchange_name(), "WS read error");
                        }
                        return SessionOutcome::Disconnected;
                    }
                    None => {
                        debug!("WS stream closed");
                        return SessionOutcome::Disconnected;
                    }
                }
            }

            // ── Application-level ping ───────────────────────────────────────
            _ = ping_tick.tick() => {
                guard.check().await;
                if let Err(e) = write
                    .send(Message::Text(WsMessage::ping_json().into()))
                    .await
                {
                    warn!(error = %e, "ping send failed");
                    return SessionOutcome::Disconnected;
                }
                debug!(exchange = connector.exchange_name(), "sent ping");
            }
        }
    }
}