1use crate::config::{
2 RuntimeMemoryConfig, runtime_reliable_max_end_to_end_ack_cache,
3 runtime_reliable_max_end_to_end_pending, runtime_reliable_max_pending,
4 runtime_reliable_max_retries, runtime_reliable_max_return_routes,
5 runtime_reliable_retransmit_ms,
6};
7use crate::diagnostics::{
8 AdaptiveLinkStats, DiscoveryRuntimeStats, QueueRuntimeStats, ReliableRuntimeStats,
9 RouteModeStats, RouteOverrideStats, RoutePriorityStats, RouteWeightStats, RuntimeSideStats,
10 RuntimeStatsSnapshot, RuntimeTypeStats, TypedRouteOverrideStats,
11};
12#[cfg(feature = "discovery")]
13use crate::discovery::{
14 self, ClientStatsSnapshot, DISCOVERY_ROUTE_TTL_MS, DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS,
15 DISCOVERY_SLOW_LINK_PING_INTERVAL_MS, DiscoveryCadenceState,
16 TIMESYNC_SLOW_LINK_MIN_INTERVAL_MS, TopologyAnnouncerRoute, TopologyBoardNode,
17 TopologySideRoute, TopologySnapshot,
18};
19use crate::packet::{Packet, hash_bytes_u64};
20use crate::queue::{BoundedDeque, ByteCost};
21use crate::wire_format;
22use crate::{is_reliable_type, message_meta, message_priority, reliable_mode};
23use crate::{
24 router::{Clock, CompactTimestampOmissionPolicy, SideTransportProfile},
25 {
26 RouteSelectionMode, TelemetryError, TelemetryResult,
27 lock::{ReentryGate, ReentryGuard, RouterMutex},
28 },
29};
30use alloc::borrow::ToOwned;
31use alloc::boxed::Box;
32use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
33use alloc::string::{String, ToString};
34use alloc::{sync::Arc, vec, vec::Vec};
35use core::mem::size_of;
36use crc32fast::Hasher as Crc32Hasher;
37
/// Identifier of a relay side (a plain `usize`).
pub type RelaySideId = usize;

// --- Side-transport framing constants -------------------------------------
// NOTE(review): the encoder/decoder that consume these values are outside
// this view; the per-constant notes below describe only what the values and
// names themselves establish.

/// Three-byte magic `b"SDT"` associated with side-transport frames.
const SIDE_TRANSPORT_MAGIC: &[u8; 3] = b"SDT";
/// Frame-kind code `0x01` ("full").
const SIDE_TRANSPORT_KIND_FULL: u8 = 0x01;
/// Frame-kind code `0x02` ("compact").
const SIDE_TRANSPORT_KIND_COMPACT: u8 = 0x02;
/// Frame-kind code `0x03` ("chunk").
const SIDE_TRANSPORT_KIND_CHUNK: u8 = 0x03;
/// Frame-kind code `0x04` ("compact delta").
const SIDE_TRANSPORT_KIND_COMPACT_DELTA: u8 = 0x04;
/// Frame-kind code `0x05` ("compact, same timestamp").
const SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP: u8 = 0x05;
/// Flag bit `0x01`: payload compressed.
const SIDE_TRANSPORT_FLAG_PAYLOAD_COMPRESSED: u8 = 0x01;
/// Flag bit `0x04`: wire contract.
const SIDE_TRANSPORT_FLAG_WIRE_CONTRACT: u8 = 0x04;
/// Flag bit `0x08`: packet nonce.
const SIDE_TRANSPORT_FLAG_PACKET_NONCE: u8 = 0x08;
/// Flag bit `0x20`: endpoint bitmap present.
const SIDE_TRANSPORT_FLAG_ENDPOINT_BITMAP_PRESENT: u8 = 0x20;
/// Flag bit `0x40`: compact reliable header.
const SIDE_TRANSPORT_FLAG_COMPACT_RELIABLE_HEADER: u8 = 0x40;
/// Bandwidth samples at or below this value (and above zero) are recorded as
/// "slow" observations by `AdaptiveRouteStats::observe`.
const CONTROL_SLOW_LINK_CAPACITY_BPS: u64 = 512;
/// Fixed per-chunk overhead: `3 + 1 + 4 + 2 + 2` bytes plus the CRC32 width.
/// NOTE(review): the terms presumably map to magic, kind, chunk id, chunk
/// index and chunk total — confirm against the chunk encoder.
const SIDE_TRANSPORT_CHUNK_OVERHEAD: usize = 3 + 1 + 4 + 2 + 2 + wire_format::CRC32_BYTES;
/// Number of bits in an endpoint bitmap: one per endpoint value from `0` to
/// `crate::MAX_VALUE_DATA_ENDPOINT` inclusive.
const SIDE_TRANSPORT_EP_BITMAP_BITS: usize = (crate::MAX_VALUE_DATA_ENDPOINT as usize) + 1;
/// Endpoint bitmap size in bytes (bit count rounded up to whole bytes).
const SIDE_TRANSPORT_EP_BITMAP_BYTES: usize = SIDE_TRANSPORT_EP_BITMAP_BITS.div_ceil(8);
/// Re-export of the router's IPv4-like compact header target size.
pub const IPV4_LIKE_COMPACT_HEADER_TARGET_BYTES: usize =
    crate::router::IPV4_LIKE_COMPACT_HEADER_TARGET_BYTES;
/// Re-export of the router's IPv6-like compact header target size.
pub const IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES: usize =
    crate::router::IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES;
/// Re-export of the router's default side-transport template limit.
pub const DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT: usize =
    crate::router::DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT;
/// Thread-safe callback that receives a decoded [`Packet`] by reference.
type PacketHandlerFn = dyn Fn(&Packet) -> TelemetryResult<()> + Send + Sync + 'static;

