1use alloc::collections::BTreeMap;
2use alloc::string::{String, ToString};
3use alloc::vec;
4use alloc::vec::Vec;
5
6use crate::router::encode_slice_le;
7use crate::{
8 DataEndpoint, DataType, E2eEncryptionPolicy, MessageElement, TelemetryError, TelemetryResult,
9 config::{
10 OwnedDataTypeDefinition, OwnedEndpointDefinition, OwnedRuntimeSchemaSnapshot,
11 RuntimeSchemaSnapshot, e2e_encryption_policy_code, e2e_encryption_policy_from_code,
12 export_schema, message_class_code, message_class_from_code, message_data_type_code,
13 message_data_type_from_code, reliable_code, reliable_from_code,
14 },
15 packet::Packet,
16 try_enum_from_u32,
17};
18
/// Discovery route time-to-live in milliseconds (30 s).
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// routes not refreshed within this window are dropped — confirm at the use site.
pub const DISCOVERY_ROUTE_TTL_MS: u64 = 30_000;

/// Shortest announce interval in milliseconds.
///
/// `DiscoveryCadenceState` starts at this interval and returns to it on every
/// topology change.
pub const DISCOVERY_FAST_INTERVAL_MS: u64 = 250;

/// Longest announce interval in milliseconds; the cadence back-off in
/// `DiscoveryCadenceState::on_announce_sent` is capped at this value.
pub const DISCOVERY_SLOW_INTERVAL_MS: u64 = 5_000;

/// Link capacity threshold, in bits per second going by the name.
///
/// NOTE(review): the comparison that uses this threshold is outside the
/// visible part of the file — confirm whether it is inclusive.
pub const DISCOVERY_SLOW_LINK_CAPACITY_BPS: u64 = 512;

/// Interval in milliseconds (15 s) associated with pings on slow links.
/// NOTE(review): use site not visible here.
pub const DISCOVERY_SLOW_LINK_PING_INTERVAL_MS: u64 = 15_000;

/// Interval in milliseconds (120 s) associated with full announces on slow links.
/// NOTE(review): use site not visible here.
pub const DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS: u64 = 120_000;

/// Minimum time-sync interval in milliseconds (30 s) for slow links.
/// NOTE(review): use site not visible here.
pub const TIMESYNC_SLOW_LINK_MIN_INTERVAL_MS: u64 = 30_000;
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub struct DiscoveryCadenceState {
29 pub current_interval_ms: u64,
30 pub next_announce_ms: u64,
31}
32
33impl Default for DiscoveryCadenceState {
34 fn default() -> Self {
35 Self {
36 current_interval_ms: DISCOVERY_FAST_INTERVAL_MS,
37 next_announce_ms: 0,
38 }
39 }
40}
41
42impl DiscoveryCadenceState {
43 pub fn on_topology_change(&mut self, now_ms: u64) {
45 self.current_interval_ms = DISCOVERY_FAST_INTERVAL_MS;
46 self.next_announce_ms = now_ms;
47 }
48
49 pub fn on_announce_sent(&mut self, now_ms: u64) {
51 self.next_announce_ms = now_ms.saturating_add(self.current_interval_ms);
52 self.current_interval_ms = core::cmp::min(
53 self.current_interval_ms.saturating_mul(2),
54 DISCOVERY_SLOW_INTERVAL_MS,
55 );
56 }
57
58 pub fn due(&self, now_ms: u64) -> bool {
60 now_ms >= self.next_announce_ms
61 }
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub struct TopologyBoardNode {
66 pub sender_id: String,
67 pub reachable_endpoints: Vec<DataEndpoint>,
68 pub reachable_timesync_sources: Vec<String>,
69 pub connections: Vec<String>,
70}
71
/// A link between two boards, named by their sender ids.
///
/// `topology_links_from_boards` always stores the lexicographically smaller
/// id in `source`, so each undirected link has a single representation.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TopologyLink {
    /// Sender id at one end of the link.
    pub source: String,
    /// Sender id at the other end of the link.
    pub target: String,
}
77
78#[derive(Debug, Clone, PartialEq, Eq)]
79pub struct TopologyAnnouncerRoute {
80 pub sender_id: String,
81 pub reachable_endpoints: Vec<DataEndpoint>,
82 pub reachable_timesync_sources: Vec<String>,
83 pub routers: Vec<TopologyBoardNode>,
84 pub last_seen_ms: u64,
85 pub age_ms: u64,
86}
87
88#[derive(Debug, Clone, PartialEq, Eq)]
89pub struct TopologySideRoute {
90 pub side_id: usize,
91 pub side_name: &'static str,
92 pub reachable_endpoints: Vec<DataEndpoint>,
93 pub reachable_timesync_sources: Vec<String>,
94 pub announcers: Vec<TopologyAnnouncerRoute>,
95 pub last_seen_ms: u64,
96 pub age_ms: u64,
97}
98
99#[derive(Debug, Clone, PartialEq, Eq)]
100pub struct TopologySnapshot {
101 pub advertised_endpoints: Vec<DataEndpoint>,
102 pub advertised_timesync_sources: Vec<String>,
103 pub routers: Vec<TopologyBoardNode>,
104 pub links: Vec<TopologyLink>,
105 pub routes: Vec<TopologySideRoute>,
106 pub current_announce_interval_ms: u64,
107 pub next_announce_ms: u64,
108}
109
110#[derive(Debug, Clone, PartialEq, Eq)]
111pub struct ClientStatsSnapshot {
112 pub sender_id: String,
113 pub connected: bool,
114 pub side_ids: Vec<usize>,
115 pub side_names: Vec<&'static str>,
116 pub last_seen_ms: Option<u64>,
117 pub age_ms: Option<u64>,
118 pub reachable_endpoints: Vec<DataEndpoint>,
119 pub reachable_timesync_sources: Vec<String>,
120 pub packets_sent: u64,
121 pub packets_received: u64,
122 pub bytes_sent: u64,
123 pub bytes_received: u64,
124}
125
// Link capability bits. Each constant is a distinct single-bit mask,
// presumably OR-ed into `LinkCapabilities::flags` — confirm at the use site.

