use core::str::FromStr;
use std::fs;
use std::path::Path;
use ibapi::accounts::{AccountSummaryResult, PositionUpdate};
use ibapi::client::blocking::Client;
use ibapi::messages::parser_registry::{MessageParserRegistry, ParsedField};
use ibapi::messages::*;
use ibapi::trace;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct Field {
name: String,
value: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct InteractionRecord {
name: String,
request: MessageRecord,
responses: Vec<MessageRecord>,
}
#[derive(Debug, Serialize, Deserialize)]
struct MessageRecord {
raw: String,
fields: Vec<Field>,
}
mod sanitization {
use super::Field;
pub fn sanitize_field_value(value: &str) -> String {
if (value.starts_with("DU") || value.starts_with("U")) && value.len() > 5 {
return "ACCOUNT_ID".to_string();
}
if value.len() > 20 && value.chars().all(|c| c.is_alphanumeric()) {
return format!("{}...", &value[..6]);
}
value.to_string()
}
pub fn sanitize_raw_message(raw: &str, fields: &[Field]) -> String {
let mut sanitized = raw.to_string();
let parts: Vec<&str> = raw.split('\0').collect();
for (i, part) in parts.iter().enumerate() {
if (part.starts_with("DU") || part.starts_with("U")) && part.len() > 5 {
if let Some(field) = fields.get(i) {
if field.value == "ACCOUNT_ID" {
sanitized = sanitized.replace(part, "ACCOUNT_ID");
}
}
}
}
sanitized
}
}
use sanitization::{sanitize_field_value, sanitize_raw_message};
fn parse_message_parts(raw: &str) -> Vec<&str> {
let mut parts: Vec<&str> = raw.split('\0').collect();
if parts.last() == Some(&"") {
parts.pop();
}
parts
}
fn parse_request_fields(raw: &str, registry: &MessageParserRegistry) -> Vec<Field> {
let parts = parse_message_parts(raw);
if parts.is_empty() {
return Vec::new();
}
match parts.first().map(|s| OutgoingMessages::from_str(s)) {
Some(Ok(msg_type)) => {
let mut parsed = registry.parse_request(msg_type, &parts);
if matches!(msg_type, OutgoingMessages::RequestPnL) {
for field in &mut parsed {
if field.name == "account" {
field.value = sanitize_field_value(&field.value);
}
}
}
convert_parsed_fields(parsed)
}
_ => {
let parsed = parser_registry::parse_generic_message(&parts);
convert_parsed_fields(parsed)
}
}
}
fn parse_response_fields(raw: &str, registry: &MessageParserRegistry) -> Vec<Field> {
let parts = parse_message_parts(raw);
if parts.is_empty() {
return Vec::new();
}
match parts.first().map(|s| IncomingMessages::from_str(s)) {
Some(Ok(msg_type)) => {
let mut parsed = registry.parse_response(msg_type, &parts);
match msg_type {
IncomingMessages::ManagedAccounts => {
for field in &mut parsed {
if field.name == "accounts" {
field.value = field
.value
.split(',')
.map(|acc| sanitize_field_value(acc.trim()))
.collect::<Vec<_>>()
.join(",");
}
}
}
IncomingMessages::Position | IncomingMessages::AccountSummary | IncomingMessages::PnL => {
for field in &mut parsed {
if field.name == "account" {
field.value = sanitize_field_value(&field.value);
}
}
}
_ => {}
}
convert_parsed_fields(parsed)
}
_ => {
let parsed = parser_registry::parse_generic_message(&parts);
convert_parsed_fields(parsed)
}
}
}
fn convert_parsed_fields(parsed: Vec<ParsedField>) -> Vec<Field> {
parsed
.into_iter()
.map(|pf| Field {
name: pf.name,
value: pf.value,
})
.collect()
}
struct InteractionRecorder {
registry: MessageParserRegistry,
}
impl InteractionRecorder {
fn new() -> Self {
Self {
registry: MessageParserRegistry::new(),
}
}
fn record_interaction<F>(&self, name: &str, operation: F) -> Result<InteractionRecord, Box<dyn std::error::Error>>
where
F: FnOnce() -> Result<InteractionRecord, Box<dyn std::error::Error>>,
{
println!("Recording {name} interaction...");
std::thread::sleep(std::time::Duration::from_millis(100));
let record = operation()?;
Ok(record)
}
fn create_request_record(&self, raw: String) -> MessageRecord {
let fields = parse_request_fields(&raw, &self.registry);
let sanitized_raw = sanitize_raw_message(&raw, &fields);
MessageRecord { raw: sanitized_raw, fields }
}
fn create_response_record(&self, raw: String) -> MessageRecord {
let fields = parse_response_fields(&raw, &self.registry);
let sanitized_raw = sanitize_raw_message(&raw, &fields);
MessageRecord { raw: sanitized_raw, fields }
}
}
#[derive(Debug, Serialize, Deserialize)]
struct RecordingHeader {
server_version: i32,
recorded_at: String,
}
#[derive(Serialize)]
struct YamlOutput {
header: RecordingHeader,
interactions: Vec<InteractionRecord>,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("debug")).init();
println!("Connecting to TWS/Gateway...");
let client = Client::connect("127.0.0.1:4002", 100)?;
println!("Connected successfully!");
let recorder = InteractionRecorder::new();
let mut interactions = Vec::new();
match recorder.record_interaction("server_time", || {
let server_time = client.server_time()?;
println!("Server time: {server_time}");
let interaction = trace::blocking::last_interaction().ok_or("No interaction captured - ensure debug logging is enabled")?;
let record = InteractionRecord {
name: "server_time".to_string(),
request: recorder.create_request_record(interaction.request),
responses: interaction.responses.into_iter().map(|r| recorder.create_response_record(r)).collect(),
};
Ok(record)
}) {
Ok(record) => {
println!("\nRecorded interaction: {}", record.name);
println!("Request: {}", record.request.raw);
println!("Responses: {} message(s)", record.responses.len());
interactions.push(record);
}
Err(e) => eprintln!("Failed to record server_time: {e}"),
}
match recorder.record_interaction("managed_accounts", || {
let accounts = client.managed_accounts()?;
println!("Managed accounts: {accounts:?} (will be sanitized)");
let interaction = trace::blocking::last_interaction().ok_or("No interaction captured - ensure debug logging is enabled")?;
let record = InteractionRecord {
name: "managed_accounts".to_string(),
request: recorder.create_request_record(interaction.request),
responses: interaction.responses.into_iter().map(|r| recorder.create_response_record(r)).collect(),
};
Ok(record)
}) {
Ok(record) => {
println!("\nRecorded interaction: {}", record.name);
println!("Request: {}", record.request.raw);
println!("Responses: {} message(s)", record.responses.len());
interactions.push(record);
}
Err(e) => eprintln!("Failed to record managed_accounts: {e}"),
}
let accounts = client.managed_accounts().unwrap_or_default();
let account = accounts.first().map(|s| s.as_str()).unwrap_or("DU1234567");
println!("\nUsing account: {account} for subsequent queries");
match recorder.record_interaction("positions", || {
let mut position_count = 0;
let positions = client.positions()?;
println!("Starting to consume positions...");
while let Some(position) = positions.next() {
if let PositionUpdate::PositionEnd = position {
break;
}
position_count += 1;
println!("Got position #{position_count}");
}
println!("Finished consuming positions: {position_count}");
let interaction = trace::blocking::last_interaction().ok_or("No interaction captured - ensure debug logging is enabled")?;
drop(positions);
let record = InteractionRecord {
name: "positions".to_string(),
request: recorder.create_request_record(interaction.request),
responses: interaction.responses.into_iter().map(|r| recorder.create_response_record(r)).collect(),
};
Ok(record)
}) {
Ok(record) => {
println!("\nRecorded interaction: {}", record.name);
println!("Request: {}", record.request.raw);
println!("Responses: {} message(s)", record.responses.len());
interactions.push(record);
}
Err(e) => eprintln!("Failed to record positions: {e}"),
}
match recorder.record_interaction("account_summary", || {
use ibapi::accounts::types::AccountGroup;
let mut summary_count = 0;
let tags = vec!["NetLiquidation", "TotalCashValue", "GrossPositionValue"];
let group = AccountGroup("All".to_string());
let summaries = client.account_summary(&group, &tags)?;
while let Some(summary) = summaries.next() {
if let AccountSummaryResult::End = summary {
break;
}
summary_count += 1;
println!("Got summary #{summary_count}");
}
println!("Finished consuming summaries: {summary_count}");
let interaction = trace::blocking::last_interaction().ok_or("No interaction captured - ensure debug logging is enabled")?;
drop(summaries);
let record = InteractionRecord {
name: "account_summary".to_string(),
request: recorder.create_request_record(interaction.request),
responses: interaction.responses.into_iter().map(|r| recorder.create_response_record(r)).collect(),
};
Ok(record)
}) {
Ok(record) => {
println!("\nRecorded interaction: {}", record.name);
println!("Request: {}", record.request.raw);
println!("Responses: {} message(s)", record.responses.len());
interactions.push(record);
}
Err(e) => eprintln!("Failed to record account_summary: {e}"),
}
match recorder.record_interaction("pnl", || {
use ibapi::accounts::types::AccountId;
let mut pnl_updates = 0;
let account_id = AccountId(account.to_string());
let pnl_stream = client.pnl(&account_id, None)?;
for update in pnl_stream.into_iter().take(3) {
pnl_updates += 1;
println!("Got PnL update #{pnl_updates}: {update:?}");
}
println!("Finished consuming PnL updates: {pnl_updates}");
std::thread::sleep(std::time::Duration::from_millis(100));
let interaction = trace::blocking::last_interaction().ok_or("No interaction captured - ensure debug logging is enabled")?;
println!("PnL interaction request: {}", interaction.request);
println!("PnL interaction responses: {} messages", interaction.responses.len());
let record = InteractionRecord {
name: "pnl".to_string(),
request: recorder.create_request_record(interaction.request),
responses: interaction.responses.into_iter().map(|r| recorder.create_response_record(r)).collect(),
};
Ok(record)
}) {
Ok(record) => {
println!("\nRecorded interaction: {}", record.name);
println!("Request: {}", record.request.raw);
println!("Responses: {} message(s)", record.responses.len());
interactions.push(record);
}
Err(e) => eprintln!("Failed to record pnl: {e}"),
}
let header = RecordingHeader {
server_version: client.server_version(),
recorded_at: time::OffsetDateTime::now_utc().to_string(),
};
let output_path = Path::new("tws_interactions.yaml");
let yaml_output = YamlOutput { header, interactions };
let yaml_content = serde_yaml::to_string(&yaml_output)?;
fs::write(output_path, &yaml_content)?;
println!("\nSaved to {}:", output_path.display());
println!("{yaml_content}");
Ok(())
}