use std::{collections::BTreeSet, fmt, fmt::Debug};
use futures_timer::Delay;
use hashlink::LinkedHashMap;
use libp2p_identity::PeerId;
use libp2p_swarm::ConnectionId;
use quick_protobuf::MessageWrite;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use web_time::Instant;
use crate::{rpc::Sender, rpc_proto::proto, TopicHash};
/// Counters of messages that could not be delivered.
///
/// NOTE(review): the window over which these counters accumulate and the
/// exact places where they are incremented live outside this file — confirm
/// against the call sites.
#[derive(Clone, Debug, Default)]
pub struct FailedMessages {
    /// Failures counted for publish messages (presumably `RpcOut::Publish` —
    /// confirm against the call sites).
    pub publish: usize,
    /// Failures counted for forward messages (presumably `RpcOut::Forward` —
    /// confirm against the call sites).
    pub forward: usize,
    /// Messages that failed because the priority queue was full.
    pub priority: usize,
    /// Messages that failed because the non-priority queue was full.
    pub non_priority: usize,
    /// Messages that failed because they timed out.
    pub timeout: usize,
}

impl FailedMessages {
    /// Number of messages that failed because a send queue was full
    /// (`priority + non_priority`).
    pub fn total_queue_full(&self) -> usize {
        self.priority + self.non_priority
    }

    /// Number of failed messages across every failure reason tracked here:
    /// queue-full failures plus timeouts.
    ///
    /// Previously this returned the same value as [`Self::total_queue_full`]
    /// and silently dropped `timeout`.
    ///
    /// `publish` and `forward` are deliberately not added. NOTE(review): they
    /// appear to classify the same failures by message kind rather than by
    /// reason, so adding them would double count — confirm against the call
    /// sites.
    pub fn total(&self) -> usize {
        self.total_queue_full() + self.timeout
    }
}
/// Verdict on whether a message is accepted, with three possible outcomes.
///
/// NOTE(review): what each verdict triggers (forwarding, peer scoring, …) is
/// decided by code outside this file — confirm there before relying on it.
#[derive(Debug)]
pub enum MessageAcceptance {
/// The message is accepted.
Accept,
/// The message is rejected.
Reject,
/// The message is ignored.
Ignore,
}
/// Identifier of a message, stored as raw bytes.
///
/// Equality, ordering and hashing are all derived from the wrapped byte
/// vector.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct MessageId(pub Vec<u8>);

impl MessageId {
    /// Builds an id by copying the bytes of `value`.
    pub fn new(value: &[u8]) -> Self {
        MessageId(Vec::from(value))
    }
}

