//! Management of outgoing connections.
//!
//! This module implements outgoing connection management, decoupled from the underlying transport
//! or any higher-level parts. It encapsulates the reconnection and blocklisting logic on the
//! `SocketAddr` level.
//!
//! # Basic structure
//!
//! Core of this module is the `OutgoingManager`, which supports the following functionality:
//!
//! * Handed a `SocketAddr` via the `learn_addr` function, it will permanently maintain a
//! connection to the given address, only giving up if retry thresholds are exceeded, after which
//! it will be forgotten.
//! * `block_addr` and `redeem_addr` can be used to maintain a `SocketAddr`-keyed block list.
//! * `OutgoingManager` maintains an internal routing table. The `get_route` function can be used to
//! retrieve a "route" (typically a `sync::channel` accepting network messages) to a remote peer
//! by `NodeId`.
//!
//! # Requirements
//!
//! `OutgoingManager` is decoupled from the underlying protocol, all of its interactions are
//! performed through [`DialRequest`] and [`DialOutcome`]s. This frees the `OutgoingManager` from
//! having to worry about protocol specifics.
//!
//! Three conditions not expressed in code must be fulfilled for the `OutgoingManager` to function:
//!
//! * The `Dialer` is expected to eventually produce a [`DialOutcome`] for every
//! [`DialRequest::Dial`]. These must be forwarded to the `OutgoingManager` via the
//! `handle_dial_outcome` function.
//! * The `perform_housekeeping` method must be called periodically to give the
//! `OutgoingManager` a chance to initiate reconnections and collect garbage.
//! * When a connection is dropped, the connection manager must be notified via
//! `handle_connection_drop`.
//!
//! # Lifecycle
//!
//! The following chart illustrates the lifecycle of an outgoing connection.
//!
//! ```text
//!                   forget (after n tries)
//!          ┌────────────────────────────────────┐
//!          │                 learn              ▼
//!          │               ┌──────────────  unknown/forgotten
//!          │               │                (implicit state)
//!          │               │
//!          │               │                │
//!          │               │                │ block
//!          │               │                │
//!          │               │                │
//!          │               │                ▼
//!     ┌────┴────┐          │          ┌─────────┐
//!     │         │  fail    │    block │         │
//!     │ Waiting │◄───────┐ │   ┌─────►│ Blocked │◄──────────┐
//! ┌───┤         │        │ │   │      │         │           │
//! │   └────┬────┘        │ │   │      └────┬────┘           │
//! │ block  │             │ │   │           │                │
//! │        │ timeout     │ ▼   │           │ redeem,        │
//! │        │        ┌────┴─────┴───┐       │ block timeout  │
//! │        │        │              │       │                │
//! │        └───────►│  Connecting  │◄──────┘                │
//! │                 │              │                        │
//! │                 └─────┬────┬───┘                        │
//! │                       │ ▲  │                            │
//! │               success │ │  │ detect                     │
//! │                       │ │  │      ┌──────────┐          │
//! │ ┌───────────┐         │ │  │      │          │          │
//! │ │           │◄────────┘ │  │      │ Loopback │          │
//! │ │ Connected │           │  └─────►│          │          │
//! │ │           │ dropped   │         └──────────┘          │
//! │ └─────┬─────┴───────────┘                               │
//! │       │                                                 │
//! │       │ block                                           │
//! └───────┴─────────────────────────────────────────────────┘
//! ```
//!
//! # Timeouts/safety
//!
//! The `sweep` transition for connections usually does not happen during normal operations. Three
//! causes are typical for it:
//!
//! * A configured TCP timeout above [`OutgoingConfig::sweep_timeout`].
//! * Very slow responses from remote peers (similar to a Slowloris-attack)
//! * Faulty handling by the driver of the [`OutgoingManager`], i.e. the outside component.
//!
//! Should a dial attempt exceed a certain timeout, it is considered failed and put into the waiting
//! state again.
//!
//! If a conflict (multiple successful dial results) occurs, the more recent connection takes
//! precedence over the previous one. This prevents problems when a notification of a terminated
//! connection is overtaken by the new connection announcement.
// Clippy has a lot of false positives due to `span.clone()`-closures.
#![allow(clippy::redundant_clone)]
use std::{
collections::{hash_map::Entry, HashMap},
error::Error,
fmt::{self, Debug, Display, Formatter},
mem,
net::SocketAddr,
time::{Duration, Instant},
};
use datasize::DataSize;
use tracing::{debug, error_span, field::Empty, info, trace, warn, Span};
use super::{display_error, NodeId};
/// An outgoing connection/address in various states.
///
/// This is the per-address bookkeeping entry stored in the `OutgoingManager`'s `outgoing` map.
/// `H` is the type of the handle kept while the address is connected, `E` is the type of the
/// dial error kept while the address is waiting for a retry.
#[derive(DataSize, Debug)]
pub struct Outgoing<H, E>
where
H: DataSize,
E: DataSize,
{
/// Whether or not the address is unforgettable, see `learn_addr` for details.
///
/// Unforgettable addresses are reset instead of removed when they exceed the retry limit.
is_unforgettable: bool,
/// The current state the connection/address is in.
state: OutgoingState<H, E>,
}
/// Active state for a connection/address.
///
/// `H` is the handle type stored for established connections, `E` the error type stored for
/// failed connection attempts.
#[derive(DataSize, Debug)]
pub enum OutgoingState<H, E>
where
H: DataSize,
E: DataSize,
{
/// A connection attempt to the address is currently in progress.
///
/// This is either the very first attempt after the address became known, or a retry after
/// earlier failures, a lifted block or a dropped connection.
Connecting {
/// Number of attempts that failed, so far.
failures_so_far: u8,
/// Time when the connection attempt was instantiated.
since: Instant,
},
/// The connection has failed at least one connection attempt and is waiting for a retry.
Waiting {
/// Number of attempts that failed, so far.
failures_so_far: u8,
/// The most recent connection error.
///
/// If not given, the connection was put into a `Waiting` state due to a sweep timeout.
error: Option<E>,
/// The precise moment when the last connection attempt failed.
last_failure: Instant,
},
/// An established outgoing connection.
Connected {
/// The peer's remote ID.
peer_id: NodeId,
/// Handle to a communication channel that can be used to send data to the peer.
///
/// Can be a channel to decouple sending, or even a direct connection handle.
handle: H,
},
/// The address was blocked and will not be retried until it is redeemed or the block expires.
Blocked {
/// The moment the block was put in place; used to determine when the block times out.
since: Instant,
},
/// The address is owned by ourselves and will not be tried again.
Loopback,
}
impl<H, E> Display for OutgoingState<H, E>
where
    H: DataSize,
    E: DataSize,
{
    /// Writes a short, lowercase name of the state.
    ///
    /// The `Connecting` and `Waiting` states additionally carry the number of failed attempts in
    /// parentheses, e.g. `waiting(2)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Split every state into its label and an optional failure counter first, then format.
        let (label, failures) = match self {
            OutgoingState::Connecting {
                failures_so_far, ..
            } => ("connecting", Some(failures_so_far)),
            OutgoingState::Waiting {
                failures_so_far, ..
            } => ("waiting", Some(failures_so_far)),
            OutgoingState::Connected { .. } => ("connected", None),
            OutgoingState::Blocked { .. } => ("blocked", None),
            OutgoingState::Loopback => ("loopback", None),
        };

        match failures {
            Some(count) => write!(f, "{}({})", label, count),
            None => f.write_str(label),
        }
    }
}
/// The result of dialing a `SocketAddr`.
///
/// Produced by the component performing the actual dialing and fed back into the manager via
/// `handle_dial_outcome`. `H` is the connection handle type, `E` the dial error type.
#[derive(Debug)]
pub enum DialOutcome<H, E> {
/// A connection was successfully established.
Successful {
/// The address dialed.
addr: SocketAddr,
/// A handle to send data down the connection.
handle: H,
/// The remote peer's authenticated node ID.
node_id: NodeId,
},
/// The connection attempt failed.
Failed {
/// The address dialed.
addr: SocketAddr,
/// The error encountered while dialing.
error: E,
/// The moment the connection attempt failed.
when: Instant,
},
/// The connection was aborted, because the remote peer turned out to be a loopback.
Loopback {
/// The address used to connect.
addr: SocketAddr,
},
}
impl<H, E> DialOutcome<H, E> {
    /// Returns the socket address this outcome refers to, regardless of the variant.
    fn addr(&self) -> SocketAddr {
        // Every variant carries an `addr` field, so a single or-pattern covers all of them.
        match self {
            DialOutcome::Successful { addr, .. }
            | DialOutcome::Failed { addr, .. }
            | DialOutcome::Loopback { addr } => *addr,
        }
    }
}
/// A request made for dialing.
///
/// Returned by the `OutgoingManager`'s methods; the caller is responsible for carrying the
/// request out (hence `#[must_use]`). `H` is the connection handle type.
#[derive(Clone, Debug)]
#[must_use]
pub(crate) enum DialRequest<H> {
/// Attempt to connect to the outgoing socket address.
///
/// For every time this request is emitted, there must be a corresponding call to
/// `handle_dial_outcome` eventually.
///
/// Any logging of connection issues should be done in the context of `span` for better log
/// output.
Dial { addr: SocketAddr, span: Span },
/// Disconnects a potentially existing connection.
///
/// Used when a peer has been blocked or should be disconnected for other reasons.
///
/// `handle` is the handle of the connection to be closed, `span` the logging context of the
/// affected address.
Disconnect { handle: H, span: Span },
}
impl<H> Display for DialRequest<H>
where
    H: Display,
{
    /// Formats the request as `dial: <addr>` or `disconnect: <handle>`; the span is not shown.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        if let DialRequest::Dial { addr, .. } = self {
            return write!(f, "dial: {}", addr);
        }

        match self {
            DialRequest::Disconnect { handle, .. } => write!(f, "disconnect: {}", handle),
            // Already handled by the early return above.
            DialRequest::Dial { addr, .. } => write!(f, "dial: {}", addr),
        }
    }
}
#[derive(DataSize, Debug)]
/// Connection settings for the outgoing connection manager.
pub struct OutgoingConfig {
/// The maximum number of attempts before giving up and forgetting an address, if permitted.
///
/// Addresses marked as unforgettable are reset instead of forgotten once this is exceeded.
pub(crate) retry_attempts: u8,
/// The basic time slot for exponential backoff when reconnecting.
///
/// The delay before a retry is this value multiplied by two to the power of the number of
/// failed attempts.
pub(crate) base_timeout: Duration,
/// Time until an outgoing address is unblocked.
pub(crate) unblock_after: Duration,
/// Safety timeout, after which a connection is no longer expected to finish dialing.
pub(crate) sweep_timeout: Duration,
}
impl OutgoingConfig {
/// Calculates the backoff time.
///
/// `failed_attempts` (n) is the number of previous attempts *before* the current failure (thus
/// starting at 0). The backoff time will be double for each attempt.
fn calc_backoff(&self, failed_attempts: u8) -> Duration {
2u32.pow(failed_attempts as u32) * self.base_timeout
}
}
/// Manager of outbound connections.
///
/// See the module documentation for usage suggestions.
///
/// `H` is the type of the handle stored for every established connection, `E` the type of the
/// error stored for failed dial attempts.
#[derive(DataSize, Debug)]
pub struct OutgoingManager<H, E>
where
H: DataSize,
E: DataSize,
{
/// Outgoing connections subsystem configuration.
config: OutgoingConfig,
/// Mapping of address to their current connection state.
outgoing: HashMap<SocketAddr, Outgoing<H, E>>,
/// Routing table.
///
/// Contains a mapping from node IDs to connected socket addresses. A missing entry means that
/// the destination is not connected.
routes: HashMap<NodeId, SocketAddr>,
}
impl<H, E> OutgoingManager<H, E>
where
H: DataSize,
E: DataSize,
{
/// Creates a new outgoing manager.
pub(crate) fn new(config: OutgoingConfig) -> Self {
Self {
config,
outgoing: Default::default(),
routes: Default::default(),
}
}
}
/// Creates a logging span for a specific connection.
///
/// The span is named `outgoing` and always declares the fields `addr`, `state`, `peer_id` and
/// `validator_id`:
///
/// * `state` is the display form of the current state, or `-` if the address is unknown.
/// * `peer_id` is filled in only if the address is currently connected, otherwise left empty.
/// * `validator_id` is always left empty here.
///
/// The empty fields are declared for unknown addresses as well, so that every span handed out
/// has the same set of fields. NOTE(review): `tracing` is understood to ignore values recorded
/// for undeclared fields, and spans for newly learned addresses are passed on in
/// `DialRequest::Dial` — confirm against the `tracing` documentation.
#[inline]
fn make_span<H, E>(addr: SocketAddr, outgoing: Option<&Outgoing<H, E>>) -> Span
where
    H: DataSize,
    E: DataSize,
{
    // Note: The jury is still out on whether we want to create a single span per connection and
    // cache it, or create a new one (with the same connection ID) each time this is called. The
    // advantage of the former is external tools have it easier correlating all related
    // information, while the drawback is not being able to change the parent span link, which
    // might be awkward.
    let outgoing = match outgoing {
        Some(outgoing) => outgoing,
        None => {
            // Unknown address: no state available yet, but keep the field set consistent.
            return error_span!("outgoing", %addr, state = "-", peer_id=Empty, validator_id=Empty);
        }
    };

    match outgoing.state {
        OutgoingState::Connected { peer_id, .. } => {
            error_span!("outgoing", %addr, state=%outgoing.state, %peer_id, validator_id=Empty)
        }
        _ => {
            error_span!("outgoing", %addr, state=%outgoing.state, peer_id=Empty, validator_id=Empty)
        }
    }
}
impl<H, E> OutgoingManager<H, E>
where
H: DataSize + Clone,
E: DataSize + Error,
{
/// Changes the state of an outgoing connection.
///
/// If the address is not known yet, a new (forgettable) entry is created for it. Returns a
/// mutable reference to the entry after the change.
///
/// Will trigger an update of the routing table if necessary. Does not emit any other
/// side-effects.
///
/// Routing table rules:
///
/// * Entering `Connected` from any other state adds a route for the new peer ID.
/// * Leaving `Connected` removes the route of the previous peer ID, but only if that route
///   still points to `addr`; a route that has since been pointed at another address is left
///   alone.
/// * Going from `Connected` to `Connected` with a different peer ID replaces the old route with
///   one for the new peer ID. With the same peer ID the route is left untouched.
fn change_outgoing_state(
    &mut self,
    addr: SocketAddr,
    mut new_state: OutgoingState<H, E>,
) -> &mut Outgoing<H, E> {
    let (prev_state, new_outgoing) = match self.outgoing.entry(addr) {
        Entry::Vacant(vacant) => {
            let inserted = vacant.insert(Outgoing {
                state: new_state,
                is_unforgettable: false,
            });

            (None, inserted)
        }

        Entry::Occupied(occupied) => {
            let prev = occupied.into_mut();

            mem::swap(&mut prev.state, &mut new_state);

            // `new_state` and `prev.state` are swapped now.
            (Some(new_state), prev)
        }
    };

    // Update the routing table.
    match (&prev_state, &new_outgoing.state) {
        (
            Some(OutgoingState::Connected {
                peer_id: old_peer_id,
                ..
            }),
            OutgoingState::Connected {
                peer_id: new_peer_id,
                ..
            },
        ) => {
            if old_peer_id == new_peer_id {
                trace!("route unchanged, already connected");
            } else {
                // The peer behind the address changed; drop the stale route (if it is still
                // ours) and make the new peer reachable.
                if self.routes.get(old_peer_id) == Some(&addr) {
                    debug!(peer_id = %old_peer_id, "route removed");
                    self.routes.remove(old_peer_id);
                }

                debug!(peer_id = %new_peer_id, "route added");
                self.routes.insert(*new_peer_id, addr);
            }
        }

        // Dropping from connected to any other state requires clearing the route.
        (Some(OutgoingState::Connected { peer_id, .. }), _) => {
            // Only clear the route if it belongs to this address, as the same peer may have
            // been connected through a different address in the meantime.
            if self.routes.get(peer_id) == Some(&addr) {
                debug!(%peer_id, "route removed");
                self.routes.remove(peer_id);
            } else {
                trace!("route unchanged");
            }
        }

        // Otherwise we have established a new route.
        (_, OutgoingState::Connected { peer_id, .. }) => {
            debug!(%peer_id, "route added");
            self.routes.insert(*peer_id, addr);
        }

        _ => {
            trace!("route unchanged");
        }
    }

    new_outgoing
}
/// Looks up the socket address a peer is currently connected through.
///
/// Returns `None` if the routing table has no entry for `peer_id`.
pub(crate) fn get_addr(&self, peer_id: NodeId) -> Option<SocketAddr> {
    let addr = self.routes.get(&peer_id)?;
    Some(*addr)
}
/// Retrieves a handle to a peer.
///
/// This is the primary way of sending data to peers: clients fetch the handle and use it to
/// send data.
///
/// Returns `None` if there is no route to `peer_id`, or if the address the route points to is
/// not in the `Connected` state.
pub(crate) fn get_route(&self, peer_id: NodeId) -> Option<&H> {
    let addr = self.routes.get(&peer_id)?;
    let entry = self.outgoing.get(addr)?;

    match &entry.state {
        OutgoingState::Connected { handle, .. } => Some(handle),
        _ => None,
    }
}
/// Iterates over the IDs of all peers that currently have an entry in the routing table.
#[allow(clippy::needless_lifetimes)]
pub(crate) fn connected_peers<'a>(&'a self) -> impl Iterator<Item = NodeId> + 'a {
    let routed_ids = self.routes.keys();
    routed_ids.copied()
}
/// Notify about a potentially new address that has been discovered.
///
/// If the address was not known before, it is put into the `Connecting` state and a request
/// to dial it is returned. Addresses that are already known (in any state) are ignored and
/// `None` is returned; their `unforgettable` flag is not touched.
///
/// A connection marked `unforgettable` will never be evicted but reset instead when it exceeds
/// the retry limit.
pub(crate) fn learn_addr(
    &mut self,
    addr: SocketAddr,
    unforgettable: bool,
    now: Instant,
) -> Option<DialRequest<H>> {
    let span = make_span(addr, self.outgoing.get(&addr));
    // One handle is entered for logging, the other travels with the dial request.
    let request_span = span.clone();

    span.in_scope(move || {
        if self.outgoing.contains_key(&addr) {
            debug!("ignoring already known address");
            return None;
        }

        info!("connecting to newly learned address");
        let entry = self.change_outgoing_state(
            addr,
            OutgoingState::Connecting {
                failures_so_far: 0,
                since: now,
            },
        );

        // New entries start out forgettable, so only an `unforgettable` request changes them.
        if entry.is_unforgettable != unforgettable {
            entry.is_unforgettable = unforgettable;
            debug!(unforgettable, "marked");
        }

        Some(DialRequest::Dial {
            addr,
            span: request_span,
        })
    })
}
/// Blocks an address.
///
/// Causes any current connection to the address to be terminated and future ones prohibited.
///
/// * Unknown, waiting and connecting addresses are put into the `Blocked` state.
/// * Connected addresses are blocked as well, and a request to disconnect the existing
///   connection is returned.
/// * Already blocked addresses and loopback addresses are left untouched.
pub(crate) fn block_addr(&mut self, addr: SocketAddr, now: Instant) -> Option<DialRequest<H>> {
    let span = make_span(addr, self.outgoing.get(&addr));
    let request_span = span.clone();

    span.in_scope(move || {
        // First decide, based on the current state, whether a block is applied at all and
        // whether there is a connection that needs to be torn down.
        // TODO: Check what happens on close on our end, i.e. can we distinguish in logs
        //       between a closed connection on our end vs one that failed?
        let to_disconnect = match self.outgoing.get(&addr).map(|outgoing| &outgoing.state) {
            None => {
                info!("unknown address blocked");
                None
            }
            Some(OutgoingState::Blocked { .. }) => {
                debug!("address already blocked");
                return None;
            }
            Some(OutgoingState::Loopback) => {
                warn!("loopback address block ignored");
                return None;
            }
            Some(OutgoingState::Connected { handle, .. }) => {
                info!("connected address blocked, disconnecting");
                Some(handle.clone())
            }
            Some(OutgoingState::Waiting { .. }) | Some(OutgoingState::Connecting { .. }) => {
                info!("address blocked");
                None
            }
        };

        self.change_outgoing_state(addr, OutgoingState::Blocked { since: now });

        to_disconnect.map(|handle| DialRequest::Disconnect {
            span: request_span,
            handle,
        })
    })
}
/// Checks if an address is blocked.
///
/// Unknown addresses are reported as not blocked.
#[cfg(test)]
pub(crate) fn is_blocked(&self, addr: SocketAddr) -> bool {
    self.outgoing
        .get(&addr)
        .map_or(false, |entry| matches!(entry.state, OutgoingState::Blocked { .. }))
}
/// Removes an address from the block list.
///
/// A blocked address is put into the `Connecting` state with a fresh failure count, and a
/// request to dial it is returned. Does nothing (and returns `None`) if the address is unknown
/// or not blocked.
// This function is currently not in use by `small_network` itself.
#[allow(dead_code)]
pub(crate) fn redeem_addr(&mut self, addr: SocketAddr, now: Instant) -> Option<DialRequest<H>> {
    let span = make_span(addr, self.outgoing.get(&addr));
    let request_span = span.clone();

    span.in_scope(move || {
        let current_state = match self.outgoing.get(&addr) {
            Some(entry) => &entry.state,
            None => {
                debug!("unknown address redeemed");
                return None;
            }
        };

        if !matches!(current_state, OutgoingState::Blocked { .. }) {
            debug!("address redemption ignored, not blocked");
            return None;
        }

        self.change_outgoing_state(
            addr,
            OutgoingState::Connecting {
                failures_so_far: 0,
                since: now,
            },
        );

        Some(DialRequest::Dial {
            addr,
            span: request_span,
        })
    })
}
/// Performs housekeeping like reconnection or unblocking peers.
///
/// This function must periodically be called. A good interval is every second.
///
/// Based on `now`, every known address is inspected:
///
/// * `Waiting` addresses that exceeded the retry limit are forgotten, or reset and reconnected
///   if they are unforgettable. Those within the limit are reconnected once their backoff
///   time has passed.
/// * `Blocked` addresses are reconnected once the block has timed out.
/// * `Connecting` addresses that have been dialing for longer than the sweep timeout are
///   counted as failed and put into the `Waiting` state.
/// * `Connected` and `Loopback` addresses are left alone.
///
/// Returns one dial request for every address that is being reconnected.
///
/// All deadline calculations are overflow-safe: a deadline that cannot be represented is
/// treated as not reached yet, and the failure counter saturates at `u8::MAX`.
pub(super) fn perform_housekeeping(&mut self, now: Instant) -> Vec<DialRequest<H>> {
    let mut to_forget = Vec::new();
    let mut to_fail = Vec::new();
    let mut to_reconnect = Vec::new();

    // The map cannot be modified while iterating over it, so only collect what needs to be
    // done in this first pass.
    for (&addr, outgoing) in self.outgoing.iter() {
        let span = make_span(addr, Some(outgoing));

        span.in_scope(|| match outgoing.state {
            // Decide whether to attempt reconnecting a failed-waiting address.
            OutgoingState::Waiting {
                failures_so_far,
                last_failure,
                ..
            } => {
                if failures_so_far > self.config.retry_attempts {
                    if outgoing.is_unforgettable {
                        // Unforgettable addresses simply have their timer reset.
                        info!("unforgettable address reset");
                        to_reconnect.push((addr, 0));
                    } else {
                        // Address had too many attempts at reconnection, we will forget
                        // it after exiting this closure.
                        to_forget.push(addr);
                        info!("address forgotten");
                    }
                } else {
                    // The address has not exceeded the limit, so check if it is due. An
                    // unrepresentable due time lies in the far future, i.e. is not reached.
                    let backoff = self.config.calc_backoff(failures_so_far);
                    let is_due = last_failure
                        .checked_add(backoff)
                        .map_or(false, |due| now >= due);

                    if is_due {
                        debug!(attempts = failures_so_far, "address reconnecting");
                        to_reconnect.push((addr, failures_so_far));
                    }
                }
            }

            OutgoingState::Blocked { since } => {
                let block_expired = since
                    .checked_add(self.config.unblock_after)
                    .map_or(false, |unblock_at| now >= unblock_at);

                if block_expired {
                    info!("address unblocked");
                    to_reconnect.push((addr, 0));
                }
            }

            OutgoingState::Connecting {
                since,
                failures_so_far,
            } => {
                let timed_out = since
                    .checked_add(self.config.sweep_timeout)
                    .map_or(false, |timeout| now >= timeout);

                if timed_out {
                    // The outer component has not called us with a `DialOutcome` in a
                    // reasonable amount of time. This should happen very rarely, ideally
                    // never.
                    warn!("address timed out connecting, was swept");

                    // Count the timeout as a failure against the connection. The counter
                    // saturates rather than overflowing.
                    to_fail.push((addr, failures_so_far.saturating_add(1)));
                }
            }

            OutgoingState::Connected { .. } | OutgoingState::Loopback => {
                // Entry is ignored. Not outputting any `trace` because this is log spam even at
                // the `trace` level.
            }
        });
    }

    // Remove all addresses marked for forgetting.
    for addr in to_forget {
        self.outgoing.remove(&addr);
    }

    // Fail connections that are taking way too long to connect.
    for (addr, failures_so_far) in to_fail {
        let span = make_span(addr, self.outgoing.get(&addr));

        span.in_scope(|| {
            self.change_outgoing_state(
                addr,
                OutgoingState::Waiting {
                    failures_so_far,
                    error: None,
                    last_failure: now,
                },
            );
        });
    }

    // Reconnect all others.
    to_reconnect
        .into_iter()
        .map(|(addr, failures_so_far)| {
            let span = make_span(addr, self.outgoing.get(&addr));

            span.clone().in_scope(|| {
                self.change_outgoing_state(
                    addr,
                    OutgoingState::Connecting {
                        failures_so_far,
                        since: now,
                    },
                );
            });

            DialRequest::Dial { addr, span }
        })
        .collect()
}
/// Handles the outcome of a dialing attempt.
///
/// * `Successful`: the address becomes `Connected`, unless it has been blocked in the
///   meantime, in which case it stays blocked and a request to disconnect the fresh
///   connection is returned.
/// * `Failed`: a `Connecting` address is put into the `Waiting` state with its failure count
///   increased by one (saturating at `u8::MAX`). Failures for blocked, unknown or otherwise
///   unexpected addresses do not change any state.
/// * `Loopback`: the address is marked as `Loopback`.
///
/// Note that reconnects will earliest happen on the next `perform_housekeeping` call.
pub(crate) fn handle_dial_outcome(
    &mut self,
    dial_outcome: DialOutcome<H, E>,
) -> Option<DialRequest<H>> {
    let addr = dial_outcome.addr();
    let span = make_span(addr, self.outgoing.get(&addr));

    span.clone().in_scope(move || match dial_outcome {
        DialOutcome::Successful {
            addr,
            handle,
            node_id,
            ..
        } => {
            info!("established outgoing connection");

            if let Some(Outgoing {
                state: OutgoingState::Blocked { .. },
                ..
            }) = self.outgoing.get(&addr)
            {
                // If we connected to a blocked address, do not go into connected, but stay
                // blocked instead.
                Some(DialRequest::Disconnect { handle, span })
            } else {
                // Otherwise, just record the connected state.
                self.change_outgoing_state(
                    addr,
                    OutgoingState::Connected {
                        peer_id: node_id,
                        handle,
                    },
                );
                None
            }
        }

        DialOutcome::Failed { addr, error, when } => {
            info!(err = display_error(&error), "outgoing connection failed");

            if let Some(outgoing) = self.outgoing.get(&addr) {
                match outgoing.state {
                    OutgoingState::Connecting {
                        failures_so_far, ..
                    } => {
                        self.change_outgoing_state(
                            addr,
                            OutgoingState::Waiting {
                                // Saturate instead of overflowing the `u8` counter.
                                failures_so_far: failures_so_far.saturating_add(1),
                                error: Some(error),
                                last_failure: when,
                            },
                        );
                        None
                    }
                    OutgoingState::Blocked { .. } => {
                        debug!("failed dial outcome after block ignored");

                        // We do not set the connection to "waiting" if an out-of-order failed
                        // connection arrives, but continue to honor the blocking.
                        None
                    }
                    OutgoingState::Waiting { .. }
                    | OutgoingState::Connected { .. }
                    | OutgoingState::Loopback => {
                        warn!(
                            "processing dial outcome on a connection that was not marked as connecting or blocked"
                        );

                        None
                    }
                }
            } else {
                warn!("processing dial outcome non-existent connection");

                // If the connection does not exist, do not introduce it!
                None
            }
        }

        DialOutcome::Loopback { addr } => {
            info!("found loopback address");
            self.change_outgoing_state(addr, OutgoingState::Loopback);
            None
        }
    })
}
/// Notifies the connection manager about a dropped connection.
///
/// For a connected address this results in an immediate reconnection: the address is put into
/// the `Connecting` state with a fresh failure count and a dial request is returned. Drop
/// notifications for addresses in any other state, or for unknown addresses, are only logged.
pub(crate) fn handle_connection_drop(
    &mut self,
    addr: SocketAddr,
    now: Instant,
) -> Option<DialRequest<H>> {
    let span = make_span(addr, self.outgoing.get(&addr));
    let request_span = span.clone();

    span.in_scope(move || {
        match self.outgoing.get(&addr).map(|outgoing| &outgoing.state) {
            None => {
                warn!("received connection drop notification for unknown connection");
                None
            }
            Some(OutgoingState::Connected { .. }) => {
                // Drop the handle, immediately initiate a reconnection.
                self.change_outgoing_state(
                    addr,
                    OutgoingState::Connecting {
                        failures_so_far: 0,
                        since: now,
                    },
                );
                Some(DialRequest::Dial {
                    addr,
                    span: request_span,
                })
            }
            Some(OutgoingState::Blocked { .. }) => {
                // Blocked addresses ignore connection drops.
                debug!("received drop notification for blocked connection");
                None
            }
            Some(OutgoingState::Waiting { .. })
            | Some(OutgoingState::Loopback)
            | Some(OutgoingState::Connecting { .. }) => {
                // We should, under normal circumstances, not receive drop notifications for
                // any of these. Connection failures are handled by the dialer.
                warn!("unexpected drop notification");
                None
            }
        }
    })
}
}
#[cfg(test)]
mod tests {
use std::{net::SocketAddr, time::Duration};
use datasize::DataSize;
use thiserror::Error;
use super::{DialOutcome, DialRequest, NodeId, OutgoingConfig, OutgoingManager};
use crate::testing::{init_logging, test_clock::TestClock};
/// Error for test dialer.
///
/// Tracks a configurable id for the error, which is included in the error's display output.
#[derive(DataSize, Debug, Error)]
#[error("test dialer error({})", id)]
struct TestDialerError {
// Arbitrary identifier chosen by the test creating the error.
id: u32,
}
/// Builds the outgoing configuration shared by the tests.
///
/// Uses three retry attempts, a one second base timeout, a 60 second block duration and a
/// 45 second sweep timeout.
fn test_config() -> OutgoingConfig {
    let base_timeout = Duration::from_secs(1);
    let unblock_after = Duration::from_secs(60);
    let sweep_timeout = Duration::from_secs(45);

    OutgoingConfig {
        retry_attempts: 3,
        base_timeout,
        unblock_after,
        sweep_timeout,
    }
}
/// Helper function that checks whether any of the given requests dials the expected address.
///
/// Accepts anything that iterates over request references, e.g. `&Option<_>` or `&Vec<_>`.
fn dials<'a, H, T>(expected: SocketAddr, requests: T) -> bool
where
    T: IntoIterator<Item = &'a DialRequest<H>> + 'a,
    H: 'a,
{
    requests
        .into_iter()
        .any(|request| matches!(request, DialRequest::Dial { addr, .. } if *addr == expected))
}
/// Helper function that checks whether any of the given requests disconnects the expected
/// handle.
///
/// Accepts anything that iterates over request references, e.g. `&Option<_>` or `&Vec<_>`.
fn disconnects<'a, H, T>(expected: H, requests: T) -> bool
where
    T: IntoIterator<Item = &'a DialRequest<H>> + 'a,
    H: 'a + PartialEq,
{
    requests.into_iter().any(|request| match request {
        DialRequest::Disconnect { handle, .. } => *handle == expected,
        _ => false,
    })
}
/// Walks a single address through learn -> fail -> retry -> connect -> drop -> reconnect and
/// checks the dial requests and routing table along the way.
#[test]
fn successful_lifecycle() {
init_logging();
let mut rng = crate::new_rng();
let mut clock = TestClock::new();
let addr_a: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let id_a = NodeId::random(&mut rng);
let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());
// We begin by learning a single, regular address, triggering a dial request.
assert!(dials(
addr_a,
&manager.learn_addr(addr_a, false, clock.now())
));
// Our first connection attempt fails. The connection should now be in waiting state, but
// not reconnect, since the minimum delay is 2 seconds (2*base_timeout).
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_a,
error: TestDialerError { id: 1 },
when: clock.now(),
},)
.is_none());
// Performing housekeeping multiple times should not make a difference.
assert!(manager.perform_housekeeping(clock.now()).is_empty());
assert!(manager.perform_housekeeping(clock.now()).is_empty());
assert!(manager.perform_housekeeping(clock.now()).is_empty());
assert!(manager.perform_housekeeping(clock.now()).is_empty());
// Advancing the clock will trigger a reconnection on the next housekeeping.
clock.advance_time(2_000);
assert!(dials(addr_a, &manager.perform_housekeeping(clock.now())));
// This time the connection succeeds.
assert!(manager
.handle_dial_outcome(DialOutcome::Successful {
addr: addr_a,
handle: 99,
node_id: id_a,
},)
.is_none());
// The routing table should have been updated and should return the handle.
assert_eq!(manager.get_route(id_a), Some(&99));
assert_eq!(manager.get_addr(id_a), Some(addr_a));
// Time passes, and our connection drops. Reconnecting should be immediate.
assert!(manager.perform_housekeeping(clock.now()).is_empty());
clock.advance_time(20_000);
assert!(dials(
addr_a,
&manager.handle_connection_drop(addr_a, clock.now())
));
// The route should have been cleared.
assert!(manager.get_route(id_a).is_none());
assert!(manager.get_addr(id_a).is_none());
// Reconnection is already in progress, so we do not expect another request on housekeeping.
assert!(manager.perform_housekeeping(clock.now()).is_empty());
}
/// Fails a regular and an unforgettable address repeatedly, checking the exponential backoff
/// between retries and that only the regular address is forgotten once the retry limit is
/// exceeded.
#[test]
fn connections_forgotten_after_too_many_tries() {
init_logging();
let mut clock = TestClock::new();
// Address `addr_a` is a regular (forgettable) address.
let addr_a: SocketAddr = "1.2.3.4:1234".parse().unwrap();
// Address `addr_b` will be a known address, i.e. it is learned as unforgettable.
let addr_b: SocketAddr = "5.6.7.8:5678".parse().unwrap();
let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());
// First, attempt to connect. Tests are set to 3 retries after 2, 4 and 8 seconds.
assert!(dials(
addr_a,
&manager.learn_addr(addr_a, false, clock.now())
));
assert!(dials(
addr_b,
&manager.learn_addr(addr_b, true, clock.now())
));
// Fail the first connection attempts, not triggering a retry (timeout not reached yet).
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_a,
error: TestDialerError { id: 10 },
when: clock.now(),
},)
.is_none());
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_b,
error: TestDialerError { id: 11 },
when: clock.now(),
},)
.is_none());
// Learning the address again should not cause a reconnection.
assert!(manager.learn_addr(addr_a, false, clock.now()).is_none());
assert!(manager.learn_addr(addr_b, false, clock.now()).is_none());
assert!(manager.perform_housekeeping(clock.now()).is_empty());
assert!(manager.learn_addr(addr_a, false, clock.now()).is_none());
assert!(manager.learn_addr(addr_b, false, clock.now()).is_none());
// After 1.999 seconds, reconnection should still be delayed.
clock.advance_time(1_999);
assert!(manager.perform_housekeeping(clock.now()).is_empty());
// Adding 0.001 seconds finally is enough to reconnect.
clock.advance_time(1);
let requests = manager.perform_housekeeping(clock.now());
assert!(dials(addr_a, &requests));
assert!(dials(addr_b, &requests));
// Waiting for more than the reconnection delay should not be harmful or change
// anything, as we are currently connecting.
clock.advance_time(6_000);
assert!(manager.perform_housekeeping(clock.now()).is_empty());
// Fail the connection again, wait 3.999 seconds, expecting no reconnection.
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_a,
error: TestDialerError { id: 40 },
when: clock.now(),
},)
.is_none());
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_b,
error: TestDialerError { id: 41 },
when: clock.now(),
},)
.is_none());
clock.advance_time(3_999);
assert!(manager.perform_housekeeping(clock.now()).is_empty());
// Adding 0.001 seconds finally again pushes us over the threshold.
clock.advance_time(1);
let requests = manager.perform_housekeeping(clock.now());
assert!(dials(addr_a, &requests));
assert!(dials(addr_b, &requests));
// Fail the connection quickly.
clock.advance_time(25);
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_a,
error: TestDialerError { id: 10 },
when: clock.now(),
},)
.is_none());
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_b,
error: TestDialerError { id: 10 },
when: clock.now(),
},)
.is_none());
assert!(manager.perform_housekeeping(clock.now()).is_empty());
// The last attempt should happen 8 seconds after the error, not the last attempt.
clock.advance_time(7_999);
assert!(manager.perform_housekeeping(clock.now()).is_empty());
clock.advance_time(1);
let requests = manager.perform_housekeeping(clock.now());
assert!(dials(addr_a, &requests));
assert!(dials(addr_b, &requests));
// Fail the last attempt. No more reconnections should be happening.
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_a,
error: TestDialerError { id: 10 },
when: clock.now(),
},)
.is_none());
assert!(manager
.handle_dial_outcome(DialOutcome::Failed {
addr: addr_b,
error: TestDialerError { id: 10 },
when: clock.now(),
},)
.is_none());
// Only the unforgettable address should be reconnecting.
let requests = manager.perform_housekeeping(clock.now());
assert!(!dials(addr_a, &requests));
assert!(dials(addr_b, &requests));
// But not `addr_a`, even after a long wait.
clock.advance_time(1_000_000_000);
assert!(manager.perform_housekeeping(clock.now()).is_empty());
}
/// Blocks addresses while unknown, connected and connecting, and checks that blocks expire
/// after the configured time or can be lifted early through redemption.
#[test]
fn blocking_works() {
init_logging();
let mut rng = crate::new_rng();
let mut clock = TestClock::new();
let addr_a: SocketAddr = "1.2.3.4:1234".parse().unwrap();
// We use `addr_b` as an unforgettable address, which does not mean it cannot be blocked!
let addr_b: SocketAddr = "5.6.7.8:5678".parse().unwrap();
let addr_c: SocketAddr = "9.0.1.2:9012".parse().unwrap();
let id_a = NodeId::random(&mut rng);
let id_b = NodeId::random(&mut rng);
let id_c = NodeId::random(&mut rng);
let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());
// Block `addr_a` from the start.
assert!(manager.block_addr(addr_a, clock.now()).is_none());
// Learning both `addr_a` and `addr_b` should only trigger a connection to `addr_b` now.
assert!(manager.learn_addr(addr_a, false, clock.now()).is_none());
assert!(dials(
addr_b,
&manager.learn_addr(addr_b, true, clock.now())
));
assert!(manager.perform_housekeeping(clock.now()).is_empty());
// Fifteen seconds later we succeed in connecting to `addr_b`.
clock.advance_time(15_000);
assert!(manager
.handle_dial_outcome(DialOutcome::Successful {
addr: addr_b,
handle: 101,
node_id: id_b,
},)
.is_none());
assert_eq!(manager.get_route(id_b), Some(&101));
// Invariant through housekeeping.
assert!(manager.perform_housekeeping(clock.now()).is_empty());
assert_eq!(manager.get_route(id_b), Some(&101));
// Another fifteen seconds later, we block `addr_b`.
clock.advance_time(15_000);
assert!(disconnects(101, &manager.block_addr(addr_b, clock.now())));
// `addr_c` will be blocked during the connection phase.
assert!(dials(
addr_c,
&manager.learn_addr(addr_c, false, clock.now())
));
assert!(manager.block_addr(addr_c, clock.now()).is_none());
// We are still expected to provide a dial outcome, but afterwards, there should be no
// route to C and an immediate disconnection should be queued.
assert!(disconnects(
42,
&manager.handle_dial_outcome(DialOutcome::Successful {
addr: addr_c,
handle: 42,
node_id: id_c,
},)
));
assert!(manager.perform_housekeeping(clock.now()).is_empty());
assert!(manager.get_route(id_c).is_none());
// At this point, we have blocked all three addresses. 30 seconds later, the first one is
// unblocked due to the block timing out (60 seconds have passed since `addr_a` was blocked).
clock.advance_time(30_000);
assert!(dials(addr_a, &manager.perform_housekeeping(clock.now())));
// Fifteen seconds later, B and C are still blocked, but we redeem B early.
clock.advance_time(15_000);
assert!(manager.perform_housekeeping(clock.now()).is_empty());
assert!(dials(addr_b, &manager.redeem_addr(addr_b, clock.now())));
// Succeed both connections, and ensure we have routes to both.
assert!(manager
.handle_dial_outcome(DialOutcome::Successful {
addr: addr_b,
handle: 77,
node_id: id_b,
},)
.is_none());
assert!(manager
.handle_dial_outcome(DialOutcome::Successful {
addr: addr_a,
handle: 66,
node_id: id_a,
},)
.is_none());
assert_eq!(manager.get_route(id_a), Some(&66));
assert_eq!(manager.get_route(id_b), Some(&77));
}
/// Checks that an address detected as loopback is dialed only once and can neither be
/// re-learned nor blocked afterwards.
#[test]
fn loopback_handled_correctly() {
init_logging();
let mut clock = TestClock::new();
let loopback_addr: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());
// Loopback addresses are connected to only once, and then marked as loopback forever.
assert!(dials(
loopback_addr,
&manager.learn_addr(loopback_addr, false, clock.now())
));
assert!(manager
.handle_dial_outcome(DialOutcome::Loopback {
addr: loopback_addr,
},)
.is_none());
assert!(manager.perform_housekeeping(clock.now()).is_empty());
// Learning loopbacks again should not trigger another connection.
assert!(manager
.learn_addr(loopback_addr, false, clock.now())
.is_none());
// Blocking loopbacks does not result in a block, since regular blocks would clear after
// some time.
assert!(manager.block_addr(loopback_addr, clock.now()).is_none());
// Even after a very long time, housekeeping must not try to dial the loopback again.
clock.advance_time(1_000_000_000);
assert!(manager.perform_housekeeping(clock.now()).is_empty());
}
#[test]
/// Checks that `connected_peers` yields the node IDs of all successfully dialed addresses.
///
/// Two addresses are learned and both dials are reported as successful; the IDs returned by
/// `connected_peers` are then compared to the expected ones, ignoring order.
fn connected_peers_works() {
    init_logging();

    let mut rng = crate::new_rng();
    let clock = TestClock::new();

    let addr_a: SocketAddr = "1.2.3.4:1234".parse().unwrap();
    let addr_b: SocketAddr = "5.6.7.8:5678".parse().unwrap();
    let id_a = NodeId::random(&mut rng);
    let id_b = NodeId::random(&mut rng);

    let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());

    // Learn both addresses. The resulting requests are of no interest to this test.
    let _ = manager.learn_addr(addr_a, false, clock.now());
    let _ = manager.learn_addr(addr_b, true, clock.now());

    // Report a successful connection for each address, again discarding the results.
    let outcome_a = DialOutcome::Successful {
        addr: addr_a,
        handle: 22,
        node_id: id_a,
    };
    let _ = manager.handle_dial_outcome(outcome_a);

    let outcome_b = DialOutcome::Successful {
        addr: addr_b,
        handle: 33,
        node_id: id_b,
    };
    let _ = manager.handle_dial_outcome(outcome_b);

    // Sort both sides before comparing, so the order of `connected_peers` does not matter.
    let mut expected = vec![id_a, id_b];
    expected.sort();

    let mut peer_ids: Vec<_> = manager.connected_peers().collect();
    peer_ids.sort();

    assert_eq!(peer_ids, expected);
}
#[test]
/// Exercises two overlapping connection attempts to the same address.
///
/// The first attempt is left without an outcome long enough for housekeeping to trigger a second
/// dial. The second attempt then succeeds before the first one does; the test pins which handle
/// `get_route` returns after each of the two successful outcomes.
fn sweeping_works() {
    init_logging();

    let mut rng = crate::new_rng();
    let mut clock = TestClock::new();

    let addr_a: SocketAddr = "1.2.3.4:1234".parse().unwrap();
    let id_a = NodeId::random(&mut rng);

    let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());

    // Learning the address starts the first connection attempt.
    let initial_dial = manager.learn_addr(addr_a, false, clock.now());
    assert!(dials(addr_a, &initial_dial));

    // Let enough time pass for the pending attempt to be considered failed/aborted.
    // Housekeeping is not expected to emit anything at this point.
    clock.advance_time(50_000);
    let after_timeout = manager.perform_housekeeping(clock.now());
    assert!(after_timeout.is_empty());

    // The attempt is now treated like a regular failure. As it is the first failure for this
    // address, the reconnection is expected two seconds later.
    clock.advance_time(2_000);
    let redial = manager.perform_housekeeping(clock.now());
    assert!(dials(addr_a, &redial));

    // One second later the second attempt (`handle: 2`) completes first.
    clock.advance_time(1_000);
    let second_attempt = DialOutcome::Successful {
        addr: addr_a,
        handle: 2,
        node_id: id_a,
    };
    let second_result = manager.handle_dial_outcome(second_attempt);
    assert!(second_result.is_none());

    // The route goes through the second connection for now.
    assert_eq!(manager.get_route(id_a), Some(&2));

    // Much later, the first connection attempt finally completes as well.
    clock.advance_time(30_000);
    let first_attempt = DialOutcome::Successful {
        addr: addr_a,
        handle: 1,
        node_id: id_a,
    };
    let first_result = manager.handle_dial_outcome(first_attempt);
    assert!(first_result.is_none());

    // The route is now expected to go through the first connection (see documentation).
    assert_eq!(manager.get_route(id_a), Some(&1));
}
#[test]
/// Checks that a failed dial outcome arriving for a blocked address does not lift the block.
///
/// The address is blocked without ever having been dialed, then a `DialOutcome::Failed` for it is
/// fed to the manager. The address must be reported as blocked before and after the outcome, as
/// well as after a subsequent housekeeping run.
fn blocking_not_overridden_by_racing_failed_connections() {
    init_logging();

    let mut clock = TestClock::new();
    let addr_a: SocketAddr = "1.2.3.4:1234".parse().unwrap();
    let mut manager = OutgoingManager::<u32, TestDialerError>::new(test_config());

    // Nothing has been blocked yet.
    assert!(!manager.is_blocked(addr_a));

    // Block `addr_a` from the start. No connection exists, so no request is expected.
    assert!(manager.block_addr(addr_a, clock.now()).is_none());
    assert!(manager.is_blocked(addr_a));

    clock.advance_time(60);

    // Receive an "illegal" dial outcome, even though we did not dial.
    assert!(manager
        .handle_dial_outcome(DialOutcome::Failed {
            addr: addr_a,
            error: TestDialerError { id: 12345 },
            // The moment the connection attempt failed. (Plain comment on purpose: a `///` doc
            // comment on a struct expression field is never rendered and trips the
            // `unused_doc_comments` lint.)
            when: clock.now(),
        })
        .is_none());

    // The failed connection should _not_ have reset the block!
    assert!(manager.is_blocked(addr_a));

    // The block must also still be in place a little later, both before and after housekeeping,
    // and housekeeping must not emit any request for the address.
    clock.advance_time(60);
    assert!(manager.is_blocked(addr_a));
    assert!(manager.perform_housekeeping(clock.now()).is_empty());
    assert!(manager.is_blocked(addr_a));
}
}