use alloc::collections::BTreeMap;
use alloc::string::{String, ToString};
use alloc::vec;
use alloc::vec::Vec;
use crate::router::encode_slice_le;
use crate::{
DataEndpoint, DataType, E2eEncryptionPolicy, MessageElement, TelemetryError, TelemetryResult,
config::{
OwnedDataTypeDefinition, OwnedEndpointDefinition, OwnedRuntimeSchemaSnapshot,
RuntimeSchemaSnapshot, e2e_encryption_policy_code, e2e_encryption_policy_from_code,
export_schema, message_class_code, message_class_from_code, message_data_type_code,
message_data_type_from_code, reliable_code, reliable_from_code,
},
packet::Packet,
try_enum_from_u32,
};
pub const DISCOVERY_ROUTE_TTL_MS: u64 = 30_000;
pub const DISCOVERY_FAST_INTERVAL_MS: u64 = 250;
pub const DISCOVERY_SLOW_INTERVAL_MS: u64 = 5_000;
pub const DISCOVERY_SLOW_LINK_CAPACITY_BPS: u64 = 512;
pub const DISCOVERY_SLOW_LINK_PING_INTERVAL_MS: u64 = 15_000;
pub const DISCOVERY_SLOW_LINK_FULL_INTERVAL_MS: u64 = 120_000;
pub const TIMESYNC_SLOW_LINK_MIN_INTERVAL_MS: u64 = 30_000;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DiscoveryCadenceState {
pub current_interval_ms: u64,
pub next_announce_ms: u64,
}
impl Default for DiscoveryCadenceState {
fn default() -> Self {
Self {
current_interval_ms: DISCOVERY_FAST_INTERVAL_MS,
next_announce_ms: 0,
}
}
}
impl DiscoveryCadenceState {
pub fn on_topology_change(&mut self, now_ms: u64) {
self.current_interval_ms = DISCOVERY_FAST_INTERVAL_MS;
self.next_announce_ms = now_ms;
}
pub fn on_announce_sent(&mut self, now_ms: u64) {
self.next_announce_ms = now_ms.saturating_add(self.current_interval_ms);
self.current_interval_ms = core::cmp::min(
self.current_interval_ms.saturating_mul(2),
DISCOVERY_SLOW_INTERVAL_MS,
);
}
pub fn due(&self, now_ms: u64) -> bool {
now_ms >= self.next_announce_ms
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopologyBoardNode {
pub sender_id: String,
pub reachable_endpoints: Vec<DataEndpoint>,
pub reachable_timesync_sources: Vec<String>,
pub connections: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopologyLink {
pub source: String,
pub target: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopologyAnnouncerRoute {
pub sender_id: String,
pub reachable_endpoints: Vec<DataEndpoint>,
pub reachable_timesync_sources: Vec<String>,
pub routers: Vec<TopologyBoardNode>,
pub last_seen_ms: u64,
pub age_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopologySideRoute {
pub side_id: usize,
pub side_name: &'static str,
pub reachable_endpoints: Vec<DataEndpoint>,
pub reachable_timesync_sources: Vec<String>,
pub announcers: Vec<TopologyAnnouncerRoute>,
pub last_seen_ms: u64,
pub age_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopologySnapshot {
pub advertised_endpoints: Vec<DataEndpoint>,
pub advertised_timesync_sources: Vec<String>,
pub routers: Vec<TopologyBoardNode>,
pub links: Vec<TopologyLink>,
pub routes: Vec<TopologySideRoute>,
pub current_announce_interval_ms: u64,
pub next_announce_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientStatsSnapshot {
pub sender_id: String,
pub connected: bool,
pub side_ids: Vec<usize>,
pub side_names: Vec<&'static str>,
pub last_seen_ms: Option<u64>,
pub age_ms: Option<u64>,
pub reachable_endpoints: Vec<DataEndpoint>,
pub reachable_timesync_sources: Vec<String>,
pub packets_sent: u64,
pub packets_received: u64,
pub bytes_sent: u64,
pub bytes_received: u64,
}
pub const LINK_CAPABILITY_HEADER_TEMPLATES: u32 = 0x0000_0001;
pub const LINK_CAPABILITY_CHUNKING: u32 = 0x0000_0002;
pub const LINK_CAPABILITY_RELIABILITY: u32 = 0x0000_0004;
pub const LINK_CAPABILITY_CRYPTO: u32 = 0x0000_0008;
pub const LINK_CAPABILITY_END_TO_END_RELIABILITY: u32 = 0x0000_0010;
pub const LINK_CAPABILITY_OMIT_UNCHANGED_TIMESTAMPS: u32 = 0x0000_0020;
pub const LINK_PROFILE_CANONICAL: u8 = 0;
pub const LINK_PROFILE_TEMPLATE: u8 = 1;
pub const LINK_PROFILE_IPV6_LIKE: u8 = 2;
pub const LINK_PROFILE_IPV4_LIKE: u8 = 3;
pub const ADDRESS_MODE_DYNAMIC: u8 = 0;
pub const ADDRESS_MODE_REQUESTED: u8 = 1;
pub const ADDRESS_MODE_STATIC: u8 = 2;
pub const ADDRESS_STATE_REQUEST: u8 = 0;
pub const ADDRESS_STATE_APPROVED: u8 = 1;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LinkCapabilities {
pub version: u8,
pub flags: u32,
pub profile: u8,
pub max_frame_bytes: u32,
pub compact_header_target_bytes: u32,
pub max_side_transport_templates: u32,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AddressAdvertisement {
pub hostname: String,
pub address: u32,
pub requested_address: u32,
pub mode: u8,
pub state: u8,
pub birth_ms: u64,
pub owner_hash: u64,
pub reachable_endpoints: Vec<DataEndpoint>,
pub reachable_timesync_sources: Vec<String>,
pub link_capabilities: LinkCapabilities,
}
#[inline]
pub const fn is_router_control_endpoint(ep: DataEndpoint) -> bool {
matches!(
ep,
DataEndpoint::TelemetryError | DataEndpoint::TimeSync | DataEndpoint::Discovery
)
}
#[inline]
pub const fn is_discovery_endpoint(ep: DataEndpoint) -> bool {
matches!(ep, DataEndpoint::Discovery)
}
#[inline]
pub const fn is_discovery_type(ty: DataType) -> bool {
matches!(
ty,
DataType::DiscoveryAnnounce
| DataType::DiscoveryTimeSyncSources
| DataType::DiscoveryTopology
| DataType::DiscoverySchema
| DataType::DiscoveryTopologyRequest
| DataType::DiscoverySchemaRequest
| DataType::ManagedVariableRequest
| DataType::ManagedVariableValue
| DataType::DiscoveryLeave
| DataType::DiscoveryLinkCapabilities
| DataType::DiscoveryAddress
)
}
#[inline]
pub const fn is_discovery_request_type(ty: DataType) -> bool {
matches!(
ty,
DataType::DiscoveryTopologyRequest
| DataType::DiscoverySchemaRequest
| DataType::ManagedVariableRequest
| DataType::DiscoveryLeave
| DataType::DiscoveryAddress
)
}
fn sort_dedup_strings(items: &mut Vec<String>) {
items.sort_unstable();
items.dedup();
}
pub fn normalize_topology_boards(boards: &mut Vec<TopologyBoardNode>) {
for board in boards.iter_mut() {
board
.reachable_endpoints
.retain(|ep| !is_router_control_endpoint(*ep));
board.reachable_endpoints.sort_unstable();
board.reachable_endpoints.dedup();
sort_dedup_strings(&mut board.reachable_timesync_sources);
board.connections.retain(|peer| peer != &board.sender_id);
sort_dedup_strings(&mut board.connections);
}
boards.sort_unstable_by(|a, b| a.sender_id.cmp(&b.sender_id));
boards.dedup_by(|a, b| a.sender_id == b.sender_id);
}
pub fn topology_links_from_boards(boards: &[TopologyBoardNode]) -> Vec<TopologyLink> {
let mut links = Vec::new();
for board in boards {
for peer in &board.connections {
if peer == &board.sender_id {
continue;
}
let (source, target) = if board.sender_id <= *peer {
(board.sender_id.clone(), peer.clone())
} else {
(peer.clone(), board.sender_id.clone())
};
links.push(TopologyLink { source, target });
}
}
links.sort_unstable_by(|a, b| (&a.source, &a.target).cmp(&(&b.source, &b.target)));
links.dedup_by(|a, b| a.source == b.source && a.target == b.target);
links
}
pub fn merge_topology_boards(dst: &mut Vec<TopologyBoardNode>, src: &[TopologyBoardNode]) {
let mut merged: BTreeMap<String, TopologyBoardNode> = dst
.iter()
.cloned()
.map(|board| (board.sender_id.clone(), board))
.collect();
for board in src {
let entry = merged
.entry(board.sender_id.clone())
.or_insert_with(|| TopologyBoardNode {
sender_id: board.sender_id.clone(),
reachable_endpoints: Vec::new(),
reachable_timesync_sources: Vec::new(),
connections: Vec::new(),
});
entry
.reachable_endpoints
.extend(board.reachable_endpoints.iter().copied());
entry
.reachable_timesync_sources
.extend(board.reachable_timesync_sources.iter().cloned());
entry.connections.extend(board.connections.iter().cloned());
}
let mut out: Vec<TopologyBoardNode> = merged.into_values().collect();
normalize_topology_boards(&mut out);
*dst = out;
}
pub fn summarize_topology_boards(boards: &[TopologyBoardNode]) -> (Vec<DataEndpoint>, Vec<String>) {
let mut reachable_endpoints = Vec::new();
let mut reachable_timesync_sources = Vec::new();
for board in boards {
reachable_endpoints.extend(board.reachable_endpoints.iter().copied());
reachable_timesync_sources.extend(board.reachable_timesync_sources.iter().cloned());
}
reachable_endpoints.sort_unstable();
reachable_endpoints.dedup();
reachable_endpoints.retain(|ep| !is_router_control_endpoint(*ep));
sort_dedup_strings(&mut reachable_timesync_sources);
(reachable_endpoints, reachable_timesync_sources)
}
pub fn build_discovery_announce(
sender: &str,
timestamp_ms: u64,
endpoints: &[DataEndpoint],
) -> TelemetryResult<Packet> {
let payload_words: Vec<u32> = endpoints.iter().copied().map(|ep| ep.as_u32()).collect();
Packet::new(
DataType::DiscoveryAnnounce,
&[DataEndpoint::Discovery],
sender,
timestamp_ms,
encode_slice_le(payload_words.as_slice()),
)
}
pub fn decode_discovery_announce(pkt: &Packet) -> TelemetryResult<Vec<DataEndpoint>> {
if pkt.data_type() != DataType::DiscoveryAnnounce {
return Err(TelemetryError::InvalidType);
}
decode_discovery_payload(pkt.payload())
}
pub fn decode_discovery_payload(payload: &[u8]) -> TelemetryResult<Vec<DataEndpoint>> {
if !payload.len().is_multiple_of(4) {
return Err(TelemetryError::Unpack("discovery payload width"));
}
let mut endpoints = Vec::with_capacity(payload.len() / 4);
for chunk in payload.chunks_exact(4) {
let raw = u32::from_le_bytes(chunk.try_into().expect("4-byte chunk"));
let ep = try_enum_from_u32(raw).ok_or(TelemetryError::Unpack("bad discovery endpoint"))?;
if is_discovery_endpoint(ep) {
continue;
}
endpoints.push(ep);
}
endpoints.sort_unstable();
endpoints.dedup();
Ok(endpoints)
}
pub fn build_discovery_timesync_sources<S: AsRef<str>>(
sender: &str,
timestamp_ms: u64,
sources: &[S],
) -> TelemetryResult<Packet> {
let mut payload = Vec::new();
let mut deduped: Vec<&str> = sources.iter().map(|s| s.as_ref()).collect();
deduped.sort_unstable();
deduped.dedup();
payload.extend_from_slice(&(deduped.len() as u32).to_le_bytes());
for source in deduped {
let bytes = source.as_bytes();
let len = u32::try_from(bytes.len())
.map_err(|_| TelemetryError::Pack("discovery source id too long"))?;
payload.extend_from_slice(&len.to_le_bytes());
payload.extend_from_slice(bytes);
}
Packet::new(
DataType::DiscoveryTimeSyncSources,
&[DataEndpoint::Discovery],
sender,
timestamp_ms,
payload.into(),
)
}
pub fn build_discovery_topology_request(
sender: &str,
timestamp_ms: u64,
) -> TelemetryResult<Packet> {
Packet::new(
DataType::DiscoveryTopologyRequest,
&[DataEndpoint::Discovery],
sender,
timestamp_ms,
Vec::<u8>::new().into(),
)
}
pub fn build_discovery_schema_request(sender: &str, timestamp_ms: u64) -> TelemetryResult<Packet> {
Packet::new(
DataType::DiscoverySchemaRequest,
&[DataEndpoint::Discovery],
sender,
timestamp_ms,
Vec::<u8>::new().into(),
)
}
pub fn build_discovery_leave(sender: &str, timestamp_ms: u64) -> TelemetryResult<Packet> {
Packet::new(
DataType::DiscoveryLeave,
&[DataEndpoint::Discovery],
sender,
timestamp_ms,
Vec::<u8>::new().into(),
)
}
pub fn build_discovery_link_capabilities(
sender: &str,
timestamp_ms: u64,
capabilities: LinkCapabilities,
) -> TelemetryResult<Packet> {
let mut payload = Vec::with_capacity(18);
payload.push(capabilities.version);
payload.extend_from_slice(&capabilities.flags.to_le_bytes());
payload.push(capabilities.profile);
payload.extend_from_slice(&capabilities.max_frame_bytes.to_le_bytes());
payload.extend_from_slice(&capabilities.compact_header_target_bytes.to_le_bytes());
payload.extend_from_slice(&capabilities.max_side_transport_templates.to_le_bytes());
Packet::new(
DataType::DiscoveryLinkCapabilities,
&[DataEndpoint::Discovery],
sender,
timestamp_ms,
payload.into(),
)
}
pub fn decode_discovery_link_capabilities(pkt: &Packet) -> TelemetryResult<LinkCapabilities> {
if pkt.data_type() != DataType::DiscoveryLinkCapabilities {
return Err(TelemetryError::InvalidType);
}
let payload = pkt.payload();
if payload.len() != 18 {
return Err(TelemetryError::Unpack("discovery link capabilities width"));
}
Ok(LinkCapabilities {
version: payload[0],
flags: u32::from_le_bytes(payload[1..5].try_into().expect("4-byte flags")),
profile: payload[5],
max_frame_bytes: u32::from_le_bytes(payload[6..10].try_into().expect("4-byte max frame")),
compact_header_target_bytes: u32::from_le_bytes(
payload[10..14].try_into().expect("4-byte target"),
),
max_side_transport_templates: u32::from_le_bytes(
payload[14..18].try_into().expect("4-byte templates"),
),
})
}
pub fn build_discovery_address(
sender: &str,
timestamp_ms: u64,
ad: &AddressAdvertisement,
) -> TelemetryResult<Packet> {
let mut payload = Vec::new();
payload.push(1);
payload.push(ad.mode);
payload.push(ad.state);
payload.extend_from_slice(&ad.address.to_le_bytes());
payload.extend_from_slice(&ad.requested_address.to_le_bytes());
payload.extend_from_slice(&ad.birth_ms.to_le_bytes());
payload.extend_from_slice(&ad.owner_hash.to_le_bytes());
encode_string(&mut payload, &ad.hostname)?;
let mut endpoints = ad.reachable_endpoints.clone();
endpoints.retain(|ep| !is_discovery_endpoint(*ep));
endpoints.sort_unstable();
endpoints.dedup();
let endpoint_count = u32::try_from(endpoints.len())
.map_err(|_| TelemetryError::Pack("discovery address endpoint count"))?;
payload.extend_from_slice(&endpoint_count.to_le_bytes());
for ep in endpoints {
payload.extend_from_slice(&ep.as_u32().to_le_bytes());
}
let mut sources = ad.reachable_timesync_sources.clone();
sort_dedup_strings(&mut sources);
let source_count = u32::try_from(sources.len())
.map_err(|_| TelemetryError::Pack("discovery address source count"))?;
payload.extend_from_slice(&source_count.to_le_bytes());
for source in sources {
encode_string(&mut payload, &source)?;
}
payload.push(ad.link_capabilities.version);
payload.extend_from_slice(&ad.link_capabilities.flags.to_le_bytes());
payload.push(ad.link_capabilities.profile);
payload.extend_from_slice(&ad.link_capabilities.max_frame_bytes.to_le_bytes());
payload.extend_from_slice(
&ad.link_capabilities
.compact_header_target_bytes
.to_le_bytes(),
);
payload.extend_from_slice(
&ad.link_capabilities
.max_side_transport_templates
.to_le_bytes(),
);
Packet::new(
DataType::DiscoveryAddress,
&[DataEndpoint::Discovery],
sender,
timestamp_ms,
payload.into(),
)
}
pub fn decode_discovery_address(pkt: &Packet) -> TelemetryResult<AddressAdvertisement> {
if pkt.data_type() != DataType::DiscoveryAddress {
return Err(TelemetryError::InvalidType);
}
let payload = pkt.payload();
let mut cursor = 0usize;
let version = read_u8(payload, &mut cursor, "discovery address version")?;
if version != 1 {
return Err(TelemetryError::Unpack("discovery address version"));
}
let mode = read_u8(payload, &mut cursor, "discovery address mode")?;
let state = read_u8(payload, &mut cursor, "discovery address state")?;
let address = read_u32(payload, &mut cursor, "discovery address current")?;
let requested_address = read_u32(payload, &mut cursor, "discovery address requested")?;
if payload.len().saturating_sub(cursor) < 8 {
return Err(TelemetryError::Unpack("discovery address birth"));
}
let birth_ms = u64::from_le_bytes(
payload[cursor..cursor + 8]
.try_into()
.expect("8-byte birth"),
);
cursor += 8;
if payload.len().saturating_sub(cursor) < 8 {
return Err(TelemetryError::Unpack("discovery address owner"));
}
let owner_hash = u64::from_le_bytes(
payload[cursor..cursor + 8]
.try_into()
.expect("8-byte owner"),
);
cursor += 8;
let hostname = decode_string(payload, &mut cursor, "discovery address hostname")?;
let endpoint_count =
read_u32(payload, &mut cursor, "discovery address endpoint count")? as usize;
let mut reachable_endpoints = Vec::with_capacity(endpoint_count);
for _ in 0..endpoint_count {
let raw = read_u32(payload, &mut cursor, "discovery address endpoint")?;
let ep = try_enum_from_u32(raw).ok_or(TelemetryError::Unpack("bad discovery endpoint"))?;
if !is_discovery_endpoint(ep) {
reachable_endpoints.push(ep);
}
}
reachable_endpoints.sort_unstable();
reachable_endpoints.dedup();
let source_count = read_u32(payload, &mut cursor, "discovery address source count")? as usize;
let mut reachable_timesync_sources = Vec::with_capacity(source_count);
for _ in 0..source_count {
let source = decode_string(payload, &mut cursor, "discovery address source")?;
if !source.is_empty() {
reachable_timesync_sources.push(source);
}
}
sort_dedup_strings(&mut reachable_timesync_sources);
let version = read_u8(payload, &mut cursor, "discovery address link version")?;
let flags = read_u32(payload, &mut cursor, "discovery address link flags")?;
let profile = read_u8(payload, &mut cursor, "discovery address link profile")?;
let max_frame_bytes = read_u32(payload, &mut cursor, "discovery address link max frame")?;
let compact_header_target_bytes = read_u32(
payload,
&mut cursor,
"discovery address link compact target",
)?;
let max_side_transport_templates =
read_u32(payload, &mut cursor, "discovery address link templates")?;
if cursor != payload.len() {
return Err(TelemetryError::Unpack("discovery address trailing bytes"));
}
if hostname.is_empty() || address == 0 {
return Err(TelemetryError::Unpack("bad discovery address"));
}
Ok(AddressAdvertisement {
hostname,
address,
requested_address,
mode,
state,
birth_ms,
owner_hash,
reachable_endpoints,
reachable_timesync_sources,
link_capabilities: LinkCapabilities {
version,
flags,
profile,
max_frame_bytes,
compact_header_target_bytes,
max_side_transport_templates,
},
})
}
pub fn build_managed_variable_request(
sender: &str,
timestamp_ms: u64,
ty: DataType,
) -> TelemetryResult<Packet> {
Packet::new(
DataType::ManagedVariableRequest,
&[DataEndpoint::Discovery],
sender,
timestamp_ms,
encode_slice_le(&[ty.as_u32()]),
)
}
pub fn decode_managed_variable_request(pkt: &Packet) -> TelemetryResult<DataType> {
if pkt.data_type() != DataType::ManagedVariableRequest {
return Err(TelemetryError::InvalidType);
}
let payload = pkt.payload();
if payload.len() != 4 {
return Err(TelemetryError::Unpack("managed variable request width"));
}
let raw = u32::from_le_bytes(payload.try_into().expect("4-byte payload"));
try_enum_from_u32(raw).ok_or(TelemetryError::Unpack("bad managed variable data type"))
}
pub fn decode_discovery_timesync_sources(pkt: &Packet) -> TelemetryResult<Vec<String>> {
if pkt.data_type() != DataType::DiscoveryTimeSyncSources {
return Err(TelemetryError::InvalidType);
}
decode_discovery_timesync_sources_payload(pkt.payload())
}
pub fn decode_discovery_timesync_sources_payload(payload: &[u8]) -> TelemetryResult<Vec<String>> {
if payload.len() < 4 {
return Err(TelemetryError::Unpack("discovery timesync source count"));
}
let count = u32::from_le_bytes(payload[..4].try_into().expect("4-byte count")) as usize;
let mut cursor = 4usize;
let mut out = Vec::with_capacity(count);
for _ in 0..count {
if payload.len().saturating_sub(cursor) < 4 {
return Err(TelemetryError::Unpack("discovery timesync source len"));
}
let len = u32::from_le_bytes(payload[cursor..cursor + 4].try_into().expect("4-byte len"))
as usize;
cursor += 4;
if payload.len().saturating_sub(cursor) < len {
return Err(TelemetryError::Unpack("discovery timesync source bytes"));
}
let raw = &payload[cursor..cursor + len];
cursor += len;
let source = core::str::from_utf8(raw)
.map_err(|_| TelemetryError::Unpack("discovery timesync source utf8"))?;
if !source.is_empty() {
out.push(source.to_string());
}
}
if cursor != payload.len() {
return Err(TelemetryError::Unpack("discovery timesync trailing bytes"));
}
out.sort_unstable();
out.dedup();
Ok(out)
}
pub fn build_discovery_topology(
sender: &str,
timestamp_ms: u64,
boards: &[TopologyBoardNode],
) -> TelemetryResult<Packet> {
let mut payload = Vec::new();
let mut normalized = boards.to_vec();
normalize_topology_boards(&mut normalized);
payload.extend_from_slice(&(normalized.len() as u32).to_le_bytes());
for board in normalized {
let sender_bytes = board.sender_id.as_bytes();
let sender_len = u32::try_from(sender_bytes.len())
.map_err(|_| TelemetryError::Pack("discovery topology sender id too long"))?;
payload.extend_from_slice(&sender_len.to_le_bytes());
payload.extend_from_slice(sender_bytes);
payload.extend_from_slice(&(board.reachable_endpoints.len() as u32).to_le_bytes());
for ep in board.reachable_endpoints {
payload.extend_from_slice(&(ep.as_u32()).to_le_bytes());
}
payload.extend_from_slice(&(board.reachable_timesync_sources.len() as u32).to_le_bytes());
for source in board.reachable_timesync_sources {
let bytes = source.as_bytes();
let len = u32::try_from(bytes.len())
.map_err(|_| TelemetryError::Pack("discovery topology source id too long"))?;
payload.extend_from_slice(&len.to_le_bytes());
payload.extend_from_slice(bytes);
}
payload.extend_from_slice(&(board.connections.len() as u32).to_le_bytes());
for peer in board.connections {
let bytes = peer.as_bytes();
let len = u32::try_from(bytes.len())
.map_err(|_| TelemetryError::Pack("discovery topology connection id too long"))?;
payload.extend_from_slice(&len.to_le_bytes());
payload.extend_from_slice(bytes);
}
}
Packet::new(
DataType::DiscoveryTopology,
&[DataEndpoint::Discovery],
sender,
timestamp_ms,
payload.into(),
)
}
fn decode_string(
payload: &[u8],
cursor: &mut usize,
label: &'static str,
) -> TelemetryResult<String> {
if payload.len().saturating_sub(*cursor) < 4 {
return Err(TelemetryError::Unpack(label));
}
let len = u32::from_le_bytes(
payload[*cursor..*cursor + 4]
.try_into()
.expect("4-byte len"),
) as usize;
*cursor += 4;
if payload.len().saturating_sub(*cursor) < len {
return Err(TelemetryError::Unpack(label));
}
let raw = &payload[*cursor..*cursor + len];
*cursor += len;
core::str::from_utf8(raw)
.map(|s| s.to_string())
.map_err(|_| TelemetryError::Unpack(label))
}
pub fn decode_discovery_topology(pkt: &Packet) -> TelemetryResult<Vec<TopologyBoardNode>> {
if pkt.data_type() != DataType::DiscoveryTopology {
return Err(TelemetryError::InvalidType);
}
decode_discovery_topology_payload(pkt.payload())
}
pub fn decode_discovery_topology_payload(
payload: &[u8],
) -> TelemetryResult<Vec<TopologyBoardNode>> {
if payload.len() < 4 {
return Err(TelemetryError::Unpack("discovery topology board count"));
}
let count = u32::from_le_bytes(payload[..4].try_into().expect("4-byte count")) as usize;
let mut cursor = 4usize;
let mut boards = Vec::with_capacity(count);
for _ in 0..count {
let sender_id = decode_string(payload, &mut cursor, "discovery topology sender id")?;
if payload.len().saturating_sub(cursor) < 4 {
return Err(TelemetryError::Unpack("discovery topology endpoint count"));
}
let endpoint_count = u32::from_le_bytes(
payload[cursor..cursor + 4]
.try_into()
.expect("4-byte count"),
) as usize;
cursor += 4;
let mut reachable_endpoints = Vec::with_capacity(endpoint_count);
for _ in 0..endpoint_count {
if payload.len().saturating_sub(cursor) < 4 {
return Err(TelemetryError::Unpack("discovery topology endpoint"));
}
let raw =
u32::from_le_bytes(payload[cursor..cursor + 4].try_into().expect("4-byte ep"));
cursor += 4;
let ep =
try_enum_from_u32(raw).ok_or(TelemetryError::Unpack("bad discovery endpoint"))?;
if !is_discovery_endpoint(ep) {
reachable_endpoints.push(ep);
}
}
if payload.len().saturating_sub(cursor) < 4 {
return Err(TelemetryError::Unpack(
"discovery topology timesync source count",
));
}
let source_count = u32::from_le_bytes(
payload[cursor..cursor + 4]
.try_into()
.expect("4-byte count"),
) as usize;
cursor += 4;
let mut reachable_timesync_sources = Vec::with_capacity(source_count);
for _ in 0..source_count {
let source = decode_string(payload, &mut cursor, "discovery topology timesync source")?;
if !source.is_empty() {
reachable_timesync_sources.push(source);
}
}
if payload.len().saturating_sub(cursor) < 4 {
return Err(TelemetryError::Unpack(
"discovery topology connection count",
));
}
let connection_count = u32::from_le_bytes(
payload[cursor..cursor + 4]
.try_into()
.expect("4-byte count"),
) as usize;
cursor += 4;
let mut connections = Vec::with_capacity(connection_count);
for _ in 0..connection_count {
let peer = decode_string(payload, &mut cursor, "discovery topology connection")?;
if !peer.is_empty() {
connections.push(peer);
}
}
boards.push(TopologyBoardNode {
sender_id,
reachable_endpoints,
reachable_timesync_sources,
connections,
});
}
if cursor != payload.len() {
return Err(TelemetryError::Unpack("discovery topology trailing bytes"));
}
normalize_topology_boards(&mut boards);
Ok(boards)
}
fn encode_string(payload: &mut Vec<u8>, value: &str) -> TelemetryResult<()> {
let len = u32::try_from(value.len())
.map_err(|_| TelemetryError::Pack("discovery schema string too long"))?;
payload.extend_from_slice(&len.to_le_bytes());
payload.extend_from_slice(value.as_bytes());
Ok(())
}
fn read_u8(payload: &[u8], cursor: &mut usize, label: &'static str) -> TelemetryResult<u8> {
if payload.len().saturating_sub(*cursor) < 1 {
return Err(TelemetryError::Unpack(label));
}
let out = payload[*cursor];
*cursor += 1;
Ok(out)
}
fn read_u32(payload: &[u8], cursor: &mut usize, label: &'static str) -> TelemetryResult<u32> {
if payload.len().saturating_sub(*cursor) < 4 {
return Err(TelemetryError::Unpack(label));
}
let out = u32::from_le_bytes(
payload[*cursor..*cursor + 4]
.try_into()
.expect("4-byte u32"),
);
*cursor += 4;
Ok(out)
}
pub fn build_discovery_schema(sender: &str, timestamp_ms: u64) -> TelemetryResult<Packet> {
build_discovery_schema_from_snapshot(sender, timestamp_ms, export_schema())
}
pub fn elect_discovery_master(local_sender: &str, boards: &[TopologyBoardNode]) -> String {
let mut nodes: BTreeMap<String, Vec<String>> = BTreeMap::new();
for board in boards {
nodes.entry(board.sender_id.clone()).or_default();
for peer in board.connections.iter() {
nodes
.entry(peer.clone())
.or_default()
.push(board.sender_id.clone());
nodes
.entry(board.sender_id.clone())
.or_default()
.push(peer.clone());
}
}
nodes.entry(local_sender.to_string()).or_default();
for peers in nodes.values_mut() {
peers.sort_unstable();
peers.dedup();
}
let mut best_sender = local_sender.to_string();
let mut best_unreachable = usize::MAX;
let mut best_max_hops = usize::MAX;
let mut best_total_hops = usize::MAX;
for sender in nodes.keys() {
let mut frontier = vec![sender.clone()];
let mut seen: BTreeMap<String, usize> = BTreeMap::new();
seen.insert(sender.clone(), 0);
let mut idx = 0;
while idx < frontier.len() {
let cur = frontier[idx].clone();
idx += 1;
let cur_dist = seen[&cur];
if let Some(peers) = nodes.get(&cur) {
for peer in peers {
if !seen.contains_key(peer) {
seen.insert(peer.clone(), cur_dist + 1);
frontier.push(peer.clone());
}
}
}
}
let unreachable = nodes.len().saturating_sub(seen.len());
let max_hops = seen.values().copied().max().unwrap_or(0);
let total_hops = seen.values().copied().sum();
let better = unreachable < best_unreachable
|| (unreachable == best_unreachable && max_hops < best_max_hops)
|| (unreachable == best_unreachable
&& max_hops == best_max_hops
&& total_hops < best_total_hops)
|| (unreachable == best_unreachable
&& max_hops == best_max_hops
&& total_hops == best_total_hops
&& sender < &best_sender);
if better {
best_sender = sender.clone();
best_unreachable = unreachable;
best_max_hops = max_hops;
best_total_hops = total_hops;
}
}
best_sender
}
pub fn build_discovery_schema_from_snapshot(
sender: &str,
timestamp_ms: u64,
mut schema: RuntimeSchemaSnapshot,
) -> TelemetryResult<Packet> {
let mut payload = Vec::new();
payload.extend_from_slice(&3u32.to_le_bytes());
schema.endpoints.sort_unstable_by_key(|def| def.id.as_u32());
schema.types.sort_unstable_by_key(|def| def.id.as_u32());
payload.extend_from_slice(&(schema.endpoints.len() as u32).to_le_bytes());
for ep in schema.endpoints {
payload.extend_from_slice(&ep.id.as_u32().to_le_bytes());
payload.push(ep.link_local_only as u8);
encode_string(&mut payload, ep.name)?;
encode_string(&mut payload, ep.description)?;
}
payload.extend_from_slice(&(schema.types.len() as u32).to_le_bytes());
for ty in schema.types {
payload.extend_from_slice(&ty.id.as_u32().to_le_bytes());
encode_string(&mut payload, ty.name)?;
encode_string(&mut payload, ty.description)?;
match ty.element {
MessageElement::Static(count, data_type, class) => {
payload.push(0);
payload.extend_from_slice(&(count as u32).to_le_bytes());
payload.push(message_data_type_code(data_type));
payload.push(message_class_code(class));
}
MessageElement::Dynamic(data_type, class) => {
payload.push(1);
payload.extend_from_slice(&0u32.to_le_bytes());
payload.push(message_data_type_code(data_type));
payload.push(message_class_code(class));
}
}
payload.push(reliable_code(ty.reliable));
payload.push(ty.priority);
payload.push(e2e_encryption_policy_code(ty.e2e_encryption));
payload.extend_from_slice(&(ty.endpoints.len() as u32).to_le_bytes());
for ep in ty.endpoints {
payload.extend_from_slice(&ep.as_u32().to_le_bytes());
}
}
Packet::new(
DataType::DiscoverySchema,
&[DataEndpoint::Discovery],
sender,
timestamp_ms,
payload.into(),
)
}
pub fn decode_discovery_schema(pkt: &Packet) -> TelemetryResult<OwnedRuntimeSchemaSnapshot> {
if pkt.data_type() != DataType::DiscoverySchema {
return Err(TelemetryError::InvalidType);
}
decode_discovery_schema_payload(pkt.payload())
}
pub fn decode_discovery_schema_payload(
payload: &[u8],
) -> TelemetryResult<OwnedRuntimeSchemaSnapshot> {
let mut cursor = 0usize;
let version = read_u32(payload, &mut cursor, "discovery schema version")?;
if version != 1 && version != 2 && version != 3 {
return Err(TelemetryError::Unpack("discovery schema version"));
}
let endpoint_count =
read_u32(payload, &mut cursor, "discovery schema endpoint count")? as usize;
let mut endpoints = Vec::with_capacity(endpoint_count);
for _ in 0..endpoint_count {
let id = DataEndpoint(read_u32(
payload,
&mut cursor,
"discovery schema endpoint id",
)?);
let link_local_only =
read_u8(payload, &mut cursor, "discovery schema endpoint flags")? != 0;
let name = decode_string(payload, &mut cursor, "discovery schema endpoint name")?;
let description = if version >= 2 {
decode_string(
payload,
&mut cursor,
"discovery schema endpoint description",
)?
} else {
String::new()
};
endpoints.push(OwnedEndpointDefinition {
id,
name,
description,
link_local_only,
});
}
let type_count = read_u32(payload, &mut cursor, "discovery schema type count")? as usize;
let mut types = Vec::with_capacity(type_count);
for _ in 0..type_count {
let id = DataType(read_u32(payload, &mut cursor, "discovery schema type id")?);
let name = decode_string(payload, &mut cursor, "discovery schema type name")?;
let description = if version >= 2 {
decode_string(payload, &mut cursor, "discovery schema type description")?
} else {
String::new()
};
let element_kind = read_u8(payload, &mut cursor, "discovery schema element kind")?;
let count = read_u32(payload, &mut cursor, "discovery schema element count")? as usize;
let data_type = message_data_type_from_code(read_u8(
payload,
&mut cursor,
"discovery schema data type",
)?)
.ok_or(TelemetryError::Unpack("discovery schema data type"))?;
let class =
message_class_from_code(read_u8(payload, &mut cursor, "discovery schema class")?)
.ok_or(TelemetryError::Unpack("discovery schema class"))?;
let element = match element_kind {
0 => MessageElement::Static(count, data_type, class),
1 => MessageElement::Dynamic(data_type, class),
_ => return Err(TelemetryError::Unpack("discovery schema element kind")),
};
let reliable =
reliable_from_code(read_u8(payload, &mut cursor, "discovery schema reliable")?)
.ok_or(TelemetryError::Unpack("discovery schema reliable"))?;
let priority = read_u8(payload, &mut cursor, "discovery schema priority")?;
let e2e_encryption = if version >= 3 {
e2e_encryption_policy_from_code(read_u8(
payload,
&mut cursor,
"discovery schema e2e cryptography",
)?)
.ok_or(TelemetryError::Unpack("discovery schema e2e cryptography"))?
} else {
E2eEncryptionPolicy::PreferOff
};
let endpoint_count =
read_u32(payload, &mut cursor, "discovery schema type endpoint count")? as usize;
let mut type_endpoints = Vec::with_capacity(endpoint_count);
for _ in 0..endpoint_count {
type_endpoints.push(DataEndpoint(read_u32(
payload,
&mut cursor,
"discovery schema type endpoint",
)?));
}
types.push(OwnedDataTypeDefinition {
id,
name,
description,
element,
endpoints: type_endpoints,
reliable,
priority,
e2e_encryption,
});
}
if cursor != payload.len() {
return Err(TelemetryError::Unpack("discovery schema trailing bytes"));
}
Ok(OwnedRuntimeSchemaSnapshot { endpoints, types })
}