1use crate::chains::dex::DexTokenData;
7use crate::chains::{ChainClientFactory, DexDataSource};
8use crate::market::{ExchangeClient, TradeSide, VenueRegistry};
9use crate::web::AppState;
10use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
11use axum::extract::{Query, State};
12use axum::response::IntoResponse;
13use serde::Deserialize;
14use std::sync::Arc;
15use std::time::Duration;
16
/// Query-string parameters accepted by the monitoring WebSocket endpoint.
///
/// Only `token` is required; every other field has a serde default.
#[derive(Debug, Deserialize)]
pub struct MonitorQuery {
    /// Token identifier handed to the DEX data source. Also used as the base
    /// for the exchange pair when `pair` is not supplied.
    pub token: String,
    /// Chain name handed to the DEX data source; defaults to `"ethereum"`.
    #[serde(default = "default_chain")]
    pub chain: String,
    /// Requested refresh interval in seconds; defaults to 5. The socket
    /// handler raises values below 1 to 1.
    #[serde(default = "default_refresh")]
    pub refresh: u64,
    /// Optional venue identifier. When present, the handler tries to build an
    /// exchange client for it through `VenueRegistry`.
    #[serde(default)]
    pub venue: Option<String>,
    /// Optional base passed to the exchange client's `format_pair`; when
    /// absent, `token` is used instead.
    #[serde(default)]
    pub pair: Option<String>,
}
36
/// Serde fallback for the `chain` query parameter when it is omitted.
fn default_chain() -> String {
    String::from("ethereum")
}
40
/// Serde fallback for the `refresh` query parameter (in seconds) when it is omitted.
fn default_refresh() -> u64 {
    const DEFAULT_REFRESH_SECS: u64 = 5;
    DEFAULT_REFRESH_SECS
}
44
45pub async fn ws_handler(
47 ws: WebSocketUpgrade,
48 State(state): State<Arc<AppState>>,
49 Query(params): Query<MonitorQuery>,
50) -> impl IntoResponse {
51 ws.on_upgrade(move |socket| handle_socket(socket, state, params))
52}
53
54async fn handle_socket(mut socket: WebSocket, state: Arc<AppState>, params: MonitorQuery) {
59 let dex_client: Box<dyn DexDataSource> = state.factory.create_dex_client();
60 let refresh = Duration::from_secs(params.refresh.max(1));
61
62 let token_input = params.token.clone();
64 let chain = params.chain.clone();
65
66 let exchange_client: Option<ExchangeClient> = params.venue.as_ref().and_then(|venue_id| {
68 VenueRegistry::load()
69 .ok()
70 .and_then(|r| r.create_exchange_client(venue_id).ok())
71 });
72
73 let exchange_pair: Option<String> = exchange_client.as_ref().map(|ec| {
74 let base = params.pair.as_deref().unwrap_or(&token_input);
75 ec.format_pair(base)
76 });
77
78 let init_msg = serde_json::json!({
80 "type": "connected",
81 "token": token_input,
82 "chain": chain,
83 "refresh_secs": params.refresh,
84 "venue": params.venue,
85 "exchange_pair": exchange_pair,
86 });
87 if socket
88 .send(Message::Text(init_msg.to_string()))
89 .await
90 .is_err()
91 {
92 return;
93 }
94
95 loop {
96 let data: crate::error::Result<DexTokenData> =
98 dex_client.get_token_data(&chain, &token_input).await;
99
100 let exchange_snapshot = if let (Some(ec), Some(pair)) = (&exchange_client, &exchange_pair) {
102 Some(ec.fetch_market_snapshot(pair).await)
103 } else {
104 None
105 };
106
107 let msg = match data {
108 Ok(token_data) => {
109 let mut frame = serde_json::json!({
110 "type": "update",
111 "timestamp": chrono::Utc::now().to_rfc3339(),
112 "token": {
113 "symbol": token_data.symbol,
114 "name": token_data.name,
115 "address": token_data.address,
116 },
117 "price_usd": token_data.price_usd,
118 "price_change_24h": token_data.price_change_24h,
119 "price_change_6h": token_data.price_change_6h,
120 "price_change_1h": token_data.price_change_1h,
121 "volume_24h": token_data.volume_24h,
122 "volume_6h": token_data.volume_6h,
123 "volume_1h": token_data.volume_1h,
124 "liquidity_usd": token_data.liquidity_usd,
125 "market_cap": token_data.market_cap,
126 "buys_24h": token_data.total_buys_24h,
127 "sells_24h": token_data.total_sells_24h,
128 "buys_1h": token_data.total_buys_1h,
129 "sells_1h": token_data.total_sells_1h,
130 "pairs": token_data.pairs.iter().take(5).map(|p| {
131 serde_json::json!({
132 "dex": p.dex_name,
133 "base": p.base_token,
134 "quote": p.quote_token,
135 "price_usd": p.price_usd,
136 "volume_24h": p.volume_24h,
137 "liquidity_usd": p.liquidity_usd,
138 })
139 }).collect::<Vec<_>>(),
140 });
141
142 if let Some(snap) = &exchange_snapshot {
144 attach_exchange_data(&mut frame, snap);
145 }
146
147 frame
148 }
149 Err(e) => {
150 let mut frame = serde_json::json!({
151 "type": "error",
152 "message": e.to_string(),
153 });
154 if let Some(snap) = &exchange_snapshot {
156 attach_exchange_data(&mut frame, snap);
157 }
158 frame
159 }
160 };
161
162 if socket.send(Message::Text(msg.to_string())).await.is_err() {
163 break;
165 }
166
167 tokio::select! {
169 _ = tokio::time::sleep(refresh) => {},
170 msg = socket.recv() => {
171 match msg {
172 Some(Ok(Message::Close(_))) | None => break,
173 Some(Ok(Message::Text(text))) => {
174 if let Ok(cmd) = serde_json::from_str::<serde_json::Value>(&text)
176 && cmd.get("type").and_then(|t| t.as_str()) == Some("ping")
177 {
178 let pong = serde_json::json!({"type": "pong"});
179 let _ = socket.send(Message::Text(pong.to_string())).await;
180 }
181 }
182 _ => {}
183 }
184 }
185 }
186 }
187}
188
189fn attach_exchange_data(frame: &mut serde_json::Value, snap: &crate::market::MarketSnapshot) {
191 if let Some(book) = &snap.order_book {
192 frame["exchange_order_book"] = serde_json::json!({
193 "pair": book.pair,
194 "best_bid": book.best_bid(),
195 "best_ask": book.best_ask(),
196 "mid_price": book.mid_price(),
197 "spread": book.spread(),
198 "bid_depth": book.bid_depth(),
199 "ask_depth": book.ask_depth(),
200 "bids": book.bids.iter().take(20).map(|l| {
201 serde_json::json!({"price": l.price, "quantity": l.quantity})
202 }).collect::<Vec<_>>(),
203 "asks": book.asks.iter().take(20).map(|l| {
204 serde_json::json!({"price": l.price, "quantity": l.quantity})
205 }).collect::<Vec<_>>(),
206 });
207 }
208
209 if let Some(ticker) = &snap.ticker {
210 frame["exchange_ticker"] = serde_json::json!({
211 "pair": ticker.pair,
212 "last_price": ticker.last_price,
213 "high_24h": ticker.high_24h,
214 "low_24h": ticker.low_24h,
215 "volume_24h": ticker.volume_24h,
216 "quote_volume_24h": ticker.quote_volume_24h,
217 "best_bid": ticker.best_bid,
218 "best_ask": ticker.best_ask,
219 });
220 }
221
222 if let Some(trades) = &snap.recent_trades {
223 frame["exchange_trades"] = serde_json::json!(
224 trades
225 .iter()
226 .take(20)
227 .map(|t| {
228 serde_json::json!({
229 "price": t.price,
230 "quantity": t.quantity,
231 "timestamp_ms": t.timestamp_ms,
232 "side": match t.side {
233 TradeSide::Buy => "buy",
234 TradeSide::Sell => "sell",
235 },
236 })
237 })
238 .collect::<Vec<_>>()
239 );
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::market::{MarketSnapshot, OrderBook, OrderBookLevel, Ticker, Trade};
    use serde_json::json;

    /// Keys that `attach_exchange_data` may add to a frame.
    const EXCHANGE_KEYS: [&str; 3] = ["exchange_order_book", "exchange_ticker", "exchange_trades"];

    /// Deserializes a JSON value into a `MonitorQuery`, panicking on failure.
    fn parse_query(raw: serde_json::Value) -> MonitorQuery {
        serde_json::from_value(raw).unwrap()
    }

    #[test]
    fn test_default_chain() {
        assert_eq!(default_chain(), "ethereum");
    }

    #[test]
    fn test_default_refresh() {
        assert_eq!(default_refresh(), 5);
    }

    // All DEX-side fields supplied explicitly; venue stays unset.
    #[test]
    fn test_deserialize_monitor_query_full() {
        let parsed = parse_query(json!({
            "token": "USDC",
            "chain": "solana",
            "refresh": 10
        }));
        assert_eq!(parsed.token, "USDC");
        assert_eq!(parsed.chain, "solana");
        assert_eq!(parsed.refresh, 10);
        assert_eq!(parsed.venue, None);
    }

    // Only the required field supplied; everything else falls back to defaults.
    #[test]
    fn test_deserialize_monitor_query_minimal() {
        let parsed = parse_query(json!({
            "token": "ETH"
        }));
        assert_eq!(parsed.token, "ETH");
        assert_eq!(parsed.chain, "ethereum");
        assert_eq!(parsed.refresh, 5);
        assert_eq!(parsed.venue, None);
        assert_eq!(parsed.pair, None);
    }

    // Exchange-side parameters are carried through as given.
    #[test]
    fn test_deserialize_monitor_query_with_venue() {
        let parsed = parse_query(json!({
            "token": "BTC",
            "venue": "binance",
            "pair": "USDC",
            "refresh": 10
        }));
        assert_eq!(parsed.token, "BTC");
        assert_eq!(parsed.venue.as_deref(), Some("binance"));
        assert_eq!(parsed.pair.as_deref(), Some("USDC"));
    }

    // A snapshot with every section present produces all three exchange keys.
    #[test]
    fn test_attach_exchange_data_full() {
        let order_book = OrderBook {
            pair: "BTC/USDT".to_string(),
            bids: vec![OrderBookLevel {
                price: 50000.0,
                quantity: 1.5,
            }],
            asks: vec![OrderBookLevel {
                price: 50010.0,
                quantity: 2.0,
            }],
        };
        let ticker = Ticker {
            pair: "BTC/USDT".to_string(),
            last_price: Some(50005.0),
            high_24h: Some(51000.0),
            low_24h: Some(49000.0),
            volume_24h: Some(1_000_000.0),
            quote_volume_24h: Some(50_000_000.0),
            best_bid: Some(50000.0),
            best_ask: Some(50010.0),
        };
        let trade = Trade {
            price: 50005.0,
            quantity: 0.5,
            quote_quantity: Some(25002.5),
            timestamp_ms: 1700000000000,
            side: TradeSide::Buy,
            id: None,
        };
        let full_snapshot = MarketSnapshot {
            order_book: Some(order_book),
            ticker: Some(ticker),
            recent_trades: Some(vec![trade]),
        };

        let mut frame = json!({"type": "update"});
        attach_exchange_data(&mut frame, &full_snapshot);

        for key in EXCHANGE_KEYS {
            assert!(frame.get(key).is_some());
        }
        assert_eq!(
            frame["exchange_ticker"]["last_price"].as_f64(),
            Some(50005.0)
        );
    }

    // A snapshot with no sections leaves the frame untouched.
    #[test]
    fn test_attach_exchange_data_empty() {
        let empty_snapshot = MarketSnapshot {
            order_book: None,
            ticker: None,
            recent_trades: None,
        };

        let mut frame = json!({"type": "update"});
        attach_exchange_data(&mut frame, &empty_snapshot);

        for key in EXCHANGE_KEYS {
            assert!(frame.get(key).is_none());
        }
    }
}