simulator-client 0.9.0

Async WebSocket client for the Solana simulator backtest API
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
//! Connection-managed wrappers over the backtest WebSocket protocol.
//!
//! Most callers should use [`ManagedBacktestSession`]. It owns the control and
//! subscription manager tasks, waits for session creation, gates `Continue`
//! sends on live connections, and provides one shutdown path.
//!
//! The control and subscription WebSockets are each owned by a dedicated task
//! that handles its own lifecycle: connect, handshake, keepalive, reconnect.
//! Workload code interacts via channels and a status watcher, never with the
//! WebSocket directly.

use std::{
    future::Future,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::{Duration, Instant},
};

use futures::SinkExt;
use rand::Rng;
use simulator_api::{BacktestError, BacktestRequest};
use tokio::{
    net::TcpStream,
    sync::{Notify, OwnedSemaphorePermit, Semaphore, watch},
};
use tokio_tungstenite::{
    MaybeTlsStream, WebSocketStream, connect_async,
    tungstenite::{Error as WsError, Message, client::IntoClientRequest, http::HeaderValue},
};
use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::error::err_chain;

mod control;
mod parallel;
mod session;
mod subscription;

pub use control::{ControlEvent, ControlHandle, spawn_control_manager};
pub use parallel::{ManagedParallelSession, ParallelSubSession};
pub use session::{ManagedBacktestSession, ManagedEvent, ManagedSessionError};
pub use subscription::{
    SubscriptionHandle, SubscriptionNotification, spawn_account_diff_subscription_manager,
    spawn_transaction_subscription_manager,
};

/// Timeout for the initial WebSocket connect (TCP + TLS + HTTP upgrade).
pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Timeout for any single response during a handshake (Create, Attach, Resume,
/// subscribe ack). Does not apply to per-message reads during an established
/// session — those are bounded by ping/pong liveness instead.
///
/// Set generously because the management service can park subscribe upgrades
/// for the duration of session startup (observed ~100s on cold runtimes); a
/// shorter timeout fires before management can forward the subscribe message
/// and gets reported as "subscribe ack timeout" even though the session is
/// fine. The proper fix is server-side (reject parked upgrades cleanly), but
/// this keeps the client from giving up early in the meantime.
pub const HANDSHAKE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(120);

/// How often to send a WebSocket ping during an established connection.
pub const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(15);

/// How long without inbound traffic before declaring the connection dead.
/// Roughly three missed pings.
pub const KEEPALIVE_MISS_DEADLINE: Duration = Duration::from_secs(45);

/// Total budget for the graceful close handshake (close request + close frame).
/// A supervisor's SIGTERM→SIGKILL grace must exceed this or the close is cut short.
pub const GRACEFUL_CLOSE_TIMEOUT: Duration = Duration::from_secs(5);

/// Base (un-jittered) backoff for the first attempt of a fresh or reset
/// reconnect budget.
pub const RECONNECT_INITIAL_BACKOFF: Duration = Duration::from_secs(1);
/// Ceiling on the base backoff; growth stops once this is reached.
pub const RECONNECT_MAX_BACKOFF: Duration = Duration::from_secs(30);
/// Factor the base backoff is multiplied by after every recorded attempt.
pub const RECONNECT_BACKOFF_MULTIPLIER: f64 = 2.0;
/// Fractional jitter `J`: each returned backoff is the base backoff scaled by a
/// random factor drawn from `[1 - J, 1 + J)`.
pub const RECONNECT_JITTER: f64 = 0.2;
/// Wall-clock limit on one run of reconnect attempts, measured from the
/// budget's creation or last reset.
pub const RECONNECT_MAX_TOTAL: Duration = Duration::from_secs(5 * 60);
/// Maximum number of attempts a budget hands out before reporting exhaustion.
pub const RECONNECT_MAX_ATTEMPTS: u32 = 20;

/// Reconnect attempts that fire immediately, before the cross-session reconnect
/// gate engages. A transient drop recovers fast; only once these rapid retries
/// are exhausted does a session fall back to waiting for siblings to drain.
pub const RECONNECT_UNGATED_ATTEMPTS: u32 = 5;

/// A connection that stays `Up` for this long resets the reconnect counter.
pub const RECONNECT_UPTIME_RESET: Duration = Duration::from_secs(30);

/// Connection state as observed from outside the manager task.
///
/// `Down` is a transient "currently not connected, manager is retrying"
/// state. `Failed` is terminal; the retry budget was exhausted.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// The socket is connected and the handshake has completed.
    Up,
    /// Not connected right now; the manager is connecting or backing off.
    Down,
    /// The manager gave up. Carries a human-readable reason.
    Failed(String),
}

