use super::Client;
use crate::events::{ConnectionState, Error};
use crate::utils::backoff::ExponentialBackoff;
use std::env;
use std::time::{Duration, Instant};
use teamtalk_sys as ffi;
#[derive(Clone)]
pub struct ReconnectConfig {
pub max_attempts: u32,
pub min_delay: Duration,
pub max_delay: Duration,
pub stability_threshold: Duration,
}
impl Default for ReconnectConfig {
fn default() -> Self {
Self {
max_attempts: u32::MAX,
min_delay: Duration::from_millis(200),
max_delay: Duration::from_secs(60),
stability_threshold: Duration::from_secs(10),
}
}
}
#[derive(Clone)]
pub struct ReconnectWorkflowConfig {
pub login: ReconnectConfig,
pub join: ReconnectConfig,
}
impl Default for ReconnectWorkflowConfig {
fn default() -> Self {
let defaults = ReconnectConfig::default();
Self {
login: defaults.clone(),
join: defaults,
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReconnectPhaseTimeouts {
pub connect: Option<Duration>,
pub login: Option<Duration>,
pub join: Option<Duration>,
}
impl ReconnectPhaseTimeouts {
pub const fn disabled() -> Self {
Self {
connect: None,
login: None,
join: None,
}
}
}
impl Default for ReconnectPhaseTimeouts {
fn default() -> Self {
let timeout = Some(Duration::from_secs(15));
Self {
connect: timeout,
login: timeout,
join: timeout,
}
}
}
pub struct ReconnectHandler {
pub config: ReconnectConfig,
backoff: ExponentialBackoff,
attempts: u32,
last_attempt: Option<Instant>,
connected_at: Option<Instant>,
}
impl ReconnectHandler {
pub fn new(config: ReconnectConfig) -> Self {
let backoff = ExponentialBackoff::new(config.min_delay, config.max_delay, 1.6, 1.0);
Self {
config,
backoff,
attempts: 0,
last_attempt: None,
connected_at: None,
}
}
pub fn mark_connected(&mut self) {
self.connected_at = Some(Instant::now());
}
pub fn mark_disconnected(&mut self) {
if let Some(at) = self.connected_at
&& at.elapsed() >= self.config.stability_threshold
{
self.attempts = 0;
self.backoff.reset();
}
self.connected_at = None;
}
pub fn can_attempt(&self) -> bool {
if self.attempts >= self.config.max_attempts {
return false;
}
match self.last_attempt {
Some(last) => last.elapsed() >= self.backoff.current_delay(),
None => true,
}
}
pub fn record_attempt(&mut self) {
self.last_attempt = Some(Instant::now());
self.attempts += 1;
self.backoff.next_delay();
}
pub fn reset(&mut self) {
self.attempts = 0;
self.last_attempt = None;
self.connected_at = None;
self.backoff.reset();
}
pub fn current_delay(&self) -> Duration {
self.backoff.current_delay()
}
pub fn attempts(&self) -> u32 {
self.attempts
}
pub fn exhausted(&self) -> bool {
self.attempts >= self.config.max_attempts
}
}
#[derive(Debug, Clone)]
pub struct ConnectParams<'a> {
pub host: &'a str,
pub tcp: i32,
pub udp: i32,
pub encrypted: bool,
}
#[derive(Debug, Clone)]
pub struct ConnectParamsOwned {
pub host: String,
pub tcp: i32,
pub udp: i32,
pub encrypted: bool,
}
impl ConnectParamsOwned {
pub fn new(host: impl Into<String>, tcp: i32, udp: i32, encrypted: bool) -> Self {
Self {
host: host.into(),
tcp,
udp,
encrypted,
}
}
pub fn from_env() -> Self {
let host = env::var("TT_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
let tcp = env::var("TT_TCP")
.ok()
.and_then(|value| value.parse::<i32>().ok())
.unwrap_or(10333);
let udp = env::var("TT_UDP")
.ok()
.and_then(|value| value.parse::<i32>().ok())
.unwrap_or(10333);
let encrypted = env::var("TT_ENCRYPTED")
.ok()
.map(|value| matches!(value.as_str(), "1" | "true" | "TRUE" | "yes" | "YES"))
.unwrap_or(false);
Self::new(host, tcp, udp, encrypted)
}
}
impl<'a> From<&ConnectParams<'a>> for ConnectParamsOwned {
fn from(params: &ConnectParams<'a>) -> Self {
Self::new(params.host, params.tcp, params.udp, params.encrypted)
}
}
fn reset_auto_recovery_handlers(auto: &mut super::core::AutoReconnectState) {
auto.login_handler = Some(ReconnectHandler::new(auto.workflow.login.clone()));
auto.join_handler = Some(ReconnectHandler::new(auto.workflow.join.clone()));
auto.login_gave_up = false;
auto.join_gave_up = false;
auto.recovery_completed = false;
auto.clear_phase_tracking();
}
fn validate_client_keep_alive(keep_alive: &crate::types::ClientKeepAlive) -> Result<(), Error> {
if keep_alive.lost_ms <= 0 {
return Err(Error::InvalidParam);
}
if keep_alive.udp_interval_ms <= 0 {
return Err(Error::InvalidParam);
}
if keep_alive.lost_ms <= keep_alive.udp_interval_ms {
return Err(Error::InvalidParam);
}
if keep_alive.tcp_interval_ms > 0 && keep_alive.lost_ms <= keep_alive.tcp_interval_ms {
return Err(Error::InvalidParam);
}
Ok(())
}
fn dedupe_events(events: Vec<crate::events::Event>) -> Vec<crate::events::Event> {
let mut unique = Vec::with_capacity(events.len());
for event in events {
if unique
.iter()
.any(|existing| std::mem::discriminant(existing) == std::mem::discriminant(&event))
{
continue;
}
unique.push(event);
}
unique
}
fn validate_phase_timeouts(timeouts: &ReconnectPhaseTimeouts) -> Result<(), Error> {
for timeout in [timeouts.connect, timeouts.login, timeouts.join] {
if matches!(timeout, Some(value) if value.is_zero()) {
return Err(Error::InvalidParam);
}
}
Ok(())
}
fn ensure_connect_not_busy(has_connection_flags: bool) -> Result<(), Error> {
if has_connection_flags {
return Err(Error::CommandFailed {
code: -1,
message: "Connect refused: client is already connecting or connected".to_string(),
});
}
Ok(())
}
mod auto_reconnect;
mod connect_ops;
mod keep_alive;