rperf3/
client.rs

1use crate::buffer_pool::BufferPool;
2use crate::config::{Config, Protocol};
3use crate::interval_reporter::{run_reporter_task, IntervalReport, IntervalReporter};
4use crate::measurements::{
5    get_connection_info, get_system_info, get_tcp_stats, IntervalStats, MeasurementsCollector,
6    TestConfig,
7};
8use crate::protocol::{deserialize_message, serialize_message, Message, DEFAULT_STREAM_ID};
9use crate::{Error, Result};
10use log::{debug, error, info};
11use socket2::SockRef;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15use tokio::net::{TcpStream, UdpSocket};
16use tokio::time;
17use tokio_util::sync::CancellationToken;
18
19/// Configure TCP socket options for optimal performance.
20///
21/// This function applies the following optimizations:
22/// - **TCP_NODELAY**: Disables Nagle's algorithm to reduce latency
23/// - **Send buffer**: Increases to 256KB for higher throughput
24/// - **Receive buffer**: Increases to 256KB for higher throughput
25///
26/// # Arguments
27///
28/// * `stream` - The TCP stream to configure
29///
30/// # Returns
31///
32/// Returns `Ok(())` on success, or an `Error` if any socket option fails to set.
33///
34/// # Performance Impact
35///
36/// Expected 10-20% improvement in TCP throughput tests with these optimizations.
37fn configure_tcp_socket(stream: &TcpStream) -> Result<()> {
38    // Disable Nagle's algorithm for lower latency
39    stream.set_nodelay(true).map_err(|e| {
40        Error::Io(std::io::Error::new(
41            e.kind(),
42            format!("Failed to set TCP_NODELAY: {}", e),
43        ))
44    })?;
45
46    // Set larger send and receive buffers for higher throughput
47    const BUFFER_SIZE: usize = 256 * 1024; // 256KB
48    let sock_ref = SockRef::from(stream);
49
50    sock_ref.set_send_buffer_size(BUFFER_SIZE).map_err(|e| {
51        Error::Io(std::io::Error::new(
52            e.kind(),
53            format!("Failed to set send buffer size: {}", e),
54        ))
55    })?;
56
57    sock_ref.set_recv_buffer_size(BUFFER_SIZE).map_err(|e| {
58        Error::Io(std::io::Error::new(
59            e.kind(),
60            format!("Failed to set recv buffer size: {}", e),
61        ))
62    })?;
63
64    debug!(
65        "TCP socket configured: TCP_NODELAY=true, buffers={}KB",
66        BUFFER_SIZE / 1024
67    );
68
69    Ok(())
70}
71
72/// Configure UDP socket options for optimal performance.
73///
74/// This function applies the following optimizations:
75/// - **Send buffer**: Increases to 2MB for better burst handling
76/// - **Receive buffer**: Increases to 2MB to reduce packet loss
77///
78/// # Arguments
79///
80/// * `socket` - The UDP socket to configure
81///
82/// # Returns
83///
84/// Returns `Ok(())` on success, or an `Error` if any socket option fails to set.
85///
86/// # Performance Impact
87///
88/// Expected 10-20% improvement in UDP throughput tests with reduced packet loss.
89fn configure_udp_socket(socket: &UdpSocket) -> Result<()> {
90    // Set larger send and receive buffers for UDP
91    const BUFFER_SIZE: usize = 2 * 1024 * 1024; // 2MB
92    let sock_ref = SockRef::from(socket);
93
94    sock_ref.set_send_buffer_size(BUFFER_SIZE).map_err(|e| {
95        Error::Io(std::io::Error::new(
96            e.kind(),
97            format!("Failed to set UDP send buffer size: {}", e),
98        ))
99    })?;
100
101    sock_ref.set_recv_buffer_size(BUFFER_SIZE).map_err(|e| {
102        Error::Io(std::io::Error::new(
103            e.kind(),
104            format!("Failed to set UDP recv buffer size: {}", e),
105        ))
106    })?;
107
108    debug!(
109        "UDP socket configured: buffers={}MB",
110        BUFFER_SIZE / (1024 * 1024)
111    );
112
113    Ok(())
114}
115
/// Events delivered to a progress callback while a test runs.
///
/// A callback receives one value of this type per notification: the test
/// starting, periodic interval statistics, the final summary, or an error.
///
/// # Examples
///
/// ```no_run
/// use rperf3::{Client, Config, ProgressEvent};
/// use std::time::Duration;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let config = Config::client("127.0.0.1".to_string(), 5201)
///     .with_duration(Duration::from_secs(10));
///
/// let client = Client::new(config)?
///     .with_callback(|event: ProgressEvent| {
///         match event {
///             ProgressEvent::TestStarted => println!("Starting..."),
///             ProgressEvent::IntervalUpdate { bits_per_second, .. } => {
///                 println!("Speed: {:.2} Mbps", bits_per_second / 1_000_000.0);
///             }
///             ProgressEvent::TestCompleted { total_bytes, .. } => {
///                 println!("Transferred {} bytes", total_bytes);
///             }
///             ProgressEvent::Error(msg) => eprintln!("Error: {}", msg),
///         }
///     });
///
/// client.run().await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub enum ProgressEvent {
    /// The test is starting.
    ///
    /// Sent once, after the server has signalled the start of the test.
    TestStarted,
    /// Statistics for one reporting interval.
    ///
    /// Sent periodically, at the configured reporting interval.
    ///
    /// # Fields
    ///
    /// * `interval_start` - Start of the interval, relative to test start
    /// * `interval_end` - End of the interval, relative to test start
    /// * `bytes` - Bytes transferred during the interval
    /// * `bits_per_second` - Throughput for the interval, in bits per second
    /// * `packets` - Packet count (UDP only)
    /// * `jitter_ms` - Jitter in milliseconds (UDP only)
    /// * `lost_packets` - Lost packet count (UDP only)
    /// * `lost_percent` - Packet loss as a percentage (UDP only)
    /// * `retransmits` - TCP retransmit count (TCP only)
    IntervalUpdate {
        interval_start: Duration,
        interval_end: Duration,
        bytes: u64,
        bits_per_second: f64,
        packets: Option<u64>,
        jitter_ms: Option<f64>,
        lost_packets: Option<u64>,
        lost_percent: Option<f64>,
        retransmits: Option<u64>,
    },
    /// Final totals for a finished test.
    ///
    /// Sent once, when a test ends successfully.
    ///
    /// # Fields
    ///
    /// * `total_bytes` - Bytes transferred over the whole test
    /// * `duration` - Measured duration of the test
    /// * `bits_per_second` - Average throughput over the whole test
    /// * `total_packets` - Total packets sent/received (UDP only)
    /// * `jitter_ms` - Final jitter in milliseconds (UDP only)
    /// * `lost_packets` - Total lost packets (UDP only)
    /// * `lost_percent` - Final packet loss percentage (UDP only)
    /// * `out_of_order` - Out-of-order packet count (UDP only)
    TestCompleted {
        total_bytes: u64,
        duration: Duration,
        bits_per_second: f64,
        total_packets: Option<u64>,
        jitter_ms: Option<f64>,
        lost_packets: Option<u64>,
        lost_percent: Option<f64>,
        out_of_order: Option<u64>,
    },
    /// Something went wrong during the test.
    ///
    /// Carries a human-readable description. The test will typically
    /// terminate after this event.
    Error(String),
}
213
214/// Callback trait for receiving progress updates during test execution.
215///
216/// Implement this trait to receive real-time notifications about test progress.
217/// The trait is automatically implemented for any function or closure with the
218/// correct signature.
219///
220/// # Examples
221///
222/// ## Using a Closure
223///
224/// ```no_run
225/// use rperf3::{Client, Config, ProgressEvent};
226///
227/// # #[tokio::main]
228/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
229/// let config = Config::client("127.0.0.1".to_string(), 5201);
230/// let client = Client::new(config)?
231///     .with_callback(|event| {
232///         println!("Event: {:?}", event);
233///     });
234/// # Ok(())
235/// # }
236/// ```
237///
238/// ## Custom Implementation
239///
240/// ```
241/// use rperf3::ProgressCallback;
242/// use rperf3::ProgressEvent;
243///
244/// struct MyCallback;
245///
246/// impl ProgressCallback for MyCallback {
247///     fn on_progress(&self, event: ProgressEvent) {
248///         // Custom handling
249///     }
250/// }
251/// ```
252pub trait ProgressCallback: Send + Sync {
253    /// Called when a progress event occurs.
254    ///
255    /// # Arguments
256    ///
257    /// * `event` - The progress event that occurred
258    fn on_progress(&self, event: ProgressEvent);
259}
260
261/// Simple function-based callback
262impl<F> ProgressCallback for F
263where
264    F: Fn(ProgressEvent) + Send + Sync,
265{
266    fn on_progress(&self, event: ProgressEvent) {
267        self(event)
268    }
269}
270
271type CallbackRef = Arc<dyn ProgressCallback>;
272
273/// Network performance test client.
274///
275/// The `Client` is responsible for connecting to a server and running network
276/// performance tests. It supports TCP and UDP protocols, reverse mode testing,
277/// bandwidth limiting, and provides real-time progress updates through callbacks.
278///
279/// # Features
280///
281/// - **TCP and UDP**: Test both reliable (TCP) and unreliable (UDP) protocols
282/// - **Reverse Mode**: Server sends data to client instead of client to server
283/// - **Bandwidth Limiting**: Control send rate with configurable bandwidth targets
284/// - **UDP Metrics**: Packet loss, jitter (RFC 3550), and out-of-order detection
285/// - **Progress Callbacks**: Real-time updates during test execution
286///
287/// # Examples
288///
289/// ## Basic TCP Test
290///
291/// ```no_run
292/// use rperf3::{Client, Config, Protocol};
293/// use std::time::Duration;
294///
295/// # #[tokio::main]
296/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
297/// let config = Config::client("192.168.1.100".to_string(), 5201)
298///     .with_protocol(Protocol::Tcp)
299///     .with_duration(Duration::from_secs(10));
300///
301/// let client = Client::new(config)?;
302/// client.run().await?;
303///
304/// let measurements = client.get_measurements();
305/// println!("Average throughput: {:.2} Mbps",
306///          measurements.total_bits_per_second() / 1_000_000.0);
307/// # Ok(())
308/// # }
309/// ```
310///
311/// ## UDP Test with Bandwidth Limit
312///
313/// ```no_run
314/// use rperf3::{Client, Config, Protocol};
315/// use std::time::Duration;
316///
317/// # #[tokio::main]
318/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
319/// let config = Config::client("192.168.1.100".to_string(), 5201)
320///     .with_protocol(Protocol::Udp)
321///     .with_bandwidth(100_000_000) // 100 Mbps
322///     .with_duration(Duration::from_secs(10));
323///
324/// let client = Client::new(config)?;
325/// client.run().await?;
326///
327/// let measurements = client.get_measurements();
328/// println!("Packets: {}, Loss: {}, Jitter: {:.3} ms",
329///          measurements.total_packets,
330///          measurements.lost_packets,
331///          measurements.jitter_ms);
332/// # Ok(())
333/// # }
334/// ```
335///
336/// ## With Progress Callback
337///
338/// ```no_run
339/// use rperf3::{Client, Config, ProgressEvent};
340/// use std::time::Duration;
341///
342/// # #[tokio::main]
343/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
344/// let config = Config::client("127.0.0.1".to_string(), 5201);
345///
346/// let client = Client::new(config)?
347///     .with_callback(|event: ProgressEvent| {
348///         match event {
349///             ProgressEvent::IntervalUpdate { bits_per_second, .. } => {
350///                 println!("{:.2} Mbps", bits_per_second / 1_000_000.0);
351///             }
352///             _ => {}
353///         }
354///     });
355///
356/// client.run().await?;
357/// # Ok(())
358/// # }
359/// ```
360pub struct Client {
361    config: Config,
362    measurements: MeasurementsCollector,
363    callback: Option<CallbackRef>,
364    tcp_buffer_pool: Arc<BufferPool>,
365    udp_buffer_pool: Arc<BufferPool>,
366    cancellation_token: CancellationToken,
367    stream_id: usize,
368}
369
370impl Client {
371    /// Creates a new client with the given configuration.
372    ///
373    /// # Arguments
374    ///
375    /// * `config` - The test configuration. Must have a server address set.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if the configuration doesn't have a server address set.
380    ///
381    /// # Examples
382    ///
383    /// ```
384    /// use rperf3::{Client, Config};
385    ///
386    /// let config = Config::client("127.0.0.1".to_string(), 5201);
387    /// let client = Client::new(config).expect("Failed to create client");
388    /// ```
389    pub fn new(config: Config) -> Result<Self> {
390        if config.server_addr.is_none() {
391            return Err(Error::Config(
392                "Server address is required for client mode".to_string(),
393            ));
394        }
395
396        // Create buffer pools for TCP and UDP
397        // TCP: use configured buffer size, pool up to 10 buffers per stream
398        let tcp_pool_size = config.parallel * 2; // 2 buffers per stream (send + receive)
399        let tcp_buffer_pool = Arc::new(BufferPool::new(config.buffer_size, tcp_pool_size));
400
401        // UDP: fixed 65536 bytes (max UDP packet size), pool up to 10 buffers
402        let udp_buffer_pool = Arc::new(BufferPool::new(65536, 10));
403
404        Ok(Self {
405            config,
406            measurements: MeasurementsCollector::new(),
407            callback: None,
408            tcp_buffer_pool,
409            udp_buffer_pool,
410            cancellation_token: CancellationToken::new(),
411            stream_id: DEFAULT_STREAM_ID, // Use default stream ID matching iperf3
412        })
413    }
414
415    /// Attaches a progress callback to receive real-time test updates.
416    ///
417    /// The callback will be invoked for each progress event during test execution,
418    /// including test start, interval updates, completion, and errors.
419    ///
420    /// # Arguments
421    ///
422    /// * `callback` - A function or closure that implements `ProgressCallback`
423    ///
424    /// # Returns
425    ///
426    /// Returns `self` for method chaining.
427    ///
428    /// # Examples
429    ///
430    /// ```no_run
431    /// use rperf3::{Client, Config, ProgressEvent};
432    ///
433    /// # #[tokio::main]
434    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
435    /// let config = Config::client("127.0.0.1".to_string(), 5201);
436    /// let client = Client::new(config)?
437    ///     .with_callback(|event: ProgressEvent| {
438    ///         println!("Progress: {:?}", event);
439    ///     });
440    /// # Ok(())
441    /// # }
442    /// ```
443    pub fn with_callback<C: ProgressCallback + 'static>(mut self, callback: C) -> Self {
444        self.callback = Some(Arc::new(callback));
445        self
446    }
447
448    /// Notify callback of progress event
449    fn notify(&self, event: ProgressEvent) {
450        if let Some(callback) = &self.callback {
451            callback.on_progress(event);
452        }
453    }
454
455    /// Returns a reference to the cancellation token.
456    ///
457    /// This allows external code to cancel the running test gracefully.
458    ///
459    /// # Examples
460    ///
461    /// ```no_run
462    /// use rperf3::{Client, Config};
463    /// use std::time::Duration;
464    ///
465    /// # #[tokio::main]
466    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
467    /// let config = Config::client("127.0.0.1".to_string(), 5201);
468    /// let client = Client::new(config)?;
469    ///
470    /// // Get cancellation token to cancel from another task
471    /// let cancel_token = client.cancellation_token().clone();
472    ///
473    /// tokio::spawn(async move {
474    ///     tokio::time::sleep(Duration::from_secs(5)).await;
475    ///     cancel_token.cancel();
476    /// });
477    ///
478    /// client.run().await?;
479    /// # Ok(())
480    /// # }
481    /// ```
482    pub fn cancellation_token(&self) -> &CancellationToken {
483        &self.cancellation_token
484    }
485
486    /// Runs the network performance test.
487    ///
488    /// This method connects to the server and executes the configured test.
489    /// It will block until the test completes or an error occurs.
490    ///
491    /// Progress events are emitted through the callback (if set) during execution.
492    ///
493    /// # Errors
494    ///
495    /// Returns an error if:
496    /// - Cannot connect to the server
497    /// - Network communication fails
498    /// - Protocol errors occur
499    ///
500    /// # Examples
501    ///
502    /// ```no_run
503    /// use rperf3::{Client, Config};
504    ///
505    /// # #[tokio::main]
506    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
507    /// let config = Config::client("127.0.0.1".to_string(), 5201);
508    /// let client = Client::new(config)?;
509    ///
510    /// client.run().await?;
511    /// println!("Test completed successfully");
512    /// # Ok(())
513    /// # }
514    /// ```
515    pub async fn run(&self) -> Result<()> {
516        let server_addr = self
517            .config
518            .server_addr
519            .as_ref()
520            .ok_or_else(|| Error::Config("Server address not set".to_string()))?;
521
522        let full_addr = format!("{}:{}", server_addr, self.config.port);
523
524        info!("Connecting to rperf3 server at {}", full_addr);
525
526        match self.config.protocol {
527            Protocol::Tcp => self.run_tcp(&full_addr).await,
528            Protocol::Udp => self.run_udp(&full_addr).await,
529        }
530    }
531
532    async fn run_tcp(&self, server_addr: &str) -> Result<()> {
533        let mut stream = TcpStream::connect(server_addr).await?;
534        info!("Connected to {}", server_addr);
535
536        // Configure TCP socket options for optimal performance
537        configure_tcp_socket(&stream)?;
538
539        // Print iperf3-style connection info
540        if !self.config.json {
541            let local_addr = stream.local_addr()?;
542            let remote_addr = stream.peer_addr()?;
543            println!(
544                "Connecting to host {}, port {}",
545                remote_addr.ip(),
546                remote_addr.port()
547            );
548            println!(
549                "[{:3}] local {} port {} connected to {} port {}",
550                self.stream_id,
551                local_addr.ip(),
552                local_addr.port(),
553                remote_addr.ip(),
554                remote_addr.port()
555            );
556        }
557
558        // Collect connection and system information
559        let connection_info = get_connection_info(&stream).ok();
560        let system_info = Some(get_system_info());
561
562        // Send setup message
563        let setup = Message::setup(
564            self.config.protocol.as_str().to_string(),
565            self.config.duration,
566            self.config.bandwidth,
567            self.config.buffer_size,
568            self.config.parallel,
569            self.config.reverse,
570        );
571        let setup_bytes = serialize_message(&setup)?;
572        stream.write_all(&setup_bytes).await?;
573        stream.flush().await?;
574
575        // Read setup acknowledgment
576        let ack_msg = deserialize_message(&mut stream).await?;
577        match ack_msg {
578            Message::SetupAck { port, cookie } => {
579                debug!("Received setup ack: port={}, cookie={}", port, cookie);
580            }
581            Message::Error { message } => {
582                return Err(Error::Protocol(format!("Server error: {}", message)));
583            }
584            _ => {
585                return Err(Error::Protocol("Expected SetupAck message".to_string()));
586            }
587        }
588
589        // Read start signal
590        let start_msg = deserialize_message(&mut stream).await?;
591        match start_msg {
592            Message::Start { .. } => {
593                info!("Test started");
594                self.notify(ProgressEvent::TestStarted);
595            }
596            _ => {
597                return Err(Error::Protocol("Expected Start message".to_string()));
598            }
599        }
600
601        self.measurements.set_start_time(Instant::now());
602
603        // Print iperf3-style header
604        if !self.config.json {
605            if self.config.reverse {
606                println!("[ ID] Interval           Transfer        Bitrate            Retr");
607            } else {
608                println!("[ ID] Interval           Transfer        Bitrate            Retr  Cwnd");
609            }
610        }
611
612        if self.config.reverse {
613            // Client receives data from server
614            receive_data(
615                &mut stream,
616                self.stream_id,
617                &self.measurements,
618                &self.config,
619                &self.callback,
620                self.tcp_buffer_pool.clone(),
621                &self.cancellation_token,
622            )
623            .await?;
624        } else {
625            // Client sends data to server
626            send_data(
627                &mut stream,
628                self.stream_id,
629                &self.measurements,
630                &self.config,
631                &self.callback,
632                self.tcp_buffer_pool.clone(),
633                &self.cancellation_token,
634            )
635            .await?;
636        }
637
638        // Read final results - handle connection errors gracefully
639        match deserialize_message(&mut stream).await {
640            Ok(result_msg) => match result_msg {
641                Message::Result {
642                    stream_id,
643                    bytes_sent,
644                    bytes_received,
645                    duration: _,
646                    bits_per_second,
647                    ..
648                } => {
649                    info!(
650                        "Stream {}: {} bytes sent, {} bytes received, {:.2} Mbps",
651                        stream_id,
652                        bytes_sent,
653                        bytes_received,
654                        bits_per_second / 1_000_000.0
655                    );
656                }
657                _ => {
658                    debug!("Unexpected message, continuing");
659                }
660            },
661            Err(e) => {
662                debug!(
663                    "Could not read result message (connection may be closed): {}",
664                    e
665                );
666            }
667        }
668
669        // Read done signal - handle connection errors gracefully
670        match deserialize_message(&mut stream).await {
671            Ok(done_msg) => match done_msg {
672                Message::Done => {
673                    info!("Test completed");
674                }
675                _ => {
676                    debug!("Expected Done message");
677                }
678            },
679            Err(e) => {
680                debug!(
681                    "Could not read done message (connection may be closed): {}",
682                    e
683                );
684                info!("Test completed");
685            }
686        }
687
688        let final_measurements = self.measurements.get();
689
690        // Notify callback of completion
691        self.notify(ProgressEvent::TestCompleted {
692            total_bytes: final_measurements.total_bytes_sent
693                + final_measurements.total_bytes_received,
694            duration: final_measurements.total_duration,
695            bits_per_second: final_measurements.total_bits_per_second(),
696            total_packets: None, // TCP doesn't track packets
697            jitter_ms: None,
698            lost_packets: None,
699            lost_percent: None,
700            out_of_order: None,
701        });
702
703        if !self.config.json {
704            print_results(&final_measurements, self.stream_id, self.config.reverse);
705        } else {
706            // Use detailed results for JSON output
707            let test_config = TestConfig {
708                protocol: self.config.protocol.as_str().to_string(),
709                num_streams: self.config.parallel,
710                blksize: self.config.buffer_size,
711                omit: 0,
712                duration: self.config.duration.as_secs(),
713                reverse: self.config.reverse,
714            };
715            let detailed_results =
716                self.measurements
717                    .get_detailed_results(connection_info, system_info, test_config);
718            let json = serde_json::to_string_pretty(&detailed_results)?;
719            println!("{}", json);
720        }
721
722        Ok(())
723    }
724
725    async fn run_udp(&self, server_addr: &str) -> Result<()> {
726        // For UDP, we still need a TCP control connection for setup
727        // This is similar to how iperf3 works
728        let mut control_stream = TcpStream::connect(server_addr).await?;
729
730        // Configure TCP socket options for control connection
731        configure_tcp_socket(&control_stream)?;
732
733        // Send setup message via TCP
734        let setup = Message::setup(
735            self.config.protocol.as_str().to_string(),
736            self.config.duration,
737            self.config.bandwidth,
738            self.config.buffer_size,
739            self.config.parallel,
740            self.config.reverse,
741        );
742        let setup_bytes = serialize_message(&setup)?;
743        control_stream.write_all(&setup_bytes).await?;
744        control_stream.flush().await?;
745
746        // Read setup acknowledgment
747        let ack_msg = deserialize_message(&mut control_stream).await?;
748        match ack_msg {
749            Message::SetupAck { port, cookie } => {
750                debug!("Received setup ack: port={}, cookie={}", port, cookie);
751            }
752            Message::Error { message } => {
753                return Err(Error::Protocol(format!("Server error: {}", message)));
754            }
755            _ => {
756                return Err(Error::Protocol("Expected SetupAck message".to_string()));
757            }
758        }
759
760        // Read start signal
761        let start_msg = deserialize_message(&mut control_stream).await?;
762        match start_msg {
763            Message::Start { .. } => {
764                info!("Test started");
765                self.notify(ProgressEvent::TestStarted);
766            }
767            _ => {
768                return Err(Error::Protocol("Expected Start message".to_string()));
769            }
770        }
771
772        // Now create UDP socket for data
773        let socket = UdpSocket::bind("0.0.0.0:0").await?;
774        socket.connect(server_addr).await?;
775
776        // Configure UDP socket for optimal performance
777        configure_udp_socket(&socket)?;
778
779        info!("UDP client connected to {}", server_addr);
780
781        // Print iperf3-style connection info
782        if !self.config.json {
783            let local_addr = socket.local_addr()?;
784            let remote_addr = socket.peer_addr()?;
785            println!(
786                "Connecting to host {}, port {}",
787                remote_addr.ip(),
788                remote_addr.port()
789            );
790            println!(
791                "[{:3}] local {} port {} connected to {} port {}",
792                self.stream_id,
793                local_addr.ip(),
794                local_addr.port(),
795                remote_addr.ip(),
796                remote_addr.port()
797            );
798            println!("[ ID] Interval           Transfer        Bitrate            Total Datagrams");
799        }
800
801        let result = if self.config.reverse {
802            // Reverse mode: Send one initialization packet to let server know our UDP port
803            let init_packet = crate::udp_packet::create_packet(0, 0);
804            socket.send(&init_packet).await?;
805
806            // Receive data from server
807            self.run_udp_receive(socket).await
808        } else {
809            // Normal mode: send data to server
810            self.run_udp_send(socket).await
811        };
812
813        // Close control connection
814        drop(control_stream);
815
816        result
817    }
818
819    async fn run_udp_send(&self, socket: UdpSocket) -> Result<()> {
820        // Use batch operations on Linux for better performance
821        #[cfg(target_os = "linux")]
822        return self.run_udp_send_batched(socket).await;
823
824        #[cfg(not(target_os = "linux"))]
825        return self.run_udp_send_standard(socket).await;
826    }
827
828    /// Standard UDP send implementation (one packet per system call)
829    #[cfg_attr(target_os = "linux", allow(dead_code))]
830    async fn run_udp_send_standard(&self, socket: UdpSocket) -> Result<()> {
831        // Create interval reporter and spawn reporting task
832        let (reporter, receiver) = IntervalReporter::new();
833        let reporter_task = tokio::spawn(run_reporter_task(
834            receiver,
835            self.config.json,
836            self.callback.clone(),
837        ));
838
839        let start = Instant::now();
840        let mut last_interval = start;
841        let mut interval_bytes = 0u64;
842        let mut interval_packets = 0u64;
843        let mut sequence = 0u64;
844
845        // Calculate payload size accounting for UDP packet header
846        let payload_size = if self.config.buffer_size > crate::udp_packet::UdpPacketHeader::SIZE {
847            self.config.buffer_size - crate::udp_packet::UdpPacketHeader::SIZE
848        } else {
849            1024
850        };
851
852        // Create token bucket for bandwidth limiting if needed
853        let mut token_bucket = self
854            .config
855            .bandwidth
856            .map(|bw| crate::token_bucket::TokenBucket::new(bw / 8));
857
858        while start.elapsed() < self.config.duration {
859            // Check for cancellation
860            if self.cancellation_token.is_cancelled() {
861                info!("Test cancelled by user");
862                break;
863            }
864
865            let packet = crate::udp_packet::create_packet_fast(sequence, payload_size);
866
867            match socket.send(&packet).await {
868                Ok(n) => {
869                    self.measurements.record_bytes_sent(0, n as u64);
870                    self.measurements.record_udp_packet(0);
871                    interval_bytes += n as u64;
872                    interval_packets += 1;
873                    sequence += 1;
874
875                    // Token bucket bandwidth limiting
876                    if let Some(ref mut bucket) = token_bucket {
877                        bucket.consume(n).await;
878                    }
879
880                    // Report interval
881                    if last_interval.elapsed() >= self.config.interval {
882                        let elapsed = start.elapsed();
883                        let interval_duration = last_interval.elapsed();
884                        let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
885
886                        let interval_start = if elapsed > interval_duration {
887                            elapsed - interval_duration
888                        } else {
889                            Duration::ZERO
890                        };
891
892                        self.measurements.add_interval(IntervalStats {
893                            start: interval_start,
894                            end: elapsed,
895                            bytes: interval_bytes,
896                            bits_per_second: bps,
897                            packets: interval_packets,
898                        });
899
900                        // Calculate UDP metrics
901                        let (lost, expected) = self.measurements.calculate_udp_loss();
902                        let loss_percent = if expected > 0 {
903                            (lost as f64 / expected as f64) * 100.0
904                        } else {
905                            0.0
906                        };
907                        let measurements = self.measurements.get();
908
909                        // Send to reporter task (async, non-blocking)
910                        reporter.report(IntervalReport {
911                            stream_id: self.stream_id,
912                            interval_start,
913                            interval_end: elapsed,
914                            bytes: interval_bytes,
915                            bits_per_second: bps,
916                            packets: Some(interval_packets),
917                            jitter_ms: Some(measurements.jitter_ms),
918                            lost_packets: Some(lost),
919                            lost_percent: Some(loss_percent),
920                            retransmits: None,
921                            cwnd: None,
922                        });
923
924                        interval_bytes = 0;
925                        interval_packets = 0;
926                        last_interval = Instant::now();
927                    }
928                }
929                Err(e) => {
930                    error!("Error sending UDP packet: {}", e);
931                    break;
932                }
933            }
934        }
935
936        // Signal reporter task to complete
937        reporter.complete();
938        // Wait for reporter task to finish
939        let _ = reporter_task.await;
940
941        self.measurements.set_duration(start.elapsed());
942
943        let final_measurements = self.measurements.get();
944
945        // Calculate final UDP metrics
946        let (lost, expected) = self.measurements.calculate_udp_loss();
947        let loss_percent = if expected > 0 {
948            (lost as f64 / expected as f64) * 100.0
949        } else {
950            0.0
951        };
952
953        // Notify callback of completion
954        self.notify(ProgressEvent::TestCompleted {
955            total_bytes: final_measurements.total_bytes_sent
956                + final_measurements.total_bytes_received,
957            duration: final_measurements.total_duration,
958            bits_per_second: final_measurements.total_bits_per_second(),
959            total_packets: Some(final_measurements.total_packets),
960            jitter_ms: Some(final_measurements.jitter_ms),
961            lost_packets: Some(lost),
962            lost_percent: Some(loss_percent),
963            out_of_order: Some(final_measurements.out_of_order_packets),
964        });
965
966        if !self.config.json {
967            print_results(&final_measurements, self.stream_id, self.config.reverse);
968        } else {
969            // Use detailed results for JSON output
970            let system_info = Some(get_system_info());
971            let test_config = TestConfig {
972                protocol: self.config.protocol.as_str().to_string(),
973                num_streams: self.config.parallel,
974                blksize: self.config.buffer_size,
975                omit: 0,
976                duration: self.config.duration.as_secs(),
977                reverse: self.config.reverse,
978            };
979            let detailed_results = self.measurements.get_detailed_results(
980                None, // UDP doesn't have connection info
981                system_info,
982                test_config,
983            );
984            let json = serde_json::to_string_pretty(&detailed_results)?;
985            println!("{}", json);
986        }
987
988        Ok(())
989    }
990
991    /// Batched UDP send implementation using sendmmsg (Linux only)
992    #[cfg(target_os = "linux")]
993    async fn run_udp_send_batched(&self, socket: UdpSocket) -> Result<()> {
994        use crate::batch_socket::{UdpSendBatch, MAX_BATCH_SIZE};
995
996        // Create async interval reporter
997        let (reporter, receiver) = IntervalReporter::new();
998        let reporter_task = tokio::spawn(run_reporter_task(
999            receiver,
1000            self.config.json,
1001            self.callback.clone(),
1002        ));
1003
1004        let start = Instant::now();
1005        let mut last_interval = start;
1006        let mut interval_bytes = 0u64;
1007        let mut interval_packets = 0u64;
1008        let mut sequence = 0u64;
1009
1010        // Calculate payload size accounting for UDP packet header
1011        let payload_size = if self.config.buffer_size > crate::udp_packet::UdpPacketHeader::SIZE {
1012            self.config.buffer_size - crate::udp_packet::UdpPacketHeader::SIZE
1013        } else {
1014            1024
1015        };
1016
1017        // Create token bucket for bandwidth limiting if needed
1018        let mut token_bucket = self
1019            .config
1020            .bandwidth
1021            .map(|bw| crate::token_bucket::TokenBucket::new(bw / 8));
1022
1023        // Batch for sending multiple packets at once
1024        let mut batch = UdpSendBatch::new();
1025        let remote_addr = socket.peer_addr()?;
1026
1027        // Adapt batch size based on bandwidth target
1028        let adaptive_batch_size = if let Some(ref bucket) = token_bucket {
1029            // For lower bandwidth, use smaller batches to maintain rate control accuracy
1030            let target_bps = bucket.bytes_per_sec;
1031            let packets_per_sec = target_bps / payload_size as u64;
1032            if packets_per_sec < 1000 {
1033                // Low rate: use smaller batches for better control
1034                (MAX_BATCH_SIZE / 4).max(4)
1035            } else if packets_per_sec < 10000 {
1036                // Medium rate
1037                MAX_BATCH_SIZE / 2
1038            } else {
1039                // High rate: use full batch size
1040                MAX_BATCH_SIZE
1041            }
1042        } else {
1043            // No bandwidth limit: use maximum batch size
1044            MAX_BATCH_SIZE
1045        };
1046
1047        while start.elapsed() < self.config.duration {
1048            // Check for cancellation
1049            if self.cancellation_token.is_cancelled() {
1050                info!("Test cancelled by user");
1051                break;
1052            }
1053
1054            // Fill the batch
1055            while !batch.is_full()
1056                && batch.len() < adaptive_batch_size
1057                && start.elapsed() < self.config.duration
1058            {
1059                let packet = crate::udp_packet::create_packet_fast(sequence, payload_size);
1060                batch.add(packet, remote_addr);
1061                sequence += 1;
1062            }
1063
1064            // Send the batch
1065            if !batch.is_empty() {
1066                match batch.send(&socket).await {
1067                    Ok((bytes_sent, packets_sent)) => {
1068                        // Record measurements for all packets in the batch
1069                        self.measurements.record_bytes_sent(0, bytes_sent as u64);
1070                        for _ in 0..packets_sent {
1071                            self.measurements.record_udp_packet(0);
1072                        }
1073
1074                        interval_bytes += bytes_sent as u64;
1075                        interval_packets += packets_sent as u64;
1076
1077                        // Token bucket bandwidth limiting
1078                        if let Some(ref mut bucket) = token_bucket {
1079                            bucket.consume(bytes_sent).await;
1080                        }
1081
1082                        // Report interval
1083                        if last_interval.elapsed() >= self.config.interval {
1084                            let elapsed = start.elapsed();
1085                            let interval_duration = last_interval.elapsed();
1086                            let bps =
1087                                (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
1088
1089                            let interval_start = if elapsed > interval_duration {
1090                                elapsed - interval_duration
1091                            } else {
1092                                Duration::ZERO
1093                            };
1094
1095                            self.measurements.add_interval(IntervalStats {
1096                                start: interval_start,
1097                                end: elapsed,
1098                                bytes: interval_bytes,
1099                                bits_per_second: bps,
1100                                packets: interval_packets,
1101                            });
1102
1103                            // Calculate UDP metrics for callback
1104                            let (lost, expected) = self.measurements.calculate_udp_loss();
1105                            let loss_percent = if expected > 0 {
1106                                (lost as f64 / expected as f64) * 100.0
1107                            } else {
1108                                0.0
1109                            };
1110                            let measurements = self.measurements.get();
1111
1112                            // Send to reporter task (async, non-blocking)
1113                            reporter.report(IntervalReport {
1114                                stream_id: self.stream_id,
1115                                interval_start,
1116                                interval_end: elapsed,
1117                                bytes: interval_bytes,
1118                                bits_per_second: bps,
1119                                packets: Some(interval_packets),
1120                                jitter_ms: Some(measurements.jitter_ms),
1121                                lost_packets: Some(lost),
1122                                lost_percent: Some(loss_percent),
1123                                retransmits: None,
1124                                cwnd: None,
1125                            });
1126
1127                            interval_bytes = 0;
1128                            interval_packets = 0;
1129                            last_interval = Instant::now();
1130                        }
1131                    }
1132                    Err(e) => {
1133                        error!("Error sending batch: {}", e);
1134                        break;
1135                    }
1136                }
1137            }
1138        }
1139
1140        // Signal reporter completion and wait for it to finish
1141        reporter.complete();
1142        let _ = reporter_task.await;
1143
1144        self.measurements.set_duration(start.elapsed());
1145
1146        let final_measurements = self.measurements.get();
1147
1148        // Calculate final UDP metrics
1149        let (lost, expected) = self.measurements.calculate_udp_loss();
1150        let loss_percent = if expected > 0 {
1151            (lost as f64 / expected as f64) * 100.0
1152        } else {
1153            0.0
1154        };
1155
1156        // Notify callback of completion
1157        self.notify(ProgressEvent::TestCompleted {
1158            total_bytes: final_measurements.total_bytes_sent
1159                + final_measurements.total_bytes_received,
1160            duration: final_measurements.total_duration,
1161            bits_per_second: final_measurements.total_bits_per_second(),
1162            total_packets: Some(final_measurements.total_packets),
1163            jitter_ms: Some(final_measurements.jitter_ms),
1164            lost_packets: Some(lost),
1165            lost_percent: Some(loss_percent),
1166            out_of_order: Some(final_measurements.out_of_order_packets),
1167        });
1168
1169        if !self.config.json {
1170            print_results(&final_measurements, self.stream_id, self.config.reverse);
1171        } else {
1172            // Use detailed results for JSON output
1173            let system_info = Some(get_system_info());
1174            let test_config = TestConfig {
1175                protocol: self.config.protocol.as_str().to_string(),
1176                num_streams: self.config.parallel,
1177                blksize: self.config.buffer_size,
1178                omit: 0,
1179                duration: self.config.duration.as_secs(),
1180                reverse: self.config.reverse,
1181            };
1182            let detailed_results = self.measurements.get_detailed_results(
1183                None, // UDP doesn't have connection info
1184                system_info,
1185                test_config,
1186            );
1187            let json = serde_json::to_string_pretty(&detailed_results)?;
1188            println!("{}", json);
1189        }
1190
1191        Ok(())
1192    }
1193
1194    async fn run_udp_receive(&self, socket: UdpSocket) -> Result<()> {
1195        // Create async interval reporter
1196        let (reporter, receiver) = IntervalReporter::new();
1197        let reporter_task = tokio::spawn(run_reporter_task(
1198            receiver,
1199            self.config.json,
1200            self.callback.clone(),
1201        ));
1202
1203        let start = Instant::now();
1204        let mut last_interval = start;
1205        let mut interval_bytes = 0u64;
1206        let mut interval_packets = 0u64;
1207        let mut buffer = self.udp_buffer_pool.get();
1208
1209        while start.elapsed() < self.config.duration {
1210            // Check for cancellation
1211            if self.cancellation_token.is_cancelled() {
1212                info!("Test cancelled by user");
1213                break;
1214            }
1215
1216            // Set a timeout for recv to check duration periodically
1217            let timeout =
1218                tokio::time::timeout(Duration::from_millis(100), socket.recv(&mut buffer));
1219
1220            match timeout.await {
1221                Ok(Ok(n)) => {
1222                    // Try to parse as UDP packet to get sequence and timestamp
1223                    if let Some((header, _payload)) = crate::udp_packet::parse_packet(&buffer[..n])
1224                    {
1225                        // Get current receive timestamp
1226                        let recv_timestamp_us = std::time::SystemTime::now()
1227                            .duration_since(std::time::UNIX_EPOCH)
1228                            .expect("Time went backwards")
1229                            .as_micros() as u64;
1230
1231                        self.measurements.record_udp_packet_received(
1232                            header.sequence,
1233                            header.timestamp_us,
1234                            recv_timestamp_us,
1235                        );
1236                    }
1237
1238                    self.measurements.record_bytes_received(0, n as u64);
1239                    interval_bytes += n as u64;
1240                    interval_packets += 1;
1241
1242                    // Report interval
1243                    if last_interval.elapsed() >= self.config.interval {
1244                        let elapsed = start.elapsed();
1245                        let interval_duration = last_interval.elapsed();
1246                        let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
1247
1248                        let interval_start = if elapsed > interval_duration {
1249                            elapsed - interval_duration
1250                        } else {
1251                            Duration::ZERO
1252                        };
1253
1254                        self.measurements.add_interval(IntervalStats {
1255                            start: interval_start,
1256                            end: elapsed,
1257                            bytes: interval_bytes,
1258                            bits_per_second: bps,
1259                            packets: interval_packets,
1260                        });
1261
1262                        // Calculate UDP metrics for callback
1263                        let (lost, expected) = self.measurements.calculate_udp_loss();
1264                        let loss_percent = if expected > 0 {
1265                            (lost as f64 / expected as f64) * 100.0
1266                        } else {
1267                            0.0
1268                        };
1269                        let measurements = self.measurements.get();
1270
1271                        // Send to reporter task (async, non-blocking)
1272                        reporter.report(IntervalReport {
1273                            stream_id: self.stream_id,
1274                            interval_start,
1275                            interval_end: elapsed,
1276                            bytes: interval_bytes,
1277                            bits_per_second: bps,
1278                            packets: Some(interval_packets),
1279                            jitter_ms: Some(measurements.jitter_ms),
1280                            lost_packets: Some(lost),
1281                            lost_percent: Some(loss_percent),
1282                            retransmits: None,
1283                            cwnd: None,
1284                        });
1285
1286                        interval_bytes = 0;
1287                        interval_packets = 0;
1288                        last_interval = Instant::now();
1289                    }
1290                }
1291                Ok(Err(e)) => {
1292                    error!("Error receiving UDP packet: {}", e);
1293                    break;
1294                }
1295                Err(_) => {
1296                    // Timeout - continue to check duration
1297                    continue;
1298                }
1299            }
1300        }
1301
1302        // Signal reporter completion and wait for it to finish
1303        reporter.complete();
1304        let _ = reporter_task.await;
1305
1306        self.measurements.set_duration(start.elapsed());
1307
1308        let final_measurements = self.measurements.get();
1309
1310        // Calculate final UDP metrics
1311        let (lost, expected) = self.measurements.calculate_udp_loss();
1312        let loss_percent = if expected > 0 {
1313            (lost as f64 / expected as f64) * 100.0
1314        } else {
1315            0.0
1316        };
1317
1318        // Notify callback of completion
1319        self.notify(ProgressEvent::TestCompleted {
1320            total_bytes: final_measurements.total_bytes_sent
1321                + final_measurements.total_bytes_received,
1322            duration: final_measurements.total_duration,
1323            bits_per_second: final_measurements.total_bits_per_second(),
1324            total_packets: Some(final_measurements.total_packets),
1325            jitter_ms: Some(final_measurements.jitter_ms),
1326            lost_packets: Some(lost),
1327            lost_percent: Some(loss_percent),
1328            out_of_order: Some(final_measurements.out_of_order_packets),
1329        });
1330
1331        if !self.config.json {
1332            print_results(&final_measurements, self.stream_id, self.config.reverse);
1333        } else {
1334            // Use detailed results for JSON output
1335            let system_info = Some(get_system_info());
1336            let test_config = TestConfig {
1337                protocol: self.config.protocol.as_str().to_string(),
1338                num_streams: self.config.parallel,
1339                blksize: self.config.buffer_size,
1340                omit: 0,
1341                duration: self.config.duration.as_secs(),
1342                reverse: self.config.reverse,
1343            };
1344            let detailed_results = self.measurements.get_detailed_results(
1345                None, // UDP doesn't have connection info
1346                system_info,
1347                test_config,
1348            );
1349            let json = serde_json::to_string_pretty(&detailed_results)?;
1350            println!("{}", json);
1351        }
1352
1353        Ok(())
1354    }
1355
    /// Retrieves the measurements collected during the test.
    ///
    /// This method should be called after `run()` completes to get the final
    /// test statistics including throughput, bytes transferred, and timing information.
    /// It delegates directly to the client's measurements collector.
    ///
    /// # Returns
    ///
    /// A `Measurements` struct holding a snapshot of the test statistics, including:
    /// - Total bytes sent/received (bidirectional support)
    /// - Test duration and bandwidth calculations
    /// - Per-stream statistics
    /// - Interval measurements
    /// - UDP-specific metrics: packet count, loss percentage, jitter (RFC 3550),
    ///   and out-of-order detection
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use rperf3::{Client, Config};
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let config = Config::client("127.0.0.1".to_string(), 5201);
    /// let client = Client::new(config)?;
    ///
    /// client.run().await?;
    ///
    /// let measurements = client.get_measurements();
    /// println!("Throughput: {:.2} Mbps",
    ///          measurements.total_bits_per_second() / 1_000_000.0);
    /// println!("Bytes transferred: {} sent, {} received",
    ///          measurements.total_bytes_sent,
    ///          measurements.total_bytes_received);
    ///
    /// // UDP-specific metrics
    /// if measurements.total_packets > 0 {
    ///     println!("UDP Loss: {} / {} ({:.2}%)",
    ///              measurements.lost_packets,
    ///              measurements.total_packets,
    ///              (measurements.lost_packets as f64 / measurements.total_packets as f64) * 100.0);
    ///     println!("Jitter: {:.3} ms", measurements.jitter_ms);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn get_measurements(&self) -> crate::Measurements {
        self.measurements.get()
    }
