1use std::{
2 thread::{self, JoinHandle},
3 time::{Duration, Instant},
4};
5
6use crate::{
7 config::{ClientConfig, RecvBudget},
8 error::ClientError,
9 event::{ClientEvent, OpenOutcome},
10 timing::ClientTimestamp,
11 Client,
12};
13
14use super::{
15 cancellation::CancellationToken,
16 hub::{EventHub, EventSubscription, SubscriberConfig},
17};
18
/// Upper bound for the socket receive timeout of a managed client; `start_inner` installs it
/// whenever the configured timeout is unset or longer than this.
const MANAGED_RECV_TIMEOUT: Duration = Duration::from_millis(20);
/// Receive budget handed to every `recv_available` call made by the worker.
const MANAGED_RECV_BUDGET: RecvBudget = RecvBudget { max_packets: 64 };
/// Longest time the worker keeps draining late replies after a completed run, before closing.
const MANAGED_FINAL_DRAIN: Duration = Duration::from_millis(100);
/// Short fallback pause used by the worker loops when they have nothing better to wait for.
const IDLE_SLEEP: Duration = Duration::from_millis(1);
/// Cap on a single sleep while waiting for the next send deadline.
const MAX_SLEEP: Duration = Duration::from_millis(20);
24
/// Entry point for running a [`Client`] on a background worker thread.
///
/// The type carries no state; its associated functions connect, open the session and spawn the
/// worker, returning a [`ManagedClientSession`] handle.
#[derive(Debug)]
pub struct ManagedClient;
27
/// Handle to a running managed client.
///
/// Dropping the handle requests cancellation but does not wait for the worker; call
/// [`ManagedClientSession::join`] to wait for it and obtain the [`SessionOutcome`].
#[must_use = "dropping the session cancels the managed client; call join() to wait for completion"]
#[derive(Debug)]
pub struct ManagedClientSession {
    // Hub the worker publishes events to; also used to create subscriptions and to
    // disconnect subscribers once the worker has been joined.
    hub: EventHub,
    // Shared flag the worker checks at the top of every loop iteration.
    cancellation: CancellationToken,
    // `Option` so that `join` can move the handle out of a type that implements `Drop`.
    worker: Option<JoinHandle<Result<SessionOutcome, ClientError>>>,
}
41
/// Summary returned by the worker when a managed session ends.
#[must_use = "managed session outcomes contain completion status and counters"]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionOutcome {
    /// Why the worker stopped.
    pub end_reason: SessionEndReason,
    /// Value of `Client::packets_sent` read just before the session was closed.
    pub packets_sent: u64,
    /// Number of published `ClientEvent::EchoReply` events.
    pub replies_received: u64,
    /// Number of published `ClientEvent::DuplicateReply` events.
    pub duplicates: u64,
    /// Number of published `ClientEvent::LateReply` events.
    pub late: u64,
    /// Number of published `ClientEvent::Warning` events.
    pub warning_events: u64,
}
57
/// Reason a managed session ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionEndReason {
    /// The client reported the run as complete while the worker loop was running.
    TestComplete,
    /// Cancellation was requested (via `stop()` or by dropping the session) before completion.
    Cancelled,
    /// The client was already complete when the worker started, so no probes were exchanged.
    NoTestComplete,
}
64
65impl ManagedClient {
66 pub fn start(config: ClientConfig) -> Result<ManagedClientSession, ClientError> {
67 Self::start_inner(config, None).map(|(session, _)| session)
68 }
69
70 pub fn start_with_subscription(
71 config: ClientConfig,
72 subscriber_config: SubscriberConfig,
73 ) -> Result<(ManagedClientSession, EventSubscription), ClientError> {
74 let (session, subscription) = Self::start_inner(config, Some(subscriber_config))?;
75 Ok((
76 session,
77 subscription.expect("initial subscription must be present"),
78 ))
79 }
80
81 fn start_inner(
82 mut config: ClientConfig,
83 subscriber_config: Option<SubscriberConfig>,
84 ) -> Result<(ManagedClientSession, Option<EventSubscription>), ClientError> {
85 if config.socket_config.recv_timeout.is_none()
86 || config.socket_config.recv_timeout > Some(MANAGED_RECV_TIMEOUT)
87 {
88 config.socket_config.recv_timeout = Some(MANAGED_RECV_TIMEOUT);
89 }
90
91 let hub = EventHub::new();
92 let initial_subscription = subscriber_config
93 .map(|config| hub.subscribe(config))
94 .transpose()?;
95
96 let mut client = Client::connect(config)?;
97 let outcome = client.open(ClientTimestamp::now())?;
98 publish_open_outcome(&hub, &outcome);
99
100 let cancellation = CancellationToken::new();
101 let worker_hub = hub.clone();
102 let worker_cancellation = cancellation.clone();
103 let worker =
104 thread::spawn(move || run_client_with_cleanup(client, worker_hub, worker_cancellation));
105
106 Ok((
107 ManagedClientSession {
108 hub,
109 cancellation,
110 worker: Some(worker),
111 },
112 initial_subscription,
113 ))
114 }
115}
116
117impl ManagedClientSession {
118 pub fn subscribe(&self, config: SubscriberConfig) -> Result<EventSubscription, ClientError> {
119 self.hub.subscribe(config)
120 }
121
122 pub fn stop(&self) {
123 self.cancellation.cancel();
124 }
125
126 pub fn join(mut self) -> Result<SessionOutcome, ClientError> {
127 let worker = self
128 .worker
129 .take()
130 .expect("ManagedClientSession invariant violated: worker handle missing before join");
131 match worker.join() {
132 Ok(outcome) => {
133 self.hub.disconnect_all();
134 outcome
135 }
136 Err(_) => {
137 self.hub.disconnect_all();
138 Err(ClientError::WorkerPanicked)
139 }
140 }
141 }
142}
143
/// Dropping the session requests cancellation so the worker stops on its own; the worker
/// thread is not joined here.
impl Drop for ManagedClientSession {
    fn drop(&mut self) {
        self.cancellation.cancel();
    }
}
149
150fn publish_open_outcome(hub: &EventHub, outcome: &OpenOutcome) {
151 match outcome {
152 OpenOutcome::Started { event, .. } | OpenOutcome::NoTestCompleted { event, .. } => {
153 hub.publish(event.clone());
154 }
155 }
156}
157
158fn run_client(
159 mut client: Client,
160 hub: EventHub,
161 cancellation: CancellationToken,
162) -> Result<SessionOutcome, ClientError> {
163 if client.is_run_complete() {
164 return Ok(SessionOutcome {
165 end_reason: SessionEndReason::NoTestComplete,
166 packets_sent: 0,
167 replies_received: 0,
168 duplicates: 0,
169 late: 0,
170 warning_events: 0,
171 });
172 }
173
174 let mut counters = OutcomeCounters::default();
175 let mut cancelled = false;
176
177 loop {
178 if cancellation.is_cancelled() {
179 cancelled = true;
180 publish_events(
181 &hub,
182 &mut counters,
183 client.recv_available(MANAGED_RECV_BUDGET)?,
184 );
185 publish_events(
186 &hub,
187 &mut counters,
188 client.poll_timeouts(ClientTimestamp::now())?,
189 );
190 break;
191 }
192
193 let now = Instant::now();
194 if client
195 .next_send_deadline()
196 .is_some_and(|deadline| deadline <= now)
197 {
198 let events = client.send_probe()?;
199 publish_events(&hub, &mut counters, events);
200 continue;
201 }
202
203 publish_events(
204 &hub,
205 &mut counters,
206 client.recv_available(MANAGED_RECV_BUDGET)?,
207 );
208 publish_events(
209 &hub,
210 &mut counters,
211 client.poll_timeouts(ClientTimestamp::now())?,
212 );
213
214 if client.is_run_complete() {
215 break;
216 }
217
218 sleep_until_next_wakeup(client.next_send_deadline());
219 }
220
221 if !cancelled {
222 drain_final_late_replies(&mut client, &hub, &mut counters)?;
223 }
224
225 let packets_sent = client.packets_sent();
226 let close_events = client.close(ClientTimestamp::now())?;
227 publish_events(&hub, &mut counters, close_events);
228
229 Ok(SessionOutcome {
230 end_reason: if cancelled {
231 SessionEndReason::Cancelled
232 } else {
233 SessionEndReason::TestComplete
234 },
235 packets_sent,
236 replies_received: counters.replies_received,
237 duplicates: counters.duplicates,
238 late: counters.late,
239 warning_events: counters.warning_events,
240 })
241}
242
243fn run_client_with_cleanup(
244 client: Client,
245 hub: EventHub,
246 cancellation: CancellationToken,
247) -> Result<SessionOutcome, ClientError> {
248 let outcome = run_client(client, hub.clone(), cancellation);
249 hub.disconnect_all();
250 outcome
251}
252
253fn drain_final_late_replies(
254 client: &mut Client,
255 hub: &EventHub,
256 counters: &mut OutcomeCounters,
257) -> Result<(), ClientError> {
258 if !client.has_timed_out_metadata() {
259 return Ok(());
260 }
261
262 let deadline = Instant::now() + MANAGED_FINAL_DRAIN;
263 while Instant::now() < deadline && client.has_timed_out_metadata() {
264 let mut published = false;
265
266 let events = client.recv_available(MANAGED_RECV_BUDGET)?;
267 published |= !events.is_empty();
268 publish_events(hub, counters, events);
269
270 let events = client.poll_timeouts(ClientTimestamp::now())?;
271 published |= !events.is_empty();
272 publish_events(hub, counters, events);
273
274 if !published {
275 thread::sleep(IDLE_SLEEP);
276 }
277 }
278 Ok(())
279}
280
281fn publish_events(hub: &EventHub, counters: &mut OutcomeCounters, events: Vec<ClientEvent>) {
282 for event in events {
283 counters.observe(&event);
284 hub.publish(event);
285 }
286}
287
288fn sleep_until_next_wakeup(deadline: Option<Instant>) {
289 let sleep_for = deadline
290 .and_then(|deadline| deadline.checked_duration_since(Instant::now()))
291 .map(|duration| duration.min(MAX_SLEEP))
292 .unwrap_or(IDLE_SLEEP);
293 if sleep_for > Duration::ZERO {
294 thread::sleep(sleep_for);
295 }
296}
297
/// Running per-event-kind tallies kept by the worker; copied into [`SessionOutcome`] when the
/// session ends. All counters start at zero via `Default`.
#[derive(Debug, Default)]
struct OutcomeCounters {
    // Count of `ClientEvent::EchoReply` events observed.
    replies_received: u64,
    // Count of `ClientEvent::DuplicateReply` events observed.
    duplicates: u64,
    // Count of `ClientEvent::LateReply` events observed.
    late: u64,
    // Count of `ClientEvent::Warning` events observed.
    warning_events: u64,
}
305
306impl OutcomeCounters {
307 fn observe(&mut self, event: &ClientEvent) {
308 match event {
309 ClientEvent::EchoReply { .. } => self.replies_received += 1,
310 ClientEvent::DuplicateReply { .. } => self.duplicates += 1,
311 ClientEvent::LateReply { .. } => self.late += 1,
312 ClientEvent::Warning { .. } => self.warning_events += 1,
313 _ => {}
314 }
315 }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::NegotiationPolicy, SubscriberOverflow};
    use irtt_proto::{
        echo_packet_len, flags, flags::FLAG_OPEN, flags::FLAG_REPLY, layout::PacketLayout, Clock,
        Params, ReceivedStats, StampAt, TimestampFields, MAGIC, PROTOCOL_VERSION,
    };
    use std::{
        net::{SocketAddr, UdpSocket},
        sync::mpsc,
    };

    /// Connection token the fake servers hand out in their open reply.
    const TOKEN: u64 = 0x1234_5678_90ab_cdef;

    /// Loopback UDP server running on its own thread for the duration of one test.
    struct FakeServer {
        addr: SocketAddr,
        done: JoinHandle<()>,
    }

    impl FakeServer {
        /// Waits for the server thread and propagates any panic it raised.
        fn join(self) {
            self.done.join().unwrap();
        }
    }

    /// Builds negotiated parameters with the given duration (0 when `None`) and interval.
    fn test_params(duration: Option<Duration>, interval: Duration) -> Params {
        Params {
            protocol_version: PROTOCOL_VERSION,
            duration_ns: duration.map_or(0, test_duration_ns_i64),
            interval_ns: test_duration_ns_i64(interval),
            length: 0,
            received_stats: ReceivedStats::Both,
            stamp_at: StampAt::Both,
            clock: Clock::Both,
            dscp: 0,
            server_fill: None,
        }
    }

    fn test_duration_ns_i64(duration: Duration) -> i64 {
        i64::try_from(duration.as_nanos()).expect("test duration fits i64 nanoseconds")
    }

    /// Client configuration pointing at `addr` with short test-friendly timings.
    fn config(addr: SocketAddr, duration: Option<Duration>) -> ClientConfig {
        ClientConfig {
            server_addr: addr.to_string(),
            duration,
            interval: Duration::from_millis(10),
            negotiation_policy: NegotiationPolicy::Strict,
            open_timeouts: vec![Duration::from_millis(200)],
            probe_timeout: Duration::from_millis(40),
            ..ClientConfig::default()
        }
    }

    /// Server that accepts the open request and echoes every probe until it sees a close
    /// packet or its read timeout elapses.
    fn start_echo_server(params: Params) -> FakeServer {
        let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        let addr = socket.local_addr().unwrap();
        let done = thread::spawn(move || {
            let (_, peer) = recv_request(&socket);
            socket
                .send_to(&open_reply(FLAG_OPEN | FLAG_REPLY, TOKEN, &params), peer)
                .unwrap();
            socket
                .set_read_timeout(Some(Duration::from_millis(500)))
                .unwrap();

            while let Some((packet, peer)) = recv_request_timeout(&socket) {
                if packet[3] & flags::FLAG_CLOSE != 0 {
                    break;
                }

                let seq = u32::from_le_bytes(packet[12..16].try_into().unwrap());
                let ts = TimestampFields {
                    recv_wall: Some(1_000_000_000),
                    recv_mono: Some(100_000),
                    send_wall: Some(1_000_000_000),
                    send_mono: Some(100_000),
                    ..Default::default()
                };
                socket
                    .send_to(&echo_reply_packet(TOKEN, seq, &params, &ts), peer)
                    .unwrap();
            }
        });
        FakeServer { addr, done }
    }

    /// Server that answers only the first probe, after `delay`, then swallows packets until it
    /// sees a close packet or its read timeout elapses.
    fn start_delayed_reply_server(params: Params, delay: Duration) -> FakeServer {
        let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        let addr = socket.local_addr().unwrap();
        let done = thread::spawn(move || {
            let (_, peer) = recv_request(&socket);
            socket
                .send_to(&open_reply(FLAG_OPEN | FLAG_REPLY, TOKEN, &params), peer)
                .unwrap();
            socket
                .set_read_timeout(Some(Duration::from_millis(500)))
                .unwrap();

            let Some((packet, peer)) = recv_request_timeout(&socket) else {
                return;
            };
            let seq = u32::from_le_bytes(packet[12..16].try_into().unwrap());
            thread::sleep(delay);
            let ts = TimestampFields {
                recv_wall: Some(1_000_000_000),
                recv_mono: Some(100_000),
                send_wall: Some(1_001_000_000),
                send_mono: Some(1_100_000),
                ..Default::default()
            };
            socket
                .send_to(&echo_reply_packet(TOKEN, seq, &params, &ts), peer)
                .unwrap();

            while let Some((packet, _)) = recv_request_timeout(&socket) {
                if packet[3] & flags::FLAG_CLOSE != 0 {
                    break;
                }
            }
        });
        FakeServer { addr, done }
    }

    /// Receives one datagram, panicking on any socket error.
    fn recv_request(socket: &UdpSocket) -> (Vec<u8>, SocketAddr) {
        let mut buf = [0_u8; 2048];
        let (size, peer) = socket.recv_from(&mut buf).unwrap();
        (buf[..size].to_vec(), peer)
    }

    /// Receives one datagram, returning `None` on any socket error (including a read timeout).
    fn recv_request_timeout(socket: &UdpSocket) -> Option<(Vec<u8>, SocketAddr)> {
        let mut buf = [0_u8; 2048];
        socket
            .recv_from(&mut buf)
            .ok()
            .map(|(size, peer)| (buf[..size].to_vec(), peer))
    }

    /// Encodes an open reply: magic, flags, token, then the encoded parameters.
    fn open_reply(flags: u8, token: u64, params: &Params) -> Vec<u8> {
        let mut packet = Vec::new();
        packet.extend_from_slice(&MAGIC);
        packet.push(flags);
        packet.extend_from_slice(&token.to_le_bytes());
        packet.extend_from_slice(&params.encode());
        packet
    }

    /// Encodes an echo reply for `seq`, writing only the fields the negotiated layout enables
    /// and zero-padding up to the negotiated packet length.
    fn echo_reply_packet(
        token: u64,
        seq: u32,
        params: &Params,
        timestamps: &TimestampFields,
    ) -> Vec<u8> {
        let layout = PacketLayout::echo(false, params);
        let packet_len = echo_packet_len(false, params);
        let mut packet = Vec::with_capacity(packet_len);

        packet.extend_from_slice(&MAGIC);
        packet.push(FLAG_REPLY);
        packet.extend_from_slice(&token.to_le_bytes());
        packet.extend_from_slice(&seq.to_le_bytes());

        if layout.recv_count {
            packet.extend_from_slice(&42_u32.to_le_bytes());
        }
        if layout.recv_window {
            packet.extend_from_slice(&0x07_u64.to_le_bytes());
        }
        if layout.recv_wall {
            packet.extend_from_slice(&timestamps.recv_wall.unwrap_or(0).to_le_bytes());
        }
        if layout.recv_mono {
            packet.extend_from_slice(&timestamps.recv_mono.unwrap_or(0).to_le_bytes());
        }
        if layout.midpoint_wall {
            packet.extend_from_slice(&timestamps.midpoint_wall.unwrap_or(0).to_le_bytes());
        }
        if layout.midpoint_mono {
            packet.extend_from_slice(&timestamps.midpoint_mono.unwrap_or(0).to_le_bytes());
        }
        if layout.send_wall {
            packet.extend_from_slice(&timestamps.send_wall.unwrap_or(0).to_le_bytes());
        }
        if layout.send_mono {
            packet.extend_from_slice(&timestamps.send_mono.unwrap_or(0).to_le_bytes());
        }

        packet.resize(packet_len, 0);
        packet
    }

    /// Polls the subscription for up to two seconds and returns the next event.
    fn recv_event_with_timeout(sub: &EventSubscription) -> ClientEvent {
        let deadline = Instant::now() + Duration::from_secs(2);
        loop {
            match sub.try_recv() {
                Ok(Some(event)) => return event,
                Ok(None) if Instant::now() < deadline => {
                    thread::sleep(Duration::from_millis(1));
                }
                Ok(None) => panic!("timed out waiting for managed client event"),
                Err(err) => panic!("subscription ended while waiting for event: {err}"),
            }
        }
    }

    /// Collects events until `SessionClosed` arrives or the subscription ends; panics if
    /// neither happens within two seconds.
    fn collect_until_closed(sub: &EventSubscription) -> Vec<ClientEvent> {
        let mut events = Vec::new();
        let deadline = Instant::now() + Duration::from_secs(2);
        while Instant::now() < deadline {
            match sub.try_recv() {
                Ok(Some(event)) => {
                    let closed = matches!(event, ClientEvent::SessionClosed { .. });
                    events.push(event);
                    if closed {
                        return events;
                    }
                }
                Ok(None) => thread::sleep(Duration::from_millis(1)),
                Err(_) => return events,
            }
        }
        panic!("timed out waiting for session close event");
    }

    #[test]
    fn stop_is_idempotent() {
        let server = start_echo_server(test_params(None, Duration::from_millis(10)));
        let session = ManagedClient::start(config(server.addr, None)).unwrap();
        session.stop();
        session.stop();
        let outcome = session.join().unwrap();
        server.join();

        assert_eq!(outcome.end_reason, SessionEndReason::Cancelled);
    }

    #[test]
    fn finite_managed_run_emits_session_probe_and_close_events() {
        let duration = Duration::from_millis(35);
        let server = start_echo_server(test_params(Some(duration), Duration::from_millis(10)));
        let (session, sub) = ManagedClient::start_with_subscription(
            config(server.addr, Some(duration)),
            SubscriberConfig {
                capacity: 16,
                overflow: SubscriberOverflow::DropNewest,
            },
        )
        .unwrap();

        let events = collect_until_closed(&sub);
        let outcome = session.join().unwrap();
        server.join();

        assert_eq!(outcome.end_reason, SessionEndReason::TestComplete);
        assert!(events
            .iter()
            .any(|event| matches!(event, ClientEvent::SessionStarted { .. })));
        assert!(events
            .iter()
            .any(|event| matches!(event, ClientEvent::EchoReply { .. })));
        assert!(events
            .iter()
            .any(|event| matches!(event, ClientEvent::SessionClosed { .. })));
    }

    #[test]
    fn finite_managed_run_drains_late_reply_after_timeout_before_close() {
        let duration = Duration::from_millis(1);
        let params = test_params(Some(duration), Duration::from_millis(10));
        let server = start_delayed_reply_server(params, Duration::from_millis(60));
        let mut cfg = config(server.addr, Some(duration));
        cfg.probe_timeout = Duration::from_millis(20);

        let (session, sub) = ManagedClient::start_with_subscription(
            cfg,
            SubscriberConfig {
                capacity: 16,
                overflow: SubscriberOverflow::DropNewest,
            },
        )
        .unwrap();

        let events = collect_until_closed(&sub);
        let outcome = session.join().unwrap();
        server.join();

        assert_eq!(outcome.end_reason, SessionEndReason::TestComplete);
        assert!(events.iter().any(|event| matches!(
            event,
            ClientEvent::EchoLoss {
                seq: 0,
                logical_seq: 0,
                ..
            }
        )));
        assert!(events.iter().any(|event| matches!(
            event,
            ClientEvent::LateReply {
                seq: 0,
                logical_seq: Some(0),
                sent_at: Some(_),
                rtt: Some(_),
                ..
            }
        )));
        let late_before_close = events
            .iter()
            .position(|event| matches!(event, ClientEvent::LateReply { rtt: Some(_), .. }));
        let close = events
            .iter()
            .position(|event| matches!(event, ClientEvent::SessionClosed { .. }));
        let late_before_close = late_before_close.expect("missing stats-eligible LateReply");
        let close = close.expect("missing SessionClosed");
        assert!(late_before_close < close);
    }

    #[test]
    fn continuous_managed_run_can_be_stopped_cleanly() {
        let server = start_echo_server(test_params(None, Duration::from_millis(10)));
        let session = ManagedClient::start(config(server.addr, None)).unwrap();
        let sub = session
            .subscribe(SubscriberConfig {
                capacity: 16,
                overflow: SubscriberOverflow::DropNewest,
            })
            .unwrap();

        loop {
            if matches!(recv_event_with_timeout(&sub), ClientEvent::EchoReply { .. }) {
                break;
            }
        }
        session.stop();
        let events = collect_until_closed(&sub);
        let outcome = session.join().unwrap();
        server.join();

        assert_eq!(outcome.end_reason, SessionEndReason::Cancelled);
        assert!(events
            .iter()
            .any(|event| matches!(event, ClientEvent::SessionClosed { .. })));
    }

    #[test]
    fn join_reports_worker_panic() {
        let hub = EventHub::new();
        let cancellation = CancellationToken::new();
        let worker = thread::spawn(|| -> Result<SessionOutcome, ClientError> {
            panic!("intentional managed worker panic")
        });
        let session = ManagedClientSession {
            hub,
            cancellation,
            worker: Some(worker),
        };

        assert!(matches!(session.join(), Err(ClientError::WorkerPanicked)));
    }

    #[test]
    fn no_test_managed_run_returns_no_test_outcome() {
        use crate::RunMode;

        let params = test_params(Some(Duration::from_millis(10)), Duration::from_millis(10));
        let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        let addr = socket.local_addr().unwrap();
        let (tx, rx) = mpsc::channel();
        let done = thread::spawn(move || {
            let (request, peer) = recv_request(&socket);
            // Report whether the open request itself carried the close flag.
            tx.send(request[3] & flags::FLAG_CLOSE != 0).unwrap();
            socket
                .send_to(
                    &open_reply(FLAG_OPEN | FLAG_REPLY | flags::FLAG_CLOSE, 0, &params),
                    peer,
                )
                .unwrap();
        });

        let mut cfg = config(addr, Some(Duration::from_millis(10)));
        cfg.run_mode = RunMode::NoTest;
        let session = ManagedClient::start(cfg).unwrap();
        assert!(rx.recv_timeout(Duration::from_secs(2)).unwrap());
        let outcome = session.join().unwrap();
        done.join().unwrap();

        assert_eq!(outcome.end_reason, SessionEndReason::NoTestComplete);
    }
}
706}