use std::{fmt, sync::Arc};
use tokio::sync::{Mutex, MutexGuard};
use crate::sync::SyncStatusNotSynced;
use super::{
error::{
ExecutorActionError, ExecutorActionResult, ExecutorProcessFatalError,
ExecutorProcessRecoverableError,
},
update::LiveTradeExecutorTransmitter,
};
pub(in crate::trade) mod live_trading_session;
use live_trading_session::LiveTradingSession;
/// The ways in which the live trade executor can be "not ready".
///
/// Equality is implemented by hand rather than derived: the error-carrying
/// variants are compared by `Arc` pointer identity.
#[derive(Debug, Clone)]
pub enum LiveTradeExecutorStatusNotReady {
/// Initial status assigned by `LiveTradeExecutorStateManager::new`.
Starting,
/// Waiting on synchronization; carries the current not-synced sync status.
WaitingForSync(SyncStatusNotSynced),
/// A recoverable executor process error occurred (shared via `Arc`).
/// Not reported as unrecoverable by `is_unrecoverable`.
Failed(Arc<ExecutorProcessRecoverableError>),
/// A fatal executor process error occurred (shared via `Arc`).
/// Reported as unrecoverable by `is_unrecoverable`.
Terminated(Arc<ExecutorProcessFatalError>),
/// Shutdown has been requested; reported as unrecoverable by `is_unrecoverable`.
ShutdownInitiated,
/// Shutdown status; reported as unrecoverable by `is_unrecoverable`.
Shutdown,
}
impl LiveTradeExecutorStatusNotReady {
    /// Reports whether this status is the [`Self::Failed`] variant.
    pub fn is_failed(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Failed(_) => true,
            Self::Starting
            | Self::WaitingForSync(_)
            | Self::Terminated(_)
            | Self::ShutdownInitiated
            | Self::Shutdown => false,
        }
    }

    /// Reports whether this status is one of the variants treated as
    /// unrecoverable: [`Self::Terminated`], [`Self::ShutdownInitiated`] or
    /// [`Self::Shutdown`].
    pub fn is_unrecoverable(&self) -> bool {
        match self {
            Self::Terminated(_) | Self::ShutdownInitiated | Self::Shutdown => true,
            Self::Failed(_) | Self::WaitingForSync(_) | Self::Starting => false,
        }
    }
}
impl PartialEq for LiveTradeExecutorStatusNotReady {
    /// Two statuses are equal when they are the same variant and:
    /// - for `WaitingForSync`, the inner sync statuses compare equal;
    /// - for `Failed` / `Terminated`, both hold the *same* `Arc` allocation
    ///   (pointer identity, not error contents).
    fn eq(&self, other: &Self) -> bool {
        match self {
            Self::Starting => matches!(other, Self::Starting),
            Self::WaitingForSync(lhs) => {
                matches!(other, Self::WaitingForSync(rhs) if lhs == rhs)
            }
            Self::Failed(lhs) => {
                matches!(other, Self::Failed(rhs) if Arc::ptr_eq(lhs, rhs))
            }
            Self::Terminated(lhs) => {
                matches!(other, Self::Terminated(rhs) if Arc::ptr_eq(lhs, rhs))
            }
            Self::ShutdownInitiated => matches!(other, Self::ShutdownInitiated),
            Self::Shutdown => matches!(other, Self::Shutdown),
        }
    }
}
impl fmt::Display for LiveTradeExecutorStatusNotReady {
    /// Writes a short human-readable description of the status; variants that
    /// carry a payload append its `Display` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Payload-free variants: plain text, no formatting machinery needed.
            Self::Starting => f.write_str("Starting"),
            Self::ShutdownInitiated => f.write_str("Shutdown initiated"),
            Self::Shutdown => f.write_str("Shutdown"),
            // Payload-carrying variants.
            Self::WaitingForSync(status) => write!(f, "Waiting for sync ({status})"),
            Self::Failed(error) => write!(f, "Failed: {error}"),
            Self::Terminated(error) => write!(f, "Terminated: {error}"),
        }
    }
}
/// Overall status of the live trade executor: either ready, or not ready
/// for one of the reasons in `LiveTradeExecutorStatusNotReady`.
#[derive(Debug, Clone)]
pub enum LiveTradeExecutorStatus {
/// The executor is not ready; the payload says why.
NotReady(LiveTradeExecutorStatusNotReady),
/// The executor is ready.
Ready,
}
impl fmt::Display for LiveTradeExecutorStatus {
    /// Writes `Ready`, or `Not ready (<reason>)` using the reason's own
    /// `Display` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Ready => f.write_str("Ready"),
            Self::NotReady(not_ready) => write!(f, "Not ready ({})", not_ready),
        }
    }
}
/// Cloneable snapshot-style state of the live trade executor: its status
/// plus an optional trading session.
#[derive(Debug, Clone)]
pub(in crate::trade) struct LiveTradeExecutorState {
// Current executor status.
status: LiveTradeExecutorStatus,
// Trading session, if one has been set; `None` initially (see
// `LiveTradeExecutorStateManager::new`).
trading_session: Option<LiveTradingSession>,
}
impl LiveTradeExecutorState {
    /// Borrows the executor status stored in this state.
    pub fn status(&self) -> &LiveTradeExecutorStatus {
        let Self { status, .. } = self;
        status
    }

