/// Bindings for the `xyz.aspens.arborter.v1` package.
///
/// The module body is the generated Rust file, pulled in verbatim with
/// `include!`; nothing is hand-written here.
// NOTE(review): `missing_docs` is presumably allowed because the generated
// code carries no doc comments — confirm before removing the attribute.
#[allow(missing_docs)]
pub mod arborter_pb {
include!("../../../proto/generated/xyz.aspens.arborter.v1.rs");
}
use std::fmt;
use arborter_pb::arborter_service_client::ArborterServiceClient;
use arborter_pb::{OrderState, OrderbookEntry, OrderbookRequest, Side};
use eyre::Result;
use futures::StreamExt;
use tokio::sync::mpsc;
use crate::grpc::create_channel;
/// Single-line, human-readable rendering of an orderbook entry.
///
/// Layout: `[timestamp] #order_id SIDE quantity @ price (maker: address) [STATE]`.
/// Side or state values that do not decode to one of the variants named below
/// are rendered as `UNKNOWN`.
impl fmt::Display for OrderbookEntry {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // The raw integer fields are decoded back into their enums; anything
        // that fails to decode (or is not listed) falls through to the default.
        let side_label = match Side::try_from(self.side).ok() {
            Some(Side::Bid) => "BID",
            Some(Side::Ask) => "ASK",
            _ => "UNKNOWN",
        };

        let state_label = match OrderState::try_from(self.state).ok() {
            Some(OrderState::Pending) => "PENDING",
            Some(OrderState::Confirmed) => "CONFIRMED",
            Some(OrderState::Matched) => "MATCHED",
            Some(OrderState::Canceled) => "CANCELED",
            Some(OrderState::Settled) => "SETTLED",
            _ => "UNKNOWN",
        };

