use super::{delivery_group, Core};
use crate::{
error::Result,
messages::RoutingMsgUtils,
network::NetworkUtils,
node::Node,
peer::PeerUtils,
routing::{command::Command, enduser_registry::SocketId},
section::{NodeStateUtils, SectionAuthorityProviderUtils, SectionUtils},
Error, Event,
};
use bytes::Bytes;
use secured_linked_list::SecuredLinkedList;
use sn_messaging::{
node::{Network, NodeState, Peer, Proposal, RoutingMsg, Section, Variant},
section_info::Error as TargetSectionError,
DestInfo, EndUser, Itinerary, SectionAuthorityProvider, SrcLocation,
};
use std::net::SocketAddr;
use tokio::sync::mpsc;
use xor_name::{Prefix, XorName};
impl Core {
/// Builds a `Core` for a node that starts out with a section of its own.
///
/// The section and this node's section key share are obtained from
/// `Section::first_node`, called with the node's peer info, and handed to
/// `Self::new` together with `node` and `event_tx`.
///
/// # Errors
///
/// Propagates any error returned by `Section::first_node`.
pub fn first_node(node: Node, event_tx: mpsc::Sender<Event>) -> Result<Self> {
    let peer = node.peer();
    let (section, key_share) = Section::first_node(peer)?;
    let core = Self::new(node, section, Some(key_share), event_tx);
    Ok(core)
}
/// Looks up the end user recorded for the socket address `sender`.
///
/// Delegates to the `end_users` registry and returns its answer unchanged.
pub fn get_enduser_by_addr(&self, sender: &SocketAddr) -> Option<&EndUser> {
self.end_users.get_enduser_by_addr(sender)
}
/// Looks up the socket address recorded for the socket id `id`.
///
/// Delegates to the `end_users` registry and returns its answer unchanged.
pub fn get_socket_addr(&self, id: SocketId) -> Option<&SocketAddr> {
self.end_users.get_socket_addr(id)
}
/// Tries to register `sender` in the `end_users` registry.
///
/// The prefix of our current section is passed along to the registry's
/// `try_add`; its result is returned as-is.
///
/// # Errors
///
/// Returns whatever error the registry's `try_add` reports.
pub fn try_add(&mut self, sender: SocketAddr) -> Result<EndUser> {
    self.end_users.try_add(sender, self.section.prefix())
}
/// Returns a reference to this core's `Node`.
pub fn node(&self) -> &Node {
&self.node
}
/// Returns a reference to the `Section` held by this core.
pub fn section(&self) -> &Section {
&self.section
}
/// Returns the key chain (`SecuredLinkedList`) of the section held by this core.
pub fn section_chain(&self) -> &SecuredLinkedList {
self.section.chain()
}
/// Returns a reference to the `Network` held by this core.
pub fn network(&self) -> &Network {
&self.network
}
/// Reports whether our section lists this node's name as an elder.
pub fn is_elder(&self) -> bool {
    let our_name = self.node.name();
    self.section.is_elder(&our_name)
}
/// Produces a BLS signature share over `data`.
///
/// Delegates to `section_keys_provider.sign_with`, forwarding `data` and
/// `public_key` unchanged.
///
/// # Errors
///
/// Returns whatever error `sign_with` reports.
// NOTE(review): presumably `public_key` selects which held key share is used
// and signing fails when no matching share exists — confirm against
// the keys provider.
pub fn sign_with_section_key_share(
&self,
data: &[u8],
public_key: &bls::PublicKey,
) -> Result<bls::SignatureShare> {
self.section_keys_provider.sign_with(data, public_key)
}
/// Returns a clone of the public key set stored in our current key share.
///
/// # Errors
///
/// Propagates the error from `section_keys_provider.key_share()` when no
/// key share can be obtained.
pub fn public_key_set(&self) -> Result<bls::PublicKeySet> {
    let key_share = self.section_keys_provider.key_share()?;
    Ok(key_share.public_key_set.clone())
}
/// Returns a public key to use for the section identified by `prefix`.
///
/// Resolution order:
/// 1. If `prefix` is our own prefix or an extension of it, the last key of
///    our section chain.
/// 2. Otherwise, whatever `network.key_by_prefix` knows for `prefix`.
/// 3. Otherwise, if we are an elder, the root key of our section chain.
/// 4. Otherwise `None`.
pub fn section_key(&self, prefix: &Prefix) -> Option<bls::PublicKey> {
    let our_prefix = self.section.prefix();
    if prefix == our_prefix || prefix.is_extension_of(our_prefix) {
        return Some(*self.section.chain().last_key());
    }

    match self.network.key_by_prefix(prefix) {
        known @ Some(_) => known,
        // NOTE(review): presumably only an elder's chain is guaranteed to
        // start at the network's first key — confirm.
        None if self.is_elder() => Some(*self.section.chain().root_key()),
        None => None,
    }
}
/// Returns the authority provider of the section responsible for `name`.
///
/// When our own section prefix matches `name`, a clone of our section's
/// authority provider is returned; otherwise the lookup is delegated to
/// `network.section_by_name`.
///
/// # Errors
///
/// Returns whatever error `network.section_by_name` reports.
pub fn matching_section(&self, name: &XorName) -> Result<SectionAuthorityProvider> {
    if !self.section.prefix().matches(name) {
        return self.network.section_by_name(name);
    }
    Ok(self.section.authority_provider().clone())
}
/// Returns the `index` field of our current section key share.
///
/// # Errors
///
/// Propagates the error from `section_keys_provider.key_share()` when no
/// key share can be obtained.
pub fn our_index(&self) -> Result<usize> {
    let key_share = self.section_keys_provider.key_share()?;
    Ok(key_share.index)
}
/// Sends `event` on the event channel.
///
/// A failed send is not surfaced to the caller; it is only logged at
/// error level.
pub async fn send_event(&self, event: Event) {
    let outcome = self.event_tx.clone().send(event).await;
    if outcome.is_err() {
        error!("Event receiver has been closed");
    }
}
pub async fn relay_message(&self, msg: &RoutingMsg) -> Result<Option<Command>> {
let (presumed_targets, dg_size) = delivery_group::delivery_targets(
&msg.dst,
&self.node.name(),
&self.section,
&self.network,
)?;
let target_name = msg.dst.name().ok_or(Error::CannotRoute)?;
let dest_pk = self.section_key_by_name(&target_name);
let mut targets = vec![];
for peer in presumed_targets {
if self
.msg_filter
.filter_outgoing(msg, peer.name())
.await
.is_new()
{
let _ = targets.push((*peer.name(), *peer.addr()));
}
}
if targets.is_empty() {
return Ok(None);
}
trace!(
"relay {:?} to first {:?} of {:?} (Section PK: {:?})",
msg,
dg_size,
targets,
msg.section_pk,
);
let command = Command::send_message_to_nodes(
targets,
dg_size,
msg.clone(),
DestInfo {
dest: XorName::random(),
dest_section_pk: dest_pk,
},
);
Ok(Some(command))
}
/// Checks `bls_pk` against the state of our section chain.
///
/// Returns `Ok(())` only when `bls_pk` is the last key of our section chain
/// and `promote_and_demote_elders` yields no elder candidates.
///
/// # Errors
///
/// - `TargetSectionError::DkgInProgress` when `promote_and_demote_elders`
///   returns a non-empty set of candidates, or when `bls_pk` is outdated and
///   our public key set cannot be obtained.
/// - `TargetSectionError::UnrecognizedSectionKey` when our chain does not
///   contain `bls_pk`.
/// - `TargetSectionError::TargetSectionInfoOutdated` when `bls_pk` is in the
///   chain but is not its last key. The payload is built from our prefix,
///   our public key set and the elders of our signed authority provider.
#[allow(unused)]
pub fn check_key_status(&self, bls_pk: &bls::PublicKey) -> Result<(), TargetSectionError> {
    let elders_candidates = self.section.promote_and_demote_elders(&self.node.name());
    if !elders_candidates.is_empty() {
        trace!("Non empty elder candidates {:?}", elders_candidates);
        trace!(
            "Current authority_provider {:?}",
            self.section.authority_provider()
        );
        return Err(TargetSectionError::DkgInProgress);
    }

    let chain = self.section.chain();
    if !chain.has_key(bls_pk) {
        return Err(TargetSectionError::UnrecognizedSectionKey);
    }
    if bls_pk == chain.last_key() {
        return Ok(());
    }

    // The key is known but stale: report our current section info if we can
    // produce it.
    match self.public_key_set() {
        Ok(public_key_set) => {
            let current = SectionAuthorityProvider {
                prefix: *self.section.prefix(),
                public_key_set,
                elders: self
                    .section
                    .section_signed_authority_provider()
                    .value
                    .elders(),
            };
            Err(TargetSectionError::TargetSectionInfoOutdated(current))
        }
        Err(_) => Err(TargetSectionError::DkgInProgress),
    }
}
/// Builds the commands needed to send the user message `content` along
/// `itinerary`.
///
/// The message is constructed in one of three ways, checked in this order:
/// - `itinerary.aggregate_at_dst()`: a message built by
///   `RoutingMsg::for_dst_accumulation` from our key share and a clone of
///   our section chain;
/// - `itinerary.aggregate_at_src()`: no message is sent directly; a proposal
///   is created and the commands from `propose` are returned immediately;
/// - otherwise: a message built by `RoutingMsg::single_src` from this node.
///
/// For the first and last case the result holds a `Command::HandleMessage`
/// when the destination contains this node, followed by the relay command
/// from `relay_message`, if there is one.
///
/// # Errors
///
/// - `Error::InvalidSrcLocation` when `itinerary.src` equals neither our
///   node name nor our section prefix name, or when it is an end user.
/// - `Error::InvalidDstLocation` when the itinerary has no destination name.
/// - Any error propagated from obtaining the key share, building the
///   message or proposal, proposing, or relaying.
pub async fn send_user_message(
    &self,
    itinerary: Itinerary,
    content: Bytes,
) -> Result<Vec<Command>> {
    let src_is_our_node = itinerary.src.equals(&self.node.name());
    let src_is_us = src_is_our_node || itinerary.src.equals(&self.section().prefix().name());
    if !src_is_us {
        error!(
            "Not sending user message {:?} -> {:?}: we are not the source location",
            itinerary.src, itinerary.dst
        );
        return Err(Error::InvalidSrcLocation);
    }

    if matches!(itinerary.src, SrcLocation::EndUser(_)) {
        return Err(Error::InvalidSrcLocation);
    }

    let dst_name = match itinerary.dst_name() {
        Some(name) => name,
        None => {
            trace!(
                "Not sending user message {:?} -> {:?}: direct dst not supported",
                itinerary.src,
                itinerary.dst
            );
            return Err(Error::InvalidDstLocation);
        }
    };

    let dest_section_pk = self.section_key_by_name(&dst_name);
    let variant = Variant::UserMessage(content.to_vec());

    // Destination-side aggregation is checked before source-side aggregation.
    let msg = if itinerary.aggregate_at_dst() {
        let key_share = self.section_keys_provider.key_share()?;
        RoutingMsg::for_dst_accumulation(
            key_share,
            itinerary.src.name(),
            itinerary.dst,
            variant,
            self.section.chain().clone(),
        )?
    } else if itinerary.aggregate_at_src() {
        // Source-side aggregation goes through a proposal instead of a
        // directly sent message.
        let proposal = self.create_aggregate_at_src_proposal(itinerary.dst, variant, None)?;
        return self.propose(proposal);
    } else {
        let section_key = self.section.authority_provider().section_key();
        RoutingMsg::single_src(&self.node, itinerary.dst, variant, section_key)?
    };

    let mut commands = Vec::new();

    // When the destination contains us, handle the message locally too.
    let addressed_to_us = itinerary
        .dst
        .contains(&self.node.name(), self.section.prefix());
    if addressed_to_us {
        let dest_info = DestInfo {
            dest: dst_name,
            dest_section_pk,
        };
        commands.push(Command::HandleMessage {
            sender: Some(self.node.addr),
            message: msg.clone(),
            dest_info,
        });
    }

    let relay = self.relay_message(&msg).await?;
    commands.extend(relay);

    Ok(commands)
}
/// Proposes changing the section's `joins_allowed` flag to `joins_allowed`.
///
/// Nothing is proposed, and an empty command list is returned, when we are
/// not an elder or when the requested value equals the current one.
///
/// # Errors
///
/// Propagates any error returned by `propose`.
pub fn set_joins_allowed(&self, joins_allowed: bool) -> Result<Vec<Command>> {
    if !self.is_elder() || joins_allowed == self.joins_allowed {
        return Ok(Vec::new());
    }
    self.propose(Proposal::JoinsAllowed(joins_allowed))
}
/// Proposes `peer` as online in our section.
///
/// Wraps `peer` in `NodeState::joined` and submits a `Proposal::Online`
/// carrying it together with `previous_name` and `destination_key`.
///
/// # Errors
///
/// Propagates any error returned by `propose`.
pub async fn make_online_proposal(
    &self,
    peer: Peer,
    previous_name: Option<XorName>,
    destination_key: Option<bls::PublicKey>,
) -> Result<Vec<Command>> {
    let node_state = NodeState::joined(peer);
    let proposal = Proposal::Online {
        node_state,
        previous_name,
        destination_key,
    };
    self.propose(proposal)
}
}