1use std::collections::HashMap;
63use std::str::FromStr;
64use std::sync::{Arc, Mutex};
65use std::sync::atomic::{AtomicBool, Ordering};
66use std::time::Duration;
67use eyre::bail;
68use serde_json::{json, Value};
69use crate::msgs::account::{Fill, LeverageSetting, Margin, OrderState, PositionInfo};
70use crate::msgs::responses::Response;
71use crate::msgs::subscription::SubscriptionRequest;
72
73use futures_util::{SinkExt, StreamExt};
74use futures_util::stream::SplitSink;
75use serde::Deserialize;
76use solana_hash::Hash;
77use solana_pubkey::Pubkey;
78use tokio::net::TcpStream;
79use tokio::sync::{broadcast, mpsc, oneshot, watch};
80use tokio::time;
81use tokio_tungstenite::{
82 connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream,
83};
84use tracing::{debug, error, info, warn};
85use crate::api::parts::command::Command;
86use crate::api::parts::config::WSConfig;
87use crate::api::parts::{make_nonce, Event, Topic};
88use crate::common::side::Side;
89use crate::common::tif::TimeInForce;
90use crate::msgs::{CancelAll, CancelOrder, LimitOrder, MarketOrder, Price};
91use crate::msgs::md::{Candle, L2Snapshot, Ticker};
92use crate::transaction::{Action, ActionMeta, Transaction, TransactionSigner};
/// In-memory view of the trading account, rebuilt from `account` stream messages.
///
/// The actor fills this in from an `accountSnapshot` message and then applies the
/// incremental order / margin / position / fill / leverage updates on top of it.
#[derive(Debug, Clone, Default, Deserialize)]
#[allow(unused)]
pub struct AccountState {
    /// Most recently received margin summary.
    pub margin: Margin,
    /// Positions keyed by symbol.
    pub positions: HashMap<String, PositionInfo>,
    /// Open orders keyed by order id.
    pub open_orders: HashMap<String, OrderState>,
    /// Leverage settings keyed by symbol.
    pub leverage_settings: HashMap<String, LeverageSetting>,
}
107
/// Callback invoked with each [`Event`] delivered for a topic it is registered on.
///
/// Handlers live in a map shared with a spawned dispatch task, hence the
/// `Send + Sync` bounds.
#[allow(unused)]
pub type EventHandler = Box<dyn Fn(&Event) + Send + Sync>;
116
/// Cloneable handle to one WebSocket session with the exchange.
///
/// The socket is owned by a background actor task. This handle sends it commands
/// over `cmd_tx` and reads the state it publishes through the `watch` receivers.
#[allow(unused)]
#[derive(Clone)]
pub struct BulkWsClient {
    /// Command queue into the actor task.
    cmd_tx: mpsc::Sender<Command>,
    /// Callbacks registered per topic through `on`.
    handlers: Arc<Mutex<HashMap<Topic, Vec<EventHandler>>>>,

    /// Latest tickers keyed by symbol, as published by the actor.
    ticker_rx: watch::Receiver<HashMap<String, Ticker>>,
    /// Latest account state, as published by the actor.
    account_rx: watch::Receiver<AccountState>,

    /// Signing key; when `None`, every trading call returns an error.
    signer: Option<TransactionSigner>,
    /// How long a trading call waits for the server's reply before giving up.
    default_timeout: Duration,

    /// Source of the `id` attached to each posted request; starts at 1.
    next_request_id: std::sync::Arc<std::sync::atomic::AtomicU64>,

    /// Join handle of the actor task; taken by whichever of `shutdown` / `closed`
    /// gets to it first.
    actor_handle: std::sync::Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,

    /// Set to `true` in `connect`; cleared by the actor when it handles a disconnect.
    connected: Arc<AtomicBool>,

