1use crate::buffer_pool::BufferPool;
2use crate::config::{Config, Protocol};
3use crate::interval_reporter::{run_reporter_task, IntervalReport, IntervalReporter};
4use crate::measurements::{
5 get_connection_info, get_system_info, get_tcp_stats, IntervalStats, MeasurementsCollector,
6 TestConfig,
7};
8use crate::protocol::{deserialize_message, serialize_message, Message, DEFAULT_STREAM_ID};
9use crate::{Error, Result};
10use log::{debug, error, info};
11use socket2::SockRef;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15use tokio::net::{TcpStream, UdpSocket};
16use tokio::time;
17use tokio_util::sync::CancellationToken;
18
19fn configure_tcp_socket(stream: &TcpStream) -> Result<()> {
38 stream.set_nodelay(true).map_err(|e| {
40 Error::Io(std::io::Error::new(
41 e.kind(),
42 format!("Failed to set TCP_NODELAY: {}", e),
43 ))
44 })?;
45
46 const BUFFER_SIZE: usize = 256 * 1024; let sock_ref = SockRef::from(stream);
49
50 sock_ref.set_send_buffer_size(BUFFER_SIZE).map_err(|e| {
51 Error::Io(std::io::Error::new(
52 e.kind(),
53 format!("Failed to set send buffer size: {}", e),
54 ))
55 })?;
56
57 sock_ref.set_recv_buffer_size(BUFFER_SIZE).map_err(|e| {
58 Error::Io(std::io::Error::new(
59 e.kind(),
60 format!("Failed to set recv buffer size: {}", e),
61 ))
62 })?;
63
64 debug!(
65 "TCP socket configured: TCP_NODELAY=true, buffers={}KB",
66 BUFFER_SIZE / 1024
67 );
68
69 Ok(())
70}
71
72fn configure_udp_socket(socket: &UdpSocket) -> Result<()> {
90 const BUFFER_SIZE: usize = 2 * 1024 * 1024; let sock_ref = SockRef::from(socket);
93
94 sock_ref.set_send_buffer_size(BUFFER_SIZE).map_err(|e| {
95 Error::Io(std::io::Error::new(
96 e.kind(),
97 format!("Failed to set UDP send buffer size: {}", e),
98 ))
99 })?;
100
101 sock_ref.set_recv_buffer_size(BUFFER_SIZE).map_err(|e| {
102 Error::Io(std::io::Error::new(
103 e.kind(),
104 format!("Failed to set UDP recv buffer size: {}", e),
105 ))
106 })?;
107
108 debug!(
109 "UDP socket configured: buffers={}MB",
110 BUFFER_SIZE / (1024 * 1024)
111 );
112
113 Ok(())
114}
115
/// Progress notifications delivered to a [`ProgressCallback`] while a test runs.
#[derive(Debug, Clone)]
pub enum ProgressEvent {
    /// Emitted by the client once the server's `Start` message has been received.
    TestStarted,
    /// Statistics for a single reporting interval.
    ///
    /// NOTE(review): this variant is not constructed in this file; presumably
    /// the interval reporter task builds it from an `IntervalReport` — confirm.
    IntervalUpdate {
        /// Offset of the interval's start from the start of the test.
        interval_start: Duration,
        /// Offset of the interval's end from the start of the test.
        interval_end: Duration,
        /// Bytes transferred during the interval.
        bytes: u64,
        /// Throughput over the interval, in bits per second.
        bits_per_second: f64,
        /// Packet count for the interval; the UDP paths in this file supply a
        /// value, the TCP paths report `None`.
        packets: Option<u64>,
        /// Jitter in milliseconds (UDP only in this file).
        jitter_ms: Option<f64>,
        /// Lost packet count (UDP only in this file).
        lost_packets: Option<u64>,
        /// Lost packets as a percentage of expected packets (UDP only in this file).
        lost_percent: Option<f64>,
        /// TCP retransmits observed during the interval, when any were seen.
        retransmits: Option<u64>,
    },
    /// Final totals, emitted once after the data phase has finished.
    TestCompleted {
        /// Sum of bytes sent and bytes received.
        total_bytes: u64,
        /// Measured duration of the test.
        duration: Duration,
        /// Overall throughput in bits per second.
        bits_per_second: f64,
        /// Total packet count; `None` for TCP tests.
        total_packets: Option<u64>,
        /// Jitter in milliseconds; `None` for TCP tests.
        jitter_ms: Option<f64>,
        /// Lost packet count; `None` for TCP tests.
        lost_packets: Option<u64>,
        /// Lost packets as a percentage of expected packets; `None` for TCP tests.
        lost_percent: Option<f64>,
        /// Out-of-order packet count; `None` for TCP tests.
        out_of_order: Option<u64>,
    },
    /// An error description.
    ///
    /// NOTE(review): never emitted by the code in this file — confirm where
    /// (or whether) it is produced.
    Error(String),
}
213
/// Receiver of [`ProgressEvent`]s.
///
/// Implementors must be `Send + Sync` because the callback is stored behind an
/// `Arc` and a clone of it is handed to the spawned reporter task.
pub trait ProgressCallback: Send + Sync {
    /// Called with each progress event; takes ownership of the event.
    fn on_progress(&self, event: ProgressEvent);
}
260
261impl<F> ProgressCallback for F
263where
264 F: Fn(ProgressEvent) + Send + Sync,
265{
266 fn on_progress(&self, event: ProgressEvent) {
267 self(event)
268 }
269}
270
/// Shared, type-erased handle to a progress callback.
type CallbackRef = Arc<dyn ProgressCallback>;
272
/// Test client: connects to a server and runs a TCP or UDP test as described
/// by its [`Config`].
pub struct Client {
    /// Test parameters (server address, port, protocol, duration, ...).
    config: Config,
    /// Collector that accumulates byte counts, intervals and UDP statistics.
    measurements: MeasurementsCollector,
    /// Optional progress callback; `None` until `with_callback` is used.
    callback: Option<CallbackRef>,
    /// Buffer pool used by the TCP send/receive loops.
    tcp_buffer_pool: Arc<BufferPool>,
    /// Buffer pool used by the UDP receive loop.
    udp_buffer_pool: Arc<BufferPool>,
    /// Token polled by the data loops to stop a running test early.
    cancellation_token: CancellationToken,
    /// Stream identifier used in printed output and interval reports.
    stream_id: usize,
}
369
370impl Client {
371 pub fn new(config: Config) -> Result<Self> {
390 if config.server_addr.is_none() {
391 return Err(Error::Config(
392 "Server address is required for client mode".to_string(),
393 ));
394 }
395
396 let tcp_pool_size = config.parallel * 2; let tcp_buffer_pool = Arc::new(BufferPool::new(config.buffer_size, tcp_pool_size));
400
401 let udp_buffer_pool = Arc::new(BufferPool::new(65536, 10));
403
404 Ok(Self {
405 config,
406 measurements: MeasurementsCollector::new(),
407 callback: None,
408 tcp_buffer_pool,
409 udp_buffer_pool,
410 cancellation_token: CancellationToken::new(),
411 stream_id: DEFAULT_STREAM_ID, })
413 }
414
415 pub fn with_callback<C: ProgressCallback + 'static>(mut self, callback: C) -> Self {
444 self.callback = Some(Arc::new(callback));
445 self
446 }
447
448 fn notify(&self, event: ProgressEvent) {
450 if let Some(callback) = &self.callback {
451 callback.on_progress(event);
452 }
453 }
454
    /// Returns the token that the data loops poll for cancellation.
    ///
    /// Cancelling it makes a running send/receive loop log
    /// "Test cancelled by user" and stop at its next iteration.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
