use std::borrow::Cow;
use std::collections::BTreeMap;
pub mod spec;
use minarrow::enums::time_units::TimeUnit;
use minarrow::{ArrowType, Field};
pub use spec::{
EmitMode, FIELD_META_FROM_STRING, FIELD_META_SOURCE_KEY, FromStringHint, WsJsonSourceSpec,
};
/// Base WebSocket URL (scheme, host and port) for Binance spot streams.
pub const BINANCE_SPOT_WS_HOST: &str = "wss://stream.binance.com:9443";

/// Builds a [`WsJsonSourceSpec`] for the Binance `<symbol>@trade` stream.
///
/// `symbol` may be given in any ASCII case.
/// - The stream URL path uses the lower-cased symbol, e.g. `.../ws/btcusdt@trade`.
/// - The spec name is `binance.trade.<SYMBOL>` with the symbol upper-cased.
///
/// The spec has no subscribe message, decodes with [`trade_schema`], uses the `event_time`
/// column as its event-time key, and uses [`EmitMode::Immediate`].
pub fn trade(symbol: &str) -> WsJsonSourceSpec {
    // The URL needs the lower-case form; the final spec name carries the upper-case form.
    let stream_symbol = symbol.to_ascii_lowercase();
    let display_symbol = symbol.to_ascii_uppercase();

    let url = format!("{BINANCE_SPOT_WS_HOST}/ws/{stream_symbol}@trade");
    let name = format!("binance.trade.{stream_symbol}");

    let spec = WsJsonSourceSpec {
        name: name.into(),
        url: url.into(),
        subscribe_message: None,
        schema: trade_schema(),
        event_time_key: "event_time".into(),
        emit_mode: EmitMode::Immediate,
    };

    // Re-case the name suffix from the lower-case stream symbol to the upper-case ticker.
    spec.also_set_symbol(&display_symbol)
}
pub fn trade_schema() -> Vec<Field> {
vec![
Field::new(
"event_time",
ArrowType::Timestamp(TimeUnit::Milliseconds, None),
false,
Some(meta(&[(FIELD_META_SOURCE_KEY, "E")])),
),
Field::new(
"trade_time",
ArrowType::Timestamp(TimeUnit::Milliseconds, None),
false,
Some(meta(&[(FIELD_META_SOURCE_KEY, "T")])),
),
Field::new(
"symbol",
ArrowType::String,
false,
Some(meta(&[(FIELD_META_SOURCE_KEY, "s")])),
),
Field::new(
"trade_id",
ArrowType::Int64,
false,
Some(meta(&[(FIELD_META_SOURCE_KEY, "t")])),
),
Field::new(
"price",
ArrowType::Float64,
false,
Some(meta(&[
(FIELD_META_SOURCE_KEY, "p"),
(FIELD_META_FROM_STRING, "number"),
])),
),
Field::new(
"qty",
ArrowType::Float64,
false,
Some(meta(&[
(FIELD_META_SOURCE_KEY, "q"),
(FIELD_META_FROM_STRING, "number"),
])),
),
Field::new(
"buyer_is_maker",
ArrowType::Boolean,
false,
Some(meta(&[(FIELD_META_SOURCE_KEY, "m")])),
),
]
}
/// Turns borrowed `(key, value)` pairs into an owned, key-ordered field-metadata map.
fn meta(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
    let mut map = BTreeMap::new();
    for &(key, value) in pairs {
        map.insert(key.to_owned(), value.to_owned());
    }
    map
}
/// Post-construction hook that re-cases the symbol suffix of a spec's name.
trait SymbolAware {
    /// Returns `self` with its name's symbol suffix replaced by `symbol` when the existing
    /// suffix is the same symbol ignoring ASCII case; otherwise returns `self` unchanged.
    fn also_set_symbol(self, symbol: &str) -> Self;
}

