use super::{delivery_group, Comm, Node};
use crate::node::{
api::cmds::Cmd,
error::{Error, Result},
Event,
};
use crate::UsedSpace;
use sn_interface::messaging::{
system::{JoinResponse, MembershipState, NodeState, SigShare, SystemMsg},
WireMsg,
};
use sn_interface::network_knowledge::{
NetworkKnowledge, NodeInfo, SectionAuthorityProvider, SectionKeyShare,
};
use sn_interface::types::{log_markers::LogMarker, Peer};
use secured_linked_list::SecuredLinkedList;
use std::{collections::BTreeSet, net::SocketAddr, path::PathBuf};
use tokio::sync::mpsc;
use xor_name::XorName;
impl Node {
/// Builds the `Node` for the first node of a network.
///
/// The address of `node` is overwritten with the connection info reported by `comm`.
/// The initial network knowledge and this node's section key share are then obtained
/// from [`NetworkKnowledge::first_node`] using `genesis_sk_set`, and everything is
/// handed over to `Self::new`.
///
/// # Errors
///
/// Propagates any error from [`NetworkKnowledge::first_node`] or from `Self::new`.
pub(crate) async fn first_node(
    comm: Comm,
    mut node: NodeInfo,
    event_tx: mpsc::Sender<Event>,
    used_space: UsedSpace,
    root_storage_dir: PathBuf,
    genesis_sk_set: bls::SecretKeySet,
) -> Result<Self> {
    // Use the address reported by `comm` as this node's address.
    node.addr = comm.our_connection_info();

    let genesis_peer = node.peer();
    let (network_knowledge, key_share) =
        NetworkKnowledge::first_node(genesis_peer, genesis_sk_set).await?;

    Self::new(
        comm,
        node,
        network_knowledge,
        Some(key_share),
        event_tx,
        used_space,
        root_storage_dir,
    )
    .await
}
/// Moves this node over to `new_section`, replacing the stored node info with `new_node`.
///
/// The network knowledge is updated first through `relocated_to`. Only once that has
/// succeeded is the stored node info overwritten with `new_node`, whose address is set
/// to the connection info reported by `comm`.
///
/// # Errors
///
/// Propagates the error from `relocated_to`; in that case the stored node info is not
/// modified.
pub(crate) async fn relocate(
    &self,
    mut new_node: NodeInfo,
    new_section: NetworkKnowledge,
) -> Result<()> {
    self.network_knowledge.relocated_to(new_section).await?;

    new_node.addr = self.comm.our_connection_info();
    *self.info.write().await = new_node;

    Ok(())
}
/// Returns a shared reference to this node's `NetworkKnowledge`.
pub(crate) fn network_knowledge(&self) -> &NetworkKnowledge {
&self.network_knowledge
}
/// Returns the section chain as reported by this node's network knowledge.
pub(crate) async fn section_chain(&self) -> SecuredLinkedList {
self.network_knowledge.section_chain().await
}
/// Returns `true` if the network knowledge reports our current name as an elder.
pub(crate) async fn is_elder(&self) -> bool {
    let our_name = self.info.read().await.name();
    self.network_knowledge.is_elder(&our_name).await
}
/// Returns the negation of [`Self::is_elder`].
pub(crate) async fn is_not_elder(&self) -> bool {
!self.is_elder().await
}
/// Returns the socket address that `comm` reports as our connection info.
pub(crate) fn our_connection_info(&self) -> SocketAddr {
self.comm.our_connection_info()
}
/// Asks the section keys provider to sign `data` for `public_key`.
///
/// On success returns the pair produced by the provider's `sign_with`: a `usize`
/// (used by callers in this file as the share index) and the BLS signature share.
///
/// # Errors
///
/// Any error from the provider is converted into this crate's `Error`.
pub(crate) async fn sign_with_section_key_share(
    &self,
    data: &[u8],
    public_key: &bls::PublicKey,
) -> Result<(usize, bls::SignatureShare)> {
    let indexed_share = self
        .section_keys_provider
        .sign_with(data, public_key)
        .await?;
    Ok(indexed_share)
}
/// Returns the `public_key_set` field of our current section key share.
///
/// # Errors
///
/// Fails with whatever error [`Self::key_share`] returns.
pub(crate) async fn public_key_set(&self) -> Result<bls::PublicKeySet> {
    self.key_share()
        .await
        .map(|share| share.public_key_set)
}
/// Looks up, in our network knowledge, the section authority provider for `name`.
///
/// # Errors
///
/// Any error from `section_by_name` is converted into this crate's `Error`.
pub(crate) async fn matching_section(
    &self,
    name: &XorName,
) -> Result<SectionAuthorityProvider> {
    let section_auth = self.network_knowledge.section_by_name(name)?;
    Ok(section_auth)
}
/// Returns our key share for the section key currently held in the network knowledge.
///
/// The current section key is read first and then used to look the share up in the
/// section keys provider.
///
/// # Errors
///
/// Any error from the provider's `key_share` lookup is converted into this crate's
/// `Error`.
pub(crate) async fn key_share(&self) -> Result<SectionKeyShare> {
    let section_key = self.network_knowledge.section_key().await;
    self.section_keys_provider
        .key_share(&section_key)
        .await
        .map_err(Error::from)
}
/// Sends `event` over the node's event channel.
///
/// A failed send is logged and otherwise ignored; nothing is reported to the caller.
pub(crate) async fn send_event(&self, event: Event) {
    // `mpsc::Sender::send` takes `&self`, so the sender is used in place rather than
    // being cloned for every event.
    if self.event_tx.send(event).await.is_err() {
        error!("Event receiver has been closed");
    }
}
/// Forwards a timeout identified by `token` to the DKG voter.
///
/// The voter is given a snapshot (clone) of our node info, the `token`, and the current
/// section key taken from the network knowledge, and its result is returned unchanged.
pub(crate) async fn handle_timeout(&self, token: u64) -> Result<Vec<Cmd>> {
    // Snapshot our info first, then read the section key, matching argument order.
    let our_info = self.info.read().await.clone();
    let section_key = self.network_knowledge().section_key().await;

    self.dkg_voter.handle_timeout(&our_info, token, section_key)
}
/// Works out the delivery group for `wire_msg` and wraps it in a send command.
///
/// Delivery targets are computed from the message's destination location, our name and
/// the network knowledge. On the normal path the destination section key of the message
/// is set to the key found for the destination name, and
/// `Cmd::SendMsgDeliveryGroup` is returned.
///
/// Returns `Ok(None)` (nothing to send) when all of the following hold: we are an
/// elder, there is more than one target, the destination is a node, and our section
/// prefix matches the destination name.
/// NOTE(review): this looks like a guard against relaying within our own section in a
/// loop — confirm the intent.
///
/// # Errors
///
/// Propagates any error from `delivery_group::delivery_targets`.
pub(crate) async fn send_msg_to_nodes(&self, mut wire_msg: WireMsg) -> Result<Option<Cmd>> {
    let dst_location = wire_msg.dst_location();
    let our_name = self.info.read().await.name();
    let (recipients, delivery_group_size) =
        delivery_group::delivery_targets(dst_location, &our_name, &self.network_knowledge)
            .await?;

    let target_name = dst_location.name();

    // The checks are kept in this order so that later ones are only evaluated when the
    // earlier ones hold.
    let drop_msg = self.is_elder().await
        && recipients.len() > 1
        && dst_location.is_to_node()
        && self.network_knowledge.prefix().await.matches(&target_name);
    if drop_msg {
        return Ok(None);
    }

    trace!(
        "relay {:?} to first {:?} of {:?} (Section PK: {:?})",
        wire_msg,
        delivery_group_size,
        recipients,
        wire_msg.src_section_pk(),
    );

    let dst_pk = self.section_key_by_name(&target_name).await;
    wire_msg.set_dst_section_pk(dst_pk);

    Ok(Some(Cmd::SendMsgDeliveryGroup {
        recipients: recipients.into_iter().collect(),
        delivery_group_size,
        wire_msg,
    }))
}
/// Runs [`Self::promote_and_demote_elders_except`] with an empty set of excluded names.
pub(crate) async fn promote_and_demote_elders(&self) -> Result<Vec<Cmd>> {
    let no_exclusions = BTreeSet::new();
    self.promote_and_demote_elders_except(&no_exclusions).await
}
/// Starts DKG for every set of elder candidates the network knowledge proposes.
///
/// The candidates are obtained from the network knowledge's `promote_and_demote_elders`,
/// given our name and `excluded_names`. For each returned set, `send_dkg_start` is
/// called and its commands are appended to the result.
///
/// # Errors
///
/// Stops at, and propagates, the first error returned by `send_dkg_start`.
pub(crate) async fn promote_and_demote_elders_except(
    &self,
    excluded_names: &BTreeSet<XorName>,
) -> Result<Vec<Cmd>> {
    let our_name = self.info.read().await.name();

    debug!("{}", LogMarker::TriggeringPromotionAndDemotion);

    let candidate_sets = self
        .network_knowledge
        .promote_and_demote_elders(&our_name, excluded_names)
        .await;

    let mut dkg_cmds = Vec::new();
    for elder_candidates in candidate_sets {
        dkg_cmds.extend(self.send_dkg_start(elder_candidates).await?);
    }

    Ok(dkg_cmds)
}
/// Sends `peer` our signature share approving it as a joined member.
///
/// A `NodeState` with `MembershipState::Joined` is built for `peer` (carrying
/// `previous_name`), serialized with `bincode`, and signed with our section key share.
/// The share is sent directly to `peer` in a `JoinResponse::ApprovalShare`, together
/// with the signed section authority provider, the section chain and the section's
/// signed members.
///
/// Returns a single-element vector holding the command produced by `send_direct_msg`.
///
/// # Errors
///
/// Propagates failures from obtaining the public key set, serializing the node state,
/// signing with the section key share, and building the direct message.
pub(crate) async fn send_accepted_online_share(
    &self,
    peer: Peer,
    previous_name: Option<XorName>,
) -> Result<Vec<Cmd>> {
    let public_key_set = self.public_key_set().await?;
    let section_key = public_key_set.public_key();

    let node_state = NodeState {
        name: peer.name(),
        addr: peer.addr(),
        state: MembershipState::Joined,
        previous_name,
    };

    // The signature share is computed over the serialized node state.
    let serialized_details = bincode::serialize(&node_state)?;
    let (index, signature_share) = self
        .sign_with_section_key_share(&serialized_details, &section_key)
        .await?;

    let sig_share = SigShare {
        public_key_set,
        index,
        signature_share,
    };

    let members = self
        .network_knowledge
        .section_signed_members()
        .await
        .into_iter()
        .map(|itr| itr.into_authed_msg())
        .collect();

    let signed_sap = self
        .network_knowledge
        .section_signed_authority_provider()
        .await;

    let node_msg = SystemMsg::JoinResponse(Box::new(JoinResponse::ApprovalShare {
        node_state,
        sig_share,
        section_auth: signed_sap.value.to_msg(),
        section_signed: signed_sap.sig,
        section_chain: self.network_knowledge.section_chain().await,
        members,
    }));

    Ok(vec![
        self.send_direct_msg(peer, node_msg, section_key).await?,
    ])
}
}