use crate::error::ParseError;
use crate::messages::*;
use std::ops::ControlFlow;
use zerocopy::{FromBytes, Immutable, KnownLayout};
pub trait MessageHandler {
fn on_system_event(&mut self, _msg: &SystemEvent) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_stock_directory(&mut self, _msg: &StockDirectory) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_stock_trading_action(&mut self, _msg: &StockTradingAction) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_reg_sho_restriction(&mut self, _msg: &RegSHORestriction) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_market_participant_position(
&mut self,
_msg: &MarketParticipantPosition,
) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_mwcb_decline_level(&mut self, _msg: &MWCBDeclineLevel) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_mwcb_status(&mut self, _msg: &MWCBStatus) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_quoting_period_update(&mut self, _msg: &QuotingPeriodUpdate) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_luld_auction_collar(&mut self, _msg: &LULDAuctionCollar) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_operational_halt(&mut self, _msg: &OperationalHalt) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_add_order_no_mpid_attribution(
&mut self,
_msg: &AddOrderNoMPIDAttribution,
) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_add_order_with_mpid_attribution(
&mut self,
_msg: &AddOrderWithMPIDAttribution,
) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_order_executed(&mut self, _msg: &OrderExecuted) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_order_executed_with_price(&mut self, _msg: &OrderExecutedWithPrice) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_order_cancel(&mut self, _msg: &OrderCancel) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_order_delete(&mut self, _msg: &OrderDelete) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_order_replace(&mut self, _msg: &OrderReplace) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_trade(&mut self, _msg: &Trade) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_cross_trade(&mut self, _msg: &CrossTrade) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_broken_trade(&mut self, _msg: &BrokenTrade) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_net_order_imbalance_indicator(
&mut self,
_msg: &NetOrderImbalanceIndicator,
) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_retail_price_improvement_indicator(
&mut self,
_msg: &RetailPriceImprovementIndicator,
) -> ControlFlow<()> {
ControlFlow::Continue(())
}
fn on_direct_listing_with_capital_raise_price_discovery(
&mut self,
_msg: &DirectListingwithCapitalRaisePriceDiscovery,
) -> ControlFlow<()> {
ControlFlow::Continue(())
}
}
pub struct Parser<'a> {
buf: &'a [u8],
}
impl<'a> Parser<'a> {
pub fn new(buf: &'a [u8]) -> Self {
Self { buf }
}
pub fn parse_stream(&mut self, handler: &mut impl MessageHandler) -> Result<(), ParseError> {
while !self.buf.is_empty() {
let (body, rest) = parse_one(self.buf)?;
self.buf = rest;
let cf = match body[0] {
b'S' => handler.on_system_event(cast(body)?),
b'R' => handler.on_stock_directory(cast(body)?),
b'H' => handler.on_stock_trading_action(cast(body)?),
b'Y' => handler.on_reg_sho_restriction(cast(body)?),
b'L' => handler.on_market_participant_position(cast(body)?),
b'V' => handler.on_mwcb_decline_level(cast(body)?),
b'W' => handler.on_mwcb_status(cast(body)?),
b'K' => handler.on_quoting_period_update(cast(body)?),
b'J' => handler.on_luld_auction_collar(cast(body)?),
b'h' => handler.on_operational_halt(cast(body)?),
b'A' => handler.on_add_order_no_mpid_attribution(cast(body)?),
b'F' => handler.on_add_order_with_mpid_attribution(cast(body)?),
b'E' => handler.on_order_executed(cast(body)?),
b'C' => handler.on_order_executed_with_price(cast(body)?),
b'X' => handler.on_order_cancel(cast(body)?),
b'D' => handler.on_order_delete(cast(body)?),
b'U' => handler.on_order_replace(cast(body)?),
b'P' => handler.on_trade(cast(body)?),
b'Q' => handler.on_cross_trade(cast(body)?),
b'B' => handler.on_broken_trade(cast(body)?),
b'I' => handler.on_net_order_imbalance_indicator(cast(body)?),
b'N' => handler.on_retail_price_improvement_indicator(cast(body)?),
b'O' => handler.on_direct_listing_with_capital_raise_price_discovery(cast(body)?),
unknown => return Err(ParseError::UnknownMessageType(unknown)),
};
if cf.is_break() {
break;
}
}
Ok(())
}
}
#[inline]
pub fn parse_one(buf: &[u8]) -> Result<(&[u8], &[u8]), ParseError> {
let (len_bytes, rest) = buf.split_at_checked(2).ok_or(ParseError::EmptyBuffer)?;
let msg_len = u16::from_be_bytes(len_bytes.try_into().unwrap()) as usize;
let (body, rest) = rest
.split_at_checked(msg_len)
.ok_or(ParseError::MalformedData)?;
match body.first() {
Some(
b'S' | b'R' | b'H' | b'Y' | b'L' | b'V' | b'W' | b'K' | b'J' | b'h' | b'A' | b'F'
| b'E' | b'C' | b'X' | b'D' | b'U' | b'P' | b'Q' | b'B' | b'I' | b'N' | b'O',
) => Ok((body, rest)),
Some(&unknown) => Err(ParseError::UnknownMessageType(unknown)),
None => Err(ParseError::MalformedData),
}
}
#[inline]
pub fn cast<T: FromBytes + KnownLayout + Immutable>(body: &[u8]) -> Result<&T, ParseError> {
T::ref_from_bytes(body).map_err(|_| ParseError::MalformedData)
}