/// Identity of a created backtest session.
#[derive(Clone, Debug)]
pub struct SessionInfo {
    /// Identifier of the session.
    /// NOTE(review): presumably the server-assigned id used to attach/resume — confirm.
    pub session_id: String,
    /// RPC endpoint associated with the session.
    /// NOTE(review): presumably the server-reported value, which may be relative
    /// to the control URL — confirm against the code that fills it in.
    pub rpc_endpoint: String,
    /// Opaque `task_id` reported by the server for this session, if any.
    pub task_id: Option<String>,
}

/// Tracks reconnect attempts and enforces the reconnect policy.
pub(crate) struct ReconnectBudget {
    attempts: u32,
    started_at: std::time::Instant,
    current_backoff: Duration,
}

impl ReconnectBudget {
    pub fn new() -> Self {
        Self {
            attempts: 0,
            started_at: std::time::Instant::now(),
            current_backoff: RECONNECT_INITIAL_BACKOFF,
        }
    }

    pub fn reset(&mut self) {
        self.attempts = 0;
        self.started_at = std::time::Instant::now();
        self.current_backoff = RECONNECT_INITIAL_BACKOFF;
    }

    pub fn attempt(&self) -> u32 {
        self.attempts
    }

    /// Discount time the subscription spent parked behind the reconnect
    /// coordinator. Parking is deliberate waiting for siblings to free the
    /// link, not retry thrashing, so it must not count against the wall-clock
    /// budget — otherwise a session that correctly steps aside for minutes is
    /// failed the moment it reconnects. The attempt-count ceiling still bounds
    /// total retries.
    pub fn discount_parked(&mut self, parked: Duration) {
        self.started_at += parked;
    }

    /// Record an attempt and return its backoff, or `None` if the budget is
    /// exhausted.
    pub fn next_backoff(&mut self) -> Option<Duration> {
        if self.attempts >= RECONNECT_MAX_ATTEMPTS
            || self.started_at.elapsed() >= RECONNECT_MAX_TOTAL
        {
            return None;
        }
        self.attempts += 1;
        let backoff = with_jitter(self.current_backoff);
        self.current_backoff = std::cmp::min(
            RECONNECT_MAX_BACKOFF,
            Duration::from_secs_f64(
                self.current_backoff.as_secs_f64() * RECONNECT_BACKOFF_MULTIPLIER,
            ),
        );
        Some(backoff)
    }
}

/// Coordinates subscription reconnects across a parallel batch sharing one
/// bandwidth-constrained link.
///
/// A dropped subscription competing with its still-streaming siblings just
/// starves — the handshake can't get through a saturated pipe. So once a session
/// has burned its fast ungated attempts, it stops fighting and **parks until no
/// sibling is streaming** (the connected sessions have finished and freed their
/// bandwidth), then reconnects into the quiet link and resumes via
/// `replayFromSlot`. Stepping aside also speeds the streaming siblings up, so the
/// batch drains and the parked sessions recover — serially, one at a time, since
/// at a tight rate only one stream fits.
pub struct ReconnectCoordinator {
    /// Subscriptions currently `Up` and streaming. A parked session waits for
    /// this to reach zero before reconnecting.
    streaming: AtomicUsize,
    /// Woken whenever `streaming` decreases, so parked sessions re-check.
    drained: Notify,
    /// One in-flight reconnect handshake at a time, so the all-parked case
    /// recovers serially rather than storming the link the moment it goes quiet.
    handshake: Arc<Semaphore>,
}

impl Default for ReconnectCoordinator {
    /// Same as [`ReconnectCoordinator::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl ReconnectCoordinator {
    /// Create a coordinator with nothing streaming and the single reconnect
    /// slot free.
    pub fn new() -> Self {
        Self {
            streaming: AtomicUsize::new(0),
            drained: Notify::new(),
            handshake: Arc::new(Semaphore::new(1)),
        }
    }

