use crate::node::flow_ctrl::FlowCtrl;
use crate::node::STANDARD_CHANNEL_SIZE;
use sn_fault_detection::{FaultDetection, IssueType};
use std::collections::BTreeSet;
use tokio::sync::mpsc::{self, Receiver, Sender};
use xor_name::XorName;
pub(crate) struct FaultChannels {
pub(crate) cmds_sender: Sender<FaultsCmd>,
pub(crate) faulty_nodes_receiver: Receiver<Vec<XorName>>,
}
pub(crate) enum FaultsCmd {
AddNode(XorName),
UpdateNodes(BTreeSet<XorName>, BTreeSet<XorName>),
TrackIssue(XorName, IssueType),
UntrackIssue(XorName, IssueType),
GetFaultyNodes,
}
impl FlowCtrl {
pub(crate) fn start_fault_detection(
mut tracker: FaultDetection,
mut fault_cmds_from_node: Receiver<FaultsCmd>,
) -> Receiver<Vec<XorName>> {
let (fault_nodes_sender, faulty_nodes_receiver) = mpsc::channel(STANDARD_CHANNEL_SIZE);
let _handle = tokio::task::spawn(async move {
while let Some(cmd) = fault_cmds_from_node.recv().await {
match cmd {
FaultsCmd::AddNode(node) => tracker.add_new_node(node),
FaultsCmd::UpdateNodes(adults, elders) => {
tracker.update_and_only_retain_members(adults, elders)
}
FaultsCmd::TrackIssue(node, issue) => tracker.track_issue(node, issue),
FaultsCmd::UntrackIssue(node, issue) => {
debug!("Attempting to remove {issue:?} from {node:?}");
match issue {
IssueType::AeProbeMsg => tracker.ae_update_msg_received(&node),
IssueType::Dkg => tracker.dkg_ack_fulfilled(&node),
IssueType::ElderVoting => tracker.elder_vote_received(&node),
_ => {}
};
}
FaultsCmd::GetFaultyNodes => {
if let Err(error) =
fault_nodes_sender.send(tracker.get_faulty_nodes()).await
{
warn!(
"Could not send faulty nodes through the mpsc channel: {error:?}"
);
}
}
}
}
});
faulty_nodes_receiver
}
pub(crate) async fn get_faulty_node_names(&mut self) -> Vec<XorName> {
if let Err(error) = self
.fault_channels
.cmds_sender
.send(FaultsCmd::GetFaultyNodes)
.await
{
warn!("Could not send FaultsCmd through fault_cmds_tx: {error}");
vec![]
} else {
if let Some(faulty_nodes) = self.fault_channels.faulty_nodes_receiver.recv().await {
faulty_nodes
} else {
warn!("faulty_nodes_rx channel closed?");
vec![]
}
}
}
}