#![allow(dead_code)]
use iceoryx2::prelude::ZeroCopySend;
use serde::{Deserialize, Serialize};
use wingfoil::*;
/// Service name for the orders channel.
/// NOTE(review): presumably an iceoryx2 service name (this file imports
/// `iceoryx2`) — confirm at the call sites.
pub const SVC_ORDERS: &str = "wingfoil/latency_e2e/orders";
/// Service name for the fills channel; shares the `wingfoil/latency_e2e/`
/// prefix with `SVC_ORDERS`.
pub const SVC_FILLS: &str = "wingfoil/latency_e2e/fills";
/// Topic name `"orders"`.
/// NOTE(review): presumably the topic carrying `OrderFrame` — confirm.
pub const TOPIC_ORDERS: &str = "orders";
/// Topic name `"fills"`.
/// NOTE(review): presumably the topic carrying `FillFrame` — confirm.
pub const TOPIC_FILLS: &str = "fills";
/// Topic name `"latency_echo"`.
/// NOTE(review): presumably the topic carrying `EchoFrame` — confirm.
pub const TOPIC_ECHO: &str = "latency_echo";
/// Topic name `"session"`.
/// NOTE(review): presumably the topic carrying `SessionStatus` — confirm.
pub const TOPIC_SESSION: &str = "session";
/// Opaque 16-byte session identifier; `session_hex` renders it as hex text.
pub type SessionId = [u8; 16];
/// `u8` code for the buy side.
/// NOTE(review): presumably the value stored in the `side` fields — confirm.
pub const SIDE_BUY: u8 = 0;
/// `u8` code for the sell side.
pub const SIDE_SELL: u8 = 1;
/// Fixed-layout, `Copy` record holding the order fields (`qty`, `side`) and
/// fill fields (`filled_qty`, `fill_price_bps`) for one `client_seq` of a
/// session.
///
/// The struct is `#[repr(C)]` and ends with an explicit `_pad` so that the
/// fields add up to 64 bytes (16 + 5 * 8 + 1 + 7) with no compiler-inserted
/// padding. It derives `ZeroCopySend` under the type name
/// `wingfoil::latency_e2e::RoundTrip`.
/// NOTE(review): presumably exchanged over iceoryx2 shared memory — confirm
/// at the publisher/subscriber call sites.
#[repr(C)]
#[derive(Debug, Clone, Copy, Default, ZeroCopySend, Serialize, Deserialize)]
#[type_name("wingfoil::latency_e2e::RoundTrip")]
pub struct RoundTrip {
/// Identifier of the session this record belongs to.
pub session: SessionId,
/// Sequence number of the order.
/// NOTE(review): presumably assigned by the client — confirm.
pub client_seq: u64,
/// Quantity of the order.
pub qty: u64,
/// Quantity reported as filled.
pub filled_qty: u64,
/// Fill price.
/// NOTE(review): the `_bps` suffix suggests basis points — confirm the unit.
pub fill_price_bps: i64,
/// Client-side send timestamp.
/// NOTE(review): unit and clock source are not visible here — confirm.
pub t_client_send: u64,
/// Order side code.
/// NOTE(review): presumably `SIDE_BUY` or `SIDE_SELL` — confirm.
pub side: u8,
/// Explicit tail padding: 7 bytes after the 1-byte `side` round the struct
/// up to a multiple of 8 bytes.
pub _pad: [u8; 7],
}
// Declares `RoundTripLatency` through the `latency_stages!` macro, registered
// under the type name `wingfoil::latency_e2e::RoundTripLatency`.
// NOTE(review): the macro is not defined in this file; presumably it comes
// from the `wingfoil` glob import — confirm.
//
// Nine stages are listed, in this order. The count matches the `[u64; 9]`
// `stamps` arrays in `FillFrame` and `EchoFrame`.
// NOTE(review): presumably `stamps[i]` holds the timestamp of the i-th stage
// below — confirm against the code that fills `stamps`.
latency_stages! {
#[type_name("wingfoil::latency_e2e::RoundTripLatency")]
pub RoundTripLatency {
ws_recv,
ws_publish,
gw_recv,
gw_price,
fix_send,
fix_recv,
gw_publish,
ws_sub_recv,
ws_send,
}
}
/// Serde-(de)serializable order message: a session, a sequence number, a side,
/// a quantity and the client's send timestamp.
/// NOTE(review): presumably the client-to-server wire frame — confirm.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct OrderFrame {
/// Identifier of the session placing the order.
pub session: SessionId,
/// Sequence number of the order.
/// NOTE(review): presumably assigned by the client — confirm.
pub client_seq: u64,
/// Order side code.
/// NOTE(review): presumably `SIDE_BUY` or `SIDE_SELL` — confirm.
pub side: u8,
/// Quantity of the order.
pub qty: u64,
/// Client-side send timestamp.
/// NOTE(review): unit and clock source are not visible here — confirm.
pub t_client_send: u64,
}
/// Serde-(de)serializable fill message: identifies the order by `session` and
/// `client_seq` and carries the fill result plus nine stamps.
/// NOTE(review): presumably the server-to-client reply to an `OrderFrame` —
/// confirm.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FillFrame {
/// Identifier of the session the fill belongs to.
pub session: SessionId,
/// Sequence number of the order this fill refers to.
pub client_seq: u64,
/// Order side code.
/// NOTE(review): presumably `SIDE_BUY` or `SIDE_SELL` — confirm.
pub side: u8,
/// Quantity reported as filled.
pub filled_qty: u64,
/// Fill price.
/// NOTE(review): the `_bps` suffix suggests basis points — confirm the unit.
pub fill_price_bps: i64,
/// Client-side send timestamp.
/// NOTE(review): presumably copied from the originating order — confirm.
pub t_client_send: u64,
/// Nine stamp values; the length equals the number of stages declared for
/// `RoundTripLatency`.
/// NOTE(review): presumably one timestamp per stage, in declaration order —
/// confirm.
pub stamps: [u64; 9],
}
/// Serde-(de)serializable echo message: a session, a sequence number, the
/// client's send and receive timestamps, and nine stamps.
/// NOTE(review): presumably reported by the client after it receives a fill —
/// confirm.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EchoFrame {
/// Identifier of the session the echo belongs to.
pub session: SessionId,
/// Sequence number of the order being echoed.
pub client_seq: u64,
/// Client-side send timestamp.
/// NOTE(review): unit and clock source are not visible here — confirm.
pub t_client_send: u64,
/// Client-side receive timestamp.
/// NOTE(review): presumably on the same clock as `t_client_send` — confirm.
pub t_client_recv: u64,
/// Nine stamp values; the length equals the number of stages declared for
/// `RoundTripLatency`.
/// NOTE(review): presumably one timestamp per stage, in declaration order —
/// confirm.
pub stamps: [u64; 9],
}
/// Serde-(de)serializable status report for one session.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionStatus {
/// Identifier of the session being described.
pub session: SessionId,
/// Free-form state label.
/// NOTE(review): the set of valid values is not visible here — confirm.
pub state: String,
/// Remaining time in seconds.
/// NOTE(review): what the countdown refers to is not visible here — confirm.
pub remaining_secs: u32,
/// Queue position.
/// NOTE(review): whether this is 0- or 1-based is not visible here — confirm.
pub queue_pos: u32,
}
/// Reports whether the environment variable `name` holds a truthy value.
///
/// Only the exact strings `"1"`, `"true"`, `"TRUE"`, `"yes"` and `"on"` count
/// as truthy (the comparison is case-sensitive and does not trim). An unset
/// variable, a value that is not valid Unicode, or any other string yields
/// `false`.
pub fn env_flag(name: &str) -> bool {
    const TRUTHY: [&str; 5] = ["1", "true", "TRUE", "yes", "on"];
    match std::env::var(name) {
        Ok(value) => TRUTHY.contains(&value.as_str()),
        Err(_) => false,
    }
}
/// Reads the environment variable `name` as a `u64`.
///
/// Returns `default` when the variable is unset, is not valid Unicode, or does
/// not parse as a `u64` (the value is not trimmed before parsing).
pub fn env_u64(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        Ok(text) => text.parse::<u64>().unwrap_or(default),
        Err(_) => default,
    }
}
/// Reads the environment variable `name` as an owned `String`.
///
/// Returns a copy of `default` when the variable is unset or its value is not
/// valid Unicode. An empty but set variable is returned as the empty string.
pub fn env_string(name: &str, default: &str) -> String {
    match std::env::var(name) {
        Ok(value) => value,
        Err(_) => default.to_owned(),
    }
}
/// Decides whether precise stamps are enabled for this process.
///
/// Resolution order:
/// 1. If any process argument equals `--no-precise`, returns `false`.
/// 2. Otherwise, if `WINGFOIL_PRECISE_STAMPS` is set to valid Unicode, returns
///    `true` only for the exact values `"1"`, `"true"`, `"TRUE"`, `"yes"` or
///    `"on"`, and `false` for anything else.
/// 3. Otherwise (variable unset or not valid Unicode) returns `true`.
pub fn precise_stamps_enabled() -> bool {
    // `args_os` is used instead of `args` because `args` panics as soon as any
    // process argument is not valid Unicode; probing for a flag must not be
    // able to bring the process down.
    let flag = std::ffi::OsStr::new("--no-precise");
    if std::env::args_os().any(|arg| arg.as_os_str() == flag) {
        return false;
    }
    match std::env::var("WINGFOIL_PRECISE_STAMPS") {
        Ok(value) => matches!(value.as_str(), "1" | "true" | "TRUE" | "yes" | "on"),
        // Unset (or non-Unicode) means "use the default", which is enabled.
        Err(_) => true,
    }
}
/// Outcome of [`parse_core_list`]: the core indices that parsed, and the
/// tokens that did not.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct ParsedCoreList {
    /// Successfully parsed core indices, in input order (duplicates are kept).
    pub cores: Vec<usize>,
    /// Trimmed tokens that failed to parse as `usize`, in input order.
    pub bad_tokens: Vec<String>,
}

