use std::marker::PhantomData;
use std::sync::Arc;
use async_trait::async_trait;
use crate::client::r#async::Client;
use crate::errors::Error;
use crate::messages::OutgoingMessages;
use crate::subscriptions::{DecoderContext, StreamDecoder, Subscription};
use crate::transport::{AsyncInternalSubscription, AsyncMessageBus};
#[allow(dead_code)]
pub(crate) struct RequestBuilder<'a> {
client: &'a Client,
request_id: i32,
}
#[allow(dead_code)]
impl<'a> RequestBuilder<'a> {
pub fn new(client: &'a Client) -> Self {
Self {
client,
request_id: client.next_request_id(),
}
}
pub fn with_id(client: &'a Client, request_id: i32) -> Self {
Self { client, request_id }
}
pub fn request_id(&self) -> i32 {
self.request_id
}
pub async fn check_version(self, required_version: i32, feature: &str) -> Result<Self, Error> {
self.client.check_server_version(required_version, feature)?;
Ok(self)
}
pub async fn send<T>(self, message: Vec<u8>) -> Result<Subscription<T>, Error>
where
T: StreamDecoder<T> + Send + 'static,
{
let context = self.client.decoder_context();
let message_bus = self.client.message_bus.clone();
SubscriptionBuilder::<T>::new_with_components(context, message_bus)
.send_with_request_id::<T>(self.request_id, message)
.await
}
pub async fn send_with_context<T>(self, message: Vec<u8>, context: DecoderContext) -> Result<Subscription<T>, Error>
where
T: StreamDecoder<T> + Send + 'static,
{
let message_bus = self.client.message_bus.clone();
SubscriptionBuilder::<T>::new_with_components(context, message_bus)
.send_with_request_id::<T>(self.request_id, message)
.await
}
pub async fn send_raw(self, message: Vec<u8>) -> Result<AsyncInternalSubscription, Error> {
self.client.send_request(self.request_id, message).await
}
}
#[allow(dead_code)]
pub(crate) struct SharedRequestBuilder<'a> {
client: &'a Client,
message_type: OutgoingMessages,
}
#[allow(dead_code)]
impl<'a> SharedRequestBuilder<'a> {
pub fn new(client: &'a Client, message_type: OutgoingMessages) -> Self {
Self { client, message_type }
}
pub async fn check_version(self, required_version: i32, feature: &str) -> Result<Self, Error> {
self.client.check_server_version(required_version, feature)?;
Ok(self)
}
pub async fn send<T>(self, message: Vec<u8>) -> Result<Subscription<T>, Error>
where
T: StreamDecoder<T> + Send + 'static,
{
let context = self.client.decoder_context();
let message_bus = self.client.message_bus.clone();
SubscriptionBuilder::<T>::new_with_components(context, message_bus)
.send_shared::<T>(self.message_type, message)
.await
}
pub async fn send_with_context<T>(self, message: Vec<u8>, context: DecoderContext) -> Result<Subscription<T>, Error>
where
T: StreamDecoder<T> + Send + 'static,
{
let message_bus = self.client.message_bus.clone();
SubscriptionBuilder::<T>::new_with_components(context, message_bus)
.send_shared::<T>(self.message_type, message)
.await
}
pub async fn send_raw(self, message: Vec<u8>) -> Result<AsyncInternalSubscription, Error> {
self.client.send_shared_request(self.message_type, message).await
}
}
#[allow(dead_code)]
pub(crate) struct OrderRequestBuilder<'a> {
client: &'a Client,
order_id: i32,
}
#[allow(dead_code)]
impl<'a> OrderRequestBuilder<'a> {
pub fn new(client: &'a Client) -> Self {
Self {
client,
order_id: client.next_order_id(),
}
}
pub fn with_id(client: &'a Client, order_id: i32) -> Self {
Self { client, order_id }
}
pub fn order_id(&self) -> i32 {
self.order_id
}
pub async fn check_version(self, required_version: i32, feature: &str) -> Result<Self, Error> {
self.client.check_server_version(required_version, feature)?;
Ok(self)
}
pub async fn send(self, message: Vec<u8>) -> Result<AsyncInternalSubscription, Error> {
self.client.send_order(self.order_id, message).await
}
}
#[allow(dead_code)]
pub(crate) struct MessageBuilder<'a> {
client: &'a Client,
}
#[allow(dead_code)]
impl<'a> MessageBuilder<'a> {
pub fn new(client: &'a Client) -> Self {
Self { client }
}
pub async fn check_version(self, required_version: i32, feature: &str) -> Result<Self, Error> {
self.client.check_server_version(required_version, feature)?;
Ok(self)
}
pub async fn send(self, message: Vec<u8>) -> Result<(), Error> {
self.client.send_message(message).await
}
}
#[allow(dead_code)]
pub(crate) struct SubscriptionBuilder<T> {
message_bus: Arc<dyn AsyncMessageBus>,
context: DecoderContext,
_phantom: PhantomData<T>,
}
#[allow(dead_code)]
impl<T> SubscriptionBuilder<T>
where
T: Send + 'static,
{
pub fn new_with_components(context: DecoderContext, message_bus: Arc<dyn AsyncMessageBus>) -> Self {
Self {
message_bus,
context,
_phantom: PhantomData,
}
}
pub fn with_context(mut self, context: DecoderContext) -> Self {
self.context = context;
self
}
pub fn with_smart_depth(mut self, is_smart_depth: bool) -> Self {
self.context.is_smart_depth = is_smart_depth;
self
}
pub async fn send_with_request_id<D>(self, request_id: i32, message: Vec<u8>) -> Result<Subscription<T>, Error>
where
D: StreamDecoder<T> + 'static,
{
let subscription = self.message_bus.send_request(request_id, message).await?;
Ok(Subscription::new_from_internal::<D>(
subscription,
self.message_bus.clone(),
Some(request_id),
None,
self.context,
))
}
pub async fn send_shared<D>(self, message_type: OutgoingMessages, message: Vec<u8>) -> Result<Subscription<T>, Error>
where
D: StreamDecoder<T> + 'static,
{
let subscription = self.message_bus.send_shared_request(message_type, message).await?;
Ok(Subscription::new_from_internal::<D>(
subscription,
self.message_bus.clone(),
None,
None,
self.context,
))
}
pub async fn send_order<D>(self, order_id: i32, message: Vec<u8>) -> Result<Subscription<T>, Error>
where
D: StreamDecoder<T> + 'static,
{
let subscription = self.message_bus.send_order_request(order_id, message).await?;
Ok(Subscription::new_from_internal::<D>(
subscription,
self.message_bus.clone(),
None,
Some(order_id),
self.context,
))
}
}
#[async_trait]
#[allow(dead_code)]
pub(crate) trait ClientRequestBuilders {
fn request(&self) -> RequestBuilder<'_>;
fn request_with_id(&self, request_id: i32) -> RequestBuilder<'_>;
fn shared_request(&self, message_type: OutgoingMessages) -> SharedRequestBuilder<'_>;
fn order_request(&self) -> OrderRequestBuilder<'_>;
fn order_request_with_id(&self, order_id: i32) -> OrderRequestBuilder<'_>;
fn message(&self) -> MessageBuilder<'_>;
}
#[allow(dead_code)]
impl ClientRequestBuilders for Client {
fn request(&self) -> RequestBuilder<'_> {
RequestBuilder::new(self)
}
fn request_with_id(&self, request_id: i32) -> RequestBuilder<'_> {
RequestBuilder::with_id(self, request_id)
}
fn shared_request(&self, message_type: OutgoingMessages) -> SharedRequestBuilder<'_> {
SharedRequestBuilder::new(self, message_type)
}
fn order_request(&self) -> OrderRequestBuilder<'_> {
OrderRequestBuilder::new(self)
}
fn order_request_with_id(&self, order_id: i32) -> OrderRequestBuilder<'_> {
OrderRequestBuilder::with_id(self, order_id)
}
fn message(&self) -> MessageBuilder<'_> {
MessageBuilder::new(self)
}
}
pub(crate) trait SubscriptionBuilderExt {
fn subscription<T>(&self) -> SubscriptionBuilder<T>
where
T: Send + 'static;
}
impl SubscriptionBuilderExt for Client {
fn subscription<T>(&self) -> SubscriptionBuilder<T>
where
T: Send + 'static,
{
let context = self.decoder_context();
let message_bus = self.message_bus.clone();
SubscriptionBuilder::new_with_components(context, message_bus)
}
}
#[cfg(test)]
#[path = "async_tests.rs"]
mod tests;