use crate::{
behaviour::EpisubNetworkBehaviourAction, config::Config, error::FormatError,
handler::EpisubHandler, rpc, EpisubEvent,
};
use futures::Future;
use libp2p_core::PeerId;
use libp2p_swarm::{
dial_opts::{DialOpts, PeerCondition},
NotifyHandler,
};
use multiaddr::Multiaddr;
use rand::prelude::IteratorRandom;
use std::{
collections::{HashSet, VecDeque},
pin::Pin,
task::{Context, Poll},
time::Instant,
};
use tracing::{debug, trace};
/// State of the HyParView partial-view membership protocol for one topic.
///
/// Maintains a small *active* view (peers we keep live connections to) and a
/// larger *passive* view (known fallback peers), and queues behaviour-level
/// actions to be drained by the swarm via the `Future` impl below.
pub struct HyParView {
  /// Protocol parameters (view sizes, walk lengths, timer intervals).
  config: Config,
  /// The gossip topic this overlay instance belongs to.
  topic: String,
  /// This node's own identity and dialable addresses.
  local_node: AddressablePeer,
  /// Active view: peers treated as live neighbors.
  active: HashSet<AddressablePeer>,
  /// Passive view: known peers kept as candidates to refill the active view.
  passive: HashSet<AddressablePeer>,
  /// Last time the periodic maintenance tick ran.
  last_tick: Instant,
  /// Last time a shuffle was initiated with a random active peer.
  last_shuffle: Instant,
  /// Outbound actions (dials, handler notifications, events) awaiting poll.
  out_events: VecDeque<EpisubNetworkBehaviourAction>,
}
impl HyParView {
  /// Creates a new, empty overlay for `topic`. Both views start out empty
  /// and the shuffle/tick timers are initialized to the current instant.
  pub fn new(topic: String, config: Config, local: AddressablePeer) -> Self {
    let now = Instant::now();
    Self {
      config,
      topic,
      local_node: local,
      active: HashSet::new(),
      passive: HashSet::new(),
      last_tick: now,
      last_shuffle: now,
      out_events: VecDeque::new(),
    }
  }

  /// Borrowing iterator over the members of the active view.
  pub fn active(&self) -> impl Iterator<Item = &AddressablePeer> {
    self.active.iter()
  }

  /// Borrowing iterator over the members of the passive view.
  pub fn passive(&self) -> impl Iterator<Item = &AddressablePeer> {
    self.passive.iter()
  }

  /// True when `peer` is currently a member of the active view.
  pub fn is_active(&self, peer: &PeerId) -> bool {
    // Equality/hash on AddressablePeer only consider the peer id, so a
    // probe value with an empty address set suffices for the lookup.
    let probe = AddressablePeer {
      peer_id: *peer,
      addresses: HashSet::new(),
    };
    self.active.contains(&probe)
  }

  /// True while the active view still has room for more neighbors.
  pub fn starved(&self) -> bool {
    self.config.max_active_view_size() > self.active.len()
  }
}
impl HyParView {
  /// Evicts one random member of the ACTIVE view into the passive view,
  /// but only when the active view is already full. Used to make room for
  /// a new neighbor (e.g. a high-priority NEIGHBOR request, or a
  /// FORWARDJOIN whose random walk terminated here with TTL 0).
  ///
  /// Fixes two defects in the previous version: the guard was inverted
  /// (it ran only when `starved()`, i.e. when a slot was already free),
  /// and the eviction victim was drawn from the PASSIVE view even though
  /// it was then removed from `active` and demoted to passive.
  fn free_up_active_slot(&mut self) {
    // Only evict when there is no free slot left.
    if !self.starved() {
      let random = self.active.iter().choose(&mut rand::thread_rng()).cloned();
      if let Some(random) = random {
        debug!(
          "Moving peer {} from active view to passive.",
          random.peer_id
        );
        self.active.remove(&random);
        // Tell the evicted peer we are disconnecting but still alive
        // (`alive: true`), so it keeps us in its passive view.
        self.out_events.push_back(
          EpisubNetworkBehaviourAction::NotifyHandler {
            peer_id: random.peer_id,
            handler: NotifyHandler::Any,
            event: rpc::Rpc {
              topic: self.topic.clone(),
              action: Some(rpc::rpc::Action::Disconnect(rpc::Disconnect {
                alive: true,
              })),
            },
          },
        );
        self
          .out_events
          .push_back(EpisubNetworkBehaviourAction::GenerateEvent(
            EpisubEvent::PeerRemoved(random.peer_id),
          ));
        // The evicted peer remains known as a passive candidate.
        self.add_node_to_passive_view(random);
      }
    }
  }
}
impl HyParView {
  /// Sends a JOIN message to `peer`, advertising this node's identity and
  /// dialable addresses. The TTL is the configured active-walk length and
  /// bounds how far the join is forwarded through the overlay.
  pub fn initiate_join(&mut self, peer: AddressablePeer) {
    self
      .out_events
      .push_back(EpisubNetworkBehaviourAction::NotifyHandler {
        peer_id: peer.peer_id,
        handler: NotifyHandler::Any,
        event: rpc::Rpc {
          topic: self.topic.clone(),
          action: Some(rpc::rpc::Action::Join(rpc::Join {
            ttl: self.config.active_walk_length() as u32,
            peer: self.local_node.clone().into(),
          })),
        },
      });
  }

