use std::{
fmt,
sync::{Arc, Mutex, MutexGuard},
};
use tokio::sync::broadcast;
use lnm_sdk::api_v3::models::Trade;
use crate::{
signal::{LiveSignalStatusNotRunning, Signal},
sync::SyncStatusNotSynced,
};
use super::{
super::core::TradingState,
executor::{state::LiveTradeExecutorStatusNotReady, update::LiveTradeExecutorUpdateOrder},
process::error::{LiveProcessFatalError, LiveProcessRecoverableError},
};
#[derive(Debug, Clone)]
pub enum LiveTradeStatus {
NotInitiated,
Starting,
WaitingForSync(SyncStatusNotSynced),
WaitingForSignal(LiveSignalStatusNotRunning),
WaitingTradeExecutor(LiveTradeExecutorStatusNotReady),
Running,
Failed(Arc<LiveProcessRecoverableError>),
Restarting,
ShutdownInitiated,
Shutdown,
Terminated(Arc<LiveProcessFatalError>),
}
impl PartialEq for LiveTradeStatus {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::NotInitiated, Self::NotInitiated)
| (Self::Starting, Self::Starting)
| (Self::Running, Self::Running)
| (Self::Restarting, Self::Restarting)
| (Self::ShutdownInitiated, Self::ShutdownInitiated)
| (Self::Shutdown, Self::Shutdown) => true,
(Self::WaitingForSync(a), Self::WaitingForSync(b)) => a == b,
(Self::WaitingForSignal(a), Self::WaitingForSignal(b)) => a == b,
(Self::WaitingTradeExecutor(a), Self::WaitingTradeExecutor(b)) => a == b,
(Self::Failed(a), Self::Failed(b)) => Arc::ptr_eq(a, b),
(Self::Terminated(a), Self::Terminated(b)) => Arc::ptr_eq(a, b),
_ => false,
}
}
}
impl LiveTradeStatus {
pub fn is_stopped(&self) -> bool {
matches!(self, Self::Shutdown | Self::Terminated(_))
}
}
impl fmt::Display for LiveTradeStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::NotInitiated => write!(f, "Not initiated"),
Self::Starting => write!(f, "Starting"),
Self::WaitingForSync(status) => write!(f, "Waiting for sync ({status})"),
Self::WaitingForSignal(status) => write!(f, "Waiting for signal ({status})"),
Self::WaitingTradeExecutor(status) => {
write!(f, "Waiting trade executor ({status})")
}
Self::Running => write!(f, "Running"),
Self::Failed(error) => write!(f, "Failed: {error}"),
Self::Restarting => write!(f, "Restarting"),
Self::ShutdownInitiated => write!(f, "Shutdown initiated"),
Self::Shutdown => write!(f, "Shutdown"),
Self::Terminated(error) => write!(f, "Terminated: {error}"),
}
}
}
impl From<LiveProcessRecoverableError> for LiveTradeStatus {
fn from(value: LiveProcessRecoverableError) -> Self {
Self::Failed(Arc::new(value))
}
}
impl From<Arc<LiveProcessFatalError>> for LiveTradeStatus {
fn from(value: Arc<LiveProcessFatalError>) -> Self {
Self::Terminated(value)
}
}
impl From<LiveProcessFatalError> for LiveTradeStatus {
fn from(value: LiveProcessFatalError) -> Self {
Arc::new(value).into()
}
}
#[derive(Clone)]
pub enum LiveTradeUpdate<S: Signal> {
Status(LiveTradeStatus),
Signal(S),
Order(LiveTradeExecutorUpdateOrder),
TradingState(TradingState),
ClosedTrade(Trade),
}
impl<S: Signal> From<LiveTradeStatus> for LiveTradeUpdate<S> {
fn from(value: LiveTradeStatus) -> Self {
Self::Status(value)
}
}
impl<S: Signal> From<LiveTradeExecutorUpdateOrder> for LiveTradeUpdate<S> {
fn from(value: LiveTradeExecutorUpdateOrder) -> Self {
Self::Order(value)
}
}
impl<S: Signal> From<TradingState> for LiveTradeUpdate<S> {
fn from(value: TradingState) -> Self {
Self::TradingState(value)
}
}
pub(super) type LiveTradeTransmitter<S> = broadcast::Sender<LiveTradeUpdate<S>>;
pub type LiveTradeReceiver<S> = broadcast::Receiver<LiveTradeUpdate<S>>;
pub trait LiveTradeReader<S: Signal>: Send + Sync + 'static {
fn update_receiver(&self) -> LiveTradeReceiver<S>;
fn status_snapshot(&self) -> LiveTradeStatus;
}
pub(crate) struct LiveTradeStatusManager<S: Signal> {
status: Mutex<LiveTradeStatus>,
update_tx: LiveTradeTransmitter<S>,
}
impl<S: Signal> LiveTradeStatusManager<S> {
pub fn new(update_tx: LiveTradeTransmitter<S>) -> Arc<Self> {
let status = Mutex::new(LiveTradeStatus::NotInitiated);
Arc::new(Self { status, update_tx })
}
fn update_status_guard(
&self,
mut status_guard: MutexGuard<'_, LiveTradeStatus>,
new_status: LiveTradeStatus,
) {
if *status_guard == new_status {
return;
}
*status_guard = new_status.clone();
drop(status_guard);
let _ = self.update_tx.send(new_status.into());
}
fn lock_status(&self) -> MutexGuard<'_, LiveTradeStatus> {
self.status
.lock()
.expect("`LiveTradeStatusManager` mutex can't be poisoned")
}
pub fn update(&self, new_status: LiveTradeStatus) {
let status_guard = self.lock_status();
self.update_status_guard(status_guard, new_status);
}
pub fn update_if_not_running(&self, new_status: LiveTradeStatus) {
let status_guard = self.lock_status();
if matches!(*status_guard, LiveTradeStatus::Running) {
return;
}
self.update_status_guard(status_guard, new_status);
}
pub fn transmitter(&self) -> &LiveTradeTransmitter<S> {
&self.update_tx
}
}
impl<S: Signal> LiveTradeReader<S> for LiveTradeStatusManager<S> {
fn update_receiver(&self) -> LiveTradeReceiver<S> {
self.update_tx.subscribe()
}
fn status_snapshot(&self) -> LiveTradeStatus {
self.lock_status().clone()
}
}