1use crate::config::{
2 MAX_QUEUE_BUDGET, MAX_RECENT_RX_IDS, QUEUE_GROW_STEP, RECENT_RX_QUEUE_BYTES,
3 RELIABLE_MAX_END_TO_END_ACK_CACHE, RELIABLE_MAX_END_TO_END_PENDING, RELIABLE_MAX_PENDING,
4 RELIABLE_MAX_RETRIES, RELIABLE_MAX_RETURN_ROUTES, RELIABLE_RETRANSMIT_MS, STARTING_QUEUE_SIZE,
5};
6use crate::diagnostics::{
7 AdaptiveLinkStats, DiscoveryRuntimeStats, QueueRuntimeStats, ReliableRuntimeStats,
8 RouteModeStats, RouteOverrideStats, RoutePriorityStats, RouteWeightStats, RuntimeSideStats,
9 RuntimeStatsSnapshot, RuntimeTypeStats, TypedRouteOverrideStats,
10};
11#[cfg(feature = "discovery")]
12use crate::discovery::{
13 self, ClientStatsSnapshot, DISCOVERY_ROUTE_TTL_MS, DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS,
14 DISCOVERY_SLOW_LINK_PING_INTERVAL_MS, DiscoveryCadenceState,
15 TIMESYNC_SLOW_LINK_MIN_INTERVAL_MS, TopologyAnnouncerRoute, TopologyBoardNode,
16 TopologySideRoute, TopologySnapshot,
17};
18use crate::packet::{Packet, hash_bytes_u64};
19use crate::queue::{BoundedDeque, ByteCost};
20use crate::wire_format;
21use crate::{is_reliable_type, message_meta, message_priority, reliable_mode};
22use crate::{
23 router::{Clock, CompactTimestampOmissionPolicy, SideTransportProfile},
24 {
25 RouteSelectionMode, TelemetryError, TelemetryResult,
26 lock::{ReentryGate, ReentryGuard, RouterMutex},
27 },
28};
29use alloc::borrow::ToOwned;
30use alloc::boxed::Box;
31use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
32use alloc::string::{String, ToString};
33use alloc::{sync::Arc, vec, vec::Vec};
34use core::mem::size_of;
35use crc32fast::Hasher as Crc32Hasher;
36
/// Identifier of one relay side; it keys the per-side maps held in `RelayInner`.
pub type RelaySideId = usize;
/// Three-byte magic value `b"SDT"` for side-transport frames.
// NOTE(review): presumably the leading bytes of every side-transport frame — confirm against the encoder.
const SIDE_TRANSPORT_MAGIC: &[u8; 3] = b"SDT";
// Side-transport frame kind codes (one byte each). The encoder/decoder is not part of
// this view, so the meaning of each kind is inferred from its name — TODO confirm.
const SIDE_TRANSPORT_KIND_FULL: u8 = 0x01;
const SIDE_TRANSPORT_KIND_COMPACT: u8 = 0x02;
const SIDE_TRANSPORT_KIND_CHUNK: u8 = 0x03;
const SIDE_TRANSPORT_KIND_COMPACT_DELTA: u8 = 0x04;
const SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP: u8 = 0x05;
// Side-transport flag bits. Each value is a distinct single bit of a `u8`; bits 0x02,
// 0x10 and 0x80 are not defined in this section.
const SIDE_TRANSPORT_FLAG_PAYLOAD_COMPRESSED: u8 = 0x01;
const SIDE_TRANSPORT_FLAG_WIRE_CONTRACT: u8 = 0x04;
const SIDE_TRANSPORT_FLAG_PACKET_NONCE: u8 = 0x08;
const SIDE_TRANSPORT_FLAG_ENDPOINT_BITMAP_PRESENT: u8 = 0x20;
const SIDE_TRANSPORT_FLAG_COMPACT_RELIABLE_HEADER: u8 = 0x40;
/// Bandwidth samples that are non-zero and at or below this value are recorded as
/// slow-link observations by `AdaptiveRouteStats::observe`.
const CONTROL_SLOW_LINK_CAPACITY_BPS: u64 = 512;
/// Fixed per-chunk framing overhead in bytes.
// NOTE(review): the terms look like magic (3) + kind (1) + a u32 id (4) + two u16 fields
// (2 + 2) + CRC32 — confirm against the chunk encoder.
const SIDE_TRANSPORT_CHUNK_OVERHEAD: usize = 3 + 1 + 4 + 2 + 2 + wire_format::CRC32_BYTES;
/// Number of bits in the endpoint bitmap: one per endpoint value from 0 through
/// `crate::MAX_VALUE_DATA_ENDPOINT` inclusive.
const SIDE_TRANSPORT_EP_BITMAP_BITS: usize = (crate::MAX_VALUE_DATA_ENDPOINT as usize) + 1;
/// Endpoint bitmap size in bytes (bit count rounded up to whole bytes).
const SIDE_TRANSPORT_EP_BITMAP_BYTES: usize = SIDE_TRANSPORT_EP_BITMAP_BITS.div_ceil(8);
/// Re-export of the router's IPv4-like compact header target size.
pub const IPV4_LIKE_COMPACT_HEADER_TARGET_BYTES: usize =
    crate::router::IPV4_LIKE_COMPACT_HEADER_TARGET_BYTES;
/// Re-export of the router's IPv6-like compact header target size.
pub const IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES: usize =
    crate::router::IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES;
/// Re-export of the router's default side-transport template limit; used as the default
/// for `RelaySideOptions::max_side_transport_templates`.
pub const DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT: usize =
    crate::router::DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT;
/// Thread-safe callback signature that is handed a [`Packet`] by reference.
type PacketHandlerFn = dyn Fn(&Packet) -> TelemetryResult<()> + Send + Sync + 'static;

/// Thread-safe callback signature that is handed a byte slice.
// NOTE(review): presumably the serialized frame for the side — confirm against callers.
type PackedHandlerFn = dyn Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static;
65
/// Transmit callback attached to a relay side, in one of two shapes.
///
/// Both variants hold the callback in an `Arc`, so cloning is a reference-count bump.
#[derive(Clone)]
pub enum RelayTxHandlerFn {
    /// Callback that takes a byte slice.
    Packed(Arc<PackedHandlerFn>),
    /// Callback that takes a [`Packet`] reference.
    Packet(Arc<PacketHandlerFn>),
}
72
/// Per-side configuration for a relay side.
///
/// The type is `Copy`; the `with_*` builder methods consume a value and return the
/// adjusted copy.
#[derive(Clone, Copy, Debug)]
pub struct RelaySideOptions {
    /// When set, `link_capabilities` advertises the reliability capability for this side.
    /// Defaults to `false`.
    pub reliable_enabled: bool,
    /// Defaults to `false`.
    // NOTE(review): name suggests link-local traffic handling — semantics not visible here, confirm.
    pub link_local_enabled: bool,
    /// Defaults to `true`.
    // NOTE(review): presumably whether packets may be received from this side — confirm.
    pub ingress_enabled: bool,
    /// Defaults to `true`.
    // NOTE(review): presumably whether packets may be sent to this side — confirm.
    pub egress_enabled: bool,
    /// Turned on by every `with_*` builder. Together with `max_frame_bytes` it decides
    /// whether `effective_transport_profile` reports the canonical profile.
    pub header_template_enabled: bool,
    /// Frame size setting; defaults to `0`. A non-zero value makes `link_capabilities`
    /// advertise chunking.
    // NOTE(review): `0` presumably means "no frame size limit" — confirm.
    pub max_frame_bytes: usize,
    /// Target size for compact headers; defaults to `0` and is set by the
    /// IPv4-like / IPv6-like builders.
    pub compact_header_target_bytes: usize,
    /// Template limit for this side; defaults to `DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT`.
    pub max_side_transport_templates: usize,
    /// Side-wide switch for omitting unchanged compact timestamps; defaults to `false`.
    pub omit_unchanged_compact_timestamps: bool,
    /// Per-data-type timestamp omission policy; starts as
    /// `CompactTimestampOmissionPolicy::none()`.
    pub compact_timestamp_omission_types: CompactTimestampOmissionPolicy,
    /// Configured transport profile; see `effective_transport_profile` for the value
    /// that is actually reported.
    pub side_transport_profile: SideTransportProfile,
}
107
108impl Default for RelaySideOptions {
109 fn default() -> Self {
110 Self {
111 reliable_enabled: false,
112 link_local_enabled: false,
113 ingress_enabled: true,
114 egress_enabled: true,
115 header_template_enabled: false,
116 max_frame_bytes: 0,
117 compact_header_target_bytes: 0,
118 max_side_transport_templates: DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT,
119 omit_unchanged_compact_timestamps: false,
120 compact_timestamp_omission_types: CompactTimestampOmissionPolicy::none(),
121 side_transport_profile: SideTransportProfile::Canonical,
122 }
123 }
124}
125
126impl RelaySideOptions {
127 #[inline]
132 pub fn with_small_packet_transport(mut self, max_frame_bytes: usize) -> Self {
133 self.header_template_enabled = true;
134 self.max_frame_bytes = max_frame_bytes;
135 self.compact_header_target_bytes = IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES;
136 self.side_transport_profile = SideTransportProfile::Ipv6Like;
137 self
138 }
139
140 #[inline]
141 pub fn with_ipv4_like_compact_header_target(mut self) -> Self {
142 self.header_template_enabled = true;
143 self.compact_header_target_bytes = IPV4_LIKE_COMPACT_HEADER_TARGET_BYTES;
144 self.omit_unchanged_compact_timestamps = true;
145 self.side_transport_profile = SideTransportProfile::Ipv4Like;
146 self
147 }
148
149 #[inline]
150 pub fn with_ipv6_like_compact_header_target(mut self) -> Self {
151 self.header_template_enabled = true;
152 self.compact_header_target_bytes = IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES;
153 self.side_transport_profile = SideTransportProfile::Ipv6Like;
154 self
155 }
156
157 #[inline]
158 pub fn with_template_transport(mut self) -> Self {
159 self.header_template_enabled = true;
160 self.side_transport_profile = SideTransportProfile::Template;
161 self
162 }
163
164 #[inline]
165 pub fn with_omitted_unchanged_compact_timestamps(mut self) -> Self {
166 self.header_template_enabled = true;
167 self.omit_unchanged_compact_timestamps = true;
168 self
169 }
170
171 #[inline]
172 pub fn with_omitted_unchanged_compact_timestamps_for_type(
173 mut self,
174 ty: crate::DataType,
175 ) -> Self {
176 self.header_template_enabled = true;
177 self.compact_timestamp_omission_types.insert(ty);
178 self
179 }
180
181 #[inline]
182 pub fn effective_transport_profile(self) -> SideTransportProfile {
183 if !self.header_template_enabled && self.max_frame_bytes == 0 {
184 SideTransportProfile::Canonical
185 } else if self.side_transport_profile == SideTransportProfile::Canonical {
186 SideTransportProfile::Template
187 } else {
188 self.side_transport_profile
189 }
190 }
191
192 #[cfg(feature = "discovery")]
193 #[inline]
194 pub fn link_capabilities(self) -> discovery::LinkCapabilities {
195 let mut flags = discovery::LINK_CAPABILITY_END_TO_END_RELIABILITY;
196 if self.header_template_enabled {
197 flags |= discovery::LINK_CAPABILITY_HEADER_TEMPLATES;
198 }
199 if self.max_frame_bytes != 0 {
200 flags |= discovery::LINK_CAPABILITY_CHUNKING;
201 }
202 if self.reliable_enabled {
203 flags |= discovery::LINK_CAPABILITY_RELIABILITY;
204 }
205 if self.omit_unchanged_compact_timestamps
206 || !self.compact_timestamp_omission_types.is_empty()
207 {
208 flags |= discovery::LINK_CAPABILITY_OMIT_UNCHANGED_TIMESTAMPS;
209 }
210 #[cfg(feature = "cryptography")]
211 {
212 flags |= discovery::LINK_CAPABILITY_CRYPTO;
213 }
214 discovery::LinkCapabilities {
215 version: 1,
216 flags,
217 profile: self.effective_transport_profile().discovery_code(),
218 max_frame_bytes: self.max_frame_bytes.min(u32::MAX as usize) as u32,
219 compact_header_target_bytes: self.compact_header_target_bytes.min(u32::MAX as usize)
220 as u32,
221 max_side_transport_templates: self.max_side_transport_templates.min(u32::MAX as usize)
222 as u32,
223 }
224 }
225}
226
/// One side registered with the relay: a static name, a transmit callback and the
/// options that configure it.
#[derive(Clone)]
pub struct RelaySide {
    /// Static name of the side.
    pub name: &'static str,
    /// Callback used to transmit on this side.
    pub tx_handler: RelayTxHandlerFn,
    /// Configuration for this side.
    pub opts: RelaySideOptions,
}
234
/// Payload carried through the relay queues, either as shared bytes or as a shared
/// [`Packet`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RelayItem {
    /// Shared byte buffer; `Relay::relay_item_priority` reads its envelope with
    /// `wire_format::peek_envelope`.
    Packed(Arc<[u8]>),
    /// Shared packet value.
    Packet(Arc<Packet>),
}
240
/// Entry of the relay receive queue.
#[derive(Clone, Debug, PartialEq, Eq)]
struct RelayRxItem {
    /// Side recorded as the source of the item.
    src: RelaySideId,
    /// The queued payload.
    data: RelayItem,
    /// Priority passed to `push_back_prioritized` when the item is queued.
    priority: u8,
}
248
249impl ByteCost for RelayRxItem {
250 fn byte_cost(&self) -> usize {
251 match &self.data {
252 RelayItem::Packed(bytes) => bytes.len(),
253 RelayItem::Packet(pkt) => pkt.byte_cost(),
254 }
255 }
256}
257
/// Entry of the relay transmit queue.
#[derive(Clone, Debug, PartialEq, Eq)]
struct RelayTxItem {
    /// Source side, if any.
    // NOTE(review): `None` presumably marks locally originated traffic — confirm.
    src: Option<RelaySideId>,
    /// Side the item is addressed to.
    dst: RelaySideId,
    /// The queued payload.
    data: RelayItem,
    /// Priority passed to `push_back_prioritized` when the item is queued.
    priority: u8,
}
266
/// Entry of the replay queue: bytes to be sent again to one side.
///
/// `Relay::process_replay_queue_item` parses `bytes` with `wire_format::peek_frame_info`
/// and only resends frames that carry a reliable header.
#[derive(Clone, Debug, PartialEq, Eq)]
struct RelayReplayItem {
    /// Side the frame is resent to.
    dst: RelaySideId,
    /// The frame bytes.
    bytes: Arc<[u8]>,
    /// Priority passed to `push_back_prioritized` when the item is queued.
    priority: u8,
}
273
274impl ByteCost for RelayTxItem {
275 fn byte_cost(&self) -> usize {
276 match &self.data {
277 RelayItem::Packed(bytes) => bytes.len(),
278 RelayItem::Packet(pkt) => pkt.byte_cost(),
279 }
280 }
281}
282
283impl ByteCost for RelayReplayItem {
284 fn byte_cost(&self) -> usize {
285 self.bytes.len()
286 }
287}
288
/// Sender-side reliable delivery state; `RelayInner::reliable_tx` keeps one per
/// `(RelaySideId, u32)` key.
#[derive(Debug, Clone)]
struct ReliableTxState {
    /// Sequence counter.
    // NOTE(review): presumably the sequence number assigned to the next outgoing frame — confirm.
    next_seq: u32,
    /// Sequence numbers in a queue.
    // NOTE(review): presumably the order in which entries of `sent` were transmitted — confirm.
    sent_order: VecDeque<u32>,
    /// Tracked frames keyed by sequence number; the replay path skips frames whose
    /// sequence number is no longer present here.
    sent: BTreeMap<u32, ReliableSent>,
}
297
/// Bookkeeping for one tracked reliable frame.
// NOTE(review): field meanings below are inferred from names; the code that updates them
// is outside this view — confirm.
#[derive(Debug, Clone)]
struct ReliableSent {
    /// The frame bytes.
    bytes: Arc<[u8]>,
    /// Millisecond timestamp, presumably of the most recent transmission.
    last_send_ms: u64,
    /// Retry counter, presumably the number of retransmissions so far.
    retries: u32,
    /// Presumably set while a retransmission is waiting in a queue.
    queued: bool,
    /// Presumably set once a partial acknowledgement has been seen for this frame.
    partial_acked: bool,
}
306
/// Receiver-side reliable delivery state; `RelayInner::reliable_rx` keeps one per
/// `(RelaySideId, u32)` key.
#[derive(Debug, Clone)]
struct ReliableRxState {
    /// Sequence counter; `RelayInner::buffer_reliable_rx` creates new states with `1`.
    // NOTE(review): presumably the next in-order sequence number awaited — confirm.
    expected_seq: u32,
    /// Frames held by sequence number; their bytes count toward the shared queue budget.
    buffered: BTreeMap<u32, Arc<[u8]>>,
}
312
/// Value stored in `RelayInner::reliable_return_routes` (keyed by a `u64`).
// NOTE(review): presumably remembers which side an acknowledgement should be routed back
// to — confirm against the code that fills the map.
#[derive(Debug, Clone)]
struct ReliableReturnRouteState {
    /// The side associated with the route.
    side: RelaySideId,
}
317
/// Discovery information held for one announcer; stored in
/// `DiscoverySideState::announcers` keyed by a `String`.
#[cfg(feature = "discovery")]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct DiscoverySenderState {
    /// Endpoints listed for this announcer.
    reachable: Vec<crate::DataEndpoint>,
    /// Time-sync source names listed for this announcer.
    reachable_timesync_sources: Vec<String>,
    /// Topology board entries listed for this announcer.
    topology_boards: Vec<TopologyBoardNode>,
    /// Millisecond timestamp.
    // NOTE(review): presumably when this announcer was last heard from — confirm.
    last_seen_ms: u64,
}
326
327#[inline]
328fn is_internal_control_type(ty: crate::DataType) -> bool {
329 if matches!(
330 ty,
331 crate::DataType::ReliableAck
332 | crate::DataType::ReliablePartialAck
333 | crate::DataType::ReliablePacketRequest
334 | crate::DataType::P2pMessage
335 ) {
336 return true;
337 }
338
339 #[cfg(feature = "timesync")]
340 if matches!(
341 ty,
342 crate::DataType::TimeSyncAnnounce
343 | crate::DataType::TimeSyncRequest
344 | crate::DataType::TimeSyncResponse
345 ) {
346 return true;
347 }
348
349 #[cfg(feature = "discovery")]
350 if discovery::is_discovery_type(ty) {
351 return true;
352 }
353
354 let _ = ty;
355 false
356}
357
/// Discovery information held for one relay side; stored in
/// `RelayInner::discovery_routes`.
#[cfg(feature = "discovery")]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct DiscoverySideState {
    /// Endpoints listed for this side.
    reachable: Vec<crate::DataEndpoint>,
    /// Time-sync source names listed for this side.
    reachable_timesync_sources: Vec<String>,
    /// Millisecond timestamp; when the queue budget is exceeded, the route with the
    /// smallest value is evicted first.
    last_seen_ms: u64,
    /// Per-announcer state keyed by a `String`.
    // NOTE(review): the key is presumably the announcer's sender id — confirm.
    announcers: BTreeMap<String, DiscoverySenderState>,
}
366
/// Reassembly buffer for one chunked side-transport message.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct SideChunkAssembly {
    /// Chunk count for the message.
    // NOTE(review): presumably the total announced by the sender — confirm.
    total: u16,
    /// Chunk payloads collected so far, keyed by a `u16` (presumably the chunk index).
    received: BTreeMap<u16, Arc<[u8]>>,
}

/// Per-side state for the side-transport layer: header templates in both directions,
/// last timestamps per template id, and chunk reassembly buffers.
#[derive(Clone, Debug, Default)]
struct SideTransportState {
    /// Transmit side: template hash -> template id.
    tx_template_ids: BTreeMap<u64, u32>,
    /// Transmit side: template hash -> template.
    tx_templates: BTreeMap<u64, SideHeaderTemplate>,
    /// Transmit side: template id -> last timestamp.
    tx_last_timestamps: BTreeMap<u32, u64>,
    /// Receive side: template hash -> template.
    rx_templates: BTreeMap<u64, SideHeaderTemplate>,
    /// Receive side: template id -> template.
    rx_templates_by_id: BTreeMap<u32, SideHeaderTemplate>,
    /// Receive side: template id -> last timestamp.
    rx_last_timestamps: BTreeMap<u32, u64>,
    /// Receive side: chunk reassembly buffers keyed by a `u32` (presumably the chunk id).
    rx_chunks: BTreeMap<u32, SideChunkAssembly>,
    /// Counter for chunk ids.
    next_chunk_id: u32,
    /// Counter for template ids.
    next_template_id: u32,
}

impl SideTransportState {
    /// Number of templates registered for transmission.
    fn tx_template_count(&self) -> usize {
        self.tx_template_ids.len()
    }

    /// Number of template ids known on the receive side.
    fn rx_template_count(&self) -> usize {
        self.rx_templates_by_id.len()
    }

    /// Registers `template` under `template_id` for transmission.
    ///
    /// With `max_templates == 0` nothing is stored and `false` is returned. When the
    /// table is full and the hash is new, the entry with the smallest hash is evicted
    /// together with its last-timestamp record.
    ///
    /// Returns `true` only when an entry was evicted to make room.
    fn insert_tx_template(
        &mut self,
        template: SideHeaderTemplate,
        template_id: u32,
        max_templates: usize,
    ) -> bool {
        if max_templates == 0 {
            return false;
        }
        let mut evicted = false;
        let needs_room = self.tx_template_ids.len() >= max_templates
            && !self.tx_template_ids.contains_key(&template.hash);
        if needs_room {
            if let Some(old_hash) = self.tx_template_ids.keys().next().copied() {
                if let Some(old_id) = self.tx_template_ids.remove(&old_hash) {
                    self.tx_last_timestamps.remove(&old_id);
                }
                self.tx_templates.remove(&old_hash);
                evicted = true;
            }
        }
        if let Some(previous_id) = self.tx_template_ids.insert(template.hash, template_id) {
            // The hash was re-registered under a different id: the old id's timestamp
            // record is unreachable unless some other hash still maps to that id.
            let still_in_use = self.tx_template_ids.values().any(|id| *id == previous_id);
            if previous_id != template_id && !still_in_use {
                self.tx_last_timestamps.remove(&previous_id);
            }
        }
        self.tx_templates.insert(template.hash, template);
        evicted
    }

    /// Stores `template` under `template_id` on the receive side.
    ///
    /// With `max_templates == 0` nothing is stored and `false` is returned. When the
    /// table is full and the id is new, the entry with the smallest id is evicted
    /// together with its hash entry and last-timestamp record.
    ///
    /// Redefining an existing id with a different hash also drops the previous hash
    /// entry, so `rx_templates` stays bounded by the ids that are still live.
    ///
    /// Returns `true` only when an entry was evicted to make room.
    fn insert_rx_template(
        &mut self,
        template_id: u32,
        template: SideHeaderTemplate,
        max_templates: usize,
    ) -> bool {
        if max_templates == 0 {
            return false;
        }
        let mut evicted = false;
        let needs_room = self.rx_templates_by_id.len() >= max_templates
            && !self.rx_templates_by_id.contains_key(&template_id);
        if needs_room {
            if let Some(old_id) = self.rx_templates_by_id.keys().next().copied() {
                if let Some(old_template) = self.rx_templates_by_id.remove(&old_id) {
                    self.rx_templates.remove(&old_template.hash);
                    self.rx_last_timestamps.remove(&old_id);
                    evicted = true;
                }
            }
        }
        let new_hash = template.hash;
        if let Some(replaced) = self
            .rx_templates_by_id
            .insert(template_id, template.clone())
        {
            // Keep the old hash entry only while another id still refers to it.
            let hash_still_used = self
                .rx_templates_by_id
                .values()
                .any(|known| known.hash == replaced.hash);
            if replaced.hash != new_hash && !hash_still_used {
                self.rx_templates.remove(&replaced.hash);
            }
        }
        self.rx_templates.insert(new_hash, template);
        evicted
    }
}

