pub(crate) mod common;
#[cfg(feature = "sync")]
use std::sync::Arc;
#[cfg(feature = "sync")]
use std::time::Duration;
#[cfg(feature = "sync")]
use crossbeam::channel::{Receiver, Sender};
use crate::errors::Error;
use crate::messages::{RequestMessage, ResponseMessage};
#[cfg(feature = "sync")]
use crate::messages::OutgoingMessages;
#[cfg(feature = "sync")]
pub mod sync;
#[cfg(feature = "async")]
pub mod r#async;
/// Item type carried by the subscription channels in this module:
/// either a `ResponseMessage` or the `Error` delivered in its place.
// NOTE(review): the lint allowance is presumably needed because the alias is only
// referenced by feature-gated code (e.g. `sync`) — confirm before removing it.
#[allow(dead_code)]
pub(crate) type Response = Result<ResponseMessage, Error>;
/// Transport interface available when the `sync` feature is enabled.
///
/// Implementors must be `Send + Sync`. Every method takes `&self`, so any
/// mutable state has to be handled through interior mutability.
///
/// NOTE(review): this trait only fixes signatures; the per-method notes below
/// describe intent as suggested by names and types and should be confirmed
/// against the concrete implementation.
#[cfg(feature = "sync")]
pub(crate) trait MessageBus: Send + Sync {
    /// Submits `packet` under `request_id`, yielding an `InternalSubscription` on success.
    fn send_request(&self, request_id: i32, packet: &RequestMessage) -> Result<InternalSubscription, Error>;

    /// Cancels the subscription associated with `request_id`; `packet` is the accompanying message.
    fn cancel_subscription(&self, request_id: i32, packet: &RequestMessage) -> Result<(), Error>;

    /// Like `send_request`, but keyed by an `OutgoingMessages` kind instead of a request id.
    fn send_shared_request(&self, message_id: OutgoingMessages, packet: &RequestMessage) -> Result<InternalSubscription, Error>;

    /// Like `cancel_subscription`, but keyed by an `OutgoingMessages` kind instead of a request id.
    fn cancel_shared_subscription(&self, message_id: OutgoingMessages, packet: &RequestMessage) -> Result<(), Error>;

    /// Submits an order-related `packet` under `request_id`, yielding an `InternalSubscription` on success.
    fn send_order_request(&self, request_id: i32, packet: &RequestMessage) -> Result<InternalSubscription, Error>;

    /// Submits `packet` without handing back a subscription.
    fn send_message(&self, packet: &RequestMessage) -> Result<(), Error>;

    /// Produces an `InternalSubscription` for order updates; takes no id.
    fn create_order_update_subscription(&self) -> Result<InternalSubscription, Error>;

    /// Cancels the order subscription associated with `request_id`; `packet` is the accompanying message.
    fn cancel_order_subscription(&self, request_id: i32, packet: &RequestMessage) -> Result<(), Error>;

    /// Requests shutdown of the bus. Returns nothing, so failures are not reported to the caller.
    fn ensure_shutdown(&self);

    /// Reports whether the bus currently considers itself connected.
    fn is_connected(&self) -> bool;

    /// Test-only hook exposing request messages; the default implementation returns an empty list.
    #[cfg(test)]
    fn request_messages(&self) -> Vec<RequestMessage> {
        Vec::new()
    }
}
/// Receiving handle for the `Response`s of one subscription.
///
/// Reads come from either a dedicated channel or a shared one, and dropping
/// the handle may emit a `Signal` through `signaler`.
#[cfg(feature = "sync")]
#[derive(Debug, Default)]
pub(crate) struct InternalSubscription {
    /// Dedicated channel; `next`, `try_next` and `next_timeout` read from it when it is set.
    receiver: Option<Receiver<Response>>,
    /// Sender that `cancel` uses to push `Err(Error::Cancelled)`.
    /// NOTE(review): presumably the sending half of `receiver`'s channel — verify at the construction site.
    sender: Option<Sender<Response>>,
    /// Shared channel, read only when `receiver` is `None`.
    shared_receiver: Option<Arc<Receiver<Response>>>,
    /// Channel on which the `Drop` implementation announces that this handle went away.
    signaler: Option<Sender<Signal>>,
    /// Request id; when set, the drop notification is `Signal::Request`.
    pub(crate) request_id: Option<i32>,
    /// Order id; when set and `request_id` is not, the drop notification is `Signal::Order`.
    pub(crate) order_id: Option<i32>,
    /// Outgoing message kind recorded by the builder; not read by the methods defined in this file.
    pub(crate) message_type: Option<OutgoingMessages>,
}
#[cfg(feature = "sync")]
impl InternalSubscription {
    /// Picks the channel to read from: the dedicated receiver when present,
    /// otherwise the shared receiver, otherwise nothing.
    fn active_receiver(&self) -> Option<&Receiver<Response>> {
        self.receiver.as_ref().or(self.shared_receiver.as_deref())
    }

