use std::collections::HashMap;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use bytes::Bytes;
use crate::alias::AliasRegistry;
use crate::codec::{EncodeOptions, decode, encode};
use crate::error::{Result, SparkplugError};
use crate::model::{Metric, Payload};
use crate::state::StatePayload;
use crate::topic::{DeviceId, EdgeNodeId, GroupId, MessageType, SparkplugTopic};
use crate::transport::{
ConnectOptions, IncomingMessage, MqttTransport, OutboundMessage, Qos, TlsConfig,
};
use crate::value::MetricValue;
use crate::{BDSEQ_METRIC_NAME, NODE_CONTROL_REBIRTH};
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch and saturates at
/// `i64::MAX` when the millisecond count does not fit into an `i64`.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX),
        // Clock is set before 1970: fall back to the epoch itself.
        Err(_) => 0,
    }
}
/// Connection and behaviour settings for a [`HostApplication`].
#[derive(Clone, Debug)]
pub struct HostConfig {
/// Identifier of this host application; it forms the last segment of the
/// `spBv1.0/STATE/{host_id}` topic and is compared against incoming host-state topics.
pub host_id: String,
/// Topic filters subscribed to (with `Qos::AtMostOnce`) when the application starts.
pub group_subscriptions: Vec<String>,
/// Client identifier handed to the transport in the connect options.
pub client_id: String,
/// Broker host name handed to the transport in the connect options.
pub host: String,
/// Broker port handed to the transport in the connect options.
pub port: u16,
/// Keep-alive value handed to the transport in the connect options.
pub keep_alive_secs: u16,
/// Minimum time that must pass between two rebirth requests sent to the same edge node.
pub rebirth_debounce: Duration,
/// Optional TLS settings, cloned into the connect options; `None` leaves them unset.
pub tls: Option<TlsConfig>,
}
impl HostConfig {
    /// Builds a configuration for `host_id` with default settings.
    ///
    /// Defaults: a single `spBv1.0/#` subscription, client id `host-{host_id}`,
    /// broker `localhost:1883`, keep-alive of 30, a five second rebirth debounce
    /// and no TLS configuration.
    #[must_use]
    pub fn new(host_id: &str) -> Self {
        let host_id = String::from(host_id);
        Self {
            client_id: format!("host-{host_id}"),
            group_subscriptions: vec![String::from("spBv1.0/#")],
            host: String::from("localhost"),
            port: 1883,
            keep_alive_secs: 30,
            rebirth_debounce: Duration::from_secs(5),
            tls: None,
            // Moved last so the formatted client id above can still borrow it.
            host_id,
        }
    }

