polynode 0.7.3

Rust SDK for the PolyNode API — real-time Polymarket data
Documentation
//! Short-form stream: managed WS connection with auto-rotation.

use std::time::Duration;
use tokio::sync::mpsc;

use crate::error::Result;
use crate::ws::{WsStream, StreamOptions, Subscription as WsSub, SubscriptionType};
use crate::types::ws_messages::WsMessage;
use crate::types::events::PolyNodeEvent;
use crate::PolyNodeClient;

use super::{Coin, ShortFormInterval, ShortFormMessage, RotationInfo};
use super::discovery::{discover_markets, current_window_end};

/// Auto-rotating stream for short-form crypto markets.
///
/// This is a handle to a spawned background task: messages produced by the
/// task are pulled with `next`, and the task is asked to shut down with `stop`.
pub struct ShortFormStream {
    /// Receiving end of the channel the background task publishes messages on.
    rx: mpsc::Receiver<ShortFormMessage>,
    /// Sender used by `stop` to signal the background task to exit.
    stop_tx: mpsc::Sender<()>,
}

impl ShortFormStream {
    /// Spawn the background streaming task and return a handle to it.
    ///
    /// This function only creates the channels and launches the task; the
    /// task itself connects the WebSocket, discovers markets, subscribes and
    /// rotates. Failures inside the task (including the initial connect) are
    /// therefore delivered as `ShortFormMessage::Error` through `next`, not
    /// through the `Result` returned here.
    ///
    /// * `client` - source of the API key, WebSocket URL and base URL.
    /// * `interval` - which short-form window length to follow.
    /// * `coins` - coins passed to market discovery.
    /// * `rotation_buffer` - seconds added after the window end before rotating.
    pub(crate) async fn start(
        client: &PolyNodeClient,
        interval: ShortFormInterval,
        coins: Vec<Coin>,
        rotation_buffer: u64,
    ) -> Result<Self> {
        // One-slot channel for the stop signal, buffered channel for messages.
        let (stop_tx, stop_rx) = mpsc::channel(1);
        let (msg_tx, rx) = mpsc::channel(1024);

        // The task owns copies of everything it needs so it can outlive `client`.
        let task = stream_task(
            reqwest::Client::new(),
            client.api_key.clone(),
            client.ws_url.clone(),
            client.base_url.clone(),
            interval,
            coins,
            rotation_buffer,
            msg_tx,
            stop_rx,
        );
        tokio::spawn(task);

        Ok(Self { rx, stop_tx })
    }

    /// Wait for the next message from the background task.
    ///
    /// Returns `None` once the stream is closed, i.e. the task has ended and
    /// no buffered messages remain.
    pub async fn next(&mut self) -> Option<ShortFormMessage> {
        self.rx.recv().await
    }

    /// Ask the background task to shut down, consuming the handle.
    ///
    /// The send result is deliberately discarded: the task may already have
    /// exited, in which case there is nobody left to notify.
    pub async fn stop(self) {
        self.stop_tx.send(()).await.ok();
    }
}

/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_secs() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let elapsed_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_secs());
    elapsed_secs as i64
}

