polynode 0.8.1

Rust SDK for the PolyNode API — real-time Polymarket data
Documentation
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, RwLock};

use crate::error::{Error, Result};
use crate::types::orderbook::{ObMessage, OrderbookLevel, OrderbookUpdate};
use super::state::LocalOrderbook;
use super::stream::{ObStream, ObStreamOptions};

/// Connection settings accepted by `OrderbookEngine::connect`.
///
/// Each field is copied as-is into the stream options used to open the
/// WebSocket connection.
#[derive(Debug, Clone)]
pub struct EngineOptions {
    /// Forwarded as the stream's `compress` option.
    pub compress: bool,
    /// Forwarded as the stream's `auto_reconnect` option.
    pub auto_reconnect: bool,
    /// Forwarded as the stream's `max_reconnect_attempts` option.
    /// NOTE(review): `None` presumably means "no cap" — confirm against the stream implementation.
    pub max_reconnect_attempts: Option<u32>,
}

impl Default for EngineOptions {
    /// Compression and auto-reconnect enabled, no reconnect-attempt limit set.
    fn default() -> Self {
        let max_reconnect_attempts = None;
        EngineOptions {
            max_reconnect_attempts,
            auto_reconnect: true,
            compress: true,
        }
    }
}

/// Routing record for one view, stored in the engine's view registry.
struct ViewInner {
    /// Asset ids this view is interested in. The background task forwards an
    /// update only when the update's `asset_id` is contained in this set.
    tokens: HashSet<String>,
    /// Sending half of the view's update channel. The background task pushes
    /// with `try_send` and ignores the result, so an update that cannot be
    /// queued is dropped for this view.
    tx: mpsc::Sender<OrderbookUpdate>,
}

/// High-level orderbook client. Manages one WebSocket connection, maintains
/// local state for all subscribed tokens, and routes updates to filtered views.
///
/// ```rust,no_run
/// use polynode::orderbook::engine::{OrderbookEngine, EngineOptions};
///
/// # async fn example() -> polynode::Result<()> {
/// let engine = OrderbookEngine::connect("pn_live_...", EngineOptions::default()).await?;
/// engine.subscribe(vec!["token_a".into(), "token_b".into()]).await?;
///
/// let view = engine.view(vec!["token_a".into()]);
/// // The accessors are `async`, so the lookup must be awaited.
/// println!("midpoint: {:?}", view.midpoint("token_a").await);
/// # Ok(())
/// # }
/// ```
pub struct OrderbookEngine {
    /// Local orderbook shared between the background task (which writes
    /// snapshots and deltas into it) and every accessor/view (which read it).
    state: Arc<RwLock<LocalOrderbook>>,
    /// Registry of views; the background task walks it for each routed update.
    views: Arc<RwLock<Vec<Arc<RwLock<ViewInner>>>>>,
    /// Command channel into the background task (subscribe and close requests).
    cmd_tx: mpsc::Sender<EngineCommand>,
    /// Handle of the background task spawned by `connect`; stored but not
    /// otherwise used in this file.
    _task: tokio::task::JoinHandle<()>,
}

/// Requests sent from the public API to the background task over `cmd_tx`.
enum EngineCommand {
    /// Forward these identifiers to the stream's `subscribe` call.
    Subscribe(Vec<String>),
    /// Leave the processing loop, ending the background task.
    Close,
}

