Skip to main content

sedsnet/
discovery.rs

1use alloc::collections::BTreeMap;
2use alloc::string::{String, ToString};
3use alloc::vec;
4use alloc::vec::Vec;
5
6use crate::router::encode_slice_le;
7use crate::{
8    DataEndpoint, DataType, E2eEncryptionPolicy, MessageElement, TelemetryError, TelemetryResult,
9    config::{
10        OwnedDataTypeDefinition, OwnedEndpointDefinition, OwnedRuntimeSchemaSnapshot,
11        RuntimeSchemaSnapshot, e2e_encryption_policy_code, e2e_encryption_policy_from_code,
12        export_schema, message_class_code, message_class_from_code, message_data_type_code,
13        message_data_type_from_code, reliable_code, reliable_from_code,
14    },
15    packet::Packet,
16    try_enum_from_u32,
17};
18
19pub const DISCOVERY_ROUTE_TTL_MS: u64 = 30_000;
20pub const DISCOVERY_FAST_INTERVAL_MS: u64 = 250;
21pub const DISCOVERY_SLOW_INTERVAL_MS: u64 = 5_000;
22pub const DISCOVERY_SLOW_LINK_CAPACITY_BPS: u64 = 512;
23pub const DISCOVERY_SLOW_LINK_PING_INTERVAL_MS: u64 = 15_000;
24pub const DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS: u64 = 120_000;
25pub const TIMESYNC_SLOW_LINK_MIN_INTERVAL_MS: u64 = 30_000;
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub struct DiscoveryCadenceState {
29    pub current_interval_ms: u64,
30    pub next_announce_ms: u64,
31}
32
33impl Default for DiscoveryCadenceState {
34    fn default() -> Self {
35        Self {
36            current_interval_ms: DISCOVERY_FAST_INTERVAL_MS,
37            next_announce_ms: 0,
38        }
39    }
40}
41
42impl DiscoveryCadenceState {
43    /// Switches discovery back to fast cadence and schedules an immediate announce.
44    pub fn on_topology_change(&mut self, now_ms: u64) {
45        self.current_interval_ms = DISCOVERY_FAST_INTERVAL_MS;
46        self.next_announce_ms = now_ms;
47    }
48
49    /// Advances the cadence after sending an announce, backing off toward the slow interval.
50    pub fn on_announce_sent(&mut self, now_ms: u64) {
51        self.next_announce_ms = now_ms.saturating_add(self.current_interval_ms);
52        self.current_interval_ms = core::cmp::min(
53            self.current_interval_ms.saturating_mul(2),
54            DISCOVERY_SLOW_INTERVAL_MS,
55        );
56    }
57
58    /// Returns `true` when discovery should emit another announce at `now_ms`.
59    pub fn due(&self, now_ms: u64) -> bool {
60        now_ms >= self.next_announce_ms
61    }
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub struct TopologyBoardNode {
66    pub sender_id: String,
67    pub reachable_endpoints: Vec<DataEndpoint>,
68    pub reachable_timesync_sources: Vec<String>,
69    pub connections: Vec<String>,
70}
71
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub struct TopologyLink {
74    pub source: String,
75    pub target: String,
76}
77
78#[derive(Debug, Clone, PartialEq, Eq)]
79pub struct TopologyAnnouncerRoute {
80    pub sender_id: String,
81    pub reachable_endpoints: Vec<DataEndpoint>,
82    pub reachable_timesync_sources: Vec<String>,
83    pub routers: Vec<TopologyBoardNode>,
84    pub last_seen_ms: u64,
85    pub age_ms: u64,
86}
87
88#[derive(Debug, Clone, PartialEq, Eq)]
89pub struct TopologySideRoute {
90    pub side_id: usize,
91    pub side_name: &'static str,
92    pub reachable_endpoints: Vec<DataEndpoint>,
93    pub reachable_timesync_sources: Vec<String>,
94    pub announcers: Vec<TopologyAnnouncerRoute>,
95    pub last_seen_ms: u64,
96    pub age_ms: u64,
97}
98
99#[derive(Debug, Clone, PartialEq, Eq)]
100pub struct TopologySnapshot {
101    pub advertised_endpoints: Vec<DataEndpoint>,
102    pub advertised_timesync_sources: Vec<String>,
103    pub routers: Vec<TopologyBoardNode>,
104    pub links: Vec<TopologyLink>,
105    pub routes: Vec<TopologySideRoute>,
106    pub current_announce_interval_ms: u64,
107    pub next_announce_ms: u64,
108}
109
110#[derive(Debug, Clone, PartialEq, Eq)]
111pub struct ClientStatsSnapshot {
112    pub sender_id: String,
113    pub connected: bool,
114    pub side_ids: Vec<usize>,
115    pub side_names: Vec<&'static str>,
116    pub last_seen_ms: Option<u64>,
117    pub age_ms: Option<u64>,
118    pub reachable_endpoints: Vec<DataEndpoint>,
119    pub reachable_timesync_sources: Vec<String>,
120    pub packets_sent: u64,
121    pub packets_received: u64,
122    pub bytes_sent: u64,
123    pub bytes_received: u64,
124}
125
126pub const LINK_CAPABILITY_HEADER_TEMPLATES: u32 = 0x0000_0001;
127pub const LINK_CAPABILITY_CHUNKING: u32 = 0x0000_0002;
128pub const LINK_CAPABILITY_RELIABILITY: u32 = 0x0000_0004;
129pub const LINK_CAPABILITY_CRYPTO: u32 = 0x0000_0008;
130pub const LINK_CAPABILITY_END_TO_END_RELIABILITY: u32 = 0x0000_0010;
131pub const LINK_CAPABILITY_OMIT_UNCHANGED_TIMESTAMPS: u32 = 0x0000_0020;
132
133pub const LINK_PROFILE_CANONICAL: u8 = 0;
134pub const LINK_PROFILE_TEMPLATE: u8 = 1;
135pub const LINK_PROFILE_IPV6_LIKE: u8 = 2;
136pub const LINK_PROFILE_IPV4_LIKE: u8 = 3;
137
138pub const ADDRESS_MODE_DYNAMIC: u8 = 0;
139pub const ADDRESS_MODE_REQUESTED: u8 = 1;
140pub const ADDRESS_MODE_STATIC: u8 = 2;
141pub const ADDRESS_STATE_REQUEST: u8 = 0;
142pub const ADDRESS_STATE_APPROVED: u8 = 1;
143
144#[derive(Debug, Clone, Copy, PartialEq, Eq)]
145pub struct LinkCapabilities {
146    pub version: u8,
147    pub flags: u32,
148    pub profile: u8,
149    pub max_frame_bytes: u32,
150    pub compact_header_target_bytes: u32,
151    pub max_side_transport_templates: u32,
152}
153
154#[derive(Debug, Clone, PartialEq, Eq)]
155pub struct AddressAdvertisement {
156    pub hostname: String,
157    pub address: u32,
158    pub requested_address: u32,
159    pub mode: u8,
160    pub state: u8,
161    pub birth_ms: u64,
162    pub owner_hash: u64,
163    pub reachable_endpoints: Vec<DataEndpoint>,
164    pub reachable_timesync_sources: Vec<String>,
165    pub link_capabilities: LinkCapabilities,
166}
167
168#[inline]
169pub const fn is_router_control_endpoint(ep: DataEndpoint) -> bool {
170    matches!(
171        ep,
172        DataEndpoint::TelemetryError | DataEndpoint::TimeSync | DataEndpoint::Discovery
173    )
174}
175
176/// Returns `true` when the endpoint is reserved for discovery control traffic.
177#[inline]
178pub const fn is_discovery_endpoint(ep: DataEndpoint) -> bool {
179    matches!(ep, DataEndpoint::Discovery)
180}
181
182/// Returns `true` when the data type is a discovery control packet type.
183#[inline]
184pub const fn is_discovery_type(ty: DataType) -> bool {
185    matches!(
186        ty,
187        DataType::DiscoveryAnnounce
188            | DataType::DiscoveryTimeSyncSources
189            | DataType::DiscoveryTopology
190            | DataType::DiscoverySchema
191            | DataType::DiscoveryTopologyRequest
192            | DataType::DiscoverySchemaRequest
193            | DataType::ManagedVariableRequest
194            | DataType::ManagedVariableValue
195            | DataType::DiscoveryLeave
196            | DataType::DiscoveryLinkCapabilities
197            | DataType::DiscoveryAddress
198    )
199}
200
201#[inline]
202pub const fn is_discovery_request_type(ty: DataType) -> bool {
203    matches!(
204        ty,
205        DataType::DiscoveryTopologyRequest
206            | DataType::DiscoverySchemaRequest
207            | DataType::ManagedVariableRequest
208            | DataType::DiscoveryLeave
209            | DataType::DiscoveryAddress
210    )
211}
212
213fn sort_dedup_strings(items: &mut Vec<String>) {
214    items.sort_unstable();
215    items.dedup();
216}
217
218/// Normalizes a topology-board list in place so it can be compared, exported, or encoded.
219pub fn normalize_topology_boards(boards: &mut Vec<TopologyBoardNode>) {
220    for board in boards.iter_mut() {
221        board
222            .reachable_endpoints
223            .retain(|ep| !is_router_control_endpoint(*ep));
224        board.reachable_endpoints.sort_unstable();
225        board.reachable_endpoints.dedup();
226        sort_dedup_strings(&mut board.reachable_timesync_sources);
227        board.connections.retain(|peer| peer != &board.sender_id);
228        sort_dedup_strings(&mut board.connections);
229    }
230    boards.sort_unstable_by(|a, b| a.sender_id.cmp(&b.sender_id));
231    boards.dedup_by(|a, b| a.sender_id == b.sender_id);
232}
233
234pub fn topology_links_from_boards(boards: &[TopologyBoardNode]) -> Vec<TopologyLink> {
235    let mut links = Vec::new();
236    for board in boards {
237        for peer in &board.connections {
238            if peer == &board.sender_id {
239                continue;
240            }
241            let (source, target) = if board.sender_id <= *peer {
242                (board.sender_id.clone(), peer.clone())
243            } else {
244                (peer.clone(), board.sender_id.clone())
245            };
246            links.push(TopologyLink { source, target });
247        }
248    }
249    links.sort_unstable_by(|a, b| (&a.source, &a.target).cmp(&(&b.source, &b.target)));
250    links.dedup_by(|a, b| a.source == b.source && a.target == b.target);
251    links
252}
253
254/// Merges board topology views keyed by sender ID.
255pub fn merge_topology_boards(dst: &mut Vec<TopologyBoardNode>, src: &[TopologyBoardNode]) {
256    let mut merged: BTreeMap<String, TopologyBoardNode> = dst
257        .iter()
258        .cloned()
259        .map(|board| (board.sender_id.clone(), board))
260        .collect();
261    for board in src {
262        let entry = merged
263            .entry(board.sender_id.clone())
264            .or_insert_with(|| TopologyBoardNode {
265                sender_id: board.sender_id.clone(),
266                reachable_endpoints: Vec::new(),
267                reachable_timesync_sources: Vec::new(),
268                connections: Vec::new(),
269            });
270        entry
271            .reachable_endpoints
272            .extend(board.reachable_endpoints.iter().copied());
273        entry
274            .reachable_timesync_sources
275            .extend(board.reachable_timesync_sources.iter().cloned());
276        entry.connections.extend(board.connections.iter().cloned());
277    }
278    let mut out: Vec<TopologyBoardNode> = merged.into_values().collect();
279    normalize_topology_boards(&mut out);
280    *dst = out;
281}
282
283/// Summarizes a board topology list into aggregated endpoint and time-source reachability.
284pub fn summarize_topology_boards(boards: &[TopologyBoardNode]) -> (Vec<DataEndpoint>, Vec<String>) {
285    let mut reachable_endpoints = Vec::new();
286    let mut reachable_timesync_sources = Vec::new();
287    for board in boards {
288        reachable_endpoints.extend(board.reachable_endpoints.iter().copied());
289        reachable_timesync_sources.extend(board.reachable_timesync_sources.iter().cloned());
290    }
291    reachable_endpoints.sort_unstable();
292    reachable_endpoints.dedup();
293    reachable_endpoints.retain(|ep| !is_router_control_endpoint(*ep));
294    sort_dedup_strings(&mut reachable_timesync_sources);
295    (reachable_endpoints, reachable_timesync_sources)
296}
297
298/// Builds a discovery announce packet advertising reachable non-discovery endpoints.
299pub fn build_discovery_announce(
300    sender: &str,
301    timestamp_ms: u64,
302    endpoints: &[DataEndpoint],
303) -> TelemetryResult<Packet> {
304    let payload_words: Vec<u32> = endpoints.iter().copied().map(|ep| ep.as_u32()).collect();
305    Packet::new(
306        DataType::DiscoveryAnnounce,
307        &[DataEndpoint::Discovery],
308        sender,
309        timestamp_ms,
310        encode_slice_le(payload_words.as_slice()),
311    )
312}
313
314/// Decodes a discovery announce packet into its advertised endpoints.
315pub fn decode_discovery_announce(pkt: &Packet) -> TelemetryResult<Vec<DataEndpoint>> {
316    if pkt.data_type() != DataType::DiscoveryAnnounce {
317        return Err(TelemetryError::InvalidType);
318    }
319    decode_discovery_payload(pkt.payload())
320}
321
322/// Decodes a discovery announce payload into a sorted, deduplicated endpoint list.
323pub fn decode_discovery_payload(payload: &[u8]) -> TelemetryResult<Vec<DataEndpoint>> {
324    if !payload.len().is_multiple_of(4) {
325        return Err(TelemetryError::Unpack("discovery payload width"));
326    }
327
328    let mut endpoints = Vec::with_capacity(payload.len() / 4);
329    for chunk in payload.chunks_exact(4) {
330        let raw = u32::from_le_bytes(chunk.try_into().expect("4-byte chunk"));
331        let ep = try_enum_from_u32(raw).ok_or(TelemetryError::Unpack("bad discovery endpoint"))?;
332        if is_discovery_endpoint(ep) {
333            continue;
334        }
335        endpoints.push(ep);
336    }
337    endpoints.sort_unstable();
338    endpoints.dedup();
339    Ok(endpoints)
340}
341
342/// Builds a discovery packet advertising reachable time sync source identifiers.
343pub fn build_discovery_timesync_sources<S: AsRef<str>>(
344    sender: &str,
345    timestamp_ms: u64,
346    sources: &[S],
347) -> TelemetryResult<Packet> {
348    let mut payload = Vec::new();
349    let mut deduped: Vec<&str> = sources.iter().map(|s| s.as_ref()).collect();
350    deduped.sort_unstable();
351    deduped.dedup();
352
353    payload.extend_from_slice(&(deduped.len() as u32).to_le_bytes());
354    for source in deduped {
355        let bytes = source.as_bytes();
356        let len = u32::try_from(bytes.len())
357            .map_err(|_| TelemetryError::Pack("discovery source id too long"))?;
358        payload.extend_from_slice(&len.to_le_bytes());
359        payload.extend_from_slice(bytes);
360    }
361
362    Packet::new(
363        DataType::DiscoveryTimeSyncSources,
364        &[DataEndpoint::Discovery],
365        sender,
366        timestamp_ms,
367        payload.into(),
368    )
369}
370
371pub fn build_discovery_topology_request(
372    sender: &str,
373    timestamp_ms: u64,
374) -> TelemetryResult<Packet> {
375    Packet::new(
376        DataType::DiscoveryTopologyRequest,
377        &[DataEndpoint::Discovery],
378        sender,
379        timestamp_ms,
380        Vec::<u8>::new().into(),
381    )
382}
383
384pub fn build_discovery_schema_request(sender: &str, timestamp_ms: u64) -> TelemetryResult<Packet> {
385    Packet::new(
386        DataType::DiscoverySchemaRequest,
387        &[DataEndpoint::Discovery],
388        sender,
389        timestamp_ms,
390        Vec::<u8>::new().into(),
391    )
392}
393
394pub fn build_discovery_leave(sender: &str, timestamp_ms: u64) -> TelemetryResult<Packet> {
395    Packet::new(
396        DataType::DiscoveryLeave,
397        &[DataEndpoint::Discovery],
398        sender,
399        timestamp_ms,
400        Vec::<u8>::new().into(),
401    )
402}
403
404pub fn build_discovery_link_capabilities(
405    sender: &str,
406    timestamp_ms: u64,
407    capabilities: LinkCapabilities,
408) -> TelemetryResult<Packet> {
409    let mut payload = Vec::with_capacity(18);
410    payload.push(capabilities.version);
411    payload.extend_from_slice(&capabilities.flags.to_le_bytes());
412    payload.push(capabilities.profile);
413    payload.extend_from_slice(&capabilities.max_frame_bytes.to_le_bytes());
414    payload.extend_from_slice(&capabilities.compact_header_target_bytes.to_le_bytes());
415    payload.extend_from_slice(&capabilities.max_side_transport_templates.to_le_bytes());
416    Packet::new(
417        DataType::DiscoveryLinkCapabilities,
418        &[DataEndpoint::Discovery],
419        sender,
420        timestamp_ms,
421        payload.into(),
422    )
423}
424
425pub fn decode_discovery_link_capabilities(pkt: &Packet) -> TelemetryResult<LinkCapabilities> {
426    if pkt.data_type() != DataType::DiscoveryLinkCapabilities {
427        return Err(TelemetryError::InvalidType);
428    }
429    let payload = pkt.payload();
430    if payload.len() != 18 {
431        return Err(TelemetryError::Unpack("discovery link capabilities width"));
432    }
433    Ok(LinkCapabilities {
434        version: payload[0],
435        flags: u32::from_le_bytes(payload[1..5].try_into().expect("4-byte flags")),
436        profile: payload[5],
437        max_frame_bytes: u32::from_le_bytes(payload[6..10].try_into().expect("4-byte max frame")),
438        compact_header_target_bytes: u32::from_le_bytes(
439            payload[10..14].try_into().expect("4-byte target"),
440        ),
441        max_side_transport_templates: u32::from_le_bytes(
442            payload[14..18].try_into().expect("4-byte templates"),
443        ),
444    })
445}
446
447pub fn build_discovery_address(
448    sender: &str,
449    timestamp_ms: u64,
450    ad: &AddressAdvertisement,
451) -> TelemetryResult<Packet> {
452    let mut payload = Vec::new();
453    payload.push(1);
454    payload.push(ad.mode);
455    payload.push(ad.state);
456    payload.extend_from_slice(&ad.address.to_le_bytes());
457    payload.extend_from_slice(&ad.requested_address.to_le_bytes());
458    payload.extend_from_slice(&ad.birth_ms.to_le_bytes());
459    payload.extend_from_slice(&ad.owner_hash.to_le_bytes());
460    encode_string(&mut payload, &ad.hostname)?;
461    let mut endpoints = ad.reachable_endpoints.clone();
462    endpoints.retain(|ep| !is_discovery_endpoint(*ep));
463    endpoints.sort_unstable();
464    endpoints.dedup();
465    let endpoint_count = u32::try_from(endpoints.len())
466        .map_err(|_| TelemetryError::Pack("discovery address endpoint count"))?;
467    payload.extend_from_slice(&endpoint_count.to_le_bytes());
468    for ep in endpoints {
469        payload.extend_from_slice(&ep.as_u32().to_le_bytes());
470    }
471    let mut sources = ad.reachable_timesync_sources.clone();
472    sort_dedup_strings(&mut sources);
473    let source_count = u32::try_from(sources.len())
474        .map_err(|_| TelemetryError::Pack("discovery address source count"))?;
475    payload.extend_from_slice(&source_count.to_le_bytes());
476    for source in sources {
477        encode_string(&mut payload, &source)?;
478    }
479    payload.push(ad.link_capabilities.version);
480    payload.extend_from_slice(&ad.link_capabilities.flags.to_le_bytes());
481    payload.push(ad.link_capabilities.profile);
482    payload.extend_from_slice(&ad.link_capabilities.max_frame_bytes.to_le_bytes());
483    payload.extend_from_slice(
484        &ad.link_capabilities
485            .compact_header_target_bytes
486            .to_le_bytes(),
487    );
488    payload.extend_from_slice(
489        &ad.link_capabilities
490            .max_side_transport_templates
491            .to_le_bytes(),
492    );
493    Packet::new(
494        DataType::DiscoveryAddress,
495        &[DataEndpoint::Discovery],
496        sender,
497        timestamp_ms,
498        payload.into(),
499    )
500}
501
502pub fn decode_discovery_address(pkt: &Packet) -> TelemetryResult<AddressAdvertisement> {
503    if pkt.data_type() != DataType::DiscoveryAddress {
504        return Err(TelemetryError::InvalidType);
505    }
506    let payload = pkt.payload();
507    let mut cursor = 0usize;
508    let version = read_u8(payload, &mut cursor, "discovery address version")?;
509    if version != 1 {
510        return Err(TelemetryError::Unpack("discovery address version"));
511    }
512    let mode = read_u8(payload, &mut cursor, "discovery address mode")?;
513    let state = read_u8(payload, &mut cursor, "discovery address state")?;
514    let address = read_u32(payload, &mut cursor, "discovery address current")?;
515    let requested_address = read_u32(payload, &mut cursor, "discovery address requested")?;
516    if payload.len().saturating_sub(cursor) < 8 {
517        return Err(TelemetryError::Unpack("discovery address birth"));
518    }
519    let birth_ms = u64::from_le_bytes(
520        payload[cursor..cursor + 8]
521            .try_into()
522            .expect("8-byte birth"),
523    );
524    cursor += 8;
525    if payload.len().saturating_sub(cursor) < 8 {
526        return Err(TelemetryError::Unpack("discovery address owner"));
527    }
528    let owner_hash = u64::from_le_bytes(
529        payload[cursor..cursor + 8]
530            .try_into()
531            .expect("8-byte owner"),
532    );
533    cursor += 8;
534    let hostname = decode_string(payload, &mut cursor, "discovery address hostname")?;
535    let endpoint_count =
536        read_u32(payload, &mut cursor, "discovery address endpoint count")? as usize;
537    let mut reachable_endpoints = Vec::with_capacity(endpoint_count);
538    for _ in 0..endpoint_count {
539        let raw = read_u32(payload, &mut cursor, "discovery address endpoint")?;
540        let ep = try_enum_from_u32(raw).ok_or(TelemetryError::Unpack("bad discovery endpoint"))?;
541        if !is_discovery_endpoint(ep) {
542            reachable_endpoints.push(ep);
543        }
544    }
545    reachable_endpoints.sort_unstable();
546    reachable_endpoints.dedup();
547    let source_count = read_u32(payload, &mut cursor, "discovery address source count")? as usize;
548    let mut reachable_timesync_sources = Vec::with_capacity(source_count);
549    for _ in 0..source_count {
550        let source = decode_string(payload, &mut cursor, "discovery address source")?;
551        if !source.is_empty() {
552            reachable_timesync_sources.push(source);
553        }
554    }
555    sort_dedup_strings(&mut reachable_timesync_sources);
556    let version = read_u8(payload, &mut cursor, "discovery address link version")?;
557    let flags = read_u32(payload, &mut cursor, "discovery address link flags")?;
558    let profile = read_u8(payload, &mut cursor, "discovery address link profile")?;
559    let max_frame_bytes = read_u32(payload, &mut cursor, "discovery address link max frame")?;
560    let compact_header_target_bytes = read_u32(
561        payload,
562        &mut cursor,
563        "discovery address link compact target",
564    )?;
565    let max_side_transport_templates =
566        read_u32(payload, &mut cursor, "discovery address link templates")?;
567    if cursor != payload.len() {
568        return Err(TelemetryError::Unpack("discovery address trailing bytes"));
569    }
570    if hostname.is_empty() || address == 0 {
571        return Err(TelemetryError::Unpack("bad discovery address"));
572    }
573    Ok(AddressAdvertisement {
574        hostname,
575        address,
576        requested_address,
577        mode,
578        state,
579        birth_ms,
580        owner_hash,
581        reachable_endpoints,
582        reachable_timesync_sources,
583        link_capabilities: LinkCapabilities {
584            version,
585            flags,
586            profile,
587            max_frame_bytes,
588            compact_header_target_bytes,
589            max_side_transport_templates,
590        },
591    })
592}
593
594pub fn build_managed_variable_request(
595    sender: &str,
596    timestamp_ms: u64,
597    ty: DataType,
598) -> TelemetryResult<Packet> {
599    Packet::new(
600        DataType::ManagedVariableRequest,
601        &[DataEndpoint::Discovery],
602        sender,
603        timestamp_ms,
604        encode_slice_le(&[ty.as_u32()]),
605    )
606}
607
608pub fn decode_managed_variable_request(pkt: &Packet) -> TelemetryResult<DataType> {
609    if pkt.data_type() != DataType::ManagedVariableRequest {
610        return Err(TelemetryError::InvalidType);
611    }
612    let payload = pkt.payload();
613    if payload.len() != 4 {
614        return Err(TelemetryError::Unpack("managed variable request width"));
615    }
616    let raw = u32::from_le_bytes(payload.try_into().expect("4-byte payload"));
617    try_enum_from_u32(raw).ok_or(TelemetryError::Unpack("bad managed variable data type"))
618}
619
620/// Decodes a discovery time sync source packet into source identifiers.
621pub fn decode_discovery_timesync_sources(pkt: &Packet) -> TelemetryResult<Vec<String>> {
622    if pkt.data_type() != DataType::DiscoveryTimeSyncSources {
623        return Err(TelemetryError::InvalidType);
624    }
625    decode_discovery_timesync_sources_payload(pkt.payload())
626}
627
628/// Decodes a discovery time sync source payload into a sorted, deduplicated source list.
629pub fn decode_discovery_timesync_sources_payload(payload: &[u8]) -> TelemetryResult<Vec<String>> {
630    if payload.len() < 4 {
631        return Err(TelemetryError::Unpack("discovery timesync source count"));
632    }
633
634    let count = u32::from_le_bytes(payload[..4].try_into().expect("4-byte count")) as usize;
635    let mut cursor = 4usize;
636    let mut out = Vec::with_capacity(count);
637
638    for _ in 0..count {
639        if payload.len().saturating_sub(cursor) < 4 {
640            return Err(TelemetryError::Unpack("discovery timesync source len"));
641        }
642        let len = u32::from_le_bytes(payload[cursor..cursor + 4].try_into().expect("4-byte len"))
643            as usize;
644        cursor += 4;
645        if payload.len().saturating_sub(cursor) < len {
646            return Err(TelemetryError::Unpack("discovery timesync source bytes"));
647        }
648        let raw = &payload[cursor..cursor + len];
649        cursor += len;
650        let source = core::str::from_utf8(raw)
651            .map_err(|_| TelemetryError::Unpack("discovery timesync source utf8"))?;
652        if !source.is_empty() {
653            out.push(source.to_string());
654        }
655    }
656
657    if cursor != payload.len() {
658        return Err(TelemetryError::Unpack("discovery timesync trailing bytes"));
659    }
660
661    out.sort_unstable();
662    out.dedup();
663    Ok(out)
664}
665
666/// Builds a discovery packet advertising the sender's current board/edge topology graph.
667pub fn build_discovery_topology(
668    sender: &str,
669    timestamp_ms: u64,
670    boards: &[TopologyBoardNode],
671) -> TelemetryResult<Packet> {
672    let mut payload = Vec::new();
673    let mut normalized = boards.to_vec();
674    normalize_topology_boards(&mut normalized);
675
676    payload.extend_from_slice(&(normalized.len() as u32).to_le_bytes());
677    for board in normalized {
678        let sender_bytes = board.sender_id.as_bytes();
679        let sender_len = u32::try_from(sender_bytes.len())
680            .map_err(|_| TelemetryError::Pack("discovery topology sender id too long"))?;
681        payload.extend_from_slice(&sender_len.to_le_bytes());
682        payload.extend_from_slice(sender_bytes);
683
684        payload.extend_from_slice(&(board.reachable_endpoints.len() as u32).to_le_bytes());
685        for ep in board.reachable_endpoints {
686            payload.extend_from_slice(&(ep.as_u32()).to_le_bytes());
687        }
688
689        payload.extend_from_slice(&(board.reachable_timesync_sources.len() as u32).to_le_bytes());
690        for source in board.reachable_timesync_sources {
691            let bytes = source.as_bytes();
692            let len = u32::try_from(bytes.len())
693                .map_err(|_| TelemetryError::Pack("discovery topology source id too long"))?;
694            payload.extend_from_slice(&len.to_le_bytes());
695            payload.extend_from_slice(bytes);
696        }
697
698        payload.extend_from_slice(&(board.connections.len() as u32).to_le_bytes());
699        for peer in board.connections {
700            let bytes = peer.as_bytes();
701            let len = u32::try_from(bytes.len())
702                .map_err(|_| TelemetryError::Pack("discovery topology connection id too long"))?;
703            payload.extend_from_slice(&len.to_le_bytes());
704            payload.extend_from_slice(bytes);
705        }
706    }
707
708    Packet::new(
709        DataType::DiscoveryTopology,
710        &[DataEndpoint::Discovery],
711        sender,
712        timestamp_ms,
713        payload.into(),
714    )
715}
716
717fn decode_string(
718    payload: &[u8],
719    cursor: &mut usize,
720    label: &'static str,
721) -> TelemetryResult<String> {
722    if payload.len().saturating_sub(*cursor) < 4 {
723        return Err(TelemetryError::Unpack(label));
724    }
725    let len = u32::from_le_bytes(
726        payload[*cursor..*cursor + 4]
727            .try_into()
728            .expect("4-byte len"),
729    ) as usize;
730    *cursor += 4;
731    if payload.len().saturating_sub(*cursor) < len {
732        return Err(TelemetryError::Unpack(label));
733    }
734    let raw = &payload[*cursor..*cursor + len];
735    *cursor += len;
736    core::str::from_utf8(raw)
737        .map(|s| s.to_string())
738        .map_err(|_| TelemetryError::Unpack(label))
739}
740
741/// Decodes a discovery topology packet into board-node records.
742pub fn decode_discovery_topology(pkt: &Packet) -> TelemetryResult<Vec<TopologyBoardNode>> {
743    if pkt.data_type() != DataType::DiscoveryTopology {
744        return Err(TelemetryError::InvalidType);
745    }
746    decode_discovery_topology_payload(pkt.payload())
747}
748
749/// Decodes a discovery topology payload into normalized board-node records.
750pub fn decode_discovery_topology_payload(
751    payload: &[u8],
752) -> TelemetryResult<Vec<TopologyBoardNode>> {
753    if payload.len() < 4 {
754        return Err(TelemetryError::Unpack("discovery topology board count"));
755    }
756
757    let count = u32::from_le_bytes(payload[..4].try_into().expect("4-byte count")) as usize;
758    let mut cursor = 4usize;
759    let mut boards = Vec::with_capacity(count);
760
761    for _ in 0..count {
762        let sender_id = decode_string(payload, &mut cursor, "discovery topology sender id")?;
763
764        if payload.len().saturating_sub(cursor) < 4 {
765            return Err(TelemetryError::Unpack("discovery topology endpoint count"));
766        }
767        let endpoint_count = u32::from_le_bytes(
768            payload[cursor..cursor + 4]
769                .try_into()
770                .expect("4-byte count"),
771        ) as usize;
772        cursor += 4;
773        let mut reachable_endpoints = Vec::with_capacity(endpoint_count);
774        for _ in 0..endpoint_count {
775            if payload.len().saturating_sub(cursor) < 4 {
776                return Err(TelemetryError::Unpack("discovery topology endpoint"));
777            }
778            let raw =
779                u32::from_le_bytes(payload[cursor..cursor + 4].try_into().expect("4-byte ep"));
780            cursor += 4;
781            let ep =
782                try_enum_from_u32(raw).ok_or(TelemetryError::Unpack("bad discovery endpoint"))?;
783            if !is_discovery_endpoint(ep) {
784                reachable_endpoints.push(ep);
785            }
786        }
787
788        if payload.len().saturating_sub(cursor) < 4 {
789            return Err(TelemetryError::Unpack(
790                "discovery topology timesync source count",
791            ));
792        }
793        let source_count = u32::from_le_bytes(
794            payload[cursor..cursor + 4]
795                .try_into()
796                .expect("4-byte count"),
797        ) as usize;
798        cursor += 4;
799        let mut reachable_timesync_sources = Vec::with_capacity(source_count);
800        for _ in 0..source_count {
801            let source = decode_string(payload, &mut cursor, "discovery topology timesync source")?;
802            if !source.is_empty() {
803                reachable_timesync_sources.push(source);
804            }
805        }
806
807        if payload.len().saturating_sub(cursor) < 4 {
808            return Err(TelemetryError::Unpack(
809                "discovery topology connection count",
810            ));
811        }
812        let connection_count = u32::from_le_bytes(
813            payload[cursor..cursor + 4]
814                .try_into()
815                .expect("4-byte count"),
816        ) as usize;
817        cursor += 4;
818        let mut connections = Vec::with_capacity(connection_count);
819        for _ in 0..connection_count {
820            let peer = decode_string(payload, &mut cursor, "discovery topology connection")?;
821            if !peer.is_empty() {
822                connections.push(peer);
823            }
824        }
825
826        boards.push(TopologyBoardNode {
827            sender_id,
828            reachable_endpoints,
829            reachable_timesync_sources,
830            connections,
831        });
832    }
833
834    if cursor != payload.len() {
835        return Err(TelemetryError::Unpack("discovery topology trailing bytes"));
836    }
837
838    normalize_topology_boards(&mut boards);
839    Ok(boards)
840}
841
842fn encode_string(payload: &mut Vec<u8>, value: &str) -> TelemetryResult<()> {
843    let len = u32::try_from(value.len())
844        .map_err(|_| TelemetryError::Pack("discovery schema string too long"))?;
845    payload.extend_from_slice(&len.to_le_bytes());
846    payload.extend_from_slice(value.as_bytes());
847    Ok(())
848}
849
850fn read_u8(payload: &[u8], cursor: &mut usize, label: &'static str) -> TelemetryResult<u8> {
851    if payload.len().saturating_sub(*cursor) < 1 {
852        return Err(TelemetryError::Unpack(label));
853    }
854    let out = payload[*cursor];
855    *cursor += 1;
856    Ok(out)
857}
858
859fn read_u32(payload: &[u8], cursor: &mut usize, label: &'static str) -> TelemetryResult<u32> {
860    if payload.len().saturating_sub(*cursor) < 4 {
861        return Err(TelemetryError::Unpack(label));
862    }
863    let out = u32::from_le_bytes(
864        payload[*cursor..*cursor + 4]
865            .try_into()
866            .expect("4-byte u32"),
867    );
868    *cursor += 4;
869    Ok(out)
870}
871
872/// Builds a discovery packet containing the complete runtime schema snapshot.
873pub fn build_discovery_schema(sender: &str, timestamp_ms: u64) -> TelemetryResult<Packet> {
874    build_discovery_schema_from_snapshot(sender, timestamp_ms, export_schema())
875}
876
877/// Elects the authoritative discovery/schema master for the current topology view.
878///
879/// Tie-breaks are deterministic:
880/// 1. Fewest unreachable boards.
881/// 2. Lowest maximum hop distance to any reachable board.
882/// 3. Lowest total hop distance across all reachable boards.
883/// 4. Lexicographically smallest sender ID.
884pub fn elect_discovery_master(local_sender: &str, boards: &[TopologyBoardNode]) -> String {
885    let mut nodes: BTreeMap<String, Vec<String>> = BTreeMap::new();
886    for board in boards {
887        nodes.entry(board.sender_id.clone()).or_default();
888        for peer in board.connections.iter() {
889            nodes
890                .entry(peer.clone())
891                .or_default()
892                .push(board.sender_id.clone());
893            nodes
894                .entry(board.sender_id.clone())
895                .or_default()
896                .push(peer.clone());
897        }
898    }
899    nodes.entry(local_sender.to_string()).or_default();
900    for peers in nodes.values_mut() {
901        peers.sort_unstable();
902        peers.dedup();
903    }
904
905    let mut best_sender = local_sender.to_string();
906    let mut best_unreachable = usize::MAX;
907    let mut best_max_hops = usize::MAX;
908    let mut best_total_hops = usize::MAX;
909
910    for sender in nodes.keys() {
911        let mut frontier = vec![sender.clone()];
912        let mut seen: BTreeMap<String, usize> = BTreeMap::new();
913        seen.insert(sender.clone(), 0);
914        let mut idx = 0;
915        while idx < frontier.len() {
916            let cur = frontier[idx].clone();
917            idx += 1;
918            let cur_dist = seen[&cur];
919            if let Some(peers) = nodes.get(&cur) {
920                for peer in peers {
921                    if !seen.contains_key(peer) {
922                        seen.insert(peer.clone(), cur_dist + 1);
923                        frontier.push(peer.clone());
924                    }
925                }
926            }
927        }
928
929        let unreachable = nodes.len().saturating_sub(seen.len());
930        let max_hops = seen.values().copied().max().unwrap_or(0);
931        let total_hops = seen.values().copied().sum();
932        let better = unreachable < best_unreachable
933            || (unreachable == best_unreachable && max_hops < best_max_hops)
934            || (unreachable == best_unreachable
935                && max_hops == best_max_hops
936                && total_hops < best_total_hops)
937            || (unreachable == best_unreachable
938                && max_hops == best_max_hops
939                && total_hops == best_total_hops
940                && sender < &best_sender);
941        if better {
942            best_sender = sender.clone();
943            best_unreachable = unreachable;
944            best_max_hops = max_hops;
945            best_total_hops = total_hops;
946        }
947    }
948
949    best_sender
950}
951
952/// Builds a discovery schema packet from an explicit snapshot.
953pub fn build_discovery_schema_from_snapshot(
954    sender: &str,
955    timestamp_ms: u64,
956    mut schema: RuntimeSchemaSnapshot,
957) -> TelemetryResult<Packet> {
958    let mut payload = Vec::new();
959    payload.extend_from_slice(&3u32.to_le_bytes());
960
961    schema.endpoints.sort_unstable_by_key(|def| def.id.as_u32());
962    schema.types.sort_unstable_by_key(|def| def.id.as_u32());
963
964    payload.extend_from_slice(&(schema.endpoints.len() as u32).to_le_bytes());
965    for ep in schema.endpoints {
966        payload.extend_from_slice(&ep.id.as_u32().to_le_bytes());
967        payload.push(ep.link_local_only as u8);
968        encode_string(&mut payload, ep.name)?;
969        encode_string(&mut payload, ep.description)?;
970    }
971
972    payload.extend_from_slice(&(schema.types.len() as u32).to_le_bytes());
973    for ty in schema.types {
974        payload.extend_from_slice(&ty.id.as_u32().to_le_bytes());
975        encode_string(&mut payload, ty.name)?;
976        encode_string(&mut payload, ty.description)?;
977        match ty.element {
978            MessageElement::Static(count, data_type, class) => {
979                payload.push(0);
980                payload.extend_from_slice(&(count as u32).to_le_bytes());
981                payload.push(message_data_type_code(data_type));
982                payload.push(message_class_code(class));
983            }
984            MessageElement::Dynamic(data_type, class) => {
985                payload.push(1);
986                payload.extend_from_slice(&0u32.to_le_bytes());
987                payload.push(message_data_type_code(data_type));
988                payload.push(message_class_code(class));
989            }
990        }
991        payload.push(reliable_code(ty.reliable));
992        payload.push(ty.priority);
993        payload.push(e2e_encryption_policy_code(ty.e2e_encryption));
994        payload.extend_from_slice(&(ty.endpoints.len() as u32).to_le_bytes());
995        for ep in ty.endpoints {
996            payload.extend_from_slice(&ep.as_u32().to_le_bytes());
997        }
998    }
999
1000    Packet::new(
1001        DataType::DiscoverySchema,
1002        &[DataEndpoint::Discovery],
1003        sender,
1004        timestamp_ms,
1005        payload.into(),
1006    )
1007}
1008
1009/// Decodes a discovery schema packet.
1010pub fn decode_discovery_schema(pkt: &Packet) -> TelemetryResult<OwnedRuntimeSchemaSnapshot> {
1011    if pkt.data_type() != DataType::DiscoverySchema {
1012        return Err(TelemetryError::InvalidType);
1013    }
1014    decode_discovery_schema_payload(pkt.payload())
1015}
1016
1017/// Decodes a discovery schema payload into runtime definitions.
1018pub fn decode_discovery_schema_payload(
1019    payload: &[u8],
1020) -> TelemetryResult<OwnedRuntimeSchemaSnapshot> {
1021    let mut cursor = 0usize;
1022    let version = read_u32(payload, &mut cursor, "discovery schema version")?;
1023    if version != 1 && version != 2 && version != 3 {
1024        return Err(TelemetryError::Unpack("discovery schema version"));
1025    }
1026
1027    let endpoint_count =
1028        read_u32(payload, &mut cursor, "discovery schema endpoint count")? as usize;
1029    let mut endpoints = Vec::with_capacity(endpoint_count);
1030    for _ in 0..endpoint_count {
1031        let id = DataEndpoint(read_u32(
1032            payload,
1033            &mut cursor,
1034            "discovery schema endpoint id",
1035        )?);
1036        let link_local_only =
1037            read_u8(payload, &mut cursor, "discovery schema endpoint flags")? != 0;
1038        let name = decode_string(payload, &mut cursor, "discovery schema endpoint name")?;
1039        let description = if version >= 2 {
1040            decode_string(
1041                payload,
1042                &mut cursor,
1043                "discovery schema endpoint description",
1044            )?
1045        } else {
1046            String::new()
1047        };
1048        endpoints.push(OwnedEndpointDefinition {
1049            id,
1050            name,
1051            description,
1052            link_local_only,
1053        });
1054    }
1055
1056    let type_count = read_u32(payload, &mut cursor, "discovery schema type count")? as usize;
1057    let mut types = Vec::with_capacity(type_count);
1058    for _ in 0..type_count {
1059        let id = DataType(read_u32(payload, &mut cursor, "discovery schema type id")?);
1060        let name = decode_string(payload, &mut cursor, "discovery schema type name")?;
1061        let description = if version >= 2 {
1062            decode_string(payload, &mut cursor, "discovery schema type description")?
1063        } else {
1064            String::new()
1065        };
1066        let element_kind = read_u8(payload, &mut cursor, "discovery schema element kind")?;
1067        let count = read_u32(payload, &mut cursor, "discovery schema element count")? as usize;
1068        let data_type = message_data_type_from_code(read_u8(
1069            payload,
1070            &mut cursor,
1071            "discovery schema data type",
1072        )?)
1073        .ok_or(TelemetryError::Unpack("discovery schema data type"))?;
1074        let class =
1075            message_class_from_code(read_u8(payload, &mut cursor, "discovery schema class")?)
1076                .ok_or(TelemetryError::Unpack("discovery schema class"))?;
1077        let element = match element_kind {
1078            0 => MessageElement::Static(count, data_type, class),
1079            1 => MessageElement::Dynamic(data_type, class),
1080            _ => return Err(TelemetryError::Unpack("discovery schema element kind")),
1081        };
1082        let reliable =
1083            reliable_from_code(read_u8(payload, &mut cursor, "discovery schema reliable")?)
1084                .ok_or(TelemetryError::Unpack("discovery schema reliable"))?;
1085        let priority = read_u8(payload, &mut cursor, "discovery schema priority")?;
1086        let e2e_encryption = if version >= 3 {
1087            e2e_encryption_policy_from_code(read_u8(
1088                payload,
1089                &mut cursor,
1090                "discovery schema e2e cryptography",
1091            )?)
1092            .ok_or(TelemetryError::Unpack("discovery schema e2e cryptography"))?
1093        } else {
1094            E2eEncryptionPolicy::PreferOff
1095        };
1096        let endpoint_count =
1097            read_u32(payload, &mut cursor, "discovery schema type endpoint count")? as usize;
1098        let mut type_endpoints = Vec::with_capacity(endpoint_count);
1099        for _ in 0..endpoint_count {
1100            type_endpoints.push(DataEndpoint(read_u32(
1101                payload,
1102                &mut cursor,
1103                "discovery schema type endpoint",
1104            )?));
1105        }
1106        types.push(OwnedDataTypeDefinition {
1107            id,
1108            name,
1109            description,
1110            element,
1111            endpoints: type_endpoints,
1112            reliable,
1113            priority,
1114            e2e_encryption,
1115        });
1116    }
1117
1118    if cursor != payload.len() {
1119        return Err(TelemetryError::Unpack("discovery schema trailing bytes"));
1120    }
1121    Ok(OwnedRuntimeSchemaSnapshot { endpoints, types })
1122}