boomnet 0.0.78

Framework for building low latency clients on top of TCP.
Documentation
#![allow(unused)]

use std::io;
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;

use boomnet::inet::{IntoNetworkInterface, ToSocketAddr};
use boomnet::service::endpoint::Context;
use boomnet::service::endpoint::ws::{TlsWebsocket, TlsWebsocketEndpoint, TlsWebsocketEndpointWithContext};
use boomnet::service::select::mio::MioSelector;
use boomnet::service::{IntoIOService, IntoIOServiceWithContext};
use boomnet::stream::mio::{IntoMioStream, MioStream};
use boomnet::stream::tls::TlsStream;
use boomnet::stream::{BindAndConnect, ConnectionInfo, ConnectionInfoProvider};
use boomnet::ws::{IntoTlsWebsocket, Websocket, WebsocketFrame};
use idle::IdleStrategy;
use log::info;
use url::Url;

enum MarketDataEndpoint {
    Trade(TradeEndpoint),
    Ticker(TickerEndpoint),
}

impl MarketDataEndpoint {
    fn poll(&mut self, ws: &mut Websocket<TlsStream<<Self as TlsWebsocketEndpoint>::Stream>>) -> io::Result<()> {
        match self {
            MarketDataEndpoint::Ticker(ticker) => ticker.poll(ws),
            MarketDataEndpoint::Trade(trade) => trade.poll(ws),
        }
    }
}

impl ConnectionInfoProvider for MarketDataEndpoint {
    fn connection_info(&self) -> &ConnectionInfo {
        match self {
            MarketDataEndpoint::Ticker(ticker) => ticker.connection_info(),
            MarketDataEndpoint::Trade(trade) => trade.connection_info(),
        }
    }
}

impl TlsWebsocketEndpoint for MarketDataEndpoint {
    type Stream = MioStream;

    fn create_websocket(&mut self, addr: SocketAddr) -> io::Result<Option<Websocket<TlsStream<Self::Stream>>>> {
        match self {
            MarketDataEndpoint::Ticker(ticker) => ticker.create_websocket(addr),
            MarketDataEndpoint::Trade(trade) => trade.create_websocket(addr),
        }
    }
}

struct TradeEndpoint {
    id: u32,
    connection_info: ConnectionInfo,
    instrument: &'static str,
}

impl TradeEndpoint {
    pub fn new(id: u32, url: &'static str, instrument: &'static str) -> TradeEndpoint {
        let connection_info = Url::parse(url).try_into().unwrap();
        Self {
            id,
            connection_info,
            instrument,
        }
    }

    #[inline]
    fn poll(&mut self, ws: &mut TlsWebsocket<<Self as TlsWebsocketEndpoint>::Stream>) -> io::Result<()> {
        while let Some(Ok(WebsocketFrame::Text(fin, data))) = ws.receive_next() {
            info!("({fin}) {}", String::from_utf8_lossy(data));
        }
        Ok(())
    }
}

impl ConnectionInfoProvider for TradeEndpoint {
    fn connection_info(&self) -> &ConnectionInfo {
        &self.connection_info
    }
}

impl TlsWebsocketEndpoint for TradeEndpoint {
    type Stream = MioStream;

    fn create_websocket(&mut self, addr: SocketAddr) -> io::Result<Option<TlsWebsocket<Self::Stream>>> {
        let mut ws = self
            .connection_info
            .clone()
            .into_tcp_stream_with_addr(addr)?
            .into_mio_stream()
            .into_tls_websocket("/ws")?;

        ws.send_text(
            true,
            Some(format!(r#"{{"method":"SUBSCRIBE","params":["{}@trade"],"id":1}}"#, self.instrument).as_bytes()),
        )?;

        Ok(Some(ws))
    }
}

struct TickerEndpoint {
    id: u32,
    connection_info: ConnectionInfo,
    instrument: &'static str,
}

impl TickerEndpoint {
    pub fn new(id: u32, url: &'static str, instrument: &'static str) -> TickerEndpoint {
        let connection_info = Url::parse(url).try_into().unwrap();
        Self {
            id,
            connection_info,
            instrument,
        }
    }

    #[inline]
    fn poll(&mut self, ws: &mut TlsWebsocket<<Self as TlsWebsocketEndpoint>::Stream>) -> io::Result<()> {
        while let Some(Ok(WebsocketFrame::Text(fin, data))) = ws.receive_next() {
            info!("({fin}) {}", String::from_utf8_lossy(data));
        }
        Ok(())
    }
}

impl ConnectionInfoProvider for TickerEndpoint {
    fn connection_info(&self) -> &ConnectionInfo {
        &self.connection_info
    }
}

impl TlsWebsocketEndpoint for TickerEndpoint {
    type Stream = MioStream;

    fn create_websocket(&mut self, addr: SocketAddr) -> io::Result<Option<TlsWebsocket<Self::Stream>>> {
        let mut ws = self
            .connection_info
            .clone()
            .into_tcp_stream_with_addr(addr)?
            .into_mio_stream()
            .into_tls_websocket("/ws")?;

        ws.send_text(
            true,
            Some(format!(r#"{{"method":"SUBSCRIBE","params":["{}@ticker"],"id":1}}"#, self.instrument).as_bytes()),
        )?;

        Ok(Some(ws))
    }
}

fn main() -> anyhow::Result<()> {
    env_logger::init();

    let mut io_service = MioSelector::new()?.into_io_service();

    let ticker = MarketDataEndpoint::Ticker(TickerEndpoint::new(0, "wss://stream.binance.com:443/ws", "btcusdt"));
    let trade = MarketDataEndpoint::Trade(TradeEndpoint::new(1, "wss://stream.binance.com:443/ws", "ethusdt"));

    io_service.register(ticker);
    io_service.register(trade);

    loop {
        io_service.poll(|ws, endpoint| endpoint.poll(ws))?;
    }
}