impl OrderbookEngine {
    /// Connect to the orderbook WebSocket and start the background processing task.
    pub async fn connect(api_key: &str, options: EngineOptions) -> Result<Self> {
        let ob_url = "wss://ob.polynode.dev/ws";
        let stream_opts = ObStreamOptions {
            compress: options.compress,
            auto_reconnect: options.auto_reconnect,
            max_reconnect_attempts: options.max_reconnect_attempts,
            ..Default::default()
        };

        let mut stream = ObStream::connect(api_key, ob_url, stream_opts).await?;

        let state = Arc::new(RwLock::new(LocalOrderbook::new()));
        let views: Arc<RwLock<Vec<Arc<RwLock<ViewInner>>>>> = Arc::new(RwLock::new(Vec::new()));
        let (cmd_tx, mut cmd_rx) = mpsc::channel::<EngineCommand>(64);

        let state_clone = state.clone();
        let views_clone = views.clone();

        let task = tokio::spawn(async move {
            loop {
                tokio::select! {
                    msg = stream.next() => {
                        match msg {
                            Some(Ok(ObMessage::Update(update))) => {
                                // Apply to shared state
                                {
                                    let mut s = state_clone.write().await;
                                    match &update {
                                        OrderbookUpdate::Snapshot(snap) => s.apply_snapshot(snap),
                                        OrderbookUpdate::Update(u) => s.apply_update(u),
                                        OrderbookUpdate::PriceChange(_) => {},
                                    }
                                }

                                // Route to matching views
                                let asset_id = match &update {
                                    OrderbookUpdate::Snapshot(s) => &s.asset_id,
                                    OrderbookUpdate::Update(u) => &u.asset_id,
                                    OrderbookUpdate::PriceChange(_) => continue,
                                };

                                let views = views_clone.read().await;
                                for view_arc in views.iter() {
                                    let view = view_arc.read().await;
                                    if view.tokens.contains(asset_id) {
                                        let _ = view.tx.try_send(update.clone());
                                    }
                                }
                            }
                            Some(Ok(_)) => {} // SnapshotsDone, Subscribed, etc.
                            Some(Err(_)) => {} // Errors handled by auto-reconnect
                            None => break, // Stream closed
                        }
                    }
                    cmd = cmd_rx.recv() => {
                        match cmd {
                            Some(EngineCommand::Subscribe(ids)) => {
                                let _ = stream.subscribe(ids).await;
                            }
                            Some(EngineCommand::Close) | None => break,
                        }
                    }
                }
            }
        });

        Ok(Self {
            state,
            views,
            cmd_tx,
            _task: task,
        })
    }

    /// Subscribe to orderbook updates. Accepts token IDs, slugs, or condition IDs.
    pub async fn subscribe(&self, identifiers: Vec<String>) -> Result<()> {
        self.cmd_tx
            .send(EngineCommand::Subscribe(identifiers))
            .await
            .map_err(|_| Error::Disconnected)
    }

    /// Create a filtered view. Only receives updates for the specified tokens.
    pub fn view(&self, token_ids: Vec<String>) -> EngineView {
        let (tx, rx) = mpsc::channel(1024);
        let inner = Arc::new(RwLock::new(ViewInner {
            tokens: token_ids.into_iter().collect(),
            tx,
        }));

        // Register with engine
        let views = self.views.clone();
        let inner_clone = inner.clone();
        tokio::spawn(async move {
            views.write().await.push(inner_clone);
        });

        EngineView {
            state: self.state.clone(),
            _inner: inner,
            rx,
        }
    }

    /// Direct handle to the underlying local orderbook state. Useful for
    /// callers who want to hold the read lock and inspect or iterate the
    /// full state themselves.
    pub fn state(&self) -> Arc<RwLock<LocalOrderbook>> {
        self.state.clone()
    }

    /// Get the midpoint price for any token.
    pub async fn midpoint(&self, token_id: &str) -> Option<f64> {
        self.state.read().await.midpoint(token_id)
    }

    /// Get the bid-ask spread for any token.
    pub async fn spread(&self, token_id: &str) -> Option<f64> {
        self.state.read().await.spread(token_id)
    }

    /// Get the best bid for any token.
    pub async fn best_bid(&self, token_id: &str) -> Option<OrderbookLevel> {
        self.state.read().await.best_bid(token_id).cloned()
    }

    /// Get the best ask for any token.
    pub async fn best_ask(&self, token_id: &str) -> Option<OrderbookLevel> {
        self.state.read().await.best_ask(token_id).cloned()
    }

