Skip to main content

irtt_client/
client.rs

1use std::{
2    io,
3    net::{SocketAddr, UdpSocket},
4    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
5};
6
7use irtt_proto::{
8    close::CloseRequest, decode_echo_reply, echo_packet_len, encode_close_request,
9    encode_echo_request, encode_open_request, flags, EchoReply, EchoRequest, OpenReply,
10    OpenRequest, Params, ServerFill, TimestampFields, PROTOCOL_VERSION,
11};
12
13use crate::{
14    config::{
15        ClientConfig, RecvBudget, RunMode, MAX_DSCP_CODEPOINT, MAX_SERVER_FILL_BYTES,
16        MAX_UDP_PAYLOAD_LENGTH,
17    },
18    error::ClientError,
19    event::{
20        ClientEvent, OneWayDelaySample, OpenOutcome, ReceivedStatsSample, RttSample, ServerTiming,
21        SignedDuration, WarningKind,
22    },
23    metadata::ReceiveMeta,
24    probe::{CompletedSet, PendingMap, PendingProbe, TimedOutMap},
25    receive::recv_datagram,
26    session::{validate_negotiated_params, ActiveSession, ClientPhase, NegotiatedParams},
27    socket::{connect_udp_socket, resolve_remote, validate_open_timeouts},
28    socket_options::{apply_dscp_to_socket, clear_dscp_on_socket},
29    timing::ClientTimestamp,
30};
31
/// Size in bytes of the stack buffer used to receive the open reply.
const MAX_OPEN_PACKET_SIZE: usize = 512;
/// Lower bound, in bytes, for the echo receive buffer.
const MIN_RECV_BUFFER_SIZE: usize = 2 * 1024;
34
35#[derive(Debug)]
36pub struct Client {
37    config: ClientConfig,
38    socket: UdpSocket,
39    remote: SocketAddr,
40    requested: Params,
41    negotiated: Option<NegotiatedParams>,
42    phase: ClientPhase,
43    session: Option<ActiveSession>,
44    recv_buffer: Vec<u8>,
45}
46
47impl Client {
48    pub fn connect(config: ClientConfig) -> Result<Self, ClientError> {
49        validate_open_timeouts(&config.open_timeouts)?;
50        if config.max_pending_probes == 0 {
51            return Err(ClientError::InvalidConfig {
52                reason: "max_pending_probes must be greater than zero".to_owned(),
53            });
54        }
55        if config.probe_timeout == Duration::ZERO {
56            return Err(ClientError::InvalidConfig {
57                reason: "probe_timeout must be greater than zero".to_owned(),
58            });
59        }
60        let remote = resolve_remote(&config)?;
61        let socket = connect_udp_socket(&config.socket_config, remote)?;
62        let requested = params_from_config(&config)?;
63
64        Ok(Self {
65            config,
66            socket,
67            remote,
68            requested,
69            negotiated: None,
70            phase: ClientPhase::Connected,
71            session: None,
72            recv_buffer: vec![0_u8; MIN_RECV_BUFFER_SIZE],
73        })
74    }
75
76    pub fn open(&mut self, now: ClientTimestamp) -> Result<OpenOutcome, ClientError> {
77        match self.phase {
78            ClientPhase::Connected => {}
79            ClientPhase::Open { .. } => return Err(ClientError::AlreadyOpen),
80            ClientPhase::Closed => return Err(ClientError::AlreadyClosed),
81            ClientPhase::NoTestCompleted => return Err(ClientError::AlreadyCompleted),
82        }
83
84        let outcome = self.open_inner(now);
85        let restore = self
86            .socket
87            .set_read_timeout(self.config.socket_config.recv_timeout);
88        match (outcome, restore) {
89            (Ok(outcome), Ok(())) => Ok(outcome),
90            (Ok(_), Err(source)) => Err(ClientError::ReadTimeoutRestore { source }),
91            (Err(err), Ok(())) => Err(err),
92            (Err(err), Err(_)) => Err(err),
93        }
94    }
95
96    fn open_inner(&mut self, _now: ClientTimestamp) -> Result<OpenOutcome, ClientError> {
97        let request = OpenRequest {
98            params: self.requested.clone(),
99            close: self.config.run_mode == RunMode::NoTest,
100        };
101        let packet = encode_open_request(&request, self.config.hmac_key.as_deref())?;
102        let mut buf = [0_u8; MAX_OPEN_PACKET_SIZE];
103
104        for timeout in &self.config.open_timeouts {
105            self.socket.set_read_timeout(Some(*timeout))?;
106            self.socket.send(&packet)?;
107
108            match self.socket.recv(&mut buf) {
109                Ok(size) => {
110                    let reply = irtt_proto::decode_open_reply(
111                        &buf[..size],
112                        self.config.hmac_key.as_deref(),
113                    )?;
114                    return self.accept_open_reply(reply, ClientTimestamp::now());
115                }
116                Err(err)
117                    if matches!(
118                        err.kind(),
119                        io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
120                    ) => {}
121                Err(err) => return Err(ClientError::Socket(err)),
122            }
123        }
124
125        Err(ClientError::OpenTimeout)
126    }
127
128    pub fn close(&mut self, now: ClientTimestamp) -> Result<Vec<ClientEvent>, ClientError> {
129        let token = match self.phase {
130            ClientPhase::Open { token } => token,
131            ClientPhase::Closed => return Err(ClientError::AlreadyClosed),
132            ClientPhase::Connected | ClientPhase::NoTestCompleted => {
133                return Err(ClientError::NotOpen)
134            }
135        };
136
137        clear_dscp_on_socket(&self.socket, self.remote)?;
138        let packet =
139            encode_close_request(&CloseRequest { token }, self.config.hmac_key.as_deref())?;
140        self.socket.send(&packet)?;
141        self.phase = ClientPhase::Closed;
142        if let Some(session) = self.session.as_mut() {
143            session.timed_out.clear();
144        }
145        self.session = None;
146
147        Ok(vec![ClientEvent::SessionClosed {
148            remote: self.remote,
149            token,
150            at: now,
151        }])
152    }
153
154    pub fn next_send_deadline(&self) -> Option<Instant> {
155        let session = self.session.as_ref()?;
156        if session.sending_done {
157            return None;
158        }
159        Some(session.next_send_at)
160    }
161
162    pub fn probe_timeout(&self) -> Duration {
163        self.config.probe_timeout
164    }
165
166    pub fn send_probe(&mut self) -> Result<Vec<ClientEvent>, ClientError> {
167        self.send_probe_inner(None)
168    }
169
170    pub fn recv_once(&mut self) -> Result<Vec<ClientEvent>, ClientError> {
171        self.recv_once_inner(None)
172    }
173
174    fn send_probe_inner(
175        &mut self,
176        override_ts: Option<ClientTimestamp>,
177    ) -> Result<Vec<ClientEvent>, ClientError> {
178        let token = match self.phase {
179            ClientPhase::Open { token } => token,
180            ClientPhase::Closed => return Err(ClientError::AlreadyClosed),
181            ClientPhase::Connected => return Err(ClientError::NotOpen),
182            ClientPhase::NoTestCompleted => return Err(ClientError::AlreadyCompleted),
183        };
184
185        let session = self
186            .session
187            .as_mut()
188            .expect("session must exist when phase is Open");
189
190        if session.sending_done {
191            return Ok(vec![]);
192        }
193
194        let now = override_ts.unwrap_or_else(ClientTimestamp::now);
195
196        if let Some(end) = session.end_mono {
197            if now.mono >= end {
198                session.sending_done = true;
199                return Ok(vec![]);
200            }
201        }
202
203        session.pending.check_capacity()?;
204
205        let negotiated = self
206            .negotiated
207            .as_ref()
208            .expect("negotiated must exist when Open");
209
210        let wire_seq = session.next_wire_seq;
211        let logical_seq = session.next_logical_seq;
212        let scheduled_at = session.next_send_at;
213
214        let request = EchoRequest {
215            token,
216            sequence: wire_seq,
217            params: negotiated.params.clone(),
218            payload: vec![],
219        };
220        let packet = encode_echo_request(&request, self.config.hmac_key.as_deref())?;
221        let sent_at = override_ts.unwrap_or_else(ClientTimestamp::now);
222        let send_call_start = Instant::now();
223        let bytes = self.socket.send(&packet)?;
224        let send_call = send_call_start.elapsed();
225        let timer_error = instant_abs_diff(sent_at.mono, scheduled_at);
226
227        let session = self
228            .session
229            .as_mut()
230            .expect("session must exist when phase is Open");
231
232        let pending = PendingProbe {
233            logical_seq,
234            wire_seq,
235            sent_at,
236            timeout_at: sent_at
237                .mono
238                .checked_add(self.config.probe_timeout)
239                .ok_or(ClientError::DurationOverflow)?,
240        };
241        session.pending.insert(pending)?;
242
243        session.next_wire_seq = session.next_wire_seq.wrapping_add(1);
244        session.next_logical_seq =
245            session
246                .next_logical_seq
247                .checked_add(1)
248                .ok_or(ClientError::CounterOverflow {
249                    counter: "next_logical_seq",
250                })?;
251        session.packets_sent =
252            session
253                .packets_sent
254                .checked_add(1)
255                .ok_or(ClientError::CounterOverflow {
256                    counter: "packets_sent",
257                })?;
258
259        let negotiated = self
260            .negotiated
261            .as_ref()
262            .expect("negotiated must exist when Open");
263        let interval_ns = u64::try_from(negotiated.params.interval_ns)
264            .expect("validated positive negotiated interval");
265        let session = self
266            .session
267            .as_mut()
268            .expect("session must exist when phase is Open");
269        session.next_send_at =
270            next_probe_deadline(session.start_mono, interval_ns, session.packets_sent)?;
271
272        if let Some(end) = session.end_mono {
273            if session.next_send_at >= end {
274                session.sending_done = true;
275            }
276        }
277
278        Ok(vec![ClientEvent::EchoSent {
279            seq: wire_seq,
280            logical_seq,
281            remote: self.remote,
282            scheduled_at,
283            sent_at,
284            bytes,
285            send_call,
286            timer_error,
287        }])
288    }
289
290    fn recv_once_inner(
291        &mut self,
292        override_ts: Option<ClientTimestamp>,
293    ) -> Result<Vec<ClientEvent>, ClientError> {
294        match self.phase {
295            ClientPhase::Open { .. } => {}
296            ClientPhase::Closed => return Err(ClientError::AlreadyClosed),
297            ClientPhase::Connected => return Err(ClientError::NotOpen),
298            ClientPhase::NoTestCompleted => return Err(ClientError::AlreadyCompleted),
299        }
300
301        let datagram = match recv_datagram(&self.socket, &mut self.recv_buffer) {
302            Ok(datagram) => datagram,
303            Err(err)
304                if matches!(
305                    err.kind(),
306                    io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
307                ) =>
308            {
309                return Ok(vec![]);
310            }
311            Err(err) => return Err(ClientError::Socket(err)),
312        };
313
314        let now = override_ts.unwrap_or(datagram.received_at);
315        let packet_len = datagram.len;
316        let Some(reply) = self.decode_received_packet(&self.recv_buffer[..packet_len]) else {
317            return Ok(vec![ClientEvent::Warning {
318                kind: WarningKind::MalformedOrUnrelatedPacket,
319                message: "dropped malformed or unrelated packet".to_owned(),
320            }]);
321        };
322        self.process_echo_reply(reply, packet_len, now, datagram.meta)
323    }
324
325    pub fn recv_available(&mut self, budget: RecvBudget) -> Result<Vec<ClientEvent>, ClientError> {
326        let mut all_events = Vec::new();
327        for _ in 0..budget.max_packets {
328            let events = self.recv_once()?;
329            if events.is_empty() {
330                break;
331            }
332            all_events.extend(events);
333        }
334        Ok(all_events)
335    }
336
337    #[cfg(test)]
338    fn recv_buffer_size(&self) -> usize {
339        recv_buffer_size(self.config.hmac_key.is_some(), self.negotiated.as_ref())
340    }
341
342    pub fn poll_timeouts(&mut self, now: ClientTimestamp) -> Result<Vec<ClientEvent>, ClientError> {
343        match self.phase {
344            ClientPhase::Open { .. } => {}
345            ClientPhase::Closed => return Err(ClientError::AlreadyClosed),
346            ClientPhase::Connected => return Err(ClientError::NotOpen),
347            ClientPhase::NoTestCompleted => return Err(ClientError::AlreadyCompleted),
348        }
349
350        let session = self
351            .session
352            .as_mut()
353            .expect("session must exist when phase is Open");
354
355        let expired = session.pending.drain_expired(now.mono);
356        let mut events = Vec::with_capacity(expired.len());
357        for probe in expired {
358            events.push(ClientEvent::EchoLoss {
359                seq: probe.wire_seq,
360                logical_seq: probe.logical_seq,
361                sent_at: probe.sent_at,
362                timeout_at: probe.timeout_at,
363            });
364            session.timed_out.insert(probe);
365        }
366
367        Ok(events)
368    }
369
370    pub fn is_run_complete(&self) -> bool {
371        let Some(session) = self.session.as_ref() else {
372            return matches!(
373                self.phase,
374                ClientPhase::Closed | ClientPhase::NoTestCompleted
375            );
376        };
377        session.sending_done && session.pending.len() == 0
378    }
379
380    pub(crate) fn has_timed_out_metadata(&self) -> bool {
381        self.session
382            .as_ref()
383            .is_some_and(|session| session.timed_out.len() > 0)
384    }
385
386    pub(crate) fn packets_sent(&self) -> u64 {
387        self.session
388            .as_ref()
389            .map_or(0, |session| session.packets_sent)
390    }
391
392    #[cfg(test)]
393    fn process_received_packet(
394        &mut self,
395        packet: &[u8],
396        now: ClientTimestamp,
397        meta: ReceiveMeta,
398    ) -> Result<Vec<ClientEvent>, ClientError> {
399        let packet_len = packet.len();
400        let Some(reply) = self.decode_received_packet(packet) else {
401            return Ok(vec![ClientEvent::Warning {
402                kind: WarningKind::MalformedOrUnrelatedPacket,
403                message: "dropped malformed or unrelated packet".to_owned(),
404            }]);
405        };
406        self.process_echo_reply(reply, packet_len, now, meta)
407    }
408
409    fn decode_received_packet(&self, packet: &[u8]) -> Option<EchoReply> {
410        let negotiated = self
411            .negotiated
412            .as_ref()
413            .expect("negotiated must exist when Open");
414
415        decode_echo_reply(packet, &negotiated.params, self.config.hmac_key.as_deref()).ok()
416    }
417
418    fn process_echo_reply(
419        &mut self,
420        reply: EchoReply,
421        packet_len: usize,
422        now: ClientTimestamp,
423        meta: ReceiveMeta,
424    ) -> Result<Vec<ClientEvent>, ClientError> {
425        let token = match self.phase {
426            ClientPhase::Open { token } => token,
427            _ => unreachable!(),
428        };
429        if reply.token != token {
430            return Ok(vec![ClientEvent::Warning {
431                kind: WarningKind::WrongToken,
432                message: format!(
433                    "dropped reply with wrong token: expected {token:#x}, got {:#x}",
434                    reply.token
435                ),
436            }]);
437        }
438
439        let session = self.session.as_mut().expect("session must exist when Open");
440
441        let wire_seq = reply.sequence;
442
443        if let Some(pending) = session.pending.remove(wire_seq) {
444            let rtt = compute_rtt(&pending.sent_at, &now, &reply.timestamps);
445            let server_timing = build_server_timing(&reply.timestamps);
446            let one_way = compute_one_way(&pending.sent_at, &now, &reply.timestamps);
447            let received_stats = build_received_stats(&reply);
448            let is_late = session
449                .highest_received_seq
450                .is_some_and(|h| sequence_is_before(wire_seq, h));
451            let highest_seen = session.highest_received_seq.unwrap_or(wire_seq);
452
453            update_highest_received(session, wire_seq);
454            session.completed.insert(wire_seq, pending.logical_seq);
455
456            let mut events = Vec::new();
457            if is_late {
458                events.push(ClientEvent::LateReply {
459                    seq: wire_seq,
460                    logical_seq: Some(pending.logical_seq),
461                    highest_seen,
462                    remote: self.remote,
463                    sent_at: Some(pending.sent_at),
464                    received_at: now,
465                    rtt: Some(rtt),
466                    server_timing,
467                    one_way,
468                    received_stats,
469                    bytes: packet_len,
470                    packet_meta: meta.into(),
471                });
472            } else {
473                events.push(ClientEvent::EchoReply {
474                    seq: wire_seq,
475                    logical_seq: pending.logical_seq,
476                    remote: self.remote,
477                    sent_at: pending.sent_at,
478                    received_at: now,
479                    rtt,
480                    server_timing,
481                    one_way,
482                    received_stats,
483                    bytes: packet_len,
484                    packet_meta: meta.into(),
485                });
486            }
487            if flags::has(reply.flags, flags::FLAG_CLOSE) {
488                self.close_from_peer(token, now, &mut events)?;
489            }
490            Ok(events)
491        } else if session.completed.contains(wire_seq) {
492            update_highest_received(session, wire_seq);
493            Ok(vec![ClientEvent::DuplicateReply {
494                seq: wire_seq,
495                remote: self.remote,
496                received_at: now,
497                bytes: packet_len,
498            }])
499        } else if let Some(timed_out) = session.timed_out.remove(wire_seq) {
500            let rtt = compute_rtt(&timed_out.sent_at, &now, &reply.timestamps);
501            let server_timing = build_server_timing(&reply.timestamps);
502            let one_way = compute_one_way(&timed_out.sent_at, &now, &reply.timestamps);
503            let received_stats = build_received_stats(&reply);
504            let highest_seen = session.highest_received_seq.unwrap_or(wire_seq);
505            update_highest_received(session, wire_seq);
506            session.completed.insert(wire_seq, timed_out.logical_seq);
507
508            let mut events = vec![ClientEvent::LateReply {
509                seq: wire_seq,
510                logical_seq: Some(timed_out.logical_seq),
511                highest_seen,
512                remote: self.remote,
513                sent_at: Some(timed_out.sent_at),
514                received_at: now,
515                rtt: Some(rtt),
516                server_timing,
517                one_way,
518                received_stats,
519                bytes: packet_len,
520                packet_meta: meta.into(),
521            }];
522            if flags::has(reply.flags, flags::FLAG_CLOSE) {
523                self.close_from_peer(token, now, &mut events)?;
524            }
525            Ok(events)
526        } else if session
527            .highest_received_seq
528            .is_some_and(|h| sequence_is_before(wire_seq, h))
529        {
530            Ok(vec![ClientEvent::LateReply {
531                seq: wire_seq,
532                logical_seq: None,
533                highest_seen: session.highest_received_seq.unwrap(),
534                remote: self.remote,
535                sent_at: None,
536                received_at: now,
537                rtt: None,
538                server_timing: build_server_timing(&reply.timestamps),
539                one_way: None,
540                received_stats: build_received_stats(&reply),
541                bytes: packet_len,
542                packet_meta: meta.into(),
543            }])
544        } else {
545            Ok(vec![ClientEvent::Warning {
546                kind: WarningKind::UntrackedReply,
547                message: format!(
548                    "dropped reply with untracked seq {wire_seq} (no pending or completed entry)"
549                ),
550            }])
551        }
552    }
553
554    fn close_from_peer(
555        &mut self,
556        token: u64,
557        now: ClientTimestamp,
558        events: &mut Vec<ClientEvent>,
559    ) -> Result<(), ClientError> {
560        clear_dscp_on_socket(&self.socket, self.remote)?;
561        self.phase = ClientPhase::Closed;
562        if let Some(session) = self.session.as_mut() {
563            session.timed_out.clear();
564        }
565        self.session = None;
566        events.push(ClientEvent::SessionClosed {
567            remote: self.remote,
568            token,
569            at: now,
570        });
571        Ok(())
572    }
573
574    fn accept_open_reply(
575        &mut self,
576        reply: OpenReply,
577        now: ClientTimestamp,
578    ) -> Result<OpenOutcome, ClientError> {
579        if reply.params.protocol_version != PROTOCOL_VERSION {
580            return Err(ClientError::ProtocolVersionMismatch {
581                requested: PROTOCOL_VERSION,
582                received: reply.params.protocol_version,
583            });
584        }
585
586        let reply_is_close = flags::has(reply.flags, flags::FLAG_CLOSE);
587        match self.config.run_mode {
588            RunMode::Normal if reply_is_close => Err(ClientError::ServerRejected),
589            RunMode::Normal if reply.token == 0 => Err(ClientError::ZeroToken),
590            RunMode::Normal => self.accept_normal_open(reply, now),
591            RunMode::NoTest if !reply_is_close => Err(ClientError::UnexpectedNoTestReply),
592            RunMode::NoTest if reply.token != 0 => {
593                Err(ClientError::NonZeroNoTestToken { token: reply.token })
594            }
595            RunMode::NoTest => self.accept_no_test_open(reply, now),
596        }
597    }
598
599    fn accept_normal_open(
600        &mut self,
601        reply: OpenReply,
602        now: ClientTimestamp,
603    ) -> Result<OpenOutcome, ClientError> {
604        validate_negotiated_params(
605            &self.requested,
606            &reply.params,
607            self.config.negotiation_policy,
608        )?;
609        let negotiated = NegotiatedParams {
610            params: reply.params.clone(),
611        };
612        let recv_buffer_size = recv_buffer_size(self.config.hmac_key.is_some(), Some(&negotiated));
613        let negotiated_dscp =
614            u8::try_from(negotiated.params.dscp).map_err(|_| ClientError::InvalidConfig {
615                reason: "negotiated dscp must be in range 0..=63".to_owned(),
616            })?;
617        apply_dscp_to_socket(&self.socket, self.remote, negotiated_dscp)?;
618        self.negotiated = Some(negotiated.clone());
619        self.recv_buffer.resize(recv_buffer_size, 0);
620        self.phase = ClientPhase::Open { token: reply.token };
621
622        let end_mono = if negotiated.params.duration_ns > 0 {
623            Some(negotiated_end_mono(
624                now.mono,
625                negotiated.params.duration_ns,
626            )?)
627        } else {
628            None
629        };
630
631        self.session = Some(ActiveSession {
632            next_wire_seq: 0,
633            next_logical_seq: 0,
634            highest_received_seq: None,
635            packets_sent: 0,
636            start_mono: now.mono,
637            end_mono,
638            next_send_at: now.mono,
639            pending: PendingMap::new(self.config.max_pending_probes),
640            timed_out: TimedOutMap::new(self.config.max_pending_probes),
641            completed: CompletedSet::new(self.config.max_pending_probes),
642            sending_done: false,
643        });
644
645        let event = ClientEvent::SessionStarted {
646            remote: self.remote,
647            token: reply.token,
648            negotiated: negotiated.clone(),
649            at: now,
650        };
651
652        Ok(OpenOutcome::Started {
653            remote: self.remote,
654            token: reply.token,
655            negotiated,
656            event,
657        })
658    }
659
660    fn accept_no_test_open(
661        &mut self,
662        reply: OpenReply,
663        now: ClientTimestamp,
664    ) -> Result<OpenOutcome, ClientError> {
665        validate_negotiated_params(
666            &self.requested,
667            &reply.params,
668            self.config.negotiation_policy,
669        )?;
670        let negotiated = NegotiatedParams {
671            params: reply.params.clone(),
672        };
673        self.negotiated = Some(negotiated.clone());
674        self.phase = ClientPhase::NoTestCompleted;
675        let event = ClientEvent::NoTestCompleted {
676            remote: self.remote,
677            negotiated: negotiated.clone(),
678            at: now,
679        };
680        Ok(OpenOutcome::NoTestCompleted {
681            remote: self.remote,
682            negotiated,
683            event,
684        })
685    }
686}
687
/// Test-only entry points that inject a fixed timestamp.
#[cfg(test)]
impl Client {
    /// Sends a probe using `ts` in place of the current time.
    fn send_probe_at(&mut self, ts: ClientTimestamp) -> Result<Vec<ClientEvent>, ClientError> {
        let fixed = Some(ts);
        self.send_probe_inner(fixed)
    }

