use std::{collections::BTreeSet, mem, net::SocketAddr, time::Instant};
use datasize::DataSize;
use tracing::{debug, warn};
#[derive(DataSize, Debug, Default)]
pub(super) enum ConnectionSymmetry {
IncomingOnly {
since: Instant,
peer_addrs: BTreeSet<SocketAddr>,
},
OutgoingOnly {
since: Instant,
},
Symmetric {
peer_addrs: BTreeSet<SocketAddr>,
},
#[default]
Gone,
}
impl ConnectionSymmetry {
pub(super) fn add_incoming(&mut self, peer_addr: SocketAddr, since: Instant) -> bool {
match self {
ConnectionSymmetry::IncomingOnly {
ref mut peer_addrs, ..
} => {
peer_addrs.insert(peer_addr);
debug!(
total_incoming_count = peer_addrs.len(),
"added additional incoming connection on non-symmetric"
);
false
}
ConnectionSymmetry::OutgoingOnly { .. } => {
let mut peer_addrs = BTreeSet::new();
peer_addrs.insert(peer_addr);
*self = ConnectionSymmetry::Symmetric { peer_addrs };
debug!("added incoming connection, now symmetric");
true
}
ConnectionSymmetry::Symmetric { peer_addrs } => {
peer_addrs.insert(peer_addr);
debug!(
total_incoming_count = peer_addrs.len(),
"added additional incoming connection on symmetric"
);
false
}
ConnectionSymmetry::Gone => {
let mut peer_addrs = BTreeSet::new();
peer_addrs.insert(peer_addr);
*self = ConnectionSymmetry::IncomingOnly { peer_addrs, since };
debug!("added incoming connection, now incoming only");
false
}
}
}
pub(super) fn remove_incoming(&mut self, peer_addr: SocketAddr, now: Instant) -> bool {
match self {
ConnectionSymmetry::IncomingOnly { peer_addrs, .. } => {
if !peer_addrs.remove(&peer_addr) {
warn!("tried to remove non-existent incoming connection from symmetry");
}
if peer_addrs.is_empty() {
*self = ConnectionSymmetry::Gone;
debug!("removed incoming connection, now gone");
false
} else {
debug!(
total_incoming_count = peer_addrs.len(),
"removed incoming connection, still has remaining incoming"
);
true
}
}
ConnectionSymmetry::OutgoingOnly { .. } => {
warn!("cannot remove incoming connection from outgoing-only");
true
}
ConnectionSymmetry::Symmetric { peer_addrs } => {
if !peer_addrs.remove(&peer_addr) {
warn!("tried to remove non-existent symmetric connection from symmetry");
}
if peer_addrs.is_empty() {
*self = ConnectionSymmetry::OutgoingOnly { since: now };
debug!("removed incoming connection, now incoming-only");
}
true
}
ConnectionSymmetry::Gone => {
warn!("removing incoming connection from already gone symmetry");
false
}
}
}
pub(super) fn mark_outgoing(&mut self, now: Instant) -> bool {
match self {
ConnectionSymmetry::IncomingOnly { peer_addrs, .. } => {
debug!("incoming connection marked outgoing, now complete");
*self = ConnectionSymmetry::Symmetric {
peer_addrs: mem::take(peer_addrs),
};
true
}
ConnectionSymmetry::OutgoingOnly { .. } => {
warn!("outgoing connection marked outgoing");
false
}
ConnectionSymmetry::Symmetric { .. } => {
warn!("symmetric connection marked outgoing");
false
}
ConnectionSymmetry::Gone => {
*self = ConnectionSymmetry::OutgoingOnly { since: now };
debug!("absent connection marked outgoing");
false
}
}
}
pub(super) fn unmark_outgoing(&mut self, now: Instant) -> bool {
match self {
ConnectionSymmetry::IncomingOnly { .. } => {
warn!("incoming-only unmarked outgoing");
true
}
ConnectionSymmetry::OutgoingOnly { .. } => {
*self = ConnectionSymmetry::Gone;
debug!("outgoing connection unmarked, now gone");
false
}
ConnectionSymmetry::Symmetric { peer_addrs } => {
*self = ConnectionSymmetry::IncomingOnly {
peer_addrs: mem::take(peer_addrs),
since: now,
};
debug!("symmetric connection unmarked, now outgoing only");
true
}
ConnectionSymmetry::Gone => {
warn!("gone marked outgoing");
false
}
}
}
pub(super) fn incoming_addrs(&self) -> Option<&BTreeSet<SocketAddr>> {
match self {
ConnectionSymmetry::IncomingOnly { peer_addrs, .. }
| ConnectionSymmetry::Symmetric { peer_addrs, .. } => Some(peer_addrs),
ConnectionSymmetry::OutgoingOnly { .. } | ConnectionSymmetry::Gone => None,
}
}
}
#[cfg(test)]
mod tests {
use std::{
collections::BTreeSet,
net::SocketAddr,
time::{Duration, Instant},
};
use crate::testing::test_clock::TestClock;
use super::ConnectionSymmetry;
fn should_be_reaped(
connection_symmetry: &ConnectionSymmetry,
now: Instant,
max_time_asymmetric: Duration,
) -> bool {
match connection_symmetry {
ConnectionSymmetry::IncomingOnly { since, .. } => now >= *since + max_time_asymmetric,
ConnectionSymmetry::OutgoingOnly { since } => now >= *since + max_time_asymmetric,
ConnectionSymmetry::Symmetric { .. } => false,
ConnectionSymmetry::Gone => true,
}
}
#[test]
fn symmetry_successful_lifecycles() {
let mut clock = TestClock::new();
let max_time_asymmetric = Duration::from_secs(240);
let peer_addr: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let mut sym = ConnectionSymmetry::default();
assert!(should_be_reaped(&sym, clock.now(), max_time_asymmetric));
sym.add_incoming(peer_addr, clock.now());
assert!(!should_be_reaped(&sym, clock.now(), max_time_asymmetric));
clock.advance(Duration::from_secs(20));
sym.mark_outgoing(clock.now());
clock.advance(Duration::from_secs(1_000_000));
assert!(!should_be_reaped(&sym, clock.now(), max_time_asymmetric));
}
#[test]
fn symmetry_lifecycle_reaps_incoming_only() {
let mut clock = TestClock::new();
let max_time_asymmetric = Duration::from_secs(240);
let peer_addr: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let peer_addr2: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let mut sym = ConnectionSymmetry::default();
sym.add_incoming(peer_addr, clock.now());
assert!(!should_be_reaped(&sym, clock.now(), max_time_asymmetric));
clock.advance(Duration::from_secs(120));
sym.add_incoming(peer_addr2, clock.now());
assert!(!should_be_reaped(&sym, clock.now(), max_time_asymmetric));
let mut expected = BTreeSet::new();
expected.insert(peer_addr);
expected.insert(peer_addr2);
assert_eq!(sym.incoming_addrs(), Some(&expected));
clock.advance(Duration::from_secs(120));
assert!(should_be_reaped(&sym, clock.now(), max_time_asymmetric));
}
#[test]
fn symmetry_lifecycle_reaps_outgoing_only() {
let mut clock = TestClock::new();
let max_time_asymmetric = Duration::from_secs(240);
let mut sym = ConnectionSymmetry::default();
sym.mark_outgoing(clock.now());
assert!(!should_be_reaped(&sym, clock.now(), max_time_asymmetric));
clock.advance(Duration::from_secs(120));
assert!(!should_be_reaped(&sym, clock.now(), max_time_asymmetric));
clock.advance(Duration::from_secs(120));
assert!(should_be_reaped(&sym, clock.now(), max_time_asymmetric));
}
}