/// Thread-safe callback that receives a byte slice.
type PackedHandlerFn = dyn Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static;
66
/// Transmit callback stored on a [`RelaySide`], in one of two shapes.
///
/// Cloning only bumps the `Arc` reference count of the wrapped callback.
#[derive(Clone)]
pub enum RelayTxHandlerFn {
    /// Callback invoked with a byte slice.
    Packed(Arc<PackedHandlerFn>),
    /// Callback invoked with a `&Packet`.
    Packet(Arc<PacketHandlerFn>),
}
73
/// Per-side configuration flags and limits.
///
/// The `with_*` builder methods on this type switch on combinations of these
/// fields; `link_capabilities` (discovery builds) advertises them.
#[derive(Clone, Copy, Debug)]
pub struct RelaySideOptions {
    /// Advertised as the link-level reliability capability when `true`.
    pub reliable_enabled: bool,
    /// NOTE(review): consumer is outside this view — presumably marks the
    /// side as link-local only; confirm.
    pub link_local_enabled: bool,
    /// Defaults to `true`. NOTE(review): presumably gates traffic received
    /// from this side — confirm against the RX path.
    pub ingress_enabled: bool,
    /// Defaults to `true`. NOTE(review): presumably gates traffic sent to
    /// this side — confirm against the TX path.
    pub egress_enabled: bool,
    /// Set by every `with_*` builder; advertised as the header-template
    /// capability.
    pub header_template_enabled: bool,
    /// Maximum frame size; `0` means the chunking capability is not
    /// advertised.
    pub max_frame_bytes: usize,
    /// Target size for compact headers (`0` by default).
    pub compact_header_target_bytes: usize,
    /// Upper bound on cached side-transport templates.
    pub max_side_transport_templates: usize,
    /// Enables omission of unchanged compact timestamps for the side.
    pub omit_unchanged_compact_timestamps: bool,
    /// Per-data-type policy for compact timestamp omission.
    pub compact_timestamp_omission_types: CompactTimestampOmissionPolicy,
    /// Requested transport profile; see `effective_transport_profile` for how
    /// it is resolved.
    pub side_transport_profile: SideTransportProfile,
}
108
109impl Default for RelaySideOptions {
110 fn default() -> Self {
111 Self {
112 reliable_enabled: false,
113 link_local_enabled: false,
114 ingress_enabled: true,
115 egress_enabled: true,
116 header_template_enabled: false,
117 max_frame_bytes: 0,
118 compact_header_target_bytes: 0,
119 max_side_transport_templates: DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT,
120 omit_unchanged_compact_timestamps: false,
121 compact_timestamp_omission_types: CompactTimestampOmissionPolicy::none(),
122 side_transport_profile: SideTransportProfile::Canonical,
123 }
124 }
125}
126
127impl RelaySideOptions {
128 #[inline]
133 pub fn with_small_packet_transport(mut self, max_frame_bytes: usize) -> Self {
134 self.header_template_enabled = true;
135 self.max_frame_bytes = max_frame_bytes;
136 self.compact_header_target_bytes = IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES;
137 self.side_transport_profile = SideTransportProfile::Ipv6Like;
138 self
139 }
140
141 #[inline]
142 pub fn with_ipv4_like_compact_header_target(mut self) -> Self {
143 self.header_template_enabled = true;
144 self.compact_header_target_bytes = IPV4_LIKE_COMPACT_HEADER_TARGET_BYTES;
145 self.omit_unchanged_compact_timestamps = true;
146 self.side_transport_profile = SideTransportProfile::Ipv4Like;
147 self
148 }
149
150 #[inline]
151 pub fn with_ipv6_like_compact_header_target(mut self) -> Self {
152 self.header_template_enabled = true;
153 self.compact_header_target_bytes = IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES;
154 self.side_transport_profile = SideTransportProfile::Ipv6Like;
155 self
156 }
157
158 #[inline]
159 pub fn with_template_transport(mut self) -> Self {
160 self.header_template_enabled = true;
161 self.side_transport_profile = SideTransportProfile::Template;
162 self
163 }
164
165 #[inline]
166 pub fn with_omitted_unchanged_compact_timestamps(mut self) -> Self {
167 self.header_template_enabled = true;
168 self.omit_unchanged_compact_timestamps = true;
169 self
170 }
171
172 #[inline]
173 pub fn with_omitted_unchanged_compact_timestamps_for_type(
174 mut self,
175 ty: crate::DataType,
176 ) -> Self {
177 self.header_template_enabled = true;
178 self.compact_timestamp_omission_types.insert(ty);
179 self
180 }
181
182 #[inline]
183 pub fn effective_transport_profile(self) -> SideTransportProfile {
184 if !self.header_template_enabled && self.max_frame_bytes == 0 {
185 SideTransportProfile::Canonical
186 } else if self.side_transport_profile == SideTransportProfile::Canonical {
187 SideTransportProfile::Template
188 } else {
189 self.side_transport_profile
190 }
191 }
192
193 #[cfg(feature = "discovery")]
194 #[inline]
195 pub fn link_capabilities(self) -> discovery::LinkCapabilities {
196 let mut flags = discovery::LINK_CAPABILITY_END_TO_END_RELIABILITY;
197 if self.header_template_enabled {
198 flags |= discovery::LINK_CAPABILITY_HEADER_TEMPLATES;
199 }
200 if self.max_frame_bytes != 0 {
201 flags |= discovery::LINK_CAPABILITY_CHUNKING;
202 }
203 if self.reliable_enabled {
204 flags |= discovery::LINK_CAPABILITY_RELIABILITY;
205 }
206 if self.omit_unchanged_compact_timestamps
207 || !self.compact_timestamp_omission_types.is_empty()
208 {
209 flags |= discovery::LINK_CAPABILITY_OMIT_UNCHANGED_TIMESTAMPS;
210 }
211 #[cfg(feature = "cryptography")]
212 {
213 flags |= discovery::LINK_CAPABILITY_CRYPTO;
214 }
215 discovery::LinkCapabilities {
216 version: 1,
217 flags,
218 profile: self.effective_transport_profile().discovery_code(),
219 max_frame_bytes: self.max_frame_bytes.min(u32::MAX as usize) as u32,
220 compact_header_target_bytes: self.compact_header_target_bytes.min(u32::MAX as usize)
221 as u32,
222 max_side_transport_templates: self.max_side_transport_templates.min(u32::MAX as usize)
223 as u32,
224 }
225 }
226}
227
/// One registered relay side: a name, its transmit callback and its options.
#[derive(Clone)]
pub struct RelaySide {
    /// Static name of the side.
    pub name: &'static str,
    /// Callback used to transmit on this side.
    pub tx_handler: RelayTxHandlerFn,
    /// Options for this side.
    pub opts: RelaySideOptions,
}
235
/// Payload carried through the relay queues: either raw bytes or a decoded
/// packet, both reference-counted so clones are cheap.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RelayItem {
    /// Shared byte buffer.
    Packed(Arc<[u8]>),
    /// Shared decoded packet.
    Packet(Arc<Packet>),
}
241
/// Entry of the relay RX queue.
#[derive(Clone, Debug, PartialEq, Eq)]
struct RelayRxItem {
    /// Side the item is attributed to as its source.
    src: RelaySideId,
    /// The queued payload.
    data: RelayItem,
    /// Priority value handed to the queue's prioritized push.
    priority: u8,
}
249
250impl ByteCost for RelayRxItem {
251 fn byte_cost(&self) -> usize {
252 match &self.data {
253 RelayItem::Packed(bytes) => bytes.len(),
254 RelayItem::Packet(pkt) => pkt.byte_cost(),
255 }
256 }
257}
258
/// Entry of the relay TX queue.
#[derive(Clone, Debug, PartialEq, Eq)]
struct RelayTxItem {
    /// Originating side, if any.
    src: Option<RelaySideId>,
    /// Destination side.
    dst: RelaySideId,
    /// The queued payload.
    data: RelayItem,
    /// Priority value handed to the queue's prioritized push.
    priority: u8,
}
267
/// Entry of the relay replay queue: raw bytes addressed to one side.
#[derive(Clone, Debug, PartialEq, Eq)]
struct RelayReplayItem {
    /// Destination side.
    dst: RelaySideId,
    /// Shared byte buffer to send.
    bytes: Arc<[u8]>,
    /// Priority value handed to the queue's prioritized push.
    priority: u8,
}
274
275impl ByteCost for RelayTxItem {
276 fn byte_cost(&self) -> usize {
277 match &self.data {
278 RelayItem::Packed(bytes) => bytes.len(),
279 RelayItem::Packet(pkt) => pkt.byte_cost(),
280 }
281 }
282}
283
284impl ByteCost for RelayReplayItem {
285 fn byte_cost(&self) -> usize {
286 self.bytes.len()
287 }
288}
289
/// Sender-side reliable-delivery bookkeeping, stored per `(side, u32)` key in
/// `RelayInner::reliable_tx`.
#[derive(Debug, Clone)]
struct ReliableTxState {
    /// NOTE(review): presumably the next sequence number to assign — confirm.
    next_seq: u32,
    /// Sequence numbers in a queue; presumably in send order — confirm.
    sent_order: VecDeque<u32>,
    /// Per-sequence record of what was sent.
    sent: BTreeMap<u32, ReliableSent>,
}
298
/// Record kept for one reliably-sent frame.
/// NOTE(review): field semantics below are inferred from names; the code that
/// updates them is outside this view.
#[derive(Debug, Clone)]
struct ReliableSent {
    /// The bytes that were sent.
    bytes: Arc<[u8]>,
    /// Presumably the clock value (ms) of the latest transmission.
    last_send_ms: u64,
    /// Presumably the number of retransmissions so far.
    retries: u32,
    /// Presumably set while a retransmission is already queued.
    queued: bool,
    /// Presumably set once a partial acknowledgement was seen.
    partial_acked: bool,
}
307
/// Receiver-side reliable-delivery state, stored per `(side, u32)` key in
/// `RelayInner::reliable_rx`.
#[derive(Debug, Clone)]
struct ReliableRxState {
    /// Sequence number expected next; new states start at `1`
    /// (see `RelayInner::buffer_reliable_rx`).
    expected_seq: u32,
    /// Frames held back, keyed by sequence number.
    buffered: BTreeMap<u32, Arc<[u8]>>,
}
313
/// Value stored in `RelayInner::reliable_return_routes` (keyed by a `u64`).
#[derive(Debug, Clone)]
struct ReliableReturnRouteState {
    /// Side associated with the route; presumably where replies are sent —
    /// confirm against the code that reads it.
    side: RelaySideId,
}
318
/// Discovery data recorded for one announcing sender on a side
/// (the value type of `DiscoverySideState::announcers`).
#[cfg(feature = "discovery")]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct DiscoverySenderState {
    /// Endpoints recorded as reachable for this sender.
    reachable: Vec<crate::DataEndpoint>,
    /// Time-sync source names recorded as reachable for this sender.
    reachable_timesync_sources: Vec<String>,
    /// Topology board nodes recorded for this sender.
    topology_boards: Vec<TopologyBoardNode>,
    /// Presumably the clock value (ms) of the latest announcement — confirm.
    last_seen_ms: u64,
}
327
328#[inline]
329fn is_internal_control_type(ty: crate::DataType) -> bool {
330 if matches!(
331 ty,
332 crate::DataType::ReliableAck
333 | crate::DataType::ReliablePartialAck
334 | crate::DataType::ReliablePacketRequest
335 | crate::DataType::P2pMessage
336 ) {
337 return true;
338 }
339
340 #[cfg(feature = "timesync")]
341 if matches!(
342 ty,
343 crate::DataType::TimeSyncAnnounce
344 | crate::DataType::TimeSyncRequest
345 | crate::DataType::TimeSyncResponse
346 ) {
347 return true;
348 }
349
350 #[cfg(feature = "discovery")]
351 if discovery::is_discovery_type(ty) {
352 return true;
353 }
354
355 let _ = ty;
356 false
357}
358
/// Discovery data recorded for one side (the value type of
/// `RelayInner::discovery_routes`).
#[cfg(feature = "discovery")]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct DiscoverySideState {
    /// Endpoints recorded as reachable through this side.
    reachable: Vec<crate::DataEndpoint>,
    /// Time-sync source names recorded as reachable through this side.
    reachable_timesync_sources: Vec<String>,
    /// Used by `RelayInner::pop_discovery_route` to pick the entry with the
    /// smallest value for eviction.
    last_seen_ms: u64,
    /// Per-sender state, keyed by a `String` (presumably the sender id).
    announcers: BTreeMap<String, DiscoverySenderState>,
}
367
/// In-progress reassembly of a chunked frame.
/// NOTE(review): the reassembly code is outside this view; field meanings are
/// inferred from names.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct SideChunkAssembly {
    /// Presumably the number of chunks expected.
    total: u16,
    /// Chunk payloads received so far, keyed by a `u16` (presumably the chunk
    /// index).
    received: BTreeMap<u16, Arc<[u8]>>,
}
373
/// Per-side template, timestamp and chunk state for the side transport.
#[derive(Clone, Debug, Default)]
struct SideTransportState {
    /// TX: template hash -> template id.
    tx_template_ids: BTreeMap<u64, u32>,
    /// TX: template hash -> template.
    tx_templates: BTreeMap<u64, SideHeaderTemplate>,
    /// TX: template id -> a `u64` timestamp value (removed together with the
    /// id when a TX template is evicted).
    tx_last_timestamps: BTreeMap<u32, u64>,
    /// RX: template hash -> template.
    rx_templates: BTreeMap<u64, SideHeaderTemplate>,
    /// RX: template id -> template.
    rx_templates_by_id: BTreeMap<u32, SideHeaderTemplate>,
    /// RX: template id -> a `u64` timestamp value (removed together with the
    /// id when an RX template is evicted).
    rx_last_timestamps: BTreeMap<u32, u64>,
    /// RX chunk assemblies keyed by a `u32` (presumably the chunk id).
    rx_chunks: BTreeMap<u32, SideChunkAssembly>,
    /// NOTE(review): presumably the next chunk id to hand out — confirm.
    next_chunk_id: u32,
    /// NOTE(review): presumably the next template id to hand out — confirm.
    next_template_id: u32,
}
386
387impl SideTransportState {
388 fn tx_template_count(&self) -> usize {
389 self.tx_template_ids.len()
390 }
391
392 fn rx_template_count(&self) -> usize {
393 self.rx_templates_by_id.len()
394 }
395
396 fn insert_tx_template(
397 &mut self,
398 template: SideHeaderTemplate,
399 template_id: u32,
400 max_templates: usize,
401 ) -> bool {
402 if max_templates == 0 {
403 return false;
404 }
405 let mut evicted = false;
406 if self.tx_template_ids.len() >= max_templates
407 && !self.tx_template_ids.contains_key(&template.hash)
408 && let Some(old_hash) = self.tx_template_ids.keys().next().copied()
409 {
410 if let Some(old_id) = self.tx_template_ids.remove(&old_hash) {
411 self.tx_last_timestamps.remove(&old_id);
412 }
413 self.tx_templates.remove(&old_hash);
414 evicted = true;
415 }
416 self.tx_template_ids.insert(template.hash, template_id);
417 self.tx_templates.insert(template.hash, template);
418 evicted
419 }
420
421 fn insert_rx_template(
422 &mut self,
423 template_id: u32,
424 template: SideHeaderTemplate,
425 max_templates: usize,
426 ) -> bool {
427 if max_templates == 0 {
428 return false;
429 }
430 let mut evicted = false;
431 if self.rx_templates_by_id.len() >= max_templates
432 && !self.rx_templates_by_id.contains_key(&template_id)
433 && let Some(old_id) = self.rx_templates_by_id.keys().next().copied()
434 && let Some(old_template) = self.rx_templates_by_id.remove(&old_id)
435 {
436 self.rx_templates.remove(&old_template.hash);
437 self.rx_last_timestamps.remove(&old_id);
438 evicted = true;
439 }
440 self.rx_templates_by_id
441 .insert(template_id, template.clone());
442 self.rx_templates.insert(template.hash, template);
443 evicted
444 }
445}
446
/// A cached header template for the side transport.
/// NOTE(review): the code that builds and applies templates is outside this
/// view; field meanings beyond `hash` are inferred from names.
#[derive(Clone, Debug, PartialEq, Eq)]
struct SideHeaderTemplate {
    /// Key under which the template is stored in the hash-keyed maps of
    /// `SideTransportState`.
    hash: u64,
    /// Presumably the frame flag bits shared by frames using this template.
    base_flags: u8,
    /// Shared byte slice; presumably header bytes preceding the variable part.
    prefix: Arc<[u8]>,
    /// Shared byte slice; presumably header bytes between variable parts.
    between: Arc<[u8]>,
    /// Optional reliable-header flags.
    reliable_flags: Option<u8>,
    /// Presumably whether the compact reliable header form applies.
    reliable_compact: bool,
}
456
/// Tuple produced when extracting a header template from a frame.
///
/// Element types, in order: the template, the data type, a `u8`, two `u64`
/// values, a `u16`, an optional `(u32, u32)` pair, and a borrowed byte slice.
/// NOTE(review): the producer is outside this view, so the meaning of the
/// scalar elements is not documented here — confirm against the extractor.
type SideTemplateExtract<'a> = (
    SideHeaderTemplate,
    crate::DataType,
    u8,
    u64,
    u64,
    u16,
    Option<(u32, u32)>,
    &'a [u8],
);
467
/// How a compact frame carries its timestamp.
/// NOTE(review): variant meanings are inferred from names and the matching
/// `SIDE_TRANSPORT_KIND_COMPACT*` constants — confirm against the encoder.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SideCompactTimestampMode {
    /// Presumably the full timestamp value.
    Absolute,
    /// Presumably a difference from a previously seen timestamp.
    Delta,
    /// Presumably no timestamp on the wire.
    Omitted,
}
474
/// Bandwidth and usage tracking for one route, updated by `observe`.
#[derive(Debug, Clone, Default)]
struct AdaptiveRouteStats {
    /// Smoothed bandwidth estimate; `0` until the first sample.
    estimated_bandwidth_bps: u64,
    /// Largest bandwidth sample seen.
    peak_bandwidth_bps: u64,
    /// `now_ms` of the latest observation.
    last_observed_ms: u64,
    /// `now_ms` of the latest observation whose sample was in
    /// `1..=CONTROL_SLOW_LINK_CAPACITY_BPS`.
    last_slow_observed_ms: u64,
    /// Number of observations (saturating).
    sample_count: u64,
    /// Start of the current usage window; `0` means no window yet.
    window_started_ms: u64,
    /// Bytes accumulated in the current usage window.
    window_bytes: u64,
    /// Largest usage rate computed at observation time.
    peak_usage_bps: u64,
}
486
/// Per-side discovery throttle timestamps (value type of
/// `RelayInner::discovery_side_throttle`).
/// NOTE(review): the throttling logic is outside this view.
#[cfg(feature = "discovery")]
#[derive(Debug, Clone, Default)]
struct DiscoverySideThrottleState {
    /// Presumably the earliest time (ms) the next ping may be sent.
    next_ping_ms: u64,
    /// Presumably the earliest time (ms) the next full advertisement may be
    /// sent.
    next_full_ms: u64,
}
493
/// Per-side time-sync throttle timestamp (value type of
/// `RelayInner::timesync_side_throttle`).
#[cfg(all(feature = "discovery", feature = "timesync"))]
#[derive(Debug, Clone, Default)]
struct TimeSyncSideThrottleState {
    /// Presumably the earliest time (ms) time-sync traffic is allowed again —
    /// confirm against the throttling code.
    next_allowed_ms: u64,
}
499
/// Level of detail for a discovery advertisement.
/// NOTE(review): the advertising code is outside this view.
#[cfg(feature = "discovery")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DiscoveryAdvertiseLevel {
    /// Presumably a minimal ping-only advertisement.
    MinimalPing,
    /// Presumably the full advertisement.
    Full,
}
506
507impl AdaptiveRouteStats {
508 #[inline]
509 fn observe(&mut self, bytes: usize, sample_bps: u64, now_ms: u64) {
510 self.estimated_bandwidth_bps = if self.estimated_bandwidth_bps == 0 {
511 sample_bps
512 } else if sample_bps >= self.estimated_bandwidth_bps {
513 self.estimated_bandwidth_bps
514 .saturating_mul(3)
515 .saturating_add(sample_bps.saturating_mul(5))
516 / 8
517 } else {
518 self.estimated_bandwidth_bps
519 .saturating_mul(7)
520 .saturating_add(sample_bps)
521 / 8
522 };
523 self.peak_bandwidth_bps = self.peak_bandwidth_bps.max(sample_bps);
524 self.last_observed_ms = now_ms;
525 if sample_bps > 0 && sample_bps <= CONTROL_SLOW_LINK_CAPACITY_BPS {
526 self.last_slow_observed_ms = now_ms;
527 }
528 self.sample_count = self.sample_count.saturating_add(1);
529 if self.window_started_ms == 0 || now_ms.saturating_sub(self.window_started_ms) > 1_000 {
530 self.window_started_ms = now_ms;
531 self.window_bytes = 0;
532 }
533 self.window_bytes = self.window_bytes.saturating_add(bytes as u64);
534 self.peak_usage_bps = self.peak_usage_bps.max(self.current_usage_bps(now_ms));
535 }
536
537 #[inline]
538 fn current_usage_bps(&self, now_ms: u64) -> u64 {
539 if self.window_started_ms == 0 {
540 return 0;
541 }
542 let elapsed_ms = now_ms.saturating_sub(self.window_started_ms).max(1);
543 (u128::from(self.window_bytes).saturating_mul(1000) / u128::from(elapsed_ms))
544 .min(u128::from(u64::MAX)) as u64
545 }
546
547 #[inline]
548 fn available_headroom_bps(&self, now_ms: u64) -> u64 {
549 let capacity = self
550 .estimated_bandwidth_bps
551 .max(self.peak_bandwidth_bps)
552 .max(1);
553 capacity.saturating_sub(self.current_usage_bps(now_ms))
554 }
555
556 #[inline]
557 fn weight(&self, now_ms: u64) -> u64 {
558 self.available_headroom_bps(now_ms).max(1)
559 }
560
561 #[inline]
562 fn snapshot(&self, now_ms: u64, auto_balancing_enabled: bool) -> AdaptiveLinkStats {
563 let current_usage_bps = self.current_usage_bps(now_ms);
564 let estimated_capacity_bps = self.estimated_bandwidth_bps.max(1);
565 let peak_capacity_bps = self.peak_bandwidth_bps.max(estimated_capacity_bps);
566 let available_headroom_bps = peak_capacity_bps.saturating_sub(current_usage_bps);
567 AdaptiveLinkStats {
568 auto_balancing_enabled,
569 estimated_capacity_bps,
570 peak_capacity_bps,
571 current_usage_bps,
572 peak_usage_bps: self.peak_usage_bps.max(current_usage_bps),
573 available_headroom_bps,
574 effective_weight: available_headroom_bps.max(1),
575 last_observed_ms: self.last_observed_ms,
576 sample_count: self.sample_count,
577 }
578 }
579}
580
/// Per-data-type counters kept inside `SideRuntimeStatsInner::data_types`.
/// All counters are updated with saturating arithmetic by the `note_*`
/// methods of `SideRuntimeStatsInner`.
#[derive(Debug, Clone, Default)]
struct TypeRuntimeStatsInner {
    // Transmit counters (bumped by `note_tx`).
    tx_packets: u64,
    tx_bytes: u64,
    // Receive counters (bumped by `note_rx`).
    rx_packets: u64,
    rx_bytes: u64,
    // Relayed counters; `note_tx` / `note_rx` bump these alongside the
    // plain tx/rx counters.
    relayed_tx_packets: u64,
    relayed_tx_bytes: u64,
    relayed_rx_packets: u64,
    relayed_rx_bytes: u64,
    // Retry count, accumulated by both `note_tx` and `note_tx_failure`.
    tx_retries: u64,
    // Failure count, bumped by `note_tx_failure`.
    handler_failures: u64,
}
594
/// Per-side runtime counters, updated by the `note_*` methods of this type.
#[derive(Debug, Clone, Default)]
struct SideRuntimeStatsInner {
    // Traffic counters; `note_tx` / `note_rx` bump the plain and the relayed
    // counters together.
    tx_packets: u64,
    tx_bytes: u64,
    rx_packets: u64,
    rx_bytes: u64,
    relayed_tx_packets: u64,
    relayed_tx_bytes: u64,
    relayed_rx_packets: u64,
    relayed_rx_bytes: u64,
    // Retry and failure counters (`note_tx`, `note_tx_failure`).
    tx_retries: u64,
    tx_handler_failures: u64,
    total_handler_retries: u64,
    // Side-transport frame counts by kind.
    side_transport_full_frames: u64,
    side_transport_compact_frames: u64,
    side_transport_compact_delta_frames: u64,
    side_transport_compact_omitted_timestamp_frames: u64,
    side_transport_chunk_frames: u64,
    // Side-transport byte accounting; `bytes_saved` grows only when the raw
    // size exceeds the wire size.
    side_transport_raw_bytes: u64,
    side_transport_wire_bytes: u64,
    side_transport_bytes_saved: u64,
    // Smallest / largest compact overhead seen; `None` until the first
    // compact frame is noted.
    side_transport_min_compact_overhead_bytes: Option<usize>,
    side_transport_max_compact_overhead_bytes: Option<usize>,
    side_transport_compact_target_misses: u64,
    side_transport_template_evictions: u64,
    // Per-type counters keyed by `DataType::as_u32()`.
    data_types: BTreeMap<u32, TypeRuntimeStatsInner>,
}
622
623impl SideRuntimeStatsInner {
624 fn type_stats_mut(&mut self, ty: crate::DataType) -> &mut TypeRuntimeStatsInner {
625 self.data_types.entry(ty.as_u32()).or_default()
626 }
627
628 fn note_tx(&mut self, ty: crate::DataType, bytes: usize, retries: usize) {
629 self.tx_packets = self.tx_packets.saturating_add(1);
630 self.tx_bytes = self.tx_bytes.saturating_add(bytes as u64);
631 self.relayed_tx_packets = self.relayed_tx_packets.saturating_add(1);
632 self.relayed_tx_bytes = self.relayed_tx_bytes.saturating_add(bytes as u64);
633 self.tx_retries = self.tx_retries.saturating_add(retries as u64);
634 self.total_handler_retries = self.total_handler_retries.saturating_add(retries as u64);
635 let stats = self.type_stats_mut(ty);
636 stats.tx_packets = stats.tx_packets.saturating_add(1);
637 stats.tx_bytes = stats.tx_bytes.saturating_add(bytes as u64);
638 stats.relayed_tx_packets = stats.relayed_tx_packets.saturating_add(1);
639 stats.relayed_tx_bytes = stats.relayed_tx_bytes.saturating_add(bytes as u64);
640 stats.tx_retries = stats.tx_retries.saturating_add(retries as u64);
641 }
642
643 fn note_rx(&mut self, ty: crate::DataType, bytes: usize) {
644 self.rx_packets = self.rx_packets.saturating_add(1);
645 self.rx_bytes = self.rx_bytes.saturating_add(bytes as u64);
646 self.relayed_rx_packets = self.relayed_rx_packets.saturating_add(1);
647 self.relayed_rx_bytes = self.relayed_rx_bytes.saturating_add(bytes as u64);
648 let stats = self.type_stats_mut(ty);
649 stats.rx_packets = stats.rx_packets.saturating_add(1);
650 stats.rx_bytes = stats.rx_bytes.saturating_add(bytes as u64);
651 stats.relayed_rx_packets = stats.relayed_rx_packets.saturating_add(1);
652 stats.relayed_rx_bytes = stats.relayed_rx_bytes.saturating_add(bytes as u64);
653 }
654
655 fn note_tx_failure(&mut self, ty: crate::DataType, retries: usize) {
656 self.tx_handler_failures = self.tx_handler_failures.saturating_add(1);
657 self.tx_retries = self.tx_retries.saturating_add(retries as u64);
658 self.total_handler_retries = self.total_handler_retries.saturating_add(retries as u64);
659 let stats = self.type_stats_mut(ty);
660 stats.handler_failures = stats.handler_failures.saturating_add(1);
661 stats.tx_retries = stats.tx_retries.saturating_add(retries as u64);
662 }
663
664 fn note_side_transport_full(&mut self, raw_bytes: usize, wire_bytes: usize) {
665 self.side_transport_full_frames = self.side_transport_full_frames.saturating_add(1);
666 self.note_side_transport_bytes(raw_bytes, wire_bytes);
667 }
668
669 fn note_side_transport_compact(
670 &mut self,
671 raw_bytes: usize,
672 wire_bytes: usize,
673 compact_overhead_bytes: usize,
674 used_timestamp_delta: bool,
675 omitted_timestamp: bool,
676 ) {
677 self.side_transport_compact_frames = self.side_transport_compact_frames.saturating_add(1);
678 if used_timestamp_delta {
679 self.side_transport_compact_delta_frames =
680 self.side_transport_compact_delta_frames.saturating_add(1);
681 }
682 if omitted_timestamp {
683 self.side_transport_compact_omitted_timestamp_frames = self
684 .side_transport_compact_omitted_timestamp_frames
685 .saturating_add(1);
686 }
687 self.note_side_transport_bytes(raw_bytes, wire_bytes);
688 self.side_transport_min_compact_overhead_bytes = Some(
689 self.side_transport_min_compact_overhead_bytes
690 .map_or(compact_overhead_bytes, |v| v.min(compact_overhead_bytes)),
691 );
692 self.side_transport_max_compact_overhead_bytes = Some(
693 self.side_transport_max_compact_overhead_bytes
694 .map_or(compact_overhead_bytes, |v| v.max(compact_overhead_bytes)),
695 );
696 }
697
698 fn note_side_transport_chunks(&mut self, chunks: usize) {
699 self.side_transport_chunk_frames = self
700 .side_transport_chunk_frames
701 .saturating_add(chunks as u64);
702 }
703
704 fn note_side_transport_bytes(&mut self, raw_bytes: usize, wire_bytes: usize) {
705 self.side_transport_raw_bytes = self
706 .side_transport_raw_bytes
707 .saturating_add(raw_bytes as u64);
708 self.side_transport_wire_bytes = self
709 .side_transport_wire_bytes
710 .saturating_add(wire_bytes as u64);
711 if raw_bytes > wire_bytes {
712 self.side_transport_bytes_saved = self
713 .side_transport_bytes_saved
714 .saturating_add((raw_bytes - wire_bytes) as u64);
715 }
716 }
717
718 fn note_side_transport_compact_target_miss(&mut self) {
719 self.side_transport_compact_target_misses =
720 self.side_transport_compact_target_misses.saturating_add(1);
721 }
722
723 fn note_side_transport_template_eviction(&mut self) {
724 self.side_transport_template_evictions =
725 self.side_transport_template_evictions.saturating_add(1);
726 }
727}
728
/// Construction-time configuration for a relay: a sender label and the
/// runtime memory limits.
#[derive(Clone, Debug)]
pub struct RelayConfig {
    /// Sender label; defaults to `"RELAY"`.
    sender: Arc<str>,
    /// Memory limits; defaults to `RuntimeMemoryConfig::default()`.
    memory: RuntimeMemoryConfig,
}
734
735impl RelayConfig {
736 pub fn new() -> Self {
737 Self::default()
738 }
739
740 pub fn with_sender<S: AsRef<str>>(mut self, sender: S) -> Self {
741 self.sender = Arc::from(sender.as_ref());
742 self
743 }
744
745 pub fn with_memory_config(mut self, memory: RuntimeMemoryConfig) -> TelemetryResult<Self> {
746 memory.validate()?;
747 self.memory = memory;
748 Ok(self)
749 }
750
751 fn sender(&self) -> Arc<str> {
752 self.sender.clone()
753 }
754
755 fn memory_config(&self) -> RuntimeMemoryConfig {
756 self.memory
757 }
758}
759
760impl Default for RelayConfig {
761 fn default() -> Self {
762 Self {
763 sender: Arc::from("RELAY"),
764 memory: RuntimeMemoryConfig::default(),
765 }
766 }
767}
768
/// Where a route selection came from.
/// NOTE(review): the selection code is outside this view; variant meanings
/// are inferred from names.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RouteSelectionOrigin {
    /// Presumably a route chosen by flooding.
    Flood,
    /// Presumably a route learned through discovery.
    Discovered,
}
774
/// Mutable relay state (held behind `Relay::state`).
///
/// The queues, the reliable RX buffers and (with `discovery`) the discovery
/// routes are accounted against one shared byte budget,
/// `memory.max_queue_budget`; see `make_shared_queue_room`.
struct RelayInner {
    /// Memory limits in force.
    memory: RuntimeMemoryConfig,
    /// Registered sides; a slot may be `None`.
    sides: Vec<Option<RelaySide>>,
    // Routing configuration, keyed by (optional source side, destination
    // side) unless noted. NOTE(review): readers are outside this view.
    route_overrides: BTreeMap<(Option<RelaySideId>, RelaySideId), bool>,
    typed_route_overrides: BTreeMap<(Option<RelaySideId>, u32, RelaySideId), bool>,
    route_weights: BTreeMap<(Option<RelaySideId>, RelaySideId), u32>,
    route_priorities: BTreeMap<(Option<RelaySideId>, RelaySideId), u32>,
    source_route_modes: BTreeMap<Option<RelaySideId>, RouteSelectionMode>,
    route_selection_cursors: BTreeMap<Option<RelaySideId>, u64>,
    // Per-side statistics and transport state.
    adaptive_route_stats: BTreeMap<RelaySideId, AdaptiveRouteStats>,
    side_runtime_stats: BTreeMap<RelaySideId, SideRuntimeStatsInner>,
    side_transport: BTreeMap<RelaySideId, SideTransportState>,
    // Queues sharing the byte budget.
    rx_queue: BoundedDeque<RelayRxItem>,
    tx_queue: BoundedDeque<RelayTxItem>,
    replay_queue: BoundedDeque<RelayReplayItem>,
    /// Recently seen ids, capped at `memory.max_recent_rx_ids` entries by
    /// `push_recent_rx`.
    recent_rx: BoundedDeque<u64>,
    // Reliable-delivery state keyed by (side, u32); the key is built by
    // `Relay::reliable_key(side, ty)`.
    reliable_tx: BTreeMap<(RelaySideId, u32), ReliableTxState>,
    reliable_rx: BTreeMap<(RelaySideId, u32), ReliableRxState>,
    reliable_return_routes: BTreeMap<u64, ReliableReturnRouteState>,
    reliable_return_route_order: VecDeque<u64>,
    end_to_end_acked_destinations: BTreeMap<u64, BTreeSet<u64>>,
    end_to_end_acked_destination_order: VecDeque<u64>,
    // Relay-wide handler failure / retry totals.
    total_handler_failures: u64,
    total_handler_retries: u64,
    /// Discovery state per side; counted against the shared budget.
    #[cfg(feature = "discovery")]
    discovery_routes: BTreeMap<RelaySideId, DiscoverySideState>,
    #[cfg(feature = "discovery")]
    discovery_cadence: DiscoveryCadenceState,
    #[cfg(feature = "discovery")]
    discovery_side_throttle: BTreeMap<RelaySideId, DiscoverySideThrottleState>,
    #[cfg(all(feature = "discovery", feature = "timesync"))]
    timesync_side_throttle: BTreeMap<RelaySideId, TimeSyncSideThrottleState>,
}
809
/// Names the stores that share the relay's byte budget, so that
/// `RelayInner` can pick one to evict from.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RelayQueueKind {
    /// `RelayInner::rx_queue`.
    Rx,
    /// `RelayInner::tx_queue`.
    Tx,
    /// `RelayInner::replay_queue`.
    Replay,
    /// `RelayInner::recent_rx`; listed with zero size in
    /// `largest_shared_queue`, so it is never selected as the largest store.
    Recent,
    /// Frames buffered in `RelayInner::reliable_rx`.
    ReliableRxBuffer,
    /// `RelayInner::discovery_routes`.
    #[cfg(feature = "discovery")]
    Discovery,
}
820
821impl RelayInner {
822 #[cfg(feature = "discovery")]
823 fn topology_board_byte_cost(board: &TopologyBoardNode) -> usize {
824 board
825 .sender_id
826 .len()
827 .saturating_add(board.reachable_endpoints.len() * size_of::<crate::DataEndpoint>())
828 .saturating_add(
829 board
830 .reachable_timesync_sources
831 .iter()
832 .map(|s| s.len())
833 .sum::<usize>(),
834 )
835 .saturating_add(board.connections.iter().map(|s| s.len()).sum::<usize>())
836 }
837
838 #[cfg(feature = "discovery")]
839 fn discovery_sender_byte_cost(sender: &str, state: &DiscoverySenderState) -> usize {
840 sender
841 .len()
842 .saturating_add(state.reachable.len() * size_of::<crate::DataEndpoint>())
843 .saturating_add(
844 state
845 .reachable_timesync_sources
846 .iter()
847 .map(|s| s.len())
848 .sum::<usize>(),
849 )
850 .saturating_add(
851 state
852 .topology_boards
853 .iter()
854 .map(Self::topology_board_byte_cost)
855 .sum::<usize>(),
856 )
857 .saturating_add(size_of::<DiscoverySenderState>())
858 }
859
860 #[cfg(feature = "discovery")]
861 fn discovery_route_byte_cost(route: &DiscoverySideState) -> usize {
862 size_of::<DiscoverySideState>()
863 .saturating_add(route.reachable.len() * size_of::<crate::DataEndpoint>())
864 .saturating_add(
865 route
866 .reachable_timesync_sources
867 .iter()
868 .map(|s| s.len())
869 .sum::<usize>(),
870 )
871 .saturating_add(
872 route
873 .announcers
874 .iter()
875 .map(|(sender, state)| Self::discovery_sender_byte_cost(sender, state))
876 .sum::<usize>(),
877 )
878 }
879
880 #[cfg(feature = "discovery")]
881 fn discovery_bytes_used(&self) -> usize {
882 self.discovery_routes
883 .values()
884 .map(Self::discovery_route_byte_cost)
885 .sum()
886 }
887
888 #[inline]
889 fn reliable_rx_buffered_bytes(&self) -> usize {
890 self.reliable_rx
891 .values()
892 .flat_map(|state| state.buffered.values())
893 .map(|bytes| size_of::<Arc<[u8]>>() + bytes.len())
894 .sum()
895 }
896
897 #[inline]
898 fn shared_queue_bytes_used(&self) -> usize {
899 self.rx_queue
900 .bytes_used()
901 .saturating_add(self.tx_queue.bytes_used())
902 .saturating_add(self.replay_queue.bytes_used())
903 .saturating_add(self.recent_rx.max_bytes())
904 .saturating_add(self.reliable_rx_buffered_bytes())
905 .saturating_add(crate::config::schema_bytes_used())
906 .saturating_add({
907 #[cfg(feature = "discovery")]
908 {
909 self.discovery_bytes_used()
910 }
911 #[cfg(not(feature = "discovery"))]
912 {
913 0
914 }
915 })
916 }
917
918 fn reliable_rx_buffer_len(&self) -> usize {
919 self.reliable_rx
920 .values()
921 .map(|state| state.buffered.len())
922 .sum()
923 }
924
925 fn pop_reliable_rx_buffered(&mut self) -> Option<Arc<[u8]>> {
926 let key = self
927 .reliable_rx
928 .iter()
929 .find_map(|(key, state)| (!state.buffered.is_empty()).then_some(*key))?;
930 self.reliable_rx
931 .get_mut(&key)?
932 .buffered
933 .pop_first()
934 .map(|(_, v)| v)
935 }
936
937 fn pop_shared_queue_item(&mut self, preferred: RelayQueueKind) -> bool {
938 match preferred {
939 RelayQueueKind::Rx => self.rx_queue.pop_front().is_some(),
940 RelayQueueKind::Tx => self.tx_queue.pop_front().is_some(),
941 RelayQueueKind::Replay => self.replay_queue.pop_front().is_some(),
942 RelayQueueKind::Recent => self.recent_rx.pop_front().is_some(),
943 RelayQueueKind::ReliableRxBuffer => self.pop_reliable_rx_buffered().is_some(),
944 #[cfg(feature = "discovery")]
945 RelayQueueKind::Discovery => self.pop_discovery_route(),
946 }
947 }
948
949 #[cfg(feature = "discovery")]
950 fn pop_discovery_route(&mut self) -> bool {
951 let Some((&side, _)) = self
952 .discovery_routes
953 .iter()
954 .min_by_key(|(_, route)| route.last_seen_ms)
955 else {
956 return false;
957 };
958 self.discovery_routes.remove(&side);
959 Self::queue_budget_warning("topology route evicted because shared queue budget is full");
960 true
961 }
962
963 fn largest_shared_queue(&self) -> Option<RelayQueueKind> {
964 let candidates = [
965 (
966 RelayQueueKind::Rx,
967 self.rx_queue.bytes_used(),
968 self.rx_queue.len(),
969 ),
970 (
971 RelayQueueKind::Tx,
972 self.tx_queue.bytes_used(),
973 self.tx_queue.len(),
974 ),
975 (
976 RelayQueueKind::Replay,
977 self.replay_queue.bytes_used(),
978 self.replay_queue.len(),
979 ),
980 (RelayQueueKind::Recent, 0, 0),
981 (
982 RelayQueueKind::ReliableRxBuffer,
983 self.reliable_rx_buffered_bytes(),
984 self.reliable_rx_buffer_len(),
985 ),
986 #[cfg(feature = "discovery")]
987 (
988 RelayQueueKind::Discovery,
989 self.discovery_bytes_used(),
990 self.discovery_routes.len(),
991 ),
992 ];
993 candidates
994 .into_iter()
995 .filter(|(_, bytes, len)| *bytes > 0 && *len > 0)
996 .max_by_key(|(kind, bytes, _)| {
997 (
998 *bytes,
999 if *kind == RelayQueueKind::ReliableRxBuffer {
1000 0
1001 } else {
1002 1
1003 },
1004 )
1005 })
1006 .map(|(kind, _, _)| kind)
1007 }
1008
1009 fn make_shared_queue_room(
1010 &mut self,
1011 incoming_cost: usize,
1012 preferred: RelayQueueKind,
1013 ) -> TelemetryResult<()> {
1014 if incoming_cost > self.memory.max_queue_budget {
1015 return Err(TelemetryError::PacketTooLarge(
1016 "Item exceeds maximum shared queue budget",
1017 ));
1018 }
1019
1020 while self.shared_queue_bytes_used().saturating_add(incoming_cost)
1021 > self.memory.max_queue_budget
1022 {
1023 let victim = self.largest_shared_queue().unwrap_or(preferred);
1024 if victim == RelayQueueKind::Discovery {
1025 Self::queue_budget_warning("topology data is using the largest queue budget share");
1026 }
1027 if !self.pop_shared_queue_item(victim) && !self.pop_shared_queue_item(preferred) {
1028 return Err(TelemetryError::PacketTooLarge(
1029 "Item exceeds maximum shared queue budget",
1030 ));
1031 }
1032 }
1033
1034 Ok(())
1035 }
1036
1037 #[inline]
1038 fn queue_budget_warning(msg: &str) {
1039 #[cfg(feature = "std")]
1040 eprintln!("sedsnet queue budget warning: {msg}");
1041 let _ = msg;
1042 }
1043
1044 #[cfg(feature = "discovery")]
1045 fn fit_discovery_budget(&mut self) {
1046 while self.shared_queue_bytes_used() > self.memory.max_queue_budget {
1047 if !self.pop_discovery_route() {
1048 break;
1049 }
1050 }
1051 }
1052
1053 fn push_rx(&mut self, item: RelayRxItem) -> TelemetryResult<()> {
1054 self.make_shared_queue_room(item.byte_cost(), RelayQueueKind::Rx)?;
1055 self.rx_queue
1056 .push_back_prioritized(item, |queued| queued.priority)
1057 }
1058
1059 fn push_tx(&mut self, item: RelayTxItem) -> TelemetryResult<()> {
1060 self.make_shared_queue_room(item.byte_cost(), RelayQueueKind::Tx)?;
1061 self.tx_queue
1062 .push_back_prioritized(item, |queued| queued.priority)
1063 }
1064
1065 fn push_replay(&mut self, item: RelayReplayItem) -> TelemetryResult<()> {
1066 self.make_shared_queue_room(item.byte_cost(), RelayQueueKind::Replay)?;
1067 self.replay_queue
1068 .push_back_prioritized(item, |queued| queued.priority)
1069 }
1070
1071 fn push_recent_rx(&mut self, id: u64) -> TelemetryResult<()> {
1072 while self.recent_rx.len() >= self.memory.max_recent_rx_ids {
1073 let _ = self.recent_rx.pop_front();
1074 }
1075 self.make_shared_queue_room(0, RelayQueueKind::Recent)?;
1076 self.recent_rx.push_back(id)
1077 }
1078
1079 fn buffer_reliable_rx(
1080 &mut self,
1081 side: RelaySideId,
1082 ty: crate::DataType,
1083 seq: u32,
1084 bytes: Arc<[u8]>,
1085 ) -> TelemetryResult<()> {
1086 let key = Relay::reliable_key(side, ty);
1087 if self
1088 .reliable_rx
1089 .get(&key)
1090 .is_some_and(|state| state.buffered.contains_key(&seq))
1091 {
1092 return Ok(());
1093 }
1094 let cost = size_of::<Arc<[u8]>>() + bytes.len();
1095 self.make_shared_queue_room(cost, RelayQueueKind::ReliableRxBuffer)?;
1096 let rx_state = self
1097 .reliable_rx
1098 .entry(key)
1099 .or_insert_with(|| ReliableRxState {
1100 expected_seq: 1,
1101 buffered: BTreeMap::new(),
1102 });
1103 if rx_state.buffered.len() >= runtime_reliable_max_pending() {
1104 let _ = rx_state.buffered.pop_first();
1105 }
1106 rx_state.buffered.insert(seq, bytes);
1107 Ok(())
1108 }
1109}
1110
/// Relay object: a sender identity, the mutex-protected relay state, a re-entry gate for
/// side transmissions, and a clock.
pub struct Relay {
    /// Current sender identity; read through `sender()` and replaced through `set_sender()`.
    sender: RouterMutex<Arc<str>>,
    /// All mutable relay state (sides, queues, routing tables, reliable-delivery bookkeeping).
    state: RouterMutex<RelayInner>,
    /// Gate behind `try_enter_side_tx()` / `side_tx_active()`.
    side_tx_gate: ReentryGate,
    /// Time source; queried via `now_ms()`.
    clock: Box<dyn Clock + Send + Sync>,
}
1121
/// Result of planning which sides an item should be forwarded to.
enum RemoteSidePlan {
    /// Forward to exactly these sides (the list may be empty).
    Target(Vec<RelaySideId>),
}
1125
1126impl Relay {
    /// Sender name that, on its own, marks a packet as an end-to-end ack
    /// (see `is_end_to_end_ack_sender`).
    const END_TO_END_ACK_SENDER: &'static str = "E2EACK";
    /// Sender-name prefix that also marks an end-to-end ack; the text after it is what
    /// `decode_end_to_end_ack_sender_hash` hashes.
    const END_TO_END_ACK_PREFIX: &'static str = "E2EACK:";
1129
1130 fn relay_item_priority(data: &RelayItem) -> TelemetryResult<u8> {
1131 let ty = match data {
1132 RelayItem::Packet(pkt) => pkt.data_type(),
1133 RelayItem::Packed(bytes) => wire_format::peek_envelope(bytes.as_ref())?.ty,
1134 };
1135 Ok(message_priority(ty))
1136 }
1137
1138 #[inline]
1139 fn is_side_tx_busy(err: &TelemetryError) -> bool {
1140 matches!(err, TelemetryError::Io("side tx busy"))
1141 }
1142
1143 fn process_replay_queue_item(&self) -> TelemetryResult<bool> {
1144 let Some(item) = ({
1145 let mut st = self.state.lock();
1146 st.replay_queue.pop_front()
1147 }) else {
1148 return Ok(false);
1149 };
1150 let frame = wire_format::peek_frame_info(item.bytes.as_ref())?;
1151 let ty = frame.envelope.ty;
1152 let Some(hdr) = frame.reliable else {
1153 return Ok(false);
1154 };
1155 {
1156 let mut st = self.state.lock();
1157 let tx_state = self.reliable_tx_state_mut(&mut st, item.dst, ty);
1158 if !tx_state.sent.contains_key(&hdr.seq) {
1159 return Ok(false);
1160 }
1161 }
1162 if let Err(e) = self.send_reliable_raw_to_side(item.dst, item.bytes.clone()) {
1163 if Self::is_side_tx_busy(&e) {
1164 let mut st = self.state.lock();
1165 st.push_replay(item)?;
1166 return Ok(false);
1167 }
1168 return Err(e);
1169 }
1170 let mut st = self.state.lock();
1171 let tx_state = self.reliable_tx_state_mut(&mut st, item.dst, ty);
1172 if let Some(sent) = tx_state.sent.get_mut(&hdr.seq) {
1173 sent.last_send_ms = self.clock.now_ms();
1174 sent.queued = false;
1175 }
1176 Ok(true)
1177 }
1178
1179 fn pop_ready_tx_item(
1180 &self,
1181 ) -> Option<(
1182 Option<RelaySideId>,
1183 RelaySideId,
1184 RelayTxHandlerFn,
1185 RelaySideOptions,
1186 RelayItem,
1187 )> {
1188 let mut st = self.state.lock();
1189 if let Some(item) = st.tx_queue.pop_front() {
1190 let side = st.sides.get(item.dst).and_then(|side| side.clone());
1191 side.map(|s| (item.src, item.dst, s.tx_handler, s.opts, item.data))
1192 } else {
1193 None
1194 }
1195 }
1196
1197 fn send_tx_item(
1198 &self,
1199 src: Option<RelaySideId>,
1200 dst: RelaySideId,
1201 handler: RelayTxHandlerFn,
1202 opts: RelaySideOptions,
1203 data: RelayItem,
1204 ) -> TelemetryResult<bool> {
1205 let allowed = {
1206 let mut st = self.state.lock();
1207 let ty = match &data {
1208 RelayItem::Packet(pkt) => Some(pkt.data_type()),
1209 RelayItem::Packed(bytes) => Some(wire_format::peek_envelope(bytes.as_ref())?.ty),
1210 };
1211 let route_allowed = self.route_allowed_locked(&st, src, ty, dst);
1212 #[cfg(all(feature = "discovery", feature = "timesync"))]
1213 let timesync_allowed = ty
1214 .map(|ty| {
1215 Self::timesync_allowed_for_side_locked(&mut st, dst, ty, self.clock.now_ms())
1216 })
1217 .unwrap_or(true);
1218 #[cfg(not(all(feature = "discovery", feature = "timesync")))]
1219 let timesync_allowed = true;
1220 route_allowed && timesync_allowed
1221 };
1222 if !allowed {
1223 return Ok(false);
1224 }
1225 if opts.reliable_enabled && matches!(handler, RelayTxHandlerFn::Packed(_)) {
1226 self.send_reliable_to_side(dst, data)?;
1227 Ok(true)
1228 } else if let Some(adjusted) = self.adjust_reliable_for_side(opts, data)? {
1229 self.call_tx_handler(dst, &handler, &adjusted)?;
1230 Ok(true)
1231 } else {
1232 Ok(false)
1233 }
1234 }
1235
1236 pub fn new(clock: Box<dyn Clock + Send + Sync>) -> Self {
1238 Self::new_with_config(RelayConfig::default(), clock)
1239 }
1240
    /// Creates a relay from `cfg` and `clock`.
    ///
    /// The sender identity and memory limits are taken from `cfg`; all routing tables,
    /// statistics and reliable-delivery maps start empty.
    pub fn new_with_config(cfg: RelayConfig, clock: Box<dyn Clock + Send + Sync>) -> Self {
        let memory = cfg.memory_config();
        Self {
            sender: RouterMutex::new(cfg.sender()),
            state: RouterMutex::new(RelayInner {
                memory,
                sides: Vec::new(),
                route_overrides: BTreeMap::new(),
                typed_route_overrides: BTreeMap::new(),
                route_weights: BTreeMap::new(),
                route_priorities: BTreeMap::new(),
                source_route_modes: BTreeMap::new(),
                route_selection_cursors: BTreeMap::new(),
                adaptive_route_stats: BTreeMap::new(),
                side_runtime_stats: BTreeMap::new(),
                side_transport: BTreeMap::new(),
                // RX, TX and replay queues are each constructed with the full queue budget
                // and the same starting size and grow step.
                rx_queue: BoundedDeque::new(
                    memory.max_queue_budget,
                    memory.starting_queue_size,
                    memory.queue_grow_step,
                ),
                tx_queue: BoundedDeque::new(
                    memory.max_queue_budget,
                    memory.starting_queue_size,
                    memory.queue_grow_step,
                ),
                replay_queue: BoundedDeque::new(
                    memory.max_queue_budget,
                    memory.starting_queue_size,
                    memory.queue_grow_step,
                ),
                // The recent-id queue passes `recent_rx_queue_bytes()` for both of the first
                // two arguments.
                recent_rx: BoundedDeque::new(
                    memory.recent_rx_queue_bytes(),
                    memory.recent_rx_queue_bytes(),
                    memory.queue_grow_step,
                ),
                reliable_tx: BTreeMap::new(),
                reliable_rx: BTreeMap::new(),
                reliable_return_routes: BTreeMap::new(),
                reliable_return_route_order: VecDeque::new(),
                end_to_end_acked_destinations: BTreeMap::new(),
                end_to_end_acked_destination_order: VecDeque::new(),
                total_handler_failures: 0,
                total_handler_retries: 0,
                #[cfg(feature = "discovery")]
                discovery_routes: BTreeMap::new(),
                #[cfg(feature = "discovery")]
                discovery_cadence: DiscoveryCadenceState::default(),
                #[cfg(feature = "discovery")]
                discovery_side_throttle: BTreeMap::new(),
                #[cfg(all(feature = "discovery", feature = "timesync"))]
                timesync_side_throttle: BTreeMap::new(),
            }),
            side_tx_gate: ReentryGate::new(),
            clock,
        }
    }
1299
1300 #[inline]
1301 fn sender_arc(&self) -> Arc<str> {
1302 self.sender.lock().clone()
1303 }
1304
    /// Returns the relay's current sender identity.
    #[inline]
    pub fn sender(&self) -> Arc<str> {
        self.sender_arc()
    }
1309
1310 pub fn set_sender<S: AsRef<str>>(&self, sender: S) {
1311 *self.sender.lock() = Arc::from(sender.as_ref());
1312 }
1313
    /// Attempts to enter the side-transmit gate; `None` is treated by callers as
    /// "side tx busy".
    #[inline]
    fn try_enter_side_tx(&self) -> Option<ReentryGuard<'_>> {
        self.side_tx_gate.try_enter()
    }
