Skip to main content

scope/web/
monitor.rs

1//! WebSocket monitor handler for live token data streaming.
2//!
3//! Streams real-time token price, volume, and transaction data
4//! to connected browser clients via WebSocket.
5
6use crate::chains::dex::DexTokenData;
7use crate::chains::{ChainClientFactory, DexDataSource};
8use crate::market::{ExchangeClient, TradeSide, VenueRegistry};
9use crate::web::AppState;
10use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
11use axum::extract::{Query, State};
12use axum::response::IntoResponse;
13use serde::Deserialize;
14use std::sync::Arc;
15use std::time::Duration;
16
17/// Query parameters for the WebSocket monitor connection.
18#[derive(Debug, Deserialize)]
19pub struct MonitorQuery {
20    /// Token address or symbol to monitor.
21    pub token: String,
22    /// Chain (default: "ethereum").
23    #[serde(default = "default_chain")]
24    pub chain: String,
25    /// Refresh interval in seconds (default: 5).
26    #[serde(default = "default_refresh")]
27    pub refresh: u64,
28    /// Optional exchange venue ID (e.g., "binance") — when set, exchange data
29    /// (order book, ticker, recent trades) is included in each update frame.
30    #[serde(default)]
31    pub venue: Option<String>,
32    /// Base pair for exchange data (default: same as token).
33    #[serde(default)]
34    pub pair: Option<String>,
35}
36
37fn default_chain() -> String {
38    "ethereum".to_string()
39}
40
41fn default_refresh() -> u64 {
42    5
43}
44
45/// WS /ws/monitor — WebSocket endpoint for live token monitoring.
46pub async fn ws_handler(
47    ws: WebSocketUpgrade,
48    State(state): State<Arc<AppState>>,
49    Query(params): Query<MonitorQuery>,
50) -> impl IntoResponse {
51    ws.on_upgrade(move |socket| handle_socket(socket, state, params))
52}
53
54/// Handles an individual WebSocket connection.
55///
56/// Polls DexScreener at the configured interval and sends JSON updates
57/// containing price, volume, and activity data.
58async fn handle_socket(mut socket: WebSocket, state: Arc<AppState>, params: MonitorQuery) {
59    let dex_client: Box<dyn DexDataSource> = state.factory.create_dex_client();
60    let refresh = Duration::from_secs(params.refresh.max(1));
61
62    // Resolve token to address if needed
63    let token_input = params.token.clone();
64    let chain = params.chain.clone();
65
66    // Optionally create an exchange client for CEX data
67    let exchange_client: Option<ExchangeClient> = params.venue.as_ref().and_then(|venue_id| {
68        VenueRegistry::load()
69            .ok()
70            .and_then(|r| r.create_exchange_client(venue_id).ok())
71    });
72
73    let exchange_pair: Option<String> = exchange_client.as_ref().map(|ec| {
74        let base = params.pair.as_deref().unwrap_or(&token_input);
75        ec.format_pair(base)
76    });
77
78    // Send initial connection message
79    let init_msg = serde_json::json!({
80        "type": "connected",
81        "token": token_input,
82        "chain": chain,
83        "refresh_secs": params.refresh,
84        "venue": params.venue,
85        "exchange_pair": exchange_pair,
86    });
87    if socket
88        .send(Message::Text(init_msg.to_string()))
89        .await
90        .is_err()
91    {
92        return;
93    }
94
95    loop {
96        // Fetch latest DEX token data
97        let data: crate::error::Result<DexTokenData> =
98            dex_client.get_token_data(&chain, &token_input).await;
99
100        // Optionally fetch exchange snapshot in parallel
101        let exchange_snapshot = if let (Some(ec), Some(pair)) = (&exchange_client, &exchange_pair) {
102            Some(ec.fetch_market_snapshot(pair).await)
103        } else {
104            None
105        };
106
107        let msg = match data {
108            Ok(token_data) => {
109                let mut frame = serde_json::json!({
110                    "type": "update",
111                    "timestamp": chrono::Utc::now().to_rfc3339(),
112                    "token": {
113                        "symbol": token_data.symbol,
114                        "name": token_data.name,
115                        "address": token_data.address,
116                    },
117                    "price_usd": token_data.price_usd,
118                    "price_change_24h": token_data.price_change_24h,
119                    "price_change_6h": token_data.price_change_6h,
120                    "price_change_1h": token_data.price_change_1h,
121                    "volume_24h": token_data.volume_24h,
122                    "volume_6h": token_data.volume_6h,
123                    "volume_1h": token_data.volume_1h,
124                    "liquidity_usd": token_data.liquidity_usd,
125                    "market_cap": token_data.market_cap,
126                    "buys_24h": token_data.total_buys_24h,
127                    "sells_24h": token_data.total_sells_24h,
128                    "buys_1h": token_data.total_buys_1h,
129                    "sells_1h": token_data.total_sells_1h,
130                    "pairs": token_data.pairs.iter().take(5).map(|p| {
131                        serde_json::json!({
132                            "dex": p.dex_name,
133                            "base": p.base_token,
134                            "quote": p.quote_token,
135                            "price_usd": p.price_usd,
136                            "volume_24h": p.volume_24h,
137                            "liquidity_usd": p.liquidity_usd,
138                        })
139                    }).collect::<Vec<_>>(),
140                });
141
142                // Attach exchange data if available
143                if let Some(snap) = &exchange_snapshot {
144                    attach_exchange_data(&mut frame, snap);
145                }
146
147                frame
148            }
149            Err(e) => {
150                let mut frame = serde_json::json!({
151                    "type": "error",
152                    "message": e.to_string(),
153                });
154                // Still attach exchange data even on DEX error
155                if let Some(snap) = &exchange_snapshot {
156                    attach_exchange_data(&mut frame, snap);
157                }
158                frame
159            }
160        };
161
162        if socket.send(Message::Text(msg.to_string())).await.is_err() {
163            // Client disconnected
164            break;
165        }
166
167        // Wait for next refresh or client message
168        tokio::select! {
169            _ = tokio::time::sleep(refresh) => {},
170            msg = socket.recv() => {
171                match msg {
172                    Some(Ok(Message::Close(_))) | None => break,
173                    Some(Ok(Message::Text(text))) => {
174                        // Handle client commands (e.g., change token)
175                        if let Ok(cmd) = serde_json::from_str::<serde_json::Value>(&text)
176                            && cmd.get("type").and_then(|t| t.as_str()) == Some("ping")
177                        {
178                            let pong = serde_json::json!({"type": "pong"});
179                            let _ = socket.send(Message::Text(pong.to_string())).await;
180                        }
181                    }
182                    _ => {}
183                }
184            }
185        }
186    }
187}
188
189/// Attach exchange snapshot data (order book, ticker, trades) to a JSON frame.
190fn attach_exchange_data(frame: &mut serde_json::Value, snap: &crate::market::MarketSnapshot) {
191    if let Some(book) = &snap.order_book {
192        frame["exchange_order_book"] = serde_json::json!({
193            "pair": book.pair,
194            "best_bid": book.best_bid(),
195            "best_ask": book.best_ask(),
196            "mid_price": book.mid_price(),
197            "spread": book.spread(),
198            "bid_depth": book.bid_depth(),
199            "ask_depth": book.ask_depth(),
200            "bids": book.bids.iter().take(20).map(|l| {
201                serde_json::json!({"price": l.price, "quantity": l.quantity})
202            }).collect::<Vec<_>>(),
203            "asks": book.asks.iter().take(20).map(|l| {
204                serde_json::json!({"price": l.price, "quantity": l.quantity})
205            }).collect::<Vec<_>>(),
206        });
207    }
208
209    if let Some(ticker) = &snap.ticker {
210        frame["exchange_ticker"] = serde_json::json!({
211            "pair": ticker.pair,
212            "last_price": ticker.last_price,
213            "high_24h": ticker.high_24h,
214            "low_24h": ticker.low_24h,
215            "volume_24h": ticker.volume_24h,
216            "quote_volume_24h": ticker.quote_volume_24h,
217            "best_bid": ticker.best_bid,
218            "best_ask": ticker.best_ask,
219        });
220    }
221
222    if let Some(trades) = &snap.recent_trades {
223        frame["exchange_trades"] = serde_json::json!(
224            trades
225                .iter()
226                .take(20)
227                .map(|t| {
228                    serde_json::json!({
229                        "price": t.price,
230                        "quantity": t.quantity,
231                        "timestamp_ms": t.timestamp_ms,
232                        "side": match t.side {
233                            TradeSide::Buy => "buy",
234                            TradeSide::Sell => "sell",
235                        },
236                    })
237                })
238                .collect::<Vec<_>>()
239        );
240    }
241}
242
243#[cfg(test)]
244mod tests {
245    use super::*;
246    use crate::market::{MarketSnapshot, OrderBook, OrderBookLevel, Ticker, Trade};
247
248    #[test]
249    fn test_default_chain() {
250        assert_eq!(default_chain(), "ethereum");
251    }
252
253    #[test]
254    fn test_default_refresh() {
255        assert_eq!(default_refresh(), 5);
256    }
257
258    #[test]
259    fn test_deserialize_monitor_query_full() {
260        let json = serde_json::json!({
261            "token": "USDC",
262            "chain": "solana",
263            "refresh": 10
264        });
265        let query: MonitorQuery = serde_json::from_value(json).unwrap();
266        assert_eq!(query.token, "USDC");
267        assert_eq!(query.chain, "solana");
268        assert_eq!(query.refresh, 10);
269        assert!(query.venue.is_none());
270    }
271
272    #[test]
273    fn test_deserialize_monitor_query_minimal() {
274        let json = serde_json::json!({
275            "token": "ETH"
276        });
277        let query: MonitorQuery = serde_json::from_value(json).unwrap();
278        assert_eq!(query.token, "ETH");
279        assert_eq!(query.chain, "ethereum");
280        assert_eq!(query.refresh, 5);
281        assert!(query.venue.is_none());
282        assert!(query.pair.is_none());
283    }
284
285    #[test]
286    fn test_deserialize_monitor_query_with_venue() {
287        let json = serde_json::json!({
288            "token": "BTC",
289            "venue": "binance",
290            "pair": "USDC",
291            "refresh": 10
292        });
293        let query: MonitorQuery = serde_json::from_value(json).unwrap();
294        assert_eq!(query.token, "BTC");
295        assert_eq!(query.venue.as_deref(), Some("binance"));
296        assert_eq!(query.pair.as_deref(), Some("USDC"));
297    }
298
299    #[test]
300    fn test_attach_exchange_data_full() {
301        let snapshot = MarketSnapshot {
302            order_book: Some(OrderBook {
303                pair: "BTC/USDT".to_string(),
304                bids: vec![OrderBookLevel {
305                    price: 50000.0,
306                    quantity: 1.5,
307                }],
308                asks: vec![OrderBookLevel {
309                    price: 50010.0,
310                    quantity: 2.0,
311                }],
312            }),
313            ticker: Some(Ticker {
314                pair: "BTC/USDT".to_string(),
315                last_price: Some(50005.0),
316                high_24h: Some(51000.0),
317                low_24h: Some(49000.0),
318                volume_24h: Some(1_000_000.0),
319                quote_volume_24h: Some(50_000_000.0),
320                best_bid: Some(50000.0),
321                best_ask: Some(50010.0),
322            }),
323            recent_trades: Some(vec![Trade {
324                price: 50005.0,
325                quantity: 0.5,
326                quote_quantity: Some(25002.5),
327                timestamp_ms: 1700000000000,
328                side: TradeSide::Buy,
329                id: None,
330            }]),
331        };
332
333        let mut frame = serde_json::json!({"type": "update"});
334        attach_exchange_data(&mut frame, &snapshot);
335
336        assert!(frame.get("exchange_order_book").is_some());
337        assert!(frame.get("exchange_ticker").is_some());
338        assert!(frame.get("exchange_trades").is_some());
339        assert_eq!(
340            frame["exchange_ticker"]["last_price"].as_f64().unwrap(),
341            50005.0
342        );
343    }
344
345    #[test]
346    fn test_attach_exchange_data_empty() {
347        let snapshot = MarketSnapshot {
348            order_book: None,
349            ticker: None,
350            recent_trades: None,
351        };
352
353        let mut frame = serde_json::json!({"type": "update"});
354        attach_exchange_data(&mut frame, &snapshot);
355
356        assert!(frame.get("exchange_order_book").is_none());
357        assert!(frame.get("exchange_ticker").is_none());
358        assert!(frame.get("exchange_trades").is_none());
359    }
360}