fast_websocket_client 0.4.1

Tokio-native WebSocket client for Rust. High-throughput, low-latency, callback-driven, proxy-ready.
Documentation
use futures::FutureExt; // for now_or_never

use std::time::{Duration, Instant};

use fast_websocket_client::{OpCode, base_client};

#[derive(serde::Serialize)]
struct Subscription {
    method: String,
    params: Vec<String>,
    id: u128,
}

async fn subscribe(
    client: &mut base_client::Online,
    started_at: Instant,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let data = Subscription {
        method: "SUBSCRIBE".to_string(),
        params: vec!["btcusdt@bookTicker".to_string()],
        id: started_at.elapsed().as_nanos(),
    };
    // 짧은 timeout 예시
    tokio::time::timeout(Duration::from_millis(0), client.send_json(&data)).await??;
    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let started_at = Instant::now();
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    let url = "wss://data-stream.binance.vision:9443/ws/bakeusdt@bookTicker";

    let handle = runtime.spawn(async move {
        'reconnect_loop: loop {
            // 1) 연결 시도
            let connect_future = base_client::connect(url);
            let mut client: base_client::Online = match connect_future.await {
                Ok(c) => {
                    println!("connected");
                    c
                }
                Err(e) => {
                    eprintln!("Reconnecting from an Error: {e:?}");
                    tokio::time::sleep(Duration::from_secs(10)).await;
                    continue;
                }
            };

            // 2) 연결 직후 구독 시도
            if let Err(e) = subscribe(&mut client, started_at).await {
                eprintln!("Reconnecting from an Error: {e:?}");
                let _ = client.send_close("").await;
                tokio::time::sleep(Duration::from_secs(10)).await;
                continue;
            }

            // 3) 메시지 처리 루프
            'message_loop: loop {
                // ----------------------------
                // A) 즉시 읽을 수 있는 메시지 모두 수집
                // ----------------------------
                let mut pending_messages = Vec::new(); // 메시지를 임시 저장할 벡터

                loop {
                    let fut = client.receive_frame();
                    match fut.now_or_never() {
                        Some(Ok(frame)) => {
                            match frame.opcode {
                                OpCode::Text => {
                                    let payload = match simdutf8::basic::from_utf8(&frame.payload) {
                                        Ok(s) => s.to_string(), // 저장을 위해 소유권 있는 String
                                        Err(e) => {
                                            eprintln!("Reconnecting from an Error: {e:?}");
                                            let _ = client.send_close("").await;
                                            break 'message_loop;
                                        }
                                    };
                                    pending_messages.push(payload); // 벡터에 저장
                                }
                                OpCode::Close => {
                                    println!(
                                        "(immediate close) {:?}",
                                        String::from_utf8_lossy(&frame.payload[2..])
                                    );
                                    break 'reconnect_loop;
                                }
                                _ => {}
                            }
                        }
                        Some(Err(e)) => {
                            eprintln!("Reconnecting from an Error (immediate): {e:?}");
                            let _ = client.send_close("").await;
                            break 'message_loop;
                        }
                        None => {
                            // 더 이상 즉시 읽을 메시지가 없으면 루프 종료
                            break;
                        }
                    }
                }

                // 벡터에 모은 메시지들 한 번에 출력
                for msg in pending_messages {
                    println!("(immediate batch) {msg}");
                }

                // ----------------------------
                // B) await로 새 메시지 기다림
                // ----------------------------
                let frame = match client.receive_frame().await {
                    Ok(f) => f,
                    Err(e) => {
                        eprintln!("Reconnecting from an Error (await): {e:?}");
                        let _ = client.send_close("").await;
                        break 'message_loop;
                    }
                };

                match frame.opcode {
                    OpCode::Text => {
                        let payload = match simdutf8::basic::from_utf8(&frame.payload) {
                            Ok(s) => s,
                            Err(e) => {
                                eprintln!("Reconnecting from an Error: {e:?}");
                                let _ = client.send_close("").await;
                                break 'message_loop;
                            }
                        };
                        println!("(await) {payload}");
                    }
                    OpCode::Close => {
                        println!(
                            "(await close) {:?}",
                            String::from_utf8_lossy(&frame.payload[2..])
                        );
                        break 'reconnect_loop;
                    }
                    _ => {}
                }
            }
        }
    });

    runtime.block_on(handle)?;
    Ok(())
}