use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use tokio::sync::broadcast::error::RecvError;
use crate::{
tui::{TuiControllerShutdown, error::Result as TuiResult},
util::AbortOnDropHandle,
};
use super::super::{
error::{BacktestError, Result},
state::{BacktestReceiver, BacktestStatus, BacktestStatusManager, BacktestUpdate},
};
/// Controller for a running backtest.
///
/// Holds the handle of the backtest task together with the shared status
/// manager, so callers can observe status updates, wait for the backtest to
/// stop, or abort it.
#[derive(Debug)]
pub struct BacktestController {
/// Handle of the backtest task. Becomes `None` once the handle has been
/// taken out (which `abort` does), so it can only be consumed once.
handle: Mutex<Option<AbortOnDropHandle<()>>>,
/// Shared status manager used to take status snapshots, create update
/// receivers and publish the `Aborted` status.
status_manager: Arc<BacktestStatusManager<BacktestUpdate>>,
}
impl BacktestController {
    /// Builds a shared controller from the backtest task `handle` and the
    /// `status_manager` through which the backtest status is published.
    pub(super) fn new(
        handle: AbortOnDropHandle<()>,
        status_manager: Arc<BacktestStatusManager<BacktestUpdate>>,
    ) -> Arc<Self> {
        let controller = Self {
            handle: Mutex::new(Some(handle)),
            status_manager,
        };
        Arc::new(controller)
    }

    /// Returns a new receiver of backtest updates, obtained from the status manager.
    pub fn receiver(&self) -> BacktestReceiver {
        self.status_manager.receiver()
    }

    /// Returns the status manager's current snapshot of the backtest status.
    pub fn status_snapshot(&self) -> BacktestStatus {
        self.status_manager.snapshot()
    }

    /// Takes the task handle out of the controller, leaving `None` behind.
    ///
    /// Returns `None` if the handle has already been taken.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn try_consume_handle(&self) -> Option<AbortOnDropHandle<()>> {
        let mut slot = self
            .handle
            .lock()
            .expect("`BacktestController` mutex can't be poisoned");
        slot.take()
    }

    /// Waits until the backtest reports a stopped status and returns that status.
    ///
    /// Returns immediately if the current snapshot is already stopped. If the
    /// update channel is closed, the current snapshot is returned whether or
    /// not it is a stopped status.
    pub async fn until_stopped(&self) -> BacktestStatus {
        // The receiver is created before the snapshot is taken; presumably this
        // keeps a stop published right after the snapshot from being missed.
        let mut updates = self.receiver();

        let initial = self.status_snapshot();
        if initial.is_stopped() {
            return initial;
        }

        loop {
            let observed = match updates.recv().await {
                Ok(update) => {
                    // Updates that do not carry a status are ignored.
                    let BacktestUpdate::Status(reported) = update else {
                        continue;
                    };
                    reported
                }
                // Some updates were skipped: fall back to the current snapshot.
                Err(RecvError::Lagged(_)) => self.status_snapshot(),
                // No more updates will arrive: report whatever the status is now.
                Err(RecvError::Closed) => return self.status_snapshot(),
            };

            if observed.is_stopped() {
                return observed;
            }
        }
    }

    /// Aborts the backtest task (if it is still running) and waits for the
    /// handle to resolve.
    ///
    /// When the task has not finished yet, it is aborted and the status is
    /// updated to [`BacktestStatus::Aborted`] before the handle is awaited.
    ///
    /// # Errors
    ///
    /// * [`BacktestError::ProcessAlreadyConsumed`] if the handle was already taken.
    /// * [`BacktestError::TaskJoin`] if awaiting the handle yields an error.
    ///
    /// NOTE(review): if awaiting an aborted handle reports cancellation as an
    /// error, a successful abort surfaces as `TaskJoin` — confirm against
    /// `AbortOnDropHandle`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub async fn abort(&self) -> Result<()> {
        let Some(task) = self.try_consume_handle() else {
            return Err(BacktestError::ProcessAlreadyConsumed);
        };

        if !task.is_finished() {
            task.abort();
            self.status_manager.update(BacktestStatus::Aborted);
        }

        task.await.map_err(BacktestError::TaskJoin)
    }
}
#[async_trait]
impl TuiControllerShutdown for BacktestController {
    /// Shuts the backtest down on behalf of the TUI by aborting it.
    ///
    /// Always returns `Ok(())`: the result of the abort is discarded, so an
    /// already-consumed handle or a join error does not fail the shutdown.
    async fn tui_shutdown(&self) -> TuiResult<()> {
        match self.abort().await {
            // Best-effort: both outcomes count as a completed shutdown.
            Ok(()) | Err(_) => Ok(()),
        }
    }
}