1use crate::structs_proto;
22use futures::prelude::*;
23use libp2p_core::{
24 Multiaddr,
25 PublicKey,
26 upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo}
27};
28use log::{debug, trace};
29use prost::Message;
30use std::convert::TryFrom;
31use std::{fmt, io, iter, pin::Pin};
32
/// Marker type for the identify protocol upgrade.
///
/// It carries no state; the `UpgradeInfo`, `InboundUpgrade` and
/// `OutboundUpgrade` implementations in this file are attached to it.
#[derive(Clone, Debug)]
pub struct IdentifyProtocolConfig;
36
37#[derive(Debug, Clone)]
38#[non_exhaustive]
39pub struct RemoteInfo {
40 pub info: IdentifyInfo,
42 pub observed_addr: Multiaddr,
44}
45
/// Wrapper around an inbound substream on which an identify reply can be
/// written with [`ReplySubstream::send`].
pub struct ReplySubstream<T> {
    /// The underlying substream the reply is written to.
    inner: T,
}
50
51impl<T> fmt::Debug for ReplySubstream<T> {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 f.debug_tuple("ReplySubstream").finish()
54 }
55}
56
57impl<T> ReplySubstream<T>
58where
59 T: AsyncWrite + Unpin
60{
61 pub fn send(mut self, info: IdentifyInfo, observed_addr: &Multiaddr)
66 -> impl Future<Output = Result<(), io::Error>>
67 {
68 debug!("Sending identify info to client");
69 trace!("Sending: {:?}", info);
70
71 let listen_addrs = info.listen_addrs
72 .into_iter()
73 .map(|addr| addr.to_vec())
74 .collect();
75
76 let pubkey_bytes = info.public_key.into_protobuf_encoding();
77
78 let message = structs_proto::Identify {
79 agent_version: Some(info.agent_version),
80 protocol_version: Some(info.protocol_version),
81 public_key: Some(pubkey_bytes),
82 listen_addrs,
83 observed_addr: Some(observed_addr.to_vec()),
84 protocols: info.protocols
85 };
86
87 async move {
88 let mut bytes = Vec::with_capacity(message.encoded_len());
89 message.encode(&mut bytes).expect("Vec<u8> provides capacity as needed");
90 upgrade::write_one(&mut self.inner, &bytes).await
91 }
92 }
93}
94
95#[derive(Debug, Clone)]
97pub struct IdentifyInfo {
98 pub public_key: PublicKey,
100 pub protocol_version: String,
103 pub agent_version: String,
106 pub listen_addrs: Vec<Multiaddr>,
108 pub protocols: Vec<String>,
110}
111
112impl UpgradeInfo for IdentifyProtocolConfig {
113 type Info = &'static [u8];
114 type InfoIter = iter::Once<Self::Info>;
115
116 fn protocol_info(&self) -> Self::InfoIter {
117 iter::once(b"/ipfs/id/1.0.0")
118 }
119}
120
121impl<C> InboundUpgrade<C> for IdentifyProtocolConfig
122where
123 C: AsyncRead + AsyncWrite + Unpin,
124{
125 type Output = ReplySubstream<C>;
126 type Error = io::Error;
127 type Future = future::Ready<Result<Self::Output, io::Error>>;
128
129 fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
130 trace!("Upgrading inbound connection");
131 future::ok(ReplySubstream { inner: socket })
132 }
133}
134
135impl<C> OutboundUpgrade<C> for IdentifyProtocolConfig
136where
137 C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
138{
139 type Output = RemoteInfo;
140 type Error = upgrade::ReadOneError;
141 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
142
143 fn upgrade_outbound(self, mut socket: C, _: Self::Info) -> Self::Future {
144 Box::pin(async move {
145 socket.close().await?;
146 let msg = upgrade::read_one(&mut socket, 4096).await?;
147 let (info, observed_addr) = match parse_proto_msg(msg) {
148 Ok(v) => v,
149 Err(err) => {
150 debug!("Failed to parse protobuf message; error = {:?}", err);
151 return Err(err.into())
152 }
153 };
154
155 trace!("Remote observes us as {:?}", observed_addr);
156 trace!("Information received: {:?}", info);
157
158 Ok(RemoteInfo {
159 info,
160 observed_addr,
161 })
162 })
163 }
164}
165
166fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), io::Error> {
169 match structs_proto::Identify::decode(msg.as_ref()) {
170 Ok(msg) => {
171 fn bytes_to_multiaddr(bytes: Vec<u8>) -> Result<Multiaddr, io::Error> {
174 Multiaddr::try_from(bytes)
175 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
176 }
177
178 let listen_addrs = {
179 let mut addrs = Vec::new();
180 for addr in msg.listen_addrs.into_iter() {
181 addrs.push(bytes_to_multiaddr(addr)?);
182 }
183 addrs
184 };
185
186 let public_key = PublicKey::from_protobuf_encoding(&msg.public_key.unwrap_or_default())
187 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
188
189 let observed_addr = bytes_to_multiaddr(msg.observed_addr.unwrap_or_default())?;
190 let info = IdentifyInfo {
191 public_key,
192 protocol_version: msg.protocol_version.unwrap_or_default(),
193 agent_version: msg.agent_version.unwrap_or_default(),
194 listen_addrs,
195 protocols: msg.protocols
196 };
197
198 Ok((info, observed_addr))
199 }
200
201 Err(err) => Err(io::Error::new(io::ErrorKind::InvalidData, err)),
202 }
203}
204
#[cfg(test)]
mod tests {
    use crate::protocol::{IdentifyInfo, RemoteInfo, IdentifyProtocolConfig};
    use libp2p_tcp::TcpConfig;
    use futures::{prelude::*, channel::oneshot};
    use libp2p_core::{
        identity,
        Multiaddr,
        Transport,
        upgrade::{self, apply_outbound, apply_inbound}
    };

