Skip to main content

irtt_client/managed/
runner.rs

1use std::{
2    thread::{self, JoinHandle},
3    time::{Duration, Instant},
4};
5
6use crate::{
7    config::{ClientConfig, RecvBudget},
8    error::ClientError,
9    event::{ClientEvent, OpenOutcome},
10    timing::ClientTimestamp,
11    Client,
12};
13
14use super::{
15    cancellation::CancellationToken,
16    hub::{EventHub, EventSubscription, SubscriberConfig},
17};
18
/// Upper bound applied to the socket receive timeout when a managed session is
/// started; an unset timeout is replaced with this value as well.
const MANAGED_RECV_TIMEOUT: Duration = Duration::from_millis(20);
/// Receive budget passed to every `Client::recv_available` call made by the runner.
const MANAGED_RECV_BUDGET: RecvBudget = RecvBudget { max_packets: 64 };
/// Upper bound on how long a naturally completed run keeps draining replies
/// for timed-out probes before the session is closed.
const MANAGED_FINAL_DRAIN: Duration = Duration::from_millis(100);
/// Short fallback sleep used by `sleep_until_next_wakeup` and between drain
/// passes that published nothing.
const IDLE_SLEEP: Duration = Duration::from_millis(1);
/// Cap on a single wait for the next send deadline in the worker loop.
const MAX_SLEEP: Duration = Duration::from_millis(20);
24
/// Entry point for running a client session on a background worker thread.
///
/// [`ManagedClient::start`] and [`ManagedClient::start_with_subscription`]
/// connect and open the session on the calling thread, then move the client
/// onto a worker thread and return a [`ManagedClientSession`] handle.
#[derive(Debug)]
pub struct ManagedClient;
27
/// Running managed client session.
///
/// [`ManagedClientSession::join`] waits for the worker and returns the session
/// outcome or client error. Dropping the session requests cooperative
/// cancellation; callers that need the outcome should explicitly join instead
/// of relying on drop.
#[must_use = "dropping the session cancels the managed client; call join() to wait for completion"]
#[derive(Debug)]
pub struct ManagedClientSession {
    /// Event fan-out shared with the worker thread; new subscriptions are made here.
    hub: EventHub,
    /// Cancellation flag checked by the worker loop; set by `stop()` and on drop.
    cancellation: CancellationToken,
    /// Worker thread handle; `Some` until `join` takes it.
    worker: Option<JoinHandle<Result<SessionOutcome, ClientError>>>,
}
41
42/// Outcome returned by a completed managed client session.
43///
44/// These are runner-level lifecycle counters, not statistical summaries. Use
45/// `irtt-stats` with emitted `ClientEvent` values for RTT, loss, IPDV, and
46/// related summaries.
47#[must_use = "managed session outcomes contain completion status and counters"]
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub struct SessionOutcome {
50    pub end_reason: SessionEndReason,
51    pub packets_sent: u64,
52    pub replies_received: u64,
53    pub duplicates: u64,
54    pub late: u64,
55    pub warning_events: u64,
56}
57
/// Why a managed client session finished.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionEndReason {
    /// The client reported the run complete and the session was closed
    /// without cancellation having been observed first.
    TestComplete,
    /// The worker observed a cancellation request (`stop()` or dropping the
    /// session handle) before the run completed; the session was then closed.
    Cancelled,
    /// The client already reported the run complete right after opening
    /// (e.g. a no-test run), so the worker sent no probes.
    NoTestComplete,
}
64
65impl ManagedClient {
66    pub fn start(config: ClientConfig) -> Result<ManagedClientSession, ClientError> {
67        Self::start_inner(config, None).map(|(session, _)| session)
68    }
69
70    pub fn start_with_subscription(
71        config: ClientConfig,
72        subscriber_config: SubscriberConfig,
73    ) -> Result<(ManagedClientSession, EventSubscription), ClientError> {
74        let (session, subscription) = Self::start_inner(config, Some(subscriber_config))?;
75        Ok((
76            session,
77            subscription.expect("initial subscription must be present"),
78        ))
79    }
80
81    fn start_inner(
82        mut config: ClientConfig,
83        subscriber_config: Option<SubscriberConfig>,
84    ) -> Result<(ManagedClientSession, Option<EventSubscription>), ClientError> {
85        if config.socket_config.recv_timeout.is_none()
86            || config.socket_config.recv_timeout > Some(MANAGED_RECV_TIMEOUT)
87        {
88            config.socket_config.recv_timeout = Some(MANAGED_RECV_TIMEOUT);
89        }
90
91        let hub = EventHub::new();
92        let initial_subscription = subscriber_config
93            .map(|config| hub.subscribe(config))
94            .transpose()?;
95
96        let mut client = Client::connect(config)?;
97        let outcome = client.open(ClientTimestamp::now())?;
98        publish_open_outcome(&hub, &outcome);
99
100        let cancellation = CancellationToken::new();
101        let worker_hub = hub.clone();
102        let worker_cancellation = cancellation.clone();
103        let worker =
104            thread::spawn(move || run_client_with_cleanup(client, worker_hub, worker_cancellation));
105
106        Ok((
107            ManagedClientSession {
108                hub,
109                cancellation,
110                worker: Some(worker),
111            },
112            initial_subscription,
113        ))
114    }
115}
116
117impl ManagedClientSession {
118    pub fn subscribe(&self, config: SubscriberConfig) -> Result<EventSubscription, ClientError> {
119        self.hub.subscribe(config)
120    }
121
122    pub fn stop(&self) {
123        self.cancellation.cancel();
124    }
125
126    pub fn join(mut self) -> Result<SessionOutcome, ClientError> {
127        let worker = self
128            .worker
129            .take()
130            .expect("ManagedClientSession invariant violated: worker handle missing before join");
131        match worker.join() {
132            Ok(outcome) => {
133                self.hub.disconnect_all();
134                outcome
135            }
136            Err(_) => {
137                self.hub.disconnect_all();
138                Err(ClientError::WorkerPanicked)
139            }
140        }
141    }
142}
143
144impl Drop for ManagedClientSession {
145    fn drop(&mut self) {
146        self.cancellation.cancel();
147    }
148}
149
150fn publish_open_outcome(hub: &EventHub, outcome: &OpenOutcome) {
151    match outcome {
152        OpenOutcome::Started { event, .. } | OpenOutcome::NoTestCompleted { event, .. } => {
153            hub.publish(event.clone());
154        }
155    }
156}
157
/// Worker-thread body: drives probe sends, receives and timeout polling until
/// the client reports the run complete or cancellation is requested, then
/// closes the session and returns the runner-level counters.
///
/// Every client error is propagated with `?`.
/// NOTE(review): on such an early error return `close` is not called here —
/// confirm whether dropping `Client` is enough to release the server session.
fn run_client(
    mut client: Client,
    hub: EventHub,
    cancellation: CancellationToken,
) -> Result<SessionOutcome, ClientError> {
    // Already complete right after open (e.g. a no-test run): nothing is sent,
    // published or closed here.
    if client.is_run_complete() {
        return Ok(SessionOutcome {
            end_reason: SessionEndReason::NoTestComplete,
            packets_sent: 0,
            replies_received: 0,
            duplicates: 0,
            late: 0,
            warning_events: 0,
        });
    }

    let mut counters = OutcomeCounters::default();
    let mut cancelled = false;

    loop {
        // Cancellation: one last receive + timeout pass so anything already
        // pending is published, then leave the loop (no late-reply drain).
        if cancellation.is_cancelled() {
            cancelled = true;
            publish_events(
                &hub,
                &mut counters,
                client.recv_available(MANAGED_RECV_BUDGET)?,
            );
            publish_events(
                &hub,
                &mut counters,
                client.poll_timeouts(ClientTimestamp::now())?,
            );
            break;
        }

        // A send is due: send it and restart the loop, so cancellation is
        // re-checked and further due sends take priority over receiving.
        let now = Instant::now();
        if client
            .next_send_deadline()
            .is_some_and(|deadline| deadline <= now)
        {
            let events = client.send_probe()?;
            publish_events(&hub, &mut counters, events);
            continue;
        }

        // No send due: publish received packets, then any timed-out probes.
        publish_events(
            &hub,
            &mut counters,
            client.recv_available(MANAGED_RECV_BUDGET)?,
        );
        publish_events(
            &hub,
            &mut counters,
            client.poll_timeouts(ClientTimestamp::now())?,
        );

        if client.is_run_complete() {
            break;
        }

        sleep_until_next_wakeup(client.next_send_deadline());
    }

    // Only a naturally completed run waits (bounded) for replies to timed-out probes.
    if !cancelled {
        drain_final_late_replies(&mut client, &hub, &mut counters)?;
    }

    // The sent count is read before `close`; events produced by close are
    // published and counted like any others.
    let packets_sent = client.packets_sent();
    let close_events = client.close(ClientTimestamp::now())?;
    publish_events(&hub, &mut counters, close_events);

    Ok(SessionOutcome {
        end_reason: if cancelled {
            SessionEndReason::Cancelled
        } else {
            SessionEndReason::TestComplete
        },
        packets_sent,
        replies_received: counters.replies_received,
        duplicates: counters.duplicates,
        late: counters.late,
        warning_events: counters.warning_events,
    })
}
242
243fn run_client_with_cleanup(
244    client: Client,
245    hub: EventHub,
246    cancellation: CancellationToken,
247) -> Result<SessionOutcome, ClientError> {
248    let outcome = run_client(client, hub.clone(), cancellation);
249    hub.disconnect_all();
250    outcome
251}
252
253fn drain_final_late_replies(
254    client: &mut Client,
255    hub: &EventHub,
256    counters: &mut OutcomeCounters,
257) -> Result<(), ClientError> {
258    if !client.has_timed_out_metadata() {
259        return Ok(());
260    }
261
262    let deadline = Instant::now() + MANAGED_FINAL_DRAIN;
263    while Instant::now() < deadline && client.has_timed_out_metadata() {
264        let mut published = false;
265
266        let events = client.recv_available(MANAGED_RECV_BUDGET)?;
267        published |= !events.is_empty();
268        publish_events(hub, counters, events);
269
270        let events = client.poll_timeouts(ClientTimestamp::now())?;
271        published |= !events.is_empty();
272        publish_events(hub, counters, events);
273
274        if !published {
275            thread::sleep(IDLE_SLEEP);
276        }
277    }
278    Ok(())
279}
280
281fn publish_events(hub: &EventHub, counters: &mut OutcomeCounters, events: Vec<ClientEvent>) {
282    for event in events {
283        counters.observe(&event);
284        hub.publish(event);
285    }
286}
287
288fn sleep_until_next_wakeup(deadline: Option<Instant>) {
289    let sleep_for = deadline
290        .and_then(|deadline| deadline.checked_duration_since(Instant::now()))
291        .map(|duration| duration.min(MAX_SLEEP))
292        .unwrap_or(IDLE_SLEEP);
293    if sleep_for > Duration::ZERO {
294        thread::sleep(sleep_for);
295    }
296}
297
298#[derive(Debug, Default)]
299struct OutcomeCounters {
300    replies_received: u64,
301    duplicates: u64,
302    late: u64,
303    warning_events: u64,
304}
305
306impl OutcomeCounters {
307    fn observe(&mut self, event: &ClientEvent) {
308        match event {
309            ClientEvent::EchoReply { .. } => self.replies_received += 1,
310            ClientEvent::DuplicateReply { .. } => self.duplicates += 1,
311            ClientEvent::LateReply { .. } => self.late += 1,
312            ClientEvent::Warning { .. } => self.warning_events += 1,
313            _ => {}
314        }
315    }
316}
317
#[cfg(test)]
mod tests {
    // End-to-end tests of the managed runner against scripted UDP servers on
    // 127.0.0.1, plus a direct check of the worker-panic path in `join`.
    use super::*;
    use crate::{config::NegotiationPolicy, SubscriberOverflow};
    use irtt_proto::{
        echo_packet_len, flags, flags::FLAG_OPEN, flags::FLAG_REPLY, layout::PacketLayout, Clock,
        Params, ReceivedStats, StampAt, TimestampFields, MAGIC, PROTOCOL_VERSION,
    };
    use std::{
        net::{SocketAddr, UdpSocket},
        sync::mpsc,
    };