    /// Borrows the stored trading session, or returns `None` when no session
    /// is set.
    pub fn trading_session(&self) -> Option<&LiveTradingSession> {
        let Self {
            trading_session, ..
        } = self;
        trading_session.as_ref()
    }
}
impl From<LiveTradeExecutorStatusNotReady> for LiveTradeExecutorStatus {
fn from(value: LiveTradeExecutorStatusNotReady) -> Self {
Self::NotReady(value)
}
}
/// The executor state while its mutex is held, paired with the transmitter
/// used to publish updates when the state is changed through this handle.
///
/// The update methods take `self` by value, so the lock guard is dropped
/// when they return.
pub(super) struct LockedLiveTradeExecutorState<'a> {
// Guard for the manager's state mutex; the lock is held while this lives.
state_guard: MutexGuard<'a, LiveTradeExecutorState>,
// Clone of the manager's transmitter (see `LiveTradeExecutorStateManager::lock_state`).
update_tx: LiveTradeExecutorTransmitter,
}
impl<'a> LockedLiveTradeExecutorState<'a> {
pub fn trading_session(&self) -> Option<&LiveTradingSession> {
self.state_guard.trading_session()
}
pub fn update_status_not_ready(
mut self,
new_status_not_ready: LiveTradeExecutorStatusNotReady,
) {
if let LiveTradeExecutorStatus::NotReady(current) = &self.state_guard.status
&& *current == new_status_not_ready
{
return;
}
if new_status_not_ready.is_failed()
&& let Some(session) = self.state_guard.trading_session.as_mut()
{
session.expire();
}
let new_status: LiveTradeExecutorStatus = new_status_not_ready.into();
self.state_guard.status = new_status.clone();
let _ = self.update_tx.send(new_status.into());
}
pub fn update_status_ready(mut self, new_trading_session: LiveTradingSession) {
if !matches!(self.state_guard.status, LiveTradeExecutorStatus::Ready) {
self.state_guard.status = LiveTradeExecutorStatus::Ready;
let _ = self.update_tx.send(LiveTradeExecutorStatus::Ready.into());
}
self.state_guard.trading_session = Some(new_trading_session.clone());
let _ = self.update_tx.send(new_trading_session.into());
}
}
/// A locked executor state that was checked to be `Ready` with a trading
/// session present. The `TryFrom<LockedLiveTradeExecutorState>` conversion
/// succeeds only when both conditions hold.
pub(super) struct LockedLiveTradeExecutorStateReady<'a>(LockedLiveTradeExecutorState<'a>);
impl<'a> TryFrom<LockedLiveTradeExecutorState<'a>> for LockedLiveTradeExecutorStateReady<'a> {
    type Error = ExecutorActionError;

    /// Accepts the locked state only when it is `Ready` and holds a trading
    /// session.
    ///
    /// # Errors
    ///
    /// - `ExecutorActionError::ExecutorNotReady` (with a clone of the current
    ///   not-ready status) when the status is `NotReady`.
    /// - `ExecutorActionError::ExecutorNotReadyNoSession` when the status is
    ///   `Ready` but no trading session is stored.
    fn try_from(value: LockedLiveTradeExecutorState<'a>) -> Result<Self, Self::Error> {
        if let LiveTradeExecutorStatus::NotReady(not_ready) = &value.state_guard.status {
            return Err(ExecutorActionError::ExecutorNotReady(not_ready.clone()));
        }

        // From here on the status is `Ready`; only the session can be missing.
        if value.state_guard.trading_session.is_none() {
            return Err(ExecutorActionError::ExecutorNotReadyNoSession);
        }

        Ok(Self(value))
    }
}
impl<'a> LockedLiveTradeExecutorStateReady<'a> {
    /// Borrows the trading session of the ready executor.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped state is not `LiveTradeExecutorStatus::Ready` or
    /// holds no trading session. The `TryFrom` conversion checks both
    /// conditions before building this wrapper, so either panic indicates a
    /// broken invariant rather than a recoverable condition.
    pub fn trading_session(&self) -> &LiveTradingSession {
        assert!(
            matches!(self.0.state_guard.status, LiveTradeExecutorStatus::Ready),
            "Must be `LiveTradeExecutorStatus::Ready` from `TryFrom`"
        );

        // `expect` instead of a bare `unwrap` so a violated invariant names
        // itself in the panic message.
        self.0
            .state_guard
            .trading_session
            .as_ref()
            .expect("Must have a trading session when `LiveTradeExecutorStatus::Ready` from `TryFrom`")
    }

