barter 0.12.5

Framework for building high-performance live-trading, paper-trading and back-testing systems
Documentation
use crate::{
    engine::{
        Engine, EngineOutput, UpdateFromAccountOutput, UpdateFromMarketOutput,
        audit::context::EngineContext, clock::EngineClock, error::UnrecoverableEngineError,
    },
    strategy::{on_disconnect::OnDisconnectStrategy, on_trading_disabled::OnTradingDisabled},
};
use barter_integration::{FeedEnded, Terminal, collection::none_one_or_many::NoneOneOrMany};
use derive_more::Constructor;
use serde::{Deserialize, Serialize};

/// Defines data structures that represent the context an `Engine` [`AuditTick`] was generated.
pub mod context;

/// Defines a `StateReplicaManager` that can be used to maintain an `EngineState` replica.
///
/// Useful for supporting non-hot path trading system components such as UIs, web apps, etc.
pub mod state_replica;

/// Interface that defines how a component (eg/ `Engine`) generates [`AuditTick`]s.
pub trait Auditor<AuditKind> {
    /// Full state snapshot.
    type Snapshot;

    /// `AuditTick` context.
    ///
    /// For example, the `Engine` uses [`EngineContext`].
    type Context;

    /// Build an `AuditTick` containing a full state snapshot.
    fn audit_snapshot(&mut self) -> AuditTick<Self::Snapshot, Self::Context>;

    /// Build an `AuditTick` from the provided `Kind`.
    fn audit<Kind>(&mut self, kind: Kind) -> AuditTick<AuditKind, Self::Context>
    where
        AuditKind: From<Kind>;
}

impl<Audit, Clock, State, ExecutionTxs, Strategy, Risk> Auditor<Audit>
    for Engine<Clock, State, ExecutionTxs, Strategy, Risk>
where
    Clock: EngineClock,
    State: Clone,
    Strategy: OnTradingDisabled<Clock, State, ExecutionTxs, Risk>
        + OnDisconnectStrategy<Clock, State, ExecutionTxs, Risk>,
{
    type Snapshot = State;
    type Context = EngineContext;

    fn audit_snapshot(&mut self) -> AuditTick<Self::Snapshot, Self::Context> {
        self.audit(self.state.clone())
    }

    fn audit<Kind>(&mut self, kind: Kind) -> AuditTick<Audit, Self::Context>
    where
        Audit: From<Kind>,
    {
        AuditTick {
            event: Audit::from(kind),
            context: EngineContext {
                sequence: self.meta.sequence.fetch_add(),
                time: self.clock.time(),
            },
        }
    }
}

/// `Engine` audit event & it's associated context. Sent via the AuditStream.
#[derive(
    Debug,
    Copy,
    Clone,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Hash,
    Default,
    Deserialize,
    Serialize,
    Constructor,
)]
pub struct AuditTick<Kind, Context = EngineContext> {
    pub event: Kind,
    pub context: Context,
}

#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
/// Represents [`AuditTick`] types that are generated by the `Engine` and sent via the AuditStream.
pub enum EngineAudit<Event, Output> {
    /// Input event feed ended.
    FeedEnded,
    /// `Engine` processed an `Event`.
    Process(ProcessAudit<Event, Output>),
}

impl<Event, Output> Terminal for EngineAudit<Event, Output>
where
    Event: Terminal,
{
    fn is_terminal(&self) -> bool {
        match self {
            EngineAudit::FeedEnded => true,
            EngineAudit::Process(audit) => audit.is_terminal(),
        }
    }
}

impl<Event, Output> From<FeedEnded> for EngineAudit<Event, Output> {
    fn from(_: FeedEnded) -> Self {
        Self::FeedEnded
    }
}

impl<Event, Output> EngineAudit<Event, Output> {
    pub fn process<E>(event: E) -> Self
    where
        E: Into<Event>,
    {
        Self::Process(ProcessAudit::with_event(event))
    }

    pub fn process_with_output<E, O>(event: E, output: O) -> Self
    where
        E: Into<Event>,
        O: Into<Output>,
    {
        Self::Process(ProcessAudit::with_output(event, output))
    }

