use crate::common::actions::when;
use crate::common::assertions::{assert_msg_type, then};
use crate::common::cleanup::finally;
use crate::common::setup::{HEARTBEAT_INTERVAL, given_an_active_session};
use crate::common::test_messages::{
TestMessage, build_execution_report_with_incorrect_body_length, build_invalid_resend_request,
};
use hotfix::message::ResendRequest;
use hotfix::session::Status;
use hotfix_message::fix44::{GAP_FILL_FLAG, MSG_TYPE, MsgType, NEW_SEQ_NO, ORDER_ID};
use hotfix_message::{FieldType, Part};
use std::time::Duration;
#[tokio::test]
async fn test_message_sequence_number_too_high() {
let (mut session, mut counterparty) = given_an_active_session().await;
when(&mut counterparty)
.has_previously_sent(TestMessage::dummy_execution_report())
.await;
when(&mut counterparty)
.sends_message(TestMessage::dummy_execution_report())
.await;
then(&mut session)
.status_changes_to(Status::AwaitingResend {
begin: 2,
end: 3,
attempts: 1,
})
.await;
then(&mut counterparty)
.receives(|msg| assert_msg_type(msg, MsgType::ResendRequest))
.await;
when(&mut counterparty).resends_message(2).await; when(&mut counterparty).resends_message(3).await; then(&mut session).status_changes_to(Status::Active).await;
finally(&session, &mut counterparty).disconnect().await;
}
#[tokio::test]
async fn test_infinite_resend_requests_are_prevented() {
let (mut session, mut counterparty) = given_an_active_session().await;
let garbled_message_seq_num = counterparty.next_target_sequence_number();
let garbled_message =
build_execution_report_with_incorrect_body_length(garbled_message_seq_num);
when(&mut counterparty)
.sends_raw_message(garbled_message)
.await;
when(&mut counterparty)
.sends_message(TestMessage::dummy_execution_report())
.await;
then(&mut counterparty)
.receives(|msg| assert_msg_type(msg, MsgType::ResendRequest))
.await;
then(&mut session)
.status_changes_to(Status::AwaitingResend {
begin: garbled_message_seq_num,
end: garbled_message_seq_num + 1,
attempts: 1,
})
.await;
for attempts in 2..4 {
when(&mut counterparty)
.resends_message_without_modification(garbled_message_seq_num)
.await;
when(&mut counterparty)
.resends_message(garbled_message_seq_num + 1)
.await;
then(&mut session)
.status_changes_to(Status::AwaitingResend {
begin: garbled_message_seq_num,
end: garbled_message_seq_num + 1,
attempts,
})
.await;
then(&mut counterparty)
.receives(|msg| assert_msg_type(msg, MsgType::ResendRequest))
.await;
}
when(&mut counterparty)
.resends_message_without_modification(garbled_message_seq_num)
.await;
when(&mut counterparty)
.resends_message(garbled_message_seq_num + 1)
.await;
then(&mut counterparty).gets_disconnected().await;
}
#[tokio::test]
async fn test_resent_message_previously_received_is_ignored() {
let (mut session, mut counterparty) = given_an_active_session().await;
when(&mut counterparty)
.sends_message(TestMessage::dummy_execution_report())
.await;
then(&mut session)
.receives(|msg| {
let msg_type: &str = msg.header().get(MSG_TYPE).unwrap();
assert_eq!(msg_type, MsgType::ExecutionReport.to_string());
})
.await;
then(&mut session).target_sequence_number_reaches(2).await;
when(&mut counterparty).resends_message(2).await;
let new_report_order_id = "xxx".to_string();
when(&mut counterparty)
.sends_message(TestMessage::dummy_execution_report_with_order_id(
new_report_order_id.clone(),
))
.await;
then(&mut session)
.receives(|msg| {
let msg_type: &str = msg.header().get(MSG_TYPE).unwrap();
assert_eq!(msg_type, MsgType::ExecutionReport.to_string());
let order_id: &str = msg.get(ORDER_ID).unwrap();
assert_eq!(order_id, &new_report_order_id);
})
.await;
finally(&session, &mut counterparty).disconnect().await;
}
#[tokio::test]
async fn test_invalid_resend_request_gets_rejected() {
for (begin_seq_no, end_seq_no) in [(None, Some(2)), (Some(1), None)] {
let (session, mut counterparty) = given_an_active_session().await;
let seq_num = counterparty.next_target_sequence_number();
let invalid_resend_request =
build_invalid_resend_request(seq_num, begin_seq_no, end_seq_no);
when(&mut counterparty)
.sends_raw_message(invalid_resend_request)
.await;
then(&mut counterparty)
.receives(|msg| assert_msg_type(msg, MsgType::Reject))
.await;
finally(&session, &mut counterparty).disconnect().await;
}
}
#[tokio::test(start_paused = true)]
async fn test_resend_request_with_gap_fill_for_admin_messages() {
let (session, mut counterparty) = given_an_active_session().await;
when(Duration::from_secs(HEARTBEAT_INTERVAL + 1))
.elapses()
.await;
then(&mut counterparty)
.receives(|msg| assert_msg_type(msg, MsgType::Heartbeat))
.await;
when(&session)
.sends_message(TestMessage::dummy_execution_report())
.await;
then(&mut counterparty)
.receives(|msg| assert_msg_type(msg, MsgType::ExecutionReport))
.await;
let resend_request = ResendRequest::new(2, 3);
when(&mut counterparty).sends_message(resend_request).await;
then(&mut counterparty)
.receives(|msg| {
assert_msg_type(msg, MsgType::SequenceReset);
assert_eq!(msg.get::<&str>(GAP_FILL_FLAG).unwrap(), "Y");
assert_eq!(
msg.header()
.get::<u64>(hotfix_message::fix44::MSG_SEQ_NUM)
.unwrap(),
2
);
assert_eq!(msg.get::<u64>(NEW_SEQ_NO).unwrap(), 3);
})
.await;
then(&mut counterparty)
.receives(|msg| assert_msg_type(msg, MsgType::ExecutionReport))
.await;
finally(&session, &mut counterparty).disconnect().await;
}
#[tokio::test]
async fn test_resend_request_not_deadlocked_when_both_sides_detect_gap() {
let (mut session, mut counterparty) = given_an_active_session().await;
when(&mut counterparty)
.has_previously_sent(TestMessage::dummy_execution_report())
.await;
when(&mut counterparty)
.sends_message(TestMessage::dummy_execution_report())
.await;
then(&mut session)
.status_changes_to(Status::AwaitingResend {
begin: 2,
end: 3,
attempts: 1,
})
.await;
then(&mut counterparty)
.receives(|msg| assert_msg_type(msg, MsgType::ResendRequest))
.await;
let resend_request = ResendRequest::new(1, 0);
when(&mut counterparty).sends_message(resend_request).await;
then(&mut counterparty)
.receives(|msg| {
let msg_type: &str = msg.header().get(MSG_TYPE).unwrap();
assert!(
msg_type == MsgType::SequenceReset.to_string()
|| msg_type == MsgType::ExecutionReport.to_string(),
"expected SequenceReset or resent message in response to ResendRequest, got {msg_type}"
);
})
.await;
when(&mut counterparty).resends_message(2).await;
when(&mut counterparty).resends_message(3).await;
then(&mut session).status_changes_to(Status::Active).await;
finally(&session, &mut counterparty).disconnect().await;
}