Skip to main content

indodax_cli/commands/
websocket.rs

1use crate::client::IndodaxClient;
2use crate::commands::helpers;
3use crate::output::{CommandOutput, OutputFormat};
4use anyhow::Result;
5use futures_util::{SinkExt, StreamExt};
6use indicatif::ProgressBar;
7use std::io::IsTerminal;
8use tokio_tungstenite::connect_async;
9use tokio_tungstenite::tungstenite::Message;
10use tracing;
11
12const PUBLIC_WS_URL: &str = "wss://ws3.indodax.com/ws/";
13const PRIVATE_WS_URL: &str = "wss://pws.indodax.com/ws/?cf_ws_frame_ping_pong=true";
14
15async fn fetch_public_ws_token(client: &IndodaxClient) -> Result<String> {
16    helpers::fetch_public_ws_token(client).await
17}
18
19#[derive(Debug, clap::Subcommand)]
20pub enum WebSocketCommand {
21    #[command(name = "ticker", about = "Stream real-time ticker for a pair")]
22    Ticker {
23        #[arg(default_value = "btc_idr")]
24        pair: String,
25    },
26
27    #[command(name = "trades", about = "Stream real-time trades for a pair")]
28    Trades {
29        #[arg(default_value = "btc_idr")]
30        pair: String,
31    },
32
33    #[command(name = "book", about = "Stream real-time order book for a pair")]
34    Book {
35        #[arg(default_value = "btc_idr")]
36        pair: String,
37    },
38
39    #[command(name = "summary", about = "Stream 24h summary for all pairs")]
40    Summary,
41
42    #[command(name = "orders", about = "Stream private order updates")]
43    Orders,
44}
45
46pub async fn execute(
47    client: &IndodaxClient,
48    cmd: &WebSocketCommand,
49    output_format: OutputFormat,
50) -> Result<CommandOutput> {
51    match cmd {
52        WebSocketCommand::Ticker { pair } => {
53            let pair = helpers::normalize_pair(pair);
54            ws_ticker(client, &pair, output_format).await
55        }
56        WebSocketCommand::Trades { pair } => {
57            let pair = helpers::normalize_pair(pair);
58            ws_trades(client, &pair, output_format).await
59        }
60        WebSocketCommand::Book { pair } => {
61            let pair = helpers::normalize_pair(pair);
62            ws_book(client, &pair, output_format).await
63        }
64        WebSocketCommand::Summary => ws_summary(client, output_format).await,
65        WebSocketCommand::Orders => ws_orders(client, output_format).await,
66    }
67}
68
69async fn ws_connect_and_listen(
70    ws_url: &str,
71    token: &str,
72    channel: &str,
73    handler: impl Fn(serde_json::Value) -> Option<serde_json::Value>,
74    output_format: OutputFormat,
75) -> Result<CommandOutput> {
76    let spinner = if output_format == OutputFormat::Json {
77        eprintln!(
78            "{}",
79            serde_json::json!({"event": "connecting", "url": ws_url})
80        );
81        None
82    } else {
83        let pb = ProgressBar::new_spinner();
84        pb.set_message("Connecting to Indodax WebSocket...");
85        pb.enable_steady_tick(std::time::Duration::from_millis(100));
86        Some(pb)
87    };
88    let (mut ws_stream, _) = connect_async(ws_url).await?;
89    if let Some(ref pb) = spinner {
90        pb.set_message("Connected. Authenticating...");
91    } else {
92        eprintln!(
93            "{}",
94            serde_json::json!({"event": "connected", "status": "authenticating"})
95        );
96    }
97
98    let auth_msg = serde_json::json!({
99        "params": { "token": token },
100        "id": 1
101    });
102    ws_stream
103        .send(Message::Text(auth_msg.to_string()))
104        .await?;
105
106    let mut authed = false;
107
108    let mut events: Vec<serde_json::Value> = Vec::new();
109
110    loop {
111        tokio::select! {
112            _ = tokio::signal::ctrl_c() => {
113                if let Some(ref pb) = spinner {
114                    pb.finish_and_clear();
115                    eprintln!("Interrupted by user. Closing connection...");
116                } else {
117                    eprintln!("{}", serde_json::json!({"event": "interrupted", "reason": "user_ctrl_c"}));
118                }
119                let _ = ws_stream.send(Message::Close(None)).await;
120                break;
121            }
122            msg = ws_stream.next() => {
123                let msg = match msg {
124                    Some(m) => m,
125                    None => break,
126                };
127
128                match msg {
129                    Ok(Message::Text(text)) => {
130                        let val = match serde_json::from_str::<serde_json::Value>(&text) {
131                            Ok(v) => v,
132                            Err(e) => {
133                                tracing::warn!("WebSocket JSON parse error: {} (text: {})", e, text);
134                                continue;
135                            }
136                        };
137
138                        if !authed {
139                            if val.get("id").and_then(|v| v.as_i64()) == Some(1)
140                                && val.get("result").is_some()
141                            {
142                                authed = true;
143                                if let Some(ref pb) = spinner {
144                                    pb.finish_and_clear();
145                                    eprintln!("Authenticated. Subscribing to channel: {}", channel);
146                                    eprintln!();
147                                } else {
148                                    eprintln!("{}", serde_json::json!({"event": "authenticated", "channel": channel}));
149                                }
150                                let sub_msg = serde_json::json!({
151                                    "method": 1,
152                                    "params": { "channel": channel },
153                                    "id": 2
154                                });
155                                ws_stream
156                                    .send(Message::Text(sub_msg.to_string()))
157                                    .await?;
158                            }
159                            continue;
160                        }
161
162                        if val.get("id").and_then(|v| v.as_i64()) == Some(2) {
163                            // subscription confirmation, skip
164                        } else if val.get("result").is_some() {
165                            if let Some(event) = handler(val) {
166                                events.push(event);
167                            }
168                        }
169                    }
170                    Ok(Message::Ping(_)) => {
171                        let _ = ws_stream.send(Message::Pong(vec![])).await;
172                    }
173                    Ok(Message::Close(_)) => {
174                        if let Some(ref pb) = spinner {
175                            pb.finish_and_clear();
176                            eprintln!("Connection closed by server.");
177                        } else {
178                            eprintln!("{}", serde_json::json!({"event": "disconnected", "reason": "server_close"}));
179                        }
180                        break;
181                    }
182                    Err(e) => {
183                        if let Some(ref pb) = spinner {
184                            pb.finish_and_clear();
185                            eprintln!("WebSocket error: {}", e);
186                        } else {
187                            eprintln!("{}", serde_json::json!({"event": "error", "message": e.to_string()}));
188                        }
189                        break;
190                    }
191                    _ => {}
192                }
193            }
194        }
195    }
196
197    Ok(CommandOutput::json(serde_json::json!({
198        "status": "disconnected",
199        "events": events,
200        "event_count": events.len(),
201    })))
202}
203
204fn format_ws_price(val: &serde_json::Value) -> Option<String> {
205    let f = val.as_f64()
206        .or_else(|| val.as_str().and_then(|s| s.parse::<f64>().ok()))?;
207    if f == 0.0 {
208        return Some("0".into());
209    }
210    if f.fract() == 0.0 && f.abs() >= 1.0 {
211        return Some(format!("{}", f as u64));
212    }
213    let s = format!("{:.8}", f);
214    let trimmed = s.trim_end_matches('0');
215    Some(trimmed.trim_end_matches('.').to_string())
216}
217
218async fn ws_ticker(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
219    let channel = format!("chart:tick-{}", pair);
220    let token = fetch_public_ws_token(client).await?;
221    ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
222        let rows = &val["result"]["data"]["data"];
223        let mut last_event = None;
224        if let serde_json::Value::Array(arr) = rows {
225            for row in arr {
226                if let serde_json::Value::Array(fields) = row {
227                    if fields.len() >= 4 {
228                        let ts = fields[0].as_u64().unwrap_or(0);
229                        let price = format_ws_price(&fields[2]).unwrap_or_default();
230                        let time_str = chrono::DateTime::from_timestamp(ts.min(i64::MAX as u64) as i64, 0)
231                            .map(|d| d.format("%H:%M:%S").to_string())
232                            .unwrap_or_default();
233                        if output_format == OutputFormat::Json {
234                            println!("{}", serde_json::json!({
235                                "event": "ticker", "pair": pair, "time": time_str, "price": price
236                            }));
237                        } else {
238                            println!("[{}] {}  {}", time_str, pair, price);
239                        }
240                        last_event = Some(serde_json::json!({
241                            "event": "ticker", "pair": pair, "time": time_str, "price": price
242                        }));
243                    }
244                }
245            }
246        }
247        last_event
248    }, output_format)
249    .await
250}
251
252async fn ws_trades(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
253    let channel = format!("market:trade-activity-{}", pair);
254    let token = fetch_public_ws_token(client).await?;
255    ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
256        let rows = &val["result"]["data"]["data"];
257        let mut last_event = None;
258        if let serde_json::Value::Array(arr) = rows {
259            for row in arr {
260                if let serde_json::Value::Array(fields) = row {
261                    if fields.len() >= 7 {
262                        let ts = fields[1].as_u64().unwrap_or(0);
263                        let side = fields[3].as_str().unwrap_or("?");
264                        let price = fields[4].as_f64().unwrap_or(0.0);
265                        let volume = fields[6].as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
266                        let time_str = chrono::DateTime::from_timestamp(ts as i64, 0)
267                            .map(|d| d.format("%H:%M:%S").to_string())
268                            .unwrap_or_default();
269                        if output_format == OutputFormat::Json {
270                            println!("{}", serde_json::json!({
271                                "event": "trade", "pair": pair, "time": time_str,
272                                "side": side, "price": price, "volume": volume
273                            }));
274                        } else {
275                            println!("[{}] {} {} @ {} vol: {}", time_str, side, pair, price, volume);
276                        }
277                        last_event = Some(serde_json::json!({
278                            "event": "trade", "pair": pair, "time": time_str,
279                            "side": side, "price": price, "volume": volume
280                        }));
281                    }
282                }
283            }
284        }
285        last_event
286    }, output_format)
287    .await
288}
289
290async fn ws_book(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
291    let channel = format!("market:order-book-{}", pair);
292    let token = fetch_public_ws_token(client).await?;
293    ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
294        let data = &val["result"]["data"]["data"];
295        let ask_price = data["ask"].as_array().and_then(|asks| {
296            asks.last().and_then(|best| {
297                let p = helpers::value_to_string(best.get("price").unwrap_or(&serde_json::Value::Null));
298                let a = helpers::value_to_string(
299                    best.get("btc_volume")
300                        .or_else(|| best.get("volume"))
301                        .or_else(|| best.get("amount"))
302                        .unwrap_or(&serde_json::Value::Null),
303                );
304                Some((p, a))
305            })
306        });
307        let bid_price = data["bid"].as_array().and_then(|bids| {
308            bids.first().and_then(|best| {
309                let p = helpers::value_to_string(best.get("price").unwrap_or(&serde_json::Value::Null));
310                let a = helpers::value_to_string(
311                    best.get("btc_volume")
312                        .or_else(|| best.get("volume"))
313                        .or_else(|| best.get("amount"))
314                        .unwrap_or(&serde_json::Value::Null),
315                );
316                Some((p, a))
317            })
318        });
319        let event = serde_json::json!({
320            "event": "orderbook", "pair": pair,
321            "ask": ask_price.clone().map(|(p, a)| serde_json::json!({"price": p, "amount": a})),
322            "bid": bid_price.clone().map(|(p, a)| serde_json::json!({"price": p, "amount": a})),
323        });
324        if output_format == OutputFormat::Json {
325            println!("{}", event);
326        } else if std::io::stdout().is_terminal() {
327            if let Some((price, amount)) = ask_price {
328                print!("\r\x1b[KAsk: {} @ {} | ", price, amount);
329            }
330            if let Some((price, amount)) = bid_price {
331                println!("Bid: {} @ {}", price, amount);
332            }
333        } else {
334            if let Some((price, amount)) = ask_price {
335                println!("Ask: {} @ {}", price, amount);
336            }
337            if let Some((price, amount)) = bid_price {
338                println!("Bid: {} @ {}", price, amount);
339            }
340        }
341        Some(event)
342    }, output_format)
343    .await
344}
345
346async fn ws_summary(client: &IndodaxClient, output_format: OutputFormat) -> Result<CommandOutput> {
347    let token = fetch_public_ws_token(client).await?;
348    ws_connect_and_listen(PUBLIC_WS_URL, &token, "market:summary-24h", |val| {
349        let rows = &val["result"]["data"]["data"];
350        let mut last_event = None;
351        if let serde_json::Value::Array(arr) = rows {
352            for row in arr {
353                if let serde_json::Value::Array(fields) = row {
354                    if fields.len() >= 8 {
355                        let pair = fields[0].as_str().unwrap_or("?");
356                        let last = helpers::value_to_string(&fields[2]);
357                        let high = helpers::value_to_string(&fields[4]);
358                        let low = helpers::value_to_string(&fields[3]);
359                        let price_24h = fields[5].as_f64().unwrap_or(0.0);
360                        let last_f = fields[2].as_f64().unwrap_or(0.0);
361                        let change = if price_24h > 0.0 {
362                            format!("{:+.2}%", (last_f - price_24h) / price_24h * 100.0)
363                        } else {
364                            "0%".to_string()
365                        };
366                        if output_format == OutputFormat::Json {
367                            println!("{}", serde_json::json!({
368                                "event": "summary", "pair": pair, "last": last,
369                                "high": high, "low": low, "change": change
370                            }));
371                        } else if std::io::stdout().is_terminal() {
372                            println!("\x1b[K{:15}  last: {:>15}  high: {:>15}  low: {:>15}  change: {}", pair, last, high, low, change);
373                        } else {
374                            println!("{:15}  last: {:>15}  high: {:>15}  low: {:>15}  change: {}", pair, last, high, low, change);
375                        }
376                        last_event = Some(serde_json::json!({
377                            "event": "summary", "pair": pair, "last": last,
378                            "high": high, "low": low, "change": change
379                        }));
380                    }
381                }
382            }
383        }
384        last_event
385    }, output_format)
386    .await
387}
388
389async fn ws_orders(client: &IndodaxClient, output_format: OutputFormat) -> Result<CommandOutput> {
390    if client.signer().is_none() {
391        return Err(anyhow::anyhow!(
392            "Private WebSocket requires API credentials. Use 'indodax auth set' or set INDODAX_API_KEY and INDODAX_API_SECRET environment variables."
393        ));
394    }
395
396    eprintln!("Generating WebSocket token...");
397    let token = client.generate_ws_token().await.map_err(|e| {
398        anyhow::anyhow!("WebSocket token generation failed: {}. Check that your API credentials are valid and have the correct permissions.", e)
399    })?;
400    eprintln!("Token generated. Connecting to private WebSocket...");
401
402    let channel = "private:orders";
403    ws_connect_and_listen(PRIVATE_WS_URL, &token, channel, |val| {
404        let data = &val["result"]["data"];
405        let event = if let Some(order_id) = data.get("order_id").and_then(|v| v.as_u64()) {
406            let pair = data.get("pair").and_then(|v| v.as_str()).unwrap_or("?");
407            let side = data.get("side").and_then(|v| v.as_str()).unwrap_or("?");
408            let status = data.get("status").and_then(|v| v.as_str()).unwrap_or("?");
409            let price = helpers::value_to_string(data.get("price").unwrap_or(&serde_json::Value::Null));
410            let amount = helpers::value_to_string(data.get("amount").unwrap_or(&serde_json::Value::Null));
411            if output_format == OutputFormat::Json {
412                println!("{}", serde_json::json!({
413                    "event": "order_update", "order_id": order_id, "pair": pair,
414                    "side": side, "status": status, "price": price, "amount": amount
415                }));
416            } else {
417                println!("ID={} Pair={} Side={} Status={} Price={} Amount={}",
418                    order_id, pair, side, status, price, amount);
419            }
420            Some(serde_json::json!({
421                "event": "order_update", "order_id": order_id, "pair": pair,
422                "side": side, "status": status, "price": price, "amount": amount
423            }))
424        } else {
425            if output_format == OutputFormat::Json {
426                let raw = serde_json::json!({"event": "order_update_raw", "data": &val["result"]});
427                println!("{}", raw);
428                Some(raw)
429            } else {
430                println!("{}", serde_json::to_string_pretty(&val).unwrap_or_default());
431                None
432            }
433        };
434        event
435    }, output_format)
436    .await
437}
438
439#[cfg(test)]
440mod tests {
441    use super::*;
442
443    #[test]
444    fn test_websocket_command_variants() {
445        let _cmd1 = WebSocketCommand::Ticker { pair: "btc_idr".into() };
446        let _cmd2 = WebSocketCommand::Trades { pair: "eth_idr".into() };
447        let _cmd3 = WebSocketCommand::Book { pair: "btc_idr".into() };
448        let _cmd4 = WebSocketCommand::Summary;
449        let _cmd5 = WebSocketCommand::Orders;
450    }
451
452    #[test]
453    fn test_websocket_command_ticker() {
454        let cmd = WebSocketCommand::Ticker { pair: "xrp_idr".into() };
455        match cmd {
456            WebSocketCommand::Ticker { pair } => {
457                assert_eq!(pair, "xrp_idr");
458            }
459            _ => assert!(false, "Expected Ticker command, got {:?}", cmd),
460        }
461    }
462
463    #[test]
464    fn test_websocket_command_trades() {
465        let cmd = WebSocketCommand::Trades { pair: "doge_idr".into() };
466        match cmd {
467            WebSocketCommand::Trades { pair } => {
468                assert_eq!(pair, "doge_idr");
469            }
470            _ => assert!(false, "Expected Trades command, got {:?}", cmd),
471        }
472    }
473
474    #[test]
475    fn test_websocket_command_book() {
476        let cmd = WebSocketCommand::Book { pair: "eth_idr".into() };
477        match cmd {
478            WebSocketCommand::Book { pair } => {
479                assert_eq!(pair, "eth_idr");
480            }
481            _ => assert!(false, "Expected Book command, got {:?}", cmd),
482        }
483    }
484
485    #[test]
486    fn test_websocket_command_summary() {
487        let cmd = WebSocketCommand::Summary;
488        match cmd {
489            WebSocketCommand::Summary => (),
490            _ => assert!(false, "Expected Summary command, got {:?}", cmd),
491        }
492    }
493
494    #[test]
495    fn test_websocket_command_orders() {
496        let cmd = WebSocketCommand::Orders;
497        match cmd {
498            WebSocketCommand::Orders => (),
499            _ => assert!(false, "Expected Orders command, got {:?}", cmd),
500        }
501    }
502
503    #[test]
504    fn test_format_ws_price_u64() {
505        let val = serde_json::json!(123456);
506        assert_eq!(format_ws_price(&val).as_deref(), Some("123456"));
507    }
508
509    #[test]
510    fn test_format_ws_price_f64() {
511        let val = serde_json::json!(123.456);
512        let result = format_ws_price(&val);
513        assert!(result.is_some());
514    }
515
516    #[test]
517    fn test_format_ws_price_str() {
518        let val = serde_json::json!("789");
519        assert_eq!(format_ws_price(&val).as_deref(), Some("789"));
520    }
521
522    #[test]
523    fn test_format_ws_price_null() {
524        let val = serde_json::json!(null);
525        assert!(format_ws_price(&val).is_none());
526    }
527
528    #[test]
529    fn test_public_ws_url() {
530        assert!(PUBLIC_WS_URL.contains("ws3.indodax.com"));
531    }
532
533    #[test]
534    fn test_private_ws_url() {
535        assert!(PRIVATE_WS_URL.contains("pws.indodax.com"));
536    }
537
538    #[test]
539    fn test_public_ws_token_url() {
540        assert!(crate::commands::helpers::PUBLIC_WS_TOKEN_URL.contains("indodax.com"));
541    }
542}