pub struct ControlClient {
pub stream: Option<TcpStream>,
}Expand description
Control-Client is responsible for initiating and handling TWAMP-Control with a Server.
Responsibilites of Control-Client on TWAMP-Control are:
Fields§
§stream: Option<TcpStream>TCP stream on which TWAMP-Control is being used.
Implementations§
Source§impl ControlClient
impl ControlClient
pub fn new() -> Self
Sourcepub async fn do_twamp_control(
&mut self,
twamp_control: TcpStream,
start_session_tx: Sender<()>,
reflector_port_tx: Sender<u16>,
responder_reflect_port: u16,
controller_port: u16,
reflector_timeout: u64,
twamp_test_complete_rx: Receiver<()>,
) -> Result<()>
pub async fn do_twamp_control( &mut self, twamp_control: TcpStream, start_session_tx: Sender<()>, reflector_port_tx: Sender<u16>, responder_reflect_port: u16, controller_port: u16, reflector_timeout: u64, twamp_test_complete_rx: Receiver<()>, ) -> Result<()>
Initiates TCP connection and starts the TWAMP-Control protocol with Server, handling communication until the test ends or connection is killed/stopped.
Examples found in repository?
40 pub async fn do_twamp(
41 mut self,
42 responder_addr: Ipv4Addr,
43 responder_port: u16,
44 controller_addr: Ipv4Addr,
45 mut controller_port: u16,
46 responder_reflect_port: u16,
47 number_of_test_packets: u32,
48 reflector_timeout: u64,
49 stop_session_sleep: u64,
50 ) -> Result<()> {
51 let twamp_control =
52 TcpStream::connect(SocketAddrV4::new(responder_addr, responder_port)).await?;
53 let udp_socket =
54 UdpSocket::bind(SocketAddrV4::new(controller_addr, controller_port)).await?;
55 controller_port = udp_socket.local_addr().unwrap().port();
56
57 let (start_session_tx, start_session_rx) = oneshot::channel::<()>();
58 let (twamp_test_complete_tx, twamp_test_complete_rx) = oneshot::channel::<()>();
59 let (reflector_port_tx, reflector_port_rx) = oneshot::channel::<u16>();
60 let control_client_handle = spawn(async move {
61 self.control_client
62 .do_twamp_control(
63 twamp_control,
64 start_session_tx,
65 reflector_port_tx,
66 responder_reflect_port,
67 controller_port,
68 reflector_timeout,
69 twamp_test_complete_rx,
70 )
71 .await
72 .unwrap();
73 });
74 let reflected_pkts_vec: Arc<Mutex<Vec<(TwampTestPacketUnauthReflected, TimeStamp)>>> =
75 Arc::new(Mutex::new(Vec::new()));
76 let reflected_pkts_vec_cloned = Arc::clone(&reflected_pkts_vec);
77 let session_sender_handle = spawn(async move {
78 // Wait until we get the Accept-Session's port.
79 let final_port = reflector_port_rx.await.unwrap();
80 debug!("Received reflector port: {}", final_port);
81 udp_socket
82 .connect(SocketAddrV4::new(responder_addr, final_port))
83 .await
84 .unwrap();
85 // Wait until start-sessions is received
86 start_session_rx.await.unwrap();
87 debug!("Start-Session identified. Start Session-Sender.");
88 self.session_sender = Some(Arc::new(
89 SessionSender::new(
90 Arc::new(udp_socket),
91 SocketAddrV4::new(responder_addr, final_port),
92 )
93 .await,
94 ));
95 let session_sender_send = Arc::clone(self.session_sender.as_ref().unwrap());
96 let session_sender_recv = Arc::clone(self.session_sender.as_ref().unwrap());
97 let send_task = spawn(async move {
98 let _ = session_sender_send.send_it(number_of_test_packets).await;
99 info!("Sent all test packets");
100 });
101 let recv_task = spawn(async move {
102 let _ = session_sender_recv
103 .recv(number_of_test_packets, reflected_pkts_vec_cloned)
104 .await;
105 info!("Got back all test packets");
106 });
107 // wait for all test pkts to be sent.
108 send_task.await.unwrap();
109
110 select! {
111 // If stop-session-sleep duration finishes before all pkts are received, drop
112 // recv task and conclude.
113 _ = sleep(Duration::from_secs(stop_session_sleep)) => (),
114 // Ignore stop-session-sleep duration if session-sender got all test pkts before
115 // duration.
116 _ = recv_task => ()
117 }
118 // Inform Control-Client to send Stop-Sessions
119 twamp_test_complete_tx.send(()).unwrap();
120 });
121 try_join!(control_client_handle, session_sender_handle).unwrap();
122 debug!("Control-Client & Session-Sender tasks completed.");
123 let acquired_vec = reflected_pkts_vec.lock().await;
124 debug!("Reflected pkts len: {}", acquired_vec.len());
125 get_metrics(&acquired_vec, number_of_test_packets as f64);
126 Ok(())
127 }Sourcepub async fn read_server_greeting(&mut self) -> Result<ServerGreeting>
pub async fn read_server_greeting(&mut self) -> Result<ServerGreeting>
Reads from TWAMP-Control stream assuming the bytes to be received will be of a
ServerGreeting. Converts those bytes into a ServerGreeting struct and returns it.
Sourcepub async fn send_set_up_response(&mut self) -> Result<()>
pub async fn send_set_up_response(&mut self) -> Result<()>
Creates a SetUpResponse, converts to bytes and sends it out on TWAMP-Control.
Sourcepub async fn read_server_start(&mut self) -> Result<ServerStart>
pub async fn read_server_start(&mut self) -> Result<ServerStart>
Reads from TWAMP-Control stream assuming the bytes to be received will be of a
ServerStart. Converts those bytes into a ServerStart struct and returns it.
Sourcepub async fn send_request_tw_session(
&mut self,
session_reflector_port: u16,
controller_port: u16,
timeout: u64,
) -> Result<RequestTwSession>
pub async fn send_request_tw_session( &mut self, session_reflector_port: u16, controller_port: u16, timeout: u64, ) -> Result<RequestTwSession>
Creates a Request-Tw-Session, converts to bytes and sends it out on TWAMP-Control.
Sourcepub async fn read_accept_session(&mut self) -> Result<AcceptSession>
pub async fn read_accept_session(&mut self) -> Result<AcceptSession>
Reads from TWAMP-Control stream assuming the bytes to be received will be of a
AcceptSession. Converts those bytes into a AcceptSession struct and returns it.
Sourcepub async fn send_start_sessions(&mut self) -> Result<()>
pub async fn send_start_sessions(&mut self) -> Result<()>
Creates a Start-Sessions, converts to bytes and sends it out on TWAMP-Control.
Sourcepub async fn read_start_ack(&mut self) -> Result<StartAck>
pub async fn read_start_ack(&mut self) -> Result<StartAck>
Reads from TWAMP-Control stream assuming the bytes to be received will be of a
Start-Ack. Converts those bytes into a Start-Ack struct and returns it.
Sourcepub async fn send_stop_sessions(&mut self) -> Result<()>
pub async fn send_stop_sessions(&mut self) -> Result<()>
Creates a Stop-Sessions, converts to bytes and sends it out on TWAMP-Control.
Trait Implementations§
Source§impl Debug for ControlClient
impl Debug for ControlClient
Auto Trait Implementations§
impl !Freeze for ControlClient
impl RefUnwindSafe for ControlClient
impl Send for ControlClient
impl Sync for ControlClient
impl Unpin for ControlClient
impl UnwindSafe for ControlClient
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> FmtForward for T
impl<T> FmtForward for T
Source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self to use its Binary implementation when Debug-formatted.Source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self to use its Display implementation when
Debug-formatted.Source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self to use its LowerExp implementation when
Debug-formatted.Source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self to use its LowerHex implementation when
Debug-formatted.Source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self to use its Octal implementation when Debug-formatted.Source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self to use its Pointer implementation when
Debug-formatted.Source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self to use its UpperExp implementation when
Debug-formatted.Source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self to use its UpperHex implementation when
Debug-formatted.Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
Source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
Source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read moreSource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read moreSource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
Source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
Source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self, then passes self.as_ref() into the pipe function.Source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self, then passes self.as_mut() into the pipe
function.Source§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self, then passes self.deref() into the pipe function.Source§impl<T> Tap for T
impl<T> Tap for T
Source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B> of a value. Read moreSource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B> of a value. Read moreSource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R> view of a value. Read moreSource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R> view of a value. Read moreSource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target of a value. Read moreSource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target of a value. Read moreSource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap() only in debug builds, and is erased in release builds.Source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut() only in debug builds, and is erased in release
builds.Source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref() only in debug builds, and is erased in release
builds.Source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut() only in debug builds, and is erased in release
builds.Source§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref() only in debug builds, and is erased in release
builds.