IronFix 0.1.1

High-performance FIX/FAST protocol engine for Rust
Documentation
//! FIX 5.0 Server Example (FIXT.1.1 Transport)
use bytes::BytesMut;
use ironfix::core::MsgType;
use ironfix::tagvalue::{Decoder, Encoder};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
use tracing::{error, info, warn};
mod common;
use common::{ExampleConfig, format_timestamp, init_logging, try_decode_message};

/// Version string handed to `Encoder::new` for every outbound message built in this file.
const FIX_VERSION: &str = "FIXT.1.1";
/// Application version identifier written to tag 1137 in the logon reply and
/// to tag 1128 in execution reports.
const APPL_VER_ID: &str = "7"; // FIX 5.0
/// Listening port used when the `FIX_PORT` environment variable is unset or
/// does not parse as a port number.
const DEFAULT_PORT: u16 = 9880;

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    init_logging();
    let mut cfg = ExampleConfig::server();
    cfg.port = std::env::var("FIX_PORT")
        .ok()
        .and_then(|p| p.parse().ok())
        .unwrap_or(DEFAULT_PORT);
    info!(
        "Starting FIX 5.0 ({}) server on {}",
        FIX_VERSION,
        cfg.addr()
    );
    let listener: TcpListener = TcpListener::bind(&cfg.addr()).await?;
    let state = Arc::new(Mutex::new(HashMap::<String, u64>::new()));
    loop {
        let (socket, addr) = listener.accept().await?;
        info!("Connection from {}", addr);
        let state = Arc::clone(&state);
        let cfg = cfg.clone();
        tokio::spawn(async move {
            if let Err(e) = handle(socket, state, cfg).await {
                error!("Error: {}", e);
            }
        });
    }
}

/// Serves one accepted connection until the peer closes it or sends a Logout.
///
/// Bytes are accumulated in a growable buffer; every complete message that
/// `try_decode_message` reports is split off, decoded, and answered:
///
/// * Logon -> logon reply
/// * TestRequest -> heartbeat carrying the request's tag 112, if present
/// * NewOrderSingle -> execution report
/// * Logout -> logout reply, then the function returns
/// * anything else -> logged and ignored
///
/// After each reply written from the loop, the counter stored in `state`
/// under `"{target_comp_id}:{sender_comp_id}"` is incremented.
///
/// NOTE(review): the counter is never fed into outbound messages (every
/// builder in this file writes tag 34 as "1"), and the key is derived from
/// `cfg` alone, so all connections share and reset the same entry.
///
/// # Errors
///
/// Returns any I/O error raised while reading from or writing to `sock`.
/// Messages that fail to decode are logged and skipped, not treated as errors.
async fn handle(
    mut sock: TcpStream,
    state: Arc<Mutex<HashMap<String, u64>>>,
    cfg: ExampleConfig,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let mut buf = BytesMut::with_capacity(4096);
    let key = format!("{}:{}", cfg.target_comp_id, cfg.sender_comp_id);
    state.lock().await.insert(key.clone(), 1);
    loop {
        // Zero bytes read: the stream has ended, stop serving.
        if sock.read_buf(&mut buf).await? == 0 {
            break;
        }
        // A single read can hold several messages; drain every complete one
        // and leave any trailing partial message in `buf` for the next read.
        while let Some(len) = try_decode_message(&buf) {
            let msg = buf.split_to(len);
            let mut dec = Decoder::new(&msg);
            let raw = match dec.decode() {
                Ok(raw) => raw,
                Err(e) => {
                    // Previously dropped without a trace; surface it so
                    // malformed input is visible in the logs.
                    warn!("Dropping undecodable message ({} bytes): {:?}", len, e);
                    continue;
                }
            };
            let resp = match raw.msg_type() {
                MsgType::Logon => {
                    info!("Logon");
                    Some(build_logon(&cfg))
                }
                MsgType::TestRequest => Some(build_hb(&cfg, raw.get_field_str(112))),
                MsgType::Logout => {
                    sock.write_all(&build_logout(&cfg)).await?;
                    return Ok(());
                }
                MsgType::NewOrderSingle => Some(build_exec(&cfg, &raw)),
                _ => {
                    warn!("Unhandled: {:?}", raw.msg_type());
                    None
                }
            };
            if let Some(r) = resp {
                sock.write_all(&r).await?;
                // Non-panicking update: a missing entry is skipped rather
                // than aborting the connection task.
                if let Some(count) = state.lock().await.get_mut(&key) {
                    *count += 1;
                }
            }
        }
    }
    Ok(())
}

