solana_netutil/
ip_echo_server.rs1use bytes::Bytes;
2use log::*;
3use serde_derive::{Deserialize, Serialize};
4use std::io;
5use std::net::SocketAddr;
6use std::time::Duration;
7use tokio;
8use tokio::net::TcpListener;
9use tokio::prelude::*;
10use tokio::reactor::Handle;
11use tokio::runtime::Runtime;
12use tokio_codec::{BytesCodec, Decoder};
13
14pub type IpEchoServer = Runtime;
15
16#[derive(Serialize, Deserialize, Default)]
17pub(crate) struct IpEchoServerMessage {
18 tcp_ports: [u16; 4], udp_ports: [u16; 4], }
21
22impl IpEchoServerMessage {
23 pub fn new(tcp_ports: &[u16], udp_ports: &[u16]) -> Self {
24 let mut msg = Self::default();
25 assert!(tcp_ports.len() <= msg.tcp_ports.len());
26 assert!(udp_ports.len() <= msg.udp_ports.len());
27
28 msg.tcp_ports[..tcp_ports.len()].copy_from_slice(tcp_ports);
29 msg.udp_ports[..udp_ports.len()].copy_from_slice(udp_ports);
30 msg
31 }
32}
33
34pub fn ip_echo_server(tcp: std::net::TcpListener) -> IpEchoServer {
37 info!("bound to {:?}", tcp.local_addr());
38 let tcp =
39 TcpListener::from_std(tcp, &Handle::default()).expect("Failed to convert std::TcpListener");
40
41 let server = tcp
42 .incoming()
43 .map_err(|err| warn!("accept failed: {:?}", err))
44 .for_each(move |socket| {
45 let ip = socket.peer_addr().expect("Expect peer_addr()").ip();
46 info!("connection from {:?}", ip);
47
48 let framed = BytesCodec::new().framed(socket);
49 let (writer, reader) = framed.split();
50
51 let processor = reader
52 .and_then(move |bytes| {
53 bincode::deserialize::<IpEchoServerMessage>(&bytes).or_else(|err| {
54 Err(io::Error::new(
55 io::ErrorKind::Other,
56 format!("Failed to deserialize IpEchoServerMessage: {:?}", err),
57 ))
58 })
59 })
60 .and_then(move |msg| {
61 if !msg.udp_ports.is_empty() {
63 match std::net::UdpSocket::bind("0.0.0.0:0") {
64 Ok(udp_socket) => {
65 for udp_port in &msg.udp_ports {
66 if *udp_port != 0 {
67 match udp_socket
68 .send_to(&[0], SocketAddr::from((ip, *udp_port)))
69 {
70 Ok(_) => debug!("Successful send_to udp/{}", udp_port),
71 Err(err) => {
72 info!("Failed to send_to udp/{}: {}", udp_port, err)
73 }
74 }
75 }
76 }
77 }
78 Err(err) => {
79 warn!("Failed to bind local udp socket: {}", err);
80 }
81 }
82 }
83
84 let tcp_futures: Vec<_> = msg
86 .tcp_ports
87 .iter()
88 .filter_map(|tcp_port| {
89 let tcp_port = *tcp_port;
90 if tcp_port == 0 {
91 None
92 } else {
93 Some(
94 tokio::net::TcpStream::connect(&SocketAddr::new(ip, tcp_port))
95 .and_then(move |tcp_stream| {
96 debug!("Connection established to tcp/{}", tcp_port);
97 let _ = tcp_stream.shutdown(std::net::Shutdown::Both);
98 Ok(())
99 })
100 .timeout(Duration::from_secs(5))
101 .or_else(move |err| {
102 Err(io::Error::new(
103 io::ErrorKind::Other,
104 format!(
105 "Connection timeout to {}: {:?}",
106 tcp_port, err
107 ),
108 ))
109 }),
110 )
111 }
112 })
113 .collect();
114 future::join_all(tcp_futures)
115 })
116 .and_then(move |_| {
117 let ip = bincode::serialize(&ip).unwrap_or_else(|err| {
118 warn!("Failed to serialize: {:?}", err);
119 vec![]
120 });
121 Ok(Bytes::from(ip))
122 });
123
124 let connection = writer
125 .send_all(processor)
126 .timeout(Duration::from_secs(5))
127 .then(|result| {
128 if let Err(err) = result {
129 info!("Session failed: {:?}", err);
130 }
131 Ok(())
132 });
133
134 tokio::spawn(connection)
135 });
136
137 let mut rt = Runtime::new().expect("Failed to create Runtime");
138 rt.spawn(server);
139 rt
140}