// sedsnet/relay.rs

1use crate::config::{
2    MAX_QUEUE_BUDGET, MAX_RECENT_RX_IDS, QUEUE_GROW_STEP, RECENT_RX_QUEUE_BYTES,
3    RELIABLE_MAX_END_TO_END_ACK_CACHE, RELIABLE_MAX_END_TO_END_PENDING, RELIABLE_MAX_PENDING,
4    RELIABLE_MAX_RETRIES, RELIABLE_MAX_RETURN_ROUTES, RELIABLE_RETRANSMIT_MS, STARTING_QUEUE_SIZE,
5};
6use crate::diagnostics::{
7    AdaptiveLinkStats, DiscoveryRuntimeStats, QueueRuntimeStats, ReliableRuntimeStats,
8    RouteModeStats, RouteOverrideStats, RoutePriorityStats, RouteWeightStats, RuntimeSideStats,
9    RuntimeStatsSnapshot, RuntimeTypeStats, TypedRouteOverrideStats,
10};
11#[cfg(feature = "discovery")]
12use crate::discovery::{
13    self, ClientStatsSnapshot, DISCOVERY_ROUTE_TTL_MS, DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS,
14    DISCOVERY_SLOW_LINK_PING_INTERVAL_MS, DiscoveryCadenceState,
15    TIMESYNC_SLOW_LINK_MIN_INTERVAL_MS, TopologyAnnouncerRoute, TopologyBoardNode,
16    TopologySideRoute, TopologySnapshot,
17};
18use crate::packet::{Packet, hash_bytes_u64};
19use crate::queue::{BoundedDeque, ByteCost};
20use crate::wire_format;
21use crate::{is_reliable_type, message_meta, message_priority, reliable_mode};
22use crate::{
23    router::{Clock, CompactTimestampOmissionPolicy, SideTransportProfile},
24    {
25        RouteSelectionMode, TelemetryError, TelemetryResult,
26        lock::{ReentryGate, ReentryGuard, RouterMutex},
27    },
28};
29use alloc::borrow::ToOwned;
30use alloc::boxed::Box;
31use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
32use alloc::string::{String, ToString};
33use alloc::{sync::Arc, vec, vec::Vec};
34use core::mem::size_of;
35use crc32fast::Hasher as Crc32Hasher;
36
37/// Logical side index (CAN, UART, RADIO, etc.)
38pub type RelaySideId = usize;
39const SIDE_TRANSPORT_MAGIC: &[u8; 3] = b"SDT";
40const SIDE_TRANSPORT_KIND_FULL: u8 = 0x01;
41const SIDE_TRANSPORT_KIND_COMPACT: u8 = 0x02;
42const SIDE_TRANSPORT_KIND_CHUNK: u8 = 0x03;
43const SIDE_TRANSPORT_KIND_COMPACT_DELTA: u8 = 0x04;
44const SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP: u8 = 0x05;
45const SIDE_TRANSPORT_FLAG_PAYLOAD_COMPRESSED: u8 = 0x01;
46const SIDE_TRANSPORT_FLAG_WIRE_CONTRACT: u8 = 0x04;
47const SIDE_TRANSPORT_FLAG_PACKET_NONCE: u8 = 0x08;
48const SIDE_TRANSPORT_FLAG_ENDPOINT_BITMAP_PRESENT: u8 = 0x20;
49const SIDE_TRANSPORT_FLAG_COMPACT_RELIABLE_HEADER: u8 = 0x40;
50const CONTROL_SLOW_LINK_CAPACITY_BPS: u64 = 512;
51const SIDE_TRANSPORT_CHUNK_OVERHEAD: usize = 3 + 1 + 4 + 2 + 2 + wire_format::CRC32_BYTES;
52const SIDE_TRANSPORT_EP_BITMAP_BITS: usize = (crate::MAX_VALUE_DATA_ENDPOINT as usize) + 1;
53const SIDE_TRANSPORT_EP_BITMAP_BYTES: usize = SIDE_TRANSPORT_EP_BITMAP_BITS.div_ceil(8);
54pub const IPV4_LIKE_COMPACT_HEADER_TARGET_BYTES: usize =
55    crate::router::IPV4_LIKE_COMPACT_HEADER_TARGET_BYTES;
56pub const IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES: usize =
57    crate::router::IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES;
58pub const DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT: usize =
59    crate::router::DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT;
60/// Packet Handler function type
61type PacketHandlerFn = dyn Fn(&Packet) -> TelemetryResult<()> + Send + Sync + 'static;
62
63/// Packed Handler function type
64type PackedHandlerFn = dyn Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static;
65
66/// TX handler for a relay side: either packed or packet-based.
67#[derive(Clone)]
68pub enum RelayTxHandlerFn {
69    Packed(Arc<PackedHandlerFn>),
70    Packet(Arc<PacketHandlerFn>),
71}
72
73#[derive(Clone, Copy, Debug)]
74pub struct RelaySideOptions {
75    /// Enables the relay's per-link reliable transport layer on this side.
76    ///
77    /// When `true` and the side uses a packed TX handler, reliable schema traffic on this hop
78    /// gains relay-managed sequence numbers, ACKs, packet requests, and retransmits.
79    /// Packet-output sides still receive decoded packets rather than packed reliable framing.
80    pub reliable_enabled: bool,
81    /// Marks the side as eligible for link-local-only endpoints and discovery routes.
82    pub link_local_enabled: bool,
83    /// Allows packets received from this side to enter relay processing.
84    pub ingress_enabled: bool,
85    /// Allows the relay to transmit packets toward this side.
86    pub egress_enabled: bool,
87    /// Enables side-local header-template reuse for packed transport.
88    pub header_template_enabled: bool,
89    /// Maximum number of bytes to emit per packed TX callback.
90    ///
91    /// When non-zero and a packed frame would exceed this size, the relay
92    /// splits it into ordered side-transport chunks and reassembles them on RX
93    /// before normal relay processing. This is intended for fixed-size links
94    /// such as CAN or I2C while keeping the user API packet-oriented.
95    pub max_frame_bytes: usize,
96    /// Target total side-transport overhead for compact follow-up frames.
97    pub compact_header_target_bytes: usize,
98    /// Maximum side-local header templates retained for TX and RX dictionaries.
99    pub max_side_transport_templates: usize,
100    /// Omits the timestamp field from compact follow-up frames when it is unchanged.
101    pub omit_unchanged_compact_timestamps: bool,
102    /// Optional per-data-type timestamp omission policy for compact follow-up frames.
103    pub compact_timestamp_omission_types: CompactTimestampOmissionPolicy,
104    /// Declared compact-link profile for stats and future negotiation.
105    pub side_transport_profile: SideTransportProfile,
106}
107
108impl Default for RelaySideOptions {
109    fn default() -> Self {
110        Self {
111            reliable_enabled: false,
112            link_local_enabled: false,
113            ingress_enabled: true,
114            egress_enabled: true,
115            header_template_enabled: false,
116            max_frame_bytes: 0,
117            compact_header_target_bytes: 0,
118            max_side_transport_templates: DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT,
119            omit_unchanged_compact_timestamps: false,
120            compact_timestamp_omission_types: CompactTimestampOmissionPolicy::none(),
121            side_transport_profile: SideTransportProfile::Canonical,
122        }
123    }
124}
125
126impl RelaySideOptions {
127    /// Convenience preset for bounded packed-side transport.
128    ///
129    /// `max_frame_bytes == 0` leaves packed frames unbounded. Values greater
130    /// than zero enable relay-managed chunking/reassembly on this side.
131    #[inline]
132    pub fn with_small_packet_transport(mut self, max_frame_bytes: usize) -> Self {
133        self.header_template_enabled = true;
134        self.max_frame_bytes = max_frame_bytes;
135        self.compact_header_target_bytes = IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES;
136        self.side_transport_profile = SideTransportProfile::Ipv6Like;
137        self
138    }
139
140    #[inline]
141    pub fn with_ipv4_like_compact_header_target(mut self) -> Self {
142        self.header_template_enabled = true;
143        self.compact_header_target_bytes = IPV4_LIKE_COMPACT_HEADER_TARGET_BYTES;
144        self.omit_unchanged_compact_timestamps = true;
145        self.side_transport_profile = SideTransportProfile::Ipv4Like;
146        self
147    }
148
149    #[inline]
150    pub fn with_ipv6_like_compact_header_target(mut self) -> Self {
151        self.header_template_enabled = true;
152        self.compact_header_target_bytes = IPV6_LIKE_COMPACT_HEADER_TARGET_BYTES;
153        self.side_transport_profile = SideTransportProfile::Ipv6Like;
154        self
155    }
156
157    #[inline]
158    pub fn with_template_transport(mut self) -> Self {
159        self.header_template_enabled = true;
160        self.side_transport_profile = SideTransportProfile::Template;
161        self
162    }
163
164    #[inline]
165    pub fn with_omitted_unchanged_compact_timestamps(mut self) -> Self {
166        self.header_template_enabled = true;
167        self.omit_unchanged_compact_timestamps = true;
168        self
169    }
170
171    #[inline]
172    pub fn with_omitted_unchanged_compact_timestamps_for_type(
173        mut self,
174        ty: crate::DataType,
175    ) -> Self {
176        self.header_template_enabled = true;
177        self.compact_timestamp_omission_types.insert(ty);
178        self
179    }
180
181    #[inline]
182    pub fn effective_transport_profile(self) -> SideTransportProfile {
183        if !self.header_template_enabled && self.max_frame_bytes == 0 {
184            SideTransportProfile::Canonical
185        } else if self.side_transport_profile == SideTransportProfile::Canonical {
186            SideTransportProfile::Template
187        } else {
188            self.side_transport_profile
189        }
190    }
191
192    #[cfg(feature = "discovery")]
193    #[inline]
194    pub fn link_capabilities(self) -> discovery::LinkCapabilities {
195        let mut flags = discovery::LINK_CAPABILITY_END_TO_END_RELIABILITY;
196        if self.header_template_enabled {
197            flags |= discovery::LINK_CAPABILITY_HEADER_TEMPLATES;
198        }
199        if self.max_frame_bytes != 0 {
200            flags |= discovery::LINK_CAPABILITY_CHUNKING;
201        }
202        if self.reliable_enabled {
203            flags |= discovery::LINK_CAPABILITY_RELIABILITY;
204        }
205        if self.omit_unchanged_compact_timestamps
206            || !self.compact_timestamp_omission_types.is_empty()
207        {
208            flags |= discovery::LINK_CAPABILITY_OMIT_UNCHANGED_TIMESTAMPS;
209        }
210        #[cfg(feature = "cryptography")]
211        {
212            flags |= discovery::LINK_CAPABILITY_CRYPTO;
213        }
214        discovery::LinkCapabilities {
215            version: 1,
216            flags,
217            profile: self.effective_transport_profile().discovery_code(),
218            max_frame_bytes: self.max_frame_bytes.min(u32::MAX as usize) as u32,
219            compact_header_target_bytes: self.compact_header_target_bytes.min(u32::MAX as usize)
220                as u32,
221            max_side_transport_templates: self.max_side_transport_templates.min(u32::MAX as usize)
222                as u32,
223        }
224    }
225}
226
227/// One side of the relay – a name + TX handler.
228#[derive(Clone)]
229pub struct RelaySide {
230    pub name: &'static str,
231    pub tx_handler: RelayTxHandlerFn,
232    pub opts: RelaySideOptions,
233}
234
235#[derive(Clone, Debug, PartialEq, Eq)]
236pub enum RelayItem {
237    Packed(Arc<[u8]>),
238    Packet(Arc<Packet>),
239}
240
241/// Item that was received by the relay from some side.
242#[derive(Clone, Debug, PartialEq, Eq)]
243struct RelayRxItem {
244    src: RelaySideId,
245    data: RelayItem,
246    priority: u8,
247}
248
249impl ByteCost for RelayRxItem {
250    fn byte_cost(&self) -> usize {
251        match &self.data {
252            RelayItem::Packed(bytes) => bytes.len(),
253            RelayItem::Packet(pkt) => pkt.byte_cost(),
254        }
255    }
256}
257
258/// Item that is ready to be transmitted out a destination side.
259#[derive(Clone, Debug, PartialEq, Eq)]
260struct RelayTxItem {
261    src: Option<RelaySideId>,
262    dst: RelaySideId,
263    data: RelayItem,
264    priority: u8,
265}
266
267#[derive(Clone, Debug, PartialEq, Eq)]
268struct RelayReplayItem {
269    dst: RelaySideId,
270    bytes: Arc<[u8]>,
271    priority: u8,
272}
273
274impl ByteCost for RelayTxItem {
275    fn byte_cost(&self) -> usize {
276        match &self.data {
277            RelayItem::Packed(bytes) => bytes.len(),
278            RelayItem::Packet(pkt) => pkt.byte_cost(),
279        }
280    }
281}
282
283impl ByteCost for RelayReplayItem {
284    fn byte_cost(&self) -> usize {
285        self.bytes.len()
286    }
287}
288
// ==================== Reliable delivery state (relay) ====================

/// Sender-side reliable state for one (side, stream) key.
#[derive(Clone, Debug)]
struct ReliableTxState {
    /// Sequence number the next reliable frame will get (presumably; assigned elsewhere).
    next_seq: u32,
    /// Sequence numbers in the order they were sent.
    sent_order: VecDeque<u32>,
    /// Unacknowledged frames by sequence number.
    sent: BTreeMap<u32, ReliableSent>,
}

/// One reliable frame awaiting acknowledgement.
#[derive(Clone, Debug)]
struct ReliableSent {
    /// Packed frame bytes kept for retransmission.
    bytes: Arc<[u8]>,
    /// Clock reading (ms) of the most recent send.
    last_send_ms: u64,
    /// Retransmissions performed so far.
    retries: u32,
    /// Whether a resend is already queued.
    queued: bool,
    /// Whether a partial ACK has been seen for this frame.
    partial_acked: bool,
}

/// Receiver-side reliable state for one (side, stream) key.
#[derive(Clone, Debug)]
struct ReliableRxState {
    /// Next in-order sequence number expected.
    expected_seq: u32,
    /// Frames that arrived ahead of `expected_seq`, by sequence number.
    buffered: BTreeMap<u32, Arc<[u8]>>,
}
312
313#[derive(Debug, Clone)]
314struct ReliableReturnRouteState {
315    side: RelaySideId,
316}
317
/// What one discovery announcer has advertised, plus when it was last heard from.
#[cfg(feature = "discovery")]
#[derive(Debug, Default, Clone, PartialEq, Eq)]
struct DiscoverySenderState {
    /// Endpoints the announcer reports as reachable.
    reachable: Vec<crate::DataEndpoint>,
    /// Timesync source names the announcer reports as reachable.
    reachable_timesync_sources: Vec<String>,
    /// Topology board entries from the announcer.
    topology_boards: Vec<TopologyBoardNode>,
    /// Clock reading (ms) of the last update.
    last_seen_ms: u64,
}
326
327#[inline]
328fn is_internal_control_type(ty: crate::DataType) -> bool {
329    if matches!(
330        ty,
331        crate::DataType::ReliableAck
332            | crate::DataType::ReliablePartialAck
333            | crate::DataType::ReliablePacketRequest
334            | crate::DataType::P2pMessage
335    ) {
336        return true;
337    }
338
339    #[cfg(feature = "timesync")]
340    if matches!(
341        ty,
342        crate::DataType::TimeSyncAnnounce
343            | crate::DataType::TimeSyncRequest
344            | crate::DataType::TimeSyncResponse
345    ) {
346        return true;
347    }
348
349    #[cfg(feature = "discovery")]
350    if discovery::is_discovery_type(ty) {
351        return true;
352    }
353
354    let _ = ty;
355    false
356}
357
/// Discovery information learned on one side, including each announcer's own state.
#[cfg(feature = "discovery")]
#[derive(Debug, Default, Clone, PartialEq, Eq)]
struct DiscoverySideState {
    /// Endpoints reachable through this side.
    reachable: Vec<crate::DataEndpoint>,
    /// Timesync source names reachable through this side.
    reachable_timesync_sources: Vec<String>,
    /// Clock reading (ms) of the last discovery update on this side.
    last_seen_ms: u64,
    /// Per-announcer state, keyed by announcer (sender) name.
    announcers: BTreeMap<String, DiscoverySenderState>,
}
366
/// Partially reassembled chunked frame: the announced chunk count and the chunks
/// received so far, keyed by chunk index.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
struct SideChunkAssembly {
    total: u16,
    received: BTreeMap<u16, Arc<[u8]>>,
}
372
373#[derive(Clone, Debug, Default)]
374struct SideTransportState {
375    tx_template_ids: BTreeMap<u64, u32>,
376    tx_templates: BTreeMap<u64, SideHeaderTemplate>,
377    tx_last_timestamps: BTreeMap<u32, u64>,
378    rx_templates: BTreeMap<u64, SideHeaderTemplate>,
379    rx_templates_by_id: BTreeMap<u32, SideHeaderTemplate>,
380    rx_last_timestamps: BTreeMap<u32, u64>,
381    rx_chunks: BTreeMap<u32, SideChunkAssembly>,
382    next_chunk_id: u32,
383    next_template_id: u32,
384}
385
386impl SideTransportState {
387    fn tx_template_count(&self) -> usize {
388        self.tx_template_ids.len()
389    }
390
391    fn rx_template_count(&self) -> usize {
392        self.rx_templates_by_id.len()
393    }
394
395    fn insert_tx_template(
396        &mut self,
397        template: SideHeaderTemplate,
398        template_id: u32,
399        max_templates: usize,
400    ) -> bool {
401        if max_templates == 0 {
402            return false;
403        }
404        let mut evicted = false;
405        if self.tx_template_ids.len() >= max_templates
406            && !self.tx_template_ids.contains_key(&template.hash)
407            && let Some(old_hash) = self.tx_template_ids.keys().next().copied()
408        {
409            if let Some(old_id) = self.tx_template_ids.remove(&old_hash) {
410                self.tx_last_timestamps.remove(&old_id);
411            }
412            self.tx_templates.remove(&old_hash);
413            evicted = true;
414        }
415        self.tx_template_ids.insert(template.hash, template_id);
416        self.tx_templates.insert(template.hash, template);
417        evicted
418    }
419
420    fn insert_rx_template(
421        &mut self,
422        template_id: u32,
423        template: SideHeaderTemplate,
424        max_templates: usize,
425    ) -> bool {
426        if max_templates == 0 {
427            return false;
428        }
429        let mut evicted = false;
430        if self.rx_templates_by_id.len() >= max_templates
431            && !self.rx_templates_by_id.contains_key(&template_id)
432            && let Some(old_id) = self.rx_templates_by_id.keys().next().copied()
433            && let Some(old_template) = self.rx_templates_by_id.remove(&old_id)
434        {
435            self.rx_templates.remove(&old_template.hash);
436            self.rx_last_timestamps.remove(&old_id);
437            evicted = true;
438        }
439        self.rx_templates_by_id
440            .insert(template_id, template.clone());
441        self.rx_templates.insert(template.hash, template);
442        evicted
443    }
444}
445
446#[derive(Clone, Debug, PartialEq, Eq)]
447struct SideHeaderTemplate {
448    hash: u64,
449    base_flags: u8,
450    prefix: Arc<[u8]>,
451    between: Arc<[u8]>,
452    reliable_flags: Option<u8>,
453    reliable_compact: bool,
454}
455
456type SideTemplateExtract<'a> = (
457    SideHeaderTemplate,
458    crate::DataType,
459    u8,
460    u64,
461    u64,
462    u16,
463    Option<(u32, u32)>,
464    &'a [u8],
465);
466
467#[derive(Clone, Copy, Debug, PartialEq, Eq)]
468enum SideCompactTimestampMode {
469    Absolute,
470    Delta,
471    Omitted,
472}
473
/// Adaptive bandwidth bookkeeping for one destination side.
#[derive(Clone, Debug, Default)]
struct AdaptiveRouteStats {
    /// Smoothed capacity estimate.
    estimated_bandwidth_bps: u64,
    /// Highest single sample seen.
    peak_bandwidth_bps: u64,
    /// Clock reading (ms) of the latest sample.
    last_observed_ms: u64,
    /// Clock reading (ms) of the latest slow-link sample.
    last_slow_observed_ms: u64,
    /// Number of samples observed.
    sample_count: u64,
    /// Start (ms) of the current usage window; 0 means no window yet.
    window_started_ms: u64,
    /// Bytes recorded in the current usage window.
    window_bytes: u64,
    /// Highest usage rate seen.
    peak_usage_bps: u64,
}
485
/// Per-side discovery pacing: earliest times (ms) for the next ping and full advertisement.
#[cfg(feature = "discovery")]
#[derive(Clone, Debug, Default)]
struct DiscoverySideThrottleState {
    next_ping_ms: u64,
    next_full_ms: u64,
}

/// Per-side time-sync pacing: earliest time (ms) the next message may be sent.
#[cfg(all(feature = "discovery", feature = "timesync"))]
#[derive(Clone, Debug, Default)]
struct TimeSyncSideThrottleState {
    next_allowed_ms: u64,
}

/// How much discovery data to advertise on a side.
#[cfg(feature = "discovery")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DiscoveryAdvertiseLevel {
    /// Minimal ping only.
    MinimalPing,
    /// Full advertisement.
    Full,
}
505
506impl AdaptiveRouteStats {
507    #[inline]
508    fn observe(&mut self, bytes: usize, sample_bps: u64, now_ms: u64) {
509        self.estimated_bandwidth_bps = if self.estimated_bandwidth_bps == 0 {
510            sample_bps
511        } else if sample_bps >= self.estimated_bandwidth_bps {
512            self.estimated_bandwidth_bps
513                .saturating_mul(3)
514                .saturating_add(sample_bps.saturating_mul(5))
515                / 8
516        } else {
517            self.estimated_bandwidth_bps
518                .saturating_mul(7)
519                .saturating_add(sample_bps)
520                / 8
521        };
522        self.peak_bandwidth_bps = self.peak_bandwidth_bps.max(sample_bps);
523        self.last_observed_ms = now_ms;
524        if sample_bps > 0 && sample_bps <= CONTROL_SLOW_LINK_CAPACITY_BPS {
525            self.last_slow_observed_ms = now_ms;
526        }
527        self.sample_count = self.sample_count.saturating_add(1);
528        if self.window_started_ms == 0 || now_ms.saturating_sub(self.window_started_ms) > 1_000 {
529            self.window_started_ms = now_ms;
530            self.window_bytes = 0;
531        }
532        self.window_bytes = self.window_bytes.saturating_add(bytes as u64);
533        self.peak_usage_bps = self.peak_usage_bps.max(self.current_usage_bps(now_ms));
534    }
535
536    #[inline]
537    fn current_usage_bps(&self, now_ms: u64) -> u64 {
538        if self.window_started_ms == 0 {
539            return 0;
540        }
541        let elapsed_ms = now_ms.saturating_sub(self.window_started_ms).max(1);
542        ((u128::from(self.window_bytes)).saturating_mul(1000) / u128::from(elapsed_ms))
543            .min(u128::from(u64::MAX)) as u64
544    }
545
546    #[inline]
547    fn available_headroom_bps(&self, now_ms: u64) -> u64 {
548        let capacity = self
549            .estimated_bandwidth_bps
550            .max(self.peak_bandwidth_bps)
551            .max(1);
552        capacity.saturating_sub(self.current_usage_bps(now_ms))
553    }
554
555    #[inline]
556    fn weight(&self, now_ms: u64) -> u64 {
557        self.available_headroom_bps(now_ms).max(1)
558    }
559
560    #[inline]
561    fn snapshot(&self, now_ms: u64, auto_balancing_enabled: bool) -> AdaptiveLinkStats {
562        let current_usage_bps = self.current_usage_bps(now_ms);
563        let estimated_capacity_bps = self.estimated_bandwidth_bps.max(1);
564        let peak_capacity_bps = self.peak_bandwidth_bps.max(estimated_capacity_bps);
565        let available_headroom_bps = peak_capacity_bps.saturating_sub(current_usage_bps);
566        AdaptiveLinkStats {
567            auto_balancing_enabled,
568            estimated_capacity_bps,
569            peak_capacity_bps,
570            current_usage_bps,
571            peak_usage_bps: self.peak_usage_bps.max(current_usage_bps),
572            available_headroom_bps,
573            effective_weight: available_headroom_bps.max(1),
574            last_observed_ms: self.last_observed_ms,
575            sample_count: self.sample_count,
576        }
577    }
578}
579
/// Per-data-type traffic counters for one side.
#[derive(Clone, Debug, Default)]
struct TypeRuntimeStatsInner {
    tx_packets: u64,
    tx_bytes: u64,
    rx_packets: u64,
    rx_bytes: u64,
    relayed_tx_packets: u64,
    relayed_tx_bytes: u64,
    relayed_rx_packets: u64,
    relayed_rx_bytes: u64,
    tx_retries: u64,
    handler_failures: u64,
}