    /// Replaces the trading session by delegating to
    /// `LockedLiveTradeExecutorState::update_status_ready`, consuming the lock.
    ///
    /// The body contains no `.await`; the `async` signature is kept so
    /// existing callers that await this method keep compiling.
    pub async fn update_trading_session(self, new_trading_session: LiveTradingSession) {
        self.0.update_status_ready(new_trading_session)
    }
}
/// Owns the executor state behind an async mutex, together with the
/// transmitter that is cloned into every locked-state handle.
pub(super) struct LiveTradeExecutorStateManager {
// Executor state, guarded by a `tokio::sync::Mutex`.
state: Mutex<LiveTradeExecutorState>,
// Transmitter handed (as a clone) to each `LockedLiveTradeExecutorState`.
update_tx: LiveTradeExecutorTransmitter,
}
impl LiveTradeExecutorStateManager {
pub fn new(update_tx: LiveTradeExecutorTransmitter) -> Arc<Self> {
let initial_state = LiveTradeExecutorState {
status: LiveTradeExecutorStatusNotReady::Starting.into(),
trading_session: None,
};
let state = Mutex::new(initial_state);
Arc::new(Self { state, update_tx })
}
pub async fn lock_state(&self) -> LockedLiveTradeExecutorState<'_> {
let state_guard = self.state.lock().await;
LockedLiveTradeExecutorState {
state_guard,
update_tx: self.update_tx.clone(),
}
}
pub async fn try_lock_ready_state(
&self,
) -> ExecutorActionResult<LockedLiveTradeExecutorStateReady<'_>> {
let locked_state = self.lock_state().await;
LockedLiveTradeExecutorStateReady::try_from(locked_state)
}
pub async fn snapshot(&self) -> LiveTradeExecutorState {
self.state.lock().await.clone()
}
pub async fn update_status_not_ready(
&self,
new_status_not_ready: LiveTradeExecutorStatusNotReady,
) {
self.lock_state()
.await
.update_status_not_ready(new_status_not_ready)
}
pub async fn has_registered_running_trades(&self) -> bool {
self.lock_state()
.await
.trading_session()
.is_some_and(|session| !session.running_map().is_empty())
}
}