use std::time::{
Duration,
SystemTime
};
use serde::{Serialize, Deserialize};
use crate::control::Conspirator;
use crate::message::PingOrPong;
use super::{
Address,
Clock,
link::{
Link,
LinkTickResult
},
NodeID
};
use super::status::{
elapsed,
remaining,
PeerState,
Status
};
use crate::util::*;
// Timing and threshold constants for peer/link supervision.

/// NOTE(review): not referenced in this file; presumably the idle time after
/// which a link is pinged — confirm against the `link` module.
pub const TIME_TO_PING: Duration = Duration::from_secs(30);
/// NOTE(review): not referenced in this file; numerically equal to
/// `TIME_TO_PING + REPING_DELAY` — confirm the intended meaning in `link`.
pub const TIME_TO_DOUBLE_PING: Duration = Duration::from_secs(33);
/// Base delay used by `reping_delay`; it is doubled for every unanswered ping.
pub const REPING_DELAY: Duration = Duration::from_secs(3);
/// Upper bound on the delay returned by `reping_delay`.
pub const REPING_DELAY_MAX: Duration = Duration::from_secs(60);
/// Window passed to `remaining` in `Peer::reach_out`, and the `next_tick_in`
/// reported right after a reach-out attempt has been scheduled.
pub const REACH_OUT_TIME: Duration = Duration::from_secs(300);
/// Window passed to `remaining` together with `seen_by_peer` in `Peer::tick`;
/// an `Indirect` peer goes unreachable once that call yields `None`.
pub const INDIRECT_TIMEOUT: Duration = Duration::from_secs(90);
/// NOTE(review): not referenced in this file; presumably the number of
/// unanswered pings after which a link is reported suspicious — confirm.
pub const PINGS_UNTIL_SUSPICIOUS: u8 = 5;
/// NOTE(review): not referenced in this file; presumably the number of
/// unanswered pings after which a link is reported unreachable — confirm.
pub const PINGS_UNTIL_UNREACHABLE: u8 = 10;
use Status::*;
/// Local view of one remote node: identity, logical clock, reachability
/// status, the current direct link (if any) and reach-out bookkeeping.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Peer {
/// Identifier of the remote node.
pub id: NodeID,
/// Display name of the remote node (`"unknown"` for placeholder peers).
pub name: String,
/// Newest clock value accepted for this peer; only replaced by strictly
/// greater values (see `new_clock` and `absorb_peer_state`).
clk: Clock,
/// Current reachability status.
state: Status,
/// Direct link to the peer, if one exists.
link: Option<Link>,
/// Number of addresses known for this peer; `reach_out` does nothing while
/// this is zero.
known_addrs: usize,
/// When set, `get_pending_address` exposes the link address and `Display`
/// appends "(new address, sync pending)". Only initialised (to `false`) in
/// this file; presumably toggled by the owner of the peer — confirm.
pub new_addr_pending: bool,
/// Stamped via `set_now()` when a newer report from another node lists this
/// peer as `Active` or `Indirect`; checked against `INDIRECT_TIMEOUT` in `tick`.
seen_by_peer: Option<SystemTime>,
/// Counter incremented on every reach-out attempt and handed out through
/// `TickResult::reach_out_to`.
addr_reach_out_idx: usize,
/// Time of the most recent reach-out attempt; cleared to request a new
/// attempt without waiting for `REACH_OUT_TIME`.
reached_out_to: Option<SystemTime>
}
/// Outcome of `Peer::tick`: what the caller has to do now and when to tick
/// this peer again.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TickResult {
/// Address of the peer's link when a ping is due, otherwise `None`.
pub address_to_ping: Option<Address>,
/// The new status if this tick changed the peer's status.
pub state_changed: Option<Status>,
/// Value of the peer's reach-out counter when a reach-out attempt is due.
/// NOTE(review): presumably used by the caller to pick one of the known
/// addresses — confirm against the caller.
pub reach_out_to: Option<usize>,
/// Delay after which the peer wants to be ticked again.
pub next_tick_in: Duration
}
/// Outcome of `Peer::has_pnged`.
pub struct LinkUpdateResult {
/// `true` if the peer's status was anything other than `Active` before the
/// ping/pong was processed.
pub was_inactive: bool,
/// `true` if the packet's source address did not match an existing link, so
/// a fresh link was created for it.
pub new_address: bool
}
/// Returns how long to wait before pinging again after `unponged`
/// consecutive unanswered pings.
///
/// The delay is `REPING_DELAY * 2^unponged`, capped at `REPING_DELAY_MAX`.
///
/// Every `u8` input is valid: when the power or the multiplication would
/// overflow, the result saturates to `REPING_DELAY_MAX`. The unchecked
/// `u32::pow` used previously overflowed for `unponged >= 32`, which panics
/// in debug builds and wraps to a zero delay in release builds.
pub fn reping_delay(unponged: u8) -> Duration {
    2u32.checked_pow(u32::from(unponged))
        .and_then(|factor| REPING_DELAY.checked_mul(factor))
        // Overflow means the true value is far beyond the cap anyway.
        .map_or(REPING_DELAY_MAX, |delay| delay.min(REPING_DELAY_MAX))
}
impl Peer {
pub fn unknown(id: NodeID) -> Self {
Self {
id,
name: "unknown".to_string(),
clk: Clock::null_clock(),
state: Unreachable,
link: None,
known_addrs: 0,
new_addr_pending: false,
seen_by_peer: None,
addr_reach_out_idx: 0,
reached_out_to: None
}
}
pub fn new(id: NodeID, name: String, address: Address) -> Self {
Self {
id,
name,
clk: Clock::null_clock(),
state: Unreachable,
link: Some(Link::new(address)),
known_addrs: 1,
new_addr_pending: false,
seen_by_peer: None,
addr_reach_out_idx: 0,
reached_out_to: None
}
}
/// Rebuilds a peer from previously known values.
///
/// Identity, clock, status and address count are taken from the arguments.
/// The link and all reach-out bookkeeping start out empty.
pub fn load(
    id: NodeID,
    name: String,
    clk: Clock,
    state: Status,
    known_addrs: usize,
) -> Self {
    Self {
        // Caller-supplied values.
        id,
        name,
        clk,
        state,
        known_addrs,
        // Bookkeeping always starts from scratch.
        link: None,
        new_addr_pending: false,
        seen_by_peer: None,
        reached_out_to: None,
        addr_reach_out_idx: 0,
    }
}
pub fn is_active(&self) -> bool {
self.state == Active
}
pub fn get_info(&self) -> PeerInfo {
PeerInfo {
id: self.id,
name: self.name.clone()
}
}
/// Returns a snapshot of the peer's ID, clock and status.
pub fn get_peer_state(&self) -> PeerState {
    let (id, clk, state) = (self.id, self.clk, self.state);
    PeerState { id, clk, state }
}
/// Returns the address of the current link, or `None` if there is no link.
pub fn get_address(&self) -> Option<Address> {
    self.link.as_ref().map(|link| link.address)
}
/// Returns the link address, but only while `new_addr_pending` is set.
///
/// Yields `None` when the flag is clear or when there is no link.
pub fn get_pending_address(&self) -> Option<Address> {
    if !self.new_addr_pending {
        return None;
    }
    self.link.as_ref().map(|link| link.address)
}
/// Returns the link address to ping and tells the link it has been pinged.
///
/// The link's `been_pinged()` is called as a side effect, so the caller is
/// expected to actually send the ping. Returns `None`, with no side effect,
/// when there is no link.
#[must_use = "link marked as pinged"]
pub fn get_address_to_ping(&mut self) -> Option<Address> {
    let link = self.link.as_mut()?;
    link.been_pinged();
    Some(link.address)
}
/// Returns `elapsed(link.seen)` for the current link, or `None` when there
/// is no link.
pub fn last_seen(&self) -> Option<Duration> {
    self.link.as_ref().and_then(|link| elapsed(link.seen))
}
/// Records how many addresses are known for this peer.
///
/// When the count rises from zero while the peer is `Indirect` or
/// `Unreachable`, the reach-out timestamp is cleared first.
pub fn update_address_count(&mut self, addr_count: usize) {
    let first_address_learned = self.known_addrs == 0 && addr_count >= 1;
    if first_address_learned && matches!(self.state, Indirect | Unreachable) {
        self.reached_out_to = None;
    }
    self.known_addrs = addr_count;
}
/// Accepts `pkt_clock` if it is strictly newer than the stored clock.
///
/// Returns `true` and stores the clock when it is newer. Otherwise it logs a
/// warning, leaves the stored clock untouched and returns `false`.
pub fn new_clock(&mut self, pkt_clock: Clock) -> bool {
    let is_newer = pkt_clock > self.clk;
    if is_newer {
        self.clk = pkt_clock;
    } else {
        warn!(%self.id, ?self.clk, ?pkt_clock, "packet clock outdated");
    }
    is_newer
}
/// Processes a ping or pong received from `from`.
///
/// The peer becomes `Active`. If the current link already points at `from`,
/// the packet is forwarded to it. Otherwise the link is replaced by a fresh
/// one for `from`, which then receives the packet.
///
/// The result reports whether the peer was not `Active` before the call and
/// whether a new link had to be created.
pub fn has_pnged(&mut self, from: Address, png: PingOrPong)
    -> LinkUpdateResult
{
    let was_inactive = self.state != Active;
    self.state = Active;

    // Fast path: the packet arrived on the link we already track.
    if let Some(link) = self.link.as_mut().filter(|l| l.address == from) {
        link.has_pnged(png);
        return LinkUpdateResult { was_inactive, new_address: false };
    }

    // No link yet, or the peer is talking to us from a different address.
    self.link.insert(Link::new(from)).has_pnged(png);
    LinkUpdateResult { was_inactive, new_address: true }
}
/// Merges a peer state reported by another node into the local view.
///
/// A report is evaluated against the transition table below only when its
/// clock is strictly newer than the stored one; in that case the stored
/// clock is updated even if the status stays the same. The one exception is
/// a report with an equal clock that says `Unreachable`: it downgrades a
/// peer that is not yet `Unreachable` locally.
///
/// Returns `Some(new_status)` when the local status changed, `None` otherwise.
#[tracing::instrument]
pub fn absorb_peer_state(&mut self, state: PeerState) -> Option<Status> {
if self.clk < state.clk {
// Newer report: adopt its clock regardless of the status outcome.
self.clk = state.clk;
// Tuple order is (local status, reported status).
match (self.state, state.state) {
// Both sides consider the peer alive: note the sighting, keep the status.
(Active | Indirect, Active | Indirect) => {
// `set_now()` presumably stamps the field with the current time —
// confirm against the helper's definition.
self.seen_by_peer.set_now();
None
},
// Report agrees with the local status: nothing to do.
(Suspicious, Suspicious) |
(Quit, Quit) |
(Unreachable, Unreachable) => None,
// These combinations keep the local status unchanged.
(Unreachable, Suspicious) => None,
(Suspicious, Indirect) => None,
(Indirect, Suspicious) => None,
// For these combinations the reported status is adopted as-is.
(Active, new @ (Quit | Unreachable | Suspicious)) |
(Indirect, new @ (Quit | Unreachable)) |
(Suspicious, new @ (Active | Quit | Unreachable)) |
(Quit, new @ (Active | Unreachable | Suspicious | Indirect)) |
(Unreachable, new @ Quit) => {
self.state = new;
Some(new)
},
// Locally unreachable, but the reporter sees the peer: it becomes
// `Indirect` (never directly `Active`) and the sighting is noted.
(Unreachable, Active | Indirect) => {
self.seen_by_peer.set_now();
self.state = Indirect;
Some(Indirect)
}
}
}
// Same clock: only an `Unreachable` report can still change the status.
else if self.clk == state.clk
&& state.state == Unreachable
&& self.state != Unreachable
{
self.state = Unreachable;
Some(Unreachable)
}
else {
// Older report, or an equal-clock report that changes nothing.
None
}
}
/// Marks the peer as having quit.
///
/// Logs the event, sets the status to `Quit`, drops the link and calls
/// `set_now()` on the reach-out timestamp.
pub fn has_quit(&mut self) {
    info!("{} quit", self.name);
    self.state = Quit;
    self.link = None;
    self.reached_out_to.set_now();
}
#[tracing::instrument(skip_all, fields(self = %self))]
#[must_use = "assumes due ping will be sent"]
pub fn tick(&mut self) -> TickResult {
let result = match self.state {
Active | Suspicious => self.tick_link(),
Indirect => match remaining(self.seen_by_peer, INDIRECT_TIMEOUT) {
Some(time) => {
let mut res = self.reach_out();
res.next_tick_in = res.next_tick_in.min(time);
res
},
None => self.go_unreachable()
},
Quit | Unreachable => self.reach_out()
};
debug!(?result);
result
}
/// Ticks the current link and translates its verdict into a `TickResult`.
///
/// * `Ok`: nothing to do until the given time.
/// * `PingDue`: the link address is returned for pinging.
/// * `LinkSuspicious`: the peer becomes `Suspicious` and is pinged.
/// * `LinkUnreachable`: the peer goes unreachable.
///
/// Calling this without a link is logged as an error and treated like an
/// unreachable link.
fn tick_link(&mut self) -> TickResult {
    use LinkTickResult as Ltr;

    let verdict = match self.link.as_mut() {
        Some(link) => Link::tick(link),
        None => {
            error!("called tick_link() without active link");
            return self.go_unreachable();
        }
    };

    let (address_to_ping, state_changed, next_tick_in) = match verdict {
        Ltr::Ok(time) => (None, None, time),
        Ltr::PingDue(time) => (self.get_address(), None, time),
        Ltr::LinkSuspicious(time) => {
            // A link exists on this path, so the address is always present.
            info!(addr = %self.get_address().unwrap(), "link suspicious");
            self.state = Suspicious;
            (self.get_address(), Some(Suspicious), time)
        }
        Ltr::LinkUnreachable => return self.go_unreachable(),
    };

    TickResult {
        address_to_ping,
        state_changed,
        reach_out_to: None,
        next_tick_in,
    }
}
fn go_unreachable(&mut self) -> TickResult {
info!(addr = ?self.get_address(), "link is unreachable");
self.state = Unreachable;
self.reached_out_to.take();
let mut res = self.reach_out();
res.state_changed.replace(Unreachable);
res
}
/// Decides whether a reach-out attempt is due.
///
/// * No known addresses: nothing to do, `next_tick_in` is `Duration::MAX`.
/// * `remaining(reached_out_to, REACH_OUT_TIME)` yields a time: wait that long.
/// * Otherwise: record the attempt time, bump the reach-out counter and
///   return it in `reach_out_to`; the next tick is due in `REACH_OUT_TIME`.
fn reach_out(&mut self) -> TickResult {
    // Queried first, even when there are no addresses to reach out to.
    let cooldown = remaining(self.reached_out_to, REACH_OUT_TIME);
    let idle_for = |next_tick_in: Duration| TickResult {
        address_to_ping: None,
        state_changed: None,
        reach_out_to: None,
        next_tick_in,
    };

    if self.known_addrs == 0 {
        return idle_for(Duration::MAX);
    }
    if let Some(next_tick_in) = cooldown {
        return idle_for(next_tick_in);
    }

    self.reached_out_to = Some(SystemTime::now());
    self.addr_reach_out_idx += 1;
    TickResult {
        address_to_ping: None,
        state_changed: None,
        reach_out_to: Some(self.addr_reach_out_idx),
        next_tick_in: REACH_OUT_TIME,
    }
}
}
impl From<Peer> for Conspirator {
fn from(other: Peer) -> Conspirator {
Conspirator {
id: other.id.into(),
name: other.name,
state: other.state,
link: other.link
}
}
}
/// One-line summary of a peer.
///
/// Always prints `[id] name (status)`. The clock (when not the null clock),
/// the link, the pending-address marker, the `seen_by_peer` time and the
/// reach-out bookkeeping are appended only when present.
impl std::fmt::Display for Peer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}] {} ({:?})", self.id, self.name, self.state)?;

        let has_clock = self.clk != Clock::null_clock();
        if has_clock {
            write!(f, " @ {}", self.clk)?;
        }
        if let Some(l) = &self.link {
            write!(f, ", Link {{{l}}}")?;
        }
        if self.new_addr_pending {
            f.write_str(" (new address, sync pending)")?;
        }
        if let Some(seen) = self.seen_by_peer {
            f.write_str(", seen by peer ")?;
            fmt_time(f, seen)?;
        }

        let has_reached_out =
            self.addr_reach_out_idx != 0 || self.reached_out_to.is_some();
        if !has_reached_out {
            return Ok(());
        }

        write!(
            f,
            ", reached out: (idx: {})",
            self.addr_reach_out_idx + 1
        )?;
        if let Some(reached_out) = self.reached_out_to {
            f.write_str(" ")?;
            fmt_time(f, reached_out)?;
        }
        Ok(())
    }
}