Skip to main content

bulk_client/api/
bulk_ws.rs

1//! Bulk Labs WebSocket Trading Client — Actor + Watch architecture.
2//!
3//! All mutable state lives inside a single [`Actor`] task. The public
4//! [`BulkWsClient`] handle is a cheap, cloneable struct that communicates
5//! with the actor via:
6//!
7//! - **`tokio::sync::watch`** for hot-path reads (tickers, prices, margin)
8//!   — zero-cost `.borrow()`, no async round-trip.
9//! - **`mpsc` command channel** for writes (subscribe, place orders, etc.).
10//! - **`oneshot`** for request/response flows (order placement).
11//!
12//! ```text
13//!  ┌──────────────┐         mpsc::channel           ┌───────────────┐
14//!  │ BulkWsClient │ ───── Command ────────────────▶ │     Actor     │
15//!  │   (handle)   │ ◀──── watch::Receiver ────────  │  (owns state) │
16//!  └──────────────┘                                 └───────┬───────┘
17//!        │                                                  │
18//!        │ oneshot for order responses                      │ tokio::select!
19//!        └─────────────────────────────────────────────────▶│◀── ws_read
20//! ```
21//!
22//! # Example
23//!
24//! ```rust,no_run
25//! use bulk_client::*;
26//! use bulk_client::common::side::Side;
27//! use bulk_client::common::tif::TimeInForce;
28//! use bulk_client::transaction::TransactionSigner;
29//! use bulk_client::parts::WSConfig;
30//!
31//! #[tokio::main]
32//! async fn main() -> eyre::Result<()> {
33//!     let signer = TransactionSigner::from_private_key("your_base58_key")?;
34//!
35//!     let client = BulkWsClient::connect(WSConfig {
36//!         url: "wss://exchange-wss.bulk.trade".into(),
37//!         symbols: vec!["BTC-USD".into(), "ETH-USD".into()],
38//!         signer: Some(signer),
39//!         ..Default::default()
40//!     }).await?;
41//!
42//!     // Zero-cost read — no lock, no channel round-trip
43//!     if let Some(ticker) = client.get_ticker("BTC-USD") {
44//!         println!("BTC mark price: {}", ticker.mark_price);
45//!     }
46//!
47//!     // Place an order — goes through actor → ws
48//!     let resp = client.place_limit_order(
49//!         "BTC-USD", Side::Buy, 95_000.0, 0.01,
50//!         TimeInForce::GTC, false, None, None,
51//!     ).await?;
52//!
53//!     client.shutdown().await;
54//!     Ok(())
55//! }
56//! ```
57
58// ─────────────────────────────────────────────────────────────────────────────
59// Topic enum (mirrors topics.py)
60// ─────────────────────────────────────────────────────────────────────────────
61
62use std::collections::HashMap;
63use std::str::FromStr;
64use std::sync::{Arc, Mutex};
65use std::sync::atomic::{AtomicBool, Ordering};
66use std::time::Duration;
67use eyre::bail;
68use serde_json::{json, Value};
69use crate::msgs::account::{Fill, LeverageSetting, Margin, OrderState, PositionInfo};
70use crate::msgs::responses::Response;
71use crate::msgs::subscription::SubscriptionRequest;
72
73use futures_util::{SinkExt, StreamExt};
74use futures_util::stream::SplitSink;
75use serde::Deserialize;
76use solana_hash::Hash;
77use solana_pubkey::Pubkey;
78use tokio::net::TcpStream;
79use tokio::sync::{broadcast, mpsc, oneshot, watch};
80use tokio::time;
81use tokio_tungstenite::{
82    connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream,
83};
84use tracing::{debug, error, info, warn};
85use crate::api::parts::command::Command;
86use crate::api::parts::config::WSConfig;
87use crate::api::parts::{make_nonce, Event, Topic};
88use crate::common::side::Side;
89use crate::common::tif::TimeInForce;
90use crate::msgs::{CancelAll, CancelOrder, LimitOrder, MarketOrder, Price};
91use crate::msgs::md::{Candle, L2Snapshot, Ticker};
92use crate::transaction::{Action, ActionMeta, Transaction, TransactionSigner};
93// ─────────────────────────────────────────────────────────────────────────────
94// Snapshot: the full state picture pushed over a single watch channel
95// ─────────────────────────────────────────────────────────────────────────────
96
97/// Everything a reader might want, exposed as one cheap clone via `watch`.
98/// The actor publishes a new snapshot after every state-mutating message.
99#[derive(Debug, Clone, Default, Deserialize)]
100#[allow(unused)]
101pub struct AccountState {
102    pub margin: Margin,
103    pub positions: HashMap<String, PositionInfo>,
104    pub open_orders: HashMap<String, OrderState>,
105    pub leverage_settings: HashMap<String, LeverageSetting>,
106}
107
108// ═════════════════════════════════════════════════════════════════════════════
109// Event callback
110// ═════════════════════════════════════════════════════════════════════════════
111
112/// User-supplied callback. Receives the raw JSON payload for the topic.
113/// Runs synchronously inside the actor loop — keep it fast or spawn.
114#[allow(unused)]
115pub type EventHandler = Box<dyn Fn(&Event) + Send + Sync>;
116
117// ═════════════════════════════════════════════════════════════════════════════
118// BulkWsClient  —  the public handle (cheap clone, no locks)
119// ═════════════════════════════════════════════════════════════════════════════
120
121/// Cloneable client handle.
122///
123/// - **Hot reads** (ticker, price, margin): `watch::Receiver::borrow()` — zero
124///   async overhead, just a ref-counted pointer swap.
125/// - **Writes** (subscribe, place orders): go through the `mpsc` command
126///   channel to the actor, which serializes all mutations.
127/// - **Cold reads** (open orders list): round-trip through the actor via
128///   `oneshot` — still fast, but async.
129#[allow(unused)]
130#[derive(Clone)]
131pub struct BulkWsClient {
132    // Command channel to the actor
133    cmd_tx: mpsc::Sender<Command>,
134    // Event handlers
135    handlers: Arc<Mutex<HashMap<Topic, Vec<EventHandler>>>>,
136
137    // ── Watch receivers (hot-path, lock-free) ──────────────────────────
138    /// Per-symbol ticker snapshots.
139    ticker_rx: watch::Receiver<HashMap<String, Ticker>>,
140    /// Consolidated account state (margin, positions, orders, leverage).
141    account_rx: watch::Receiver<AccountState>,
142
143    // ── Config carried on the handle for convenience ───────────────────
144    signer: Option<TransactionSigner>,
145    default_timeout: Duration,
146
147    // Monotonic request ID (atomic — no lock needed)
148    next_request_id: std::sync::Arc<std::sync::atomic::AtomicU64>,
149
150    // Actor join handle (held in Arc so Clone works)
151    actor_handle: std::sync::Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
152
153    /// `true` while the WebSocket actor is running.  Flipped to `false` the
154    /// moment the actor begins disconnect teardown.  Cheap sync check, no await.
155    connected: Arc<AtomicBool>,
156
157    /// Fires once (with the disconnect reason) when the actor exits.
158    /// Clone a receiver via [`BulkWsClient::subscribe_disconnect`] so that
159    /// any spawned task can unblock immediately on connection loss.
160    disconnect_tx: broadcast::Sender<String>,
161}
162
163#[allow(unused)]
164impl BulkWsClient {
165    // ─────────────────────────────────────────────────────────────────────
166    // Construction + connection
167    // ─────────────────────────────────────────────────────────────────────
168
169    /// Connect to the exchange and spawn the actor task.
170    /// Returns immediately once the WebSocket handshake succeeds.
171    ///
172    /// # Arguments
173    /// - `config`: web socket config
174    pub async fn connect(config: WSConfig) -> eyre::Result<Self> {
175        info!("Connecting to {}", config.url);
176        let (ws_stream, _) = connect_async(&config.url).await?;
177        let (ws_write, ws_read) = ws_stream.split();
178        info!("Connected to Bulk Exchange WebSocket");
179
180        // Watch channels
181        let (ticker_tx, ticker_rx) = watch::channel(HashMap::new());
182        let (account_tx, account_rx) = watch::channel(AccountState::default());
183
184        // Command channel (bounded — back-pressure if actor falls behind)
185        let (cmd_tx, cmd_rx) = mpsc::channel::<Command>(512);
186
187        // Shared handler map between the dispatch task and BulkWsClient (for on() registration)
188        let handlers: Arc<Mutex<HashMap<Topic, Vec<EventHandler>>>> = Arc::default();
189        let handlers_task = Arc::clone(&handlers);
190        let (event_tx, mut event_rx) = mpsc::channel::<(Topic, Event)>(32768);
191
192        tokio::spawn(async move {
193            while let Some((topic, event)) = event_rx.recv().await {
194                let map = handlers_task.lock().unwrap();
195                if let Some(hs) = map.get(&topic) {
196                    for h in hs {
197                        h(&event);
198                    }
199                }
200            }
201        });
202
203        // Build actor
204        let connected = Arc::new(AtomicBool::new(true));
205        let (disconnect_tx, _) = broadcast::channel::<String>(4);
206
207        let actor = Actor {
208            ws_write,
209            event_tx,
210            cmd_rx,
211            ticker_tx,
212            account_tx,
213            tickers: HashMap::new(),
214            prices: HashMap::new(),
215            account_state: AccountState::default(),
216            pending: HashMap::new(),
217            subscriptions: Vec::new(),
218            connected: Arc::clone(&connected),
219            disconnect_tx: disconnect_tx.clone(),
220        };
221
222        // Default subscriptions (same as Python __init__)
223        let mut initial_subs = Vec::new();
224        if config.track_account {
225            if let Some(ref signer) = config.signer {
226                let pk_str = signer.public_key_b58();
227                initial_subs.push(SubscriptionRequest::new(
228                    "account",
229                    json!({ "user": pk_str }),
230                ));
231            }
232        }
233        if config.track_ticker {
234            for sym in &config.symbols {
235                initial_subs.push(SubscriptionRequest::new(
236                    "ticker",
237                    json!({ "symbol": sym }),
238                ));
239            }
240        }
241
242        // Spawn actor
243        let actor_handle = tokio::spawn(actor.run(ws_read, initial_subs));
244
245        Ok(Self {
246            cmd_tx,
247            handlers,
248            ticker_rx,
249            account_rx,
250            signer: config.signer,
251            default_timeout: config.default_timeout,
252            next_request_id: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)),
253            actor_handle: std::sync::Arc::new(tokio::sync::Mutex::new(Some(actor_handle))),
254            connected,
255            disconnect_tx,
256        })
257    }
258
259    /// Shut down the actor and close the WebSocket.
260    pub async fn shutdown(&self) {
261        let _ = self.cmd_tx.send(Command::Shutdown).await;
262        if let Some(h) = self.actor_handle.lock().await.take() {
263            let _ = h.await;
264        }
265    }
266
267    /// Wait for the actor to exit (e.g. on disconnect / error).
268    pub async fn closed(&self) {
269        if let Some(h) = self.actor_handle.lock().await.take() {
270            let _ = h.await;
271        }
272    }
273
274    /// Returns `true` if the WebSocket actor is still running.
275    ///
276    /// This is a cheap, lock-free check — safe to call in hot loops.
277    /// Once it returns `false` you must call [`BulkWsClient::connect`] again
278    /// to establish a new connection.
279    pub fn is_connected(&self) -> bool {
280        self.connected.load(Ordering::Relaxed)
281    }
282
283
284    // ─────────────────────────────────────────────────────────────────────
285    // Hot-path reads (zero-cost — no async, no lock)
286    // ─────────────────────────────────────────────────────────────────────
287
288    /// Get latest ticker for `symbol`, or `None` if not yet received.
289    ///
290    /// # Arguments
291    /// - `symbol`: symbol to retrieve for
292    ///
293    /// # Returns
294    /// - current ticker if available
295    pub fn get_ticker(&self, symbol: &str) -> Option<Ticker> {
296        self.ticker_rx.borrow().get(symbol).cloned()
297    }
298
299    /// Get Latest mark price for `symbol`.
300    ///
301    /// # Arguments
302    /// - `symbol`: symbol to retrieve for
303    ///
304    /// # Returns
305    /// - current price if available
306    pub fn get_price(&self, symbol: &str) -> Option<f64> {
307        self.ticker_rx.borrow().get(symbol).map(|x| x.mark_price)
308    }
309
310    /// All current tickers, keyed by symbol.
311    ///
312    /// # Returns
313    /// - all current tickers
314    pub fn get_tickers(&self) -> HashMap<String, Ticker> {
315        self.ticker_rx.borrow().clone()
316    }
317
318    /// Get Current account margin.
319    pub fn get_margin(&self) -> Margin {
320        self.account_rx.borrow().margin.clone()
321    }
322
323    /// Get current position for `symbol`.
324    ///
325    /// # Arguments
326    /// - `symbol`: symbol to retrieve for
327    ///
328    /// # Returns
329    /// - current position for symbol if available
330    pub fn get_position(&self, symbol: &str) -> Option<PositionInfo> {
331        self.account_rx.borrow().positions.get(symbol).cloned()
332    }
333
334    /// Get all positions.
335    pub fn get_positions(&self) -> HashMap<String, PositionInfo> {
336        self.account_rx.borrow().positions.clone()
337    }
338
339    /// Current leverage setting for `symbol`.
340    ///
341    /// # Arguments
342    /// - `symbol`: symbol to retrieve for
343    ///
344    /// # Returns
345    /// - current leverage if available
346    pub fn get_leverage(&self, symbol: &str) -> Option<f64> {
347        self.account_rx
348            .borrow()
349            .leverage_settings
350            .get(symbol)
351            .map(|l| l.leverage)
352    }
353
354    // ─────────────────────────────────────────────────────────────────────
355    // Async reads that wait for changes
356    // ─────────────────────────────────────────────────────────────────────
357
358    /// Block until any ticker changes, then return the updated map.
359    pub async fn wait_tickers_changed(&mut self) -> eyre::Result<HashMap<String, Ticker>> {
360        self.ticker_rx.changed().await?;
361        Ok(self.ticker_rx.borrow().clone())
362    }
363
364    /// Block until account state changes (margin, positions, orders, leverage).
365    pub async fn wait_account_changed(&mut self) -> eyre::Result<AccountState> {
366        self.account_rx.changed().await?;
367        Ok(self.account_rx.borrow().clone())
368    }
369
370    // ─────────────────────────────────────────────────────────────────────
371    // Cold reads (round-trip through actor)
372    // ─────────────────────────────────────────────────────────────────────
373
374    /// Open orders, optionally filtered by symbol.
375    ///
376    /// # Arguments
377    /// - `symbol`: optional symbol to retrieve for
378    ///
379    /// # Returns
380    /// - current orders or order status
381    pub async fn open_orders(&self, symbol: Option<&str>) -> eyre::Result<Vec<OrderState>> {
382        let (tx, rx) = oneshot::channel();
383        self.cmd_tx
384            .send(Command::GetOrders {
385                symbol: symbol.map(Into::into),
386                respond: tx,
387            })
388            .await
389            .map_err(|_| eyre::eyre!("actor gone"))?;
390        Ok(rx.await?)
391    }
392
393    // ─────────────────────────────────────────────────────────────────────
394    // Order placement
395    // ─────────────────────────────────────────────────────────────────────
396
397    /// Place one or more actions (limit, market, cancel, cancel-all).
398    /// Signs the bundle, sends through the actor, and awaits the exchange
399    /// response with the configured timeout.
400    ///
401    /// # Arguments
402    /// - `actions`: list of orders, cancels, etc
403    /// - `nonce`: nonce to be used
404    ///
405    /// # Returns
406    /// - list of responses
407    pub async fn place_orders(
408        &self,
409        actions: Vec<Action>,
410        account: Option<Pubkey>,
411        nonce: Option<u64>,
412    ) -> eyre::Result<Vec<Response>> {
413        let signer = self
414            .signer
415            .as_ref()
416            .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
417
418        let account = if let Some(account) = account {
419            account
420        } else {
421            signer.public_key()
422        };
423
424        let nonce = nonce.unwrap_or_else(make_nonce);
425        let pk = signer.public_key();
426
427        // Build + sign the transaction
428        let mut tx = Transaction {
429            actions,
430            nonce,
431            account,
432            signer: signer.public_key(),
433            signature: Default::default(),
434        };
435        tx.sign(signer)?;
436
437        let request_id = self
438            .next_request_id
439            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
440
441        // Build JSON body via tx serialization
442        let body = serde_json::to_string(&tx)?;
443        let json = format!(
444            r#"{{"method":"post","request":{{"type":"action","payload":{}}},"id":{}}}"#,
445            body, request_id
446        );
447
448        let (resp_tx, resp_rx) = oneshot::channel();
449
450        self.cmd_tx
451            .send(Command::Tx {
452                request_id,
453                json,
454                respond: resp_tx,
455            })
456            .await
457            .map_err(|_| eyre::eyre!("client is disconnected — call connect() to reconnect"))?;
458
459        match time::timeout(self.default_timeout, resp_rx).await {
460            Ok(Ok(result)) => result,
461            Ok(Err(_)) => bail!("response channel dropped"),
462            Err(_) => bail!("order request {request_id} timed out"),
463        }
464    }
465
466    /// Send oracle price updates and return the exchange's responses.
467    ///
468    /// Waits for the exchange to acknowledge the transaction so that any
469    /// rejection (e.g. invalid price, authorisation failure) is surfaced to
470    /// the caller rather than silently dropped.
471    ///
472    /// # Arguments
473    /// - `actions`: list of oracle price updates
474    /// - `account`: optional override for the signing account
475    /// - `nonce`: optional nonce override; a fresh one is generated when `None`
476    ///
477    /// # Returns
478    /// - `Ok(responses)` — one [`Response`] per submitted price; callers should
479    ///   inspect each entry with [`Response::is_error`] to detect rejections.
480    /// - `Err(_)` — transport-level failure (send error, timeout, dropped channel).
481    pub async fn update_oracle(
482        &self,
483        actions: Vec<Price>,
484        account: Option<Pubkey>,
485        nonce: Option<u64>,
486    ) -> eyre::Result<Vec<Response>> {
487        let signer = self
488            .signer
489            .as_ref()
490            .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
491
492        let account = if let Some(account) = account {
493            account
494        } else {
495            signer.public_key()
496        };
497
498        let nonce = nonce.unwrap_or_else(make_nonce);
499
500        // Build + sign the transaction
501        let mut tx = Transaction {
502            actions: actions.iter().map(|a| a.clone().into()).collect(),
503            nonce,
504            account,
505            signer: signer.public_key(),
506            signature: Default::default(),
507        };
508        tx.sign(signer)?;
509
510        let request_id = self
511            .next_request_id
512            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
513
514        // Build JSON body via tx serialization
515        let body = serde_json::to_string(&tx)?;
516        let json = format!(
517            r#"{{"method":"post","request":{{"type":"action","payload":{}}},"id":{}}}"#,
518            body, request_id
519        );
520
521        let (resp_tx, resp_rx) = oneshot::channel();
522
523        self.cmd_tx
524            .send(Command::Tx {
525                request_id,
526                json,
527                respond: resp_tx,
528            })
529            .await
530            .map_err(|_| eyre::eyre!("client is disconnected — call connect() to reconnect"))?;
531
532        match time::timeout(self.default_timeout, resp_rx).await {
533            Ok(Ok(result)) => result,
534            Ok(Err(_)) => bail!("oracle update response channel dropped"),
535            Err(_) => bail!("oracle update request {request_id} timed out"),
536        }
537    }
538
539    // ── Convenience wrappers ─────────────────────────────────────────────
540
541    /// Place limit order
542    ///
543    /// # Arguments
544    /// - `symbol`: which market to execute in
545    /// - `side`: buy or sell
546    /// - `price`: limit price
547    /// - `size`: order size
548    /// - `tif`: time in force
549    /// - `reduce_only`: true if order is reduce only
550    ///
551    /// # Returns
552    /// - response for order placement
553    pub async fn place_limit_order(
554        &self,
555        symbol: &str,
556        side: Side,
557        price: f64,
558        size: f64,
559        tif: TimeInForce,
560        reduce_only: bool,
561        account: Option<Pubkey>,
562        nonce: Option<u64>,
563    ) -> eyre::Result<Response> {
564        let signer = self
565            .signer
566            .as_ref()
567            .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
568
569        let account = if let Some(account) = account {
570            account
571        } else {
572            signer.public_key()
573        };
574
575        let nonce = nonce.unwrap_or_else(make_nonce);
576        let order = LimitOrder {
577            symbol: Arc::from(symbol),
578            is_buy: side == Side::Buy,
579            price,
580            size,
581            tif,
582            reduce_only,
583            iso: false,
584            meta: ActionMeta {
585                account,
586                nonce,
587                seqno: 0,
588                hash: None,
589            }
590        };
591        let resps = self.place_orders(vec![order.into()], None, None).await?;
592        resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
593    }
594
595    /// Place market order
596    ///
597    /// # Arguments
598    /// - `symbol`: which market to execute in
599    /// - `side`: buy or sell
600    /// - `size`: order size
601    /// - `reduce_only`: true if order is reduce only
602    ///
603    /// # Returns
604    /// - response for order placement
605    pub async fn place_market_order(
606        &self,
607        symbol: &str,
608        side: Side,
609        size: f64,
610        reduce_only: bool,
611        account: Option<Pubkey>,
612        nonce: Option<u64>,
613    ) -> eyre::Result<Response> {
614        let signer = self
615            .signer
616            .as_ref()
617            .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
618
619        let account = if let Some(account) = account {
620            account
621        } else {
622            signer.public_key()
623        };
624
625        let nonce = nonce.unwrap_or_else(make_nonce);
626        let order = MarketOrder {
627            symbol: Arc::from(symbol),
628            is_buy: side == Side::Buy,
629            size,
630            reduce_only,
631            iso: false,
632            meta: ActionMeta {
633                account,
634                nonce,
635                seqno: 0,
636                hash: None,
637            }
638        };
639
640        let resps = self.place_orders(vec![order.into()], None, None).await?;
641        resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
642    }
643
644    /// Cancel order
645    ///
646    /// # Arguments
647    /// - `symbol`: which market to execute in
648    /// - `order_id`: order ID to cancel
649    ///
650    /// # Returns
651    /// - response for order cancel
652    pub async fn cancel_order(
653        &self,
654        symbol: &str,
655        order_id: &str,
656        account: Option<Pubkey>,
657        nonce: Option<u64>,
658    ) -> eyre::Result<Response> {
659        let signer = self
660            .signer
661            .as_ref()
662            .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
663
664        let account = if let Some(account) = account {
665            account
666        } else {
667            signer.public_key()
668        };
669
670        let nonce = nonce.unwrap_or_else(make_nonce);
671        let cancel = CancelOrder {
672            symbol: symbol.to_string(),
673            oid: Hash::from_str(&order_id)?,
674            meta: ActionMeta {
675                account,
676                nonce,
677                seqno: 0,
678                hash: None,
679            }
680        };
681
682        let resps = self.place_orders(vec![cancel.into()], None, None).await?;
683        resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
684    }
685
686    /// Cancel all order
687    ///
688    /// # Arguments
689    /// - `symbols`: which symbols to cancel
690    ///
691    /// # Returns
692    /// - response for order cancel
693    pub async fn cancel_all(
694        &self, 
695        symbols: Vec<String>,
696        account: Option<Pubkey>,
697        nonce: Option<u64>,
698    ) -> eyre::Result<Response> {
699        let signer = self
700            .signer
701            .as_ref()
702            .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
703
704        let account = if let Some(account) = account {
705            account
706        } else {
707            signer.public_key()
708        };
709
710        let nonce = nonce.unwrap_or_else(make_nonce);
711        let cancel = CancelAll {
712            symbols,
713            meta: ActionMeta {
714                account,
715                nonce,
716                seqno: 0,
717                hash: None,
718            }
719        };
720        let resps = self.place_orders(vec![cancel.into()], None, None).await?;
721        resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
722    }
723
724
725    // ─────────────────────────────────────────────────────────────────────
726    // Subscriptions
727    // ─────────────────────────────────────────────────────────────────────
728
729    /// Subscribe to disconnect notifications.
730    ///
731    /// The returned receiver fires exactly once, carrying the human-readable
732    /// disconnect reason, when the actor exits for any reason (server close,
733    /// network error, or explicit [`shutdown`]).
734    ///
735    /// Use this as a *poison pill* for any tasks you spawned that should stop
736    /// when the connection is lost:
737    ///
738    /// ```text
739    /// let mut rx = client.subscribe_disconnect();
740    /// tokio::spawn(async move {
741    ///     let _ = rx.recv().await; // blocks until disconnect
742    ///     // clean up your task here
743    /// });
744    /// ```
745    pub fn subscribe_disconnect(&self) -> broadcast::Receiver<String> {
746        self.disconnect_tx.subscribe()
747    }
748
749    /// Subscribe to ticker for `symbol`.
750    ///
751    /// # Arguments
752    /// - `symbol`: symbol to subscrive to
753    pub async fn subscribe_ticker(&self, symbol: &str) -> eyre::Result<()> {
754        self.subscribe(vec![
755            SubscriptionRequest::new("ticker", json!({ "symbol": symbol })),
756        ]).await
757    }
758
759    /// Subscribe to fills for `symbol`.
760    ///
761    /// # Arguments
762    /// - `symbols`: list of symbol to subscribe to
763    pub async fn subscribe_trades(&self, symbols: &[&str]) -> eyre::Result<()> {
764        let subs = symbols
765            .iter()
766            .map(|s| SubscriptionRequest::new("trades", json!({ "symbol": s })))
767            .collect();
768        self.subscribe(subs).await
769    }
770
771    /// Subscribe to L2 snapshots for `symbol`.
772    ///
773    /// # Arguments
774    /// - `symbol`: symbol to subscribe to
775    pub async fn subscribe_l2_snapshot(
776        &self,
777        symbol: &str,
778        nlevels: Option<u32>,
779    ) -> eyre::Result<()> {
780        let mut params = json!({ "symbol": symbol });
781        if let Some(n) = nlevels {
782            params["nlevels"] = json!(n);
783        }
784        self.subscribe(vec![SubscriptionRequest::new("l2Snapshot", params)]).await
785    }
786
787    /// Subscribe to L2 deltas for `symbol`.
788    ///
789    /// # Arguments
790    /// - `symbol`: symbol to subscribe to
791    pub async fn subscribe_l2_delta(&self, symbol: &str) -> eyre::Result<()> {
792        self.subscribe(vec![
793            SubscriptionRequest::new("l2Delta", json!({ "symbol": symbol })),
794        ]).await
795    }
796
797    /// Subscribe to candles for `symbol`.
798    ///
799    /// # Arguments
800    /// - `symbol`: symbol to subscribe to
801    /// - `interval`: bar period ("1min", "5min", ...)
802    pub async fn subscribe_candles(&self, symbol: &str, interval: &str) -> eyre::Result<()> {
803        self.subscribe(vec![SubscriptionRequest::new(
804            "candle",
805            json!({ "symbol": symbol, "interval": interval }),
806        )])
807            .await
808    }
809
810    /// Subscribe list of subscription requests
811    ///
812    /// # Arguments
813    /// - `subs`: subscription list
814    async fn subscribe(&self, subs: Vec<SubscriptionRequest>) -> eyre::Result<()> {
815        self.cmd_tx
816            .send(Command::Subscribe(subs))
817            .await
818            .map_err(|_| eyre::eyre!("actor gone"))?;
819        Ok(())
820    }
821
822    // ─────────────────────────────────────────────────────────────────────
823    // Event handlers
824    // ─────────────────────────────────────────────────────────────────────
825
826    /// Register a callback for a topic. The callback runs synchronously
827    /// inside the actor loop — keep it fast or `tokio::spawn` from within.
828    ///
829    /// # Argument
830    /// - `topic`: topic to subscribe to
831    /// - `handler`: callback for topic
832    pub async fn on(&self, topic: Topic, handler: impl Fn(&Event) + Send + Sync + 'static) {
833        self.handlers.lock().unwrap().entry(topic).or_default().push(Box::new(handler));
834    }
835}
836
837// ═════════════════════════════════════════════════════════════════════════════
838// Actor — owns all mutable state, runs in a single task
839// ═════════════════════════════════════════════════════════════════════════════
840
841type WsWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
842type WsReader = futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
843
844struct Actor {
845    // WebSocket write half
846    ws_write: WsWriter,
847    // Event sender
848    event_tx: mpsc::Sender<(Topic,Event)>,
849
850    // Inbound commands from the client handle
851    cmd_rx: mpsc::Receiver<Command>,
852
853    // ── Watch senders (actor pushes, clients read) ─────────────────────
854    ticker_tx: watch::Sender<HashMap<String, Ticker>>,
855    account_tx: watch::Sender<AccountState>,
856
857    // ── Owned state (no locks) ─────────────────────────────────────────
858    tickers: HashMap<String, Ticker>,
859    prices: HashMap<String, f64>,
860    account_state: AccountState,
861    pending: HashMap<u64, oneshot::Sender<eyre::Result<Vec<Response>>>>,
862
863    // Subscription log (for reconnection replay)
864    subscriptions: Vec<SubscriptionRequest>,
865
866    // ── Disconnect signalling ─────────────────────────────────────────────
867    /// Shared flag — flipped to `false` before the `Disconnected` event fires.
868    connected: Arc<AtomicBool>,
869    /// Broadcast once with the disconnect reason when the actor exits.
870    disconnect_tx: broadcast::Sender<String>,
871}
872
873impl Actor {
874    /// Run loop
875    async fn run(
876        mut self,
877        mut ws_read: WsReader,
878        initial_subs: Vec<SubscriptionRequest>,
879    ) {
880        // Send initial subscriptions
881        if !initial_subs.is_empty() {
882            if let Err(e) = self.send_subscribe(&initial_subs).await {
883                error!("Initial subscription failed: {e}");
884                return;
885            }
886            self.subscriptions = initial_subs;
887        }
888
889        // Emit Connected so handlers can react immediately.
890        self.emit(Topic::Status, &Event::Connected);
891
892        // Use a labelled loop so every exit path carries an explicit reason.
893        let disconnect_reason: String = 'actor: loop {
894            tokio::select! {
895                // Inbound WebSocket message
896                msg = ws_read.next() => {
897                    match msg {
898                        Some(Ok(Message::Text(text))) => {
899                            debug!("msg {}: {}", text.len(), &text[0..512.min(text.len())]);
900                            match serde_json::from_str::<Value>(&text) {
901                                Ok(data) => self.handle_message(data, &text).await,
902                                Err(e) => error!("JSON decode error: {e}"),
903                            }
904                        }
905                        Some(Ok(Message::Close(_))) => {
906                            warn!("WebSocket closed by server");
907                            break 'actor "server closed the connection".into();
908                        }
909                        Some(Err(e)) => {
910                            error!("WebSocket read error: {e}");
911                            break 'actor format!("WebSocket read error: {e}");
912                        }
913                        None => {
914                            warn!("WebSocket stream ended");
915                            break 'actor "WebSocket stream ended".into();
916                        }
917                        _ => {} // Ping/Pong handled by tungstenite
918                    }
919                }
920
921                // Command from client handle
922                cmd = self.cmd_rx.recv() => {
923                    match cmd {
924                        Some(Command::Subscribe(subs)) => {
925                            if let Err(e) = self.send_subscribe(&subs).await {
926                                error!("Subscription send error: {e}");
927                            }
928                            self.subscriptions.extend(subs);
929                        }
930
931                        Some(Command::Tx { request_id, json, respond }) => {
932                            self.pending.insert(request_id, respond);
933                            if let Err(e) = self.ws_send_text(&json).await {
934                                error!("Order send error: {e}");
935                                if let Some(tx) = self.pending.remove(&request_id) {
936                                    let _ = tx.send(Err(e));
937                                }
938                            }
939                        }
940
941                        Some(Command::AsyncTx { json}) => {
942                            if let Err(e) = self.ws_send_text(&json).await {
943                                error!("Order send error: {e}");
944                            }
945                        }
946
947                        Some(Command::SendRaw(json)) => {
948                            if let Err(e) = self.ws_send_text(&json).await {
949                                error!("Raw send error: {e}");
950                            }
951                        }
952
953                        Some(Command::GetOrders { symbol, respond }) => {
954                            let orders = match symbol {
955                                Some(s) => self.account_state.open_orders
956                                    .values()
957                                    .filter(|o| o.symbol == s)
958                                    .cloned()
959                                    .collect(),
960                                None => self.account_state.open_orders.values().cloned().collect(),
961                            };
962                            let _ = respond.send(orders);
963                        }
964
965                        Some(Command::Shutdown) | None => {
966                            info!("Actor shutting down (requested)");
967                            break 'actor "shutdown requested".into();
968                        }
969                    }
970                }
971            }
972        }; // end 'actor loop
973
974        self.handle_disconnect(disconnect_reason).await;
975    }
976
977    /// Shared teardown called from every exit path in `run()`.
978    ///
979    /// Order of operations:
980    /// 1. Flip `connected` flag so callers see `is_connected() == false` immediately.
981    /// 2. Fail every in-progress `place_orders` that is waiting on a oneshot response.
982    /// 3. Emit `Event::Disconnected` on `Topic::Status` so registered handlers fire.
983    /// 4. Broadcast the reason string to all `subscribe_disconnect()` receivers
984    ///    — this is the poison pill for any other spawned tasks.
985    /// 5. Close the WebSocket write half.
986    async fn handle_disconnect(&mut self, reason: String) {
987        // 1. Mark disconnected — visible to all handles immediately.
988        self.connected.store(false, Ordering::Release);
989
990        // 2. Fail every pending order response.
991        let err_msg = format!("disconnected: {reason}");
992        for (_, tx) in self.pending.drain() {
993            let _ = tx.send(Err(eyre::eyre!("{}", err_msg)));
994        }
995
996        // 3. Emit the Disconnected event to registered handlers.
997        self.emit(Topic::Status, &Event::Disconnected(reason.clone()));
998
999        // 4. Broadcast reason as poison pill (best-effort; ignore no-receivers).
1000        let _ = self.disconnect_tx.send(reason.clone());
1001
1002        // 5. Close WS write half (ignore error — connection may already be gone).
1003        let _ = self.ws_write.close().await;
1004
1005        info!("Actor stopped: {reason}");
1006    }
1007
1008    /// WS send
1009    async fn ws_send_text(&mut self, text: &str) -> eyre::Result<()> {
1010        let len = text.len();
1011        debug!("sending msg len: {}", len);
1012        self.ws_write
1013            .send(Message::Text(text.into()))
1014            .await
1015            .map_err(|e| eyre::eyre!("ws write: {e}"))?;
1016        Ok(())
1017    }
1018
1019    // ─────────────────────────────────────────────────────────────────────
1020    // Message dispatch
1021    // ─────────────────────────────────────────────────────────────────────
1022
1023    async fn handle_message(&mut self, data: Value, json: &str) {
1024        let msg_type = data["type"].as_str().unwrap_or("");
1025
1026        match msg_type {
1027            "subscriptionResponse" => {
1028                info!(
1029                    "Subscription confirmed: {:?}",
1030                    data["topics"].as_array().map(|a| a.len())
1031                );
1032            }
1033
1034            "ticker" => {
1035                let ticker_v = &data["data"]["ticker"];
1036                if let Ok(ticker) = serde_json::from_value::<Ticker>(ticker_v.clone()) {
1037                    self.prices.insert(ticker.symbol.clone(), ticker.mark_price);
1038                    self.tickers.insert(ticker.symbol.clone(), ticker.clone());
1039
1040                    // Push watch updates
1041                    let _ = self.ticker_tx.send(self.tickers.clone());
1042
1043                    self.emit(Topic::Ticker, &Event::Ticker(ticker.clone()));
1044                    debug!("Ticker: {} mark={:.2}", ticker.symbol, ticker.mark_price);
1045                } else {
1046                    error!("Could not parse ticker event: {:?}", ticker_v);
1047                }
1048            }
1049
1050            "trades" => {
1051                if let Ok(trades) = serde_json::from_value::<Vec<Fill>>(data["data"].clone()) {
1052                    self.emit(Topic::Trades, &Event::Trades(trades));
1053                } else {
1054                    error!("Could not parse trades event: {:?}", data["data"]);
1055                }
1056            }
1057
1058            "l2Snapshot" => {
1059                if let Ok(l2_snapshot) = serde_json::from_value::<L2Snapshot>(data["data"]["book"].clone()) {
1060                    self.emit(Topic::L2Snapshot, &Event::L2Snapshot(l2_snapshot));
1061                } else {
1062                    error!("Could not parse l2_snapshot event: msg: {:?}", data["data"]);
1063                }
1064            }
1065
1066            "l2Delta" => {
1067                if let Ok(l2_delta) = serde_json::from_value::<L2Snapshot>(data["data"]["book"].clone()) {
1068                    self.emit(Topic::L2Delta, &Event::L2Delta(l2_delta));
1069                } else {
1070                    error!("Could not parse l2_delta event: {:?}", data["data"]);
1071                }
1072            }
1073
1074            "candle" => {
1075                if let Ok(candle) = serde_json::from_value::<Candle>(data["data"].clone()) {
1076                    self.emit(Topic::Candle, &Event::Candle(candle));
1077                } else {
1078                    error!("Could not parse candle event: {:?}", data["data"]);
1079                }
1080            }
1081
1082            "account" => {
1083                self.handle_account(&data["data"]).await;
1084            }
1085
1086            "post" => {
1087                self.handle_post_response(&data, json);
1088            }
1089
1090            other => {
1091                debug!("Unhandled message type: {other}");
1092            }
1093        }
1094    }
1095
1096    // ─────────────────────────────────────────────────────────────────────
1097    // Account updates
1098    // ─────────────────────────────────────────────────────────────────────
1099
1100    async fn handle_account(&mut self, data: &Value) {
1101        let update_type = data["type"].as_str().unwrap_or("");
1102
1103        match update_type {
1104            "accountSnapshot" => {
1105                if let Ok(margin) = serde_json::from_value::<Margin>(data["margin"].clone()) {
1106                    self.account_state.margin = margin.clone();
1107                    self.emit(Topic::Margin, &Event::Margin(margin))
1108                }
1109
1110                if let Ok(positions) = serde_json::from_value::<Vec<PositionInfo>>(data["positions"].clone()) {
1111                    for position in &positions {
1112                        self.emit(Topic::Position, &Event::Position(position.clone()))
1113                    }
1114                    self.account_state.positions = positions
1115                        .into_iter()
1116                        .map(|p| (p.symbol.clone(), p.clone()))
1117                        .collect();
1118                }
1119
1120                if let Ok(orders) = serde_json::from_value::<Vec<OrderState>>(data["openOrders"].clone()) {
1121                    for order in &orders {
1122                        self.emit(Topic::Order, &Event::Order(order.clone()))
1123                    }
1124                    self.account_state.open_orders = orders
1125                        .into_iter()
1126                        .map(|o| (o.order_id.clone(), o))
1127                        .collect();
1128                }
1129
1130                if let Ok(leverages) = serde_json::from_value::<Vec<LeverageSetting>>(data["leverageSettings"].clone()) {
1131                    self.emit(Topic::Leverage, &Event::Leverage(leverages.clone()));
1132                    for l in leverages {
1133                        self.account_state.leverage_settings.insert(l.symbol.clone(), l);
1134                    }
1135                }
1136
1137                info!(
1138                    "Account snapshot: balance={:.2}, positions={}, orders={}",
1139                    self.account_state.margin.total_balance,
1140                    self.account_state.positions.len(),
1141                    self.account_state.open_orders.len(),
1142                );
1143            }
1144
1145            "orderUpdate" => {
1146                if let Ok(order) = serde_json::from_value::<OrderState>(data.clone()) {
1147                    let oid = order.order_id.clone();
1148                    if order.status.is_terminal() {
1149                        self.account_state.open_orders.remove(&oid);
1150                    } else {
1151                        self.account_state.open_orders.insert(oid, order.clone());
1152                    }
1153                    self.emit(Topic::Order, &Event::Order(order));
1154                } else {
1155                    error!("Could not parse order event: {:?}", data);
1156                }
1157            }
1158
1159            "marginUpdate" => {
1160                if let Ok(margin) = serde_json::from_value::<Margin>(data.clone()) {
1161                    self.account_state.margin = margin.clone();
1162                    self.publish_account();
1163                    self.emit(Topic::Margin, &Event::Margin(margin));
1164                } else {
1165                    error!("Could not parse margin event: {:?}", data);
1166                }
1167            }
1168
1169            "positionUpdate" => {
1170                if let Ok(pos) = serde_json::from_value::<PositionInfo>(data.clone()) {
1171                    self.account_state.positions.insert(pos.symbol.clone(), pos.clone());
1172                    self.emit(Topic::Position, &Event::Position(pos));
1173                    self.publish_account();
1174                } else {
1175                    error!("Could not parse position event: {:?}", data);
1176                }
1177            }
1178
1179            "fill" => {
1180                if let Ok(fill) = serde_json::from_value::<Fill>(data.clone()) {
1181                    let dir = fill.side.dir();
1182                    if let Some(order) = self.account_state.open_orders.get_mut(&fill.order_id) {
1183                        order.filled_size += fill.size;
1184                        order.signed_size -= dir * fill.size;
1185                        if order.signed_size * dir <= 0.0 {
1186                            self.account_state.open_orders.remove(&fill.order_id);
1187                        }
1188                    }
1189                    self.publish_account();
1190                    self.emit(Topic::Fill, &Event::Fill(fill.clone()));
1191                    info!(
1192                        "Fill: {} {:?} {} @ {} maker={}",
1193                        fill.symbol, fill.side, fill.size, fill.price, fill.is_maker,
1194                    );
1195                } else {
1196                    error!("Could not parse fill event: {:?}", data);
1197                }
1198            }
1199
1200            "leverageUpdate" => {
1201                if let Ok(leverages) = serde_json::from_value::<Vec<LeverageSetting>>(data["leverage"].clone()) {
1202                    for lev in &leverages {
1203                        self.account_state.leverage_settings.insert(lev.symbol.clone(), lev.clone());
1204                    }
1205                    self.emit(Topic::Leverage, &Event::Leverage(leverages.clone()));
1206                    self.publish_account();
1207                } else {
1208                    error!("Could not parse leverage event: {:?}", data);
1209                }
1210            }
1211
1212            _ => {
1213                debug!("Unknown account update: {update_type}");
1214            }
1215        }
1216    }
1217
1218    // ─────────────────────────────────────────────────────────────────────
1219    // Post (order) response
1220    // ─────────────────────────────────────────────────────────────────────
1221
1222    fn handle_post_response(&mut self, data: &Value, _json: &str) {
1223        let request_id = data["id"].as_u64().unwrap_or(0);
1224        let inner = &data["data"];
1225        let rtype = inner["type"].as_str().unwrap_or("");
1226        let sender = self.pending.remove(&request_id);
1227
1228        match rtype {
1229            "action"=> {
1230                let payload = &inner["payload"];
1231                let status = payload["status"].as_str().unwrap_or("");
1232
1233                if status != "ok" {
1234                    error!("Order request {request_id} failed: {status}");
1235                    if let Some(tx) = sender {
1236                        let _ = tx.send(Err(eyre::eyre!("order request failed: {}", data)));
1237                    }
1238                    self.emit(Topic::Error, &Event::Error(data.clone()));
1239                } else {
1240                    let responses = Response::parse_responses(data);
1241                    if let Some(tx) = sender {
1242                        let _ = tx.send(Ok(responses));
1243                    }
1244                }
1245            }
1246            "ack" => {
1247                let ok = inner["ok"].as_bool().unwrap_or(false);
1248                let response = if ok {
1249                    Response {
1250                        order_id: None,
1251                        status: "OK".to_string(),
1252                        message: None,
1253                        raw: inner.clone(),
1254                    }
1255                } else {
1256                    let message = inner["message"].as_str().unwrap_or("");
1257                    Response {
1258                        order_id: None,
1259                        status: "Error".to_string(),
1260                        message: Some(message.to_string()),
1261                        raw: inner.clone(),
1262                    }
1263                };
1264                if let Some(tx) = sender {
1265                    let _ = tx.send(Ok(vec![response]));
1266                }
1267            }
1268            _ => panic!("unknown response type: {}", rtype),
1269        }
1270    }
1271
1272    // ─────────────────────────────────────────────────────────────────────
1273    // Helpers
1274    // ─────────────────────────────────────────────────────────────────────
1275
1276    /// Publish the current account state snapshot to the watch channel.
1277    fn publish_account(&self) {
1278        let _ = self.account_tx.send(self.account_state.clone());
1279    }
1280
1281    /// Fire all handlers registered for `topic`.
1282    fn emit(&self, topic: Topic, data: &Event) {
1283        let _ = self.event_tx.try_send((topic, data.clone()));
1284    }
1285
1286    /// Send a JSON value over the WebSocket.
1287    async fn ws_send_json(&mut self, value: &Value) -> eyre::Result<()> {
1288        let text = serde_json::to_string(value)?;
1289        self.ws_write
1290            .send(Message::Text(text.into()))
1291            .await
1292            .map_err(|e| eyre::eyre!("ws write: {e}"))?;
1293        Ok(())
1294    }
1295
1296    /// Send subscription request(s) over the WebSocket.
1297    async fn send_subscribe(&mut self, subs: &[SubscriptionRequest]) -> eyre::Result<()> {
1298        let request = json!({
1299            "method": "subscribe",
1300            "subscription": subs.iter().map(|s| s.to_json()).collect::<Vec<_>>(),
1301        });
1302        self.ws_send_json(&request).await?;
1303        info!("Subscribed to {} topics", subs.len());
1304        Ok(())
1305    }
1306}