solana_netutil/
ip_echo_server.rs

1use bytes::Bytes;
2use log::*;
3use serde_derive::{Deserialize, Serialize};
4use std::io;
5use std::net::SocketAddr;
6use std::time::Duration;
7use tokio;
8use tokio::net::TcpListener;
9use tokio::prelude::*;
10use tokio::reactor::Handle;
11use tokio::runtime::Runtime;
12use tokio_codec::{BytesCodec, Decoder};
13
14pub type IpEchoServer = Runtime;
15
16#[derive(Serialize, Deserialize, Default)]
17pub(crate) struct IpEchoServerMessage {
18    tcp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde
19    udp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde
20}
21
22impl IpEchoServerMessage {
23    pub fn new(tcp_ports: &[u16], udp_ports: &[u16]) -> Self {
24        let mut msg = Self::default();
25        assert!(tcp_ports.len() <= msg.tcp_ports.len());
26        assert!(udp_ports.len() <= msg.udp_ports.len());
27
28        msg.tcp_ports[..tcp_ports.len()].copy_from_slice(tcp_ports);
29        msg.udp_ports[..udp_ports.len()].copy_from_slice(udp_ports);
30        msg
31    }
32}
33
34/// Starts a simple TCP server on the given port that echos the IP address of any peer that
35/// connects.  Used by |get_public_ip_addr|
36pub fn ip_echo_server(tcp: std::net::TcpListener) -> IpEchoServer {
37    info!("bound to {:?}", tcp.local_addr());
38    let tcp =
39        TcpListener::from_std(tcp, &Handle::default()).expect("Failed to convert std::TcpListener");
40
41    let server = tcp
42        .incoming()
43        .map_err(|err| warn!("accept failed: {:?}", err))
44        .for_each(move |socket| {
45            let ip = socket.peer_addr().expect("Expect peer_addr()").ip();
46            info!("connection from {:?}", ip);
47
48            let framed = BytesCodec::new().framed(socket);
49            let (writer, reader) = framed.split();
50
51            let processor = reader
52                .and_then(move |bytes| {
53                    bincode::deserialize::<IpEchoServerMessage>(&bytes).or_else(|err| {
54                        Err(io::Error::new(
55                            io::ErrorKind::Other,
56                            format!("Failed to deserialize IpEchoServerMessage: {:?}", err),
57                        ))
58                    })
59                })
60                .and_then(move |msg| {
61                    // Fire a datagram at each non-zero UDP port
62                    if !msg.udp_ports.is_empty() {
63                        match std::net::UdpSocket::bind("0.0.0.0:0") {
64                            Ok(udp_socket) => {
65                                for udp_port in &msg.udp_ports {
66                                    if *udp_port != 0 {
67                                        match udp_socket
68                                            .send_to(&[0], SocketAddr::from((ip, *udp_port)))
69                                        {
70                                            Ok(_) => debug!("Successful send_to udp/{}", udp_port),
71                                            Err(err) => {
72                                                info!("Failed to send_to udp/{}: {}", udp_port, err)
73                                            }
74                                        }
75                                    }
76                                }
77                            }
78                            Err(err) => {
79                                warn!("Failed to bind local udp socket: {}", err);
80                            }
81                        }
82                    }
83
84                    // Try to connect to each non-zero TCP port
85                    let tcp_futures: Vec<_> = msg
86                        .tcp_ports
87                        .iter()
88                        .filter_map(|tcp_port| {
89                            let tcp_port = *tcp_port;
90                            if tcp_port == 0 {
91                                None
92                            } else {
93                                Some(
94                                    tokio::net::TcpStream::connect(&SocketAddr::new(ip, tcp_port))
95                                        .and_then(move |tcp_stream| {
96                                            debug!("Connection established to tcp/{}", tcp_port);
97                                            let _ = tcp_stream.shutdown(std::net::Shutdown::Both);
98                                            Ok(())
99                                        })
100                                        .timeout(Duration::from_secs(5))
101                                        .or_else(move |err| {
102                                            Err(io::Error::new(
103                                                io::ErrorKind::Other,
104                                                format!(
105                                                    "Connection timeout to {}: {:?}",
106                                                    tcp_port, err
107                                                ),
108                                            ))
109                                        }),
110                                )
111                            }
112                        })
113                        .collect();
114                    future::join_all(tcp_futures)
115                })
116                .and_then(move |_| {
117                    let ip = bincode::serialize(&ip).unwrap_or_else(|err| {
118                        warn!("Failed to serialize: {:?}", err);
119                        vec![]
120                    });
121                    Ok(Bytes::from(ip))
122                });
123
124            let connection = writer
125                .send_all(processor)
126                .timeout(Duration::from_secs(5))
127                .then(|result| {
128                    if let Err(err) = result {
129                        info!("Session failed: {:?}", err);
130                    }
131                    Ok(())
132                });
133
134            tokio::spawn(connection)
135        });
136
137    let mut rt = Runtime::new().expect("Failed to create Runtime");
138    rt.spawn(server);
139    rt
140}