boomnet 0.0.78

Framework for building low latency clients on top of TCP.
Documentation
use ansi_term::Color::{Green, Purple, Red, Yellow};
use boomnet::service::endpoint::ws::{TlsWebsocket, TlsWebsocketEndpoint, TlsWebsocketEndpointWithContext};
use boomnet::service::endpoint::{Context, DisconnectReason};
use boomnet::stream::mio::{IntoMioStream, MioStream};
use boomnet::stream::tcp::TcpStream;
use boomnet::stream::tls::{IntoTlsStream, TlsConfigExt};
use boomnet::stream::{ConnectionInfo, ConnectionInfoProvider};
use boomnet::ws::{IntoTlsWebsocket, IntoWebsocket, WebsocketFrame};
use log::{info, warn};
use std::io;
use std::net::SocketAddr;

/// Stateless context type.
///
/// It implements boomnet's [`Context`] marker trait and is the context
/// parameter used by the `TlsWebsocketEndpointWithContext<FeedContext>`
/// implementation for `TradeEndpoint` in this file.
#[derive(Debug, Default)]
pub struct FeedContext;
impl Context for FeedContext {}

impl FeedContext {
    /// Creates the context. Equivalent to `FeedContext::default()`.
    #[allow(dead_code)]
    pub fn new() -> Self {
        Self
    }
}

/// Websocket endpoint that subscribes to the `<instrument>@trade` stream and
/// logs every text frame it receives.
pub struct TradeEndpoint {
    /// Endpoint identifier; `id % 4` selects the colour used when logging frames.
    id: u32,
    /// Connection details parsed from the URL, optionally bound to a named
    /// network interface.
    connection_info: ConnectionInfo,
    /// Instrument name interpolated into the subscription request.
    instrument: &'static str,
    /// Endpoint component returned by `parse_url`, passed on when the
    /// websocket is created.
    ws_endpoint: String,
    /// When `true`, the subscription request is sent right after the
    /// websocket is created.
    subscribe: bool,
}

impl TradeEndpoint {
    #[allow(dead_code)]
    pub fn new(id: u32, url: &'static str, net_iface: Option<&'static str>, instrument: &'static str) -> TradeEndpoint {
        Self::new_with_subscribe(id, url, net_iface, instrument, true)
    }

    pub fn new_with_subscribe(
        id: u32,
        url: &'static str,
        net_iface: Option<&'static str>,
        instrument: &'static str,
        subscribe: bool,
    ) -> TradeEndpoint {
        let (mut connection_info, ws_endpoint, _) = boomnet::ws::util::parse_url(url).unwrap();
        if let Some(net_iface) = net_iface {
            connection_info = connection_info.with_net_iface_from_name(net_iface);
        }
        Self {
            id,
            connection_info,
            instrument,
            ws_endpoint,
            subscribe,
        }
    }

    pub fn subscribe(&mut self, ws: &mut TlsWebsocket<MioStream>) -> io::Result<()> {
        ws.send_text(
            true,
            Some(format!(r#"{{"method":"SUBSCRIBE","params":["{}@trade"],"id":1}}"#, self.instrument).as_bytes()),
        )?;
        Ok(())
    }

    #[inline]
    #[allow(dead_code)]
    pub fn poll(&mut self, ws: &mut TlsWebsocket<<Self as TlsWebsocketEndpoint>::Stream>) -> io::Result<()> {
        for frame in ws.read_batch()? {
            if let WebsocketFrame::Text(fin, data) = frame? {
                match self.id % 4 {
                    0 => info!("({fin}) {}", Red.paint(String::from_utf8_lossy(data))),
                    1 => info!("({fin}) {}", Green.paint(String::from_utf8_lossy(data))),
                    2 => info!("({fin}) {}", Purple.paint(String::from_utf8_lossy(data))),
                    3 => info!("({fin}) {}", Yellow.paint(String::from_utf8_lossy(data))),
                    _ => {}
                }
            }
        }
        Ok(())
    }

    #[inline]
    #[allow(dead_code)]
    pub fn poll_ctx(
        &mut self,
        ws: &mut TlsWebsocket<<Self as TlsWebsocketEndpoint>::Stream>,
        _ctx: &mut FeedContext,
    ) -> io::Result<()> {
        for frame in ws.read_batch()? {
            if let WebsocketFrame::Text(fin, data) = frame? {
                match self.id % 4 {
                    0 => info!("({fin}) {}", Red.paint(String::from_utf8_lossy(data))),
                    1 => info!("({fin}) {}", Green.paint(String::from_utf8_lossy(data))),
                    2 => info!("({fin}) {}", Purple.paint(String::from_utf8_lossy(data))),
                    3 => info!("({fin}) {}", Yellow.paint(String::from_utf8_lossy(data))),
                    _ => {}
                }
            }
        }
        Ok(())
    }
}

/// Exposes the connection details that were parsed from the endpoint URL.
impl ConnectionInfoProvider for TradeEndpoint {
    /// Returns a reference to the stored connection info.
    fn connection_info(&self) -> &ConnectionInfo {
        &self.connection_info
    }
}

/// Context-free endpoint implementation backed by a mio stream.
impl TlsWebsocketEndpoint for TradeEndpoint {
    type Stream = MioStream;

    /// Connects to `addr`, layers TLS and the websocket on top, and sends the
    /// subscription request when the endpoint was configured to do so.
    ///
    /// # Errors
    ///
    /// Propagates errors from the TCP connection, the TLS setup, or sending
    /// the subscription request.
    fn create_websocket(&mut self, addr: SocketAddr) -> io::Result<Option<TlsWebsocket<Self::Stream>>> {
        let tcp = TcpStream::try_from((&self.connection_info, addr))?;

        // NOTE(review): certificate verification is switched off for this
        // connection — confirm this is intentional outside of demos.
        let tls = tcp
            .into_mio_stream()
            .into_tls_stream_with_config(|cfg| cfg.with_no_cert_verification())?;

        let mut websocket = tls.into_websocket(&self.ws_endpoint);
        if self.subscribe {
            self.subscribe(&mut websocket)?;
        }

        Ok(Some(websocket))
    }

    /// Logs the disconnect reason as a warning and always allows the
    /// connection to be recreated.
    fn can_recreate(&mut self, reason: DisconnectReason) -> bool {
        warn!("connection disconnected: {reason}");
        true
    }
}

/// Context-aware endpoint implementation; the `FeedContext` is accepted but
/// not used.
impl TlsWebsocketEndpointWithContext<FeedContext> for TradeEndpoint {
    type Stream = MioStream;

    /// Connects to `addr`, upgrades the stream to a TLS websocket, and sends
    /// the subscription request when the endpoint was configured to do so.
    ///
    /// # Errors
    ///
    /// Propagates errors from the TCP connection, the TLS websocket setup, or
    /// sending the subscription request.
    fn create_websocket(
        &mut self,
        addr: SocketAddr,
        _ctx: &mut FeedContext,
    ) -> io::Result<Option<TlsWebsocket<Self::Stream>>> {
        let tcp = TcpStream::try_from((&self.connection_info, addr))?;
        let mut websocket = tcp.into_mio_stream().into_tls_websocket(&self.ws_endpoint)?;

        if self.subscribe {
            self.subscribe(&mut websocket)?;
        }

        Ok(Some(websocket))
    }
}