use crate::{
engine::{
Engine,
error::{EngineError, RecoverableEngineError, UnrecoverableEngineError},
execution_tx::ExecutionTxMap,
},
execution::request::ExecutionRequest,
};
use barter_execution::order::{
OrderEvent,
request::{RequestCancel, RequestOpen},
};
use barter_instrument::{exchange::ExchangeIndex, instrument::InstrumentIndex};
use barter_integration::{Unrecoverable, channel::Tx, collection::none_one_or_many::NoneOneOrMany};
use derive_more::Constructor;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use tracing::error;
/// Interface for sending order requests of a given `Kind` for execution.
///
/// A `Kind` can be sent whenever an [`ExecutionRequest`] can be built from an
/// [`OrderEvent`] of that kind (see the `From` bound on each method).
pub trait SendRequests<ExchangeKey = ExchangeIndex, InstrumentKey = InstrumentIndex> {
/// Sends every request yielded by `requests`.
///
/// Returns a [`SendRequestsOutput`] holding the requests that were sent and the requests
/// that failed, each failure paired with its [`EngineError`].
fn send_requests<Kind>(
&self,
requests: impl IntoIterator<Item = OrderEvent<Kind, ExchangeKey, InstrumentKey>>,
) -> SendRequestsOutput<Kind, ExchangeKey, InstrumentKey>
where
Kind: Debug + Clone,
ExecutionRequest<ExchangeKey, InstrumentKey>:
From<OrderEvent<Kind, ExchangeKey, InstrumentKey>>;
/// Sends a single `request`.
///
/// # Errors
/// Returns an [`EngineError`] if the request could not be sent.
fn send_request<Kind>(
&self,
request: &OrderEvent<Kind, ExchangeKey, InstrumentKey>,
) -> Result<(), EngineError>
where
Kind: Debug + Clone,
ExecutionRequest<ExchangeKey, InstrumentKey>:
From<OrderEvent<Kind, ExchangeKey, InstrumentKey>>;
}
/// [`SendRequests`] implementation for the [`Engine`].
///
/// Each request is routed to the execution transmitter that `execution_txs` returns for the
/// request's exchange.
impl<Clock, State, ExecutionTxs, Strategy, Risk, ExchangeKey, InstrumentKey>
    SendRequests<ExchangeKey, InstrumentKey> for Engine<Clock, State, ExecutionTxs, Strategy, Risk>
where
    ExecutionTxs: ExecutionTxMap<ExchangeKey, InstrumentKey>,
    ExchangeKey: Debug + Clone,
    InstrumentKey: Debug + Clone,
{
    /// Attempts to send every request via `send_request`, one at a time.
    ///
    /// A failure does not stop the remaining requests from being attempted. The returned
    /// [`SendRequestsOutput`] contains the successfully sent requests in `sent` and every
    /// failed request, paired with its [`EngineError`], in `errors`.
    fn send_requests<Kind>(
        &self,
        requests: impl IntoIterator<Item = OrderEvent<Kind, ExchangeKey, InstrumentKey>>,
    ) -> SendRequestsOutput<Kind, ExchangeKey, InstrumentKey>
    where
        Kind: Debug + Clone,
        ExecutionRequest<ExchangeKey, InstrumentKey>:
            From<OrderEvent<Kind, ExchangeKey, InstrumentKey>>,
    {
        let (sent, errors): (Vec<_>, Vec<_>) = requests
            .into_iter()
            .map(|request| match self.send_request(&request) {
                Ok(()) => Ok(request),
                // The borrow taken by `send_request` has ended, so the owned request can be
                // moved straight into the error tuple without cloning it.
                Err(error) => Err((request, error)),
            })
            .partition_result();

        SendRequestsOutput::new(NoneOneOrMany::from(sent), NoneOneOrMany::from(errors))
    }

    /// Sends a single `request` to the execution transmitter for `request.key.exchange`.
    ///
    /// The request is cloned and converted into an [`ExecutionRequest`] before being sent.
    ///
    /// # Errors
    /// - Propagates the error returned by `execution_txs.find` when the lookup fails.
    /// - [`UnrecoverableEngineError::ExecutionChannelTerminated`] when the send error reports
    ///   `is_unrecoverable()`.
    /// - [`RecoverableEngineError::ExecutionChannelUnhealthy`] for any other send error.
    ///
    /// Both send failures are also logged with `tracing::error!`.
    fn send_request<Kind>(
        &self,
        request: &OrderEvent<Kind, ExchangeKey, InstrumentKey>,
    ) -> Result<(), EngineError>
    where
        Kind: Debug + Clone,
        ExecutionRequest<ExchangeKey, InstrumentKey>:
            From<OrderEvent<Kind, ExchangeKey, InstrumentKey>>,
    {
        match self
            .execution_txs
            .find(&request.key.exchange)?
            .send(ExecutionRequest::from(request.clone()))
        {
            Ok(()) => Ok(()),
            Err(error) if error.is_unrecoverable() => {
                error!(
                    exchange = ?request.key.exchange,
                    ?request,
                    ?error,
                    "failed to send ExecutionRequest due to terminated channel"
                );
                Err(EngineError::Unrecoverable(
                    UnrecoverableEngineError::ExecutionChannelTerminated(format!(
                        "{:?} execution channel terminated: {:?}",
                        request.key.exchange, error
                    )),
                ))
            }
            Err(error) => {
                error!(
                    exchange = ?request.key.exchange,
                    ?request,
                    ?error,
                    "failed to send ExecutionRequest due to unhealthy channel"
                );
                Err(EngineError::Recoverable(
                    RecoverableEngineError::ExecutionChannelUnhealthy(format!(
                        "{:?} execution channel unhealthy: {:?}",
                        request.key.exchange, error
                    )),
                ))
            }
        }
    }
}
/// Combined output of sending cancel requests and open requests.
#[derive(
Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Constructor,
)]
pub struct SendCancelsAndOpensOutput<ExchangeKey = ExchangeIndex, InstrumentKey = InstrumentIndex> {
/// Output of sending the [`RequestCancel`] requests.
pub cancels: SendRequestsOutput<RequestCancel, ExchangeKey, InstrumentKey>,
/// Output of sending the [`RequestOpen`] requests.
pub opens: SendRequestsOutput<RequestOpen, ExchangeKey, InstrumentKey>,
}
impl<ExchangeKey, InstrumentKey> SendCancelsAndOpensOutput<ExchangeKey, InstrumentKey> {
    /// Returns `true` only when both the `cancels` output and the `opens` output are empty.
    pub fn is_empty(&self) -> bool {
        if !self.cancels.is_empty() {
            return false;
        }
        self.opens.is_empty()
    }

