use std::io;
use std::net::SocketAddr;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use boomnet::service::IntoIOServiceWithContext;
use boomnet::service::endpoint::Context;
use boomnet::service::endpoint::ws::{TlsWebsocket, TlsWebsocketEndpointWithContext};
use boomnet::service::select::mio::MioSelector;
use boomnet::stream::mio::{IntoMioStream, MioStream};
use boomnet::stream::{ConnectionInfo, ConnectionInfoProvider};
use boomnet::ws::{IntoTlsWebsocket, WebsocketFrame};
use log::info;
use url::Url;
/// Websocket endpoint that streams trades for a single instrument and is
/// deliberately disconnected on a timer (see `poll`).
struct TradeEndpoint {
/// Connection details parsed from the URL passed to `TradeEndpoint::new`.
connection_info: ConnectionInfo,
/// Instrument symbol interpolated into the `<instrument>@trade` subscription request.
instrument: &'static str,
/// Context timestamp, in nanoseconds, after which `poll` returns an error.
next_disconnect_time_ns: u64,
}
impl TradeEndpoint {
    /// Time between forced disconnects, in nanoseconds.
    ///
    /// Ten seconds expressed in nanoseconds fits comfortably in `u64`, so the
    /// narrowing cast from `u128` is lossless.
    const DISCONNECT_INTERVAL_NS: u64 = Duration::from_secs(10).as_nanos() as u64;

    /// Creates an endpoint for `instrument` that connects to `url`.
    ///
    /// The first forced disconnect is scheduled one disconnect interval after
    /// the time currently reported by `ctx`.
    ///
    /// # Panics
    ///
    /// Panics if `url` cannot be converted into connection info.
    pub fn new(url: &'static str, instrument: &'static str, ctx: &FeedContext) -> TradeEndpoint {
        // The URL is a compile-time literal supplied by the program itself, so a
        // failure here is a programming error rather than a runtime condition.
        let connection_info = Url::parse(url)
            .try_into()
            .expect("endpoint URL must be a valid websocket URL");
        Self {
            connection_info,
            instrument,
            next_disconnect_time_ns: ctx.current_time_ns() + Self::DISCONNECT_INTERVAL_NS,
        }
    }

    /// Drains the frames currently available on `ws`, logging every text frame,
    /// then checks the disconnect timer.
    ///
    /// # Errors
    ///
    /// Returns an error when the websocket reports one while receiving, or when
    /// the disconnect deadline has passed. In the latter case the deadline is
    /// re-armed first.
    #[inline]
    fn poll(
        &mut self,
        ws: &mut TlsWebsocket<<Self as TlsWebsocketEndpointWithContext<FeedContext>>::Stream>,
        ctx: &mut FeedContext,
    ) -> io::Result<()> {
        // Propagate receive errors instead of silently ending the loop on them,
        // and skip (rather than stop at) frames that are not text.
        while let Some(frame) = ws.receive_next() {
            if let WebsocketFrame::Text(fin, data) = frame? {
                info!("({fin}) {}", String::from_utf8_lossy(data));
            }
        }

        let now_ns = ctx.current_time_ns();
        if now_ns > self.next_disconnect_time_ns {
            // Re-arm before returning so the next deadline is measured from now.
            self.next_disconnect_time_ns = now_ns + Self::DISCONNECT_INTERVAL_NS;
            return Err(io::Error::other("disconnected due to timer"));
        }
        Ok(())
    }
}
/// Field-less context passed to the IO service and to endpoint callbacks.
/// In this file it is only used as a source of the current time
/// (`current_time_ns`).
#[derive(Debug)]
struct FeedContext;
// Empty impl: no items of boomnet's `Context` trait are provided or overridden.
// NOTE(review): presumably this is what allows `FeedContext` to be used as the
// service context — confirm against the boomnet documentation.
impl Context for FeedContext {}
impl FeedContext {
pub fn new() -> Self {
Self
}
pub fn current_time_ns(&self) -> u64 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64
}
}
impl ConnectionInfoProvider for TradeEndpoint {
    /// Borrows the connection details stored on this endpoint.
    fn connection_info(&self) -> &ConnectionInfo {
        let Self { connection_info, .. } = self;
        connection_info
    }
}
impl TlsWebsocketEndpointWithContext<FeedContext> for TradeEndpoint {
    type Stream = MioStream;

    /// Opens a TLS websocket to `addr` on the `/ws` path and immediately sends
    /// the trade subscription request for this endpoint's instrument.
    ///
    /// Always yields `Some(websocket)` on success; any failure while connecting
    /// or sending the subscription is returned as an error. The context is not
    /// used.
    fn create_websocket(
        &mut self,
        addr: SocketAddr,
        _ctx: &mut FeedContext,
    ) -> io::Result<Option<TlsWebsocket<Self::Stream>>> {
        // Connect to the already-resolved address using a copy of the stored connection info.
        let tcp_stream = self.connection_info.clone().into_tcp_stream_with_addr(addr)?;
        let mut websocket = tcp_stream.into_mio_stream().into_tls_websocket("/ws")?;

        // Subscribe to the `<instrument>@trade` stream as a single final text frame.
        let subscribe_request = format!(
            r#"{{"method":"SUBSCRIBE","params":["{}@trade"],"id":1}}"#,
            self.instrument
        );
        websocket.send_text(true, Some(subscribe_request.as_bytes()))?;

        Ok(Some(websocket))
    }
}
/// Registers a single BTC/USDT trade endpoint with a mio-backed IO service and
/// polls it forever; the function only returns when polling or setup fails.
fn main() -> anyhow::Result<()> {
    env_logger::init();

    let mut ctx = FeedContext::new();
    let selector = MioSelector::new()?;
    let mut io_service = selector.into_io_service_with_context();

    io_service.register(TradeEndpoint::new(
        "wss://stream1.binance.com:443/ws",
        "btcusdt",
        &ctx,
    ))?;

    // Busy-poll: each iteration hands the websocket and context to the endpoint's `poll`.
    loop {
        io_service.poll(&mut ctx, |websocket, context, trade_endpoint| {
            trade_endpoint.poll(websocket, context)
        })?;
    }
}