1408}
1409
1410async fn send_data(
1411    stream: &mut TcpStream,
1412    stream_id: usize,
1413    measurements: &MeasurementsCollector,
1414    config: &Config,
1415    callback: &Option<CallbackRef>,
1416    buffer_pool: Arc<BufferPool>,
1417    cancel_token: &CancellationToken,
1418) -> Result<()> {
1419    // Create async interval reporter
1420    let (reporter, receiver) = IntervalReporter::new();
1421    let reporter_task = tokio::spawn(run_reporter_task(receiver, config.json, callback.clone()));
1422
1423    let buffer = buffer_pool.get();
1424    let start = Instant::now();
1425    let mut last_interval = start;
1426    let mut interval_bytes = 0u64;
1427    let mut last_retransmits = 0u64;
1428
1429    while start.elapsed() < config.duration {
1430        // Check for cancellation
1431        if cancel_token.is_cancelled() {
1432            info!("Test cancelled by user");
1433            break;
1434        }
1435
1436        match stream.write(&buffer).await {
1437            Ok(n) => {
1438                measurements.record_bytes_sent(stream_id, n as u64);
1439                interval_bytes += n as u64;
1440
1441                // Report interval
1442                if last_interval.elapsed() >= config.interval {
1443                    let elapsed = start.elapsed();
1444                    let interval_duration = last_interval.elapsed();
1445                    let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
1446
1447                    let interval_start = if elapsed > interval_duration {
1448                        elapsed - interval_duration
1449                    } else {
1450                        Duration::ZERO
1451                    };
1452
1453                    // Get TCP stats for retransmits
1454                    let tcp_stats = get_tcp_stats(stream).ok();
1455                    let current_retransmits =
1456                        tcp_stats.as_ref().map(|s| s.retransmits).unwrap_or(0);
1457                    let interval_retransmits = current_retransmits.saturating_sub(last_retransmits);
1458                    last_retransmits = current_retransmits;
1459
1460                    measurements.add_interval(IntervalStats {
1461                        start: interval_start,
1462                        end: elapsed,
1463                        bytes: interval_bytes,
1464                        bits_per_second: bps,
1465                        packets: u64::MAX,
1466                    });
1467
1468                    // Get congestion window for reporting
1469                    let cwnd_kbytes = tcp_stats
1470                        .as_ref()
1471                        .and_then(|s| s.snd_cwnd_opt())
1472                        .map(|cwnd| cwnd / 1024);
1473
1474                    // Send to reporter task (async, non-blocking)
1475                    reporter.report(IntervalReport {
1476                        stream_id,
1477                        interval_start,
1478                        interval_end: elapsed,
1479                        bytes: interval_bytes,
1480                        bits_per_second: bps,
1481                        packets: None,
1482                        jitter_ms: None,
1483                        lost_packets: None,
1484                        lost_percent: None,
1485                        retransmits: if interval_retransmits > 0 {
1486                            Some(interval_retransmits)
1487                        } else {
1488                            None
1489                        },
1490                        cwnd: cwnd_kbytes,
1491                    });
1492
1493                    interval_bytes = 0;
1494                    last_interval = Instant::now();
1495                }
1496            }
1497            Err(e) => {
1498                error!("Error sending data: {}", e);
1499                break;
1500            }
1501        }
1502    }
1503
1504    // Signal reporter completion and wait for it to finish
1505    reporter.complete();
1506    let _ = reporter_task.await;
1507
1508    measurements.set_duration(start.elapsed());
1509    stream.flush().await?;
1510
1511    Ok(())
1512}
1513
1514async fn receive_data(
1515    stream: &mut TcpStream,
1516    stream_id: usize,
1517    measurements: &MeasurementsCollector,
1518    config: &Config,
1519    callback: &Option<CallbackRef>,
1520    buffer_pool: Arc<BufferPool>,
1521    cancel_token: &CancellationToken,
1522) -> Result<()> {
1523    // Create async interval reporter
1524    let (reporter, receiver) = IntervalReporter::new();
1525    let reporter_task = tokio::spawn(run_reporter_task(receiver, config.json, callback.clone()));
1526
1527    let mut buffer = buffer_pool.get();
1528    let start = Instant::now();
1529    let mut last_interval = start;
1530    let mut interval_bytes = 0u64;
1531    let mut last_retransmits = 0u64;
1532
1533    while start.elapsed() < config.duration {
1534        // Check for cancellation
1535        if cancel_token.is_cancelled() {
1536            info!("Test cancelled by user");
1537            break;
1538        }
1539
1540        match time::timeout(Duration::from_millis(100), stream.read(&mut buffer)).await {
1541            Ok(Ok(0)) => {
1542                // Connection closed
1543                break;
1544            }
1545            Ok(Ok(n)) => {
1546                measurements.record_bytes_received(stream_id, n as u64);
1547                interval_bytes += n as u64;
1548
1549                // Report interval
1550                if last_interval.elapsed() >= config.interval {
1551                    let elapsed = start.elapsed();
1552                    let interval_duration = last_interval.elapsed();
1553                    let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
1554
1555                    let interval_start = if elapsed > interval_duration {
1556                        elapsed - interval_duration
1557                    } else {
1558                        Duration::ZERO
1559                    };
1560
1561                    // Get TCP stats for retransmits
1562                    let tcp_stats = get_tcp_stats(stream).ok();
1563                    let current_retransmits =
1564                        tcp_stats.as_ref().map(|s| s.retransmits).unwrap_or(0);
1565                    let interval_retransmits = current_retransmits.saturating_sub(last_retransmits);
1566                    last_retransmits = current_retransmits;
1567
1568                    measurements.add_interval(IntervalStats {
1569                        start: interval_start,
1570                        end: elapsed,
1571                        bytes: interval_bytes,
1572                        bits_per_second: bps,
1573                        packets: u64::MAX,
1574                    });
1575
1576                    // Send to reporter task (async, non-blocking)
1577                    reporter.report(IntervalReport {
1578                        stream_id,
1579                        interval_start,
1580                        interval_end: elapsed,
1581                        bytes: interval_bytes,
1582                        bits_per_second: bps,
1583                        packets: None,
1584                        jitter_ms: None,
1585                        lost_packets: None,
1586                        lost_percent: None,
1587                        retransmits: if interval_retransmits > 0 {
1588                            Some(interval_retransmits)
1589                        } else {
1590                            None
1591                        },
1592                        cwnd: None, // Not applicable for receiver
1593                    });
1594
1595                    interval_bytes = 0;
1596                    last_interval = Instant::now();
1597                }
1598            }
1599            Ok(Err(e)) => {
1600                error!("Error receiving data: {}", e);
1601                break;
1602            }
1603            Err(_) => {
1604                // Timeout, check if duration expired
1605                if start.elapsed() >= config.duration {
1606                    break;
1607                }
1608            }
1609        }
1610    }
1611
1612    // Signal reporter completion and wait for it to finish
1613    reporter.complete();
1614    let _ = reporter_task.await;
1615
1616    measurements.set_duration(start.elapsed());
1617
1618    Ok(())
1619}
1620
1621fn print_results(measurements: &crate::Measurements, stream_id: usize, _reverse: bool) {
1622    let is_udp = measurements.total_packets > 0;
1623
1624    if !is_udp {
1625        // TCP formatting
1626        println!("- - - - - - - - - - - - - - - - - - - - - - - - -");
1627
1628        let duration = measurements.total_duration.as_secs_f64();
1629
1630        // Print header for final summary
1631        println!("[ ID] Interval           Transfer        Bitrate            Retr");
1632
1633        // Print sender summary
1634        let sent_bytes = measurements.total_bytes_sent;
1635        let (sent_val, sent_unit) = if sent_bytes >= 1_000_000_000 {
1636            (sent_bytes as f64 / 1_000_000_000.0, "GBytes")
1637        } else {
1638            (sent_bytes as f64 / 1_000_000.0, "MBytes")
1639        };
1640        let sent_bps = (sent_bytes as f64 * 8.0) / duration;
1641        let (sent_bitrate_val, sent_bitrate_unit) = if sent_bps >= 1_000_000_000.0 {
1642            (sent_bps / 1_000_000_000.0, "Gbits/sec")
1643        } else {
1644            (sent_bps / 1_000_000.0, "Mbits/sec")
1645        };
1646
1647        println!(
1648            "[{:3}]   {:4.2}-{:4.2}  sec  {:6.2} {:>7}  {:6.1} {:>10}  {:4}             sender",
1649            stream_id,
1650            0.0,
1651            duration,
1652            sent_val,
1653            sent_unit,
1654            sent_bitrate_val,
1655            sent_bitrate_unit,
1656            0 // Total retransmits - would need to track cumulative
1657        );
1658
1659        // Print receiver summary if we received data
1660        if measurements.total_bytes_received > 0 {
1661            let recv_bytes = measurements.total_bytes_received;
1662            let (recv_val, recv_unit) = if recv_bytes >= 1_000_000_000 {
1663                (recv_bytes as f64 / 1_000_000_000.0, "GBytes")
1664            } else {
1665                (recv_bytes as f64 / 1_000_000.0, "MBytes")
1666            };
1667            let recv_bps = (recv_bytes as f64 * 8.0) / duration;
1668            let (recv_bitrate_val, recv_bitrate_unit) = if recv_bps >= 1_000_000_000.0 {
1669                (recv_bps / 1_000_000_000.0, "Gbits/sec")
1670            } else {
1671                (recv_bps / 1_000_000.0, "Mbits/sec")
1672            };
1673
1674            println!(
1675                "[{:3}]   {:4.2}-{:4.2}  sec  {:6.2} {:>7}  {:6.1} {:>10}                  receiver",
1676                stream_id, 0.0, duration, recv_val, recv_unit, recv_bitrate_val, recv_bitrate_unit
1677            );
1678        }
1679
1680        println!();
1681    } else {
1682        // UDP formatting
1683        println!("- - - - - - - - - - - - - - - - - - - - - - - - -");
1684
1685        let duration = measurements.total_duration.as_secs_f64();
1686
1687        // Calculate loss statistics
1688        let (lost, expected) = if measurements.total_bytes_received > 0 {
1689            let (l, e) = measurements.calculate_udp_loss();
1690            (l, e)
1691        } else {
1692            (0, measurements.total_packets)
1693        };
1694
1695        let loss_percent = if expected > 0 {
1696            (lost as f64 / expected as f64) * 100.0
1697        } else {
1698            0.0
1699        };
1700
1701        // Print header for final summary
1702        println!(
1703            "[ ID] Interval           Transfer        Bitrate            Jitter    Lost/Total Datagrams"
1704        );
1705
1706        // Print sender summary
1707        if measurements.total_bytes_sent > 0 {
1708            let sent_bytes = measurements.total_bytes_sent;
1709            let (sent_val, sent_unit) = if sent_bytes >= 1_000_000_000 {
1710                (sent_bytes as f64 / 1_000_000_000.0, "GBytes")
1711            } else if sent_bytes >= 1_000_000 {
1712                (sent_bytes as f64 / 1_000_000.0, "MBytes")
1713            } else {
1714                (sent_bytes as f64 / 1_000.0, "KBytes")
1715            };
1716            let sent_bps = (sent_bytes as f64 * 8.0) / duration;
1717            let (sent_bitrate_val, sent_bitrate_unit) = if sent_bps >= 1_000_000_000.0 {
1718                (sent_bps / 1_000_000_000.0, "Gbits/sec")
1719            } else {
1720                (sent_bps / 1_000_000.0, "Mbits/sec")
1721            };
1722
1723            println!(
1724                "[{:3}]   {:4.2}-{:4.2}  sec  {:6.2} {:>7}  {:6.1} {:>10}  {:6.3} ms  {}/{} ({:.0}%)  sender",
1725                stream_id,
1726                0.0,
1727                duration,
1728                sent_val,
1729                sent_unit,
1730                sent_bitrate_val,
1731                sent_bitrate_unit,
1732                0.0, // Jitter can't be measured at sender
1733                lost,
1734                expected,
1735                loss_percent
1736            );
1737        }
1738
1739        // Print receiver summary if we received data
1740        if measurements.total_bytes_received > 0 {
1741            let recv_bytes = measurements.total_bytes_received;
1742            let (recv_val, recv_unit) = if recv_bytes >= 1_000_000_000 {
1743                (recv_bytes as f64 / 1_000_000_000.0, "GBytes")
1744            } else if recv_bytes >= 1_000_000 {
1745                (recv_bytes as f64 / 1_000_000.0, "MBytes")
1746            } else {
1747                (recv_bytes as f64 / 1_000.0, "KBytes")
1748            };
1749            let recv_bps = (recv_bytes as f64 * 8.0) / duration;
1750            let (recv_bitrate_val, recv_bitrate_unit) = if recv_bps >= 1_000_000_000.0 {
1751                (recv_bps / 1_000_000_000.0, "Gbits/sec")
1752            } else {
1753                (recv_bps / 1_000_000.0, "Mbits/sec")
1754            };
1755
1756            println!(
1757                "[{:3}]   {:4.2}-{:4.2}  sec  {:6.2} {:>7}  {:6.1} {:>10}  {:6.3} ms  {}/{} ({:.0}%)  receiver",
1758                stream_id,
1759                0.0,
1760                duration,
1761                recv_val,
1762                recv_unit,
1763                recv_bitrate_val,
1764                recv_bitrate_unit,
1765                measurements.jitter_ms,
1766                lost,
1767                expected,
1768                loss_percent
1769            );
1770        }
1771
1772        println!();
1773    }
1774}