use ustr::Ustr;
use super::{MessageHeader, StreamDecodeError};
use crate::spot::sbe::{cursor::SbeCursor, error::SbeDecodeError};
/// A single trade entry decoded from the repeating group of a trades stream event.
///
/// Price and quantity are kept as raw mantissas; the matching exponents live on the
/// enclosing [`TradesStreamEvent`], which offers `trade_price` / `trade_qty` to scale them.
///
/// Derives `PartialEq`/`Eq` so decoded trades can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Trade {
    /// Trade identifier as carried on the wire.
    pub id: i64,
    /// Unscaled price; combine with the event's `price_exponent`.
    pub price_mantissa: i64,
    /// Unscaled quantity; combine with the event's `qty_exponent`.
    pub qty_mantissa: i64,
    /// Decoded from a single byte: `true` for any non-zero value.
    pub is_buyer_maker: bool,
}
impl Trade {
    /// Number of bytes consumed by [`Trade::decode`]: three `i64` fields plus one flag byte.
    pub const ENCODED_LENGTH: usize = 25;

    /// Reads one trade entry from `cursor`.
    ///
    /// Fields are consumed in wire order: id, price mantissa, quantity mantissa, then the
    /// buyer-maker flag byte (any non-zero value maps to `true`).
    ///
    /// # Errors
    ///
    /// Propagates the first [`SbeDecodeError`] reported by the cursor.
    fn decode(cursor: &mut SbeCursor<'_>) -> Result<Self, SbeDecodeError> {
        let id = cursor.read_i64_le()?;
        let price_mantissa = cursor.read_i64_le()?;
        let qty_mantissa = cursor.read_i64_le()?;
        let maker_flag = cursor.read_u8()?;

        Ok(Self {
            id,
            price_mantissa,
            qty_mantissa,
            is_buyer_maker: maker_flag != 0,
        })
    }
}
/// A decoded trades stream event: fixed header fields, the group of trades, and the symbol.
///
/// Field order below is not the wire order; `decode_body` reads the two timestamps, the two
/// exponents, the trades group, and finally the symbol string.
#[derive(Debug, Clone)]
pub struct TradesStreamEvent {
/// First `i64` of the message body. NOTE(review): the `_us` suffix suggests microseconds;
/// confirm against the schema.
pub event_time_us: i64,
/// Second `i64` of the message body. NOTE(review): presumably microseconds as well; confirm.
pub transact_time_us: i64,
/// Exponent applied to every trade's `price_mantissa` by `trade_price`.
pub price_exponent: i8,
/// Exponent applied to every trade's `qty_mantissa` by `trade_qty`.
pub qty_exponent: i8,
/// Entries of the repeating group, in the order they were decoded.
pub trades: Vec<Trade>,
/// Symbol built from the variable-length string that follows the trades group.
pub symbol: Ustr,
}
impl TradesStreamEvent {
    /// Size in bytes of the fixed fields at the start of the body (two `i64` plus two `i8`).
    pub const BLOCK_LENGTH: usize = 18;