    // Session token the fake servers put in their open replies and echo replies.
    const TOKEN: u64 = 0x1234_5678_90ab_cdef;
    // A scripted server: its bound address plus the thread running the script.
    struct FakeServer {
        addr: SocketAddr,
        done: JoinHandle<()>,
    }

    impl FakeServer {
        // Waits for the server script to finish; panics if it panicked.
        fn join(self) {
            self.done.join().unwrap();
        }
    }

    // Params the fake server returns; a `None` duration is encoded as 0 ns.
    fn test_params(duration: Option<Duration>, interval: Duration) -> Params {
        Params {
            protocol_version: PROTOCOL_VERSION,
            duration_ns: duration.map_or(0, test_duration_ns_i64),
            interval_ns: test_duration_ns_i64(interval),
            length: 0,
            received_stats: ReceivedStats::Both,
            stamp_at: StampAt::Both,
            clock: Clock::Both,
            dscp: 0,
            server_fill: None,
        }
    }

    fn test_duration_ns_i64(duration: Duration) -> i64 {
        i64::try_from(duration.as_nanos()).expect("test duration fits i64 nanoseconds")
    }

    // Client config: 10 ms interval, strict negotiation, a single 200 ms open
    // timeout and a 40 ms probe timeout.
    fn config(addr: SocketAddr, duration: Option<Duration>) -> ClientConfig {
        ClientConfig {
            server_addr: addr.to_string(),
            duration,
            interval: Duration::from_millis(10),
            negotiation_policy: NegotiationPolicy::Strict,
            open_timeouts: vec![Duration::from_millis(200)],
            probe_timeout: Duration::from_millis(40),
            ..ClientConfig::default()
        }
    }