/// Side-wide traffic and side-transport counters, plus the per-type breakdown.
#[derive(Clone, Debug, Default)]
struct SideRuntimeStatsInner {
    tx_packets: u64,
    tx_bytes: u64,
    rx_packets: u64,
    rx_bytes: u64,
    relayed_tx_packets: u64,
    relayed_tx_bytes: u64,
    relayed_rx_packets: u64,
    relayed_rx_bytes: u64,
    tx_retries: u64,
    tx_handler_failures: u64,
    total_handler_retries: u64,
    side_transport_full_frames: u64,
    side_transport_compact_frames: u64,
    side_transport_compact_delta_frames: u64,
    side_transport_compact_omitted_timestamp_frames: u64,
    side_transport_chunk_frames: u64,
    side_transport_raw_bytes: u64,
    side_transport_wire_bytes: u64,
    side_transport_bytes_saved: u64,
    /// Smallest compact overhead seen; `None` until the first compact frame.
    side_transport_min_compact_overhead_bytes: Option<usize>,
    /// Largest compact overhead seen; `None` until the first compact frame.
    side_transport_max_compact_overhead_bytes: Option<usize>,
    side_transport_compact_target_misses: u64,
    side_transport_template_evictions: u64,
    /// Per-type counters keyed by `DataType::as_u32()`.
    data_types: BTreeMap<u32, TypeRuntimeStatsInner>,
}
621
622impl SideRuntimeStatsInner {
623    fn type_stats_mut(&mut self, ty: crate::DataType) -> &mut TypeRuntimeStatsInner {
624        self.data_types.entry(ty.as_u32()).or_default()
625    }
626
627    fn note_tx(&mut self, ty: crate::DataType, bytes: usize, retries: usize) {
628        self.tx_packets = self.tx_packets.saturating_add(1);
629        self.tx_bytes = self.tx_bytes.saturating_add(bytes as u64);
630        self.relayed_tx_packets = self.relayed_tx_packets.saturating_add(1);
631        self.relayed_tx_bytes = self.relayed_tx_bytes.saturating_add(bytes as u64);
632        self.tx_retries = self.tx_retries.saturating_add(retries as u64);
633        self.total_handler_retries = self.total_handler_retries.saturating_add(retries as u64);
634        let stats = self.type_stats_mut(ty);
635        stats.tx_packets = stats.tx_packets.saturating_add(1);
636        stats.tx_bytes = stats.tx_bytes.saturating_add(bytes as u64);
637        stats.relayed_tx_packets = stats.relayed_tx_packets.saturating_add(1);
638        stats.relayed_tx_bytes = stats.relayed_tx_bytes.saturating_add(bytes as u64);
639        stats.tx_retries = stats.tx_retries.saturating_add(retries as u64);
640    }
641
642    fn note_rx(&mut self, ty: crate::DataType, bytes: usize) {
643        self.rx_packets = self.rx_packets.saturating_add(1);
644        self.rx_bytes = self.rx_bytes.saturating_add(bytes as u64);
645        self.relayed_rx_packets = self.relayed_rx_packets.saturating_add(1);
646        self.relayed_rx_bytes = self.relayed_rx_bytes.saturating_add(bytes as u64);
647        let stats = self.type_stats_mut(ty);
648        stats.rx_packets = stats.rx_packets.saturating_add(1);
649        stats.rx_bytes = stats.rx_bytes.saturating_add(bytes as u64);
650        stats.relayed_rx_packets = stats.relayed_rx_packets.saturating_add(1);
651        stats.relayed_rx_bytes = stats.relayed_rx_bytes.saturating_add(bytes as u64);
652    }
653
654    fn note_tx_failure(&mut self, ty: crate::DataType, retries: usize) {
655        self.tx_handler_failures = self.tx_handler_failures.saturating_add(1);
656        self.tx_retries = self.tx_retries.saturating_add(retries as u64);
657        self.total_handler_retries = self.total_handler_retries.saturating_add(retries as u64);
658        let stats = self.type_stats_mut(ty);
659        stats.handler_failures = stats.handler_failures.saturating_add(1);
660        stats.tx_retries = stats.tx_retries.saturating_add(retries as u64);
661    }
662
663    fn note_side_transport_full(&mut self, raw_bytes: usize, wire_bytes: usize) {
664        self.side_transport_full_frames = self.side_transport_full_frames.saturating_add(1);
665        self.note_side_transport_bytes(raw_bytes, wire_bytes);
666    }
667
668    fn note_side_transport_compact(
669        &mut self,
670        raw_bytes: usize,
671        wire_bytes: usize,
672        compact_overhead_bytes: usize,
673        used_timestamp_delta: bool,
674        omitted_timestamp: bool,
675    ) {
676        self.side_transport_compact_frames = self.side_transport_compact_frames.saturating_add(1);
677        if used_timestamp_delta {
678            self.side_transport_compact_delta_frames =
679                self.side_transport_compact_delta_frames.saturating_add(1);
680        }
681        if omitted_timestamp {
682            self.side_transport_compact_omitted_timestamp_frames = self
683                .side_transport_compact_omitted_timestamp_frames
684                .saturating_add(1);
685        }
686        self.note_side_transport_bytes(raw_bytes, wire_bytes);
687        self.side_transport_min_compact_overhead_bytes = Some(
688            self.side_transport_min_compact_overhead_bytes
689                .map_or(compact_overhead_bytes, |v| v.min(compact_overhead_bytes)),
690        );
691        self.side_transport_max_compact_overhead_bytes = Some(
692            self.side_transport_max_compact_overhead_bytes
693                .map_or(compact_overhead_bytes, |v| v.max(compact_overhead_bytes)),
694        );
695    }
696
697    fn note_side_transport_chunks(&mut self, chunks: usize) {
698        self.side_transport_chunk_frames = self
699            .side_transport_chunk_frames
700            .saturating_add(chunks as u64);
701    }
702
703    fn note_side_transport_bytes(&mut self, raw_bytes: usize, wire_bytes: usize) {
704        self.side_transport_raw_bytes = self
705            .side_transport_raw_bytes
706            .saturating_add(raw_bytes as u64);
707        self.side_transport_wire_bytes = self
708            .side_transport_wire_bytes
709            .saturating_add(wire_bytes as u64);
710        if raw_bytes > wire_bytes {
711            self.side_transport_bytes_saved = self
712                .side_transport_bytes_saved
713                .saturating_add((raw_bytes - wire_bytes) as u64);
714        }
715    }
716
717    fn note_side_transport_compact_target_miss(&mut self) {
718        self.side_transport_compact_target_misses =
719            self.side_transport_compact_target_misses.saturating_add(1);
720    }
721
722    fn note_side_transport_template_eviction(&mut self) {
723        self.side_transport_template_evictions =
724            self.side_transport_template_evictions.saturating_add(1);
725    }
726}
727
/// Where a route choice came from: flooding, or routes learned through discovery.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RouteSelectionOrigin {
    Flood,
    Discovered,
}
733
734/// Internal state, protected by RouterMutex so all public methods can take &self.
735struct RelayInner {
736    sides: Vec<Option<RelaySide>>,
737    route_overrides: BTreeMap<(Option<RelaySideId>, RelaySideId), bool>,
738    typed_route_overrides: BTreeMap<(Option<RelaySideId>, u32, RelaySideId), bool>,
739    route_weights: BTreeMap<(Option<RelaySideId>, RelaySideId), u32>,
740    route_priorities: BTreeMap<(Option<RelaySideId>, RelaySideId), u32>,
741    source_route_modes: BTreeMap<Option<RelaySideId>, RouteSelectionMode>,
742    route_selection_cursors: BTreeMap<Option<RelaySideId>, u64>,
743    adaptive_route_stats: BTreeMap<RelaySideId, AdaptiveRouteStats>,
744    side_runtime_stats: BTreeMap<RelaySideId, SideRuntimeStatsInner>,
745    side_transport: BTreeMap<RelaySideId, SideTransportState>,
746    rx_queue: BoundedDeque<RelayRxItem>,
747    tx_queue: BoundedDeque<RelayTxItem>,
748    replay_queue: BoundedDeque<RelayReplayItem>,
749    recent_rx: BoundedDeque<u64>,
750    reliable_tx: BTreeMap<(RelaySideId, u32), ReliableTxState>,
751    reliable_rx: BTreeMap<(RelaySideId, u32), ReliableRxState>,
752    reliable_return_routes: BTreeMap<u64, ReliableReturnRouteState>,
753    reliable_return_route_order: VecDeque<u64>,
754    end_to_end_acked_destinations: BTreeMap<u64, BTreeSet<u64>>,
755    end_to_end_acked_destination_order: VecDeque<u64>,
756    total_handler_failures: u64,
757    total_handler_retries: u64,
758    #[cfg(feature = "discovery")]
759    discovery_routes: BTreeMap<RelaySideId, DiscoverySideState>,
760    #[cfg(feature = "discovery")]
761    discovery_cadence: DiscoveryCadenceState,
762    #[cfg(feature = "discovery")]
763    discovery_side_throttle: BTreeMap<RelaySideId, DiscoverySideThrottleState>,
764    #[cfg(all(feature = "discovery", feature = "timesync"))]
765    timesync_side_throttle: BTreeMap<RelaySideId, TimeSyncSideThrottleState>,
766}
767
/// Identifies one of the relay's bounded queues or buffers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RelayQueueKind {
    /// Received-item queue.
    Rx,
    /// Transmit queue.
    Tx,
    /// Replay queue.
    Replay,
    /// Recently-received id history.
    Recent,
    /// Buffered reliable RX frames.
    ReliableRxBuffer,
    /// Discovery route tables.
    #[cfg(feature = "discovery")]
    Discovery,
}
778
779impl RelayInner {
780    #[cfg(feature = "discovery")]
781    fn topology_board_byte_cost(board: &TopologyBoardNode) -> usize {
782        board
783            .sender_id
784            .len()
785            .saturating_add(board.reachable_endpoints.len() * size_of::<crate::DataEndpoint>())
786            .saturating_add(
787                board
788                    .reachable_timesync_sources
789                    .iter()
790                    .map(|s| s.len())
791                    .sum::<usize>(),
792            )
793            .saturating_add(board.connections.iter().map(|s| s.len()).sum::<usize>())
794    }
795
796    #[cfg(feature = "discovery")]
797    fn discovery_sender_byte_cost(sender: &str, state: &DiscoverySenderState) -> usize {
798        sender
799            .len()
800            .saturating_add(state.reachable.len() * size_of::<crate::DataEndpoint>())
801            .saturating_add(
802                state
803                    .reachable_timesync_sources
804                    .iter()
805                    .map(|s| s.len())
806                    .sum::<usize>(),
807            )
808            .saturating_add(
809                state
810                    .topology_boards
811                    .iter()
812                    .map(Self::topology_board_byte_cost)
813                    .sum::<usize>(),
814            )
815            .saturating_add(size_of::<DiscoverySenderState>())
816    }
817
818    #[cfg(feature = "discovery")]
819    fn discovery_route_byte_cost(route: &DiscoverySideState) -> usize {
820        size_of::<DiscoverySideState>()
821            .saturating_add(route.reachable.len() * size_of::<crate::DataEndpoint>())
822            .saturating_add(
823                route
824                    .reachable_timesync_sources
825                    .iter()
826                    .map(|s| s.len())
827                    .sum::<usize>(),
828            )
829            .saturating_add(
830                route
831                    .announcers
832                    .iter()
833                    .map(|(sender, state)| Self::discovery_sender_byte_cost(sender, state))
834                    .sum::<usize>(),
835            )
836    }
837
838    #[cfg(feature = "discovery")]
839    fn discovery_bytes_used(&self) -> usize {
840        self.discovery_routes
841            .values()
842            .map(Self::discovery_route_byte_cost)
843            .sum()
844    }
845
846    #[inline]
847    fn reliable_rx_buffered_bytes(&self) -> usize {
848        self.reliable_rx
849            .values()
850            .flat_map(|state| state.buffered.values())
851            .map(|bytes| size_of::<Arc<[u8]>>() + bytes.len())
852            .sum()
853    }
854
855    #[inline]
856    fn shared_queue_bytes_used(&self) -> usize {
857        self.rx_queue
858            .bytes_used()
859            .saturating_add(self.tx_queue.bytes_used())
860            .saturating_add(self.replay_queue.bytes_used())
861            .saturating_add(self.recent_rx.max_bytes())
862            .saturating_add(self.reliable_rx_buffered_bytes())
863            .saturating_add(crate::config::schema_bytes_used())
864            .saturating_add({
865                #[cfg(feature = "discovery")]
866                {
867                    self.discovery_bytes_used()
868                }
869                #[cfg(not(feature = "discovery"))]
870                {
871                    0
872                }
873            })
874    }
875
876    fn reliable_rx_buffer_len(&self) -> usize {
877        self.reliable_rx
878            .values()
879            .map(|state| state.buffered.len())
880            .sum()
881    }
882
883    fn pop_reliable_rx_buffered(&mut self) -> Option<Arc<[u8]>> {
884        let key = self
885            .reliable_rx
886            .iter()
887            .find_map(|(key, state)| (!state.buffered.is_empty()).then_some(*key))?;
888        self.reliable_rx
889            .get_mut(&key)?
890            .buffered
891            .pop_first()
892            .map(|(_, v)| v)
893    }
894
895    fn pop_shared_queue_item(&mut self, preferred: RelayQueueKind) -> bool {
896        match preferred {
897            RelayQueueKind::Rx => self.rx_queue.pop_front().is_some(),
898            RelayQueueKind::Tx => self.tx_queue.pop_front().is_some(),
899            RelayQueueKind::Replay => self.replay_queue.pop_front().is_some(),
900            RelayQueueKind::Recent => self.recent_rx.pop_front().is_some(),
901            RelayQueueKind::ReliableRxBuffer => self.pop_reliable_rx_buffered().is_some(),
902            #[cfg(feature = "discovery")]
903            RelayQueueKind::Discovery => self.pop_discovery_route(),
904        }
905    }
906
907    #[cfg(feature = "discovery")]
908    fn pop_discovery_route(&mut self) -> bool {
909        let Some((&side, _)) = self
910            .discovery_routes
911            .iter()
912            .min_by_key(|(_, route)| route.last_seen_ms)
913        else {
914            return false;
915        };
916        self.discovery_routes.remove(&side);
917        Self::queue_budget_warning("topology route evicted because MAX_QUEUE_BUDGET is full");
918        true
919    }
920
921    fn largest_shared_queue(&self) -> Option<RelayQueueKind> {
922        let candidates = [
923            (
924                RelayQueueKind::Rx,
925                self.rx_queue.bytes_used(),
926                self.rx_queue.len(),
927            ),
928            (
929                RelayQueueKind::Tx,
930                self.tx_queue.bytes_used(),
931                self.tx_queue.len(),
932            ),
933            (
934                RelayQueueKind::Replay,
935                self.replay_queue.bytes_used(),
936                self.replay_queue.len(),
937            ),
938            (RelayQueueKind::Recent, 0, 0),
939            (
940                RelayQueueKind::ReliableRxBuffer,
941                self.reliable_rx_buffered_bytes(),
942                self.reliable_rx_buffer_len(),
943            ),
944            #[cfg(feature = "discovery")]
945            (
946                RelayQueueKind::Discovery,
947                self.discovery_bytes_used(),
948                self.discovery_routes.len(),
949            ),
950        ];
951        candidates
952            .into_iter()
953            .filter(|(_, bytes, len)| *bytes > 0 && *len > 0)
954            .max_by_key(|(kind, bytes, _)| {
955                (
956                    *bytes,
957                    if *kind == RelayQueueKind::ReliableRxBuffer {
958                        0
959                    } else {
960                        1
961                    },
962                )
963            })
964            .map(|(kind, _, _)| kind)
965    }
966
967    fn make_shared_queue_room(
968        &mut self,
969        incoming_cost: usize,
970        preferred: RelayQueueKind,
971    ) -> TelemetryResult<()> {
972        if incoming_cost > MAX_QUEUE_BUDGET {
973            return Err(TelemetryError::PacketTooLarge(
974                "Item exceeds maximum shared queue budget",
975            ));
976        }
977
978        while self.shared_queue_bytes_used().saturating_add(incoming_cost) > MAX_QUEUE_BUDGET {
979            let victim = self.largest_shared_queue().unwrap_or(preferred);
980            if victim == RelayQueueKind::Discovery {
981                Self::queue_budget_warning("topology data is using the largest queue budget share");
982            }
983            if !self.pop_shared_queue_item(victim) && !self.pop_shared_queue_item(preferred) {
984                return Err(TelemetryError::PacketTooLarge(
985                    "Item exceeds maximum shared queue budget",
986                ));
987            }
988        }
989
990        Ok(())
991    }
992
993    #[inline]
994    fn queue_budget_warning(msg: &str) {
995        #[cfg(feature = "std")]
996        eprintln!("sedsnet queue budget warning: {msg}");
997        let _ = msg;
998    }
999
1000    #[cfg(feature = "discovery")]
1001    fn fit_discovery_budget(&mut self) {
1002        while self.shared_queue_bytes_used() > MAX_QUEUE_BUDGET {
1003            if !self.pop_discovery_route() {
1004                break;
1005            }
1006        }
1007    }
1008
1009    fn push_rx(&mut self, item: RelayRxItem) -> TelemetryResult<()> {
1010        self.make_shared_queue_room(item.byte_cost(), RelayQueueKind::Rx)?;
1011        self.rx_queue
1012            .push_back_prioritized(item, |queued| queued.priority)
1013    }
1014
1015    fn push_tx(&mut self, item: RelayTxItem) -> TelemetryResult<()> {
1016        self.make_shared_queue_room(item.byte_cost(), RelayQueueKind::Tx)?;
1017        self.tx_queue
1018            .push_back_prioritized(item, |queued| queued.priority)
1019    }
1020
1021    fn push_replay(&mut self, item: RelayReplayItem) -> TelemetryResult<()> {
1022        self.make_shared_queue_room(item.byte_cost(), RelayQueueKind::Replay)?;
1023        self.replay_queue
1024            .push_back_prioritized(item, |queued| queued.priority)
1025    }
1026
1027    fn push_recent_rx(&mut self, id: u64) -> TelemetryResult<()> {
1028        while self.recent_rx.len() >= MAX_RECENT_RX_IDS {
1029            let _ = self.recent_rx.pop_front();
1030        }
1031        self.make_shared_queue_room(0, RelayQueueKind::Recent)?;
1032        self.recent_rx.push_back(id)
1033    }
1034
1035    fn buffer_reliable_rx(
1036        &mut self,
1037        side: RelaySideId,
1038        ty: crate::DataType,
1039        seq: u32,
1040        bytes: Arc<[u8]>,
1041    ) -> TelemetryResult<()> {
1042        let key = Relay::reliable_key(side, ty);
1043        if self
1044            .reliable_rx
1045            .get(&key)
1046            .is_some_and(|state| state.buffered.contains_key(&seq))
1047        {
1048            return Ok(());
1049        }
1050        let cost = size_of::<Arc<[u8]>>() + bytes.len();
1051        self.make_shared_queue_room(cost, RelayQueueKind::ReliableRxBuffer)?;
1052        let rx_state = self
1053            .reliable_rx
1054            .entry(key)
1055            .or_insert_with(|| ReliableRxState {
1056                expected_seq: 1,
1057                buffered: BTreeMap::new(),
1058            });
1059        if rx_state.buffered.len() >= RELIABLE_MAX_PENDING {
1060            let _ = rx_state.buffered.pop_first();
1061        }
1062        rx_state.buffered.insert(seq, bytes);
1063        Ok(())
1064    }
1065}
1066
/// Relay that forwards packets received on one side to the other registered sides.
///
/// By default traffic fans out to every other eligible side. Route overrides, typed
/// route overrides and per-source route-selection modes (weighted, failover, adaptive
/// for discovered routes) can narrow that set.
/// - Supports both packed bytes and full [`Packet`]s.
/// - Has RX & TX queues (plus a reliable replay queue), like Router.
/// - Uses a [`Clock`] for the `*_with_timeout` APIs, same style as Router.
pub struct Relay {
    /// Sender name this relay identifies as (`"RELAY"` unless changed via `set_sender`).
    sender: RouterMutex<Arc<str>>,
    /// All mutable routing, queue, reliable and statistics state behind one lock.
    state: RouterMutex<RelayInner>,
    /// Reentrancy gate around side transmission (see `try_enter_side_tx`).
    side_tx_gate: ReentryGate,
    /// Time source for reliable send timestamps and adaptive route statistics.
    clock: Box<dyn Clock + Send + Sync>,
}
1077
/// Side-selection plan produced for an item.
///
/// NOTE(review): the producers and consumers of this type are outside this view.
/// Presumably it describes where remotely targeted traffic should go — confirm.
enum RemoteSidePlan {
    /// Send to exactly these side IDs.
    Target(Vec<RelaySideId>),
}
1081
1082impl Relay {
    /// Bare sender name recognized as an end-to-end reliable ACK (no origin suffix).
    const END_TO_END_ACK_SENDER: &'static str = "E2EACK";
    /// Prefix of end-to-end ACK sender names. The non-empty remainder is hashed by
    /// `decode_end_to_end_ack_sender_hash` (presumably the acknowledging node's name).
    const END_TO_END_ACK_PREFIX: &'static str = "E2EACK:";
1085
1086    fn relay_item_priority(data: &RelayItem) -> TelemetryResult<u8> {
1087        let ty = match data {
1088            RelayItem::Packet(pkt) => pkt.data_type(),
1089            RelayItem::Packed(bytes) => wire_format::peek_envelope(bytes.as_ref())?.ty,
1090        };
1091        Ok(message_priority(ty))
1092    }
1093
1094    #[inline]
1095    fn is_side_tx_busy(err: &TelemetryError) -> bool {
1096        matches!(err, TelemetryError::Io("side tx busy"))
1097    }
1098
    /// Retransmits at most one frame from the reliable replay queue.
    ///
    /// Returns `Ok(true)` only when a frame was handed to the side transport. In that
    /// case its `last_send_ms` is refreshed and its `queued` flag cleared.
    ///
    /// Returns `Ok(false)` in three situations:
    /// - the queue was empty;
    /// - the popped frame has no reliable header, or its sequence is no longer in the
    ///   stream's `sent` table (e.g. already acknowledged). The item is dropped.
    /// - the side reported "side tx busy". The item is re-queued.
    ///
    /// NOTE(review): because a dropped stale item also yields `Ok(false)`, a caller
    /// that stops draining on `false` may leave later replay items until the next
    /// poll — confirm against the caller.
    ///
    /// # Errors
    ///
    /// Frame-inspection errors (the popped item is lost), budget errors while
    /// re-queueing a busy item, and any other send error.
    fn process_replay_queue_item(&self) -> TelemetryResult<bool> {
        // Pop under a short-lived lock so the send below runs without holding it.
        let Some(item) = ({
            let mut st = self.state.lock();
            st.replay_queue.pop_front()
        }) else {
            return Ok(false);
        };
        let frame = wire_format::peek_frame_info(item.bytes.as_ref())?;
        let ty = frame.envelope.ty;
        let Some(hdr) = frame.reliable else {
            return Ok(false);
        };
        {
            // Skip frames whose sequence left the `sent` table after being queued.
            let mut st = self.state.lock();
            let tx_state = self.reliable_tx_state_mut(&mut st, item.dst, ty);
            if !tx_state.sent.contains_key(&hdr.seq) {
                return Ok(false);
            }
        }
        if let Err(e) = self.send_reliable_raw_to_side(item.dst, item.bytes.clone()) {
            if Self::is_side_tx_busy(&e) {
                // Transient: put the frame back so a later pass retries it.
                let mut st = self.state.lock();
                st.push_replay(item)?;
                return Ok(false);
            }
            return Err(e);
        }
        // The entry may have been acknowledged while unlocked, hence the `if let`.
        let mut st = self.state.lock();
        let tx_state = self.reliable_tx_state_mut(&mut st, item.dst, ty);
        if let Some(sent) = tx_state.sent.get_mut(&hdr.seq) {
            sent.last_send_ms = self.clock.now_ms();
            sent.queued = false;
        }
        Ok(true)
    }
1134
1135    fn pop_ready_tx_item(
1136        &self,
1137    ) -> Option<(
1138        Option<RelaySideId>,
1139        RelaySideId,
1140        RelayTxHandlerFn,
1141        RelaySideOptions,
1142        RelayItem,
1143    )> {
1144        let mut st = self.state.lock();
1145        if let Some(item) = st.tx_queue.pop_front() {
1146            let side = st.sides.get(item.dst).and_then(|side| side.clone());
1147            side.map(|s| (item.src, item.dst, s.tx_handler, s.opts, item.data))
1148        } else {
1149            None
1150        }
1151    }
1152
1153    fn send_tx_item(
1154        &self,
1155        src: Option<RelaySideId>,
1156        dst: RelaySideId,
1157        handler: RelayTxHandlerFn,
1158        opts: RelaySideOptions,
1159        data: RelayItem,
1160    ) -> TelemetryResult<bool> {
1161        let allowed = {
1162            let mut st = self.state.lock();
1163            let ty = match &data {
1164                RelayItem::Packet(pkt) => Some(pkt.data_type()),
1165                RelayItem::Packed(bytes) => Some(wire_format::peek_envelope(bytes.as_ref())?.ty),
1166            };
1167            let route_allowed = self.route_allowed_locked(&st, src, ty, dst);
1168            #[cfg(all(feature = "discovery", feature = "timesync"))]
1169            let timesync_allowed = ty
1170                .map(|ty| {
1171                    Self::timesync_allowed_for_side_locked(&mut st, dst, ty, self.clock.now_ms())
1172                })
1173                .unwrap_or(true);
1174            #[cfg(not(all(feature = "discovery", feature = "timesync")))]
1175            let timesync_allowed = true;
1176            route_allowed && timesync_allowed
1177        };
1178        if !allowed {
1179            return Ok(false);
1180        }
1181        if opts.reliable_enabled && matches!(handler, RelayTxHandlerFn::Packed(_)) {
1182            self.send_reliable_to_side(dst, data)?;
1183            Ok(true)
1184        } else if let Some(adjusted) = self.adjust_reliable_for_side(opts, data)? {
1185            self.call_tx_handler(dst, &handler, &adjusted)?;
1186            Ok(true)
1187        } else {
1188            Ok(false)
1189        }
1190    }
1191
    /// Create a new relay with the given clock.
    ///
    /// The relay starts with no sides, no route overrides/weights/priorities/modes,
    /// empty queues, empty reliable state, zeroed handler counters, and the sender name
    /// `"RELAY"` (change it with `set_sender`).
    pub fn new(clock: Box<dyn Clock + Send + Sync>) -> Self {
        Self {
            sender: RouterMutex::new(Arc::from("RELAY")),
            state: RouterMutex::new(RelayInner {
                sides: Vec::new(),
                route_overrides: BTreeMap::new(),
                typed_route_overrides: BTreeMap::new(),
                route_weights: BTreeMap::new(),
                route_priorities: BTreeMap::new(),
                source_route_modes: BTreeMap::new(),
                route_selection_cursors: BTreeMap::new(),
                adaptive_route_stats: BTreeMap::new(),
                side_runtime_stats: BTreeMap::new(),
                side_transport: BTreeMap::new(),
                // Each queue is built with MAX_QUEUE_BUDGET / STARTING_QUEUE_SIZE /
                // QUEUE_GROW_STEP (presumably byte limit, initial capacity, growth step).
                // The budget shared across queues is enforced by `make_shared_queue_room`.
                rx_queue: BoundedDeque::new(MAX_QUEUE_BUDGET, STARTING_QUEUE_SIZE, QUEUE_GROW_STEP),
                tx_queue: BoundedDeque::new(MAX_QUEUE_BUDGET, STARTING_QUEUE_SIZE, QUEUE_GROW_STEP),
                replay_queue: BoundedDeque::new(
                    MAX_QUEUE_BUDGET,
                    STARTING_QUEUE_SIZE,
                    QUEUE_GROW_STEP,
                ),
                // `.max(1)` keeps a zero RECENT_RX_QUEUE_BYTES from yielding a zero-sized deque.
                recent_rx: BoundedDeque::new(
                    RECENT_RX_QUEUE_BYTES.max(1),
                    RECENT_RX_QUEUE_BYTES.max(1),
                    QUEUE_GROW_STEP,
                ),
                reliable_tx: BTreeMap::new(),
                reliable_rx: BTreeMap::new(),
                reliable_return_routes: BTreeMap::new(),
                reliable_return_route_order: VecDeque::new(),
                end_to_end_acked_destinations: BTreeMap::new(),
                end_to_end_acked_destination_order: VecDeque::new(),
                total_handler_failures: 0,
                total_handler_retries: 0,
                #[cfg(feature = "discovery")]
                discovery_routes: BTreeMap::new(),
                #[cfg(feature = "discovery")]
                discovery_cadence: DiscoveryCadenceState::default(),
                #[cfg(feature = "discovery")]
                discovery_side_throttle: BTreeMap::new(),
                #[cfg(all(feature = "discovery", feature = "timesync"))]
                timesync_side_throttle: BTreeMap::new(),
            }),
            side_tx_gate: ReentryGate::new(),
            clock,
        }
    }
1240
1241    #[inline]
1242    fn sender_arc(&self) -> Arc<str> {
1243        self.sender.lock().clone()
1244    }
1245
1246    #[inline]
1247    pub fn sender(&self) -> Arc<str> {
1248        self.sender_arc()
1249    }
1250
1251    pub fn set_sender<S: AsRef<str>>(&self, sender: S) {
1252        *self.sender.lock() = Arc::from(sender.as_ref());
1253    }
1254
1255    #[inline]
1256    fn try_enter_side_tx(&self) -> Option<ReentryGuard<'_>> {
1257        self.side_tx_gate.try_enter()
1258    }
1259
1260    #[inline]
1261    fn side_tx_active(&self) -> bool {
1262        self.side_tx_gate.is_active()
1263    }
1264
1265    #[inline]
1266    fn side_ref(st: &RelayInner, side: RelaySideId) -> TelemetryResult<&RelaySide> {
1267        st.sides
1268            .get(side)
1269            .and_then(|side| side.as_ref())
1270            .ok_or(TelemetryError::HandlerError("relay: invalid side id"))
1271    }
1272
1273    fn note_side_tx_success(
1274        &self,
1275        side: RelaySideId,
1276        ty: crate::DataType,
1277        bytes: usize,
1278        attempts: usize,
1279    ) {
1280        let mut st = self.state.lock();
1281        let entry = st.side_runtime_stats.entry(side).or_default();
1282        entry.note_tx(ty, bytes, attempts.saturating_sub(1));
1283    }
1284
1285    fn note_side_tx_failure(&self, side: RelaySideId, ty: crate::DataType, attempts: usize) {
1286        let mut st = self.state.lock();
1287        st.total_handler_failures = st.total_handler_failures.saturating_add(1);
1288        st.total_handler_retries = st.total_handler_retries.saturating_add(attempts as u64);
1289        let entry = st.side_runtime_stats.entry(side).or_default();
1290        entry.note_tx_failure(ty, attempts);
1291    }
1292
1293    fn note_side_rx(&self, side: RelaySideId, ty: crate::DataType, bytes: usize) {
1294        let mut st = self.state.lock();
1295        let entry = st.side_runtime_stats.entry(side).or_default();
1296        entry.note_rx(ty, bytes);
1297    }
1298
1299    #[inline]
1300    fn ensure_side_ingress_enabled(&self, side: RelaySideId) -> TelemetryResult<()> {
1301        let st = self.state.lock();
1302        let side_ref = Self::side_ref(&st, side)?;
1303        if side_ref.opts.ingress_enabled {
1304            Ok(())
1305        } else {
1306            Err(TelemetryError::HandlerError(
1307                "relay: ingress disabled for side id",
1308            ))
1309        }
1310    }
1311
1312    #[inline]
1313    fn route_allowed_locked(
1314        &self,
1315        st: &RelayInner,
1316        src: Option<RelaySideId>,
1317        ty: Option<crate::DataType>,
1318        dst: RelaySideId,
1319    ) -> bool {
1320        let Ok(dst_side) = Self::side_ref(st, dst) else {
1321            return false;
1322        };
1323        if !dst_side.opts.egress_enabled {
1324            return false;
1325        }
1326        if let Some(src_id) = src {
1327            let Ok(src_side) = Self::side_ref(st, src_id) else {
1328                return false;
1329            };
1330            if !src_side.opts.ingress_enabled || src_id == dst {
1331                return false;
1332            }
1333        }
1334        let base_allowed = st.route_overrides.get(&(src, dst)).copied().unwrap_or(true);
1335        if !base_allowed {
1336            return false;
1337        }
1338
1339        let Some(ty) = ty else {
1340            return true;
1341        };
1342        if st
1343            .typed_route_overrides
1344            .keys()
1345            .any(|(typed_src, typed_ty, _)| *typed_src == src && *typed_ty == ty.as_u32())
1346        {
1347            return st
1348                .typed_route_overrides
1349                .get(&(src, ty.as_u32(), dst))
1350                .copied()
1351                .unwrap_or(false);
1352        }
1353        true
1354    }
1355
1356    fn has_typed_route_overrides_locked(
1357        st: &RelayInner,
1358        src: Option<RelaySideId>,
1359        ty: crate::DataType,
1360    ) -> bool {
1361        st.typed_route_overrides
1362            .keys()
1363            .any(|(typed_src, typed_ty, _)| *typed_src == src && *typed_ty == ty.as_u32())
1364    }
1365
1366    fn eligible_side_ids_locked(
1367        &self,
1368        st: &RelayInner,
1369        src: Option<RelaySideId>,
1370        ty: Option<crate::DataType>,
1371        restrict_link_local: bool,
1372    ) -> Vec<RelaySideId> {
1373        st.sides
1374            .iter()
1375            .enumerate()
1376            .filter_map(|(side_id, side)| {
1377                let side = side.as_ref()?;
1378                if restrict_link_local && !side.opts.link_local_enabled {
1379                    return None;
1380                }
1381                if self.route_allowed_locked(st, src, ty, side_id) {
1382                    Some(side_id)
1383                } else {
1384                    None
1385                }
1386            })
1387            .collect()
1388    }
1389
1390    fn apply_route_selection_locked(
1391        &self,
1392        st: &mut RelayInner,
1393        src: Option<RelaySideId>,
1394        mut sides: Vec<RelaySideId>,
1395        origin: RouteSelectionOrigin,
1396    ) -> Vec<RelaySideId> {
1397        if sides.len() <= 1 {
1398            return sides;
1399        }
1400
1401        let selection_mode = st.source_route_modes.get(&src).copied();
1402        if selection_mode.is_none() && origin == RouteSelectionOrigin::Discovered {
1403            return self.apply_adaptive_discovery_selection_locked(st, src, sides);
1404        }
1405
1406        match selection_mode.unwrap_or(RouteSelectionMode::Fanout) {
1407            RouteSelectionMode::Fanout => sides,
1408            RouteSelectionMode::Weighted => {
1409                sides.sort_unstable();
1410                let total_weight = sides.iter().fold(0_u64, |acc, side| {
1411                    acc + u64::from(st.route_weights.get(&(src, *side)).copied().unwrap_or(1))
1412                });
1413                if total_weight == 0 {
1414                    return Vec::new();
1415                }
1416                let cursor = st.route_selection_cursors.entry(src).or_insert(0);
1417                let pick = *cursor % total_weight;
1418                *cursor = cursor.wrapping_add(1);
1419                let mut remaining = pick;
1420                for side in sides {
1421                    let weight =
1422                        u64::from(st.route_weights.get(&(src, side)).copied().unwrap_or(1));
1423                    if remaining < weight {
1424                        return vec![side];
1425                    }
1426                    remaining -= weight;
1427                }
1428                Vec::new()
1429            }
1430            RouteSelectionMode::Failover => {
1431                sides.sort_by_key(|side| {
1432                    (
1433                        st.route_priorities.get(&(src, *side)).copied().unwrap_or(0),
1434                        *side,
1435                    )
1436                });
1437                sides.truncate(1);
1438                sides
1439            }
1440        }
1441    }
1442
1443    fn apply_adaptive_discovery_selection_locked(
1444        &self,
1445        st: &mut RelayInner,
1446        src: Option<RelaySideId>,
1447        mut sides: Vec<RelaySideId>,
1448    ) -> Vec<RelaySideId> {
1449        sides.sort_unstable();
1450        let mut unmeasured: Vec<_> = sides
1451            .iter()
1452            .copied()
1453            .filter(|side| !st.adaptive_route_stats.contains_key(side))
1454            .collect();
1455        if !unmeasured.is_empty() {
1456            let cursor = st.route_selection_cursors.entry(src).or_insert(0);
1457            let pick = (*cursor as usize) % unmeasured.len();
1458            *cursor = cursor.wrapping_add(1);
1459            return vec![unmeasured.swap_remove(pick)];
1460        }
1461
1462        let now_ms = self.clock.now_ms();
1463        let total_weight = sides.iter().fold(0_u64, |acc, side| {
1464            acc + st
1465                .adaptive_route_stats
1466                .get(side)
1467                .map(|stats| stats.weight(now_ms))
1468                .unwrap_or(1)
1469        });
1470        if total_weight == 0 {
1471            sides.truncate(1);
1472            return sides;
1473        }
1474
1475        let cursor = st.route_selection_cursors.entry(src).or_insert(0);
1476        let pick = *cursor % total_weight;
1477        *cursor = cursor.wrapping_add(1);
1478        let mut remaining = pick;
1479        for side in sides {
1480            let weight = st
1481                .adaptive_route_stats
1482                .get(&side)
1483                .map(|stats| stats.weight(now_ms))
1484                .unwrap_or(1);
1485            if remaining < weight {
1486                return vec![side];
1487            }
1488            remaining -= weight;
1489        }
1490        Vec::new()
1491    }
1492
1493    fn record_side_tx_sample(
1494        &self,
1495        side: RelaySideId,
1496        bytes: usize,
1497        started_ms: u64,
1498        ended_ms: u64,
1499    ) {
1500        let sample_ms = ended_ms.saturating_sub(started_ms).max(1);
1501        let sample_bps = ((bytes as u128).saturating_mul(1000) / u128::from(sample_ms))
1502            .min(u128::from(u64::MAX)) as u64;
1503        let mut st = self.state.lock();
1504        st.adaptive_route_stats
1505            .entry(side)
1506            .or_default()
1507            .observe(bytes, sample_bps, ended_ms);
1508    }
1509
1510    /// Seed adaptive route selection with a transport-measured link probe.
1511    ///
1512    /// Call this after a side-specific bring-up probe, or whenever the transport already knows the
1513    /// duration for a frame. The relay does not emit synthetic probe frames by itself.
1514    pub fn note_side_link_probe_sample(
1515        &self,
1516        side: RelaySideId,
1517        bytes: usize,
1518        duration_ms: u64,
1519    ) -> TelemetryResult<()> {
1520        {
1521            let st = self.state.lock();
1522            let _ = Self::side_ref(&st, side).map_err(|_| TelemetryError::BadArg)?;
1523        }
1524        let ended_ms = self.clock.now_ms();
1525        self.record_side_tx_sample(side, bytes, ended_ms.saturating_sub(duration_ms), ended_ms);
1526        Ok(())
1527    }
1528
1529    fn relay_item_wire_len(data: &RelayItem) -> TelemetryResult<usize> {
1530        match data {
1531            RelayItem::Packet(pkt) => Ok(wire_format::pack_packet(pkt).len()),
1532            RelayItem::Packed(bytes) => Ok(bytes.len()),
1533        }
1534    }
1535
1536    #[inline]
1537    fn decode_end_to_end_reliable_ack(payload: &[u8]) -> TelemetryResult<u64> {
1538        if payload.len() != 8 {
1539            return Err(TelemetryError::Unpack("bad reliable e2e ack payload"));
1540        }
1541        Ok(u64::from_le_bytes(payload[0..8].try_into().unwrap()))
1542    }
1543
1544    #[inline]
1545    fn is_end_to_end_ack_sender(sender: &str) -> bool {
1546        sender == Self::END_TO_END_ACK_SENDER || sender.starts_with(Self::END_TO_END_ACK_PREFIX)
1547    }
1548
1549    #[inline]
1550    fn sender_hash(sender: &str) -> u64 {
1551        hash_bytes_u64(0x517C_C1B7_2722_0A95, sender.as_bytes())
1552    }
1553
1554    fn decode_end_to_end_ack_sender_hash(sender: &str) -> Option<u64> {
1555        sender
1556            .strip_prefix(Self::END_TO_END_ACK_PREFIX)
1557            .filter(|sender| !sender.is_empty())
1558            .map(Self::sender_hash)
1559    }
1560
1561    #[cfg(feature = "discovery")]
1562    fn is_end_to_end_destination_sender(&self, sender: &str) -> bool {
1563        sender != self.sender_arc().as_ref() && !Self::is_end_to_end_ack_sender(sender)
1564    }
1565
1566    /// Extract the logical packet ID targeted by an end-to-end reliable ACK item.
1567    ///
1568    /// Relay queues can hold either decoded packets or packed frames. This
1569    /// helper normalizes both forms so relay ACK-routing logic can treat them
1570    /// uniformly.
1571    ///
1572    /// Only relay-visible end-to-end `ReliableAck` packets qualify here.
1573    /// Unrelated traffic returns `Ok(None)`.
1574    fn reliable_control_target_packet_id(data: &RelayItem) -> TelemetryResult<Option<u64>> {
1575        match data {
1576            RelayItem::Packet(pkt) => {
1577                if pkt.data_type() != crate::DataType::ReliableAck
1578                    || !Self::is_end_to_end_ack_sender(pkt.sender())
1579                {
1580                    return Ok(None);
1581                }
1582                Self::decode_end_to_end_reliable_ack(pkt.payload()).map(Some)
1583            }
1584            RelayItem::Packed(bytes) => {
1585                if wire_format::peek_frame_info(bytes.as_ref())
1586                    .ok()
1587                    .is_some_and(|frame| frame.ack_only())
1588                {
1589                    return Ok(None);
1590                }
1591                let pkt = wire_format::unpack_packet(bytes.as_ref())?;
1592                if pkt.data_type() != crate::DataType::ReliableAck
1593                    || !Self::is_end_to_end_ack_sender(pkt.sender())
1594                {
1595                    return Ok(None);
1596                }
1597                Self::decode_end_to_end_reliable_ack(pkt.payload()).map(Some)
1598            }
1599        }
1600    }
1601
1602    fn note_reliable_return_route(&self, side: RelaySideId, packet_id: u64) {
1603        let mut st = self.state.lock();
1604        Self::remember_reliable_return_route_locked(&mut st, packet_id);
1605        st.reliable_return_routes
1606            .insert(packet_id, ReliableReturnRouteState { side });
1607    }
1608
1609    /// Refresh or insert `packet_id` in the bounded reliable return-route cache.
1610    ///
1611    /// The relay uses this cache to route end-to-end acknowledgements back
1612    /// toward the source side that most recently forwarded the corresponding
1613    /// reliable data packet.
1614    fn remember_reliable_return_route_locked(st: &mut RelayInner, packet_id: u64) {
1615        let cap = RELIABLE_MAX_RETURN_ROUTES.max(1);
1616        st.reliable_return_route_order
1617            .retain(|id| st.reliable_return_routes.contains_key(id) && *id != packet_id);
1618        while st.reliable_return_route_order.len() >= cap {
1619            if let Some(oldest) = st.reliable_return_route_order.pop_front() {
1620                st.reliable_return_routes.remove(&oldest);
1621            } else {
1622                break;
1623            }
1624        }
1625        st.reliable_return_route_order.push_back(packet_id);
1626    }
1627
1628    fn note_end_to_end_acked_destination_locked(
1629        st: &mut RelayInner,
1630        packet_id: u64,
1631        sender_hash: u64,
1632    ) {
1633        let entry_cap = RELIABLE_MAX_END_TO_END_ACK_CACHE.max(1);
1634        st.end_to_end_acked_destination_order
1635            .retain(|id| st.end_to_end_acked_destinations.contains_key(id) && *id != packet_id);
1636        while st.end_to_end_acked_destination_order.len() >= entry_cap {
1637            if let Some(oldest) = st.end_to_end_acked_destination_order.pop_front() {
1638                st.end_to_end_acked_destinations.remove(&oldest);
1639            } else {
1640                break;
1641            }
1642        }
1643        st.end_to_end_acked_destination_order.push_back(packet_id);
1644
1645        let acked = st
1646            .end_to_end_acked_destinations
1647            .entry(packet_id)
1648            .or_default();
1649        let sender_cap = RELIABLE_MAX_END_TO_END_PENDING.max(1);
1650        if acked.len() < sender_cap || acked.contains(&sender_hash) {
1651            acked.insert(sender_hash);
1652        }
1653    }
1654
1655    #[inline]
1656    fn reliable_key(side: RelaySideId, ty: crate::DataType) -> (RelaySideId, u32) {
1657        (side, ty.as_u32())
1658    }
1659
1660    fn reliable_tx_state_mut<'a>(
1661        &'a self,
1662        st: &'a mut RelayInner,
1663        side: RelaySideId,
1664        ty: crate::DataType,
1665    ) -> &'a mut ReliableTxState {
1666        let key = Self::reliable_key(side, ty);
1667        st.reliable_tx
1668            .entry(key)
1669            .or_insert_with(|| ReliableTxState {
1670                next_seq: 1,
1671                sent_order: VecDeque::new(),
1672                sent: BTreeMap::new(),
1673            })
1674    }
1675
1676    fn reliable_rx_state_mut<'a>(
1677        &'a self,
1678        st: &'a mut RelayInner,
1679        side: RelaySideId,
1680        ty: crate::DataType,
1681    ) -> &'a mut ReliableRxState {
1682        let key = Self::reliable_key(side, ty);
1683        st.reliable_rx
1684            .entry(key)
1685            .or_insert_with(|| ReliableRxState {
1686                expected_seq: 1,
1687                buffered: BTreeMap::new(),
1688            })
1689    }
1690
1691    fn handle_reliable_ack(&self, side: RelaySideId, ty: crate::DataType, ack: u32) {
1692        let mut st = self.state.lock();
1693        let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1694        if matches!(reliable_mode(ty), crate::ReliableMode::Unordered) {
1695            tx_state.sent.remove(&ack);
1696            tx_state.sent_order.retain(|seq| *seq != ack);
1697            return;
1698        }
1699
1700        while let Some(seq) = tx_state.sent_order.front().copied() {
1701            if seq > ack {
1702                break;
1703            }
1704            tx_state.sent_order.pop_front();
1705            tx_state.sent.remove(&seq);
1706        }
1707    }
1708
1709    fn handle_reliable_partial_ack(&self, side: RelaySideId, ty: crate::DataType, seq: u32) {
1710        let mut st = self.state.lock();
1711        let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1712        if let Some(sent) = tx_state.sent.get_mut(&seq) {
1713            sent.partial_acked = true;
1714        }
1715    }
1716
1717    fn reliable_control_packet(
1718        &self,
1719        control_ty: crate::DataType,
1720        ty: crate::DataType,
1721        seq: u32,
1722    ) -> TelemetryResult<Packet> {
1723        let sender = self.sender_arc();
1724        Packet::new(
1725            control_ty,
1726            message_meta(control_ty).endpoints,
1727            sender.as_ref(),
1728            self.clock.now_ms(),
1729            crate::router::encode_slice_le(&[ty.as_u32(), seq]),
1730        )
1731    }
1732
1733    fn queue_reliable_ack(
1734        &self,
1735        side: RelaySideId,
1736        ty: crate::DataType,
1737        seq: u32,
1738    ) -> TelemetryResult<()> {
1739        let pkt = self.reliable_control_packet(crate::DataType::ReliableAck, ty, seq)?;
1740        let data = RelayItem::Packet(Arc::new(pkt));
1741        let priority = Self::relay_item_priority(&data)?;
1742        let mut st = self.state.lock();
1743        st.push_tx(RelayTxItem {
1744            src: None,
1745            dst: side,
1746            data,
1747            priority,
1748        })?;
1749        Ok(())
1750    }
1751
1752    fn queue_reliable_packet_request(
1753        &self,
1754        side: RelaySideId,
1755        ty: crate::DataType,
1756        seq: u32,
1757    ) -> TelemetryResult<()> {
1758        let pkt = self.reliable_control_packet(crate::DataType::ReliablePacketRequest, ty, seq)?;
1759        let data = RelayItem::Packet(Arc::new(pkt));
1760        let priority = Self::relay_item_priority(&data)?;
1761        let mut st = self.state.lock();
1762        st.push_tx(RelayTxItem {
1763            src: None,
1764            dst: side,
1765            data,
1766            priority,
1767        })?;
1768        Ok(())
1769    }
1770
1771    fn queue_reliable_partial_ack(
1772        &self,
1773        side: RelaySideId,
1774        ty: crate::DataType,
1775        seq: u32,
1776    ) -> TelemetryResult<()> {
1777        let pkt = self.reliable_control_packet(crate::DataType::ReliablePartialAck, ty, seq)?;
1778        let data = RelayItem::Packet(Arc::new(pkt));
1779        let priority = Self::relay_item_priority(&data)?;
1780        let mut st = self.state.lock();
1781        st.push_tx(RelayTxItem {
1782            src: None,
1783            dst: side,
1784            data,
1785            priority,
1786        })?;
1787        Ok(())
1788    }
1789
1790    fn queue_reliable_retransmit(
1791        &self,
1792        side: RelaySideId,
1793        ty: crate::DataType,
1794        seq: u32,
1795    ) -> TelemetryResult<()> {
1796        let mut queued = None;
1797        {
1798            let mut st = self.state.lock();
1799            let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1800            if let Some(sent) = tx_state.sent.get_mut(&seq)
1801                && !sent.queued
1802            {
1803                sent.queued = true;
1804                sent.partial_acked = false;
1805                queued = Some(sent.bytes.clone());
1806            }
1807        }
1808
1809        if let Some(bytes) = queued {
1810            let mut st = self.state.lock();
1811            st.push_replay(RelayReplayItem {
1812                dst: side,
1813                bytes,
1814                priority: message_priority(ty).saturating_add(16),
1815            })?;
1816        }
1817        Ok(())
1818    }
1819
1820    fn send_reliable_raw_to_side(
1821        &self,
1822        side: RelaySideId,
1823        bytes: Arc<[u8]>,
1824    ) -> TelemetryResult<()> {
1825        let (handler, opts) = {
1826            let st = self.state.lock();
1827            let side_ref = Self::side_ref(&st, side)?;
1828            if !side_ref.opts.egress_enabled {
1829                return Ok(());
1830            }
1831            (side_ref.tx_handler.clone(), side_ref.opts)
1832        };
1833
1834        let Some(_side_tx_guard) = self.try_enter_side_tx() else {
1835            return Err(TelemetryError::Io("side tx busy"));
1836        };
1837        let started_ms = self.clock.now_ms();
1838        let ty = wire_format::peek_envelope(bytes.as_ref())
1839            .map(|env| env.ty)
1840            .unwrap_or(crate::DataType::ReliableAck);
1841        let result = match handler {
1842            RelayTxHandlerFn::Packed(f) => {
1843                let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
1844                let mut sent_bytes = 0usize;
1845                for frame in frames {
1846                    f(frame.as_ref())?;
1847                    sent_bytes = sent_bytes.saturating_add(frame.len());
1848                }
1849                self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
1850                self.note_side_tx_success(side, ty, sent_bytes, 1);
1851                return Ok(());
1852            }
1853            RelayTxHandlerFn::Packet(f) => {
1854                if wire_format::peek_frame_info(bytes.as_ref())
1855                    .ok()
1856                    .is_some_and(|frame| frame.ack_only())
1857                {
1858                    return Ok(());
1859                }
1860                let pkt = wire_format::unpack_packet(bytes.as_ref())?;
1861                f(&pkt)
1862            }
1863        };
1864        if result.is_ok() {
1865            self.record_side_tx_sample(side, bytes.len(), started_ms, self.clock.now_ms());
1866            self.note_side_tx_success(side, ty, bytes.len(), 1);
1867        } else {
1868            self.note_side_tx_failure(side, ty, 1);
1869        }
1870        result
1871    }
1872
1873    fn send_reliable_to_side(&self, side: RelaySideId, data: RelayItem) -> TelemetryResult<()> {
1874        let (handler, opts, hop_reliable_enabled) = {
1875            let st = self.state.lock();
1876            let side_ref = Self::side_ref(&st, side)?;
1877            let opts = side_ref.opts;
1878            let hop_reliable_enabled = opts.reliable_enabled
1879                && !self.side_has_multiple_announcers_locked(&st, side, self.clock.now_ms());
1880            (side_ref.tx_handler.clone(), opts, hop_reliable_enabled)
1881        };
1882
1883        let RelayTxHandlerFn::Packed(f) = &handler else {
1884            return self.call_tx_handler(side, &handler, &data);
1885        };
1886
1887        if !hop_reliable_enabled {
1888            let mut adjusted_opts = opts;
1889            adjusted_opts.reliable_enabled = false;
1890            if let Some(adjusted) = self.adjust_reliable_for_side(adjusted_opts, data)? {
1891                return self.call_tx_handler(side, &handler, &adjusted);
1892            }
1893            return Ok(());
1894        }
1895
1896        let ty = match &data {
1897            RelayItem::Packet(pkt) => pkt.data_type(),
1898            RelayItem::Packed(bytes) => {
1899                let Ok(frame) = wire_format::peek_frame_info(bytes.as_ref()) else {
1900                    return self.call_tx_handler(side, &handler, &data);
1901                };
1902                frame.envelope.ty
1903            }
1904        };
1905
1906        if !is_reliable_type(ty) {
1907            if let Some(adjusted) = self.adjust_reliable_for_side(opts, data)? {
1908                self.call_tx_handler(side, &handler, &adjusted)?;
1909            }
1910            return Ok(());
1911        }
1912
1913        let (seq, flags) = {
1914            let mut st = self.state.lock();
1915            let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1916            if tx_state.sent.len() >= RELIABLE_MAX_PENDING {
1917                return Err(TelemetryError::PacketTooLarge(
1918                    "relay reliable history full",
1919                ));
1920            }
1921            let seq = tx_state.next_seq;
1922            let next = tx_state.next_seq.wrapping_add(1);
1923            tx_state.next_seq = if next == 0 { 1 } else { next };
1924            let flags = match reliable_mode(ty) {
1925                crate::ReliableMode::Unordered => wire_format::RELIABLE_FLAG_UNORDERED,
1926                _ => 0,
1927            };
1928            (seq, flags)
1929        };
1930
1931        let bytes: Arc<[u8]> = match data {
1932            RelayItem::Packet(pkt) => wire_format::pack_packet_with_reliable(
1933                &pkt,
1934                wire_format::ReliableHeader { flags, seq, ack: 0 },
1935            ),
1936            RelayItem::Packed(bytes) => {
1937                let Some(rewritten) =
1938                    wire_format::rewrite_reliable_header_owned(bytes.as_ref(), flags, seq, 0)?
1939                else {
1940                    let Some(_side_tx_guard) = self.try_enter_side_tx() else {
1941                        return Err(TelemetryError::Io("side tx busy"));
1942                    };
1943                    let started_ms = self.clock.now_ms();
1944                    let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
1945                    let mut sent_bytes = 0usize;
1946                    for frame in frames {
1947                        f(frame.as_ref())?;
1948                        sent_bytes = sent_bytes.saturating_add(frame.len());
1949                    }
1950                    self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
1951                    self.note_side_tx_success(side, ty, sent_bytes, 1);
1952                    return Ok(());
1953                };
1954                rewritten
1955            }
1956        };
1957
1958        let Some(_side_tx_guard) = self.try_enter_side_tx() else {
1959            return Err(TelemetryError::Io("side tx busy"));
1960        };
1961        let started_ms = self.clock.now_ms();
1962        let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
1963        let mut sent_bytes = 0usize;
1964        for frame in frames {
1965            f(frame.as_ref())?;
1966            sent_bytes = sent_bytes.saturating_add(frame.len());
1967        }
1968        self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
1969        self.note_side_tx_success(side, ty, sent_bytes, 1);
1970
1971        {
1972            let mut st = self.state.lock();
1973            let tx_state = self.reliable_tx_state_mut(&mut st, side, ty);
1974            tx_state.sent_order.push_back(seq);
1975            tx_state.sent.insert(
1976                seq,
1977                ReliableSent {
1978                    bytes: bytes.clone(),
1979                    last_send_ms: self.clock.now_ms(),
1980                    retries: 0,
1981                    queued: false,
1982                    partial_acked: false,
1983                },
1984            );
1985        }
1986
1987        Ok(())
1988    }
1989
1990    fn item_route_info(
1991        &self,
1992        data: &RelayItem,
1993    ) -> TelemetryResult<(Vec<crate::DataEndpoint>, crate::DataType)> {
1994        match data {
1995            RelayItem::Packet(pkt) => {
1996                let mut eps = pkt.endpoints().to_vec();
1997                eps.sort_unstable();
1998                eps.dedup();
1999                Ok((eps, pkt.data_type()))
2000            }
2001            RelayItem::Packed(bytes) => {
2002                let env = wire_format::peek_envelope(bytes.as_ref())?;
2003                let mut eps: Vec<crate::DataEndpoint> = env.endpoints.iter().copied().collect();
2004                eps.sort_unstable();
2005                eps.dedup();
2006                Ok((eps, env.ty))
2007            }
2008        }
2009    }
2010
2011    fn endpoints_are_link_local_only(eps: &[crate::DataEndpoint]) -> bool {
2012        !eps.is_empty() && eps.iter().all(|ep| ep.is_link_local_only())
2013    }
2014
2015    fn item_target_senders(&self, data: &RelayItem) -> TelemetryResult<Arc<[u64]>> {
2016        match data {
2017            RelayItem::Packet(pkt) => Ok(Arc::from(pkt.wire_target_senders())),
2018            RelayItem::Packed(bytes) => {
2019                Ok(wire_format::peek_envelope(bytes.as_ref())?.target_senders)
2020            }
2021        }
2022    }
2023
2024    #[cfg(feature = "discovery")]
2025    fn has_explicit_route_policy_locked(
2026        st: &RelayInner,
2027        src: Option<RelaySideId>,
2028        ty: crate::DataType,
2029    ) -> bool {
2030        st.route_overrides
2031            .keys()
2032            .any(|(route_src, _)| *route_src == src)
2033            || Self::has_typed_route_overrides_locked(st, src, ty)
2034    }
2035
2036    #[cfg(feature = "discovery")]
2037    fn side_matches_target_senders_locked(
2038        st: &RelayInner,
2039        side: RelaySideId,
2040        target_senders: &[u64],
2041        now_ms: u64,
2042    ) -> bool {
2043        st.discovery_routes
2044            .get(&side)
2045            .map(|route| {
2046                if now_ms.saturating_sub(route.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2047                    return false;
2048                }
2049                route.announcers.values().any(|sender_state| {
2050                    if now_ms.saturating_sub(sender_state.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2051                        return false;
2052                    }
2053                    sender_state
2054                        .topology_boards
2055                        .iter()
2056                        .any(|board| target_senders.contains(&Self::sender_hash(&board.sender_id)))
2057                })
2058            })
2059            .unwrap_or(false)
2060    }
2061
    /// Decide which remote sides, other than the ingress side `exclude`, should receive `data`.
    ///
    /// Every path returns `RemoteSidePlan::Target` with the chosen side list, which may be
    /// empty.
    ///
    /// With the `discovery` feature:
    /// - Discovery-protocol types go to all eligible sides through route selection with
    ///   `RouteSelectionOrigin::Flood`.
    /// - Reliable control items that name a target packet id go only to the side recorded in
    ///   `reliable_return_routes` for that id, if routing allows it (and, with `timesync`,
    ///   time-sync gating passes). Otherwise the list is empty.
    /// - When no discovery routes are known, the item is sent only if exactly one eligible side
    ///   remains.
    /// - Otherwise fresh discovery routes are classified as follows:
    ///   - "Exact" matches reach the preferred time-sync source; these win when present.
    ///   - "Known" matches either host a targeted sender or reach one of the item's endpoints.
    ///   - If nothing matches, eligible sides are used only when an explicit route policy exists
    ///     for `exclude`; otherwise the list is empty.
    ///
    /// Without `discovery`, return-routed reliable control items behave as above and every other
    /// item goes to all eligible sides through route selection with
    /// `RouteSelectionOrigin::Flood`.
    ///
    /// # Errors
    /// Propagates errors from item inspection (`item_route_info`, `item_target_senders`,
    /// `reliable_control_target_packet_id`, time-sync source lookup) and from
    /// `filter_end_to_end_satisfied_sides_locked`.
    fn remote_side_plan(
        &self,
        data: &RelayItem,
        exclude: RelaySideId,
    ) -> TelemetryResult<RemoteSidePlan> {
        #[cfg(feature = "discovery")]
        {
            let (eps, ty) = self.item_route_info(data)?;
            let target_senders = self.item_target_senders(data)?;
            let preferred_packet_id = Self::reliable_control_target_packet_id(data)?;
            // Discovery traffic itself is always flooded (no link-local restriction).
            if discovery::is_discovery_type(ty) {
                let mut st = self.state.lock();
                let sides = self.eligible_side_ids_locked(&st, Some(exclude), Some(ty), false);
                return Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
                    &mut st,
                    Some(exclude),
                    sides,
                    RouteSelectionOrigin::Flood,
                )));
            }

            // Resolved before taking the state lock.
            #[cfg(feature = "timesync")]
            let preferred_timesync_source = self.preferred_timesync_route_source(data, ty)?;
            #[cfg(not(feature = "timesync"))]
            let preferred_timesync_source: Option<String> = None;
            let mut st = self.state.lock();
            // Reliable control packets for a specific packet follow its recorded return route.
            if let Some(packet_id) = preferred_packet_id {
                let target_side = self.allowed_target_side_locked(
                    &st,
                    exclude,
                    ty,
                    st.reliable_return_routes
                        .get(&packet_id)
                        .map(|route| route.side),
                );
                if let Some(side) = target_side {
                    #[cfg(feature = "timesync")]
                    if !Self::timesync_allowed_for_side_locked(
                        &mut st,
                        side,
                        ty,
                        self.clock.now_ms(),
                    ) {
                        return Ok(RemoteSidePlan::Target(Vec::new()));
                    }
                    return Ok(RemoteSidePlan::Target(vec![side]));
                }
                return Ok(RemoteSidePlan::Target(Vec::new()));
            }
            // Items addressed only to link-local endpoints may only use sides that have
            // `link_local_enabled`.
            let restrict_link_local = Self::endpoints_are_link_local_only(&eps);
            // Reliable types pass `Flood` as the selection origin even for discovered targets.
            let discovered_origin = if is_reliable_type(ty) {
                RouteSelectionOrigin::Flood
            } else {
                RouteSelectionOrigin::Discovered
            };
            // No discovery information yet: only forward when the choice is unambiguous.
            if st.discovery_routes.is_empty() {
                let mut fallback = self.eligible_side_ids_locked(
                    &st,
                    Some(exclude),
                    Some(ty),
                    restrict_link_local,
                );
                #[cfg(feature = "timesync")]
                {
                    fallback = Self::filter_timesync_sides_locked(
                        &mut st,
                        ty,
                        self.clock.now_ms(),
                        fallback,
                    );
                }
                return Ok(RemoteSidePlan::Target(if fallback.len() == 1 {
                    fallback
                } else {
                    Vec::new()
                }));
            }
            let now_ms = self.clock.now_ms();
            // "Exact": route reaches the preferred time-sync source.
            // "Known": route hosts a targeted sender or reaches one of the item's endpoints.
            let mut had_exact = false;
            let mut exact_targets = Vec::new();
            let mut had_known = false;
            let mut generic_targets = Vec::new();

            for (&side, route) in st.discovery_routes.iter() {
                // Skip the ingress side and routes older than the discovery TTL.
                if side == exclude
                    || now_ms.saturating_sub(route.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS
                {
                    continue;
                }
                if restrict_link_local
                    && st
                        .sides
                        .get(side)
                        .and_then(|side| side.as_ref())
                        .map(|s| !s.opts.link_local_enabled)
                        .unwrap_or(true)
                {
                    continue;
                }
                if !self.route_allowed_locked(&st, Some(exclude), Some(ty), side) {
                    continue;
                }
                // When the item names target senders, only sides hosting one of them qualify,
                // and the endpoint/time-sync checks below are skipped.
                if !target_senders.is_empty() {
                    if !Self::side_matches_target_senders_locked(&st, side, &target_senders, now_ms)
                    {
                        continue;
                    }
                    had_known = true;
                    generic_targets.push(side);
                    continue;
                }
                if preferred_timesync_source.as_deref().is_some_and(|source| {
                    route.reachable_timesync_sources.iter().any(|s| s == source)
                }) {
                    had_exact = true;
                    exact_targets.push(side);
                    continue;
                }
                if eps.iter().copied().any(|ep| route.reachable.contains(&ep)) {
                    had_known = true;
                    generic_targets.push(side);
                }
            }

            // Exact time-sync matches take precedence over all other matches.
            if had_exact {
                #[cfg(feature = "timesync")]
                {
                    exact_targets = Self::filter_timesync_sides_locked(
                        &mut st,
                        ty,
                        self.clock.now_ms(),
                        exact_targets,
                    );
                }
                let targets = self.filter_end_to_end_satisfied_sides_locked(
                    &st,
                    data,
                    exact_targets,
                    &eps,
                    ty,
                )?;
                Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
                    &mut st,
                    Some(exclude),
                    targets,
                    discovered_origin,
                )))
            } else if had_known {
                #[cfg(feature = "timesync")]
                {
                    generic_targets = Self::filter_timesync_sides_locked(
                        &mut st,
                        ty,
                        self.clock.now_ms(),
                        generic_targets,
                    );
                }
                let targets = self.filter_end_to_end_satisfied_sides_locked(
                    &st,
                    data,
                    generic_targets,
                    &eps,
                    ty,
                )?;
                Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
                    &mut st,
                    Some(exclude),
                    targets,
                    discovered_origin,
                )))
            } else {
                // No discovered match: fall back to eligible sides only when an explicit
                // route policy exists for this source.
                if Self::has_explicit_route_policy_locked(&st, Some(exclude), ty) {
                    let mut sides = self.eligible_side_ids_locked(
                        &st,
                        Some(exclude),
                        Some(ty),
                        restrict_link_local,
                    );
                    #[cfg(feature = "timesync")]
                    {
                        sides = Self::filter_timesync_sides_locked(
                            &mut st,
                            ty,
                            self.clock.now_ms(),
                            sides,
                        );
                    }
                    Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
                        &mut st,
                        Some(exclude),
                        sides,
                        RouteSelectionOrigin::Flood,
                    )))
                } else {
                    Ok(RemoteSidePlan::Target(Vec::new()))
                }
            }
        }
        #[cfg(not(feature = "discovery"))]
        {
            let (_, ty) = self.item_route_info(data)?;
            let mut st = self.state.lock();
            // Reliable control packets for a specific packet follow its recorded return route.
            if let Some(packet_id) = Self::reliable_control_target_packet_id(data)? {
                let target_side = self.allowed_target_side_locked(
                    &st,
                    exclude,
                    ty,
                    st.reliable_return_routes
                        .get(&packet_id)
                        .map(|route| route.side),
                );
                if let Some(side) = target_side {
                    return Ok(RemoteSidePlan::Target(vec![side]));
                }
                return Ok(RemoteSidePlan::Target(Vec::new()));
            }
            // Without discovery information every eligible side is a candidate.
            let sides = self.eligible_side_ids_locked(&st, Some(exclude), Some(ty), false);
            Ok(RemoteSidePlan::Target(self.apply_route_selection_locked(
                &mut st,
                Some(exclude),
                sides,
                RouteSelectionOrigin::Flood,
            )))
        }
    }
