Skip to main content

BulkWsClient

Struct BulkWsClient 

Source
pub struct BulkWsClient { /* private fields */ }
Expand description

Cloneable client handle.

  • Hot reads (ticker, price, margin): watch::Receiver::borrow() — zero async overhead, just a ref-counted pointer swap.
  • Writes (subscribe, place orders): go through the mpsc command channel to the actor, which serializes all mutations.
  • Cold reads (open orders list): round-trip through the actor via oneshot — still fast, but async.

Implementations§

Source§

impl BulkWsClient

Source

pub async fn connect(config: WSConfig) -> Result<Self>

Connect to the exchange and spawn the actor task. Returns immediately once the WebSocket handshake succeeds.

§Arguments
  • config: web socket config
Examples found in repository?
examples/md_listener.rs (lines 44-49)
34async fn main() -> eyre::Result<()> {
35    tracing_subscriber::fmt()
36        .with_env_filter(
37            EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())
38        )
39        .init();
40
41    let args = Args::parse();
42
43    info!("Connecting to {} for symbols: {:?}", args.url, args.symbols);
44    let client = BulkWsClient::connect(WSConfig {
45        url: args.url,
46        symbols: args.symbols.clone(),
47        signer: None,
48        ..Default::default()
49    })
50        .await?;
51
52    // ── Subscribe to L2 snapshots ───────────────────────────────────────
53    if args.l2_levels > 0 {
54        for sym in &args.symbols {
55            client
56                .subscribe_l2_snapshot(sym, Some(args.l2_levels))
57                .await?;
58        }
59    }
60
61    // ── Register event handlers ─────────────────────────────────────────
62    client
63        .on(Topic::Ticker, |event| {
64            if let Event::Ticker(t) = event {
65                info!(
66                    "[TICKER] {:<10} mark={:1.2} last={:1.2} funding={:.6} vol={:.2}",
67                    t.symbol, t.mark_price, t.last_price, t.funding_rate, t.volume,
68                );
69            }
70        })
71        .await;
72
73    client
74        .on(Topic::L2Snapshot, |event| {
75            if let Event::L2Snapshot(book) = event {
76                let (bids, asks) = &book.levels;
77                let best_bid = bids.first().map(|l| l.price).unwrap_or(0.0);
78                let best_ask = asks.first().map(|l| l.price).unwrap_or(0.0);
79                let spread = best_ask - best_bid;
80                info!(
81                    "[L2]     {:<10} bid={:1.2} ask={:1.2} spread={:.2} levels={}x{}",
82                    book.symbol,
83                    best_bid,
84                    best_ask,
85                    spread,
86                    bids.len(),
87                    asks.len(),
88                );
89            }
90        })
91        .await;
92
93    client
94        .on(Topic::Error, |event| {
95            if let Event::Error(e) = event {
96                error!("[ERROR] {e}");
97            }
98        })
99        .await;
100
101    // ── Wait for Ctrl-C ─────────────────────────────────────────────────
102    info!("Listening... press Ctrl-C to stop.");
103
104    tokio::signal::ctrl_c().await?;
105    info!("\nShutting down...");
106
107    client.shutdown().await;
108    process::exit(0);
109}
Source

pub async fn shutdown(&self)

Shut down the actor and close the WebSocket.

