mod keep_alive;
use super::{
broadcast::Broadcaster,
connection::*,
dispatch::{self, ActorDispatcher, Receive, SendableDispatcher},
server::event::*,
status::Status,
};
use crate::{
bisq::{
constants::{
self, BaseCurrencyNetwork, Capability, CloseConnectionReason, LOCAL_CAPABILITIES,
},
payload::*,
},
prelude::{fut::Either, *},
};
use keep_alive::*;
use std::{
collections::{HashMap, HashSet},
convert::TryInto,
time::{Duration, SystemTime, UNIX_EPOCH},
};
const CONSOLIDATE_CONNECTIONS: Duration = Duration::from_secs(60);
const MAX_CONNECTIONS: usize = 12;
const MIN_CONNECTIONS: usize = MAX_CONNECTIONS / 7 * 10;
struct PeerInfo {
reported_alive_at: SystemTime,
gossiped_capabilities: Option<Vec<Capability>>,
reported_capabilities: Option<Vec<Capability>>,
}
impl From<(NodeAddress, &PeerInfo)> for Peer {
fn from((addr, info): (NodeAddress, &PeerInfo)) -> Peer {
Peer {
node_address: Some(addr),
date: info
.reported_alive_at
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as i64,
supported_capabilities: info
.reported_capabilities
.as_ref()
.or_else(|| info.gossiped_capabilities.as_ref())
.map(|v| v.iter().map(|c| *c as i32).collect())
.unwrap_or_default(),
}
}
}
pub struct Peers<D: SendableDispatcher> {
keep_alive: Addr<KeepAlive>,
broadcaster: Addr<Broadcaster>,
network: BaseCurrencyNetwork,
connections: HashMap<ConnectionId, Addr<Connection>>,
identified_connections: HashMap<ConnectionId, NodeAddress>,
peer_infos: HashMap<NodeAddress, PeerInfo>,
local_addr: Option<NodeAddress>,
dispatcher: D,
proxy_port: Option<u16>,
status: Status,
}
impl<D: SendableDispatcher> Peers<D> {
pub fn start(
network: BaseCurrencyNetwork,
broadcaster: Addr<Broadcaster>,
status: Status,
dispatcher: D,
proxy_port: Option<u16>,
) -> Addr<Self> {
Self {
keep_alive: KeepAlive::start(),
broadcaster,
network,
connections: HashMap::new(),
identified_connections: HashMap::new(),
peer_infos: HashMap::new(),
local_addr: None,
dispatcher,
proxy_port,
status,
}
.start()
}
fn get_dispatcher(&self, addr: Addr<Peers<D>>) -> impl SendableDispatcher {
dispatch::chain(self.dispatcher.clone())
.forward_to(ActorDispatcher::<KeepAlive, Ping>::new(
self.keep_alive.clone(),
))
.forward_to(ActorDispatcher::<Self, GetPeersRequest>::new(addr.clone()))
.forward_to(ActorDispatcher::<Self, CloseConnectionMessage>::new(addr))
}
fn add_connection(
&mut self,
id: ConnectionId,
conn: Addr<Connection>,
addr: Option<NodeAddress>,
) {
info!("Adding {:?} @ {:?}", id, addr);
let for_keep_alive = conn.downgrade();
let for_broadcaster = conn.downgrade();
self.connections.insert(id, conn);
self.status.connection_added(id, addr.clone());
if let Some(addr) = addr.as_ref() {
self.update_peer_info(&addr, SystemTime::now(), None, None);
self.identified_connections.insert(id, addr.clone());
}
arbiter_spawn!(self
.keep_alive
.send(event::ConnectionAdded(id, for_keep_alive)));
arbiter_spawn!(self
.broadcaster
.send(event::ConnectionAdded(id, for_broadcaster)));
}
fn update_peer_info(
&mut self,
addr: &NodeAddress,
reported_alive_at: SystemTime,
gossiped_capabilities: Option<Vec<i32>>,
reported_capabilities: Option<Vec<i32>>,
) {
let gossiped_capabilities = gossiped_capabilities
.map(|c| c.into_iter().filter_map(|i| i.try_into().ok()).collect());
let reported_capabilities = reported_capabilities
.map(|c| c.into_iter().filter_map(|i| i.try_into().ok()).collect());
if let Some(info) = self.peer_infos.get_mut(addr) {
if reported_alive_at > info.reported_alive_at {
info.reported_alive_at = reported_alive_at;
}
if gossiped_capabilities.is_some() {
info.gossiped_capabilities = gossiped_capabilities
}
if reported_capabilities.is_some() {
info.reported_capabilities = reported_capabilities
}
} else {
self.peer_infos.insert(
addr.clone(),
PeerInfo {
reported_alive_at,
gossiped_capabilities,
reported_capabilities,
},
);
}
}
fn consolidate_connections(&mut self, ctx: &mut <Self as Actor>::Context) {
info!("Consolidating peer connections");
let remove_ids: Vec<ConnectionId> = self
.connections
.iter()
.filter_map(|(id, conn)| {
if self.identified_connections.get(&id).is_none() || !conn.connected() {
Some(id)
} else {
None
}
})
.cloned()
.collect();
remove_ids.into_iter().for_each(|id| {
if self.identified_connections.remove(&id).is_none() {
self.drop_connection(&id, CloseConnectionReason::UnknownPeerAddress);
} else {
self.connections.remove(&id);
self.status.connection_removed(&id);
}
});
ctx.spawn(self.update_alive_times().then(|_, peers, _ctx| {
let candidates = peers.new_connection_candidates();
if candidates.len() + peers.identified_connections.len() < MIN_CONNECTIONS * 2 {
Either::A(peers.request_peers())
} else {
Either::B(fut::ok(()))
}
.then(|_, peers, ctx| {
peers.do_consolidate_connections(ctx);
fut::ok(())
})
}));
}
fn drop_connection(&mut self, id: &ConnectionId, reason: CloseConnectionReason) {
self.status.connection_removed(id);
self.identified_connections.remove(id);
if let Some(addr) = self.connections.remove(id) {
if addr.connected() {
arbiter_spawn!(addr.send(Shutdown(reason)));
}
}
}
fn new_connection_candidates(&self) -> HashSet<&NodeAddress> {
let mut candidates: HashSet<&NodeAddress> = self.peer_infos.keys().collect();
self.identified_connections.values().for_each(|v| {
candidates.remove(&v);
});
candidates
}
fn do_consolidate_connections(&mut self, ctx: &mut <Self as Actor>::Context) {
if self.identified_connections.len() < MIN_CONNECTIONS {
self.new_connection_candidates()
.into_iter()
.take(MAX_CONNECTIONS - self.connections.len())
.cloned()
.for_each(|addr| {
ctx.spawn(
fut::wrap_future(
Connection::open(
addr.clone(),
self.network.into(),
self.get_dispatcher(ctx.address()),
self.proxy_port,
)
.map_err(|_| ()),
)
.map(|(id, conn), peers: &mut Self, ctx| {
peers.add_connection(id, conn, Some(addr));
ctx.spawn(peers.request_peers_from(id));
}),
);
});
} else if self.connections.len() > MAX_CONNECTIONS {
let to_drop: Vec<ConnectionId> = self
.connections
.keys()
.take(self.connections.len() - MAX_CONNECTIONS)
.cloned()
.collect();
to_drop.into_iter().for_each(|id| {
self.drop_connection(&id, CloseConnectionReason::TooManyConnectionsOpen)
});
}
}
fn request_peers(&self) -> impl ActorFuture<Item = (), Error = (), Actor = Self> {
let ids: Vec<ConnectionId> = self.connections.keys().cloned().collect();
fut::wrap_stream(stream::iter_ok::<_, ()>(ids.into_iter()))
.and_then(|id, peers: &mut Self, _ctx| peers.request_peers_from(id))
.finish()
}
fn request_peers_from(
&self,
id: ConnectionId,
) -> impl ActorFuture<Item = (), Error = (), Actor = Self> {
if let Some(conn) = self.connections.get(&id) {
let request = GetPeersRequest {
sender_node_address: self.local_addr.clone(),
nonce: gen_nonce(),
supported_capabilities: LOCAL_CAPABILITIES.clone(),
reported_peers: self.peers_to_report(&id),
};
Either::A(
fut::wrap_future(conn.send(Request(request)).flatten())
.map(
move |GetPeersResponse {
reported_peers,
supported_capabilities,
..
},
peers: &mut Peers<D>,
_ctx| {
if let Some(ref addr) = peers
.identified_connections
.get(&id)
.map(NodeAddress::clone)
{
peers.update_peer_info(
&addr,
SystemTime::now(),
None,
Some(supported_capabilities),
)
};
peers.add_to_peer_infos(reported_peers)
},
)
.then(|_, _, _| fut::ok(())),
)
} else {
Either::B(fut::ok(()))
}
}
fn peers_to_report(&self, exclude: &ConnectionId) -> Vec<Peer> {
self.identified_connections
.iter()
.filter_map(|(id, addr)| {
if *id == *exclude {
None
} else {
self.peer_infos
.get(addr)
.map(|info| (addr.clone(), info).into())
}
})
.collect()
}
fn add_to_peer_infos(&mut self, mut reported: Vec<Peer>) {
reported.drain(..).for_each(
|Peer {
node_address,
date,
supported_capabilities,
}| {
if let Some(addr) = node_address {
self.update_peer_info(
&addr,
UNIX_EPOCH + Duration::from_millis(date as u64),
Some(supported_capabilities),
None,
)
};
},
)
}
fn update_alive_times(&self) -> impl ActorFuture<Item = (), Error = (), Actor = Self> {
fut::wrap_future(self.keep_alive.send(ReportLastActive))
.and_then(|alive_times, peers: &mut Self, _| {
alive_times.into_iter().for_each(|(id, last_active)| {
if let Some(ref addr) = peers
.identified_connections
.get(&id)
.map(NodeAddress::clone)
{
peers.status.connection_alive(&id, last_active);
peers.update_peer_info(addr, last_active, None, None)
}
});
fut::ok(())
})
.map_err(|_, _, _| ())
}
}
impl<D: SendableDispatcher> Actor for Peers<D> {
type Context = Context<Peers<D>>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.run_interval(CONSOLIDATE_CONNECTIONS, |peers, ctx| {
peers.consolidate_connections(ctx);
});
}
}
pub struct SeedConnection(pub NodeAddress, pub ConnectionId, pub Addr<Connection>);
impl Message for SeedConnection {
type Result = ();
}
impl<D: SendableDispatcher> Handler<SeedConnection> for super::Peers<D> {
type Result = ();
fn handle(
&mut self,
SeedConnection(addr, id, connection): SeedConnection,
ctx: &mut Self::Context,
) -> Self::Result {
arbiter_spawn!(connection.send(SetDispatcher(self.get_dispatcher(ctx.address()))));
self.add_connection(id, connection, Some(addr));
self.consolidate_connections(ctx);
}
}
impl<D: SendableDispatcher> Handler<Receive<GetPeersRequest>> for Peers<D> {
type Result = ();
fn handle(
&mut self,
Receive(
conn_id,
GetPeersRequest {
nonce,
sender_node_address,
supported_capabilities,
reported_peers,
},
): Receive<GetPeersRequest>,
ctx: &mut Self::Context,
) -> Self::Result {
self.add_to_peer_infos(reported_peers);
if let Some(addr) = sender_node_address {
self.update_peer_info(&addr, SystemTime::now(), None, Some(supported_capabilities));
self.status.connection_identified(&conn_id, &addr);
self.identified_connections.insert(conn_id, addr);
}
if let Some(conn) = self.connections.get(&conn_id).map(Addr::clone) {
ctx.spawn(self.update_alive_times().then(move |_, peers, _| {
let res = GetPeersResponse {
request_nonce: nonce,
reported_peers: peers.peers_to_report(&conn_id),
supported_capabilities: constants::LOCAL_CAPABILITIES.clone(),
};
fut::wrap_future(conn.send(Payload(res)).then(|_| Ok(())))
}));
}
}
}
impl<D: SendableDispatcher> Handler<Receive<CloseConnectionMessage>> for Peers<D> {
type Result = ();
fn handle(
&mut self,
Receive(conn_id, _): Receive<CloseConnectionMessage>,
_ctx: &mut Self::Context,
) -> Self::Result {
self.drop_connection(&conn_id, CloseConnectionReason::CloseRequestedByPeer)
}
}
impl<D: SendableDispatcher> Handler<ServerStarted> for Peers<D> {
type Result = ();
fn handle(
&mut self,
ServerStarted(addr): ServerStarted,
_: &mut Self::Context,
) -> Self::Result {
self.local_addr = Some(addr);
}
}
impl<D: SendableDispatcher> Handler<IncomingConnection> for Peers<D> {
type Result = ();
fn handle(
&mut self,
IncomingConnection(tcp): IncomingConnection,
ctx: &mut Self::Context,
) -> Self::Result {
let dispatcher = self.get_dispatcher(ctx.address());
let (id, conn) = Connection::from_tcp_stream(tcp, self.network.into(), dispatcher);
self.add_connection(id, conn, None);
}
}
pub mod event {
use crate::p2p::connection::{Connection, ConnectionId};
use actix::{Message, WeakAddr};
pub struct ConnectionAdded(pub ConnectionId, pub WeakAddr<Connection>);
impl Message for ConnectionAdded {
type Result = ();
}
}