    // Accepts the open request, then echoes every probe until a close packet
    // arrives or 500 ms pass without a request.
    fn start_echo_server(params: Params) -> FakeServer {
        let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        let addr = socket.local_addr().unwrap();
        let done = thread::spawn(move || {
            let (_, peer) = recv_request(&socket);
            socket
                .send_to(&open_reply(FLAG_OPEN | FLAG_REPLY, TOKEN, &params), peer)
                .unwrap();
            socket
                .set_read_timeout(Some(Duration::from_millis(500)))
                .unwrap();

            while let Some((packet, peer)) = recv_request_timeout(&socket) {
                // Byte 3 is treated as the flags byte (assumes a 3-byte MAGIC,
                // matching the layout built by `open_reply`).
                if packet[3] & flags::FLAG_CLOSE != 0 {
                    break;
                }

                // seq follows magic, flags and the 8-byte token (see `echo_reply_packet`).
                let seq = u32::from_le_bytes(packet[12..16].try_into().unwrap());
                let ts = TimestampFields {
                    recv_wall: Some(1_000_000_000),
                    recv_mono: Some(100_000),
                    send_wall: Some(1_000_000_000),
                    send_mono: Some(100_000),
                    ..Default::default()
                };
                socket
                    .send_to(&echo_reply_packet(TOKEN, seq, &params, &ts), peer)
                    .unwrap();
            }
        });
        FakeServer { addr, done }
    }

