Skip to main content

sedsnet/
relay.rs

1use crate::config::{
2    RuntimeMemoryConfig, runtime_reliable_max_end_to_end_ack_cache,
3    runtime_reliable_max_end_to_end_pending, runtime_reliable_max_pending,
4    runtime_reliable_max_retries, runtime_reliable_max_return_routes,
5    runtime_reliable_retransmit_ms,
6};
7use crate::diagnostics::{
8    AdaptiveLinkStats, DiscoveryRuntimeStats, QueueRuntimeStats, ReliableRuntimeStats,
9    RouteModeStats, RouteOverrideStats, RoutePriorityStats, RouteWeightStats, RuntimeSideStats,
10    RuntimeStatsSnapshot, RuntimeTypeStats, TypedRouteOverrideStats,
11};
12#[cfg(feature = "discovery")]
13use crate::discovery::{
14    self, ClientStatsSnapshot, DISCOVERY_ROUTE_TTL_MS, DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS,
15    DISCOVERY_SLOW_LINK_PING_INTERVAL_MS, DiscoveryCadenceState,
16    TIMESYNC_SLOW_LINK_MIN_INTERVAL_MS, TopologyAnnouncerRoute, TopologyBoardNode,
17    TopologySideRoute, TopologySnapshot,
18};
19use crate::packet::{Packet, hash_bytes_u64};
20use crate::queue::{BoundedDeque, ByteCost};
21use crate::wire_format;
22use crate::{is_reliable_type, message_meta, message_priority, reliable_mode};
23use crate::{
24    router::{Clock, CompactTimestampOmissionPolicy, SideTransportProfile},
25    {
26        RouteSelectionMode, TelemetryError, TelemetryResult,
27        lock::{ReentryGate, ReentryGuard, RouterMutex},
28    },
29};
30use alloc::borrow::ToOwned;
31use alloc::boxed::Box;
32use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
33use alloc::string::{String, ToString};
34use alloc::{sync::Arc, vec, vec::Vec};
35use core::mem::size_of;
36use crc32fast::Hasher as Crc32Hasher;
37
38/// Logical side index (CAN, UART, RADIO, etc.)
39pub type RelaySideId = usize;
40const SIDE_TRANSPORT_MAGIC: &[u8; 3] = b"SDT";
41const SIDE_TRANSPORT_KIND_FULL: u8 = 0x01;
42const SIDE_TRANSPORT_KIND_COMPACT: u8 = 0x02;
43const SIDE_TRANSPORT_KIND_CHUNK: u8 = 0x03;
44const SIDE_TRANSPORT_KIND_COMPACT_DELTA: u8 = 0x04;
45const SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP: u8 = 0x05;
46const SIDE_TRANSPORT_FLAG_PAYLOAD_COMPRESSED: u8 = 0x01;
47const SIDE_TRANSPORT_FLAG_WIRE_CONTRACT: u8 = 0x04;
48const SIDE_TRANSPORT_FLAG_PACKET_NONCE: u8 = 0x08;
49const SIDE_TRANSPORT_FLAG_ENDPOINT_BITMAP_PRESENT: u8 = 0x20;
50const SIDE_TRANSPORT_FLAG_COMPACT_RELIABLE_HEADER: u8 = 0x40;
51const CONTROL_SLOW_LINK_CAPACITY_BPS: u64 = 512;
52const SIDE_TRANSPORT_CHUNK_OVERHEAD: usize = 3 + 1 + 4 + 2 + 2 + wire_format::CRC32_BYTES;
53const SIDE_TRANSPORT_EP_BITMAP_BITS: usize = (crate::MAX_VALUE_DATA_ENDPOINT as usize) + 1;
54const SIDE_TRANSPORT_EP_BITMAP_BYTES: usize = SIDE_TRANSPORT_EP_BITMAP_BITS.div_ceil(8);
55pub const IPV4_LIKE_COMPACT_HEADER_TARGET_BYTES: usize =
56    crate::router::IPV4_LIKE_COMPACT_HEADER_TARGET_BYTES;
57pub const IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES: usize =
58    crate::router::IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES;
59pub const DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT: usize =
60    crate::router::DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT;
61/// Packet Handler function type
62type PacketHandlerFn = dyn Fn(&Packet) -> TelemetryResult<()> + Send + Sync + 'static;
63
64/// Packed Handler function type
65type PackedHandlerFn = dyn Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static;
66
67/// TX handler for a relay side: either packed or packet-based.
68#[derive(Clone)]
69pub enum RelayTxHandlerFn {
70    Packed(Arc<PackedHandlerFn>),
71    Packet(Arc<PacketHandlerFn>),
72}
73
74#[derive(Clone, Copy, Debug)]
75pub struct RelaySideOptions {
76    /// Enables the relay's per-link reliable transport layer on this side.
77    ///
78    /// When `true` and the side uses a packed TX handler, reliable schema traffic on this hop
79    /// gains relay-managed sequence numbers, ACKs, packet requests, and retransmits.
80    /// Packet-output sides still receive decoded packets rather than packed reliable framing.
81    pub reliable_enabled: bool,
82    /// Marks the side as eligible for link-local-only endpoints and discovery routes.
83    pub link_local_enabled: bool,
84    /// Allows packets received from this side to enter relay processing.
85    pub ingress_enabled: bool,
86    /// Allows the relay to transmit packets toward this side.
87    pub egress_enabled: bool,
88    /// Enables side-local header-template reuse for packed transport.
89    pub header_template_enabled: bool,
90    /// Maximum number of bytes to emit per packed TX callback.
91    ///
92    /// When non-zero and a packed frame would exceed this size, the relay
93    /// splits it into ordered side-transport chunks and reassembles them on RX
94    /// before normal relay processing. This is intended for fixed-size links
95    /// such as CAN or I2C while keeping the user API packet-oriented.
96    pub max_frame_bytes: usize,
97    /// Target total side-transport overhead for compact follow-up frames.
98    pub compact_header_target_bytes: usize,
99    /// Maximum side-local header templates retained for TX and RX dictionaries.
100    pub max_side_transport_templates: usize,
101    /// Omits the timestamp field from compact follow-up frames when it is unchanged.
102    pub omit_unchanged_compact_timestamps: bool,
103    /// Optional per-data-type timestamp omission policy for compact follow-up frames.
104    pub compact_timestamp_omission_types: CompactTimestampOmissionPolicy,
105    /// Declared compact-link profile for stats and future negotiation.
106    pub side_transport_profile: SideTransportProfile,
107}
108
109impl Default for RelaySideOptions {
110    fn default() -> Self {
111        Self {
112            reliable_enabled: false,
113            link_local_enabled: false,
114            ingress_enabled: true,
115            egress_enabled: true,
116            header_template_enabled: false,
117            max_frame_bytes: 0,
118            compact_header_target_bytes: 0,
119            max_side_transport_templates: DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT,
120            omit_unchanged_compact_timestamps: false,
121            compact_timestamp_omission_types: CompactTimestampOmissionPolicy::none(),
122            side_transport_profile: SideTransportProfile::Canonical,
123        }
124    }
125}
126
127impl RelaySideOptions {
128    /// Convenience preset for bounded packed-side transport.
129    ///
130    /// `max_frame_bytes == 0` leaves packed frames unbounded. Values greater
131    /// than zero enable relay-managed chunking/reassembly on this side.
132    #[inline]
133    pub fn with_small_packet_transport(mut self, max_frame_bytes: usize) -> Self {
134        self.header_template_enabled = true;
135        self.max_frame_bytes = max_frame_bytes;
136        self.compact_header_target_bytes = IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES;
137        self.side_transport_profile = SideTransportProfile::Ipv6Like;
138        self
139    }
140
141    #[inline]
142    pub fn with_ipv4_like_compact_header_target(mut self) -> Self {
143        self.header_template_enabled = true;
144        self.compact_header_target_bytes = IPV4_LIKE_COMPACT_HEADER_TARGET_BYTES;
145        self.omit_unchanged_compact_timestamps = true;
146        self.side_transport_profile = SideTransportProfile::Ipv4Like;
147        self
148    }
149
150    #[inline]
151    pub fn with_ipv6_like_compact_header_target(mut self) -> Self {
152        self.header_template_enabled = true;
153        self.compact_header_target_bytes = IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES;
154        self.side_transport_profile = SideTransportProfile::Ipv6Like;
155        self
156    }
157
158    #[inline]
159    pub fn with_template_transport(mut self) -> Self {
160        self.header_template_enabled = true;
161        self.side_transport_profile = SideTransportProfile::Template;
162        self
163    }
164
165    #[inline]
166    pub fn with_omitted_unchanged_compact_timestamps(mut self) -> Self {
167        self.header_template_enabled = true;
168        self.omit_unchanged_compact_timestamps = true;
169        self
170    }
171
172    #[inline]
173    pub fn with_omitted_unchanged_compact_timestamps_for_type(
174        mut self,
175        ty: crate::DataType,
176    ) -> Self {
177        self.header_template_enabled = true;
178        self.compact_timestamp_omission_types.insert(ty);
179        self
180    }
181
182    #[inline]
183    pub fn effective_transport_profile(self) -> SideTransportProfile {
184        if !self.header_template_enabled && self.max_frame_bytes == 0 {
185            SideTransportProfile::Canonical
186        } else if self.side_transport_profile == SideTransportProfile::Canonical {
187            SideTransportProfile::Template
188        } else {
189            self.side_transport_profile
190        }
191    }
192
193    #[cfg(feature = "discovery")]
194    #[inline]
195    pub fn link_capabilities(self) -> discovery::LinkCapabilities {
196        let mut flags = discovery::LINK_CAPABILITY_END_TO_END_RELIABILITY;
197        if self.header_template_enabled {
198            flags |= discovery::LINK_CAPABILITY_HEADER_TEMPLATES;
199        }
200        if self.max_frame_bytes != 0 {
201            flags |= discovery::LINK_CAPABILITY_CHUNKING;
202        }
203        if self.reliable_enabled {
204            flags |= discovery::LINK_CAPABILITY_RELIABILITY;
205        }
206        if self.omit_unchanged_compact_timestamps
207            || !self.compact_timestamp_omission_types.is_empty()
208        {
209            flags |= discovery::LINK_CAPABILITY_OMIT_UNCHANGED_TIMESTAMPS;
210        }
211        #[cfg(feature = "cryptography")]
212        {
213            flags |= discovery::LINK_CAPABILITY_CRYPTO;
214        }
215        discovery::LinkCapabilities {
216            version: 1,
217            flags,
218            profile: self.effective_transport_profile().discovery_code(),
219            max_frame_bytes: self.max_frame_bytes.min(u32::MAX as usize) as u32,
220            compact_header_target_bytes: self.compact_header_target_bytes.min(u32::MAX as usize)
221                as u32,
222            max_side_transport_templates: self.max_side_transport_templates.min(u32::MAX as usize)
223                as u32,
224        }
225    }
226}
227
228/// One side of the relay – a name + TX handler.
229#[derive(Clone)]
230pub struct RelaySide {
231    pub name: &'static str,
232    pub tx_handler: RelayTxHandlerFn,
233    pub opts: RelaySideOptions,
234}
235
236#[derive(Clone, Debug, PartialEq, Eq)]
237pub enum RelayItem {
238    Packed(Arc<[u8]>),
239    Packet(Arc<Packet>),
240}
241
242/// Item that was received by the relay from some side.
243#[derive(Clone, Debug, PartialEq, Eq)]
244struct RelayRxItem {
245    src: RelaySideId,
246    data: RelayItem,
247    priority: u8,
248}
249
250impl ByteCost for RelayRxItem {
251    fn byte_cost(&self) -> usize {
252        match &self.data {
253            RelayItem::Packed(bytes) => bytes.len(),
254            RelayItem::Packet(pkt) => pkt.byte_cost(),
255        }
256    }
257}
258
259/// Item that is ready to be transmitted out a destination side.
260#[derive(Clone, Debug, PartialEq, Eq)]
261struct RelayTxItem {
262    src: Option<RelaySideId>,
263    dst: RelaySideId,
264    data: RelayItem,
265    priority: u8,
266}
267
268#[derive(Clone, Debug, PartialEq, Eq)]
269struct RelayReplayItem {
270    dst: RelaySideId,
271    bytes: Arc<[u8]>,
272    priority: u8,
273}
274
275impl ByteCost for RelayTxItem {
276    fn byte_cost(&self) -> usize {
277        match &self.data {
278            RelayItem::Packed(bytes) => bytes.len(),
279            RelayItem::Packet(pkt) => pkt.byte_cost(),
280        }
281    }
282}
283
284impl ByteCost for RelayReplayItem {
285    fn byte_cost(&self) -> usize {
286        self.bytes.len()
287    }
288}
289
290// -------------------- Reliable delivery state (relay) --------------------
291
292#[derive(Debug, Clone)]
293struct ReliableTxState {
294    next_seq: u32,
295    sent_order: VecDeque<u32>,
296    sent: BTreeMap<u32, ReliableSent>,
297}
298
299#[derive(Debug, Clone)]
300struct ReliableSent {
301    bytes: Arc<[u8]>,
302    last_send_ms: u64,
303    retries: u32,
304    queued: bool,
305    partial_acked: bool,
306}
307
308#[derive(Debug, Clone)]
309struct ReliableRxState {
310    expected_seq: u32,
311    buffered: BTreeMap<u32, Arc<[u8]>>,
312}
313
314#[derive(Debug, Clone)]
315struct ReliableReturnRouteState {
316    side: RelaySideId,
317}
318
319#[cfg(feature = "discovery")]
320#[derive(Debug, Clone, Default, PartialEq, Eq)]
321struct DiscoverySenderState {
322    reachable: Vec<crate::DataEndpoint>,
323    reachable_timesync_sources: Vec<String>,
324    topology_boards: Vec<TopologyBoardNode>,
325    last_seen_ms: u64,
326}
327
328#[inline]
329fn is_internal_control_type(ty: crate::DataType) -> bool {
330    if matches!(
331        ty,
332        crate::DataType::ReliableAck
333            | crate::DataType::ReliablePartialAck
334            | crate::DataType::ReliablePacketRequest
335            | crate::DataType::P2pMessage
336    ) {
337        return true;
338    }
339
340    #[cfg(feature = "timesync")]
341    if matches!(
342        ty,
343        crate::DataType::TimeSyncAnnounce
344            | crate::DataType::TimeSyncRequest
345            | crate::DataType::TimeSyncResponse
346    ) {
347        return true;
348    }
349
350    #[cfg(feature = "discovery")]
351    if discovery::is_discovery_type(ty) {
352        return true;
353    }
354
355    let _ = ty;
356    false
357}
358
/// Discovery information learned through one relay side.
#[cfg(feature = "discovery")]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct DiscoverySideState {
    /// Endpoints reachable through this side.
    reachable: Vec<crate::DataEndpoint>,
    /// Time-sync sources reachable through this side.
    reachable_timesync_sources: Vec<String>,
    /// Time discovery traffic was last seen on this side.
    last_seen_ms: u64,
    /// Per-announcer state, keyed by announcer (sender) name.
    announcers: BTreeMap<String, DiscoverySenderState>,
}
367
/// Reassembly buffer for one chunked side-transport frame.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct SideChunkAssembly {
    /// Number of chunks the frame is expected to consist of.
    total: u16,
    /// Chunks received so far (presumably keyed by chunk index).
    received: BTreeMap<u16, Arc<[u8]>>,
}

/// Per-side state of the packed side-transport layer.
///
/// Holds the header-template dictionaries for each direction, the last timestamp per
/// template id, and in-progress chunk reassembly.
#[derive(Clone, Debug, Default)]
struct SideTransportState {
    /// TX: header hash -> template id assigned to it.
    tx_template_ids: BTreeMap<u64, u32>,
    /// TX: header hash -> template contents.
    tx_templates: BTreeMap<u64, SideHeaderTemplate>,
    /// TX: template id -> last timestamp associated with that id (presumably last sent).
    tx_last_timestamps: BTreeMap<u32, u64>,
    /// RX: header hash -> template contents; a secondary index over `rx_templates_by_id`.
    rx_templates: BTreeMap<u64, SideHeaderTemplate>,
    /// RX: template id -> template contents; this map defines the RX template count.
    rx_templates_by_id: BTreeMap<u32, SideHeaderTemplate>,
    /// RX: template id -> last timestamp associated with that id (presumably last received).
    rx_last_timestamps: BTreeMap<u32, u64>,
    /// RX: partially reassembled chunked frames (presumably keyed by chunk id).
    rx_chunks: BTreeMap<u32, SideChunkAssembly>,
    /// Counter for chunk ids (advanced outside this section).
    next_chunk_id: u32,
    /// Counter for template ids (advanced outside this section).
    next_template_id: u32,
}

impl SideTransportState {
    /// Number of TX templates currently registered.
    fn tx_template_count(&self) -> usize {
        self.tx_template_ids.len()
    }

    /// Number of RX templates currently registered.
    fn rx_template_count(&self) -> usize {
        self.rx_templates_by_id.len()
    }

    /// Registers a TX template under `template_id`, keyed by `template.hash`.
    ///
    /// Returns `true` when another template had to be evicted to stay within
    /// `max_templates`. With `max_templates == 0` nothing is stored and `false` is
    /// returned. The eviction victim is the smallest header hash (map order), not the
    /// least recently used entry.
    fn insert_tx_template(
        &mut self,
        template: SideHeaderTemplate,
        template_id: u32,
        max_templates: usize,
    ) -> bool {
        if max_templates == 0 {
            return false;
        }
        let mut evicted = false;
        if self.tx_template_ids.len() >= max_templates
            && !self.tx_template_ids.contains_key(&template.hash)
        {
            if let Some(old_hash) = self.tx_template_ids.keys().next().copied() {
                if let Some(old_id) = self.tx_template_ids.remove(&old_hash) {
                    self.tx_last_timestamps.remove(&old_id);
                }
                self.tx_templates.remove(&old_hash);
                evicted = true;
            }
        }
        if let Some(previous_id) = self.tx_template_ids.insert(template.hash, template_id) {
            // The hash was re-keyed to a new id. When no hash maps to the old id any more,
            // its last-timestamp entry can never be looked up again, so drop it instead of
            // letting it accumulate.
            if previous_id != template_id
                && !self.tx_template_ids.values().any(|&id| id == previous_id)
            {
                self.tx_last_timestamps.remove(&previous_id);
            }
        }
        self.tx_templates.insert(template.hash, template);
        evicted
    }

    /// Registers the RX template announced under `template_id`.
    ///
    /// Returns `true` when another template had to be evicted to stay within
    /// `max_templates`. With `max_templates == 0` nothing is stored and `false` is
    /// returned. The eviction victim is the smallest template id (map order).
    ///
    /// The hash index `rx_templates` is kept in sync with `rx_templates_by_id`. A hash
    /// entry is only dropped once no remaining id refers to it. Re-announcing an existing
    /// id with a different header therefore cannot grow the hash index without bound.
    fn insert_rx_template(
        &mut self,
        template_id: u32,
        template: SideHeaderTemplate,
        max_templates: usize,
    ) -> bool {
        if max_templates == 0 {
            return false;
        }
        let mut evicted = false;
        if self.rx_templates_by_id.len() >= max_templates
            && !self.rx_templates_by_id.contains_key(&template_id)
        {
            let oldest_id = self.rx_templates_by_id.keys().next().copied();
            if let Some(old_id) = oldest_id {
                if let Some(old_template) = self.rx_templates_by_id.remove(&old_id) {
                    self.rx_last_timestamps.remove(&old_id);
                    self.forget_rx_hash_if_unused(old_template.hash);
                    evicted = true;
                }
            }
        }
        let hash = template.hash;
        if let Some(previous) = self.rx_templates_by_id.insert(template_id, template.clone()) {
            if previous.hash != hash {
                self.forget_rx_hash_if_unused(previous.hash);
            }
        }
        self.rx_templates.insert(hash, template);
        evicted
    }

    /// Drops `hash` from the RX hash index unless some registered id still uses it.
    fn forget_rx_hash_if_unused(&mut self, hash: u64) {
        if !self.rx_templates_by_id.values().any(|t| t.hash == hash) {
            self.rx_templates.remove(&hash);
        }
    }
}

