1use std::collections::HashMap;
17use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
18
19use bytes::Bytes;
20
21use crate::alias::AliasRegistry;
22use crate::codec::{EncodeOptions, decode, encode};
23use crate::error::{Result, SparkplugError};
24use crate::model::{Metric, Payload};
25use crate::state::StatePayload;
26use crate::topic::{DeviceId, EdgeNodeId, GroupId, MessageType, SparkplugTopic};
27use crate::transport::{
28 ConnectOptions, IncomingMessage, MqttTransport, OutboundMessage, Qos, TlsConfig,
29};
30use crate::value::MetricValue;
31use crate::{BDSEQ_METRIC_NAME, NODE_CONTROL_REBIRTH};
32
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch, and
/// saturates at `i64::MAX` when the millisecond count does not fit an `i64`.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX),
        // The clock is set before 1970; fall back to the epoch itself.
        Err(_) => 0,
    }
}
39
40#[derive(Clone, Debug)]
42pub struct HostConfig {
43 pub host_id: String,
45 pub group_subscriptions: Vec<String>,
47 pub client_id: String,
49 pub host: String,
51 pub port: u16,
53 pub keep_alive_secs: u16,
55 pub rebirth_debounce: Duration,
57 pub tls: Option<TlsConfig>,
59}
60
61impl HostConfig {
62 #[must_use]
64 pub fn new(host_id: &str) -> Self {
65 Self {
66 host_id: host_id.to_owned(),
67 group_subscriptions: vec!["spBv1.0/#".to_owned()],
68 client_id: format!("host-{host_id}"),
69 host: "localhost".to_owned(),
70 port: 1883,
71 keep_alive_secs: 30,
72 rebirth_debounce: Duration::from_secs(5),
73 tls: None,
74 }
75 }
76
77 fn state_topic(&self) -> String {
78 format!("spBv1.0/STATE/{}", self.host_id)
79 }
80}
81
/// Outcome of handling one incoming message, as returned by
/// `HostApplication::handle_incoming`.
#[derive(Clone, Debug)]
pub enum HostEvent {
    /// A node birth was accepted and the node is now tracked as online.
    NodeBirth {
        /// Group identifier taken from the topic.
        group: String,
        /// Edge node identifier taken from the topic.
        edge: String,
        /// Metrics exactly as decoded from the birth payload.
        metrics: Vec<Metric>,
    },
    /// Node data arrived from an online node with the expected sequence number.
    NodeData {
        /// Group identifier taken from the topic.
        group: String,
        /// Edge node identifier taken from the topic.
        edge: String,
        /// Decoded metrics; a metric without a name gets one filled in when its
        /// alias is known from the node's birth.
        metrics: Vec<Metric>,
    },
    /// A node death was accepted: the node was online and the death's bdSeq
    /// equals the one recorded at birth.
    NodeDeath {
        /// Group identifier taken from the topic.
        group: String,
        /// Edge node identifier taken from the topic.
        edge: String,
        /// Payload timestamp when present and representable as `i64`,
        /// otherwise the local time at which the message was handled.
        timestamp: i64,
        /// Names of the devices that were online when the death was handled,
        /// sorted ascending.
        devices: Vec<String>,
    },
    /// A device birth was accepted for an online node.
    DeviceBirth {
        /// Group identifier taken from the topic.
        group: String,
        /// Edge node identifier taken from the topic.
        edge: String,
        /// Device identifier taken from the topic.
        device: String,
        /// Metrics exactly as decoded from the birth payload.
        metrics: Vec<Metric>,
    },
    /// Device data arrived for an online device with the expected sequence number.
    DeviceData {
        /// Group identifier taken from the topic.
        group: String,
        /// Edge node identifier taken from the topic.
        edge: String,
        /// Device identifier taken from the topic.
        device: String,
        /// Decoded metrics; a metric without a name gets one filled in when its
        /// alias is known from the device's birth.
        metrics: Vec<Metric>,
    },
    /// A device death arrived from an online node with the expected sequence
    /// number.
    DeviceDeath {
        /// Group identifier taken from the topic.
        group: String,
        /// Edge node identifier taken from the topic.
        edge: String,
        /// Device identifier taken from the topic.
        device: String,
        /// Payload timestamp when present and representable as `i64`,
        /// otherwise the local time at which the message was handled.
        timestamp: i64,
    },
    /// The message could not be accepted and a rebirth command was published
    /// to the edge node.
    RebirthRequested {
        /// Group identifier of the node asked to rebirth.
        group: String,
        /// Edge node identifier of the node asked to rebirth.
        edge: String,
    },
    /// The message produced no event: STATE and command topics, a node death
    /// that did not match the tracked birth, or a rebirth suppressed by the
    /// debounce interval.
    Ignored,
}
161
/// Tracked state of one device attached to an edge node.
#[derive(Default)]
struct DeviceState {
    /// `true` between an accepted device birth and the next device or node death.
    online: bool,
    /// Alias bindings collected from the device's most recent birth.
    aliases: AliasRegistry,
}
167
168struct NodeState {
169 online: bool,
170 bd_seq: Option<i64>,
171 expected_seq: u8,
172 aliases: AliasRegistry,
173 devices: HashMap<String, DeviceState>,
174 last_rebirth: Option<Instant>,
175}
176
177impl NodeState {
178 fn new() -> Self {
179 Self {
180 online: false,
181 bd_seq: None,
182 expected_seq: 0,
183 aliases: AliasRegistry::new(),
184 devices: HashMap::new(),
185 last_rebirth: None,
186 }
187 }
188
189 fn take_rebirth_slot(&mut self, debounce: Duration, now: Instant) -> bool {
191 let allowed = self
192 .last_rebirth
193 .is_none_or(|at| now.duration_since(at) >= debounce);
194 if allowed {
195 self.last_rebirth = Some(now);
196 }
197 allowed
198 }
199}
200
201fn bdseq_of(payload: &Payload) -> Option<i64> {
203 payload
204 .metrics
205 .iter()
206 .find(|m| m.name.as_deref() == Some(BDSEQ_METRIC_NAME))
207 .and_then(|m| match &m.value {
208 MetricValue::Int64(v) => Some(*v),
209 MetricValue::UInt64(v) => i64::try_from(*v).ok(),
210 _ => None,
211 })
212}
213
214fn resolve_names(aliases: &AliasRegistry, mut metrics: Vec<Metric>) -> Vec<Metric> {
216 for metric in &mut metrics {
217 if metric.name.is_none()
218 && let Some(alias) = metric.alias
219 && let Some(name) = aliases.name_for_alias(alias)
220 {
221 metric.name = Some(name.to_owned());
222 }
223 }
224 metrics
225}
226
/// Intermediate result of a message handler, turned into a [`HostEvent`] by
/// `HostApplication::finish`.
enum Step {
    /// The message was accepted; return this event as-is.
    Event(HostEvent),
    /// The message could not be accepted; ask the edge node to rebirth.
    Rebirth,
}
233
/// Host application that tracks edge nodes and devices over a transport `T`.
pub struct HostApplication<T> {
    /// Settings supplied at construction.
    config: HostConfig,
    /// Transport used for every connect, subscribe, publish and receive call.
    transport: T,
    /// Per-edge-node state, keyed by `"<group>/<edge>"`.
    nodes: HashMap<String, NodeState>,
    /// Timestamp (ms since the Unix epoch) captured by `start` and used for
    /// both the STATE will and the STATE birth; `0` until `start` runs.
    state_ts: i64,
}
241
242impl<T: MqttTransport> HostApplication<T> {
243 pub fn new(config: HostConfig, transport: T) -> Self {
245 Self {
246 config,
247 transport,
248 nodes: HashMap::new(),
249 state_ts: 0,
250 }
251 }
252
253 pub async fn start(&mut self) -> Result<()> {
260 self.state_ts = now_ms();
261 let state_topic = self.config.state_topic();
262 let will = OutboundMessage {
263 topic: state_topic.clone(),
264 qos: Qos::AtLeastOnce,
265 retain: true,
266 payload: Bytes::from(StatePayload::new(false, self.state_ts).to_json()),
267 };
268 let opts = ConnectOptions {
269 client_id: self.config.client_id.clone(),
270 host: self.config.host.clone(),
271 port: self.config.port,
272 keep_alive_secs: self.config.keep_alive_secs,
273 clean_start: true,
274 will: Some(will),
275 tls: self.config.tls.clone(),
276 };
277 self.transport.connect(&opts).await?;
278
279 self.transport
281 .subscribe(&state_topic, Qos::AtLeastOnce)
282 .await?;
283 let subs = self.config.group_subscriptions.clone();
284 for filter in &subs {
285 self.transport.subscribe(filter, Qos::AtMostOnce).await?;
286 }
287 self.publish_state_birth().await
288 }
289
290 async fn publish_state_birth(&mut self) -> Result<()> {
291 let msg = OutboundMessage {
292 topic: self.config.state_topic(),
293 qos: Qos::AtLeastOnce,
294 retain: true,
295 payload: Bytes::from(StatePayload::new(true, self.state_ts).to_json()),
296 };
297 self.transport.publish(&msg).await
298 }
299
300 pub async fn shutdown(&mut self) -> Result<()> {
305 let msg = OutboundMessage {
306 topic: self.config.state_topic(),
307 qos: Qos::AtLeastOnce,
308 retain: true,
309 payload: Bytes::from(StatePayload::new(false, now_ms()).to_json()),
310 };
311 self.transport.publish(&msg).await?;
312 self.transport.disconnect().await
313 }
314
315 pub async fn publish_node_command(
320 &mut self,
321 group: &str,
322 edge: &str,
323 metrics: Vec<Metric>,
324 ) -> Result<()> {
325 let topic = SparkplugTopic::node(
326 GroupId::new(group)?,
327 EdgeNodeId::new(edge)?,
328 MessageType::NCmd,
329 )?
330 .to_string();
331 let payload = Payload {
332 timestamp: Some(u64::try_from(now_ms()).unwrap_or(0)),
333 metrics,
334 seq: None,
335 uuid: None,
336 body: None,
337 };
338 self.publish_raw(topic, encode(&payload, EncodeOptions::birth()))
341 .await
342 }
343
344 pub async fn publish_device_command(
349 &mut self,
350 group: &str,
351 edge: &str,
352 device: &str,
353 metrics: Vec<Metric>,
354 ) -> Result<()> {
355 let topic = SparkplugTopic::device(
356 GroupId::new(group)?,
357 EdgeNodeId::new(edge)?,
358 DeviceId::new(device)?,
359 MessageType::DCmd,
360 )?
361 .to_string();
362 let payload = Payload {
363 timestamp: Some(u64::try_from(now_ms()).unwrap_or(0)),
364 metrics,
365 seq: None,
366 uuid: None,
367 body: None,
368 };
369 self.publish_raw(topic, encode(&payload, EncodeOptions::birth()))
372 .await
373 }
374
375 async fn publish_raw(&mut self, topic: String, payload: Bytes) -> Result<()> {
376 self.transport
377 .publish(&OutboundMessage {
378 topic,
379 qos: Qos::AtMostOnce,
380 retain: false,
381 payload,
382 })
383 .await
384 }
385
386 async fn send_rebirth(&mut self, group: &GroupId, edge: &EdgeNodeId) -> Result<()> {
388 let topic = SparkplugTopic::Node {
389 group: group.clone(),
390 edge: edge.clone(),
391 ty: MessageType::NCmd,
392 }
393 .to_string();
394 let payload = Payload {
395 timestamp: Some(u64::try_from(now_ms()).unwrap_or(0)),
396 metrics: vec![Metric::new(
397 NODE_CONTROL_REBIRTH,
398 MetricValue::Boolean(true),
399 )],
400 seq: None,
401 uuid: None,
402 body: None,
403 };
404 self.publish_raw(topic, encode(&payload, EncodeOptions::birth()))
407 .await
408 }
409
410 pub async fn recv_and_handle(&mut self) -> Result<Option<HostEvent>> {
415 match self.transport.recv().await? {
416 Some(message) => Ok(Some(self.handle_incoming(&message).await?)),
417 None => Ok(None),
418 }
419 }
420
421 pub async fn handle_incoming(&mut self, message: &IncomingMessage) -> Result<HostEvent> {
427 let topic = SparkplugTopic::parse(&message.topic)?;
428 match topic {
429 SparkplugTopic::HostState { host_id } => {
430 if host_id == self.config.host_id {
433 let state = StatePayload::parse(
434 std::str::from_utf8(&message.payload)
435 .map_err(|_| SparkplugError::InvalidUtf8)?,
436 )?;
437 if !state.online {
438 self.publish_state_birth().await?;
439 }
440 }
441 Ok(HostEvent::Ignored)
442 }
443 SparkplugTopic::Node { group, edge, ty } => match ty {
444 MessageType::NBirth => self.on_node_birth(&group, &edge, &message.payload).await,
445 MessageType::NData => self.on_node_data(&group, &edge, &message.payload).await,
446 MessageType::NDeath => Ok(self.on_node_death(&group, &edge, &message.payload)?),
447 MessageType::NCmd => Ok(HostEvent::Ignored),
449 _ => Ok(HostEvent::Ignored),
450 },
451 SparkplugTopic::Device {
452 group,
453 edge,
454 device,
455 ty,
456 } => match ty {
457 MessageType::DBirth => {
458 self.on_device_birth(&group, &edge, &device, &message.payload)
459 .await
460 }
461 MessageType::DData => {
462 self.on_device_data(&group, &edge, &device, &message.payload)
463 .await
464 }
465 MessageType::DDeath => {
466 self.on_device_death(&group, &edge, &device, &message.payload)
467 .await
468 }
469 MessageType::DCmd => Ok(HostEvent::Ignored),
470 _ => Ok(HostEvent::Ignored),
471 },
472 }
473 }
474
475 fn key(group: &GroupId, edge: &EdgeNodeId) -> String {
476 format!("{group}/{edge}")
477 }
478
479 async fn on_node_birth(
480 &mut self,
481 group: &GroupId,
482 edge: &EdgeNodeId,
483 payload: &[u8],
484 ) -> Result<HostEvent> {
485 let payload = decode(payload, None)?;
486 let key = Self::key(group, edge);
487 let mut duplicate_alias = false;
488 {
489 let node = self.nodes.entry(key).or_insert_with(NodeState::new);
490 node.online = true;
491 node.bd_seq = bdseq_of(&payload);
492 node.expected_seq = payload.seq.unwrap_or(0).wrapping_add(1);
493 node.aliases.clear();
494 node.devices.clear();
495 for m in &payload.metrics {
496 if let Some(name) = &m.name
497 && node
498 .aliases
499 .try_bind(name, m.alias, m.value.datatype())
500 .is_err()
501 {
502 duplicate_alias = true;
503 }
504 }
505 if duplicate_alias {
506 node.online = false;
509 }
510 }
511 if duplicate_alias {
512 return self.rebirth(group, edge).await;
513 }
514 Ok(HostEvent::NodeBirth {
515 group: group.as_str().to_owned(),
516 edge: edge.as_str().to_owned(),
517 metrics: payload.metrics,
518 })
519 }
520
521 async fn on_node_data(
522 &mut self,
523 group: &GroupId,
524 edge: &EdgeNodeId,
525 payload: &[u8],
526 ) -> Result<HostEvent> {
527 let key = Self::key(group, edge);
528 let step = match self.nodes.get_mut(&key) {
529 Some(node) if node.online => match decode(payload, Some(&node.aliases)) {
530 Err(_) => {
534 node.online = false;
535 Step::Rebirth
536 }
537 Ok(payload) if payload.seq == Some(node.expected_seq) => {
538 node.expected_seq = node.expected_seq.wrapping_add(1);
539 Step::Event(HostEvent::NodeData {
540 group: group.as_str().to_owned(),
541 edge: edge.as_str().to_owned(),
542 metrics: resolve_names(&node.aliases, payload.metrics),
543 })
544 }
545 Ok(_) => {
546 node.online = false; Step::Rebirth
548 }
549 },
550 _ => Step::Rebirth, };
552 self.finish(group, edge, step).await
553 }
554
555 fn on_node_death(
556 &mut self,
557 group: &GroupId,
558 edge: &EdgeNodeId,
559 payload: &[u8],
560 ) -> Result<HostEvent> {
561 let decoded = decode(payload, None)?;
562 let incoming = bdseq_of(&decoded);
563 let timestamp = decoded
564 .timestamp
565 .and_then(|t| i64::try_from(t).ok())
566 .unwrap_or_else(now_ms);
567 let key = Self::key(group, edge);
568 if let Some(node) = self.nodes.get_mut(&key)
571 && node.online
572 && node.bd_seq.is_some()
573 && node.bd_seq == incoming
574 {
575 node.online = false;
576 let mut devices: Vec<String> = node
577 .devices
578 .iter()
579 .filter(|(_, d)| d.online)
580 .map(|(name, _)| name.clone())
581 .collect();
582 devices.sort();
583 for device in node.devices.values_mut() {
584 device.online = false;
585 }
586 return Ok(HostEvent::NodeDeath {
587 group: group.as_str().to_owned(),
588 edge: edge.as_str().to_owned(),
589 timestamp,
590 devices,
591 });
592 }
593 Ok(HostEvent::Ignored)
595 }
596
597 async fn on_device_birth(
598 &mut self,
599 group: &GroupId,
600 edge: &EdgeNodeId,
601 device: &DeviceId,
602 payload: &[u8],
603 ) -> Result<HostEvent> {
604 let key = Self::key(group, edge);
605 let dev = device.as_str().to_owned();
606 let step = match self.nodes.get_mut(&key) {
607 Some(node) if node.online => {
608 let payload = decode(payload, None)?;
609 if payload.seq == Some(node.expected_seq) {
610 node.expected_seq = node.expected_seq.wrapping_add(1);
611 let device_state = node.devices.entry(dev.clone()).or_default();
612 device_state.online = true;
613 device_state.aliases.clear();
614 let mut dup = false;
615 for m in &payload.metrics {
616 if let Some(name) = &m.name
617 && device_state
618 .aliases
619 .try_bind(name, m.alias, m.value.datatype())
620 .is_err()
621 {
622 dup = true;
623 }
624 }
625 if dup {
626 Step::Rebirth
627 } else {
628 Step::Event(HostEvent::DeviceBirth {
629 group: group.as_str().to_owned(),
630 edge: edge.as_str().to_owned(),
631 device: dev.clone(),
632 metrics: payload.metrics,
633 })
634 }
635 } else {
636 node.online = false;
637 Step::Rebirth
638 }
639 }
640 _ => Step::Rebirth,
641 };
642 self.finish(group, edge, step).await
643 }
644
645 async fn on_device_data(
646 &mut self,
647 group: &GroupId,
648 edge: &EdgeNodeId,
649 device: &DeviceId,
650 payload: &[u8],
651 ) -> Result<HostEvent> {
652 let key = Self::key(group, edge);
653 let dev = device.as_str().to_owned();
654 let step = match self.nodes.get_mut(&key) {
655 Some(node) if node.online && node.devices.get(&dev).is_some_and(|d| d.online) => {
656 let device_state = node.devices.get(&dev).expect("checked present");
657 match decode(payload, Some(&device_state.aliases)) {
658 Err(_) => {
659 node.online = false;
660 Step::Rebirth
661 }
662 Ok(payload) if payload.seq == Some(node.expected_seq) => {
663 node.expected_seq = node.expected_seq.wrapping_add(1);
664 let device_state = node.devices.get(&dev).expect("checked present");
665 Step::Event(HostEvent::DeviceData {
666 group: group.as_str().to_owned(),
667 edge: edge.as_str().to_owned(),
668 device: dev.clone(),
669 metrics: resolve_names(&device_state.aliases, payload.metrics),
670 })
671 }
672 Ok(_) => {
673 node.online = false;
674 Step::Rebirth
675 }
676 }
677 }
678 _ => Step::Rebirth,
679 };
680 self.finish(group, edge, step).await
681 }
682
683 async fn on_device_death(
684 &mut self,
685 group: &GroupId,
686 edge: &EdgeNodeId,
687 device: &DeviceId,
688 payload: &[u8],
689 ) -> Result<HostEvent> {
690 let key = Self::key(group, edge);
691 let dev = device.as_str().to_owned();
692 let step = match self.nodes.get_mut(&key) {
693 Some(node) if node.online => {
694 let payload = decode(payload, None)?;
695 if payload.seq == Some(node.expected_seq) {
696 node.expected_seq = node.expected_seq.wrapping_add(1);
697 if let Some(device_state) = node.devices.get_mut(&dev) {
698 device_state.online = false;
699 }
700 let timestamp = payload
701 .timestamp
702 .and_then(|t| i64::try_from(t).ok())
703 .unwrap_or_else(now_ms);
704 Step::Event(HostEvent::DeviceDeath {
705 group: group.as_str().to_owned(),
706 edge: edge.as_str().to_owned(),
707 device: dev.clone(),
708 timestamp,
709 })
710 } else {
711 node.online = false;
712 Step::Rebirth
713 }
714 }
715 _ => Step::Rebirth,
716 };
717 self.finish(group, edge, step).await
718 }
719
720 async fn finish(
722 &mut self,
723 group: &GroupId,
724 edge: &EdgeNodeId,
725 step: Step,
726 ) -> Result<HostEvent> {
727 match step {
728 Step::Event(event) => Ok(event),
729 Step::Rebirth => self.rebirth(group, edge).await,
730 }
731 }
732
733 async fn rebirth(&mut self, group: &GroupId, edge: &EdgeNodeId) -> Result<HostEvent> {
734 let key = Self::key(group, edge);
735 let allowed = {
736 let node = self.nodes.entry(key).or_insert_with(NodeState::new);
737 node.take_rebirth_slot(self.config.rebirth_debounce, Instant::now())
738 };
739 if !allowed {
740 return Ok(HostEvent::Ignored);
741 }
742 self.send_rebirth(group, edge).await?;
743 Ok(HostEvent::RebirthRequested {
744 group: group.as_str().to_owned(),
745 edge: edge.as_str().to_owned(),
746 })
747 }
748}