1use crate::client::IndodaxClient;
2use crate::commands::helpers;
3use crate::output::{CommandOutput, OutputFormat};
4use anyhow::Result;
5use futures_util::{SinkExt, StreamExt};
6use indicatif::ProgressBar;
7use std::io::IsTerminal;
8use tokio_tungstenite::connect_async;
9use tokio_tungstenite::tungstenite::Message;
10use tracing;
11
/// Endpoint used by the public market-data streams in this module
/// (ticker, trades, order book and 24h summary).
const PUBLIC_WS_URL: &str = "wss://ws3.indodax.com/ws/";
/// Endpoint used by the private, credential-gated stream (`ws_orders`).
///
/// NOTE(review): the `cf_ws_frame_ping_pong=true` query flag presumably asks
/// the server to rely on WebSocket-frame ping/pong, which would match the
/// `Message::Ping` keep-alive sent by the private listener — confirm against
/// the Indodax API documentation.
const PRIVATE_WS_URL: &str = "wss://pws.indodax.com/ws/?cf_ws_frame_ping_pong=true";
14
// Subcommands of the WebSocket streaming command group.
//
// Comments here are deliberately plain `//` comments rather than `///` doc
// comments: clap's derive macros read doc comments as help text, and the
// user-facing help is already set through the explicit `about` attributes.
#[derive(Debug, clap::Subcommand)]
pub enum WebSocketCommand {
    // Streams ticker updates for one pair (handled by `ws_ticker`).
    #[command(name = "ticker", about = "Stream real-time ticker for a pair")]
    Ticker {
        // Trading pair; `execute` passes it through `helpers::normalize_pair`.
        #[arg(default_value = "btc_idr")]
        pair: String,
    },

    // Streams executed trades for one pair (handled by `ws_trades`).
    #[command(name = "trades", about = "Stream real-time trades for a pair")]
    Trades {
        // Trading pair; `execute` passes it through `helpers::normalize_pair`.
        #[arg(default_value = "btc_idr")]
        pair: String,
    },

    // Streams order-book updates for one pair (handled by `ws_book`).
    #[command(name = "book", about = "Stream real-time order book for a pair")]
    Book {
        // Trading pair; `execute` passes it through `helpers::normalize_pair`.
        #[arg(default_value = "btc_idr")]
        pair: String,
    },

    // Streams the 24h summary channel, which is not scoped to a single pair
    // (handled by `ws_summary`).
    #[command(name = "summary", about = "Stream 24h summary for all pairs")]
    Summary,

