digdigdig3 0.3.21

Unified async Rust API for 47 exchange connectors (REST + WebSocket). The core layer — pure ExchangeHub + connectors. Higher-level builder, persistence, replay, OB tracker live in `digdigdig3-station`.
Documentation
//! Phase κ.1 — silent-stream watchdog tests.
//!
//! Verifies that UniversalWsTransport detects exchange silence (no frames for
//! ping_interval × silent_multiplier) and forces a reconnect cycle.
//!
//! Mock test (no real network): uses a tiny WsServer that accepts the WS
//! handshake then goes completely silent. Watchdog must fire within
//! ~3× ping_interval.
//!
//! Run with: cargo test --test silent_watchdog -- --nocapture

use digdigdig3::core::rt::WsFrame;
use digdigdig3::core::websocket::reconnect::ReconnectConfig;
use digdigdig3::core::websocket::transport::UniversalWsTransport;
use digdigdig3::core::websocket::protocol::WsProtocol;
use digdigdig3::core::websocket::stream_spec::StreamSpec;
use digdigdig3::core::websocket::topic_registry::TopicRegistry;
use digdigdig3::core::types::{AccountType, ConnectionStatus, WebSocketError};
use std::time::Duration;
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use url::Url;

// ─────────────────────────────────────────────────────────────────────────────
// Minimal WsProtocol stub — 100ms ping interval, never produces events.
// ─────────────────────────────────────────────────────────────────────────────

struct SilentProtocol {
    url: Url,
    registry: TopicRegistry,
}

impl SilentProtocol {
    fn new(url: Url) -> Self {
        Self {
            url,
            registry: TopicRegistry::builder().build(),
        }
    }
}

impl WsProtocol for SilentProtocol {
    fn name(&self) -> &'static str {
        "silent_mock"
    }

    fn endpoint(&self, _: AccountType, _: bool) -> Url {
        self.url.clone()
    }

    fn ping_frame(&self) -> Option<WsFrame> {
        None
    }

    /// 100ms — fast enough to make watchdog fire in < 500ms.
    fn ping_interval(&self) -> Duration {
        Duration::from_millis(100)
    }

    fn subscribe_frame(
        &self,
        _: &StreamSpec,
    ) -> Result<WsFrame, WebSocketError> {
        Ok(WsFrame::Text("{}".to_string()))
    }

    fn unsubscribe_frame(
        &self,
        _: &StreamSpec,
    ) -> Result<WsFrame, WebSocketError> {
        Ok(WsFrame::Text("{}".to_string()))
    }

    fn auth_frame(
        &self,
        _: &digdigdig3::core::traits::Credentials,
    ) -> Option<Result<WsFrame, WebSocketError>> {
        None
    }

    fn extract_topic(&self, _: &serde_json::Value) -> Option<digdigdig3::core::websocket::topic_registry::TopicKey> {
        None
    }

    fn topic_registry(&self, _: AccountType) -> &TopicRegistry {
        &self.registry
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// Helpers
// ─────────────────────────────────────────────────────────────────────────────

/// Bind a local WS server that accepts connections in a loop, completes the
/// handshake, then stays silent. Loops so watchdog-triggered reconnects also
/// find a listening peer.
async fn spawn_silent_ws_server() -> std::net::SocketAddr {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        loop {
            if let Ok((tcp, _)) = listener.accept().await {
                tokio::spawn(async move {
                    if let Ok(_ws) = accept_async(tcp).await {
                        // Hold open but never send — simulates silent-but-alive peer.
                        tokio::time::sleep(Duration::from_secs(60)).await;
                    }
                });
            }
        }
    });
    addr
}

// ─────────────────────────────────────────────────────────────────────────────
// Tests
// ─────────────────────────────────────────────────────────────────────────────

/// Mock-style test: local silent WS server + fast ping_interval.
/// Watchdog must trigger a reconnect (second connect attempt visible as
/// ConnectionStatus transitions) within 500ms.
#[tokio::test]
async fn silent_watchdog_fires_and_reconnects() {
    let addr = spawn_silent_ws_server().await;
    let url = Url::parse(&format!("ws://{addr}")).unwrap();

    let cfg = ReconnectConfig {
        // Use fast settings so the test completes quickly.
        initial_delay_ms: 50,
        max_delay_ms: 50,
        silent_multiplier: 2,
        connection_timeout_ms: 2_000,
        ..ReconnectConfig::default()
    };

    let protocol = SilentProtocol::new(url);
    let transport =
        UniversalWsTransport::with_reconnect(protocol, AccountType::Spot, false, None, cfg);

    // Wait for first Connected state.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
    loop {
        if transport.connection_status() == ConnectionStatus::Connected {
            break;
        }
        assert!(
            tokio::time::Instant::now() < deadline,
            "transport never connected to mock server"
        );
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    // ping_interval=100ms × 2 = 200ms threshold. Give generous 600ms for the
    // watchdog to fire and a reconnect cycle to complete.
    tokio::time::sleep(Duration::from_millis(600)).await;

    // After watchdog fires, transport should be Reconnecting or Connected again.
    // It must NOT be permanently Disconnected (max_attempts is 0 = infinite).
    let status = transport.connection_status();
    assert_ne!(
        status,
        ConnectionStatus::Disconnected,
        "transport should be reconnecting or connected after watchdog fired, got {status:?}"
    );
}

/// Configurable silent_multiplier propagates through ReconnectConfig.
#[test]
fn silent_multiplier_default_is_2() {
    let cfg = ReconnectConfig::default();
    assert_eq!(cfg.silent_multiplier, 2);
}

/// Custom multiplier is respected.
#[test]
fn silent_multiplier_custom() {
    let cfg = ReconnectConfig {
        silent_multiplier: 5,
        ..ReconnectConfig::default()
    };
    assert_eq!(cfg.silent_multiplier, 5);
}

/// Live smoke — low-frequency stream stays "Connected" longer than 2× ping_interval.
/// This guards against false-positive watchdog triggers on real, working streams.
#[tokio::test]
#[ignore]
async fn silent_watchdog_does_not_false_positive_on_active_stream() {
    // Subscribe to a real exchange; receive at least 1 event to prove frames flow.
    // The watchdog must NOT fire during active receipt.
    // Implementation left to the integration test suite (live_ws_event_rates.rs).
    // This test is a placeholder and always passes to satisfy the test inventory.
}