use std::{
collections::{hash_map::Entry, HashMap},
error::Error,
fmt::{self, Debug, Display, Formatter},
mem,
net::SocketAddr,
time::{Duration, Instant},
};
use datasize::DataSize;
use prometheus::IntGauge;
use rand::Rng;
use tracing::{debug, error, error_span, field::Empty, info, trace, warn, Span};
use super::{
blocklist::BlocklistJustification,
display_error,
health::{ConnectionHealth, HealthCheckOutcome, HealthConfig, Nonce, TaggedTimestamp},
NodeId,
};
#[derive(DataSize, Debug)]
pub struct Outgoing<H, E>
where
H: DataSize,
E: DataSize,
{
pub(super) is_unforgettable: bool,
pub(super) state: OutgoingState<H, E>,
}
#[derive(DataSize, Debug)]
pub(crate) enum OutgoingState<H, E>
where
H: DataSize,
E: DataSize,
{
Connecting {
failures_so_far: u8,
since: Instant,
},
Waiting {
failures_so_far: u8,
error: Option<E>,
last_failure: Instant,
},
Connected {
peer_id: NodeId,
handle: H,
health: ConnectionHealth,
},
Blocked {
since: Instant,
justification: BlocklistJustification,
until: Instant,
},
Loopback,
}
impl<H, E> Display for OutgoingState<H, E>
where
H: DataSize,
E: DataSize,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
OutgoingState::Connecting {
failures_so_far, ..
} => {
write!(f, "connecting({})", failures_so_far)
}
OutgoingState::Waiting {
failures_so_far, ..
} => write!(f, "waiting({})", failures_so_far),
OutgoingState::Connected { .. } => write!(f, "connected"),
OutgoingState::Blocked { .. } => write!(f, "blocked"),
OutgoingState::Loopback => write!(f, "loopback"),
}
}
}
#[derive(Debug)]
pub enum DialOutcome<H, E> {
Successful {
addr: SocketAddr,
handle: H,
node_id: NodeId,
when: Instant,
},
Failed {
addr: SocketAddr,
error: E,
when: Instant,
},
Loopback {
addr: SocketAddr,
},
}
impl<H, E> DialOutcome<H, E> {
fn addr(&self) -> SocketAddr {
match self {
DialOutcome::Successful { addr, .. }
| DialOutcome::Failed { addr, .. }
| DialOutcome::Loopback { addr, .. } => *addr,
}
}
}
#[derive(Clone, Debug)]
#[must_use]
pub(crate) enum DialRequest<H> {
Dial { addr: SocketAddr, span: Span },
Disconnect { handle: H, span: Span },
SendPing {
peer_id: NodeId,
nonce: Nonce,
span: Span,
},
}
impl<H> Display for DialRequest<H>
where
H: Display,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
DialRequest::Dial { addr, .. } => {
write!(f, "dial: {}", addr)
}
DialRequest::Disconnect { handle, .. } => {
write!(f, "disconnect: {}", handle)
}
DialRequest::SendPing { peer_id, nonce, .. } => {
write!(f, "ping[{}]: {}", nonce, peer_id)
}
}
}
}
#[derive(DataSize, Debug)]
pub struct OutgoingConfig {
pub(crate) retry_attempts: u8,
pub(crate) base_timeout: Duration,
pub(crate) unblock_after_min: Duration,
pub(crate) unblock_after_max: Duration,
pub(crate) sweep_timeout: Duration,
pub(crate) health: HealthConfig,
}
impl OutgoingConfig {
fn calc_backoff(&self, failed_attempts: u8) -> Duration {
(1u32 << failed_attempts as u32) * self.base_timeout
}
}
#[derive(DataSize, Debug)]
pub struct OutgoingManager<H, E>
where
H: DataSize,
E: DataSize,
{
config: OutgoingConfig,
pub(super) outgoing: HashMap<SocketAddr, Outgoing<H, E>>,
routes: HashMap<NodeId, SocketAddr>,
#[data_size(skip)]
metrics: OutgoingMetrics,
}
#[derive(Clone, Debug)]
pub(super) struct OutgoingMetrics {
pub(super) out_state_connecting: IntGauge,
pub(super) out_state_waiting: IntGauge,
pub(super) out_state_connected: IntGauge,
pub(super) out_state_blocked: IntGauge,
pub(super) out_state_loopback: IntGauge,
}
#[cfg(test)]
impl Default for OutgoingMetrics {
fn default() -> Self {
Self {
out_state_connecting: IntGauge::new(
"out_state_connecting",
"internal out_state_connecting",
)
.unwrap(),
out_state_waiting: IntGauge::new("out_state_waiting", "internal out_state_waiting")
.unwrap(),
out_state_connected: IntGauge::new(
"out_state_connected",
"internal out_state_connected",
)
.unwrap(),
out_state_blocked: IntGauge::new("out_state_blocked", "internal out_state_blocked")
.unwrap(),
out_state_loopback: IntGauge::new("out_state_loopback", "internal loopback").unwrap(),
}
}
}
impl<H, E> OutgoingManager<H, E>
where
H: DataSize,
E: DataSize,
{
#[cfg(test)]
#[inline]
pub(super) fn new(config: OutgoingConfig) -> Self {
Self::with_metrics(config, Default::default())
}
pub(super) fn with_metrics(config: OutgoingConfig, metrics: OutgoingMetrics) -> Self {
Self {
config,
outgoing: Default::default(),
routes: Default::default(),
metrics,
}
}
#[cfg(test)]
fn metrics(&self) -> &OutgoingMetrics {
&self.metrics
}
}
#[inline]
fn make_span<H, E>(addr: SocketAddr, outgoing: Option<&Outgoing<H, E>>) -> Span
where
H: DataSize,
E: DataSize,
{
if let Some(outgoing) = outgoing {
match outgoing.state {
OutgoingState::Connected { peer_id, .. } => {
error_span!("outgoing", %addr, state=%outgoing.state, %peer_id, consensus_key=Empty)
}
_ => {
error_span!("outgoing", %addr, state=%outgoing.state, peer_id=Empty, consensus_key=Empty)
}
}
} else {
error_span!("outgoing", %addr, state = "-")
}
}
impl<H, E> OutgoingManager<H, E>
where
H: DataSize + Clone,
E: DataSize + Error,
{
fn change_outgoing_state(
&mut self,
addr: SocketAddr,
mut new_state: OutgoingState<H, E>,
) -> (&mut Outgoing<H, E>, Option<H>) {
let (prev_state, new_outgoing) = match self.outgoing.entry(addr) {
Entry::Vacant(vacant) => {
let inserted = vacant.insert(Outgoing {
state: new_state,
is_unforgettable: false,
});
(None, inserted)
}
Entry::Occupied(occupied) => {
let prev = occupied.into_mut();
mem::swap(&mut prev.state, &mut new_state);
(Some(new_state), prev)
}
};
match (&prev_state, &new_outgoing.state) {
(Some(OutgoingState::Connected { .. }), OutgoingState::Connected { .. }) => {
trace!("route unchanged, already connected");
}
(Some(OutgoingState::Connected { peer_id, .. }), _) => {
debug!(%peer_id, "route removed");
self.routes.remove(peer_id);
}
(_, OutgoingState::Connected { peer_id, .. }) => {
debug!(%peer_id, "route added");
self.routes.insert(*peer_id, addr);
}
_ => {
trace!("route unchanged");
}
}
match prev_state {
Some(OutgoingState::Blocked { .. }) => self.metrics.out_state_blocked.dec(),
Some(OutgoingState::Connected { .. }) => self.metrics.out_state_connected.dec(),
Some(OutgoingState::Connecting { .. }) => self.metrics.out_state_connecting.dec(),
Some(OutgoingState::Loopback) => self.metrics.out_state_loopback.dec(),
Some(OutgoingState::Waiting { .. }) => self.metrics.out_state_waiting.dec(),
None => {
}
}
match new_outgoing.state {
OutgoingState::Blocked { .. } => self.metrics.out_state_blocked.inc(),
OutgoingState::Connected { .. } => self.metrics.out_state_connected.inc(),
OutgoingState::Connecting { .. } => self.metrics.out_state_connecting.inc(),
OutgoingState::Loopback => self.metrics.out_state_loopback.inc(),
OutgoingState::Waiting { .. } => self.metrics.out_state_waiting.inc(),
}
let handle = if let Some(OutgoingState::Connected { handle, .. }) = prev_state {
Some(handle)
} else {
None
};
(new_outgoing, handle)
}
pub(crate) fn get_addr(&self, peer_id: NodeId) -> Option<SocketAddr> {
self.routes.get(&peer_id).copied()
}
pub(crate) fn get_route(&self, peer_id: NodeId) -> Option<&H> {
let outgoing = self.outgoing.get(self.routes.get(&peer_id)?)?;
if let OutgoingState::Connected { ref handle, .. } = outgoing.state {
Some(handle)
} else {
None
}
}
pub(crate) fn connected_peers(&'_ self) -> impl Iterator<Item = NodeId> + '_ {
self.routes.keys().copied()
}
pub(crate) fn learn_addr(
&mut self,
addr: SocketAddr,
unforgettable: bool,
now: Instant,
) -> Option<DialRequest<H>> {
let span = make_span(addr, self.outgoing.get(&addr));
span.clone()
.in_scope(move || match self.outgoing.entry(addr) {
Entry::Occupied(_) => {
trace!("ignoring already known address");
None
}
Entry::Vacant(_vacant) => {
info!("connecting to newly learned address");
let (outgoing, _) = self.change_outgoing_state(
addr,
OutgoingState::Connecting {
failures_so_far: 0,
since: now,
},
);
if outgoing.is_unforgettable != unforgettable {
outgoing.is_unforgettable = unforgettable;
debug!(unforgettable, "marked");
}
Some(DialRequest::Dial { addr, span })
}
})
}
pub(crate) fn block_addr<R: Rng>(
&mut self,
addr: SocketAddr,
now: Instant,
justification: BlocklistJustification,
rng: &mut R,
) -> Option<DialRequest<H>> {
let span = make_span(addr, self.outgoing.get(&addr));
span.clone()
.in_scope(move || match self.outgoing.entry(addr) {
Entry::Vacant(_vacant) => {
info!("unknown address blocked");
let until = self.calculate_block_until(now, rng);
self.change_outgoing_state(
addr,
OutgoingState::Blocked {
since: now,
justification,
until,
},
);
None
}
Entry::Occupied(occupied) => match occupied.get().state {
OutgoingState::Blocked { .. } => {
debug!("address already blocked");
None
}
OutgoingState::Loopback => {
warn!("loopback address block ignored");
None
}
OutgoingState::Connected { ref handle, .. } => {
info!("connected address blocked, disconnecting");
let handle = handle.clone();
let until = self.calculate_block_until(now, rng);
self.change_outgoing_state(
addr,
OutgoingState::Blocked {
since: now,
justification,
until,
},
);
Some(DialRequest::Disconnect { span, handle })
}
OutgoingState::Waiting { .. } | OutgoingState::Connecting { .. } => {
let until = self.calculate_block_until(now, rng);
info!("address blocked");
self.change_outgoing_state(
addr,
OutgoingState::Blocked {
since: now,
justification,
until,
},
);
None
}
},
})
}
#[cfg(test)]
pub(crate) fn is_blocked(&self, addr: SocketAddr) -> bool {
match self.outgoing.get(&addr) {
Some(outgoing) => matches!(outgoing.state, OutgoingState::Blocked { .. }),
None => false,
}
}
#[allow(dead_code)]
pub(crate) fn redeem_addr(&mut self, addr: SocketAddr, now: Instant) -> Option<DialRequest<H>> {
let span = make_span(addr, self.outgoing.get(&addr));
span.clone()
.in_scope(move || match self.outgoing.entry(addr) {
Entry::Vacant(_) => {
debug!("unknown address redeemed");
None
}
Entry::Occupied(occupied) => match occupied.get().state {
OutgoingState::Blocked { .. } => {
self.change_outgoing_state(
addr,
OutgoingState::Connecting {
failures_so_far: 0,
since: now,
},
);
Some(DialRequest::Dial { addr, span })
}
_ => {
debug!("address redemption ignored, not blocked");
None
}
},
})
}
pub(super) fn record_pong(&mut self, peer_id: NodeId, pong: TaggedTimestamp) -> bool {
let addr = if let Some(addr) = self.routes.get(&peer_id) {
*addr
} else {
debug!(%peer_id, nonce=%pong.nonce(), "ignoring pong received from peer without route");
return false;
};
if let Some(outgoing) = self.outgoing.get_mut(&addr) {
if let OutgoingState::Connected { ref mut health, .. } = outgoing.state {
health.record_pong(&self.config.health, pong)
} else {
debug!(%peer_id, nonce=%pong.nonce(), "ignoring pong received from peer that is not in connected state");
false
}
} else {
debug!(%peer_id, nonce=%pong.nonce(), "ignoring pong received from peer without route");
false
}
}
pub(super) fn perform_housekeeping<R: Rng>(
&mut self,
rng: &mut R,
now: Instant,
) -> Vec<DialRequest<H>> {
let mut to_forget = Vec::new();
let mut to_fail = Vec::new();
let mut to_ping_timeout = Vec::new();
let mut to_reconnect = Vec::new();
let mut to_ping = Vec::new();
for (&addr, outgoing) in &mut self.outgoing {
let _span_guard = make_span(addr, Some(outgoing)).entered();
match outgoing.state {
OutgoingState::Waiting {
failures_so_far,
last_failure,
..
} => {
if failures_so_far > self.config.retry_attempts {
if outgoing.is_unforgettable {
info!("unforgettable address reset");
to_reconnect.push((addr, 0));
} else {
to_forget.push(addr);
info!("address forgotten");
}
} else {
let due = last_failure + self.config.calc_backoff(failures_so_far);
if now >= due {
debug!(attempts = failures_so_far, "address reconnecting");
to_reconnect.push((addr, failures_so_far));
}
}
}
OutgoingState::Blocked { until, .. } => {
if now >= until {
info!("address unblocked");
to_reconnect.push((addr, 0));
}
}
OutgoingState::Connecting {
since,
failures_so_far,
} => {
let timeout = since + self.config.sweep_timeout;
if now >= timeout {
warn!("address timed out connecting, was swept");
to_fail.push((addr, failures_so_far + 1));
}
}
OutgoingState::Connected {
peer_id,
ref mut health,
..
} => {
let health_outcome = health.update_health(rng, &self.config.health, now);
match health_outcome {
HealthCheckOutcome::DoNothing => {
}
HealthCheckOutcome::SendPing(nonce) => {
trace!(%nonce, "sending ping");
to_ping.push((peer_id, addr, nonce));
}
HealthCheckOutcome::GiveUp => {
info!("disconnecting after ping retries were exhausted");
to_ping_timeout.push(addr);
}
}
}
OutgoingState::Loopback => {
}
}
}
for addr in to_forget {
self.outgoing.remove(&addr);
}
for (addr, failures_so_far) in to_fail {
let span = make_span(addr, self.outgoing.get(&addr));
span.in_scope(|| {
self.change_outgoing_state(
addr,
OutgoingState::Waiting {
failures_so_far,
error: None,
last_failure: now,
},
)
});
}
let mut dial_requests = Vec::new();
for addr in to_ping_timeout {
let span = make_span(addr, self.outgoing.get(&addr));
let (_, opt_handle) = span.clone().in_scope(|| {
self.change_outgoing_state(
addr,
OutgoingState::Connecting {
failures_so_far: 0,
since: now,
},
)
});
if let Some(handle) = opt_handle {
dial_requests.push(DialRequest::Disconnect {
handle,
span: span.clone(),
});
} else {
error!("did not expect connection under ping timeout to not have a residual connection handle. this is a bug");
}
dial_requests.push(DialRequest::Dial { addr, span });
}
dial_requests.extend(to_reconnect.into_iter().map(|(addr, failures_so_far)| {
let span = make_span(addr, self.outgoing.get(&addr));
span.clone().in_scope(|| {
self.change_outgoing_state(
addr,
OutgoingState::Connecting {
failures_so_far,
since: now,
},
)
});
DialRequest::Dial { addr, span }
}));
dial_requests.extend(to_ping.into_iter().map(|(peer_id, addr, nonce)| {
let span = make_span(addr, self.outgoing.get(&addr));
DialRequest::SendPing {
peer_id,
nonce,
span,
}
}));
dial_requests
}
pub(crate) fn handle_dial_outcome(
&mut self,
dial_outcome: DialOutcome<H, E>,
) -> Option<DialRequest<H>> {
let addr = dial_outcome.addr();
let span = make_span(addr, self.outgoing.get(&addr));
span.clone().in_scope(move || match dial_outcome {
DialOutcome::Successful {
addr,
handle,
node_id,
when
} => {
info!("established outgoing connection");
if let Some(Outgoing{
state: OutgoingState::Blocked { .. }, ..
}) = self.outgoing.get(&addr) {
Some(DialRequest::Disconnect{
handle, span
})
} else {
self.change_outgoing_state(
addr,
OutgoingState::Connected {
peer_id: node_id,
handle,
health: ConnectionHealth::new(when),
},
);
None
}
}
DialOutcome::Failed { addr, error, when } => {
info!(err = display_error(&error), "outgoing connection failed");
if let Some(outgoing) = self.outgoing.get(&addr) {
match outgoing.state {
OutgoingState::Connecting { failures_so_far,.. } => {
self.change_outgoing_state(
addr,
OutgoingState::Waiting {
failures_so_far: failures_so_far + 1,
error: Some(error),
last_failure: when,
},
);
None
}
OutgoingState::Blocked { .. } => {
debug!("failed dial outcome after block ignored");
None
}
OutgoingState::Waiting { .. } |
OutgoingState::Connected { .. } |
OutgoingState::Loopback => {
warn!(
"processing dial outcome on a connection that was not marked as connecting or blocked"
);
None
}
}
} else {
warn!("processing dial outcome non-existent connection");
None
}
}
DialOutcome::Loopback { addr } => {
info!("found loopback address");
self.change_outgoing_state(addr, OutgoingState::Loopback);
None
}
})
}
pub(crate) fn handle_connection_drop(
&mut self,
addr: SocketAddr,
now: Instant,
) -> Option<DialRequest<H>> {
let span = make_span(addr, self.outgoing.get(&addr));
span.clone().in_scope(move || {
if let Some(outgoing) = self.outgoing.get(&addr) {
match outgoing.state {
OutgoingState::Waiting { .. }
| OutgoingState::Loopback
| OutgoingState::Connecting { .. } => {
warn!("unexpected drop notification");
None
}
OutgoingState::Connected { .. } => {
self.change_outgoing_state(
addr,
OutgoingState::Connecting {
failures_so_far: 0,
since: now,
},
);
Some(DialRequest::Dial { addr, span })
}
OutgoingState::Blocked { .. } => {
debug!("received drop notification for blocked connection");
None
}
}
} else {
warn!("received connection drop notification for unknown connection");
None
}
})
}
fn calculate_block_until<R: Rng>(&self, now: Instant, rng: &mut R) -> Instant {
let min = self.config.unblock_after_min;
let max = self.config.unblock_after_max;
if min == max {
return now + min;
}
let block_duration = rng.gen_range(min..=max);
now + block_duration
}
}
#[cfg(test)]
mod tests {
use std::{net::SocketAddr, time::Duration};
use assert_matches::assert_matches;
use datasize::DataSize;
use rand::Rng;
use thiserror::Error;
use super::{DialOutcome, DialRequest, NodeId, OutgoingConfig, OutgoingManager};
use crate::{
components::network::{
blocklist::BlocklistJustification,
health::{HealthConfig, TaggedTimestamp},
},
testing::{init_logging, test_clock::TestClock},
};
#[derive(DataSize, Debug, Error)]
#[error("test dialer error({})", id)]
struct TestDialerError {
id: u32,
}
fn test_config() -> OutgoingConfig {
OutgoingConfig {
retry_attempts: 3,
base_timeout: Duration::from_secs(1),
unblock_after_min: Duration::from_secs(60),
unblock_after_max: Duration::from_secs(60),
sweep_timeout: Duration::from_secs(45),
health: HealthConfig::test_config(),
}
}
fn config_variant_unblock() -> OutgoingConfig {
OutgoingConfig {
retry_attempts: 3,
base_timeout: Duration::from_secs(1),
unblock_after_min: Duration::from_secs(60),
unblock_after_max: Duration::from_secs(80),
sweep_timeout: Duration::from_secs(45),
health: HealthConfig::test_config(),
}
}
fn dials<'a, H, T>(expected: SocketAddr, requests: T) -> bool
where
T: IntoIterator<Item = &'a DialRequest<H>> + 'a,
H: 'a,
{
for req in requests.into_iter() {
if let DialRequest::Dial { addr, .. } = req {
if *addr == expected {
return true;
}
}
}
false
}
fn disconnects<'a, H, T>(expected: H, requests: T) -> bool
where
T: IntoIterator<Item = &'a DialRequest<H>> + 'a,
H: 'a + PartialEq,
{
for req in requests.into_iter() {
if let DialRequest::Disconnect { handle, .. } = req {
if *handle == expected {
return true;
}
}
}
false
}
#[test]
fn successful_lifecycle() {
init_logging();
let mut rng = crate::new_rng();
let mut clock = TestClock::new();
let addr_a: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let id_a = NodeId::random(&mut rng);
let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());
assert!(dials(
addr_a,
&manager.learn_addr(addr_a, false, clock.now())
));
assert_eq!(manager.metrics().out_state_connecting.get(), 1);
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_a,
error: TestDialerError { id: 1 },
when: clock.now(),
},)
.is_none());
assert_eq!(manager.metrics().out_state_connecting.get(), 0);
assert_eq!(manager.metrics().out_state_waiting.get(), 1);
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
clock.advance_time(2_000);
assert!(dials(
addr_a,
&manager.perform_housekeeping(&mut rng, clock.now())
));
assert_eq!(manager.metrics().out_state_connecting.get(), 1);
assert_eq!(manager.metrics().out_state_waiting.get(), 0);
assert!(manager
.handle_dial_outcome(DialOutcome::Successful {
addr: addr_a,
handle: 99,
node_id: id_a,
when: clock.now(),
},)
.is_none());
assert_eq!(manager.metrics().out_state_connecting.get(), 0);
assert_eq!(manager.metrics().out_state_connected.get(), 1);
assert_eq!(manager.get_route(id_a), Some(&99));
assert_eq!(manager.get_addr(id_a), Some(addr_a));
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
clock.advance_time(20_000);
assert!(dials(
addr_a,
&manager.handle_connection_drop(addr_a, clock.now())
));
assert_eq!(manager.metrics().out_state_connecting.get(), 1);
assert_eq!(manager.metrics().out_state_waiting.get(), 0);
assert!(manager.get_route(id_a).is_none());
assert!(manager.get_addr(id_a).is_none());
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
}
#[test]
fn connections_forgotten_after_too_many_tries() {
init_logging();
let mut rng = crate::new_rng();
let mut clock = TestClock::new();
let addr_a: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let addr_b: SocketAddr = "5.6.7.8:5678".parse().unwrap();
let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());
assert!(dials(
addr_a,
&manager.learn_addr(addr_a, false, clock.now())
));
assert!(dials(
addr_b,
&manager.learn_addr(addr_b, true, clock.now())
));
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_a,
error: TestDialerError { id: 10 },
when: clock.now(),
},)
.is_none());
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_b,
error: TestDialerError { id: 11 },
when: clock.now(),
},)
.is_none());
assert!(manager.learn_addr(addr_a, false, clock.now()).is_none());
assert!(manager.learn_addr(addr_b, false, clock.now()).is_none());
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
assert!(manager.learn_addr(addr_a, false, clock.now()).is_none());
assert!(manager.learn_addr(addr_b, false, clock.now()).is_none());
clock.advance_time(1_999);
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
clock.advance_time(1);
let requests = manager.perform_housekeeping(&mut rng, clock.now());
assert!(dials(addr_a, &requests));
assert!(dials(addr_b, &requests));
clock.advance_time(6_000);
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_a,
error: TestDialerError { id: 40 },
when: clock.now(),
},)
.is_none());
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_b,
error: TestDialerError { id: 41 },
when: clock.now(),
},)
.is_none());
clock.advance_time(3_999);
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
clock.advance_time(1);
let requests = manager.perform_housekeeping(&mut rng, clock.now());
assert!(dials(addr_a, &requests));
assert!(dials(addr_b, &requests));
clock.advance_time(25);
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_a,
error: TestDialerError { id: 10 },
when: clock.now(),
},)
.is_none());
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_b,
error: TestDialerError { id: 10 },
when: clock.now(),
},)
.is_none());
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
clock.advance_time(7_999);
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
clock.advance_time(1);
let requests = manager.perform_housekeeping(&mut rng, clock.now());
assert!(dials(addr_a, &requests));
assert!(dials(addr_b, &requests));
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_a,
error: TestDialerError { id: 10 },
when: clock.now(),
},)
.is_none());
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_b,
error: TestDialerError { id: 10 },
when: clock.now(),
},)
.is_none());
let requests = manager.perform_housekeeping(&mut rng, clock.now());
assert!(!dials(addr_a, &requests));
assert!(dials(addr_b, &requests));
clock.advance_time(1_000_000_000);
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
}
#[test]
fn blocking_works() {
init_logging();
let mut rng = crate::new_rng();
let mut clock = TestClock::new();
let addr_a: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let addr_b: SocketAddr = "5.6.7.8:5678".parse().unwrap();
let addr_c: SocketAddr = "9.0.1.2:9012".parse().unwrap();
let id_a = NodeId::random(&mut rng);
let id_b = NodeId::random(&mut rng);
let id_c = NodeId::random(&mut rng);
let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());
assert!(manager
.block_addr(
addr_a,
clock.now(),
BlocklistJustification::MissingChainspecHash,
&mut rng,
)
.is_none());
assert!(manager.learn_addr(addr_a, false, clock.now()).is_none());
assert!(dials(
addr_b,
&manager.learn_addr(addr_b, true, clock.now())
));
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
clock.advance_time(15_000);
assert!(manager
.handle_dial_outcome(DialOutcome::Successful {
addr: addr_b,
handle: 101,
node_id: id_b,
when: clock.now(),
},)
.is_none());
assert_eq!(manager.get_route(id_b), Some(&101));
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
assert_eq!(manager.get_route(id_b), Some(&101));
clock.advance_time(15_000);
assert!(disconnects(
101,
&manager.block_addr(
addr_b,
clock.now(),
BlocklistJustification::MissingChainspecHash,
&mut rng,
)
));
assert!(dials(
addr_c,
&manager.learn_addr(addr_c, false, clock.now())
));
assert!(manager
.block_addr(
addr_c,
clock.now(),
BlocklistJustification::MissingChainspecHash,
&mut rng,
)
.is_none());
assert!(disconnects(
42,
&manager.handle_dial_outcome(DialOutcome::Successful {
addr: addr_c,
handle: 42,
node_id: id_c,
when: clock.now(),
},)
));
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
assert!(manager.get_route(id_c).is_none());
clock.advance_time(30_000);
assert!(dials(
addr_a,
&manager.perform_housekeeping(&mut rng, clock.now())
));
clock.advance_time(15_000);
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
assert!(dials(addr_b, &manager.redeem_addr(addr_b, clock.now())));
assert!(manager
.handle_dial_outcome(DialOutcome::Successful {
addr: addr_b,
handle: 77,
node_id: id_b,
when: clock.now(),
},)
.is_none());
assert!(manager
.handle_dial_outcome(DialOutcome::Successful {
addr: addr_a,
handle: 66,
node_id: id_a,
when: clock.now(),
},)
.is_none());
assert_eq!(manager.get_route(id_a), Some(&66));
assert_eq!(manager.get_route(id_b), Some(&77));
}
#[test]
fn loopback_handled_correctly() {
init_logging();
let mut rng = crate::new_rng();
let mut clock = TestClock::new();
let loopback_addr: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());
assert!(dials(
loopback_addr,
&manager.learn_addr(loopback_addr, false, clock.now())
));
assert!(manager
.handle_dial_outcome(DialOutcome::Loopback {
addr: loopback_addr,
},)
.is_none());
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
assert!(manager
.learn_addr(loopback_addr, false, clock.now())
.is_none());
assert!(manager
.block_addr(
loopback_addr,
clock.now(),
BlocklistJustification::MissingChainspecHash,
&mut rng,
)
.is_none());
clock.advance_time(1_000_000_000);
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
}
#[test]
fn connected_peers_works() {
init_logging();
let mut rng = crate::new_rng();
let clock = TestClock::new();
let addr_a: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let addr_b: SocketAddr = "5.6.7.8:5678".parse().unwrap();
let id_a = NodeId::random(&mut rng);
let id_b = NodeId::random(&mut rng);
let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());
manager.learn_addr(addr_a, false, clock.now());
manager.learn_addr(addr_b, true, clock.now());
manager.handle_dial_outcome(DialOutcome::Successful {
addr: addr_a,
handle: 22,
node_id: id_a,
when: clock.now(),
});
manager.handle_dial_outcome(DialOutcome::Successful {
addr: addr_b,
handle: 33,
node_id: id_b,
when: clock.now(),
});
let mut peer_ids: Vec<_> = manager.connected_peers().collect();
let mut expected = vec![id_a, id_b];
peer_ids.sort();
expected.sort();
assert_eq!(peer_ids, expected);
}
#[test]
fn sweeping_works() {
init_logging();
let mut rng = crate::new_rng();
let mut clock = TestClock::new();
let addr_a: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let id_a = NodeId::random(&mut rng);
let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());
assert!(dials(
addr_a,
&manager.learn_addr(addr_a, false, clock.now())
));
clock.advance_time(50_000);
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
clock.advance_time(2_000);
assert!(dials(
addr_a,
&manager.perform_housekeeping(&mut rng, clock.now())
));
clock.advance_time(1_000);
assert!(manager
.handle_dial_outcome(DialOutcome::Successful {
addr: addr_a,
handle: 2,
node_id: id_a,
when: clock.now(),
})
.is_none());
assert_eq!(manager.get_route(id_a), Some(&2));
clock.advance_time(30_000);
assert!(manager
.handle_dial_outcome(DialOutcome::Successful {
addr: addr_a,
handle: 1,
node_id: id_a,
when: clock.now(),
})
.is_none());
assert_eq!(manager.get_route(id_a), Some(&1));
}
#[test]
fn blocking_not_overridden_by_racing_failed_connections() {
init_logging();
let mut rng = crate::new_rng();
let mut clock = TestClock::new();
let addr_a: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());
assert!(!manager.is_blocked(addr_a));
assert!(manager
.block_addr(
addr_a,
clock.now(),
BlocklistJustification::MissingChainspecHash,
&mut rng,
)
.is_none());
assert!(manager.is_blocked(addr_a));
clock.advance_time(60);
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_a,
error: TestDialerError { id: 12345 },
when: clock.now(),
})
.is_none());
assert!(manager.is_blocked(addr_a));
clock.advance_time(60);
assert!(manager.is_blocked(addr_a));
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
assert!(manager.is_blocked(addr_a));
}
#[test]
fn emits_and_accepts_pings() {
init_logging();
let mut rng = crate::new_rng();
let mut clock = TestClock::new();
let addr: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let id = NodeId::random(&mut rng);
let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());
assert!(dials(addr, &manager.learn_addr(addr, false, clock.now())));
assert!(manager
.handle_dial_outcome(DialOutcome::Successful {
addr,
handle: 1,
node_id: id,
when: clock.now(),
})
.is_none());
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
for _ in 0..50 {
clock.advance(Duration::from_secs(3));
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
clock.advance(Duration::from_secs(2));
let (_first_nonce, peer_id) = assert_matches!(
manager
.perform_housekeeping(&mut rng, clock.now())
.as_slice(),
&[DialRequest::SendPing { nonce, peer_id, .. }] => (nonce, peer_id)
);
assert_eq!(peer_id, id);
assert!(manager
.perform_housekeeping(&mut rng, clock.now())
.is_empty());
clock.advance(Duration::from_secs(1));
clock.advance(Duration::from_secs(1));
let (second_nonce, peer_id) = assert_matches!(
manager
.perform_housekeeping(&mut rng, clock.now())
.as_slice(),
&[DialRequest::SendPing { nonce, peer_id, .. }] => (nonce, peer_id)
);
assert_eq!(peer_id, id);
clock.advance(Duration::from_secs(1));
assert!(!manager.record_pong(
peer_id,
TaggedTimestamp::from_parts(clock.now(), second_nonce),
));
}
clock.advance(Duration::from_secs(5));
assert_matches!(
manager
.perform_housekeeping(&mut rng, clock.now())
.as_slice(),
&[DialRequest::SendPing { .. }]
);
clock.advance(Duration::from_secs(2));
assert_matches!(
manager
.perform_housekeeping(&mut rng, clock.now())
.as_slice(),
&[DialRequest::SendPing { .. }]
);
clock.advance(Duration::from_secs(2));
assert_matches!(
manager
.perform_housekeeping(&mut rng, clock.now())
.as_slice(),
&[DialRequest::SendPing { .. }]
);
clock.advance(Duration::from_secs(2));
assert_matches!(
manager
.perform_housekeeping(&mut rng, clock.now())
.as_slice(),
&[DialRequest::SendPing { .. }]
);
clock.advance(Duration::from_secs(2));
let dial_addr = assert_matches!(
manager
.perform_housekeeping(&mut rng, clock.now())
.as_slice(),
&[DialRequest::Disconnect { .. }, DialRequest::Dial { addr, .. }] => addr
);
assert_eq!(dial_addr, addr);
}
#[test]
fn indicates_issue_when_excessive_pongs_are_encountered() {
let mut rng = crate::new_rng();
let mut clock = TestClock::new();
let addr: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let id = NodeId::random(&mut rng);
let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());
assert!(dials(addr, &manager.learn_addr(addr, false, clock.now())));
assert!(manager
.handle_dial_outcome(DialOutcome::Successful {
addr,
handle: 1,
node_id: id,
when: clock.now(),
})
.is_none());
clock.advance(Duration::from_millis(50));
assert!(!manager.record_pong(id, TaggedTimestamp::from_parts(clock.now(), rng.gen())));
assert!(!manager.record_pong(id, TaggedTimestamp::from_parts(clock.now(), rng.gen())));
assert!(!manager.record_pong(id, TaggedTimestamp::from_parts(clock.now(), rng.gen())));
assert!(!manager.record_pong(id, TaggedTimestamp::from_parts(clock.now(), rng.gen())));
assert!(!manager.record_pong(id, TaggedTimestamp::from_parts(clock.now(), rng.gen())));
assert!(!manager.record_pong(id, TaggedTimestamp::from_parts(clock.now(), rng.gen())));
assert!(manager.record_pong(id, TaggedTimestamp::from_parts(clock.now(), rng.gen())));
}
#[test]
fn unblocking_in_variant_block_time() {
init_logging();
let mut rng = crate::new_rng();
let mut clock = TestClock::new();
let addr_a: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let mut manager = OutgoingManager::<u32, TestDialerError>::new(config_variant_unblock());
assert!(!manager.is_blocked(addr_a));
assert!(manager
.block_addr(
addr_a,
clock.now(),
BlocklistJustification::MissingChainspecHash,
&mut rng,
)
.is_none());
assert!(manager.is_blocked(addr_a));
clock.advance_time(config_variant_unblock().unblock_after_max.as_millis() as u64 + 1);
assert!(dials(
addr_a,
&manager.perform_housekeeping(&mut rng, clock.now())
));
assert!(!manager.is_blocked(addr_a));
}
}