twamp_rs/control_client/
mod.rs1use crate::twamp_control::accept::Accept;
2use crate::twamp_control::accept_session::AcceptSession;
3use crate::twamp_control::request_tw_session::RequestTwSession;
4use crate::twamp_control::security_mode::Mode;
5use crate::twamp_control::server_greeting::ServerGreeting;
6use crate::twamp_control::server_start::ServerStart;
7use crate::twamp_control::set_up_response::SetUpResponse;
8use crate::twamp_control::start_ack::StartAck;
9use crate::twamp_control::start_sessions::StartSessions;
10use crate::twamp_control::stop_sessions::StopSessions;
11use anyhow::{Result, anyhow};
12use deku::prelude::*;
13use std::mem::size_of;
14use std::net::IpAddr;
15use tokio::io::{AsyncReadExt, AsyncWriteExt};
16use tokio::net::TcpStream;
17use tokio::sync::oneshot;
18use tracing::*;
19
/// Control-Client side of a TWAMP-Control connection.
///
/// The client starts without a connection (`ControlClient::new` sets `stream` to `None`);
/// `do_twamp_control` stores the TCP stream it is given and then performs every
/// read/write of the control exchange on it.
#[derive(Debug)]
pub struct ControlClient {
    /// TCP stream carrying the TWAMP-Control messages, or `None` while no stream has
    /// been stored yet.
    pub stream: Option<TcpStream>,
}
32
33impl ControlClient {
34 pub fn new() -> Self {
35 Self { stream: None }
36 }
37 pub async fn do_twamp_control(
40 &mut self,
41 twamp_control: TcpStream,
42 start_session_tx: oneshot::Sender<()>,
43 reflector_port_tx: oneshot::Sender<u16>,
44 responder_reflect_port: u16,
45 controller_port: u16,
46 reflector_timeout: u64,
47 twamp_test_complete_rx: oneshot::Receiver<()>,
48 ) -> Result<()> {
49 self.stream = Some(twamp_control);
50 self.read_server_greeting().await?;
51 self.send_set_up_response().await?;
52 self.read_server_start().await?;
53 self.send_request_tw_session(responder_reflect_port, controller_port, reflector_timeout)
54 .await?;
55 let accept_session = self.read_accept_session().await?;
56 if accept_session.accept != Accept::Ok {
57 return Err(anyhow!("Did not receive Ok in Accept-Session"));
58 };
59
60 debug!("Responder provided port: {}", accept_session.port);
61 reflector_port_tx.send(accept_session.port).unwrap();
62 self.send_start_sessions().await?;
63 let start_ack = self.read_start_ack().await?;
64 if start_ack.accept != Accept::Ok {
65 return Err(anyhow!("Start-Ack should be zero"));
66 }
67 start_session_tx.send(()).unwrap();
68 debug!(
70 "Waiting for Session-Sender to complete, Control-Client will then send Stop-Sessions."
71 );
72 let _ = twamp_test_complete_rx.await;
73 debug!("Received confirmation that TWAMP-Test is complete. Sending Stop-Sessions");
74 self.send_stop_sessions().await?;
75 Ok(())
76 }
77
78 pub async fn read_server_greeting(&mut self) -> Result<ServerGreeting> {
81 let mut buf = [0; size_of::<ServerGreeting>()];
82 info!("Reading ServerGreeting");
83 self.stream.as_mut().unwrap().read_exact(&mut buf).await?;
84 let (_rest, server_greeting) = ServerGreeting::from_bytes((&buf, 0)).unwrap();
85 debug!("Server greeting: {:?}", server_greeting);
86 info!("Done reading ServerGreeting");
87 Ok(server_greeting)
88 }
89
90 pub async fn send_set_up_response(&mut self) -> Result<()> {
92 info!("Preparing to send Set-Up-Response");
93 let set_up_response = SetUpResponse::new(Mode::Unauthenticated);
94 debug!("Set-Up-Response: {:?}", set_up_response);
95 let encoded = set_up_response.unwrap().to_bytes().unwrap();
96 self.stream
97 .as_mut()
98 .unwrap()
99 .write_all(&encoded[..])
100 .await?;
101 info!("Set-Up-Response sent");
102 Ok(())
103 }
104
105 pub async fn read_server_start(&mut self) -> Result<ServerStart> {
108 let mut buf = [0; size_of::<ServerStart>()];
109 info!("Reading Server-Start");
110 self.stream.as_mut().unwrap().read_exact(&mut buf).await?;
111 let (_rest, server_start) = ServerStart::from_bytes((&buf, 0)).unwrap();
112 debug!("Server-Start: {:?}", server_start);
113 info!("Done reading Server-Start");
114 Ok(server_start)
115 }
116
117 pub async fn send_request_tw_session(
119 &mut self,
120 session_reflector_port: u16,
121 controller_port: u16,
122 timeout: u64,
123 ) -> Result<RequestTwSession> {
124 info!("Preparing to send Request-TW-Session");
125 let stream = self.stream.as_ref().unwrap();
126 let sender_address = match stream.local_addr().unwrap().ip() {
127 IpAddr::V4(ip) => ip,
128 IpAddr::V6(ip) => panic!("da hail did v6 come from: {ip}"),
129 };
130 let receiver_address = match stream.peer_addr().unwrap().ip() {
131 IpAddr::V4(ip) => ip,
132 IpAddr::V6(ip) => panic!("da hail did v6 come from: {ip}"),
133 };
134 debug!(
135 "Request-TW-Session reflector port: {}",
136 session_reflector_port
137 );
138 let request_tw_session = RequestTwSession::new(
139 sender_address,
140 controller_port,
141 receiver_address,
142 session_reflector_port,
143 None,
144 timeout,
145 );
146 debug!("request-tw-session: {:?}", request_tw_session);
147 let encoded = request_tw_session.to_bytes().unwrap();
148 self.stream
149 .as_mut()
150 .unwrap()
151 .write_all(&encoded[..])
152 .await?;
153 info!("Request-TW-Session sent");
154 Ok(request_tw_session)
155 }
156
157 pub async fn read_accept_session(&mut self) -> Result<AcceptSession> {
160 let mut buf = [0; size_of::<AcceptSession>()];
161 info!("Reading Accept-Session");
162 self.stream.as_mut().unwrap().read_exact(&mut buf).await?;
163 let (_rest, accept_session) = AcceptSession::from_bytes((&buf, 0)).unwrap();
164 debug!("Accept-Session: {:?}", accept_session);
165 info!("Read Accept-Session");
166
167 Ok(accept_session)
168 }
169
170 pub async fn send_start_sessions(&mut self) -> Result<()> {
172 info!("Preparing to send Start-Sessions");
173 let start_sessions = StartSessions::new();
174 debug!("Start-Sessions: {:?}", start_sessions);
175 let encoded = start_sessions.to_bytes().unwrap();
176 self.stream
177 .as_mut()
178 .unwrap()
179 .write_all(&encoded[..])
180 .await?;
181 info!("Start-Sessions sent");
182 Ok(())
183 }
184
185 pub async fn read_start_ack(&mut self) -> Result<StartAck> {
188 let mut buf = [0; size_of::<StartAck>()];
189 info!("Reading Start-Ack");
190 self.stream.as_mut().unwrap().read_exact(&mut buf).await?;
191 let (_rest, start_ack) = StartAck::from_bytes((&buf, 0)).unwrap();
192 debug!("Start-Ack: {:?}", start_ack);
193 info!("Done reading Start-Ack");
194 Ok(start_ack)
195 }
196
197 pub async fn send_stop_sessions(&mut self) -> Result<()> {
199 info!("Preparing to send Stop-Sessions");
200 let stop_sessions = StopSessions::new(Accept::Ok);
201 debug!("Stop-Sessions: {:?}", stop_sessions);
202 let encoded = stop_sessions.to_bytes().unwrap();
203 self.stream
204 .as_mut()
205 .unwrap()
206 .write_all(&encoded[..])
207 .await?;
208 info!("Stop-Sessions sent");
209 Ok(())
210 }
211}
212
213impl Default for ControlClient {
214 fn default() -> Self {
216 ControlClient { stream: None }
217 }
218}