1318
    /// Reports whether the side-transmit gate is currently active.
    #[inline]
    fn side_tx_active(&self) -> bool {
        self.side_tx_gate.is_active()
    }
1323
1324 #[inline]
1325 fn side_ref(st: &RelayInner, side: RelaySideId) -> TelemetryResult<&RelaySide> {
1326 st.sides
1327 .get(side)
1328 .and_then(|side| side.as_ref())
1329 .ok_or(TelemetryError::HandlerError("relay: invalid side id"))
1330 }
1331
1332 fn note_side_tx_success(
1333 &self,
1334 side: RelaySideId,
1335 ty: crate::DataType,
1336 bytes: usize,
1337 attempts: usize,
1338 ) {
1339 let mut st = self.state.lock();
1340 let entry = st.side_runtime_stats.entry(side).or_default();
1341 entry.note_tx(ty, bytes, attempts.saturating_sub(1));
1342 }
1343
1344 fn note_side_tx_failure(&self, side: RelaySideId, ty: crate::DataType, attempts: usize) {
1345 let mut st = self.state.lock();
1346 st.total_handler_failures = st.total_handler_failures.saturating_add(1);
1347 st.total_handler_retries = st.total_handler_retries.saturating_add(attempts as u64);
1348 let entry = st.side_runtime_stats.entry(side).or_default();
1349 entry.note_tx_failure(ty, attempts);
1350 }
1351
1352 fn note_side_rx(&self, side: RelaySideId, ty: crate::DataType, bytes: usize) {
1353 let mut st = self.state.lock();
1354 let entry = st.side_runtime_stats.entry(side).or_default();
1355 entry.note_rx(ty, bytes);
1356 }
1357
1358 #[inline]
1359 fn ensure_side_ingress_enabled(&self, side: RelaySideId) -> TelemetryResult<()> {
1360 let st = self.state.lock();
1361 let side_ref = Self::side_ref(&st, side)?;
1362 if side_ref.opts.ingress_enabled {
1363 Ok(())
1364 } else {
1365 Err(TelemetryError::HandlerError(
1366 "relay: ingress disabled for side id",
1367 ))
1368 }
1369 }
1370
1371 #[inline]
1372 fn route_allowed_locked(
1373 &self,
1374 st: &RelayInner,
1375 src: Option<RelaySideId>,
1376 ty: Option<crate::DataType>,
1377 dst: RelaySideId,
1378 ) -> bool {
1379 let Ok(dst_side) = Self::side_ref(st, dst) else {
1380 return false;
1381 };
1382 if !dst_side.opts.egress_enabled {
1383 return false;
1384 }
1385 if let Some(src_id) = src {
1386 let Ok(src_side) = Self::side_ref(st, src_id) else {
1387 return false;
1388 };
1389 if !src_side.opts.ingress_enabled || src_id == dst {
1390 return false;
1391 }
1392 }
1393 let base_allowed = st.route_overrides.get(&(src, dst)).copied().unwrap_or(true);
1394 if !base_allowed {
1395 return false;
1396 }
1397
1398 let Some(ty) = ty else {
1399 return true;
1400 };
1401 if st
1402 .typed_route_overrides
1403 .keys()
1404 .any(|(typed_src, typed_ty, _)| *typed_src == src && *typed_ty == ty.as_u32())
1405 {
1406 return st
1407 .typed_route_overrides
1408 .get(&(src, ty.as_u32(), dst))
1409 .copied()
1410 .unwrap_or(false);
1411 }
1412 true
1413 }
1414
1415 fn has_typed_route_overrides_locked(
1416 st: &RelayInner,
1417 src: Option<RelaySideId>,
1418 ty: crate::DataType,
1419 ) -> bool {
1420 st.typed_route_overrides
1421 .keys()
1422 .any(|(typed_src, typed_ty, _)| *typed_src == src && *typed_ty == ty.as_u32())
1423 }
1424
1425 fn eligible_side_ids_locked(
1426 &self,
1427 st: &RelayInner,
1428 src: Option<RelaySideId>,
1429 ty: Option<crate::DataType>,
1430 restrict_link_local: bool,
1431 ) -> Vec<RelaySideId> {
1432 st.sides
1433 .iter()
1434 .enumerate()
1435 .filter_map(|(side_id, side)| {
1436 let side = side.as_ref()?;
1437 if restrict_link_local && !side.opts.link_local_enabled {
1438 return None;
1439 }
1440 if self.route_allowed_locked(st, src, ty, side_id) {
1441 Some(side_id)
1442 } else {
1443 None
1444 }
1445 })
1446 .collect()
1447 }
1448
    /// Narrows a candidate side list according to the route-selection mode configured for
    /// `src`.
    ///
    /// Lists with at most one side are returned unchanged. When no mode is configured and
    /// `origin` is `Discovered`, selection is delegated to
    /// `apply_adaptive_discovery_selection_locked`. Otherwise the mode defaults to `Fanout`.
    fn apply_route_selection_locked(
        &self,
        st: &mut RelayInner,
        src: Option<RelaySideId>,
        mut sides: Vec<RelaySideId>,
        origin: RouteSelectionOrigin,
    ) -> Vec<RelaySideId> {
        if sides.len() <= 1 {
            return sides;
        }

        let selection_mode = st.source_route_modes.get(&src).copied();
        if selection_mode.is_none() && origin == RouteSelectionOrigin::Discovered {
            return self.apply_adaptive_discovery_selection_locked(st, src, sides);
        }

        match selection_mode.unwrap_or(RouteSelectionMode::Fanout) {
            // Fanout: keep every candidate.
            RouteSelectionMode::Fanout => sides,
            // Weighted: pick exactly one side; the per-source cursor walks the cumulative
            // weight range so each side is chosen in proportion to its weight (default 1).
            RouteSelectionMode::Weighted => {
                sides.sort_unstable();
                let total_weight = sides.iter().fold(0_u64, |acc, side| {
                    acc + u64::from(st.route_weights.get(&(src, *side)).copied().unwrap_or(1))
                });
                // All weights are zero: nothing is selectable.
                if total_weight == 0 {
                    return Vec::new();
                }
                let cursor = st.route_selection_cursors.entry(src).or_insert(0);
                let pick = *cursor % total_weight;
                *cursor = cursor.wrapping_add(1);
                let mut remaining = pick;
                for side in sides {
                    let weight =
                        u64::from(st.route_weights.get(&(src, side)).copied().unwrap_or(1));
                    if remaining < weight {
                        return vec![side];
                    }
                    remaining -= weight;
                }
                Vec::new()
            }
            // Failover: keep only the side with the smallest priority value (default 0),
            // ties broken by the smaller side id.
            RouteSelectionMode::Failover => {
                sides.sort_by_key(|side| {
                    (
                        st.route_priorities.get(&(src, *side)).copied().unwrap_or(0),
                        *side,
                    )
                });
                sides.truncate(1);
                sides
            }
        }
    }
1501
1502 fn apply_adaptive_discovery_selection_locked(
1503 &self,
1504 st: &mut RelayInner,
1505 src: Option<RelaySideId>,
1506 mut sides: Vec<RelaySideId>,
1507 ) -> Vec<RelaySideId> {
1508 sides.sort_unstable();
1509 let mut unmeasured: Vec<_> = sides
1510 .iter()
1511 .copied()
1512 .filter(|side| !st.adaptive_route_stats.contains_key(side))
1513 .collect();
1514 if !unmeasured.is_empty() {
1515 let cursor = st.route_selection_cursors.entry(src).or_insert(0);
1516 let pick = (*cursor as usize) % unmeasured.len();
1517 *cursor = cursor.wrapping_add(1);
1518 return vec![unmeasured.swap_remove(pick)];
1519 }
1520
1521 let now_ms = self.clock.now_ms();
1522 let total_weight = sides.iter().fold(0_u64, |acc, side| {
1523 acc + st
1524 .adaptive_route_stats
1525 .get(side)
1526 .map(|stats| stats.weight(now_ms))
1527 .unwrap_or(1)
1528 });
1529 if total_weight == 0 {
1530 sides.truncate(1);
1531 return sides;
1532 }
1533
1534 let cursor = st.route_selection_cursors.entry(src).or_insert(0);
1535 let pick = *cursor % total_weight;
1536 *cursor = cursor.wrapping_add(1);
1537 let mut remaining = pick;
1538 for side in sides {
1539 let weight = st
1540 .adaptive_route_stats
1541 .get(&side)
1542 .map(|stats| stats.weight(now_ms))
1543 .unwrap_or(1);
1544 if remaining < weight {
1545 return vec![side];
1546 }
1547 remaining -= weight;
1548 }
1549 Vec::new()
1550 }
1551
1552 fn record_side_tx_sample(
1553 &self,
1554 side: RelaySideId,
1555 bytes: usize,
1556 started_ms: u64,
1557 ended_ms: u64,
1558 ) {
1559 let sample_ms = ended_ms.saturating_sub(started_ms).max(1);
1560 let sample_bps = ((bytes as u128).saturating_mul(1000) / u128::from(sample_ms))
1561 .min(u128::from(u64::MAX)) as u64;
1562 let mut st = self.state.lock();
1563 st.adaptive_route_stats
1564 .entry(side)
1565 .or_default()
1566 .observe(bytes, sample_bps, ended_ms);
1567 }
1568
1569 pub fn note_side_link_probe_sample(
1574 &self,
1575 side: RelaySideId,
1576 bytes: usize,
1577 duration_ms: u64,
1578 ) -> TelemetryResult<()> {
1579 {
1580 let st = self.state.lock();
1581 let _ = Self::side_ref(&st, side).map_err(|_| TelemetryError::BadArg)?;
1582 }
1583 let ended_ms = self.clock.now_ms();
1584 self.record_side_tx_sample(side, bytes, ended_ms.saturating_sub(duration_ms), ended_ms);
1585 Ok(())
1586 }
1587
1588 fn relay_item_wire_len(data: &RelayItem) -> TelemetryResult<usize> {
1589 match data {
1590 RelayItem::Packet(pkt) => Ok(wire_format::pack_packet(pkt).len()),
1591 RelayItem::Packed(bytes) => Ok(bytes.len()),
1592 }
1593 }
1594
1595 #[inline]
1596 fn decode_end_to_end_reliable_ack(payload: &[u8]) -> TelemetryResult<u64> {
1597 if payload.len() != 8 {
1598 return Err(TelemetryError::Unpack("bad reliable e2e ack payload"));
1599 }
1600 Ok(u64::from_le_bytes(payload[0..8].try_into().unwrap()))
1601 }
1602
1603 #[inline]
1604 fn is_end_to_end_ack_sender(sender: &str) -> bool {
1605 sender == Self::END_TO_END_ACK_SENDER || sender.starts_with(Self::END_TO_END_ACK_PREFIX)
1606 }
1607
1608 #[inline]
1609 fn sender_hash(sender: &str) -> u64 {
1610 hash_bytes_u64(0x517C_C1B7_2722_0A95, sender.as_bytes())
1611 }
1612
1613 fn decode_end_to_end_ack_sender_hash(sender: &str) -> Option<u64> {
1614 sender
1615 .strip_prefix(Self::END_TO_END_ACK_PREFIX)
1616 .filter(|sender| !sender.is_empty())
1617 .map(Self::sender_hash)
1618 }
1619
1620 #[cfg(feature = "discovery")]
1621 fn is_end_to_end_destination_sender(&self, sender: &str) -> bool {
1622 sender != self.sender_arc().as_ref() && !Self::is_end_to_end_ack_sender(sender)
1623 }
1624
1625 fn reliable_control_target_packet_id(data: &RelayItem) -> TelemetryResult<Option<u64>> {
1634 match data {
1635 RelayItem::Packet(pkt) => {
1636 if pkt.data_type() != crate::DataType::ReliableAck
1637 || !Self::is_end_to_end_ack_sender(pkt.sender())
1638 {
1639 return Ok(None);
1640 }
1641 Self::decode_end_to_end_reliable_ack(pkt.payload()).map(Some)
1642 }
1643 RelayItem::Packed(bytes) => {
1644 if wire_format::peek_frame_info(bytes.as_ref())
1645 .ok()
1646 .is_some_and(|frame| frame.ack_only())
1647 {
1648 return Ok(None);
1649 }
1650 let pkt = wire_format::unpack_packet(bytes.as_ref())?;
1651 if pkt.data_type() != crate::DataType::ReliableAck
1652 || !Self::is_end_to_end_ack_sender(pkt.sender())
1653 {
1654 return Ok(None);
1655 }
1656 Self::decode_end_to_end_reliable_ack(pkt.payload()).map(Some)
1657 }
1658 }
1659 }
1660
1661 fn note_reliable_return_route(&self, side: RelaySideId, packet_id: u64) {
1662 let mut st = self.state.lock();
1663 Self::remember_reliable_return_route_locked(&mut st, packet_id);
1664 st.reliable_return_routes
1665 .insert(packet_id, ReliableReturnRouteState { side });
1666 }
1667
1668 fn remember_reliable_return_route_locked(st: &mut RelayInner, packet_id: u64) {
1674 let cap = runtime_reliable_max_return_routes().max(1);
1675 st.reliable_return_route_order
1676 .retain(|id| st.reliable_return_routes.contains_key(id) && *id != packet_id);
1677 while st.reliable_return_route_order.len() >= cap {
1678 if let Some(oldest) = st.reliable_return_route_order.pop_front() {
1679 st.reliable_return_routes.remove(&oldest);
1680 } else {
1681 break;
1682 }
1683 }
1684 st.reliable_return_route_order.push_back(packet_id);
1685 }
1686
1687 fn note_end_to_end_acked_destination_locked(
1688 st: &mut RelayInner,
1689 packet_id: u64,
1690 sender_hash: u64,
1691 ) {
1692 let entry_cap = runtime_reliable_max_end_to_end_ack_cache().max(1);
1693 st.end_to_end_acked_destination_order
1694 .retain(|id| st.end_to_end_acked_destinations.contains_key(id) && *id != packet_id);
1695 while st.end_to_end_acked_destination_order.len() >= entry_cap {
1696 if let Some(oldest) = st.end_to_end_acked_destination_order.pop_front() {
1697 st.end_to_end_acked_destinations.remove(&oldest);
1698 } else {
1699 break;
1700 }
1701 }
1702 st.end_to_end_acked_destination_order.push_back(packet_id);
1703
1704 let acked = st
1705 .end_to_end_acked_destinations
1706 .entry(packet_id)
1707 .or_default();
1708 let sender_cap = runtime_reliable_max_end_to_end_pending().max(1);
1709 if acked.len() < sender_cap || acked.contains(&sender_hash) {
1710 acked.insert(sender_hash);
1711 }
1712 }
1713
1714 #[inline]
1715 fn reliable_key(side: RelaySideId, ty: crate::DataType) -> (RelaySideId, u32) {
1716 (side, ty.as_u32())
1717 }
1718
1719 fn reliable_tx_state_mut<'a>(
1720 &'a self,
1721 st: &'a mut RelayInner,
1722 side: RelaySideId,
1723 ty: crate::DataType,
1724 ) -> &'a mut ReliableTxState {
1725 let key = Self::reliable_key(side, ty);
1726 st.reliable_tx
1727 .entry(key)
1728 .or_insert_with(|| ReliableTxState {
1729 next_seq: 1,
1730 sent_order: VecDeque::new(),
1731 sent: BTreeMap::new(),
1732 })
1733 }
1734
1735 fn reliable_rx_state_mut<'a>(
1736 &'a self,
1737 st: &'a mut RelayInner,
1738 side: RelaySideId,
1739 ty: crate::DataType,
1740 ) -> &'a mut ReliableRxState {
1741 let key = Self::reliable_key(side, ty);
1742 st.reliable_rx
1743 .entry(key)
1744 .or_insert_with(|| ReliableRxState {
1745 expected_seq: 1,
1746 buffered: BTreeMap::new(),
1747 })
1748 }
1749
1750 fn handle_reliable_ack(&self, side: RelaySideId, ty: crate::DataType, ack: u32) {
1751 let mut st = self.state.lock();
1752 let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1753 if matches!(reliable_mode(ty), crate::ReliableMode::Unordered) {
1754 tx_state.sent.remove(&ack);
1755 tx_state.sent_order.retain(|seq| *seq != ack);
1756 return;
1757 }
1758
1759 while let Some(seq) = tx_state.sent_order.front().copied() {
1760 if seq > ack {
1761 break;
1762 }
1763 tx_state.sent_order.pop_front();
1764 tx_state.sent.remove(&seq);
1765 }
1766 }
1767
1768 fn handle_reliable_partial_ack(&self, side: RelaySideId, ty: crate::DataType, seq: u32) {
1769 let mut st = self.state.lock();
1770 let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1771 if let Some(sent) = tx_state.sent.get_mut(&seq) {
1772 sent.partial_acked = true;
1773 }
1774 }
1775
1776 fn reliable_control_packet(
1777 &self,
1778 control_ty: crate::DataType,
1779 ty: crate::DataType,
1780 seq: u32,
1781 ) -> TelemetryResult<Packet> {
1782 let sender = self.sender_arc();
1783 Packet::new(
1784 control_ty,
1785 message_meta(control_ty).endpoints,
1786 sender.as_ref(),
1787 self.clock.now_ms(),
1788 crate::router::encode_slice_le(&[ty.as_u32(), seq]),
1789 )
1790 }
1791
1792 fn queue_reliable_ack(
1793 &self,
1794 side: RelaySideId,
1795 ty: crate::DataType,
1796 seq: u32,
1797 ) -> TelemetryResult<()> {
1798 let pkt = self.reliable_control_packet(crate::DataType::ReliableAck, ty, seq)?;
1799 let data = RelayItem::Packet(Arc::new(pkt));
1800 let priority = Self::relay_item_priority(&data)?;
1801 let mut st = self.state.lock();
1802 st.push_tx(RelayTxItem {
1803 src: None,
1804 dst: side,
1805 data,
1806 priority,
1807 })?;
1808 Ok(())
1809 }
1810
1811 fn queue_reliable_packet_request(
1812 &self,
1813 side: RelaySideId,
1814 ty: crate::DataType,
1815 seq: u32,
1816 ) -> TelemetryResult<()> {
1817 let pkt = self.reliable_control_packet(crate::DataType::ReliablePacketRequest, ty, seq)?;
1818 let data = RelayItem::Packet(Arc::new(pkt));
1819 let priority = Self::relay_item_priority(&data)?;
1820 let mut st = self.state.lock();
1821 st.push_tx(RelayTxItem {
1822 src: None,
1823 dst: side,
1824 data,
1825 priority,
1826 })?;
1827 Ok(())
1828 }
1829
1830 fn queue_reliable_partial_ack(
1831 &self,
1832 side: RelaySideId,
1833 ty: crate::DataType,
1834 seq: u32,
1835 ) -> TelemetryResult<()> {
1836 let pkt = self.reliable_control_packet(crate::DataType::ReliablePartialAck, ty, seq)?;
1837 let data = RelayItem::Packet(Arc::new(pkt));
1838 let priority = Self::relay_item_priority(&data)?;
1839 let mut st = self.state.lock();
1840 st.push_tx(RelayTxItem {
1841 src: None,
1842 dst: side,
1843 data,
1844 priority,
1845 })?;
1846 Ok(())
1847 }
1848
1849 fn queue_reliable_retransmit(
1850 &self,
1851 side: RelaySideId,
1852 ty: crate::DataType,
1853 seq: u32,
1854 ) -> TelemetryResult<()> {
1855 let mut queued = None;
1856 {
1857 let mut st = self.state.lock();
1858 let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1859 if let Some(sent) = tx_state.sent.get_mut(&seq)
1860 && !sent.queued
1861 {
1862 sent.queued = true;
1863 sent.partial_acked = false;
1864 queued = Some(sent.bytes.clone());
1865 }
1866 }
1867
1868 if let Some(bytes) = queued {
1869 let mut st = self.state.lock();
1870 st.push_replay(RelayReplayItem {
1871 dst: side,
1872 bytes,
1873 priority: message_priority(ty).saturating_add(16),
1874 })?;
1875 }
1876 Ok(())
1877 }
1878
1879 fn send_reliable_raw_to_side(
1880 &self,
1881 side: RelaySideId,
1882 bytes: Arc<[u8]>,
1883 ) -> TelemetryResult<()> {
1884 let (handler, opts) = {
1885 let st = self.state.lock();
1886 let side_ref = Self::side_ref(&st, side)?;
1887 if !side_ref.opts.egress_enabled {
1888 return Ok(());
1889 }
1890 (side_ref.tx_handler.clone(), side_ref.opts)
1891 };
1892
1893 let Some(_side_tx_guard) = self.try_enter_side_tx() else {
1894 return Err(TelemetryError::Io("side tx busy"));
1895 };
1896 let started_ms = self.clock.now_ms();
1897 let ty = wire_format::peek_envelope(bytes.as_ref())
1898 .map(|env| env.ty)
1899 .unwrap_or(crate::DataType::ReliableAck);
1900 let result = match handler {
1901 RelayTxHandlerFn::Packed(f) => {
1902 let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
1903 let mut sent_bytes = 0usize;
1904 for frame in frames {
1905 f(frame.as_ref())?;
1906 sent_bytes = sent_bytes.saturating_add(frame.len());
1907 }
1908 self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
1909 self.note_side_tx_success(side, ty, sent_bytes, 1);
1910 return Ok(());
1911 }
1912 RelayTxHandlerFn::Packet(f) => {
1913 if wire_format::peek_frame_info(bytes.as_ref())
1914 .ok()
1915 .is_some_and(|frame| frame.ack_only())
1916 {
1917 return Ok(());
1918 }
1919 let pkt = wire_format::unpack_packet(bytes.as_ref())?;
1920 f(&pkt)
1921 }
1922 };
1923 if result.is_ok() {
1924 self.record_side_tx_sample(side, bytes.len(), started_ms, self.clock.now_ms());
1925 self.note_side_tx_success(side, ty, bytes.len(), 1);
1926 } else {
1927 self.note_side_tx_failure(side, ty, 1);
1928 }
1929 result
1930 }
1931
    /// Sends `data` to `side`, adding hop-level reliable sequencing when it applies.
    ///
    /// Hop reliability is used only when the side has `reliable_enabled`, uses a packed
    /// handler, is not reported by `side_has_multiple_announcers_locked`, and the item's data
    /// type is a reliable type. In every other case the item is passed to `call_tx_handler`
    /// (after `adjust_reliable_for_side` where the code below does so).
    ///
    /// # Errors
    /// Returns `PacketTooLarge("relay reliable history full")` when the number of
    /// unacknowledged frames has reached `runtime_reliable_max_pending()`,
    /// `Io("side tx busy")` when the side-transmit gate cannot be entered, and propagates
    /// lookup, encoding and handler errors.
    fn send_reliable_to_side(&self, side: RelaySideId, data: RelayItem) -> TelemetryResult<()> {
        let (handler, opts, hop_reliable_enabled) = {
            let st = self.state.lock();
            let side_ref = Self::side_ref(&st, side)?;
            let opts = side_ref.opts;
            let hop_reliable_enabled = opts.reliable_enabled
                && !self.side_has_multiple_announcers_locked(&st, side, self.clock.now_ms());
            (side_ref.tx_handler.clone(), opts, hop_reliable_enabled)
        };

        // Packet handlers never get hop-level reliable framing.
        let RelayTxHandlerFn::Packed(f) = &handler else {
            return self.call_tx_handler(side, &handler, &data);
        };

        if !hop_reliable_enabled {
            // Send as if the side had reliability switched off.
            let mut adjusted_opts = opts;
            adjusted_opts.reliable_enabled = false;
            if let Some(adjusted) = self.adjust_reliable_for_side(adjusted_opts, data)? {
                return self.call_tx_handler(side, &handler, &adjusted);
            }
            return Ok(());
        }

        let ty = match &data {
            RelayItem::Packet(pkt) => pkt.data_type(),
            RelayItem::Packed(bytes) => {
                // Frames that cannot be inspected are forwarded unchanged.
                let Ok(frame) = wire_format::peek_frame_info(bytes.as_ref()) else {
                    return self.call_tx_handler(side, &handler, &data);
                };
                frame.envelope.ty
            }
        };

        if !is_reliable_type(ty) {
            if let Some(adjusted) = self.adjust_reliable_for_side(opts, data)? {
                self.call_tx_handler(side, &handler, &adjusted)?;
            }
            return Ok(());
        }

        // Allocate the next sequence number; 0 is skipped when the counter wraps.
        // NOTE(review): the sequence is consumed here, before the send is attempted. If the
        // send below fails (busy gate or handler error) the number is not recorded in `sent`
        // — confirm the receiver tolerates such gaps.
        let (seq, flags) = {
            let mut st = self.state.lock();
            let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
            if tx_state.sent.len() >= runtime_reliable_max_pending() {
                return Err(TelemetryError::PacketTooLarge(
                    "relay reliable history full",
                ));
            }
            let seq = tx_state.next_seq;
            let next = tx_state.next_seq.wrapping_add(1);
            tx_state.next_seq = if next == 0 { 1 } else { next };
            let flags = match reliable_mode(ty) {
                crate::ReliableMode::Unordered => wire_format::RELIABLE_FLAG_UNORDERED,
                _ => 0,
            };
            (seq, flags)
        };

        // Stamp the reliable header onto the frame.
        let bytes: Arc<[u8]> = match data {
            RelayItem::Packet(pkt) => wire_format::pack_packet_with_reliable(
                &pkt,
                wire_format::ReliableHeader { flags, seq, ack: 0 },
            ),
            RelayItem::Packed(bytes) => {
                let Some(rewritten) =
                    wire_format::rewrite_reliable_header_owned(bytes.as_ref(), flags, seq, 0)?
                else {
                    // The header could not be rewritten: send the original bytes as-is and
                    // do not track them for retransmission.
                    let Some(_side_tx_guard) = self.try_enter_side_tx() else {
                        return Err(TelemetryError::Io("side tx busy"));
                    };
                    let started_ms = self.clock.now_ms();
                    let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
                    let mut sent_bytes = 0usize;
                    for frame in frames {
                        f(frame.as_ref())?;
                        sent_bytes = sent_bytes.saturating_add(frame.len());
                    }
                    self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
                    self.note_side_tx_success(side, ty, sent_bytes, 1);
                    return Ok(());
                };
                rewritten
            }
        };

        let Some(_side_tx_guard) = self.try_enter_side_tx() else {
            return Err(TelemetryError::Io("side tx busy"));
        };
        let started_ms = self.clock.now_ms();
        let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
        let mut sent_bytes = 0usize;
        for frame in frames {
            f(frame.as_ref())?;
            sent_bytes = sent_bytes.saturating_add(frame.len());
        }
        self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
        self.note_side_tx_success(side, ty, sent_bytes, 1);

        // Keep the framed bytes until acknowledged so they can be retransmitted.
        {
            let mut st = self.state.lock();
            let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
            tx_state.sent_order.push_back(seq);
            tx_state.sent.insert(
                seq,
                ReliableSent {
                    bytes: bytes.clone(),
                    last_send_ms: self.clock.now_ms(),
                    retries: 0,
                    queued: false,
                    partial_acked: false,
                },
            );
        }

        Ok(())
    }