485
486 pub async fn run(&self) -> Result<()> {
516 let server_addr = self
517 .config
518 .server_addr
519 .as_ref()
520 .ok_or_else(|| Error::Config("Server address not set".to_string()))?;
521
522 let full_addr = format!("{}:{}", server_addr, self.config.port);
523
524 info!("Connecting to rperf3 server at {}", full_addr);
525
526 match self.config.protocol {
527 Protocol::Tcp => self.run_tcp(&full_addr).await,
528 Protocol::Udp => self.run_udp(&full_addr).await,
529 }
530 }
531
    /// Runs a TCP test against `server_addr` (a `host:port` string).
    ///
    /// Steps, in order:
    /// 1. Connect and tune the socket.
    /// 2. Send the setup message, then require `SetupAck` followed by `Start`.
    /// 3. Transfer data: receive when `config.reverse` is set, otherwise send.
    /// 4. Best-effort read of the server's `Result` and `Done` messages.
    /// 5. Emit `ProgressEvent::TestCompleted` and print a text summary, or
    ///    pretty-printed JSON when `config.json` is set.
    ///
    /// # Errors
    ///
    /// Fails on connection or socket-configuration errors, on handshake I/O or
    /// (de)serialization errors, with `Error::Protocol` when the server replies
    /// with an error or an unexpected message during the handshake, on errors
    /// propagated from the data phase, and on JSON serialization failure.
    async fn run_tcp(&self, server_addr: &str) -> Result<()> {
        let mut stream = TcpStream::connect(server_addr).await?;
        info!("Connected to {}", server_addr);

        configure_tcp_socket(&stream)?;

        // Human-readable connection banner; suppressed in JSON mode.
        if !self.config.json {
            let local_addr = stream.local_addr()?;
            let remote_addr = stream.peer_addr()?;
            println!(
                "Connecting to host {}, port {}",
                remote_addr.ip(),
                remote_addr.port()
            );
            println!(
                "[{:3}] local {} port {} connected to {} port {}",
                self.stream_id,
                local_addr.ip(),
                local_addr.port(),
                remote_addr.ip(),
                remote_addr.port()
            );
        }

        // Captured now for the JSON report; a failure to read connection info
        // is tolerated and simply yields `None`.
        let connection_info = get_connection_info(&stream).ok();
        let system_info = Some(get_system_info());

        // Handshake step 1: send the test parameters.
        let setup = Message::setup(
            self.config.protocol.as_str().to_string(),
            self.config.duration,
            self.config.bandwidth,
            self.config.buffer_size,
            self.config.parallel,
            self.config.reverse,
        );
        let setup_bytes = serialize_message(&setup)?;
        stream.write_all(&setup_bytes).await?;
        stream.flush().await?;

        // Handshake step 2: the server must acknowledge the setup.
        let ack_msg = deserialize_message(&mut stream).await?;
        match ack_msg {
            Message::SetupAck { port, cookie } => {
                debug!("Received setup ack: port={}, cookie={}", port, cookie);
            }
            Message::Error { message } => {
                return Err(Error::Protocol(format!("Server error: {}", message)));
            }
            _ => {
                return Err(Error::Protocol("Expected SetupAck message".to_string()));
            }
        }

        // Handshake step 3: wait for the server's go-ahead.
        let start_msg = deserialize_message(&mut stream).await?;
        match start_msg {
            Message::Start { .. } => {
                info!("Test started");
                self.notify(ProgressEvent::TestStarted);
            }
            _ => {
                return Err(Error::Protocol("Expected Start message".to_string()));
            }
        }

        self.measurements.set_start_time(Instant::now());

        // Column header for the interval lines; the sending direction has an
        // extra Cwnd column.
        if !self.config.json {
            if self.config.reverse {
                println!("[ ID] Interval Transfer Bitrate Retr");
            } else {
                println!("[ ID] Interval Transfer Bitrate Retr Cwnd");
            }
        }

        // Data phase: in reverse mode the server sends and this client receives.
        if self.config.reverse {
            receive_data(
                &mut stream,
                self.stream_id,
                &self.measurements,
                &self.config,
                &self.callback,
                self.tcp_buffer_pool.clone(),
                &self.cancellation_token,
            )
            .await?;
        } else {
            send_data(
                &mut stream,
                self.stream_id,
                &self.measurements,
                &self.config,
                &self.callback,
                self.tcp_buffer_pool.clone(),
                &self.cancellation_token,
            )
            .await?;
        }

        // The server's result message is informational only: a read failure or
        // an unexpected message is logged at debug level and does not fail the test.
        match deserialize_message(&mut stream).await {
            Ok(result_msg) => match result_msg {
                Message::Result {
                    stream_id,
                    bytes_sent,
                    bytes_received,
                    duration: _,
                    bits_per_second,
                    ..
                } => {
                    info!(
                        "Stream {}: {} bytes sent, {} bytes received, {:.2} Mbps",
                        stream_id,
                        bytes_sent,
                        bytes_received,
                        bits_per_second / 1_000_000.0
                    );
                }
                _ => {
                    debug!("Unexpected message, continuing");
                }
            },
            Err(e) => {
                debug!(
                    "Could not read result message (connection may be closed): {}",
                    e
                );
            }
        }

        // Likewise, a missing `Done` message is tolerated; the test is still
        // treated as completed.
        match deserialize_message(&mut stream).await {
            Ok(done_msg) => match done_msg {
                Message::Done => {
                    info!("Test completed");
                }
                _ => {
                    debug!("Expected Done message");
                }
            },
            Err(e) => {
                debug!(
                    "Could not read done message (connection may be closed): {}",
                    e
                );
                info!("Test completed");
            }
        }

        let final_measurements = self.measurements.get();

        // TCP has no packet-level statistics, so those fields are `None`.
        self.notify(ProgressEvent::TestCompleted {
            total_bytes: final_measurements.total_bytes_sent
                + final_measurements.total_bytes_received,
            duration: final_measurements.total_duration,
            bits_per_second: final_measurements.total_bits_per_second(),
            total_packets: None,
            jitter_ms: None,
            lost_packets: None,
            lost_percent: None,
            out_of_order: None,
        });

        if !self.config.json {
            print_results(&final_measurements, self.stream_id, self.config.reverse);
        } else {
            let test_config = TestConfig {
                protocol: self.config.protocol.as_str().to_string(),
                num_streams: self.config.parallel,
                blksize: self.config.buffer_size,
                omit: 0,
                duration: self.config.duration.as_secs(),
                reverse: self.config.reverse,
            };
            let detailed_results =
                self.measurements
                    .get_detailed_results(connection_info, system_info, test_config);
            let json = serde_json::to_string_pretty(&detailed_results)?;
            println!("{}", json);
        }

        Ok(())
    }
