controller/
controller.rs

1use core::f64;
2use std::{
3    net::{Ipv4Addr, SocketAddrV4},
4    sync::Arc,
5    time::Duration,
6};
7
8use anyhow::Result;
9use tokio::{
10    net::{TcpStream, UdpSocket},
11    select, spawn,
12    sync::{Mutex, oneshot},
13    time::sleep,
14    try_join,
15};
16use tracing::*;
17use twamp_rs::control_client::ControlClient;
18use twamp_rs::session_sender::SessionSender;
19use twamp_rs::timestamp::timestamp::TimeStamp;
20use twamp_rs::twamp_test::twamp_test_unauth_reflected::TwampTestPacketUnauthReflected;
21
22#[derive(Debug, Default)]
23pub struct Controller {
24    control_client: ControlClient,
25    session_sender: Option<Arc<SessionSender>>,
26}
27
28impl Controller {
29    pub fn new() -> Self {
30        Controller {
31            control_client: ControlClient::default(),
32            session_sender: None,
33        }
34    }
35
36    /// Informs `Control-Client` to establish TCP connection with provided
37    /// `server_addr` and negotiate a TWAMP session. The `Controller` does
38    /// not walk `Control-Client` through the TWAMP-Control communication.
39    /// That is up to `Control-Client` to handle.
40    pub async fn do_twamp(
41        mut self,
42        responder_addr: Ipv4Addr,
43        responder_port: u16,
44        controller_addr: Ipv4Addr,
45        mut controller_port: u16,
46        responder_reflect_port: u16,
47        number_of_test_packets: u32,
48        reflector_timeout: u64,
49        stop_session_sleep: u64,
50    ) -> Result<()> {
51        let twamp_control =
52            TcpStream::connect(SocketAddrV4::new(responder_addr, responder_port)).await?;
53        let udp_socket =
54            UdpSocket::bind(SocketAddrV4::new(controller_addr, controller_port)).await?;
55        controller_port = udp_socket.local_addr().unwrap().port();
56
57        let (start_session_tx, start_session_rx) = oneshot::channel::<()>();
58        let (twamp_test_complete_tx, twamp_test_complete_rx) = oneshot::channel::<()>();
59        let (reflector_port_tx, reflector_port_rx) = oneshot::channel::<u16>();
60        let control_client_handle = spawn(async move {
61            self.control_client
62                .do_twamp_control(
63                    twamp_control,
64                    start_session_tx,
65                    reflector_port_tx,
66                    responder_reflect_port,
67                    controller_port,
68                    reflector_timeout,
69                    twamp_test_complete_rx,
70                )
71                .await
72                .unwrap();
73        });
74        let reflected_pkts_vec: Arc<Mutex<Vec<(TwampTestPacketUnauthReflected, TimeStamp)>>> =
75            Arc::new(Mutex::new(Vec::new()));
76        let reflected_pkts_vec_cloned = Arc::clone(&reflected_pkts_vec);
77        let session_sender_handle = spawn(async move {
78            // Wait until we get the Accept-Session's port.
79            let final_port = reflector_port_rx.await.unwrap();
80            debug!("Received reflector port: {}", final_port);
81            udp_socket
82                .connect(SocketAddrV4::new(responder_addr, final_port))
83                .await
84                .unwrap();
85            // Wait until start-sessions is received
86            start_session_rx.await.unwrap();
87            debug!("Start-Session identified. Start Session-Sender.");
88            self.session_sender = Some(Arc::new(
89                SessionSender::new(
90                    Arc::new(udp_socket),
91                    SocketAddrV4::new(responder_addr, final_port),
92                )
93                .await,
94            ));
95            let session_sender_send = Arc::clone(self.session_sender.as_ref().unwrap());
96            let session_sender_recv = Arc::clone(self.session_sender.as_ref().unwrap());
97            let send_task = spawn(async move {
98                let _ = session_sender_send.send_it(number_of_test_packets).await;
99                info!("Sent all test packets");
100            });
101            let recv_task = spawn(async move {
102                let _ = session_sender_recv
103                    .recv(number_of_test_packets, reflected_pkts_vec_cloned)
104                    .await;
105                info!("Got back all test packets");
106            });
107            // wait for all test pkts to be sent.
108            send_task.await.unwrap();
109
110            select! {
111                // If stop-session-sleep duration finishes before all pkts are received, drop
112                // recv task and conclude.
113                _ = sleep(Duration::from_secs(stop_session_sleep)) => (),
114                // Ignore stop-session-sleep duration if session-sender got all test pkts before
115                // duration.
116                _ = recv_task => ()
117            }
118            // Inform Control-Client to send Stop-Sessions
119            twamp_test_complete_tx.send(()).unwrap();
120        });
121        try_join!(control_client_handle, session_sender_handle).unwrap();
122        debug!("Control-Client & Session-Sender tasks completed.");
123        let acquired_vec = reflected_pkts_vec.lock().await;
124        debug!("Reflected pkts len: {}", acquired_vec.len());
125        get_metrics(&acquired_vec, number_of_test_packets as f64);
126        Ok(())
127    }
128}
129
130fn get_metrics(pkts: &Vec<(TwampTestPacketUnauthReflected, TimeStamp)>, total_sent: f64) {
131    info!("Producing metrics");
132    let received = pkts.len() as f64;
133    let total_packets_lost = total_sent - received;
134    let total_packets_sent = total_sent;
135    let packet_loss = (total_packets_lost / total_packets_sent) * 100.0;
136    info!("Packet loss: {}%", packet_loss.trunc());
137
138    // RTT
139    let mut rtt_pkts: Vec<f64> = vec![];
140    let mut sender_to_reflector: Vec<f64> = vec![];
141    let mut reflector_to_sender: Vec<f64> = vec![];
142    for pkt in pkts {
143        let t1: f64 = pkt.0.sender_timestamp.into();
144        let t2: f64 = pkt.0.receive_timestamp.into();
145        let t3: f64 = pkt.0.timestamp.into();
146        let t4: f64 = pkt.1.into();
147
148        let rtt = (t4 - t1) - (t3 - t2);
149        let one_way_delay_sent = t2 - t1;
150        let one_way_delay_recv = t4 - t3;
151        rtt_pkts.push(rtt);
152        sender_to_reflector.push(one_way_delay_sent);
153        reflector_to_sender.push(one_way_delay_recv);
154    }
155    let rtt_avg = rtt_pkts.iter().sum::<f64>() / received;
156    let sender_to_reflector_avg = sender_to_reflector.iter().sum::<f64>() / received;
157    let reflector_to_sender_avg = reflector_to_sender.iter().sum::<f64>() / received;
158    let rtt_min = rtt_pkts.iter().copied().fold(f64::INFINITY, f64::min);
159    let rtt_max = rtt_pkts.iter().copied().fold(f64::NEG_INFINITY, f64::max);
160
161    info!("RTT (MIN): {:.2}ms", (rtt_min * 1e3));
162    info!("RTT (MAX): {:.2}ms", (rtt_max * 1e3));
163    info!("RTT (AVG): {:.2}ms", (rtt_avg * 1e3));
164    info!(
165        "OWD (Sender -> Reflector) (AVG): {:.2}ms",
166        (sender_to_reflector_avg * 1e3)
167    );
168    info!(
169        "OWD (Reflector -> Sender) (AVG): {:.2}ms",
170        (reflector_to_sender_avg * 1e3)
171    );
172
173    let mut jitter = 0.0;
174    for i in 1..rtt_pkts.len() {
175        let rtt_diff = (rtt_pkts[i] - rtt_pkts[i - 1]).abs();
176        jitter = jitter + (rtt_diff - jitter) / 16.0;
177    }
178
179    info!("Jitter: {:.2}ms", jitter * 1e3)
180}