Skip to main content

live_binance/
lib.rs

1//! # live-binance
2//!
3//! Plugs into lightstream's WebSocket-JSON transport to produce
4//! Binance public market data connections. Each constructor returns a
5//! [`WsJsonSourceSpec`] - URL, schema, per-field decode hints - that
6//! `live-feed` hands to lightstream's pipeline to drive the
7//! connection and stream Arrow tables to subscribers.
8//!
9//! ## Usage rights
10//!
11//! Users are responsible for compliance with Binance's market data
12//! licensing. This crate is a client library: data flows from Binance
13//! directly into the user's process, the same way `binance-rs` or any
14//! other client library works. Operating a public service that
15//! redistributes Binance data to third parties via these specs is a
16//! separate matter and not covered by personal-use terms.
17
18use std::borrow::Cow;
19use std::collections::BTreeMap;
20
21pub mod spec;
22
23use minarrow::enums::time_units::TimeUnit;
24use minarrow::{ArrowType, Field};
25
26pub use spec::{
27    EmitMode, FIELD_META_FROM_STRING, FIELD_META_SOURCE_KEY, FromStringHint, WsJsonSourceSpec,
28};
29
/// Base host for Binance Spot combined-stream and raw-stream endpoints.
///
/// Holds the scheme, host and port only - no path. [`trade`] appends
/// `/ws/<symbol>@trade` to it to form the full stream URL.
pub const BINANCE_SPOT_WS_HOST: &str = "wss://stream.binance.com:9443";
32
33/// Public trade-stream source spec for one symbol on Binance Spot.
34///
35/// Symbol is case-insensitive and is lowercased into the URL path per
36/// Binance's documented convention; the canonical uppercase form is
37/// used in the spec's `name`.
38///
39/// Wire-shape reference (Binance trade-stream payload):
40///
41/// ```json
42/// {"e":"trade","E":1700000000000,"s":"BTCUSDT","t":12345,
43///  "p":"63245.10","q":"0.00125","T":1700000000010,
44///  "m":false,"M":true}
45/// ```
46pub fn trade(symbol: &str) -> WsJsonSourceSpec {
47    let lower = symbol.to_ascii_lowercase();
48    let upper = symbol.to_ascii_uppercase();
49    WsJsonSourceSpec {
50        name: format!("binance.trade.{lower}").into(),
51        url: format!("{BINANCE_SPOT_WS_HOST}/ws/{lower}@trade").into(),
52        subscribe_message: None,
53        schema: trade_schema(),
54        // Binance emits `E` as milliseconds since the Unix epoch. The
55        // Arrow column carrying that value is `event_time`; downstream
56        // consumers align batches across feeds on this column.
57        event_time_key: "event_time".into(),
58        emit_mode: EmitMode::Immediate,
59    }
60    .also_set_symbol(&upper)
61}
62
63/// Output schema for the trade stream. Field metadata maps the Arrow
64/// column name to the upstream JSON key and opts string-encoded
65/// numerics in to the from-string parse path.
66pub fn trade_schema() -> Vec<Field> {
67    vec![
68        // Event time. Binance publishes at millisecond granularity;
69        // the column carries TimeUnit::Milliseconds so downstream
70        // consumers convert across feeds without out-of-band knowledge.
71        Field::new(
72            "event_time",
73            ArrowType::Timestamp(TimeUnit::Milliseconds, None),
74            false,
75            Some(meta(&[(FIELD_META_SOURCE_KEY, "E")])),
76        ),
77        // Trade time. `E` is when the event was emitted, `T` is when
78        // the trade was matched on the book. Both carry millisecond
79        // unit metadata on the DatetimeArray.
80        Field::new(
81            "trade_time",
82            ArrowType::Timestamp(TimeUnit::Milliseconds, None),
83            false,
84            Some(meta(&[(FIELD_META_SOURCE_KEY, "T")])),
85        ),
86        // Symbol, e.g. "BTCUSDT".
87        Field::new(
88            "symbol",
89            ArrowType::String,
90            false,
91            Some(meta(&[(FIELD_META_SOURCE_KEY, "s")])),
92        ),
93        // Aggregated trade id.
94        Field::new(
95            "trade_id",
96            ArrowType::Int64,
97            false,
98            Some(meta(&[(FIELD_META_SOURCE_KEY, "t")])),
99        ),
100        // Price - sent as a JSON string to preserve precision. The
101        // from-string hint activates fast-float2 parsing into the
102        // Float64 column.
103        Field::new(
104            "price",
105            ArrowType::Float64,
106            false,
107            Some(meta(&[
108                (FIELD_META_SOURCE_KEY, "p"),
109                (FIELD_META_FROM_STRING, "number"),
110            ])),
111        ),
112        // Quantity in the base asset - also sent as a JSON string.
113        Field::new(
114            "qty",
115            ArrowType::Float64,
116            false,
117            Some(meta(&[
118                (FIELD_META_SOURCE_KEY, "q"),
119                (FIELD_META_FROM_STRING, "number"),
120            ])),
121        ),
122        // True when the buyer was the market maker.
123        Field::new(
124            "buyer_is_maker",
125            ArrowType::Boolean,
126            false,
127            Some(meta(&[(FIELD_META_SOURCE_KEY, "m")])),
128        ),
129    ]
130}
131
/// Builds a per-field metadata map from borrowed key/value pairs.
///
/// Each key and value is copied into an owned `String`; an empty slice
/// yields an empty map.
fn meta(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
    let mut entries = BTreeMap::new();
    for &(key, value) in pairs {
        entries.insert(key.to_owned(), value.to_owned());
    }
    entries
}
139
140/// Attach the canonical symbol to the spec name where useful.
141trait SymbolAware {
142    fn also_set_symbol(self, symbol: &str) -> Self;
143}
144
145impl SymbolAware for WsJsonSourceSpec {
146    fn also_set_symbol(mut self, symbol: &str) -> Self {
147        // Replace the lowercase suffix with the canonical uppercase
148        // form so the name reads as users expect.
149        if let Some(stripped) = self.name.strip_prefix("binance.trade.") {
150            let new_name = format!("binance.trade.{symbol}");
151            if stripped.eq_ignore_ascii_case(symbol) {
152                self.name = Cow::Owned(new_name);
153            }
154        }
155        self
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::*;

    /// Looks up `key` in the metadata of the schema field called
    /// `column`. Panics if the schema has no such field.
    fn hint<'a>(schema: &'a [Field], column: &str, key: &str) -> Option<&'a str> {
        let field = schema.iter().find(|f| f.name == column).unwrap();
        field.metadata.get(key).map(String::as_str)
    }

    /// The stream URL carries the symbol in lowercase.
    #[test]
    fn trade_spec_url_uses_lowercase_symbol() {
        assert_eq!(
            trade("BTCUSDT").url,
            "wss://stream.binance.com:9443/ws/btcusdt@trade"
        );
    }

    /// The spec name carries the symbol in uppercase.
    #[test]
    fn trade_spec_name_uses_uppercase_symbol() {
        assert_eq!(trade("btcusdt").name, "binance.trade.BTCUSDT");
    }

    /// Column names map to Binance's single-letter JSON keys through
    /// field metadata, and `price` additionally carries the
    /// number-from-string hint.
    #[test]
    fn trade_schema_routes_cryptic_keys_via_source_key_metadata() {
        let schema = trade_schema();
        assert_eq!(hint(&schema, "price", FIELD_META_SOURCE_KEY), Some("p"));
        assert_eq!(hint(&schema, "price", FIELD_META_FROM_STRING), Some("number"));
        assert_eq!(hint(&schema, "event_time", FIELD_META_SOURCE_KEY), Some("E"));
    }

    /// End-to-end: a real Binance trade payload, decoded by lightstream
    /// using the live-binance schema, lands as a typed Arrow `Table`
    /// with the configured Float64 columns parsed from JSON strings.
    /// Exercises both metadata hints (source-key alias and number
    /// from string) in a single pass.
    #[test]
    fn decodes_real_binance_trade_payload_via_lightstream() {
        use lightstream::models::decoders::json::{JsonDecodeOptions, decode_json};
        use minarrow::{Array, NumericArray, TemporalArray, TextArray};
        use std::io::Cursor;

        let payload = br#"[{"e":"trade","E":1700000000000,"s":"BTCUSDT","t":12345,
"p":"63245.10","q":"0.00125","T":1700000000010,"m":false,"M":true}]"#;

        let opts = JsonDecodeOptions {
            schema: Some(trade_schema()),
            ..Default::default()
        };

        let tbl = decode_json(Cursor::new(&payload[..]), &opts).expect("decode succeeds");
        assert_eq!(tbl.n_rows, 1);
        assert_eq!(tbl.cols.len(), 7);

        // Fetch a decoded column by name, failing loudly if it is absent.
        let column = |wanted: &str| match tbl.cols.iter().find(|c| c.field.name == wanted) {
            Some(found) => found,
            None => panic!("no column {wanted}"),
        };

        let Array::TemporalArray(TemporalArray::Datetime64(event_time)) =
            &column("event_time").array
        else {
            panic!("event_time should be Datetime64 with millisecond unit");
        };
        assert_eq!(event_time.data.as_ref()[0], 1_700_000_000_000);
        assert_eq!(event_time.time_unit, TimeUnit::Milliseconds);

        let Array::NumericArray(NumericArray::Int64(trade_id)) = &column("trade_id").array else {
            panic!("trade_id should be Int64");
        };
        assert_eq!(trade_id.data.as_ref()[0], 12345);

        let Array::TextArray(TextArray::String32(symbol)) = &column("symbol").array else {
            panic!("symbol should be String32");
        };
        // Row 0 spans offsets[0]..offsets[1] of the string data buffer.
        let start = symbol.offsets.as_ref()[0] as usize;
        let end = symbol.offsets.as_ref()[1] as usize;
        let text = std::str::from_utf8(&symbol.data.as_ref()[start..end]).unwrap();
        assert_eq!(text, "BTCUSDT");

        let Array::NumericArray(NumericArray::Float64(price)) = &column("price").array else {
            panic!("price should be Float64");
        };
        assert!((price.data.as_ref()[0] - 63245.10).abs() < 1e-9);

        let Array::NumericArray(NumericArray::Float64(qty)) = &column("qty").array else {
            panic!("qty should be Float64");
        };
        assert!((qty.data.as_ref()[0] - 0.00125).abs() < 1e-12);

        let Array::BooleanArray(buyer_is_maker) = &column("buyer_is_maker").array else {
            panic!("buyer_is_maker should be Boolean");
        };
        assert!(!buyer_is_maker.data.get(0));
    }
}