1use core::f64;
2use std::{
3 net::{Ipv4Addr, SocketAddrV4},
4 sync::Arc,
5 time::Duration,
6};
7
8use anyhow::Result;
9use tokio::{
10 net::{TcpStream, UdpSocket},
11 select, spawn,
12 sync::{Mutex, oneshot},
13 time::sleep,
14 try_join,
15};
16use tracing::*;
17use twamp_rs::control_client::ControlClient;
18use twamp_rs::session_sender::SessionSender;
19use twamp_rs::timestamp::timestamp::TimeStamp;
20use twamp_rs::twamp_test::twamp_test_unauth_reflected::TwampTestPacketUnauthReflected;
21
22#[derive(Debug, Default)]
23pub struct Controller {
24 control_client: ControlClient,
25 session_sender: Option<Arc<SessionSender>>,
26}
27
28impl Controller {
29 pub fn new() -> Self {
30 Controller {
31 control_client: ControlClient::default(),
32 session_sender: None,
33 }
34 }
35
36 pub async fn do_twamp(
41 mut self,
42 responder_addr: Ipv4Addr,
43 responder_port: u16,
44 controller_addr: Ipv4Addr,
45 mut controller_port: u16,
46 responder_reflect_port: u16,
47 number_of_test_packets: u32,
48 reflector_timeout: u64,
49 stop_session_sleep: u64,
50 ) -> Result<()> {
51 let twamp_control =
52 TcpStream::connect(SocketAddrV4::new(responder_addr, responder_port)).await?;
53 let udp_socket =
54 UdpSocket::bind(SocketAddrV4::new(controller_addr, controller_port)).await?;
55 controller_port = udp_socket.local_addr().unwrap().port();
56
57 let (start_session_tx, start_session_rx) = oneshot::channel::<()>();
58 let (twamp_test_complete_tx, twamp_test_complete_rx) = oneshot::channel::<()>();
59 let (reflector_port_tx, reflector_port_rx) = oneshot::channel::<u16>();
60 let control_client_handle = spawn(async move {
61 self.control_client
62 .do_twamp_control(
63 twamp_control,
64 start_session_tx,
65 reflector_port_tx,
66 responder_reflect_port,
67 controller_port,
68 reflector_timeout,
69 twamp_test_complete_rx,
70 )
71 .await
72 .unwrap();
73 });
74 let reflected_pkts_vec: Arc<Mutex<Vec<(TwampTestPacketUnauthReflected, TimeStamp)>>> =
75 Arc::new(Mutex::new(Vec::new()));
76 let reflected_pkts_vec_cloned = Arc::clone(&reflected_pkts_vec);
77 let session_sender_handle = spawn(async move {
78 let final_port = reflector_port_rx.await.unwrap();
80 debug!("Received reflector port: {}", final_port);
81 udp_socket
82 .connect(SocketAddrV4::new(responder_addr, final_port))
83 .await
84 .unwrap();
85 start_session_rx.await.unwrap();
87 debug!("Start-Session identified. Start Session-Sender.");
88 self.session_sender = Some(Arc::new(
89 SessionSender::new(
90 Arc::new(udp_socket),
91 SocketAddrV4::new(responder_addr, final_port),
92 )
93 .await,
94 ));
95 let session_sender_send = Arc::clone(self.session_sender.as_ref().unwrap());
96 let session_sender_recv = Arc::clone(self.session_sender.as_ref().unwrap());
97 let send_task = spawn(async move {
98 let _ = session_sender_send.send_it(number_of_test_packets).await;
99 info!("Sent all test packets");
100 });
101 let recv_task = spawn(async move {
102 let _ = session_sender_recv
103 .recv(number_of_test_packets, reflected_pkts_vec_cloned)
104 .await;
105 info!("Got back all test packets");
106 });
107 send_task.await.unwrap();
109
110 select! {
111 _ = sleep(Duration::from_secs(stop_session_sleep)) => (),
114 _ = recv_task => ()
117 }
118 twamp_test_complete_tx.send(()).unwrap();
120 });
121 try_join!(control_client_handle, session_sender_handle).unwrap();
122 debug!("Control-Client & Session-Sender tasks completed.");
123 let acquired_vec = reflected_pkts_vec.lock().await;
124 debug!("Reflected pkts len: {}", acquired_vec.len());
125 get_metrics(&acquired_vec, number_of_test_packets as f64);
126 Ok(())
127 }
128}
129
130fn get_metrics(pkts: &Vec<(TwampTestPacketUnauthReflected, TimeStamp)>, total_sent: f64) {
131 info!("Producing metrics");
132 let received = pkts.len() as f64;
133 let total_packets_lost = total_sent - received;
134 let total_packets_sent = total_sent;
135 let packet_loss = (total_packets_lost / total_packets_sent) * 100.0;
136 info!("Packet loss: {}%", packet_loss.trunc());
137
138 let mut rtt_pkts: Vec<f64> = vec![];
140 let mut sender_to_reflector: Vec<f64> = vec![];
141 let mut reflector_to_sender: Vec<f64> = vec![];
142 for pkt in pkts {
143 let t1: f64 = pkt.0.sender_timestamp.into();
144 let t2: f64 = pkt.0.receive_timestamp.into();
145 let t3: f64 = pkt.0.timestamp.into();
146 let t4: f64 = pkt.1.into();
147
148 let rtt = (t4 - t1) - (t3 - t2);
149 let one_way_delay_sent = t2 - t1;
150 let one_way_delay_recv = t4 - t3;
151 rtt_pkts.push(rtt);
152 sender_to_reflector.push(one_way_delay_sent);
153 reflector_to_sender.push(one_way_delay_recv);
154 }
155 let rtt_avg = rtt_pkts.iter().sum::<f64>() / received;
156 let sender_to_reflector_avg = sender_to_reflector.iter().sum::<f64>() / received;
157 let reflector_to_sender_avg = reflector_to_sender.iter().sum::<f64>() / received;
158 let rtt_min = rtt_pkts.iter().copied().fold(f64::INFINITY, f64::min);
159 let rtt_max = rtt_pkts.iter().copied().fold(f64::NEG_INFINITY, f64::max);
160
161 info!("RTT (MIN): {:.2}ms", (rtt_min * 1e3));
162 info!("RTT (MAX): {:.2}ms", (rtt_max * 1e3));
163 info!("RTT (AVG): {:.2}ms", (rtt_avg * 1e3));
164 info!(
165 "OWD (Sender -> Reflector) (AVG): {:.2}ms",
166 (sender_to_reflector_avg * 1e3)
167 );
168 info!(
169 "OWD (Reflector -> Sender) (AVG): {:.2}ms",
170 (reflector_to_sender_avg * 1e3)
171 );
172
173 let mut jitter = 0.0;
174 for i in 1..rtt_pkts.len() {
175 let rtt_diff = (rtt_pkts[i] - rtt_pkts[i - 1]).abs();
176 jitter = jitter + (rtt_diff - jitter) / 16.0;
177 }
178
179 info!("Jitter: {:.2}ms", jitter * 1e3)
180}