use polynode::{PolyNodeClient, WsMessage};
use polynode::ws::{Subscription, SubscriptionType, StreamOptions};
use std::time::Duration;
const API_KEY: &str = "pn_live_sdk_testing_enterprise_c0b4bfdff7bfd9f722bc89e1be83e965";
fn client() -> PolyNodeClient {
PolyNodeClient::new(API_KEY).unwrap()
}
#[tokio::test]
async fn test_connect_and_subscribe() {
let c = client();
let mut stream = c.stream(StreamOptions::default()).await.unwrap();
stream.subscribe(
Subscription::new(SubscriptionType::Settlements)
.min_size(1.0)
.snapshot_count(5)
).await.unwrap();
let mut got_subscribed = false;
let timeout = tokio::time::sleep(Duration::from_secs(10));
tokio::pin!(timeout);
loop {
tokio::select! {
msg = stream.next() => {
match msg {
Some(Ok(WsMessage::Subscribed { subscription_id, .. })) => {
assert!(!subscription_id.is_empty());
got_subscribed = true;
break;
}
Some(Ok(WsMessage::Snapshot(_))) => {
}
Some(Ok(WsMessage::Event(_))) => {
}
Some(Ok(_)) => {}
Some(Err(e)) => panic!("error: {}", e),
None => break,
}
}
_ = &mut timeout => break,
}
}
assert!(got_subscribed, "should have received subscribed ack");
stream.close().await.unwrap();
}
#[tokio::test]
async fn test_receive_events() {
let c = client();
let mut stream = c.stream(StreamOptions::default()).await.unwrap();
stream.subscribe(
Subscription::new(SubscriptionType::Settlements)
.min_size(1.0)
.snapshot_count(5)
).await.unwrap();
let mut event_count = 0;
let timeout = tokio::time::sleep(Duration::from_secs(15));
tokio::pin!(timeout);
loop {
tokio::select! {
msg = stream.next() => {
match msg {
Some(Ok(WsMessage::Event(_))) | Some(Ok(WsMessage::Snapshot(_))) => {
event_count += 1;
if event_count >= 2 { break; }
}
Some(Ok(_)) => {}
Some(Err(e)) => panic!("error: {}", e),
None => break,
}
}
_ = &mut timeout => break,
}
}
assert!(event_count >= 1, "should have received at least 1 event, got {}", event_count);
stream.close().await.unwrap();
}
#[tokio::test]
async fn test_filtered_subscription() {
let c = client();
let mut stream = c.stream(StreamOptions::default()).await.unwrap();
stream.subscribe(
Subscription::new(SubscriptionType::Settlements)
.min_size(100.0)
.status("pending")
.snapshot_count(3)
).await.unwrap();
let mut got_subscribed = false;
let timeout = tokio::time::sleep(Duration::from_secs(10));
tokio::pin!(timeout);
loop {
tokio::select! {
msg = stream.next() => {
match msg {
Some(Ok(WsMessage::Subscribed { subscription_type, .. })) => {
assert_eq!(subscription_type, "settlements");
got_subscribed = true;
break;
}
Some(Ok(_)) => {}
Some(Err(e)) => panic!("error: {}", e),
None => break,
}
}
_ = &mut timeout => break,
}
}
assert!(got_subscribed);
stream.close().await.unwrap();
}
#[tokio::test]
async fn test_compression() {
let c = client();
let mut stream = c.stream(StreamOptions {
compress: true,
..Default::default()
}).await.unwrap();
stream.subscribe(
Subscription::new(SubscriptionType::Settlements)
.min_size(1.0)
.snapshot_count(5)
).await.unwrap();
let mut event_count = 0;
let timeout = tokio::time::sleep(Duration::from_secs(10));
tokio::pin!(timeout);
loop {
tokio::select! {
msg = stream.next() => {
match msg {
Some(Ok(WsMessage::Event(_))) | Some(Ok(WsMessage::Snapshot(_))) => {
event_count += 1;
if event_count >= 1 { break; }
}
Some(Ok(_)) => {}
Some(Err(e)) => panic!("compressed error: {}", e),
None => break,
}
}
_ = &mut timeout => break,
}
}
assert!(event_count >= 1, "should receive events with compression, got {}", event_count);
stream.close().await.unwrap();
}
#[tokio::test]
async fn test_multiple_subscriptions() {
let c = client();
let mut stream = c.stream(StreamOptions::default()).await.unwrap();
stream.subscribe(
Subscription::new(SubscriptionType::Settlements)
.wallets(vec!["0x4bfb41d5b3570defd03c39a9a4d8de6bd8b8982e".into()])
.snapshot_count(1)
).await.unwrap();
stream.subscribe(
Subscription::new(SubscriptionType::Settlements)
.wallets(vec!["0x19db9b949dc686aa6e33f8a019c7b2bb0d4aabbf".into()])
.snapshot_count(1)
).await.unwrap();
let mut subscribed_count = 0;
let timeout = tokio::time::sleep(Duration::from_secs(10));
tokio::pin!(timeout);
loop {
tokio::select! {
msg = stream.next() => {
match msg {
Some(Ok(WsMessage::Subscribed { .. })) => {
subscribed_count += 1;
if subscribed_count >= 2 { break; }
}
Some(Ok(_)) => {}
Some(Err(e)) => panic!("error: {}", e),
None => break,
}
}
_ = &mut timeout => break,
}
}
assert_eq!(subscribed_count, 2, "should get 2 subscribed acks");
stream.close().await.unwrap();
}