    /// Runs a listener and a dialer over local TCP and checks that the
    /// dialer receives exactly the identify data the listener sent.
    #[test]
    fn correct_transfer() {
        const OBSERVED_ADDR: &str = "/ip4/100.101.102.103/tcp/5000";
        const LISTEN_ADDRS: [&str; 2] = ["/ip4/80.81.82.83/tcp/500", "/ip6/::1/udp/1000"];

        let send_pubkey = identity::Keypair::generate_ed25519().public();
        let recv_pubkey = send_pubkey.clone();

        // Carries the listener's bound address over to the dialer.
        let (addr_tx, addr_rx) = oneshot::channel();

        let listener_task = async_std::task::spawn(async move {
            let mut listener = TcpConfig::new()
                .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
                .unwrap();

            let bound_addr = listener.next().await
                .expect("some event")
                .expect("no error")
                .into_new_address()
                .expect("listen address");
            addr_tx.send(bound_addr).unwrap();

            let incoming = listener.next().await.unwrap().unwrap().into_upgrade().unwrap().0;
            let socket = incoming.await.unwrap();
            let reply = apply_inbound(socket, IdentifyProtocolConfig).await.unwrap();

            let listen_addrs: Vec<Multiaddr> = LISTEN_ADDRS
                .iter()
                .map(|addr| addr.parse().unwrap())
                .collect();
            let local_info = IdentifyInfo {
                public_key: send_pubkey,
                protocol_version: "proto_version".to_owned(),
                agent_version: "agent_version".to_owned(),
                listen_addrs,
                protocols: vec!["proto1".to_string(), "proto2".to_string()],
            };
            let observed: Multiaddr = OBSERVED_ADDR.parse().unwrap();

            reply.send(local_info, &observed).await.unwrap();
        });

        async_std::task::block_on(async move {
            let dial_addr = addr_rx.await.unwrap();
            let socket = TcpConfig::new().dial(dial_addr).unwrap().await.unwrap();

            let RemoteInfo { info, observed_addr, .. } =
                apply_outbound(socket, IdentifyProtocolConfig, upgrade::Version::V1).await.unwrap();

            let expected_observed: Multiaddr = OBSERVED_ADDR.parse().unwrap();
            let expected_listen: Vec<Multiaddr> = LISTEN_ADDRS
                .iter()
                .map(|addr| addr.parse().unwrap())
                .collect();
            let expected_protocols = vec!["proto1".to_string(), "proto2".to_string()];

            assert_eq!(observed_addr, expected_observed);
            assert_eq!(info.public_key, recv_pubkey);
            assert_eq!(info.protocol_version, "proto_version");
            assert_eq!(info.agent_version, "agent_version");
            assert_eq!(info.listen_addrs, expected_listen);
            assert_eq!(info.protocols, expected_protocols);

            listener_task.await;
        });
    }
}