solana_net_utils/
ip_echo_server.rs1use {
2 crate::{bind_to_unspecified, HEADER_LENGTH, IP_ECHO_SERVER_RESPONSE_LENGTH},
3 log::*,
4 serde_derive::{Deserialize, Serialize},
5 solana_serde::default_on_eof,
6 std::{
7 io,
8 net::{IpAddr, SocketAddr},
9 num::NonZeroUsize,
10 time::Duration,
11 },
12 tokio::{
13 io::{AsyncReadExt, AsyncWriteExt},
14 net::{TcpListener, TcpStream},
15 runtime::{self, Runtime},
16 time::timeout,
17 },
18};
19
20pub type IpEchoServer = Runtime;
21
22pub const MINIMUM_IP_ECHO_SERVER_THREADS: NonZeroUsize = NonZeroUsize::new(2).unwrap();
26pub const DEFAULT_IP_ECHO_SERVER_THREADS: NonZeroUsize = MINIMUM_IP_ECHO_SERVER_THREADS;
29pub const MAX_PORT_COUNT_PER_MESSAGE: usize = 4;
30
31const IO_TIMEOUT: Duration = Duration::from_secs(5);
32
33#[derive(Serialize, Deserialize, Default, Debug)]
34pub(crate) struct IpEchoServerMessage {
35 tcp_ports: [u16; MAX_PORT_COUNT_PER_MESSAGE], udp_ports: [u16; MAX_PORT_COUNT_PER_MESSAGE], }
38
39#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
40pub struct IpEchoServerResponse {
41 pub(crate) address: IpAddr,
43 #[serde(deserialize_with = "default_on_eof")]
45 pub(crate) shred_version: Option<u16>,
46}
47
48impl IpEchoServerMessage {
49 pub fn new(tcp_ports: &[u16], udp_ports: &[u16]) -> Self {
50 let mut msg = Self::default();
51 assert!(tcp_ports.len() <= msg.tcp_ports.len());
52 assert!(udp_ports.len() <= msg.udp_ports.len());
53
54 msg.tcp_ports[..tcp_ports.len()].copy_from_slice(tcp_ports);
55 msg.udp_ports[..udp_ports.len()].copy_from_slice(udp_ports);
56 msg
57 }
58}
59
60pub(crate) fn ip_echo_server_request_length() -> usize {
61 const REQUEST_TERMINUS_LENGTH: usize = 1;
62 (HEADER_LENGTH + REQUEST_TERMINUS_LENGTH)
63 .wrapping_add(bincode::serialized_size(&IpEchoServerMessage::default()).unwrap() as usize)
64}
65
66async fn process_connection(
67 mut socket: TcpStream,
68 peer_addr: SocketAddr,
69 shred_version: Option<u16>,
70) -> io::Result<()> {
71 info!("connection from {peer_addr:?}");
72
73 let mut data = vec![0u8; ip_echo_server_request_length()];
74
75 let mut writer = {
76 let (mut reader, writer) = socket.split();
77 let _ = timeout(IO_TIMEOUT, reader.read_exact(&mut data)).await??;
78 writer
79 };
80
81 let request_header: String = data[0..HEADER_LENGTH].iter().map(|b| *b as char).collect();
82 if request_header != "\0\0\0\0" {
83 if request_header == "GET " || request_header == "POST" {
87 timeout(
89 IO_TIMEOUT,
90 writer.write_all(b"HTTP/1.1 400 Bad Request\nContent-length: 0\n\n"),
91 )
92 .await??;
93 return Ok(());
94 }
95 return Err(io::Error::other(format!(
96 "Bad request header: {request_header}"
97 )));
98 }
99
100 let msg =
101 bincode::deserialize::<IpEchoServerMessage>(&data[HEADER_LENGTH..]).map_err(|err| {
102 io::Error::other(format!(
103 "Failed to deserialize IpEchoServerMessage: {err:?}"
104 ))
105 })?;
106
107 trace!("request: {msg:?}");
108
109 match bind_to_unspecified() {
111 Ok(udp_socket) => {
112 for udp_port in &msg.udp_ports {
113 if *udp_port != 0 {
114 let result =
115 udp_socket.send_to(&[0], SocketAddr::from((peer_addr.ip(), *udp_port)));
116 match result {
117 Ok(_) => debug!("Successful send_to udp/{udp_port}"),
118 Err(err) => info!("Failed to send_to udp/{udp_port}: {err}"),
119 }
120 }
121 }
122 }
123 Err(err) => {
124 warn!("Failed to bind local udp socket: {err}");
125 }
126 }
127
128 for tcp_port in &msg.tcp_ports {
130 if *tcp_port != 0 {
131 debug!("Connecting to tcp/{tcp_port}");
132
133 let mut tcp_stream = timeout(
134 IO_TIMEOUT,
135 TcpStream::connect(&SocketAddr::new(peer_addr.ip(), *tcp_port)),
136 )
137 .await??;
138
139 debug!("Connection established to tcp/{}", *tcp_port);
140 tcp_stream.shutdown().await?;
141 }
142 }
143 let response = IpEchoServerResponse {
144 address: peer_addr.ip(),
145 shred_version,
146 };
147 let mut bytes = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
150 bincode::serialize_into(&mut bytes[HEADER_LENGTH..], &response).unwrap();
151 trace!("response: {bytes:?}");
152 writer.write_all(&bytes).await
153}
154
155async fn run_echo_server(tcp_listener: std::net::TcpListener, shred_version: Option<u16>) {
156 info!("bound to {:?}", tcp_listener.local_addr().unwrap());
157 let tcp_listener =
158 TcpListener::from_std(tcp_listener).expect("Failed to convert std::TcpListener");
159
160 loop {
161 let connection = tcp_listener.accept().await;
162 match connection {
163 Ok((socket, peer_addr)) => {
164 runtime::Handle::current().spawn(async move {
165 if let Err(err) = process_connection(socket, peer_addr, shred_version).await {
166 info!("session failed: {err:?}");
167 }
168 });
169 }
170 Err(err) => warn!("listener accept failed: {err:?}"),
171 }
172 }
173}
174
175pub fn ip_echo_server(
178 tcp_listener: std::net::TcpListener,
179 num_server_threads: NonZeroUsize,
180 shred_version: Option<u16>,
182) -> IpEchoServer {
183 tcp_listener.set_nonblocking(true).unwrap();
184
185 let runtime = tokio::runtime::Builder::new_multi_thread()
186 .thread_name("solIpEchoSrvrRt")
187 .worker_threads(num_server_threads.get())
188 .enable_all()
189 .build()
190 .expect("new tokio runtime");
191 runtime.spawn(run_echo_server(tcp_listener, shred_version));
192 runtime
193}