use futures_util::StreamExt;
use tokio_tungstenite::tungstenite::Message;
use tracing::{error, info, warn};
use crate::{
ConnectStrategy,
api::{receiver_api::RithmicResponse, rithmic_command_types::LoginConfig},
config::RithmicConfig,
error::RithmicError,
plants::core::{PlantCore, SelectResult},
request_handler::RithmicRequest,
rti::{
messages::RithmicMessage,
request_depth_by_order_updates,
request_login::SysInfraType,
request_market_data_update::{Request, UpdateBits},
request_market_data_update_by_underlying, request_search_symbols,
},
ws::{HEARTBEAT_SECS, PlantActor},
};
use tokio::{
sync::{broadcast, mpsc, oneshot},
time::sleep_until,
};
/// Commands sent from a `RithmicTickerPlantHandle` to the `TickerPlant` actor
/// over its mpsc channel.
///
/// Variants carrying a `response_sender` expect a reply on that oneshot
/// channel; the remaining variants (`Close`, `Abort`, `SetLogin`,
/// `UpdateHeartbeat`) are fire-and-forget.
pub(crate) enum TickerPlantCommand {
/// Forwarded to `PlantCore::handle_close`.
Close,
/// Intercepted by the actor's `run` loop, which fails the connection with
/// `RithmicError::ConnectionClosed` and stops the loop.
Abort,
/// Forwarded to `PlantCore::handle_list_system_info`.
ListSystemInfo {
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Forwarded to `PlantCore::handle_login` with `SysInfraType::TickerPlant`.
Login {
config: LoginConfig,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Forwarded to `PlantCore::handle_set_login`; the handle sends this after
/// a login response that carries no error.
SetLogin,
/// Forwarded to `PlantCore::handle_logout`.
Logout {
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Forwarded to `PlantCore::handle_update_heartbeat` with `seconds`.
UpdateHeartbeat {
seconds: u64,
},
/// Market data update request; `request_type` selects subscribe or
/// unsubscribe and `fields` lists the update bits requested.
Subscribe {
symbol: String,
exchange: String,
fields: Vec<UpdateBits>,
request_type: Request,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Depth-by-order update request; `request_type` selects subscribe or
/// unsubscribe.
SubscribeOrderBook {
symbol: String,
exchange: String,
request_type: request_depth_by_order_updates::Request,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Depth-by-order snapshot request for one symbol/exchange pair.
RequestDepthByOrderSnapshot {
symbol: String,
exchange: String,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Symbol search request; all filters other than `search_text` are optional.
SearchSymbols {
search_text: String,
exchange: Option<String>,
product_code: Option<String>,
instrument_type: Option<request_search_symbols::InstrumentType>,
pattern: Option<request_search_symbols::Pattern>,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// List-exchanges request for `user`.
ListExchanges {
user: String,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Get-instrument-by-underlying request, optionally narrowed by
/// `expiration_date`.
GetInstrumentByUnderlying {
underlying_symbol: String,
exchange: String,
expiration_date: Option<String>,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Market data update request keyed by underlying symbol; `request_type`
/// and `fields` use the by-underlying variants of the request enums.
SubscribeByUnderlying {
underlying_symbol: String,
exchange: String,
expiration_date: Option<String>,
fields: Vec<request_market_data_update_by_underlying::UpdateBits>,
request_type: request_market_data_update_by_underlying::Request,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Tick-size-type-table request for `tick_size_type`.
GetTickSizeTypeTable {
tick_size_type: String,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Product-codes request with optional exchange and TOI-only filters.
GetProductCodes {
exchange: Option<String>,
give_toi_products_only: Option<bool>,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Volume-at-price request for one symbol/exchange pair.
GetVolumeAtPrice {
symbol: String,
exchange: String,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Auxiliary reference data request for one symbol/exchange pair.
GetAuxilliaryReferenceData {
symbol: String,
exchange: String,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Reference data request for one symbol/exchange pair.
GetReferenceData {
symbol: String,
exchange: String,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// Front-month-contract request; `need_updates` is passed through to the
/// request builder unchanged.
GetFrontMonthContract {
symbol: String,
exchange: String,
need_updates: bool,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
/// System gateway info request, optionally for a specific `system_name`.
GetSystemGatewayInfo {
system_name: Option<String>,
response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
},
}
impl TickerPlantCommand {
    /// Splits a command into its reply channel or the untouched command.
    ///
    /// Returns `Ok(sender)` for every variant that carries a
    /// `response_sender`, discarding the other fields. Fire-and-forget
    /// variants (`Close`, `Abort`, `SetLogin`, `UpdateHeartbeat`) are handed
    /// back unchanged in `Err` so the caller can still process them.
    fn into_response_sender_or_command(
        self,
    ) -> Result<oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>, Self> {
        match self {
            // No responder to extract: return the command itself.
            fire_and_forget @ (Self::Close
            | Self::Abort
            | Self::SetLogin
            | Self::UpdateHeartbeat { .. }) => Err(fire_and_forget),

            // Every remaining variant owns exactly one responder.
            Self::ListSystemInfo { response_sender }
            | Self::Logout { response_sender }
            | Self::Login { response_sender, .. }
            | Self::Subscribe { response_sender, .. }
            | Self::SubscribeOrderBook { response_sender, .. }
            | Self::RequestDepthByOrderSnapshot { response_sender, .. }
            | Self::SearchSymbols { response_sender, .. }
            | Self::ListExchanges { response_sender, .. }
            | Self::GetInstrumentByUnderlying { response_sender, .. }
            | Self::SubscribeByUnderlying { response_sender, .. }
            | Self::GetTickSizeTypeTable { response_sender, .. }
            | Self::GetProductCodes { response_sender, .. }
            | Self::GetVolumeAtPrice { response_sender, .. }
            | Self::GetAuxilliaryReferenceData { response_sender, .. }
            | Self::GetReferenceData { response_sender, .. }
            | Self::GetFrontMonthContract { response_sender, .. }
            | Self::GetSystemGatewayInfo { response_sender, .. } => Ok(response_sender),
        }
    }
}
/// Owner of a running ticker plant actor task.
///
/// Created by `RithmicTickerPlant::connect`; use `get_handle` to obtain a
/// `RithmicTickerPlantHandle` for issuing requests.
#[derive(Debug)]
pub struct RithmicTickerPlant {
// Join handle of the spawned task that runs `TickerPlant::run`.
pub(crate) connection_handle: tokio::task::JoinHandle<()>,
// Command channel into the actor; cloned into every handle.
sender: mpsc::Sender<TickerPlantCommand>,
// Broadcast sender shared with the actor's `PlantCore`; handles subscribe to it.
subscription_sender: broadcast::Sender<RithmicResponse>,
}
impl RithmicTickerPlant {
    /// Builds the ticker plant actor and spawns its event loop on the tokio
    /// runtime.
    ///
    /// The command channel has a capacity of 64 and the broadcast channel a
    /// capacity of 10 000.
    ///
    /// # Errors
    /// Propagates any error returned while constructing the actor
    /// (`TickerPlant::new`).
    pub async fn connect(
        config: &RithmicConfig,
        strategy: ConnectStrategy,
    ) -> Result<RithmicTickerPlant, RithmicError> {
        let (sender, request_receiver) = mpsc::channel::<TickerPlantCommand>(64);
        // The initial receiver is kept alive (but unused) until this function returns.
        let (subscription_sender, _initial_receiver) = broadcast::channel(10_000);

        let mut actor =
            TickerPlant::new(request_receiver, subscription_sender.clone(), config, strategy)
                .await?;

        let connection_handle = tokio::spawn(async move {
            actor.run().await;
        });

        Ok(RithmicTickerPlant {
            connection_handle,
            sender,
            subscription_sender,
        })
    }
}
impl RithmicTickerPlant {
pub async fn await_shutdown(self) -> Result<(), tokio::task::JoinError> {
self.connection_handle.await
}
pub fn get_handle(&self) -> RithmicTickerPlantHandle {
RithmicTickerPlantHandle {
sender: self.sender.clone(),
subscription_sender: self.subscription_sender.clone(),
subscription_receiver: self.subscription_sender.subscribe(),
}
}
}
/// Actor state for the ticker plant: the shared `PlantCore` plus the
/// receiving end of the command channel.
#[derive(Debug)]
struct TickerPlant {
// Shared plant machinery used by `run` and `handle_command`.
core: PlantCore,
// Commands sent by `RithmicTickerPlantHandle` instances.
request_receiver: mpsc::Receiver<TickerPlantCommand>,
}
impl TickerPlant {
    /// Constructs the actor by building a `PlantCore` labelled
    /// `"ticker_plant"` and pairing it with the command receiver.
    ///
    /// # Errors
    /// Propagates any error from `PlantCore::new`.
    async fn new(
        request_receiver: mpsc::Receiver<TickerPlantCommand>,
        subscription_sender: broadcast::Sender<RithmicResponse>,
        config: &RithmicConfig,
        strategy: ConnectStrategy,
    ) -> Result<TickerPlant, RithmicError> {
        let plant = TickerPlant {
            core: PlantCore::new(subscription_sender, config, strategy, "ticker_plant").await?,
            request_receiver,
        };
        Ok(plant)
    }
}
impl PlantActor for TickerPlant {
type Command = TickerPlantCommand;
/// Event loop of the ticker plant actor.
///
/// Each iteration waits for exactly one event (heartbeat tick, ping tick,
/// ping timeout, incoming command, or websocket message / end of stream),
/// dispatches it, and exits the loop when the dispatched handler reports
/// `true`.
async fn run(&mut self) {
loop {
// The mutable borrows of individual `core` fields are confined to this
// inner block so that `self.core` can be used as a whole afterwards.
let result = {
let interval = &mut self.core.interval;
let ping_interval = &mut self.core.ping_interval;
let ping_manager = &mut self.core.ping_manager;
let reader = &mut self.core.rithmic_reader;
let receiver = &mut self.request_receiver;
tokio::select! {
_ = interval.tick() => SelectResult::HeartbeatFired,
_ = ping_interval.tick() => SelectResult::PingFired,
// Sleeps until the next ping deadline; when no deadline is set the
// future never completes, so this branch cannot fire.
_ = async {
if let Some(t) = ping_manager.next_timeout_at() {
sleep_until(t).await
} else {
std::future::pending::<()>().await
}
} => SelectResult::PingTimeout,
// Only `Some` matches; a `None` from `recv()` does not select this branch.
Some(cmd) = receiver.recv() => SelectResult::Command(cmd),
msg = reader.next() => match msg {
Some(m) => SelectResult::RithmicMessage(m),
None => SelectResult::StreamClosed,
},
}
};
// `stop == true` terminates the loop.
let stop = match result {
SelectResult::HeartbeatFired => self.core.send_heartbeat().await,
SelectResult::PingFired => self.core.send_ping().await,
SelectResult::PingTimeout => {
// The deadline elapsed; `check_timeout` decides whether it is a real timeout.
if self.core.ping_manager.check_timeout() {
if self.core.close_requested {
warn!(
"ticker_plant: ping timed out while waiting for server close echo — terminating"
);
self.core.request_handler.drain_and_drop();
} else {
self.core.fail_connection_and_drain(
"websocket_ping_timeout",
RithmicError::HeartbeatTimeout,
);
}
true
} else {
false
}
}
SelectResult::Command(cmd) => {
// Abort bypasses `handle_command` and shuts the loop down right away.
if matches!(cmd, TickerPlantCommand::Abort) {
info!("ticker_plant: abort requested, shutting down immediately");
// NOTE(review): the reason string is empty here, unlike the ping-timeout
// path above — confirm this is intended.
self.core
.fail_connection_and_drain("", RithmicError::ConnectionClosed);
true
} else {
self.handle_command(cmd).await;
false
}
}
SelectResult::RithmicMessage(msg) => self.core.handle_rithmic_message(msg).await,
SelectResult::StreamClosed => self.core.handle_stream_closed(),
};
if stop {
break;
}
}
}
/// Dispatches one command received from a handle.
///
/// Once a close has been requested, any command that carries a responder is
/// answered immediately with `RithmicError::ConnectionClosed` and not
/// processed; fire-and-forget commands still fall through to the match below.
///
/// # Panics
/// Panics (via `unreachable!`) if called with `TickerPlantCommand::Abort`,
/// which `run` handles before calling this method.
async fn handle_command(&mut self, command: TickerPlantCommand) {
// After close was requested: reject responder-bearing commands up front.
let command = if self.core.close_requested {
match command.into_response_sender_or_command() {
Ok(tx) => {
// The caller may already have gone away; a failed send is ignored.
let _ = tx.send(Err(RithmicError::ConnectionClosed));
return;
}
Err(cmd) => cmd,
}
} else {
command
};
match command {
// Connection/session commands are delegated to `PlantCore`.
TickerPlantCommand::Close => {
self.core.handle_close().await;
}
TickerPlantCommand::ListSystemInfo { response_sender } => {
self.core.handle_list_system_info(response_sender).await;
}
TickerPlantCommand::Login {
config,
response_sender,
} => {
self.core
.handle_login(config, SysInfraType::TickerPlant, response_sender)
.await;
}
TickerPlantCommand::SetLogin => {
self.core.handle_set_login();
}
TickerPlantCommand::Logout { response_sender } => {
self.core.handle_logout(response_sender).await;
}
TickerPlantCommand::UpdateHeartbeat { seconds } => {
self.core.handle_update_heartbeat(seconds);
}
// Every request arm below follows the same three steps:
//   1. build the request buffer and its id via `rithmic_sender_api`,
//   2. register the responder under that id with `request_handler`,
//   3. send the buffer as a binary websocket message via `send_or_fail`.
// The responder is registered before the message is sent.
TickerPlantCommand::Subscribe {
symbol,
exchange,
fields,
request_type,
response_sender,
} => {
let (sub_buf, id) = self.core.rithmic_sender_api.request_market_data_update(
&symbol,
&exchange,
fields,
request_type,
);
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(sub_buf.into()), &id)
.await;
}
// Depth-by-order subscribe/unsubscribe.
TickerPlantCommand::SubscribeOrderBook {
symbol,
exchange,
request_type,
response_sender,
} => {
let (sub_buf, id) = self.core.rithmic_sender_api.request_depth_by_order_update(
&symbol,
&exchange,
request_type,
);
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(sub_buf.into()), &id)
.await;
}
// Depth-by-order snapshot.
TickerPlantCommand::RequestDepthByOrderSnapshot {
symbol,
exchange,
response_sender,
} => {
let (snapshot_buf, id) = self
.core
.rithmic_sender_api
.request_depth_by_order_snapshot(&symbol, &exchange);
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(snapshot_buf.into()), &id)
.await;
}
// Symbol search; optional filters are passed as `Option<&str>`.
TickerPlantCommand::SearchSymbols {
search_text,
exchange,
product_code,
instrument_type,
pattern,
response_sender,
} => {
let (search_buf, id) = self.core.rithmic_sender_api.request_search_symbols(
&search_text,
exchange.as_deref(),
product_code.as_deref(),
instrument_type,
pattern,
);
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(search_buf.into()), &id)
.await;
}
// List exchanges for a user.
TickerPlantCommand::ListExchanges {
user,
response_sender,
} => {
let (list_buf, id) = self.core.rithmic_sender_api.request_list_exchanges(&user);
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(list_buf.into()), &id)
.await;
}
// Instrument lookup by underlying symbol.
TickerPlantCommand::GetInstrumentByUnderlying {
underlying_symbol,
exchange,
expiration_date,
response_sender,
} => {
let (buf, id) = self
.core
.rithmic_sender_api
.request_get_instrument_by_underlying(
&underlying_symbol,
&exchange,
expiration_date.as_deref(),
);
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(buf.into()), &id)
.await;
}
// Market data update keyed by underlying symbol.
TickerPlantCommand::SubscribeByUnderlying {
underlying_symbol,
exchange,
expiration_date,
fields,
request_type,
response_sender,
} => {
let (buf, id) = self
.core
.rithmic_sender_api
.request_market_data_update_by_underlying(
&underlying_symbol,
&exchange,
expiration_date.as_deref(),
fields,
request_type,
);
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(buf.into()), &id)
.await;
}
// Tick size type table.
TickerPlantCommand::GetTickSizeTypeTable {
tick_size_type,
response_sender,
} => {
let (buf, id) = self
.core
.rithmic_sender_api
.request_give_tick_size_type_table(&tick_size_type);
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(buf.into()), &id)
.await;
}
// Product codes.
TickerPlantCommand::GetProductCodes {
exchange,
give_toi_products_only,
response_sender,
} => {
let (buf, id) = self
.core
.rithmic_sender_api
.request_product_codes(exchange.as_deref(), give_toi_products_only);
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(buf.into()), &id)
.await;
}
// Volume at price.
TickerPlantCommand::GetVolumeAtPrice {
symbol,
exchange,
response_sender,
} => {
let (buf, id) = self
.core
.rithmic_sender_api
.request_get_volume_at_price(&symbol, &exchange);
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(buf.into()), &id)
.await;
}
// Auxiliary reference data.
TickerPlantCommand::GetAuxilliaryReferenceData {
symbol,
exchange,
response_sender,
} => {
let (buf, id) = self
.core
.rithmic_sender_api
.request_auxilliary_reference_data(&symbol, &exchange);
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(buf.into()), &id)
.await;
}
// Reference data.
TickerPlantCommand::GetReferenceData {
symbol,
exchange,
response_sender,
} => {
let (buf, id) = self
.core
.rithmic_sender_api
.request_reference_data(&symbol, &exchange);
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(buf.into()), &id)
.await;
}
// Front month contract.
TickerPlantCommand::GetFrontMonthContract {
symbol,
exchange,
need_updates,
response_sender,
} => {
let (buf, id) = self.core.rithmic_sender_api.request_front_month_contract(
&symbol,
&exchange,
need_updates,
);
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(buf.into()), &id)
.await;
}
// System gateway info.
TickerPlantCommand::GetSystemGatewayInfo {
system_name,
response_sender,
} => {
let (buf, id) = self
.core
.rithmic_sender_api
.request_rithmic_system_gateway_info(system_name.as_deref());
self.core.request_handler.register_request(RithmicRequest {
request_id: id.clone(),
responder: response_sender,
});
self.core
.send_or_fail(Message::Binary(buf.into()), &id)
.await;
}
// `run` filters out Abort before delegating here.
TickerPlantCommand::Abort => {
unreachable!("Abort is handled in run() before handle_command");
}
}
}
}
/// Client-side handle for talking to the ticker plant actor.
///
/// Requests are sent as `TickerPlantCommand`s over `sender`; broadcast
/// messages are read from `subscription_receiver`.
pub struct RithmicTickerPlantHandle {
// Command channel into the actor.
sender: mpsc::Sender<TickerPlantCommand>,
// Kept so that clones can create their own receiver and so `disconnect`
// can publish the logout response.
subscription_sender: broadcast::Sender<RithmicResponse>,
/// Receiver for responses published on the plant's broadcast channel.
pub subscription_receiver: broadcast::Receiver<RithmicResponse>,
}
impl std::fmt::Debug for RithmicTickerPlantHandle {
    /// Formats the handle, showing both senders and eliding the broadcast
    /// receiver (the output is marked non-exhaustive).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("RithmicTickerPlantHandle");
        builder.field("sender", &self.sender);
        builder.field("subscription_sender", &self.subscription_sender);
        builder.finish_non_exhaustive()
    }
}
impl RithmicTickerPlantHandle {
/// Sends a `ListSystemInfo` command to the plant actor and returns the
/// first response of its reply.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped
///   before an answer arrives.
/// * `RithmicError::EmptyResponse` if the reply contains no responses.
/// * Any error the actor reports for the request.
pub async fn list_system_info(&self) -> Result<RithmicResponse, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::ListSystemInfo { response_sender })
        .await;

    let responses = reply.await.map_err(|_| RithmicError::ConnectionClosed)??;
    let first = responses
        .into_iter()
        .next()
        .ok_or(RithmicError::EmptyResponse)?;
    Ok(first)
}
/// Logs in using `LoginConfig::default()`; see `login_with_config`.
pub async fn login(&self) -> Result<RithmicResponse, RithmicError> {
    let default_config = LoginConfig::default();
    self.login_with_config(default_config).await
}
/// Logs in to the ticker plant with the given configuration.
///
/// If `config.aggregated_quotes` is unset it is defaulted to `Some(false)`.
/// On a successful login a `SetLogin` command is sent to the actor, and if
/// the login response carries a heartbeat interval the actor's heartbeat is
/// updated to that value, but never below `HEARTBEAT_SECS`.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * `RithmicError::EmptyResponse` if the reply contains no responses.
/// * The error attached to the login response, if any.
pub async fn login_with_config(
    &self,
    config: LoginConfig,
) -> Result<RithmicResponse, RithmicError> {
    info!("ticker_plant: logging in");

    let (response_sender, reply) = oneshot::channel();

    // Default the aggregated-quotes flag only when the caller left it unset.
    let mut config = config;
    config.aggregated_quotes = config.aggregated_quotes.or(Some(false));

    let _ = self
        .sender
        .send(TickerPlantCommand::Login {
            config,
            response_sender,
        })
        .await;

    let responses = reply.await.map_err(|_| RithmicError::ConnectionClosed)??;
    let Some(response) = responses.into_iter().next() else {
        return Err(RithmicError::EmptyResponse);
    };

    if let Some(err) = response.error.clone() {
        error!("ticker_plant: login failed {:?}", err);
        return Err(err);
    }

    let _ = self.sender.send(TickerPlantCommand::SetLogin).await;

    if let RithmicMessage::ResponseLogin(login) = &response.message {
        if let Some(hb) = login.heartbeat_interval {
            // Clamp from below so the heartbeat is never shorter than the default.
            let secs = hb.max(HEARTBEAT_SECS as f64) as u64;
            self.update_heartbeat(secs).await;
        }
        if let Some(session_id) = &login.unique_user_id {
            info!("ticker_plant: session id: {}", session_id);
        }
    }

    info!("ticker_plant: logged in");
    Ok(response)
}
/// Logs out, then asks the actor to close the connection.
///
/// The first logout response is also published on the broadcast channel
/// before being returned.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * Any error reported for the logout request; in both of these cases the
///   `Close` command is not sent.
/// * `RithmicError::EmptyResponse` if the logout reply contains no
///   responses (the `Close` command has already been sent by then).
pub async fn disconnect(&self) -> Result<RithmicResponse, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::Logout { response_sender })
        .await;

    let responses = reply.await.map_err(|_| RithmicError::ConnectionClosed)??;

    let _ = self.sender.send(TickerPlantCommand::Close).await;

    let Some(first) = responses.into_iter().next() else {
        return Err(RithmicError::EmptyResponse);
    };
    // Sending fails when there are no receivers; that is deliberately ignored.
    let _ = self.subscription_sender.send(first.clone());
    Ok(first)
}
/// Requests an immediate shutdown of the actor without waiting.
///
/// Uses a non-blocking `try_send`; if the command cannot be queued the
/// failure is ignored.
pub fn abort(&self) {
    let abort_command = TickerPlantCommand::Abort;
    let _ = self.sender.try_send(abort_command);
}
/// Subscribes to `UpdateBits::LastTrade` and `UpdateBits::Bbo` updates for
/// `symbol` on `exchange`.
///
/// Delegates to `request_market_data_update`, like the other
/// `subscribe_*` methods, instead of duplicating its channel plumbing.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * `RithmicError::EmptyResponse` if the reply contains no responses.
/// * Any error the actor reports for the request.
pub async fn subscribe(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    self.request_market_data_update(
        symbol,
        exchange,
        vec![UpdateBits::LastTrade, UpdateBits::Bbo],
        Request::Subscribe,
    )
    .await
}
/// Subscribes to depth-by-order updates for `symbol` on `exchange` and
/// returns the first response.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * `RithmicError::EmptyResponse` if the reply contains no responses.
/// * Any error the actor reports for the request.
pub async fn subscribe_order_book(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::SubscribeOrderBook {
            symbol: symbol.to_owned(),
            exchange: exchange.to_owned(),
            request_type: request_depth_by_order_updates::Request::Subscribe,
            response_sender,
        })
        .await;

    let responses = reply.await.map_err(|_| RithmicError::ConnectionClosed)??;
    responses
        .into_iter()
        .next()
        .ok_or(RithmicError::EmptyResponse)
}
/// Unsubscribes from `UpdateBits::LastTrade` and `UpdateBits::Bbo` updates
/// for `symbol` on `exchange`.
///
/// Delegates to `request_market_data_update`, like the other
/// `unsubscribe_*` methods, instead of duplicating its channel plumbing.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * `RithmicError::EmptyResponse` if the reply contains no responses.
/// * Any error the actor reports for the request.
pub async fn unsubscribe(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    self.request_market_data_update(
        symbol,
        exchange,
        vec![UpdateBits::LastTrade, UpdateBits::Bbo],
        Request::Unsubscribe,
    )
    .await
}
/// Unsubscribes from depth-by-order updates for `symbol` on `exchange` and
/// returns the first response.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * `RithmicError::EmptyResponse` if the reply contains no responses.
/// * Any error the actor reports for the request.
pub async fn unsubscribe_order_book(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::SubscribeOrderBook {
            symbol: symbol.to_owned(),
            exchange: exchange.to_owned(),
            request_type: request_depth_by_order_updates::Request::Unsubscribe,
            response_sender,
        })
        .await;

    let responses = reply.await.map_err(|_| RithmicError::ConnectionClosed)??;
    responses
        .into_iter()
        .next()
        .ok_or(RithmicError::EmptyResponse)
}
/// Shared implementation for the market-data subscribe/unsubscribe
/// methods: sends a `Subscribe` command carrying `fields` and
/// `request_type`, then returns the first response of the reply.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * `RithmicError::EmptyResponse` if the reply contains no responses.
/// * Any error the actor reports for the request.
async fn request_market_data_update(
    &self,
    symbol: &str,
    exchange: &str,
    fields: Vec<UpdateBits>,
    request_type: Request,
) -> Result<RithmicResponse, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::Subscribe {
            symbol: symbol.to_owned(),
            exchange: exchange.to_owned(),
            fields,
            request_type,
            response_sender,
        })
        .await;

    let responses = reply.await.map_err(|_| RithmicError::ConnectionClosed)??;
    responses
        .into_iter()
        .next()
        .ok_or(RithmicError::EmptyResponse)
}
/// Subscribes to `UpdateBits::MarketMode` updates for `symbol` on
/// `exchange`.
pub async fn subscribe_instrument_status(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::MarketMode];
    self.request_market_data_update(symbol, exchange, fields, Request::Subscribe)
        .await
}
/// Unsubscribes from `UpdateBits::MarketMode` updates for `symbol` on
/// `exchange`.
pub async fn unsubscribe_instrument_status(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::MarketMode];
    self.request_market_data_update(symbol, exchange, fields, Request::Unsubscribe)
        .await
}
/// Subscribes to `UpdateBits::OrderBook` updates for `symbol` on
/// `exchange`.
pub async fn subscribe_order_book_summary(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::OrderBook];
    self.request_market_data_update(symbol, exchange, fields, Request::Subscribe)
        .await
}
/// Unsubscribes from `UpdateBits::OrderBook` updates for `symbol` on
/// `exchange`.
pub async fn unsubscribe_order_book_summary(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::OrderBook];
    self.request_market_data_update(symbol, exchange, fields, Request::Unsubscribe)
        .await
}
/// Subscribes to `UpdateBits::Open` and `UpdateBits::HighLow` updates for
/// `symbol` on `exchange`.
pub async fn subscribe_session_prices(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::Open, UpdateBits::HighLow];
    self.request_market_data_update(symbol, exchange, fields, Request::Subscribe)
        .await
}
/// Unsubscribes from `UpdateBits::Open` and `UpdateBits::HighLow` updates
/// for `symbol` on `exchange`.
pub async fn unsubscribe_session_prices(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::Open, UpdateBits::HighLow];
    self.request_market_data_update(symbol, exchange, fields, Request::Unsubscribe)
        .await
}
/// Subscribes to `UpdateBits::HighBidLowAsk` updates for `symbol` on
/// `exchange`.
pub async fn subscribe_quote_statistics(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::HighBidLowAsk];
    self.request_market_data_update(symbol, exchange, fields, Request::Subscribe)
        .await
}
/// Unsubscribes from `UpdateBits::HighBidLowAsk` updates for `symbol` on
/// `exchange`.
pub async fn unsubscribe_quote_statistics(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::HighBidLowAsk];
    self.request_market_data_update(symbol, exchange, fields, Request::Unsubscribe)
        .await
}
/// Subscribes to `UpdateBits::OpeningIndicator` and
/// `UpdateBits::ClosingIndicator` updates for `symbol` on `exchange`.
pub async fn subscribe_indicator_prices(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::OpeningIndicator, UpdateBits::ClosingIndicator];
    self.request_market_data_update(symbol, exchange, fields, Request::Subscribe)
        .await
}
/// Unsubscribes from `UpdateBits::OpeningIndicator` and
/// `UpdateBits::ClosingIndicator` updates for `symbol` on `exchange`.
pub async fn unsubscribe_indicator_prices(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::OpeningIndicator, UpdateBits::ClosingIndicator];
    self.request_market_data_update(symbol, exchange, fields, Request::Unsubscribe)
        .await
}
/// Subscribes to `UpdateBits::OpenInterest` updates for `symbol` on
/// `exchange`.
pub async fn subscribe_open_interest(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::OpenInterest];
    self.request_market_data_update(symbol, exchange, fields, Request::Subscribe)
        .await
}
/// Unsubscribes from `UpdateBits::OpenInterest` updates for `symbol` on
/// `exchange`.
pub async fn unsubscribe_open_interest(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::OpenInterest];
    self.request_market_data_update(symbol, exchange, fields, Request::Unsubscribe)
        .await
}
/// Subscribes to `UpdateBits::Close`, `Settlement`, `ProjectedSettlement`
/// and `AdjustedClose` updates for `symbol` on `exchange`.
pub async fn subscribe_end_of_day_prices(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![
        UpdateBits::Close,
        UpdateBits::Settlement,
        UpdateBits::ProjectedSettlement,
        UpdateBits::AdjustedClose,
    ];
    self.request_market_data_update(symbol, exchange, fields, Request::Subscribe)
        .await
}
/// Unsubscribes from `UpdateBits::Close`, `Settlement`,
/// `ProjectedSettlement` and `AdjustedClose` updates for `symbol` on
/// `exchange`.
pub async fn unsubscribe_end_of_day_prices(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![
        UpdateBits::Close,
        UpdateBits::Settlement,
        UpdateBits::ProjectedSettlement,
        UpdateBits::AdjustedClose,
    ];
    self.request_market_data_update(symbol, exchange, fields, Request::Unsubscribe)
        .await
}
/// Subscribes to `UpdateBits::HighPriceLimit` and
/// `UpdateBits::LowPriceLimit` updates for `symbol` on `exchange`.
pub async fn subscribe_order_price_limits(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::HighPriceLimit, UpdateBits::LowPriceLimit];
    self.request_market_data_update(symbol, exchange, fields, Request::Subscribe)
        .await
}
/// Unsubscribes from `UpdateBits::HighPriceLimit` and
/// `UpdateBits::LowPriceLimit` updates for `symbol` on `exchange`.
pub async fn unsubscribe_order_price_limits(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::HighPriceLimit, UpdateBits::LowPriceLimit];
    self.request_market_data_update(symbol, exchange, fields, Request::Unsubscribe)
        .await
}
/// Subscribes to `UpdateBits::MarginRate` updates for `symbol` on
/// `exchange`.
pub async fn subscribe_symbol_margin_rate(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::MarginRate];
    self.request_market_data_update(symbol, exchange, fields, Request::Subscribe)
        .await
}
/// Unsubscribes from `UpdateBits::MarginRate` updates for `symbol` on
/// `exchange`.
pub async fn unsubscribe_symbol_margin_rate(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let fields = vec![UpdateBits::MarginRate];
    self.request_market_data_update(symbol, exchange, fields, Request::Unsubscribe)
        .await
}
/// Requests a depth-by-order snapshot for `symbol` on `exchange` and
/// returns every response of the reply.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * Any error the actor reports for the request.
pub async fn request_depth_by_order_snapshot(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<Vec<RithmicResponse>, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::RequestDepthByOrderSnapshot {
            symbol: symbol.to_owned(),
            exchange: exchange.to_owned(),
            response_sender,
        })
        .await;

    match reply.await {
        Ok(outcome) => outcome,
        Err(_) => Err(RithmicError::ConnectionClosed),
    }
}
/// Asks the actor to change its heartbeat interval to `seconds`.
/// A failed send is ignored.
async fn update_heartbeat(&self, seconds: u64) {
    let _ = self
        .sender
        .send(TickerPlantCommand::UpdateHeartbeat { seconds })
        .await;
}
/// Sends a symbol search and returns every response of the reply.
///
/// `exchange`, `product_code`, `instrument_type` and `pattern` are optional
/// filters forwarded unchanged to the actor.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * Any error the actor reports for the request.
pub async fn search_symbols(
    &self,
    search_text: &str,
    exchange: Option<&str>,
    product_code: Option<&str>,
    instrument_type: Option<request_search_symbols::InstrumentType>,
    pattern: Option<request_search_symbols::Pattern>,
) -> Result<Vec<RithmicResponse>, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::SearchSymbols {
            search_text: search_text.to_owned(),
            exchange: exchange.map(str::to_owned),
            product_code: product_code.map(str::to_owned),
            instrument_type,
            pattern,
            response_sender,
        })
        .await;

    match reply.await {
        Ok(outcome) => outcome,
        Err(_) => Err(RithmicError::ConnectionClosed),
    }
}
/// Sends a list-exchanges request for `user` and returns every response of
/// the reply.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * Any error the actor reports for the request.
pub async fn list_exchanges(&self, user: &str) -> Result<Vec<RithmicResponse>, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::ListExchanges {
            user: user.to_owned(),
            response_sender,
        })
        .await;

    match reply.await {
        Ok(outcome) => outcome,
        Err(_) => Err(RithmicError::ConnectionClosed),
    }
}
/// Requests instruments for `underlying_symbol` on `exchange`, optionally
/// narrowed by `expiration_date`, and returns every response of the reply.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * Any error the actor reports for the request.
pub async fn get_instrument_by_underlying(
    &self,
    underlying_symbol: &str,
    exchange: &str,
    expiration_date: Option<&str>,
) -> Result<Vec<RithmicResponse>, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::GetInstrumentByUnderlying {
            underlying_symbol: underlying_symbol.to_owned(),
            exchange: exchange.to_owned(),
            expiration_date: expiration_date.map(str::to_owned),
            response_sender,
        })
        .await;

    match reply.await {
        Ok(outcome) => outcome,
        Err(_) => Err(RithmicError::ConnectionClosed),
    }
}
/// Sends a market-data update request keyed by underlying symbol and
/// returns the first response of the reply.
///
/// `request_type` and `fields` are forwarded unchanged to the actor.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * `RithmicError::EmptyResponse` if the reply contains no responses.
/// * Any error the actor reports for the request.
pub async fn subscribe_by_underlying(
    &self,
    underlying_symbol: &str,
    exchange: &str,
    expiration_date: Option<&str>,
    fields: Vec<request_market_data_update_by_underlying::UpdateBits>,
    request_type: request_market_data_update_by_underlying::Request,
) -> Result<RithmicResponse, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::SubscribeByUnderlying {
            underlying_symbol: underlying_symbol.to_owned(),
            exchange: exchange.to_owned(),
            expiration_date: expiration_date.map(str::to_owned),
            fields,
            request_type,
            response_sender,
        })
        .await;

    let responses = reply.await.map_err(|_| RithmicError::ConnectionClosed)??;
    responses
        .into_iter()
        .next()
        .ok_or(RithmicError::EmptyResponse)
}
/// Requests the tick size table for `tick_size_type` and returns every
/// response of the reply.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * Any error the actor reports for the request.
pub async fn get_tick_size_type_table(
    &self,
    tick_size_type: &str,
) -> Result<Vec<RithmicResponse>, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::GetTickSizeTypeTable {
            tick_size_type: tick_size_type.to_owned(),
            response_sender,
        })
        .await;

    match reply.await {
        Ok(outcome) => outcome,
        Err(_) => Err(RithmicError::ConnectionClosed),
    }
}
/// Requests product codes, optionally filtered by `exchange` and
/// `give_toi_products_only`, and returns every response of the reply.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * Any error the actor reports for the request.
pub async fn get_product_codes(
    &self,
    exchange: Option<&str>,
    give_toi_products_only: Option<bool>,
) -> Result<Vec<RithmicResponse>, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::GetProductCodes {
            exchange: exchange.map(str::to_owned),
            give_toi_products_only,
            response_sender,
        })
        .await;

    match reply.await {
        Ok(outcome) => outcome,
        Err(_) => Err(RithmicError::ConnectionClosed),
    }
}
/// Requests volume-at-price data for `symbol` on `exchange` and returns
/// every response of the reply.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * Any error the actor reports for the request.
pub async fn get_volume_at_price(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<Vec<RithmicResponse>, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::GetVolumeAtPrice {
            symbol: symbol.to_owned(),
            exchange: exchange.to_owned(),
            response_sender,
        })
        .await;

    match reply.await {
        Ok(outcome) => outcome,
        Err(_) => Err(RithmicError::ConnectionClosed),
    }
}
/// Requests auxiliary reference data for `symbol` on `exchange` and
/// returns the first response of the reply.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * `RithmicError::EmptyResponse` if the reply contains no responses.
/// * Any error the actor reports for the request.
pub async fn get_auxilliary_reference_data(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::GetAuxilliaryReferenceData {
            symbol: symbol.to_owned(),
            exchange: exchange.to_owned(),
            response_sender,
        })
        .await;

    let responses = reply.await.map_err(|_| RithmicError::ConnectionClosed)??;
    responses
        .into_iter()
        .next()
        .ok_or(RithmicError::EmptyResponse)
}
/// Requests reference data for `symbol` on `exchange` and returns the
/// first response of the reply.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * `RithmicError::EmptyResponse` if the reply contains no responses.
/// * Any error the actor reports for the request.
pub async fn get_reference_data(
    &self,
    symbol: &str,
    exchange: &str,
) -> Result<RithmicResponse, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::GetReferenceData {
            symbol: symbol.to_owned(),
            exchange: exchange.to_owned(),
            response_sender,
        })
        .await;

    let responses = reply.await.map_err(|_| RithmicError::ConnectionClosed)??;
    responses
        .into_iter()
        .next()
        .ok_or(RithmicError::EmptyResponse)
}
/// Requests the front month contract for `symbol` on `exchange` and
/// returns the first response of the reply. `need_updates` is forwarded
/// unchanged to the actor.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * `RithmicError::EmptyResponse` if the reply contains no responses.
/// * Any error the actor reports for the request.
pub async fn get_front_month_contract(
    &self,
    symbol: &str,
    exchange: &str,
    need_updates: bool,
) -> Result<RithmicResponse, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::GetFrontMonthContract {
            symbol: symbol.to_owned(),
            exchange: exchange.to_owned(),
            need_updates,
            response_sender,
        })
        .await;

    let responses = reply.await.map_err(|_| RithmicError::ConnectionClosed)??;
    responses
        .into_iter()
        .next()
        .ok_or(RithmicError::EmptyResponse)
}
/// Requests system gateway info, optionally for a specific `system_name`,
/// and returns the first response of the reply.
///
/// # Errors
/// * `RithmicError::ConnectionClosed` if the reply channel is dropped.
/// * `RithmicError::EmptyResponse` if the reply contains no responses.
/// * Any error the actor reports for the request.
pub async fn get_system_gateway_info(
    &self,
    system_name: Option<&str>,
) -> Result<RithmicResponse, RithmicError> {
    let (response_sender, reply) = oneshot::channel();
    let _ = self
        .sender
        .send(TickerPlantCommand::GetSystemGatewayInfo {
            system_name: system_name.map(str::to_owned),
            response_sender,
        })
        .await;

    let responses = reply.await.map_err(|_| RithmicError::ConnectionClosed)??;
    responses
        .into_iter()
        .next()
        .ok_or(RithmicError::EmptyResponse)
}
}
impl Clone for RithmicTickerPlantHandle {
fn clone(&self) -> Self {
RithmicTickerPlantHandle {
sender: self.sender.clone(),
subscription_receiver: self.subscription_sender.subscribe(),
subscription_sender: self.subscription_sender.clone(),
}
}
}
#[cfg(test)]
mod tests {
    use super::{TickerPlantCommand, *};
    use crate::{
        error::RithmicError,
        rti::request_market_data_update::{Request, UpdateBits},
    };

