rperf3/
server.rs

1use crate::buffer_pool::BufferPool;
2use crate::config::Config;
3use crate::interval_reporter::{run_reporter_task, IntervalReport, IntervalReporter};
4use crate::measurements::{get_tcp_stats, IntervalStats, MeasurementsCollector};
5use crate::protocol::{deserialize_message, serialize_message, Message, DEFAULT_STREAM_ID};
6use crate::{Error, Result};
7use log::{debug, error, info};
8use socket2::SockRef;
9use std::net::SocketAddr;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream, UdpSocket};
14use tokio::time;
15use tokio_util::sync::CancellationToken;
16
17/// Configure TCP socket options for optimal performance.
18///
19/// This function applies the following optimizations:
20/// - **TCP_NODELAY**: Disables Nagle's algorithm to reduce latency
21/// - **Send buffer**: Increases to 256KB for higher throughput
22/// - **Receive buffer**: Increases to 256KB for higher throughput
23///
24/// # Arguments
25///
26/// * `stream` - The TCP stream to configure
27///
28/// # Returns
29///
30/// Returns `Ok(())` on success, or an `Error` if any socket option fails to set.
31///
32/// # Performance Impact
33///
34/// Expected 10-20% improvement in TCP throughput tests with these optimizations.
35fn configure_tcp_socket(stream: &TcpStream) -> Result<()> {
36    // Disable Nagle's algorithm for lower latency
37    stream.set_nodelay(true).map_err(|e| {
38        Error::Io(std::io::Error::new(
39            e.kind(),
40            format!("Failed to set TCP_NODELAY: {}", e),
41        ))
42    })?;
43
44    // Set larger send and receive buffers for higher throughput
45    const BUFFER_SIZE: usize = 256 * 1024; // 256KB
46    let sock_ref = SockRef::from(stream);
47
48    sock_ref.set_send_buffer_size(BUFFER_SIZE).map_err(|e| {
49        Error::Io(std::io::Error::new(
50            e.kind(),
51            format!("Failed to set send buffer size: {}", e),
52        ))
53    })?;
54
55    sock_ref.set_recv_buffer_size(BUFFER_SIZE).map_err(|e| {
56        Error::Io(std::io::Error::new(
57            e.kind(),
58            format!("Failed to set recv buffer size: {}", e),
59        ))
60    })?;
61
62    debug!(
63        "TCP socket configured: TCP_NODELAY=true, buffers={}KB",
64        BUFFER_SIZE / 1024
65    );
66
67    Ok(())
68}
69
70/// Configure UDP socket options for optimal performance.
71///
72/// This function applies the following optimizations:
73/// - **Send buffer**: Increases to 2MB for better burst handling
74/// - **Receive buffer**: Increases to 2MB to reduce packet loss
75///
76/// # Arguments
77///
78/// * `socket` - The UDP socket to configure
79///
80/// # Returns
81///
82/// Returns `Ok(())` on success, or an `Error` if any socket option fails to set.
83///
84/// # Performance Impact
85///
86/// Expected 10-20% improvement in UDP throughput tests with reduced packet loss.
87fn configure_udp_socket(socket: &UdpSocket) -> Result<()> {
88    // Set larger send and receive buffers for UDP
89    const BUFFER_SIZE: usize = 2 * 1024 * 1024; // 2MB
90    let sock_ref = SockRef::from(socket);
91
92    sock_ref.set_send_buffer_size(BUFFER_SIZE).map_err(|e| {
93        Error::Io(std::io::Error::new(
94            e.kind(),
95            format!("Failed to set UDP send buffer size: {}", e),
96        ))
97    })?;
98
99    sock_ref.set_recv_buffer_size(BUFFER_SIZE).map_err(|e| {
100        Error::Io(std::io::Error::new(
101            e.kind(),
102            format!("Failed to set UDP recv buffer size: {}", e),
103        ))
104    })?;
105
106    debug!(
107        "UDP socket configured: buffers={}MB",
108        BUFFER_SIZE / (1024 * 1024)
109    );
110
111    Ok(())
112}
113
114/// Network performance test server.
115///
116/// The `Server` listens for incoming TCP control connections and handles both TCP and UDP
117/// performance test requests. All tests use a TCP control channel for coordination, with
118/// UDP data transfer happening on the same port. The server supports reverse mode testing,
119/// bandwidth limiting, interval reporting, JSON output, and can handle multiple concurrent clients.
120///
121/// # Features
122///
123/// - **TCP Control Channel**: Always uses TCP for client coordination
124/// - **TCP and UDP Data**: Handle both reliable (TCP) and unreliable (UDP) performance tests
125/// - **Reverse Mode**: Send data to client for reverse throughput testing
126/// - **Bandwidth Limiting**: Control send rate in reverse mode tests
127/// - **Interval Reporting**: Display periodic statistics during tests
128/// - **JSON Output**: Machine-readable output format for automation
129/// - **UDP Metrics**: Track packet loss, jitter, and out-of-order delivery
130/// - **Concurrent Clients**: Handle multiple simultaneous test connections
131///
132/// # Examples
133///
134/// ## Basic TCP Server
135///
136/// ```no_run
137/// use rperf3::{Server, Config};
138///
139/// # #[tokio::main]
140/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
141/// let config = Config::server(5201);
142/// let server = Server::new(config);
143///
144/// println!("Starting server on port 5201...");
145/// server.run().await?;
146/// # Ok(())
147/// # }
148/// ```
149///
150/// ## UDP Server with Reverse Mode
151///
152/// ```no_run
153/// use rperf3::{Server, Config, Protocol};
154///
155/// # #[tokio::main]
156/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
157/// let config = Config::server(5201)
158///     .with_protocol(Protocol::Udp)
159///     .with_reverse(true); // Server will send UDP data
160///
161/// let server = Server::new(config);
162/// server.run().await?;
163/// # Ok(())
164/// # }
165/// ```
166pub struct Server {
167    config: Config,
168    measurements: MeasurementsCollector,
169    tcp_buffer_pool: Arc<BufferPool>,
170    udp_buffer_pool: Arc<BufferPool>,
171    cancellation_token: CancellationToken,
172}
173
174impl Server {
175    /// Creates a new server with the given configuration.
176    ///
177    /// # Arguments
178    ///
179    /// * `config` - The server configuration including port and protocol
180    ///
181    /// # Examples
182    ///
183    /// ```
184    /// use rperf3::{Server, Config};
185    ///
186    /// let config = Config::server(5201);
187    /// let server = Server::new(config);
188    /// ```
189    pub fn new(config: Config) -> Self {
190        // Create buffer pools for TCP and UDP
191        // TCP: use configured buffer size, pool up to 10 buffers per stream
192        let tcp_pool_size = config.parallel * 2; // 2 buffers per stream (send + receive)
193        let tcp_buffer_pool = Arc::new(BufferPool::new(config.buffer_size, tcp_pool_size));
194
195        // UDP: fixed 65536 bytes (max UDP packet size), pool up to 10 buffers
196        let udp_buffer_pool = Arc::new(BufferPool::new(65536, 10));
197
198        Self {
199            config,
200            measurements: MeasurementsCollector::new(),
201            tcp_buffer_pool,
202            udp_buffer_pool,
203            cancellation_token: CancellationToken::new(),
204        }
205    }
206
207    /// Returns a reference to the cancellation token.
208    ///
209    /// This allows external code to cancel the running server gracefully.
210    ///
211    /// # Examples
212    ///
213    /// ```no_run
214    /// use rperf3::{Server, Config};
215    ///
216    /// # #[tokio::main]
217    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
218    /// let config = Config::server(5201);
219    /// let server = Server::new(config);
220    ///
221    /// // Get cancellation token to stop from another task
222    /// let cancel_token = server.cancellation_token().clone();
223    ///
224    /// tokio::spawn(async move {
225    ///     // Server will be running
226    /// });
227    ///
228    /// // Later, to stop the server:
229    /// cancel_token.cancel();
230    /// # Ok(())
231    /// # }
232    /// ```
233    pub fn cancellation_token(&self) -> &CancellationToken {
234        &self.cancellation_token
235    }
236
237    /// Starts the server and begins listening for client connections.
238    ///
239    /// This method will run indefinitely, accepting and handling client connections.
240    /// For TCP, each client connection is handled in a separate task. For UDP,
241    /// the server processes incoming datagrams.
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if:
246    /// - Cannot bind to the specified port
247    /// - Network I/O errors occur
248    ///
249    /// # Examples
250    ///
251    /// ```no_run
252    /// use rperf3::{Server, Config};
253    ///
254    /// # #[tokio::main]
255    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
256    /// let config = Config::server(5201);
257    /// let server = Server::new(config);
258    ///
259    /// println!("Server running...");
260    /// server.run().await?;
261    /// # Ok(())
262    /// # }
263    /// ```
264    pub async fn run(&self) -> Result<()> {
265        let bind_addr = format!(
266            "{}:{}",
267            self.config
268                .bind_addr
269                .map(|a| a.to_string())
270                .unwrap_or_else(|| "0.0.0.0".to_string()),
271            self.config.port
272        );
273
274        info!("Starting rperf3 server on {}", bind_addr);
275
276        // Server always uses TCP for control connections
277        // The protocol config is just for display/logging purposes
278        // Both TCP and UDP tests are handled via the TCP control channel
279        self.run_tcp(&bind_addr).await
280    }
281
282    async fn run_tcp(&self, bind_addr: &str) -> Result<()> {
283        let listener = TcpListener::bind(bind_addr).await?;
284        info!("TCP server listening on {}", bind_addr);
285
286        loop {
287            // Check for cancellation
288            if self.cancellation_token.is_cancelled() {
289                info!("Server shutting down gracefully");
290                break;
291            }
292
293            tokio::select! {
294                accept_result = listener.accept() => {
295                    match accept_result {
296                        Ok((stream, addr)) => {
297                            info!("New connection from {}", addr);
298                            let config = self.config.clone();
299                            let measurements = self.measurements.clone();
300                            let tcp_buffer_pool = self.tcp_buffer_pool.clone();
301                            let udp_buffer_pool = self.udp_buffer_pool.clone();
302
303                            tokio::spawn(async move {
304                                if let Err(e) = handle_tcp_client(
305                                    stream,
306                                    addr,
307                                    config,
308                                    measurements,
309                                    tcp_buffer_pool,
310                                    udp_buffer_pool,
311                                )
312                                .await
313                                {
314                                    error!("Error handling client {}: {}", addr, e);
315                                }
316                            });
317                        }
318                        Err(e) => {
319                            error!("Error accepting connection: {}", e);
320                        }
321                    }
322                }
323                _ = self.cancellation_token.cancelled() => {
324                    info!("Server shutting down gracefully");
325                    break;
326                }
327            }
328        }
329        Ok(())
330    }
331
332    #[allow(dead_code)]
333    async fn run_udp(&self, bind_addr: &str) -> Result<()> {
334        let socket = UdpSocket::bind(bind_addr).await?;
335        let local_addr = socket.local_addr()?;
336
337        // Configure UDP socket for optimal performance
338        configure_udp_socket(&socket)?;
339
340        info!("UDP server listening on {}", local_addr);
341
342        // Use batch operations on Linux for better performance
343        #[cfg(target_os = "linux")]
344        return self.run_udp_batched(socket).await;
345
346        #[cfg(not(target_os = "linux"))]
347        return self.run_udp_standard(socket).await;
348    }
349
350    /// Standard UDP receive implementation (one packet per system call)
351    #[cfg_attr(target_os = "linux", allow(dead_code))]
352    async fn run_udp_standard(&self, socket: UdpSocket) -> Result<()> {
353        // Create async interval reporter
354        let (reporter, receiver) = IntervalReporter::new();
355        let reporter_task = tokio::spawn(run_reporter_task(
356            receiver,
357            self.config.json,
358            None, // Server doesn't have callbacks
359        ));
360
361        let mut buf = self.udp_buffer_pool.get();
362        let start = Instant::now();
363        let mut last_interval = start;
364        let mut interval_bytes = 0u64;
365        let mut interval_packets = 0u64;
366
367        loop {
368            // Check for cancellation
369            if self.cancellation_token.is_cancelled() {
370                info!("Server shutting down gracefully");
371                break;
372            }
373
374            match socket.recv_from(&mut buf).await {
375                Ok((len, addr)) => {
376                    debug!("Received {} bytes from {}", len, addr);
377
378                    // Parse UDP packet
379                    if let Some((header, _payload)) = crate::udp_packet::parse_packet(&buf[..len]) {
380                        // Get current receive timestamp
381                        let recv_timestamp_us = std::time::SystemTime::now()
382                            .duration_since(std::time::UNIX_EPOCH)
383                            .expect("Time went backwards")
384                            .as_micros() as u64;
385
386                        // Record packet with timing information
387                        self.measurements.record_udp_packet_received(
388                            header.sequence,
389                            header.timestamp_us,
390                            recv_timestamp_us,
391                        );
392                        self.measurements.record_bytes_received(0, len as u64);
393
394                        interval_bytes += len as u64;
395                        interval_packets += 1;
396                    } else {
397                        debug!("Received non-rperf3 UDP packet from {}", addr);
398                    }
399
400                    // Report interval
401                    if last_interval.elapsed() >= self.config.interval {
402                        let elapsed = start.elapsed();
403                        let interval_duration = last_interval.elapsed();
404                        let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
405
406                        let interval_start = if elapsed > interval_duration {
407                            elapsed - interval_duration
408                        } else {
409                            Duration::ZERO
410                        };
411
412                        self.measurements.add_interval(IntervalStats {
413                            start: interval_start,
414                            end: elapsed,
415                            bytes: interval_bytes,
416                            bits_per_second: bps,
417                            packets: interval_packets,
418                        });
419
420                        // Calculate UDP metrics
421                        let (lost, expected) = self.measurements.calculate_udp_loss();
422                        let loss_percent = if expected > 0 {
423                            (lost as f64 / expected as f64) * 100.0
424                        } else {
425                            0.0
426                        };
427                        let measurements = self.measurements.get();
428
429                        // Send to reporter task (async, non-blocking)
430                        reporter.report(IntervalReport {
431                            stream_id: DEFAULT_STREAM_ID,
432                            interval_start,
433                            interval_end: elapsed,
434                            bytes: interval_bytes,
435                            bits_per_second: bps,
436                            packets: Some(interval_packets),
437                            jitter_ms: Some(measurements.jitter_ms),
438                            lost_packets: Some(lost),
439                            lost_percent: Some(loss_percent),
440                            retransmits: None,
441                            cwnd: None,
442                        });
443
444                        interval_bytes = 0;
445                        interval_packets = 0;
446                        last_interval = Instant::now();
447                    }
448                }
449                Err(e) => {
450                    error!("Error receiving UDP packet: {}", e);
451                }
452            }
453        }
454
455        // Signal reporter completion and wait for it to finish
456        reporter.complete();
457        let _ = reporter_task.await;
458
459        Ok(())
460    }
461
462    /// Batched UDP receive implementation using recvmmsg (Linux only)
463    #[allow(dead_code)]
464    #[cfg(target_os = "linux")]
465    async fn run_udp_batched(&self, socket: UdpSocket) -> Result<()> {
466        use crate::batch_socket::UdpRecvBatch;
467
468        // Create async interval reporter
469        let (reporter, receiver) = IntervalReporter::new();
470        let reporter_task = tokio::spawn(run_reporter_task(
471            receiver,
472            self.config.json,
473            None, // Server doesn't have callbacks
474        ));
475
476        let mut batch = UdpRecvBatch::new();
477        let start = Instant::now();
478        let mut last_interval = start;
479        let mut interval_bytes = 0u64;
480        let mut interval_packets = 0u64;
481
482        loop {
483            // Check for cancellation
484            if self.cancellation_token.is_cancelled() {
485                info!("Server shutting down gracefully");
486                break;
487            }
488
489            // Receive a batch of packets
490            match batch.recv(&socket).await {
491                Ok(count) => {
492                    if count == 0 {
493                        continue;
494                    }
495
496                    debug!("Received {} packets in batch", count);
497
498                    // Process each packet in the batch
499                    for i in 0..count {
500                        if let Some((packet, addr)) = batch.get(i) {
501                            debug!(
502                                "Processing packet {} of {} bytes from {}",
503                                i,
504                                packet.len(),
505                                addr
506                            );
507
508                            // Parse UDP packet
509                            if let Some((header, _payload)) =
510                                crate::udp_packet::parse_packet(packet)
511                            {
512                                // Get current receive timestamp
513                                let recv_timestamp_us = std::time::SystemTime::now()
514                                    .duration_since(std::time::UNIX_EPOCH)
515                                    .expect("Time went backwards")
516                                    .as_micros()
517                                    as u64;
518
519                                // Record packet with timing information
520                                self.measurements.record_udp_packet_received(
521                                    header.sequence,
522                                    header.timestamp_us,
523                                    recv_timestamp_us,
524                                );
525                                self.measurements
526                                    .record_bytes_received(0, packet.len() as u64);
527
528                                interval_bytes += packet.len() as u64;
529                                interval_packets += 1;
530                            } else {
531                                debug!("Received non-rperf3 UDP packet from {}", addr);
532                            }
533                        }
534                    }
535
536                    // Report interval
537                    if last_interval.elapsed() >= self.config.interval {
538                        let elapsed = start.elapsed();
539                        let interval_duration = last_interval.elapsed();
540                        let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
541
542                        let interval_start = if elapsed > interval_duration {
543                            elapsed - interval_duration
544                        } else {
545                            Duration::ZERO
546                        };
547
548                        self.measurements.add_interval(IntervalStats {
549                            start: interval_start,
550                            end: elapsed,
551                            bytes: interval_bytes,
552                            bits_per_second: bps,
553                            packets: interval_packets,
554                        });
555
556                        // Calculate UDP metrics
557                        let (lost, expected) = self.measurements.calculate_udp_loss();
558                        let loss_percent = if expected > 0 {
559                            (lost as f64 / expected as f64) * 100.0
560                        } else {
561                            0.0
562                        };
563                        let measurements = self.measurements.get();
564
565                        // Send to reporter task (async, non-blocking)
566                        reporter.report(IntervalReport {
567                            stream_id: DEFAULT_STREAM_ID,
568                            interval_start,
569                            interval_end: elapsed,
570                            bytes: interval_bytes,
571                            bits_per_second: bps,
572                            packets: Some(interval_packets),
573                            jitter_ms: Some(measurements.jitter_ms),
574                            lost_packets: Some(lost),
575                            lost_percent: Some(loss_percent),
576                            retransmits: None,
577                            cwnd: None,
578                        });
579
580                        interval_bytes = 0;
581                        interval_packets = 0;
582                        last_interval = Instant::now();
583                    }
584                }
585                Err(e) => {
586                    error!("Error receiving UDP batch: {}", e);
587                }
588            }
589        }
590
591        // Signal reporter completion and wait for it to finish
592        reporter.complete();
593        let _ = reporter_task.await;
594
595        Ok(())
596    }
597
598    /// Retrieves the current measurements collected by the server.
599    ///
600    /// Returns a snapshot of the statistics collected from client tests. This
601    /// includes total bytes transferred, bandwidth measurements, and UDP-specific
602    /// metrics like packet loss and jitter.
603    ///
604    /// # Returns
605    ///
606    /// A `Measurements` struct containing comprehensive test statistics.
607    ///
608    /// # Examples
609    ///
610    /// ```
611    /// use rperf3::{Server, Config};
612    ///
613    /// let config = Config::server(5201);
614    /// let server = Server::new(config);
615    ///
616    /// // After tests have run
617    /// let measurements = server.get_measurements();
618    /// println!("Total bytes: {}", measurements.total_bytes_received);
619    /// println!("Throughput: {:.2} Mbps",
620    ///          measurements.total_bits_per_second() / 1_000_000.0);
621    /// ```
622    pub fn get_measurements(&self) -> crate::Measurements {
623        self.measurements.get()
624    }
625}
626
627async fn handle_tcp_client(
628    mut stream: TcpStream,
629    addr: SocketAddr,
630    config: Config,
631    measurements: MeasurementsCollector,
632    tcp_buffer_pool: Arc<BufferPool>,
633    udp_buffer_pool: Arc<BufferPool>,
634) -> Result<()> {
635    // Configure TCP socket options for optimal performance
636    configure_tcp_socket(&stream)?;
637
638    // Read setup message
639    let setup_msg = deserialize_message(&mut stream).await?;
640
641    let (protocol, duration, reverse, _parallel, bandwidth, buffer_size) = match setup_msg {
642        Message::Setup {
643            version: _,
644            protocol,
645            duration,
646            reverse,
647            parallel,
648            bandwidth,
649            buffer_size,
650            ..
651        } => {
652            info!(
653                "Client {} setup: protocol={}, duration={}s, reverse={}, parallel={}",
654                addr, protocol, duration, reverse, parallel
655            );
656            (
657                protocol,
658                Duration::from_secs(duration),
659                reverse,
660                parallel,
661                bandwidth,
662                buffer_size,
663            )
664        }
665        _ => {
666            return Err(Error::Protocol("Expected Setup message".to_string()));
667        }
668    };
669
670    // Check if this is UDP mode
671    if protocol == "Udp" {
672        // Create a config with the client's test parameters
673        let mut udp_config = config.clone();
674        udp_config.duration = duration;
675        udp_config.reverse = reverse;
676        udp_config.bandwidth = bandwidth;
677        udp_config.buffer_size = buffer_size;
678
679        // Handle UDP test via control channel
680        return handle_udp_test(stream, addr, udp_config, measurements, udp_buffer_pool).await;
681    }
682
683    // Send setup acknowledgment for TCP
684    let ack = Message::setup_ack(config.port, format!("{}", addr));
685    let ack_bytes = serialize_message(&ack)?;
686    stream.write_all(&ack_bytes).await?;
687    stream.flush().await?;
688
689    // Send start signal
690    let start_msg = Message::start(
691        std::time::SystemTime::now()
692            .duration_since(std::time::UNIX_EPOCH)
693            .unwrap()
694            .as_secs(),
695    );
696    let start_bytes = serialize_message(&start_msg)?;
697    stream.write_all(&start_bytes).await?;
698    stream.flush().await?;
699
700    measurements.set_start_time(Instant::now());
701
702    if reverse {
703        // Server sends data to client
704        send_data(
705            &mut stream,
706            0,
707            duration,
708            bandwidth,
709            &measurements,
710            &config,
711            tcp_buffer_pool.clone(),
712        )
713        .await?;
714    } else {
715        // Server receives data from client
716        receive_data(
717            &mut stream,
718            0,
719            duration,
720            &measurements,
721            &config,
722            tcp_buffer_pool.clone(),
723        )
724        .await?;
725    }
726
727    // Send final results
728    let final_measurements = measurements.get();
729    if let Some(stream_stats) = final_measurements.streams.first() {
730        let result_msg = Message::result(
731            0,
732            stream_stats.bytes_sent,
733            stream_stats.bytes_received,
734            final_measurements.total_duration.as_secs_f64(),
735            final_measurements.total_bits_per_second(),
736            None,
737        );
738        let result_bytes = serialize_message(&result_msg)?;
739        stream.write_all(&result_bytes).await?;
740        stream.flush().await?;
741    }
742
743    // Send done signal
744    let done_msg = Message::done();
745    let done_bytes = serialize_message(&done_msg)?;
746    stream.write_all(&done_bytes).await?;
747    stream.flush().await?;
748
749    info!(
750        "Test completed for {}: {:.2} Mbps",
751        addr,
752        final_measurements.total_bits_per_second() / 1_000_000.0
753    );
754
755    Ok(())
756}
757
758async fn handle_udp_test(
759    mut control_stream: TcpStream,
760    client_addr: SocketAddr,
761    config: Config,
762    measurements: MeasurementsCollector,
763    udp_buffer_pool: Arc<BufferPool>,
764) -> Result<()> {
765    let duration = config.duration;
766    let reverse = config.reverse;
767    let bandwidth = config.bandwidth;
768    let buffer_size = config.buffer_size;
769    // Send setup acknowledgment
770    let ack = Message::setup_ack(config.port, format!("{}", client_addr));
771    let ack_bytes = serialize_message(&ack)?;
772    control_stream.write_all(&ack_bytes).await?;
773    control_stream.flush().await?;
774
775    // Send start signal
776    let start_msg = Message::start(
777        std::time::SystemTime::now()
778            .duration_since(std::time::UNIX_EPOCH)
779            .unwrap()
780            .as_secs(),
781    );
782    let start_bytes = serialize_message(&start_msg)?;
783    control_stream.write_all(&start_bytes).await?;
784    control_stream.flush().await?;
785
786    measurements.set_start_time(Instant::now());
787
788    if reverse {
789        // Server sends UDP data to client
790        send_udp_data(
791            client_addr,
792            duration,
793            bandwidth,
794            buffer_size,
795            &measurements,
796            &config,
797            udp_buffer_pool.clone(),
798        )
799        .await?;
800    } else {
801        // Server receives UDP data from client
802        receive_udp_data(duration, &measurements, &config, udp_buffer_pool.clone()).await?;
803    }
804
805    info!(
806        "UDP test completed for {}: {:.2} Mbps",
807        client_addr,
808        measurements.get().total_bits_per_second() / 1_000_000.0
809    );
810
811    Ok(())
812}
813
814async fn send_udp_data(
815    _client_tcp_addr: SocketAddr,
816    duration: Duration,
817    bandwidth: Option<u64>,
818    buffer_size: usize,
819    measurements: &MeasurementsCollector,
820    config: &Config,
821    buffer_pool: Arc<BufferPool>,
822) -> Result<()> {
823    // Note: In UDP reverse mode, only the client prints interval reports.
824    // The server just tracks measurements for the final summary.
825
826    // Bind to the server's configured port for UDP
827    let bind_addr = format!("0.0.0.0:{}", config.port);
828    let socket = UdpSocket::bind(&bind_addr).await?;
829
830    // Configure UDP socket for optimal performance
831    configure_udp_socket(&socket)?;
832
833    info!("UDP server listening on port {}", config.port);
834
835    // Wait for first packet from client to discover their UDP port
836    let mut buf = buffer_pool.get();
837    let (_n, client_udp_addr) = socket.recv_from(&mut buf).await?;
838
839    info!("UDP client address discovered: {}", client_udp_addr);
840
841    // Now connect to client's UDP address
842    socket.connect(client_udp_addr).await?;
843
844    let start = Instant::now();
845    let mut last_interval = start;
846    let mut interval_bytes = 0u64;
847    let mut interval_packets = 0u64;
848    let mut sequence = 0u64;
849
850    // Calculate payload size accounting for UDP packet header
851    let payload_size = if buffer_size > crate::udp_packet::UdpPacketHeader::SIZE {
852        buffer_size - crate::udp_packet::UdpPacketHeader::SIZE
853    } else {
854        1024
855    };
856
857    // Bandwidth limiting
858    let target_bytes_per_sec = bandwidth.map(|bw| bw / 8);
859    let mut total_bytes_sent = 0u64;
860    let mut last_bandwidth_check = start;
861
862    while start.elapsed() < duration {
863        let packet = crate::udp_packet::create_packet_fast(sequence, payload_size);
864
865        match socket.send(&packet).await {
866            Ok(n) => {
867                measurements.record_bytes_sent(0, n as u64);
868                measurements.record_udp_packet(0);
869                interval_bytes += n as u64;
870                interval_packets += 1;
871                sequence += 1;
872                total_bytes_sent += n as u64;
873
874                // Bandwidth limiting
875                if let Some(target_bps) = target_bytes_per_sec {
876                    let elapsed = last_bandwidth_check.elapsed().as_secs_f64();
877
878                    if elapsed >= 0.001 {
879                        let expected_bytes = (target_bps as f64 * elapsed) as u64;
880                        let bytes_sent_in_period = total_bytes_sent;
881
882                        if bytes_sent_in_period > expected_bytes {
883                            let bytes_ahead = (bytes_sent_in_period - expected_bytes) as f64;
884                            let sleep_time = bytes_ahead / target_bps as f64;
885                            if sleep_time > 0.0001 {
886                                time::sleep(Duration::from_secs_f64(sleep_time)).await;
887                            }
888                        }
889
890                        last_bandwidth_check = Instant::now();
891                        total_bytes_sent = 0;
892                    }
893                }
894
895                // Track interval stats internally (no printing on server side for reverse mode)
896                if last_interval.elapsed() >= config.interval {
897                    let elapsed = start.elapsed();
898                    let interval_duration = last_interval.elapsed();
899                    let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
900
901                    let interval_start = if elapsed > interval_duration {
902                        elapsed - interval_duration
903                    } else {
904                        Duration::ZERO
905                    };
906
907                    measurements.add_interval(IntervalStats {
908                        start: interval_start,
909                        end: elapsed,
910                        bytes: interval_bytes,
911                        bits_per_second: bps,
912                        packets: interval_packets,
913                    });
914
915                    interval_bytes = 0;
916                    interval_packets = 0;
917                    last_interval = Instant::now();
918                }
919            }
920            Err(e) => {
921                error!("Error sending UDP packet: {}", e);
922                break;
923            }
924        }
925    }
926
927    measurements.set_duration(start.elapsed());
928    Ok(())
929}
930
931async fn receive_udp_data(
932    duration: Duration,
933    measurements: &MeasurementsCollector,
934    config: &Config,
935    buffer_pool: Arc<BufferPool>,
936) -> Result<()> {
937    // Bind UDP socket on the server port
938    let bind_addr = format!("0.0.0.0:{}", config.port);
939    let socket = UdpSocket::bind(&bind_addr).await?;
940
941    // Configure UDP socket for optimal performance
942    configure_udp_socket(&socket)?;
943
944    info!("UDP server listening for packets on port {}", config.port);
945
946    let start = Instant::now();
947    let mut buf = buffer_pool.get();
948
949    // Receive packets until duration expires or timeout
950    while start.elapsed() < duration {
951        // Set a timeout so we can check elapsed time
952        let remaining = duration.saturating_sub(start.elapsed());
953        let timeout = remaining.min(Duration::from_millis(100));
954
955        match tokio::time::timeout(timeout, socket.recv_from(&mut buf)).await {
956            Ok(Ok((n, _addr))) => {
957                // Parse UDP packet
958                if let Some((header, _payload)) = crate::udp_packet::parse_packet(&buf[..n]) {
959                    let recv_timestamp_us = std::time::SystemTime::now()
960                        .duration_since(std::time::UNIX_EPOCH)
961                        .unwrap()
962                        .as_micros() as u64;
963
964                    measurements.record_bytes_received(0, n as u64);
965                    measurements.record_udp_packet_received(
966                        header.sequence,
967                        header.timestamp_us,
968                        recv_timestamp_us,
969                    );
970                }
971            }
972            Ok(Err(e)) => {
973                error!("Error receiving UDP packet: {}", e);
974                break;
975            }
976            Err(_) => {
977                // Timeout - continue to check if duration expired
978                continue;
979            }
980        }
981    }
982
983    measurements.set_duration(start.elapsed());
984    Ok(())
985}
986
987async fn send_data(
988    stream: &mut TcpStream,
989    stream_id: usize,
990    duration: Duration,
991    bandwidth: Option<u64>,
992    measurements: &MeasurementsCollector,
993    config: &Config,
994    buffer_pool: Arc<BufferPool>,
995) -> Result<()> {
996    // Create async interval reporter
997    let (reporter, receiver) = IntervalReporter::new();
998    let reporter_task = tokio::spawn(run_reporter_task(
999        receiver,
1000        config.json,
1001        None, // Server doesn't have callbacks
1002    ));
1003
1004    let buffer = buffer_pool.get();
1005    let start = Instant::now();
1006    let mut last_interval = start;
1007    let mut interval_bytes = 0u64;
1008    let mut last_retransmits = 0u64;
1009
1010    // Bandwidth limiting
1011    let target_bytes_per_sec = bandwidth.map(|bw| bw / 8);
1012    let mut total_bytes_sent = 0u64;
1013    let mut last_bandwidth_check = start;
1014
1015    while start.elapsed() < duration {
1016        match stream.write(&buffer).await {
1017            Ok(n) => {
1018                measurements.record_bytes_sent(stream_id, n as u64);
1019                interval_bytes += n as u64;
1020                total_bytes_sent += n as u64;
1021
1022                // Bandwidth limiting
1023                if let Some(target_bps) = target_bytes_per_sec {
1024                    let elapsed = last_bandwidth_check.elapsed().as_secs_f64();
1025
1026                    if elapsed >= 0.001 {
1027                        let expected_bytes = (target_bps as f64 * elapsed) as u64;
1028                        let bytes_sent_in_period = total_bytes_sent;
1029
1030                        if bytes_sent_in_period > expected_bytes {
1031                            let bytes_ahead = (bytes_sent_in_period - expected_bytes) as f64;
1032                            let sleep_time = bytes_ahead / target_bps as f64;
1033                            if sleep_time > 0.0001 {
1034                                time::sleep(Duration::from_secs_f64(sleep_time)).await;
1035                            }
1036                        }
1037
1038                        last_bandwidth_check = Instant::now();
1039                        total_bytes_sent = 0;
1040                    }
1041                }
1042
1043                // Report interval
1044                if last_interval.elapsed() >= config.interval {
1045                    let elapsed = start.elapsed();
1046                    let interval_duration = last_interval.elapsed();
1047                    let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
1048
1049                    let interval_start = if elapsed > interval_duration {
1050                        elapsed - interval_duration
1051                    } else {
1052                        Duration::ZERO
1053                    };
1054
1055                    // Get TCP stats for retransmits
1056                    let tcp_stats = get_tcp_stats(stream).ok();
1057                    let current_retransmits =
1058                        tcp_stats.as_ref().map(|s| s.retransmits).unwrap_or(0);
1059                    let interval_retransmits = current_retransmits.saturating_sub(last_retransmits);
1060                    last_retransmits = current_retransmits;
1061
1062                    measurements.add_interval(IntervalStats {
1063                        start: interval_start,
1064                        end: elapsed,
1065                        bytes: interval_bytes,
1066                        bits_per_second: bps,
1067                        packets: u64::MAX,
1068                    });
1069
1070                    // Get congestion window for reporting
1071                    let cwnd_kbytes = tcp_stats
1072                        .as_ref()
1073                        .and_then(|s| s.snd_cwnd_opt())
1074                        .map(|cwnd| cwnd / 1024);
1075
1076                    // Send to reporter task (async, non-blocking)
1077                    reporter.report(IntervalReport {
1078                        stream_id: DEFAULT_STREAM_ID,
1079                        interval_start,
1080                        interval_end: elapsed,
1081                        bytes: interval_bytes,
1082                        bits_per_second: bps,
1083                        packets: None,
1084                        jitter_ms: None,
1085                        lost_packets: None,
1086                        lost_percent: None,
1087                        retransmits: if interval_retransmits > 0 {
1088                            Some(interval_retransmits)
1089                        } else {
1090                            None
1091                        },
1092                        cwnd: cwnd_kbytes,
1093                    });
1094
1095                    interval_bytes = 0;
1096                    last_interval = Instant::now();
1097                }
1098            }
1099            Err(e) => {
1100                error!("Error sending data: {}", e);
1101                break;
1102            }
1103        }
1104    }
1105
1106    // Signal reporter completion and wait for it to finish
1107    reporter.complete();
1108    let _ = reporter_task.await;
1109
1110    measurements.set_duration(start.elapsed());
1111    stream.flush().await?;
1112
1113    Ok(())
1114}
1115
1116async fn receive_data(
1117    stream: &mut TcpStream,
1118    stream_id: usize,
1119    duration: Duration,
1120    measurements: &MeasurementsCollector,
1121    config: &Config,
1122    buffer_pool: Arc<BufferPool>,
1123) -> Result<()> {
1124    // Create async interval reporter
1125    let (reporter, receiver) = IntervalReporter::new();
1126    let reporter_task = tokio::spawn(run_reporter_task(
1127        receiver,
1128        config.json,
1129        None, // Server doesn't have callbacks
1130    ));
1131
1132    let mut buffer = buffer_pool.get();
1133    let start = Instant::now();
1134    let mut last_interval = start;
1135    let mut interval_bytes = 0u64;
1136    let mut last_retransmits = 0u64;
1137
1138    while start.elapsed() < duration {
1139        match time::timeout(Duration::from_millis(100), stream.read(&mut buffer)).await {
1140            Ok(Ok(0)) => {
1141                // Connection closed
1142                break;
1143            }
1144            Ok(Ok(n)) => {
1145                measurements.record_bytes_received(stream_id, n as u64);
1146                interval_bytes += n as u64;
1147
1148                // Report interval
1149                if last_interval.elapsed() >= config.interval {
1150                    let elapsed = start.elapsed();
1151                    let interval_duration = last_interval.elapsed();
1152                    let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
1153
1154                    let interval_start = if elapsed > interval_duration {
1155                        elapsed - interval_duration
1156                    } else {
1157                        Duration::ZERO
1158                    };
1159
1160                    // Get TCP stats for retransmits
1161                    let tcp_stats = get_tcp_stats(stream).ok();
1162                    let current_retransmits =
1163                        tcp_stats.as_ref().map(|s| s.retransmits).unwrap_or(0);
1164                    let interval_retransmits = current_retransmits.saturating_sub(last_retransmits);
1165                    last_retransmits = current_retransmits;
1166
1167                    measurements.add_interval(IntervalStats {
1168                        start: interval_start,
1169                        end: elapsed,
1170                        bytes: interval_bytes,
1171                        bits_per_second: bps,
1172                        packets: u64::MAX,
1173                    });
1174
1175                    // Send to reporter task (async, non-blocking)
1176                    reporter.report(IntervalReport {
1177                        stream_id: DEFAULT_STREAM_ID,
1178                        interval_start,
1179                        interval_end: elapsed,
1180                        bytes: interval_bytes,
1181                        bits_per_second: bps,
1182                        packets: None,
1183                        jitter_ms: None,
1184                        lost_packets: None,
1185                        lost_percent: None,
1186                        retransmits: if interval_retransmits > 0 {
1187                            Some(interval_retransmits)
1188                        } else {
1189                            None
1190                        },
1191                        cwnd: None,
1192                    });
1193
1194                    interval_bytes = 0;
1195                    last_interval = Instant::now();
1196                }
1197            }
1198            Ok(Err(e)) => {
1199                error!("Error receiving data: {}", e);
1200                break;
1201            }
1202            Err(_) => {
1203                // Timeout, check if duration expired
1204                if start.elapsed() >= duration {
1205                    break;
1206                }
1207            }
1208        }
1209    }
1210
1211    // Signal reporter completion and wait for it to finish
1212    reporter.complete();
1213    let _ = reporter_task.await;
1214
1215    measurements.set_duration(start.elapsed());
1216
1217    Ok(())
1218}