use tokio::sync::broadcast::error::RecvError;
use tracing::info;
use rithmic_rs::{
BracketDuration, BracketPriceType, BracketTransactionType, ConnectStrategy, RithmicAccount,
RithmicBracketOrder, RithmicConfig, RithmicEnv, RithmicOrderPlant,
plants::subscription::SubscriptionFilter, rti::messages::RithmicMessage,
};
fn spawn_order_listener(mut receiver: SubscriptionFilter) {
tokio::spawn(async move {
loop {
match receiver.recv().await {
Ok(update) => {
if let Some(error) = &update.error {
info!("Connection error: {}", error);
break;
}
match &update.message {
RithmicMessage::HeartbeatTimeout
| RithmicMessage::ForcedLogout(_)
| RithmicMessage::ConnectionError => {
info!("Connection lost");
break;
}
RithmicMessage::RithmicOrderNotification(notif) => {
info!(
"Rithmic order: status={:?} symbol={} qty={} price={:?} basket_id={}",
notif.status,
notif.symbol.as_deref().unwrap_or("?"),
notif.quantity.unwrap_or(0),
notif.price,
notif.basket_id.as_deref().unwrap_or("?")
);
}
RithmicMessage::ExchangeOrderNotification(notif) => {
info!(
"Exchange order: status={:?} symbol={} filled_qty={} avg_price={:?}",
notif.status.as_deref().unwrap_or("?"),
notif.symbol.as_deref().unwrap_or("?"),
notif.total_fill_size.unwrap_or(0),
notif.avg_fill_price
);
}
RithmicMessage::BracketUpdates(bracket) => {
info!(
"Bracket update: basket_id={} target_ticks={:?} stop_ticks={:?}",
bracket.basket_id.as_deref().unwrap_or("?"),
bracket.target_ticks,
bracket.stop_ticks
);
}
_ => {}
}
}
Err(RecvError::Closed) => {
info!("Subscription channel closed");
break;
}
Err(RecvError::Lagged(n)) => {
info!("Listener lagged, missed {} messages", n);
}
}
}
});
}
/// Example entry point: connects to the order plant using the `Demo`
/// environment, subscribes to order and bracket updates, places a single
/// bracket order, waits ten seconds while the background listener logs
/// updates, then disconnects.
///
/// Any error from configuration loading, connecting, logging in, subscribing,
/// placing the order or disconnecting is propagated to the caller via `?`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The result of the dotenv load is deliberately discarded.
    dotenvy::dotenv().ok();
    tracing_subscriber::fmt().init();

    let config = RithmicConfig::from_env(RithmicEnv::Demo)?;
    let account = RithmicAccount::from_env(RithmicEnv::Demo)?;

    let plant = RithmicOrderPlant::connect(&config, ConnectStrategy::Retry).await?;
    let handle = plant.get_handle(&account);

    handle.login().await?;
    info!("Logged in to order plant");

    handle.subscribe_order_updates().await?;
    handle.subscribe_bracket_updates().await?;
    info!("Subscribed to order and bracket updates");

    // Hand the listener its own receiver obtained via `resubscribe()`.
    spawn_order_listener(handle.subscription_receiver.resubscribe());

    let order = RithmicBracketOrder {
        localid: String::from("example-bracket-1"),
        symbol: String::from("ESM6"),
        exchange: String::from("CME"),
        action: BracketTransactionType::Buy,
        quantity: 1,
        price_type: BracketPriceType::Limit,
        price: Some(5000.00),
        profit_ticks: 20,
        stop_ticks: 10,
        duration: BracketDuration::Day,
    };
    info!("Placing bracket order: {:?}", order);

    let replies = handle.place_bracket_order(order).await?;
    for reply in &replies {
        info!("Order response: {:?}", reply);
    }

    // Keep the process alive for a while so the listener task can log updates.
    let linger = std::time::Duration::from_secs(10);
    tokio::time::sleep(linger).await;

    handle.disconnect().await?;
    info!("Disconnected from order plant");

    Ok(())
}