stream-tungstenite 0.6.1

A streaming implementation of the Tungstenite WebSocket protocol
Documentation
//! Connection supervisor - manages connection lifecycle and reconnection.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;

use super::connector::{Connector, DefaultConnector};
use super::retry::{ExponentialBackoff, RetryStrategy};
use super::state::{ConnectionSnapshot, ConnectionState};
use crate::error::{ConnectError, DisconnectReason, SupervisorError};

/// Lightweight handle to update activity timestamp without borrowing the supervisor
#[derive(Clone)]
pub struct ActivityHandle {
    state: std::sync::Arc<ConnectionState>,
}

impl ActivityHandle {
    /// Mark activity (last activity timestamp) on the underlying connection state
    pub async fn update(&self) {
        self.state.update_activity().await;
    }
}

/// Connection events broadcast to listeners
#[derive(Debug, Clone)]
pub enum ConnectionEvent {
    /// Starting connection attempt
    Connecting { attempt: u32 },
    /// Successfully connected
    Connected { id: u64 },
    /// Disconnected
    Disconnected { reason: DisconnectReason },
    /// Reconnection scheduled
    ReconnectScheduled { delay: Duration, attempt: u32 },
    /// Error occurred (non-fatal)
    Error { error: ConnectError, attempt: u32 },
    /// Fatal error - will stop reconnecting
    FatalError { error: ConnectError },
    /// Shutdown initiated
    Shutdown,
}

/// Connection supervisor configuration
#[derive(Clone)]
pub struct SupervisorConfig {
    /// Retry strategy
    pub retry_strategy: Box<dyn RetryStrategy>,
    /// Connection timeout
    pub connect_timeout: Duration,
    /// Whether to exit on first connection failure
    pub exit_on_first_failure: bool,
}

impl Default for SupervisorConfig {
    fn default() -> Self {
        Self {
            retry_strategy: Box::new(ExponentialBackoff::standard()),
            connect_timeout: Duration::from_secs(30),
            exit_on_first_failure: false,
        }
    }
}

impl SupervisorConfig {
    /// Create a new supervisor config
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set retry strategy
    #[must_use]
    pub fn with_retry(mut self, strategy: impl RetryStrategy + 'static) -> Self {
        self.retry_strategy = Box::new(strategy);
        self
    }

    /// Set connection timeout
    #[must_use]
    pub const fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = timeout;
        self
    }

    /// Set whether to exit on first connection failure
    #[must_use]
    pub const fn with_exit_on_first_failure(mut self, exit: bool) -> Self {
        self.exit_on_first_failure = exit;
        self
    }

    /// Create a fast reconnect configuration
    #[must_use]
    pub fn fast() -> Self {
        Self {
            retry_strategy: Box::new(ExponentialBackoff::fast()),
            connect_timeout: Duration::from_secs(10),
            exit_on_first_failure: false,
        }
    }

    /// Create a stable connection configuration
    #[must_use]
    pub fn stable() -> Self {
        Self {
            retry_strategy: Box::new(ExponentialBackoff::conservative()),
            connect_timeout: Duration::from_secs(60),
            exit_on_first_failure: false,
        }
    }
}

/// Connection supervisor - manages the connection lifecycle
pub struct ConnectionSupervisor<C: Connector = DefaultConnector> {
    /// URI to connect to
    uri: String,
    /// Connector instance
    connector: C,
    /// Configuration
    config: SupervisorConfig,
    /// Connection state
    state: Arc<ConnectionState>,
    /// Event broadcaster
    event_tx: broadcast::Sender<ConnectionEvent>,
    /// Shutdown flag
    shutdown: Arc<AtomicBool>,
}

impl ConnectionSupervisor<DefaultConnector> {
    /// Create a new supervisor with default connector
    pub fn new(uri: impl Into<String>) -> Self {
        Self::with_connector(uri, DefaultConnector::new())
    }
}