        f.write_fmt(format_args!(
            "[{}] #{} {} {} @ {} (maker: {}) [{}]",
            self.timestamp,
            self.order_id,
            side_label,
            self.quantity,
            self.price,
            self.maker_base_address,
            state_label
        ))
    }
}
/// Options for an orderbook stream.
///
/// `stream_orderbook` copies these fields into the `OrderbookRequest` it
/// sends. `Default` yields an empty market id, `false`, and `None`.
#[derive(Debug, Clone, Default)]
pub struct StreamOrderbookOptions {
/// Sent as the request's `market_id`.
pub market_id: String,
/// Sent as the request's `historical_open_orders`, wrapped in `Some`.
// NOTE(review): presumably asks the server to replay already-open orders
// before live updates — confirm against the proto definition.
pub historical_open_orders: bool,
/// Sent unchanged as the request's `filter_by_trader`; `None` leaves it unset.
// NOTE(review): presumably a trader address used to restrict the entries
// returned — confirm against the proto definition.
pub filter_by_trader: Option<String>,
}
pub async fn stream_orderbook<F>(
url: String,
options: StreamOrderbookOptions,
mut callback: F,
) -> Result<()>
where
F: FnMut(OrderbookEntry),
{
let channel = create_channel(&url).await?;
let mut client = ArborterServiceClient::new(channel);
let request = OrderbookRequest {
continue_stream: true,
market_id: options.market_id,
historical_open_orders: Some(options.historical_open_orders),
filter_by_trader: options.filter_by_trader,
};
let request = tonic::Request::new(request);
let response = client.orderbook(request).await?;
let mut stream = response.into_inner();
while let Some(entry_result) = stream.next().await {
match entry_result {
Ok(entry) => {
callback(entry);
}
Err(e) => {
tracing::error!("Stream error: {}", e);
return Err(e.into());
}
}
}
Ok(())
}
/// Best bid and ask prices, as collected by `fetch_top_of_book`.
///
/// `Default` yields `None` on both sides.
#[derive(Debug, Clone, Copy, Default)]
pub struct TopOfBook {
/// Highest bid price observed, or `None` if no qualifying bid was seen.
pub best_bid: Option<u128>,
/// Lowest ask price observed, or `None` if no qualifying ask was seen.
pub best_ask: Option<u128>,
}
pub async fn fetch_top_of_book(
url: String,
market_id: String,
collection_window: std::time::Duration,
) -> Result<TopOfBook> {
let (mut rx, _handle) = stream_orderbook_channel(
url,
StreamOrderbookOptions {
market_id,
historical_open_orders: true,
filter_by_trader: None,
},
)
.await?;
let mut top = TopOfBook::default();
let deadline = tokio::time::sleep(collection_window);
tokio::pin!(deadline);
loop {
tokio::select! {
_ = &mut deadline => break,
maybe_entry = rx.recv() => {
let Some(entry) = maybe_entry else { break };
if entry.state != OrderState::Confirmed as i32 {
continue;
}
let qty: u128 = match entry.quantity.parse() {
Ok(q) if q > 0 => q,
_ => continue,
};
let price: u128 = match entry.price.parse() {
Ok(p) if p > 0 => p,
_ => continue,
};
let _ = qty;
match Side::try_from(entry.side) {
Ok(Side::Bid) if top.best_bid.is_none_or(|b| price > b) => {
top.best_bid = Some(price);
}
Ok(Side::Ask) if top.best_ask.is_none_or(|a| price < a) => {
top.best_ask = Some(price);
}
_ => {}
}
}
}
}
Ok(top)
}
/// Adjusts `reference_price` by a slippage tolerance given in basis points.
///
/// `slippage_bps` is clamped to at most 10 000 (100 %). For a buy the price is
/// scaled by `(10_000 + bps) / 10_000`; for a sell by
/// `(10_000 - bps) / 10_000`. The division is integer division, so the result
/// is truncated toward zero.
///
/// # Errors
///
/// Returns an error if `reference_price * scale` does not fit in a `u128`.
/// The scale computation itself is checked as well, although the clamp keeps
/// it within range.
pub fn apply_slippage(reference_price: u128, slippage_bps: u32, is_buy: bool) -> Result<u128> {
    const BPS_DENOMINATOR: u128 = 10_000;

    let clamped_bps = u128::from(slippage_bps).min(BPS_DENOMINATOR);

    let multiplier = if is_buy {
        match BPS_DENOMINATOR.checked_add(clamped_bps) {
            Some(m) => m,
            None => return Err(eyre::eyre!("slippage scale overflow")),
        }
    } else {
        match BPS_DENOMINATOR.checked_sub(clamped_bps) {
            Some(m) => m,
            None => return Err(eyre::eyre!("slippage scale underflow")),
        }
    };

    match reference_price.checked_mul(multiplier) {
        Some(scaled) => Ok(scaled / BPS_DENOMINATOR),
        None => Err(eyre::eyre!("slippage * price overflow")),
    }
}
pub async fn stream_orderbook_channel(
url: String,
options: StreamOrderbookOptions,
) -> Result<(
mpsc::Receiver<OrderbookEntry>,
tokio::task::JoinHandle<Result<()>>,
)> {
let (tx, rx) = mpsc::channel(100);
let handle = tokio::spawn(async move {
stream_orderbook(url, options, |entry| {
let _ = tx.blocking_send(entry);
})
.await
});
Ok((rx, handle))
}
/// Renders an orderbook entry as one row of a `|`-separated table.
///
/// Columns, in order: state, order id, side, price, quantity, time of day
/// (via `format_timestamp`), and a shortened maker address (via
/// `truncate_address`).
///
/// Column widths come from the format specifiers rather than from padded
/// literals, so every state label occupies the same nine-character column
/// (the width of `CONFIRMED`) and every side label a four-character column.
/// Side or state values that do not decode to a listed variant are shown as
/// `???` and `UNKNOWN` respectively.
pub fn format_orderbook_entry(entry: &OrderbookEntry) -> String {
    let side_str = match Side::try_from(entry.side) {
        Ok(Side::Bid) => "BID",
        Ok(Side::Ask) => "ASK",
        _ => "???",
    };

    let state_str = match OrderState::try_from(entry.state) {
        Ok(OrderState::Pending) => "PENDING",
        Ok(OrderState::Confirmed) => "CONFIRMED",
        Ok(OrderState::Matched) => "MATCHED",
        Ok(OrderState::Canceled) => "CANCELED",
        Ok(OrderState::Settled) => "SETTLED",
        _ => "UNKNOWN",
    };

    format!(
        "{:<9} | ID: {:>8} | {:<4} | Price: {:>12} | Qty: {:>12} | {} | Maker: {}",
        state_str,
        entry.order_id,
        side_str,
        entry.price,
        entry.quantity,
        format_timestamp(entry.timestamp),
        truncate_address(&entry.maker_base_address)
    )
}
/// Formats a Unix timestamp given in milliseconds as a `HH:MM:SS` time of day.
///
/// Only the time-of-day part is rendered; the date is discarded and no
/// time-zone offset is applied. Sub-second precision is dropped.
///
/// The value is computed with plain integer arithmetic, so every `u64` input
/// is handled without going through `SystemTime`, whose addition can panic on
/// platforms with a narrower representable range.
fn format_timestamp(timestamp: u64) -> String {
    let secs = timestamp / 1_000;
    let hours = (secs / 3_600) % 24;
    let minutes = (secs / 60) % 60;
    let seconds = secs % 60;
    format!("{:02}:{:02}:{:02}", hours, minutes, seconds)
}
/// Shortens an address for display as `<first 6 chars>...<last 4 chars>`.
///
/// Addresses of 12 characters or fewer are returned unchanged. Lengths and cut
/// points are measured in characters, not bytes, so a string containing
/// multi-byte UTF-8 characters never triggers a slicing panic. For ASCII input
/// the result is the same as byte-based slicing.
fn truncate_address(address: &str) -> String {
    const HEAD_CHARS: usize = 6;
    const TAIL_CHARS: usize = 4;
    const MAX_UNTRUNCATED_CHARS: usize = 12;

    if address.chars().count() <= MAX_UNTRUNCATED_CHARS {
        return address.to_string();
    }

    // Byte offset just past the first `HEAD_CHARS` characters.
    let head_end = address
        .char_indices()
        .nth(HEAD_CHARS)
        .map_or(address.len(), |(idx, _)| idx);

    // Byte offset where the last `TAIL_CHARS` characters begin.
    let tail_start = address
        .char_indices()
        .rev()
        .nth(TAIL_CHARS - 1)
        .map_or(0, |(idx, _)| idx);

    format!("{}...{}", &address[..head_end], &address[tail_start..])
}
/// Unit tests for the pure helpers in this module (formatting and slippage
/// arithmetic). The streaming functions need a live server and are not
/// covered here.
#[cfg(test)]
mod tests {
    use super::*;

