use bytes::BytesMut;
use ironfix::core::MsgType;
use ironfix::tagvalue::{Decoder, Encoder};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
use tracing::{error, info, warn};
mod common;
use common::{ExampleConfig, format_timestamp, init_logging, try_decode_message};
const FIX_VERSION: &str = "FIXT.1.1";
const APPL_VER_ID: &str = "9"; const DEFAULT_PORT: u16 = 9882;
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
init_logging();
let mut cfg = ExampleConfig::server();
cfg.port = std::env::var("FIX_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(DEFAULT_PORT);
info!(
"Starting FIX 5.0 SP2 ({}) server on {}",
FIX_VERSION,
cfg.addr()
);
let listener: TcpListener = TcpListener::bind(&cfg.addr()).await?;
let state = Arc::new(Mutex::new(HashMap::<String, u64>::new()));
loop {
let (socket, addr) = listener.accept().await?;
info!("Connection from {}", addr);
let state = Arc::clone(&state);
let cfg = cfg.clone();
tokio::spawn(async move {
if let Err(e) = handle(socket, state, cfg).await {
error!("Error: {}", e);
}
});
}
}
async fn handle(
mut sock: TcpStream,
state: Arc<Mutex<HashMap<String, u64>>>,
cfg: ExampleConfig,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut buf = BytesMut::with_capacity(4096);
let key = format!("{}:{}", cfg.target_comp_id, cfg.sender_comp_id);
state.lock().await.insert(key.clone(), 1);
loop {
if sock.read_buf(&mut buf).await? == 0 {
break;
}
while let Some(len) = try_decode_message(&buf) {
let msg = buf.split_to(len);
let mut dec = Decoder::new(&msg);
if let Ok(raw) = dec.decode() {
let resp = match raw.msg_type() {
MsgType::Logon => {
info!("Logon");
Some(build_logon(&cfg))
}
MsgType::TestRequest => Some(build_hb(&cfg, raw.get_field_str(112))),
MsgType::Logout => {
sock.write_all(&build_logout(&cfg)).await?;
return Ok(());
}
MsgType::NewOrderSingle => Some(build_exec(&cfg, &raw)),
_ => {
warn!("Unhandled: {:?}", raw.msg_type());
None
}
};
if let Some(r) = resp {
sock.write_all(&r).await?;
*state.lock().await.get_mut(&key).unwrap() += 1;
}
}
}
}
Ok(())
}
fn build_logon(c: &ExampleConfig) -> Vec<u8> {
let mut e = Encoder::new(FIX_VERSION);
e.put_str(35, "A");
e.put_str(49, &c.sender_comp_id);
e.put_str(56, &c.target_comp_id);
e.put_str(34, "1");
e.put_str(52, &format_timestamp());
e.put_str(98, "0");
e.put_str(108, &c.heartbeat_interval.to_string());
e.put_str(1137, APPL_VER_ID);
e.finish().to_vec()
}
fn build_hb(c: &ExampleConfig, id: Option<&str>) -> Vec<u8> {
let mut e = Encoder::new(FIX_VERSION);
e.put_str(35, "0");
e.put_str(49, &c.sender_comp_id);
e.put_str(56, &c.target_comp_id);
e.put_str(34, "1");
e.put_str(52, &format_timestamp());
if let Some(i) = id {
e.put_str(112, i);
}
e.finish().to_vec()
}
fn build_logout(c: &ExampleConfig) -> Vec<u8> {
let mut e = Encoder::new(FIX_VERSION);
e.put_str(35, "5");
e.put_str(49, &c.sender_comp_id);
e.put_str(56, &c.target_comp_id);
e.put_str(34, "1");
e.put_str(52, &format_timestamp());
e.finish().to_vec()
}
fn build_exec(c: &ExampleConfig, raw: &ironfix::tagvalue::RawMessage<'_>) -> Vec<u8> {
let clid = raw.get_field_str(11).unwrap_or("0");
let mut e = Encoder::new(FIX_VERSION);
e.put_str(35, "8");
e.put_str(49, &c.sender_comp_id);
e.put_str(56, &c.target_comp_id);
e.put_str(34, "1");
e.put_str(52, &format_timestamp());
e.put_str(1128, APPL_VER_ID);
e.put_str(37, &format!("O{}", clid));
e.put_str(11, clid);
e.put_str(17, &format!("E{}", clid));
e.put_str(150, "0");
e.put_str(39, "0");
e.put_str(55, raw.get_field_str(55).unwrap_or("N/A"));
e.put_str(54, raw.get_field_str(54).unwrap_or("1"));
e.put_str(151, raw.get_field_str(38).unwrap_or("0"));
e.put_str(14, "0");
e.put_str(6, "0");
e.finish().to_vec()
}