1use crate::client::IndodaxClient;
2use crate::commands::helpers;
3use crate::output::{CommandOutput, OutputFormat};
4use anyhow::Result;
5use futures_util::{SinkExt, StreamExt};
6use indicatif::ProgressBar;
7use std::io::IsTerminal;
8use tokio_tungstenite::connect_async;
9use tokio_tungstenite::tungstenite::Message;
10use tracing;
11
/// WebSocket endpoint used by the public market-data streams in this module
/// (ticker, trades, order book and 24h summary).
const PUBLIC_WS_URL: &str = "wss://ws3.indodax.com/ws/";

/// WebSocket endpoint used by the private order-update stream.
// NOTE(review): the `cf_ws_frame_ping_pong=true` query flag presumably enables
// frame-level ping/pong on the server side; confirm against the Indodax docs.
const PRIVATE_WS_URL: &str = "wss://pws.indodax.com/ws/?cf_ws_frame_ping_pong=true";
14
15async fn fetch_public_ws_token(client: &IndodaxClient) -> Result<String> {
16 helpers::fetch_public_ws_token(client).await
17}
18
19#[derive(Debug, clap::Subcommand)]
20pub enum WebSocketCommand {
21 #[command(name = "ticker", about = "Stream real-time ticker for a pair")]
22 Ticker {
23 #[arg(default_value = "btc_idr")]
24 pair: String,
25 },
26
27 #[command(name = "trades", about = "Stream real-time trades for a pair")]
28 Trades {
29 #[arg(default_value = "btc_idr")]
30 pair: String,
31 },
32
33 #[command(name = "book", about = "Stream real-time order book for a pair")]
34 Book {
35 #[arg(default_value = "btc_idr")]
36 pair: String,
37 },
38
39 #[command(name = "summary", about = "Stream 24h summary for all pairs")]
40 Summary,
41
42 #[command(name = "orders", about = "Stream private order updates")]
43 Orders,
44}
45
46pub async fn execute(
47 client: &IndodaxClient,
48 cmd: &WebSocketCommand,
49 output_format: OutputFormat,
50) -> Result<CommandOutput> {
51 match cmd {
52 WebSocketCommand::Ticker { pair } => {
53 let pair = helpers::normalize_pair(pair);
54 ws_ticker(client, &pair, output_format).await
55 }
56 WebSocketCommand::Trades { pair } => {
57 let pair = helpers::normalize_pair(pair);
58 ws_trades(client, &pair, output_format).await
59 }
60 WebSocketCommand::Book { pair } => {
61 let pair = helpers::normalize_pair(pair);
62 ws_book(client, &pair, output_format).await
63 }
64 WebSocketCommand::Summary => ws_summary(client, output_format).await,
65 WebSocketCommand::Orders => ws_orders(client, output_format).await,
66 }
67}
68
69async fn ws_connect_and_listen(
70 ws_url: &str,
71 token: &str,
72 channel: &str,
73 handler: impl Fn(serde_json::Value) -> Option<serde_json::Value>,
74 output_format: OutputFormat,
75) -> Result<CommandOutput> {
76 let spinner = if output_format == OutputFormat::Json {
77 eprintln!(
78 "{}",
79 serde_json::json!({"event": "connecting", "url": ws_url})
80 );
81 None
82 } else {
83 let pb = ProgressBar::new_spinner();
84 pb.set_message("Connecting to Indodax WebSocket...");
85 pb.enable_steady_tick(std::time::Duration::from_millis(100));
86 Some(pb)
87 };
88 let (mut ws_stream, _) = connect_async(ws_url).await?;
89 if let Some(ref pb) = spinner {
90 pb.set_message("Connected. Authenticating...");
91 } else {
92 eprintln!(
93 "{}",
94 serde_json::json!({"event": "connected", "status": "authenticating"})
95 );
96 }
97
98 let auth_msg = serde_json::json!({
99 "params": { "token": token },
100 "id": 1
101 });
102 ws_stream
103 .send(Message::Text(auth_msg.to_string()))
104 .await?;
105
106 let mut authed = false;
107
108 let mut events: Vec<serde_json::Value> = Vec::new();
109
110 loop {
111 tokio::select! {
112 _ = tokio::signal::ctrl_c() => {
113 if let Some(ref pb) = spinner {
114 pb.finish_and_clear();
115 eprintln!("Interrupted by user. Closing connection...");
116 } else {
117 eprintln!("{}", serde_json::json!({"event": "interrupted", "reason": "user_ctrl_c"}));
118 }
119 let _ = ws_stream.send(Message::Close(None)).await;
120 break;
121 }
122 msg = ws_stream.next() => {
123 let msg = match msg {
124 Some(m) => m,
125 None => break,
126 };
127
128 match msg {
129 Ok(Message::Text(text)) => {
130 let val = match serde_json::from_str::<serde_json::Value>(&text) {
131 Ok(v) => v,
132 Err(e) => {
133 tracing::warn!("WebSocket JSON parse error: {} (text: {})", e, text);
134 continue;
135 }
136 };
137
138 if !authed {
139 if val.get("id").and_then(|v| v.as_i64()) == Some(1)
140 && val.get("result").is_some()
141 {
142 authed = true;
143 if let Some(ref pb) = spinner {
144 pb.finish_and_clear();
145 eprintln!("Authenticated. Subscribing to channel: {}", channel);
146 eprintln!();
147 } else {
148 eprintln!("{}", serde_json::json!({"event": "authenticated", "channel": channel}));
149 }
150 let sub_msg = serde_json::json!({
151 "method": 1,
152 "params": { "channel": channel },
153 "id": 2
154 });
155 ws_stream
156 .send(Message::Text(sub_msg.to_string()))
157 .await?;
158 }
159 continue;
160 }
161
162 if val.get("id").and_then(|v| v.as_i64()) == Some(2) {
163 } else if val.get("result").is_some() {
165 if let Some(event) = handler(val) {
166 events.push(event);
167 }
168 }
169 }
170 Ok(Message::Ping(_)) => {
171 let _ = ws_stream.send(Message::Pong(vec![])).await;
172 }
173 Ok(Message::Close(_)) => {
174 if let Some(ref pb) = spinner {
175 pb.finish_and_clear();
176 eprintln!("Connection closed by server.");
177 } else {
178 eprintln!("{}", serde_json::json!({"event": "disconnected", "reason": "server_close"}));
179 }
180 break;
181 }
182 Err(e) => {
183 if let Some(ref pb) = spinner {
184 pb.finish_and_clear();
185 eprintln!("WebSocket error: {}", e);
186 } else {
187 eprintln!("{}", serde_json::json!({"event": "error", "message": e.to_string()}));
188 }
189 break;
190 }
191 _ => {}
192 }
193 }
194 }
195 }
196
197 Ok(CommandOutput::json(serde_json::json!({
198 "status": "disconnected",
199 "events": events,
200 "event_count": events.len(),
201 })))
202}
203
204fn format_ws_price(val: &serde_json::Value) -> Option<String> {
205 let f = val.as_f64()
206 .or_else(|| val.as_str().and_then(|s| s.parse::<f64>().ok()))?;
207 if f == 0.0 {
208 return Some("0".into());
209 }
210 if f.fract() == 0.0 && f.abs() >= 1.0 {
211 return Some(format!("{}", f as u64));
212 }
213 let s = format!("{:.8}", f);
214 let trimmed = s.trim_end_matches('0');
215 Some(trimmed.trim_end_matches('.').to_string())
216}
217
218async fn ws_ticker(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
219 let channel = format!("chart:tick-{}", pair);
220 let token = fetch_public_ws_token(client).await?;
221 ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
222 let rows = &val["result"]["data"]["data"];
223 let mut last_event = None;
224 if let serde_json::Value::Array(arr) = rows {
225 for row in arr {
226 if let serde_json::Value::Array(fields) = row {
227 if fields.len() >= 4 {
228 let ts = fields[0].as_u64().unwrap_or(0);
229 let price = format_ws_price(&fields[2]).unwrap_or_default();
230 let time_str = chrono::DateTime::from_timestamp(ts.min(i64::MAX as u64) as i64, 0)
231 .map(|d| d.format("%H:%M:%S").to_string())
232 .unwrap_or_default();
233 if output_format == OutputFormat::Json {
234 println!("{}", serde_json::json!({
235 "event": "ticker", "pair": pair, "time": time_str, "price": price
236 }));
237 } else {
238 println!("[{}] {} {}", time_str, pair, price);
239 }
240 last_event = Some(serde_json::json!({
241 "event": "ticker", "pair": pair, "time": time_str, "price": price
242 }));
243 }
244 }
245 }
246 }
247 last_event
248 }, output_format)
249 .await
250}
251
252async fn ws_trades(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
253 let channel = format!("market:trade-activity-{}", pair);
254 let token = fetch_public_ws_token(client).await?;
255 ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
256 let rows = &val["result"]["data"]["data"];
257 let mut last_event = None;
258 if let serde_json::Value::Array(arr) = rows {
259 for row in arr {
260 if let serde_json::Value::Array(fields) = row {
261 if fields.len() >= 7 {
262 let ts = fields[1].as_u64().unwrap_or(0);
263 let side = fields[3].as_str().unwrap_or("?");
264 let price = fields[4].as_f64().unwrap_or(0.0);
265 let volume = fields[6].as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
266 let time_str = chrono::DateTime::from_timestamp(ts as i64, 0)
267 .map(|d| d.format("%H:%M:%S").to_string())
268 .unwrap_or_default();
269 if output_format == OutputFormat::Json {
270 println!("{}", serde_json::json!({
271 "event": "trade", "pair": pair, "time": time_str,
272 "side": side, "price": price, "volume": volume
273 }));
274 } else {
275 println!("[{}] {} {} @ {} vol: {}", time_str, side, pair, price, volume);
276 }
277 last_event = Some(serde_json::json!({
278 "event": "trade", "pair": pair, "time": time_str,
279 "side": side, "price": price, "volume": volume
280 }));
281 }
282 }
283 }
284 }
285 last_event
286 }, output_format)
287 .await
288}
289
290async fn ws_book(client: &IndodaxClient, pair: &str, output_format: OutputFormat) -> Result<CommandOutput> {
291 let channel = format!("market:order-book-{}", pair);
292 let token = fetch_public_ws_token(client).await?;
293 ws_connect_and_listen(PUBLIC_WS_URL, &token, &channel, |val| {
294 let data = &val["result"]["data"]["data"];
295 let ask_price = data["ask"].as_array().and_then(|asks| {
296 asks.last().and_then(|best| {
297 let p = helpers::value_to_string(best.get("price").unwrap_or(&serde_json::Value::Null));
298 let a = helpers::value_to_string(
299 best.get("btc_volume")
300 .or_else(|| best.get("volume"))
301 .or_else(|| best.get("amount"))
302 .unwrap_or(&serde_json::Value::Null),
303 );
304 Some((p, a))
305 })
306 });
307 let bid_price = data["bid"].as_array().and_then(|bids| {
308 bids.first().and_then(|best| {
309 let p = helpers::value_to_string(best.get("price").unwrap_or(&serde_json::Value::Null));
310 let a = helpers::value_to_string(
311 best.get("btc_volume")
312 .or_else(|| best.get("volume"))
313 .or_else(|| best.get("amount"))
314 .unwrap_or(&serde_json::Value::Null),
315 );
316 Some((p, a))
317 })
318 });
319 let event = serde_json::json!({
320 "event": "orderbook", "pair": pair,
321 "ask": ask_price.clone().map(|(p, a)| serde_json::json!({"price": p, "amount": a})),
322 "bid": bid_price.clone().map(|(p, a)| serde_json::json!({"price": p, "amount": a})),
323 });
324 if output_format == OutputFormat::Json {
325 println!("{}", event);
326 } else if std::io::stdout().is_terminal() {
327 if let Some((price, amount)) = ask_price {
328 print!("\r\x1b[KAsk: {} @ {} | ", price, amount);
329 }
330 if let Some((price, amount)) = bid_price {
331 println!("Bid: {} @ {}", price, amount);
332 }
333 } else {
334 if let Some((price, amount)) = ask_price {
335 println!("Ask: {} @ {}", price, amount);
336 }
337 if let Some((price, amount)) = bid_price {
338 println!("Bid: {} @ {}", price, amount);
339 }
340 }
341 Some(event)
342 }, output_format)
343 .await
344}
345
346async fn ws_summary(client: &IndodaxClient, output_format: OutputFormat) -> Result<CommandOutput> {
347 let token = fetch_public_ws_token(client).await?;
348 ws_connect_and_listen(PUBLIC_WS_URL, &token, "market:summary-24h", |val| {
349 let rows = &val["result"]["data"]["data"];
350 let mut last_event = None;
351 if let serde_json::Value::Array(arr) = rows {
352 for row in arr {
353 if let serde_json::Value::Array(fields) = row {
354 if fields.len() >= 8 {
355 let pair = fields[0].as_str().unwrap_or("?");
356 let last = helpers::value_to_string(&fields[2]);
357 let high = helpers::value_to_string(&fields[4]);
358 let low = helpers::value_to_string(&fields[3]);
359 let price_24h = fields[5].as_f64().unwrap_or(0.0);
360 let last_f = fields[2].as_f64().unwrap_or(0.0);
361 let change = if price_24h > 0.0 {
362 format!("{:+.2}%", (last_f - price_24h) / price_24h * 100.0)
363 } else {
364 "0%".to_string()
365 };
366 if output_format == OutputFormat::Json {
367 println!("{}", serde_json::json!({
368 "event": "summary", "pair": pair, "last": last,
369 "high": high, "low": low, "change": change
370 }));
371 } else if std::io::stdout().is_terminal() {
372 println!("\x1b[K{:15} last: {:>15} high: {:>15} low: {:>15} change: {}", pair, last, high, low, change);
373 } else {
374 println!("{:15} last: {:>15} high: {:>15} low: {:>15} change: {}", pair, last, high, low, change);
375 }
376 last_event = Some(serde_json::json!({
377 "event": "summary", "pair": pair, "last": last,
378 "high": high, "low": low, "change": change
379 }));
380 }
381 }
382 }
383 }
384 last_event
385 }, output_format)
386 .await
387}
388
389async fn ws_orders(client: &IndodaxClient, output_format: OutputFormat) -> Result<CommandOutput> {
390 if client.signer().is_none() {
391 return Err(anyhow::anyhow!(
392 "Private WebSocket requires API credentials. Use 'indodax auth set' or set INDODAX_API_KEY and INDODAX_API_SECRET environment variables."
393 ));
394 }
395
396 eprintln!("Generating WebSocket token...");
397 let token = client.generate_ws_token().await.map_err(|e| {
398 anyhow::anyhow!("WebSocket token generation failed: {}. Check that your API credentials are valid and have the correct permissions.", e)
399 })?;
400 eprintln!("Token generated. Connecting to private WebSocket...");
401
402 let channel = "private:orders";
403 ws_connect_and_listen(PRIVATE_WS_URL, &token, channel, |val| {
404 let data = &val["result"]["data"];
405 let event = if let Some(order_id) = data.get("order_id").and_then(|v| v.as_u64()) {
406 let pair = data.get("pair").and_then(|v| v.as_str()).unwrap_or("?");
407 let side = data.get("side").and_then(|v| v.as_str()).unwrap_or("?");
408 let status = data.get("status").and_then(|v| v.as_str()).unwrap_or("?");
409 let price = helpers::value_to_string(data.get("price").unwrap_or(&serde_json::Value::Null));
410 let amount = helpers::value_to_string(data.get("amount").unwrap_or(&serde_json::Value::Null));
411 if output_format == OutputFormat::Json {
412 println!("{}", serde_json::json!({
413 "event": "order_update", "order_id": order_id, "pair": pair,
414 "side": side, "status": status, "price": price, "amount": amount
415 }));
416 } else {
417 println!("ID={} Pair={} Side={} Status={} Price={} Amount={}",
418 order_id, pair, side, status, price, amount);
419 }
420 Some(serde_json::json!({
421 "event": "order_update", "order_id": order_id, "pair": pair,
422 "side": side, "status": status, "price": price, "amount": amount
423 }))
424 } else {
425 if output_format == OutputFormat::Json {
426 let raw = serde_json::json!({"event": "order_update_raw", "data": &val["result"]});
427 println!("{}", raw);
428 Some(raw)
429 } else {
430 println!("{}", serde_json::to_string_pretty(&val).unwrap_or_default());
431 None
432 }
433 };
434 event
435 }, output_format)
436 .await
437}
438
// Unit tests for the command enum, the price formatter and the endpoint
// constants. Nothing here opens a network connection.
#[cfg(test)]
mod tests {
    use super::*;

