use std::fmt::Debug;
use std::sync::Arc;
use log::debug;
use time::OffsetDateTime;
use time_tz::Tz;
use crate::client::builders::client_builder::sync_impl::ClientBuilder;
use crate::connection::common::StartupMessage;
use crate::connection::{sync::Connection, ConnectionMetadata};
use crate::contracts::Contract;
use crate::errors::Error;
use crate::market_data::builder::MarketDataBuilder;
use crate::messages::OutgoingMessages;
use crate::orders::OrderBuilder;
use crate::transport::sync::NoticeBroadcaster;
use crate::transport::{InternalSubscription, MessageBus, TcpMessageBus};
use super::id_generator::ClientIdManager;
pub struct Client {
pub(crate) server_version: i32,
pub(crate) connection_time: Option<OffsetDateTime>,
pub(crate) time_zone: Option<&'static Tz>,
pub(crate) message_bus: Arc<dyn MessageBus>,
client_id: i32, id_manager: ClientIdManager, }
impl Client {
pub fn connect(address: &str, client_id: i32) -> Result<Client, Error> {
Self::builder().address(address).client_id(client_id).connect()
}
pub fn builder() -> ClientBuilder {
ClientBuilder::default()
}
pub(crate) fn connect_with_pieces(
address: &str,
client_id: i32,
tcp_no_delay: bool,
startup_callback: Option<Arc<dyn Fn(StartupMessage) + Send + Sync>>,
notice_broadcaster: Arc<NoticeBroadcaster>,
) -> Result<Client, Error> {
let connection = Connection::with_pieces(address, client_id, tcp_no_delay, startup_callback, notice_broadcaster)?;
let connection_metadata = connection.connection_metadata();
let message_bus = Arc::new(TcpMessageBus::new(connection)?);
message_bus.process_messages(connection_metadata.server_version)?;
Client::new(connection_metadata, message_bus)
}
fn new(connection_metadata: ConnectionMetadata, message_bus: Arc<dyn MessageBus>) -> Result<Client, Error> {
let client = Client {
server_version: connection_metadata.server_version,
connection_time: connection_metadata.connection_time,
time_zone: connection_metadata.time_zone,
message_bus,
client_id: connection_metadata.client_id,
id_manager: ClientIdManager::new(connection_metadata.next_order_id),
};
Ok(client)
}
pub fn client_id(&self) -> i32 {
self.client_id
}
pub fn next_request_id(&self) -> i32 {
self.id_manager.next_request_id()
}
pub fn next_order_id(&self) -> i32 {
self.id_manager.next_order_id()
}
pub(crate) fn set_next_order_id(&self, order_id: i32) {
self.id_manager.set_order_id(order_id);
}
pub fn order<'a>(&'a self, contract: &'a Contract) -> OrderBuilder<'a, Self> {
OrderBuilder::new(self, contract)
}
pub fn server_version(&self) -> i32 {
self.server_version
}
pub fn connection_time(&self) -> Option<OffsetDateTime> {
self.connection_time
}
pub fn time_zone(&self) -> Option<&'static Tz> {
self.time_zone
}
pub(crate) fn decoder_context(&self) -> crate::subscriptions::DecoderContext {
crate::subscriptions::DecoderContext::new(self.server_version, self.time_zone)
}
pub fn is_connected(&self) -> bool {
self.message_bus.is_connected()
}
pub fn disconnect(&self) {
self.message_bus.ensure_shutdown();
}
pub fn notice_stream(&self) -> Result<crate::subscriptions::notice_stream::sync_impl::NoticeStream, Error> {
Ok(self.message_bus.notice_subscribe())
}
pub fn market_data<'a>(&'a self, contract: &'a Contract) -> MarketDataBuilder<'a, Self> {
MarketDataBuilder::new(self, contract)
}
#[cfg(test)]
pub(crate) fn stubbed(message_bus: Arc<dyn MessageBus>, server_version: i32) -> Client {
Client {
server_version,
connection_time: None,
time_zone: None,
message_bus,
client_id: 100,
id_manager: ClientIdManager::new(-1),
}
}
pub(crate) fn send_request(&self, request_id: i32, message: Vec<u8>) -> Result<InternalSubscription, Error> {
debug!("send_message({request_id:?})");
self.message_bus.send_request(request_id, &message)
}
pub(crate) fn send_order(&self, order_id: i32, message: Vec<u8>) -> Result<InternalSubscription, Error> {
debug!("send_order({order_id:?})");
self.message_bus.send_order_request(order_id, &message)
}
pub(crate) fn send_message(&self, message: Vec<u8>) -> Result<(), Error> {
debug!("send_message()");
self.message_bus.send_message(&message)
}
pub(crate) fn create_order_update_subscription(&self) -> Result<InternalSubscription, Error> {
self.message_bus.create_order_update_subscription()
}
pub(crate) fn send_shared_request(&self, message_id: OutgoingMessages, message: Vec<u8>) -> Result<InternalSubscription, Error> {
self.message_bus.send_shared_request(message_id, &message)
}
pub(crate) fn check_server_version(&self, version: i32, message: &str) -> Result<(), Error> {
if version <= self.server_version {
Ok(())
} else {
Err(Error::ServerVersion(version, self.server_version, message.into()))
}
}
}
impl Drop for Client {
fn drop(&mut self) {
debug!("dropping basic client");
self.message_bus.ensure_shutdown();
}
}
impl Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Client")
.field("server_version", &self.server_version)
.field("server_time", &self.connection_time)
.field("client_id", &self.client_id)
.finish()
}
}
#[cfg(test)]
#[path = "sync_tests.rs"]
mod tests;