digdigdig3 0.1.30

Unified async Rust API for 44 exchange connectors — crypto, stocks, forex. REST + WebSocket.
Documentation
//! Crypto.com WebSocket Integration Tests
//!
//! Tests WebSocket connectivity and subscriptions against real Crypto.com API.
//! IMPORTANT: Crypto.com requires a 1-second delay after connection before sending requests.
//!
//! Run with:
//! ```text
//! cargo test --package digdigdig3 --lib crypto::cex::crypto_com::_tests_websocket -- --ignored --nocapture
//! ```
//!
//! NOTE: All tests connect to real Crypto.com endpoints and require network access.
//! NOTE: CryptoComWebSocket has inherent methods that shadow trait methods.
//! We use explicit UFCS to call the WebSocketConnector trait implementations.

use std::time::Duration;
use tokio::time::timeout;
use tokio_stream::StreamExt;

use crate::core::{AccountType, ConnectionStatus, StreamType, SubscriptionRequest, Symbol};
use crate::core::traits::WebSocketConnector;
use super::websocket::CryptoComWebSocket;

// ═══════════════════════════════════════════════════════════════════════════════
// TEST HELPERS
// ═══════════════════════════════════════════════════════════════════════════════

fn btc_usdt() -> Symbol {
    // Crypto.com Spot uses BTC_USDT format
    Symbol::new("BTC", "USDT")
}

// ═══════════════════════════════════════════════════════════════════════════════
// CAPABILITIES TEST
// ═══════════════════════════════════════════════════════════════════════════════

#[tokio::test]
#[ignore]
async fn test_orderbook_capabilities() {
    // CryptoComWebSocket::new() is a sync constructor
    let ws = CryptoComWebSocket::new(None, false);

    let caps = WebSocketConnector::orderbook_capabilities(&ws, AccountType::Spot);
    println!("Crypto.com orderbook capabilities: {:?}", caps);

    // Crypto.com supports depths 10, 50
    assert!(!caps.ws_depths.is_empty(), "Must have at least one depth level");
    println!("ws_depths: {:?}", caps.ws_depths);
    println!("supports_snapshot: {}", caps.supports_snapshot);
    println!("supports_delta: {}", caps.supports_delta);
}

// ═══════════════════════════════════════════════════════════════════════════════
// SUBSCRIPTION TESTS
//
// NOTE: CryptoComWebSocket has both inherent methods (connect(), event_stream())
// and WebSocketConnector trait implementations. The trait methods are called
// explicitly via UFCS to avoid resolving to the inherent methods.
// ═══════════════════════════════════════════════════════════════════════════════

#[tokio::test]
#[ignore]
async fn test_subscribe_orderbook() {
    let mut ws = CryptoComWebSocket::new(None, false);

    // Allow extra time: Crypto.com requires 1s delay after connect
    // Use trait method explicitly to avoid ambiguity with inherent connect()
    let connect_result = timeout(
        Duration::from_secs(15),
        WebSocketConnector::connect(&mut ws, AccountType::Spot),
    ).await;

    match connect_result {
        Ok(Ok(())) => {
            assert_eq!(WebSocketConnector::connection_status(&ws), ConnectionStatus::Connected);

            let mut sub = SubscriptionRequest::new(btc_usdt(), StreamType::Orderbook);
            sub.depth = Some(10);
            let result = WebSocketConnector::subscribe(&mut ws, sub).await;

            if result.is_err() {
                println!("Subscribe failed: {:?}", result.err());
                let _ = WebSocketConnector::disconnect(&mut ws).await;
                return;
            }

            println!("Subscribed to Crypto.com orderbook depth=10 — waiting for snapshot...");

            let mut stream = WebSocketConnector::event_stream(&ws);
            let ob_event = timeout(Duration::from_secs(20), async {
                use crate::core::StreamEvent;
                while let Some(event) = stream.next().await {
                    match event {
                        Ok(ev @ StreamEvent::OrderbookSnapshot(_)) | Ok(ev @ StreamEvent::OrderbookDelta(_)) => {
                            return ev;
                        }
                        Ok(other) => {
                            println!("Received non-orderbook event (skipping): {:?}", other);
                            continue;
                        }
                        Err(e) => panic!("Stream returned error: {:?}", e),
                    }
                }
                panic!("Stream ended without orderbook data");
            }).await.expect("Timeout waiting for orderbook data — Crypto.com did not send snapshot/delta within 20s");

            use crate::core::StreamEvent;
            if let StreamEvent::OrderbookSnapshot(ob) = &ob_event {
                assert!(!ob.bids.is_empty(), "Snapshot bids must not be empty");
                assert!(!ob.asks.is_empty(), "Snapshot asks must not be empty");

                let bid_prices: Vec<f64> = ob.bids.iter().map(|b| b.price).collect();
                let sorted_desc = bid_prices.windows(2).all(|w| w[0] >= w[1]);
                assert!(sorted_desc, "Bids must be sorted descending by price");

                let ask_prices: Vec<f64> = ob.asks.iter().map(|a| a.price).collect();
                let sorted_asc = ask_prices.windows(2).all(|w| w[0] <= w[1]);
                assert!(sorted_asc, "Asks must be sorted ascending by price");

                let best_bid = ob.bids[0].price;
                let best_ask = ob.asks[0].price;
                assert!(best_bid < best_ask, "Book must not be crossed: best_bid={} best_ask={}", best_bid, best_ask);

                println!("Orderbook snapshot OK: {} bids, {} asks, best_bid={}, best_ask={}", ob.bids.len(), ob.asks.len(), best_bid, best_ask);
            } else {
                println!("Received orderbook delta (no snapshot assertions): {:?}", ob_event);
            }

            let _ = WebSocketConnector::disconnect(&mut ws).await;
            println!("Crypto.com orderbook subscription works");
        }
        Ok(Err(e)) => println!("Connection failed: {:?}", e),
        Err(_) => println!("Connection timeout"),
    }
}

