1use crate::client::IndodaxClient;
2use crate::commands::helpers;
3use crate::output::{CommandOutput, OutputFormat};
4use anyhow::Result;
5use futures_util::{SinkExt, StreamExt};
6use indicatif::ProgressBar;
7use std::io::IsTerminal;
8use tokio_tungstenite::connect_async;
9use tokio_tungstenite::tungstenite::Message;
10use tracing;
11
/// Endpoint used by the public market-data streams in this module
/// (ticker, trades, order book and 24h summary).
const PUBLIC_WS_URL: &str = "wss://ws3.indodax.com/ws/";
/// Endpoint used by the private stream opened by the `orders` subcommand.
// NOTE(review): the `cf_ws_frame_ping_pong=true` query flag presumably enables
// frame-level ping/pong on the server side — confirm against the Indodax docs.
const PRIVATE_WS_URL: &str = "wss://pws.indodax.com/ws/?cf_ws_frame_ping_pong=true";
14
// Subcommands of the WebSocket streaming command group.
//
// Plain `//` comments are used on purpose: `///` doc comments on a clap-derived
// type are picked up by clap and would alter the generated help text.
#[derive(Debug, clap::Subcommand)]
pub enum WebSocketCommand {
    // Public stream; handled by `ws_ticker`.
    #[command(name = "ticker", about = "Stream real-time ticker for a pair")]
    Ticker {
        // Trading pair; normalized with `helpers::normalize_pair` in `execute`.
        #[arg(default_value = "btc_idr")]
        pair: String,
    },

    // Public stream; handled by `ws_trades`.
    #[command(name = "trades", about = "Stream real-time trades for a pair")]
    Trades {
        // Trading pair; normalized with `helpers::normalize_pair` in `execute`.
        #[arg(default_value = "btc_idr")]
        pair: String,
    },

    // Public stream; handled by `ws_book`.
    #[command(name = "book", about = "Stream real-time order book for a pair")]
    Book {
        // Trading pair; normalized with `helpers::normalize_pair` in `execute`.
        #[arg(default_value = "btc_idr")]
        pair: String,
    },

    // Public stream covering every pair; handled by `ws_summary`.
    #[command(name = "summary", about = "Stream 24h summary for all pairs")]
    Summary,

