libp2p_floodsub/
protocol.rs1use crate::rpc_proto;
22use crate::topic::Topic;
23use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade};
24use prost::Message;
25use std::{error, fmt, io, iter, pin::Pin};
26use futures::{Future, io::{AsyncRead, AsyncWrite}};
27
28#[derive(Debug, Clone, Default)]
30pub struct FloodsubProtocol {}
31
32impl FloodsubProtocol {
33 pub fn new() -> FloodsubProtocol {
35 FloodsubProtocol {}
36 }
37}
38
39impl UpgradeInfo for FloodsubProtocol {
40 type Info = &'static [u8];
41 type InfoIter = iter::Once<Self::Info>;
42
43 fn protocol_info(&self) -> Self::InfoIter {
44 iter::once(b"/floodsub/1.0.0")
45 }
46}
47
48impl<TSocket> InboundUpgrade<TSocket> for FloodsubProtocol
49where
50 TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
51{
52 type Output = FloodsubRpc;
53 type Error = FloodsubDecodeError;
54 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
55
56 fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future {
57 Box::pin(async move {
58 let packet = upgrade::read_one(&mut socket, 2048).await?;
59 let rpc = rpc_proto::Rpc::decode(&packet[..])?;
60
61 let mut messages = Vec::with_capacity(rpc.publish.len());
62 for publish in rpc.publish.into_iter() {
63 messages.push(FloodsubMessage {
64 source: PeerId::from_bytes(&publish.from.unwrap_or_default()).map_err(|_| {
65 FloodsubDecodeError::InvalidPeerId
66 })?,
67 data: publish.data.unwrap_or_default(),
68 sequence_number: publish.seqno.unwrap_or_default(),
69 topics: publish.topic_ids
70 .into_iter()
71 .map(Topic::new)
72 .collect(),
73 });
74 }
75
76 Ok(FloodsubRpc {
77 messages,
78 subscriptions: rpc.subscriptions
79 .into_iter()
80 .map(|sub| FloodsubSubscription {
81 action: if Some(true) == sub.subscribe {
82 FloodsubSubscriptionAction::Subscribe
83 } else {
84 FloodsubSubscriptionAction::Unsubscribe
85 },
86 topic: Topic::new(sub.topic_id.unwrap_or_default()),
87 })
88 .collect(),
89 })
90 })
91 }
92}
93
94#[derive(Debug)]
96pub enum FloodsubDecodeError {
97 ReadError(upgrade::ReadOneError),
99 ProtobufError(prost::DecodeError),
101 InvalidPeerId,
103}
104
105impl From<upgrade::ReadOneError> for FloodsubDecodeError {
106 fn from(err: upgrade::ReadOneError) -> Self {
107 FloodsubDecodeError::ReadError(err)
108 }
109}
110
111impl From<prost::DecodeError> for FloodsubDecodeError {
112 fn from(err: prost::DecodeError) -> Self {
113 FloodsubDecodeError::ProtobufError(err)
114 }
115}
116
117impl fmt::Display for FloodsubDecodeError {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 match *self {
120 FloodsubDecodeError::ReadError(ref err) =>
121 write!(f, "Error while reading from socket: {}", err),
122 FloodsubDecodeError::ProtobufError(ref err) =>
123 write!(f, "Error while decoding protobuf: {}", err),
124 FloodsubDecodeError::InvalidPeerId =>
125 write!(f, "Error while decoding PeerId from message"),
126 }
127 }
128}
129
130impl error::Error for FloodsubDecodeError {
131 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
132 match *self {
133 FloodsubDecodeError::ReadError(ref err) => Some(err),
134 FloodsubDecodeError::ProtobufError(ref err) => Some(err),
135 FloodsubDecodeError::InvalidPeerId => None,
136 }
137 }
138}
139
140#[derive(Debug, Clone, PartialEq, Eq, Hash)]
142pub struct FloodsubRpc {
143 pub messages: Vec<FloodsubMessage>,
145 pub subscriptions: Vec<FloodsubSubscription>,
147}
148
149impl UpgradeInfo for FloodsubRpc {
150 type Info = &'static [u8];
151 type InfoIter = iter::Once<Self::Info>;
152
153 fn protocol_info(&self) -> Self::InfoIter {
154 iter::once(b"/floodsub/1.0.0")
155 }
156}
157
158impl<TSocket> OutboundUpgrade<TSocket> for FloodsubRpc
159where
160 TSocket: AsyncWrite + AsyncRead + Send + Unpin + 'static,
161{
162 type Output = ();
163 type Error = io::Error;
164 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
165
166 fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future {
167 Box::pin(async move {
168 let bytes = self.into_bytes();
169 upgrade::write_one(&mut socket, bytes).await?;
170 Ok(())
171 })
172 }
173}
174
175impl FloodsubRpc {
176 fn into_bytes(self) -> Vec<u8> {
178 let rpc = rpc_proto::Rpc {
179 publish: self.messages.into_iter()
180 .map(|msg| {
181 rpc_proto::Message {
182 from: Some(msg.source.to_bytes()),
183 data: Some(msg.data),
184 seqno: Some(msg.sequence_number),
185 topic_ids: msg.topics
186 .into_iter()
187 .map(|topic| topic.into())
188 .collect()
189 }
190 })
191 .collect(),
192
193 subscriptions: self.subscriptions.into_iter()
194 .map(|topic| {
195 rpc_proto::rpc::SubOpts {
196 subscribe: Some(topic.action == FloodsubSubscriptionAction::Subscribe),
197 topic_id: Some(topic.topic.into())
198 }
199 })
200 .collect()
201 };
202
203 let mut buf = Vec::with_capacity(rpc.encoded_len());
204 rpc.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
205 buf
206 }
207}
208
209#[derive(Debug, Clone, PartialEq, Eq, Hash)]
211pub struct FloodsubMessage {
212 pub source: PeerId,
214
215 pub data: Vec<u8>,
217
218 pub sequence_number: Vec<u8>,
220
221 pub topics: Vec<Topic>,
225}
226
227#[derive(Debug, Clone, PartialEq, Eq, Hash)]
229pub struct FloodsubSubscription {
230 pub action: FloodsubSubscriptionAction,
232 pub topic: Topic,
234}
235
236#[derive(Debug, Clone, PartialEq, Eq, Hash)]
238pub enum FloodsubSubscriptionAction {
239 Subscribe,
241 Unsubscribe,
243}