2048
2049 fn item_route_info(
2050 &self,
2051 data: &RelayItem,
2052 ) -> TelemetryResult<(Vec<crate::DataEndpoint>, crate::DataType)> {
2053 match data {
2054 RelayItem::Packet(pkt) => {
2055 let mut eps = pkt.endpoints().to_vec();
2056 eps.sort_unstable();
2057 eps.dedup();
2058 Ok((eps, pkt.data_type()))
2059 }
2060 RelayItem::Packed(bytes) => {
2061 let env = wire_format::peek_envelope(bytes.as_ref())?;
2062 let mut eps: Vec<crate::DataEndpoint> = env.endpoints.iter().copied().collect();
2063 eps.sort_unstable();
2064 eps.dedup();
2065 Ok((eps, env.ty))
2066 }
2067 }
2068 }
2069
2070 fn endpoints_are_link_local_only(eps: &[crate::DataEndpoint]) -> bool {
2071 !eps.is_empty() && eps.iter().all(|ep| ep.is_link_local_only())
2072 }
2073
2074 fn item_target_senders(&self, data: &RelayItem) -> TelemetryResult<Arc<[u64]>> {
2075 match data {
2076 RelayItem::Packet(pkt) => Ok(Arc::from(pkt.wire_target_senders())),
2077 RelayItem::Packed(bytes) => {
2078 Ok(wire_format::peek_envelope(bytes.as_ref())?.target_senders)
2079 }
2080 }
2081 }
2082
2083 #[cfg(feature = "discovery")]
2084 fn has_explicit_route_policy_locked(
2085 st: &RelayInner,
2086 src: Option<RelaySideId>,
2087 ty: crate::DataType,
2088 ) -> bool {
2089 st.route_overrides
2090 .keys()
2091 .any(|(route_src, _)| *route_src == src)
2092 || Self::has_typed_route_overrides_locked(st, src, ty)
2093 }
2094
2095 #[cfg(feature = "discovery")]
2096 fn side_matches_target_senders_locked(
2097 st: &RelayInner,
2098 side: RelaySideId,
2099 target_senders: &[u64],
2100 now_ms: u64,
2101 ) -> bool {
2102 st.discovery_routes
2103 .get(&side)
2104 .map(|route| {
2105 if now_ms.saturating_sub(route.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2106 return false;
2107 }
2108 route.announcers.values().any(|sender_state| {
2109 if now_ms.saturating_sub(sender_state.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2110 return false;
2111 }
2112 sender_state
2113 .topology_boards
2114 .iter()
2115 .any(|board| target_senders.contains(&Self::sender_hash(&board.sender_id)))
2116 })
2117 })
2118 .unwrap_or(false)
2119 }
2120
2121 fn remote_side_plan(
2122 &self,
2123 data: &RelayItem,
2124 exclude: RelaySideId,
2125 ) -> TelemetryResult<RemoteSidePlan> {
2126 #[cfg(feature = "discovery")]
2127 {
2128 let (eps, ty) = self.item_route_info(data)?;
2129 let target_senders = self.item_target_senders(data)?;
2130 let preferred_packet_id = Self::reliable_control_target_packet_id(data)?;
2131 if discovery::is_discovery_type(ty) {
2132 let mut st = self.state.lock();
2133 let sides = self.eligible_side_ids_locked(&st, Some(exclude), Some(ty), false);
2134 return Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
2135 &mut st,
2136 Some(exclude),
2137 sides,
2138 RouteSelectionOrigin::Flood,
2139 )));
2140 }
2141
2142 #[cfg(feature = "timesync")]
2143 let preferred_timesync_source = self.preferred_timesync_route_source(data, ty)?;
2144 #[cfg(not(feature = "timesync"))]
2145 let preferred_timesync_source: Option<String> = None;
2146 let mut st = self.state.lock();
2147 if let Some(packet_id) = preferred_packet_id {
2148 let target_side = self.allowed_target_side_locked(
2149 &st,
2150 exclude,
2151 ty,
2152 st.reliable_return_routes
2153 .get(&packet_id)
2154 .map(|route| route.side),
2155 );
2156 if let Some(side) = target_side {
2157 #[cfg(feature = "timesync")]
2158 if !Self::timesync_allowed_for_side_locked(
2159 &mut st,
2160 side,
2161 ty,
2162 self.clock.now_ms(),
2163 ) {
2164 return Ok(RemoteSidePlan::Target(Vec::new()));
2165 }
2166 return Ok(RemoteSidePlan::Target(vec![side]));
2167 }
2168 return Ok(RemoteSidePlan::Target(Vec::new()));
2169 }
2170 let restrict_link_local = Self::endpoints_are_link_local_only(&eps);
2171 let discovered_origin = if is_reliable_type(ty) {
2172 RouteSelectionOrigin::Flood
2173 } else {
2174 RouteSelectionOrigin::Discovered
2175 };
2176 if st.discovery_routes.is_empty() {
2177 let mut fallback = self.eligible_side_ids_locked(
2178 &st,
2179 Some(exclude),
2180 Some(ty),
2181 restrict_link_local,
2182 );
2183 #[cfg(feature = "timesync")]
2184 {
2185 fallback = Self::filter_timesync_sides_locked(
2186 &mut st,
2187 ty,
2188 self.clock.now_ms(),
2189 fallback,
2190 );
2191 }
2192 return Ok(RemoteSidePlan::Target(if fallback.len() == 1 {
2193 fallback
2194 } else {
2195 Vec::new()
2196 }));
2197 }
2198 let now_ms = self.clock.now_ms();
2199 let mut had_exact = false;
2200 let mut exact_targets = Vec::new();
2201 let mut had_known = false;
2202 let mut generic_targets = Vec::new();
2203
2204 for (&side, route) in st.discovery_routes.iter() {
2205 if side == exclude
2206 || now_ms.saturating_sub(route.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS
2207 {
2208 continue;
2209 }
2210 if restrict_link_local
2211 && st
2212 .sides
2213 .get(side)
2214 .and_then(|side| side.as_ref())
2215 .map(|s| !s.opts.link_local_enabled)
2216 .unwrap_or(true)
2217 {
2218 continue;
2219 }
2220 if !self.route_allowed_locked(&st, Some(exclude), Some(ty), side) {
2221 continue;
2222 }
2223 if !target_senders.is_empty() {
2224 if !Self::side_matches_target_senders_locked(&st, side, &target_senders, now_ms)
2225 {
2226 continue;
2227 }
2228 had_known = true;
2229 generic_targets.push(side);
2230 continue;
2231 }
2232 if preferred_timesync_source.as_deref().is_some_and(|source| {
2233 route.reachable_timesync_sources.iter().any(|s| s == source)
2234 }) {
2235 had_exact = true;
2236 exact_targets.push(side);
2237 continue;
2238 }
2239 if eps.iter().copied().any(|ep| route.reachable.contains(&ep)) {
2240 had_known = true;
2241 generic_targets.push(side);
2242 }
2243 }
2244
2245 if had_exact {
2246 #[cfg(feature = "timesync")]
2247 {
2248 exact_targets = Self::filter_timesync_sides_locked(
2249 &mut st,
2250 ty,
2251 self.clock.now_ms(),
2252 exact_targets,
2253 );
2254 }
2255 let targets = self.filter_end_to_end_satisfied_sides_locked(
2256 &st,
2257 data,
2258 exact_targets,
2259 &eps,
2260 ty,
2261 )?;
2262 Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
2263 &mut st,
2264 Some(exclude),
2265 targets,
2266 discovered_origin,
2267 )))
2268 } else if had_known {
2269 #[cfg(feature = "timesync")]
2270 {
2271 generic_targets = Self::filter_timesync_sides_locked(
2272 &mut st,
2273 ty,
2274 self.clock.now_ms(),
2275 generic_targets,
2276 );
2277 }
2278 let targets = self.filter_end_to_end_satisfied_sides_locked(
2279 &st,
2280 data,
2281 generic_targets,
2282 &eps,
2283 ty,
2284 )?;
2285 Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
2286 &mut st,
2287 Some(exclude),
2288 targets,
2289 discovered_origin,
2290 )))
2291 } else {
2292 if Self::has_explicit_route_policy_locked(&st, Some(exclude), ty) {
2293 let mut sides = self.eligible_side_ids_locked(
2294 &st,
2295 Some(exclude),
2296 Some(ty),
2297 restrict_link_local,
2298 );
2299 #[cfg(feature = "timesync")]
2300 {
2301 sides = Self::filter_timesync_sides_locked(
2302 &mut st,
2303 ty,
2304 self.clock.now_ms(),
2305 sides,
2306 );
2307 }
2308 Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
2309 &mut st,
2310 Some(exclude),
2311 sides,
2312 RouteSelectionOrigin::Flood,
2313 )))
2314 } else {
2315 Ok(RemoteSidePlan::Target(Vec::new()))
2316 }
2317 }
2318 }
2319 #[cfg(not(feature = "discovery"))]
2320 {
2321 let (_, ty) = self.item_route_info(data)?;
2322 let mut st = self.state.lock();
2323 if let Some(packet_id) = Self::reliable_control_target_packet_id(data)? {
2324 let target_side = self.allowed_target_side_locked(
2325 &st,
2326 exclude,
2327 ty,
2328 st.reliable_return_routes
2329 .get(&packet_id)
2330 .map(|route| route.side),
2331 );
2332 if let Some(side) = target_side {
2333 return Ok(RemoteSidePlan::Target(vec![side]));
2334 }
2335 return Ok(RemoteSidePlan::Target(Vec::new()));
2336 }
2337 let sides = self.eligible_side_ids_locked(&st, Some(exclude), Some(ty), false);
2338 Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
2339 &mut st,
2340 Some(exclude),
2341 sides,
2342 RouteSelectionOrigin::Flood,
2343 )))
2344 }
2345 }
2346
2347 #[inline]
2348 fn allowed_target_side_locked(
2349 &self,
2350 st: &RelayInner,
2351 exclude: RelaySideId,
2352 ty: crate::DataType,
2353 target_side: Option<RelaySideId>,
2354 ) -> Option<RelaySideId> {
2355 target_side.filter(|side| self.route_allowed_locked(st, Some(exclude), Some(ty), *side))
2356 }
2357
2358 fn filter_end_to_end_satisfied_sides_locked(
2359 &self,
2360 st: &RelayInner,
2361 data: &RelayItem,
2362 sides: Vec<RelaySideId>,
2363 eps: &[crate::DataEndpoint],
2364 ty: crate::DataType,
2365 ) -> TelemetryResult<Vec<RelaySideId>> {
2366 if !is_reliable_type(ty) || Self::reliable_control_target_packet_id(data)?.is_some() {
2367 return Ok(sides);
2368 }
2369 let packet_id = match data {
2370 RelayItem::Packet(pkt) => pkt.packet_id(),
2371 RelayItem::Packed(bytes) => match wire_format::packet_id_from_wire(bytes.as_ref()) {
2372 Ok(packet_id) => packet_id,
2373 Err(TelemetryError::Unpack("reliable control frame")) => return Ok(sides),
2374 Err(err) => return Err(err),
2375 },
2376 };
2377 let Some(acked) = st.end_to_end_acked_destinations.get(&packet_id) else {
2378 return Ok(sides);
2379 };
2380 let now_ms = self.clock.now_ms();
2381 let mut filtered = Vec::new();
2382 for side in sides {
2383 let Some(route) = st.discovery_routes.get(&side) else {
2384 filtered.push(side);
2385 continue;
2386 };
2387 let mut still_pending = false;
2388 let mut had_destination_board = false;
2389 for sender_state in route.announcers.values() {
2390 if now_ms.saturating_sub(sender_state.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2391 continue;
2392 }
2393 for board in sender_state.topology_boards.iter() {
2394 if !self.is_end_to_end_destination_sender(&board.sender_id) {
2395 continue;
2396 }
2397 had_destination_board = true;
2398 let sender_hash = Self::sender_hash(&board.sender_id);
2399 if acked.contains(&sender_hash) {
2400 continue;
2401 }
2402 if eps
2403 .iter()
2404 .copied()
2405 .any(|ep| board.reachable_endpoints.contains(&ep))
2406 {
2407 still_pending = true;
2408 break;
2409 }
2410 still_pending = true;
2413 break;
2414 }
2415 if still_pending {
2416 break;
2417 }
2418 }
2419 if still_pending || !had_destination_board {
2420 filtered.push(side);
2421 }
2422 }
2423 Ok(filtered)
2424 }
2425
/// Returns `true` when `side` has at least two announcers seen within
/// `DISCOVERY_ROUTE_TTL_MS` of `now_ms`; `false` when the side has no route.
#[cfg(feature = "discovery")]
fn side_has_multiple_announcers_locked(
    &self,
    st: &RelayInner,
    side: RelaySideId,
    now_ms: u64,
) -> bool {
    let Some(route) = st.discovery_routes.get(&side) else {
        return false;
    };
    // Stop as soon as a second fresh announcer is found.
    route
        .announcers
        .values()
        .filter(|announcer| {
            now_ms.saturating_sub(announcer.last_seen_ms) <= DISCOVERY_ROUTE_TTL_MS
        })
        .nth(1)
        .is_some()
}
2448
2449 #[cfg(not(feature = "discovery"))]
2450 fn side_has_multiple_announcers_locked(
2451 &self,
2452 _st: &RelayInner,
2453 _side: RelaySideId,
2454 _now_ms: u64,
2455 ) -> bool {
2456 false
2457 }
2458
/// Returns a mutable reference to the board for `sender_id` inside
/// `sender_state`, appending an empty board first when none exists yet.
#[cfg(feature = "discovery")]
fn sender_topology_board_mut<'a>(
    sender_state: &'a mut DiscoverySenderState,
    sender_id: &str,
) -> &'a mut TopologyBoardNode {
    let boards = &mut sender_state.topology_boards;
    let idx = match boards.iter().position(|board| board.sender_id == sender_id) {
        Some(found) => found,
        None => {
            boards.push(TopologyBoardNode {
                sender_id: sender_id.to_string(),
                reachable_endpoints: Vec::new(),
                reachable_timesync_sources: Vec::new(),
                connections: Vec::new(),
            });
            // The freshly pushed board is the last element.
            boards.len() - 1
        }
    };
    &mut boards[idx]
}
2482
/// Normalizes the announcer's topology boards, then stores the summary returned
/// by `discovery::summarize_topology_boards` in `reachable` and
/// `reachable_timesync_sources`.
#[cfg(feature = "discovery")]
fn refresh_sender_topology_state(sender_state: &mut DiscoverySenderState) {
    discovery::normalize_topology_boards(&mut sender_state.topology_boards);
    let summary = discovery::summarize_topology_boards(&sender_state.topology_boards);
    sender_state.reachable = summary.0;
    sender_state.reachable_timesync_sources = summary.1;
}
2491
/// Rebuilds the per-side summary from the side's announcers.
///
/// `reachable` and `reachable_timesync_sources` become the sorted, de-duplicated
/// union over all announcers. `last_seen_ms` becomes the newest announcer
/// timestamp, or `0` when there are no announcers.
#[cfg(feature = "discovery")]
fn recompute_discovery_side_state(route: &mut DiscoverySideState) {
    let mut endpoints: Vec<_> = route
        .announcers
        .values()
        .flat_map(|announcer| announcer.reachable.iter().copied())
        .collect();
    endpoints.sort_unstable();
    endpoints.dedup();

    let mut sources: Vec<_> = route
        .announcers
        .values()
        .flat_map(|announcer| announcer.reachable_timesync_sources.iter().cloned())
        .collect();
    sources.sort_unstable();
    sources.dedup();

    let newest_ms = route
        .announcers
        .values()
        .map(|announcer| announcer.last_seen_ms)
        .max()
        .unwrap_or(0);

    route.reachable = endpoints;
    route.reachable_timesync_sources = sources;
    route.last_seen_ms = newest_ms;
}
2510
/// Builds the topology board describing this relay itself.
///
/// `connections` holds the sorted, de-duplicated ids of every announcer that is
/// still within `DISCOVERY_ROUTE_TTL_MS` on a route that is itself within the
/// TTL. The endpoint and timesync-source lists are left empty.
#[cfg(feature = "discovery")]
fn local_discovery_topology_board(&self, st: &RelayInner, now_ms: u64) -> TopologyBoardNode {
    let is_fresh =
        |last_seen_ms: u64| now_ms.saturating_sub(last_seen_ms) <= DISCOVERY_ROUTE_TTL_MS;

    let mut connections: Vec<_> = st
        .discovery_routes
        .values()
        .filter(|route| is_fresh(route.last_seen_ms))
        .flat_map(|route| route.announcers.iter())
        .filter(|(_, announcer)| is_fresh(announcer.last_seen_ms))
        .map(|(announcer_id, _)| announcer_id.clone())
        .collect();
    connections.sort_unstable();
    connections.dedup();

    TopologyBoardNode {
        sender_id: self.sender_arc().to_string(),
        reachable_endpoints: Vec::new(),
        reachable_timesync_sources: Vec::new(),
        connections,
    }
}
2534
/// Assembles the topology boards to advertise on a link.
///
/// The result starts with this relay's own board and merges in the boards of
/// every non-expired announcer on every non-expired route:
/// * an announcer without boards gets a synthetic board built from its summary,
///   connected to this relay;
/// * otherwise, if the announcer has a board under its own id, this relay is
///   added to that board's connections.
///
/// When `link_local_enabled` is `false`, link-local-only endpoints are stripped
/// from the announcer boards before merging. The merged list is normalized
/// before it is returned.
#[cfg(feature = "discovery")]
fn advertised_discovery_topology_for_link_locked(
    &self,
    st: &RelayInner,
    now_ms: u64,
    link_local_enabled: bool,
) -> Vec<TopologyBoardNode> {
    let is_stale =
        |last_seen_ms: u64| now_ms.saturating_sub(last_seen_ms) > DISCOVERY_ROUTE_TTL_MS;

    let mut boards = vec![self.local_discovery_topology_board(st, now_ms)];
    let fresh_routes = st
        .discovery_routes
        .values()
        .filter(|route| !is_stale(route.last_seen_ms));
    for route in fresh_routes {
        for (announcer, sender_state) in route.announcers.iter() {
            if is_stale(sender_state.last_seen_ms) {
                continue;
            }
            let mut sender_boards = sender_state.topology_boards.clone();
            if sender_boards.is_empty() {
                sender_boards.push(TopologyBoardNode {
                    sender_id: announcer.clone(),
                    reachable_endpoints: sender_state.reachable.clone(),
                    reachable_timesync_sources: sender_state.reachable_timesync_sources.clone(),
                    connections: vec![self.sender_arc().to_string()],
                });
            } else if let Some(own_board) = sender_boards
                .iter_mut()
                .find(|board| board.sender_id == *announcer)
            {
                own_board.connections.push(self.sender_arc().to_string());
            }
            if !link_local_enabled {
                sender_boards.iter_mut().for_each(|board| {
                    board
                        .reachable_endpoints
                        .retain(|ep| !ep.is_link_local_only());
                });
            }
            discovery::merge_topology_boards(&mut boards, &sender_boards);
        }
    }
    discovery::normalize_topology_boards(&mut boards);
    boards
}
2579
/// Forwards a topology change at `now_ms` to the discovery cadence state.
#[cfg(feature = "discovery")]
fn note_discovery_topology_change_locked(st: &mut RelayInner, now_ms: u64) {
    let cadence = &mut st.discovery_cadence;
    cadence.on_topology_change(now_ms);
}
2584
/// Removes announcers not seen within `DISCOVERY_ROUTE_TTL_MS`, recomputes each
/// side summary, and drops sides left without announcers.
///
/// Returns `true` when the route table changed in any way: an announcer or side
/// was removed, or a recomputed summary differs from the stored one.
///
/// Change detection is done in place. The earlier implementation deep-cloned
/// the whole route table on every call just to compare it afterwards.
#[cfg(feature = "discovery")]
fn prune_discovery_routes_locked(st: &mut RelayInner, now_ms: u64) -> bool {
    let mut changed = false;
    st.discovery_routes.retain(|_, route| {
        let announcers_before = route.announcers.len();
        route.announcers.retain(|_, sender| {
            now_ms.saturating_sub(sender.last_seen_ms) <= DISCOVERY_ROUTE_TTL_MS
        });

        // `recompute_discovery_side_state` rebuilds these three fields purely
        // from the announcers, so the old values can be moved out for comparison.
        let prev_reachable = core::mem::take(&mut route.reachable);
        let prev_sources = core::mem::take(&mut route.reachable_timesync_sources);
        let prev_last_seen_ms = route.last_seen_ms;
        Self::recompute_discovery_side_state(route);

        let keep = !route.announcers.is_empty();
        changed |= !keep
            || route.announcers.len() != announcers_before
            || route.reachable != prev_reachable
            || route.reachable_timesync_sources != prev_sources
            || route.last_seen_ms != prev_last_seen_ms;
        keep
    });
    changed
}
2597
/// Forgets acknowledgements from senders that no longer appear as end-to-end
/// destination boards in any discovery route.
///
/// Packet entries whose acknowledgement set becomes empty are removed.
#[cfg(feature = "discovery")]
fn reconcile_end_to_end_acked_destinations_locked(&self, st: &mut RelayInner) {
    let active_senders: BTreeSet<_> = st
        .discovery_routes
        .values()
        .flat_map(|route| route.announcers.values())
        .flat_map(|announcer| announcer.topology_boards.iter())
        .filter(|board| self.is_end_to_end_destination_sender(&board.sender_id))
        .map(|board| Self::sender_hash(&board.sender_id))
        .collect();

    st.end_to_end_acked_destinations.retain(|_, acked| {
        acked.retain(|sender_hash| active_senders.contains(sender_hash));
        !acked.is_empty()
    });
}
2615
/// Lists the endpoints to advertise on a link.
///
/// Takes the endpoint summary of the advertised topology and removes discovery
/// endpoints, plus link-local-only endpoints when `link_local_enabled` is
/// `false`.
#[cfg(feature = "discovery")]
fn advertised_discovery_endpoints_for_link_locked(
    &self,
    st: &RelayInner,
    now_ms: u64,
    link_local_enabled: bool,
) -> Vec<crate::DataEndpoint> {
    let topology =
        self.advertised_discovery_topology_for_link_locked(st, now_ms, link_local_enabled);
    let (mut endpoints, _) = discovery::summarize_topology_boards(&topology);
    endpoints.retain(|ep| {
        if discovery::is_discovery_endpoint(*ep) {
            return false;
        }
        link_local_enabled || !ep.is_link_local_only()
    });
    endpoints
}
2634
/// Lists the timesync sources to advertise: the source summary of the
/// advertised topology, always computed with link-local endpoints enabled.
#[cfg(feature = "discovery")]
fn advertised_discovery_timesync_sources_for_link_locked(
    &self,
    st: &RelayInner,
    now_ms: u64,
) -> Vec<String> {
    let topology = self.advertised_discovery_topology_for_link_locked(st, now_ms, true);
    discovery::summarize_topology_boards(&topology).1
}
2646
/// Returns the sender of a `TimeSyncAnnounce` / `TimeSyncResponse` item so
/// routing can prefer sides that advertise it as a timesync source.
///
/// Yields `Ok(None)` for every other data type and for packed frames whose
/// frame info reports ack-only.
///
/// # Errors
/// Propagates the error from unpacking a packed frame.
#[cfg(feature = "discovery")]
#[cfg(feature = "timesync")]
fn preferred_timesync_route_source(
    &self,
    data: &RelayItem,
    ty: crate::DataType,
) -> TelemetryResult<Option<String>> {
    match ty {
        crate::DataType::TimeSyncAnnounce | crate::DataType::TimeSyncResponse => {}
        _ => return Ok(None),
    }

    match data {
        RelayItem::Packet(pkt) => Ok(Some(pkt.sender().to_owned())),
        RelayItem::Packed(bytes) => {
            let ack_only = wire_format::peek_frame_info(bytes.as_ref())
                .ok()
                .is_some_and(|frame| frame.ack_only());
            if ack_only {
                return Ok(None);
            }
            let unpacked = wire_format::unpack_packet(bytes.as_ref())?;
            Ok(Some(unpacked.sender().to_owned()))
        }
    }
}
2677
/// Classifies `side_id` as a slow control link.
///
/// A side is slow when it has adaptive stats with at least one sample and
/// either its non-zero bandwidth estimate is at or below
/// `CONTROL_SLOW_LINK_CAPACITY_BPS`, or a slow observation was recorded within
/// the last `DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS`.
#[cfg(feature = "discovery")]
#[inline]
fn side_is_slow_control_link_locked(
    st: &RelayInner,
    side_id: RelaySideId,
    now_ms: u64,
) -> bool {
    let Some(stats) = st.adaptive_route_stats.get(&side_id) else {
        return false;
    };
    let recently_slow = stats.last_slow_observed_ms > 0
        && now_ms.saturating_sub(stats.last_slow_observed_ms)
            <= DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS;
    let low_bandwidth = stats.estimated_bandwidth_bps > 0
        && stats.estimated_bandwidth_bps <= CONTROL_SLOW_LINK_CAPACITY_BPS;
    stats.sample_count > 0 && (low_bandwidth || recently_slow)
}
2695
/// Picks how much discovery data to send to `side_id` right now.
///
/// A side that is not a slow control link always gets `Full`, and its throttle
/// entry is cleared. A slow side gets `Full` once `next_full_ms` is reached,
/// `MinimalPing` once `next_ping_ms` is reached, and `None` otherwise. Whenever
/// something is sent the ping deadline is pushed out; a `Full` also pushes out
/// the full deadline.
#[cfg(feature = "discovery")]
fn discovery_level_for_side_locked(
    st: &mut RelayInner,
    side_id: RelaySideId,
    now_ms: u64,
) -> Option<DiscoveryAdvertiseLevel> {
    if !Self::side_is_slow_control_link_locked(st, side_id, now_ms) {
        st.discovery_side_throttle.remove(&side_id);
        return Some(DiscoveryAdvertiseLevel::Full);
    }

    let throttle = st.discovery_side_throttle.entry(side_id).or_default();
    let level = if now_ms >= throttle.next_full_ms {
        throttle.next_full_ms = now_ms.saturating_add(DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS);
        Some(DiscoveryAdvertiseLevel::Full)
    } else if now_ms >= throttle.next_ping_ms {
        Some(DiscoveryAdvertiseLevel::MinimalPing)
    } else {
        None
    };
    if level.is_some() {
        throttle.next_ping_ms = now_ms.saturating_add(DISCOVERY_SLOW_LINK_PING_INTERVAL_MS);
    }
    level
}
2719
/// Returns `true` for the three timesync data types: announce, request and
/// response.
#[cfg(all(feature = "discovery", feature = "timesync"))]
#[inline]
fn is_timesync_type(ty: crate::DataType) -> bool {
    use crate::DataType as Ty;
    matches!(
        ty,
        Ty::TimeSyncAnnounce | Ty::TimeSyncRequest | Ty::TimeSyncResponse
    )
}
2730
/// Decides whether an item of type `ty` may be sent to `side_id` now.
///
/// Non-timesync types are always allowed. Timesync types are always allowed on
/// sides that are not slow control links (clearing any throttle entry). On slow
/// sides they are allowed once `next_allowed_ms` is reached, which then moves
/// forward by `TIMESYNC_SLOW_LINK_MIN_INTERVAL_MS`.
#[cfg(all(feature = "discovery", feature = "timesync"))]
fn timesync_allowed_for_side_locked(
    st: &mut RelayInner,
    side_id: RelaySideId,
    ty: crate::DataType,
    now_ms: u64,
) -> bool {
    if !Self::is_timesync_type(ty) {
        return true;
    }
    if !Self::side_is_slow_control_link_locked(st, side_id, now_ms) {
        st.timesync_side_throttle.remove(&side_id);
        return true;
    }

    let throttle = st.timesync_side_throttle.entry(side_id).or_default();
    let allowed = now_ms >= throttle.next_allowed_ms;
    if allowed {
        throttle.next_allowed_ms = now_ms.saturating_add(TIMESYNC_SLOW_LINK_MIN_INTERVAL_MS);
    }
    allowed
}
2753
/// Keeps only the sides for which `timesync_allowed_for_side_locked` returns
/// `true`, preserving order.
///
/// The check runs once per side, in order, and may update that side's timesync
/// throttle state.
#[cfg(all(feature = "discovery", feature = "timesync"))]
fn filter_timesync_sides_locked(
    st: &mut RelayInner,
    ty: crate::DataType,
    now_ms: u64,
    mut sides: Vec<RelaySideId>,
) -> Vec<RelaySideId> {
    sides.retain(|side| Self::timesync_allowed_for_side_locked(st, *side, ty, now_ms));
    sides
}
2766
/// Queues discovery packets for every side that may receive
/// `DiscoveryAnnounce` traffic.
///
/// The work happens in two lock scopes:
/// 1. Prune expired routes, fit the discovery budget, record the announce with
///    the cadence state, and decide per side what to advertise.
/// 2. Build the packets and push them onto the transmit queue.
///
/// A `Full` advertisement sends, in order: schema, link capabilities, the
/// endpoint announce (only when endpoints exist), timesync sources (when
/// non-empty) and topology (when non-empty). A `MinimalPing` sends a single
/// announce with no endpoints. Returns early without queuing when no side is
/// registered.
///
/// # Errors
/// Propagates packet-building, priority and queue errors. Packets pushed before
/// the failure stay queued.
#[cfg(feature = "discovery")]
fn queue_discovery_announce(&self) -> TelemetryResult<()> {
    let now_ms = self.clock.now_ms();

    let plans = {
        let mut st = self.state.lock();
        if Self::prune_discovery_routes_locked(&mut st, now_ms) {
            self.reconcile_end_to_end_acked_destinations_locked(&mut st);
            Self::note_discovery_topology_change_locked(&mut st, now_ms);
        }
        st.fit_discovery_budget();
        if st.sides.iter().all(|slot| slot.is_none()) {
            return Ok(());
        }
        st.discovery_cadence.on_announce_sent(now_ms);

        // Copy out what is needed so the state can be mutated while iterating.
        let registered: Vec<_> = st
            .sides
            .iter()
            .enumerate()
            .filter_map(|(side_id, slot)| slot.as_ref().map(|side| (side_id, side.opts)))
            .collect();

        let mut plans = Vec::new();
        for (side_id, opts) in registered {
            let announce_allowed = self.route_allowed_locked(
                &st,
                None,
                Some(crate::DataType::DiscoveryAnnounce),
                side_id,
            );
            if !announce_allowed {
                continue;
            }
            let Some(level) = Self::discovery_level_for_side_locked(&mut st, side_id, now_ms)
            else {
                continue;
            };
            let capabilities = opts.link_capabilities();
            let (endpoints, timesync_sources, topology) =
                if level == DiscoveryAdvertiseLevel::MinimalPing {
                    (Vec::new(), Vec::new(), Vec::new())
                } else {
                    (
                        self.advertised_discovery_endpoints_for_link_locked(
                            &st,
                            now_ms,
                            opts.link_local_enabled,
                        ),
                        self.advertised_discovery_timesync_sources_for_link_locked(&st, now_ms),
                        self.advertised_discovery_topology_for_link_locked(
                            &st,
                            now_ms,
                            opts.link_local_enabled,
                        ),
                    )
                };
            plans.push((
                side_id,
                level,
                endpoints,
                timesync_sources,
                topology,
                capabilities,
            ));
        }
        plans
    };

    let mut st = self.state.lock();
    // Wraps a built packet, looks up its priority and pushes it for `dst`.
    let mut enqueue = |dst, pkt| -> TelemetryResult<()> {
        let data = RelayItem::Packet(Arc::new(pkt));
        let priority = Self::relay_item_priority(&data)?;
        st.push_tx(RelayTxItem {
            src: None,
            dst,
            data,
            priority,
        })?;
        Ok(())
    };

    for (dst, level, endpoints, timesync_sources, topology, capabilities) in plans {
        let sender = self.sender_arc();
        let full = level == DiscoveryAdvertiseLevel::Full;

        if full {
            enqueue(
                dst,
                discovery::build_discovery_schema(sender.as_ref(), now_ms)?,
            )?;
            enqueue(
                dst,
                discovery::build_discovery_link_capabilities(
                    sender.as_ref(),
                    now_ms,
                    capabilities,
                )?,
            )?;
        }
        if level == DiscoveryAdvertiseLevel::MinimalPing || !endpoints.is_empty() {
            enqueue(
                dst,
                discovery::build_discovery_announce(
                    sender.as_ref(),
                    now_ms,
                    endpoints.as_slice(),
                )?,
            )?;
        }
        if full && !timesync_sources.is_empty() {
            enqueue(
                dst,
                discovery::build_discovery_timesync_sources(
                    sender.as_ref(),
                    now_ms,
                    timesync_sources.as_slice(),
                )?,
            )?;
        }
        if full && !topology.is_empty() {
            enqueue(
                dst,
                discovery::build_discovery_topology(sender.as_ref(), now_ms, &topology)?,
            )?;
        }
    }
    Ok(())
}
2912
/// Queues a discovery announce when the cadence says one is due.
///
/// Expired routes are pruned and the discovery budget fitted first. Returns
/// `Ok(false)` when no registered side may receive `DiscoveryAnnounce` traffic
/// or when nothing is due yet, and `Ok(true)` after an announce was queued.
///
/// # Errors
/// Propagates errors from `queue_discovery_announce`.
#[cfg(feature = "discovery")]
fn poll_discovery_announce(&self) -> TelemetryResult<bool> {
    let now_ms = self.clock.now_ms();
    let due = {
        let mut st = self.state.lock();
        if Self::prune_discovery_routes_locked(&mut st, now_ms) {
            self.reconcile_end_to_end_acked_destinations_locked(&mut st);
            Self::note_discovery_topology_change_locked(&mut st, now_ms);
        }
        st.fit_discovery_budget();

        // At least one registered side must accept announce traffic.
        let can_announce = st.sides.iter().enumerate().any(|(side_id, slot)| {
            slot.is_some()
                && self.route_allowed_locked(
                    &st,
                    None,
                    Some(crate::DataType::DiscoveryAnnounce),
                    side_id,
                )
        });
        if !can_announce {
            return Ok(false);
        }
        st.discovery_cadence.due(now_ms)
    };
    if due {
        self.queue_discovery_announce()?;
        Ok(true)
    } else {
        Ok(false)
    }
}
2950
/// Updates discovery state from an item received on side `src`.
///
/// Non-discovery items and ack-only packed frames are ignored. Otherwise:
/// * `DiscoverySchema` is merged through the config schema helpers, after room
///   for it has been made in the shared queue budget.
/// * `DiscoveryLinkCapabilities` is decoded for validation only.
/// * `DiscoveryLeave` removes the leaving sender as an announcer, as a board and
///   as a connection on every side.
/// * Address, announce, timesync-source and topology packets update the
///   announcer's boards on `src`; link-local-only endpoints are dropped when the
///   side has link-local disabled.
///
/// A topology change is recorded whenever the stored data actually changed.
///
/// # Errors
/// Propagates wire-format, decode, queue-budget and schema-merge errors.
#[cfg(feature = "discovery")]
fn learn_discovery_item(&self, src: RelaySideId, data: &RelayItem) -> TelemetryResult<()> {
    let pkt = match data {
        RelayItem::Packet(pkt) => {
            if !discovery::is_discovery_type(pkt.data_type()) {
                return Ok(());
            }
            pkt.as_ref().clone()
        }
        RelayItem::Packed(bytes) => {
            let env = wire_format::peek_envelope(bytes.as_ref())?;
            if !discovery::is_discovery_type(env.ty) {
                return Ok(());
            }
            let ack_only = wire_format::peek_frame_info(bytes.as_ref())
                .ok()
                .is_some_and(|frame| frame.ack_only());
            if ack_only {
                return Ok(());
            }
            wire_format::unpack_packet(bytes.as_ref())?
        }
    };

    let now_ms = self.clock.now_ms();
    let kind = pkt.data_type();

    // Schema and link-capability packets never touch the route table.
    match kind {
        crate::DataType::DiscoverySchema => {
            let snapshot = discovery::decode_discovery_schema(&pkt)?;
            let incoming_cost = crate::config::owned_schema_byte_cost(&snapshot);
            let mut st = self.state.lock();
            st.make_shared_queue_room(incoming_cost, RelayQueueKind::Discovery)?;
            let budget = st.memory.max_queue_budget;
            // The state lock is released before the schema merge runs.
            drop(st);
            let report = crate::config::merge_owned_schema_snapshot_with_budget(snapshot, budget)?;
            if report.changed() {
                let mut st = self.state.lock();
                st.fit_discovery_budget();
                Self::note_discovery_topology_change_locked(&mut st, now_ms);
            }
            return Ok(());
        }
        crate::DataType::DiscoveryLinkCapabilities => {
            let _ = discovery::decode_discovery_link_capabilities(&pkt)?;
            return Ok(());
        }
        _ => {}
    }

    let mut st = self.state.lock();

    if matches!(kind, crate::DataType::DiscoveryLeave) {
        let leaving = pkt.sender();
        let before = st.discovery_routes.clone();
        for route in st.discovery_routes.values_mut() {
            route.announcers.remove(leaving);
            for sender_state in route.announcers.values_mut() {
                sender_state
                    .topology_boards
                    .retain(|board| board.sender_id != leaving);
                for board in sender_state.topology_boards.iter_mut() {
                    board.connections.retain(|peer| peer != leaving);
                }
                Self::refresh_sender_topology_state(sender_state);
            }
            Self::recompute_discovery_side_state(route);
        }
        st.discovery_routes
            .retain(|_, route| !route.announcers.is_empty());
        if st.discovery_routes != before {
            Self::note_discovery_topology_change_locked(&mut st, now_ms);
        }
        let _ = Self::prune_discovery_routes_locked(&mut st, now_ms);
        self.reconcile_end_to_end_acked_destinations_locked(&mut st);
        return Ok(());
    }

    // Address packets are keyed by the advertised hostname, everything else by
    // the packet sender.
    let address_ad = match kind {
        crate::DataType::DiscoveryAddress => Some(discovery::decode_discovery_address(&pkt)?),
        _ => None,
    };
    let announcer_id = match &address_ad {
        Some(ad) => ad.hostname.clone(),
        None => pkt.sender().to_string(),
    };

    // Work on copies and write them back once the update is complete.
    let mut route = st.discovery_routes.get(&src).cloned().unwrap_or_default();
    let side_link_local_enabled = st
        .sides
        .get(src)
        .and_then(|entry| entry.as_ref())
        .map(|side_ref| side_ref.opts.link_local_enabled)
        .unwrap_or(false);
    let mut sender_state = route
        .announcers
        .get(&announcer_id)
        .cloned()
        .unwrap_or_default();

    let changed = match kind {
        crate::DataType::DiscoveryAddress => {
            let ad = address_ad.expect("decoded above");
            let mut reachable = ad.reachable_endpoints;
            if !side_link_local_enabled {
                reachable.retain(|ep| !ep.is_link_local_only());
            }
            let board = Self::sender_topology_board_mut(&mut sender_state, &ad.hostname);
            let differs = board.reachable_endpoints != reachable
                || board.reachable_timesync_sources != ad.reachable_timesync_sources;
            board.reachable_endpoints = reachable;
            board.reachable_timesync_sources = ad.reachable_timesync_sources;
            Self::refresh_sender_topology_state(&mut sender_state);
            differs
        }
        crate::DataType::DiscoveryAnnounce => {
            let mut reachable = discovery::decode_discovery_announce(&pkt)?;
            if !side_link_local_enabled {
                reachable.retain(|ep| !ep.is_link_local_only());
            }
            let board = Self::sender_topology_board_mut(&mut sender_state, pkt.sender());
            let differs = board.reachable_endpoints != reachable;
            board.reachable_endpoints = reachable;
            Self::refresh_sender_topology_state(&mut sender_state);
            differs
        }
        crate::DataType::DiscoveryTimeSyncSources => {
            let sources = discovery::decode_discovery_timesync_sources(&pkt)?;
            let board = Self::sender_topology_board_mut(&mut sender_state, pkt.sender());
            let differs = board.reachable_timesync_sources != sources;
            board.reachable_timesync_sources = sources;
            Self::refresh_sender_topology_state(&mut sender_state);
            differs
        }
        crate::DataType::DiscoveryTopology => {
            let mut boards = discovery::decode_discovery_topology(&pkt)?;
            if !side_link_local_enabled {
                for board in boards.iter_mut() {
                    board
                        .reachable_endpoints
                        .retain(|ep| !ep.is_link_local_only());
                }
            }
            let differs = sender_state.topology_boards != boards;
            sender_state.topology_boards = boards;
            Self::refresh_sender_topology_state(&mut sender_state);
            differs
        }
        // Any other discovery type only refreshes the announcer's timestamp.
        _ => false,
    };

    sender_state.last_seen_ms = now_ms;
    route.announcers.insert(announcer_id, sender_state);
    Self::recompute_discovery_side_state(&mut route);
    st.discovery_routes.insert(src, route);
    st.fit_discovery_budget();
    if changed {
        Self::note_discovery_topology_change_locked(&mut st, now_ms);
    }
    let _ = Self::prune_discovery_routes_locked(&mut st, now_ms);
    self.reconcile_end_to_end_acked_destinations_locked(&mut st);
    Ok(())
}
3105
3106 #[cfg(not(feature = "discovery"))]
3107 fn learn_discovery_item(&self, _src: RelaySideId, _data: &RelayItem) -> TelemetryResult<()> {
3108 Ok(())
3109 }
3110
3111 #[cfg(not(feature = "discovery"))]
3112 fn queue_discovery_announce(&self) -> TelemetryResult<()> {
3113 Ok(())
3114 }
3115
3116 #[cfg(not(feature = "discovery"))]
3117 fn poll_discovery_announce(&self) -> TelemetryResult<bool> {
3118 Ok(false)
3119 }
3120
3121 fn process_reliable_timeouts(&self) -> TelemetryResult<()> {
3122 let now = self.clock.now_ms();
3123 let mut requeue: Vec<(RelaySideId, crate::DataType, u32)> = Vec::new();
3124
3125 {
3126 let mut st = self.state.lock();
3127 if st.reliable_tx.is_empty() {
3128 return Ok(());
3129 }
3130
3131 for ((side, ty_u32), tx_state) in st.reliable_tx.iter_mut() {
3132 let Some(ty) = crate::DataType::try_from_u32(*ty_u32) else {
3133 continue;
3134 };
3135 let sent_order: Vec<u32> = tx_state.sent_order.iter().copied().collect();
3136 for seq in sent_order {
3137 let Some(sent) = tx_state.sent.get_mut(&seq) else {
3138 continue;
3139 };
3140 if sent.queued
3141 || now.wrapping_sub(sent.last_send_ms) < runtime_reliable_retransmit_ms()
3142 {
3143 continue;
3144 }
3145 if sent.partial_acked {
3146 continue;
3147 }
3148 if sent.retries >= runtime_reliable_max_retries() {
3149 tx_state.sent.remove(&seq);
3150 tx_state.sent_order.retain(|existing| *existing != seq);
3151 continue;
3152 }
3153 sent.retries += 1;
3154 requeue.push((*side, ty, seq));
3155 }
3156 }
3157 }
3158
3159 for (side, ty, seq) in requeue {
3160 self.queue_reliable_retransmit(side, ty, seq)?;
3161 }
3162
3163 Ok(())
3164 }
3165
3166 fn get_hash(item: &RelayRxItem) -> u64 {
3170 match &item.data {
3171 RelayItem::Packet(pkt) => pkt.packet_id(),
3172 RelayItem::Packed(bytes) => {
3173 let reliable_seq = wire_format::peek_frame_info(bytes.as_ref())
3174 .ok()
3175 .and_then(|frame| frame.reliable)
3176 .and_then(|hdr| {
3177 if (hdr.flags & wire_format::RELIABLE_FLAG_ACK_ONLY) != 0 {
3178 None
3179 } else {
3180 Some(hdr.seq)
3181 }
3182 });
3183
3184 match wire_format::packet_id_from_wire(bytes.as_ref()) {
3185 Ok(id) => {
3186 if let Some(seq) = reliable_seq {
3187 hash_bytes_u64(id, &seq.to_le_bytes())
3188 } else {
3189 id
3190 }
3191 }
3192 Err(_e) => {
3193 let h: u64 = 0x9E37_79B9_7F4A_7C15;
3196 hash_bytes_u64(h, bytes.as_ref())
3197 }
3198 }
3199 }
3200 }
3201 }
3202
3203 fn is_duplicate_pkt(&self, item: &RelayRxItem) -> TelemetryResult<bool> {
3207 let id = Self::get_hash(item);
3208
3209 let mut st = self.state.lock();
3210 if st.recent_rx.contains(&id) {
3211 Ok(true)
3212 } else {
3213 st.push_recent_rx(id)?;
3214 Ok(false)
3215 }
3216 }
3217
3218 fn should_forward_duplicate_reliable_item(&self, item: &RelayRxItem) -> TelemetryResult<bool> {
3219 let (_, ty) = self.item_route_info(&item.data)?;
3220 if !is_reliable_type(ty)
3221 || matches!(
3222 ty,
3223 crate::DataType::ReliableAck
3224 | crate::DataType::ReliablePartialAck
3225 | crate::DataType::ReliablePacketRequest
3226 )
3227 {
3228 return Ok(false);
3229 }
3230
3231 let RemoteSidePlan::Target(sides) = self.remote_side_plan(&item.data, item.src)?;
3232 let st = self.state.lock();
3233 let now_ms = self.clock.now_ms();
3234 Ok(sides
3235 .into_iter()
3236 .any(|side| self.side_has_multiple_announcers_locked(&st, side, now_ms)))
3237 }
3238
3239 pub fn add_side_packed<F>(&self, name: &'static str, tx: F) -> RelaySideId
3244 where
3245 F: Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static,
3246 {
3247 self.add_side_packed_with_options(name, tx, RelaySideOptions::default())
3248 }
3249
3250 pub fn add_side_packed_small_packets<F>(
3254 &self,
3255 name: &'static str,
3256 tx: F,
3257 max_frame_bytes: usize,
3258 ) -> RelaySideId
3259 where
3260 F: Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static,
3261 {
3262 self.add_side_packed_with_options(
3263 name,
3264 tx,
3265 RelaySideOptions::default().with_small_packet_transport(max_frame_bytes),
3266 )
3267 }
3268
3269 pub fn add_side_packed_with_options<F>(
3275 &self,
3276 name: &'static str,
3277 tx: F,
3278 opts: RelaySideOptions,
3279 ) -> RelaySideId
3280 where
3281 F: Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static,
3282 {
3283 let mut st = self.state.lock();
3284 let id = st.sides.len();
3285 st.sides.push(Some(RelaySide {
3286 name,
3287 tx_handler: RelayTxHandlerFn::Packed(Arc::new(tx)),
3288 opts,
3289 }));
3290 st.side_runtime_stats
3291 .insert(id, SideRuntimeStatsInner::default());
3292 st.side_transport.insert(id, SideTransportState::default());
3293 #[cfg(feature = "discovery")]
3294 Self::note_discovery_topology_change_locked(&mut st, self.clock.now_ms());
3295 id
3296 }
3297
3298 pub fn add_side_packet<F>(&self, name: &'static str, tx: F) -> RelaySideId
3303 where
3304 F: Fn(&Packet) -> TelemetryResult<()> + Send + Sync + 'static,
3305 {
3306 self.add_side_packet_with_options(name, tx, RelaySideOptions::default())
3307 }
3308
3309 pub fn add_side_packet_with_options<F>(
3311 &self,
3312 name: &'static str,
3313 tx: F,
3314 opts: RelaySideOptions,
3315 ) -> RelaySideId
3316 where
3317 F: Fn(&Packet) -> TelemetryResult<()> + Send + Sync + 'static,
3318 {
3319 let mut st = self.state.lock();
3320 let id = st.sides.len();
3321 st.sides.push(Some(RelaySide {
3322 name,
3323 tx_handler: RelayTxHandlerFn::Packet(Arc::new(tx)),
3324 opts,
3325 }));
3326 st.side_runtime_stats
3327 .insert(id, SideRuntimeStatsInner::default());
3328 st.side_transport.insert(id, SideTransportState::default());
3329 #[cfg(feature = "discovery")]
3330 Self::note_discovery_topology_change_locked(&mut st, self.clock.now_ms());
3331 id
3332 }
3333
3334 pub fn remove_side(&self, side: RelaySideId) -> TelemetryResult<()> {
3339 let now_ms = self.clock.now_ms();
3340 let mut st = self.state.lock();
3341 let slot = st.sides.get_mut(side).ok_or(TelemetryError::BadArg)?;
3342 if slot.is_none() {
3343 return Err(TelemetryError::BadArg);
3344 }
3345 *slot = None;
3346 st.route_overrides
3347 .retain(|(src_side, dst_side), _| *src_side != Some(side) && *dst_side != side);
3348 st.typed_route_overrides
3349 .retain(|(src_side, _, dst_side), _| *src_side != Some(side) && *dst_side != side);
3350 st.route_weights
3351 .retain(|(src_side, dst_side), _| *src_side != Some(side) && *dst_side != side);
3352 st.route_priorities
3353 .retain(|(src_side, dst_side), _| *src_side != Some(side) && *dst_side != side);
3354 st.source_route_modes.remove(&Some(side));
3355 st.route_selection_cursors.remove(&Some(side));
3356 st.adaptive_route_stats.remove(&side);
3357 #[cfg(feature = "discovery")]
3358 st.discovery_side_throttle.remove(&side);
3359 #[cfg(all(feature = "discovery", feature = "timesync"))]
3360 st.timesync_side_throttle.remove(&side);
3361 st.side_runtime_stats.remove(&side);
3362 st.reliable_return_routes
3363 .retain(|_, route| route.side != side);
3364 st.rx_queue.retain(|queued| queued.src != side);
3365 st.tx_queue
3366 .retain(|queued| queued.dst != side && queued.src != Some(side));
3367 st.replay_queue.retain(|queued| queued.dst != side);
3368 st.reliable_tx.retain(|(side_id, _), _| *side_id != side);
3369 st.reliable_rx.retain(|(side_id, _), _| *side_id != side);
3370 #[cfg(feature = "discovery")]
3371 {
3372 st.discovery_routes.remove(&side);
3373 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3374 }
3375 Ok(())
3376 }
3377
3378 pub fn set_side_ingress_enabled(
3380 &self,
3381 side: RelaySideId,
3382 enabled: bool,
3383 ) -> TelemetryResult<()> {
3384 let now_ms = self.clock.now_ms();
3385 let mut st = self.state.lock();
3386 let side_ref = st
3387 .sides
3388 .get_mut(side)
3389 .and_then(|side| side.as_mut())
3390 .ok_or(TelemetryError::BadArg)?;
3391 side_ref.opts.ingress_enabled = enabled;
3392 #[cfg(feature = "discovery")]
3393 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3394 Ok(())
3395 }
3396
3397 pub fn set_side_egress_enabled(&self, side: RelaySideId, enabled: bool) -> TelemetryResult<()> {
3399 let now_ms = self.clock.now_ms();
3400 let mut st = self.state.lock();
3401 let side_ref = st
3402 .sides
3403 .get_mut(side)
3404 .and_then(|side| side.as_mut())
3405 .ok_or(TelemetryError::BadArg)?;
3406 side_ref.opts.egress_enabled = enabled;
3407 if !enabled {
3408 st.tx_queue.retain(|queued| queued.dst != side);
3409 st.replay_queue.retain(|queued| queued.dst != side);
3410 }
3411 #[cfg(feature = "discovery")]
3412 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3413 Ok(())
3414 }
3415
3416 pub fn set_source_route_mode(
3420 &self,
3421 src: Option<RelaySideId>,
3422 mode: RouteSelectionMode,
3423 ) -> TelemetryResult<()> {
3424 let now_ms = self.clock.now_ms();
3425 let mut st = self.state.lock();
3426 if let Some(src) = src {
3427 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3428 }
3429 if mode == RouteSelectionMode::Fanout {
3430 st.source_route_modes.remove(&src);
3431 } else {
3432 st.source_route_modes.insert(src, mode);
3433 }
3434 st.route_selection_cursors.remove(&src);
3435 #[cfg(feature = "discovery")]
3436 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3437 Ok(())
3438 }
3439
3440 pub fn clear_source_route_mode(&self, src: Option<RelaySideId>) -> TelemetryResult<()> {
3442 self.set_source_route_mode(src, RouteSelectionMode::Fanout)
3443 }
3444
3445 pub fn set_route_weight(
3447 &self,
3448 src: Option<RelaySideId>,
3449 dst: RelaySideId,
3450 weight: u32,
3451 ) -> TelemetryResult<()> {
3452 let now_ms = self.clock.now_ms();
3453 let mut st = self.state.lock();
3454 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3455 if let Some(src) = src {
3456 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3457 }
3458 st.route_weights.insert((src, dst), weight);
3459 st.route_selection_cursors.remove(&src);
3460 #[cfg(feature = "discovery")]
3461 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3462 Ok(())
3463 }
3464
3465 pub fn clear_route_weight(
3467 &self,
3468 src: Option<RelaySideId>,
3469 dst: RelaySideId,
3470 ) -> TelemetryResult<()> {
3471 let now_ms = self.clock.now_ms();
3472 let mut st = self.state.lock();
3473 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3474 if let Some(src) = src {
3475 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3476 }
3477 st.route_weights.remove(&(src, dst));
3478 st.route_selection_cursors.remove(&src);
3479 #[cfg(feature = "discovery")]
3480 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3481 Ok(())
3482 }
3483
3484 pub fn set_route_priority(
3486 &self,
3487 src: Option<RelaySideId>,
3488 dst: RelaySideId,
3489 priority: u32,
3490 ) -> TelemetryResult<()> {
3491 let now_ms = self.clock.now_ms();
3492 let mut st = self.state.lock();
3493 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3494 if let Some(src) = src {
3495 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3496 }
3497 st.route_priorities.insert((src, dst), priority);
3498 #[cfg(feature = "discovery")]
3499 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3500 Ok(())
3501 }
3502
3503 pub fn clear_route_priority(
3505 &self,
3506 src: Option<RelaySideId>,
3507 dst: RelaySideId,
3508 ) -> TelemetryResult<()> {
3509 let now_ms = self.clock.now_ms();
3510 let mut st = self.state.lock();
3511 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3512 if let Some(src) = src {
3513 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3514 }
3515 st.route_priorities.remove(&(src, dst));
3516 #[cfg(feature = "discovery")]
3517 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3518 Ok(())
3519 }
3520
3521 pub fn set_route(
3523 &self,
3524 src: Option<RelaySideId>,
3525 dst: RelaySideId,
3526 enabled: bool,
3527 ) -> TelemetryResult<()> {
3528 let now_ms = self.clock.now_ms();
3529 let mut st = self.state.lock();
3530 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3531 if let Some(src) = src {
3532 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3533 }
3534 st.route_overrides.insert((src, dst), enabled);
3535 #[cfg(feature = "discovery")]
3536 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3537 Ok(())
3538 }
3539
3540 pub fn set_typed_route(
3542 &self,
3543 src: Option<RelaySideId>,
3544 ty: crate::DataType,
3545 dst: RelaySideId,
3546 enabled: bool,
3547 ) -> TelemetryResult<()> {
3548 let now_ms = self.clock.now_ms();
3549 let mut st = self.state.lock();
3550 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3551 if let Some(src) = src {
3552 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3553 }
3554 st.typed_route_overrides
3555 .insert((src, ty.as_u32(), dst), enabled);
3556 #[cfg(feature = "discovery")]
3557 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3558 Ok(())
3559 }
3560
3561 pub fn clear_typed_route(
3563 &self,
3564 src: Option<RelaySideId>,
3565 ty: crate::DataType,
3566 dst: RelaySideId,
3567 ) -> TelemetryResult<()> {
3568 let now_ms = self.clock.now_ms();
3569 let mut st = self.state.lock();
3570 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3571 if let Some(src) = src {
3572 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3573 }
3574 st.typed_route_overrides.remove(&(src, ty.as_u32(), dst));
3575 #[cfg(feature = "discovery")]
3576 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3577 Ok(())
3578 }
3579
3580 pub fn clear_route(&self, src: Option<RelaySideId>, dst: RelaySideId) -> TelemetryResult<()> {
3582 let now_ms = self.clock.now_ms();
3583 let mut st = self.state.lock();
3584 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3585 if let Some(src) = src {
3586 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3587 }
3588 st.route_overrides.remove(&(src, dst));
3589 #[cfg(feature = "discovery")]
3590 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3591 Ok(())
3592 }
3593
/// Public entry point for requesting a discovery announce; delegates to
/// `queue_discovery_announce` and returns its result.
#[cfg(feature = "discovery")]
pub fn announce_discovery(&self) -> TelemetryResult<()> {
    let queued = self.queue_discovery_announce();
    queued
}
3599
/// Builds a discovery-leave packet (sender `"relay"`, stamped with the
/// current clock time) and pushes one copy per registered side via `push_tx`.
///
/// Only available with the `discovery` feature: the `discovery` module this
/// method calls into is imported under that feature gate, so leaving the
/// method ungated would break builds that disable the feature.
///
/// # Errors
///
/// Propagates failures from building the packet, computing its priority, or
/// pushing it via `push_tx`.
#[cfg(feature = "discovery")]
pub fn announce_leave(&self) -> TelemetryResult<()> {
    let pkt = discovery::build_discovery_leave("relay", self.clock.now_ms())?;
    // One shared allocation; each destination only bumps the reference count.
    let shared = Arc::new(pkt);
    let mut st = self.state.lock();
    // Collect ids first so the side table is not borrowed while pushing.
    let dsts: Vec<usize> = st
        .sides
        .iter()
        .enumerate()
        .filter_map(|(idx, side)| side.as_ref().map(|_| idx))
        .collect();
    for dst in dsts {
        let data = RelayItem::Packet(Arc::clone(&shared));
        let priority = Self::relay_item_priority(&data)?;
        st.push_tx(RelayTxItem {
            src: None,
            dst,
            data,
            priority,
        })?;
    }
    Ok(())
}
3622
/// Public entry point for polling discovery; delegates to
/// `poll_discovery_announce` and returns its result.
#[cfg(feature = "discovery")]
pub fn poll_discovery(&self) -> TelemetryResult<bool> {
    let polled = self.poll_discovery_announce();
    polled
}
3628
/// Builds a snapshot of the discovery topology as currently known.
///
/// Discovery routes are pruned first; if pruning changed anything, end-to-end
/// acked destinations are reconciled and a topology change is noted. The
/// snapshot then lists, per still-registered side, the learned route and its
/// announcers (router-control endpoints are filtered out of the endpoint
/// lists), together with the advertised endpoints, timesync sources, routers,
/// links derived from those routers, and the current announce cadence.
#[cfg(feature = "discovery")]
pub fn export_topology(&self) -> TopologySnapshot {
    let now_ms = self.clock.now_ms();
    let mut state = self.state.lock();
    let routes_pruned = Self::prune_discovery_routes_locked(&mut state, now_ms);
    if routes_pruned {
        self.reconcile_end_to_end_acked_destinations_locked(&mut state);
        Self::note_discovery_topology_change_locked(&mut state, now_ms);
    }

    let routes = state
        .discovery_routes
        .iter()
        .filter_map(|(&side_id, route)| {
            // Routes whose side slot is missing or removed are not exported.
            let side = state.sides.get(side_id)?.as_ref()?;
            let announcers = route
                .announcers
                .iter()
                .map(|(announcer_id, announcer)| TopologyAnnouncerRoute {
                    sender_id: announcer_id.clone(),
                    reachable_endpoints: announcer
                        .reachable
                        .iter()
                        .filter(|ep| !discovery::is_router_control_endpoint(**ep))
                        .copied()
                        .collect(),
                    reachable_timesync_sources: announcer.reachable_timesync_sources.clone(),
                    routers: announcer.topology_boards.clone(),
                    last_seen_ms: announcer.last_seen_ms,
                    age_ms: now_ms.saturating_sub(announcer.last_seen_ms),
                })
                .collect();
            let reachable_endpoints = route
                .reachable
                .iter()
                .filter(|ep| !discovery::is_router_control_endpoint(**ep))
                .copied()
                .collect();
            Some(TopologySideRoute {
                side_id,
                side_name: side.name,
                reachable_endpoints,
                reachable_timesync_sources: route.reachable_timesync_sources.clone(),
                announcers,
                last_seen_ms: route.last_seen_ms,
                age_ms: now_ms.saturating_sub(route.last_seen_ms),
            })
        })
        .collect();

    let routers = self.advertised_discovery_topology_for_link_locked(&state, now_ms, true);
    let advertised_endpoints =
        self.advertised_discovery_endpoints_for_link_locked(&state, now_ms, true);
    let advertised_timesync_sources =
        self.advertised_discovery_timesync_sources_for_link_locked(&state, now_ms);
    let links = discovery::topology_links_from_boards(&routers);
    let cadence = &state.discovery_cadence;
    TopologySnapshot {
        advertised_endpoints,
        advertised_timesync_sources,
        routers,
        links,
        routes,
        current_announce_interval_ms: cadence.current_interval_ms,
        next_announce_ms: cadence.next_announce_ms,
    }
}
3692
/// Aggregates what the relay knows about the announcer `sender_id`.
///
/// Every discovery route that lists the sender contributes its side id and
/// name, the sender's reachable endpoints and timesync sources, and that
/// side's runtime packet/byte counters (these counters are per side, not per
/// sender). Returns `None` when no route lists the sender.
///
/// `connected` is true when the most recent sighting is no older than
/// `DISCOVERY_ROUTE_TTL_MS`. Router-control endpoints are filtered out, and
/// all lists are sorted and de-duplicated.
#[cfg(feature = "discovery")]
pub fn client_stats(&self, sender_id: &str) -> Option<ClientStatsSnapshot> {
    let now_ms = self.clock.now_ms();
    let state = self.state.lock();

    let mut side_ids = Vec::new();
    let mut side_names = Vec::new();
    let mut last_seen_ms: Option<u64> = None;
    let mut reachable_endpoints = Vec::new();
    let mut reachable_timesync_sources = Vec::new();
    let mut packets_sent = 0u64;
    let mut packets_received = 0u64;
    let mut bytes_sent = 0u64;
    let mut bytes_received = 0u64;

    for (side_id, route) in &state.discovery_routes {
        if let Some(announcer) = route.announcers.get(sender_id) {
            side_ids.push(*side_id);
            let registered_side = state.sides.get(*side_id).and_then(|slot| slot.as_ref());
            if let Some(side) = registered_side {
                side_names.push(side.name);
            }
            // Keep the most recent sighting across all sides.
            let seen = announcer.last_seen_ms;
            last_seen_ms = Some(last_seen_ms.map_or(seen, |prev| prev.max(seen)));
            reachable_endpoints.extend(announcer.reachable.iter().copied());
            reachable_timesync_sources
                .extend(announcer.reachable_timesync_sources.iter().cloned());
            if let Some(side_stats) = state.side_runtime_stats.get(side_id) {
                packets_sent = packets_sent.saturating_add(side_stats.tx_packets);
                packets_received = packets_received.saturating_add(side_stats.rx_packets);
                bytes_sent = bytes_sent.saturating_add(side_stats.tx_bytes);
                bytes_received = bytes_received.saturating_add(side_stats.rx_bytes);
            }
        }
    }

    if side_ids.is_empty() {
        return None;
    }

    reachable_endpoints.retain(|ep| !discovery::is_router_control_endpoint(*ep));
    reachable_endpoints.sort_unstable();
    reachable_endpoints.dedup();
    reachable_timesync_sources.sort_unstable();
    reachable_timesync_sources.dedup();
    side_ids.sort_unstable();
    side_ids.dedup();
    side_names.sort_unstable();
    side_names.dedup();

    let age_ms = last_seen_ms.map(|seen| now_ms.saturating_sub(seen));
    let connected = age_ms.is_some_and(|age| age <= DISCOVERY_ROUTE_TTL_MS);
    Some(ClientStatsSnapshot {
        sender_id: sender_id.to_string(),
        connected,
        side_ids,
        side_names,
        last_seen_ms,
        age_ms,
        reachable_endpoints,
        reachable_timesync_sources,
        packets_sent,
        packets_received,
        bytes_sent,
        bytes_received,
    })
}
3760
3761 pub fn export_runtime_stats(&self) -> RuntimeStatsSnapshot {
3762 let now_ms = self.clock.now_ms();
3763 let st = self.state.lock();
3764
3765 let mut sides = Vec::new();
3766 for (side_id, side) in st.sides.iter().enumerate() {
3767 let Some(side) = side.as_ref() else { continue };
3768 let stats = st
3769 .side_runtime_stats
3770 .get(&side_id)
3771 .cloned()
3772 .unwrap_or_default();
3773 let adaptive = st
3774 .adaptive_route_stats
3775 .get(&side_id)
3776 .cloned()
3777 .unwrap_or_default()
3778 .snapshot(now_ms, true);
3779 let (tx_template_count, rx_template_count) = st
3780 .side_transport
3781 .get(&side_id)
3782 .map(|state| (state.tx_template_count(), state.rx_template_count()))
3783 .unwrap_or((0, 0));
3784 let mut data_types: Vec<RuntimeTypeStats> = stats
3785 .data_types
3786 .into_iter()
3787 .map(|(ty, item)| RuntimeTypeStats {
3788 data_type: crate::DataType(ty),
3789 tx_packets: item.tx_packets,
3790 tx_bytes: item.tx_bytes,
3791 rx_packets: item.rx_packets,
3792 rx_bytes: item.rx_bytes,
3793 relayed_tx_packets: item.relayed_tx_packets,
3794 relayed_tx_bytes: item.relayed_tx_bytes,
3795 relayed_rx_packets: item.relayed_rx_packets,
3796 relayed_rx_bytes: item.relayed_rx_bytes,
3797 tx_retries: item.tx_retries,
3798 handler_failures: item.handler_failures,
3799 })
3800 .collect();
3801 data_types.sort_unstable_by_key(|item| item.data_type.as_u32());
3802 sides.push(RuntimeSideStats {
3803 side_id,
3804 side_name: side.name,
3805 reliable_enabled: side.opts.reliable_enabled,
3806 link_local_enabled: side.opts.link_local_enabled,
3807 header_template_enabled: side.opts.header_template_enabled,
3808 max_frame_bytes: side.opts.max_frame_bytes,
3809 compact_header_target_bytes: side.opts.compact_header_target_bytes,
3810 side_transport_profile: side.opts.effective_transport_profile().as_str(),
3811 ingress_enabled: side.opts.ingress_enabled,
3812 egress_enabled: side.opts.egress_enabled,
3813 tx_packets: stats.tx_packets,
3814 tx_bytes: stats.tx_bytes,
3815 rx_packets: stats.rx_packets,
3816 rx_bytes: stats.rx_bytes,
3817 relayed_tx_packets: stats.relayed_tx_packets,
3818 relayed_tx_bytes: stats.relayed_tx_bytes,
3819 relayed_rx_packets: stats.relayed_rx_packets,
3820 relayed_rx_bytes: stats.relayed_rx_bytes,
3821 local_delivery_packets: 0,
3822 tx_retries: stats.tx_retries,
3823 tx_handler_failures: stats.tx_handler_failures,
3824 local_handler_failures: 0,
3825 total_handler_retries: stats.total_handler_retries,
3826 side_transport_full_frames: stats.side_transport_full_frames,
3827 side_transport_compact_frames: stats.side_transport_compact_frames,
3828 side_transport_compact_delta_frames: stats.side_transport_compact_delta_frames,
3829 side_transport_compact_omitted_timestamp_frames: stats
3830 .side_transport_compact_omitted_timestamp_frames,
3831 side_transport_chunk_frames: stats.side_transport_chunk_frames,
3832 side_transport_raw_bytes: stats.side_transport_raw_bytes,
3833 side_transport_wire_bytes: stats.side_transport_wire_bytes,
3834 side_transport_bytes_saved: stats.side_transport_bytes_saved,
3835 side_transport_min_compact_overhead_bytes: stats
3836 .side_transport_min_compact_overhead_bytes,
3837 side_transport_max_compact_overhead_bytes: stats
3838 .side_transport_max_compact_overhead_bytes,
3839 side_transport_compact_target_misses: stats.side_transport_compact_target_misses,
3840 side_transport_template_evictions: stats.side_transport_template_evictions,
3841 side_transport_tx_template_count: tx_template_count,
3842 side_transport_rx_template_count: rx_template_count,
3843 max_side_transport_templates: side.opts.max_side_transport_templates,
3844 adaptive,
3845 data_types,
3846 });
3847 }
3848
3849 let mut route_modes: Vec<RouteModeStats> = st
3850 .route_selection_cursors
3851 .iter()
3852 .map(|(src, cursor)| RouteModeStats {
3853 src_side_id: *src,
3854 selection_mode: st.source_route_modes.get(src).copied(),
3855 cursor: *cursor,
3856 })
3857 .collect();
3858 for src in st.source_route_modes.keys() {
3859 if !route_modes.iter().any(|mode| mode.src_side_id == *src) {
3860 route_modes.push(RouteModeStats {
3861 src_side_id: *src,
3862 selection_mode: st.source_route_modes.get(src).copied(),
3863 cursor: 0,
3864 });
3865 }
3866 }
3867 route_modes.sort_unstable_by_key(|mode| mode.src_side_id.unwrap_or(usize::MAX));
3868
3869 let mut route_overrides: Vec<RouteOverrideStats> = st
3870 .route_overrides
3871 .iter()
3872 .map(|((src, dst), enabled)| RouteOverrideStats {
3873 src_side_id: *src,
3874 dst_side_id: *dst,
3875 enabled: *enabled,
3876 })
3877 .collect();
3878 route_overrides.sort_unstable_by_key(|item| {
3879 (item.src_side_id.unwrap_or(usize::MAX), item.dst_side_id)
3880 });
3881
3882 let mut typed_route_overrides: Vec<TypedRouteOverrideStats> = st
3883 .typed_route_overrides
3884 .iter()
3885 .map(|((src, ty, dst), enabled)| TypedRouteOverrideStats {
3886 src_side_id: *src,
3887 data_type: crate::DataType(*ty),
3888 dst_side_id: *dst,
3889 enabled: *enabled,
3890 })
3891 .collect();
3892 typed_route_overrides.sort_unstable_by_key(|item| {
3893 (
3894 item.src_side_id.unwrap_or(usize::MAX),
3895 item.data_type.as_u32(),
3896 item.dst_side_id,
3897 )
3898 });
3899
3900 let mut route_weights: Vec<RouteWeightStats> = st
3901 .route_weights
3902 .iter()
3903 .map(|((src, dst), weight)| RouteWeightStats {
3904 src_side_id: *src,
3905 dst_side_id: *dst,
3906 weight: *weight,
3907 })
3908 .collect();
3909 route_weights.sort_unstable_by_key(|item| {
3910 (item.src_side_id.unwrap_or(usize::MAX), item.dst_side_id)
3911 });
3912
3913 let mut route_priorities: Vec<RoutePriorityStats> = st
3914 .route_priorities
3915 .iter()
3916 .map(|((src, dst), priority)| RoutePriorityStats {
3917 src_side_id: *src,
3918 dst_side_id: *dst,
3919 priority: *priority,
3920 })
3921 .collect();
3922 route_priorities.sort_unstable_by_key(|item| {
3923 (item.src_side_id.unwrap_or(usize::MAX), item.dst_side_id)
3924 });
3925
3926 #[cfg(feature = "discovery")]
3927 let discovery = DiscoveryRuntimeStats {
3928 route_count: st.discovery_routes.len(),
3929 announcer_count: st
3930 .discovery_routes
3931 .values()
3932 .map(|route| route.announcers.len())
3933 .sum(),
3934 current_announce_interval_ms: Some(st.discovery_cadence.current_interval_ms),
3935 next_announce_ms: Some(st.discovery_cadence.next_announce_ms),
3936 };
3937 #[cfg(not(feature = "discovery"))]
3938 let discovery = DiscoveryRuntimeStats {
3939 route_count: 0,
3940 announcer_count: 0,
3941 current_announce_interval_ms: None,
3942 next_announce_ms: None,
3943 };
3944
3945 RuntimeStatsSnapshot {
3946 sides,
3947 route_modes,
3948 route_overrides,
3949 typed_route_overrides,
3950 route_weights,
3951 route_priorities,
3952 queues: QueueRuntimeStats {
3953 rx_len: st.rx_queue.len(),
3954 rx_bytes: st.rx_queue.bytes_used(),
3955 tx_len: st.tx_queue.len(),
3956 tx_bytes: st.tx_queue.bytes_used(),
3957 replay_len: st.replay_queue.len(),
3958 replay_bytes: st.replay_queue.bytes_used(),
3959 recent_rx_len: st.recent_rx.len(),
3960 recent_rx_bytes: st.recent_rx.bytes_used(),
3961 reliable_rx_buffered_len: st.reliable_rx_buffer_len(),
3962 reliable_rx_buffered_bytes: st.reliable_rx_buffered_bytes(),
3963 shared_queue_bytes_used: st.shared_queue_bytes_used(),
3964 },
3965 reliable: ReliableRuntimeStats {
3966 reliable_return_route_count: st.reliable_return_routes.len(),
3967 end_to_end_pending_count: 0,
3968 end_to_end_pending_destination_count: 0,
3969 end_to_end_acked_cache_count: st.end_to_end_acked_destinations.len(),
3970 },
3971 discovery,
3972 total_handler_failures: st.total_handler_failures,
3973 total_handler_retries: st.total_handler_retries,
3974 }
3975 }
3976
3977 pub fn export_memory_layout_json(&self) -> String {
3979 let st = self.state.lock();
3980 #[cfg(feature = "discovery")]
3981 let discovery_bytes = st.discovery_bytes_used();
3982 #[cfg(not(feature = "discovery"))]
3983 let discovery_bytes = 0usize;
3984 let schema_bytes = crate::config::schema_bytes_used();
3985 let memory = st.memory;
3986 let mut out = String::new();
3987 let _ = core::fmt::Write::write_fmt(
3988 &mut out,
3989 format_args!(
3990 "{{\"kind\":\"relay\",\
3991 \"shared_queue_bytes_used\":{},\"shared_queue_bytes_allocated\":{},\
3992 \"rx_queue_bytes_used\":{},\"rx_queue_bytes_allocated\":{},\"rx_queue_len\":{},\
3993 \"tx_queue_bytes_used\":{},\"tx_queue_bytes_allocated\":{},\"tx_queue_len\":{},\
3994 \"replay_queue_bytes_used\":{},\"replay_queue_bytes_allocated\":{},\"replay_queue_len\":{},\
3995 \"recent_rx_bytes_used\":{},\"recent_rx_bytes_allocated\":{},\"recent_rx_len\":{},\
3996 \"reliable_rx_buffer_bytes_used\":{},\"reliable_rx_buffer_bytes_allocated\":{},\"reliable_rx_buffer_len\":{},\
3997 \"discovery_bytes_used\":{},\"discovery_bytes_allocated\":{},\
3998 \"schema_bytes_used\":{},\"schema_bytes_allocated\":{}}}",
3999 st.shared_queue_bytes_used(),
4000 memory.max_queue_budget,
4001 st.rx_queue.bytes_used(),
4002 st.rx_queue.max_bytes(),
4003 st.rx_queue.len(),
4004 st.tx_queue.bytes_used(),
4005 st.tx_queue.max_bytes(),
4006 st.tx_queue.len(),
4007 st.replay_queue.bytes_used(),
4008 st.replay_queue.max_bytes(),
4009 st.replay_queue.len(),
4010 st.recent_rx.bytes_used(),
4011 st.recent_rx.max_bytes(),
4012 st.recent_rx.len(),
4013 st.reliable_rx_buffered_bytes(),
4014 memory.max_queue_budget,
4015 st.reliable_rx_buffer_len(),
4016 discovery_bytes,
4017 memory.max_queue_budget,
4018 schema_bytes,
4019 memory.max_queue_budget,
4020 ),
4021 );
4022 out
4023 }
4024
/// Test helper: number of destinations recorded as end-to-end acked for
/// `packet_id`, or `None` when the packet id has no entry.
#[cfg(test)]
pub(crate) fn debug_end_to_end_acked_destination_count(&self, packet_id: u64) -> Option<usize> {
    let state = self.state.lock();
    let destinations = state.end_to_end_acked_destinations.get(&packet_id)?;
    Some(destinations.len())
}
4032
/// Test helper: number of packet ids present in the end-to-end acked
/// destination map.
#[cfg(test)]
pub(crate) fn debug_end_to_end_acked_packet_count(&self) -> usize {
    let state = self.state.lock();
    let tracked_packets = state.end_to_end_acked_destinations.len();
    tracked_packets
}
4038
/// Test helper: number of entries in the reliable return-route table.
#[cfg(test)]
pub(crate) fn debug_reliable_return_route_count(&self) -> usize {
    let state = self.state.lock();
    let route_count = state.reliable_return_routes.len();
    route_count
}
4044
4045 pub fn rx_packed_from_side(&self, src: RelaySideId, bytes: &[u8]) -> TelemetryResult<()> {
4050 self.ensure_side_ingress_enabled(src)?;
4051 let Some(bytes) = self.decode_side_transport_frame(src, bytes)? else {
4052 return Ok(());
4053 };
4054 let mut st = self.state.lock();
4055
4056 let data = RelayItem::Packed(bytes);
4057 let priority = Self::relay_item_priority(&data)?;
4058 st.push_rx(RelayRxItem {
4059 src,
4060 data,
4061 priority,
4062 })
4063 }
4064
4065 pub fn rx_from_side(&self, src: RelaySideId, packet: Packet) -> TelemetryResult<()> {
4069 self.ensure_side_ingress_enabled(src)?;
4070 let mut st = self.state.lock();
4071
4072 let data = RelayItem::Packet(Arc::new(packet));
4073 let priority = Self::relay_item_priority(&data)?;
4074 st.push_rx(RelayRxItem {
4075 src,
4076 data,
4077 priority,
4078 })
4079 }
4080
4081 pub fn clear_queues(&self) {
4083 let mut st = self.state.lock();
4084 st.rx_queue.clear();
4085 st.tx_queue.clear();
4086 }
4087
4088 pub fn clear_rx_queue(&self) {
4090 let mut st = self.state.lock();
4091 st.rx_queue.clear();
4092 }
4093
4094 pub fn clear_tx_queue(&self) {
4096 let mut st = self.state.lock();
4097 st.tx_queue.clear();
4098 st.replay_queue.clear();
4099 }
4100
4101 fn process_rx_queue_item(&self, item: RelayRxItem) -> TelemetryResult<()> {
4105 self.ensure_side_ingress_enabled(item.src)?;
4106 match &item.data {
4107 RelayItem::Packet(pkt) => {
4108 let bytes = wire_format::pack_packet(pkt).len();
4109 self.note_side_rx(item.src, pkt.data_type(), bytes);
4110 }
4111 RelayItem::Packed(bytes) => {
4112 if let Ok(env) = wire_format::peek_envelope(bytes.as_ref()) {
4113 self.note_side_rx(item.src, env.ty, bytes.len());
4114 }
4115 }
4116 }
4117 match &item.data {
4118 RelayItem::Packet(pkt) => {
4119 if is_reliable_type(pkt.data_type()) && !is_internal_control_type(pkt.data_type()) {
4120 self.note_reliable_return_route(item.src, pkt.packet_id());
4121 }
4122 }
4123 RelayItem::Packed(bytes) => {
4124 if let Ok(env) = wire_format::peek_envelope(bytes.as_ref())
4125 && is_reliable_type(env.ty)
4126 && !is_internal_control_type(env.ty)
4127 && let Ok(packet_id) = wire_format::packet_id_from_wire(bytes.as_ref())
4128 {
4129 self.note_reliable_return_route(item.src, packet_id);
4130 }
4131 }
4132 }
4133 let mut released_buffered: Vec<Arc<[u8]>> = Vec::new();
4134 if let RelayItem::Packed(bytes) = &item.data {
4135 let (_opts, handler_is_packed, hop_reliable_enabled) = {
4136 let st = self.state.lock();
4137 let side_ref = Self::side_ref(&st, item.src)?;
4138 let opts = side_ref.opts;
4139 (
4140 opts,
4141 matches!(side_ref.tx_handler, RelayTxHandlerFn::Packed(_)),
4142 opts.reliable_enabled
4143 && !self.side_has_multiple_announcers_locked(
4144 &st,
4145 item.src,
4146 self.clock.now_ms(),
4147 ),
4148 )
4149 };
4150
4151 let frame = match wire_format::peek_frame_info(bytes.as_ref()) {
4152 Ok(frame) => frame,
4153 Err(e) => {
4154 if matches!(e, TelemetryError::Unpack(msg) if msg == "crc32 mismatch")
4155 && hop_reliable_enabled
4156 && handler_is_packed
4157 && let Ok(frame) = wire_format::peek_frame_info_unchecked(bytes.as_ref())
4158 {
4159 if is_reliable_type(frame.envelope.ty)
4160 && let Some(hdr) = frame.reliable
4161 {
4162 let unordered = (hdr.flags & wire_format::RELIABLE_FLAG_UNORDERED) != 0;
4163 let unsequenced =
4164 (hdr.flags & wire_format::RELIABLE_FLAG_UNSEQUENCED) != 0;
4165
4166 if !unsequenced {
4167 let requested = if unordered {
4168 hdr.seq
4169 } else {
4170 let mut st = self.state.lock();
4171 let rx_state = self.reliable_rx_state_mut(
4172 &mut st,
4173 item.src,
4174 frame.envelope.ty,
4175 );
4176 rx_state.expected_seq.min(hdr.seq)
4177 };
4178 self.queue_reliable_packet_request(
4179 item.src,
4180 frame.envelope.ty,
4181 requested,
4182 )?;
4183 }
4184 }
4185 return Ok(());
4186 }
4187 return Err(e);
4188 }
4189 };
4190
4191 if hop_reliable_enabled
4192 && handler_is_packed
4193 && is_reliable_type(frame.envelope.ty)
4194 && let Some(hdr) = frame.reliable
4195 {
4196 if frame.ack_only() {
4197 self.handle_reliable_ack(item.src, frame.envelope.ty, hdr.ack);
4198 return Ok(());
4199 }
4200 let unordered = (hdr.flags & wire_format::RELIABLE_FLAG_UNORDERED) != 0;
4201 let unsequenced = (hdr.flags & wire_format::RELIABLE_FLAG_UNSEQUENCED) != 0;
4202
4203 if !unsequenced {
4204 if unordered {
4205 self.queue_reliable_ack(item.src, frame.envelope.ty, hdr.seq)?;
4206 } else {
4207 let mut release: Vec<Arc<[u8]>> = Vec::new();
4208 let mut last_delivered = None;
4209 let mut ack_old = None;
4210 let mut request_missing = None;
4211 let mut partial_ack = None;
4212 {
4213 let mut st = self.state.lock();
4214 let rx_state =
4215 self.reliable_rx_state_mut(&mut st, item.src, frame.envelope.ty);
4216 let expected_seq = rx_state.expected_seq;
4217 if hdr.seq < expected_seq {
4218 ack_old = Some(expected_seq.saturating_sub(1));
4219 } else if hdr.seq > expected_seq {
4220 request_missing = Some(expected_seq);
4221 partial_ack = Some(hdr.seq);
4222 st.buffer_reliable_rx(
4223 item.src,
4224 frame.envelope.ty,
4225 hdr.seq,
4226 bytes.clone(),
4227 )?;
4228 } else {
4229 release.push(bytes.clone());
4230 last_delivered = Some(hdr.seq);
4231 let mut next_expected = hdr.seq.wrapping_add(1);
4232 while let Some(buf) = rx_state.buffered.remove(&next_expected) {
4233 release.push(buf);
4234 last_delivered = Some(next_expected);
4235 let next = next_expected.wrapping_add(1);
4236 next_expected = if next == 0 { 1 } else { next };
4237 }
4238 rx_state.expected_seq = next_expected;
4239 }
4240 }
4241
4242 if let Some(ack_seq) = ack_old {
4243 self.queue_reliable_ack(item.src, frame.envelope.ty, ack_seq)?;
4244 return Ok(());
4245 }
4246 if let Some(request_seq) = request_missing {
4247 if let Some(partial_seq) = partial_ack {
4248 self.queue_reliable_partial_ack(
4249 item.src,
4250 frame.envelope.ty,
4251 partial_seq,
4252 )?;
4253 }
4254 self.queue_reliable_packet_request(
4255 item.src,
4256 frame.envelope.ty,
4257 request_seq,
4258 )?;
4259 return Ok(());
4260 }
4261 if let Some(ack_seq) = last_delivered {
4262 self.queue_reliable_ack(item.src, frame.envelope.ty, ack_seq)?;
4263 }
4264 released_buffered.extend(release.into_iter().skip(1));
4265 }
4266 }
4267 }
4268 }
4269
4270 if self.is_duplicate_pkt(&item)? && !self.should_forward_duplicate_reliable_item(&item)? {
4271 return Ok(());
4273 }
4274
4275 self.dispatch_relay_rx_item(&item)?;
4276
4277 for release_bytes in released_buffered {
4278 let release_item = RelayRxItem {
4279 src: item.src,
4280 priority: Self::relay_item_priority(&RelayItem::Packed(release_bytes.clone()))?,
4281 data: RelayItem::Packed(release_bytes),
4282 };
4283 if self.is_duplicate_pkt(&release_item)?
4284 && !self.should_forward_duplicate_reliable_item(&release_item)?
4285 {
4286 continue;
4287 }
4288 self.dispatch_relay_rx_item(&release_item)?;
4289 }
4290 Ok(())
4291 }
4292
4293 fn dispatch_relay_rx_item(&self, item: &RelayRxItem) -> TelemetryResult<()> {
4294 match &item.data {
4295 RelayItem::Packet(pkt) => {
4296 if matches!(
4297 pkt.data_type(),
4298 crate::DataType::ReliableAck
4299 | crate::DataType::ReliablePartialAck
4300 | crate::DataType::ReliablePacketRequest
4301 ) {
4302 if pkt.data_type() == crate::DataType::ReliableAck
4303 && Self::is_end_to_end_ack_sender(pkt.sender())
4304 && Self::decode_end_to_end_reliable_ack(pkt.payload()).is_ok()
4305 {
4306 if let Ok(packet_id) = Self::decode_end_to_end_reliable_ack(pkt.payload())
4307 && let Some(sender_hash) =
4308 Self::decode_end_to_end_ack_sender_hash(pkt.sender())
4309 {
4310 let mut st = self.state.lock();
4311 Self::note_end_to_end_acked_destination_locked(
4312 &mut st,
4313 packet_id,
4314 sender_hash,
4315 );
4316 }
4317 } else {
4318 let vals = pkt.data_as_u32()?;
4319 if vals.len() != 2 {
4320 return Err(TelemetryError::Unpack("bad reliable control payload"));
4321 }
4322 let ty = crate::DataType::try_from_u32(vals[0])
4323 .ok_or(TelemetryError::InvalidType)?;
4324 let seq = vals[1];
4325 match pkt.data_type() {
4326 crate::DataType::ReliableAck => {
4327 self.handle_reliable_ack(item.src, ty, seq)
4328 }
4329 crate::DataType::ReliablePartialAck => {
4330 self.handle_reliable_partial_ack(item.src, ty, seq)
4331 }
4332 crate::DataType::ReliablePacketRequest => {
4333 self.queue_reliable_retransmit(item.src, ty, seq)?
4334 }
4335 _ => {}
4336 }
4337 return Ok(());
4338 }
4339 }
4340 }
4341 RelayItem::Packed(bytes) => {
4342 let env = wire_format::peek_envelope(bytes.as_ref())?;
4343 if matches!(
4344 env.ty,
4345 crate::DataType::ReliableAck
4346 | crate::DataType::ReliablePacketRequest
4347 | crate::DataType::ReliablePartialAck
4348 ) {
4349 let pkt = wire_format::unpack_packet(bytes.as_ref())?;
4350 return self.dispatch_relay_rx_item(&RelayRxItem {
4351 src: item.src,
4352 data: RelayItem::Packet(Arc::new(pkt)),
4353 priority: item.priority,
4354 });
4355 }
4356 }
4357 }
4358
4359 let src = item.src;
4360 let data = item.data.clone();
4361 self.learn_discovery_item(src, &data)?;
4362
4363 let plan = self.remote_side_plan(&data, src)?;
4364 let mut st = self.state.lock();
4365 let RemoteSidePlan::Target(sides) = plan;
4366 for dst in sides {
4367 let priority = Self::relay_item_priority(&data)?;
4368 st.push_tx(RelayTxItem {
4369 src: Some(src),
4370 dst,
4371 data: data.clone(),
4372 priority,
4373 })?;
4374 }
4375 Ok(())
4376 }
4377
4378 #[inline]
4379 fn crc32_bytes(data: &[u8]) -> u32 {
4380 let mut hasher = Crc32Hasher::new();
4381 hasher.update(data);
4382 hasher.finalize()
4383 }
4384
4385 fn wrap_side_transport_frame(kind: u8, body: &[u8]) -> Arc<[u8]> {
4386 let mut out = Vec::with_capacity(
4387 SIDE_TRANSPORT_MAGIC.len() + 1 + body.len() + wire_format::CRC32_BYTES,
4388 );
4389 out.extend_from_slice(SIDE_TRANSPORT_MAGIC);
4390 out.push(kind);
4391 out.extend_from_slice(body);
4392 let crc = Self::crc32_bytes(&out);
4393 out.extend_from_slice(&crc.to_le_bytes());
4394 Arc::from(out)
4395 }
4396
4397 fn parse_side_transport_wrapper(bytes: &[u8]) -> TelemetryResult<Option<(u8, &[u8])>> {
4398 if bytes.len() < SIDE_TRANSPORT_MAGIC.len() + 1 + wire_format::CRC32_BYTES {
4399 return Ok(None);
4400 }
4401 if &bytes[..SIDE_TRANSPORT_MAGIC.len()] != SIDE_TRANSPORT_MAGIC {
4402 return Ok(None);
4403 }
4404 let data_len = bytes.len() - wire_format::CRC32_BYTES;
4405 let expected = u32::from_le_bytes([
4406 bytes[data_len],
4407 bytes[data_len + 1],
4408 bytes[data_len + 2],
4409 bytes[data_len + 3],
4410 ]);
4411 let data = &bytes[..data_len];
4412 if Self::crc32_bytes(data) != expected {
4413 return Err(TelemetryError::Unpack("side transport crc32 mismatch"));
4414 }
4415 let kind = data[SIDE_TRANSPORT_MAGIC.len()];
4416 Ok(Some((kind, &data[SIDE_TRANSPORT_MAGIC.len() + 1..])))
4417 }
4418
4419 fn read_uleb128_local(buf: &[u8], off: &mut usize) -> TelemetryResult<u64> {
4420 let mut result = 0u64;
4421 let mut shift = 0u32;
4422 for _ in 0..10 {
4423 let byte = *buf.get(*off).ok_or(TelemetryError::Unpack("short read"))?;
4424 *off += 1;
4425 result |= u64::from(byte & 0x7F) << shift;
4426 if (byte & 0x80) == 0 {
4427 return Ok(result);
4428 }
4429 shift += 7;
4430 }
4431 Err(TelemetryError::Unpack("uleb128 too long"))
4432 }
4433
/// Appends `value` to `out` as unsigned LEB128: seven bits per byte, least-significant group
/// first, with the high bit set on every byte except the last.
fn write_uleb128_local(mut value: u64, out: &mut Vec<u8>) {
    // Emit continuation groups while more than seven bits remain.
    while value >= 0x80 {
        out.push((value & 0x7F) as u8 | 0x80);
        value >>= 7;
    }
    // The final group fits in seven bits, so the continuation bit stays clear.
    out.push(value as u8);
}
4447
/// Returns how many bytes the unsigned LEB128 encoding of `value` occupies (1 to 10).
fn uleb128_len_local(value: u64) -> usize {
    // Each encoded byte carries seven payload bits; zero still needs one byte.
    let significant_bits = u64::BITS - value.leading_zeros();
    let groups = (significant_bits + 6) / 7;
    groups.max(1) as usize
}
4456
4457 fn extract_side_header_template(bytes: &[u8]) -> TelemetryResult<SideTemplateExtract<'_>> {
4458 if bytes.len() < wire_format::CRC32_BYTES + 4 {
4459 return Err(TelemetryError::Unpack("short buffer"));
4460 }
4461 let data_len = bytes.len() - wire_format::CRC32_BYTES;
4462 let data = &bytes[..data_len];
4463 let mut off = 0usize;
4464 let flags = *data
4465 .get(off)
4466 .ok_or(TelemetryError::Unpack("short prelude"))?;
4467 off += 1;
4468 off += 1; let ty_u64 = Self::read_uleb128_local(data, &mut off)?;
4470 let ty_u32 = u32::try_from(ty_u64).map_err(|_| TelemetryError::Unpack("bad data type"))?;
4471 if ty_u32 > crate::MAX_VALUE_DATA_TYPE {
4472 return Err(TelemetryError::Unpack("bad data type"));
4473 }
4474 let ty = crate::DataType(ty_u32);
4475 let data_size_off = off;
4476 let data_size = Self::read_uleb128_local(data, &mut off)?;
4477 let timestamp = Self::read_uleb128_local(data, &mut off)?;
4478 let nonce = if (flags & SIDE_TRANSPORT_FLAG_PACKET_NONCE) != 0 {
4479 u16::try_from(Self::read_uleb128_local(data, &mut off)?)
4480 .map_err(|_| TelemetryError::Unpack("packet nonce too large"))?
4481 } else {
4482 0
4483 };
4484 let between_start = off;
4485 let _source_address = u32::try_from(Self::read_uleb128_local(data, &mut off)?)
4486 .map_err(|_| TelemetryError::Unpack("source address too large"))?;
4487 let endpoint_bitmap_bytes = if (flags & SIDE_TRANSPORT_FLAG_ENDPOINT_BITMAP_PRESENT) != 0 {
4488 SIDE_TRANSPORT_EP_BITMAP_BYTES
4489 } else {
4490 0
4491 };
4492 if data.len() < off + endpoint_bitmap_bytes {
4493 return Err(TelemetryError::Unpack("short buffer"));
4494 }
4495 off += endpoint_bitmap_bytes;
4496 if (flags & SIDE_TRANSPORT_FLAG_WIRE_CONTRACT) != 0 {
4497 let contract_len = usize::try_from(Self::read_uleb128_local(data, &mut off)?)
4498 .map_err(|_| TelemetryError::Unpack("wire contract length"))?;
4499 if data.len() < off + contract_len {
4500 return Err(TelemetryError::Unpack("short buffer"));
4501 }
4502 off += contract_len;
4503 }
4504 let reliable_span = wire_format::reliable_header_span(bytes)?;
4505 let (reliable_flags, reliable_seq_ack, reliable_compact, payload_off) =
4506 if let Some((rel_off, rel_len, hdr)) = reliable_span {
4507 if data.len() < rel_off + rel_len {
4508 return Err(TelemetryError::Unpack("short buffer"));
4509 }
4510 (
4511 Some(hdr.flags),
4512 Some((hdr.seq, hdr.ack)),
4513 (flags & SIDE_TRANSPORT_FLAG_COMPACT_RELIABLE_HEADER) != 0,
4514 rel_off + rel_len,
4515 )
4516 } else {
4517 (None, None, false, off)
4518 };
4519 if payload_off > data.len() {
4520 return Err(TelemetryError::Unpack("short buffer"));
4521 }
4522 let payload = &data[payload_off..];
4523 let prefix = Arc::<[u8]>::from(&data[1..data_size_off]);
4524 let between_end = reliable_span
4525 .map(|(rel_off, _, _)| rel_off)
4526 .unwrap_or(payload_off);
4527 let between = Arc::<[u8]>::from(&data[between_start..between_end]);
4528 let base_flags =
4529 flags & !(SIDE_TRANSPORT_FLAG_PAYLOAD_COMPRESSED | SIDE_TRANSPORT_FLAG_PACKET_NONCE);
4530 let mut hash = 0xD1B5_4A32_9C7E_01F3u64;
4531 hash = hash_bytes_u64(hash, &[base_flags]);
4532 hash = hash_bytes_u64(hash, &prefix);
4533 hash = hash_bytes_u64(hash, &between);
4534 if let Some(rel_flags) = reliable_flags {
4535 hash = hash_bytes_u64(hash, &[rel_flags]);
4536 }
4537 let template = SideHeaderTemplate {
4538 hash,
4539 base_flags,
4540 prefix,
4541 between,
4542 reliable_flags,
4543 reliable_compact,
4544 };
4545 Ok((
4546 template,
4547 ty,
4548 flags,
4549 data_size,
4550 timestamp,
4551 nonce,
4552 reliable_seq_ack,
4553 payload,
4554 ))
4555 }
4556
4557 fn reconstruct_side_compact_frame(
4558 template: &SideHeaderTemplate,
4559 body: &[u8],
4560 timestamp_mode: SideCompactTimestampMode,
4561 timestamp_base: Option<u64>,
4562 ) -> TelemetryResult<(Arc<[u8]>, u64)> {
4563 if body.is_empty() {
4564 return Err(TelemetryError::Unpack("short side compact frame"));
4565 }
4566 let mut off = 0usize;
4567 let flags = body[off];
4568 off += 1;
4569 if (flags & !(SIDE_TRANSPORT_FLAG_PAYLOAD_COMPRESSED | SIDE_TRANSPORT_FLAG_PACKET_NONCE))
4570 != template.base_flags
4571 {
4572 return Err(TelemetryError::Unpack("side compact flags mismatch"));
4573 }
4574 let data_size = Self::read_uleb128_local(body, &mut off)?;
4575 let timestamp = match timestamp_mode {
4576 SideCompactTimestampMode::Absolute => Self::read_uleb128_local(body, &mut off)?,
4577 SideCompactTimestampMode::Delta => {
4578 let timestamp_field = Self::read_uleb128_local(body, &mut off)?;
4579 let base = timestamp_base.ok_or(TelemetryError::Unpack(
4580 "missing side compact timestamp context",
4581 ))?;
4582 base.checked_add(timestamp_field)
4583 .ok_or(TelemetryError::Unpack(
4584 "side compact timestamp delta overflow",
4585 ))?
4586 }
4587 SideCompactTimestampMode::Omitted => timestamp_base.ok_or(TelemetryError::Unpack(
4588 "missing side compact timestamp context",
4589 ))?,
4590 };
4591 let nonce = if (flags & SIDE_TRANSPORT_FLAG_PACKET_NONCE) != 0 {
4592 Some(Self::read_uleb128_local(body, &mut off)?)
4593 } else {
4594 None
4595 };
4596 let reliable_seq_ack = if template.reliable_flags.is_some() {
4597 let seq = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
4598 .map_err(|_| TelemetryError::Unpack("side compact reliable seq too large"))?;
4599 let ack = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
4600 .map_err(|_| TelemetryError::Unpack("side compact reliable ack too large"))?;
4601 Some((seq, ack))
4602 } else {
4603 None
4604 };
4605 let payload = &body[off..];
4606 let mut raw = Vec::with_capacity(
4607 1 + template.prefix.len() + template.between.len() + payload.len() + 32,
4608 );
4609 raw.push(flags);
4610 raw.extend_from_slice(&template.prefix);
4611 Self::write_uleb128_local(data_size, &mut raw);
4612 Self::write_uleb128_local(timestamp, &mut raw);
4613 if let Some(nonce) = nonce {
4614 Self::write_uleb128_local(nonce, &mut raw);
4615 }
4616 raw.extend_from_slice(&template.between);
4617 if let Some(rel_flags) = template.reliable_flags {
4618 let (seq, ack) =
4619 reliable_seq_ack.ok_or(TelemetryError::Unpack("missing side compact reliable"))?;
4620 wire_format::write_reliable_header_encoded(
4621 wire_format::ReliableHeader {
4622 flags: rel_flags,
4623 seq,
4624 ack,
4625 },
4626 template.reliable_compact,
4627 &mut raw,
4628 );
4629 }
4630 raw.extend_from_slice(payload);
4631 let crc = Self::crc32_bytes(&raw);
4632 raw.extend_from_slice(&crc.to_le_bytes());
4633 Ok((Arc::from(raw), timestamp))
4634 }
4635
4636 fn split_side_transport_frame(
4637 &self,
4638 side: RelaySideId,
4639 frame: Arc<[u8]>,
4640 max_frame_bytes: usize,
4641 ) -> TelemetryResult<Vec<Arc<[u8]>>> {
4642 if max_frame_bytes <= SIDE_TRANSPORT_CHUNK_OVERHEAD {
4643 return Err(TelemetryError::BadArg);
4644 }
4645 let payload_budget = max_frame_bytes - SIDE_TRANSPORT_CHUNK_OVERHEAD;
4646 let mut st = self.state.lock();
4647 let side_state = st
4648 .side_transport
4649 .get_mut(&side)
4650 .ok_or(TelemetryError::BadArg)?;
4651 let transfer_id = side_state.next_chunk_id.wrapping_add(1).max(1);
4652 side_state.next_chunk_id = transfer_id;
4653 drop(st);
4654
4655 let total = frame.len().div_ceil(payload_budget);
4656 let total_u16 =
4657 u16::try_from(total).map_err(|_| TelemetryError::PacketTooLarge("too many chunks"))?;
4658 let mut frames = Vec::with_capacity(total);
4659 for (idx, chunk) in frame.chunks(payload_budget).enumerate() {
4660 let mut body = Vec::with_capacity(8 + chunk.len());
4661 body.extend_from_slice(&transfer_id.to_le_bytes());
4662 body.extend_from_slice(&(idx as u16).to_le_bytes());
4663 body.extend_from_slice(&total_u16.to_le_bytes());
4664 body.extend_from_slice(chunk);
4665 frames.push(Self::wrap_side_transport_frame(
4666 SIDE_TRANSPORT_KIND_CHUNK,
4667 &body,
4668 ));
4669 }
4670 Ok(frames)
4671 }
4672
/// Encodes one packed frame for transmission on `side`, applying header templating and, if
/// needed, chunking.
///
/// * When header templates are disabled and `max_frame_bytes` is 0, `raw` is returned as-is.
/// * The first frame seen for a header template is sent as a `FULL` frame: a newly allocated
///   template id followed by the raw frame.
/// * Later frames with the same template hash are sent as a compact frame: flags, template
///   id, data size, an optional timestamp field, optional nonce, optional reliable seq/ack,
///   then the payload. The frame kind tells the receiver whether the timestamp is absolute,
///   a delta against the previous timestamp for that template, or omitted.
/// * If the wrapped frame exceeds a non-zero `max_frame_bytes` it is split into chunk frames.
///
/// Per-side runtime statistics are updated before returning.
///
/// NOTE(review): templating is also applied when `header_template_enabled` is false but
/// `max_frame_bytes` is non-zero — confirm this is intended.
///
/// # Errors
/// Propagates template-extraction and chunking errors; `TelemetryError::BadArg` when `side`
/// has no side-transport state.
fn encode_side_transport_frames(
    &self,
    side: RelaySideId,
    opts: RelaySideOptions,
    raw: Arc<[u8]>,
) -> TelemetryResult<Vec<Arc<[u8]>>> {
    if !opts.header_template_enabled && opts.max_frame_bytes == 0 {
        return Ok(vec![raw]);
    }
    let raw_len = raw.len();
    let mut compact_payload_len = None;
    let mut used_compact = false;
    let mut used_timestamp_delta = false;
    let mut omitted_timestamp = false;
    let (template, ty, flags, data_size, timestamp, nonce, reliable_seq_ack, payload) =
        Self::extract_side_header_template(raw.as_ref())?;
    // Look up (or allocate) the template id under a single lock so that the id and the
    // previous timestamp are read consistently.
    let (template_id, use_compact, previous_timestamp) = {
        let mut st = self.state.lock();
        let side_state = st
            .side_transport
            .get_mut(&side)
            .ok_or(TelemetryError::BadArg)?;
        if let Some(id) = side_state.tx_template_ids.get(&template.hash).copied() {
            let previous = side_state.tx_last_timestamps.get(&id).copied();
            (id, true, previous)
        } else {
            // Template ids wrap but never take the value 0.
            let next = side_state.next_template_id.wrapping_add(1).max(1);
            side_state.next_template_id = next;
            let evicted = side_state.insert_tx_template(
                template,
                next,
                opts.max_side_transport_templates,
            );
            if evicted {
                st.side_runtime_stats
                    .entry(side)
                    .or_default()
                    .note_side_transport_template_eviction();
            }
            if let Some(side_state) = st.side_transport.get_mut(&side) {
                side_state.tx_last_timestamps.insert(next, timestamp);
            }
            (next, false, None)
        }
    };
    let wrapped = if use_compact {
        used_compact = true;
        compact_payload_len = Some(payload.len());
        // Choose the cheapest timestamp representation: omitted (when allowed and
        // unchanged), a delta when it encodes shorter, otherwise the absolute value.
        let timestamp_field = if let Some(previous) = previous_timestamp {
            let delta = timestamp.saturating_sub(previous);
            let omit_timestamp = opts.omit_unchanged_compact_timestamps
                || opts.compact_timestamp_omission_types.contains(ty);
            if omit_timestamp && timestamp == previous {
                omitted_timestamp = true;
                None
            } else if timestamp >= previous
                && Self::uleb128_len_local(delta) < Self::uleb128_len_local(timestamp)
            {
                used_timestamp_delta = true;
                Some(delta)
            } else {
                Some(timestamp)
            }
        } else {
            Some(timestamp)
        };
        let mut body = Vec::with_capacity(payload.len() + 32);
        body.push(flags);
        Self::write_uleb128_local(u64::from(template_id), &mut body);
        Self::write_uleb128_local(data_size, &mut body);
        if let Some(timestamp_field) = timestamp_field {
            Self::write_uleb128_local(timestamp_field, &mut body);
        }
        if (flags & SIDE_TRANSPORT_FLAG_PACKET_NONCE) != 0 {
            Self::write_uleb128_local(u64::from(nonce), &mut body);
        }
        if let Some((seq, ack)) = reliable_seq_ack {
            Self::write_uleb128_local(u64::from(seq), &mut body);
            Self::write_uleb128_local(u64::from(ack), &mut body);
        }
        body.extend_from_slice(payload);
        {
            // Remember this timestamp as the base for the next delta/omission decision.
            let mut st = self.state.lock();
            if let Some(side_state) = st.side_transport.get_mut(&side) {
                side_state.tx_last_timestamps.insert(template_id, timestamp);
            }
        }
        let kind = if omitted_timestamp {
            SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP
        } else if used_timestamp_delta {
            SIDE_TRANSPORT_KIND_COMPACT_DELTA
        } else {
            SIDE_TRANSPORT_KIND_COMPACT
        };
        Self::wrap_side_transport_frame(kind, &body)
    } else {
        // First use of this template: send the complete frame tagged with its new id.
        let mut body = Vec::with_capacity(raw.len() + 4);
        Self::write_uleb128_local(u64::from(template_id), &mut body);
        body.extend_from_slice(raw.as_ref());
        Self::wrap_side_transport_frame(SIDE_TRANSPORT_KIND_FULL, &body)
    };
    let frames = if opts.max_frame_bytes != 0 && wrapped.len() > opts.max_frame_bytes {
        self.split_side_transport_frame(side, wrapped, opts.max_frame_bytes)
    } else {
        Ok(vec![wrapped])
    }?;
    let wire_len = frames.iter().map(|frame| frame.len()).sum::<usize>();
    let mut st = self.state.lock();
    let stats = st.side_runtime_stats.entry(side).or_default();
    if used_compact {
        // Overhead is everything on the wire that is not payload.
        let overhead = compact_payload_len
            .map(|payload_len| wire_len.saturating_sub(payload_len))
            .unwrap_or(wire_len);
        stats.note_side_transport_compact(
            raw_len,
            wire_len,
            overhead,
            used_timestamp_delta,
            omitted_timestamp,
        );
        if opts.compact_header_target_bytes != 0 && overhead > opts.compact_header_target_bytes
        {
            stats.note_side_transport_compact_target_miss();
        }
    } else {
        stats.note_side_transport_full(raw_len, wire_len);
    }
    if frames.len() > 1 {
        stats.note_side_transport_chunks(frames.len());
    }
    Ok(frames)
}
4805
4806 fn decode_side_transport_frame(
4807 &self,
4808 side: RelaySideId,
4809 bytes: &[u8],
4810 ) -> TelemetryResult<Option<Arc<[u8]>>> {
4811 let Some((kind, body)) = Self::parse_side_transport_wrapper(bytes)? else {
4812 return Ok(Some(Arc::from(bytes)));
4813 };
4814 match kind {
4815 SIDE_TRANSPORT_KIND_FULL => {
4816 let mut off = 0usize;
4817 let template_id = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
4818 .map_err(|_| TelemetryError::Unpack("side template id too large"))?;
4819 let raw = Arc::<[u8]>::from(&body[off..]);
4820 if let Ok((template, _, _, _, timestamp, _, _, _)) =
4821 Self::extract_side_header_template(raw.as_ref())
4822 {
4823 let mut st = self.state.lock();
4824 let max_templates = st
4825 .sides
4826 .get(side)
4827 .and_then(|side| side.as_ref())
4828 .map(|side| side.opts.max_side_transport_templates)
4829 .unwrap_or(DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT);
4830 let evicted = st.side_transport.get_mut(&side).is_some_and(|side_state| {
4831 let evicted =
4832 side_state.insert_rx_template(template_id, template, max_templates);
4833 side_state.rx_last_timestamps.insert(template_id, timestamp);
4834 evicted
4835 });
4836 if evicted {
4837 st.side_runtime_stats
4838 .entry(side)
4839 .or_default()
4840 .note_side_transport_template_eviction();
4841 }
4842 }
4843 Ok(Some(raw))
4844 }
4845 SIDE_TRANSPORT_KIND_COMPACT
4846 | SIDE_TRANSPORT_KIND_COMPACT_DELTA
4847 | SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP => {
4848 if body.is_empty() {
4849 return Err(TelemetryError::Unpack("short side compact frame"));
4850 }
4851 let mut off = 1usize;
4852 let template_id = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
4853 .map_err(|_| TelemetryError::Unpack("side template id too large"))?;
4854 let mut compact_body = Vec::with_capacity(1 + body.len().saturating_sub(off));
4855 compact_body.push(body[0]);
4856 compact_body.extend_from_slice(&body[off..]);
4857 let (template, timestamp_base) = {
4858 let st = self.state.lock();
4859 let state = st.side_transport.get(&side);
4860 let template = state
4861 .and_then(|state| state.rx_templates_by_id.get(&template_id))
4862 .cloned();
4863 let timestamp_base = if matches!(
4864 kind,
4865 SIDE_TRANSPORT_KIND_COMPACT_DELTA
4866 | SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP
4867 ) {
4868 state
4869 .and_then(|state| state.rx_last_timestamps.get(&template_id))
4870 .copied()
4871 } else {
4872 None
4873 };
4874 (template, timestamp_base)
4875 };
4876 let template =
4877 template.ok_or(TelemetryError::Unpack("unknown side compact template"))?;
4878 let timestamp_mode = match kind {
4879 SIDE_TRANSPORT_KIND_COMPACT_DELTA => SideCompactTimestampMode::Delta,
4880 SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP => SideCompactTimestampMode::Omitted,
4881 _ => SideCompactTimestampMode::Absolute,
4882 };
4883 let (frame, timestamp) = Self::reconstruct_side_compact_frame(
4884 &template,
4885 &compact_body,
4886 timestamp_mode,
4887 timestamp_base,
4888 )?;
4889 let mut st = self.state.lock();
4890 if let Some(side_state) = st.side_transport.get_mut(&side) {
4891 side_state.rx_last_timestamps.insert(template_id, timestamp);
4892 }
4893 Ok(Some(frame))
4894 }
4895 SIDE_TRANSPORT_KIND_CHUNK => {
4896 if body.len() < 8 {
4897 return Err(TelemetryError::Unpack("short side chunk frame"));
4898 }
4899 let transfer_id = u32::from_le_bytes([body[0], body[1], body[2], body[3]]);
4900 let index = u16::from_le_bytes([body[4], body[5]]);
4901 let total = u16::from_le_bytes([body[6], body[7]]);
4902 let payload = Arc::<[u8]>::from(&body[8..]);
4903 let assembled = {
4904 let mut st = self.state.lock();
4905 let side_state = st
4906 .side_transport
4907 .get_mut(&side)
4908 .ok_or(TelemetryError::BadArg)?;
4909 let entry = side_state.rx_chunks.entry(transfer_id).or_default();
4910 if entry.total == 0 {
4911 entry.total = total;
4912 } else if entry.total != total {
4913 side_state.rx_chunks.remove(&transfer_id);
4914 return Err(TelemetryError::Unpack("side chunk total mismatch"));
4915 }
4916 entry.received.entry(index).or_insert(payload);
4917 if entry.received.len() == usize::from(total) {
4918 let entry = side_state
4919 .rx_chunks
4920 .remove(&transfer_id)
4921 .ok_or(TelemetryError::Unpack("side chunk missing"))?;
4922 let mut out = Vec::new();
4923 for idx in 0..entry.total {
4924 let chunk = entry
4925 .received
4926 .get(&idx)
4927 .ok_or(TelemetryError::Unpack("side chunk gap"))?;
4928 out.extend_from_slice(chunk);
4929 }
4930 Some(Arc::<[u8]>::from(out))
4931 } else {
4932 None
4933 }
4934 };
4935 match assembled {
4936 Some(frame) => self.decode_side_transport_frame(side, frame.as_ref()),
4937 None => Ok(None),
4938 }
4939 }
4940 _ => Err(TelemetryError::Unpack("unknown side transport frame")),
4941 }
4942 }
4943
4944 fn call_tx_handler(
4950 &self,
4951 side: RelaySideId,
4952 handler: &RelayTxHandlerFn,
4953 data: &RelayItem,
4954 ) -> TelemetryResult<()> {
4955 let opts = {
4956 let st = self.state.lock();
4957 Self::side_ref(&st, side)?.opts
4958 };
4959 let Some(_side_tx_guard) = self.try_enter_side_tx() else {
4960 return Err(TelemetryError::Io("side tx busy"));
4961 };
4962 let started_ms = self.clock.now_ms();
4963 let ty = match data {
4964 RelayItem::Packet(pkt) => pkt.data_type(),
4965 RelayItem::Packed(bytes) => wire_format::peek_envelope(bytes.as_ref())?.ty,
4966 };
4967 let result = match (handler, data) {
4968 (RelayTxHandlerFn::Packed(f), RelayItem::Packed(bytes)) => {
4970 let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
4971 let mut sent_bytes = 0usize;
4972 for frame in frames {
4973 f(frame.as_ref())?;
4974 sent_bytes = sent_bytes.saturating_add(frame.len());
4975 }
4976 self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
4977 self.note_side_tx_success(side, ty, sent_bytes, 1);
4978 return Ok(());
4979 }
4980 (RelayTxHandlerFn::Packet(f), RelayItem::Packet(pkt)) => f(pkt),
4981
4982 (RelayTxHandlerFn::Packed(f), RelayItem::Packet(pkt)) => {
4984 let owned = wire_format::pack_packet(pkt);
4985 let frames = self.encode_side_transport_frames(side, opts, owned)?;
4986 let mut sent_bytes = 0usize;
4987 for frame in frames {
4988 f(frame.as_ref())?;
4989 sent_bytes = sent_bytes.saturating_add(frame.len());
4990 }
4991 self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
4992 self.note_side_tx_success(side, ty, sent_bytes, 1);
4993 return Ok(());
4994 }
4995 (RelayTxHandlerFn::Packet(f), RelayItem::Packed(bytes)) => {
4996 if wire_format::peek_frame_info(bytes.as_ref())
4997 .ok()
4998 .is_some_and(|frame| frame.ack_only())
4999 {
5000 return Ok(());
5001 }
5002 let pkt = wire_format::unpack_packet(bytes.as_ref())?;
5003 f(&pkt)
5004 }
5005 };
5006 if result.is_ok()
5007 && let Ok(bytes) = Self::relay_item_wire_len(data)
5008 {
5009 self.record_side_tx_sample(side, bytes, started_ms, self.clock.now_ms());
5010 self.note_side_tx_success(side, ty, bytes, 1);
5011 } else if result.is_err() {
5012 self.note_side_tx_failure(side, ty, 1);
5013 }
5014 result
5015 }
5016
5017 fn adjust_reliable_for_side(
5018 &self,
5019 opts: RelaySideOptions,
5020 data: RelayItem,
5021 ) -> TelemetryResult<Option<RelayItem>> {
5022 if opts.reliable_enabled {
5023 return Ok(Some(data));
5024 }
5025
5026 match data {
5027 RelayItem::Packed(bytes) => {
5028 let frame = match wire_format::peek_frame_info(bytes.as_ref()) {
5029 Ok(frame) => frame,
5030 Err(_) => return Ok(Some(RelayItem::Packed(bytes))),
5031 };
5032 if is_reliable_type(frame.envelope.ty)
5033 && let Some(hdr) = frame.reliable
5034 {
5035 if (hdr.flags & wire_format::RELIABLE_FLAG_ACK_ONLY) != 0 {
5036 return Ok(None);
5037 }
5038 if (hdr.flags & wire_format::RELIABLE_FLAG_UNSEQUENCED) == 0 {
5039 let Some(rewritten) = wire_format::rewrite_reliable_header_owned(
5040 bytes.as_ref(),
5041 wire_format::RELIABLE_FLAG_UNSEQUENCED,
5042 hdr.seq,
5043 0,
5044 )?
5045 else {
5046 return Ok(Some(RelayItem::Packed(bytes)));
5047 };
5048 return Ok(Some(RelayItem::Packed(rewritten)));
5049 }
5050 }
5051 Ok(Some(RelayItem::Packed(bytes)))
5052 }
5053 RelayItem::Packet(pkt) => {
5054 if matches!(
5055 pkt.data_type(),
5056 crate::DataType::ReliableAck
5057 | crate::DataType::ReliablePartialAck
5058 | crate::DataType::ReliablePacketRequest
5059 ) {
5060 return Ok(None);
5061 }
5062 Ok(Some(RelayItem::Packet(pkt)))
5063 }
5064 }
5065 }
5066
5067 #[inline]
5069 pub fn process_rx_queue(&self) -> TelemetryResult<()> {
5070 self.process_rx_queue_with_timeout(0)
5071 }
5072
5073 #[inline]
5078 pub fn process_tx_queue(&self) -> TelemetryResult<()> {
5079 self.process_tx_queue_with_timeout(0)
5080 }
5081
5082 #[inline]
5084 pub fn process_all_queues(&self) -> TelemetryResult<()> {
5085 self.process_all_queues_with_timeout(0)
5086 }
5087
5088 pub fn process_tx_queue_with_timeout(&self, timeout_ms: u32) -> TelemetryResult<()> {
5093 if self.side_tx_active() {
5094 return Ok(());
5095 }
5096 #[cfg(feature = "discovery")]
5097 {
5098 let _ = self.poll_discovery()?;
5099 }
5100 let start = self.clock.now_ms();
5101 loop {
5102 self.process_reliable_timeouts()?;
5103 if self.process_replay_queue_item()? {
5104 if timeout_ms != 0 && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5105 break;
5106 }
5107 continue;
5108 }
5109 let Some((src, dst, handler, opts, data)) = self.pop_ready_tx_item() else {
5110 break;
5111 };
5112 match self.send_tx_item(src, dst, handler, opts, data.clone()) {
5113 Ok(sent) => {
5114 if sent
5115 && timeout_ms != 0
5116 && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64
5117 {
5118 break;
5119 }
5120 }
5121 Err(e) if Self::is_side_tx_busy(&e) => {
5122 let priority = Self::relay_item_priority(&data)?;
5123 let mut st = self.state.lock();
5124 st.push_tx(RelayTxItem {
5125 src,
5126 dst,
5127 data,
5128 priority,
5129 })?;
5130 break;
5131 }
5132 Err(e) => return Err(e),
5133 }
5134 }
5135 Ok(())
5136 }
5137
5138 pub fn process_rx_queue_with_timeout(&self, timeout_ms: u32) -> TelemetryResult<()> {
5140 #[cfg(feature = "discovery")]
5141 {
5142 let _ = self.poll_discovery()?;
5143 }
5144 let start = self.clock.now_ms();
5145 loop {
5146 let item_opt = {
5147 let mut st = self.state.lock();
5148 st.rx_queue.pop_front()
5149 };
5150 let Some(item) = item_opt else { break };
5151 self.process_rx_queue_item(item)?;
5152
5153 if timeout_ms != 0 && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5154 break;
5155 }
5156 }
5157 Ok(())
5158 }
5159
5160 pub fn process_all_queues_with_timeout(&self, timeout_ms: u32) -> TelemetryResult<()> {
5165 if self.side_tx_active() {
5166 return Ok(());
5167 }
5168 #[cfg(feature = "discovery")]
5169 {
5170 let _ = self.poll_discovery()?;
5171 }
5172 let drain_fully = timeout_ms == 0;
5173 let start = if drain_fully { 0 } else { self.clock.now_ms() };
5174
5175 loop {
5176 let mut did_any = false;
5177 self.process_reliable_timeouts()?;
5178
5179 if let Some(item) = {
5181 let mut st = self.state.lock();
5182 st.rx_queue.pop_front()
5183 } {
5184 self.process_rx_queue_item(item)?;
5185 did_any = true;
5186 }
5187
5188 if !drain_fully && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5189 break;
5190 }
5191
5192 if self.process_replay_queue_item()? {
5193 did_any = true;
5194 }
5195
5196 let sent_one = if let Some((src, dst, handler, opts, data)) = self.pop_ready_tx_item() {
5198 self.send_tx_item(src, dst, handler, opts, data)?
5199 } else {
5200 false
5201 };
5202
5203 if sent_one {
5204 did_any = true;
5205 }
5206
5207 if !drain_fully && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5208 break;
5209 }
5210
5211 if !did_any {
5212 break;
5213 }
5214 }
5215
5216 Ok(())
5217 }
5218
5219 pub fn periodic(&self, timeout_ms: u32) -> TelemetryResult<()> {
5224 #[cfg(feature = "discovery")]
5225 {
5226 let _ = self.poll_discovery()?;
5227 }
5228
5229 self.process_all_queues_with_timeout(timeout_ms)
5230 }
5231}