/// A cached packed-header template stored in a side's TX/RX dictionaries.
#[derive(Clone, Debug, PartialEq, Eq)]
struct SideHeaderTemplate {
    /// Hash of the header contents; key of the hash-indexed template maps.
    hash: u64,
    /// Flag byte the template was captured with.
    base_flags: u8,
    /// Header bytes preceding the variable fields (presumably).
    prefix: Arc<[u8]>,
    /// Header bytes between variable fields (presumably).
    between: Arc<[u8]>,
    /// Reliable-header flags, when the template carries a reliable header.
    reliable_flags: Option<u8>,
    /// Whether the reliable header uses the compact form.
    reliable_compact: bool,
}
456
457type SideTemplateExtract<'a> = (
458    SideHeaderTemplate,
459    crate::DataType,
460    u8,
461    u64,
462    u64,
463    u16,
464    Option<(u32, u32)>,
465    &'a [u8],
466);
467
468#[derive(Clone, Copy, Debug, PartialEq, Eq)]
469enum SideCompactTimestampMode {
470    Absolute,
471    Delta,
472    Omitted,
473}
474
475#[derive(Debug, Clone, Default)]
476struct AdaptiveRouteStats {
477    estimated_bandwidth_bps: u64,
478    peak_bandwidth_bps: u64,
479    last_observed_ms: u64,
480    last_slow_observed_ms: u64,
481    sample_count: u64,
482    window_started_ms: u64,
483    window_bytes: u64,
484    peak_usage_bps: u64,
485}
486
487#[cfg(feature = "discovery")]
488#[derive(Debug, Clone, Default)]
489struct DiscoverySideThrottleState {
490    next_ping_ms: u64,
491    next_full_ms: u64,
492}
493
494#[cfg(all(feature = "discovery", feature = "timesync"))]
495#[derive(Debug, Clone, Default)]
496struct TimeSyncSideThrottleState {
497    next_allowed_ms: u64,
498}
499
500#[cfg(feature = "discovery")]
501#[derive(Debug, Clone, Copy, PartialEq, Eq)]
502enum DiscoveryAdvertiseLevel {
503    MinimalPing,
504    Full,
505}
506
507impl AdaptiveRouteStats {
508    #[inline]
509    fn observe(&mut self, bytes: usize, sample_bps: u64, now_ms: u64) {
510        self.estimated_bandwidth_bps = if self.estimated_bandwidth_bps == 0 {
511            sample_bps
512        } else if sample_bps >= self.estimated_bandwidth_bps {
513            self.estimated_bandwidth_bps
514                .saturating_mul(3)
515                .saturating_add(sample_bps.saturating_mul(5))
516                / 8
517        } else {
518            self.estimated_bandwidth_bps
519                .saturating_mul(7)
520                .saturating_add(sample_bps)
521                / 8
522        };
523        self.peak_bandwidth_bps = self.peak_bandwidth_bps.max(sample_bps);
524        self.last_observed_ms = now_ms;
525        if sample_bps > 0 && sample_bps <= CONTROL_SLOW_LINK_CAPACITY_BPS {
526            self.last_slow_observed_ms = now_ms;
527        }
528        self.sample_count = self.sample_count.saturating_add(1);
529        if self.window_started_ms == 0 || now_ms.saturating_sub(self.window_started_ms) > 1_000 {
530            self.window_started_ms = now_ms;
531            self.window_bytes = 0;
532        }
533        self.window_bytes = self.window_bytes.saturating_add(bytes as u64);
534        self.peak_usage_bps = self.peak_usage_bps.max(self.current_usage_bps(now_ms));
535    }
536
537    #[inline]
538    fn current_usage_bps(&self, now_ms: u64) -> u64 {
539        if self.window_started_ms == 0 {
540            return 0;
541        }
542        let elapsed_ms = now_ms.saturating_sub(self.window_started_ms).max(1);
543        (u128::from(self.window_bytes).saturating_mul(1000) / u128::from(elapsed_ms))
544            .min(u128::from(u64::MAX)) as u64
545    }
546
547    #[inline]
548    fn available_headroom_bps(&self, now_ms: u64) -> u64 {
549        let capacity = self
550            .estimated_bandwidth_bps
551            .max(self.peak_bandwidth_bps)
552            .max(1);
553        capacity.saturating_sub(self.current_usage_bps(now_ms))
554    }
555
556    #[inline]
557    fn weight(&self, now_ms: u64) -> u64 {
558        self.available_headroom_bps(now_ms).max(1)
559    }
560
561    #[inline]
562    fn snapshot(&self, now_ms: u64, auto_balancing_enabled: bool) -> AdaptiveLinkStats {
563        let current_usage_bps = self.current_usage_bps(now_ms);
564        let estimated_capacity_bps = self.estimated_bandwidth_bps.max(1);
565        let peak_capacity_bps = self.peak_bandwidth_bps.max(estimated_capacity_bps);
566        let available_headroom_bps = peak_capacity_bps.saturating_sub(current_usage_bps);
567        AdaptiveLinkStats {
568            auto_balancing_enabled,
569            estimated_capacity_bps,
570            peak_capacity_bps,
571            current_usage_bps,
572            peak_usage_bps: self.peak_usage_bps.max(current_usage_bps),
573            available_headroom_bps,
574            effective_weight: available_headroom_bps.max(1),
575            last_observed_ms: self.last_observed_ms,
576            sample_count: self.sample_count,
577        }
578    }
579}
580
581#[derive(Debug, Clone, Default)]
582struct TypeRuntimeStatsInner {
583    tx_packets: u64,
584    tx_bytes: u64,
585    rx_packets: u64,
586    rx_bytes: u64,
587    relayed_tx_packets: u64,
588    relayed_tx_bytes: u64,
589    relayed_rx_packets: u64,
590    relayed_rx_bytes: u64,
591    tx_retries: u64,
592    handler_failures: u64,
593}
594
595#[derive(Debug, Clone, Default)]
596struct SideRuntimeStatsInner {
597    tx_packets: u64,
598    tx_bytes: u64,
599    rx_packets: u64,
600    rx_bytes: u64,
601    relayed_tx_packets: u64,
602    relayed_tx_bytes: u64,
603    relayed_rx_packets: u64,
604    relayed_rx_bytes: u64,
605    tx_retries: u64,
606    tx_handler_failures: u64,
607    total_handler_retries: u64,
608    side_transport_full_frames: u64,
609    side_transport_compact_frames: u64,
610    side_transport_compact_delta_frames: u64,
611    side_transport_compact_omitted_timestamp_frames: u64,
612    side_transport_chunk_frames: u64,
613    side_transport_raw_bytes: u64,
614    side_transport_wire_bytes: u64,
615    side_transport_bytes_saved: u64,
616    side_transport_min_compact_overhead_bytes: Option<usize>,
617    side_transport_max_compact_overhead_bytes: Option<usize>,
618    side_transport_compact_target_misses: u64,
619    side_transport_template_evictions: u64,
620    data_types: BTreeMap<u32, TypeRuntimeStatsInner>,
621}
622
623impl SideRuntimeStatsInner {
624    fn type_stats_mut(&mut self, ty: crate::DataType) -> &mut TypeRuntimeStatsInner {
625        self.data_types.entry(ty.as_u32()).or_default()
626    }
627
628    fn note_tx(&mut self, ty: crate::DataType, bytes: usize, retries: usize) {
629        self.tx_packets = self.tx_packets.saturating_add(1);
630        self.tx_bytes = self.tx_bytes.saturating_add(bytes as u64);
631        self.relayed_tx_packets = self.relayed_tx_packets.saturating_add(1);
632        self.relayed_tx_bytes = self.relayed_tx_bytes.saturating_add(bytes as u64);
633        self.tx_retries = self.tx_retries.saturating_add(retries as u64);
634        self.total_handler_retries = self.total_handler_retries.saturating_add(retries as u64);
635        let stats = self.type_stats_mut(ty);
636        stats.tx_packets = stats.tx_packets.saturating_add(1);
637        stats.tx_bytes = stats.tx_bytes.saturating_add(bytes as u64);
638        stats.relayed_tx_packets = stats.relayed_tx_packets.saturating_add(1);
639        stats.relayed_tx_bytes = stats.relayed_tx_bytes.saturating_add(bytes as u64);
640        stats.tx_retries = stats.tx_retries.saturating_add(retries as u64);
641    }
642
643    fn note_rx(&mut self, ty: crate::DataType, bytes: usize) {
644        self.rx_packets = self.rx_packets.saturating_add(1);
645        self.rx_bytes = self.rx_bytes.saturating_add(bytes as u64);
646        self.relayed_rx_packets = self.relayed_rx_packets.saturating_add(1);
647        self.relayed_rx_bytes = self.relayed_rx_bytes.saturating_add(bytes as u64);
648        let stats = self.type_stats_mut(ty);
649        stats.rx_packets = stats.rx_packets.saturating_add(1);
650        stats.rx_bytes = stats.rx_bytes.saturating_add(bytes as u64);
651        stats.relayed_rx_packets = stats.relayed_rx_packets.saturating_add(1);
652        stats.relayed_rx_bytes = stats.relayed_rx_bytes.saturating_add(bytes as u64);
653    }
654
655    fn note_tx_failure(&mut self, ty: crate::DataType, retries: usize) {
656        self.tx_handler_failures = self.tx_handler_failures.saturating_add(1);
657        self.tx_retries = self.tx_retries.saturating_add(retries as u64);
658        self.total_handler_retries = self.total_handler_retries.saturating_add(retries as u64);
659        let stats = self.type_stats_mut(ty);
660        stats.handler_failures = stats.handler_failures.saturating_add(1);
661        stats.tx_retries = stats.tx_retries.saturating_add(retries as u64);
662    }
663
664    fn note_side_transport_full(&mut self, raw_bytes: usize, wire_bytes: usize) {
665        self.side_transport_full_frames = self.side_transport_full_frames.saturating_add(1);
666        self.note_side_transport_bytes(raw_bytes, wire_bytes);
667    }
668
669    fn note_side_transport_compact(
670        &mut self,
671        raw_bytes: usize,
672        wire_bytes: usize,
673        compact_overhead_bytes: usize,
674        used_timestamp_delta: bool,
675        omitted_timestamp: bool,
676    ) {
677        self.side_transport_compact_frames = self.side_transport_compact_frames.saturating_add(1);
678        if used_timestamp_delta {
679            self.side_transport_compact_delta_frames =
680                self.side_transport_compact_delta_frames.saturating_add(1);
681        }
682        if omitted_timestamp {
683            self.side_transport_compact_omitted_timestamp_frames = self
684                .side_transport_compact_omitted_timestamp_frames
685                .saturating_add(1);
686        }
687        self.note_side_transport_bytes(raw_bytes, wire_bytes);
688        self.side_transport_min_compact_overhead_bytes = Some(
689            self.side_transport_min_compact_overhead_bytes
690                .map_or(compact_overhead_bytes, |v| v.min(compact_overhead_bytes)),
691        );
692        self.side_transport_max_compact_overhead_bytes = Some(
693            self.side_transport_max_compact_overhead_bytes
694                .map_or(compact_overhead_bytes, |v| v.max(compact_overhead_bytes)),
695        );
696    }
697
698    fn note_side_transport_chunks(&mut self, chunks: usize) {
699        self.side_transport_chunk_frames = self
700            .side_transport_chunk_frames
701            .saturating_add(chunks as u64);
702    }
703
704    fn note_side_transport_bytes(&mut self, raw_bytes: usize, wire_bytes: usize) {
705        self.side_transport_raw_bytes = self
706            .side_transport_raw_bytes
707            .saturating_add(raw_bytes as u64);
708        self.side_transport_wire_bytes = self
709            .side_transport_wire_bytes
710            .saturating_add(wire_bytes as u64);
711        if raw_bytes > wire_bytes {
712            self.side_transport_bytes_saved = self
713                .side_transport_bytes_saved
714                .saturating_add((raw_bytes - wire_bytes) as u64);
715        }
716    }
717
718    fn note_side_transport_compact_target_miss(&mut self) {
719        self.side_transport_compact_target_misses =
720            self.side_transport_compact_target_misses.saturating_add(1);
721    }
722
723    fn note_side_transport_template_eviction(&mut self) {
724        self.side_transport_template_evictions =
725            self.side_transport_template_evictions.saturating_add(1);
726    }
727}
728
729#[derive(Clone, Debug)]
730pub struct RelayConfig {
731    sender: Arc<str>,
732    memory: RuntimeMemoryConfig,
733}
734
735impl RelayConfig {
736    pub fn new() -> Self {
737        Self::default()
738    }
739
740    pub fn with_sender<S: AsRef<str>>(mut self, sender: S) -> Self {
741        self.sender = Arc::from(sender.as_ref());
742        self
743    }
744
745    pub fn with_memory_config(mut self, memory: RuntimeMemoryConfig) -> TelemetryResult<Self> {
746        memory.validate()?;
747        self.memory = memory;
748        Ok(self)
749    }
750
751    fn sender(&self) -> Arc<str> {
752        self.sender.clone()
753    }
754
755    fn memory_config(&self) -> RuntimeMemoryConfig {
756        self.memory
757    }
758}
759
760impl Default for RelayConfig {
761    fn default() -> Self {
762        Self {
763            sender: Arc::from("RELAY"),
764            memory: RuntimeMemoryConfig::default(),
765        }
766    }
767}
768
769#[derive(Clone, Copy, Debug, PartialEq, Eq)]
770enum RouteSelectionOrigin {
771    Flood,
772    Discovered,
773}
774
775/// Internal state, protected by RouterMutex so all public methods can take &self.
776struct RelayInner {
777    memory: RuntimeMemoryConfig,
778    sides: Vec<Option<RelaySide>>,
779    route_overrides: BTreeMap<(Option<RelaySideId>, RelaySideId), bool>,
780    typed_route_overrides: BTreeMap<(Option<RelaySideId>, u32, RelaySideId), bool>,
781    route_weights: BTreeMap<(Option<RelaySideId>, RelaySideId), u32>,
782    route_priorities: BTreeMap<(Option<RelaySideId>, RelaySideId), u32>,
783    source_route_modes: BTreeMap<Option<RelaySideId>, RouteSelectionMode>,
784    route_selection_cursors: BTreeMap<Option<RelaySideId>, u64>,
785    adaptive_route_stats: BTreeMap<RelaySideId, AdaptiveRouteStats>,
786    side_runtime_stats: BTreeMap<RelaySideId, SideRuntimeStatsInner>,
787    side_transport: BTreeMap<RelaySideId, SideTransportState>,
788    rx_queue: BoundedDeque<RelayRxItem>,
789    tx_queue: BoundedDeque<RelayTxItem>,
790    replay_queue: BoundedDeque<RelayReplayItem>,
791    recent_rx: BoundedDeque<u64>,
792    reliable_tx: BTreeMap<(RelaySideId, u32), ReliableTxState>,
793    reliable_rx: BTreeMap<(RelaySideId, u32), ReliableRxState>,
794    reliable_return_routes: BTreeMap<u64, ReliableReturnRouteState>,
795    reliable_return_route_order: VecDeque<u64>,
796    end_to_end_acked_destinations: BTreeMap<u64, BTreeSet<u64>>,
797    end_to_end_acked_destination_order: VecDeque<u64>,
798    total_handler_failures: u64,
799    total_handler_retries: u64,
800    #[cfg(feature = "discovery")]
801    discovery_routes: BTreeMap<RelaySideId, DiscoverySideState>,
802    #[cfg(feature = "discovery")]
803    discovery_cadence: DiscoveryCadenceState,
804    #[cfg(feature = "discovery")]
805    discovery_side_throttle: BTreeMap<RelaySideId, DiscoverySideThrottleState>,
806    #[cfg(all(feature = "discovery", feature = "timesync"))]
807    timesync_side_throttle: BTreeMap<RelaySideId, TimeSyncSideThrottleState>,
808}
809
810#[derive(Clone, Copy, Debug, PartialEq, Eq)]
811enum RelayQueueKind {
812    Rx,
813    Tx,
814    Replay,
815    Recent,
816    ReliableRxBuffer,
817    #[cfg(feature = "discovery")]
818    Discovery,
819}
820
821impl RelayInner {
#[cfg(feature = "discovery")]
/// Approximate memory charged for one topology board entry.
///
/// Counts the sender-id bytes, endpoint storage, and the string bytes of time-sync sources
/// and connections. It does not count per-element `String`/`Vec` headers or
/// `size_of::<TopologyBoardNode>()`.
fn topology_board_byte_cost(board: &TopologyBoardNode) -> usize {
    let endpoint_bytes = board.reachable_endpoints.len() * size_of::<crate::DataEndpoint>();
    let timesync_bytes: usize = board
        .reachable_timesync_sources
        .iter()
        .map(|s| s.len())
        .sum();
    let connection_bytes: usize = board.connections.iter().map(|s| s.len()).sum();
    board
        .sender_id
        .len()
        .saturating_add(endpoint_bytes)
        .saturating_add(timesync_bytes)
        .saturating_add(connection_bytes)
}
837
#[cfg(feature = "discovery")]
/// Approximate memory charged for one announcer entry.
///
/// Sums the key bytes, reachable endpoints, time-sync source string bytes, each reported
/// topology board, and `size_of::<DiscoverySenderState>()`.
fn discovery_sender_byte_cost(sender: &str, state: &DiscoverySenderState) -> usize {
    let endpoint_bytes = state.reachable.len() * size_of::<crate::DataEndpoint>();
    let timesync_bytes: usize = state
        .reachable_timesync_sources
        .iter()
        .map(|s| s.len())
        .sum();
    let board_bytes: usize = state
        .topology_boards
        .iter()
        .map(Self::topology_board_byte_cost)
        .sum();
    sender
        .len()
        .saturating_add(endpoint_bytes)
        .saturating_add(timesync_bytes)
        .saturating_add(board_bytes)
        .saturating_add(size_of::<DiscoverySenderState>())
}
859
860    #[cfg(feature = "discovery")]
861    fn discovery_route_byte_cost(route: &DiscoverySideState) -> usize {
862        size_of::<DiscoverySideState>()
863            .saturating_add(route.reachable.len() * size_of::<crate::DataEndpoint>())
864            .saturating_add(
865                route
866                    .reachable_timesync_sources
867                    .iter()
868                    .map(|s| s.len())
869                    .sum::<usize>(),
870            )
871            .saturating_add(
872                route
873                    .announcers
874                    .iter()
875                    .map(|(sender, state)| Self::discovery_sender_byte_cost(sender, state))
876                    .sum::<usize>(),
877            )
878    }
879
880    #[cfg(feature = "discovery")]
881    fn discovery_bytes_used(&self) -> usize {
882        self.discovery_routes
883            .values()
884            .map(Self::discovery_route_byte_cost)
885            .sum()
886    }
887
888    #[inline]
889    fn reliable_rx_buffered_bytes(&self) -> usize {
890        self.reliable_rx
891            .values()
892            .flat_map(|state| state.buffered.values())
893            .map(|bytes| size_of::<Arc<[u8]>>() + bytes.len())
894            .sum()
895    }
896
897    #[inline]
898    fn shared_queue_bytes_used(&self) -> usize {
899        self.rx_queue
900            .bytes_used()
901            .saturating_add(self.tx_queue.bytes_used())
902            .saturating_add(self.replay_queue.bytes_used())
903            .saturating_add(self.recent_rx.max_bytes())
904            .saturating_add(self.reliable_rx_buffered_bytes())
905            .saturating_add(crate::config::schema_bytes_used())
906            .saturating_add({
907                #[cfg(feature = "discovery")]
908                {
909                    self.discovery_bytes_used()
910                }
911                #[cfg(not(feature = "discovery"))]
912                {
913                    0
914                }
915            })
916    }
917
918    fn reliable_rx_buffer_len(&self) -> usize {
919        self.reliable_rx
920            .values()
921            .map(|state| state.buffered.len())
922            .sum()
923    }
924
925    fn pop_reliable_rx_buffered(&mut self) -> Option<Arc<[u8]>> {
926        let key = self
927            .reliable_rx
928            .iter()
929            .find_map(|(key, state)| (!state.buffered.is_empty()).then_some(*key))?;
930        self.reliable_rx
931            .get_mut(&key)?
932            .buffered
933            .pop_first()
934            .map(|(_, v)| v)
935    }
936
937    fn pop_shared_queue_item(&mut self, preferred: RelayQueueKind) -> bool {
938        match preferred {
939            RelayQueueKind::Rx => self.rx_queue.pop_front().is_some(),
940            RelayQueueKind::Tx => self.tx_queue.pop_front().is_some(),
941            RelayQueueKind::Replay => self.replay_queue.pop_front().is_some(),
942            RelayQueueKind::Recent => self.recent_rx.pop_front().is_some(),
943            RelayQueueKind::ReliableRxBuffer => self.pop_reliable_rx_buffered().is_some(),
944            #[cfg(feature = "discovery")]
945            RelayQueueKind::Discovery => self.pop_discovery_route(),
946        }
947    }
948
949    #[cfg(feature = "discovery")]
950    fn pop_discovery_route(&mut self) -> bool {
951        let Some((&side, _)) = self
952            .discovery_routes
953            .iter()
954            .min_by_key(|(_, route)| route.last_seen_ms)
955        else {
956            return false;
957        };
958        self.discovery_routes.remove(&side);
959        Self::queue_budget_warning("topology route evicted because shared queue budget is full");
960        true
961    }
962
963    fn largest_shared_queue(&self) -> Option<RelayQueueKind> {
964        let candidates = [
965            (
966                RelayQueueKind::Rx,
967                self.rx_queue.bytes_used(),
968                self.rx_queue.len(),
969            ),
970            (
971                RelayQueueKind::Tx,
972                self.tx_queue.bytes_used(),
973                self.tx_queue.len(),
974            ),
975            (
976                RelayQueueKind::Replay,
977                self.replay_queue.bytes_used(),
978                self.replay_queue.len(),
979            ),
980            (RelayQueueKind::Recent, 0, 0),
981            (
982                RelayQueueKind::ReliableRxBuffer,
983                self.reliable_rx_buffered_bytes(),
984                self.reliable_rx_buffer_len(),
985            ),
986            #[cfg(feature = "discovery")]
987            (
988                RelayQueueKind::Discovery,
989                self.discovery_bytes_used(),
990                self.discovery_routes.len(),
991            ),
992        ];
993        candidates
994            .into_iter()
995            .filter(|(_, bytes, len)| *bytes > 0 && *len > 0)
996            .max_by_key(|(kind, bytes, _)| {
997                (
998                    *bytes,
999                    if *kind == RelayQueueKind::ReliableRxBuffer {
1000                        0
1001                    } else {
1002                        1
1003                    },
1004                )
1005            })
1006            .map(|(kind, _, _)| kind)
1007    }
1008
1009    fn make_shared_queue_room(
1010        &mut self,
1011        incoming_cost: usize,
1012        preferred: RelayQueueKind,
1013    ) -> TelemetryResult<()> {
1014        if incoming_cost > self.memory.max_queue_budget {
1015            return Err(TelemetryError::PacketTooLarge(
1016                "Item exceeds maximum shared queue budget",
1017            ));
1018        }
1019
1020        while self.shared_queue_bytes_used().saturating_add(incoming_cost)
1021            > self.memory.max_queue_budget
1022        {
1023            let victim = self.largest_shared_queue().unwrap_or(preferred);
1024            if victim == RelayQueueKind::Discovery {
1025                Self::queue_budget_warning("topology data is using the largest queue budget share");
1026            }
1027            if !self.pop_shared_queue_item(victim) && !self.pop_shared_queue_item(preferred) {
1028                return Err(TelemetryError::PacketTooLarge(
1029                    "Item exceeds maximum shared queue budget",
1030                ));
1031            }
1032        }
1033
1034        Ok(())
1035    }
1036
1037    #[inline]
1038    fn queue_budget_warning(msg: &str) {
1039        #[cfg(feature = "std")]
1040        eprintln!("sedsnet queue budget warning: {msg}");
1041        let _ = msg;
1042    }
1043
1044    #[cfg(feature = "discovery")]
1045    fn fit_discovery_budget(&mut self) {
1046        while self.shared_queue_bytes_used() > self.memory.max_queue_budget {
1047            if !self.pop_discovery_route() {
1048                break;
1049            }
1050        }
1051    }
1052
1053    fn push_rx(&mut self, item: RelayRxItem) -> TelemetryResult<()> {
1054        self.make_shared_queue_room(item.byte_cost(), RelayQueueKind::Rx)?;
1055        self.rx_queue
1056            .push_back_prioritized(item, |queued| queued.priority)
1057    }
1058
1059    fn push_tx(&mut self, item: RelayTxItem) -> TelemetryResult<()> {
1060        self.make_shared_queue_room(item.byte_cost(), RelayQueueKind::Tx)?;
1061        self.tx_queue
1062            .push_back_prioritized(item, |queued| queued.priority)
1063    }
1064
1065    fn push_replay(&mut self, item: RelayReplayItem) -> TelemetryResult<()> {
1066        self.make_shared_queue_room(item.byte_cost(), RelayQueueKind::Replay)?;
1067        self.replay_queue
1068            .push_back_prioritized(item, |queued| queued.priority)
1069    }
1070
1071    fn push_recent_rx(&mut self, id: u64) -> TelemetryResult<()> {
1072        while self.recent_rx.len() >= self.memory.max_recent_rx_ids {
1073            let _ = self.recent_rx.pop_front();
1074        }
1075        self.make_shared_queue_room(0, RelayQueueKind::Recent)?;
1076        self.recent_rx.push_back(id)
1077    }
1078
1079    fn buffer_reliable_rx(
1080        &mut self,
1081        side: RelaySideId,
1082        ty: crate::DataType,
1083        seq: u32,
1084        bytes: Arc<[u8]>,
1085    ) -> TelemetryResult<()> {
1086        let key = Relay::reliable_key(side, ty);
1087        if self
1088            .reliable_rx
1089            .get(&key)
1090            .is_some_and(|state| state.buffered.contains_key(&seq))
1091        {
1092            return Ok(());
1093        }
1094        let cost = size_of::<Arc<[u8]>>() + bytes.len();
1095        self.make_shared_queue_room(cost, RelayQueueKind::ReliableRxBuffer)?;
1096        let rx_state = self
1097            .reliable_rx
1098            .entry(key)
1099            .or_insert_with(|| ReliableRxState {
1100                expected_seq: 1,
1101                buffered: BTreeMap::new(),
1102            });
1103        if rx_state.buffered.len() >= runtime_reliable_max_pending() {
1104            let _ = rx_state.buffered.pop_first();
1105        }
1106        rx_state.buffered.insert(seq, bytes);
1107        Ok(())
1108    }
1109}
1110
/// Relay that fans out packets from one side to all others.
/// - Supports both packed bytes and full Packet.
/// - Has RX & TX queues, like Router.
/// - Uses a Clock for the *_with_timeout APIs, same style as Router.
/// - Fan-out is the default; per-source route selection modes can narrow it to one side.
pub struct Relay {
    /// Sender id, kept behind its own lock so it can be read or replaced without taking `state`.
    sender: RouterMutex<Arc<str>>,
    /// All mutable relay state: sides, routing tables, shared queues, reliable and discovery data.
    state: RouterMutex<RelayInner>,
    /// Reentry gate around side transmission (see `try_enter_side_tx` / `side_tx_active`);
    /// presumably prevents re-entrant sends from inside TX handlers — TODO confirm.
    side_tx_gate: ReentryGate,
    /// Time source, e.g. for reliable `last_send_ms` and adaptive link samples.
    clock: Box<dyn Clock + Send + Sync>,
}
1121
/// Plan describing which remote sides an item goes to.
///
/// NOTE(review): the usage is not visible in this chunk. The only variant is an explicit list of
/// target side ids.
enum RemoteSidePlan {
    /// Send to exactly these side ids.
    Target(Vec<RelaySideId>),
}
1125
1126impl Relay {
    /// Exact sender name that marks a packet as an end-to-end reliable ACK.
    const END_TO_END_ACK_SENDER: &'static str = "E2EACK";
    /// Sender prefix for end-to-end ACKs that carry a suffix; the suffix after this prefix is
    /// hashed with `sender_hash` by `decode_end_to_end_ack_sender_hash`.
    const END_TO_END_ACK_PREFIX: &'static str = "E2EACK:";
1129
1130    fn relay_item_priority(data: &RelayItem) -> TelemetryResult<u8> {
1131        let ty = match data {
1132            RelayItem::Packet(pkt) => pkt.data_type(),
1133            RelayItem::Packed(bytes) => wire_format::peek_envelope(bytes.as_ref())?.ty,
1134        };
1135        Ok(message_priority(ty))
1136    }
1137
1138    #[inline]
1139    fn is_side_tx_busy(err: &TelemetryError) -> bool {
1140        matches!(err, TelemetryError::Io("side tx busy"))
1141    }
1142
    /// Pop one frame from the replay queue and retransmit it if it is still pending.
    ///
    /// Returns `Ok(true)` only when a frame was actually resent. `Ok(false)` covers all other
    /// outcomes:
    /// - the queue was empty;
    /// - the frame has no reliable header;
    /// - its sequence number is no longer in the reliable TX `sent` map (presumably already
    ///   acknowledged — TODO confirm);
    /// - the side reported "side tx busy" and the frame was re-queued.
    ///
    /// In the no-header and no-longer-tracked cases the popped frame is dropped.
    ///
    /// # Errors
    /// Propagates the following. In the first two cases the frame is dropped.
    /// - frame-parse errors from `wire_format::peek_frame_info`;
    /// - non-busy send errors;
    /// - errors from re-queueing a busy frame.
    ///
    /// The state lock is not held across the actual send.
    fn process_replay_queue_item(&self) -> TelemetryResult<bool> {
        let Some(item) = ({
            let mut st = self.state.lock();
            st.replay_queue.pop_front()
        }) else {
            return Ok(false);
        };
        let frame = wire_format::peek_frame_info(item.bytes.as_ref())?;
        let ty = frame.envelope.ty;
        let Some(hdr) = frame.reliable else {
            return Ok(false);
        };
        {
            let mut st = self.state.lock();
            // `reliable_tx_state_mut` inserts an empty TX state for (dst, ty) if none exists, so
            // this lookup may create an entry as a side effect.
            let tx_state = self.reliable_tx_state_mut(&mut st, item.dst, ty);
            if !tx_state.sent.contains_key(&hdr.seq) {
                return Ok(false);
            }
        }
        if let Err(e) = self.send_reliable_raw_to_side(item.dst, item.bytes.clone()) {
            if Self::is_side_tx_busy(&e) {
                // Transient back-pressure: put the frame back for a later attempt.
                let mut st = self.state.lock();
                st.push_replay(item)?;
                return Ok(false);
            }
            return Err(e);
        }
        // Sent: stamp the retransmit time and clear the queued flag if still tracked.
        let mut st = self.state.lock();
        let tx_state = self.reliable_tx_state_mut(&mut st, item.dst, ty);
        if let Some(sent) = tx_state.sent.get_mut(&hdr.seq) {
            sent.last_send_ms = self.clock.now_ms();
            sent.queued = false;
        }
        Ok(true)
    }
1178
1179    fn pop_ready_tx_item(
1180        &self,
1181    ) -> Option<(
1182        Option<RelaySideId>,
1183        RelaySideId,
1184        RelayTxHandlerFn,
1185        RelaySideOptions,
1186        RelayItem,
1187    )> {
1188        let mut st = self.state.lock();
1189        if let Some(item) = st.tx_queue.pop_front() {
1190            let side = st.sides.get(item.dst).and_then(|side| side.clone());
1191            side.map(|s| (item.src, item.dst, s.tx_handler, s.opts, item.data))
1192        } else {
1193            None
1194        }
1195    }
1196
1197    fn send_tx_item(
1198        &self,
1199        src: Option<RelaySideId>,
1200        dst: RelaySideId,
1201        handler: RelayTxHandlerFn,
1202        opts: RelaySideOptions,
1203        data: RelayItem,
1204    ) -> TelemetryResult<bool> {
1205        let allowed = {
1206            let mut st = self.state.lock();
1207            let ty = match &data {
1208                RelayItem::Packet(pkt) => Some(pkt.data_type()),
1209                RelayItem::Packed(bytes) => Some(wire_format::peek_envelope(bytes.as_ref())?.ty),
1210            };
1211            let route_allowed = self.route_allowed_locked(&st, src, ty, dst);
1212            #[cfg(all(feature = "discovery", feature = "timesync"))]
1213            let timesync_allowed = ty
1214                .map(|ty| {
1215                    Self::timesync_allowed_for_side_locked(&mut st, dst, ty, self.clock.now_ms())
1216                })
1217                .unwrap_or(true);
1218            #[cfg(not(all(feature = "discovery", feature = "timesync")))]
1219            let timesync_allowed = true;
1220            route_allowed && timesync_allowed
1221        };
1222        if !allowed {
1223            return Ok(false);
1224        }
1225        if opts.reliable_enabled && matches!(handler, RelayTxHandlerFn::Packed(_)) {
1226            self.send_reliable_to_side(dst, data)?;
1227            Ok(true)
1228        } else if let Some(adjusted) = self.adjust_reliable_for_side(opts, data)? {
1229            self.call_tx_handler(dst, &handler, &adjusted)?;
1230            Ok(true)
1231        } else {
1232            Ok(false)
1233        }
1234    }
1235
1236    /// Create a new relay with the given clock.
1237    pub fn new(clock: Box<dyn Clock + Send + Sync>) -> Self {
1238        Self::new_with_config(RelayConfig::default(), clock)
1239    }
1240
    /// Create a new relay with explicit runtime configuration and clock.
    ///
    /// The sender id comes from `cfg.sender()` and memory limits from `cfg.memory_config()`.
    /// All routing tables, reliability state and statistics start empty.
    ///
    /// The RX, TX and replay queues are each constructed with `memory.max_queue_budget`,
    /// `memory.starting_queue_size` and `memory.queue_grow_step` (presumably byte cap, initial
    /// capacity and growth step — confirm against `BoundedDeque::new`). The combined limit
    /// across all queues is enforced separately by `RelayInner::make_shared_queue_room`.
    pub fn new_with_config(cfg: RelayConfig, clock: Box<dyn Clock + Send + Sync>) -> Self {
        let memory = cfg.memory_config();
        Self {
            sender: RouterMutex::new(cfg.sender()),
            state: RouterMutex::new(RelayInner {
                memory,
                sides: Vec::new(),
                route_overrides: BTreeMap::new(),
                typed_route_overrides: BTreeMap::new(),
                route_weights: BTreeMap::new(),
                route_priorities: BTreeMap::new(),
                source_route_modes: BTreeMap::new(),
                route_selection_cursors: BTreeMap::new(),
                adaptive_route_stats: BTreeMap::new(),
                side_runtime_stats: BTreeMap::new(),
                side_transport: BTreeMap::new(),
                rx_queue: BoundedDeque::new(
                    memory.max_queue_budget,
                    memory.starting_queue_size,
                    memory.queue_grow_step,
                ),
                tx_queue: BoundedDeque::new(
                    memory.max_queue_budget,
                    memory.starting_queue_size,
                    memory.queue_grow_step,
                ),
                replay_queue: BoundedDeque::new(
                    memory.max_queue_budget,
                    memory.starting_queue_size,
                    memory.queue_grow_step,
                ),
                // Recently received packet ids: sized entirely by `recent_rx_queue_bytes()`
                // (used for both the first and second constructor arguments).
                recent_rx: BoundedDeque::new(
                    memory.recent_rx_queue_bytes(),
                    memory.recent_rx_queue_bytes(),
                    memory.queue_grow_step,
                ),
                reliable_tx: BTreeMap::new(),
                reliable_rx: BTreeMap::new(),
                reliable_return_routes: BTreeMap::new(),
                reliable_return_route_order: VecDeque::new(),
                end_to_end_acked_destinations: BTreeMap::new(),
                end_to_end_acked_destination_order: VecDeque::new(),
                total_handler_failures: 0,
                total_handler_retries: 0,
                #[cfg(feature = "discovery")]
                discovery_routes: BTreeMap::new(),
                #[cfg(feature = "discovery")]
                discovery_cadence: DiscoveryCadenceState::default(),
                #[cfg(feature = "discovery")]
                discovery_side_throttle: BTreeMap::new(),
                #[cfg(all(feature = "discovery", feature = "timesync"))]
                timesync_side_throttle: BTreeMap::new(),
            }),
            side_tx_gate: ReentryGate::new(),
            clock,
        }
    }
1299
1300    #[inline]
1301    fn sender_arc(&self) -> Arc<str> {
1302        self.sender.lock().clone()
1303    }
1304
1305    #[inline]
1306    pub fn sender(&self) -> Arc<str> {
1307        self.sender_arc()
1308    }
1309
1310    pub fn set_sender<S: AsRef<str>>(&self, sender: S) {
1311        *self.sender.lock() = Arc::from(sender.as_ref());
1312    }
1313
1314    #[inline]
1315    fn try_enter_side_tx(&self) -> Option<ReentryGuard<'_>> {
1316        self.side_tx_gate.try_enter()
1317    }
1318
1319    #[inline]
1320    fn side_tx_active(&self) -> bool {
1321        self.side_tx_gate.is_active()
1322    }
1323
1324    #[inline]
1325    fn side_ref(st: &RelayInner, side: RelaySideId) -> TelemetryResult<&RelaySide> {
1326        st.sides
1327            .get(side)
1328            .and_then(|side| side.as_ref())
1329            .ok_or(TelemetryError::HandlerError("relay: invalid side id"))
1330    }
1331
1332    fn note_side_tx_success(
1333        &self,
1334        side: RelaySideId,
1335        ty: crate::DataType,
1336        bytes: usize,
1337        attempts: usize,
1338    ) {
1339        let mut st = self.state.lock();
1340        let entry = st.side_runtime_stats.entry(side).or_default();
1341        entry.note_tx(ty, bytes, attempts.saturating_sub(1));
1342    }
1343
1344    fn note_side_tx_failure(&self, side: RelaySideId, ty: crate::DataType, attempts: usize) {
1345        let mut st = self.state.lock();
1346        st.total_handler_failures = st.total_handler_failures.saturating_add(1);
1347        st.total_handler_retries = st.total_handler_retries.saturating_add(attempts as u64);
1348        let entry = st.side_runtime_stats.entry(side).or_default();
1349        entry.note_tx_failure(ty, attempts);
1350    }
1351
1352    fn note_side_rx(&self, side: RelaySideId, ty: crate::DataType, bytes: usize) {
1353        let mut st = self.state.lock();
1354        let entry = st.side_runtime_stats.entry(side).or_default();
1355        entry.note_rx(ty, bytes);
1356    }
1357
1358    #[inline]
1359    fn ensure_side_ingress_enabled(&self, side: RelaySideId) -> TelemetryResult<()> {
1360        let st = self.state.lock();
1361        let side_ref = Self::side_ref(&st, side)?;
1362        if side_ref.opts.ingress_enabled {
1363            Ok(())
1364        } else {
1365            Err(TelemetryError::HandlerError(
1366                "relay: ingress disabled for side id",
1367            ))
1368        }
1369    }
1370
1371    #[inline]
1372    fn route_allowed_locked(
1373        &self,
1374        st: &RelayInner,
1375        src: Option<RelaySideId>,
1376        ty: Option<crate::DataType>,
1377        dst: RelaySideId,
1378    ) -> bool {
1379        let Ok(dst_side) = Self::side_ref(st, dst) else {
1380            return false;
1381        };
1382        if !dst_side.opts.egress_enabled {
1383            return false;
1384        }
1385        if let Some(src_id) = src {
1386            let Ok(src_side) = Self::side_ref(st, src_id) else {
1387                return false;
1388            };
1389            if !src_side.opts.ingress_enabled || src_id == dst {
1390                return false;
1391            }
1392        }
1393        let base_allowed = st.route_overrides.get(&(src, dst)).copied().unwrap_or(true);
1394        if !base_allowed {
1395            return false;
1396        }
1397
1398        let Some(ty) = ty else {
1399            return true;
1400        };
1401        if st
1402            .typed_route_overrides
1403            .keys()
1404            .any(|(typed_src, typed_ty, _)| *typed_src == src && *typed_ty == ty.as_u32())
1405        {
1406            return st
1407                .typed_route_overrides
1408                .get(&(src, ty.as_u32(), dst))
1409                .copied()
1410                .unwrap_or(false);
1411        }
1412        true
1413    }
1414
1415    fn has_typed_route_overrides_locked(
1416        st: &RelayInner,
1417        src: Option<RelaySideId>,
1418        ty: crate::DataType,
1419    ) -> bool {
1420        st.typed_route_overrides
1421            .keys()
1422            .any(|(typed_src, typed_ty, _)| *typed_src == src && *typed_ty == ty.as_u32())
1423    }
1424
1425    fn eligible_side_ids_locked(
1426        &self,
1427        st: &RelayInner,
1428        src: Option<RelaySideId>,
1429        ty: Option<crate::DataType>,
1430        restrict_link_local: bool,
1431    ) -> Vec<RelaySideId> {
1432        st.sides
1433            .iter()
1434            .enumerate()
1435            .filter_map(|(side_id, side)| {
1436                let side = side.as_ref()?;
1437                if restrict_link_local && !side.opts.link_local_enabled {
1438                    return None;
1439                }
1440                if self.route_allowed_locked(st, src, ty, side_id) {
1441                    Some(side_id)
1442                } else {
1443                    None
1444                }
1445            })
1446            .collect()
1447    }
1448
    /// Narrow the eligible `sides` for `src` according to its route selection mode.
    ///
    /// - Zero or one side: returned unchanged.
    /// - No explicit mode and the route came from discovery: delegated to
    ///   `apply_adaptive_discovery_selection_locked`.
    /// - `Fanout` (also the default when no mode is set): all sides are returned.
    /// - `Weighted`: one side is picked with a per-source cursor over cumulative weights
    ///   (missing weight = 1). The cursor advances on every call, so picks rotate in proportion
    ///   to the weights. A total weight of 0 yields no side.
    /// - `Failover`: the side with the lowest `(priority, side id)` is returned (missing
    ///   priority = 0).
    fn apply_route_selection_locked(
        &self,
        st: &mut RelayInner,
        src: Option<RelaySideId>,
        mut sides: Vec<RelaySideId>,
        origin: RouteSelectionOrigin,
    ) -> Vec<RelaySideId> {
        if sides.len() <= 1 {
            return sides;
        }

        let selection_mode = st.source_route_modes.get(&src).copied();
        if selection_mode.is_none() && origin == RouteSelectionOrigin::Discovered {
            return self.apply_adaptive_discovery_selection_locked(st, src, sides);
        }

        match selection_mode.unwrap_or(RouteSelectionMode::Fanout) {
            RouteSelectionMode::Fanout => sides,
            RouteSelectionMode::Weighted => {
                // Sort so the cumulative-weight walk below is deterministic across calls.
                sides.sort_unstable();
                let total_weight = sides.iter().fold(0_u64, |acc, side| {
                    acc + u64::from(st.route_weights.get(&(src, *side)).copied().unwrap_or(1))
                });
                if total_weight == 0 {
                    return Vec::new();
                }
                let cursor = st.route_selection_cursors.entry(src).or_insert(0);
                let pick = *cursor % total_weight;
                *cursor = cursor.wrapping_add(1);
                // Walk the cumulative weights until `pick` falls inside a side's slice.
                let mut remaining = pick;
                for side in sides {
                    let weight =
                        u64::from(st.route_weights.get(&(src, side)).copied().unwrap_or(1));
                    if remaining < weight {
                        return vec![side];
                    }
                    remaining -= weight;
                }
                Vec::new()
            }
            RouteSelectionMode::Failover => {
                // Lowest priority value wins; side id breaks ties.
                sides.sort_by_key(|side| {
                    (
                        st.route_priorities.get(&(src, *side)).copied().unwrap_or(0),
                        *side,
                    )
                });
                sides.truncate(1);
                sides
            }
        }
    }
