use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use serde::Serialize;
use crate::error::{Error, Result};
use crate::client::PolyNodeClient;
use crate::ws::StreamOptions;
use crate::types::ws_messages::WsMessage;
#[derive(Debug, Clone)]
pub struct RedemptionWatcherConfig {
pub track_position_changes: bool,
pub refresh_interval_secs: u64,
pub compress: bool,
}
impl Default for RedemptionWatcherConfig {
fn default() -> Self {
Self {
track_position_changes: true,
refresh_interval_secs: 300,
compress: true,
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct RedeemableAlert {
pub wallet: String,
pub condition_id: String,
pub token_id: String,
pub outcome: String,
pub winning_outcome: String,
pub is_winner: bool,
pub size: f64,
pub estimated_payout_usd: f64,
pub market_title: String,
pub market_slug: String,
pub market_image: Option<String>,
pub resolved_price: f64,
pub payouts: Vec<u64>,
pub block_number: u64,
pub timestamp: i64,
}
#[derive(Debug, Clone)]
pub struct TrackedPosition {
pub wallet: String,
pub token_id: String,
pub condition_id: String,
pub outcome: String,
pub size: f64,
pub market_title: String,
pub market_slug: String,
pub market_image: Option<String>,
pub outcomes: Vec<String>,
pub token_ids: Vec<String>,
}
pub struct RedemptionWatcher {
client: Arc<PolyNodeClient>,
config: RedemptionWatcherConfig,
by_condition: HashMap<String, Vec<TrackedPosition>>,
by_wallet: HashMap<String, HashSet<String>>,
alert_tx: tokio::sync::mpsc::UnboundedSender<RedeemableAlert>,
alert_rx: tokio::sync::mpsc::UnboundedReceiver<RedeemableAlert>,
closed: bool,
}
impl RedemptionWatcher {
pub fn new(client: Arc<PolyNodeClient>, config: RedemptionWatcherConfig) -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
Self {
client,
config,
by_condition: HashMap::new(),
by_wallet: HashMap::new(),
alert_tx: tx,
alert_rx: rx,
closed: false,
}
}
pub async fn start(&mut self, wallets: &[&str]) -> Result<()> {
if self.closed {
return Err(Error::Api { status: 0, message: "Watcher is closed".into() });
}
self.fetch_and_load(wallets).await?;
let client = self.client.clone();
let alert_tx = self.alert_tx.clone();
let compress = self.config.compress;
let wallet_list: Vec<String> = self.by_wallet.keys().cloned().collect();
let by_condition = self.by_condition.clone();
let by_wallet = self.by_wallet.clone();
let refresh_secs = self.config.refresh_interval_secs;
let track_pos = self.config.track_position_changes;
tokio::spawn(async move {
run_watcher_loop(
client, alert_tx, compress, wallet_list,
by_condition, by_wallet,
refresh_secs, track_pos,
).await;
});
Ok(())
}
pub async fn add_wallets(&mut self, wallets: &[&str]) -> Result<()> {
self.fetch_and_load(wallets).await
}
pub fn remove_wallets(&mut self, wallets: &[&str]) {
for wallet in wallets {
let w = wallet.to_lowercase();
if let Some(conditions) = self.by_wallet.remove(&w) {
for cond_id in conditions {
if let Some(positions) = self.by_condition.get_mut(&cond_id) {
positions.retain(|p| p.wallet.to_lowercase() != w);
if positions.is_empty() {
self.by_condition.remove(&cond_id);
}
}
}
}
}
}
pub async fn next_alert(&mut self) -> Option<RedeemableAlert> {
self.alert_rx.recv().await
}
pub fn wallets(&self) -> Vec<String> {
self.by_wallet.keys().cloned().collect()
}
pub fn positions_for(&self, wallet: &str) -> Vec<TrackedPosition> {
let w = wallet.to_lowercase();
let Some(conditions) = self.by_wallet.get(&w) else { return vec![] };
let mut result = Vec::new();
for cond_id in conditions {
if let Some(positions) = self.by_condition.get(cond_id) {
for p in positions {
if p.wallet.to_lowercase() == w {
result.push(p.clone());
}
}
}
}
result
}
pub fn size(&self) -> usize {
self.by_condition.values().map(|v| v.len()).sum()
}
pub fn close(&mut self) {
self.closed = true;
}
async fn fetch_and_load(&mut self, wallets: &[&str]) -> Result<()> {
for wallet in wallets {
match self.client.wallet_positions_data(wallet, None, None).await {
Ok(data) => self.load_positions(wallet, &data.positions),
Err(e) => tracing::warn!("Failed to fetch positions for {}: {}", wallet, e),
}
}
Ok(())
}
fn load_positions(&mut self, wallet: &str, positions: &[serde_json::Value]) {
let w = wallet.to_lowercase();
let wallet_conditions = self.by_wallet.entry(w.clone()).or_default();
for pos in positions {
let condition_id = pos.get("conditionId")
.or_else(|| pos.get("condition_id"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let token_id = pos.get("asset")
.or_else(|| pos.get("token_id"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let size = pos.get("size")
.and_then(|v| v.as_f64().or_else(|| v.as_str().and_then(|s| s.parse().ok())))
.unwrap_or(0.0);
if condition_id.is_empty() || token_id.is_empty() || size <= 0.0 {
continue;
}
let tracked = TrackedPosition {
wallet: w.clone(),
token_id: token_id.clone(),
condition_id: condition_id.clone(),
outcome: pos.get("outcome").and_then(|v| v.as_str()).unwrap_or("").into(),
size,
market_title: pos.get("market_title").or_else(|| pos.get("title"))
.and_then(|v| v.as_str()).unwrap_or("").into(),
market_slug: pos.get("market_slug").or_else(|| pos.get("slug"))
.and_then(|v| v.as_str()).unwrap_or("").into(),
market_image: pos.get("market_image").or_else(|| pos.get("image"))
.and_then(|v| v.as_str()).map(String::from),
outcomes: pos.get("outcomes")
.and_then(|v| v.as_array())
.map(|a| a.iter().filter_map(|v| v.as_str().map(String::from)).collect())
.unwrap_or_default(),
token_ids: pos.get("token_ids")
.and_then(|v| v.as_array())
.map(|a| a.iter().filter_map(|v| v.as_str().map(String::from)).collect())
.unwrap_or_default(),
};
let existing = self.by_condition.entry(condition_id.clone()).or_default();
if let Some(idx) = existing.iter().position(|p| p.wallet == w && p.token_id == token_id) {
existing[idx] = tracked;
} else {
existing.push(tracked);
}
wallet_conditions.insert(condition_id);
}
}
}
async fn run_watcher_loop(
client: Arc<PolyNodeClient>,
alert_tx: tokio::sync::mpsc::UnboundedSender<RedeemableAlert>,
compress: bool,
_wallet_list: Vec<String>,
mut by_condition: HashMap<String, Vec<TrackedPosition>>,
mut by_wallet: HashMap<String, HashSet<String>>,
_refresh_secs: u64,
_track_pos: bool,
) {
let stream = match client.stream(StreamOptions {
compress,
auto_reconnect: true,
..Default::default()
}).await {
Ok(s) => s,
Err(e) => {
tracing::error!("Failed to connect WebSocket for redemption watcher: {}", e);
return;
}
};
let sub = crate::ws::Subscription::new(crate::ws::SubscriptionType::Oracle);
if let Err(e) = stream.subscribe(sub).await {
tracing::error!("Failed to subscribe to oracle events: {}", e);
return;
}
let mut stream = stream;
while let Some(msg) = stream.next().await {
let msg = match msg {
Ok(m) => m,
Err(e) => {
tracing::warn!("WebSocket error in redemption watcher: {}", e);
continue;
}
};
if let WsMessage::Event(crate::types::events::PolyNodeEvent::Oracle(event)) = msg {
if event.oracle_type != crate::types::common::OracleEventType::ConditionResolution {
continue;
}
let Some(ref condition_id) = event.condition_id else { continue };
let positions = match by_condition.get(condition_id) {
Some(p) if !p.is_empty() => p.clone(),
_ => continue,
};
for pos in &positions {
let token_index = event.token_ids.as_ref()
.and_then(|ids| ids.iter().position(|id| id == &pos.token_id));
let is_winner = token_index
.and_then(|i| event.payouts.as_ref().and_then(|p| p.get(i)))
.map(|&p| p > 0)
.unwrap_or(false);
let alert = RedeemableAlert {
wallet: pos.wallet.clone(),
condition_id: condition_id.clone(),
token_id: pos.token_id.clone(),
outcome: pos.outcome.clone(),
winning_outcome: event.resolved_outcome.clone().unwrap_or_default(),
is_winner,
size: pos.size,
estimated_payout_usd: if is_winner { pos.size } else { 0.0 },
market_title: pos.market_title.clone(),
market_slug: pos.market_slug.clone(),
market_image: pos.market_image.clone(),
resolved_price: event.resolved_price.unwrap_or(0.0),
payouts: event.payouts.clone().unwrap_or_default(),
block_number: event.block_number,
timestamp: event.timestamp,
};
if alert_tx.send(alert).is_err() {
return; }
}
by_condition.remove(condition_id);
for wallet_conds in by_wallet.values_mut() {
wallet_conds.remove(condition_id);
}
}
}
}