2287
2288    #[inline]
2289    fn allowed_target_side_locked(
2290        &self,
2291        st: &RelayInner,
2292        exclude: RelaySideId,
2293        ty: crate::DataType,
2294        target_side: Option<RelaySideId>,
2295    ) -> Option<RelaySideId> {
2296        target_side.filter(|side| self.route_allowed_locked(st, Some(exclude), Some(ty), *side))
2297    }
2298
2299    fn filter_end_to_end_satisfied_sides_locked(
2300        &self,
2301        st: &RelayInner,
2302        data: &RelayItem,
2303        sides: Vec<RelaySideId>,
2304        eps: &[crate::DataEndpoint],
2305        ty: crate::DataType,
2306    ) -> TelemetryResult<Vec<RelaySideId>> {
2307        if !is_reliable_type(ty) || Self::reliable_control_target_packet_id(data)?.is_some() {
2308            return Ok(sides);
2309        }
2310        let packet_id = match data {
2311            RelayItem::Packet(pkt) => pkt.packet_id(),
2312            RelayItem::Packed(bytes) => match wire_format::packet_id_from_wire(bytes.as_ref()) {
2313                Ok(packet_id) => packet_id,
2314                Err(TelemetryError::Unpack("reliable control frame")) => return Ok(sides),
2315                Err(err) => return Err(err),
2316            },
2317        };
2318        let Some(acked) = st.end_to_end_acked_destinations.get(&packet_id) else {
2319            return Ok(sides);
2320        };
2321        let now_ms = self.clock.now_ms();
2322        let mut filtered = Vec::new();
2323        for side in sides {
2324            let Some(route) = st.discovery_routes.get(&side) else {
2325                filtered.push(side);
2326                continue;
2327            };
2328            let mut still_pending = false;
2329            let mut had_destination_board = false;
2330            for sender_state in route.announcers.values() {
2331                if now_ms.saturating_sub(sender_state.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2332                    continue;
2333                }
2334                for board in sender_state.topology_boards.iter() {
2335                    if !self.is_end_to_end_destination_sender(&board.sender_id) {
2336                        continue;
2337                    }
2338                    had_destination_board = true;
2339                    let sender_hash = Self::sender_hash(&board.sender_id);
2340                    if acked.contains(&sender_hash) {
2341                        continue;
2342                    }
2343                    if eps
2344                        .iter()
2345                        .copied()
2346                        .any(|ep| board.reachable_endpoints.contains(&ep))
2347                    {
2348                        still_pending = true;
2349                        break;
2350                    }
2351                    // Keep forwarding while any discovered destination sender for this packet
2352                    // remains unacked, even if topology/schema metadata changed for new packets.
2353                    still_pending = true;
2354                    break;
2355                }
2356                if still_pending {
2357                    break;
2358                }
2359            }
2360            if still_pending || !had_destination_board {
2361                filtered.push(side);
2362            }
2363        }
2364        Ok(filtered)
2365    }
2366
2367    #[cfg(feature = "discovery")]
2368    fn side_has_multiple_announcers_locked(
2369        &self,
2370        st: &RelayInner,
2371        side: RelaySideId,
2372        now_ms: u64,
2373    ) -> bool {
2374        st.discovery_routes
2375            .get(&side)
2376            .map(|route| {
2377                route
2378                    .announcers
2379                    .values()
2380                    .filter(|sender| {
2381                        now_ms.saturating_sub(sender.last_seen_ms) <= DISCOVERY_ROUTE_TTL_MS
2382                    })
2383                    .take(2)
2384                    .count()
2385                    > 1
2386            })
2387            .unwrap_or(false)
2388    }
2389
2390    #[cfg(not(feature = "discovery"))]
2391    fn side_has_multiple_announcers_locked(
2392        &self,
2393        _st: &RelayInner,
2394        _side: RelaySideId,
2395        _now_ms: u64,
2396    ) -> bool {
2397        false
2398    }
2399
2400    #[cfg(feature = "discovery")]
2401    fn sender_topology_board_mut<'a>(
2402        sender_state: &'a mut DiscoverySenderState,
2403        sender_id: &str,
2404    ) -> &'a mut TopologyBoardNode {
2405        if let Some(idx) = sender_state
2406            .topology_boards
2407            .iter()
2408            .position(|board| board.sender_id == sender_id)
2409        {
2410            return &mut sender_state.topology_boards[idx];
2411        }
2412        sender_state.topology_boards.push(TopologyBoardNode {
2413            sender_id: sender_id.to_string(),
2414            reachable_endpoints: Vec::new(),
2415            reachable_timesync_sources: Vec::new(),
2416            connections: Vec::new(),
2417        });
2418        sender_state
2419            .topology_boards
2420            .last_mut()
2421            .expect("board inserted above")
2422    }
2423
2424    #[cfg(feature = "discovery")]
2425    fn refresh_sender_topology_state(sender_state: &mut DiscoverySenderState) {
2426        discovery::normalize_topology_boards(&mut sender_state.topology_boards);
2427        let (reachable, reachable_timesync_sources) =
2428            discovery::summarize_topology_boards(&sender_state.topology_boards);
2429        sender_state.reachable = reachable;
2430        sender_state.reachable_timesync_sources = reachable_timesync_sources;
2431    }
2432
2433    #[cfg(feature = "discovery")]
2434    fn recompute_discovery_side_state(route: &mut DiscoverySideState) {
2435        let mut reachable = Vec::new();
2436        let mut reachable_timesync_sources = Vec::new();
2437        let mut last_seen_ms = 0u64;
2438        for sender in route.announcers.values() {
2439            reachable.extend(sender.reachable.iter().copied());
2440            reachable_timesync_sources.extend(sender.reachable_timesync_sources.iter().cloned());
2441            last_seen_ms = last_seen_ms.max(sender.last_seen_ms);
2442        }
2443        reachable.sort_unstable();
2444        reachable.dedup();
2445        reachable_timesync_sources.sort_unstable();
2446        reachable_timesync_sources.dedup();
2447        route.reachable = reachable;
2448        route.reachable_timesync_sources = reachable_timesync_sources;
2449        route.last_seen_ms = last_seen_ms;
2450    }
2451
2452    #[cfg(feature = "discovery")]
2453    fn local_discovery_topology_board(&self, st: &RelayInner, now_ms: u64) -> TopologyBoardNode {
2454        let mut connections = Vec::new();
2455        for route in st.discovery_routes.values() {
2456            if now_ms.saturating_sub(route.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2457                continue;
2458            }
2459            for (sender, sender_state) in route.announcers.iter() {
2460                if now_ms.saturating_sub(sender_state.last_seen_ms) <= DISCOVERY_ROUTE_TTL_MS {
2461                    connections.push(sender.clone());
2462                }
2463            }
2464        }
2465        connections.sort_unstable();
2466        connections.dedup();
2467        let sender = self.sender_arc();
2468        TopologyBoardNode {
2469            sender_id: sender.to_string(),
2470            reachable_endpoints: Vec::new(),
2471            reachable_timesync_sources: Vec::new(),
2472            connections,
2473        }
2474    }
2475
2476    #[cfg(feature = "discovery")]
2477    fn advertised_discovery_topology_for_link_locked(
2478        &self,
2479        st: &RelayInner,
2480        now_ms: u64,
2481        link_local_enabled: bool,
2482    ) -> Vec<TopologyBoardNode> {
2483        let mut boards = vec![self.local_discovery_topology_board(st, now_ms)];
2484        for route in st.discovery_routes.values() {
2485            if now_ms.saturating_sub(route.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2486                continue;
2487            }
2488            for (announcer, sender_state) in route.announcers.iter() {
2489                if now_ms.saturating_sub(sender_state.last_seen_ms) > DISCOVERY_ROUTE_TTL_MS {
2490                    continue;
2491                }
2492                let mut sender_boards = sender_state.topology_boards.clone();
2493                if sender_boards.is_empty() {
2494                    let sender = self.sender_arc();
2495                    sender_boards.push(TopologyBoardNode {
2496                        sender_id: announcer.clone(),
2497                        reachable_endpoints: sender_state.reachable.clone(),
2498                        reachable_timesync_sources: sender_state.reachable_timesync_sources.clone(),
2499                        connections: vec![sender.to_string()],
2500                    });
2501                } else if let Some(board) = sender_boards
2502                    .iter_mut()
2503                    .find(|board| board.sender_id == *announcer)
2504                {
2505                    board.connections.push(self.sender_arc().to_string());
2506                }
2507                if !link_local_enabled {
2508                    for board in sender_boards.iter_mut() {
2509                        board
2510                            .reachable_endpoints
2511                            .retain(|ep| !ep.is_link_local_only());
2512                    }
2513                }
2514                discovery::merge_topology_boards(&mut boards, &sender_boards);
2515            }
2516        }
2517        discovery::normalize_topology_boards(&mut boards);
2518        boards
2519    }
2520
2521    #[cfg(feature = "discovery")]
2522    fn note_discovery_topology_change_locked(st: &mut RelayInner, now_ms: u64) {
2523        st.discovery_cadence.on_topology_change(now_ms);
2524    }
2525
2526    #[cfg(feature = "discovery")]
2527    fn prune_discovery_routes_locked(st: &mut RelayInner, now_ms: u64) -> bool {
2528        let before = st.discovery_routes.clone();
2529        st.discovery_routes.retain(|_, route| {
2530            route.announcers.retain(|_, sender| {
2531                now_ms.saturating_sub(sender.last_seen_ms) <= DISCOVERY_ROUTE_TTL_MS
2532            });
2533            Self::recompute_discovery_side_state(route);
2534            !route.announcers.is_empty()
2535        });
2536        st.discovery_routes != before
2537    }
2538
2539    #[cfg(feature = "discovery")]
2540    fn reconcile_end_to_end_acked_destinations_locked(&self, st: &mut RelayInner) {
2541        let mut active_senders = BTreeSet::new();
2542        for route in st.discovery_routes.values() {
2543            for sender_state in route.announcers.values() {
2544                for board in sender_state.topology_boards.iter() {
2545                    if self.is_end_to_end_destination_sender(&board.sender_id) {
2546                        active_senders.insert(Self::sender_hash(&board.sender_id));
2547                    }
2548                }
2549            }
2550        }
2551        st.end_to_end_acked_destinations.retain(|_, acked| {
2552            acked.retain(|sender_hash| active_senders.contains(sender_hash));
2553            !acked.is_empty()
2554        });
2555    }
2556
2557    #[cfg(feature = "discovery")]
2558    fn advertised_discovery_endpoints_for_link_locked(
2559        &self,
2560        st: &RelayInner,
2561        now_ms: u64,
2562        link_local_enabled: bool,
2563    ) -> Vec<crate::DataEndpoint> {
2564        let (reachable_endpoints, _) = discovery::summarize_topology_boards(
2565            &self.advertised_discovery_topology_for_link_locked(st, now_ms, link_local_enabled),
2566        );
2567        reachable_endpoints
2568            .into_iter()
2569            .filter(|ep| {
2570                !discovery::is_discovery_endpoint(*ep)
2571                    && (link_local_enabled || !ep.is_link_local_only())
2572            })
2573            .collect()
2574    }
2575
2576    #[cfg(feature = "discovery")]
2577    fn advertised_discovery_timesync_sources_for_link_locked(
2578        &self,
2579        st: &RelayInner,
2580        now_ms: u64,
2581    ) -> Vec<String> {
2582        let (_, sources) = discovery::summarize_topology_boards(
2583            &self.advertised_discovery_topology_for_link_locked(st, now_ms, true),
2584        );
2585        sources
2586    }
2587
2588    #[cfg(feature = "discovery")]
2589    #[cfg(feature = "timesync")]
2590    fn preferred_timesync_route_source(
2591        &self,
2592        data: &RelayItem,
2593        ty: crate::DataType,
2594    ) -> TelemetryResult<Option<String>> {
2595        if !matches!(
2596            ty,
2597            crate::DataType::TimeSyncAnnounce | crate::DataType::TimeSyncResponse
2598        ) {
2599            return Ok(None);
2600        }
2601
2602        let sender = match data {
2603            RelayItem::Packet(pkt) => pkt.sender().to_owned(),
2604            RelayItem::Packed(bytes) => {
2605                if wire_format::peek_frame_info(bytes.as_ref())
2606                    .ok()
2607                    .is_some_and(|frame| frame.ack_only())
2608                {
2609                    return Ok(None);
2610                }
2611                wire_format::unpack_packet(bytes.as_ref())?
2612                    .sender()
2613                    .to_owned()
2614            }
2615        };
2616        Ok(Some(sender))
2617    }
2618
2619    #[cfg(feature = "discovery")]
2620    #[inline]
2621    fn side_is_slow_control_link_locked(
2622        st: &RelayInner,
2623        side_id: RelaySideId,
2624        now_ms: u64,
2625    ) -> bool {
2626        st.adaptive_route_stats.get(&side_id).is_some_and(|stats| {
2627            let recent_slow = stats.last_slow_observed_ms > 0
2628                && now_ms.saturating_sub(stats.last_slow_observed_ms)
2629                    <= DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS;
2630            stats.sample_count > 0
2631                && ((stats.estimated_bandwidth_bps > 0
2632                    && stats.estimated_bandwidth_bps <= CONTROL_SLOW_LINK_CAPACITY_BPS)
2633                    || recent_slow)
2634        })
2635    }
2636
2637    #[cfg(feature = "discovery")]
2638    fn discovery_level_for_side_locked(
2639        st: &mut RelayInner,
2640        side_id: RelaySideId,
2641        now_ms: u64,
2642    ) -> Option<DiscoveryAdvertiseLevel> {
2643        if !Self::side_is_slow_control_link_locked(st, side_id, now_ms) {
2644            st.discovery_side_throttle.remove(&side_id);
2645            return Some(DiscoveryAdvertiseLevel::Full);
2646        }
2647
2648        let throttle = st.discovery_side_throttle.entry(side_id).or_default();
2649        if now_ms >= throttle.next_full_ms {
2650            throttle.next_full_ms = now_ms.saturating_add(DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS);
2651            throttle.next_ping_ms = now_ms.saturating_add(DISCOVERY_SLOW_LINK_PING_INTERVAL_MS);
2652            return Some(DiscoveryAdvertiseLevel::Full);
2653        }
2654        if now_ms >= throttle.next_ping_ms {
2655            throttle.next_ping_ms = now_ms.saturating_add(DISCOVERY_SLOW_LINK_PING_INTERVAL_MS);
2656            return Some(DiscoveryAdvertiseLevel::MinimalPing);
2657        }
2658        None
2659    }
2660
2661    #[cfg(all(feature = "discovery", feature = "timesync"))]
2662    #[inline]
2663    fn is_timesync_type(ty: crate::DataType) -> bool {
2664        matches!(
2665            ty,
2666            crate::DataType::TimeSyncAnnounce
2667                | crate::DataType::TimeSyncRequest
2668                | crate::DataType::TimeSyncResponse
2669        )
2670    }
2671
2672    #[cfg(all(feature = "discovery", feature = "timesync"))]
2673    fn timesync_allowed_for_side_locked(
2674        st: &mut RelayInner,
2675        side_id: RelaySideId,
2676        ty: crate::DataType,
2677        now_ms: u64,
2678    ) -> bool {
2679        if !Self::is_timesync_type(ty) {
2680            return true;
2681        }
2682        if !Self::side_is_slow_control_link_locked(st, side_id, now_ms) {
2683            st.timesync_side_throttle.remove(&side_id);
2684            return true;
2685        }
2686
2687        let throttle = st.timesync_side_throttle.entry(side_id).or_default();
2688        if now_ms >= throttle.next_allowed_ms {
2689            throttle.next_allowed_ms = now_ms.saturating_add(TIMESYNC_SLOW_LINK_MIN_INTERVAL_MS);
2690            return true;
2691        }
2692        false
2693    }
2694
2695    #[cfg(all(feature = "discovery", feature = "timesync"))]
2696    fn filter_timesync_sides_locked(
2697        st: &mut RelayInner,
2698        ty: crate::DataType,
2699        now_ms: u64,
2700        sides: Vec<RelaySideId>,
2701    ) -> Vec<RelaySideId> {
2702        sides
2703            .into_iter()
2704            .filter(|side| Self::timesync_allowed_for_side_locked(st, *side, ty, now_ms))
2705            .collect()
2706    }
2707
    /// Build and enqueue discovery advertisements for every eligible side.
    ///
    /// Runs in two phases.
    ///
    /// Phase 1 runs under one lock acquisition:
    /// - prune expired discovery routes (reconciling the end-to-end ACK cache and noting a
    ///   topology change if anything was removed);
    /// - fit the discovery budget;
    /// - return early if no side is registered;
    /// - record the announce on the cadence;
    /// - plan one advertisement per registered side that `route_allowed_locked` permits for
    ///   `DiscoveryAnnounce` and for which `discovery_level_for_side_locked` yields a level.
    ///
    /// Phase 2 re-acquires the lock to build and push the packets.
    ///
    /// A `Full` advertisement enqueues, in this order:
    /// 1. the schema;
    /// 2. the link capabilities;
    /// 3. an announce (only if the endpoint list is non-empty);
    /// 4. the time-sync sources (only if non-empty);
    /// 5. the topology (only if non-empty).
    ///
    /// A `MinimalPing` enqueues a single announce with an empty endpoint list.
    ///
    /// # Errors
    /// Returns the first error from a packet builder, `relay_item_priority`, or `push_tx`.
    /// Items already pushed before the failure are not rolled back.
    #[cfg(feature = "discovery")]
    fn queue_discovery_announce(&self) -> TelemetryResult<()> {
        let now_ms = self.clock.now_ms();
        // Phase 1: plan the per-side advertisements while holding the lock.
        let per_side = {
            let mut st = self.state.lock();
            if Self::prune_discovery_routes_locked(&mut st, now_ms) {
                self.reconcile_end_to_end_acked_destinations_locked(&mut st);
                Self::note_discovery_topology_change_locked(&mut st, now_ms);
            }
            st.fit_discovery_budget();
            if !st.sides.iter().any(|side| side.is_some()) {
                return Ok(());
            }
            st.discovery_cadence.on_announce_sent(now_ms);
            // Snapshot the registered sides up front: the loop below needs `&mut st`, which
            // cannot coexist with an iterator borrowing `st.sides`.
            let side_entries = st
                .sides
                .iter()
                .enumerate()
                .filter_map(|(side_id, side)| {
                    side.as_ref()
                        .map(|side| (side_id, side.opts.link_local_enabled, side.opts))
                })
                .collect::<Vec<_>>();
            let mut per_side = Vec::new();
            for (side_id, link_local_enabled, opts) in side_entries {
                // Skip sides whose routing policy rejects DiscoveryAnnounce traffic.
                if !self.route_allowed_locked(
                    &st,
                    None,
                    Some(crate::DataType::DiscoveryAnnounce),
                    side_id,
                ) {
                    continue;
                }
                // `None` means the slow-link throttle has nothing due on this side yet.
                let Some(level) = Self::discovery_level_for_side_locked(&mut st, side_id, now_ms)
                else {
                    continue;
                };
                let capabilities = opts.link_capabilities();
                // A minimal ping carries no endpoints, time-sync sources or topology.
                if level == DiscoveryAdvertiseLevel::MinimalPing {
                    per_side.push((
                        side_id,
                        level,
                        Vec::new(),
                        Vec::new(),
                        Vec::new(),
                        capabilities,
                    ));
                    continue;
                }
                // NOTE(review): each of the three calls below rebuilds the advertised topology
                // (the endpoint and time-sync helpers both call
                // `advertised_discovery_topology_for_link_locked`). That is redundant work
                // under the lock and a candidate for reuse.
                let endpoints = self.advertised_discovery_endpoints_for_link_locked(
                    &st,
                    now_ms,
                    link_local_enabled,
                );
                let timesync_sources =
                    self.advertised_discovery_timesync_sources_for_link_locked(&st, now_ms);
                let topology = self.advertised_discovery_topology_for_link_locked(
                    &st,
                    now_ms,
                    link_local_enabled,
                );
                per_side.push((
                    side_id,
                    level,
                    endpoints,
                    timesync_sources,
                    topology,
                    capabilities,
                ));
            }
            per_side
        };
        // Phase 2: the planning guard was dropped at the end of the block above; re-lock
        // to build and enqueue the packets.
        let mut st = self.state.lock();
        for (dst, level, endpoints, timesync_sources, topology, capabilities) in per_side {
            let sender = self.sender_arc();
            // Full only: schema.
            if level == DiscoveryAdvertiseLevel::Full {
                let pkt = discovery::build_discovery_schema(sender.as_ref(), now_ms)?;
                let data = RelayItem::Packet(Arc::new(pkt));
                let priority = Self::relay_item_priority(&data)?;
                st.push_tx(RelayTxItem {
                    src: None,
                    dst,
                    data,
                    priority,
                })?;
            }
            // Full only: link capabilities of this side.
            if level == DiscoveryAdvertiseLevel::Full {
                let pkt = discovery::build_discovery_link_capabilities(
                    sender.as_ref(),
                    now_ms,
                    capabilities,
                )?;
                let data = RelayItem::Packet(Arc::new(pkt));
                let priority = Self::relay_item_priority(&data)?;
                st.push_tx(RelayTxItem {
                    src: None,
                    dst,
                    data,
                    priority,
                })?;
            }
            // Announce: always for a ping (with no endpoints), otherwise only when there is
            // at least one endpoint to advertise.
            if level == DiscoveryAdvertiseLevel::MinimalPing || !endpoints.is_empty() {
                let pkt = discovery::build_discovery_announce(
                    sender.as_ref(),
                    now_ms,
                    endpoints.as_slice(),
                )?;
                let data = RelayItem::Packet(Arc::new(pkt));
                let priority = Self::relay_item_priority(&data)?;
                st.push_tx(RelayTxItem {
                    src: None,
                    dst,
                    data,
                    priority,
                })?;
            }
            // Full only: time-sync sources, when any are known.
            if level == DiscoveryAdvertiseLevel::Full && !timesync_sources.is_empty() {
                let pkt = discovery::build_discovery_timesync_sources(
                    sender.as_ref(),
                    now_ms,
                    timesync_sources.as_slice(),
                )?;
                let data = RelayItem::Packet(Arc::new(pkt));
                let priority = Self::relay_item_priority(&data)?;
                st.push_tx(RelayTxItem {
                    src: None,
                    dst,
                    data,
                    priority,
                })?;
            }
            // Full only: topology boards, when non-empty.
            if level == DiscoveryAdvertiseLevel::Full && !topology.is_empty() {
                let pkt = discovery::build_discovery_topology(sender.as_ref(), now_ms, &topology)?;
                let data = RelayItem::Packet(Arc::new(pkt));
                let priority = Self::relay_item_priority(&data)?;
                st.push_tx(RelayTxItem {
                    src: None,
                    dst,
                    data,
                    priority,
                })?;
            }
        }
        Ok(())
    }
