1use std::{
2 io,
3 net::{SocketAddr, UdpSocket},
4 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
5};
6
7use irtt_proto::{
8 close::CloseRequest, decode_echo_reply, echo_packet_len, encode_close_request,
9 encode_echo_request, encode_open_request, flags, EchoReply, EchoRequest, OpenReply,
10 OpenRequest, Params, ServerFill, TimestampFields, PROTOCOL_VERSION,
11};
12
13use crate::{
14 config::{
15 ClientConfig, RecvBudget, RunMode, MAX_DSCP_CODEPOINT, MAX_SERVER_FILL_BYTES,
16 MAX_UDP_PAYLOAD_LENGTH,
17 },
18 error::ClientError,
19 event::{
20 ClientEvent, OneWayDelaySample, OpenOutcome, ReceivedStatsSample, RttSample, ServerTiming,
21 SignedDuration, WarningKind,
22 },
23 metadata::ReceiveMeta,
24 probe::{CompletedSet, PendingMap, PendingProbe, TimedOutMap},
25 receive::recv_datagram,
26 session::{validate_negotiated_params, ActiveSession, ClientPhase, NegotiatedParams},
27 socket::{connect_udp_socket, resolve_remote, validate_open_timeouts},
28 socket_options::{apply_dscp_to_socket, clear_dscp_on_socket},
29 timing::ClientTimestamp,
30};
31
32const MAX_OPEN_PACKET_SIZE: usize = 512;
33const MIN_RECV_BUFFER_SIZE: usize = 2048;
34
35#[derive(Debug)]
36pub struct Client {
37 config: ClientConfig,
38 socket: UdpSocket,
39 remote: SocketAddr,
40 requested: Params,
41 negotiated: Option<NegotiatedParams>,
42 phase: ClientPhase,
43 session: Option<ActiveSession>,
44 recv_buffer: Vec<u8>,
45}
46
47impl Client {
48 pub fn connect(config: ClientConfig) -> Result<Self, ClientError> {
49 validate_open_timeouts(&config.open_timeouts)?;
50 if config.max_pending_probes == 0 {
51 return Err(ClientError::InvalidConfig {
52 reason: "max_pending_probes must be greater than zero".to_owned(),
53 });
54 }
55 if config.probe_timeout == Duration::ZERO {
56 return Err(ClientError::InvalidConfig {
57 reason: "probe_timeout must be greater than zero".to_owned(),
58 });
59 }
60 let remote = resolve_remote(&config)?;
61 let socket = connect_udp_socket(&config.socket_config, remote)?;
62 let requested = params_from_config(&config)?;
63
64 Ok(Self {
65 config,
66 socket,
67 remote,
68 requested,
69 negotiated: None,
70 phase: ClientPhase::Connected,
71 session: None,
72 recv_buffer: vec![0_u8; MIN_RECV_BUFFER_SIZE],
73 })
74 }
75
76 pub fn open(&mut self, now: ClientTimestamp) -> Result<OpenOutcome, ClientError> {
77 match self.phase {
78 ClientPhase::Connected => {}
79 ClientPhase::Open { .. } => return Err(ClientError::AlreadyOpen),
80 ClientPhase::Closed => return Err(ClientError::AlreadyClosed),
81 ClientPhase::NoTestCompleted => return Err(ClientError::AlreadyCompleted),
82 }
83
84 let outcome = self.open_inner(now);
85 let restore = self
86 .socket
87 .set_read_timeout(self.config.socket_config.recv_timeout);
88 match (outcome, restore) {
89 (Ok(outcome), Ok(())) => Ok(outcome),
90 (Ok(_), Err(source)) => Err(ClientError::ReadTimeoutRestore { source }),
91 (Err(err), Ok(())) => Err(err),
92 (Err(err), Err(_)) => Err(err),
93 }
94 }
95
96 fn open_inner(&mut self, _now: ClientTimestamp) -> Result<OpenOutcome, ClientError> {
97 let request = OpenRequest {
98 params: self.requested.clone(),
99 close: self.config.run_mode == RunMode::NoTest,
100 };
101 let packet = encode_open_request(&request, self.config.hmac_key.as_deref())?;
102 let mut buf = [0_u8; MAX_OPEN_PACKET_SIZE];
103
104 for timeout in &self.config.open_timeouts {
105 self.socket.set_read_timeout(Some(*timeout))?;
106 self.socket.send(&packet)?;
107
108 match self.socket.recv(&mut buf) {
109 Ok(size) => {
110 let reply = irtt_proto::decode_open_reply(
111 &buf[..size],
112 self.config.hmac_key.as_deref(),
113 )?;
114 return self.accept_open_reply(reply, ClientTimestamp::now());
115 }
116 Err(err)
117 if matches!(
118 err.kind(),
119 io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
120 ) => {}
121 Err(err) => return Err(ClientError::Socket(err)),
122 }
123 }
124
125 Err(ClientError::OpenTimeout)
126 }
127
128 pub fn close(&mut self, now: ClientTimestamp) -> Result<Vec<ClientEvent>, ClientError> {
129 let token = match self.phase {
130 ClientPhase::Open { token } => token,
131 ClientPhase::Closed => return Err(ClientError::AlreadyClosed),
132 ClientPhase::Connected | ClientPhase::NoTestCompleted => {
133 return Err(ClientError::NotOpen)
134 }
135 };
136
137 clear_dscp_on_socket(&self.socket, self.remote)?;
138 let packet =
139 encode_close_request(&CloseRequest { token }, self.config.hmac_key.as_deref())?;
140 self.socket.send(&packet)?;
141 self.phase = ClientPhase::Closed;
142 if let Some(session) = self.session.as_mut() {
143 session.timed_out.clear();
144 }
145 self.session = None;
146
147 Ok(vec![ClientEvent::SessionClosed {
148 remote: self.remote,
149 token,
150 at: now,
151 }])
152 }
153
154 pub fn next_send_deadline(&self) -> Option<Instant> {
155 let session = self.session.as_ref()?;
156 if session.sending_done {
157 return None;
158 }
159 Some(session.next_send_at)
160 }
161
162 pub fn probe_timeout(&self) -> Duration {
163 self.config.probe_timeout
164 }
165
166 pub fn send_probe(&mut self) -> Result<Vec<ClientEvent>, ClientError> {
167 self.send_probe_inner(None)
168 }
169
170 pub fn recv_once(&mut self) -> Result<Vec<ClientEvent>, ClientError> {
171 self.recv_once_inner(None)
172 }
173
174 fn send_probe_inner(
175 &mut self,
176 override_ts: Option<ClientTimestamp>,
177 ) -> Result<Vec<ClientEvent>, ClientError> {
178 let token = match self.phase {
179 ClientPhase::Open { token } => token,
180 ClientPhase::Closed => return Err(ClientError::AlreadyClosed),
181 ClientPhase::Connected => return Err(ClientError::NotOpen),
182 ClientPhase::NoTestCompleted => return Err(ClientError::AlreadyCompleted),
183 };
184
185 let session = self
186 .session
187 .as_mut()
188 .expect("session must exist when phase is Open");
189
190 if session.sending_done {
191 return Ok(vec![]);
192 }
193
194 let now = override_ts.unwrap_or_else(ClientTimestamp::now);
195
196 if let Some(end) = session.end_mono {
197 if now.mono >= end {
198 session.sending_done = true;
199 return Ok(vec![]);
200 }
201 }
202
203 session.pending.check_capacity()?;
204
205 let negotiated = self
206 .negotiated
207 .as_ref()
208 .expect("negotiated must exist when Open");
209
210 let wire_seq = session.next_wire_seq;
211 let logical_seq = session.next_logical_seq;
212 let scheduled_at = session.next_send_at;
213
214 let request = EchoRequest {
215 token,
216 sequence: wire_seq,
217 params: negotiated.params.clone(),
218 payload: vec![],
219 };
220 let packet = encode_echo_request(&request, self.config.hmac_key.as_deref())?;
221 let sent_at = override_ts.unwrap_or_else(ClientTimestamp::now);
222 let send_call_start = Instant::now();
223 let bytes = self.socket.send(&packet)?;
224 let send_call = send_call_start.elapsed();
225 let timer_error = instant_abs_diff(sent_at.mono, scheduled_at);
226
227 let session = self
228 .session
229 .as_mut()
230 .expect("session must exist when phase is Open");
231
232 let pending = PendingProbe {
233 logical_seq,
234 wire_seq,
235 sent_at,
236 timeout_at: sent_at
237 .mono
238 .checked_add(self.config.probe_timeout)
239 .ok_or(ClientError::DurationOverflow)?,
240 };
241 session.pending.insert(pending)?;
242
243 session.next_wire_seq = session.next_wire_seq.wrapping_add(1);
244 session.next_logical_seq =
245 session
246 .next_logical_seq
247 .checked_add(1)
248 .ok_or(ClientError::CounterOverflow {
249 counter: "next_logical_seq",
250 })?;
251 session.packets_sent =
252 session
253 .packets_sent
254 .checked_add(1)
255 .ok_or(ClientError::CounterOverflow {
256 counter: "packets_sent",
257 })?;
258
259 let negotiated = self
260 .negotiated
261 .as_ref()
262 .expect("negotiated must exist when Open");
263 let interval_ns = u64::try_from(negotiated.params.interval_ns)
264 .expect("validated positive negotiated interval");
265 let session = self
266 .session
267 .as_mut()
268 .expect("session must exist when phase is Open");
269 session.next_send_at =
270 next_probe_deadline(session.start_mono, interval_ns, session.packets_sent)?;
271
272 if let Some(end) = session.end_mono {
273 if session.next_send_at >= end {
274 session.sending_done = true;
275 }
276 }
277
278 Ok(vec![ClientEvent::EchoSent {
279 seq: wire_seq,
280 logical_seq,
281 remote: self.remote,
282 scheduled_at,
283 sent_at,
284 bytes,
285 send_call,
286 timer_error,
287 }])
288 }
289
290 fn recv_once_inner(
291 &mut self,
292 override_ts: Option<ClientTimestamp>,
293 ) -> Result<Vec<ClientEvent>, ClientError> {
294 match self.phase {
295 ClientPhase::Open { .. } => {}
296 ClientPhase::Closed => return Err(ClientError::AlreadyClosed),
297 ClientPhase::Connected => return Err(ClientError::NotOpen),
298 ClientPhase::NoTestCompleted => return Err(ClientError::AlreadyCompleted),
299 }
300
301 let datagram = match recv_datagram(&self.socket, &mut self.recv_buffer) {
302 Ok(datagram) => datagram,
303 Err(err)
304 if matches!(
305 err.kind(),
306 io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
307 ) =>
308 {
309 return Ok(vec![]);
310 }
311 Err(err) => return Err(ClientError::Socket(err)),
312 };
313
314 let now = override_ts.unwrap_or(datagram.received_at);
315 let packet_len = datagram.len;
316 let Some(reply) = self.decode_received_packet(&self.recv_buffer[..packet_len]) else {
317 return Ok(vec![ClientEvent::Warning {
318 kind: WarningKind::MalformedOrUnrelatedPacket,
319 message: "dropped malformed or unrelated packet".to_owned(),
320 }]);
321 };
322 self.process_echo_reply(reply, packet_len, now, datagram.meta)
323 }
324
325 pub fn recv_available(&mut self, budget: RecvBudget) -> Result<Vec<ClientEvent>, ClientError> {
326 let mut all_events = Vec::new();
327 for _ in 0..budget.max_packets {
328 let events = self.recv_once()?;
329 if events.is_empty() {
330 break;
331 }
332 all_events.extend(events);
333 }
334 Ok(all_events)
335 }
336
337 #[cfg(test)]
338 fn recv_buffer_size(&self) -> usize {
339 recv_buffer_size(self.config.hmac_key.is_some(), self.negotiated.as_ref())
340 }
341
342 pub fn poll_timeouts(&mut self, now: ClientTimestamp) -> Result<Vec<ClientEvent>, ClientError> {
343 match self.phase {
344 ClientPhase::Open { .. } => {}
345 ClientPhase::Closed => return Err(ClientError::AlreadyClosed),
346 ClientPhase::Connected => return Err(ClientError::NotOpen),
347 ClientPhase::NoTestCompleted => return Err(ClientError::AlreadyCompleted),
348 }
349
350 let session = self
351 .session
352 .as_mut()
353 .expect("session must exist when phase is Open");
354
355 let expired = session.pending.drain_expired(now.mono);
356 let mut events = Vec::with_capacity(expired.len());
357 for probe in expired {
358 events.push(ClientEvent::EchoLoss {
359 seq: probe.wire_seq,
360 logical_seq: probe.logical_seq,
361 sent_at: probe.sent_at,
362 timeout_at: probe.timeout_at,
363 });
364 session.timed_out.insert(probe);
365 }
366
367 Ok(events)
368 }
369
370 pub fn is_run_complete(&self) -> bool {
371 let Some(session) = self.session.as_ref() else {
372 return matches!(
373 self.phase,
374 ClientPhase::Closed | ClientPhase::NoTestCompleted
375 );
376 };
377 session.sending_done && session.pending.len() == 0
378 }
379
380 pub(crate) fn has_timed_out_metadata(&self) -> bool {
381 self.session
382 .as_ref()
383 .is_some_and(|session| session.timed_out.len() > 0)
384 }
385
386 pub(crate) fn packets_sent(&self) -> u64 {
387 self.session
388 .as_ref()
389 .map_or(0, |session| session.packets_sent)
390 }
391
392 #[cfg(test)]
393 fn process_received_packet(
394 &mut self,
395 packet: &[u8],
396 now: ClientTimestamp,
397 meta: ReceiveMeta,
398 ) -> Result<Vec<ClientEvent>, ClientError> {
399 let packet_len = packet.len();
400 let Some(reply) = self.decode_received_packet(packet) else {
401 return Ok(vec![ClientEvent::Warning {
402 kind: WarningKind::MalformedOrUnrelatedPacket,
403 message: "dropped malformed or unrelated packet".to_owned(),
404 }]);
405 };
406 self.process_echo_reply(reply, packet_len, now, meta)
407 }
408
409 fn decode_received_packet(&self, packet: &[u8]) -> Option<EchoReply> {
410 let negotiated = self
411 .negotiated
412 .as_ref()
413 .expect("negotiated must exist when Open");
414
415 decode_echo_reply(packet, &negotiated.params, self.config.hmac_key.as_deref()).ok()
416 }
417
418 fn process_echo_reply(
419 &mut self,
420 reply: EchoReply,
421 packet_len: usize,
422 now: ClientTimestamp,
423 meta: ReceiveMeta,
424 ) -> Result<Vec<ClientEvent>, ClientError> {
425 let token = match self.phase {
426 ClientPhase::Open { token } => token,
427 _ => unreachable!(),
428 };
429 if reply.token != token {
430 return Ok(vec![ClientEvent::Warning {
431 kind: WarningKind::WrongToken,
432 message: format!(
433 "dropped reply with wrong token: expected {token:#x}, got {:#x}",
434 reply.token
435 ),
436 }]);
437 }
438
439 let session = self.session.as_mut().expect("session must exist when Open");
440
441 let wire_seq = reply.sequence;
442
443 if let Some(pending) = session.pending.remove(wire_seq) {
444 let rtt = compute_rtt(&pending.sent_at, &now, &reply.timestamps);
445 let server_timing = build_server_timing(&reply.timestamps);
446 let one_way = compute_one_way(&pending.sent_at, &now, &reply.timestamps);
447 let received_stats = build_received_stats(&reply);
448 let is_late = session
449 .highest_received_seq
450 .is_some_and(|h| sequence_is_before(wire_seq, h));
451 let highest_seen = session.highest_received_seq.unwrap_or(wire_seq);
452
453 update_highest_received(session, wire_seq);
454 session.completed.insert(wire_seq, pending.logical_seq);
455
456 let mut events = Vec::new();
457 if is_late {
458 events.push(ClientEvent::LateReply {
459 seq: wire_seq,
460 logical_seq: Some(pending.logical_seq),
461 highest_seen,
462 remote: self.remote,
463 sent_at: Some(pending.sent_at),
464 received_at: now,
465 rtt: Some(rtt),
466 server_timing,
467 one_way,
468 received_stats,
469 bytes: packet_len,
470 packet_meta: meta.into(),
471 });
472 } else {
473 events.push(ClientEvent::EchoReply {
474 seq: wire_seq,
475 logical_seq: pending.logical_seq,
476 remote: self.remote,
477 sent_at: pending.sent_at,
478 received_at: now,
479 rtt,
480 server_timing,
481 one_way,
482 received_stats,
483 bytes: packet_len,
484 packet_meta: meta.into(),
485 });
486 }
487 if flags::has(reply.flags, flags::FLAG_CLOSE) {
488 self.close_from_peer(token, now, &mut events)?;
489 }
490 Ok(events)
491 } else if session.completed.contains(wire_seq) {
492 update_highest_received(session, wire_seq);
493 Ok(vec![ClientEvent::DuplicateReply {
494 seq: wire_seq,
495 remote: self.remote,
496 received_at: now,
497 bytes: packet_len,
498 }])
499 } else if let Some(timed_out) = session.timed_out.remove(wire_seq) {
500 let rtt = compute_rtt(&timed_out.sent_at, &now, &reply.timestamps);
501 let server_timing = build_server_timing(&reply.timestamps);
502 let one_way = compute_one_way(&timed_out.sent_at, &now, &reply.timestamps);
503 let received_stats = build_received_stats(&reply);
504 let highest_seen = session.highest_received_seq.unwrap_or(wire_seq);
505 update_highest_received(session, wire_seq);
506 session.completed.insert(wire_seq, timed_out.logical_seq);
507
508 let mut events = vec![ClientEvent::LateReply {
509 seq: wire_seq,
510 logical_seq: Some(timed_out.logical_seq),
511 highest_seen,
512 remote: self.remote,
513 sent_at: Some(timed_out.sent_at),
514 received_at: now,
515 rtt: Some(rtt),
516 server_timing,
517 one_way,
518 received_stats,
519 bytes: packet_len,
520 packet_meta: meta.into(),
521 }];
522 if flags::has(reply.flags, flags::FLAG_CLOSE) {
523 self.close_from_peer(token, now, &mut events)?;
524 }
525 Ok(events)
526 } else if session
527 .highest_received_seq
528 .is_some_and(|h| sequence_is_before(wire_seq, h))
529 {
530 Ok(vec![ClientEvent::LateReply {
531 seq: wire_seq,
532 logical_seq: None,
533 highest_seen: session.highest_received_seq.unwrap(),
534 remote: self.remote,
535 sent_at: None,
536 received_at: now,
537 rtt: None,
538 server_timing: build_server_timing(&reply.timestamps),
539 one_way: None,
540 received_stats: build_received_stats(&reply),
541 bytes: packet_len,
542 packet_meta: meta.into(),
543 }])
544 } else {
545 Ok(vec![ClientEvent::Warning {
546 kind: WarningKind::UntrackedReply,
547 message: format!(
548 "dropped reply with untracked seq {wire_seq} (no pending or completed entry)"
549 ),
550 }])
551 }
552 }
553
554 fn close_from_peer(
555 &mut self,
556 token: u64,
557 now: ClientTimestamp,
558 events: &mut Vec<ClientEvent>,
559 ) -> Result<(), ClientError> {
560 clear_dscp_on_socket(&self.socket, self.remote)?;
561 self.phase = ClientPhase::Closed;
562 if let Some(session) = self.session.as_mut() {
563 session.timed_out.clear();
564 }
565 self.session = None;
566 events.push(ClientEvent::SessionClosed {
567 remote: self.remote,
568 token,
569 at: now,
570 });
571 Ok(())
572 }
573
574 fn accept_open_reply(
575 &mut self,
576 reply: OpenReply,
577 now: ClientTimestamp,
578 ) -> Result<OpenOutcome, ClientError> {
579 if reply.params.protocol_version != PROTOCOL_VERSION {
580 return Err(ClientError::ProtocolVersionMismatch {
581 requested: PROTOCOL_VERSION,
582 received: reply.params.protocol_version,
583 });
584 }
585
586 let reply_is_close = flags::has(reply.flags, flags::FLAG_CLOSE);
587 match self.config.run_mode {
588 RunMode::Normal if reply_is_close => Err(ClientError::ServerRejected),
589 RunMode::Normal if reply.token == 0 => Err(ClientError::ZeroToken),
590 RunMode::Normal => self.accept_normal_open(reply, now),
591 RunMode::NoTest if !reply_is_close => Err(ClientError::UnexpectedNoTestReply),
592 RunMode::NoTest if reply.token != 0 => {
593 Err(ClientError::NonZeroNoTestToken { token: reply.token })
594 }
595 RunMode::NoTest => self.accept_no_test_open(reply, now),
596 }
597 }
598
599 fn accept_normal_open(
600 &mut self,
601 reply: OpenReply,
602 now: ClientTimestamp,
603 ) -> Result<OpenOutcome, ClientError> {
604 validate_negotiated_params(
605 &self.requested,
606 &reply.params,
607 self.config.negotiation_policy,
608 )?;
609 let negotiated = NegotiatedParams {
610 params: reply.params.clone(),
611 };
612 let recv_buffer_size = recv_buffer_size(self.config.hmac_key.is_some(), Some(&negotiated));
613 let negotiated_dscp =
614 u8::try_from(negotiated.params.dscp).map_err(|_| ClientError::InvalidConfig {
615 reason: "negotiated dscp must be in range 0..=63".to_owned(),
616 })?;
617 apply_dscp_to_socket(&self.socket, self.remote, negotiated_dscp)?;
618 self.negotiated = Some(negotiated.clone());
619 self.recv_buffer.resize(recv_buffer_size, 0);
620 self.phase = ClientPhase::Open { token: reply.token };
621
622 let end_mono = if negotiated.params.duration_ns > 0 {
623 Some(negotiated_end_mono(
624 now.mono,
625 negotiated.params.duration_ns,
626 )?)
627 } else {
628 None
629 };
630
631 self.session = Some(ActiveSession {
632 next_wire_seq: 0,
633 next_logical_seq: 0,
634 highest_received_seq: None,
635 packets_sent: 0,
636 start_mono: now.mono,
637 end_mono,
638 next_send_at: now.mono,
639 pending: PendingMap::new(self.config.max_pending_probes),
640 timed_out: TimedOutMap::new(self.config.max_pending_probes),
641 completed: CompletedSet::new(self.config.max_pending_probes),
642 sending_done: false,
643 });
644
645 let event = ClientEvent::SessionStarted {
646 remote: self.remote,
647 token: reply.token,
648 negotiated: negotiated.clone(),
649 at: now,
650 };
651
652 Ok(OpenOutcome::Started {
653 remote: self.remote,
654 token: reply.token,
655 negotiated,
656 event,
657 })
658 }
659
660 fn accept_no_test_open(
661 &mut self,
662 reply: OpenReply,
663 now: ClientTimestamp,
664 ) -> Result<OpenOutcome, ClientError> {
665 validate_negotiated_params(
666 &self.requested,
667 &reply.params,
668 self.config.negotiation_policy,
669 )?;
670 let negotiated = NegotiatedParams {
671 params: reply.params.clone(),
672 };
673 self.negotiated = Some(negotiated.clone());
674 self.phase = ClientPhase::NoTestCompleted;
675 let event = ClientEvent::NoTestCompleted {
676 remote: self.remote,
677 negotiated: negotiated.clone(),
678 at: now,
679 };
680 Ok(OpenOutcome::NoTestCompleted {
681 remote: self.remote,
682 negotiated,
683 event,
684 })
685 }
686}
687
688#[cfg(test)]
689impl Client {
690 fn send_probe_at(&mut self, ts: ClientTimestamp) -> Result<Vec<ClientEvent>, ClientError> {
691 self.send_probe_inner(Some(ts))
692 }
693
694 fn recv_once_at(&mut self, ts: ClientTimestamp) -> Result<Vec<ClientEvent>, ClientError> {
695 self.recv_once_inner(Some(ts))
696 }
697}
698
699fn update_highest_received(session: &mut ActiveSession, wire_seq: u32) {
700 session.highest_received_seq = Some(session.highest_received_seq.map_or(wire_seq, |h| {
701 if sequence_is_after(wire_seq, h) {
702 wire_seq
703 } else {
704 h
705 }
706 }));
707}
708
709fn next_probe_deadline(
710 start: Instant,
711 interval_ns: u64,
712 packets_sent: u64,
713) -> Result<Instant, ClientError> {
714 let offset_ns = interval_ns
715 .checked_mul(packets_sent)
716 .ok_or(ClientError::DurationOverflow)?;
717 start
718 .checked_add(Duration::from_nanos(offset_ns))
719 .ok_or(ClientError::DurationOverflow)
720}
721
722fn sequence_is_after(candidate: u32, current: u32) -> bool {
723 candidate != current && candidate.wrapping_sub(current) < (1 << 31)
724}
725
726fn sequence_is_before(candidate: u32, current: u32) -> bool {
727 current != candidate && current.wrapping_sub(candidate) < (1 << 31)
728}
729
730fn instant_abs_diff(left: Instant, right: Instant) -> Duration {
731 left.checked_duration_since(right)
732 .or_else(|| right.checked_duration_since(left))
733 .unwrap_or(Duration::ZERO)
734}
735
736fn recv_buffer_size(has_hmac: bool, negotiated: Option<&NegotiatedParams>) -> usize {
737 match negotiated {
738 Some(negotiated) => echo_packet_len(has_hmac, &negotiated.params)
739 .saturating_add(1)
740 .max(MIN_RECV_BUFFER_SIZE),
741 None => MIN_RECV_BUFFER_SIZE,
742 }
743}
744
745fn compute_rtt(
746 sent_at: &ClientTimestamp,
747 received_at: &ClientTimestamp,
748 ts: &TimestampFields,
749) -> RttSample {
750 let raw = received_at
751 .mono
752 .checked_duration_since(sent_at.mono)
753 .unwrap_or(Duration::ZERO);
754
755 let server_processing = compute_server_processing(ts);
756
757 let adjusted = server_processing.and_then(|sp| raw.checked_sub(sp));
758
759 let effective = adjusted.unwrap_or(raw);
760 let adjusted_signed = server_processing.map(|sp| SignedDuration {
761 ns: duration_ns_i128(raw) - duration_ns_i128(sp),
762 });
763 let effective_signed = adjusted_signed.unwrap_or(SignedDuration {
764 ns: duration_ns_i128(raw),
765 });
766
767 RttSample {
768 raw,
769 adjusted,
770 effective,
771 adjusted_signed,
772 effective_signed,
773 }
774}
775
776fn duration_ns_i128(duration: Duration) -> i128 {
777 i128::try_from(duration.as_nanos()).unwrap_or(i128::MAX)
778}
779
780fn compute_server_processing(ts: &TimestampFields) -> Option<Duration> {
781 if let (Some(recv_mono), Some(send_mono)) = (ts.recv_mono, ts.send_mono) {
782 let diff = send_mono.checked_sub(recv_mono)?;
783 return Some(Duration::from_nanos(u64::try_from(diff).ok()?));
784 }
785 if let (Some(recv_wall), Some(send_wall)) = (ts.recv_wall, ts.send_wall) {
786 let diff = send_wall.checked_sub(recv_wall)?;
787 return Some(Duration::from_nanos(u64::try_from(diff).ok()?));
788 }
789 None
790}
791
792fn build_server_timing(ts: &TimestampFields) -> Option<ServerTiming> {
793 if ts.recv_wall.is_none()
794 && ts.recv_mono.is_none()
795 && ts.send_wall.is_none()
796 && ts.send_mono.is_none()
797 && ts.midpoint_wall.is_none()
798 && ts.midpoint_mono.is_none()
799 {
800 return None;
801 }
802 Some(ServerTiming {
803 receive_wall_ns: ts.recv_wall,
804 receive_mono_ns: ts.recv_mono,
805 send_wall_ns: ts.send_wall,
806 send_mono_ns: ts.send_mono,
807 midpoint_wall_ns: ts.midpoint_wall,
808 midpoint_mono_ns: ts.midpoint_mono,
809 processing: compute_server_processing(ts),
810 })
811}
812
813fn compute_one_way(
814 sent_at: &ClientTimestamp,
815 received_at: &ClientTimestamp,
816 ts: &TimestampFields,
817) -> Option<OneWayDelaySample> {
818 let server_recv_wall = ts.recv_wall.or(ts.midpoint_wall);
819 let server_send_wall = ts.send_wall.or(ts.midpoint_wall);
820
821 let client_send_ns = unix_epoch_ns_i64(sent_at.wall);
822 let client_recv_ns = unix_epoch_ns_i64(received_at.wall);
823
824 let c2s = server_recv_wall
825 .zip(client_send_ns)
826 .and_then(|(srv, cli)| srv.checked_sub(cli))
827 .and_then(|d| u64::try_from(d).ok().map(Duration::from_nanos));
828 let s2c = client_recv_ns
829 .zip(server_send_wall)
830 .and_then(|(cli, srv)| cli.checked_sub(srv))
831 .and_then(|d| u64::try_from(d).ok().map(Duration::from_nanos));
832
833 if c2s.is_none() && s2c.is_none() {
834 return None;
835 }
836
837 Some(OneWayDelaySample {
838 client_to_server: c2s,
839 server_to_client: s2c,
840 })
841}
842
843fn unix_epoch_ns_i64(time: SystemTime) -> Option<i64> {
844 time.duration_since(UNIX_EPOCH)
845 .ok()
846 .and_then(|duration| i64::try_from(duration.as_nanos()).ok())
847}
848
849fn build_received_stats(reply: &EchoReply) -> Option<ReceivedStatsSample> {
850 if reply.recv_count.is_none() && reply.recv_window.is_none() {
851 return None;
852 }
853 Some(ReceivedStatsSample {
854 count: reply.recv_count,
855 window: reply.recv_window,
856 })
857}
858
859fn params_from_config(config: &ClientConfig) -> Result<Params, ClientError> {
860 validate_protocol_config(config)?;
861 Ok(Params {
862 protocol_version: PROTOCOL_VERSION,
863 duration_ns: match config.duration {
864 Some(duration) => config_duration_to_ns("duration", duration)?,
865 None => 0,
866 },
867 interval_ns: config_duration_to_ns("interval", config.interval)?,
868 length: i64::from(config.length),
869 received_stats: config.received_stats,
870 stamp_at: config.stamp_at,
871 clock: config.clock,
872 dscp: i64::from(config.dscp),
873 server_fill: config.server_fill.clone().map(|value| ServerFill { value }),
874 })
875}
876
877fn validate_protocol_config(config: &ClientConfig) -> Result<(), ClientError> {
878 if config.duration == Some(Duration::ZERO) {
879 return Err(ClientError::InvalidConfig {
880 reason: "duration must be greater than zero; use None for continuous mode".to_owned(),
881 });
882 }
883 if config.interval == Duration::ZERO {
884 return Err(ClientError::InvalidConfig {
885 reason: "interval must be greater than zero".to_owned(),
886 });
887 }
888 if config.dscp > MAX_DSCP_CODEPOINT {
889 return Err(ClientError::InvalidConfig {
890 reason: format!("dscp must be <= {MAX_DSCP_CODEPOINT}"),
891 });
892 }
893 if config.length > MAX_UDP_PAYLOAD_LENGTH {
894 return Err(ClientError::InvalidConfig {
895 reason: format!("packet length must be <= {MAX_UDP_PAYLOAD_LENGTH}"),
896 });
897 }
898
899 if let Some(fill) = &config.server_fill {
900 let len = fill.len();
901 if len == 0 {
902 return Err(ClientError::InvalidConfig {
903 reason: "server_fill must not be empty".to_owned(),
904 });
905 }
906 if len > MAX_SERVER_FILL_BYTES {
907 return Err(ClientError::InvalidConfig {
908 reason: format!("server_fill must be <= {MAX_SERVER_FILL_BYTES} bytes, got {len}"),
909 });
910 }
911 }
912
913 Ok(())
914}
915
916fn config_duration_to_ns(field: &str, duration: Duration) -> Result<i64, ClientError> {
917 i64::try_from(duration.as_nanos()).map_err(|_| ClientError::InvalidConfig {
918 reason: format!("{field} is too large to encode as nanoseconds"),
919 })
920}
921
922fn negotiated_end_mono(start: Instant, duration_ns: i64) -> Result<Instant, ClientError> {
923 debug_assert!(duration_ns > 0);
924 let duration_ns = u64::try_from(duration_ns).expect("validated positive negotiated duration");
925 start
926 .checked_add(Duration::from_nanos(duration_ns))
927 .ok_or_else(|| ClientError::NegotiationRejected {
928 reason: "duration is too large to schedule".to_owned(),
929 })
930}
931
932#[cfg(test)]
933mod tests;