polynode 0.13.4

Rust SDK for the PolyNode API — real-time Polymarket data
Documentation
//! Redemption watcher — monitors wallets for redeemable positions after oracle resolution.
//!
//! # Example
//!
//! ```rust,no_run
//! # async fn example() -> polynode::Result<()> {
//! let client = std::sync::Arc::new(polynode::PolyNodeClient::new("pn_live_...")?);
//! let mut watcher = polynode::RedemptionWatcher::new(client, Default::default());
//! watcher.start(&["0xabc..."]).await?;
//!
//! while let Some(alert) = watcher.next_alert().await {
//!     println!("{}: {} — {} (payout: ${})",
//!         alert.wallet, alert.market_title, alert.outcome, alert.estimated_payout_usd);
//! }
//! # Ok(())
//! # }
//! ```

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use serde::Serialize;

use crate::error::{Error, Result};
use crate::client::PolyNodeClient;
use crate::ws::StreamOptions;
use crate::types::ws_messages::WsMessage;

/// Configuration for the redemption watcher.
#[derive(Debug, Clone)]
pub struct RedemptionWatcherConfig {
    /// Track position changes via WebSocket. Default: true.
    pub track_position_changes: bool,
    /// Periodic REST refresh interval in seconds. 0 = disabled. Default: 300.
    pub refresh_interval_secs: u64,
    /// Enable zlib compression on WebSocket. Default: true.
    pub compress: bool,
}

impl Default for RedemptionWatcherConfig {
    fn default() -> Self {
        Self {
            track_position_changes: true,
            refresh_interval_secs: 300,
            compress: true,
        }
    }
}

/// Alert fired when an oracle resolves and a watched wallet holds a position.
#[derive(Debug, Clone, Serialize)]
pub struct RedeemableAlert {
    pub wallet: String,
    pub condition_id: String,
    pub token_id: String,
    pub outcome: String,
    pub winning_outcome: String,
    pub is_winner: bool,
    pub size: f64,
    pub estimated_payout_usd: f64,
    pub market_title: String,
    pub market_slug: String,
    pub market_image: Option<String>,
    pub resolved_price: f64,
    pub payouts: Vec<u64>,
    pub block_number: u64,
    pub timestamp: i64,
}

/// A tracked position for a wallet.
#[derive(Debug, Clone)]
pub struct TrackedPosition {
    pub wallet: String,
    pub token_id: String,
    pub condition_id: String,
    pub outcome: String,
    pub size: f64,
    pub market_title: String,
    pub market_slug: String,
    pub market_image: Option<String>,
    pub outcomes: Vec<String>,
    pub token_ids: Vec<String>,
}

/// Monitors wallets for redeemable positions after oracle resolution.
pub struct RedemptionWatcher {
    client: Arc<PolyNodeClient>,
    config: RedemptionWatcherConfig,
    by_condition: HashMap<String, Vec<TrackedPosition>>,
    by_wallet: HashMap<String, HashSet<String>>,
    alert_tx: tokio::sync::mpsc::UnboundedSender<RedeemableAlert>,
    alert_rx: tokio::sync::mpsc::UnboundedReceiver<RedeemableAlert>,
    closed: bool,
}

impl RedemptionWatcher {
    pub fn new(client: Arc<PolyNodeClient>, config: RedemptionWatcherConfig) -> Self {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        Self {
            client,
            config,
            by_condition: HashMap::new(),
            by_wallet: HashMap::new(),
            alert_tx: tx,
            alert_rx: rx,
            closed: false,
        }
    }

    /// Start watching. Fetches positions via REST, then spawns a background task
    /// that listens to oracle events and position changes.
    pub async fn start(&mut self, wallets: &[&str]) -> Result<()> {
        if self.closed {
            return Err(Error::Api { status: 0, message: "Watcher is closed".into() });
        }

        self.fetch_and_load(wallets).await?;

        // Clone what we need for the background task
        let client = self.client.clone();
        let alert_tx = self.alert_tx.clone();
        let compress = self.config.compress;
        let wallet_list: Vec<String> = self.by_wallet.keys().cloned().collect();
        let by_condition = self.by_condition.clone();
        let by_wallet = self.by_wallet.clone();
        let refresh_secs = self.config.refresh_interval_secs;
        let track_pos = self.config.track_position_changes;

        tokio::spawn(async move {
            run_watcher_loop(
                client, alert_tx, compress, wallet_list,
                by_condition, by_wallet,
                refresh_secs, track_pos,
            ).await;
        });

        Ok(())
    }

