use std::ffi::CString;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::thread;
use rtp::trader::{GenericTraderApi, TraderApi, TraderSpi, ResumeType};
use rtp::trader::{DisconnectionReason, RspResult};
use rtp::trader::{
CThostFtdcInstrumentField, CThostFtdcQryInstrumentField,
};
use rtp::binding::TThostFtdcRequestIDType;
/// Callback handler for an instrument query against the trading front.
///
/// Every field is wrapped in `Arc<Mutex<_>>` so a caller can clone the handles
/// before giving the handler away (the test in this file clones them before
/// boxing the handler into `register_spi`) and still read the state afterwards.
struct InstrumentQueryTraderSpi {
/// Connection flag: set to `true` by `on_front_connected` and back to `false`
/// by `on_front_disconnected`; read through `is_connected`.
connected: Arc<Mutex<bool>>,
/// Instrument records held by this handler; `get_instruments` returns a clone
/// of the current contents.
instruments_received: Arc<Mutex<Vec<CThostFtdcInstrumentField>>>,
/// Completion flag reported by `is_query_finished`; callers poll it to learn
/// when an instrument query is over.
query_finished: Arc<Mutex<bool>>,
}
impl InstrumentQueryTraderSpi {
fn new() -> Self {
InstrumentQueryTraderSpi {
connected: Arc::new(Mutex::new(false)),
instruments_received: Arc::new(Mutex::new(Vec::new())),
query_finished: Arc::new(Mutex::new(false)),
}
}
fn is_connected(&self) -> bool {
*self.connected.lock().unwrap()
}
fn is_query_finished(&self) -> bool {
*self.query_finished.lock().unwrap()
}
fn get_instruments(&self) -> Vec<CThostFtdcInstrumentField> {
self.instruments_received.lock().unwrap().clone()
}
}
/// Trader callbacks that mirror connection and instrument-query progress into
/// the handler's shared state, so holders of the cloned `Arc`s can observe it.
impl TraderSpi for InstrumentQueryTraderSpi {
    /// Logs the event and marks the handler as connected.
    fn on_front_connected(&mut self) {
        println!("Connected to trading server");
        *self.connected.lock().unwrap() = true;
    }

    /// Logs the disconnection reason and marks the handler as disconnected.
    fn on_front_disconnected(&mut self, reason: DisconnectionReason) {
        println!("Disconnected from trading server: {:?}", reason);
        *self.connected.lock().unwrap() = false;
    }

    /// Handles one response to an instrument query.
    ///
    /// * `instrument_data` - the instrument carried by this response, if any.
    /// * `result` - `Ok(())` on success, otherwise the error reported for the query.
    /// * `_request_id` - id of the originating request (unused).
    /// * `is_last` - `true` when this is the final response for the request.
    ///
    /// On success the instrument is logged and appended to
    /// `instruments_received`; on failure the error is logged. Whenever
    /// `is_last` is set, `query_finished` is raised.
    fn on_rsp_qry_instrument(
        &mut self,
        instrument_data: Option<&CThostFtdcInstrumentField>,
        result: RspResult,
        _request_id: TThostFtdcRequestIDType,
        is_last: bool,
    ) {
        match result {
            Ok(()) => {
                if let Some(instrument_data) = instrument_data {
                    let instrument_id =
                        rtp::trader::gb18030_cstr_to_str(&instrument_data.InstrumentID);
                    let exchange_id =
                        rtp::trader::gb18030_cstr_to_str(&instrument_data.ExchangeID);
                    println!("Instrument: {} on {}", instrument_id, exchange_id);
                    // Keep an owned copy: the borrowed record is only valid for the
                    // duration of this callback, and `get_instruments` reads this list.
                    self.instruments_received
                        .lock()
                        .unwrap()
                        .push(instrument_data.clone());
                }
                if is_last {
                    println!("All instruments received.");
                }
            }
            Err(e) => {
                println!("Failed to query instrument: [{0}] {1}", e.id, e.msg);
            }
        }
        // Raise the completion flag on the final response regardless of outcome,
        // so pollers of `query_finished` stop waiting instead of running into
        // their timeout.
        if is_last {
            *self.query_finished.lock().unwrap() = true;
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Polls `flag` every 100 ms until it becomes `true` or `limit` has elapsed,
    /// then returns the flag's value as read after the polling stopped.
    fn wait_until_set(flag: &Mutex<bool>, limit: Duration) -> bool {
        let started = std::time::Instant::now();
        loop {
            let is_set = *flag.lock().unwrap();
            if is_set || started.elapsed() >= limit {
                break;
            }
            thread::sleep(Duration::from_millis(100));
        }
        *flag.lock().unwrap()
    }

    /// Smoke test for the instrument query flow.
    ///
    /// Initializes the trader API, waits one second for a connection and, only
    /// if the handler reports being connected, sends an instrument query and
    /// waits up to 30 seconds for the completion flag. The test only prints its
    /// progress; it makes no assertions, so it also passes without a server.
    #[test]
    fn test_instrument_query() {
        let flow_dir = CString::new("./flow_path").unwrap();
        let mut trader = TraderApi::new(flow_dir);

        // Grab handles to the shared flags before the handler is moved into the API.
        let handler = Box::new(InstrumentQueryTraderSpi::new());
        let connected_flag = Arc::clone(&handler.connected);
        let finished_flag = Arc::clone(&handler.query_finished);

        trader.register_spi(handler);
        trader.subscribe_public_topic(ResumeType::Quick);
        trader.subscribe_private_topic(ResumeType::Quick);

        println!("Initializing trader API...");
        trader.init();

        println!("Waiting for potential connection...");
        thread::sleep(Duration::from_secs(1));

        let is_connected = *connected_flag.lock().unwrap();
        if !is_connected {
            println!("Not connected to server, skipping instrument query");
        } else {
            println!("Connected to server, proceeding with instrument query");

            let query_req = CThostFtdcQryInstrumentField::default();
            let send_outcome = trader.req_qry_instrument(&query_req, 1);
            match send_outcome {
                Err(e) => println!("Failed to send instrument query: {:?}", e),
                Ok(_) => println!("Instrument query request sent successfully"),
            }

            let finished = wait_until_set(&finished_flag, Duration::from_secs(30));
            let verdict = if finished {
                "Query completed successfully"
            } else {
                "Query timed out"
            };
            println!("{}", verdict);
        }

        println!("Test completed");
    }
}