    /// Decodes a trades event from `buf`, checking the message header's schema first.
    ///
    /// # Errors
    ///
    /// Returns an error if the header cannot be decoded, if the schema validation fails, or
    /// if the body cannot be decoded.
    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
        MessageHeader::decode(buf)?.validate_schema()?;
        Self::decode_validated(buf)
    }

    /// Decodes the body of `buf` without inspecting the message header.
    ///
    /// Reading starts at offset `MessageHeader::ENCODED_LENGTH`; this function performs no
    /// header decoding or schema validation itself.
    ///
    /// # Errors
    ///
    /// Returns an error if the body cannot be decoded.
    pub(crate) fn decode_validated(buf: &[u8]) -> Result<Self, StreamDecodeError> {
        let body_start = MessageHeader::ENCODED_LENGTH;
        let mut body = SbeCursor::new_at(buf, body_start);
        Self::decode_body(&mut body)
    }

    /// Reads the event fields from `cursor` in wire order.
    ///
    /// Struct-literal field initializers are evaluated in the order written, so the reads
    /// below happen top to bottom: timestamps, exponents, trades group, symbol.
    #[inline]
    fn decode_body(cursor: &mut SbeCursor<'_>) -> Result<Self, StreamDecodeError> {
        Ok(Self {
            event_time_us: cursor.read_i64_le()?,
            transact_time_us: cursor.read_i64_le()?,
            price_exponent: cursor.read_i8()?,
            qty_exponent: cursor.read_i8()?,
            trades: {
                // The group header supplies the per-entry block length and the entry count.
                let (entry_len, entry_count) = cursor.read_group_header()?;
                cursor.read_group(entry_len, entry_count, Trade::decode)?
            },
            symbol: Ustr::from(cursor.read_var_string8_ref()?),
        })
    }

    /// Converts `trade`'s price mantissa to `f64` using this event's `price_exponent`.
    #[inline]
    #[must_use]
    pub fn trade_price(&self, trade: &Trade) -> f64 {
        let (mantissa, exponent) = (trade.price_mantissa, self.price_exponent);
        super::mantissa_to_f64(mantissa, exponent)
    }

    /// Converts `trade`'s quantity mantissa to `f64` using this event's `qty_exponent`.
    #[inline]
    #[must_use]
    pub fn trade_qty(&self, trade: &Trade) -> f64 {
        let (mantissa, exponent) = (trade.qty_mantissa, self.qty_exponent);
        super::mantissa_to_f64(mantissa, exponent)
    }
}
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::spot::sbe::stream::{STREAM_SCHEMA_ID, template_id};

    // Layout of the synthetic message, kept as literals so the tests do not simply mirror
    // the constants of the code under test.
    const HEADER_LEN: usize = 8;
    const ROOT_BLOCK_LEN: u16 = 18;
    const GROUP_HEADER_LEN: usize = 6;
    const TRADE_BLOCK_LEN: u16 = 25;

    // Values written into every buffer and asserted on after decoding.
    const EVENT_TIME_US: i64 = 1_000_000;
    const TRANSACT_TIME_US: i64 = 1_000_001;
    const PRICE_EXPONENT: i8 = -2;
    const QTY_EXPONENT: i8 = -8;
    const PRICE_MANTISSA: i64 = 4_200_000;
    const QTY_MANTISSA: i64 = 100_000_000;
    const SYMBOL: &str = "BTCUSDT";

    /// Trade id written for the entry at `index` (ids start at 1).
    fn expected_id(index: usize) -> i64 {
        i64::try_from(index).expect("trade index fits in i64") + 1
    }

    /// Builds a complete message (header + body) containing `num_trades` trades.
    ///
    /// Trade `i` gets id `i + 1`, the fixed price/qty mantissas, and a maker flag that is
    /// set for even `i`.
    fn make_valid_buffer(num_trades: usize) -> Vec<u8> {
        let trade_len = usize::from(TRADE_BLOCK_LEN);
        let symbol_bytes = SYMBOL.as_bytes();
        let body_size = usize::from(ROOT_BLOCK_LEN)
            + GROUP_HEADER_LEN
            + num_trades * trade_len
            + 1
            + symbol_bytes.len();
        let mut buf = vec![0u8; HEADER_LEN + body_size];

        // Message header: four little-endian u16 values (block length, template id,
        // schema id, and a final zero that is presumably the schema version).
        buf[0..2].copy_from_slice(&ROOT_BLOCK_LEN.to_le_bytes());
        buf[2..4].copy_from_slice(&template_id::TRADES_STREAM_EVENT.to_le_bytes());
        buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
        buf[6..8].copy_from_slice(&0u16.to_le_bytes());

        let body = &mut buf[HEADER_LEN..];

        // Fixed fields.
        body[0..8].copy_from_slice(&EVENT_TIME_US.to_le_bytes());
        body[8..16].copy_from_slice(&TRANSACT_TIME_US.to_le_bytes());
        body[16] = PRICE_EXPONENT.to_le_bytes()[0];
        body[17] = QTY_EXPONENT.to_le_bytes()[0];

        // Group header: u16 entry length followed by u32 entry count.
        let trade_count = u32::try_from(num_trades).expect("trade count fits in u32");
        body[18..20].copy_from_slice(&TRADE_BLOCK_LEN.to_le_bytes());
        body[20..24].copy_from_slice(&trade_count.to_le_bytes());

        // Group entries.
        let mut offset = 24;
        for i in 0..num_trades {
            let entry = &mut body[offset..offset + trade_len];
            entry[0..8].copy_from_slice(&expected_id(i).to_le_bytes());
            entry[8..16].copy_from_slice(&PRICE_MANTISSA.to_le_bytes());
            entry[16..24].copy_from_slice(&QTY_MANTISSA.to_le_bytes());
            entry[24] = u8::from(i % 2 == 0);
            offset += trade_len;
        }

        // Symbol: one length byte followed by the raw bytes.
        body[offset] = u8::try_from(symbol_bytes.len()).expect("symbol length fits in u8");
        body[offset + 1..offset + 1 + symbol_bytes.len()].copy_from_slice(symbol_bytes);

        buf
    }

    #[rstest]
    fn test_decode_valid_single_trade() {
        let buf = make_valid_buffer(1);
        let event = TradesStreamEvent::decode(&buf).unwrap();

        assert_eq!(event.event_time_us, EVENT_TIME_US);
        assert_eq!(event.transact_time_us, TRANSACT_TIME_US);
        assert_eq!(event.price_exponent, PRICE_EXPONENT);
        assert_eq!(event.qty_exponent, QTY_EXPONENT);
        assert_eq!(event.trades.len(), 1);
        assert_eq!(event.trades[0].id, 1);
        assert_eq!(event.trades[0].price_mantissa, PRICE_MANTISSA);
        assert_eq!(event.trades[0].qty_mantissa, QTY_MANTISSA);
        assert!(event.trades[0].is_buyer_maker);
        assert_eq!(event.symbol, SYMBOL);
    }

    #[rstest]
    fn test_decode_valid_multiple_trades() {
        let buf = make_valid_buffer(5);
        let event = TradesStreamEvent::decode(&buf).unwrap();

        assert_eq!(event.trades.len(), 5);
        for (i, trade) in event.trades.iter().enumerate() {
            assert_eq!(trade.id, expected_id(i));
            assert_eq!(trade.price_mantissa, PRICE_MANTISSA);
            assert_eq!(trade.qty_mantissa, QTY_MANTISSA);
            assert_eq!(trade.is_buyer_maker, i % 2 == 0);
        }
        // The symbol follows the group, so this also checks the cursor position after it.
        assert_eq!(event.symbol, SYMBOL);
    }

    #[rstest]
    fn test_decode_truncated_trades() {
        let mut buf = make_valid_buffer(3);
        // 50 bytes ends inside the first trade entry (entries start at byte 32).
        buf.truncate(50);

        let err = TradesStreamEvent::decode(&buf).unwrap_err();
        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
    }

    #[rstest]
    fn test_decode_wrong_schema() {
        let mut buf = make_valid_buffer(1);
        buf[4..6].copy_from_slice(&99u16.to_le_bytes());

        let err = TradesStreamEvent::decode(&buf).unwrap_err();
        assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
    }

    #[rstest]
    fn test_decode_validated_matches_decode() {
        let buf = make_valid_buffer(3);
        let decode_event = TradesStreamEvent::decode(&buf).unwrap();
        let validated_event = TradesStreamEvent::decode_validated(&buf).unwrap();

        assert_eq!(validated_event.event_time_us, decode_event.event_time_us);
        assert_eq!(
            validated_event.transact_time_us,
            decode_event.transact_time_us
        );
        assert_eq!(validated_event.price_exponent, decode_event.price_exponent);
        assert_eq!(validated_event.qty_exponent, decode_event.qty_exponent);
        assert_eq!(validated_event.symbol, decode_event.symbol);

        // Compare trades field by field so both paths must agree on contents, not just length.
        assert_eq!(validated_event.trades.len(), decode_event.trades.len());
        for (validated, decoded) in validated_event.trades.iter().zip(&decode_event.trades) {
            assert_eq!(validated.id, decoded.id);
            assert_eq!(validated.price_mantissa, decoded.price_mantissa);
            assert_eq!(validated.qty_mantissa, decoded.qty_mantissa);
            assert_eq!(validated.is_buyer_maker, decoded.is_buyer_maker);
        }
    }
}