use std::time::Duration;
use std::sync::atomic::Ordering;
#[cfg(target_has_atomic = "64")]
use std::sync::atomic::AtomicU64;
#[cfg(not(target_has_atomic = "64"))]
use portable_atomic::AtomicU64;
use crypto_box::SecretKey;
use dashmap::DashMap;
use rusqlite::{
Error,
types::Type,
Result as SqlResult
};
use tracing::instrument;
use crate::control::{
Conspirator as InlineConspirator,
CnsprcyStatus,
KeyArg,
NodeArg,
NodeReq
};
use crate::event::{
AnyEvent,
PeerEvent
};
use crate::message::{
self,
DynamicPayload,
Payload,
Packet,
PingOrPong,
ProtocolPayload
};
use crate::peer::{
Address,
AtomicClock,
data::TickResult,
Invitation,
induct::Content as InvitationContent,
NodeID as ID,
Peer,
PeerState,
Status
};
use crate::store::{
Store,
Action,
Config,
Conspirator,
Op
};
use crate::util::*;
/// The local participant: own identity and clock, the persistent op store,
/// the table of currently known peers and the channels used to emit packets
/// and events.
#[derive(Debug)]
pub struct Node {
/// This node's own ID, read from `Config::ID` when the node is loaded.
id: ID,
/// This node's own name, taken from its own conspirator entry in the store
/// and refreshed whenever absorbed ops change that entry.
name: String,
/// Own logical clock; `next()` stamps every outgoing packet.
clk: AtomicClock,
/// Cached value of `db.hash()`; sent as `hsh` in every packet and compared
/// with the `hsh` of incoming packets to detect divergence.
hash: AtomicU64,
/// Channel on which `(Address, Packet)` pairs are handed off for sending.
outbound: Sender<PktTo>,
/// Channel on which peer and payload events are published.
dispatcher_sender: Sender<AnyEvent>,
/// Persistent store for config values, ops, conspirators and addresses.
db: Store,
/// Known peers keyed by ID; never contains an entry for `id` itself as far
/// as the code in this file is concerned.
table: DashMap<ID, Peer>
}
/// Outcome of validating an incoming packet against the local peer table.
#[derive(Debug)]
pub enum ValidationResult {
    /// The packet was accepted and the sender has no pending address.
    Ok,
    /// The packet was accepted and the sender has a pending address that is
    /// not yet recorded in the store.
    NewAddress(Address),
    /// The sender is a known conspirator that has been deactivated.
    Disabled,
    /// The packet must be ignored (wrong destination, stale clock, or the
    /// sender could not be tracked).
    Invalid
}
impl Node {
#[instrument(skip_all, level = "debug")]
pub fn load(
db: Store,
outbound: Sender<PktTo>,
dispatcher_sender: Sender<AnyEvent>)
-> SqlResult<Option<Self>>
{
let own_id = db.read_config(Config::ID)
.inspect_err(|err| error!(%err, "failed to load own id"))?
.ok_or(Error::QueryReturnedNoRows)?
.parse::<ID>()
.map_err(|err| Error::FromSqlConversionFailure(
1,
Type::Text,
format!("failed to load own node ID: {err}").into()
))?;
debug!(%own_id, "loaded");
let mut clock: Option<AtomicClock> = None;
let mut name: Option<String> = None;
let table = DashMap::new();
for (conspirator, addr_count) in db.get_conspirators()? {
debug!(?conspirator, "loading");
if conspirator.id == own_id {
clock = Some(AtomicClock::load(conspirator.clock));
name = Some(conspirator.name);
if !conspirator.active {
warn!("node has been disabled!");
return Ok(None);
}
}
else if conspirator.active {
let mut peer = Peer::load(
conspirator.id,
conspirator.name,
conspirator.clock,
conspirator.state,
addr_count
);
peer.update_address_count(addr_count);
table.insert(conspirator.id, peer);
}
}
let hash = db.hash()?.into();
let Some((name, clk)) = name.zip(clock) else {
error!(%own_id, "corrupt db: missing own entry among conspirators");
return Err(Error::QueryReturnedNoRows);
};
Ok(Some(Self {
id: own_id,
name,
clk,
hash,
outbound,
dispatcher_sender,
db,
table
}))
}
/// Creates a brand-new node in `db` and loads it.
///
/// Stores `id` as the own ID, authors and absorbs an op that names the node,
/// then delegates to [`Node::load`]. Returns the node together with the
/// receiving ends of its event and outbound-packet channels.
///
/// # Panics
/// Panics if the freshly created node loads as disabled.
pub fn init(id: ID, name: String, db: Store)
    -> SqlResult<(Self, Receiver<AnyEvent>, Receiver<PktTo>)>
{
    let (event_tx, event_rx) = new_channel::<AnyEvent>();
    let (packet_tx, packet_rx) = new_channel::<PktTo>();

    db.write_config(Config::ID, &id.to_string())?;
    let naming = db.create_op(id, id, Action::Name(name))?;
    db.absorb_op(naming)?;

    let node = Self::load(db, packet_tx, event_tx)?
        .expect("newly created node should be enabled");
    Ok((node, event_rx, packet_rx))
}
/// Consumes the node and hands back its store (test builds only).
#[cfg(test)]
pub fn stop(self) -> Store {
    let Self { db, .. } = self;
    db
}
fn hash(&self) -> u64 {
self.hash.load(Ordering::Acquire)
}
fn update_hash(&self) {
match self.db.hash() {
Ok(hash) => self.hash.store(hash, Ordering::Release),
Err(err) => error!(%err, "failed to update hash")
}
}
/// Collects the peer state of every peer in the table except `to`, the
/// recipient the vector is being built for.
fn vector_clock_for(&self, to: ID) -> Vec<PeerState> {
    let mut states = Vec::new();
    for entry in self.table.iter() {
        if entry.id != to {
            states.push(entry.get_peer_state());
        }
    }
    states
}
/// Builds the opening `Sync` payload: the store's current counters and no
/// ops. Falls back to `Payload::None` (and logs) when the counters cannot be
/// read.
fn init_sync(&self) -> Payload {
    let counters = match self.db.vector() {
        Ok(counters) => counters,
        Err(err) => {
            error!(%err, "failed to construct Sync payload");
            return Payload::None;
        }
    };
    ProtocolPayload::Sync { counters, ops: Vec::new() }.into()
}
/// Inserts `new` into the peer table.
///
/// Refuses (returning `false`) when a peer with that ID is already tracked,
/// when the ID is the node's own, or when the store knows the ID as a
/// deactivated conspirator. If the peer carries an address the store does
/// not know yet, the peer is flagged as having a pending new address.
#[instrument(skip_all, fields(peer = %new), level = "debug")]
fn add_new_peer(&self, mut new: Peer) -> bool {
    if let Some(old) = self.table.get(&new.id) {
        error!(old = ?old.value(), ?new, "attempted to clobber peer");
        return false;
    }
    if new.id == self.id {
        error!(%self.id, %new.name, "tried to add peer with own id");
        return false;
    }
    // A failed lookup is treated like "not disabled", as is an unknown ID.
    let known_disabled = matches!(
        self.db.get::<Conspirator>(new.id),
        Ok(Some(c)) if !c.active
    );
    if known_disabled {
        warn!("tried to re-add disabled peer");
        return false;
    }

    if let Some(addr) = new.get_address() {
        match self.db.new_address(new.id, addr) {
            Ok(is_new) => new.new_addr_pending |= is_new,
            Err(err) => error!(
                %err,
                %addr,
                "failed to determine whether address is new"
            )
        }
    }
    self.table.insert(new.id, new);
    info!("added");
    true
}
/// Looks up the peer designated by `node_arg` (by ID, or by the first
/// table entry whose name matches) and applies `f` to it while the table
/// entry is held mutably.
///
/// Returns `None` when no such peer is tracked; `f` is not called then.
/// `f` is invoked at most once, hence the `FnOnce` bound (any `Fn` or
/// `FnMut` closure is still accepted).
fn resolve_and_then<F, R>(&self, node_arg: NodeArg, f: F) -> Option<R>
    where F: FnOnce(&mut Peer) -> R
{
    match node_arg {
        NodeArg::ID(id) => self.table
            .get_mut(&id.into())
            .map(|mut p| f(&mut p)),
        NodeArg::Name(name) => self.table
            .iter_mut()
            .find(|p| p.name == name)
            .map(|mut p| f(&mut p))
    }
}
/// Authors an `AddAddress` op for peer `id`, absorbs it locally and returns
/// it so the caller can forward it.
///
/// # Errors
/// Fails when the op cannot be created in the store.
#[instrument(skip(self), level = "debug")]
fn add_address(&mut self, id: ID, addr: Address) -> SqlResult<Op> {
    let action = Action::AddAddress(addr);
    let op = self.db.create_op(self.id, id, action)?;
    info!(?op, "created");
    // The "still enabled" result of `absorb` is deliberately ignored here.
    let _ = self.absorb(std::iter::once(op.clone()));
    Ok(op)
}
#[must_use]
fn absorb(&mut self, ops: impl Iterator<Item = Op>) -> bool {
let mut unchanged = true;
for op in ops {
match self.db.absorb_op(op) {
Ok(true) => unchanged = false,
Ok(false) => {},
Err(err) => error!(%err, "failed to absorb op")
}
}
if unchanged {return true}
self.update_hash();
let conspirators = match self.db.get_conspirators() {
Ok(conspirators) => conspirators,
Err(err) => {
error!(%err, "failed to load conspirators");
return true
}
};
for (conspirator, addr_count) in conspirators {
if conspirator.id == self.id {
self.name = conspirator.name;
if !conspirator.active {return false}
}
else if !conspirator.active {
if let Some((_id, p)) = self.table.remove(&conspirator.id) {
info!(%p, "removed deactivated conspirator");
}
}
else {
let Some(mut p) = self.table.get_mut(&conspirator.id) else {
let peer = Peer::load(
conspirator.id,
conspirator.name,
conspirator.clock,
conspirator.state,
addr_count
);
self.table.insert(conspirator.id, peer);
continue;
};
p.update_address_count(addr_count);
p.name = conspirator.name;
p.new_addr_pending = p.get_pending_address()
.and_then(|address| self.db.new_address(p.id, address)
.map_err(|err| error!(
%err,
%address,
"can't tell if address is new, assuming no"
))
.ok())
.inspect(|&pending| if !pending {
debug!(%p.id, "pending address cleared")
})
.unwrap_or(false);
}
}
true
}
/// Authors an op by this node against `target`, absorbs it locally and
/// broadcasts it to all active peers.
///
/// Returns `false` when the op could not be created, otherwise whatever
/// `broadcast` reports.
#[instrument(skip(self), level = "debug")]
fn author_op(&mut self, target: ID, action: Action) -> bool {
    let op = match self.db.create_op(self.id, target, action) {
        Ok(op) => op,
        Err(err) => {
            error!(%err, "failed to create op");
            return false;
        }
    };
    // The "still enabled" result of `absorb` is deliberately ignored here.
    let _ = self.absorb(std::iter::once(op.clone()));
    self.broadcast(ProtocolPayload::Ops(vec![op]).into())
}
/// Cuts the node off from the outside: both senders are replaced by
/// senders of fresh channels whose receivers are dropped when this
/// function returns, so the previous senders are released.
fn shutdown(&mut self) {
    let (dead_events, _events_rx) = new_channel::<AnyEvent>();
    let (dead_packets, _packets_rx) = new_channel::<PktTo>();
    drop(std::mem::replace(&mut self.outbound, dead_packets));
    drop(std::mem::replace(&mut self.dispatcher_sender, dead_events));
}
fn send_packet(&self, pkt: PktTo) {
self.outbound
.send(pkt)
.unwrap_or_else(|pkt| error!(?pkt, "node failed to send packet"))
}
/// Assembles a packet for `dst`: advances the own clock, attaches the
/// cached hash and the peer states of everyone but `dst`.
fn packet(&self, dst: ID, png: PingOrPong, pyl: Payload) -> Packet {
    // Fields are evaluated in the order written: clock first, then hash,
    // then the vector.
    Packet {
        dst,
        src: self.id,
        clk: self.clk.next(),
        hsh: self.hash(),
        vec: self.vector_clock_for(dst),
        png,
        pyl
    }
}
/// Sends an empty ping for peer `to` to the explicit address `at`.
fn ping(&self, to: ID, at: Address) {
    let empty_ping = self.packet(to, PingOrPong::Ping, Payload::None);
    self.send_packet((at, empty_ping));
}
fn send(&self, to: ID, pyl: Payload) {
let addr = self.table
.get_mut(&to)
.expect("send() to non-existent peer")
.get_address_to_ping()
.expect("send() to peer without address");
let packet = self.packet(to, PingOrPong::Ping, pyl);
self.send_packet((addr, packet));
}
fn respond(&self, to: ID, pyl: Payload) {
let addr = self.table
.get(&to)
.expect("respond to non-existent peer")
.get_address()
.expect("respond to peer without address");
let packet = self.packet(to, PingOrPong::Pong, pyl);
self.send_packet((addr, packet));
}
/// Routes `pyl` to `send` (for a ping) or `respond` (for a pong).
fn send_or_respond(&self, to: ID, png: PingOrPong, pyl: Payload) {
    if matches!(png, PingOrPong::Ping) {
        self.send(to, pyl)
    }
    else {
        self.respond(to, pyl)
    }
}
fn broadcast(&self, pyl: Payload) -> bool {
let active_ids: Vec<ID> = self.table
.iter()
.filter(|p| p.is_active())
.map(|p| p.id)
.collect();
for active_id in active_ids {
self.send(active_id, pyl.clone())
}
let clocks = self.table.iter()
.map(|r| (*r.key(), r.value().get_peer_state().clk))
.chain([(self.id, self.clk.prev())]);
self.db.persist_clocks(clocks)
.map_err(|err| error!(%err, "failed to persist clocks"))
.is_ok()
}
fn dispatch_event<E: Into<AnyEvent>>(&self, event: E) {
self.dispatcher_sender
.send(event.into())
.unwrap_or_else(|evt| error!(?evt, "node failed to send event"))
}
pub fn handle_request(&mut self, req: NodeReq) {
match req {
NodeReq::Write(((key, value), replier)) => {
replier.reply(
self.author_op(self.id, Action::Write {key, value})
);
},
NodeReq::AddAddress(((arg, addr), replier)) => {
let known_id = match &arg {
NodeArg::ID(id) => self.table.contains_key(&id.get())
.then_some(id.get()),
NodeArg::Name(n) => self.table.iter()
.find(|p| &p.name == n)
.map(|p| p.id)
};
let Some(id) = known_id else {
error!(%arg, "no such node");
replier.reply(false);
return;
};
match self.db.new_address(id, addr) {
Ok(true) => match self.add_address(id, addr) {
Ok(_op) => replier.reply(true),
Err(err) => {
error!(%addr, %err, "failed to add address");
replier.reply(false)
}
},
Ok(false) => replier.reply(true),
Err(err) => {
error!(%addr, %err, "failed to add address");
replier.reply(false)
}
}
},
NodeReq::Advertise(req) => {
let private_key = KeyArg::random();
let res = self.db.write_config(
Config::PRIVATE_KEY,
&private_key.to_string()
);
match res {
Ok(true) => {},
Ok(false) => warn!("unexpected number of changed rows"),
Err(err) => {
error!(?err, "failed to store new private key");
return;
}
}
let public_key = SecretKey::from_bytes(private_key.get())
.public_key()
.to_bytes();
req.reply(public_key);
},
NodeReq::Invite(((pubkey, addresses), replier)) => {
let key = match self.db.read_config(Config::KEY) {
Ok(Some(key_str)) => match key_str.parse() {
Ok(key) => key,
Err(err) => {
error!(?err, "failed to read cnsprcy key from db");
return;
}
},
Ok(None) => {
error!("cnsprcy key not in db");
return;
},
Err(err) => {
error!(?err, "failed to read cnsprcy key from db");
return;
}
};
let content = InvitationContent {
id: self.id,
name: self.name.clone(),
key,
addresses
};
let cb_public_key = pubkey.clone().get().into();
match Invitation::encrypt(&cb_public_key, &content) {
Ok(invitation) => replier.reply(invitation),
Err(err) => {
error!(%err, %pubkey, "failed to encrypt invitation");
}
};
},
NodeReq::Disable((arg, replier)) => {
let known_id = match &arg {
NodeArg::ID(id) if id.get() == self.id => Some(self.id),
NodeArg::Name(n) if n == &self.name => Some(self.id),
NodeArg::ID(id) => self.table.contains_key(&id.get())
.then_some(id.get()),
NodeArg::Name(n) => self.table.iter()
.find(|p| &p.name == n)
.map(|p| p.id)
};
let Some(id) = known_id else {
error!(%arg, "no such (enabled) node");
replier.reply(false);
return;
};
replier.reply(
self.author_op(id, Action::Active(false))
);
if id == self.id {
warn!("self-disabled successfully, shutting down");
self.shutdown();
}
},
NodeReq::Join((join_req, replier)) => replier.reply(
self.join(join_req.id.into(), join_req.name, join_req.addr)
),
NodeReq::GetConspirator((arg, replier)) => {
if let Some(p) = self.resolve_and_then(arg, |p| p.clone()) {
replier.reply(p);
}
},
NodeReq::GetConspirators(req) => req.reply(
self.table
.iter()
.map(|p| p.value().clone().into())
.collect::<Vec<InlineConspirator>>()
),
NodeReq::SendPayload(((arg, pyl, addr), replier)) => replier.reply(
self.send_payload(arg, pyl, addr)
),
NodeReq::GetStatus(((addrs, handlers), replier)) => {
let conspirators = self.table
.iter()
.map(|p| p.value().clone().into())
.collect::<Vec<InlineConspirator>>();
replier.reply(CnsprcyStatus {
id: self.id.into(),
name: self.name.clone(),
addrs,
handlers,
conspirators
})
}
}
}
#[instrument(skip_all, level = "debug", name = "node_tick")]
pub fn tick_peers(&self) -> Duration {
let mut soonest_tick = Duration::MAX;
let mut addrs_to_ping = Vec::new();
for mut peer in self.table.iter_mut() {
let TickResult{
address_to_ping,
state_changed,
mut next_tick_in,
reach_out_to,
} = peer.tick();
if let Some(addr) = address_to_ping {
addrs_to_ping.push((peer.id, addr));
}
if let Some(index) = reach_out_to {
match self.db.reach_out_addr(peer.id, index) {
Ok(Some(addr)) => addrs_to_ping.push((peer.id, addr)),
Ok(None) => {
debug!(
peer = ?peer.value(),
"no known addresses to reach out to"
);
next_tick_in = Duration::MAX;
}
Err(err) => error!(
peer=%peer.value(),
%err,
index,
"failed to select reach-out address"
)
}
}
if let Some(state) = state_changed {
self.dispatch_event(PeerEvent::changed(peer.id, state));
}
soonest_tick = std::cmp::min(soonest_tick, next_tick_in);
}
if addrs_to_ping.is_empty() {debug!("no pings due")}
else {debug!("{} pings due", addrs_to_ping.len())}
for (id, addr) in addrs_to_ping {
self.ping(id, addr);
}
soonest_tick
}
/// Decrypts `invitation` with the stored private key and stores the key it
/// carries as `Config::KEY`.
///
/// # Errors
/// Returns a description when the private key is missing or unreadable,
/// when decryption fails, or when the new key cannot be written.
pub fn decrypt(&self, invitation: Invitation)
    -> Result<InvitationContent, String>
{
    let stored = self.db.read_config(Config::PRIVATE_KEY)
        .map_err(|e| format!("failed to read private key from db: {e}"))?;
    let Some(stored) = stored else {
        return Err("cannot accept invitation without private key".into());
    };
    let priv_key = stored.parse::<KeyArg>()?;
    let content = invitation.decrypt(&priv_key.get().into())?;
    info!(
        id = %content.id,
        name = %content.name,
        addresses = ?content.addresses,
        "accepting invitation"
    );

    let row_changed = self.db
        .write_config(Config::KEY, &content.key.to_string())
        .map_err(|err| format!("failed to store new key in the db: {err}"))?;
    if row_changed {
        Ok(content)
    }
    else {
        Err("writing new key to db didn't change any rows".to_string())
    }
}
/// Registers a new peer at `addr` and sends it an empty ping.
///
/// Returns `false` (without sending) when the peer could not be added.
#[instrument(skip(self), name = "node_join")]
fn join(&mut self, id: ID, name: String, addr: Address) -> bool {
    let added = self.add_new_peer(Peer::new(id, name, addr));
    if !added {
        error!("failed to add peer, message not sent");
        return false;
    }
    self.send(id, Payload::None);
    info!("message sent");
    true
}
pub fn leave(&self) -> bool {
self.broadcast(ProtocolPayload::Quit.into())
}
/// Sends `pyl` as a ping to the node designated by `to`.
///
/// For a tracked peer, an explicit `address` that differs from the peer's
/// current one is used as is; otherwise the peer picks the address. For an
/// untracked node given by ID, the explicit `address` is the only option.
///
/// Returns `true` when a packet was handed to the outbound channel.
pub fn send_payload(
    &self,
    to: NodeArg,
    pyl: Payload,
    address: Option<Address>)
    -> bool
{
    // Fallback target for IDs that are not in the table.
    let untracked = match &to {
        NodeArg::ID(id) => Some((id.get(), address)),
        _ => None
    };
    let tracked = self.resolve_and_then(to, |p: &mut Peer| match address {
        Some(addr) if address != p.get_address() => (p.id, Some(addr)),
        Some(_) | None => (p.id, p.get_address_to_ping())
    });

    let Some((dst, Some(addr))) = tracked.or(untracked) else {
        return false;
    };
    let packet = self.packet(dst, PingOrPong::Ping, pyl);
    self.send_packet((addr, packet));
    true
}
#[instrument(skip(self), level = "debug" name = "node_handle_pkt")]
pub fn handle_pkt(&mut self, from: Address, pkt: message::Packet) {
let new_address = match self.validate_packet(&pkt, from) {
ValidationResult::Ok => None,
ValidationResult::NewAddress(addr) => match &pkt.pyl {
Payload::Protocol(
ProtocolPayload::Ops(ops) | ProtocolPayload::Sync {ops, ..}
) => {
let added_this_address = |op: &Op| {
op.target == pkt.src &&
op.action == Action::AddAddress(addr)
};
if ops.iter().any(added_this_address) {None}
else {Some(addr)}
},
_ => {Some(addr)}
},
ValidationResult::Disabled => {
if pkt.hsh == self.hash() {
warn!("in sync with disabled peer");
return;
}
match pkt.pyl {
Payload::Protocol(ProtocolPayload::Sync{counters, ops}) => {
let countered = !self.absorb(ops.into_iter());
match self.db.sync(&counters)
.and_then(|ops| Ok(ProtocolPayload::Sync {
ops,
counters: self.db.vector()?
}))
{
Ok(pyl) => {
self.send_payload(
pkt.src.into(),
pyl.into(),
Some(from)
);
}
Err(err) => {
error!(%err, "failed to create sync op");
}
}
if countered {
warn!("received counter-disable op, shutting down");
self.shutdown();
}
},
_ => {
self.send_payload(
pkt.src.into(),
self.init_sync(),
Some(from)
);
}
}
return;
},
ValidationResult::Invalid => {
warn!("Ignoring invalid packet from {}", from);
return
}
};
self.dispatch_event(PeerEvent::seen(&pkt));
self.process_vector_clock(pkt.vec);
let response = match pkt.pyl {
Payload::Protocol(pyl) => match pyl {
ProtocolPayload::Ops(ops) => {
if !self.absorb(ops.into_iter()) {
warn!("received disable op, shutting down");
self.shutdown();
return;
}
Payload::None
},
ProtocolPayload::Sync { counters, ops } => {
if !self.absorb(ops.into_iter()) {
warn!("received disable op, shutting down");
self.shutdown();
return;
}
if pkt.hsh == self.hash() {Payload::None} else {
self.db.sync(&counters)
.and_then(|ops| Ok(ProtocolPayload::Sync {
ops,
counters: self.db.vector()?
}))
.map(Payload::Protocol)
.unwrap_or_else(|err| {
error!(%err, "failed to sync");
Payload::None
})
}
},
ProtocolPayload::Quit => {
self.table
.get_mut(&pkt.src)
.expect("peer has to exist after validate_packet")
.has_quit();
self.dispatch_event(
PeerEvent::changed(pkt.src, Status::Quit)
);
debug!("peer quit, finishing early");
return
}
},
Payload::Dynamic(e) => self.handle_dynamic(pkt.src, e),
Payload::None => Payload::None
};
let mismatch = pkt.hsh != self.hash();
if response != Payload::None {
debug!(?response, "finished!");
self.send_or_respond(pkt.src, !pkt.png, response)
}
else if mismatch {
let resp = self.init_sync();
debug!(?resp, "synchronizing with peer");
self.send_or_respond(pkt.src, !pkt.png, resp)
}
else if let Some(addr) = new_address {
match self.add_address(pkt.src, addr) {
Ok(op) => self.send_or_respond(
pkt.src,
!pkt.png,
ProtocolPayload::Ops(vec![op]).into()
),
Err(err) if pkt.png == PingOrPong::Ping => {
error!(%err, %addr, "failed to log new address");
debug!("ponging anyway");
self.respond(pkt.src, Payload::None)
},
Err(err) => error!(%err, %addr, "failed to log new address")
};
}
else if pkt.png == PingOrPong::Ping {
debug!(?response, "finished!");
self.send_or_respond(pkt.src, !pkt.png, response)
}
else {debug!("finished")}
}
fn validate_packet(&self, pkt: &Packet, addr: Address) -> ValidationResult {
if pkt.dst != self.id {
warn!(%pkt.dst, "packet has invalid dst ID");
return ValidationResult::Invalid;
}
let mut peer = if let Some(p) = self.table.get_mut(&pkt.src) {p} else {
if let Ok(Some((c, addrs))) = self.db.get_conspirator(pkt.src) {
if !c.active {
if pkt.clk <= c.clock {
return ValidationResult::Invalid;
}
info!("contacted by disabled peer");
return match self.db.persist_clocks([(pkt.src, pkt.clk)]) {
Ok(()) => ValidationResult::Disabled,
Err(err) => {
error!(
%err,
%pkt.clk,
%pkt.src,
"failed to persist clock for disabled peer"
);
ValidationResult::Invalid
}
}
}
warn!(?c, "contacted by conspirator that was not in the table");
self.add_new_peer(
Peer::load(c.id, c.name, c.clock, c.state, addrs)
);
}
else {
info!(id=%pkt.src, "contacted by unknown peer");
if !self.add_new_peer(Peer::unknown(pkt.src)) {
error!("unable to add peer, cannot process packet");
return ValidationResult::Invalid
}
}
let Some(peer) = self.table.get_mut(&pkt.src) else {
error!("cannot get peer that was just added");
return ValidationResult::Invalid
};
peer
};
self.update_link(&mut peer, pkt, addr)
}
/// Records that `peer` sent `pkt` from `addr`.
///
/// Returns `Invalid` when the peer rejects the packet's clock. Otherwise
/// the ping/pong is registered, at most one `Status::Active` event and at
/// most one `new_address` event are published, and the result is
/// `NewAddress` if the peer has a pending address, else `Ok`.
fn update_link(
    &self,
    peer: &mut Peer,
    pkt: &Packet,
    addr: Address)
    -> ValidationResult
{
    if !peer.new_clock(pkt.clk) {
        return ValidationResult::Invalid;
    }

    // The activation may be visible through the state before the call,
    // through the call's result, or both; it is published only once.
    let inactive_before = !peer.is_active();
    let res = peer.has_pnged(addr, pkt.png);
    if inactive_before || res.was_inactive {
        self.dispatch_event(
            PeerEvent::changed(peer.id, Status::Active)
        );
    }

    if res.new_address {
        info!(%peer.id, %addr, "peer changed address");
        let is_unknown_address = self.db.new_address(peer.id, addr)
            .map_err(|err| error!(
                %err,
                %peer.id,
                %addr,
                "failed to determine if address is new"
            ))
            .unwrap_or(false);
        if is_unknown_address {
            info!(%peer.id, %addr, "discovered new address");
            peer.new_addr_pending = true;
        }
        // One event per packet, whether or not the store knew the address.
        self.dispatch_event(PeerEvent::new_address(pkt.src, addr));
    }

    peer.get_pending_address()
        .map(ValidationResult::NewAddress)
        .unwrap_or(ValidationResult::Ok)
}
fn process_vector_clock(&self, clock: Vec<PeerState>) {
for ps in clock {
match self.table.get_mut(&ps.id) {
Some(mut p) => if let Some(status) = p.absorb_peer_state(ps) {
self.dispatch_event(PeerEvent::changed(p.id, status));
},
None if ps.id == self.id =>
error!("received PeerState with own ID"),
None => {
let id = ps.id;
info!(%id, "discovered unknown peer");
let state_changed = self.table.entry(id)
.or_insert_with(|| Peer::unknown(id))
.absorb_peer_state(ps);
if let Some(status) = state_changed {
self.dispatch_event(PeerEvent::changed(id, status));
}
}
}
}
}
/// Logs a dynamic payload received from peer `id` and publishes it as an
/// event. Always yields `Payload::None` as the response.
///
/// Nothing is logged when `id` is not in the table; the event is
/// dispatched regardless.
fn handle_dynamic(&self, id: ID, dynamic: DynamicPayload) -> Payload {
    match &dynamic {
        DynamicPayload::Push { tag, msg } => {
            self.with_name(id, |name| info!(
                "[PUSH] {}: [{}] {}",
                name,
                tag,
                msg
            ));
        }
        DynamicPayload::Query { tag, msg } => {
            self.with_name(id, |name| info!(
                "[QURY] {}: [{}] {}",
                name,
                tag,
                msg
            ));
        }
        DynamicPayload::Response { tag, msg, to } => {
            let (resp_to_id, resp_to_clk) = *to;
            if resp_to_id == self.id {
                self.with_name(id, |name| info!(
                    "[RESP] {} (/{}): [{}] {}",
                    name,
                    resp_to_clk,
                    tag,
                    msg
                ));
            }
            else {
                // Responses meant for someone else are labelled with that
                // peer's name, or with its raw ID when it is not tracked.
                let resp_to_name = match self.table.get(&resp_to_id) {
                    Some(resp_to_peer) => resp_to_peer.name.clone(),
                    None => {
                        warn!(%resp_to_id, "received response to an unknown peer");
                        resp_to_id.to_string()
                    }
                };
                self.with_name(id, |name| info!(
                    "[RESP] {} @ {}/{}: [{}] {}",
                    name,
                    resp_to_name,
                    resp_to_clk,
                    tag,
                    msg
                ));
            }
        }
    }
    self.dispatch_event((id, dynamic));
    Payload::None
}
/// Calls `f` with the name of peer `id` and returns its result, or `None`
/// (without calling `f`) when the peer is not in the table.
fn with_name<R>(&self, id: ID, f: impl FnOnce(&str) -> R) -> Option<R> {
    self.table.view(&id, |_key, peer| {
        let name = peer.name.as_str();
        f(name)
    })
}
}