fugle-marketdata-core 0.4.0

Internal kernel for the Fugle market data SDK. End users should depend on `fugle-marketdata` instead.
//! Connection state machine and event types.
//!
//! Runtime-free: this module depends only on `std::sync::mpsc` and
//! `std::time::Duration`. It is shared by both the sync `WebSocketClient`
//! (always compiled) and the async `aio::WebSocketClient` (behind the
//! `tokio-comp` feature).
//!
//! # Backpressure policy
//!
//! Events flow over `std::sync::mpsc::sync_channel(N)` where `N` is the
//! per-client `event_buffer` (default
//! [`DEFAULT_EVENT_BUFFER`](crate::websocket::DEFAULT_EVENT_BUFFER)). The
//! channel is **drop-newest**: when full, [`emit_event`] discards the
//! incoming event rather than blocking the network task. This is the only
//! safe choice because `std::sync::mpsc` does not expose receiver-side
//! access to the sender; switching to a primitive that does (e.g.
//! `tokio::sync::broadcast`) would break the `events()` /
//! `state_events()` API surface that bindings depend on.
//!
//! Drops are surfaced via:
//! - the per-client
//!   [`messages_dropped_total`](crate::aio::WebSocketClient::messages_dropped_total)
//!   counter (for the inbound *message* channel; the *event* channel
//!   shares the same drop-newest discipline but is small enough that
//!   saturation is rare); and
//! - a `tracing::warn!` at the saturation site when the `tracing` feature
//!   is enabled.
//!
//! Saturation is itself the bug signal — a healthy consumer never
//! approaches the configured cap.

use std::sync::mpsc;
use std::time::Duration;

/// Who initiated the disconnect captured by
/// [`ConnectionEvent::Disconnected`] / [`ConnectionState::Closed`].
///
/// Lets consumers branch on the cause without string-matching the
/// `reason` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DisconnectIntent {
    /// Local caller invoked `disconnect()` or `shutdown_with_timeout(...)`.
    Client,
    /// Server sent a Close frame (regardless of close code).
    Server,
    /// Transport-level failure: I/O error, EOF without Close frame,
    /// heartbeat timeout, etc.
    Network,
}

/// WebSocket connection state machine
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionState {
    /// Not connected
    Disconnected,
    /// Connecting to server
    Connecting,
    /// Authenticating with server
    Authenticating,
    /// Connected and authenticated
    Connected,
    /// Reconnecting after disconnection
    Reconnecting { attempt: u32 },
    /// Connection closed. `intent` mirrors the matching
    /// [`ConnectionEvent::Disconnected`] field so state inspection by the
    /// caller does not lose classification information.
    Closed {
        code: Option<u16>,
        reason: String,
        intent: DisconnectIntent,
    },
}

/// Events emitted by WebSocket connection.
///
/// Consumers attribute events to their source client via the
/// [`events()`](crate::aio::WebSocketClient::events) /
/// [`state_events()`](crate::aio::WebSocketClient::state_events)
/// `Receiver` they were yielded from — `tokio::select!` arms naturally
/// label by source, and code that merges streams from multiple clients
/// is expected to wrap with its own labeling adapter (3 lines via
/// `tokio_stream::StreamExt::map`). The SDK does not pre-empt that
/// decision by stuffing a label on every event.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionEvent {
    /// Connection attempt started
    Connecting,
    /// Connection established
    Connected,
    /// Authentication successful
    Authenticated,
    /// Authentication rejected by the server (parallels old SDKs' `unauthenticated` event)
    Unauthenticated { message: String },
    /// Connection closed.
    ///
    /// `intent` classifies the originator: [`Client`](DisconnectIntent::Client)
    /// for local-initiated, [`Server`](DisconnectIntent::Server) for a
    /// peer Close frame, [`Network`](DisconnectIntent::Network) for
    /// transport errors / EOF / heartbeat timeout.
    Disconnected {
        code: Option<u16>,
        reason: String,
        intent: DisconnectIntent,
    },
    /// Reconnection attempt started
    Reconnecting { attempt: u32 },
    /// Reconnection failed after max attempts
    ReconnectFailed { attempts: u32 },
    /// Heartbeat timeout: no inbound frame received within the configured
    /// `heartbeat_timeout` window. Emitted by the dispatch loop when the
    /// read-site timeout fires; the dispatch loop returns immediately
    /// afterwards, which lets the reconnect path take over.
    HeartbeatTimeout { elapsed: Duration },
    /// Error occurred
    Error { message: String, code: i32 },
}

/// Emit a [`ConnectionEvent`] on the bounded event channel.
///
/// See the module-level documentation for the drop-newest backpressure
/// policy and how saturation is surfaced.
pub(crate) fn emit_event(tx: &mpsc::SyncSender<ConnectionEvent>, event: ConnectionEvent) {
    if let Err(mpsc::TrySendError::Full(dropped)) = tx.try_send(event) {
        crate::tracing_compat::warn!(
            target: "fugle_marketdata::ws",
            ?dropped,
            "event channel saturated; consumer is likely stuck"
        );
        let _ = dropped; // suppress unused warning when tracing feature is off
    }
}