use super::*;
use crate::common::test_utils::helpers::{proto_error_response, proto_response};
use crate::messages::{HANDSHAKE_DECODE_FAILURE_CODE, HANDSHAKE_UNKNOWN_FRAME_CODE};
use std::sync::{Arc, Mutex};
use time::macros::datetime;
use time_tz::{timezones, OffsetResult, PrimitiveDateTimeExt, TimeZone};
const TEST_SERVER_VERSION: i32 = server_versions::PROTOBUF_REST_MESSAGES_3;
#[derive(Default)]
struct DiscardingSink;
impl NoticeSink for DiscardingSink {
fn deliver(&self, _: Notice) {}
}
#[derive(Default)]
struct CapturingSink {
notices: Mutex<Vec<Notice>>,
}
impl CapturingSink {
fn last(&self) -> Option<Notice> {
self.notices.lock().unwrap().last().cloned()
}
fn count(&self) -> usize {
self.notices.lock().unwrap().len()
}
}
impl NoticeSink for CapturingSink {
fn deliver(&self, n: Notice) {
self.notices.lock().unwrap().push(n);
}
}
fn empty_ctx<'a>() -> StartupHandshakeContext<'a> {
static SINK: DiscardingSink = DiscardingSink;
StartupHandshakeContext {
startup: None,
notice_sink: &SINK,
}
}
fn startup_ctx<'a>(cb: &'a (dyn Fn(StartupMessage) + Send + Sync)) -> StartupHandshakeContext<'a> {
static SINK: DiscardingSink = DiscardingSink;
StartupHandshakeContext {
startup: Some(cb),
notice_sink: &SINK,
}
}
fn notice_sink_ctx(sink: &CapturingSink) -> StartupHandshakeContext<'_> {
StartupHandshakeContext {
startup: None,
notice_sink: sink,
}
}
fn full_ctx<'a>(cb: &'a (dyn Fn(StartupMessage) + Send + Sync), sink: &'a CapturingSink) -> StartupHandshakeContext<'a> {
StartupHandshakeContext {
startup: Some(cb),
notice_sink: sink,
}
}
#[test]
fn test_dispatch_unsolicited_open_order_decode_failure_emits_notice() {
let mut message = ResponseMessage::from("5\0123\0AAPL\0STK\0");
let cb_fired = Arc::new(Mutex::new(false));
let cb_fired_clone = cb_fired.clone();
let cb = move |_msg: StartupMessage| *cb_fired_clone.lock().unwrap() = true;
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &full_ctx(&cb, &sink));
assert!(!*cb_fired.lock().unwrap(), "callback must not fire on decode failure");
let notice = sink.last().expect("notice sink should receive decode-failure notice");
assert_eq!(notice.code, HANDSHAKE_DECODE_FAILURE_CODE);
assert!(
notice.message.contains("OpenOrder"),
"notice message should name the kind: {}",
notice.message
);
}
#[test]
fn test_dispatch_unsolicited_order_status_decode_failure_emits_notice() {
let mut message = ResponseMessage::from("3\0456\0Filled\0100\0");
let cb_fired = Arc::new(Mutex::new(false));
let cb_fired_clone = cb_fired.clone();
let cb = move |_msg: StartupMessage| *cb_fired_clone.lock().unwrap() = true;
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &full_ctx(&cb, &sink));
assert!(!*cb_fired.lock().unwrap(), "callback must not fire on decode failure");
let notice = sink.last().expect("notice sink should receive decode-failure notice");
assert_eq!(notice.code, HANDSHAKE_DECODE_FAILURE_CODE);
assert!(notice.message.contains("OrderStatus"), "notice should name the kind: {}", notice.message);
}
#[test]
fn test_dispatch_unsolicited_account_value_typed() {
use crate::messages::IncomingMessages;
use crate::testdata::builders::accounts::account_value;
use crate::testdata::builders::ResponseProtoEncoder;
let mut message = proto_response(
IncomingMessages::AccountValue,
account_value()
.key("NetLiquidation")
.value("123456.78")
.currency("USD")
.account("DU1234567")
.encode_proto(),
);
let captured: Arc<Mutex<Option<crate::accounts::AccountValue>>> = Arc::new(Mutex::new(None));
let captured_clone = captured.clone();
let cb = move |msg: StartupMessage| {
if let StartupMessage::AccountUpdate(crate::accounts::AccountUpdate::AccountValue(av)) = msg {
*captured.lock().unwrap() = Some(av);
} else {
panic!("expected AccountUpdate::AccountValue, got {msg:?}");
}
};
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &startup_ctx(&cb));
let got = captured_clone.lock().unwrap().take().expect("callback didn't fire");
assert_eq!(got.key, "NetLiquidation");
assert_eq!(got.value, "123456.78");
assert_eq!(got.currency, "USD");
assert_eq!(got.account.as_deref(), Some("DU1234567"));
}
#[test]
fn test_dispatch_unsolicited_open_order_end_typed() {
let mut message = ResponseMessage::from("53\01\0");
let captured: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let captured_clone = captured.clone();
let cb = move |msg: StartupMessage| {
if matches!(msg, StartupMessage::OpenOrderEnd) {
*captured.lock().unwrap() = true;
} else {
panic!("expected OpenOrderEnd, got {msg:?}");
}
};
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &startup_ctx(&cb));
assert!(*captured_clone.lock().unwrap(), "OpenOrderEnd not delivered as typed variant");
}
#[test]
fn test_dispatch_unsolicited_account_download_end_typed() {
let mut message = ResponseMessage::from("54\01\0DU1234567\0");
let captured: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let captured_clone = captured.clone();
let cb = move |msg: StartupMessage| {
if matches!(msg, StartupMessage::AccountUpdate(crate::accounts::AccountUpdate::End)) {
*captured.lock().unwrap() = true;
}
};
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &startup_ctx(&cb));
assert!(*captured_clone.lock().unwrap(), "End variant not delivered");
}
#[test]
fn test_dispatch_unsolicited_unknown_emits_notice() {
let mut message = ResponseMessage::from("14\0\0");
let cb_fired = Arc::new(Mutex::new(false));
let cb_fired_clone = cb_fired.clone();
let cb = move |_msg: StartupMessage| *cb_fired_clone.lock().unwrap() = true;
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &full_ctx(&cb, &sink));
assert!(!*cb_fired.lock().unwrap(), "callback must not fire for unknown handshake frame");
let notice = sink.last().expect("notice sink should receive unknown-frame notice");
assert_eq!(notice.code, HANDSHAKE_UNKNOWN_FRAME_CODE);
assert!(
notice.message.contains("NewsBulletins"),
"notice should name the kind: {}",
notice.message
);
assert!(notice.is_handshake_synthetic());
}
#[test]
fn test_dispatch_unsolicited_notice_warning_invokes_notice_sink() {
let mut message = proto_error_response(-1, 2104, "Market data farm OK");
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, ¬ice_sink_ctx(&sink));
let got = sink.last().expect("notice sink didn't receive notice");
assert_eq!(got.code, 2104);
assert_eq!(got.message, "Market data farm OK");
}
#[test]
fn test_dispatch_unsolicited_notice_hard_error_invokes_notice_sink() {
let mut message = proto_error_response(-1, 504, "Not connected");
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, ¬ice_sink_ctx(&sink));
assert_eq!(sink.last().expect("sink missed").code, 504);
}
#[test]
fn test_dispatch_unsolicited_notice_only_fires_notice_sink() {
let mut message = proto_error_response(-1, 2104, "farm OK");
let startup_fired = Arc::new(Mutex::new(false));
let startup_fired_clone = startup_fired.clone();
let startup_cb = move |_msg: StartupMessage| {
*startup_fired_clone.lock().unwrap() = true;
};
let sink = CapturingSink::default();
dispatch_unsolicited_message(
TEST_SERVER_VERSION,
&mut message,
&StartupHandshakeContext {
startup: Some(&startup_cb),
notice_sink: &sink,
},
);
assert!(!*startup_fired.lock().unwrap(), "startup callback should not fire on Error");
assert_eq!(sink.count(), 1, "notice sink should receive exactly one notice on Error");
}
#[test]
fn test_parse_account_info_callback_not_invoked_for_next_valid_id() {
use prost::Message;
let handler = ConnectionHandler::default();
let bytes = crate::proto::NextValidId { order_id: Some(1000) }.encode_to_vec();
let mut message = proto_response(IncomingMessages::NextValidId, bytes);
let fired = Arc::new(Mutex::new(false));
let fired_clone = fired.clone();
let cb = move |_: StartupMessage| {
*fired_clone.lock().unwrap() = true;
};
let result = handler.parse_account_info(TEST_SERVER_VERSION, &mut message, &startup_ctx(&cb));
assert!(result.is_ok());
assert!(!*fired.lock().unwrap(), "callback should NOT be invoked for NextValidId");
}
#[test]
fn test_parse_account_info_callback_not_invoked_for_managed_accounts() {
use prost::Message;
let handler = ConnectionHandler::default();
let bytes = crate::proto::ManagedAccounts {
accounts_list: Some("DU123".to_string()),
}
.encode_to_vec();
let mut message = proto_response(IncomingMessages::ManagedAccounts, bytes);
let fired = Arc::new(Mutex::new(false));
let fired_clone = fired.clone();
let cb = move |_: StartupMessage| {
*fired_clone.lock().unwrap() = true;
};
let result = handler.parse_account_info(TEST_SERVER_VERSION, &mut message, &startup_ctx(&cb));
assert!(result.is_ok());
assert!(!*fired.lock().unwrap(), "callback should NOT be invoked for ManagedAccounts");
}
#[test]
fn test_parse_account_info_multiple_messages_callback() {
use prost::Message;
let handler = ConnectionHandler::default();
let count = Arc::new(Mutex::new(0));
let count_clone = count.clone();
let cb = move |_: StartupMessage| {
*count_clone.lock().unwrap() += 1;
};
let cbs = startup_ctx(&cb);
let mut msg1 = ResponseMessage::from("53\01\0");
handler.parse_account_info(TEST_SERVER_VERSION, &mut msg1, &cbs).unwrap();
let mut msg2 = ResponseMessage::from("102\0");
handler.parse_account_info(TEST_SERVER_VERSION, &mut msg2, &cbs).unwrap();
let bytes = crate::proto::NextValidId { order_id: Some(1000) }.encode_to_vec();
let mut msg3 = proto_response(IncomingMessages::NextValidId, bytes);
handler.parse_account_info(TEST_SERVER_VERSION, &mut msg3, &cbs).unwrap();
assert_eq!(*count.lock().unwrap(), 2, "callback should be invoked exactly twice");
}
#[test]
fn test_parse_account_info_next_valid_id_rejects_text_framing() {
let handler = ConnectionHandler::default();
let mut message = ResponseMessage::from("9\01\01000\0");
let err = handler
.parse_account_info(TEST_SERVER_VERSION, &mut message, &empty_ctx())
.expect_err("text-framed NextValidId must be rejected");
assert!(matches!(err, Error::UnexpectedResponse(_)), "got {err:?}");
}
#[test]
fn test_parse_account_info_managed_accounts_rejects_text_framing() {
let handler = ConnectionHandler::default();
let mut message = ResponseMessage::from("15\01\0DU123,DU456\0");
let err = handler
.parse_account_info(TEST_SERVER_VERSION, &mut message, &empty_ctx())
.expect_err("text-framed ManagedAccounts must be rejected");
assert!(matches!(err, Error::UnexpectedResponse(_)), "got {err:?}");
}
#[test]
fn test_require_protobuf_support_accepts_minimum() {
require_protobuf_support(server_versions::PROTOBUF_REST_MESSAGES_3).expect("floor version must be accepted");
}
#[test]
fn test_require_protobuf_support_accepts_newer() {
require_protobuf_support(server_versions::PROTOBUF_REST_MESSAGES_3 + 5).expect("newer versions must be accepted");
}
#[test]
fn test_require_protobuf_support_rejects_older() {
let actual = server_versions::PROTOBUF_REST_MESSAGES_3 - 1;
let err = require_protobuf_support(actual).expect_err("older versions must be rejected");
match &err {
Error::ServerVersion(required, got, msg) => {
assert_eq!(*required, server_versions::PROTOBUF_REST_MESSAGES_3);
assert_eq!(*got, actual);
assert!(msg.contains("protobuf"), "message should mention protobuf: {msg}");
assert!(msg.contains("upgrade"), "message should tell user to upgrade: {msg}");
}
other => panic!("expected Error::ServerVersion, got {other:?}"),
}
let rendered = err.to_string();
let expected_required = format!("server version {} required", server_versions::PROTOBUF_REST_MESSAGES_3);
assert!(rendered.contains(&expected_required), "rendered: {rendered}");
assert!(rendered.contains(&actual.to_string()), "rendered: {rendered}");
}
#[test]
fn test_require_protobuf_support_rejects_previous_scan_data_floor() {
let previous_floor = server_versions::PROTOBUF_SCAN_DATA;
let err = require_protobuf_support(previous_floor).expect_err("previous floor must now be rejected");
match err {
Error::ServerVersion(required, got, _) => {
assert_eq!(required, server_versions::PROTOBUF_REST_MESSAGES_3);
assert_eq!(got, previous_floor);
}
other => panic!("expected Error::ServerVersion, got {other:?}"),
}
}
#[test]
fn test_parse_connection_time() {
let example = "20230405 22:20:39 PST";
let (connection_time, _) = parse_connection_time(example).unwrap();
let la = timezones::db::america::LOS_ANGELES;
if let OffsetResult::Some(other) = datetime!(2023-04-05 22:20:39).assume_timezone(la) {
assert_eq!(connection_time, Some(other));
}
}
#[test]
fn test_parse_connection_time_china_standard_time() {
let example = "20230405 22:20:39 China Standard Time";
let (connection_time, timezone) = parse_connection_time(example).unwrap();
assert!(connection_time.is_some());
assert!(timezone.is_some());
assert_eq!(timezone.unwrap().name(), "Asia/Shanghai");
}
#[test]
fn test_parse_connection_time_chinese_utf8() {
let example = "20230405 22:20:39 中国标准时间";
let (connection_time, timezone) = parse_connection_time(example).unwrap();
assert!(connection_time.is_some());
assert!(timezone.is_some());
assert_eq!(timezone.unwrap().name(), "Asia/Shanghai");
}
#[test]
fn test_parse_connection_time_mojibake() {
let example = "20230405 22:20:39 \u{FFFD}\u{FFFD}\u{FFFD}";
let (connection_time, timezone) = parse_connection_time(example).unwrap();
assert!(connection_time.is_some());
assert!(timezone.is_some());
assert_eq!(timezone.unwrap().name(), "Asia/Shanghai");
}
#[test]
fn test_parse_connection_time_unknown_timezone_errors() {
let example = "20230405 22:20:39 Bogus Standard Time";
let err = parse_connection_time(example).expect_err("unknown tz must error");
assert!(matches!(err, Error::UnsupportedTimeZone(ref name) if name == "Bogus Standard Time"));
let rendered = err.to_string();
assert!(rendered.contains("Bogus Standard Time"), "missing tz name: {rendered}");
assert!(
rendered.contains("register_timezone_alias"),
"missing programmatic-fix pointer: {rendered}"
);
assert!(rendered.contains("IBAPI_TIMEZONE_ALIASES"), "missing env-var pointer: {rendered}");
assert!(
rendered.contains("github.com/wboayue/rust-ibapi"),
"missing issue-tracker pointer: {rendered}"
);
}
#[test]
fn test_parse_connection_time_short_input_still_ok() {
let (time, tz) = parse_connection_time("20230405").unwrap();
assert!(time.is_none());
assert!(tz.is_none());
}
#[test]
fn test_parse_connection_time_unparseable_date_still_ok() {
let (time, tz) = parse_connection_time("BADDATE 99:99:99 PST").unwrap();
assert!(time.is_none());
assert!(tz.is_some());
}
#[test]
fn test_connection_handler_handshake() {
let handler = ConnectionHandler::default();
let handshake = handler.format_handshake();
assert_eq!(&handshake[0..4], b"API\0");
let version_part = &handshake[4..];
assert!(!version_part.is_empty());
}
#[test]
fn test_connection_handler_start_api() {
use crate::messages::PROTOBUF_MSG_ID;
let handler = ConnectionHandler::default();
let data = handler.format_start_api(123, server_versions::PROTOBUF);
let msg_id = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
assert_eq!(msg_id, 71 + PROTOBUF_MSG_ID);
let request: crate::proto::StartApiRequest = prost::Message::decode(&data[4..]).unwrap();
assert_eq!(request.client_id, Some(123));
}
#[test]
fn test_connection_handler_start_api_protobuf() {
let handler = ConnectionHandler::default();
let data = handler.format_start_api(123, server_versions::PROTOBUF);
let msg_id = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
assert_eq!(msg_id, 271);
use prost::Message;
let req = crate::proto::StartApiRequest::decode(&data[4..]).unwrap();
assert_eq!(req.client_id, Some(123));
}
#[test]
fn test_parse_raw_message_protobuf() {
use crate::messages::PROTOBUF_MSG_ID;
let msg_id: i32 = 5 + PROTOBUF_MSG_ID;
let payload = vec![0x08, 0x64]; let mut data = msg_id.to_be_bytes().to_vec();
data.extend_from_slice(&payload);
let (message, trace_str) = parse_raw_message(&data);
assert!(message.raw_bytes().is_some(), "protobuf framing must populate raw_bytes");
assert_eq!(message.message_type(), IncomingMessages::OpenOrder);
assert_eq!(message.raw_bytes(), Some(payload.as_slice()));
assert!(trace_str.is_none()); }
#[test]
fn test_parse_raw_message_binary_id_text_payload() {
let msg_id: i32 = 9; let text_payload = b"1\01000\0";
let mut data = msg_id.to_be_bytes().to_vec();
data.extend_from_slice(text_payload);
let (message, trace_str) = parse_raw_message(&data);
assert!(message.raw_bytes().is_none(), "text framing must leave raw_bytes empty");
assert_eq!(message.message_type(), IncomingMessages::NextValidId);
assert_eq!(message.fields[1], "1"); assert_eq!(message.peek_int(2).unwrap(), 1000); assert!(trace_str.is_some());
}
#[test]
fn test_non_utf8_handshake_response() {
let gb2312_bytes: Vec<u8> = vec![
49, 55, 51, 0, 50, 48, 50, 53, 49, 50, 48, 53, 32, 50, 51, 58, 49, 51, 58, 52, 53, 32, 214, 208, 185, 250, 177, 234, 215, 188, 202, 177, 188, 228, 0, ];
let raw_string = String::from_utf8_lossy(&gb2312_bytes).into_owned();
assert!(raw_string.contains("173"));
assert!(raw_string.contains("20251205"));
assert!(raw_string.contains("23:13:45"));
assert!(raw_string.contains('\u{FFFD}'));
let mut response = ResponseMessage::from(&raw_string);
let handler = ConnectionHandler::default();
let result = handler.parse_handshake_response(&mut response);
assert!(result.is_ok());
let handshake_data = result.unwrap();
assert_eq!(handshake_data.server_version, 173);
assert!(handshake_data.server_time.contains("20251205"));
}
#[test]
fn test_startup_message_message_type_typed_variants() {
use crate::accounts::{AccountPortfolioValue, AccountUpdateTime, AccountValue};
use crate::orders::{CommissionReport, ExecutionData, OrderData, OrderStatus};
assert_eq!(
StartupMessage::OpenOrder(OrderData::default()).message_type(),
IncomingMessages::OpenOrder
);
assert_eq!(
StartupMessage::OrderStatus(OrderStatus::default()).message_type(),
IncomingMessages::OrderStatus
);
assert_eq!(StartupMessage::OpenOrderEnd.message_type(), IncomingMessages::OpenOrderEnd);
assert_eq!(
StartupMessage::AccountUpdate(AccountUpdate::AccountValue(AccountValue::default())).message_type(),
IncomingMessages::AccountValue
);
assert_eq!(
StartupMessage::AccountUpdate(AccountUpdate::PortfolioValue(AccountPortfolioValue::default())).message_type(),
IncomingMessages::PortfolioValue
);
assert_eq!(
StartupMessage::AccountUpdate(AccountUpdate::UpdateTime(AccountUpdateTime::default())).message_type(),
IncomingMessages::AccountUpdateTime
);
assert_eq!(
StartupMessage::AccountUpdate(AccountUpdate::End).message_type(),
IncomingMessages::AccountDownloadEnd
);
assert_eq!(
StartupMessage::Execution(ExecutionData::default()).message_type(),
IncomingMessages::ExecutionData
);
assert_eq!(
StartupMessage::CommissionReport(CommissionReport::default()).message_type(),
IncomingMessages::CommissionsReport
);
assert_eq!(
StartupMessage::CompletedOrder(OrderData::default()).message_type(),
IncomingMessages::CompletedOrder
);
assert_eq!(StartupMessage::ExecutionDataEnd.message_type(), IncomingMessages::ExecutionDataEnd);
assert_eq!(StartupMessage::CompletedOrdersEnd.message_type(), IncomingMessages::CompletedOrdersEnd);
}
#[test]
fn test_parse_account_info_next_valid_id_protobuf() {
use prost::Message;
let handler = ConnectionHandler::default();
let proto = crate::proto::NextValidId { order_id: Some(4242) };
let bytes = proto.encode_to_vec();
let mut message = proto_response(IncomingMessages::NextValidId, bytes);
let info = handler
.parse_account_info(TEST_SERVER_VERSION, &mut message, &empty_ctx())
.expect("protobuf NextValidId must parse");
assert_eq!(info.next_order_id, Some(4242));
assert_eq!(info.managed_accounts, None);
}
#[test]
fn test_parse_account_info_managed_accounts_protobuf() {
use prost::Message;
let handler = ConnectionHandler::default();
let proto = crate::proto::ManagedAccounts {
accounts_list: Some("DU111,DU222".to_string()),
};
let bytes = proto.encode_to_vec();
let mut message = proto_response(IncomingMessages::ManagedAccounts, bytes);
let info = handler
.parse_account_info(TEST_SERVER_VERSION, &mut message, &empty_ctx())
.expect("protobuf ManagedAccounts must parse");
assert_eq!(info.next_order_id, None);
assert_eq!(info.managed_accounts, Some("DU111,DU222".to_string()));
}
#[test]
fn test_parse_account_info_next_valid_id_protobuf_decode_error() {
let handler = ConnectionHandler::default();
let mut message = proto_response(IncomingMessages::NextValidId, vec![0xff, 0xff, 0xff]);
let err = handler
.parse_account_info(TEST_SERVER_VERSION, &mut message, &empty_ctx())
.expect_err("garbage protobuf must error");
assert!(matches!(err, Error::ProtobufDecode(_)), "got {err:?}");
}
#[test]
fn test_parse_account_info_managed_accounts_protobuf_decode_error() {
let handler = ConnectionHandler::default();
let mut message = proto_response(IncomingMessages::ManagedAccounts, vec![0xff, 0xff, 0xff]);
let err = handler
.parse_account_info(TEST_SERVER_VERSION, &mut message, &empty_ctx())
.expect_err("garbage protobuf must error");
assert!(matches!(err, Error::ProtobufDecode(_)), "got {err:?}");
}
#[test]
fn test_dispatch_unsolicited_open_order_no_callback_is_noop() {
let mut message = ResponseMessage::from("5\0123\0AAPL\0STK\0");
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, ¬ice_sink_ctx(&sink));
assert_eq!(sink.count(), 0, "OpenOrder must not deliver to notice sink");
}
#[test]
fn test_dispatch_unsolicited_order_status_no_callback_is_noop() {
let mut message = ResponseMessage::from("3\0456\0Filled\0100\0");
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, ¬ice_sink_ctx(&sink));
assert_eq!(sink.count(), 0);
}
#[test]
fn test_dispatch_unsolicited_open_order_end_no_callback_is_noop() {
let mut message = ResponseMessage::from("53\01\0");
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, ¬ice_sink_ctx(&sink));
assert_eq!(sink.count(), 0);
}
#[test]
fn test_dispatch_unsolicited_account_update_no_callback_is_noop() {
use crate::messages::IncomingMessages;
use crate::testdata::builders::accounts::account_value;
use crate::testdata::builders::ResponseProtoEncoder;
let mut message = proto_response(
IncomingMessages::AccountValue,
account_value()
.key("NetLiquidation")
.value("123.45")
.currency("USD")
.account("DU1")
.encode_proto(),
);
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, ¬ice_sink_ctx(&sink));
assert_eq!(sink.count(), 0);
}
#[test]
fn test_dispatch_unsolicited_account_value_decode_failure_emits_notice() {
use crate::messages::IncomingMessages;
let mut message = proto_response(IncomingMessages::AccountValue, vec![0xFF, 0xFF, 0xFF]);
let cb_fired = Arc::new(Mutex::new(false));
let cb_fired_clone = cb_fired.clone();
let cb = move |_msg: StartupMessage| *cb_fired_clone.lock().unwrap() = true;
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &full_ctx(&cb, &sink));
assert!(!*cb_fired.lock().unwrap(), "callback must not fire on decode failure");
let notice = sink.last().expect("notice sink should receive decode-failure notice");
assert_eq!(notice.code, HANDSHAKE_DECODE_FAILURE_CODE);
assert!(notice.message.contains("AccountValue"));
}
fn proto_frame<B: crate::testdata::builders::ResponseProtoEncoder>(kind: IncomingMessages, builder: &B) -> ResponseMessage {
proto_response(kind, builder.encode_proto())
}
#[test]
fn test_dispatch_unsolicited_execution_typed() {
use crate::testdata::builders::orders::ExecutionDataResponse;
let builder = ExecutionDataResponse::default().symbol("TSLA").shares(100.0).price(196.52);
let mut message = proto_frame(IncomingMessages::ExecutionData, &builder);
let captured: Arc<Mutex<Option<crate::orders::ExecutionData>>> = Arc::new(Mutex::new(None));
let captured_clone = captured.clone();
let cb = move |msg: StartupMessage| match msg {
StartupMessage::Execution(e) => *captured.lock().unwrap() = Some(e),
other => panic!("expected Execution, got {other:?}"),
};
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &startup_ctx(&cb));
let got = captured_clone.lock().unwrap().take().expect("callback didn't fire");
assert_eq!(got.contract.symbol.to_string(), "TSLA");
assert_eq!(got.execution.shares, 100.0);
assert_eq!(got.execution.price, 196.52);
}
#[test]
fn test_dispatch_unsolicited_commission_report_typed() {
use crate::testdata::builders::orders::CommissionReportResponse;
let builder = CommissionReportResponse::default().commission(2.5).currency("USD");
let mut message = proto_frame(IncomingMessages::CommissionsReport, &builder);
let captured: Arc<Mutex<Option<crate::orders::CommissionReport>>> = Arc::new(Mutex::new(None));
let captured_clone = captured.clone();
let cb = move |msg: StartupMessage| match msg {
StartupMessage::CommissionReport(c) => *captured.lock().unwrap() = Some(c),
other => panic!("expected CommissionReport, got {other:?}"),
};
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &startup_ctx(&cb));
let got = captured_clone.lock().unwrap().take().expect("callback didn't fire");
assert_eq!(got.commission, 2.5);
assert_eq!(got.currency, "USD");
}
#[test]
fn test_dispatch_unsolicited_completed_order_typed() {
use crate::testdata::builders::orders::CompletedOrderResponse;
let builder = CompletedOrderResponse::default();
let mut message = proto_frame(IncomingMessages::CompletedOrder, &builder);
let captured: Arc<Mutex<Option<crate::orders::OrderData>>> = Arc::new(Mutex::new(None));
let captured_clone = captured.clone();
let cb = move |msg: StartupMessage| match msg {
StartupMessage::CompletedOrder(o) => *captured.lock().unwrap() = Some(o),
other => panic!("expected CompletedOrder, got {other:?}"),
};
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &startup_ctx(&cb));
let got = captured_clone.lock().unwrap().take().expect("callback didn't fire");
assert_eq!(got.order_id, -1);
}
#[test]
fn test_dispatch_unsolicited_execution_data_end_typed() {
let mut message = ResponseMessage::from("55\01\042\0");
let fired = Arc::new(Mutex::new(false));
let fired_clone = fired.clone();
let cb = move |msg: StartupMessage| {
if matches!(msg, StartupMessage::ExecutionDataEnd) {
*fired.lock().unwrap() = true;
} else {
panic!("expected ExecutionDataEnd, got {msg:?}");
}
};
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &startup_ctx(&cb));
assert!(*fired_clone.lock().unwrap(), "ExecutionDataEnd not delivered as typed variant");
}
#[test]
fn test_dispatch_unsolicited_completed_orders_end_typed() {
let mut message = ResponseMessage::from("102\0");
let fired = Arc::new(Mutex::new(false));
let fired_clone = fired.clone();
let cb = move |msg: StartupMessage| {
if matches!(msg, StartupMessage::CompletedOrdersEnd) {
*fired.lock().unwrap() = true;
} else {
panic!("expected CompletedOrdersEnd, got {msg:?}");
}
};
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &startup_ctx(&cb));
assert!(*fired_clone.lock().unwrap(), "CompletedOrdersEnd not delivered as typed variant");
}
#[test]
fn test_dispatch_unsolicited_execution_decode_failure_emits_notice() {
let mut message = ResponseMessage::from("11\042\013\0AAPL\0");
let cb_fired = Arc::new(Mutex::new(false));
let cb_fired_clone = cb_fired.clone();
let cb = move |_msg: StartupMessage| *cb_fired_clone.lock().unwrap() = true;
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &full_ctx(&cb, &sink));
assert!(!*cb_fired.lock().unwrap(), "callback must not fire on decode failure");
let notice = sink.last().expect("notice sink should receive decode-failure notice");
assert_eq!(notice.code, HANDSHAKE_DECODE_FAILURE_CODE);
assert!(notice.message.contains("ExecutionData"));
}
#[test]
fn test_dispatch_unsolicited_commission_report_decode_failure_emits_notice() {
let mut message = ResponseMessage::from("59\01\0EXEC0001.01.01\0");
let cb_fired = Arc::new(Mutex::new(false));
let cb_fired_clone = cb_fired.clone();
let cb = move |_msg: StartupMessage| *cb_fired_clone.lock().unwrap() = true;
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &full_ctx(&cb, &sink));
assert!(!*cb_fired.lock().unwrap(), "callback must not fire on decode failure");
let notice = sink.last().expect("notice sink should receive decode-failure notice");
assert_eq!(notice.code, HANDSHAKE_DECODE_FAILURE_CODE);
assert!(notice.message.contains("CommissionsReport"));
}
#[test]
fn test_dispatch_unsolicited_completed_order_decode_failure_emits_notice() {
let mut message = ResponseMessage::from("101\0\0");
let cb_fired = Arc::new(Mutex::new(false));
let cb_fired_clone = cb_fired.clone();
let cb = move |_msg: StartupMessage| *cb_fired_clone.lock().unwrap() = true;
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, &full_ctx(&cb, &sink));
assert!(!*cb_fired.lock().unwrap(), "callback must not fire on decode failure");
let notice = sink.last().expect("notice sink should receive decode-failure notice");
assert_eq!(notice.code, HANDSHAKE_DECODE_FAILURE_CODE);
assert!(notice.message.contains("CompletedOrder"));
}
#[test]
fn test_dispatch_unsolicited_execution_no_callback_is_noop() {
let mut message = ResponseMessage::from("11\042\013\0AAPL\0");
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, ¬ice_sink_ctx(&sink));
assert_eq!(sink.count(), 0, "ExecutionData must not deliver to notice sink");
}
#[test]
fn test_dispatch_unsolicited_commission_report_no_callback_is_noop() {
let mut message = ResponseMessage::from("59\01\0EXEC0001.01.01\0");
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, ¬ice_sink_ctx(&sink));
assert_eq!(sink.count(), 0);
}
#[test]
fn test_dispatch_unsolicited_completed_order_no_callback_is_noop() {
let mut message = ResponseMessage::from("101\0\0");
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, ¬ice_sink_ctx(&sink));
assert_eq!(sink.count(), 0);
}
#[test]
fn test_dispatch_unsolicited_execution_data_end_no_callback_is_noop() {
let mut message = ResponseMessage::from("55\01\042\0");
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, ¬ice_sink_ctx(&sink));
assert_eq!(sink.count(), 0);
}
#[test]
fn test_dispatch_unsolicited_completed_orders_end_no_callback_is_noop() {
let mut message = ResponseMessage::from("102\0");
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, ¬ice_sink_ctx(&sink));
assert_eq!(sink.count(), 0);
}
#[test]
fn test_dispatch_unsolicited_unknown_no_callback_still_notices() {
let mut message = ResponseMessage::from("14\0\0");
let sink = CapturingSink::default();
dispatch_unsolicited_message(TEST_SERVER_VERSION, &mut message, ¬ice_sink_ctx(&sink));
let notice = sink.last().expect("notice sink should receive unknown-frame notice");
assert_eq!(notice.code, HANDSHAKE_UNKNOWN_FRAME_CODE);
assert!(notice.is_handshake_synthetic());
}