    /// Returns the STATE topic of this host: `spBv1.0/STATE/{host_id}`.
    fn state_topic(&self) -> String {
        ["spBv1.0/STATE/", self.host_id.as_str()].concat()
    }
}
/// Outcome of handling one incoming message, as returned by
/// `HostApplication::handle_incoming`.
#[derive(Clone, Debug)]
pub enum HostEvent {
/// A node birth was accepted; carries the metrics of the birth payload.
NodeBirth {
group: String,
edge: String,
metrics: Vec<Metric>,
},
/// In-sequence node data from an online node; unnamed metrics have had their
/// names filled in from the node's alias registry where an alias was known.
NodeData {
group: String,
edge: String,
metrics: Vec<Metric>,
},
/// A node death whose bdSeq matched the stored birth value of an online node.
NodeDeath {
group: String,
edge: String,
/// Payload timestamp, or the local time in milliseconds when the payload
/// carried none (or one that does not fit an `i64`).
timestamp: i64,
/// Sorted names of the devices that were online when the node died.
devices: Vec<String>,
},
/// An in-sequence device birth was accepted; carries the metrics of the birth payload.
DeviceBirth {
group: String,
edge: String,
device: String,
metrics: Vec<Metric>,
},
/// In-sequence device data from an online device; unnamed metrics have had their
/// names filled in from the device's alias registry where an alias was known.
DeviceData {
group: String,
edge: String,
device: String,
metrics: Vec<Metric>,
},
/// An in-sequence device death received from an online node.
DeviceDeath {
group: String,
edge: String,
device: String,
/// Payload timestamp, or the local time in milliseconds when the payload
/// carried none (or one that does not fit an `i64`).
timestamp: i64,
},
/// A rebirth command was published to the given edge node.
RebirthRequested {
group: String,
edge: String,
},
/// The message produced no event (host-state traffic, command topics, a debounced
/// rebirth request, or a node death that did not match the tracked session).
Ignored,
}
/// Tracked state of one device below an edge node.
#[derive(Default)]
struct DeviceState {
// Set by device birth handling; cleared by device death and node death handling.
online: bool,
// Alias bindings taken from the metrics of the device's most recent birth.
aliases: AliasRegistry,
}
/// Tracked state of one edge node, stored under its `{group}/{edge}` key.
struct NodeState {
// True after an accepted node birth; cleared on death and whenever a rebirth
// is triggered by a decode failure or a sequence mismatch.
online: bool,
// bdSeq value read from the last node birth; compared against incoming node deaths.
bd_seq: Option<i64>,
// Sequence number the next node/device message must carry (wraps on overflow).
expected_seq: u8,
// Alias bindings taken from the metrics of the node's most recent birth.
aliases: AliasRegistry,
// Per-device state keyed by device id.
devices: HashMap<String, DeviceState>,
// When the last rebirth slot was granted; `None` until the first one.
last_rebirth: Option<Instant>,
}
impl NodeState {
    /// Creates the state of a node that has not been seen online yet.
    fn new() -> Self {
        Self {
            online: false,
            bd_seq: None,
            expected_seq: 0,
            last_rebirth: None,
            aliases: AliasRegistry::new(),
            devices: HashMap::new(),
        }
    }

