// foxglove 0.25.0
//
// Foxglove SDK
// Documentation
//! Pure decision logic for the dormant-phase watch loop.
//!
//! The shell in `connection.rs` is responsible for I/O (opening the watch stream, sleeping,
//! flipping connection status, cancelling the gateway). This module owns the *policy*: given a
//! connect error or a terminal [`WatchOutcome`], how should we mutate the retry state and what
//! should the shell do next? Keeping the policy pure lets us cover the branchy cases with cheap
//! synchronous tests.

use std::time::Duration;

use crate::api_client::WatchWakeEvent;

use super::watch::{HeartbeatExit, WatchError, WatchOutcome};

/// Upper bound for the transient-retry backoff. The backoff starts at [`INITIAL_BACKOFF`],
/// doubles after each transient watch-loop failure, and is clamped to this value. It is reset
/// on a successful connect.
pub(super) const MAX_BACKOFF: Duration = Duration::from_secs(30);
/// Starting value of the transient-retry backoff, and the value it returns to after a
/// successful connect.
pub(super) const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
/// Backoff applied when another gateway holds the lease (409 Conflict). Picked conservatively
/// since the API owns lease TTLs and does not advertise them to the gateway.
pub(super) const LEASE_CONFLICT_BACKOFF: Duration = Duration::from_secs(30);
/// Backoff applied when the API returns a 2xx with a non-SSE body — the shape of an upstream
/// maintenance page. Fixed (not exponential) so the gateway recovers promptly when the
/// maintenance window ends.
pub(super) const NON_SSE_RESPONSE_BACKOFF: Duration = Duration::from_secs(30);

/// Mutable state carried across iterations of the watch loop.
///
/// The fields are private to this module and are only mutated by [`on_connect_success`],
/// [`on_connect_error`] and [`on_outcome`].
pub(super) struct WatchRetryState {
    /// Lease ID of the previous watch, threaded into the next connect attempt so the API can
    /// short-circuit a conflict against our own prior lease. Cleared once a fresh watch is
    /// established, once the API tells us another lease owns the device, or once a heartbeat
    /// reports the lease as gone.
    previous_lease_id: Option<String>,
    /// Backoff applied to the next transient retry. Doubled per failure up to [`MAX_BACKOFF`];
    /// reset to [`INITIAL_BACKOFF`] by [`on_connect_success`].
    backoff: Duration,
}

impl WatchRetryState {
    pub fn new() -> Self {
        Self {
            previous_lease_id: None,
            backoff: INITIAL_BACKOFF,
        }
    }

    fn double_backoff(&mut self) {
        self.backoff = self.backoff.saturating_mul(2).min(MAX_BACKOFF);
    }

    pub fn previous_lease_id(&self) -> Option<&str> {
        self.previous_lease_id.as_deref()
    }
}

/// What the shell should do after a failed [`super::watch::Watch::connect`] attempt.
///
/// Produced by [`on_connect_error`].
#[derive(Debug, PartialEq, Eq)]
pub(super) enum ConnectAction {
    /// Transition to `Connecting`, sleep for the wrapped delay, then retry the connect.
    RetryAfter(Duration),
    /// Cancel the gateway and stop the watch loop. Produced when the connect error is
    /// [`WatchError::Unauthorized`].
    StopUnauthorized,
}

/// What the shell should do after a [`WatchOutcome`] terminates a connected watch.
///
/// Produced by [`on_outcome`].
#[derive(Debug, PartialEq, Eq)]
pub(super) enum WatchAction {
    /// A `wake` arrived: stop the watch loop and proceed to LiveKit.
    Wake(WatchWakeEvent),
    /// Soft reconnect: iterate immediately. The shell does NOT flip status to `Connecting`;
    /// the next connect attempt will only flip if it fails, and `connect_watch` owns the
    /// backoff schedule from there. Used for blips we expect to recover from quickly
    /// (LB-induced transport drops, read-timeouts, clean stream-end, lease gone). State
    /// updates (previous lease ID) have already been applied to the [`WatchRetryState`].
    Reconnect,
    /// Hard backoff: a known long disruption. The shell flips status to `Connecting` before
    /// sleeping `delay` and iterating. Used when we know we'll be off the wire long enough
    /// that callers should see the degradation (e.g. lease conflict), and for stream errors
    /// on a watch that ran for less than one heartbeat interval.
    Backoff { delay: Duration },
    /// Cancel the gateway and stop the watch loop (e.g. heartbeat returned 401).
    StopUnauthorized,
    /// Stop the watch loop without cancelling. Reserved for the defensive "heartbeat task
    /// dropped its sender" path, which only fires if the heartbeat task panicked or was
    /// externally aborted before reporting a terminal reason. Produced for
    /// [`HeartbeatExit::Cancelled`].
    Stop,
}

