twamp_rs/control_client/
mod.rs

1use crate::twamp_control::accept::Accept;
2use crate::twamp_control::accept_session::AcceptSession;
3use crate::twamp_control::request_tw_session::RequestTwSession;
4use crate::twamp_control::security_mode::Mode;
5use crate::twamp_control::server_greeting::ServerGreeting;
6use crate::twamp_control::server_start::ServerStart;
7use crate::twamp_control::set_up_response::SetUpResponse;
8use crate::twamp_control::start_ack::StartAck;
9use crate::twamp_control::start_sessions::StartSessions;
10use crate::twamp_control::stop_sessions::StopSessions;
11use anyhow::{Result, anyhow};
12use deku::prelude::*;
13use std::mem::size_of;
14use std::net::IpAddr;
15use tokio::io::{AsyncReadExt, AsyncWriteExt};
16use tokio::net::TcpStream;
17use tokio::sync::oneshot;
18use tracing::*;
19
20/// Control-Client is responsible for initiating and handling TWAMP-Control with a Server.
21///
22/// Responsibilites of Control-Client on TWAMP-Control are:
23/// -   [Read Server Greeting](Self::read_server_greeting)
24/// -   [Send Set-Up-Response](Self::send_set_up_response)
25/// -   [Read Server-Start](Self::read_server_start)
26/// -   [Send Request-TW-Session](Self::send_request_tw_session)
27#[derive(Debug)]
28pub struct ControlClient {
29    /// TCP stream on which TWAMP-Control is being used.
30    pub stream: Option<TcpStream>,
31}
32
33impl ControlClient {
34    pub fn new() -> Self {
35        Self { stream: None }
36    }
37    /// Initiates TCP connection and starts the [TWAMP-Control](twamp_control) protocol with
38    /// Server, handling communication until the test ends or connection is killed/stopped.
39    pub async fn do_twamp_control(
40        &mut self,
41        twamp_control: TcpStream,
42        start_session_tx: oneshot::Sender<()>,
43        reflector_port_tx: oneshot::Sender<u16>,
44        responder_reflect_port: u16,
45        controller_port: u16,
46        reflector_timeout: u64,
47        twamp_test_complete_rx: oneshot::Receiver<()>,
48    ) -> Result<()> {
49        self.stream = Some(twamp_control);
50        self.read_server_greeting().await?;
51        self.send_set_up_response().await?;
52        self.read_server_start().await?;
53        self.send_request_tw_session(responder_reflect_port, controller_port, reflector_timeout)
54            .await?;
55        let accept_session = self.read_accept_session().await?;
56        if accept_session.accept != Accept::Ok {
57            return Err(anyhow!("Did not receive Ok in Accept-Session"));
58        };
59
60        debug!("Responder provided port: {}", accept_session.port);
61        reflector_port_tx.send(accept_session.port).unwrap();
62        self.send_start_sessions().await?;
63        let start_ack = self.read_start_ack().await?;
64        if start_ack.accept != Accept::Ok {
65            return Err(anyhow!("Start-Ack should be zero"));
66        }
67        start_session_tx.send(()).unwrap();
68        // testing
69        debug!(
70            "Waiting for Session-Sender to complete, Control-Client will then send Stop-Sessions."
71        );
72        let _ = twamp_test_complete_rx.await;
73        debug!("Received confirmation that TWAMP-Test is complete. Sending Stop-Sessions");
74        self.send_stop_sessions().await?;
75        Ok(())
76    }
77
78    /// Reads from TWAMP-Control stream assuming the bytes to be received will be of a
79    /// `ServerGreeting`. Converts those bytes into a `ServerGreeting` struct and returns it.
80    pub async fn read_server_greeting(&mut self) -> Result<ServerGreeting> {
81        let mut buf = [0; size_of::<ServerGreeting>()];
82        info!("Reading ServerGreeting");
83        self.stream.as_mut().unwrap().read_exact(&mut buf).await?;
84        let (_rest, server_greeting) = ServerGreeting::from_bytes((&buf, 0)).unwrap();
85        debug!("Server greeting: {:?}", server_greeting);
86        info!("Done reading ServerGreeting");
87        Ok(server_greeting)
88    }
89
90    /// Creates a `SetUpResponse`, converts to bytes and sends it out on `TWAMP-Control`.
91    pub async fn send_set_up_response(&mut self) -> Result<()> {
92        info!("Preparing to send Set-Up-Response");
93        let set_up_response = SetUpResponse::new(Mode::Unauthenticated);
94        debug!("Set-Up-Response: {:?}", set_up_response);
95        let encoded = set_up_response.unwrap().to_bytes().unwrap();
96        self.stream
97            .as_mut()
98            .unwrap()
99            .write_all(&encoded[..])
100            .await?;
101        info!("Set-Up-Response sent");
102        Ok(())
103    }
104
105    /// Reads from `TWAMP-Control` stream assuming the bytes to be received will be of a
106    /// `ServerStart`. Converts those bytes into a `ServerStart` struct and returns it.
107    pub async fn read_server_start(&mut self) -> Result<ServerStart> {
108        let mut buf = [0; size_of::<ServerStart>()];
109        info!("Reading Server-Start");
110        self.stream.as_mut().unwrap().read_exact(&mut buf).await?;
111        let (_rest, server_start) = ServerStart::from_bytes((&buf, 0)).unwrap();
112        debug!("Server-Start: {:?}", server_start);
113        info!("Done reading Server-Start");
114        Ok(server_start)
115    }
116
117    /// Creates a `Request-Tw-Session`, converts to bytes and sends it out on `TWAMP-Control`.
118    pub async fn send_request_tw_session(
119        &mut self,
120        session_reflector_port: u16,
121        controller_port: u16,
122        timeout: u64,
123    ) -> Result<RequestTwSession> {
124        info!("Preparing to send Request-TW-Session");
125        let stream = self.stream.as_ref().unwrap();
126        let sender_address = match stream.local_addr().unwrap().ip() {
127            IpAddr::V4(ip) => ip,
128            IpAddr::V6(ip) => panic!("da hail did v6 come from: {ip}"),
129        };
130        let receiver_address = match stream.peer_addr().unwrap().ip() {
131            IpAddr::V4(ip) => ip,
132            IpAddr::V6(ip) => panic!("da hail did v6 come from: {ip}"),
133        };
134        debug!(
135            "Request-TW-Session reflector port: {}",
136            session_reflector_port
137        );
138        let request_tw_session = RequestTwSession::new(
139            sender_address,
140            controller_port,
141            receiver_address,
142            session_reflector_port,
143            None,
144            timeout,
145        );
146        debug!("request-tw-session: {:?}", request_tw_session);
147        let encoded = request_tw_session.to_bytes().unwrap();
148        self.stream
149            .as_mut()
150            .unwrap()
151            .write_all(&encoded[..])
152            .await?;
153        info!("Request-TW-Session sent");
154        Ok(request_tw_session)
155    }
156
157    /// Reads from `TWAMP-Control` stream assuming the bytes to be received will be of a
158    /// `AcceptSession`. Converts those bytes into a `AcceptSession` struct and returns it.
159    pub async fn read_accept_session(&mut self) -> Result<AcceptSession> {
160        let mut buf = [0; size_of::<AcceptSession>()];
161        info!("Reading Accept-Session");
162        self.stream.as_mut().unwrap().read_exact(&mut buf).await?;
163        let (_rest, accept_session) = AcceptSession::from_bytes((&buf, 0)).unwrap();
164        debug!("Accept-Session: {:?}", accept_session);
165        info!("Read Accept-Session");
166
167        Ok(accept_session)
168    }
169
170    /// Creates a `Start-Sessions`, converts to bytes and sends it out on `TWAMP-Control`.
171    pub async fn send_start_sessions(&mut self) -> Result<()> {
172        info!("Preparing to send Start-Sessions");
173        let start_sessions = StartSessions::new();
174        debug!("Start-Sessions: {:?}", start_sessions);
175        let encoded = start_sessions.to_bytes().unwrap();
176        self.stream
177            .as_mut()
178            .unwrap()
179            .write_all(&encoded[..])
180            .await?;
181        info!("Start-Sessions sent");
182        Ok(())
183    }
184
185    /// Reads from `TWAMP-Control` stream assuming the bytes to be received will be of a
186    /// `Start-Ack`. Converts those bytes into a `Start-Ack` struct and returns it.
187    pub async fn read_start_ack(&mut self) -> Result<StartAck> {
188        let mut buf = [0; size_of::<StartAck>()];
189        info!("Reading Start-Ack");
190        self.stream.as_mut().unwrap().read_exact(&mut buf).await?;
191        let (_rest, start_ack) = StartAck::from_bytes((&buf, 0)).unwrap();
192        debug!("Start-Ack: {:?}", start_ack);
193        info!("Done reading Start-Ack");
194        Ok(start_ack)
195    }
196
197    /// Creates a `Stop-Sessions`, converts to bytes and sends it out on `TWAMP-Control`.
198    pub async fn send_stop_sessions(&mut self) -> Result<()> {
199        info!("Preparing to send Stop-Sessions");
200        let stop_sessions = StopSessions::new(Accept::Ok);
201        debug!("Stop-Sessions: {:?}", stop_sessions);
202        let encoded = stop_sessions.to_bytes().unwrap();
203        self.stream
204            .as_mut()
205            .unwrap()
206            .write_all(&encoded[..])
207            .await?;
208        info!("Stop-Sessions sent");
209        Ok(())
210    }
211}
212
213impl Default for ControlClient {
214    /// Construct an empty `ControlClient` with no context.
215    fn default() -> Self {
216        ControlClient { stream: None }
217    }
218}