1501
    /// Pick a single side for a discovered route using measured link statistics.
    ///
    /// 1. Sides with no `adaptive_route_stats` entry are tried first. One of them is chosen by
    ///    rotating the per-source cursor, presumably so every link eventually gets measured.
    /// 2. Otherwise each side is weighted by `stats.weight(now_ms)`, and the per-source cursor
    ///    selects a side over the cumulative weights, then advances.
    /// 3. If every weight is zero, the lowest side id is returned.
    fn apply_adaptive_discovery_selection_locked(
        &self,
        st: &mut RelayInner,
        src: Option<RelaySideId>,
        mut sides: Vec<RelaySideId>,
    ) -> Vec<RelaySideId> {
        sides.sort_unstable();
        let mut unmeasured: Vec<_> = sides
            .iter()
            .copied()
            .filter(|side| !st.adaptive_route_stats.contains_key(side))
            .collect();
        if !unmeasured.is_empty() {
            let cursor = st.route_selection_cursors.entry(src).or_insert(0);
            let pick = (*cursor as usize) % unmeasured.len();
            *cursor = cursor.wrapping_add(1);
            return vec![unmeasured.swap_remove(pick)];
        }

        let now_ms = self.clock.now_ms();
        let total_weight = sides.iter().fold(0_u64, |acc, side| {
            acc + st
                .adaptive_route_stats
                .get(side)
                .map(|stats| stats.weight(now_ms))
                .unwrap_or(1)
        });
        if total_weight == 0 {
            // No side has usable weight: fall back to the lowest side id.
            sides.truncate(1);
            return sides;
        }

        let cursor = st.route_selection_cursors.entry(src).or_insert(0);
        let pick = *cursor % total_weight;
        *cursor = cursor.wrapping_add(1);
        // Walk the cumulative weights until `pick` falls inside a side's slice.
        let mut remaining = pick;
        for side in sides {
            let weight = st
                .adaptive_route_stats
                .get(&side)
                .map(|stats| stats.weight(now_ms))
                .unwrap_or(1);
            if remaining < weight {
                return vec![side];
            }
            remaining -= weight;
        }
        Vec::new()
    }
1551
1552    fn record_side_tx_sample(
1553        &self,
1554        side: RelaySideId,
1555        bytes: usize,
1556        started_ms: u64,
1557        ended_ms: u64,
1558    ) {
1559        let sample_ms = ended_ms.saturating_sub(started_ms).max(1);
1560        let sample_bps = ((bytes as u128).saturating_mul(1000) / u128::from(sample_ms))
1561            .min(u128::from(u64::MAX)) as u64;
1562        let mut st = self.state.lock();
1563        st.adaptive_route_stats
1564            .entry(side)
1565            .or_default()
1566            .observe(bytes, sample_bps, ended_ms);
1567    }
1568
1569    /// Seed adaptive route selection with a transport-measured link probe.
1570    ///
1571    /// Call this after a side-specific bring-up probe, or whenever the transport already knows the
1572    /// duration for a frame. The relay does not emit synthetic probe frames by itself.
1573    pub fn note_side_link_probe_sample(
1574        &self,
1575        side: RelaySideId,
1576        bytes: usize,
1577        duration_ms: u64,
1578    ) -> TelemetryResult<()> {
1579        {
1580            let st = self.state.lock();
1581            let _ = Self::side_ref(&st, side).map_err(|_| TelemetryError::BadArg)?;
1582        }
1583        let ended_ms = self.clock.now_ms();
1584        self.record_side_tx_sample(side, bytes, ended_ms.saturating_sub(duration_ms), ended_ms);
1585        Ok(())
1586    }
1587
1588    fn relay_item_wire_len(data: &RelayItem) -> TelemetryResult<usize> {
1589        match data {
1590            RelayItem::Packet(pkt) => Ok(wire_format::pack_packet(pkt).len()),
1591            RelayItem::Packed(bytes) => Ok(bytes.len()),
1592        }
1593    }
1594
1595    #[inline]
1596    fn decode_end_to_end_reliable_ack(payload: &[u8]) -> TelemetryResult<u64> {
1597        if payload.len() != 8 {
1598            return Err(TelemetryError::Unpack("bad reliable e2e ack payload"));
1599        }
1600        Ok(u64::from_le_bytes(payload[0..8].try_into().unwrap()))
1601    }
1602
1603    #[inline]
1604    fn is_end_to_end_ack_sender(sender: &str) -> bool {
1605        sender == Self::END_TO_END_ACK_SENDER || sender.starts_with(Self::END_TO_END_ACK_PREFIX)
1606    }
1607
1608    #[inline]
1609    fn sender_hash(sender: &str) -> u64 {
1610        hash_bytes_u64(0x517C_C1B7_2722_0A95, sender.as_bytes())
1611    }
1612
1613    fn decode_end_to_end_ack_sender_hash(sender: &str) -> Option<u64> {
1614        sender
1615            .strip_prefix(Self::END_TO_END_ACK_PREFIX)
1616            .filter(|sender| !sender.is_empty())
1617            .map(Self::sender_hash)
1618    }
1619
1620    #[cfg(feature = "discovery")]
1621    fn is_end_to_end_destination_sender(&self, sender: &str) -> bool {
1622        sender != self.sender_arc().as_ref() && !Self::is_end_to_end_ack_sender(sender)
1623    }
1624
1625    /// Extract the logical packet ID targeted by an end-to-end reliable ACK item.
1626    ///
1627    /// Relay queues can hold either decoded packets or packed frames. This
1628    /// helper normalizes both forms so relay ACK-routing logic can treat them
1629    /// uniformly.
1630    ///
1631    /// Only relay-visible end-to-end `ReliableAck` packets qualify here.
1632    /// Unrelated traffic returns `Ok(None)`.
1633    fn reliable_control_target_packet_id(data: &RelayItem) -> TelemetryResult<Option<u64>> {
1634        match data {
1635            RelayItem::Packet(pkt) => {
1636                if pkt.data_type() != crate::DataType::ReliableAck
1637                    || !Self::is_end_to_end_ack_sender(pkt.sender())
1638                {
1639                    return Ok(None);
1640                }
1641                Self::decode_end_to_end_reliable_ack(pkt.payload()).map(Some)
1642            }
1643            RelayItem::Packed(bytes) => {
1644                if wire_format::peek_frame_info(bytes.as_ref())
1645                    .ok()
1646                    .is_some_and(|frame| frame.ack_only())
1647                {
1648                    return Ok(None);
1649                }
1650                let pkt = wire_format::unpack_packet(bytes.as_ref())?;
1651                if pkt.data_type() != crate::DataType::ReliableAck
1652                    || !Self::is_end_to_end_ack_sender(pkt.sender())
1653                {
1654                    return Ok(None);
1655                }
1656                Self::decode_end_to_end_reliable_ack(pkt.payload()).map(Some)
1657            }
1658        }
1659    }
1660
1661    fn note_reliable_return_route(&self, side: RelaySideId, packet_id: u64) {
1662        let mut st = self.state.lock();
1663        Self::remember_reliable_return_route_locked(&mut st, packet_id);
1664        st.reliable_return_routes
1665            .insert(packet_id, ReliableReturnRouteState { side });
1666    }
1667
1668    /// Refresh or insert `packet_id` in the bounded reliable return-route cache.
1669    ///
1670    /// The relay uses this cache to route end-to-end acknowledgements back
1671    /// toward the source side that most recently forwarded the corresponding
1672    /// reliable data packet.
1673    fn remember_reliable_return_route_locked(st: &mut RelayInner, packet_id: u64) {
1674        let cap = runtime_reliable_max_return_routes().max(1);
1675        st.reliable_return_route_order
1676            .retain(|id| st.reliable_return_routes.contains_key(id) && *id != packet_id);
1677        while st.reliable_return_route_order.len() >= cap {
1678            if let Some(oldest) = st.reliable_return_route_order.pop_front() {
1679                st.reliable_return_routes.remove(&oldest);
1680            } else {
1681                break;
1682            }
1683        }
1684        st.reliable_return_route_order.push_back(packet_id);
1685    }
1686
1687    fn note_end_to_end_acked_destination_locked(
1688        st: &mut RelayInner,
1689        packet_id: u64,
1690        sender_hash: u64,
1691    ) {
1692        let entry_cap = runtime_reliable_max_end_to_end_ack_cache().max(1);
1693        st.end_to_end_acked_destination_order
1694            .retain(|id| st.end_to_end_acked_destinations.contains_key(id) && *id != packet_id);
1695        while st.end_to_end_acked_destination_order.len() >= entry_cap {
1696            if let Some(oldest) = st.end_to_end_acked_destination_order.pop_front() {
1697                st.end_to_end_acked_destinations.remove(&oldest);
1698            } else {
1699                break;
1700            }
1701        }
1702        st.end_to_end_acked_destination_order.push_back(packet_id);
1703
1704        let acked = st
1705            .end_to_end_acked_destinations
1706            .entry(packet_id)
1707            .or_default();
1708        let sender_cap = runtime_reliable_max_end_to_end_pending().max(1);
1709        if acked.len() < sender_cap || acked.contains(&sender_hash) {
1710            acked.insert(sender_hash);
1711        }
1712    }
1713
1714    #[inline]
1715    fn reliable_key(side: RelaySideId, ty: crate::DataType) -> (RelaySideId, u32) {
1716        (side, ty.as_u32())
1717    }
1718
1719    fn reliable_tx_state_mut<'a>(
1720        &'a self,
1721        st: &'a mut RelayInner,
1722        side: RelaySideId,
1723        ty: crate::DataType,
1724    ) -> &'a mut ReliableTxState {
1725        let key = Self::reliable_key(side, ty);
1726        st.reliable_tx
1727            .entry(key)
1728            .or_insert_with(|| ReliableTxState {
1729                next_seq: 1,
1730                sent_order: VecDeque::new(),
1731                sent: BTreeMap::new(),
1732            })
1733    }
1734
1735    fn reliable_rx_state_mut<'a>(
1736        &'a self,
1737        st: &'a mut RelayInner,
1738        side: RelaySideId,
1739        ty: crate::DataType,
1740    ) -> &'a mut ReliableRxState {
1741        let key = Self::reliable_key(side, ty);
1742        st.reliable_rx
1743            .entry(key)
1744            .or_insert_with(|| ReliableRxState {
1745                expected_seq: 1,
1746                buffered: BTreeMap::new(),
1747            })
1748    }
1749
1750    fn handle_reliable_ack(&self, side: RelaySideId, ty: crate::DataType, ack: u32) {
1751        let mut st = self.state.lock();
1752        let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1753        if matches!(reliable_mode(ty), crate::ReliableMode::Unordered) {
1754            tx_state.sent.remove(&ack);
1755            tx_state.sent_order.retain(|seq| *seq != ack);
1756            return;
1757        }
1758
1759        while let Some(seq) = tx_state.sent_order.front().copied() {
1760            if seq > ack {
1761                break;
1762            }
1763            tx_state.sent_order.pop_front();
1764            tx_state.sent.remove(&seq);
1765        }
1766    }
1767
1768    fn handle_reliable_partial_ack(&self, side: RelaySideId, ty: crate::DataType, seq: u32) {
1769        let mut st = self.state.lock();
1770        let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1771        if let Some(sent) = tx_state.sent.get_mut(&seq) {
1772            sent.partial_acked = true;
1773        }
1774    }
1775
1776    fn reliable_control_packet(
1777        &self,
1778        control_ty: crate::DataType,
1779        ty: crate::DataType,
1780        seq: u32,
1781    ) -> TelemetryResult<Packet> {
1782        let sender = self.sender_arc();
1783        Packet::new(
1784            control_ty,
1785            message_meta(control_ty).endpoints,
1786            sender.as_ref(),
1787            self.clock.now_ms(),
1788            crate::router::encode_slice_le(&[ty.as_u32(), seq]),
1789        )
1790    }
1791
1792    fn queue_reliable_ack(
1793        &self,
1794        side: RelaySideId,
1795        ty: crate::DataType,
1796        seq: u32,
1797    ) -> TelemetryResult<()> {
1798        let pkt = self.reliable_control_packet(crate::DataType::ReliableAck, ty, seq)?;
1799        let data = RelayItem::Packet(Arc::new(pkt));
1800        let priority = Self::relay_item_priority(&data)?;
1801        let mut st = self.state.lock();
1802        st.push_tx(RelayTxItem {
1803            src: None,
1804            dst: side,
1805            data,
1806            priority,
1807        })?;
1808        Ok(())
1809    }
1810
1811    fn queue_reliable_packet_request(
1812        &self,
1813        side: RelaySideId,
1814        ty: crate::DataType,
1815        seq: u32,
1816    ) -> TelemetryResult<()> {
1817        let pkt = self.reliable_control_packet(crate::DataType::ReliablePacketRequest, ty, seq)?;
1818        let data = RelayItem::Packet(Arc::new(pkt));
1819        let priority = Self::relay_item_priority(&data)?;
1820        let mut st = self.state.lock();
1821        st.push_tx(RelayTxItem {
1822            src: None,
1823            dst: side,
1824            data,
1825            priority,
1826        })?;
1827        Ok(())
1828    }
1829
1830    fn queue_reliable_partial_ack(
1831        &self,
1832        side: RelaySideId,
1833        ty: crate::DataType,
1834        seq: u32,
1835    ) -> TelemetryResult<()> {
1836        let pkt = self.reliable_control_packet(crate::DataType::ReliablePartialAck, ty, seq)?;
1837        let data = RelayItem::Packet(Arc::new(pkt));
1838        let priority = Self::relay_item_priority(&data)?;
1839        let mut st = self.state.lock();
1840        st.push_tx(RelayTxItem {
1841            src: None,
1842            dst: side,
1843            data,
1844            priority,
1845        })?;
1846        Ok(())
1847    }
1848
1849    fn queue_reliable_retransmit(
1850        &self,
1851        side: RelaySideId,
1852        ty: crate::DataType,
1853        seq: u32,
1854    ) -> TelemetryResult<()> {
1855        let mut queued = None;
1856        {
1857            let mut st = self.state.lock();
1858            let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1859            if let Some(sent) = tx_state.sent.get_mut(&seq)
1860                && !sent.queued
1861            {
1862                sent.queued = true;
1863                sent.partial_acked = false;
1864                queued = Some(sent.bytes.clone());
1865            }
1866        }
1867
1868        if let Some(bytes) = queued {
1869            let mut st = self.state.lock();
1870            st.push_replay(RelayReplayItem {
1871                dst: side,
1872                bytes,
1873                priority: message_priority(ty).saturating_add(16),
1874            })?;
1875        }
1876        Ok(())
1877    }
1878
1879    fn send_reliable_raw_to_side(
1880        &self,
1881        side: RelaySideId,
1882        bytes: Arc<[u8]>,
1883    ) -> TelemetryResult<()> {
1884        let (handler, opts) = {
1885            let st = self.state.lock();
1886            let side_ref = Self::side_ref(&st, side)?;
1887            if !side_ref.opts.egress_enabled {
1888                return Ok(());
1889            }
1890            (side_ref.tx_handler.clone(), side_ref.opts)
1891        };
1892
1893        let Some(_side_tx_guard) = self.try_enter_side_tx() else {
1894            return Err(TelemetryError::Io("side tx busy"));
1895        };
1896        let started_ms = self.clock.now_ms();
1897        let ty = wire_format::peek_envelope(bytes.as_ref())
1898            .map(|env| env.ty)
1899            .unwrap_or(crate::DataType::ReliableAck);
1900        let result = match handler {
1901            RelayTxHandlerFn::Packed(f) => {
1902                let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
1903                let mut sent_bytes = 0usize;
1904                for frame in frames {
1905                    f(frame.as_ref())?;
1906                    sent_bytes = sent_bytes.saturating_add(frame.len());
1907                }
1908                self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
1909                self.note_side_tx_success(side, ty, sent_bytes, 1);
1910                return Ok(());
1911            }
1912            RelayTxHandlerFn::Packet(f) => {
1913                if wire_format::peek_frame_info(bytes.as_ref())
1914                    .ok()
1915                    .is_some_and(|frame| frame.ack_only())
1916                {
1917                    return Ok(());
1918                }
1919                let pkt = wire_format::unpack_packet(bytes.as_ref())?;
1920                f(&pkt)
1921            }
1922        };
1923        if result.is_ok() {
1924            self.record_side_tx_sample(side, bytes.len(), started_ms, self.clock.now_ms());
1925            self.note_side_tx_success(side, ty, bytes.len(), 1);
1926        } else {
1927            self.note_side_tx_failure(side, ty, 1);
1928        }
1929        result
1930    }
1931
    /// Transmit `data` to `side`, adding hop-by-hop reliable sequencing when the
    /// side's handler and options allow it.
    ///
    /// Flow:
    /// 1. Non-packed tx handlers are delegated to `call_tx_handler` unchanged.
    /// 2. Hop reliability is off when the side's `reliable_enabled` option is
    ///    false, or when `side_has_multiple_announcers_locked` reports more than
    ///    one live announcer on the side. In that case the item goes through
    ///    `adjust_reliable_for_side` with `reliable_enabled = false`, and is sent
    ///    only if that yields an item.
    /// 3. Packed bytes whose frame info cannot be peeked are sent as-is.
    ///    Non-reliable data types go through `adjust_reliable_for_side` with the
    ///    side's own options.
    /// 4. Otherwise the next per-(side, type) sequence number is allocated
    ///    (skipping 0 on wrap) and the reliable header is written into the wire
    ///    bytes. The bytes are then framed and sent, and on success they are
    ///    stored in the retransmit history.
    ///    If `rewrite_reliable_header_owned` returns `None`, the original bytes
    ///    are sent instead and are NOT added to the history.
    ///
    /// # Errors
    /// * `TelemetryError::PacketTooLarge("relay reliable history full")` when
    ///   the history already holds `runtime_reliable_max_pending()` entries.
    /// * `TelemetryError::Io("side tx busy")` when the side-tx reentry gate is
    ///   already held.
    /// * Any error from the side lookup, item adjustment, header rewrite, frame
    ///   encoding, or the tx handler.
    fn send_reliable_to_side(&self, side: RelaySideId, data: RelayItem) -> TelemetryResult<()> {
        let (handler, opts, hop_reliable_enabled) = {
            let st = self.state.lock();
            let side_ref = Self::side_ref(&st, side)?;
            let opts = side_ref.opts;
            // Hop-level reliability is disabled when several announcers share the side.
            let hop_reliable_enabled = opts.reliable_enabled
                && !self.side_has_multiple_announcers_locked(&st, side, self.clock.now_ms());
            (side_ref.tx_handler.clone(), opts, hop_reliable_enabled)
        };

        let RelayTxHandlerFn::Packed(f) = &handler else {
            return self.call_tx_handler(side, &handler, &data);
        };

        if !hop_reliable_enabled {
            let mut adjusted_opts = opts;
            adjusted_opts.reliable_enabled = false;
            if let Some(adjusted) = self.adjust_reliable_for_side(adjusted_opts, data)? {
                return self.call_tx_handler(side, &handler, &adjusted);
            }
            return Ok(());
        }

        let ty = match &data {
            RelayItem::Packet(pkt) => pkt.data_type(),
            RelayItem::Packed(bytes) => {
                // Unparseable frame info: forward untouched rather than sequencing it.
                let Ok(frame) = wire_format::peek_frame_info(bytes.as_ref()) else {
                    return self.call_tx_handler(side, &handler, &data);
                };
                frame.envelope.ty
            }
        };

        if !is_reliable_type(ty) {
            if let Some(adjusted) = self.adjust_reliable_for_side(opts, data)? {
                self.call_tx_handler(side, &handler, &adjusted)?;
            }
            return Ok(());
        }

        // NOTE(review): the sequence number is consumed here, before transmission.
        // If anything below fails (tx busy, encode or handler error), or the
        // header rewrite returns `None`, this seq never reaches `sent`. That leaves
        // a gap the peer may request but this relay cannot replay. Confirm that
        // the receiver tolerates such gaps.
        let (seq, flags) = {
            let mut st = self.state.lock();
            let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
            if tx_state.sent.len() >= runtime_reliable_max_pending() {
                return Err(TelemetryError::PacketTooLarge(
                    "relay reliable history full",
                ));
            }
            let seq = tx_state.next_seq;
            let next = tx_state.next_seq.wrapping_add(1);
            // 0 is never used as a sequence number.
            tx_state.next_seq = if next == 0 { 1 } else { next };
            let flags = match reliable_mode(ty) {
                crate::ReliableMode::Unordered => wire_format::RELIABLE_FLAG_UNORDERED,
                _ => 0,
            };
            (seq, flags)
        };

        let bytes: Arc<[u8]> = match data {
            RelayItem::Packet(pkt) => wire_format::pack_packet_with_reliable(
                &pkt,
                wire_format::ReliableHeader { flags, seq, ack: 0 },
            ),
            RelayItem::Packed(bytes) => {
                let Some(rewritten) =
                    wire_format::rewrite_reliable_header_owned(bytes.as_ref(), flags, seq, 0)?
                else {
                    // No reliable header could be written: send the original bytes
                    // without recording them in the retransmit history.
                    let Some(_side_tx_guard) = self.try_enter_side_tx() else {
                        return Err(TelemetryError::Io("side tx busy"));
                    };
                    let started_ms = self.clock.now_ms();
                    let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
                    let mut sent_bytes = 0usize;
                    for frame in frames {
                        f(frame.as_ref())?;
                        sent_bytes = sent_bytes.saturating_add(frame.len());
                    }
                    self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
                    self.note_side_tx_success(side, ty, sent_bytes, 1);
                    return Ok(());
                };
                rewritten
            }
        };

        let Some(_side_tx_guard) = self.try_enter_side_tx() else {
            return Err(TelemetryError::Io("side tx busy"));
        };
        let started_ms = self.clock.now_ms();
        let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
        let mut sent_bytes = 0usize;
        // NOTE(review): handler errors return via `?` without `note_side_tx_failure`.
        for frame in frames {
            f(frame.as_ref())?;
            sent_bytes = sent_bytes.saturating_add(frame.len());
        }
        self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
        self.note_side_tx_success(side, ty, sent_bytes, 1);

        // Only successfully sent sequenced bytes enter the retransmit history.
        {
            let mut st = self.state.lock();
            let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
            tx_state.sent_order.push_back(seq);
            tx_state.sent.insert(
                seq,
                ReliableSent {
                    bytes: bytes.clone(),
                    last_send_ms: self.clock.now_ms(),
                    retries: 0,
                    queued: false,
                    partial_acked: false,
                },
            );
        }

        Ok(())
    }