2853
2854    #[cfg(feature = "discovery")]
2855    fn poll_discovery_announce(&self) -> TelemetryResult<bool> {
2856        let now_ms = self.clock.now_ms();
2857        let due = {
2858            let mut st = self.state.lock();
2859            let removed = Self::prune_discovery_routes_locked(&mut st, now_ms);
2860            if removed {
2861                self.reconcile_end_to_end_acked_destinations_locked(&mut st);
2862                Self::note_discovery_topology_change_locked(&mut st, now_ms);
2863            }
2864            st.fit_discovery_budget();
2865            let has_any = st.sides.iter().enumerate().any(|(side_id, side)| {
2866                let Some(side) = side.as_ref() else {
2867                    return false;
2868                };
2869                if !self.route_allowed_locked(
2870                    &st,
2871                    None,
2872                    Some(crate::DataType::DiscoveryAnnounce),
2873                    side_id,
2874                ) {
2875                    return false;
2876                }
2877                let _ = side;
2878                true
2879            });
2880            if !st.sides.iter().any(|side| side.is_some()) || !has_any {
2881                return Ok(false);
2882            }
2883            st.discovery_cadence.due(now_ms)
2884        };
2885        if !due {
2886            return Ok(false);
2887        }
2888        self.queue_discovery_announce()?;
2889        Ok(true)
2890    }
2891
2892    #[cfg(feature = "discovery")]
2893    fn learn_discovery_item(&self, src: RelaySideId, data: &RelayItem) -> TelemetryResult<()> {
2894        let pkt = match data {
2895            RelayItem::Packet(pkt) => {
2896                if !discovery::is_discovery_type(pkt.data_type()) {
2897                    return Ok(());
2898                }
2899                pkt.as_ref().clone()
2900            }
2901            RelayItem::Packed(bytes) => {
2902                let env = wire_format::peek_envelope(bytes.as_ref())?;
2903                if !discovery::is_discovery_type(env.ty) {
2904                    return Ok(());
2905                }
2906                if wire_format::peek_frame_info(bytes.as_ref())
2907                    .ok()
2908                    .is_some_and(|frame| frame.ack_only())
2909                {
2910                    return Ok(());
2911                }
2912                wire_format::unpack_packet(bytes.as_ref())?
2913            }
2914        };
2915
2916        let now_ms = self.clock.now_ms();
2917        if pkt.data_type() == crate::DataType::DiscoverySchema {
2918            let snapshot = discovery::decode_discovery_schema(&pkt)?;
2919            let incoming_cost = crate::config::owned_schema_byte_cost(&snapshot);
2920            let mut st = self.state.lock();
2921            st.make_shared_queue_room(incoming_cost, RelayQueueKind::Discovery)?;
2922            drop(st);
2923            let report =
2924                crate::config::merge_owned_schema_snapshot_with_budget(snapshot, MAX_QUEUE_BUDGET)?;
2925            if report.changed() {
2926                let mut st = self.state.lock();
2927                st.fit_discovery_budget();
2928                Self::note_discovery_topology_change_locked(&mut st, now_ms);
2929            }
2930            return Ok(());
2931        }
2932        if pkt.data_type() == crate::DataType::DiscoveryLinkCapabilities {
2933            let _ = discovery::decode_discovery_link_capabilities(&pkt)?;
2934            return Ok(());
2935        }
2936        let mut st = self.state.lock();
2937        if pkt.data_type() == crate::DataType::DiscoveryLeave {
2938            let leaving = pkt.sender();
2939            let before = st.discovery_routes.clone();
2940            for route in st.discovery_routes.values_mut() {
2941                route.announcers.remove(leaving);
2942                for sender_state in route.announcers.values_mut() {
2943                    sender_state
2944                        .topology_boards
2945                        .retain(|board| board.sender_id != leaving);
2946                    for board in sender_state.topology_boards.iter_mut() {
2947                        board.connections.retain(|peer| peer != leaving);
2948                    }
2949                    Self::refresh_sender_topology_state(sender_state);
2950                }
2951                Self::recompute_discovery_side_state(route);
2952            }
2953            st.discovery_routes
2954                .retain(|_, route| !route.announcers.is_empty());
2955            if st.discovery_routes != before {
2956                Self::note_discovery_topology_change_locked(&mut st, now_ms);
2957            }
2958            let _ = Self::prune_discovery_routes_locked(&mut st, now_ms);
2959            self.reconcile_end_to_end_acked_destinations_locked(&mut st);
2960            return Ok(());
2961        }
2962        let address_ad = if pkt.data_type() == crate::DataType::DiscoveryAddress {
2963            Some(discovery::decode_discovery_address(&pkt)?)
2964        } else {
2965            None
2966        };
2967        let announcer_id = address_ad
2968            .as_ref()
2969            .map(|ad| ad.hostname.clone())
2970            .unwrap_or_else(|| pkt.sender().to_string());
2971        let mut route = st.discovery_routes.get(&src).cloned().unwrap_or_default();
2972        let side_link_local_enabled = st
2973            .sides
2974            .get(src)
2975            .and_then(|entry| entry.as_ref())
2976            .map(|side_ref| side_ref.opts.link_local_enabled)
2977            .unwrap_or(false);
2978        let mut sender_state = route
2979            .announcers
2980            .get(&announcer_id)
2981            .cloned()
2982            .unwrap_or_default();
2983        let changed = match pkt.data_type() {
2984            crate::DataType::DiscoveryAddress => {
2985                let ad = address_ad.expect("decoded above");
2986                let mut reachable = ad.reachable_endpoints;
2987                if !side_link_local_enabled {
2988                    reachable.retain(|ep| !ep.is_link_local_only());
2989                }
2990                let board = Self::sender_topology_board_mut(&mut sender_state, &ad.hostname);
2991                let changed = board.reachable_endpoints != reachable
2992                    || board.reachable_timesync_sources != ad.reachable_timesync_sources;
2993                board.reachable_endpoints = reachable;
2994                board.reachable_timesync_sources = ad.reachable_timesync_sources;
2995                Self::refresh_sender_topology_state(&mut sender_state);
2996                changed
2997            }
2998            crate::DataType::DiscoveryAnnounce => {
2999                let mut reachable = discovery::decode_discovery_announce(&pkt)?;
3000                if !side_link_local_enabled {
3001                    reachable.retain(|ep| !ep.is_link_local_only());
3002                }
3003                let board = Self::sender_topology_board_mut(&mut sender_state, pkt.sender());
3004                let changed = board.reachable_endpoints != reachable;
3005                board.reachable_endpoints = reachable;
3006                Self::refresh_sender_topology_state(&mut sender_state);
3007                changed
3008            }
3009            crate::DataType::DiscoveryTimeSyncSources => {
3010                let sources = discovery::decode_discovery_timesync_sources(&pkt)?;
3011                let board = Self::sender_topology_board_mut(&mut sender_state, pkt.sender());
3012                let changed = board.reachable_timesync_sources != sources;
3013                board.reachable_timesync_sources = sources;
3014                Self::refresh_sender_topology_state(&mut sender_state);
3015                changed
3016            }
3017            crate::DataType::DiscoveryTopology => {
3018                let mut boards = discovery::decode_discovery_topology(&pkt)?;
3019                if !side_link_local_enabled {
3020                    for board in boards.iter_mut() {
3021                        board
3022                            .reachable_endpoints
3023                            .retain(|ep| !ep.is_link_local_only());
3024                    }
3025                }
3026                let changed = sender_state.topology_boards != boards;
3027                sender_state.topology_boards = boards;
3028                Self::refresh_sender_topology_state(&mut sender_state);
3029                changed
3030            }
3031            crate::DataType::DiscoverySchema => false,
3032            _ => false,
3033        };
3034        sender_state.last_seen_ms = now_ms;
3035        route.announcers.insert(announcer_id, sender_state);
3036        Self::recompute_discovery_side_state(&mut route);
3037        st.discovery_routes.insert(src, route);
3038        st.fit_discovery_budget();
3039        if changed {
3040            Self::note_discovery_topology_change_locked(&mut st, now_ms);
3041        }
3042        let _ = Self::prune_discovery_routes_locked(&mut st, now_ms);
3043        self.reconcile_end_to_end_acked_destinations_locked(&mut st);
3044        Ok(())
3045    }
3046
3047    #[cfg(not(feature = "discovery"))]
3048    fn learn_discovery_item(&self, _src: RelaySideId, _data: &RelayItem) -> TelemetryResult<()> {
3049        Ok(())
3050    }
3051
3052    #[cfg(not(feature = "discovery"))]
3053    fn queue_discovery_announce(&self) -> TelemetryResult<()> {
3054        Ok(())
3055    }
3056
3057    #[cfg(not(feature = "discovery"))]
3058    fn poll_discovery_announce(&self) -> TelemetryResult<bool> {
3059        Ok(false)
3060    }
3061
    /// Retransmit or give up on per-link reliable frames whose retransmit timer elapsed.
    ///
    /// Under the state lock, every `(side, type)` stream in `reliable_tx` is scanned in
    /// `sent_order`. An entry is skipped when any of these holds:
    /// - it is still queued;
    /// - fewer than `RELIABLE_RETRANSMIT_MS` have passed since `last_send_ms`;
    /// - it is partially acknowledged.
    ///
    /// Entries that already reached `RELIABLE_MAX_RETRIES` are forgotten. The rest have
    /// their retry count bumped and are collected. The collected retransmits are queued via
    /// `queue_reliable_retransmit` after the lock is released.
    ///
    /// # Errors
    /// Returns the first error from `queue_reliable_retransmit`. Later retransmits in the
    /// batch are not queued, although their retry counters were already incremented.
    fn process_reliable_timeouts(&self) -> TelemetryResult<()> {
        let now = self.clock.now_ms();
        // (side, type, seq) triples to retransmit once the lock is released.
        let mut requeue: Vec<(RelaySideId, crate::DataType, u32)> = Vec::new();

        {
            let mut st = self.state.lock();
            if st.reliable_tx.is_empty() {
                return Ok(());
            }

            for ((side, ty_u32), tx_state) in st.reliable_tx.iter_mut() {
                // Streams whose stored type id no longer maps to a DataType are skipped.
                let Some(ty) = crate::DataType::try_from_u32(*ty_u32) else {
                    continue;
                };
                // Iterate over a snapshot so entries can be removed from `sent_order` below.
                let sent_order: Vec<u32> = tx_state.sent_order.iter().copied().collect();
                for seq in sent_order {
                    let Some(sent) = tx_state.sent.get_mut(&seq) else {
                        continue;
                    };
                    // NOTE(review): this uses `wrapping_sub`. A `last_send_ms` later than `now`
                    // therefore reads as a huge elapsed time and retransmits immediately,
                    // whereas the discovery timers in this file use `saturating_sub`.
                    // Confirm which behavior is intended.
                    if sent.queued || now.wrapping_sub(sent.last_send_ms) < RELIABLE_RETRANSMIT_MS {
                        continue;
                    }
                    if sent.partial_acked {
                        continue;
                    }
                    // Retry budget exhausted: stop tracking this frame.
                    if sent.retries >= RELIABLE_MAX_RETRIES {
                        tx_state.sent.remove(&seq);
                        tx_state.sent_order.retain(|existing| *existing != seq);
                        continue;
                    }
                    sent.retries += 1;
                    requeue.push((*side, ty, seq));
                }
            }
        }

        // The state guard is dropped at the end of the block above; retransmit outside it.
        for (side, ty, seq) in requeue {
            self.queue_reliable_retransmit(side, ty, seq)?;
        }

        Ok(())
    }