    /// Broadcasts the disconnect reason to receivers from `subscribe_disconnect`.
    disconnect_tx: broadcast::Sender<String>,
}
162
163#[allow(unused)]
164impl BulkWsClient {
165 pub async fn connect(config: WSConfig) -> eyre::Result<Self> {
175 info!("Connecting to {}", config.url);
176 let (ws_stream, _) = connect_async(&config.url).await?;
177 let (ws_write, ws_read) = ws_stream.split();
178 info!("Connected to Bulk Exchange WebSocket");
179
180 let (ticker_tx, ticker_rx) = watch::channel(HashMap::new());
182 let (account_tx, account_rx) = watch::channel(AccountState::default());
183
184 let (cmd_tx, cmd_rx) = mpsc::channel::<Command>(512);
186
187 let handlers: Arc<Mutex<HashMap<Topic, Vec<EventHandler>>>> = Arc::default();
189 let handlers_task = Arc::clone(&handlers);
190 let (event_tx, mut event_rx) = mpsc::channel::<(Topic, Event)>(32768);
191
192 tokio::spawn(async move {
193 while let Some((topic, event)) = event_rx.recv().await {
194 let map = handlers_task.lock().unwrap();
195 if let Some(hs) = map.get(&topic) {
196 for h in hs {
197 h(&event);
198 }
199 }
200 }
201 });
202
203 let connected = Arc::new(AtomicBool::new(true));
205 let (disconnect_tx, _) = broadcast::channel::<String>(4);
206
207 let actor = Actor {
208 ws_write,
209 event_tx,
210 cmd_rx,
211 ticker_tx,
212 account_tx,
213 tickers: HashMap::new(),
214 prices: HashMap::new(),
215 account_state: AccountState::default(),
216 pending: HashMap::new(),
217 subscriptions: Vec::new(),
218 connected: Arc::clone(&connected),
219 disconnect_tx: disconnect_tx.clone(),
220 };
221
222 let mut initial_subs = Vec::new();
224 if config.track_account {
225 if let Some(ref signer) = config.signer {
226 let pk_str = signer.public_key_b58();
227 initial_subs.push(SubscriptionRequest::new(
228 "account",
229 json!({ "user": pk_str }),
230 ));
231 }
232 }
233 if config.track_ticker {
234 for sym in &config.symbols {
235 initial_subs.push(SubscriptionRequest::new(
236 "ticker",
237 json!({ "symbol": sym }),
238 ));
239 }
240 }
241
242 let actor_handle = tokio::spawn(actor.run(ws_read, initial_subs));
244
245 Ok(Self {
246 cmd_tx,
247 handlers,
248 ticker_rx,
249 account_rx,
250 signer: config.signer,
251 default_timeout: config.default_timeout,
252 next_request_id: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)),
253 actor_handle: std::sync::Arc::new(tokio::sync::Mutex::new(Some(actor_handle))),
254 connected,
255 disconnect_tx,
256 })
257 }
258
259 pub async fn shutdown(&self) {
261 let _ = self.cmd_tx.send(Command::Shutdown).await;
262 if let Some(h) = self.actor_handle.lock().await.take() {
263 let _ = h.await;
264 }
265 }
266
267 pub async fn closed(&self) {
269 if let Some(h) = self.actor_handle.lock().await.take() {
270 let _ = h.await;
271 }
272 }
273
274 pub fn is_connected(&self) -> bool {
280 self.connected.load(Ordering::Relaxed)
281 }
282
283
284 pub fn get_ticker(&self, symbol: &str) -> Option<Ticker> {
296 self.ticker_rx.borrow().get(symbol).cloned()
297 }
298
299 pub fn get_price(&self, symbol: &str) -> Option<f64> {
307 self.ticker_rx.borrow().get(symbol).map(|x| x.mark_price)
308 }
309
310 pub fn get_tickers(&self) -> HashMap<String, Ticker> {
315 self.ticker_rx.borrow().clone()
316 }
317
318 pub fn get_margin(&self) -> Margin {
320 self.account_rx.borrow().margin.clone()
321 }
322
323 pub fn get_position(&self, symbol: &str) -> Option<PositionInfo> {
331 self.account_rx.borrow().positions.get(symbol).cloned()
332 }
333
334 pub fn get_positions(&self) -> HashMap<String, PositionInfo> {
336 self.account_rx.borrow().positions.clone()
337 }
338
339 pub fn get_leverage(&self, symbol: &str) -> Option<f64> {
347 self.account_rx
348 .borrow()
349 .leverage_settings
350 .get(symbol)
351 .map(|l| l.leverage)
352 }
353
354 pub async fn wait_tickers_changed(&mut self) -> eyre::Result<HashMap<String, Ticker>> {
360 self.ticker_rx.changed().await?;
361 Ok(self.ticker_rx.borrow().clone())
362 }
363
364 pub async fn wait_account_changed(&mut self) -> eyre::Result<AccountState> {
366 self.account_rx.changed().await?;
367 Ok(self.account_rx.borrow().clone())
368 }
369
370 pub async fn open_orders(&self, symbol: Option<&str>) -> eyre::Result<Vec<OrderState>> {
382 let (tx, rx) = oneshot::channel();
383 self.cmd_tx
384 .send(Command::GetOrders {
385 symbol: symbol.map(Into::into),
386 respond: tx,
387 })
388 .await
389 .map_err(|_| eyre::eyre!("actor gone"))?;
390 Ok(rx.await?)
391 }
392
393 pub async fn place_orders(
408 &self,
409 actions: Vec<Action>,
410 account: Option<Pubkey>,
411 nonce: Option<u64>,
412 ) -> eyre::Result<Vec<Response>> {
413 let signer = self
414 .signer
415 .as_ref()
416 .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
417
418 let account = if let Some(account) = account {
419 account
420 } else {
421 signer.public_key()
422 };
423
424 let nonce = nonce.unwrap_or_else(make_nonce);
425 let pk = signer.public_key();
426
427 let mut tx = Transaction {
429 actions,
430 nonce,
431 account,
432 signer: signer.public_key(),
433 signature: Default::default(),
434 };
435 tx.sign(signer)?;
436
437 let request_id = self
438 .next_request_id
439 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
440
441 let body = serde_json::to_string(&tx)?;
443 let json = format!(
444 r#"{{"method":"post","request":{{"type":"action","payload":{}}},"id":{}}}"#,
445 body, request_id
446 );
447
448 let (resp_tx, resp_rx) = oneshot::channel();
449
450 self.cmd_tx
451 .send(Command::Tx {
452 request_id,
453 json,
454 respond: resp_tx,
455 })
456 .await
457 .map_err(|_| eyre::eyre!("client is disconnected — call connect() to reconnect"))?;
458
459 match time::timeout(self.default_timeout, resp_rx).await {
460 Ok(Ok(result)) => result,
461 Ok(Err(_)) => bail!("response channel dropped"),
462 Err(_) => bail!("order request {request_id} timed out"),
463 }
464 }
465
466 pub async fn update_oracle(
482 &self,
483 actions: Vec<Price>,
484 account: Option<Pubkey>,
485 nonce: Option<u64>,
486 ) -> eyre::Result<Vec<Response>> {
487 let signer = self
488 .signer
489 .as_ref()
490 .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
491
492 let account = if let Some(account) = account {
493 account
494 } else {
495 signer.public_key()
496 };
497
498 let nonce = nonce.unwrap_or_else(make_nonce);
499
500 let mut tx = Transaction {
502 actions: actions.iter().map(|a| a.clone().into()).collect(),
503 nonce,
504 account,
505 signer: signer.public_key(),
506 signature: Default::default(),
507 };
508 tx.sign(signer)?;
509
510 let request_id = self
511 .next_request_id
512 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
513
514 let body = serde_json::to_string(&tx)?;
516 let json = format!(
517 r#"{{"method":"post","request":{{"type":"action","payload":{}}},"id":{}}}"#,
518 body, request_id
519 );
520
521 let (resp_tx, resp_rx) = oneshot::channel();
522
523 self.cmd_tx
524 .send(Command::Tx {
525 request_id,
526 json,
527 respond: resp_tx,
528 })
529 .await
530 .map_err(|_| eyre::eyre!("client is disconnected — call connect() to reconnect"))?;
531
532 match time::timeout(self.default_timeout, resp_rx).await {
533 Ok(Ok(result)) => result,
534 Ok(Err(_)) => bail!("oracle update response channel dropped"),
535 Err(_) => bail!("oracle update request {request_id} timed out"),
536 }
537 }
538
539 pub async fn place_limit_order(
554 &self,
555 symbol: &str,
556 side: Side,
557 price: f64,
558 size: f64,
559 tif: TimeInForce,
560 reduce_only: bool,
561 account: Option<Pubkey>,
562 nonce: Option<u64>,
563 ) -> eyre::Result<Response> {
564 let signer = self
565 .signer
566 .as_ref()
567 .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
568
569 let account = if let Some(account) = account {
570 account
571 } else {
572 signer.public_key()
573 };
574
575 let nonce = nonce.unwrap_or_else(make_nonce);
576 let order = LimitOrder {
577 symbol: Arc::from(symbol),
578 is_buy: side == Side::Buy,
579 price,
580 size,
581 tif,
582 reduce_only,
583 iso: false,
584 meta: ActionMeta {
585 account,
586 nonce,
587 seqno: 0,
588 hash: None,
589 }
590 };
591 let resps = self.place_orders(vec![order.into()], None, None).await?;
592 resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
593 }
594
595 pub async fn place_market_order(
606 &self,
607 symbol: &str,
608 side: Side,
609 size: f64,
610 reduce_only: bool,
611 account: Option<Pubkey>,
612 nonce: Option<u64>,
613 ) -> eyre::Result<Response> {
614 let signer = self
615 .signer
616 .as_ref()
617 .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
618
619 let account = if let Some(account) = account {
620 account
621 } else {
622 signer.public_key()
623 };
624
625 let nonce = nonce.unwrap_or_else(make_nonce);
626 let order = MarketOrder {
627 symbol: Arc::from(symbol),
628 is_buy: side == Side::Buy,
629 size,
630 reduce_only,
631 iso: false,
632 meta: ActionMeta {
633 account,
634 nonce,
635 seqno: 0,
636 hash: None,
637 }
638 };
639
640 let resps = self.place_orders(vec![order.into()], None, None).await?;
641 resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
642 }
643
644 pub async fn cancel_order(
653 &self,
654 symbol: &str,
655 order_id: &str,
656 account: Option<Pubkey>,
657 nonce: Option<u64>,
658 ) -> eyre::Result<Response> {
659 let signer = self
660 .signer
661 .as_ref()
662 .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
663
664 let account = if let Some(account) = account {
665 account
666 } else {
667 signer.public_key()
668 };
669
670 let nonce = nonce.unwrap_or_else(make_nonce);
671 let cancel = CancelOrder {
672 symbol: symbol.to_string(),
673 oid: Hash::from_str(&order_id)?,
674 meta: ActionMeta {
675 account,
676 nonce,
677 seqno: 0,
678 hash: None,
679 }
680 };
681
682 let resps = self.place_orders(vec![cancel.into()], None, None).await?;
683 resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
684 }
685
686 pub async fn cancel_all(
694 &self,
695 symbols: Vec<String>,
696 account: Option<Pubkey>,
697 nonce: Option<u64>,
698 ) -> eyre::Result<Response> {
699 let signer = self
700 .signer
701 .as_ref()
702 .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
703
704 let account = if let Some(account) = account {
705 account
706 } else {
707 signer.public_key()
708 };
709
710 let nonce = nonce.unwrap_or_else(make_nonce);
711 let cancel = CancelAll {
712 symbols,
713 meta: ActionMeta {
714 account,
715 nonce,
716 seqno: 0,
717 hash: None,
718 }
719 };
720 let resps = self.place_orders(vec![cancel.into()], None, None).await?;
721 resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
722 }
723
724
725 pub fn subscribe_disconnect(&self) -> broadcast::Receiver<String> {
746 self.disconnect_tx.subscribe()
747 }
748
749 pub async fn subscribe_ticker(&self, symbol: &str) -> eyre::Result<()> {
754 self.subscribe(vec![
755 SubscriptionRequest::new("ticker", json!({ "symbol": symbol })),
756 ]).await
757 }
758
759 pub async fn subscribe_trades(&self, symbols: &[&str]) -> eyre::Result<()> {
764 let subs = symbols
765 .iter()
766 .map(|s| SubscriptionRequest::new("trades", json!({ "symbol": s })))
767 .collect();
768 self.subscribe(subs).await
769 }
770
771 pub async fn subscribe_l2_snapshot(
776 &self,
777 symbol: &str,
778 nlevels: Option<u32>,
779 ) -> eyre::Result<()> {
780 let mut params = json!({ "symbol": symbol });
781 if let Some(n) = nlevels {
782 params["nlevels"] = json!(n);
783 }
784 self.subscribe(vec![SubscriptionRequest::new("l2Snapshot", params)]).await
785 }
786
787 pub async fn subscribe_l2_delta(&self, symbol: &str) -> eyre::Result<()> {
792 self.subscribe(vec![
793 SubscriptionRequest::new("l2Delta", json!({ "symbol": symbol })),
794 ]).await
795 }
796
797 pub async fn subscribe_candles(&self, symbol: &str, interval: &str) -> eyre::Result<()> {
803 self.subscribe(vec![SubscriptionRequest::new(
804 "candle",
805 json!({ "symbol": symbol, "interval": interval }),
806 )])
807 .await
808 }
809
810 async fn subscribe(&self, subs: Vec<SubscriptionRequest>) -> eyre::Result<()> {
815 self.cmd_tx
816 .send(Command::Subscribe(subs))
817 .await
818 .map_err(|_| eyre::eyre!("actor gone"))?;
819 Ok(())
820 }
821
822 pub async fn on(&self, topic: Topic, handler: impl Fn(&Event) + Send + Sync + 'static) {
833 self.handlers.lock().unwrap().entry(topic).or_default().push(Box::new(handler));
834 }
835}
836
/// Write half of the split WebSocket stream; accepts [`Message`] frames.
type WsWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
/// Read half of the split WebSocket stream.
type WsReader = futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
843
/// Background task state: owns the socket's write half and all cached state.
///
/// The actor reads frames from the socket and commands from client handles,
/// updates its caches, publishes them on the watch channels, and queues events
/// for the handler-dispatch task.
struct Actor {
    /// Write half of the WebSocket.
    ws_write: WsWriter,
    /// Queue feeding the task that runs registered event handlers.
    event_tx: mpsc::Sender<(Topic,Event)>,