  /// Handles a JOIN received from `sender` on behalf of `peer`.
  ///
  /// The joining peer goes into the active view if there is room, otherwise
  /// into the passive view; the join is then re-broadcast as FORWARDJOIN to
  /// every other active peer, carrying `ttl` through unchanged.
  pub fn inject_join(
    &mut self,
    sender: PeerId,
    peer: AddressablePeer,
    ttl: u32,
  ) {
    // Already a neighbor; nothing to do.
    if self.active.contains(&peer) {
      return;
    }
    if self.starved() {
      self.add_node_to_active_view(peer.clone(), true);
    } else {
      self.add_node_to_passive_view(peer.clone());
    }
    // Snapshot the active view before queueing events for its members.
    for active_peer in self.active.iter().collect::<Vec<_>>() {
      // Don't forward the join back to its subject or to its sender.
      if active_peer.peer_id != peer.peer_id && active_peer.peer_id != sender {
        self.out_events.push_back(
          EpisubNetworkBehaviourAction::NotifyHandler {
            peer_id: active_peer.peer_id,
            handler: NotifyHandler::Any,
            event: rpc::Rpc {
              topic: self.topic.clone(),
              action: Some(rpc::rpc::Action::ForwardJoin(rpc::ForwardJoin {
                peer: peer.clone().into(),
                ttl,
              })),
            },
          },
        );
      }
    }
  }

  /// Handles a FORWARDJOIN relayed by `sender` for `peer`.
  ///
  /// When the random walk ends (TTL 0) a slot is freed if necessary so the
  /// peer can be admitted to the active view; otherwise the peer is stored
  /// (active if starved, else passive) and the walk continues to the other
  /// active peers with the TTL decremented.
  pub fn inject_forward_join(
    &mut self,
    peer: AddressablePeer,
    ttl: usize,
    local_node: PeerId,
    sender: PeerId,
  ) {
    if peer.peer_id == local_node {
      // A forward-join describing this very node has looped back.
      debug!("ignoring cyclic forward join from this node");
      return;
    }
    if ttl == 0 && !self.is_active(&peer.peer_id) {
      // Terminal hop of the walk: make room in the active view.
      self.free_up_active_slot();
    }
    if self.starved() {
      self.add_node_to_active_view(peer.clone(), true);
    } else {
      self.add_node_to_passive_view(peer.clone());
    }
    if ttl != 0 {
      for n in &self.active {
        // Skip the relay and the subject of the join itself.
        if n.peer_id == sender || n.peer_id == peer.peer_id {
          continue;
        }
        self.out_events.push_back(
          EpisubNetworkBehaviourAction::NotifyHandler {
            peer_id: n.peer_id,
            handler: NotifyHandler::Any,
            event: rpc::Rpc {
              topic: self.topic.clone(),
              action: Some(rpc::rpc::Action::ForwardJoin(rpc::ForwardJoin {
                peer: peer.clone().into(),
                ttl: (ttl - 1) as u32,
              })),
            },
          },
        );
      }
    }
  }

  /// Handles a NEIGHBOR request from `peer`.
  ///
  /// A starved active view accepts the peer unconditionally. When the view
  /// is full, a high-priority request evicts an existing member to make
  /// room, while a low-priority request is refused with a keep-alive
  /// disconnect (the requester stays known to us as passive).
  pub fn inject_neighbor(&mut self, peer: AddressablePeer, priority: i32) {
    if self.starved() && !self.is_active(&peer.peer_id) {
      self.add_node_to_active_view(peer, false);
    } else {
      if priority == rpc::neighbor::Priority::High.into() {
        self.free_up_active_slot();
        self.add_node_to_active_view(peer, false);
      } else {
        // Refuse politely: `alive` keeps the peer in our passive view.
        self.inject_disconnect(peer.peer_id, true);
      }
    }
  }

