pub struct BulkWsClient { /* private fields */ }Expand description
Cloneable client handle.
- Hot reads (ticker, price, margin): `watch::Receiver::borrow()` — zero async overhead, just a ref-counted pointer swap.
- Writes (subscribe, place orders): go through the `mpsc` command channel to the actor, which serializes all mutations.
- Cold reads (open orders list): round-trip through the actor via `oneshot` — still fast, but async.
Implementations§
Source§impl BulkWsClient
impl BulkWsClient
Sourcepub async fn connect(config: WSConfig) -> Result<Self>
pub async fn connect(config: WSConfig) -> Result<Self>
Connect to the exchange and spawn the actor task. Returns immediately once the WebSocket handshake succeeds.
§Arguments
config: WebSocket connection configuration (`WSConfig`)
Examples found in repository?
34async fn main() -> eyre::Result<()> {
35 tracing_subscriber::fmt()
36 .with_env_filter(
37 EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())
38 )
39 .init();
40
41 let args = Args::parse();
42
43 info!("Connecting to {} for symbols: {:?}", args.url, args.symbols);
44 let client = BulkWsClient::connect(WSConfig {
45 url: args.url,
46 symbols: args.symbols.clone(),
47 signer: None,
48 ..Default::default()
49 })
50 .await?;
51
52 // ── Subscribe to L2 snapshots ───────────────────────────────────────
53 if args.l2_levels > 0 {
54 for sym in &args.symbols {
55 client
56 .subscribe_l2_snapshot(sym, Some(args.l2_levels))
57 .await?;
58 }
59 }
60
61 // ── Register event handlers ─────────────────────────────────────────
62 client
63 .on(Topic::Ticker, |event| {
64 if let Event::Ticker(t) = event {
65 info!(
66 "[TICKER] {:<10} mark={:1.2} last={:1.2} funding={:.6} vol={:.2}",
67 t.symbol, t.mark_price, t.last_price, t.funding_rate, t.volume,
68 );
69 }
70 })
71 .await;
72
73 client
74 .on(Topic::L2Snapshot, |event| {
75 if let Event::L2Snapshot(book) = event {
76 let (bids, asks) = &book.levels;
77 let best_bid = bids.first().map(|l| l.price).unwrap_or(0.0);
78 let best_ask = asks.first().map(|l| l.price).unwrap_or(0.0);
79 let spread = best_ask - best_bid;
80 info!(
81 "[L2] {:<10} bid={:1.2} ask={:1.2} spread={:.2} levels={}x{}",
82 book.symbol,
83 best_bid,
84 best_ask,
85 spread,
86 bids.len(),
87 asks.len(),
88 );
89 }
90 })
91 .await;
92
93 client
94 .on(Topic::Error, |event| {
95 if let Event::Error(e) = event {
96 error!("[ERROR] {e}");
97 }
98 })
99 .await;
100
101 // ── Wait for Ctrl-C ─────────────────────────────────────────────────
102 info!("Listening... press Ctrl-C to stop.");
103
104 tokio::signal::ctrl_c().await?;
105 info!("\nShutting down...");
106
107 client.shutdown().await;
108 process::exit(0);
109}Sourcepub async fn shutdown(&self)
pub async fn shutdown(&self)
Shut down the actor and close the WebSocket.
Examples found in repository?
34async fn main() -> eyre::Result<()> {
35 tracing_subscriber::fmt()
36 .with_env_filter(
37 EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())
38 )
39 .init();
40
41 let args = Args::parse();
42
43 info!("Connecting to {} for symbols: {:?}", args.url, args.symbols);
44 let client = BulkWsClient::connect(WSConfig {
45 url: args.url,
46 symbols: args.symbols.clone(),
47 signer: None,
48 ..Default::default()
49 })
50 .await?;
51
52 // ── Subscribe to L2 snapshots ───────────────────────────────────────
53 if args.l2_levels > 0 {
54 for sym in &args.symbols {
55 client
56 .subscribe_l2_snapshot(sym, Some(args.l2_levels))
57 .await?;
58 }
59 }
60
61 // ── Register event handlers ─────────────────────────────────────────
62 client
63 .on(Topic::Ticker, |event| {
64 if let Event::Ticker(t) = event {
65 info!(
66 "[TICKER] {:<10} mark={:1.2} last={:1.2} funding={:.6} vol={:.2}",
67 t.symbol, t.mark_price, t.last_price, t.funding_rate, t.volume,
68 );
69 }
70 })
71 .await;
72
73 client
74 .on(Topic::L2Snapshot, |event| {
75 if let Event::L2Snapshot(book) = event {
76 let (bids, asks) = &book.levels;
77 let best_bid = bids.first().map(|l| l.price).unwrap_or(0.0);
78 let best_ask = asks.first().map(|l| l.price).unwrap_or(0.0);
79 let spread = best_ask - best_bid;
80 info!(
81 "[L2] {:<10} bid={:1.2} ask={:1.2} spread={:.2} levels={}x{}",
82 book.symbol,
83 best_bid,
84 best_ask,
85 spread,
86 bids.len(),
87 asks.len(),
88 );
89 }
90 })
91 .await;
92
93 client
94 .on(Topic::Error, |event| {
95 if let Event::Error(e) = event {
96 error!("[ERROR] {e}");
97 }
98 })
99 .await;
100
101 // ── Wait for Ctrl-C ─────────────────────────────────────────────────
102 info!("Listening... press Ctrl-C to stop.");
103
104 tokio::signal::ctrl_c().await?;
105 info!("\nShutting down...");
106
107 client.shutdown().await;
108 process::exit(0);
109}Sourcepub fn is_connected(&self) -> bool
pub fn is_connected(&self) -> bool
Returns true if the WebSocket actor is still running.
This is a cheap, lock-free check — safe to call in hot loops.
Once it returns false you must call BulkWsClient::connect again
to establish a new connection.
Sourcepub fn get_ticker(&self, symbol: &str) -> Option<Ticker>
pub fn get_ticker(&self, symbol: &str) -> Option<Ticker>
Sourcepub fn get_tickers(&self) -> HashMap<String, Ticker>
pub fn get_tickers(&self) -> HashMap<String, Ticker>
Sourcepub fn get_margin(&self) -> Margin
pub fn get_margin(&self) -> Margin
Get the current account margin.
Sourcepub fn get_position(&self, symbol: &str) -> Option<PositionInfo>
pub fn get_position(&self, symbol: &str) -> Option<PositionInfo>
Sourcepub fn get_positions(&self) -> HashMap<String, PositionInfo>
pub fn get_positions(&self) -> HashMap<String, PositionInfo>
Get all positions.
Sourcepub fn get_leverage(&self, symbol: &str) -> Option<f64>
pub fn get_leverage(&self, symbol: &str) -> Option<f64>
Sourcepub async fn wait_tickers_changed(&mut self) -> Result<HashMap<String, Ticker>>
pub async fn wait_tickers_changed(&mut self) -> Result<HashMap<String, Ticker>>
Wait asynchronously until any ticker changes, then return the updated map.
Sourcepub async fn wait_account_changed(&mut self) -> Result<AccountState>
pub async fn wait_account_changed(&mut self) -> Result<AccountState>
Wait asynchronously until the account state changes (margin, positions, orders, leverage).
Sourcepub async fn open_orders(&self, symbol: Option<&str>) -> Result<Vec<OrderState>>
pub async fn open_orders(&self, symbol: Option<&str>) -> Result<Vec<OrderState>>
Sourcepub async fn place_orders(
&self,
actions: Vec<Action>,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> Result<Vec<Response>>
pub async fn place_orders( &self, actions: Vec<Action>, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Vec<Response>>
Sourcepub async fn update_oracle(
&self,
actions: Vec<Price>,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> Result<Vec<Response>>
pub async fn update_oracle( &self, actions: Vec<Price>, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Vec<Response>>
Send oracle price updates and return the exchange’s responses.
Waits for the exchange to acknowledge the transaction so that any rejection (e.g. invalid price, authorisation failure) is surfaced to the caller rather than silently dropped.
§Arguments
- actions: list of oracle price updates
- account: optional override for the signing account
- nonce: optional nonce override; a fresh one is generated when `None`
§Returns
- `Ok(responses)` — one `Response` per submitted price; callers should inspect each entry with `Response::is_error` to detect rejections.
- `Err(_)` — transport-level failure (send error, timeout, dropped channel).
Sourcepub async fn place_limit_order(
&self,
symbol: &str,
side: Side,
price: f64,
size: f64,
tif: TimeInForce,
reduce_only: bool,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> Result<Response>
pub async fn place_limit_order( &self, symbol: &str, side: Side, price: f64, size: f64, tif: TimeInForce, reduce_only: bool, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Response>
Sourcepub async fn place_market_order(
&self,
symbol: &str,
side: Side,
size: f64,
reduce_only: bool,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> Result<Response>
pub async fn place_market_order( &self, symbol: &str, side: Side, size: f64, reduce_only: bool, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Response>
Sourcepub async fn cancel_order(
&self,
symbol: &str,
order_id: &str,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> Result<Response>
pub async fn cancel_order( &self, symbol: &str, order_id: &str, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Response>
Sourcepub async fn cancel_all(
&self,
symbols: Vec<String>,
account: Option<Pubkey>,
nonce: Option<u64>,
) -> Result<Response>
pub async fn cancel_all( &self, symbols: Vec<String>, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Response>
Sourcepub fn subscribe_disconnect(&self) -> Receiver<String>
pub fn subscribe_disconnect(&self) -> Receiver<String>
Subscribe to disconnect notifications.
The returned receiver fires exactly once, carrying the human-readable
disconnect reason, when the actor exits for any reason (server close,
network error, or an explicit call to `BulkWsClient::shutdown`).
Use this as a poison pill for any tasks you spawned that should stop when the connection is lost:
let mut rx = client.subscribe_disconnect();
tokio::spawn(async move {
let _ = rx.recv().await; // blocks until disconnect
// clean up your task here
});Sourcepub async fn subscribe_ticker(&self, symbol: &str) -> Result<()>
pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()>
Sourcepub async fn subscribe_trades(&self, symbols: &[&str]) -> Result<()>
pub async fn subscribe_trades(&self, symbols: &[&str]) -> Result<()>
Sourcepub async fn subscribe_l2_snapshot(
&self,
symbol: &str,
nlevels: Option<u32>,
) -> Result<()>
pub async fn subscribe_l2_snapshot( &self, symbol: &str, nlevels: Option<u32>, ) -> Result<()>
Examples found in repository?
34async fn main() -> eyre::Result<()> {
35 tracing_subscriber::fmt()
36 .with_env_filter(
37 EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())
38 )
39 .init();
40
41 let args = Args::parse();
42
43 info!("Connecting to {} for symbols: {:?}", args.url, args.symbols);
44 let client = BulkWsClient::connect(WSConfig {
45 url: args.url,
46 symbols: args.symbols.clone(),
47 signer: None,
48 ..Default::default()
49 })
50 .await?;
51
52 // ── Subscribe to L2 snapshots ───────────────────────────────────────
53 if args.l2_levels > 0 {
54 for sym in &args.symbols {
55 client
56 .subscribe_l2_snapshot(sym, Some(args.l2_levels))
57 .await?;
58 }
59 }
60
61 // ── Register event handlers ─────────────────────────────────────────
62 client
63 .on(Topic::Ticker, |event| {
64 if let Event::Ticker(t) = event {
65 info!(
66 "[TICKER] {:<10} mark={:1.2} last={:1.2} funding={:.6} vol={:.2}",
67 t.symbol, t.mark_price, t.last_price, t.funding_rate, t.volume,
68 );
69 }
70 })
71 .await;
72
73 client
74 .on(Topic::L2Snapshot, |event| {
75 if let Event::L2Snapshot(book) = event {
76 let (bids, asks) = &book.levels;
77 let best_bid = bids.first().map(|l| l.price).unwrap_or(0.0);
78 let best_ask = asks.first().map(|l| l.price).unwrap_or(0.0);
79 let spread = best_ask - best_bid;
80 info!(
81 "[L2] {:<10} bid={:1.2} ask={:1.2} spread={:.2} levels={}x{}",
82 book.symbol,
83 best_bid,
84 best_ask,
85 spread,
86 bids.len(),
87 asks.len(),
88 );
89 }
90 })
91 .await;
92
93 client
94 .on(Topic::Error, |event| {
95 if let Event::Error(e) = event {
96 error!("[ERROR] {e}");
97 }
98 })
99 .await;
100
101 // ── Wait for Ctrl-C ─────────────────────────────────────────────────
102 info!("Listening... press Ctrl-C to stop.");
103
104 tokio::signal::ctrl_c().await?;
105 info!("\nShutting down...");
106
107 client.shutdown().await;
108 process::exit(0);
109}Sourcepub async fn subscribe_l2_delta(&self, symbol: &str) -> Result<()>
pub async fn subscribe_l2_delta(&self, symbol: &str) -> Result<()>
Sourcepub async fn subscribe_candles(
&self,
symbol: &str,
interval: &str,
) -> Result<()>
pub async fn subscribe_candles( &self, symbol: &str, interval: &str, ) -> Result<()>
Subscribe to candles for symbol.
§Arguments
- symbol: symbol to subscribe to
- interval: bar period (“1min”, “5min”, …)
Sourcepub async fn on(
&self,
topic: Topic,
handler: impl Fn(&Event) + Send + Sync + 'static,
)
pub async fn on( &self, topic: Topic, handler: impl Fn(&Event) + Send + Sync + 'static, )
Register a callback for a topic. The callback runs synchronously
inside the actor loop — keep it fast or tokio::spawn from within.
§Arguments
- topic: topic to subscribe to
- handler: callback invoked for each event on that topic
Examples found in repository?
34async fn main() -> eyre::Result<()> {
35 tracing_subscriber::fmt()
36 .with_env_filter(
37 EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())
38 )
39 .init();
40
41 let args = Args::parse();
42
43 info!("Connecting to {} for symbols: {:?}", args.url, args.symbols);
44 let client = BulkWsClient::connect(WSConfig {
45 url: args.url,
46 symbols: args.symbols.clone(),
47 signer: None,
48 ..Default::default()
49 })
50 .await?;
51
52 // ── Subscribe to L2 snapshots ───────────────────────────────────────
53 if args.l2_levels > 0 {
54 for sym in &args.symbols {
55 client
56 .subscribe_l2_snapshot(sym, Some(args.l2_levels))
57 .await?;
58 }
59 }
60
61 // ── Register event handlers ─────────────────────────────────────────
62 client
63 .on(Topic::Ticker, |event| {
64 if let Event::Ticker(t) = event {
65 info!(
66 "[TICKER] {:<10} mark={:1.2} last={:1.2} funding={:.6} vol={:.2}",
67 t.symbol, t.mark_price, t.last_price, t.funding_rate, t.volume,
68 );
69 }
70 })
71 .await;
72
73 client
74 .on(Topic::L2Snapshot, |event| {
75 if let Event::L2Snapshot(book) = event {
76 let (bids, asks) = &book.levels;
77 let best_bid = bids.first().map(|l| l.price).unwrap_or(0.0);
78 let best_ask = asks.first().map(|l| l.price).unwrap_or(0.0);
79 let spread = best_ask - best_bid;
80 info!(
81 "[L2] {:<10} bid={:1.2} ask={:1.2} spread={:.2} levels={}x{}",
82 book.symbol,
83 best_bid,
84 best_ask,
85 spread,
86 bids.len(),
87 asks.len(),
88 );
89 }
90 })
91 .await;
92
93 client
94 .on(Topic::Error, |event| {
95 if let Event::Error(e) = event {
96 error!("[ERROR] {e}");
97 }
98 })
99 .await;
100
101 // ── Wait for Ctrl-C ─────────────────────────────────────────────────
102 info!("Listening... press Ctrl-C to stop.");
103
104 tokio::signal::ctrl_c().await?;
105 info!("\nShutting down...");
106
107 client.shutdown().await;
108 process::exit(0);
109}Trait Implementations§
Source§impl Clone for BulkWsClient
impl Clone for BulkWsClient
Source§fn clone(&self) -> BulkWsClient
fn clone(&self) -> BulkWsClient
1.0.0 (const: unstable) · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from `source`. Read more