2048
2049    fn item_route_info(
2050        &self,
2051        data: &RelayItem,
2052    ) -> TelemetryResult<(Vec<crate::DataEndpoint>, crate::DataType)> {
2053        match data {
2054            RelayItem::Packet(pkt) => {
2055                let mut eps = pkt.endpoints().to_vec();
2056                eps.sort_unstable();
2057                eps.dedup();
2058                Ok((eps, pkt.data_type()))
2059            }
2060            RelayItem::Packed(bytes) => {
2061                let env = wire_format::peek_envelope(bytes.as_ref())?;
2062                let mut eps: Vec<crate::DataEndpoint> = env.endpoints.iter().copied().collect();
2063                eps.sort_unstable();
2064                eps.dedup();
2065                Ok((eps, env.ty))
2066            }
2067        }
2068    }
2069
2070    fn endpoints_are_link_local_only(eps: &[crate::DataEndpoint]) -> bool {
2071        !eps.is_empty() && eps.iter().all(|ep| ep.is_link_local_only())
2072    }
2073
2074    fn item_target_senders(&self, data: &RelayItem) -> TelemetryResult<Arc<[u64]>> {
2075        match data {
2076            RelayItem::Packet(pkt) => Ok(Arc::from(pkt.wire_target_senders())),
2077            RelayItem::Packed(bytes) => {
2078                Ok(wire_format::peek_envelope(bytes.as_ref())?.target_senders)
2079            }
2080        }
2081    }
2082
2083    #[cfg(feature = "discovery")]
2084    fn has_explicit_route_policy_locked(
2085        st: &RelayInner,
2086        src: Option<RelaySideId>,
2087        ty: crate::DataType,
2088    ) -> bool {
2089        st.route_overrides
2090            .keys()
2091            .any(|(route_src, _)| *route_src == src)
2092            || Self::has_typed_route_overrides_locked(st, src, ty)
2093    }
2094
2095    #[cfg(feature = "discovery")]
2096    fn side_matches_target_senders_locked(
2097        st: &RelayInner,
2098        side: RelaySideId,
2099        target_senders: &[u64],
2100        now_ms: u64,
2101    ) -> bool {
2102        st.discovery_routes
2103            .get(&side)
2104            .map(|route| {
2105                if now_ms.saturating_sub(route.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2106                    return false;
2107                }
2108                route.announcers.values().any(|sender_state| {
2109                    if now_ms.saturating_sub(sender_state.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2110                        return false;
2111                    }
2112                    sender_state
2113                        .topology_boards
2114                        .iter()
2115                        .any(|board| target_senders.contains(&Self::sender_hash(&board.sender_id)))
2116                })
2117            })
2118            .unwrap_or(false)
2119    }
2120
    /// Decide which sides other than the ingress side `exclude` should receive
    /// `data`.
    ///
    /// With the `discovery` feature, targets are chosen in this order:
    /// 1. Discovery traffic: all eligible sides are offered to route selection
    ///    as `RouteSelectionOrigin::Flood`.
    /// 2. Reliable control frames naming a target packet id go only to the
    ///    remembered return route for that packet, if that route is still
    ///    allowed (and, with `timesync`, passes the time-sync side check).
    ///    Otherwise they go nowhere.
    /// 3. No discovery routes learned yet: the item goes to the single eligible
    ///    side when there is exactly one, otherwise nowhere.
    /// 4. Otherwise each fresh, allowed discovery route (link-local capable
    ///    when all endpoints are link-local) is classified:
    ///    * with explicit target senders, only sides advertising one of them
    ///      are generic targets;
    ///    * else sides reaching the preferred time-sync source are "exact"
    ///      targets;
    ///    * else sides reaching any of the item's endpoints are generic targets.
    ///
    ///    Exact targets take precedence over generic ones. Both sets are
    ///    filtered for already-satisfied end-to-end destinations before route
    ///    selection. Reliable types use the `Flood` origin here instead of
    ///    `Discovered`.
    /// 5. Nothing matched: all eligible sides are offered as `Flood` only when an
    ///    explicit route policy exists for this ingress side/type. Otherwise the
    ///    target list is empty.
    ///
    /// Without `discovery`, control frames follow the return route as in step 2,
    /// and everything else is offered to route selection as `Flood`.
    fn remote_side_plan(
        &self,
        data: &RelayItem,
        exclude: RelaySideId,
    ) -> TelemetryResult<RemoteSidePlan> {
        #[cfg(feature = "discovery")]
        {
            let (eps, ty) = self.item_route_info(data)?;
            let target_senders = self.item_target_senders(data)?;
            let preferred_packet_id = Self::reliable_control_target_packet_id(data)?;
            // Step 1: discovery frames are flooded.
            if discovery::is_discovery_type(ty) {
                let mut st = self.state.lock();
                let sides = self.eligible_side_ids_locked(&st, Some(exclude), Some(ty), false);
                return Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
                    &mut st,
                    Some(exclude),
                    sides,
                    RouteSelectionOrigin::Flood,
                )));
            }

            #[cfg(feature = "timesync")]
            let preferred_timesync_source = self.preferred_timesync_route_source(data, ty)?;
            #[cfg(not(feature = "timesync"))]
            let preferred_timesync_source: Option<String> = None;
            let mut st = self.state.lock();
            // Step 2: control frames for a specific packet follow its return route only.
            if let Some(packet_id) = preferred_packet_id {
                let target_side = self.allowed_target_side_locked(
                    &st,
                    exclude,
                    ty,
                    st.reliable_return_routes
                        .get(&packet_id)
                        .map(|route| route.side),
                );
                if let Some(side) = target_side {
                    #[cfg(feature = "timesync")]
                    if !Self::timesync_allowed_for_side_locked(
                        &mut st,
                        side,
                        ty,
                        self.clock.now_ms(),
                    ) {
                        return Ok(RemoteSidePlan::Target(Vec::new()));
                    }
                    return Ok(RemoteSidePlan::Target(vec![side]));
                }
                return Ok(RemoteSidePlan::Target(Vec::new()));
            }
            let restrict_link_local = Self::endpoints_are_link_local_only(&eps);
            let discovered_origin = if is_reliable_type(ty) {
                RouteSelectionOrigin::Flood
            } else {
                RouteSelectionOrigin::Discovered
            };
            // Step 3: nothing learned yet; only an unambiguous single side is used.
            if st.discovery_routes.is_empty() {
                let mut fallback = self.eligible_side_ids_locked(
                    &st,
                    Some(exclude),
                    Some(ty),
                    restrict_link_local,
                );
                #[cfg(feature = "timesync")]
                {
                    fallback = Self::filter_timesync_sides_locked(
                        &mut st,
                        ty,
                        self.clock.now_ms(),
                        fallback,
                    );
                }
                return Ok(RemoteSidePlan::Target(if fallback.len() == 1 {
                    fallback
                } else {
                    Vec::new()
                }));
            }
            let now_ms = self.clock.now_ms();
            let mut had_exact = false;
            let mut exact_targets = Vec::new();
            let mut had_known = false;
            let mut generic_targets = Vec::new();

            // Step 4: classify each fresh, permitted discovery route.
            for (&side, route) in st.discovery_routes.iter() {
                if side == exclude
                    || now_ms.saturating_sub(route.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS
                {
                    continue;
                }
                if restrict_link_local
                    && st
                        .sides
                        .get(side)
                        .and_then(|side| side.as_ref())
                        .map(|s| !s.opts.link_local_enabled)
                        .unwrap_or(true)
                {
                    continue;
                }
                if !self.route_allowed_locked(&st, Some(exclude), Some(ty), side) {
                    continue;
                }
                // Explicit target senders override endpoint/time-sync matching.
                if !target_senders.is_empty() {
                    if !Self::side_matches_target_senders_locked(&st, side, &target_senders, now_ms)
                    {
                        continue;
                    }
                    had_known = true;
                    generic_targets.push(side);
                    continue;
                }
                if preferred_timesync_source.as_deref().is_some_and(|source| {
                    route.reachable_timesync_sources.iter().any(|s| s == source)
                }) {
                    had_exact = true;
                    exact_targets.push(side);
                    continue;
                }
                if eps.iter().copied().any(|ep| route.reachable.contains(&ep)) {
                    had_known = true;
                    generic_targets.push(side);
                }
            }

            if had_exact {
                #[cfg(feature = "timesync")]
                {
                    exact_targets = Self::filter_timesync_sides_locked(
                        &mut st,
                        ty,
                        self.clock.now_ms(),
                        exact_targets,
                    );
                }
                let targets = self.filter_end_to_end_satisfied_sides_locked(
                    &st,
                    data,
                    exact_targets,
                    &eps,
                    ty,
                )?;
                Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
                    &mut st,
                    Some(exclude),
                    targets,
                    discovered_origin,
                )))
            } else if had_known {
                #[cfg(feature = "timesync")]
                {
                    generic_targets = Self::filter_timesync_sides_locked(
                        &mut st,
                        ty,
                        self.clock.now_ms(),
                        generic_targets,
                    );
                }
                let targets = self.filter_end_to_end_satisfied_sides_locked(
                    &st,
                    data,
                    generic_targets,
                    &eps,
                    ty,
                )?;
                Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
                    &mut st,
                    Some(exclude),
                    targets,
                    discovered_origin,
                )))
            } else {
                // Step 5: no route knows the destination; flood only under explicit policy.
                if Self::has_explicit_route_policy_locked(&st, Some(exclude), ty) {
                    let mut sides = self.eligible_side_ids_locked(
                        &st,
                        Some(exclude),
                        Some(ty),
                        restrict_link_local,
                    );
                    #[cfg(feature = "timesync")]
                    {
                        sides = Self::filter_timesync_sides_locked(
                            &mut st,
                            ty,
                            self.clock.now_ms(),
                            sides,
                        );
                    }
                    Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
                        &mut st,
                        Some(exclude),
                        sides,
                        RouteSelectionOrigin::Flood,
                    )))
                } else {
                    Ok(RemoteSidePlan::Target(Vec::new()))
                }
            }
        }
        #[cfg(not(feature = "discovery"))]
        {
            let (_, ty) = self.item_route_info(data)?;
            let mut st = self.state.lock();
            // Control frames for a specific packet follow its return route only.
            if let Some(packet_id) = Self::reliable_control_target_packet_id(data)? {
                let target_side = self.allowed_target_side_locked(
                    &st,
                    exclude,
                    ty,
                    st.reliable_return_routes
                        .get(&packet_id)
                        .map(|route| route.side),
                );
                if let Some(side) = target_side {
                    return Ok(RemoteSidePlan::Target(vec![side]));
                }
                return Ok(RemoteSidePlan::Target(Vec::new()));
            }
            let sides = self.eligible_side_ids_locked(&st, Some(exclude), Some(ty), false);
            Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
                &mut st,
                Some(exclude),
                sides,
                RouteSelectionOrigin::Flood,
            )))
        }
    }
2346
2347    #[inline]
2348    fn allowed_target_side_locked(
2349        &self,
2350        st: &RelayInner,
2351        exclude: RelaySideId,
2352        ty: crate::DataType,
2353        target_side: Option<RelaySideId>,
2354    ) -> Option<RelaySideId> {
2355        target_side.filter(|side| self.route_allowed_locked(st, Some(exclude), Some(ty), *side))
2356    }
2357
2358    fn filter_end_to_end_satisfied_sides_locked(
2359        &self,
2360        st: &RelayInner,
2361        data: &RelayItem,
2362        sides: Vec<RelaySideId>,
2363        eps: &[crate::DataEndpoint],
2364        ty: crate::DataType,
2365    ) -> TelemetryResult<Vec<RelaySideId>> {
2366        if !is_reliable_type(ty) || Self::reliable_control_target_packet_id(data)?.is_some() {
2367            return Ok(sides);
2368        }
2369        let packet_id = match data {
2370            RelayItem::Packet(pkt) => pkt.packet_id(),
2371            RelayItem::Packed(bytes) => match wire_format::packet_id_from_wire(bytes.as_ref()) {
2372                Ok(packet_id) => packet_id,
2373                Err(TelemetryError::Unpack("reliable control frame")) => return Ok(sides),
2374                Err(err) => return Err(err),
2375            },
2376        };
2377        let Some(acked) = st.end_to_end_acked_destinations.get(&packet_id) else {
2378            return Ok(sides);
2379        };
2380        let now_ms = self.clock.now_ms();
2381        let mut filtered = Vec::new();
2382        for side in sides {
2383            let Some(route) = st.discovery_routes.get(&side) else {
2384                filtered.push(side);
2385                continue;
2386            };
2387            let mut still_pending = false;
2388            let mut had_destination_board = false;
2389            for sender_state in route.announcers.values() {
2390                if now_ms.saturating_sub(sender_state.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2391                    continue;
2392                }
2393                for board in sender_state.topology_boards.iter() {
2394                    if !self.is_end_to_end_destination_sender(&board.sender_id) {
2395                        continue;
2396                    }
2397                    had_destination_board = true;
2398                    let sender_hash = Self::sender_hash(&board.sender_id);
2399                    if acked.contains(&sender_hash) {
2400                        continue;
2401                    }
2402                    if eps
2403                        .iter()
2404                        .copied()
2405                        .any(|ep| board.reachable_endpoints.contains(&ep))
2406                    {
2407                        still_pending = true;
2408                        break;
2409                    }
2410                    // Keep forwarding while any discovered destination sender for this packet
2411                    // remains unacked, even if topology/schema metadata changed for new packets.
2412                    still_pending = true;
2413                    break;
2414                }
2415                if still_pending {
2416                    break;
2417                }
2418            }
2419            if still_pending || !had_destination_board {
2420                filtered.push(side);
2421            }
2422        }
2423        Ok(filtered)
2424    }
2425
2426    #[cfg(feature = "discovery")]
2427    fn side_has_multiple_announcers_locked(
2428        &self,
2429        st: &RelayInner,
2430        side: RelaySideId,
2431        now_ms: u64,
2432    ) -> bool {
2433        st.discovery_routes
2434            .get(&side)
2435            .map(|route| {
2436                route
2437                    .announcers
2438                    .values()
2439                    .filter(|sender| {
2440                        now_ms.saturating_sub(sender.last_seen_ms) <= DISCOVERY_ROUTE_TTL_MS
2441                    })
2442                    .take(2)
2443                    .count()
2444                    > 1
2445            })
2446            .unwrap_or(false)
2447    }
2448
2449    #[cfg(not(feature = "discovery"))]
2450    fn side_has_multiple_announcers_locked(
2451        &self,
2452        _st: &RelayInner,
2453        _side: RelaySideId,
2454        _now_ms: u64,
2455    ) -> bool {
2456        false
2457    }
2458
2459    #[cfg(feature = "discovery")]
2460    fn sender_topology_board_mut<'a>(
2461        sender_state: &'a mut DiscoverySenderState,
2462        sender_id: &str,
2463    ) -> &'a mut TopologyBoardNode {
2464        if let Some(idx) = sender_state
2465            .topology_boards
2466            .iter()
2467            .position(|board| board.sender_id == sender_id)
2468        {
2469            return &mut sender_state.topology_boards[idx];
2470        }
2471        sender_state.topology_boards.push(TopologyBoardNode {
2472            sender_id: sender_id.to_string(),
2473            reachable_endpoints: Vec::new(),
2474            reachable_timesync_sources: Vec::new(),
2475            connections: Vec::new(),
2476        });
2477        sender_state
2478            .topology_boards
2479            .last_mut()
2480            .expect("board inserted above")
2481    }
2482
2483    #[cfg(feature = "discovery")]
2484    fn refresh_sender_topology_state(sender_state: &mut DiscoverySenderState) {
2485        discovery::normalize_topology_boards(&mut sender_state.topology_boards);
2486        let (reachable, reachable_timesync_sources) =
2487            discovery::summarize_topology_boards(&sender_state.topology_boards);
2488        sender_state.reachable = reachable;
2489        sender_state.reachable_timesync_sources = reachable_timesync_sources;
2490    }
2491
2492    #[cfg(feature = "discovery")]
2493    fn recompute_discovery_side_state(route: &mut DiscoverySideState) {
2494        let mut reachable = Vec::new();
2495        let mut reachable_timesync_sources = Vec::new();
2496        let mut last_seen_ms = 0u64;
2497        for sender in route.announcers.values() {
2498            reachable.extend(sender.reachable.iter().copied());
2499            reachable_timesync_sources.extend(sender.reachable_timesync_sources.iter().cloned());
2500            last_seen_ms = last_seen_ms.max(sender.last_seen_ms);
2501        }
2502        reachable.sort_unstable();
2503        reachable.dedup();
2504        reachable_timesync_sources.sort_unstable();
2505        reachable_timesync_sources.dedup();
2506        route.reachable = reachable;
2507        route.reachable_timesync_sources = reachable_timesync_sources;
2508        route.last_seen_ms = last_seen_ms;
2509    }
2510
2511    #[cfg(feature = "discovery")]
2512    fn local_discovery_topology_board(&self, st: &RelayInner, now_ms: u64) -> TopologyBoardNode {
2513        let mut connections = Vec::new();
2514        for route in st.discovery_routes.values() {
2515            if now_ms.saturating_sub(route.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2516                continue;
2517            }
2518            for (sender, sender_state) in route.announcers.iter() {
2519                if now_ms.saturating_sub(sender_state.last_seen_ms) <= DISCOVERY_ROUTE_TTL_MS {
2520                    connections.push(sender.clone());
2521                }
2522            }
2523        }
2524        connections.sort_unstable();
2525        connections.dedup();
2526        let sender = self.sender_arc();
2527        TopologyBoardNode {
2528            sender_id: sender.to_string(),
2529            reachable_endpoints: Vec::new(),
2530            reachable_timesync_sources: Vec::new(),
2531            connections,
2532        }
2533    }
2534
2535    #[cfg(feature = "discovery")]
2536    fn advertised_discovery_topology_for_link_locked(
2537        &self,
2538        st: &RelayInner,
2539        now_ms: u64,
2540        link_local_enabled: bool,
2541    ) -> Vec<TopologyBoardNode> {
2542        let mut boards = vec![self.local_discovery_topology_board(st, now_ms)];
2543        for route in st.discovery_routes.values() {
2544            if now_ms.saturating_sub(route.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2545                continue;
2546            }
2547            for (announcer, sender_state) in route.announcers.iter() {
2548                if now_ms.saturating_sub(sender_state.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2549                    continue;
2550                }
2551                let mut sender_boards = sender_state.topology_boards.clone();
2552                if sender_boards.is_empty() {
2553                    let sender = self.sender_arc();
2554                    sender_boards.push(TopologyBoardNode {
2555                        sender_id: announcer.clone(),
2556                        reachable_endpoints: sender_state.reachable.clone(),
2557                        reachable_timesync_sources: sender_state.reachable_timesync_sources.clone(),
2558                        connections: vec![sender.to_string()],
2559                    });
2560                } else if let Some(board) = sender_boards
2561                    .iter_mut()
2562                    .find(|board| board.sender_id == *announcer)
2563                {
2564                    board.connections.push(self.sender_arc().to_string());
2565                }
2566                if !link_local_enabled {
2567                    for board in sender_boards.iter_mut() {
2568                        board
2569                            .reachable_endpoints
2570                            .retain(|ep| !ep.is_link_local_only());
2571                    }
2572                }
2573                discovery::merge_topology_boards(&mut boards, &sender_boards);
2574            }
2575        }
2576        discovery::normalize_topology_boards(&mut boards);
2577        boards
2578    }
2579
    /// Tell the discovery announce cadence that the topology changed at `now_ms`.
    ///
    /// Forwards to `st.discovery_cadence.on_topology_change`. NOTE(review): presumably this
    /// pulls the next announce forward; confirm against the cadence implementation.
    #[cfg(feature = "discovery")]
    fn note_discovery_topology_change_locked(st: &mut RelayInner, now_ms: u64) {
        st.discovery_cadence.on_topology_change(now_ms);
    }
