1use std::borrow::Cow;
19use std::collections::BTreeMap;
20
21pub mod spec;
22
23use minarrow::enums::time_units::TimeUnit;
24use minarrow::{ArrowType, Field};
25
26pub use spec::{
27 EmitMode, FIELD_META_FROM_STRING, FIELD_META_SOURCE_KEY, FromStringHint, WsJsonSourceSpec,
28};
29
30pub const BINANCE_SPOT_WS_HOST: &str = "wss://stream.binance.com:9443";
32
33pub fn trade(symbol: &str) -> WsJsonSourceSpec {
47 let lower = symbol.to_ascii_lowercase();
48 let upper = symbol.to_ascii_uppercase();
49 WsJsonSourceSpec {
50 name: format!("binance.trade.{lower}").into(),
51 url: format!("{BINANCE_SPOT_WS_HOST}/ws/{lower}@trade").into(),
52 subscribe_message: None,
53 schema: trade_schema(),
54 event_time_key: "event_time".into(),
58 emit_mode: EmitMode::Immediate,
59 }
60 .also_set_symbol(&upper)
61}
62
63pub fn trade_schema() -> Vec<Field> {
67 vec![
68 Field::new(
72 "event_time",
73 ArrowType::Timestamp(TimeUnit::Milliseconds, None),
74 false,
75 Some(meta(&[(FIELD_META_SOURCE_KEY, "E")])),
76 ),
77 Field::new(
81 "trade_time",
82 ArrowType::Timestamp(TimeUnit::Milliseconds, None),
83 false,
84 Some(meta(&[(FIELD_META_SOURCE_KEY, "T")])),
85 ),
86 Field::new(
88 "symbol",
89 ArrowType::String,
90 false,
91 Some(meta(&[(FIELD_META_SOURCE_KEY, "s")])),
92 ),
93 Field::new(
95 "trade_id",
96 ArrowType::Int64,
97 false,
98 Some(meta(&[(FIELD_META_SOURCE_KEY, "t")])),
99 ),
100 Field::new(
104 "price",
105 ArrowType::Float64,
106 false,
107 Some(meta(&[
108 (FIELD_META_SOURCE_KEY, "p"),
109 (FIELD_META_FROM_STRING, "number"),
110 ])),
111 ),
112 Field::new(
114 "qty",
115 ArrowType::Float64,
116 false,
117 Some(meta(&[
118 (FIELD_META_SOURCE_KEY, "q"),
119 (FIELD_META_FROM_STRING, "number"),
120 ])),
121 ),
122 Field::new(
124 "buyer_is_maker",
125 ArrowType::Boolean,
126 false,
127 Some(meta(&[(FIELD_META_SOURCE_KEY, "m")])),
128 ),
129 ]
130}
131
132fn meta(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
134 pairs
135 .iter()
136 .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
137 .collect()
138}
139
140trait SymbolAware {
142 fn also_set_symbol(self, symbol: &str) -> Self;
143}
144
145impl SymbolAware for WsJsonSourceSpec {
146 fn also_set_symbol(mut self, symbol: &str) -> Self {
147 if let Some(stripped) = self.name.strip_prefix("binance.trade.") {
150 let new_name = format!("binance.trade.{symbol}");
151 if stripped.eq_ignore_ascii_case(symbol) {
152 self.name = Cow::Owned(new_name);
153 }
154 }
155 self
156 }
157}
158
159#[cfg(test)]
160mod tests {
161 use super::*;
162
163 #[test]
164 fn trade_spec_url_uses_lowercase_symbol() {
165 let spec = trade("BTCUSDT");
166 assert_eq!(spec.url, "wss://stream.binance.com:9443/ws/btcusdt@trade");
167 }
168
169 #[test]
170 fn trade_spec_name_uses_uppercase_symbol() {
171 let spec = trade("btcusdt");
172 assert_eq!(spec.name, "binance.trade.BTCUSDT");
173 }
174
175 #[test]
176 fn trade_schema_routes_cryptic_keys_via_source_key_metadata() {
177 let schema = trade_schema();
178 let price = schema.iter().find(|f| f.name == "price").unwrap();
179 assert_eq!(price.metadata.get(FIELD_META_SOURCE_KEY).map(String::as_str), Some("p"));
180 assert_eq!(
181 price.metadata.get(FIELD_META_FROM_STRING).map(String::as_str),
182 Some("number"),
183 );
184 let event_time = schema.iter().find(|f| f.name == "event_time").unwrap();
185 assert_eq!(
186 event_time.metadata.get(FIELD_META_SOURCE_KEY).map(String::as_str),
187 Some("E"),
188 );
189 }
190
191 #[test]
197 fn decodes_real_binance_trade_payload_via_lightstream() {
198 use minarrow::{Array, NumericArray, TemporalArray, TextArray};
199 use lightstream::models::decoders::json::{JsonDecodeOptions, decode_json};
200 use std::io::Cursor;
201
202 let payload = br#"[{"e":"trade","E":1700000000000,"s":"BTCUSDT","t":12345,
203"p":"63245.10","q":"0.00125","T":1700000000010,"m":false,"M":true}]"#;
204
205 let schema = trade_schema();
206 let opts = JsonDecodeOptions {
207 schema: Some(schema),
208 ..Default::default()
209 };
210
211 let tbl = decode_json(Cursor::new(&payload[..]), &opts).expect("decode succeeds");
212 assert_eq!(tbl.n_rows, 1);
213 assert_eq!(tbl.cols.len(), 7);
214
215 let col = |name: &str| {
216 tbl.cols
217 .iter()
218 .find(|c| c.field.name == name)
219 .unwrap_or_else(|| panic!("no column {name}"))
220 };
221
222 match &col("event_time").array {
223 Array::TemporalArray(TemporalArray::Datetime64(a)) => {
224 assert_eq!(a.data.as_ref()[0], 1_700_000_000_000);
225 assert_eq!(a.time_unit, TimeUnit::Milliseconds);
226 }
227 _ => panic!("event_time should be Datetime64 with millisecond unit"),
228 }
229 match &col("trade_id").array {
230 Array::NumericArray(NumericArray::Int64(a)) => {
231 assert_eq!(a.data.as_ref()[0], 12345);
232 }
233 _ => panic!("trade_id should be Int64"),
234 }
235 match &col("symbol").array {
236 Array::TextArray(TextArray::String32(a)) => {
237 let start = a.offsets.as_ref()[0] as usize;
238 let end = a.offsets.as_ref()[1] as usize;
239 let s = std::str::from_utf8(&a.data.as_ref()[start..end]).unwrap();
240 assert_eq!(s, "BTCUSDT");
241 }
242 _ => panic!("symbol should be String32"),
243 }
244 match &col("price").array {
245 Array::NumericArray(NumericArray::Float64(a)) => {
246 assert!((a.data.as_ref()[0] - 63245.10).abs() < 1e-9);
247 }
248 _ => panic!("price should be Float64"),
249 }
250 match &col("qty").array {
251 Array::NumericArray(NumericArray::Float64(a)) => {
252 assert!((a.data.as_ref()[0] - 0.00125).abs() < 1e-12);
253 }
254 _ => panic!("qty should be Float64"),
255 }
256 match &col("buyer_is_maker").array {
257 Array::BooleanArray(a) => assert!(!a.data.get(0)),
258 _ => panic!("buyer_is_maker should be Boolean"),
259 }
260 }
261}