Skip to main content

indodax_cli/commands/
websocket.rs

1use crate::client::IndodaxClient;
2use crate::commands::helpers;
3use crate::output::{CommandOutput, OutputFormat};
4use anyhow::Result;
5use futures_util::{SinkExt, StreamExt};
6use indicatif::ProgressBar;
7use std::io::IsTerminal;
8use tokio_tungstenite::connect_async;
9use tokio_tungstenite::tungstenite::Message;
10use tracing;
11
12const PUBLIC_WS_URL: &str = "wss://ws3.indodax.com/ws/";
13const PRIVATE_WS_URL: &str = "wss://pws.indodax.com/ws/?cf_ws_frame_ping_pong=true";
14
15#[derive(Debug, clap::Subcommand)]
16pub enum WebSocketCommand {
17    #[command(name = "ticker", about = "Stream real-time ticker for a pair")]
18    Ticker {
19        #[arg(default_value = "btc_idr")]
20        pair: String,
21    },
22
23    #[command(name = "trades", about = "Stream real-time trades for a pair")]
24    Trades {
25        #[arg(default_value = "btc_idr")]
26        pair: String,
27    },
28
29    #[command(name = "book", about = "Stream real-time order book for a pair")]
30    Book {
31        #[arg(default_value = "btc_idr")]
32        pair: String,
33    },
34
35    #[command(name = "summary", about = "Stream 24h summary for all pairs")]
36    Summary,
37
38    #[command(name = "orders", about = "Stream private order updates")]
39    Orders,
40}
41
42pub async fn execute(
43    client: &IndodaxClient,
44    cmd: &WebSocketCommand,
45    output_format: OutputFormat,
46) -> Result<CommandOutput> {
47    match cmd {
48        WebSocketCommand::Ticker { pair } => {
49            let pair = helpers::normalize_pair(pair);
50            ws_ticker(client, &pair, output_format).await
51        }
52        WebSocketCommand::Trades { pair } => {
53            let pair = helpers::normalize_pair(pair);
54            ws_trades(client, &pair, output_format).await
55        }
56        WebSocketCommand::Book { pair } => {
57            let pair = helpers::normalize_pair(pair);
58            ws_book(client, &pair, output_format).await
59        }
60        WebSocketCommand::Summary => ws_summary(client, output_format).await,
61        WebSocketCommand::Orders => ws_orders(client, output_format).await,
62    }
63}
64
65async fn ws_connect_and_listen(
66    ws_url: &str,
67    token: &str,
68    channel: &str,
69    handler: impl Fn(serde_json::Value) -> Option<serde_json::Value>,
70    output_format: OutputFormat,
71) -> Result<CommandOutput> {
72    let spinner_ref = if output_format == OutputFormat::Json {
73        eprintln!(
74            "{}",
75            serde_json::json!({"event": "connecting", "url": ws_url})
76        );
77        None
78    } else {
79        let pb = ProgressBar::new_spinner();
80        pb.set_message("Connecting to Indodax WebSocket...");
81        pb.enable_steady_tick(std::time::Duration::from_millis(100));
82        Some(pb)
83    };
84
85    let mut events: Vec<serde_json::Value> = Vec::new();
86    let mut retry_count = 0;
87
88    'reconnect: loop {
89        if retry_count > 0 {
90            let delay = std::time::Duration::from_secs(2u64.pow(retry_count.min(5)));
91            if let Some(ref pb) = spinner_ref {
92                pb.set_message(format!("Disconnected. Retrying in {:?}...", delay));
93            } else {
94                eprintln!("{}", serde_json::json!({"event": "reconnecting", "delay_secs": delay.as_secs()}));
95            }
96            tokio::select! {
97                _ = tokio::signal::ctrl_c() => break 'reconnect,
98                _ = tokio::time::sleep(delay) => {}
99            }
100        }
101
102        let (mut ws_stream, _) = match connect_async(ws_url).await {
103            Ok(s) => s,
104            Err(e) => {
105                retry_count += 1;
106                tracing::warn!("WebSocket connection failed: {}. Retrying...", e);
107                continue 'reconnect;
108            }
109        };
110
111        if let Some(ref pb) = spinner_ref {
112            pb.set_message("Connected. Authenticating...");
113        } else {
114            eprintln!(
115                "{}",
116                serde_json::json!({"event": "connected", "status": "authenticating"})
117            );
118        }
119
120        let auth_msg = serde_json::json!({
121            "params": { "token": token },
122            "id": 1
123        });
124        if let Err(e) = ws_stream.send(Message::Text(auth_msg.to_string())).await {
125            retry_count += 1;
126            tracing::warn!("Failed to send auth message: {}. Retrying...", e);
127            continue 'reconnect;
128        }
129
130        let mut authed = false;
131        let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(30));
132        ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
133
134        loop {
135            tokio::select! {
136                _ = tokio::signal::ctrl_c() => {
137                    if let Some(ref pb) = spinner_ref {
138                        pb.finish_and_clear();
139                        eprintln!("Interrupted by user. Closing connection...");
140                    } else {
141                        eprintln!("{}", serde_json::json!({"event": "interrupted", "reason": "user_ctrl_c"}));
142                    }
143                    let _ = ws_stream.send(Message::Close(None)).await;
144                    break 'reconnect;
145                }
146                _ = ping_interval.tick() => {
147                    // Application-level ping to keep connection alive
148                    let ping_msg = serde_json::json!({
149                        "method": 7,
150                        "id": 7
151                    });
152                    if let Err(e) = ws_stream.send(Message::Text(ping_msg.to_string())).await {
153                        tracing::warn!("Failed to send WebSocket ping: {}. Triggering reconnect...", e);
154                        retry_count += 1;
155                        continue 'reconnect;
156                    }
157                }
158                msg = ws_stream.next() => {
159                    let msg = match msg {
160                        Some(m) => m,
161                        None => {
162                            retry_count += 1;
163                            tracing::warn!("WebSocket stream ended. Reconnecting...");
164                            continue 'reconnect;
165                        }
166                    };
167
168                    match msg {
169                        Ok(Message::Text(text)) => {
170                            let val = match serde_json::from_str::<serde_json::Value>(&text) {
171                                Ok(v) => v,
172                                Err(e) => {
173                                    tracing::warn!("WebSocket JSON parse error: {} (text: {})", e, text);
174                                    continue;
175                                }
176                            };
177
178                            if !authed {
179                                if val.get("id").and_then(|v| v.as_i64()) == Some(1)
180                                    && val.get("result").is_some()
181                                {
182                                    authed = true;
183                                    retry_count = 0; // Reset retry count on successful auth
184                                    if let Some(ref pb) = spinner_ref {
185                                        pb.set_message(format!("Authenticated. Subscribing to: {}", channel));
186                                    } else {
187                                        eprintln!("{}", serde_json::json!({"event": "authenticated", "channel": channel}));
188                                    }
189                                    let sub_msg = serde_json::json!({
190                                        "method": 1,
191                                        "params": { "channel": channel },
192                                        "id": 2
193                                    });
194                                    if let Err(e) = ws_stream.send(Message::Text(sub_msg.to_string())).await {
195                                        retry_count += 1;
196                                        continue 'reconnect;
197                                    }
198                                }
199                                continue;
200                            }
201
202                            if val.get("id").and_then(|v| v.as_i64()) == Some(2) {
203                                // subscription confirmation
204                                if let Some(ref pb) = spinner_ref {
205                                    pb.finish_and_clear();
206                                    eprintln!("Subscription active: {}", channel);
207                                    eprintln!();
208                                }
209                            } else if val.get("result").is_some() {
210                                if let Some(event) = handler(val) {
211                                    events.push(event);
212                                }
213                            }
214                        }
215                        Ok(Message::Ping(data)) => {
216                            let _ = ws_stream.send(Message::Pong(data)).await;
217                        }
218                        Ok(Message::Close(_)) => {
219                            retry_count += 1;
220                            tracing::warn!("Connection closed by server. Reconnecting...");
221                            continue 'reconnect;
222                        }
223                        Err(e) => {
224                            retry_count += 1;
225                            tracing::warn!("WebSocket error: {}. Reconnecting...", e);
226                            continue 'reconnect;
227                        }
228                        _ => {}
229                    }
230                }
231            }
232        }
233    }
234
235    Ok(CommandOutput::json(serde_json::json!({
236        "status": "disconnected",
237        "events": events,
238        "event_count": events.len(),
239    })))
240}
241
242fn format_ws_price(val: &serde_json::Value) -> Option<String> {
243    let f = val.as_f64()
244        .or_else(|| val.as_str().and_then(|s| s.parse::<f64>().ok()))?;
245    if f == 0.0 {
246        return Some("0".into());
247    }
248    if (f - f.round()).abs() < f64::EPSILON && f.abs() >= 1.0 {
249        return Some(format!("{}", f as u64));
250    }
251    let s = format!("{:.8}", f);
252    let trimmed = s.trim_end_matches('0');
253    Some(trimmed.trim_end_matches('.').to_string())
254}
255
256async fn ws_ticker(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
257    let channel = format!("chart:tick-{}", pair);
258    let token = helpers::fetch_public_ws_token(client).await?;
259    ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
260        let rows = &val["result"]["data"]["data"];
261        let mut last_event = None;
262        if let serde_json::Value::Array(arr) = rows {
263            for row in arr {
264                if let serde_json::Value::Array(fields) = row {
265                    if fields.len() >= 4 {
266                        let ts = fields[0].as_u64().unwrap_or(0);
267                        let price = format_ws_price(&fields[2]).unwrap_or_default();
268                        let time_str = chrono::DateTime::from_timestamp(ts.min(i64::MAX as u64) as i64, 0)
269                            .map(|d| d.format("%H:%M:%S").to_string())
270                            .unwrap_or_default();
271                        if output_format == OutputFormat::Json {
272                            println!("{}", serde_json::json!({
273                                "event": "ticker", "pair": pair, "time": time_str, "price": price
274                            }));
275                        } else {
276                            println!("[{}] {}  {}", time_str, pair, price);
277                        }
278                        last_event = Some(serde_json::json!({
279                            "event": "ticker", "pair": pair, "time": time_str, "price": price
280                        }));
281                    }
282                }
283            }
284        }
285        last_event
286    }, output_format)
287    .await
288}
289
290async fn ws_trades(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
291    let channel = format!("market:trade-activity-{}", pair);
292    let token = helpers::fetch_public_ws_token(client).await?;
293    ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
294        let rows = &val["result"]["data"]["data"];
295        let mut last_event = None;
296        if let serde_json::Value::Array(arr) = rows {
297            for row in arr {
298                if let serde_json::Value::Array(fields) = row {
299                    if fields.len() >= 7 {
300                        let ts = fields[1].as_u64().unwrap_or(0);
301                        let side = fields[3].as_str().unwrap_or("?");
302                        let price = fields[4].as_f64().unwrap_or(0.0);
303                        let volume = fields[6].as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
304                        let time_str = chrono::DateTime::from_timestamp(ts.min(i64::MAX as u64) as i64, 0)
305                            .map(|d| d.format("%H:%M:%S").to_string())
306                            .unwrap_or_default();
307                        if output_format == OutputFormat::Json {
308                            println!("{}", serde_json::json!({
309                                "event": "trade", "pair": pair, "time": time_str,
310                                "side": side, "price": price, "volume": volume
311                            }));
312                        } else {
313                            println!("[{}] {} {} @ {} vol: {}", time_str, side, pair, price, volume);
314                        }
315                        last_event = Some(serde_json::json!({
316                            "event": "trade", "pair": pair, "time": time_str,
317                            "side": side, "price": price, "volume": volume
318                        }));
319                    }
320                }
321            }
322        }
323        last_event
324    }, output_format)
325    .await
326}
327
328async fn ws_book(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
329    let channel = format!("market:order-book-{}", pair);
330    let token = helpers::fetch_public_ws_token(client).await?;
331    ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
332        let data = &val["result"]["data"]["data"];
333        
334        let parse_entry = |entry: &serde_json::Value| -> Option<(String, String)> {
335            if let Some(arr) = entry.as_array() {
336                if arr.len() >= 2 {
337                    let p = helpers::value_to_string(&arr[0]);
338                    let v = helpers::value_to_string(&arr[1]);
339                    return Some((p, v));
340                }
341            } else if let Some(obj) = entry.as_object() {
342                let p = helpers::value_to_string(obj.get("price").unwrap_or(&serde_json::Value::Null));
343                let v = helpers::value_to_string(
344                    obj.get("btc_volume")
345                        .or_else(|| obj.get("volume"))
346                        .or_else(|| obj.get("amount"))
347                        .unwrap_or(&serde_json::Value::Null),
348                );
349                return Some((p, v));
350            }
351            None
352        };
353
354        let ask_price = data["ask"].as_array().and_then(|asks| {
355            asks.first().and_then(parse_entry)
356        }).or_else(|| data["asks"].as_array().and_then(|asks| {
357            asks.first().and_then(parse_entry)
358        }));
359
360        let bid_price = data["bid"].as_array().and_then(|bids| {
361            bids.first().and_then(parse_entry)
362        }).or_else(|| data["bids"].as_array().and_then(|bids| {
363            bids.first().and_then(parse_entry)
364        }));
365
366        let event = serde_json::json!({
367            "event": "orderbook", "pair": pair,
368            "ask": ask_price.clone().map(|(p, a)| serde_json::json!({"price": p, "amount": a})),
369            "bid": bid_price.clone().map(|(p, a)| serde_json::json!({"price": p, "amount": a})),
370        });
371        if output_format == OutputFormat::Json {
372            println!("{}", event);
373        } else if std::io::stdout().is_terminal() {
374            if let Some((price, amount)) = ask_price {
375                print!("\r\x1b[KAsk: {} @ {} | ", price, amount);
376            }
377            if let Some((price, amount)) = bid_price {
378                println!("Bid: {} @ {}", price, amount);
379            }
380        } else {
381            if let Some((price, amount)) = ask_price {
382                println!("Ask: {} @ {}", price, amount);
383            }
384            if let Some((price, amount)) = bid_price {
385                println!("Bid: {} @ {}", price, amount);
386            }
387        }
388        Some(event)
389    }, output_format)
390    .await
391}
392
393async fn ws_summary(client: &IndodaxClient, output_format: OutputFormat) -> Result<CommandOutput> {
394    let token = helpers::fetch_public_ws_token(client).await?;
395    ws_connect_and_listen(PUBLIC_WS_URL, &token, "market:summary-24h", |val| {
396        let rows = &val["result"]["data"]["data"];
397        let mut last_event = None;
398        if let serde_json::Value::Array(arr) = rows {
399            for row in arr {
400                if let serde_json::Value::Array(fields) = row {
401                    if fields.len() >= 8 {
402                        let pair = fields[0].as_str().unwrap_or("?");
403                        let last = helpers::value_to_string(&fields[2]);
404                        let high = helpers::value_to_string(&fields[4]);
405                        let low = helpers::value_to_string(&fields[3]);
406                        let price_24h = fields[5].as_f64().unwrap_or(0.0);
407                        let last_f = fields[2].as_f64().unwrap_or(0.0);
408                        let change = if price_24h > 0.0 {
409                            format!("{:+.2}%", (last_f - price_24h) / price_24h * 100.0)
410                        } else {
411                            "0%".to_string()
412                        };
413                        if output_format == OutputFormat::Json {
414                            println!("{}", serde_json::json!({
415                                "event": "summary", "pair": pair, "last": last,
416                                "high": high, "low": low, "change": change
417                            }));
418                        } else if std::io::stdout().is_terminal() {
419                            println!("\x1b[K{:15}  last: {:>15}  high: {:>15}  low: {:>15}  change: {}", pair, last, high, low, change);
420                        } else {
421                            println!("{:15}  last: {:>15}  high: {:>15}  low: {:>15}  change: {}", pair, last, high, low, change);
422                        }
423                        last_event = Some(serde_json::json!({
424                            "event": "summary", "pair": pair, "last": last,
425                            "high": high, "low": low, "change": change
426                        }));
427                    }
428                }
429            }
430        }
431        last_event
432    }, output_format)
433    .await
434}
435
436async fn ws_orders(client: &IndodaxClient, output_format: OutputFormat) -> Result<CommandOutput> {
437    if client.signer().is_none() {
438        return Err(anyhow::anyhow!(
439            "Private WebSocket requires API credentials. Use 'indodax auth set' or set INDODAX_API_KEY and INDODAX_API_SECRET environment variables."
440        ));
441    }
442
443    eprintln!("Generating WebSocket token...");
444    let (token, channel) = client.generate_ws_token().await.map_err(|e| {
445        anyhow::anyhow!("WebSocket token generation failed: {}. Check that your API credentials are valid and have the correct permissions.", e)
446    })?;
447    eprintln!("Token generated. Connecting to private WebSocket...");
448
449    ws_private_connect_and_listen(PRIVATE_WS_URL, &token, &channel, |val| {
450        // Private WebSocket messages can be in several formats depending on the server version
451        // We look for common patterns in order and balance updates.
452        
453        let result = val.get("result").or(val.get("push")).or(Some(&val)).unwrap();
454        let data = result.get("data").unwrap_or(result);
455
456        if let Some(order_id) = data.get("order_id").or(data.get("orderId")).and_then(|v| v.as_u64().or_else(|| v.as_str().and_then(|s| s.parse().ok()))) {
457            let pair = data.get("pair").and_then(|v| v.as_str()).unwrap_or("?");
458            let side = data.get("side").and_then(|v| v.as_str()).unwrap_or("?");
459            let status = data.get("status").and_then(|v| v.as_str()).unwrap_or("?");
460            let price = helpers::value_to_string(data.get("price").unwrap_or(&serde_json::Value::Null));
461            let amount = helpers::value_to_string(data.get("amount").or(data.get("quantity")).unwrap_or(&serde_json::Value::Null));
462            
463            if output_format == OutputFormat::Json {
464                println!("{}", serde_json::json!({
465                    "event": "order_update", "order_id": order_id, "pair": pair,
466                    "side": side, "status": status, "price": price, "amount": amount
467                }));
468            } else {
469                println!("Order Update: ID={} Pair={} Side={} Status={} Price={} Amount={}",
470                    order_id, pair, side, status, price, amount);
471            }
472            Some(serde_json::json!({
473                "event": "order_update", "order_id": order_id, "pair": pair,
474                "side": side, "status": status, "price": price, "amount": amount
475            }))
476        } else if let Some(currency) = data.get("currency").or(data.get("asset")).and_then(|v| v.as_str()) {
477            let available = helpers::value_to_string(data.get("available").or(data.get("balance")).unwrap_or(&serde_json::Value::Null));
478            let frozen = helpers::value_to_string(data.get("frozen").or(data.get("hold")).unwrap_or(&serde_json::Value::Null));
479            
480            if output_format == OutputFormat::Json {
481                println!("{}", serde_json::json!({
482                    "event": "balance_update", "currency": currency,
483                    "available": available, "frozen": frozen
484                }));
485            } else {
486                println!("Balance Update: {} Available={} Frozen={}", currency, available, frozen);
487            }
488            Some(serde_json::json!({
489                "event": "balance_update", "currency": currency,
490                "available": available, "frozen": frozen
491            }))
492        } else {
493            // Raw fallback for unknown private events
494            if output_format == OutputFormat::Json {
495                let raw = serde_json::json!({"event": "private_update_raw", "data": data});
496                println!("{}", raw);
497                Some(raw)
498            } else {
499                // If it's a heartbeat or confirmation, we might want to skip printing
500                if data.get("method").and_then(|m| m.as_str()) != Some("pong") {
501                    println!("Private Event: {}", serde_json::to_string(data).unwrap_or_default());
502                }
503                Some(data.clone())
504            }
505        }
506    }, output_format)
507    .await
508}
509
510async fn ws_private_connect_and_listen(
511    ws_url: &str,
512    token: &str,
513    channel: &str,
514    handler: impl Fn(serde_json::Value) -> Option<serde_json::Value>,
515    output_format: OutputFormat,
516) -> Result<CommandOutput> {
517    let spinner_ref = if output_format == OutputFormat::Json {
518        eprintln!("{}", serde_json::json!({"event": "connecting", "url": ws_url}));
519        None
520    } else {
521        let pb = ProgressBar::new_spinner();
522        pb.set_message("Connecting to Private WebSocket...");
523        pb.enable_steady_tick(std::time::Duration::from_millis(100));
524        Some(pb)
525    };
526
527    let mut events: Vec<serde_json::Value> = Vec::new();
528    let mut retry_count = 0;
529
530    'reconnect: loop {
531        if retry_count > 0 {
532            let delay = std::time::Duration::from_secs(2u64.pow(retry_count.min(5)));
533            if let Some(ref pb) = spinner_ref {
534                pb.set_message(format!("Disconnected. Retrying in {:?}...", delay));
535            } else {
536                eprintln!("{}", serde_json::json!({"event": "reconnecting", "delay_secs": delay.as_secs()}));
537            }
538            tokio::select! {
539                _ = tokio::signal::ctrl_c() => break 'reconnect,
540                _ = tokio::time::sleep(delay) => {}
541            }
542        }
543
544        let (mut ws_stream, _) = match connect_async(ws_url).await {
545            Ok(s) => s,
546            Err(e) => {
547                retry_count += 1;
548                tracing::warn!("Private WebSocket connection failed: {}. Retrying...", e);
549                continue 'reconnect;
550            }
551        };
552
553        // 1. Connect (Authenticate)
554        let connect_msg = serde_json::json!({
555            "connect": { "token": token },
556            "id": 1
557        });
558        if let Err(e) = ws_stream.send(Message::Text(connect_msg.to_string())).await {
559            retry_count += 1;
560            continue 'reconnect;
561        }
562
563        let mut authed = false;
564        let mut subscribed = false;
565        let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(30));
566
567        loop {
568            tokio::select! {
569                _ = tokio::signal::ctrl_c() => {
570                    let _ = ws_stream.send(Message::Close(None)).await;
571                    break 'reconnect;
572                }
573                _ = ping_interval.tick() => {
574                    // Private WS uses standard ping frames if configured in URL, 
575                    // but some versions also support application-level pings.
576                    let _ = ws_stream.send(Message::Ping(vec![])).await;
577                }
578                msg = ws_stream.next() => {
579                    let msg = match msg {
580                        Some(m) => m,
581                        None => { retry_count += 1; continue 'reconnect; }
582                    };
583
584                    match msg {
585                        Ok(Message::Text(text)) => {
586                            let val: serde_json::Value = match serde_json::from_str(&text) {
587                                Ok(v) => v,
588                                Err(_) => continue,
589                            };
590
591                            if !authed {
592                                // Check for connection success
593                                if val.get("connect").is_some() || val.get("id").and_then(|v| v.as_i64()) == Some(1) {
594                                    authed = true;
595                                    retry_count = 0;
596                                    // 2. Subscribe to user channel
597                                    let sub_msg = serde_json::json!({
598                                        "subscribe": { "channel": channel },
599                                        "id": 2
600                                    });
601                                    let _ = ws_stream.send(Message::Text(sub_msg.to_string())).await;
602                                }
603                                continue;
604                            }
605
606                            if !subscribed {
607                                if val.get("subscribe").is_some() || val.get("id").and_then(|v| v.as_i64()) == Some(2) {
608                                    subscribed = true;
609                                    if let Some(ref pb) = spinner_ref {
610                                        pb.finish_and_clear();
611                                        eprintln!("Private subscription active: {}", channel);
612                                        eprintln!();
613                                    }
614                                }
615                                continue;
616                            }
617
618                            if let Some(event) = handler(val) {
619                                events.push(event);
620                            }
621                        }
622                        Ok(Message::Ping(data)) => { let _ = ws_stream.send(Message::Pong(data)).await; }
623                        Ok(Message::Close(_)) => { retry_count += 1; continue 'reconnect; }
624                        Err(_) => { retry_count += 1; continue 'reconnect; }
625                        _ => {}
626                    }
627                }
628            }
629        }
630    }
631
632    Ok(CommandOutput::json(serde_json::json!({
633        "status": "disconnected",
634        "events": events,
635        "event_count": events.len(),
636    })))
637}
638
639#[cfg(test)]
640mod tests {
641    use super::*;
642    use serde_json::json;
643
644    #[test]
645    fn test_websocket_command_variants() {
646        let _cmd1 = WebSocketCommand::Ticker { pair: "btc_idr".into() };
647        let _cmd2 = WebSocketCommand::Trades { pair: "eth_idr".into() };
648        let _cmd3 = WebSocketCommand::Book { pair: "btc_idr".into() };
649        let _cmd4 = WebSocketCommand::Summary;
650        let _cmd5 = WebSocketCommand::Orders;
651    }
652
653    #[test]
654    fn test_format_ws_price() {
655        assert_eq!(format_ws_price(&json!(1234.56)), Some("1234.56".to_string()));
656        assert_eq!(format_ws_price(&json!("1234.56")), Some("1234.56".to_string()));
657        assert_eq!(format_ws_price(&json!(1000)), Some("1000".to_string()));
658        assert_eq!(format_ws_price(&json!(0)), Some("0".to_string()));
659    }
660
661    #[test]
662    fn test_ticker_parsing_logic() {
663        let msg = json!({
664            "result": {
665                "data": {
666                    "data": [
667                        [1632717721, 4087327, 14340, "1063.73019525"]
668                    ]
669                }
670            }
671        });
672        
673        // Simulating the handler logic inside ws_ticker
674        let rows = &msg["result"]["data"]["data"];
675        if let serde_json::Value::Array(arr) = rows {
676            let fields = arr[0].as_array().unwrap();
677            let price = format_ws_price(&fields[2]).unwrap();
678            assert_eq!(price, "14340");
679        } else {
680            panic!("Expected array");
681        }
682    }
683
684    #[test]
685    fn test_orderbook_parsing_array_format() {
686        let msg = json!({
687            "result": {
688                "data": {
689                    "data": {
690                        "asks": [["651000000", "0.05000000"]],
691                        "bids": [["650000000", "0.12345678"]]
692                    }
693                }
694            }
695        });
696        
697        let data = &msg["result"]["data"]["data"];
698        let parse_entry = |entry: &serde_json::Value| -> Option<(String, String)> {
699            if let Some(arr) = entry.as_array() {
700                if arr.len() >= 2 {
701                    let p = helpers::value_to_string(&arr[0]);
702                    let v = helpers::value_to_string(&arr[1]);
703                    return Some((p, v));
704                }
705            }
706            None
707        };
708
709        let ask = data["asks"].as_array().unwrap().first().and_then(parse_entry).unwrap();
710        assert_eq!(ask.0, "651000000");
711        assert_eq!(ask.1, "0.05000000");
712    }
713
714    #[test]
715    fn test_orderbook_parsing_object_format() {
716        let msg = json!({
717            "result": {
718                "data": {
719                    "data": {
720                        "ask": [{"price": "319437000", "btc_volume": "0.11035661"}],
721                        "bid": [{"price": "319436000", "btc_volume": "0.61427265"}]
722                    }
723                }
724            }
725        });
726
727        let data = &msg["result"]["data"]["data"];
728        let parse_entry = |entry: &serde_json::Value| -> Option<(String, String)> {
729            if let Some(obj) = entry.as_object() {
730                let p = helpers::value_to_string(obj.get("price").unwrap());
731                let v = helpers::value_to_string(obj.get("btc_volume").unwrap());
732                return Some((p, v));
733            }
734            None
735        };
736
737        let ask = data["ask"].as_array().unwrap().first().and_then(parse_entry).unwrap();
738        assert_eq!(ask.0, "319437000");
739        assert_eq!(ask.1, "0.11035661");
740    }
741
742    #[test]
743    fn test_private_order_update_parsing() {
744        let msg = json!({
745            "push": {
746                "data": {
747                    "order_id": 12345,
748                    "pair": "btcidr",
749                    "side": "buy",
750                    "status": "filled",
751                    "price": "500000000",
752                    "amount": "0.1"
753                }
754            }
755        });
756
757        let result = msg.get("result").or(msg.get("push")).or(Some(&msg)).unwrap();
758        let data = result.get("data").unwrap_or(result);
759        
760        assert_eq!(data["order_id"], 12345);
761        assert_eq!(data["pair"], "btcidr");
762    }
763
764    #[test]
765    fn test_private_balance_update_parsing() {
766        let msg = json!({
767            "currency": "idr",
768            "available": "1000000",
769            "frozen": "50000"
770        });
771
772        let data = &msg;
773        assert_eq!(data["currency"], "idr");
774        assert_eq!(data["available"], "1000000");
775    }
776
777    #[test]
778    fn test_fetch_public_ws_token_precedence() {
779        // We can't easily mock the IndodaxClient's network call here without more refactoring,
780        // but we can test the logic of fetch_public_ws_token if we extract it slightly.
781        let default_token = helpers::DEFAULT_STATIC_WS_TOKEN;
782        assert!(default_token.starts_with("eyJ"));
783    }
784}