use std::{
cell::{Cell, RefCell},
rc::Rc,
time::{Duration, Instant},
};
use easyfix_messages::{
fields::{
DefaultApplVerId, EncryptMethod, FixStr, FixString, Int, MsgType, SeqNum,
SessionRejectReason, SessionStatus, ToFixString, Utc, UtcTimestamp,
},
messages::{
FieldTag, FixtMessage, Header, Heartbeat, Logon, Logout, Message, MsgCat, Reject,
ResendRequest, SequenceReset, TestRequest,
},
};
use tracing::{debug, error, info, instrument, trace, warn};
use crate::{
DisconnectReason, Sender,
application::{DeserializeError, Emitter, FixEventInternal, InputResponderMsg, Responder},
messages_storage::MessagesStorage,
new_header, new_trailer,
session_id::SessionId,
session_state::State,
settings::{SessionSettings, Settings},
};
#[derive(Debug, thiserror::Error)]
enum VerifyError {
#[error("Message already received")]
Duplicate,
#[error("Too high target sequence number {msg_seq_num}")]
ResendRequest { msg_seq_num: SeqNum },
#[error("Reject due to {reason:?} (tag={tag:?}, disconnect_reason={disconnect_reason:?})")]
Reject {
reason: SessionRejectReason,
tag: Option<FieldTag>,
disconnect_reason: Option<DisconnectReason>,
},
#[error("Invalid logon state")]
InvalidLogonState,
#[error("MsgSeqNum too low, expected {next_target_msg_seq_num}, got {msg_seq_num}")]
SeqNumTooLow {
msg_seq_num: SeqNum,
next_target_msg_seq_num: SeqNum,
},
#[error("Rejected by application ({reason:?}: {text})")]
ApplicationForcedReject {
ref_msg_type: FixString,
ref_seq_num: SeqNum,
reason: SessionRejectReason,
text: FixString,
ref_tag_id: Option<i64>,
},
#[error("Rejected by application with Logout<5> ({})", .text.as_ref().map(FixString::as_utf8).unwrap_or_default())]
ApplicationForcedLogout {
session_status: Option<SessionStatus>,
text: Option<FixString>,
disconnect: bool,
},
#[error("Disconnected by application: {reason:?}")]
ApplicationForcedDisconnect { reason: Option<String> },
#[error("Message processing aborted by application")]
ApplicationAbortedProcessing,
}
impl VerifyError {
fn invalid_time() -> VerifyError {
VerifyError::Reject {
reason: SessionRejectReason::SendingtimeAccuracyProblem,
tag: Some(FieldTag::SendingTime),
disconnect_reason: None,
}
}
fn invalid_comp_id(field_tag: FieldTag) -> VerifyError {
VerifyError::Reject {
reason: SessionRejectReason::CompidProblem,
tag: Some(field_tag),
disconnect_reason: Some(DisconnectReason::InvalidCompId),
}
}
fn target_seq_num_too_high(msg_seq_num: SeqNum) -> VerifyError {
VerifyError::ResendRequest { msg_seq_num }
}
fn missing_orig_time() -> VerifyError {
VerifyError::Reject {
reason: SessionRejectReason::RequiredTagMissing,
tag: Some(FieldTag::OrigSendingTime),
disconnect_reason: None,
}
}
fn invalid_orig_time() -> VerifyError {
VerifyError::Reject {
reason: SessionRejectReason::SendingtimeAccuracyProblem,
tag: Some(FieldTag::OrigSendingTime),
disconnect_reason: Some(DisconnectReason::InvalidOrigSendingTime),
}
}
}
impl From<InputResponderMsg> for VerifyError {
fn from(msg: InputResponderMsg) -> VerifyError {
match msg {
InputResponderMsg::Ignore => VerifyError::ApplicationAbortedProcessing,
InputResponderMsg::Reject {
ref_msg_type,
ref_seq_num,
reason,
text,
ref_tag_id,
} => VerifyError::ApplicationForcedReject {
ref_msg_type,
ref_seq_num,
reason,
text,
ref_tag_id,
},
InputResponderMsg::Logout {
session_status,
text,
disconnect,
} => VerifyError::ApplicationForcedLogout {
session_status,
text,
disconnect,
},
InputResponderMsg::Disconnect { reason } => {
VerifyError::ApplicationForcedDisconnect { reason }
}
}
}
}
trait MessageExt {
fn resend_as_gap_fill(&self) -> bool;
}
impl MessageExt for FixtMessage {
fn resend_as_gap_fill(&self) -> bool {
matches!(self.msg_cat(), MsgCat::Admin) && !matches!(self.msg_type(), MsgType::Reject)
}
}
#[derive(Debug)]
pub(crate) struct Session<S> {
state: Rc<RefCell<State<S>>>,
sender: Sender,
settings: Settings,
session_settings: SessionSettings,
emitter: Emitter,
heartbeat_interval: Cell<u64>,
disconnect_notify: RefCell<Option<tokio::sync::oneshot::Sender<()>>>,
}
impl<S: MessagesStorage> Session<S> {
pub(crate) fn new(
settings: Settings,
session_settings: SessionSettings,
state: Rc<RefCell<State<S>>>,
sender: Sender,
emitter: Emitter,
disconnect_notify_tx: tokio::sync::oneshot::Sender<()>,
) -> Session<S> {
let heartbeat_interval = settings
.heartbeat_interval
.unwrap_or(settings.auto_disconnect_after_no_logout.as_secs());
Session {
state,
settings,
session_settings,
sender,
emitter,
heartbeat_interval: Cell::new(heartbeat_interval),
disconnect_notify: RefCell::new(Some(disconnect_notify_tx)),
}
}
pub fn session_id(&self) -> &SessionId {
&self.session_settings.session_id
}
pub(crate) fn state(&self) -> &Rc<RefCell<State<S>>> {
&self.state
}
pub fn is_logged_on(state: &State<S>) -> bool {
state.logon_received() && state.logon_sent()
}
pub fn is_logon_time(&self, time: UtcTimestamp) -> bool {
self.session_settings
.logon_time
.contains(&time.timestamp().time())
}
fn check_sending_time(&self, sending_time: UtcTimestamp) -> Result<(), VerifyError> {
let Some(max_latency) = self.session_settings.max_latency else {
return Ok(());
};
let max_latency = chrono::Duration::from_std(max_latency).expect("duration");
let now = Utc::now();
let sending_timestamp = sending_time.timestamp();
let abs_time_diff = if now > sending_timestamp {
now - sending_timestamp
} else {
sending_timestamp - now
};
if abs_time_diff > max_latency {
warn!(
?abs_time_diff,
?max_latency,
"SendingTime<52> verification failed"
);
Err(VerifyError::invalid_time())
} else {
Ok(())
}
}
fn is_target_too_high(state: &State<S>, msg_seq_num: SeqNum) -> bool {
msg_seq_num > state.next_target_msg_seq_num()
}
fn is_target_too_low(state: &State<S>, msg_seq_num: SeqNum) -> bool {
msg_seq_num < state.next_target_msg_seq_num()
}
fn check_comp_id(
&self,
sender_comp_id: &FixStr,
target_comp_id: &FixStr,
) -> Result<(), VerifyError> {
if !self.session_settings.check_comp_id {
Ok(())
} else if self.session_settings.session_id.sender_comp_id() != target_comp_id {
Err(VerifyError::invalid_comp_id(FieldTag::TargetCompId))
} else if self.session_settings.session_id.target_comp_id() != sender_comp_id {
Err(VerifyError::invalid_comp_id(FieldTag::SenderCompId))
} else {
Ok(())
}
}
fn should_send_reset(&self, state: &State<S>) -> bool {
(self.session_settings.reset_on_logon
|| self.session_settings.reset_on_logout
|| self.session_settings.reset_on_disconnect)
&& state.next_target_msg_seq_num() == 1
&& state.next_sender_msg_seq_num() == 1
}
#[instrument(skip_all, err)]
fn check_logon_state(state: &State<S>, msg_type: MsgType) -> Result<(), VerifyError> {
if (msg_type == MsgType::Logon && state.reset_sent()) || state.reset_received() {
trace!("Allowed: Logon with ResetSeqNumFlag(141)=Y sent or received");
Ok(())
} else if msg_type == MsgType::Logon && !state.logon_received() {
trace!("Allowed: First Logon in session (Logon not received yet)");
Ok(())
} else if msg_type != MsgType::Logon && state.logon_received() {
trace!("Allowed: Message after Logon received");
Ok(())
} else if msg_type == MsgType::Logout && state.logon_sent() {
trace!("Allowed: Logout after Logon sent");
Ok(())
} else if msg_type != MsgType::Logout && state.logout_sent_time().is_some() {
trace!("Allowed: Message after Logout sent");
Ok(())
} else if msg_type == MsgType::SequenceReset {
trace!("Allowed: SequenceReset<4>");
Ok(())
} else if msg_type == MsgType::Reject {
trace!("Allowed: Reject<3>");
Ok(())
} else {
warn!(
state.reset_sent = state.reset_sent(),
state.reset_received = state.reset_received(),
state.logon_received = state.logon_received(),
state.logon_sent = state.logon_sent(),
state.logout_sent = ?state.logout_sent_time(),
"Not allowed: Invalid session state",
);
Err(VerifyError::InvalidLogonState)
}
}
#[instrument(skip_all, err)]
#[expect(clippy::await_holding_refcell_ref)]
async fn verify(
&self,
msg: Box<FixtMessage>,
check_too_high: bool,
check_too_low: bool,
) -> Result<(), VerifyError> {
let msg_type = msg.header.msg_type;
let sender_comp_id = &msg.header.sender_comp_id;
let target_comp_id = &msg.header.target_comp_id;
let sending_time = msg.header.sending_time;
let msg_seq_num = msg.header.msg_seq_num;
let mut state = self.state.borrow_mut();
let reset_received = state.reset_received();
Self::check_logon_state(&state, msg.header.msg_type)?;
self.check_sending_time(sending_time)?;
self.check_comp_id(sender_comp_id, target_comp_id)?;
if check_too_high && !reset_received && Self::is_target_too_high(&state, msg_seq_num) {
warn!(
"Target MsgSeqNum too high, expected {}, got {msg_seq_num}",
state.next_target_msg_seq_num()
);
state.enqueue_msg(msg);
Err(VerifyError::target_seq_num_too_high(msg_seq_num))
} else if check_too_low && !reset_received && Self::is_target_too_low(&state, msg_seq_num) {
if msg.header.poss_dup_flag.unwrap_or(false) {
if msg_type != MsgType::SequenceReset {
let Some(orig_sending_time) = msg.header.orig_sending_time else {
warn!("Target too low (orig sending time missing)");
return Err(VerifyError::missing_orig_time());
};
if orig_sending_time.timestamp() > sending_time.timestamp() {
error!("Target too low (invalid orig sending time)");
return Err(VerifyError::invalid_orig_time());
}
}
warn!("Target too low (duplicate)");
Err(VerifyError::Duplicate)
} else {
error!(
expected_msg_seq_num = state.next_target_msg_seq_num(),
"Target too low"
);
Err(VerifyError::SeqNumTooLow {
msg_seq_num,
next_target_msg_seq_num: state.next_target_msg_seq_num(),
})
}
} else {
if let Some(resend_range) = state.resend_range()
&& check_too_high
{
let begin_seq_num = *resend_range.start();
let end_seq_num = *resend_range.end();
if msg_seq_num >= end_seq_num {
info!(
begin_seq_num,
end_seq_num, "Resend request has been satisfied"
);
state.reset_resend_range();
}
}
drop(state);
let (sender, receiver) = tokio::sync::oneshot::channel();
match msg.msg_cat() {
MsgCat::Admin => {
self.emitter
.send(FixEventInternal::AdmMsgIn(Some(msg), Some(sender)))
.await
}
MsgCat::App => {
self.emitter
.send(FixEventInternal::AppMsgIn(Some(msg), Some(sender)))
.await
}
}
if let Ok(input_responder_message) = receiver.await {
return Err(input_responder_message.into());
}
Ok(())
}
}
pub(crate) fn send_logon_request(&self, state: &mut State<S>) {
if self.session_settings.reset_on_logon {
state.reset();
}
self.send(Box::new(Message::Logon(Logon {
encrypt_method: EncryptMethod::NoneOther,
heart_bt_int: self.heartbeat_interval.get().try_into().unwrap_or(Int::MAX),
reset_seq_num_flag: self.should_send_reset(state).then_some(true),
next_expected_msg_seq_num: if self.session_settings.enable_next_expected_msg_seq_num {
let next_expected_msg_seq_num = state.next_sender_msg_seq_num();
state.set_last_expected_logon_next_seq_num(next_expected_msg_seq_num);
Some(next_expected_msg_seq_num)
} else {
None
},
default_appl_ver_id: DefaultApplVerId::Fix50Sp2,
..Default::default()
})));
}
fn send_logon_response(&self, state: &mut State<S>, next_expected_msg_seq_num: Option<SeqNum>) {
if self.session_settings.reset_on_logon {
state.reset();
}
self.send(Box::new(Message::Logon(Logon {
encrypt_method: EncryptMethod::NoneOther,
heart_bt_int: self.heartbeat_interval.get().try_into().unwrap_or(Int::MAX),
reset_seq_num_flag: self.should_send_reset(state).then_some(true),
next_expected_msg_seq_num,
default_appl_ver_id: DefaultApplVerId::Fix50Sp2,
..Default::default()
})));
state.set_last_received_time(Instant::now());
state.set_logon_sent(true);
}
pub(crate) fn send_logout(
&self,
state: &mut State<S>,
session_status: Option<SessionStatus>,
text: Option<FixString>,
) {
#[allow(clippy::needless_update)]
self.send(Box::new(Message::Logout(Logout {
session_status,
text,
..Default::default()
})));
state.set_logout_sent_time(true);
}
fn send_reject(
&self,
state: &mut State<S>,
ref_msg_type: Option<FixString>,
ref_seq_num: SeqNum,
reason: SessionRejectReason,
text: FixString,
ref_tag_id: Option<i64>,
) {
if !matches!(
ref_msg_type.as_deref().and_then(MsgType::from_fix_str),
Some(MsgType::Logon) | Some(MsgType::SequenceReset)
) && ref_seq_num == state.next_target_msg_seq_num()
{
state.incr_next_target_msg_seq_num();
}
info!("Message {ref_seq_num} Rejected: {reason:?} (tag={ref_tag_id:?})");
self.send(Box::new(Message::Reject(Reject {
ref_seq_num,
ref_tag_id,
ref_msg_type,
session_reject_reason: Some(reason),
text: Some(text),
..Default::default()
})));
}
fn send_sequence_reset(&self, seq_num: SeqNum, new_seq_num: SeqNum) {
let mut sequence_reset = Box::new(FixtMessage {
header: Box::new(new_header(MsgType::SequenceReset)),
body: Box::new(Message::SequenceReset(SequenceReset {
gap_fill_flag: Some(true),
new_seq_no: new_seq_num,
})),
trailer: Box::new(new_trailer()),
});
sequence_reset.header.msg_seq_num = seq_num;
sequence_reset.header.poss_dup_flag = Some(true);
sequence_reset.header.sending_time = UtcTimestamp::now();
sequence_reset.header.orig_sending_time = Some(sequence_reset.header.sending_time);
info!(seq_num, new_seq_num, "SequenceReset sent (gap fill)");
self.send_raw(sequence_reset);
}
#[instrument(level = "trace", skip_all, fields(too_high_msg_seq_num))]
fn send_resend_request(&self, state: &mut State<S>, too_high_msg_seq_num: SeqNum) {
let begin_seq_no = state.next_target_msg_seq_num();
let mut end_seq_no = too_high_msg_seq_num.saturating_sub(1);
if let Some(queued_lowest) = state.lowest_queued_seq_num()
&& queued_lowest > begin_seq_no
{
let new_end_seq_no = queued_lowest.saturating_sub(1);
if new_end_seq_no < end_seq_no {
trace!(
new_end_seq_no = queued_lowest,
prev_end_seq_no = end_seq_no,
"clamping resend request upper bound to queued gap"
);
end_seq_no = new_end_seq_no;
}
}
if begin_seq_no > end_seq_no {
trace!(
begin_seq_no,
"ResendRequest suppressed; queued messages cover the gap"
);
return;
}
trace!(begin_seq_no, end_seq_no);
self.send(Box::new(Message::ResendRequest(ResendRequest {
begin_seq_no,
end_seq_no,
})));
state.set_resend_range(begin_seq_no..=end_seq_no);
}
fn send(&self, msg: Box<Message>) {
if let Err(msg) = self.sender.send(msg) {
unreachable!(
"Can't send message {:?}/{} - output stream is closed",
msg.msg_type(),
msg.header.msg_seq_num
);
}
}
fn send_raw(&self, msg: Box<FixtMessage>) {
if let Err(msg) = self.sender.send_raw(msg) {
unreachable!(
"Can't send message {:?}/{} - output stream is closed",
msg.msg_type(),
msg.header.msg_seq_num
);
}
}
#[expect(clippy::await_holding_refcell_ref)]
#[instrument(skip_all)]
pub(crate) async fn emit_logout(&self, reason: DisconnectReason) {
info!(?reason);
let mut state = self.state.borrow_mut();
if state.logon_received() || state.logon_sent() {
state.set_logon_received(false);
state.set_logon_sent(false);
drop(state);
self.emitter
.send(FixEventInternal::Logout(
self.session_settings.session_id.clone(),
reason,
))
.await;
} else {
info!(
"Logout not emitted: session was never established \
(neither Logon received nor Logon sent)"
);
}
}
#[instrument(
skip_all,
fields(?reason, reset = self.session_settings.reset_on_disconnect),
ret
)]
pub(crate) fn disconnect(&self, state: &mut State<S>, reason: DisconnectReason) {
if state.disconnected() {
info!("already disconnected");
return;
}
state.disconnect(self.session_settings.reset_on_disconnect);
self.sender.disconnect(reason);
if let Some(tx) = self.disconnect_notify.borrow_mut().take() {
let _ = tx.send(());
}
}
#[instrument(level = "trace", skip_all)]
fn resend_range(&self, state: &mut State<S>, begin_seq_num: SeqNum, mut end_seq_num: SeqNum) {
info!("resend range: ({begin_seq_num}, {end_seq_num})");
let next_sender_msg_seq_num = state.next_sender_msg_seq_num();
if end_seq_num == 0 || end_seq_num >= next_sender_msg_seq_num {
end_seq_num = next_sender_msg_seq_num - 1;
info!("adjust end_seq_num to {end_seq_num}");
}
if !self.session_settings.persist {
let next_sender_msg_seq_num = state.next_sender_msg_seq_num();
end_seq_num += 1;
if end_seq_num > next_sender_msg_seq_num {
end_seq_num = next_sender_msg_seq_num;
}
self.send_sequence_reset(begin_seq_num, end_seq_num);
return;
}
let mut gap_fill_range = None;
info!("fetch messages range from {begin_seq_num} to {end_seq_num}");
for msg_str in state.fetch_range(begin_seq_num..=end_seq_num) {
let mut msg = match FixtMessage::from_bytes(msg_str) {
Ok(msg) => msg,
Err(err) => {
error!(%err, "Failed to decode message bytes");
continue;
}
};
if msg.resend_as_gap_fill() {
trace!(
"Message {:?}/{} changed to gap fill",
msg.msg_type(),
msg.header.msg_seq_num
);
gap_fill_range
.get_or_insert((msg.header.msg_seq_num, msg.header.msg_seq_num - 1))
.1 += 1;
} else {
if let Some((begin_seq_num, end_seq_num)) = gap_fill_range.take() {
trace!("Resending messages from {begin_seq_num} to {end_seq_num} as gap fill");
self.send_sequence_reset(begin_seq_num, end_seq_num + 1);
}
trace!(
"Resending message {:?}/{}",
msg.msg_type(),
msg.header.msg_seq_num
);
msg.header.orig_sending_time = Some(msg.header.sending_time);
msg.header.sending_time = UtcTimestamp::MIN_UTC;
msg.header.poss_dup_flag = Some(true);
self.send_raw(msg);
}
}
if let Some((begin_seq_num, end_seq_num)) = gap_fill_range {
info!("Resending messages from {begin_seq_num} to {end_seq_num} as gap fill");
self.send_sequence_reset(begin_seq_num, end_seq_num + 1);
}
}
async fn on_heartbeat(&self, message: Box<FixtMessage>) -> Result<(), VerifyError> {
trace!("got heartbeat");
let Message::Heartbeat(ref heartbeat) = *message.body else {
unreachable!();
};
let test_req_id = if self.session_settings.verify_test_request_id {
heartbeat.test_req_id.clone()
} else {
None
};
self.verify(message, true, true).await?;
let mut state = self.state.borrow_mut();
if let Some(test_req_id) = test_req_id {
state.validate_grace_period_test_req_id(&test_req_id);
}
state.incr_next_target_msg_seq_num();
Ok(())
}
async fn on_test_request(&self, message: Box<FixtMessage>) -> Result<(), VerifyError> {
trace!("on_test_request");
let Message::TestRequest(ref test_request) = *message.body else {
unreachable!();
};
let test_req_id = test_request.test_req_id.clone();
self.verify(message, true, true).await?;
trace!("Send Heartbeat");
self.send(Box::new(Message::Heartbeat(Heartbeat {
test_req_id: Some(test_req_id),
})));
self.state.borrow_mut().incr_next_target_msg_seq_num();
Ok(())
}
async fn on_resend_request(&self, msg: Box<FixtMessage>) -> Result<(), VerifyError> {
trace!("on_resend_request");
let Message::ResendRequest(ref resend_request) = *msg.body else {
unreachable!();
};
let begin_seq_no = resend_request.begin_seq_no;
let end_seq_no = resend_request.end_seq_no;
let msg_seq_num = msg.header.msg_seq_num;
self.verify(msg, false, true).await?;
info!("Received ResendRequest FROM: {begin_seq_no} TO: {end_seq_no}");
let mut state = self.state.borrow_mut();
self.resend_range(&mut state, begin_seq_no, end_seq_no);
if Self::is_target_too_high(&state, msg_seq_num) {
state.enqueue_msg(Box::new(FixtMessage {
header: Box::new(Header {
msg_seq_num,
..new_header(MsgType::ResendRequest)
}),
body: Box::new(Message::ResendRequest(ResendRequest {
begin_seq_no,
end_seq_no,
})),
trailer: Box::new(new_trailer()),
}));
return Err(VerifyError::ResendRequest { msg_seq_num });
} else if state.next_target_msg_seq_num() == msg_seq_num {
state.incr_next_target_msg_seq_num();
}
Ok(())
}
async fn on_reject(&self, message: Box<FixtMessage>) -> Result<(), VerifyError> {
trace!("on_reject");
self.verify(message, false, true).await?;
self.state.borrow_mut().incr_next_target_msg_seq_num();
Ok(())
}
async fn on_sequence_reset(&self, message: Box<FixtMessage>) -> Result<(), VerifyError> {
trace!("on_sequence_reset");
let Message::SequenceReset(ref sequence_reset) = *message.body else {
unreachable!();
};
let ref_msg_type = message.header.msg_type.as_fix_str().to_owned();
let ref_seq_num = message.header.msg_seq_num;
let is_gap_fill = sequence_reset.gap_fill_flag.unwrap_or(false);
let new_seq_no = sequence_reset.new_seq_no;
self.verify(message, is_gap_fill, is_gap_fill).await?;
let mut state = self.state().borrow_mut();
if new_seq_no > state.next_target_msg_seq_num() {
info!("Set next target MsgSeqNo to {new_seq_no}");
state.set_next_target_msg_seq_num(new_seq_no);
} else if new_seq_no < state.next_sender_msg_seq_num() {
let reject_reason = SessionRejectReason::ValueIsIncorrect;
let tag = FieldTag::NewSeqNo as i64;
let text = format!("{reject_reason:?} (tag={tag}) - NewSeqNum too low");
self.send_reject(
&mut state,
Some(ref_msg_type),
ref_seq_num,
reject_reason,
FixString::from_ascii_lossy(text.into_bytes()),
Some(tag),
);
}
Ok(())
}
async fn on_logout(&self, message: Box<FixtMessage>) -> Result<DisconnectReason, VerifyError> {
if self.session_settings.verify_logout {
self.verify(message, true, true).await?;
} else if let Err(e) = self.verify(message, false, false).await {
error!("logout failed: {e}");
}
let mut state = self.state.borrow_mut();
let disconnect_reason = if state.logout_sent_time().is_some() {
info!("received logout response");
DisconnectReason::LocalRequestedLogout
} else {
info!("received logout request");
self.send_logout(
&mut state,
Some(SessionStatus::SessionLogoutComplete),
Some(FixString::from_ascii_lossy(b"Responding".to_vec())),
);
info!("sending logout response");
DisconnectReason::RemoteRequestedLogout
};
state.incr_next_target_msg_seq_num();
if self.session_settings.reset_on_logout {
state.reset();
}
Ok(disconnect_reason)
}
#[instrument(level = "trace", skip_all, err, ret)]
#[expect(clippy::await_holding_refcell_ref)]
async fn on_logon(
&self,
message: Box<FixtMessage>,
) -> Result<Option<DisconnectReason>, VerifyError> {
let (
enabled,
initiate,
should_send_logon,
reset_received,
reset_sent,
reset_seq_num_flag,
heart_bt_int,
next_expected_msg_seq_num,
) = {
let state = self.state.borrow_mut();
let Message::Logon(ref logon) = *message.body else {
unreachable!()
};
(
state.enabled(),
state.initiate(),
state.should_send_logon(),
state.reset_received(),
state.reset_sent(),
logon.reset_seq_num_flag.unwrap_or(false),
logon.heart_bt_int,
logon.next_expected_msg_seq_num,
)
};
if !enabled {
error!("Session is not enabled for logon");
return Ok(Some(DisconnectReason::InvalidLogonState));
}
if !self.is_logon_time(message.header.sending_time) {
error!("Received logon outside of valid logon time");
return Ok(Some(DisconnectReason::InvalidLogonState));
}
if reset_seq_num_flag {
self.state.borrow_mut().set_reset_received(true);
}
let msg_seq_num = message.header.msg_seq_num;
self.verify(message, false, true).await?;
let enable_next_expected_msg_seq_num =
self.session_settings.enable_next_expected_msg_seq_num
&& next_expected_msg_seq_num.is_some();
if reset_seq_num_flag {
let mut state = self.state.borrow_mut();
state.set_reset_received(true);
info!("Logon contains ResetSeqNumFlag=Y, reseting sequence numbers to 1");
if !state.reset_sent() {
state.reset();
}
} else if reset_sent && msg_seq_num == 1 {
info!("Inferring ResetSeqNumFlag as sequence number is 1 in response to reset request");
self.state.borrow_mut().set_reset_received(true);
}
if should_send_logon && !reset_received {
error!("Received logon response before sending request");
return Ok(Some(DisconnectReason::InvalidLogonState));
}
if !initiate && self.session_settings.reset_on_logon {
self.state.borrow_mut().reset();
}
let mut state = self.state.borrow_mut();
state.set_logon_received(true);
let next_sender_msg_num_at_logon_received = state.next_sender_msg_seq_num();
if enable_next_expected_msg_seq_num
&& let Some(next_expected_msg_seq_num) = next_expected_msg_seq_num
{
let next_sender_msg_seq_num = state.next_sender_msg_seq_num();
if next_expected_msg_seq_num > next_sender_msg_seq_num {
let error_msg = format!(
"NextExpectedMsgSeqNum<789> too high \
(expected {next_sender_msg_seq_num}, \
got {next_expected_msg_seq_num})",
);
error!(error_msg);
let err = FixString::from_ascii_lossy(error_msg.into_bytes());
self.send_logout(
&mut state,
Some(SessionStatus::ReceivedNextExpectedMsgSeqNumTooHigh),
Some(err),
);
return Ok(Some(DisconnectReason::InvalidLogonState));
}
}
let is_logon_in_normal_sequence =
!Self::is_target_too_high(&state, msg_seq_num) || self.session_settings.reset_on_logon;
if !state.initiate() || (state.reset_received() && !state.reset_sent()) {
info!("Received logon request");
if self.settings.heartbeat_interval.is_none() {
if heart_bt_int <= 0 {
return Err(VerifyError::Reject {
reason: SessionRejectReason::ValueIsIncorrect,
tag: Some(FieldTag::HeartBtInt),
disconnect_reason: None,
});
}
self.heartbeat_interval.set(heart_bt_int as u64);
}
if enable_next_expected_msg_seq_num {
let mut next_expected_target_num = state.next_target_msg_seq_num();
if is_logon_in_normal_sequence {
next_expected_target_num += 1;
}
info!("Responding to Logon request with tag 789={next_expected_target_num}");
state.set_last_expected_logon_next_seq_num(next_expected_target_num);
self.send_logon_response(&mut state, Some(next_expected_target_num));
} else {
info!("Responding to Logon request");
self.send_logon_response(&mut state, None);
}
} else {
info!("Received logon response");
}
state.set_reset_sent(false);
state.set_reset_received(false);
let mut ret = Ok(None);
if !is_logon_in_normal_sequence {
if state.is_expected_logon_next_seq_num_sent() {
state.set_reset_range_from_last_expected_logon_next_seq_num();
info!("Required resend will be suppressed as we are setting tag 789");
}
warn!(
"Target MsgSeqNum too high, expected {}, got {msg_seq_num}",
state.next_target_msg_seq_num()
);
state.enqueue_msg(
Box::new(FixtMessage {
header: Box::new(Header {
msg_seq_num,
..new_header(MsgType::Logon)
}),
body: Box::new(Message::Logon(Logon::default())),
trailer: Box::new(new_trailer()),
}),
);
ret = Err(VerifyError::ResendRequest { msg_seq_num });
} else {
state.incr_next_target_msg_seq_num();
}
if enable_next_expected_msg_seq_num
&& let Some(next_expected_msg_seq_num) = next_expected_msg_seq_num
{
if next_expected_msg_seq_num != next_sender_msg_num_at_logon_received {
let mut end_seq_no = next_sender_msg_num_at_logon_received;
if !self.session_settings.persist {
end_seq_no += 1;
let next = state.next_sender_msg_seq_num();
if end_seq_no > next {
end_seq_no = next;
}
info!(
"Received implicit ResendRequest via Logon FROM: {next_expected_msg_seq_num}, \
TO: {next_sender_msg_num_at_logon_received} will be reset"
);
self.send_sequence_reset(next_expected_msg_seq_num, end_seq_no);
} else {
info!(
"Received implicit ResendRequest via Logon FROM: {next_expected_msg_seq_num} \
TO: {next_sender_msg_num_at_logon_received} will be resent"
);
self.resend_range(&mut state, next_expected_msg_seq_num, end_seq_no)
}
}
}
if Self::is_logged_on(&state) {
state.reset_grace_period();
drop(state);
self.emitter
.send(FixEventInternal::Logon(
self.session_settings.session_id.clone(),
Some(self.sender.clone()),
))
.await;
}
ret
}
#[instrument(
name = "on_msg",
level = "trace",
skip_all,
fields(
msg_seq_num = msg.header.msg_seq_num,
msg_type = ?msg.msg_type()
)
)]
#[expect(clippy::await_holding_refcell_ref)]
async fn on_message_in_impl(&self, msg: Box<FixtMessage>) -> Option<DisconnectReason> {
let msg_type = msg.header.msg_type;
let msg_seq_num = msg.header.msg_seq_num;
trace!(msg_type = format!("{msg_type:?}<{}>", msg_type.as_fix_str()));
let result = match *msg.body {
Message::Heartbeat(ref _heartbeat) => self.on_heartbeat(msg).await,
Message::TestRequest(ref _test_request) => self.on_test_request(msg).await,
Message::ResendRequest(ref _resend_request) => self.on_resend_request(msg).await,
Message::Reject(ref _reject) => self.on_reject(msg).await,
Message::SequenceReset(ref _sequence_reset) => self.on_sequence_reset(msg).await,
Message::Logout(ref _logout) => match self.on_logout(msg).await {
Ok(disconnect_reason) => return Some(disconnect_reason),
Err(e) => Err(e),
},
Message::Logon(ref _logon) => match self.on_logon(msg).await {
Ok(Some(disconnect_reason)) => {
return Some(disconnect_reason);
}
Ok(None) => Ok(()),
Err(e) => Err(e),
},
_ => self
.verify(msg, true, true)
.await
.map(|_| self.state.borrow_mut().incr_next_target_msg_seq_num()),
};
match result {
Ok(()) => return None,
Err(VerifyError::Duplicate) => {
}
Err(VerifyError::ResendRequest { msg_seq_num }) => {
if let Some(resend_range) = self.state.borrow().resend_range() {
let begin_seq_num = *resend_range.start();
let end_seq_num = *resend_range.end();
if !self.session_settings.send_redundant_resend_requests
&& msg_seq_num >= begin_seq_num
{
warn!(
begin_seq_num,
end_seq_num,
too_high_msg_seq_num = msg_seq_num,
"ResendRequest already sent, suppressing another attempt"
);
return None;
}
}
self.send_resend_request(&mut self.state.borrow_mut(), msg_seq_num);
}
Err(VerifyError::Reject {
reason,
tag,
disconnect_reason,
}) => {
let mut state = self.state().borrow_mut();
let tag_as_i64 = tag.map(|t| t as i64);
self.send_reject(
&mut state,
Some(msg_type.as_fix_str().to_owned()),
msg_seq_num,
reason,
if let Some(tag) = tag_as_i64 {
FixString::from_ascii_lossy(format!("{reason:?} (tag={tag})").into_bytes())
} else {
FixString::from_ascii_lossy(format!("{reason:?}").into_bytes())
},
tag_as_i64,
);
self.emitter
.send(FixEventInternal::DeserializeError(
self.session_id().clone(),
DeserializeError::Reject {
msg_type: Some(msg_type.as_fix_str().to_fix_string()),
seq_num: msg_seq_num,
tag: tag.map(|t| t as u16),
reason,
},
))
.await;
if let Some(disconnect_reason) = disconnect_reason {
self.send_logout(&mut state, None, None);
return Some(disconnect_reason);
}
}
Err(e @ VerifyError::SeqNumTooLow { .. }) => {
let mut state = self.state.borrow_mut();
self.send_logout(
&mut state,
Some(SessionStatus::ReceivedMsgSeqNumTooLow),
Some(FixString::from_ascii_lossy(e.to_string().into_bytes())),
);
return Some(DisconnectReason::MsgSeqNumTooLow);
}
Err(VerifyError::InvalidLogonState) => {
error!("disconnecting because of invalid logon state");
return Some(DisconnectReason::InvalidLogonState);
}
Err(VerifyError::ApplicationForcedReject {
ref_msg_type,
ref_seq_num,
reason,
text,
ref_tag_id,
}) => {
warn!("Rejected by application ({reason:?}: {text})");
self.send_reject(
&mut self.state().borrow_mut(),
Some(ref_msg_type),
ref_seq_num,
reason,
text,
ref_tag_id,
);
}
Err(VerifyError::ApplicationForcedLogout {
session_status,
text,
disconnect,
}) => {
error!(
"Rejected by application with Logout<5> ({})",
text.as_ref().map(FixString::as_utf8).unwrap_or_default()
);
let mut state = self.state.borrow_mut();
self.send_logout(&mut state, session_status, text);
if disconnect {
return Some(DisconnectReason::ApplicationForcedDisconnect);
}
}
Err(VerifyError::ApplicationForcedDisconnect { reason }) => {
error!("Disconnected by application: {reason:?}");
return Some(DisconnectReason::ApplicationForcedDisconnect);
}
Err(VerifyError::ApplicationAbortedProcessing) => {
self.state().borrow_mut().incr_next_target_msg_seq_num();
}
}
None
}
pub async fn on_message_in(&self, msg: Box<FixtMessage>) -> Option<DisconnectReason> {
if !self.session_settings.verify_test_request_id {
self.state.borrow_mut().reset_grace_period();
}
if let Some(disconnect_reason) = self.on_message_in_impl(msg).await {
return Some(disconnect_reason);
}
loop {
let Some(msg) = self.state.borrow_mut().retrieve_msg() else {
break;
};
debug!("Processing queued message {}", msg.header.msg_seq_num);
if matches!(msg.msg_type(), MsgType::Logon | MsgType::ResendRequest) {
debug!(msg_type = ?msg.msg_type(), "message already processed");
self.state.borrow_mut().incr_next_target_msg_seq_num();
} else if let Some(disconnect_reason) = self.on_message_in_impl(msg).await {
return Some(disconnect_reason);
}
}
None
}
pub async fn on_message_out(&self, msg: Box<FixtMessage>) -> Option<Box<FixtMessage>> {
let (sender, receiver) = tokio::sync::oneshot::channel();
match msg.msg_cat() {
MsgCat::Admin => {
self.emitter
.send(FixEventInternal::AdmMsgOut(
Some(msg),
Responder::new(sender),
))
.await;
Some(receiver.await.unwrap())
}
MsgCat::App => {
self.emitter
.send(FixEventInternal::AppMsgOut(
Some(msg),
Responder::new(sender),
))
.await;
#[expect(clippy::manual_ok_err)]
match receiver.await {
Ok(msg) => Some(msg),
Err(_gap_fill) => {
None
} }
}
}
}
pub async fn on_deserialize_error(&self, error: DeserializeError) -> Option<DisconnectReason> {
trace!("on_deserialize_error");
let text = FixString::from_ascii_lossy(error.to_string().into_bytes());
error!(deserialize_error = %text);
match &error {
DeserializeError::GarbledMessage(reason) => error!("Garbled message: {reason}"),
DeserializeError::Logout => {
let mut state = self.state.borrow_mut();
self.send_logout(
&mut state,
None,
Some(FixString::from_ascii_lossy(
b"MsgSeqNum(34) not found".to_vec(),
)),
);
return Some(DisconnectReason::MsgSeqNumNotFound);
}
DeserializeError::Reject {
msg_type,
seq_num,
tag,
reason,
} => self.send_reject(
&mut self.state().borrow_mut(),
msg_type.clone(),
*seq_num,
*reason,
text,
tag.map(Int::from),
),
}
self.emitter
.send(FixEventInternal::DeserializeError(
self.session_id().clone(),
error,
))
.await;
None
}
pub async fn on_in_timeout(self: &Rc<Self>) -> bool {
trace!("on_in_timeout");
let mut state = self.state().borrow_mut();
let timeout_cnt_limit = self.settings.auto_disconnect_after_no_heartbeat;
if timeout_cnt_limit > 0 && state.input_timeout_cnt() >= timeout_cnt_limit as usize {
warn!("Grace period is over");
return true;
}
let test_req_id = FixString::from_ascii_lossy(
format!("{}", Utc::now().format("%Y%m%d-%H:%M:%S.%f")).into_bytes(),
);
state.register_grace_period_test_req_id(test_req_id.clone());
self.send(Box::new(Message::TestRequest(TestRequest { test_req_id })));
false
}
pub async fn on_out_timeout(self: &Rc<Self>) {
trace!("on_out_timeout");
self.send(Box::new(Message::Heartbeat(Heartbeat {
test_req_id: None,
})));
}
pub fn heartbeat_interval(&self) -> Duration {
Duration::from_secs(self.heartbeat_interval.get())
}
pub fn logout_deadline(&self) -> Option<Instant> {
let logout_sent_time = self.state.borrow().logout_sent_time()?;
Some(
logout_sent_time
.checked_add(self.settings.auto_disconnect_after_no_logout)
.unwrap_or(logout_sent_time),
)
}
}