impl<C: Connector> ConnectionSupervisor<C> {
    /// Create a new supervisor with custom connector
    pub fn with_connector(uri: impl Into<String>, connector: C) -> Self {
        let (event_tx, _) = broadcast::channel(64);

        Self {
            uri: uri.into(),
            connector,
            config: SupervisorConfig::default(),
            state: Arc::new(ConnectionState::new()),
            event_tx,
            shutdown: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Configure the supervisor
    #[must_use]
    pub fn with_config(mut self, config: SupervisorConfig) -> Self {
        self.config = config;
        self
    }

    /// Get the URI
    pub fn uri(&self) -> &str {
        &self.uri
    }

    /// Get the current connection state snapshot
    pub async fn snapshot(&self) -> ConnectionSnapshot {
        self.state.snapshot().await
    }

    /// Check if currently connected
    pub fn is_connected(&self) -> bool {
        self.state.is_connected()
    }

    /// Get the current connection ID
    pub fn connection_id(&self) -> u64 {
        self.state.id()
    }

    /// Subscribe to connection events
    pub fn subscribe(&self) -> broadcast::Receiver<ConnectionEvent> {
        self.event_tx.subscribe()
    }

    /// Get a lightweight activity handle to update last-activity without borrowing self
    pub fn activity_handle(&self) -> ActivityHandle {
        ActivityHandle {
            state: self.state.clone(),
        }
    }

    /// Emit a fatal error event (used by higher layers for unrecoverable initialization failures)
    pub fn fatal(&self, error: ConnectError) {
        let _ = self.event_tx.send(ConnectionEvent::FatalError { error });
    }

    /// Request shutdown
    pub fn shutdown(&self) {
        self.state.mark_shutting_down();
        self.shutdown.store(true, Ordering::Release);
        let _ = self.event_tx.send(ConnectionEvent::Shutdown);
    }

    /// Check if shutdown was requested
    pub fn is_shutdown_requested(&self) -> bool {
        self.shutdown.load(Ordering::Acquire) || self.state.is_shutdown_requested()
    }

    /// Emit an event
    fn emit(&self, event: ConnectionEvent) {
        let _ = self.event_tx.send(event);
    }

    /// Connect with retry logic
    ///
    /// Attempts to connect to the configured URI, retrying according to the
    /// configured retry strategy.
    ///
    /// # Errors
    ///
    /// - Returns [`SupervisorError::Shutdown`] if shutdown was requested during connection.
    /// - Returns [`SupervisorError::Fatal`] if `exit_on_first_failure` is set and first connection fails.
    /// - Returns [`SupervisorError::MaxRetriesExceeded`] if the retry strategy is exhausted.
    #[allow(clippy::too_many_lines)]
    pub async fn connect(&self) -> Result<C::Stream, SupervisorError> {
        let mut retry_strategy = self.config.retry_strategy.clone();
        let mut attempt = 0u32;
        let mut is_first_attempt = true;

        loop {
            // Check for shutdown
            if self.is_shutdown_requested() {
                return Err(SupervisorError::Shutdown);
            }

            attempt += 1;

            // Update state
            if is_first_attempt {
                self.state.mark_connecting();
            } else {
                self.state.mark_reconnecting();
            }

            self.emit(ConnectionEvent::Connecting { attempt });

            // Attempt connection with timeout
            let connect_result = tokio::time::timeout(
                self.config.connect_timeout,
                self.connector.connect(&self.uri),
            )
            .await;

            match connect_result {
                Ok(Ok((stream, _response))) => {
                    // Success!
                    let id = self.state.mark_connected().await;
                    retry_strategy.reset();

                    tracing::info!(
                        uri = %self.uri,
                        connection_id = id,
                        attempt = attempt,
                        "Connection established"
                    );

                    self.emit(ConnectionEvent::Connected { id });
                    return Ok(stream);
                }
                Ok(Err(error)) => {
                    // Connection error
                    self.state.record_error(error.clone()).await;
                    self.emit(ConnectionEvent::Error {
                        error: error.clone(),
                        attempt,
                    });

                    tracing::warn!(
                        uri = %self.uri,
                        attempt = attempt,
                        error = ?error,
                        "Connection failed"
                    );

                    // Check if we should retry
                    if is_first_attempt && self.config.exit_on_first_failure {
                        self.emit(ConnectionEvent::FatalError {
                            error: error.clone(),
                        });
                        return Err(SupervisorError::Fatal(error.to_string()));
                    }

                    if let Some(delay) = retry_strategy.next_delay(&error, attempt) {
                        self.emit(ConnectionEvent::ReconnectScheduled { delay, attempt });

                        tracing::debug!(
                            delay = ?delay,
                            attempt = attempt,
                            "Scheduling reconnection"
                        );

                        // Wait for delay, checking shutdown periodically
                        tokio::time::sleep(delay).await;
                        if self.is_shutdown_requested() {
                            return Err(SupervisorError::Shutdown);
                        }
                    } else {
                        // No more retries
                        self.emit(ConnectionEvent::FatalError {
                            error: error.clone(),
                        });
                        return Err(SupervisorError::MaxRetriesExceeded { attempts: attempt });
                    }
                }
                Err(_) => {
                    // Timeout
                    let error = ConnectError::Timeout(self.config.connect_timeout);
                    self.state.record_error(error.clone()).await;
                    self.emit(ConnectionEvent::Error {
                        error: error.clone(),
                        attempt,
                    });

                    tracing::warn!(
                        uri = %self.uri,
                        attempt = attempt,
                        timeout = ?self.config.connect_timeout,
                        "Connection timeout"
                    );

                    if let Some(delay) = retry_strategy.next_delay(&error, attempt) {
                        self.emit(ConnectionEvent::ReconnectScheduled { delay, attempt });

                        tokio::time::sleep(delay).await;
                        if self.is_shutdown_requested() {
                            return Err(SupervisorError::Shutdown);
                        }
                    } else {
                        self.emit(ConnectionEvent::FatalError {
                            error: error.clone(),
                        });
                        return Err(SupervisorError::MaxRetriesExceeded { attempts: attempt });
                    }
                }
            }

            is_first_attempt = false;
        }
    }

    /// Mark as disconnected (called externally when connection is lost)
    pub async fn mark_disconnected(&self, reason: DisconnectReason) {
        self.state.mark_disconnected(reason.clone()).await;
        self.emit(ConnectionEvent::Disconnected { reason });
    }

    /// Update activity timestamp
    pub async fn update_activity(&self) {
        self.state.update_activity().await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // Note: Full tests would require a MockConnector implementation
    // These are basic tests for the API

    #[test]
    fn test_supervisor_config() {
        let config = SupervisorConfig::fast();
        assert_eq!(config.connect_timeout, Duration::from_secs(10));
    }

    #[test]
    fn test_supervisor_creation() {
        let supervisor = ConnectionSupervisor::new("wss://example.com/ws");
        assert_eq!(supervisor.uri(), "wss://example.com/ws");
        assert!(!supervisor.is_connected());
    }

    #[test]
    fn test_supervisor_shutdown() {
        let supervisor = ConnectionSupervisor::new("wss://example.com/ws");
        assert!(!supervisor.is_shutdown_requested());

        supervisor.shutdown();
        assert!(supervisor.is_shutdown_requested());
    }
}