use std::sync::{Arc, Mutex};
use tokio::{
sync::broadcast::{self, error::RecvError},
time,
};
use crate::{
db::Database,
signal::{
SignalEvaluator,
config::{LiveSignalConfig, LiveSignalControllerConfig},
error::SignalOperatorError,
process::{LiveSignalProcess, error::SignalProcessFatalError},
},
sync::SyncReader,
util::AbortOnDropHandle,
};
use super::{
core::{Signal, WrappedSignalEvaluator},
error::{Result, SignalError},
state::{
LiveSignalReader, LiveSignalReceiver, LiveSignalStatus, LiveSignalStatusManager,
LiveSignalTransmitter, LiveSignalUpdate,
},
};
#[derive(Debug)]
pub struct LiveSignalController<S: Signal> {
config: LiveSignalControllerConfig,
handle: Mutex<Option<AbortOnDropHandle<()>>>,
shutdown_tx: broadcast::Sender<()>,
status_manager: Arc<LiveSignalStatusManager<S>>,
}
impl<S: Signal> LiveSignalController<S> {
fn new(
config: &LiveSignalConfig,
handle: AbortOnDropHandle<()>,
shutdown_tx: broadcast::Sender<()>,
status_manager: Arc<LiveSignalStatusManager<S>>,
) -> Arc<Self> {
Arc::new(Self {
config: config.into(),
handle: Mutex::new(Some(handle)),
shutdown_tx,
status_manager,
})
}
pub fn reader(&self) -> Arc<dyn LiveSignalReader<S>> {
self.status_manager.clone()
}
pub fn update_receiver(&self) -> LiveSignalReceiver<S> {
self.status_manager.update_receiver()
}
pub fn status_snapshot(&self) -> LiveSignalStatus {
self.status_manager.status_snapshot()
}
fn try_consume_handle(&self) -> Option<AbortOnDropHandle<()>> {
self.handle
.lock()
.expect("`LiveSignalController` mutex can't be poisoned")
.take()
}
pub async fn shutdown(&self) -> Result<()> {
let Some(mut handle) = self.try_consume_handle() else {
return Err(SignalError::LiveSignalAlreadyShutdown);
};
if handle.is_finished() {
let status = self.status_manager.status_snapshot();
return Err(SignalError::LiveSignalAlreadyTerminated(status));
}
self.status_manager
.update(LiveSignalStatus::ShutdownInitiated);
let shutdown_send_res = self.shutdown_tx.send(()).map_err(|e| {
handle.abort();
SignalProcessFatalError::SendShutdownSignalFailed(e)
});
let shutdown_res = match shutdown_send_res {
Ok(_) => {
tokio::select! {
join_res = &mut handle => {
join_res.map_err(SignalProcessFatalError::LiveSignalProcessTaskJoin)
}
_ = time::sleep(self.config.shutdown_timeout()) => {
handle.abort();
Err(SignalProcessFatalError::ShutdownTimeout)
}
}
}
Err(e) => Err(e),
};
if let Err(e) = shutdown_res {
let e_ref = Arc::new(e);
self.status_manager.update(e_ref.clone().into());
return Err(SignalError::SignalShutdownFailed(e_ref));
}
self.status_manager.update(LiveSignalStatus::Shutdown);
Ok(())
}
pub async fn until_stopped(&self) -> LiveSignalStatus {
let mut signal_rx = self.update_receiver();
let status = self.status_snapshot();
if status.is_stopped() {
return status;
}
loop {
match signal_rx.recv().await {
Ok(signal_update) => {
if let LiveSignalUpdate::Status(status) = signal_update
&& status.is_stopped()
{
return status;
}
}
Err(RecvError::Lagged(_)) => {
let status = self.status_snapshot();
if status.is_stopped() {
return status;
}
}
Err(RecvError::Closed) => return self.status_snapshot(),
}
}
}
}
pub struct LiveSignalEngine<S: Signal> {
config: LiveSignalConfig,
db: Arc<Database>,
sync_reader: Arc<dyn SyncReader>,
evaluators: Vec<WrappedSignalEvaluator<S>>,
status_manager: Arc<LiveSignalStatusManager<S>>,
update_tx: LiveSignalTransmitter<S>,
}
impl<S: Signal> LiveSignalEngine<S> {
pub fn new(
config: impl Into<LiveSignalConfig>,
db: Arc<Database>,
sync_reader: Arc<dyn SyncReader>,
evaluators: Vec<Box<dyn SignalEvaluator<S>>>,
) -> Result<Self> {
if evaluators.is_empty() {
return Err(SignalError::Operator(
SignalOperatorError::EmptyEvaluatorsVec,
));
}
let evaluators: Vec<_> = evaluators
.into_iter()
.map(WrappedSignalEvaluator::new)
.collect();
let (update_tx, _) = broadcast::channel::<LiveSignalUpdate<S>>(1_000);
let status_manager = LiveSignalStatusManager::new(update_tx.clone());
Ok(Self {
config: config.into(),
db,
sync_reader,
evaluators,
status_manager,
update_tx,
})
}
pub fn reader(&self) -> Arc<dyn LiveSignalReader<S>> {
self.status_manager.clone()
}
pub fn update_receiver(&self) -> LiveSignalReceiver<S> {
self.status_manager.update_receiver()
}
pub fn status_snapshot(&self) -> LiveSignalStatus {
self.status_manager.status_snapshot()
}
pub fn start(self) -> Arc<LiveSignalController<S>> {
let (shutdown_tx, _) = broadcast::channel::<()>(1);
let handle = LiveSignalProcess::spawn(
&self.config,
self.db,
self.evaluators,
shutdown_tx.clone(),
self.sync_reader,
self.status_manager.clone(),
self.update_tx,
);
LiveSignalController::new(&self.config, handle, shutdown_tx, self.status_manager)
}
}