use crate::messages::{IncomingMessages, ResponseMessage, WARNING_CODE_RANGE};
/// Outcome of classifying an incoming message, as computed by [`determine_routing`].
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum RoutingDecision {
/// The message exposes a request id; the payload is that id.
ByRequestId(i32),
/// The message is one of the order-related types; the payload is its order id,
/// or `-1` when the message has none.
ByOrderId(i32),
/// Fallback for a message that has no request id and fits no other category;
/// the payload is its message type.
ByMessageType(IncomingMessages),
/// The message type is `ManagedAccounts`, `NextValidId` or `CurrentTime`.
SharedMessage(IncomingMessages),
/// The message type is `IncomingMessages::Error`; the payload holds the decoded
/// details (or the default [`DecodedError`] when nothing could be decoded).
Error(DecodedError),
/// The message type is `IncomingMessages::Shutdown`.
Shutdown,
}
/// Error details extracted from an incoming error message.
///
/// Values are produced by `decode_error_envelope`; the [`Default`] value is the
/// stand-in used when no details are available.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct DecodedError {
    /// Id of the request the error refers to, or `UNSPECIFIED_REQUEST_ID` when absent.
    pub request_id: i32,
    /// Numeric error code; `0` when absent.
    pub error_code: i32,
    /// Error text; empty when absent.
    pub error_message: String,
    /// Time attached to the error, if any.
    /// NOTE(review): unit/epoch is not established in this file — confirm against the sender.
    pub error_time: Option<i64>,
    /// Advanced order-reject payload; empty when absent.
    pub advanced_order_reject_json: String,
}

impl Default for DecodedError {
    /// Builds an error with no details: unspecified request id, code `0`,
    /// empty strings and no time.
    fn default() -> Self {
        let request_id = UNSPECIFIED_REQUEST_ID;
        let error_code = 0;
        let error_message = String::new();
        let error_time = None;
        let advanced_order_reject_json = String::new();

        DecodedError {
            request_id,
            error_code,
            error_message,
            error_time,
            advanced_order_reject_json,
        }
    }
}
/// Decodes `raw_bytes` as a `crate::proto::ErrorMessage` and converts it into a
/// [`DecodedError`].
///
/// Returns `None` when the bytes cannot be decoded. Optional fields missing from
/// the decoded message fall back to `UNSPECIFIED_REQUEST_ID` for the request id,
/// `0` for the error code and empty strings for the two text fields; `error_time`
/// is passed through unchanged.
pub(crate) fn decode_error_envelope(raw_bytes: &[u8]) -> Option<DecodedError> {
    let parsed: Result<crate::proto::ErrorMessage, _> = prost::Message::decode(raw_bytes);
    let proto_error = match parsed {
        Ok(proto_error) => proto_error,
        // The decode error itself is not surfaced; callers only see `None`.
        Err(_) => return None,
    };

    let request_id = proto_error.id.unwrap_or(UNSPECIFIED_REQUEST_ID);
    let error_code = proto_error.error_code.unwrap_or(0);
    let error_message = proto_error.error_msg.unwrap_or_default();
    let error_time = proto_error.error_time;
    let advanced_order_reject_json = proto_error.advanced_order_reject_json.unwrap_or_default();

    Some(DecodedError {
        request_id,
        error_code,
        error_message,
        error_time,
        advanced_order_reject_json,
    })
}
/// Reports whether `message_type` is one of the order-related message types
/// listed in the table below.
fn is_order_message(message_type: IncomingMessages) -> bool {
    // Every message type that is treated as order-related.
    const ORDER_MESSAGE_TYPES: [IncomingMessages; 8] = [
        IncomingMessages::OrderStatus,
        IncomingMessages::OpenOrder,
        IncomingMessages::OpenOrderEnd,
        IncomingMessages::CompletedOrder,
        IncomingMessages::CompletedOrdersEnd,
        IncomingMessages::ExecutionData,
        IncomingMessages::ExecutionDataEnd,
        IncomingMessages::CommissionsReport,
    ];

    ORDER_MESSAGE_TYPES.contains(&message_type)
}
/// Reports whether `message_type` is `ManagedAccounts`, `NextValidId` or
/// `CurrentTime` — the types that `determine_routing` classifies as shared.
fn is_shared_message(message_type: IncomingMessages) -> bool {
    message_type == IncomingMessages::ManagedAccounts
        || message_type == IncomingMessages::NextValidId
        || message_type == IncomingMessages::CurrentTime
}
pub(crate) fn determine_routing(message: &ResponseMessage) -> RoutingDecision {
let message_type = message.message_type();
if message_type == IncomingMessages::Shutdown {
return RoutingDecision::Shutdown;
}
if message_type == IncomingMessages::Error {
let decoded = message.raw_bytes().and_then(decode_error_envelope).unwrap_or_default();
return RoutingDecision::Error(decoded);
}
if is_order_message(message_type) {
return RoutingDecision::ByOrderId(message.order_id().unwrap_or(-1));
}
if is_shared_message(message_type) {
return RoutingDecision::SharedMessage(message_type);
}
if let Some(request_id) = message.request_id() {
return RoutingDecision::ByRequestId(request_id);
}
RoutingDecision::ByMessageType(message_type)
}
/// Routing strategy for an order-related message type, as selected by
/// [`order_routing_strategy`].
///
/// NOTE(review): how each strategy is acted on is decided by callers outside this
/// file; the variant docs below only state which message types select them.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum OrderRoutingStrategy {
/// Selected for `ExecutionData`.
ExecutionData,
/// Selected for `ExecutionDataEnd`.
ExecutionDataEnd,
/// Selected for `OpenOrder` and `OrderStatus`.
OrderOrShared,
/// Selected for `CommissionsReport`.
ByExecutionId,
/// Selected for `CompletedOrder`, `OpenOrderEnd` and `CompletedOrdersEnd`.
SharedOnly,
/// Selected for every other message type.
ByOrderId,
}
/// Maps `message_type` to the [`OrderRoutingStrategy`] used for it.
///
/// Message types without a dedicated arm fall back to
/// [`OrderRoutingStrategy::ByOrderId`].
pub(crate) fn order_routing_strategy(message_type: IncomingMessages) -> OrderRoutingStrategy {
    match message_type {
        IncomingMessages::OrderStatus
        | IncomingMessages::OpenOrder => OrderRoutingStrategy::OrderOrShared,

        IncomingMessages::OpenOrderEnd
        | IncomingMessages::CompletedOrder
        | IncomingMessages::CompletedOrdersEnd => OrderRoutingStrategy::SharedOnly,

        IncomingMessages::CommissionsReport => OrderRoutingStrategy::ByExecutionId,

        IncomingMessages::ExecutionData => OrderRoutingStrategy::ExecutionData,
        IncomingMessages::ExecutionDataEnd => OrderRoutingStrategy::ExecutionDataEnd,

        // Everything else is routed by order id.
        _ => OrderRoutingStrategy::ByOrderId,
    }
}
/// Reports whether `error_code` is contained in `WARNING_CODE_RANGE`.
pub(crate) fn is_warning_error(error_code: i32) -> bool {
    let warning_codes = &WARNING_CODE_RANGE;
    warning_codes.contains(&error_code)
}
/// Request id used when an error carries no request id of its own; it is the
/// fallback in both `DecodedError::default` and `decode_error_envelope`.
pub(crate) const UNSPECIFIED_REQUEST_ID: i32 = -1;
// Unit tests for this module are kept in `routing_tests.rs` (wired in via `#[path]`)
// and are only compiled for test builds.
#[cfg(test)]
#[path = "routing_tests.rs"]
mod tests;