  /// Handles a DISCONNECT for `peer`.
  ///
  /// `alive == true` demotes the peer from the active to the passive view
  /// (it is still reachable); `alive == false` forgets it from the passive
  /// view entirely. Emits `PeerRemoved` when the peer actually was an
  /// active neighbor.
  pub fn inject_disconnect(&mut self, peer: PeerId, alive: bool) {
    if alive {
      #[allow(clippy::needless_collect)]
      let selected: Vec<_> = self
        .active()
        .filter(|p| p.peer_id == peer)
        .cloned()
        .collect();
      selected
        .into_iter()
        .for_each(|p| self.add_node_to_passive_view(p));
    } else {
      self.passive.retain(|p| p.peer_id != peer);
    }
    // Membership probe: equality/hash only use the peer id, so an empty
    // address set is sufficient.
    if self.active.contains(&AddressablePeer {
      peer_id: peer,
      addresses: HashSet::new(),
    }) {
      self
        .out_events
        .push_back(EpisubNetworkBehaviourAction::GenerateEvent(
          EpisubEvent::PeerRemoved(peer),
        ));
      self.active.retain(|p| p.peer_id != peer);
      debug!("disconnecting peer {}, from passive: {}", peer, !alive);
    }
  }

  /// Handles a SHUFFLE initiated by `origin` and relayed by `from`.
  ///
  /// Replies to the origin with a random sample of our own views (minus
  /// the nodes it already offered), absorbs previously unknown nodes into
  /// the passive view, and — while TTL remains — forwards the shuffle to
  /// one random active peer other than the relay and the origin. When no
  /// such peer exists, tries to take the origin into the active view.
  pub fn inject_shuffle(
    &mut self,
    from: PeerId,
    ttl: u32,
    nodes: Vec<AddressablePeer>,
    origin: AddressablePeer,
  ) {
    let origin_peer_id = origin.peer_id;
    let mut deduped_passive: HashSet<AddressablePeer> =
      nodes.clone().into_iter().collect();
    self.send_shuffle_reply(
      origin.clone(),
      self
        .active
        .union(&self.passive)
        .cloned()
        .collect::<HashSet<_>>()
        .difference(&deduped_passive)
        .cloned()
        .choose_multiple(
          &mut rand::thread_rng(),
          self.config.shuffle_max_size(),
        ),
    );
    // Keep only nodes we don't already know about (and not ourselves).
    deduped_passive.retain(|n| {
      !self.active.contains(n)
        && !self.passive.contains(n)
        && n.peer_id != self.local_node.peer_id
    });
    deduped_passive.into_iter().for_each(|p| {
      self.add_node_to_passive_view(p);
    });
    if ttl != 0 {
      if let Some(random) = self
        .active
        .iter()
        .filter(|n| n.peer_id != from && origin_peer_id != n.peer_id)
        .choose(&mut rand::thread_rng())
      {
        self.out_events.push_back(
          EpisubNetworkBehaviourAction::NotifyHandler {
            peer_id: random.peer_id,
            handler: NotifyHandler::Any,
            event: rpc::Rpc {
              topic: self.topic.clone(),
              action: Some(rpc::rpc::Action::Shuffle(rpc::Shuffle {
                origin: origin.into(),
                nodes: nodes.into_iter().map(|n| n.into()).collect(),
                ttl: ttl - 1,
              })),
            },
          },
        );
      } else {
        // Nobody to forward to; try to neighbor with the origin directly.
        self.add_node_to_active_view(origin, true);
      }
    }
  }