    /// Add wallets at runtime.
    pub async fn add_wallets(&mut self, wallets: &[&str]) -> Result<()> {
        self.fetch_and_load(wallets).await
    }

    /// Remove wallets from tracking.
    pub fn remove_wallets(&mut self, wallets: &[&str]) {
        for wallet in wallets {
            let w = wallet.to_lowercase();
            if let Some(conditions) = self.by_wallet.remove(&w) {
                for cond_id in conditions {
                    if let Some(positions) = self.by_condition.get_mut(&cond_id) {
                        positions.retain(|p| p.wallet.to_lowercase() != w);
                        if positions.is_empty() {
                            self.by_condition.remove(&cond_id);
                        }
                    }
                }
            }
        }
    }

    /// Receive the next alert. Returns None when the watcher is closed.
    pub async fn next_alert(&mut self) -> Option<RedeemableAlert> {
        self.alert_rx.recv().await
    }

    /// All tracked wallet addresses.
    pub fn wallets(&self) -> Vec<String> {
        self.by_wallet.keys().cloned().collect()
    }

    /// Get tracked positions for a wallet.
    pub fn positions_for(&self, wallet: &str) -> Vec<TrackedPosition> {
        let w = wallet.to_lowercase();
        let Some(conditions) = self.by_wallet.get(&w) else { return vec![] };
        let mut result = Vec::new();
        for cond_id in conditions {
            if let Some(positions) = self.by_condition.get(cond_id) {
                for p in positions {
                    if p.wallet.to_lowercase() == w {
                        result.push(p.clone());
                    }
                }
            }
        }
        result
    }

    /// Total tracked positions across all wallets.
    pub fn size(&self) -> usize {
        self.by_condition.values().map(|v| v.len()).sum()
    }

    /// Close the watcher.
    pub fn close(&mut self) {
        self.closed = true;
        // Dropping alert_tx will cause alert_rx.recv() to return None
    }

    // ── Internal ──

    async fn fetch_and_load(&mut self, wallets: &[&str]) -> Result<()> {
        for wallet in wallets {
            match self.client.wallet_positions_data(wallet, None, None).await {
                Ok(data) => self.load_positions(wallet, &data.positions),
                Err(e) => tracing::warn!("Failed to fetch positions for {}: {}", wallet, e),
            }
        }
        Ok(())
    }

    fn load_positions(&mut self, wallet: &str, positions: &[serde_json::Value]) {
        let w = wallet.to_lowercase();
        let wallet_conditions = self.by_wallet.entry(w.clone()).or_default();

        for pos in positions {
            let condition_id = pos.get("conditionId")
                .or_else(|| pos.get("condition_id"))
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .to_string();
            let token_id = pos.get("asset")
                .or_else(|| pos.get("token_id"))
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .to_string();
            let size = pos.get("size")
                .and_then(|v| v.as_f64().or_else(|| v.as_str().and_then(|s| s.parse().ok())))
                .unwrap_or(0.0);

            if condition_id.is_empty() || token_id.is_empty() || size <= 0.0 {
                continue;
            }

            let tracked = TrackedPosition {
                wallet: w.clone(),
                token_id: token_id.clone(),
                condition_id: condition_id.clone(),
                outcome: pos.get("outcome").and_then(|v| v.as_str()).unwrap_or("").into(),
                size,
                market_title: pos.get("market_title").or_else(|| pos.get("title"))
                    .and_then(|v| v.as_str()).unwrap_or("").into(),
                market_slug: pos.get("market_slug").or_else(|| pos.get("slug"))
                    .and_then(|v| v.as_str()).unwrap_or("").into(),
                market_image: pos.get("market_image").or_else(|| pos.get("image"))
                    .and_then(|v| v.as_str()).map(String::from),
                outcomes: pos.get("outcomes")
                    .and_then(|v| v.as_array())
                    .map(|a| a.iter().filter_map(|v| v.as_str().map(String::from)).collect())
                    .unwrap_or_default(),
                token_ids: pos.get("token_ids")
                    .and_then(|v| v.as_array())
                    .map(|a| a.iter().filter_map(|v| v.as_str().map(String::from)).collect())
                    .unwrap_or_default(),
            };

            let existing = self.by_condition.entry(condition_id.clone()).or_default();
            // Avoid duplicates
            if let Some(idx) = existing.iter().position(|p| p.wallet == w && p.token_id == token_id) {
                existing[idx] = tracked;
            } else {
                existing.push(tracked);
            }
            wallet_conditions.insert(condition_id);
        }
    }
}

