live-binance 0.1.0

Binance public market data WebSocket streams. Plugs into the live-feed publisher SDK; data flow goes via lightstream's WebSocket-JSON pipeline.
Documentation
//! # live-binance
//!
//! Describes Binance public market data streams for lightstream's
//! WebSocket-JSON transport. Each constructor returns a
//! [`WsJsonSourceSpec`] - URL, schema, per-field decode hints - that
//! `live-feed` hands to lightstream's pipeline to drive the
//! connection and stream Arrow tables to subscribers.
//!
//! ## Usage rights
//!
//! Users are responsible for compliance with Binance's market data
//! licensing. This crate is a client library: data flows from Binance
//! directly into the user's process, the same way `binance-rs` or any
//! other client library works. Operating a public service that
//! redistributes Binance data to third parties via these specs is a
//! separate matter and not covered by personal-use terms.

use std::borrow::Cow;
use std::collections::BTreeMap;

pub mod spec;

use minarrow::enums::time_units::TimeUnit;
use minarrow::{ArrowType, Field};

pub use spec::{
    EmitMode, FIELD_META_FROM_STRING, FIELD_META_SOURCE_KEY, FromStringHint, WsJsonSourceSpec,
};

/// Base host for Binance Spot combined-stream and raw-stream endpoints.
///
/// Includes the `wss://` scheme and the port, with no trailing slash;
/// [`trade`] appends the `/ws/<symbol>@trade` path to it.
pub const BINANCE_SPOT_WS_HOST: &str = "wss://stream.binance.com:9443";

/// Builds the source spec for one symbol's public trade stream on
/// Binance Spot.
///
/// `symbol` may be given in any ASCII casing. The URL path uses the
/// lowercased form, following Binance's documented convention, while
/// the spec's `name` ends in the canonical uppercase form.
///
/// No subscribe message is sent: the stream is selected by the URL
/// path alone. Rows are emitted with [`EmitMode::Immediate`].
///
/// Wire-shape reference (Binance trade-stream payload):
///
/// ```json
/// {"e":"trade","E":1700000000000,"s":"BTCUSDT","t":12345,
///  "p":"63245.10","q":"0.00125","T":1700000000010,
///  "m":false,"M":true}
/// ```
pub fn trade(symbol: &str) -> WsJsonSourceSpec {
    let stream_symbol = symbol.to_ascii_lowercase();
    let canonical_symbol = symbol.to_ascii_uppercase();

    let name = format!("binance.trade.{stream_symbol}");
    let url = format!("{BINANCE_SPOT_WS_HOST}/ws/{stream_symbol}@trade");

    let spec = WsJsonSourceSpec {
        name: name.into(),
        url: url.into(),
        subscribe_message: None,
        schema: trade_schema(),
        // `event_time` is the schema column fed by Binance's `E` key
        // (milliseconds since the Unix epoch). Downstream consumers
        // align batches across feeds on this column.
        event_time_key: "event_time".into(),
        emit_mode: EmitMode::Immediate,
    };

    // The name was built from the lowercase form; swap in the canonical
    // uppercase symbol so it reads the way users expect.
    spec.also_set_symbol(&canonical_symbol)
}

/// Output schema for the trade stream. Field metadata maps the Arrow
/// column name to the upstream JSON key and opts string-encoded
/// numerics in to the from-string parse path.
pub fn trade_schema() -> Vec<Field> {
    vec![
        // Event time. Binance publishes at millisecond granularity;
        // the column carries TimeUnit::Milliseconds so downstream
        // consumers convert across feeds without out-of-band knowledge.
        Field::new(
            "event_time",
            ArrowType::Timestamp(TimeUnit::Milliseconds, None),
            false,
            Some(meta(&[(FIELD_META_SOURCE_KEY, "E")])),
        ),
        // Trade time. `E` is when the event was emitted, `T` is when
        // the trade was matched on the book. Both carry millisecond
        // unit metadata on the DatetimeArray.
        Field::new(
            "trade_time",
            ArrowType::Timestamp(TimeUnit::Milliseconds, None),
            false,
            Some(meta(&[(FIELD_META_SOURCE_KEY, "T")])),
        ),
        // Symbol, e.g. "BTCUSDT".
        Field::new(
            "symbol",
            ArrowType::String,
            false,
            Some(meta(&[(FIELD_META_SOURCE_KEY, "s")])),
        ),
        // Aggregated trade id.
        Field::new(
            "trade_id",
            ArrowType::Int64,
            false,
            Some(meta(&[(FIELD_META_SOURCE_KEY, "t")])),
        ),
        // Price - sent as a JSON string to preserve precision. The
        // from-string hint activates fast-float2 parsing into the
        // Float64 column.
        Field::new(
            "price",
            ArrowType::Float64,
            false,
            Some(meta(&[
                (FIELD_META_SOURCE_KEY, "p"),
                (FIELD_META_FROM_STRING, "number"),
            ])),
        ),
        // Quantity in the base asset - also sent as a JSON string.
        Field::new(
            "qty",
            ArrowType::Float64,
            false,
            Some(meta(&[
                (FIELD_META_SOURCE_KEY, "q"),
                (FIELD_META_FROM_STRING, "number"),
            ])),
        ),
        // True when the buyer was the market maker.
        Field::new(
            "buyer_is_maker",
            ArrowType::Boolean,
            false,
            Some(meta(&[(FIELD_META_SOURCE_KEY, "m")])),
        ),
    ]
}

/// Per-field metadata builder.
fn meta(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
    pairs
        .iter()
        .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
        .collect()
}

/// Private extension for rewriting a spec's name around a canonical
/// symbol.
trait SymbolAware {
    /// Consumes the spec and returns it with `symbol` attached to its
    /// name where that applies; otherwise returns it unchanged.
    fn also_set_symbol(self, symbol: &str) -> Self;
}