    pub fn process_with_output_and_errs<E, ErrIter, O>(
        event: E,
        unrecoverable: ErrIter,
        output: O,
    ) -> Self
    where
        E: Into<Event>,
        ErrIter: IntoIterator<Item = UnrecoverableEngineError>,
        O: Into<Output>,
    {
        Self::Process(ProcessAudit {
            event: event.into(),
            outputs: NoneOneOrMany::One(output.into()),
            errors: NoneOneOrMany::from_iter(unrecoverable),
        })
    }

    pub fn with_process_and_err<ErrIter>(
        process: ProcessAudit<Event, Output>,
        unrecoverable: ErrIter,
    ) -> Self
    where
        ErrIter: IntoIterator<Item = UnrecoverableEngineError>,
    {
        let process = process.add_errors(unrecoverable);
        Self::Process(process)
    }
}

#[derive(
    Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Constructor,
)]
/// Represents [`AuditTick`] types that are generated when an `Engine` processes an `Event`.
pub struct ProcessAudit<Event, Output> {
    pub event: Event,
    pub outputs: NoneOneOrMany<Output>,
    pub errors: NoneOneOrMany<UnrecoverableEngineError>,
}

impl<Event, Output> Terminal for ProcessAudit<Event, Output>
where
    Event: Terminal,
{
    fn is_terminal(&self) -> bool {
        self.event.is_terminal() || !self.errors.is_empty()
    }
}

impl<Event, Output> ProcessAudit<Event, Output> {
    pub fn with_event<E>(event: E) -> Self
    where
        E: Into<Event>,
    {
        Self {
            event: event.into(),
            outputs: NoneOneOrMany::None,
            errors: NoneOneOrMany::None,
        }
    }

    pub fn with_output<E, O>(event: E, output: O) -> Self
    where
        E: Into<Event>,
        O: Into<Output>,
    {
        Self {
            event: event.into(),
            outputs: NoneOneOrMany::One(output.into()),
            errors: NoneOneOrMany::None,
        }
    }
}

impl<Event, OnTradingDisabled, OnDisconnect>
    ProcessAudit<Event, EngineOutput<OnTradingDisabled, OnDisconnect>>
{
    pub fn with_trading_state_update<E>(event: E, disabled: Option<OnTradingDisabled>) -> Self
    where
        E: Into<Event>,
    {
        if let Some(disabled) = disabled {
            Self {
                event: event.into(),
                outputs: NoneOneOrMany::One(EngineOutput::OnTradingDisabled(disabled)),
                errors: NoneOneOrMany::None,
            }
        } else {
            Self::with_event(event)
        }
    }

    pub fn with_account_update<E>(event: E, account: UpdateFromAccountOutput<OnDisconnect>) -> Self
    where
        E: Into<Event>,
    {
        match account {
            UpdateFromAccountOutput::None => Self::with_event(event),
            UpdateFromAccountOutput::OnDisconnect(disconnect) => {
                Self::with_output(event, EngineOutput::AccountDisconnect(disconnect))
            }
            UpdateFromAccountOutput::PositionExit(position) => Self::with_output(event, position),
        }
    }

    pub fn with_market_update<E>(event: E, account: UpdateFromMarketOutput<OnDisconnect>) -> Self
    where
        E: Into<Event>,
    {
        match account {
            UpdateFromMarketOutput::None => Self::with_event(event),
            UpdateFromMarketOutput::OnDisconnect(disconnect) => {
                Self::with_output(event, EngineOutput::MarketDisconnect(disconnect))
            }
        }
    }
}

impl<Event, Output> ProcessAudit<Event, Output> {
    pub fn add_output<O>(self, output: O) -> Self
    where
        O: Into<Output>,
    {
        let Self {
            event,
            outputs,
            errors,
        } = self;

        Self {
            event,
            outputs: outputs.extend(NoneOneOrMany::One(output.into())),
            errors,
        }
    }

    pub fn add_errors<ErrIter>(self, errs: ErrIter) -> Self
    where
        ErrIter: IntoIterator<Item = UnrecoverableEngineError>,
    {
        let Self {
            event,
            outputs,
            errors,
        } = self;

        Self {
            event,
            outputs,
            errors: errors.extend(errs),
        }
    }
}

impl<Event, Output> From<ProcessAudit<Event, Output>> for EngineAudit<Event, Output> {
    fn from(value: ProcessAudit<Event, Output>) -> Self {
        Self::Process(value)
    }
}