/// Apply the state mutations that follow a successful [`super::watch::Watch::connect`]: drop
/// the previous lease ID (it has now been replaced) and reset the transient-retry backoff.
pub(super) fn on_connect_success(retry: &mut WatchRetryState) {
    retry.previous_lease_id = None;
    retry.backoff = INITIAL_BACKOFF;
}

/// Map a failed connect attempt to the shell's next step, adjusting `retry` where needed.
///
/// * [`WatchError::Unauthorized`] is terminal: returns [`ConnectAction::StopUnauthorized`]
///   and leaves `retry` untouched.
/// * [`WatchError::Conflict`] forgets the previous lease ID and retries after
///   [`LEASE_CONFLICT_BACKOFF`].
/// * [`WatchError::UnexpectedContentType`] retries after the fixed
///   [`NON_SSE_RESPONSE_BACKOFF`] without touching `retry`.
/// * Any other error retries after the current transient backoff, which is then doubled
///   (capped at [`MAX_BACKOFF`]) for the attempt after that.
pub(super) fn on_connect_error(err: &WatchError, retry: &mut WatchRetryState) -> ConnectAction {
    match err {
        // Credentials were rejected; retrying cannot help.
        WatchError::Unauthorized => ConnectAction::StopUnauthorized,
        WatchError::Conflict => {
            // A different gateway holds the device, so our old lease no longer matters.
            // Forget it so the next attempt does not present it.
            retry.previous_lease_id = None;
            ConnectAction::RetryAfter(LEASE_CONFLICT_BACKOFF)
        }
        WatchError::UnexpectedContentType { .. } => {
            // Shape of a maintenance page or a misrouted LB: wait a fixed amount and leave
            // the transient backoff where it is.
            ConnectAction::RetryAfter(NON_SSE_RESPONSE_BACKOFF)
        }
        _ => {
            // Transient failure: wait the current backoff, and escalate for next time.
            let current = retry.backoff;
            retry.double_backoff();
            ConnectAction::RetryAfter(current)
        }
    }
}

/// Turn the terminal outcome of a connected watch into the shell's next step, updating
/// `retry` along the way.
///
/// `lease_id` is the lease captured from the watch's `hello` before the watch was closed. It
/// is stored as the previous lease ID whenever the next connect should present it again. For
/// a wake, an unauthorized heartbeat, or a cancelled heartbeat, `retry` is left untouched.
///
/// `watch_duration` is how long the watch ran and `heartbeat_interval` is the value the
/// server advertised in `hello`. Together they classify a [`WatchOutcome::StreamError`]: a
/// watch that lasted at least `heartbeat_interval` reconnects immediately (a soft, LB-style
/// drop), while a shorter one goes through the transient backoff to avoid hot-looping.
pub(super) fn on_outcome(
    outcome: WatchOutcome,
    lease_id: String,
    watch_duration: Duration,
    heartbeat_interval: Duration,
    retry: &mut WatchRetryState,
) -> WatchAction {
    match outcome {
        WatchOutcome::Wake(event) => WatchAction::Wake(event),

        // A read-timeout or a clean end of stream is ordinary protocol behaviour: the API
        // closed the dormant stream. Reconnect straight away, presenting the same lease.
        WatchOutcome::ReadTimeout | WatchOutcome::StreamEnded => {
            retry.previous_lease_id = Some(lease_id);
            WatchAction::Reconnect
        }

        // Transport errors are judged by how long the watch survived. Either way the lease
        // is carried into the next connect.
        WatchOutcome::StreamError(_) => {
            retry.previous_lease_id = Some(lease_id);
            if watch_duration < heartbeat_interval {
                // Died young: treat it as a real fault, back off, and let callers see
                // `Connecting`.
                //
                // NOTE(review): `on_connect_success` resets `retry.backoff` to
                // `INITIAL_BACKOFF`. If the shell calls it before every connected watch
                // (TODO confirm in `connection.rs`), the delay returned here is always
                // `INITIAL_BACKOFF` and never escalates across connect-then-fail flapping.
                let delay = retry.backoff;
                retry.double_backoff();
                WatchAction::Backoff { delay }
            } else {
                // Ran for at least one heartbeat interval: almost certainly an LB-driven
                // drop, so retry immediately and stay `Connected`.
                WatchAction::Reconnect
            }
        }

        // Another gateway took the lease over. Forget our lease ID so it is not presented
        // again, and wait the conservative conflict delay; that is long enough for the shell
        // to surface `Connecting`.
        WatchOutcome::HeartbeatLost(HeartbeatExit::Conflict) => {
            retry.previous_lease_id = None;
            WatchAction::Backoff {
                delay: LEASE_CONFLICT_BACKOFF,
            }
        }

        // The lease no longer exists server-side: forget it and reconnect for a fresh one.
        WatchOutcome::HeartbeatLost(HeartbeatExit::Gone) => {
            retry.previous_lease_id = None;
            WatchAction::Reconnect
        }

        WatchOutcome::HeartbeatLost(HeartbeatExit::Unauthorized) => WatchAction::StopUnauthorized,

        // Heartbeats kept failing without a terminal status. The lease may still be valid
        // on the server, so keep presenting it.
        WatchOutcome::HeartbeatLost(HeartbeatExit::Failed) => {
            retry.previous_lease_id = Some(lease_id);
            WatchAction::Reconnect
        }

        WatchOutcome::HeartbeatLost(HeartbeatExit::Cancelled) => WatchAction::Stop,
    }
}

