1use std::collections::HashMap;
63use std::str::FromStr;
64use std::sync::{Arc, Mutex};
65use std::sync::atomic::{AtomicBool, Ordering};
66use std::time::Duration;
67use eyre::bail;
68use serde_json::{json, Value};
69use crate::msgs::account::{Fill, LeverageSetting, Margin, OrderState, PositionInfo};
70use crate::msgs::responses::Response;
71use crate::msgs::subscription::SubscriptionRequest;
72
73use futures_util::{SinkExt, StreamExt};
74use futures_util::stream::SplitSink;
75use serde::Deserialize;
76use solana_hash::Hash;
77use solana_pubkey::Pubkey;
78use tokio::net::TcpStream;
79use tokio::sync::{broadcast, mpsc, oneshot, watch};
80use tokio::time;
81use tokio_tungstenite::{
82 connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream,
83};
84use tracing::{debug, error, info, warn};
85use crate::api::parts::command::Command;
86use crate::api::parts::config::WSConfig;
87use crate::api::parts::{make_nonce, Event, Topic};
88use crate::common::side::Side;
89use crate::common::tif::TimeInForce;
90use crate::msgs::{CancelAll, CancelOrder, LimitOrder, MarketOrder, Price};
91use crate::msgs::md::{Candle, L2Snapshot, Ticker};
92use crate::transaction::{Action, ActionMeta, Transaction, TransactionSigner};
93#[derive(Debug, Clone, Default, Deserialize)]
100#[allow(unused)]
101pub struct AccountState {
102 pub margin: Margin,
103 pub positions: HashMap<String, PositionInfo>,
104 pub open_orders: HashMap<String, OrderState>,
105 pub leverage_settings: HashMap<String, LeverageSetting>,
106}
107
108#[allow(unused)]
115pub type EventHandler = Box<dyn Fn(&Event) + Send + Sync>;
116
117#[allow(unused)]
130#[derive(Clone)]
131pub struct BulkWsClient {
132 cmd_tx: mpsc::Sender<Command>,
134 handlers: Arc<Mutex<HashMap<Topic, Vec<EventHandler>>>>,
136
137 ticker_rx: watch::Receiver<HashMap<String, Ticker>>,
140 account_rx: watch::Receiver<AccountState>,
142
143 signer: Option<TransactionSigner>,
145 default_timeout: Duration,
146
147 next_request_id: std::sync::Arc<std::sync::atomic::AtomicU64>,
149
150 actor_handle: std::sync::Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
152
153 connected: Arc<AtomicBool>,
156
157 disconnect_tx: broadcast::Sender<String>,
161}
162
163#[allow(unused)]
164impl BulkWsClient {
165 pub async fn connect(config: WSConfig) -> eyre::Result<Self> {
175 info!("Connecting to {}", config.url);
176 let (ws_stream, _) = connect_async(&config.url).await?;
177 let (ws_write, ws_read) = ws_stream.split();
178 info!("Connected to Bulk Exchange WebSocket");
179
180 let (ticker_tx, ticker_rx) = watch::channel(HashMap::new());
182 let (account_tx, account_rx) = watch::channel(AccountState::default());
183
184 let (cmd_tx, cmd_rx) = mpsc::channel::<Command>(512);
186
187 let handlers: Arc<Mutex<HashMap<Topic, Vec<EventHandler>>>> = Arc::default();
189 let handlers_task = Arc::clone(&handlers);
190 let (event_tx, mut event_rx) = mpsc::channel::<(Topic, Event)>(32768);
191
192 tokio::spawn(async move {
193 while let Some((topic, event)) = event_rx.recv().await {
194 let map = handlers_task.lock().unwrap();
195 if let Some(hs) = map.get(&topic) {
196 for h in hs {
197 h(&event);
198 }
199 }
200 }
201 });
202
203 let connected = Arc::new(AtomicBool::new(true));
205 let (disconnect_tx, _) = broadcast::channel::<String>(4);
206
207 let actor = Actor {
208 ws_write,
209 event_tx,
210 cmd_rx,
211 ticker_tx,
212 account_tx,
213 tickers: HashMap::new(),
214 prices: HashMap::new(),
215 account_state: AccountState::default(),
216 pending: HashMap::new(),
217 subscriptions: Vec::new(),
218 connected: Arc::clone(&connected),
219 disconnect_tx: disconnect_tx.clone(),
220 };
221
222 let mut initial_subs = Vec::new();
224 if config.track_account {
225 if let Some(ref signer) = config.signer {
226 let pk_str = signer.public_key_b58();
227 initial_subs.push(SubscriptionRequest::new(
228 "account",
229 json!({ "user": pk_str }),
230 ));
231 }
232 }
233 if config.track_ticker {
234 for sym in &config.symbols {
235 initial_subs.push(SubscriptionRequest::new(
236 "ticker",
237 json!({ "symbol": sym }),
238 ));
239 }
240 }
241
242 let actor_handle = tokio::spawn(actor.run(ws_read, initial_subs));
244
245 Ok(Self {
246 cmd_tx,
247 handlers,
248 ticker_rx,
249 account_rx,
250 signer: config.signer,
251 default_timeout: config.default_timeout,
252 next_request_id: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)),
253 actor_handle: std::sync::Arc::new(tokio::sync::Mutex::new(Some(actor_handle))),
254 connected,
255 disconnect_tx,
256 })
257 }
258
259 pub async fn shutdown(&self) {
261 let _ = self.cmd_tx.send(Command::Shutdown).await;
262 if let Some(h) = self.actor_handle.lock().await.take() {
263 let _ = h.await;
264 }
265 }
266
267 pub async fn closed(&self) {
269 if let Some(h) = self.actor_handle.lock().await.take() {
270 let _ = h.await;
271 }
272 }
273
274 pub fn is_connected(&self) -> bool {
280 self.connected.load(Ordering::Relaxed)
281 }
282
283
284 pub fn get_ticker(&self, symbol: &str) -> Option<Ticker> {
296 self.ticker_rx.borrow().get(symbol).cloned()
297 }
298
299 pub fn get_price(&self, symbol: &str) -> Option<f64> {
307 self.ticker_rx.borrow().get(symbol).map(|x| x.mark_price)
308 }
309
310 pub fn get_tickers(&self) -> HashMap<String, Ticker> {
315 self.ticker_rx.borrow().clone()
316 }
317
318 pub fn get_margin(&self) -> Margin {
320 self.account_rx.borrow().margin.clone()
321 }
322
323 pub fn get_position(&self, symbol: &str) -> Option<PositionInfo> {
331 self.account_rx.borrow().positions.get(symbol).cloned()
332 }
333
334 pub fn get_positions(&self) -> HashMap<String, PositionInfo> {
336 self.account_rx.borrow().positions.clone()
337 }
338
339 pub fn get_leverage(&self, symbol: &str) -> Option<f64> {
347 self.account_rx
348 .borrow()
349 .leverage_settings
350 .get(symbol)
351 .map(|l| l.leverage)
352 }
353
354 pub async fn wait_tickers_changed(&mut self) -> eyre::Result<HashMap<String, Ticker>> {
360 self.ticker_rx.changed().await?;
361 Ok(self.ticker_rx.borrow().clone())
362 }
363
364 pub async fn wait_account_changed(&mut self) -> eyre::Result<AccountState> {
366 self.account_rx.changed().await?;
367 Ok(self.account_rx.borrow().clone())
368 }
369
370 pub async fn open_orders(&self, symbol: Option<&str>) -> eyre::Result<Vec<OrderState>> {
382 let (tx, rx) = oneshot::channel();
383 self.cmd_tx
384 .send(Command::GetOrders {
385 symbol: symbol.map(Into::into),
386 respond: tx,
387 })
388 .await
389 .map_err(|_| eyre::eyre!("actor gone"))?;
390 Ok(rx.await?)
391 }
392
393 pub async fn place_orders(
408 &self,
409 actions: Vec<Action>,
410 account: Option<Pubkey>,
411 nonce: Option<u64>,
412 ) -> eyre::Result<Vec<Response>> {
413 let signer = self
414 .signer
415 .as_ref()
416 .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
417
418 let account = if let Some(account) = account {
419 account
420 } else {
421 signer.public_key()
422 };
423
424 let nonce = nonce.unwrap_or_else(make_nonce);
425 let pk = signer.public_key();
426
427 let mut tx = Transaction {
429 actions,
430 nonce,
431 account,
432 signer: signer.public_key(),
433 signature: Default::default(),
434 };
435 tx.sign(signer)?;
436
437 let request_id = self
438 .next_request_id
439 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
440
441 let body = serde_json::to_string(&tx)?;
443 let json = format!(
444 r#"{{"method":"post","request":{{"type":"action","payload":{}}},"id":{}}}"#,
445 body, request_id
446 );
447
448 let (resp_tx, resp_rx) = oneshot::channel();
449
450 self.cmd_tx
451 .send(Command::Tx {
452 request_id,
453 json,
454 respond: resp_tx,
455 })
456 .await
457 .map_err(|_| eyre::eyre!("client is disconnected — call connect() to reconnect"))?;
458
459 match time::timeout(self.default_timeout, resp_rx).await {
460 Ok(Ok(result)) => result,
461 Ok(Err(_)) => bail!("response channel dropped"),
462 Err(_) => bail!("order request {request_id} timed out"),
463 }
464 }
465
466 pub async fn update_oracle(
475 &self,
476 actions: Vec<Price>,
477 account: Option<Pubkey>,
478 nonce: Option<u64>,
479 ) -> eyre::Result<()> {
480 let signer = self
481 .signer
482 .as_ref()
483 .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
484
485 let account = if let Some(account) = account {
486 account
487 } else {
488 signer.public_key()
489 };
490
491 let nonce = nonce.unwrap_or_else(make_nonce);
492
493 let mut tx = Transaction {
495 actions: actions.iter().map(|a| a.clone().into()).collect(),
496 nonce,
497 account,
498 signer: signer.public_key(),
499 signature: Default::default(),
500 };
501 tx.sign(signer)?;
502
503 let request_id = self
504 .next_request_id
505 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
506
507 let body = serde_json::to_string(&tx)?;
509 let json = format!(
510 r#"{{"method":"post","request":{{"type":"action","payload":{}}},"id":{}}}"#,
511 body, request_id
512 );
513
514 self.cmd_tx
515 .send(Command::AsyncTx {
516 json,
517 })
518 .await
519 .map_err(|_| eyre::eyre!("client is disconnected — call connect() to reconnect"))
520
521 }
522
523 pub async fn place_limit_order(
538 &self,
539 symbol: &str,
540 side: Side,
541 price: f64,
542 size: f64,
543 tif: TimeInForce,
544 reduce_only: bool,
545 account: Option<Pubkey>,
546 nonce: Option<u64>,
547 ) -> eyre::Result<Response> {
548 let signer = self
549 .signer
550 .as_ref()
551 .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
552
553 let account = if let Some(account) = account {
554 account
555 } else {
556 signer.public_key()
557 };
558
559 let nonce = nonce.unwrap_or_else(make_nonce);
560 let order = LimitOrder {
561 symbol: Arc::from(symbol),
562 is_buy: side == Side::Buy,
563 price,
564 size,
565 tif,
566 reduce_only,
567 iso: false,
568 meta: ActionMeta {
569 account,
570 nonce,
571 seqno: 0,
572 hash: None,
573 }
574 };
575 let resps = self.place_orders(vec![order.into()], None, None).await?;
576 resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
577 }
578
579 pub async fn place_market_order(
590 &self,
591 symbol: &str,
592 side: Side,
593 size: f64,
594 reduce_only: bool,
595 account: Option<Pubkey>,
596 nonce: Option<u64>,
597 ) -> eyre::Result<Response> {
598 let signer = self
599 .signer
600 .as_ref()
601 .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
602
603 let account = if let Some(account) = account {
604 account
605 } else {
606 signer.public_key()
607 };
608
609 let nonce = nonce.unwrap_or_else(make_nonce);
610 let order = MarketOrder {
611 symbol: Arc::from(symbol),
612 is_buy: side == Side::Buy,
613 size,
614 reduce_only,
615 iso: false,
616 meta: ActionMeta {
617 account,
618 nonce,
619 seqno: 0,
620 hash: None,
621 }
622 };
623
624 let resps = self.place_orders(vec![order.into()], None, None).await?;
625 resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
626 }
627
628 pub async fn cancel_order(
637 &self,
638 symbol: &str,
639 order_id: &str,
640 account: Option<Pubkey>,
641 nonce: Option<u64>,
642 ) -> eyre::Result<Response> {
643 let signer = self
644 .signer
645 .as_ref()
646 .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
647
648 let account = if let Some(account) = account {
649 account
650 } else {
651 signer.public_key()
652 };
653
654 let nonce = nonce.unwrap_or_else(make_nonce);
655 let cancel = CancelOrder {
656 symbol: symbol.to_string(),
657 oid: Hash::from_str(&order_id)?,
658 meta: ActionMeta {
659 account,
660 nonce,
661 seqno: 0,
662 hash: None,
663 }
664 };
665
666 let resps = self.place_orders(vec![cancel.into()], None, None).await?;
667 resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
668 }
669
670 pub async fn cancel_all(
678 &self,
679 symbols: Vec<String>,
680 account: Option<Pubkey>,
681 nonce: Option<u64>,
682 ) -> eyre::Result<Response> {
683 let signer = self
684 .signer
685 .as_ref()
686 .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
687
688 let account = if let Some(account) = account {
689 account
690 } else {
691 signer.public_key()
692 };
693
694 let nonce = nonce.unwrap_or_else(make_nonce);
695 let cancel = CancelAll {
696 symbols,
697 meta: ActionMeta {
698 account,
699 nonce,
700 seqno: 0,
701 hash: None,
702 }
703 };
704 let resps = self.place_orders(vec![cancel.into()], None, None).await?;
705 resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
706 }
707
708
709 pub fn subscribe_disconnect(&self) -> broadcast::Receiver<String> {
730 self.disconnect_tx.subscribe()
731 }
732
733 pub async fn subscribe_ticker(&self, symbol: &str) -> eyre::Result<()> {
738 self.subscribe(vec![
739 SubscriptionRequest::new("ticker", json!({ "symbol": symbol })),
740 ]).await
741 }
742
743 pub async fn subscribe_trades(&self, symbols: &[&str]) -> eyre::Result<()> {
748 let subs = symbols
749 .iter()
750 .map(|s| SubscriptionRequest::new("trades", json!({ "symbol": s })))
751 .collect();
752 self.subscribe(subs).await
753 }
754
755 pub async fn subscribe_l2_snapshot(
760 &self,
761 symbol: &str,
762 nlevels: Option<u32>,
763 ) -> eyre::Result<()> {
764 let mut params = json!({ "symbol": symbol });
765 if let Some(n) = nlevels {
766 params["nlevels"] = json!(n);
767 }
768 self.subscribe(vec![SubscriptionRequest::new("l2Snapshot", params)]).await
769 }
770
771 pub async fn subscribe_l2_delta(&self, symbol: &str) -> eyre::Result<()> {
776 self.subscribe(vec![
777 SubscriptionRequest::new("l2Delta", json!({ "symbol": symbol })),
778 ]).await
779 }
780
781 pub async fn subscribe_candles(&self, symbol: &str, interval: &str) -> eyre::Result<()> {
787 self.subscribe(vec![SubscriptionRequest::new(
788 "candle",
789 json!({ "symbol": symbol, "interval": interval }),
790 )])
791 .await
792 }
793
794 async fn subscribe(&self, subs: Vec<SubscriptionRequest>) -> eyre::Result<()> {
799 self.cmd_tx
800 .send(Command::Subscribe(subs))
801 .await
802 .map_err(|_| eyre::eyre!("actor gone"))?;
803 Ok(())
804 }
805
806 pub async fn on(&self, topic: Topic, handler: impl Fn(&Event) + Send + Sync + 'static) {
817 self.handlers.lock().unwrap().entry(topic).or_default().push(Box::new(handler));
818 }
819}
820
821type WsWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
826type WsReader = futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
827
828struct Actor {
829 ws_write: WsWriter,
831 event_tx: mpsc::Sender<(Topic,Event)>,
833
834 cmd_rx: mpsc::Receiver<Command>,
836
837 ticker_tx: watch::Sender<HashMap<String, Ticker>>,
839 account_tx: watch::Sender<AccountState>,
840
841 tickers: HashMap<String, Ticker>,
843 prices: HashMap<String, f64>,
844 account_state: AccountState,
845 pending: HashMap<u64, oneshot::Sender<eyre::Result<Vec<Response>>>>,
846
847 subscriptions: Vec<SubscriptionRequest>,
849
850 connected: Arc<AtomicBool>,
853 disconnect_tx: broadcast::Sender<String>,
855}
856
857impl Actor {
858 async fn run(
860 mut self,
861 mut ws_read: WsReader,
862 initial_subs: Vec<SubscriptionRequest>,
863 ) {
864 if !initial_subs.is_empty() {
866 if let Err(e) = self.send_subscribe(&initial_subs).await {
867 error!("Initial subscription failed: {e}");
868 return;
869 }
870 self.subscriptions = initial_subs;
871 }
872
873 self.emit(Topic::Status, &Event::Connected);
875
876 let disconnect_reason: String = 'actor: loop {
878 tokio::select! {
879 msg = ws_read.next() => {
881 match msg {
882 Some(Ok(Message::Text(text))) => {
883 debug!("msg {}: {}", text.len(), &text[0..512.min(text.len())]);
884 match serde_json::from_str::<Value>(&text) {
885 Ok(data) => self.handle_message(data, &text).await,
886 Err(e) => error!("JSON decode error: {e}"),
887 }
888 }
889 Some(Ok(Message::Close(_))) => {
890 warn!("WebSocket closed by server");
891 break 'actor "server closed the connection".into();
892 }
893 Some(Err(e)) => {
894 error!("WebSocket read error: {e}");
895 break 'actor format!("WebSocket read error: {e}");
896 }
897 None => {
898 warn!("WebSocket stream ended");
899 break 'actor "WebSocket stream ended".into();
900 }
901 _ => {} }
903 }
904
905 cmd = self.cmd_rx.recv() => {
907 match cmd {
908 Some(Command::Subscribe(subs)) => {
909 if let Err(e) = self.send_subscribe(&subs).await {
910 error!("Subscription send error: {e}");
911 }
912 self.subscriptions.extend(subs);
913 }
914
915 Some(Command::Tx { request_id, json, respond }) => {
916 self.pending.insert(request_id, respond);
917 if let Err(e) = self.ws_send_text(&json).await {
918 error!("Order send error: {e}");
919 if let Some(tx) = self.pending.remove(&request_id) {
920 let _ = tx.send(Err(e));
921 }
922 }
923 }
924
925 Some(Command::AsyncTx { json}) => {
926 if let Err(e) = self.ws_send_text(&json).await {
927 error!("Order send error: {e}");
928 }
929 }
930
931 Some(Command::SendRaw(json)) => {
932 if let Err(e) = self.ws_send_text(&json).await {
933 error!("Raw send error: {e}");
934 }
935 }
936
937 Some(Command::GetOrders { symbol, respond }) => {
938 let orders = match symbol {
939 Some(s) => self.account_state.open_orders
940 .values()
941 .filter(|o| o.symbol == s)
942 .cloned()
943 .collect(),
944 None => self.account_state.open_orders.values().cloned().collect(),
945 };
946 let _ = respond.send(orders);
947 }
948
949 Some(Command::Shutdown) | None => {
950 info!("Actor shutting down (requested)");
951 break 'actor "shutdown requested".into();
952 }
953 }
954 }
955 }
956 }; self.handle_disconnect(disconnect_reason).await;
959 }
960
961 async fn handle_disconnect(&mut self, reason: String) {
971 self.connected.store(false, Ordering::Release);
973
974 let err_msg = format!("disconnected: {reason}");
976 for (_, tx) in self.pending.drain() {
977 let _ = tx.send(Err(eyre::eyre!("{}", err_msg)));
978 }
979
980 self.emit(Topic::Status, &Event::Disconnected(reason.clone()));
982
983 let _ = self.disconnect_tx.send(reason.clone());
985
986 let _ = self.ws_write.close().await;
988
989 info!("Actor stopped: {reason}");
990 }
991
992 async fn ws_send_text(&mut self, text: &str) -> eyre::Result<()> {
994 let len = text.len();
995 debug!("sending msg len: {}", len);
996 self.ws_write
997 .send(Message::Text(text.into()))
998 .await
999 .map_err(|e| eyre::eyre!("ws write: {e}"))?;
1000 Ok(())
1001 }
1002
1003 async fn handle_message(&mut self, data: Value, json: &str) {
1008 let msg_type = data["type"].as_str().unwrap_or("");
1009
1010 match msg_type {
1011 "subscriptionResponse" => {
1012 info!(
1013 "Subscription confirmed: {:?}",
1014 data["topics"].as_array().map(|a| a.len())
1015 );
1016 }
1017
1018 "ticker" => {
1019 let ticker_v = &data["data"]["ticker"];
1020 if let Ok(ticker) = serde_json::from_value::<Ticker>(ticker_v.clone()) {
1021 self.prices.insert(ticker.symbol.clone(), ticker.mark_price);
1022 self.tickers.insert(ticker.symbol.clone(), ticker.clone());
1023
1024 let _ = self.ticker_tx.send(self.tickers.clone());
1026
1027 self.emit(Topic::Ticker, &Event::Ticker(ticker.clone()));
1028 debug!("Ticker: {} mark={:.2}", ticker.symbol, ticker.mark_price);
1029 } else {
1030 error!("Could not parse ticker event: {:?}", ticker_v);
1031 }
1032 }
1033
1034 "trades" => {
1035 if let Ok(trades) = serde_json::from_value::<Vec<Fill>>(data["data"].clone()) {
1036 self.emit(Topic::Trades, &Event::Trades(trades));
1037 } else {
1038 error!("Could not parse trades event: {:?}", data["data"]);
1039 }
1040 }
1041
1042 "l2Snapshot" => {
1043 if let Ok(l2_snapshot) = serde_json::from_value::<L2Snapshot>(data["data"]["book"].clone()) {
1044 self.emit(Topic::L2Snapshot, &Event::L2Snapshot(l2_snapshot));
1045 } else {
1046 error!("Could not parse l2_snapshot event: msg: {:?}", data["data"]);
1047 }
1048 }
1049
1050 "l2Delta" => {
1051 if let Ok(l2_delta) = serde_json::from_value::<L2Snapshot>(data["data"]["book"].clone()) {
1052 self.emit(Topic::L2Delta, &Event::L2Delta(l2_delta));
1053 } else {
1054 error!("Could not parse l2_delta event: {:?}", data["data"]);
1055 }
1056 }
1057
1058 "candle" => {
1059 if let Ok(candle) = serde_json::from_value::<Candle>(data["data"].clone()) {
1060 self.emit(Topic::Candle, &Event::Candle(candle));
1061 } else {
1062 error!("Could not parse candle event: {:?}", data["data"]);
1063 }
1064 }
1065
1066 "account" => {
1067 self.handle_account(&data["data"]).await;
1068 }
1069
1070 "post" => {
1071 self.handle_post_response(&data, json);
1072 }
1073
1074 other => {
1075 debug!("Unhandled message type: {other}");
1076 }
1077 }
1078 }
1079
1080 async fn handle_account(&mut self, data: &Value) {
1085 let update_type = data["type"].as_str().unwrap_or("");
1086
1087 match update_type {
1088 "accountSnapshot" => {
1089 if let Ok(margin) = serde_json::from_value::<Margin>(data["margin"].clone()) {
1090 self.account_state.margin = margin.clone();
1091 self.emit(Topic::Margin, &Event::Margin(margin))
1092 }
1093
1094 if let Ok(positions) = serde_json::from_value::<Vec<PositionInfo>>(data["positions"].clone()) {
1095 for position in &positions {
1096 self.emit(Topic::Position, &Event::Position(position.clone()))
1097 }
1098 self.account_state.positions = positions
1099 .into_iter()
1100 .map(|p| (p.symbol.clone(), p.clone()))
1101 .collect();
1102 }
1103
1104 if let Ok(orders) = serde_json::from_value::<Vec<OrderState>>(data["openOrders"].clone()) {
1105 for order in &orders {
1106 self.emit(Topic::Order, &Event::Order(order.clone()))
1107 }
1108 self.account_state.open_orders = orders
1109 .into_iter()
1110 .map(|o| (o.order_id.clone(), o))
1111 .collect();
1112 }
1113
1114 if let Ok(leverages) = serde_json::from_value::<Vec<LeverageSetting>>(data["leverageSettings"].clone()) {
1115 self.emit(Topic::Leverage, &Event::Leverage(leverages.clone()));
1116 for l in leverages {
1117 self.account_state.leverage_settings.insert(l.symbol.clone(), l);
1118 }
1119 }
1120
1121 info!(
1122 "Account snapshot: balance={:.2}, positions={}, orders={}",
1123 self.account_state.margin.total_balance,
1124 self.account_state.positions.len(),
1125 self.account_state.open_orders.len(),
1126 );
1127 }
1128
1129 "orderUpdate" => {
1130 if let Ok(order) = serde_json::from_value::<OrderState>(data.clone()) {
1131 let oid = order.order_id.clone();
1132 if order.status.is_terminal() {
1133 self.account_state.open_orders.remove(&oid);
1134 } else {
1135 self.account_state.open_orders.insert(oid, order.clone());
1136 }
1137 self.emit(Topic::Order, &Event::Order(order));
1138 } else {
1139 error!("Could not parse order event: {:?}", data);
1140 }
1141 }
1142
1143 "marginUpdate" => {
1144 if let Ok(margin) = serde_json::from_value::<Margin>(data.clone()) {
1145 self.account_state.margin = margin.clone();
1146 self.publish_account();
1147 self.emit(Topic::Margin, &Event::Margin(margin));
1148 } else {
1149 error!("Could not parse margin event: {:?}", data);
1150 }
1151 }
1152
1153 "positionUpdate" => {
1154 if let Ok(pos) = serde_json::from_value::<PositionInfo>(data.clone()) {
1155 self.account_state.positions.insert(pos.symbol.clone(), pos.clone());
1156 self.emit(Topic::Position, &Event::Position(pos));
1157 self.publish_account();
1158 } else {
1159 error!("Could not parse position event: {:?}", data);
1160 }
1161 }
1162
1163 "fill" => {
1164 if let Ok(fill) = serde_json::from_value::<Fill>(data.clone()) {
1165 let dir = fill.side.dir();
1166 if let Some(order) = self.account_state.open_orders.get_mut(&fill.order_id) {
1167 order.filled_size += fill.size;
1168 order.signed_size -= dir * fill.size;
1169 if order.signed_size * dir <= 0.0 {
1170 self.account_state.open_orders.remove(&fill.order_id);
1171 }
1172 }
1173 self.publish_account();
1174 self.emit(Topic::Fill, &Event::Fill(fill.clone()));
1175 info!(
1176 "Fill: {} {:?} {} @ {} maker={}",
1177 fill.symbol, fill.side, fill.size, fill.price, fill.is_maker,
1178 );
1179 } else {
1180 error!("Could not parse fill event: {:?}", data);
1181 }
1182 }
1183
1184 "leverageUpdate" => {
1185 if let Ok(leverages) = serde_json::from_value::<Vec<LeverageSetting>>(data["leverage"].clone()) {
1186 for lev in &leverages {
1187 self.account_state.leverage_settings.insert(lev.symbol.clone(), lev.clone());
1188 }
1189 self.emit(Topic::Leverage, &Event::Leverage(leverages.clone()));
1190 self.publish_account();
1191 } else {
1192 error!("Could not parse leverage event: {:?}", data);
1193 }
1194 }
1195
1196 _ => {
1197 debug!("Unknown account update: {update_type}");
1198 }
1199 }
1200 }
1201
1202 fn handle_post_response(&mut self, data: &Value, _json: &str) {
1207 let request_id = data["id"].as_u64().unwrap_or(0);
1208 let inner = &data["data"];
1209 let rtype = inner["type"].as_str().unwrap_or("");
1210 let sender = self.pending.remove(&request_id);
1211
1212 match rtype {
1213 "action"=> {
1214 let payload = &inner["payload"];
1215 let status = payload["status"].as_str().unwrap_or("");
1216
1217 if status != "ok" {
1218 error!("Order request {request_id} failed: {status}");
1219 if let Some(tx) = sender {
1220 let _ = tx.send(Err(eyre::eyre!("order request failed: {}", data)));
1221 }
1222 self.emit(Topic::Error, &Event::Error(data.clone()));
1223 } else {
1224 let responses = Response::parse_responses(data);
1225 if let Some(tx) = sender {
1226 let _ = tx.send(Ok(responses));
1227 }
1228 }
1229 }
1230 "ack" => {
1231 let ok = inner["ok"].as_bool().unwrap_or(false);
1232 let response = if ok {
1233 Response {
1234 order_id: None,
1235 status: "OK".to_string(),
1236 message: None,
1237 raw: inner.clone(),
1238 }
1239 } else {
1240 let message = inner["message"].as_str().unwrap_or("");
1241 Response {
1242 order_id: None,
1243 status: "Error".to_string(),
1244 message: Some(message.to_string()),
1245 raw: inner.clone(),
1246 }
1247 };
1248 if let Some(tx) = sender {
1249 let _ = tx.send(Ok(vec![response]));
1250 }
1251 }
1252 _ => panic!("unknown response type: {}", rtype),
1253 }
1254 }
1255
1256 fn publish_account(&self) {
1262 let _ = self.account_tx.send(self.account_state.clone());
1263 }
1264
1265 fn emit(&self, topic: Topic, data: &Event) {
1267 let _ = self.event_tx.try_send((topic, data.clone()));
1268 }
1269
1270 async fn ws_send_json(&mut self, value: &Value) -> eyre::Result<()> {
1272 let text = serde_json::to_string(value)?;
1273 self.ws_write
1274 .send(Message::Text(text.into()))
1275 .await
1276 .map_err(|e| eyre::eyre!("ws write: {e}"))?;
1277 Ok(())
1278 }
1279
1280 async fn send_subscribe(&mut self, subs: &[SubscriptionRequest]) -> eyre::Result<()> {
1282 let request = json!({
1283 "method": "subscribe",
1284 "subscription": subs.iter().map(|s| s.to_json()).collect::<Vec<_>>(),
1285 });
1286 self.ws_send_json(&request).await?;
1287 info!("Subscribed to {} topics", subs.len());
1288 Ok(())
1289 }
1290}