/// Capability bit 0: header templates.
pub const LINK_CAPABILITY_HEADER_TEMPLATES: u32 = 0x0000_0001;
/// Capability bit 1: chunking.
pub const LINK_CAPABILITY_CHUNKING: u32 = 0x0000_0002;
/// Capability bit 2: reliability.
pub const LINK_CAPABILITY_RELIABILITY: u32 = 0x0000_0004;
/// Capability bit 3: crypto.
pub const LINK_CAPABILITY_CRYPTO: u32 = 0x0000_0008;
/// Capability bit 4: end-to-end reliability.
pub const LINK_CAPABILITY_END_TO_END_RELIABILITY: u32 = 0x0000_0010;
/// Capability bit 5: omitting unchanged timestamps.
pub const LINK_CAPABILITY_OMIT_UNCHANGED_TIMESTAMPS: u32 = 0x0000_0020;

// Link profile codes, presumably the values of `LinkCapabilities::profile`.

/// Profile code 0: canonical.
pub const LINK_PROFILE_CANONICAL: u8 = 0;
/// Profile code 1: template.
pub const LINK_PROFILE_TEMPLATE: u8 = 1;
/// Profile code 2: IPv6-like.
pub const LINK_PROFILE_IPV6_LIKE: u8 = 2;
/// Profile code 3: IPv4-like.
pub const LINK_PROFILE_IPV4_LIKE: u8 = 3;

// Address mode / state codes, presumably the values of
// `AddressAdvertisement::mode` and `AddressAdvertisement::state`.

