use std::{
collections::HashSet,
pin::Pin,
task::{Context, Poll, ready},
time::Duration,
};
use anyhow::Result;
use futures::{SinkExt, StreamExt};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
time::{interval, sleep, timeout},
};
use url::Url;
use yawc::{Frame, OpCode, Options, TcpWebSocket};
use crate::hypercore::types::{Incoming, Outgoing, Subscription};
struct Stream {
stream: TcpWebSocket,
}
impl Stream {
async fn connect(url: Url) -> Result<Self> {
let stream = yawc::WebSocket::connect(url)
.with_options(
Options::default()
.with_no_delay()
.with_balanced_compression()
.with_utf8(),
)
.await?;
Ok(Self { stream })
}
async fn subscribe(&mut self, subscription: Subscription) -> anyhow::Result<()> {
let text = serde_json::to_string(&Outgoing::Subscribe { subscription })?;
self.stream.send(Frame::text(text)).await?;
Ok(())
}
async fn unsubscribe(&mut self, subscription: Subscription) -> anyhow::Result<()> {
let text = serde_json::to_string(&Outgoing::Unsubscribe { subscription })?;
self.stream.send(Frame::text(text)).await?;
Ok(())
}
async fn ping(&mut self) -> anyhow::Result<()> {
let text = serde_json::to_string(&Outgoing::Ping)?;
self.stream.send(Frame::text(text)).await?;
Ok(())
}
async fn pong(&mut self) -> anyhow::Result<()> {
let text = serde_json::to_string(&Outgoing::Pong)?;
self.stream.send(Frame::text(text)).await?;
Ok(())
}
}
impl futures::Stream for Stream {
type Item = Incoming;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
while let Some(frame) = ready!(this.stream.poll_next_unpin(cx)) {
if frame.opcode() == OpCode::Text {
match serde_json::from_slice(frame.payload()) {
Ok(ok) => {
return Poll::Ready(Some(ok));
}
Err(err) => {
log::warn!("unable to parse: {}: {:?}", frame.as_str(), err);
}
}
} else {
log::warn!(
"Hyperliquid sent a binary msg? {data:?}",
data = frame.payload()
);
}
}
Poll::Ready(None)
}
}
type SubChannelData = (bool, Subscription);
#[derive(Clone, Debug)]
pub enum Event {
Connected,
Disconnected,
Message(Incoming),
}
pub struct Connection {
rx: UnboundedReceiver<Event>,
tx: UnboundedSender<SubChannelData>,
}
#[derive(Clone, Debug)]
pub struct ConnectionHandle {
tx: UnboundedSender<SubChannelData>,
}
#[derive(Debug)]
pub struct ConnectionStream {
rx: UnboundedReceiver<Event>,
}
impl Connection {
pub fn new(url: Url) -> Self {
let (tx, rx) = unbounded_channel();
let (stx, srx) = unbounded_channel();
tokio::spawn(connection(url, tx, srx));
Self { rx, tx: stx }
}
pub fn subscribe(&self, subscription: Subscription) {
let _ = self.tx.send((true, subscription));
}
pub fn unsubscribe(&self, subscription: Subscription) {
let _ = self.tx.send((false, subscription));
}
pub fn close(self) {
drop(self);
}
pub fn split(self) -> (ConnectionHandle, ConnectionStream) {
(
ConnectionHandle { tx: self.tx },
ConnectionStream { rx: self.rx },
)
}
}
impl futures::Stream for Connection {
type Item = Event;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
this.rx.poll_recv(cx)
}
}
impl ConnectionHandle {
pub fn subscribe(&self, subscription: Subscription) {
let _ = self.tx.send((true, subscription));
}
pub fn unsubscribe(&self, subscription: Subscription) {
let _ = self.tx.send((false, subscription));
}
pub fn close(self) {
drop(self);
}
}
impl futures::Stream for ConnectionStream {
type Item = Event;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
this.rx.poll_recv(cx)
}
}
async fn connection(
url: Url,
tx: UnboundedSender<Event>,
mut srx: UnboundedReceiver<SubChannelData>,
) {
const MAX_MISSED_PONGS: u8 = 2;
const MAX_RECONNECT_DELAY_MS: u64 = 5_000; const INITIAL_RECONNECT_DELAY_MS: u64 = 500;
let mut subs: HashSet<Subscription> = HashSet::new();
let mut reconnect_attempts = 0u32;
loop {
let mut stream = match timeout(Duration::from_secs(10), Stream::connect(url.clone())).await
{
Ok(ok) => match ok {
Ok(ok) => ok,
Err(err) => {
log::error!("Unable to connect to {url}: {err:?}");
let delay_ms = (INITIAL_RECONNECT_DELAY_MS * (1u64 << reconnect_attempts))
.min(MAX_RECONNECT_DELAY_MS);
reconnect_attempts = reconnect_attempts.saturating_add(1);
log::debug!(
"Reconnecting in {}ms (attempt {})",
delay_ms,
reconnect_attempts
);
sleep(Duration::from_millis(delay_ms)).await;
continue;
}
},
Err(err) => {
log::error!("Connection timeout to {url}: {err:?}");
let delay_ms = (INITIAL_RECONNECT_DELAY_MS * (1u64 << reconnect_attempts))
.min(MAX_RECONNECT_DELAY_MS);
reconnect_attempts = reconnect_attempts.saturating_add(1);
log::debug!(
"Reconnecting in {}ms (attempt {})",
delay_ms,
reconnect_attempts
);
sleep(Duration::from_millis(delay_ms)).await;
continue;
}
};
log::debug!("Connected to {url}");
reconnect_attempts = 0; let _ = tx.send(Event::Connected);
if !subs.is_empty() {
log::debug!("Re-subscribing to {} channels", subs.len());
for sub in subs.iter() {
log::debug!("Re-subscribing to {sub}");
if let Err(err) = stream.subscribe(sub.clone()).await {
log::error!("Failed to re-subscribe to {sub}: {err:?}");
}
}
}
let mut ping_interval = interval(Duration::from_secs(5));
let mut missed_pongs: u8 = 0;
loop {
tokio::select! {
_ = ping_interval.tick() => {
if missed_pongs >= MAX_MISSED_PONGS {
log::warn!("Missed {missed_pongs} pongs, reconnecting...");
break;
}
if stream.ping().await.is_ok() {
missed_pongs += 1;
}
}
maybe_item = stream.next() => {
let Some(item) = maybe_item else { break; };
match item {
Incoming::Pong => {
missed_pongs = 0;
}
Incoming::Ping => {
let _ = stream.pong().await;
}
_ => {
let _ = tx.send(Event::Message(item));
}
}
}
item = srx.recv() => {
let Some((is_sub, sub)) = item else { return };
if is_sub {
if !subs.insert(sub.clone()) {
log::debug!("Already subscribed to {sub:?}");
continue;
}
if let Err(err) = stream.subscribe(sub).await {
log::error!("Subscribing: {err:?}");
break;
}
} else if subs.remove(&sub) {
if let Err(err) = stream.unsubscribe(sub).await {
log::error!("Unsubscribing: {err:?}");
break;
}
}
}
}
}
log::warn!("Disconnected from {url}, attempting to reconnect...");
let _ = tx.send(Event::Disconnected);
}
}