use std::time::Duration;
use crate::api_client::WatchWakeEvent;
use super::watch::{HeartbeatExit, WatchError, WatchOutcome};
pub(super) const MAX_BACKOFF: Duration = Duration::from_secs(30);
pub(super) const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
pub(super) const LEASE_CONFLICT_BACKOFF: Duration = Duration::from_secs(30);
pub(super) const NON_SSE_RESPONSE_BACKOFF: Duration = Duration::from_secs(30);
pub(super) struct WatchRetryState {
previous_lease_id: Option<String>,
backoff: Duration,
}
impl WatchRetryState {
pub fn new() -> Self {
Self {
previous_lease_id: None,
backoff: INITIAL_BACKOFF,
}
}
fn double_backoff(&mut self) {
self.backoff = self.backoff.saturating_mul(2).min(MAX_BACKOFF);
}
pub fn previous_lease_id(&self) -> Option<&str> {
self.previous_lease_id.as_deref()
}
}
#[derive(Debug, PartialEq, Eq)]
pub(super) enum ConnectAction {
RetryAfter(Duration),
StopUnauthorized,
}
#[derive(Debug, PartialEq, Eq)]
pub(super) enum WatchAction {
Wake(WatchWakeEvent),
Reconnect,
Backoff { delay: Duration },
StopUnauthorized,
Stop,
}
pub(super) fn on_connect_success(retry: &mut WatchRetryState) {
retry.previous_lease_id = None;
retry.backoff = INITIAL_BACKOFF;
}
pub(super) fn on_connect_error(err: &WatchError, retry: &mut WatchRetryState) -> ConnectAction {
if matches!(err, WatchError::Unauthorized) {
return ConnectAction::StopUnauthorized;
}
let delay = match err {
WatchError::Conflict => {
retry.previous_lease_id = None;
LEASE_CONFLICT_BACKOFF
}
WatchError::UnexpectedContentType { .. } => {
NON_SSE_RESPONSE_BACKOFF
}
_ => {
let delay = retry.backoff;
retry.double_backoff();
delay
}
};
ConnectAction::RetryAfter(delay)
}
pub(super) fn on_outcome(
outcome: WatchOutcome,
lease_id: String,
watch_duration: Duration,
heartbeat_interval: Duration,
retry: &mut WatchRetryState,
) -> WatchAction {
match outcome {
WatchOutcome::Wake(wake) => WatchAction::Wake(wake),
WatchOutcome::ReadTimeout | WatchOutcome::StreamEnded => {
retry.previous_lease_id = Some(lease_id);
WatchAction::Reconnect
}
WatchOutcome::StreamError(_) => {
retry.previous_lease_id = Some(lease_id);
if watch_duration >= heartbeat_interval {
WatchAction::Reconnect
} else {
let delay = retry.backoff;
retry.double_backoff();
WatchAction::Backoff { delay }
}
}
WatchOutcome::HeartbeatLost(reason) => match reason {
HeartbeatExit::Conflict => {
retry.previous_lease_id = None;
WatchAction::Backoff {
delay: LEASE_CONFLICT_BACKOFF,
}
}
HeartbeatExit::Gone => {
retry.previous_lease_id = None;
WatchAction::Reconnect
}
HeartbeatExit::Unauthorized => WatchAction::StopUnauthorized,
HeartbeatExit::Failed => {
retry.previous_lease_id = Some(lease_id);
WatchAction::Reconnect
}
HeartbeatExit::Cancelled => WatchAction::Stop,
},
}
}
#[cfg(test)]
mod tests {
use super::*;
fn wake_event() -> WatchWakeEvent {
WatchWakeEvent {
remote_access_session_id: Some("ras_test".into()),
url: "wss://livekit.example".into(),
token: "lk_token".into(),
}
}
fn lease() -> String {
"lease-abc".into()
}
const HEARTBEAT: Duration = Duration::from_secs(10);
const HEALTHY: Duration = Duration::from_secs(60);
const SHORT: Duration = Duration::from_millis(100);
#[test]
fn double_backoff_caps_at_max() {
let mut state = WatchRetryState::new();
for _ in 0..20 {
state.double_backoff();
}
assert_eq!(state.backoff, MAX_BACKOFF);
}
#[test]
fn connect_success_clears_lease_and_resets_backoff() {
let mut state = WatchRetryState {
previous_lease_id: Some("stale".into()),
backoff: Duration::from_secs(8),
};
on_connect_success(&mut state);
assert_eq!(state.previous_lease_id, None);
assert_eq!(state.backoff, INITIAL_BACKOFF);
}
#[test]
fn connect_error_unauthorized_stops_without_mutating_state() {
let mut state = WatchRetryState {
previous_lease_id: Some("keep-me".into()),
backoff: Duration::from_secs(4),
};
let action = on_connect_error(&WatchError::Unauthorized, &mut state);
assert_eq!(action, ConnectAction::StopUnauthorized);
assert_eq!(state.previous_lease_id.as_deref(), Some("keep-me"));
assert_eq!(state.backoff, Duration::from_secs(4));
}
#[test]
fn connect_error_conflict_drops_lease_and_uses_lease_conflict_backoff() {
let mut state = WatchRetryState {
previous_lease_id: Some("ours".into()),
backoff: Duration::from_secs(2),
};
let action = on_connect_error(&WatchError::Conflict, &mut state);
assert_eq!(action, ConnectAction::RetryAfter(LEASE_CONFLICT_BACKOFF));
assert_eq!(state.previous_lease_id, None);
assert_eq!(state.backoff, Duration::from_secs(2));
}
#[test]
fn connect_error_unexpected_content_type_uses_fixed_backoff() {
let mut state = WatchRetryState {
previous_lease_id: Some("keep".into()),
backoff: Duration::from_secs(2),
};
let action = on_connect_error(
&WatchError::UnexpectedContentType {
content_type: Some("text/html".into()),
},
&mut state,
);
assert_eq!(action, ConnectAction::RetryAfter(NON_SSE_RESPONSE_BACKOFF));
assert_eq!(state.previous_lease_id.as_deref(), Some("keep"));
assert_eq!(state.backoff, Duration::from_secs(2));
}
#[test]
fn connect_error_generic_uses_current_backoff_then_doubles() {
let mut state = WatchRetryState {
previous_lease_id: Some("keep".into()),
backoff: Duration::from_secs(2),
};
let action = on_connect_error(&WatchError::UnexpectedEof, &mut state);
assert_eq!(action, ConnectAction::RetryAfter(Duration::from_secs(2)));
assert_eq!(state.previous_lease_id.as_deref(), Some("keep"));
assert_eq!(state.backoff, Duration::from_secs(4));
}
#[test]
fn connect_error_generic_caps_backoff_at_max() {
let mut state = WatchRetryState {
previous_lease_id: None,
backoff: MAX_BACKOFF,
};
let action = on_connect_error(&WatchError::HelloTimeout, &mut state);
assert_eq!(action, ConnectAction::RetryAfter(MAX_BACKOFF));
assert_eq!(state.backoff, MAX_BACKOFF);
}
#[test]
fn outcome_wake_returns_wake() {
let mut state = WatchRetryState {
previous_lease_id: Some("untouched".into()),
backoff: Duration::from_secs(8),
};
let action = on_outcome(
WatchOutcome::Wake(wake_event()),
lease(),
HEALTHY,
HEARTBEAT,
&mut state,
);
assert_eq!(action, WatchAction::Wake(wake_event()));
assert_eq!(state.previous_lease_id.as_deref(), Some("untouched"));
assert_eq!(state.backoff, Duration::from_secs(8));
}
#[test]
fn outcome_read_timeout_reconnects_immediately_with_lease() {
let mut state = WatchRetryState {
previous_lease_id: None,
backoff: Duration::from_secs(8),
};
let action = on_outcome(
WatchOutcome::ReadTimeout,
lease(),
SHORT,
HEARTBEAT,
&mut state,
);
assert_eq!(action, WatchAction::Reconnect);
assert_eq!(state.previous_lease_id, Some(lease()));
assert_eq!(state.backoff, Duration::from_secs(8));
}
#[test]
fn outcome_stream_ended_reconnects_immediately_with_lease() {
let mut state = WatchRetryState::new();
let action = on_outcome(
WatchOutcome::StreamEnded,
lease(),
SHORT,
HEARTBEAT,
&mut state,
);
assert_eq!(action, WatchAction::Reconnect);
assert_eq!(state.previous_lease_id, Some(lease()));
assert_eq!(state.backoff, INITIAL_BACKOFF);
}
#[test]
fn outcome_stream_error_after_healthy_run_reconnects_immediately() {
let mut state = WatchRetryState {
previous_lease_id: None,
backoff: Duration::from_secs(4),
};
let action = on_outcome(
WatchOutcome::StreamError(WatchError::UnexpectedEof),
lease(),
HEALTHY,
HEARTBEAT,
&mut state,
);
assert_eq!(action, WatchAction::Reconnect);
assert_eq!(state.previous_lease_id, Some(lease()));
assert_eq!(state.backoff, Duration::from_secs(4));
}
#[test]
fn outcome_stream_error_after_short_run_backs_off_and_doubles() {
let mut state = WatchRetryState {
previous_lease_id: None,
backoff: Duration::from_secs(4),
};
let action = on_outcome(
WatchOutcome::StreamError(WatchError::UnexpectedEof),
lease(),
SHORT,
HEARTBEAT,
&mut state,
);
assert_eq!(
action,
WatchAction::Backoff {
delay: Duration::from_secs(4),
}
);
assert_eq!(state.previous_lease_id, Some(lease()));
assert_eq!(state.backoff, Duration::from_secs(8));
}
#[test]
fn outcome_stream_error_short_run_caps_backoff_at_max() {
let mut state = WatchRetryState {
previous_lease_id: None,
backoff: MAX_BACKOFF,
};
let action = on_outcome(
WatchOutcome::StreamError(WatchError::UnexpectedEof),
lease(),
SHORT,
HEARTBEAT,
&mut state,
);
assert_eq!(action, WatchAction::Backoff { delay: MAX_BACKOFF });
assert_eq!(state.backoff, MAX_BACKOFF);
}
#[test]
fn outcome_heartbeat_conflict_drops_lease_with_conflict_backoff() {
let mut state = WatchRetryState {
previous_lease_id: Some("ours".into()),
backoff: Duration::from_secs(8),
};
let action = on_outcome(
WatchOutcome::HeartbeatLost(HeartbeatExit::Conflict),
lease(),
HEALTHY,
HEARTBEAT,
&mut state,
);
assert_eq!(
action,
WatchAction::Backoff {
delay: LEASE_CONFLICT_BACKOFF,
}
);
assert_eq!(state.previous_lease_id, None);
assert_eq!(state.backoff, Duration::from_secs(8));
}
#[test]
fn outcome_heartbeat_gone_drops_lease_no_delay() {
let mut state = WatchRetryState {
previous_lease_id: Some("ours".into()),
backoff: INITIAL_BACKOFF,
};
let action = on_outcome(
WatchOutcome::HeartbeatLost(HeartbeatExit::Gone),
lease(),
HEALTHY,
HEARTBEAT,
&mut state,
);
assert_eq!(action, WatchAction::Reconnect);
assert_eq!(state.previous_lease_id, None);
}
#[test]
fn outcome_heartbeat_unauthorized_stops() {
let mut state = WatchRetryState::new();
let action = on_outcome(
WatchOutcome::HeartbeatLost(HeartbeatExit::Unauthorized),
lease(),
HEALTHY,
HEARTBEAT,
&mut state,
);
assert_eq!(action, WatchAction::StopUnauthorized);
}
#[test]
fn outcome_heartbeat_failed_keeps_lease_no_delay() {
let mut state = WatchRetryState::new();
let action = on_outcome(
WatchOutcome::HeartbeatLost(HeartbeatExit::Failed),
lease(),
HEALTHY,
HEARTBEAT,
&mut state,
);
assert_eq!(action, WatchAction::Reconnect);
assert_eq!(state.previous_lease_id, Some(lease()));
}
#[test]
fn outcome_heartbeat_cancelled_stops_without_unauthorized() {
let mut state = WatchRetryState::new();
let action = on_outcome(
WatchOutcome::HeartbeatLost(HeartbeatExit::Cancelled),
lease(),
HEALTHY,
HEARTBEAT,
&mut state,
);
assert_eq!(action, WatchAction::Stop);
}
}