1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
use crate::{
execution::FillEvent,
portfolio::{
position::{Position, PositionExit, PositionUpdate},
Balance, OrderEvent,
},
strategy::{Signal, SignalForceExit},
};
use barter_data::event::{DataKind, MarketEvent};
use serde::Serialize;
use std::fmt::Debug;
use tokio::sync::mpsc;
use tracing::warn;
/// Events that occur when bartering. [`MarketEvent`], [`Signal`], [`OrderEvent`], and
/// [`FillEvent`] are vital to the [`Trader`](crate::engine::trader::Trader) event loop, dictating
/// the trading sequence. The [`PositionExit`] Event is a representation of work done by the
/// system, and is useful for analysing performance & reconciliations.
#[derive(Clone, PartialEq, Debug, Serialize)]
pub enum Event {
Market(MarketEvent<DataKind>),
Signal(Signal),
SignalForceExit(SignalForceExit),
OrderNew(OrderEvent),
OrderUpdate,
Fill(FillEvent),
PositionNew(Position),
PositionUpdate(PositionUpdate),
PositionExit(PositionExit),
Balance(Balance),
}
/// Message transmitter for sending Barter messages to downstream consumers.
pub trait MessageTransmitter<Message> {
/// Attempts to send a message to an external message subscriber.
fn send(&mut self, message: Message);
/// Attempts to send many messages to an external message subscriber.
fn send_many(&mut self, messages: Vec<Message>);
}
/// Transmitter for sending Barter [`Event`]s to an external sink. Useful for event-sourcing,
/// real-time dashboards & general monitoring.
#[derive(Debug, Clone)]
pub struct EventTx {
/// Flag to communicate if the external [`Event`] receiver has been dropped.
receiver_dropped: bool,
/// [`Event`] channel transmitter to send [`Event`]s to an external sink.
event_tx: mpsc::UnboundedSender<Event>,
}
impl MessageTransmitter<Event> for EventTx {
fn send(&mut self, message: Event) {
if self.receiver_dropped {
return;
}
if self.event_tx.send(message).is_err() {
warn!(
action = "setting receiver_dropped = true",
why = "event receiver dropped",
"cannot send Events"
);
self.receiver_dropped = true;
}
}
fn send_many(&mut self, messages: Vec<Event>) {
if self.receiver_dropped {
return;
}
messages.into_iter().for_each(|message| {
let _ = self.event_tx.send(message);
})
}
}
impl EventTx {
/// Constructs a new [`EventTx`] instance using the provided channel transmitter.
pub fn new(event_tx: mpsc::UnboundedSender<Event>) -> Self {
Self {
receiver_dropped: false,
event_tx,
}
}
}