    // Every variant can be constructed.
    #[test]
    fn test_websocket_command_variants() {
        let _cmd1 = WebSocketCommand::Ticker { pair: "btc_idr".into() };
        let _cmd2 = WebSocketCommand::Trades { pair: "eth_idr".into() };
        let _cmd3 = WebSocketCommand::Book { pair: "btc_idr".into() };
        let _cmd4 = WebSocketCommand::Summary;
        let _cmd5 = WebSocketCommand::Orders;
    }

    #[test]
    fn test_websocket_command_ticker() {
        let cmd = WebSocketCommand::Ticker { pair: "xrp_idr".into() };
        match cmd {
            WebSocketCommand::Ticker { pair } => assert_eq!(pair, "xrp_idr"),
            other => panic!("Expected Ticker command, got {:?}", other),
        }
    }

    #[test]
    fn test_websocket_command_trades() {
        let cmd = WebSocketCommand::Trades { pair: "doge_idr".into() };
        match cmd {
            WebSocketCommand::Trades { pair } => assert_eq!(pair, "doge_idr"),
            other => panic!("Expected Trades command, got {:?}", other),
        }
    }

    #[test]
    fn test_websocket_command_book() {
        let cmd = WebSocketCommand::Book { pair: "eth_idr".into() };
        match cmd {
            WebSocketCommand::Book { pair } => assert_eq!(pair, "eth_idr"),
            other => panic!("Expected Book command, got {:?}", other),
        }
    }