724
725 async fn run_udp(&self, server_addr: &str) -> Result<()> {
726 let mut control_stream = TcpStream::connect(server_addr).await?;
729
730 configure_tcp_socket(&control_stream)?;
732
733 let setup = Message::setup(
735 self.config.protocol.as_str().to_string(),
736 self.config.duration,
737 self.config.bandwidth,
738 self.config.buffer_size,
739 self.config.parallel,
740 self.config.reverse,
741 );
742 let setup_bytes = serialize_message(&setup)?;
743 control_stream.write_all(&setup_bytes).await?;
744 control_stream.flush().await?;
745
746 let ack_msg = deserialize_message(&mut control_stream).await?;
748 match ack_msg {
749 Message::SetupAck { port, cookie } => {
750 debug!("Received setup ack: port={}, cookie={}", port, cookie);
751 }
752 Message::Error { message } => {
753 return Err(Error::Protocol(format!("Server error: {}", message)));
754 }
755 _ => {
756 return Err(Error::Protocol("Expected SetupAck message".to_string()));
757 }
758 }
759
760 let start_msg = deserialize_message(&mut control_stream).await?;
762 match start_msg {
763 Message::Start { .. } => {
764 info!("Test started");
765 self.notify(ProgressEvent::TestStarted);
766 }
767 _ => {
768 return Err(Error::Protocol("Expected Start message".to_string()));
769 }
770 }
771
772 let socket = UdpSocket::bind("0.0.0.0:0").await?;
774 socket.connect(server_addr).await?;
775
776 configure_udp_socket(&socket)?;
778
779 info!("UDP client connected to {}", server_addr);
780
781 if !self.config.json {
783 let local_addr = socket.local_addr()?;
784 let remote_addr = socket.peer_addr()?;
785 println!(
786 "Connecting to host {}, port {}",
787 remote_addr.ip(),
788 remote_addr.port()
789 );
790 println!(
791 "[{:3}] local {} port {} connected to {} port {}",
792 self.stream_id,
793 local_addr.ip(),
794 local_addr.port(),
795 remote_addr.ip(),
796 remote_addr.port()
797 );
798 println!("[ ID] Interval Transfer Bitrate Total Datagrams");
799 }
800
801 let result = if self.config.reverse {
802 let init_packet = crate::udp_packet::create_packet(0, 0);
804 socket.send(&init_packet).await?;
805
806 self.run_udp_receive(socket).await
808 } else {
809 self.run_udp_send(socket).await
811 };
812
813 drop(control_stream);
815
816 result
817 }
818
819 async fn run_udp_send(&self, socket: UdpSocket) -> Result<()> {
820 #[cfg(target_os = "linux")]
822 return self.run_udp_send_batched(socket).await;
823
824 #[cfg(not(target_os = "linux"))]
825 return self.run_udp_send_standard(socket).await;
826 }
827
    /// Sends UDP test traffic one datagram per `send` call until
    /// `config.duration` elapses, the test is cancelled, or a send fails.
    ///
    /// Interval statistics are recorded and handed to a spawned reporter task.
    /// When the loop ends, the final totals are emitted as
    /// `ProgressEvent::TestCompleted` and printed as text or JSON.
    ///
    /// A send error stops the loop but is only logged; the function still
    /// returns `Ok(())` unless JSON serialization fails.
    ///
    /// Unused on Linux, where the batched sender is selected instead, hence
    /// the conditional `allow(dead_code)`.
    #[cfg_attr(target_os = "linux", allow(dead_code))]
    async fn run_udp_send_standard(&self, socket: UdpSocket) -> Result<()> {
        // Interval reports are processed by a separate task fed via `reporter`.
        let (reporter, receiver) = IntervalReporter::new();
        let reporter_task = tokio::spawn(run_reporter_task(
            receiver,
            self.config.json,
            self.callback.clone(),
        ));

        let start = Instant::now();
        let mut last_interval = start;
        let mut interval_bytes = 0u64;
        let mut interval_packets = 0u64;
        let mut sequence = 0u64;

        // Payload is the configured buffer size minus the packet header; if the
        // buffer is not larger than the header, fall back to 1024 bytes.
        let payload_size = if self.config.buffer_size > crate::udp_packet::UdpPacketHeader::SIZE {
            self.config.buffer_size - crate::udp_packet::UdpPacketHeader::SIZE
        } else {
            1024
        };

        // Optional rate limiter, present only when a bandwidth target is set.
        // NOTE(review): `bw / 8` presumably converts bits/s to bytes/s — confirm.
        let mut token_bucket = self
            .config
            .bandwidth
            .map(|bw| crate::token_bucket::TokenBucket::new(bw / 8));

        while start.elapsed() < self.config.duration {
            if self.cancellation_token.is_cancelled() {
                info!("Test cancelled by user");
                break;
            }

            let packet = crate::udp_packet::create_packet_fast(sequence, payload_size);

            match socket.send(&packet).await {
                Ok(n) => {
                    // Note: measurements are recorded under stream id 0, not `self.stream_id`.
                    self.measurements.record_bytes_sent(0, n as u64);
                    self.measurements.record_udp_packet(0);
                    interval_bytes += n as u64;
                    interval_packets += 1;
                    sequence += 1;

                    // Pace after sending: wait on the bucket for the bytes just sent.
                    if let Some(ref mut bucket) = token_bucket {
                        bucket.consume(n).await;
                    }

                    // Close the current reporting interval once it is long enough.
                    if last_interval.elapsed() >= self.config.interval {
                        let elapsed = start.elapsed();
                        let interval_duration = last_interval.elapsed();
                        let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();

                        // Guard the subtraction so it cannot underflow.
                        let interval_start = if elapsed > interval_duration {
                            elapsed - interval_duration
                        } else {
                            Duration::ZERO
                        };

                        self.measurements.add_interval(IntervalStats {
                            start: interval_start,
                            end: elapsed,
                            bytes: interval_bytes,
                            bits_per_second: bps,
                            packets: interval_packets,
                        });

                        let (lost, expected) = self.measurements.calculate_udp_loss();
                        let loss_percent = if expected > 0 {
                            (lost as f64 / expected as f64) * 100.0
                        } else {
                            0.0
                        };
                        let measurements = self.measurements.get();

                        reporter.report(IntervalReport {
                            stream_id: self.stream_id,
                            interval_start,
                            interval_end: elapsed,
                            bytes: interval_bytes,
                            bits_per_second: bps,
                            packets: Some(interval_packets),
                            jitter_ms: Some(measurements.jitter_ms),
                            lost_packets: Some(lost),
                            lost_percent: Some(loss_percent),
                            retransmits: None,
                            cwnd: None,
                        });

                        interval_bytes = 0;
                        interval_packets = 0;
                        last_interval = Instant::now();
                    }
                }
                Err(e) => {
                    error!("Error sending UDP packet: {}", e);
                    break;
                }
            }
        }

        // Signal completion to the reporter and wait for its task to finish
        // before producing the summary; a join error is ignored.
        reporter.complete();
        let _ = reporter_task.await;

        self.measurements.set_duration(start.elapsed());

        let final_measurements = self.measurements.get();

        let (lost, expected) = self.measurements.calculate_udp_loss();
        let loss_percent = if expected > 0 {
            (lost as f64 / expected as f64) * 100.0
        } else {
            0.0
        };

        self.notify(ProgressEvent::TestCompleted {
            total_bytes: final_measurements.total_bytes_sent
                + final_measurements.total_bytes_received,
            duration: final_measurements.total_duration,
            bits_per_second: final_measurements.total_bits_per_second(),
            total_packets: Some(final_measurements.total_packets),
            jitter_ms: Some(final_measurements.jitter_ms),
            lost_packets: Some(lost),
            lost_percent: Some(loss_percent),
            out_of_order: Some(final_measurements.out_of_order_packets),
        });

        if !self.config.json {
            print_results(&final_measurements, self.stream_id, self.config.reverse);
        } else {
            let system_info = Some(get_system_info());
            let test_config = TestConfig {
                protocol: self.config.protocol.as_str().to_string(),
                num_streams: self.config.parallel,
                blksize: self.config.buffer_size,
                omit: 0,
                duration: self.config.duration.as_secs(),
                reverse: self.config.reverse,
            };
            // No connection info is supplied for UDP runs.
            let detailed_results = self.measurements.get_detailed_results(
                None,
                system_info,
                test_config,
            );
            let json = serde_json::to_string_pretty(&detailed_results)?;
            println!("{}", json);
        }

        Ok(())
    }
