Skip to main content

cinder/
app.rs

1//! Wire HTTP, WebSocket, and the TUI.
2
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5use std::time::Duration;
6
7use phoenix_rise::{
8    PhoenixClient, PhoenixClientEvent, PhoenixClientSubscriptionHandle, PhoenixHttpClient,
9    PhoenixSubscription, SubscriptionKey,
10};
11use tracing::warn;
12
13use crate::tui::math::pct_change_24h;
14pub use crate::tui::MarketInfo;
15use crate::tui::{
16    build_spline_config, compute_price_decimals, restore_terminal, setup_terminal, spawn_splash,
17    spawn_spline_poller, MarketListUpdate, MarketStatUpdate, SplineConfig,
18};
19
20const MARKETS_POLL_INTERVAL: Duration = Duration::from_secs(60);
21
22struct MarketSnapshot {
23    price: f64,
24    volume_24h: f64,
25    open_interest_usd: f64,
26    change_24h: f64,
27}
28
29fn compute_change(update: &phoenix_rise::MarketStatsUpdate) -> f64 {
30    pct_change_24h(update.mark_price, update.prev_day_mark_price)
31}
32
33fn spawn_stat_forwarder(
34    mut rx: tokio::sync::mpsc::UnboundedReceiver<PhoenixClientEvent>,
35    tx: tokio::sync::mpsc::Sender<MarketStatUpdate>,
36) {
37    tokio::spawn(async move {
38        while let Some(event) = rx.recv().await {
39            if let PhoenixClientEvent::MarketUpdate { update, .. } = event {
40                match tx.try_send(update) {
41                    Ok(()) => {}
42                    Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
43                        // Consumer behind: drop updates instead of blocking
44                        // Phoenix recv loops indefinitely.
45                    }
46                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => break,
47                }
48            }
49        }
50    });
51}
52
53/// Subscribe to market stats for a set of symbols. Waits up to `timeout` for an
54/// initial snapshot per symbol, then spawns persistent forwarder tasks that
55/// pipe all subsequent updates into `stat_tx`.
56///
57/// Returns collected snapshots and the subscription handles (must be kept
58/// alive).
59async fn subscribe_market_stats(
60    ws: &PhoenixClient,
61    symbols: &[String],
62    stat_tx: &tokio::sync::mpsc::Sender<MarketStatUpdate>,
63    timeout: Duration,
64) -> (
65    HashMap<String, MarketSnapshot>,
66    Vec<PhoenixClientSubscriptionHandle>,
67) {
68    let mut handles = Vec::new();
69    let mut initial_rxs = Vec::new();
70
71    for sym in symbols {
72        match ws
73            .subscribe(PhoenixSubscription::Key(SubscriptionKey::market(
74                sym.clone(),
75            )))
76            .await
77        {
78            Ok((rx, handle)) => {
79                initial_rxs.push((sym.clone(), rx));
80                handles.push(handle);
81            }
82            Err(e) => {
83                warn!(symbol = %sym, error = %e, "stats subscribe failed");
84            }
85        }
86    }
87
88    let mut snapshots = HashMap::new();
89    let deadline = tokio::time::Instant::now() + timeout;
90    let mut remaining_rxs = Vec::new();
91
92    for (sym, mut rx) in initial_rxs {
93        let left = deadline.saturating_duration_since(tokio::time::Instant::now());
94        if !left.is_zero() {
95            if let Ok(Some(PhoenixClientEvent::MarketUpdate { update, .. })) =
96                tokio::time::timeout(left, rx.recv()).await
97            {
98                snapshots.insert(
99                    sym.clone(),
100                    MarketSnapshot {
101                        price: update.mark_price,
102                        volume_24h: update.day_volume_usd,
103                        open_interest_usd: update.open_interest * update.mark_price,
104                        change_24h: compute_change(&update),
105                    },
106                );
107                // Also forward the captured update into the runtime channel so
108                // the TUI's per-symbol stats cache is hot before the first
109                // frame renders. Without this, the active market's header
110                // briefly shows "Waiting for market data…" until Phoenix's
111                // next periodic push (which can be several seconds out).
112                let _ = stat_tx.try_send(update);
113            }
114        }
115        remaining_rxs.push((sym, rx));
116    }
117
118    for (_sym, rx) in remaining_rxs {
119        spawn_stat_forwarder(rx, stat_tx.clone());
120    }
121
122    (snapshots, handles)
123}
124
125fn tradable(m: &phoenix_rise::ExchangeMarketConfig) -> bool {
126    matches!(
127        m.market_status,
128        phoenix_rise::types::MarketStatus::Active | phoenix_rise::types::MarketStatus::PostOnly
129    )
130}
131
132fn build_market_infos(
133    tradable_markets: &[&phoenix_rise::ExchangeMarketConfig],
134    snapshots: &HashMap<String, MarketSnapshot>,
135) -> Vec<MarketInfo> {
136    let mut infos: Vec<MarketInfo> = tradable_markets
137        .iter()
138        .map(|m| {
139            let max_leverage = m
140                .leverage_tiers
141                .first()
142                .map(|t| t.max_leverage)
143                .unwrap_or(1.0);
144            let price_decimals = compute_price_decimals(m.tick_size, m.base_lots_decimals);
145            snapshots.get(&m.symbol).map_or(
146                MarketInfo {
147                    symbol: m.symbol.clone(),
148                    price: 0.0,
149                    volume_24h: 0.0,
150                    open_interest_usd: 0.0,
151                    max_leverage,
152                    change_24h: 0.0,
153                    price_decimals,
154                    isolated_only: m.isolated_only,
155                },
156                |snap| MarketInfo {
157                    symbol: m.symbol.clone(),
158                    price: snap.price,
159                    volume_24h: snap.volume_24h,
160                    open_interest_usd: snap.open_interest_usd,
161                    max_leverage,
162                    change_24h: snap.change_24h,
163                    price_decimals,
164                    isolated_only: m.isolated_only,
165                },
166            )
167        })
168        .collect();
169
170    infos.sort_by(|a, b| {
171        b.volume_24h
172            .partial_cmp(&a.volume_24h)
173            .unwrap_or(std::cmp::Ordering::Equal)
174    });
175
176    infos
177}
178
179fn build_spline_configs(
180    tradable_markets: &[&phoenix_rise::ExchangeMarketConfig],
181) -> HashMap<String, SplineConfig> {
182    let mut out = HashMap::new();
183    for m in tradable_markets {
184        match build_spline_config(m) {
185            Ok(cfg) => {
186                out.insert(m.symbol.clone(), cfg);
187            }
188            Err(e) => warn!(symbol = %m.symbol, error = %e, "failed to build spline config"),
189        }
190    }
191    out
192}
193
194struct LoadedSetup {
195    http: Arc<PhoenixHttpClient>,
196    ws: Arc<PhoenixClient>,
197    market_infos: Vec<MarketInfo>,
198    spline_configs: HashMap<String, SplineConfig>,
199    symbols: Vec<String>,
200    stat_tx: tokio::sync::mpsc::Sender<MarketStatUpdate>,
201    stat_rx: tokio::sync::mpsc::Receiver<MarketStatUpdate>,
202    // Dropping these unsubscribes the per-market stats streams. Must outlive
203    // `run()` — otherwise the order-book header is stuck on "Waiting for
204    // market data…" because no stat updates ever flow.
205    stat_handles: Vec<PhoenixClientSubscriptionHandle>,
206}
207
208async fn load_setup() -> Result<LoadedSetup, Box<dyn std::error::Error>> {
209    let http = Arc::new(PhoenixHttpClient::new_from_env()?);
210    let ws = Arc::new(PhoenixClient::new_from_env().await?);
211
212    let markets = http.get_markets().await?;
213    let tradable_markets: Vec<_> = markets.iter().filter(|m| tradable(m)).collect();
214    let symbols: Vec<String> = tradable_markets.iter().map(|m| m.symbol.clone()).collect();
215
216    // Bounded buffer; 512 × stat payload was unused headroom on typical machines.
217    let (stat_tx, stat_rx) = tokio::sync::mpsc::channel::<MarketStatUpdate>(128);
218    let (snapshots, stat_handles) =
219        subscribe_market_stats(&ws, &symbols, &stat_tx, Duration::from_secs(3)).await;
220
221    let market_infos = build_market_infos(&tradable_markets, &snapshots);
222    let spline_configs = build_spline_configs(&tradable_markets);
223
224    Ok(LoadedSetup {
225        http,
226        ws,
227        market_infos,
228        spline_configs,
229        symbols,
230        stat_tx,
231        stat_rx,
232        stat_handles,
233    })
234}
235
236pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
237    // Kick off the auto priority-fee refresh task; runs in the background for
238    // the lifetime of the process and seeds the dynamic CU-price default.
239    crate::tui::spawn_auto_priority_fee_refresh();
240
241    // Bring up the alt-screen terminal up-front so the splash can paint over
242    // the otherwise blank startup window.
243    let terminal = setup_terminal()?;
244    let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
245    let splash = spawn_splash(terminal, stop_rx);
246
247    let setup_result = load_setup().await;
248
249    // Stop the splash and reclaim the terminal regardless of outcome.
250    let _ = stop_tx.send(());
251    let mut terminal = match splash.await {
252        Ok(t) => t,
253        Err(e) => {
254            crate::tui::cleanup_terminal();
255            return Err(e.into());
256        }
257    };
258
259    let LoadedSetup {
260        http,
261        ws,
262        market_infos,
263        spline_configs,
264        symbols,
265        stat_tx,
266        stat_rx,
267        stat_handles: _stat_handles,
268    } = match setup_result {
269        Ok(s) => s,
270        Err(e) => {
271            restore_terminal(&mut terminal);
272            return Err(e);
273        }
274    };
275
276    let (market_tx, market_rx) = tokio::sync::mpsc::channel::<MarketListUpdate>(16);
277
278    let tui_task = spawn_spline_poller(
279        terminal,
280        &ws,
281        market_infos,
282        spline_configs,
283        market_rx,
284        stat_rx,
285        Arc::clone(&http),
286    )
287    .await?;
288
289    let ws_poll = Arc::clone(&ws);
290    let stat_tx_poll = stat_tx.clone();
291    tokio::spawn(async move {
292        let mut known: HashSet<String> = symbols.into_iter().collect();
293        let mut _poll_handles: Vec<PhoenixClientSubscriptionHandle> = Vec::new();
294        let mut interval = tokio::time::interval(MARKETS_POLL_INTERVAL);
295        interval.tick().await;
296        loop {
297            interval.tick().await;
298            let list = match http.get_markets().await {
299                Ok(v) => v,
300                Err(e) => {
301                    warn!(error = %e, "poll get_markets failed");
302                    continue;
303                }
304            };
305            let tradable: Vec<_> = list.iter().filter(|m| tradable(m)).collect();
306            let mut new_markets = Vec::new();
307            let mut new_configs = HashMap::new();
308            let mut new_symbols = Vec::new();
309
310            for m in &tradable {
311                if known.contains(&m.symbol) {
312                    continue;
313                }
314                known.insert(m.symbol.clone());
315                new_symbols.push(m.symbol.clone());
316
317                let max_leverage = m
318                    .leverage_tiers
319                    .first()
320                    .map(|t| t.max_leverage)
321                    .unwrap_or(1.0);
322                new_markets.push(MarketInfo {
323                    symbol: m.symbol.clone(),
324                    price: 0.0,
325                    volume_24h: 0.0,
326                    open_interest_usd: 0.0,
327                    max_leverage,
328                    change_24h: 0.0,
329                    price_decimals: compute_price_decimals(m.tick_size, m.base_lots_decimals),
330                    isolated_only: m.isolated_only,
331                });
332
333                if let Ok(cfg) = build_spline_config(m) {
334                    new_configs.insert(m.symbol.clone(), cfg);
335                }
336            }
337
338            if !new_symbols.is_empty() {
339                let (_, new_handles) = subscribe_market_stats(
340                    &ws_poll,
341                    &new_symbols,
342                    &stat_tx_poll,
343                    Duration::from_secs(5),
344                )
345                .await;
346                _poll_handles.extend(new_handles);
347
348                if market_tx
349                    .send(MarketListUpdate {
350                        markets: new_markets,
351                        configs: new_configs,
352                    })
353                    .await
354                    .is_err()
355                {
356                    break;
357                }
358            }
359        }
360    });
361
362    tokio::select! {
363        sig = tokio::signal::ctrl_c() => {
364            sig?;
365        }
366        res = tui_task => {
367            if let Err(e) = res {
368                warn!(error = %e, "tui task ended with join error");
369            }
370        }
371    }
372    crate::tui::cleanup_terminal();
373    Ok(())
374}