    /// Mark this subscription as streaming until the returned guard drops.
    /// Call once a (re)subscribe succeeds; the guard's `Drop` decrements the
    /// count and wakes any sibling parked in [`Self::reconnect_slot`]. RAII so
    /// a panicking or aborted task can't leak the count and wedge the batch.
    pub fn enter(self: &Arc<Self>) -> StreamingGuard {
        self.streaming.fetch_add(1, Ordering::SeqCst);
        StreamingGuard(self.clone())
    }

    /// Park until no sibling is streaming, then take the single reconnect slot.
    /// The returned permit must be held across connect + subscribe and dropped
    /// once [`Self::enter`] has been called (or the attempt failed), so the next
    /// parked session proceeds only after this one is streaming again. Returns
    /// `None` if cancelled.
    pub async fn reconnect_slot(&self, cancel: &CancellationToken) -> Option<OwnedSemaphorePermit> {
        loop {
            // Wait for the link to go quiet. Register interest before reading so
            // a guard drop between the check and the wait isn't lost.
            loop {
                let drained = self.drained.notified();
                if self.streaming.load(Ordering::SeqCst) == 0 {
                    break;
                }
                // `biased` polls the branches in order, so cancellation is
                // checked before the wake-up.
                tokio::select! {
                    biased;
                    _ = cancel.cancelled() => return None,
                    _ = drained => {}
                }
            }
            // Queue for the single handshake slot. `p.ok()?` also yields `None`
            // if the semaphore reports an acquire error.
            let permit = tokio::select! {
                biased;
                _ = cancel.cancelled() => return None,
                p = self.handshake.clone().acquire_owned() => p.ok()?,
            };
            // A sibling may have re-entered while we queued for the permit; if so,
            // release it and wait for quiet again rather than reconnect into
            // contention.
            if self.streaming.load(Ordering::SeqCst) == 0 {
                return Some(permit);
            }
            // Falling through drops `permit` here, freeing the slot before the
            // next pass of the outer loop.
        }
    }
}

/// Holds a [`ReconnectCoordinator`]'s streaming count for the lifetime of one
/// connection; decrements and wakes parked siblings on drop.
///
/// Marked `#[must_use]` because discarding the guard (e.g. `coord.enter();`)
/// drops it on the spot, undoing the increment immediately and leaving the
/// connection unaccounted for while it streams.
#[must_use = "dropping the guard immediately marks the subscription as no longer streaming"]
pub struct StreamingGuard(Arc<ReconnectCoordinator>);

impl Drop for StreamingGuard {
    /// Give the streaming slot back and wake every parked sibling so it can
    /// re-check whether the link has gone quiet.
    fn drop(&mut self) {
        self.0.streaming.fetch_sub(1, Ordering::SeqCst);
        self.0.drained.notify_waiters();
    }
}

/// Scale `d` by a random factor in `[1 - RECONNECT_JITTER, 1 + RECONNECT_JITTER)`,
/// clamping the result at zero.
fn with_jitter(d: Duration) -> Duration {
    let offset = rand::rng().random_range(-RECONNECT_JITTER..RECONNECT_JITTER);
    let scaled_secs = d.as_secs_f64() * (1.0 + offset);
    Duration::from_secs_f64(scaled_secs.max(0.0))
}

/// Wait out `delay` unless `cancel` fires first.
///
/// Returns `true` when the full delay elapsed and `false` when cancellation
/// cut the wait short.
pub(crate) async fn cancellable_sleep(delay: Duration, cancel: &CancellationToken) -> bool {
    let timer = tokio::time::sleep(delay);
    let cancelled = cancel.cancelled();
    tokio::select! {
        () = timer => true,
        () = cancelled => false,
    }
}

/// A managed control WebSocket stream: a `WebSocketStream` over a TCP socket
/// that may or may not be wrapped in TLS.
pub(super) type Ws = WebSocketStream<MaybeTlsStream<TcpStream>>;