#[tokio::test]
#[ignore]
async fn test_subscribe_trades() {
    let mut ws = CryptoComWebSocket::new(None, false);

    let connect_result = timeout(
        Duration::from_secs(15),
        WebSocketConnector::connect(&mut ws, AccountType::Spot),
    ).await;

    match connect_result {
        Ok(Ok(())) => {
            let sub = SubscriptionRequest::new(btc_usdt(), StreamType::Trade);
            let result = WebSocketConnector::subscribe(&mut ws, sub).await;

            if result.is_err() {
                println!("Subscribe failed: {:?}", result.err());
                let _ = WebSocketConnector::disconnect(&mut ws).await;
                return;
            }

            println!("Subscribed to Crypto.com trades — waiting for trade...");

            let mut stream = WebSocketConnector::event_stream(&ws);
            let event = timeout(Duration::from_secs(20), stream.next()).await;

            if let Ok(Some(Ok(ev))) = event {
                println!("Received trade event: {:?}", ev);
            } else {
                println!("No trade event received within timeout (market may be slow)");
            }

            let _ = WebSocketConnector::disconnect(&mut ws).await;
            println!("Crypto.com trades subscription works");
        }
        Ok(Err(e)) => println!("Connection failed: {:?}", e),
        Err(_) => println!("Connection timeout"),
    }
}

#[tokio::test]
#[ignore]
async fn test_orderbook_depth_levels() {
    // Crypto.com valid depths: 10, 50
    let depths = [10u32, 50];

    for depth in &depths {
        println!("Testing Crypto.com depth={}...", depth);

        let mut ws = CryptoComWebSocket::new(None, false);

        let connect_result = timeout(
            Duration::from_secs(15),
            WebSocketConnector::connect(&mut ws, AccountType::Spot),
        ).await;

        match connect_result {
            Ok(Ok(())) => {
                let mut sub = SubscriptionRequest::new(btc_usdt(), StreamType::Orderbook);
                sub.depth = Some(*depth);

                if WebSocketConnector::subscribe(&mut ws, sub).await.is_err() {
                    println!("Subscribe failed for depth={}", depth);
                    let _ = WebSocketConnector::disconnect(&mut ws).await;
                    continue;
                }

                let mut stream = WebSocketConnector::event_stream(&ws);
                let event = timeout(Duration::from_secs(20), stream.next()).await;

                match event {
                    Ok(Some(Ok(ev))) => {
                        use crate::core::StreamEvent;
                        if let StreamEvent::OrderbookSnapshot(ob) = ev {
                            println!("Depth={}: {} bids, {} asks — OK", depth, ob.bids.len(), ob.asks.len());
                        } else {
                            println!("Depth={}: received event: {:?}", depth, ev);
                        }
                    }
                    Ok(Some(Err(e))) => println!("Depth={}: error: {:?}", depth, e),
                    Ok(None) => println!("Depth={}: stream ended", depth),
                    Err(_) => println!("Depth={}: timeout", depth),
                }

                let _ = WebSocketConnector::disconnect(&mut ws).await;
            }
            Ok(Err(e)) => println!("Connection failed for depth={}: {:?}", depth, e),
            Err(_) => println!("Connection timeout for depth={}", depth),
        }

        tokio::time::sleep(Duration::from_secs(2)).await;
    }
}