    // Private stream; `ws_orders` refuses to run without a configured signer.
    #[command(name = "orders", about = "Stream private order updates")]
    Orders,
}
41
42pub async fn execute(
43 client: &IndodaxClient,
44 cmd: &WebSocketCommand,
45 output_format: OutputFormat,
46) -> Result<CommandOutput> {
47 match cmd {
48 WebSocketCommand::Ticker { pair } => {
49 let pair = helpers::normalize_pair(pair);
50 ws_ticker(client, &pair, output_format).await
51 }
52 WebSocketCommand::Trades { pair } => {
53 let pair = helpers::normalize_pair(pair);
54 ws_trades(client, &pair, output_format).await
55 }
56 WebSocketCommand::Book { pair } => {
57 let pair = helpers::normalize_pair(pair);
58 ws_book(client, &pair, output_format).await
59 }
60 WebSocketCommand::Summary => ws_summary(client, output_format).await,
61 WebSocketCommand::Orders => ws_orders(client, output_format).await,
62 }
63}
64
65async fn ws_connect_and_listen(
66 ws_url: &str,
67 token: &str,
68 channel: &str,
69 handler: impl Fn(serde_json::Value) -> Option<serde_json::Value>,
70 output_format: OutputFormat,
71) -> Result<CommandOutput> {
72 let spinner_ref = if output_format == OutputFormat::Json {
73 eprintln!(
74 "{}",
75 serde_json::json!({"event": "connecting", "url": ws_url})
76 );
77 None
78 } else {
79 let pb = ProgressBar::new_spinner();
80 pb.set_message("Connecting to Indodax WebSocket...");
81 pb.enable_steady_tick(std::time::Duration::from_millis(100));
82 Some(pb)
83 };
84
85 let mut events: Vec<serde_json::Value> = Vec::new();
86 let mut retry_count = 0;
87
88 'reconnect: loop {
89 if retry_count > 0 {
90 let delay = std::time::Duration::from_secs(2u64.pow(retry_count.min(5)));
91 if let Some(ref pb) = spinner_ref {
92 pb.set_message(format!("Disconnected. Retrying in {:?}...", delay));
93 } else {
94 eprintln!("{}", serde_json::json!({"event": "reconnecting", "delay_secs": delay.as_secs()}));
95 }
96 tokio::select! {
97 _ = tokio::signal::ctrl_c() => break 'reconnect,
98 _ = tokio::time::sleep(delay) => {}
99 }
100 }
101
102 let (mut ws_stream, _) = match connect_async(ws_url).await {
103 Ok(s) => s,
104 Err(e) => {
105 retry_count += 1;
106 tracing::warn!("WebSocket connection failed: {}. Retrying...", e);
107 continue 'reconnect;
108 }
109 };
110
111 if let Some(ref pb) = spinner_ref {
112 pb.set_message("Connected. Authenticating...");
113 } else {
114 eprintln!(
115 "{}",
116 serde_json::json!({"event": "connected", "status": "authenticating"})
117 );
118 }
119
120 let auth_msg = serde_json::json!({
121 "params": { "token": token },
122 "id": 1
123 });
124 if let Err(e) = ws_stream.send(Message::Text(auth_msg.to_string())).await {
125 retry_count += 1;
126 tracing::warn!("Failed to send auth message: {}. Retrying...", e);
127 continue 'reconnect;
128 }
129
130 let mut authed = false;
131 let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(30));
132 ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
133
134 loop {
135 tokio::select! {
136 _ = tokio::signal::ctrl_c() => {
137 if let Some(ref pb) = spinner_ref {
138 pb.finish_and_clear();
139 eprintln!("Interrupted by user. Closing connection...");
140 } else {
141 eprintln!("{}", serde_json::json!({"event": "interrupted", "reason": "user_ctrl_c"}));
142 }
143 let _ = ws_stream.send(Message::Close(None)).await;
144 break 'reconnect;
145 }
146 _ = ping_interval.tick() => {
147 let ping_msg = serde_json::json!({
149 "method": 7,
150 "id": 7
151 });
152 if let Err(e) = ws_stream.send(Message::Text(ping_msg.to_string())).await {
153 tracing::warn!("Failed to send WebSocket ping: {}. Triggering reconnect...", e);
154 retry_count += 1;
155 continue 'reconnect;
156 }
157 }
158 msg = ws_stream.next() => {
159 let msg = match msg {
160 Some(m) => m,
161 None => {
162 retry_count += 1;
163 tracing::warn!("WebSocket stream ended. Reconnecting...");
164 continue 'reconnect;
165 }
166 };
167
168 match msg {
169 Ok(Message::Text(text)) => {
170 let val = match serde_json::from_str::<serde_json::Value>(&text) {
171 Ok(v) => v,
172 Err(e) => {
173 tracing::warn!("WebSocket JSON parse error: {} (text: {})", e, text);
174 continue;
175 }
176 };
177
178 if !authed {
179 if val.get("id").and_then(|v| v.as_i64()) == Some(1)
180 && val.get("result").is_some()
181 {
182 authed = true;
183 retry_count = 0; if let Some(ref pb) = spinner_ref {
185 pb.set_message(format!("Authenticated. Subscribing to: {}", channel));
186 } else {
187 eprintln!("{}", serde_json::json!({"event": "authenticated", "channel": channel}));
188 }
189 let sub_msg = serde_json::json!({
190 "method": 1,
191 "params": { "channel": channel },
192 "id": 2
193 });
194 if let Err(e) = ws_stream.send(Message::Text(sub_msg.to_string())).await {
195 retry_count += 1;
196 continue 'reconnect;
197 }
198 }
199 continue;
200 }
201
202 if val.get("id").and_then(|v| v.as_i64()) == Some(2) {
203 if let Some(ref pb) = spinner_ref {
205 pb.finish_and_clear();
206 eprintln!("Subscription active: {}", channel);
207 eprintln!();
208 }
209 } else if val.get("result").is_some() {
210 if let Some(event) = handler(val) {
211 events.push(event);
212 }
213 }
214 }
215 Ok(Message::Ping(data)) => {
216 let _ = ws_stream.send(Message::Pong(data)).await;
217 }
218 Ok(Message::Close(_)) => {
219 retry_count += 1;
220 tracing::warn!("Connection closed by server. Reconnecting...");
221 continue 'reconnect;
222 }
223 Err(e) => {
224 retry_count += 1;
225 tracing::warn!("WebSocket error: {}. Reconnecting...", e);
226 continue 'reconnect;
227 }
228 _ => {}
229 }
230 }
231 }
232 }
233 }
234
235 Ok(CommandOutput::json(serde_json::json!({
236 "status": "disconnected",
237 "events": events,
238 "event_count": events.len(),
239 })))
240}
241
242fn format_ws_price(val: &serde_json::Value) -> Option<String> {
243 let f = val.as_f64()
244 .or_else(|| val.as_str().and_then(|s| s.parse::<f64>().ok()))?;
245 if f == 0.0 {
246 return Some("0".into());
247 }
248 if (f - f.round()).abs() < f64::EPSILON && f.abs() >= 1.0 {
249 return Some(format!("{}", f as u64));
250 }
251 let s = format!("{:.8}", f);
252 let trimmed = s.trim_end_matches('0');
253 Some(trimmed.trim_end_matches('.').to_string())
254}
255
256async fn ws_ticker(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
257 let channel = format!("chart:tick-{}", pair);
258 let token = helpers::fetch_public_ws_token(client).await?;
259 ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
260 let rows = &val["result"]["data"]["data"];
261 let mut last_event = None;
262 if let serde_json::Value::Array(arr) = rows {
263 for row in arr {
264 if let serde_json::Value::Array(fields) = row {
265 if fields.len() >= 4 {
266 let ts = fields[0].as_u64().unwrap_or(0);
267 let price = format_ws_price(&fields[2]).unwrap_or_default();
268 let time_str = chrono::DateTime::from_timestamp(ts.min(i64::MAX as u64) as i64, 0)
269 .map(|d| d.format("%H:%M:%S").to_string())
270 .unwrap_or_default();
271 if output_format == OutputFormat::Json {
272 println!("{}", serde_json::json!({
273 "event": "ticker", "pair": pair, "time": time_str, "price": price
274 }));
275 } else {
276 println!("[{}] {} {}", time_str, pair, price);
277 }
278 last_event = Some(serde_json::json!({
279 "event": "ticker", "pair": pair, "time": time_str, "price": price
280 }));
281 }
282 }
283 }
284 }
285 last_event
286 }, output_format)
287 .await
288}
289
290async fn ws_trades(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
291 let channel = format!("market:trade-activity-{}", pair);
292 let token = helpers::fetch_public_ws_token(client).await?;
293 ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
294 let rows = &val["result"]["data"]["data"];
295 let mut last_event = None;
296 if let serde_json::Value::Array(arr) = rows {
297 for row in arr {
298 if let serde_json::Value::Array(fields) = row {
299 if fields.len() >= 7 {
300 let ts = fields[1].as_u64().unwrap_or(0);
301 let side = fields[3].as_str().unwrap_or("?");
302 let price = fields[4].as_f64().unwrap_or(0.0);
303 let volume = fields[6].as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
304 let time_str = chrono::DateTime::from_timestamp(ts.min(i64::MAX as u64) as i64, 0)
305 .map(|d| d.format("%H:%M:%S").to_string())
306 .unwrap_or_default();
307 if output_format == OutputFormat::Json {
308 println!("{}", serde_json::json!({
309 "event": "trade", "pair": pair, "time": time_str,
310 "side": side, "price": price, "volume": volume
311 }));
312 } else {
313 println!("[{}] {} {} @ {} vol: {}", time_str, side, pair, price, volume);
314 }
315 last_event = Some(serde_json::json!({
316 "event": "trade", "pair": pair, "time": time_str,
317 "side": side, "price": price, "volume": volume
318 }));
319 }
320 }
321 }
322 }
323 last_event
324 }, output_format)
325 .await
326}
327
328async fn ws_book(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
329 let channel = format!("market:order-book-{}", pair);
330 let token = helpers::fetch_public_ws_token(client).await?;
331 ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
332 let data = &val["result"]["data"]["data"];
333
334 let parse_entry = |entry: &serde_json::Value| -> Option<(String, String)> {
335 if let Some(arr) = entry.as_array() {
336 if arr.len() >= 2 {
337 let p = helpers::value_to_string(&arr[0]);
338 let v = helpers::value_to_string(&arr[1]);
339 return Some((p, v));
340 }
341 } else if let Some(obj) = entry.as_object() {
342 let p = helpers::value_to_string(obj.get("price").unwrap_or(&serde_json::Value::Null));
343 let v = helpers::value_to_string(
344 obj.get("btc_volume")
345 .or_else(|| obj.get("volume"))
346 .or_else(|| obj.get("amount"))
347 .unwrap_or(&serde_json::Value::Null),
348 );
349 return Some((p, v));
350 }
351 None
352 };
353
354 let ask_price = data["ask"].as_array().and_then(|asks| {
355 asks.first().and_then(parse_entry)
356 }).or_else(|| data["asks"].as_array().and_then(|asks| {
357 asks.first().and_then(parse_entry)
358 }));
359
360 let bid_price = data["bid"].as_array().and_then(|bids| {
361 bids.first().and_then(parse_entry)
362 }).or_else(|| data["bids"].as_array().and_then(|bids| {
363 bids.first().and_then(parse_entry)
364 }));
365
366 let event = serde_json::json!({
367 "event": "orderbook", "pair": pair,
368 "ask": ask_price.clone().map(|(p, a)| serde_json::json!({"price": p, "amount": a})),
369 "bid": bid_price.clone().map(|(p, a)| serde_json::json!({"price": p, "amount": a})),
370 });
371 if output_format == OutputFormat::Json {
372 println!("{}", event);
373 } else if std::io::stdout().is_terminal() {
374 if let Some((price, amount)) = ask_price {
375 print!("\r\x1b[KAsk: {} @ {} | ", price, amount);
376 }
377 if let Some((price, amount)) = bid_price {
378 println!("Bid: {} @ {}", price, amount);
379 }
380 } else {
381 if let Some((price, amount)) = ask_price {
382 println!("Ask: {} @ {}", price, amount);
383 }
384 if let Some((price, amount)) = bid_price {
385 println!("Bid: {} @ {}", price, amount);
386 }
387 }
388 Some(event)
389 }, output_format)
390 .await
391}
392
393async fn ws_summary(client: &IndodaxClient, output_format: OutputFormat) -> Result<CommandOutput> {
394 let token = helpers::fetch_public_ws_token(client).await?;
395 ws_connect_and_listen(PUBLIC_WS_URL, &token, "market:summary-24h", |val| {
396 let rows = &val["result"]["data"]["data"];
397 let mut last_event = None;
398 if let serde_json::Value::Array(arr) = rows {
399 for row in arr {
400 if let serde_json::Value::Array(fields) = row {
401 if fields.len() >= 8 {
402 let pair = fields[0].as_str().unwrap_or("?");
403 let last = helpers::value_to_string(&fields[2]);
404 let high = helpers::value_to_string(&fields[4]);
405 let low = helpers::value_to_string(&fields[3]);
406 let price_24h = fields[5].as_f64().unwrap_or(0.0);
407 let last_f = fields[2].as_f64().unwrap_or(0.0);
408 let change = if price_24h > 0.0 {
409 format!("{:+.2}%", (last_f - price_24h) / price_24h * 100.0)
410 } else {
411 "0%".to_string()
412 };
413 if output_format == OutputFormat::Json {
414 println!("{}", serde_json::json!({
415 "event": "summary", "pair": pair, "last": last,
416 "high": high, "low": low, "change": change
417 }));
418 } else if std::io::stdout().is_terminal() {
419 println!("\x1b[K{:15} last: {:>15} high: {:>15} low: {:>15} change: {}", pair, last, high, low, change);
420 } else {
421 println!("{:15} last: {:>15} high: {:>15} low: {:>15} change: {}", pair, last, high, low, change);
422 }
423 last_event = Some(serde_json::json!({
424 "event": "summary", "pair": pair, "last": last,
425 "high": high, "low": low, "change": change
426 }));
427 }
428 }
429 }
430 }
431 last_event
432 }, output_format)
433 .await
434}
435
436async fn ws_orders(client: &IndodaxClient, output_format: OutputFormat) -> Result<CommandOutput> {
437 if client.signer().is_none() {
438 return Err(anyhow::anyhow!(
439 "Private WebSocket requires API credentials. Use 'indodax auth set' or set INDODAX_API_KEY and INDODAX_API_SECRET environment variables."
440 ));
441 }
442
443 eprintln!("Generating WebSocket token...");
444 let (token, channel) = client.generate_ws_token().await.map_err(|e| {
445 anyhow::anyhow!("WebSocket token generation failed: {}. Check that your API credentials are valid and have the correct permissions.", e)
446 })?;
447 eprintln!("Token generated. Connecting to private WebSocket...");
448
449 ws_private_connect_and_listen(PRIVATE_WS_URL, &token, &channel, |val| {
450 let result = val.get("result").or(val.get("push")).or(Some(&val)).unwrap();
454 let data = result.get("data").unwrap_or(result);
455
456 if let Some(order_id) = data.get("order_id").or(data.get("orderId")).and_then(|v| v.as_u64().or_else(|| v.as_str().and_then(|s| s.parse().ok()))) {
457 let pair = data.get("pair").and_then(|v| v.as_str()).unwrap_or("?");
458 let side = data.get("side").and_then(|v| v.as_str()).unwrap_or("?");
459 let status = data.get("status").and_then(|v| v.as_str()).unwrap_or("?");
460 let price = helpers::value_to_string(data.get("price").unwrap_or(&serde_json::Value::Null));
461 let amount = helpers::value_to_string(data.get("amount").or(data.get("quantity")).unwrap_or(&serde_json::Value::Null));
462
463 if output_format == OutputFormat::Json {
464 println!("{}", serde_json::json!({
465 "event": "order_update", "order_id": order_id, "pair": pair,
466 "side": side, "status": status, "price": price, "amount": amount
467 }));
468 } else {
469 println!("Order Update: ID={} Pair={} Side={} Status={} Price={} Amount={}",
470 order_id, pair, side, status, price, amount);
471 }
472 Some(serde_json::json!({
473 "event": "order_update", "order_id": order_id, "pair": pair,
474 "side": side, "status": status, "price": price, "amount": amount
475 }))
476 } else if let Some(currency) = data.get("currency").or(data.get("asset")).and_then(|v| v.as_str()) {
477 let available = helpers::value_to_string(data.get("available").or(data.get("balance")).unwrap_or(&serde_json::Value::Null));
478 let frozen = helpers::value_to_string(data.get("frozen").or(data.get("hold")).unwrap_or(&serde_json::Value::Null));
479
480 if output_format == OutputFormat::Json {
481 println!("{}", serde_json::json!({
482 "event": "balance_update", "currency": currency,
483 "available": available, "frozen": frozen
484 }));
485 } else {
486 println!("Balance Update: {} Available={} Frozen={}", currency, available, frozen);
487 }
488 Some(serde_json::json!({
489 "event": "balance_update", "currency": currency,
490 "available": available, "frozen": frozen
491 }))
492 } else {
493 if output_format == OutputFormat::Json {
495 let raw = serde_json::json!({"event": "private_update_raw", "data": data});
496 println!("{}", raw);
497 Some(raw)
498 } else {
499 if data.get("method").and_then(|m| m.as_str()) != Some("pong") {
501 println!("Private Event: {}", serde_json::to_string(data).unwrap_or_default());
502 }
503 Some(data.clone())
504 }
505 }
506 }, output_format)
507 .await
508}
509
510async fn ws_private_connect_and_listen(
511 ws_url: &str,
512 token: &str,
513 channel: &str,
514 handler: impl Fn(serde_json::Value) -> Option<serde_json::Value>,
515 output_format: OutputFormat,
516) -> Result<CommandOutput> {
517 let spinner_ref = if output_format == OutputFormat::Json {
518 eprintln!("{}", serde_json::json!({"event": "connecting", "url": ws_url}));
519 None
520 } else {
521 let pb = ProgressBar::new_spinner();
522 pb.set_message("Connecting to Private WebSocket...");
523 pb.enable_steady_tick(std::time::Duration::from_millis(100));
524 Some(pb)
525 };
526
527 let mut events: Vec<serde_json::Value> = Vec::new();
528 let mut retry_count = 0;
529
530 'reconnect: loop {
531 if retry_count > 0 {
532 let delay = std::time::Duration::from_secs(2u64.pow(retry_count.min(5)));
533 if let Some(ref pb) = spinner_ref {
534 pb.set_message(format!("Disconnected. Retrying in {:?}...", delay));
535 } else {
536 eprintln!("{}", serde_json::json!({"event": "reconnecting", "delay_secs": delay.as_secs()}));
537 }
538 tokio::select! {
539 _ = tokio::signal::ctrl_c() => break 'reconnect,
540 _ = tokio::time::sleep(delay) => {}
541 }
542 }
543
544 let (mut ws_stream, _) = match connect_async(ws_url).await {
545 Ok(s) => s,
546 Err(e) => {
547 retry_count += 1;
548 tracing::warn!("Private WebSocket connection failed: {}. Retrying...", e);
549 continue 'reconnect;
550 }
551 };
552
553 let connect_msg = serde_json::json!({
555 "connect": { "token": token },
556 "id": 1
557 });
558 if let Err(e) = ws_stream.send(Message::Text(connect_msg.to_string())).await {
559 retry_count += 1;
560 continue 'reconnect;
561 }
562
563 let mut authed = false;
564 let mut subscribed = false;
565 let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(30));
566
567 loop {
568 tokio::select! {
569 _ = tokio::signal::ctrl_c() => {
570 let _ = ws_stream.send(Message::Close(None)).await;
571 break 'reconnect;
572 }
573 _ = ping_interval.tick() => {
574 let _ = ws_stream.send(Message::Ping(vec![])).await;
577 }
578 msg = ws_stream.next() => {
579 let msg = match msg {
580 Some(m) => m,
581 None => { retry_count += 1; continue 'reconnect; }
582 };
583
584 match msg {
585 Ok(Message::Text(text)) => {
586 let val: serde_json::Value = match serde_json::from_str(&text) {
587 Ok(v) => v,
588 Err(_) => continue,
589 };
590
591 if !authed {
592 if val.get("connect").is_some() || val.get("id").and_then(|v| v.as_i64()) == Some(1) {
594 authed = true;
595 retry_count = 0;
596 let sub_msg = serde_json::json!({
598 "subscribe": { "channel": channel },
599 "id": 2
600 });
601 let _ = ws_stream.send(Message::Text(sub_msg.to_string())).await;
602 }
603 continue;
604 }
605
606 if !subscribed {
607 if val.get("subscribe").is_some() || val.get("id").and_then(|v| v.as_i64()) == Some(2) {
608 subscribed = true;
609 if let Some(ref pb) = spinner_ref {
610 pb.finish_and_clear();
611 eprintln!("Private subscription active: {}", channel);
612 eprintln!();
613 }
614 }
615 continue;
616 }
617
618 if let Some(event) = handler(val) {
619 events.push(event);
620 }
621 }
622 Ok(Message::Ping(data)) => { let _ = ws_stream.send(Message::Pong(data)).await; }
623 Ok(Message::Close(_)) => { retry_count += 1; continue 'reconnect; }
624 Err(_) => { retry_count += 1; continue 'reconnect; }
625 _ => {}
626 }
627 }
628 }
629 }
630 }
631
632 Ok(CommandOutput::json(serde_json::json!({
633 "status": "disconnected",
634 "events": events,
635 "event_count": events.len(),
636 })))
637}
638
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Array-shaped book level: `[price, volume]`.
    fn entry_from_array(entry: &serde_json::Value) -> Option<(String, String)> {
        let arr = entry.as_array()?;
        if arr.len() < 2 {
            return None;
        }
        Some((
            helpers::value_to_string(&arr[0]),
            helpers::value_to_string(&arr[1]),
        ))
    }

    /// Object-shaped book level with `price` and `btc_volume` keys.
    fn entry_from_object(entry: &serde_json::Value) -> Option<(String, String)> {
        let obj = entry.as_object()?;
        let price = helpers::value_to_string(obj.get("price").unwrap());
        let volume = helpers::value_to_string(obj.get("btc_volume").unwrap());
        Some((price, volume))
    }

    // Every subcommand variant can be constructed.
    #[test]
    fn test_websocket_command_variants() {
        let _commands = [
            WebSocketCommand::Ticker { pair: "btc_idr".into() },
            WebSocketCommand::Trades { pair: "eth_idr".into() },
            WebSocketCommand::Book { pair: "btc_idr".into() },
            WebSocketCommand::Summary,
            WebSocketCommand::Orders,
        ];
    }

    // Numbers and numeric strings are rendered compactly.
    #[test]
    fn test_format_ws_price() {
        let cases = [
            (json!(1234.56), "1234.56"),
            (json!("1234.56"), "1234.56"),
            (json!(1000), "1000"),
            (json!(0), "0"),
        ];
        for (input, expected) in &cases {
            assert_eq!(format_ws_price(input), Some(expected.to_string()));
        }
    }

    // The ticker price is taken from the third field of a row.
    #[test]
    fn test_ticker_parsing_logic() {
        let msg = json!({
            "result": {
                "data": {
                    "data": [
                        [1632717721, 4087327, 14340, "1063.73019525"]
                    ]
                }
            }
        });

        let Some(rows) = msg["result"]["data"]["data"].as_array() else {
            panic!("Expected array");
        };
        let fields = rows[0].as_array().unwrap();
        assert_eq!(format_ws_price(&fields[2]).unwrap(), "14340");
    }

    // Book levels given as `[price, volume]` arrays under the plural key.
    #[test]
    fn test_orderbook_parsing_array_format() {
        let msg = json!({
            "result": {
                "data": {
                    "data": {
                        "asks": [["651000000", "0.05000000"]],
                        "bids": [["650000000", "0.12345678"]]
                    }
                }
            }
        });

        let book = &msg["result"]["data"]["data"];
        let (price, volume) = book["asks"]
            .as_array()
            .unwrap()
            .first()
            .and_then(entry_from_array)
            .unwrap();
        assert_eq!(price, "651000000");
        assert_eq!(volume, "0.05000000");
    }

    // Book levels given as objects under the singular key.
    #[test]
    fn test_orderbook_parsing_object_format() {
        let msg = json!({
            "result": {
                "data": {
                    "data": {
                        "ask": [{"price": "319437000", "btc_volume": "0.11035661"}],
                        "bid": [{"price": "319436000", "btc_volume": "0.61427265"}]
                    }
                }
            }
        });

        let book = &msg["result"]["data"]["data"];
        let (price, volume) = book["ask"]
            .as_array()
            .unwrap()
            .first()
            .and_then(entry_from_object)
            .unwrap();
        assert_eq!(price, "319437000");
        assert_eq!(volume, "0.11035661");
    }

    // A `push` envelope is unwrapped down to its `data` payload.
    #[test]
    fn test_private_order_update_parsing() {
        let msg = json!({
            "push": {
                "data": {
                    "order_id": 12345,
                    "pair": "btcidr",
                    "side": "buy",
                    "status": "filled",
                    "price": "500000000",
                    "amount": "0.1"
                }
            }
        });

        let envelope = msg
            .get("result")
            .or_else(|| msg.get("push"))
            .unwrap_or(&msg);
        let payload = envelope.get("data").unwrap_or(envelope);

        assert_eq!(payload["order_id"], 12345);
        assert_eq!(payload["pair"], "btcidr");
    }

    // A flat balance message exposes its fields directly.
    #[test]
    fn test_private_balance_update_parsing() {
        let payload = json!({
            "currency": "idr",
            "available": "1000000",
            "frozen": "50000"
        });

        assert_eq!(payload["currency"], "idr");
        assert_eq!(payload["available"], "1000000");
    }

    // The built-in fallback token starts with the `eyJ` prefix.
    #[test]
    fn test_fetch_public_ws_token_precedence() {
        let fallback = helpers::DEFAULT_STATIC_WS_TOKEN;
        assert!(fallback.starts_with("eyJ"));
    }
}