    /// Commands coming from client handles.
    cmd_rx: mpsc::Receiver<Command>,

    /// Publishes the ticker map to client handles.
    ticker_tx: watch::Sender<HashMap<String, Ticker>>,
    /// Publishes the account state to client handles.
    account_tx: watch::Sender<AccountState>,

    /// Latest ticker per symbol.
    tickers: HashMap<String, Ticker>,
    /// Latest mark price per symbol, updated alongside `tickers`.
    prices: HashMap<String, f64>,
    /// Account state assembled from `account` messages.
    account_state: AccountState,
    /// Reply channels for posted requests still awaiting a response, keyed by request id.
    pending: HashMap<u64, oneshot::Sender<eyre::Result<Vec<Response>>>>,

    /// Every subscription request sent so far.
    /// NOTE(review): only written in the visible code — presumably kept for re-subscribing; confirm.
    subscriptions: Vec<SubscriptionRequest>,

    /// Shared connection flag, cleared when a disconnect is handled.
    connected: Arc<AtomicBool>,
    /// Broadcasts the disconnect reason to listeners.
    disconnect_tx: broadcast::Sender<String>,
}
872
873impl Actor {
874 async fn run(
876 mut self,
877 mut ws_read: WsReader,
878 initial_subs: Vec<SubscriptionRequest>,
879 ) {
880 if !initial_subs.is_empty() {
882 if let Err(e) = self.send_subscribe(&initial_subs).await {
883 error!("Initial subscription failed: {e}");
884 return;
885 }
886 self.subscriptions = initial_subs;
887 }
888
889 self.emit(Topic::Status, &Event::Connected);
891
892 let disconnect_reason: String = 'actor: loop {
894 tokio::select! {
895 msg = ws_read.next() => {
897 match msg {
898 Some(Ok(Message::Text(text))) => {
899 debug!("msg {}: {}", text.len(), &text[0..512.min(text.len())]);
900 match serde_json::from_str::<Value>(&text) {
901 Ok(data) => self.handle_message(data, &text).await,
902 Err(e) => error!("JSON decode error: {e}"),
903 }
904 }
905 Some(Ok(Message::Close(_))) => {
906 warn!("WebSocket closed by server");
907 break 'actor "server closed the connection".into();
908 }
909 Some(Err(e)) => {
910 error!("WebSocket read error: {e}");
911 break 'actor format!("WebSocket read error: {e}");
912 }
913 None => {
914 warn!("WebSocket stream ended");
915 break 'actor "WebSocket stream ended".into();
916 }
917 _ => {} }
919 }
920
921 cmd = self.cmd_rx.recv() => {
923 match cmd {
924 Some(Command::Subscribe(subs)) => {
925 if let Err(e) = self.send_subscribe(&subs).await {
926 error!("Subscription send error: {e}");
927 }
928 self.subscriptions.extend(subs);
929 }
930
931 Some(Command::Tx { request_id, json, respond }) => {
932 self.pending.insert(request_id, respond);
933 if let Err(e) = self.ws_send_text(&json).await {
934 error!("Order send error: {e}");
935 if let Some(tx) = self.pending.remove(&request_id) {
936 let _ = tx.send(Err(e));
937 }
938 }
939 }
940
941 Some(Command::AsyncTx { json}) => {
942 if let Err(e) = self.ws_send_text(&json).await {
943 error!("Order send error: {e}");
944 }
945 }
946
947 Some(Command::SendRaw(json)) => {
948 if let Err(e) = self.ws_send_text(&json).await {
949 error!("Raw send error: {e}");
950 }
951 }
952
953 Some(Command::GetOrders { symbol, respond }) => {
954 let orders = match symbol {
955 Some(s) => self.account_state.open_orders
956 .values()
957 .filter(|o| o.symbol == s)
958 .cloned()
959 .collect(),
960 None => self.account_state.open_orders.values().cloned().collect(),
961 };
962 let _ = respond.send(orders);
963 }
964
965 Some(Command::Shutdown) | None => {
966 info!("Actor shutting down (requested)");
967 break 'actor "shutdown requested".into();
968 }
969 }
970 }
971 }
972 }; self.handle_disconnect(disconnect_reason).await;
975 }
976
977 async fn handle_disconnect(&mut self, reason: String) {
987 self.connected.store(false, Ordering::Release);
989
990 let err_msg = format!("disconnected: {reason}");
992 for (_, tx) in self.pending.drain() {
993 let _ = tx.send(Err(eyre::eyre!("{}", err_msg)));
994 }
995
996 self.emit(Topic::Status, &Event::Disconnected(reason.clone()));
998
999 let _ = self.disconnect_tx.send(reason.clone());
1001
1002 let _ = self.ws_write.close().await;
1004
1005 info!("Actor stopped: {reason}");
1006 }
1007
1008 async fn ws_send_text(&mut self, text: &str) -> eyre::Result<()> {
1010 let len = text.len();
1011 debug!("sending msg len: {}", len);
1012 self.ws_write
1013 .send(Message::Text(text.into()))
1014 .await
1015 .map_err(|e| eyre::eyre!("ws write: {e}"))?;
1016 Ok(())
1017 }
1018
1019 async fn handle_message(&mut self, data: Value, json: &str) {
1024 let msg_type = data["type"].as_str().unwrap_or("");
1025
1026 match msg_type {
1027 "subscriptionResponse" => {
1028 info!(
1029 "Subscription confirmed: {:?}",
1030 data["topics"].as_array().map(|a| a.len())
1031 );
1032 }
1033
1034 "ticker" => {
1035 let ticker_v = &data["data"]["ticker"];
1036 if let Ok(ticker) = serde_json::from_value::<Ticker>(ticker_v.clone()) {
1037 self.prices.insert(ticker.symbol.clone(), ticker.mark_price);
1038 self.tickers.insert(ticker.symbol.clone(), ticker.clone());
1039
1040 let _ = self.ticker_tx.send(self.tickers.clone());
1042
1043 self.emit(Topic::Ticker, &Event::Ticker(ticker.clone()));
1044 debug!("Ticker: {} mark={:.2}", ticker.symbol, ticker.mark_price);
1045 } else {
1046 error!("Could not parse ticker event: {:?}", ticker_v);
1047 }
1048 }
1049
1050 "trades" => {
1051 if let Ok(trades) = serde_json::from_value::<Vec<Fill>>(data["data"].clone()) {
1052 self.emit(Topic::Trades, &Event::Trades(trades));
1053 } else {
1054 error!("Could not parse trades event: {:?}", data["data"]);
1055 }
1056 }
1057
1058 "l2Snapshot" => {
1059 if let Ok(l2_snapshot) = serde_json::from_value::<L2Snapshot>(data["data"]["book"].clone()) {
1060 self.emit(Topic::L2Snapshot, &Event::L2Snapshot(l2_snapshot));
1061 } else {
1062 error!("Could not parse l2_snapshot event: msg: {:?}", data["data"]);
1063 }
1064 }
1065
1066 "l2Delta" => {
1067 if let Ok(l2_delta) = serde_json::from_value::<L2Snapshot>(data["data"]["book"].clone()) {
1068 self.emit(Topic::L2Delta, &Event::L2Delta(l2_delta));
1069 } else {
1070 error!("Could not parse l2_delta event: {:?}", data["data"]);
1071 }
1072 }
1073
1074 "candle" => {
1075 if let Ok(candle) = serde_json::from_value::<Candle>(data["data"].clone()) {
1076 self.emit(Topic::Candle, &Event::Candle(candle));
1077 } else {
1078 error!("Could not parse candle event: {:?}", data["data"]);
1079 }
1080 }
1081
1082 "account" => {
1083 self.handle_account(&data["data"]).await;
1084 }
1085
1086 "post" => {
1087 self.handle_post_response(&data, json);
1088 }
1089
1090 other => {
1091 debug!("Unhandled message type: {other}");
1092 }
1093 }
1094 }
1095
1096 async fn handle_account(&mut self, data: &Value) {
1101 let update_type = data["type"].as_str().unwrap_or("");
1102
1103 match update_type {
1104 "accountSnapshot" => {
1105 if let Ok(margin) = serde_json::from_value::<Margin>(data["margin"].clone()) {
1106 self.account_state.margin = margin.clone();
1107 self.emit(Topic::Margin, &Event::Margin(margin))
1108 }
1109
1110 if let Ok(positions) = serde_json::from_value::<Vec<PositionInfo>>(data["positions"].clone()) {
1111 for position in &positions {
1112 self.emit(Topic::Position, &Event::Position(position.clone()))
1113 }
1114 self.account_state.positions = positions
1115 .into_iter()
1116 .map(|p| (p.symbol.clone(), p.clone()))
1117 .collect();
1118 }
1119
1120 if let Ok(orders) = serde_json::from_value::<Vec<OrderState>>(data["openOrders"].clone()) {
1121 for order in &orders {
1122 self.emit(Topic::Order, &Event::Order(order.clone()))
1123 }
1124 self.account_state.open_orders = orders
1125 .into_iter()
1126 .map(|o| (o.order_id.clone(), o))
1127 .collect();
1128 }
1129
1130 if let Ok(leverages) = serde_json::from_value::<Vec<LeverageSetting>>(data["leverageSettings"].clone()) {
1131 self.emit(Topic::Leverage, &Event::Leverage(leverages.clone()));
1132 for l in leverages {
1133 self.account_state.leverage_settings.insert(l.symbol.clone(), l);
1134 }
1135 }
1136
1137 info!(
1138 "Account snapshot: balance={:.2}, positions={}, orders={}",
1139 self.account_state.margin.total_balance,
1140 self.account_state.positions.len(),
1141 self.account_state.open_orders.len(),
1142 );
1143 }
1144
1145 "orderUpdate" => {
1146 if let Ok(order) = serde_json::from_value::<OrderState>(data.clone()) {
1147 let oid = order.order_id.clone();
1148 if order.status.is_terminal() {
1149 self.account_state.open_orders.remove(&oid);
1150 } else {
1151 self.account_state.open_orders.insert(oid, order.clone());
1152 }
1153 self.emit(Topic::Order, &Event::Order(order));
1154 } else {
1155 error!("Could not parse order event: {:?}", data);
1156 }
1157 }
1158
1159 "marginUpdate" => {
1160 if let Ok(margin) = serde_json::from_value::<Margin>(data.clone()) {
1161 self.account_state.margin = margin.clone();
1162 self.publish_account();
1163 self.emit(Topic::Margin, &Event::Margin(margin));
1164 } else {
1165 error!("Could not parse margin event: {:?}", data);
1166 }
1167 }
1168
1169 "positionUpdate" => {
1170 if let Ok(pos) = serde_json::from_value::<PositionInfo>(data.clone()) {
1171 self.account_state.positions.insert(pos.symbol.clone(), pos.clone());
1172 self.emit(Topic::Position, &Event::Position(pos));
1173 self.publish_account();
1174 } else {
1175 error!("Could not parse position event: {:?}", data);
1176 }
1177 }
1178
1179 "fill" => {
1180 if let Ok(fill) = serde_json::from_value::<Fill>(data.clone()) {
1181 let dir = fill.side.dir();
1182 if let Some(order) = self.account_state.open_orders.get_mut(&fill.order_id) {
1183 order.filled_size += fill.size;
1184 order.signed_size -= dir * fill.size;
1185 if order.signed_size * dir <= 0.0 {
1186 self.account_state.open_orders.remove(&fill.order_id);
1187 }
1188 }
1189 self.publish_account();
1190 self.emit(Topic::Fill, &Event::Fill(fill.clone()));
1191 info!(
1192 "Fill: {} {:?} {} @ {} maker={}",
1193 fill.symbol, fill.side, fill.size, fill.price, fill.is_maker,
1194 );
1195 } else {
1196 error!("Could not parse fill event: {:?}", data);
1197 }
1198 }
1199
1200 "leverageUpdate" => {
1201 if let Ok(leverages) = serde_json::from_value::<Vec<LeverageSetting>>(data["leverage"].clone()) {
1202 for lev in &leverages {
1203 self.account_state.leverage_settings.insert(lev.symbol.clone(), lev.clone());
1204 }
1205 self.emit(Topic::Leverage, &Event::Leverage(leverages.clone()));
1206 self.publish_account();
1207 } else {
1208 error!("Could not parse leverage event: {:?}", data);
1209 }
1210 }
1211
1212 _ => {
1213 debug!("Unknown account update: {update_type}");
1214 }
1215 }
1216 }
1217
1218 fn handle_post_response(&mut self, data: &Value, _json: &str) {
1223 let request_id = data["id"].as_u64().unwrap_or(0);
1224 let inner = &data["data"];
1225 let rtype = inner["type"].as_str().unwrap_or("");
1226 let sender = self.pending.remove(&request_id);
1227
1228 match rtype {
1229 "action"=> {
1230 let payload = &inner["payload"];
1231 let status = payload["status"].as_str().unwrap_or("");
1232
1233 if status != "ok" {
1234 error!("Order request {request_id} failed: {status}");
1235 if let Some(tx) = sender {
1236 let _ = tx.send(Err(eyre::eyre!("order request failed: {}", data)));
1237 }
1238 self.emit(Topic::Error, &Event::Error(data.clone()));
1239 } else {
1240 let responses = Response::parse_responses(data);
1241 if let Some(tx) = sender {
1242 let _ = tx.send(Ok(responses));
1243 }
1244 }
1245 }
1246 "ack" => {
1247 let ok = inner["ok"].as_bool().unwrap_or(false);
1248 let response = if ok {
1249 Response {
1250 order_id: None,
1251 status: "OK".to_string(),
1252 message: None,
1253 raw: inner.clone(),
1254 }
1255 } else {
1256 let message = inner["message"].as_str().unwrap_or("");
1257 Response {
1258 order_id: None,
1259 status: "Error".to_string(),
1260 message: Some(message.to_string()),
1261 raw: inner.clone(),
1262 }
1263 };
1264 if let Some(tx) = sender {
1265 let _ = tx.send(Ok(vec![response]));
1266 }
1267 }
1268 _ => panic!("unknown response type: {}", rtype),
1269 }
1270 }
1271
1272 fn publish_account(&self) {
1278 let _ = self.account_tx.send(self.account_state.clone());
1279 }
1280
1281 fn emit(&self, topic: Topic, data: &Event) {
1283 let _ = self.event_tx.try_send((topic, data.clone()));
1284 }
1285
1286 async fn ws_send_json(&mut self, value: &Value) -> eyre::Result<()> {
1288 let text = serde_json::to_string(value)?;
1289 self.ws_write
1290 .send(Message::Text(text.into()))
1291 .await
1292 .map_err(|e| eyre::eyre!("ws write: {e}"))?;
1293 Ok(())
1294 }
1295
1296 async fn send_subscribe(&mut self, subs: &[SubscriptionRequest]) -> eyre::Result<()> {
1298 let request = json!({
1299 "method": "subscribe",
1300 "subscription": subs.iter().map(|s| s.to_json()).collect::<Vec<_>>(),
1301 });
1302 self.ws_send_json(&request).await?;
1303 info!("Subscribed to {} topics", subs.len());
1304 Ok(())
1305 }
1306}