use stratum_apps::utils::types::DownstreamId;
use tracing::{debug, warn};
use crate::error::{Action, PoolError, PoolErrorKind};
#[derive(Debug, Clone)]
pub enum StatusSender {
Downstream {
downstream_id: DownstreamId,
tx: async_channel::Sender<Status>,
},
TemplateReceiver(async_channel::Sender<Status>),
ChannelManager(async_channel::Sender<Status>),
}
#[derive(Debug, PartialEq, Eq)]
pub enum StatusType {
Downstream(DownstreamId),
TemplateReceiver,
ChannelManager,
}
impl From<&StatusSender> for StatusType {
fn from(value: &StatusSender) -> Self {
match value {
StatusSender::ChannelManager(_) => StatusType::ChannelManager,
StatusSender::Downstream {
downstream_id,
tx: _,
} => StatusType::Downstream(*downstream_id),
StatusSender::TemplateReceiver(_) => StatusType::TemplateReceiver,
}
}
}
#[cfg_attr(not(test), hotpath::measure_all)]
impl StatusSender {
pub async fn send(&self, status: Status) -> Result<(), async_channel::SendError<Status>> {
match self {
Self::Downstream { downstream_id, tx } => {
debug!(
"Sending status from Downstream [{}]: {:?}",
downstream_id, status.state
);
tx.send(status).await
}
Self::TemplateReceiver(tx) => {
debug!("Sending status from TemplateReceiver: {:?}", status.state);
tx.send(status).await
}
Self::ChannelManager(tx) => {
debug!("Sending status from ChannelManager: {:?}", status.state);
tx.send(status).await
}
}
}
}
#[derive(Debug)]
pub enum State {
DownstreamShutdown {
downstream_id: DownstreamId,
reason: PoolErrorKind,
},
TemplateReceiverShutdown(PoolErrorKind),
ChannelManagerShutdown(PoolErrorKind),
}
#[derive(Debug)]
pub struct Status {
pub state: State,
}
#[cfg_attr(not(test), hotpath::measure)]
async fn send_status<O>(sender: &StatusSender, error: PoolError<O>) -> bool {
use Action::*;
match error.action {
Log => {
warn!("Log-only error from {:?}: {:?}", sender, error.kind);
false
}
Disconnect(downstream_id) => {
let state = State::DownstreamShutdown {
downstream_id,
reason: error.kind,
};
if let Err(e) = sender.send(Status { state }).await {
tracing::error!(
"Failed to send downstream shutdown status from {:?}: {:?}",
sender,
e
);
std::process::abort();
}
matches!(sender, StatusSender::Downstream { .. })
}
Shutdown => {
let state = match sender {
StatusSender::ChannelManager(_) => {
warn!(
"Channel Manager shutdown requested due to error: {:?}",
error.kind
);
State::ChannelManagerShutdown(error.kind)
}
StatusSender::TemplateReceiver(_) => {
warn!(
"Template Receiver shutdown requested due to error: {:?}",
error.kind
);
State::TemplateReceiverShutdown(error.kind)
}
_ => State::ChannelManagerShutdown(error.kind),
};
if let Err(e) = sender.send(Status { state }).await {
tracing::error!("Failed to send shutdown status from {:?}: {:?}", sender, e);
std::process::abort();
}
true
}
}
}
pub async fn handle_error<O>(sender: &StatusSender, e: PoolError<O>) -> bool {
send_status(sender, e).await
}