impl SymbolAware for WsJsonSourceSpec {
    /// Rewrites a `binance.trade.<sym>` name to `binance.trade.{symbol}` when `<sym>` equals
    /// `symbol` ignoring ASCII case.
    ///
    /// Names without that prefix, or whose suffix names a different symbol, are left as-is.
    /// This means the hook can change a name's casing but can never relabel a spec as a
    /// different symbol.
    fn also_set_symbol(mut self, symbol: &str) -> Self {
        const PREFIX: &str = "binance.trade.";

        let same_symbol = matches!(
            self.name.strip_prefix(PREFIX),
            Some(current) if current.eq_ignore_ascii_case(symbol)
        );

        // Build the replacement name only when it will actually be stored.
        if same_symbol {
            self.name = Cow::Owned(format!("{PREFIX}{symbol}"));
        }
        self
    }
}
#[cfg(test)]
mod tests {
use super::*;
// The stream URL must use the lower-cased symbol even when the input is upper-case.
#[test]
fn trade_spec_url_uses_lowercase_symbol() {
let spec = trade("BTCUSDT");
assert_eq!(spec.url, "wss://stream.binance.com:9443/ws/btcusdt@trade");
}
// The spec name must carry the upper-cased symbol even when the input is lower-case.
#[test]
fn trade_spec_name_uses_uppercase_symbol() {
let spec = trade("btcusdt");
assert_eq!(spec.name, "binance.trade.BTCUSDT");
}
// Readable column names are mapped to Binance's single-letter payload keys via the
// FIELD_META_SOURCE_KEY metadata; `price` additionally carries the from-string "number" hint.
#[test]
fn trade_schema_routes_cryptic_keys_via_source_key_metadata() {
let schema = trade_schema();
let price = schema.iter().find(|f| f.name == "price").unwrap();
assert_eq!(price.metadata.get(FIELD_META_SOURCE_KEY).map(String::as_str), Some("p"));
assert_eq!(
price.metadata.get(FIELD_META_FROM_STRING).map(String::as_str),
Some("number"),
);
let event_time = schema.iter().find(|f| f.name == "event_time").unwrap();
assert_eq!(
event_time.metadata.get(FIELD_META_SOURCE_KEY).map(String::as_str),
Some("E"),
);
}
// End-to-end check: decode a sample trade payload (a JSON array holding one event object)
// with `trade_schema()` through lightstream's JSON decoder. Then assert each column's
// physical array type and its row-0 value.
#[test]
fn decodes_real_binance_trade_payload_via_lightstream() {
use minarrow::{Array, NumericArray, TemporalArray, TextArray};
use lightstream::models::decoders::json::{JsonDecodeOptions, decode_json};
use std::io::Cursor;
// NOTE: this raw literal spans two lines, so the newline (plain JSON whitespace) is part
// of the payload. The "e" and "M" keys have no schema field; the test expects 7 columns.
let payload = br#"[{"e":"trade","E":1700000000000,"s":"BTCUSDT","t":12345,
"p":"63245.10","q":"0.00125","T":1700000000010,"m":false,"M":true}]"#;
let schema = trade_schema();
let opts = JsonDecodeOptions {
schema: Some(schema),
..Default::default()
};
let tbl = decode_json(Cursor::new(&payload[..]), &opts).expect("decode succeeds");
assert_eq!(tbl.n_rows, 1);
assert_eq!(tbl.cols.len(), 7);
// Looks up a decoded column by field name, panicking with the name if it is missing.
let col = |name: &str| {
tbl.cols
.iter()
.find(|c| c.field.name == name)
.unwrap_or_else(|| panic!("no column {name}"))
};
match &col("event_time").array {
Array::TemporalArray(TemporalArray::Datetime64(a)) => {
assert_eq!(a.data.as_ref()[0], 1_700_000_000_000);
assert_eq!(a.time_unit, TimeUnit::Milliseconds);
}
_ => panic!("event_time should be Datetime64 with millisecond unit"),
}
match &col("trade_id").array {
Array::NumericArray(NumericArray::Int64(a)) => {
assert_eq!(a.data.as_ref()[0], 12345);
}
_ => panic!("trade_id should be Int64"),
}
match &col("symbol").array {
// Row 0's bytes are the slice of the data buffer between offsets[0] and offsets[1].
Array::TextArray(TextArray::String32(a)) => {
let start = a.offsets.as_ref()[0] as usize;
let end = a.offsets.as_ref()[1] as usize;
let s = std::str::from_utf8(&a.data.as_ref()[start..end]).unwrap();
assert_eq!(s, "BTCUSDT");
}
_ => panic!("symbol should be String32"),
}
// price/qty arrive as JSON strings and must be parsed into Float64 via the from-string hint.
match &col("price").array {
Array::NumericArray(NumericArray::Float64(a)) => {
assert!((a.data.as_ref()[0] - 63245.10).abs() < 1e-9);
}
_ => panic!("price should be Float64"),
}
match &col("qty").array {
Array::NumericArray(NumericArray::Float64(a)) => {
assert!((a.data.as_ref()[0] - 0.00125).abs() < 1e-12);
}
_ => panic!("qty should be Float64"),
}
// The payload has "m":false, so bit 0 of the boolean column must be unset.
match &col("buyer_is_maker").array {
Array::BooleanArray(a) => assert!(!a.data.get(0)),
_ => panic!("buyer_is_maker should be Boolean"),
}
}
}