  /// Handles a SHUFFLEREPLY: merges the offered nodes into the passive
  /// view, then evicts random entries one at a time until the configured
  /// maximum passive-view size is respected.
  pub fn inject_shuffle_reply(&mut self, params: rpc::ShuffleReply) {
    let nodes = params
      .nodes
      .into_iter()
      .filter_map(|n| n.try_into().ok())
      .collect();
    self.passive = self.passive.union(&nodes).cloned().collect();
    // Evict exactly one entry per iteration. The previous
    // `self.passive.drain().next();` was a bug: dropping the partially
    // consumed `Drain` iterator removes ALL remaining elements, emptying
    // the entire passive view whenever it exceeded the limit.
    while self.passive.len() > self.config.max_passive_view_size() {
      if let Some(excess) =
        self.passive.iter().choose(&mut rand::thread_rng()).cloned()
      {
        self.passive.remove(&excess);
      }
    }
  }
}
impl HyParView {
  /// Inserts `node` into the active view (our own id is ignored).
  ///
  /// For a newly inserted peer: when this node is the `initiator` it also
  /// queues a dial to the peer and a NEIGHBOR request; in all cases a
  /// `PeerAdded` event is emitted. Duplicates are silently ignored.
  fn add_node_to_active_view(
    &mut self,
    node: AddressablePeer,
    initiator: bool,
  ) {
    if node.peer_id == self.local_node.peer_id {
      return;
    }
    // `insert` returns false for an already-present peer, so everything
    // below runs only the first time this peer enters the active view.
    if self.active.insert(node.clone()) {
      debug!("Adding peer to active view: {:?}", node);
      if initiator {
        // Only dial when not already connected.
        self
          .out_events
          .push_back(EpisubNetworkBehaviourAction::Dial {
            opts: DialOpts::peer_id(node.peer_id)
              .addresses(node.addresses.into_iter().collect())
              .condition(PeerCondition::Disconnected)
              .build(),
            handler: EpisubHandler::new(self.config.max_transmit_size, false),
          });
        self.out_events.push_back(
          EpisubNetworkBehaviourAction::NotifyHandler {
            peer_id: node.peer_id,
            handler: NotifyHandler::Any,
            event: rpc::Rpc {
              topic: self.topic.clone(),
              action: Some(rpc::rpc::Action::Neighbor(rpc::Neighbor {
                peer: self.local_node.clone().into(),
                // NOTE(review): `self.active.is_empty()` can never be true
                // here — the insert above just succeeded, so the view has
                // at least one member and the High-priority arm is dead
                // code. Presumably the intent was to test whether the view
                // was empty BEFORE the insert (i.e. `self.active.len() ==
                // 1`); confirm against the HyParView spec before changing.
                priority: match self.active.is_empty() {
                  true => rpc::neighbor::Priority::High.into(),
                  false => rpc::neighbor::Priority::Low.into(),
                },
              })),
            },
          },
        );
      }
      self
        .out_events
        .push_back(EpisubNetworkBehaviourAction::GenerateEvent(
          EpisubEvent::PeerAdded(node.peer_id),
        ));
    }
  }

  /// Inserts `node` into the passive view (our own id is ignored). When
  /// the view overflows its configured size, one random entry is evicted
  /// — note this may be the node that was just added.
  fn add_node_to_passive_view(&mut self, node: AddressablePeer) {
    if node.peer_id != self.local_node.peer_id {
      trace!("Adding peer to passive view: {:?}", node);
      self.passive.insert(node);
      if self.passive.len() > self.config.max_passive_view_size() {
        if let Some(random) =
          self.passive().choose(&mut rand::thread_rng()).cloned()
        {
          self.passive.remove(&random);
        }
      }
    }
  }

  /// Sends a SHUFFLE to `peer`, offering a random sample (up to
  /// `shuffle_max_size`) drawn from both views, with this node as origin
  /// and a TTL of the configured active-walk length.
  fn send_shuffle(&mut self, peer: PeerId) {
    self
      .out_events
      .push_back(EpisubNetworkBehaviourAction::NotifyHandler {
        peer_id: peer,
        handler: NotifyHandler::Any,
        event: rpc::Rpc {
          topic: self.topic.clone(),
          action: Some(rpc::rpc::Action::Shuffle(rpc::Shuffle {
            ttl: self.config.active_walk_length() as u32,
            origin: self.local_node.clone().into(),
            nodes: self
              .active()
              .chain(self.passive.iter())
              .choose_multiple(
                &mut rand::thread_rng(),
                self.config.shuffle_max_size(),
              )
              .iter()
              .cloned()
              .map(|a| a.into())
              .collect(),
          })),
        },
      });
  }