/// Dial the control WebSocket at `url`, sending `api_key` in the `X-API-Key`
/// header. Used by both the single-session and parallel managers.
///
/// The whole connect is capped at [`CONNECT_TIMEOUT`]. Every failure (bad URL,
/// unusable header value, timeout, connect error) is returned as a
/// human-readable message.
pub(super) async fn connect_ws(url: &str, api_key: &str) -> Result<Ws, String> {
    let mut request = match url.into_client_request() {
        Ok(req) => req,
        Err(e) => return Err(format!("build request: {}", err_chain(&e))),
    };

    let key_header = HeaderValue::from_str(api_key)
        .map_err(|e| format!("api key header: {}", err_chain(&e)))?;
    request.headers_mut().insert("X-API-Key", key_header);

    match tokio::time::timeout(CONNECT_TIMEOUT, connect_async(request)).await {
        Err(_elapsed) => Err(format!("connect timeout after {CONNECT_TIMEOUT:?}")),
        Ok(Err(e)) => Err(format!("connect: {}", err_chain(&e))),
        // The HTTP upgrade response is not needed past this point.
        Ok(Ok((stream, _response))) => Ok(stream),
    }
}

/// Encode `req` as JSON and send it as a text frame on the control WebSocket.
///
/// Returns a human-readable message if either serialization or the send fails.
pub(super) async fn send_request(ws: &mut Ws, req: &BacktestRequest) -> Result<(), String> {
    let payload = match serde_json::to_string(req) {
        Ok(json) => json,
        Err(e) => return Err(format!("serialize: {}", err_chain(&e))),
    };

    match ws.send(Message::Text(payload)).await {
        Ok(()) => Ok(()),
        Err(e) => Err(format!("send: {}", err_chain(&e))),
    }
}

/// Resolve a server-reported `rpc_endpoint` to an absolute URL.
///
/// An endpoint that already starts with `http://` or `https://` is returned
/// unchanged. Anything else is treated as a path and joined onto the control
/// `base` with exactly one `/` between them: trailing slashes on `base` and
/// leading slashes on `endpoint` are both stripped first, so a base such as
/// `https://host/` no longer produces `https://host//path`.
pub(super) fn resolve_rpc_url(base: &str, endpoint: &str) -> String {
    if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
        return endpoint.to_string();
    }
    format!(
        "{}/{}",
        base.trim_end_matches('/'),
        endpoint.trim_start_matches('/')
    )
}

/// How a control-WS handshake (create/attach/resume) failed. The variant tells
/// the caller whether retrying is worthwhile; the payload is a human-readable
/// reason.
pub(super) enum HandshakeError {
    /// Recoverable (network blip, or the session is momentarily contended).
    Transient(String),
    /// Unrecoverable — the server rejected the session outright.
    Fatal(String),
}

/// Map a `BacktestError` received during handshake `stage` to a
/// [`HandshakeError`].
///
/// `SessionOwnershipBusy` means the session is contended and expected to
/// become claimable shortly, so it is transient; every other error is fatal.
/// `stage` is only used to prefix the message.
pub(super) fn handshake_error_for_response(
    stage: &'static str,
    err: BacktestError,
) -> HandshakeError {
    if matches!(err, BacktestError::SessionOwnershipBusy { .. }) {
        return HandshakeError::Transient(format!("{stage} contended: {}", err_chain(&err)));
    }
    HandshakeError::Fatal(format!("{stage} rejected: {}", err_chain(&err)))
}

/// Why a control task's message loop exited. Shared by the single-session and
/// parallel control managers. The outer control loop uses the variant to
/// decide between stopping, reconnecting, and reporting failure.
pub(super) enum MessageLoopExit {
    /// Server reported the session(s) done, or the driver(s) dropped the request
    /// channel. Calls for a graceful WS close.
    SessionEnded,
    /// Cancellation token fired. Still graceful-closes so the server reaps the
    /// runtime now instead of after the disconnect timeout.
    Cancelled,
    /// Connection lost — attempt to reconnect. Carries the loss reason.
    ConnectionLost(String),
    /// Protocol or application error with no sensible recovery. Carries the
    /// failure reason.
    Terminal(String),
}

