use std::time::Duration;
use tokio::sync::mpsc;
use crate::error::Result;
use crate::ws::{WsStream, StreamOptions, Subscription as WsSub, SubscriptionType};
use crate::types::ws_messages::WsMessage;
use crate::types::events::PolyNodeEvent;
use crate::PolyNodeClient;
use super::{Coin, ShortFormInterval, ShortFormMessage, RotationInfo};
use super::discovery::{discover_markets, current_window_end};
/// Handle to a background short-form market stream.
///
/// Created by `ShortFormStream::start`, which spawns `stream_task` on the
/// tokio runtime. The handle owns the consumer ends of the two channels that
/// connect it to that task.
pub struct ShortFormStream {
    /// Receives the `ShortFormMessage` values sent by the background task
    /// (bounded channel, capacity 1024).
    rx: mpsc::Receiver<ShortFormMessage>,
    /// Signals the background task to stop; on receipt the task closes its
    /// WebSocket and returns (bounded channel, capacity 1).
    stop_tx: mpsc::Sender<()>,
}
impl ShortFormStream {
    /// Spawns the background streaming task and returns a handle to it.
    ///
    /// The API key, WebSocket URL and base URL are copied out of `client`, and
    /// a fresh `reqwest::Client` is created for the task.
    ///
    /// * `interval` - short-form interval whose markets should be followed.
    /// * `coins` - coins passed through to market discovery.
    /// * `rotation_buffer` - extra seconds to wait past the end of the current
    ///   window before rotating to the next set of markets.
    ///
    /// As written this always returns `Ok`: connection and discovery failures
    /// happen inside the spawned task and are delivered through the stream as
    /// `ShortFormMessage::Error` values.
    pub(crate) async fn start(
        client: &PolyNodeClient,
        interval: ShortFormInterval,
        coins: Vec<Coin>,
        rotation_buffer: u64,
    ) -> Result<Self> {
        // Messages flow task -> handle; the stop signal flows handle -> task.
        let (msg_tx, rx) = mpsc::channel(1024);
        let (stop_tx, stop_rx) = mpsc::channel(1);

        // Everything the task needs is owned, so it can outlive `client`.
        let worker = stream_task(
            reqwest::Client::new(),
            client.api_key.clone(),
            client.ws_url.clone(),
            client.base_url.clone(),
            interval,
            coins,
            rotation_buffer,
            msg_tx,
            stop_rx,
        );
        tokio::spawn(worker);

        Ok(Self { rx, stop_tx })
    }

    /// Waits for the next message from the background task.
    ///
    /// Yields `None` once the task has finished and every buffered message
    /// has been consumed.
    pub async fn next(&mut self) -> Option<ShortFormMessage> {
        let inbox = &mut self.rx;
        inbox.recv().await
    }

    /// Asks the background task to shut down, consuming the handle.
    ///
    /// A failed send only means the task has already exited, so the result is
    /// deliberately ignored.
    pub async fn stop(self) {
        let signal = self.stop_tx.send(());
        let _ = signal.await;
    }
}
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time earlier than the epoch, the result is
/// `0` rather than an error.
fn now_secs() -> i64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    let whole_secs = since_epoch.map_or(0, |elapsed| elapsed.as_secs());
    whole_secs as i64
}
async fn stream_task(
http: reqwest::Client,
api_key: String,
ws_url: String,
base_url: String,
interval: ShortFormInterval,
coins: Vec<Coin>,
rotation_buffer: u64,
msg_tx: mpsc::Sender<ShortFormMessage>,
mut stop_rx: mpsc::Receiver<()>,
) {
let mut ws = match WsStream::connect(&api_key, &ws_url, StreamOptions::default()).await {
Ok(ws) => ws,
Err(e) => {
let _ = msg_tx
.send(ShortFormMessage::Error(format!("WS connect failed: {}", e)))
.await;
return;
}
};
loop {
let markets = discover_markets(&http, &base_url, interval, &coins).await;
if markets.is_empty() {
let _ = msg_tx
.send(ShortFormMessage::Error(format!(
"No {} markets found, retrying...",
interval
)))
.await;
} else {
let slugs: Vec<String> = markets.iter().map(|m| m.slug.clone()).collect();
let sub = WsSub::new(SubscriptionType::Settlements)
.slugs(slugs)
.status("all");
if let Err(e) = ws.subscribe(sub).await {
let _ = msg_tx
.send(ShortFormMessage::Error(format!("Subscribe failed: {}", e)))
.await;
}
let window_start = markets.iter().map(|m| m.window_start).min().unwrap_or(0);
let window_end = markets.iter().map(|m| m.window_end).max().unwrap_or(0);
let time_remaining = (window_end - now_secs()).max(0);
let _ = msg_tx
.send(ShortFormMessage::Rotation(RotationInfo {
interval,
markets,
window_start,
window_end,
time_remaining,
}))
.await;
}
let window_end = current_window_end(interval);
let delay_secs = (window_end - now_secs() + rotation_buffer as i64).max(1) as u64;
let deadline = tokio::time::Instant::now() + Duration::from_secs(delay_secs);
loop {
tokio::select! {
_ = stop_rx.recv() => {
let _ = ws.close().await;
return;
}
_ = tokio::time::sleep_until(deadline) => {
let _ = ws.unsubscribe(None).await;
break;
}
msg = ws.next() => {
match msg {
Some(Ok(WsMessage::Event(event))) => {
if msg_tx.send(ShortFormMessage::Event(event)).await.is_err() {
return;
}
}
Some(Ok(WsMessage::Snapshot(events))) => {
for raw in events {
if let Ok(event) = serde_json::from_value::<PolyNodeEvent>(raw) {
if msg_tx.send(ShortFormMessage::Event(event)).await.is_err() {
return;
}
}
}
}
Some(Ok(WsMessage::Error { message, .. })) => {
let _ = msg_tx.send(ShortFormMessage::Error(message)).await;
}
Some(Ok(_)) => {} Some(Err(e)) => {
let _ = msg_tx
.send(ShortFormMessage::Error(format!("WS error: {}", e)))
.await;
}
None => {
let _ = msg_tx
.send(ShortFormMessage::Error("WS closed, reconnecting...".into()))
.await;
match WsStream::connect(&api_key, &ws_url, StreamOptions::default()).await {
Ok(new_ws) => {
ws = new_ws;
break; }
Err(e) => {
let _ = msg_tx
.send(ShortFormMessage::Error(format!("Reconnect failed: {}", e)))
.await;
return;
}
}
}
}
}
}
}
}
}