    /// Receives one datagram, stamping it with `ts` instead of its own
    /// receive time.
    fn recv_once_at(&mut self, ts: ClientTimestamp) -> Result<Vec<ClientEvent>, ClientError> {
        let fixed = Some(ts);
        self.recv_once_inner(fixed)
    }
}
698
699fn update_highest_received(session: &mut ActiveSession, wire_seq: u32) {
700    session.highest_received_seq = Some(session.highest_received_seq.map_or(wire_seq, |h| {
701        if sequence_is_after(wire_seq, h) {
702            wire_seq
703        } else {
704            h
705        }
706    }));
707}
708
709fn next_probe_deadline(
710    start: Instant,
711    interval_ns: u64,
712    packets_sent: u64,
713) -> Result<Instant, ClientError> {
714    let offset_ns = interval_ns
715        .checked_mul(packets_sent)
716        .ok_or(ClientError::DurationOverflow)?;
717    start
718        .checked_add(Duration::from_nanos(offset_ns))
719        .ok_or(ClientError::DurationOverflow)
720}
721
/// Wrap-aware comparison: `true` when `candidate` is between 1 and 2^31 - 1
/// steps ahead of `current` in modular `u32` arithmetic.
fn sequence_is_after(candidate: u32, current: u32) -> bool {
    let ahead_by = candidate.wrapping_sub(current);
    matches!(ahead_by, 1..=0x7FFF_FFFF)
}
725
/// Wrap-aware comparison: `true` when `candidate` is between 1 and 2^31 - 1
/// steps behind `current` in modular `u32` arithmetic.
fn sequence_is_before(candidate: u32, current: u32) -> bool {
    let behind_by = current.wrapping_sub(candidate);
    matches!(behind_by, 1..=0x7FFF_FFFF)
}
729
/// Returns the absolute difference between two instants, regardless of order.
fn instant_abs_diff(left: Instant, right: Instant) -> Duration {
    match left.checked_duration_since(right) {
        Some(elapsed) => elapsed,
        None => right
            .checked_duration_since(left)
            .unwrap_or(Duration::ZERO),
    }
}
735
736fn recv_buffer_size(has_hmac: bool, negotiated: Option<&NegotiatedParams>) -> usize {
737    match negotiated {
738        Some(negotiated) => echo_packet_len(has_hmac, &negotiated.params)
739            .saturating_add(1)
740            .max(MIN_RECV_BUFFER_SIZE),
741        None => MIN_RECV_BUFFER_SIZE,
742    }
743}
744
745fn compute_rtt(
746    sent_at: &ClientTimestamp,
747    received_at: &ClientTimestamp,
748    ts: &TimestampFields,
749) -> RttSample {
750    let raw = received_at
751        .mono
752        .checked_duration_since(sent_at.mono)
753        .unwrap_or(Duration::ZERO);
754
755    let server_processing = compute_server_processing(ts);
756
757    let adjusted = server_processing.and_then(|sp| raw.checked_sub(sp));
758
759    let effective = adjusted.unwrap_or(raw);
760    let adjusted_signed = server_processing.map(|sp| SignedDuration {
761        ns: duration_ns_i128(raw) - duration_ns_i128(sp),
762    });
763    let effective_signed = adjusted_signed.unwrap_or(SignedDuration {
764        ns: duration_ns_i128(raw),
765    });
766
767    RttSample {
768        raw,
769        adjusted,
770        effective,
771        adjusted_signed,
772        effective_signed,
773    }
774}
775
/// Converts a duration to signed nanoseconds, saturating at `i128::MAX` if the
/// conversion fails.
fn duration_ns_i128(duration: Duration) -> i128 {
    match i128::try_from(duration.as_nanos()) {
        Ok(ns) => ns,
        Err(_) => i128::MAX,
    }
}
779
780fn compute_server_processing(ts: &TimestampFields) -> Option<Duration> {
781    if let (Some(recv_mono), Some(send_mono)) = (ts.recv_mono, ts.send_mono) {
782        let diff = send_mono.checked_sub(recv_mono)?;
783        return Some(Duration::from_nanos(u64::try_from(diff).ok()?));
784    }
785    if let (Some(recv_wall), Some(send_wall)) = (ts.recv_wall, ts.send_wall) {
786        let diff = send_wall.checked_sub(recv_wall)?;
787        return Some(Duration::from_nanos(u64::try_from(diff).ok()?));
788    }
789    None
790}
791
792fn build_server_timing(ts: &TimestampFields) -> Option<ServerTiming> {
793    if ts.recv_wall.is_none()
794        && ts.recv_mono.is_none()
795        && ts.send_wall.is_none()
796        && ts.send_mono.is_none()
797        && ts.midpoint_wall.is_none()
798        && ts.midpoint_mono.is_none()
799    {
800        return None;
801    }
802    Some(ServerTiming {
803        receive_wall_ns: ts.recv_wall,
804        receive_mono_ns: ts.recv_mono,
805        send_wall_ns: ts.send_wall,
806        send_mono_ns: ts.send_mono,
807        midpoint_wall_ns: ts.midpoint_wall,
808        midpoint_mono_ns: ts.midpoint_mono,
809        processing: compute_server_processing(ts),
810    })
811}
812
813fn compute_one_way(
814    sent_at: &ClientTimestamp,
815    received_at: &ClientTimestamp,
816    ts: &TimestampFields,
817) -> Option<OneWayDelaySample> {
818    let server_recv_wall = ts.recv_wall.or(ts.midpoint_wall);
819    let server_send_wall = ts.send_wall.or(ts.midpoint_wall);
820
821    let client_send_ns = unix_epoch_ns_i64(sent_at.wall);
822    let client_recv_ns = unix_epoch_ns_i64(received_at.wall);
823
824    let c2s = server_recv_wall
825        .zip(client_send_ns)
826        .and_then(|(srv, cli)| srv.checked_sub(cli))
827        .and_then(|d| u64::try_from(d).ok().map(Duration::from_nanos));
828    let s2c = client_recv_ns
829        .zip(server_send_wall)
830        .and_then(|(cli, srv)| cli.checked_sub(srv))
831        .and_then(|d| u64::try_from(d).ok().map(Duration::from_nanos));
832
833    if c2s.is_none() && s2c.is_none() {
834        return None;
835    }
836
837    Some(OneWayDelaySample {
838        client_to_server: c2s,
839        server_to_client: s2c,
840    })
841}
842
/// Returns `time` as nanoseconds since the Unix epoch, or `None` when `time`
/// is before the epoch or the value does not fit in `i64`.
fn unix_epoch_ns_i64(time: SystemTime) -> Option<i64> {
    let since_epoch = time.duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(since_epoch.as_nanos()).ok()
}
848
849fn build_received_stats(reply: &EchoReply) -> Option<ReceivedStatsSample> {
850    if reply.recv_count.is_none() && reply.recv_window.is_none() {
851        return None;
852    }
853    Some(ReceivedStatsSample {
854        count: reply.recv_count,
855        window: reply.recv_window,
856    })
857}
858
859fn params_from_config(config: &ClientConfig) -> Result<Params, ClientError> {
860    validate_protocol_config(config)?;
861    Ok(Params {
862        protocol_version: PROTOCOL_VERSION,
863        duration_ns: match config.duration {
864            Some(duration) => config_duration_to_ns("duration", duration)?,
865            None => 0,
866        },
867        interval_ns: config_duration_to_ns("interval", config.interval)?,
868        length: i64::from(config.length),
869        received_stats: config.received_stats,
870        stamp_at: config.stamp_at,
871        clock: config.clock,
872        dscp: i64::from(config.dscp),
873        server_fill: config.server_fill.clone().map(|value| ServerFill { value }),
874    })
875}
876
877fn validate_protocol_config(config: &ClientConfig) -> Result<(), ClientError> {
878    if config.duration == Some(Duration::ZERO) {
879        return Err(ClientError::InvalidConfig {
880            reason: "duration must be greater than zero; use None for continuous mode".to_owned(),
881        });
882    }
883    if config.interval == Duration::ZERO {
884        return Err(ClientError::InvalidConfig {
885            reason: "interval must be greater than zero".to_owned(),
886        });
887    }
888    if config.dscp > MAX_DSCP_CODEPOINT {
889        return Err(ClientError::InvalidConfig {
890            reason: format!("dscp must be <= {MAX_DSCP_CODEPOINT}"),
891        });
892    }
893    if config.length > MAX_UDP_PAYLOAD_LENGTH {
894        return Err(ClientError::InvalidConfig {
895            reason: format!("packet length must be <= {MAX_UDP_PAYLOAD_LENGTH}"),
896        });
897    }
898
899    if let Some(fill) = &config.server_fill {
900        let len = fill.len();
901        if len == 0 {
902            return Err(ClientError::InvalidConfig {
903                reason: "server_fill must not be empty".to_owned(),
904            });
905        }
906        if len > MAX_SERVER_FILL_BYTES {
907            return Err(ClientError::InvalidConfig {
908                reason: format!("server_fill must be <= {MAX_SERVER_FILL_BYTES} bytes, got {len}"),
909            });
910        }
911    }
912
913    Ok(())
914}
915
916fn config_duration_to_ns(field: &str, duration: Duration) -> Result<i64, ClientError> {
917    i64::try_from(duration.as_nanos()).map_err(|_| ClientError::InvalidConfig {
918        reason: format!("{field} is too large to encode as nanoseconds"),
919    })
920}
921
922fn negotiated_end_mono(start: Instant, duration_ns: i64) -> Result<Instant, ClientError> {
923    debug_assert!(duration_ns > 0);
924    let duration_ns = u64::try_from(duration_ns).expect("validated positive negotiated duration");
925    start
926        .checked_add(Duration::from_nanos(duration_ns))
927        .ok_or_else(|| ClientError::NegotiationRejected {
928            reason: "duration is too large to schedule".to_owned(),
929        })
930}
931
932#[cfg(test)]
933mod tests;