1use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5use std::time::Duration;
6
7use phoenix_rise::{
8 PhoenixClient, PhoenixClientEvent, PhoenixClientSubscriptionHandle, PhoenixHttpClient,
9 PhoenixSubscription, SubscriptionKey,
10};
11use tracing::warn;
12
13use crate::tui::math::pct_change_24h;
14pub use crate::tui::MarketInfo;
15use crate::tui::{
16 build_spline_config, compute_price_decimals, restore_terminal, setup_terminal, spawn_splash,
17 spawn_spline_poller, MarketListUpdate, MarketStatUpdate, SplineConfig,
18};
19
/// Period of the background task in `run` that re-fetches the market list to
/// detect markets it has not seen before.
const MARKETS_POLL_INTERVAL: Duration = Duration::from_secs(60);
21
/// First stats reading captured for a market right after subscribing.
///
/// All fields are plain `f64`s, so the type is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq)]
struct MarketSnapshot {
    /// Mark price taken from the stats update.
    price: f64,
    /// Day volume in USD as reported by the stats update.
    volume_24h: f64,
    /// Open interest multiplied by the mark price.
    open_interest_usd: f64,
    /// Change of the mark price versus the previous day's mark price, as computed
    /// by `pct_change_24h`.
    change_24h: f64,
}
28
29fn compute_change(update: &phoenix_rise::MarketStatsUpdate) -> f64 {
30 pct_change_24h(update.mark_price, update.prev_day_mark_price)
31}
32
33fn spawn_stat_forwarder(
34 mut rx: tokio::sync::mpsc::UnboundedReceiver<PhoenixClientEvent>,
35 tx: tokio::sync::mpsc::Sender<MarketStatUpdate>,
36) {
37 tokio::spawn(async move {
38 while let Some(event) = rx.recv().await {
39 if let PhoenixClientEvent::MarketUpdate { update, .. } = event {
40 match tx.try_send(update) {
41 Ok(()) => {}
42 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
43 }
46 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => break,
47 }
48 }
49 }
50 });
51}
52
53async fn subscribe_market_stats(
60 ws: &PhoenixClient,
61 symbols: &[String],
62 stat_tx: &tokio::sync::mpsc::Sender<MarketStatUpdate>,
63 timeout: Duration,
64) -> (
65 HashMap<String, MarketSnapshot>,
66 Vec<PhoenixClientSubscriptionHandle>,
67) {
68 let mut handles = Vec::new();
69 let mut initial_rxs = Vec::new();
70
71 for sym in symbols {
72 match ws
73 .subscribe(PhoenixSubscription::Key(SubscriptionKey::market(
74 sym.clone(),
75 )))
76 .await
77 {
78 Ok((rx, handle)) => {
79 initial_rxs.push((sym.clone(), rx));
80 handles.push(handle);
81 }
82 Err(e) => {
83 warn!(symbol = %sym, error = %e, "stats subscribe failed");
84 }
85 }
86 }
87
88 let mut snapshots = HashMap::new();
89 let deadline = tokio::time::Instant::now() + timeout;
90 let mut remaining_rxs = Vec::new();
91
92 for (sym, mut rx) in initial_rxs {
93 let left = deadline.saturating_duration_since(tokio::time::Instant::now());
94 if !left.is_zero() {
95 if let Ok(Some(PhoenixClientEvent::MarketUpdate { update, .. })) =
96 tokio::time::timeout(left, rx.recv()).await
97 {
98 snapshots.insert(
99 sym.clone(),
100 MarketSnapshot {
101 price: update.mark_price,
102 volume_24h: update.day_volume_usd,
103 open_interest_usd: update.open_interest * update.mark_price,
104 change_24h: compute_change(&update),
105 },
106 );
107 let _ = stat_tx.try_send(update);
113 }
114 }
115 remaining_rxs.push((sym, rx));
116 }
117
118 for (_sym, rx) in remaining_rxs {
119 spawn_stat_forwarder(rx, stat_tx.clone());
120 }
121
122 (snapshots, handles)
123}
124
125fn tradable(m: &phoenix_rise::ExchangeMarketConfig) -> bool {
126 matches!(
127 m.market_status,
128 phoenix_rise::types::MarketStatus::Active | phoenix_rise::types::MarketStatus::PostOnly
129 )
130}
131
132fn build_market_infos(
133 tradable_markets: &[&phoenix_rise::ExchangeMarketConfig],
134 snapshots: &HashMap<String, MarketSnapshot>,
135) -> Vec<MarketInfo> {
136 let mut infos: Vec<MarketInfo> = tradable_markets
137 .iter()
138 .map(|m| {
139 let max_leverage = m
140 .leverage_tiers
141 .first()
142 .map(|t| t.max_leverage)
143 .unwrap_or(1.0);
144 let price_decimals = compute_price_decimals(m.tick_size, m.base_lots_decimals);
145 snapshots.get(&m.symbol).map_or(
146 MarketInfo {
147 symbol: m.symbol.clone(),
148 price: 0.0,
149 volume_24h: 0.0,
150 open_interest_usd: 0.0,
151 max_leverage,
152 change_24h: 0.0,
153 price_decimals,
154 isolated_only: m.isolated_only,
155 },
156 |snap| MarketInfo {
157 symbol: m.symbol.clone(),
158 price: snap.price,
159 volume_24h: snap.volume_24h,
160 open_interest_usd: snap.open_interest_usd,
161 max_leverage,
162 change_24h: snap.change_24h,
163 price_decimals,
164 isolated_only: m.isolated_only,
165 },
166 )
167 })
168 .collect();
169
170 infos.sort_by(|a, b| {
171 b.volume_24h
172 .partial_cmp(&a.volume_24h)
173 .unwrap_or(std::cmp::Ordering::Equal)
174 });
175
176 infos
177}
178
179fn build_spline_configs(
180 tradable_markets: &[&phoenix_rise::ExchangeMarketConfig],
181) -> HashMap<String, SplineConfig> {
182 let mut out = HashMap::new();
183 for m in tradable_markets {
184 match build_spline_config(m) {
185 Ok(cfg) => {
186 out.insert(m.symbol.clone(), cfg);
187 }
188 Err(e) => warn!(symbol = %m.symbol, error = %e, "failed to build spline config"),
189 }
190 }
191 out
192}
193
/// Everything `load_setup` prepares before the main TUI starts.
struct LoadedSetup {
    /// HTTP client created from the environment; used to fetch the market list.
    http: Arc<PhoenixHttpClient>,
    /// Streaming client created from the environment; used for market subscriptions.
    ws: Arc<PhoenixClient>,
    /// Rows for the tradable markets, as produced by `build_market_infos`.
    market_infos: Vec<MarketInfo>,
    /// Spline configuration per symbol, as produced by `build_spline_configs`.
    spline_configs: HashMap<String, SplineConfig>,
    /// Symbols of all tradable markets found at load time.
    symbols: Vec<String>,
    /// Sending side of the market-stats channel (cloned for each forwarder task).
    stat_tx: tokio::sync::mpsc::Sender<MarketStatUpdate>,
    /// Receiving side of the market-stats channel.
    stat_rx: tokio::sync::mpsc::Receiver<MarketStatUpdate>,
    /// Handles of the initial stats subscriptions.
    // NOTE(review): presumably these must stay alive to keep the subscriptions
    // open — confirm against the phoenix_rise handle semantics.
    stat_handles: Vec<PhoenixClientSubscriptionHandle>,
}
207
208async fn load_setup() -> Result<LoadedSetup, Box<dyn std::error::Error>> {
209 let http = Arc::new(PhoenixHttpClient::new_from_env()?);
210 let ws = Arc::new(PhoenixClient::new_from_env().await?);
211
212 let markets = http.get_markets().await?;
213 let tradable_markets: Vec<_> = markets.iter().filter(|m| tradable(m)).collect();
214 let symbols: Vec<String> = tradable_markets.iter().map(|m| m.symbol.clone()).collect();
215
216 let (stat_tx, stat_rx) = tokio::sync::mpsc::channel::<MarketStatUpdate>(128);
218 let (snapshots, stat_handles) =
219 subscribe_market_stats(&ws, &symbols, &stat_tx, Duration::from_secs(3)).await;
220
221 let market_infos = build_market_infos(&tradable_markets, &snapshots);
222 let spline_configs = build_spline_configs(&tradable_markets);
223
224 Ok(LoadedSetup {
225 http,
226 ws,
227 market_infos,
228 spline_configs,
229 symbols,
230 stat_tx,
231 stat_rx,
232 stat_handles,
233 })
234}
235
236pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
237 crate::tui::spawn_auto_priority_fee_refresh();
240
241 let terminal = setup_terminal()?;
244 let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
245 let splash = spawn_splash(terminal, stop_rx);
246
247 let setup_result = load_setup().await;
248
249 let _ = stop_tx.send(());
251 let mut terminal = match splash.await {
252 Ok(t) => t,
253 Err(e) => {
254 crate::tui::cleanup_terminal();
255 return Err(e.into());
256 }
257 };
258
259 let LoadedSetup {
260 http,
261 ws,
262 market_infos,
263 spline_configs,
264 symbols,
265 stat_tx,
266 stat_rx,
267 stat_handles: _stat_handles,
268 } = match setup_result {
269 Ok(s) => s,
270 Err(e) => {
271 restore_terminal(&mut terminal);
272 return Err(e);
273 }
274 };
275
276 let (market_tx, market_rx) = tokio::sync::mpsc::channel::<MarketListUpdate>(16);
277
278 let tui_task = spawn_spline_poller(
279 terminal,
280 &ws,
281 market_infos,
282 spline_configs,
283 market_rx,
284 stat_rx,
285 Arc::clone(&http),
286 )
287 .await?;
288
289 let ws_poll = Arc::clone(&ws);
290 let stat_tx_poll = stat_tx.clone();
291 tokio::spawn(async move {
292 let mut known: HashSet<String> = symbols.into_iter().collect();
293 let mut _poll_handles: Vec<PhoenixClientSubscriptionHandle> = Vec::new();
294 let mut interval = tokio::time::interval(MARKETS_POLL_INTERVAL);
295 interval.tick().await;
296 loop {
297 interval.tick().await;
298 let list = match http.get_markets().await {
299 Ok(v) => v,
300 Err(e) => {
301 warn!(error = %e, "poll get_markets failed");
302 continue;
303 }
304 };
305 let tradable: Vec<_> = list.iter().filter(|m| tradable(m)).collect();
306 let mut new_markets = Vec::new();
307 let mut new_configs = HashMap::new();
308 let mut new_symbols = Vec::new();
309
310 for m in &tradable {
311 if known.contains(&m.symbol) {
312 continue;
313 }
314 known.insert(m.symbol.clone());
315 new_symbols.push(m.symbol.clone());
316
317 let max_leverage = m
318 .leverage_tiers
319 .first()
320 .map(|t| t.max_leverage)
321 .unwrap_or(1.0);
322 new_markets.push(MarketInfo {
323 symbol: m.symbol.clone(),
324 price: 0.0,
325 volume_24h: 0.0,
326 open_interest_usd: 0.0,
327 max_leverage,
328 change_24h: 0.0,
329 price_decimals: compute_price_decimals(m.tick_size, m.base_lots_decimals),
330 isolated_only: m.isolated_only,
331 });
332
333 if let Ok(cfg) = build_spline_config(m) {
334 new_configs.insert(m.symbol.clone(), cfg);
335 }
336 }
337
338 if !new_symbols.is_empty() {
339 let (_, new_handles) = subscribe_market_stats(
340 &ws_poll,
341 &new_symbols,
342 &stat_tx_poll,
343 Duration::from_secs(5),
344 )
345 .await;
346 _poll_handles.extend(new_handles);
347
348 if market_tx
349 .send(MarketListUpdate {
350 markets: new_markets,
351 configs: new_configs,
352 })
353 .await
354 .is_err()
355 {
356 break;
357 }
358 }
359 }
360 });
361
362 tokio::select! {
363 sig = tokio::signal::ctrl_c() => {
364 sig?;
365 }
366 res = tui_task => {
367 if let Err(e) = res {
368 warn!(error = %e, "tui task ended with join error");
369 }
370 }
371 }
372 crate::tui::cleanup_terminal();
373 Ok(())
374}