    /// Waits for the next response.
    ///
    /// Returns `None` when no receiver is configured or when the underlying
    /// `recv` call reports an error.
    pub(crate) fn next(&self) -> Option<Response> {
        self.active_receiver().and_then(Self::receive)
    }

    /// Returns a response only if `try_recv` succeeds on the selected channel.
    ///
    /// Returns `None` when no receiver is configured or when `try_recv`
    /// reports an error.
    pub(crate) fn try_next(&self) -> Option<Response> {
        self.active_receiver().and_then(Self::try_receive)
    }

    /// Waits for the next response using `recv_timeout` with the given `timeout`.
    ///
    /// Returns `None` when no receiver is configured or when `recv_timeout`
    /// reports an error.
    pub(crate) fn next_timeout(&self, timeout: Duration) -> Option<Response> {
        self.active_receiver()
            .and_then(|channel| Self::timeout_receive(channel, timeout))
    }

    /// Pushes `Err(Error::Cancelled)` through `sender`, if one is configured.
    ///
    /// A failed send is logged as a warning and otherwise ignored.
    pub(crate) fn cancel(&self) {
        let outcome = self.sender.as_ref().map(|tx| tx.send(Err(Error::Cancelled)));
        if let Some(Err(e)) = outcome {
            log::warn!("error sending cancel notification: {e}");
        }
    }

    /// `recv` on `receiver`, mapping any error to `None`.
    fn receive(receiver: &Receiver<Response>) -> Option<Response> {
        let received = receiver.recv();
        received.ok()
    }

    /// `try_recv` on `receiver`, mapping any error to `None`.
    fn try_receive(receiver: &Receiver<Response>) -> Option<Response> {
        let received = receiver.try_recv();
        received.ok()
    }

    /// `recv_timeout` on `receiver`, mapping any error to `None`.
    fn timeout_receive(receiver: &Receiver<Response>, timeout: Duration) -> Option<Response> {
        let received = receiver.recv_timeout(timeout);
        received.ok()
    }
}
#[cfg(feature = "sync")]
impl Drop for InternalSubscription {
    /// Announces the drop on `signaler`, if one is configured.
    ///
    /// The signal is chosen by precedence: `Signal::Request` when a request id
    /// is set, else `Signal::Order` when an order id is set, else
    /// `Signal::OrderUpdateStream`. A failed send is only logged.
    fn drop(&mut self) {
        // Without a signaler there is nowhere to send a notification.
        let signaler = match &self.signaler {
            Some(signaler) => signaler,
            None => return,
        };

        let signal = match (self.request_id, self.order_id) {
            (Some(request_id), _) => Signal::Request(request_id),
            (None, Some(order_id)) => Signal::Order(order_id),
            (None, None) => Signal::OrderUpdateStream,
        };

        if let Err(e) = signaler.send(signal) {
            log::warn!("error sending drop signal: {e}");
        }
    }
}
#[cfg(feature = "sync")]
/// Notification sent over the signaler channel when an `InternalSubscription` is dropped.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so signals can be logged and
/// compared in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Signal {
    /// The dropped subscription carried this request id.
    Request(i32),
    /// The dropped subscription carried no request id but did carry this order id.
    Order(i32),
    /// The dropped subscription carried neither a request id nor an order id.
    OrderUpdateStream,
}
#[cfg(feature = "sync")]
/// Collects the optional parts of an `InternalSubscription`.
///
/// Every field starts as `None` (see `new`) and is filled in by the setter of
/// the same name; `build` decides which combination is valid.
pub(crate) struct SubscriptionBuilder {
/// Dedicated response channel; together with `signaler` it selects the first `build` configuration.
receiver: Option<Receiver<Response>>,
/// Response sender; forwarded by `build` only in the dedicated-receiver configuration.
sender: Option<Sender<Response>>,
/// Shared response channel; used by `build` when `receiver` and `signaler` are not both set.
shared_receiver: Option<Arc<Receiver<Response>>>,
/// Channel for `Signal`s; required alongside `receiver` for the dedicated-receiver configuration.
signaler: Option<Sender<Signal>>,
/// Order id copied into the built subscription.
order_id: Option<i32>,
/// Request id copied into the built subscription.
request_id: Option<i32>,
/// Outgoing message kind copied into the built subscription.
message_type: Option<OutgoingMessages>,
}
#[cfg(feature = "sync")]
impl SubscriptionBuilder {
    /// Creates a builder with every part unset.
    pub(crate) fn new() -> Self {
        Self {
            receiver: None,
            sender: None,
            shared_receiver: None,
            signaler: None,
            order_id: None,
            request_id: None,
            message_type: None,
        }
    }