/// Anything that converts into a byte vector can be turned into a
/// [`MessageId`].
impl<T: Into<Vec<u8>>> From<T> for MessageId {
    fn from(value: T) -> Self {
        let bytes: Vec<u8> = value.into();
        MessageId(bytes)
    }
}
impl std::fmt::Display for MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex_fmt::HexFmt(&self.0))
}
}
impl std::fmt::Debug for MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0))
}
}
/// Crate-internal record of the state kept for a peer.
#[derive(Debug)]
pub(crate) struct PeerDetails {
/// Protocol flavour the peer is classified under (see [`PeerKind`]).
pub(crate) kind: PeerKind,
/// Outbound flag. NOTE(review): presumably whether a connection to this
/// peer was dialed by us — confirm where it is set.
pub(crate) outbound: bool,
/// Connection ids held for this peer.
pub(crate) connections: Vec<ConnectionId>,
/// Topic hashes associated with the peer, kept in an ordered set
/// (presumably the peer's subscriptions — confirm).
pub(crate) topics: BTreeSet<TopicHash>,
/// `Sender` handle from the `rpc` module (presumably used to queue outgoing
/// RPCs for the peer — confirm).
pub(crate) sender: Sender,
/// Message ids mapped to an `Instant`. NOTE(review): presumably ids the
/// peer asked not to receive, with the time they were recorded — confirm.
pub(crate) dont_send: LinkedHashMap<MessageId, Instant>,
}
/// Protocol flavour a peer is classified under.
///
/// `is_gossipsub` reports `true` for the three gossipsub variants, and
/// `as_static_ref` supplies the human-readable name quoted on each variant.
#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
#[cfg_attr(
feature = "metrics",
derive(prometheus_client::encoding::EncodeLabelValue)
)]
pub enum PeerKind {
/// Named `"Gossipsub v1.2"`; counts as gossipsub.
Gossipsubv1_2,
/// Named `"Gossipsub v1.1"`; counts as gossipsub.
Gossipsubv1_1,
/// Named `"Gossipsub v1.0"`; counts as gossipsub.
Gossipsub,
/// Named `"Floodsub"`; does not count as gossipsub.
Floodsub,
/// Named `"Not Supported"`; does not count as gossipsub.
NotSupported,
}
/// A message together with its optional signature and key, plus a local
/// `validated` flag.
///
/// Converts into `proto::Message` through `From<RawMessage>`; every field
/// except `validated` is carried into the protobuf form.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct RawMessage {
/// Optional peer id; written to the protobuf `from` field as bytes.
pub source: Option<PeerId>,
/// Payload bytes; written to the protobuf `data` field.
pub data: Vec<u8>,
/// Optional sequence number; written to the protobuf `seqno` field as its
/// big-endian bytes.
pub sequence_number: Option<u64>,
/// Topic hash; written to the protobuf `topic` field as a string.
pub topic: TopicHash,
/// Optional signature bytes; passed through to the protobuf `signature` field.
pub signature: Option<Vec<u8>>,
/// Optional key bytes; passed through to the protobuf `key` field.
pub key: Option<Vec<u8>>,
/// Validation flag, not part of the protobuf form. NOTE(review): presumably
/// set once the message has passed validation — confirm where it is written.
pub validated: bool,
}
impl PeerKind {
    /// Returns `true` for any of the gossipsub variants and `false` for
    /// `Floodsub` and `NotSupported`.
    pub(crate) fn is_gossipsub(&self) -> bool {
        match self {
            Self::Gossipsubv1_2 | Self::Gossipsubv1_1 | Self::Gossipsub => true,
            Self::Floodsub | Self::NotSupported => false,
        }
    }
}
impl RawMessage {
    /// Returns the size reported by `get_size` for the protobuf form of this
    /// message.
    ///
    /// The message is cloned and run through the `From<RawMessage>`
    /// conversion, so the measured value is exactly what that conversion
    /// produces.
    pub fn raw_protobuf_len(&self) -> usize {
        let encoded = proto::Message::from(self.clone());
        encoded.get_size()
    }
}
impl From<RawMessage> for proto::Message {
fn from(raw: RawMessage) -> Self {
proto::Message {
from: raw.source.map(|m| m.to_bytes()),
data: Some(raw.data),
seqno: raw.sequence_number.map(|s| s.to_be_bytes().to_vec()),
topic: TopicHash::into_string(raw.topic),
signature: raw.signature,
key: raw.key,
}
}
}
/// A message consisting of an optional source, payload bytes, an optional
/// sequence number and a topic hash.
///
/// Unlike [`RawMessage`] it has no signature, key or validation flag. `Debug`
/// is implemented by hand so the payload is printed as hex.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Message {
/// Optional peer id of the source.
pub source: Option<PeerId>,
/// Payload bytes.
pub data: Vec<u8>,
/// Optional sequence number.
pub sequence_number: Option<u64>,
/// Hash of the topic the message belongs to.
pub topic: TopicHash,
}
/// Debug output for [`Message`]: the payload is shown through
/// `hex_fmt::HexFmt` using the `{:<20}` format spec, followed by the remaining
/// fields.
impl fmt::Debug for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let payload_hex = hex_fmt::HexFmt(&self.data);

        let mut out = f.debug_struct("Message");
        out.field("data", &format_args!("{:<20}", &payload_hex));
        out.field("source", &self.source);
        out.field("sequence_number", &self.sequence_number);
        out.field("topic", &self.topic);
        out.finish()
    }
}
/// A subscribe or unsubscribe request for one topic.
///
/// When an [`Rpc`] is converted to protobuf, each entry becomes a
/// `proto::SubOpts`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Subscription {
/// Whether this entry subscribes or unsubscribes.
pub action: SubscriptionAction,
/// Hash of the topic the action applies to.
pub topic_hash: TopicHash,
}
/// Direction of a [`Subscription`] entry.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionAction {
/// Encoded as `subscribe: Some(true)` in the protobuf form.
Subscribe,
/// Encoded as `subscribe: Some(false)` in the protobuf form.
Unsubscribe,
}
/// Crate-internal peer descriptor carried inside a [`Prune`].
///
/// Converted to `proto::PeerInfo` with `signed_peer_record` left as `None`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct PeerInfo {
/// Optional peer id; written to the protobuf `peer_id` field as bytes.
pub(crate) peer_id: Option<PeerId>,
}
/// One control entry of an [`Rpc`].
///
/// When the `Rpc` is converted to protobuf, each variant is appended to the
/// list of the same name in `proto::ControlMessage`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ControlAction {
/// Appended to the `ihave` list.
IHave(IHave),
/// Appended to the `iwant` list.
IWant(IWant),
/// Appended to the `graft` list.
Graft(Graft),
/// Appended to the `prune` list.
Prune(Prune),
/// Appended to the `idontwant` list.
IDontWant(IDontWant),
}
/// Payload of an IHAVE control entry: a topic hash plus a list of message ids.
///
/// Converted to `proto::ControlIHave`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IHave {
/// Topic hash; written to the protobuf `topic_id` field as a string.
pub(crate) topic_hash: TopicHash,
/// Message ids; written to the protobuf `message_ids` field as raw bytes.
pub(crate) message_ids: Vec<MessageId>,
}
/// Payload of an IWANT control entry: a list of message ids.
///
/// Converted to `proto::ControlIWant`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IWant {
/// Message ids; written to the protobuf `message_ids` field as raw bytes.
pub(crate) message_ids: Vec<MessageId>,
}
/// Payload of a GRAFT control entry: a single topic hash.
///
/// Converted to `proto::ControlGraft`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Graft {
/// Topic hash; written to the protobuf `topic_id` field as a string.
pub(crate) topic_hash: TopicHash,
}
/// Payload of a PRUNE control entry: a topic hash, a list of peers and an
/// optional backoff.
///
/// Converted to `proto::ControlPrune`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Prune {
/// Topic hash; written to the protobuf `topic_id` field as a string.
pub(crate) topic_hash: TopicHash,
/// Peers attached to the entry; each becomes a `proto::PeerInfo`.
pub(crate) peers: Vec<PeerInfo>,
/// Optional backoff, passed through unchanged to the protobuf `backoff`
/// field. NOTE(review): the unit is not visible here (presumably seconds) —
/// confirm.
pub(crate) backoff: Option<u64>,
}
/// Payload of an IDONTWANT control entry: a list of message ids.
///
/// Converted to `proto::ControlIDontWant`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IDontWant {
/// Message ids; written to the protobuf `message_ids` field as raw bytes.
pub(crate) message_ids: Vec<MessageId>,
}
/// A single outgoing RPC item.
///
/// Each value converts into a `proto::RPC` that carries exactly one message,
/// one subscription change or one control entry.
#[derive(Debug)]
pub enum RpcOut {
/// A message placed in the protobuf `publish` list. The `timeout` is not
/// part of the protobuf form.
Publish { message: RawMessage, timeout: Delay },
/// A message placed in the protobuf `publish` list, encoded the same way as
/// `Publish`. The `timeout` is not part of the protobuf form.
Forward { message: RawMessage, timeout: Delay },
/// Subscription entry for the topic with `subscribe: Some(true)`.
Subscribe(TopicHash),
/// Subscription entry for the topic with `subscribe: Some(false)`.
Unsubscribe(TopicHash),
/// A single GRAFT control entry.
Graft(Graft),
/// A single PRUNE control entry.
Prune(Prune),
/// A single IHAVE control entry.
IHave(IHave),
/// A single IWANT control entry.
IWant(IWant),
/// A single IDONTWANT control entry.
IDontWant(IDontWant),
}
impl RpcOut {
    /// Consumes the item and returns its protobuf form, using the
    /// `From<RpcOut>` conversion.
    pub fn into_protobuf(self) -> proto::RPC {
        proto::RPC::from(self)
    }
}
impl From<RpcOut> for proto::RPC {
fn from(rpc: RpcOut) -> Self {
match rpc {
RpcOut::Publish {
message,
timeout: _,
} => proto::RPC {
subscriptions: Vec::new(),
publish: vec![message.into()],
control: None,
},
RpcOut::Forward {
message,
timeout: _,
} => proto::RPC {
publish: vec![message.into()],
subscriptions: Vec::new(),
control: None,
},
RpcOut::Subscribe(topic) => proto::RPC {
publish: Vec::new(),
subscriptions: vec![proto::SubOpts {
subscribe: Some(true),
topic_id: Some(topic.into_string()),
}],
control: None,
},
RpcOut::Unsubscribe(topic) => proto::RPC {
publish: Vec::new(),
subscriptions: vec![proto::SubOpts {
subscribe: Some(false),
topic_id: Some(topic.into_string()),
}],
control: None,
},
RpcOut::IHave(IHave {
topic_hash,
message_ids,
}) => proto::RPC {
publish: Vec::new(),
subscriptions: Vec::new(),
control: Some(proto::ControlMessage {
ihave: vec![proto::ControlIHave {
topic_id: Some(topic_hash.into_string()),
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
}],
iwant: vec![],
graft: vec![],
prune: vec![],
idontwant: vec![],
}),
},
RpcOut::IWant(IWant { message_ids }) => proto::RPC {
publish: Vec::new(),
subscriptions: Vec::new(),
control: Some(proto::ControlMessage {
ihave: vec![],
iwant: vec![proto::ControlIWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
}],
graft: vec![],
prune: vec![],
idontwant: vec![],
}),
},
RpcOut::Graft(Graft { topic_hash }) => proto::RPC {
publish: Vec::new(),
subscriptions: vec![],
control: Some(proto::ControlMessage {
ihave: vec![],
iwant: vec![],
graft: vec![proto::ControlGraft {
topic_id: Some(topic_hash.into_string()),
}],
prune: vec![],
idontwant: vec![],
}),
},
RpcOut::Prune(Prune {
topic_hash,
peers,
backoff,
}) => {
proto::RPC {
publish: Vec::new(),
subscriptions: vec![],
control: Some(proto::ControlMessage {
ihave: vec![],
iwant: vec![],
graft: vec![],
prune: vec![proto::ControlPrune {
topic_id: Some(topic_hash.into_string()),
peers: peers
.into_iter()
.map(|info| proto::PeerInfo {
peer_id: info.peer_id.map(|id| id.to_bytes()),
signed_peer_record: None,
})
.collect(),
backoff,
}],
idontwant: vec![],
}),
}
}
RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC {
publish: Vec::new(),
subscriptions: Vec::new(),
control: Some(proto::ControlMessage {
ihave: vec![],
iwant: vec![],
graft: vec![],
prune: vec![],
idontwant: vec![proto::ControlIDontWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
}],
}),
},
}
}
}
/// An RPC made of messages, subscription changes and control entries.
///
/// Converts into `proto::RPC`. `Debug` is implemented by hand and leaves out
/// any list that is empty.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Rpc {
/// Messages; they fill the protobuf `publish` list.
pub messages: Vec<RawMessage>,
/// Subscription changes; they fill the protobuf `subscriptions` list.
pub subscriptions: Vec<Subscription>,
/// Control entries; when the list is empty the protobuf `control` field is `None`.
pub control_msgs: Vec<ControlAction>,
}
impl Rpc {
    /// Consumes the RPC and returns its protobuf form, using the `From<Rpc>`
    /// conversion.
    pub fn into_protobuf(self) -> proto::RPC {
        proto::RPC::from(self)
    }
}
impl From<Rpc> for proto::RPC {
fn from(rpc: Rpc) -> Self {
let mut publish = Vec::new();
for message in rpc.messages.into_iter() {
let message = proto::Message {
from: message.source.map(|m| m.to_bytes()),
data: Some(message.data),
seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()),
topic: TopicHash::into_string(message.topic),
signature: message.signature,
key: message.key,
};
publish.push(message);
}
let subscriptions = rpc
.subscriptions
.into_iter()
.map(|sub| proto::SubOpts {
subscribe: Some(sub.action == SubscriptionAction::Subscribe),
topic_id: Some(sub.topic_hash.into_string()),
})
.collect::<Vec<_>>();
let mut control = proto::ControlMessage {
ihave: Vec::new(),
iwant: Vec::new(),
graft: Vec::new(),
prune: Vec::new(),
idontwant: Vec::new(),
};
let empty_control_msg = rpc.control_msgs.is_empty();
for action in rpc.control_msgs {
match action {
ControlAction::IHave(IHave {
topic_hash,
message_ids,
}) => {
let rpc_ihave = proto::ControlIHave {
topic_id: Some(topic_hash.into_string()),
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.ihave.push(rpc_ihave);
}
ControlAction::IWant(IWant { message_ids }) => {
let rpc_iwant = proto::ControlIWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.iwant.push(rpc_iwant);
}
ControlAction::Graft(Graft { topic_hash }) => {
let rpc_graft = proto::ControlGraft {
topic_id: Some(topic_hash.into_string()),
};
control.graft.push(rpc_graft);
}
ControlAction::Prune(Prune {
topic_hash,
peers,
backoff,
}) => {
let rpc_prune = proto::ControlPrune {
topic_id: Some(topic_hash.into_string()),
peers: peers
.into_iter()
.map(|info| proto::PeerInfo {
peer_id: info.peer_id.map(|id| id.to_bytes()),
signed_peer_record: None,
})
.collect(),
backoff,
};
control.prune.push(rpc_prune);
}
ControlAction::IDontWant(IDontWant { message_ids }) => {
let rpc_idontwant = proto::ControlIDontWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.idontwant.push(rpc_idontwant);
}
}
}
proto::RPC {
subscriptions,
publish,
control: if empty_control_msg {
None
} else {
Some(control)
},
}
}
}
/// Debug output for [`Rpc`] under the struct name `GossipsubRpc`; a list is
/// printed only when it is non-empty.
impl fmt::Debug for Rpc {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            messages,
            subscriptions,
            control_msgs,
        } = self;

        let mut out = f.debug_struct("GossipsubRpc");
        if !messages.is_empty() {
            out.field("messages", messages);
        }
        if !subscriptions.is_empty() {
            out.field("subscriptions", subscriptions);
        }
        if !control_msgs.is_empty() {
            out.field("control_msgs", control_msgs);
        }
        out.finish()
    }
}
impl PeerKind {
pub fn as_static_ref(&self) -> &'static str {
match self {
Self::NotSupported => "Not Supported",
Self::Floodsub => "Floodsub",
Self::Gossipsub => "Gossipsub v1.0",
Self::Gossipsubv1_1 => "Gossipsub v1.1",
Self::Gossipsubv1_2 => "Gossipsub v1.2",
}
}
}
impl AsRef<str> for PeerKind {
fn as_ref(&self) -> &str {
self.as_static_ref()
}
}
/// Writes the name returned by `as_static_ref`, without applying any
/// formatter flags.
impl fmt::Display for PeerKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = self.as_static_ref();
        f.write_str(name)
    }
}