use std::collections::HashSet;
use bytes::Bytes;
use nautilus_common::{
messages::{
data::{
BarsResponse, BookDeltasResponse, BookDepthResponse, BookResponse, CustomDataResponse,
DataCommand, DataResponse, ForwardPricesResponse, FundingRatesResponse,
InstrumentResponse, InstrumentsResponse, QuotesResponse, TradesResponse,
},
execution::{
BatchCancelOrders, CancelAllOrders, CancelOrder, ExecutionReport, ModifyOrder,
QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList, TradingCommand,
},
},
timer::TimeEvent,
};
use nautilus_core::{Params, UUID4, UnixNanos};
use nautilus_model::{
data::DataType,
events::{
AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderDenied,
OrderEmulated, OrderEventAny, OrderExpired, OrderFilled, OrderInitialized,
OrderModifyRejected, OrderPendingCancel, OrderPendingUpdate, OrderRejected, OrderReleased,
OrderSubmitted, OrderTriggered, OrderUpdated, PositionAdjusted, PositionChanged,
PositionClosed, PositionEvent, PositionOpened,
},
identifiers::{ClientId, InstrumentId, Venue},
reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
};
use serde::Serialize;
use ustr::Ustr;
use crate::{
backend::{IndexKey, IndexKind},
capture::{
encoder::{EncodeError, EncodedPayload},
registry::EncoderRegistry,
},
entry::PayloadType,
headers::Headers,
};
// Payload type tags attached to encoded payloads by the encoders in this module.