    // Accepts the open request, answers only the first probe after `delay`,
    // then ignores traffic until a close packet arrives or 500 ms pass idle.
    fn start_delayed_reply_server(params: Params, delay: Duration) -> FakeServer {
        let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        let addr = socket.local_addr().unwrap();
        let done = thread::spawn(move || {
            let (_, peer) = recv_request(&socket);
            socket
                .send_to(&open_reply(FLAG_OPEN | FLAG_REPLY, TOKEN, &params), peer)
                .unwrap();
            socket
                .set_read_timeout(Some(Duration::from_millis(500)))
                .unwrap();

            let Some((packet, peer)) = recv_request_timeout(&socket) else {
                return;
            };
            let seq = u32::from_le_bytes(packet[12..16].try_into().unwrap());
            thread::sleep(delay);
            let ts = TimestampFields {
                recv_wall: Some(1_000_000_000),
                recv_mono: Some(100_000),
                send_wall: Some(1_001_000_000),
                send_mono: Some(1_100_000),
                ..Default::default()
            };
            socket
                .send_to(&echo_reply_packet(TOKEN, seq, &params, &ts), peer)
                .unwrap();

            while let Some((packet, _)) = recv_request_timeout(&socket) {
                if packet[3] & flags::FLAG_CLOSE != 0 {
                    break;
                }
            }
        });
        FakeServer { addr, done }
    }

