use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use crate::message::{tags, FixMessage, MsgType};
use crate::seq_store::FixSeqStore;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FixVersion {
Fix42,
Fix44,
Fix50Sp2,
}
impl FixVersion {
pub fn begin_string(&self) -> &'static str {
match self {
FixVersion::Fix42 => "FIX.4.2",
FixVersion::Fix44 => "FIX.4.4",
FixVersion::Fix50Sp2 => "FIXT.1.1",
}
}
pub fn is_fixt(&self) -> bool {
matches!(self, FixVersion::Fix50Sp2)
}
}
#[derive(Debug, Clone)]
pub struct FixSessionConfig {
pub version: FixVersion,
pub sender_comp_id: String,
pub target_comp_id: String,
pub heartbeat: Duration,
pub reset_on_logon: bool,
}
impl FixSessionConfig {
pub fn new(
version: FixVersion,
sender_comp_id: impl Into<String>,
target_comp_id: impl Into<String>,
) -> Self {
FixSessionConfig {
version,
sender_comp_id: sender_comp_id.into(),
target_comp_id: target_comp_id.into(),
heartbeat: Duration::from_secs(30),
reset_on_logon: false,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionState {
Disconnected,
LogonSent,
Active,
LogoutSent,
}
pub struct FixSession {
config: FixSessionConfig,
store: Arc<dyn FixSeqStore>,
state: SessionState,
}
impl FixSession {
pub fn new(config: FixSessionConfig, store: Arc<dyn FixSeqStore>) -> Self {
FixSession { config, store, state: SessionState::Disconnected }
}
pub fn state(&self) -> SessionState {
self.state
}
pub fn config(&self) -> &FixSessionConfig {
&self.config
}
pub fn store(&self) -> &Arc<dyn FixSeqStore> {
&self.store
}
async fn finalize_outbound(&self, mut msg: FixMessage) -> FixMessage {
let seq = self.store.next_out().await;
msg.set(tags::BEGIN_STRING, self.config.version.begin_string());
msg.set(tags::SENDER_COMP_ID, self.config.sender_comp_id.clone());
msg.set(tags::TARGET_COMP_ID, self.config.target_comp_id.clone());
msg.set(tags::MSG_SEQ_NUM, seq.to_string());
msg.set(tags::SENDING_TIME, sending_time());
msg
}
pub async fn build_logon(&mut self) -> FixMessage {
if self.config.reset_on_logon {
self.store.reset().await;
}
let mut msg = FixMessage::of_type(MsgType::Logon);
msg.set(tags::ENCRYPT_METHOD, "0");
msg.set(tags::HEART_BT_INT, self.config.heartbeat.as_secs().to_string());
if self.config.reset_on_logon {
msg.set(tags::RESET_SEQ_NUM_FLAG, "Y");
}
if self.config.version.is_fixt() {
msg.set(tags::DEFAULT_APPL_VER_ID, "9");
}
let out = self.finalize_outbound(msg).await;
self.state = SessionState::LogonSent;
out
}
pub async fn build_logout(&mut self, reason: Option<&str>) -> FixMessage {
let mut msg = FixMessage::of_type(MsgType::Logout);
if let Some(r) = reason {
msg.set(tags::TEXT, r);
}
let out = self.finalize_outbound(msg).await;
self.state = SessionState::LogoutSent;
out
}
pub async fn build_heartbeat(&mut self, test_req_id: Option<&str>) -> FixMessage {
let mut msg = FixMessage::of_type(MsgType::Heartbeat);
if let Some(id) = test_req_id {
msg.set(tags::TEST_REQ_ID, id);
}
self.finalize_outbound(msg).await
}
pub async fn heartbeat(&mut self) -> FixMessage {
self.build_heartbeat(None).await
}
pub async fn build_resend_request(&mut self, begin: u64, end: u64) -> FixMessage {
let mut msg = FixMessage::of_type(MsgType::ResendRequest);
msg.set(tags::BEGIN_SEQ_NO, begin.to_string());
msg.set(tags::END_SEQ_NO, end.to_string());
self.finalize_outbound(msg).await
}
pub async fn build_gap_fill(&self, begin_seq: u64, new_seq_no: u64) -> FixMessage {
let mut msg = FixMessage::of_type(MsgType::SequenceReset);
msg.set(tags::GAP_FILL_FLAG, "Y");
msg.set(tags::POSS_DUP_FLAG, "Y");
msg.set(tags::NEW_SEQ_NO, new_seq_no.to_string());
msg.set(tags::BEGIN_STRING, self.config.version.begin_string());
msg.set(tags::SENDER_COMP_ID, self.config.sender_comp_id.clone());
msg.set(tags::TARGET_COMP_ID, self.config.target_comp_id.clone());
msg.set(tags::MSG_SEQ_NUM, begin_seq.to_string());
msg.set(tags::SENDING_TIME, sending_time());
msg
}
pub async fn build_app_message(&self, mut msg: FixMessage) -> FixMessage {
let seq = self.store.next_out().await;
msg.set(tags::BEGIN_STRING, self.config.version.begin_string());
msg.set(tags::SENDER_COMP_ID, self.config.sender_comp_id.clone());
msg.set(tags::TARGET_COMP_ID, self.config.target_comp_id.clone());
msg.set(tags::MSG_SEQ_NUM, seq.to_string());
msg.set(tags::SENDING_TIME, sending_time());
msg
}
pub async fn handle_inbound(&mut self, msg: FixMessage) -> InboundOutcome {
let mut outcome = InboundOutcome::default();
let msg_type = msg.msg_type();
let seq = msg.seq_num();
let is_seq_reset = matches!(msg_type, Some(MsgType::SequenceReset));
let is_reset_logon = matches!(msg_type, Some(MsgType::Logon))
&& msg.get(tags::RESET_SEQ_NUM_FLAG).map(|v| v == "Y" || v == "y").unwrap_or(false);
if let Some(seq) = seq {
if !is_seq_reset && !is_reset_logon {
let expected = self.store.current_in().await;
if seq > expected {
let rr = self.build_resend_request(expected, 0).await;
outcome.outbound.push(rr);
outcome.gap_detected = Some((expected, seq));
return outcome;
} else if seq < expected {
outcome.ignored_duplicate = true;
return outcome;
}
}
}
match msg_type {
Some(MsgType::Logon) => {
if is_reset_logon {
self.store.reset().await;
}
if let Some(seq) = seq {
self.store.observed_in(seq).await;
}
if self.state == SessionState::LogonSent {
self.state = SessionState::Active;
} else {
let reply = self.build_logon().await;
outcome.outbound.push(reply);
self.state = SessionState::Active;
}
}
Some(MsgType::TestRequest) => {
if let Some(seq) = seq {
self.store.observed_in(seq).await;
}
let id = msg.get(tags::TEST_REQ_ID).map(|s| s.to_string());
let hb = self.build_heartbeat(id.as_deref()).await;
outcome.outbound.push(hb);
}
Some(MsgType::Heartbeat) => {
if let Some(seq) = seq {
self.store.observed_in(seq).await;
}
}
Some(MsgType::ResendRequest) => {
if let Some(seq) = seq {
self.store.observed_in(seq).await;
}
let begin = msg.get_u64(tags::BEGIN_SEQ_NO).unwrap_or(1);
let end = msg.get_u64(tags::END_SEQ_NO).unwrap_or(0);
let next_out = self.store.peek_out().await;
let new_seq_no = if end == 0 { next_out } else { (end + 1).max(next_out) };
let gap_fill = self.build_gap_fill(begin, new_seq_no).await;
outcome.outbound.push(gap_fill);
}
Some(MsgType::SequenceReset) => {
if let Some(new_seq) = msg.get_u64(tags::NEW_SEQ_NO) {
if new_seq >= 1 {
self.store.observed_in(new_seq - 1).await;
}
}
}
Some(MsgType::Logout) => {
if let Some(seq) = seq {
self.store.observed_in(seq).await;
}
if self.state == SessionState::LogoutSent {
self.state = SessionState::Disconnected;
} else {
let reply = self.build_logout(None).await;
outcome.outbound.push(reply);
self.state = SessionState::Disconnected;
}
}
Some(MsgType::NewOrderSingle) | Some(MsgType::ExecutionReport) | Some(MsgType::Other(_)) => {
if let Some(seq) = seq {
self.store.observed_in(seq).await;
}
outcome.application = Some(msg);
}
None => {
outcome.ignored_duplicate = true;
}
}
outcome
}
pub async fn run(
mut self,
inbound: atomr_streams::Source<Result<Bytes, atomr_streams::FramingError>>,
writer: tokio::sync::mpsc::UnboundedSender<Bytes>,
app_tx: tokio::sync::mpsc::UnboundedSender<FixMessage>,
mut app_rx: tokio::sync::mpsc::UnboundedReceiver<FixMessage>,
) {
let (frame_tx, mut frame_rx) =
tokio::sync::mpsc::unbounded_channel::<Result<Bytes, atomr_streams::FramingError>>();
tokio::spawn(async move {
atomr_streams::Sink::to_sender(inbound, frame_tx).await;
});
let logon = self.build_logon().await;
let _ = writer.send(logon.to_wire());
let mut hb = tokio::time::interval(self.config.heartbeat);
hb.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
frame = frame_rx.recv() => {
let Some(frame) = frame else { break; };
let Ok(bytes) = frame else { continue; };
let Ok(msg) = FixMessage::parse(&bytes) else { continue; };
let outcome = self.handle_inbound(msg).await;
for out in &outcome.outbound {
let _ = writer.send(out.to_wire());
}
if let Some(app) = outcome.application {
let _ = app_tx.send(app);
}
if self.state == SessionState::Disconnected {
break;
}
}
app = app_rx.recv() => {
let Some(app) = app else { continue; };
if self.state == SessionState::Active {
let out = self.build_app_message(app).await;
let _ = writer.send(out.to_wire());
}
}
_ = hb.tick() => {
if self.state == SessionState::Active {
let hb_msg = self.heartbeat().await;
let _ = writer.send(hb_msg.to_wire());
}
}
}
}
}
}
#[derive(Debug, Default)]
pub struct InboundOutcome {
pub outbound: Vec<FixMessage>,
pub application: Option<FixMessage>,
pub gap_detected: Option<(u64, u64)>,
pub ignored_duplicate: bool,
}
fn sending_time() -> String {
let now =
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs();
let days = now / 86_400;
let secs_of_day = now % 86_400;
let (h, m, s) = (secs_of_day / 3600, (secs_of_day % 3600) / 60, secs_of_day % 60);
let (y, mo, d) = civil_from_days(days as i64);
format!("{y:04}{mo:02}{d:02}-{h:02}:{m:02}:{s:02}")
}
fn civil_from_days(z: i64) -> (i64, u32, u32) {
let z = z + 719_468;
let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
let doe = (z - era * 146_097) as u64;
let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
let y = yoe as i64 + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
let mp = (5 * doy + 2) / 153;
let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
let m = if mp < 10 { mp + 3 } else { mp - 9 } as u32;
(if m <= 2 { y + 1 } else { y }, m, d)
}