// Trading command tags (see `encode_trading_command`).
pub const PAYLOAD_TYPE_SUBMIT_ORDER: &str = "SubmitOrder";
pub const PAYLOAD_TYPE_SUBMIT_ORDER_LIST: &str = "SubmitOrderList";
pub const PAYLOAD_TYPE_MODIFY_ORDER: &str = "ModifyOrder";
pub const PAYLOAD_TYPE_CANCEL_ORDER: &str = "CancelOrder";
pub const PAYLOAD_TYPE_CANCEL_ALL_ORDERS: &str = "CancelAllOrders";
pub const PAYLOAD_TYPE_BATCH_CANCEL_ORDERS: &str = "BatchCancelOrders";
pub const PAYLOAD_TYPE_QUERY_ORDER: &str = "QueryOrder";
pub const PAYLOAD_TYPE_QUERY_ACCOUNT: &str = "QueryAccount";
// Order event tags (see `encode_order_event_any`).
pub const PAYLOAD_TYPE_ORDER_INITIALIZED: &str = "OrderInitialized";
pub const PAYLOAD_TYPE_ORDER_DENIED: &str = "OrderDenied";
pub const PAYLOAD_TYPE_ORDER_EMULATED: &str = "OrderEmulated";
pub const PAYLOAD_TYPE_ORDER_RELEASED: &str = "OrderReleased";
pub const PAYLOAD_TYPE_ORDER_SUBMITTED: &str = "OrderSubmitted";
pub const PAYLOAD_TYPE_ORDER_ACCEPTED: &str = "OrderAccepted";
pub const PAYLOAD_TYPE_ORDER_REJECTED: &str = "OrderRejected";
pub const PAYLOAD_TYPE_ORDER_CANCELED: &str = "OrderCanceled";
pub const PAYLOAD_TYPE_ORDER_EXPIRED: &str = "OrderExpired";
pub const PAYLOAD_TYPE_ORDER_TRIGGERED: &str = "OrderTriggered";
pub const PAYLOAD_TYPE_ORDER_PENDING_UPDATE: &str = "OrderPendingUpdate";
pub const PAYLOAD_TYPE_ORDER_PENDING_CANCEL: &str = "OrderPendingCancel";
pub const PAYLOAD_TYPE_ORDER_MODIFY_REJECTED: &str = "OrderModifyRejected";
pub const PAYLOAD_TYPE_ORDER_CANCEL_REJECTED: &str = "OrderCancelRejected";
pub const PAYLOAD_TYPE_ORDER_UPDATED: &str = "OrderUpdated";
pub const PAYLOAD_TYPE_ORDER_FILLED: &str = "OrderFilled";
// Execution report tags (see `encode_execution_report`).
pub const PAYLOAD_TYPE_ORDER_STATUS_REPORT: &str = "OrderStatusReport";
pub const PAYLOAD_TYPE_FILL_REPORT: &str = "FillReport";
pub const PAYLOAD_TYPE_ORDER_WITH_FILLS: &str = "OrderWithFills";
pub const PAYLOAD_TYPE_POSITION_STATUS_REPORT: &str = "PositionStatusReport";
pub const PAYLOAD_TYPE_EXECUTION_MASS_STATUS: &str = "ExecutionMassStatus";
// Position event tags (see `encode_position_event`).
pub const PAYLOAD_TYPE_POSITION_OPENED: &str = "PositionOpened";
pub const PAYLOAD_TYPE_POSITION_CHANGED: &str = "PositionChanged";
pub const PAYLOAD_TYPE_POSITION_CLOSED: &str = "PositionClosed";
pub const PAYLOAD_TYPE_POSITION_ADJUSTED: &str = "PositionAdjusted";
// Tags for types registered directly in `register_default`.
pub const PAYLOAD_TYPE_ACCOUNT_STATE: &str = "AccountState";
pub const PAYLOAD_TYPE_TIME_EVENT: &str = "TimeEvent";
// Data command category tags (see `encode_data_command`).
pub const PAYLOAD_TYPE_REQUEST_COMMAND: &str = "RequestCommand";
pub const PAYLOAD_TYPE_SUBSCRIBE_COMMAND: &str = "SubscribeCommand";
pub const PAYLOAD_TYPE_UNSUBSCRIBE_COMMAND: &str = "UnsubscribeCommand";
// DeFi data command category tags, only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub const PAYLOAD_TYPE_DEFI_REQUEST_COMMAND: &str = "DefiRequestCommand";
#[cfg(feature = "defi")]
pub const PAYLOAD_TYPE_DEFI_SUBSCRIBE_COMMAND: &str = "DefiSubscribeCommand";
#[cfg(feature = "defi")]
pub const PAYLOAD_TYPE_DEFI_UNSUBSCRIBE_COMMAND: &str = "DefiUnsubscribeCommand";
// Data response tags (see `encode_data_response`).
pub const PAYLOAD_TYPE_CUSTOM_DATA_RESPONSE: &str = "CustomDataResponse";
pub const PAYLOAD_TYPE_INSTRUMENT_RESPONSE: &str = "InstrumentResponse";
pub const PAYLOAD_TYPE_INSTRUMENTS_RESPONSE: &str = "InstrumentsResponse";
pub const PAYLOAD_TYPE_BOOK_RESPONSE: &str = "BookResponse";
pub const PAYLOAD_TYPE_BOOK_DELTAS_RESPONSE: &str = "BookDeltasResponse";
pub const PAYLOAD_TYPE_BOOK_DEPTH_RESPONSE: &str = "BookDepthResponse";
pub const PAYLOAD_TYPE_QUOTES_RESPONSE: &str = "QuotesResponse";
pub const PAYLOAD_TYPE_TRADES_RESPONSE: &str = "TradesResponse";
pub const PAYLOAD_TYPE_FUNDING_RATES_RESPONSE: &str = "FundingRatesResponse";
pub const PAYLOAD_TYPE_FORWARD_PRICES_RESPONSE: &str = "ForwardPricesResponse";
pub const PAYLOAD_TYPE_BARS_RESPONSE: &str = "BarsResponse";
// Private tags under which the enum-level dispatch encoders are registered in
// `register_default`. Those encoders tag every payload they produce with one of the
// per-variant tags above.
const PAYLOAD_TYPE_TRADING_COMMAND: &str = "TradingCommand";
const PAYLOAD_TYPE_ORDER_EVENT_ANY: &str = "OrderEventAny";
const PAYLOAD_TYPE_EXECUTION_REPORT: &str = "ExecutionReport";
const PAYLOAD_TYPE_POSITION_EVENT: &str = "PositionEvent";
const PAYLOAD_TYPE_DATA_COMMAND: &str = "DataCommand";
const PAYLOAD_TYPE_DATA_RESPONSE: &str = "DataResponse";
// Test-only list of every public payload type tag declared above, in declaration order.
// The private enum-wrapper tags (`PAYLOAD_TYPE_TRADING_COMMAND` and friends) are not listed.
// NOTE(review): presumably the set of tags the default registry is expected to emit —
// confirm against the tests that consume this list.
#[cfg(test)]
pub(crate) const DEFAULT_CAPTURE_PAYLOAD_TYPES: &[&str] = &[
PAYLOAD_TYPE_SUBMIT_ORDER,
PAYLOAD_TYPE_SUBMIT_ORDER_LIST,
PAYLOAD_TYPE_MODIFY_ORDER,
PAYLOAD_TYPE_CANCEL_ORDER,
PAYLOAD_TYPE_CANCEL_ALL_ORDERS,
PAYLOAD_TYPE_BATCH_CANCEL_ORDERS,
PAYLOAD_TYPE_QUERY_ORDER,
PAYLOAD_TYPE_QUERY_ACCOUNT,
PAYLOAD_TYPE_ORDER_INITIALIZED,
PAYLOAD_TYPE_ORDER_DENIED,
PAYLOAD_TYPE_ORDER_EMULATED,
PAYLOAD_TYPE_ORDER_RELEASED,
PAYLOAD_TYPE_ORDER_SUBMITTED,
PAYLOAD_TYPE_ORDER_ACCEPTED,
PAYLOAD_TYPE_ORDER_REJECTED,
PAYLOAD_TYPE_ORDER_CANCELED,
PAYLOAD_TYPE_ORDER_EXPIRED,
PAYLOAD_TYPE_ORDER_TRIGGERED,
PAYLOAD_TYPE_ORDER_PENDING_UPDATE,
PAYLOAD_TYPE_ORDER_PENDING_CANCEL,
PAYLOAD_TYPE_ORDER_MODIFY_REJECTED,
PAYLOAD_TYPE_ORDER_CANCEL_REJECTED,
PAYLOAD_TYPE_ORDER_UPDATED,
PAYLOAD_TYPE_ORDER_FILLED,
PAYLOAD_TYPE_ORDER_STATUS_REPORT,
PAYLOAD_TYPE_FILL_REPORT,
PAYLOAD_TYPE_ORDER_WITH_FILLS,
PAYLOAD_TYPE_POSITION_STATUS_REPORT,
PAYLOAD_TYPE_EXECUTION_MASS_STATUS,
PAYLOAD_TYPE_POSITION_OPENED,
PAYLOAD_TYPE_POSITION_CHANGED,
PAYLOAD_TYPE_POSITION_CLOSED,
PAYLOAD_TYPE_POSITION_ADJUSTED,
PAYLOAD_TYPE_ACCOUNT_STATE,
PAYLOAD_TYPE_TIME_EVENT,
PAYLOAD_TYPE_REQUEST_COMMAND,
PAYLOAD_TYPE_SUBSCRIBE_COMMAND,
PAYLOAD_TYPE_UNSUBSCRIBE_COMMAND,
#[cfg(feature = "defi")]
PAYLOAD_TYPE_DEFI_REQUEST_COMMAND,
#[cfg(feature = "defi")]
PAYLOAD_TYPE_DEFI_SUBSCRIBE_COMMAND,
#[cfg(feature = "defi")]
PAYLOAD_TYPE_DEFI_UNSUBSCRIBE_COMMAND,
PAYLOAD_TYPE_CUSTOM_DATA_RESPONSE,
PAYLOAD_TYPE_INSTRUMENT_RESPONSE,
PAYLOAD_TYPE_INSTRUMENTS_RESPONSE,
PAYLOAD_TYPE_BOOK_RESPONSE,
PAYLOAD_TYPE_BOOK_DELTAS_RESPONSE,
PAYLOAD_TYPE_BOOK_DEPTH_RESPONSE,
PAYLOAD_TYPE_QUOTES_RESPONSE,
PAYLOAD_TYPE_TRADES_RESPONSE,
PAYLOAD_TYPE_FUNDING_RATES_RESPONSE,
PAYLOAD_TYPE_FORWARD_PRICES_RESPONSE,
PAYLOAD_TYPE_BARS_RESPONSE,
];
/// Creates a fresh [`EncoderRegistry`] pre-populated by [`register_default`].
#[must_use]
pub fn default_registry() -> EncoderRegistry {
    let mut encoders = EncoderRegistry::new();
    register_default(&mut encoders);
    encoders
}
pub fn register_default(registry: &mut EncoderRegistry) {
registry
.register::<SubmitOrder, _>(payload_type(PAYLOAD_TYPE_SUBMIT_ORDER), encode_submit_order);
registry
.register::<OrderFilled, _>(payload_type(PAYLOAD_TYPE_ORDER_FILLED), encode_order_filled);
registry.register::<OrderStatusReport, _>(
payload_type(PAYLOAD_TYPE_ORDER_STATUS_REPORT),
encode_order_status_report,
);
registry.register::<FillReport, _>(payload_type(PAYLOAD_TYPE_FILL_REPORT), encode_fill_report);
registry.register::<PositionStatusReport, _>(
payload_type(PAYLOAD_TYPE_POSITION_STATUS_REPORT),
encode_position_status_report,
);
registry.register::<TradingCommand, _>(
payload_type(PAYLOAD_TYPE_TRADING_COMMAND),
encode_trading_command,
);
registry.register::<OrderEventAny, _>(
payload_type(PAYLOAD_TYPE_ORDER_EVENT_ANY),
encode_order_event_any,
);
registry.register::<ExecutionReport, _>(
payload_type(PAYLOAD_TYPE_EXECUTION_REPORT),
encode_execution_report,
);
registry.register::<PositionEvent, _>(
payload_type(PAYLOAD_TYPE_POSITION_EVENT),
encode_position_event,
);
registry.register::<AccountState, _>(
payload_type(PAYLOAD_TYPE_ACCOUNT_STATE),
encode_account_state,
);
registry.register::<TimeEvent, _>(payload_type(PAYLOAD_TYPE_TIME_EVENT), encode_time_event);
registry
.register::<DataCommand, _>(payload_type(PAYLOAD_TYPE_DATA_COMMAND), encode_data_command);
registry.register::<DataResponse, _>(
payload_type(PAYLOAD_TYPE_DATA_RESPONSE),
encode_data_response,
);
register_default_headers(registry);
}
/// Registers the header extractors for every trading command type, the `TradingCommand`
/// wrapper, and the `DataCommand` / `DataResponse` wrappers.
fn register_default_headers(registry: &mut EncoderRegistry) {
    // Individual trading commands.
    registry.register_headers::<SubmitOrder, _>(extract_submit_order_headers);
    registry.register_headers::<SubmitOrderList, _>(extract_submit_order_list_headers);
    registry.register_headers::<ModifyOrder, _>(extract_modify_order_headers);
    registry.register_headers::<CancelOrder, _>(extract_cancel_order_headers);
    registry.register_headers::<CancelAllOrders, _>(extract_cancel_all_orders_headers);
    registry.register_headers::<BatchCancelOrders, _>(extract_batch_cancel_orders_headers);
    registry.register_headers::<QueryOrder, _>(extract_query_order_headers);
    registry.register_headers::<QueryAccount, _>(extract_query_account_headers);

    // Enum wrappers.
    registry.register_headers::<TradingCommand, _>(extract_trading_command_headers);
    registry.register_headers::<DataCommand, _>(extract_data_command_headers);
    registry.register_headers::<DataResponse, _>(extract_data_response_headers);
}
/// Assembles a [`Headers`] value from an optional correlation ID and an optional
/// causation ID.
fn headers_from_fields(correlation_id: Option<UUID4>, causation_id: Option<UUID4>) -> Headers {
    Headers {
        correlation_id,
        causation_id,
    }
}
fn extract_submit_order_headers(cmd: &SubmitOrder) -> Headers {
headers_from_fields(cmd.correlation_id, cmd.causation_id)
}
fn extract_submit_order_list_headers(cmd: &SubmitOrderList) -> Headers {
headers_from_fields(cmd.correlation_id, cmd.causation_id)
}
fn extract_modify_order_headers(cmd: &ModifyOrder) -> Headers {
headers_from_fields(cmd.correlation_id, cmd.causation_id)
}
fn extract_cancel_order_headers(cmd: &CancelOrder) -> Headers {
headers_from_fields(cmd.correlation_id, cmd.causation_id)
}
fn extract_cancel_all_orders_headers(cmd: &CancelAllOrders) -> Headers {
headers_from_fields(cmd.correlation_id, cmd.causation_id)
}
fn extract_batch_cancel_orders_headers(cmd: &BatchCancelOrders) -> Headers {
headers_from_fields(cmd.correlation_id, cmd.causation_id)
}
fn extract_query_order_headers(cmd: &QueryOrder) -> Headers {
headers_from_fields(cmd.correlation_id, cmd.causation_id)
}
fn extract_query_account_headers(cmd: &QueryAccount) -> Headers {
headers_from_fields(cmd.correlation_id, cmd.causation_id)
}
/// Extracts headers from a [`TradingCommand`] by delegating to the extractor for the
/// wrapped command type.
fn extract_trading_command_headers(command: &TradingCommand) -> Headers {
    match command {
        TradingCommand::SubmitOrder(submit) => extract_submit_order_headers(submit),
        TradingCommand::SubmitOrderList(submit_list) => {
            extract_submit_order_list_headers(submit_list)
        }
        TradingCommand::ModifyOrder(modify) => extract_modify_order_headers(modify),
        TradingCommand::CancelOrder(cancel) => extract_cancel_order_headers(cancel),
        TradingCommand::CancelAllOrders(cancel_all) => {
            extract_cancel_all_orders_headers(cancel_all)
        }
        TradingCommand::BatchCancelOrders(batch) => extract_batch_cancel_orders_headers(batch),
        TradingCommand::QueryOrder(query) => extract_query_order_headers(query),
        TradingCommand::QueryAccount(query) => extract_query_account_headers(query),
    }
}
/// Extracts headers from a [`DataCommand`].
///
/// Requests use their request ID as the correlation ID; subscribe and unsubscribe commands
/// use their own optional correlation ID. The causation ID is never set. Every other
/// variant yields `Headers::empty()`.
fn extract_data_command_headers(command: &DataCommand) -> Headers {
    let correlation_id = match command {
        DataCommand::Request(request) => Some(*request.request_id()),
        DataCommand::Subscribe(subscribe) => subscribe.correlation_id(),
        DataCommand::Unsubscribe(unsubscribe) => unsubscribe.correlation_id(),
        _ => return Headers::empty(),
    };
    headers_from_fields(correlation_id, None)
}
/// Extracts headers from a [`DataResponse`]: its correlation ID, with no causation ID.
fn extract_data_response_headers(response: &DataResponse) -> Headers {
    let correlation_id = *response.correlation_id();
    headers_from_fields(Some(correlation_id), None)
}
/// Converts a string tag into a [`PayloadType`] via `Ustr::from`.
fn payload_type(tag: &str) -> PayloadType {
    Ustr::from(tag)
}
/// Serializes `value` with `rmp_serde::to_vec_named` and wraps the buffer in [`Bytes`].
///
/// # Errors
///
/// Returns [`EncodeError::Serialize`] carrying the serializer's error message.
fn encode_serde<T: Serialize>(value: &T) -> Result<Bytes, EncodeError> {
    match rmp_serde::to_vec_named(value) {
        Ok(buffer) => Ok(Bytes::from(buffer)),
        Err(err) => Err(EncodeError::Serialize(err.to_string())),
    }
}
/// Encodes a [`SubmitOrder`] command, indexed by its client order ID.
///
/// The returned payload carries no explicit payload type.
///
/// # Errors
///
/// Returns an [`EncodeError`] if serialization fails.
pub fn encode_submit_order(message: &SubmitOrder) -> Result<EncodedPayload, EncodeError> {
    let bytes = encode_serde(message)?;
    let client_key = IndexKey::new(
        IndexKind::ClientOrderId,
        message.client_order_id.to_string(),
    );
    Ok(EncodedPayload::new(bytes, vec![client_key]))
}
/// Encodes an [`OrderFilled`] event, indexed by client order ID and then venue order ID.
///
/// The returned payload carries no explicit payload type.
///
/// # Errors
///
/// Returns an [`EncodeError`] if serialization fails.
pub fn encode_order_filled(message: &OrderFilled) -> Result<EncodedPayload, EncodeError> {
    let bytes = encode_serde(message)?;
    let client_key = IndexKey::new(
        IndexKind::ClientOrderId,
        message.client_order_id.to_string(),
    );
    let venue_key = IndexKey::new(IndexKind::VenueOrderId, message.venue_order_id.to_string());
    Ok(EncodedPayload::new(bytes, vec![client_key, venue_key]))
}
/// Encodes a [`TradingCommand`] by dispatching on the wrapped command.
///
/// The `SubmitOrder` payload is re-tagged with `PAYLOAD_TYPE_SUBMIT_ORDER`; every other
/// variant is handled by an encoder that sets its own payload type.
///
/// # Errors
///
/// Returns an [`EncodeError`] if the wrapped command fails to encode.
pub fn encode_trading_command(command: &TradingCommand) -> Result<EncodedPayload, EncodeError> {
    match command {
        TradingCommand::SubmitOrder(submit) => {
            let encoded = encode_submit_order(submit)?;
            Ok(retag(encoded, PAYLOAD_TYPE_SUBMIT_ORDER))
        }
        TradingCommand::SubmitOrderList(submit_list) => encode_submit_order_list(submit_list),
        TradingCommand::ModifyOrder(modify) => encode_modify_order(modify),
        TradingCommand::CancelOrder(cancel) => encode_cancel_order(cancel),
        TradingCommand::CancelAllOrders(cancel_all) => encode_cancel_all_orders(cancel_all),
        TradingCommand::BatchCancelOrders(batch) => encode_batch_cancel_orders(batch),
        TradingCommand::QueryOrder(query) => encode_query_order(query),
        TradingCommand::QueryAccount(query) => encode_query_account(query),
    }
}
/// Encodes an [`OrderEventAny`] by dispatching on the wrapped event.
///
/// The `Filled` payload is re-tagged with `PAYLOAD_TYPE_ORDER_FILLED`; every other variant
/// is handled by an encoder that sets its own payload type.
///
/// # Errors
///
/// Returns an [`EncodeError`] if the wrapped event fails to encode.
pub fn encode_order_event_any(event: &OrderEventAny) -> Result<EncodedPayload, EncodeError> {
    match event {
        OrderEventAny::Initialized(inner) => encode_order_initialized(inner),
        OrderEventAny::Denied(inner) => encode_order_denied(inner),
        OrderEventAny::Emulated(inner) => encode_order_emulated(inner),
        OrderEventAny::Released(inner) => encode_order_released(inner),
        OrderEventAny::Submitted(inner) => encode_order_submitted(inner),
        OrderEventAny::Accepted(inner) => encode_order_accepted(inner),
        OrderEventAny::Rejected(inner) => encode_order_rejected(inner),
        OrderEventAny::Canceled(inner) => encode_order_canceled(inner),
        OrderEventAny::Expired(inner) => encode_order_expired(inner),
        OrderEventAny::Triggered(inner) => encode_order_triggered(inner),
        OrderEventAny::PendingUpdate(inner) => encode_order_pending_update(inner),
        OrderEventAny::PendingCancel(inner) => encode_order_pending_cancel(inner),
        OrderEventAny::ModifyRejected(inner) => encode_order_modify_rejected(inner),
        OrderEventAny::CancelRejected(inner) => encode_order_cancel_rejected(inner),
        OrderEventAny::Updated(inner) => encode_order_updated(inner),
        OrderEventAny::Filled(inner) => {
            let encoded = encode_order_filled(inner)?;
            Ok(retag(encoded, PAYLOAD_TYPE_ORDER_FILLED))
        }
    }
}
/// Encodes an [`ExecutionReport`] by dispatching on the wrapped report.
///
/// The `Order` payload is re-tagged with `PAYLOAD_TYPE_ORDER_STATUS_REPORT`; every other
/// variant is handled by an encoder that sets its own payload type.
///
/// # Errors
///
/// Returns an [`EncodeError`] if the wrapped report fails to encode.
pub fn encode_execution_report(report: &ExecutionReport) -> Result<EncodedPayload, EncodeError> {
    match report {
        ExecutionReport::Order(order) => {
            let encoded = encode_order_status_report(order)?;
            Ok(retag(encoded, PAYLOAD_TYPE_ORDER_STATUS_REPORT))
        }
        ExecutionReport::Fill(fill) => encode_fill_report(fill),
        ExecutionReport::OrderWithFills(order, fills) => encode_order_with_fills(order, fills),
        ExecutionReport::Position(position) => encode_position_status_report(position),
        ExecutionReport::MassStatus(mass_status) => encode_execution_mass_status(mass_status),
    }
}
/// Encodes a [`FillReport`] tagged `PAYLOAD_TYPE_FILL_REPORT`.
///
/// Index keys are the venue order ID followed by the client order ID when one is present.
///
/// # Errors
///
/// Returns an [`EncodeError`] if serialization fails.
pub fn encode_fill_report(report: &FillReport) -> Result<EncodedPayload, EncodeError> {
    let bytes = encode_serde(report)?;

    let mut keys = Vec::with_capacity(2);
    keys.push(IndexKey::new(
        IndexKind::VenueOrderId,
        report.venue_order_id.to_string(),
    ));
    // `Option` is iterable, so this appends zero or one key.
    keys.extend(
        report
            .client_order_id
            .as_ref()
            .map(|id| IndexKey::new(IndexKind::ClientOrderId, id.to_string())),
    );

    Ok(EncodedPayload::with_payload_type(
        payload_type(PAYLOAD_TYPE_FILL_REPORT),
        bytes,
        keys,
    ))
}
/// Encodes a [`PositionStatusReport`] tagged `PAYLOAD_TYPE_POSITION_STATUS_REPORT`, with
/// no index keys.
///
/// # Errors
///
/// Returns an [`EncodeError`] if serialization fails.
pub fn encode_position_status_report(
    report: &PositionStatusReport,
) -> Result<EncodedPayload, EncodeError> {
    encode_serde(report).map(|bytes| {
        EncodedPayload::with_payload_type(
            payload_type(PAYLOAD_TYPE_POSITION_STATUS_REPORT),
            bytes,
            Vec::new(),
        )
    })
}
fn encode_order_with_fills(
order: &OrderStatusReport,
fills: &[FillReport],
) -> Result<EncodedPayload, EncodeError> {
#[derive(Serialize)]
struct OrderWithFillsRef<'a> {
order_report: &'a OrderStatusReport,
fill_reports: &'a [FillReport],
}
let payload = encode_serde(&OrderWithFillsRef {
order_report: order,
fill_reports: fills,
})?;
let mut index_keys = Vec::new();
let mut seen = HashSet::new();
push_unique_index_key(
&mut index_keys,
&mut seen,
IndexKind::VenueOrderId,
order.venue_order_id.to_string(),
);
if let Some(client_order_id) = &order.client_order_id {
push_unique_index_key(
&mut index_keys,
&mut seen,
IndexKind::ClientOrderId,
client_order_id.to_string(),
);
}
for fill in fills {
push_unique_index_key(
&mut index_keys,
&mut seen,
IndexKind::VenueOrderId,
fill.venue_order_id.to_string(),
);
if let Some(client_order_id) = &fill.client_order_id {
push_unique_index_key(
&mut index_keys,
&mut seen,
IndexKind::ClientOrderId,
client_order_id.to_string(),
);
}
}
Ok(EncodedPayload::with_payload_type(
payload_type(PAYLOAD_TYPE_ORDER_WITH_FILLS),
payload,
index_keys,
))
}
fn encode_execution_mass_status(
status: &ExecutionMassStatus,
) -> Result<EncodedPayload, EncodeError> {
let payload = encode_serde(status)?;
let mut index_keys = Vec::new();
let mut seen = HashSet::new();
let order_reports = status.order_reports();
for (venue_order_id, report) in &order_reports {
push_unique_index_key(
&mut index_keys,
&mut seen,
IndexKind::VenueOrderId,
venue_order_id.to_string(),
);
if let Some(client_order_id) = &report.client_order_id {
push_unique_index_key(
&mut index_keys,
&mut seen,
IndexKind::ClientOrderId,
client_order_id.to_string(),
);
}
}
let fill_reports = status.fill_reports();
for (venue_order_id, fills) in &fill_reports {
push_unique_index_key(
&mut index_keys,
&mut seen,
IndexKind::VenueOrderId,
venue_order_id.to_string(),
);
for fill in fills {
if let Some(client_order_id) = &fill.client_order_id {
push_unique_index_key(
&mut index_keys,
&mut seen,
IndexKind::ClientOrderId,
client_order_id.to_string(),
);
}
}
}
Ok(EncodedPayload::with_payload_type(
payload_type(PAYLOAD_TYPE_EXECUTION_MASS_STATUS),
payload,
index_keys,
))
}
/// Appends an [`IndexKey`] for `(kind, key)` to `index_keys` unless that pair has already
/// been recorded in `seen`.
fn push_unique_index_key(
    index_keys: &mut Vec<IndexKey>,
    seen: &mut HashSet<(IndexKind, String)>,
    kind: IndexKind,
    key: String,
) {
    // `insert` returns false when the pair was already present.
    let newly_seen = seen.insert((kind, key.clone()));
    if !newly_seen {
        return;
    }
    index_keys.push(IndexKey::new(kind, key));
}
/// Encodes a [`PositionEvent`] by dispatching to the encoder for the wrapped event.
///
/// # Errors
///
/// Returns an [`EncodeError`] if the wrapped event fails to encode.
pub fn encode_position_event(event: &PositionEvent) -> Result<EncodedPayload, EncodeError> {
    match event {
        PositionEvent::PositionOpened(opened) => encode_position_opened(opened),
        PositionEvent::PositionChanged(changed) => encode_position_changed(changed),
        PositionEvent::PositionClosed(closed) => encode_position_closed(closed),
        PositionEvent::PositionAdjusted(adjusted) => encode_position_adjusted(adjusted),
    }
}
/// Encodes a [`PositionOpened`] event tagged `PAYLOAD_TYPE_POSITION_OPENED`, indexed by
/// the opening order ID as a client order ID key.
fn encode_position_opened(event: &PositionOpened) -> Result<EncodedPayload, EncodeError> {
    let bytes = encode_serde(event)?;
    let opening_key = IndexKey::new(
        IndexKind::ClientOrderId,
        event.opening_order_id.to_string(),
    );
    Ok(EncodedPayload::with_payload_type(
        payload_type(PAYLOAD_TYPE_POSITION_OPENED),
        bytes,
        vec![opening_key],
    ))
}
/// Encodes a [`PositionChanged`] event tagged `PAYLOAD_TYPE_POSITION_CHANGED`, indexed by
/// the opening order ID as a client order ID key.
fn encode_position_changed(event: &PositionChanged) -> Result<EncodedPayload, EncodeError> {
    let bytes = encode_serde(event)?;
    let opening_key = IndexKey::new(
        IndexKind::ClientOrderId,
        event.opening_order_id.to_string(),
    );
    Ok(EncodedPayload::with_payload_type(
        payload_type(PAYLOAD_TYPE_POSITION_CHANGED),
        bytes,
        vec![opening_key],
    ))
}
/// Encodes a [`PositionClosed`] event tagged `PAYLOAD_TYPE_POSITION_CLOSED`.
///
/// Index keys are client order ID keys for the opening order and, when present, the
/// closing order; if both IDs render to the same string only one key is emitted.
fn encode_position_closed(event: &PositionClosed) -> Result<EncodedPayload, EncodeError> {
    let bytes = encode_serde(event)?;

    let mut keys = Vec::new();
    let mut seen = HashSet::new();
    let mut add_client_key = |key: String| {
        push_unique_index_key(&mut keys, &mut seen, IndexKind::ClientOrderId, key);
    };

    add_client_key(event.opening_order_id.to_string());
    if let Some(closing_order_id) = event.closing_order_id.as_ref() {
        add_client_key(closing_order_id.to_string());
    }

    Ok(EncodedPayload::with_payload_type(
        payload_type(PAYLOAD_TYPE_POSITION_CLOSED),
        bytes,
        keys,
    ))
}
/// Encodes a [`PositionAdjusted`] event tagged `PAYLOAD_TYPE_POSITION_ADJUSTED`, with no
/// index keys.
fn encode_position_adjusted(event: &PositionAdjusted) -> Result<EncodedPayload, EncodeError> {
    encode_serde(event).map(|bytes| {
        EncodedPayload::with_payload_type(
            payload_type(PAYLOAD_TYPE_POSITION_ADJUSTED),
            bytes,
            Vec::new(),
        )
    })
}
fn encode_submit_order_list(cmd: &SubmitOrderList) -> Result<EncodedPayload, EncodeError> {
let payload = encode_serde(cmd)?;
let index_keys = cmd
.order_list
.client_order_ids
.iter()
.map(|cid| IndexKey::new(IndexKind::ClientOrderId, cid.to_string()))
.collect();
Ok(EncodedPayload::with_payload_type(
payload_type(PAYLOAD_TYPE_SUBMIT_ORDER_LIST),
payload,
index_keys,
))
}
/// Encodes a [`ModifyOrder`] command tagged `PAYLOAD_TYPE_MODIFY_ORDER`, indexed by client
/// order ID and, when present, venue order ID.
fn encode_modify_order(cmd: &ModifyOrder) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = cmd.client_order_id.to_string();
    let venue_order_id = cmd.venue_order_id.map(|id| id.to_string());
    encode_with_order_ids(
        cmd,
        PAYLOAD_TYPE_MODIFY_ORDER,
        client_order_id,
        venue_order_id,
    )
}
/// Encodes a [`CancelOrder`] command tagged `PAYLOAD_TYPE_CANCEL_ORDER`, indexed by client
/// order ID and, when present, venue order ID.
fn encode_cancel_order(cmd: &CancelOrder) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = cmd.client_order_id.to_string();
    let venue_order_id = cmd.venue_order_id.map(|id| id.to_string());
    encode_with_order_ids(
        cmd,
        PAYLOAD_TYPE_CANCEL_ORDER,
        client_order_id,
        venue_order_id,
    )
}
/// Encodes a [`CancelAllOrders`] command tagged `PAYLOAD_TYPE_CANCEL_ALL_ORDERS`, with no
/// index keys.
fn encode_cancel_all_orders(cmd: &CancelAllOrders) -> Result<EncodedPayload, EncodeError> {
    encode_serde(cmd).map(|bytes| {
        EncodedPayload::with_payload_type(
            payload_type(PAYLOAD_TYPE_CANCEL_ALL_ORDERS),
            bytes,
            Vec::new(),
        )
    })
}
/// Encodes a [`BatchCancelOrders`] command tagged `PAYLOAD_TYPE_BATCH_CANCEL_ORDERS`.
///
/// For each cancel in the batch, in order, a client order ID key is emitted, followed by a
/// venue order ID key when the cancel has one. Keys are not de-duplicated.
fn encode_batch_cancel_orders(cmd: &BatchCancelOrders) -> Result<EncodedPayload, EncodeError> {
    let bytes = encode_serde(cmd)?;

    // At most two keys per cancel.
    let mut keys = Vec::with_capacity(cmd.cancels.len() * 2);
    for cancel in &cmd.cancels {
        keys.push(IndexKey::new(
            IndexKind::ClientOrderId,
            cancel.client_order_id.to_string(),
        ));
        keys.extend(
            cancel
                .venue_order_id
                .map(|venue| IndexKey::new(IndexKind::VenueOrderId, venue.to_string())),
        );
    }

    Ok(EncodedPayload::with_payload_type(
        payload_type(PAYLOAD_TYPE_BATCH_CANCEL_ORDERS),
        bytes,
        keys,
    ))
}
/// Encodes a [`QueryOrder`] command tagged `PAYLOAD_TYPE_QUERY_ORDER`, indexed by client
/// order ID and, when present, venue order ID.
fn encode_query_order(cmd: &QueryOrder) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = cmd.client_order_id.to_string();
    let venue_order_id = cmd.venue_order_id.map(|id| id.to_string());
    encode_with_order_ids(
        cmd,
        PAYLOAD_TYPE_QUERY_ORDER,
        client_order_id,
        venue_order_id,
    )
}
/// Encodes a [`QueryAccount`] command tagged `PAYLOAD_TYPE_QUERY_ACCOUNT`, with no index
/// keys.
fn encode_query_account(cmd: &QueryAccount) -> Result<EncodedPayload, EncodeError> {
    encode_serde(cmd).map(|bytes| {
        EncodedPayload::with_payload_type(
            payload_type(PAYLOAD_TYPE_QUERY_ACCOUNT),
            bytes,
            Vec::new(),
        )
    })
}
/// Encodes an [`OrderInitialized`] event tagged `PAYLOAD_TYPE_ORDER_INITIALIZED`, indexed
/// by client order ID only.
fn encode_order_initialized(e: &OrderInitialized) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    encode_with_order_ids(e, PAYLOAD_TYPE_ORDER_INITIALIZED, client_order_id, None)
}
/// Encodes an [`OrderDenied`] event tagged `PAYLOAD_TYPE_ORDER_DENIED`, indexed by client
/// order ID only.
fn encode_order_denied(e: &OrderDenied) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    encode_with_order_ids(e, PAYLOAD_TYPE_ORDER_DENIED, client_order_id, None)
}
/// Encodes an [`OrderEmulated`] event tagged `PAYLOAD_TYPE_ORDER_EMULATED`, indexed by
/// client order ID only.
fn encode_order_emulated(e: &OrderEmulated) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    encode_with_order_ids(e, PAYLOAD_TYPE_ORDER_EMULATED, client_order_id, None)
}
/// Encodes an [`OrderReleased`] event tagged `PAYLOAD_TYPE_ORDER_RELEASED`, indexed by
/// client order ID only.
fn encode_order_released(e: &OrderReleased) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    encode_with_order_ids(e, PAYLOAD_TYPE_ORDER_RELEASED, client_order_id, None)
}
/// Encodes an [`OrderSubmitted`] event tagged `PAYLOAD_TYPE_ORDER_SUBMITTED`, indexed by
/// client order ID only.
fn encode_order_submitted(e: &OrderSubmitted) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    encode_with_order_ids(e, PAYLOAD_TYPE_ORDER_SUBMITTED, client_order_id, None)
}
/// Encodes an [`OrderAccepted`] event tagged `PAYLOAD_TYPE_ORDER_ACCEPTED`, indexed by
/// both client order ID and venue order ID.
fn encode_order_accepted(e: &OrderAccepted) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    let venue_order_id = e.venue_order_id.to_string();
    encode_with_order_ids(
        e,
        PAYLOAD_TYPE_ORDER_ACCEPTED,
        client_order_id,
        Some(venue_order_id),
    )
}
/// Encodes an [`OrderRejected`] event tagged `PAYLOAD_TYPE_ORDER_REJECTED`, indexed by
/// client order ID only.
fn encode_order_rejected(e: &OrderRejected) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    encode_with_order_ids(e, PAYLOAD_TYPE_ORDER_REJECTED, client_order_id, None)
}
/// Encodes an [`OrderCanceled`] event tagged `PAYLOAD_TYPE_ORDER_CANCELED`, indexed by
/// client order ID and, when present, venue order ID.
fn encode_order_canceled(e: &OrderCanceled) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    let venue_order_id = e.venue_order_id.map(|id| id.to_string());
    encode_with_order_ids(
        e,
        PAYLOAD_TYPE_ORDER_CANCELED,
        client_order_id,
        venue_order_id,
    )
}
/// Encodes an [`OrderExpired`] event tagged `PAYLOAD_TYPE_ORDER_EXPIRED`, indexed by
/// client order ID and, when present, venue order ID.
fn encode_order_expired(e: &OrderExpired) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    let venue_order_id = e.venue_order_id.map(|id| id.to_string());
    encode_with_order_ids(
        e,
        PAYLOAD_TYPE_ORDER_EXPIRED,
        client_order_id,
        venue_order_id,
    )
}
/// Encodes an [`OrderTriggered`] event tagged `PAYLOAD_TYPE_ORDER_TRIGGERED`, indexed by
/// client order ID and, when present, venue order ID.
fn encode_order_triggered(e: &OrderTriggered) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    let venue_order_id = e.venue_order_id.map(|id| id.to_string());
    encode_with_order_ids(
        e,
        PAYLOAD_TYPE_ORDER_TRIGGERED,
        client_order_id,
        venue_order_id,
    )
}
/// Encodes an [`OrderPendingUpdate`] event tagged `PAYLOAD_TYPE_ORDER_PENDING_UPDATE`,
/// indexed by client order ID and, when present, venue order ID.
fn encode_order_pending_update(e: &OrderPendingUpdate) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    let venue_order_id = e.venue_order_id.map(|id| id.to_string());
    encode_with_order_ids(
        e,
        PAYLOAD_TYPE_ORDER_PENDING_UPDATE,
        client_order_id,
        venue_order_id,
    )
}
/// Encodes an [`OrderPendingCancel`] event tagged `PAYLOAD_TYPE_ORDER_PENDING_CANCEL`,
/// indexed by client order ID and, when present, venue order ID.
fn encode_order_pending_cancel(e: &OrderPendingCancel) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    let venue_order_id = e.venue_order_id.map(|id| id.to_string());
    encode_with_order_ids(
        e,
        PAYLOAD_TYPE_ORDER_PENDING_CANCEL,
        client_order_id,
        venue_order_id,
    )
}
/// Encodes an [`OrderModifyRejected`] event tagged `PAYLOAD_TYPE_ORDER_MODIFY_REJECTED`,
/// indexed by client order ID and, when present, venue order ID.
fn encode_order_modify_rejected(e: &OrderModifyRejected) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    let venue_order_id = e.venue_order_id.map(|id| id.to_string());
    encode_with_order_ids(
        e,
        PAYLOAD_TYPE_ORDER_MODIFY_REJECTED,
        client_order_id,
        venue_order_id,
    )
}
/// Encodes an [`OrderCancelRejected`] event tagged `PAYLOAD_TYPE_ORDER_CANCEL_REJECTED`,
/// indexed by client order ID and, when present, venue order ID.
fn encode_order_cancel_rejected(e: &OrderCancelRejected) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    let venue_order_id = e.venue_order_id.map(|id| id.to_string());
    encode_with_order_ids(
        e,
        PAYLOAD_TYPE_ORDER_CANCEL_REJECTED,
        client_order_id,
        venue_order_id,
    )
}
/// Encodes an [`OrderUpdated`] event tagged `PAYLOAD_TYPE_ORDER_UPDATED`, indexed by
/// client order ID and, when present, venue order ID.
fn encode_order_updated(e: &OrderUpdated) -> Result<EncodedPayload, EncodeError> {
    let client_order_id = e.client_order_id.to_string();
    let venue_order_id = e.venue_order_id.map(|id| id.to_string());
    encode_with_order_ids(
        e,
        PAYLOAD_TYPE_ORDER_UPDATED,
        client_order_id,
        venue_order_id,
    )
}
/// Serializes `value` and wraps it in an [`EncodedPayload`] tagged with `tag`.
///
/// The payload is always indexed by `client_order_id`; a venue order ID key is appended
/// after it when `venue_order_id` is `Some`.
fn encode_with_order_ids<T: Serialize>(
    value: &T,
    tag: &str,
    client_order_id: String,
    venue_order_id: Option<String>,
) -> Result<EncodedPayload, EncodeError> {
    let bytes = encode_serde(value)?;

    let mut keys = Vec::with_capacity(2);
    keys.push(IndexKey::new(IndexKind::ClientOrderId, client_order_id));
    // `Option` is iterable, so this appends zero or one key.
    keys.extend(venue_order_id.map(|venue| IndexKey::new(IndexKind::VenueOrderId, venue)));

    Ok(EncodedPayload::with_payload_type(
        payload_type(tag),
        bytes,
        keys,
    ))
}
/// Returns `encoded` with its payload type replaced by `tag`.
fn retag(encoded: EncodedPayload, tag: &str) -> EncodedPayload {
    let mut retagged = encoded;
    retagged.payload_type = Some(payload_type(tag));
    retagged
}
/// Encodes an [`OrderStatusReport`], indexed by venue order ID followed by the client
/// order ID when one is present.
///
/// The returned payload carries no explicit payload type.
///
/// # Errors
///
/// Returns an [`EncodeError`] if serialization fails.
pub fn encode_order_status_report(
    message: &OrderStatusReport,
) -> Result<EncodedPayload, EncodeError> {
    let bytes = encode_serde(message)?;

    let mut keys = Vec::with_capacity(2);
    keys.push(IndexKey::new(
        IndexKind::VenueOrderId,
        message.venue_order_id.to_string(),
    ));
    // `Option` is iterable, so this appends zero or one key.
    keys.extend(
        message
            .client_order_id
            .as_ref()
            .map(|id| IndexKey::new(IndexKind::ClientOrderId, id.to_string())),
    );

    Ok(EncodedPayload::new(bytes, keys))
}
/// Encodes an [`AccountState`] event with no index keys and no explicit payload type.
///
/// # Errors
///
/// Returns an [`EncodeError`] if serialization fails.
pub fn encode_account_state(message: &AccountState) -> Result<EncodedPayload, EncodeError> {
    encode_serde(message).map(|bytes| EncodedPayload::new(bytes, Vec::new()))
}
/// Serializable view of a `TimeEvent`, built by `encode_time_event`.
///
/// Field names and their order are part of the encoded output, so keep them stable.
#[derive(Serialize)]
struct TimeEventPayload<'a> {
/// Event name, borrowed from the source event.
name: &'a str,
/// Copied from the source event's `event_id`.
event_id: UUID4,
/// Copied from the source event's `ts_event`.
ts_event: UnixNanos,
/// Copied from the source event's `ts_init`.
ts_init: UnixNanos,
}
/// Encodes a [`TimeEvent`] through `TimeEventPayload`, which carries only the event's
/// name, ID, and two timestamps. The payload has no index keys and no explicit payload
/// type.
///
/// # Errors
///
/// Returns an [`EncodeError`] if serialization fails.
pub fn encode_time_event(event: &TimeEvent) -> Result<EncodedPayload, EncodeError> {
    let view = TimeEventPayload {
        name: event.name.as_str(),
        event_id: event.event_id,
        ts_event: event.ts_event,
        ts_init: event.ts_init,
    };
    let bytes = encode_serde(&view)?;
    Ok(EncodedPayload::new(bytes, Vec::new()))
}
/// Encodes a [`DataCommand`], tagging the payload with the category of the wrapped
/// command (request / subscribe / unsubscribe, plus the DeFi categories when the `defi`
/// feature is enabled).
///
/// # Errors
///
/// Returns [`EncodeError::Serialize`] if serialization fails or if the variant is not one
/// of the handled categories.
pub fn encode_data_command(command: &DataCommand) -> Result<EncodedPayload, EncodeError> {
    match command {
        DataCommand::Request(request) => {
            encode_data_command_category(request, PAYLOAD_TYPE_REQUEST_COMMAND)
        }
        DataCommand::Subscribe(subscribe) => {
            encode_data_command_category(subscribe, PAYLOAD_TYPE_SUBSCRIBE_COMMAND)
        }
        DataCommand::Unsubscribe(unsubscribe) => {
            encode_data_command_category(unsubscribe, PAYLOAD_TYPE_UNSUBSCRIBE_COMMAND)
        }
        #[cfg(feature = "defi")]
        DataCommand::DefiRequest(request) => {
            encode_data_command_category(request, PAYLOAD_TYPE_DEFI_REQUEST_COMMAND)
        }
        #[cfg(feature = "defi")]
        DataCommand::DefiSubscribe(subscribe) => {
            encode_data_command_category(subscribe, PAYLOAD_TYPE_DEFI_SUBSCRIBE_COMMAND)
        }
        #[cfg(feature = "defi")]
        DataCommand::DefiUnsubscribe(unsubscribe) => {
            encode_data_command_category(unsubscribe, PAYLOAD_TYPE_DEFI_UNSUBSCRIBE_COMMAND)
        }
        _ => {
            let reason = "unsupported DataCommand variant".to_string();
            Err(EncodeError::Serialize(reason))
        }
    }
}
/// Serializes a data command and wraps it in an [`EncodedPayload`] tagged with `tag`, with
/// no index keys.
fn encode_data_command_category<T: Serialize>(
    command: &T,
    tag: &str,
) -> Result<EncodedPayload, EncodeError> {
    encode_serde(command)
        .map(|bytes| EncodedPayload::with_payload_type(payload_type(tag), bytes, Vec::new()))
}
/// Encodes a [`DataResponse`] by dispatching to the encoder for the wrapped response.
///
/// # Errors
///
/// Returns an [`EncodeError`] if the wrapped response fails to encode.
pub fn encode_data_response(response: &DataResponse) -> Result<EncodedPayload, EncodeError> {
    match response {
        DataResponse::Data(inner) => encode_custom_data_response(inner),
        DataResponse::Instrument(inner) => encode_instrument_response(inner),
        DataResponse::Instruments(inner) => encode_instruments_response(inner),
        DataResponse::Book(inner) => encode_book_response(inner),
        DataResponse::BookDeltas(inner) => encode_book_deltas_response(inner),
        DataResponse::BookDepth(inner) => encode_book_depth_response(inner),
        DataResponse::Quotes(inner) => encode_quotes_response(inner),
        DataResponse::Trades(inner) => encode_trades_response(inner),
        DataResponse::FundingRates(inner) => encode_funding_rates_response(inner),
        DataResponse::ForwardPrices(inner) => encode_forward_prices_response(inner),
        DataResponse::Bars(inner) => encode_bars_response(inner),
    }
}
/// Encodes a [`CustomDataResponse`] tagged `PAYLOAD_TYPE_CUSTOM_DATA_RESPONSE`, with no
/// index keys.
///
/// Only the fields listed in the borrowing view below are serialized.
/// NOTE(review): presumably this leaves out the response's data payload — confirm against
/// the `CustomDataResponse` definition.
fn encode_custom_data_response(
    response: &CustomDataResponse,
) -> Result<EncodedPayload, EncodeError> {
    // Field names and order define the encoded map; keep them stable.
    #[derive(Serialize)]
    struct CustomDataResponseRef<'a> {
        correlation_id: &'a UUID4,
        client_id: &'a ClientId,
        venue: &'a Option<Venue>,
        data_type: &'a DataType,
        start: &'a Option<UnixNanos>,
        end: &'a Option<UnixNanos>,
        ts_init: &'a UnixNanos,
        params: &'a Option<Params>,
    }

    let view = CustomDataResponseRef {
        correlation_id: &response.correlation_id,
        client_id: &response.client_id,
        venue: &response.venue,
        data_type: &response.data_type,
        start: &response.start,
        end: &response.end,
        ts_init: &response.ts_init,
        params: &response.params,
    };
    let bytes = encode_serde(&view)?;

    Ok(EncodedPayload::with_payload_type(
        payload_type(PAYLOAD_TYPE_CUSTOM_DATA_RESPONSE),
        bytes,
        Vec::new(),
    ))
}
/// Encodes an [`InstrumentResponse`] tagged `PAYLOAD_TYPE_INSTRUMENT_RESPONSE`, with no
/// index keys.
fn encode_instrument_response(
    response: &InstrumentResponse,
) -> Result<EncodedPayload, EncodeError> {
    encode_serde(response).map(|bytes| {
        EncodedPayload::with_payload_type(
            payload_type(PAYLOAD_TYPE_INSTRUMENT_RESPONSE),
            bytes,
            Vec::new(),
        )
    })
}
/// Encodes an [`InstrumentsResponse`] tagged `PAYLOAD_TYPE_INSTRUMENTS_RESPONSE`, with no
/// index keys.
fn encode_instruments_response(
    response: &InstrumentsResponse,
) -> Result<EncodedPayload, EncodeError> {
    encode_serde(response).map(|bytes| {
        EncodedPayload::with_payload_type(
            payload_type(PAYLOAD_TYPE_INSTRUMENTS_RESPONSE),
            bytes,
            Vec::new(),
        )
    })
}
/// Encodes a [`BookResponse`] tagged `PAYLOAD_TYPE_BOOK_RESPONSE`, with no index keys.
///
/// Only the fields listed in the borrowing view below are serialized.
/// NOTE(review): presumably this leaves out the order book itself — confirm against the
/// `BookResponse` definition.
fn encode_book_response(response: &BookResponse) -> Result<EncodedPayload, EncodeError> {
    // Field names and order define the encoded map; keep them stable.
    #[derive(Serialize)]
    struct BookResponseRef<'a> {
        correlation_id: &'a UUID4,
        client_id: &'a ClientId,
        instrument_id: &'a InstrumentId,
        start: &'a Option<UnixNanos>,
        end: &'a Option<UnixNanos>,
        ts_init: &'a UnixNanos,
        params: &'a Option<Params>,
    }

    let view = BookResponseRef {
        correlation_id: &response.correlation_id,
        client_id: &response.client_id,
        instrument_id: &response.instrument_id,
        start: &response.start,
        end: &response.end,
        ts_init: &response.ts_init,
        params: &response.params,
    };
    let bytes = encode_serde(&view)?;

    Ok(EncodedPayload::with_payload_type(
        payload_type(PAYLOAD_TYPE_BOOK_RESPONSE),
        bytes,
        Vec::new(),
    ))
}
/// Encodes a [`QuotesResponse`] tagged `PAYLOAD_TYPE_QUOTES_RESPONSE`, with no index keys.
fn encode_quotes_response(response: &QuotesResponse) -> Result<EncodedPayload, EncodeError> {
    encode_serde(response).map(|bytes| {
        EncodedPayload::with_payload_type(
            payload_type(PAYLOAD_TYPE_QUOTES_RESPONSE),
            bytes,
            Vec::new(),
        )
    })
}
/// Encodes a [`BookDeltasResponse`] tagged `PAYLOAD_TYPE_BOOK_DELTAS_RESPONSE`, with no
/// index keys.
fn encode_book_deltas_response(
    response: &BookDeltasResponse,
) -> Result<EncodedPayload, EncodeError> {
    encode_serde(response).map(|bytes| {
        EncodedPayload::with_payload_type(
            payload_type(PAYLOAD_TYPE_BOOK_DELTAS_RESPONSE),
            bytes,
            Vec::new(),
        )
    })
}
/// Encodes a [`BookDepthResponse`] tagged `PAYLOAD_TYPE_BOOK_DEPTH_RESPONSE`, with no
/// index keys.
fn encode_book_depth_response(response: &BookDepthResponse) -> Result<EncodedPayload, EncodeError> {
    encode_serde(response).map(|bytes| {
        EncodedPayload::with_payload_type(
            payload_type(PAYLOAD_TYPE_BOOK_DEPTH_RESPONSE),
            bytes,
            Vec::new(),
        )
    })
}
/// Encodes a [`TradesResponse`] tagged `PAYLOAD_TYPE_TRADES_RESPONSE`, with no index keys.
fn encode_trades_response(response: &TradesResponse) -> Result<EncodedPayload, EncodeError> {
    encode_serde(response).map(|bytes| {
        EncodedPayload::with_payload_type(
            payload_type(PAYLOAD_TYPE_TRADES_RESPONSE),
            bytes,
            Vec::new(),
        )
    })
}
/// Encodes a [`FundingRatesResponse`] tagged `PAYLOAD_TYPE_FUNDING_RATES_RESPONSE`, with
/// no index keys.
fn encode_funding_rates_response(
    response: &FundingRatesResponse,
) -> Result<EncodedPayload, EncodeError> {
    encode_serde(response).map(|bytes| {
        EncodedPayload::with_payload_type(
            payload_type(PAYLOAD_TYPE_FUNDING_RATES_RESPONSE),
            bytes,
            Vec::new(),
        )
    })
}
/// Serializes a [`ForwardPricesResponse`] with `encode_serde` and tags the
/// result with `PAYLOAD_TYPE_FORWARD_PRICES_RESPONSE`, passing an empty
/// trailing vector.
///
/// # Errors
///
/// Propagates any [`EncodeError`] returned by `encode_serde`.
fn encode_forward_prices_response(
    response: &ForwardPricesResponse,
) -> Result<EncodedPayload, EncodeError> {
    let bytes = encode_serde(response)?;
    let tag = payload_type(PAYLOAD_TYPE_FORWARD_PRICES_RESPONSE);
    Ok(EncodedPayload::with_payload_type(tag, bytes, Vec::new()))
}
/// Serializes a [`BarsResponse`] with `encode_serde` and tags the result with
/// `PAYLOAD_TYPE_BARS_RESPONSE`, passing an empty trailing vector.
///
/// # Errors
///
/// Propagates any [`EncodeError`] returned by `encode_serde`.
fn encode_bars_response(response: &BarsResponse) -> Result<EncodedPayload, EncodeError> {
    let bytes = encode_serde(response)?;
    let tag = payload_type(PAYLOAD_TYPE_BARS_RESPONSE);
    Ok(EncodedPayload::with_payload_type(tag, bytes, Vec::new()))
}
#[cfg(test)]
mod tests {
use nautilus_common::messages::data::{
RequestCommand, RequestQuotes, SubscribeCommand, SubscribeQuotes, UnsubscribeCommand,
UnsubscribeQuotes,
};
#[cfg(feature = "defi")]
use nautilus_common::messages::defi::{
DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
SubscribeBlocks, UnsubscribeBlocks,
};
use nautilus_core::{UUID4, UnixNanos};
#[cfg(feature = "defi")]
use nautilus_model::defi::Blockchain;
use nautilus_model::{
data::{Bar, BarType, stubs::stub_depth10},
enums::{
AccountType, BookType, LiquiditySide, OrderSide, OrderStatus, OrderType,
PositionAdjustmentType, PositionSide, PositionSideSpecified, TimeInForce,
},
events::{
PositionAdjusted, PositionChanged, PositionClosed, PositionOpened,
order::spec::{OrderFilledSpec, OrderInitializedSpec, OrderSubmittedSpec},
},
identifiers::{
AccountId, ClientId, ClientOrderId, InstrumentId, OrderListId, PositionId, StrategyId,
TradeId, TraderId, Venue, VenueOrderId,
},
instruments::{InstrumentAny, stubs::currency_pair_ethusdt},
orderbook::OrderBook,
orders::OrderList,
reports::{ExecutionMassStatus, FillReport, PositionStatusReport},
types::{AccountBalance, Currency, Money, Price, Quantity},
};
use rstest::rstest;
use serde::Deserialize;
use super::*;
fn trader_id() -> TraderId {
TraderId::from("TRADER-001")
}
fn strategy_id() -> StrategyId {
StrategyId::from("S-001")
}
fn instrument_id() -> InstrumentId {
InstrumentId::from("ETHUSDT-PERP.BINANCE")
}
fn client_order_id() -> ClientOrderId {
ClientOrderId::from("O-20260510-000001")
}
fn venue_order_id() -> VenueOrderId {
VenueOrderId::from("V-12345")
}
/// Builds a `SubmitOrder` command for the shared instrument and client order
/// id, routed to the `BINANCE` client, with a fresh command UUID.
fn make_submit_order() -> SubmitOrder {
    let client = Some(ClientId::from("BINANCE"));
    let init_event = OrderInitializedSpec::builder()
        .instrument_id(instrument_id())
        .client_order_id(client_order_id())
        .quantity(Quantity::from("1"))
        .time_in_force(TimeInForce::Gtc)
        .ts_event(UnixNanos::from(1))
        .ts_init(UnixNanos::from(2))
        .build();

    SubmitOrder::new(
        trader_id(),
        client,
        strategy_id(),
        instrument_id(),
        client_order_id(),
        init_event,
        None,
        None,
        None,
        UUID4::new(),
        UnixNanos::from(3),
        None,
    )
}
/// Builds an `OrderFilled` event for the shared client/venue order ids with a
/// 0.10 USDT commission.
fn make_order_filled() -> OrderFilled {
    let account = AccountId::from("BINANCE-001");
    let trade = TradeId::from("T-9999");
    let commission = Money::new(0.10, Currency::USDT());

    OrderFilledSpec::builder()
        .instrument_id(instrument_id())
        .client_order_id(client_order_id())
        .venue_order_id(venue_order_id())
        .account_id(account)
        .trade_id(trade)
        .last_qty(Quantity::from("1"))
        .last_px(Price::from("100.00"))
        .currency(Currency::USDT())
        .ts_event(UnixNanos::from(10))
        .ts_init(UnixNanos::from(11))
        .commission(commission)
        .build()
}
/// Builds a filled market-buy `OrderStatusReport` carrying both the shared
/// client order id and the shared venue order id.
fn make_order_status_report() -> OrderStatusReport {
    let account = AccountId::from("BINANCE-001");
    let client_id = Some(client_order_id());
    let report_id = Some(UUID4::new());

    OrderStatusReport::new(
        account,
        instrument_id(),
        client_id,
        venue_order_id(),
        OrderSide::Buy,
        OrderType::Market,
        TimeInForce::Gtc,
        OrderStatus::Filled,
        Quantity::from("1"),
        Quantity::from("1"),
        UnixNanos::from(20),
        UnixNanos::from(21),
        UnixNanos::from(22),
        report_id,
    )
}
/// A bare `SubmitOrder` encodes to a non-empty payload with exactly one
/// `ClientOrderId` index key matching the command.
#[rstest]
fn submit_order_encoder_emits_client_order_id_index() {
    let command = make_submit_order();
    let encoded = encode_submit_order(&command).expect("encode");
    let keys = &encoded.index_keys;

    assert!(!encoded.payload.is_empty());
    assert_eq!(keys.len(), 1);
    assert_eq!(keys[0].kind, IndexKind::ClientOrderId);
    assert_eq!(keys[0].key, command.client_order_id.to_string());
}
/// A bare `OrderFilled` encodes to a non-empty payload indexed first by
/// client order id and then by venue order id.
#[rstest]
fn order_filled_encoder_emits_client_and_venue_order_id_indices() {
    let fill = make_order_filled();
    let encoded = encode_order_filled(&fill).expect("encode");
    let keys = &encoded.index_keys;

    assert!(!encoded.payload.is_empty());
    assert_eq!(keys.len(), 2);

    assert_eq!(keys[0].kind, IndexKind::ClientOrderId);
    assert_eq!(keys[0].key, fill.client_order_id.to_string());

    assert_eq!(keys[1].kind, IndexKind::VenueOrderId);
    assert_eq!(keys[1].key, fill.venue_order_id.to_string());
}
/// With a client order id present, the status report is indexed by venue
/// order id first and client order id second.
#[rstest]
fn order_status_report_encoder_includes_client_order_id_when_present() {
    let encoded = encode_order_status_report(&make_order_status_report()).expect("encode");
    let keys = &encoded.index_keys;

    assert_eq!(keys.len(), 2);
    assert_eq!(keys[0].kind, IndexKind::VenueOrderId);
    assert_eq!(keys[1].kind, IndexKind::ClientOrderId);
}
#[rstest]
fn order_status_report_encoder_omits_client_order_id_when_absent() {
let mut report = make_order_status_report();
report.client_order_id = None;
let encoded = encode_order_status_report(&report).expect("encode");
assert_eq!(encoded.index_keys.len(), 1);
assert_eq!(encoded.index_keys[0].kind, IndexKind::VenueOrderId);
}
/// Guards the default registry against drift from the audited surface: every
/// listed type must be registered, and the registry's length must equal the
/// number of listed entries.
#[rstest]
fn default_registry_covers_published_state_affecting_surface() {
    let registry = default_registry();

    // Each entry pairs a human-readable label with whether the default
    // registry has an encoder for the named type.
    let expected = [
        (
            "send_any_value(SubmitOrder) / bare SubmitOrder",
            registry.contains::<SubmitOrder>(),
        ),
        (
            "publish_order_event(OrderFilled) / bare OrderFilled",
            registry.contains::<OrderFilled>(),
        ),
        (
            "reconciliation.raw.order_status / OrderStatusReport",
            registry.contains::<OrderStatusReport>(),
        ),
        (
            "reconciliation.raw.fill / FillReport",
            registry.contains::<FillReport>(),
        ),
        (
            "reconciliation.raw.position / PositionStatusReport",
            registry.contains::<PositionStatusReport>(),
        ),
        (
            "send_trading_command / TradingCommand",
            registry.contains::<TradingCommand>(),
        ),
        (
            "publish_order_event / OrderEventAny",
            registry.contains::<OrderEventAny>(),
        ),
        (
            "send_execution_report / ExecutionReport",
            registry.contains::<ExecutionReport>(),
        ),
        (
            "publish_position_event / PositionEvent",
            registry.contains::<PositionEvent>(),
        ),
        (
            "publish_account_state and send_account_state / AccountState",
            registry.contains::<AccountState>(),
        ),
        (
            "time event handler firing / TimeEvent",
            registry.contains::<TimeEvent>(),
        ),
        (
            "send_data_command / DataCommand",
            registry.contains::<DataCommand>(),
        ),
        (
            "send_data_response / DataResponse",
            registry.contains::<DataResponse>(),
        ),
    ];

    // Collect the labels of every entry that is not registered.
    let missing: Vec<&str> = expected
        .iter()
        .filter(|(_, registered)| !*registered)
        .map(|(name, _)| *name)
        .collect();
    assert!(
        missing.is_empty(),
        "missing default event-store encoder registrations for {missing:?}",
    );

    assert_eq!(
        registry.len(),
        expected.len(),
        "default registry must match the audited state-affecting surface",
    );
}
/// The encoded `SubmitOrder` payload decodes back to an equal command via
/// `rmp_serde`.
#[rstest]
fn submit_order_payload_round_trips_through_msgpack() {
    let original = make_submit_order();
    let encoded = encode_submit_order(&original).expect("encode");

    let restored = rmp_serde::from_slice::<SubmitOrder>(&encoded.payload).expect("decode");

    assert_eq!(restored, original);
}
/// The encoded `OrderFilled` payload decodes back to an equal event via
/// `rmp_serde`.
#[rstest]
fn order_filled_payload_round_trips_through_msgpack() {
    let original = make_order_filled();
    let encoded = encode_order_filled(&original).expect("encode");

    let restored = rmp_serde::from_slice::<OrderFilled>(&encoded.payload).expect("decode");

    assert_eq!(restored, original);
}
/// The encoded `OrderStatusReport` payload decodes back to an equal report
/// via `rmp_serde`.
#[rstest]
fn order_status_report_payload_round_trips_through_msgpack() {
    let original = make_order_status_report();
    let encoded = encode_order_status_report(&original).expect("encode");

    let restored = rmp_serde::from_slice::<OrderStatusReport>(&encoded.payload).expect("decode");

    assert_eq!(restored, original);
}
/// Builds a `CancelOrder` command for the shared order that carries both the
/// client order id and the venue order id.
fn make_cancel_order() -> CancelOrder {
    let client = Some(ClientId::from("BINANCE"));
    let venue = Some(venue_order_id());

    CancelOrder::new(
        trader_id(),
        client,
        strategy_id(),
        instrument_id(),
        client_order_id(),
        venue,
        UUID4::new(),
        UnixNanos::from(4),
        None,
        None,
    )
}
/// Builds a `QueryAccount` command for account `BINANCE-001`.
fn make_query_account() -> QueryAccount {
    let client = Some(ClientId::from("BINANCE"));
    let account = AccountId::from("BINANCE-001");

    QueryAccount::new(
        trader_id(),
        client,
        account,
        UUID4::new(),
        UnixNanos::from(5),
        None,
        None,
    )
}
/// Builds an `OrderSubmitted` event for the shared instrument and client
/// order id.
fn make_order_submitted() -> OrderSubmitted {
    let account = AccountId::from("BINANCE-001");

    OrderSubmittedSpec::builder()
        .instrument_id(instrument_id())
        .client_order_id(client_order_id())
        .account_id(account)
        .ts_event(UnixNanos::from(30))
        .ts_init(UnixNanos::from(31))
        .build()
}
/// Builds a `ModifyOrder` command for the shared order; `venue` controls
/// whether a venue order id is attached.
fn make_modify_order(venue: Option<VenueOrderId>) -> ModifyOrder {
    let client = Some(ClientId::from("BINANCE"));
    let new_quantity = Some(Quantity::from("2"));
    let new_price = Some(Price::from("100.00"));

    ModifyOrder::new(
        trader_id(),
        client,
        strategy_id(),
        instrument_id(),
        client_order_id(),
        venue,
        new_quantity,
        new_price,
        None,
        UUID4::new(),
        UnixNanos::from(6),
        None,
        None,
    )
}
/// Builds a buy-side `CancelAllOrders` command for the shared instrument.
fn make_cancel_all_orders() -> CancelAllOrders {
    let client = Some(ClientId::from("BINANCE"));

    CancelAllOrders::new(
        trader_id(),
        client,
        strategy_id(),
        instrument_id(),
        OrderSide::Buy,
        UUID4::new(),
        UnixNanos::from(7),
        None,
        None,
    )
}
/// Builds a `QueryOrder` command for the shared order; `venue` controls
/// whether a venue order id is attached.
fn make_query_order(venue: Option<VenueOrderId>) -> QueryOrder {
    let client = Some(ClientId::from("BINANCE"));

    QueryOrder::new(
        trader_id(),
        client,
        strategy_id(),
        instrument_id(),
        client_order_id(),
        venue,
        UUID4::new(),
        UnixNanos::from(8),
        None,
        None,
    )
}
/// Wraps the given cancel commands in a `BatchCancelOrders` for the shared
/// instrument.
fn make_batch_cancel_orders(cancels: Vec<CancelOrder>) -> BatchCancelOrders {
    let client = Some(ClientId::from("BINANCE"));

    BatchCancelOrders::new(
        trader_id(),
        client,
        strategy_id(),
        instrument_id(),
        cancels,
        UUID4::new(),
        UnixNanos::from(9),
        None,
        None,
    )
}
fn make_submit_order_list(client_order_ids: Vec<ClientOrderId>) -> SubmitOrderList {
let order_inits: Vec<OrderInitialized> = client_order_ids
.iter()
.copied()
.map(make_order_initialized_with_id)
.collect();
let order_list = OrderList::new(
OrderListId::from("OL-1"),
instrument_id(),
strategy_id(),
client_order_ids,
UnixNanos::from(10),
);
SubmitOrderList::new(
trader_id(),
Some(ClientId::from("BINANCE")),
strategy_id(),
order_list,
order_inits,
None,
None,
None,
UUID4::new(),
UnixNanos::from(11),
None, )
}
/// Returns a default `OrderInitialized` whose client order id is replaced by
/// the one given.
fn make_order_initialized_with_id(client_order_id: ClientOrderId) -> OrderInitialized {
    let defaults = OrderInitialized::default();
    OrderInitialized {
        client_order_id,
        ..defaults
    }
}
/// `OrderEventAny::Initialized` for the shared client order id.
fn ev_initialized() -> OrderEventAny {
    let inner = make_order_initialized_with_id(client_order_id());
    OrderEventAny::Initialized(inner)
}
fn ev_denied() -> OrderEventAny {
OrderEventAny::Denied(OrderDenied {
client_order_id: client_order_id(),
..Default::default()
})
}
fn ev_emulated() -> OrderEventAny {
OrderEventAny::Emulated(OrderEmulated {
client_order_id: client_order_id(),
..Default::default()
})
}
fn ev_released() -> OrderEventAny {
OrderEventAny::Released(OrderReleased {
client_order_id: client_order_id(),
..Default::default()
})
}
/// `OrderEventAny::Submitted` wrapping the shared submitted-event fixture.
fn ev_submitted() -> OrderEventAny {
    let inner = make_order_submitted();
    OrderEventAny::Submitted(inner)
}
fn ev_accepted_with_venue(venue: VenueOrderId) -> OrderEventAny {
OrderEventAny::Accepted(OrderAccepted {
client_order_id: client_order_id(),
venue_order_id: venue,
..Default::default()
})
}
fn ev_rejected() -> OrderEventAny {
OrderEventAny::Rejected(OrderRejected {
client_order_id: client_order_id(),
..Default::default()
})
}
fn ev_canceled(venue: Option<VenueOrderId>) -> OrderEventAny {
OrderEventAny::Canceled(OrderCanceled {
client_order_id: client_order_id(),
venue_order_id: venue,
..Default::default()
})
}
fn ev_expired(venue: Option<VenueOrderId>) -> OrderEventAny {
OrderEventAny::Expired(OrderExpired {
client_order_id: client_order_id(),
venue_order_id: venue,
..Default::default()
})
}
fn ev_triggered(venue: Option<VenueOrderId>) -> OrderEventAny {
OrderEventAny::Triggered(OrderTriggered {
client_order_id: client_order_id(),
venue_order_id: venue,
..Default::default()
})
}
fn ev_pending_update(venue: Option<VenueOrderId>) -> OrderEventAny {
OrderEventAny::PendingUpdate(OrderPendingUpdate {
client_order_id: client_order_id(),
venue_order_id: venue,
..Default::default()
})
}
fn ev_pending_cancel(venue: Option<VenueOrderId>) -> OrderEventAny {
OrderEventAny::PendingCancel(OrderPendingCancel {
client_order_id: client_order_id(),
venue_order_id: venue,
..Default::default()
})
}
fn ev_modify_rejected(venue: Option<VenueOrderId>) -> OrderEventAny {
OrderEventAny::ModifyRejected(OrderModifyRejected {
client_order_id: client_order_id(),
venue_order_id: venue,
..Default::default()
})
}
fn ev_cancel_rejected(venue: Option<VenueOrderId>) -> OrderEventAny {
OrderEventAny::CancelRejected(OrderCancelRejected {
client_order_id: client_order_id(),
venue_order_id: venue,
..Default::default()
})
}
fn ev_updated(venue: Option<VenueOrderId>) -> OrderEventAny {
OrderEventAny::Updated(OrderUpdated {
client_order_id: client_order_id(),
venue_order_id: venue,
..Default::default()
})
}
/// `OrderEventAny::Filled` wrapping the shared fill-event fixture.
fn ev_filled() -> OrderEventAny {
    let inner = make_order_filled();
    OrderEventAny::Filled(inner)
}
/// Encoding `TradingCommand::SubmitOrder` yields the same payload and index
/// keys as the bare encoder, tagged with the inner `SubmitOrder` type.
#[rstest]
fn trading_command_envelope_stamps_inner_submit_order_payload_type() {
    let command = make_submit_order();
    let direct = encode_submit_order(&command).expect("bare");
    let via_envelope =
        encode_trading_command(&TradingCommand::SubmitOrder(command)).expect("envelope");

    assert_eq!(via_envelope.payload, direct.payload);
    assert_eq!(via_envelope.index_keys, direct.index_keys);
    assert_eq!(
        via_envelope.payload_type.expect("override").as_str(),
        PAYLOAD_TYPE_SUBMIT_ORDER,
    );
}
/// A `CancelOrder` envelope is tagged with the inner type, indexed by client
/// and venue order id, and its payload decodes back to the bare command.
#[rstest]
fn trading_command_cancel_order_envelope_emits_client_and_venue_indices() {
    let cancel = make_cancel_order();
    let encoded =
        encode_trading_command(&TradingCommand::CancelOrder(cancel.clone())).expect("envelope");

    assert_eq!(
        encoded.payload_type.expect("override").as_str(),
        PAYLOAD_TYPE_CANCEL_ORDER,
    );

    let keys = &encoded.index_keys;
    assert_eq!(keys.len(), 2);
    assert_eq!(keys[0].kind, IndexKind::ClientOrderId);
    assert_eq!(keys[0].key, cancel.client_order_id.to_string());
    assert_eq!(keys[1].kind, IndexKind::VenueOrderId);
    assert_eq!(
        keys[1].key,
        cancel.venue_order_id.expect("set").to_string(),
    );

    let restored = rmp_serde::from_slice::<CancelOrder>(&encoded.payload).expect("decode");
    assert_eq!(restored, cancel);
}
/// A `QueryAccount` envelope is tagged with the inner type and carries no
/// index keys.
#[rstest]
fn trading_command_query_account_envelope_records_no_order_indices() {
    let command = TradingCommand::QueryAccount(make_query_account());
    let encoded = encode_trading_command(&command).expect("envelope");

    assert_eq!(
        encoded.payload_type.expect("override").as_str(),
        PAYLOAD_TYPE_QUERY_ACCOUNT,
    );
    assert!(encoded.index_keys.is_empty());
}
/// Encoding `OrderEventAny::Filled` yields the same payload and index keys as
/// the bare encoder, tagged with the inner `OrderFilled` type.
#[rstest]
fn order_event_any_envelope_stamps_inner_filled_payload_type() {
    let fill = make_order_filled();
    let direct = encode_order_filled(&fill).expect("bare");
    let via_envelope = encode_order_event_any(&OrderEventAny::Filled(fill)).expect("envelope");

    assert_eq!(via_envelope.payload, direct.payload);
    assert_eq!(via_envelope.index_keys, direct.index_keys);
    assert_eq!(
        via_envelope.payload_type.expect("override").as_str(),
        PAYLOAD_TYPE_ORDER_FILLED,
    );
}
/// An `OrderSubmitted` envelope is tagged with the inner type, indexed by
/// client order id only, and its payload decodes back to the bare event.
#[rstest]
fn order_event_any_submitted_envelope_emits_client_order_id_index() {
    let submitted = make_order_submitted();
    let encoded =
        encode_order_event_any(&OrderEventAny::Submitted(submitted)).expect("envelope");

    assert_eq!(
        encoded.payload_type.expect("override").as_str(),
        PAYLOAD_TYPE_ORDER_SUBMITTED,
    );

    let keys = &encoded.index_keys;
    assert_eq!(keys.len(), 1);
    assert_eq!(keys[0].kind, IndexKind::ClientOrderId);
    assert_eq!(keys[0].key, submitted.client_order_id.to_string());

    let restored = rmp_serde::from_slice::<OrderSubmitted>(&encoded.payload).expect("decode");
    assert_eq!(restored, submitted);
}
/// For each listed `TradingCommand` variant, the envelope encoder stamps the
/// inner payload tag (never the wrapper fallback) and emits the expected
/// number of index keys.
#[rstest]
#[case::submit_order(
    TradingCommand::SubmitOrder(make_submit_order()),
    PAYLOAD_TYPE_SUBMIT_ORDER,
    1
)]
#[case::submit_order_list(
    TradingCommand::SubmitOrderList(make_submit_order_list(vec![
        ClientOrderId::from("O-A"),
        ClientOrderId::from("O-B"),
    ])),
    PAYLOAD_TYPE_SUBMIT_ORDER_LIST,
    2,
)]
#[case::modify_order(
    TradingCommand::ModifyOrder(make_modify_order(Some(venue_order_id()))),
    PAYLOAD_TYPE_MODIFY_ORDER,
    2
)]
#[case::cancel_order(
    TradingCommand::CancelOrder(make_cancel_order()),
    PAYLOAD_TYPE_CANCEL_ORDER,
    2
)]
#[case::cancel_all_orders(
    TradingCommand::CancelAllOrders(make_cancel_all_orders()),
    PAYLOAD_TYPE_CANCEL_ALL_ORDERS,
    0
)]
#[case::batch_cancel_orders(
    TradingCommand::BatchCancelOrders(make_batch_cancel_orders(vec![make_cancel_order()])),
    PAYLOAD_TYPE_BATCH_CANCEL_ORDERS,
    2,
)]
#[case::query_order(
    TradingCommand::QueryOrder(make_query_order(Some(venue_order_id()))),
    PAYLOAD_TYPE_QUERY_ORDER,
    2
)]
#[case::query_account(
    TradingCommand::QueryAccount(make_query_account()),
    PAYLOAD_TYPE_QUERY_ACCOUNT,
    0
)]
fn trading_command_envelope_stamps_inner_tag_for_every_variant(
    #[case] command: TradingCommand,
    #[case] expected_tag: &str,
    #[case] expected_index_count: usize,
) {
    let encoded = encode_trading_command(&command).expect("encode");
    let stamped = encoded.payload_type.expect("override");
    let tag = stamped.as_str().to_string();

    assert_eq!(tag, expected_tag);
    assert_ne!(
        tag, PAYLOAD_TYPE_TRADING_COMMAND,
        "wrapper fallback tag must never reach the writer",
    );

    let index_count = encoded.index_keys.len();
    assert_eq!(index_count, expected_index_count);
}
/// For each listed `OrderEventAny` variant, the envelope encoder stamps the
/// inner payload tag (never the wrapper fallback), indexes the client order
/// id first, and adds a venue order id index only where expected.
#[rstest]
#[case::initialized(ev_initialized(), PAYLOAD_TYPE_ORDER_INITIALIZED, false)]
#[case::denied(ev_denied(), PAYLOAD_TYPE_ORDER_DENIED, false)]
#[case::emulated(ev_emulated(), PAYLOAD_TYPE_ORDER_EMULATED, false)]
#[case::released(ev_released(), PAYLOAD_TYPE_ORDER_RELEASED, false)]
#[case::submitted(ev_submitted(), PAYLOAD_TYPE_ORDER_SUBMITTED, false)]
#[case::accepted(
    ev_accepted_with_venue(venue_order_id()),
    PAYLOAD_TYPE_ORDER_ACCEPTED,
    true
)]
#[case::rejected(ev_rejected(), PAYLOAD_TYPE_ORDER_REJECTED, false)]
#[case::canceled(ev_canceled(Some(venue_order_id())), PAYLOAD_TYPE_ORDER_CANCELED, true)]
#[case::expired(ev_expired(Some(venue_order_id())), PAYLOAD_TYPE_ORDER_EXPIRED, true)]
#[case::triggered(
    ev_triggered(Some(venue_order_id())),
    PAYLOAD_TYPE_ORDER_TRIGGERED,
    true
)]
#[case::pending_update(
    ev_pending_update(Some(venue_order_id())),
    PAYLOAD_TYPE_ORDER_PENDING_UPDATE,
    true
)]
#[case::pending_cancel(
    ev_pending_cancel(Some(venue_order_id())),
    PAYLOAD_TYPE_ORDER_PENDING_CANCEL,
    true
)]
#[case::modify_rejected(
    ev_modify_rejected(Some(venue_order_id())),
    PAYLOAD_TYPE_ORDER_MODIFY_REJECTED,
    true
)]
#[case::cancel_rejected(
    ev_cancel_rejected(Some(venue_order_id())),
    PAYLOAD_TYPE_ORDER_CANCEL_REJECTED,
    true
)]
#[case::updated(ev_updated(Some(venue_order_id())), PAYLOAD_TYPE_ORDER_UPDATED, true)]
#[case::filled(ev_filled(), PAYLOAD_TYPE_ORDER_FILLED, true)]
fn order_event_any_envelope_stamps_inner_tag_for_every_variant(
    #[case] event: OrderEventAny,
    #[case] expected_tag: &str,
    #[case] expects_venue_index: bool,
) {
    let encoded = encode_order_event_any(&event).expect("encode");
    let stamped = encoded.payload_type.expect("override");
    let tag = stamped.as_str().to_string();

    assert_eq!(tag, expected_tag);
    assert_ne!(
        tag, PAYLOAD_TYPE_ORDER_EVENT_ANY,
        "wrapper fallback tag must never reach the writer",
    );

    let keys = &encoded.index_keys;
    assert_eq!(
        keys[0].kind,
        IndexKind::ClientOrderId,
        "first index must always be ClientOrderId for every order event",
    );
    assert_eq!(keys[0].key, client_order_id().to_string());

    let expected_len = if expects_venue_index { 2 } else { 1 };
    assert_eq!(keys.len(), expected_len);
    if expects_venue_index {
        assert_eq!(keys[1].kind, IndexKind::VenueOrderId);
    }
}
/// For commands with an optional venue order id, the envelope emits two index
/// keys when it is set and only the client order id key when it is not.
#[rstest]
#[case::cancel_order_some(TradingCommand::CancelOrder(make_cancel_order()), 2)]
#[case::cancel_order_none(
    TradingCommand::CancelOrder(CancelOrder {
        venue_order_id: None,
        ..make_cancel_order()
    }),
    1,
)]
#[case::modify_order_some(
    TradingCommand::ModifyOrder(make_modify_order(Some(venue_order_id()))),
    2
)]
#[case::modify_order_none(TradingCommand::ModifyOrder(make_modify_order(None)), 1)]
#[case::query_order_some(
    TradingCommand::QueryOrder(make_query_order(Some(venue_order_id()))),
    2
)]
#[case::query_order_none(TradingCommand::QueryOrder(make_query_order(None)), 1)]
fn trading_command_envelope_index_count_matches_venue_optionality(
    #[case] command: TradingCommand,
    #[case] expected_index_count: usize,
) {
    let encoded = encode_trading_command(&command).expect("encode");
    let keys = &encoded.index_keys;

    assert_eq!(keys.len(), expected_index_count);
    assert_eq!(keys[0].kind, IndexKind::ClientOrderId);

    let venue_expected = expected_index_count == 2;
    if venue_expected {
        assert_eq!(keys[1].kind, IndexKind::VenueOrderId);
    }
}
/// For order events with an optional venue order id, the number of index keys
/// is two when it is set and one when it is not.
#[rstest]
#[case::canceled_some(ev_canceled(Some(venue_order_id())), 2)]
#[case::canceled_none(ev_canceled(None), 1)]
#[case::updated_some(ev_updated(Some(venue_order_id())), 2)]
#[case::updated_none(ev_updated(None), 1)]
#[case::pending_update_some(ev_pending_update(Some(venue_order_id())), 2)]
#[case::pending_update_none(ev_pending_update(None), 1)]
fn order_event_any_envelope_index_count_matches_venue_optionality(
    #[case] event: OrderEventAny,
    #[case] expected_index_count: usize,
) {
    let encoded = encode_order_event_any(&event).expect("encode");
    let index_count = encoded.index_keys.len();

    assert_eq!(index_count, expected_index_count);
}
#[rstest]
fn batch_cancel_orders_envelope_indexes_each_child_with_optional_venue() {
let with_venue = make_cancel_order();
let mut without_venue = make_cancel_order();
without_venue.venue_order_id = None;
without_venue.client_order_id = ClientOrderId::from("O-NOVENUE");
let batch = make_batch_cancel_orders(vec![with_venue.clone(), without_venue.clone()]);
let encoded =
encode_trading_command(&TradingCommand::BatchCancelOrders(batch)).expect("encode");
assert_eq!(
encoded.payload_type.expect("override").as_str(),
PAYLOAD_TYPE_BATCH_CANCEL_ORDERS,
);
assert_eq!(encoded.index_keys.len(), 3);
assert_eq!(encoded.index_keys[0].kind, IndexKind::ClientOrderId);
assert_eq!(
encoded.index_keys[0].key,
with_venue.client_order_id.to_string(),
);
assert_eq!(encoded.index_keys[1].kind, IndexKind::VenueOrderId);
assert_eq!(
encoded.index_keys[1].key,
with_venue.venue_order_id.expect("set").to_string(),
);
assert_eq!(encoded.index_keys[2].kind, IndexKind::ClientOrderId);
assert_eq!(
encoded.index_keys[2].key,
without_venue.client_order_id.to_string(),
);
}
/// A `SubmitOrderList` envelope emits one `ClientOrderId` index key per order
/// in the list, in list order.
#[rstest]
fn submit_order_list_envelope_indexes_each_client_order_id() {
    let ids = vec![
        ClientOrderId::from("O-LIST-1"),
        ClientOrderId::from("O-LIST-2"),
        ClientOrderId::from("O-LIST-3"),
    ];
    let command = TradingCommand::SubmitOrderList(make_submit_order_list(ids.clone()));

    let encoded = encode_trading_command(&command).expect("encode");

    assert_eq!(
        encoded.payload_type.expect("override").as_str(),
        PAYLOAD_TYPE_SUBMIT_ORDER_LIST,
    );
    assert_eq!(encoded.index_keys.len(), ids.len());

    // Lengths are equal at this point, so zipping visits every key.
    for (index_key, expected_id) in encoded.index_keys.iter().zip(&ids) {
        assert_eq!(index_key.kind, IndexKind::ClientOrderId);
        assert_eq!(index_key.key, expected_id.to_string());
    }
}
/// Builds a taker buy `FillReport` for the shared venue order id that also
/// carries the shared client order id.
fn make_fill_report() -> FillReport {
    let account = AccountId::from("BINANCE-001");
    let trade = TradeId::from("T-1111");
    let commission = Money::new(0.10, Currency::USDT());
    let client_id = Some(client_order_id());

    FillReport::new(
        account,
        instrument_id(),
        venue_order_id(),
        trade,
        OrderSide::Buy,
        Quantity::from("1"),
        Price::from("100.00"),
        commission,
        LiquiditySide::Taker,
        client_id,
        None,
        UnixNanos::from(40),
        UnixNanos::from(41),
        None,
    )
}
/// Builds a long `PositionStatusReport` of quantity 1 for the shared
/// instrument, tied to position `P-001`.
fn make_position_status_report() -> PositionStatusReport {
    let account = AccountId::from("BINANCE-001");
    let position = Some(PositionId::from("P-001"));

    PositionStatusReport::new(
        account,
        instrument_id(),
        PositionSideSpecified::Long,
        Quantity::from("1"),
        UnixNanos::from(50),
        UnixNanos::from(51),
        None,
        position,
        None,
    )
}
/// Builds an `ExecutionMassStatus` populated with one order report, one fill
/// report and one position report from the shared fixtures.
fn make_execution_mass_status_with_reports() -> ExecutionMassStatus {
    let mut mass_status = ExecutionMassStatus::new(
        ClientId::from("BINANCE"),
        AccountId::from("BINANCE-001"),
        Venue::from("BINANCE"),
        UnixNanos::from(60),
        None,
    );

    let order_reports = vec![make_order_status_report()];
    mass_status.add_order_reports(order_reports);

    let fill_reports = vec![make_fill_report()];
    mass_status.add_fill_reports(fill_reports);

    let position_reports = vec![make_position_status_report()];
    mass_status.add_position_reports(position_reports);

    mass_status
}
/// Encoding `ExecutionReport::Order` yields the same payload and index keys
/// as the bare status-report encoder, tagged with the inner type.
#[rstest]
fn execution_report_order_envelope_reuses_bare_status_encoder() {
    let report = make_order_status_report();
    let direct = encode_order_status_report(&report).expect("bare");
    let via_envelope =
        encode_execution_report(&ExecutionReport::Order(Box::new(report))).expect("envelope");

    assert_eq!(via_envelope.payload, direct.payload);
    assert_eq!(via_envelope.index_keys, direct.index_keys);
    assert_eq!(
        via_envelope.payload_type.expect("override").as_str(),
        PAYLOAD_TYPE_ORDER_STATUS_REPORT,
    );
}
/// A `Fill` envelope is tagged with the inner type, indexed by venue order id
/// then client order id, and its payload decodes back to the bare report.
#[rstest]
fn execution_report_fill_envelope_emits_venue_and_client_order_id_indices() {
    let fill = make_fill_report();
    let report = ExecutionReport::Fill(Box::new(fill.clone()));

    let encoded = encode_execution_report(&report).expect("encode");

    assert_eq!(
        encoded.payload_type.expect("override").as_str(),
        PAYLOAD_TYPE_FILL_REPORT,
    );

    let keys = &encoded.index_keys;
    assert_eq!(keys.len(), 2);
    assert_eq!(keys[0].kind, IndexKind::VenueOrderId);
    assert_eq!(keys[0].key, fill.venue_order_id.to_string());
    assert_eq!(keys[1].kind, IndexKind::ClientOrderId);
    assert_eq!(
        keys[1].key,
        fill.client_order_id.expect("set").to_string(),
    );

    let restored = rmp_serde::from_slice::<FillReport>(&encoded.payload).expect("decode");
    assert_eq!(restored, fill);
}
#[rstest]
fn execution_report_fill_envelope_omits_client_order_id_when_absent() {
let mut fill = make_fill_report();
fill.client_order_id = None;
let envelope = ExecutionReport::Fill(Box::new(fill));
let encoded = encode_execution_report(&envelope).expect("encode");
assert_eq!(encoded.index_keys.len(), 1);
assert_eq!(encoded.index_keys[0].kind, IndexKind::VenueOrderId);
}
/// A `Position` envelope is tagged with the inner type, carries no index
/// keys, and its payload decodes back to the bare report.
#[rstest]
fn execution_report_position_envelope_records_no_indices() {
    let position = make_position_status_report();
    let report = ExecutionReport::Position(Box::new(position.clone()));

    let encoded = encode_execution_report(&report).expect("encode");

    assert_eq!(
        encoded.payload_type.expect("override").as_str(),
        PAYLOAD_TYPE_POSITION_STATUS_REPORT,
    );
    assert!(encoded.index_keys.is_empty());

    let restored =
        rmp_serde::from_slice::<PositionStatusReport>(&encoded.payload).expect("decode");
    assert_eq!(restored, position);
}
/// When the order report and its fill share the same venue and client order
/// ids, the `OrderWithFills` envelope emits each key only once.
#[rstest]
fn execution_report_order_with_fills_envelope_dedupes_shared_order_ids() {
    let order = make_order_status_report();
    let report =
        ExecutionReport::OrderWithFills(Box::new(order.clone()), vec![make_fill_report()]);

    let encoded = encode_execution_report(&report).expect("encode");

    assert_eq!(
        encoded.payload_type.expect("override").as_str(),
        PAYLOAD_TYPE_ORDER_WITH_FILLS,
    );

    let keys = &encoded.index_keys;
    assert_eq!(keys.len(), 2);
    assert_eq!(keys[0].kind, IndexKind::VenueOrderId);
    assert_eq!(keys[0].key, order.venue_order_id.to_string());
    assert_eq!(keys[1].kind, IndexKind::ClientOrderId);
    assert_eq!(
        keys[1].key,
        order.client_order_id.expect("set").to_string(),
    );
}
#[rstest]
fn execution_report_order_with_fills_envelope_indexes_distinct_fill_ids() {
let order = make_order_status_report();
let mut fill = make_fill_report();
fill.client_order_id = Some(ClientOrderId::from("O-EXTRA-001"));
let envelope = ExecutionReport::OrderWithFills(Box::new(order), vec![fill.clone()]);
let encoded = encode_execution_report(&envelope).expect("encode");
assert_eq!(encoded.index_keys.len(), 3);
assert_eq!(encoded.index_keys[2].kind, IndexKind::ClientOrderId);
assert_eq!(
encoded.index_keys[2].key,
fill.client_order_id.expect("set").to_string(),
);
}
#[rstest]
fn execution_report_order_with_fills_payload_round_trips() {
#[derive(serde::Deserialize)]
struct OrderWithFillsOwned {
order_report: OrderStatusReport,
fill_reports: Vec<FillReport>,
}
let order = make_order_status_report();
let fills = vec![make_fill_report()];
let envelope = ExecutionReport::OrderWithFills(Box::new(order.clone()), fills.clone());
let encoded = encode_execution_report(&envelope).expect("encode");
let decoded: OrderWithFillsOwned = rmp_serde::from_slice(&encoded.payload).expect("decode");
assert_eq!(decoded.order_report, order);
assert_eq!(decoded.fill_reports, fills);
}
/// A `MassStatus` envelope whose order and fill reports share ids is tagged
/// with the inner type, indexed by one venue and one client order id, and its
/// payload decodes back to the bare mass status.
#[rstest]
fn execution_report_mass_status_envelope_indexes_orders_and_fills() {
    let mass_status = make_execution_mass_status_with_reports();
    let report = ExecutionReport::MassStatus(Box::new(mass_status.clone()));

    let encoded = encode_execution_report(&report).expect("encode");

    assert_eq!(
        encoded.payload_type.expect("override").as_str(),
        PAYLOAD_TYPE_EXECUTION_MASS_STATUS,
    );

    let keys = &encoded.index_keys;
    assert_eq!(keys.len(), 2);
    assert_eq!(keys[0].kind, IndexKind::VenueOrderId);
    assert_eq!(keys[0].key, venue_order_id().to_string());
    assert_eq!(keys[1].kind, IndexKind::ClientOrderId);
    assert_eq!(keys[1].key, client_order_id().to_string());

    let restored =
        rmp_serde::from_slice::<ExecutionMassStatus>(&encoded.payload).expect("decode");
    assert_eq!(restored, mass_status);
}
/// A `MassStatus` with two order reports and one fill report, all with
/// distinct ids, yields six index keys: three venue and three client ids.
#[rstest]
fn execution_report_mass_status_envelope_indexes_distinct_children() {
    let order_a = make_order_status_report();
    let order_b = OrderStatusReport {
        client_order_id: Some(ClientOrderId::from("O-B")),
        venue_order_id: VenueOrderId::from("V-B"),
        ..make_order_status_report()
    };
    let fill_c = FillReport {
        venue_order_id: VenueOrderId::from("V-C"),
        client_order_id: Some(ClientOrderId::from("O-C")),
        ..make_fill_report()
    };

    let mut mass_status = ExecutionMassStatus::new(
        ClientId::from("BINANCE"),
        AccountId::from("BINANCE-001"),
        Venue::from("BINANCE"),
        UnixNanos::from(60),
        None,
    );
    mass_status.add_order_reports(vec![order_a, order_b]);
    mass_status.add_fill_reports(vec![fill_c]);

    let report = ExecutionReport::MassStatus(Box::new(mass_status));
    let encoded = encode_execution_report(&report).expect("encode");
    assert_eq!(encoded.index_keys.len(), 6);

    // Split the emitted keys by kind so membership can be checked per kind.
    let venue_keys: Vec<&str> = encoded
        .index_keys
        .iter()
        .filter_map(|k| (k.kind == IndexKind::VenueOrderId).then(|| k.key.as_str()))
        .collect();
    let client_keys: Vec<&str> = encoded
        .index_keys
        .iter()
        .filter_map(|k| (k.kind == IndexKind::ClientOrderId).then(|| k.key.as_str()))
        .collect();

    let shared_venue = venue_order_id().to_string();
    assert!(venue_keys.contains(&shared_venue.as_str()));
    assert!(venue_keys.contains(&"V-B"));
    assert!(venue_keys.contains(&"V-C"));

    let shared_client = client_order_id().to_string();
    assert!(client_keys.contains(&shared_client.as_str()));
    assert!(client_keys.contains(&"O-B"));
    assert!(client_keys.contains(&"O-C"));
}
/// For each listed `ExecutionReport` variant, the envelope encoder stamps the
/// inner payload tag and never the wrapper fallback tag.
#[rstest]
#[case::order(
    ExecutionReport::Order(Box::new(make_order_status_report())),
    PAYLOAD_TYPE_ORDER_STATUS_REPORT
)]
#[case::fill(
    ExecutionReport::Fill(Box::new(make_fill_report())),
    PAYLOAD_TYPE_FILL_REPORT
)]
#[case::order_with_fills(
    ExecutionReport::OrderWithFills(
        Box::new(make_order_status_report()),
        vec![make_fill_report()],
    ),
    PAYLOAD_TYPE_ORDER_WITH_FILLS,
)]
#[case::position(
    ExecutionReport::Position(Box::new(make_position_status_report())),
    PAYLOAD_TYPE_POSITION_STATUS_REPORT
)]
#[case::mass_status(
    ExecutionReport::MassStatus(Box::new(make_execution_mass_status_with_reports())),
    PAYLOAD_TYPE_EXECUTION_MASS_STATUS
)]
fn execution_report_envelope_stamps_inner_tag_for_every_variant(
    #[case] report: ExecutionReport,
    #[case] expected_tag: &str,
) {
    let encoded = encode_execution_report(&report).expect("encode");
    let stamped = encoded.payload_type.expect("override");
    let tag = stamped.as_str().to_string();

    assert_eq!(tag, expected_tag);
    assert_ne!(
        tag, PAYLOAD_TYPE_EXECUTION_REPORT,
        "wrapper fallback tag must never reach the writer",
    );
}
fn opening_order_id() -> ClientOrderId {
ClientOrderId::from("O-OPEN-001")
}
fn closing_order_id() -> ClientOrderId {
ClientOrderId::from("O-CLOSE-001")
}
fn position_id() -> PositionId {
PositionId::from("P-001")
}
/// Builds a `PositionOpened` event: a long position of quantity 1 opened at
/// 100.00 USDT by the opening-order fixture.
fn make_position_opened() -> PositionOpened {
    let account_id = AccountId::from("BINANCE-001");
    let event_id = UUID4::new();

    PositionOpened {
        // Identity
        trader_id: trader_id(),
        strategy_id: strategy_id(),
        instrument_id: instrument_id(),
        position_id: position_id(),
        account_id,
        opening_order_id: opening_order_id(),
        event_id,
        // Direction and size
        entry: OrderSide::Buy,
        side: PositionSide::Long,
        signed_qty: 1.0,
        quantity: Quantity::from("1"),
        last_qty: Quantity::from("1"),
        // Pricing
        last_px: Price::from("100.00"),
        currency: Currency::USDT(),
        avg_px_open: 100.0,
        // Timestamps
        ts_event: UnixNanos::from(70),
        ts_init: UnixNanos::from(71),
    }
}
/// Builds a `PositionChanged` event: the long position grown to quantity 2
/// with 1.0 USDT unrealized PnL and no realized PnL yet.
fn make_position_changed() -> PositionChanged {
    let account_id = AccountId::from("BINANCE-001");
    let event_id = UUID4::new();
    let unrealized_pnl = Money::new(1.0, Currency::USDT());

    PositionChanged {
        // Identity
        trader_id: trader_id(),
        strategy_id: strategy_id(),
        instrument_id: instrument_id(),
        position_id: position_id(),
        account_id,
        opening_order_id: opening_order_id(),
        event_id,
        // Direction and size
        entry: OrderSide::Buy,
        side: PositionSide::Long,
        signed_qty: 2.0,
        quantity: Quantity::from("2"),
        peak_quantity: Quantity::from("2"),
        last_qty: Quantity::from("1"),
        // Pricing
        last_px: Price::from("101.00"),
        currency: Currency::USDT(),
        avg_px_open: 100.5,
        avg_px_close: None,
        // Profit and loss
        realized_return: 0.0,
        realized_pnl: None,
        unrealized_pnl,
        // Timestamps
        ts_opened: UnixNanos::from(70),
        ts_event: UnixNanos::from(80),
        ts_init: UnixNanos::from(81),
    }
}
/// Builds a `PositionClosed` fixture: a flat position closed at 102.00 with a
/// closing order id set, 3.0 USDT realized PnL and zero unrealized PnL.
fn make_position_closed() -> PositionClosed {
    let account_id = AccountId::from("BINANCE-001");
    let realized_pnl = Some(Money::new(3.0, Currency::USDT()));
    let unrealized_pnl = Money::new(0.0, Currency::USDT());
    let event_id = UUID4::new();

    PositionClosed {
        // Identity.
        trader_id: trader_id(),
        strategy_id: strategy_id(),
        instrument_id: instrument_id(),
        position_id: position_id(),
        account_id,
        opening_order_id: opening_order_id(),
        closing_order_id: Some(closing_order_id()),
        // Direction and size.
        entry: OrderSide::Buy,
        side: PositionSide::Flat,
        signed_qty: 0.0,
        quantity: Quantity::from("0"),
        peak_quantity: Quantity::from("2"),
        // Last fill and pricing.
        last_qty: Quantity::from("2"),
        last_px: Price::from("102.00"),
        currency: Currency::USDT(),
        avg_px_open: 100.5,
        avg_px_close: Some(102.0),
        // Profit and loss.
        realized_return: 0.015,
        realized_pnl,
        unrealized_pnl,
        // Timing and event metadata.
        duration: 3_600_000_000_000,
        event_id,
        ts_opened: UnixNanos::from(70),
        ts_closed: Some(UnixNanos::from(90)),
        ts_event: UnixNanos::from(90),
        ts_init: UnixNanos::from(91),
    }
}
/// Builds a `PositionAdjusted` fixture of type `Commission` on account
/// `BINANCE-001`, leaving the three optional constructor arguments as `None`.
fn make_position_adjusted() -> PositionAdjusted {
    let account = AccountId::from("BINANCE-001");
    let adjustment = PositionAdjustmentType::Commission;
    let event_uuid = UUID4::new();
    let first_stamp = UnixNanos::from(100);
    let second_stamp = UnixNanos::from(101);

    PositionAdjusted::new(
        trader_id(),
        strategy_id(),
        instrument_id(),
        position_id(),
        account,
        adjustment,
        None,
        None,
        None,
        event_uuid,
        first_stamp,
        second_stamp,
    )
}
/// A `PositionEvent::PositionOpened` envelope must carry the inner payload
/// tag, a single `ClientOrderId` index for the opening order, and a payload
/// that decodes back to the original `PositionOpened`.
#[rstest]
fn position_event_opened_envelope_emits_opening_order_id_index() {
    let opened = make_position_opened();
    let event = PositionEvent::PositionOpened(opened.clone());
    let encoded = encode_position_event(&event).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_POSITION_OPENED);

    assert_eq!(encoded.index_keys.len(), 1);
    let only_key = &encoded.index_keys[0];
    assert_eq!(only_key.kind, IndexKind::ClientOrderId);
    assert_eq!(only_key.key, opened.opening_order_id.to_string());

    let round_tripped: PositionOpened =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    assert_eq!(round_tripped, opened);
}
/// A `PositionEvent::PositionChanged` envelope must carry the inner payload
/// tag, a single `ClientOrderId` index for the opening order, and a payload
/// that decodes back to the original `PositionChanged`.
#[rstest]
fn position_event_changed_envelope_emits_opening_order_id_index() {
    let changed = make_position_changed();
    let event = PositionEvent::PositionChanged(changed.clone());
    let encoded = encode_position_event(&event).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_POSITION_CHANGED);

    assert_eq!(encoded.index_keys.len(), 1);
    let only_key = &encoded.index_keys[0];
    assert_eq!(only_key.kind, IndexKind::ClientOrderId);
    assert_eq!(only_key.key, changed.opening_order_id.to_string());

    let round_tripped: PositionChanged =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    assert_eq!(round_tripped, changed);
}
/// A `PositionEvent::PositionClosed` envelope with a closing order id must
/// emit two `ClientOrderId` indices (opening first, closing second) and a
/// payload that decodes back to the original `PositionClosed`.
#[rstest]
fn position_event_closed_envelope_indexes_both_opening_and_closing_order_ids() {
    let closed = make_position_closed();
    let event = PositionEvent::PositionClosed(closed.clone());
    let encoded = encode_position_event(&event).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_POSITION_CLOSED);

    assert_eq!(encoded.index_keys.len(), 2);
    let open_key = &encoded.index_keys[0];
    assert_eq!(open_key.kind, IndexKind::ClientOrderId);
    assert_eq!(open_key.key, closed.opening_order_id.to_string());
    let close_key = &encoded.index_keys[1];
    assert_eq!(close_key.kind, IndexKind::ClientOrderId);
    assert_eq!(
        close_key.key,
        closed.closing_order_id.expect("set").to_string(),
    );

    let round_tripped: PositionClosed =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    assert_eq!(round_tripped, closed);
}
/// With `closing_order_id` cleared, a closed-position envelope must index
/// only the opening order id.
#[rstest]
fn position_event_closed_envelope_omits_closing_order_id_when_absent() {
    let mut without_close = make_position_closed();
    without_close.closing_order_id = None;
    let event = PositionEvent::PositionClosed(without_close);

    let encoded = encode_position_event(&event).expect("encode");

    assert_eq!(encoded.index_keys.len(), 1);
    let only_key = &encoded.index_keys[0];
    assert_eq!(only_key.kind, IndexKind::ClientOrderId);
    assert_eq!(only_key.key, opening_order_id().to_string());
}
/// When the closing order id equals the opening order id, the encoder must
/// emit a single index key rather than a duplicate pair.
#[rstest]
fn position_event_closed_envelope_dedupes_when_open_and_close_match() {
    let mut same_order = make_position_closed();
    same_order.closing_order_id = Some(same_order.opening_order_id);
    let event = PositionEvent::PositionClosed(same_order);

    let encoded = encode_position_event(&event).expect("encode");

    assert_eq!(encoded.index_keys.len(), 1);
    let only_key = &encoded.index_keys[0];
    assert_eq!(only_key.key, opening_order_id().to_string());
}
/// A `PositionEvent::PositionAdjusted` envelope must carry the inner payload
/// tag, emit no index keys, and decode back to the original value.
#[rstest]
fn position_event_adjusted_envelope_records_no_indices() {
    let adjusted = make_position_adjusted();
    let event = PositionEvent::PositionAdjusted(adjusted);
    let encoded = encode_position_event(&event).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_POSITION_ADJUSTED);
    assert!(encoded.index_keys.is_empty());

    let round_tripped: PositionAdjusted =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    assert_eq!(round_tripped, adjusted);
}
/// For each `PositionEvent` variant, the encoder must override the payload
/// type with the inner event's tag and never leave the wrapper tag in place.
#[rstest]
#[case::opened(
    PositionEvent::PositionOpened(make_position_opened()),
    PAYLOAD_TYPE_POSITION_OPENED
)]
#[case::changed(
    PositionEvent::PositionChanged(make_position_changed()),
    PAYLOAD_TYPE_POSITION_CHANGED
)]
#[case::closed(
    PositionEvent::PositionClosed(make_position_closed()),
    PAYLOAD_TYPE_POSITION_CLOSED
)]
#[case::adjusted(
    PositionEvent::PositionAdjusted(make_position_adjusted()),
    PAYLOAD_TYPE_POSITION_ADJUSTED
)]
fn position_event_envelope_stamps_inner_tag_for_every_variant(
    #[case] event: PositionEvent,
    #[case] expected_tag: &str,
) {
    let encoded = encode_position_event(&event).expect("encode");
    let stamped = encoded.payload_type.expect("override");
    let tag = stamped.as_str();

    assert_eq!(tag, expected_tag);
    assert_ne!(
        tag, PAYLOAD_TYPE_POSITION_EVENT,
        "wrapper fallback tag must never reach the writer",
    );
}
/// Builds an `AccountState` fixture for a `Cash` account `BINANCE-001` with a
/// single USD balance entry and an empty second collection.
fn make_account_state() -> AccountState {
    let usd_balance = AccountBalance::new(
        Money::from("1000000 USD"),
        Money::from("0 USD"),
        Money::from("1000000 USD"),
    );
    let account = AccountId::from("BINANCE-001");
    let event_uuid = UUID4::new();

    AccountState::new(
        account,
        AccountType::Cash,
        vec![usd_balance],
        Vec::new(),
        true,
        event_uuid,
        UnixNanos::from(110),
        UnixNanos::from(111),
        Some(Currency::USD()),
    )
}
/// The bare `AccountState` encoder must produce a non-empty payload, no index
/// keys, and no payload-type override.
#[rstest]
fn account_state_encoder_records_no_indices() {
    let encoded = encode_account_state(&make_account_state()).expect("encode");

    let has_payload = !encoded.payload.is_empty();
    assert!(has_payload);
    assert!(encoded.index_keys.is_empty());
    assert!(
        encoded.payload_type.is_none(),
        "bare-type encoders inherit the registry's registered tag",
    );
}
/// The encoded `AccountState` payload must decode via `rmp_serde` back to an
/// equal value.
#[rstest]
fn account_state_payload_round_trips_through_msgpack() {
    let original = make_account_state();
    let bytes = encode_account_state(&original).expect("encode").payload;

    let round_tripped: AccountState = rmp_serde::from_slice(&bytes).expect("decode");

    assert_eq!(round_tripped, original);
}
/// The default registry must encode `AccountState` under
/// `PAYLOAD_TYPE_ACCOUNT_STATE` with no index keys.
#[rstest]
fn account_state_registered_under_canonical_payload_type() {
    let registry = default_registry();
    let state = make_account_state();

    let lookup = registry.encode(&state).expect("encode");
    let (tag, encoded) = lookup.expect("registered");

    assert_eq!(tag.as_str(), PAYLOAD_TYPE_ACCOUNT_STATE);
    assert!(encoded.index_keys.is_empty());
}
/// Owned decode target for the payload produced by `encode_time_event`.
///
/// Used by the time-event round-trip test to compare the decoded fields
/// against the source `TimeEvent`; the name is held as an owned `String`.
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct DecodedTimeEventPayload {
name: String,
event_id: UUID4,
ts_event: UnixNanos,
ts_init: UnixNanos,
}
/// The encoded `TimeEvent` payload must decode into a structure carrying the
/// same name, event id and timestamps, and the encoder must emit no indices.
#[rstest]
fn time_event_payload_round_trips_through_msgpack() {
    let name = Ustr::from("heartbeat");
    let event = TimeEvent::new(
        name,
        UUID4::new(),
        UnixNanos::from(100),
        UnixNanos::from(99),
    );
    let encoded = encode_time_event(&event).expect("encode");

    let round_tripped: DecodedTimeEventPayload =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    let wanted = DecodedTimeEventPayload {
        name: event.name.to_string(),
        event_id: event.event_id,
        ts_event: event.ts_event,
        ts_init: event.ts_init,
    };

    assert_eq!(round_tripped, wanted);
    assert!(encoded.index_keys.is_empty());
}
/// The default registry must encode `TimeEvent` under
/// `PAYLOAD_TYPE_TIME_EVENT` with no index keys.
#[rstest]
fn time_event_registered_under_canonical_payload_type() {
    let registry = default_registry();
    let name = Ustr::from("heartbeat");
    let event = TimeEvent::new(
        name,
        UUID4::new(),
        UnixNanos::from(100),
        UnixNanos::from(99),
    );

    let lookup = registry.encode(&event).expect("encode");
    let (tag, encoded) = lookup.expect("registered");

    assert_eq!(tag.as_str(), PAYLOAD_TYPE_TIME_EVENT);
    assert!(encoded.index_keys.is_empty());
}
/// A `DataCommand::Request` envelope must be stamped with
/// `PAYLOAD_TYPE_REQUEST_COMMAND`, emit no indices, and decode back to a
/// `RequestCommand::Quotes` with the same request and instrument ids.
#[rstest]
fn data_command_request_envelope_stamps_request_command_payload_type() {
    let request = make_request_command();
    let command = DataCommand::Request(request.clone());
    let encoded = encode_data_command(&command).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_REQUEST_COMMAND);
    assert!(encoded.index_keys.is_empty());

    let round_tripped: RequestCommand =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    match (round_tripped, request) {
        (RequestCommand::Quotes(actual), RequestCommand::Quotes(wanted)) => {
            assert_eq!(actual.request_id, wanted.request_id);
            assert_eq!(actual.instrument_id, wanted.instrument_id);
        }
        other => panic!("expected RequestCommand::Quotes round trip, was {other:?}"),
    }
}
/// A `DataCommand::Subscribe` envelope must be stamped with
/// `PAYLOAD_TYPE_SUBSCRIBE_COMMAND`, emit no indices, and decode back to a
/// `SubscribeCommand::Quotes` with the same command and instrument ids.
#[rstest]
fn data_command_subscribe_envelope_stamps_subscribe_command_payload_type() {
    let subscribe = make_subscribe_command();
    let command = DataCommand::Subscribe(subscribe.clone());
    let encoded = encode_data_command(&command).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_SUBSCRIBE_COMMAND);
    assert!(encoded.index_keys.is_empty());

    let round_tripped: SubscribeCommand =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    match (round_tripped, subscribe) {
        (SubscribeCommand::Quotes(actual), SubscribeCommand::Quotes(wanted)) => {
            assert_eq!(actual.command_id, wanted.command_id);
            assert_eq!(actual.instrument_id, wanted.instrument_id);
        }
        other => panic!("expected SubscribeCommand::Quotes round trip, was {other:?}"),
    }
}
/// A `DataCommand::Unsubscribe` envelope must be stamped with
/// `PAYLOAD_TYPE_UNSUBSCRIBE_COMMAND`, emit no indices, and decode back to an
/// `UnsubscribeCommand::Quotes` with the same command and instrument ids.
#[rstest]
fn data_command_unsubscribe_envelope_stamps_unsubscribe_command_payload_type() {
    let unsubscribe = make_unsubscribe_command();
    let command = DataCommand::Unsubscribe(unsubscribe.clone());
    let encoded = encode_data_command(&command).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_UNSUBSCRIBE_COMMAND);
    assert!(encoded.index_keys.is_empty());

    let round_tripped: UnsubscribeCommand =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    match (round_tripped, unsubscribe) {
        (UnsubscribeCommand::Quotes(actual), UnsubscribeCommand::Quotes(wanted)) => {
            assert_eq!(actual.command_id, wanted.command_id);
            assert_eq!(actual.instrument_id, wanted.instrument_id);
        }
        other => panic!("expected UnsubscribeCommand::Quotes round trip, was {other:?}"),
    }
}
/// Encoding a `DataCommand::Subscribe` through the default registry must
/// yield the `PAYLOAD_TYPE_SUBSCRIBE_COMMAND` tag and no index keys.
#[rstest]
fn data_command_registered_under_category_payload_type() {
    let registry = default_registry();
    let command = DataCommand::Subscribe(make_subscribe_command());

    let lookup = registry.encode(&command).expect("encode");
    let (tag, encoded) = lookup.expect("registered");

    assert_eq!(tag.as_str(), PAYLOAD_TYPE_SUBSCRIBE_COMMAND);
    assert!(encoded.index_keys.is_empty());
}
/// A `DataCommand::DefiRequest` envelope must be stamped with
/// `PAYLOAD_TYPE_DEFI_REQUEST_COMMAND`, emit no indices, and decode back to a
/// `DefiRequestCommand::PoolSnapshot` with matching metadata.
#[cfg(feature = "defi")]
#[rstest]
fn data_command_defi_request_envelope_stamps_defi_request_command_payload_type() {
    let request = make_defi_request_command();
    let command = DataCommand::DefiRequest(request.clone());
    let encoded = encode_data_command(&command).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_DEFI_REQUEST_COMMAND);
    assert!(encoded.index_keys.is_empty());

    let round_tripped: DefiRequestCommand =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    match (round_tripped, request) {
        (
            DefiRequestCommand::PoolSnapshot(actual),
            DefiRequestCommand::PoolSnapshot(wanted),
        ) => {
            assert_eq!(actual.request_id, wanted.request_id);
            assert_eq!(actual.instrument_id, wanted.instrument_id);
            assert_eq!(actual.client_id, wanted.client_id);
            assert_eq!(actual.ts_init, wanted.ts_init);
        }
    }
}
/// A `DataCommand::DefiSubscribe` envelope must be stamped with
/// `PAYLOAD_TYPE_DEFI_SUBSCRIBE_COMMAND`, emit no indices, and decode back to
/// a `DefiSubscribeCommand::Blocks` with matching metadata.
#[cfg(feature = "defi")]
#[rstest]
fn data_command_defi_subscribe_envelope_stamps_defi_subscribe_command_payload_type() {
    let subscribe = make_defi_subscribe_command();
    let command = DataCommand::DefiSubscribe(subscribe.clone());
    let encoded = encode_data_command(&command).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_DEFI_SUBSCRIBE_COMMAND);
    assert!(encoded.index_keys.is_empty());

    let round_tripped: DefiSubscribeCommand =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    match (round_tripped, subscribe) {
        (DefiSubscribeCommand::Blocks(actual), DefiSubscribeCommand::Blocks(wanted)) => {
            assert_eq!(actual.command_id, wanted.command_id);
            assert_eq!(actual.chain, wanted.chain);
            assert_eq!(actual.client_id, wanted.client_id);
            assert_eq!(actual.ts_init, wanted.ts_init);
        }
        other => panic!("expected DefiSubscribeCommand::Blocks round trip, was {other:?}"),
    }
}
/// A `DataCommand::DefiUnsubscribe` envelope must be stamped with
/// `PAYLOAD_TYPE_DEFI_UNSUBSCRIBE_COMMAND`, emit no indices, and decode back
/// to a `DefiUnsubscribeCommand::Blocks` with matching metadata.
#[cfg(feature = "defi")]
#[rstest]
fn data_command_defi_unsubscribe_envelope_stamps_defi_unsubscribe_command_payload_type() {
    let unsubscribe = make_defi_unsubscribe_command();
    let command = DataCommand::DefiUnsubscribe(unsubscribe.clone());
    let encoded = encode_data_command(&command).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_DEFI_UNSUBSCRIBE_COMMAND);
    assert!(encoded.index_keys.is_empty());

    let round_tripped: DefiUnsubscribeCommand =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    match (round_tripped, unsubscribe) {
        (DefiUnsubscribeCommand::Blocks(actual), DefiUnsubscribeCommand::Blocks(wanted)) => {
            assert_eq!(actual.command_id, wanted.command_id);
            assert_eq!(actual.chain, wanted.chain);
            assert_eq!(actual.client_id, wanted.client_id);
            assert_eq!(actual.ts_init, wanted.ts_init);
        }
        other => panic!("expected DefiUnsubscribeCommand::Blocks round trip, was {other:?}"),
    }
}
/// Client id shared by the data command and response fixtures.
fn client_id() -> ClientId {
    let raw = "BINANCE";
    ClientId::from(raw)
}
/// Venue shared by the data command and response fixtures.
fn venue() -> Venue {
    let raw = "BINANCE";
    Venue::from(raw)
}
/// Returns a freshly constructed `UUID4` on each call; fixtures use it
/// wherever a UUID argument is required.
fn correlation_id() -> UUID4 {
UUID4::new()
}
/// Bar type used by the bars-response fixture.
fn bar_type() -> BarType {
    let spec = "ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL";
    BarType::from(spec)
}
/// `DataType` named `"Bar"` with both optional constructor arguments unset.
fn data_type() -> DataType {
    let type_name = "Bar";
    DataType::new(type_name, None, None)
}
/// Builds a `RequestCommand::Quotes` fixture for the shared instrument and
/// client id, with a fresh UUID and every optional argument left as `None`.
fn make_request_command() -> RequestCommand {
    let client = Some(client_id());
    let request_uuid = correlation_id();
    let stamp = UnixNanos::from(197);

    let quotes = RequestQuotes::new(
        instrument_id(),
        None,
        None,
        None,
        client,
        request_uuid,
        stamp,
        None,
    );
    RequestCommand::Quotes(quotes)
}
/// Builds a `SubscribeCommand::Quotes` fixture for the shared instrument,
/// client id and venue.
fn make_subscribe_command() -> SubscribeCommand {
    // Two independently generated UUIDs: one passed bare, one wrapped in `Some`.
    let bare_uuid = correlation_id();
    let stamp = UnixNanos::from(198);
    let wrapped_uuid = Some(correlation_id());

    let quotes = SubscribeQuotes::new(
        instrument_id(),
        Some(client_id()),
        Some(venue()),
        bare_uuid,
        stamp,
        wrapped_uuid,
        None,
    );
    SubscribeCommand::Quotes(quotes)
}
/// Builds an `UnsubscribeCommand::Quotes` fixture for the shared instrument,
/// client id and venue.
fn make_unsubscribe_command() -> UnsubscribeCommand {
    // Two independently generated UUIDs: one passed bare, one wrapped in `Some`.
    let bare_uuid = correlation_id();
    let stamp = UnixNanos::from(199);
    let wrapped_uuid = Some(correlation_id());

    let quotes = UnsubscribeQuotes::new(
        instrument_id(),
        Some(client_id()),
        Some(venue()),
        bare_uuid,
        stamp,
        wrapped_uuid,
        None,
    );
    UnsubscribeCommand::Quotes(quotes)
}
/// Builds a `DefiRequestCommand::PoolSnapshot` fixture for the shared
/// instrument and client id with a fresh UUID.
#[cfg(feature = "defi")]
fn make_defi_request_command() -> DefiRequestCommand {
    let request_uuid = correlation_id();
    let stamp = UnixNanos::from(196);

    let snapshot = RequestPoolSnapshot::new(
        instrument_id(),
        Some(client_id()),
        request_uuid,
        stamp,
        None,
    );
    DefiRequestCommand::PoolSnapshot(snapshot)
}
/// Builds a `DefiSubscribeCommand::Blocks` fixture for `Blockchain::Ethereum`
/// with the shared client id and a fresh UUID.
#[cfg(feature = "defi")]
fn make_defi_subscribe_command() -> DefiSubscribeCommand {
    let command_uuid = correlation_id();
    let stamp = UnixNanos::from(195);

    let blocks = SubscribeBlocks::new(
        Blockchain::Ethereum,
        Some(client_id()),
        command_uuid,
        stamp,
        None,
    );
    DefiSubscribeCommand::Blocks(blocks)
}
/// Builds a `DefiUnsubscribeCommand::Blocks` fixture for
/// `Blockchain::Ethereum` with the shared client id and a fresh UUID.
#[cfg(feature = "defi")]
fn make_defi_unsubscribe_command() -> DefiUnsubscribeCommand {
    let command_uuid = correlation_id();
    let stamp = UnixNanos::from(194);

    let blocks = UnsubscribeBlocks::new(
        Blockchain::Ethereum,
        Some(client_id()),
        command_uuid,
        stamp,
        None,
    );
    DefiUnsubscribeCommand::Blocks(blocks)
}
/// Builds a `CustomDataResponse` fixture with a unit `()` payload, the shared
/// client id, venue and data type, and a fresh UUID.
fn make_custom_data_response() -> CustomDataResponse {
    let response_uuid = correlation_id();
    let stamp = UnixNanos::from(200);

    CustomDataResponse::new(
        response_uuid,
        client_id(),
        Some(venue()),
        data_type(),
        (),
        None,
        None,
        stamp,
        None,
    )
}
/// Builds an `InstrumentResponse` fixture carrying the ETHUSDT currency pair
/// wrapped as `InstrumentAny::CurrencyPair`.
fn make_instrument_response() -> InstrumentResponse {
    let response_uuid = correlation_id();
    let instrument = InstrumentAny::CurrencyPair(currency_pair_ethusdt());
    let stamp = UnixNanos::from(201);

    InstrumentResponse::new(
        response_uuid,
        client_id(),
        instrument_id(),
        instrument,
        None,
        None,
        stamp,
        None,
    )
}
/// Builds an `InstrumentsResponse` fixture for the shared venue containing a
/// single ETHUSDT currency-pair instrument.
fn make_instruments_response() -> InstrumentsResponse {
    let response_uuid = correlation_id();
    let instruments = vec![InstrumentAny::CurrencyPair(currency_pair_ethusdt())];
    let stamp = UnixNanos::from(202);

    InstrumentsResponse::new(
        response_uuid,
        client_id(),
        venue(),
        instruments,
        None,
        None,
        stamp,
        None,
    )
}
/// Builds a `BookResponse` fixture carrying a newly constructed `L2_MBP`
/// order book for the shared instrument.
fn make_book_response() -> BookResponse {
    let response_uuid = correlation_id();
    let book = OrderBook::new(instrument_id(), BookType::L2_MBP);
    let stamp = UnixNanos::from(203);

    BookResponse::new(
        response_uuid,
        client_id(),
        instrument_id(),
        book,
        None,
        None,
        stamp,
        None,
    )
}
/// Builds a `BookDeltasResponse` fixture for the shared instrument with an
/// empty data vector.
fn make_book_deltas_response() -> BookDeltasResponse {
    let response_uuid = correlation_id();
    let stamp = UnixNanos::from(204);

    BookDeltasResponse::new(
        response_uuid,
        client_id(),
        instrument_id(),
        Vec::new(),
        None,
        None,
        stamp,
        None,
    )
}
/// Builds a `BookDepthResponse` fixture holding one `stub_depth10()` entry
/// whose instrument id is overwritten with the shared instrument id.
fn make_book_depth_response() -> BookDepthResponse {
    let mut snapshot = stub_depth10();
    // Align the stub with the instrument id the response itself reports.
    snapshot.instrument_id = instrument_id();

    let response_uuid = correlation_id();
    let stamp = UnixNanos::from(205);

    BookDepthResponse::new(
        response_uuid,
        client_id(),
        instrument_id(),
        vec![snapshot],
        None,
        None,
        stamp,
        None,
    )
}
/// Builds a `QuotesResponse` fixture for the shared instrument with an empty
/// data vector.
fn make_quotes_response() -> QuotesResponse {
    let response_uuid = correlation_id();
    let stamp = UnixNanos::from(206);

    QuotesResponse::new(
        response_uuid,
        client_id(),
        instrument_id(),
        Vec::new(),
        None,
        None,
        stamp,
        None,
    )
}
/// Builds a `TradesResponse` fixture for the shared instrument with an empty
/// data vector.
fn make_trades_response() -> TradesResponse {
    let response_uuid = correlation_id();
    let stamp = UnixNanos::from(207);

    TradesResponse::new(
        response_uuid,
        client_id(),
        instrument_id(),
        Vec::new(),
        None,
        None,
        stamp,
        None,
    )
}
/// Builds a `FundingRatesResponse` fixture for the shared instrument with an
/// empty data vector.
fn make_funding_rates_response() -> FundingRatesResponse {
    let response_uuid = correlation_id();
    let stamp = UnixNanos::from(208);

    FundingRatesResponse::new(
        response_uuid,
        client_id(),
        instrument_id(),
        Vec::new(),
        None,
        None,
        stamp,
        None,
    )
}
/// Builds a `ForwardPricesResponse` fixture for the shared venue with an
/// empty data vector.
fn make_forward_prices_response() -> ForwardPricesResponse {
    let response_uuid = correlation_id();
    let stamp = UnixNanos::from(209);

    ForwardPricesResponse::new(
        response_uuid,
        client_id(),
        venue(),
        Vec::new(),
        stamp,
        None,
    )
}
/// Builds a `BarsResponse` fixture for the shared bar type with an empty
/// `Vec<Bar>`.
fn make_bars_response() -> BarsResponse {
    let response_uuid = correlation_id();
    let no_bars = Vec::<Bar>::new();
    let stamp = UnixNanos::from(210);

    BarsResponse::new(
        response_uuid,
        client_id(),
        bar_type(),
        no_bars,
        None,
        None,
        stamp,
        None,
    )
}
/// A `DataResponse::Data` envelope must be stamped with
/// `PAYLOAD_TYPE_CUSTOM_DATA_RESPONSE`, emit no indices, and produce a
/// non-empty payload.
#[rstest]
fn data_response_custom_data_envelope_stamps_inner_tag_with_no_indices() {
    let wrapped = DataResponse::Data(make_custom_data_response());
    let encoded = encode_data_response(&wrapped).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_CUSTOM_DATA_RESPONSE);
    assert!(encoded.index_keys.is_empty());
    assert!(!encoded.payload.is_empty());
}
/// The encoded custom-data payload must decode into a structure exposing the
/// same correlation id, client id, venue and `ts_init` as the source response.
#[rstest]
fn data_response_custom_data_payload_round_trips_metadata() {
    // Decode target limited to the metadata fields this test compares.
    #[derive(serde::Deserialize)]
    struct CustomDataResponseOwned {
        correlation_id: UUID4,
        client_id: ClientId,
        venue: Option<Venue>,
        ts_init: UnixNanos,
    }

    let source = make_custom_data_response();
    let wrapped = DataResponse::Data(source.clone());
    let bytes = encode_data_response(&wrapped).expect("encode").payload;

    let round_tripped: CustomDataResponseOwned =
        rmp_serde::from_slice(&bytes).expect("decode");

    assert_eq!(round_tripped.correlation_id, source.correlation_id);
    assert_eq!(round_tripped.client_id, source.client_id);
    assert_eq!(round_tripped.venue, source.venue);
    assert_eq!(round_tripped.ts_init, source.ts_init);
}
/// A `DataResponse::Book` envelope must be stamped with
/// `PAYLOAD_TYPE_BOOK_RESPONSE`, emit no indices, and produce a non-empty
/// payload.
#[rstest]
fn data_response_book_envelope_stamps_inner_tag_with_no_indices() {
    let wrapped = DataResponse::Book(make_book_response());
    let encoded = encode_data_response(&wrapped).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_BOOK_RESPONSE);
    assert!(encoded.index_keys.is_empty());
    assert!(!encoded.payload.is_empty());
}
/// The encoded book payload must decode into a structure exposing the same
/// correlation id and instrument id as the source response.
#[rstest]
fn data_response_book_payload_round_trips_metadata() {
    // Decode target limited to the metadata fields this test compares.
    #[derive(serde::Deserialize)]
    struct BookResponseOwned {
        correlation_id: UUID4,
        instrument_id: InstrumentId,
    }

    let source = make_book_response();
    let wrapped = DataResponse::Book(source.clone());
    let bytes = encode_data_response(&wrapped).expect("encode").payload;

    let round_tripped: BookResponseOwned = rmp_serde::from_slice(&bytes).expect("decode");

    assert_eq!(round_tripped.correlation_id, source.correlation_id);
    assert_eq!(round_tripped.instrument_id, source.instrument_id);
}
/// A `DataResponse::BookDepth` envelope must be stamped with
/// `PAYLOAD_TYPE_BOOK_DEPTH_RESPONSE`, emit no indices, and decode back with
/// the same correlation id, instrument id and data.
#[rstest]
fn data_response_book_depth_payload_round_trips() {
    let source = make_book_depth_response();
    let wrapped = DataResponse::BookDepth(source.clone());
    let encoded = encode_data_response(&wrapped).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_BOOK_DEPTH_RESPONSE);
    assert!(encoded.index_keys.is_empty());

    let round_tripped: BookDepthResponse =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    assert_eq!(round_tripped.correlation_id, source.correlation_id);
    assert_eq!(round_tripped.instrument_id, source.instrument_id);
    assert_eq!(round_tripped.data, source.data);
}
/// A `DataResponse::Quotes` envelope must be stamped with
/// `PAYLOAD_TYPE_QUOTES_RESPONSE`, emit no indices, and decode back with the
/// same correlation id, instrument id and data.
#[rstest]
fn data_response_quotes_payload_round_trips() {
    let source = make_quotes_response();
    let wrapped = DataResponse::Quotes(source.clone());
    let encoded = encode_data_response(&wrapped).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_QUOTES_RESPONSE);
    assert!(encoded.index_keys.is_empty());

    let round_tripped: QuotesResponse =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    assert_eq!(round_tripped.correlation_id, source.correlation_id);
    assert_eq!(round_tripped.instrument_id, source.instrument_id);
    assert_eq!(round_tripped.data, source.data);
}
/// A `DataResponse::Trades` envelope must be stamped with
/// `PAYLOAD_TYPE_TRADES_RESPONSE` and decode back with the same correlation id.
#[rstest]
fn data_response_trades_payload_round_trips() {
    let source = make_trades_response();
    let wrapped = DataResponse::Trades(source.clone());
    let encoded = encode_data_response(&wrapped).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_TRADES_RESPONSE);

    let round_tripped: TradesResponse =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    assert_eq!(round_tripped.correlation_id, source.correlation_id);
}
/// A `DataResponse::Bars` envelope must be stamped with
/// `PAYLOAD_TYPE_BARS_RESPONSE` and decode back with the same correlation id
/// and bar type.
#[rstest]
fn data_response_bars_payload_round_trips() {
    let source = make_bars_response();
    let wrapped = DataResponse::Bars(source.clone());
    let encoded = encode_data_response(&wrapped).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_BARS_RESPONSE);

    let round_tripped: BarsResponse = rmp_serde::from_slice(&encoded.payload).expect("decode");
    assert_eq!(round_tripped.correlation_id, source.correlation_id);
    assert_eq!(round_tripped.bar_type, source.bar_type);
}
/// A boxed `DataResponse::Instrument` envelope must be stamped with
/// `PAYLOAD_TYPE_INSTRUMENT_RESPONSE` and decode back with the same
/// correlation id and instrument id.
#[rstest]
fn data_response_instrument_payload_round_trips() {
    let source = make_instrument_response();
    let wrapped = DataResponse::Instrument(Box::new(source.clone()));
    let encoded = encode_data_response(&wrapped).expect("encode");

    let stamped = encoded.payload_type.expect("override");
    assert_eq!(stamped.as_str(), PAYLOAD_TYPE_INSTRUMENT_RESPONSE);

    let round_tripped: InstrumentResponse =
        rmp_serde::from_slice(&encoded.payload).expect("decode");
    assert_eq!(round_tripped.correlation_id, source.correlation_id);
    assert_eq!(round_tripped.instrument_id, source.instrument_id);
}
/// For each listed `DataResponse` variant, the encoder must override the
/// payload type with the inner response's tag, never leave the wrapper tag in
/// place, and emit no index keys.
#[rstest]
#[case::data(
    DataResponse::Data(make_custom_data_response()),
    PAYLOAD_TYPE_CUSTOM_DATA_RESPONSE
)]
#[case::instrument(
    DataResponse::Instrument(Box::new(make_instrument_response())),
    PAYLOAD_TYPE_INSTRUMENT_RESPONSE
)]
#[case::instruments(
    DataResponse::Instruments(make_instruments_response()),
    PAYLOAD_TYPE_INSTRUMENTS_RESPONSE
)]
#[case::book(DataResponse::Book(make_book_response()), PAYLOAD_TYPE_BOOK_RESPONSE)]
#[case::book_deltas(
    DataResponse::BookDeltas(make_book_deltas_response()),
    PAYLOAD_TYPE_BOOK_DELTAS_RESPONSE
)]
#[case::book_depth(
    DataResponse::BookDepth(make_book_depth_response()),
    PAYLOAD_TYPE_BOOK_DEPTH_RESPONSE
)]
#[case::quotes(
    DataResponse::Quotes(make_quotes_response()),
    PAYLOAD_TYPE_QUOTES_RESPONSE
)]
#[case::trades(
    DataResponse::Trades(make_trades_response()),
    PAYLOAD_TYPE_TRADES_RESPONSE
)]
#[case::funding_rates(
    DataResponse::FundingRates(make_funding_rates_response()),
    PAYLOAD_TYPE_FUNDING_RATES_RESPONSE
)]
#[case::forward_prices(
    DataResponse::ForwardPrices(make_forward_prices_response()),
    PAYLOAD_TYPE_FORWARD_PRICES_RESPONSE
)]
#[case::bars(DataResponse::Bars(make_bars_response()), PAYLOAD_TYPE_BARS_RESPONSE)]
fn data_response_envelope_stamps_inner_tag_for_every_variant(
    #[case] response: DataResponse,
    #[case] expected_tag: &str,
) {
    let encoded = encode_data_response(&response).expect("encode");
    let stamped = encoded.payload_type.expect("override");
    let tag = stamped.as_str();

    assert_eq!(tag, expected_tag);
    assert_ne!(
        tag, PAYLOAD_TYPE_DATA_RESPONSE,
        "wrapper fallback tag must never reach the writer",
    );
    assert!(
        encoded.index_keys.is_empty(),
        "DataResponse correlation_id has no matching IndexKind today",
    );
}
/// Encoding a `DataResponse::Quotes` through the default registry must yield
/// the `PAYLOAD_TYPE_QUOTES_RESPONSE` tag and no index keys.
#[rstest]
fn data_response_registered_under_canonical_payload_type() {
    let registry = default_registry();
    let wrapped = DataResponse::Quotes(make_quotes_response());

    let lookup = registry.encode(&wrapped).expect("encode");
    let (tag, encoded) = lookup.expect("registered");

    assert_eq!(tag.as_str(), PAYLOAD_TYPE_QUOTES_RESPONSE);
    assert!(encoded.index_keys.is_empty());
}
/// Headers extracted for a `DataResponse::Quotes` must expose the response's
/// correlation id and no causation id.
#[rstest]
fn data_response_headers_extractor_surfaces_correlation_id() {
    let registry = default_registry();
    let source = make_quotes_response();
    let wanted = source.correlation_id;
    let wrapped = DataResponse::Quotes(source);

    let erased: &dyn std::any::Any = &wrapped;
    let headers = registry.headers_for_any(erased).expect("registered");

    assert_eq!(headers.correlation_id, Some(wanted));
    assert_eq!(headers.causation_id, None);
}
/// Headers extracted for a `DataCommand::Request` must report the request's
/// `request_id` as the correlation id.
#[rstest]
fn data_command_request_headers_use_request_id_as_correlation() {
    let registry = default_registry();
    let quotes = make_quotes_request();
    let wanted = quotes.request_id;
    let wrapped = DataCommand::Request(RequestCommand::Quotes(quotes));

    let erased: &dyn std::any::Any = &wrapped;
    let headers = registry.headers_for_any(erased).expect("registered");

    assert_eq!(headers.correlation_id, Some(wanted));
}
/// Headers extracted for a `DataCommand::Subscribe` must expose the value
/// returned by the subscribe command's `correlation_id()`.
#[rstest]
fn data_command_subscribe_headers_surface_correlation_id() {
    let registry = default_registry();
    let command = make_subscribe_command();
    let wanted = command.correlation_id();
    let wrapped = DataCommand::Subscribe(command);

    let erased: &dyn std::any::Any = &wrapped;
    let headers = registry.headers_for_any(erased).expect("registered");

    assert_eq!(headers.correlation_id, wanted);
}
/// Headers extracted for a `DataCommand::Unsubscribe` must expose the value
/// returned by the unsubscribe command's `correlation_id()`.
#[rstest]
fn data_command_unsubscribe_headers_surface_correlation_id() {
    let registry = default_registry();
    let command = make_unsubscribe_command();
    let wanted = command.correlation_id();
    let wrapped = DataCommand::Unsubscribe(command);

    let erased: &dyn std::any::Any = &wrapped;
    let headers = registry.headers_for_any(erased).expect("registered");

    assert_eq!(headers.correlation_id, wanted);
}
/// For each listed `TradingCommand` variant, the extracted headers must
/// expose both the correlation id and the causation id set by the builder.
#[rstest]
#[case::submit_order(trading_command_submit_order)]
#[case::submit_order_list(trading_command_submit_order_list)]
#[case::modify_order(trading_command_modify_order)]
#[case::cancel_order(trading_command_cancel_order)]
#[case::cancel_all_orders(trading_command_cancel_all_orders)]
#[case::batch_cancel_orders(trading_command_batch_cancel_orders)]
#[case::query_order(trading_command_query_order)]
#[case::query_account(trading_command_query_account)]
fn trading_command_extractor_surfaces_both_headers(
    #[case] builder: fn() -> (TradingCommand, UUID4, UUID4),
) {
    let (command, wanted_correlation, wanted_causation) = builder();
    let registry = default_registry();

    let erased: &dyn std::any::Any = &command;
    let headers = registry.headers_for_any(erased).expect("registered");

    assert_eq!(headers.correlation_id, Some(wanted_correlation));
    assert_eq!(headers.causation_id, Some(wanted_causation));
}
/// Returns a `TradingCommand::SubmitOrder` stamped with fresh correlation and
/// causation ids, together with those two ids.
fn trading_command_submit_order() -> (TradingCommand, UUID4, UUID4) {
    let (correlation, causation) = (UUID4::new(), UUID4::new());
    let mut command = make_submit_order();
    command.correlation_id = Some(correlation);
    command.causation_id = Some(causation);
    (TradingCommand::SubmitOrder(command), correlation, causation)
}
/// Returns a `TradingCommand::SubmitOrderList` (one client order id) stamped
/// with fresh correlation and causation ids, together with those two ids.
fn trading_command_submit_order_list() -> (TradingCommand, UUID4, UUID4) {
    let (correlation, causation) = (UUID4::new(), UUID4::new());
    let mut command = make_submit_order_list(vec![client_order_id()]);
    command.correlation_id = Some(correlation);
    command.causation_id = Some(causation);
    (TradingCommand::SubmitOrderList(command), correlation, causation)
}
/// Returns a `TradingCommand::ModifyOrder` (with a venue order id) stamped
/// with fresh correlation and causation ids, together with those two ids.
fn trading_command_modify_order() -> (TradingCommand, UUID4, UUID4) {
    let (correlation, causation) = (UUID4::new(), UUID4::new());
    let mut command = make_modify_order(Some(venue_order_id()));
    command.correlation_id = Some(correlation);
    command.causation_id = Some(causation);
    (TradingCommand::ModifyOrder(command), correlation, causation)
}
/// Returns a `TradingCommand::CancelOrder` stamped with fresh correlation and
/// causation ids, together with those two ids.
fn trading_command_cancel_order() -> (TradingCommand, UUID4, UUID4) {
    let (correlation, causation) = (UUID4::new(), UUID4::new());
    let mut command = make_cancel_order();
    command.correlation_id = Some(correlation);
    command.causation_id = Some(causation);
    (TradingCommand::CancelOrder(command), correlation, causation)
}
/// Returns a `TradingCommand::CancelAllOrders` stamped with fresh correlation
/// and causation ids, together with those two ids.
fn trading_command_cancel_all_orders() -> (TradingCommand, UUID4, UUID4) {
    let (correlation, causation) = (UUID4::new(), UUID4::new());
    let mut command = make_cancel_all_orders();
    command.correlation_id = Some(correlation);
    command.causation_id = Some(causation);
    (TradingCommand::CancelAllOrders(command), correlation, causation)
}
/// Returns a `TradingCommand::BatchCancelOrders` (wrapping one cancel order)
/// stamped with fresh correlation and causation ids, together with those ids.
fn trading_command_batch_cancel_orders() -> (TradingCommand, UUID4, UUID4) {
    let (correlation, causation) = (UUID4::new(), UUID4::new());
    let mut command = make_batch_cancel_orders(vec![make_cancel_order()]);
    command.correlation_id = Some(correlation);
    command.causation_id = Some(causation);
    (TradingCommand::BatchCancelOrders(command), correlation, causation)
}
/// Returns a `TradingCommand::QueryOrder` (with a venue order id) stamped
/// with fresh correlation and causation ids, together with those two ids.
fn trading_command_query_order() -> (TradingCommand, UUID4, UUID4) {
    let (correlation, causation) = (UUID4::new(), UUID4::new());
    let mut command = make_query_order(Some(venue_order_id()));
    command.correlation_id = Some(correlation);
    command.causation_id = Some(causation);
    (TradingCommand::QueryOrder(command), correlation, causation)
}
/// Returns a `TradingCommand::QueryAccount` stamped with fresh correlation
/// and causation ids, together with those two ids.
fn trading_command_query_account() -> (TradingCommand, UUID4, UUID4) {
    let (correlation, causation) = (UUID4::new(), UUID4::new());
    let mut command = make_query_account();
    command.correlation_id = Some(correlation);
    command.causation_id = Some(causation);
    (TradingCommand::QueryAccount(command), correlation, causation)
}
/// For each listed `DataResponse` variant, the extracted headers must expose
/// the response's correlation id and no causation id.
#[rstest]
#[case::data(data_response_data())]
#[case::instrument(data_response_instrument())]
#[case::instruments(data_response_instruments())]
#[case::book(data_response_book())]
#[case::book_deltas(data_response_book_deltas())]
#[case::book_depth(data_response_book_depth())]
#[case::quotes(data_response_quotes())]
#[case::trades(data_response_trades())]
#[case::funding_rates(data_response_funding_rates())]
#[case::forward_prices(data_response_forward_prices())]
#[case::bars(data_response_bars())]
fn data_response_extractor_surfaces_correlation_id_for_every_variant(
    #[case] envelope_with_expected: (DataResponse, UUID4),
) {
    let (wrapped, wanted) = envelope_with_expected;
    let registry = default_registry();

    let erased: &dyn std::any::Any = &wrapped;
    let headers = registry.headers_for_any(erased).expect("registered");

    assert_eq!(headers.correlation_id, Some(wanted));
    assert_eq!(headers.causation_id, None);
}
/// Pairs a `DataResponse::Data` envelope with its inner correlation id.
fn data_response_data() -> (DataResponse, UUID4) {
    let inner = make_custom_data_response();
    let correlation = inner.correlation_id;
    let wrapped = DataResponse::Data(inner);
    (wrapped, correlation)
}
/// Pairs a boxed `DataResponse::Instrument` envelope with its inner
/// correlation id.
fn data_response_instrument() -> (DataResponse, UUID4) {
    let inner = make_instrument_response();
    let correlation = inner.correlation_id;
    let wrapped = DataResponse::Instrument(Box::new(inner));
    (wrapped, correlation)
}
/// Pairs a `DataResponse::Instruments` envelope with its inner correlation id.
fn data_response_instruments() -> (DataResponse, UUID4) {
    let inner = make_instruments_response();
    let correlation = inner.correlation_id;
    let wrapped = DataResponse::Instruments(inner);
    (wrapped, correlation)
}
/// Pairs a `DataResponse::Book` envelope with its inner correlation id.
fn data_response_book() -> (DataResponse, UUID4) {
    let inner = make_book_response();
    let correlation = inner.correlation_id;
    let wrapped = DataResponse::Book(inner);
    (wrapped, correlation)
}
/// Pairs a `DataResponse::BookDeltas` envelope with its inner correlation id.
fn data_response_book_deltas() -> (DataResponse, UUID4) {
    let inner = make_book_deltas_response();
    let correlation = inner.correlation_id;
    let wrapped = DataResponse::BookDeltas(inner);
    (wrapped, correlation)
}
/// Pairs a `DataResponse::BookDepth` envelope with its inner correlation id.
fn data_response_book_depth() -> (DataResponse, UUID4) {
    let inner = make_book_depth_response();
    let correlation = inner.correlation_id;
    let wrapped = DataResponse::BookDepth(inner);
    (wrapped, correlation)
}
/// Pairs a `DataResponse::Quotes` envelope with its inner correlation id.
fn data_response_quotes() -> (DataResponse, UUID4) {
    let inner = make_quotes_response();
    let correlation = inner.correlation_id;
    let wrapped = DataResponse::Quotes(inner);
    (wrapped, correlation)
}
/// Pairs a `DataResponse::Trades` envelope with its inner correlation id.
fn data_response_trades() -> (DataResponse, UUID4) {
    let inner = make_trades_response();
    let correlation = inner.correlation_id;
    let wrapped = DataResponse::Trades(inner);
    (wrapped, correlation)
}
/// Pairs a `DataResponse::FundingRates` envelope with its inner correlation id.
fn data_response_funding_rates() -> (DataResponse, UUID4) {
    let inner = make_funding_rates_response();
    let correlation = inner.correlation_id;
    let wrapped = DataResponse::FundingRates(inner);
    (wrapped, correlation)
}
/// Pairs a `DataResponse::ForwardPrices` envelope with its inner correlation
/// id.
fn data_response_forward_prices() -> (DataResponse, UUID4) {
    let inner = make_forward_prices_response();
    let correlation = inner.correlation_id;
    let wrapped = DataResponse::ForwardPrices(inner);
    (wrapped, correlation)
}
/// Pairs a `DataResponse::Bars` envelope with its inner correlation id.
fn data_response_bars() -> (DataResponse, UUID4) {
    let inner = make_bars_response();
    let correlation = inner.correlation_id;
    let wrapped = DataResponse::Bars(inner);
    (wrapped, correlation)
}
/// Builds a `RequestQuotes` for `EUR/USD.SIM` with a fresh request id, a
/// default `ts_init`, and every optional field left as `None`.
fn make_quotes_request() -> RequestQuotes {
    let instrument_id = InstrumentId::from("EUR/USD.SIM");
    let request_id = UUID4::new();
    let ts_init = UnixNanos::default();

    RequestQuotes {
        instrument_id,
        request_id,
        ts_init,
        // No range, limit, routing or extra parameters.
        start: None,
        end: None,
        limit: None,
        client_id: None,
        params: None,
    }
}
}