/// Parses a comma-separated list of core indices such as `"2, 3 ,5"`.
///
/// Each token is trimmed; tokens that are empty after trimming are skipped
/// silently. Tokens that parse as `usize` are collected into `cores`, and every
/// other token is collected (trimmed) into `bad_tokens`. The function never
/// fails: it is up to the caller to decide what a non-empty `bad_tokens` means.
pub fn parse_core_list(raw: &str) -> ParsedCoreList {
    let mut cores = Vec::new();
    let mut bad_tokens = Vec::new();

    let tokens = raw
        .split(',')
        .map(str::trim)
        .filter(|piece| !piece.is_empty());

    for piece in tokens {
        if let Ok(core) = piece.parse::<usize>() {
            cores.push(core);
        } else {
            bad_tokens.push(piece.to_string());
        }
    }

    ParsedCoreList { cores, bad_tokens }
}
/// Restricts the calling thread's CPU affinity to `cores` (Linux only).
///
/// An empty slice is a no-op that returns `Ok(())`.
///
/// # Errors
///
/// * Fails with an "out of range" error, before any system call is made, if a
///   core index is `>= libc::CPU_SETSIZE`; the first such index in `cores` is
///   the one reported.
/// * Fails with the OS error when `sched_setaffinity` returns non-zero.
#[cfg(target_os = "linux")]
pub fn pin_current(cores: &[usize]) -> anyhow::Result<()> {
    if cores.is_empty() {
        return Ok(());
    }

    // Validate every index up front: `CPU_SET` must only ever see indices that
    // fit inside a `cpu_set_t`.
    let max = libc::CPU_SETSIZE as usize;
    for &bad in cores {
        if bad >= max {
            anyhow::bail!("core {bad} is out of range (CPU_SETSIZE = {max})");
        }
    }

    // SAFETY: `cpu_set_t` is a plain bitmask for which all-zero bytes are a
    // valid value; every index handed to `CPU_SET` was checked against
    // `CPU_SETSIZE` above; and `sched_setaffinity` only reads the mask through
    // a pointer that stays valid for the duration of the call.
    let rc = unsafe {
        let mut mask: libc::cpu_set_t = std::mem::zeroed();
        libc::CPU_ZERO(&mut mask);
        for &core in cores {
            libc::CPU_SET(core, &mut mask);
        }
        // A pid of 0 addresses the calling thread.
        libc::sched_setaffinity(0, std::mem::size_of::<libc::cpu_set_t>(), &mask)
    };

    if rc == 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error().into())
    }
}
/// Non-Linux fallback for `pin_current`: ignores `_cores`, changes nothing and
/// always returns `Ok(())`, so callers build and run unchanged on platforms
/// where the Linux implementation is compiled out.
#[cfg(not(target_os = "linux"))]
pub fn pin_current(_cores: &[usize]) -> anyhow::Result<()> {
Ok(())
}
/// Pins the current thread to the cores listed in the environment variable
/// `name`, on a best-effort basis.
///
/// The function never fails; every outcome is reported through `log`:
///
/// * variable unset (or not valid Unicode): returns silently;
/// * any token fails to parse: warns and does not pin;
/// * list is empty after parsing: warns and does not pin;
/// * `pin_current` fails: warns with the error;
/// * otherwise: logs the pinned cores at info level.
pub fn pin_current_from_env(name: &str) {
    let raw = match std::env::var(name) {
        Ok(value) => value,
        Err(_) => return,
    };

    let parsed = parse_core_list(&raw);

    // All-or-nothing: a single bad token disables pinning rather than pinning
    // to a subset the operator did not ask for.
    if !parsed.bad_tokens.is_empty() {
        log::warn!(
            "{name}={raw:?} has unparseable token(s) {:?}; not pinning",
            parsed.bad_tokens,
        );
    } else if parsed.cores.is_empty() {
        log::warn!("{name}={raw:?} parsed as empty core list; not pinning");
    } else if let Err(e) = pin_current(&parsed.cores) {
        log::warn!("failed to pin to cores {:?} from {name}: {e}", parsed.cores,);
    } else {
        log::info!(
            "pinned current thread to cores {:?} (via {name})",
            parsed.cores,
        );
    }
}
/// Renders a session id as 32 lowercase hexadecimal characters, two per byte,
/// in byte order and without separators.
pub fn session_hex(id: &SessionId) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut hex = String::with_capacity(32);
    for &byte in id.iter() {
        // High nibble first, then low nibble.
        hex.push(DIGITS[usize::from(byte >> 4)] as char);
        hex.push(DIGITS[usize::from(byte & 0x0f)] as char);
    }
    hex
}
// Unit tests for `parse_core_list` and `pin_current`.
#[cfg(test)]
mod tests {
use super::*;
// A single index parses to a one-element list with no bad tokens.
#[test]
fn parse_core_list_single() {
let p = parse_core_list("2");
assert_eq!(p.cores, vec![2]);
assert!(p.bad_tokens.is_empty());
}
// Whitespace around tokens is trimmed before parsing.
#[test]
fn parse_core_list_multiple_and_whitespace() {
let p = parse_core_list("2, 3 ,5");
assert_eq!(p.cores, vec![2, 3, 5]);
assert!(p.bad_tokens.is_empty());
}
// Empty and whitespace-only tokens are skipped, not reported as bad.
#[test]
fn parse_core_list_empty_tokens_skipped() {
let p = parse_core_list("2,,3, ,4");
assert_eq!(p.cores, vec![2, 3, 4]);
assert!(p.bad_tokens.is_empty());
}
// Unparseable tokens (including negative numbers) land in `bad_tokens`
// while the valid ones are still collected.
#[test]
fn parse_core_list_collects_bad_tokens() {
let p = parse_core_list("2,abc,3,-1");
assert_eq!(p.cores, vec![2, 3]);
assert_eq!(p.bad_tokens, vec!["abc".to_string(), "-1".to_string()]);
}
// With no valid token at all, `cores` stays empty.
#[test]
fn parse_core_list_all_bad() {
let p = parse_core_list("foo,bar");
assert!(p.cores.is_empty());
assert_eq!(p.bad_tokens, vec!["foo".to_string(), "bar".to_string()]);
}
// The empty string yields an entirely empty result.
#[test]
fn parse_core_list_empty_input() {
let p = parse_core_list("");
assert!(p.cores.is_empty());
assert!(p.bad_tokens.is_empty());
}
// An empty core slice succeeds without doing anything (on every platform).
#[test]
fn pin_current_empty_is_noop() {
assert!(pin_current(&[]).is_ok());
}
// Linux only: `CPU_SETSIZE` itself is the first out-of-range index and must
// be rejected with the "out of range" error.
#[cfg(target_os = "linux")]
#[test]
fn pin_current_rejects_out_of_range_core() {
let huge = libc::CPU_SETSIZE as usize;
let err = pin_current(&[huge]).unwrap_err();
assert!(
err.to_string().contains("out of range"),
"unexpected error: {err}",
);
}
}