    /// Get the full book for any token.
    pub async fn book(&self, token_id: &str) -> Option<(Vec<OrderbookLevel>, Vec<OrderbookLevel>)> {
        self.state.read().await
            .get_book(token_id)
            .map(|(b, a)| (b.to_vec(), a.to_vec()))
    }

    /// Number of tracked assets in local state.
    pub async fn len(&self) -> usize {
        self.state.read().await.len()
    }

    /// All currently tracked asset IDs.
    pub async fn tracked_tokens(&self) -> Vec<String> {
        self.state.read().await.tracked_tokens()
    }

    /// When the local copy of this asset was last touched (snapshot or delta).
    pub async fn last_change(&self, token_id: &str) -> Option<Instant> {
        self.state.read().await.last_change(token_id)
    }

    /// All tracked tokens whose last update is older than `threshold`.
    pub async fn inactive_since(&self, threshold: Duration) -> Vec<String> {
        self.state.read().await.inactive_since(threshold)
    }

    /// Midpoints for the requested tokens (missing tokens are omitted).
    pub async fn midpoints(&self, token_ids: &[String]) -> HashMap<String, f64> {
        self.state.read().await.midpoints(token_ids)
    }

    /// Midpoints for every tracked token.
    pub async fn midpoints_all(&self) -> HashMap<String, f64> {
        self.state.read().await.midpoints_all()
    }

    /// Spreads for the requested tokens (missing tokens are omitted).
    pub async fn spreads(&self, token_ids: &[String]) -> HashMap<String, f64> {
        self.state.read().await.spreads(token_ids)
    }

    /// Spreads for every tracked token.
    pub async fn spreads_all(&self) -> HashMap<String, f64> {
        self.state.read().await.spreads_all()
    }

    /// Best bids for the requested tokens (missing tokens are omitted).
    pub async fn best_bids(&self, token_ids: &[String]) -> HashMap<String, OrderbookLevel> {
        self.state.read().await.best_bids(token_ids)
    }

    /// Best bids for every tracked token.
    pub async fn best_bids_all(&self) -> HashMap<String, OrderbookLevel> {
        self.state.read().await.best_bids_all()
    }

    /// Best asks for the requested tokens (missing tokens are omitted).
    pub async fn best_asks(&self, token_ids: &[String]) -> HashMap<String, OrderbookLevel> {
        self.state.read().await.best_asks(token_ids)
    }

    /// Best asks for every tracked token.
    pub async fn best_asks_all(&self) -> HashMap<String, OrderbookLevel> {
        self.state.read().await.best_asks_all()
    }

    /// Full books for the requested tokens (missing tokens are omitted).
    pub async fn books(&self, token_ids: &[String]) -> HashMap<String, (Vec<OrderbookLevel>, Vec<OrderbookLevel>)> {
        self.state.read().await.books(token_ids)
    }

    /// Full books for every tracked token.
    pub async fn books_all(&self) -> HashMap<String, (Vec<OrderbookLevel>, Vec<OrderbookLevel>)> {
        self.state.read().await.books_all()
    }

    /// Close the connection and clean up.
    pub async fn close(self) -> Result<()> {
        let _ = self.cmd_tx.send(EngineCommand::Close).await;
        Ok(())
    }
}

/// A filtered view over the shared orderbook state.
/// Only receives updates for its configured tokens.
///
/// The read accessors on a view query the engine's whole shared state; the
/// token filter only applies to the updates delivered through `next`.
pub struct EngineView {
    /// The engine's shared local orderbook, read by every accessor.
    state: Arc<RwLock<LocalOrderbook>>,
    /// Routing record shared with the engine's view registry; `set_tokens`
    /// rewrites its token set.
    _inner: Arc<RwLock<ViewInner>>,
    /// Receiving half of this view's update channel, drained by `next`.
    rx: mpsc::Receiver<OrderbookUpdate>,
}

impl EngineView {
    /// Receive the next update for this view's tokens.
    pub async fn next(&mut self) -> Option<OrderbookUpdate> {
        self.rx.recv().await
    }