/// Store `status` in the watch channel and notify receivers, but only when it
/// differs from the value already held; an unchanged status wakes nobody.
pub(super) fn publish_status(
    status_tx: &watch::Sender<ConnectionStatus>,
    status: ConnectionStatus,
) {
    status_tx.send_if_modified(move |current| {
        let changed = *current != status;
        if changed {
            *current = status;
        }
        changed
    });
}

/// Whether `err` is one the server cannot recover from mid-session.
///
/// Terminal errors are: internal failures, a failed slot advance or finalize,
/// and running out of blocks. Everything else returns `false`.
pub(super) fn is_terminal_backtest_error(err: &BacktestError) -> bool {
    matches!(
        err,
        BacktestError::Internal { .. }
            | BacktestError::FinalizeSlotFailed { .. }
            | BacktestError::AdvanceSlotFailed { .. }
            | BacktestError::NoMoreBlocks
    )
}

/// Best-effort shutdown of a control WebSocket: send `CloseBacktestSession`,
/// then the WS close frame, with both steps sharing one
/// [`GRACEFUL_CLOSE_TIMEOUT`] budget.
///
/// The close request lets the server reap the runtime(s) immediately rather
/// than waiting out the disconnect timeout; the close frame is cleanup only.
/// Errors and a timeout are all ignored.
pub(super) async fn graceful_close(ws: &mut Ws) {
    let shutdown = async {
        let _ = send_request(ws, &BacktestRequest::CloseBacktestSession).await;
        let _ = ws.close(None).await;
    };
    let _ = tokio::time::timeout(GRACEFUL_CLOSE_TIMEOUT, shutdown).await;
}

/// An inbound control frame, classified for the message loop: something to
/// parse, something to skip, or the end of the connection.
pub(super) enum InboundFrame {
    /// A text (or UTF-8 binary) payload to parse.
    Text(String),
    /// A frame with no application payload (ping/pong/raw/non-UTF-8 binary).
    Ignore,
    /// The connection ended or errored; carries the loss reason.
    Lost(String),
}

/// Turn the result of `ws.next()` into an [`InboundFrame`] for a control
/// message loop.
///
/// Stream end, read errors and remote close frames become `Lost` with a
/// reason. Text, and binary that decodes as UTF-8, becomes `Text`. Everything
/// else is `Ignore`.
pub(super) fn classify_inbound(msg: Option<Result<Message, WsError>>) -> InboundFrame {
    // Peel off the two ways the read itself can fail before looking at the frame.
    let message = match msg {
        None => return InboundFrame::Lost("ws stream ended".into()),
        Some(Err(e)) => return InboundFrame::Lost(format!("ws read: {}", err_chain(&e))),
        Some(Ok(message)) => message,
    };

    match message {
        Message::Text(text) => InboundFrame::Text(text),
        Message::Binary(bytes) => {
            String::from_utf8(bytes).map_or(InboundFrame::Ignore, InboundFrame::Text)
        }
        Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => InboundFrame::Ignore,
        Message::Close(frame) => InboundFrame::Lost(format!("remote close: {frame:?}")),
    }
}

/// Keepalive tick for an established connection.
///
/// If nothing has arrived since `last_inbound` for longer than
/// [`KEEPALIVE_MISS_DEADLINE`], no ping is sent and the connection is reported
/// dead. Otherwise an empty ping is sent. Returns `Some(reason)` when the
/// connection should be treated as lost (silence or a failed send), else
/// `None`.
pub(super) async fn send_keepalive_ping(ws: &mut Ws, last_inbound: Instant) -> Option<String> {
    let silent_too_long = last_inbound.elapsed() > KEEPALIVE_MISS_DEADLINE;
    if silent_too_long {
        return Some(format!("no traffic for {:?}", last_inbound.elapsed()));
    }

    ws.send(Message::Ping(vec![]))
        .await
        .err()
        .map(|e| format!("ping send: {}", err_chain(&e)))
}

