polynode 0.7.0

Rust SDK for the PolyNode API — real-time Polymarket data
Documentation
use polynode::{PolyNodeClient, WsMessage};
use polynode::ws::{Subscription, SubscriptionType, StreamOptions};
use std::time::Duration;

const API_KEY: &str = "pn_live_sdk_testing_enterprise_c0b4bfdff7bfd9f722bc89e1be83e965";

fn client() -> PolyNodeClient {
    PolyNodeClient::new(API_KEY).unwrap()
}

#[tokio::test]
async fn test_connect_and_subscribe() {
    let c = client();
    let mut stream = c.stream(StreamOptions::default()).await.unwrap();

    stream.subscribe(
        Subscription::new(SubscriptionType::Settlements)
            .min_size(1.0)
            .snapshot_count(5)
    ).await.unwrap();

    // Should receive snapshot and/or subscribed ack
    let mut got_subscribed = false;
    let timeout = tokio::time::sleep(Duration::from_secs(10));
    tokio::pin!(timeout);

    loop {
        tokio::select! {
            msg = stream.next() => {
                match msg {
                    Some(Ok(WsMessage::Subscribed { subscription_id, .. })) => {
                        assert!(!subscription_id.is_empty());
                        got_subscribed = true;
                        break;
                    }
                    Some(Ok(WsMessage::Snapshot(_))) => {
                        // snapshot comes before subscribed, keep going
                    }
                    Some(Ok(WsMessage::Event(_))) => {
                        // events can come too
                    }
                    Some(Ok(_)) => {}
                    Some(Err(e)) => panic!("error: {}", e),
                    None => break,
                }
            }
            _ = &mut timeout => break,
        }
    }

    assert!(got_subscribed, "should have received subscribed ack");
    stream.close().await.unwrap();
}

#[tokio::test]
async fn test_receive_events() {
    let c = client();
    let mut stream = c.stream(StreamOptions::default()).await.unwrap();

    stream.subscribe(
        Subscription::new(SubscriptionType::Settlements)
            .min_size(1.0)
            .snapshot_count(5)
    ).await.unwrap();

    let mut event_count = 0;
    let timeout = tokio::time::sleep(Duration::from_secs(15));
    tokio::pin!(timeout);

    loop {
        tokio::select! {
            msg = stream.next() => {
                match msg {
                    Some(Ok(WsMessage::Event(_))) | Some(Ok(WsMessage::Snapshot(_))) => {
                        event_count += 1;
                        if event_count >= 2 { break; }
                    }
                    Some(Ok(_)) => {}
                    Some(Err(e)) => panic!("error: {}", e),
                    None => break,
                }
            }
            _ = &mut timeout => break,
        }
    }

    assert!(event_count >= 1, "should have received at least 1 event, got {}", event_count);
    stream.close().await.unwrap();
}

#[tokio::test]
async fn test_filtered_subscription() {
    let c = client();
    let mut stream = c.stream(StreamOptions::default()).await.unwrap();

    stream.subscribe(
        Subscription::new(SubscriptionType::Settlements)
            .min_size(100.0)
            .status("pending")
            .snapshot_count(3)
    ).await.unwrap();

    let mut got_subscribed = false;
    let timeout = tokio::time::sleep(Duration::from_secs(10));
    tokio::pin!(timeout);

    loop {
        tokio::select! {
            msg = stream.next() => {
                match msg {
                    Some(Ok(WsMessage::Subscribed { subscription_type, .. })) => {
                        assert_eq!(subscription_type, "settlements");
                        got_subscribed = true;
                        break;
                    }
                    Some(Ok(_)) => {}
                    Some(Err(e)) => panic!("error: {}", e),
                    None => break,
                }
            }
            _ = &mut timeout => break,
        }
    }

    assert!(got_subscribed);
    stream.close().await.unwrap();
}

#[tokio::test]
async fn test_compression() {
    let c = client();
    let mut stream = c.stream(StreamOptions {
        compress: true,
        ..Default::default()
    }).await.unwrap();

    stream.subscribe(
        Subscription::new(SubscriptionType::Settlements)
            .min_size(1.0)
            .snapshot_count(5)
    ).await.unwrap();

    let mut event_count = 0;
    let timeout = tokio::time::sleep(Duration::from_secs(10));
    tokio::pin!(timeout);

    loop {
        tokio::select! {
            msg = stream.next() => {
                match msg {
                    Some(Ok(WsMessage::Event(_))) | Some(Ok(WsMessage::Snapshot(_))) => {
                        event_count += 1;
                        if event_count >= 1 { break; }
                    }
                    Some(Ok(_)) => {}
                    Some(Err(e)) => panic!("compressed error: {}", e),
                    None => break,
                }
            }
            _ = &mut timeout => break,
        }
    }

    assert!(event_count >= 1, "should receive events with compression, got {}", event_count);
    stream.close().await.unwrap();
}

#[tokio::test]
async fn test_multiple_subscriptions() {
    let c = client();
    let mut stream = c.stream(StreamOptions::default()).await.unwrap();

    // Both use wallet filters to avoid firehose limits
    stream.subscribe(
        Subscription::new(SubscriptionType::Settlements)
            .wallets(vec!["0x4bfb41d5b3570defd03c39a9a4d8de6bd8b8982e".into()])
            .snapshot_count(1)
    ).await.unwrap();

    stream.subscribe(
        Subscription::new(SubscriptionType::Settlements)
            .wallets(vec!["0x19db9b949dc686aa6e33f8a019c7b2bb0d4aabbf".into()])
            .snapshot_count(1)
    ).await.unwrap();

    let mut subscribed_count = 0;
    let timeout = tokio::time::sleep(Duration::from_secs(10));
    tokio::pin!(timeout);

    loop {
        tokio::select! {
            msg = stream.next() => {
                match msg {
                    Some(Ok(WsMessage::Subscribed { .. })) => {
                        subscribed_count += 1;
                        if subscribed_count >= 2 { break; }
                    }
                    Some(Ok(_)) => {}
                    Some(Err(e)) => panic!("error: {}", e),
                    None => break,
                }
            }
            _ = &mut timeout => break,
        }
    }

    assert_eq!(subscribed_count, 2, "should get 2 subscribed acks");
    stream.close().await.unwrap();
}