use stratum_apps::utils::types::DownstreamId;
use tracing::{debug, warn};
use crate::error::{Action, TproxyError, TproxyErrorKind};
/// Handle through which a component reports a [`Status`].
///
/// Every variant wraps an `async_channel::Sender<Status>`; the variant itself
/// records which component is doing the reporting.
#[derive(Debug, Clone)]
pub enum StatusSender {
/// Sender held by a single downstream, tagged with that downstream's id.
Downstream {
/// Id of the downstream that owns this sender.
downstream_id: DownstreamId,
/// Channel the status messages are written to.
tx: async_channel::Sender<Status>,
},
/// Sender held by the Sv1 server.
Sv1Server(async_channel::Sender<Status>),
/// Sender held by the channel manager.
ChannelManager(async_channel::Sender<Status>),
/// Sender held by the upstream.
Upstream(async_channel::Sender<Status>),
}
impl StatusSender {
    /// Emits a debug line naming the reporting component, then forwards
    /// `status` over that component's channel.
    ///
    /// # Errors
    ///
    /// Propagates the `async_channel::SendError` returned by the wrapped
    /// sender, handing the undelivered [`Status`] back to the caller.
    #[cfg_attr(not(test), hotpath::measure)]
    pub async fn send(&self, status: Status) -> Result<(), async_channel::SendError<Status>> {
        // Every variant finishes with the same send call, so the match only
        // produces the variant-specific log line and yields the channel to use.
        let channel = match self {
            Self::Downstream { downstream_id, tx } => {
                debug!(
                    "Sending status from Downstream [{}]: {:?}",
                    downstream_id, status.state
                );
                tx
            }
            Self::Sv1Server(tx) => {
                debug!("Sending status from Sv1Server: {:?}", status.state);
                tx
            }
            Self::ChannelManager(tx) => {
                debug!("Sending status from ChannelManager: {:?}", status.state);
                tx
            }
            Self::Upstream(tx) => {
                debug!("Sending status from Upstream: {:?}", status.state);
                tx
            }
        };

        channel.send(status).await
    }
}
/// Shutdown notifications carried inside a [`Status`].
///
/// Each variant names the component being shut down and carries the
/// `TproxyErrorKind` recorded as the reason.
#[derive(Debug)]
pub enum State {
/// A specific downstream is shutting down.
DownstreamShutdown {
/// Id of the downstream being shut down.
downstream_id: DownstreamId,
/// Error kind that triggered the shutdown.
reason: TproxyErrorKind,
},
/// The Sv1 server is shutting down because of the given error kind.
Sv1ServerShutdown(TproxyErrorKind),
/// The channel manager is shutting down because of the given error kind.
ChannelManagerShutdown(TproxyErrorKind),
/// The upstream is shutting down because of the given error kind
/// (`send_status` emits this for `Action::Fallback`).
UpstreamShutdown(TproxyErrorKind),
}
/// Message sent over a [`StatusSender`] channel; a thin wrapper around a [`State`].
#[derive(Debug)]
pub struct Status {
/// The state being reported.
pub state: State,
}
#[cfg_attr(not(test), hotpath::measure)]
async fn send_status<O>(sender: &StatusSender, error: TproxyError<O>) -> bool {
use Action::*;
match error.action {
Log => {
warn!("Log-only error from {:?}: {:?}", sender, error.kind);
false
}
Disconnect(downstream_id) => {
let state = State::DownstreamShutdown {
downstream_id,
reason: error.kind,
};
if let Err(e) = sender.send(Status { state }).await {
tracing::error!(
"Failed to send downstream shutdown status from {:?}: {:?}",
sender,
e
);
std::process::abort();
}
matches!(sender, StatusSender::Downstream { .. })
}
Fallback => {
let state = State::UpstreamShutdown(error.kind);
if let Err(e) = sender.send(Status { state }).await {
tracing::error!("Failed to send fallback status from {:?}: {:?}", sender, e);
std::process::abort();
}
matches!(sender, StatusSender::Upstream { .. })
}
Shutdown => {
let state = match sender {
StatusSender::ChannelManager(_) => {
warn!(
"Channel Manager shutdown requested due to error: {:?}",
error.kind
);
State::ChannelManagerShutdown(error.kind)
}
StatusSender::Sv1Server(_) => {
warn!(
"Sv1Server shutdown requested due to error: {:?}",
error.kind
);
State::Sv1ServerShutdown(error.kind)
}
_ => State::ChannelManagerShutdown(error.kind),
};
if let Err(e) = sender.send(Status { state }).await {
tracing::error!("Failed to send shutdown status from {:?}: {:?}", sender, e);
std::process::abort();
}
true
}
}
}
/// Public entry point for reporting an error: forwards `e` unchanged to
/// `send_status` and returns its result.
///
/// The returned flag is `false` for `Action::Log`, `true` for
/// `Action::Shutdown`, and for `Action::Disconnect` / `Action::Fallback` it is
/// `true` only when `sender` is the `Downstream` / `Upstream` variant
/// respectively.
/// NOTE(review): presumably the flag tells the caller whether to stop its own
/// loop - confirm against call sites.
///
/// `send_status` aborts the process if the status cannot be delivered, so this
/// function does not return in that case.
#[cfg_attr(not(test), hotpath::measure)]
pub async fn handle_error<O>(sender: &StatusSender, e: TproxyError<O>) -> bool {
send_status(sender, e).await
}