pub struct BulkWsClient { /* private fields */ }Expand description
Cloneable client handle.
- Hot reads (ticker, price, margin):
watch::Receiver::borrow()— zero async overhead, just a ref-counted pointer swap. - Writes (subscribe, place orders): go through the
mpsccommand channel to the actor, which serializes all mutations. - Cold reads (open orders list): round-trip through the actor via
oneshot— still fast, but async.
Implementations§
Source§impl BulkWsClient
impl BulkWsClient
Sourcepub async fn connect(config: WSConfig) -> Result<Self>
pub async fn connect(config: WSConfig) -> Result<Self>
Connect to the exchange and spawn the actor task. Returns immediately once the WebSocket handshake succeeds.
§Arguments
config: web socket config
Examples found in repository?
34async fn main() -> eyre::Result<()> {
35 tracing_subscriber::fmt()
36 .with_env_filter(
37 EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())
38 )
39 .init();
40
41 let args = Args::parse();
42
43 info!("Connecting to {} for symbols: {:?}", args.url, args.symbols);
44 let client = BulkWsClient::connect(WSConfig {
45 url: args.url,
46 symbols: args.symbols.clone(),
47 signer: None,
48 ..Default::default()
49 })
50 .await?;
51
52 // ── Subscribe to L2 snapshots ───────────────────────────────────────
53 if args.l2_levels > 0 {
54 for sym in &args.symbols {
55 client
56 .subscribe_l2_snapshot(sym, Some(args.l2_levels))
57 .await?;
58 }
59 }
60
61 // ── Register event handlers ─────────────────────────────────────────
62 client
63 .on(Topic::Ticker, |event| {
64 if let Event::Ticker(t) = event {
65 info!(
66 "[TICKER] {:<10} mark={:1.2} last={:1.2} funding={:.6} vol={:.2}",
67 t.symbol, t.mark_price, t.last_price, t.funding_rate, t.volume,
68 );
69 }
70 })
71 .await;
72
73 client
74 .on(Topic::L2Snapshot, |event| {
75 if let Event::L2Snapshot(book) = event {
76 let (bids, asks) = &book.levels;
77 let best_bid = bids.first().map(|l| l.price).unwrap_or(0.0);
78 let best_ask = asks.first().map(|l| l.price).unwrap_or(0.0);
79 let spread = best_ask - best_bid;
80 info!(
81 "[L2] {:<10} bid={:1.2} ask={:1.2} spread={:.2} levels={}x{}",
82 book.symbol,
83 best_bid,
84 best_ask,
85 spread,
86 bids.len(),
87 asks.len(),
88 );
89 }
90 })
91 .await;
92
93 client
94 .on(Topic::Error, |event| {
95 if let Event::Error(e) = event {
96 error!("[ERROR] {e}");
97 }
98 })
99 .await;
100
101 // ── Wait for Ctrl-C ─────────────────────────────────────────────────
102 info!("Listening... press Ctrl-C to stop.");
103
104 tokio::signal::ctrl_c().await?;
105 info!("\nShutting down...");
106
107 client.shutdown().await;
108 process::exit(0);
109}Sourcepub async fn shutdown(&self)
pub async fn shutdown(&self)
Shut down the actor and close the WebSocket.
Examples found in repository?
34async fn main() -> eyre::Result<()> {
35 tracing_subscriber::fmt()
36 .with_env_filter(
37 EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())
38 )
39 .init();
40
41 let args = Args::parse();
42
43 info!("Connecting to {} for symbols: {:?}", args.url, args.symbols);
44 let client = BulkWsClient::connect(WSConfig {
45 url: args.url,
46 symbols: args.symbols.clone(),
47 signer: None,
48 ..Default::default()
49 })
50 .await?;
51
52 // ── Subscribe to L2 snapshots ───────────────────────────────────────
53 if args.l2_levels > 0 {
54 for sym in &args.symbols {
55 client
56 .subscribe_l2_snapshot(sym, Some(args.l2_levels))
57 .await?;
58 }
59 }
60
61 // ── Register event handlers ─────────────────────────────────────────
62 client
63 .on(Topic::Ticker, |event| {
64 if let Event::Ticker(t) = event {
65 info!(
66 "[TICKER] {:<10} mark={:1.2} last={:1.2} funding={:.6} vol={:.2}",
67 t.symbol, t.mark_price, t.last_price, t.funding_rate, t.volume,
68 );
69 }
70 })
71 .await;
72
73 client
74 .on(Topic::L2Snapshot, |event| {
75 if let Event::L2Snapshot(book) = event {
76 let (bids, asks) = &book.levels;
77 let best_bid = bids.first().map(|l| l.price).unwrap_or(0.0);
78 let best_ask = asks.first().map(|l| l.price).unwrap_or(0.0);
79 let spread = best_ask - best_bid;
80 info!(
81 "[L2] {:<10} bid={:1.2} ask={:1.2} spread={:.2} levels={}x{}",
82 book.symbol,
83 best_bid,
84 best_ask,
85 spread,
86 bids.len(),
87 asks.len(),
88 );
89 }
90 })
91 .await;
92
93 client
94 .on(Topic::Error, |event| {
95 if let Event::Error(e) = event {
96 error!("[ERROR] {e}");
97 }
98 })
99 .await;
100
101 // ── Wait for Ctrl-C ─────────────────────────────────────────────────
102 info!("Listening... press Ctrl-C to stop.");
103
104 tokio::signal::ctrl_c().await?;
105 info!("\nShutting down...");
106
107 client.shutdown().await;
108 process::exit(0);
109}Sourcepub fn is_connected(&self) -> bool
pub fn is_connected(&self) -> bool
Returns true if the WebSocket actor is still running.
This is a cheap, lock-free check — safe to call in hot loops.
Once it returns false you must call BulkWsClient::connect again
to establish a new connection.
Sourcepub fn get_ticker(&self, symbol: &str) -> Option<Ticker>
pub fn get_ticker(&self, symbol: &str) -> Option<Ticker>
Sourcepub fn get_tickers(&self) -> HashMap<String, Ticker>
pub fn get_tickers(&self) -> HashMap<String, Ticker>
Sourcepub fn get_margin(&self) -> Margin
pub fn get_margin(&self) -> Margin
Get Current account margin.
Sourcepub fn get_position(&self, symbol: &str) -> Option<PositionInfo>
pub fn get_position(&self, symbol: &str) -> Option<PositionInfo>
Sourcepub fn get_positions(&self) -> HashMap<String, PositionInfo>
pub fn get_positions(&self) -> HashMap<String, PositionInfo>
Get all positions.
Sourcepub fn get_leverage(&self, symbol: &str) -> Option<f64>
pub fn get_leverage(&self, symbol: &str) -> Option<f64>
Sourcepub async fn wait_tickers_changed(&mut self) -> Result<HashMap<String, Ticker>>
pub async fn wait_tickers_changed(&mut self) -> Result<HashMap<String, Ticker>>
Block until any ticker changes, then return the updated map.
Sourcepub async fn wait_account_changed(&mut self) -> Result<AccountState>
pub async fn wait_account_changed(&mut self) -> Result<AccountState>
Block until account state changes (margin, positions, orders, leverage).
Sourcepub async fn open_orders(&self, symbol: Option<&str>) -> Result<Vec<OrderState>>
pub async fn open_orders(&self, symbol: Option<&str>) -> Result<Vec<OrderState>>
Sourcepub async fn place_orders(
&self,
actions: Vec<Action>,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> Result<Vec<Response>>
pub async fn place_orders( &self, actions: Vec<Action>, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Vec<Response>>
Sourcepub async fn update_oracle(
&self,
actions: Vec<Price>,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> Result<()>
pub async fn update_oracle( &self, actions: Vec<Price>, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<()>
Sourcepub async fn place_limit_order(
&self,
symbol: &str,
side: Side,
price: f64,
size: f64,
tif: TimeInForce,
reduce_only: bool,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> Result<Response>
pub async fn place_limit_order( &self, symbol: &str, side: Side, price: f64, size: f64, tif: TimeInForce, reduce_only: bool, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Response>
Sourcepub async fn place_market_order(
&self,
symbol: &str,
side: Side,
size: f64,
reduce_only: bool,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> Result<Response>
pub async fn place_market_order( &self, symbol: &str, side: Side, size: f64, reduce_only: bool, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Response>
Sourcepub async fn cancel_order(
&self,
symbol: &str,
order_id: &str,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> Result<Response>
pub async fn cancel_order( &self, symbol: &str, order_id: &str, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Response>
Sourcepub async fn cancel_all(
&self,
symbols: Vec<String>,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> Result<Response>
pub async fn cancel_all( &self, symbols: Vec<String>, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Response>
Sourcepub fn subscribe_disconnect(&self) -> Receiver<String>
pub fn subscribe_disconnect(&self) -> Receiver<String>
Subscribe to disconnect notifications.
The returned receiver fires exactly once, carrying the human-readable
disconnect reason, when the actor exits for any reason (server close,
network error, or explicit [shutdown]).
Use this as a poison pill for any tasks you spawned that should stop when the connection is lost:
let mut rx = client.subscribe_disconnect();
tokio::spawn(async move {
let _ = rx.recv().await; // blocks until disconnect
// clean up your task here
});Sourcepub async fn subscribe_ticker(&self, symbol: &str) -> Result<()>
pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()>
Sourcepub async fn subscribe_trades(&self, symbols: &[&str]) -> Result<()>
pub async fn subscribe_trades(&self, symbols: &[&str]) -> Result<()>
Sourcepub async fn subscribe_l2_snapshot(
&self,
symbol: &str,
nlevels: Option<u32>,
) -> Result<()>
pub async fn subscribe_l2_snapshot( &self, symbol: &str, nlevels: Option<u32>, ) -> Result<()>
Examples found in repository?
34async fn main() -> eyre::Result<()> {
35 tracing_subscriber::fmt()
36 .with_env_filter(
37 EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())
38 )
39 .init();
40
41 let args = Args::parse();
42
43 info!("Connecting to {} for symbols: {:?}", args.url, args.symbols);
44 let client = BulkWsClient::connect(WSConfig {
45 url: args.url,
46 symbols: args.symbols.clone(),
47 signer: None,
48 ..Default::default()
49 })
50 .await?;
51
52 // ── Subscribe to L2 snapshots ───────────────────────────────────────
53 if args.l2_levels > 0 {
54 for sym in &args.symbols {
55 client
56 .subscribe_l2_snapshot(sym, Some(args.l2_levels))
57 .await?;
58 }
59 }
60
61 // ── Register event handlers ─────────────────────────────────────────
62 client
63 .on(Topic::Ticker, |event| {
64 if let Event::Ticker(t) = event {
65 info!(
66 "[TICKER] {:<10} mark={:1.2} last={:1.2} funding={:.6} vol={:.2}",
67 t.symbol, t.mark_price, t.last_price, t.funding_rate, t.volume,
68 );
69 }
70 })
71 .await;
72
73 client
74 .on(Topic::L2Snapshot, |event| {
75 if let Event::L2Snapshot(book) = event {
76 let (bids, asks) = &book.levels;
77 let best_bid = bids.first().map(|l| l.price).unwrap_or(0.0);
78 let best_ask = asks.first().map(|l| l.price).unwrap_or(0.0);
79 let spread = best_ask - best_bid;
80 info!(
81 "[L2] {:<10} bid={:1.2} ask={:1.2} spread={:.2} levels={}x{}",
82 book.symbol,
83 best_bid,
84 best_ask,
85 spread,
86 bids.len(),
87 asks.len(),
88 );
89 }
90 })
91 .await;
92
93 client
94 .on(Topic::Error, |event| {
95 if let Event::Error(e) = event {
96 error!("[ERROR] {e}");
97 }
98 })
99 .await;
100
101 // ── Wait for Ctrl-C ─────────────────────────────────────────────────
102 info!("Listening... press Ctrl-C to stop.");
103
104 tokio::signal::ctrl_c().await?;
105 info!("\nShutting down...");
106
107 client.shutdown().await;
108 process::exit(0);
109}Sourcepub async fn subscribe_l2_delta(&self, symbol: &str) -> Result<()>
pub async fn subscribe_l2_delta(&self, symbol: &str) -> Result<()>
Sourcepub async fn subscribe_candles(
&self,
symbol: &str,
interval: &str,
) -> Result<()>
pub async fn subscribe_candles( &self, symbol: &str, interval: &str, ) -> Result<()>
Subscribe to candles for symbol.
§Arguments
symbol: symbol to subscribe tointerval: bar period (“1min”, “5min”, …)
Sourcepub async fn on(
&self,
topic: Topic,
handler: impl Fn(&Event) + Send + Sync + 'static,
)
pub async fn on( &self, topic: Topic, handler: impl Fn(&Event) + Send + Sync + 'static, )
Register a callback for a topic. The callback runs synchronously
inside the actor loop — keep it fast or tokio::spawn from within.
§Argument
topic: topic to subscribe tohandler: callback for topic
Examples found in repository?
34async fn main() -> eyre::Result<()> {
35 tracing_subscriber::fmt()
36 .with_env_filter(
37 EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())
38 )
39 .init();
40
41 let args = Args::parse();
42
43 info!("Connecting to {} for symbols: {:?}", args.url, args.symbols);
44 let client = BulkWsClient::connect(WSConfig {
45 url: args.url,
46 symbols: args.symbols.clone(),
47 signer: None,
48 ..Default::default()
49 })
50 .await?;
51
52 // ── Subscribe to L2 snapshots ───────────────────────────────────────
53 if args.l2_levels > 0 {
54 for sym in &args.symbols {
55 client
56 .subscribe_l2_snapshot(sym, Some(args.l2_levels))
57 .await?;
58 }
59 }
60
61 // ── Register event handlers ─────────────────────────────────────────
62 client
63 .on(Topic::Ticker, |event| {
64 if let Event::Ticker(t) = event {
65 info!(
66 "[TICKER] {:<10} mark={:1.2} last={:1.2} funding={:.6} vol={:.2}",
67 t.symbol, t.mark_price, t.last_price, t.funding_rate, t.volume,
68 );
69 }
70 })
71 .await;
72
73 client
74 .on(Topic::L2Snapshot, |event| {
75 if let Event::L2Snapshot(book) = event {
76 let (bids, asks) = &book.levels;
77 let best_bid = bids.first().map(|l| l.price).unwrap_or(0.0);
78 let best_ask = asks.first().map(|l| l.price).unwrap_or(0.0);
79 let spread = best_ask - best_bid;
80 info!(
81 "[L2] {:<10} bid={:1.2} ask={:1.2} spread={:.2} levels={}x{}",
82 book.symbol,
83 best_bid,
84 best_ask,
85 spread,
86 bids.len(),
87 asks.len(),
88 );
89 }
90 })
91 .await;
92
93 client
94 .on(Topic::Error, |event| {
95 if let Event::Error(e) = event {
96 error!("[ERROR] {e}");
97 }
98 })
99 .await;
100
101 // ── Wait for Ctrl-C ─────────────────────────────────────────────────
102 info!("Listening... press Ctrl-C to stop.");
103
104 tokio::signal::ctrl_c().await?;
105 info!("\nShutting down...");
106
107 client.shutdown().await;
108 process::exit(0);
109}Trait Implementations§
Source§impl Clone for BulkWsClient
impl Clone for BulkWsClient
Source§fn clone(&self) -> BulkWsClient
fn clone(&self) -> BulkWsClient
1.0.0 (const: unstable) · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read more