1use crate::buffer_pool::BufferPool;
2use crate::config::Config;
3use crate::interval_reporter::{run_reporter_task, IntervalReport, IntervalReporter};
4use crate::measurements::{get_tcp_stats, IntervalStats, MeasurementsCollector};
5use crate::protocol::{deserialize_message, serialize_message, Message, DEFAULT_STREAM_ID};
6use crate::{Error, Result};
7use log::{debug, error, info};
8use socket2::SockRef;
9use std::net::SocketAddr;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream, UdpSocket};
14use tokio::time;
15use tokio_util::sync::CancellationToken;
16
17fn configure_tcp_socket(stream: &TcpStream) -> Result<()> {
36 stream.set_nodelay(true).map_err(|e| {
38 Error::Io(std::io::Error::new(
39 e.kind(),
40 format!("Failed to set TCP_NODELAY: {}", e),
41 ))
42 })?;
43
44 const BUFFER_SIZE: usize = 256 * 1024; let sock_ref = SockRef::from(stream);
47
48 sock_ref.set_send_buffer_size(BUFFER_SIZE).map_err(|e| {
49 Error::Io(std::io::Error::new(
50 e.kind(),
51 format!("Failed to set send buffer size: {}", e),
52 ))
53 })?;
54
55 sock_ref.set_recv_buffer_size(BUFFER_SIZE).map_err(|e| {
56 Error::Io(std::io::Error::new(
57 e.kind(),
58 format!("Failed to set recv buffer size: {}", e),
59 ))
60 })?;
61
62 debug!(
63 "TCP socket configured: TCP_NODELAY=true, buffers={}KB",
64 BUFFER_SIZE / 1024
65 );
66
67 Ok(())
68}
69
70fn configure_udp_socket(socket: &UdpSocket) -> Result<()> {
88 const BUFFER_SIZE: usize = 2 * 1024 * 1024; let sock_ref = SockRef::from(socket);
91
92 sock_ref.set_send_buffer_size(BUFFER_SIZE).map_err(|e| {
93 Error::Io(std::io::Error::new(
94 e.kind(),
95 format!("Failed to set UDP send buffer size: {}", e),
96 ))
97 })?;
98
99 sock_ref.set_recv_buffer_size(BUFFER_SIZE).map_err(|e| {
100 Error::Io(std::io::Error::new(
101 e.kind(),
102 format!("Failed to set UDP recv buffer size: {}", e),
103 ))
104 })?;
105
106 debug!(
107 "UDP socket configured: buffers={}MB",
108 BUFFER_SIZE / (1024 * 1024)
109 );
110
111 Ok(())
112}
113
114pub struct Server {
167 config: Config,
168 measurements: MeasurementsCollector,
169 tcp_buffer_pool: Arc<BufferPool>,
170 udp_buffer_pool: Arc<BufferPool>,
171 cancellation_token: CancellationToken,
172}
173
174impl Server {
175 pub fn new(config: Config) -> Self {
190 let tcp_pool_size = config.parallel * 2; let tcp_buffer_pool = Arc::new(BufferPool::new(config.buffer_size, tcp_pool_size));
194
195 let udp_buffer_pool = Arc::new(BufferPool::new(65536, 10));
197
198 Self {
199 config,
200 measurements: MeasurementsCollector::new(),
201 tcp_buffer_pool,
202 udp_buffer_pool,
203 cancellation_token: CancellationToken::new(),
204 }
205 }
206
207 pub fn cancellation_token(&self) -> &CancellationToken {
234 &self.cancellation_token
235 }
236
237 pub async fn run(&self) -> Result<()> {
265 let bind_addr = format!(
266 "{}:{}",
267 self.config
268 .bind_addr
269 .map(|a| a.to_string())
270 .unwrap_or_else(|| "0.0.0.0".to_string()),
271 self.config.port
272 );
273
274 info!("Starting rperf3 server on {}", bind_addr);
275
276 self.run_tcp(&bind_addr).await
280 }
281
282 async fn run_tcp(&self, bind_addr: &str) -> Result<()> {
283 let listener = TcpListener::bind(bind_addr).await?;
284 info!("TCP server listening on {}", bind_addr);
285
286 loop {
287 if self.cancellation_token.is_cancelled() {
289 info!("Server shutting down gracefully");
290 break;
291 }
292
293 tokio::select! {
294 accept_result = listener.accept() => {
295 match accept_result {
296 Ok((stream, addr)) => {
297 info!("New connection from {}", addr);
298 let config = self.config.clone();
299 let measurements = self.measurements.clone();
300 let tcp_buffer_pool = self.tcp_buffer_pool.clone();
301 let udp_buffer_pool = self.udp_buffer_pool.clone();
302
303 tokio::spawn(async move {
304 if let Err(e) = handle_tcp_client(
305 stream,
306 addr,
307 config,
308 measurements,
309 tcp_buffer_pool,
310 udp_buffer_pool,
311 )
312 .await
313 {
314 error!("Error handling client {}: {}", addr, e);
315 }
316 });
317 }
318 Err(e) => {
319 error!("Error accepting connection: {}", e);
320 }
321 }
322 }
323 _ = self.cancellation_token.cancelled() => {
324 info!("Server shutting down gracefully");
325 break;
326 }
327 }
328 }
329 Ok(())
330 }
331
332 #[allow(dead_code)]
333 async fn run_udp(&self, bind_addr: &str) -> Result<()> {
334 let socket = UdpSocket::bind(bind_addr).await?;
335 let local_addr = socket.local_addr()?;
336
337 configure_udp_socket(&socket)?;
339
340 info!("UDP server listening on {}", local_addr);
341
342 #[cfg(target_os = "linux")]
344 return self.run_udp_batched(socket).await;
345
346 #[cfg(not(target_os = "linux"))]
347 return self.run_udp_standard(socket).await;
348 }
349
350 #[cfg_attr(target_os = "linux", allow(dead_code))]
352 async fn run_udp_standard(&self, socket: UdpSocket) -> Result<()> {
353 let (reporter, receiver) = IntervalReporter::new();
355 let reporter_task = tokio::spawn(run_reporter_task(
356 receiver,
357 self.config.json,
358 None, ));
360
361 let mut buf = self.udp_buffer_pool.get();
362 let start = Instant::now();
363 let mut last_interval = start;
364 let mut interval_bytes = 0u64;
365 let mut interval_packets = 0u64;
366
367 loop {
368 if self.cancellation_token.is_cancelled() {
370 info!("Server shutting down gracefully");
371 break;
372 }
373
374 match socket.recv_from(&mut buf).await {
375 Ok((len, addr)) => {
376 debug!("Received {} bytes from {}", len, addr);
377
378 if let Some((header, _payload)) = crate::udp_packet::parse_packet(&buf[..len]) {
380 let recv_timestamp_us = std::time::SystemTime::now()
382 .duration_since(std::time::UNIX_EPOCH)
383 .expect("Time went backwards")
384 .as_micros() as u64;
385
386 self.measurements.record_udp_packet_received(
388 header.sequence,
389 header.timestamp_us,
390 recv_timestamp_us,
391 );
392 self.measurements.record_bytes_received(0, len as u64);
393
394 interval_bytes += len as u64;
395 interval_packets += 1;
396 } else {
397 debug!("Received non-rperf3 UDP packet from {}", addr);
398 }
399
400 if last_interval.elapsed() >= self.config.interval {
402 let elapsed = start.elapsed();
403 let interval_duration = last_interval.elapsed();
404 let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
405
406 let interval_start = if elapsed > interval_duration {
407 elapsed - interval_duration
408 } else {
409 Duration::ZERO
410 };
411
412 self.measurements.add_interval(IntervalStats {
413 start: interval_start,
414 end: elapsed,
415 bytes: interval_bytes,
416 bits_per_second: bps,
417 packets: interval_packets,
418 });
419
420 let (lost, expected) = self.measurements.calculate_udp_loss();
422 let loss_percent = if expected > 0 {
423 (lost as f64 / expected as f64) * 100.0
424 } else {
425 0.0
426 };
427 let measurements = self.measurements.get();
428
429 reporter.report(IntervalReport {
431 stream_id: DEFAULT_STREAM_ID,
432 interval_start,
433 interval_end: elapsed,
434 bytes: interval_bytes,
435 bits_per_second: bps,
436 packets: Some(interval_packets),
437 jitter_ms: Some(measurements.jitter_ms),
438 lost_packets: Some(lost),
439 lost_percent: Some(loss_percent),
440 retransmits: None,
441 cwnd: None,
442 });
443
444 interval_bytes = 0;
445 interval_packets = 0;
446 last_interval = Instant::now();
447 }
448 }
449 Err(e) => {
450 error!("Error receiving UDP packet: {}", e);
451 }
452 }
453 }
454
455 reporter.complete();
457 let _ = reporter_task.await;
458
459 Ok(())
460 }
461
462 #[allow(dead_code)]
464 #[cfg(target_os = "linux")]
465 async fn run_udp_batched(&self, socket: UdpSocket) -> Result<()> {
466 use crate::batch_socket::UdpRecvBatch;
467
468 let (reporter, receiver) = IntervalReporter::new();
470 let reporter_task = tokio::spawn(run_reporter_task(
471 receiver,
472 self.config.json,
473 None, ));
475
476 let mut batch = UdpRecvBatch::new();
477 let start = Instant::now();
478 let mut last_interval = start;
479 let mut interval_bytes = 0u64;
480 let mut interval_packets = 0u64;
481
482 loop {
483 if self.cancellation_token.is_cancelled() {
485 info!("Server shutting down gracefully");
486 break;
487 }
488
489 match batch.recv(&socket).await {
491 Ok(count) => {
492 if count == 0 {
493 continue;
494 }
495
496 debug!("Received {} packets in batch", count);
497
498 for i in 0..count {
500 if let Some((packet, addr)) = batch.get(i) {
501 debug!(
502 "Processing packet {} of {} bytes from {}",
503 i,
504 packet.len(),
505 addr
506 );
507
508 if let Some((header, _payload)) =
510 crate::udp_packet::parse_packet(packet)
511 {
512 let recv_timestamp_us = std::time::SystemTime::now()
514 .duration_since(std::time::UNIX_EPOCH)
515 .expect("Time went backwards")
516 .as_micros()
517 as u64;
518
519 self.measurements.record_udp_packet_received(
521 header.sequence,
522 header.timestamp_us,
523 recv_timestamp_us,
524 );
525 self.measurements
526 .record_bytes_received(0, packet.len() as u64);
527
528 interval_bytes += packet.len() as u64;
529 interval_packets += 1;
530 } else {
531 debug!("Received non-rperf3 UDP packet from {}", addr);
532 }
533 }
534 }
535
536 if last_interval.elapsed() >= self.config.interval {
538 let elapsed = start.elapsed();
539 let interval_duration = last_interval.elapsed();
540 let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
541
542 let interval_start = if elapsed > interval_duration {
543 elapsed - interval_duration
544 } else {
545 Duration::ZERO
546 };
547
548 self.measurements.add_interval(IntervalStats {
549 start: interval_start,
550 end: elapsed,
551 bytes: interval_bytes,
552 bits_per_second: bps,
553 packets: interval_packets,
554 });
555
556 let (lost, expected) = self.measurements.calculate_udp_loss();
558 let loss_percent = if expected > 0 {
559 (lost as f64 / expected as f64) * 100.0
560 } else {
561 0.0
562 };
563 let measurements = self.measurements.get();
564
565 reporter.report(IntervalReport {
567 stream_id: DEFAULT_STREAM_ID,
568 interval_start,
569 interval_end: elapsed,
570 bytes: interval_bytes,
571 bits_per_second: bps,
572 packets: Some(interval_packets),
573 jitter_ms: Some(measurements.jitter_ms),
574 lost_packets: Some(lost),
575 lost_percent: Some(loss_percent),
576 retransmits: None,
577 cwnd: None,
578 });
579
580 interval_bytes = 0;
581 interval_packets = 0;
582 last_interval = Instant::now();
583 }
584 }
585 Err(e) => {
586 error!("Error receiving UDP batch: {}", e);
587 }
588 }
589 }
590
591 reporter.complete();
593 let _ = reporter_task.await;
594
595 Ok(())
596 }
597
598 pub fn get_measurements(&self) -> crate::Measurements {
623 self.measurements.get()
624 }
625}
626
627async fn handle_tcp_client(
628 mut stream: TcpStream,
629 addr: SocketAddr,
630 config: Config,
631 measurements: MeasurementsCollector,
632 tcp_buffer_pool: Arc<BufferPool>,
633 udp_buffer_pool: Arc<BufferPool>,
634) -> Result<()> {
635 configure_tcp_socket(&stream)?;
637
638 let setup_msg = deserialize_message(&mut stream).await?;
640
641 let (protocol, duration, reverse, _parallel, bandwidth, buffer_size) = match setup_msg {
642 Message::Setup {
643 version: _,
644 protocol,
645 duration,
646 reverse,
647 parallel,
648 bandwidth,
649 buffer_size,
650 ..
651 } => {
652 info!(
653 "Client {} setup: protocol={}, duration={}s, reverse={}, parallel={}",
654 addr, protocol, duration, reverse, parallel
655 );
656 (
657 protocol,
658 Duration::from_secs(duration),
659 reverse,
660 parallel,
661 bandwidth,
662 buffer_size,
663 )
664 }
665 _ => {
666 return Err(Error::Protocol("Expected Setup message".to_string()));
667 }
668 };
669
670 if protocol == "Udp" {
672 let mut udp_config = config.clone();
674 udp_config.duration = duration;
675 udp_config.reverse = reverse;
676 udp_config.bandwidth = bandwidth;
677 udp_config.buffer_size = buffer_size;
678
679 return handle_udp_test(stream, addr, udp_config, measurements, udp_buffer_pool).await;
681 }
682
683 let ack = Message::setup_ack(config.port, format!("{}", addr));
685 let ack_bytes = serialize_message(&ack)?;
686 stream.write_all(&ack_bytes).await?;
687 stream.flush().await?;
688
689 let start_msg = Message::start(
691 std::time::SystemTime::now()
692 .duration_since(std::time::UNIX_EPOCH)
693 .unwrap()
694 .as_secs(),
695 );
696 let start_bytes = serialize_message(&start_msg)?;
697 stream.write_all(&start_bytes).await?;
698 stream.flush().await?;
699
700 measurements.set_start_time(Instant::now());
701
702 if reverse {
703 send_data(
705 &mut stream,
706 0,
707 duration,
708 bandwidth,
709 &measurements,
710 &config,
711 tcp_buffer_pool.clone(),
712 )
713 .await?;
714 } else {
715 receive_data(
717 &mut stream,
718 0,
719 duration,
720 &measurements,
721 &config,
722 tcp_buffer_pool.clone(),
723 )
724 .await?;
725 }
726
727 let final_measurements = measurements.get();
729 if let Some(stream_stats) = final_measurements.streams.first() {
730 let result_msg = Message::result(
731 0,
732 stream_stats.bytes_sent,
733 stream_stats.bytes_received,
734 final_measurements.total_duration.as_secs_f64(),
735 final_measurements.total_bits_per_second(),
736 None,
737 );
738 let result_bytes = serialize_message(&result_msg)?;
739 stream.write_all(&result_bytes).await?;
740 stream.flush().await?;
741 }
742
743 let done_msg = Message::done();
745 let done_bytes = serialize_message(&done_msg)?;
746 stream.write_all(&done_bytes).await?;
747 stream.flush().await?;
748
749 info!(
750 "Test completed for {}: {:.2} Mbps",
751 addr,
752 final_measurements.total_bits_per_second() / 1_000_000.0
753 );
754
755 Ok(())
756}
757
758async fn handle_udp_test(
759 mut control_stream: TcpStream,
760 client_addr: SocketAddr,
761 config: Config,
762 measurements: MeasurementsCollector,
763 udp_buffer_pool: Arc<BufferPool>,
764) -> Result<()> {
765 let duration = config.duration;
766 let reverse = config.reverse;
767 let bandwidth = config.bandwidth;
768 let buffer_size = config.buffer_size;
769 let ack = Message::setup_ack(config.port, format!("{}", client_addr));
771 let ack_bytes = serialize_message(&ack)?;
772 control_stream.write_all(&ack_bytes).await?;
773 control_stream.flush().await?;
774
775 let start_msg = Message::start(
777 std::time::SystemTime::now()
778 .duration_since(std::time::UNIX_EPOCH)
779 .unwrap()
780 .as_secs(),
781 );
782 let start_bytes = serialize_message(&start_msg)?;
783 control_stream.write_all(&start_bytes).await?;
784 control_stream.flush().await?;
785
786 measurements.set_start_time(Instant::now());
787
788 if reverse {
789 send_udp_data(
791 client_addr,
792 duration,
793 bandwidth,
794 buffer_size,
795 &measurements,
796 &config,
797 udp_buffer_pool.clone(),
798 )
799 .await?;
800 } else {
801 receive_udp_data(duration, &measurements, &config, udp_buffer_pool.clone()).await?;
803 }
804
805 info!(
806 "UDP test completed for {}: {:.2} Mbps",
807 client_addr,
808 measurements.get().total_bits_per_second() / 1_000_000.0
809 );
810
811 Ok(())
812}
813
814async fn send_udp_data(
815 _client_tcp_addr: SocketAddr,
816 duration: Duration,
817 bandwidth: Option<u64>,
818 buffer_size: usize,
819 measurements: &MeasurementsCollector,
820 config: &Config,
821 buffer_pool: Arc<BufferPool>,
822) -> Result<()> {
823 let bind_addr = format!("0.0.0.0:{}", config.port);
828 let socket = UdpSocket::bind(&bind_addr).await?;
829
830 configure_udp_socket(&socket)?;
832
833 info!("UDP server listening on port {}", config.port);
834
835 let mut buf = buffer_pool.get();
837 let (_n, client_udp_addr) = socket.recv_from(&mut buf).await?;
838
839 info!("UDP client address discovered: {}", client_udp_addr);
840
841 socket.connect(client_udp_addr).await?;
843
844 let start = Instant::now();
845 let mut last_interval = start;
846 let mut interval_bytes = 0u64;
847 let mut interval_packets = 0u64;
848 let mut sequence = 0u64;
849
850 let payload_size = if buffer_size > crate::udp_packet::UdpPacketHeader::SIZE {
852 buffer_size - crate::udp_packet::UdpPacketHeader::SIZE
853 } else {
854 1024
855 };
856
857 let target_bytes_per_sec = bandwidth.map(|bw| bw / 8);
859 let mut total_bytes_sent = 0u64;
860 let mut last_bandwidth_check = start;
861
862 while start.elapsed() < duration {
863 let packet = crate::udp_packet::create_packet_fast(sequence, payload_size);
864
865 match socket.send(&packet).await {
866 Ok(n) => {
867 measurements.record_bytes_sent(0, n as u64);
868 measurements.record_udp_packet(0);
869 interval_bytes += n as u64;
870 interval_packets += 1;
871 sequence += 1;
872 total_bytes_sent += n as u64;
873
874 if let Some(target_bps) = target_bytes_per_sec {
876 let elapsed = last_bandwidth_check.elapsed().as_secs_f64();
877
878 if elapsed >= 0.001 {
879 let expected_bytes = (target_bps as f64 * elapsed) as u64;
880 let bytes_sent_in_period = total_bytes_sent;
881
882 if bytes_sent_in_period > expected_bytes {
883 let bytes_ahead = (bytes_sent_in_period - expected_bytes) as f64;
884 let sleep_time = bytes_ahead / target_bps as f64;
885 if sleep_time > 0.0001 {
886 time::sleep(Duration::from_secs_f64(sleep_time)).await;
887 }
888 }
889
890 last_bandwidth_check = Instant::now();
891 total_bytes_sent = 0;
892 }
893 }
894
895 if last_interval.elapsed() >= config.interval {
897 let elapsed = start.elapsed();
898 let interval_duration = last_interval.elapsed();
899 let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
900
901 let interval_start = if elapsed > interval_duration {
902 elapsed - interval_duration
903 } else {
904 Duration::ZERO
905 };
906
907 measurements.add_interval(IntervalStats {
908 start: interval_start,
909 end: elapsed,
910 bytes: interval_bytes,
911 bits_per_second: bps,
912 packets: interval_packets,
913 });
914
915 interval_bytes = 0;
916 interval_packets = 0;
917 last_interval = Instant::now();
918 }
919 }
920 Err(e) => {
921 error!("Error sending UDP packet: {}", e);
922 break;
923 }
924 }
925 }
926
927 measurements.set_duration(start.elapsed());
928 Ok(())
929}
930
931async fn receive_udp_data(
932 duration: Duration,
933 measurements: &MeasurementsCollector,
934 config: &Config,
935 buffer_pool: Arc<BufferPool>,
936) -> Result<()> {
937 let bind_addr = format!("0.0.0.0:{}", config.port);
939 let socket = UdpSocket::bind(&bind_addr).await?;
940
941 configure_udp_socket(&socket)?;
943
944 info!("UDP server listening for packets on port {}", config.port);
945
946 let start = Instant::now();
947 let mut buf = buffer_pool.get();
948
949 while start.elapsed() < duration {
951 let remaining = duration.saturating_sub(start.elapsed());
953 let timeout = remaining.min(Duration::from_millis(100));
954
955 match tokio::time::timeout(timeout, socket.recv_from(&mut buf)).await {
956 Ok(Ok((n, _addr))) => {
957 if let Some((header, _payload)) = crate::udp_packet::parse_packet(&buf[..n]) {
959 let recv_timestamp_us = std::time::SystemTime::now()
960 .duration_since(std::time::UNIX_EPOCH)
961 .unwrap()
962 .as_micros() as u64;
963
964 measurements.record_bytes_received(0, n as u64);
965 measurements.record_udp_packet_received(
966 header.sequence,
967 header.timestamp_us,
968 recv_timestamp_us,
969 );
970 }
971 }
972 Ok(Err(e)) => {
973 error!("Error receiving UDP packet: {}", e);
974 break;
975 }
976 Err(_) => {
977 continue;
979 }
980 }
981 }
982
983 measurements.set_duration(start.elapsed());
984 Ok(())
985}
986
987async fn send_data(
988 stream: &mut TcpStream,
989 stream_id: usize,
990 duration: Duration,
991 bandwidth: Option<u64>,
992 measurements: &MeasurementsCollector,
993 config: &Config,
994 buffer_pool: Arc<BufferPool>,
995) -> Result<()> {
996 let (reporter, receiver) = IntervalReporter::new();
998 let reporter_task = tokio::spawn(run_reporter_task(
999 receiver,
1000 config.json,
1001 None, ));
1003
1004 let buffer = buffer_pool.get();
1005 let start = Instant::now();
1006 let mut last_interval = start;
1007 let mut interval_bytes = 0u64;
1008 let mut last_retransmits = 0u64;
1009
1010 let target_bytes_per_sec = bandwidth.map(|bw| bw / 8);
1012 let mut total_bytes_sent = 0u64;
1013 let mut last_bandwidth_check = start;
1014
1015 while start.elapsed() < duration {
1016 match stream.write(&buffer).await {
1017 Ok(n) => {
1018 measurements.record_bytes_sent(stream_id, n as u64);
1019 interval_bytes += n as u64;
1020 total_bytes_sent += n as u64;
1021
1022 if let Some(target_bps) = target_bytes_per_sec {
1024 let elapsed = last_bandwidth_check.elapsed().as_secs_f64();
1025
1026 if elapsed >= 0.001 {
1027 let expected_bytes = (target_bps as f64 * elapsed) as u64;
1028 let bytes_sent_in_period = total_bytes_sent;
1029
1030 if bytes_sent_in_period > expected_bytes {
1031 let bytes_ahead = (bytes_sent_in_period - expected_bytes) as f64;
1032 let sleep_time = bytes_ahead / target_bps as f64;
1033 if sleep_time > 0.0001 {
1034 time::sleep(Duration::from_secs_f64(sleep_time)).await;
1035 }
1036 }
1037
1038 last_bandwidth_check = Instant::now();
1039 total_bytes_sent = 0;
1040 }
1041 }
1042
1043 if last_interval.elapsed() >= config.interval {
1045 let elapsed = start.elapsed();
1046 let interval_duration = last_interval.elapsed();
1047 let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
1048
1049 let interval_start = if elapsed > interval_duration {
1050 elapsed - interval_duration
1051 } else {
1052 Duration::ZERO
1053 };
1054
1055 let tcp_stats = get_tcp_stats(stream).ok();
1057 let current_retransmits =
1058 tcp_stats.as_ref().map(|s| s.retransmits).unwrap_or(0);
1059 let interval_retransmits = current_retransmits.saturating_sub(last_retransmits);
1060 last_retransmits = current_retransmits;
1061
1062 measurements.add_interval(IntervalStats {
1063 start: interval_start,
1064 end: elapsed,
1065 bytes: interval_bytes,
1066 bits_per_second: bps,
1067 packets: u64::MAX,
1068 });
1069
1070 let cwnd_kbytes = tcp_stats
1072 .as_ref()
1073 .and_then(|s| s.snd_cwnd_opt())
1074 .map(|cwnd| cwnd / 1024);
1075
1076 reporter.report(IntervalReport {
1078 stream_id: DEFAULT_STREAM_ID,
1079 interval_start,
1080 interval_end: elapsed,
1081 bytes: interval_bytes,
1082 bits_per_second: bps,
1083 packets: None,
1084 jitter_ms: None,
1085 lost_packets: None,
1086 lost_percent: None,
1087 retransmits: if interval_retransmits > 0 {
1088 Some(interval_retransmits)
1089 } else {
1090 None
1091 },
1092 cwnd: cwnd_kbytes,
1093 });
1094
1095 interval_bytes = 0;
1096 last_interval = Instant::now();
1097 }
1098 }
1099 Err(e) => {
1100 error!("Error sending data: {}", e);
1101 break;
1102 }
1103 }
1104 }
1105
1106 reporter.complete();
1108 let _ = reporter_task.await;
1109
1110 measurements.set_duration(start.elapsed());
1111 stream.flush().await?;
1112
1113 Ok(())
1114}
1115
1116async fn receive_data(
1117 stream: &mut TcpStream,
1118 stream_id: usize,
1119 duration: Duration,
1120 measurements: &MeasurementsCollector,
1121 config: &Config,
1122 buffer_pool: Arc<BufferPool>,
1123) -> Result<()> {
1124 let (reporter, receiver) = IntervalReporter::new();
1126 let reporter_task = tokio::spawn(run_reporter_task(
1127 receiver,
1128 config.json,
1129 None, ));
1131
1132 let mut buffer = buffer_pool.get();
1133 let start = Instant::now();
1134 let mut last_interval = start;
1135 let mut interval_bytes = 0u64;
1136 let mut last_retransmits = 0u64;
1137
1138 while start.elapsed() < duration {
1139 match time::timeout(Duration::from_millis(100), stream.read(&mut buffer)).await {
1140 Ok(Ok(0)) => {
1141 break;
1143 }
1144 Ok(Ok(n)) => {
1145 measurements.record_bytes_received(stream_id, n as u64);
1146 interval_bytes += n as u64;
1147
1148 if last_interval.elapsed() >= config.interval {
1150 let elapsed = start.elapsed();
1151 let interval_duration = last_interval.elapsed();
1152 let bps = (interval_bytes as f64 * 8.0) / interval_duration.as_secs_f64();
1153
1154 let interval_start = if elapsed > interval_duration {
1155 elapsed - interval_duration
1156 } else {
1157 Duration::ZERO
1158 };
1159
1160 let tcp_stats = get_tcp_stats(stream).ok();
1162 let current_retransmits =
1163 tcp_stats.as_ref().map(|s| s.retransmits).unwrap_or(0);
1164 let interval_retransmits = current_retransmits.saturating_sub(last_retransmits);
1165 last_retransmits = current_retransmits;
1166
1167 measurements.add_interval(IntervalStats {
1168 start: interval_start,
1169 end: elapsed,
1170 bytes: interval_bytes,
1171 bits_per_second: bps,
1172 packets: u64::MAX,
1173 });
1174
1175 reporter.report(IntervalReport {
1177 stream_id: DEFAULT_STREAM_ID,
1178 interval_start,
1179 interval_end: elapsed,
1180 bytes: interval_bytes,
1181 bits_per_second: bps,
1182 packets: None,
1183 jitter_ms: None,
1184 lost_packets: None,
1185 lost_percent: None,
1186 retransmits: if interval_retransmits > 0 {
1187 Some(interval_retransmits)
1188 } else {
1189 None
1190 },
1191 cwnd: None,
1192 });
1193
1194 interval_bytes = 0;
1195 last_interval = Instant::now();
1196 }
1197 }
1198 Ok(Err(e)) => {
1199 error!("Error receiving data: {}", e);
1200 break;
1201 }
1202 Err(_) => {
1203 if start.elapsed() >= duration {
1205 break;
1206 }
1207 }
1208 }
1209 }
1210
1211 reporter.complete();
1213 let _ = reporter_task.await;
1214
1215 measurements.set_duration(start.elapsed());
1216
1217 Ok(())
1218}