use std::collections::HashMap;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use eyre::bail;
use serde_json::{json, Value};
use crate::msgs::account::{Fill, LeverageSetting, Margin, OrderState, PositionInfo};
use crate::msgs::responses::Response;
use crate::msgs::subscription::SubscriptionRequest;
use futures_util::{SinkExt, StreamExt};
use futures_util::stream::SplitSink;
use serde::Deserialize;
use solana_hash::Hash;
use solana_pubkey::Pubkey;
use tokio::net::TcpStream;
use tokio::sync::{broadcast, mpsc, oneshot, watch};
use tokio::time;
use tokio_tungstenite::{
connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream,
};
use tracing::{debug, error, info, warn};
use crate::api::parts::command::Command;
use crate::api::parts::config::WSConfig;
use crate::api::parts::{make_nonce, Event, Topic};
use crate::common::side::Side;
use crate::common::tif::TimeInForce;
use crate::msgs::{CancelAll, CancelOrder, LimitOrder, MarketOrder, Price};
use crate::msgs::md::{Candle, L2Snapshot, Ticker};
use crate::transaction::{Action, ActionMeta, Transaction, TransactionSigner};
/// Client-side view of the account as maintained from `account` channel messages.
///
/// The actor task updates its own copy as snapshot/update messages arrive and
/// publishes clones of it through a `watch` channel.
#[derive(Debug, Clone, Default, Deserialize)]
#[allow(unused)]
pub struct AccountState {
/// Most recently received margin information.
pub margin: Margin,
/// Positions keyed by symbol.
pub positions: HashMap<String, PositionInfo>,
/// Open (non-terminal) orders keyed by order id.
pub open_orders: HashMap<String, OrderState>,
/// Leverage settings keyed by symbol.
pub leverage_settings: HashMap<String, LeverageSetting>,
}
/// Boxed callback invoked with a reference to each event dispatched for the
/// topic it was registered under.
#[allow(unused)]
pub type EventHandler = Box<dyn Fn(&Event) + Send + Sync>;
/// Cloneable handle to a WebSocket connection that is driven by a background
/// actor task.
///
/// The handle talks to the actor through a command channel and reads the
/// latest ticker/account state from `watch` channels.
#[allow(unused)]
#[derive(Clone)]
pub struct BulkWsClient {
/// Sends commands (subscribe, transactions, shutdown, ...) to the actor task.
cmd_tx: mpsc::Sender<Command>,
/// Event callbacks registered through `on`, grouped by topic.
handlers: Arc<Mutex<HashMap<Topic, Vec<EventHandler>>>>,
/// Latest tickers keyed by symbol, as published by the actor.
ticker_rx: watch::Receiver<HashMap<String, Ticker>>,
/// Latest account state, as published by the actor.
account_rx: watch::Receiver<AccountState>,
/// Signer used for trading calls; those calls fail when it is `None`.
signer: Option<TransactionSigner>,
/// How long `place_orders` waits for the matching response.
default_timeout: Duration,
/// Source of request ids; starts at 1 and is incremented per request.
next_request_id: std::sync::Arc<std::sync::atomic::AtomicU64>,
/// Join handle of the actor task; taken by the first `shutdown`/`closed` call.
actor_handle: std::sync::Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
/// `true` from `connect` until the actor handles a disconnect.
connected: Arc<AtomicBool>,
/// Broadcasts the disconnect reason to `subscribe_disconnect` receivers.
disconnect_tx: broadcast::Sender<String>,
}
#[allow(unused)]
impl BulkWsClient {
/// Connects to `config.url` and starts the background tasks backing the client.
///
/// Two tasks are spawned: a dispatcher that forwards events to the handlers
/// registered with `on`, and the actor that owns the socket. The actor first
/// sends the subscriptions derived from the config: the `account` channel
/// when `track_account` is set and a signer is present, and one `ticker`
/// subscription per configured symbol when `track_ticker` is set.
///
/// # Errors
/// Returns an error when the WebSocket connection cannot be established.
pub async fn connect(config: WSConfig) -> eyre::Result<Self> {
    info!("Connecting to {}", config.url);
    let (stream, _) = connect_async(&config.url).await?;
    let (ws_write, ws_read) = stream.split();
    info!("Connected to Bulk Exchange WebSocket");

    // Subscriptions the actor sends before entering its main loop.
    let mut initial_subs = Vec::new();
    if let (true, Some(signer)) = (config.track_account, config.signer.as_ref()) {
        let pk_str = signer.public_key_b58();
        initial_subs.push(SubscriptionRequest::new(
            "account",
            json!({ "user": pk_str }),
        ));
    }
    if config.track_ticker {
        initial_subs.extend(
            config
                .symbols
                .iter()
                .map(|sym| SubscriptionRequest::new("ticker", json!({ "symbol": sym }))),
        );
    }

    // Channels connecting the client handle, the actor and the dispatcher.
    let (cmd_tx, cmd_rx) = mpsc::channel::<Command>(512);
    let (event_tx, mut event_rx) = mpsc::channel::<(Topic, Event)>(32768);
    let (ticker_tx, ticker_rx) = watch::channel(HashMap::new());
    let (account_tx, account_rx) = watch::channel(AccountState::default());
    let (disconnect_tx, _) = broadcast::channel::<String>(4);

    // Dispatcher: runs every handler registered for the event's topic. The
    // handlers mutex is held while the callbacks execute.
    let handlers: Arc<Mutex<HashMap<Topic, Vec<EventHandler>>>> = Arc::default();
    let dispatch_handlers = Arc::clone(&handlers);
    tokio::spawn(async move {
        while let Some((topic, event)) = event_rx.recv().await {
            let registry = dispatch_handlers.lock().unwrap();
            registry
                .get(&topic)
                .into_iter()
                .flatten()
                .for_each(|handler| handler(&event));
        }
    });

    let connected = Arc::new(AtomicBool::new(true));
    let actor = Actor {
        ws_write,
        event_tx,
        cmd_rx,
        ticker_tx,
        account_tx,
        tickers: HashMap::new(),
        prices: HashMap::new(),
        account_state: AccountState::default(),
        pending: HashMap::new(),
        subscriptions: Vec::new(),
        connected: Arc::clone(&connected),
        disconnect_tx: disconnect_tx.clone(),
    };
    let actor_task = tokio::spawn(actor.run(ws_read, initial_subs));

    Ok(Self {
        cmd_tx,
        handlers,
        ticker_rx,
        account_rx,
        signer: config.signer,
        default_timeout: config.default_timeout,
        next_request_id: Arc::new(std::sync::atomic::AtomicU64::new(1)),
        actor_handle: Arc::new(tokio::sync::Mutex::new(Some(actor_task))),
        connected,
        disconnect_tx,
    })
}
/// Asks the actor to shut down and waits for its task to finish.
///
/// A failure to deliver the command (actor already gone) is ignored. The
/// join handle is taken, so only the first caller of `shutdown`/`closed`
/// actually waits on the task.
pub async fn shutdown(&self) {
    let _ = self.cmd_tx.send(Command::Shutdown).await;
    // The guard stays alive while the task is awaited, as before.
    let mut slot = self.actor_handle.lock().await;
    if let Some(task) = slot.take() {
        let _ = task.await;
    }
}
/// Waits until the actor task has finished, without requesting a shutdown.
///
/// The join handle is taken, so only the first caller of `shutdown`/`closed`
/// actually waits on the task; later callers return once they get the lock.
pub async fn closed(&self) {
    // The guard stays alive while the task is awaited, as before.
    let mut slot = self.actor_handle.lock().await;
    if let Some(task) = slot.take() {
        let _ = task.await;
    }
}
pub fn is_connected(&self) -> bool {
self.connected.load(Ordering::Relaxed)
}
pub fn get_ticker(&self, symbol: &str) -> Option<Ticker> {
self.ticker_rx.borrow().get(symbol).cloned()
}
/// Returns the mark price from the latest published ticker for `symbol`, if any.
pub fn get_price(&self, symbol: &str) -> Option<f64> {
    let tickers = self.ticker_rx.borrow();
    let ticker = tickers.get(symbol)?;
    Some(ticker.mark_price)
}
/// Returns a copy of all latest published tickers, keyed by symbol.
pub fn get_tickers(&self) -> HashMap<String, Ticker> {
    let tickers = self.ticker_rx.borrow();
    HashMap::clone(&tickers)
}
/// Returns a copy of the margin from the latest published account state.
pub fn get_margin(&self) -> Margin {
    let state = self.account_rx.borrow();
    state.margin.clone()
}
pub fn get_position(&self, symbol: &str) -> Option<PositionInfo> {
self.account_rx.borrow().positions.get(symbol).cloned()
}
/// Returns a copy of all positions from the latest published account state,
/// keyed by symbol.
pub fn get_positions(&self) -> HashMap<String, PositionInfo> {
    let state = self.account_rx.borrow();
    state.positions.clone()
}
/// Returns the leverage configured for `symbol` in the latest published
/// account state, if a setting is present.
pub fn get_leverage(&self, symbol: &str) -> Option<f64> {
    let state = self.account_rx.borrow();
    let setting = state.leverage_settings.get(symbol)?;
    Some(setting.leverage)
}
/// Waits for the next ticker publication and returns a copy of all tickers.
///
/// # Errors
/// Returns an error when the watch channel reports that its sender is gone.
pub async fn wait_tickers_changed(&mut self) -> eyre::Result<HashMap<String, Ticker>> {
    let rx = &mut self.ticker_rx;
    rx.changed().await?;
    let latest = rx.borrow();
    Ok(HashMap::clone(&latest))
}
pub async fn wait_account_changed(&mut self) -> eyre::Result<AccountState> {
self.account_rx.changed().await?;
Ok(self.account_rx.borrow().clone())
}
pub async fn open_orders(&self, symbol: Option<&str>) -> eyre::Result<Vec<OrderState>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(Command::GetOrders {
symbol: symbol.map(Into::into),
respond: tx,
})
.await
.map_err(|_| eyre::eyre!("actor gone"))?;
Ok(rx.await?)
}
pub async fn place_orders(
&self,
actions: Vec<Action>,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> eyre::Result<Vec<Response>> {
let signer = self
.signer
.as_ref()
.ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
let account = if let Some(account) = account {
account
} else {
signer.public_key()
};
let nonce = nonce.unwrap_or_else(make_nonce);
let pk = signer.public_key();
let mut tx = Transaction {
actions,
nonce,
account,
signer: signer.public_key(),
signature: Default::default(),
};
tx.sign(signer)?;
let request_id = self
.next_request_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let body = serde_json::to_string(&tx)?;
let json = format!(
r#"{{"method":"post","request":{{"type":"action","payload":{}}},"id":{}}}"#,
body, request_id
);
let (resp_tx, resp_rx) = oneshot::channel();
self.cmd_tx
.send(Command::Tx {
request_id,
json,
respond: resp_tx,
})
.await
.map_err(|_| eyre::eyre!("client is disconnected — call connect() to reconnect"))?;
match time::timeout(self.default_timeout, resp_rx).await {
Ok(Ok(result)) => result,
Ok(Err(_)) => bail!("response channel dropped"),
Err(_) => bail!("order request {request_id} timed out"),
}
}
pub async fn update_oracle(
&self,
actions: Vec<Price>,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> eyre::Result<()> {
let signer = self
.signer
.as_ref()
.ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
let account = if let Some(account) = account {
account
} else {
signer.public_key()
};
let nonce = nonce.unwrap_or_else(make_nonce);
let mut tx = Transaction {
actions: actions.iter().map(|a| a.clone().into()).collect(),
nonce,
account,
signer: signer.public_key(),
signature: Default::default(),
};
tx.sign(signer)?;
let request_id = self
.next_request_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let body = serde_json::to_string(&tx)?;
let json = format!(
r#"{{"method":"post","request":{{"type":"action","payload":{}}},"id":{}}}"#,
body, request_id
);
self.cmd_tx
.send(Command::AsyncTx {
json,
})
.await
.map_err(|_| eyre::eyre!("client is disconnected — call connect() to reconnect"))
}
/// Places a single limit order and returns the first response.
///
/// * `side` - `Side::Buy` marks the order as a buy; any other side as a sell.
/// * `account` - account to trade for; defaults to the signer's public key.
/// * `nonce` - nonce to use; defaults to `make_nonce()`.
///
/// The resolved `account` and `nonce` are recorded in the order metadata and
/// are also forwarded to `place_orders`, so the signed transaction uses the
/// same values instead of falling back to its own defaults.
///
/// # Errors
/// Fails when no signer is configured, when `place_orders` fails, or when the
/// response list is empty.
pub async fn place_limit_order(
    &self,
    symbol: &str,
    side: Side,
    price: f64,
    size: f64,
    tif: TimeInForce,
    reduce_only: bool,
    account: Option<Pubkey>,
    nonce: Option<u64>,
) -> eyre::Result<Response> {
    let signer = self
        .signer
        .as_ref()
        .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
    let account = account.unwrap_or_else(|| signer.public_key());
    let nonce = nonce.unwrap_or_else(make_nonce);
    let order = LimitOrder {
        symbol: Arc::from(symbol),
        is_buy: side == Side::Buy,
        price,
        size,
        tif,
        reduce_only,
        iso: false,
        meta: ActionMeta {
            account,
            nonce,
            seqno: 0,
            hash: None,
        },
    };
    let resps = self
        .place_orders(vec![order.into()], Some(account), Some(nonce))
        .await?;
    resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
}
/// Places a single market order and returns the first response.
///
/// * `side` - `Side::Buy` marks the order as a buy; any other side as a sell.
/// * `account` - account to trade for; defaults to the signer's public key.
/// * `nonce` - nonce to use; defaults to `make_nonce()`.
///
/// The resolved `account` and `nonce` are recorded in the order metadata and
/// are also forwarded to `place_orders`, so the signed transaction uses the
/// same values instead of falling back to its own defaults.
///
/// # Errors
/// Fails when no signer is configured, when `place_orders` fails, or when the
/// response list is empty.
pub async fn place_market_order(
    &self,
    symbol: &str,
    side: Side,
    size: f64,
    reduce_only: bool,
    account: Option<Pubkey>,
    nonce: Option<u64>,
) -> eyre::Result<Response> {
    let signer = self
        .signer
        .as_ref()
        .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
    let account = account.unwrap_or_else(|| signer.public_key());
    let nonce = nonce.unwrap_or_else(make_nonce);
    let order = MarketOrder {
        symbol: Arc::from(symbol),
        is_buy: side == Side::Buy,
        size,
        reduce_only,
        iso: false,
        meta: ActionMeta {
            account,
            nonce,
            seqno: 0,
            hash: None,
        },
    };
    let resps = self
        .place_orders(vec![order.into()], Some(account), Some(nonce))
        .await?;
    resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
}
/// Cancels one order on `symbol` and returns the first response.
///
/// * `order_id` - textual order id; it must parse as a `Hash`.
/// * `account` - account to act for; defaults to the signer's public key.
/// * `nonce` - nonce to use; defaults to `make_nonce()`.
///
/// The resolved `account` and `nonce` are recorded in the action metadata and
/// are also forwarded to `place_orders`, so the signed transaction uses the
/// same values instead of falling back to its own defaults.
///
/// # Errors
/// Fails when no signer is configured, when `order_id` cannot be parsed, when
/// `place_orders` fails, or when the response list is empty.
pub async fn cancel_order(
    &self,
    symbol: &str,
    order_id: &str,
    account: Option<Pubkey>,
    nonce: Option<u64>,
) -> eyre::Result<Response> {
    let signer = self
        .signer
        .as_ref()
        .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
    let account = account.unwrap_or_else(|| signer.public_key());
    let nonce = nonce.unwrap_or_else(make_nonce);
    let cancel = CancelOrder {
        symbol: symbol.to_string(),
        oid: Hash::from_str(order_id)?,
        meta: ActionMeta {
            account,
            nonce,
            seqno: 0,
            hash: None,
        },
    };
    let resps = self
        .place_orders(vec![cancel.into()], Some(account), Some(nonce))
        .await?;
    resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
}
/// Sends a cancel-all action for `symbols` and returns the first response.
///
/// * `account` - account to act for; defaults to the signer's public key.
/// * `nonce` - nonce to use; defaults to `make_nonce()`.
///
/// The resolved `account` and `nonce` are recorded in the action metadata and
/// are also forwarded to `place_orders`, so the signed transaction uses the
/// same values instead of falling back to its own defaults.
///
/// # Errors
/// Fails when no signer is configured, when `place_orders` fails, or when the
/// response list is empty.
pub async fn cancel_all(
    &self,
    symbols: Vec<String>,
    account: Option<Pubkey>,
    nonce: Option<u64>,
) -> eyre::Result<Response> {
    let signer = self
        .signer
        .as_ref()
        .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
    let account = account.unwrap_or_else(|| signer.public_key());
    let nonce = nonce.unwrap_or_else(make_nonce);
    let cancel = CancelAll {
        symbols,
        meta: ActionMeta {
            account,
            nonce,
            seqno: 0,
            hash: None,
        },
    };
    let resps = self
        .place_orders(vec![cancel.into()], Some(account), Some(nonce))
        .await?;
    resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
}
/// Returns a new receiver for the disconnect reason broadcast by the actor.
pub fn subscribe_disconnect(&self) -> broadcast::Receiver<String> {
    broadcast::Sender::subscribe(&self.disconnect_tx)
}
/// Requests a `ticker` subscription for `symbol`.
///
/// # Errors
/// Fails when the command cannot be delivered to the actor.
pub async fn subscribe_ticker(&self, symbol: &str) -> eyre::Result<()> {
    let params = json!({ "symbol": symbol });
    let request = SubscriptionRequest::new("ticker", params);
    self.subscribe(vec![request]).await
}
/// Requests a `trades` subscription for every symbol in `symbols`, all sent
/// in a single subscribe command.
///
/// # Errors
/// Fails when the command cannot be delivered to the actor.
pub async fn subscribe_trades(&self, symbols: &[&str]) -> eyre::Result<()> {
    let mut requests = Vec::with_capacity(symbols.len());
    for sym in symbols {
        requests.push(SubscriptionRequest::new("trades", json!({ "symbol": sym })));
    }
    self.subscribe(requests).await
}
/// Requests an `l2Snapshot` subscription for `symbol`.
///
/// When `nlevels` is given it is sent as the `nlevels` parameter; otherwise
/// the parameter is omitted.
///
/// # Errors
/// Fails when the command cannot be delivered to the actor.
pub async fn subscribe_l2_snapshot(
    &self,
    symbol: &str,
    nlevels: Option<u32>,
) -> eyre::Result<()> {
    let params = match nlevels {
        Some(n) => json!({ "symbol": symbol, "nlevels": n }),
        None => json!({ "symbol": symbol }),
    };
    let request = SubscriptionRequest::new("l2Snapshot", params);
    self.subscribe(vec![request]).await
}
/// Requests an `l2Delta` subscription for `symbol`.
///
/// # Errors
/// Fails when the command cannot be delivered to the actor.
pub async fn subscribe_l2_delta(&self, symbol: &str) -> eyre::Result<()> {
    let params = json!({ "symbol": symbol });
    let request = SubscriptionRequest::new("l2Delta", params);
    self.subscribe(vec![request]).await
}
/// Requests a `candle` subscription for `symbol` at the given `interval`.
///
/// # Errors
/// Fails when the command cannot be delivered to the actor.
pub async fn subscribe_candles(&self, symbol: &str, interval: &str) -> eyre::Result<()> {
    let params = json!({ "symbol": symbol, "interval": interval });
    let request = SubscriptionRequest::new("candle", params);
    self.subscribe(vec![request]).await
}
/// Hands a batch of subscription requests to the actor.
///
/// Success only means the command was queued; the actor logs send failures
/// on its side.
///
/// # Errors
/// Fails with "actor gone" when the command channel is closed.
async fn subscribe(&self, subs: Vec<SubscriptionRequest>) -> eyre::Result<()> {
    let command = Command::Subscribe(subs);
    self.cmd_tx
        .send(command)
        .await
        .map_err(|_| eyre::eyre!("actor gone"))
}
pub async fn on(&self, topic: Topic, handler: impl Fn(&Event) + Send + Sync + 'static) {
self.handlers.lock().unwrap().entry(topic).or_default().push(Box::new(handler));
}
}
/// Write half of the split WebSocket stream.
type WsWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
/// Read half of the split WebSocket stream.
type WsReader = futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
/// Background task state: owns the socket write half, processes commands from
/// client handles and turns incoming messages into state updates and events.
struct Actor {
/// Write half of the WebSocket.
ws_write: WsWriter,
/// Queue feeding the handler dispatcher task; sends are non-blocking.
event_tx: mpsc::Sender<(Topic,Event)>,
/// Commands coming from `BulkWsClient` handles.
cmd_rx: mpsc::Receiver<Command>,
/// Publishes the full ticker map after every ticker message.
ticker_tx: watch::Sender<HashMap<String, Ticker>>,
/// Publishes the account state after account updates.
account_tx: watch::Sender<AccountState>,
/// Latest ticker per symbol.
tickers: HashMap<String, Ticker>,
/// Latest mark price per symbol, taken from ticker messages.
prices: HashMap<String, f64>,
/// Account state as built up from account messages.
account_state: AccountState,
/// Reply channels for in-flight requests, keyed by request id.
pending: HashMap<u64, oneshot::Sender<eyre::Result<Vec<Response>>>>,
/// Subscription requests recorded so far.
subscriptions: Vec<SubscriptionRequest>,
/// Shared flag cleared when a disconnect is handled.
connected: Arc<AtomicBool>,
/// Broadcasts the disconnect reason.
disconnect_tx: broadcast::Sender<String>,
}
impl Actor {
/// Main loop of the actor.
///
/// Sends `initial_subs` (if any), emits `Event::Connected`, then processes
/// incoming WebSocket messages and client commands until the socket closes,
/// a read error occurs, the command channel closes or a shutdown is
/// requested. Every exit path, including a failed initial subscription, goes
/// through `handle_disconnect`, so the `connected` flag is cleared and
/// disconnect listeners are notified.
async fn run(
    mut self,
    mut ws_read: WsReader,
    initial_subs: Vec<SubscriptionRequest>,
) {
    if !initial_subs.is_empty() {
        if let Err(e) = self.send_subscribe(&initial_subs).await {
            error!("Initial subscription failed: {e}");
            // Use the regular teardown instead of a bare return: otherwise
            // `is_connected()` would keep reporting `true` for a dead actor.
            self.handle_disconnect(format!("initial subscription failed: {e}"))
                .await;
            return;
        }
        self.subscriptions = initial_subs;
    }
    self.emit(Topic::Status, &Event::Connected);

    let disconnect_reason: String = 'actor: loop {
        tokio::select! {
            msg = ws_read.next() => {
                match msg {
                    Some(Ok(Message::Text(text))) => {
                        // Truncate the log preview on a char boundary; slicing
                        // at a fixed byte offset panics inside a multi-byte
                        // UTF-8 sequence.
                        let mut preview_end = text.len().min(512);
                        while !text.is_char_boundary(preview_end) {
                            preview_end -= 1;
                        }
                        debug!("msg {}: {}", text.len(), &text[0..preview_end]);
                        match serde_json::from_str::<Value>(&text) {
                            Ok(data) => self.handle_message(data, &text).await,
                            Err(e) => error!("JSON decode error: {e}"),
                        }
                    }
                    Some(Ok(Message::Close(_))) => {
                        warn!("WebSocket closed by server");
                        break 'actor "server closed the connection".into();
                    }
                    Some(Err(e)) => {
                        error!("WebSocket read error: {e}");
                        break 'actor format!("WebSocket read error: {e}");
                    }
                    None => {
                        warn!("WebSocket stream ended");
                        break 'actor "WebSocket stream ended".into();
                    }
                    // Other message kinds are ignored.
                    _ => {}
                }
            }
            cmd = self.cmd_rx.recv() => {
                match cmd {
                    Some(Command::Subscribe(subs)) => {
                        if let Err(e) = self.send_subscribe(&subs).await {
                            error!("Subscription send error: {e}");
                        }
                        // Recorded even when the send failed.
                        self.subscriptions.extend(subs);
                    }
                    Some(Command::Tx { request_id, json, respond }) => {
                        // Register the reply channel before sending so a fast
                        // response cannot miss it.
                        self.pending.insert(request_id, respond);
                        if let Err(e) = self.ws_send_text(&json).await {
                            error!("Order send error: {e}");
                            if let Some(tx) = self.pending.remove(&request_id) {
                                let _ = tx.send(Err(e));
                            }
                        }
                    }
                    Some(Command::AsyncTx { json }) => {
                        if let Err(e) = self.ws_send_text(&json).await {
                            error!("Order send error: {e}");
                        }
                    }
                    Some(Command::SendRaw(json)) => {
                        if let Err(e) = self.ws_send_text(&json).await {
                            error!("Raw send error: {e}");
                        }
                    }
                    Some(Command::GetOrders { symbol, respond }) => {
                        let orders = match symbol {
                            Some(s) => self.account_state.open_orders
                                .values()
                                .filter(|o| o.symbol == s)
                                .cloned()
                                .collect(),
                            None => self.account_state.open_orders.values().cloned().collect(),
                        };
                        let _ = respond.send(orders);
                    }
                    // A closed command channel is treated like a shutdown request.
                    Some(Command::Shutdown) | None => {
                        info!("Actor shutting down (requested)");
                        break 'actor "shutdown requested".into();
                    }
                }
            }
        }
    };
    self.handle_disconnect(disconnect_reason).await;
}
/// Tears the connection down after the main loop has ended.
///
/// Clears the `connected` flag, fails every in-flight request with a
/// "disconnected: <reason>" error, emits `Event::Disconnected`, broadcasts
/// the reason and closes the write half of the socket. Failures of the
/// notifications and of the close are ignored.
async fn handle_disconnect(&mut self, reason: String) {
    self.connected.store(false, Ordering::Release);

    let failure = format!("disconnected: {reason}");
    self.pending.drain().for_each(|(_, waiter)| {
        let _ = waiter.send(Err(eyre::eyre!("{}", failure)));
    });

    let status = Event::Disconnected(reason.clone());
    self.emit(Topic::Status, &status);
    let _ = self.disconnect_tx.send(reason.clone());
    let _ = self.ws_write.close().await;
    info!("Actor stopped: {reason}");
}
/// Writes `text` to the socket as a text message.
///
/// # Errors
/// Returns a "ws write: ..." error when the sink rejects the message.
async fn ws_send_text(&mut self, text: &str) -> eyre::Result<()> {
    debug!("sending msg len: {}", text.len());
    let frame = Message::Text(text.into());
    match self.ws_write.send(frame).await {
        Ok(()) => Ok(()),
        Err(e) => Err(eyre::eyre!("ws write: {e}")),
    }
}
async fn handle_message(&mut self, data: Value, json: &str) {
let msg_type = data["type"].as_str().unwrap_or("");
match msg_type {
"subscriptionResponse" => {
info!(
"Subscription confirmed: {:?}",
data["topics"].as_array().map(|a| a.len())
);
}
"ticker" => {
let ticker_v = &data["data"]["ticker"];
if let Ok(ticker) = serde_json::from_value::<Ticker>(ticker_v.clone()) {
self.prices.insert(ticker.symbol.clone(), ticker.mark_price);
self.tickers.insert(ticker.symbol.clone(), ticker.clone());
let _ = self.ticker_tx.send(self.tickers.clone());
self.emit(Topic::Ticker, &Event::Ticker(ticker.clone()));
debug!("Ticker: {} mark={:.2}", ticker.symbol, ticker.mark_price);
} else {
error!("Could not parse ticker event: {:?}", ticker_v);
}
}
"trades" => {
if let Ok(trades) = serde_json::from_value::<Vec<Fill>>(data["data"].clone()) {
self.emit(Topic::Trades, &Event::Trades(trades));
} else {
error!("Could not parse trades event: {:?}", data["data"]);
}
}
"l2Snapshot" => {
if let Ok(l2_snapshot) = serde_json::from_value::<L2Snapshot>(data["data"]["book"].clone()) {
self.emit(Topic::L2Snapshot, &Event::L2Snapshot(l2_snapshot));
} else {
error!("Could not parse l2_snapshot event: msg: {:?}", data["data"]);
}
}
"l2Delta" => {
if let Ok(l2_delta) = serde_json::from_value::<L2Snapshot>(data["data"]["book"].clone()) {
self.emit(Topic::L2Delta, &Event::L2Delta(l2_delta));
} else {
error!("Could not parse l2_delta event: {:?}", data["data"]);
}
}
"candle" => {
if let Ok(candle) = serde_json::from_value::<Candle>(data["data"].clone()) {
self.emit(Topic::Candle, &Event::Candle(candle));
} else {
error!("Could not parse candle event: {:?}", data["data"]);
}
}
"account" => {
self.handle_account(&data["data"]).await;
}
"post" => {
self.handle_post_response(&data, json);
}
other => {
debug!("Unhandled message type: {other}");
}
}
}
/// Applies one `account` channel payload to the local account state.
///
/// `data["type"]` selects the update kind. Every branch that changes
/// `account_state` also publishes the new state on the `watch` channel and
/// emits the corresponding event. Payloads that fail to parse are logged,
/// except for the individual sections of a snapshot, which are skipped
/// silently.
async fn handle_account(&mut self, data: &Value) {
    let update_type = data["type"].as_str().unwrap_or("");
    match update_type {
        "accountSnapshot" => {
            if let Ok(margin) = serde_json::from_value::<Margin>(data["margin"].clone()) {
                self.account_state.margin = margin.clone();
                self.emit(Topic::Margin, &Event::Margin(margin));
            }
            if let Ok(positions) =
                serde_json::from_value::<Vec<PositionInfo>>(data["positions"].clone())
            {
                for position in &positions {
                    self.emit(Topic::Position, &Event::Position(position.clone()));
                }
                // The snapshot replaces the whole position map.
                self.account_state.positions = positions
                    .into_iter()
                    .map(|p| (p.symbol.clone(), p))
                    .collect();
            }
            if let Ok(orders) =
                serde_json::from_value::<Vec<OrderState>>(data["openOrders"].clone())
            {
                for order in &orders {
                    self.emit(Topic::Order, &Event::Order(order.clone()));
                }
                // The snapshot replaces the whole open-order map.
                self.account_state.open_orders = orders
                    .into_iter()
                    .map(|o| (o.order_id.clone(), o))
                    .collect();
            }
            if let Ok(leverages) =
                serde_json::from_value::<Vec<LeverageSetting>>(data["leverageSettings"].clone())
            {
                self.emit(Topic::Leverage, &Event::Leverage(leverages.clone()));
                // Leverage settings are merged into the existing map.
                for l in leverages {
                    self.account_state.leverage_settings.insert(l.symbol.clone(), l);
                }
            }
            // Without this, watch readers would keep seeing the default state
            // until the first incremental update arrives.
            self.publish_account();
            info!(
                "Account snapshot: balance={:.2}, positions={}, orders={}",
                self.account_state.margin.total_balance,
                self.account_state.positions.len(),
                self.account_state.open_orders.len(),
            );
        }
        "orderUpdate" => {
            if let Ok(order) = serde_json::from_value::<OrderState>(data.clone()) {
                let oid = order.order_id.clone();
                if order.status.is_terminal() {
                    self.account_state.open_orders.remove(&oid);
                } else {
                    self.account_state.open_orders.insert(oid, order.clone());
                }
                // Keep the published open orders in step with the local map,
                // as the `fill` branch does.
                self.publish_account();
                self.emit(Topic::Order, &Event::Order(order));
            } else {
                error!("Could not parse order event: {:?}", data);
            }
        }
        "marginUpdate" => {
            if let Ok(margin) = serde_json::from_value::<Margin>(data.clone()) {
                self.account_state.margin = margin.clone();
                self.publish_account();
                self.emit(Topic::Margin, &Event::Margin(margin));
            } else {
                error!("Could not parse margin event: {:?}", data);
            }
        }
        "positionUpdate" => {
            if let Ok(pos) = serde_json::from_value::<PositionInfo>(data.clone()) {
                self.account_state.positions.insert(pos.symbol.clone(), pos.clone());
                self.emit(Topic::Position, &Event::Position(pos));
                self.publish_account();
            } else {
                error!("Could not parse position event: {:?}", data);
            }
        }
        "fill" => {
            if let Ok(fill) = serde_json::from_value::<Fill>(data.clone()) {
                let dir = fill.side.dir();
                if let Some(order) = self.account_state.open_orders.get_mut(&fill.order_id) {
                    order.filled_size += fill.size;
                    order.signed_size -= dir * fill.size;
                    // Nothing left in the order's direction: drop it from the
                    // open orders.
                    if order.signed_size * dir <= 0.0 {
                        self.account_state.open_orders.remove(&fill.order_id);
                    }
                }
                self.publish_account();
                self.emit(Topic::Fill, &Event::Fill(fill.clone()));
                info!(
                    "Fill: {} {:?} {} @ {} maker={}",
                    fill.symbol, fill.side, fill.size, fill.price, fill.is_maker,
                );
            } else {
                error!("Could not parse fill event: {:?}", data);
            }
        }
        "leverageUpdate" => {
            if let Ok(leverages) =
                serde_json::from_value::<Vec<LeverageSetting>>(data["leverage"].clone())
            {
                for lev in &leverages {
                    self.account_state
                        .leverage_settings
                        .insert(lev.symbol.clone(), lev.clone());
                }
                self.emit(Topic::Leverage, &Event::Leverage(leverages.clone()));
                self.publish_account();
            } else {
                error!("Could not parse leverage event: {:?}", data);
            }
        }
        _ => {
            debug!("Unknown account update: {update_type}");
        }
    }
}
fn handle_post_response(&mut self, data: &Value, _json: &str) {
let request_id = data["id"].as_u64().unwrap_or(0);
let inner = &data["data"];
let rtype = inner["type"].as_str().unwrap_or("");
let sender = self.pending.remove(&request_id);
match rtype {
"action"=> {
let payload = &inner["payload"];
let status = payload["status"].as_str().unwrap_or("");
if status != "ok" {
error!("Order request {request_id} failed: {status}");
if let Some(tx) = sender {
let _ = tx.send(Err(eyre::eyre!("order request failed: {}", data)));
}
self.emit(Topic::Error, &Event::Error(data.clone()));
} else {
let responses = Response::parse_responses(data);
if let Some(tx) = sender {
let _ = tx.send(Ok(responses));
}
}
}
"ack" => {
let ok = inner["ok"].as_bool().unwrap_or(false);
let response = if ok {
Response {
order_id: None,
status: "OK".to_string(),
message: None,
raw: inner.clone(),
}
} else {
let message = inner["message"].as_str().unwrap_or("");
Response {
order_id: None,
status: "Error".to_string(),
message: Some(message.to_string()),
raw: inner.clone(),
}
};
if let Some(tx) = sender {
let _ = tx.send(Ok(vec![response]));
}
}
_ => panic!("unknown response type: {}", rtype),
}
}
/// Publishes a copy of the current account state on the `watch` channel.
/// A send failure is ignored.
fn publish_account(&self) {
    let snapshot = self.account_state.clone();
    let _ = self.account_tx.send(snapshot);
}
/// Queues a copy of `data` for the handler dispatcher under `topic`.
///
/// The send is non-blocking and best-effort: when the queue is full or
/// closed, the event is dropped without notice.
fn emit(&self, topic: Topic, data: &Event) {
    let message = (topic, data.clone());
    let _ = self.event_tx.try_send(message);
}
/// Serializes `value` and writes it to the socket as a text message.
///
/// The write goes through `ws_send_text`, so all outgoing messages share one
/// code path and its debug logging.
///
/// # Errors
/// Fails when serialization fails or when the socket write fails
/// ("ws write: ...").
async fn ws_send_json(&mut self, value: &Value) -> eyre::Result<()> {
    let text = serde_json::to_string(value)?;
    self.ws_send_text(&text).await
}
/// Sends one `subscribe` request covering all of `subs`.
///
/// # Errors
/// Propagates serialization and socket write errors from `ws_send_json`.
async fn send_subscribe(&mut self, subs: &[SubscriptionRequest]) -> eyre::Result<()> {
    let topics: Vec<_> = subs.iter().map(|sub| sub.to_json()).collect();
    let request = json!({
        "method": "subscribe",
        "subscription": topics,
    });
    self.ws_send_json(&request).await?;
    info!("Subscribed to {} topics", subs.len());
    Ok(())
}
}