use super::{
CloseReason, ConnectionStatus, FatalKind, GapReason, ReconnectPolicy, ReplayPhase, ResumeStart,
ServerCloseReason, WatchEvent, WatchMode, WatchOutcome,
};
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WatchState {
replay_phase: ReplayPhase,
connection_status: ConnectionStatus,
mode: WatchMode,
}
impl WatchState {
#[must_use]
pub fn watch(start: Option<ResumeStart>) -> Self {
let replay_phase = match start {
Some(start) => ReplayPhase::Replaying {
start,
replay_completed: false,
},
None => ReplayPhase::Live,
};
Self {
replay_phase,
connection_status: ConnectionStatus::Reconnecting,
mode: WatchMode::Watch,
}
}
#[must_use]
pub fn replay_only(start: ResumeStart) -> Self {
Self {
replay_phase: ReplayPhase::Replaying {
start,
replay_completed: false,
},
connection_status: ConnectionStatus::Reconnecting,
mode: WatchMode::ReplayOnly,
}
}
#[must_use]
pub fn replay_phase(&self) -> &ReplayPhase {
&self.replay_phase
}
#[must_use]
pub fn connection_status(&self) -> &ConnectionStatus {
&self.connection_status
}
#[must_use]
pub fn mode(&self) -> WatchMode {
self.mode
}
#[must_use]
pub fn is_terminal(&self) -> bool {
matches!(self.replay_phase, ReplayPhase::Closed { .. })
}
pub fn transition(&mut self, event: WatchEvent) -> WatchOutcome {
if self.is_terminal() {
return WatchOutcome::Continue;
}
match event {
WatchEvent::ConnectionEstablished => {
self.connection_status = ConnectionStatus::Connected;
WatchOutcome::Continue
}
WatchEvent::ConnectionLost { .. } | WatchEvent::HeartbeatStarvation => {
self.connection_status = ConnectionStatus::Reconnecting;
WatchOutcome::Reconnect {
policy: ReconnectPolicy::ExponentialBackoff,
}
}
WatchEvent::ServerClose { reason } => self.handle_server_close(reason),
WatchEvent::BackoffStarted(duration) => {
self.connection_status = ConnectionStatus::BackoffWait(duration);
WatchOutcome::Continue
}
WatchEvent::BackoffElapsed => {
if matches!(self.connection_status, ConnectionStatus::BackoffWait(_)) {
self.connection_status = ConnectionStatus::Reconnecting;
}
WatchOutcome::Continue
}
WatchEvent::AuthRejected => {
self.connection_status = ConnectionStatus::RefreshingAuth;
WatchOutcome::RefreshAuth
}
WatchEvent::AuthRefreshCompleted { success: true } => {
self.connection_status = ConnectionStatus::Reconnecting;
WatchOutcome::Continue
}
WatchEvent::AuthRefreshCompleted { success: false } => {
self.close_with(CloseReason::Fatal {
kind: FatalKind::AuthenticationRejectedAfterRefresh,
})
}
WatchEvent::HeartbeatReceived | WatchEvent::NotificationReceived { .. } => {
WatchOutcome::Continue
}
WatchEvent::ReplayCompleted => self.handle_replay_completed(),
WatchEvent::GapDetected(reason) => self.enter_gap(reason),
WatchEvent::Fatal(kind) => self.close_with(CloseReason::Fatal { kind }),
WatchEvent::Stop => self.close_with(CloseReason::UserRequested),
}
}
fn handle_server_close(&mut self, reason: ServerCloseReason) -> WatchOutcome {
match reason {
ServerCloseReason::MaxDurationReached => {
self.connection_status = ConnectionStatus::Reconnecting;
WatchOutcome::Reconnect {
policy: ReconnectPolicy::Immediate,
}
}
ServerCloseReason::ServerShutdown => {
self.connection_status = ConnectionStatus::Reconnecting;
WatchOutcome::Reconnect {
policy: ReconnectPolicy::ShortBackoff,
}
}
ServerCloseReason::EndOfStream => self.handle_end_of_stream(),
}
}
fn handle_end_of_stream(&mut self) -> WatchOutcome {
match self.mode {
WatchMode::Watch => {
self.connection_status = ConnectionStatus::Reconnecting;
WatchOutcome::Reconnect {
policy: ReconnectPolicy::Immediate,
}
}
WatchMode::ReplayOnly => {
if let ReplayPhase::Replaying {
replay_completed: true,
..
} = &self.replay_phase
{
self.close_with(CloseReason::EndOfStream)
} else {
self.connection_status = ConnectionStatus::Reconnecting;
WatchOutcome::Reconnect {
policy: ReconnectPolicy::Immediate,
}
}
}
}
}
fn handle_replay_completed(&mut self) -> WatchOutcome {
match (self.mode, &self.replay_phase) {
(WatchMode::Watch, ReplayPhase::Replaying { .. }) => {
self.replay_phase = ReplayPhase::Live;
}
(
WatchMode::ReplayOnly,
ReplayPhase::Replaying {
replay_completed: false,
start,
},
) => {
let start = start.clone();
self.replay_phase = ReplayPhase::Replaying {
start,
replay_completed: true,
};
}
(
WatchMode::Watch,
ReplayPhase::Live | ReplayPhase::GapDetected { .. } | ReplayPhase::Closed { .. },
)
| (
WatchMode::ReplayOnly,
ReplayPhase::Replaying {
replay_completed: true,
..
}
| ReplayPhase::Live
| ReplayPhase::GapDetected { .. }
| ReplayPhase::Closed { .. },
) => {}
}
WatchOutcome::Continue
}
fn close_with(&mut self, reason: CloseReason) -> WatchOutcome {
self.replay_phase = ReplayPhase::Closed {
reason: reason.clone(),
};
WatchOutcome::Stop { reason }
}
fn enter_gap(&mut self, reason: GapReason) -> WatchOutcome {
self.replay_phase = ReplayPhase::GapDetected { reason };
WatchOutcome::Gap { reason }
}
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::panic,
reason = "test code: panic-on-unexpected is the expected diagnostic"
)]
mod tests;