Skip to main content

indodax_cli/commands/
websocket.rs

1use crate::client::IndodaxClient;
2use crate::commands::helpers;
3use crate::output::{CommandOutput, OutputFormat};
4use anyhow::Result;
5use futures_util::{SinkExt, StreamExt};
6use indicatif::ProgressBar;
7use std::io::IsTerminal;
8use tokio_tungstenite::connect_async;
9use tokio_tungstenite::tungstenite::Message;
10use tracing;
11
12const PUBLIC_WS_URL: &str = "wss://ws3.indodax.com/ws/";
13const PRIVATE_WS_URL: &str = "wss://pws.indodax.com/ws/?cf_ws_frame_ping_pong=true";
14
15#[derive(Debug, clap::Subcommand)]
16pub enum WebSocketCommand {
17    #[command(name = "ticker", about = "Stream real-time ticker for a pair")]
18    Ticker {
19        #[arg(default_value = "btc_idr")]
20        pair: String,
21    },
22
23    #[command(name = "trades", about = "Stream real-time trades for a pair")]
24    Trades {
25        #[arg(default_value = "btc_idr")]
26        pair: String,
27    },
28
29    #[command(name = "book", about = "Stream real-time order book for a pair")]
30    Book {
31        #[arg(default_value = "btc_idr")]
32        pair: String,
33    },
34
35    #[command(name = "summary", about = "Stream 24h summary for all pairs")]
36    Summary,
37
38    #[command(name = "orders", about = "Stream private order updates")]
39    Orders,
40}
41
42pub async fn execute(
43    client: &IndodaxClient,
44    cmd: &WebSocketCommand,
45    output_format: OutputFormat,
46) -> Result<CommandOutput> {
47    match cmd {
48        WebSocketCommand::Ticker { pair } => {
49            let pair = helpers::normalize_pair(pair);
50            ws_ticker(client, &pair, output_format).await
51        }
52        WebSocketCommand::Trades { pair } => {
53            let pair = helpers::normalize_pair(pair);
54            ws_trades(client, &pair, output_format).await
55        }
56        WebSocketCommand::Book { pair } => {
57            let pair = helpers::normalize_pair(pair);
58            ws_book(client, &pair, output_format).await
59        }
60        WebSocketCommand::Summary => ws_summary(client, output_format).await,
61        WebSocketCommand::Orders => ws_orders(client, output_format).await,
62    }
63}
64
65async fn ws_connect_and_listen(
66    ws_url: &str,
67    token: &str,
68    channel: &str,
69    handler: impl Fn(serde_json::Value) -> Option<serde_json::Value>,
70    output_format: OutputFormat,
71) -> Result<CommandOutput> {
72    let spinner_ref = if output_format == OutputFormat::Json {
73        eprintln!(
74            "{}",
75            serde_json::json!({"event": "connecting", "url": ws_url})
76        );
77        None
78    } else {
79        let pb = ProgressBar::new_spinner();
80        pb.set_message("Connecting to Indodax WebSocket...");
81        pb.enable_steady_tick(std::time::Duration::from_millis(100));
82        Some(pb)
83    };
84
85    let mut events: Vec<serde_json::Value> = Vec::new();
86    let mut retry_count = 0;
87
88    'reconnect: loop {
89        if retry_count > 0 {
90            let delay = std::time::Duration::from_secs(2u64.pow(retry_count.min(5)));
91            if let Some(ref pb) = spinner_ref {
92                pb.set_message(format!("Disconnected. Retrying in {:?}...", delay));
93            } else {
94                eprintln!("{}", serde_json::json!({"event": "reconnecting", "delay_secs": delay.as_secs()}));
95            }
96            tokio::select! {
97                _ = tokio::signal::ctrl_c() => break 'reconnect,
98                _ = tokio::time::sleep(delay) => {}
99            }
100        }
101
102        let (mut ws_stream, _) = match tokio::time::timeout(std::time::Duration::from_secs(10), connect_async(ws_url)).await {
103            Ok(Ok(s)) => s,
104            Ok(Err(e)) => {
105                retry_count += 1;
106                tracing::warn!("WebSocket connection failed: {}. Retrying...", e);
107                continue 'reconnect;
108            }
109            Err(_) => {
110                retry_count += 1;
111                tracing::warn!("WebSocket connection timed out after 10s. Retrying...");
112                continue 'reconnect;
113            }
114        };
115
116        if let Some(ref pb) = spinner_ref {
117            pb.set_message("Connected. Authenticating...");
118        } else {
119            eprintln!(
120                "{}",
121                serde_json::json!({"event": "connected", "status": "authenticating"})
122            );
123        }
124
125        let auth_msg = serde_json::json!({
126            "params": { "token": token },
127            "id": 1
128        });
129        if let Err(e) = ws_stream.send(Message::Text(auth_msg.to_string())).await {
130            retry_count += 1;
131            tracing::warn!("Failed to send auth message: {}. Retrying...", e);
132            continue 'reconnect;
133        }
134
135        let mut authed = false;
136        let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(30));
137        ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
138
139        loop {
140            tokio::select! {
141                _ = tokio::signal::ctrl_c() => {
142                    if let Some(ref pb) = spinner_ref {
143                        pb.finish_and_clear();
144                        eprintln!("Interrupted by user. Closing connection...");
145                    } else {
146                        eprintln!("{}", serde_json::json!({"event": "interrupted", "reason": "user_ctrl_c"}));
147                    }
148                    let _ = ws_stream.send(Message::Close(None)).await;
149                    break 'reconnect;
150                }
151                _ = ping_interval.tick() => {
152                    // Application-level ping to keep connection alive
153                    let ping_msg = serde_json::json!({
154                        "method": 7,
155                        "id": 7
156                    });
157                    if let Err(e) = ws_stream.send(Message::Text(ping_msg.to_string())).await {
158                        tracing::warn!("Failed to send WebSocket ping: {}. Triggering reconnect...", e);
159                        retry_count += 1;
160                        continue 'reconnect;
161                    }
162                }
163                msg = ws_stream.next() => {
164                    let msg = match msg {
165                        Some(m) => m,
166                        None => {
167                            retry_count += 1;
168                            tracing::warn!("WebSocket stream ended. Reconnecting...");
169                            continue 'reconnect;
170                        }
171                    };
172
173                    match msg {
174                        Ok(Message::Text(text)) => {
175                            let val = match serde_json::from_str::<serde_json::Value>(&text) {
176                                Ok(v) => v,
177                                Err(e) => {
178                                    tracing::warn!("WebSocket JSON parse error: {} (text: {})", e, text);
179                                    continue;
180                                }
181                            };
182
183                            if !authed {
184                                if val.get("id").and_then(|v| v.as_i64()) == Some(1)
185                                    && val.get("result").is_some()
186                                {
187                                    authed = true;
188                                    retry_count = 0; // Reset retry count on successful auth
189                                    if let Some(ref pb) = spinner_ref {
190                                        pb.set_message(format!("Authenticated. Subscribing to: {}", channel));
191                                    } else {
192                                        eprintln!("{}", serde_json::json!({"event": "authenticated", "channel": channel}));
193                                    }
194                                    let sub_msg = serde_json::json!({
195                                        "method": 1,
196                                        "params": { "channel": channel },
197                                        "id": 2
198                                    });
199                                    if let Err(_e) = ws_stream.send(Message::Text(sub_msg.to_string())).await {
200                                        retry_count += 1;
201                                        continue 'reconnect;
202                                    }
203                                }
204                                continue;
205                            }
206
207                            if val.get("id").and_then(|v| v.as_i64()) == Some(2) {
208                                // subscription confirmation
209                                if let Some(ref pb) = spinner_ref {
210                                    pb.finish_and_clear();
211                                    eprintln!("Subscription active: {}", channel);
212                                    eprintln!();
213                                }
214                            } else if val.get("result").is_some() {
215                                if let Some(event) = handler(val) {
216                                    events.push(event);
217                                }
218                            }
219                        }
220                        Ok(Message::Ping(data)) => {
221                            let _ = ws_stream.send(Message::Pong(data)).await;
222                        }
223                        Ok(Message::Close(_)) => {
224                            retry_count += 1;
225                            tracing::warn!("Connection closed by server. Reconnecting...");
226                            continue 'reconnect;
227                        }
228                        Err(e) => {
229                            retry_count += 1;
230                            tracing::warn!("WebSocket error: {}. Reconnecting...", e);
231                            continue 'reconnect;
232                        }
233                        _ => {}
234                    }
235                }
236            }
237        }
238    }
239
240    if output_format == OutputFormat::Json {
241        Ok(CommandOutput::new_empty().with_suppress_final_output(true))
242    } else {
243        Ok(CommandOutput::json(serde_json::json!({
244            "status": "disconnected",
245            "events": events,
246            "event_count": events.len(),
247        })))
248    }
249}
250
251fn format_ws_price(val: &serde_json::Value) -> Option<String> {
252    let f = val.as_f64()
253        .or_else(|| val.as_str().and_then(|s| s.parse::<f64>().ok()))?;
254    if f == 0.0 {
255        return Some("0".into());
256    }
257    if (f - f.round()).abs() < f64::EPSILON && f.abs() >= 1.0 {
258        return Some(format!("{}", f as u64));
259    }
260    let s = format!("{:.8}", f);
261    let trimmed = s.trim_end_matches('0');
262    Some(trimmed.trim_end_matches('.').to_string())
263}
264
265async fn ws_ticker(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
266    let channel = format!("chart:tick-{}", pair);
267    let token = helpers::fetch_public_ws_token(client).await?;
268    ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
269        let rows = &val["result"]["data"]["data"];
270        let mut last_event = None;
271        if let serde_json::Value::Array(arr) = rows {
272            for row in arr {
273                if let serde_json::Value::Array(fields) = row {
274                    if fields.len() >= 4 {
275                        let ts = fields[0].as_u64().unwrap_or(0);
276                        let price = fields.get(2).and_then(format_ws_price).unwrap_or_default();
277                        let time_str = chrono::DateTime::from_timestamp(ts.min(i64::MAX as u64) as i64, 0)
278                            .map(|d| d.format("%H:%M:%S").to_string())
279                            .unwrap_or_default();
280                        if output_format == OutputFormat::Json {
281                            println!("{}", serde_json::json!({
282                                "event": "ticker", "pair": pair, "time": time_str, "price": price
283                            }));
284                        } else {
285                            println!("[{}] {}  {}", time_str, pair, price);
286                        }
287                        last_event = Some(serde_json::json!({
288                            "event": "ticker", "pair": pair, "time": time_str, "price": price
289                        }));
290                    }
291                }
292            }
293        }
294        last_event
295    }, output_format)
296    .await
297}
298
299async fn ws_trades(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
300    let channel = format!("market:trade-activity-{}", pair);
301    let token = helpers::fetch_public_ws_token(client).await?;
302    ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
303        let rows = &val["result"]["data"]["data"];
304        let mut last_event = None;
305        if let serde_json::Value::Array(arr) = rows {
306            for row in arr {
307                if let serde_json::Value::Array(fields) = row {
308                    if fields.len() >= 7 {
309                        let ts = fields[1].as_u64().unwrap_or(0);
310                        let side = fields.get(3).and_then(|v| v.as_str()).unwrap_or("?");
311                        let price = fields.get(4).and_then(|v| v.as_f64()).unwrap_or(0.0);
312                        let volume = fields.get(6).and_then(|v| v.as_str()).and_then(|s| s.parse().ok()).unwrap_or(0.0);
313                        let time_str = chrono::DateTime::from_timestamp(ts.min(i64::MAX as u64) as i64, 0)
314                            .map(|d| d.format("%H:%M:%S").to_string())
315                            .unwrap_or_default();
316                        if output_format == OutputFormat::Json {
317                            println!("{}", serde_json::json!({
318                                "event": "trade", "pair": pair, "time": time_str,
319                                "side": side, "price": price, "volume": volume
320                            }));
321                        } else {
322                            println!("[{}] {} {} @ {} vol: {}", time_str, side, pair, price, volume);
323                        }
324                        last_event = Some(serde_json::json!({
325                            "event": "trade", "pair": pair, "time": time_str,
326                            "side": side, "price": price, "volume": volume
327                        }));
328                    }
329                }
330            }
331        }
332        last_event
333    }, output_format)
334    .await
335}
336
337async fn ws_book(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
338    let channel = format!("market:order-book-{}", pair);
339    let token = helpers::fetch_public_ws_token(client).await?;
340    ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
341        let data = &val["result"]["data"]["data"];
342        
343        let parse_entry = |entry: &serde_json::Value| -> Option<(String, String)> {
344            if let Some(arr) = entry.as_array() {
345                if arr.len() >= 2 {
346                    let p = helpers::value_to_string(&arr[0]);
347                    let v = helpers::value_to_string(&arr[1]);
348                    return Some((p, v));
349                }
350            } else if let Some(obj) = entry.as_object() {
351                let p = helpers::value_to_string(obj.get("price").unwrap_or(&serde_json::Value::Null));
352                let v = helpers::value_to_string(
353                    obj.get("btc_volume")
354                        .or_else(|| obj.get("volume"))
355                        .or_else(|| obj.get("amount"))
356                        .unwrap_or(&serde_json::Value::Null),
357                );
358                return Some((p, v));
359            }
360            None
361        };
362
363        let ask_price = data["ask"].as_array().and_then(|asks| {
364            asks.first().and_then(parse_entry)
365        }).or_else(|| data["asks"].as_array().and_then(|asks| {
366            asks.first().and_then(parse_entry)
367        }));
368
369        let bid_price = data["bid"].as_array().and_then(|bids| {
370            bids.first().and_then(parse_entry)
371        }).or_else(|| data["bids"].as_array().and_then(|bids| {
372            bids.first().and_then(parse_entry)
373        }));
374
375        let event = serde_json::json!({
376            "event": "orderbook", "pair": pair,
377            "ask": ask_price.clone().map(|(p, a)| serde_json::json!({"price": p, "amount": a})),
378            "bid": bid_price.clone().map(|(p, a)| serde_json::json!({"price": p, "amount": a})),
379        });
380        if output_format == OutputFormat::Json {
381            println!("{}", event);
382        } else if std::io::stdout().is_terminal() {
383            if let Some((price, amount)) = ask_price {
384                print!("\r\x1b[KAsk: {} @ {} | ", price, amount);
385            }
386            if let Some((price, amount)) = bid_price {
387                println!("Bid: {} @ {}", price, amount);
388            }
389        } else {
390            if let Some((price, amount)) = ask_price {
391                println!("Ask: {} @ {}", price, amount);
392            }
393            if let Some((price, amount)) = bid_price {
394                println!("Bid: {} @ {}", price, amount);
395            }
396        }
397        Some(event)
398    }, output_format)
399    .await
400}
401
402async fn ws_summary(client: &IndodaxClient, output_format: OutputFormat) -> Result<CommandOutput> {
403    let token = helpers::fetch_public_ws_token(client).await?;
404    ws_connect_and_listen(PUBLIC_WS_URL, &token, "market:summary-24h", |val| {
405        let rows = &val["result"]["data"]["data"];
406        let mut last_event = None;
407        if let serde_json::Value::Array(arr) = rows {
408            for row in arr {
409                if let serde_json::Value::Array(fields) = row {
410                    if fields.len() >= 8 {
411                        let pair = fields[0].as_str().unwrap_or("?");
412                        let last = helpers::value_to_string(fields.get(2).unwrap_or(&serde_json::Value::Null));
413                        let high = helpers::value_to_string(&fields[4]);
414                        let low = helpers::value_to_string(&fields[3]);
415                        let price_24h = fields[5].as_f64().unwrap_or(0.0);
416                        let last_f = fields[2].as_f64().unwrap_or(0.0);
417                        let change = if price_24h > 0.0 {
418                            format!("{:+.2}%", (last_f - price_24h) / price_24h * 100.0)
419                        } else {
420                            "0%".to_string()
421                        };
422                        if output_format == OutputFormat::Json {
423                            println!("{}", serde_json::json!({
424                                "event": "summary", "pair": pair, "last": last,
425                                "high": high, "low": low, "change": change
426                            }));
427                        } else if std::io::stdout().is_terminal() {
428                            println!("\x1b[K{:15}  last: {:>15}  high: {:>15}  low: {:>15}  change: {}", pair, last, high, low, change);
429                        } else {
430                            println!("{:15}  last: {:>15}  high: {:>15}  low: {:>15}  change: {}", pair, last, high, low, change);
431                        }
432                        last_event = Some(serde_json::json!({
433                            "event": "summary", "pair": pair, "last": last,
434                            "high": high, "low": low, "change": change
435                        }));
436                    }
437                }
438            }
439        }
440        last_event
441    }, output_format)
442    .await
443}
444
445async fn ws_orders(client: &IndodaxClient, output_format: OutputFormat) -> Result<CommandOutput> {
446    if client.signer().is_none() {
447        return Err(anyhow::anyhow!(
448            "Private WebSocket requires API credentials. Use 'indodax auth set' or set INDODAX_API_KEY and INDODAX_API_SECRET environment variables."
449        ));
450    }
451
452    eprintln!("Generating WebSocket token...");
453    let (token, channel) = client.generate_ws_token().await.map_err(|e| {
454        anyhow::anyhow!("WebSocket token generation failed: {}. Check that your API credentials are valid and have the correct permissions.", e)
455    })?;
456    eprintln!("Token generated. Connecting to private WebSocket...");
457
458    ws_private_connect_and_listen(PRIVATE_WS_URL, &token, &channel, |val| {
459        // Private WebSocket messages can be in several formats depending on the server version
460        // We look for common patterns in order and balance updates.
461        
462        let result = val.get("result").or_else(|| val.get("push")).unwrap_or(&val);
463        let data = result.get("data").unwrap_or(result);
464
465        if let Some(order_id) = data.get("order_id").or(data.get("orderId")).and_then(|v| v.as_u64().or_else(|| v.as_str().and_then(|s| s.parse().ok()))) {
466            let pair = data.get("pair").and_then(|v| v.as_str()).unwrap_or("?");
467            let side = data.get("side").and_then(|v| v.as_str()).unwrap_or("?");
468            let status = data.get("status").and_then(|v| v.as_str()).unwrap_or("?");
469            let price = helpers::value_to_string(data.get("price").unwrap_or(&serde_json::Value::Null));
470            let amount = helpers::value_to_string(data.get("amount").or(data.get("quantity")).unwrap_or(&serde_json::Value::Null));
471            
472            if output_format == OutputFormat::Json {
473                println!("{}", serde_json::json!({
474                    "event": "order_update", "order_id": order_id, "pair": pair,
475                    "side": side, "status": status, "price": price, "amount": amount
476                }));
477            } else {
478                println!("Order Update: ID={} Pair={} Side={} Status={} Price={} Amount={}",
479                    order_id, pair, side, status, price, amount);
480            }
481            Some(serde_json::json!({
482                "event": "order_update", "order_id": order_id, "pair": pair,
483                "side": side, "status": status, "price": price, "amount": amount
484            }))
485        } else if let Some(currency) = data.get("currency").or(data.get("asset")).and_then(|v| v.as_str()) {
486            let available = helpers::value_to_string(data.get("available").or(data.get("balance")).unwrap_or(&serde_json::Value::Null));
487            let frozen = helpers::value_to_string(data.get("frozen").or(data.get("hold")).unwrap_or(&serde_json::Value::Null));
488            
489            if output_format == OutputFormat::Json {
490                println!("{}", serde_json::json!({
491                    "event": "balance_update", "currency": currency,
492                    "available": available, "frozen": frozen
493                }));
494            } else {
495                println!("Balance Update: {} Available={} Frozen={}", currency, available, frozen);
496            }
497            Some(serde_json::json!({
498                "event": "balance_update", "currency": currency,
499                "available": available, "frozen": frozen
500            }))
501        } else {
502            // Raw fallback for unknown private events
503            if output_format == OutputFormat::Json {
504                let raw = serde_json::json!({"event": "private_update_raw", "data": data});
505                println!("{}", raw);
506                Some(raw)
507            } else {
508                // If it's a heartbeat or confirmation, we might want to skip printing
509                if data.get("method").and_then(|m| m.as_str()) != Some("pong") {
510                    println!("Private Event: {}", serde_json::to_string(data).unwrap_or_default());
511                }
512                Some(data.clone())
513            }
514        }
515    }, output_format)
516    .await
517}
518
519async fn ws_private_connect_and_listen(
520    ws_url: &str,
521    token: &str,
522    channel: &str,
523    handler: impl Fn(serde_json::Value) -> Option<serde_json::Value>,
524    output_format: OutputFormat,
525) -> Result<CommandOutput> {
526    let spinner_ref = if output_format == OutputFormat::Json {
527        eprintln!("{}", serde_json::json!({"event": "connecting", "url": ws_url}));
528        None
529    } else {
530        let pb = ProgressBar::new_spinner();
531        pb.set_message("Connecting to Private WebSocket...");
532        pb.enable_steady_tick(std::time::Duration::from_millis(100));
533        Some(pb)
534    };
535
536    let mut events: Vec<serde_json::Value> = Vec::new();
537    let mut retry_count = 0;
538
539    'reconnect: loop {
540        if retry_count > 0 {
541            let delay = std::time::Duration::from_secs(2u64.pow(retry_count.min(5)));
542            if let Some(ref pb) = spinner_ref {
543                pb.set_message(format!("Disconnected. Retrying in {:?}...", delay));
544            } else {
545                eprintln!("{}", serde_json::json!({"event": "reconnecting", "delay_secs": delay.as_secs()}));
546            }
547            tokio::select! {
548                _ = tokio::signal::ctrl_c() => break 'reconnect,
549                _ = tokio::time::sleep(delay) => {}
550            }
551        }
552
553        let (mut ws_stream, _) = match tokio::time::timeout(std::time::Duration::from_secs(10), connect_async(ws_url)).await {
554            Ok(Ok(s)) => s,
555            Ok(Err(e)) => {
556                retry_count += 1;
557                tracing::warn!("Private WebSocket connection failed: {}. Retrying...", e);
558                continue 'reconnect;
559            }
560            Err(_) => {
561                retry_count += 1;
562                tracing::warn!("Private WebSocket connection timed out after 10s. Retrying...");
563                continue 'reconnect;
564            }
565        };
566
567        // 1. Connect (Authenticate)
568        let connect_msg = serde_json::json!({
569            "connect": { "token": token },
570            "id": 1
571        });
572        if let Err(_e) = ws_stream.send(Message::Text(connect_msg.to_string())).await {
573            retry_count += 1;
574            continue 'reconnect;
575        }
576
577        let mut authed = false;
578        let mut subscribed = false;
579        let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(30));
580        ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
581
582        loop {
583            tokio::select! {
584                _ = tokio::signal::ctrl_c() => {
585                    let _ = ws_stream.send(Message::Close(None)).await;
586                    break 'reconnect;
587                }
588                _ = ping_interval.tick() => {
589                    // Private WS uses standard ping frames if configured in URL, 
590                    // but some versions also support application-level pings.
591                    let _ = ws_stream.send(Message::Ping(vec![])).await;
592                }
593                msg = ws_stream.next() => {
594                    let msg = match msg {
595                        Some(m) => m,
596                        None => { retry_count += 1; continue 'reconnect; }
597                    };
598
599                    match msg {
600                        Ok(Message::Text(text)) => {
601                            let val: serde_json::Value = match serde_json::from_str(&text) {
602                                Ok(v) => v,
603                                Err(_) => continue,
604                            };
605
606                            if !authed {
607                                // Check for connection success
608                                if val.get("connect").is_some() || val.get("id").and_then(|v| v.as_i64()) == Some(1) {
609                                    authed = true;
610                                    retry_count = 0;
611                                    // 2. Subscribe to user channel
612                                    let sub_msg = serde_json::json!({
613                                        "subscribe": { "channel": channel },
614                                        "id": 2
615                                    });
616                                    let _ = ws_stream.send(Message::Text(sub_msg.to_string())).await;
617                                }
618                                continue;
619                            }
620
621                            if !subscribed {
622                                if val.get("subscribe").is_some() || val.get("id").and_then(|v| v.as_i64()) == Some(2) {
623                                    subscribed = true;
624                                    if let Some(ref pb) = spinner_ref {
625                                        pb.finish_and_clear();
626                                        eprintln!("Private subscription active: {}", channel);
627                                        eprintln!();
628                                    }
629                                }
630                                continue;
631                            }
632
633                            if let Some(event) = handler(val) {
634                                events.push(event);
635                            }
636                        }
637                        Ok(Message::Ping(data)) => { let _ = ws_stream.send(Message::Pong(data)).await; }
638                        Ok(Message::Close(_)) => { retry_count += 1; continue 'reconnect; }
639                        Err(_) => { retry_count += 1; continue 'reconnect; }
640                        _ => {}
641                    }
642                }
643            }
644        }
645    }
646
647    if output_format == OutputFormat::Json {
648        Ok(CommandOutput::new_empty().with_suppress_final_output(true))
649    } else {
650        Ok(CommandOutput::json(serde_json::json!({
651            "status": "disconnected",
652            "events": events,
653            "event_count": events.len(),
654        })))
655    }
656}
657
658#[cfg(test)]
659mod tests {
660    use super::*;
661    use serde_json::json;
662
663    #[test]
664    fn test_websocket_command_variants() {
665        let _cmd1 = WebSocketCommand::Ticker { pair: "btc_idr".into() };
666        let _cmd2 = WebSocketCommand::Trades { pair: "eth_idr".into() };
667        let _cmd3 = WebSocketCommand::Book { pair: "btc_idr".into() };
668        let _cmd4 = WebSocketCommand::Summary;
669        let _cmd5 = WebSocketCommand::Orders;
670    }
671
672    #[test]
673    fn test_format_ws_price() {
674        assert_eq!(format_ws_price(&json!(1234.56)), Some("1234.56".to_string()));
675        assert_eq!(format_ws_price(&json!("1234.56")), Some("1234.56".to_string()));
676        assert_eq!(format_ws_price(&json!(1000)), Some("1000".to_string()));
677        assert_eq!(format_ws_price(&json!(0)), Some("0".to_string()));
678    }
679
680    #[test]
681    fn test_ticker_parsing_logic() {
682        let msg = json!({
683            "result": {
684                "data": {
685                    "data": [
686                        [1632717721, 4087327, 14340, "1063.73019525"]
687                    ]
688                }
689            }
690        });
691        
692        // Simulating the handler logic inside ws_ticker
693        let rows = &msg["result"]["data"]["data"];
694        if let serde_json::Value::Array(arr) = rows {
695            let fields = arr[0].as_array().unwrap();
696            let price = format_ws_price(&fields[2]).unwrap();
697            assert_eq!(price, "14340");
698        } else {
699            panic!("Expected array");
700        }
701    }
702
703    #[test]
704    fn test_orderbook_parsing_array_format() {
705        let msg = json!({
706            "result": {
707                "data": {
708                    "data": {
709                        "asks": [["651000000", "0.05000000"]],
710                        "bids": [["650000000", "0.12345678"]]
711                    }
712                }
713            }
714        });
715        
716        let data = &msg["result"]["data"]["data"];
717        let parse_entry = |entry: &serde_json::Value| -> Option<(String, String)> {
718            if let Some(arr) = entry.as_array() {
719                if arr.len() >= 2 {
720                    let p = helpers::value_to_string(&arr[0]);
721                    let v = helpers::value_to_string(&arr[1]);
722                    return Some((p, v));
723                }
724            }
725            None
726        };
727
728        let ask = data["asks"].as_array().unwrap().first().and_then(parse_entry).unwrap();
729        assert_eq!(ask.0, "651000000");
730        assert_eq!(ask.1, "0.05000000");
731    }
732
733    #[test]
734    fn test_orderbook_parsing_object_format() {
735        let msg = json!({
736            "result": {
737                "data": {
738                    "data": {
739                        "ask": [{"price": "319437000", "btc_volume": "0.11035661"}],
740                        "bid": [{"price": "319436000", "btc_volume": "0.61427265"}]
741                    }
742                }
743            }
744        });
745
746        let data = &msg["result"]["data"]["data"];
747        let parse_entry = |entry: &serde_json::Value| -> Option<(String, String)> {
748            if let Some(obj) = entry.as_object() {
749                let p = helpers::value_to_string(obj.get("price").unwrap());
750                let v = helpers::value_to_string(obj.get("btc_volume").unwrap());
751                return Some((p, v));
752            }
753            None
754        };
755
756        let ask = data["ask"].as_array().unwrap().first().and_then(parse_entry).unwrap();
757        assert_eq!(ask.0, "319437000");
758        assert_eq!(ask.1, "0.11035661");
759    }
760
761    #[test]
762    fn test_private_order_update_parsing() {
763        let msg = json!({
764            "push": {
765                "data": {
766                    "order_id": 12345,
767                    "pair": "btcidr",
768                    "side": "buy",
769                    "status": "filled",
770                    "price": "500000000",
771                    "amount": "0.1"
772                }
773            }
774        });
775
776        let result = msg.get("result").or_else(|| msg.get("push")).unwrap_or(&msg);
777        let data = result.get("data").unwrap_or(result);
778        
779        assert_eq!(data["order_id"], 12345);
780        assert_eq!(data["pair"], "btcidr");
781    }
782
783    #[test]
784    fn test_private_balance_update_parsing() {
785        let msg = json!({
786            "currency": "idr",
787            "available": "1000000",
788            "frozen": "50000"
789        });
790
791        let data = &msg;
792        assert_eq!(data["currency"], "idr");
793        assert_eq!(data["available"], "1000000");
794    }
795
796    #[test]
797    fn test_fetch_public_ws_token_precedence() {
798        // We can't easily mock the IndodaxClient's network call here without more refactoring,
799        // but we can test the logic of fetch_public_ws_token if we extract it slightly.
800        let default_token = helpers::DEFAULT_STATIC_WS_TOKEN;
801        assert!(default_token.starts_with("eyJ"));
802    }
803}