    // Blocking receive of one datagram; panics on socket error.
    fn recv_request(socket: &UdpSocket) -> (Vec<u8>, SocketAddr) {
        let mut buf = [0_u8; 2048];
        let (size, peer) = socket.recv_from(&mut buf).unwrap();
        (buf[..size].to_vec(), peer)
    }

    // Receive honoring the socket's read timeout; any error (including the
    // timeout) maps to `None`.
    fn recv_request_timeout(socket: &UdpSocket) -> Option<(Vec<u8>, SocketAddr)> {
        let mut buf = [0_u8; 2048];
        socket
            .recv_from(&mut buf)
            .ok()
            .map(|(size, peer)| (buf[..size].to_vec(), peer))
    }

    // Open reply: MAGIC, flags byte, little-endian token, encoded params.
    fn open_reply(flags: u8, token: u64, params: &Params) -> Vec<u8> {
        let mut packet = Vec::new();
        packet.extend_from_slice(&MAGIC);
        packet.push(flags);
        packet.extend_from_slice(&token.to_le_bytes());
        packet.extend_from_slice(&params.encode());
        packet
    }

    // Hand-built echo reply: header, then each optional field enabled by
    // `PacketLayout::echo`, zero-padded to `echo_packet_len`.
    fn echo_reply_packet(
        token: u64,
        seq: u32,
        params: &Params,
        timestamps: &TimestampFields,
    ) -> Vec<u8> {
        let layout = PacketLayout::echo(false, params);
        let packet_len = echo_packet_len(false, params);
        let mut packet = Vec::with_capacity(packet_len);

        packet.extend_from_slice(&MAGIC);
        packet.push(FLAG_REPLY);
        packet.extend_from_slice(&token.to_le_bytes());
        packet.extend_from_slice(&seq.to_le_bytes());

        if layout.recv_count {
            packet.extend_from_slice(&42_u32.to_le_bytes());
        }
        if layout.recv_window {
            packet.extend_from_slice(&0x07_u64.to_le_bytes());
        }
        if layout.recv_wall {
            packet.extend_from_slice(&timestamps.recv_wall.unwrap_or(0).to_le_bytes());
        }
        if layout.recv_mono {
            packet.extend_from_slice(&timestamps.recv_mono.unwrap_or(0).to_le_bytes());
        }
        if layout.midpoint_wall {
            packet.extend_from_slice(&timestamps.midpoint_wall.unwrap_or(0).to_le_bytes());
        }
        if layout.midpoint_mono {
            packet.extend_from_slice(&timestamps.midpoint_mono.unwrap_or(0).to_le_bytes());
        }
        if layout.send_wall {
            packet.extend_from_slice(&timestamps.send_wall.unwrap_or(0).to_le_bytes());
        }
        if layout.send_mono {
            packet.extend_from_slice(&timestamps.send_mono.unwrap_or(0).to_le_bytes());
        }

        packet.resize(packet_len, 0);
        packet
    }