    // Streams private account updates; `ws_orders` refuses to run when the
    // client has no signer (API credentials).
    #[command(name = "orders", about = "Stream private order updates")]
    Orders,
}
41
42pub async fn execute(
43 client: &IndodaxClient,
44 cmd: &WebSocketCommand,
45 output_format: OutputFormat,
46) -> Result<CommandOutput> {
47 match cmd {
48 WebSocketCommand::Ticker { pair } => {
49 let pair = helpers::normalize_pair(pair);
50 ws_ticker(client, &pair, output_format).await
51 }
52 WebSocketCommand::Trades { pair } => {
53 let pair = helpers::normalize_pair(pair);
54 ws_trades(client, &pair, output_format).await
55 }
56 WebSocketCommand::Book { pair } => {
57 let pair = helpers::normalize_pair(pair);
58 ws_book(client, &pair, output_format).await
59 }
60 WebSocketCommand::Summary => ws_summary(client, output_format).await,
61 WebSocketCommand::Orders => ws_orders(client, output_format).await,
62 }
63}
64
65async fn ws_connect_and_listen(
66 ws_url: &str,
67 token: &str,
68 channel: &str,
69 handler: impl Fn(serde_json::Value) -> Option<serde_json::Value>,
70 output_format: OutputFormat,
71) -> Result<CommandOutput> {
72 let spinner_ref = if output_format == OutputFormat::Json {
73 eprintln!(
74 "{}",
75 serde_json::json!({"event": "connecting", "url": ws_url})
76 );
77 None
78 } else {
79 let pb = ProgressBar::new_spinner();
80 pb.set_message("Connecting to Indodax WebSocket...");
81 pb.enable_steady_tick(std::time::Duration::from_millis(100));
82 Some(pb)
83 };
84
85 let mut events: Vec<serde_json::Value> = Vec::new();
86 let mut retry_count = 0;
87
88 'reconnect: loop {
89 if retry_count > 0 {
90 let delay = std::time::Duration::from_secs(2u64.pow(retry_count.min(5)));
91 if let Some(ref pb) = spinner_ref {
92 pb.set_message(format!("Disconnected. Retrying in {:?}...", delay));
93 } else {
94 eprintln!("{}", serde_json::json!({"event": "reconnecting", "delay_secs": delay.as_secs()}));
95 }
96 tokio::select! {
97 _ = tokio::signal::ctrl_c() => break 'reconnect,
98 _ = tokio::time::sleep(delay) => {}
99 }
100 }
101
102 let (mut ws_stream, _) = match tokio::time::timeout(std::time::Duration::from_secs(10), connect_async(ws_url)).await {
103 Ok(Ok(s)) => s,
104 Ok(Err(e)) => {
105 retry_count += 1;
106 tracing::warn!("WebSocket connection failed: {}. Retrying...", e);
107 continue 'reconnect;
108 }
109 Err(_) => {
110 retry_count += 1;
111 tracing::warn!("WebSocket connection timed out after 10s. Retrying...");
112 continue 'reconnect;
113 }
114 };
115
116 if let Some(ref pb) = spinner_ref {
117 pb.set_message("Connected. Authenticating...");
118 } else {
119 eprintln!(
120 "{}",
121 serde_json::json!({"event": "connected", "status": "authenticating"})
122 );
123 }
124
125 let auth_msg = serde_json::json!({
126 "params": { "token": token },
127 "id": 1
128 });
129 if let Err(e) = ws_stream.send(Message::Text(auth_msg.to_string())).await {
130 retry_count += 1;
131 tracing::warn!("Failed to send auth message: {}. Retrying...", e);
132 continue 'reconnect;
133 }
134
135 let mut authed = false;
136 let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(30));
137 ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
138
139 loop {
140 tokio::select! {
141 _ = tokio::signal::ctrl_c() => {
142 if let Some(ref pb) = spinner_ref {
143 pb.finish_and_clear();
144 eprintln!("Interrupted by user. Closing connection...");
145 } else {
146 eprintln!("{}", serde_json::json!({"event": "interrupted", "reason": "user_ctrl_c"}));
147 }
148 let _ = ws_stream.send(Message::Close(None)).await;
149 break 'reconnect;
150 }
151 _ = ping_interval.tick() => {
152 let ping_msg = serde_json::json!({
154 "method": 7,
155 "id": 7
156 });
157 if let Err(e) = ws_stream.send(Message::Text(ping_msg.to_string())).await {
158 tracing::warn!("Failed to send WebSocket ping: {}. Triggering reconnect...", e);
159 retry_count += 1;
160 continue 'reconnect;
161 }
162 }
163 msg = ws_stream.next() => {
164 let msg = match msg {
165 Some(m) => m,
166 None => {
167 retry_count += 1;
168 tracing::warn!("WebSocket stream ended. Reconnecting...");
169 continue 'reconnect;
170 }
171 };
172
173 match msg {
174 Ok(Message::Text(text)) => {
175 let val = match serde_json::from_str::<serde_json::Value>(&text) {
176 Ok(v) => v,
177 Err(e) => {
178 tracing::warn!("WebSocket JSON parse error: {} (text: {})", e, text);
179 continue;
180 }
181 };
182
183 if !authed {
184 if val.get("id").and_then(|v| v.as_i64()) == Some(1)
185 && val.get("result").is_some()
186 {
187 authed = true;
188 retry_count = 0; if let Some(ref pb) = spinner_ref {
190 pb.set_message(format!("Authenticated. Subscribing to: {}", channel));
191 } else {
192 eprintln!("{}", serde_json::json!({"event": "authenticated", "channel": channel}));
193 }
194 let sub_msg = serde_json::json!({
195 "method": 1,
196 "params": { "channel": channel },
197 "id": 2
198 });
199 if let Err(_e) = ws_stream.send(Message::Text(sub_msg.to_string())).await {
200 retry_count += 1;
201 continue 'reconnect;
202 }
203 }
204 continue;
205 }
206
207 if val.get("id").and_then(|v| v.as_i64()) == Some(2) {
208 if let Some(ref pb) = spinner_ref {
210 pb.finish_and_clear();
211 eprintln!("Subscription active: {}", channel);
212 eprintln!();
213 }
214 } else if val.get("result").is_some() {
215 if let Some(event) = handler(val) {
216 events.push(event);
217 }
218 }
219 }
220 Ok(Message::Ping(data)) => {
221 let _ = ws_stream.send(Message::Pong(data)).await;
222 }
223 Ok(Message::Close(_)) => {
224 retry_count += 1;
225 tracing::warn!("Connection closed by server. Reconnecting...");
226 continue 'reconnect;
227 }
228 Err(e) => {
229 retry_count += 1;
230 tracing::warn!("WebSocket error: {}. Reconnecting...", e);
231 continue 'reconnect;
232 }
233 _ => {}
234 }
235 }
236 }
237 }
238 }
239
240 if output_format == OutputFormat::Json {
241 Ok(CommandOutput::new_empty().with_suppress_final_output(true))
242 } else {
243 Ok(CommandOutput::json(serde_json::json!({
244 "status": "disconnected",
245 "events": events,
246 "event_count": events.len(),
247 })))
248 }
249}
250
251fn format_ws_price(val: &serde_json::Value) -> Option<String> {
252 let f = val.as_f64()
253 .or_else(|| val.as_str().and_then(|s| s.parse::<f64>().ok()))?;
254 if f == 0.0 {
255 return Some("0".into());
256 }
257 if (f - f.round()).abs() < f64::EPSILON && f.abs() >= 1.0 {
258 return Some(format!("{}", f as u64));
259 }
260 let s = format!("{:.8}", f);
261 let trimmed = s.trim_end_matches('0');
262 Some(trimmed.trim_end_matches('.').to_string())
263}
264
265async fn ws_ticker(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
266 let channel = format!("chart:tick-{}", pair);
267 let token = helpers::fetch_public_ws_token(client).await?;
268 ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
269 let rows = &val["result"]["data"]["data"];
270 let mut last_event = None;
271 if let serde_json::Value::Array(arr) = rows {
272 for row in arr {
273 if let serde_json::Value::Array(fields) = row {
274 if fields.len() >= 4 {
275 let ts = fields[0].as_u64().unwrap_or(0);
276 let price = fields.get(2).and_then(format_ws_price).unwrap_or_default();
277 let time_str = chrono::DateTime::from_timestamp(ts.min(i64::MAX as u64) as i64, 0)
278 .map(|d| d.format("%H:%M:%S").to_string())
279 .unwrap_or_default();
280 if output_format == OutputFormat::Json {
281 println!("{}", serde_json::json!({
282 "event": "ticker", "pair": pair, "time": time_str, "price": price
283 }));
284 } else {
285 println!("[{}] {} {}", time_str, pair, price);
286 }
287 last_event = Some(serde_json::json!({
288 "event": "ticker", "pair": pair, "time": time_str, "price": price
289 }));
290 }
291 }
292 }
293 }
294 last_event
295 }, output_format)
296 .await
297}
298
299async fn ws_trades(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
300 let channel = format!("market:trade-activity-{}", pair);
301 let token = helpers::fetch_public_ws_token(client).await?;
302 ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
303 let rows = &val["result"]["data"]["data"];
304 let mut last_event = None;
305 if let serde_json::Value::Array(arr) = rows {
306 for row in arr {
307 if let serde_json::Value::Array(fields) = row {
308 if fields.len() >= 7 {
309 let ts = fields[1].as_u64().unwrap_or(0);
310 let side = fields.get(3).and_then(|v| v.as_str()).unwrap_or("?");
311 let price = fields.get(4).and_then(|v| v.as_f64()).unwrap_or(0.0);
312 let volume = fields.get(6).and_then(|v| v.as_str()).and_then(|s| s.parse().ok()).unwrap_or(0.0);
313 let time_str = chrono::DateTime::from_timestamp(ts.min(i64::MAX as u64) as i64, 0)
314 .map(|d| d.format("%H:%M:%S").to_string())
315 .unwrap_or_default();
316 if output_format == OutputFormat::Json {
317 println!("{}", serde_json::json!({
318 "event": "trade", "pair": pair, "time": time_str,
319 "side": side, "price": price, "volume": volume
320 }));
321 } else {
322 println!("[{}] {} {} @ {} vol: {}", time_str, side, pair, price, volume);
323 }
324 last_event = Some(serde_json::json!({
325 "event": "trade", "pair": pair, "time": time_str,
326 "side": side, "price": price, "volume": volume
327 }));
328 }
329 }
330 }
331 }
332 last_event
333 }, output_format)
334 .await
335}
336
337async fn ws_book(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
338 let channel = format!("market:order-book-{}", pair);
339 let token = helpers::fetch_public_ws_token(client).await?;
340 ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
341 let data = &val["result"]["data"]["data"];
342
343 let parse_entry = |entry: &serde_json::Value| -> Option<(String, String)> {
344 if let Some(arr) = entry.as_array() {
345 if arr.len() >= 2 {
346 let p = helpers::value_to_string(&arr[0]);
347 let v = helpers::value_to_string(&arr[1]);
348 return Some((p, v));
349 }
350 } else if let Some(obj) = entry.as_object() {
351 let p = helpers::value_to_string(obj.get("price").unwrap_or(&serde_json::Value::Null));
352 let v = helpers::value_to_string(
353 obj.get("btc_volume")
354 .or_else(|| obj.get("volume"))
355 .or_else(|| obj.get("amount"))
356 .unwrap_or(&serde_json::Value::Null),
357 );
358 return Some((p, v));
359 }
360 None
361 };
362
363 let ask_price = data["ask"].as_array().and_then(|asks| {
364 asks.first().and_then(parse_entry)
365 }).or_else(|| data["asks"].as_array().and_then(|asks| {
366 asks.first().and_then(parse_entry)
367 }));
368
369 let bid_price = data["bid"].as_array().and_then(|bids| {
370 bids.first().and_then(parse_entry)
371 }).or_else(|| data["bids"].as_array().and_then(|bids| {
372 bids.first().and_then(parse_entry)
373 }));
374
375 let event = serde_json::json!({
376 "event": "orderbook", "pair": pair,
377 "ask": ask_price.clone().map(|(p, a)| serde_json::json!({"price": p, "amount": a})),
378 "bid": bid_price.clone().map(|(p, a)| serde_json::json!({"price": p, "amount": a})),
379 });
380 if output_format == OutputFormat::Json {
381 println!("{}", event);
382 } else if std::io::stdout().is_terminal() {
383 if let Some((price, amount)) = ask_price {
384 print!("\r\x1b[KAsk: {} @ {} | ", price, amount);
385 }
386 if let Some((price, amount)) = bid_price {
387 println!("Bid: {} @ {}", price, amount);
388 }
389 } else {
390 if let Some((price, amount)) = ask_price {
391 println!("Ask: {} @ {}", price, amount);
392 }
393 if let Some((price, amount)) = bid_price {
394 println!("Bid: {} @ {}", price, amount);
395 }
396 }
397 Some(event)
398 }, output_format)
399 .await
400}
401
402async fn ws_summary(client: &IndodaxClient, output_format: OutputFormat) -> Result<CommandOutput> {
403 let token = helpers::fetch_public_ws_token(client).await?;
404 ws_connect_and_listen(PUBLIC_WS_URL, &token, "market:summary-24h", |val| {
405 let rows = &val["result"]["data"]["data"];
406 let mut last_event = None;
407 if let serde_json::Value::Array(arr) = rows {
408 for row in arr {
409 if let serde_json::Value::Array(fields) = row {
410 if fields.len() >= 8 {
411 let pair = fields[0].as_str().unwrap_or("?");
412 let last = helpers::value_to_string(fields.get(2).unwrap_or(&serde_json::Value::Null));
413 let high = helpers::value_to_string(&fields[4]);
414 let low = helpers::value_to_string(&fields[3]);
415 let price_24h = fields[5].as_f64().unwrap_or(0.0);
416 let last_f = fields[2].as_f64().unwrap_or(0.0);
417 let change = if price_24h > 0.0 {
418 format!("{:+.2}%", (last_f - price_24h) / price_24h * 100.0)
419 } else {
420 "0%".to_string()
421 };
422 if output_format == OutputFormat::Json {
423 println!("{}", serde_json::json!({
424 "event": "summary", "pair": pair, "last": last,
425 "high": high, "low": low, "change": change
426 }));
427 } else if std::io::stdout().is_terminal() {
428 println!("\x1b[K{:15} last: {:>15} high: {:>15} low: {:>15} change: {}", pair, last, high, low, change);
429 } else {
430 println!("{:15} last: {:>15} high: {:>15} low: {:>15} change: {}", pair, last, high, low, change);
431 }
432 last_event = Some(serde_json::json!({
433 "event": "summary", "pair": pair, "last": last,
434 "high": high, "low": low, "change": change
435 }));
436 }
437 }
438 }
439 }
440 last_event
441 }, output_format)
442 .await
443}
444
445async fn ws_orders(client: &IndodaxClient, output_format: OutputFormat) -> Result<CommandOutput> {
446 if client.signer().is_none() {
447 return Err(anyhow::anyhow!(
448 "Private WebSocket requires API credentials. Use 'indodax auth set' or set INDODAX_API_KEY and INDODAX_API_SECRET environment variables."
449 ));
450 }
451
452 eprintln!("Generating WebSocket token...");
453 let (token, channel) = client.generate_ws_token().await.map_err(|e| {
454 anyhow::anyhow!("WebSocket token generation failed: {}. Check that your API credentials are valid and have the correct permissions.", e)
455 })?;
456 eprintln!("Token generated. Connecting to private WebSocket...");
457
458 ws_private_connect_and_listen(PRIVATE_WS_URL, &token, &channel, |val| {
459 let result = val.get("result").or_else(|| val.get("push")).unwrap_or(&val);
463 let data = result.get("data").unwrap_or(result);
464
465 if let Some(order_id) = data.get("order_id").or(data.get("orderId")).and_then(|v| v.as_u64().or_else(|| v.as_str().and_then(|s| s.parse().ok()))) {
466 let pair = data.get("pair").and_then(|v| v.as_str()).unwrap_or("?");
467 let side = data.get("side").and_then(|v| v.as_str()).unwrap_or("?");
468 let status = data.get("status").and_then(|v| v.as_str()).unwrap_or("?");
469 let price = helpers::value_to_string(data.get("price").unwrap_or(&serde_json::Value::Null));
470 let amount = helpers::value_to_string(data.get("amount").or(data.get("quantity")).unwrap_or(&serde_json::Value::Null));
471
472 if output_format == OutputFormat::Json {
473 println!("{}", serde_json::json!({
474 "event": "order_update", "order_id": order_id, "pair": pair,
475 "side": side, "status": status, "price": price, "amount": amount
476 }));
477 } else {
478 println!("Order Update: ID={} Pair={} Side={} Status={} Price={} Amount={}",
479 order_id, pair, side, status, price, amount);
480 }
481 Some(serde_json::json!({
482 "event": "order_update", "order_id": order_id, "pair": pair,
483 "side": side, "status": status, "price": price, "amount": amount
484 }))
485 } else if let Some(currency) = data.get("currency").or(data.get("asset")).and_then(|v| v.as_str()) {
486 let available = helpers::value_to_string(data.get("available").or(data.get("balance")).unwrap_or(&serde_json::Value::Null));
487 let frozen = helpers::value_to_string(data.get("frozen").or(data.get("hold")).unwrap_or(&serde_json::Value::Null));
488
489 if output_format == OutputFormat::Json {
490 println!("{}", serde_json::json!({
491 "event": "balance_update", "currency": currency,
492 "available": available, "frozen": frozen
493 }));
494 } else {
495 println!("Balance Update: {} Available={} Frozen={}", currency, available, frozen);
496 }
497 Some(serde_json::json!({
498 "event": "balance_update", "currency": currency,
499 "available": available, "frozen": frozen
500 }))
501 } else {
502 if output_format == OutputFormat::Json {
504 let raw = serde_json::json!({"event": "private_update_raw", "data": data});
505 println!("{}", raw);
506 Some(raw)
507 } else {
508 if data.get("method").and_then(|m| m.as_str()) != Some("pong") {
510 println!("Private Event: {}", serde_json::to_string(data).unwrap_or_default());
511 }
512 Some(data.clone())
513 }
514 }
515 }, output_format)
516 .await
517}
518
519async fn ws_private_connect_and_listen(
520 ws_url: &str,
521 token: &str,
522 channel: &str,
523 handler: impl Fn(serde_json::Value) -> Option<serde_json::Value>,
524 output_format: OutputFormat,
525) -> Result<CommandOutput> {
526 let spinner_ref = if output_format == OutputFormat::Json {
527 eprintln!("{}", serde_json::json!({"event": "connecting", "url": ws_url}));
528 None
529 } else {
530 let pb = ProgressBar::new_spinner();
531 pb.set_message("Connecting to Private WebSocket...");
532 pb.enable_steady_tick(std::time::Duration::from_millis(100));
533 Some(pb)
534 };
535
536 let mut events: Vec<serde_json::Value> = Vec::new();
537 let mut retry_count = 0;
538
539 'reconnect: loop {
540 if retry_count > 0 {
541 let delay = std::time::Duration::from_secs(2u64.pow(retry_count.min(5)));
542 if let Some(ref pb) = spinner_ref {
543 pb.set_message(format!("Disconnected. Retrying in {:?}...", delay));
544 } else {
545 eprintln!("{}", serde_json::json!({"event": "reconnecting", "delay_secs": delay.as_secs()}));
546 }
547 tokio::select! {
548 _ = tokio::signal::ctrl_c() => break 'reconnect,
549 _ = tokio::time::sleep(delay) => {}
550 }
551 }
552
553 let (mut ws_stream, _) = match tokio::time::timeout(std::time::Duration::from_secs(10), connect_async(ws_url)).await {
554 Ok(Ok(s)) => s,
555 Ok(Err(e)) => {
556 retry_count += 1;
557 tracing::warn!("Private WebSocket connection failed: {}. Retrying...", e);
558 continue 'reconnect;
559 }
560 Err(_) => {
561 retry_count += 1;
562 tracing::warn!("Private WebSocket connection timed out after 10s. Retrying...");
563 continue 'reconnect;
564 }
565 };
566
567 let connect_msg = serde_json::json!({
569 "connect": { "token": token },
570 "id": 1
571 });
572 if let Err(_e) = ws_stream.send(Message::Text(connect_msg.to_string())).await {
573 retry_count += 1;
574 continue 'reconnect;
575 }
576
577 let mut authed = false;
578 let mut subscribed = false;
579 let mut ping_interval = tokio::time::interval(std::time::Duration::from_secs(30));
580 ping_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
581
582 loop {
583 tokio::select! {
584 _ = tokio::signal::ctrl_c() => {
585 let _ = ws_stream.send(Message::Close(None)).await;
586 break 'reconnect;
587 }
588 _ = ping_interval.tick() => {
589 let _ = ws_stream.send(Message::Ping(vec![])).await;
592 }
593 msg = ws_stream.next() => {
594 let msg = match msg {
595 Some(m) => m,
596 None => { retry_count += 1; continue 'reconnect; }
597 };
598
599 match msg {
600 Ok(Message::Text(text)) => {
601 let val: serde_json::Value = match serde_json::from_str(&text) {
602 Ok(v) => v,
603 Err(_) => continue,
604 };
605
606 if !authed {
607 if val.get("connect").is_some() || val.get("id").and_then(|v| v.as_i64()) == Some(1) {
609 authed = true;
610 retry_count = 0;
611 let sub_msg = serde_json::json!({
613 "subscribe": { "channel": channel },
614 "id": 2
615 });
616 let _ = ws_stream.send(Message::Text(sub_msg.to_string())).await;
617 }
618 continue;
619 }
620
621 if !subscribed {
622 if val.get("subscribe").is_some() || val.get("id").and_then(|v| v.as_i64()) == Some(2) {
623 subscribed = true;
624 if let Some(ref pb) = spinner_ref {
625 pb.finish_and_clear();
626 eprintln!("Private subscription active: {}", channel);
627 eprintln!();
628 }
629 }
630 continue;
631 }
632
633 if let Some(event) = handler(val) {
634 events.push(event);
635 }
636 }
637 Ok(Message::Ping(data)) => { let _ = ws_stream.send(Message::Pong(data)).await; }
638 Ok(Message::Close(_)) => { retry_count += 1; continue 'reconnect; }
639 Err(_) => { retry_count += 1; continue 'reconnect; }
640 _ => {}
641 }
642 }
643 }
644 }
645 }
646
647 if output_format == OutputFormat::Json {
648 Ok(CommandOutput::new_empty().with_suppress_final_output(true))
649 } else {
650 Ok(CommandOutput::json(serde_json::json!({
651 "status": "disconnected",
652 "events": events,
653 "event_count": events.len(),
654 })))
655 }
656}
657
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Every subcommand variant can be constructed.
    #[test]
    fn test_websocket_command_variants() {
        let _variants = [
            WebSocketCommand::Ticker { pair: String::from("btc_idr") },
            WebSocketCommand::Trades { pair: String::from("eth_idr") },
            WebSocketCommand::Book { pair: String::from("btc_idr") },
            WebSocketCommand::Summary,
            WebSocketCommand::Orders,
        ];
    }

    /// Numbers and numeric strings are rendered without trailing noise.
    #[test]
    fn test_format_ws_price() {
        let cases = [
            (json!(1234.56), "1234.56"),
            (json!("1234.56"), "1234.56"),
            (json!(1000), "1000"),
            (json!(0), "0"),
        ];
        for (input, expected) in &cases {
            assert_eq!(format_ws_price(input), Some(expected.to_string()));
        }
    }

    /// The price of a ticker row is taken from the third column.
    #[test]
    fn test_ticker_parsing_logic() {
        let msg = json!({
            "result": {
                "data": {
                    "data": [
                        [1632717721, 4087327, 14340, "1063.73019525"]
                    ]
                }
            }
        });

        let rows = msg["result"]["data"]["data"]
            .as_array()
            .expect("Expected array");
        let first_row = rows[0].as_array().unwrap();
        assert_eq!(format_ws_price(&first_row[2]).unwrap(), "14340");
    }

    /// Order-book entries given as `[price, volume]` arrays are understood.
    #[test]
    fn test_orderbook_parsing_array_format() {
        let msg = json!({
            "result": {
                "data": {
                    "data": {
                        "asks": [["651000000", "0.05000000"]],
                        "bids": [["650000000", "0.12345678"]]
                    }
                }
            }
        });

        let parse_entry = |entry: &serde_json::Value| -> Option<(String, String)> {
            let cols = entry.as_array().filter(|cols| cols.len() >= 2)?;
            Some((
                helpers::value_to_string(&cols[0]),
                helpers::value_to_string(&cols[1]),
            ))
        };

        let book = &msg["result"]["data"]["data"];
        let (price, volume) = book["asks"]
            .as_array()
            .unwrap()
            .first()
            .and_then(parse_entry)
            .unwrap();
        assert_eq!(price, "651000000");
        assert_eq!(volume, "0.05000000");
    }

    /// Order-book entries given as objects are understood.
    #[test]
    fn test_orderbook_parsing_object_format() {
        let msg = json!({
            "result": {
                "data": {
                    "data": {
                        "ask": [{"price": "319437000", "btc_volume": "0.11035661"}],
                        "bid": [{"price": "319436000", "btc_volume": "0.61427265"}]
                    }
                }
            }
        });

        let parse_entry = |entry: &serde_json::Value| -> Option<(String, String)> {
            let obj = entry.as_object()?;
            Some((
                helpers::value_to_string(&obj["price"]),
                helpers::value_to_string(&obj["btc_volume"]),
            ))
        };

        let book = &msg["result"]["data"]["data"];
        let (price, volume) = book["ask"]
            .as_array()
            .unwrap()
            .first()
            .and_then(parse_entry)
            .unwrap();
        assert_eq!(price, "319437000");
        assert_eq!(volume, "0.11035661");
    }

    /// A `push` envelope is unwrapped down to its `data` payload.
    #[test]
    fn test_private_order_update_parsing() {
        let msg = json!({
            "push": {
                "data": {
                    "order_id": 12345,
                    "pair": "btcidr",
                    "side": "buy",
                    "status": "filled",
                    "price": "500000000",
                    "amount": "0.1"
                }
            }
        });

        let envelope = msg.get("result").or_else(|| msg.get("push")).unwrap_or(&msg);
        let payload = envelope.get("data").unwrap_or(envelope);

        assert_eq!(payload["order_id"], 12345);
        assert_eq!(payload["pair"], "btcidr");
    }

    /// A flat balance payload exposes its fields directly.
    #[test]
    fn test_private_balance_update_parsing() {
        let payload = json!({
            "currency": "idr",
            "available": "1000000",
            "frozen": "50000"
        });

        assert_eq!(payload["currency"], "idr");
        assert_eq!(payload["available"], "1000000");
    }

    /// The built-in fallback token starts with the `eyJ` prefix.
    #[test]
    fn test_fetch_public_ws_token_precedence() {
        assert!(helpers::DEFAULT_STATIC_WS_TOKEN.starts_with("eyJ"));
    }
}