1use anyhow::{Context, Result};
2use cloudpub_common::protocol::message::Message;
3use cloudpub_common::protocol::{ClientEndpoint, ServerEndpoint};
4use cloudpub_common::utils::find_free_tcp_port;
5use std::time::{Duration, Instant};
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::net::{TcpListener, TcpStream};
8use tokio::sync::mpsc;
9use tokio::time::sleep;
10use tracing::{debug, error};
11
/// Parameters for one ping measurement run.
#[derive(Debug, Clone)]
pub struct Settings {
    /// Number of untimed round trips performed before the measured ones.
    pub warm_up_count: u64,
    /// Number of timed round trips.
    pub msg_count: u64,
    /// Size of each ping payload, in bytes.
    pub msg_size: u64,
    /// Pause after every timed round trip, in milliseconds.
    pub sleep_time: u64,
}
19
20pub async fn start(port: u16) -> Result<mpsc::Sender<()>> {
21 let (stop_tx, mut stop_rx) = mpsc::channel(1);
23
24 let tcp_addr = format!("localhost:{}", port);
25
26 tokio::spawn(async move {
28 debug!("Starting TCP ponger on {}", tcp_addr);
29 match TcpListener::bind(&tcp_addr).await {
30 Ok(acceptor) => {
31 tokio::spawn(async move {
32 loop {
33 tokio::select! {
34 _ = stop_rx.recv() => {
35 debug!("TCP ponger received stop signal");
36 break;
37 }
38 accept_result = acceptor.accept() => {
39 match accept_result {
40 Ok((client, addr)) => {
41 debug!("TCP client connected from {}", addr);
42 tokio::spawn(pong_tcp(client));
43 }
44 Err(e) => {
45 error!("Failed to accept TCP connection: {}", e);
46 break;
47 }
48 }
49 }
50 }
51 }
52 });
53 }
54 Err(e) => {
55 error!("Failed to bind TCP ponger to {}: {}", tcp_addr, e);
56 }
57 }
58 });
59
60 Ok(stop_tx)
61}
62
63pub async fn publish(command_tx: mpsc::Sender<Message>) -> Result<()> {
64 let port = find_free_tcp_port()
65 .await
66 .context("Failed to find free TCP port")?;
67 let client = ClientEndpoint {
69 local_proto: cloudpub_common::protocol::Protocol::Tcp.into(),
70 local_addr: "localhost".to_string(),
71 local_port: port as u32,
72 description: Some("TCP Ponger".to_string()),
73 ..Default::default()
74 };
75
76 debug!("Publishing TCP service on port {}", port);
77 command_tx.send(Message::EndpointStart(client)).await?;
78 Ok(())
79}
80
81pub async fn ping_test(endpoint: ServerEndpoint, bare: bool) -> Result<String> {
82 debug!("Running ping test on {}", endpoint);
83 let addr = format!("{}:{}", endpoint.remote_addr, endpoint.remote_port);
84
85 let _stop_tx = start(endpoint.client.as_ref().unwrap().local_port as u16)
86 .await
87 .context("Failed to start ping service")?;
88
89 sleep(Duration::from_millis(100)).await;
91
92 let settings = Settings {
93 warm_up_count: 1,
94 msg_count: 10,
95 msg_size: 48,
96 sleep_time: 0,
97 };
98
99 let client = TcpStream::connect(&addr)
100 .await
101 .context(format!("Failed to connect to {}", addr))?;
102 let mut times = ping_tcp(client, &settings).await;
103
104 if times.is_empty() {
105 return Ok(crate::t!("error-measurement"));
106 }
107
108 times.sort();
109
110 if bare {
111 let p50 = times.len() as f64 * 0.5;
112 Ok(format!("{}", times[p50 as usize] / 1_000))
113 } else {
114 Ok(format_stats(times))
115 }
116}
117
118async fn ping_tcp(mut client: TcpStream, settings: &Settings) -> Vec<u32> {
120 let msg_string = "x".to_string().repeat(settings.msg_size as usize);
121 let msg: &[u8] = msg_string.as_bytes();
122 let mut recv_buf: [u8; 65000] = [0; 65000];
123
124 let mut times = Vec::with_capacity(settings.msg_count as usize);
125
126 for _ in 0..settings.warm_up_count {
128 send_single_ping_tcp(&mut client, msg, &mut recv_buf).await;
129 }
130
131 for _ in 0..settings.msg_count {
133 let start = Instant::now();
134 let bytes_read = send_single_ping_tcp(&mut client, msg, &mut recv_buf).await;
135 let end = Instant::now();
136
137 if bytes_read == 0 {
138 return times;
139 }
140
141 if bytes_read != msg.len() {
142 return times;
143 }
144
145 let duration = end.duration_since(start).subsec_nanos();
146 times.push(duration);
147
148 sleep(Duration::from_millis(settings.sleep_time)).await;
149 }
150
151 times
152}
153
154async fn send_single_ping_tcp(client: &mut TcpStream, msg: &[u8], recv_buf: &mut [u8]) -> usize {
155 debug!("Sending ping");
156 if let Err(e) = client.write_all(msg).await {
157 error!("Sending ping failed: {}", e);
158 return 0;
159 }
160
161 let mut bytes_read = 0;
162 while bytes_read < msg.len() {
163 match client.read(&mut recv_buf[bytes_read..]).await {
164 Ok(0) => return 0, Ok(n) => bytes_read += n,
166 Err(e) => {
167 error!("Error reading from socket: {}", e);
168 return 0;
169 }
170 }
171 }
172
173 bytes_read
174}
175
176async fn pong_tcp(mut sock: TcpStream) {
177 let mut buf: [u8; 65000] = [0; 65000];
178
179 loop {
180 let total_read = match sock.read(&mut buf).await {
181 Ok(0) => return, Ok(n) => n,
183 Err(e) => {
184 error!("Error reading from TCP socket: {}", e);
185 return;
186 }
187 };
188
189 if let Err(e) = sock.write_all(&buf[0..total_read]).await {
191 error!("Error writing to TCP socket: {}", e);
192 return;
193 }
194 }
195}
196
197fn format_stats(times: Vec<u32>) -> String {
198 let p50 = times.len() as f64 * 0.5;
199 let p95 = times.len() as f64 * 0.95;
200 let p99 = times.len() as f64 * 0.99;
201
202 let format_duration = |ns: u32| -> String {
204 if ns < 1_000 {
205 format!("{} ns", ns)
206 } else if ns < 1_000_000 {
207 format!("{:.2} µs", ns as f64 / 1_000.0)
208 } else if ns < 1_000_000_000 {
209 format!("{:.2} ms", ns as f64 / 1_000_000.0)
210 } else {
211 format!("{:.2} s", ns as f64 / 1_000_000_000.0)
212 }
213 };
214
215 format!(
216 "{}:\n p50: {}\n p95: {}\n p99: {}\n max: {}",
217 crate::t!("ping-time-percentiles"),
218 format_duration(times[p50 as usize]),
219 format_duration(times[p95 as usize]),
220 format_duration(times[p99 as usize]),
221 format_duration(*times.last().unwrap()),
222 )
223}