2584
2585    #[cfg(feature = "discovery")]
2586    fn prune_discovery_routes_locked(st: &mut RelayInner, now_ms: u64) -> bool {
2587        let before = st.discovery_routes.clone();
2588        st.discovery_routes.retain(|_, route| {
2589            route.announcers.retain(|_, sender| {
2590                now_ms.saturating_sub(sender.last_seen_ms) <= DISCOVERY_ROUTE_TTL_MS
2591            });
2592            Self::recompute_discovery_side_state(route);
2593            !route.announcers.is_empty()
2594        });
2595        st.discovery_routes != before
2596    }
2597
2598    #[cfg(feature = "discovery")]
2599    fn reconcile_end_to_end_acked_destinations_locked(&self, st: &mut RelayInner) {
2600        let mut active_senders = BTreeSet::new();
2601        for route in st.discovery_routes.values() {
2602            for sender_state in route.announcers.values() {
2603                for board in sender_state.topology_boards.iter() {
2604                    if self.is_end_to_end_destination_sender(&board.sender_id) {
2605                        active_senders.insert(Self::sender_hash(&board.sender_id));
2606                    }
2607                }
2608            }
2609        }
2610        st.end_to_end_acked_destinations.retain(|_, acked| {
2611            acked.retain(|sender_hash| active_senders.contains(sender_hash));
2612            !acked.is_empty()
2613        });
2614    }
2615
2616    #[cfg(feature = "discovery")]
2617    fn advertised_discovery_endpoints_for_link_locked(
2618        &self,
2619        st: &RelayInner,
2620        now_ms: u64,
2621        link_local_enabled: bool,
2622    ) -> Vec<crate::DataEndpoint> {
2623        let (reachable_endpoints, _) = discovery::summarize_topology_boards(
2624            &self.advertised_discovery_topology_for_link_locked(st, now_ms, link_local_enabled),
2625        );
2626        reachable_endpoints
2627            .into_iter()
2628            .filter(|ep| {
2629                !discovery::is_discovery_endpoint(*ep)
2630                    && (link_local_enabled || !ep.is_link_local_only())
2631            })
2632            .collect()
2633    }
2634
2635    #[cfg(feature = "discovery")]
2636    fn advertised_discovery_timesync_sources_for_link_locked(
2637        &self,
2638        st: &RelayInner,
2639        now_ms: u64,
2640    ) -> Vec<String> {
2641        let (_, sources) = discovery::summarize_topology_boards(
2642            &self.advertised_discovery_topology_for_link_locked(st, now_ms, true),
2643        );
2644        sources
2645    }
2646
2647    #[cfg(feature = "discovery")]
2648    #[cfg(feature = "timesync")]
2649    fn preferred_timesync_route_source(
2650        &self,
2651        data: &RelayItem,
2652        ty: crate::DataType,
2653    ) -> TelemetryResult<Option<String>> {
2654        if !matches!(
2655            ty,
2656            crate::DataType::TimeSyncAnnounce | crate::DataType::TimeSyncResponse
2657        ) {
2658            return Ok(None);
2659        }
2660
2661        let sender = match data {
2662            RelayItem::Packet(pkt) => pkt.sender().to_owned(),
2663            RelayItem::Packed(bytes) => {
2664                if wire_format::peek_frame_info(bytes.as_ref())
2665                    .ok()
2666                    .is_some_and(|frame| frame.ack_only())
2667                {
2668                    return Ok(None);
2669                }
2670                wire_format::unpack_packet(bytes.as_ref())?
2671                    .sender()
2672                    .to_owned()
2673            }
2674        };
2675        Ok(Some(sender))
2676    }
2677
2678    #[cfg(feature = "discovery")]
2679    #[inline]
2680    fn side_is_slow_control_link_locked(
2681        st: &RelayInner,
2682        side_id: RelaySideId,
2683        now_ms: u64,
2684    ) -> bool {
2685        st.adaptive_route_stats.get(&side_id).is_some_and(|stats| {
2686            let recent_slow = stats.last_slow_observed_ms > 0
2687                && now_ms.saturating_sub(stats.last_slow_observed_ms)
2688                    <= DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS;
2689            stats.sample_count > 0
2690                && ((stats.estimated_bandwidth_bps > 0
2691                    && stats.estimated_bandwidth_bps <= CONTROL_SLOW_LINK_CAPACITY_BPS)
2692                    || recent_slow)
2693        })
2694    }
2695
2696    #[cfg(feature = "discovery")]
2697    fn discovery_level_for_side_locked(
2698        st: &mut RelayInner,
2699        side_id: RelaySideId,
2700        now_ms: u64,
2701    ) -> Option<DiscoveryAdvertiseLevel> {
2702        if !Self::side_is_slow_control_link_locked(st, side_id, now_ms) {
2703            st.discovery_side_throttle.remove(&side_id);
2704            return Some(DiscoveryAdvertiseLevel::Full);
2705        }
2706
2707        let throttle = st.discovery_side_throttle.entry(side_id).or_default();
2708        if now_ms >= throttle.next_full_ms {
2709            throttle.next_full_ms = now_ms.saturating_add(DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS);
2710            throttle.next_ping_ms = now_ms.saturating_add(DISCOVERY_SLOW_LINK_PING_INTERVAL_MS);
2711            return Some(DiscoveryAdvertiseLevel::Full);
2712        }
2713        if now_ms >= throttle.next_ping_ms {
2714            throttle.next_ping_ms = now_ms.saturating_add(DISCOVERY_SLOW_LINK_PING_INTERVAL_MS);
2715            return Some(DiscoveryAdvertiseLevel::MinimalPing);
2716        }
2717        None
2718    }
2719
2720    #[cfg(all(feature = "discovery", feature = "timesync"))]
2721    #[inline]
2722    fn is_timesync_type(ty: crate::DataType) -> bool {
2723        matches!(
2724            ty,
2725            crate::DataType::TimeSyncAnnounce
2726                | crate::DataType::TimeSyncRequest
2727                | crate::DataType::TimeSyncResponse
2728        )
2729    }
2730
2731    #[cfg(all(feature = "discovery", feature = "timesync"))]
2732    fn timesync_allowed_for_side_locked(
2733        st: &mut RelayInner,
2734        side_id: RelaySideId,
2735        ty: crate::DataType,
2736        now_ms: u64,
2737    ) -> bool {
2738        if !Self::is_timesync_type(ty) {
2739            return true;
2740        }
2741        if !Self::side_is_slow_control_link_locked(st, side_id, now_ms) {
2742            st.timesync_side_throttle.remove(&side_id);
2743            return true;
2744        }
2745
2746        let throttle = st.timesync_side_throttle.entry(side_id).or_default();
2747        if now_ms >= throttle.next_allowed_ms {
2748            throttle.next_allowed_ms = now_ms.saturating_add(TIMESYNC_SLOW_LINK_MIN_INTERVAL_MS);
2749            return true;
2750        }
2751        false
2752    }
2753
2754    #[cfg(all(feature = "discovery", feature = "timesync"))]
2755    fn filter_timesync_sides_locked(
2756        st: &mut RelayInner,
2757        ty: crate::DataType,
2758        now_ms: u64,
2759        sides: Vec<RelaySideId>,
2760    ) -> Vec<RelaySideId> {
2761        sides
2762            .into_iter()
2763            .filter(|side| Self::timesync_allowed_for_side_locked(st, *side, ty, now_ms))
2764            .collect()
2765    }
2766
2767    #[cfg(feature = "discovery")]
2768    fn queue_discovery_announce(&self) -> TelemetryResult<()> {
2769        let now_ms = self.clock.now_ms();
2770        let per_side = {
2771            let mut st = self.state.lock();
2772            if Self::prune_discovery_routes_locked(&mut st, now_ms) {
2773                self.reconcile_end_to_end_acked_destinations_locked(&mut st);
2774                Self::note_discovery_topology_change_locked(&mut st, now_ms);
2775            }
2776            st.fit_discovery_budget();
2777            if !st.sides.iter().any(|side| side.is_some()) {
2778                return Ok(());
2779            }
2780            st.discovery_cadence.on_announce_sent(now_ms);
2781            let side_entries = st
2782                .sides
2783                .iter()
2784                .enumerate()
2785                .filter_map(|(side_id, side)| {
2786                    side.as_ref()
2787                        .map(|side| (side_id, side.opts.link_local_enabled, side.opts))
2788                })
2789                .collect::<Vec<_>>();
2790            let mut per_side = Vec::new();
2791            for (side_id, link_local_enabled, opts) in side_entries {
2792                if !self.route_allowed_locked(
2793                    &st,
2794                    None,
2795                    Some(crate::DataType::DiscoveryAnnounce),
2796                    side_id,
2797                ) {
2798                    continue;
2799                }
2800                let Some(level) = Self::discovery_level_for_side_locked(&mut st, side_id, now_ms)
2801                else {
2802                    continue;
2803                };
2804                let capabilities = opts.link_capabilities();
2805                if level == DiscoveryAdvertiseLevel::MinimalPing {
2806                    per_side.push((
2807                        side_id,
2808                        level,
2809                        Vec::new(),
2810                        Vec::new(),
2811                        Vec::new(),
2812                        capabilities,
2813                    ));
2814                    continue;
2815                }
2816                let endpoints = self.advertised_discovery_endpoints_for_link_locked(
2817                    &st,
2818                    now_ms,
2819                    link_local_enabled,
2820                );
2821                let timesync_sources =
2822                    self.advertised_discovery_timesync_sources_for_link_locked(&st, now_ms);
2823                let topology = self.advertised_discovery_topology_for_link_locked(
2824                    &st,
2825                    now_ms,
2826                    link_local_enabled,
2827                );
2828                per_side.push((
2829                    side_id,
2830                    level,
2831                    endpoints,
2832                    timesync_sources,
2833                    topology,
2834                    capabilities,
2835                ));
2836            }
2837            per_side
2838        };
2839        let mut st = self.state.lock();
2840        for (dst, level, endpoints, timesync_sources, topology, capabilities) in per_side {
2841            let sender = self.sender_arc();
2842            if level == DiscoveryAdvertiseLevel::Full {
2843                let pkt = discovery::build_discovery_schema(sender.as_ref(), now_ms)?;
2844                let data = RelayItem::Packet(Arc::new(pkt));
2845                let priority = Self::relay_item_priority(&data)?;
2846                st.push_tx(RelayTxItem {
2847                    src: None,
2848                    dst,
2849                    data,
2850                    priority,
2851                })?;
2852            }
2853            if level == DiscoveryAdvertiseLevel::Full {
2854                let pkt = discovery::build_discovery_link_capabilities(
2855                    sender.as_ref(),
2856                    now_ms,
2857                    capabilities,
2858                )?;
2859                let data = RelayItem::Packet(Arc::new(pkt));
2860                let priority = Self::relay_item_priority(&data)?;
2861                st.push_tx(RelayTxItem {
2862                    src: None,
2863                    dst,
2864                    data,
2865                    priority,
2866                })?;
2867            }
2868            if level == DiscoveryAdvertiseLevel::MinimalPing || !endpoints.is_empty() {
2869                let pkt = discovery::build_discovery_announce(
2870                    sender.as_ref(),
2871                    now_ms,
2872                    endpoints.as_slice(),
2873                )?;
2874                let data = RelayItem::Packet(Arc::new(pkt));
2875                let priority = Self::relay_item_priority(&data)?;
2876                st.push_tx(RelayTxItem {
2877                    src: None,
2878                    dst,
2879                    data,
2880                    priority,
2881                })?;
2882            }
2883            if level == DiscoveryAdvertiseLevel::Full && !timesync_sources.is_empty() {
2884                let pkt = discovery::build_discovery_timesync_sources(
2885                    sender.as_ref(),
2886                    now_ms,
2887                    timesync_sources.as_slice(),
2888                )?;
2889                let data = RelayItem::Packet(Arc::new(pkt));
2890                let priority = Self::relay_item_priority(&data)?;
2891                st.push_tx(RelayTxItem {
2892                    src: None,
2893                    dst,
2894                    data,
2895                    priority,
2896                })?;
2897            }
2898            if level == DiscoveryAdvertiseLevel::Full && !topology.is_empty() {
2899                let pkt = discovery::build_discovery_topology(sender.as_ref(), now_ms, &topology)?;
2900                let data = RelayItem::Packet(Arc::new(pkt));
2901                let priority = Self::relay_item_priority(&data)?;
2902                st.push_tx(RelayTxItem {
2903                    src: None,
2904                    dst,
2905                    data,
2906                    priority,
2907                })?;
2908            }
2909        }
2910        Ok(())
2911    }
2912
2913    #[cfg(feature = "discovery")]
2914    fn poll_discovery_announce(&self) -> TelemetryResult<bool> {
2915        let now_ms = self.clock.now_ms();
2916        let due = {
2917            let mut st = self.state.lock();
2918            let removed = Self::prune_discovery_routes_locked(&mut st, now_ms);
2919            if removed {
2920                self.reconcile_end_to_end_acked_destinations_locked(&mut st);
2921                Self::note_discovery_topology_change_locked(&mut st, now_ms);
2922            }
2923            st.fit_discovery_budget();
2924            let has_any = st.sides.iter().enumerate().any(|(side_id, side)| {
2925                let Some(side) = side.as_ref() else {
2926                    return false;
2927                };
2928                if !self.route_allowed_locked(
2929                    &st,
2930                    None,
2931                    Some(crate::DataType::DiscoveryAnnounce),
2932                    side_id,
2933                ) {
2934                    return false;
2935                }
2936                let _ = side;
2937                true
2938            });
2939            if !st.sides.iter().any(|side| side.is_some()) || !has_any {
2940                return Ok(false);
2941            }
2942            st.discovery_cadence.due(now_ms)
2943        };
2944        if !due {
2945            return Ok(false);
2946        }
2947        self.queue_discovery_announce()?;
2948        Ok(true)
2949    }
2950
2951    #[cfg(feature = "discovery")]
2952    fn learn_discovery_item(&self, src: RelaySideId, data: &RelayItem) -> TelemetryResult<()> {
2953        let pkt = match data {
2954            RelayItem::Packet(pkt) => {
2955                if !discovery::is_discovery_type(pkt.data_type()) {
2956                    return Ok(());
2957                }
2958                pkt.as_ref().clone()
2959            }
2960            RelayItem::Packed(bytes) => {
2961                let env = wire_format::peek_envelope(bytes.as_ref())?;
2962                if !discovery::is_discovery_type(env.ty) {
2963                    return Ok(());
2964                }
2965                if wire_format::peek_frame_info(bytes.as_ref())
2966                    .ok()
2967                    .is_some_and(|frame| frame.ack_only())
2968                {
2969                    return Ok(());
2970                }
2971                wire_format::unpack_packet(bytes.as_ref())?
2972            }
2973        };
2974
2975        let now_ms = self.clock.now_ms();
2976        if pkt.data_type() == crate::DataType::DiscoverySchema {
2977            let snapshot = discovery::decode_discovery_schema(&pkt)?;
2978            let incoming_cost = crate::config::owned_schema_byte_cost(&snapshot);
2979            let mut st = self.state.lock();
2980            st.make_shared_queue_room(incoming_cost, RelayQueueKind::Discovery)?;
2981            let budget = st.memory.max_queue_budget;
2982            drop(st);
2983            let report = crate::config::merge_owned_schema_snapshot_with_budget(snapshot, budget)?;
2984            if report.changed() {
2985                let mut st = self.state.lock();
2986                st.fit_discovery_budget();
2987                Self::note_discovery_topology_change_locked(&mut st, now_ms);
2988            }
2989            return Ok(());
2990        }
2991        if pkt.data_type() == crate::DataType::DiscoveryLinkCapabilities {
2992            let _ = discovery::decode_discovery_link_capabilities(&pkt)?;
2993            return Ok(());
2994        }
2995        let mut st = self.state.lock();
2996        if pkt.data_type() == crate::DataType::DiscoveryLeave {
2997            let leaving = pkt.sender();
2998            let before = st.discovery_routes.clone();
2999            for route in st.discovery_routes.values_mut() {
3000                route.announcers.remove(leaving);
3001                for sender_state in route.announcers.values_mut() {
3002                    sender_state
3003                        .topology_boards
3004                        .retain(|board| board.sender_id != leaving);
3005                    for board in sender_state.topology_boards.iter_mut() {
3006                        board.connections.retain(|peer| peer != leaving);
3007                    }
3008                    Self::refresh_sender_topology_state(sender_state);
3009                }
3010                Self::recompute_discovery_side_state(route);
3011            }
3012            st.discovery_routes
3013                .retain(|_, route| !route.announcers.is_empty());
3014            if st.discovery_routes != before {
3015                Self::note_discovery_topology_change_locked(&mut st, now_ms);
3016            }
3017            let _ = Self::prune_discovery_routes_locked(&mut st, now_ms);
3018            self.reconcile_end_to_end_acked_destinations_locked(&mut st);
3019            return Ok(());
3020        }
3021        let address_ad = if pkt.data_type() == crate::DataType::DiscoveryAddress {
3022            Some(discovery::decode_discovery_address(&pkt)?)
3023        } else {
3024            None
3025        };
3026        let announcer_id = address_ad
3027            .as_ref()
3028            .map(|ad| ad.hostname.clone())
3029            .unwrap_or_else(|| pkt.sender().to_string());
3030        let mut route = st.discovery_routes.get(&src).cloned().unwrap_or_default();
3031        let side_link_local_enabled = st
3032            .sides
3033            .get(src)
3034            .and_then(|entry| entry.as_ref())
3035            .map(|side_ref| side_ref.opts.link_local_enabled)
3036            .unwrap_or(false);
3037        let mut sender_state = route
3038            .announcers
3039            .get(&announcer_id)
3040            .cloned()
3041            .unwrap_or_default();
3042        let changed = match pkt.data_type() {
3043            crate::DataType::DiscoveryAddress => {
3044                let ad = address_ad.expect("decoded above");
3045                let mut reachable = ad.reachable_endpoints;
3046                if !side_link_local_enabled {
3047                    reachable.retain(|ep| !ep.is_link_local_only());
3048                }
3049                let board = Self::sender_topology_board_mut(&mut sender_state, &ad.hostname);
3050                let changed = board.reachable_endpoints != reachable
3051                    || board.reachable_timesync_sources != ad.reachable_timesync_sources;
3052                board.reachable_endpoints = reachable;
3053                board.reachable_timesync_sources = ad.reachable_timesync_sources;
3054                Self::refresh_sender_topology_state(&mut sender_state);
3055                changed
3056            }
3057            crate::DataType::DiscoveryAnnounce => {
3058                let mut reachable = discovery::decode_discovery_announce(&pkt)?;
3059                if !side_link_local_enabled {
3060                    reachable.retain(|ep| !ep.is_link_local_only());
3061                }
3062                let board = Self::sender_topology_board_mut(&mut sender_state, pkt.sender());
3063                let changed = board.reachable_endpoints != reachable;
3064                board.reachable_endpoints = reachable;
3065                Self::refresh_sender_topology_state(&mut sender_state);
3066                changed
3067            }
3068            crate::DataType::DiscoveryTimeSyncSources => {
3069                let sources = discovery::decode_discovery_timesync_sources(&pkt)?;
3070                let board = Self::sender_topology_board_mut(&mut sender_state, pkt.sender());
3071                let changed = board.reachable_timesync_sources != sources;
3072                board.reachable_timesync_sources = sources;
3073                Self::refresh_sender_topology_state(&mut sender_state);
3074                changed
3075            }
3076            crate::DataType::DiscoveryTopology => {
3077                let mut boards = discovery::decode_discovery_topology(&pkt)?;
3078                if !side_link_local_enabled {
3079                    for board in boards.iter_mut() {
3080                        board
3081                            .reachable_endpoints
3082                            .retain(|ep| !ep.is_link_local_only());
3083                    }
3084                }
3085                let changed = sender_state.topology_boards != boards;
3086                sender_state.topology_boards = boards;
3087                Self::refresh_sender_topology_state(&mut sender_state);
3088                changed
3089            }
3090            crate::DataType::DiscoverySchema => false,
3091            _ => false,
3092        };
3093        sender_state.last_seen_ms = now_ms;
3094        route.announcers.insert(announcer_id, sender_state);
3095        Self::recompute_discovery_side_state(&mut route);
3096        st.discovery_routes.insert(src, route);
3097        st.fit_discovery_budget();
3098        if changed {
3099            Self::note_discovery_topology_change_locked(&mut st, now_ms);
3100        }
3101        let _ = Self::prune_discovery_routes_locked(&mut st, now_ms);
3102        self.reconcile_end_to_end_acked_destinations_locked(&mut st);
3103        Ok(())
3104    }
3105
    /// No-op when the `discovery` feature is disabled: inbound discovery items are ignored.
    #[cfg(not(feature = "discovery"))]
    fn learn_discovery_item(&self, _src: RelaySideId, _data: &RelayItem) -> TelemetryResult<()> {
        Ok(())
    }
3110
    /// No-op when the `discovery` feature is disabled: nothing is announced.
    #[cfg(not(feature = "discovery"))]
    fn queue_discovery_announce(&self) -> TelemetryResult<()> {
        Ok(())
    }
3115
    /// Without the `discovery` feature an announce is never due; always returns `Ok(false)`.
    #[cfg(not(feature = "discovery"))]
    fn poll_discovery_announce(&self) -> TelemetryResult<bool> {
        Ok(false)
    }
