1use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5use std::time::Duration;
6
7use phoenix_rise::api::{
8 PhoenixClient, PhoenixClientEvent, PhoenixClientSubscriptionHandle, PhoenixHttpClient,
9 PhoenixSubscription, SubscriptionKey,
10};
11use phoenix_rise::types::exchange::ExchangeMarketConfig;
12use phoenix_rise::types::market::{MarketStatsUpdate, MarketStatus};
13use tracing::warn;
14
15pub use crate::tui::MarketInfo;
16use crate::tui::math::pct_change_24h;
17use crate::tui::{
18 MarketListUpdate, MarketStatUpdate, SplineConfig, build_spline_config, compute_price_decimals,
19 establish_rpc_with_fallback, restore_terminal, setup_terminal, spawn_splash,
20 spawn_spline_poller,
21};
22
23const MARKETS_POLL_INTERVAL: Duration = Duration::from_secs(60);
24
25struct MarketSnapshot {
26 price: f64,
27 volume_24h: f64,
28 open_interest_usd: f64,
29 change_24h: f64,
30}
31
32fn compute_change(update: &MarketStatsUpdate) -> f64 {
33 pct_change_24h(update.mark_price, update.prev_day_mark_price)
34}
35
36fn spawn_stat_forwarder(
37 mut rx: tokio::sync::mpsc::UnboundedReceiver<PhoenixClientEvent>,
38 tx: tokio::sync::mpsc::Sender<MarketStatUpdate>,
39) {
40 tokio::spawn(async move {
41 while let Some(event) = rx.recv().await {
42 if let PhoenixClientEvent::MarketUpdate { update, .. } = event {
43 match tx.try_send(update) {
44 Ok(()) => {}
45 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
46 }
49 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => break,
50 }
51 }
52 }
53 });
54}
55
56async fn subscribe_market_stats(
63 ws: &PhoenixClient,
64 symbols: &[String],
65 stat_tx: &tokio::sync::mpsc::Sender<MarketStatUpdate>,
66 timeout: Duration,
67) -> (
68 HashMap<String, MarketSnapshot>,
69 Vec<PhoenixClientSubscriptionHandle>,
70) {
71 let mut handles = Vec::new();
72 let mut initial_rxs = Vec::new();
73
74 for sym in symbols {
75 match ws
76 .subscribe(PhoenixSubscription::Key(SubscriptionKey::market(
77 sym.clone(),
78 )))
79 .await
80 {
81 Ok((rx, handle)) => {
82 initial_rxs.push((sym.clone(), rx));
83 handles.push(handle);
84 }
85 Err(e) => {
86 warn!(symbol = %sym, error = %e, "stats subscribe failed");
87 }
88 }
89 }
90
91 let mut snapshots = HashMap::new();
92 let deadline = tokio::time::Instant::now() + timeout;
93 let mut remaining_rxs = Vec::new();
94
95 for (sym, mut rx) in initial_rxs {
96 let left = deadline.saturating_duration_since(tokio::time::Instant::now());
97 if !left.is_zero()
98 && let Ok(Some(PhoenixClientEvent::MarketUpdate { update, .. })) =
99 tokio::time::timeout(left, rx.recv()).await
100 {
101 snapshots.insert(
102 sym.clone(),
103 MarketSnapshot {
104 price: update.mark_price,
105 volume_24h: update.day_volume_usd,
106 open_interest_usd: update.open_interest * update.mark_price,
107 change_24h: compute_change(&update),
108 },
109 );
110 let _ = stat_tx.try_send(update);
116 }
117 remaining_rxs.push((sym, rx));
118 }
119
120 for (_sym, rx) in remaining_rxs {
121 spawn_stat_forwarder(rx, stat_tx.clone());
122 }
123
124 (snapshots, handles)
125}
126
127fn tradable(m: &ExchangeMarketConfig) -> bool {
128 matches!(
129 m.market_status,
130 MarketStatus::Active | MarketStatus::PostOnly
131 )
132}
133
134fn build_market_infos(
135 tradable_markets: &[&ExchangeMarketConfig],
136 snapshots: &HashMap<String, MarketSnapshot>,
137) -> Vec<MarketInfo> {
138 let mut infos: Vec<MarketInfo> = tradable_markets
139 .iter()
140 .map(|m| {
141 let max_leverage = m
142 .leverage_tiers
143 .first()
144 .map(|t| t.max_leverage)
145 .unwrap_or(1.0);
146 let price_decimals = compute_price_decimals(m.tick_size, m.base_lots_decimals);
147 snapshots.get(&m.symbol).map_or(
148 MarketInfo {
149 symbol: m.symbol.clone(),
150 price: 0.0,
151 volume_24h: 0.0,
152 open_interest_usd: 0.0,
153 max_leverage,
154 change_24h: 0.0,
155 price_decimals,
156 isolated_only: m.isolated_only,
157 price_flash: None,
158 },
159 |snap| MarketInfo {
160 symbol: m.symbol.clone(),
161 price: snap.price,
162 volume_24h: snap.volume_24h,
163 open_interest_usd: snap.open_interest_usd,
164 max_leverage,
165 change_24h: snap.change_24h,
166 price_decimals,
167 isolated_only: m.isolated_only,
168 price_flash: None,
169 },
170 )
171 })
172 .collect();
173
174 infos.sort_by(|a, b| {
175 b.volume_24h
176 .partial_cmp(&a.volume_24h)
177 .unwrap_or(std::cmp::Ordering::Equal)
178 });
179
180 infos
181}
182
183fn build_spline_configs(
184 tradable_markets: &[&ExchangeMarketConfig],
185) -> HashMap<String, SplineConfig> {
186 let mut out = HashMap::new();
187 for m in tradable_markets {
188 match build_spline_config(m) {
189 Ok(cfg) => {
190 out.insert(m.symbol.clone(), cfg);
191 }
192 Err(e) => warn!(symbol = %m.symbol, error = %e, "failed to build spline config"),
193 }
194 }
195 out
196}
197
198struct LoadedSetup {
199 http: Arc<PhoenixHttpClient>,
200 ws: Arc<PhoenixClient>,
201 market_infos: Vec<MarketInfo>,
202 spline_configs: HashMap<String, SplineConfig>,
203 symbols: Vec<String>,
204 stat_tx: tokio::sync::mpsc::Sender<MarketStatUpdate>,
205 stat_rx: tokio::sync::mpsc::Receiver<MarketStatUpdate>,
206 stat_handles: Vec<PhoenixClientSubscriptionHandle>,
210}
211
212async fn load_setup() -> Result<LoadedSetup, Box<dyn std::error::Error>> {
213 establish_rpc_with_fallback().await;
214
215 let http = Arc::new(PhoenixHttpClient::new_from_env()?);
216 let ws = Arc::new(PhoenixClient::new_from_env().await?);
217
218 let markets = http.markets().get_markets().await?;
219 let tradable_markets: Vec<_> = markets.iter().filter(|m| tradable(m)).collect();
220 let symbols: Vec<String> = tradable_markets.iter().map(|m| m.symbol.clone()).collect();
221
222 let (stat_tx, stat_rx) = tokio::sync::mpsc::channel::<MarketStatUpdate>(128);
224 let (snapshots, stat_handles) =
225 subscribe_market_stats(&ws, &symbols, &stat_tx, Duration::from_secs(3)).await;
226
227 let market_infos = build_market_infos(&tradable_markets, &snapshots);
228 let spline_configs = build_spline_configs(&tradable_markets);
229
230 Ok(LoadedSetup {
231 http,
232 ws,
233 market_infos,
234 spline_configs,
235 symbols,
236 stat_tx,
237 stat_rx,
238 stat_handles,
239 })
240}
241
242pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
243 crate::tui::spawn_auto_priority_fee_refresh();
246
247 let terminal = setup_terminal()?;
250 let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
251 let splash = spawn_splash(terminal, stop_rx);
252
253 let setup_result = load_setup().await;
254
255 let _ = stop_tx.send(());
257 let mut terminal = match splash.await {
258 Ok(t) => t,
259 Err(e) => {
260 crate::tui::cleanup_terminal();
261 return Err(e.into());
262 }
263 };
264
265 let LoadedSetup {
266 http,
267 ws,
268 market_infos,
269 spline_configs,
270 symbols,
271 stat_tx,
272 stat_rx,
273 stat_handles: _stat_handles,
274 } = match setup_result {
275 Ok(s) => s,
276 Err(e) => {
277 restore_terminal(&mut terminal);
278 return Err(e);
279 }
280 };
281
282 let (market_tx, market_rx) = tokio::sync::mpsc::channel::<MarketListUpdate>(16);
283
284 let tui_task = spawn_spline_poller(
285 terminal,
286 &ws,
287 market_infos,
288 spline_configs,
289 market_rx,
290 stat_rx,
291 Arc::clone(&http),
292 )
293 .await?;
294
295 let ws_poll = Arc::clone(&ws);
296 let stat_tx_poll = stat_tx.clone();
297 tokio::spawn(async move {
298 let mut known: HashSet<String> = symbols.into_iter().collect();
299 let mut _poll_handles: Vec<PhoenixClientSubscriptionHandle> = Vec::new();
300 let mut interval = tokio::time::interval(MARKETS_POLL_INTERVAL);
301 interval.tick().await;
302 loop {
303 interval.tick().await;
304 let list = match http.markets().get_markets().await {
305 Ok(v) => v,
306 Err(e) => {
307 warn!(error = %e, "poll markets().get_markets failed");
308 continue;
309 }
310 };
311 let tradable: Vec<_> = list.iter().filter(|m| tradable(m)).collect();
312 let mut new_markets = Vec::new();
313 let mut new_configs = HashMap::new();
314 let mut new_symbols = Vec::new();
315
316 for m in &tradable {
317 if known.contains(&m.symbol) {
318 continue;
319 }
320 known.insert(m.symbol.clone());
321 new_symbols.push(m.symbol.clone());
322
323 let max_leverage = m
324 .leverage_tiers
325 .first()
326 .map(|t| t.max_leverage)
327 .unwrap_or(1.0);
328 new_markets.push(MarketInfo {
329 symbol: m.symbol.clone(),
330 price: 0.0,
331 volume_24h: 0.0,
332 open_interest_usd: 0.0,
333 max_leverage,
334 change_24h: 0.0,
335 price_decimals: compute_price_decimals(m.tick_size, m.base_lots_decimals),
336 isolated_only: m.isolated_only,
337 price_flash: None,
338 });
339
340 if let Ok(cfg) = build_spline_config(m) {
341 new_configs.insert(m.symbol.clone(), cfg);
342 }
343 }
344
345 if !new_symbols.is_empty() {
346 let (_, new_handles) = subscribe_market_stats(
347 &ws_poll,
348 &new_symbols,
349 &stat_tx_poll,
350 Duration::from_secs(5),
351 )
352 .await;
353 _poll_handles.extend(new_handles);
354
355 if market_tx
356 .send(MarketListUpdate {
357 markets: new_markets,
358 configs: new_configs,
359 })
360 .await
361 .is_err()
362 {
363 break;
364 }
365 }
366 }
367 });
368
369 tokio::select! {
370 sig = tokio::signal::ctrl_c() => {
371 sig?;
372 }
373 res = tui_task => {
374 if let Err(e) = res {
375 warn!(error = %e, "tui task ended with join error");
376 }
377 }
378 }
379 crate::tui::cleanup_terminal();
380 Ok(())
381}