    /// Gathers the unrecoverable errors of the `cancels` output and extends them with the
    /// unrecoverable errors of the `opens` output.
    pub fn unrecoverable_errors(&self) -> NoneOneOrMany<UnrecoverableEngineError> {
        let cancel_errors = self.cancels.unrecoverable_errors();
        let open_errors = self.opens.unrecoverable_errors();
        cancel_errors.extend(open_errors)
    }
}
/// Manual `Default` implementation that places no bounds on the key types.
impl<ExchangeKey, InstrumentKey> Default for SendCancelsAndOpensOutput<ExchangeKey, InstrumentKey> {
    /// Builds an output whose `cancels` and `opens` are both their default value.
    fn default() -> Self {
        let cancels = SendRequestsOutput::default();
        let opens = SendRequestsOutput::default();
        Self { cancels, opens }
    }
}
/// Output of sending a collection of order requests of a given `Kind`.
#[derive(
Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Constructor,
)]
pub struct SendRequestsOutput<Kind, ExchangeKey = ExchangeIndex, InstrumentKey = InstrumentIndex> {
/// Requests that were sent.
pub sent: NoneOneOrMany<OrderEvent<Kind, ExchangeKey, InstrumentKey>>,
/// Requests that failed to send, each paired with the [`EngineError`] that occurred.
pub errors: NoneOneOrMany<(OrderEvent<Kind, ExchangeKey, InstrumentKey>, EngineError)>,
}
impl<Kind, ExchangeKey, InstrumentKey> SendRequestsOutput<Kind, ExchangeKey, InstrumentKey> {
    /// Returns `true` when there are neither sent requests nor errors.
    pub fn is_empty(&self) -> bool {
        let nothing_sent = self.sent.is_none();
        nothing_sent && self.errors.is_none()
    }

    /// Returns a clone of every [`UnrecoverableEngineError`] found in `errors`.
    ///
    /// Entries holding any other [`EngineError`] variant are skipped.
    pub fn unrecoverable_errors(&self) -> NoneOneOrMany<UnrecoverableEngineError> {
        self.errors
            .iter()
            .filter_map(|(_, engine_error)| {
                if let EngineError::Unrecoverable(unrecoverable) = engine_error {
                    Some(unrecoverable.clone())
                } else {
                    None
                }
            })
            .collect()
    }
}
/// Manual `Default` implementation that places no bounds on `Kind` or the key types.
///
/// The generic parameters are listed in the same order as on the struct definition
/// (`Kind, ExchangeKey, InstrumentKey`) so that each name refers to the parameter it describes.
impl<Kind, ExchangeKey, InstrumentKey> Default
    for SendRequestsOutput<Kind, ExchangeKey, InstrumentKey>
{
    /// Builds an output whose `sent` and `errors` are both their default value.
    fn default() -> Self {
        Self {
            sent: NoneOneOrMany::default(),
            errors: NoneOneOrMany::default(),
        }
    }
}