/// Synchronous tests for the watch-loop policy. Each test builds a [`WatchRetryState`]
/// directly, runs one policy function, and checks both the returned action and the state
/// mutations it left behind.
#[cfg(test)]
mod tests {
    use super::*;

    /// A representative wake payload.
    fn wake_event() -> WatchWakeEvent {
        WatchWakeEvent {
            remote_access_session_id: Some("ras_test".into()),
            url: "wss://livekit.example".into(),
            token: "lk_token".into(),
        }
    }

    /// Lease ID handed to `on_outcome` as the lease of the watch that just ended.
    fn lease() -> String {
        "lease-abc".into()
    }

    /// Heartbeat interval passed to `on_outcome` in these tests.
    const HEARTBEAT: Duration = Duration::from_secs(10);
    /// A watch duration comfortably above [`HEARTBEAT`].
    const HEALTHY: Duration = Duration::from_secs(60);
    /// A watch duration comfortably below [`HEARTBEAT`].
    const SHORT: Duration = Duration::from_millis(100);

    // --- WatchRetryState ---

    #[test]
    fn new_state_has_no_lease_and_initial_backoff() {
        let state = WatchRetryState::new();
        assert_eq!(state.previous_lease_id(), None);
        assert_eq!(state.backoff, INITIAL_BACKOFF);
    }

    #[test]
    fn previous_lease_id_accessor_borrows_stored_lease() {
        let state = WatchRetryState {
            previous_lease_id: Some(lease()),
            backoff: INITIAL_BACKOFF,
        };
        assert_eq!(state.previous_lease_id(), Some("lease-abc"));
    }

    #[test]
    fn double_backoff_doubles_from_initial() {
        let mut state = WatchRetryState::new();
        state.double_backoff();
        assert_eq!(state.backoff, (INITIAL_BACKOFF * 2).min(MAX_BACKOFF));
    }

    #[test]
    fn double_backoff_caps_at_max() {
        let mut state = WatchRetryState::new();
        for _ in 0..20 {
            state.double_backoff();
        }
        assert_eq!(state.backoff, MAX_BACKOFF);
    }

    // --- on_connect_success ---

    #[test]
    fn connect_success_clears_lease_and_resets_backoff() {
        let mut state = WatchRetryState {
            previous_lease_id: Some("stale".into()),
            backoff: Duration::from_secs(8),
        };
        on_connect_success(&mut state);
        assert_eq!(state.previous_lease_id, None);
        assert_eq!(state.backoff, INITIAL_BACKOFF);
    }

    // --- on_connect_error ---

    #[test]
    fn connect_error_unauthorized_stops_without_mutating_state() {
        let mut state = WatchRetryState {
            previous_lease_id: Some("keep-me".into()),
            backoff: Duration::from_secs(4),
        };
        let action = on_connect_error(&WatchError::Unauthorized, &mut state);
        assert_eq!(action, ConnectAction::StopUnauthorized);
        // Unauthorized is terminal: state is irrelevant after, but check we didn't disturb it.
        assert_eq!(state.previous_lease_id.as_deref(), Some("keep-me"));
        assert_eq!(state.backoff, Duration::from_secs(4));
    }

