use std::{
fmt::{self, Debug, Display, Formatter},
io, mem,
net::SocketAddr,
sync::Arc,
};
use casper_types::PublicKey;
use derive_more::From;
use futures::stream::{SplitSink, SplitStream};
use serde::Serialize;
use static_assertions::const_assert;
use tracing::Span;
use super::{error::ConnectionError, FramedTransport, GossipedAddress, Message, NodeId};
use crate::{
components::contract_runtime::ContractRuntimeAnnouncement,
effect::{
announcements::BlocklistAnnouncement,
requests::{NetworkInfoRequest, NetworkRequest},
},
protocol::Message as ProtocolMessage,
};
// Compile-time guard on the in-memory size of `Event<ProtocolMessage>`: compilation fails unless
// the enum occupies fewer than 89 bytes.
// NOTE(review): presumably the reason the larger variant payloads of `Event` are boxed — confirm.
const _SMALL_NETWORK_EVENT_SIZE: usize = mem::size_of::<Event<ProtocolMessage>>();
const_assert!(_SMALL_NETWORK_EVENT_SIZE < 89);
/// Networking events: connection lifecycle notifications, incoming messages, requests,
/// announcements and housekeeping triggers.
///
/// `P` is forwarded as the type parameter of [`Message`], [`IncomingConnection`],
/// [`OutgoingConnection`] and `NetworkRequest`.
///
/// `Serialize` is derived; fields carrying a serde skip attribute are left out of the serialized
/// output.
#[derive(Debug, From, Serialize)]
pub(crate) enum Event<P> {
/// An incoming connection attempt produced a result; the boxed [`IncomingConnection`] describes
/// the outcome.
IncomingConnection {
/// Outcome of the incoming connection attempt.
incoming: Box<IncomingConnection<P>>,
/// Tracing span carried along with the event; not serialized.
#[serde(skip)]
span: Span,
},
/// A message was received from a peer.
IncomingMessage {
/// ID of the peer the message came from.
peer_id: Box<NodeId>,
/// The received message.
msg: Box<Message<P>>,
/// Tracing span carried along with the event; not serialized.
#[serde(skip)]
span: Span,
},
/// An incoming connection was closed.
IncomingClosed {
/// I/O result accompanying the close; not serialized.
#[serde(skip_serializing)]
result: io::Result<()>,
/// ID of the peer whose connection was closed.
peer_id: Box<NodeId>,
/// Address of the peer whose connection was closed.
peer_addr: SocketAddr,
/// Tracing span carried along with the event (boxed here, unlike in the other variants); not
/// serialized.
#[serde(skip_serializing)]
span: Box<Span>,
},
/// An outgoing connection attempt produced a result; the boxed [`OutgoingConnection`] describes
/// the outcome.
OutgoingConnection {
/// Outcome of the outgoing connection attempt.
outgoing: Box<OutgoingConnection<P>>,
/// Tracing span carried along with the event; not serialized.
#[serde(skip_serializing)]
span: Span,
},
/// An outgoing connection was dropped.
OutgoingDropped {
/// ID of the peer the connection went to.
peer_id: Box<NodeId>,
/// Address of the peer the connection went to.
peer_addr: SocketAddr,
},
/// A network request to be handled.
///
/// Marked `#[from]` so the `From` derive generates a conversion into this variant.
#[from]
NetworkRequest {
/// The boxed request; not serialized.
#[serde(skip_serializing)]
req: Box<NetworkRequest<NodeId, P>>,
},
/// A network info request to be handled.
///
/// Marked `#[from]` so the `From` derive generates a conversion into this variant.
#[from]
NetworkInfoRequest {
/// The boxed request; not serialized.
#[serde(skip_serializing)]
req: Box<NetworkInfoRequest<NodeId>>,
},
/// Trigger for gossiping our own address.
GossipOurAddress,
/// A peer address was received via gossip.
PeerAddressReceived(GossipedAddress),
/// Trigger for sweeping outgoing connections.
SweepOutgoing,
/// A blocklist announcement to be handled.
#[from]
BlocklistAnnouncement(BlocklistAnnouncement<NodeId>),
/// A contract runtime announcement to be handled.
#[from]
ContractRuntimeAnnouncement(ContractRuntimeAnnouncement),
}
impl From<NetworkRequest<NodeId, ProtocolMessage>> for Event<ProtocolMessage> {
fn from(req: NetworkRequest<NodeId, ProtocolMessage>) -> Self {
Self::NetworkRequest { req: Box::new(req) }
}
}
impl From<NetworkInfoRequest<NodeId>> for Event<ProtocolMessage> {
fn from(req: NetworkInfoRequest<NodeId>) -> Self {
Self::NetworkInfoRequest { req: Box::new(req) }
}
}
impl<P: Display> Display for Event<P> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Event::IncomingConnection { incoming, span: _ } => {
write!(f, "incoming connection: {}", incoming)
}
Event::IncomingMessage {
peer_id: node_id,
msg,
span: _,
} => write!(f, "msg from {}: {}", node_id, msg),
Event::IncomingClosed { peer_addr, .. } => {
write!(f, "closed connection from {}", peer_addr)
}
Event::OutgoingConnection { outgoing, span: _ } => {
write!(f, "outgoing connection: {}", outgoing)
}
Event::OutgoingDropped { peer_id, peer_addr } => {
write!(f, "dropped outgoing {} {}", peer_id, peer_addr)
}
Event::NetworkRequest { req } => write!(f, "request: {}", req),
Event::NetworkInfoRequest { req } => write!(f, "request: {}", req),
Event::GossipOurAddress => write!(f, "gossip our address"),
Event::PeerAddressReceived(gossiped_address) => {
write!(f, "received gossiped peer address {}", gossiped_address)
}
Event::BlocklistAnnouncement(ann) => {
write!(f, "handling blocklist announcement: {}", ann)
}
Event::ContractRuntimeAnnouncement(ann) => {
write!(f, "handling contract runtime announcement: {}", ann)
}
Event::SweepOutgoing => {
write!(f, "sweep outgoing connections")
}
}
}
}
/// Possible outcomes of an incoming connection attempt.
///
/// `P` is forwarded as the type parameter of `FramedTransport`.
#[derive(Debug, Serialize)]
pub(crate) enum IncomingConnection<P> {
/// Failure for which only the peer's address is recorded (no peer ID).
FailedEarly {
/// Address of the remote peer.
peer_addr: SocketAddr,
/// The error that caused the failure.
error: ConnectionError,
},
/// Failure for which the peer's ID is recorded in addition to its address.
Failed {
/// Address of the remote peer.
peer_addr: SocketAddr,
/// ID of the remote peer.
peer_id: NodeId,
/// The error that caused the failure.
error: ConnectionError,
},
/// Loopback connection.
// NOTE(review): presumably a connection that originated from this node itself — confirm.
Loopback,
/// The connection was established.
Established {
/// Address of the remote peer.
peer_addr: SocketAddr,
/// Public address associated with the peer.
// NOTE(review): presumably the address the peer advertises for others to dial — confirm.
public_addr: SocketAddr,
/// ID of the remote peer.
peer_id: NodeId,
/// Consensus public key of the peer, if any.
peer_consensus_public_key: Option<PublicKey>,
/// Stream half (`SplitStream`) of the framed transport; not serialized.
#[serde(skip_serializing)]
stream: SplitStream<FramedTransport<P>>,
},
}
impl<P> Display for IncomingConnection<P> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
IncomingConnection::FailedEarly { peer_addr, error } => {
write!(f, "early failure from {}: {}", peer_addr, error)
}
IncomingConnection::Failed {
peer_addr,
peer_id,
error,
} => write!(f, "failure from {}/{}: {}", peer_addr, peer_id, error),
IncomingConnection::Loopback => f.write_str("loopback"),
IncomingConnection::Established {
peer_addr,
public_addr,
peer_id,
peer_consensus_public_key,
stream: _,
} => {
write!(
f,
"connection established from {}/{}; public: {}",
peer_addr, peer_id, public_addr,
)?;
if let Some(public_key) = peer_consensus_public_key {
write!(f, " [{}]", public_key)
} else {
f.write_str(" [no validator id]")
}
}
}
}
}
/// Possible outcomes of an outgoing connection attempt.
///
/// `P` is forwarded as the type parameter of `FramedTransport` and [`Message`].
#[derive(Debug, Serialize)]
pub(crate) enum OutgoingConnection<P> {
/// Failure for which only the peer's address is recorded (no peer ID).
FailedEarly {
/// Address of the remote peer.
peer_addr: SocketAddr,
/// The error that caused the failure.
error: ConnectionError,
},
/// Failure for which the peer's ID is recorded in addition to its address.
Failed {
/// Address of the remote peer.
peer_addr: SocketAddr,
/// ID of the remote peer.
peer_id: NodeId,
/// The error that caused the failure.
error: ConnectionError,
},
/// Loopback connection to the given address.
// NOTE(review): presumably the dialed address turned out to be this node itself — confirm.
Loopback { peer_addr: SocketAddr },
/// The connection was established.
Established {
/// Address of the remote peer.
peer_addr: SocketAddr,
/// ID of the remote peer.
peer_id: NodeId,
/// Consensus public key of the peer, if any.
peer_consensus_public_key: Option<PublicKey>,
/// Sink half (`SplitSink`) of the framed transport, accepting `Arc<Message<P>>` items; not
/// serialized.
#[serde(skip_serializing)]
sink: SplitSink<FramedTransport<P>, Arc<Message<P>>>,
},
}
impl<P> Display for OutgoingConnection<P> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
OutgoingConnection::FailedEarly { peer_addr, error } => {
write!(f, "early failure to {}: {}", peer_addr, error)
}
OutgoingConnection::Failed {
peer_addr,
peer_id,
error,
} => write!(f, "failure to {}/{}: {}", peer_addr, peer_id, error),
OutgoingConnection::Loopback { peer_addr } => write!(f, "loopback to {}", peer_addr),
OutgoingConnection::Established {
peer_addr,
peer_id,
peer_consensus_public_key,
sink: _,
} => {
write!(f, "connection established to {}/{}", peer_addr, peer_id)?;
if let Some(public_key) = peer_consensus_public_key {
write!(f, " [{}]", public_key)
} else {
f.write_str(" [no validator id]")
}
}
}
}
}