  /// Sends a SHUFFLEREPLY carrying `nodes` back to `origin`, dialing the
  /// origin first when it is not an active neighbor.
  fn send_shuffle_reply(
    &mut self,
    origin: AddressablePeer,
    nodes: Vec<AddressablePeer>,
  ) {
    if !self.is_active(&origin.peer_id) {
      // NOTE(review): the `true` flag to EpisubHandler::new differs from
      // the `false` used when dialing a new active neighbor — presumably
      // it marks this as a short-lived connection; confirm against the
      // handler's constructor.
      self
        .out_events
        .push_back(EpisubNetworkBehaviourAction::Dial {
          opts: DialOpts::peer_id(origin.peer_id)
            .addresses(origin.addresses.into_iter().collect())
            .condition(PeerCondition::Disconnected)
            .build(),
          handler: EpisubHandler::new(self.config.max_transmit_size, true),
        });
    }
    self
      .out_events
      .push_back(EpisubNetworkBehaviourAction::NotifyHandler {
        peer_id: origin.peer_id,
        handler: NotifyHandler::Any,
        event: rpc::Rpc {
          topic: self.topic.clone(),
          action: Some(rpc::rpc::Action::ShuffleReply(rpc::ShuffleReply {
            nodes: nodes.into_iter().map(|n| n.into()).collect(),
          })),
        },
      });
  }

  /// While the active view has room, promotes one random passive peer to
  /// the active view (which dials it and sends a NEIGHBOR request).
  fn maybe_move_random_passive_to_active(&mut self) {
    if self.starved() {
      let random = self.passive.iter().choose(&mut rand::thread_rng()).cloned();
      if let Some(random) = random {
        self.passive.remove(&random);
        trace!("removing peer {:?} from passive view", random);
        self.add_node_to_active_view(random, true);
      }
    }
  }
}
impl Future for HyParView {
  type Output = EpisubNetworkBehaviourAction;

  /// Drives periodic maintenance and drains queued actions one at a time.
  ///
  /// Initiates a shuffle with a random active peer once per shuffle
  /// interval, refills a starved active view from the passive view once
  /// per tick, then yields the next pending behaviour action if any.
  /// NOTE(review): returns `Pending` without registering a waker —
  /// presumably the enclosing behaviour polls this on every swarm tick;
  /// confirm before reusing in another context.
  fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
    if self.last_shuffle.elapsed() > self.config.shuffle_interval {
      let target = self.active().choose(&mut rand::thread_rng()).cloned();
      if let Some(target) = target {
        self.send_shuffle(target.peer_id);
        self.last_shuffle = Instant::now();
      }
    }

    if self.last_tick.elapsed() > self.config.tick_frequency {
      self.maybe_move_random_passive_to_active();
      self.last_tick = Instant::now();
    }

    match self.out_events.pop_front() {
      Some(event) => Poll::Ready(event),
      None => Poll::Pending,
    }
  }
}
/// A peer identity together with the multiaddresses it can be dialed on.
///
/// Equality and hashing consider only `peer_id` (see the `PartialEq` and
/// `Hash` impls below), so two values with the same id but different
/// address sets collapse to a single entry in the view sets.
#[derive(Debug, Clone)]
pub struct AddressablePeer {
  pub peer_id: PeerId,
  pub addresses: HashSet<Multiaddr>,
}
impl PartialEq for AddressablePeer {
fn eq(&self, other: &Self) -> bool {
self.peer_id == other.peer_id
}
}
// `PartialEq` above is a total equivalence on `peer_id`, so `Eq` holds.
impl Eq for AddressablePeer {}
impl std::hash::Hash for AddressablePeer {
  /// Hashes only the peer id, mirroring the `PartialEq` impl so the
  /// `Hash`/`Eq` contract (equal values hash equally) is upheld.
  fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
    std::hash::Hash::hash(&self.peer_id, state);
  }
}
impl TryFrom<rpc::AddressablePeer> for AddressablePeer {
type Error = FormatError;
fn try_from(value: rpc::AddressablePeer) -> Result<Self, Self::Error> {
Ok(Self {
peer_id: PeerId::from_bytes(value.peer_id.as_slice())
.map_err(|_| FormatError::Multihash)?,
addresses: value
.addresses
.into_iter()
.filter_map(|a| Multiaddr::try_from(a).ok())
.collect(),
})
}
}
impl From<AddressablePeer> for rpc::AddressablePeer {
  /// Encodes to the wire representation: peer id as raw bytes, each
  /// multiaddress in its binary form.
  fn from(val: AddressablePeer) -> Self {
    let peer_id = val.peer_id.to_bytes();
    let addresses = val
      .addresses
      .into_iter()
      .map(|addr| addr.to_vec())
      .collect();
    rpc::AddressablePeer { peer_id, addresses }
  }
}
impl From<&AddressablePeer> for rpc::AddressablePeer {
  /// Borrowed variant: clones the peer and defers to the owned conversion.
  fn from(val: &AddressablePeer) -> Self {
    rpc::AddressablePeer::from(val.clone())
  }
}