    // Polls the subscription for up to 2 s; panics on timeout or if the
    // subscription reports an error.
    fn recv_event_with_timeout(sub: &EventSubscription) -> ClientEvent {
        let deadline = Instant::now() + Duration::from_secs(2);
        loop {
            match sub.try_recv() {
                Ok(Some(event)) => return event,
                Ok(None) if Instant::now() < deadline => {
                    thread::sleep(Duration::from_millis(1));
                }
                Ok(None) => panic!("timed out waiting for managed client event"),
                Err(err) => panic!("subscription ended while waiting for event: {err}"),
            }
        }
    }

    // Collects events up to and including SessionClosed, or until the
    // subscription reports an error; panics after 2 s.
    fn collect_until_closed(sub: &EventSubscription) -> Vec<ClientEvent> {
        let mut events = Vec::new();
        let deadline = Instant::now() + Duration::from_secs(2);
        while Instant::now() < deadline {
            match sub.try_recv() {
                Ok(Some(event)) => {
                    let closed = matches!(event, ClientEvent::SessionClosed { .. });
                    events.push(event);
                    if closed {
                        return events;
                    }
                }
                Ok(None) => thread::sleep(Duration::from_millis(1)),
                Err(_) => return events,
            }
        }
        panic!("timed out waiting for session close event");
    }

    // Calling stop() twice is harmless and still yields a Cancelled outcome.
    #[test]
    fn stop_is_idempotent() {
        let server = start_echo_server(test_params(None, Duration::from_millis(10)));
        let session = ManagedClient::start(config(server.addr, None)).unwrap();
        session.stop();
        session.stop();
        let outcome = session.join().unwrap();
        server.join();

        assert_eq!(outcome.end_reason, SessionEndReason::Cancelled);
    }

    // A short finite run publishes start, at least one reply, and close events.
    #[test]
    fn finite_managed_run_emits_session_probe_and_close_events() {
        let duration = Duration::from_millis(35);
        let server = start_echo_server(test_params(Some(duration), Duration::from_millis(10)));
        let (session, sub) = ManagedClient::start_with_subscription(
            config(server.addr, Some(duration)),
            SubscriberConfig {
                capacity: 16,
                overflow: SubscriberOverflow::DropNewest,
            },
        )
        .unwrap();

        let events = collect_until_closed(&sub);
        let outcome = session.join().unwrap();
        server.join();

        assert_eq!(outcome.end_reason, SessionEndReason::TestComplete);
        assert!(events
            .iter()
            .any(|event| matches!(event, ClientEvent::SessionStarted { .. })));
        assert!(events
            .iter()
            .any(|event| matches!(event, ClientEvent::EchoReply { .. })));
        assert!(events
            .iter()
            .any(|event| matches!(event, ClientEvent::SessionClosed { .. })));
    }

