use super::{
network::{Inner, Packet, NETWORK},
Config, Error, Event, NodeInfo, OurType, Peer,
};
use crossbeam_channel::Sender;
use bytes::Bytes;
use fxhash::{FxHashMap, FxHashSet};
use std::{cell::RefCell, net::SocketAddr, rc::Rc};
use unwrap::unwrap;
/// A single participant of the mock network.
///
/// A node keeps track of its peers, of bootstrap attempts that have not been answered yet and
/// of messages waiting for a connection, and reports what happens to it as `Event`s sent
/// through `event_tx`.
pub(super) struct Node {
/// Shared handle to the mock network this node sends its packets through.
network: Rc<RefCell<Inner>>,
/// Address of this node, generated by the network when the node is created.
addr: SocketAddr,
/// Sending half of the channel on which this node reports its events.
event_tx: Sender<Event>,
/// Configuration the node was created with (node type, hard-coded contacts, ...).
config: Config,
/// Currently connected peers, together with the way each connection was established.
peers: FxHashMap<SocketAddr, ConnectionType>,
/// Nodes this node successfully connected to; used as extra contacts when bootstrapping.
bootstrap_cache: FxHashSet<NodeInfo>,
/// Addresses that were sent a bootstrap request which has not been resolved yet.
pending_bootstraps: FxHashSet<SocketAddr>,
/// Messages queued per destination until the connection to that destination succeeds.
pending_messages: FxHashMap<SocketAddr, Vec<Bytes>>,
}
impl Node {
pub fn new(event_tx: Sender<Event>, config: Config) -> Rc<RefCell<Self>> {
let network = NETWORK.with(|network| {
Rc::clone(unwrap!(
network.borrow().as_ref(),
"Mock Network must exist before creating instances of QuicP2p."
))
});
let addr = network.borrow_mut().gen_addr(config.ip, config.port);
let node = Rc::new(RefCell::new(Node {
network,
addr,
event_tx,
config,
peers: Default::default(),
bootstrap_cache: Default::default(),
pending_bootstraps: Default::default(),
pending_messages: Default::default(),
}));
node.borrow()
.network
.borrow_mut()
.insert_node(addr, Rc::clone(&node));
node
}
/// Starts bootstrapping against every known contact.
///
/// Nothing happens if one of the current peers is already a bootstrap connection. If there
/// are neither hard-coded contacts nor cached ones, `Event::BootstrapFailure` is fired right
/// away. Otherwise every contact (hard-coded ones first, then the bootstrap cache) is
/// recorded as pending and sent a `Packet::BootstrapRequest` carrying our node type.
pub fn bootstrap(&mut self) {
    let already_bootstrapped = self.peers.values().any(|kind| kind.is_bootstrap());
    if already_bootstrapped {
        return;
    }

    let nobody_to_ask =
        self.config.hard_coded_contacts.is_empty() && self.bootstrap_cache.is_empty();
    if nobody_to_ask {
        self.fire_event(Event::BootstrapFailure);
        return;
    }

    let our_type = self.config.our_type;
    let contacts = self
        .config
        .hard_coded_contacts
        .iter()
        .chain(&self.bootstrap_cache);
    for contact in contacts {
        let target = contact.peer_addr;
        let _ = self.pending_bootstraps.insert(target);
        self.network
            .borrow_mut()
            .send(self.addr, target, Packet::BootstrapRequest(our_type));
    }
}
/// Requests a connection to `dst`, unless `dst` is already one of our peers.
///
/// Only the request is sent here; `dst` is not added to the peers by this call.
pub fn connect(&self, dst: SocketAddr) {
    if !self.peers.contains_key(&dst) {
        self.send_connect_request(dst);
    }
}
/// Drops the connection to `dst`.
///
/// If `dst` was one of our peers it is removed and the network is told about the
/// disconnection; otherwise this is a no-op.
pub fn disconnect(&mut self, dst: SocketAddr) {
    let was_peer = self.peers.remove(&dst).is_some();
    if !was_peer {
        return;
    }
    self.network.borrow_mut().disconnect(self.addr, dst);
}
/// Sends `msg` to `dst`.
///
/// If `dst` is already a peer the message goes out immediately. Otherwise a connect request
/// is sent to `dst` and the message is queued until the connection succeeds.
pub fn send(&mut self, dst: SocketAddr, msg: Bytes) {
    if !self.peers.contains_key(&dst) {
        // Not connected yet: ask for a connection first and park the message.
        self.send_connect_request(dst);
        self.add_pending_message(dst, msg);
        return;
    }
    self.send_message(dst, msg);
}
/// Processes `packet`, which was delivered to this node from `src`.
///
/// Effects per packet kind:
///
/// * `BootstrapRequest` / `ConnectRequest` - records `src` as a bootstrap / normal peer. If
///   `src` was not a peer before, replies with the matching success packet and fires
///   `Event::ConnectedTo`.
/// * `BootstrapSuccess` - if no bootstrap peer exists yet, records `src` as one, forgets all
///   pending bootstrap attempts and fires `Event::BootstrappedTo`; otherwise replies with
///   `Packet::Disconnect`.
/// * `BootstrapFailure` - ignored while a bootstrap peer exists; otherwise `src` is removed
///   from the pending attempts and `Event::BootstrapFailure` fires once none remain.
/// * `ConnectSuccess` - records `src` as a normal peer; if it was new, adds it to the
///   bootstrap cache, sends the messages queued for it and fires `Event::ConnectedTo`.
/// * `ConnectFailure` - ignored.
/// * `Message` / `MessageFailure` - fires `Event::NewMessage` / `Event::UnsentUserMessage`.
/// * `Disconnect` - removes `src` from the peers and, if it was one, fires
///   `Event::ConnectionFailure`.
pub fn receive_packet(&mut self, src: SocketAddr, packet: Packet) {
    match packet {
        Packet::BootstrapRequest(peer_type) => {
            // Note that `insert` overwrites the stored connection type even for a known
            // peer; only a previously unknown peer gets a reply and an event.
            let previous = self.peers.insert(src, ConnectionType::Bootstrap);
            if previous.is_none() {
                self.network
                    .borrow_mut()
                    .send(self.addr, src, Packet::BootstrapSuccess);
                let peer = Peer::new(peer_type, src);
                self.fire_event(Event::ConnectedTo { peer });
            }
        }
        Packet::BootstrapSuccess => {
            let already_bootstrapped = self.peers.values().any(|kind| kind.is_bootstrap());
            if already_bootstrapped {
                // One bootstrap peer is enough: turn down every later responder.
                self.network
                    .borrow_mut()
                    .send(self.addr, src, Packet::Disconnect);
            } else {
                let _ = self.peers.insert(src, ConnectionType::Bootstrap);
                self.pending_bootstraps.clear();
                let node = NodeInfo::from(src);
                self.fire_event(Event::BootstrappedTo { node });
            }
        }
        Packet::BootstrapFailure => {
            let already_bootstrapped = self.peers.values().any(|kind| kind.is_bootstrap());
            if already_bootstrapped {
                return;
            }
            let _ = self.pending_bootstraps.remove(&src);
            // Report failure only once no attempt is left outstanding.
            if self.pending_bootstraps.is_empty() {
                self.fire_event(Event::BootstrapFailure);
            }
        }
        Packet::ConnectRequest(peer_type) => {
            let previous = self.peers.insert(src, ConnectionType::Normal);
            if previous.is_none() {
                self.network
                    .borrow_mut()
                    .send(self.addr, src, Packet::ConnectSuccess);
                let peer = Peer::new(peer_type, src);
                self.fire_event(Event::ConnectedTo { peer });
            }
        }
        Packet::ConnectSuccess => {
            let previous = self.peers.insert(src, ConnectionType::Normal);
            if previous.is_none() {
                let _ = self.bootstrap_cache.insert(NodeInfo::from(src));
                self.send_pending_messages(src);
                let peer = Peer::node(src);
                self.fire_event(Event::ConnectedTo { peer });
            }
        }
        Packet::ConnectFailure => {
            // Nothing is done here: no event is fired and messages queued for `src` stay in
            // `pending_messages`.
            // NOTE(review): presumably intentional for the mock - confirm.
        }
        Packet::Message(msg) => {
            self.fire_event(Event::NewMessage {
                peer_addr: src,
                msg,
            });
        }
        Packet::MessageFailure(msg) => {
            self.fire_event(Event::UnsentUserMessage {
                peer_addr: src,
                msg,
            });
        }
        Packet::Disconnect => {
            let was_peer = self.peers.remove(&src).is_some();
            if was_peer {
                self.fire_event(Event::ConnectionFailure {
                    peer_addr: src,
                    err: Error,
                });
            }
        }
    }
}
pub fn our_connection_info(&self) -> Result<NodeInfo, Error> {
match self.config.our_type {
OurType::Client => Err(Error),
OurType::Node => Ok(NodeInfo::from(self.addr)),
}
}
/// Returns a copy of the bootstrap cache contents, in the set's iteration order.
pub fn bootstrap_cache(&self) -> Vec<NodeInfo> {
    let mut cached = Vec::with_capacity(self.bootstrap_cache.len());
    cached.extend(self.bootstrap_cache.iter().cloned());
    cached
}
/// Returns `true` if `addr` is currently one of our peers, whatever the connection type.
pub fn is_connected(&self, addr: &SocketAddr) -> bool {
    self.peers.contains_key(addr)
}
/// Sends `event` on the event channel.
///
/// The outcome of the send is deliberately discarded: a failure to deliver the event
/// (presumably because the receiving side is gone) is not an error for the node.
fn fire_event(&self, event: Event) {
    match self.event_tx.send(event) {
        Ok(()) | Err(_) => (),
    }
}
/// Sends a `Packet::ConnectRequest` carrying our node type from this node to `dst`.
fn send_connect_request(&self, dst: SocketAddr) {
    let request = Packet::ConnectRequest(self.config.our_type);
    self.network.borrow_mut().send(self.addr, dst, request);
}
/// Wraps `msg` in a `Packet::Message` and sends it from this node to `dst`.
fn send_message(&self, dst: SocketAddr, msg: Bytes) {
    let packet = Packet::Message(msg);
    self.network.borrow_mut().send(self.addr, dst, packet);
}
/// Appends `msg` to the queue of messages waiting for a connection to `addr`, creating the
/// queue if this is the first message for that address.
fn add_pending_message(&mut self, addr: SocketAddr, msg: Bytes) {
    let queue = self.pending_messages.entry(addr).or_insert_with(Vec::new);
    queue.push(msg);
}
/// Sends every message queued for `addr`, in the order they were queued, and forgets the
/// queue. Does nothing if no messages are waiting for `addr`.
fn send_pending_messages(&mut self, addr: SocketAddr) {
    if let Some(queued) = self.pending_messages.remove(&addr) {
        for msg in queued {
            self.send_message(addr, msg);
        }
    }
}
}
// Accessors that are only compiled into test builds.
#[cfg(test)]
impl Node {
/// Returns the address of this node.
pub fn addr(&self) -> &SocketAddr {
&self.addr
}
/// Returns the node type taken from this node's configuration.
pub fn our_type(&self) -> OurType {
self.config.our_type
}
}
impl Drop for Node {
    /// Tears the node down: tells the network about the disconnection from every remaining
    /// peer, removes the node from the network and finally fires `Event::Finish`.
    fn drop(&mut self) {
        let own_addr = self.addr;
        for (peer_addr, _) in self.peers.drain() {
            self.network.borrow_mut().disconnect(own_addr, peer_addr);
        }
        self.network.borrow_mut().remove_node(&own_addr);
        self.fire_event(Event::Finish);
    }
}
/// The way a connection to a peer was established.
#[derive(Clone, Copy, Eq, PartialEq)]
enum ConnectionType {
    /// Established through a connect request.
    Normal,
    /// Established through a bootstrap request.
    Bootstrap,
}

impl ConnectionType {
    /// Returns `true` for `ConnectionType::Bootstrap` and `false` for
    /// `ConnectionType::Normal`.
    fn is_bootstrap(self) -> bool {
        self == ConnectionType::Bootstrap
    }
}