    #[test]
    fn connect_error_conflict_drops_lease_and_uses_lease_conflict_backoff() {
        let mut state = WatchRetryState {
            previous_lease_id: Some("ours".into()),
            backoff: Duration::from_secs(2),
        };
        let action = on_connect_error(&WatchError::Conflict, &mut state);
        assert_eq!(action, ConnectAction::RetryAfter(LEASE_CONFLICT_BACKOFF));
        assert_eq!(state.previous_lease_id, None);
        // Conflict uses its own fixed delay and leaves the transient backoff untouched.
        assert_eq!(state.backoff, Duration::from_secs(2));
    }

    #[test]
    fn connect_error_unexpected_content_type_uses_fixed_backoff() {
        let mut state = WatchRetryState {
            previous_lease_id: Some("keep".into()),
            backoff: Duration::from_secs(2),
        };
        let action = on_connect_error(
            &WatchError::UnexpectedContentType {
                content_type: Some("text/html".into()),
            },
            &mut state,
        );
        assert_eq!(action, ConnectAction::RetryAfter(NON_SSE_RESPONSE_BACKOFF));
        assert_eq!(state.previous_lease_id.as_deref(), Some("keep"));
        // Maintenance backoff is fixed; transient backoff is left untouched.
        assert_eq!(state.backoff, Duration::from_secs(2));
    }

    #[test]
    fn connect_error_generic_uses_current_backoff_then_doubles() {
        let mut state = WatchRetryState {
            previous_lease_id: Some("keep".into()),
            backoff: Duration::from_secs(2),
        };
        let action = on_connect_error(&WatchError::UnexpectedEof, &mut state);
        assert_eq!(action, ConnectAction::RetryAfter(Duration::from_secs(2)));
        // Lease must be preserved across transient connect failures so the eventual successful
        // reconnect can hand it to the API.
        assert_eq!(state.previous_lease_id.as_deref(), Some("keep"));
        assert_eq!(state.backoff, Duration::from_secs(4));
    }

    #[test]
    fn connect_error_generic_caps_backoff_at_max() {
        let mut state = WatchRetryState {
            previous_lease_id: None,
            backoff: MAX_BACKOFF,
        };
        let action = on_connect_error(&WatchError::HelloTimeout, &mut state);
        assert_eq!(action, ConnectAction::RetryAfter(MAX_BACKOFF));
        assert_eq!(state.backoff, MAX_BACKOFF);
    }

    // --- on_outcome ---

    #[test]
    fn outcome_wake_returns_wake() {
        let mut state = WatchRetryState {
            previous_lease_id: Some("untouched".into()),
            backoff: Duration::from_secs(8),
        };
        let action = on_outcome(
            WatchOutcome::Wake(wake_event()),
            lease(),
            HEALTHY,
            HEARTBEAT,
            &mut state,
        );
        assert_eq!(action, WatchAction::Wake(wake_event()));
        // Wake doesn't touch state; the next connect-success will reset it.
        assert_eq!(state.previous_lease_id.as_deref(), Some("untouched"));
        assert_eq!(state.backoff, Duration::from_secs(8));
    }

    #[test]
    fn outcome_read_timeout_reconnects_immediately_with_lease() {
        let mut state = WatchRetryState {
            previous_lease_id: None,
            backoff: Duration::from_secs(8),
        };
        let action = on_outcome(
            WatchOutcome::ReadTimeout,
            lease(),
            SHORT,
            HEARTBEAT,
            &mut state,
        );
        assert_eq!(action, WatchAction::Reconnect);
        assert_eq!(state.previous_lease_id, Some(lease()));
        // No backoff change: read-timeout is normal protocol behaviour.
        assert_eq!(state.backoff, Duration::from_secs(8));
    }

    #[test]
    fn outcome_stream_ended_reconnects_immediately_with_lease() {
        let mut state = WatchRetryState::new();
        let action = on_outcome(
            WatchOutcome::StreamEnded,
            lease(),
            SHORT,
            HEARTBEAT,
            &mut state,
        );
        assert_eq!(action, WatchAction::Reconnect);
        assert_eq!(state.previous_lease_id, Some(lease()));
        assert_eq!(state.backoff, INITIAL_BACKOFF);
    }

    #[test]
    fn outcome_stream_error_after_healthy_run_reconnects_immediately() {
        let mut state = WatchRetryState {
            previous_lease_id: None,
            backoff: Duration::from_secs(4),
        };
        let action = on_outcome(
            WatchOutcome::StreamError(WatchError::UnexpectedEof),
            lease(),
            HEALTHY,
            HEARTBEAT,
            &mut state,
        );
        assert_eq!(action, WatchAction::Reconnect);
        assert_eq!(state.previous_lease_id, Some(lease()));
        // Healthy run: backoff is left to connect_watch — no escalation here.
        assert_eq!(state.backoff, Duration::from_secs(4));
    }