/// Builds the Logon (35=A) reply sent in response to a client's Logon.
///
/// Sender and target comp IDs and the heartbeat interval come from `c`;
/// tag 34 is always "1", tag 98 is "0", and tag 1137 carries `APPL_VER_ID`.
/// Returns the bytes produced by the encoder.
fn build_logon(c: &ExampleConfig) -> Vec<u8> {
    let mut logon = Encoder::new(FIX_VERSION);

    // Header fields.
    logon.put_str(35, "A"); // MsgType
    logon.put_str(49, &c.sender_comp_id); // SenderCompID
    logon.put_str(56, &c.target_comp_id); // TargetCompID
    logon.put_str(34, "1"); // MsgSeqNum
    let sending_time = format_timestamp();
    logon.put_str(52, &sending_time); // SendingTime

    // Logon body.
    logon.put_str(98, "0"); // EncryptMethod
    let heartbeat = c.heartbeat_interval.to_string();
    logon.put_str(108, &heartbeat); // HeartBtInt
    logon.put_str(1137, APPL_VER_ID); // DefaultApplVerID

    let encoded = logon.finish();
    encoded.to_vec()
}

/// Builds a Heartbeat (35=0) message.
///
/// When `id` is `Some`, its value is added as tag 112; when it is `None`
/// the tag is omitted. Tag 34 is always "1".
/// Returns the bytes produced by the encoder.
fn build_hb(c: &ExampleConfig, id: Option<&str>) -> Vec<u8> {
    let mut heartbeat = Encoder::new(FIX_VERSION);

    heartbeat.put_str(35, "0"); // MsgType
    heartbeat.put_str(49, &c.sender_comp_id); // SenderCompID
    heartbeat.put_str(56, &c.target_comp_id); // TargetCompID
    heartbeat.put_str(34, "1"); // MsgSeqNum
    let sending_time = format_timestamp();
    heartbeat.put_str(52, &sending_time); // SendingTime

    if let Some(test_req_id) = id {
        heartbeat.put_str(112, test_req_id); // TestReqID
    }

    let encoded = heartbeat.finish();
    encoded.to_vec()
}

/// Builds the Logout (35=5) reply written just before the connection handler returns.
///
/// Only header fields are written; tag 34 is always "1".
/// Returns the bytes produced by the encoder.
fn build_logout(c: &ExampleConfig) -> Vec<u8> {
    let mut logout = Encoder::new(FIX_VERSION);

    logout.put_str(35, "5"); // MsgType
    logout.put_str(49, &c.sender_comp_id); // SenderCompID
    logout.put_str(56, &c.target_comp_id); // TargetCompID
    logout.put_str(34, "1"); // MsgSeqNum
    let sending_time = format_timestamp();
    logout.put_str(52, &sending_time); // SendingTime

    let encoded = logout.finish();
    encoded.to_vec()
}

/// Builds an ExecutionReport (35=8) for the order in `raw`.
///
/// Values are echoed from the incoming message with fallbacks when a tag is
/// absent: tag 11 -> "0", tag 55 -> "N/A", tag 54 -> "1", tag 38 -> "0".
/// Tags 37 and 17 are the tag 11 value prefixed with "O" and "E" respectively.
/// Tags 150 and 39 are "0", tag 151 echoes the incoming tag 38, and tags 14
/// and 6 are "0". Tag 34 is always "1".
/// Returns the bytes produced by the encoder.
fn build_exec(c: &ExampleConfig, raw: &ironfix::tagvalue::RawMessage<'_>) -> Vec<u8> {
    let cl_ord_id = raw.get_field_str(11).unwrap_or("0");
    let mut report = Encoder::new(FIX_VERSION);

    // Header fields.
    report.put_str(35, "8"); // MsgType
    report.put_str(49, &c.sender_comp_id); // SenderCompID
    report.put_str(56, &c.target_comp_id); // TargetCompID
    report.put_str(34, "1"); // MsgSeqNum
    let sending_time = format_timestamp();
    report.put_str(52, &sending_time); // SendingTime
    report.put_str(1128, APPL_VER_ID); // ApplVerID

    // Identifiers derived from the client's order id.
    let order_id = format!("O{}", cl_ord_id);
    report.put_str(37, &order_id); // OrderID
    report.put_str(11, cl_ord_id); // ClOrdID
    let exec_id = format!("E{}", cl_ord_id);
    report.put_str(17, &exec_id); // ExecID

    // Status fields.
    report.put_str(150, "0"); // ExecType
    report.put_str(39, "0"); // OrdStatus

    // Instrument, side and quantities echoed from the request.
    let symbol = raw.get_field_str(55).unwrap_or("N/A");
    report.put_str(55, symbol); // Symbol
    let side = raw.get_field_str(54).unwrap_or("1");
    report.put_str(54, side); // Side
    let order_qty = raw.get_field_str(38).unwrap_or("0");
    report.put_str(151, order_qty); // LeavesQty, taken from OrderQty
    report.put_str(14, "0"); // CumQty
    report.put_str(6, "0"); // AvgPx

    let encoded = report.finish();
    encoded.to_vec()
}