impl SymbolAware for WsJsonSourceSpec {
    /// Rewrites a `binance.trade.<suffix>` name to
    /// `binance.trade.<symbol>` when `<suffix>` equals `symbol` ignoring
    /// ASCII case. Any other name is left untouched.
    fn also_set_symbol(mut self, symbol: &str) -> Self {
        // Decide first, so the borrow of `self.name` has ended before
        // the field is reassigned.
        let suffix_matches = match self.name.strip_prefix("binance.trade.") {
            Some(suffix) => suffix.eq_ignore_ascii_case(symbol),
            None => false,
        };
        if suffix_matches {
            // Swap the existing suffix for the caller's casing of the
            // symbol so the name reads as users expect.
            self.name = Cow::Owned(format!("binance.trade.{symbol}"));
        }
        self
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The stream URL carries the lowercased symbol even when the
    /// caller passes uppercase.
    #[test]
    fn trade_spec_url_uses_lowercase_symbol() {
        let spec = trade("BTCUSDT");
        assert_eq!(spec.url, "wss://stream.binance.com:9443/ws/btcusdt@trade");
    }

    /// The spec name carries the uppercase symbol even when the caller
    /// passes lowercase.
    #[test]
    fn trade_spec_name_uses_uppercase_symbol() {
        let spec = trade("btcusdt");
        assert_eq!(spec.name, "binance.trade.BTCUSDT");
    }

    /// Schema fields map readable column names to Binance's
    /// single-letter JSON keys through field metadata.
    #[test]
    fn trade_schema_routes_cryptic_keys_via_source_key_metadata() {
        let schema = trade_schema();
        let price = schema.iter().find(|f| f.name == "price").unwrap();
        assert_eq!(price.metadata.get(FIELD_META_SOURCE_KEY).map(String::as_str), Some("p"));
        assert_eq!(
            price.metadata.get(FIELD_META_FROM_STRING).map(String::as_str),
            Some("number"),
        );
        let event_time = schema.iter().find(|f| f.name == "event_time").unwrap();
        assert_eq!(
            event_time.metadata.get(FIELD_META_SOURCE_KEY).map(String::as_str),
            Some("E"),
        );
    }

    /// End-to-end: a real Binance trade payload, decoded by lightstream
    /// using the live-binance schema, lands as a typed Arrow `Table`
    /// with the configured Float64 columns parsed from JSON strings.
    /// Exercises both metadata hints (source-key alias and number
    /// from string) in a single pass, and checks every schema column.
    #[test]
    fn decodes_real_binance_trade_payload_via_lightstream() {
        use minarrow::{Array, NumericArray, TemporalArray, TextArray};
        use lightstream::models::decoders::json::{JsonDecodeOptions, decode_json};
        use std::io::Cursor;

        let payload = br#"[{"e":"trade","E":1700000000000,"s":"BTCUSDT","t":12345,
"p":"63245.10","q":"0.00125","T":1700000000010,"m":false,"M":true}]"#;

        let schema = trade_schema();
        let opts = JsonDecodeOptions {
            schema: Some(schema),
            ..Default::default()
        };

        let tbl = decode_json(Cursor::new(&payload[..]), &opts).expect("decode succeeds");
        assert_eq!(tbl.n_rows, 1);
        assert_eq!(tbl.cols.len(), 7);

        // Look a decoded column up by its Arrow name, failing loudly
        // when the decoder did not produce it.
        let col = |name: &str| {
            tbl.cols
                .iter()
                .find(|c| c.field.name == name)
                .unwrap_or_else(|| panic!("no column {name}"))
        };

        // Both timestamp columns share the same schema shape, so they
        // are held to the same expectations: the raw millisecond value
        // is preserved and the unit is carried on the array. Checking
        // `trade_time` also covers the `T` source-key alias.
        for (name, expected_ms) in [
            ("event_time", 1_700_000_000_000_i64),
            ("trade_time", 1_700_000_000_010_i64),
        ] {
            match &col(name).array {
                Array::TemporalArray(TemporalArray::Datetime64(a)) => {
                    assert_eq!(a.data.as_ref()[0], expected_ms, "{name} value");
                    assert_eq!(a.time_unit, TimeUnit::Milliseconds, "{name} unit");
                }
                _ => panic!("{name} should be Datetime64 with millisecond unit"),
            }
        }
        match &col("trade_id").array {
            Array::NumericArray(NumericArray::Int64(a)) => {
                assert_eq!(a.data.as_ref()[0], 12345);
            }
            _ => panic!("trade_id should be Int64"),
        }
        match &col("symbol").array {
            Array::TextArray(TextArray::String32(a)) => {
                // Slice the single row out of the offsets/data buffers.
                let start = a.offsets.as_ref()[0] as usize;
                let end = a.offsets.as_ref()[1] as usize;
                let s = std::str::from_utf8(&a.data.as_ref()[start..end]).unwrap();
                assert_eq!(s, "BTCUSDT");
            }
            _ => panic!("symbol should be String32"),
        }
        match &col("price").array {
            Array::NumericArray(NumericArray::Float64(a)) => {
                assert!((a.data.as_ref()[0] - 63245.10).abs() < 1e-9);
            }
            _ => panic!("price should be Float64"),
        }
        match &col("qty").array {
            Array::NumericArray(NumericArray::Float64(a)) => {
                assert!((a.data.as_ref()[0] - 0.00125).abs() < 1e-12);
            }
            _ => panic!("qty should be Float64"),
        }
        match &col("buyer_is_maker").array {
            Array::BooleanArray(a) => assert!(!a.data.get(0)),
            _ => panic!("buyer_is_maker should be Boolean"),
        }
    }
}