    #[test]
    fn outcome_stream_error_at_exact_heartbeat_interval_counts_as_healthy() {
        let mut state = WatchRetryState {
            previous_lease_id: None,
            backoff: Duration::from_secs(4),
        };
        // The threshold is inclusive: a watch that ran for exactly one heartbeat interval is
        // already treated as a soft blip.
        let action = on_outcome(
            WatchOutcome::StreamError(WatchError::UnexpectedEof),
            lease(),
            HEARTBEAT,
            HEARTBEAT,
            &mut state,
        );
        assert_eq!(action, WatchAction::Reconnect);
        assert_eq!(state.previous_lease_id, Some(lease()));
        assert_eq!(state.backoff, Duration::from_secs(4));
    }

    #[test]
    fn outcome_stream_error_after_short_run_backs_off_and_doubles() {
        let mut state = WatchRetryState {
            previous_lease_id: None,
            backoff: Duration::from_secs(4),
        };
        let action = on_outcome(
            WatchOutcome::StreamError(WatchError::UnexpectedEof),
            lease(),
            SHORT,
            HEARTBEAT,
            &mut state,
        );
        assert_eq!(
            action,
            WatchAction::Backoff {
                delay: Duration::from_secs(4),
            }
        );
        assert_eq!(state.previous_lease_id, Some(lease()));
        assert_eq!(state.backoff, Duration::from_secs(8));
    }

    #[test]
    fn outcome_stream_error_short_run_caps_backoff_at_max() {
        let mut state = WatchRetryState {
            previous_lease_id: None,
            backoff: MAX_BACKOFF,
        };
        let action = on_outcome(
            WatchOutcome::StreamError(WatchError::UnexpectedEof),
            lease(),
            SHORT,
            HEARTBEAT,
            &mut state,
        );
        assert_eq!(action, WatchAction::Backoff { delay: MAX_BACKOFF });
        assert_eq!(state.backoff, MAX_BACKOFF);
    }

    #[test]
    fn outcome_heartbeat_conflict_drops_lease_with_conflict_backoff() {
        let mut state = WatchRetryState {
            previous_lease_id: Some("ours".into()),
            backoff: Duration::from_secs(8),
        };
        let action = on_outcome(
            WatchOutcome::HeartbeatLost(HeartbeatExit::Conflict),
            lease(),
            HEALTHY,
            HEARTBEAT,
            &mut state,
        );
        assert_eq!(
            action,
            WatchAction::Backoff {
                delay: LEASE_CONFLICT_BACKOFF,
            }
        );
        assert_eq!(state.previous_lease_id, None);
        // Conflict on the heartbeat path doesn't escalate transient backoff — the conflict
        // delay is its own thing.
        assert_eq!(state.backoff, Duration::from_secs(8));
    }

    #[test]
    fn outcome_heartbeat_gone_drops_lease_no_delay() {
        let mut state = WatchRetryState {
            previous_lease_id: Some("ours".into()),
            backoff: INITIAL_BACKOFF,
        };
        let action = on_outcome(
            WatchOutcome::HeartbeatLost(HeartbeatExit::Gone),
            lease(),
            HEALTHY,
            HEARTBEAT,
            &mut state,
        );
        assert_eq!(action, WatchAction::Reconnect);
        assert_eq!(state.previous_lease_id, None);
    }

    #[test]
    fn outcome_heartbeat_unauthorized_stops() {
        let mut state = WatchRetryState::new();
        let action = on_outcome(
            WatchOutcome::HeartbeatLost(HeartbeatExit::Unauthorized),
            lease(),
            HEALTHY,
            HEARTBEAT,
            &mut state,
        );
        assert_eq!(action, WatchAction::StopUnauthorized);
    }

    #[test]
    fn outcome_heartbeat_failed_keeps_lease_no_delay() {
        let mut state = WatchRetryState::new();
        let action = on_outcome(
            WatchOutcome::HeartbeatLost(HeartbeatExit::Failed),
            lease(),
            HEALTHY,
            HEARTBEAT,
            &mut state,
        );
        assert_eq!(action, WatchAction::Reconnect);
        assert_eq!(state.previous_lease_id, Some(lease()));
    }

    #[test]
    fn outcome_heartbeat_cancelled_stops_without_unauthorized() {
        let mut state = WatchRetryState::new();
        let action = on_outcome(
            WatchOutcome::HeartbeatLost(HeartbeatExit::Cancelled),
            lease(),
            HEALTHY,
            HEARTBEAT,
            &mut state,
        );
        assert_eq!(action, WatchAction::Stop);
    }
}