mod anti_entropy;
mod api;
mod connectivity;
mod delivery_group;
mod messaging;
use super::{command::Command, enduser_registry::EndUserRegistry, split_barrier::SplitBarrier};
use crate::{
dkg::{DkgVoter, ProposalAggregator},
error::Result,
event::{Elders, Event, NodeElderChange},
message_filter::MessageFilter,
messages::RoutingMsgUtils,
network::NetworkUtils,
node::Node,
relocation::RelocateState,
section::{SectionAuthorityProviderUtils, SectionKeyShare, SectionKeysProvider, SectionUtils},
};
use itertools::Itertools;
use resource_proof::ResourceProof;
use secured_linked_list::SecuredLinkedList;
use sn_messaging::node::SignatureAggregator;
use sn_messaging::{
node::{Network, Proposal, RoutingMsg, Section, SectionSigned, Variant},
DestInfo, DstLocation, MessageId, SectionAuthorityProvider,
};
use std::collections::BTreeSet;
use tokio::sync::mpsc;
use xor_name::{Prefix, XorName};
pub const RESOURCE_PROOF_DATA_SIZE: usize = 64;
pub const RESOURCE_PROOF_DIFFICULTY: u8 = 2;
const KEY_CACHE_SIZE: u8 = 5;
pub(crate) struct Core {
node: Node,
section: Section,
network: Network,
section_keys_provider: SectionKeysProvider,
message_aggregator: SignatureAggregator,
proposal_aggregator: ProposalAggregator,
split_barrier: SplitBarrier,
dkg_voter: DkgVoter,
relocate_state: Option<RelocateState>,
msg_filter: MessageFilter,
pub(super) event_tx: mpsc::Sender<Event>,
joins_allowed: bool,
resource_proof: ResourceProof,
end_users: EndUserRegistry,
}
impl Core {
pub fn new(
node: Node,
section: Section,
section_key_share: Option<SectionKeyShare>,
event_tx: mpsc::Sender<Event>,
) -> Self {
let section_keys_provider = SectionKeysProvider::new(KEY_CACHE_SIZE, section_key_share);
Self {
node,
section,
network: Network::new(),
section_keys_provider,
proposal_aggregator: ProposalAggregator::default(),
split_barrier: SplitBarrier::new(),
message_aggregator: SignatureAggregator::default(),
dkg_voter: DkgVoter::default(),
relocate_state: None,
msg_filter: MessageFilter::new(),
event_tx,
joins_allowed: true,
resource_proof: ResourceProof::new(RESOURCE_PROOF_DATA_SIZE, RESOURCE_PROOF_DIFFICULTY),
end_users: EndUserRegistry::new(),
}
}
pub async fn add_to_filter(&mut self, msg_id: &MessageId) -> bool {
self.msg_filter.add_to_filter(msg_id).await
}
async fn check_for_entropy(
&self,
msg: &RoutingMsg,
dest_info: DestInfo,
) -> Result<(Vec<Command>, bool)> {
if !self.is_elder() {
return Ok((vec![], true));
}
let (actions, can_be_executed) =
anti_entropy::process(&self.node, &self.section, msg, dest_info)?;
let mut commands = vec![];
for msg in actions.send {
commands.extend(self.relay_message(&msg).await?);
}
Ok((commands, can_be_executed))
}
pub(crate) fn state_snapshot(&self) -> StateSnapshot {
StateSnapshot {
is_elder: self.is_elder(),
last_key: *self.section.chain().last_key(),
prefix: *self.section.prefix(),
elders: self.section().authority_provider().names(),
}
}
pub(crate) fn update_section_knowledge(
&mut self,
section_auth: SectionSigned<SectionAuthorityProvider>,
section_chain: SecuredLinkedList,
) {
let prefix = section_auth.value.prefix;
if self
.network
.update_section(section_auth, None, §ion_chain)
{
info!("Neighbour section knowledge updated: {:?}", prefix);
} else {
warn!("Neighbour section update failed");
}
}
pub(crate) async fn update_state(&mut self, old: StateSnapshot) -> Result<Vec<Command>> {
let mut commands = vec![];
let new = self.state_snapshot();
self.section_keys_provider
.finalise_dkg(self.section.chain().last_key());
if new.prefix != old.prefix {
info!("Split");
}
if new.last_key != old.last_key {
self.msg_filter.reset().await;
if new.is_elder {
info!(
"Section updated: prefix: ({:b}), key: {:?}, elders: {}",
new.prefix,
new.last_key,
self.section.authority_provider().peers().format(", ")
);
if self.section_keys_provider.has_key_share() {
commands.extend(self.promote_and_demote_elders()?);
commands.extend(self.propose(Proposal::JoinsAllowed(self.joins_allowed))?);
}
self.print_network_stats();
let section_auth = self.section.section_signed_authority_provider();
let variant = Variant::SectionKnowledge {
src_info: (section_auth.clone(), self.section.chain().clone()),
msg: None,
};
for sap in self.network.all() {
let msg = RoutingMsg::single_src(
&self.node,
DstLocation::DirectAndUnrouted,
variant.clone(),
section_auth.value.section_key(),
)?;
let targets: Vec<_> = sap
.elders()
.iter()
.map(|(name, addr)| (*name, *addr))
.collect();
let len = targets.len();
let dest_info = DestInfo {
dest: XorName::random(),
dest_section_pk: sap.section_key(),
};
trace!("Sending updated SectionInfo to all known sections");
commands.push(Command::send_message_to_nodes(targets, len, msg, dest_info));
}
}
if new.is_elder || old.is_elder {
commands.extend(self.send_sync(self.section.clone(), self.network.clone())?);
}
let current: BTreeSet<_> = self.section.authority_provider().names();
let added = current.difference(&old.elders).copied().collect();
let removed = old.elders.difference(¤t).copied().collect();
let remaining = old.elders.intersection(¤t).copied().collect();
let elders = Elders {
prefix: new.prefix,
key: new.last_key,
remaining,
added,
removed,
};
let self_status_change = if !old.is_elder && new.is_elder {
info!("Promoted to elder");
NodeElderChange::Promoted
} else if old.is_elder && !new.is_elder {
info!("Demoted");
self.network = Network::new();
self.section_keys_provider = SectionKeysProvider::new(KEY_CACHE_SIZE, None);
NodeElderChange::Demoted
} else {
NodeElderChange::None
};
let sibling_elders = if new.prefix != old.prefix {
self.network.get(&new.prefix.sibling()).map(|sec_auth| {
let current: BTreeSet<_> = sec_auth.names();
let added = current.difference(&old.elders).copied().collect();
let removed = old.elders.difference(¤t).copied().collect();
let remaining = old.elders.intersection(¤t).copied().collect();
Elders {
prefix: new.prefix.sibling(),
key: sec_auth.section_key(),
remaining,
added,
removed,
}
})
} else {
None
};
let event = if let Some(sibling_elders) = sibling_elders {
Event::SectionSplit {
elders,
sibling_elders,
self_status_change,
}
} else {
Event::EldersChanged {
elders,
self_status_change,
}
};
self.send_event(event).await;
}
if !new.is_elder {
commands.extend(self.return_relocate_promise());
}
Ok(commands)
}
pub(crate) fn section_key_by_name(&self, name: &XorName) -> bls::PublicKey {
if self.section.prefix().matches(name) {
*self.section.chain().last_key()
} else if let Ok(key) = self.network.key_by_name(name) {
key
} else if self.section.prefix().sibling().matches(name) {
*self.section.chain().prev_key()
} else {
*self.section.chain().root_key()
}
}
pub(crate) fn print_network_stats(&self) {
self.network
.network_stats(self.section.authority_provider())
.print()
}
}
pub(crate) struct StateSnapshot {
is_elder: bool,
last_key: bls::PublicKey,
prefix: Prefix,
elders: BTreeSet<XorName>,
}