Examples found in repository?
examples/md_listener.rs (line 107)
34async fn main() -> eyre::Result<()> {
35    tracing_subscriber::fmt()
36        .with_env_filter(
37            EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())
38        )
39        .init();
40
41    let args = Args::parse();
42
43    info!("Connecting to {} for symbols: {:?}", args.url, args.symbols);
44    let client = BulkWsClient::connect(WSConfig {
45        url: args.url,
46        symbols: args.symbols.clone(),
47        signer: None,
48        ..Default::default()
49    })
50        .await?;
51
52    // ── Subscribe to L2 snapshots ───────────────────────────────────────
53    if args.l2_levels > 0 {
54        for sym in &args.symbols {
55            client
56                .subscribe_l2_snapshot(sym, Some(args.l2_levels))
57                .await?;
58        }
59    }
60
61    // ── Register event handlers ─────────────────────────────────────────
62    client
63        .on(Topic::Ticker, |event| {
64            if let Event::Ticker(t) = event {
65                info!(
66                    "[TICKER] {:<10} mark={:1.2} last={:1.2} funding={:.6} vol={:.2}",
67                    t.symbol, t.mark_price, t.last_price, t.funding_rate, t.volume,
68                );
69            }
70        })
71        .await;
72
73    client
74        .on(Topic::L2Snapshot, |event| {
75            if let Event::L2Snapshot(book) = event {
76                let (bids, asks) = &book.levels;
77                let best_bid = bids.first().map(|l| l.price).unwrap_or(0.0);
78                let best_ask = asks.first().map(|l| l.price).unwrap_or(0.0);
79                let spread = best_ask - best_bid;
80                info!(
81                    "[L2]     {:<10} bid={:1.2} ask={:1.2} spread={:.2} levels={}x{}",
82                    book.symbol,
83                    best_bid,
84                    best_ask,
85                    spread,
86                    bids.len(),
87                    asks.len(),
88                );
89            }
90        })
91        .await;
92
93    client
94        .on(Topic::Error, |event| {
95            if let Event::Error(e) = event {
96                error!("[ERROR] {e}");
97            }
98        })
99        .await;
100
101    // ── Wait for Ctrl-C ─────────────────────────────────────────────────
102    info!("Listening... press Ctrl-C to stop.");
103
104    tokio::signal::ctrl_c().await?;
105    info!("\nShutting down...");
106
107    client.shutdown().await;
108    process::exit(0);
109}
Source

pub async fn closed(&self)

Wait for the actor to exit (e.g. on disconnect / error).

Source

pub fn is_connected(&self) -> bool

Returns true if the WebSocket actor is still running.

This is a cheap, lock-free check — safe to call in hot loops. Once it returns false you must call BulkWsClient::connect again to establish a new connection.

Source

pub fn get_ticker(&self, symbol: &str) -> Option<Ticker>

Get latest ticker for symbol, or None if not yet received.

§Arguments
  • symbol: symbol to retrieve for
§Returns
  • current ticker if available
Source

pub fn get_price(&self, symbol: &str) -> Option<f64>

Get Latest mark price for symbol.

§Arguments
  • symbol: symbol to retrieve for
§Returns
  • current price if available
Source

pub fn get_tickers(&self) -> HashMap<String, Ticker>

All current tickers, keyed by symbol.

§Returns
  • all current tickers
Source

pub fn get_margin(&self) -> Margin

Get Current account margin.

Source

pub fn get_position(&self, symbol: &str) -> Option<PositionInfo>

Get current position for symbol.

§Arguments
  • symbol: symbol to retrieve for
§Returns
  • current position for symbol if available
Source

pub fn get_positions(&self) -> HashMap<String, PositionInfo>

Get all positions.

Source

pub fn get_leverage(&self, symbol: &str) -> Option<f64>

Current leverage setting for symbol.

§Arguments
  • symbol: symbol to retrieve for
§Returns
  • current leverage if available
Source

pub async fn wait_tickers_changed(&mut self) -> Result<HashMap<String, Ticker>>

Block until any ticker changes, then return the updated map.

Source

pub async fn wait_account_changed(&mut self) -> Result<AccountState>

Block until account state changes (margin, positions, orders, leverage).

Source

pub async fn open_orders(&self, symbol: Option<&str>) -> Result<Vec<OrderState>>

Open orders, optionally filtered by symbol.

§Arguments
  • symbol: optional symbol to retrieve for
§Returns
  • current orders or order status
Source