    /// Claims permission to send a rebirth request at `now`.
    ///
    /// Returns `false` without touching the state when the previous grant is less
    /// than `debounce` ago; otherwise records `now` as the latest grant and
    /// returns `true`.
    fn take_rebirth_slot(&mut self, debounce: Duration, now: Instant) -> bool {
        if let Some(previous) = self.last_rebirth {
            if now.duration_since(previous) < debounce {
                return false;
            }
        }
        self.last_rebirth = Some(now);
        true
    }
}
/// Extracts the bdSeq value from `payload`.
///
/// Only the first metric named `BDSEQ_METRIC_NAME` is inspected. Its value is
/// returned when it is an `Int64`, or a `UInt64` that fits into an `i64`; any
/// other value, or the absence of such a metric, yields `None`.
fn bdseq_of(payload: &Payload) -> Option<i64> {
    for metric in &payload.metrics {
        if metric.name.as_deref() != Some(BDSEQ_METRIC_NAME) {
            continue;
        }
        // The first name match decides the result, even if its type is unusable.
        return match &metric.value {
            MetricValue::Int64(signed) => Some(*signed),
            MetricValue::UInt64(unsigned) => i64::try_from(*unsigned).ok(),
            _ => None,
        };
    }
    None
}
fn resolve_names(aliases: &AliasRegistry, mut metrics: Vec<Metric>) -> Vec<Metric> {
for metric in &mut metrics {
if metric.name.is_none()
&& let Some(alias) = metric.alias
&& let Some(name) = aliases.name_for_alias(alias)
{
metric.name = Some(name.to_owned());
}
}
metrics
}
/// Intermediate result of a message handler, resolved by `HostApplication::finish`.
enum Step {
// Hand this event straight back to the caller.
Event(HostEvent),
// Go through the (debounced) rebirth request path for the node.
Rebirth,
}
/// Host application that tracks edge-node and device state over a transport `T`.
pub struct HostApplication<T> {
// Connection and behaviour settings.
config: HostConfig,
// Transport used for connecting, subscribing, publishing and receiving.
transport: T,
// Per-node state keyed by `{group}/{edge}`.
nodes: HashMap<String, NodeState>,
// Millisecond timestamp taken in `start`; used in the will and STATE birth payloads.
state_ts: i64,
}
impl<T: MqttTransport> HostApplication<T> {
/// Creates a host application that knows no nodes yet and has a zero STATE
/// timestamp; nothing is sent until `start` is called.
pub fn new(config: HostConfig, transport: T) -> Self {
    let nodes = HashMap::new();
    Self {
        config,
        transport,
        nodes,
        state_ts: 0,
    }
}
/// Connects to the broker, subscribes, and announces this host as online.
///
/// Steps, in order:
/// 1. record the current time as the STATE timestamp,
/// 2. connect with a retained, `AtLeastOnce` will carrying an offline STATE
///    payload with that timestamp,
/// 3. subscribe to the host's own STATE topic (`AtLeastOnce`),
/// 4. subscribe to every configured filter (`AtMostOnce`),
/// 5. publish the online STATE payload.
///
/// # Errors
/// Returns the first error reported by the transport.
pub async fn start(&mut self) -> Result<()> {
    self.state_ts = now_ms();
    let state_topic = self.config.state_topic();
    let will = OutboundMessage {
        topic: state_topic.clone(),
        qos: Qos::AtLeastOnce,
        retain: true,
        payload: Bytes::from(StatePayload::new(false, self.state_ts).to_json()),
    };
    let opts = ConnectOptions {
        client_id: self.config.client_id.clone(),
        host: self.config.host.clone(),
        port: self.config.port,
        keep_alive_secs: self.config.keep_alive_secs,
        clean_start: true,
        will: Some(will),
        tls: self.config.tls.clone(),
    };
    self.transport.connect(&opts).await?;
    self.transport
        .subscribe(&state_topic, Qos::AtLeastOnce)
        .await?;
    // `config` and `transport` are disjoint fields, so the filter list can be
    // borrowed in place while the transport is used mutably; no copy is needed.
    for filter in &self.config.group_subscriptions {
        self.transport.subscribe(filter, Qos::AtMostOnce).await?;
    }
    self.publish_state_birth().await
}
/// Publishes the retained online STATE payload, stamped with the timestamp that
/// was recorded in `start`, on this host's STATE topic with `AtLeastOnce`.
async fn publish_state_birth(&mut self) -> Result<()> {
    let topic = self.config.state_topic();
    let body = StatePayload::new(true, self.state_ts).to_json();
    let birth = OutboundMessage {
        topic,
        qos: Qos::AtLeastOnce,
        retain: true,
        payload: Bytes::from(body),
    };
    self.transport.publish(&birth).await
}
/// Publishes a retained offline STATE payload stamped with the current time and
/// then disconnects the transport.
///
/// # Errors
/// Returns the transport error if publishing or disconnecting fails; the
/// disconnect is not attempted when the publish fails.
pub async fn shutdown(&mut self) -> Result<()> {
    let topic = self.config.state_topic();
    let body = StatePayload::new(false, now_ms()).to_json();
    let farewell = OutboundMessage {
        topic,
        qos: Qos::AtLeastOnce,
        retain: true,
        payload: Bytes::from(body),
    };
    self.transport.publish(&farewell).await?;
    self.transport.disconnect().await
}
/// Publishes `metrics` as a node command (`NCmd`) to the given edge node.
///
/// The payload is stamped with the current time, carries no sequence number,
/// and is encoded with `EncodeOptions::birth()`.
///
/// # Errors
/// Fails when `group` or `edge` is rejected by its id type, when the topic
/// cannot be built, or when the transport fails to publish.
pub async fn publish_node_command(
    &mut self,
    group: &str,
    edge: &str,
    metrics: Vec<Metric>,
) -> Result<()> {
    let group_id = GroupId::new(group)?;
    let edge_id = EdgeNodeId::new(edge)?;
    let topic = SparkplugTopic::node(group_id, edge_id, MessageType::NCmd)?.to_string();
    let stamp = u64::try_from(now_ms()).unwrap_or(0);
    let command = Payload {
        timestamp: Some(stamp),
        metrics,
        seq: None,
        uuid: None,
        body: None,
    };
    let encoded = encode(&command, EncodeOptions::birth());
    self.publish_raw(topic, encoded).await
}
/// Publishes `metrics` as a device command (`DCmd`) to the given device.
///
/// The payload is stamped with the current time, carries no sequence number,
/// and is encoded with `EncodeOptions::birth()`.
///
/// # Errors
/// Fails when `group`, `edge` or `device` is rejected by its id type, when the
/// topic cannot be built, or when the transport fails to publish.
pub async fn publish_device_command(
    &mut self,
    group: &str,
    edge: &str,
    device: &str,
    metrics: Vec<Metric>,
) -> Result<()> {
    let group_id = GroupId::new(group)?;
    let edge_id = EdgeNodeId::new(edge)?;
    let device_id = DeviceId::new(device)?;
    let topic =
        SparkplugTopic::device(group_id, edge_id, device_id, MessageType::DCmd)?.to_string();
    let stamp = u64::try_from(now_ms()).unwrap_or(0);
    let command = Payload {
        timestamp: Some(stamp),
        metrics,
        seq: None,
        uuid: None,
        body: None,
    };
    let encoded = encode(&command, EncodeOptions::birth());
    self.publish_raw(topic, encoded).await
}
/// Publishes an already encoded payload on `topic`, non-retained, with `AtMostOnce`.
async fn publish_raw(&mut self, topic: String, payload: Bytes) -> Result<()> {
    let outbound = OutboundMessage {
        topic,
        qos: Qos::AtMostOnce,
        retain: false,
        payload,
    };
    self.transport.publish(&outbound).await
}
/// Publishes a node command whose single metric is `NODE_CONTROL_REBIRTH`
/// set to `true`, addressed to the given edge node.
///
/// No debouncing happens here; callers go through `rebirth` for that.
async fn send_rebirth(&mut self, group: &GroupId, edge: &EdgeNodeId) -> Result<()> {
    let request = Metric::new(NODE_CONTROL_REBIRTH, MetricValue::Boolean(true));
    let topic = SparkplugTopic::Node {
        group: group.clone(),
        edge: edge.clone(),
        ty: MessageType::NCmd,
    }
    .to_string();
    let stamp = u64::try_from(now_ms()).unwrap_or(0);
    let command = Payload {
        timestamp: Some(stamp),
        metrics: vec![request],
        seq: None,
        uuid: None,
        body: None,
    };
    let encoded = encode(&command, EncodeOptions::birth());
    self.publish_raw(topic, encoded).await
}
/// Receives one message from the transport and handles it.
///
/// Returns `Ok(None)` when the transport yields no message, otherwise the event
/// produced by `handle_incoming`.
///
/// # Errors
/// Propagates errors from the transport and from message handling.
pub async fn recv_and_handle(&mut self) -> Result<Option<HostEvent>> {
    let Some(message) = self.transport.recv().await? else {
        return Ok(None);
    };
    let event = self.handle_incoming(&message).await?;
    Ok(Some(event))
}
/// Parses the topic of `message` and dispatches it to the matching handler.
///
/// * Host-state messages for this host's own id are parsed; an offline state
///   causes the online STATE payload to be published again. Host-state messages
///   always yield `HostEvent::Ignored`.
/// * Node and device birth, data and death messages go to their `on_*` handlers.
/// * Every other message type (including commands) yields `HostEvent::Ignored`.
///
/// # Errors
/// Fails when the topic cannot be parsed, when an own host-state payload is not
/// valid UTF-8 or cannot be parsed, or when a handler or the transport fails.
pub async fn handle_incoming(&mut self, message: &IncomingMessage) -> Result<HostEvent> {
    match SparkplugTopic::parse(&message.topic)? {
        SparkplugTopic::HostState { host_id } => {
            let is_own_state = host_id == self.config.host_id;
            if is_own_state {
                let text = std::str::from_utf8(&message.payload)
                    .map_err(|_| SparkplugError::InvalidUtf8)?;
                let state = StatePayload::parse(text)?;
                if !state.online {
                    // Someone (or the broker's will) marked us offline while we are up.
                    self.publish_state_birth().await?;
                }
            }
            Ok(HostEvent::Ignored)
        }
        SparkplugTopic::Node { group, edge, ty } => {
            let body = &message.payload;
            match ty {
                MessageType::NBirth => self.on_node_birth(&group, &edge, body).await,
                MessageType::NData => self.on_node_data(&group, &edge, body).await,
                MessageType::NDeath => self.on_node_death(&group, &edge, body),
                // Node commands and anything else on a node topic produce no event.
                _ => Ok(HostEvent::Ignored),
            }
        }
        SparkplugTopic::Device {
            group,
            edge,
            device,
            ty,
        } => {
            let body = &message.payload;
            match ty {
                MessageType::DBirth => self.on_device_birth(&group, &edge, &device, body).await,
                MessageType::DData => self.on_device_data(&group, &edge, &device, body).await,
                MessageType::DDeath => self.on_device_death(&group, &edge, &device, body).await,
                // Device commands and anything else on a device topic produce no event.
                _ => Ok(HostEvent::Ignored),
            }
        }
    }
}
/// Builds the `{group}/{edge}` key under which a node's state is stored.
fn key(group: &GroupId, edge: &EdgeNodeId) -> String {
    [group.to_string(), edge.to_string()].join("/")
}
/// Handles a node birth.
///
/// The node's state is (re)initialised from the payload: the bdSeq value is
/// stored, the next expected sequence number becomes the payload's `seq`
/// (0 when absent) plus one, and the alias registry and device table are
/// rebuilt from scratch. Every named metric is bound in the alias registry.
///
/// If any binding fails the node is left offline and a rebirth is requested;
/// otherwise the node is online and `HostEvent::NodeBirth` is returned.
///
/// # Errors
/// Fails when the payload cannot be decoded or the rebirth request cannot be sent.
async fn on_node_birth(
    &mut self,
    group: &GroupId,
    edge: &EdgeNodeId,
    payload: &[u8],
) -> Result<HostEvent> {
    let birth = decode(payload, None)?;
    let node = self
        .nodes
        .entry(Self::key(group, edge))
        .or_insert_with(NodeState::new);
    node.bd_seq = bdseq_of(&birth);
    node.expected_seq = birth.seq.unwrap_or(0).wrapping_add(1);
    node.aliases.clear();
    node.devices.clear();

    let mut duplicate_alias = false;
    for metric in &birth.metrics {
        let Some(name) = &metric.name else {
            continue;
        };
        // Keep binding the remaining metrics even after a conflict was seen.
        let bound = node
            .aliases
            .try_bind(name, metric.alias, metric.value.datatype());
        duplicate_alias |= bound.is_err();
    }
    node.online = !duplicate_alias;

    if duplicate_alias {
        return self.rebirth(group, edge).await;
    }
    Ok(HostEvent::NodeBirth {
        group: group.as_str().to_owned(),
        edge: edge.as_str().to_owned(),
        metrics: birth.metrics,
    })
}
/// Handles node data.
///
/// Data from an unknown or offline node triggers a rebirth request. For an
/// online node the payload is decoded against the node's aliases; if decoding
/// fails or the sequence number is not the expected one, the node is marked
/// offline and a rebirth is requested. Otherwise the expected sequence number
/// advances and `HostEvent::NodeData` is returned with alias names resolved.
async fn on_node_data(
    &mut self,
    group: &GroupId,
    edge: &EdgeNodeId,
    payload: &[u8],
) -> Result<HostEvent> {
    let key = Self::key(group, edge);
    let step = match self.nodes.get_mut(&key).filter(|n| n.online) {
        None => Step::Rebirth,
        Some(node) => match decode(payload, Some(&node.aliases)) {
            Ok(data) if data.seq == Some(node.expected_seq) => {
                node.expected_seq = node.expected_seq.wrapping_add(1);
                Step::Event(HostEvent::NodeData {
                    group: group.as_str().to_owned(),
                    edge: edge.as_str().to_owned(),
                    metrics: resolve_names(&node.aliases, data.metrics),
                })
            }
            // Undecodable or out-of-sequence: the session can no longer be trusted.
            _ => {
                node.online = false;
                Step::Rebirth
            }
        },
    };
    self.finish(group, edge, step).await
}
fn on_node_death(
&mut self,
group: &GroupId,
edge: &EdgeNodeId,
payload: &[u8],
) -> Result<HostEvent> {
let decoded = decode(payload, None)?;
let incoming = bdseq_of(&decoded);
let timestamp = decoded
.timestamp
.and_then(|t| i64::try_from(t).ok())
.unwrap_or_else(now_ms);
let key = Self::key(group, edge);
if let Some(node) = self.nodes.get_mut(&key)
&& node.online
&& node.bd_seq.is_some()
&& node.bd_seq == incoming
{
node.online = false;
let mut devices: Vec<String> = node
.devices
.iter()
.filter(|(_, d)| d.online)
.map(|(name, _)| name.clone())
.collect();
devices.sort();
for device in node.devices.values_mut() {
device.online = false;
}
return Ok(HostEvent::NodeDeath {
group: group.as_str().to_owned(),
edge: edge.as_str().to_owned(),
timestamp,
devices,
});
}
Ok(HostEvent::Ignored)
}
/// Handles a device birth.
///
/// A birth for an unknown or offline node triggers a rebirth request. For an
/// online node the payload must carry the expected sequence number; otherwise
/// the node is marked offline and a rebirth is requested.
///
/// On an in-sequence birth the device's alias registry is rebuilt from the
/// payload's named metrics. If any alias binding fails, the device is left
/// **offline** and a rebirth is requested, so later device data is never decoded
/// against a partially bound alias table (this mirrors how `on_node_birth`
/// treats a conflicting node birth). Otherwise the device is online and
/// `HostEvent::DeviceBirth` is returned.
///
/// # Errors
/// Fails when the payload cannot be decoded or the rebirth request cannot be sent.
async fn on_device_birth(
    &mut self,
    group: &GroupId,
    edge: &EdgeNodeId,
    device: &DeviceId,
    payload: &[u8],
) -> Result<HostEvent> {
    let key = Self::key(group, edge);
    let dev = device.as_str().to_owned();
    let step = match self.nodes.get_mut(&key) {
        Some(node) if node.online => {
            let payload = decode(payload, None)?;
            if payload.seq == Some(node.expected_seq) {
                node.expected_seq = node.expected_seq.wrapping_add(1);
                let device_state = node.devices.entry(dev.clone()).or_default();
                device_state.aliases.clear();
                let mut dup = false;
                for m in &payload.metrics {
                    let Some(name) = &m.name else {
                        continue;
                    };
                    if device_state
                        .aliases
                        .try_bind(name, m.alias, m.value.datatype())
                        .is_err()
                    {
                        dup = true;
                    }
                }
                // A rejected birth must not leave the device marked online.
                device_state.online = !dup;
                if dup {
                    Step::Rebirth
                } else {
                    Step::Event(HostEvent::DeviceBirth {
                        group: group.as_str().to_owned(),
                        edge: edge.as_str().to_owned(),
                        device: dev,
                        metrics: payload.metrics,
                    })
                }
            } else {
                node.online = false;
                Step::Rebirth
            }
        }
        _ => Step::Rebirth,
    };
    self.finish(group, edge, step).await
}
/// Handles device data.
///
/// Data is accepted only when both the node and the device are known and
/// online; otherwise a rebirth is requested without touching any state. For an
/// online device the payload is decoded against the device's aliases; if
/// decoding fails or the sequence number is not the expected one, the node is
/// marked offline and a rebirth is requested. Otherwise the node's expected
/// sequence number advances and `HostEvent::DeviceData` is returned with alias
/// names resolved.
async fn on_device_data(
    &mut self,
    group: &GroupId,
    edge: &EdgeNodeId,
    device: &DeviceId,
    payload: &[u8],
) -> Result<HostEvent> {
    let key = Self::key(group, edge);
    let dev = device.as_str().to_owned();
    let step = match self.nodes.get_mut(&key).filter(|n| n.online) {
        None => Step::Rebirth,
        Some(node) => match node.devices.get(&dev).filter(|d| d.online) {
            None => Step::Rebirth,
            Some(device_state) => match decode(payload, Some(&device_state.aliases)) {
                Ok(data) if data.seq == Some(node.expected_seq) => {
                    node.expected_seq = node.expected_seq.wrapping_add(1);
                    Step::Event(HostEvent::DeviceData {
                        group: group.as_str().to_owned(),
                        edge: edge.as_str().to_owned(),
                        device: dev.clone(),
                        metrics: resolve_names(&device_state.aliases, data.metrics),
                    })
                }
                // Undecodable or out-of-sequence: the session can no longer be trusted.
                _ => {
                    node.online = false;
                    Step::Rebirth
                }
            },
        },
    };
    self.finish(group, edge, step).await
}
async fn on_device_death(
&mut self,
group: &GroupId,
edge: &EdgeNodeId,
device: &DeviceId,
payload: &[u8],
) -> Result<HostEvent> {
let key = Self::key(group, edge);
let dev = device.as_str().to_owned();
let step = match self.nodes.get_mut(&key) {
Some(node) if node.online => {
let payload = decode(payload, None)?;
if payload.seq == Some(node.expected_seq) {
node.expected_seq = node.expected_seq.wrapping_add(1);
if let Some(device_state) = node.devices.get_mut(&dev) {
device_state.online = false;
}
let timestamp = payload
.timestamp
.and_then(|t| i64::try_from(t).ok())
.unwrap_or_else(now_ms);
Step::Event(HostEvent::DeviceDeath {
group: group.as_str().to_owned(),
edge: edge.as_str().to_owned(),
device: dev.clone(),
timestamp,
})
} else {
node.online = false;
Step::Rebirth
}
}
_ => Step::Rebirth,
};
self.finish(group, edge, step).await
}
/// Resolves a handler's `Step`: an event is returned as is, a rebirth step is
/// forwarded to `rebirth` for the given node.
async fn finish(
    &mut self,
    group: &GroupId,
    edge: &EdgeNodeId,
    step: Step,
) -> Result<HostEvent> {
    if let Step::Event(event) = step {
        return Ok(event);
    }
    self.rebirth(group, edge).await
}
/// Requests a rebirth from the given edge node, subject to the configured debounce.
///
/// The node's state entry is created if it does not exist yet. When the node's
/// last granted request is more recent than `rebirth_debounce`, nothing is sent
/// and `HostEvent::Ignored` is returned; otherwise the rebirth command is
/// published and `HostEvent::RebirthRequested` is returned.
///
/// # Errors
/// Fails when the rebirth command cannot be published.
async fn rebirth(&mut self, group: &GroupId, edge: &EdgeNodeId) -> Result<HostEvent> {
    let debounce = self.config.rebirth_debounce;
    let node = self
        .nodes
        .entry(Self::key(group, edge))
        .or_insert_with(NodeState::new);
    if !node.take_rebirth_slot(debounce, Instant::now()) {
        return Ok(HostEvent::Ignored);
    }
    self.send_rebirth(group, edge).await?;
    Ok(HostEvent::RebirthRequested {
        group: group.as_str().to_owned(),
        edge: edge.as_str().to_owned(),
    })
}
}