// srad_eon/node.rs

1use std::{
2    collections::HashMap,
3    sync::{
4        atomic::{AtomicBool, AtomicU8, Ordering},
5        Arc, Mutex,
6    },
7    time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use log::{debug, error, info, warn};
11use srad_client::{DeviceMessage, DynClient, DynEventLoop, Event, LastWill, Message, MessageKind};
12use srad_types::{
13    constants::{self, BDSEQ, NODE_CONTROL_REBIRTH},
14    payload::{self, metric::Value, DataType, Payload},
15    topic::{
16        DeviceMessage as DeviceMessageType, DeviceTopic, NodeMessage as NodeMessageType, NodeTopic,
17        QoS, StateTopic, Topic, TopicFilter,
18    },
19    utils::timestamp,
20    MetricValue, Template, TemplateDefinition,
21};
22use thiserror::Error;
23use tokio::{
24    select,
25    sync::{mpsc, oneshot},
26    time::timeout,
27};
28
29use crate::{
30    birth::BirthObjectType,
31    device::{DeviceMap, DeviceRegistrationError},
32    metric_manager::manager::DynNodeMetricManager,
33    BirthInitializer, BirthMetricDetails, BirthType, DeviceHandle, DeviceMetricManager, EoNBuilder,
34    MessageMetrics, MetricPublisher, PublishError, PublishMetric, StateError,
35};
36
/// Runtime settings consumed by the node task.
pub(crate) struct EoNConfig {
    /// Minimum time that must pass after an honoured NCMD rebirth request
    /// before another NCMD rebirth request is acted upon.
    node_rebirth_request_cooldown: Duration,
}
40
/// The mutable portion of the node state; always accessed through the mutex
/// held by `EoNState`.
struct EoNStateInner {
    /// Sequence number of the most recently issued message. Reset to 0 when a
    /// birth starts and advanced with wrap-around for every following message.
    seq: u8,
    /// `true` while the node considers itself connected.
    online: bool,
    /// `true` once the node birth message has been published, cleared again
    /// when a new birth starts or the node dies.
    birthed: bool,
}
46
47pub(crate) struct EoNState {
48    running: AtomicBool,
49    bdseq: AtomicU8,
50    inner: Mutex<EoNStateInner>,
51    pub group_id: String,
52    pub edge_node_id: String,
53    pub ndata_topic: NodeTopic,
54}
55
56impl EoNState {
57    pub(crate) fn get_next_seq(&self) -> Result<u64, StateError> {
58        let mut state = self.inner.lock().unwrap();
59        if !state.online {
60            return Err(StateError::Offline);
61        }
62        if !state.birthed {
63            return Err(StateError::UnBirthed);
64        }
65        state.seq = state.seq.wrapping_add(1);
66        Ok(state.seq as u64)
67    }
68
69    fn online_swap(&self, online: bool) -> bool {
70        let mut state = self.inner.lock().unwrap();
71        let old_online_state = state.online;
72        state.online = online;
73        old_online_state
74    }
75
76    fn is_online(&self) -> bool {
77        self.inner.lock().unwrap().online
78    }
79
80    fn set_dead(&self) {
81        let mut state = self.inner.lock().unwrap();
82        state.birthed = false;
83    }
84
85    fn birthed(&self) -> bool {
86        self.inner.lock().unwrap().birthed
87    }
88
89    fn start_birth(&self) {
90        let mut state = self.inner.lock().unwrap();
91        state.birthed = false;
92        state.seq = 0;
93    }
94
95    fn birth_completed(&self) {
96        self.inner.lock().unwrap().birthed = true
97    }
98
99    fn birth_topic(&self) -> NodeTopic {
100        NodeTopic::new(&self.group_id, NodeMessageType::NBirth, &self.edge_node_id)
101    }
102
103    fn generate_death_payload(&self) -> Payload {
104        let mut metric = srad_types::payload::Metric::new();
105        metric
106            .set_name(constants::BDSEQ.to_string())
107            .set_value(MetricValue::from(self.bdseq.load(Ordering::SeqCst) as i64).into());
108        Payload {
109            seq: None,
110            metrics: vec![metric],
111            uuid: None,
112            timestamp: None,
113            body: None,
114        }
115    }
116
117    fn create_last_will(&self) -> LastWill {
118        LastWill::new_node(
119            &self.group_id,
120            &self.edge_node_id,
121            self.generate_death_payload(),
122        )
123    }
124
125    fn sub_topics(&self) -> Vec<TopicFilter> {
126        vec![
127            TopicFilter::new_with_qos(
128                Topic::NodeTopic(NodeTopic::new(
129                    &self.group_id,
130                    NodeMessageType::NCmd,
131                    &self.edge_node_id,
132                )),
133                QoS::AtLeastOnce,
134            ),
135            TopicFilter::new_with_qos(
136                Topic::DeviceTopic(DeviceTopic::new(
137                    &self.group_id,
138                    DeviceMessageType::DCmd,
139                    &self.edge_node_id,
140                    "+",
141                )),
142                QoS::AtLeastOnce,
143            ),
144            TopicFilter::new_with_qos(Topic::State(StateTopic::new()), QoS::AtLeastOnce),
145        ]
146    }
147}
148
/// Marker message sent over the stop channel by `NodeHandle::cancel` to make
/// `EoN::run` leave its event loop.
#[derive(Debug)]
struct EoNShutdown;
151
152/// A handle for interacting with the Edge Node.
153///
154/// `NodeHandle` provides an interface for interacting with an edge node,
155/// including device management, node lifecycle operations, and metric publishing.
156#[derive(Clone)]
157pub struct NodeHandle {
158    state: Arc<EoNState>,
159    client: Arc<DynClient>,
160    devices: Arc<Mutex<DeviceMap>>,
161    stop_tx: mpsc::Sender<EoNShutdown>,
162    rebirth_tx: mpsc::Sender<()>,
163}
164
165impl NodeHandle {
166    /// Stop all operations, sending a death certificate and disconnect from the broker.
167    ///
168    /// This will cancel [EoN::run()]
169    pub async fn cancel(&self) {
170        if !self.state.running.load(Ordering::SeqCst) {
171            return;
172        }
173        info!("Edge node stopping. node={}", self.state.edge_node_id);
174        let topic = NodeTopic::new(
175            &self.state.group_id,
176            NodeMessageType::NDeath,
177            &self.state.edge_node_id,
178        );
179        let payload = self.state.generate_death_payload();
180        match self.client.try_publish_node_message(topic, payload).await {
181            Ok(_) => (),
182            Err(_) => debug!(
183                "Unable to publish node death certificate on exit. node={}",
184                self.state.edge_node_id
185            ),
186        };
187        _ = self.stop_tx.send(EoNShutdown).await;
188        _ = self.client.disconnect().await;
189    }
190
191    /// Manually trigger a rebirth for the node
192    pub fn rebirth(&self) {
193        //try send, if the channel (size 1) is full then a rebirth will be executed anyways
194        _ = self.rebirth_tx.try_send(());
195    }
196
197    /// Registers a new device with the node.
198    ///
199    /// Returns an error if:
200    ///   - A device with the same name is already registered
201    ///   - The device name is invalid
202    pub fn register_device<S, M>(
203        &self,
204        name: S,
205        dev_impl: M,
206    ) -> Result<DeviceHandle, DeviceRegistrationError>
207    where
208        S: Into<String>,
209        M: DeviceMetricManager + Send + Sync + 'static,
210    {
211        let name = name.into();
212        if let Err(e) = srad_types::utils::validate_name(&name) {
213            return Err(DeviceRegistrationError::InvalidName(e));
214        }
215        let handle = self.devices.lock().unwrap().add_device(
216            &self.state.group_id,
217            &self.state.edge_node_id,
218            name,
219            Box::new(dev_impl),
220            self.state.clone(),
221            self.client.clone(),
222        )?;
223        Ok(handle)
224    }
225
226    /// Unregister a device using it's handle.
227    pub async fn unregister_device(&self, handle: DeviceHandle) {
228        self.unregister_device_named(&handle.state.name).await;
229    }
230
231    /// Unregister a device using it's name.
232    pub async fn unregister_device_named(&self, name: &String) {
233        self.devices.lock().unwrap().remove_device(name)
234    }
235
236    fn publish_metrics_to_payload(&self, seq: u64, metrics: Vec<PublishMetric>) -> Payload {
237        let timestamp = timestamp();
238        let mut payload_metrics = Vec::with_capacity(metrics.len());
239        for x in metrics.into_iter() {
240            payload_metrics.push(x.into());
241        }
242        Payload {
243            timestamp: Some(timestamp),
244            metrics: payload_metrics,
245            seq: Some(seq),
246            uuid: None,
247            body: None,
248        }
249    }
250}
251
252impl MetricPublisher for NodeHandle {
253    async fn try_publish_metrics_unsorted(
254        &self,
255        metrics: Vec<PublishMetric>,
256    ) -> Result<(), PublishError> {
257        if metrics.is_empty() {
258            return Err(PublishError::NoMetrics);
259        }
260        match self
261            .client
262            .try_publish_node_message(
263                self.state.ndata_topic.clone(),
264                self.publish_metrics_to_payload(self.state.get_next_seq()?, metrics),
265            )
266            .await
267        {
268            Ok(_) => Ok(()),
269            Err(_) => Err(PublishError::State(StateError::Offline)),
270        }
271    }
272
273    async fn publish_metrics_unsorted(
274        &self,
275        metrics: Vec<PublishMetric>,
276    ) -> Result<(), PublishError> {
277        if metrics.is_empty() {
278            return Err(PublishError::NoMetrics);
279        }
280        match self
281            .client
282            .publish_node_message(
283                self.state.ndata_topic.clone(),
284                self.publish_metrics_to_payload(self.state.get_next_seq()?, metrics),
285            )
286            .await
287        {
288            Ok(_) => Ok(()),
289            Err(_) => Err(PublishError::State(StateError::Offline)),
290        }
291    }
292}
293
294#[derive(Debug, Error)]
295pub enum TemplateRegistryError {
296    #[error("Invalid template name")]
297    InvalidName,
298    #[error("Duplicate template. A template with that name is already registered")]
299    Duplicate,
300    #[error("The Templates Definition is invalid")]
301    InvalidDefinition,
302    #[error("The Templates Definition contained a template that has not been registered")]
303    UnregisteredMetric,
304}
305
306/// A struct representing a collection of Template Definitions for a Node
307///
308/// Used to manage the template definitions for the node. A definition must be included in the registry
309/// in order to register a metric that has a template datatype with a [BirthInitializer].
310///
311/// When a Node generates a birth message, a template definition for each templates registered with this structure
312/// will be included in the Nodes birth message. [srad_types::TemplateMetadata::template_definition_metric_name]
313/// for the type is used to define the name of the metric that represents the template definition. [srad_types::Template::template_definition()]
314/// is used to generate the definition used for the metrics value.
315///
316/// For templates which use another template as one of it's metrics, the template used as a field must be registered first.
317///
318/// This registry is checked any time a metric that uses a template as the type for its value is registered for birth.
319#[derive(Debug, Clone)]
320pub struct TemplateRegistry {
321    templates: HashMap<String, TemplateDefinition>,
322}
323
324impl TemplateRegistry {
325    pub(crate) fn new() -> Self {
326        Self {
327            templates: HashMap::new(),
328        }
329    }
330
331    /// Empty the registry of all template definitions
332    pub fn clear(&mut self) {
333        self.templates.clear();
334    }
335
336    /// Remove a template
337    pub fn deregister(&mut self, name: &str) {
338        self.templates.remove(name);
339    }
340
341    // Recurse through all template metrics in the metric list and ensure they have been registered.
342    fn check_template_metrics(
343        &self,
344        metrics: &Vec<payload::Metric>,
345    ) -> Result<(), TemplateRegistryError> {
346        for x in metrics {
347            let datatype = match &x.datatype {
348                Some(t) => match DataType::try_from(*t) {
349                    Ok(datatype) => datatype,
350                    Err(_) => return Err(TemplateRegistryError::InvalidDefinition),
351                },
352                None => return Err(TemplateRegistryError::InvalidDefinition),
353            };
354            if datatype != DataType::Template {
355                continue;
356            }
357
358            if let Value::TemplateValue(template) = x
359                .value
360                .as_ref()
361                .ok_or(TemplateRegistryError::InvalidDefinition)?
362            {
363                let ref_name = template
364                    .template_ref
365                    .as_ref()
366                    .ok_or(TemplateRegistryError::InvalidDefinition)?;
367                if self.templates.contains_key(ref_name) {
368                    return Ok(());
369                }
370                self.check_template_metrics(&template.metrics)?;
371            } else {
372                return Err(TemplateRegistryError::InvalidDefinition);
373            }
374        }
375        Ok(())
376    }
377
378    /// Add a template definition
379    pub fn register<T: Template>(&mut self) -> Result<(), TemplateRegistryError> {
380        let name = T::template_definition_metric_name();
381        if name == NODE_CONTROL_REBIRTH || name == BDSEQ {
382            return Err(TemplateRegistryError::InvalidName);
383        }
384
385        if self.templates.contains_key(&name) {
386            return Err(TemplateRegistryError::Duplicate);
387        }
388
389        let definition = T::template_definition();
390        self.check_template_metrics(&definition.metrics)?;
391
392        self.templates.insert(name, definition);
393        Ok(())
394    }
395
396    /// Check the registry contains a template with the given name
397    pub fn contains(&self, template_definition_metric_name: &str) -> bool {
398        self.templates.contains_key(template_definition_metric_name)
399    }
400}
401
402struct Node {
403    metric_manager: Box<DynNodeMetricManager>,
404    client: Arc<DynClient>,
405    devices: Arc<Mutex<DeviceMap>>,
406    state: Arc<EoNState>,
407    config: Arc<EoNConfig>,
408    stop_tx: mpsc::Sender<EoNShutdown>,
409    last_node_rebirth_request: Duration,
410
411    template_registry: Arc<TemplateRegistry>,
412
413    rebirth_request_tx: mpsc::Sender<()>,
414
415    node_message_rx: mpsc::UnboundedReceiver<Message>,
416    client_state_rx: mpsc::Receiver<ClientStateMessage>,
417    rebirth_request_rx: mpsc::Receiver<()>,
418}
419
420impl Node {
421    fn generate_birth_payload(&self, bdseq: i64, seq: u64) -> Payload {
422        let timestamp = timestamp();
423        let mut birth_initializer =
424            BirthInitializer::new(BirthObjectType::Node, self.template_registry.clone());
425
426        birth_initializer
427            .register_metric(
428                BirthMetricDetails::new_with_initial_value(constants::BDSEQ, bdseq)
429                    .use_alias(false),
430            )
431            .unwrap();
432        birth_initializer
433            .register_metric(
434                BirthMetricDetails::new_with_initial_value(constants::NODE_CONTROL_REBIRTH, false)
435                    .use_alias(false),
436            )
437            .unwrap();
438
439        for (name, template_definition) in &self.template_registry.templates {
440            birth_initializer
441                .register_template_definition(name.clone(), template_definition.clone())
442                .unwrap();
443        }
444
445        self.metric_manager.initialise_birth(&mut birth_initializer);
446        let metrics = birth_initializer.finish();
447
448        Payload {
449            seq: Some(seq),
450            timestamp: Some(timestamp),
451            metrics,
452            uuid: None,
453            body: None,
454        }
455    }
456
457    async fn node_birth(&mut self) -> Result<(), ()> {
458        /* [tck-id-topics-nbirth-seq-num] The NBIRTH MUST include a sequence number in the payload and it MUST have a value of 0. */
459        self.state.start_birth();
460
461        let bdseq = self.state.bdseq.load(Ordering::SeqCst) as i64;
462
463        //TODO any way we can avoid this clone? could use Ref counting in the TemplateRegistry but it might not be worth it
464        let mut updatable_template_registry = self.template_registry.as_ref().clone();
465        self.metric_manager
466            .birth_update_template_registry(&mut updatable_template_registry);
467        self.template_registry = Arc::new(updatable_template_registry);
468
469        let payload = self.generate_birth_payload(bdseq, 0);
470        let topic = self.state.birth_topic();
471        match self.client.publish_node_message(topic, payload).await {
472            Ok(_) => {
473                self.state.birth_completed();
474                Ok(())
475            }
476            Err(_) => {
477                error!(
478                    "Publishing node birth message failed. node={}",
479                    self.state.edge_node_id
480                );
481                Err(())
482            }
483        }
484    }
485
486    async fn birth(&mut self, birth_type: BirthType) {
487        info!(
488            "Birthing Node. node={} type={birth_type:?}",
489            self.state.edge_node_id
490        );
491        if self.node_birth().await.is_err() {
492            return;
493        }
494        self.devices
495            .lock()
496            .unwrap()
497            .birth_devices(birth_type, &self.template_registry);
498    }
499
500    async fn rebirth(&mut self) {
501        if !self.state.birthed() {
502            return;
503        }
504        self.birth(BirthType::Rebirth).await;
505    }
506
507    fn death(&self) {
508        self.state.set_dead();
509        self.state.bdseq.fetch_add(1, Ordering::SeqCst);
510        self.devices.lock().unwrap().on_death();
511    }
512
513    async fn on_online(&mut self) {
514        if self.state.online_swap(true) {
515            return;
516        }
517
518        info!("Edge node online. node={}", self.state.edge_node_id);
519        let sub_topics = self.state.sub_topics();
520
521        if self.client.subscribe_many(sub_topics).await.is_ok() {
522            self.birth(BirthType::Birth).await
523        };
524    }
525
526    fn on_offline(&self, will_sender: oneshot::Sender<LastWill>) {
527        if !self.state.online_swap(false) {
528            return;
529        }
530
531        info!("Edge node offline. node={}", self.state.edge_node_id);
532        self.death();
533        let new_lastwill = self.state.create_last_will();
534        _ = will_sender.send(new_lastwill);
535    }
536
537    async fn on_sparkplug_message(&mut self, message: Message, handle: NodeHandle) {
538        let payload = message.payload;
539        let message_kind = message.kind;
540
541        if message_kind == MessageKind::Cmd {
542            let mut rebirth = false;
543            for x in &payload.metrics {
544                if x.alias.is_some() {
545                    continue;
546                }
547
548                let metric_name = match &x.name {
549                    Some(name) => name,
550                    None => continue,
551                };
552
553                if metric_name != NODE_CONTROL_REBIRTH {
554                    continue;
555                }
556
557                rebirth = match &x.value {
558                    Some(Value::BooleanValue(val)) => *val,
559                    _ => false,
560                };
561
562                if !rebirth {
563                    warn!(
564                        "Received invalid NCMD Rebirth metric - ignoring request. node={}",
565                        self.state.edge_node_id
566                    )
567                }
568            }
569
570            let message_metrics: MessageMetrics = match payload.try_into() {
571                Ok(metrics) => metrics,
572                Err(_) => {
573                    warn!(
574                        "Received invalid CMD payload - ignoring request. node={}",
575                        self.state.edge_node_id
576                    );
577                    return;
578                }
579            };
580
581            self.metric_manager.on_ncmd(handle, message_metrics).await;
582            if rebirth {
583                let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
584                let time_since_last = now - self.last_node_rebirth_request;
585                if time_since_last < self.config.node_rebirth_request_cooldown {
586                    info!(
587                        "Got Rebirth CMD but cooldown time not expired. Ignoring. node={}",
588                        self.state.edge_node_id
589                    );
590                    return;
591                }
592                info!(
593                    "Got Rebirth CMD - Rebirthing Node. node={}",
594                    self.state.edge_node_id
595                );
596                self.rebirth().await;
597                self.last_node_rebirth_request = now;
598            }
599        }
600    }
601
602    fn create_node_handle(&self) -> NodeHandle {
603        NodeHandle {
604            state: self.state.clone(),
605            client: self.client.clone(),
606            devices: self.devices.clone(),
607            stop_tx: self.stop_tx.clone(),
608            rebirth_tx: self.rebirth_request_tx.clone(),
609        }
610    }
611
612    async fn run(mut self) {
613        loop {
614            select! {
615                biased;
616                maybe_state_update = self.client_state_rx.recv() => match maybe_state_update {
617                    Some (state_update) => match state_update {
618                        ClientStateMessage::Online => self.on_online().await,
619                        ClientStateMessage::Offline(sender) => self.on_offline(sender),
620                        ClientStateMessage::Stopped => break
621                    },
622                    None => break, //EoN has been dropped
623                },
624                Some(_) = self.rebirth_request_rx.recv() => self.rebirth().await,
625                maybe_message = self.node_message_rx.recv() => match maybe_message {
626                    Some(message) => self.on_sparkplug_message(message, self.create_node_handle()).await,
627                    None => break, //EoN has been dropped
628                },
629            }
630        }
631    }
632}
633
634enum ClientStateMessage {
635    Stopped,
636    Online,
637    Offline(oneshot::Sender<LastWill>),
638}
639
640/// struct that represents a Sparkplug Edge Node instance.
641///
642/// See [EoNBuilder] on how to create an [EoN] instance.
643pub struct EoN {
644    eventloop: Box<DynEventLoop>,
645    stop_rx: mpsc::Receiver<EoNShutdown>,
646    node_message_tx: mpsc::UnboundedSender<Message>,
647    client_state_tx: mpsc::Sender<ClientStateMessage>,
648    state: Arc<EoNState>,
649    devices: Arc<Mutex<DeviceMap>>,
650}
651
652impl EoN {
653    pub(crate) fn new_from_builder(builder: EoNBuilder) -> Result<(Self, NodeHandle), String> {
654        let group_id = builder
655            .group_id
656            .ok_or("group id must be provided".to_string())?;
657        let node_id = builder
658            .node_id
659            .ok_or("node id must be provided".to_string())?;
660        srad_types::utils::validate_name(&group_id)?;
661        srad_types::utils::validate_name(&node_id)?;
662
663        let metric_manager = builder.metric_manager;
664        let (eventloop, client) = builder.eventloop_client;
665        let (stop_tx, stop_rx) = mpsc::channel(1);
666
667        let state = Arc::new(EoNState {
668            running: AtomicBool::new(false),
669            bdseq: AtomicU8::new(0),
670            inner: Mutex::new(EoNStateInner {
671                seq: 0,
672                online: false,
673                birthed: false,
674            }),
675            ndata_topic: NodeTopic::new(&group_id, NodeMessageType::NData, &node_id),
676            group_id,
677            edge_node_id: node_id,
678        });
679
680        let template_registry = Arc::new(builder.templates);
681
682        let devices = Arc::new(Mutex::new(DeviceMap::new(template_registry.clone())));
683
684        let (node_message_tx, node_message_rx) = mpsc::unbounded_channel();
685        let (rebirth_request_tx, rebirth_request_rx) = mpsc::channel(1);
686        let (client_state_tx, client_state_rx) = mpsc::channel(1);
687
688        let node = Node {
689            metric_manager,
690            template_registry,
691            client: client.clone(),
692            state: state.clone(),
693            devices: devices.clone(),
694            stop_tx,
695            config: Arc::new(EoNConfig {
696                node_rebirth_request_cooldown: builder.node_rebirth_request_cooldown,
697            }),
698            last_node_rebirth_request: Duration::new(0, 0),
699            node_message_rx,
700            rebirth_request_rx,
701            rebirth_request_tx,
702            client_state_rx,
703        };
704
705        let eon = Self {
706            eventloop,
707            stop_rx,
708            node_message_tx,
709            client_state_tx,
710            state,
711            devices,
712        };
713
714        let handle = node.create_node_handle();
715
716        node.metric_manager.init(&handle);
717
718        tokio::spawn(async move { node.run().await });
719
720        Ok((eon, handle))
721    }
722
723    fn update_last_will(&mut self, lastwill: LastWill) {
724        self.eventloop.set_last_will(lastwill);
725    }
726
727    async fn on_online(&mut self) {
728        _ = self.client_state_tx.send(ClientStateMessage::Online).await;
729    }
730
731    async fn on_offline(&mut self) {
732        let (lastwill_tx, lastwill_rx) = oneshot::channel();
733        _ = self
734            .client_state_tx
735            .send(ClientStateMessage::Offline(lastwill_tx))
736            .await;
737        if let Ok(will) = lastwill_rx.await {
738            self.update_last_will(will)
739        }
740    }
741
742    fn on_node_message(&mut self, message: Message) {
743        _ = self.node_message_tx.send(message)
744    }
745
746    fn on_device_message(&mut self, message: DeviceMessage) {
747        self.devices.lock().unwrap().handle_device_message(message);
748    }
749
750    async fn handle_event(&mut self, event: Event) {
751        match event {
752            Event::Online => self.on_online().await,
753            Event::Offline => self.on_offline().await,
754            Event::Node(node_message) => self.on_node_message(node_message.message),
755            Event::Device(device_message) => self.on_device_message(device_message),
756            Event::State {
757                host_id: _,
758                payload: _,
759            } => (),
760            Event::InvalidPublish {
761                reason: _,
762                topic: _,
763                payload: _,
764            } => (),
765        }
766    }
767
768    async fn poll_until_offline(&mut self) -> bool {
769        while self.state.is_online() {
770            if Event::Offline == self.eventloop.poll().await {
771                self.on_offline().await;
772                break;
773            }
774        }
775        true
776    }
777
778    /// Run the Edge Node
779    ///
780    /// Runs the Edge Node until [NodeHandle::cancel()] is called
781    pub async fn run(mut self) {
782        info!("Edge node running. node={}", self.state.edge_node_id);
783        self.state.running.store(true, Ordering::SeqCst);
784
785        self.update_last_will(self.state.create_last_will());
786
787        loop {
788            select! {
789              event = self.eventloop.poll() => self.handle_event(event).await,
790              Some(_) = self.stop_rx.recv() => break,
791            }
792        }
793
794        if timeout(Duration::from_secs(1), self.poll_until_offline())
795            .await
796            .is_err()
797        {
798            self.on_offline().await;
799        }
800
801        _ = self.client_state_tx.send(ClientStateMessage::Stopped).await;
802        info!("Edge node stopped. node={}", self.state.edge_node_id);
803        self.state.running.store(false, Ordering::SeqCst);
804    }
805}