/// Address mode 0: dynamic.
pub const ADDRESS_MODE_DYNAMIC: u8 = 0;
/// Address mode 1: requested.
pub const ADDRESS_MODE_REQUESTED: u8 = 1;
/// Address mode 2: static.
pub const ADDRESS_MODE_STATIC: u8 = 2;
/// Address state 0: request.
pub const ADDRESS_STATE_REQUEST: u8 = 0;
/// Address state 1: approved.
pub const ADDRESS_STATE_APPROVED: u8 = 1;
143
/// Capabilities advertised for a link.
///
/// On the wire (`build_discovery_link_capabilities`) this is an 18-byte record:
/// `version`, `flags` (LE u32), `profile`, then the three LE u32 limits in
/// declaration order.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct LinkCapabilities {
    /// Version byte of the capability record.
    pub version: u8,
    /// Capability bit set — presumably built from the `LINK_CAPABILITY_*` masks.
    pub flags: u32,
    /// Profile code — presumably one of the `LINK_PROFILE_*` values.
    pub profile: u8,
    /// Maximum frame size in bytes.
    pub max_frame_bytes: u32,
    /// Target size in bytes for compact headers.
    pub compact_header_target_bytes: u32,
    /// Maximum number of side transport templates.
    pub max_side_transport_templates: u32,
}
153
154#[derive(Debug, Clone, PartialEq, Eq)]
155pub struct AddressAdvertisement {
156 pub hostname: String,
157 pub address: u32,
158 pub requested_address: u32,
159 pub mode: u8,
160 pub state: u8,
161 pub birth_ms: u64,
162 pub owner_hash: u64,
163 pub reachable_endpoints: Vec<DataEndpoint>,
164 pub reachable_timesync_sources: Vec<String>,
165 pub link_capabilities: LinkCapabilities,
166}
167
168#[inline]
169pub const fn is_router_control_endpoint(ep: DataEndpoint) -> bool {
170 matches!(
171 ep,
172 DataEndpoint::TelemetryError | DataEndpoint::TimeSync | DataEndpoint::Discovery
173 )
174}
175
176#[inline]
178pub const fn is_discovery_endpoint(ep: DataEndpoint) -> bool {
179 matches!(ep, DataEndpoint::Discovery)
180}
181
182#[inline]
184pub const fn is_discovery_type(ty: DataType) -> bool {
185 matches!(
186 ty,
187 DataType::DiscoveryAnnounce
188 | DataType::DiscoveryTimeSyncSources
189 | DataType::DiscoveryTopology
190 | DataType::DiscoverySchema
191 | DataType::DiscoveryTopologyRequest
192 | DataType::DiscoverySchemaRequest
193 | DataType::ManagedVariableRequest
194 | DataType::ManagedVariableValue
195 | DataType::DiscoveryLeave
196 | DataType::DiscoveryLinkCapabilities
197 | DataType::DiscoveryAddress
198 )
199}
200
201#[inline]
202pub const fn is_discovery_request_type(ty: DataType) -> bool {
203 matches!(
204 ty,
205 DataType::DiscoveryTopologyRequest
206 | DataType::DiscoverySchemaRequest
207 | DataType::ManagedVariableRequest
208 | DataType::DiscoveryLeave
209 | DataType::DiscoveryAddress
210 )
211}
212
/// Sorts `items` in ascending order and removes duplicate entries in place.
fn sort_dedup_strings(items: &mut Vec<String>) {
    // Sorting first makes equal strings adjacent, which is all `dedup` needs.
    items.sort_unstable();
    items.dedup();
}
217
218pub fn normalize_topology_boards(boards: &mut Vec<TopologyBoardNode>) {
220 for board in boards.iter_mut() {
221 board
222 .reachable_endpoints
223 .retain(|ep| !is_router_control_endpoint(*ep));
224 board.reachable_endpoints.sort_unstable();
225 board.reachable_endpoints.dedup();
226 sort_dedup_strings(&mut board.reachable_timesync_sources);
227 board.connections.retain(|peer| peer != &board.sender_id);
228 sort_dedup_strings(&mut board.connections);
229 }
230 boards.sort_unstable_by(|a, b| a.sender_id.cmp(&b.sender_id));
231 boards.dedup_by(|a, b| a.sender_id == b.sender_id);
232}
233
234pub fn topology_links_from_boards(boards: &[TopologyBoardNode]) -> Vec<TopologyLink> {
235 let mut links = Vec::new();
236 for board in boards {
237 for peer in &board.connections {
238 if peer == &board.sender_id {
239 continue;
240 }
241 let (source, target) = if board.sender_id <= *peer {
242 (board.sender_id.clone(), peer.clone())
243 } else {
244 (peer.clone(), board.sender_id.clone())
245 };
246 links.push(TopologyLink { source, target });
247 }
248 }
249 links.sort_unstable_by(|a, b| (&a.source, &a.target).cmp(&(&b.source, &b.target)));
250 links.dedup_by(|a, b| a.source == b.source && a.target == b.target);
251 links
252}
253
254pub fn merge_topology_boards(dst: &mut Vec<TopologyBoardNode>, src: &[TopologyBoardNode]) {
256 let mut merged: BTreeMap<String, TopologyBoardNode> = dst
257 .iter()
258 .cloned()
259 .map(|board| (board.sender_id.clone(), board))
260 .collect();
261 for board in src {
262 let entry = merged
263 .entry(board.sender_id.clone())
264 .or_insert_with(|| TopologyBoardNode {
265 sender_id: board.sender_id.clone(),
266 reachable_endpoints: Vec::new(),
267 reachable_timesync_sources: Vec::new(),
268 connections: Vec::new(),
269 });
270 entry
271 .reachable_endpoints
272 .extend(board.reachable_endpoints.iter().copied());
273 entry
274 .reachable_timesync_sources
275 .extend(board.reachable_timesync_sources.iter().cloned());
276 entry.connections.extend(board.connections.iter().cloned());
277 }
278 let mut out: Vec<TopologyBoardNode> = merged.into_values().collect();
279 normalize_topology_boards(&mut out);
280 *dst = out;
281}
282
283pub fn summarize_topology_boards(boards: &[TopologyBoardNode]) -> (Vec<DataEndpoint>, Vec<String>) {
285 let mut reachable_endpoints = Vec::new();
286 let mut reachable_timesync_sources = Vec::new();
287 for board in boards {
288 reachable_endpoints.extend(board.reachable_endpoints.iter().copied());
289 reachable_timesync_sources.extend(board.reachable_timesync_sources.iter().cloned());
290 }
291 reachable_endpoints.sort_unstable();
292 reachable_endpoints.dedup();
293 reachable_endpoints.retain(|ep| !is_router_control_endpoint(*ep));
294 sort_dedup_strings(&mut reachable_timesync_sources);
295 (reachable_endpoints, reachable_timesync_sources)
296}
297
298pub fn build_discovery_announce(
300 sender: &str,
301 timestamp_ms: u64,
302 endpoints: &[DataEndpoint],
303) -> TelemetryResult<Packet> {
304 let payload_words: Vec<u32> = endpoints.iter().copied().map(|ep| ep.as_u32()).collect();
305 Packet::new(
306 DataType::DiscoveryAnnounce,
307 &[DataEndpoint::Discovery],
308 sender,
309 timestamp_ms,
310 encode_slice_le(payload_words.as_slice()),
311 )
312}
313
314pub fn decode_discovery_announce(pkt: &Packet) -> TelemetryResult<Vec<DataEndpoint>> {
316 if pkt.data_type() != DataType::DiscoveryAnnounce {
317 return Err(TelemetryError::InvalidType);
318 }
319 decode_discovery_payload(pkt.payload())
320}
321
322pub fn decode_discovery_payload(payload: &[u8]) -> TelemetryResult<Vec<DataEndpoint>> {
324 if !payload.len().is_multiple_of(4) {
325 return Err(TelemetryError::Unpack("discovery payload width"));
326 }
327
328 let mut endpoints = Vec::with_capacity(payload.len() / 4);
329 for chunk in payload.chunks_exact(4) {
330 let raw = u32::from_le_bytes(chunk.try_into().expect("4-byte chunk"));
331 let ep = try_enum_from_u32(raw).ok_or(TelemetryError::Unpack("bad discovery endpoint"))?;
332 if is_discovery_endpoint(ep) {
333 continue;
334 }
335 endpoints.push(ep);
336 }
337 endpoints.sort_unstable();
338 endpoints.dedup();
339 Ok(endpoints)
340}
341
342pub fn build_discovery_timesync_sources<S: AsRef<str>>(
344 sender: &str,
345 timestamp_ms: u64,
346 sources: &[S],
347) -> TelemetryResult<Packet> {
348 let mut payload = Vec::new();
349 let mut deduped: Vec<&str> = sources.iter().map(|s| s.as_ref()).collect();
350 deduped.sort_unstable();
351 deduped.dedup();
352
353 payload.extend_from_slice(&(deduped.len() as u32).to_le_bytes());
354 for source in deduped {
355 let bytes = source.as_bytes();
356 let len = u32::try_from(bytes.len())
357 .map_err(|_| TelemetryError::Pack("discovery source id too long"))?;
358 payload.extend_from_slice(&len.to_le_bytes());
359 payload.extend_from_slice(bytes);
360 }
361
362 Packet::new(
363 DataType::DiscoveryTimeSyncSources,
364 &[DataEndpoint::Discovery],
365 sender,
366 timestamp_ms,
367 payload.into(),
368 )
369}
370
371pub fn build_discovery_topology_request(
372 sender: &str,
373 timestamp_ms: u64,
374) -> TelemetryResult<Packet> {
375 Packet::new(
376 DataType::DiscoveryTopologyRequest,
377 &[DataEndpoint::Discovery],
378 sender,
379 timestamp_ms,
380 Vec::<u8>::new().into(),
381 )
382}
383
384pub fn build_discovery_schema_request(sender: &str, timestamp_ms: u64) -> TelemetryResult<Packet> {
385 Packet::new(
386 DataType::DiscoverySchemaRequest,
387 &[DataEndpoint::Discovery],
388 sender,
389 timestamp_ms,
390 Vec::<u8>::new().into(),
391 )
392}
393
394pub fn build_discovery_leave(sender: &str, timestamp_ms: u64) -> TelemetryResult<Packet> {
395 Packet::new(
396 DataType::DiscoveryLeave,
397 &[DataEndpoint::Discovery],
398 sender,
399 timestamp_ms,
400 Vec::<u8>::new().into(),
401 )
402}
403
404pub fn build_discovery_link_capabilities(
405 sender: &str,
406 timestamp_ms: u64,
407 capabilities: LinkCapabilities,
408) -> TelemetryResult<Packet> {
409 let mut payload = Vec::with_capacity(18);
410 payload.push(capabilities.version);
411 payload.extend_from_slice(&capabilities.flags.to_le_bytes());
412 payload.push(capabilities.profile);
413 payload.extend_from_slice(&capabilities.max_frame_bytes.to_le_bytes());
414 payload.extend_from_slice(&capabilities.compact_header_target_bytes.to_le_bytes());
415 payload.extend_from_slice(&capabilities.max_side_transport_templates.to_le_bytes());
416 Packet::new(
417 DataType::DiscoveryLinkCapabilities,
418 &[DataEndpoint::Discovery],
419 sender,
420 timestamp_ms,
421 payload.into(),
422 )
423}
424
425pub fn decode_discovery_link_capabilities(pkt: &Packet) -> TelemetryResult<LinkCapabilities> {
426 if pkt.data_type() != DataType::DiscoveryLinkCapabilities {
427 return Err(TelemetryError::InvalidType);
428 }
429 let payload = pkt.payload();
430 if payload.len() != 18 {
431 return Err(TelemetryError::Unpack("discovery link capabilities width"));
432 }
433 Ok(LinkCapabilities {
434 version: payload[0],
435 flags: u32::from_le_bytes(payload[1..5].try_into().expect("4-byte flags")),
436 profile: payload[5],
437 max_frame_bytes: u32::from_le_bytes(payload[6..10].try_into().expect("4-byte max frame")),
438 compact_header_target_bytes: u32::from_le_bytes(
439 payload[10..14].try_into().expect("4-byte target"),
440 ),
441 max_side_transport_templates: u32::from_le_bytes(
442 payload[14..18].try_into().expect("4-byte templates"),
443 ),
444 })
445}
446
447pub fn build_discovery_address(
448 sender: &str,
449 timestamp_ms: u64,
450 ad: &AddressAdvertisement,
451) -> TelemetryResult<Packet> {
452 let mut payload = Vec::new();
453 payload.push(1);
454 payload.push(ad.mode);
455 payload.push(ad.state);
456 payload.extend_from_slice(&ad.address.to_le_bytes());
457 payload.extend_from_slice(&ad.requested_address.to_le_bytes());
458 payload.extend_from_slice(&ad.birth_ms.to_le_bytes());
459 payload.extend_from_slice(&ad.owner_hash.to_le_bytes());
460 encode_string(&mut payload, &ad.hostname)?;
461 let mut endpoints = ad.reachable_endpoints.clone();
462 endpoints.retain(|ep| !is_discovery_endpoint(*ep));
463 endpoints.sort_unstable();
464 endpoints.dedup();
465 let endpoint_count = u32::try_from(endpoints.len())
466 .map_err(|_| TelemetryError::Pack("discovery address endpoint count"))?;
467 payload.extend_from_slice(&endpoint_count.to_le_bytes());
468 for ep in endpoints {
469 payload.extend_from_slice(&ep.as_u32().to_le_bytes());
470 }
471 let mut sources = ad.reachable_timesync_sources.clone();
472 sort_dedup_strings(&mut sources);
473 let source_count = u32::try_from(sources.len())
474 .map_err(|_| TelemetryError::Pack("discovery address source count"))?;
475 payload.extend_from_slice(&source_count.to_le_bytes());
476 for source in sources {
477 encode_string(&mut payload, &source)?;
478 }
479 payload.push(ad.link_capabilities.version);
480 payload.extend_from_slice(&ad.link_capabilities.flags.to_le_bytes());
481 payload.push(ad.link_capabilities.profile);
482 payload.extend_from_slice(&ad.link_capabilities.max_frame_bytes.to_le_bytes());
483 payload.extend_from_slice(
484 &ad.link_capabilities
485 .compact_header_target_bytes
486 .to_le_bytes(),
487 );
488 payload.extend_from_slice(
489 &ad.link_capabilities
490 .max_side_transport_templates
491 .to_le_bytes(),
492 );
493 Packet::new(
494 DataType::DiscoveryAddress,
495 &[DataEndpoint::Discovery],
496 sender,
497 timestamp_ms,
498 payload.into(),
499 )
500}
501
502pub fn decode_discovery_address(pkt: &Packet) -> TelemetryResult<AddressAdvertisement> {
503 if pkt.data_type() != DataType::DiscoveryAddress {
504 return Err(TelemetryError::InvalidType);
505 }
506 let payload = pkt.payload();
507 let mut cursor = 0usize;
508 let version = read_u8(payload, &mut cursor, "discovery address version")?;
509 if version != 1 {
510 return Err(TelemetryError::Unpack("discovery address version"));
511 }
512 let mode = read_u8(payload, &mut cursor, "discovery address mode")?;
513 let state = read_u8(payload, &mut cursor, "discovery address state")?;
514 let address = read_u32(payload, &mut cursor, "discovery address current")?;
515 let requested_address = read_u32(payload, &mut cursor, "discovery address requested")?;
516 if payload.len().saturating_sub(cursor) < 8 {
517 return Err(TelemetryError::Unpack("discovery address birth"));
518 }
519 let birth_ms = u64::from_le_bytes(
520 payload[cursor..cursor + 8]
521 .try_into()
522 .expect("8-byte birth"),
523 );
524 cursor += 8;
525 if payload.len().saturating_sub(cursor) < 8 {
526 return Err(TelemetryError::Unpack("discovery address owner"));
527 }
528 let owner_hash = u64::from_le_bytes(
529 payload[cursor..cursor + 8]
530 .try_into()
531 .expect("8-byte owner"),
532 );
533 cursor += 8;
534 let hostname = decode_string(payload, &mut cursor, "discovery address hostname")?;
535 let endpoint_count =
536 read_u32(payload, &mut cursor, "discovery address endpoint count")? as usize;
537 let mut reachable_endpoints = Vec::with_capacity(endpoint_count);
538 for _ in 0..endpoint_count {
539 let raw = read_u32(payload, &mut cursor, "discovery address endpoint")?;
540 let ep = try_enum_from_u32(raw).ok_or(TelemetryError::Unpack("bad discovery endpoint"))?;
541 if !is_discovery_endpoint(ep) {
542 reachable_endpoints.push(ep);
543 }
544 }
545 reachable_endpoints.sort_unstable();
546 reachable_endpoints.dedup();
547 let source_count = read_u32(payload, &mut cursor, "discovery address source count")? as usize;
548 let mut reachable_timesync_sources = Vec::with_capacity(source_count);
549 for _ in 0..source_count {
550 let source = decode_string(payload, &mut cursor, "discovery address source")?;
551 if !source.is_empty() {
552 reachable_timesync_sources.push(source);
553 }
554 }
555 sort_dedup_strings(&mut reachable_timesync_sources);
556 let version = read_u8(payload, &mut cursor, "discovery address link version")?;
557 let flags = read_u32(payload, &mut cursor, "discovery address link flags")?;
558 let profile = read_u8(payload, &mut cursor, "discovery address link profile")?;
559 let max_frame_bytes = read_u32(payload, &mut cursor, "discovery address link max frame")?;
560 let compact_header_target_bytes = read_u32(
561 payload,
562 &mut cursor,
563 "discovery address link compact target",
564 )?;
565 let max_side_transport_templates =
566 read_u32(payload, &mut cursor, "discovery address link templates")?;
567 if cursor != payload.len() {
568 return Err(TelemetryError::Unpack("discovery address trailing bytes"));
569 }
570 if hostname.is_empty() || address == 0 {
571 return Err(TelemetryError::Unpack("bad discovery address"));
572 }
573 Ok(AddressAdvertisement {
574 hostname,
575 address,
576 requested_address,
577 mode,
578 state,
579 birth_ms,
580 owner_hash,
581 reachable_endpoints,
582 reachable_timesync_sources,
583 link_capabilities: LinkCapabilities {
584 version,
585 flags,
586 profile,
587 max_frame_bytes,
588 compact_header_target_bytes,
589 max_side_transport_templates,
590 },
591 })
592}
593
594pub fn build_managed_variable_request(
595 sender: &str,
596 timestamp_ms: u64,
597 ty: DataType,
598) -> TelemetryResult<Packet> {
599 Packet::new(
600 DataType::ManagedVariableRequest,
601 &[DataEndpoint::Discovery],
602 sender,
603 timestamp_ms,
604 encode_slice_le(&[ty.as_u32()]),
605 )
606}
607
608pub fn decode_managed_variable_request(pkt: &Packet) -> TelemetryResult<DataType> {
609 if pkt.data_type() != DataType::ManagedVariableRequest {
610 return Err(TelemetryError::InvalidType);
611 }
612 let payload = pkt.payload();
613 if payload.len() != 4 {
614 return Err(TelemetryError::Unpack("managed variable request width"));
615 }
616 let raw = u32::from_le_bytes(payload.try_into().expect("4-byte payload"));
617 try_enum_from_u32(raw).ok_or(TelemetryError::Unpack("bad managed variable data type"))
618}
619
620pub fn decode_discovery_timesync_sources(pkt: &Packet) -> TelemetryResult<Vec<String>> {
622 if pkt.data_type() != DataType::DiscoveryTimeSyncSources {
623 return Err(TelemetryError::InvalidType);
624 }
625 decode_discovery_timesync_sources_payload(pkt.payload())
626}
627
628pub fn decode_discovery_timesync_sources_payload(payload: &[u8]) -> TelemetryResult<Vec<String>> {
630 if payload.len() < 4 {
631 return Err(TelemetryError::Unpack("discovery timesync source count"));
632 }
633
634 let count = u32::from_le_bytes(payload[..4].try_into().expect("4-byte count")) as usize;
635 let mut cursor = 4usize;
636 let mut out = Vec::with_capacity(count);
637
638 for _ in 0..count {
639 if payload.len().saturating_sub(cursor) < 4 {
640 return Err(TelemetryError::Unpack("discovery timesync source len"));
641 }
642 let len = u32::from_le_bytes(payload[cursor..cursor + 4].try_into().expect("4-byte len"))
643 as usize;
644 cursor += 4;
645 if payload.len().saturating_sub(cursor) < len {
646 return Err(TelemetryError::Unpack("discovery timesync source bytes"));
647 }
648 let raw = &payload[cursor..cursor + len];
649 cursor += len;
650 let source = core::str::from_utf8(raw)
651 .map_err(|_| TelemetryError::Unpack("discovery timesync source utf8"))?;
652 if !source.is_empty() {
653 out.push(source.to_string());
654 }
655 }
656
657 if cursor != payload.len() {
658 return Err(TelemetryError::Unpack("discovery timesync trailing bytes"));
659 }
660
661 out.sort_unstable();
662 out.dedup();
663 Ok(out)
664}
665
666pub fn build_discovery_topology(
668 sender: &str,
669 timestamp_ms: u64,
670 boards: &[TopologyBoardNode],
671) -> TelemetryResult<Packet> {
672 let mut payload = Vec::new();
673 let mut normalized = boards.to_vec();
674 normalize_topology_boards(&mut normalized);
675
676 payload.extend_from_slice(&(normalized.len() as u32).to_le_bytes());
677 for board in normalized {
678 let sender_bytes = board.sender_id.as_bytes();
679 let sender_len = u32::try_from(sender_bytes.len())
680 .map_err(|_| TelemetryError::Pack("discovery topology sender id too long"))?;
681 payload.extend_from_slice(&sender_len.to_le_bytes());
682 payload.extend_from_slice(sender_bytes);
683
684 payload.extend_from_slice(&(board.reachable_endpoints.len() as u32).to_le_bytes());
685 for ep in board.reachable_endpoints {
686 payload.extend_from_slice(&ep.as_u32().to_le_bytes());
687 }
688
689 payload.extend_from_slice(&(board.reachable_timesync_sources.len() as u32).to_le_bytes());
690 for source in board.reachable_timesync_sources {
691 let bytes = source.as_bytes();
692 let len = u32::try_from(bytes.len())
693 .map_err(|_| TelemetryError::Pack("discovery topology source id too long"))?;
694 payload.extend_from_slice(&len.to_le_bytes());
695 payload.extend_from_slice(bytes);
696 }
697
698 payload.extend_from_slice(&(board.connections.len() as u32).to_le_bytes());
699 for peer in board.connections {
700 let bytes = peer.as_bytes();
701 let len = u32::try_from(bytes.len())
702 .map_err(|_| TelemetryError::Pack("discovery topology connection id too long"))?;
703 payload.extend_from_slice(&len.to_le_bytes());
704 payload.extend_from_slice(bytes);
705 }
706 }
707
708 Packet::new(
709 DataType::DiscoveryTopology,
710 &[DataEndpoint::Discovery],
711 sender,
712 timestamp_ms,
713 payload.into(),
714 )
715}
716
717fn decode_string(
718 payload: &[u8],
719 cursor: &mut usize,
720 label: &'static str,
721) -> TelemetryResult<String> {
722 if payload.len().saturating_sub(*cursor) < 4 {
723 return Err(TelemetryError::Unpack(label));
724 }
725 let len = u32::from_le_bytes(
726 payload[*cursor..*cursor + 4]
727 .try_into()
728 .expect("4-byte len"),
729 ) as usize;
730 *cursor += 4;
731 if payload.len().saturating_sub(*cursor) < len {
732 return Err(TelemetryError::Unpack(label));
733 }
734 let raw = &payload[*cursor..*cursor + len];
735 *cursor += len;
736 core::str::from_utf8(raw)
737 .map(|s| s.to_string())
738 .map_err(|_| TelemetryError::Unpack(label))
739}
740
741pub fn decode_discovery_topology(pkt: &Packet) -> TelemetryResult<Vec<TopologyBoardNode>> {
743 if pkt.data_type() != DataType::DiscoveryTopology {
744 return Err(TelemetryError::InvalidType);
745 }
746 decode_discovery_topology_payload(pkt.payload())
747}
748
749pub fn decode_discovery_topology_payload(
751 payload: &[u8],
752) -> TelemetryResult<Vec<TopologyBoardNode>> {
753 if payload.len() < 4 {
754 return Err(TelemetryError::Unpack("discovery topology board count"));
755 }
756
757 let count = u32::from_le_bytes(payload[..4].try_into().expect("4-byte count")) as usize;
758 let mut cursor = 4usize;
759 let mut boards = Vec::with_capacity(count);
760
761 for _ in 0..count {
762 let sender_id = decode_string(payload, &mut cursor, "discovery topology sender id")?;
763
764 if payload.len().saturating_sub(cursor) < 4 {
765 return Err(TelemetryError::Unpack("discovery topology endpoint count"));
766 }
767 let endpoint_count = u32::from_le_bytes(
768 payload[cursor..cursor + 4]
769 .try_into()
770 .expect("4-byte count"),
771 ) as usize;
772 cursor += 4;
773 let mut reachable_endpoints = Vec::with_capacity(endpoint_count);
774 for _ in 0..endpoint_count {
775 if payload.len().saturating_sub(cursor) < 4 {
776 return Err(TelemetryError::Unpack("discovery topology endpoint"));
777 }
778 let raw =
779 u32::from_le_bytes(payload[cursor..cursor + 4].try_into().expect("4-byte ep"));
780 cursor += 4;
781 let ep =
782 try_enum_from_u32(raw).ok_or(TelemetryError::Unpack("bad discovery endpoint"))?;
783 if !is_discovery_endpoint(ep) {
784 reachable_endpoints.push(ep);
785 }
786 }
787
788 if payload.len().saturating_sub(cursor) < 4 {
789 return Err(TelemetryError::Unpack(
790 "discovery topology timesync source count",
791 ));
792 }
793 let source_count = u32::from_le_bytes(
794 payload[cursor..cursor + 4]
795 .try_into()
796 .expect("4-byte count"),
797 ) as usize;
798 cursor += 4;
799 let mut reachable_timesync_sources = Vec::with_capacity(source_count);
800 for _ in 0..source_count {
801 let source = decode_string(payload, &mut cursor, "discovery topology timesync source")?;
802 if !source.is_empty() {
803 reachable_timesync_sources.push(source);
804 }
805 }
806
807 if payload.len().saturating_sub(cursor) < 4 {
808 return Err(TelemetryError::Unpack(
809 "discovery topology connection count",
810 ));
811 }
812 let connection_count = u32::from_le_bytes(
813 payload[cursor..cursor + 4]
814 .try_into()
815 .expect("4-byte count"),
816 ) as usize;
817 cursor += 4;
818 let mut connections = Vec::with_capacity(connection_count);
819 for _ in 0..connection_count {
820 let peer = decode_string(payload, &mut cursor, "discovery topology connection")?;
821 if !peer.is_empty() {
822 connections.push(peer);
823 }
824 }
825
826 boards.push(TopologyBoardNode {
827 sender_id,
828 reachable_endpoints,
829 reachable_timesync_sources,
830 connections,
831 });
832 }
833
834 if cursor != payload.len() {
835 return Err(TelemetryError::Unpack("discovery topology trailing bytes"));
836 }
837
838 normalize_topology_boards(&mut boards);
839 Ok(boards)
840}
841
842fn encode_string(payload: &mut Vec<u8>, value: &str) -> TelemetryResult<()> {
843 let len = u32::try_from(value.len())
844 .map_err(|_| TelemetryError::Pack("discovery schema string too long"))?;
845 payload.extend_from_slice(&len.to_le_bytes());
846 payload.extend_from_slice(value.as_bytes());
847 Ok(())
848}
849
850fn read_u8(payload: &[u8], cursor: &mut usize, label: &'static str) -> TelemetryResult<u8> {
851 if payload.len().saturating_sub(*cursor) < 1 {
852 return Err(TelemetryError::Unpack(label));
853 }
854 let out = payload[*cursor];
855 *cursor += 1;
856 Ok(out)
857}
858
859fn read_u32(payload: &[u8], cursor: &mut usize, label: &'static str) -> TelemetryResult<u32> {
860 if payload.len().saturating_sub(*cursor) < 4 {
861 return Err(TelemetryError::Unpack(label));
862 }
863 let out = u32::from_le_bytes(
864 payload[*cursor..*cursor + 4]
865 .try_into()
866 .expect("4-byte u32"),
867 );
868 *cursor += 4;
869 Ok(out)
870}
871
872pub fn build_discovery_schema(sender: &str, timestamp_ms: u64) -> TelemetryResult<Packet> {
874 build_discovery_schema_from_snapshot(sender, timestamp_ms, export_schema())
875}
876
877pub fn elect_discovery_master(local_sender: &str, boards: &[TopologyBoardNode]) -> String {
885 let mut nodes: BTreeMap<String, Vec<String>> = BTreeMap::new();
886 for board in boards {
887 nodes.entry(board.sender_id.clone()).or_default();
888 for peer in board.connections.iter() {
889 nodes
890 .entry(peer.clone())
891 .or_default()
892 .push(board.sender_id.clone());
893 nodes
894 .entry(board.sender_id.clone())
895 .or_default()
896 .push(peer.clone());
897 }
898 }
899 nodes.entry(local_sender.to_string()).or_default();
900 for peers in nodes.values_mut() {
901 peers.sort_unstable();
902 peers.dedup();
903 }
904
905 let mut best_sender = local_sender.to_string();
906 let mut best_unreachable = usize::MAX;
907 let mut best_max_hops = usize::MAX;
908 let mut best_total_hops = usize::MAX;
909
910 for sender in nodes.keys() {
911 let mut frontier = vec![sender.clone()];
912 let mut seen: BTreeMap<String, usize> = BTreeMap::new();
913 seen.insert(sender.clone(), 0);
914 let mut idx = 0;
915 while idx < frontier.len() {
916 let cur = frontier[idx].clone();
917 idx += 1;
918 let cur_dist = seen[&cur];
919 if let Some(peers) = nodes.get(&cur) {
920 for peer in peers {
921 if !seen.contains_key(peer) {
922 seen.insert(peer.clone(), cur_dist + 1);
923 frontier.push(peer.clone());
924 }
925 }
926 }
927 }
928
929 let unreachable = nodes.len().saturating_sub(seen.len());
930 let max_hops = seen.values().copied().max().unwrap_or(0);
931 let total_hops = seen.values().copied().sum();
932 let better = unreachable < best_unreachable
933 || (unreachable == best_unreachable && max_hops < best_max_hops)
934 || (unreachable == best_unreachable
935 && max_hops == best_max_hops
936 && total_hops < best_total_hops)
937 || (unreachable == best_unreachable
938 && max_hops == best_max_hops
939 && total_hops == best_total_hops
940 && sender < &best_sender);
941 if better {
942 best_sender = sender.clone();
943 best_unreachable = unreachable;
944 best_max_hops = max_hops;
945 best_total_hops = total_hops;
946 }
947 }
948
949 best_sender
950}
951
952pub fn build_discovery_schema_from_snapshot(
954 sender: &str,
955 timestamp_ms: u64,
956 mut schema: RuntimeSchemaSnapshot,
957) -> TelemetryResult<Packet> {
958 let mut payload = Vec::new();
959 payload.extend_from_slice(&3u32.to_le_bytes());
960
961 schema.endpoints.sort_unstable_by_key(|def| def.id.as_u32());
962 schema.types.sort_unstable_by_key(|def| def.id.as_u32());
963
964 payload.extend_from_slice(&(schema.endpoints.len() as u32).to_le_bytes());
965 for ep in schema.endpoints {
966 payload.extend_from_slice(&ep.id.as_u32().to_le_bytes());
967 payload.push(ep.link_local_only as u8);
968 encode_string(&mut payload, ep.name)?;
969 encode_string(&mut payload, ep.description)?;
970 }
971
972 payload.extend_from_slice(&(schema.types.len() as u32).to_le_bytes());
973 for ty in schema.types {
974 payload.extend_from_slice(&ty.id.as_u32().to_le_bytes());
975 encode_string(&mut payload, ty.name)?;
976 encode_string(&mut payload, ty.description)?;
977 match ty.element {
978 MessageElement::Static(count, data_type, class) => {
979 payload.push(0);
980 payload.extend_from_slice(&(count as u32).to_le_bytes());
981 payload.push(message_data_type_code(data_type));
982 payload.push(message_class_code(class));
983 }
984 MessageElement::Dynamic(data_type, class) => {
985 payload.push(1);
986 payload.extend_from_slice(&0u32.to_le_bytes());
987 payload.push(message_data_type_code(data_type));
988 payload.push(message_class_code(class));
989 }
990 }
991 payload.push(reliable_code(ty.reliable));
992 payload.push(ty.priority);
993 payload.push(e2e_encryption_policy_code(ty.e2e_encryption));
994 payload.extend_from_slice(&(ty.endpoints.len() as u32).to_le_bytes());
995 for ep in ty.endpoints {
996 payload.extend_from_slice(&ep.as_u32().to_le_bytes());
997 }
998 }
999
1000 Packet::new(
1001 DataType::DiscoverySchema,
1002 &[DataEndpoint::Discovery],
1003 sender,
1004 timestamp_ms,
1005 payload.into(),
1006 )
1007}
1008
1009pub fn decode_discovery_schema(pkt: &Packet) -> TelemetryResult<OwnedRuntimeSchemaSnapshot> {
1011 if pkt.data_type() != DataType::DiscoverySchema {
1012 return Err(TelemetryError::InvalidType);
1013 }
1014 decode_discovery_schema_payload(pkt.payload())
1015}
1016
1017pub fn decode_discovery_schema_payload(
1019 payload: &[u8],
1020) -> TelemetryResult<OwnedRuntimeSchemaSnapshot> {
1021 let mut cursor = 0usize;
1022 let version = read_u32(payload, &mut cursor, "discovery schema version")?;
1023 if version != 1 && version != 2 && version != 3 {
1024 return Err(TelemetryError::Unpack("discovery schema version"));
1025 }
1026
1027 let endpoint_count =
1028 read_u32(payload, &mut cursor, "discovery schema endpoint count")? as usize;
1029 let mut endpoints = Vec::with_capacity(endpoint_count);
1030 for _ in 0..endpoint_count {
1031 let id = DataEndpoint(read_u32(
1032 payload,
1033 &mut cursor,
1034 "discovery schema endpoint id",
1035 )?);
1036 let link_local_only =
1037 read_u8(payload, &mut cursor, "discovery schema endpoint flags")? != 0;
1038 let name = decode_string(payload, &mut cursor, "discovery schema endpoint name")?;
1039 let description = if version >= 2 {
1040 decode_string(
1041 payload,
1042 &mut cursor,
1043 "discovery schema endpoint description",
1044 )?
1045 } else {
1046 String::new()
1047 };
1048 endpoints.push(OwnedEndpointDefinition {
1049 id,
1050 name,
1051 description,
1052 link_local_only,
1053 });
1054 }
1055
1056 let type_count = read_u32(payload, &mut cursor, "discovery schema type count")? as usize;
1057 let mut types = Vec::with_capacity(type_count);
1058 for _ in 0..type_count {
1059 let id = DataType(read_u32(payload, &mut cursor, "discovery schema type id")?);
1060 let name = decode_string(payload, &mut cursor, "discovery schema type name")?;
1061 let description = if version >= 2 {
1062 decode_string(payload, &mut cursor, "discovery schema type description")?
1063 } else {
1064 String::new()
1065 };
1066 let element_kind = read_u8(payload, &mut cursor, "discovery schema element kind")?;
1067 let count = read_u32(payload, &mut cursor, "discovery schema element count")? as usize;
1068 let data_type = message_data_type_from_code(read_u8(
1069 payload,
1070 &mut cursor,
1071 "discovery schema data type",
1072 )?)
1073 .ok_or(TelemetryError::Unpack("discovery schema data type"))?;
1074 let class =
1075 message_class_from_code(read_u8(payload, &mut cursor, "discovery schema class")?)
1076 .ok_or(TelemetryError::Unpack("discovery schema class"))?;
1077 let element = match element_kind {
1078 0 => MessageElement::Static(count, data_type, class),
1079 1 => MessageElement::Dynamic(data_type, class),
1080 _ => return Err(TelemetryError::Unpack("discovery schema element kind")),
1081 };
1082 let reliable =
1083 reliable_from_code(read_u8(payload, &mut cursor, "discovery schema reliable")?)
1084 .ok_or(TelemetryError::Unpack("discovery schema reliable"))?;
1085 let priority = read_u8(payload, &mut cursor, "discovery schema priority")?;
1086 let e2e_encryption = if version >= 3 {
1087 e2e_encryption_policy_from_code(read_u8(
1088 payload,
1089 &mut cursor,
1090 "discovery schema e2e cryptography",
1091 )?)
1092 .ok_or(TelemetryError::Unpack("discovery schema e2e cryptography"))?
1093 } else {
1094 E2eEncryptionPolicy::PreferOff
1095 };
1096 let endpoint_count =
1097 read_u32(payload, &mut cursor, "discovery schema type endpoint count")? as usize;
1098 let mut type_endpoints = Vec::with_capacity(endpoint_count);
1099 for _ in 0..endpoint_count {
1100 type_endpoints.push(DataEndpoint(read_u32(
1101 payload,
1102 &mut cursor,
1103 "discovery schema type endpoint",
1104 )?));
1105 }
1106 types.push(OwnedDataTypeDefinition {
1107 id,
1108 name,
1109 description,
1110 element,
1111 endpoints: type_endpoints,
1112 reliable,
1113 priority,
1114 e2e_encryption,
1115 });
1116 }
1117
1118 if cursor != payload.len() {
1119 return Err(TelemetryError::Unpack("discovery schema trailing bytes"));
1120 }
1121 Ok(OwnedRuntimeSchemaSnapshot { endpoints, types })
1122}