/// Background task behind `ShortFormStream`: owns the WebSocket and feeds `msg_tx`.
///
/// After connecting once, the task repeats the following cycle:
///
/// 1. discover the markets for `interval` / `coins`;
/// 2. subscribe to settlements filtered by the discovered slugs and publish a
///    `ShortFormMessage::Rotation` describing the window;
/// 3. forward incoming events until the rotation deadline (current window end
///    plus `rotation_buffer` seconds), then unsubscribe and start over.
///
/// If discovery finds no markets, the task reports an error and re-runs
/// discovery after a short delay instead of idling until the window ends.
///
/// Problems are reported to the consumer as `ShortFormMessage::Error`. The
/// task ends when a stop signal arrives (or the stop sender is dropped), when
/// the consumer side of `msg_tx` is gone while forwarding an event, or when
/// the initial connect or a reconnect fails.
async fn stream_task(
    http: reqwest::Client,
    api_key: String,
    ws_url: String,
    base_url: String,
    interval: ShortFormInterval,
    coins: Vec<Coin>,
    rotation_buffer: u64,
    msg_tx: mpsc::Sender<ShortFormMessage>,
    mut stop_rx: mpsc::Receiver<()>,
) {
    /// Seconds to wait before re-running discovery when it found no markets.
    const EMPTY_DISCOVERY_RETRY_SECS: u64 = 5;

    // Connect WebSocket
    let mut ws = match WsStream::connect(&api_key, &ws_url, StreamOptions::default()).await {
        Ok(ws) => ws,
        Err(e) => {
            let _ = msg_tx
                .send(ShortFormMessage::Error(format!("WS connect failed: {}", e)))
                .await;
            return;
        }
    };

    loop {
        // Discover current window markets (includes price-to-beat)
        let markets = discover_markets(&http, &base_url, interval, &coins).await;
        // Remembered here because `markets` is moved into the rotation message below.
        let markets_found = !markets.is_empty();

        if !markets_found {
            let _ = msg_tx
                .send(ShortFormMessage::Error(format!(
                    "No {} markets found, retrying...",
                    interval
                )))
                .await;
        } else {
            // Subscribe with slug filters
            let slugs: Vec<String> = markets.iter().map(|m| m.slug.clone()).collect();
            let sub = WsSub::new(SubscriptionType::Settlements)
                .slugs(slugs)
                .status("all");

            if let Err(e) = ws.subscribe(sub).await {
                let _ = msg_tx
                    .send(ShortFormMessage::Error(format!("Subscribe failed: {}", e)))
                    .await;
            }

            // Emit rotation info
            let window_start = markets.iter().map(|m| m.window_start).min().unwrap_or(0);
            let window_end = markets.iter().map(|m| m.window_end).max().unwrap_or(0);
            let time_remaining = (window_end - now_secs()).max(0);
            let _ = msg_tx
                .send(ShortFormMessage::Rotation(RotationInfo {
                    interval,
                    markets,
                    window_start,
                    window_end,
                    time_remaining,
                }))
                .await;
        }

        // Compute deadline for next rotation
        let window_end = current_window_end(interval);
        let until_rotation = (window_end - now_secs() + rotation_buffer as i64).max(1) as u64;
        // With nothing subscribed there is no reason to sit out the whole
        // window: retry discovery soon, but never later than the rotation.
        let delay_secs = if markets_found {
            until_rotation
        } else {
            until_rotation.min(EMPTY_DISCOVERY_RETRY_SECS)
        };
        let deadline = tokio::time::Instant::now() + Duration::from_secs(delay_secs);

        // Event loop until rotation deadline or stop
        loop {
            tokio::select! {
                // Fires on an explicit stop and also when the stop sender is
                // dropped (`recv` then resolves with `None`).
                _ = stop_rx.recv() => {
                    let _ = ws.close().await;
                    return;
                }
                _ = tokio::time::sleep_until(deadline) => {
                    // Unsubscribe old, break to outer loop for re-discovery
                    let _ = ws.unsubscribe(None).await;
                    break;
                }
                msg = ws.next() => {
                    match msg {
                        Some(Ok(WsMessage::Event(event))) => {
                            // A failed send means the consumer is gone: stop the task.
                            if msg_tx.send(ShortFormMessage::Event(event)).await.is_err() {
                                return;
                            }
                        }
                        Some(Ok(WsMessage::Snapshot(events))) => {
                            // Snapshot entries that do not parse as events are skipped.
                            for raw in events {
                                if let Ok(event) = serde_json::from_value::<PolyNodeEvent>(raw) {
                                    if msg_tx.send(ShortFormMessage::Event(event)).await.is_err() {
                                        return;
                                    }
                                }
                            }
                        }
                        Some(Ok(WsMessage::Error { message, .. })) => {
                            let _ = msg_tx.send(ShortFormMessage::Error(message)).await;
                        }
                        Some(Ok(_)) => {} // heartbeat, subscribed, etc.
                        Some(Err(e)) => {
                            let _ = msg_tx
                                .send(ShortFormMessage::Error(format!("WS error: {}", e)))
                                .await;
                        }
                        None => {
                            // Stream closed, try reconnecting
                            let _ = msg_tx
                                .send(ShortFormMessage::Error("WS closed, reconnecting...".into()))
                                .await;
                            match WsStream::connect(&api_key, &ws_url, StreamOptions::default()).await {
                                Ok(new_ws) => {
                                    ws = new_ws;
                                    break; // re-discover and re-subscribe
                                }
                                Err(e) => {
                                    let _ = msg_tx
                                        .send(ShortFormMessage::Error(format!("Reconnect failed: {}", e)))
                                        .await;
                                    return;
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}