3120
3121    fn process_reliable_timeouts(&self) -> TelemetryResult<()> {
3122        let now = self.clock.now_ms();
3123        let mut requeue: Vec<(RelaySideId, crate::DataType, u32)> = Vec::new();
3124
3125        {
3126            let mut st = self.state.lock();
3127            if st.reliable_tx.is_empty() {
3128                return Ok(());
3129            }
3130
3131            for ((side, ty_u32), tx_state) in st.reliable_tx.iter_mut() {
3132                let Some(ty) = crate::DataType::try_from_u32(*ty_u32) else {
3133                    continue;
3134                };
3135                let sent_order: Vec<u32> = tx_state.sent_order.iter().copied().collect();
3136                for seq in sent_order {
3137                    let Some(sent) = tx_state.sent.get_mut(&seq) else {
3138                        continue;
3139                    };
3140                    if sent.queued
3141                        || now.wrapping_sub(sent.last_send_ms) < runtime_reliable_retransmit_ms()
3142                    {
3143                        continue;
3144                    }
3145                    if sent.partial_acked {
3146                        continue;
3147                    }
3148                    if sent.retries >= runtime_reliable_max_retries() {
3149                        tx_state.sent.remove(&seq);
3150                        tx_state.sent_order.retain(|existing| *existing != seq);
3151                        continue;
3152                    }
3153                    sent.retries += 1;
3154                    requeue.push((*side, ty, seq));
3155                }
3156            }
3157        }
3158
3159        for (side, ty, seq) in requeue {
3160            self.queue_reliable_retransmit(side, ty, seq)?;
3161        }
3162
3163        Ok(())
3164    }
3165
3166    /// Compute a de-dupe hash for a QueueItem.
3167    /// Uses packet ID for Packet items, and attempts to extract packet ID from
3168    /// packed bytes. If extraction fails, hashes raw bytes as a fallback.
3169    fn get_hash(item: &RelayRxItem) -> u64 {
3170        match &item.data {
3171            RelayItem::Packet(pkt) => pkt.packet_id(),
3172            RelayItem::Packed(bytes) => {
3173                let reliable_seq = wire_format::peek_frame_info(bytes.as_ref())
3174                    .ok()
3175                    .and_then(|frame| frame.reliable)
3176                    .and_then(|hdr| {
3177                        if (hdr.flags & wire_format::RELIABLE_FLAG_ACK_ONLY) != 0 {
3178                            None
3179                        } else {
3180                            Some(hdr.seq)
3181                        }
3182                    });
3183
3184                match wire_format::packet_id_from_wire(bytes.as_ref()) {
3185                    Ok(id) => {
3186                        if let Some(seq) = reliable_seq {
3187                            hash_bytes_u64(id, &seq.to_le_bytes())
3188                        } else {
3189                            id
3190                        }
3191                    }
3192                    Err(_e) => {
3193                        // Fallback: if bytes are malformed (or compression feature mismatch),
3194                        // hash raw bytes so we can still dedupe identical network duplicates.
3195                        let h: u64 = 0x9E37_79B9_7F4A_7C15;
3196                        hash_bytes_u64(h, bytes.as_ref())
3197                    }
3198                }
3199            }
3200        }
3201    }
3202
3203    /// Compute a dedupe ID for an incoming RelayRxItem.
3204    /// Note: we intentionally do *not* include `src` so that the same
3205    /// packet coming from multiple sides is only processed once.
3206    fn is_duplicate_pkt(&self, item: &RelayRxItem) -> TelemetryResult<bool> {
3207        let id = Self::get_hash(item);
3208
3209        let mut st = self.state.lock();
3210        if st.recent_rx.contains(&id) {
3211            Ok(true)
3212        } else {
3213            st.push_recent_rx(id)?;
3214            Ok(false)
3215        }
3216    }
3217
3218    fn should_forward_duplicate_reliable_item(&self, item: &RelayRxItem) -> TelemetryResult<bool> {
3219        let (_, ty) = self.item_route_info(&item.data)?;
3220        if !is_reliable_type(ty)
3221            || matches!(
3222                ty,
3223                crate::DataType::ReliableAck
3224                    | crate::DataType::ReliablePartialAck
3225                    | crate::DataType::ReliablePacketRequest
3226            )
3227        {
3228            return Ok(false);
3229        }
3230
3231        let RemoteSidePlan::Target(sides) = self.remote_side_plan(&item.data, item.src)?;
3232        let st = self.state.lock();
3233        let now_ms = self.clock.now_ms();
3234        Ok(sides
3235            .into_iter()
3236            .any(|side| self.side_has_multiple_announcers_locked(&st, side, now_ms)))
3237    }
3238
3239    /// Register a side whose TX callback consumes packed packet bytes.
3240    ///
3241    /// Returns the side id later used for ingress APIs such as `rx_packed_from_side`.
3242    /// The default options disable the relay's per-link reliable framing on this side.
3243    pub fn add_side_packed<F>(&self, name: &'static str, tx: F) -> RelaySideId
3244    where
3245        F: Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static,
3246    {
3247        self.add_side_packed_with_options(name, tx, RelaySideOptions::default())
3248    }
3249
3250    /// Register a packed side with bounded-frame transport enabled.
3251    ///
3252    /// `max_frame_bytes == 0` leaves frames unbounded.
3253    pub fn add_side_packed_small_packets<F>(
3254        &self,
3255        name: &'static str,
3256        tx: F,
3257        max_frame_bytes: usize,
3258    ) -> RelaySideId
3259    where
3260        F: Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static,
3261    {
3262        self.add_side_packed_with_options(
3263            name,
3264            tx,
3265            RelaySideOptions::default().with_small_packet_transport(max_frame_bytes),
3266        )
3267    }
3268
3269    /// Register a packed-output side with explicit side options.
3270    ///
3271    /// `opts.reliable_enabled` enables relay-managed per-hop ACK/retransmit behavior on this side.
3272    /// `opts.link_local_enabled` gates link-local-only forwarding and discovery use of this side.
3273    /// `ingress_enabled` and `egress_enabled` set the initial directional policy.
3274    pub fn add_side_packed_with_options<F>(
3275        &self,
3276        name: &'static str,
3277        tx: F,
3278        opts: RelaySideOptions,
3279    ) -> RelaySideId
3280    where
3281        F: Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static,
3282    {
3283        let mut st = self.state.lock();
3284        let id = st.sides.len();
3285        st.sides.push(Some(RelaySide {
3286            name,
3287            tx_handler: RelayTxHandlerFn::Packed(Arc::new(tx)),
3288            opts,
3289        }));
3290        st.side_runtime_stats
3291            .insert(id, SideRuntimeStatsInner::default());
3292        st.side_transport.insert(id, SideTransportState::default());
3293        #[cfg(feature = "discovery")]
3294        Self::note_discovery_topology_change_locked(&mut st, self.clock.now_ms());
3295        id
3296    }
3297
3298    /// Register a side whose TX callback receives decoded [`Packet`] values.
3299    ///
3300    /// Packet-output sides do not preserve the relay's packed reliable hop framing, so use a
3301    /// packed side when this hop should participate in relay-managed per-link reliability.
3302    pub fn add_side_packet<F>(&self, name: &'static str, tx: F) -> RelaySideId
3303    where
3304        F: Fn(&Packet) -> TelemetryResult<()> + Send + Sync + 'static,
3305    {
3306        self.add_side_packet_with_options(name, tx, RelaySideOptions::default())
3307    }
3308
3309    /// Register a packet-output side with explicit side options.
3310    pub fn add_side_packet_with_options<F>(
3311        &self,
3312        name: &'static str,
3313        tx: F,
3314        opts: RelaySideOptions,
3315    ) -> RelaySideId
3316    where
3317        F: Fn(&Packet) -> TelemetryResult<()> + Send + Sync + 'static,
3318    {
3319        let mut st = self.state.lock();
3320        let id = st.sides.len();
3321        st.sides.push(Some(RelaySide {
3322            name,
3323            tx_handler: RelayTxHandlerFn::Packet(Arc::new(tx)),
3324            opts,
3325        }));
3326        st.side_runtime_stats
3327            .insert(id, SideRuntimeStatsInner::default());
3328        st.side_transport.insert(id, SideTransportState::default());
3329        #[cfg(feature = "discovery")]
3330        Self::note_discovery_topology_change_locked(&mut st, self.clock.now_ms());
3331        id
3332    }
3333
3334    /// Remove a side while keeping existing side IDs stable.
3335    ///
3336    /// `side` must be an id returned by one of the `add_side_*` calls. Remaining side ids are not
3337    /// renumbered.
3338    pub fn remove_side(&self, side: RelaySideId) -> TelemetryResult<()> {
3339        let now_ms = self.clock.now_ms();
3340        let mut st = self.state.lock();
3341        let slot = st.sides.get_mut(side).ok_or(TelemetryError::BadArg)?;
3342        if slot.is_none() {
3343            return Err(TelemetryError::BadArg);
3344        }
3345        *slot = None;
3346        st.route_overrides
3347            .retain(|(src_side, dst_side), _| *src_side != Some(side) && *dst_side != side);
3348        st.typed_route_overrides
3349            .retain(|(src_side, _, dst_side), _| *src_side != Some(side) && *dst_side != side);
3350        st.route_weights
3351            .retain(|(src_side, dst_side), _| *src_side != Some(side) && *dst_side != side);
3352        st.route_priorities
3353            .retain(|(src_side, dst_side), _| *src_side != Some(side) && *dst_side != side);
3354        st.source_route_modes.remove(&Some(side));
3355        st.route_selection_cursors.remove(&Some(side));
3356        st.adaptive_route_stats.remove(&side);
3357        #[cfg(feature = "discovery")]
3358        st.discovery_side_throttle.remove(&side);
3359        #[cfg(all(feature = "discovery", feature = "timesync"))]
3360        st.timesync_side_throttle.remove(&side);
3361        st.side_runtime_stats.remove(&side);
3362        st.reliable_return_routes
3363            .retain(|_, route| route.side != side);
3364        st.rx_queue.retain(|queued| queued.src != side);
3365        st.tx_queue
3366            .retain(|queued| queued.dst != side && queued.src != Some(side));
3367        st.replay_queue.retain(|queued| queued.dst != side);
3368        st.reliable_tx.retain(|(side_id, _), _| *side_id != side);
3369        st.reliable_rx.retain(|(side_id, _), _| *side_id != side);
3370        #[cfg(feature = "discovery")]
3371        {
3372            st.discovery_routes.remove(&side);
3373            Self::note_discovery_topology_change_locked(&mut st, now_ms);
3374        }
3375        Ok(())
3376    }
3377
3378    /// Enable or disable ingress processing for a registered side.
3379    pub fn set_side_ingress_enabled(
3380        &self,
3381        side: RelaySideId,
3382        enabled: bool,
3383    ) -> TelemetryResult<()> {
3384        let now_ms = self.clock.now_ms();
3385        let mut st = self.state.lock();
3386        let side_ref = st
3387            .sides
3388            .get_mut(side)
3389            .and_then(|side| side.as_mut())
3390            .ok_or(TelemetryError::BadArg)?;
3391        side_ref.opts.ingress_enabled = enabled;
3392        #[cfg(feature = "discovery")]
3393        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3394        Ok(())
3395    }
3396
3397    /// Enable or disable egress toward a registered side.
3398    pub fn set_side_egress_enabled(&self, side: RelaySideId, enabled: bool) -> TelemetryResult<()> {
3399        let now_ms = self.clock.now_ms();
3400        let mut st = self.state.lock();
3401        let side_ref = st
3402            .sides
3403            .get_mut(side)
3404            .and_then(|side| side.as_mut())
3405            .ok_or(TelemetryError::BadArg)?;
3406        side_ref.opts.egress_enabled = enabled;
3407        if !enabled {
3408            st.tx_queue.retain(|queued| queued.dst != side);
3409            st.replay_queue.retain(|queued| queued.dst != side);
3410        }
3411        #[cfg(feature = "discovery")]
3412        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3413        Ok(())
3414    }
3415
3416    /// Set the route-selection policy for traffic originating from `src`.
3417    ///
3418    /// `src == None` targets locally-originated relay traffic such as discovery output.
3419    pub fn set_source_route_mode(
3420        &self,
3421        src: Option<RelaySideId>,
3422        mode: RouteSelectionMode,
3423    ) -> TelemetryResult<()> {
3424        let now_ms = self.clock.now_ms();
3425        let mut st = self.state.lock();
3426        if let Some(src) = src {
3427            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3428        }
3429        if mode == RouteSelectionMode::Fanout {
3430            st.source_route_modes.remove(&src);
3431        } else {
3432            st.source_route_modes.insert(src, mode);
3433        }
3434        st.route_selection_cursors.remove(&src);
3435        #[cfg(feature = "discovery")]
3436        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3437        Ok(())
3438    }
3439
3440    /// Clear a source-specific route-selection override.
3441    pub fn clear_source_route_mode(&self, src: Option<RelaySideId>) -> TelemetryResult<()> {
3442        self.set_source_route_mode(src, RouteSelectionMode::Fanout)
3443    }
3444
3445    /// Set the weighted-routing weight from `src` toward `dst`.
3446    pub fn set_route_weight(
3447        &self,
3448        src: Option<RelaySideId>,
3449        dst: RelaySideId,
3450        weight: u32,
3451    ) -> TelemetryResult<()> {
3452        let now_ms = self.clock.now_ms();
3453        let mut st = self.state.lock();
3454        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3455        if let Some(src) = src {
3456            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3457        }
3458        st.route_weights.insert((src, dst), weight);
3459        st.route_selection_cursors.remove(&src);
3460        #[cfg(feature = "discovery")]
3461        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3462        Ok(())
3463    }
3464
3465    /// Clear a previously configured weighted-routing weight override.
3466    pub fn clear_route_weight(
3467        &self,
3468        src: Option<RelaySideId>,
3469        dst: RelaySideId,
3470    ) -> TelemetryResult<()> {
3471        let now_ms = self.clock.now_ms();
3472        let mut st = self.state.lock();
3473        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3474        if let Some(src) = src {
3475            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3476        }
3477        st.route_weights.remove(&(src, dst));
3478        st.route_selection_cursors.remove(&src);
3479        #[cfg(feature = "discovery")]
3480        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3481        Ok(())
3482    }
3483
3484    /// Set the failover priority from `src` toward `dst`.
3485    pub fn set_route_priority(
3486        &self,
3487        src: Option<RelaySideId>,
3488        dst: RelaySideId,
3489        priority: u32,
3490    ) -> TelemetryResult<()> {
3491        let now_ms = self.clock.now_ms();
3492        let mut st = self.state.lock();
3493        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3494        if let Some(src) = src {
3495            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3496        }
3497        st.route_priorities.insert((src, dst), priority);
3498        #[cfg(feature = "discovery")]
3499        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3500        Ok(())
3501    }
3502
3503    /// Clear a previously configured failover priority override.
3504    pub fn clear_route_priority(
3505        &self,
3506        src: Option<RelaySideId>,
3507        dst: RelaySideId,
3508    ) -> TelemetryResult<()> {
3509        let now_ms = self.clock.now_ms();
3510        let mut st = self.state.lock();
3511        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3512        if let Some(src) = src {
3513            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3514        }
3515        st.route_priorities.remove(&(src, dst));
3516        #[cfg(feature = "discovery")]
3517        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3518        Ok(())
3519    }
3520
3521    /// Allow or block routing from `src` toward `dst`.
3522    pub fn set_route(
3523        &self,
3524        src: Option<RelaySideId>,
3525        dst: RelaySideId,
3526        enabled: bool,
3527    ) -> TelemetryResult<()> {
3528        let now_ms = self.clock.now_ms();
3529        let mut st = self.state.lock();
3530        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3531        if let Some(src) = src {
3532            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3533        }
3534        st.route_overrides.insert((src, dst), enabled);
3535        #[cfg(feature = "discovery")]
3536        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3537        Ok(())
3538    }
3539
3540    /// Allow or block routing for a specific `DataType` from `src` toward `dst`.
3541    pub fn set_typed_route(
3542        &self,
3543        src: Option<RelaySideId>,
3544        ty: crate::DataType,
3545        dst: RelaySideId,
3546        enabled: bool,
3547    ) -> TelemetryResult<()> {
3548        let now_ms = self.clock.now_ms();
3549        let mut st = self.state.lock();
3550        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3551        if let Some(src) = src {
3552            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3553        }
3554        st.typed_route_overrides
3555            .insert((src, ty.as_u32(), dst), enabled);
3556        #[cfg(feature = "discovery")]
3557        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3558        Ok(())
3559    }
3560
3561    /// Clear a typed route override for the `(src, ty, dst)` triple.
3562    pub fn clear_typed_route(
3563        &self,
3564        src: Option<RelaySideId>,
3565        ty: crate::DataType,
3566        dst: RelaySideId,
3567    ) -> TelemetryResult<()> {
3568        let now_ms = self.clock.now_ms();
3569        let mut st = self.state.lock();
3570        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3571        if let Some(src) = src {
3572            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3573        }
3574        st.typed_route_overrides.remove(&(src, ty.as_u32(), dst));
3575        #[cfg(feature = "discovery")]
3576        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3577        Ok(())
3578    }
3579
3580    /// Clear a non-typed route override so the relay falls back to default behavior.
3581    pub fn clear_route(&self, src: Option<RelaySideId>, dst: RelaySideId) -> TelemetryResult<()> {
3582        let now_ms = self.clock.now_ms();
3583        let mut st = self.state.lock();
3584        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3585        if let Some(src) = src {
3586            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3587        }
3588        st.route_overrides.remove(&(src, dst));
3589        #[cfg(feature = "discovery")]
3590        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3591        Ok(())
3592    }
3593
3594    #[cfg(feature = "discovery")]
3595    /// Queues an immediate discovery announcement for this relay.
3596    pub fn announce_discovery(&self) -> TelemetryResult<()> {
3597        self.queue_discovery_announce()
3598    }
3599
3600    /// Broadcast that this relay is leaving so peers can prune topology immediately.
3601    pub fn announce_leave(&self) -> TelemetryResult<()> {
3602        let pkt = discovery::build_discovery_leave("relay", self.clock.now_ms())?;
3603        let mut st = self.state.lock();
3604        let dsts: Vec<usize> = st
3605            .sides
3606            .iter()
3607            .enumerate()
3608            .filter_map(|(idx, side)| side.as_ref().map(|_| idx))
3609            .collect();
3610        for dst in dsts {
3611            let data = RelayItem::Packet(Arc::new(pkt.clone()));
3612            let priority = Self::relay_item_priority(&data)?;
3613            st.push_tx(RelayTxItem {
3614                src: None,
3615                dst,
3616                data,
3617                priority,
3618            })?;
3619        }
3620        Ok(())
3621    }
3622
3623    #[cfg(feature = "discovery")]
3624    /// Polls discovery state and queues an announce if the cadence says one is due.
3625    pub fn poll_discovery(&self) -> TelemetryResult<bool> {
3626        self.poll_discovery_announce()
3627    }
3628
    #[cfg(feature = "discovery")]
    /// Exports the relay's current discovered topology snapshot.
    ///
    /// The snapshot is assembled under a single acquisition of the relay state lock. Before
    /// anything is read, discovery routes are pruned via `prune_discovery_routes_locked`. If
    /// that reports a change, the end-to-end ACK destination cache is reconciled and a topology
    /// change is noted.
    ///
    /// Contents:
    /// - `routes`: one entry per discovery route whose side slot is still populated. Routes for
    ///   removed sides are skipped. Each entry lists the side's reachable endpoints (minus
    ///   router-control endpoints), time-sync sources, and per-announcer details, with
    ///   `age_ms = now_ms - last_seen_ms` (saturating).
    /// - `advertised_endpoints`, `advertised_timesync_sources`, `routers`: produced by the
    ///   `advertised_discovery_*_for_link_locked` helpers.
    /// - `links`: derived from `routers` via `discovery::topology_links_from_boards`.
    /// - the discovery cadence's current interval and next scheduled announce time.
    pub fn export_topology(&self) -> TopologySnapshot {
        let now_ms = self.clock.now_ms();
        let mut st = self.state.lock();
        // Prune first (presumably expiring routes past their TTL; see
        // `prune_discovery_routes_locked`) so dependent state is in sync before it is read.
        if Self::prune_discovery_routes_locked(&mut st, now_ms) {
            self.reconcile_end_to_end_acked_destinations_locked(&mut st);
            Self::note_discovery_topology_change_locked(&mut st, now_ms);
        }
        let routes = st
            .discovery_routes
            .iter()
            .filter_map(|(&side_id, route)| {
                // Skip routes whose side slot is empty (removed) or out of range.
                let side = st.sides.get(side_id).and_then(|side| side.as_ref())?;
                let announcers = route
                    .announcers
                    .iter()
                    .map(|(sender_id, sender_state)| TopologyAnnouncerRoute {
                        sender_id: sender_id.clone(),
                        // Router-control endpoints are internal and never exported.
                        reachable_endpoints: sender_state
                            .reachable
                            .iter()
                            .copied()
                            .filter(|ep| !discovery::is_router_control_endpoint(*ep))
                            .collect(),
                        reachable_timesync_sources: sender_state.reachable_timesync_sources.clone(),
                        routers: sender_state.topology_boards.clone(),
                        last_seen_ms: sender_state.last_seen_ms,
                        age_ms: now_ms.saturating_sub(sender_state.last_seen_ms),
                    })
                    .collect();
                Some(TopologySideRoute {
                    side_id,
                    side_name: side.name,
                    reachable_endpoints: route
                        .reachable
                        .iter()
                        .copied()
                        .filter(|ep| !discovery::is_router_control_endpoint(*ep))
                        .collect(),
                    reachable_timesync_sources: route.reachable_timesync_sources.clone(),
                    announcers,
                    last_seen_ms: route.last_seen_ms,
                    age_ms: now_ms.saturating_sub(route.last_seen_ms),
                })
            })
            .collect();
        // What this relay itself would advertise; computed from the same locked state.
        let routers = self.advertised_discovery_topology_for_link_locked(&st, now_ms, true);
        let advertised_endpoints =
            self.advertised_discovery_endpoints_for_link_locked(&st, now_ms, true);
        let advertised_timesync_sources =
            self.advertised_discovery_timesync_sources_for_link_locked(&st, now_ms);
        let links = discovery::topology_links_from_boards(&routers);
        TopologySnapshot {
            advertised_endpoints,
            advertised_timesync_sources,
            routers,
            links,
            routes,
            current_announce_interval_ms: st.discovery_cadence.current_interval_ms,
            next_announce_ms: st.discovery_cadence.next_announce_ms,
        }
    }
3692
3693    #[cfg(feature = "discovery")]
3694    pub fn client_stats(&self, sender_id: &str) -> Option<ClientStatsSnapshot> {
3695        let now_ms = self.clock.now_ms();
3696        let st = self.state.lock();
3697        let mut side_ids = Vec::new();
3698        let mut side_names = Vec::new();
3699        let mut last_seen_ms = None::<u64>;
3700        let mut reachable_endpoints = Vec::new();
3701        let mut reachable_timesync_sources = Vec::new();
3702        let mut packets_sent = 0u64;
3703        let mut packets_received = 0u64;
3704        let mut bytes_sent = 0u64;
3705        let mut bytes_received = 0u64;
3706
3707        for (side_id, route) in &st.discovery_routes {
3708            let Some(sender_state) = route.announcers.get(sender_id) else {
3709                continue;
3710            };
3711            side_ids.push(*side_id);
3712            if let Some(side_name) = st
3713                .sides
3714                .get(*side_id)
3715                .and_then(|side| side.as_ref())
3716                .map(|side| side.name)
3717            {
3718                side_names.push(side_name);
3719            }
3720            last_seen_ms = Some(last_seen_ms.unwrap_or(0).max(sender_state.last_seen_ms));
3721            reachable_endpoints.extend(sender_state.reachable.iter().copied());
3722            reachable_timesync_sources
3723                .extend(sender_state.reachable_timesync_sources.iter().cloned());
3724            if let Some(stats) = st.side_runtime_stats.get(side_id) {
3725                packets_sent = packets_sent.saturating_add(stats.tx_packets);
3726                packets_received = packets_received.saturating_add(stats.rx_packets);
3727                bytes_sent = bytes_sent.saturating_add(stats.tx_bytes);
3728                bytes_received = bytes_received.saturating_add(stats.rx_bytes);
3729            }
3730        }
3731
3732        if side_ids.is_empty() {
3733            return None;
3734        }
3735        reachable_endpoints.retain(|ep| !discovery::is_router_control_endpoint(*ep));
3736        reachable_endpoints.sort_unstable();
3737        reachable_endpoints.dedup();
3738        reachable_timesync_sources.sort_unstable();
3739        reachable_timesync_sources.dedup();
3740        side_ids.sort_unstable();
3741        side_ids.dedup();
3742        side_names.sort_unstable();
3743        side_names.dedup();
3744        let age_ms = last_seen_ms.map(|seen| now_ms.saturating_sub(seen));
3745        Some(ClientStatsSnapshot {
3746            sender_id: sender_id.to_string(),
3747            connected: age_ms.is_some_and(|age| age <= DISCOVERY_ROUTE_TTL_MS),
3748            side_ids,
3749            side_names,
3750            last_seen_ms,
3751            age_ms,
3752            reachable_endpoints,
3753            reachable_timesync_sources,
3754            packets_sent,
3755            packets_received,
3756            bytes_sent,
3757            bytes_received,
3758        })
3759    }
3760
    /// Export a point-in-time snapshot of relay runtime counters and routing configuration.
    ///
    /// Everything is read under a single acquisition of the relay state lock. The snapshot
    /// contains:
    /// - One `RuntimeSideStats` per registered side; removed sides are skipped. Each combines
    ///   the side's options, traffic and side-transport counters, its adaptive-link snapshot,
    ///   TX/RX template counts, and a per-`DataType` breakdown sorted by data-type id. Missing
    ///   per-side maps fall back to default/zero values.
    /// - Route-selection modes and cursors, untyped and typed route overrides, weights and
    ///   priorities. Each list is sorted by source side id, with locally-originated (`None`)
    ///   sources mapped to `usize::MAX` and therefore sorted last.
    /// - Queue occupancy, reliable-state sizes, a discovery summary, and global
    ///   handler failure/retry totals.
    ///
    /// `local_delivery_packets`, `local_handler_failures` and the end-to-end pending counts are
    /// always reported as `0` here. Without the `discovery` feature, the discovery summary is
    /// zeroed and its cadence fields are `None`.
    pub fn export_runtime_stats(&self) -> RuntimeStatsSnapshot {
        let now_ms = self.clock.now_ms();
        let st = self.state.lock();

        let mut sides = Vec::new();
        for (side_id, side) in st.sides.iter().enumerate() {
            // Removed sides leave an empty slot; they are not reported.
            let Some(side) = side.as_ref() else { continue };
            let stats = st
                .side_runtime_stats
                .get(&side_id)
                .cloned()
                .unwrap_or_default();
            let adaptive = st
                .adaptive_route_stats
                .get(&side_id)
                .cloned()
                .unwrap_or_default()
                .snapshot(now_ms, true);
            let (tx_template_count, rx_template_count) = st
                .side_transport
                .get(&side_id)
                .map(|state| (state.tx_template_count(), state.rx_template_count()))
                .unwrap_or((0, 0));
            // `stats` is an owned clone, so its per-type map can be consumed here.
            let mut data_types: Vec<RuntimeTypeStats> = stats
                .data_types
                .into_iter()
                .map(|(ty, item)| RuntimeTypeStats {
                    data_type: crate::DataType(ty),
                    tx_packets: item.tx_packets,
                    tx_bytes: item.tx_bytes,
                    rx_packets: item.rx_packets,
                    rx_bytes: item.rx_bytes,
                    relayed_tx_packets: item.relayed_tx_packets,
                    relayed_tx_bytes: item.relayed_tx_bytes,
                    relayed_rx_packets: item.relayed_rx_packets,
                    relayed_rx_bytes: item.relayed_rx_bytes,
                    tx_retries: item.tx_retries,
                    handler_failures: item.handler_failures,
                })
                .collect();
            data_types.sort_unstable_by_key(|item| item.data_type.as_u32());
            sides.push(RuntimeSideStats {
                side_id,
                side_name: side.name,
                reliable_enabled: side.opts.reliable_enabled,
                link_local_enabled: side.opts.link_local_enabled,
                header_template_enabled: side.opts.header_template_enabled,
                max_frame_bytes: side.opts.max_frame_bytes,
                compact_header_target_bytes: side.opts.compact_header_target_bytes,
                side_transport_profile: side.opts.effective_transport_profile().as_str(),
                ingress_enabled: side.opts.ingress_enabled,
                egress_enabled: side.opts.egress_enabled,
                tx_packets: stats.tx_packets,
                tx_bytes: stats.tx_bytes,
                rx_packets: stats.rx_packets,
                rx_bytes: stats.rx_bytes,
                relayed_tx_packets: stats.relayed_tx_packets,
                relayed_tx_bytes: stats.relayed_tx_bytes,
                relayed_rx_packets: stats.relayed_rx_packets,
                relayed_rx_bytes: stats.relayed_rx_bytes,
                // Hard-coded to zero in this relay snapshot.
                local_delivery_packets: 0,
                tx_retries: stats.tx_retries,
                tx_handler_failures: stats.tx_handler_failures,
                local_handler_failures: 0,
                total_handler_retries: stats.total_handler_retries,
                side_transport_full_frames: stats.side_transport_full_frames,
                side_transport_compact_frames: stats.side_transport_compact_frames,
                side_transport_compact_delta_frames: stats.side_transport_compact_delta_frames,
                side_transport_compact_omitted_timestamp_frames: stats
                    .side_transport_compact_omitted_timestamp_frames,
                side_transport_chunk_frames: stats.side_transport_chunk_frames,
                side_transport_raw_bytes: stats.side_transport_raw_bytes,
                side_transport_wire_bytes: stats.side_transport_wire_bytes,
                side_transport_bytes_saved: stats.side_transport_bytes_saved,
                side_transport_min_compact_overhead_bytes: stats
                    .side_transport_min_compact_overhead_bytes,
                side_transport_max_compact_overhead_bytes: stats
                    .side_transport_max_compact_overhead_bytes,
                side_transport_compact_target_misses: stats.side_transport_compact_target_misses,
                side_transport_template_evictions: stats.side_transport_template_evictions,
                side_transport_tx_template_count: tx_template_count,
                side_transport_rx_template_count: rx_template_count,
                max_side_transport_templates: side.opts.max_side_transport_templates,
                adaptive,
                data_types,
            });
        }

        // Sources with a live cursor come first. Sources that only have a configured mode
        // (no cursor yet) are appended with cursor 0.
        let mut route_modes: Vec<RouteModeStats> = st
            .route_selection_cursors
            .iter()
            .map(|(src, cursor)| RouteModeStats {
                src_side_id: *src,
                selection_mode: st.source_route_modes.get(src).copied(),
                cursor: *cursor,
            })
            .collect();
        for src in st.source_route_modes.keys() {
            if !route_modes.iter().any(|mode| mode.src_side_id == *src) {
                route_modes.push(RouteModeStats {
                    src_side_id: *src,
                    selection_mode: st.source_route_modes.get(src).copied(),
                    cursor: 0,
                });
            }
        }
        // `None` (locally-originated) sources sort last via `usize::MAX`.
        route_modes.sort_unstable_by_key(|mode| mode.src_side_id.unwrap_or(usize::MAX));

        let mut route_overrides: Vec<RouteOverrideStats> = st
            .route_overrides
            .iter()
            .map(|((src, dst), enabled)| RouteOverrideStats {
                src_side_id: *src,
                dst_side_id: *dst,
                enabled: *enabled,
            })
            .collect();
        route_overrides.sort_unstable_by_key(|item| {
            (item.src_side_id.unwrap_or(usize::MAX), item.dst_side_id)
        });

        let mut typed_route_overrides: Vec<TypedRouteOverrideStats> = st
            .typed_route_overrides
            .iter()
            .map(|((src, ty, dst), enabled)| TypedRouteOverrideStats {
                src_side_id: *src,
                data_type: crate::DataType(*ty),
                dst_side_id: *dst,
                enabled: *enabled,
            })
            .collect();
        typed_route_overrides.sort_unstable_by_key(|item| {
            (
                item.src_side_id.unwrap_or(usize::MAX),
                item.data_type.as_u32(),
                item.dst_side_id,
            )
        });

        let mut route_weights: Vec<RouteWeightStats> = st
            .route_weights
            .iter()
            .map(|((src, dst), weight)| RouteWeightStats {
                src_side_id: *src,
                dst_side_id: *dst,
                weight: *weight,
            })
            .collect();
        route_weights.sort_unstable_by_key(|item| {
            (item.src_side_id.unwrap_or(usize::MAX), item.dst_side_id)
        });

        let mut route_priorities: Vec<RoutePriorityStats> = st
            .route_priorities
            .iter()
            .map(|((src, dst), priority)| RoutePriorityStats {
                src_side_id: *src,
                dst_side_id: *dst,
                priority: *priority,
            })
            .collect();
        route_priorities.sort_unstable_by_key(|item| {
            (item.src_side_id.unwrap_or(usize::MAX), item.dst_side_id)
        });

        #[cfg(feature = "discovery")]
        let discovery = DiscoveryRuntimeStats {
            route_count: st.discovery_routes.len(),
            announcer_count: st
                .discovery_routes
                .values()
                .map(|route| route.announcers.len())
                .sum(),
            current_announce_interval_ms: Some(st.discovery_cadence.current_interval_ms),
            next_announce_ms: Some(st.discovery_cadence.next_announce_ms),
        };
        // Without discovery support there is nothing to report; cadence fields are `None`.
        #[cfg(not(feature = "discovery"))]
        let discovery = DiscoveryRuntimeStats {
            route_count: 0,
            announcer_count: 0,
            current_announce_interval_ms: None,
            next_announce_ms: None,
        };

        RuntimeStatsSnapshot {
            sides,
            route_modes,
            route_overrides,
            typed_route_overrides,
            route_weights,
            route_priorities,
            queues: QueueRuntimeStats {
                rx_len: st.rx_queue.len(),
                rx_bytes: st.rx_queue.bytes_used(),
                tx_len: st.tx_queue.len(),
                tx_bytes: st.tx_queue.bytes_used(),
                replay_len: st.replay_queue.len(),
                replay_bytes: st.replay_queue.bytes_used(),
                recent_rx_len: st.recent_rx.len(),
                recent_rx_bytes: st.recent_rx.bytes_used(),
                reliable_rx_buffered_len: st.reliable_rx_buffer_len(),
                reliable_rx_buffered_bytes: st.reliable_rx_buffered_bytes(),
                shared_queue_bytes_used: st.shared_queue_bytes_used(),
            },
            reliable: ReliableRuntimeStats {
                reliable_return_route_count: st.reliable_return_routes.len(),
                // Hard-coded to zero in this relay snapshot.
                end_to_end_pending_count: 0,
                end_to_end_pending_destination_count: 0,
                end_to_end_acked_cache_count: st.end_to_end_acked_destinations.len(),
            },
            discovery,
            total_handler_failures: st.total_handler_failures,
            total_handler_retries: st.total_handler_retries,
        }
    }
3976
    /// Export current relay memory usage/layout as JSON for profiling.
    ///
    /// Produces one flat JSON object tagged `"kind":"relay"`. It holds `*_bytes_used`,
    /// `*_bytes_allocated` and (for queues) `*_len` entries for:
    /// - the shared queue budget;
    /// - the RX, TX, replay and recent-RX queues;
    /// - the reliable RX reorder buffer;
    /// - discovery state;
    /// - the schema.
    ///
    /// The relay state lock is held for the whole call, so all relay-state values come from
    /// the same locked view. `schema_bytes` is read from `crate::config`, not from relay state.
    ///
    /// NOTE(review): `reliable_rx_buffer_bytes_allocated`, `discovery_bytes_allocated` and
    /// `schema_bytes_allocated` all report `memory.max_queue_budget`, the same value as
    /// `shared_queue_bytes_allocated`. Presumably this is because they draw from the shared
    /// budget — confirm that is the intended meaning for consumers of this JSON.
    pub fn export_memory_layout_json(&self) -> String {
        let st = self.state.lock();
        // Without the `discovery` feature there is no discovery state, so it reports 0 bytes.
        #[cfg(feature = "discovery")]
        let discovery_bytes = st.discovery_bytes_used();
        #[cfg(not(feature = "discovery"))]
        let discovery_bytes = 0usize;
        let schema_bytes = crate::config::schema_bytes_used();
        let memory = st.memory;
        let mut out = String::new();
        // `fmt::Write` for `String` never returns an error, so the result is discarded.
        let _ = core::fmt::Write::write_fmt(
            &mut out,
            format_args!(
                "{{\"kind\":\"relay\",\
                 \"shared_queue_bytes_used\":{},\"shared_queue_bytes_allocated\":{},\
                 \"rx_queue_bytes_used\":{},\"rx_queue_bytes_allocated\":{},\"rx_queue_len\":{},\
                 \"tx_queue_bytes_used\":{},\"tx_queue_bytes_allocated\":{},\"tx_queue_len\":{},\
                 \"replay_queue_bytes_used\":{},\"replay_queue_bytes_allocated\":{},\"replay_queue_len\":{},\
                 \"recent_rx_bytes_used\":{},\"recent_rx_bytes_allocated\":{},\"recent_rx_len\":{},\
                 \"reliable_rx_buffer_bytes_used\":{},\"reliable_rx_buffer_bytes_allocated\":{},\"reliable_rx_buffer_len\":{},\
                 \"discovery_bytes_used\":{},\"discovery_bytes_allocated\":{},\
                 \"schema_bytes_used\":{},\"schema_bytes_allocated\":{}}}",
                st.shared_queue_bytes_used(),
                memory.max_queue_budget,
                st.rx_queue.bytes_used(),
                st.rx_queue.max_bytes(),
                st.rx_queue.len(),
                st.tx_queue.bytes_used(),
                st.tx_queue.max_bytes(),
                st.tx_queue.len(),
                st.replay_queue.bytes_used(),
                st.replay_queue.max_bytes(),
                st.replay_queue.len(),
                st.recent_rx.bytes_used(),
                st.recent_rx.max_bytes(),
                st.recent_rx.len(),
                st.reliable_rx_buffered_bytes(),
                memory.max_queue_budget,
                st.reliable_rx_buffer_len(),
                discovery_bytes,
                memory.max_queue_budget,
                schema_bytes,
                memory.max_queue_budget,
            ),
        );
        out
    }
4024
4025    #[cfg(test)]
4026    pub(crate) fn debug_end_to_end_acked_destination_count(&self, packet_id: u64) -> Option<usize> {
4027        let st = self.state.lock();
4028        st.end_to_end_acked_destinations
4029            .get(&packet_id)
4030            .map(BTreeSet::len)
4031    }
4032
4033    #[cfg(test)]
4034    pub(crate) fn debug_end_to_end_acked_packet_count(&self) -> usize {
4035        let st = self.state.lock();
4036        st.end_to_end_acked_destinations.len()
4037    }
4038
4039    #[cfg(test)]
4040    pub(crate) fn debug_reliable_return_route_count(&self) -> usize {
4041        let st = self.state.lock();
4042        st.reliable_return_routes.len()
4043    }
4044
4045    /// Enqueue packed bytes that originated from `src` into the relay RX queue.
4046    ///
4047    /// Note: `Arc::from(bytes)` allocates and copies `len` bytes into a new `Arc<[u8]>`.
4048    /// This is still “fast enough” for many cases, but it is not allocation-free / ISR-safe.
4049    pub fn rx_packed_from_side(&self, src: RelaySideId, bytes: &[u8]) -> TelemetryResult<()> {
4050        self.ensure_side_ingress_enabled(src)?;
4051        let Some(bytes) = self.decode_side_transport_frame(src, bytes)? else {
4052            return Ok(());
4053        };
4054        let mut st = self.state.lock();
4055
4056        let data = RelayItem::Packed(bytes);
4057        let priority = Self::relay_item_priority(&data)?;
4058        st.push_rx(RelayRxItem {
4059            src,
4060            data,
4061            priority,
4062        })
4063    }
4064
4065    /// Enqueue a full packet that originated from `src` into the relay RX queue.
4066    ///
4067    /// The packet is wrapped in `Arc<Packet>` so fanout can clone the pointer cheaply.
4068    pub fn rx_from_side(&self, src: RelaySideId, packet: Packet) -> TelemetryResult<()> {
4069        self.ensure_side_ingress_enabled(src)?;
4070        let mut st = self.state.lock();
4071
4072        let data = RelayItem::Packet(Arc::new(packet));
4073        let priority = Self::relay_item_priority(&data)?;
4074        st.push_rx(RelayRxItem {
4075            src,
4076            data,
4077            priority,
4078        })
4079    }
4080
4081    /// Clear both RX and TX queues.
4082    pub fn clear_queues(&self) {
4083        let mut st = self.state.lock();
4084        st.rx_queue.clear();
4085        st.tx_queue.clear();
4086    }
4087
4088    /// Clear only RX queue.
4089    pub fn clear_rx_queue(&self) {
4090        let mut st = self.state.lock();
4091        st.rx_queue.clear();
4092    }
4093
4094    /// Clear only TX queue.
4095    pub fn clear_tx_queue(&self) {
4096        let mut st = self.state.lock();
4097        st.tx_queue.clear();
4098        st.replay_queue.clear();
4099    }
4100
4101    /// Internal: expand one RX item into TX items for all other sides.
4102    ///
4103    /// Fanout is cheap: the `RelayItem` is cloned (Arc bump) and reused across all destinations.
4104    fn process_rx_queue_item(&self, item: RelayRxItem) -> TelemetryResult<()> {
4105        self.ensure_side_ingress_enabled(item.src)?;
4106        match &item.data {
4107            RelayItem::Packet(pkt) => {
4108                let bytes = wire_format::pack_packet(pkt).len();
4109                self.note_side_rx(item.src, pkt.data_type(), bytes);
4110            }
4111            RelayItem::Packed(bytes) => {
4112                if let Ok(env) = wire_format::peek_envelope(bytes.as_ref()) {
4113                    self.note_side_rx(item.src, env.ty, bytes.len());
4114                }
4115            }
4116        }
4117        match &item.data {
4118            RelayItem::Packet(pkt) => {
4119                if is_reliable_type(pkt.data_type()) && !is_internal_control_type(pkt.data_type()) {
4120                    self.note_reliable_return_route(item.src, pkt.packet_id());
4121                }
4122            }
4123            RelayItem::Packed(bytes) => {
4124                if let Ok(env) = wire_format::peek_envelope(bytes.as_ref())
4125                    && is_reliable_type(env.ty)
4126                    && !is_internal_control_type(env.ty)
4127                    && let Ok(packet_id) = wire_format::packet_id_from_wire(bytes.as_ref())
4128                {
4129                    self.note_reliable_return_route(item.src, packet_id);
4130                }
4131            }
4132        }
4133        let mut released_buffered: Vec<Arc<[u8]>> = Vec::new();
4134        if let RelayItem::Packed(bytes) = &item.data {
4135            let (_opts, handler_is_packed, hop_reliable_enabled) = {
4136                let st = self.state.lock();
4137                let side_ref = Self::side_ref(&st, item.src)?;
4138                let opts = side_ref.opts;
4139                (
4140                    opts,
4141                    matches!(side_ref.tx_handler, RelayTxHandlerFn::Packed(_)),
4142                    opts.reliable_enabled
4143                        && !self.side_has_multiple_announcers_locked(
4144                            &st,
4145                            item.src,
4146                            self.clock.now_ms(),
4147                        ),
4148                )
4149            };
4150
4151            let frame = match wire_format::peek_frame_info(bytes.as_ref()) {
4152                Ok(frame) => frame,
4153                Err(e) => {
4154                    if matches!(e, TelemetryError::Unpack(msg) if msg == "crc32 mismatch")
4155                        && hop_reliable_enabled
4156                        && handler_is_packed
4157                        && let Ok(frame) = wire_format::peek_frame_info_unchecked(bytes.as_ref())
4158                    {
4159                        if is_reliable_type(frame.envelope.ty)
4160                            && let Some(hdr) = frame.reliable
4161                        {
4162                            let unordered = (hdr.flags & wire_format::RELIABLE_FLAG_UNORDERED) != 0;
4163                            let unsequenced =
4164                                (hdr.flags & wire_format::RELIABLE_FLAG_UNSEQUENCED) != 0;
4165
4166                            if !unsequenced {
4167                                let requested = if unordered {
4168                                    hdr.seq
4169                                } else {
4170                                    let mut st = self.state.lock();
4171                                    let rx_state = self.reliable_rx_state_mut(
4172                                        &mut st,
4173                                        item.src,
4174                                        frame.envelope.ty,
4175                                    );
4176                                    rx_state.expected_seq.min(hdr.seq)
4177                                };
4178                                self.queue_reliable_packet_request(
4179                                    item.src,
4180                                    frame.envelope.ty,
4181                                    requested,
4182                                )?;
4183                            }
4184                        }
4185                        return Ok(());
4186                    }
4187                    return Err(e);
4188                }
4189            };
4190
4191            if hop_reliable_enabled
4192                && handler_is_packed
4193                && is_reliable_type(frame.envelope.ty)
4194                && let Some(hdr) = frame.reliable
4195            {
4196                if frame.ack_only() {
4197                    self.handle_reliable_ack(item.src, frame.envelope.ty, hdr.ack);
4198                    return Ok(());
4199                }
4200                let unordered = (hdr.flags & wire_format::RELIABLE_FLAG_UNORDERED) != 0;
4201                let unsequenced = (hdr.flags & wire_format::RELIABLE_FLAG_UNSEQUENCED) != 0;
4202
4203                if !unsequenced {
4204                    if unordered {
4205                        self.queue_reliable_ack(item.src, frame.envelope.ty, hdr.seq)?;
4206                    } else {
4207                        let mut release: Vec<Arc<[u8]>> = Vec::new();
4208                        let mut last_delivered = None;
4209                        let mut ack_old = None;
4210                        let mut request_missing = None;
4211                        let mut partial_ack = None;
4212                        {
4213                            let mut st = self.state.lock();
4214                            let rx_state =
4215                                self.reliable_rx_state_mut(&mut st, item.src, frame.envelope.ty);
4216                            let expected_seq = rx_state.expected_seq;
4217                            if hdr.seq < expected_seq {
4218                                ack_old = Some(expected_seq.saturating_sub(1));
4219                            } else if hdr.seq > expected_seq {
4220                                request_missing = Some(expected_seq);
4221                                partial_ack = Some(hdr.seq);
4222                                st.buffer_reliable_rx(
4223                                    item.src,
4224                                    frame.envelope.ty,
4225                                    hdr.seq,
4226                                    bytes.clone(),
4227                                )?;
4228                            } else {
4229                                release.push(bytes.clone());
4230                                last_delivered = Some(hdr.seq);
4231                                let mut next_expected = hdr.seq.wrapping_add(1);
4232                                while let Some(buf) = rx_state.buffered.remove(&next_expected) {
4233                                    release.push(buf);
4234                                    last_delivered = Some(next_expected);
4235                                    let next = next_expected.wrapping_add(1);
4236                                    next_expected = if next == 0 { 1 } else { next };
4237                                }
4238                                rx_state.expected_seq = next_expected;
4239                            }
4240                        }
4241
4242                        if let Some(ack_seq) = ack_old {
4243                            self.queue_reliable_ack(item.src, frame.envelope.ty, ack_seq)?;
4244                            return Ok(());
4245                        }
4246                        if let Some(request_seq) = request_missing {
4247                            if let Some(partial_seq) = partial_ack {
4248                                self.queue_reliable_partial_ack(
4249                                    item.src,
4250                                    frame.envelope.ty,
4251                                    partial_seq,
4252                                )?;
4253                            }
4254                            self.queue_reliable_packet_request(
4255                                item.src,
4256                                frame.envelope.ty,
4257                                request_seq,
4258                            )?;
4259                            return Ok(());
4260                        }
4261                        if let Some(ack_seq) = last_delivered {
4262                            self.queue_reliable_ack(item.src, frame.envelope.ty, ack_seq)?;
4263                        }
4264                        released_buffered.extend(release.into_iter().skip(1));
4265                    }
4266                }
4267            }
4268        }
4269
4270        if self.is_duplicate_pkt(&item)? && !self.should_forward_duplicate_reliable_item(&item)? {
4271            // Already fanned out this packet recently; skip.
4272            return Ok(());
4273        }
4274
4275        self.dispatch_relay_rx_item(&item)?;
4276
4277        for release_bytes in released_buffered {
4278            let release_item = RelayRxItem {
4279                src: item.src,
4280                priority: Self::relay_item_priority(&RelayItem::Packed(release_bytes.clone()))?,
4281                data: RelayItem::Packed(release_bytes),
4282            };
4283            if self.is_duplicate_pkt(&release_item)?
4284                && !self.should_forward_duplicate_reliable_item(&release_item)?
4285            {
4286                continue;
4287            }
4288            self.dispatch_relay_rx_item(&release_item)?;
4289        }
4290        Ok(())
4291    }
4292
4293    fn dispatch_relay_rx_item(&self, item: &RelayRxItem) -> TelemetryResult<()> {
4294        match &item.data {
4295            RelayItem::Packet(pkt) => {
4296                if matches!(
4297                    pkt.data_type(),
4298                    crate::DataType::ReliableAck
4299                        | crate::DataType::ReliablePartialAck
4300                        | crate::DataType::ReliablePacketRequest
4301                ) {
4302                    if pkt.data_type() == crate::DataType::ReliableAck
4303                        && Self::is_end_to_end_ack_sender(pkt.sender())
4304                        && Self::decode_end_to_end_reliable_ack(pkt.payload()).is_ok()
4305                    {
4306                        if let Ok(packet_id) = Self::decode_end_to_end_reliable_ack(pkt.payload())
4307                            && let Some(sender_hash) =
4308                                Self::decode_end_to_end_ack_sender_hash(pkt.sender())
4309                        {
4310                            let mut st = self.state.lock();
4311                            Self::note_end_to_end_acked_destination_locked(
4312                                &mut st,
4313                                packet_id,
4314                                sender_hash,
4315                            );
4316                        }
4317                    } else {
4318                        let vals = pkt.data_as_u32()?;
4319                        if vals.len() != 2 {
4320                            return Err(TelemetryError::Unpack("bad reliable control payload"));
4321                        }
4322                        let ty = crate::DataType::try_from_u32(vals[0])
4323                            .ok_or(TelemetryError::InvalidType)?;
4324                        let seq = vals[1];
4325                        match pkt.data_type() {
4326                            crate::DataType::ReliableAck => {
4327                                self.handle_reliable_ack(item.src, ty, seq)
4328                            }
4329                            crate::DataType::ReliablePartialAck => {
4330                                self.handle_reliable_partial_ack(item.src, ty, seq)
4331                            }
4332                            crate::DataType::ReliablePacketRequest => {
4333                                self.queue_reliable_retransmit(item.src, ty, seq)?
4334                            }
4335                            _ => {}
4336                        }
4337                        return Ok(());
4338                    }
4339                }
4340            }
4341            RelayItem::Packed(bytes) => {
4342                let env = wire_format::peek_envelope(bytes.as_ref())?;
4343                if matches!(
4344                    env.ty,
4345                    crate::DataType::ReliableAck
4346                        | crate::DataType::ReliablePacketRequest
4347                        | crate::DataType::ReliablePartialAck
4348                ) {
4349                    let pkt = wire_format::unpack_packet(bytes.as_ref())?;
4350                    return self.dispatch_relay_rx_item(&RelayRxItem {
4351                        src: item.src,
4352                        data: RelayItem::Packet(Arc::new(pkt)),
4353                        priority: item.priority,
4354                    });
4355                }
4356            }
4357        }
4358
4359        let src = item.src;
4360        let data = item.data.clone();
4361        self.learn_discovery_item(src, &data)?;
4362
4363        let plan = self.remote_side_plan(&data, src)?;
4364        let mut st = self.state.lock();
4365        let RemoteSidePlan::Target(sides) = plan;
4366        for dst in sides {
4367            let priority = Self::relay_item_priority(&data)?;
4368            st.push_tx(RelayTxItem {
4369                src: Some(src),
4370                dst,
4371                data: data.clone(),
4372                priority,
4373            })?;
4374        }
4375        Ok(())
4376    }
4377
4378    #[inline]
4379    fn crc32_bytes(data: &[u8]) -> u32 {
4380        let mut hasher = Crc32Hasher::new();
4381        hasher.update(data);
4382        hasher.finalize()
4383    }
4384
4385    fn wrap_side_transport_frame(kind: u8, body: &[u8]) -> Arc<[u8]> {
4386        let mut out = Vec::with_capacity(
4387            SIDE_TRANSPORT_MAGIC.len() + 1 + body.len() + wire_format::CRC32_BYTES,
4388        );
4389        out.extend_from_slice(SIDE_TRANSPORT_MAGIC);
4390        out.push(kind);
4391        out.extend_from_slice(body);
4392        let crc = Self::crc32_bytes(&out);
4393        out.extend_from_slice(&crc.to_le_bytes());
4394        Arc::from(out)
4395    }
4396
4397    fn parse_side_transport_wrapper(bytes: &[u8]) -> TelemetryResult<Option<(u8, &[u8])>> {
4398        if bytes.len() < SIDE_TRANSPORT_MAGIC.len() + 1 + wire_format::CRC32_BYTES {
4399            return Ok(None);
4400        }
4401        if &bytes[..SIDE_TRANSPORT_MAGIC.len()] != SIDE_TRANSPORT_MAGIC {
4402            return Ok(None);
4403        }
4404        let data_len = bytes.len() - wire_format::CRC32_BYTES;
4405        let expected = u32::from_le_bytes([
4406            bytes[data_len],
4407            bytes[data_len + 1],
4408            bytes[data_len + 2],
4409            bytes[data_len + 3],
4410        ]);
4411        let data = &bytes[..data_len];
4412        if Self::crc32_bytes(data) != expected {
4413            return Err(TelemetryError::Unpack("side transport crc32 mismatch"));
4414        }
4415        let kind = data[SIDE_TRANSPORT_MAGIC.len()];
4416        Ok(Some((kind, &data[SIDE_TRANSPORT_MAGIC.len() + 1..])))
4417    }
4418
4419    fn read_uleb128_local(buf: &[u8], off: &mut usize) -> TelemetryResult<u64> {
4420        let mut result = 0u64;
4421        let mut shift = 0u32;
4422        for _ in 0..10 {
4423            let byte = *buf.get(*off).ok_or(TelemetryError::Unpack("short read"))?;
4424            *off += 1;
4425            result |= u64::from(byte & 0x7F) << shift;
4426            if (byte & 0x80) == 0 {
4427                return Ok(result);
4428            }
4429            shift += 7;
4430        }
4431        Err(TelemetryError::Unpack("uleb128 too long"))
4432    }
4433
4434    fn write_uleb128_local(mut value: u64, out: &mut Vec<u8>) {
4435        loop {
4436            let mut byte = (value & 0x7F) as u8;
4437            value >>= 7;
4438            if value != 0 {
4439                byte |= 0x80;
4440            }
4441            out.push(byte);
4442            if value == 0 {
4443                break;
4444            }
4445        }
4446    }
4447
4448    fn uleb128_len_local(mut value: u64) -> usize {
4449        let mut len = 1;
4450        while value >= 0x80 {
4451            value >>= 7;
4452            len += 1;
4453        }
4454        len
4455    }
4456
4457    fn extract_side_header_template(bytes: &[u8]) -> TelemetryResult<SideTemplateExtract<'_>> {
4458        if bytes.len() < wire_format::CRC32_BYTES + 4 {
4459            return Err(TelemetryError::Unpack("short buffer"));
4460        }
4461        let data_len = bytes.len() - wire_format::CRC32_BYTES;
4462        let data = &bytes[..data_len];
4463        let mut off = 0usize;
4464        let flags = *data
4465            .get(off)
4466            .ok_or(TelemetryError::Unpack("short prelude"))?;
4467        off += 1;
4468        off += 1; // NEP
4469        let ty_u64 = Self::read_uleb128_local(data, &mut off)?;
4470        let ty_u32 = u32::try_from(ty_u64).map_err(|_| TelemetryError::Unpack("bad data type"))?;
4471        if ty_u32 > crate::MAX_VALUE_DATA_TYPE {
4472            return Err(TelemetryError::Unpack("bad data type"));
4473        }
4474        let ty = crate::DataType(ty_u32);
4475        let data_size_off = off;
4476        let data_size = Self::read_uleb128_local(data, &mut off)?;
4477        let timestamp = Self::read_uleb128_local(data, &mut off)?;
4478        let nonce = if (flags & SIDE_TRANSPORT_FLAG_PACKET_NONCE) != 0 {
4479            u16::try_from(Self::read_uleb128_local(data, &mut off)?)
4480                .map_err(|_| TelemetryError::Unpack("packet nonce too large"))?
4481        } else {
4482            0
4483        };
4484        let between_start = off;
4485        let _source_address = u32::try_from(Self::read_uleb128_local(data, &mut off)?)
4486            .map_err(|_| TelemetryError::Unpack("source address too large"))?;
4487        let endpoint_bitmap_bytes = if (flags & SIDE_TRANSPORT_FLAG_ENDPOINT_BITMAP_PRESENT) != 0 {
4488            SIDE_TRANSPORT_EP_BITMAP_BYTES
4489        } else {
4490            0
4491        };
4492        if data.len() < off + endpoint_bitmap_bytes {
4493            return Err(TelemetryError::Unpack("short buffer"));
4494        }
4495        off += endpoint_bitmap_bytes;
4496        if (flags & SIDE_TRANSPORT_FLAG_WIRE_CONTRACT) != 0 {
4497            let contract_len = usize::try_from(Self::read_uleb128_local(data, &mut off)?)
4498                .map_err(|_| TelemetryError::Unpack("wire contract length"))?;
4499            if data.len() < off + contract_len {
4500                return Err(TelemetryError::Unpack("short buffer"));
4501            }
4502            off += contract_len;
4503        }
4504        let reliable_span = wire_format::reliable_header_span(bytes)?;
4505        let (reliable_flags, reliable_seq_ack, reliable_compact, payload_off) =
4506            if let Some((rel_off, rel_len, hdr)) = reliable_span {
4507                if data.len() < rel_off + rel_len {
4508                    return Err(TelemetryError::Unpack("short buffer"));
4509                }
4510                (
4511                    Some(hdr.flags),
4512                    Some((hdr.seq, hdr.ack)),
4513                    (flags & SIDE_TRANSPORT_FLAG_COMPACT_RELIABLE_HEADER) != 0,
4514                    rel_off + rel_len,
4515                )
4516            } else {
4517                (None, None, false, off)
4518            };
4519        if payload_off > data.len() {
4520            return Err(TelemetryError::Unpack("short buffer"));
4521        }
4522        let payload = &data[payload_off..];
4523        let prefix = Arc::<[u8]>::from(&data[1..data_size_off]);
4524        let between_end = reliable_span
4525            .map(|(rel_off, _, _)| rel_off)
4526            .unwrap_or(payload_off);
4527        let between = Arc::<[u8]>::from(&data[between_start..between_end]);
4528        let base_flags =
4529            flags & !(SIDE_TRANSPORT_FLAG_PAYLOAD_COMPRESSED | SIDE_TRANSPORT_FLAG_PACKET_NONCE);
4530        let mut hash = 0xD1B5_4A32_9C7E_01F3u64;
4531        hash = hash_bytes_u64(hash, &[base_flags]);
4532        hash = hash_bytes_u64(hash, &prefix);
4533        hash = hash_bytes_u64(hash, &between);
4534        if let Some(rel_flags) = reliable_flags {
4535            hash = hash_bytes_u64(hash, &[rel_flags]);
4536        }
4537        let template = SideHeaderTemplate {
4538            hash,
4539            base_flags,
4540            prefix,
4541            between,
4542            reliable_flags,
4543            reliable_compact,
4544        };
4545        Ok((
4546            template,
4547            ty,
4548            flags,
4549            data_size,
4550            timestamp,
4551            nonce,
4552            reliable_seq_ack,
4553            payload,
4554        ))
4555    }
4556
    /// Rebuild a full packed wire frame from a header template and a compact side body.
    ///
    /// The compact body is read in this order:
    /// 1. flags byte, which must equal `template.base_flags` once the compressed/nonce bits
    ///    are masked off;
    /// 2. ULEB128 data size;
    /// 3. the timestamp field selected by `timestamp_mode`;
    /// 4. optional ULEB128 nonce;
    /// 5. when the template has reliable flags, ULEB128 seq then ack;
    /// 6. the payload.
    ///
    /// `timestamp_mode`:
    /// - `Absolute`: the body carries the full timestamp.
    /// - `Delta`: the body carries an offset that is added to `timestamp_base`.
    /// - `Omitted`: there is no timestamp field; `timestamp_base` is used as-is.
    ///
    /// The rebuilt frame is `flags | prefix | data size | timestamp | [nonce] | between |
    /// [reliable header] | payload | CRC32 (LE)`. This mirrors the pieces that
    /// `extract_side_header_template` splits a frame into. Returns the frame together with
    /// the resolved timestamp.
    ///
    /// # Errors
    /// `TelemetryError::Unpack` for:
    /// - an empty or truncated body;
    /// - a flags mismatch against the template;
    /// - a missing `timestamp_base` in `Delta`/`Omitted` mode, or delta overflow;
    /// - a reliable seq/ack that does not fit `u32`.
    ///
    /// NOTE(review): unlike `extract_side_header_template`, the nonce is not range-checked
    /// to `u16` here; it is re-encoded as read — confirm that is acceptable downstream.
    fn reconstruct_side_compact_frame(
        template: &SideHeaderTemplate,
        body: &[u8],
        timestamp_mode: SideCompactTimestampMode,
        timestamp_base: Option<u64>,
    ) -> TelemetryResult<(Arc<[u8]>, u64)> {
        if body.is_empty() {
            return Err(TelemetryError::Unpack("short side compact frame"));
        }
        let mut off = 0usize;
        let flags = body[off];
        off += 1;
        // Only the compressed/nonce bits may differ from the template's flags.
        if (flags & !(SIDE_TRANSPORT_FLAG_PAYLOAD_COMPRESSED | SIDE_TRANSPORT_FLAG_PACKET_NONCE))
            != template.base_flags
        {
            return Err(TelemetryError::Unpack("side compact flags mismatch"));
        }
        let data_size = Self::read_uleb128_local(body, &mut off)?;
        let timestamp = match timestamp_mode {
            SideCompactTimestampMode::Absolute => Self::read_uleb128_local(body, &mut off)?,
            SideCompactTimestampMode::Delta => {
                let timestamp_field = Self::read_uleb128_local(body, &mut off)?;
                let base = timestamp_base.ok_or(TelemetryError::Unpack(
                    "missing side compact timestamp context",
                ))?;
                base.checked_add(timestamp_field)
                    .ok_or(TelemetryError::Unpack(
                        "side compact timestamp delta overflow",
                    ))?
            }
            SideCompactTimestampMode::Omitted => timestamp_base.ok_or(TelemetryError::Unpack(
                "missing side compact timestamp context",
            ))?,
        };
        let nonce = if (flags & SIDE_TRANSPORT_FLAG_PACKET_NONCE) != 0 {
            Some(Self::read_uleb128_local(body, &mut off)?)
        } else {
            None
        };
        let reliable_seq_ack = if template.reliable_flags.is_some() {
            let seq = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
                .map_err(|_| TelemetryError::Unpack("side compact reliable seq too large"))?;
            let ack = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
                .map_err(|_| TelemetryError::Unpack("side compact reliable ack too large"))?;
            Some((seq, ack))
        } else {
            None
        };
        let payload = &body[off..];
        // The extra 32 bytes are headroom for the variable-length fields written below
        // (presumably enough for the ULEB128 fields and the reliable header).
        let mut raw = Vec::with_capacity(
            1 + template.prefix.len() + template.between.len() + payload.len() + 32,
        );
        raw.push(flags);
        raw.extend_from_slice(&template.prefix);
        Self::write_uleb128_local(data_size, &mut raw);
        Self::write_uleb128_local(timestamp, &mut raw);
        if let Some(nonce) = nonce {
            Self::write_uleb128_local(nonce, &mut raw);
        }
        raw.extend_from_slice(&template.between);
        if let Some(rel_flags) = template.reliable_flags {
            let (seq, ack) =
                reliable_seq_ack.ok_or(TelemetryError::Unpack("missing side compact reliable"))?;
            wire_format::write_reliable_header_encoded(
                wire_format::ReliableHeader {
                    flags: rel_flags,
                    seq,
                    ack,
                },
                template.reliable_compact,
                &mut raw,
            );
        }
        raw.extend_from_slice(payload);
        // Append a fresh CRC32 over the rebuilt frame (little-endian).
        let crc = Self::crc32_bytes(&raw);
        raw.extend_from_slice(&crc.to_le_bytes());
        Ok((Arc::from(raw), timestamp))
    }
4635
4636    fn split_side_transport_frame(
4637        &self,
4638        side: RelaySideId,
4639        frame: Arc<[u8]>,
4640        max_frame_bytes: usize,
4641    ) -> TelemetryResult<Vec<Arc<[u8]>>> {
4642        if max_frame_bytes <= SIDE_TRANSPORT_CHUNK_OVERHEAD {
4643            return Err(TelemetryError::BadArg);
4644        }
4645        let payload_budget = max_frame_bytes - SIDE_TRANSPORT_CHUNK_OVERHEAD;
4646        let mut st = self.state.lock();
4647        let side_state = st
4648            .side_transport
4649            .get_mut(&side)
4650            .ok_or(TelemetryError::BadArg)?;
4651        let transfer_id = side_state.next_chunk_id.wrapping_add(1).max(1);
4652        side_state.next_chunk_id = transfer_id;
4653        drop(st);
4654
4655        let total = frame.len().div_ceil(payload_budget);
4656        let total_u16 =
4657            u16::try_from(total).map_err(|_| TelemetryError::PacketTooLarge("too many chunks"))?;
4658        let mut frames = Vec::with_capacity(total);
4659        for (idx, chunk) in frame.chunks(payload_budget).enumerate() {
4660            let mut body = Vec::with_capacity(8 + chunk.len());
4661            body.extend_from_slice(&transfer_id.to_le_bytes());
4662            body.extend_from_slice(&(idx as u16).to_le_bytes());
4663            body.extend_from_slice(&total_u16.to_le_bytes());
4664            body.extend_from_slice(chunk);
4665            frames.push(Self::wrap_side_transport_frame(
4666                SIDE_TRANSPORT_KIND_CHUNK,
4667                &body,
4668            ));
4669        }
4670        Ok(frames)
4671    }
4672
    /// Encode the packed frame `raw` into one or more side-transport frames for `side`.
    ///
    /// - If header templates are disabled and `opts.max_frame_bytes == 0`, `raw` is returned
    ///   unchanged as the only frame.
    /// - Otherwise the frame's header template is looked up by hash in the side's TX template
    ///   table:
    ///   - On a miss, the template is registered under a fresh non-zero id (possibly evicting an
    ///     older one) and a FULL frame (template id + the whole of `raw`) is produced.
    ///   - On a hit, a COMPACT frame is produced. It carries flags, template id, data size, an
    ///     optional timestamp field, the nonce (when the nonce flag is set), reliable seq/ack
    ///     (when present), and the payload.
    /// - The compact timestamp field is handled in one of three ways:
    ///   - omitted (`COMPACT_SAME_TIMESTAMP`) when omission is enabled for this side or type and
    ///     the timestamp equals the last one sent with the template;
    ///   - sent as a forward delta (`COMPACT_DELTA`) when the delta encodes in fewer ULEB128
    ///     bytes than the absolute value;
    ///   - sent as the absolute value (`COMPACT`) otherwise.
    /// - If `opts.max_frame_bytes != 0` and the wrapped frame is larger, it is split into CHUNK
    ///   frames via `split_side_transport_frame`.
    ///
    /// Side effects: updates the side's TX template table and last-timestamp map, and records
    /// per-side runtime stats (full/compact sizes, header overhead, target misses, chunk counts,
    /// template evictions).
    ///
    /// # Errors
    /// Propagates errors from header-template extraction and chunk splitting. Returns
    /// `TelemetryError::BadArg` when `side` has no side-transport state.
    ///
    /// NOTE(review): when `header_template_enabled` is false but `max_frame_bytes != 0`, this
    /// still performs template compaction rather than only chunking — confirm that is intended.
    /// NOTE(review): TX template and timestamp state is committed before the frames reach the
    /// transport. If a FULL frame is never delivered, later COMPACT frames for that template will
    /// be undecodable on the peer ("unknown side compact template") until the template is
    /// evicted and re-sent.
    fn encode_side_transport_frames(
        &self,
        side: RelaySideId,
        opts: RelaySideOptions,
        raw: Arc<[u8]>,
    ) -> TelemetryResult<Vec<Arc<[u8]>>> {
        // Neither header compaction nor a frame-size limit applies: send as-is.
        if !opts.header_template_enabled && opts.max_frame_bytes == 0 {
            return Ok(vec![raw]);
        }
        let raw_len = raw.len();
        // Bookkeeping consumed by the runtime-stats update at the end.
        let mut compact_payload_len = None;
        let mut used_compact = false;
        let mut used_timestamp_delta = false;
        let mut omitted_timestamp = false;
        let (template, ty, flags, data_size, timestamp, nonce, reliable_seq_ack, payload) =
            Self::extract_side_header_template(raw.as_ref())?;
        // Resolve the TX template id under the state lock. A known template selects the compact
        // encoding and yields the last timestamp sent with it. An unknown template is registered
        // and this frame goes out in full.
        let (template_id, use_compact, previous_timestamp) = {
            let mut st = self.state.lock();
            let side_state = st
                .side_transport
                .get_mut(&side)
                .ok_or(TelemetryError::BadArg)?;
            if let Some(id) = side_state.tx_template_ids.get(&template.hash).copied() {
                let previous = side_state.tx_last_timestamps.get(&id).copied();
                (id, true, previous)
            } else {
                // Skip 0 on wrap-around so template ids are always non-zero.
                let next = side_state.next_template_id.wrapping_add(1).max(1);
                side_state.next_template_id = next;
                let evicted = side_state.insert_tx_template(
                    template,
                    next,
                    opts.max_side_transport_templates,
                );
                if evicted {
                    st.side_runtime_stats
                        .entry(side)
                        .or_default()
                        .note_side_transport_template_eviction();
                }
                // Look the side state up again; the `side_state` borrow above cannot overlap
                // the stats update through the same lock guard.
                if let Some(side_state) = st.side_transport.get_mut(&side) {
                    side_state.tx_last_timestamps.insert(next, timestamp);
                }
                (next, false, None)
            }
        };
        let wrapped = if use_compact {
            used_compact = true;
            compact_payload_len = Some(payload.len());
            // Choose how to carry the timestamp: omitted, forward delta, or absolute.
            let timestamp_field = if let Some(previous) = previous_timestamp {
                let delta = timestamp.saturating_sub(previous);
                let omit_timestamp = opts.omit_unchanged_compact_timestamps
                    || opts.compact_timestamp_omission_types.contains(ty);
                if omit_timestamp && timestamp == previous {
                    omitted_timestamp = true;
                    None
                } else if timestamp >= previous
                    && Self::uleb128_len_local(delta) < Self::uleb128_len_local(timestamp)
                {
                    // Only use the delta for forward moves where it is strictly shorter.
                    used_timestamp_delta = true;
                    Some(delta)
                } else {
                    Some(timestamp)
                }
            } else {
                Some(timestamp)
            };
            let mut body = Vec::with_capacity(payload.len() + 32);
            body.push(flags);
            Self::write_uleb128_local(u64::from(template_id), &mut body);
            Self::write_uleb128_local(data_size, &mut body);
            if let Some(timestamp_field) = timestamp_field {
                Self::write_uleb128_local(timestamp_field, &mut body);
            }
            if (flags & SIDE_TRANSPORT_FLAG_PACKET_NONCE) != 0 {
                Self::write_uleb128_local(u64::from(nonce), &mut body);
            }
            if let Some((seq, ack)) = reliable_seq_ack {
                Self::write_uleb128_local(u64::from(seq), &mut body);
                Self::write_uleb128_local(u64::from(ack), &mut body);
            }
            body.extend_from_slice(payload);
            // Remember this timestamp as the base for the next compact frame of this template.
            {
                let mut st = self.state.lock();
                if let Some(side_state) = st.side_transport.get_mut(&side) {
                    side_state.tx_last_timestamps.insert(template_id, timestamp);
                }
            }
            // The frame kind tells the receiver how to interpret the timestamp field.
            let kind = if omitted_timestamp {
                SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP
            } else if used_timestamp_delta {
                SIDE_TRANSPORT_KIND_COMPACT_DELTA
            } else {
                SIDE_TRANSPORT_KIND_COMPACT
            };
            Self::wrap_side_transport_frame(kind, &body)
        } else {
            // FULL frame: template id followed by the complete packed frame, from which the
            // receiver learns the template.
            let mut body = Vec::with_capacity(raw.len() + 4);
            Self::write_uleb128_local(u64::from(template_id), &mut body);
            body.extend_from_slice(raw.as_ref());
            Self::wrap_side_transport_frame(SIDE_TRANSPORT_KIND_FULL, &body)
        };
        let frames = if opts.max_frame_bytes != 0 && wrapped.len() > opts.max_frame_bytes {
            self.split_side_transport_frame(side, wrapped, opts.max_frame_bytes)
        } else {
            Ok(vec![wrapped])
        }?;
        let wire_len = frames.iter().map(|frame| frame.len()).sum::<usize>();
        let mut st = self.state.lock();
        let stats = st.side_runtime_stats.entry(side).or_default();
        if used_compact {
            // Header overhead is approximated as total wire bytes minus carried payload bytes.
            let overhead = compact_payload_len
                .map(|payload_len| wire_len.saturating_sub(payload_len))
                .unwrap_or(wire_len);
            stats.note_side_transport_compact(
                raw_len,
                wire_len,
                overhead,
                used_timestamp_delta,
                omitted_timestamp,
            );
            if opts.compact_header_target_bytes != 0 && overhead > opts.compact_header_target_bytes
            {
                stats.note_side_transport_compact_target_miss();
            }
        } else {
            stats.note_side_transport_full(raw_len, wire_len);
        }
        if frames.len() > 1 {
            stats.note_side_transport_chunks(frames.len());
        }
        Ok(frames)
    }
4805
4806    fn decode_side_transport_frame(
4807        &self,
4808        side: RelaySideId,
4809        bytes: &[u8],
4810    ) -> TelemetryResult<Option<Arc<[u8]>>> {
4811        let Some((kind, body)) = Self::parse_side_transport_wrapper(bytes)? else {
4812            return Ok(Some(Arc::from(bytes)));
4813        };
4814        match kind {
4815            SIDE_TRANSPORT_KIND_FULL => {
4816                let mut off = 0usize;
4817                let template_id = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
4818                    .map_err(|_| TelemetryError::Unpack("side template id too large"))?;
4819                let raw = Arc::<[u8]>::from(&body[off..]);
4820                if let Ok((template, _, _, _, timestamp, _, _, _)) =
4821                    Self::extract_side_header_template(raw.as_ref())
4822                {
4823                    let mut st = self.state.lock();
4824                    let max_templates = st
4825                        .sides
4826                        .get(side)
4827                        .and_then(|side| side.as_ref())
4828                        .map(|side| side.opts.max_side_transport_templates)
4829                        .unwrap_or(DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT);
4830                    let evicted = st.side_transport.get_mut(&side).is_some_and(|side_state| {
4831                        let evicted =
4832                            side_state.insert_rx_template(template_id, template, max_templates);
4833                        side_state.rx_last_timestamps.insert(template_id, timestamp);
4834                        evicted
4835                    });
4836                    if evicted {
4837                        st.side_runtime_stats
4838                            .entry(side)
4839                            .or_default()
4840                            .note_side_transport_template_eviction();
4841                    }
4842                }
4843                Ok(Some(raw))
4844            }
4845            SIDE_TRANSPORT_KIND_COMPACT
4846            | SIDE_TRANSPORT_KIND_COMPACT_DELTA
4847            | SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP => {
4848                if body.is_empty() {
4849                    return Err(TelemetryError::Unpack("short side compact frame"));
4850                }
4851                let mut off = 1usize;
4852                let template_id = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
4853                    .map_err(|_| TelemetryError::Unpack("side template id too large"))?;
4854                let mut compact_body = Vec::with_capacity(1 + body.len().saturating_sub(off));
4855                compact_body.push(body[0]);
4856                compact_body.extend_from_slice(&body[off..]);
4857                let (template, timestamp_base) = {
4858                    let st = self.state.lock();
4859                    let state = st.side_transport.get(&side);
4860                    let template = state
4861                        .and_then(|state| state.rx_templates_by_id.get(&template_id))
4862                        .cloned();
4863                    let timestamp_base = if matches!(
4864                        kind,
4865                        SIDE_TRANSPORT_KIND_COMPACT_DELTA
4866                            | SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP
4867                    ) {
4868                        state
4869                            .and_then(|state| state.rx_last_timestamps.get(&template_id))
4870                            .copied()
4871                    } else {
4872                        None
4873                    };
4874                    (template, timestamp_base)
4875                };
4876                let template =
4877                    template.ok_or(TelemetryError::Unpack("unknown side compact template"))?;
4878                let timestamp_mode = match kind {
4879                    SIDE_TRANSPORT_KIND_COMPACT_DELTA => SideCompactTimestampMode::Delta,
4880                    SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP => SideCompactTimestampMode::Omitted,
4881                    _ => SideCompactTimestampMode::Absolute,
4882                };
4883                let (frame, timestamp) = Self::reconstruct_side_compact_frame(
4884                    &template,
4885                    &compact_body,
4886                    timestamp_mode,
4887                    timestamp_base,
4888                )?;
4889                let mut st = self.state.lock();
4890                if let Some(side_state) = st.side_transport.get_mut(&side) {
4891                    side_state.rx_last_timestamps.insert(template_id, timestamp);
4892                }
4893                Ok(Some(frame))
4894            }
4895            SIDE_TRANSPORT_KIND_CHUNK => {
4896                if body.len() < 8 {
4897                    return Err(TelemetryError::Unpack("short side chunk frame"));
4898                }
4899                let transfer_id = u32::from_le_bytes([body[0], body[1], body[2], body[3]]);
4900                let index = u16::from_le_bytes([body[4], body[5]]);
4901                let total = u16::from_le_bytes([body[6], body[7]]);
4902                let payload = Arc::<[u8]>::from(&body[8..]);
4903                let assembled = {
4904                    let mut st = self.state.lock();
4905                    let side_state = st
4906                        .side_transport
4907                        .get_mut(&side)
4908                        .ok_or(TelemetryError::BadArg)?;
4909                    let entry = side_state.rx_chunks.entry(transfer_id).or_default();
4910                    if entry.total == 0 {
4911                        entry.total = total;
4912                    } else if entry.total != total {
4913                        side_state.rx_chunks.remove(&transfer_id);
4914                        return Err(TelemetryError::Unpack("side chunk total mismatch"));
4915                    }
4916                    entry.received.entry(index).or_insert(payload);
4917                    if entry.received.len() == usize::from(total) {
4918                        let entry = side_state
4919                            .rx_chunks
4920                            .remove(&transfer_id)
4921                            .ok_or(TelemetryError::Unpack("side chunk missing"))?;
4922                        let mut out = Vec::new();
4923                        for idx in 0..entry.total {
4924                            let chunk = entry
4925                                .received
4926                                .get(&idx)
4927                                .ok_or(TelemetryError::Unpack("side chunk gap"))?;
4928                            out.extend_from_slice(chunk);
4929                        }
4930                        Some(Arc::<[u8]>::from(out))
4931                    } else {
4932                        None
4933                    }
4934                };
4935                match assembled {
4936                    Some(frame) => self.decode_side_transport_frame(side, frame.as_ref()),
4937                    None => Ok(None),
4938                }
4939            }
4940            _ => Err(TelemetryError::Unpack("unknown side transport frame")),
4941        }
4942    }
4943
4944    /// Helper: call a TX handler with the best representation we have.
4945    /// - Packet handler + Packet item: direct.
4946    /// - Packed handler + Packed item: direct.
4947    /// - Packet handler + Packed item: unpack for this call.
4948    /// - Packed handler + Packet item: pack for this call.
4949    fn call_tx_handler(
4950        &self,
4951        side: RelaySideId,
4952        handler: &RelayTxHandlerFn,
4953        data: &RelayItem,
4954    ) -> TelemetryResult<()> {
4955        let opts = {
4956            let st = self.state.lock();
4957            Self::side_ref(&st, side)?.opts
4958        };
4959        let Some(_side_tx_guard) = self.try_enter_side_tx() else {
4960            return Err(TelemetryError::Io("side tx busy"));
4961        };
4962        let started_ms = self.clock.now_ms();
4963        let ty = match data {
4964            RelayItem::Packet(pkt) => pkt.data_type(),
4965            RelayItem::Packed(bytes) => wire_format::peek_envelope(bytes.as_ref())?.ty,
4966        };
4967        let result = match (handler, data) {
4968            // Fast paths
4969            (RelayTxHandlerFn::Packed(f), RelayItem::Packed(bytes)) => {
4970                let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
4971                let mut sent_bytes = 0usize;
4972                for frame in frames {
4973                    f(frame.as_ref())?;
4974                    sent_bytes = sent_bytes.saturating_add(frame.len());
4975                }
4976                self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
4977                self.note_side_tx_success(side, ty, sent_bytes, 1);
4978                return Ok(());
4979            }
4980            (RelayTxHandlerFn::Packet(f), RelayItem::Packet(pkt)) => f(pkt),
4981
4982            // Conversion paths
4983            (RelayTxHandlerFn::Packed(f), RelayItem::Packet(pkt)) => {
4984                let owned = wire_format::pack_packet(pkt);
4985                let frames = self.encode_side_transport_frames(side, opts, owned)?;
4986                let mut sent_bytes = 0usize;
4987                for frame in frames {
4988                    f(frame.as_ref())?;
4989                    sent_bytes = sent_bytes.saturating_add(frame.len());
4990                }
4991                self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
4992                self.note_side_tx_success(side, ty, sent_bytes, 1);
4993                return Ok(());
4994            }
4995            (RelayTxHandlerFn::Packet(f), RelayItem::Packed(bytes)) => {
4996                if wire_format::peek_frame_info(bytes.as_ref())
4997                    .ok()
4998                    .is_some_and(|frame| frame.ack_only())
4999                {
5000                    return Ok(());
5001                }
5002                let pkt = wire_format::unpack_packet(bytes.as_ref())?;
5003                f(&pkt)
5004            }
5005        };
5006        if result.is_ok()
5007            && let Ok(bytes) = Self::relay_item_wire_len(data)
5008        {
5009            self.record_side_tx_sample(side, bytes, started_ms, self.clock.now_ms());
5010            self.note_side_tx_success(side, ty, bytes, 1);
5011        } else if result.is_err() {
5012            self.note_side_tx_failure(side, ty, 1);
5013        }
5014        result
5015    }
5016
5017    fn adjust_reliable_for_side(
5018        &self,
5019        opts: RelaySideOptions,
5020        data: RelayItem,
5021    ) -> TelemetryResult<Option<RelayItem>> {
5022        if opts.reliable_enabled {
5023            return Ok(Some(data));
5024        }
5025
5026        match data {
5027            RelayItem::Packed(bytes) => {
5028                let frame = match wire_format::peek_frame_info(bytes.as_ref()) {
5029                    Ok(frame) => frame,
5030                    Err(_) => return Ok(Some(RelayItem::Packed(bytes))),
5031                };
5032                if is_reliable_type(frame.envelope.ty)
5033                    && let Some(hdr) = frame.reliable
5034                {
5035                    if (hdr.flags & wire_format::RELIABLE_FLAG_ACK_ONLY) != 0 {
5036                        return Ok(None);
5037                    }
5038                    if (hdr.flags & wire_format::RELIABLE_FLAG_UNSEQUENCED) == 0 {
5039                        let Some(rewritten) = wire_format::rewrite_reliable_header_owned(
5040                            bytes.as_ref(),
5041                            wire_format::RELIABLE_FLAG_UNSEQUENCED,
5042                            hdr.seq,
5043                            0,
5044                        )?
5045                        else {
5046                            return Ok(Some(RelayItem::Packed(bytes)));
5047                        };
5048                        return Ok(Some(RelayItem::Packed(rewritten)));
5049                    }
5050                }
5051                Ok(Some(RelayItem::Packed(bytes)))
5052            }
5053            RelayItem::Packet(pkt) => {
5054                if matches!(
5055                    pkt.data_type(),
5056                    crate::DataType::ReliableAck
5057                        | crate::DataType::ReliablePartialAck
5058                        | crate::DataType::ReliablePacketRequest
5059                ) {
5060                    return Ok(None);
5061                }
5062                Ok(Some(RelayItem::Packet(pkt)))
5063            }
5064        }
5065    }
5066
5067    /// Drain the RX queue fully, expanding to TX items.
5068    #[inline]
5069    pub fn process_rx_queue(&self) -> TelemetryResult<()> {
5070        self.process_rx_queue_with_timeout(0)
5071    }
5072
5073    /// Drain the TX queue fully, invoking per-side TX handlers.
5074    ///
5075    /// If called from inside a side TX callback, this becomes a no-op so relay TX handlers cannot
5076    /// recurse into nested queue drains on the same stack.
5077    #[inline]
5078    pub fn process_tx_queue(&self) -> TelemetryResult<()> {
5079        self.process_tx_queue_with_timeout(0)
5080    }
5081
5082    /// Drain RX then TX queues fully (one pass).
5083    #[inline]
5084    pub fn process_all_queues(&self) -> TelemetryResult<()> {
5085        self.process_all_queues_with_timeout(0)
5086    }
5087
5088    /// Process the TX queue for up to `timeout_ms` milliseconds.
5089    ///
5090    /// `timeout_ms == 0` drains fully. If called from inside a side TX callback, this becomes a
5091    /// no-op so relay TX handlers cannot recurse into nested queue drains on the same stack.
5092    pub fn process_tx_queue_with_timeout(&self, timeout_ms: u32) -> TelemetryResult<()> {
5093        if self.side_tx_active() {
5094            return Ok(());
5095        }
5096        #[cfg(feature = "discovery")]
5097        {
5098            let _ = self.poll_discovery()?;
5099        }
5100        let start = self.clock.now_ms();
5101        loop {
5102            self.process_reliable_timeouts()?;
5103            if self.process_replay_queue_item()? {
5104                if timeout_ms != 0 && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5105                    break;
5106                }
5107                continue;
5108            }
5109            let Some((src, dst, handler, opts, data)) = self.pop_ready_tx_item() else {
5110                break;
5111            };
5112            match self.send_tx_item(src, dst, handler, opts, data.clone()) {
5113                Ok(sent) => {
5114                    if sent
5115                        && timeout_ms != 0
5116                        && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64
5117                    {
5118                        break;
5119                    }
5120                }
5121                Err(e) if Self::is_side_tx_busy(&e) => {
5122                    let priority = Self::relay_item_priority(&data)?;
5123                    let mut st = self.state.lock();
5124                    st.push_tx(RelayTxItem {
5125                        src,
5126                        dst,
5127                        data,
5128                        priority,
5129                    })?;
5130                    break;
5131                }
5132                Err(e) => return Err(e),
5133            }
5134        }
5135        Ok(())
5136    }
5137
5138    /// Process RX queue with timeout.
5139    pub fn process_rx_queue_with_timeout(&self, timeout_ms: u32) -> TelemetryResult<()> {
5140        #[cfg(feature = "discovery")]
5141        {
5142            let _ = self.poll_discovery()?;
5143        }
5144        let start = self.clock.now_ms();
5145        loop {
5146            let item_opt = {
5147                let mut st = self.state.lock();
5148                st.rx_queue.pop_front()
5149            };
5150            let Some(item) = item_opt else { break };
5151            self.process_rx_queue_item(item)?;
5152
5153            if timeout_ms != 0 && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5154                break;
5155            }
5156        }
5157        Ok(())
5158    }
5159
5160    /// Process RX and TX queues interleaved for up to `timeout_ms` milliseconds.
5161    ///
5162    /// `timeout_ms == 0` drains fully. If called from inside a side TX callback, this becomes a
5163    /// no-op so relay TX handlers cannot recurse into nested queue drains on the same stack.
5164    pub fn process_all_queues_with_timeout(&self, timeout_ms: u32) -> TelemetryResult<()> {
5165        if self.side_tx_active() {
5166            return Ok(());
5167        }
5168        #[cfg(feature = "discovery")]
5169        {
5170            let _ = self.poll_discovery()?;
5171        }
5172        let drain_fully = timeout_ms == 0;
5173        let start = if drain_fully { 0 } else { self.clock.now_ms() };
5174
5175        loop {
5176            let mut did_any = false;
5177            self.process_reliable_timeouts()?;
5178
5179            // First move RX → TX
5180            if let Some(item) = {
5181                let mut st = self.state.lock();
5182                st.rx_queue.pop_front()
5183            } {
5184                self.process_rx_queue_item(item)?;
5185                did_any = true;
5186            }
5187
5188            if !drain_fully && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5189                break;
5190            }
5191
5192            if self.process_replay_queue_item()? {
5193                did_any = true;
5194            }
5195
5196            // Then send out TX
5197            let sent_one = if let Some((src, dst, handler, opts, data)) = self.pop_ready_tx_item() {
5198                self.send_tx_item(src, dst, handler, opts, data)?
5199            } else {
5200                false
5201            };
5202
5203            if sent_one {
5204                did_any = true;
5205            }
5206
5207            if !drain_fully && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5208                break;
5209            }
5210
5211            if !did_any {
5212                break;
5213            }
5214        }
5215
5216        Ok(())
5217    }
5218
5219    /// Runs one application-loop maintenance cycle.
5220    ///
5221    /// This polls built-in discovery when that feature is compiled in, then drains queued RX/TX
5222    /// work for up to `timeout_ms` milliseconds.
5223    pub fn periodic(&self, timeout_ms: u32) -> TelemetryResult<()> {
5224        #[cfg(feature = "discovery")]
5225        {
5226            let _ = self.poll_discovery()?;
5227        }
5228
5229        self.process_all_queues_with_timeout(timeout_ms)
5230    }
5231}