mod active;
mod awaiting_logon;
mod awaiting_logout;
mod awaiting_resend;
mod disconnected;
pub(crate) use active::{ActiveState, calculate_peer_interval};
pub(crate) use awaiting_logon::AwaitingLogonState;
pub(crate) use awaiting_logout::AwaitingLogoutState;
pub(crate) use awaiting_resend::AwaitingResendState;
pub(crate) use disconnected::DisconnectedState;
use crate::Application;
use crate::message::OutboundMessage;
use crate::message::logon::Logon;
use crate::message::logout::Logout;
use crate::message::verification::VerificationFlags;
use crate::session::ctx::{PreProcessDecision, SessionCtx, TransitionResult, VerificationResult};
use crate::session::error::{
InternalSendError, InternalSendResultExt, SessionOperationError, SetNextTargetSeqNumError,
};
use crate::session::event::ScheduleResponse;
use crate::session::info::Status as SessionInfoStatus;
use crate::transport::writer::WriterRef;
use hotfix_message::message::Message;
use hotfix_store::MessageStore;
use std::num::NonZeroU64;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::Instant;
use tracing::{debug, error, warn};
// NOTE(review): this constant is not referenced anywhere in this file; presumably it is a
// multiplier consumed by one of the state submodules (e.g. when deriving the peer interval
// from the heartbeat interval) — confirm against `active`.
const TEST_REQUEST_THRESHOLD: f64 = 1.2;
/// Identifier of a test request. The active state keeps the id of the outstanding request in
/// `sent_test_request_id` (see `reset_peer_timer` / `expected_test_response_id`).
pub(crate) type TestRequestId = String;
/// The state a session is currently in.
///
/// Every variant except `Disconnected` carries a writer handle (see `get_writer`).
pub enum SessionState {
    /// A peer logon is expected (`on_peer_logon` is only handled here); only logon and
    /// logout messages may be sent. Carries a logon timeout used as the peer deadline.
    AwaitingLogon(AwaitingLogonState),
    /// Counts as logged on; reports a begin/end sequence number range and an attempt
    /// counter through `as_status`.
    AwaitingResend(AwaitingResendState),
    /// Carries a logout deadline and whether to reconnect once the session is disconnected.
    AwaitingLogout(AwaitingLogoutState),
    /// Carries the heartbeat deadline, the peer deadline and the id of an outstanding test
    /// request, if any.
    Active(ActiveState),
    /// No writer is available; records the reason and whether to reconnect.
    Disconnected(DisconnectedState),
}
impl SessionState {
/// Builds a `Disconnected` state.
///
/// `reconnect` and `reason` are forwarded unchanged to `DisconnectedState::new`.
pub fn new_disconnected(reconnect: bool, reason: &str) -> Self {
    let disconnected = DisconnectedState::new(reconnect, reason);
    Self::Disconnected(disconnected)
}
/// Builds an `Active` state around `writer`.
///
/// The heartbeat deadline is placed `heartbeat_interval` seconds from now and the peer
/// deadline `calculate_peer_interval(heartbeat_interval)` seconds from now. No test request
/// is marked as outstanding.
pub fn new_active(writer: WriterRef, heartbeat_interval: u64) -> Self {
    let own_timeout = Duration::from_secs(heartbeat_interval);
    let peer_timeout = Duration::from_secs(calculate_peer_interval(heartbeat_interval));

    let heartbeat_deadline = Instant::now() + own_timeout;
    let peer_deadline = Instant::now() + peer_timeout;

    Self::Active(ActiveState {
        writer,
        heartbeat_deadline,
        peer_deadline,
        sent_test_request_id: None,
    })
}
/// Handles a logon received from the peer.
///
/// Only the `AwaitingLogon` state processes the logon (by delegating to its own
/// `on_peer_logon`). In every other state an error is logged and the state machine stays
/// where it is.
pub(crate) async fn on_peer_logon<A: Application, S: MessageStore>(
    &self,
    ctx: &mut SessionCtx<A, S>,
) -> Result<TransitionResult, SessionOperationError> {
    let Self::AwaitingLogon(awaiting_logon) = self else {
        error!("received unexpected logon message");
        return Ok(TransitionResult::Stay);
    };
    awaiting_logon.on_peer_logon(ctx).await
}
/// Computes the transition to take when the peer sends a logout.
///
/// * `AwaitingLogout`: move to `Disconnected`, keeping the state's own `reconnect` flag.
/// * `Disconnected`: nothing to do, stay.
/// * any other state: move to `Disconnected` with `reconnect` set to `true`.
pub(crate) fn on_peer_logout(&self) -> TransitionResult {
    let (reconnect, reason) = match self {
        Self::Disconnected(_) => return TransitionResult::Stay,
        Self::AwaitingLogout(awaiting_logout) => (awaiting_logout.reconnect, "logout completed"),
        Self::Active(_) | Self::AwaitingLogon(_) | Self::AwaitingResend(_) => {
            (true, "peer has logged us out")
        }
    };
    TransitionResult::TransitionTo(Self::new_disconnected(reconnect, reason))
}
/// Computes the transition to take when the connection is dropped.
///
/// An already disconnected session logs a warning and stays put. Otherwise the session
/// moves to `Disconnected` with the given `reason`; `AwaitingLogout` keeps its own
/// `reconnect` flag while every other connected state reconnects.
pub(crate) fn on_disconnect(&self, reason: &str) -> TransitionResult {
    let reconnect = match self {
        Self::Disconnected(_) => {
            warn!("disconnect message was received, but the session is already disconnected");
            return TransitionResult::Stay;
        }
        Self::AwaitingLogout(awaiting_logout) => awaiting_logout.reconnect,
        Self::Active(_) | Self::AwaitingLogon(_) | Self::AwaitingResend(_) => true,
    };
    TransitionResult::TransitionTo(Self::new_disconnected(reconnect, reason))
}
/// Gives the current state a chance to inspect an inbound message before regular processing.
///
/// `AwaitingLogon` and `AwaitingResend` delegate to their own `pre_process_inbound`; all
/// other states accept the message unchanged.
pub(crate) fn pre_process_inbound(&mut self, message: Message) -> PreProcessDecision {
    match self {
        Self::AwaitingLogon(awaiting_logon) => awaiting_logon.pre_process_inbound(message),
        Self::AwaitingResend(awaiting_resend) => awaiting_resend.pre_process_inbound(message),
        Self::Active(_) | Self::AwaitingLogout(_) | Self::Disconnected(_) => {
            PreProcessDecision::Accept(message)
        }
    }
}
pub fn should_reconnect(&self) -> bool {
match self {
SessionState::Disconnected(DisconnectedState { reconnect, .. }) => *reconnect,
_ => true,
}
}
pub async fn send_message<A, S>(
&mut self,
ctx: &mut SessionCtx<A, S>,
message: impl OutboundMessage,
) -> Result<u64, InternalSendError>
where
A: Application,
S: MessageStore,
{
let message_type = message.message_type().to_string();
let prepared = ctx.prepare_message(message).await?;
let raw = prepared.raw;
match self {
Self::Active(ActiveState { writer, .. })
| Self::AwaitingResend(AwaitingResendState { writer, .. }) => {
if message_type == Logon::MSG_TYPE {
error!("logon message is invalid for active sessions")
} else {
writer.send_raw_message(raw).await
}
}
Self::AwaitingLogon(AwaitingLogonState {
writer, logon_sent, ..
}) => match message_type.as_str() {
Logon::MSG_TYPE => {
if *logon_sent {
error!("trying to send logon twice");
} else {
writer.send_raw_message(raw).await;
*logon_sent = true;
}
}
Logout::MSG_TYPE => {
writer.send_raw_message(raw).await;
}
_ => error!("invalid outgoing message for AwaitingLogon state"),
},
Self::AwaitingLogout(_) => {
error!("trying to send message while awaiting logout");
}
_ => error!("trying to write without an established connection"),
}
self.reset_heartbeat_timer(ctx.config.heartbeat_interval);
Ok(prepared.seq_num)
}
/// Asks the writer of the current state to disconnect.
///
/// A state without a writer (i.e. `Disconnected`) only emits a debug log.
pub async fn disconnect_writer(&self) {
    match self.get_writer() {
        Some(writer) => writer.disconnect().await,
        None => debug!("disconnecting an already disconnected session"),
    }
}
/// Returns the writer held by the current state, or `None` when disconnected.
pub(crate) fn get_writer(&self) -> Option<&WriterRef> {
    let writer = match self {
        Self::Disconnected(_) => return None,
        Self::Active(state) => &state.writer,
        Self::AwaitingLogon(state) => &state.writer,
        Self::AwaitingLogout(state) => &state.writer,
        Self::AwaitingResend(state) => &state.writer,
    };
    Some(writer)
}
/// Computes the transition into `AwaitingLogout`, if one is possible.
///
/// Stays in place when the session is already awaiting logout or has no writer. Otherwise
/// the new state reuses a clone of the current writer, expires `logout_timeout` from now and
/// remembers `reconnect`.
pub fn try_transition_to_awaiting_logout(
    &self,
    logout_timeout: Duration,
    reconnect: bool,
) -> TransitionResult {
    if let SessionState::AwaitingLogout(_) = self {
        debug!("already in awaiting logout state");
        return TransitionResult::Stay;
    }

    let Some(writer) = self.get_writer() else {
        error!("trying to transition to awaiting logout without an established connection");
        return TransitionResult::Stay;
    };

    let awaiting_logout = AwaitingLogoutState {
        writer: writer.clone(),
        logout_timeout: Instant::now() + logout_timeout,
        reconnect,
    };
    TransitionResult::TransitionTo(SessionState::AwaitingLogout(awaiting_logout))
}
pub(crate) async fn handle_verification_issue<A: Application, S: MessageStore>(
&mut self,
ctx: &mut SessionCtx<A, S>,
message: &Message,
flags: VerificationFlags,
) -> Result<VerificationResult, SessionOperationError> {
match self {
SessionState::Active(state) => {
state.handle_verification_issue(ctx, message, flags).await
}
SessionState::AwaitingResend(state) => {
state.handle_verification_issue(ctx, message, flags).await
}
SessionState::AwaitingLogon(state) => {
state.handle_verification_issue(ctx, message, flags).await
}
SessionState::AwaitingLogout(state) => {
state.handle_verification_issue(ctx, message, flags).await
}
SessionState::Disconnected(_) => {
error!("handle_verification_issue called while disconnected");
Ok(VerificationResult::Passed)
}
}
}
/// Overrides the next expected target sequence number; only allowed while disconnected.
///
/// The store is given `seq_num - 1` as its target sequence number (the subtraction cannot
/// underflow because `seq_num` is non-zero).
///
/// # Errors
///
/// Returns `SetNextTargetSeqNumError::InvalidState` carrying the current status when the
/// session is not disconnected, or the converted store error when persisting fails.
pub(crate) async fn try_set_next_target_seq_num<A, S>(
    &self,
    ctx: &mut SessionCtx<A, S>,
    seq_num: NonZeroU64,
) -> Result<(), SetNextTargetSeqNumError>
where
    A: Application,
    S: MessageStore,
{
    if !matches!(self, SessionState::Disconnected(_)) {
        let current = self.as_status();
        return Err(SetNextTargetSeqNumError::InvalidState { current });
    }

    let last_seen = seq_num.get() - 1;
    let stored = ctx.store.set_target_seq_number(last_seen).await;
    stored.map_err(SetNextTargetSeqNumError::from)
}
/// Starts a graceful logout.
///
/// The transition into `AwaitingLogout` is computed first, using the configured
/// `logout_timeout` (seconds). A logout message with `reason` is sent only when that
/// transition is actually going to happen. The computed transition is returned for the
/// caller to apply.
pub async fn initiate_graceful_logout<A, S>(
    &mut self,
    ctx: &mut SessionCtx<A, S>,
    reason: &str,
    reconnect: bool,
) -> Result<TransitionResult, SessionOperationError>
where
    A: Application,
    S: MessageStore,
{
    let timeout = Duration::from_secs(ctx.config.logout_timeout);
    let transition = self.try_transition_to_awaiting_logout(timeout, reconnect);

    if let TransitionResult::TransitionTo(_) = &transition {
        self.send_logout(ctx, reason).await?;
    }

    Ok(transition)
}
/// Sends a logout with `reason` and then disconnects the writer.
///
/// A failure to send the logout is only logged as a warning; the writer is disconnected
/// regardless.
pub async fn logout_and_terminate<A, S>(&mut self, ctx: &mut SessionCtx<A, S>, reason: &str)
where
    A: Application,
    S: MessageStore,
{
    let logout_outcome = self.send_logout(ctx, reason).await;
    if let Err(err) = logout_outcome {
        warn!("failed to send logout during session termination: {}", err);
    }

    self.disconnect_writer().await;
}
/// Builds a logout message carrying `reason` and sends it through `send_message`.
///
/// # Errors
///
/// A send failure is returned with the "logout" send context attached.
pub async fn send_logout<A, S>(
    &mut self,
    ctx: &mut SessionCtx<A, S>,
    reason: &str,
) -> Result<(), SessionOperationError>
where
    A: Application,
    S: MessageStore,
{
    let message = Logout::with_reason(reason.to_string());
    let sent = self.send_message(ctx, message).await;
    // The sequence number of the logout is of no interest to callers.
    let _seq_num = sent.with_send_context("logout")?;
    Ok(())
}
pub fn register_schedule_awaiter(&mut self, responder: oneshot::Sender<ScheduleResponse>) {
match self {
SessionState::Disconnected(state) => {
if state.has_schedule_awaiter() {
let reason = &state.reason;
error!(
"schedule awaiter already registered on state disconnected due to: {reason}"
);
if let Err(err) = responder.send(ScheduleResponse::Shutdown) {
error!("failed to send schedule awaiter response: {err:?}");
}
} else {
state.set_schedule_awaiter(responder);
debug!("registered schedule awaiter");
}
}
_ => {
error!("schedule awaiter can only be registered on disconnected sessions");
if let Err(err) = responder.send(ScheduleResponse::Shutdown) {
error!("failed to send schedule awaiter response: {err:?}");
}
}
}
}
pub fn notify_schedule_awaiter(&mut self) {
if let SessionState::Disconnected(state) = self
&& let Some(awaiter) = state.take_schedule_awaiter()
{
if let Err(err) = awaiter.send(ScheduleResponse::InSchedule) {
error!("failed to send schedule awaiter response: {err:?}");
} else {
debug!("notified schedule awaiter");
}
}
}
/// Returns the heartbeat deadline of an active session, `None` in any other state.
pub fn heartbeat_deadline(&self) -> Option<&Instant> {
    if let Self::Active(active) = self {
        Some(&active.heartbeat_deadline)
    } else {
        None
    }
}
pub fn reset_heartbeat_timer(&mut self, heartbeat_interval: u64) {
if let Self::Active(ActiveState {
heartbeat_deadline, ..
}) = self
{
*heartbeat_deadline = Instant::now() + Duration::from_secs(heartbeat_interval);
}
}
/// Returns the instant by which something is expected from the peer.
///
/// This is the peer deadline while active, the logon timeout while awaiting logon and the
/// logout timeout while awaiting logout. The remaining states have no such deadline.
pub fn peer_deadline(&self) -> Option<&Instant> {
    match self {
        Self::Active(active) => Some(&active.peer_deadline),
        Self::AwaitingLogon(awaiting_logon) => Some(&awaiting_logon.logon_timeout),
        Self::AwaitingLogout(awaiting_logout) => Some(&awaiting_logout.logout_timeout),
        Self::AwaitingResend(_) | Self::Disconnected(_) => None,
    }
}
/// Restarts the peer timer of an active session and records the outstanding test request.
///
/// The new deadline is `calculate_peer_interval(heartbeat_interval)` seconds from now and
/// `test_request_id` replaces whatever id was stored before (pass `None` to clear it).
/// Has no effect in any other state.
pub fn reset_peer_timer(
    &mut self,
    heartbeat_interval: u64,
    test_request_id: Option<TestRequestId>,
) {
    let Self::Active(active) = self else {
        return;
    };

    let peer_timeout = Duration::from_secs(calculate_peer_interval(heartbeat_interval));
    active.peer_deadline = Instant::now() + peer_timeout;
    active.sent_test_request_id = test_request_id;
}
/// Returns the id of the test request stored in the active state, if any.
///
/// Always `None` when the session is not active.
pub fn expected_test_response_id(&self) -> Option<&TestRequestId> {
    let Self::Active(active) = self else {
        return None;
    };
    active.sent_test_request_id.as_ref()
}
pub fn is_connected(&self) -> bool {
self.get_writer().is_some()
}
/// Whether the session is logged on, i.e. in the `Active` or `AwaitingResend` state.
pub fn is_logged_on(&self) -> bool {
    matches!(
        self,
        SessionState::Active(_) | SessionState::AwaitingResend(_)
    )
}
pub fn is_expecting_test_response(&self) -> bool {
self.expected_test_response_id().is_some()
}
pub fn is_awaiting_logon(&self) -> bool {
matches!(self, SessionState::AwaitingLogon(_))
}
pub fn is_awaiting_logout(&self) -> bool {
matches!(self, SessionState::AwaitingLogout(_))
}
pub fn as_status(&self) -> SessionInfoStatus {
match self {
SessionState::AwaitingLogon(_) => SessionInfoStatus::AwaitingLogon,
SessionState::AwaitingResend(AwaitingResendState {
begin_seq_number,
end_seq_number,
resend_attempts,
..
}) => SessionInfoStatus::AwaitingResend {
begin: *begin_seq_number,
end: *end_seq_number,
attempts: *resend_attempts,
},
SessionState::AwaitingLogout(_) => SessionInfoStatus::AwaitingLogout,
SessionState::Active(_) => SessionInfoStatus::Active,
SessionState::Disconnected(_) => SessionInfoStatus::Disconnected,
}
}
}