/// A control WebSocket task: connect, handshake, run a message loop, reconnect.
/// [`run_control_loop`] drives the shared connect/backoff/exit state machine;
/// implementors supply the protocol-specific handshake and message loop.
pub(super) trait ControlConnection: Send + 'static {
    /// URL the control WebSocket is dialled at on every (re)connect.
    fn url(&self) -> &str;
    /// API key passed to the connect step.
    fn api_key(&self) -> &str;
    /// Token whose cancellation stops the control loop.
    fn cancel(&self) -> &CancellationToken;
    /// Prefix for reconnect log lines, e.g. `"control"` / `"parallel control"`.
    fn label(&self) -> &'static str;
    /// Watch channel on which connection status transitions are published.
    fn status_tx(&self) -> &watch::Sender<ConnectionStatus>;
    /// Fail the pending create one-shot with `reason` (no-op once fired).
    fn fail_pending(&mut self, reason: String);
    /// Create on first connect, attach/resume on reconnect.
    fn handshake(&mut self, ws: Ws) -> impl Future<Output = Result<Ws, HandshakeError>> + Send;
    /// Drive the established connection until it exits.
    fn message_loop(&mut self, ws: Ws) -> impl Future<Output = MessageLoopExit> + Send;

    /// Publish `status` on [`Self::status_tx`], skipping no-op updates.
    fn publish(&self, status: ConnectionStatus) {
        publish_status(self.status_tx(), status);
    }

    /// Terminal failure: fail any pending create with `reason`, then publish
    /// `ConnectionStatus::Failed` carrying the same reason.
    fn finish_failed(&mut self, reason: String) {
        self.fail_pending(reason.clone());
        self.publish(ConnectionStatus::Failed(reason));
    }
}

/// Drive a [`ControlConnection`] through its connect → handshake → run →
/// reconnect lifecycle until the session ends, is cancelled, or the reconnect
/// budget is exhausted.
///
/// Each pass of the loop publishes `Down`, connects, handshakes, publishes
/// `Up`, and then runs the message loop:
///
/// * Connect failures, transient handshake failures and lost connections are
///   retried after a backoff drawn from one shared [`ReconnectBudget`]. When
///   the budget is exhausted the task finishes via `finish_failed`.
/// * A fatal handshake error or a terminal message-loop exit finishes via
///   `finish_failed` immediately.
/// * A connection that stayed up for at least [`RECONNECT_UPTIME_RESET`]
///   before dropping resets the budget first.
///
/// Cancellation — whether noticed at the top of the loop or during a backoff
/// sleep — always fails the pending create with the same reason before
/// returning, so a waiter gets an explicit message on every cancellation path.
/// `fail_pending` is a no-op once the create has fired, so this is harmless
/// when cancelling during a later reconnect.
pub(super) async fn run_control_loop<T: ControlConnection>(mut task: T) {
    // Reason handed to a still-pending create when cancellation ends the task.
    const CANCELLED_REASON: &str = "cancelled before session created";

    let mut budget = ReconnectBudget::new();

    loop {
        if task.cancel().is_cancelled() {
            task.fail_pending(CANCELLED_REASON.to_string());
            return;
        }
        task.publish(ConnectionStatus::Down);

        let ws = match connect_ws(task.url(), task.api_key()).await {
            Ok(ws) => ws,
            Err(why) => {
                if let Some(delay) = budget.next_backoff() {
                    warn!(attempt = budget.attempt(), error = %why, ?delay, "{} connect failed, retrying", task.label());
                    if !cancellable_sleep(delay, task.cancel()).await {
                        task.fail_pending(CANCELLED_REASON.to_string());
                        return;
                    }
                    continue;
                }
                task.finish_failed(format!("connect: {why}"));
                return;
            }
        };

        let ws = match task.handshake(ws).await {
            Ok(ws) => ws,
            Err(HandshakeError::Fatal(why)) => {
                task.finish_failed(format!("handshake: {why}"));
                return;
            }
            Err(HandshakeError::Transient(why)) => {
                if let Some(delay) = budget.next_backoff() {
                    warn!(attempt = budget.attempt(), error = %why, ?delay, "{} handshake failed, retrying", task.label());
                    if !cancellable_sleep(delay, task.cancel()).await {
                        task.fail_pending(CANCELLED_REASON.to_string());
                        return;
                    }
                    continue;
                }
                task.finish_failed(format!("handshake: {why}"));
                return;
            }
        };

        task.publish(ConnectionStatus::Up);
        let connected_at = Instant::now();

        match task.message_loop(ws).await {
            MessageLoopExit::SessionEnded | MessageLoopExit::Cancelled => return,
            MessageLoopExit::ConnectionLost(why) => {
                // A connection that held long enough counts as healthy, so the
                // next outage starts with a full budget.
                if connected_at.elapsed() >= RECONNECT_UPTIME_RESET {
                    budget.reset();
                }
                if let Some(delay) = budget.next_backoff() {
                    warn!(attempt = budget.attempt(), reason = %why, ?delay, "{} connection lost, reconnecting", task.label());
                    if !cancellable_sleep(delay, task.cancel()).await {
                        task.fail_pending(CANCELLED_REASON.to_string());
                        return;
                    }
                    continue;
                }
                task.finish_failed(format!("connection lost: {why}"));
                return;
            }
            MessageLoopExit::Terminal(why) => {
                task.finish_failed(why);
                return;
            }
        }
    }
}

