1use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bytes::Bytes;
19
20use crate::alias::AliasRegistry;
21use crate::codec::{EncodeOptions, decode, encode};
22use crate::error::{Result, SparkplugError};
23use crate::model::{Metric, Payload};
24use crate::sequence::{BdSeqStore, Seq};
25use crate::state::StatePayload;
26use crate::topic::{DeviceId, EdgeNodeId, GroupId, MessageType, SparkplugTopic};
27use crate::transport::{
28 ConnectOptions, IncomingMessage, MqttTransport, OutboundMessage, Qos, TlsConfig,
29};
30use crate::value::MetricValue;
31use crate::{BDSEQ_METRIC_NAME, NODE_CONTROL_REBIRTH};
32
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch, and saturates at
/// `u64::MAX` if the millisecond count does not fit in a `u64`.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
40
41fn stamp(metrics: &mut [Metric], ts: u64) {
46 for metric in metrics.iter_mut() {
47 if metric.timestamp.is_none() {
48 metric.timestamp = Some(ts);
49 }
50 }
51}
52
/// Session state tracked by an edge node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EdgeState {
    /// Initial state of a freshly constructed node, and the state after `disconnect`.
    Disconnected,
    /// Set by `connect` when a primary host is configured; the birth sequence is
    /// withheld until that host reports itself online.
    WaitingForPrimaryHost,
    /// The birth sequence has been published; node data may be sent.
    Online,
}
63
64#[derive(Clone, Debug)]
66pub struct EdgeNodeConfig {
67 pub group: GroupId,
69 pub edge: EdgeNodeId,
71 pub devices: Vec<DeviceId>,
73 pub primary_host_id: Option<String>,
75 pub use_aliases: bool,
77 pub rebirth_debounce: Duration,
79 pub client_id: String,
81 pub host: String,
83 pub port: u16,
85 pub keep_alive_secs: u16,
87 pub tls: Option<TlsConfig>,
89}
90
91impl EdgeNodeConfig {
92 pub fn new(group: &str, edge: &str, devices: &[&str]) -> Result<Self> {
98 Ok(Self {
99 group: GroupId::new(group)?,
100 edge: EdgeNodeId::new(edge)?,
101 devices: devices
102 .iter()
103 .map(|d| DeviceId::new(*d))
104 .collect::<Result<_>>()?,
105 primary_host_id: None,
106 use_aliases: false,
107 rebirth_debounce: Duration::from_secs(5),
108 client_id: format!("{group}-{edge}"),
109 host: "localhost".to_owned(),
110 port: 1883,
111 keep_alive_secs: 30,
112 tls: None,
113 })
114 }
115}
116
117pub trait DataSource {
119 fn node_birth_metrics(&self) -> Vec<Metric>;
121
122 fn device_birth_metrics(&self, device: &str) -> Vec<Metric>;
124}
125
126#[derive(Clone, Debug)]
128pub enum EdgeEvent {
129 Rebirthed,
131 RebirthDebounced,
133 NodeCommand(Payload),
135 DeviceCommand {
137 device: String,
139 payload: Payload,
141 },
142 PrimaryHostState(StatePayload),
144 Ignored,
146}
147
148pub struct EdgeNode<T, S> {
150 config: EdgeNodeConfig,
151 transport: T,
152 bdseq_store: S,
153 seq: Seq,
154 bd_seq: u8,
155 aliases: AliasRegistry,
156 next_alias: u64,
157 born_devices: Vec<String>,
158 last_rebirth: Option<Instant>,
159 last_state_ts: Option<i64>,
160 state: EdgeState,
161}
162
163impl<T: MqttTransport, S: BdSeqStore> EdgeNode<T, S> {
164 pub fn new(config: EdgeNodeConfig, transport: T, bdseq_store: S) -> Self {
166 Self {
167 config,
168 transport,
169 bdseq_store,
170 seq: Seq::new(),
171 bd_seq: 0,
172 aliases: AliasRegistry::new(),
173 next_alias: 0,
174 born_devices: Vec::new(),
175 last_rebirth: None,
176 last_state_ts: None,
177 state: EdgeState::Disconnected,
178 }
179 }
180
181 #[must_use]
183 pub fn state(&self) -> EdgeState {
184 self.state
185 }
186
187 #[must_use]
189 pub fn bd_seq(&self) -> u8 {
190 self.bd_seq
191 }
192
193 fn node_topic(&self, ty: MessageType) -> String {
194 SparkplugTopic::Node {
195 group: self.config.group.clone(),
196 edge: self.config.edge.clone(),
197 ty,
198 }
199 .to_string()
200 }
201
202 fn device_topic(&self, device: &DeviceId, ty: MessageType) -> String {
203 SparkplugTopic::Device {
204 group: self.config.group.clone(),
205 edge: self.config.edge.clone(),
206 device: device.clone(),
207 ty,
208 }
209 .to_string()
210 }
211
212 fn ndeath_payload(&self) -> Payload {
214 let ts = now_ms();
215 let mut metrics = vec![Metric::new(
216 BDSEQ_METRIC_NAME,
217 MetricValue::Int64(i64::from(self.bd_seq)),
218 )];
219 stamp(&mut metrics, ts);
220 Payload {
221 timestamp: Some(ts),
222 metrics,
223 seq: None, uuid: None,
225 body: None,
226 }
227 }
228
229 async fn publish_raw(
230 &mut self,
231 topic: String,
232 payload: Bytes,
233 qos: Qos,
234 retain: bool,
235 ) -> Result<()> {
236 self.transport
237 .publish(&OutboundMessage {
238 topic,
239 qos,
240 retain,
241 payload,
242 })
243 .await
244 }
245
246 pub async fn connect<D: DataSource>(&mut self, source: &D) -> Result<()> {
253 self.bd_seq = self.bdseq_store.load_next_death()?;
255 self.bdseq_store
256 .store_next_death(self.bd_seq.wrapping_add(1))?;
257
258 let will_payload = encode(&self.ndeath_payload(), EncodeOptions::birth());
259 let will = OutboundMessage {
260 topic: self.node_topic(MessageType::NDeath),
261 qos: Qos::AtLeastOnce, retain: false, payload: will_payload,
264 };
265 let opts = ConnectOptions {
266 client_id: self.config.client_id.clone(),
267 host: self.config.host.clone(),
268 port: self.config.port,
269 keep_alive_secs: self.config.keep_alive_secs,
270 clean_start: true,
271 will: Some(will),
272 tls: self.config.tls.clone(),
273 };
274 self.transport.connect(&opts).await?;
275
276 let ncmd_topic = self.node_topic(MessageType::NCmd);
280 self.transport
281 .subscribe(&ncmd_topic, Qos::AtLeastOnce)
282 .await?;
283 let devices = self.config.devices.clone();
284 for device in &devices {
285 let dcmd_topic = self.device_topic(device, MessageType::DCmd);
286 self.transport
287 .subscribe(&dcmd_topic, Qos::AtLeastOnce)
288 .await?;
289 }
290
291 if let Some(host_id) = self.config.primary_host_id.clone() {
292 let state_topic = format!("spBv1.0/STATE/{host_id}");
293 self.transport
294 .subscribe(&state_topic, Qos::AtLeastOnce)
295 .await?;
296 self.state = EdgeState::WaitingForPrimaryHost;
297 } else {
298 self.publish_birth_sequence(source).await?;
299 }
300 Ok(())
301 }
302
303 pub async fn publish_birth_sequence<D: DataSource>(&mut self, source: &D) -> Result<()> {
308 self.seq.reset();
309 if self.config.use_aliases {
310 self.aliases.clear();
311 self.next_alias = 0;
312 }
313 self.born_devices.clear();
314
315 let ts = now_ms();
316 let mut metrics = self.build_node_birth_metrics(source);
317 stamp(&mut metrics, ts);
318 let payload = Payload {
319 timestamp: Some(ts),
320 metrics,
321 seq: Some(self.seq.next_value()), uuid: None,
323 body: None,
324 };
325 let bytes = encode(&payload, EncodeOptions::birth());
326 self.publish_raw(
327 self.node_topic(MessageType::NBirth),
328 bytes,
329 Qos::AtMostOnce,
330 false,
331 )
332 .await?;
333
334 let devices = self.config.devices.clone();
335 for device in &devices {
336 self.publish_device_birth(source, device).await?;
337 }
338 self.state = EdgeState::Online;
339 Ok(())
340 }
341
342 fn build_node_birth_metrics<D: DataSource>(&mut self, source: &D) -> Vec<Metric> {
343 let mut metrics = source.node_birth_metrics();
344 if !metrics
345 .iter()
346 .any(|m| m.name.as_deref() == Some(NODE_CONTROL_REBIRTH))
347 {
348 metrics.push(Metric::new(
349 NODE_CONTROL_REBIRTH,
350 MetricValue::Boolean(false),
351 ));
352 }
353 if !metrics
354 .iter()
355 .any(|m| m.name.as_deref() == Some(BDSEQ_METRIC_NAME))
356 {
357 metrics.push(Metric::new(
358 BDSEQ_METRIC_NAME,
359 MetricValue::Int64(i64::from(self.bd_seq)),
360 ));
361 }
362 self.assign_aliases(&mut metrics);
363 metrics
364 }
365
366 async fn publish_device_birth<D: DataSource>(
367 &mut self,
368 source: &D,
369 device: &DeviceId,
370 ) -> Result<()> {
371 let ts = now_ms();
372 let mut metrics = source.device_birth_metrics(device.as_str());
373 self.assign_aliases(&mut metrics);
374 stamp(&mut metrics, ts);
375 let payload = Payload {
376 timestamp: Some(ts),
377 metrics,
378 seq: Some(self.seq.next_value()),
379 uuid: None,
380 body: None,
381 };
382 let bytes = encode(&payload, EncodeOptions::birth());
383 self.publish_raw(
384 self.device_topic(device, MessageType::DBirth),
385 bytes,
386 Qos::AtMostOnce,
387 false,
388 )
389 .await?;
390 self.born_devices.push(device.as_str().to_owned());
391 Ok(())
392 }
393
394 fn assign_aliases(&mut self, metrics: &mut [Metric]) {
397 if !self.config.use_aliases {
398 return;
399 }
400 for metric in metrics.iter_mut() {
401 let Some(name) = metric.name.clone() else {
402 continue;
403 };
404 if name == NODE_CONTROL_REBIRTH || name == BDSEQ_METRIC_NAME {
406 continue;
407 }
408 let alias = self.next_alias;
409 self.next_alias += 1;
410 metric.alias = Some(alias);
411 self.aliases
412 .bind(&name, Some(alias), metric.value.datatype());
413 }
414 }
415
416 fn to_data_metrics(&self, metrics: Vec<Metric>) -> Vec<Metric> {
419 if !self.config.use_aliases {
420 return metrics;
421 }
422 metrics
423 .into_iter()
424 .map(|mut m| {
425 if let Some(name) = m.name.as_deref()
426 && let Some(alias) = self.aliases.alias_for_name(name)
427 {
428 m.alias = Some(alias);
429 m.name = None;
430 }
431 m
432 })
433 .collect()
434 }
435
436 pub async fn publish_node_data(&mut self, metrics: Vec<Metric>) -> Result<()> {
441 if self.state != EdgeState::Online {
442 return Err(SparkplugError::InvalidTopic(
443 "cannot publish NDATA before NBIRTH".to_owned(),
444 ));
445 }
446 let ts = now_ms();
447 let mut data = self.to_data_metrics(metrics);
448 stamp(&mut data, ts);
449 let payload = Payload {
450 timestamp: Some(ts),
451 metrics: data,
452 seq: Some(self.seq.next_value()),
453 uuid: None,
454 body: None,
455 };
456 let bytes = encode(&payload, EncodeOptions::data());
457 self.publish_raw(
458 self.node_topic(MessageType::NData),
459 bytes,
460 Qos::AtMostOnce,
461 false,
462 )
463 .await
464 }
465
466 pub async fn publish_device_data(&mut self, device: &str, metrics: Vec<Metric>) -> Result<()> {
471 if !self.born_devices.iter().any(|d| d == device) {
472 return Err(SparkplugError::InvalidTopic(format!(
473 "device {device:?} is not born; cannot publish DDATA"
474 )));
475 }
476 let device_id = DeviceId::new(device)?;
477 let ts = now_ms();
478 let mut data = self.to_data_metrics(metrics);
479 stamp(&mut data, ts);
480 let payload = Payload {
481 timestamp: Some(ts),
482 metrics: data,
483 seq: Some(self.seq.next_value()),
484 uuid: None,
485 body: None,
486 };
487 let bytes = encode(&payload, EncodeOptions::data());
488 self.publish_raw(
489 self.device_topic(&device_id, MessageType::DData),
490 bytes,
491 Qos::AtMostOnce,
492 false,
493 )
494 .await
495 }
496
497 pub async fn publish_device_death(&mut self, device: &str) -> Result<()> {
503 let pos = self
504 .born_devices
505 .iter()
506 .position(|d| d == device)
507 .ok_or_else(|| {
508 SparkplugError::InvalidTopic(format!(
509 "device {device:?} is not born; cannot publish DDEATH"
510 ))
511 })?;
512 let device_id = DeviceId::new(device)?;
513 let payload = Payload {
514 timestamp: Some(now_ms()),
515 metrics: Vec::new(),
516 seq: Some(self.seq.next_value()),
517 uuid: None,
518 body: None,
519 };
520 let bytes = encode(&payload, EncodeOptions::birth());
521 self.publish_raw(
522 self.device_topic(&device_id, MessageType::DDeath),
523 bytes,
524 Qos::AtMostOnce,
525 false,
526 )
527 .await?;
528 self.born_devices.remove(pos);
529 Ok(())
530 }
531
532 fn rebirth_allowed(&self) -> bool {
533 match self.last_rebirth {
534 None => true,
535 Some(at) => at.elapsed() >= self.config.rebirth_debounce,
536 }
537 }
538
539 pub async fn handle_incoming<D: DataSource>(
549 &mut self,
550 message: &IncomingMessage,
551 source: &D,
552 ) -> Result<EdgeEvent> {
553 let topic = SparkplugTopic::parse(&message.topic)?;
554 match topic {
555 SparkplugTopic::Node {
556 ty: MessageType::NCmd,
557 ..
558 } => {
559 let payload = decode(&message.payload, Some(&self.aliases))?;
560 let is_rebirth = payload.metrics.iter().any(|m| {
561 m.name.as_deref() == Some(NODE_CONTROL_REBIRTH)
562 && matches!(m.value, MetricValue::Boolean(true))
563 });
564 if is_rebirth {
565 if self.rebirth_allowed() {
566 self.publish_birth_sequence(source).await?;
567 self.last_rebirth = Some(Instant::now());
568 return Ok(EdgeEvent::Rebirthed);
569 }
570 return Ok(EdgeEvent::RebirthDebounced);
571 }
572 Ok(EdgeEvent::NodeCommand(payload))
573 }
574 SparkplugTopic::Device {
575 device,
576 ty: MessageType::DCmd,
577 ..
578 } => {
579 let payload = decode(&message.payload, Some(&self.aliases))?;
580 Ok(EdgeEvent::DeviceCommand {
581 device: device.as_str().to_owned(),
582 payload,
583 })
584 }
585 SparkplugTopic::HostState { host_id } => {
586 if self.config.primary_host_id.as_deref() != Some(host_id.as_str()) {
587 return Ok(EdgeEvent::Ignored);
588 }
589 let state = StatePayload::parse(
590 std::str::from_utf8(&message.payload)
591 .map_err(|_| SparkplugError::InvalidUtf8)?,
592 )?;
593 if self
596 .last_state_ts
597 .is_some_and(|last| state.timestamp < last)
598 {
599 return Ok(EdgeEvent::Ignored);
600 }
601 self.last_state_ts = Some(state.timestamp);
602 if state.online {
603 if self.state == EdgeState::WaitingForPrimaryHost {
604 self.publish_birth_sequence(source).await?;
605 }
606 } else if self.state == EdgeState::Online {
607 self.disconnect().await?;
611 }
612 Ok(EdgeEvent::PrimaryHostState(state))
613 }
614 _ => Ok(EdgeEvent::Ignored),
615 }
616 }
617
618 pub async fn recv_and_handle<D: DataSource>(
623 &mut self,
624 source: &D,
625 ) -> Result<Option<EdgeEvent>> {
626 match self.transport.recv().await? {
627 Some(message) => Ok(Some(self.handle_incoming(&message, source).await?)),
628 None => Ok(None),
629 }
630 }
631
632 pub async fn disconnect(&mut self) -> Result<()> {
638 let bytes = encode(&self.ndeath_payload(), EncodeOptions::birth());
639 self.publish_raw(
640 self.node_topic(MessageType::NDeath),
641 bytes,
642 Qos::AtLeastOnce,
643 false,
644 )
645 .await?;
646 self.transport.disconnect().await?;
647 self.state = EdgeState::Disconnected;
648 Ok(())
649 }
650}