/// Header template shared between both ends of a side-transport link.
// NOTE(review): the encoder/decoder that fills these fields is outside this view; the
// field notes are inferred from names and types — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
struct SideHeaderTemplate {
    /// Hash identifying the template; used as the key of the hash-indexed tables.
    hash: u64,
    /// Flags byte, presumably the header flags the template was built from.
    base_flags: u8,
    /// First stored header fragment.
    prefix: Arc<[u8]>,
    /// Second stored header fragment.
    between: Arc<[u8]>,
    /// Optional flags byte, presumably of a reliable header when one is present.
    reliable_flags: Option<u8>,
    /// Presumably whether the reliable header uses its compact form.
    reliable_compact: bool,
}
455
/// Tuple produced when a header template is extracted from a frame.
// NOTE(review): the producer is outside this view, so the element notes below are
// inferred from the types only — confirm before relying on them.
type SideTemplateExtract<'a> = (
    // The extracted template.
    SideHeaderTemplate,
    // Data type of the frame.
    crate::DataType,
    // A flags or kind byte.
    u8,
    // Two 64-bit values (possibly a timestamp and a nonce/hash).
    u64,
    u64,
    // A 16-bit value (possibly a length).
    u16,
    // Optional pair of 32-bit values (possibly reliable-header fields).
    Option<(u32, u32)>,
    // Slice borrowed from the input, presumably the payload.
    &'a [u8],
);
466
/// How a compact frame represents its timestamp.
// NOTE(review): variant meanings are inferred from names; the encoder is outside this view.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SideCompactTimestampMode {
    /// Presumably the full timestamp value is carried.
    Absolute,
    /// Presumably a difference from the previous timestamp is carried.
    Delta,
    /// Presumably no timestamp is carried.
    Omitted,
}
473
/// Bandwidth and usage estimates for one side, updated by `observe`.
#[derive(Debug, Clone, Default)]
struct AdaptiveRouteStats {
    /// Smoothed bandwidth estimate; `0` until the first sample is observed.
    estimated_bandwidth_bps: u64,
    /// Largest bandwidth sample observed so far.
    peak_bandwidth_bps: u64,
    /// `now_ms` of the most recent observation.
    last_observed_ms: u64,
    /// `now_ms` of the most recent non-zero sample at or below
    /// `CONTROL_SLOW_LINK_CAPACITY_BPS`.
    last_slow_observed_ms: u64,
    /// Number of observations (saturating).
    sample_count: u64,
    /// Start of the current usage window; `0` means no window has been started.
    window_started_ms: u64,
    /// Bytes accumulated in the current usage window.
    window_bytes: u64,
    /// Largest usage rate computed by `observe`.
    peak_usage_bps: u64,
}
485
/// Per-side discovery throttle timestamps, stored in
/// `RelayInner::discovery_side_throttle`.
// NOTE(review): field meanings are inferred from names — confirm against the scheduler.
#[cfg(feature = "discovery")]
#[derive(Debug, Clone, Default)]
struct DiscoverySideThrottleState {
    /// Presumably the earliest time (ms) the next ping advertisement may be sent.
    next_ping_ms: u64,
    /// Presumably the earliest time (ms) the next full advertisement may be sent.
    next_full_ms: u64,
}
492
/// Per-side time-sync throttle timestamp, stored in `RelayInner::timesync_side_throttle`.
#[cfg(all(feature = "discovery", feature = "timesync"))]
#[derive(Debug, Clone, Default)]
struct TimeSyncSideThrottleState {
    /// Millisecond timestamp.
    // NOTE(review): presumably the earliest time time-sync traffic may be sent again — confirm.
    next_allowed_ms: u64,
}
498
/// Level of detail of a discovery advertisement.
// NOTE(review): variant meanings are inferred from names — confirm against the sender.
#[cfg(feature = "discovery")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DiscoveryAdvertiseLevel {
    /// Presumably a minimal keep-alive style advertisement.
    MinimalPing,
    /// Presumably the complete advertisement.
    Full,
}
505
506impl AdaptiveRouteStats {
507 #[inline]
508 fn observe(&mut self, bytes: usize, sample_bps: u64, now_ms: u64) {
509 self.estimated_bandwidth_bps = if self.estimated_bandwidth_bps == 0 {
510 sample_bps
511 } else if sample_bps >= self.estimated_bandwidth_bps {
512 self.estimated_bandwidth_bps
513 .saturating_mul(3)
514 .saturating_add(sample_bps.saturating_mul(5))
515 / 8
516 } else {
517 self.estimated_bandwidth_bps
518 .saturating_mul(7)
519 .saturating_add(sample_bps)
520 / 8
521 };
522 self.peak_bandwidth_bps = self.peak_bandwidth_bps.max(sample_bps);
523 self.last_observed_ms = now_ms;
524 if sample_bps > 0 && sample_bps <= CONTROL_SLOW_LINK_CAPACITY_BPS {
525 self.last_slow_observed_ms = now_ms;
526 }
527 self.sample_count = self.sample_count.saturating_add(1);
528 if self.window_started_ms == 0 || now_ms.saturating_sub(self.window_started_ms) > 1_000 {
529 self.window_started_ms = now_ms;
530 self.window_bytes = 0;
531 }
532 self.window_bytes = self.window_bytes.saturating_add(bytes as u64);
533 self.peak_usage_bps = self.peak_usage_bps.max(self.current_usage_bps(now_ms));
534 }
535
536 #[inline]
537 fn current_usage_bps(&self, now_ms: u64) -> u64 {
538 if self.window_started_ms == 0 {
539 return 0;
540 }
541 let elapsed_ms = now_ms.saturating_sub(self.window_started_ms).max(1);
542 ((u128::from(self.window_bytes)).saturating_mul(1000) / u128::from(elapsed_ms))
543 .min(u128::from(u64::MAX)) as u64
544 }
545
546 #[inline]
547 fn available_headroom_bps(&self, now_ms: u64) -> u64 {
548 let capacity = self
549 .estimated_bandwidth_bps
550 .max(self.peak_bandwidth_bps)
551 .max(1);
552 capacity.saturating_sub(self.current_usage_bps(now_ms))
553 }
554
555 #[inline]
556 fn weight(&self, now_ms: u64) -> u64 {
557 self.available_headroom_bps(now_ms).max(1)
558 }
559
560 #[inline]
561 fn snapshot(&self, now_ms: u64, auto_balancing_enabled: bool) -> AdaptiveLinkStats {
562 let current_usage_bps = self.current_usage_bps(now_ms);
563 let estimated_capacity_bps = self.estimated_bandwidth_bps.max(1);
564 let peak_capacity_bps = self.peak_bandwidth_bps.max(estimated_capacity_bps);
565 let available_headroom_bps = peak_capacity_bps.saturating_sub(current_usage_bps);
566 AdaptiveLinkStats {
567 auto_balancing_enabled,
568 estimated_capacity_bps,
569 peak_capacity_bps,
570 current_usage_bps,
571 peak_usage_bps: self.peak_usage_bps.max(current_usage_bps),
572 available_headroom_bps,
573 effective_weight: available_headroom_bps.max(1),
574 last_observed_ms: self.last_observed_ms,
575 sample_count: self.sample_count,
576 }
577 }
578}
579
/// Saturating traffic counters for one data type on one side; stored in
/// `SideRuntimeStatsInner::data_types`.
#[derive(Debug, Clone, Default)]
struct TypeRuntimeStatsInner {
    /// Packets transmitted.
    tx_packets: u64,
    /// Bytes transmitted.
    tx_bytes: u64,
    /// Packets received.
    rx_packets: u64,
    /// Bytes received.
    rx_bytes: u64,
    /// Relayed packets transmitted; the `note_tx` visible here bumps it together with `tx_packets`.
    relayed_tx_packets: u64,
    /// Relayed bytes transmitted.
    relayed_tx_bytes: u64,
    /// Relayed packets received; the `note_rx` visible here bumps it together with `rx_packets`.
    relayed_rx_packets: u64,
    /// Relayed bytes received.
    relayed_rx_bytes: u64,
    /// Retries reported with transmissions and transmit failures.
    tx_retries: u64,
    /// Transmit handler failures.
    handler_failures: u64,
}
593
/// Saturating runtime counters for one relay side, plus a per-data-type breakdown.
#[derive(Debug, Clone, Default)]
struct SideRuntimeStatsInner {
    /// Packets transmitted.
    tx_packets: u64,
    /// Bytes transmitted.
    tx_bytes: u64,
    /// Packets received.
    rx_packets: u64,
    /// Bytes received.
    rx_bytes: u64,
    /// Relayed packets transmitted; bumped together with `tx_packets` by `note_tx`.
    relayed_tx_packets: u64,
    /// Relayed bytes transmitted.
    relayed_tx_bytes: u64,
    /// Relayed packets received; bumped together with `rx_packets` by `note_rx`.
    relayed_rx_packets: u64,
    /// Relayed bytes received.
    relayed_rx_bytes: u64,
    /// Retries reported with transmissions and transmit failures.
    tx_retries: u64,
    /// Transmit handler failures.
    tx_handler_failures: u64,
    /// Handler retries; updated with the same amounts as `tx_retries` in this section.
    total_handler_retries: u64,
    /// Full side-transport frames recorded.
    side_transport_full_frames: u64,
    /// Compact side-transport frames recorded.
    side_transport_compact_frames: u64,
    /// Compact frames that used a timestamp delta.
    side_transport_compact_delta_frames: u64,
    /// Compact frames that omitted the timestamp.
    side_transport_compact_omitted_timestamp_frames: u64,
    /// Chunk frames recorded.
    side_transport_chunk_frames: u64,
    /// Sum of the `raw_bytes` values recorded.
    side_transport_raw_bytes: u64,
    /// Sum of the `wire_bytes` values recorded.
    side_transport_wire_bytes: u64,
    /// Sum of `raw_bytes - wire_bytes` over frames where raw exceeded wire.
    side_transport_bytes_saved: u64,
    /// Smallest compact overhead recorded; `None` until a compact frame is recorded.
    side_transport_min_compact_overhead_bytes: Option<usize>,
    /// Largest compact overhead recorded; `None` until a compact frame is recorded.
    side_transport_max_compact_overhead_bytes: Option<usize>,
    /// Times a compact header target was missed.
    side_transport_compact_target_misses: u64,
    /// Template evictions recorded.
    side_transport_template_evictions: u64,
    /// Per-type counters keyed by `DataType::as_u32()`.
    data_types: BTreeMap<u32, TypeRuntimeStatsInner>,
}
621
622impl SideRuntimeStatsInner {
623 fn type_stats_mut(&mut self, ty: crate::DataType) -> &mut TypeRuntimeStatsInner {
624 self.data_types.entry(ty.as_u32()).or_default()
625 }
626
627 fn note_tx(&mut self, ty: crate::DataType, bytes: usize, retries: usize) {
628 self.tx_packets = self.tx_packets.saturating_add(1);
629 self.tx_bytes = self.tx_bytes.saturating_add(bytes as u64);
630 self.relayed_tx_packets = self.relayed_tx_packets.saturating_add(1);
631 self.relayed_tx_bytes = self.relayed_tx_bytes.saturating_add(bytes as u64);
632 self.tx_retries = self.tx_retries.saturating_add(retries as u64);
633 self.total_handler_retries = self.total_handler_retries.saturating_add(retries as u64);
634 let stats = self.type_stats_mut(ty);
635 stats.tx_packets = stats.tx_packets.saturating_add(1);
636 stats.tx_bytes = stats.tx_bytes.saturating_add(bytes as u64);
637 stats.relayed_tx_packets = stats.relayed_tx_packets.saturating_add(1);
638 stats.relayed_tx_bytes = stats.relayed_tx_bytes.saturating_add(bytes as u64);
639 stats.tx_retries = stats.tx_retries.saturating_add(retries as u64);
640 }
641
642 fn note_rx(&mut self, ty: crate::DataType, bytes: usize) {
643 self.rx_packets = self.rx_packets.saturating_add(1);
644 self.rx_bytes = self.rx_bytes.saturating_add(bytes as u64);
645 self.relayed_rx_packets = self.relayed_rx_packets.saturating_add(1);
646 self.relayed_rx_bytes = self.relayed_rx_bytes.saturating_add(bytes as u64);
647 let stats = self.type_stats_mut(ty);
648 stats.rx_packets = stats.rx_packets.saturating_add(1);
649 stats.rx_bytes = stats.rx_bytes.saturating_add(bytes as u64);
650 stats.relayed_rx_packets = stats.relayed_rx_packets.saturating_add(1);
651 stats.relayed_rx_bytes = stats.relayed_rx_bytes.saturating_add(bytes as u64);
652 }
653
654 fn note_tx_failure(&mut self, ty: crate::DataType, retries: usize) {
655 self.tx_handler_failures = self.tx_handler_failures.saturating_add(1);
656 self.tx_retries = self.tx_retries.saturating_add(retries as u64);
657 self.total_handler_retries = self.total_handler_retries.saturating_add(retries as u64);
658 let stats = self.type_stats_mut(ty);
659 stats.handler_failures = stats.handler_failures.saturating_add(1);
660 stats.tx_retries = stats.tx_retries.saturating_add(retries as u64);
661 }
662
663 fn note_side_transport_full(&mut self, raw_bytes: usize, wire_bytes: usize) {
664 self.side_transport_full_frames = self.side_transport_full_frames.saturating_add(1);
665 self.note_side_transport_bytes(raw_bytes, wire_bytes);
666 }
667
668 fn note_side_transport_compact(
669 &mut self,
670 raw_bytes: usize,
671 wire_bytes: usize,
672 compact_overhead_bytes: usize,
673 used_timestamp_delta: bool,
674 omitted_timestamp: bool,
675 ) {
676 self.side_transport_compact_frames = self.side_transport_compact_frames.saturating_add(1);
677 if used_timestamp_delta {
678 self.side_transport_compact_delta_frames =
679 self.side_transport_compact_delta_frames.saturating_add(1);
680 }
681 if omitted_timestamp {
682 self.side_transport_compact_omitted_timestamp_frames = self
683 .side_transport_compact_omitted_timestamp_frames
684 .saturating_add(1);
685 }
686 self.note_side_transport_bytes(raw_bytes, wire_bytes);
687 self.side_transport_min_compact_overhead_bytes = Some(
688 self.side_transport_min_compact_overhead_bytes
689 .map_or(compact_overhead_bytes, |v| v.min(compact_overhead_bytes)),
690 );
691 self.side_transport_max_compact_overhead_bytes = Some(
692 self.side_transport_max_compact_overhead_bytes
693 .map_or(compact_overhead_bytes, |v| v.max(compact_overhead_bytes)),
694 );
695 }
696
697 fn note_side_transport_chunks(&mut self, chunks: usize) {
698 self.side_transport_chunk_frames = self
699 .side_transport_chunk_frames
700 .saturating_add(chunks as u64);
701 }
702
703 fn note_side_transport_bytes(&mut self, raw_bytes: usize, wire_bytes: usize) {
704 self.side_transport_raw_bytes = self
705 .side_transport_raw_bytes
706 .saturating_add(raw_bytes as u64);
707 self.side_transport_wire_bytes = self
708 .side_transport_wire_bytes
709 .saturating_add(wire_bytes as u64);
710 if raw_bytes > wire_bytes {
711 self.side_transport_bytes_saved = self
712 .side_transport_bytes_saved
713 .saturating_add((raw_bytes - wire_bytes) as u64);
714 }
715 }
716
717 fn note_side_transport_compact_target_miss(&mut self) {
718 self.side_transport_compact_target_misses =
719 self.side_transport_compact_target_misses.saturating_add(1);
720 }
721
722 fn note_side_transport_template_eviction(&mut self) {
723 self.side_transport_template_evictions =
724 self.side_transport_template_evictions.saturating_add(1);
725 }
726}
727
/// Where a set of candidate routes came from.
// NOTE(review): variant meanings are inferred from names — confirm against route selection.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RouteSelectionOrigin {
    /// Presumably candidates obtained by flooding to every eligible side.
    Flood,
    /// Presumably candidates obtained from discovery information.
    Discovered,
}
733
/// Mutable relay state; `Relay` keeps it behind a `RouterMutex`.
///
/// The rx / tx / replay queues, the recent-id ring, the reliable receive buffers and
/// (with the `discovery` feature) the discovery routes are charged against one shared
/// byte budget, `MAX_QUEUE_BUDGET`.
struct RelayInner {
    /// Registered sides.
    // NOTE(review): presumably indexed by `RelaySideId`, with `None` for removed sides — confirm.
    sides: Vec<Option<RelaySide>>,
    /// Boolean overrides keyed by (optional source side, destination side).
    route_overrides: BTreeMap<(Option<RelaySideId>, RelaySideId), bool>,
    /// Boolean overrides keyed by (optional source side, `u32`, destination side).
    // NOTE(review): the `u32` is presumably a data type code — confirm.
    typed_route_overrides: BTreeMap<(Option<RelaySideId>, u32, RelaySideId), bool>,
    /// Route weights keyed by (optional source side, destination side).
    route_weights: BTreeMap<(Option<RelaySideId>, RelaySideId), u32>,
    /// Route priorities keyed by (optional source side, destination side).
    route_priorities: BTreeMap<(Option<RelaySideId>, RelaySideId), u32>,
    /// Route selection mode per optional source side.
    source_route_modes: BTreeMap<Option<RelaySideId>, RouteSelectionMode>,
    /// Selection cursor per optional source side.
    route_selection_cursors: BTreeMap<Option<RelaySideId>, u64>,
    /// Bandwidth / usage estimates per side.
    adaptive_route_stats: BTreeMap<RelaySideId, AdaptiveRouteStats>,
    /// Runtime counters per side.
    side_runtime_stats: BTreeMap<RelaySideId, SideRuntimeStatsInner>,
    /// Side-transport state per side.
    side_transport: BTreeMap<RelaySideId, SideTransportState>,
    /// Receive queue, ordered by item priority.
    rx_queue: BoundedDeque<RelayRxItem>,
    /// Transmit queue, ordered by item priority.
    tx_queue: BoundedDeque<RelayTxItem>,
    /// Replay queue, ordered by item priority.
    replay_queue: BoundedDeque<RelayReplayItem>,
    /// Ring of recent ids, capped at `MAX_RECENT_RX_IDS` entries by `push_recent_rx`.
    recent_rx: BoundedDeque<u64>,
    /// Sender-side reliable state keyed by (side, `u32`).
    reliable_tx: BTreeMap<(RelaySideId, u32), ReliableTxState>,
    /// Receiver-side reliable state keyed by (side, `u32`).
    reliable_rx: BTreeMap<(RelaySideId, u32), ReliableRxState>,
    /// Return routes keyed by a `u64`.
    reliable_return_routes: BTreeMap<u64, ReliableReturnRouteState>,
    /// Keys of `reliable_return_routes` in a queue, presumably in insertion order.
    reliable_return_route_order: VecDeque<u64>,
    /// Sets of `u64` values keyed by a `u64`.
    // NOTE(review): presumably packet id -> destinations that acknowledged it — confirm.
    end_to_end_acked_destinations: BTreeMap<u64, BTreeSet<u64>>,
    /// Keys of `end_to_end_acked_destinations` in a queue, presumably in insertion order.
    end_to_end_acked_destination_order: VecDeque<u64>,
    /// Relay-wide handler failure counter.
    total_handler_failures: u64,
    /// Relay-wide handler retry counter.
    total_handler_retries: u64,
    /// Discovery information per side.
    #[cfg(feature = "discovery")]
    discovery_routes: BTreeMap<RelaySideId, DiscoverySideState>,
    /// Discovery cadence state.
    #[cfg(feature = "discovery")]
    discovery_cadence: DiscoveryCadenceState,
    /// Discovery throttle timestamps per side.
    #[cfg(feature = "discovery")]
    discovery_side_throttle: BTreeMap<RelaySideId, DiscoverySideThrottleState>,
    /// Time-sync throttle timestamps per side.
    #[cfg(all(feature = "discovery", feature = "timesync"))]
    timesync_side_throttle: BTreeMap<RelaySideId, TimeSyncSideThrottleState>,
}
767
/// Names the stores that share the queue budget, so the budget code can choose which
/// one to evict from.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RelayQueueKind {
    /// `RelayInner::rx_queue`.
    Rx,
    /// `RelayInner::tx_queue`.
    Tx,
    /// `RelayInner::replay_queue`.
    Replay,
    /// `RelayInner::recent_rx`.
    Recent,
    /// The buffered frames held inside `RelayInner::reliable_rx`.
    ReliableRxBuffer,
    /// `RelayInner::discovery_routes`; only exists with the `discovery` feature.
    #[cfg(feature = "discovery")]
    Discovery,
}
778
779impl RelayInner {
780 #[cfg(feature = "discovery")]
781 fn topology_board_byte_cost(board: &TopologyBoardNode) -> usize {
782 board
783 .sender_id
784 .len()
785 .saturating_add(board.reachable_endpoints.len() * size_of::<crate::DataEndpoint>())
786 .saturating_add(
787 board
788 .reachable_timesync_sources
789 .iter()
790 .map(|s| s.len())
791 .sum::<usize>(),
792 )
793 .saturating_add(board.connections.iter().map(|s| s.len()).sum::<usize>())
794 }
795
796 #[cfg(feature = "discovery")]
797 fn discovery_sender_byte_cost(sender: &str, state: &DiscoverySenderState) -> usize {
798 sender
799 .len()
800 .saturating_add(state.reachable.len() * size_of::<crate::DataEndpoint>())
801 .saturating_add(
802 state
803 .reachable_timesync_sources
804 .iter()
805 .map(|s| s.len())
806 .sum::<usize>(),
807 )
808 .saturating_add(
809 state
810 .topology_boards
811 .iter()
812 .map(Self::topology_board_byte_cost)
813 .sum::<usize>(),
814 )
815 .saturating_add(size_of::<DiscoverySenderState>())
816 }
817
818 #[cfg(feature = "discovery")]
819 fn discovery_route_byte_cost(route: &DiscoverySideState) -> usize {
820 size_of::<DiscoverySideState>()
821 .saturating_add(route.reachable.len() * size_of::<crate::DataEndpoint>())
822 .saturating_add(
823 route
824 .reachable_timesync_sources
825 .iter()
826 .map(|s| s.len())
827 .sum::<usize>(),
828 )
829 .saturating_add(
830 route
831 .announcers
832 .iter()
833 .map(|(sender, state)| Self::discovery_sender_byte_cost(sender, state))
834 .sum::<usize>(),
835 )
836 }
837
838 #[cfg(feature = "discovery")]
839 fn discovery_bytes_used(&self) -> usize {
840 self.discovery_routes
841 .values()
842 .map(Self::discovery_route_byte_cost)
843 .sum()
844 }
845
846 #[inline]
847 fn reliable_rx_buffered_bytes(&self) -> usize {
848 self.reliable_rx
849 .values()
850 .flat_map(|state| state.buffered.values())
851 .map(|bytes| size_of::<Arc<[u8]>>() + bytes.len())
852 .sum()
853 }
854
855 #[inline]
856 fn shared_queue_bytes_used(&self) -> usize {
857 self.rx_queue
858 .bytes_used()
859 .saturating_add(self.tx_queue.bytes_used())
860 .saturating_add(self.replay_queue.bytes_used())
861 .saturating_add(self.recent_rx.max_bytes())
862 .saturating_add(self.reliable_rx_buffered_bytes())
863 .saturating_add(crate::config::schema_bytes_used())
864 .saturating_add({
865 #[cfg(feature = "discovery")]
866 {
867 self.discovery_bytes_used()
868 }
869 #[cfg(not(feature = "discovery"))]
870 {
871 0
872 }
873 })
874 }
875
876 fn reliable_rx_buffer_len(&self) -> usize {
877 self.reliable_rx
878 .values()
879 .map(|state| state.buffered.len())
880 .sum()
881 }
882
883 fn pop_reliable_rx_buffered(&mut self) -> Option<Arc<[u8]>> {
884 let key = self
885 .reliable_rx
886 .iter()
887 .find_map(|(key, state)| (!state.buffered.is_empty()).then_some(*key))?;
888 self.reliable_rx
889 .get_mut(&key)?
890 .buffered
891 .pop_first()
892 .map(|(_, v)| v)
893 }
894
895 fn pop_shared_queue_item(&mut self, preferred: RelayQueueKind) -> bool {
896 match preferred {
897 RelayQueueKind::Rx => self.rx_queue.pop_front().is_some(),
898 RelayQueueKind::Tx => self.tx_queue.pop_front().is_some(),
899 RelayQueueKind::Replay => self.replay_queue.pop_front().is_some(),
900 RelayQueueKind::Recent => self.recent_rx.pop_front().is_some(),
901 RelayQueueKind::ReliableRxBuffer => self.pop_reliable_rx_buffered().is_some(),
902 #[cfg(feature = "discovery")]
903 RelayQueueKind::Discovery => self.pop_discovery_route(),
904 }
905 }
906
907 #[cfg(feature = "discovery")]
908 fn pop_discovery_route(&mut self) -> bool {
909 let Some((&side, _)) = self
910 .discovery_routes
911 .iter()
912 .min_by_key(|(_, route)| route.last_seen_ms)
913 else {
914 return false;
915 };
916 self.discovery_routes.remove(&side);
917 Self::queue_budget_warning("topology route evicted because MAX_QUEUE_BUDGET is full");
918 true
919 }
920
921 fn largest_shared_queue(&self) -> Option<RelayQueueKind> {
922 let candidates = [
923 (
924 RelayQueueKind::Rx,
925 self.rx_queue.bytes_used(),
926 self.rx_queue.len(),
927 ),
928 (
929 RelayQueueKind::Tx,
930 self.tx_queue.bytes_used(),
931 self.tx_queue.len(),
932 ),
933 (
934 RelayQueueKind::Replay,
935 self.replay_queue.bytes_used(),
936 self.replay_queue.len(),
937 ),
938 (RelayQueueKind::Recent, 0, 0),
939 (
940 RelayQueueKind::ReliableRxBuffer,
941 self.reliable_rx_buffered_bytes(),
942 self.reliable_rx_buffer_len(),
943 ),
944 #[cfg(feature = "discovery")]
945 (
946 RelayQueueKind::Discovery,
947 self.discovery_bytes_used(),
948 self.discovery_routes.len(),
949 ),
950 ];
951 candidates
952 .into_iter()
953 .filter(|(_, bytes, len)| *bytes > 0 && *len > 0)
954 .max_by_key(|(kind, bytes, _)| {
955 (
956 *bytes,
957 if *kind == RelayQueueKind::ReliableRxBuffer {
958 0
959 } else {
960 1
961 },
962 )
963 })
964 .map(|(kind, _, _)| kind)
965 }
966
967 fn make_shared_queue_room(
968 &mut self,
969 incoming_cost: usize,
970 preferred: RelayQueueKind,
971 ) -> TelemetryResult<()> {
972 if incoming_cost > MAX_QUEUE_BUDGET {
973 return Err(TelemetryError::PacketTooLarge(
974 "Item exceeds maximum shared queue budget",
975 ));
976 }
977
978 while self.shared_queue_bytes_used().saturating_add(incoming_cost) > MAX_QUEUE_BUDGET {
979 let victim = self.largest_shared_queue().unwrap_or(preferred);
980 if victim == RelayQueueKind::Discovery {
981 Self::queue_budget_warning("topology data is using the largest queue budget share");
982 }
983 if !self.pop_shared_queue_item(victim) && !self.pop_shared_queue_item(preferred) {
984 return Err(TelemetryError::PacketTooLarge(
985 "Item exceeds maximum shared queue budget",
986 ));
987 }
988 }
989
990 Ok(())
991 }
992
993 #[inline]
994 fn queue_budget_warning(msg: &str) {
995 #[cfg(feature = "std")]
996 eprintln!("sedsnet queue budget warning: {msg}");
997 let _ = msg;
998 }
999
1000 #[cfg(feature = "discovery")]
1001 fn fit_discovery_budget(&mut self) {
1002 while self.shared_queue_bytes_used() > MAX_QUEUE_BUDGET {
1003 if !self.pop_discovery_route() {
1004 break;
1005 }
1006 }
1007 }
1008
1009 fn push_rx(&mut self, item: RelayRxItem) -> TelemetryResult<()> {
1010 self.make_shared_queue_room(item.byte_cost(), RelayQueueKind::Rx)?;
1011 self.rx_queue
1012 .push_back_prioritized(item, |queued| queued.priority)
1013 }
1014
1015 fn push_tx(&mut self, item: RelayTxItem) -> TelemetryResult<()> {
1016 self.make_shared_queue_room(item.byte_cost(), RelayQueueKind::Tx)?;
1017 self.tx_queue
1018 .push_back_prioritized(item, |queued| queued.priority)
1019 }
1020
1021 fn push_replay(&mut self, item: RelayReplayItem) -> TelemetryResult<()> {
1022 self.make_shared_queue_room(item.byte_cost(), RelayQueueKind::Replay)?;
1023 self.replay_queue
1024 .push_back_prioritized(item, |queued| queued.priority)
1025 }
1026
1027 fn push_recent_rx(&mut self, id: u64) -> TelemetryResult<()> {
1028 while self.recent_rx.len() >= MAX_RECENT_RX_IDS {
1029 let _ = self.recent_rx.pop_front();
1030 }
1031 self.make_shared_queue_room(0, RelayQueueKind::Recent)?;
1032 self.recent_rx.push_back(id)
1033 }
1034
1035 fn buffer_reliable_rx(
1036 &mut self,
1037 side: RelaySideId,
1038 ty: crate::DataType,
1039 seq: u32,
1040 bytes: Arc<[u8]>,
1041 ) -> TelemetryResult<()> {
1042 let key = Relay::reliable_key(side, ty);
1043 if self
1044 .reliable_rx
1045 .get(&key)
1046 .is_some_and(|state| state.buffered.contains_key(&seq))
1047 {
1048 return Ok(());
1049 }
1050 let cost = size_of::<Arc<[u8]>>() + bytes.len();
1051 self.make_shared_queue_room(cost, RelayQueueKind::ReliableRxBuffer)?;
1052 let rx_state = self
1053 .reliable_rx
1054 .entry(key)
1055 .or_insert_with(|| ReliableRxState {
1056 expected_seq: 1,
1057 buffered: BTreeMap::new(),
1058 });
1059 if rx_state.buffered.len() >= RELIABLE_MAX_PENDING {
1060 let _ = rx_state.buffered.pop_first();
1061 }
1062 rx_state.buffered.insert(seq, bytes);
1063 Ok(())
1064 }
1065}
1066
/// Relay handle: lock-protected routing state plus the clock used for timestamps.
1067pub struct Relay {
// Sender name for this relay; `new` starts it as "RELAY" and `set_sender` replaces it.
1072 sender: RouterMutex<Arc<str>>,
// All mutable relay state (sides, queues, reliable bookkeeping), guarded by one mutex.
1073 state: RouterMutex<RelayInner>,
// Gate taken through `try_enter_side_tx` before frames are handed to a side handler.
1074 side_tx_gate: ReentryGate,
// Source of `now_ms()` timestamps.
1075 clock: Box<dyn Clock + Send + Sync>,
1076}
1077
/// Result of `remote_side_plan`.
1078enum RemoteSidePlan {
// Side ids chosen for the item; `remote_side_plan` also returns this with an empty list.
1079 Target(Vec<RelaySideId>),
1080}
1081
1082impl Relay {
// Exact sender name that `is_end_to_end_ack_sender` accepts as an end-to-end ACK sender.
1083 const END_TO_END_ACK_SENDER: &'static str = "E2EACK";
// Sender-name prefix for end-to-end ACKs; `decode_end_to_end_ack_sender_hash` hashes the text after it.
1084 const END_TO_END_ACK_PREFIX: &'static str = "E2EACK:";
1085
1086 fn relay_item_priority(data: &RelayItem) -> TelemetryResult<u8> {
1087 let ty = match data {
1088 RelayItem::Packet(pkt) => pkt.data_type(),
1089 RelayItem::Packed(bytes) => wire_format::peek_envelope(bytes.as_ref())?.ty,
1090 };
1091 Ok(message_priority(ty))
1092 }
1093
1094 #[inline]
1095 fn is_side_tx_busy(err: &TelemetryError) -> bool {
1096 matches!(err, TelemetryError::Io("side tx busy"))
1097 }
1098
1099 fn process_replay_queue_item(&self) -> TelemetryResult<bool> {
1100 let Some(item) = ({
1101 let mut st = self.state.lock();
1102 st.replay_queue.pop_front()
1103 }) else {
1104 return Ok(false);
1105 };
1106 let frame = wire_format::peek_frame_info(item.bytes.as_ref())?;
1107 let ty = frame.envelope.ty;
1108 let Some(hdr) = frame.reliable else {
1109 return Ok(false);
1110 };
1111 {
1112 let mut st = self.state.lock();
1113 let tx_state = self.reliable_tx_state_mut(&mut st, item.dst, ty);
1114 if !tx_state.sent.contains_key(&hdr.seq) {
1115 return Ok(false);
1116 }
1117 }
1118 if let Err(e) = self.send_reliable_raw_to_side(item.dst, item.bytes.clone()) {
1119 if Self::is_side_tx_busy(&e) {
1120 let mut st = self.state.lock();
1121 st.push_replay(item)?;
1122 return Ok(false);
1123 }
1124 return Err(e);
1125 }
1126 let mut st = self.state.lock();
1127 let tx_state = self.reliable_tx_state_mut(&mut st, item.dst, ty);
1128 if let Some(sent) = tx_state.sent.get_mut(&hdr.seq) {
1129 sent.last_send_ms = self.clock.now_ms();
1130 sent.queued = false;
1131 }
1132 Ok(true)
1133 }
1134
1135 fn pop_ready_tx_item(
1136 &self,
1137 ) -> Option<(
1138 Option<RelaySideId>,
1139 RelaySideId,
1140 RelayTxHandlerFn,
1141 RelaySideOptions,
1142 RelayItem,
1143 )> {
1144 let mut st = self.state.lock();
1145 if let Some(item) = st.tx_queue.pop_front() {
1146 let side = st.sides.get(item.dst).and_then(|side| side.clone());
1147 side.map(|s| (item.src, item.dst, s.tx_handler, s.opts, item.data))
1148 } else {
1149 None
1150 }
1151 }
1152
1153 fn send_tx_item(
1154 &self,
1155 src: Option<RelaySideId>,
1156 dst: RelaySideId,
1157 handler: RelayTxHandlerFn,
1158 opts: RelaySideOptions,
1159 data: RelayItem,
1160 ) -> TelemetryResult<bool> {
1161 let allowed = {
1162 let mut st = self.state.lock();
1163 let ty = match &data {
1164 RelayItem::Packet(pkt) => Some(pkt.data_type()),
1165 RelayItem::Packed(bytes) => Some(wire_format::peek_envelope(bytes.as_ref())?.ty),
1166 };
1167 let route_allowed = self.route_allowed_locked(&st, src, ty, dst);
1168 #[cfg(all(feature = "discovery", feature = "timesync"))]
1169 let timesync_allowed = ty
1170 .map(|ty| {
1171 Self::timesync_allowed_for_side_locked(&mut st, dst, ty, self.clock.now_ms())
1172 })
1173 .unwrap_or(true);
1174 #[cfg(not(all(feature = "discovery", feature = "timesync")))]
1175 let timesync_allowed = true;
1176 route_allowed && timesync_allowed
1177 };
1178 if !allowed {
1179 return Ok(false);
1180 }
1181 if opts.reliable_enabled && matches!(handler, RelayTxHandlerFn::Packed(_)) {
1182 self.send_reliable_to_side(dst, data)?;
1183 Ok(true)
1184 } else if let Some(adjusted) = self.adjust_reliable_for_side(opts, data)? {
1185 self.call_tx_handler(dst, &handler, &adjusted)?;
1186 Ok(true)
1187 } else {
1188 Ok(false)
1189 }
1190 }
1191
1192 pub fn new(clock: Box<dyn Clock + Send + Sync>) -> Self {
1194 Self {
1195 sender: RouterMutex::new(Arc::from("RELAY")),
1196 state: RouterMutex::new(RelayInner {
1197 sides: Vec::new(),
1198 route_overrides: BTreeMap::new(),
1199 typed_route_overrides: BTreeMap::new(),
1200 route_weights: BTreeMap::new(),
1201 route_priorities: BTreeMap::new(),
1202 source_route_modes: BTreeMap::new(),
1203 route_selection_cursors: BTreeMap::new(),
1204 adaptive_route_stats: BTreeMap::new(),
1205 side_runtime_stats: BTreeMap::new(),
1206 side_transport: BTreeMap::new(),
1207 rx_queue: BoundedDeque::new(MAX_QUEUE_BUDGET, STARTING_QUEUE_SIZE, QUEUE_GROW_STEP),
1208 tx_queue: BoundedDeque::new(MAX_QUEUE_BUDGET, STARTING_QUEUE_SIZE, QUEUE_GROW_STEP),
1209 replay_queue: BoundedDeque::new(
1210 MAX_QUEUE_BUDGET,
1211 STARTING_QUEUE_SIZE,
1212 QUEUE_GROW_STEP,
1213 ),
1214 recent_rx: BoundedDeque::new(
1215 RECENT_RX_QUEUE_BYTES.max(1),
1216 RECENT_RX_QUEUE_BYTES.max(1),
1217 QUEUE_GROW_STEP,
1218 ),
1219 reliable_tx: BTreeMap::new(),
1220 reliable_rx: BTreeMap::new(),
1221 reliable_return_routes: BTreeMap::new(),
1222 reliable_return_route_order: VecDeque::new(),
1223 end_to_end_acked_destinations: BTreeMap::new(),
1224 end_to_end_acked_destination_order: VecDeque::new(),
1225 total_handler_failures: 0,
1226 total_handler_retries: 0,
1227 #[cfg(feature = "discovery")]
1228 discovery_routes: BTreeMap::new(),
1229 #[cfg(feature = "discovery")]
1230 discovery_cadence: DiscoveryCadenceState::default(),
1231 #[cfg(feature = "discovery")]
1232 discovery_side_throttle: BTreeMap::new(),
1233 #[cfg(all(feature = "discovery", feature = "timesync"))]
1234 timesync_side_throttle: BTreeMap::new(),
1235 }),
1236 side_tx_gate: ReentryGate::new(),
1237 clock,
1238 }
1239 }
1240
1241 #[inline]
1242 fn sender_arc(&self) -> Arc<str> {
1243 self.sender.lock().clone()
1244 }
1245
1246 #[inline]
1247 pub fn sender(&self) -> Arc<str> {
1248 self.sender_arc()
1249 }
1250
1251 pub fn set_sender<S: AsRef<str>>(&self, sender: S) {
1252 *self.sender.lock() = Arc::from(sender.as_ref());
1253 }
1254
1255 #[inline]
1256 fn try_enter_side_tx(&self) -> Option<ReentryGuard<'_>> {
1257 self.side_tx_gate.try_enter()
1258 }
1259
1260 #[inline]
1261 fn side_tx_active(&self) -> bool {
1262 self.side_tx_gate.is_active()
1263 }
1264
1265 #[inline]
1266 fn side_ref(st: &RelayInner, side: RelaySideId) -> TelemetryResult<&RelaySide> {
1267 st.sides
1268 .get(side)
1269 .and_then(|side| side.as_ref())
1270 .ok_or(TelemetryError::HandlerError("relay: invalid side id"))
1271 }
1272
1273 fn note_side_tx_success(
1274 &self,
1275 side: RelaySideId,
1276 ty: crate::DataType,
1277 bytes: usize,
1278 attempts: usize,
1279 ) {
1280 let mut st = self.state.lock();
1281 let entry = st.side_runtime_stats.entry(side).or_default();
1282 entry.note_tx(ty, bytes, attempts.saturating_sub(1));
1283 }
1284
1285 fn note_side_tx_failure(&self, side: RelaySideId, ty: crate::DataType, attempts: usize) {
1286 let mut st = self.state.lock();
1287 st.total_handler_failures = st.total_handler_failures.saturating_add(1);
1288 st.total_handler_retries = st.total_handler_retries.saturating_add(attempts as u64);
1289 let entry = st.side_runtime_stats.entry(side).or_default();
1290 entry.note_tx_failure(ty, attempts);
1291 }
1292
1293 fn note_side_rx(&self, side: RelaySideId, ty: crate::DataType, bytes: usize) {
1294 let mut st = self.state.lock();
1295 let entry = st.side_runtime_stats.entry(side).or_default();
1296 entry.note_rx(ty, bytes);
1297 }
1298
1299 #[inline]
1300 fn ensure_side_ingress_enabled(&self, side: RelaySideId) -> TelemetryResult<()> {
1301 let st = self.state.lock();
1302 let side_ref = Self::side_ref(&st, side)?;
1303 if side_ref.opts.ingress_enabled {
1304 Ok(())
1305 } else {
1306 Err(TelemetryError::HandlerError(
1307 "relay: ingress disabled for side id",
1308 ))
1309 }
1310 }
1311
1312 #[inline]
1313 fn route_allowed_locked(
1314 &self,
1315 st: &RelayInner,
1316 src: Option<RelaySideId>,
1317 ty: Option<crate::DataType>,
1318 dst: RelaySideId,
1319 ) -> bool {
1320 let Ok(dst_side) = Self::side_ref(st, dst) else {
1321 return false;
1322 };
1323 if !dst_side.opts.egress_enabled {
1324 return false;
1325 }
1326 if let Some(src_id) = src {
1327 let Ok(src_side) = Self::side_ref(st, src_id) else {
1328 return false;
1329 };
1330 if !src_side.opts.ingress_enabled || src_id == dst {
1331 return false;
1332 }
1333 }
1334 let base_allowed = st.route_overrides.get(&(src, dst)).copied().unwrap_or(true);
1335 if !base_allowed {
1336 return false;
1337 }
1338
1339 let Some(ty) = ty else {
1340 return true;
1341 };
1342 if st
1343 .typed_route_overrides
1344 .keys()
1345 .any(|(typed_src, typed_ty, _)| *typed_src == src && *typed_ty == ty.as_u32())
1346 {
1347 return st
1348 .typed_route_overrides
1349 .get(&(src, ty.as_u32(), dst))
1350 .copied()
1351 .unwrap_or(false);
1352 }
1353 true
1354 }
1355
1356 fn has_typed_route_overrides_locked(
1357 st: &RelayInner,
1358 src: Option<RelaySideId>,
1359 ty: crate::DataType,
1360 ) -> bool {
1361 st.typed_route_overrides
1362 .keys()
1363 .any(|(typed_src, typed_ty, _)| *typed_src == src && *typed_ty == ty.as_u32())
1364 }
1365
1366 fn eligible_side_ids_locked(
1367 &self,
1368 st: &RelayInner,
1369 src: Option<RelaySideId>,
1370 ty: Option<crate::DataType>,
1371 restrict_link_local: bool,
1372 ) -> Vec<RelaySideId> {
1373 st.sides
1374 .iter()
1375 .enumerate()
1376 .filter_map(|(side_id, side)| {
1377 let side = side.as_ref()?;
1378 if restrict_link_local && !side.opts.link_local_enabled {
1379 return None;
1380 }
1381 if self.route_allowed_locked(st, src, ty, side_id) {
1382 Some(side_id)
1383 } else {
1384 None
1385 }
1386 })
1387 .collect()
1388 }
1389
1390 fn apply_route_selection_locked(
1391 &self,
1392 st: &mut RelayInner,
1393 src: Option<RelaySideId>,
1394 mut sides: Vec<RelaySideId>,
1395 origin: RouteSelectionOrigin,
1396 ) -> Vec<RelaySideId> {
1397 if sides.len() <= 1 {
1398 return sides;
1399 }
1400
1401 let selection_mode = st.source_route_modes.get(&src).copied();
1402 if selection_mode.is_none() && origin == RouteSelectionOrigin::Discovered {
1403 return self.apply_adaptive_discovery_selection_locked(st, src, sides);
1404 }
1405
1406 match selection_mode.unwrap_or(RouteSelectionMode::Fanout) {
1407 RouteSelectionMode::Fanout => sides,
1408 RouteSelectionMode::Weighted => {
1409 sides.sort_unstable();
1410 let total_weight = sides.iter().fold(0_u64, |acc, side| {
1411 acc + u64::from(st.route_weights.get(&(src, *side)).copied().unwrap_or(1))
1412 });
1413 if total_weight == 0 {
1414 return Vec::new();
1415 }
1416 let cursor = st.route_selection_cursors.entry(src).or_insert(0);
1417 let pick = *cursor % total_weight;
1418 *cursor = cursor.wrapping_add(1);
1419 let mut remaining = pick;
1420 for side in sides {
1421 let weight =
1422 u64::from(st.route_weights.get(&(src, side)).copied().unwrap_or(1));
1423 if remaining < weight {
1424 return vec![side];
1425 }
1426 remaining -= weight;
1427 }
1428 Vec::new()
1429 }
1430 RouteSelectionMode::Failover => {
1431 sides.sort_by_key(|side| {
1432 (
1433 st.route_priorities.get(&(src, *side)).copied().unwrap_or(0),
1434 *side,
1435 )
1436 });
1437 sides.truncate(1);
1438 sides
1439 }
1440 }
1441 }
1442
1443 fn apply_adaptive_discovery_selection_locked(
1444 &self,
1445 st: &mut RelayInner,
1446 src: Option<RelaySideId>,
1447 mut sides: Vec<RelaySideId>,
1448 ) -> Vec<RelaySideId> {
1449 sides.sort_unstable();
1450 let mut unmeasured: Vec<_> = sides
1451 .iter()
1452 .copied()
1453 .filter(|side| !st.adaptive_route_stats.contains_key(side))
1454 .collect();
1455 if !unmeasured.is_empty() {
1456 let cursor = st.route_selection_cursors.entry(src).or_insert(0);
1457 let pick = (*cursor as usize) % unmeasured.len();
1458 *cursor = cursor.wrapping_add(1);
1459 return vec![unmeasured.swap_remove(pick)];
1460 }
1461
1462 let now_ms = self.clock.now_ms();
1463 let total_weight = sides.iter().fold(0_u64, |acc, side| {
1464 acc + st
1465 .adaptive_route_stats
1466 .get(side)
1467 .map(|stats| stats.weight(now_ms))
1468 .unwrap_or(1)
1469 });
1470 if total_weight == 0 {
1471 sides.truncate(1);
1472 return sides;
1473 }
1474
1475 let cursor = st.route_selection_cursors.entry(src).or_insert(0);
1476 let pick = *cursor % total_weight;
1477 *cursor = cursor.wrapping_add(1);
1478 let mut remaining = pick;
1479 for side in sides {
1480 let weight = st
1481 .adaptive_route_stats
1482 .get(&side)
1483 .map(|stats| stats.weight(now_ms))
1484 .unwrap_or(1);
1485 if remaining < weight {
1486 return vec![side];
1487 }
1488 remaining -= weight;
1489 }
1490 Vec::new()
1491 }
1492
1493 fn record_side_tx_sample(
1494 &self,
1495 side: RelaySideId,
1496 bytes: usize,
1497 started_ms: u64,
1498 ended_ms: u64,
1499 ) {
1500 let sample_ms = ended_ms.saturating_sub(started_ms).max(1);
1501 let sample_bps = ((bytes as u128).saturating_mul(1000) / u128::from(sample_ms))
1502 .min(u128::from(u64::MAX)) as u64;
1503 let mut st = self.state.lock();
1504 st.adaptive_route_stats
1505 .entry(side)
1506 .or_default()
1507 .observe(bytes, sample_bps, ended_ms);
1508 }
1509
1510 pub fn note_side_link_probe_sample(
1515 &self,
1516 side: RelaySideId,
1517 bytes: usize,
1518 duration_ms: u64,
1519 ) -> TelemetryResult<()> {
1520 {
1521 let st = self.state.lock();
1522 let _ = Self::side_ref(&st, side).map_err(|_| TelemetryError::BadArg)?;
1523 }
1524 let ended_ms = self.clock.now_ms();
1525 self.record_side_tx_sample(side, bytes, ended_ms.saturating_sub(duration_ms), ended_ms);
1526 Ok(())
1527 }
1528
1529 fn relay_item_wire_len(data: &RelayItem) -> TelemetryResult<usize> {
1530 match data {
1531 RelayItem::Packet(pkt) => Ok(wire_format::pack_packet(pkt).len()),
1532 RelayItem::Packed(bytes) => Ok(bytes.len()),
1533 }
1534 }
1535
1536 #[inline]
1537 fn decode_end_to_end_reliable_ack(payload: &[u8]) -> TelemetryResult<u64> {
1538 if payload.len() != 8 {
1539 return Err(TelemetryError::Unpack("bad reliable e2e ack payload"));
1540 }
1541 Ok(u64::from_le_bytes(payload[0..8].try_into().unwrap()))
1542 }
1543
1544 #[inline]
1545 fn is_end_to_end_ack_sender(sender: &str) -> bool {
1546 sender == Self::END_TO_END_ACK_SENDER || sender.starts_with(Self::END_TO_END_ACK_PREFIX)
1547 }
1548
1549 #[inline]
1550 fn sender_hash(sender: &str) -> u64 {
1551 hash_bytes_u64(0x517C_C1B7_2722_0A95, sender.as_bytes())
1552 }
1553
1554 fn decode_end_to_end_ack_sender_hash(sender: &str) -> Option<u64> {
1555 sender
1556 .strip_prefix(Self::END_TO_END_ACK_PREFIX)
1557 .filter(|sender| !sender.is_empty())
1558 .map(Self::sender_hash)
1559 }
1560
/// Returns `true` when `sender` is neither this relay's own sender name nor
/// an end-to-end ACK sender name.
#[cfg(feature = "discovery")]
fn is_end_to_end_destination_sender(&self, sender: &str) -> bool {
    let own = self.sender_arc();
    let is_own = sender == &*own;
    !is_own && !Self::is_end_to_end_ack_sender(sender)
}
1565
1566 fn reliable_control_target_packet_id(data: &RelayItem) -> TelemetryResult<Option<u64>> {
1575 match data {
1576 RelayItem::Packet(pkt) => {
1577 if pkt.data_type() != crate::DataType::ReliableAck
1578 || !Self::is_end_to_end_ack_sender(pkt.sender())
1579 {
1580 return Ok(None);
1581 }
1582 Self::decode_end_to_end_reliable_ack(pkt.payload()).map(Some)
1583 }
1584 RelayItem::Packed(bytes) => {
1585 if wire_format::peek_frame_info(bytes.as_ref())
1586 .ok()
1587 .is_some_and(|frame| frame.ack_only())
1588 {
1589 return Ok(None);
1590 }
1591 let pkt = wire_format::unpack_packet(bytes.as_ref())?;
1592 if pkt.data_type() != crate::DataType::ReliableAck
1593 || !Self::is_end_to_end_ack_sender(pkt.sender())
1594 {
1595 return Ok(None);
1596 }
1597 Self::decode_end_to_end_reliable_ack(pkt.payload()).map(Some)
1598 }
1599 }
1600 }
1601
1602 fn note_reliable_return_route(&self, side: RelaySideId, packet_id: u64) {
1603 let mut st = self.state.lock();
1604 Self::remember_reliable_return_route_locked(&mut st, packet_id);
1605 st.reliable_return_routes
1606 .insert(packet_id, ReliableReturnRouteState { side });
1607 }
1608
1609 fn remember_reliable_return_route_locked(st: &mut RelayInner, packet_id: u64) {
1615 let cap = RELIABLE_MAX_RETURN_ROUTES.max(1);
1616 st.reliable_return_route_order
1617 .retain(|id| st.reliable_return_routes.contains_key(id) && *id != packet_id);
1618 while st.reliable_return_route_order.len() >= cap {
1619 if let Some(oldest) = st.reliable_return_route_order.pop_front() {
1620 st.reliable_return_routes.remove(&oldest);
1621 } else {
1622 break;
1623 }
1624 }
1625 st.reliable_return_route_order.push_back(packet_id);
1626 }
1627
1628 fn note_end_to_end_acked_destination_locked(
1629 st: &mut RelayInner,
1630 packet_id: u64,
1631 sender_hash: u64,
1632 ) {
1633 let entry_cap = RELIABLE_MAX_END_TO_END_ACK_CACHE.max(1);
1634 st.end_to_end_acked_destination_order
1635 .retain(|id| st.end_to_end_acked_destinations.contains_key(id) && *id != packet_id);
1636 while st.end_to_end_acked_destination_order.len() >= entry_cap {
1637 if let Some(oldest) = st.end_to_end_acked_destination_order.pop_front() {
1638 st.end_to_end_acked_destinations.remove(&oldest);
1639 } else {
1640 break;
1641 }
1642 }
1643 st.end_to_end_acked_destination_order.push_back(packet_id);
1644
1645 let acked = st
1646 .end_to_end_acked_destinations
1647 .entry(packet_id)
1648 .or_default();
1649 let sender_cap = RELIABLE_MAX_END_TO_END_PENDING.max(1);
1650 if acked.len() < sender_cap || acked.contains(&sender_hash) {
1651 acked.insert(sender_hash);
1652 }
1653 }
1654
1655 #[inline]
1656 fn reliable_key(side: RelaySideId, ty: crate::DataType) -> (RelaySideId, u32) {
1657 (side, ty.as_u32())
1658 }
1659
1660 fn reliable_tx_state_mut<'a>(
1661 &'a self,
1662 st: &'a mut RelayInner,
1663 side: RelaySideId,
1664 ty: crate::DataType,
1665 ) -> &'a mut ReliableTxState {
1666 let key = Self::reliable_key(side, ty);
1667 st.reliable_tx
1668 .entry(key)
1669 .or_insert_with(|| ReliableTxState {
1670 next_seq: 1,
1671 sent_order: VecDeque::new(),
1672 sent: BTreeMap::new(),
1673 })
1674 }
1675
1676 fn reliable_rx_state_mut<'a>(
1677 &'a self,
1678 st: &'a mut RelayInner,
1679 side: RelaySideId,
1680 ty: crate::DataType,
1681 ) -> &'a mut ReliableRxState {
1682 let key = Self::reliable_key(side, ty);
1683 st.reliable_rx
1684 .entry(key)
1685 .or_insert_with(|| ReliableRxState {
1686 expected_seq: 1,
1687 buffered: BTreeMap::new(),
1688 })
1689 }
1690
1691 fn handle_reliable_ack(&self, side: RelaySideId, ty: crate::DataType, ack: u32) {
1692 let mut st = self.state.lock();
1693 let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1694 if matches!(reliable_mode(ty), crate::ReliableMode::Unordered) {
1695 tx_state.sent.remove(&ack);
1696 tx_state.sent_order.retain(|seq| *seq != ack);
1697 return;
1698 }
1699
1700 while let Some(seq) = tx_state.sent_order.front().copied() {
1701 if seq > ack {
1702 break;
1703 }
1704 tx_state.sent_order.pop_front();
1705 tx_state.sent.remove(&seq);
1706 }
1707 }
1708
1709 fn handle_reliable_partial_ack(&self, side: RelaySideId, ty: crate::DataType, seq: u32) {
1710 let mut st = self.state.lock();
1711 let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1712 if let Some(sent) = tx_state.sent.get_mut(&seq) {
1713 sent.partial_acked = true;
1714 }
1715 }
1716
1717 fn reliable_control_packet(
1718 &self,
1719 control_ty: crate::DataType,
1720 ty: crate::DataType,
1721 seq: u32,
1722 ) -> TelemetryResult<Packet> {
1723 let sender = self.sender_arc();
1724 Packet::new(
1725 control_ty,
1726 message_meta(control_ty).endpoints,
1727 sender.as_ref(),
1728 self.clock.now_ms(),
1729 crate::router::encode_slice_le(&[ty.as_u32(), seq]),
1730 )
1731 }
1732
1733 fn queue_reliable_ack(
1734 &self,
1735 side: RelaySideId,
1736 ty: crate::DataType,
1737 seq: u32,
1738 ) -> TelemetryResult<()> {
1739 let pkt = self.reliable_control_packet(crate::DataType::ReliableAck, ty, seq)?;
1740 let data = RelayItem::Packet(Arc::new(pkt));
1741 let priority = Self::relay_item_priority(&data)?;
1742 let mut st = self.state.lock();
1743 st.push_tx(RelayTxItem {
1744 src: None,
1745 dst: side,
1746 data,
1747 priority,
1748 })?;
1749 Ok(())
1750 }
1751
1752 fn queue_reliable_packet_request(
1753 &self,
1754 side: RelaySideId,
1755 ty: crate::DataType,
1756 seq: u32,
1757 ) -> TelemetryResult<()> {
1758 let pkt = self.reliable_control_packet(crate::DataType::ReliablePacketRequest, ty, seq)?;
1759 let data = RelayItem::Packet(Arc::new(pkt));
1760 let priority = Self::relay_item_priority(&data)?;
1761 let mut st = self.state.lock();
1762 st.push_tx(RelayTxItem {
1763 src: None,
1764 dst: side,
1765 data,
1766 priority,
1767 })?;
1768 Ok(())
1769 }
1770
1771 fn queue_reliable_partial_ack(
1772 &self,
1773 side: RelaySideId,
1774 ty: crate::DataType,
1775 seq: u32,
1776 ) -> TelemetryResult<()> {
1777 let pkt = self.reliable_control_packet(crate::DataType::ReliablePartialAck, ty, seq)?;
1778 let data = RelayItem::Packet(Arc::new(pkt));
1779 let priority = Self::relay_item_priority(&data)?;
1780 let mut st = self.state.lock();
1781 st.push_tx(RelayTxItem {
1782 src: None,
1783 dst: side,
1784 data,
1785 priority,
1786 })?;
1787 Ok(())
1788 }
1789
1790 fn queue_reliable_retransmit(
1791 &self,
1792 side: RelaySideId,
1793 ty: crate::DataType,
1794 seq: u32,
1795 ) -> TelemetryResult<()> {
1796 let mut queued = None;
1797 {
1798 let mut st = self.state.lock();
1799 let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1800 if let Some(sent) = tx_state.sent.get_mut(&seq)
1801 && !sent.queued
1802 {
1803 sent.queued = true;
1804 sent.partial_acked = false;
1805 queued = Some(sent.bytes.clone());
1806 }
1807 }
1808
1809 if let Some(bytes) = queued {
1810 let mut st = self.state.lock();
1811 st.push_replay(RelayReplayItem {
1812 dst: side,
1813 bytes,
1814 priority: message_priority(ty).saturating_add(16),
1815 })?;
1816 }
1817 Ok(())
1818 }
1819
1820 fn send_reliable_raw_to_side(
1821 &self,
1822 side: RelaySideId,
1823 bytes: Arc<[u8]>,
1824 ) -> TelemetryResult<()> {
1825 let (handler, opts) = {
1826 let st = self.state.lock();
1827 let side_ref = Self::side_ref(&st, side)?;
1828 if !side_ref.opts.egress_enabled {
1829 return Ok(());
1830 }
1831 (side_ref.tx_handler.clone(), side_ref.opts)
1832 };
1833
1834 let Some(_side_tx_guard) = self.try_enter_side_tx() else {
1835 return Err(TelemetryError::Io("side tx busy"));
1836 };
1837 let started_ms = self.clock.now_ms();
1838 let ty = wire_format::peek_envelope(bytes.as_ref())
1839 .map(|env| env.ty)
1840 .unwrap_or(crate::DataType::ReliableAck);
1841 let result = match handler {
1842 RelayTxHandlerFn::Packed(f) => {
1843 let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
1844 let mut sent_bytes = 0usize;
1845 for frame in frames {
1846 f(frame.as_ref())?;
1847 sent_bytes = sent_bytes.saturating_add(frame.len());
1848 }
1849 self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
1850 self.note_side_tx_success(side, ty, sent_bytes, 1);
1851 return Ok(());
1852 }
1853 RelayTxHandlerFn::Packet(f) => {
1854 if wire_format::peek_frame_info(bytes.as_ref())
1855 .ok()
1856 .is_some_and(|frame| frame.ack_only())
1857 {
1858 return Ok(());
1859 }
1860 let pkt = wire_format::unpack_packet(bytes.as_ref())?;
1861 f(&pkt)
1862 }
1863 };
1864 if result.is_ok() {
1865 self.record_side_tx_sample(side, bytes.len(), started_ms, self.clock.now_ms());
1866 self.note_side_tx_success(side, ty, bytes.len(), 1);
1867 } else {
1868 self.note_side_tx_failure(side, ty, 1);
1869 }
1870 result
1871 }
1872
// Sends `data` to `side` with per-hop reliable sequencing where it applies.
//
// Frames sent with a fresh sequence number are stored in the stream's `sent`
// history after transmission. Everything else (non-packed handler, hop
// reliability off, non-reliable type, unparseable or non-rewritable frame) is
// sent once without being tracked.
//
// Errors: `Io("side tx busy")` when the side-TX gate cannot be entered,
// `PacketTooLarge("relay reliable history full")` when the history already
// holds `RELIABLE_MAX_PENDING` entries, or whatever encoding / the handler returns.
1873 fn send_reliable_to_side(&self, side: RelaySideId, data: RelayItem) -> TelemetryResult<()> {
// Snapshot the handler and options under the lock. Hop reliability additionally
// requires that `side_has_multiple_announcers_locked` is false for this side.
1874 let (handler, opts, hop_reliable_enabled) = {
1875 let st = self.state.lock();
1876 let side_ref = Self::side_ref(&st, side)?;
1877 let opts = side_ref.opts;
1878 let hop_reliable_enabled = opts.reliable_enabled
1879 && !self.side_has_multiple_announcers_locked(&st, side, self.clock.now_ms());
1880 (side_ref.tx_handler.clone(), opts, hop_reliable_enabled)
1881 };
1882
// Only packed handlers get sequencing; other handlers use the plain path.
1883 let RelayTxHandlerFn::Packed(f) = &handler else {
1884 return self.call_tx_handler(side, &handler, &data);
1885 };
1886
// Hop reliability off: adjust the item as for a non-reliable side and send it,
// or send nothing when the adjustment yields `None`.
1887 if !hop_reliable_enabled {
1888 let mut adjusted_opts = opts;
1889 adjusted_opts.reliable_enabled = false;
1890 if let Some(adjusted) = self.adjust_reliable_for_side(adjusted_opts, data)? {
1891 return self.call_tx_handler(side, &handler, &adjusted);
1892 }
1893 return Ok(());
1894 }
1895
// A packed frame whose header cannot be peeked is forwarded through the plain handler path.
1896 let ty = match &data {
1897 RelayItem::Packet(pkt) => pkt.data_type(),
1898 RelayItem::Packed(bytes) => {
1899 let Ok(frame) = wire_format::peek_frame_info(bytes.as_ref()) else {
1900 return self.call_tx_handler(side, &handler, &data);
1901 };
1902 frame.envelope.ty
1903 }
1904 };
1905
// Types that are not reliable are sent once, without a sequence number.
1906 if !is_reliable_type(ty) {
1907 if let Some(adjusted) = self.adjust_reliable_for_side(opts, data)? {
1908 self.call_tx_handler(side, &handler, &adjusted)?;
1909 }
1910 return Ok(());
1911 }
1912
// Allocate the next sequence number; it wraps around and skips 0.
// NOTE(review): the number is consumed here, before transmission. If the gate is
// busy or the handler fails below, nothing is stored in `sent` for it - confirm
// the receiving side tolerates such a gap.
1913 let (seq, flags) = {
1914 let mut st = self.state.lock();
1915 let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1916 if tx_state.sent.len() >= RELIABLE_MAX_PENDING {
1917 return Err(TelemetryError::PacketTooLarge(
1918 "relay reliable history full",
1919 ));
1920 }
1921 let seq = tx_state.next_seq;
1922 let next = tx_state.next_seq.wrapping_add(1);
1923 tx_state.next_seq = if next == 0 { 1 } else { next };
1924 let flags = match reliable_mode(ty) {
1925 crate::ReliableMode::Unordered => wire_format::RELIABLE_FLAG_UNORDERED,
1926 _ => 0,
1927 };
1928 (seq, flags)
1929 };
1930
// Produce the wire bytes carrying the new reliable header.
1931 let bytes: Arc<[u8]> = match data {
1932 RelayItem::Packet(pkt) => wire_format::pack_packet_with_reliable(
1933 &pkt,
1934 wire_format::ReliableHeader { flags, seq, ack: 0 },
1935 ),
1936 RelayItem::Packed(bytes) => {
1937 let Some(rewritten) =
1938 wire_format::rewrite_reliable_header_owned(bytes.as_ref(), flags, seq, 0)?
1939 else {
// The header rewrite returned `None`: send the original bytes as they are and
// return without recording them in the `sent` history.
1940 let Some(_side_tx_guard) = self.try_enter_side_tx() else {
1941 return Err(TelemetryError::Io("side tx busy"));
1942 };
1943 let started_ms = self.clock.now_ms();
1944 let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
1945 let mut sent_bytes = 0usize;
1946 for frame in frames {
1947 f(frame.as_ref())?;
1948 sent_bytes = sent_bytes.saturating_add(frame.len());
1949 }
1950 self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
1951 self.note_side_tx_success(side, ty, sent_bytes, 1);
1952 return Ok(());
1953 };
1954 rewritten
1955 }
1956 };
1957
// Transmit the sequenced frame. The gate guard stays alive until the function
// returns, so it also covers the history update below.
// NOTE(review): a handler error leaves through `?` without `note_side_tx_failure` - confirm intended.
1958 let Some(_side_tx_guard) = self.try_enter_side_tx() else {
1959 return Err(TelemetryError::Io("side tx busy"));
1960 };
1961 let started_ms = self.clock.now_ms();
1962 let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
1963 let mut sent_bytes = 0usize;
1964 for frame in frames {
1965 f(frame.as_ref())?;
1966 sent_bytes = sent_bytes.saturating_add(frame.len());
1967 }
1968 self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
1969 self.note_side_tx_success(side, ty, sent_bytes, 1);
1970
// Remember the frame so it can be retransmitted until it is acknowledged.
1971 {
1972 let mut st = self.state.lock();
1973 let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1974 tx_state.sent_order.push_back(seq);
1975 tx_state.sent.insert(
1976 seq,
1977 ReliableSent {
1978 bytes: bytes.clone(),
1979 last_send_ms: self.clock.now_ms(),
1980 retries: 0,
1981 queued: false,
1982 partial_acked: false,
1983 },
1984 );
1985 }
1986
1987 Ok(())
1988 }
1989
1990 fn item_route_info(
1991 &self,
1992 data: &RelayItem,
1993 ) -> TelemetryResult<(Vec<crate::DataEndpoint>, crate::DataType)> {
1994 match data {
1995 RelayItem::Packet(pkt) => {
1996 let mut eps = pkt.endpoints().to_vec();
1997 eps.sort_unstable();
1998 eps.dedup();
1999 Ok((eps, pkt.data_type()))
2000 }
2001 RelayItem::Packed(bytes) => {
2002 let env = wire_format::peek_envelope(bytes.as_ref())?;
2003 let mut eps: Vec<crate::DataEndpoint> = env.endpoints.iter().copied().collect();
2004 eps.sort_unstable();
2005 eps.dedup();
2006 Ok((eps, env.ty))
2007 }
2008 }
2009 }
2010
2011 fn endpoints_are_link_local_only(eps: &[crate::DataEndpoint]) -> bool {
2012 !eps.is_empty() && eps.iter().all(|ep| ep.is_link_local_only())
2013 }
2014
2015 fn item_target_senders(&self, data: &RelayItem) -> TelemetryResult<Arc<[u64]>> {
2016 match data {
2017 RelayItem::Packet(pkt) => Ok(Arc::from(pkt.wire_target_senders())),
2018 RelayItem::Packed(bytes) => {
2019 Ok(wire_format::peek_envelope(bytes.as_ref())?.target_senders)
2020 }
2021 }
2022 }
2023
/// Returns `true` when any route override exists for source `src`, either
/// untyped or typed for `ty`.
#[cfg(feature = "discovery")]
fn has_explicit_route_policy_locked(
    st: &RelayInner,
    src: Option<RelaySideId>,
    ty: crate::DataType,
) -> bool {
    let has_untyped = st
        .route_overrides
        .keys()
        .any(|&(route_src, _)| route_src == src);
    has_untyped || Self::has_typed_route_overrides_locked(st, src, ty)
}
2035
/// Returns `true` when a fresh discovery route on `side` has a fresh
/// announcer whose topology lists a board matching one of `target_senders`.
///
/// "Fresh" means last seen no more than `DISCOVERY_ROUTE_TTL_MS` before
/// `now_ms`. Board sender ids are compared by their `sender_hash`.
#[cfg(feature = "discovery")]
fn side_matches_target_senders_locked(
    st: &RelayInner,
    side: RelaySideId,
    target_senders: &[u64],
    now_ms: u64,
) -> bool {
    let Some(route) = st.discovery_routes.get(&side) else {
        return false;
    };
    let is_fresh =
        |last_seen_ms: u64| now_ms.saturating_sub(last_seen_ms) <= DISCOVERY_ROUTE_TTL_MS;
    if !is_fresh(route.last_seen_ms) {
        return false;
    }

    route
        .announcers
        .values()
        .filter(|announcer| is_fresh(announcer.last_seen_ms))
        .flat_map(|announcer| announcer.topology_boards.iter())
        .any(|board| target_senders.contains(&Self::sender_hash(&board.sender_id)))
}
2061
2062 fn remote_side_plan(
2063 &self,
2064 data: &RelayItem,
2065 exclude: RelaySideId,
2066 ) -> TelemetryResult<RemoteSidePlan> {
2067 #[cfg(feature = "discovery")]
2068 {
2069 let (eps, ty) = self.item_route_info(data)?;
2070 let target_senders = self.item_target_senders(data)?;
2071 let preferred_packet_id = Self::reliable_control_target_packet_id(data)?;
2072 if discovery::is_discovery_type(ty) {
2073 let mut st = self.state.lock();
2074 let sides = self.eligible_side_ids_locked(&st, Some(exclude), Some(ty), false);
2075 return Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
2076 &mut st,
2077 Some(exclude),
2078 sides,
2079 RouteSelectionOrigin::Flood,
2080 )));
2081 }
2082
2083 #[cfg(feature = "timesync")]
2084 let preferred_timesync_source = self.preferred_timesync_route_source(data, ty)?;
2085 #[cfg(not(feature = "timesync"))]
2086 let preferred_timesync_source: Option<String> = None;
2087 let mut st = self.state.lock();
2088 if let Some(packet_id) = preferred_packet_id {
2089 let target_side = self.allowed_target_side_locked(
2090 &st,
2091 exclude,
2092 ty,
2093 st.reliable_return_routes
2094 .get(&packet_id)
2095 .map(|route| route.side),
2096 );
2097 if let Some(side) = target_side {
2098 #[cfg(feature = "timesync")]
2099 if !Self::timesync_allowed_for_side_locked(
2100 &mut st,
2101 side,
2102 ty,
2103 self.clock.now_ms(),
2104 ) {
2105 return Ok(RemoteSidePlan::Target(Vec::new()));
2106 }
2107 return Ok(RemoteSidePlan::Target(vec![side]));
2108 }
2109 return Ok(RemoteSidePlan::Target(Vec::new()));
2110 }
2111 let restrict_link_local = Self::endpoints_are_link_local_only(&eps);
2112 let discovered_origin = if is_reliable_type(ty) {
2113 RouteSelectionOrigin::Flood
2114 } else {
2115 RouteSelectionOrigin::Discovered
2116 };
2117 if st.discovery_routes.is_empty() {
2118 let mut fallback = self.eligible_side_ids_locked(
2119 &st,
2120 Some(exclude),
2121 Some(ty),
2122 restrict_link_local,
2123 );
2124 #[cfg(feature = "timesync")]
2125 {
2126 fallback = Self::filter_timesync_sides_locked(
2127 &mut st,
2128 ty,
2129 self.clock.now_ms(),
2130 fallback,
2131 );
2132 }
2133 return Ok(RemoteSidePlan::Target(if fallback.len() == 1 {
2134 fallback
2135 } else {
2136 Vec::new()
2137 }));
2138 }
2139 let now_ms = self.clock.now_ms();
2140 let mut had_exact = false;
2141 let mut exact_targets = Vec::new();
2142 let mut had_known = false;
2143 let mut generic_targets = Vec::new();
2144
2145 for (&side, route) in st.discovery_routes.iter() {
2146 if side == exclude
2147 || now_ms.saturating_sub(route.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS
2148 {
2149 continue;
2150 }
2151 if restrict_link_local
2152 && st
2153 .sides
2154 .get(side)
2155 .and_then(|side| side.as_ref())
2156 .map(|s| !s.opts.link_local_enabled)
2157 .unwrap_or(true)
2158 {
2159 continue;
2160 }
2161 if !self.route_allowed_locked(&st, Some(exclude), Some(ty), side) {
2162 continue;
2163 }
2164 if !target_senders.is_empty() {
2165 if !Self::side_matches_target_senders_locked(&st, side, &target_senders, now_ms)
2166 {
2167 continue;
2168 }
2169 had_known = true;
2170 generic_targets.push(side);
2171 continue;
2172 }
2173 if preferred_timesync_source.as_deref().is_some_and(|source| {
2174 route.reachable_timesync_sources.iter().any(|s| s == source)
2175 }) {
2176 had_exact = true;
2177 exact_targets.push(side);
2178 continue;
2179 }
2180 if eps.iter().copied().any(|ep| route.reachable.contains(&ep)) {
2181 had_known = true;
2182 generic_targets.push(side);
2183 }
2184 }
2185
2186 if had_exact {
2187 #[cfg(feature = "timesync")]
2188 {
2189 exact_targets = Self::filter_timesync_sides_locked(
2190 &mut st,
2191 ty,
2192 self.clock.now_ms(),
2193 exact_targets,
2194 );
2195 }
2196 let targets = self.filter_end_to_end_satisfied_sides_locked(
2197 &st,
2198 data,
2199 exact_targets,
2200 &eps,
2201 ty,
2202 )?;
2203 Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
2204 &mut st,
2205 Some(exclude),
2206 targets,
2207 discovered_origin,
2208 )))
2209 } else if had_known {
2210 #[cfg(feature = "timesync")]
2211 {
2212 generic_targets = Self::filter_timesync_sides_locked(
2213 &mut st,
2214 ty,
2215 self.clock.now_ms(),
2216 generic_targets,
2217 );
2218 }
2219 let targets = self.filter_end_to_end_satisfied_sides_locked(
2220 &st,
2221 data,
2222 generic_targets,
2223 &eps,
2224 ty,
2225 )?;
2226 Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
2227 &mut st,
2228 Some(exclude),
2229 targets,
2230 discovered_origin,
2231 )))
2232 } else {
2233 if Self::has_explicit_route_policy_locked(&st, Some(exclude), ty) {
2234 let mut sides = self.eligible_side_ids_locked(
2235 &st,
2236 Some(exclude),
2237 Some(ty),
2238 restrict_link_local,
2239 );
2240 #[cfg(feature = "timesync")]
2241 {
2242 sides = Self::filter_timesync_sides_locked(
2243 &mut st,
2244 ty,
2245 self.clock.now_ms(),
2246 sides,
2247 );
2248 }
2249 Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
2250 &mut st,
2251 Some(exclude),
2252 sides,
2253 RouteSelectionOrigin::Flood,
2254 )))
2255 } else {
2256 Ok(RemoteSidePlan::Target(Vec::new()))
2257 }
2258 }
2259 }
2260 #[cfg(not(feature = "discovery"))]
2261 {
2262 let (_, ty) = self.item_route_info(data)?;
2263 let mut st = self.state.lock();
2264 if let Some(packet_id) = Self::reliable_control_target_packet_id(data)? {
2265 let target_side = self.allowed_target_side_locked(
2266 &st,
2267 exclude,
2268 ty,
2269 st.reliable_return_routes
2270 .get(&packet_id)
2271 .map(|route| route.side),
2272 );
2273 if let Some(side) = target_side {
2274 return Ok(RemoteSidePlan::Target(vec![side]));
2275 }
2276 return Ok(RemoteSidePlan::Target(Vec::new()));
2277 }
2278 let sides = self.eligible_side_ids_locked(&st, Some(exclude), Some(ty), false);
2279 Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
2280 &mut st,
2281 Some(exclude),
2282 sides,
2283 RouteSelectionOrigin::Flood,
2284 )))
2285 }
2286 }
2287
2288 #[inline]
2289 fn allowed_target_side_locked(
2290 &self,
2291 st: &RelayInner,
2292 exclude: RelaySideId,
2293 ty: crate::DataType,
2294 target_side: Option<RelaySideId>,
2295 ) -> Option<RelaySideId> {
2296 target_side.filter(|side| self.route_allowed_locked(st, Some(exclude), Some(ty), *side))
2297 }
2298
2299 fn filter_end_to_end_satisfied_sides_locked(
2300 &self,
2301 st: &RelayInner,
2302 data: &RelayItem,
2303 sides: Vec<RelaySideId>,
2304 eps: &[crate::DataEndpoint],
2305 ty: crate::DataType,
2306 ) -> TelemetryResult<Vec<RelaySideId>> {
2307 if !is_reliable_type(ty) || Self::reliable_control_target_packet_id(data)?.is_some() {
2308 return Ok(sides);
2309 }
2310 let packet_id = match data {
2311 RelayItem::Packet(pkt) => pkt.packet_id(),
2312 RelayItem::Packed(bytes) => match wire_format::packet_id_from_wire(bytes.as_ref()) {
2313 Ok(packet_id) => packet_id,
2314 Err(TelemetryError::Unpack("reliable control frame")) => return Ok(sides),
2315 Err(err) => return Err(err),
2316 },
2317 };
2318 let Some(acked) = st.end_to_end_acked_destinations.get(&packet_id) else {
2319 return Ok(sides);
2320 };
2321 let now_ms = self.clock.now_ms();
2322 let mut filtered = Vec::new();
2323 for side in sides {
2324 let Some(route) = st.discovery_routes.get(&side) else {
2325 filtered.push(side);
2326 continue;
2327 };
2328 let mut still_pending = false;
2329 let mut had_destination_board = false;
2330 for sender_state in route.announcers.values() {
2331 if now_ms.saturating_sub(sender_state.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2332 continue;
2333 }
2334 for board in sender_state.topology_boards.iter() {
2335 if !self.is_end_to_end_destination_sender(&board.sender_id) {
2336 continue;
2337 }
2338 had_destination_board = true;
2339 let sender_hash = Self::sender_hash(&board.sender_id);
2340 if acked.contains(&sender_hash) {
2341 continue;
2342 }
2343 if eps
2344 .iter()
2345 .copied()
2346 .any(|ep| board.reachable_endpoints.contains(&ep))
2347 {
2348 still_pending = true;
2349 break;
2350 }
2351 still_pending = true;
2354 break;
2355 }
2356 if still_pending {
2357 break;
2358 }
2359 }
2360 if still_pending || !had_destination_board {
2361 filtered.push(side);
2362 }
2363 }
2364 Ok(filtered)
2365 }
2366
/// Reports whether `side` has at least two announcers whose `last_seen_ms`
/// is within `DISCOVERY_ROUTE_TTL_MS` of `now_ms`.
///
/// Returns `false` when the side has no discovery route entry.
#[cfg(feature = "discovery")]
fn side_has_multiple_announcers_locked(
    &self,
    st: &RelayInner,
    side: RelaySideId,
    now_ms: u64,
) -> bool {
    let Some(route) = st.discovery_routes.get(&side) else {
        return false;
    };
    // Stop as soon as a second live announcer is found.
    route
        .announcers
        .values()
        .filter(|announcer| {
            now_ms.saturating_sub(announcer.last_seen_ms) <= DISCOVERY_ROUTE_TTL_MS
        })
        .nth(1)
        .is_some()
}
2389
2390 #[cfg(not(feature = "discovery"))]
2391 fn side_has_multiple_announcers_locked(
2392 &self,
2393 _st: &RelayInner,
2394 _side: RelaySideId,
2395 _now_ms: u64,
2396 ) -> bool {
2397 false
2398 }
2399
/// Returns a mutable reference to the topology board for `sender_id`
/// inside `sender_state`, appending an empty board first when none exists.
#[cfg(feature = "discovery")]
fn sender_topology_board_mut<'a>(
    sender_state: &'a mut DiscoverySenderState,
    sender_id: &str,
) -> &'a mut TopologyBoardNode {
    let boards = &mut sender_state.topology_boards;
    match boards.iter().position(|board| board.sender_id == sender_id) {
        Some(idx) => &mut boards[idx],
        None => {
            boards.push(TopologyBoardNode {
                sender_id: sender_id.to_string(),
                reachable_endpoints: Vec::new(),
                reachable_timesync_sources: Vec::new(),
                connections: Vec::new(),
            });
            boards.last_mut().expect("board inserted above")
        }
    }
}
2423
/// Normalizes the sender's topology boards and rewrites the sender-level
/// `reachable` and `reachable_timesync_sources` fields from the summary of
/// those boards.
#[cfg(feature = "discovery")]
fn refresh_sender_topology_state(sender_state: &mut DiscoverySenderState) {
    discovery::normalize_topology_boards(&mut sender_state.topology_boards);
    let summary = discovery::summarize_topology_boards(&sender_state.topology_boards);
    sender_state.reachable = summary.0;
    sender_state.reachable_timesync_sources = summary.1;
}
2432
/// Rebuilds the side-level aggregates of `route` from its announcers.
///
/// `reachable` and `reachable_timesync_sources` become the sorted,
/// de-duplicated union of the announcers' lists, and `last_seen_ms` becomes
/// the largest announcer timestamp (0 when there are no announcers).
#[cfg(feature = "discovery")]
fn recompute_discovery_side_state(route: &mut DiscoverySideState) {
    let mut endpoints: Vec<_> = route
        .announcers
        .values()
        .flat_map(|announcer| announcer.reachable.iter().copied())
        .collect();
    endpoints.sort_unstable();
    endpoints.dedup();

    let mut sources: Vec<_> = route
        .announcers
        .values()
        .flat_map(|announcer| announcer.reachable_timesync_sources.iter().cloned())
        .collect();
    sources.sort_unstable();
    sources.dedup();

    let newest_ms = route
        .announcers
        .values()
        .map(|announcer| announcer.last_seen_ms)
        .max()
        .unwrap_or(0u64);

    route.reachable = endpoints;
    route.reachable_timesync_sources = sources;
    route.last_seen_ms = newest_ms;
}
2451
/// Builds the topology board describing this node.
///
/// `connections` lists, sorted and de-duplicated, every announcer id that is
/// within `DISCOVERY_ROUTE_TTL_MS` of `now_ms` on a route that is itself
/// within the TTL. The board's endpoint and timesync-source lists are left
/// empty, and `sender_id` is taken from `sender_arc()`.
#[cfg(feature = "discovery")]
fn local_discovery_topology_board(&self, st: &RelayInner, now_ms: u64) -> TopologyBoardNode {
    let fresh = |seen_ms: u64| now_ms.saturating_sub(seen_ms) <= DISCOVERY_ROUTE_TTL_MS;

    let mut connections: Vec<_> = st
        .discovery_routes
        .values()
        .filter(|route| fresh(route.last_seen_ms))
        .flat_map(|route| route.announcers.iter())
        .filter(|(_, state)| fresh(state.last_seen_ms))
        .map(|(id, _)| id.clone())
        .collect();
    connections.sort_unstable();
    connections.dedup();

    TopologyBoardNode {
        sender_id: self.sender_arc().to_string(),
        reachable_endpoints: Vec::new(),
        reachable_timesync_sources: Vec::new(),
        connections,
    }
}
2475
/// Assembles the list of topology boards to advertise on a link.
///
/// The list starts with this node's own board. For every non-expired
/// announcer on every non-expired route, the announcer's boards are merged
/// in after two adjustments:
/// * if the announcer reported no boards, a single board is synthesized from
///   its sender-level reachability with this node as its only connection;
///   otherwise this node is appended to the connections of the board whose
///   `sender_id` equals the announcer id (if such a board exists);
/// * when `link_local_enabled` is `false`, link-local-only endpoints are
///   removed from every board.
///
/// The merged list is normalized before being returned.
#[cfg(feature = "discovery")]
fn advertised_discovery_topology_for_link_locked(
    &self,
    st: &RelayInner,
    now_ms: u64,
    link_local_enabled: bool,
) -> Vec<TopologyBoardNode> {
    let fresh = |seen_ms: u64| now_ms.saturating_sub(seen_ms) <= DISCOVERY_ROUTE_TTL_MS;
    let mut merged = vec![self.local_discovery_topology_board(st, now_ms)];

    let live_routes = st
        .discovery_routes
        .values()
        .filter(|route| fresh(route.last_seen_ms));
    for route in live_routes {
        let live_announcers = route
            .announcers
            .iter()
            .filter(|(_, state)| fresh(state.last_seen_ms));
        for (announcer, state) in live_announcers {
            let mut announced = state.topology_boards.clone();
            if announced.is_empty() {
                announced.push(TopologyBoardNode {
                    sender_id: announcer.clone(),
                    reachable_endpoints: state.reachable.clone(),
                    reachable_timesync_sources: state.reachable_timesync_sources.clone(),
                    connections: vec![self.sender_arc().to_string()],
                });
            } else if let Some(own_board) = announced
                .iter_mut()
                .find(|board| board.sender_id == *announcer)
            {
                own_board.connections.push(self.sender_arc().to_string());
            }

            if !link_local_enabled {
                announced.iter_mut().for_each(|board| {
                    board
                        .reachable_endpoints
                        .retain(|ep| !ep.is_link_local_only());
                });
            }
            discovery::merge_topology_boards(&mut merged, &announced);
        }
    }

    discovery::normalize_topology_boards(&mut merged);
    merged
}
2520
/// Forwards a topology-change notification at `now_ms` to the discovery
/// cadence state held in `st`.
#[cfg(feature = "discovery")]
fn note_discovery_topology_change_locked(st: &mut RelayInner, now_ms: u64) {
    let cadence = &mut st.discovery_cadence;
    cadence.on_topology_change(now_ms);
}
2525
/// Drops announcers older than `DISCOVERY_ROUTE_TTL_MS`, recomputes each
/// route's aggregates, and removes routes left without announcers.
///
/// Returns `true` when the route table differs from its state on entry.
/// The comparison is made against a full clone taken before pruning.
#[cfg(feature = "discovery")]
fn prune_discovery_routes_locked(st: &mut RelayInner, now_ms: u64) -> bool {
    let snapshot = st.discovery_routes.clone();
    st.discovery_routes.retain(|_, route| {
        route.announcers.retain(|_, announcer| {
            now_ms.saturating_sub(announcer.last_seen_ms) <= DISCOVERY_ROUTE_TTL_MS
        });
        Self::recompute_discovery_side_state(route);
        !route.announcers.is_empty()
    });
    snapshot != st.discovery_routes
}
2538
/// Trims `st.end_to_end_acked_destinations` to senders still present in the
/// discovery topology.
///
/// The set of live hashes is built from every board, on any announcer of any
/// route, whose `sender_id` passes `is_end_to_end_destination_sender`.
/// Hashes outside that set are removed from each record, and records that
/// become empty are deleted.
#[cfg(feature = "discovery")]
fn reconcile_end_to_end_acked_destinations_locked(&self, st: &mut RelayInner) {
    let live: BTreeSet<_> = st
        .discovery_routes
        .values()
        .flat_map(|route| route.announcers.values())
        .flat_map(|announcer| announcer.topology_boards.iter())
        .filter(|board| self.is_end_to_end_destination_sender(&board.sender_id))
        .map(|board| Self::sender_hash(&board.sender_id))
        .collect();

    st.end_to_end_acked_destinations.retain(|_, acked| {
        acked.retain(|hash| live.contains(hash));
        !acked.is_empty()
    });
}
2556
/// Returns the endpoints to advertise on a link.
///
/// The endpoints come from summarizing the advertised topology for the
/// link. Discovery endpoints are always removed; link-local-only endpoints
/// are removed unless `link_local_enabled` is `true`.
#[cfg(feature = "discovery")]
fn advertised_discovery_endpoints_for_link_locked(
    &self,
    st: &RelayInner,
    now_ms: u64,
    link_local_enabled: bool,
) -> Vec<crate::DataEndpoint> {
    let boards =
        self.advertised_discovery_topology_for_link_locked(st, now_ms, link_local_enabled);
    let (mut endpoints, _) = discovery::summarize_topology_boards(&boards);
    endpoints.retain(|ep| {
        if discovery::is_discovery_endpoint(*ep) {
            return false;
        }
        link_local_enabled || !ep.is_link_local_only()
    });
    endpoints
}
2575
/// Returns the timesync sources to advertise, taken from the summary of the
/// advertised topology computed with link-local endpoints enabled.
#[cfg(feature = "discovery")]
fn advertised_discovery_timesync_sources_for_link_locked(
    &self,
    st: &RelayInner,
    now_ms: u64,
) -> Vec<String> {
    let boards = self.advertised_discovery_topology_for_link_locked(st, now_ms, true);
    discovery::summarize_topology_boards(&boards).1
}
2587
/// Returns the sender of a timesync announce/response item.
///
/// Yields `Ok(None)` for any other data type and for packed frames that
/// `peek_frame_info` reports as ack-only. For a packed frame the sender is
/// read by unpacking the whole packet.
///
/// # Errors
/// Propagates the error from `wire_format::unpack_packet`.
#[cfg(all(feature = "discovery", feature = "timesync"))]
fn preferred_timesync_route_source(
    &self,
    data: &RelayItem,
    ty: crate::DataType,
) -> TelemetryResult<Option<String>> {
    let routed_by_source = matches!(
        ty,
        crate::DataType::TimeSyncAnnounce | crate::DataType::TimeSyncResponse
    );
    if !routed_by_source {
        return Ok(None);
    }

    match data {
        RelayItem::Packet(pkt) => Ok(Some(pkt.sender().to_owned())),
        RelayItem::Packed(bytes) => {
            let ack_only = wire_format::peek_frame_info(bytes.as_ref())
                .ok()
                .is_some_and(|frame| frame.ack_only());
            if ack_only {
                return Ok(None);
            }
            let unpacked = wire_format::unpack_packet(bytes.as_ref())?;
            Ok(Some(unpacked.sender().to_owned()))
        }
    }
}
2618
/// Reports whether the adaptive statistics classify `side_id` as a slow
/// link.
///
/// Requires a stats entry with at least one sample, and then either a
/// non-zero bandwidth estimate at or below `CONTROL_SLOW_LINK_CAPACITY_BPS`
/// or a non-zero `last_slow_observed_ms` within
/// `DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS` of `now_ms`.
#[cfg(feature = "discovery")]
#[inline]
fn side_is_slow_control_link_locked(
    st: &RelayInner,
    side_id: RelaySideId,
    now_ms: u64,
) -> bool {
    let Some(stats) = st.adaptive_route_stats.get(&side_id) else {
        return false;
    };

    let recently_slow = stats.last_slow_observed_ms > 0
        && now_ms.saturating_sub(stats.last_slow_observed_ms)
            <= DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS;
    let low_bandwidth = stats.estimated_bandwidth_bps > 0
        && stats.estimated_bandwidth_bps <= CONTROL_SLOW_LINK_CAPACITY_BPS;

    stats.sample_count > 0 && (low_bandwidth || recently_slow)
}
2636
/// Decides how much discovery data to advertise on `side_id` right now.
///
/// * Side not classified as slow: the side's throttle entry is removed and
///   `Some(Full)` is returned.
/// * Slow side with the full deadline reached: both deadlines are pushed
///   forward and `Some(Full)` is returned.
/// * Slow side with only the ping deadline reached: the ping deadline is
///   pushed forward and `Some(MinimalPing)` is returned.
/// * Otherwise `None`.
#[cfg(feature = "discovery")]
fn discovery_level_for_side_locked(
    st: &mut RelayInner,
    side_id: RelaySideId,
    now_ms: u64,
) -> Option<DiscoveryAdvertiseLevel> {
    if !Self::side_is_slow_control_link_locked(st, side_id, now_ms) {
        st.discovery_side_throttle.remove(&side_id);
        return Some(DiscoveryAdvertiseLevel::Full);
    }

    let throttle = st.discovery_side_throttle.entry(side_id).or_default();
    let full_due = now_ms >= throttle.next_full_ms;
    let ping_due = now_ms >= throttle.next_ping_ms;
    if !(full_due || ping_due) {
        return None;
    }

    // Both outcomes below reschedule the next ping.
    throttle.next_ping_ms = now_ms.saturating_add(DISCOVERY_SLOW_LINK_PING_INTERVAL_MS);
    if full_due {
        throttle.next_full_ms = now_ms.saturating_add(DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS);
        Some(DiscoveryAdvertiseLevel::Full)
    } else {
        Some(DiscoveryAdvertiseLevel::MinimalPing)
    }
}
2660
/// Returns `true` for the three timesync data types: announce, request and
/// response.
#[cfg(all(feature = "discovery", feature = "timesync"))]
#[inline]
fn is_timesync_type(ty: crate::DataType) -> bool {
    use crate::DataType as Ty;
    matches!(
        ty,
        Ty::TimeSyncAnnounce | Ty::TimeSyncRequest | Ty::TimeSyncResponse
    )
}
2671
/// Decides whether an item of type `ty` may be sent on `side_id` now.
///
/// Non-timesync types are always allowed. For timesync types, a side that is
/// not classified as slow is allowed and its throttle entry is removed. A
/// slow side is allowed only once `now_ms` reaches the stored
/// `next_allowed_ms`, which is then advanced by
/// `TIMESYNC_SLOW_LINK_MIN_INTERVAL_MS`.
#[cfg(all(feature = "discovery", feature = "timesync"))]
fn timesync_allowed_for_side_locked(
    st: &mut RelayInner,
    side_id: RelaySideId,
    ty: crate::DataType,
    now_ms: u64,
) -> bool {
    if !Self::is_timesync_type(ty) {
        return true;
    }
    if !Self::side_is_slow_control_link_locked(st, side_id, now_ms) {
        st.timesync_side_throttle.remove(&side_id);
        return true;
    }

    let throttle = st.timesync_side_throttle.entry(side_id).or_default();
    let permitted = now_ms >= throttle.next_allowed_ms;
    if permitted {
        throttle.next_allowed_ms = now_ms.saturating_add(TIMESYNC_SLOW_LINK_MIN_INTERVAL_MS);
    }
    permitted
}
2694
/// Keeps, in order, the sides for which `timesync_allowed_for_side_locked`
/// returns `true`.
///
/// Each side is checked exactly once; the check may update throttle state in
/// `st`.
#[cfg(all(feature = "discovery", feature = "timesync"))]
fn filter_timesync_sides_locked(
    st: &mut RelayInner,
    ty: crate::DataType,
    now_ms: u64,
    sides: Vec<RelaySideId>,
) -> Vec<RelaySideId> {
    let mut allowed = sides;
    allowed.retain(|&side| Self::timesync_allowed_for_side_locked(st, side, ty, now_ms));
    allowed
}
2707
/// Queues discovery packets for every side that may carry
/// `DiscoveryAnnounce` traffic.
///
/// Phase 1 (state lock held): expired routes are pruned, the discovery
/// budget is fitted, the cadence is told an announce was sent, and a plan is
/// built for each occupied side slot. Sides for which
/// `discovery_level_for_side_locked` returns `None` are skipped.
///
/// Phase 2 (state lock re-acquired): for each plan the following packets are
/// built and pushed, in this order:
/// * `Full`: schema, link capabilities, announce (only when the endpoint
///   list is non-empty), timesync sources (when non-empty), topology (when
///   non-empty);
/// * `MinimalPing`: a single announce with an empty endpoint list.
///
/// Returns `Ok(())` without queuing anything when no side slot is occupied.
///
/// # Errors
/// Stops at the first packet-build, priority or `push_tx` error; packets
/// pushed before the failure remain queued.
#[cfg(feature = "discovery")]
fn queue_discovery_announce(&self) -> TelemetryResult<()> {
    let now_ms = self.clock.now_ms();

    // The lock is released at the end of this scope and taken again below.
    let plans = {
        let mut st = self.state.lock();
        if Self::prune_discovery_routes_locked(&mut st, now_ms) {
            self.reconcile_end_to_end_acked_destinations_locked(&mut st);
            Self::note_discovery_topology_change_locked(&mut st, now_ms);
        }
        st.fit_discovery_budget();
        if st.sides.iter().all(|slot| slot.is_none()) {
            return Ok(());
        }
        st.discovery_cadence.on_announce_sent(now_ms);

        // Copy out what is needed so `st` can be borrowed mutably in the loop.
        let occupied = st
            .sides
            .iter()
            .enumerate()
            .filter_map(|(side_id, slot)| slot.as_ref().map(|side| (side_id, side.opts)))
            .collect::<Vec<_>>();

        let mut plans = Vec::new();
        for (side_id, opts) in occupied {
            let announce_allowed = self.route_allowed_locked(
                &st,
                None,
                Some(crate::DataType::DiscoveryAnnounce),
                side_id,
            );
            if !announce_allowed {
                continue;
            }
            let Some(level) = Self::discovery_level_for_side_locked(&mut st, side_id, now_ms)
            else {
                continue;
            };
            let capabilities = opts.link_capabilities();
            let link_local_enabled = opts.link_local_enabled;

            let (endpoints, timesync_sources, topology) =
                if level == DiscoveryAdvertiseLevel::MinimalPing {
                    (Vec::new(), Vec::new(), Vec::new())
                } else {
                    (
                        self.advertised_discovery_endpoints_for_link_locked(
                            &st,
                            now_ms,
                            link_local_enabled,
                        ),
                        self.advertised_discovery_timesync_sources_for_link_locked(&st, now_ms),
                        self.advertised_discovery_topology_for_link_locked(
                            &st,
                            now_ms,
                            link_local_enabled,
                        ),
                    )
                };
            plans.push((
                side_id,
                level,
                endpoints,
                timesync_sources,
                topology,
                capabilities,
            ));
        }
        plans
    };

    let mut st = self.state.lock();
    // Shared tail of every send: compute the priority and push to the TX queue.
    let mut enqueue = |dst: RelaySideId, data: RelayItem| -> TelemetryResult<()> {
        let priority = Self::relay_item_priority(&data)?;
        st.push_tx(RelayTxItem {
            src: None,
            dst,
            data,
            priority,
        })?;
        Ok(())
    };

    for (dst, level, endpoints, timesync_sources, topology, capabilities) in plans {
        let sender = self.sender_arc();
        let full = level == DiscoveryAdvertiseLevel::Full;

        if full {
            let schema = discovery::build_discovery_schema(sender.as_ref(), now_ms)?;
            enqueue(dst, RelayItem::Packet(Arc::new(schema)))?;

            let caps = discovery::build_discovery_link_capabilities(
                sender.as_ref(),
                now_ms,
                capabilities,
            )?;
            enqueue(dst, RelayItem::Packet(Arc::new(caps)))?;
        }
        if level == DiscoveryAdvertiseLevel::MinimalPing || !endpoints.is_empty() {
            let announce = discovery::build_discovery_announce(
                sender.as_ref(),
                now_ms,
                endpoints.as_slice(),
            )?;
            enqueue(dst, RelayItem::Packet(Arc::new(announce)))?;
        }
        if full && !timesync_sources.is_empty() {
            let sources = discovery::build_discovery_timesync_sources(
                sender.as_ref(),
                now_ms,
                timesync_sources.as_slice(),
            )?;
            enqueue(dst, RelayItem::Packet(Arc::new(sources)))?;
        }
        if full && !topology.is_empty() {
            let boards = discovery::build_discovery_topology(sender.as_ref(), now_ms, &topology)?;
            enqueue(dst, RelayItem::Packet(Arc::new(boards)))?;
        }
    }
    Ok(())
}
2853
/// Queues a discovery announce when the cadence says one is due.
///
/// Expired routes are pruned and the discovery budget is fitted first.
/// Returns `Ok(false)` when no occupied side is allowed to carry
/// `DiscoveryAnnounce` traffic or when the cadence is not due; returns
/// `Ok(true)` after `queue_discovery_announce` succeeds.
///
/// # Errors
/// Propagates the error from `queue_discovery_announce`.
#[cfg(feature = "discovery")]
fn poll_discovery_announce(&self) -> TelemetryResult<bool> {
    let now_ms = self.clock.now_ms();
    let due = {
        let mut st = self.state.lock();
        if Self::prune_discovery_routes_locked(&mut st, now_ms) {
            self.reconcile_end_to_end_acked_destinations_locked(&mut st);
            Self::note_discovery_topology_change_locked(&mut st, now_ms);
        }
        st.fit_discovery_budget();

        // At least one occupied slot must be permitted to carry announces.
        let can_announce = st.sides.iter().enumerate().any(|(side_id, slot)| {
            slot.is_some()
                && self.route_allowed_locked(
                    &st,
                    None,
                    Some(crate::DataType::DiscoveryAnnounce),
                    side_id,
                )
        });
        if !can_announce {
            return Ok(false);
        }
        st.discovery_cadence.due(now_ms)
    };

    if due {
        self.queue_discovery_announce()?;
    }
    Ok(due)
}
2891
/// Updates discovery state from an item received on side `src`.
///
/// Items that are not discovery traffic, and packed frames reported as
/// ack-only, are ignored. Handling by data type:
/// * `DiscoverySchema`: room is made in the shared queue budget, the decoded
///   snapshot is merged, and a topology change is noted if the merge reports
///   a change.
/// * `DiscoveryLinkCapabilities`: the payload is decoded for validation only.
/// * `DiscoveryLeave`: the sender is removed as an announcer and from every
///   remaining announcer's boards and connection lists; emptied routes are
///   dropped.
/// * `DiscoveryAddress`, `DiscoveryAnnounce`, `DiscoveryTimeSyncSources`,
///   `DiscoveryTopology`: the announcer's entry for `src` is updated, its
///   `last_seen_ms` is refreshed, and a topology change is noted if the
///   stored data changed.
///
/// Link-local-only endpoints are discarded when `src` has
/// `link_local_enabled` unset or is not a known side. Updates are applied to
/// copies of the route and announcer state and only stored once decoding has
/// succeeded.
///
/// # Errors
/// Propagates peek/unpack/decode errors and errors from
/// `make_shared_queue_room` and the schema merge.
#[cfg(feature = "discovery")]
fn learn_discovery_item(&self, src: RelaySideId, data: &RelayItem) -> TelemetryResult<()> {
    let pkt = match data {
        RelayItem::Packet(shared) => {
            if !discovery::is_discovery_type(shared.data_type()) {
                return Ok(());
            }
            shared.as_ref().clone()
        }
        RelayItem::Packed(bytes) => {
            let envelope = wire_format::peek_envelope(bytes.as_ref())?;
            if !discovery::is_discovery_type(envelope.ty) {
                return Ok(());
            }
            let ack_only = wire_format::peek_frame_info(bytes.as_ref())
                .ok()
                .is_some_and(|frame| frame.ack_only());
            if ack_only {
                return Ok(());
            }
            wire_format::unpack_packet(bytes.as_ref())?
        }
    };

    let now_ms = self.clock.now_ms();
    let kind = pkt.data_type();

    if kind == crate::DataType::DiscoverySchema {
        let snapshot = discovery::decode_discovery_schema(&pkt)?;
        let incoming_cost = crate::config::owned_schema_byte_cost(&snapshot);
        {
            // The lock is released before the merge runs.
            let mut st = self.state.lock();
            st.make_shared_queue_room(incoming_cost, RelayQueueKind::Discovery)?;
        }
        let report =
            crate::config::merge_owned_schema_snapshot_with_budget(snapshot, MAX_QUEUE_BUDGET)?;
        if report.changed() {
            let mut st = self.state.lock();
            st.fit_discovery_budget();
            Self::note_discovery_topology_change_locked(&mut st, now_ms);
        }
        return Ok(());
    }

    if kind == crate::DataType::DiscoveryLinkCapabilities {
        let _ = discovery::decode_discovery_link_capabilities(&pkt)?;
        return Ok(());
    }

    let mut st = self.state.lock();

    if kind == crate::DataType::DiscoveryLeave {
        let leaving = pkt.sender();
        let snapshot = st.discovery_routes.clone();
        for route in st.discovery_routes.values_mut() {
            route.announcers.remove(leaving);
            for announcer in route.announcers.values_mut() {
                // Drop the leaver's board and unlink it from the boards that stay.
                announcer.topology_boards.retain_mut(|board| {
                    if board.sender_id == leaving {
                        return false;
                    }
                    board.connections.retain(|peer| peer != leaving);
                    true
                });
                Self::refresh_sender_topology_state(announcer);
            }
            Self::recompute_discovery_side_state(route);
        }
        st.discovery_routes
            .retain(|_, route| !route.announcers.is_empty());
        if snapshot != st.discovery_routes {
            Self::note_discovery_topology_change_locked(&mut st, now_ms);
        }
        let _ = Self::prune_discovery_routes_locked(&mut st, now_ms);
        self.reconcile_end_to_end_acked_destinations_locked(&mut st);
        return Ok(());
    }

    // Address advertisements are keyed by the advertised hostname, everything
    // else by the packet sender.
    let address_ad = if kind == crate::DataType::DiscoveryAddress {
        Some(discovery::decode_discovery_address(&pkt)?)
    } else {
        None
    };
    let announcer_id = match address_ad.as_ref() {
        Some(ad) => ad.hostname.clone(),
        None => pkt.sender().to_string(),
    };

    let mut route = st.discovery_routes.get(&src).cloned().unwrap_or_default();
    let side_link_local_enabled = st
        .sides
        .get(src)
        .and_then(|slot| slot.as_ref())
        .is_some_and(|side| side.opts.link_local_enabled);
    let mut sender_state = route
        .announcers
        .get(&announcer_id)
        .cloned()
        .unwrap_or_default();

    let strip_link_local = |endpoints: &mut Vec<crate::DataEndpoint>| {
        if !side_link_local_enabled {
            endpoints.retain(|ep| !ep.is_link_local_only());
        }
    };

    let changed = match kind {
        crate::DataType::DiscoveryAddress => {
            let ad = address_ad.expect("decoded above");
            let mut reachable = ad.reachable_endpoints;
            strip_link_local(&mut reachable);
            let board = Self::sender_topology_board_mut(&mut sender_state, &ad.hostname);
            let differs = board.reachable_endpoints != reachable
                || board.reachable_timesync_sources != ad.reachable_timesync_sources;
            board.reachable_endpoints = reachable;
            board.reachable_timesync_sources = ad.reachable_timesync_sources;
            Self::refresh_sender_topology_state(&mut sender_state);
            differs
        }
        crate::DataType::DiscoveryAnnounce => {
            let mut reachable = discovery::decode_discovery_announce(&pkt)?;
            strip_link_local(&mut reachable);
            let board = Self::sender_topology_board_mut(&mut sender_state, pkt.sender());
            let differs = board.reachable_endpoints != reachable;
            board.reachable_endpoints = reachable;
            Self::refresh_sender_topology_state(&mut sender_state);
            differs
        }
        crate::DataType::DiscoveryTimeSyncSources => {
            let sources = discovery::decode_discovery_timesync_sources(&pkt)?;
            let board = Self::sender_topology_board_mut(&mut sender_state, pkt.sender());
            let differs = board.reachable_timesync_sources != sources;
            board.reachable_timesync_sources = sources;
            Self::refresh_sender_topology_state(&mut sender_state);
            differs
        }
        crate::DataType::DiscoveryTopology => {
            let mut boards = discovery::decode_discovery_topology(&pkt)?;
            for board in boards.iter_mut() {
                strip_link_local(&mut board.reachable_endpoints);
            }
            let differs = sender_state.topology_boards != boards;
            sender_state.topology_boards = boards;
            Self::refresh_sender_topology_state(&mut sender_state);
            differs
        }
        // Any other discovery type only refreshes the announcer's timestamp.
        _ => false,
    };

    sender_state.last_seen_ms = now_ms;
    route.announcers.insert(announcer_id, sender_state);
    Self::recompute_discovery_side_state(&mut route);
    st.discovery_routes.insert(src, route);
    st.fit_discovery_budget();
    if changed {
        Self::note_discovery_topology_change_locked(&mut st, now_ms);
    }
    let _ = Self::prune_discovery_routes_locked(&mut st, now_ms);
    self.reconcile_end_to_end_acked_destinations_locked(&mut st);
    Ok(())
}
3046
3047 #[cfg(not(feature = "discovery"))]
3048 fn learn_discovery_item(&self, _src: RelaySideId, _data: &RelayItem) -> TelemetryResult<()> {
3049 Ok(())
3050 }
3051
3052 #[cfg(not(feature = "discovery"))]
3053 fn queue_discovery_announce(&self) -> TelemetryResult<()> {
3054 Ok(())
3055 }
3056
3057 #[cfg(not(feature = "discovery"))]
3058 fn poll_discovery_announce(&self) -> TelemetryResult<bool> {
3059 Ok(false)
3060 }
3061
3062 fn process_reliable_timeouts(&self) -> TelemetryResult<()> {
3063 let now = self.clock.now_ms();
3064 let mut requeue: Vec<(RelaySideId, crate::DataType, u32)> = Vec::new();
3065
3066 {
3067 let mut st = self.state.lock();
3068 if st.reliable_tx.is_empty() {
3069 return Ok(());
3070 }
3071
3072 for ((side, ty_u32), tx_state) in st.reliable_tx.iter_mut() {
3073 let Some(ty) = crate::DataType::try_from_u32(*ty_u32) else {
3074 continue;
3075 };
3076 let sent_order: Vec<u32> = tx_state.sent_order.iter().copied().collect();
3077 for seq in sent_order {
3078 let Some(sent) = tx_state.sent.get_mut(&seq) else {
3079 continue;
3080 };
3081 if sent.queued || now.wrapping_sub(sent.last_send_ms) < RELIABLE_RETRANSMIT_MS {
3082 continue;
3083 }
3084 if sent.partial_acked {
3085 continue;
3086 }
3087 if sent.retries >= RELIABLE_MAX_RETRIES {
3088 tx_state.sent.remove(&seq);
3089 tx_state.sent_order.retain(|existing| *existing != seq);
3090 continue;
3091 }
3092 sent.retries += 1;
3093 requeue.push((*side, ty, seq));
3094 }
3095 }
3096 }
3097
3098 for (side, ty, seq) in requeue {
3099 self.queue_reliable_retransmit(side, ty, seq)?;
3100 }
3101
3102 Ok(())
3103 }
3104
3105 fn get_hash(item: &RelayRxItem) -> u64 {
3109 match &item.data {
3110 RelayItem::Packet(pkt) => pkt.packet_id(),
3111 RelayItem::Packed(bytes) => {
3112 let reliable_seq = wire_format::peek_frame_info(bytes.as_ref())
3113 .ok()
3114 .and_then(|frame| frame.reliable)
3115 .and_then(|hdr| {
3116 if (hdr.flags & wire_format::RELIABLE_FLAG_ACK_ONLY) != 0 {
3117 None
3118 } else {
3119 Some(hdr.seq)
3120 }
3121 });
3122
3123 match wire_format::packet_id_from_wire(bytes.as_ref()) {
3124 Ok(id) => {
3125 if let Some(seq) = reliable_seq {
3126 hash_bytes_u64(id, &seq.to_le_bytes())
3127 } else {
3128 id
3129 }
3130 }
3131 Err(_e) => {
3132 let h: u64 = 0x9E37_79B9_7F4A_7C15;
3135 hash_bytes_u64(h, bytes.as_ref())
3136 }
3137 }
3138 }
3139 }
3140 }
3141
3142 fn is_duplicate_pkt(&self, item: &RelayRxItem) -> TelemetryResult<bool> {
3146 let id = Self::get_hash(item);
3147
3148 let mut st = self.state.lock();
3149 if st.recent_rx.contains(&id) {
3150 Ok(true)
3151 } else {
3152 st.push_recent_rx(id)?;
3153 Ok(false)
3154 }
3155 }
3156
3157 fn should_forward_duplicate_reliable_item(&self, item: &RelayRxItem) -> TelemetryResult<bool> {
3158 let (_, ty) = self.item_route_info(&item.data)?;
3159 if !is_reliable_type(ty)
3160 || matches!(
3161 ty,
3162 crate::DataType::ReliableAck
3163 | crate::DataType::ReliablePartialAck
3164 | crate::DataType::ReliablePacketRequest
3165 )
3166 {
3167 return Ok(false);
3168 }
3169
3170 let RemoteSidePlan::Target(sides) = self.remote_side_plan(&item.data, item.src)?;
3171 let st = self.state.lock();
3172 let now_ms = self.clock.now_ms();
3173 Ok(sides
3174 .into_iter()
3175 .any(|side| self.side_has_multiple_announcers_locked(&st, side, now_ms)))
3176 }
3177
3178 pub fn add_side_packed<F>(&self, name: &'static str, tx: F) -> RelaySideId
3183 where
3184 F: Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static,
3185 {
3186 self.add_side_packed_with_options(name, tx, RelaySideOptions::default())
3187 }
3188
3189 pub fn add_side_packed_small_packets<F>(
3193 &self,
3194 name: &'static str,
3195 tx: F,
3196 max_frame_bytes: usize,
3197 ) -> RelaySideId
3198 where
3199 F: Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static,
3200 {
3201 self.add_side_packed_with_options(
3202 name,
3203 tx,
3204 RelaySideOptions::default().with_small_packet_transport(max_frame_bytes),
3205 )
3206 }
3207
3208 pub fn add_side_packed_with_options<F>(
3214 &self,
3215 name: &'static str,
3216 tx: F,
3217 opts: RelaySideOptions,
3218 ) -> RelaySideId
3219 where
3220 F: Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static,
3221 {
3222 let mut st = self.state.lock();
3223 let id = st.sides.len();
3224 st.sides.push(Some(RelaySide {
3225 name,
3226 tx_handler: RelayTxHandlerFn::Packed(Arc::new(tx)),
3227 opts,
3228 }));
3229 st.side_runtime_stats
3230 .insert(id, SideRuntimeStatsInner::default());
3231 st.side_transport.insert(id, SideTransportState::default());
3232 #[cfg(feature = "discovery")]
3233 Self::note_discovery_topology_change_locked(&mut st, self.clock.now_ms());
3234 id
3235 }
3236
3237 pub fn add_side_packet<F>(&self, name: &'static str, tx: F) -> RelaySideId
3242 where
3243 F: Fn(&Packet) -> TelemetryResult<()> + Send + Sync + 'static,
3244 {
3245 self.add_side_packet_with_options(name, tx, RelaySideOptions::default())
3246 }
3247
3248 pub fn add_side_packet_with_options<F>(
3250 &self,
3251 name: &'static str,
3252 tx: F,
3253 opts: RelaySideOptions,
3254 ) -> RelaySideId
3255 where
3256 F: Fn(&Packet) -> TelemetryResult<()> + Send + Sync + 'static,
3257 {
3258 let mut st = self.state.lock();
3259 let id = st.sides.len();
3260 st.sides.push(Some(RelaySide {
3261 name,
3262 tx_handler: RelayTxHandlerFn::Packet(Arc::new(tx)),
3263 opts,
3264 }));
3265 st.side_runtime_stats
3266 .insert(id, SideRuntimeStatsInner::default());
3267 st.side_transport.insert(id, SideTransportState::default());
3268 #[cfg(feature = "discovery")]
3269 Self::note_discovery_topology_change_locked(&mut st, self.clock.now_ms());
3270 id
3271 }
3272
3273 pub fn remove_side(&self, side: RelaySideId) -> TelemetryResult<()> {
3278 let now_ms = self.clock.now_ms();
3279 let mut st = self.state.lock();
3280 let slot = st.sides.get_mut(side).ok_or(TelemetryError::BadArg)?;
3281 if slot.is_none() {
3282 return Err(TelemetryError::BadArg);
3283 }
3284 *slot = None;
3285 st.route_overrides
3286 .retain(|(src_side, dst_side), _| *src_side != Some(side) && *dst_side != side);
3287 st.typed_route_overrides
3288 .retain(|(src_side, _, dst_side), _| *src_side != Some(side) && *dst_side != side);
3289 st.route_weights
3290 .retain(|(src_side, dst_side), _| *src_side != Some(side) && *dst_side != side);
3291 st.route_priorities
3292 .retain(|(src_side, dst_side), _| *src_side != Some(side) && *dst_side != side);
3293 st.source_route_modes.remove(&Some(side));
3294 st.route_selection_cursors.remove(&Some(side));
3295 st.adaptive_route_stats.remove(&side);
3296 #[cfg(feature = "discovery")]
3297 st.discovery_side_throttle.remove(&side);
3298 #[cfg(all(feature = "discovery", feature = "timesync"))]
3299 st.timesync_side_throttle.remove(&side);
3300 st.side_runtime_stats.remove(&side);
3301 st.reliable_return_routes
3302 .retain(|_, route| route.side != side);
3303 st.rx_queue.retain(|queued| queued.src != side);
3304 st.tx_queue
3305 .retain(|queued| queued.dst != side && queued.src != Some(side));
3306 st.replay_queue.retain(|queued| queued.dst != side);
3307 st.reliable_tx.retain(|(side_id, _), _| *side_id != side);
3308 st.reliable_rx.retain(|(side_id, _), _| *side_id != side);
3309 #[cfg(feature = "discovery")]
3310 {
3311 st.discovery_routes.remove(&side);
3312 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3313 }
3314 Ok(())
3315 }
3316
3317 pub fn set_side_ingress_enabled(
3319 &self,
3320 side: RelaySideId,
3321 enabled: bool,
3322 ) -> TelemetryResult<()> {
3323 let now_ms = self.clock.now_ms();
3324 let mut st = self.state.lock();
3325 let side_ref = st
3326 .sides
3327 .get_mut(side)
3328 .and_then(|side| side.as_mut())
3329 .ok_or(TelemetryError::BadArg)?;
3330 side_ref.opts.ingress_enabled = enabled;
3331 #[cfg(feature = "discovery")]
3332 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3333 Ok(())
3334 }
3335
3336 pub fn set_side_egress_enabled(&self, side: RelaySideId, enabled: bool) -> TelemetryResult<()> {
3338 let now_ms = self.clock.now_ms();
3339 let mut st = self.state.lock();
3340 let side_ref = st
3341 .sides
3342 .get_mut(side)
3343 .and_then(|side| side.as_mut())
3344 .ok_or(TelemetryError::BadArg)?;
3345 side_ref.opts.egress_enabled = enabled;
3346 if !enabled {
3347 st.tx_queue.retain(|queued| queued.dst != side);
3348 st.replay_queue.retain(|queued| queued.dst != side);
3349 }
3350 #[cfg(feature = "discovery")]
3351 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3352 Ok(())
3353 }
3354
3355 pub fn set_source_route_mode(
3359 &self,
3360 src: Option<RelaySideId>,
3361 mode: RouteSelectionMode,
3362 ) -> TelemetryResult<()> {
3363 let now_ms = self.clock.now_ms();
3364 let mut st = self.state.lock();
3365 if let Some(src) = src {
3366 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3367 }
3368 if mode == RouteSelectionMode::Fanout {
3369 st.source_route_modes.remove(&src);
3370 } else {
3371 st.source_route_modes.insert(src, mode);
3372 }
3373 st.route_selection_cursors.remove(&src);
3374 #[cfg(feature = "discovery")]
3375 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3376 Ok(())
3377 }
3378
3379 pub fn clear_source_route_mode(&self, src: Option<RelaySideId>) -> TelemetryResult<()> {
3381 self.set_source_route_mode(src, RouteSelectionMode::Fanout)
3382 }
3383
3384 pub fn set_route_weight(
3386 &self,
3387 src: Option<RelaySideId>,
3388 dst: RelaySideId,
3389 weight: u32,
3390 ) -> TelemetryResult<()> {
3391 let now_ms = self.clock.now_ms();
3392 let mut st = self.state.lock();
3393 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3394 if let Some(src) = src {
3395 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3396 }
3397 st.route_weights.insert((src, dst), weight);
3398 st.route_selection_cursors.remove(&src);
3399 #[cfg(feature = "discovery")]
3400 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3401 Ok(())
3402 }
3403
3404 pub fn clear_route_weight(
3406 &self,
3407 src: Option<RelaySideId>,
3408 dst: RelaySideId,
3409 ) -> TelemetryResult<()> {
3410 let now_ms = self.clock.now_ms();
3411 let mut st = self.state.lock();
3412 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3413 if let Some(src) = src {
3414 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3415 }
3416 st.route_weights.remove(&(src, dst));
3417 st.route_selection_cursors.remove(&src);
3418 #[cfg(feature = "discovery")]
3419 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3420 Ok(())
3421 }
3422
3423 pub fn set_route_priority(
3425 &self,
3426 src: Option<RelaySideId>,
3427 dst: RelaySideId,
3428 priority: u32,
3429 ) -> TelemetryResult<()> {
3430 let now_ms = self.clock.now_ms();
3431 let mut st = self.state.lock();
3432 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3433 if let Some(src) = src {
3434 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3435 }
3436 st.route_priorities.insert((src, dst), priority);
3437 #[cfg(feature = "discovery")]
3438 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3439 Ok(())
3440 }
3441
3442 pub fn clear_route_priority(
3444 &self,
3445 src: Option<RelaySideId>,
3446 dst: RelaySideId,
3447 ) -> TelemetryResult<()> {
3448 let now_ms = self.clock.now_ms();
3449 let mut st = self.state.lock();
3450 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3451 if let Some(src) = src {
3452 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3453 }
3454 st.route_priorities.remove(&(src, dst));
3455 #[cfg(feature = "discovery")]
3456 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3457 Ok(())
3458 }
3459
3460 pub fn set_route(
3462 &self,
3463 src: Option<RelaySideId>,
3464 dst: RelaySideId,
3465 enabled: bool,
3466 ) -> TelemetryResult<()> {
3467 let now_ms = self.clock.now_ms();
3468 let mut st = self.state.lock();
3469 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3470 if let Some(src) = src {
3471 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3472 }
3473 st.route_overrides.insert((src, dst), enabled);
3474 #[cfg(feature = "discovery")]
3475 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3476 Ok(())
3477 }
3478
3479 pub fn set_typed_route(
3481 &self,
3482 src: Option<RelaySideId>,
3483 ty: crate::DataType,
3484 dst: RelaySideId,
3485 enabled: bool,
3486 ) -> TelemetryResult<()> {
3487 let now_ms = self.clock.now_ms();
3488 let mut st = self.state.lock();
3489 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3490 if let Some(src) = src {
3491 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3492 }
3493 st.typed_route_overrides
3494 .insert((src, ty.as_u32(), dst), enabled);
3495 #[cfg(feature = "discovery")]
3496 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3497 Ok(())
3498 }
3499
3500 pub fn clear_typed_route(
3502 &self,
3503 src: Option<RelaySideId>,
3504 ty: crate::DataType,
3505 dst: RelaySideId,
3506 ) -> TelemetryResult<()> {
3507 let now_ms = self.clock.now_ms();
3508 let mut st = self.state.lock();
3509 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3510 if let Some(src) = src {
3511 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3512 }
3513 st.typed_route_overrides.remove(&(src, ty.as_u32(), dst));
3514 #[cfg(feature = "discovery")]
3515 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3516 Ok(())
3517 }
3518
3519 pub fn clear_route(&self, src: Option<RelaySideId>, dst: RelaySideId) -> TelemetryResult<()> {
3521 let now_ms = self.clock.now_ms();
3522 let mut st = self.state.lock();
3523 let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3524 if let Some(src) = src {
3525 let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3526 }
3527 st.route_overrides.remove(&(src, dst));
3528 #[cfg(feature = "discovery")]
3529 Self::note_discovery_topology_change_locked(&mut st, now_ms);
3530 Ok(())
3531 }
3532
/// Public entry point that forwards to `queue_discovery_announce`.
#[cfg(feature = "discovery")]
pub fn announce_discovery(&self) -> TelemetryResult<()> {
    Self::queue_discovery_announce(self)
}
3538
/// Queues a discovery "leave" packet on every live side.
///
/// The packet is built once with the sender name `"relay"` and the current
/// clock time, then enqueued for transmission to each side whose slot is
/// occupied, with no source side attached.
///
/// This method depends on the `discovery` module, which is only imported when
/// the `discovery` feature is enabled, so it is gated on that feature like
/// the other discovery entry points.
///
/// # Errors
///
/// Propagates failures from building the leave packet, computing its
/// priority, or pushing onto the TX queue.
#[cfg(feature = "discovery")]
pub fn announce_leave(&self) -> TelemetryResult<()> {
    // One shared allocation serves every destination; each queue entry only
    // holds a reference-counted handle.
    let pkt = Arc::new(discovery::build_discovery_leave(
        "relay",
        self.clock.now_ms(),
    )?);
    let mut st = self.state.lock();
    // Collect ids first: pushing onto the TX queue needs `st` mutably.
    let dsts: Vec<usize> = st
        .sides
        .iter()
        .enumerate()
        .filter_map(|(idx, side)| side.as_ref().map(|_| idx))
        .collect();
    for dst in dsts {
        let data = RelayItem::Packet(Arc::clone(&pkt));
        let priority = Self::relay_item_priority(&data)?;
        st.push_tx(RelayTxItem {
            src: None,
            dst,
            data,
            priority,
        })?;
    }
    Ok(())
}
3561
/// Public entry point that forwards to `poll_discovery_announce` and returns
/// its result unchanged.
#[cfg(feature = "discovery")]
pub fn poll_discovery(&self) -> TelemetryResult<bool> {
    Self::poll_discovery_announce(self)
}
3567
/// Builds a snapshot of the discovery topology as currently known.
///
/// Discovery routes are pruned first; if pruning changed anything, the
/// end-to-end acked destinations are reconciled and a topology change is
/// recorded. Routes whose side no longer exists are skipped, and router
/// control endpoints are filtered out of the per-route and per-announcer
/// reachable endpoint lists.
#[cfg(feature = "discovery")]
pub fn export_topology(&self) -> TopologySnapshot {
    let now_ms = self.clock.now_ms();
    let mut st = self.state.lock();

    let pruned = Self::prune_discovery_routes_locked(&mut st, now_ms);
    if pruned {
        self.reconcile_end_to_end_acked_destinations_locked(&mut st);
        Self::note_discovery_topology_change_locked(&mut st, now_ms);
    }

    let routes = st
        .discovery_routes
        .iter()
        .filter_map(|(&side_id, route)| {
            // Skip routes whose side slot is missing or has been emptied.
            let side = st.sides.get(side_id)?.as_ref()?;

            let announcers = route
                .announcers
                .iter()
                .map(|(sender_id, announcer)| {
                    let reachable_endpoints = announcer
                        .reachable
                        .iter()
                        .filter(|ep| !discovery::is_router_control_endpoint(**ep))
                        .copied()
                        .collect();
                    TopologyAnnouncerRoute {
                        sender_id: sender_id.clone(),
                        reachable_endpoints,
                        reachable_timesync_sources: announcer
                            .reachable_timesync_sources
                            .clone(),
                        routers: announcer.topology_boards.clone(),
                        last_seen_ms: announcer.last_seen_ms,
                        age_ms: now_ms.saturating_sub(announcer.last_seen_ms),
                    }
                })
                .collect();

            let reachable_endpoints = route
                .reachable
                .iter()
                .filter(|ep| !discovery::is_router_control_endpoint(**ep))
                .copied()
                .collect();

            Some(TopologySideRoute {
                side_id,
                side_name: side.name,
                reachable_endpoints,
                reachable_timesync_sources: route.reachable_timesync_sources.clone(),
                announcers,
                last_seen_ms: route.last_seen_ms,
                age_ms: now_ms.saturating_sub(route.last_seen_ms),
            })
        })
        .collect();

    let routers = self.advertised_discovery_topology_for_link_locked(&st, now_ms, true);
    let advertised_endpoints =
        self.advertised_discovery_endpoints_for_link_locked(&st, now_ms, true);
    let advertised_timesync_sources =
        self.advertised_discovery_timesync_sources_for_link_locked(&st, now_ms);
    let links = discovery::topology_links_from_boards(&routers);

    let cadence = &st.discovery_cadence;
    TopologySnapshot {
        advertised_endpoints,
        advertised_timesync_sources,
        routers,
        links,
        routes,
        current_announce_interval_ms: cadence.current_interval_ms,
        next_announce_ms: cadence.next_announce_ms,
    }
}
3631
/// Aggregates what is known about the announcer `sender_id` across all
/// discovery routes.
///
/// Returns `None` when no route lists the sender. Otherwise the snapshot
/// carries the sorted, de-duplicated side ids/names, reachable endpoints
/// (router control endpoints excluded) and timesync sources, the most recent
/// `last_seen_ms`, and packet/byte counters summed (saturating) from the
/// runtime stats of every side the sender was seen on. Note that those
/// counters are the per-side totals, not counters tracked per sender.
/// `connected` is true when the sender's age does not exceed
/// `DISCOVERY_ROUTE_TTL_MS`.
#[cfg(feature = "discovery")]
pub fn client_stats(&self, sender_id: &str) -> Option<ClientStatsSnapshot> {
    let now_ms = self.clock.now_ms();
    let st = self.state.lock();

    let mut side_ids = Vec::new();
    let mut side_names = Vec::new();
    let mut reachable_endpoints = Vec::new();
    let mut reachable_timesync_sources = Vec::new();
    let mut last_seen_ms: Option<u64> = None;
    let (mut packets_sent, mut packets_received) = (0u64, 0u64);
    let (mut bytes_sent, mut bytes_received) = (0u64, 0u64);

    // Only routes on which this sender has announced itself are relevant.
    let seen_on = st.discovery_routes.iter().filter_map(|(side_id, route)| {
        route
            .announcers
            .get(sender_id)
            .map(|announcer| (side_id, announcer))
    });

    for (side_id, announcer) in seen_on {
        side_ids.push(*side_id);
        if let Some(Some(side)) = st.sides.get(*side_id) {
            side_names.push(side.name);
        }

        let seen = announcer.last_seen_ms;
        last_seen_ms = Some(last_seen_ms.map_or(seen, |newest| newest.max(seen)));

        reachable_endpoints.extend(
            announcer
                .reachable
                .iter()
                .copied()
                .filter(|ep| !discovery::is_router_control_endpoint(*ep)),
        );
        reachable_timesync_sources
            .extend(announcer.reachable_timesync_sources.iter().cloned());

        if let Some(side_stats) = st.side_runtime_stats.get(side_id) {
            packets_sent = packets_sent.saturating_add(side_stats.tx_packets);
            packets_received = packets_received.saturating_add(side_stats.rx_packets);
            bytes_sent = bytes_sent.saturating_add(side_stats.tx_bytes);
            bytes_received = bytes_received.saturating_add(side_stats.rx_bytes);
        }
    }

    if side_ids.is_empty() {
        return None;
    }

    reachable_endpoints.sort_unstable();
    reachable_endpoints.dedup();
    reachable_timesync_sources.sort_unstable();
    reachable_timesync_sources.dedup();
    side_ids.sort_unstable();
    side_ids.dedup();
    side_names.sort_unstable();
    side_names.dedup();

    let age_ms = last_seen_ms.map(|seen| now_ms.saturating_sub(seen));
    let connected = matches!(age_ms, Some(age) if age <= DISCOVERY_ROUTE_TTL_MS);

    Some(ClientStatsSnapshot {
        sender_id: sender_id.to_string(),
        connected,
        side_ids,
        side_names,
        last_seen_ms,
        age_ms,
        reachable_endpoints,
        reachable_timesync_sources,
        packets_sent,
        packets_received,
        bytes_sent,
        bytes_received,
    })
}
3699
3700 pub fn export_runtime_stats(&self) -> RuntimeStatsSnapshot {
3701 let now_ms = self.clock.now_ms();
3702 let st = self.state.lock();
3703
3704 let mut sides = Vec::new();
3705 for (side_id, side) in st.sides.iter().enumerate() {
3706 let Some(side) = side.as_ref() else { continue };
3707 let stats = st
3708 .side_runtime_stats
3709 .get(&side_id)
3710 .cloned()
3711 .unwrap_or_default();
3712 let adaptive = st
3713 .adaptive_route_stats
3714 .get(&side_id)
3715 .cloned()
3716 .unwrap_or_default()
3717 .snapshot(now_ms, true);
3718 let (tx_template_count, rx_template_count) = st
3719 .side_transport
3720 .get(&side_id)
3721 .map(|state| (state.tx_template_count(), state.rx_template_count()))
3722 .unwrap_or((0, 0));
3723 let mut data_types: Vec<RuntimeTypeStats> = stats
3724 .data_types
3725 .into_iter()
3726 .map(|(ty, item)| RuntimeTypeStats {
3727 data_type: crate::DataType(ty),
3728 tx_packets: item.tx_packets,
3729 tx_bytes: item.tx_bytes,
3730 rx_packets: item.rx_packets,
3731 rx_bytes: item.rx_bytes,
3732 relayed_tx_packets: item.relayed_tx_packets,
3733 relayed_tx_bytes: item.relayed_tx_bytes,
3734 relayed_rx_packets: item.relayed_rx_packets,
3735 relayed_rx_bytes: item.relayed_rx_bytes,
3736 tx_retries: item.tx_retries,
3737 handler_failures: item.handler_failures,
3738 })
3739 .collect();
3740 data_types.sort_unstable_by_key(|item| item.data_type.as_u32());
3741 sides.push(RuntimeSideStats {
3742 side_id,
3743 side_name: side.name,
3744 reliable_enabled: side.opts.reliable_enabled,
3745 link_local_enabled: side.opts.link_local_enabled,
3746 header_template_enabled: side.opts.header_template_enabled,
3747 max_frame_bytes: side.opts.max_frame_bytes,
3748 compact_header_target_bytes: side.opts.compact_header_target_bytes,
3749 side_transport_profile: side.opts.effective_transport_profile().as_str(),
3750 ingress_enabled: side.opts.ingress_enabled,
3751 egress_enabled: side.opts.egress_enabled,
3752 tx_packets: stats.tx_packets,
3753 tx_bytes: stats.tx_bytes,
3754 rx_packets: stats.rx_packets,
3755 rx_bytes: stats.rx_bytes,
3756 relayed_tx_packets: stats.relayed_tx_packets,
3757 relayed_tx_bytes: stats.relayed_tx_bytes,
3758 relayed_rx_packets: stats.relayed_rx_packets,
3759 relayed_rx_bytes: stats.relayed_rx_bytes,
3760 local_delivery_packets: 0,
3761 tx_retries: stats.tx_retries,
3762 tx_handler_failures: stats.tx_handler_failures,
3763 local_handler_failures: 0,
3764 total_handler_retries: stats.total_handler_retries,
3765 side_transport_full_frames: stats.side_transport_full_frames,
3766 side_transport_compact_frames: stats.side_transport_compact_frames,
3767 side_transport_compact_delta_frames: stats.side_transport_compact_delta_frames,
3768 side_transport_compact_omitted_timestamp_frames: stats
3769 .side_transport_compact_omitted_timestamp_frames,
3770 side_transport_chunk_frames: stats.side_transport_chunk_frames,
3771 side_transport_raw_bytes: stats.side_transport_raw_bytes,
3772 side_transport_wire_bytes: stats.side_transport_wire_bytes,
3773 side_transport_bytes_saved: stats.side_transport_bytes_saved,
3774 side_transport_min_compact_overhead_bytes: stats
3775 .side_transport_min_compact_overhead_bytes,
3776 side_transport_max_compact_overhead_bytes: stats
3777 .side_transport_max_compact_overhead_bytes,
3778 side_transport_compact_target_misses: stats.side_transport_compact_target_misses,
3779 side_transport_template_evictions: stats.side_transport_template_evictions,
3780 side_transport_tx_template_count: tx_template_count,
3781 side_transport_rx_template_count: rx_template_count,
3782 max_side_transport_templates: side.opts.max_side_transport_templates,
3783 adaptive,
3784 data_types,
3785 });
3786 }
3787
3788 let mut route_modes: Vec<RouteModeStats> = st
3789 .route_selection_cursors
3790 .iter()
3791 .map(|(src, cursor)| RouteModeStats {
3792 src_side_id: *src,
3793 selection_mode: st.source_route_modes.get(src).copied(),
3794 cursor: *cursor,
3795 })
3796 .collect();
3797 for src in st.source_route_modes.keys() {
3798 if !route_modes.iter().any(|mode| mode.src_side_id == *src) {
3799 route_modes.push(RouteModeStats {
3800 src_side_id: *src,
3801 selection_mode: st.source_route_modes.get(src).copied(),
3802 cursor: 0,
3803 });
3804 }
3805 }
3806 route_modes.sort_unstable_by_key(|mode| mode.src_side_id.unwrap_or(usize::MAX));
3807
3808 let mut route_overrides: Vec<RouteOverrideStats> = st
3809 .route_overrides
3810 .iter()
3811 .map(|((src, dst), enabled)| RouteOverrideStats {
3812 src_side_id: *src,
3813 dst_side_id: *dst,
3814 enabled: *enabled,
3815 })
3816 .collect();
3817 route_overrides.sort_unstable_by_key(|item| {
3818 (item.src_side_id.unwrap_or(usize::MAX), item.dst_side_id)
3819 });
3820
3821 let mut typed_route_overrides: Vec<TypedRouteOverrideStats> = st
3822 .typed_route_overrides
3823 .iter()
3824 .map(|((src, ty, dst), enabled)| TypedRouteOverrideStats {
3825 src_side_id: *src,
3826 data_type: crate::DataType(*ty),
3827 dst_side_id: *dst,
3828 enabled: *enabled,
3829 })
3830 .collect();
3831 typed_route_overrides.sort_unstable_by_key(|item| {
3832 (
3833 item.src_side_id.unwrap_or(usize::MAX),
3834 item.data_type.as_u32(),
3835 item.dst_side_id,
3836 )
3837 });
3838
3839 let mut route_weights: Vec<RouteWeightStats> = st
3840 .route_weights
3841 .iter()
3842 .map(|((src, dst), weight)| RouteWeightStats {
3843 src_side_id: *src,
3844 dst_side_id: *dst,
3845 weight: *weight,
3846 })
3847 .collect();
3848 route_weights.sort_unstable_by_key(|item| {
3849 (item.src_side_id.unwrap_or(usize::MAX), item.dst_side_id)
3850 });
3851
3852 let mut route_priorities: Vec<RoutePriorityStats> = st
3853 .route_priorities
3854 .iter()
3855 .map(|((src, dst), priority)| RoutePriorityStats {
3856 src_side_id: *src,
3857 dst_side_id: *dst,
3858 priority: *priority,
3859 })
3860 .collect();
3861 route_priorities.sort_unstable_by_key(|item| {
3862 (item.src_side_id.unwrap_or(usize::MAX), item.dst_side_id)
3863 });
3864
3865 #[cfg(feature = "discovery")]
3866 let discovery = DiscoveryRuntimeStats {
3867 route_count: st.discovery_routes.len(),
3868 announcer_count: st
3869 .discovery_routes
3870 .values()
3871 .map(|route| route.announcers.len())
3872 .sum(),
3873 current_announce_interval_ms: Some(st.discovery_cadence.current_interval_ms),
3874 next_announce_ms: Some(st.discovery_cadence.next_announce_ms),
3875 };
3876 #[cfg(not(feature = "discovery"))]
3877 let discovery = DiscoveryRuntimeStats {
3878 route_count: 0,
3879 announcer_count: 0,
3880 current_announce_interval_ms: None,
3881 next_announce_ms: None,
3882 };
3883
3884 RuntimeStatsSnapshot {
3885 sides,
3886 route_modes,
3887 route_overrides,
3888 typed_route_overrides,
3889 route_weights,
3890 route_priorities,
3891 queues: QueueRuntimeStats {
3892 rx_len: st.rx_queue.len(),
3893 rx_bytes: st.rx_queue.bytes_used(),
3894 tx_len: st.tx_queue.len(),
3895 tx_bytes: st.tx_queue.bytes_used(),
3896 replay_len: st.replay_queue.len(),
3897 replay_bytes: st.replay_queue.bytes_used(),
3898 recent_rx_len: st.recent_rx.len(),
3899 recent_rx_bytes: st.recent_rx.bytes_used(),
3900 reliable_rx_buffered_len: st.reliable_rx_buffer_len(),
3901 reliable_rx_buffered_bytes: st.reliable_rx_buffered_bytes(),
3902 shared_queue_bytes_used: st.shared_queue_bytes_used(),
3903 },
3904 reliable: ReliableRuntimeStats {
3905 reliable_return_route_count: st.reliable_return_routes.len(),
3906 end_to_end_pending_count: 0,
3907 end_to_end_pending_destination_count: 0,
3908 end_to_end_acked_cache_count: st.end_to_end_acked_destinations.len(),
3909 },
3910 discovery,
3911 total_handler_failures: st.total_handler_failures,
3912 total_handler_retries: st.total_handler_retries,
3913 }
3914 }
3915
3916 pub fn export_memory_layout_json(&self) -> String {
3918 let st = self.state.lock();
3919 #[cfg(feature = "discovery")]
3920 let discovery_bytes = st.discovery_bytes_used();
3921 #[cfg(not(feature = "discovery"))]
3922 let discovery_bytes = 0usize;
3923 let schema_bytes = crate::config::schema_bytes_used();
3924 let mut out = String::new();
3925 let _ = core::fmt::Write::write_fmt(
3926 &mut out,
3927 format_args!(
3928 "{{\"kind\":\"relay\",\
3929 \"shared_queue_bytes_used\":{},\"shared_queue_bytes_allocated\":{},\
3930 \"rx_queue_bytes_used\":{},\"rx_queue_bytes_allocated\":{},\"rx_queue_len\":{},\
3931 \"tx_queue_bytes_used\":{},\"tx_queue_bytes_allocated\":{},\"tx_queue_len\":{},\
3932 \"replay_queue_bytes_used\":{},\"replay_queue_bytes_allocated\":{},\"replay_queue_len\":{},\
3933 \"recent_rx_bytes_used\":{},\"recent_rx_bytes_allocated\":{},\"recent_rx_len\":{},\
3934 \"reliable_rx_buffer_bytes_used\":{},\"reliable_rx_buffer_bytes_allocated\":{},\"reliable_rx_buffer_len\":{},\
3935 \"discovery_bytes_used\":{},\"discovery_bytes_allocated\":{},\
3936 \"schema_bytes_used\":{},\"schema_bytes_allocated\":{}}}",
3937 st.shared_queue_bytes_used(),
3938 MAX_QUEUE_BUDGET,
3939 st.rx_queue.bytes_used(),
3940 st.rx_queue.max_bytes(),
3941 st.rx_queue.len(),
3942 st.tx_queue.bytes_used(),
3943 st.tx_queue.max_bytes(),
3944 st.tx_queue.len(),
3945 st.replay_queue.bytes_used(),
3946 st.replay_queue.max_bytes(),
3947 st.replay_queue.len(),
3948 st.recent_rx.bytes_used(),
3949 st.recent_rx.max_bytes(),
3950 st.recent_rx.len(),
3951 st.reliable_rx_buffered_bytes(),
3952 MAX_QUEUE_BUDGET,
3953 st.reliable_rx_buffer_len(),
3954 discovery_bytes,
3955 MAX_QUEUE_BUDGET,
3956 schema_bytes,
3957 MAX_QUEUE_BUDGET,
3958 ),
3959 );
3960 out
3961 }
3962
/// Test helper: number of destinations recorded as having acknowledged
/// `packet_id`, or `None` when the packet has no entry.
#[cfg(test)]
pub(crate) fn debug_end_to_end_acked_destination_count(&self, packet_id: u64) -> Option<usize> {
    let guard = self.state.lock();
    let destinations = guard.end_to_end_acked_destinations.get(&packet_id)?;
    Some(destinations.len())
}
3970
/// Test helper: number of packet ids present in the end-to-end acked
/// destination table.
#[cfg(test)]
pub(crate) fn debug_end_to_end_acked_packet_count(&self) -> usize {
    self.state.lock().end_to_end_acked_destinations.len()
}
3976
/// Test helper: number of reliable return routes currently stored.
#[cfg(test)]
pub(crate) fn debug_reliable_return_route_count(&self) -> usize {
    self.state.lock().reliable_return_routes.len()
}
3982
3983 pub fn rx_packed_from_side(&self, src: RelaySideId, bytes: &[u8]) -> TelemetryResult<()> {
3988 self.ensure_side_ingress_enabled(src)?;
3989 let Some(bytes) = self.decode_side_transport_frame(src, bytes)? else {
3990 return Ok(());
3991 };
3992 let mut st = self.state.lock();
3993
3994 let data = RelayItem::Packed(bytes);
3995 let priority = Self::relay_item_priority(&data)?;
3996 st.push_rx(RelayRxItem {
3997 src,
3998 data,
3999 priority,
4000 })
4001 }
4002
4003 pub fn rx_from_side(&self, src: RelaySideId, packet: Packet) -> TelemetryResult<()> {
4007 self.ensure_side_ingress_enabled(src)?;
4008 let mut st = self.state.lock();
4009
4010 let data = RelayItem::Packet(Arc::new(packet));
4011 let priority = Self::relay_item_priority(&data)?;
4012 st.push_rx(RelayRxItem {
4013 src,
4014 data,
4015 priority,
4016 })
4017 }
4018
4019 pub fn clear_queues(&self) {
4021 let mut st = self.state.lock();
4022 st.rx_queue.clear();
4023 st.tx_queue.clear();
4024 }
4025
4026 pub fn clear_rx_queue(&self) {
4028 let mut st = self.state.lock();
4029 st.rx_queue.clear();
4030 }
4031
4032 pub fn clear_tx_queue(&self) {
4034 let mut st = self.state.lock();
4035 st.tx_queue.clear();
4036 st.replay_queue.clear();
4037 }
4038
4039 fn process_rx_queue_item(&self, item: RelayRxItem) -> TelemetryResult<()> {
4043 self.ensure_side_ingress_enabled(item.src)?;
4044 match &item.data {
4045 RelayItem::Packet(pkt) => {
4046 let bytes = wire_format::pack_packet(pkt).len();
4047 self.note_side_rx(item.src, pkt.data_type(), bytes);
4048 }
4049 RelayItem::Packed(bytes) => {
4050 if let Ok(env) = wire_format::peek_envelope(bytes.as_ref()) {
4051 self.note_side_rx(item.src, env.ty, bytes.len());
4052 }
4053 }
4054 }
4055 match &item.data {
4056 RelayItem::Packet(pkt) => {
4057 if is_reliable_type(pkt.data_type()) && !is_internal_control_type(pkt.data_type()) {
4058 self.note_reliable_return_route(item.src, pkt.packet_id());
4059 }
4060 }
4061 RelayItem::Packed(bytes) => {
4062 if let Ok(env) = wire_format::peek_envelope(bytes.as_ref())
4063 && is_reliable_type(env.ty)
4064 && !is_internal_control_type(env.ty)
4065 && let Ok(packet_id) = wire_format::packet_id_from_wire(bytes.as_ref())
4066 {
4067 self.note_reliable_return_route(item.src, packet_id);
4068 }
4069 }
4070 }
4071 let mut released_buffered: Vec<Arc<[u8]>> = Vec::new();
4072 if let RelayItem::Packed(bytes) = &item.data {
4073 let (_opts, handler_is_packed, hop_reliable_enabled) = {
4074 let st = self.state.lock();
4075 let side_ref = Self::side_ref(&st, item.src)?;
4076 let opts = side_ref.opts;
4077 (
4078 opts,
4079 matches!(side_ref.tx_handler, RelayTxHandlerFn::Packed(_)),
4080 opts.reliable_enabled
4081 && !self.side_has_multiple_announcers_locked(
4082 &st,
4083 item.src,
4084 self.clock.now_ms(),
4085 ),
4086 )
4087 };
4088
4089 let frame = match wire_format::peek_frame_info(bytes.as_ref()) {
4090 Ok(frame) => frame,
4091 Err(e) => {
4092 if matches!(e, TelemetryError::Unpack(msg) if msg == "crc32 mismatch")
4093 && hop_reliable_enabled
4094 && handler_is_packed
4095 && let Ok(frame) = wire_format::peek_frame_info_unchecked(bytes.as_ref())
4096 {
4097 if is_reliable_type(frame.envelope.ty)
4098 && let Some(hdr) = frame.reliable
4099 {
4100 let unordered = (hdr.flags & wire_format::RELIABLE_FLAG_UNORDERED) != 0;
4101 let unsequenced =
4102 (hdr.flags & wire_format::RELIABLE_FLAG_UNSEQUENCED) != 0;
4103
4104 if !unsequenced {
4105 let requested = if unordered {
4106 hdr.seq
4107 } else {
4108 let mut st = self.state.lock();
4109 let rx_state = self.reliable_rx_state_mut(
4110 &mut st,
4111 item.src,
4112 frame.envelope.ty,
4113 );
4114 rx_state.expected_seq.min(hdr.seq)
4115 };
4116 self.queue_reliable_packet_request(
4117 item.src,
4118 frame.envelope.ty,
4119 requested,
4120 )?;
4121 }
4122 }
4123 return Ok(());
4124 }
4125 return Err(e);
4126 }
4127 };
4128
4129 if hop_reliable_enabled
4130 && handler_is_packed
4131 && is_reliable_type(frame.envelope.ty)
4132 && let Some(hdr) = frame.reliable
4133 {
4134 if frame.ack_only() {
4135 self.handle_reliable_ack(item.src, frame.envelope.ty, hdr.ack);
4136 return Ok(());
4137 }
4138 let unordered = (hdr.flags & wire_format::RELIABLE_FLAG_UNORDERED) != 0;
4139 let unsequenced = (hdr.flags & wire_format::RELIABLE_FLAG_UNSEQUENCED) != 0;
4140
4141 if !unsequenced {
4142 if unordered {
4143 self.queue_reliable_ack(item.src, frame.envelope.ty, hdr.seq)?;
4144 } else {
4145 let mut release: Vec<Arc<[u8]>> = Vec::new();
4146 let mut last_delivered = None;
4147 let mut ack_old = None;
4148 let mut request_missing = None;
4149 let mut partial_ack = None;
4150 {
4151 let mut st = self.state.lock();
4152 let rx_state =
4153 self.reliable_rx_state_mut(&mut st, item.src, frame.envelope.ty);
4154 let expected_seq = rx_state.expected_seq;
4155 if hdr.seq < expected_seq {
4156 ack_old = Some(expected_seq.saturating_sub(1));
4157 } else if hdr.seq > expected_seq {
4158 request_missing = Some(expected_seq);
4159 partial_ack = Some(hdr.seq);
4160 st.buffer_reliable_rx(
4161 item.src,
4162 frame.envelope.ty,
4163 hdr.seq,
4164 bytes.clone(),
4165 )?;
4166 } else {
4167 release.push(bytes.clone());
4168 last_delivered = Some(hdr.seq);
4169 let mut next_expected = hdr.seq.wrapping_add(1);
4170 while let Some(buf) = rx_state.buffered.remove(&next_expected) {
4171 release.push(buf);
4172 last_delivered = Some(next_expected);
4173 let next = next_expected.wrapping_add(1);
4174 next_expected = if next == 0 { 1 } else { next };
4175 }
4176 rx_state.expected_seq = next_expected;
4177 }
4178 }
4179
4180 if let Some(ack_seq) = ack_old {
4181 self.queue_reliable_ack(item.src, frame.envelope.ty, ack_seq)?;
4182 return Ok(());
4183 }
4184 if let Some(request_seq) = request_missing {
4185 if let Some(partial_seq) = partial_ack {
4186 self.queue_reliable_partial_ack(
4187 item.src,
4188 frame.envelope.ty,
4189 partial_seq,
4190 )?;
4191 }
4192 self.queue_reliable_packet_request(
4193 item.src,
4194 frame.envelope.ty,
4195 request_seq,
4196 )?;
4197 return Ok(());
4198 }
4199 if let Some(ack_seq) = last_delivered {
4200 self.queue_reliable_ack(item.src, frame.envelope.ty, ack_seq)?;
4201 }
4202 released_buffered.extend(release.into_iter().skip(1));
4203 }
4204 }
4205 }
4206 }
4207
4208 if self.is_duplicate_pkt(&item)? && !self.should_forward_duplicate_reliable_item(&item)? {
4209 return Ok(());
4211 }
4212
4213 self.dispatch_relay_rx_item(&item)?;
4214
4215 for release_bytes in released_buffered {
4216 let release_item = RelayRxItem {
4217 src: item.src,
4218 priority: Self::relay_item_priority(&RelayItem::Packed(release_bytes.clone()))?,
4219 data: RelayItem::Packed(release_bytes),
4220 };
4221 if self.is_duplicate_pkt(&release_item)?
4222 && !self.should_forward_duplicate_reliable_item(&release_item)?
4223 {
4224 continue;
4225 }
4226 self.dispatch_relay_rx_item(&release_item)?;
4227 }
4228 Ok(())
4229 }
4230
4231 fn dispatch_relay_rx_item(&self, item: &RelayRxItem) -> TelemetryResult<()> {
4232 match &item.data {
4233 RelayItem::Packet(pkt) => {
4234 if matches!(
4235 pkt.data_type(),
4236 crate::DataType::ReliableAck
4237 | crate::DataType::ReliablePartialAck
4238 | crate::DataType::ReliablePacketRequest
4239 ) {
4240 if pkt.data_type() == crate::DataType::ReliableAck
4241 && Self::is_end_to_end_ack_sender(pkt.sender())
4242 && Self::decode_end_to_end_reliable_ack(pkt.payload()).is_ok()
4243 {
4244 if let Ok(packet_id) = Self::decode_end_to_end_reliable_ack(pkt.payload())
4245 && let Some(sender_hash) =
4246 Self::decode_end_to_end_ack_sender_hash(pkt.sender())
4247 {
4248 let mut st = self.state.lock();
4249 Self::note_end_to_end_acked_destination_locked(
4250 &mut st,
4251 packet_id,
4252 sender_hash,
4253 );
4254 }
4255 } else {
4256 let vals = pkt.data_as_u32()?;
4257 if vals.len() != 2 {
4258 return Err(TelemetryError::Unpack("bad reliable control payload"));
4259 }
4260 let ty = crate::DataType::try_from_u32(vals[0])
4261 .ok_or(TelemetryError::InvalidType)?;
4262 let seq = vals[1];
4263 match pkt.data_type() {
4264 crate::DataType::ReliableAck => {
4265 self.handle_reliable_ack(item.src, ty, seq)
4266 }
4267 crate::DataType::ReliablePartialAck => {
4268 self.handle_reliable_partial_ack(item.src, ty, seq)
4269 }
4270 crate::DataType::ReliablePacketRequest => {
4271 self.queue_reliable_retransmit(item.src, ty, seq)?
4272 }
4273 _ => {}
4274 }
4275 return Ok(());
4276 }
4277 }
4278 }
4279 RelayItem::Packed(bytes) => {
4280 let env = wire_format::peek_envelope(bytes.as_ref())?;
4281 if matches!(
4282 env.ty,
4283 crate::DataType::ReliableAck
4284 | crate::DataType::ReliablePacketRequest
4285 | crate::DataType::ReliablePartialAck
4286 ) {
4287 let pkt = wire_format::unpack_packet(bytes.as_ref())?;
4288 return self.dispatch_relay_rx_item(&RelayRxItem {
4289 src: item.src,
4290 data: RelayItem::Packet(Arc::new(pkt)),
4291 priority: item.priority,
4292 });
4293 }
4294 }
4295 }
4296
4297 let src = item.src;
4298 let data = item.data.clone();
4299 self.learn_discovery_item(src, &data)?;
4300
4301 let plan = self.remote_side_plan(&data, src)?;
4302 let mut st = self.state.lock();
4303 let RemoteSidePlan::Target(sides) = plan;
4304 for dst in sides {
4305 let priority = Self::relay_item_priority(&data)?;
4306 st.push_tx(RelayTxItem {
4307 src: Some(src),
4308 dst,
4309 data: data.clone(),
4310 priority,
4311 })?;
4312 }
4313 Ok(())
4314 }
4315
4316 #[inline]
4317 fn crc32_bytes(data: &[u8]) -> u32 {
4318 let mut hasher = Crc32Hasher::new();
4319 hasher.update(data);
4320 hasher.finalize()
4321 }
4322
4323 fn wrap_side_transport_frame(kind: u8, body: &[u8]) -> Arc<[u8]> {
4324 let mut out = Vec::with_capacity(
4325 SIDE_TRANSPORT_MAGIC.len() + 1 + body.len() + wire_format::CRC32_BYTES,
4326 );
4327 out.extend_from_slice(SIDE_TRANSPORT_MAGIC);
4328 out.push(kind);
4329 out.extend_from_slice(body);
4330 let crc = Self::crc32_bytes(&out);
4331 out.extend_from_slice(&crc.to_le_bytes());
4332 Arc::from(out)
4333 }
4334
4335 fn parse_side_transport_wrapper(bytes: &[u8]) -> TelemetryResult<Option<(u8, &[u8])>> {
4336 if bytes.len() < SIDE_TRANSPORT_MAGIC.len() + 1 + wire_format::CRC32_BYTES {
4337 return Ok(None);
4338 }
4339 if &bytes[..SIDE_TRANSPORT_MAGIC.len()] != SIDE_TRANSPORT_MAGIC {
4340 return Ok(None);
4341 }
4342 let data_len = bytes.len() - wire_format::CRC32_BYTES;
4343 let expected = u32::from_le_bytes([
4344 bytes[data_len],
4345 bytes[data_len + 1],
4346 bytes[data_len + 2],
4347 bytes[data_len + 3],
4348 ]);
4349 let data = &bytes[..data_len];
4350 if Self::crc32_bytes(data) != expected {
4351 return Err(TelemetryError::Unpack("side transport crc32 mismatch"));
4352 }
4353 let kind = data[SIDE_TRANSPORT_MAGIC.len()];
4354 Ok(Some((kind, &data[SIDE_TRANSPORT_MAGIC.len() + 1..])))
4355 }
4356
4357 fn read_uleb128_local(buf: &[u8], off: &mut usize) -> TelemetryResult<u64> {
4358 let mut result = 0u64;
4359 let mut shift = 0u32;
4360 for _ in 0..10 {
4361 let byte = *buf.get(*off).ok_or(TelemetryError::Unpack("short read"))?;
4362 *off += 1;
4363 result |= u64::from(byte & 0x7F) << shift;
4364 if (byte & 0x80) == 0 {
4365 return Ok(result);
4366 }
4367 shift += 7;
4368 }
4369 Err(TelemetryError::Unpack("uleb128 too long"))
4370 }
4371
/// Appends `value` to `out` as an unsigned LEB128 integer.
///
/// Seven payload bits are emitted per byte, least-significant group first;
/// every byte except the last has the continuation bit (`0x80`) set.
fn write_uleb128_local(mut value: u64, out: &mut Vec<u8>) {
    while value >= 0x80 {
        out.push(((value & 0x7F) as u8) | 0x80);
        value >>= 7;
    }
    // The final group is below 0x80, so it carries no continuation bit.
    out.push(value as u8);
}
4385
/// Returns how many bytes the unsigned LEB128 encoding of `value` occupies
/// (1 for values below `0x80`, up to 10 for the largest `u64` values).
fn uleb128_len_local(value: u64) -> usize {
    // `value | 1` makes zero count as one significant bit, i.e. one byte.
    let significant_bits = u64::BITS - (value | 1).leading_zeros();
    // Each encoded byte carries seven payload bits.
    significant_bits.div_ceil(7) as usize
}
4394
4395 fn extract_side_header_template(bytes: &[u8]) -> TelemetryResult<SideTemplateExtract<'_>> {
4396 if bytes.len() < wire_format::CRC32_BYTES + 4 {
4397 return Err(TelemetryError::Unpack("short buffer"));
4398 }
4399 let data_len = bytes.len() - wire_format::CRC32_BYTES;
4400 let data = &bytes[..data_len];
4401 let mut off = 0usize;
4402 let flags = *data
4403 .get(off)
4404 .ok_or(TelemetryError::Unpack("short prelude"))?;
4405 off += 1;
4406 off += 1; let ty_u64 = Self::read_uleb128_local(data, &mut off)?;
4408 let ty_u32 = u32::try_from(ty_u64).map_err(|_| TelemetryError::Unpack("bad data type"))?;
4409 if ty_u32 > crate::MAX_VALUE_DATA_TYPE {
4410 return Err(TelemetryError::Unpack("bad data type"));
4411 }
4412 let ty = crate::DataType(ty_u32);
4413 let data_size_off = off;
4414 let data_size = Self::read_uleb128_local(data, &mut off)?;
4415 let timestamp = Self::read_uleb128_local(data, &mut off)?;
4416 let nonce = if (flags & SIDE_TRANSPORT_FLAG_PACKET_NONCE) != 0 {
4417 u16::try_from(Self::read_uleb128_local(data, &mut off)?)
4418 .map_err(|_| TelemetryError::Unpack("packet nonce too large"))?
4419 } else {
4420 0
4421 };
4422 let between_start = off;
4423 let _source_address = u32::try_from(Self::read_uleb128_local(data, &mut off)?)
4424 .map_err(|_| TelemetryError::Unpack("source address too large"))?;
4425 let endpoint_bitmap_bytes = if (flags & SIDE_TRANSPORT_FLAG_ENDPOINT_BITMAP_PRESENT) != 0 {
4426 SIDE_TRANSPORT_EP_BITMAP_BYTES
4427 } else {
4428 0
4429 };
4430 if data.len() < off + endpoint_bitmap_bytes {
4431 return Err(TelemetryError::Unpack("short buffer"));
4432 }
4433 off += endpoint_bitmap_bytes;
4434 if (flags & SIDE_TRANSPORT_FLAG_WIRE_CONTRACT) != 0 {
4435 let contract_len = usize::try_from(Self::read_uleb128_local(data, &mut off)?)
4436 .map_err(|_| TelemetryError::Unpack("wire contract length"))?;
4437 if data.len() < off + contract_len {
4438 return Err(TelemetryError::Unpack("short buffer"));
4439 }
4440 off += contract_len;
4441 }
4442 let reliable_span = wire_format::reliable_header_span(bytes)?;
4443 let (reliable_flags, reliable_seq_ack, reliable_compact, payload_off) =
4444 if let Some((rel_off, rel_len, hdr)) = reliable_span {
4445 if data.len() < rel_off + rel_len {
4446 return Err(TelemetryError::Unpack("short buffer"));
4447 }
4448 (
4449 Some(hdr.flags),
4450 Some((hdr.seq, hdr.ack)),
4451 (flags & SIDE_TRANSPORT_FLAG_COMPACT_RELIABLE_HEADER) != 0,
4452 rel_off + rel_len,
4453 )
4454 } else {
4455 (None, None, false, off)
4456 };
4457 if payload_off > data.len() {
4458 return Err(TelemetryError::Unpack("short buffer"));
4459 }
4460 let payload = &data[payload_off..];
4461 let prefix = Arc::<[u8]>::from(&data[1..data_size_off]);
4462 let between_end = reliable_span
4463 .map(|(rel_off, _, _)| rel_off)
4464 .unwrap_or(payload_off);
4465 let between = Arc::<[u8]>::from(&data[between_start..between_end]);
4466 let base_flags =
4467 flags & !(SIDE_TRANSPORT_FLAG_PAYLOAD_COMPRESSED | SIDE_TRANSPORT_FLAG_PACKET_NONCE);
4468 let mut hash = 0xD1B5_4A32_9C7E_01F3u64;
4469 hash = hash_bytes_u64(hash, &[base_flags]);
4470 hash = hash_bytes_u64(hash, &prefix);
4471 hash = hash_bytes_u64(hash, &between);
4472 if let Some(rel_flags) = reliable_flags {
4473 hash = hash_bytes_u64(hash, &[rel_flags]);
4474 }
4475 let template = SideHeaderTemplate {
4476 hash,
4477 base_flags,
4478 prefix,
4479 between,
4480 reliable_flags,
4481 reliable_compact,
4482 };
4483 Ok((
4484 template,
4485 ty,
4486 flags,
4487 data_size,
4488 timestamp,
4489 nonce,
4490 reliable_seq_ack,
4491 payload,
4492 ))
4493 }
4494
4495 fn reconstruct_side_compact_frame(
4496 template: &SideHeaderTemplate,
4497 body: &[u8],
4498 timestamp_mode: SideCompactTimestampMode,
4499 timestamp_base: Option<u64>,
4500 ) -> TelemetryResult<(Arc<[u8]>, u64)> {
4501 if body.is_empty() {
4502 return Err(TelemetryError::Unpack("short side compact frame"));
4503 }
4504 let mut off = 0usize;
4505 let flags = body[off];
4506 off += 1;
4507 if (flags & !(SIDE_TRANSPORT_FLAG_PAYLOAD_COMPRESSED | SIDE_TRANSPORT_FLAG_PACKET_NONCE))
4508 != template.base_flags
4509 {
4510 return Err(TelemetryError::Unpack("side compact flags mismatch"));
4511 }
4512 let data_size = Self::read_uleb128_local(body, &mut off)?;
4513 let timestamp = match timestamp_mode {
4514 SideCompactTimestampMode::Absolute => Self::read_uleb128_local(body, &mut off)?,
4515 SideCompactTimestampMode::Delta => {
4516 let timestamp_field = Self::read_uleb128_local(body, &mut off)?;
4517 let base = timestamp_base.ok_or(TelemetryError::Unpack(
4518 "missing side compact timestamp context",
4519 ))?;
4520 base.checked_add(timestamp_field)
4521 .ok_or(TelemetryError::Unpack(
4522 "side compact timestamp delta overflow",
4523 ))?
4524 }
4525 SideCompactTimestampMode::Omitted => timestamp_base.ok_or(TelemetryError::Unpack(
4526 "missing side compact timestamp context",
4527 ))?,
4528 };
4529 let nonce = if (flags & SIDE_TRANSPORT_FLAG_PACKET_NONCE) != 0 {
4530 Some(Self::read_uleb128_local(body, &mut off)?)
4531 } else {
4532 None
4533 };
4534 let reliable_seq_ack = if template.reliable_flags.is_some() {
4535 let seq = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
4536 .map_err(|_| TelemetryError::Unpack("side compact reliable seq too large"))?;
4537 let ack = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
4538 .map_err(|_| TelemetryError::Unpack("side compact reliable ack too large"))?;
4539 Some((seq, ack))
4540 } else {
4541 None
4542 };
4543 let payload = &body[off..];
4544 let mut raw = Vec::with_capacity(
4545 1 + template.prefix.len() + template.between.len() + payload.len() + 32,
4546 );
4547 raw.push(flags);
4548 raw.extend_from_slice(&template.prefix);
4549 Self::write_uleb128_local(data_size, &mut raw);
4550 Self::write_uleb128_local(timestamp, &mut raw);
4551 if let Some(nonce) = nonce {
4552 Self::write_uleb128_local(nonce, &mut raw);
4553 }
4554 raw.extend_from_slice(&template.between);
4555 if let Some(rel_flags) = template.reliable_flags {
4556 let (seq, ack) =
4557 reliable_seq_ack.ok_or(TelemetryError::Unpack("missing side compact reliable"))?;
4558 wire_format::write_reliable_header_encoded(
4559 wire_format::ReliableHeader {
4560 flags: rel_flags,
4561 seq,
4562 ack,
4563 },
4564 template.reliable_compact,
4565 &mut raw,
4566 );
4567 }
4568 raw.extend_from_slice(payload);
4569 let crc = Self::crc32_bytes(&raw);
4570 raw.extend_from_slice(&crc.to_le_bytes());
4571 Ok((Arc::from(raw), timestamp))
4572 }
4573
4574 fn split_side_transport_frame(
4575 &self,
4576 side: RelaySideId,
4577 frame: Arc<[u8]>,
4578 max_frame_bytes: usize,
4579 ) -> TelemetryResult<Vec<Arc<[u8]>>> {
4580 if max_frame_bytes <= SIDE_TRANSPORT_CHUNK_OVERHEAD {
4581 return Err(TelemetryError::BadArg);
4582 }
4583 let payload_budget = max_frame_bytes - SIDE_TRANSPORT_CHUNK_OVERHEAD;
4584 let mut st = self.state.lock();
4585 let side_state = st
4586 .side_transport
4587 .get_mut(&side)
4588 .ok_or(TelemetryError::BadArg)?;
4589 let transfer_id = side_state.next_chunk_id.wrapping_add(1).max(1);
4590 side_state.next_chunk_id = transfer_id;
4591 drop(st);
4592
4593 let total = frame.len().div_ceil(payload_budget);
4594 let total_u16 =
4595 u16::try_from(total).map_err(|_| TelemetryError::PacketTooLarge("too many chunks"))?;
4596 let mut frames = Vec::with_capacity(total);
4597 for (idx, chunk) in frame.chunks(payload_budget).enumerate() {
4598 let mut body = Vec::with_capacity(8 + chunk.len());
4599 body.extend_from_slice(&transfer_id.to_le_bytes());
4600 body.extend_from_slice(&(idx as u16).to_le_bytes());
4601 body.extend_from_slice(&total_u16.to_le_bytes());
4602 body.extend_from_slice(chunk);
4603 frames.push(Self::wrap_side_transport_frame(
4604 SIDE_TRANSPORT_KIND_CHUNK,
4605 &body,
4606 ));
4607 }
4608 Ok(frames)
4609 }
4610
/// Encodes one packed frame for transmission on `side`, applying header
/// templating and chunking according to `opts`.
///
/// * When header templates are disabled and `max_frame_bytes` is 0, `raw` is
///   returned unchanged as the only frame.
/// * The first frame seen for a header template is sent as a FULL frame
///   (template id + raw bytes) and registers the template for the side.
/// * Later frames with the same template hash are sent as COMPACT frames that
///   carry only the per-packet fields; the timestamp may be sent absolute, as
///   a delta against the previous timestamp for that template, or omitted.
/// * If the wrapped frame exceeds a non-zero `max_frame_bytes`, it is split
///   into chunk frames.
///
/// Per-side runtime statistics are updated before returning.
///
/// # Errors
/// `BadArg` when the side has no transport state; header parsing and chunking
/// errors are propagated.
fn encode_side_transport_frames(
    &self,
    side: RelaySideId,
    opts: RelaySideOptions,
    raw: Arc<[u8]>,
) -> TelemetryResult<Vec<Arc<[u8]>>> {
    // NOTE(review): with templates disabled but `max_frame_bytes != 0`, the
    // template/compact path below still runs — confirm this is intended.
    if !opts.header_template_enabled && opts.max_frame_bytes == 0 {
        return Ok(vec![raw]);
    }
    let raw_len = raw.len();
    // Bookkeeping used only for the statistics at the end.
    let mut compact_payload_len = None;
    let mut used_compact = false;
    let mut used_timestamp_delta = false;
    let mut omitted_timestamp = false;
    let (template, ty, flags, data_size, timestamp, nonce, reliable_seq_ack, payload) =
        Self::extract_side_header_template(raw.as_ref())?;
    // Look up (or register) the template id while holding the state lock.
    let (template_id, use_compact, previous_timestamp) = {
        let mut st = self.state.lock();
        let side_state = st
            .side_transport
            .get_mut(&side)
            .ok_or(TelemetryError::BadArg)?;
        if let Some(id) = side_state.tx_template_ids.get(&template.hash).copied() {
            // Known template: a compact frame can be sent.
            let previous = side_state.tx_last_timestamps.get(&id).copied();
            (id, true, previous)
        } else {
            // New template: allocate the next id (wrapping, never zero).
            let next = side_state.next_template_id.wrapping_add(1).max(1);
            side_state.next_template_id = next;
            let evicted = side_state.insert_tx_template(
                template,
                next,
                opts.max_side_transport_templates,
            );
            if evicted {
                st.side_runtime_stats
                    .entry(side)
                    .or_default()
                    .note_side_transport_template_eviction();
            }
            // Re-borrow the side state: the stats update above needed `st`.
            if let Some(side_state) = st.side_transport.get_mut(&side) {
                side_state.tx_last_timestamps.insert(next, timestamp);
            }
            (next, false, None)
        }
    };
    let wrapped = if use_compact {
        used_compact = true;
        compact_payload_len = Some(payload.len());
        // Choose the cheapest timestamp representation: omitted, delta, or absolute.
        let timestamp_field = if let Some(previous) = previous_timestamp {
            let delta = timestamp.saturating_sub(previous);
            let omit_timestamp = opts.omit_unchanged_compact_timestamps
                || opts.compact_timestamp_omission_types.contains(ty);
            if omit_timestamp && timestamp == previous {
                omitted_timestamp = true;
                None
            } else if timestamp >= previous
                && Self::uleb128_len_local(delta) < Self::uleb128_len_local(timestamp)
            {
                // A delta is used only when it encodes strictly shorter.
                used_timestamp_delta = true;
                Some(delta)
            } else {
                Some(timestamp)
            }
        } else {
            Some(timestamp)
        };
        // Compact body: flags, template id, data size, [timestamp], [nonce],
        // [reliable seq, ack], payload.
        let mut body = Vec::with_capacity(payload.len() + 32);
        body.push(flags);
        Self::write_uleb128_local(u64::from(template_id), &mut body);
        Self::write_uleb128_local(data_size, &mut body);
        if let Some(timestamp_field) = timestamp_field {
            Self::write_uleb128_local(timestamp_field, &mut body);
        }
        if (flags & SIDE_TRANSPORT_FLAG_PACKET_NONCE) != 0 {
            Self::write_uleb128_local(u64::from(nonce), &mut body);
        }
        if let Some((seq, ack)) = reliable_seq_ack {
            Self::write_uleb128_local(u64::from(seq), &mut body);
            Self::write_uleb128_local(u64::from(ack), &mut body);
        }
        body.extend_from_slice(payload);
        {
            // Remember this timestamp as the base for the next delta/omission.
            let mut st = self.state.lock();
            if let Some(side_state) = st.side_transport.get_mut(&side) {
                side_state.tx_last_timestamps.insert(template_id, timestamp);
            }
        }
        // The frame kind tells the receiver how to interpret the timestamp field.
        let kind = if omitted_timestamp {
            SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP
        } else if used_timestamp_delta {
            SIDE_TRANSPORT_KIND_COMPACT_DELTA
        } else {
            SIDE_TRANSPORT_KIND_COMPACT
        };
        Self::wrap_side_transport_frame(kind, &body)
    } else {
        // FULL frame: template id followed by the untouched raw frame.
        let mut body = Vec::with_capacity(raw.len() + 4);
        Self::write_uleb128_local(u64::from(template_id), &mut body);
        body.extend_from_slice(raw.as_ref());
        Self::wrap_side_transport_frame(SIDE_TRANSPORT_KIND_FULL, &body)
    };
    // Chunk only when a frame limit is configured and actually exceeded.
    let frames = if opts.max_frame_bytes != 0 && wrapped.len() > opts.max_frame_bytes {
        self.split_side_transport_frame(side, wrapped, opts.max_frame_bytes)
    } else {
        Ok(vec![wrapped])
    }?;
    let wire_len = frames.iter().map(|frame| frame.len()).sum::<usize>();
    let mut st = self.state.lock();
    let stats = st.side_runtime_stats.entry(side).or_default();
    if used_compact {
        // Overhead is everything on the wire that is not payload.
        let overhead = compact_payload_len
            .map(|payload_len| wire_len.saturating_sub(payload_len))
            .unwrap_or(wire_len);
        stats.note_side_transport_compact(
            raw_len,
            wire_len,
            overhead,
            used_timestamp_delta,
            omitted_timestamp,
        );
        if opts.compact_header_target_bytes != 0 && overhead > opts.compact_header_target_bytes
        {
            stats.note_side_transport_compact_target_miss();
        }
    } else {
        stats.note_side_transport_full(raw_len, wire_len);
    }
    if frames.len() > 1 {
        stats.note_side_transport_chunks(frames.len());
    }
    Ok(frames)
}
4743
4744 fn decode_side_transport_frame(
4745 &self,
4746 side: RelaySideId,
4747 bytes: &[u8],
4748 ) -> TelemetryResult<Option<Arc<[u8]>>> {
4749 let Some((kind, body)) = Self::parse_side_transport_wrapper(bytes)? else {
4750 return Ok(Some(Arc::from(bytes)));
4751 };
4752 match kind {
4753 SIDE_TRANSPORT_KIND_FULL => {
4754 let mut off = 0usize;
4755 let template_id = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
4756 .map_err(|_| TelemetryError::Unpack("side template id too large"))?;
4757 let raw = Arc::<[u8]>::from(&body[off..]);
4758 if let Ok((template, _, _, _, timestamp, _, _, _)) =
4759 Self::extract_side_header_template(raw.as_ref())
4760 {
4761 let mut st = self.state.lock();
4762 let max_templates = st
4763 .sides
4764 .get(side)
4765 .and_then(|side| side.as_ref())
4766 .map(|side| side.opts.max_side_transport_templates)
4767 .unwrap_or(DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT);
4768 let evicted = st.side_transport.get_mut(&side).is_some_and(|side_state| {
4769 let evicted =
4770 side_state.insert_rx_template(template_id, template, max_templates);
4771 side_state.rx_last_timestamps.insert(template_id, timestamp);
4772 evicted
4773 });
4774 if evicted {
4775 st.side_runtime_stats
4776 .entry(side)
4777 .or_default()
4778 .note_side_transport_template_eviction();
4779 }
4780 }
4781 Ok(Some(raw))
4782 }
4783 SIDE_TRANSPORT_KIND_COMPACT
4784 | SIDE_TRANSPORT_KIND_COMPACT_DELTA
4785 | SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP => {
4786 if body.is_empty() {
4787 return Err(TelemetryError::Unpack("short side compact frame"));
4788 }
4789 let mut off = 1usize;
4790 let template_id = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
4791 .map_err(|_| TelemetryError::Unpack("side template id too large"))?;
4792 let mut compact_body = Vec::with_capacity(1 + body.len().saturating_sub(off));
4793 compact_body.push(body[0]);
4794 compact_body.extend_from_slice(&body[off..]);
4795 let (template, timestamp_base) = {
4796 let st = self.state.lock();
4797 let state = st.side_transport.get(&side);
4798 let template = state
4799 .and_then(|state| state.rx_templates_by_id.get(&template_id))
4800 .cloned();
4801 let timestamp_base = if matches!(
4802 kind,
4803 SIDE_TRANSPORT_KIND_COMPACT_DELTA
4804 | SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP
4805 ) {
4806 state
4807 .and_then(|state| state.rx_last_timestamps.get(&template_id))
4808 .copied()
4809 } else {
4810 None
4811 };
4812 (template, timestamp_base)
4813 };
4814 let template =
4815 template.ok_or(TelemetryError::Unpack("unknown side compact template"))?;
4816 let timestamp_mode = match kind {
4817 SIDE_TRANSPORT_KIND_COMPACT_DELTA => SideCompactTimestampMode::Delta,
4818 SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP => SideCompactTimestampMode::Omitted,
4819 _ => SideCompactTimestampMode::Absolute,
4820 };
4821 let (frame, timestamp) = Self::reconstruct_side_compact_frame(
4822 &template,
4823 &compact_body,
4824 timestamp_mode,
4825 timestamp_base,
4826 )?;
4827 let mut st = self.state.lock();
4828 if let Some(side_state) = st.side_transport.get_mut(&side) {
4829 side_state.rx_last_timestamps.insert(template_id, timestamp);
4830 }
4831 Ok(Some(frame))
4832 }
4833 SIDE_TRANSPORT_KIND_CHUNK => {
4834 if body.len() < 8 {
4835 return Err(TelemetryError::Unpack("short side chunk frame"));
4836 }
4837 let transfer_id = u32::from_le_bytes([body[0], body[1], body[2], body[3]]);
4838 let index = u16::from_le_bytes([body[4], body[5]]);
4839 let total = u16::from_le_bytes([body[6], body[7]]);
4840 let payload = Arc::<[u8]>::from(&body[8..]);
4841 let assembled = {
4842 let mut st = self.state.lock();
4843 let side_state = st
4844 .side_transport
4845 .get_mut(&side)
4846 .ok_or(TelemetryError::BadArg)?;
4847 let entry = side_state.rx_chunks.entry(transfer_id).or_default();
4848 if entry.total == 0 {
4849 entry.total = total;
4850 } else if entry.total != total {
4851 side_state.rx_chunks.remove(&transfer_id);
4852 return Err(TelemetryError::Unpack("side chunk total mismatch"));
4853 }
4854 entry.received.entry(index).or_insert(payload);
4855 if entry.received.len() == usize::from(total) {
4856 let entry = side_state
4857 .rx_chunks
4858 .remove(&transfer_id)
4859 .ok_or(TelemetryError::Unpack("side chunk missing"))?;
4860 let mut out = Vec::new();
4861 for idx in 0..entry.total {
4862 let chunk = entry
4863 .received
4864 .get(&idx)
4865 .ok_or(TelemetryError::Unpack("side chunk gap"))?;
4866 out.extend_from_slice(chunk);
4867 }
4868 Some(Arc::<[u8]>::from(out))
4869 } else {
4870 None
4871 }
4872 };
4873 match assembled {
4874 Some(frame) => self.decode_side_transport_frame(side, frame.as_ref()),
4875 None => Ok(None),
4876 }
4877 }
4878 _ => Err(TelemetryError::Unpack("unknown side transport frame")),
4879 }
4880 }
4881
4882 fn call_tx_handler(
4888 &self,
4889 side: RelaySideId,
4890 handler: &RelayTxHandlerFn,
4891 data: &RelayItem,
4892 ) -> TelemetryResult<()> {
4893 let opts = {
4894 let st = self.state.lock();
4895 Self::side_ref(&st, side)?.opts
4896 };
4897 let Some(_side_tx_guard) = self.try_enter_side_tx() else {
4898 return Err(TelemetryError::Io("side tx busy"));
4899 };
4900 let started_ms = self.clock.now_ms();
4901 let ty = match data {
4902 RelayItem::Packet(pkt) => pkt.data_type(),
4903 RelayItem::Packed(bytes) => wire_format::peek_envelope(bytes.as_ref())?.ty,
4904 };
4905 let result = match (handler, data) {
4906 (RelayTxHandlerFn::Packed(f), RelayItem::Packed(bytes)) => {
4908 let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
4909 let mut sent_bytes = 0usize;
4910 for frame in frames {
4911 f(frame.as_ref())?;
4912 sent_bytes = sent_bytes.saturating_add(frame.len());
4913 }
4914 self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
4915 self.note_side_tx_success(side, ty, sent_bytes, 1);
4916 return Ok(());
4917 }
4918 (RelayTxHandlerFn::Packet(f), RelayItem::Packet(pkt)) => f(pkt),
4919
4920 (RelayTxHandlerFn::Packed(f), RelayItem::Packet(pkt)) => {
4922 let owned = wire_format::pack_packet(pkt);
4923 let frames = self.encode_side_transport_frames(side, opts, owned)?;
4924 let mut sent_bytes = 0usize;
4925 for frame in frames {
4926 f(frame.as_ref())?;
4927 sent_bytes = sent_bytes.saturating_add(frame.len());
4928 }
4929 self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
4930 self.note_side_tx_success(side, ty, sent_bytes, 1);
4931 return Ok(());
4932 }
4933 (RelayTxHandlerFn::Packet(f), RelayItem::Packed(bytes)) => {
4934 if wire_format::peek_frame_info(bytes.as_ref())
4935 .ok()
4936 .is_some_and(|frame| frame.ack_only())
4937 {
4938 return Ok(());
4939 }
4940 let pkt = wire_format::unpack_packet(bytes.as_ref())?;
4941 f(&pkt)
4942 }
4943 };
4944 if result.is_ok()
4945 && let Ok(bytes) = Self::relay_item_wire_len(data)
4946 {
4947 self.record_side_tx_sample(side, bytes, started_ms, self.clock.now_ms());
4948 self.note_side_tx_success(side, ty, bytes, 1);
4949 } else if result.is_err() {
4950 self.note_side_tx_failure(side, ty, 1);
4951 }
4952 result
4953 }
4954
4955 fn adjust_reliable_for_side(
4956 &self,
4957 opts: RelaySideOptions,
4958 data: RelayItem,
4959 ) -> TelemetryResult<Option<RelayItem>> {
4960 if opts.reliable_enabled {
4961 return Ok(Some(data));
4962 }
4963
4964 match data {
4965 RelayItem::Packed(bytes) => {
4966 let frame = match wire_format::peek_frame_info(bytes.as_ref()) {
4967 Ok(frame) => frame,
4968 Err(_) => return Ok(Some(RelayItem::Packed(bytes))),
4969 };
4970 if is_reliable_type(frame.envelope.ty)
4971 && let Some(hdr) = frame.reliable
4972 {
4973 if (hdr.flags & wire_format::RELIABLE_FLAG_ACK_ONLY) != 0 {
4974 return Ok(None);
4975 }
4976 if (hdr.flags & wire_format::RELIABLE_FLAG_UNSEQUENCED) == 0 {
4977 let Some(rewritten) = wire_format::rewrite_reliable_header_owned(
4978 bytes.as_ref(),
4979 wire_format::RELIABLE_FLAG_UNSEQUENCED,
4980 hdr.seq,
4981 0,
4982 )?
4983 else {
4984 return Ok(Some(RelayItem::Packed(bytes)));
4985 };
4986 return Ok(Some(RelayItem::Packed(rewritten)));
4987 }
4988 }
4989 Ok(Some(RelayItem::Packed(bytes)))
4990 }
4991 RelayItem::Packet(pkt) => {
4992 if matches!(
4993 pkt.data_type(),
4994 crate::DataType::ReliableAck
4995 | crate::DataType::ReliablePartialAck
4996 | crate::DataType::ReliablePacketRequest
4997 ) {
4998 return Ok(None);
4999 }
5000 Ok(Some(RelayItem::Packet(pkt)))
5001 }
5002 }
5003 }
5004
5005 #[inline]
5007 pub fn process_rx_queue(&self) -> TelemetryResult<()> {
5008 self.process_rx_queue_with_timeout(0)
5009 }
5010
5011 #[inline]
5016 pub fn process_tx_queue(&self) -> TelemetryResult<()> {
5017 self.process_tx_queue_with_timeout(0)
5018 }
5019
5020 #[inline]
5022 pub fn process_all_queues(&self) -> TelemetryResult<()> {
5023 self.process_all_queues_with_timeout(0)
5024 }
5025
5026 pub fn process_tx_queue_with_timeout(&self, timeout_ms: u32) -> TelemetryResult<()> {
5031 if self.side_tx_active() {
5032 return Ok(());
5033 }
5034 #[cfg(feature = "discovery")]
5035 {
5036 let _ = self.poll_discovery()?;
5037 }
5038 let start = self.clock.now_ms();
5039 loop {
5040 self.process_reliable_timeouts()?;
5041 if self.process_replay_queue_item()? {
5042 if timeout_ms != 0 && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5043 break;
5044 }
5045 continue;
5046 }
5047 let Some((src, dst, handler, opts, data)) = self.pop_ready_tx_item() else {
5048 break;
5049 };
5050 match self.send_tx_item(src, dst, handler, opts, data.clone()) {
5051 Ok(sent) => {
5052 if sent
5053 && timeout_ms != 0
5054 && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64
5055 {
5056 break;
5057 }
5058 }
5059 Err(e) if Self::is_side_tx_busy(&e) => {
5060 let priority = Self::relay_item_priority(&data)?;
5061 let mut st = self.state.lock();
5062 st.push_tx(RelayTxItem {
5063 src,
5064 dst,
5065 data,
5066 priority,
5067 })?;
5068 break;
5069 }
5070 Err(e) => return Err(e),
5071 }
5072 }
5073 Ok(())
5074 }
5075
5076 pub fn process_rx_queue_with_timeout(&self, timeout_ms: u32) -> TelemetryResult<()> {
5078 #[cfg(feature = "discovery")]
5079 {
5080 let _ = self.poll_discovery()?;
5081 }
5082 let start = self.clock.now_ms();
5083 loop {
5084 let item_opt = {
5085 let mut st = self.state.lock();
5086 st.rx_queue.pop_front()
5087 };
5088 let Some(item) = item_opt else { break };
5089 self.process_rx_queue_item(item)?;
5090
5091 if timeout_ms != 0 && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5092 break;
5093 }
5094 }
5095 Ok(())
5096 }
5097
5098 pub fn process_all_queues_with_timeout(&self, timeout_ms: u32) -> TelemetryResult<()> {
5103 if self.side_tx_active() {
5104 return Ok(());
5105 }
5106 #[cfg(feature = "discovery")]
5107 {
5108 let _ = self.poll_discovery()?;
5109 }
5110 let drain_fully = timeout_ms == 0;
5111 let start = if drain_fully { 0 } else { self.clock.now_ms() };
5112
5113 loop {
5114 let mut did_any = false;
5115 self.process_reliable_timeouts()?;
5116
5117 if let Some(item) = {
5119 let mut st = self.state.lock();
5120 st.rx_queue.pop_front()
5121 } {
5122 self.process_rx_queue_item(item)?;
5123 did_any = true;
5124 }
5125
5126 if !drain_fully && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5127 break;
5128 }
5129
5130 if self.process_replay_queue_item()? {
5131 did_any = true;
5132 }
5133
5134 let sent_one = if let Some((src, dst, handler, opts, data)) = self.pop_ready_tx_item() {
5136 self.send_tx_item(src, dst, handler, opts, data)?
5137 } else {
5138 false
5139 };
5140
5141 if sent_one {
5142 did_any = true;
5143 }
5144
5145 if !drain_fully && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5146 break;
5147 }
5148
5149 if !did_any {
5150 break;
5151 }
5152 }
5153
5154 Ok(())
5155 }
5156
5157 pub fn periodic(&self, timeout_ms: u32) -> TelemetryResult<()> {
5162 #[cfg(feature = "discovery")]
5163 {
5164 let _ = self.poll_discovery()?;
5165 }
5166
5167 self.process_all_queues_with_timeout(timeout_ms)
5168 }
5169}