// Unit tests for the reconnect budget and the cross-session reconnect
// coordinator. None of them open a network connection.
#[cfg(test)]
mod tests {
    use super::*;

    // The attempt ceiling alone exhausts the budget.
    #[test]
    fn budget_exhausts_after_max_attempts() {
        let mut b = ReconnectBudget::new();
        for _ in 0..RECONNECT_MAX_ATTEMPTS {
            assert!(b.next_backoff().is_some());
        }
        assert!(b.next_backoff().is_none());
    }

    // `reset` clears the recorded attempt count.
    #[test]
    fn budget_reset_restores_full_budget() {
        let mut b = ReconnectBudget::new();
        b.next_backoff();
        b.next_backoff();
        b.reset();
        assert_eq!(b.attempt(), 0);
    }

    // `enter` increments the streaming count and dropping the guard undoes it.
    #[test]
    fn streaming_guard_balances_the_count() {
        let coord = Arc::new(ReconnectCoordinator::new());
        assert_eq!(coord.streaming.load(Ordering::SeqCst), 0);
        let g = coord.enter();
        assert_eq!(coord.streaming.load(Ordering::SeqCst), 1);
        drop(g);
        assert_eq!(coord.streaming.load(Ordering::SeqCst), 0);
    }

    // With nothing streaming, the slot is granted without parking.
    #[tokio::test]
    async fn reconnect_slot_available_when_link_is_quiet() {
        let coord = Arc::new(ReconnectCoordinator::new());
        let cancel = CancellationToken::new();
        assert!(coord.reconnect_slot(&cancel).await.is_some());
    }

    // A parked request proceeds once the only streaming guard is dropped.
    #[tokio::test]
    async fn reconnect_slot_unparks_when_last_sibling_leaves() {
        let coord = Arc::new(ReconnectCoordinator::new());
        let cancel = CancellationToken::new();
        let guard = coord.enter(); // streaming == 1, so a slot request must park

        let waiter = tokio::spawn({
            let coord = coord.clone();
            let cancel = cancel.clone();
            async move { coord.reconnect_slot(&cancel).await.is_some() }
        });

        // While a sibling streams, the parked request stays pending.
        tokio::task::yield_now().await;
        assert!(!waiter.is_finished());

        drop(guard); // streaming -> 0, wakes the parked request
        assert!(waiter.await.unwrap());
    }

    // An already-cancelled token makes a parked request give up with `None`.
    #[tokio::test]
    async fn reconnect_slot_returns_none_on_cancel_while_parked() {
        let coord = Arc::new(ReconnectCoordinator::new());
        let _guard = coord.enter(); // keep the link busy so the request parks
        let cancel = CancellationToken::new();
        cancel.cancel();
        assert!(coord.reconnect_slot(&cancel).await.is_none());
    }

    // Discounting parked time must not make the wall-clock limit trip.
    #[test]
    fn discount_parked_does_not_consume_the_budget() {
        let mut b = ReconnectBudget::new();
        // A long park must leave the full attempt budget intact afterwards.
        b.discount_parked(2 * RECONNECT_MAX_TOTAL);
        for _ in 0..RECONNECT_MAX_ATTEMPTS {
            assert!(b.next_backoff().is_some());
        }
    }
}