    /// Long addresses are shortened to head/tail form; short ones pass through.
    #[test]
    fn test_truncate_address() {
        assert_eq!(
            truncate_address("0x1234567890abcdef1234567890abcdef12345678"),
            "0x1234...5678"
        );
        assert_eq!(truncate_address("short"), "short");
    }

    /// Millisecond timestamps render as a zero-padded time of day.
    #[test]
    fn test_format_timestamp() {
        assert_eq!(format_timestamp(0), "00:00:00");
        // 1_000_000_000 s since the epoch is 01:46:40 into its day.
        assert_eq!(format_timestamp(1_000_000_000_000), "01:46:40");
    }

    /// `Default` produces an empty market id, no history, and no filter.
    #[test]
    fn test_stream_orderbook_options_default() {
        let options = StreamOrderbookOptions::default();
        assert_eq!(options.market_id, "");
        assert!(!options.historical_open_orders);
        assert!(options.filter_by_trader.is_none());
    }

    /// 50 bps on a buy raises the price by 0.5 %.
    #[test]
    fn apply_slippage_buy_adds_premium() {
        assert_eq!(apply_slippage(1_000_000, 50, true).unwrap(), 1_005_000);
    }

    /// 50 bps on a sell lowers the price by 0.5 %.
    #[test]
    fn apply_slippage_sell_subtracts_discount() {
        assert_eq!(apply_slippage(1_000_000, 50, false).unwrap(), 995_000);
    }

    /// Zero slippage leaves the price untouched in both directions.
    #[test]
    fn apply_slippage_zero_is_no_op() {
        assert_eq!(apply_slippage(42_000, 0, true).unwrap(), 42_000);
        assert_eq!(apply_slippage(42_000, 0, false).unwrap(), 42_000);
    }

    /// Integer division drops the fractional part of the scaled price.
    #[test]
    fn apply_slippage_truncates_toward_zero() {
        assert_eq!(apply_slippage(1_001, 1, true).unwrap(), 1_001);
        assert_eq!(apply_slippage(1_001, 1, false).unwrap(), 1_000);
    }

    /// Values above 10 000 bps behave exactly like 10 000 bps.
    #[test]
    fn apply_slippage_clamps_above_10000_bps() {
        assert_eq!(apply_slippage(100, 20_000, true).unwrap(), 200);
        assert_eq!(apply_slippage(100, 20_000, false).unwrap(), 0);
    }

    /// A 100 % discount on a sell yields zero.
    #[test]
    fn apply_slippage_sell_at_10000_bps_is_zero() {
        assert_eq!(apply_slippage(1_000_000, 10_000, false).unwrap(), 0);
    }

    /// A product that does not fit in `u128` is reported as an error.
    #[test]
    fn apply_slippage_buy_overflow_rejected() {
        assert!(apply_slippage(u128::MAX, 1, true).is_err());
    }
}