    /// Change which tokens this view tracks.
    pub async fn set_tokens(&self, token_ids: Vec<String>) {
        let mut inner = self._inner.write().await;
        inner.tokens = token_ids.into_iter().collect();
    }

    /// Get the midpoint price for a token.
    pub async fn midpoint(&self, token_id: &str) -> Option<f64> {
        self.state.read().await.midpoint(token_id)
    }

    /// Get the bid-ask spread for a token.
    pub async fn spread(&self, token_id: &str) -> Option<f64> {
        self.state.read().await.spread(token_id)
    }

    /// Get the best bid for a token.
    pub async fn best_bid(&self, token_id: &str) -> Option<OrderbookLevel> {
        self.state.read().await.best_bid(token_id).cloned()
    }

    /// Get the best ask for a token.
    pub async fn best_ask(&self, token_id: &str) -> Option<OrderbookLevel> {
        self.state.read().await.best_ask(token_id).cloned()
    }

    /// Get the full book for a token.
    pub async fn book(&self, token_id: &str) -> Option<(Vec<OrderbookLevel>, Vec<OrderbookLevel>)> {
        self.state.read().await
            .get_book(token_id)
            .map(|(b, a)| (b.to_vec(), a.to_vec()))
    }

    /// All currently tracked asset IDs in the underlying engine state.
    pub async fn tracked_tokens(&self) -> Vec<String> {
        self.state.read().await.tracked_tokens()
    }

    /// When the local copy of this asset was last touched.
    pub async fn last_change(&self, token_id: &str) -> Option<Instant> {
        self.state.read().await.last_change(token_id)
    }

    /// All tracked tokens whose last update is older than `threshold`.
    pub async fn inactive_since(&self, threshold: Duration) -> Vec<String> {
        self.state.read().await.inactive_since(threshold)
    }

    /// Midpoints for the requested tokens (missing tokens are omitted).
    pub async fn midpoints(&self, token_ids: &[String]) -> HashMap<String, f64> {
        self.state.read().await.midpoints(token_ids)
    }

    /// Midpoints for every tracked token in the underlying engine state.
    pub async fn midpoints_all(&self) -> HashMap<String, f64> {
        self.state.read().await.midpoints_all()
    }

    /// Spreads for the requested tokens (missing tokens are omitted).
    pub async fn spreads(&self, token_ids: &[String]) -> HashMap<String, f64> {
        self.state.read().await.spreads(token_ids)
    }

    /// Spreads for every tracked token in the underlying engine state.
    pub async fn spreads_all(&self) -> HashMap<String, f64> {
        self.state.read().await.spreads_all()
    }

    /// Best bids for the requested tokens (missing tokens are omitted).
    pub async fn best_bids(&self, token_ids: &[String]) -> HashMap<String, OrderbookLevel> {
        self.state.read().await.best_bids(token_ids)
    }

    /// Best bids for every tracked token in the underlying engine state.
    pub async fn best_bids_all(&self) -> HashMap<String, OrderbookLevel> {
        self.state.read().await.best_bids_all()
    }

    /// Best asks for the requested tokens (missing tokens are omitted).
    pub async fn best_asks(&self, token_ids: &[String]) -> HashMap<String, OrderbookLevel> {
        self.state.read().await.best_asks(token_ids)
    }

    /// Best asks for every tracked token in the underlying engine state.
    pub async fn best_asks_all(&self) -> HashMap<String, OrderbookLevel> {
        self.state.read().await.best_asks_all()
    }

    /// Full books for the requested tokens (missing tokens are omitted).
    pub async fn books(&self, token_ids: &[String]) -> HashMap<String, (Vec<OrderbookLevel>, Vec<OrderbookLevel>)> {
        self.state.read().await.books(token_ids)
    }

    /// Full books for every tracked token in the underlying engine state.
    pub async fn books_all(&self) -> HashMap<String, (Vec<OrderbookLevel>, Vec<OrderbookLevel>)> {
        self.state.read().await.books_all()
    }
}