#![feature(attr_literals)]
#![feature(const_fn)]
#[macro_use]
extern crate fix_rs;
#[macro_use]
extern crate fix_rs_macros;
extern crate mio;
extern crate phf;
use mio::tcp::Shutdown;
use std::io::Write;
use std::thread;
use std::time::{Duration,Instant};
use std::sync::{Arc,Mutex};
use std::sync::atomic::{AtomicBool,Ordering};
#[macro_use]
mod common;
use common::{SERVER_SENDER_COMP_ID,SERVER_TARGET_COMP_ID,TestStream,new_logon_message};
use fix_rs::dictionary::field_types::other::{MsgDirection,SessionRejectReason};
use fix_rs::dictionary::fields::{MsgTypeGrp,SenderCompID,TargetCompID,Text};
use fix_rs::dictionary::messages::{Heartbeat,Logon,Logout,Reject,ResendRequest,SequenceReset,TestRequest};
use fix_rs::field::Field;
use fix_rs::field_tag::{self,FieldTag};
use fix_rs::fix::ParseError;
use fix_rs::fix_version::FIXVersion;
use fix_rs::fixt;
use fix_rs::fixt::engine::{EngineEvent,ConnectionTerminatedReason,ResendResponse};
use fix_rs::fixt::tests::{AUTO_DISCONNECT_AFTER_INBOUND_RESEND_REQUEST_LOOP_COUNT,INBOUND_MESSAGES_BUFFER_LEN_MAX,INBOUND_BYTES_BUFFER_CAPACITY};
use fix_rs::fixt::message::FIXTMessage;
use fix_rs::message::{self,NOT_REQUIRED,REQUIRED,Message};
use fix_rs::message_version::{self,MessageVersion};
/// Sends a ResendRequest whose EndSeqNo (1) is lower than its BeginSeqNo (2)
/// and expects the engine to answer with a Reject that carries
/// `SessionRejectReason::ValueIsIncorrectForThisTag`.
#[test]
fn test_recv_resend_request_invalid_end_seq_no() {
    define_dictionary!(
        Logon,
        ResendRequest,
        Reject,
    );

    // `_client` is bound (rather than discarded with `_`) so it lives until the end of the test.
    let (mut test_server, _client, _) = TestStream::setup_test_server_and_logon(build_dictionary());

    // ResendRequest with an inverted sequence range.
    let mut resend_request = new_fixt_message!(ResendRequest);
    resend_request.msg_seq_num = 5;
    resend_request.begin_seq_no = 2;
    resend_request.end_seq_no = 1;
    test_server.send_message(resend_request);

    // The Reject must point back at the offending message (RefSeqNum 5).
    let reject = test_server.recv_message::<Reject>();
    assert_eq!(reject.msg_seq_num, 2);
    assert_eq!(reject.ref_seq_num, 5);
    assert_eq!(
        reject.session_reject_reason.unwrap(),
        SessionRejectReason::ValueIsIncorrectForThisTag
    );
}
/// Sends a Logout as the first message of the test (no Logon is exchanged in
/// the test body) and expects the stream to be closed and the engine to report
/// `ConnectionTerminatedReason::LogonNotFirstMessageError`.
#[test]
fn test_send_logout_before_logon() {
    define_dictionary!(
        Logon,
        Logout,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server(build_dictionary());

    // Logout sent where a Logon would normally come first.
    let mut logout = new_fixt_message!(Logout);
    logout.msg_seq_num = 1;
    test_server.send_message(logout);

    // Short pause, then verify the remote side closed the stream.
    thread::sleep(Duration::from_millis(500));
    assert!(test_server.is_stream_closed(Duration::from_secs(5)));

    engine_poll_event!(client, EngineEvent::ConnectionTerminated(closed_connection, why) => {
        assert_eq!(closed_connection, connection);
        assert!(match why {
            ConnectionTerminatedReason::LogonNotFirstMessageError => true,
            _ => false,
        });
    });
}
/// Sends a Logout with MsgSeqNum 15, answers the engine's ResendRequest with a
/// gap-filling SequenceReset, then expects the engine's Logout (MsgSeqNum 3)
/// and, after the server shuts the stream down, a `RemoteRequested`
/// termination event.
#[test]
fn test_recv_logout_with_high_msg_seq_num() {
    define_dictionary!(
        Logon,
        Logout,
        ResendRequest,
        SequenceReset,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server_and_logon(build_dictionary());

    // Logout whose sequence number jumps ahead to 15.
    let mut logout = new_fixt_message!(Logout);
    logout.msg_seq_num = 15;
    test_server.send_message(logout);

    // The engine should request a resend starting at 2; EndSeqNo may be 0 or 14.
    let resend_request = test_server.recv_message::<ResendRequest>();
    assert_eq!(resend_request.begin_seq_no, 2);
    assert!(resend_request.end_seq_no == 0 || resend_request.end_seq_no == 14);

    // Answer the request with a gap fill up to 15.
    let mut gap_fill = new_fixt_message!(SequenceReset);
    gap_fill.gap_fill_flag = true;
    gap_fill.new_seq_no = 15;
    gap_fill.msg_seq_num = 2;
    test_server.send_message(gap_fill);
    let _ = engine_poll_message!(client, connection, SequenceReset);

    // The engine now sends its own Logout.
    let logout_reply = test_server.recv_message::<Logout>();
    assert_eq!(logout_reply.msg_seq_num, 3);

    // Close the server side and check how the engine classifies the termination.
    let _ = test_server.stream.shutdown(Shutdown::Both);
    engine_poll_event!(client, EngineEvent::ConnectionTerminated(closed_connection, why) => {
        assert_eq!(closed_connection, connection);
        assert!(match why {
            ConnectionTerminatedReason::RemoteRequested => true,
            _ => false,
        });
    });
}
/// Sends a Logout with MsgSeqNum 15 but never answers the engine's
/// ResendRequest. After waiting 10.5 seconds the test expects the engine's
/// Logout (MsgSeqNum 3) and, once the server shuts the stream down, a
/// `RemoteRequested` termination event.
#[test]
fn test_recv_logout_with_high_msg_seq_num_and_no_reply() {
    define_dictionary!(
        Logon,
        Logout,
        ResendRequest,
        SequenceReset,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server_and_logon(build_dictionary());

    // Logout whose sequence number jumps ahead to 15.
    let mut logout = new_fixt_message!(Logout);
    logout.msg_seq_num = 15;
    test_server.send_message(logout);

    // The engine should request a resend starting at 2; EndSeqNo may be 0 or 14.
    let resend_request = test_server.recv_message::<ResendRequest>();
    assert_eq!(resend_request.begin_seq_no, 2);
    assert!(resend_request.end_seq_no == 0 || resend_request.end_seq_no == 14);

    // Deliberately send nothing back; presumably this outlasts the engine's
    // wait for the resend -- TODO confirm the exact timeout.
    thread::sleep(Duration::from_millis(10_500));

    // The engine is expected to log out on its own.
    let logout_reply = test_server.recv_message::<Logout>();
    assert_eq!(logout_reply.msg_seq_num, 3);

    let _ = test_server.stream.shutdown(Shutdown::Both);
    engine_poll_event!(client, EngineEvent::ConnectionTerminated(closed_connection, why) => {
        assert_eq!(closed_connection, connection);
        assert!(match why {
            ConnectionTerminatedReason::RemoteRequested => true,
            _ => false,
        });
    });
}
/// After a Logout exchange initiated by the server, the server sends a
/// ResendRequest instead of disconnecting. The test expects the engine to
/// gap-fill the request, to keep emitting Heartbeat/TestRequest traffic, and to
/// report `RemoteRequested` once a second Logout arrives and the stream is
/// shut down.
#[test]
fn test_recv_logout_send_logout_recv_resend_request() {
    define_dictionary!(
        Heartbeat,
        Logon,
        Logout,
        ResendRequest,
        SequenceReset,
        TestRequest,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server_and_logon(build_dictionary());

    // Server-initiated Logout.
    let mut logout = new_fixt_message!(Logout);
    logout.msg_seq_num = 2;
    test_server.send_message(logout);

    // The engine answers with its own Logout and surfaces the received one.
    let logout_reply = test_server.recv_message::<Logout>();
    assert_eq!(logout_reply.msg_seq_num, 2);
    let _ = engine_poll_message!(client, connection, Logout);

    // Instead of disconnecting, ask the engine to resend from 2 onward.
    let mut resend_request = new_fixt_message!(ResendRequest);
    resend_request.msg_seq_num = 3;
    resend_request.begin_seq_no = 2;
    resend_request.end_seq_no = 0;
    test_server.send_message(resend_request);

    engine_gap_fill_resend_request!(client, connection, 2..3);
    let _ = engine_poll_message!(client, connection, ResendRequest);

    // The resend is satisfied by a SequenceReset covering the requested range.
    let sequence_reset = test_server.recv_message::<SequenceReset>();
    assert_eq!(sequence_reset.msg_seq_num, 2);
    assert_eq!(sequence_reset.new_seq_no, 3);

    // After 5.5 seconds the engine is expected to have sent a Heartbeat and a TestRequest.
    thread::sleep(Duration::from_millis(5_500));
    let _ = test_server.recv_message::<Heartbeat>();
    let _ = test_server.recv_message::<TestRequest>();

    // Second Logout from the server, then drop the connection.
    let mut final_logout = new_fixt_message!(Logout);
    final_logout.msg_seq_num = 4;
    test_server.send_message(final_logout);
    let _ = engine_poll_message!(client, connection, Logout);

    let _ = test_server.stream.shutdown(Shutdown::Both);
    engine_poll_event!(client, EngineEvent::ConnectionTerminated(closed_connection, why) => {
        assert_eq!(closed_connection, connection);
        assert!(match why {
            ConnectionTerminatedReason::RemoteRequested => true,
            _ => false,
        });
    });
}
/// The client starts a logout, but the server replies with a ResendRequest
/// first. The test expects the engine to gap-fill sequence numbers 2..5 and,
/// when the server's Logout finally arrives, to report a `LocalRequested`
/// termination.
#[test]
fn test_send_logout_and_recv_resend_request() {
    define_dictionary!(
        Heartbeat,
        Logon,
        Logout,
        ResendRequest,
        SequenceReset,
        TestRequest,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server_and_logon(build_dictionary());

    // Wait 5.5 seconds so the engine has sent a Heartbeat and a TestRequest
    // before the logout begins.
    thread::sleep(Duration::from_millis(5_500));
    let _ = test_server.recv_message::<Heartbeat>();
    let _ = test_server.recv_message::<TestRequest>();

    // Client-initiated logout.
    client.logout(connection);
    let _ = test_server.recv_message::<Logout>();

    // Rather than confirming the logout, the server asks for a resend from 2.
    let mut resend_request = new_fixt_message!(ResendRequest);
    resend_request.msg_seq_num = 2;
    resend_request.begin_seq_no = 2;
    resend_request.end_seq_no = 0;
    test_server.send_message(resend_request);

    engine_gap_fill_resend_request!(client, connection, 2..5);
    let _ = engine_poll_message!(client, connection, ResendRequest);

    // The gap fill must skip straight to sequence number 5.
    let sequence_reset = test_server.recv_message::<SequenceReset>();
    assert_eq!(sequence_reset.msg_seq_num, 2);
    assert_eq!(sequence_reset.new_seq_no, 5);

    // Now the server completes the logout.
    let mut logout = new_fixt_message!(Logout);
    logout.msg_seq_num = 3;
    test_server.send_message(logout);

    engine_poll_event!(client, EngineEvent::ConnectionTerminated(closed_connection, why) => {
        assert_eq!(closed_connection, connection);
        assert!(match why {
            ConnectionTerminatedReason::LocalRequested => true,
            _ => false,
        });
    });
}
/// The client starts a logout and the server answers with a Logout whose
/// MsgSeqNum (15) is too high. The test expects a ResendRequest, gap-fills it,
/// expects a further Logout from the engine, then sends a Logout with
/// MsgSeqNum 16 and expects a `LocalRequested` termination.
#[test]
fn test_send_logout_and_recv_logout_with_high_msg_seq_num() {
    define_dictionary!(
        Heartbeat,
        Logon,
        Logout,
        ResendRequest,
        SequenceReset,
        TestRequest,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server_and_logon(build_dictionary());

    // Client-initiated logout.
    client.logout(connection);
    let _ = test_server.recv_message::<Logout>();

    // Server replies with a Logout that jumps ahead to 15.
    let mut early_logout = new_fixt_message!(Logout);
    early_logout.msg_seq_num = 15;
    test_server.send_message(early_logout);

    // The engine should request the missing messages.
    // NOTE(review): test_recv_logout_with_high_msg_seq_num accepts 14 as the
    // explicit EndSeqNo for the same gap, whereas 15 is accepted here -- confirm
    // which bound the engine is meant to emit.
    let resend_request = test_server.recv_message::<ResendRequest>();
    assert_eq!(resend_request.msg_seq_num, 3);
    assert_eq!(resend_request.begin_seq_no, 2);
    assert!(resend_request.end_seq_no == 0 || resend_request.end_seq_no == 15);

    // Gap-fill up to 16.
    let mut gap_fill = new_fixt_message!(SequenceReset);
    gap_fill.gap_fill_flag = true;
    gap_fill.msg_seq_num = 2;
    gap_fill.new_seq_no = 16;
    test_server.send_message(gap_fill);
    let _ = engine_poll_message!(client, connection, SequenceReset);

    // The engine sends another Logout after the gap is filled.
    let _ = test_server.recv_message::<Logout>();

    // Server's Logout with the sequence number the gap fill announced.
    let mut final_logout = new_fixt_message!(Logout);
    final_logout.msg_seq_num = 16;
    test_server.send_message(final_logout);

    engine_poll_event!(client, EngineEvent::ConnectionTerminated(closed_connection, why) => {
        assert_eq!(closed_connection, connection);
        assert!(match why {
            ConnectionTerminatedReason::LocalRequested => true,
            _ => false,
        });
    });
}
/// The client starts a logout, the server answers with a Logout whose
/// MsgSeqNum (15) is too high, and then ignores the engine's ResendRequest.
/// After 10.5 seconds the test expects a `LogoutNoResponseError` termination.
#[test]
fn test_send_logout_and_recv_logout_with_high_msg_seq_num_and_no_reply() {
    define_dictionary!(
        Heartbeat,
        Logon,
        Logout,
        ResendRequest,
        SequenceReset,
        TestRequest,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server_and_logon(build_dictionary());

    // Client-initiated logout.
    client.logout(connection);
    let _ = test_server.recv_message::<Logout>();

    // Server replies with a Logout that jumps ahead to 15.
    let mut early_logout = new_fixt_message!(Logout);
    early_logout.msg_seq_num = 15;
    test_server.send_message(early_logout);

    // The engine should request the missing messages.
    let resend_request = test_server.recv_message::<ResendRequest>();
    assert_eq!(resend_request.msg_seq_num, 3);
    assert_eq!(resend_request.begin_seq_no, 2);
    assert!(resend_request.end_seq_no == 0 || resend_request.end_seq_no == 15);

    // Stay silent; presumably this outlasts the engine's logout timeout -- TODO confirm.
    thread::sleep(Duration::from_millis(10_500));

    engine_poll_event!(client, EngineEvent::ConnectionTerminated(closed_connection, why) => {
        assert_eq!(closed_connection, connection);
        assert!(match why {
            ConnectionTerminatedReason::LogoutNoResponseError => true,
            _ => false,
        });
    });
}
/// Answers the client's Logon with a Logon whose SenderCompID is `unknown`.
/// Expects a Reject (`CompIDProblem`), a Logout with the text
/// "SenderCompID is wrong", a `MessageRejected` event carrying the bad Logon,
/// and a `SenderCompIDWrongError` termination.
#[test]
fn test_wrong_sender_comp_id_in_logon_response() {
    define_dictionary!(
        Logon,
        Logout,
        Reject,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server(build_dictionary());

    // Client opens the session.
    let logon_request = new_logon_message();
    client.send_message(connection, logon_request);
    let _ = test_server.recv_message::<Logon>();

    // Server's Logon response carries an unexpected SenderCompID.
    let mut logon_response = new_logon_message();
    logon_response.sender_comp_id = b"unknown".to_vec();
    test_server.send_message(logon_response);

    // First a Reject referencing the bad Logon...
    let reject = test_server.recv_message::<Reject>();
    assert_eq!(reject.msg_seq_num, 2);
    assert_eq!(reject.ref_seq_num, 1);
    assert_eq!(reject.session_reject_reason.unwrap(), SessionRejectReason::CompIDProblem);
    assert_eq!(reject.text, b"CompID problem".to_vec());

    // ...then a Logout that names the offending field.
    let logout = test_server.recv_message::<Logout>();
    assert_eq!(logout.text, b"SenderCompID is wrong".to_vec());

    // The engine reports the rejected Logon to the application.
    engine_poll_event!(client, EngineEvent::MessageRejected(msg_connection, rejected_message) => {
        assert_eq!(msg_connection, connection);

        let rejected_logon = rejected_message
            .as_any()
            .downcast_ref::<Logon>()
            .expect("Not expected message type")
            .clone();
        assert_eq!(rejected_logon.msg_seq_num, 1);
        assert_eq!(rejected_logon.sender_comp_id, b"unknown".to_vec());
    });

    engine_poll_event!(client, EngineEvent::ConnectionTerminated(closed_connection, why) => {
        assert_eq!(closed_connection, connection);
        assert!(match why {
            ConnectionTerminatedReason::SenderCompIDWrongError => true,
            _ => false,
        });
    });
}
/// Answers the client's Logon with a Logon whose TargetCompID is `unknown`.
/// Expects a Reject (`CompIDProblem`), a Logout with the text
/// "TargetCompID is wrong", a `MessageRejected` event carrying the bad Logon,
/// and a `TargetCompIDWrongError` termination.
#[test]
fn test_wrong_target_comp_id_in_logon_response() {
    define_dictionary!(
        Logon,
        Logout,
        Reject,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server(build_dictionary());

    // Client opens the session.
    let logon_request = new_logon_message();
    client.send_message(connection, logon_request);
    let _ = test_server.recv_message::<Logon>();

    // Server's Logon response carries an unexpected TargetCompID.
    let mut logon_response = new_logon_message();
    logon_response.target_comp_id = b"unknown".to_vec();
    test_server.send_message(logon_response);

    // First a Reject referencing the bad Logon...
    let reject = test_server.recv_message::<Reject>();
    assert_eq!(reject.msg_seq_num, 2);
    assert_eq!(reject.ref_seq_num, 1);
    assert_eq!(reject.session_reject_reason.unwrap(), SessionRejectReason::CompIDProblem);
    assert_eq!(reject.text, b"CompID problem".to_vec());

    // ...then a Logout that names the offending field.
    let logout = test_server.recv_message::<Logout>();
    assert_eq!(logout.text, b"TargetCompID is wrong".to_vec());

    // The engine reports the rejected Logon to the application.
    engine_poll_event!(client, EngineEvent::MessageRejected(msg_connection, rejected_message) => {
        assert_eq!(msg_connection, connection);

        let rejected_logon = rejected_message
            .as_any()
            .downcast_ref::<Logon>()
            .expect("Not expected message type")
            .clone();
        assert_eq!(rejected_logon.msg_seq_num, 1);
        assert_eq!(rejected_logon.target_comp_id, b"unknown".to_vec());
    });

    engine_poll_event!(client, EngineEvent::ConnectionTerminated(closed_connection, why) => {
        assert_eq!(closed_connection, connection);
        assert!(match why {
            ConnectionTerminatedReason::TargetCompIDWrongError => true,
            _ => false,
        });
    });
}
/// Writes `INBOUND_MESSAGES_BUFFER_LEN_MAX + 1` TestRequests to the engine in
/// one `write` call and checks that every one of them is delivered to the
/// application and answered with a Heartbeat, in order.
#[test]
fn test_overflowing_inbound_messages_buffer_does_resume() {
    define_dictionary!(
        Logon,
        Heartbeat,
        TestRequest,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server_and_logon(build_dictionary());

    // One message more than the inbound message buffer limit.
    let message_count = INBOUND_MESSAGES_BUFFER_LEN_MAX + 1;

    // Serialize all TestRequests back to back into a single byte buffer.
    let mut payload = Vec::new();
    for index in 0..message_count {
        let mut test_request = new_fixt_message!(TestRequest);
        test_request.msg_seq_num = (index + 2) as u64;
        test_request.test_req_id = b"test".to_vec();
        test_request.read(FIXVersion::FIXT_1_1, MessageVersion::FIX50SP2, &mut payload);
    }

    // Size guards: presumably 1400 keeps the payload within one network
    // packet -- TODO confirm. The second check keeps it below the engine's
    // inbound byte buffer capacity.
    assert!(payload.len() < 1400);
    assert!(payload.len() < INBOUND_BYTES_BUFFER_CAPACITY);

    let written = test_server.stream.write(&payload).unwrap();
    assert_eq!(written, payload.len());

    // Every TestRequest must surface on the client and be answered by a Heartbeat.
    for index in 0..message_count {
        let expected_seq_num = (index + 2) as u64;

        let test_request = engine_poll_message!(client, connection, TestRequest);
        assert_eq!(test_request.msg_seq_num, expected_seq_num);

        let heartbeat = test_server.recv_message::<Heartbeat>();
        assert_eq!(heartbeat.msg_seq_num, expected_seq_num);
    }
}
/// Checks how the position of SenderCompID (tag 49) is handled.
///
/// * FIXT.1.1 session: a message with tag 49 in fourth position is accepted;
///   one with tag 49 in fifth position, or without tag 49, is answered with a
///   Reject ("SenderCompID must be the 4th tag") and a
///   `ParseError::SenderCompIDNotFourthTag` garbled-message event.
/// * FIX.4.0 session: both tag orders are accepted; only the message without
///   tag 49 is rejected ("Required tag missing",
///   `ParseError::MissingRequiredTag`).
#[test]
fn test_sender_comp_id() {
    define_fixt_message!(TestMessage: b"9999" => {
        NOT_REQUIRED, text: Text [FIX50..],
    });

    define_dictionary!(
        Logon,
        Reject,
        TestMessage,
    );

    // FIXT.1.1: tag order is enforced.
    {
        let (mut test_server, mut client, connection) =
            TestStream::setup_test_server_and_logon_with_ver(
                FIXVersion::FIXT_1_1,
                MessageVersion::FIX50,
                build_dictionary(),
            );

        // SenderCompID fourth, TargetCompID fifth: accepted.
        let ordered_message = b"8=FIXT.1.1\x019=48\x0135=9999\x0149=TX\x0156=TEST\x0134=2\x0152=20170105-01:01:01\x0110=236\x01";
        let written = test_server.stream.write(ordered_message).unwrap();
        assert_eq!(written, ordered_message.len());

        let received = engine_poll_message!(client, connection, TestMessage);
        assert_eq!(received.msg_seq_num, 2);
        assert_eq!(received.sender_comp_id, SERVER_SENDER_COMP_ID);

        // SenderCompID pushed to fifth position: rejected.
        let swapped_message = b"8=FIXT.1.1\x019=48\x0135=9999\x0156=TEST\x0149=TX\x0134=3\x0152=20170105-01:01:01\x0110=012\x01";
        let written = test_server.stream.write(swapped_message).unwrap();
        assert_eq!(written, swapped_message.len());

        let reject = test_server.recv_message::<Reject>();
        assert_eq!(reject.msg_seq_num, 2);
        assert_eq!(
            reject.session_reject_reason.expect("SessionRejectReason must be provided"),
            SessionRejectReason::TagSpecifiedOutOfRequiredOrder
        );
        assert_eq!(reject.text, b"SenderCompID must be the 4th tag".to_vec());

        engine_poll_event!(client, EngineEvent::MessageReceivedGarbled(msg_connection, parse_error) => {
            assert_eq!(msg_connection, connection);
            assert!(match parse_error {
                ParseError::SenderCompIDNotFourthTag => true,
                _ => false,
            });
        });

        // SenderCompID absent altogether: rejected the same way.
        let senderless_message = b"8=FIXT.1.1\x019=50\x0135=9999\x0156=TEST\x0134=10\x0152=20170105-01:01:01\x0110=086\x01";
        let written = test_server.stream.write(senderless_message).unwrap();
        assert_eq!(written, senderless_message.len());

        let reject = test_server.recv_message::<Reject>();
        assert_eq!(reject.msg_seq_num, 3);
        assert_eq!(
            reject.session_reject_reason.expect("SessionRejectReason must be provided"),
            SessionRejectReason::TagSpecifiedOutOfRequiredOrder
        );
        assert_eq!(reject.text, b"SenderCompID must be the 4th tag".to_vec());

        engine_poll_event!(client, EngineEvent::MessageReceivedGarbled(msg_connection, parse_error) => {
            assert_eq!(msg_connection, connection);
            assert!(match parse_error {
                ParseError::SenderCompIDNotFourthTag => true,
                _ => false,
            });
        });
    }

    // FIX.4.0: tag order is not enforced, but the tag is still required.
    {
        let (mut test_server, mut client, connection) =
            TestStream::setup_test_server_and_logon_with_ver(
                FIXVersion::FIX_4_0,
                MessageVersion::FIX40,
                build_dictionary(),
            );

        // SenderCompID fourth, TargetCompID fifth: accepted.
        let ordered_message = b"8=FIX.4.0\x019=48\x0135=9999\x0149=TX\x0156=TEST\x0134=2\x0152=20170105-01:01:01\x0110=154\x01";
        let written = test_server.stream.write(ordered_message).unwrap();
        assert_eq!(written, ordered_message.len());

        let received = engine_poll_message!(client, connection, TestMessage);
        assert_eq!(received.msg_seq_num, 2);
        assert_eq!(received.sender_comp_id, SERVER_SENDER_COMP_ID);

        // SenderCompID in fifth position: still accepted.
        let swapped_message = b"8=FIX.4.0\x019=48\x0135=9999\x0156=TEST\x0149=TX\x0134=3\x0152=20170105-01:01:01\x0110=155\x01";
        let written = test_server.stream.write(swapped_message).unwrap();
        assert_eq!(written, swapped_message.len());

        let received = engine_poll_message!(client, connection, TestMessage);
        assert_eq!(received.msg_seq_num, 3);
        assert_eq!(received.sender_comp_id, SERVER_SENDER_COMP_ID);

        // SenderCompID absent: rejected as a missing required tag.
        let senderless_message = b"8=FIX.4.0\x019=42\x0135=9999\x0156=TEST\x0134=4\x0152=20170105-01:01:01\x0110=063\x01";
        let written = test_server.stream.write(senderless_message).unwrap();
        assert_eq!(written, senderless_message.len());

        let reject = test_server.recv_message::<Reject>();
        assert_eq!(reject.msg_seq_num, 2);
        assert_eq!(reject.text, b"Required tag missing".to_vec());

        engine_poll_event!(client, EngineEvent::MessageReceivedGarbled(msg_connection, parse_error) => {
            assert_eq!(msg_connection, connection);
            assert!(match parse_error {
                ParseError::MissingRequiredTag(ref tag, _) => *tag == SenderCompID::tag(),
                _ => false,
            });
        });
    }
}
/// Checks how the position of TargetCompID (tag 56) is handled.
///
/// * FIXT.1.1 session: a message with tag 56 in fifth position is accepted;
///   one with tag 56 in sixth position, or without tag 56, is answered with a
///   Reject ("TargetCompID must be the 5th tag") and a
///   `ParseError::TargetCompIDNotFifthTag` garbled-message event.
/// * FIX.4.0 session: both tag orders are accepted; only the message without
///   tag 56 is rejected ("Required tag missing",
///   `ParseError::MissingRequiredTag`).
#[test]
fn test_target_comp_id() {
    define_fixt_message!(TestMessage: b"9999" => {
        NOT_REQUIRED, text: Text [FIX50..],
    });

    define_dictionary!(
        Logon,
        Reject,
        TestMessage,
    );

    // FIXT.1.1: tag order is enforced.
    {
        let (mut test_server, mut client, connection) =
            TestStream::setup_test_server_and_logon_with_ver(
                FIXVersion::FIXT_1_1,
                MessageVersion::FIX50,
                build_dictionary(),
            );

        // TargetCompID in fifth position: accepted.
        let ordered_message = b"8=FIXT.1.1\x019=48\x0135=9999\x0149=TX\x0156=TEST\x0134=2\x0152=20170105-01:01:01\x0110=236\x01";
        let written = test_server.stream.write(ordered_message).unwrap();
        assert_eq!(written, ordered_message.len());

        let received = engine_poll_message!(client, connection, TestMessage);
        assert_eq!(received.msg_seq_num, 2);
        assert_eq!(received.target_comp_id, SERVER_TARGET_COMP_ID);

        // TargetCompID pushed to sixth position: rejected.
        let displaced_message = b"8=FIXT.1.1\x019=48\x0135=9999\x0149=TX\x0134=3\x0156=TEST\x0152=20170105-01:01:01\x0110=237\x01";
        let written = test_server.stream.write(displaced_message).unwrap();
        assert_eq!(written, displaced_message.len());

        let reject = test_server.recv_message::<Reject>();
        assert_eq!(reject.msg_seq_num, 2);
        assert_eq!(
            reject.session_reject_reason.expect("SessionRejectReason must be provided"),
            SessionRejectReason::TagSpecifiedOutOfRequiredOrder
        );
        assert_eq!(reject.text, b"TargetCompID must be the 5th tag".to_vec());

        engine_poll_event!(client, EngineEvent::MessageReceivedGarbled(msg_connection, parse_error) => {
            assert_eq!(msg_connection, connection);
            assert!(match parse_error {
                ParseError::TargetCompIDNotFifthTag => true,
                _ => false,
            });
        });

        // TargetCompID absent altogether: rejected the same way.
        let targetless_message = b"8=FIXT.1.1\x019=59\x0135=9999\x0149=TX\x0134=3\x0152=20170105-01:01:01\x0110=086\x01";
        let written = test_server.stream.write(targetless_message).unwrap();
        assert_eq!(written, targetless_message.len());

        let reject = test_server.recv_message::<Reject>();
        assert_eq!(reject.msg_seq_num, 3);
        assert_eq!(
            reject.session_reject_reason.expect("SessionRejectReason must be provided"),
            SessionRejectReason::TagSpecifiedOutOfRequiredOrder
        );
        assert_eq!(reject.text, b"TargetCompID must be the 5th tag".to_vec());

        engine_poll_event!(client, EngineEvent::MessageReceivedGarbled(msg_connection, parse_error) => {
            assert_eq!(msg_connection, connection);
            assert!(match parse_error {
                ParseError::TargetCompIDNotFifthTag => true,
                _ => false,
            });
        });
    }

    // FIX.4.0: tag order is not enforced, but the tag is still required.
    {
        let (mut test_server, mut client, connection) =
            TestStream::setup_test_server_and_logon_with_ver(
                FIXVersion::FIX_4_0,
                MessageVersion::FIX40,
                build_dictionary(),
            );

        // TargetCompID in fifth position: accepted.
        let ordered_message = b"8=FIX.4.0\x019=48\x0135=9999\x0149=TX\x0156=TEST\x0134=2\x0152=20170105-01:01:01\x0110=154\x01";
        let written = test_server.stream.write(ordered_message).unwrap();
        assert_eq!(written, ordered_message.len());

        let received = engine_poll_message!(client, connection, TestMessage);
        assert_eq!(received.msg_seq_num, 2);
        assert_eq!(received.target_comp_id, SERVER_TARGET_COMP_ID);

        // TargetCompID in sixth position: still accepted.
        let displaced_message = b"8=FIX.4.0\x019=48\x0135=9999\x0149=TX\x0134=3\x0156=TEST\x0152=20170105-01:01:01\x0110=155\x01";
        let written = test_server.stream.write(displaced_message).unwrap();
        assert_eq!(written, displaced_message.len());

        let received = engine_poll_message!(client, connection, TestMessage);
        assert_eq!(received.msg_seq_num, 3);
        assert_eq!(received.target_comp_id, SERVER_TARGET_COMP_ID);

        // TargetCompID absent: rejected as a missing required tag.
        let targetless_message = b"8=FIX.4.0\x019=40\x0135=9999\x0149=TX\x0134=4\x0152=20170105-01:01:01\x0110=171\x01";
        let written = test_server.stream.write(targetless_message).unwrap();
        assert_eq!(written, targetless_message.len());

        let reject = test_server.recv_message::<Reject>();
        assert_eq!(reject.msg_seq_num, 2);
        assert_eq!(reject.text, b"Required tag missing".to_vec());

        engine_poll_event!(client, EngineEvent::MessageReceivedGarbled(msg_connection, parse_error) => {
            assert_eq!(msg_connection, connection);
            assert!(match parse_error {
                ParseError::MissingRequiredTag(ref tag, _) => *tag == TargetCompID::tag(),
                _ => false,
            });
        });
    }
}
/// Logs on with `MessageVersion::FIX40` and checks that a `text` field
/// declared for `FIX50..` does not survive a round trip in either direction,
/// while a message type that does serialize `text` for FIX40 (`TestMessage2`,
/// deliberately absent from the dictionary) produces an `UnknownTag(58)`
/// garbled-message event.
#[test]
fn test_default_appl_ver_id() {
    define_fixt_message!(TestMessage: b"9999" => {
        REQUIRED, text: Text [FIX50..],
    });

    define_fixt_message!(TestMessage2: b"9999" => {
        REQUIRED, text: Text [FIX40..],
    });

    define_dictionary!(
        Logon,
        TestMessage,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server_and_logon_with_ver(
            FIXVersion::FIXT_1_1,
            MessageVersion::FIX40,
            build_dictionary(),
        );

    // Client -> server: the text set here is expected to be gone on arrival.
    {
        let mut outgoing = new_fixt_message!(TestMessage);
        outgoing.text = b"text".to_vec();
        client.send_message(connection, outgoing);

        let received = test_server.recv_message::<TestMessage>();
        assert_eq!(received.text.len(), 0);
    }

    // Server -> client.
    {
        // Same message type: text is again expected to be absent on arrival.
        let mut outgoing = new_fixt_message!(TestMessage);
        outgoing.msg_seq_num = 2;
        outgoing.text = b"text".to_vec();
        test_server.send_message(outgoing);

        let received = engine_poll_message!(client, connection, TestMessage);
        assert_eq!(received.text.len(), 0);

        // TestMessage2 shares the message type but allows text from FIX40 on,
        // so the engine is expected to flag tag 58 as unknown.
        let mut with_text = new_fixt_message!(TestMessage2);
        with_text.msg_seq_num = 3;
        with_text.text = b"text".to_vec();
        test_server.send_message(with_text);

        engine_poll_event!(client, EngineEvent::MessageReceivedGarbled(msg_connection, parse_error) => {
            assert_eq!(msg_connection, connection);
            assert!(match parse_error {
                ParseError::UnknownTag(ref tag) => *tag == FieldTag(58),
                _ => false,
            });
        });
    }
}
/// Exercises ApplVerID (tag 1128) handling:
///
/// 1. A raw message with tag 1128 in seventh position must be rejected with
///    "ApplVerID must be the 6th tag if specified".
/// 2. A message carrying `ApplVerID = FIX40` is accepted when it has no text,
///    and rejected with "Tag not defined for this message type" when a text
///    field (declared for `FIX50..`) is serialized alongside it.
#[test]
fn test_appl_ver_id() {
    define_fixt_message!(TestMessage: b"9999" => {
        REQUIRED, text: Text [FIX50..],
    });

    define_dictionary!(
        Logon,
        Reject,
        TestMessage,
    );

    // ApplVerID in the wrong position.
    {
        let (mut test_server, mut client, connection) =
            TestStream::setup_test_server_and_logon(build_dictionary());

        let misplaced_appl_ver_id = b"8=FIXT.1.1\x019=44\x0135=9999\x0149=SERVER\x0156=CLIENT\x0134=2\x011128=9\x0110=000\x01";
        let written = test_server.stream.write(misplaced_appl_ver_id).unwrap();
        assert_eq!(written, misplaced_appl_ver_id.len());

        let reject = test_server.recv_message::<Reject>();
        assert_eq!(
            reject.session_reject_reason.unwrap(),
            SessionRejectReason::TagSpecifiedOutOfRequiredOrder
        );
        assert_eq!(reject.text, b"ApplVerID must be the 6th tag if specified".to_vec());

        engine_poll_event!(client, EngineEvent::MessageReceivedGarbled(msg_connection, parse_error) => {
            assert_eq!(msg_connection, connection);
            assert!(match parse_error {
                ParseError::ApplVerIDNotSixthTag => true,
                _ => false,
            });
        });
    }

    // ApplVerID overriding the message version.
    {
        let (mut test_server, mut client, connection) =
            TestStream::setup_test_server_and_logon(build_dictionary());

        // Serialized as FIX40 (the message's own ApplVerID): accepted, no text.
        let mut versioned = new_fixt_message!(TestMessage);
        versioned.msg_seq_num = 2;
        versioned.appl_ver_id = Some(MessageVersion::FIX40);
        test_server.send_message_with_ver(
            FIXVersion::FIXT_1_1,
            versioned.appl_ver_id.unwrap(),
            versioned,
        );

        let received = engine_poll_message!(client, connection, TestMessage);
        assert_eq!(received.appl_ver_id, Some(MessageVersion::FIX40));
        assert_eq!(received.text.len(), 0);

        // Serialized as FIX50SP2 while claiming FIX40: the text tag must be refused.
        let mut mismatched = new_fixt_message!(TestMessage);
        mismatched.msg_seq_num = 3;
        mismatched.appl_ver_id = Some(MessageVersion::FIX40);
        mismatched.text = b"text".to_vec();
        test_server.send_message_with_ver(
            FIXVersion::FIXT_1_1,
            MessageVersion::FIX50SP2,
            mismatched,
        );

        let reject = test_server.recv_message::<Reject>();
        assert_eq!(
            reject.session_reject_reason.unwrap(),
            SessionRejectReason::TagNotDefinedForThisMessageType
        );
        assert_eq!(reject.text, b"Tag not defined for this message type".to_vec());

        engine_poll_event!(client, EngineEvent::MessageReceivedGarbled(msg_connection, parse_error) => {
            assert_eq!(msg_connection, connection);
            assert!(match parse_error {
                ParseError::UnexpectedTag(ref tag) => *tag == Text::tag(),
                _ => false,
            });
        });
    }
}
/// The server's Logon response carries a MsgTypeGrp entry that marks
/// `FIX50SP1` as the default version for `TestMessage`. The test then checks
/// that a `TestMessage` serialized as FIX50SP1 arrives with that ApplVerID and
/// its text intact, and that an explicit `ApplVerID = FIX40` on a later
/// message is reported as FIX40 with no text.
#[test]
fn test_message_type_default_application_version() {
    define_fixt_message!(TestMessage: b"9999" => {
        REQUIRED, text: Text [FIX50SP1..],
    });

    define_dictionary!(
        Logon,
        Reject,
        TestMessage,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server(build_dictionary());

    // Client logs on announcing FIX50 as its default application version.
    let mut logon_request = new_logon_message();
    logon_request.default_appl_ver_id = MessageVersion::FIX50;
    client.send_message_box_with_message_version(
        connection,
        MessageVersion::FIX50SP2,
        Box::new(logon_request),
    );

    let received_logon = test_server.recv_message::<Logon>();
    assert_eq!(received_logon.msg_seq_num, 1);

    // Per-message-type default: TestMessage defaults to FIX50SP1.
    let mut type_default = MsgTypeGrp::new();
    type_default.ref_msg_type = TestMessage::msg_type().to_vec();
    type_default.ref_appl_ver_id = Some(MessageVersion::FIX50SP1);
    type_default.msg_direction = MsgDirection::Send;
    type_default.default_ver_indicator = true;

    // Logon response echoing the client's session parameters.
    let mut logon_response = new_fixt_message!(Logon);
    logon_response.encrypt_method = received_logon.encrypt_method;
    logon_response.heart_bt_int = received_logon.heart_bt_int;
    logon_response.default_appl_ver_id = received_logon.default_appl_ver_id;
    logon_response.no_msg_types.push(Box::new(type_default));
    test_server.send_message_with_ver(
        FIXVersion::FIXT_1_1,
        MessageVersion::FIX50SP2,
        logon_response,
    );

    engine_poll_event!(client, EngineEvent::SessionEstablished(_) => {});
    let client_logon = engine_poll_message!(client, connection, Logon);
    assert_eq!(client_logon.msg_seq_num, 1);

    // No ApplVerID set on the message: the per-type default is expected to apply.
    {
        let mut outgoing = new_fixt_message!(TestMessage);
        outgoing.msg_seq_num = 2;
        outgoing.text = b"test".to_vec();
        test_server.send_message_with_ver(
            FIXVersion::FIXT_1_1,
            MessageVersion::FIX50SP1,
            outgoing,
        );

        let received = engine_poll_message!(client, connection, TestMessage);
        assert_eq!(received.appl_ver_id, Some(MessageVersion::FIX50SP1));
        assert_eq!(received.text, b"test");
    }

    // Explicit ApplVerID on the message: it is expected to win over the default.
    {
        let explicit_version = MessageVersion::FIX40;

        let mut outgoing = new_fixt_message!(TestMessage);
        outgoing.msg_seq_num = 3;
        outgoing.appl_ver_id = Some(explicit_version);
        test_server.send_message_with_ver(FIXVersion::FIXT_1_1, explicit_version, outgoing);

        let received = engine_poll_message!(client, connection, TestMessage);
        assert_eq!(received.appl_ver_id, Some(MessageVersion::FIX40));
        assert_eq!(received.text.len(), 0);
    }
}
/// Delivers the server's Logon response and a TestRequest in a single `write`
/// call and checks that the engine establishes the session, surfaces both
/// messages, and answers the TestRequest with a Heartbeat.
#[test]
fn test_respond_to_test_request_immediately_after_logon() {
    define_dictionary!(
        Logon,
        Heartbeat,
        TestRequest,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server(build_dictionary());

    // Client opens the session.
    client.send_message_box(connection, Box::new(new_logon_message()));
    let received_logon = test_server.recv_message::<Logon>();
    assert_eq!(received_logon.msg_seq_num, 1);

    // Logon response echoing the client's session parameters.
    let mut logon_response = new_fixt_message!(Logon);
    logon_response.msg_seq_num = 1;
    logon_response.encrypt_method = received_logon.encrypt_method;
    logon_response.heart_bt_int = received_logon.heart_bt_int;
    logon_response.default_appl_ver_id = received_logon.default_appl_ver_id;

    // TestRequest that follows the Logon response without any gap.
    let mut test_request = new_fixt_message!(TestRequest);
    test_request.msg_seq_num = 2;
    test_request.test_req_id = b"test".to_vec();

    // Serialize both messages into one buffer and send them together.
    let mut payload = Vec::new();
    logon_response.read(FIXVersion::FIXT_1_1, MessageVersion::FIX50SP2, &mut payload);
    test_request.read(FIXVersion::FIXT_1_1, MessageVersion::FIX50SP2, &mut payload);

    // Presumably keeps the payload within a single network packet -- TODO confirm.
    assert!(payload.len() < 1400);
    let written = test_server.stream.write(&payload).unwrap();
    assert_eq!(written, payload.len());

    // Session comes up and both messages reach the application in order.
    engine_poll_event!(client, EngineEvent::SessionEstablished(_) => {});
    let client_logon = engine_poll_message!(client, connection, Logon);
    assert_eq!(client_logon.msg_seq_num, 1);
    let client_test_request = engine_poll_message!(client, connection, TestRequest);
    assert_eq!(client_test_request.msg_seq_num, 2);

    // The TestRequest is answered with a Heartbeat.
    let heartbeat = test_server.recv_message::<Heartbeat>();
    assert_eq!(heartbeat.msg_seq_num, 2);
}
/// Delivers the server's Logon response and a `TestMessage` (whose text field
/// is declared for `FIX50SP2..`) in a single `write` call, and checks that the
/// message is parsed with its text intact after a logon that announced
/// `DefaultApplVerID = FIX50SP2`.
#[test]
fn test_respect_default_appl_ver_id_in_test_request_immediately_after_logon() {
    define_fixt_message!(TestMessage: b"9999" => {
        REQUIRED, text: Text [FIX50SP2..],
    });

    define_dictionary!(
        Logon,
        Logout,
        Reject,
        TestMessage,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server(build_dictionary());

    // Client logs on announcing FIX50SP2 as its default application version.
    let mut logon_request = new_logon_message();
    logon_request.default_appl_ver_id = MessageVersion::FIX50SP2;
    client.send_message_box(connection, Box::new(logon_request));

    let received_logon = test_server.recv_message::<Logon>();
    assert_eq!(received_logon.msg_seq_num, 1);

    // Logon response echoing the client's session parameters.
    let mut logon_response = new_fixt_message!(Logon);
    logon_response.msg_seq_num = 1;
    logon_response.encrypt_method = received_logon.encrypt_method;
    logon_response.heart_bt_int = received_logon.heart_bt_int;
    logon_response.default_appl_ver_id = received_logon.default_appl_ver_id;

    // Application message that follows the Logon response without any gap.
    let mut test_message = new_fixt_message!(TestMessage);
    test_message.msg_seq_num = 2;
    test_message.text = b"test".to_vec();

    // Serialize both messages into one buffer and send them together.
    let mut payload = Vec::new();
    logon_response.read(FIXVersion::FIXT_1_1, MessageVersion::FIX50SP2, &mut payload);
    test_message.read(FIXVersion::FIXT_1_1, MessageVersion::FIX50SP2, &mut payload);

    // Presumably keeps the payload within a single network packet -- TODO confirm.
    assert!(payload.len() < 1400);
    let written = test_server.stream.write(&payload).unwrap();
    assert_eq!(written, payload.len());

    engine_poll_event!(client, EngineEvent::SessionEstablished(_) => {});
    let client_logon = engine_poll_message!(client, connection, Logon);
    assert_eq!(client_logon.msg_seq_num, 1);

    // The text only parses if the negotiated default version was applied.
    let received = engine_poll_message!(client, connection, TestMessage);
    assert_eq!(received.msg_seq_num, 2);
    assert_eq!(received.text, b"test".to_vec());
}
/// Delivers a FIXT.1.1 Logon response and a TestRequest serialized as FIX.4.2
/// in a single `write` call. The session should be established, after which
/// the engine must send a Logout complaining about the BeginStr and report a
/// `BeginStrWrongError` termination naming both versions.
#[test]
fn test_logout_and_terminate_wrong_versioned_test_request_immediately_after_logon() {
    define_dictionary!(
        Logon,
        Logout,
        TestRequest,
    );

    let (mut test_server, mut client, connection) =
        TestStream::setup_test_server(build_dictionary());

    // Client opens the session.
    client.send_message_box(connection, Box::new(new_logon_message()));
    let received_logon = test_server.recv_message::<Logon>();
    assert_eq!(received_logon.msg_seq_num, 1);

    // Logon response echoing the client's session parameters.
    let mut logon_response = new_fixt_message!(Logon);
    logon_response.msg_seq_num = 1;
    logon_response.encrypt_method = received_logon.encrypt_method;
    logon_response.heart_bt_int = received_logon.heart_bt_int;
    logon_response.default_appl_ver_id = received_logon.default_appl_ver_id;

    let mut test_request = new_fixt_message!(TestRequest);
    test_request.msg_seq_num = 2;
    test_request.test_req_id = b"test".to_vec();

    // The Logon uses the session's FIXT.1.1; the TestRequest is written as FIX.4.2.
    let mut payload = Vec::new();
    logon_response.read(FIXVersion::FIXT_1_1, MessageVersion::FIX50SP2, &mut payload);
    test_request.read(FIXVersion::FIX_4_2, MessageVersion::FIX42, &mut payload);

    // Presumably keeps the payload within a single network packet -- TODO confirm.
    assert!(payload.len() < 1400);
    let written = test_server.stream.write(&payload).unwrap();
    assert_eq!(written, payload.len());

    // The Logon itself is fine, so the session is established first.
    engine_poll_event!(client, EngineEvent::SessionEstablished(_) => {});
    let client_logon = engine_poll_message!(client, connection, Logon);
    assert_eq!(client_logon.msg_seq_num, 1);

    // The mismatching BeginStr triggers a Logout...
    let logout = test_server.recv_message::<Logout>();
    assert_eq!(
        logout.text,
        b"BeginStr is wrong, expected 'FIXT.1.1' but received 'FIX.4.2'".to_vec()
    );

    // ...and a termination event that names both versions.
    engine_poll_event!(client, EngineEvent::ConnectionTerminated(closed_connection, why) => {
        assert_eq!(closed_connection, connection);
        assert!(match why {
            ConnectionTerminatedReason::BeginStrWrongError{received,expected} => {
                assert_eq!(received, FIXVersion::FIX_4_2);
                assert_eq!(expected, FIXVersion::FIXT_1_1);
                true
            },
            _ => false,
        });
    });
}
/// Exercises the MaxMessageSize limit announced in the engine's Logon:
/// an oversized message after logon is answered with a Reject, while an
/// oversized Logon response terminates the connection.
#[test]
fn test_max_message_size() {
    const MAX_MESSAGE_SIZE: u64 = 4096;

    define_fixt_message!(TestMessage: b"9999" => {
        REQUIRED, text: Text [FIX40..],
    });

    define_dictionary!(
        Logon,
        Logout,
        Reject,
        TestMessage,
    );

    // Size in bytes of `message` once serialized as FIXT.1.1 / FIX50SP2.
    fn message_length<T: Message>(message: &T) -> u64 {
        let mut serialized = Vec::new();
        message.read(FIXVersion::FIXT_1_1,MessageVersion::FIX50SP2,&mut serialized);
        serialized.len() as u64
    }

    // Case 1: a message exceeding the limit is sent after the session is up.
    {
        let (mut test_server,mut client,connection) = TestStream::setup_test_server(build_dictionary());

        // Engine logs on and announces its limit.
        let mut engine_logon = new_logon_message();
        engine_logon.max_message_size = MAX_MESSAGE_SIZE;
        client.send_message_box(connection,Box::new(engine_logon));

        let received_logon = test_server.recv_message::<Logon>();
        assert_eq!(received_logon.msg_seq_num,1);
        assert_eq!(received_logon.max_message_size,MAX_MESSAGE_SIZE);

        // Accept the logon by echoing the requested settings.
        let mut logon_response = new_fixt_message!(Logon);
        logon_response.encrypt_method = received_logon.encrypt_method;
        logon_response.heart_bt_int = received_logon.heart_bt_int;
        logon_response.default_appl_ver_id = received_logon.default_appl_ver_id;
        test_server.send_message(logon_response);

        engine_poll_event!(client,EngineEvent::SessionEstablished(_) => {});
        let delivered_logon = engine_poll_message!(client,connection,Logon);
        assert_eq!(delivered_logon.msg_seq_num,1);

        // Pad the text field with enough bytes to push the message past the limit.
        let mut oversized = new_fixt_message!(TestMessage);
        oversized.msg_seq_num = 2;
        let base_len = message_length(&oversized);
        let padding_len = (MAX_MESSAGE_SIZE - base_len) + 1;
        oversized.text.extend(::std::iter::repeat(b'A').take(padding_len as usize));
        test_server.send_message(oversized);

        // The engine rejects the message and names the limit in the text.
        let reject = test_server.recv_message::<Reject>();
        assert_eq!(reject.msg_seq_num,2);
        assert_eq!(reject.ref_seq_num,2);
        assert_eq!(reject.session_reject_reason.unwrap(),SessionRejectReason::Other);

        let mut expected_error_text = b"Message size exceeds MaxMessageSize=".to_vec();
        expected_error_text.extend(MAX_MESSAGE_SIZE.to_string().into_bytes());
        assert_eq!(reject.text,expected_error_text);
    }

    // Case 2: the Logon response itself exceeds the limit.
    {
        let (mut test_server,mut client,connection) = TestStream::setup_test_server(build_dictionary());

        let mut engine_logon = new_logon_message();
        engine_logon.max_message_size = MAX_MESSAGE_SIZE;
        client.send_message_box(connection,Box::new(engine_logon));

        let received_logon = test_server.recv_message::<Logon>();
        assert_eq!(received_logon.msg_seq_num,1);
        assert_eq!(received_logon.max_message_size,MAX_MESSAGE_SIZE);

        let mut logon_response = new_fixt_message!(Logon);
        logon_response.encrypt_method = received_logon.encrypt_method.clone();
        logon_response.heart_bt_int = received_logon.heart_bt_int;
        logon_response.default_appl_ver_id = received_logon.default_appl_ver_id;

        // Keep appending MsgType groups until the serialized response is too big.
        loop {
            if message_length(&logon_response) > MAX_MESSAGE_SIZE {
                break;
            }

            let mut group = MsgTypeGrp::new();
            group.ref_msg_type = b"L".to_vec();
            group.ref_appl_ver_id = Some(MessageVersion::FIX50SP1);
            group.msg_direction = MsgDirection::Send;
            logon_response.no_msg_types.push(Box::new(group));
        }
        test_server.send_message(logon_response);

        // The engine reports a logon parse error caused by the message size.
        engine_poll_event!(client,EngineEvent::ConnectionTerminated(terminated_connection,reason) => {
            assert_eq!(terminated_connection,connection);
            let is_size_error = match reason {
                ConnectionTerminatedReason::LogonParseError(ParseError::MessageSizeTooBig) => true,
                _ => false,
            };
            assert!(is_size_error);
        });
    }
}
/// Floods the engine with TestRequests while the server side does not read
/// the responses. The first scenario expects the engine to stop producing
/// events and to resume once the server reads again; the second expects the
/// engine to terminate the connection with `SocketNotWritableTimeoutError`.
#[test]
fn test_block_read_when_write_blocks() {
    define_dictionary!(
        Logon,
        Heartbeat,
        Reject,
        ResendRequest,
        TestRequest,
    );

    // Scenario 1: the engine stops taking in messages and later recovers.
    {
        let (mut test_server,client,_) = TestStream::setup_test_server_and_logon(build_dictionary());

        // The engine is shared with a background thread; this handle stays in
        // scope after the thread has finished.
        let client = Arc::new(Mutex::new(client));
        let thread_client = Arc::clone(&client);
        let thread_running = Arc::new(AtomicBool::new(true));
        let thread_running_flag = Arc::clone(&thread_running);

        // Drain engine events until a poll with a two second timeout yields
        // nothing, then clear the running flag.
        let thread_handle = thread::spawn(move || {
            let mut client = thread_client.lock().unwrap();
            loop {
                let event = match client.poll(Duration::from_secs(2)) {
                    Some(event) => event,
                    None => break,
                };
                if let EngineEvent::ConnectionTerminated(_,_) = event {
                    panic!("Engine should not have terminated connection yet.");
                }
            }

            thread_running_flag.store(false,Ordering::Relaxed);
        });

        // Send TestRequests until a send times out, then wait for the thread
        // to clear the flag.
        let mut outbound_msg_seq_num = 2;
        let started = Instant::now();
        let mut stop_writing = false;
        loop {
            if !thread_running.load(Ordering::Relaxed) {
                thread_handle.join().expect("Thread must be stopped.");
                break;
            }
            if started.elapsed() > Duration::from_secs(15) {
                panic!("Engine never blocked receiving of new messages.");
            }
            if stop_writing {
                continue;
            }

            let mut request = new_fixt_message!(TestRequest);
            request.msg_seq_num = outbound_msg_seq_num;
            request.test_req_id = b"test".to_vec();
            let send_result = test_server.send_message_with_timeout(request,Duration::from_millis(10));
            if let Err(bytes_not_written) = send_result {
                // A failed send ends the flood. The sequence number is only
                // consumed when no bytes were left unwritten.
                stop_writing = true;
                if bytes_not_written > 0 {
                    continue;
                }
            }
            outbound_msg_seq_num += 1;
        }

        // Read Heartbeats until the one matching the last counted TestRequest.
        while test_server.recv_message::<Heartbeat>().msg_seq_num != outbound_msg_seq_num - 1 {}

        // NOTE(review): these bytes presumably terminate a partially written
        // message left over from the flood -- confirm.
        let _ = test_server.stream.write(b"\x0110=000\x01=000");

        // A fresh TestRequest must be answered with a matching Heartbeat.
        let mut final_request = new_fixt_message!(TestRequest);
        final_request.msg_seq_num = outbound_msg_seq_num + 1;
        final_request.test_req_id = b"final".to_vec();
        test_server.send_message(final_request);

        // If the first message received is not a Heartbeat, the next one must be.
        let first_response = test_server.recv_fixt_message();
        let heartbeat = match message_to_enum(&*first_response) {
            MessageEnum::Heartbeat(heartbeat) => heartbeat,
            _ => test_server.recv_message::<Heartbeat>(),
        };
        assert_eq!(heartbeat.test_req_id,b"final");
    }

    // Scenario 2: the responses are never read, so the engine must disconnect.
    {
        let (mut test_server,mut client,_) = TestStream::setup_test_server_and_logon(build_dictionary());

        let mut outbound_msg_seq_num = 2;
        let started = Instant::now();
        let mut stop_writing = false;
        loop {
            if started.elapsed() > Duration::from_secs(30) {
                panic!("Engine never disconnected.");
            }

            // Poll without waiting; only a termination event ends the loop.
            if let Some(EngineEvent::ConnectionTerminated(_,reason)) = client.poll(Duration::from_millis(0)) {
                let is_write_timeout = match reason {
                    ConnectionTerminatedReason::SocketNotWritableTimeoutError => true,
                    _ => false,
                };
                assert!(is_write_timeout);
                assert!(test_server.is_stream_closed(Duration::from_secs(3)));
                break;
            }

            if stop_writing {
                continue;
            }

            // Keep sending until a send fails.
            let mut request = new_fixt_message!(TestRequest);
            request.msg_seq_num = outbound_msg_seq_num;
            request.test_req_id = b"test".to_vec();
            if test_server.send_message_with_timeout(request,Duration::from_millis(10)).is_err() {
                stop_writing = true;
            }
            outbound_msg_seq_num += 1;
        }
    }
}
/// Sends the same ResendRequest (BeginSeqNo 2, EndSeqNo 0) repeatedly and
/// checks that, after AUTO_DISCONNECT_AFTER_INBOUND_RESEND_REQUEST_LOOP_COUNT
/// answered requests, one more makes the engine log out and terminate the
/// connection with `InboundResendRequestLoopError`.
#[test]
fn test_inbound_resend_loop_detection() {
    define_dictionary!(
        Logon,
        Logout,
        Heartbeat,
        ResendRequest,
        SequenceReset,
        TestRequest,
    );

    let (mut test_server,mut client,connection) = TestStream::setup_test_server_and_logon(build_dictionary());

    // One TestRequest/Heartbeat exchange so the engine has sent a message with
    // MsgSeqNum 2 for the ResendRequests below to ask for.
    let mut test_request = new_fixt_message!(TestRequest);
    test_request.msg_seq_num = 2;
    test_request.test_req_id = b"test".to_vec();
    test_server.send_message(test_request);
    engine_poll_message!(client,connection,TestRequest);

    let heartbeat = test_server.recv_message::<Heartbeat>();
    assert_eq!(heartbeat.msg_seq_num,2);
    assert_eq!(heartbeat.test_req_id,b"test");

    // Every ResendRequest in this test differs only in its own MsgSeqNum.
    let new_resend_request = |msg_seq_num: u64| {
        let mut request = new_fixt_message!(ResendRequest);
        request.msg_seq_num = msg_seq_num;
        request.begin_seq_no = 2;
        request.end_seq_no = 0;
        request
    };

    // Requests up to the loop count are each answered with a gap fill.
    const BASE_MSG_SEQ_NUM: u64 = 3;
    for attempt in 0..AUTO_DISCONNECT_AFTER_INBOUND_RESEND_REQUEST_LOOP_COUNT {
        test_server.send_message(new_resend_request(BASE_MSG_SEQ_NUM + attempt));

        engine_gap_fill_resend_request!(client,connection,2..3);
        let _ = engine_poll_message!(client,connection,ResendRequest);

        let sequence_reset = test_server.recv_message::<SequenceReset>();
        assert_eq!(sequence_reset.gap_fill_flag,true);
        assert_eq!(sequence_reset.new_seq_no,3);
        assert_eq!(sequence_reset.msg_seq_num,2);
    }

    // One more identical request is treated as a loop.
    test_server.send_message(new_resend_request(BASE_MSG_SEQ_NUM + AUTO_DISCONNECT_AFTER_INBOUND_RESEND_REQUEST_LOOP_COUNT));

    let logout = test_server.recv_message::<Logout>();
    assert_eq!(logout.text,b"Detected ResendRequest loop for BeginSeqNo 2".to_vec());

    engine_poll_event!(client,EngineEvent::ConnectionTerminated(terminated_connection,reason) => {
        assert_eq!(terminated_connection,connection);
        let is_loop_error = match reason {
            ConnectionTerminatedReason::InboundResendRequestLoopError => true,
            _ => false,
        };
        assert!(is_loop_error);
    });
    assert!(test_server.is_stream_closed(Duration::from_secs(3)));
}