pub async fn place_orders( &self, actions: Vec<Action>, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Vec<Response>>

Place one or more actions (limit, market, cancel, cancel-all). Signs the bundle, sends through the actor, and awaits the exchange response with the configured timeout.

§Arguments
  • actions: list of orders, cancels, etc
  • nonce: nonce to be used
§Returns
  • list of responses
Source

pub async fn update_oracle( &self, actions: Vec<Price>, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Vec<Response>>

Send oracle price updates and return the exchange’s responses.

Waits for the exchange to acknowledge the transaction so that any rejection (e.g. invalid price, authorisation failure) is surfaced to the caller rather than silently dropped.

§Arguments
  • actions: list of oracle price updates
  • account: optional override for the signing account
  • nonce: optional nonce override; a fresh one is generated when None
§Returns
  • Ok(responses) — one Response per submitted price; callers should inspect each entry with Response::is_error to detect rejections.
  • Err(_) — transport-level failure (send error, timeout, dropped channel).
Source

pub async fn place_limit_order( &self, symbol: &str, side: Side, price: f64, size: f64, tif: TimeInForce, reduce_only: bool, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Response>

Place limit order

§Arguments
  • symbol: which market to execute in
  • side: buy or sell
  • price: limit price
  • size: order size
  • tif: time in force
  • reduce_only: true if order is reduce only
§Returns
  • response for order placement
Source

pub async fn place_market_order( &self, symbol: &str, side: Side, size: f64, reduce_only: bool, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Response>

Place market order

§Arguments
  • symbol: which market to execute in
  • side: buy or sell
  • size: order size
  • reduce_only: true if order is reduce only
§Returns
  • response for order placement
Source

pub async fn cancel_order( &self, symbol: &str, order_id: &str, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Response>

Cancel order

§Arguments
  • symbol: which market to execute in
  • order_id: order ID to cancel
§Returns
  • response for order cancel
Source

pub async fn cancel_all( &self, symbols: Vec<String>, account: Option<Pubkey>, nonce: Option<u64>, ) -> Result<Response>

Cancel all order

§Arguments
  • symbols: which symbols to cancel
§Returns
  • response for order cancel
Source

pub fn subscribe_disconnect(&self) -> Receiver<String>

Subscribe to disconnect notifications.

The returned receiver fires exactly once, carrying the human-readable disconnect reason, when the actor exits for any reason (server close, network error, or explicit [shutdown]).

Use this as a poison pill for any tasks you spawned that should stop when the connection is lost:

let mut rx = client.subscribe_disconnect();
tokio::spawn(async move {
    let _ = rx.recv().await; // blocks until disconnect
    // clean up your task here
});
Source

pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()>

Subscribe to ticker for symbol.

§Arguments
  • symbol: symbol to subscrive to
Source

pub async fn subscribe_trades(&self, symbols: &[&str]) -> Result<()>

Subscribe to fills for symbol.

§Arguments
  • symbols: list of symbol to subscribe to
Source

pub async fn subscribe_l2_snapshot( &self, symbol: &str, nlevels: Option<u32>, ) -> Result<()>

Subscribe to L2 snapshots for symbol.

§Arguments
  • symbol: symbol to subscribe to
Examples found in repository?
examples/md_listener.rs (line 56)
34async fn main() -> eyre::Result<()> {
35    tracing_subscriber::fmt()
36        .with_env_filter(
37            EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())
38        )
39        .init();
40
41    let args = Args::parse();
42
43    info!("Connecting to {} for symbols: {:?}", args.url, args.symbols);
44    let client = BulkWsClient::connect(WSConfig {
45        url: args.url,
46        symbols: args.symbols.clone(),
47        signer: None,
48        ..Default::default()
49    })
50        .await?;
51
52    // ── Subscribe to L2 snapshots ───────────────────────────────────────
53    if args.l2_levels > 0 {
54        for sym in &args.symbols {
55            client
56                .subscribe_l2_snapshot(sym, Some(args.l2_levels))
57                .await?;
58        }
59    }
60
61    // ── Register event handlers ─────────────────────────────────────────
62    client
63        .on(Topic::Ticker, |event| {
64            if let Event::Ticker(t) = event {
65                info!(
66                    "[TICKER] {:<10} mark={:1.2} last={:1.2} funding={:.6} vol={:.2}",
67                    t.symbol, t.mark_price, t.last_price, t.funding_rate, t.volume,
68                );
69            }
70        })
71        .await;
72
73    client
74        .on(Topic::L2Snapshot, |event| {
75            if let Event::L2Snapshot(book) = event {
76                let (bids, asks) = &book.levels;
77                let best_bid = bids.first().map(|l| l.price).unwrap_or(0.0);
78                let best_ask = asks.first().map(|l| l.price).unwrap_or(0.0);
79                let spread = best_ask - best_bid;
80                info!(
81                    "[L2]     {:<10} bid={:1.2} ask={:1.2} spread={:.2} levels={}x{}",
82                    book.symbol,
83                    best_bid,
84                    best_ask,
85                    spread,
86                    bids.len(),
87                    asks.len(),
88                );
89            }
90        })
91        .await;
92
93    client
94        .on(Topic::Error, |event| {
95            if let Event::Error(e) = event {
96                error!("[ERROR] {e}");
97            }
98        })
99        .await;
100
101    // ── Wait for Ctrl-C ─────────────────────────────────────────────────
102    info!("Listening... press Ctrl-C to stop.");
103
104    tokio::signal::ctrl_c().await?;
105    info!("\nShutting down...");
106
107    client.shutdown().await;
108    process::exit(0);
109}
Source

pub async fn subscribe_l2_delta(&self, symbol: &str) -> Result<()>

Subscribe to L2 deltas for symbol.

§Arguments
  • symbol: symbol to subscribe to
Source

pub async fn subscribe_candles( &self, symbol: &str, interval: &str, ) -> Result<()>

Subscribe to candles for symbol.

§Arguments
  • symbol: symbol to subscribe to
  • interval: bar period (“1min”, “5min”, …)
Source

pub async fn on( &self, topic: Topic, handler: impl Fn(&Event) + Send + Sync + 'static, )

Register a callback for a topic. The callback runs synchronously inside the actor loop — keep it fast or tokio::spawn from within.

§Argument
  • topic: topic to subscribe to
  • handler: callback for topic
Examples found in repository?
examples/md_listener.rs (lines 63-70)
34async fn main() -> eyre::Result<()> {
35    tracing_subscriber::fmt()
36        .with_env_filter(
37            EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into())
38        )
39        .init();
40
41    let args = Args::parse();
42
43    info!("Connecting to {} for symbols: {:?}", args.url, args.symbols);
44    let client = BulkWsClient::connect(WSConfig {
45        url: args.url,
46        symbols: args.symbols.clone(),
47        signer: None,
48        ..Default::default()
49    })
50        .await?;
51
52    // ── Subscribe to L2 snapshots ───────────────────────────────────────
53    if args.l2_levels > 0 {
54        for sym in &args.symbols {
55            client
56                .subscribe_l2_snapshot(sym, Some(args.l2_levels))
57                .await?;
58        }
59    }
60
61    // ── Register event handlers ─────────────────────────────────────────
62    client
63        .on(Topic::Ticker, |event| {
64            if let Event::Ticker(t) = event {
65                info!(
66                    "[TICKER] {:<10} mark={:1.2} last={:1.2} funding={:.6} vol={:.2}",
67                    t.symbol, t.mark_price, t.last_price, t.funding_rate, t.volume,
68                );
69            }
70        })
71        .await;
72
73    client
74        .on(Topic::L2Snapshot, |event| {
75            if let Event::L2Snapshot(book) = event {
76                let (bids, asks) = &book.levels;
77                let best_bid = bids.first().map(|l| l.price).unwrap_or(0.0);
78                let best_ask = asks.first().map(|l| l.price).unwrap_or(0.0);
79                let spread = best_ask - best_bid;
80                info!(
81                    "[L2]     {:<10} bid={:1.2} ask={:1.2} spread={:.2} levels={}x{}",
82                    book.symbol,
83                    best_bid,
84                    best_ask,
85                    spread,
86                    bids.len(),
87                    asks.len(),
88                );
89            }
90        })
91        .await;
92
93    client
94        .on(Topic::Error, |event| {
95            if let Event::Error(e) = event {
96                error!("[ERROR] {e}");
97            }
98        })
99        .await;
100
101    // ── Wait for Ctrl-C ─────────────────────────────────────────────────
102    info!("Listening... press Ctrl-C to stop.");
103
104    tokio::signal::ctrl_c().await?;
105    info!("\nShutting down...");
106
107    client.shutdown().await;
108    process::exit(0);
109}

Trait Implementations§

Source§

impl Clone for BulkWsClient

Source§

fn clone(&self) -> BulkWsClient

Returns a duplicate of the value. Read more
1.0.0 (const: unstable) · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more