3104
3105    /// Compute a de-dupe hash for a QueueItem.
3106    /// Uses packet ID for Packet items, and attempts to extract packet ID from
3107    /// packed bytes. If extraction fails, hashes raw bytes as a fallback.
3108    fn get_hash(item: &RelayRxItem) -> u64 {
3109        match &item.data {
3110            RelayItem::Packet(pkt) => pkt.packet_id(),
3111            RelayItem::Packed(bytes) => {
3112                let reliable_seq = wire_format::peek_frame_info(bytes.as_ref())
3113                    .ok()
3114                    .and_then(|frame| frame.reliable)
3115                    .and_then(|hdr| {
3116                        if (hdr.flags & wire_format::RELIABLE_FLAG_ACK_ONLY) != 0 {
3117                            None
3118                        } else {
3119                            Some(hdr.seq)
3120                        }
3121                    });
3122
3123                match wire_format::packet_id_from_wire(bytes.as_ref()) {
3124                    Ok(id) => {
3125                        if let Some(seq) = reliable_seq {
3126                            hash_bytes_u64(id, &seq.to_le_bytes())
3127                        } else {
3128                            id
3129                        }
3130                    }
3131                    Err(_e) => {
3132                        // Fallback: if bytes are malformed (or compression feature mismatch),
3133                        // hash raw bytes so we can still dedupe identical network duplicates.
3134                        let h: u64 = 0x9E37_79B9_7F4A_7C15;
3135                        hash_bytes_u64(h, bytes.as_ref())
3136                    }
3137                }
3138            }
3139        }
3140    }
3141
3142    /// Compute a dedupe ID for an incoming RelayRxItem.
3143    /// Note: we intentionally do *not* include `src` so that the same
3144    /// packet coming from multiple sides is only processed once.
3145    fn is_duplicate_pkt(&self, item: &RelayRxItem) -> TelemetryResult<bool> {
3146        let id = Self::get_hash(item);
3147
3148        let mut st = self.state.lock();
3149        if st.recent_rx.contains(&id) {
3150            Ok(true)
3151        } else {
3152            st.push_recent_rx(id)?;
3153            Ok(false)
3154        }
3155    }
3156
3157    fn should_forward_duplicate_reliable_item(&self, item: &RelayRxItem) -> TelemetryResult<bool> {
3158        let (_, ty) = self.item_route_info(&item.data)?;
3159        if !is_reliable_type(ty)
3160            || matches!(
3161                ty,
3162                crate::DataType::ReliableAck
3163                    | crate::DataType::ReliablePartialAck
3164                    | crate::DataType::ReliablePacketRequest
3165            )
3166        {
3167            return Ok(false);
3168        }
3169
3170        let RemoteSidePlan::Target(sides) = self.remote_side_plan(&item.data, item.src)?;
3171        let st = self.state.lock();
3172        let now_ms = self.clock.now_ms();
3173        Ok(sides
3174            .into_iter()
3175            .any(|side| self.side_has_multiple_announcers_locked(&st, side, now_ms)))
3176    }
3177
3178    /// Register a side whose TX callback consumes packed packet bytes.
3179    ///
3180    /// Returns the side id later used for ingress APIs such as `rx_packed_from_side`.
3181    /// The default options disable the relay's per-link reliable framing on this side.
3182    pub fn add_side_packed<F>(&self, name: &'static str, tx: F) -> RelaySideId
3183    where
3184        F: Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static,
3185    {
3186        self.add_side_packed_with_options(name, tx, RelaySideOptions::default())
3187    }
3188
3189    /// Register a packed side with bounded-frame transport enabled.
3190    ///
3191    /// `max_frame_bytes == 0` leaves frames unbounded.
3192    pub fn add_side_packed_small_packets<F>(
3193        &self,
3194        name: &'static str,
3195        tx: F,
3196        max_frame_bytes: usize,
3197    ) -> RelaySideId
3198    where
3199        F: Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static,
3200    {
3201        self.add_side_packed_with_options(
3202            name,
3203            tx,
3204            RelaySideOptions::default().with_small_packet_transport(max_frame_bytes),
3205        )
3206    }
3207
3208    /// Register a packed-output side with explicit side options.
3209    ///
3210    /// `opts.reliable_enabled` enables relay-managed per-hop ACK/retransmit behavior on this side.
3211    /// `opts.link_local_enabled` gates link-local-only forwarding and discovery use of this side.
3212    /// `ingress_enabled` and `egress_enabled` set the initial directional policy.
3213    pub fn add_side_packed_with_options<F>(
3214        &self,
3215        name: &'static str,
3216        tx: F,
3217        opts: RelaySideOptions,
3218    ) -> RelaySideId
3219    where
3220        F: Fn(&[u8]) -> TelemetryResult<()> + Send + Sync + 'static,
3221    {
3222        let mut st = self.state.lock();
3223        let id = st.sides.len();
3224        st.sides.push(Some(RelaySide {
3225            name,
3226            tx_handler: RelayTxHandlerFn::Packed(Arc::new(tx)),
3227            opts,
3228        }));
3229        st.side_runtime_stats
3230            .insert(id, SideRuntimeStatsInner::default());
3231        st.side_transport.insert(id, SideTransportState::default());
3232        #[cfg(feature = "discovery")]
3233        Self::note_discovery_topology_change_locked(&mut st, self.clock.now_ms());
3234        id
3235    }
3236
3237    /// Register a side whose TX callback receives decoded [`Packet`] values.
3238    ///
3239    /// Packet-output sides do not preserve the relay's packed reliable hop framing, so use a
3240    /// packed side when this hop should participate in relay-managed per-link reliability.
3241    pub fn add_side_packet<F>(&self, name: &'static str, tx: F) -> RelaySideId
3242    where
3243        F: Fn(&Packet) -> TelemetryResult<()> + Send + Sync + 'static,
3244    {
3245        self.add_side_packet_with_options(name, tx, RelaySideOptions::default())
3246    }
3247
3248    /// Register a packet-output side with explicit side options.
3249    pub fn add_side_packet_with_options<F>(
3250        &self,
3251        name: &'static str,
3252        tx: F,
3253        opts: RelaySideOptions,
3254    ) -> RelaySideId
3255    where
3256        F: Fn(&Packet) -> TelemetryResult<()> + Send + Sync + 'static,
3257    {
3258        let mut st = self.state.lock();
3259        let id = st.sides.len();
3260        st.sides.push(Some(RelaySide {
3261            name,
3262            tx_handler: RelayTxHandlerFn::Packet(Arc::new(tx)),
3263            opts,
3264        }));
3265        st.side_runtime_stats
3266            .insert(id, SideRuntimeStatsInner::default());
3267        st.side_transport.insert(id, SideTransportState::default());
3268        #[cfg(feature = "discovery")]
3269        Self::note_discovery_topology_change_locked(&mut st, self.clock.now_ms());
3270        id
3271    }
3272
3273    /// Remove a side while keeping existing side IDs stable.
3274    ///
3275    /// `side` must be an id returned by one of the `add_side_*` calls. Remaining side ids are not
3276    /// renumbered.
3277    pub fn remove_side(&self, side: RelaySideId) -> TelemetryResult<()> {
3278        let now_ms = self.clock.now_ms();
3279        let mut st = self.state.lock();
3280        let slot = st.sides.get_mut(side).ok_or(TelemetryError::BadArg)?;
3281        if slot.is_none() {
3282            return Err(TelemetryError::BadArg);
3283        }
3284        *slot = None;
3285        st.route_overrides
3286            .retain(|(src_side, dst_side), _| *src_side != Some(side) && *dst_side != side);
3287        st.typed_route_overrides
3288            .retain(|(src_side, _, dst_side), _| *src_side != Some(side) && *dst_side != side);
3289        st.route_weights
3290            .retain(|(src_side, dst_side), _| *src_side != Some(side) && *dst_side != side);
3291        st.route_priorities
3292            .retain(|(src_side, dst_side), _| *src_side != Some(side) && *dst_side != side);
3293        st.source_route_modes.remove(&Some(side));
3294        st.route_selection_cursors.remove(&Some(side));
3295        st.adaptive_route_stats.remove(&side);
3296        #[cfg(feature = "discovery")]
3297        st.discovery_side_throttle.remove(&side);
3298        #[cfg(all(feature = "discovery", feature = "timesync"))]
3299        st.timesync_side_throttle.remove(&side);
3300        st.side_runtime_stats.remove(&side);
3301        st.reliable_return_routes
3302            .retain(|_, route| route.side != side);
3303        st.rx_queue.retain(|queued| queued.src != side);
3304        st.tx_queue
3305            .retain(|queued| queued.dst != side && queued.src != Some(side));
3306        st.replay_queue.retain(|queued| queued.dst != side);
3307        st.reliable_tx.retain(|(side_id, _), _| *side_id != side);
3308        st.reliable_rx.retain(|(side_id, _), _| *side_id != side);
3309        #[cfg(feature = "discovery")]
3310        {
3311            st.discovery_routes.remove(&side);
3312            Self::note_discovery_topology_change_locked(&mut st, now_ms);
3313        }
3314        Ok(())
3315    }
3316
3317    /// Enable or disable ingress processing for a registered side.
3318    pub fn set_side_ingress_enabled(
3319        &self,
3320        side: RelaySideId,
3321        enabled: bool,
3322    ) -> TelemetryResult<()> {
3323        let now_ms = self.clock.now_ms();
3324        let mut st = self.state.lock();
3325        let side_ref = st
3326            .sides
3327            .get_mut(side)
3328            .and_then(|side| side.as_mut())
3329            .ok_or(TelemetryError::BadArg)?;
3330        side_ref.opts.ingress_enabled = enabled;
3331        #[cfg(feature = "discovery")]
3332        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3333        Ok(())
3334    }
3335
3336    /// Enable or disable egress toward a registered side.
3337    pub fn set_side_egress_enabled(&self, side: RelaySideId, enabled: bool) -> TelemetryResult<()> {
3338        let now_ms = self.clock.now_ms();
3339        let mut st = self.state.lock();
3340        let side_ref = st
3341            .sides
3342            .get_mut(side)
3343            .and_then(|side| side.as_mut())
3344            .ok_or(TelemetryError::BadArg)?;
3345        side_ref.opts.egress_enabled = enabled;
3346        if !enabled {
3347            st.tx_queue.retain(|queued| queued.dst != side);
3348            st.replay_queue.retain(|queued| queued.dst != side);
3349        }
3350        #[cfg(feature = "discovery")]
3351        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3352        Ok(())
3353    }
3354
3355    /// Set the route-selection policy for traffic originating from `src`.
3356    ///
3357    /// `src == None` targets locally-originated relay traffic such as discovery output.
3358    pub fn set_source_route_mode(
3359        &self,
3360        src: Option<RelaySideId>,
3361        mode: RouteSelectionMode,
3362    ) -> TelemetryResult<()> {
3363        let now_ms = self.clock.now_ms();
3364        let mut st = self.state.lock();
3365        if let Some(src) = src {
3366            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3367        }
3368        if mode == RouteSelectionMode::Fanout {
3369            st.source_route_modes.remove(&src);
3370        } else {
3371            st.source_route_modes.insert(src, mode);
3372        }
3373        st.route_selection_cursors.remove(&src);
3374        #[cfg(feature = "discovery")]
3375        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3376        Ok(())
3377    }
3378
3379    /// Clear a source-specific route-selection override.
3380    pub fn clear_source_route_mode(&self, src: Option<RelaySideId>) -> TelemetryResult<()> {
3381        self.set_source_route_mode(src, RouteSelectionMode::Fanout)
3382    }
3383
3384    /// Set the weighted-routing weight from `src` toward `dst`.
3385    pub fn set_route_weight(
3386        &self,
3387        src: Option<RelaySideId>,
3388        dst: RelaySideId,
3389        weight: u32,
3390    ) -> TelemetryResult<()> {
3391        let now_ms = self.clock.now_ms();
3392        let mut st = self.state.lock();
3393        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3394        if let Some(src) = src {
3395            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3396        }
3397        st.route_weights.insert((src, dst), weight);
3398        st.route_selection_cursors.remove(&src);
3399        #[cfg(feature = "discovery")]
3400        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3401        Ok(())
3402    }
3403
3404    /// Clear a previously configured weighted-routing weight override.
3405    pub fn clear_route_weight(
3406        &self,
3407        src: Option<RelaySideId>,
3408        dst: RelaySideId,
3409    ) -> TelemetryResult<()> {
3410        let now_ms = self.clock.now_ms();
3411        let mut st = self.state.lock();
3412        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3413        if let Some(src) = src {
3414            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3415        }
3416        st.route_weights.remove(&(src, dst));
3417        st.route_selection_cursors.remove(&src);
3418        #[cfg(feature = "discovery")]
3419        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3420        Ok(())
3421    }
3422
3423    /// Set the failover priority from `src` toward `dst`.
3424    pub fn set_route_priority(
3425        &self,
3426        src: Option<RelaySideId>,
3427        dst: RelaySideId,
3428        priority: u32,
3429    ) -> TelemetryResult<()> {
3430        let now_ms = self.clock.now_ms();
3431        let mut st = self.state.lock();
3432        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3433        if let Some(src) = src {
3434            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3435        }
3436        st.route_priorities.insert((src, dst), priority);
3437        #[cfg(feature = "discovery")]
3438        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3439        Ok(())
3440    }
3441
3442    /// Clear a previously configured failover priority override.
3443    pub fn clear_route_priority(
3444        &self,
3445        src: Option<RelaySideId>,
3446        dst: RelaySideId,
3447    ) -> TelemetryResult<()> {
3448        let now_ms = self.clock.now_ms();
3449        let mut st = self.state.lock();
3450        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3451        if let Some(src) = src {
3452            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3453        }
3454        st.route_priorities.remove(&(src, dst));
3455        #[cfg(feature = "discovery")]
3456        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3457        Ok(())
3458    }
3459
3460    /// Allow or block routing from `src` toward `dst`.
3461    pub fn set_route(
3462        &self,
3463        src: Option<RelaySideId>,
3464        dst: RelaySideId,
3465        enabled: bool,
3466    ) -> TelemetryResult<()> {
3467        let now_ms = self.clock.now_ms();
3468        let mut st = self.state.lock();
3469        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3470        if let Some(src) = src {
3471            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3472        }
3473        st.route_overrides.insert((src, dst), enabled);
3474        #[cfg(feature = "discovery")]
3475        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3476        Ok(())
3477    }
3478
3479    /// Allow or block routing for a specific `DataType` from `src` toward `dst`.
3480    pub fn set_typed_route(
3481        &self,
3482        src: Option<RelaySideId>,
3483        ty: crate::DataType,
3484        dst: RelaySideId,
3485        enabled: bool,
3486    ) -> TelemetryResult<()> {
3487        let now_ms = self.clock.now_ms();
3488        let mut st = self.state.lock();
3489        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3490        if let Some(src) = src {
3491            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3492        }
3493        st.typed_route_overrides
3494            .insert((src, ty.as_u32(), dst), enabled);
3495        #[cfg(feature = "discovery")]
3496        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3497        Ok(())
3498    }
3499
3500    /// Clear a typed route override for the `(src, ty, dst)` triple.
3501    pub fn clear_typed_route(
3502        &self,
3503        src: Option<RelaySideId>,
3504        ty: crate::DataType,
3505        dst: RelaySideId,
3506    ) -> TelemetryResult<()> {
3507        let now_ms = self.clock.now_ms();
3508        let mut st = self.state.lock();
3509        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3510        if let Some(src) = src {
3511            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3512        }
3513        st.typed_route_overrides.remove(&(src, ty.as_u32(), dst));
3514        #[cfg(feature = "discovery")]
3515        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3516        Ok(())
3517    }
3518
3519    /// Clear a non-typed route override so the relay falls back to default behavior.
3520    pub fn clear_route(&self, src: Option<RelaySideId>, dst: RelaySideId) -> TelemetryResult<()> {
3521        let now_ms = self.clock.now_ms();
3522        let mut st = self.state.lock();
3523        let _ = Self::side_ref(&st, dst).map_err(|_| TelemetryError::BadArg)?;
3524        if let Some(src) = src {
3525            let _ = Self::side_ref(&st, src).map_err(|_| TelemetryError::BadArg)?;
3526        }
3527        st.route_overrides.remove(&(src, dst));
3528        #[cfg(feature = "discovery")]
3529        Self::note_discovery_topology_change_locked(&mut st, now_ms);
3530        Ok(())
3531    }
3532
3533    #[cfg(feature = "discovery")]
3534    /// Queues an immediate discovery announcement for this relay.
3535    pub fn announce_discovery(&self) -> TelemetryResult<()> {
3536        self.queue_discovery_announce()
3537    }
3538
3539    /// Broadcast that this relay is leaving so peers can prune topology immediately.
3540    pub fn announce_leave(&self) -> TelemetryResult<()> {
3541        let pkt = discovery::build_discovery_leave("relay", self.clock.now_ms())?;
3542        let mut st = self.state.lock();
3543        let dsts: Vec<usize> = st
3544            .sides
3545            .iter()
3546            .enumerate()
3547            .filter_map(|(idx, side)| side.as_ref().map(|_| idx))
3548            .collect();
3549        for dst in dsts {
3550            let data = RelayItem::Packet(Arc::new(pkt.clone()));
3551            let priority = Self::relay_item_priority(&data)?;
3552            st.push_tx(RelayTxItem {
3553                src: None,
3554                dst,
3555                data,
3556                priority,
3557            })?;
3558        }
3559        Ok(())
3560    }
3561
3562    #[cfg(feature = "discovery")]
3563    /// Polls discovery state and queues an announce if the cadence says one is due.
3564    pub fn poll_discovery(&self) -> TelemetryResult<bool> {
3565        self.poll_discovery_announce()
3566    }
3567
3568    #[cfg(feature = "discovery")]
3569    /// Exports the relay's current discovered topology snapshot.
3570    pub fn export_topology(&self) -> TopologySnapshot {
3571        let now_ms = self.clock.now_ms();
3572        let mut st = self.state.lock();
3573        if Self::prune_discovery_routes_locked(&mut st, now_ms) {
3574            self.reconcile_end_to_end_acked_destinations_locked(&mut st);
3575            Self::note_discovery_topology_change_locked(&mut st, now_ms);
3576        }
3577        let routes = st
3578            .discovery_routes
3579            .iter()
3580            .filter_map(|(&side_id, route)| {
3581                let side = st.sides.get(side_id).and_then(|side| side.as_ref())?;
3582                let announcers = route
3583                    .announcers
3584                    .iter()
3585                    .map(|(sender_id, sender_state)| TopologyAnnouncerRoute {
3586                        sender_id: sender_id.clone(),
3587                        reachable_endpoints: sender_state
3588                            .reachable
3589                            .iter()
3590                            .copied()
3591                            .filter(|ep| !discovery::is_router_control_endpoint(*ep))
3592                            .collect(),
3593                        reachable_timesync_sources: sender_state.reachable_timesync_sources.clone(),
3594                        routers: sender_state.topology_boards.clone(),
3595                        last_seen_ms: sender_state.last_seen_ms,
3596                        age_ms: now_ms.saturating_sub(sender_state.last_seen_ms),
3597                    })
3598                    .collect();
3599                Some(TopologySideRoute {
3600                    side_id,
3601                    side_name: side.name,
3602                    reachable_endpoints: route
3603                        .reachable
3604                        .iter()
3605                        .copied()
3606                        .filter(|ep| !discovery::is_router_control_endpoint(*ep))
3607                        .collect(),
3608                    reachable_timesync_sources: route.reachable_timesync_sources.clone(),
3609                    announcers,
3610                    last_seen_ms: route.last_seen_ms,
3611                    age_ms: now_ms.saturating_sub(route.last_seen_ms),
3612                })
3613            })
3614            .collect();
3615        let routers = self.advertised_discovery_topology_for_link_locked(&st, now_ms, true);
3616        let advertised_endpoints =
3617            self.advertised_discovery_endpoints_for_link_locked(&st, now_ms, true);
3618        let advertised_timesync_sources =
3619            self.advertised_discovery_timesync_sources_for_link_locked(&st, now_ms);
3620        let links = discovery::topology_links_from_boards(&routers);
3621        TopologySnapshot {
3622            advertised_endpoints,
3623            advertised_timesync_sources,
3624            routers,
3625            links,
3626            routes,
3627            current_announce_interval_ms: st.discovery_cadence.current_interval_ms,
3628            next_announce_ms: st.discovery_cadence.next_announce_ms,
3629        }
3630    }
3631
3632    #[cfg(feature = "discovery")]
3633    pub fn client_stats(&self, sender_id: &str) -> Option<ClientStatsSnapshot> {
3634        let now_ms = self.clock.now_ms();
3635        let st = self.state.lock();
3636        let mut side_ids = Vec::new();
3637        let mut side_names = Vec::new();
3638        let mut last_seen_ms = None::<u64>;
3639        let mut reachable_endpoints = Vec::new();
3640        let mut reachable_timesync_sources = Vec::new();
3641        let mut packets_sent = 0u64;
3642        let mut packets_received = 0u64;
3643        let mut bytes_sent = 0u64;
3644        let mut bytes_received = 0u64;
3645
3646        for (side_id, route) in &st.discovery_routes {
3647            let Some(sender_state) = route.announcers.get(sender_id) else {
3648                continue;
3649            };
3650            side_ids.push(*side_id);
3651            if let Some(side_name) = st
3652                .sides
3653                .get(*side_id)
3654                .and_then(|side| side.as_ref())
3655                .map(|side| side.name)
3656            {
3657                side_names.push(side_name);
3658            }
3659            last_seen_ms = Some(last_seen_ms.unwrap_or(0).max(sender_state.last_seen_ms));
3660            reachable_endpoints.extend(sender_state.reachable.iter().copied());
3661            reachable_timesync_sources
3662                .extend(sender_state.reachable_timesync_sources.iter().cloned());
3663            if let Some(stats) = st.side_runtime_stats.get(side_id) {
3664                packets_sent = packets_sent.saturating_add(stats.tx_packets);
3665                packets_received = packets_received.saturating_add(stats.rx_packets);
3666                bytes_sent = bytes_sent.saturating_add(stats.tx_bytes);
3667                bytes_received = bytes_received.saturating_add(stats.rx_bytes);
3668            }
3669        }
3670
3671        if side_ids.is_empty() {
3672            return None;
3673        }
3674        reachable_endpoints.retain(|ep| !discovery::is_router_control_endpoint(*ep));
3675        reachable_endpoints.sort_unstable();
3676        reachable_endpoints.dedup();
3677        reachable_timesync_sources.sort_unstable();
3678        reachable_timesync_sources.dedup();
3679        side_ids.sort_unstable();
3680        side_ids.dedup();
3681        side_names.sort_unstable();
3682        side_names.dedup();
3683        let age_ms = last_seen_ms.map(|seen| now_ms.saturating_sub(seen));
3684        Some(ClientStatsSnapshot {
3685            sender_id: sender_id.to_string(),
3686            connected: age_ms.is_some_and(|age| age <= DISCOVERY_ROUTE_TTL_MS),
3687            side_ids,
3688            side_names,
3689            last_seen_ms,
3690            age_ms,
3691            reachable_endpoints,
3692            reachable_timesync_sources,
3693            packets_sent,
3694            packets_received,
3695            bytes_sent,
3696            bytes_received,
3697        })
3698    }
3699
3700    pub fn export_runtime_stats(&self) -> RuntimeStatsSnapshot {
3701        let now_ms = self.clock.now_ms();
3702        let st = self.state.lock();
3703
3704        let mut sides = Vec::new();
3705        for (side_id, side) in st.sides.iter().enumerate() {
3706            let Some(side) = side.as_ref() else { continue };
3707            let stats = st
3708                .side_runtime_stats
3709                .get(&side_id)
3710                .cloned()
3711                .unwrap_or_default();
3712            let adaptive = st
3713                .adaptive_route_stats
3714                .get(&side_id)
3715                .cloned()
3716                .unwrap_or_default()
3717                .snapshot(now_ms, true);
3718            let (tx_template_count, rx_template_count) = st
3719                .side_transport
3720                .get(&side_id)
3721                .map(|state| (state.tx_template_count(), state.rx_template_count()))
3722                .unwrap_or((0, 0));
3723            let mut data_types: Vec<RuntimeTypeStats> = stats
3724                .data_types
3725                .into_iter()
3726                .map(|(ty, item)| RuntimeTypeStats {
3727                    data_type: crate::DataType(ty),
3728                    tx_packets: item.tx_packets,
3729                    tx_bytes: item.tx_bytes,
3730                    rx_packets: item.rx_packets,
3731                    rx_bytes: item.rx_bytes,
3732                    relayed_tx_packets: item.relayed_tx_packets,
3733                    relayed_tx_bytes: item.relayed_tx_bytes,
3734                    relayed_rx_packets: item.relayed_rx_packets,
3735                    relayed_rx_bytes: item.relayed_rx_bytes,
3736                    tx_retries: item.tx_retries,
3737                    handler_failures: item.handler_failures,
3738                })
3739                .collect();
3740            data_types.sort_unstable_by_key(|item| item.data_type.as_u32());
3741            sides.push(RuntimeSideStats {
3742                side_id,
3743                side_name: side.name,
3744                reliable_enabled: side.opts.reliable_enabled,
3745                link_local_enabled: side.opts.link_local_enabled,
3746                header_template_enabled: side.opts.header_template_enabled,
3747                max_frame_bytes: side.opts.max_frame_bytes,
3748                compact_header_target_bytes: side.opts.compact_header_target_bytes,
3749                side_transport_profile: side.opts.effective_transport_profile().as_str(),
3750                ingress_enabled: side.opts.ingress_enabled,
3751                egress_enabled: side.opts.egress_enabled,
3752                tx_packets: stats.tx_packets,
3753                tx_bytes: stats.tx_bytes,
3754                rx_packets: stats.rx_packets,
3755                rx_bytes: stats.rx_bytes,
3756                relayed_tx_packets: stats.relayed_tx_packets,
3757                relayed_tx_bytes: stats.relayed_tx_bytes,
3758                relayed_rx_packets: stats.relayed_rx_packets,
3759                relayed_rx_bytes: stats.relayed_rx_bytes,
3760                local_delivery_packets: 0,
3761                tx_retries: stats.tx_retries,
3762                tx_handler_failures: stats.tx_handler_failures,
3763                local_handler_failures: 0,
3764                total_handler_retries: stats.total_handler_retries,
3765                side_transport_full_frames: stats.side_transport_full_frames,
3766                side_transport_compact_frames: stats.side_transport_compact_frames,
3767                side_transport_compact_delta_frames: stats.side_transport_compact_delta_frames,
3768                side_transport_compact_omitted_timestamp_frames: stats
3769                    .side_transport_compact_omitted_timestamp_frames,
3770                side_transport_chunk_frames: stats.side_transport_chunk_frames,
3771                side_transport_raw_bytes: stats.side_transport_raw_bytes,
3772                side_transport_wire_bytes: stats.side_transport_wire_bytes,
3773                side_transport_bytes_saved: stats.side_transport_bytes_saved,
3774                side_transport_min_compact_overhead_bytes: stats
3775                    .side_transport_min_compact_overhead_bytes,
3776                side_transport_max_compact_overhead_bytes: stats
3777                    .side_transport_max_compact_overhead_bytes,
3778                side_transport_compact_target_misses: stats.side_transport_compact_target_misses,
3779                side_transport_template_evictions: stats.side_transport_template_evictions,
3780                side_transport_tx_template_count: tx_template_count,
3781                side_transport_rx_template_count: rx_template_count,
3782                max_side_transport_templates: side.opts.max_side_transport_templates,
3783                adaptive,
3784                data_types,
3785            });
3786        }
3787
3788        let mut route_modes: Vec<RouteModeStats> = st
3789            .route_selection_cursors
3790            .iter()
3791            .map(|(src, cursor)| RouteModeStats {
3792                src_side_id: *src,
3793                selection_mode: st.source_route_modes.get(src).copied(),
3794                cursor: *cursor,
3795            })
3796            .collect();
3797        for src in st.source_route_modes.keys() {
3798            if !route_modes.iter().any(|mode| mode.src_side_id == *src) {
3799                route_modes.push(RouteModeStats {
3800                    src_side_id: *src,
3801                    selection_mode: st.source_route_modes.get(src).copied(),
3802                    cursor: 0,
3803                });
3804            }
3805        }
3806        route_modes.sort_unstable_by_key(|mode| mode.src_side_id.unwrap_or(usize::MAX));
3807
3808        let mut route_overrides: Vec<RouteOverrideStats> = st
3809            .route_overrides
3810            .iter()
3811            .map(|((src, dst), enabled)| RouteOverrideStats {
3812                src_side_id: *src,
3813                dst_side_id: *dst,
3814                enabled: *enabled,
3815            })
3816            .collect();
3817        route_overrides.sort_unstable_by_key(|item| {
3818            (item.src_side_id.unwrap_or(usize::MAX), item.dst_side_id)
3819        });
3820
3821        let mut typed_route_overrides: Vec<TypedRouteOverrideStats> = st
3822            .typed_route_overrides
3823            .iter()
3824            .map(|((src, ty, dst), enabled)| TypedRouteOverrideStats {
3825                src_side_id: *src,
3826                data_type: crate::DataType(*ty),
3827                dst_side_id: *dst,
3828                enabled: *enabled,
3829            })
3830            .collect();
3831        typed_route_overrides.sort_unstable_by_key(|item| {
3832            (
3833                item.src_side_id.unwrap_or(usize::MAX),
3834                item.data_type.as_u32(),
3835                item.dst_side_id,
3836            )
3837        });
3838
3839        let mut route_weights: Vec<RouteWeightStats> = st
3840            .route_weights
3841            .iter()
3842            .map(|((src, dst), weight)| RouteWeightStats {
3843                src_side_id: *src,
3844                dst_side_id: *dst,
3845                weight: *weight,
3846            })
3847            .collect();
3848        route_weights.sort_unstable_by_key(|item| {
3849            (item.src_side_id.unwrap_or(usize::MAX), item.dst_side_id)
3850        });
3851
3852        let mut route_priorities: Vec<RoutePriorityStats> = st
3853            .route_priorities
3854            .iter()
3855            .map(|((src, dst), priority)| RoutePriorityStats {
3856                src_side_id: *src,
3857                dst_side_id: *dst,
3858                priority: *priority,
3859            })
3860            .collect();
3861        route_priorities.sort_unstable_by_key(|item| {
3862            (item.src_side_id.unwrap_or(usize::MAX), item.dst_side_id)
3863        });
3864
3865        #[cfg(feature = "discovery")]
3866        let discovery = DiscoveryRuntimeStats {
3867            route_count: st.discovery_routes.len(),
3868            announcer_count: st
3869                .discovery_routes
3870                .values()
3871                .map(|route| route.announcers.len())
3872                .sum(),
3873            current_announce_interval_ms: Some(st.discovery_cadence.current_interval_ms),
3874            next_announce_ms: Some(st.discovery_cadence.next_announce_ms),
3875        };
3876        #[cfg(not(feature = "discovery"))]
3877        let discovery = DiscoveryRuntimeStats {
3878            route_count: 0,
3879            announcer_count: 0,
3880            current_announce_interval_ms: None,
3881            next_announce_ms: None,
3882        };
3883
3884        RuntimeStatsSnapshot {
3885            sides,
3886            route_modes,
3887            route_overrides,
3888            typed_route_overrides,
3889            route_weights,
3890            route_priorities,
3891            queues: QueueRuntimeStats {
3892                rx_len: st.rx_queue.len(),
3893                rx_bytes: st.rx_queue.bytes_used(),
3894                tx_len: st.tx_queue.len(),
3895                tx_bytes: st.tx_queue.bytes_used(),
3896                replay_len: st.replay_queue.len(),
3897                replay_bytes: st.replay_queue.bytes_used(),
3898                recent_rx_len: st.recent_rx.len(),
3899                recent_rx_bytes: st.recent_rx.bytes_used(),
3900                reliable_rx_buffered_len: st.reliable_rx_buffer_len(),
3901                reliable_rx_buffered_bytes: st.reliable_rx_buffered_bytes(),
3902                shared_queue_bytes_used: st.shared_queue_bytes_used(),
3903            },
3904            reliable: ReliableRuntimeStats {
3905                reliable_return_route_count: st.reliable_return_routes.len(),
3906                end_to_end_pending_count: 0,
3907                end_to_end_pending_destination_count: 0,
3908                end_to_end_acked_cache_count: st.end_to_end_acked_destinations.len(),
3909            },
3910            discovery,
3911            total_handler_failures: st.total_handler_failures,
3912            total_handler_retries: st.total_handler_retries,
3913        }
3914    }
3915
3916    /// Export current relay memory usage/layout as JSON for profiling.
3917    pub fn export_memory_layout_json(&self) -> String {
3918        let st = self.state.lock();
3919        #[cfg(feature = "discovery")]
3920        let discovery_bytes = st.discovery_bytes_used();
3921        #[cfg(not(feature = "discovery"))]
3922        let discovery_bytes = 0usize;
3923        let schema_bytes = crate::config::schema_bytes_used();
3924        let mut out = String::new();
3925        let _ = core::fmt::Write::write_fmt(
3926            &mut out,
3927            format_args!(
3928                "{{\"kind\":\"relay\",\
3929                 \"shared_queue_bytes_used\":{},\"shared_queue_bytes_allocated\":{},\
3930                 \"rx_queue_bytes_used\":{},\"rx_queue_bytes_allocated\":{},\"rx_queue_len\":{},\
3931                 \"tx_queue_bytes_used\":{},\"tx_queue_bytes_allocated\":{},\"tx_queue_len\":{},\
3932                 \"replay_queue_bytes_used\":{},\"replay_queue_bytes_allocated\":{},\"replay_queue_len\":{},\
3933                 \"recent_rx_bytes_used\":{},\"recent_rx_bytes_allocated\":{},\"recent_rx_len\":{},\
3934                 \"reliable_rx_buffer_bytes_used\":{},\"reliable_rx_buffer_bytes_allocated\":{},\"reliable_rx_buffer_len\":{},\
3935                 \"discovery_bytes_used\":{},\"discovery_bytes_allocated\":{},\
3936                 \"schema_bytes_used\":{},\"schema_bytes_allocated\":{}}}",
3937                st.shared_queue_bytes_used(),
3938                MAX_QUEUE_BUDGET,
3939                st.rx_queue.bytes_used(),
3940                st.rx_queue.max_bytes(),
3941                st.rx_queue.len(),
3942                st.tx_queue.bytes_used(),
3943                st.tx_queue.max_bytes(),
3944                st.tx_queue.len(),
3945                st.replay_queue.bytes_used(),
3946                st.replay_queue.max_bytes(),
3947                st.replay_queue.len(),
3948                st.recent_rx.bytes_used(),
3949                st.recent_rx.max_bytes(),
3950                st.recent_rx.len(),
3951                st.reliable_rx_buffered_bytes(),
3952                MAX_QUEUE_BUDGET,
3953                st.reliable_rx_buffer_len(),
3954                discovery_bytes,
3955                MAX_QUEUE_BUDGET,
3956                schema_bytes,
3957                MAX_QUEUE_BUDGET,
3958            ),
3959        );
3960        out
3961    }
3962
3963    #[cfg(test)]
3964    pub(crate) fn debug_end_to_end_acked_destination_count(&self, packet_id: u64) -> Option<usize> {
3965        let st = self.state.lock();
3966        st.end_to_end_acked_destinations
3967            .get(&packet_id)
3968            .map(BTreeSet::len)
3969    }
3970
3971    #[cfg(test)]
3972    pub(crate) fn debug_end_to_end_acked_packet_count(&self) -> usize {
3973        let st = self.state.lock();
3974        st.end_to_end_acked_destinations.len()
3975    }
3976
3977    #[cfg(test)]
3978    pub(crate) fn debug_reliable_return_route_count(&self) -> usize {
3979        let st = self.state.lock();
3980        st.reliable_return_routes.len()
3981    }
3982
3983    /// Enqueue packed bytes that originated from `src` into the relay RX queue.
3984    ///
3985    /// Note: `Arc::from(bytes)` allocates and copies `len` bytes into a new `Arc<[u8]>`.
3986    /// This is still “fast enough” for many cases, but it is not allocation-free / ISR-safe.
3987    pub fn rx_packed_from_side(&self, src: RelaySideId, bytes: &[u8]) -> TelemetryResult<()> {
3988        self.ensure_side_ingress_enabled(src)?;
3989        let Some(bytes) = self.decode_side_transport_frame(src, bytes)? else {
3990            return Ok(());
3991        };
3992        let mut st = self.state.lock();
3993
3994        let data = RelayItem::Packed(bytes);
3995        let priority = Self::relay_item_priority(&data)?;
3996        st.push_rx(RelayRxItem {
3997            src,
3998            data,
3999            priority,
4000        })
4001    }
4002
4003    /// Enqueue a full packet that originated from `src` into the relay RX queue.
4004    ///
4005    /// The packet is wrapped in `Arc<Packet>` so fanout can clone the pointer cheaply.
4006    pub fn rx_from_side(&self, src: RelaySideId, packet: Packet) -> TelemetryResult<()> {
4007        self.ensure_side_ingress_enabled(src)?;
4008        let mut st = self.state.lock();
4009
4010        let data = RelayItem::Packet(Arc::new(packet));
4011        let priority = Self::relay_item_priority(&data)?;
4012        st.push_rx(RelayRxItem {
4013            src,
4014            data,
4015            priority,
4016        })
4017    }
4018
4019    /// Clear both RX and TX queues.
4020    pub fn clear_queues(&self) {
4021        let mut st = self.state.lock();
4022        st.rx_queue.clear();
4023        st.tx_queue.clear();
4024    }
4025
4026    /// Clear only RX queue.
4027    pub fn clear_rx_queue(&self) {
4028        let mut st = self.state.lock();
4029        st.rx_queue.clear();
4030    }
4031
4032    /// Clear only TX queue.
4033    pub fn clear_tx_queue(&self) {
4034        let mut st = self.state.lock();
4035        st.tx_queue.clear();
4036        st.replay_queue.clear();
4037    }
4038
4039    /// Internal: expand one RX item into TX items for all other sides.
4040    ///
4041    /// Fanout is cheap: the `RelayItem` is cloned (Arc bump) and reused across all destinations.
4042    fn process_rx_queue_item(&self, item: RelayRxItem) -> TelemetryResult<()> {
4043        self.ensure_side_ingress_enabled(item.src)?;
4044        match &item.data {
4045            RelayItem::Packet(pkt) => {
4046                let bytes = wire_format::pack_packet(pkt).len();
4047                self.note_side_rx(item.src, pkt.data_type(), bytes);
4048            }
4049            RelayItem::Packed(bytes) => {
4050                if let Ok(env) = wire_format::peek_envelope(bytes.as_ref()) {
4051                    self.note_side_rx(item.src, env.ty, bytes.len());
4052                }
4053            }
4054        }
4055        match &item.data {
4056            RelayItem::Packet(pkt) => {
4057                if is_reliable_type(pkt.data_type()) && !is_internal_control_type(pkt.data_type()) {
4058                    self.note_reliable_return_route(item.src, pkt.packet_id());
4059                }
4060            }
4061            RelayItem::Packed(bytes) => {
4062                if let Ok(env) = wire_format::peek_envelope(bytes.as_ref())
4063                    && is_reliable_type(env.ty)
4064                    && !is_internal_control_type(env.ty)
4065                    && let Ok(packet_id) = wire_format::packet_id_from_wire(bytes.as_ref())
4066                {
4067                    self.note_reliable_return_route(item.src, packet_id);
4068                }
4069            }
4070        }
4071        let mut released_buffered: Vec<Arc<[u8]>> = Vec::new();
4072        if let RelayItem::Packed(bytes) = &item.data {
4073            let (_opts, handler_is_packed, hop_reliable_enabled) = {
4074                let st = self.state.lock();
4075                let side_ref = Self::side_ref(&st, item.src)?;
4076                let opts = side_ref.opts;
4077                (
4078                    opts,
4079                    matches!(side_ref.tx_handler, RelayTxHandlerFn::Packed(_)),
4080                    opts.reliable_enabled
4081                        && !self.side_has_multiple_announcers_locked(
4082                            &st,
4083                            item.src,
4084                            self.clock.now_ms(),
4085                        ),
4086                )
4087            };
4088
4089            let frame = match wire_format::peek_frame_info(bytes.as_ref()) {
4090                Ok(frame) => frame,
4091                Err(e) => {
4092                    if matches!(e, TelemetryError::Unpack(msg) if msg == "crc32 mismatch")
4093                        && hop_reliable_enabled
4094                        && handler_is_packed
4095                        && let Ok(frame) = wire_format::peek_frame_info_unchecked(bytes.as_ref())
4096                    {
4097                        if is_reliable_type(frame.envelope.ty)
4098                            && let Some(hdr) = frame.reliable
4099                        {
4100                            let unordered = (hdr.flags & wire_format::RELIABLE_FLAG_UNORDERED) != 0;
4101                            let unsequenced =
4102                                (hdr.flags & wire_format::RELIABLE_FLAG_UNSEQUENCED) != 0;
4103
4104                            if !unsequenced {
4105                                let requested = if unordered {
4106                                    hdr.seq
4107                                } else {
4108                                    let mut st = self.state.lock();
4109                                    let rx_state = self.reliable_rx_state_mut(
4110                                        &mut st,
4111                                        item.src,
4112                                        frame.envelope.ty,
4113                                    );
4114                                    rx_state.expected_seq.min(hdr.seq)
4115                                };
4116                                self.queue_reliable_packet_request(
4117                                    item.src,
4118                                    frame.envelope.ty,
4119                                    requested,
4120                                )?;
4121                            }
4122                        }
4123                        return Ok(());
4124                    }
4125                    return Err(e);
4126                }
4127            };
4128
4129            if hop_reliable_enabled
4130                && handler_is_packed
4131                && is_reliable_type(frame.envelope.ty)
4132                && let Some(hdr) = frame.reliable
4133            {
4134                if frame.ack_only() {
4135                    self.handle_reliable_ack(item.src, frame.envelope.ty, hdr.ack);
4136                    return Ok(());
4137                }
4138                let unordered = (hdr.flags & wire_format::RELIABLE_FLAG_UNORDERED) != 0;
4139                let unsequenced = (hdr.flags & wire_format::RELIABLE_FLAG_UNSEQUENCED) != 0;
4140
4141                if !unsequenced {
4142                    if unordered {
4143                        self.queue_reliable_ack(item.src, frame.envelope.ty, hdr.seq)?;
4144                    } else {
4145                        let mut release: Vec<Arc<[u8]>> = Vec::new();
4146                        let mut last_delivered = None;
4147                        let mut ack_old = None;
4148                        let mut request_missing = None;
4149                        let mut partial_ack = None;
4150                        {
4151                            let mut st = self.state.lock();
4152                            let rx_state =
4153                                self.reliable_rx_state_mut(&mut st, item.src, frame.envelope.ty);
4154                            let expected_seq = rx_state.expected_seq;
4155                            if hdr.seq < expected_seq {
4156                                ack_old = Some(expected_seq.saturating_sub(1));
4157                            } else if hdr.seq > expected_seq {
4158                                request_missing = Some(expected_seq);
4159                                partial_ack = Some(hdr.seq);
4160                                st.buffer_reliable_rx(
4161                                    item.src,
4162                                    frame.envelope.ty,
4163                                    hdr.seq,
4164                                    bytes.clone(),
4165                                )?;
4166                            } else {
4167                                release.push(bytes.clone());
4168                                last_delivered = Some(hdr.seq);
4169                                let mut next_expected = hdr.seq.wrapping_add(1);
4170                                while let Some(buf) = rx_state.buffered.remove(&next_expected) {
4171                                    release.push(buf);
4172                                    last_delivered = Some(next_expected);
4173                                    let next = next_expected.wrapping_add(1);
4174                                    next_expected = if next == 0 { 1 } else { next };
4175                                }
4176                                rx_state.expected_seq = next_expected;
4177                            }
4178                        }
4179
4180                        if let Some(ack_seq) = ack_old {
4181                            self.queue_reliable_ack(item.src, frame.envelope.ty, ack_seq)?;
4182                            return Ok(());
4183                        }
4184                        if let Some(request_seq) = request_missing {
4185                            if let Some(partial_seq) = partial_ack {
4186                                self.queue_reliable_partial_ack(
4187                                    item.src,
4188                                    frame.envelope.ty,
4189                                    partial_seq,
4190                                )?;
4191                            }
4192                            self.queue_reliable_packet_request(
4193                                item.src,
4194                                frame.envelope.ty,
4195                                request_seq,
4196                            )?;
4197                            return Ok(());
4198                        }
4199                        if let Some(ack_seq) = last_delivered {
4200                            self.queue_reliable_ack(item.src, frame.envelope.ty, ack_seq)?;
4201                        }
4202                        released_buffered.extend(release.into_iter().skip(1));
4203                    }
4204                }
4205            }
4206        }
4207
4208        if self.is_duplicate_pkt(&item)? && !self.should_forward_duplicate_reliable_item(&item)? {
4209            // Already fanned out this packet recently; skip.
4210            return Ok(());
4211        }
4212
4213        self.dispatch_relay_rx_item(&item)?;
4214
4215        for release_bytes in released_buffered {
4216            let release_item = RelayRxItem {
4217                src: item.src,
4218                priority: Self::relay_item_priority(&RelayItem::Packed(release_bytes.clone()))?,
4219                data: RelayItem::Packed(release_bytes),
4220            };
4221            if self.is_duplicate_pkt(&release_item)?
4222                && !self.should_forward_duplicate_reliable_item(&release_item)?
4223            {
4224                continue;
4225            }
4226            self.dispatch_relay_rx_item(&release_item)?;
4227        }
4228        Ok(())
4229    }
4230
4231    fn dispatch_relay_rx_item(&self, item: &RelayRxItem) -> TelemetryResult<()> {
4232        match &item.data {
4233            RelayItem::Packet(pkt) => {
4234                if matches!(
4235                    pkt.data_type(),
4236                    crate::DataType::ReliableAck
4237                        | crate::DataType::ReliablePartialAck
4238                        | crate::DataType::ReliablePacketRequest
4239                ) {
4240                    if pkt.data_type() == crate::DataType::ReliableAck
4241                        && Self::is_end_to_end_ack_sender(pkt.sender())
4242                        && Self::decode_end_to_end_reliable_ack(pkt.payload()).is_ok()
4243                    {
4244                        if let Ok(packet_id) = Self::decode_end_to_end_reliable_ack(pkt.payload())
4245                            && let Some(sender_hash) =
4246                                Self::decode_end_to_end_ack_sender_hash(pkt.sender())
4247                        {
4248                            let mut st = self.state.lock();
4249                            Self::note_end_to_end_acked_destination_locked(
4250                                &mut st,
4251                                packet_id,
4252                                sender_hash,
4253                            );
4254                        }
4255                    } else {
4256                        let vals = pkt.data_as_u32()?;
4257                        if vals.len() != 2 {
4258                            return Err(TelemetryError::Unpack("bad reliable control payload"));
4259                        }
4260                        let ty = crate::DataType::try_from_u32(vals[0])
4261                            .ok_or(TelemetryError::InvalidType)?;
4262                        let seq = vals[1];
4263                        match pkt.data_type() {
4264                            crate::DataType::ReliableAck => {
4265                                self.handle_reliable_ack(item.src, ty, seq)
4266                            }
4267                            crate::DataType::ReliablePartialAck => {
4268                                self.handle_reliable_partial_ack(item.src, ty, seq)
4269                            }
4270                            crate::DataType::ReliablePacketRequest => {
4271                                self.queue_reliable_retransmit(item.src, ty, seq)?
4272                            }
4273                            _ => {}
4274                        }
4275                        return Ok(());
4276                    }
4277                }
4278            }
4279            RelayItem::Packed(bytes) => {
4280                let env = wire_format::peek_envelope(bytes.as_ref())?;
4281                if matches!(
4282                    env.ty,
4283                    crate::DataType::ReliableAck
4284                        | crate::DataType::ReliablePacketRequest
4285                        | crate::DataType::ReliablePartialAck
4286                ) {
4287                    let pkt = wire_format::unpack_packet(bytes.as_ref())?;
4288                    return self.dispatch_relay_rx_item(&RelayRxItem {
4289                        src: item.src,
4290                        data: RelayItem::Packet(Arc::new(pkt)),
4291                        priority: item.priority,
4292                    });
4293                }
4294            }
4295        }
4296
4297        let src = item.src;
4298        let data = item.data.clone();
4299        self.learn_discovery_item(src, &data)?;
4300
4301        let plan = self.remote_side_plan(&data, src)?;
4302        let mut st = self.state.lock();
4303        let RemoteSidePlan::Target(sides) = plan;
4304        for dst in sides {
4305            let priority = Self::relay_item_priority(&data)?;
4306            st.push_tx(RelayTxItem {
4307                src: Some(src),
4308                dst,
4309                data: data.clone(),
4310                priority,
4311            })?;
4312        }
4313        Ok(())
4314    }
4315
4316    #[inline]
4317    fn crc32_bytes(data: &[u8]) -> u32 {
4318        let mut hasher = Crc32Hasher::new();
4319        hasher.update(data);
4320        hasher.finalize()
4321    }
4322
4323    fn wrap_side_transport_frame(kind: u8, body: &[u8]) -> Arc<[u8]> {
4324        let mut out = Vec::with_capacity(
4325            SIDE_TRANSPORT_MAGIC.len() + 1 + body.len() + wire_format::CRC32_BYTES,
4326        );
4327        out.extend_from_slice(SIDE_TRANSPORT_MAGIC);
4328        out.push(kind);
4329        out.extend_from_slice(body);
4330        let crc = Self::crc32_bytes(&out);
4331        out.extend_from_slice(&crc.to_le_bytes());
4332        Arc::from(out)
4333    }
4334
4335    fn parse_side_transport_wrapper(bytes: &[u8]) -> TelemetryResult<Option<(u8, &[u8])>> {
4336        if bytes.len() < SIDE_TRANSPORT_MAGIC.len() + 1 + wire_format::CRC32_BYTES {
4337            return Ok(None);
4338        }
4339        if &bytes[..SIDE_TRANSPORT_MAGIC.len()] != SIDE_TRANSPORT_MAGIC {
4340            return Ok(None);
4341        }
4342        let data_len = bytes.len() - wire_format::CRC32_BYTES;
4343        let expected = u32::from_le_bytes([
4344            bytes[data_len],
4345            bytes[data_len + 1],
4346            bytes[data_len + 2],
4347            bytes[data_len + 3],
4348        ]);
4349        let data = &bytes[..data_len];
4350        if Self::crc32_bytes(data) != expected {
4351            return Err(TelemetryError::Unpack("side transport crc32 mismatch"));
4352        }
4353        let kind = data[SIDE_TRANSPORT_MAGIC.len()];
4354        Ok(Some((kind, &data[SIDE_TRANSPORT_MAGIC.len() + 1..])))
4355    }
4356
4357    fn read_uleb128_local(buf: &[u8], off: &mut usize) -> TelemetryResult<u64> {
4358        let mut result = 0u64;
4359        let mut shift = 0u32;
4360        for _ in 0..10 {
4361            let byte = *buf.get(*off).ok_or(TelemetryError::Unpack("short read"))?;
4362            *off += 1;
4363            result |= u64::from(byte & 0x7F) << shift;
4364            if (byte & 0x80) == 0 {
4365                return Ok(result);
4366            }
4367            shift += 7;
4368        }
4369        Err(TelemetryError::Unpack("uleb128 too long"))
4370    }
4371
4372    fn write_uleb128_local(mut value: u64, out: &mut Vec<u8>) {
4373        loop {
4374            let mut byte = (value & 0x7F) as u8;
4375            value >>= 7;
4376            if value != 0 {
4377                byte |= 0x80;
4378            }
4379            out.push(byte);
4380            if value == 0 {
4381                break;
4382            }
4383        }
4384    }
4385
4386    fn uleb128_len_local(mut value: u64) -> usize {
4387        let mut len = 1;
4388        while value >= 0x80 {
4389            value >>= 7;
4390            len += 1;
4391        }
4392        len
4393    }
4394
    /// Split a raw wire frame (data followed by a CRC-32 trailer) into a reusable header
    /// template plus the per-frame fields that vary.
    ///
    /// The prelude is walked field by field:
    /// * the flags byte;
    /// * one skipped byte (labelled NEP);
    /// * the data type (ULEB128, must fit `u32` and be `<= MAX_VALUE_DATA_TYPE`);
    /// * data size and timestamp (ULEB128);
    /// * an optional packet nonce (when the nonce flag is set);
    /// * the source address;
    /// * an optional endpoint bitmap;
    /// * an optional length-prefixed wire contract.
    ///
    /// The reliable-header location comes from `wire_format::reliable_header_span`.
    ///
    /// The template keeps:
    /// * `prefix`: the bytes between the flags byte and the data size;
    /// * `between`: the bytes from the source address up to the reliable header, or up to
    ///   the payload when there is no reliable header;
    /// * the flags with the payload-compressed and nonce bits cleared;
    /// * the reliable flags.
    ///
    /// `hash` covers exactly those pieces. Data size, timestamp, nonce, reliable seq/ack and
    /// payload are therefore excluded from it.
    ///
    /// Returns `(template, ty, flags, data_size, timestamp, nonce, reliable_seq_ack, payload)`.
    /// `payload` borrows from `bytes`. This function does not itself check the CRC trailer.
    ///
    /// # Errors
    /// * `Unpack` for short buffers and malformed ULEB128.
    /// * `Unpack` for an out-of-range data type, nonce, source address or contract length.
    /// * Any error from `reliable_header_span`.
    fn extract_side_header_template(bytes: &[u8]) -> TelemetryResult<SideTemplateExtract<'_>> {
        // Need at least the CRC trailer plus a few prelude bytes.
        if bytes.len() < wire_format::CRC32_BYTES + 4 {
            return Err(TelemetryError::Unpack("short buffer"));
        }
        // Everything before the CRC trailer.
        let data_len = bytes.len() - wire_format::CRC32_BYTES;
        let data = &bytes[..data_len];
        let mut off = 0usize;
        let flags = *data
            .get(off)
            .ok_or(TelemetryError::Unpack("short prelude"))?;
        off += 1;
        off += 1; // NEP
        let ty_u64 = Self::read_uleb128_local(data, &mut off)?;
        let ty_u32 = u32::try_from(ty_u64).map_err(|_| TelemetryError::Unpack("bad data type"))?;
        if ty_u32 > crate::MAX_VALUE_DATA_TYPE {
            return Err(TelemetryError::Unpack("bad data type"));
        }
        let ty = crate::DataType(ty_u32);
        // `prefix` below ends where the data size starts.
        let data_size_off = off;
        let data_size = Self::read_uleb128_local(data, &mut off)?;
        let timestamp = Self::read_uleb128_local(data, &mut off)?;
        let nonce = if (flags & SIDE_TRANSPORT_FLAG_PACKET_NONCE) != 0 {
            u16::try_from(Self::read_uleb128_local(data, &mut off)?)
                .map_err(|_| TelemetryError::Unpack("packet nonce too large"))?
        } else {
            0
        };
        // `between` starts at the source address, after all per-frame varying fields.
        let between_start = off;
        // Only validated and skipped; its bytes end up inside `between`.
        let _source_address = u32::try_from(Self::read_uleb128_local(data, &mut off)?)
            .map_err(|_| TelemetryError::Unpack("source address too large"))?;
        let endpoint_bitmap_bytes = if (flags & SIDE_TRANSPORT_FLAG_ENDPOINT_BITMAP_PRESENT) != 0 {
            SIDE_TRANSPORT_EP_BITMAP_BYTES
        } else {
            0
        };
        if data.len() < off + endpoint_bitmap_bytes {
            return Err(TelemetryError::Unpack("short buffer"));
        }
        off += endpoint_bitmap_bytes;
        if (flags & SIDE_TRANSPORT_FLAG_WIRE_CONTRACT) != 0 {
            let contract_len = usize::try_from(Self::read_uleb128_local(data, &mut off)?)
                .map_err(|_| TelemetryError::Unpack("wire contract length"))?;
            if data.len() < off + contract_len {
                return Err(TelemetryError::Unpack("short buffer"));
            }
            off += contract_len;
        }
        // When a reliable header exists, the payload offset comes from its span rather than
        // from `off` walked above.
        let reliable_span = wire_format::reliable_header_span(bytes)?;
        let (reliable_flags, reliable_seq_ack, reliable_compact, payload_off) =
            if let Some((rel_off, rel_len, hdr)) = reliable_span {
                if data.len() < rel_off + rel_len {
                    return Err(TelemetryError::Unpack("short buffer"));
                }
                (
                    Some(hdr.flags),
                    Some((hdr.seq, hdr.ack)),
                    (flags & SIDE_TRANSPORT_FLAG_COMPACT_RELIABLE_HEADER) != 0,
                    rel_off + rel_len,
                )
            } else {
                (None, None, false, off)
            };
        if payload_off > data.len() {
            return Err(TelemetryError::Unpack("short buffer"));
        }
        let payload = &data[payload_off..];
        let prefix = Arc::<[u8]>::from(&data[1..data_size_off]);
        let between_end = reliable_span
            .map(|(rel_off, _, _)| rel_off)
            .unwrap_or(payload_off);
        // NOTE(review): this slice panics if `rel_off < between_start`. It assumes
        // `reliable_header_span` never reports a header inside the prelude; confirm.
        let between = Arc::<[u8]>::from(&data[between_start..between_end]);
        // Compression and nonce bits vary per frame, so they are excluded from the template.
        let base_flags =
            flags & !(SIDE_TRANSPORT_FLAG_PAYLOAD_COMPRESSED | SIDE_TRANSPORT_FLAG_PACKET_NONCE);
        let mut hash = 0xD1B5_4A32_9C7E_01F3u64;
        hash = hash_bytes_u64(hash, &[base_flags]);
        hash = hash_bytes_u64(hash, &prefix);
        hash = hash_bytes_u64(hash, &between);
        if let Some(rel_flags) = reliable_flags {
            hash = hash_bytes_u64(hash, &[rel_flags]);
        }
        let template = SideHeaderTemplate {
            hash,
            base_flags,
            prefix,
            between,
            reliable_flags,
            reliable_compact,
        };
        Ok((
            template,
            ty,
            flags,
            data_size,
            timestamp,
            nonce,
            reliable_seq_ack,
            payload,
        ))
    }
4494
    /// Rebuild a full wire frame from a header `template` and a side-compact `body`.
    ///
    /// `body` layout, in order:
    /// * flags byte;
    /// * data size (ULEB128);
    /// * timestamp field, depending on `timestamp_mode`;
    /// * nonce (ULEB128), only when the nonce flag is set;
    /// * reliable seq and ack (ULEB128, each must fit `u32`), only when the template has
    ///   reliable flags;
    /// * payload (the remainder).
    ///
    /// Timestamp modes:
    /// * `Absolute` reads the timestamp directly.
    /// * `Delta` reads an offset that is added to `timestamp_base`.
    /// * `Omitted` reads nothing and reuses `timestamp_base`.
    ///
    /// The rebuilt frame is laid out as:
    /// `flags | prefix | data size | timestamp | [nonce] | between | [reliable header] |
    /// payload | CRC-32 (LE)`.
    ///
    /// Returns the frame together with the resolved timestamp.
    ///
    /// # Errors
    /// `Unpack` in any of these cases:
    /// * `body` is empty;
    /// * its flags (ignoring the compressed/nonce bits) differ from the template's base
    ///   flags;
    /// * a ULEB128 field is malformed;
    /// * a reliable seq/ack exceeds `u32`;
    /// * the timestamp context is missing;
    /// * the delta addition overflows.
    fn reconstruct_side_compact_frame(
        template: &SideHeaderTemplate,
        body: &[u8],
        timestamp_mode: SideCompactTimestampMode,
        timestamp_base: Option<u64>,
    ) -> TelemetryResult<(Arc<[u8]>, u64)> {
        if body.is_empty() {
            return Err(TelemetryError::Unpack("short side compact frame"));
        }
        let mut off = 0usize;
        let flags = body[off];
        off += 1;
        // The flags may differ from the template only in the per-frame compressed/nonce bits.
        if (flags & !(SIDE_TRANSPORT_FLAG_PAYLOAD_COMPRESSED | SIDE_TRANSPORT_FLAG_PACKET_NONCE))
            != template.base_flags
        {
            return Err(TelemetryError::Unpack("side compact flags mismatch"));
        }
        let data_size = Self::read_uleb128_local(body, &mut off)?;
        let timestamp = match timestamp_mode {
            SideCompactTimestampMode::Absolute => Self::read_uleb128_local(body, &mut off)?,
            SideCompactTimestampMode::Delta => {
                let timestamp_field = Self::read_uleb128_local(body, &mut off)?;
                let base = timestamp_base.ok_or(TelemetryError::Unpack(
                    "missing side compact timestamp context",
                ))?;
                base.checked_add(timestamp_field)
                    .ok_or(TelemetryError::Unpack(
                        "side compact timestamp delta overflow",
                    ))?
            }
            SideCompactTimestampMode::Omitted => timestamp_base.ok_or(TelemetryError::Unpack(
                "missing side compact timestamp context",
            ))?,
        };
        let nonce = if (flags & SIDE_TRANSPORT_FLAG_PACKET_NONCE) != 0 {
            Some(Self::read_uleb128_local(body, &mut off)?)
        } else {
            None
        };
        let reliable_seq_ack = if template.reliable_flags.is_some() {
            let seq = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
                .map_err(|_| TelemetryError::Unpack("side compact reliable seq too large"))?;
            let ack = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
                .map_err(|_| TelemetryError::Unpack("side compact reliable ack too large"))?;
            Some((seq, ack))
        } else {
            None
        };
        let payload = &body[off..];
        // The extra 32 bytes are headroom for the variable-length fields, reliable header and CRC.
        let mut raw = Vec::with_capacity(
            1 + template.prefix.len() + template.between.len() + payload.len() + 32,
        );
        raw.push(flags);
        raw.extend_from_slice(&template.prefix);
        Self::write_uleb128_local(data_size, &mut raw);
        Self::write_uleb128_local(timestamp, &mut raw);
        if let Some(nonce) = nonce {
            Self::write_uleb128_local(nonce, &mut raw);
        }
        raw.extend_from_slice(&template.between);
        if let Some(rel_flags) = template.reliable_flags {
            let (seq, ack) =
                reliable_seq_ack.ok_or(TelemetryError::Unpack("missing side compact reliable"))?;
            wire_format::write_reliable_header_encoded(
                wire_format::ReliableHeader {
                    flags: rel_flags,
                    seq,
                    ack,
                },
                template.reliable_compact,
                &mut raw,
            );
        }
        raw.extend_from_slice(payload);
        // Fresh CRC over the rebuilt bytes, little-endian.
        let crc = Self::crc32_bytes(&raw);
        raw.extend_from_slice(&crc.to_le_bytes());
        Ok((Arc::from(raw), timestamp))
    }
4573
4574    fn split_side_transport_frame(
4575        &self,
4576        side: RelaySideId,
4577        frame: Arc<[u8]>,
4578        max_frame_bytes: usize,
4579    ) -> TelemetryResult<Vec<Arc<[u8]>>> {
4580        if max_frame_bytes <= SIDE_TRANSPORT_CHUNK_OVERHEAD {
4581            return Err(TelemetryError::BadArg);
4582        }
4583        let payload_budget = max_frame_bytes - SIDE_TRANSPORT_CHUNK_OVERHEAD;
4584        let mut st = self.state.lock();
4585        let side_state = st
4586            .side_transport
4587            .get_mut(&side)
4588            .ok_or(TelemetryError::BadArg)?;
4589        let transfer_id = side_state.next_chunk_id.wrapping_add(1).max(1);
4590        side_state.next_chunk_id = transfer_id;
4591        drop(st);
4592
4593        let total = frame.len().div_ceil(payload_budget);
4594        let total_u16 =
4595            u16::try_from(total).map_err(|_| TelemetryError::PacketTooLarge("too many chunks"))?;
4596        let mut frames = Vec::with_capacity(total);
4597        for (idx, chunk) in frame.chunks(payload_budget).enumerate() {
4598            let mut body = Vec::with_capacity(8 + chunk.len());
4599            body.extend_from_slice(&transfer_id.to_le_bytes());
4600            body.extend_from_slice(&(idx as u16).to_le_bytes());
4601            body.extend_from_slice(&total_u16.to_le_bytes());
4602            body.extend_from_slice(chunk);
4603            frames.push(Self::wrap_side_transport_frame(
4604                SIDE_TRANSPORT_KIND_CHUNK,
4605                &body,
4606            ));
4607        }
4608        Ok(frames)
4609    }
4610
4611    fn encode_side_transport_frames(
4612        &self,
4613        side: RelaySideId,
4614        opts: RelaySideOptions,
4615        raw: Arc<[u8]>,
4616    ) -> TelemetryResult<Vec<Arc<[u8]>>> {
4617        if !opts.header_template_enabled && opts.max_frame_bytes == 0 {
4618            return Ok(vec![raw]);
4619        }
4620        let raw_len = raw.len();
4621        let mut compact_payload_len = None;
4622        let mut used_compact = false;
4623        let mut used_timestamp_delta = false;
4624        let mut omitted_timestamp = false;
4625        let (template, ty, flags, data_size, timestamp, nonce, reliable_seq_ack, payload) =
4626            Self::extract_side_header_template(raw.as_ref())?;
4627        let (template_id, use_compact, previous_timestamp) = {
4628            let mut st = self.state.lock();
4629            let side_state = st
4630                .side_transport
4631                .get_mut(&side)
4632                .ok_or(TelemetryError::BadArg)?;
4633            if let Some(id) = side_state.tx_template_ids.get(&template.hash).copied() {
4634                let previous = side_state.tx_last_timestamps.get(&id).copied();
4635                (id, true, previous)
4636            } else {
4637                let next = side_state.next_template_id.wrapping_add(1).max(1);
4638                side_state.next_template_id = next;
4639                let evicted = side_state.insert_tx_template(
4640                    template,
4641                    next,
4642                    opts.max_side_transport_templates,
4643                );
4644                if evicted {
4645                    st.side_runtime_stats
4646                        .entry(side)
4647                        .or_default()
4648                        .note_side_transport_template_eviction();
4649                }
4650                if let Some(side_state) = st.side_transport.get_mut(&side) {
4651                    side_state.tx_last_timestamps.insert(next, timestamp);
4652                }
4653                (next, false, None)
4654            }
4655        };
4656        let wrapped = if use_compact {
4657            used_compact = true;
4658            compact_payload_len = Some(payload.len());
4659            let timestamp_field = if let Some(previous) = previous_timestamp {
4660                let delta = timestamp.saturating_sub(previous);
4661                let omit_timestamp = opts.omit_unchanged_compact_timestamps
4662                    || opts.compact_timestamp_omission_types.contains(ty);
4663                if omit_timestamp && timestamp == previous {
4664                    omitted_timestamp = true;
4665                    None
4666                } else if timestamp >= previous
4667                    && Self::uleb128_len_local(delta) < Self::uleb128_len_local(timestamp)
4668                {
4669                    used_timestamp_delta = true;
4670                    Some(delta)
4671                } else {
4672                    Some(timestamp)
4673                }
4674            } else {
4675                Some(timestamp)
4676            };
4677            let mut body = Vec::with_capacity(payload.len() + 32);
4678            body.push(flags);
4679            Self::write_uleb128_local(u64::from(template_id), &mut body);
4680            Self::write_uleb128_local(data_size, &mut body);
4681            if let Some(timestamp_field) = timestamp_field {
4682                Self::write_uleb128_local(timestamp_field, &mut body);
4683            }
4684            if (flags & SIDE_TRANSPORT_FLAG_PACKET_NONCE) != 0 {
4685                Self::write_uleb128_local(u64::from(nonce), &mut body);
4686            }
4687            if let Some((seq, ack)) = reliable_seq_ack {
4688                Self::write_uleb128_local(u64::from(seq), &mut body);
4689                Self::write_uleb128_local(u64::from(ack), &mut body);
4690            }
4691            body.extend_from_slice(payload);
4692            {
4693                let mut st = self.state.lock();
4694                if let Some(side_state) = st.side_transport.get_mut(&side) {
4695                    side_state.tx_last_timestamps.insert(template_id, timestamp);
4696                }
4697            }
4698            let kind = if omitted_timestamp {
4699                SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP
4700            } else if used_timestamp_delta {
4701                SIDE_TRANSPORT_KIND_COMPACT_DELTA
4702            } else {
4703                SIDE_TRANSPORT_KIND_COMPACT
4704            };
4705            Self::wrap_side_transport_frame(kind, &body)
4706        } else {
4707            let mut body = Vec::with_capacity(raw.len() + 4);
4708            Self::write_uleb128_local(u64::from(template_id), &mut body);
4709            body.extend_from_slice(raw.as_ref());
4710            Self::wrap_side_transport_frame(SIDE_TRANSPORT_KIND_FULL, &body)
4711        };
4712        let frames = if opts.max_frame_bytes != 0 && wrapped.len() > opts.max_frame_bytes {
4713            self.split_side_transport_frame(side, wrapped, opts.max_frame_bytes)
4714        } else {
4715            Ok(vec![wrapped])
4716        }?;
4717        let wire_len = frames.iter().map(|frame| frame.len()).sum::<usize>();
4718        let mut st = self.state.lock();
4719        let stats = st.side_runtime_stats.entry(side).or_default();
4720        if used_compact {
4721            let overhead = compact_payload_len
4722                .map(|payload_len| wire_len.saturating_sub(payload_len))
4723                .unwrap_or(wire_len);
4724            stats.note_side_transport_compact(
4725                raw_len,
4726                wire_len,
4727                overhead,
4728                used_timestamp_delta,
4729                omitted_timestamp,
4730            );
4731            if opts.compact_header_target_bytes != 0 && overhead > opts.compact_header_target_bytes
4732            {
4733                stats.note_side_transport_compact_target_miss();
4734            }
4735        } else {
4736            stats.note_side_transport_full(raw_len, wire_len);
4737        }
4738        if frames.len() > 1 {
4739            stats.note_side_transport_chunks(frames.len());
4740        }
4741        Ok(frames)
4742    }
4743
4744    fn decode_side_transport_frame(
4745        &self,
4746        side: RelaySideId,
4747        bytes: &[u8],
4748    ) -> TelemetryResult<Option<Arc<[u8]>>> {
4749        let Some((kind, body)) = Self::parse_side_transport_wrapper(bytes)? else {
4750            return Ok(Some(Arc::from(bytes)));
4751        };
4752        match kind {
4753            SIDE_TRANSPORT_KIND_FULL => {
4754                let mut off = 0usize;
4755                let template_id = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
4756                    .map_err(|_| TelemetryError::Unpack("side template id too large"))?;
4757                let raw = Arc::<[u8]>::from(&body[off..]);
4758                if let Ok((template, _, _, _, timestamp, _, _, _)) =
4759                    Self::extract_side_header_template(raw.as_ref())
4760                {
4761                    let mut st = self.state.lock();
4762                    let max_templates = st
4763                        .sides
4764                        .get(side)
4765                        .and_then(|side| side.as_ref())
4766                        .map(|side| side.opts.max_side_transport_templates)
4767                        .unwrap_or(DEFAULT_SIDE_TRANSPORT_TEMPLATE_LIMIT);
4768                    let evicted = st.side_transport.get_mut(&side).is_some_and(|side_state| {
4769                        let evicted =
4770                            side_state.insert_rx_template(template_id, template, max_templates);
4771                        side_state.rx_last_timestamps.insert(template_id, timestamp);
4772                        evicted
4773                    });
4774                    if evicted {
4775                        st.side_runtime_stats
4776                            .entry(side)
4777                            .or_default()
4778                            .note_side_transport_template_eviction();
4779                    }
4780                }
4781                Ok(Some(raw))
4782            }
4783            SIDE_TRANSPORT_KIND_COMPACT
4784            | SIDE_TRANSPORT_KIND_COMPACT_DELTA
4785            | SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP => {
4786                if body.is_empty() {
4787                    return Err(TelemetryError::Unpack("short side compact frame"));
4788                }
4789                let mut off = 1usize;
4790                let template_id = u32::try_from(Self::read_uleb128_local(body, &mut off)?)
4791                    .map_err(|_| TelemetryError::Unpack("side template id too large"))?;
4792                let mut compact_body = Vec::with_capacity(1 + body.len().saturating_sub(off));
4793                compact_body.push(body[0]);
4794                compact_body.extend_from_slice(&body[off..]);
4795                let (template, timestamp_base) = {
4796                    let st = self.state.lock();
4797                    let state = st.side_transport.get(&side);
4798                    let template = state
4799                        .and_then(|state| state.rx_templates_by_id.get(&template_id))
4800                        .cloned();
4801                    let timestamp_base = if matches!(
4802                        kind,
4803                        SIDE_TRANSPORT_KIND_COMPACT_DELTA
4804                            | SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP
4805                    ) {
4806                        state
4807                            .and_then(|state| state.rx_last_timestamps.get(&template_id))
4808                            .copied()
4809                    } else {
4810                        None
4811                    };
4812                    (template, timestamp_base)
4813                };
4814                let template =
4815                    template.ok_or(TelemetryError::Unpack("unknown side compact template"))?;
4816                let timestamp_mode = match kind {
4817                    SIDE_TRANSPORT_KIND_COMPACT_DELTA => SideCompactTimestampMode::Delta,
4818                    SIDE_TRANSPORT_KIND_COMPACT_SAME_TIMESTAMP => SideCompactTimestampMode::Omitted,
4819                    _ => SideCompactTimestampMode::Absolute,
4820                };
4821                let (frame, timestamp) = Self::reconstruct_side_compact_frame(
4822                    &template,
4823                    &compact_body,
4824                    timestamp_mode,
4825                    timestamp_base,
4826                )?;
4827                let mut st = self.state.lock();
4828                if let Some(side_state) = st.side_transport.get_mut(&side) {
4829                    side_state.rx_last_timestamps.insert(template_id, timestamp);
4830                }
4831                Ok(Some(frame))
4832            }
4833            SIDE_TRANSPORT_KIND_CHUNK => {
4834                if body.len() < 8 {
4835                    return Err(TelemetryError::Unpack("short side chunk frame"));
4836                }
4837                let transfer_id = u32::from_le_bytes([body[0], body[1], body[2], body[3]]);
4838                let index = u16::from_le_bytes([body[4], body[5]]);
4839                let total = u16::from_le_bytes([body[6], body[7]]);
4840                let payload = Arc::<[u8]>::from(&body[8..]);
4841                let assembled = {
4842                    let mut st = self.state.lock();
4843                    let side_state = st
4844                        .side_transport
4845                        .get_mut(&side)
4846                        .ok_or(TelemetryError::BadArg)?;
4847                    let entry = side_state.rx_chunks.entry(transfer_id).or_default();
4848                    if entry.total == 0 {
4849                        entry.total = total;
4850                    } else if entry.total != total {
4851                        side_state.rx_chunks.remove(&transfer_id);
4852                        return Err(TelemetryError::Unpack("side chunk total mismatch"));
4853                    }
4854                    entry.received.entry(index).or_insert(payload);
4855                    if entry.received.len() == usize::from(total) {
4856                        let entry = side_state
4857                            .rx_chunks
4858                            .remove(&transfer_id)
4859                            .ok_or(TelemetryError::Unpack("side chunk missing"))?;
4860                        let mut out = Vec::new();
4861                        for idx in 0..entry.total {
4862                            let chunk = entry
4863                                .received
4864                                .get(&idx)
4865                                .ok_or(TelemetryError::Unpack("side chunk gap"))?;
4866                            out.extend_from_slice(chunk);
4867                        }
4868                        Some(Arc::<[u8]>::from(out))
4869                    } else {
4870                        None
4871                    }
4872                };
4873                match assembled {
4874                    Some(frame) => self.decode_side_transport_frame(side, frame.as_ref()),
4875                    None => Ok(None),
4876                }
4877            }
4878            _ => Err(TelemetryError::Unpack("unknown side transport frame")),
4879        }
4880    }
4881
4882    /// Helper: call a TX handler with the best representation we have.
4883    /// - Packet handler + Packet item: direct.
4884    /// - Packed handler + Packed item: direct.
4885    /// - Packet handler + Packed item: unpack for this call.
4886    /// - Packed handler + Packet item: pack for this call.
4887    fn call_tx_handler(
4888        &self,
4889        side: RelaySideId,
4890        handler: &RelayTxHandlerFn,
4891        data: &RelayItem,
4892    ) -> TelemetryResult<()> {
4893        let opts = {
4894            let st = self.state.lock();
4895            Self::side_ref(&st, side)?.opts
4896        };
4897        let Some(_side_tx_guard) = self.try_enter_side_tx() else {
4898            return Err(TelemetryError::Io("side tx busy"));
4899        };
4900        let started_ms = self.clock.now_ms();
4901        let ty = match data {
4902            RelayItem::Packet(pkt) => pkt.data_type(),
4903            RelayItem::Packed(bytes) => wire_format::peek_envelope(bytes.as_ref())?.ty,
4904        };
4905        let result = match (handler, data) {
4906            // Fast paths
4907            (RelayTxHandlerFn::Packed(f), RelayItem::Packed(bytes)) => {
4908                let frames = self.encode_side_transport_frames(side, opts, bytes.clone())?;
4909                let mut sent_bytes = 0usize;
4910                for frame in frames {
4911                    f(frame.as_ref())?;
4912                    sent_bytes = sent_bytes.saturating_add(frame.len());
4913                }
4914                self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
4915                self.note_side_tx_success(side, ty, sent_bytes, 1);
4916                return Ok(());
4917            }
4918            (RelayTxHandlerFn::Packet(f), RelayItem::Packet(pkt)) => f(pkt),
4919
4920            // Conversion paths
4921            (RelayTxHandlerFn::Packed(f), RelayItem::Packet(pkt)) => {
4922                let owned = wire_format::pack_packet(pkt);
4923                let frames = self.encode_side_transport_frames(side, opts, owned)?;
4924                let mut sent_bytes = 0usize;
4925                for frame in frames {
4926                    f(frame.as_ref())?;
4927                    sent_bytes = sent_bytes.saturating_add(frame.len());
4928                }
4929                self.record_side_tx_sample(side, sent_bytes, started_ms, self.clock.now_ms());
4930                self.note_side_tx_success(side, ty, sent_bytes, 1);
4931                return Ok(());
4932            }
4933            (RelayTxHandlerFn::Packet(f), RelayItem::Packed(bytes)) => {
4934                if wire_format::peek_frame_info(bytes.as_ref())
4935                    .ok()
4936                    .is_some_and(|frame| frame.ack_only())
4937                {
4938                    return Ok(());
4939                }
4940                let pkt = wire_format::unpack_packet(bytes.as_ref())?;
4941                f(&pkt)
4942            }
4943        };
4944        if result.is_ok()
4945            && let Ok(bytes) = Self::relay_item_wire_len(data)
4946        {
4947            self.record_side_tx_sample(side, bytes, started_ms, self.clock.now_ms());
4948            self.note_side_tx_success(side, ty, bytes, 1);
4949        } else if result.is_err() {
4950            self.note_side_tx_failure(side, ty, 1);
4951        }
4952        result
4953    }
4954
4955    fn adjust_reliable_for_side(
4956        &self,
4957        opts: RelaySideOptions,
4958        data: RelayItem,
4959    ) -> TelemetryResult<Option<RelayItem>> {
4960        if opts.reliable_enabled {
4961            return Ok(Some(data));
4962        }
4963
4964        match data {
4965            RelayItem::Packed(bytes) => {
4966                let frame = match wire_format::peek_frame_info(bytes.as_ref()) {
4967                    Ok(frame) => frame,
4968                    Err(_) => return Ok(Some(RelayItem::Packed(bytes))),
4969                };
4970                if is_reliable_type(frame.envelope.ty)
4971                    && let Some(hdr) = frame.reliable
4972                {
4973                    if (hdr.flags & wire_format::RELIABLE_FLAG_ACK_ONLY) != 0 {
4974                        return Ok(None);
4975                    }
4976                    if (hdr.flags & wire_format::RELIABLE_FLAG_UNSEQUENCED) == 0 {
4977                        let Some(rewritten) = wire_format::rewrite_reliable_header_owned(
4978                            bytes.as_ref(),
4979                            wire_format::RELIABLE_FLAG_UNSEQUENCED,
4980                            hdr.seq,
4981                            0,
4982                        )?
4983                        else {
4984                            return Ok(Some(RelayItem::Packed(bytes)));
4985                        };
4986                        return Ok(Some(RelayItem::Packed(rewritten)));
4987                    }
4988                }
4989                Ok(Some(RelayItem::Packed(bytes)))
4990            }
4991            RelayItem::Packet(pkt) => {
4992                if matches!(
4993                    pkt.data_type(),
4994                    crate::DataType::ReliableAck
4995                        | crate::DataType::ReliablePartialAck
4996                        | crate::DataType::ReliablePacketRequest
4997                ) {
4998                    return Ok(None);
4999                }
5000                Ok(Some(RelayItem::Packet(pkt)))
5001            }
5002        }
5003    }
5004
5005    /// Drain the RX queue fully, expanding to TX items.
5006    #[inline]
5007    pub fn process_rx_queue(&self) -> TelemetryResult<()> {
5008        self.process_rx_queue_with_timeout(0)
5009    }
5010
5011    /// Drain the TX queue fully, invoking per-side TX handlers.
5012    ///
5013    /// If called from inside a side TX callback, this becomes a no-op so relay TX handlers cannot
5014    /// recurse into nested queue drains on the same stack.
5015    #[inline]
5016    pub fn process_tx_queue(&self) -> TelemetryResult<()> {
5017        self.process_tx_queue_with_timeout(0)
5018    }
5019
5020    /// Drain RX then TX queues fully (one pass).
5021    #[inline]
5022    pub fn process_all_queues(&self) -> TelemetryResult<()> {
5023        self.process_all_queues_with_timeout(0)
5024    }
5025
5026    /// Process the TX queue for up to `timeout_ms` milliseconds.
5027    ///
5028    /// `timeout_ms == 0` drains fully. If called from inside a side TX callback, this becomes a
5029    /// no-op so relay TX handlers cannot recurse into nested queue drains on the same stack.
5030    pub fn process_tx_queue_with_timeout(&self, timeout_ms: u32) -> TelemetryResult<()> {
5031        if self.side_tx_active() {
5032            return Ok(());
5033        }
5034        #[cfg(feature = "discovery")]
5035        {
5036            let _ = self.poll_discovery()?;
5037        }
5038        let start = self.clock.now_ms();
5039        loop {
5040            self.process_reliable_timeouts()?;
5041            if self.process_replay_queue_item()? {
5042                if timeout_ms != 0 && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5043                    break;
5044                }
5045                continue;
5046            }
5047            let Some((src, dst, handler, opts, data)) = self.pop_ready_tx_item() else {
5048                break;
5049            };
5050            match self.send_tx_item(src, dst, handler, opts, data.clone()) {
5051                Ok(sent) => {
5052                    if sent
5053                        && timeout_ms != 0
5054                        && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64
5055                    {
5056                        break;
5057                    }
5058                }
5059                Err(e) if Self::is_side_tx_busy(&e) => {
5060                    let priority = Self::relay_item_priority(&data)?;
5061                    let mut st = self.state.lock();
5062                    st.push_tx(RelayTxItem {
5063                        src,
5064                        dst,
5065                        data,
5066                        priority,
5067                    })?;
5068                    break;
5069                }
5070                Err(e) => return Err(e),
5071            }
5072        }
5073        Ok(())
5074    }
5075
5076    /// Process RX queue with timeout.
5077    pub fn process_rx_queue_with_timeout(&self, timeout_ms: u32) -> TelemetryResult<()> {
5078        #[cfg(feature = "discovery")]
5079        {
5080            let _ = self.poll_discovery()?;
5081        }
5082        let start = self.clock.now_ms();
5083        loop {
5084            let item_opt = {
5085                let mut st = self.state.lock();
5086                st.rx_queue.pop_front()
5087            };
5088            let Some(item) = item_opt else { break };
5089            self.process_rx_queue_item(item)?;
5090
5091            if timeout_ms != 0 && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5092                break;
5093            }
5094        }
5095        Ok(())
5096    }
5097
5098    /// Process RX and TX queues interleaved for up to `timeout_ms` milliseconds.
5099    ///
5100    /// `timeout_ms == 0` drains fully. If called from inside a side TX callback, this becomes a
5101    /// no-op so relay TX handlers cannot recurse into nested queue drains on the same stack.
5102    pub fn process_all_queues_with_timeout(&self, timeout_ms: u32) -> TelemetryResult<()> {
5103        if self.side_tx_active() {
5104            return Ok(());
5105        }
5106        #[cfg(feature = "discovery")]
5107        {
5108            let _ = self.poll_discovery()?;
5109        }
5110        let drain_fully = timeout_ms == 0;
5111        let start = if drain_fully { 0 } else { self.clock.now_ms() };
5112
5113        loop {
5114            let mut did_any = false;
5115            self.process_reliable_timeouts()?;
5116
5117            // First move RX → TX
5118            if let Some(item) = {
5119                let mut st = self.state.lock();
5120                st.rx_queue.pop_front()
5121            } {
5122                self.process_rx_queue_item(item)?;
5123                did_any = true;
5124            }
5125
5126            if !drain_fully && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5127                break;
5128            }
5129
5130            if self.process_replay_queue_item()? {
5131                did_any = true;
5132            }
5133
5134            // Then send out TX
5135            let sent_one = if let Some((src, dst, handler, opts, data)) = self.pop_ready_tx_item() {
5136                self.send_tx_item(src, dst, handler, opts, data)?
5137            } else {
5138                false
5139            };
5140
5141            if sent_one {
5142                did_any = true;
5143            }
5144
5145            if !drain_fully && self.clock.now_ms().wrapping_sub(start) >= timeout_ms as u64 {
5146                break;
5147            }
5148
5149            if !did_any {
5150                break;
5151            }
5152        }
5153
5154        Ok(())
5155    }
5156
5157    /// Runs one application-loop maintenance cycle.
5158    ///
5159    /// This polls built-in discovery when that feature is compiled in, then drains queued RX/TX
5160    /// work for up to `timeout_ms` milliseconds.
5161    pub fn periodic(&self, timeout_ms: u32) -> TelemetryResult<()> {
5162        #[cfg(feature = "discovery")]
5163        {
5164            let _ = self.poll_discovery()?;
5165        }
5166
5167        self.process_all_queues_with_timeout(timeout_ms)
5168    }
5169}