cloudpub_client/
ping.rs

1use anyhow::{Context, Result};
2use cloudpub_common::protocol::message::Message;
3use cloudpub_common::protocol::{ClientEndpoint, ServerEndpoint};
4use cloudpub_common::utils::find_free_tcp_port;
5use std::time::{Duration, Instant};
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::net::{TcpListener, TcpStream};
8use tokio::sync::mpsc;
9use tokio::time::sleep;
10use tracing::{debug, error};
11
/// Parameters controlling a single ping measurement run.
#[derive(Debug, Clone)]
pub struct Settings {
    /// Number of round trips performed before timing starts; their
    /// durations are not recorded.
    pub warm_up_count: u64,
    /// Number of timed round trips to perform.
    pub msg_count: u64,
    /// Size of each ping payload, in bytes.
    pub msg_size: u64,
    /// Pause between two timed round trips, in milliseconds.
    pub sleep_time: u64,
}
19
20pub async fn start(port: u16) -> Result<mpsc::Sender<()>> {
21    // Create a channel for stop signal
22    let (stop_tx, mut stop_rx) = mpsc::channel(1);
23
24    let tcp_addr = format!("localhost:{}", port);
25
26    // Spawn TCP ponger in a non-blocking task
27    tokio::spawn(async move {
28        debug!("Starting TCP ponger on {}", tcp_addr);
29        match TcpListener::bind(&tcp_addr).await {
30            Ok(acceptor) => {
31                tokio::spawn(async move {
32                    loop {
33                        tokio::select! {
34                            _ = stop_rx.recv() => {
35                                debug!("TCP ponger received stop signal");
36                                break;
37                            }
38                            accept_result = acceptor.accept() => {
39                                match accept_result {
40                                    Ok((client, addr)) => {
41                                        debug!("TCP client connected from {}", addr);
42                                        tokio::spawn(pong_tcp(client));
43                                    }
44                                    Err(e) => {
45                                        error!("Failed to accept TCP connection: {}", e);
46                                        break;
47                                    }
48                                }
49                            }
50                        }
51                    }
52                });
53            }
54            Err(e) => {
55                error!("Failed to bind TCP ponger to {}: {}", tcp_addr, e);
56            }
57        }
58    });
59
60    Ok(stop_tx)
61}
62
63pub async fn publish(command_tx: mpsc::Sender<Message>) -> Result<()> {
64    let port = find_free_tcp_port()
65        .await
66        .context("Failed to find free TCP port")?;
67    // Create TCP publish args
68    let client = ClientEndpoint {
69        local_proto: cloudpub_common::protocol::Protocol::Tcp.into(),
70        local_addr: "localhost".to_string(),
71        local_port: port as u32,
72        description: Some("TCP Ponger".to_string()),
73        ..Default::default()
74    };
75
76    debug!("Publishing TCP service on port {}", port);
77    command_tx.send(Message::EndpointStart(client)).await?;
78    Ok(())
79}
80
81pub async fn ping_test(endpoint: ServerEndpoint, bare: bool) -> Result<String> {
82    debug!("Running ping test on {}", endpoint);
83    let addr = format!("{}:{}", endpoint.remote_addr, endpoint.remote_port);
84
85    let _stop_tx = start(endpoint.client.as_ref().unwrap().local_port as u16)
86        .await
87        .context("Failed to start ping service")?;
88
89    // Wait for the server to start
90    sleep(Duration::from_millis(100)).await;
91
92    let settings = Settings {
93        warm_up_count: 1,
94        msg_count: 10,
95        msg_size: 48,
96        sleep_time: 0,
97    };
98
99    let client = TcpStream::connect(&addr)
100        .await
101        .context(format!("Failed to connect to {}", addr))?;
102    let mut times = ping_tcp(client, &settings).await;
103
104    if times.is_empty() {
105        return Ok(crate::t!("error-measurement"));
106    }
107
108    times.sort();
109
110    if bare {
111        let p50 = times.len() as f64 * 0.5;
112        Ok(format!("{}", times[p50 as usize] / 1_000))
113    } else {
114        Ok(format_stats(times))
115    }
116}
117
118// TCP implementation
119async fn ping_tcp(mut client: TcpStream, settings: &Settings) -> Vec<u32> {
120    let msg_string = "x".to_string().repeat(settings.msg_size as usize);
121    let msg: &[u8] = msg_string.as_bytes();
122    let mut recv_buf: [u8; 65000] = [0; 65000];
123
124    let mut times = Vec::with_capacity(settings.msg_count as usize);
125
126    // Warm-up phase
127    for _ in 0..settings.warm_up_count {
128        send_single_ping_tcp(&mut client, msg, &mut recv_buf).await;
129    }
130
131    // Measurement phase
132    for _ in 0..settings.msg_count {
133        let start = Instant::now();
134        let bytes_read = send_single_ping_tcp(&mut client, msg, &mut recv_buf).await;
135        let end = Instant::now();
136
137        if bytes_read == 0 {
138            return times;
139        }
140
141        if bytes_read != msg.len() {
142            return times;
143        }
144
145        let duration = end.duration_since(start).subsec_nanos();
146        times.push(duration);
147
148        sleep(Duration::from_millis(settings.sleep_time)).await;
149    }
150
151    times
152}
153
154async fn send_single_ping_tcp(client: &mut TcpStream, msg: &[u8], recv_buf: &mut [u8]) -> usize {
155    debug!("Sending ping");
156    if let Err(e) = client.write_all(msg).await {
157        error!("Sending ping failed: {}", e);
158        return 0;
159    }
160
161    let mut bytes_read = 0;
162    while bytes_read < msg.len() {
163        match client.read(&mut recv_buf[bytes_read..]).await {
164            Ok(0) => return 0, // Connection closed
165            Ok(n) => bytes_read += n,
166            Err(e) => {
167                error!("Error reading from socket: {}", e);
168                return 0;
169            }
170        }
171    }
172
173    bytes_read
174}
175
176async fn pong_tcp(mut sock: TcpStream) {
177    let mut buf: [u8; 65000] = [0; 65000];
178
179    loop {
180        let total_read = match sock.read(&mut buf).await {
181            Ok(0) => return, // Connection closed
182            Ok(n) => n,
183            Err(e) => {
184                error!("Error reading from TCP socket: {}", e);
185                return;
186            }
187        };
188
189        // Send the response
190        if let Err(e) = sock.write_all(&buf[0..total_read]).await {
191            error!("Error writing to TCP socket: {}", e);
192            return;
193        }
194    }
195}
196
197fn format_stats(times: Vec<u32>) -> String {
198    let p50 = times.len() as f64 * 0.5;
199    let p95 = times.len() as f64 * 0.95;
200    let p99 = times.len() as f64 * 0.99;
201
202    // Convert nanoseconds to appropriate time units for better readability
203    let format_duration = |ns: u32| -> String {
204        if ns < 1_000 {
205            format!("{} ns", ns)
206        } else if ns < 1_000_000 {
207            format!("{:.2} µs", ns as f64 / 1_000.0)
208        } else if ns < 1_000_000_000 {
209            format!("{:.2} ms", ns as f64 / 1_000_000.0)
210        } else {
211            format!("{:.2} s", ns as f64 / 1_000_000_000.0)
212        }
213    };
214
215    format!(
216        "{}:\n   p50: {}\n   p95: {}\n   p99: {}\n   max: {}",
217        crate::t!("ping-time-percentiles"),
218        format_duration(times[p50 as usize]),
219        format_duration(times[p95 as usize]),
220        format_duration(times[p99 as usize]),
221        format_duration(*times.last().unwrap()),
222    )
223}