/// Background task that listens for oracle events and emits alerts.
async fn run_watcher_loop(
    client: Arc<PolyNodeClient>,
    alert_tx: tokio::sync::mpsc::UnboundedSender<RedeemableAlert>,
    compress: bool,
    _wallet_list: Vec<String>,
    mut by_condition: HashMap<String, Vec<TrackedPosition>>,
    mut by_wallet: HashMap<String, HashSet<String>>,
    _refresh_secs: u64,
    _track_pos: bool,
) {
    let stream = match client.stream(StreamOptions {
        compress,
        auto_reconnect: true,
        ..Default::default()
    }).await {
        Ok(s) => s,
        Err(e) => {
            tracing::error!("Failed to connect WebSocket for redemption watcher: {}", e);
            return;
        }
    };

    // Subscribe to oracle events
    let sub = crate::ws::Subscription::new(crate::ws::SubscriptionType::Oracle);
    if let Err(e) = stream.subscribe(sub).await {
        tracing::error!("Failed to subscribe to oracle events: {}", e);
        return;
    }

    let mut stream = stream;
    while let Some(msg) = stream.next().await {
        let msg = match msg {
            Ok(m) => m,
            Err(e) => {
                tracing::warn!("WebSocket error in redemption watcher: {}", e);
                continue;
            }
        };

        if let WsMessage::Event(crate::types::events::PolyNodeEvent::Oracle(event)) = msg {
            if event.oracle_type != crate::types::common::OracleEventType::ConditionResolution {
                continue;
            }
            let Some(ref condition_id) = event.condition_id else { continue };

            let positions = match by_condition.get(condition_id) {
                Some(p) if !p.is_empty() => p.clone(),
                _ => continue,
            };

            for pos in &positions {
                let token_index = event.token_ids.as_ref()
                    .and_then(|ids| ids.iter().position(|id| id == &pos.token_id));
                let is_winner = token_index
                    .and_then(|i| event.payouts.as_ref().and_then(|p| p.get(i)))
                    .map(|&p| p > 0)
                    .unwrap_or(false);

                let alert = RedeemableAlert {
                    wallet: pos.wallet.clone(),
                    condition_id: condition_id.clone(),
                    token_id: pos.token_id.clone(),
                    outcome: pos.outcome.clone(),
                    winning_outcome: event.resolved_outcome.clone().unwrap_or_default(),
                    is_winner,
                    size: pos.size,
                    estimated_payout_usd: if is_winner { pos.size } else { 0.0 },
                    market_title: pos.market_title.clone(),
                    market_slug: pos.market_slug.clone(),
                    market_image: pos.market_image.clone(),
                    resolved_price: event.resolved_price.unwrap_or(0.0),
                    payouts: event.payouts.clone().unwrap_or_default(),
                    block_number: event.block_number,
                    timestamp: event.timestamp,
                };

                if alert_tx.send(alert).is_err() {
                    return; // Receiver dropped, watcher closed
                }
            }

            // Evict resolved condition
            by_condition.remove(condition_id);
            for wallet_conds in by_wallet.values_mut() {
                wallet_conds.remove(condition_id);
            }
        }
    }
}