use std::{
fmt,
sync::{Arc, Mutex, MutexGuard},
};
use tokio::sync::broadcast;
use crate::sync::SyncStatusNotSynced;
use super::{
core::Signal,
process::error::{SignalProcessFatalError, SignalProcessRecoverableError},
};
/// Detailed reason why a live signal process is currently not running.
///
/// Wrapped by the `NotRunning` variant of [`LiveSignalStatus`]. Equality is
/// implemented by hand rather than derived: `Failed` values compare by `Arc`
/// pointer identity, not by error contents.
#[derive(Debug, Clone)]
pub enum LiveSignalStatusNotRunning {
/// Initial state; a freshly created `LiveSignalStatusManager` starts here.
NotInitiated,
/// Displayed as "Starting".
/// NOTE(review): the exact point at which this is set is not visible in this file.
Starting,
/// Waiting on synchronization; carries the not-synced sync status that is
/// included in the `Display` output.
WaitingForSync(SyncStatusNotSynced),
/// Holds a recoverable process error. The error sits behind an `Arc`, so
/// clones of this status share the same error value.
Failed(Arc<SignalProcessRecoverableError>),
/// Displayed as "Restarting".
/// NOTE(review): presumably follows a `Failed` state — confirm against the process code.
Restarting,
}
impl PartialEq for LiveSignalStatusNotRunning {
    /// Compares two not-running statuses.
    ///
    /// * Payload-free variants are equal only to the same variant.
    /// * `WaitingForSync` delegates to the equality of the wrapped sync status.
    /// * `Failed` uses `Arc::ptr_eq`: two values are equal only when they
    ///   share the same error allocation, regardless of the error contents.
    fn eq(&self, other: &Self) -> bool {
        match self {
            Self::NotInitiated => matches!(other, Self::NotInitiated),
            Self::Starting => matches!(other, Self::Starting),
            Self::Restarting => matches!(other, Self::Restarting),
            Self::WaitingForSync(lhs) => {
                matches!(other, Self::WaitingForSync(rhs) if lhs == rhs)
            }
            Self::Failed(lhs) => {
                matches!(other, Self::Failed(rhs) if Arc::ptr_eq(lhs, rhs))
            }
        }
    }
}
impl fmt::Display for LiveSignalStatusNotRunning {
    /// Writes a short human-readable description of the not-running reason.
    ///
    /// Variants carrying a payload embed the payload's own `Display` output;
    /// the remaining variants print a fixed label.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            // Payload-carrying variants need formatting, so they return early.
            Self::WaitingForSync(status) => {
                return write!(f, "Waiting for sync ({status})");
            }
            Self::Failed(error) => return write!(f, "Failed: {error}"),
            Self::NotInitiated => "Not initiated",
            Self::Starting => "Starting",
            Self::Restarting => "Restarting",
        };
        f.write_str(label)
    }
}
/// Overall status of a live signal process.
///
/// Equality is implemented by hand rather than derived: `Terminated` values
/// compare by `Arc` pointer identity, not by error contents.
#[derive(Debug, Clone)]
pub enum LiveSignalStatus {
/// The process is not running; the payload gives the specific reason.
NotRunning(LiveSignalStatusNotRunning),
/// Displayed as "Running".
Running,
/// Displayed as "Shutdown initiated". Not counted as stopped by `is_stopped`.
ShutdownInitiated,
/// Displayed as "Shutdown". Counted as stopped by `is_stopped`.
Shutdown,
/// Holds a fatal process error behind an `Arc`, so clones share the same
/// error value. Counted as stopped by `is_stopped`.
Terminated(Arc<SignalProcessFatalError>),
}
impl PartialEq for LiveSignalStatus {
    /// Compares two statuses.
    ///
    /// * `NotRunning` delegates to the equality of the wrapped reason.
    /// * Payload-free variants are equal only to the same variant.
    /// * `Terminated` uses `Arc::ptr_eq`: two values are equal only when they
    ///   share the same error allocation, regardless of the error contents.
    fn eq(&self, other: &Self) -> bool {
        match self {
            Self::NotRunning(lhs) => matches!(other, Self::NotRunning(rhs) if lhs == rhs),
            Self::Running => matches!(other, Self::Running),
            Self::ShutdownInitiated => matches!(other, Self::ShutdownInitiated),
            Self::Shutdown => matches!(other, Self::Shutdown),
            Self::Terminated(lhs) => {
                matches!(other, Self::Terminated(rhs) if Arc::ptr_eq(lhs, rhs))
            }
        }
    }
}
impl LiveSignalStatus {
    /// Returns `true` when the status is `Shutdown` or `Terminated`, and
    /// `false` for every other variant (including `ShutdownInitiated`).
    pub fn is_stopped(&self) -> bool {
        match self {
            Self::Shutdown | Self::Terminated(_) => true,
            Self::NotRunning(_) | Self::Running | Self::ShutdownInitiated => false,
        }
    }
}
impl fmt::Display for LiveSignalStatus {
    /// Writes a short human-readable description of the status.
    ///
    /// `NotRunning` and `Terminated` embed the `Display` output of their
    /// payload; the remaining variants print a fixed label.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            // Payload-carrying variants need formatting, so they return early.
            Self::NotRunning(status) => return write!(f, "Not running ({status})"),
            Self::Terminated(error) => return write!(f, "Terminated: {error}"),
            Self::Running => "Running",
            Self::ShutdownInitiated => "Shutdown initiated",
            Self::Shutdown => "Shutdown",
        };
        f.write_str(label)
    }
}
impl From<LiveSignalStatusNotRunning> for LiveSignalStatus {
fn from(value: LiveSignalStatusNotRunning) -> Self {
Self::NotRunning(value)
}
}
impl From<SignalProcessRecoverableError> for LiveSignalStatus {
fn from(value: SignalProcessRecoverableError) -> Self {
LiveSignalStatusNotRunning::Failed(Arc::new(value)).into()
}
}
impl From<Arc<SignalProcessFatalError>> for LiveSignalStatus {
fn from(value: Arc<SignalProcessFatalError>) -> Self {
Self::Terminated(value)
}
}
impl From<SignalProcessFatalError> for LiveSignalStatus {
fn from(value: SignalProcessFatalError) -> Self {
Arc::new(value).into()
}
}
/// A single message carried on the live signal broadcast channel: either a
/// status change or a signal value of type `S`.
#[derive(Debug, Clone)]
pub enum LiveSignalUpdate<S: Signal> {
/// A new process status.
Status(LiveSignalStatus),
/// A signal value.
Signal(S),
}
impl<S: Signal> From<LiveSignalStatus> for LiveSignalUpdate<S> {
fn from(value: LiveSignalStatus) -> Self {
Self::Status(value)
}
}
/// Sending half of the broadcast channel that carries [`LiveSignalUpdate`] messages (crate-internal).
pub(crate) type LiveSignalTransmitter<S> = broadcast::Sender<LiveSignalUpdate<S>>;
/// Receiving half of the broadcast channel that carries [`LiveSignalUpdate`] messages.
pub type LiveSignalReceiver<S> = broadcast::Receiver<LiveSignalUpdate<S>>;
/// Read-only access to a live signal process: a receiver for its updates and
/// a snapshot of its status.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait LiveSignalReader<S: Signal>: Send + Sync + 'static {
/// Returns a receiver for [`LiveSignalUpdate`] messages.
fn update_receiver(&self) -> LiveSignalReceiver<S>;
/// Returns an owned copy of the status as of the time of the call.
fn status_snapshot(&self) -> LiveSignalStatus;
}
/// Holds the current [`LiveSignalStatus`] and broadcasts status changes.
#[derive(Debug)]
pub(crate) struct LiveSignalStatusManager<S: Signal> {
// Current status, guarded by a standard (blocking) mutex.
status: Mutex<LiveSignalStatus>,
// Broadcast sender used to publish status changes and to create receivers.
update_tx: LiveSignalTransmitter<S>,
}
impl<S: Signal> LiveSignalStatusManager<S> {
pub fn new(update_tx: LiveSignalTransmitter<S>) -> Arc<Self> {
let status = Mutex::new(LiveSignalStatusNotRunning::NotInitiated.into());
Arc::new(Self { status, update_tx })
}
fn lock_status(&self) -> MutexGuard<'_, LiveSignalStatus> {
self.status
.lock()
.expect("`LiveSignalStatusManager` mutex can't be poisoned")
}
pub fn update(&self, new_status: LiveSignalStatus) {
let mut status_guard = self.lock_status();
if *status_guard == new_status {
return;
}
*status_guard = new_status.clone();
drop(status_guard);
let _ = self.update_tx.send(new_status.into());
}
}
impl<S: Signal> LiveSignalReader<S> for LiveSignalStatusManager<S> {
fn update_receiver(&self) -> LiveSignalReceiver<S> {
self.update_tx.subscribe()
}
fn status_snapshot(&self) -> LiveSignalStatus {
self.lock_status().clone()
}
}