    /// Sets the dedicated response receiver.
    pub(crate) fn receiver(self, receiver: Receiver<Response>) -> Self {
        Self {
            receiver: Some(receiver),
            ..self
        }
    }

    /// Sets the response sender.
    pub(crate) fn sender(self, sender: Sender<Response>) -> Self {
        Self {
            sender: Some(sender),
            ..self
        }
    }

    /// Sets the shared response receiver.
    pub(crate) fn shared_receiver(self, receiver: Arc<Receiver<Response>>) -> Self {
        Self {
            shared_receiver: Some(receiver),
            ..self
        }
    }

    /// Sets the channel used to emit `Signal`s.
    pub(crate) fn signaler(self, signaler: Sender<Signal>) -> Self {
        Self {
            signaler: Some(signaler),
            ..self
        }
    }

    /// Sets the order id.
    pub(crate) fn order_id(self, order_id: i32) -> Self {
        Self {
            order_id: Some(order_id),
            ..self
        }
    }

    /// Sets the request id.
    pub(crate) fn request_id(self, request_id: i32) -> Self {
        Self {
            request_id: Some(request_id),
            ..self
        }
    }

    /// Sets the outgoing message kind.
    pub(crate) fn message_type(self, message_type: OutgoingMessages) -> Self {
        Self {
            message_type: Some(message_type),
            ..self
        }
    }

    /// Assembles the `InternalSubscription`.
    ///
    /// Two configurations are accepted, checked in this order:
    /// 1. `receiver` and `signaler` both set: dedicated subscription; `sender`
    ///    is forwarded and any `shared_receiver` is discarded.
    /// 2. `shared_receiver` set: shared subscription; `receiver`, `sender` and
    ///    `signaler` are discarded.
    ///
    /// The ids and message type are copied across in both cases.
    ///
    /// # Panics
    ///
    /// Panics with `"bad configuration"` when neither configuration applies.
    pub(crate) fn build(self) -> InternalSubscription {
        let Self {
            receiver,
            sender,
            shared_receiver,
            signaler,
            order_id,
            request_id,
            message_type,
        } = self;

        match (receiver, signaler, shared_receiver) {
            (Some(dedicated), Some(signaler), _) => InternalSubscription {
                receiver: Some(dedicated),
                sender,
                shared_receiver: None,
                signaler: Some(signaler),
                request_id,
                order_id,
                message_type,
            },
            (_, _, Some(shared)) => InternalSubscription {
                receiver: None,
                sender: None,
                shared_receiver: Some(shared),
                signaler: None,
                request_id,
                order_id,
                message_type,
            },
            _ => panic!("bad configuration"),
        }
    }
}
#[cfg(feature = "sync")]
pub use sync::TcpMessageBus;
#[cfg(feature = "async")]
pub use r#async::{AsyncInternalSubscription, AsyncMessageBus};
pub mod connection;
pub mod recorder;
pub mod routing;