Skip to main content

bulk_client/api/
bulk_ws.rs

1//! Bulk Labs WebSocket Trading Client — Actor + Watch architecture.
2//!
3//! All mutable state lives inside a single [`Actor`] task. The public
4//! [`BulkWsClient`] handle is a cheap, cloneable struct that communicates
5//! with the actor via:
6//!
7//! - **`tokio::sync::watch`** for hot-path reads (tickers, prices, margin)
8//!   — zero-cost `.borrow()`, no async round-trip.
9//! - **`mpsc` command channel** for writes (subscribe, place orders, etc.).
10//! - **`oneshot`** for request/response flows (order placement).
11//!
12//! ```text
13//!  ┌──────────────┐         mpsc::channel           ┌───────────────┐
14//!  │ BulkWsClient │ ───── Command ────────────────▶ │     Actor     │
15//!  │   (handle)   │ ◀──── watch::Receiver ────────  │  (owns state) │
16//!  └──────────────┘                                 └───────┬───────┘
17//!        │                                                  │
18//!        │ oneshot for order responses                      │ tokio::select!
19//!        └─────────────────────────────────────────────────▶│◀── ws_read
20//! ```
21//!
22//! # Example
23//!
24//! ```rust,no_run
25//! use bulk_client::*;
26//! use bulk_client::common::side::Side;
27//! use bulk_client::common::tif::TimeInForce;
28//! use bulk_client::transaction::TransactionSigner;
29//! use bulk_client::parts::WSConfig;
30//!
31//! #[tokio::main]
32//! async fn main() -> eyre::Result<()> {
33//!     let signer = TransactionSigner::from_private_key("your_base58_key")?;
34//!
35//!     let client = BulkWsClient::connect(WSConfig {
36//!         url: "wss://exchange-wss.bulk.trade".into(),
37//!         symbols: vec!["BTC-USD".into(), "ETH-USD".into()],
38//!         signer: Some(signer),
39//!         ..Default::default()
40//!     }).await?;
41//!
42//!     // Zero-cost read — no lock, no channel round-trip
43//!     if let Some(ticker) = client.get_ticker("BTC-USD") {
44//!         println!("BTC mark price: {}", ticker.mark_price);
45//!     }
46//!
47//!     // Place an order — goes through actor → ws
48//!     let resp = client.place_limit_order(
49//!         "BTC-USD", Side::Buy, 95_000.0, 0.01,
50//!         TimeInForce::GTC, false, None, None,
51//!     ).await?;
52//!
53//!     client.shutdown().await;
54//!     Ok(())
55//! }
56//! ```
57
58// ─────────────────────────────────────────────────────────────────────────────
59// Topic enum (mirrors topics.py)
60// ─────────────────────────────────────────────────────────────────────────────
61
62use std::collections::HashMap;
63use std::str::FromStr;
64use std::sync::{Arc, Mutex};
65use std::sync::atomic::{AtomicBool, Ordering};
66use std::time::Duration;
67use eyre::bail;
68use serde_json::{json, Value};
69use crate::msgs::account::{Fill, LeverageSetting, Margin, OrderState, PositionInfo};
70use crate::msgs::responses::Response;
71use crate::msgs::subscription::SubscriptionRequest;
72
73use futures_util::{SinkExt, StreamExt};
74use futures_util::stream::SplitSink;
75use serde::Deserialize;
76use solana_hash::Hash;
77use solana_pubkey::Pubkey;
78use tokio::net::TcpStream;
79use tokio::sync::{broadcast, mpsc, oneshot, watch};
80use tokio::time;
81use tokio_tungstenite::{
82    connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream,
83};
84use tracing::{debug, error, info, warn};
85use crate::api::parts::command::Command;
86use crate::api::parts::config::WSConfig;
87use crate::api::parts::{make_nonce, Event, Topic};
88use crate::common::side::Side;
89use crate::common::tif::TimeInForce;
90use crate::msgs::{CancelAll, CancelOrder, LimitOrder, MarketOrder, Price};
91use crate::msgs::md::{Candle, L2Snapshot, Ticker};
92use crate::transaction::{Action, ActionMeta, Transaction, TransactionSigner};
93// ─────────────────────────────────────────────────────────────────────────────
94// Snapshot: the full state picture pushed over a single watch channel
95// ─────────────────────────────────────────────────────────────────────────────
96
97/// Everything a reader might want, exposed as one cheap clone via `watch`.
98/// The actor publishes a new snapshot after every state-mutating message.
99#[derive(Debug, Clone, Default, Deserialize)]
100#[allow(unused)]
101pub struct AccountState {
102    pub margin: Margin,
103    pub positions: HashMap<String, PositionInfo>,
104    pub open_orders: HashMap<String, OrderState>,
105    pub leverage_settings: HashMap<String, LeverageSetting>,
106}
107
108// ═════════════════════════════════════════════════════════════════════════════
109// Event callback
110// ═════════════════════════════════════════════════════════════════════════════
111
112/// User-supplied callback. Receives the raw JSON payload for the topic.
113/// Runs synchronously inside the actor loop — keep it fast or spawn.
114#[allow(unused)]
115pub type EventHandler = Box<dyn Fn(&Event) + Send + Sync>;
116
117// ═════════════════════════════════════════════════════════════════════════════
118// BulkWsClient  —  the public handle (cheap clone, no locks)
119// ═════════════════════════════════════════════════════════════════════════════
120
121/// Cloneable client handle.
122///
123/// - **Hot reads** (ticker, price, margin): `watch::Receiver::borrow()` — zero
124///   async overhead, just a ref-counted pointer swap.
125/// - **Writes** (subscribe, place orders): go through the `mpsc` command
126///   channel to the actor, which serializes all mutations.
127/// - **Cold reads** (open orders list): round-trip through the actor via
128///   `oneshot` — still fast, but async.
129#[allow(unused)]
130#[derive(Clone)]
131pub struct BulkWsClient {
132    // Command channel to the actor
133    cmd_tx: mpsc::Sender<Command>,
134    // Event handlers
135    handlers: Arc<Mutex<HashMap<Topic, Vec<EventHandler>>>>,
136
137    // ── Watch receivers (hot-path, lock-free) ──────────────────────────
138    /// Per-symbol ticker snapshots.
139    ticker_rx: watch::Receiver<HashMap<String, Ticker>>,
140    /// Consolidated account state (margin, positions, orders, leverage).
141    account_rx: watch::Receiver<AccountState>,
142
143    // ── Config carried on the handle for convenience ───────────────────
144    signer: Option<TransactionSigner>,
145    default_timeout: Duration,
146
147    // Monotonic request ID (atomic — no lock needed)
148    next_request_id: std::sync::Arc<std::sync::atomic::AtomicU64>,
149
150    // Actor join handle (held in Arc so Clone works)
151    actor_handle: std::sync::Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>>,
152
153    /// `true` while the WebSocket actor is running.  Flipped to `false` the
154    /// moment the actor begins disconnect teardown.  Cheap sync check, no await.
155    connected: Arc<AtomicBool>,
156
157    /// Fires once (with the disconnect reason) when the actor exits.
158    /// Clone a receiver via [`BulkWsClient::subscribe_disconnect`] so that
159    /// any spawned task can unblock immediately on connection loss.
160    disconnect_tx: broadcast::Sender<String>,
161}
162
163#[allow(unused)]
164impl BulkWsClient {
165    // ─────────────────────────────────────────────────────────────────────
166    // Construction + connection
167    // ─────────────────────────────────────────────────────────────────────
168
169    /// Connect to the exchange and spawn the actor task.
170    /// Returns immediately once the WebSocket handshake succeeds.
171    ///
172    /// # Arguments
173    /// - `config`: web socket config
174    pub async fn connect(config: WSConfig) -> eyre::Result<Self> {
175        info!("Connecting to {}", config.url);
176        let (ws_stream, _) = connect_async(&config.url).await?;
177        let (ws_write, ws_read) = ws_stream.split();
178        info!("Connected to Bulk Exchange WebSocket");
179
180        // Watch channels
181        let (ticker_tx, ticker_rx) = watch::channel(HashMap::new());
182        let (account_tx, account_rx) = watch::channel(AccountState::default());
183
184        // Command channel (bounded — back-pressure if actor falls behind)
185        let (cmd_tx, cmd_rx) = mpsc::channel::<Command>(512);
186
187        // Shared handler map between the dispatch task and BulkWsClient (for on() registration)
188        let handlers: Arc<Mutex<HashMap<Topic, Vec<EventHandler>>>> = Arc::default();
189        let handlers_task = Arc::clone(&handlers);
190        let (event_tx, mut event_rx) = mpsc::channel::<(Topic, Event)>(32768);
191
192        tokio::spawn(async move {
193            while let Some((topic, event)) = event_rx.recv().await {
194                let map = handlers_task.lock().unwrap();
195                if let Some(hs) = map.get(&topic) {
196                    for h in hs {
197                        h(&event);
198                    }
199                }
200            }
201        });
202
203        // Build actor
204        let connected = Arc::new(AtomicBool::new(true));
205        let (disconnect_tx, _) = broadcast::channel::<String>(4);
206
207        let actor = Actor {
208            ws_write,
209            event_tx,
210            cmd_rx,
211            ticker_tx,
212            account_tx,
213            tickers: HashMap::new(),
214            prices: HashMap::new(),
215            account_state: AccountState::default(),
216            pending: HashMap::new(),
217            subscriptions: Vec::new(),
218            connected: Arc::clone(&connected),
219            disconnect_tx: disconnect_tx.clone(),
220        };
221
222        // Default subscriptions (same as Python __init__)
223        let mut initial_subs = Vec::new();
224        if config.track_account {
225            if let Some(ref signer) = config.signer {
226                let pk_str = signer.public_key_b58();
227                initial_subs.push(SubscriptionRequest::new(
228                    "account",
229                    json!({ "user": pk_str }),
230                ));
231            }
232        }
233        if config.track_ticker {
234            for sym in &config.symbols {
235                initial_subs.push(SubscriptionRequest::new(
236                    "ticker",
237                    json!({ "symbol": sym }),
238                ));
239            }
240        }
241
242        // Spawn actor
243        let actor_handle = tokio::spawn(actor.run(ws_read, initial_subs));
244
245        Ok(Self {
246            cmd_tx,
247            handlers,
248            ticker_rx,
249            account_rx,
250            signer: config.signer,
251            default_timeout: config.default_timeout,
252            next_request_id: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)),
253            actor_handle: std::sync::Arc::new(tokio::sync::Mutex::new(Some(actor_handle))),
254            connected,
255            disconnect_tx,
256        })
257    }
258
259    /// Shut down the actor and close the WebSocket.
260    pub async fn shutdown(&self) {
261        let _ = self.cmd_tx.send(Command::Shutdown).await;
262        if let Some(h) = self.actor_handle.lock().await.take() {
263            let _ = h.await;
264        }
265    }
266
267    /// Wait for the actor to exit (e.g. on disconnect / error).
268    pub async fn closed(&self) {
269        if let Some(h) = self.actor_handle.lock().await.take() {
270            let _ = h.await;
271        }
272    }
273
274    /// Returns `true` if the WebSocket actor is still running.
275    ///
276    /// This is a cheap, lock-free check — safe to call in hot loops.
277    /// Once it returns `false` you must call [`BulkWsClient::connect`] again
278    /// to establish a new connection.
279    pub fn is_connected(&self) -> bool {
280        self.connected.load(Ordering::Relaxed)
281    }
282
283
284    // ─────────────────────────────────────────────────────────────────────
285    // Hot-path reads (zero-cost — no async, no lock)
286    // ─────────────────────────────────────────────────────────────────────
287
288    /// Get latest ticker for `symbol`, or `None` if not yet received.
289    ///
290    /// # Arguments
291    /// - `symbol`: symbol to retrieve for
292    ///
293    /// # Returns
294    /// - current ticker if available
295    pub fn get_ticker(&self, symbol: &str) -> Option<Ticker> {
296        self.ticker_rx.borrow().get(symbol).cloned()
297    }
298
299    /// Get Latest mark price for `symbol`.
300    ///
301    /// # Arguments
302    /// - `symbol`: symbol to retrieve for
303    ///
304    /// # Returns
305    /// - current price if available
306    pub fn get_price(&self, symbol: &str) -> Option<f64> {
307        self.ticker_rx.borrow().get(symbol).map(|x| x.mark_price)
308    }
309
310    /// All current tickers, keyed by symbol.
311    ///
312    /// # Returns
313    /// - all current tickers
314    pub fn get_tickers(&self) -> HashMap<String, Ticker> {
315        self.ticker_rx.borrow().clone()
316    }
317
318    /// Get Current account margin.
319    pub fn get_margin(&self) -> Margin {
320        self.account_rx.borrow().margin.clone()
321    }
322
323    /// Get current position for `symbol`.
324    ///
325    /// # Arguments
326    /// - `symbol`: symbol to retrieve for
327    ///
328    /// # Returns
329    /// - current position for symbol if available
330    pub fn get_position(&self, symbol: &str) -> Option<PositionInfo> {
331        self.account_rx.borrow().positions.get(symbol).cloned()
332    }
333
334    /// Get all positions.
335    pub fn get_positions(&self) -> HashMap<String, PositionInfo> {
336        self.account_rx.borrow().positions.clone()
337    }
338
339    /// Current leverage setting for `symbol`.
340    ///
341    /// # Arguments
342    /// - `symbol`: symbol to retrieve for
343    ///
344    /// # Returns
345    /// - current leverage if available
346    pub fn get_leverage(&self, symbol: &str) -> Option<f64> {
347        self.account_rx
348            .borrow()
349            .leverage_settings
350            .get(symbol)
351            .map(|l| l.leverage)
352    }
353
354    // ─────────────────────────────────────────────────────────────────────
355    // Async reads that wait for changes
356    // ─────────────────────────────────────────────────────────────────────
357
358    /// Block until any ticker changes, then return the updated map.
359    pub async fn wait_tickers_changed(&mut self) -> eyre::Result<HashMap<String, Ticker>> {
360        self.ticker_rx.changed().await?;
361        Ok(self.ticker_rx.borrow().clone())
362    }
363
364    /// Block until account state changes (margin, positions, orders, leverage).
365    pub async fn wait_account_changed(&mut self) -> eyre::Result<AccountState> {
366        self.account_rx.changed().await?;
367        Ok(self.account_rx.borrow().clone())
368    }
369
370    // ─────────────────────────────────────────────────────────────────────
371    // Cold reads (round-trip through actor)
372    // ─────────────────────────────────────────────────────────────────────
373
374    /// Open orders, optionally filtered by symbol.
375    ///
376    /// # Arguments
377    /// - `symbol`: optional symbol to retrieve for
378    ///
379    /// # Returns
380    /// - current orders or order status
381    pub async fn open_orders(&self, symbol: Option<&str>) -> eyre::Result<Vec<OrderState>> {
382        let (tx, rx) = oneshot::channel();
383        self.cmd_tx
384            .send(Command::GetOrders {
385                symbol: symbol.map(Into::into),
386                respond: tx,
387            })
388            .await
389            .map_err(|_| eyre::eyre!("actor gone"))?;
390        Ok(rx.await?)
391    }
392
393    // ─────────────────────────────────────────────────────────────────────
394    // Order placement
395    // ─────────────────────────────────────────────────────────────────────
396
397    /// Place one or more actions (limit, market, cancel, cancel-all).
398    /// Signs the bundle, sends through the actor, and awaits the exchange
399    /// response with the configured timeout.
400    ///
401    /// # Arguments
402    /// - `actions`: list of orders, cancels, etc
403    /// - `nonce`: nonce to be used
404    ///
405    /// # Returns
406    /// - list of responses
407    pub async fn place_orders(
408        &self,
409        actions: Vec<Action>,
410        account: Option<Pubkey>,
411        nonce: Option<u64>,
412    ) -> eyre::Result<Vec<Response>> {
413        let signer = self
414            .signer
415            .as_ref()
416            .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
417
418        let account = if let Some(account) = account {
419            account
420        } else {
421            signer.public_key()
422        };
423
424        let nonce = nonce.unwrap_or_else(make_nonce);
425        let pk = signer.public_key();
426
427        // Build + sign the transaction
428        let mut tx = Transaction {
429            actions,
430            nonce,
431            account,
432            signer: signer.public_key(),
433            signature: Default::default(),
434        };
435        tx.sign(signer)?;
436
437        let request_id = self
438            .next_request_id
439            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
440
441        // Build JSON body via tx serialization
442        let body = serde_json::to_string(&tx)?;
443        let json = format!(
444            r#"{{"method":"post","request":{{"type":"action","payload":{}}},"id":{}}}"#,
445            body, request_id
446        );
447
448        let (resp_tx, resp_rx) = oneshot::channel();
449
450        self.cmd_tx
451            .send(Command::Tx {
452                request_id,
453                json,
454                respond: resp_tx,
455            })
456            .await
457            .map_err(|_| eyre::eyre!("client is disconnected — call connect() to reconnect"))?;
458
459        match time::timeout(self.default_timeout, resp_rx).await {
460            Ok(Ok(result)) => result,
461            Ok(Err(_)) => bail!("response channel dropped"),
462            Err(_) => bail!("order request {request_id} timed out"),
463        }
464    }
465
466    /// Send oracle price updates
467    ///
468    /// # Arguments
469    /// - `actions`: list of oracle update
470    /// - `nonce`: nonce to be used
471    ///
472    /// # Returns
473    /// - list of responses
474    pub async fn update_oracle(
475        &self,
476        actions: Vec<Price>,
477        account: Option<Pubkey>,
478        nonce: Option<u64>,
479    ) -> eyre::Result<()> {
480        let signer = self
481            .signer
482            .as_ref()
483            .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
484
485        let account = if let Some(account) = account {
486            account
487        } else {
488            signer.public_key()
489        };
490
491        let nonce = nonce.unwrap_or_else(make_nonce);
492
493        // Build + sign the transaction
494        let mut tx = Transaction {
495            actions: actions.iter().map(|a| a.clone().into()).collect(),
496            nonce,
497            account,
498            signer: signer.public_key(),
499            signature: Default::default(),
500        };
501        tx.sign(signer)?;
502
503        let request_id = self
504            .next_request_id
505            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
506
507        // Build JSON body via tx serialization
508        let body = serde_json::to_string(&tx)?;
509        let json = format!(
510            r#"{{"method":"post","request":{{"type":"action","payload":{}}},"id":{}}}"#,
511            body, request_id
512        );
513
514        self.cmd_tx
515            .send(Command::AsyncTx {
516                json,
517            })
518            .await
519            .map_err(|_| eyre::eyre!("client is disconnected — call connect() to reconnect"))
520
521    }
522
523    // ── Convenience wrappers ─────────────────────────────────────────────
524
525    /// Place limit order
526    ///
527    /// # Arguments
528    /// - `symbol`: which market to execute in
529    /// - `side`: buy or sell
530    /// - `price`: limit price
531    /// - `size`: order size
532    /// - `tif`: time in force
533    /// - `reduce_only`: true if order is reduce only
534    ///
535    /// # Returns
536    /// - response for order placement
537    pub async fn place_limit_order(
538        &self,
539        symbol: &str,
540        side: Side,
541        price: f64,
542        size: f64,
543        tif: TimeInForce,
544        reduce_only: bool,
545        account: Option<Pubkey>,
546        nonce: Option<u64>,
547    ) -> eyre::Result<Response> {
548        let signer = self
549            .signer
550            .as_ref()
551            .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
552
553        let account = if let Some(account) = account {
554            account
555        } else {
556            signer.public_key()
557        };
558
559        let nonce = nonce.unwrap_or_else(make_nonce);
560        let order = LimitOrder {
561            symbol: Arc::from(symbol),
562            is_buy: side == Side::Buy,
563            price,
564            size,
565            tif,
566            reduce_only,
567            iso: false,
568            meta: ActionMeta {
569                account,
570                nonce,
571                seqno: 0,
572                hash: None,
573            }
574        };
575        let resps = self.place_orders(vec![order.into()], None, None).await?;
576        resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
577    }
578
579    /// Place market order
580    ///
581    /// # Arguments
582    /// - `symbol`: which market to execute in
583    /// - `side`: buy or sell
584    /// - `size`: order size
585    /// - `reduce_only`: true if order is reduce only
586    ///
587    /// # Returns
588    /// - response for order placement
589    pub async fn place_market_order(
590        &self,
591        symbol: &str,
592        side: Side,
593        size: f64,
594        reduce_only: bool,
595        account: Option<Pubkey>,
596        nonce: Option<u64>,
597    ) -> eyre::Result<Response> {
598        let signer = self
599            .signer
600            .as_ref()
601            .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
602
603        let account = if let Some(account) = account {
604            account
605        } else {
606            signer.public_key()
607        };
608
609        let nonce = nonce.unwrap_or_else(make_nonce);
610        let order = MarketOrder {
611            symbol: Arc::from(symbol),
612            is_buy: side == Side::Buy,
613            size,
614            reduce_only,
615            iso: false,
616            meta: ActionMeta {
617                account,
618                nonce,
619                seqno: 0,
620                hash: None,
621            }
622        };
623
624        let resps = self.place_orders(vec![order.into()], None, None).await?;
625        resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
626    }
627
628    /// Cancel order
629    ///
630    /// # Arguments
631    /// - `symbol`: which market to execute in
632    /// - `order_id`: order ID to cancel
633    ///
634    /// # Returns
635    /// - response for order cancel
636    pub async fn cancel_order(
637        &self,
638        symbol: &str,
639        order_id: &str,
640        account: Option<Pubkey>,
641        nonce: Option<u64>,
642    ) -> eyre::Result<Response> {
643        let signer = self
644            .signer
645            .as_ref()
646            .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
647
648        let account = if let Some(account) = account {
649            account
650        } else {
651            signer.public_key()
652        };
653
654        let nonce = nonce.unwrap_or_else(make_nonce);
655        let cancel = CancelOrder {
656            symbol: symbol.to_string(),
657            oid: Hash::from_str(&order_id)?,
658            meta: ActionMeta {
659                account,
660                nonce,
661                seqno: 0,
662                hash: None,
663            }
664        };
665
666        let resps = self.place_orders(vec![cancel.into()], None, None).await?;
667        resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
668    }
669
670    /// Cancel all order
671    ///
672    /// # Arguments
673    /// - `symbols`: which symbols to cancel
674    ///
675    /// # Returns
676    /// - response for order cancel
677    pub async fn cancel_all(
678        &self, 
679        symbols: Vec<String>,
680        account: Option<Pubkey>,
681        nonce: Option<u64>,
682    ) -> eyre::Result<Response> {
683        let signer = self
684            .signer
685            .as_ref()
686            .ok_or_else(|| eyre::eyre!("Private key required for trading operations"))?;
687
688        let account = if let Some(account) = account {
689            account
690        } else {
691            signer.public_key()
692        };
693
694        let nonce = nonce.unwrap_or_else(make_nonce);
695        let cancel = CancelAll {
696            symbols,
697            meta: ActionMeta {
698                account,
699                nonce,
700                seqno: 0,
701                hash: None,
702            }
703        };
704        let resps = self.place_orders(vec![cancel.into()], None, None).await?;
705        resps.into_iter().next().ok_or_else(|| eyre::eyre!("empty response"))
706    }
707
708
709    // ─────────────────────────────────────────────────────────────────────
710    // Subscriptions
711    // ─────────────────────────────────────────────────────────────────────
712
713    /// Subscribe to disconnect notifications.
714    ///
715    /// The returned receiver fires exactly once, carrying the human-readable
716    /// disconnect reason, when the actor exits for any reason (server close,
717    /// network error, or explicit [`shutdown`]).
718    ///
719    /// Use this as a *poison pill* for any tasks you spawned that should stop
720    /// when the connection is lost:
721    ///
722    /// ```text
723    /// let mut rx = client.subscribe_disconnect();
724    /// tokio::spawn(async move {
725    ///     let _ = rx.recv().await; // blocks until disconnect
726    ///     // clean up your task here
727    /// });
728    /// ```
729    pub fn subscribe_disconnect(&self) -> broadcast::Receiver<String> {
730        self.disconnect_tx.subscribe()
731    }
732
733    /// Subscribe to ticker for `symbol`.
734    ///
735    /// # Arguments
736    /// - `symbol`: symbol to subscrive to
737    pub async fn subscribe_ticker(&self, symbol: &str) -> eyre::Result<()> {
738        self.subscribe(vec![
739            SubscriptionRequest::new("ticker", json!({ "symbol": symbol })),
740        ]).await
741    }
742
743    /// Subscribe to fills for `symbol`.
744    ///
745    /// # Arguments
746    /// - `symbols`: list of symbol to subscribe to
747    pub async fn subscribe_trades(&self, symbols: &[&str]) -> eyre::Result<()> {
748        let subs = symbols
749            .iter()
750            .map(|s| SubscriptionRequest::new("trades", json!({ "symbol": s })))
751            .collect();
752        self.subscribe(subs).await
753    }
754
755    /// Subscribe to L2 snapshots for `symbol`.
756    ///
757    /// # Arguments
758    /// - `symbol`: symbol to subscribe to
759    pub async fn subscribe_l2_snapshot(
760        &self,
761        symbol: &str,
762        nlevels: Option<u32>,
763    ) -> eyre::Result<()> {
764        let mut params = json!({ "symbol": symbol });
765        if let Some(n) = nlevels {
766            params["nlevels"] = json!(n);
767        }
768        self.subscribe(vec![SubscriptionRequest::new("l2Snapshot", params)]).await
769    }
770
771    /// Subscribe to L2 deltas for `symbol`.
772    ///
773    /// # Arguments
774    /// - `symbol`: symbol to subscribe to
775    pub async fn subscribe_l2_delta(&self, symbol: &str) -> eyre::Result<()> {
776        self.subscribe(vec![
777            SubscriptionRequest::new("l2Delta", json!({ "symbol": symbol })),
778        ]).await
779    }
780
781    /// Subscribe to candles for `symbol`.
782    ///
783    /// # Arguments
784    /// - `symbol`: symbol to subscribe to
785    /// - `interval`: bar period ("1min", "5min", ...)
786    pub async fn subscribe_candles(&self, symbol: &str, interval: &str) -> eyre::Result<()> {
787        self.subscribe(vec![SubscriptionRequest::new(
788            "candle",
789            json!({ "symbol": symbol, "interval": interval }),
790        )])
791            .await
792    }
793
794    /// Subscribe list of subscription requests
795    ///
796    /// # Arguments
797    /// - `subs`: subscription list
798    async fn subscribe(&self, subs: Vec<SubscriptionRequest>) -> eyre::Result<()> {
799        self.cmd_tx
800            .send(Command::Subscribe(subs))
801            .await
802            .map_err(|_| eyre::eyre!("actor gone"))?;
803        Ok(())
804    }
805
806    // ─────────────────────────────────────────────────────────────────────
807    // Event handlers
808    // ─────────────────────────────────────────────────────────────────────
809
810    /// Register a callback for a topic. The callback runs synchronously
811    /// inside the actor loop — keep it fast or `tokio::spawn` from within.
812    ///
813    /// # Argument
814    /// - `topic`: topic to subscribe to
815    /// - `handler`: callback for topic
816    pub async fn on(&self, topic: Topic, handler: impl Fn(&Event) + Send + Sync + 'static) {
817        self.handlers.lock().unwrap().entry(topic).or_default().push(Box::new(handler));
818    }
819}
820
821// ═════════════════════════════════════════════════════════════════════════════
822// Actor — owns all mutable state, runs in a single task
823// ═════════════════════════════════════════════════════════════════════════════
824
825type WsWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
826type WsReader = futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
827
828struct Actor {
829    // WebSocket write half
830    ws_write: WsWriter,
831    // Event sender
832    event_tx: mpsc::Sender<(Topic,Event)>,
833
834    // Inbound commands from the client handle
835    cmd_rx: mpsc::Receiver<Command>,
836
837    // ── Watch senders (actor pushes, clients read) ─────────────────────
838    ticker_tx: watch::Sender<HashMap<String, Ticker>>,
839    account_tx: watch::Sender<AccountState>,
840
841    // ── Owned state (no locks) ─────────────────────────────────────────
842    tickers: HashMap<String, Ticker>,
843    prices: HashMap<String, f64>,
844    account_state: AccountState,
845    pending: HashMap<u64, oneshot::Sender<eyre::Result<Vec<Response>>>>,
846
847    // Subscription log (for reconnection replay)
848    subscriptions: Vec<SubscriptionRequest>,
849
850    // ── Disconnect signalling ─────────────────────────────────────────────
851    /// Shared flag — flipped to `false` before the `Disconnected` event fires.
852    connected: Arc<AtomicBool>,
853    /// Broadcast once with the disconnect reason when the actor exits.
854    disconnect_tx: broadcast::Sender<String>,
855}
856
857impl Actor {
858    /// Run loop
859    async fn run(
860        mut self,
861        mut ws_read: WsReader,
862        initial_subs: Vec<SubscriptionRequest>,
863    ) {
864        // Send initial subscriptions
865        if !initial_subs.is_empty() {
866            if let Err(e) = self.send_subscribe(&initial_subs).await {
867                error!("Initial subscription failed: {e}");
868                return;
869            }
870            self.subscriptions = initial_subs;
871        }
872
873        // Emit Connected so handlers can react immediately.
874        self.emit(Topic::Status, &Event::Connected);
875
876        // Use a labelled loop so every exit path carries an explicit reason.
877        let disconnect_reason: String = 'actor: loop {
878            tokio::select! {
879                // Inbound WebSocket message
880                msg = ws_read.next() => {
881                    match msg {
882                        Some(Ok(Message::Text(text))) => {
883                            debug!("msg {}: {}", text.len(), &text[0..512.min(text.len())]);
884                            match serde_json::from_str::<Value>(&text) {
885                                Ok(data) => self.handle_message(data, &text).await,
886                                Err(e) => error!("JSON decode error: {e}"),
887                            }
888                        }
889                        Some(Ok(Message::Close(_))) => {
890                            warn!("WebSocket closed by server");
891                            break 'actor "server closed the connection".into();
892                        }
893                        Some(Err(e)) => {
894                            error!("WebSocket read error: {e}");
895                            break 'actor format!("WebSocket read error: {e}");
896                        }
897                        None => {
898                            warn!("WebSocket stream ended");
899                            break 'actor "WebSocket stream ended".into();
900                        }
901                        _ => {} // Ping/Pong handled by tungstenite
902                    }
903                }
904
905                // Command from client handle
906                cmd = self.cmd_rx.recv() => {
907                    match cmd {
908                        Some(Command::Subscribe(subs)) => {
909                            if let Err(e) = self.send_subscribe(&subs).await {
910                                error!("Subscription send error: {e}");
911                            }
912                            self.subscriptions.extend(subs);
913                        }
914
915                        Some(Command::Tx { request_id, json, respond }) => {
916                            self.pending.insert(request_id, respond);
917                            if let Err(e) = self.ws_send_text(&json).await {
918                                error!("Order send error: {e}");
919                                if let Some(tx) = self.pending.remove(&request_id) {
920                                    let _ = tx.send(Err(e));
921                                }
922                            }
923                        }
924
925                        Some(Command::AsyncTx { json}) => {
926                            if let Err(e) = self.ws_send_text(&json).await {
927                                error!("Order send error: {e}");
928                            }
929                        }
930
931                        Some(Command::SendRaw(json)) => {
932                            if let Err(e) = self.ws_send_text(&json).await {
933                                error!("Raw send error: {e}");
934                            }
935                        }
936
937                        Some(Command::GetOrders { symbol, respond }) => {
938                            let orders = match symbol {
939                                Some(s) => self.account_state.open_orders
940                                    .values()
941                                    .filter(|o| o.symbol == s)
942                                    .cloned()
943                                    .collect(),
944                                None => self.account_state.open_orders.values().cloned().collect(),
945                            };
946                            let _ = respond.send(orders);
947                        }
948
949                        Some(Command::Shutdown) | None => {
950                            info!("Actor shutting down (requested)");
951                            break 'actor "shutdown requested".into();
952                        }
953                    }
954                }
955            }
956        }; // end 'actor loop
957
958        self.handle_disconnect(disconnect_reason).await;
959    }
960
961    /// Shared teardown called from every exit path in `run()`.
962    ///
963    /// Order of operations:
964    /// 1. Flip `connected` flag so callers see `is_connected() == false` immediately.
965    /// 2. Fail every in-progress `place_orders` that is waiting on a oneshot response.
966    /// 3. Emit `Event::Disconnected` on `Topic::Status` so registered handlers fire.
967    /// 4. Broadcast the reason string to all `subscribe_disconnect()` receivers
968    ///    — this is the poison pill for any other spawned tasks.
969    /// 5. Close the WebSocket write half.
970    async fn handle_disconnect(&mut self, reason: String) {
971        // 1. Mark disconnected — visible to all handles immediately.
972        self.connected.store(false, Ordering::Release);
973
974        // 2. Fail every pending order response.
975        let err_msg = format!("disconnected: {reason}");
976        for (_, tx) in self.pending.drain() {
977            let _ = tx.send(Err(eyre::eyre!("{}", err_msg)));
978        }
979
980        // 3. Emit the Disconnected event to registered handlers.
981        self.emit(Topic::Status, &Event::Disconnected(reason.clone()));
982
983        // 4. Broadcast reason as poison pill (best-effort; ignore no-receivers).
984        let _ = self.disconnect_tx.send(reason.clone());
985
986        // 5. Close WS write half (ignore error — connection may already be gone).
987        let _ = self.ws_write.close().await;
988
989        info!("Actor stopped: {reason}");
990    }
991
992    /// WS send
993    async fn ws_send_text(&mut self, text: &str) -> eyre::Result<()> {
994        let len = text.len();
995        debug!("sending msg len: {}", len);
996        self.ws_write
997            .send(Message::Text(text.into()))
998            .await
999            .map_err(|e| eyre::eyre!("ws write: {e}"))?;
1000        Ok(())
1001    }
1002
1003    // ─────────────────────────────────────────────────────────────────────
1004    // Message dispatch
1005    // ─────────────────────────────────────────────────────────────────────
1006
1007    async fn handle_message(&mut self, data: Value, json: &str) {
1008        let msg_type = data["type"].as_str().unwrap_or("");
1009
1010        match msg_type {
1011            "subscriptionResponse" => {
1012                info!(
1013                    "Subscription confirmed: {:?}",
1014                    data["topics"].as_array().map(|a| a.len())
1015                );
1016            }
1017
1018            "ticker" => {
1019                let ticker_v = &data["data"]["ticker"];
1020                if let Ok(ticker) = serde_json::from_value::<Ticker>(ticker_v.clone()) {
1021                    self.prices.insert(ticker.symbol.clone(), ticker.mark_price);
1022                    self.tickers.insert(ticker.symbol.clone(), ticker.clone());
1023
1024                    // Push watch updates
1025                    let _ = self.ticker_tx.send(self.tickers.clone());
1026
1027                    self.emit(Topic::Ticker, &Event::Ticker(ticker.clone()));
1028                    debug!("Ticker: {} mark={:.2}", ticker.symbol, ticker.mark_price);
1029                } else {
1030                    error!("Could not parse ticker event: {:?}", ticker_v);
1031                }
1032            }
1033
1034            "trades" => {
1035                if let Ok(trades) = serde_json::from_value::<Vec<Fill>>(data["data"].clone()) {
1036                    self.emit(Topic::Trades, &Event::Trades(trades));
1037                } else {
1038                    error!("Could not parse trades event: {:?}", data["data"]);
1039                }
1040            }
1041
1042            "l2Snapshot" => {
1043                if let Ok(l2_snapshot) = serde_json::from_value::<L2Snapshot>(data["data"]["book"].clone()) {
1044                    self.emit(Topic::L2Snapshot, &Event::L2Snapshot(l2_snapshot));
1045                } else {
1046                    error!("Could not parse l2_snapshot event: msg: {:?}", data["data"]);
1047                }
1048            }
1049
1050            "l2Delta" => {
1051                if let Ok(l2_delta) = serde_json::from_value::<L2Snapshot>(data["data"]["book"].clone()) {
1052                    self.emit(Topic::L2Delta, &Event::L2Delta(l2_delta));
1053                } else {
1054                    error!("Could not parse l2_delta event: {:?}", data["data"]);
1055                }
1056            }
1057
1058            "candle" => {
1059                if let Ok(candle) = serde_json::from_value::<Candle>(data["data"].clone()) {
1060                    self.emit(Topic::Candle, &Event::Candle(candle));
1061                } else {
1062                    error!("Could not parse candle event: {:?}", data["data"]);
1063                }
1064            }
1065
1066            "account" => {
1067                self.handle_account(&data["data"]).await;
1068            }
1069
1070            "post" => {
1071                self.handle_post_response(&data, json);
1072            }
1073
1074            other => {
1075                debug!("Unhandled message type: {other}");
1076            }
1077        }
1078    }
1079
1080    // ─────────────────────────────────────────────────────────────────────
1081    // Account updates
1082    // ─────────────────────────────────────────────────────────────────────
1083
1084    async fn handle_account(&mut self, data: &Value) {
1085        let update_type = data["type"].as_str().unwrap_or("");
1086
1087        match update_type {
1088            "accountSnapshot" => {
1089                if let Ok(margin) = serde_json::from_value::<Margin>(data["margin"].clone()) {
1090                    self.account_state.margin = margin.clone();
1091                    self.emit(Topic::Margin, &Event::Margin(margin))
1092                }
1093
1094                if let Ok(positions) = serde_json::from_value::<Vec<PositionInfo>>(data["positions"].clone()) {
1095                    for position in &positions {
1096                        self.emit(Topic::Position, &Event::Position(position.clone()))
1097                    }
1098                    self.account_state.positions = positions
1099                        .into_iter()
1100                        .map(|p| (p.symbol.clone(), p.clone()))
1101                        .collect();
1102                }
1103
1104                if let Ok(orders) = serde_json::from_value::<Vec<OrderState>>(data["openOrders"].clone()) {
1105                    for order in &orders {
1106                        self.emit(Topic::Order, &Event::Order(order.clone()))
1107                    }
1108                    self.account_state.open_orders = orders
1109                        .into_iter()
1110                        .map(|o| (o.order_id.clone(), o))
1111                        .collect();
1112                }
1113
1114                if let Ok(leverages) = serde_json::from_value::<Vec<LeverageSetting>>(data["leverageSettings"].clone()) {
1115                    self.emit(Topic::Leverage, &Event::Leverage(leverages.clone()));
1116                    for l in leverages {
1117                        self.account_state.leverage_settings.insert(l.symbol.clone(), l);
1118                    }
1119                }
1120
1121                info!(
1122                    "Account snapshot: balance={:.2}, positions={}, orders={}",
1123                    self.account_state.margin.total_balance,
1124                    self.account_state.positions.len(),
1125                    self.account_state.open_orders.len(),
1126                );
1127            }
1128
1129            "orderUpdate" => {
1130                if let Ok(order) = serde_json::from_value::<OrderState>(data.clone()) {
1131                    let oid = order.order_id.clone();
1132                    if order.status.is_terminal() {
1133                        self.account_state.open_orders.remove(&oid);
1134                    } else {
1135                        self.account_state.open_orders.insert(oid, order.clone());
1136                    }
1137                    self.emit(Topic::Order, &Event::Order(order));
1138                } else {
1139                    error!("Could not parse order event: {:?}", data);
1140                }
1141            }
1142
1143            "marginUpdate" => {
1144                if let Ok(margin) = serde_json::from_value::<Margin>(data.clone()) {
1145                    self.account_state.margin = margin.clone();
1146                    self.publish_account();
1147                    self.emit(Topic::Margin, &Event::Margin(margin));
1148                } else {
1149                    error!("Could not parse margin event: {:?}", data);
1150                }
1151            }
1152
1153            "positionUpdate" => {
1154                if let Ok(pos) = serde_json::from_value::<PositionInfo>(data.clone()) {
1155                    self.account_state.positions.insert(pos.symbol.clone(), pos.clone());
1156                    self.emit(Topic::Position, &Event::Position(pos));
1157                    self.publish_account();
1158                } else {
1159                    error!("Could not parse position event: {:?}", data);
1160                }
1161            }
1162
1163            "fill" => {
1164                if let Ok(fill) = serde_json::from_value::<Fill>(data.clone()) {
1165                    let dir = fill.side.dir();
1166                    if let Some(order) = self.account_state.open_orders.get_mut(&fill.order_id) {
1167                        order.filled_size += fill.size;
1168                        order.signed_size -= dir * fill.size;
1169                        if order.signed_size * dir <= 0.0 {
1170                            self.account_state.open_orders.remove(&fill.order_id);
1171                        }
1172                    }
1173                    self.publish_account();
1174                    self.emit(Topic::Fill, &Event::Fill(fill.clone()));
1175                    info!(
1176                        "Fill: {} {:?} {} @ {} maker={}",
1177                        fill.symbol, fill.side, fill.size, fill.price, fill.is_maker,
1178                    );
1179                } else {
1180                    error!("Could not parse fill event: {:?}", data);
1181                }
1182            }
1183
1184            "leverageUpdate" => {
1185                if let Ok(leverages) = serde_json::from_value::<Vec<LeverageSetting>>(data["leverage"].clone()) {
1186                    for lev in &leverages {
1187                        self.account_state.leverage_settings.insert(lev.symbol.clone(), lev.clone());
1188                    }
1189                    self.emit(Topic::Leverage, &Event::Leverage(leverages.clone()));
1190                    self.publish_account();
1191                } else {
1192                    error!("Could not parse leverage event: {:?}", data);
1193                }
1194            }
1195
1196            _ => {
1197                debug!("Unknown account update: {update_type}");
1198            }
1199        }
1200    }
1201
1202    // ─────────────────────────────────────────────────────────────────────
1203    // Post (order) response
1204    // ─────────────────────────────────────────────────────────────────────
1205
1206    fn handle_post_response(&mut self, data: &Value, _json: &str) {
1207        let request_id = data["id"].as_u64().unwrap_or(0);
1208        let inner = &data["data"];
1209        let rtype = inner["type"].as_str().unwrap_or("");
1210        let sender = self.pending.remove(&request_id);
1211
1212        match rtype {
1213            "action"=> {
1214                let payload = &inner["payload"];
1215                let status = payload["status"].as_str().unwrap_or("");
1216
1217                if status != "ok" {
1218                    error!("Order request {request_id} failed: {status}");
1219                    if let Some(tx) = sender {
1220                        let _ = tx.send(Err(eyre::eyre!("order request failed: {}", data)));
1221                    }
1222                    self.emit(Topic::Error, &Event::Error(data.clone()));
1223                } else {
1224                    let responses = Response::parse_responses(data);
1225                    if let Some(tx) = sender {
1226                        let _ = tx.send(Ok(responses));
1227                    }
1228                }
1229            }
1230            "ack" => {
1231                let ok = inner["ok"].as_bool().unwrap_or(false);
1232                let response = if ok {
1233                    Response {
1234                        order_id: None,
1235                        status: "OK".to_string(),
1236                        message: None,
1237                        raw: inner.clone(),
1238                    }
1239                } else {
1240                    let message = inner["message"].as_str().unwrap_or("");
1241                    Response {
1242                        order_id: None,
1243                        status: "Error".to_string(),
1244                        message: Some(message.to_string()),
1245                        raw: inner.clone(),
1246                    }
1247                };
1248                if let Some(tx) = sender {
1249                    let _ = tx.send(Ok(vec![response]));
1250                }
1251            }
1252            _ => panic!("unknown response type: {}", rtype),
1253        }
1254    }
1255
1256    // ─────────────────────────────────────────────────────────────────────
1257    // Helpers
1258    // ─────────────────────────────────────────────────────────────────────
1259
1260    /// Publish the current account state snapshot to the watch channel.
1261    fn publish_account(&self) {
1262        let _ = self.account_tx.send(self.account_state.clone());
1263    }
1264
1265    /// Fire all handlers registered for `topic`.
1266    fn emit(&self, topic: Topic, data: &Event) {
1267        let _ = self.event_tx.try_send((topic, data.clone()));
1268    }
1269
1270    /// Send a JSON value over the WebSocket.
1271    async fn ws_send_json(&mut self, value: &Value) -> eyre::Result<()> {
1272        let text = serde_json::to_string(value)?;
1273        self.ws_write
1274            .send(Message::Text(text.into()))
1275            .await
1276            .map_err(|e| eyre::eyre!("ws write: {e}"))?;
1277        Ok(())
1278    }
1279
1280    /// Send subscription request(s) over the WebSocket.
1281    async fn send_subscribe(&mut self, subs: &[SubscriptionRequest]) -> eyre::Result<()> {
1282        let request = json!({
1283            "method": "subscribe",
1284            "subscription": subs.iter().map(|s| s.to_json()).collect::<Vec<_>>(),
1285        });
1286        self.ws_send_json(&request).await?;
1287        info!("Subscribed to {} topics", subs.len());
1288        Ok(())
1289    }
1290}