    #[test]
    fn test_websocket_command_summary() {
        let cmd = WebSocketCommand::Summary;
        assert!(
            matches!(cmd, WebSocketCommand::Summary),
            "Expected Summary command, got {:?}",
            cmd
        );
    }

    #[test]
    fn test_websocket_command_orders() {
        let cmd = WebSocketCommand::Orders;
        assert!(
            matches!(cmd, WebSocketCommand::Orders),
            "Expected Orders command, got {:?}",
            cmd
        );
    }

    #[test]
    fn test_format_ws_price_u64() {
        let val = serde_json::json!(123456);
        assert_eq!(format_ws_price(&val).as_deref(), Some("123456"));
    }

    // Trailing zeros of the 8-decimal rendering are trimmed.
    #[test]
    fn test_format_ws_price_f64() {
        let val = serde_json::json!(123.456);
        assert_eq!(format_ws_price(&val).as_deref(), Some("123.456"));
    }

    #[test]
    fn test_format_ws_price_str() {
        let val = serde_json::json!("789");
        assert_eq!(format_ws_price(&val).as_deref(), Some("789"));
    }

    #[test]
    fn test_format_ws_price_fractional_str() {
        let val = serde_json::json!("0.00001234");
        assert_eq!(format_ws_price(&val).as_deref(), Some("0.00001234"));
    }

    #[test]
    fn test_format_ws_price_zero() {
        let val = serde_json::json!(0);
        assert_eq!(format_ws_price(&val).as_deref(), Some("0"));
    }

    #[test]
    fn test_format_ws_price_null() {
        let val = serde_json::json!(null);
        assert!(format_ws_price(&val).is_none());
    }

    // Strings that are not numbers, and non-numeric JSON types, are rejected.
    #[test]
    fn test_format_ws_price_non_numeric() {
        assert!(format_ws_price(&serde_json::json!("abc")).is_none());
        assert!(format_ws_price(&serde_json::json!(true)).is_none());
    }

    #[test]
    fn test_public_ws_url() {
        assert!(PUBLIC_WS_URL.contains("ws3.indodax.com"));
    }

    #[test]
    fn test_private_ws_url() {
        assert!(PRIVATE_WS_URL.contains("pws.indodax.com"));
    }

    #[test]
    fn test_public_ws_token_url() {
        assert!(crate::commands::helpers::PUBLIC_WS_TOKEN_URL.contains("indodax.com"));
    }
}