1use crate::structs_proto;
22use futures::prelude::*;
23use tet_libp2p_core::{
24 Multiaddr,
25 PublicKey,
26 upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo}
27};
28use log::{debug, trace};
29use prost::Message;
30use std::convert::TryFrom;
31use std::{fmt, io, iter, pin::Pin};
32
33#[derive(Debug, Clone)]
35pub struct IdentifyProtocolConfig;
36
37#[derive(Debug, Clone)]
38pub struct RemoteInfo {
39 pub info: IdentifyInfo,
41 pub observed_addr: Multiaddr,
43
44 _priv: ()
45}
46
47pub struct ReplySubstream<T> {
49 inner: T,
50}
51
52impl<T> fmt::Debug for ReplySubstream<T> {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_tuple("ReplySubstream").finish()
55 }
56}
57
58impl<T> ReplySubstream<T>
59where
60 T: AsyncWrite + Unpin
61{
62 pub fn send(mut self, info: IdentifyInfo, observed_addr: &Multiaddr)
67 -> impl Future<Output = Result<(), io::Error>>
68 {
69 debug!("Sending identify info to client");
70 trace!("Sending: {:?}", info);
71
72 let listen_addrs = info.listen_addrs
73 .into_iter()
74 .map(|addr| addr.to_vec())
75 .collect();
76
77 let pubkey_bytes = info.public_key.into_protobuf_encoding();
78
79 let message = structs_proto::Identify {
80 agent_version: Some(info.agent_version),
81 protocol_version: Some(info.protocol_version),
82 public_key: Some(pubkey_bytes),
83 listen_addrs: listen_addrs,
84 observed_addr: Some(observed_addr.to_vec()),
85 protocols: info.protocols
86 };
87
88 async move {
89 let mut bytes = Vec::with_capacity(message.encoded_len());
90 message.encode(&mut bytes).expect("Vec<u8> provides capacity as needed");
91 upgrade::write_one(&mut self.inner, &bytes).await
92 }
93 }
94}
95
96#[derive(Debug, Clone)]
98pub struct IdentifyInfo {
99 pub public_key: PublicKey,
101 pub protocol_version: String,
104 pub agent_version: String,
107 pub listen_addrs: Vec<Multiaddr>,
109 pub protocols: Vec<String>,
111}
112
113impl UpgradeInfo for IdentifyProtocolConfig {
114 type Info = &'static [u8];
115 type InfoIter = iter::Once<Self::Info>;
116
117 fn protocol_info(&self) -> Self::InfoIter {
118 iter::once(b"/ipfs/id/1.0.0")
119 }
120}
121
122impl<C> InboundUpgrade<C> for IdentifyProtocolConfig
123where
124 C: AsyncRead + AsyncWrite + Unpin,
125{
126 type Output = ReplySubstream<C>;
127 type Error = io::Error;
128 type Future = future::Ready<Result<Self::Output, io::Error>>;
129
130 fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
131 trace!("Upgrading inbound connection");
132 future::ok(ReplySubstream { inner: socket })
133 }
134}
135
136impl<C> OutboundUpgrade<C> for IdentifyProtocolConfig
137where
138 C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
139{
140 type Output = RemoteInfo;
141 type Error = upgrade::ReadOneError;
142 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
143
144 fn upgrade_outbound(self, mut socket: C, _: Self::Info) -> Self::Future {
145 Box::pin(async move {
146 socket.close().await?;
147 let msg = upgrade::read_one(&mut socket, 4096).await?;
148 let (info, observed_addr) = match parse_proto_msg(msg) {
149 Ok(v) => v,
150 Err(err) => {
151 debug!("Failed to parse protobuf message; error = {:?}", err);
152 return Err(err.into())
153 }
154 };
155
156 trace!("Remote observes us as {:?}", observed_addr);
157 trace!("Information received: {:?}", info);
158
159 Ok(RemoteInfo {
160 info,
161 observed_addr: observed_addr.clone(),
162 _priv: ()
163 })
164 })
165 }
166}
167
168fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), io::Error> {
171 match structs_proto::Identify::decode(msg.as_ref()) {
172 Ok(msg) => {
173 fn bytes_to_multiaddr(bytes: Vec<u8>) -> Result<Multiaddr, io::Error> {
176 Multiaddr::try_from(bytes)
177 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
178 }
179
180 let listen_addrs = {
181 let mut addrs = Vec::new();
182 for addr in msg.listen_addrs.into_iter() {
183 addrs.push(bytes_to_multiaddr(addr)?);
184 }
185 addrs
186 };
187
188 let public_key = PublicKey::from_protobuf_encoding(&msg.public_key.unwrap_or_default())
189 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
190
191 let observed_addr = bytes_to_multiaddr(msg.observed_addr.unwrap_or_default())?;
192 let info = IdentifyInfo {
193 public_key,
194 protocol_version: msg.protocol_version.unwrap_or_default(),
195 agent_version: msg.agent_version.unwrap_or_default(),
196 listen_addrs,
197 protocols: msg.protocols
198 };
199
200 Ok((info, observed_addr))
201 }
202
203 Err(err) => Err(io::Error::new(io::ErrorKind::InvalidData, err)),
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use crate::protocol::{IdentifyInfo, RemoteInfo, IdentifyProtocolConfig};
210 use tet_libp2p_tcp::TcpConfig;
211 use futures::{prelude::*, channel::oneshot};
212 use tet_libp2p_core::{
213 identity,
214 Transport,
215 upgrade::{self, apply_outbound, apply_inbound}
216 };
217
218 #[test]
219 fn correct_transfer() {
220 let send_pubkey = identity::Keypair::generate_ed25519().public();
223 let recv_pubkey = send_pubkey.clone();
224
225 let (tx, rx) = oneshot::channel();
226
227 let bg_task = async_std::task::spawn(async move {
228 let transport = TcpConfig::new();
229
230 let mut listener = transport
231 .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
232 .unwrap();
233
234 let addr = listener.next().await
235 .expect("some event")
236 .expect("no error")
237 .into_new_address()
238 .expect("listen address");
239 tx.send(addr).unwrap();
240
241 let socket = listener.next().await.unwrap().unwrap().into_upgrade().unwrap().0.await.unwrap();
242 let sender = apply_inbound(socket, IdentifyProtocolConfig).await.unwrap();
243 sender.send(
244 IdentifyInfo {
245 public_key: send_pubkey,
246 protocol_version: "proto_version".to_owned(),
247 agent_version: "agent_version".to_owned(),
248 listen_addrs: vec![
249 "/ip4/80.81.82.83/tcp/500".parse().unwrap(),
250 "/ip6/::1/udp/1000".parse().unwrap(),
251 ],
252 protocols: vec!["proto1".to_string(), "proto2".to_string()],
253 },
254 &"/ip4/100.101.102.103/tcp/5000".parse().unwrap(),
255 ).await.unwrap();
256 });
257
258 async_std::task::block_on(async move {
259 let transport = TcpConfig::new();
260
261 let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap();
262 let RemoteInfo { info, observed_addr, .. } =
263 apply_outbound(socket, IdentifyProtocolConfig, upgrade::Version::V1).await.unwrap();
264 assert_eq!(observed_addr, "/ip4/100.101.102.103/tcp/5000".parse().unwrap());
265 assert_eq!(info.public_key, recv_pubkey);
266 assert_eq!(info.protocol_version, "proto_version");
267 assert_eq!(info.agent_version, "agent_version");
268 assert_eq!(info.listen_addrs,
269 &["/ip4/80.81.82.83/tcp/500".parse().unwrap(),
270 "/ip6/::1/udp/1000".parse().unwrap()]);
271 assert_eq!(info.protocols, &["proto1".to_string(), "proto2".to_string()]);
272
273 bg_task.await;
274 });
275 }
276}