Skip to main content

cinder/
app.rs

1//! Wire HTTP, WebSocket, and the TUI.
2
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5use std::time::Duration;
6
7use phoenix_rise::api::{
8    PhoenixClient, PhoenixClientEvent, PhoenixClientSubscriptionHandle, PhoenixHttpClient,
9    PhoenixSubscription, SubscriptionKey,
10};
11use phoenix_rise::types::exchange::ExchangeMarketConfig;
12use phoenix_rise::types::market::{MarketStatsUpdate, MarketStatus};
13use tracing::warn;
14
15pub use crate::tui::MarketInfo;
16use crate::tui::math::pct_change_24h;
17use crate::tui::{
18    MarketListUpdate, MarketStatUpdate, SplineConfig, build_spline_config, compute_price_decimals,
19    establish_rpc_with_fallback, restore_terminal, setup_terminal, spawn_splash,
20    spawn_spline_poller,
21};
22
23const MARKETS_POLL_INTERVAL: Duration = Duration::from_secs(60);
24
25struct MarketSnapshot {
26    price: f64,
27    volume_24h: f64,
28    open_interest_usd: f64,
29    change_24h: f64,
30}
31
32fn compute_change(update: &MarketStatsUpdate) -> f64 {
33    pct_change_24h(update.mark_price, update.prev_day_mark_price)
34}
35
36fn spawn_stat_forwarder(
37    mut rx: tokio::sync::mpsc::UnboundedReceiver<PhoenixClientEvent>,
38    tx: tokio::sync::mpsc::Sender<MarketStatUpdate>,
39) {
40    tokio::spawn(async move {
41        while let Some(event) = rx.recv().await {
42            if let PhoenixClientEvent::MarketUpdate { update, .. } = event {
43                match tx.try_send(update) {
44                    Ok(()) => {}
45                    Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
46                        // Consumer behind: drop updates instead of blocking
47                        // Phoenix recv loops indefinitely.
48                    }
49                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => break,
50                }
51            }
52        }
53    });
54}
55
56/// Subscribe to market stats for a set of symbols. Waits up to `timeout` for an
57/// initial snapshot per symbol, then spawns persistent forwarder tasks that
58/// pipe all subsequent updates into `stat_tx`.
59///
60/// Returns collected snapshots and the subscription handles (must be kept
61/// alive).
62async fn subscribe_market_stats(
63    ws: &PhoenixClient,
64    symbols: &[String],
65    stat_tx: &tokio::sync::mpsc::Sender<MarketStatUpdate>,
66    timeout: Duration,
67) -> (
68    HashMap<String, MarketSnapshot>,
69    Vec<PhoenixClientSubscriptionHandle>,
70) {
71    let mut handles = Vec::new();
72    let mut initial_rxs = Vec::new();
73
74    for sym in symbols {
75        match ws
76            .subscribe(PhoenixSubscription::Key(SubscriptionKey::market(
77                sym.clone(),
78            )))
79            .await
80        {
81            Ok((rx, handle)) => {
82                initial_rxs.push((sym.clone(), rx));
83                handles.push(handle);
84            }
85            Err(e) => {
86                warn!(symbol = %sym, error = %e, "stats subscribe failed");
87            }
88        }
89    }
90
91    let mut snapshots = HashMap::new();
92    let deadline = tokio::time::Instant::now() + timeout;
93    let mut remaining_rxs = Vec::new();
94
95    for (sym, mut rx) in initial_rxs {
96        let left = deadline.saturating_duration_since(tokio::time::Instant::now());
97        if !left.is_zero()
98            && let Ok(Some(PhoenixClientEvent::MarketUpdate { update, .. })) =
99                tokio::time::timeout(left, rx.recv()).await
100        {
101            snapshots.insert(
102                sym.clone(),
103                MarketSnapshot {
104                    price: update.mark_price,
105                    volume_24h: update.day_volume_usd,
106                    open_interest_usd: update.open_interest * update.mark_price,
107                    change_24h: compute_change(&update),
108                },
109            );
110            // Also forward the captured update into the runtime channel so
111            // the TUI's per-symbol stats cache is hot before the first
112            // frame renders. Without this, the active market's header
113            // briefly shows "Waiting for market data…" until Phoenix's
114            // next periodic push (which can be several seconds out).
115            let _ = stat_tx.try_send(update);
116        }
117        remaining_rxs.push((sym, rx));
118    }
119
120    for (_sym, rx) in remaining_rxs {
121        spawn_stat_forwarder(rx, stat_tx.clone());
122    }
123
124    (snapshots, handles)
125}
126
127fn tradable(m: &ExchangeMarketConfig) -> bool {
128    matches!(
129        m.market_status,
130        MarketStatus::Active | MarketStatus::PostOnly
131    )
132}
133
134fn build_market_infos(
135    tradable_markets: &[&ExchangeMarketConfig],
136    snapshots: &HashMap<String, MarketSnapshot>,
137) -> Vec<MarketInfo> {
138    let mut infos: Vec<MarketInfo> = tradable_markets
139        .iter()
140        .map(|m| {
141            let max_leverage = m
142                .leverage_tiers
143                .first()
144                .map(|t| t.max_leverage)
145                .unwrap_or(1.0);
146            let price_decimals = compute_price_decimals(m.tick_size, m.base_lots_decimals);
147            snapshots.get(&m.symbol).map_or(
148                MarketInfo {
149                    symbol: m.symbol.clone(),
150                    price: 0.0,
151                    volume_24h: 0.0,
152                    open_interest_usd: 0.0,
153                    max_leverage,
154                    change_24h: 0.0,
155                    price_decimals,
156                    isolated_only: m.isolated_only,
157                    price_flash: None,
158                },
159                |snap| MarketInfo {
160                    symbol: m.symbol.clone(),
161                    price: snap.price,
162                    volume_24h: snap.volume_24h,
163                    open_interest_usd: snap.open_interest_usd,
164                    max_leverage,
165                    change_24h: snap.change_24h,
166                    price_decimals,
167                    isolated_only: m.isolated_only,
168                    price_flash: None,
169                },
170            )
171        })
172        .collect();
173
174    infos.sort_by(|a, b| {
175        b.volume_24h
176            .partial_cmp(&a.volume_24h)
177            .unwrap_or(std::cmp::Ordering::Equal)
178    });
179
180    infos
181}
182
183fn build_spline_configs(
184    tradable_markets: &[&ExchangeMarketConfig],
185) -> HashMap<String, SplineConfig> {
186    let mut out = HashMap::new();
187    for m in tradable_markets {
188        match build_spline_config(m) {
189            Ok(cfg) => {
190                out.insert(m.symbol.clone(), cfg);
191            }
192            Err(e) => warn!(symbol = %m.symbol, error = %e, "failed to build spline config"),
193        }
194    }
195    out
196}
197
198struct LoadedSetup {
199    http: Arc<PhoenixHttpClient>,
200    ws: Arc<PhoenixClient>,
201    market_infos: Vec<MarketInfo>,
202    spline_configs: HashMap<String, SplineConfig>,
203    symbols: Vec<String>,
204    stat_tx: tokio::sync::mpsc::Sender<MarketStatUpdate>,
205    stat_rx: tokio::sync::mpsc::Receiver<MarketStatUpdate>,
206    // Dropping these unsubscribes the per-market stats streams. Must outlive
207    // `run()` — otherwise the order-book header is stuck on "Waiting for
208    // market data…" because no stat updates ever flow.
209    stat_handles: Vec<PhoenixClientSubscriptionHandle>,
210}
211
212async fn load_setup() -> Result<LoadedSetup, Box<dyn std::error::Error>> {
213    establish_rpc_with_fallback().await;
214
215    let http = Arc::new(PhoenixHttpClient::new_from_env()?);
216    let ws = Arc::new(PhoenixClient::new_from_env().await?);
217
218    let markets = http.markets().get_markets().await?;
219    let tradable_markets: Vec<_> = markets.iter().filter(|m| tradable(m)).collect();
220    let symbols: Vec<String> = tradable_markets.iter().map(|m| m.symbol.clone()).collect();
221
222    // Bounded buffer; 512 × stat payload was unused headroom on typical machines.
223    let (stat_tx, stat_rx) = tokio::sync::mpsc::channel::<MarketStatUpdate>(128);
224    let (snapshots, stat_handles) =
225        subscribe_market_stats(&ws, &symbols, &stat_tx, Duration::from_secs(3)).await;
226
227    let market_infos = build_market_infos(&tradable_markets, &snapshots);
228    let spline_configs = build_spline_configs(&tradable_markets);
229
230    Ok(LoadedSetup {
231        http,
232        ws,
233        market_infos,
234        spline_configs,
235        symbols,
236        stat_tx,
237        stat_rx,
238        stat_handles,
239    })
240}
241
242pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
243    // Kick off the auto priority-fee refresh task; runs in the background for
244    // the lifetime of the process and seeds the dynamic CU-price default.
245    crate::tui::spawn_auto_priority_fee_refresh();
246
247    // Bring up the alt-screen terminal up-front so the splash can paint over
248    // the otherwise blank startup window.
249    let terminal = setup_terminal()?;
250    let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
251    let splash = spawn_splash(terminal, stop_rx);
252
253    let setup_result = load_setup().await;
254
255    // Stop the splash and reclaim the terminal regardless of outcome.
256    let _ = stop_tx.send(());
257    let mut terminal = match splash.await {
258        Ok(t) => t,
259        Err(e) => {
260            crate::tui::cleanup_terminal();
261            return Err(e.into());
262        }
263    };
264
265    let LoadedSetup {
266        http,
267        ws,
268        market_infos,
269        spline_configs,
270        symbols,
271        stat_tx,
272        stat_rx,
273        stat_handles: _stat_handles,
274    } = match setup_result {
275        Ok(s) => s,
276        Err(e) => {
277            restore_terminal(&mut terminal);
278            return Err(e);
279        }
280    };
281
282    let (market_tx, market_rx) = tokio::sync::mpsc::channel::<MarketListUpdate>(16);
283
284    let tui_task = spawn_spline_poller(
285        terminal,
286        &ws,
287        market_infos,
288        spline_configs,
289        market_rx,
290        stat_rx,
291        Arc::clone(&http),
292    )
293    .await?;
294
295    let ws_poll = Arc::clone(&ws);
296    let stat_tx_poll = stat_tx.clone();
297    tokio::spawn(async move {
298        let mut known: HashSet<String> = symbols.into_iter().collect();
299        let mut _poll_handles: Vec<PhoenixClientSubscriptionHandle> = Vec::new();
300        let mut interval = tokio::time::interval(MARKETS_POLL_INTERVAL);
301        interval.tick().await;
302        loop {
303            interval.tick().await;
304            let list = match http.markets().get_markets().await {
305                Ok(v) => v,
306                Err(e) => {
307                    warn!(error = %e, "poll markets().get_markets failed");
308                    continue;
309                }
310            };
311            let tradable: Vec<_> = list.iter().filter(|m| tradable(m)).collect();
312            let mut new_markets = Vec::new();
313            let mut new_configs = HashMap::new();
314            let mut new_symbols = Vec::new();
315
316            for m in &tradable {
317                if known.contains(&m.symbol) {
318                    continue;
319                }
320                known.insert(m.symbol.clone());
321                new_symbols.push(m.symbol.clone());
322
323                let max_leverage = m
324                    .leverage_tiers
325                    .first()
326                    .map(|t| t.max_leverage)
327                    .unwrap_or(1.0);
328                new_markets.push(MarketInfo {
329                    symbol: m.symbol.clone(),
330                    price: 0.0,
331                    volume_24h: 0.0,
332                    open_interest_usd: 0.0,
333                    max_leverage,
334                    change_24h: 0.0,
335                    price_decimals: compute_price_decimals(m.tick_size, m.base_lots_decimals),
336                    isolated_only: m.isolated_only,
337                    price_flash: None,
338                });
339
340                if let Ok(cfg) = build_spline_config(m) {
341                    new_configs.insert(m.symbol.clone(), cfg);
342                }
343            }
344
345            if !new_symbols.is_empty() {
346                let (_, new_handles) = subscribe_market_stats(
347                    &ws_poll,
348                    &new_symbols,
349                    &stat_tx_poll,
350                    Duration::from_secs(5),
351                )
352                .await;
353                _poll_handles.extend(new_handles);
354
355                if market_tx
356                    .send(MarketListUpdate {
357                        markets: new_markets,
358                        configs: new_configs,
359                    })
360                    .await
361                    .is_err()
362                {
363                    break;
364                }
365            }
366        }
367    });
368
369    tokio::select! {
370        sig = tokio::signal::ctrl_c() => {
371            sig?;
372        }
373        res = tui_task => {
374            if let Err(e) = res {
375                warn!(error = %e, "tui task ended with join error");
376            }
377        }
378    }
379    crate::tui::cleanup_terminal();
380    Ok(())
381}