    // A reply arriving after the 20 ms probe timeout (server waits 60 ms) is
    // reported as loss first, then as a LateReply that precedes SessionClosed.
    #[test]
    fn finite_managed_run_drains_late_reply_after_timeout_before_close() {
        let duration = Duration::from_millis(1);
        let params = test_params(Some(duration), Duration::from_millis(10));
        let server = start_delayed_reply_server(params, Duration::from_millis(60));
        let mut cfg = config(server.addr, Some(duration));
        cfg.probe_timeout = Duration::from_millis(20);

        let (session, sub) = ManagedClient::start_with_subscription(
            cfg,
            SubscriberConfig {
                capacity: 16,
                overflow: SubscriberOverflow::DropNewest,
            },
        )
        .unwrap();

        let events = collect_until_closed(&sub);
        let outcome = session.join().unwrap();
        server.join();

        assert_eq!(outcome.end_reason, SessionEndReason::TestComplete);
        assert!(events.iter().any(|event| matches!(
            event,
            ClientEvent::EchoLoss {
                seq: 0,
                logical_seq: 0,
                ..
            }
        )));
        assert!(events.iter().any(|event| matches!(
            event,
            ClientEvent::LateReply {
                seq: 0,
                logical_seq: Some(0),
                sent_at: Some(_),
                rtt: Some(_),
                ..
            }
        )));
        let late_before_close = events
            .iter()
            .position(|event| matches!(event, ClientEvent::LateReply { rtt: Some(_), .. }));
        let close = events
            .iter()
            .position(|event| matches!(event, ClientEvent::SessionClosed { .. }));
        let late_before_close = late_before_close.expect("missing stats-eligible LateReply");
        let close = close.expect("missing SessionClosed");
        assert!(late_before_close < close);
    }

    // A continuous run (no duration) stops cleanly after the first reply.
    #[test]
    fn continuous_managed_run_can_be_stopped_cleanly() {
        let server = start_echo_server(test_params(None, Duration::from_millis(10)));
        let session = ManagedClient::start(config(server.addr, None)).unwrap();
        let sub = session
            .subscribe(SubscriberConfig {
                capacity: 16,
                overflow: SubscriberOverflow::DropNewest,
            })
            .unwrap();

        loop {
            if matches!(recv_event_with_timeout(&sub), ClientEvent::EchoReply { .. }) {
                break;
            }
        }
        session.stop();
        let events = collect_until_closed(&sub);
        let outcome = session.join().unwrap();
        server.join();

        assert_eq!(outcome.end_reason, SessionEndReason::Cancelled);
        assert!(events
            .iter()
            .any(|event| matches!(event, ClientEvent::SessionClosed { .. })));
    }

    // A panicking worker surfaces as ClientError::WorkerPanicked from join().
    #[test]
    fn join_reports_worker_panic() {
        let hub = EventHub::new();
        let cancellation = CancellationToken::new();
        let worker = thread::spawn(|| -> Result<SessionOutcome, ClientError> {
            panic!("intentional managed worker panic")
        });
        let session = ManagedClientSession {
            hub,
            cancellation,
            worker: Some(worker),
        };

        assert!(matches!(session.join(), Err(ClientError::WorkerPanicked)));
    }

    // In no-test mode the open request carries FLAG_CLOSE, and the session
    // ends with NoTestComplete.
    #[test]
    fn no_test_managed_run_returns_no_test_outcome() {
        use crate::RunMode;

        let params = test_params(Some(Duration::from_millis(10)), Duration::from_millis(10));
        let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        let addr = socket.local_addr().unwrap();
        let (tx, rx) = mpsc::channel();
        let done = thread::spawn(move || {
            let (request, peer) = recv_request(&socket);
            tx.send(request[3] & flags::FLAG_CLOSE != 0).unwrap();
            socket
                .send_to(
                    &open_reply(FLAG_OPEN | FLAG_REPLY | flags::FLAG_CLOSE, 0, &params),
                    peer,
                )
                .unwrap();
        });

        let mut cfg = config(addr, Some(Duration::from_millis(10)));
        cfg.run_mode = RunMode::NoTest;
        let session = ManagedClient::start(cfg).unwrap();
        assert!(rx.recv_timeout(Duration::from_secs(2)).unwrap());
        let outcome = session.join().unwrap();
        done.join().unwrap();

        assert_eq!(outcome.end_reason, SessionEndReason::NoTestComplete);
    }
}