use std::time::Duration;
use tokio::time::Instant;
use super::connection_state::{ConnectionState, StateTransition};
use super::disconnect::DisconnectReason;
use super::reconnect::{ReconnectAction, ReconnectPolicy};
/// Tunables handed to the reconnect policy when a `ConnectionManager` is built.
///
/// Each field is forwarded unchanged to the policy builder setter of the same name;
/// the precise backoff semantics are defined by the policy, not by this struct.
#[derive(Debug, Clone)]
pub struct ConnectionManagerConfig {
    /// Value passed to the policy builder's `initial_delay` setter.
    pub initial_delay: Duration,
    /// Value passed to the policy builder's `max_delay` setter.
    pub max_delay: Duration,
    /// Value passed to the policy builder's `max_attempts` setter.
    // NOTE(review): `None` presumably means "no attempt limit" — confirm against the policy.
    pub max_attempts: Option<u32>,
    /// Value passed to the policy builder's `jitter_factor` setter.
    pub jitter_factor: f64,
}

impl Default for ConnectionManagerConfig {
    /// Defaults: 100 ms initial delay, 10 s maximum delay, no attempt limit configured,
    /// jitter factor of 0.5.
    fn default() -> Self {
        let initial_delay = Duration::from_millis(100);
        let max_delay = Duration::from_secs(10);
        let max_attempts = None;
        let jitter_factor = 0.5;

        ConnectionManagerConfig {
            initial_delay,
            max_delay,
            max_attempts,
            jitter_factor,
        }
    }
}
/// Tracks the current connection state, how long it has been held, and the
/// history of state changes, alongside the reconnect policy that decides what
/// to do after a disconnect.
pub struct ConnectionManager {
/// Current state; starts as `ConnectionState::Disconnected`.
state: ConnectionState,
/// Instant at which `state` was last set (construction time or the most recent transition).
state_entered_at: Instant,
/// Reconnect policy; owns the consecutive-failure counter and picks the next reconnect action.
policy: ReconnectPolicy,
/// Every transition recorded so far, oldest first. Nothing in this type removes entries.
transitions: Vec<StateTransition>,
/// Identifier emitted as the `label` field of each state-transition log event.
label: String,
}
impl ConnectionManager {
    /// Creates a manager that starts in [`ConnectionState::Disconnected`].
    ///
    /// `label` is attached to every state-transition log event. The values in
    /// `config` are forwarded one-to-one to the [`ReconnectPolicy`] builder.
    pub fn new(label: impl Into<String>, config: ConnectionManagerConfig) -> Self {
        let policy = ReconnectPolicy::builder()
            .initial_delay(config.initial_delay)
            .max_delay(config.max_delay)
            .max_attempts(config.max_attempts)
            .jitter_factor(config.jitter_factor)
            .build();

        Self {
            state: ConnectionState::Disconnected,
            state_entered_at: Instant::now(),
            policy,
            transitions: Vec::new(),
            label: label.into(),
        }
    }

    /// Creates a manager using [`ConnectionManagerConfig::default`].
    pub fn with_defaults(label: impl Into<String>) -> Self {
        Self::new(label, ConnectionManagerConfig::default())
    }

    /// Returns the current connection state.
    pub fn state(&self) -> ConnectionState {
        self.state
    }

    /// Returns how long the manager has been in the current state.
    pub fn time_in_state(&self) -> Duration {
        self.state_entered_at.elapsed()
    }

    /// Returns the consecutive-failure count reported by the reconnect policy.
    pub fn consecutive_failures(&self) -> u32 {
        self.policy.consecutive_failures()
    }

    /// Returns every recorded state transition, oldest first.
    ///
    /// The history is append-only: nothing in this type trims it, so it grows
    /// for as long as the manager lives.
    pub fn transitions(&self) -> &[StateTransition] {
        &self.transitions
    }

    /// Moves to `new_state`, recording the change in the history and emitting a
    /// `connection.state_transition` log event.
    ///
    /// `reason` is stored in the history record and included in the log event.
    /// No validation is performed: a transition to the state already held is
    /// recorded like any other.
    pub fn transition(&mut self, new_state: ConnectionState, reason: impl Into<String>) {
        let now = Instant::now();
        let reason = reason.into();
        let prev_state = self.state;
        let duration_in_prev = now.duration_since(self.state_entered_at);

        // Log before building the history record so that `reason` can be moved
        // into the record afterwards instead of being cloned on every transition.
        tracing::info!(
            label = %self.label,
            from = %prev_state,
            to = %new_state,
            reason = %reason,
            // u64 milliseconds covers hundreds of millions of years; the narrowing
            // cast cannot truncate for any realistic time spent in a state.
            elapsed_in_prev_ms = duration_in_prev.as_millis() as u64,
            "connection.state_transition"
        );

        self.transitions.push(StateTransition {
            at: now,
            from: prev_state,
            to: new_state,
            reason,
            duration_in_prev,
        });

        self.state = new_state;
        self.state_entered_at = now;
    }

    /// Notifies the reconnect policy that a connection was established.
    ///
    /// This does not change [`state`](Self::state) or record a transition;
    /// callers that want one must call [`transition`](Self::transition) themselves.
    pub fn on_connected(&mut self) {
        self.policy.on_connected();
    }

    /// Handles a disconnect: records a transition to
    /// [`ConnectionState::Reconnecting`] and returns the action chosen by the
    /// reconnect policy.
    ///
    /// The attempt number stored in the new state is the policy's failure count
    /// (read before the policy is consulted) plus one. The transition is recorded
    /// unconditionally, whatever action the policy then returns.
    pub fn on_disconnect(&mut self, reason: &DisconnectReason) -> ReconnectAction {
        // Saturate rather than overflow: `+ 1` would panic in debug builds and
        // wrap to 0 in release builds if the counter ever reached `u32::MAX`.
        let attempt = self.policy.consecutive_failures().saturating_add(1);
        self.transition(ConnectionState::Reconnecting { attempt }, reason.to_string());
        self.policy.next_action(reason)
    }
}