990
    /// Linux-only UDP sender that queues several datagrams in a `UdpSendBatch`
    /// and sends them with one `batch.send` call.
    ///
    /// Runs until `config.duration` elapses, the test is cancelled, or a batch
    /// send fails. Interval statistics are recorded and handed to a spawned
    /// reporter task. When the loop ends, the final totals are emitted as
    /// `ProgressEvent::TestCompleted` and printed as text or JSON.
    ///
    /// A send error stops the loop but is only logged; the function still
    /// returns `Ok(())` unless reading the peer address or JSON serialization fails.
    #[cfg(target_os = "linux")]
    async fn run_udp_send_batched(&self, socket: UdpSocket) -> Result<()> {
        use crate::batch_socket::{UdpSendBatch, MAX_BATCH_SIZE};

        // Interval reports are processed by a separate task fed via `reporter`.
        let (reporter, receiver) = IntervalReporter::new();
        let reporter_task = tokio::spawn(run_reporter_task(
            receiver,
            self.config.json,
            self.callback.clone(),
        ));

        let start = Instant::now();
        let mut last_interval = start;
        let mut interval_bytes = 0u64;
        let mut interval_packets = 0u64;
        let mut sequence = 0u64;

        // Payload is the configured buffer size minus the packet header; if the
        // buffer is not larger than the header, fall back to 1024 bytes.
        let payload_size = if self.config.buffer_size > crate::udp_packet::UdpPacketHeader::SIZE {
            self.config.buffer_size - crate::udp_packet::UdpPacketHeader::SIZE
        } else {
            1024
        };

        // Optional rate limiter, present only when a bandwidth target is set.
        // NOTE(review): `bw / 8` presumably converts bits/s to bytes/s — confirm.
        let mut token_bucket = self
            .config
            .bandwidth
            .map(|bw| crate::token_bucket::TokenBucket::new(bw / 8));

        let mut batch = UdpSendBatch::new();
        let remote_addr = socket.peer_addr()?;

        // Pick the batch size from the target packet rate: smaller batches for
        // lower rates, the maximum when the rate is high or unlimited.
        // NOTE(review): despite its name, `target_bps` holds the bucket's
        // bytes-per-second value.
        let adaptive_batch_size = if let Some(ref bucket) = token_bucket {
            let target_bps = bucket.bytes_per_sec;
            let packets_per_sec = target_bps / payload_size as u64;
            if packets_per_sec < 1000 {
                (MAX_BATCH_SIZE / 4).max(4)
            } else if packets_per_sec < 10000 {
                MAX_BATCH_SIZE / 2
            } else {
                MAX_BATCH_SIZE
            }
        } else {
            MAX_BATCH_SIZE
        };

        while start.elapsed() < self.config.duration {
            if self.cancellation_token.is_cancelled() {
                info!("Test cancelled by user");
                break;
            }

            // Fill the batch up to the adaptive size (or until the test time is up).
            while !batch.is_full()
                && batch.len() < adaptive_batch_size
                && start.elapsed() < self.config.duration
            {
                let packet = crate::udp_packet::create_packet_fast(sequence, payload_size);
                batch.add(packet, remote_addr);
                sequence += 1;
            }

            if !batch.is_empty() {
                match batch.send(&socket).await {
                    Ok((bytes_sent, packets_sent)) => {
                        // Note: measurements are recorded under stream id 0, not `self.stream_id`.
                        self.measurements.record_bytes_sent(0, bytes_sent as u64);
                        for _ in 0..packets_sent {
                            self.measurements.record_udp_packet(0);
                        }

                        interval_bytes += bytes_sent as u64;
                        interval_packets += packets_sent as u64;

                        // Pace after sending: wait on the bucket for the bytes just sent.
                        if let Some(ref mut bucket) = token_bucket {
                            bucket.consume(bytes_sent).await;
                        }

                        // Close the current reporting interval once it is long enough.
                        if last_interval.elapsed() >= self.config.interval {
                            let elapsed = start.elapsed();
                            let interval_duration = last_interval.elapsed();
                            let bps =
                                (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();

                            // Guard the subtraction so it cannot underflow.
                            let interval_start = if elapsed > interval_duration {
                                elapsed - interval_duration
                            } else {
                                Duration::ZERO
                            };

                            self.measurements.add_interval(IntervalStats {
                                start: interval_start,
                                end: elapsed,
                                bytes: interval_bytes,
                                bits_per_second: bps,
                                packets: interval_packets,
                            });

                            let (lost, expected) = self.measurements.calculate_udp_loss();
                            let loss_percent = if expected > 0 {
                                (lost as f64 / expected as f64) * 100.0
                            } else {
                                0.0
                            };
                            let measurements = self.measurements.get();

                            reporter.report(IntervalReport {
                                stream_id: self.stream_id,
                                interval_start,
                                interval_end: elapsed,
                                bytes: interval_bytes,
                                bits_per_second: bps,
                                packets: Some(interval_packets),
                                jitter_ms: Some(measurements.jitter_ms),
                                lost_packets: Some(lost),
                                lost_percent: Some(loss_percent),
                                retransmits: None,
                                cwnd: None,
                            });

                            interval_bytes = 0;
                            interval_packets = 0;
                            last_interval = Instant::now();
                        }
                    }
                    Err(e) => {
                        error!("Error sending batch: {}", e);
                        break;
                    }
                }
            }
        }

        // Signal completion to the reporter and wait for its task to finish
        // before producing the summary; a join error is ignored.
        reporter.complete();
        let _ = reporter_task.await;

        self.measurements.set_duration(start.elapsed());

        let final_measurements = self.measurements.get();

        let (lost, expected) = self.measurements.calculate_udp_loss();
        let loss_percent = if expected > 0 {
            (lost as f64 / expected as f64) * 100.0
        } else {
            0.0
        };

        self.notify(ProgressEvent::TestCompleted {
            total_bytes: final_measurements.total_bytes_sent
                + final_measurements.total_bytes_received,
            duration: final_measurements.total_duration,
            bits_per_second: final_measurements.total_bits_per_second(),
            total_packets: Some(final_measurements.total_packets),
            jitter_ms: Some(final_measurements.jitter_ms),
            lost_packets: Some(lost),
            lost_percent: Some(loss_percent),
            out_of_order: Some(final_measurements.out_of_order_packets),
        });

        if !self.config.json {
            print_results(&final_measurements, self.stream_id, self.config.reverse);
        } else {
            let system_info = Some(get_system_info());
            let test_config = TestConfig {
                protocol: self.config.protocol.as_str().to_string(),
                num_streams: self.config.parallel,
                blksize: self.config.buffer_size,
                omit: 0,
                duration: self.config.duration.as_secs(),
                reverse: self.config.reverse,
            };
            // No connection info is supplied for UDP runs.
            let detailed_results = self.measurements.get_detailed_results(
                None,
                system_info,
                test_config,
            );
            let json = serde_json::to_string_pretty(&detailed_results)?;
            println!("{}", json);
        }

        Ok(())
    }
1193
1194 async fn run_udp_receive(&self, socket: UdpSocket) -> Result<()> {
1195 let (reporter, receiver) = IntervalReporter::new();
1197 let reporter_task = tokio::spawn(run_reporter_task(
1198 receiver,
1199 self.config.json,
1200 self.callback.clone(),
1201 ));
1202
1203 let start = Instant::now();
1204 let mut last_interval = start;
1205 let mut interval_bytes = 0u64;
1206 let mut interval_packets = 0u64;
1207 let mut buffer = self.udp_buffer_pool.get();
1208
1209 while start.elapsed() < self.config.duration {
1210 if self.cancellation_token.is_cancelled() {
1212 info!("Test cancelled by user");
1213 break;
1214 }
1215
1216 let timeout =
1218 tokio::time::timeout(Duration::from_millis(100), socket.recv(&mut buffer));
1219
1220 match timeout.await {
1221 Ok(Ok(n)) => {
1222 if let Some((header, _payload)) = crate::udp_packet::parse_packet(&buffer[..n])
1224 {
1225 let recv_timestamp_us = std::time::SystemTime::now()
1227 .duration_since(std::time::UNIX_EPOCH)
1228 .expect("Time went backwards")
1229 .as_micros() as u64;
1230
1231 self.measurements.record_udp_packet_received(
1232 header.sequence,
1233 header.timestamp_us,
1234 recv_timestamp_us,
1235 );
1236 }
1237
1238 self.measurements.record_bytes_received(0, n as u64);
1239 interval_bytes += n as u64;
1240 interval_packets += 1;
1241
1242 if last_interval.elapsed() >= self.config.interval {
1244 let elapsed = start.elapsed();
1245 let interval_duration = last_interval.elapsed();
1246 let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
1247
1248 let interval_start = if elapsed > interval_duration {
1249 elapsed - interval_duration
1250 } else {
1251 Duration::ZERO
1252 };
1253
1254 self.measurements.add_interval(IntervalStats {
1255 start: interval_start,
1256 end: elapsed,
1257 bytes: interval_bytes,
1258 bits_per_second: bps,
1259 packets: interval_packets,
1260 });
1261
1262 let (lost, expected) = self.measurements.calculate_udp_loss();
1264 let loss_percent = if expected > 0 {
1265 (lost as f64 / expected as f64) * 100.0
1266 } else {
1267 0.0
1268 };
1269 let measurements = self.measurements.get();
1270
1271 reporter.report(IntervalReport {
1273 stream_id: self.stream_id,
1274 interval_start,
1275 interval_end: elapsed,
1276 bytes: interval_bytes,
1277 bits_per_second: bps,
1278 packets: Some(interval_packets),
1279 jitter_ms: Some(measurements.jitter_ms),
1280 lost_packets: Some(lost),
1281 lost_percent: Some(loss_percent),
1282 retransmits: None,
1283 cwnd: None,
1284 });
1285
1286 interval_bytes = 0;
1287 interval_packets = 0;
1288 last_interval = Instant::now();
1289 }
1290 }
1291 Ok(Err(e)) => {
1292 error!("Error receiving UDP packet: {}", e);
1293 break;
1294 }
1295 Err(_) => {
1296 continue;
1298 }
1299 }
1300 }
1301
1302 reporter.complete();
1304 let _ = reporter_task.await;
1305
1306 self.measurements.set_duration(start.elapsed());
1307
1308 let final_measurements = self.measurements.get();
1309
1310 let (lost, expected) = self.measurements.calculate_udp_loss();
1312 let loss_percent = if expected > 0 {
1313 (lost as f64 / expected as f64) * 100.0
1314 } else {
1315 0.0
1316 };
1317
1318 self.notify(ProgressEvent::TestCompleted {
1320 total_bytes: final_measurements.total_bytes_sent
1321 + final_measurements.total_bytes_received,
1322 duration: final_measurements.total_duration,
1323 bits_per_second: final_measurements.total_bits_per_second(),
1324 total_packets: Some(final_measurements.total_packets),
1325 jitter_ms: Some(final_measurements.jitter_ms),
1326 lost_packets: Some(lost),
1327 lost_percent: Some(loss_percent),
1328 out_of_order: Some(final_measurements.out_of_order_packets),
1329 });
1330
1331 if !self.config.json {
1332 print_results(&final_measurements, self.stream_id, self.config.reverse);
1333 } else {
1334 let system_info = Some(get_system_info());
1336 let test_config = TestConfig {
1337 protocol: self.config.protocol.as_str().to_string(),
1338 num_streams: self.config.parallel,
1339 blksize: self.config.buffer_size,
1340 omit: 0,
1341 duration: self.config.duration.as_secs(),
1342 reverse: self.config.reverse,
1343 };
1344 let detailed_results = self.measurements.get_detailed_results(
1345 None, system_info,
1347 test_config,
1348 );
1349 let json = serde_json::to_string_pretty(&detailed_results)?;
1350 println!("{}", json);
1351 }
1352
1353 Ok(())
1354 }
1355
    /// Returns the current measurements as reported by the internal collector.
    ///
    /// Can be called at any point; after `run` has finished it yields the same
    /// data the final summary was built from.
    pub fn get_measurements(&self) -> crate::Measurements {
        self.measurements.get()
    }
1408}
1409
1410async fn send_data(
1411 stream: &mut TcpStream,
1412 stream_id: usize,
1413 measurements: &MeasurementsCollector,
1414 config: &Config,
1415 callback: &Option<CallbackRef>,
1416 buffer_pool: Arc<BufferPool>,
1417 cancel_token: &CancellationToken,
1418) -> Result<()> {
1419 let (reporter, receiver) = IntervalReporter::new();
1421 let reporter_task = tokio::spawn(run_reporter_task(receiver, config.json, callback.clone()));
1422
1423 let buffer = buffer_pool.get();
1424 let start = Instant::now();
1425 let mut last_interval = start;
1426 let mut interval_bytes = 0u64;
1427 let mut last_retransmits = 0u64;
1428
1429 while start.elapsed() < config.duration {
1430 if cancel_token.is_cancelled() {
1432 info!("Test cancelled by user");
1433 break;
1434 }
1435
1436 match stream.write(&buffer).await {
1437 Ok(n) => {
1438 measurements.record_bytes_sent(stream_id, n as u64);
1439 interval_bytes += n as u64;
1440
1441 if last_interval.elapsed() >= config.interval {
1443 let elapsed = start.elapsed();
1444 let interval_duration = last_interval.elapsed();
1445 let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
1446
1447 let interval_start = if elapsed > interval_duration {
1448 elapsed - interval_duration
1449 } else {
1450 Duration::ZERO
1451 };
1452
1453 let tcp_stats = get_tcp_stats(stream).ok();
1455 let current_retransmits =
1456 tcp_stats.as_ref().map(|s| s.retransmits).unwrap_or(0);
1457 let interval_retransmits = current_retransmits.saturating_sub(last_retransmits);
1458 last_retransmits = current_retransmits;
1459
1460 measurements.add_interval(IntervalStats {
1461 start: interval_start,
1462 end: elapsed,
1463 bytes: interval_bytes,
1464 bits_per_second: bps,
1465 packets: u64::MAX,
1466 });
1467
1468 let cwnd_kbytes = tcp_stats
1470 .as_ref()
1471 .and_then(|s| s.snd_cwnd_opt())
1472 .map(|cwnd| cwnd / 1024);
1473
1474 reporter.report(IntervalReport {
1476 stream_id,
1477 interval_start,
1478 interval_end: elapsed,
1479 bytes: interval_bytes,
1480 bits_per_second: bps,
1481 packets: None,
1482 jitter_ms: None,
1483 lost_packets: None,
1484 lost_percent: None,
1485 retransmits: if interval_retransmits > 0 {
1486 Some(interval_retransmits)
1487 } else {
1488 None
1489 },
1490 cwnd: cwnd_kbytes,
1491 });
1492
1493 interval_bytes = 0;
1494 last_interval = Instant::now();
1495 }
1496 }
1497 Err(e) => {
1498 error!("Error sending data: {}", e);
1499 break;
1500 }
1501 }
1502 }
1503
1504 reporter.complete();
1506 let _ = reporter_task.await;
1507
1508 measurements.set_duration(start.elapsed());
1509 stream.flush().await?;
1510
1511 Ok(())
1512}
1513
1514async fn receive_data(
1515 stream: &mut TcpStream,
1516 stream_id: usize,
1517 measurements: &MeasurementsCollector,
1518 config: &Config,
1519 callback: &Option<CallbackRef>,
1520 buffer_pool: Arc<BufferPool>,
1521 cancel_token: &CancellationToken,
1522) -> Result<()> {
1523 let (reporter, receiver) = IntervalReporter::new();
1525 let reporter_task = tokio::spawn(run_reporter_task(receiver, config.json, callback.clone()));
1526
1527 let mut buffer = buffer_pool.get();
1528 let start = Instant::now();
1529 let mut last_interval = start;
1530 let mut interval_bytes = 0u64;
1531 let mut last_retransmits = 0u64;
1532
1533 while start.elapsed() < config.duration {
1534 if cancel_token.is_cancelled() {
1536 info!("Test cancelled by user");
1537 break;
1538 }
1539
1540 match time::timeout(Duration::from_millis(100), stream.read(&mut buffer)).await {
1541 Ok(Ok(0)) => {
1542 break;
1544 }
1545 Ok(Ok(n)) => {
1546 measurements.record_bytes_received(stream_id, n as u64);
1547 interval_bytes += n as u64;
1548
1549 if last_interval.elapsed() >= config.interval {
1551 let elapsed = start.elapsed();
1552 let interval_duration = last_interval.elapsed();
1553 let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
1554
1555 let interval_start = if elapsed > interval_duration {
1556 elapsed - interval_duration
1557 } else {
1558 Duration::ZERO
1559 };
1560
1561 let tcp_stats = get_tcp_stats(stream).ok();
1563 let current_retransmits =
1564 tcp_stats.as_ref().map(|s| s.retransmits).unwrap_or(0);
1565 let interval_retransmits = current_retransmits.saturating_sub(last_retransmits);
1566 last_retransmits = current_retransmits;
1567
1568 measurements.add_interval(IntervalStats {
1569 start: interval_start,
1570 end: elapsed,
1571 bytes: interval_bytes,
1572 bits_per_second: bps,
1573 packets: u64::MAX,
1574 });
1575
1576 reporter.report(IntervalReport {
1578 stream_id,
1579 interval_start,
1580 interval_end: elapsed,
1581 bytes: interval_bytes,
1582 bits_per_second: bps,
1583 packets: None,
1584 jitter_ms: None,
1585 lost_packets: None,
1586 lost_percent: None,
1587 retransmits: if interval_retransmits > 0 {
1588 Some(interval_retransmits)
1589 } else {
1590 None
1591 },
1592 cwnd: None, });
1594
1595 interval_bytes = 0;
1596 last_interval = Instant::now();
1597 }
1598 }
1599 Ok(Err(e)) => {
1600 error!("Error receiving data: {}", e);
1601 break;
1602 }
1603 Err(_) => {
1604 if start.elapsed() >= config.duration {
1606 break;
1607 }
1608 }
1609 }
1610 }
1611
1612 reporter.complete();
1614 let _ = reporter_task.await;
1615
1616 measurements.set_duration(start.elapsed());
1617
1618 Ok(())
1619}
1620
1621fn print_results(measurements: &crate::Measurements, stream_id: usize, _reverse: bool) {
1622 let is_udp = measurements.total_packets > 0;
1623
1624 if !is_udp {
1625 println!("- - - - - - - - - - - - - - - - - - - - - - - - -");
1627
1628 let duration = measurements.total_duration.as_secs_f64();
1629
1630 println!("[ ID] Interval Transfer Bitrate Retr");
1632
1633 let sent_bytes = measurements.total_bytes_sent;
1635 let (sent_val, sent_unit) = if sent_bytes >= 1_000_000_000 {
1636 (sent_bytes as f64 / 1_000_000_000.0, "GBytes")
1637 } else {
1638 (sent_bytes as f64 / 1_000_000.0, "MBytes")
1639 };
1640 let sent_bps = (sent_bytes as f64 * 8.0) / duration;
1641 let (sent_bitrate_val, sent_bitrate_unit) = if sent_bps >= 1_000_000_000.0 {
1642 (sent_bps / 1_000_000_000.0, "Gbits/sec")
1643 } else {
1644 (sent_bps / 1_000_000.0, "Mbits/sec")
1645 };
1646
1647 println!(
1648 "[{:3}] {:4.2}-{:4.2} sec {:6.2} {:>7} {:6.1} {:>10} {:4} sender",
1649 stream_id,
1650 0.0,
1651 duration,
1652 sent_val,
1653 sent_unit,
1654 sent_bitrate_val,
1655 sent_bitrate_unit,
1656 0 );
1658
1659 if measurements.total_bytes_received > 0 {
1661 let recv_bytes = measurements.total_bytes_received;
1662 let (recv_val, recv_unit) = if recv_bytes >= 1_000_000_000 {
1663 (recv_bytes as f64 / 1_000_000_000.0, "GBytes")
1664 } else {
1665 (recv_bytes as f64 / 1_000_000.0, "MBytes")
1666 };
1667 let recv_bps = (recv_bytes as f64 * 8.0) / duration;
1668 let (recv_bitrate_val, recv_bitrate_unit) = if recv_bps >= 1_000_000_000.0 {
1669 (recv_bps / 1_000_000_000.0, "Gbits/sec")
1670 } else {
1671 (recv_bps / 1_000_000.0, "Mbits/sec")
1672 };
1673
1674 println!(
1675 "[{:3}] {:4.2}-{:4.2} sec {:6.2} {:>7} {:6.1} {:>10} receiver",
1676 stream_id, 0.0, duration, recv_val, recv_unit, recv_bitrate_val, recv_bitrate_unit
1677 );
1678 }
1679
1680 println!();
1681 } else {
1682 println!("- - - - - - - - - - - - - - - - - - - - - - - - -");
1684
1685 let duration = measurements.total_duration.as_secs_f64();
1686
1687 let (lost, expected) = if measurements.total_bytes_received > 0 {
1689 let (l, e) = measurements.calculate_udp_loss();
1690 (l, e)
1691 } else {
1692 (0, measurements.total_packets)
1693 };
1694
1695 let loss_percent = if expected > 0 {
1696 (lost as f64 / expected as f64) * 100.0
1697 } else {
1698 0.0
1699 };
1700
1701 println!(
1703 "[ ID] Interval Transfer Bitrate Jitter Lost/Total Datagrams"
1704 );
1705
1706 if measurements.total_bytes_sent > 0 {
1708 let sent_bytes = measurements.total_bytes_sent;
1709 let (sent_val, sent_unit) = if sent_bytes >= 1_000_000_000 {
1710 (sent_bytes as f64 / 1_000_000_000.0, "GBytes")
1711 } else if sent_bytes >= 1_000_000 {
1712 (sent_bytes as f64 / 1_000_000.0, "MBytes")
1713 } else {
1714 (sent_bytes as f64 / 1_000.0, "KBytes")
1715 };
1716 let sent_bps = (sent_bytes as f64 * 8.0) / duration;
1717 let (sent_bitrate_val, sent_bitrate_unit) = if sent_bps >= 1_000_000_000.0 {
1718 (sent_bps / 1_000_000_000.0, "Gbits/sec")
1719 } else {
1720 (sent_bps / 1_000_000.0, "Mbits/sec")
1721 };
1722
1723 println!(
1724 "[{:3}] {:4.2}-{:4.2} sec {:6.2} {:>7} {:6.1} {:>10} {:6.3} ms {}/{} ({:.0}%) sender",
1725 stream_id,
1726 0.0,
1727 duration,
1728 sent_val,
1729 sent_unit,
1730 sent_bitrate_val,
1731 sent_bitrate_unit,
1732 0.0, lost,
1734 expected,
1735 loss_percent
1736 );
1737 }
1738
1739 if measurements.total_bytes_received > 0 {
1741 let recv_bytes = measurements.total_bytes_received;
1742 let (recv_val, recv_unit) = if recv_bytes >= 1_000_000_000 {
1743 (recv_bytes as f64 / 1_000_000_000.0, "GBytes")
1744 } else if recv_bytes >= 1_000_000 {
1745 (recv_bytes as f64 / 1_000_000.0, "MBytes")
1746 } else {
1747 (recv_bytes as f64 / 1_000.0, "KBytes")
1748 };
1749 let recv_bps = (recv_bytes as f64 * 8.0) / duration;
1750 let (recv_bitrate_val, recv_bitrate_unit) = if recv_bps >= 1_000_000_000.0 {
1751 (recv_bps / 1_000_000_000.0, "Gbits/sec")
1752 } else {
1753 (recv_bps / 1_000_000.0, "Mbits/sec")
1754 };
1755
1756 println!(
1757 "[{:3}] {:4.2}-{:4.2} sec {:6.2} {:>7} {:6.1} {:>10} {:6.3} ms {}/{} ({:.0}%) receiver",
1758 stream_id,
1759 0.0,
1760 duration,
1761 recv_val,
1762 recv_unit,
1763 recv_bitrate_val,
1764 recv_bitrate_unit,
1765 measurements.jitter_ms,
1766 lost,
1767 expected,
1768 loss_percent
1769 );
1770 }
1771
1772 println!();
1773 }
1774}