    /// Builds the `Subscribe` command used as the responder-bearing fixture.
    fn sample_subscribe(
        response_sender: oneshot::Sender<Result<Vec<RithmicResponse>, RithmicError>>,
    ) -> TickerPlantCommand {
        TickerPlantCommand::Subscribe {
            symbol: "ESH6".to_string(),
            exchange: "CME".to_string(),
            fields: vec![UpdateBits::LastTrade],
            request_type: Request::Subscribe,
            response_sender,
        }
    }

    /// A command with a responder must yield that responder.
    #[test]
    fn responder_bearing_variants_surface_sender() {
        let (responder, _receiver) = oneshot::channel();
        let outcome = sample_subscribe(responder).into_response_sender_or_command();
        assert!(outcome.is_ok());
    }

    /// Commands without a responder must come back unchanged.
    #[test]
    fn fire_and_forget_variants_are_preserved() {
        let close = TickerPlantCommand::Close.into_response_sender_or_command();
        assert!(matches!(close, Err(TickerPlantCommand::Close)));

        let abort = TickerPlantCommand::Abort.into_response_sender_or_command();
        assert!(matches!(abort, Err(TickerPlantCommand::Abort)));

        let set_login = TickerPlantCommand::SetLogin.into_response_sender_or_command();
        assert!(matches!(set_login, Err(TickerPlantCommand::SetLogin)));
    }

    /// The extracted responder delivers `ConnectionClosed` to the waiting side.
    #[tokio::test]
    async fn responder_drained_with_connection_closed() {
        let (responder, receiver) =
            oneshot::channel::<Result<Vec<RithmicResponse>, RithmicError>>();

        let Ok(sender) = sample_subscribe(responder).into_response_sender_or_command() else {
            panic!("Subscribe must carry a responder");
        };
        let _ = sender.send(Err(RithmicError::ConnectionClosed));

        let delivered = receiver.await.unwrap();
        assert!(matches!(delivered, Err(RithmicError::ConnectionClosed)));
    }
}