Skip to main content

sparkplug_b/
host.rs

1//! Host Application + Primary Host (Phase 3).
2//!
3//! [`HostApplication`] is an async engine, generic over an [`MqttTransport`],
4//! that implements the Sparkplug Host Application / Primary Host role: publish a
5//! retained STATE birth (and register the offline STATE as the will, sharing one
6//! timestamp), subscribe to the data namespace, and consume Edge Node traffic —
7//! binding aliases on birth, validating the per-Edge-Node sequence number,
8//! resolving alias-only DATA back to names, gating NDEATH on the `bdSeq`, and
9//! requesting a (debounced) rebirth on a sequence gap or data-before-birth.
10//!
11//! Like the edge engine it spawns no tasks and uses no runtime timers; the caller
12//! drives [`HostApplication::recv_and_handle`]. A reorder buffer (the spec's
13//! optional in-order delivery window) is a future enhancement — the base
14//! behavior is the spec's fail-fast-and-rebirth.
15
16use std::collections::HashMap;
17use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
18
19use bytes::Bytes;
20
21use crate::alias::AliasRegistry;
22use crate::codec::{EncodeOptions, decode, encode};
23use crate::error::{Result, SparkplugError};
24use crate::model::{Metric, Payload};
25use crate::state::StatePayload;
26use crate::topic::{DeviceId, EdgeNodeId, GroupId, MessageType, SparkplugTopic};
27use crate::transport::{
28    ConnectOptions, IncomingMessage, MqttTransport, OutboundMessage, Qos, TlsConfig,
29};
30use crate::value::MetricValue;
31use crate::{BDSEQ_METRIC_NAME, NODE_CONTROL_REBIRTH};
32
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Saturates at `i64::MAX` if the millisecond count does not fit in an `i64`,
/// and yields `0` when the system clock reads earlier than the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX),
        // Clock set before 1970: fall back to the epoch itself.
        Err(_) => 0,
    }
}
39
/// Configuration for a [`HostApplication`].
///
/// [`HostConfig::new`] fills in a default for every field except `host_id`;
/// the fields are public so a caller can adjust them before handing the config
/// to the engine.
#[derive(Clone, Debug)]
pub struct HostConfig {
    /// The Host Application ID (its STATE topic is `spBv1.0/STATE/<host_id>`).
    pub host_id: String,
    /// Sparkplug data-namespace topic filters to subscribe (default `spBv1.0/#`).
    pub group_subscriptions: Vec<String>,
    /// MQTT client id (default `host-<host_id>`).
    pub client_id: String,
    /// Broker host (default `localhost`).
    pub host: String,
    /// Broker port (default `1883`).
    pub port: u16,
    /// Keep-alive interval, seconds (default `30`).
    pub keep_alive_secs: u16,
    /// Debounce window for rebirth requests, per Edge Node (default 5 seconds).
    pub rebirth_debounce: Duration,
    /// Optional TLS/mTLS configuration (honored with the `tls` feature).
    pub tls: Option<TlsConfig>,
}
60
61impl HostConfig {
62    /// A config for `host_id` subscribing the whole Sparkplug B namespace.
63    #[must_use]
64    pub fn new(host_id: &str) -> Self {
65        Self {
66            host_id: host_id.to_owned(),
67            group_subscriptions: vec!["spBv1.0/#".to_owned()],
68            client_id: format!("host-{host_id}"),
69            host: "localhost".to_owned(),
70            port: 1883,
71            keep_alive_secs: 30,
72            rebirth_debounce: Duration::from_secs(5),
73            tls: None,
74        }
75    }
76
77    fn state_topic(&self) -> String {
78        format!("spBv1.0/STATE/{}", self.host_id)
79    }
80}
81
/// An event surfaced by [`HostApplication::handle_incoming`].
///
/// Exactly one event is produced per handled inbound message. Identifiers are
/// carried as plain strings so consumers do not need the topic newtypes.
#[derive(Clone, Debug)]
pub enum HostEvent {
    /// An Edge Node birth was processed; `metrics` carry names + datatypes.
    NodeBirth {
        /// Group ID.
        group: String,
        /// Edge Node ID.
        edge: String,
        /// The birth metrics.
        metrics: Vec<Metric>,
    },
    /// Edge Node data (alias-only metrics resolved back to names where possible).
    NodeData {
        /// Group ID.
        group: String,
        /// Edge Node ID.
        edge: String,
        /// The data metrics.
        metrics: Vec<Metric>,
    },
    /// An Edge Node death: its metrics, and the listed `devices`, are now stale
    /// as of `timestamp` (epoch ms). The consumer should mark every cached metric
    /// for this node and those devices stale.
    NodeDeath {
        /// Group ID.
        group: String,
        /// Edge Node ID.
        edge: String,
        /// Death timestamp (the NDEATH payload timestamp, epoch ms). Falls back
        /// to the host's receive time when the payload carries no usable
        /// timestamp.
        timestamp: i64,
        /// Devices that were online under this node and are now stale, sorted
        /// by device id.
        devices: Vec<String>,
    },
    /// A Device birth.
    DeviceBirth {
        /// Group ID.
        group: String,
        /// Edge Node ID.
        edge: String,
        /// Device ID.
        device: String,
        /// The birth metrics.
        metrics: Vec<Metric>,
    },
    /// Device data (alias-resolved).
    DeviceData {
        /// Group ID.
        group: String,
        /// Edge Node ID.
        edge: String,
        /// Device ID.
        device: String,
        /// The data metrics.
        metrics: Vec<Metric>,
    },
    /// A Device death: its metrics are now stale as of `timestamp` (the DDEATH
    /// payload timestamp, epoch ms).
    DeviceDeath {
        /// Group ID.
        group: String,
        /// Edge Node ID.
        edge: String,
        /// Device ID.
        device: String,
        /// Death timestamp (the DDEATH payload timestamp, epoch ms). Falls back
        /// to the host's receive time when the payload carries no usable
        /// timestamp.
        timestamp: i64,
    },
    /// The host requested a rebirth (published an NCMD) for an Edge Node.
    RebirthRequested {
        /// Group ID.
        group: String,
        /// Edge Node ID.
        edge: String,
    },
    /// A message that did not require action (own command echo, foreign/own STATE,
    /// a stale NDEATH, or a debounced rebirth).
    Ignored,
}
161
/// Host-side view of one Device under an Edge Node.
#[derive(Default)]
struct DeviceState {
    /// `true` after an accepted DBIRTH; cleared by a DDEATH or the node's NDEATH.
    online: bool,
    /// Alias bindings taken from the device's most recent DBIRTH.
    aliases: AliasRegistry,
}
167
/// Host-side session state for one Edge Node, keyed by `<group>/<edge>`.
struct NodeState {
    /// `true` while the node has a valid session; cleared on death, on a
    /// sequence gap, or on an undecodable in-session payload.
    online: bool,
    /// The `bdSeq` announced in the latest NBIRTH, used to gate NDEATH.
    bd_seq: Option<i64>,
    /// The sequence number the next in-session message must carry (wraps at 255).
    expected_seq: u8,
    /// Alias bindings taken from the node's most recent NBIRTH.
    aliases: AliasRegistry,
    /// Devices seen under this node since its last NBIRTH, keyed by device id.
    devices: HashMap<String, DeviceState>,
    /// When a rebirth was last requested for this node; drives the debounce.
    last_rebirth: Option<Instant>,
}
176
177impl NodeState {
178    fn new() -> Self {
179        Self {
180            online: false,
181            bd_seq: None,
182            expected_seq: 0,
183            aliases: AliasRegistry::new(),
184            devices: HashMap::new(),
185            last_rebirth: None,
186        }
187    }
188
189    /// Check the rebirth debounce and, if allowed, record the attempt.
190    fn take_rebirth_slot(&mut self, debounce: Duration, now: Instant) -> bool {
191        let allowed = self
192            .last_rebirth
193            .is_none_or(|at| now.duration_since(at) >= debounce);
194        if allowed {
195            self.last_rebirth = Some(now);
196        }
197        allowed
198    }
199}
200
201/// Extract the `bdSeq` metric value from a payload (INT64/UInt64).
202fn bdseq_of(payload: &Payload) -> Option<i64> {
203    payload
204        .metrics
205        .iter()
206        .find(|m| m.name.as_deref() == Some(BDSEQ_METRIC_NAME))
207        .and_then(|m| match &m.value {
208            MetricValue::Int64(v) => Some(*v),
209            MetricValue::UInt64(v) => i64::try_from(*v).ok(),
210            _ => None,
211        })
212}
213
214/// Fill in metric names from an alias registry for alias-only DATA metrics.
215fn resolve_names(aliases: &AliasRegistry, mut metrics: Vec<Metric>) -> Vec<Metric> {
216    for metric in &mut metrics {
217        if metric.name.is_none()
218            && let Some(alias) = metric.alias
219            && let Some(name) = aliases.name_for_alias(alias)
220        {
221            metric.name = Some(name.to_owned());
222        }
223    }
224    metrics
225}
226
/// The internal decision a per-node handler reaches without holding a borrow
/// across an `await`.
///
/// Handlers compute a `Step` while borrowing the node map, release the borrow,
/// and then hand the step to `finish`, which performs any publish.
enum Step {
    /// The message was accepted; surface this event to the caller.
    Event(HostEvent),
    /// The session is unusable; request a (debounced) rebirth.
    Rebirth,
}
233
/// The Host Application engine.
///
/// Generic over the transport `T`; the methods that talk to the broker require
/// `T: MqttTransport`.
pub struct HostApplication<T> {
    /// Connection, subscription, and debounce settings.
    config: HostConfig,
    /// The MQTT transport used for connect/subscribe/publish/recv.
    transport: T,
    /// Per-Edge-Node session state, keyed by `<group>/<edge>`.
    nodes: HashMap<String, NodeState>,
    /// Timestamp (epoch ms) shared by the STATE will and the STATE birth;
    /// `0` until `start` runs.
    state_ts: i64,
}
241
242impl<T: MqttTransport> HostApplication<T> {
243    /// Create the engine. Call [`HostApplication::start`] to connect + birth.
244    pub fn new(config: HostConfig, transport: T) -> Self {
245        Self {
246            config,
247            transport,
248            nodes: HashMap::new(),
249            state_ts: 0,
250        }
251    }
252
253    /// Connect (registering the offline STATE will), subscribe to the own STATE
254    /// topic (QoS 1) and the data namespace (QoS 0), then publish the retained
255    /// online STATE birth that shares the will's timestamp.
256    ///
257    /// # Errors
258    /// Propagates any transport error.
259    pub async fn start(&mut self) -> Result<()> {
260        self.state_ts = now_ms();
261        let state_topic = self.config.state_topic();
262        let will = OutboundMessage {
263            topic: state_topic.clone(),
264            qos: Qos::AtLeastOnce,
265            retain: true,
266            payload: Bytes::from(StatePayload::new(false, self.state_ts).to_json()),
267        };
268        let opts = ConnectOptions {
269            client_id: self.config.client_id.clone(),
270            host: self.config.host.clone(),
271            port: self.config.port,
272            keep_alive_secs: self.config.keep_alive_secs,
273            clean_start: true,
274            will: Some(will),
275            tls: self.config.tls.clone(),
276        };
277        self.transport.connect(&opts).await?;
278
279        // Own STATE at QoS 1, then the data namespace at QoS 0.
280        self.transport
281            .subscribe(&state_topic, Qos::AtLeastOnce)
282            .await?;
283        let subs = self.config.group_subscriptions.clone();
284        for filter in &subs {
285            self.transport.subscribe(filter, Qos::AtMostOnce).await?;
286        }
287        self.publish_state_birth().await
288    }
289
290    async fn publish_state_birth(&mut self) -> Result<()> {
291        let msg = OutboundMessage {
292            topic: self.config.state_topic(),
293            qos: Qos::AtLeastOnce,
294            retain: true,
295            payload: Bytes::from(StatePayload::new(true, self.state_ts).to_json()),
296        };
297        self.transport.publish(&msg).await
298    }
299
300    /// Publish a retained offline STATE and disconnect.
301    ///
302    /// # Errors
303    /// Propagates any transport error.
304    pub async fn shutdown(&mut self) -> Result<()> {
305        let msg = OutboundMessage {
306            topic: self.config.state_topic(),
307            qos: Qos::AtLeastOnce,
308            retain: true,
309            payload: Bytes::from(StatePayload::new(false, now_ms()).to_json()),
310        };
311        self.transport.publish(&msg).await?;
312        self.transport.disconnect().await
313    }
314
315    /// Publish an NCMD to an Edge Node (QoS 0, retain=false).
316    ///
317    /// # Errors
318    /// Propagates an invalid-id or transport error.
319    pub async fn publish_node_command(
320        &mut self,
321        group: &str,
322        edge: &str,
323        metrics: Vec<Metric>,
324    ) -> Result<()> {
325        let topic = SparkplugTopic::node(
326            GroupId::new(group)?,
327            EdgeNodeId::new(edge)?,
328            MessageType::NCmd,
329        )?
330        .to_string();
331        let payload = Payload {
332            timestamp: Some(u64::try_from(now_ms()).unwrap_or(0)),
333            metrics,
334            seq: None,
335            uuid: None,
336            body: None,
337        };
338        // Commands include datatypes so the receiver can decode them without a
339        // birth-derived registry (e.g. the un-aliased Node Control/Rebirth metric).
340        self.publish_raw(topic, encode(&payload, EncodeOptions::birth()))
341            .await
342    }
343
344    /// Publish a DCMD to a Device (QoS 0, retain=false).
345    ///
346    /// # Errors
347    /// Propagates an invalid-id or transport error.
348    pub async fn publish_device_command(
349        &mut self,
350        group: &str,
351        edge: &str,
352        device: &str,
353        metrics: Vec<Metric>,
354    ) -> Result<()> {
355        let topic = SparkplugTopic::device(
356            GroupId::new(group)?,
357            EdgeNodeId::new(edge)?,
358            DeviceId::new(device)?,
359            MessageType::DCmd,
360        )?
361        .to_string();
362        let payload = Payload {
363            timestamp: Some(u64::try_from(now_ms()).unwrap_or(0)),
364            metrics,
365            seq: None,
366            uuid: None,
367            body: None,
368        };
369        // Commands include datatypes so the receiver can decode them without a
370        // birth-derived registry (e.g. the un-aliased Node Control/Rebirth metric).
371        self.publish_raw(topic, encode(&payload, EncodeOptions::birth()))
372            .await
373    }
374
375    async fn publish_raw(&mut self, topic: String, payload: Bytes) -> Result<()> {
376        self.transport
377            .publish(&OutboundMessage {
378                topic,
379                qos: Qos::AtMostOnce,
380                retain: false,
381                payload,
382            })
383            .await
384    }
385
386    /// Request a rebirth: publish an NCMD with `Node Control/Rebirth = true`.
387    async fn send_rebirth(&mut self, group: &GroupId, edge: &EdgeNodeId) -> Result<()> {
388        let topic = SparkplugTopic::Node {
389            group: group.clone(),
390            edge: edge.clone(),
391            ty: MessageType::NCmd,
392        }
393        .to_string();
394        let payload = Payload {
395            timestamp: Some(u64::try_from(now_ms()).unwrap_or(0)),
396            metrics: vec![Metric::new(
397                NODE_CONTROL_REBIRTH,
398                MetricValue::Boolean(true),
399            )],
400            seq: None,
401            uuid: None,
402            body: None,
403        };
404        // Commands include datatypes so the receiver can decode them without a
405        // birth-derived registry (e.g. the un-aliased Node Control/Rebirth metric).
406        self.publish_raw(topic, encode(&payload, EncodeOptions::birth()))
407            .await
408    }
409
410    /// Receive the next inbound message and handle it; `None` if the stream closed.
411    ///
412    /// # Errors
413    /// Propagates transport/decode errors.
414    pub async fn recv_and_handle(&mut self) -> Result<Option<HostEvent>> {
415        match self.transport.recv().await? {
416            Some(message) => Ok(Some(self.handle_incoming(&message).await?)),
417            None => Ok(None),
418        }
419    }
420
421    /// Handle one inbound message, advancing the per-Edge-Node state machine.
422    ///
423    /// # Errors
424    /// Returns a decode/transport error; malformed topics/payloads are surfaced
425    /// rather than panicking.
426    pub async fn handle_incoming(&mut self, message: &IncomingMessage) -> Result<HostEvent> {
427        let topic = SparkplugTopic::parse(&message.topic)?;
428        match topic {
429            SparkplugTopic::HostState { host_id } => {
430                // Self-STATE repair: a retained OFFLINE for our own host id means
431                // the broker still holds our prior will; re-assert ONLINE.
432                if host_id == self.config.host_id {
433                    let state = StatePayload::parse(
434                        std::str::from_utf8(&message.payload)
435                            .map_err(|_| SparkplugError::InvalidUtf8)?,
436                    )?;
437                    if !state.online {
438                        self.publish_state_birth().await?;
439                    }
440                }
441                Ok(HostEvent::Ignored)
442            }
443            SparkplugTopic::Node { group, edge, ty } => match ty {
444                MessageType::NBirth => self.on_node_birth(&group, &edge, &message.payload).await,
445                MessageType::NData => self.on_node_data(&group, &edge, &message.payload).await,
446                MessageType::NDeath => Ok(self.on_node_death(&group, &edge, &message.payload)?),
447                // NCMD is our own outbound echo.
448                MessageType::NCmd => Ok(HostEvent::Ignored),
449                _ => Ok(HostEvent::Ignored),
450            },
451            SparkplugTopic::Device {
452                group,
453                edge,
454                device,
455                ty,
456            } => match ty {
457                MessageType::DBirth => {
458                    self.on_device_birth(&group, &edge, &device, &message.payload)
459                        .await
460                }
461                MessageType::DData => {
462                    self.on_device_data(&group, &edge, &device, &message.payload)
463                        .await
464                }
465                MessageType::DDeath => {
466                    self.on_device_death(&group, &edge, &device, &message.payload)
467                        .await
468                }
469                MessageType::DCmd => Ok(HostEvent::Ignored),
470                _ => Ok(HostEvent::Ignored),
471            },
472        }
473    }
474
475    fn key(group: &GroupId, edge: &EdgeNodeId) -> String {
476        format!("{group}/{edge}")
477    }
478
479    async fn on_node_birth(
480        &mut self,
481        group: &GroupId,
482        edge: &EdgeNodeId,
483        payload: &[u8],
484    ) -> Result<HostEvent> {
485        let payload = decode(payload, None)?;
486        let key = Self::key(group, edge);
487        let mut duplicate_alias = false;
488        {
489            let node = self.nodes.entry(key).or_insert_with(NodeState::new);
490            node.online = true;
491            node.bd_seq = bdseq_of(&payload);
492            node.expected_seq = payload.seq.unwrap_or(0).wrapping_add(1);
493            node.aliases.clear();
494            node.devices.clear();
495            for m in &payload.metrics {
496                if let Some(name) = &m.name
497                    && node
498                        .aliases
499                        .try_bind(name, m.alias, m.value.datatype())
500                        .is_err()
501                {
502                    duplicate_alias = true;
503                }
504            }
505            if duplicate_alias {
506                // A corrupt alias map must not accept subsequent DATA before the
507                // rebirth lands; invalidate the session now.
508                node.online = false;
509            }
510        }
511        if duplicate_alias {
512            return self.rebirth(group, edge).await;
513        }
514        Ok(HostEvent::NodeBirth {
515            group: group.as_str().to_owned(),
516            edge: edge.as_str().to_owned(),
517            metrics: payload.metrics,
518        })
519    }
520
521    async fn on_node_data(
522        &mut self,
523        group: &GroupId,
524        edge: &EdgeNodeId,
525        payload: &[u8],
526    ) -> Result<HostEvent> {
527        let key = Self::key(group, edge);
528        let step = match self.nodes.get_mut(&key) {
529            Some(node) if node.online => match decode(payload, Some(&node.aliases)) {
530                // A malformed in-session payload is treated like a sequence
531                // anomaly: drop the session and request a (debounced) rebirth
532                // rather than failing the caller's receive loop.
533                Err(_) => {
534                    node.online = false;
535                    Step::Rebirth
536                }
537                Ok(payload) if payload.seq == Some(node.expected_seq) => {
538                    node.expected_seq = node.expected_seq.wrapping_add(1);
539                    Step::Event(HostEvent::NodeData {
540                        group: group.as_str().to_owned(),
541                        edge: edge.as_str().to_owned(),
542                        metrics: resolve_names(&node.aliases, payload.metrics),
543                    })
544                }
545                Ok(_) => {
546                    node.online = false; // a gap invalidates the session until rebirth
547                    Step::Rebirth
548                }
549            },
550            _ => Step::Rebirth, // data before/without a birth
551        };
552        self.finish(group, edge, step).await
553    }
554
555    fn on_node_death(
556        &mut self,
557        group: &GroupId,
558        edge: &EdgeNodeId,
559        payload: &[u8],
560    ) -> Result<HostEvent> {
561        let decoded = decode(payload, None)?;
562        let incoming = bdseq_of(&decoded);
563        let timestamp = decoded
564            .timestamp
565            .and_then(|t| i64::try_from(t).ok())
566            .unwrap_or_else(now_ms);
567        let key = Self::key(group, edge);
568        // Honor the death only on a real, matching bdSeq (a `None == None` match
569        // — both sides omitting bdSeq — does not count).
570        if let Some(node) = self.nodes.get_mut(&key)
571            && node.online
572            && node.bd_seq.is_some()
573            && node.bd_seq == incoming
574        {
575            node.online = false;
576            let mut devices: Vec<String> = node
577                .devices
578                .iter()
579                .filter(|(_, d)| d.online)
580                .map(|(name, _)| name.clone())
581                .collect();
582            devices.sort();
583            for device in node.devices.values_mut() {
584                device.online = false;
585            }
586            return Ok(HostEvent::NodeDeath {
587                group: group.as_str().to_owned(),
588                edge: edge.as_str().to_owned(),
589                timestamp,
590                devices,
591            });
592        }
593        // Unknown node, already offline, or bdSeq mismatch (stale death) -> ignore.
594        Ok(HostEvent::Ignored)
595    }
596
597    async fn on_device_birth(
598        &mut self,
599        group: &GroupId,
600        edge: &EdgeNodeId,
601        device: &DeviceId,
602        payload: &[u8],
603    ) -> Result<HostEvent> {
604        let key = Self::key(group, edge);
605        let dev = device.as_str().to_owned();
606        let step = match self.nodes.get_mut(&key) {
607            Some(node) if node.online => {
608                let payload = decode(payload, None)?;
609                if payload.seq == Some(node.expected_seq) {
610                    node.expected_seq = node.expected_seq.wrapping_add(1);
611                    let device_state = node.devices.entry(dev.clone()).or_default();
612                    device_state.online = true;
613                    device_state.aliases.clear();
614                    let mut dup = false;
615                    for m in &payload.metrics {
616                        if let Some(name) = &m.name
617                            && device_state
618                                .aliases
619                                .try_bind(name, m.alias, m.value.datatype())
620                                .is_err()
621                        {
622                            dup = true;
623                        }
624                    }
625                    if dup {
626                        Step::Rebirth
627                    } else {
628                        Step::Event(HostEvent::DeviceBirth {
629                            group: group.as_str().to_owned(),
630                            edge: edge.as_str().to_owned(),
631                            device: dev.clone(),
632                            metrics: payload.metrics,
633                        })
634                    }
635                } else {
636                    node.online = false;
637                    Step::Rebirth
638                }
639            }
640            _ => Step::Rebirth,
641        };
642        self.finish(group, edge, step).await
643    }
644
645    async fn on_device_data(
646        &mut self,
647        group: &GroupId,
648        edge: &EdgeNodeId,
649        device: &DeviceId,
650        payload: &[u8],
651    ) -> Result<HostEvent> {
652        let key = Self::key(group, edge);
653        let dev = device.as_str().to_owned();
654        let step = match self.nodes.get_mut(&key) {
655            Some(node) if node.online && node.devices.get(&dev).is_some_and(|d| d.online) => {
656                let device_state = node.devices.get(&dev).expect("checked present");
657                match decode(payload, Some(&device_state.aliases)) {
658                    Err(_) => {
659                        node.online = false;
660                        Step::Rebirth
661                    }
662                    Ok(payload) if payload.seq == Some(node.expected_seq) => {
663                        node.expected_seq = node.expected_seq.wrapping_add(1);
664                        let device_state = node.devices.get(&dev).expect("checked present");
665                        Step::Event(HostEvent::DeviceData {
666                            group: group.as_str().to_owned(),
667                            edge: edge.as_str().to_owned(),
668                            device: dev.clone(),
669                            metrics: resolve_names(&device_state.aliases, payload.metrics),
670                        })
671                    }
672                    Ok(_) => {
673                        node.online = false;
674                        Step::Rebirth
675                    }
676                }
677            }
678            _ => Step::Rebirth,
679        };
680        self.finish(group, edge, step).await
681    }
682
683    async fn on_device_death(
684        &mut self,
685        group: &GroupId,
686        edge: &EdgeNodeId,
687        device: &DeviceId,
688        payload: &[u8],
689    ) -> Result<HostEvent> {
690        let key = Self::key(group, edge);
691        let dev = device.as_str().to_owned();
692        let step = match self.nodes.get_mut(&key) {
693            Some(node) if node.online => {
694                let payload = decode(payload, None)?;
695                if payload.seq == Some(node.expected_seq) {
696                    node.expected_seq = node.expected_seq.wrapping_add(1);
697                    if let Some(device_state) = node.devices.get_mut(&dev) {
698                        device_state.online = false;
699                    }
700                    let timestamp = payload
701                        .timestamp
702                        .and_then(|t| i64::try_from(t).ok())
703                        .unwrap_or_else(now_ms);
704                    Step::Event(HostEvent::DeviceDeath {
705                        group: group.as_str().to_owned(),
706                        edge: edge.as_str().to_owned(),
707                        device: dev.clone(),
708                        timestamp,
709                    })
710                } else {
711                    node.online = false;
712                    Step::Rebirth
713                }
714            }
715            _ => Step::Rebirth,
716        };
717        self.finish(group, edge, step).await
718    }
719
720    /// Turn a [`Step`] into an event, sending a (debounced) rebirth NCMD when needed.
721    async fn finish(
722        &mut self,
723        group: &GroupId,
724        edge: &EdgeNodeId,
725        step: Step,
726    ) -> Result<HostEvent> {
727        match step {
728            Step::Event(event) => Ok(event),
729            Step::Rebirth => self.rebirth(group, edge).await,
730        }
731    }
732
733    async fn rebirth(&mut self, group: &GroupId, edge: &EdgeNodeId) -> Result<HostEvent> {
734        let key = Self::key(group, edge);
735        let allowed = {
736            let node = self.nodes.entry(key).or_insert_with(NodeState::new);
737            node.take_rebirth_slot(self.config.rebirth_debounce, Instant::now())
738        };
739        if !allowed {
740            return Ok(HostEvent::Ignored);
741        }
742        self.send_rebirth(group, edge).await?;
743        Ok(HostEvent::RebirthRequested {
744            group: group.as_str().to_owned(),
745            edge: edge.as_str().to_owned(),
746        })
747    }
748}