srad_eon/
node.rs

1use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use log::{debug, error, info, warn};
6use srad_client::{DeviceMessage, DynClient, DynEventLoop, MessageKind};
7use srad_client::{Event, NodeMessage};
8
9use srad_types::constants::NODE_CONTROL_REBIRTH;
10use srad_types::payload::metric::Value;
11use srad_types::topic::{DeviceTopic, NodeTopic, QoS, StateTopic, Topic, TopicFilter};
12use srad_types::utils::timestamp;
13use srad_types::MetricValue;
14use srad_types::{
15    constants,
16    payload::Payload,
17    topic::{DeviceMessage as DeviceMessageType, NodeMessage as NodeMessageType},
18};
19use tokio::time::timeout;
20
21use crate::birth::{BirthInitializer, BirthMetricDetails, BirthObjectType};
22use crate::builder::EoNBuilder;
23use crate::device::{DeviceHandle, DeviceMap};
24use crate::error::DeviceRegistrationError;
25use crate::metric::{MessageMetrics, MetricPublisher, PublishError, PublishMetric};
26use crate::metric_manager::manager::{DeviceMetricManager, DynNodeMetricManager};
27use crate::registry::Registry;
28use crate::BirthType;
29
30use tokio::{select, sync::mpsc, task};
31
/// Marker message sent over the node's stop channel to ask the run loop to exit.
#[derive(Debug)]
struct EoNShutdown;
34
/// A handle for interacting with the Edge Node.
///
/// `NodeHandle` provides an interface for interacting with an edge node,
/// including device management, node lifecycle operations, and metric publishing.
///
/// Clones of a handle share the same underlying node through an [`Arc`].
#[derive(Clone)]
pub struct NodeHandle {
    /// The shared node this handle operates on.
    node: Arc<Node>,
}
43
44impl NodeHandle {
45    /// Stop all operations, sending a death certificate and disconnect from the broker.
46    ///
47    /// This will cancel [EoN::run()]
48    pub async fn cancel(&self) {
49        info!("Edge node stopping");
50        let topic = NodeTopic::new(
51            &self.node.state.group_id,
52            NodeMessageType::NDeath,
53            &self.node.state.edge_node_id,
54        );
55        let payload = self.node.generate_death_payload();
56        match self
57            .node
58            .client
59            .try_publish_node_message(topic, payload)
60            .await
61        {
62            Ok(_) => (),
63            Err(_) => debug!("Unable to publish node death certificate on exit"),
64        };
65        _ = self.node.stop_tx.send(EoNShutdown).await;
66        _ = self.node.client.disconnect().await;
67    }
68
69    /// Manually trigger a rebirth for the node
70    pub async fn rebirth(&self) {
71        self.node.birth(BirthType::Rebirth).await;
72    }
73
74    /// Registers a new device with the node.
75    ///
76    /// Returns an error if:
77    ///   - A device with the same name is already registered
78    ///   - The device name is invalid
79    pub async fn register_device<S, M>(
80        &self,
81        name: S,
82        dev_impl: M,
83    ) -> Result<DeviceHandle, DeviceRegistrationError>
84    where
85        S: Into<String>,
86        M: DeviceMetricManager + Send + Sync + 'static,
87    {
88        let name = name.into();
89        if let Err(e) = srad_types::utils::validate_name(&name) {
90            return Err(DeviceRegistrationError::InvalidName(e));
91        }
92        let handle = self
93            .node
94            .devices
95            .add_device(
96                &self.node.state.group_id,
97                &self.node.state.edge_node_id,
98                name,
99                Arc::new(dev_impl),
100            )
101            .await?;
102        Ok(handle)
103    }
104
105    /// Unregister a device using it's handle.
106    pub async fn unregister_device(&self, handle: DeviceHandle) {
107        self.unregister_device_named(&handle.device.info.name).await;
108    }
109
110    /// Unregister a device using it's name.
111    pub async fn unregister_device_named(&self, name: &String) {
112        self.node.devices.remove_device(name).await
113    }
114
115    fn check_publish_state(&self) -> Result<(), PublishError> {
116        if !self.node.state.is_online() {
117            return Err(PublishError::Offline);
118        }
119        if !self.node.state.birthed() {
120            return Err(PublishError::UnBirthed);
121        }
122        Ok(())
123    }
124
125    fn publish_metrics_to_payload(&self, metrics: Vec<PublishMetric>) -> Payload {
126        let timestamp = timestamp();
127        let mut payload_metrics = Vec::with_capacity(metrics.len());
128        for x in metrics.into_iter() {
129            payload_metrics.push(x.into());
130        }
131        Payload {
132            timestamp: Some(timestamp),
133            metrics: payload_metrics,
134            seq: Some(self.node.state.get_seq()),
135            uuid: None,
136            body: None,
137        }
138    }
139}
140
141impl MetricPublisher for NodeHandle {
142    async fn try_publish_metrics_unsorted(
143        &self,
144        metrics: Vec<PublishMetric>,
145    ) -> Result<(), PublishError> {
146        if metrics.is_empty() {
147            return Err(PublishError::NoMetrics);
148        }
149        self.check_publish_state()?;
150        match self
151            .node
152            .client
153            .try_publish_node_message(
154                self.node.state.ndata_topic.clone(),
155                self.publish_metrics_to_payload(metrics),
156            )
157            .await
158        {
159            Ok(_) => Ok(()),
160            Err(_) => Err(PublishError::Offline),
161        }
162    }
163
164    async fn publish_metrics_unsorted(
165        &self,
166        metrics: Vec<PublishMetric>,
167    ) -> Result<(), PublishError> {
168        if metrics.is_empty() {
169            return Err(PublishError::NoMetrics);
170        }
171        self.check_publish_state()?;
172        match self
173            .node
174            .client
175            .publish_node_message(
176                self.node.state.ndata_topic.clone(),
177                self.publish_metrics_to_payload(metrics),
178            )
179            .await
180        {
181            Ok(_) => Ok(()),
182            Err(_) => Err(PublishError::Offline),
183        }
184    }
185}
186
/// Shared runtime state of the edge node.
pub(crate) struct EoNState {
    /// Birth/death sequence number reported as the `BDSEQ` metric in the birth and death
    /// payloads; incremented each time the node goes offline.
    bdseq: AtomicU8,
    /// Payload sequence counter handed out by [`EoNState::get_seq`].
    seq: AtomicU8,
    /// Whether the node is currently online.
    online: AtomicBool,
    /// Whether the node's birth message has been published successfully.
    birthed: AtomicBool,
    /// Group id the node belongs to.
    pub group_id: String,
    /// Id of this edge node.
    pub edge_node_id: String,
    /// Pre-built topic used when publishing node data messages.
    pub ndata_topic: NodeTopic,
}
196
197impl EoNState {
198    pub(crate) fn get_seq(&self) -> u64 {
199        self.seq.fetch_add(1, Ordering::Relaxed) as u64
200    }
201
202    pub(crate) fn is_online(&self) -> bool {
203        self.online.load(Ordering::SeqCst)
204    }
205
206    pub(crate) fn birthed(&self) -> bool {
207        self.birthed.load(Ordering::SeqCst)
208    }
209
210    fn birth_topic(&self) -> NodeTopic {
211        NodeTopic::new(&self.group_id, NodeMessageType::NBirth, &self.edge_node_id)
212    }
213
214    fn sub_topics(&self) -> Vec<TopicFilter> {
215        vec![
216            TopicFilter::new_with_qos(
217                Topic::NodeTopic(NodeTopic::new(
218                    &self.group_id,
219                    NodeMessageType::NCmd,
220                    &self.edge_node_id,
221                )),
222                QoS::AtLeastOnce,
223            ),
224            TopicFilter::new_with_qos(
225                Topic::DeviceTopic(DeviceTopic::new(
226                    &self.group_id,
227                    DeviceMessageType::DCmd,
228                    &self.edge_node_id,
229                    "+",
230                )),
231                QoS::AtLeastOnce,
232            ),
233            TopicFilter::new_with_qos(Topic::State(StateTopic::new()), QoS::AtLeastOnce),
234        ]
235    }
236}
237
/// Internal representation of the edge node shared between [`EoN`] and every [`NodeHandle`].
pub struct Node {
    /// Shared runtime state (sequence numbers, online/birthed flags, ids).
    state: Arc<EoNState>,
    /// User supplied manager that contributes birth metrics and handles node commands.
    metric_manager: Box<DynNodeMetricManager>,
    /// Devices registered under this node.
    devices: DeviceMap,
    /// Client used to publish, subscribe and disconnect.
    client: Arc<DynClient>,
    /// Sender half of the stop channel used to request that the run loop exits.
    stop_tx: mpsc::Sender<EoNShutdown>,
}
245
246impl Node {
247    fn generate_death_payload(&self) -> Payload {
248        let mut metric = srad_types::payload::Metric::new();
249        metric
250            .set_name(constants::BDSEQ.to_string())
251            .set_value(MetricValue::from(self.state.bdseq.load(Ordering::SeqCst) as i64).into());
252        Payload {
253            seq: None,
254            metrics: vec![metric],
255            uuid: None,
256            timestamp: None,
257            body: None,
258        }
259    }
260
261    fn generate_birth_payload(&self, bdseq: i64, seq: u64) -> Payload {
262        let timestamp = timestamp();
263        let mut birth_initializer = BirthInitializer::new(BirthObjectType::Node);
264        birth_initializer
265            .register_metric(
266                BirthMetricDetails::new_with_initial_value(constants::BDSEQ, bdseq)
267                    .use_alias(false),
268            )
269            .unwrap();
270        birth_initializer
271            .register_metric(
272                BirthMetricDetails::new_with_initial_value(constants::NODE_CONTROL_REBIRTH, false)
273                    .use_alias(false),
274            )
275            .unwrap();
276
277        self.metric_manager.initialise_birth(&mut birth_initializer);
278        let metrics = birth_initializer.finish();
279
280        Payload {
281            seq: Some(seq),
282            timestamp: Some(timestamp),
283            metrics,
284            uuid: None,
285            body: None,
286        }
287    }
288
289    async fn node_birth(&self) {
290        /* [tck-id-topics-nbirth-seq-num] The NBIRTH MUST include a sequence number in the payload and it MUST have a value of 0. */
291        self.state.birthed.store(false, Ordering::SeqCst);
292        self.state.seq.store(0, Ordering::SeqCst);
293        let bdseq = self.state.bdseq.load(Ordering::SeqCst) as i64;
294
295        let payload = self.generate_birth_payload(bdseq, 0);
296        let topic = self.state.birth_topic();
297        self.state.seq.store(1, Ordering::SeqCst);
298        match self.client.publish_node_message(topic, payload).await {
299            Ok(_) => self.state.birthed.store(true, Ordering::SeqCst),
300            Err(_) => error!("Publishing birth message failed"),
301        }
302    }
303
304    async fn birth(&self, birth_type: BirthType) {
305        info!("Birthing Node. Type: {:?}", birth_type);
306        self.node_birth().await;
307        self.devices.birth_devices(birth_type).await;
308    }
309}
310
/// Structure that represents a Sparkplug Edge Node instance
pub struct EoN {
    /// Node shared with every [`NodeHandle`].
    node: Arc<Node>,
    /// Event loop polled for client events while the node is running.
    eventloop: Box<DynEventLoop>,
    /// Receiver half of the stop channel; a message on it ends [`EoN::run()`].
    stop_rx: mpsc::Receiver<EoNShutdown>,
}
317
318impl EoN {
319    pub(crate) fn new_from_builder(builder: EoNBuilder) -> Result<(Self, NodeHandle), String> {
320        let group_id = builder
321            .group_id
322            .ok_or("group id must be provided".to_string())?;
323        let node_id = builder
324            .node_id
325            .ok_or("node id must be provided".to_string())?;
326        srad_types::utils::validate_name(&group_id)?;
327        srad_types::utils::validate_name(&node_id)?;
328
329        let metric_manager = builder.metric_manager;
330        let (eventloop, client) = builder.eventloop_client;
331
332        let (stop_tx, stop_rx) = mpsc::channel(1);
333
334        let state = Arc::new(EoNState {
335            seq: AtomicU8::new(0),
336            bdseq: AtomicU8::new(0),
337            online: AtomicBool::new(false),
338            birthed: AtomicBool::new(false),
339            ndata_topic: NodeTopic::new(&group_id, NodeMessageType::NData, &node_id),
340            group_id,
341            edge_node_id: node_id,
342        });
343
344        let registry = Arc::new(Mutex::new(Registry::new()));
345
346        let node = Arc::new(Node {
347            metric_manager,
348            client: client.clone(),
349            devices: DeviceMap::new(state.clone(), registry.clone(), client),
350            state,
351            stop_tx,
352        });
353
354        let mut eon = Self {
355            node,
356            eventloop,
357            stop_rx,
358        };
359        let handle = NodeHandle {
360            node: eon.node.clone(),
361        };
362        eon.node.metric_manager.init(&handle);
363        eon.update_last_will();
364        Ok((eon, handle))
365    }
366
367    fn update_last_will(&mut self) {
368        self.eventloop
369            .set_last_will(srad_client::LastWill::new_node(
370                &self.node.state.group_id,
371                &self.node.state.edge_node_id,
372                self.node.generate_death_payload(),
373            ));
374    }
375
376    fn on_online(&self) {
377        if self.node.state.online.swap(true, Ordering::SeqCst) {
378            return;
379        }
380        info!("Edge node online");
381        let sub_topics = self.node.state.sub_topics();
382        let node = self.node.clone();
383
384        tokio::spawn(async move {
385            if node.client.subscribe_many(sub_topics).await.is_ok() {
386                node.birth(BirthType::Birth).await
387            };
388        });
389    }
390
391    async fn on_offline(&mut self) {
392        if !self.node.state.online.swap(false, Ordering::SeqCst) {
393            return;
394        }
395        info!("Edge node offline");
396        self.node.devices.on_offline().await;
397        self.node.state.bdseq.fetch_add(1, Ordering::SeqCst);
398        self.update_last_will();
399    }
400
401    fn on_node_message(&self, message: NodeMessage) {
402        let payload = message.message.payload;
403        let message_kind = message.message.kind;
404
405        if message_kind == MessageKind::Cmd {
406            let mut rebirth = false;
407            for x in &payload.metrics {
408                if x.alias.is_some() {
409                    continue;
410                }
411
412                let metric_name = match &x.name {
413                    Some(name) => name,
414                    None => continue,
415                };
416
417                if metric_name != NODE_CONTROL_REBIRTH {
418                    continue;
419                }
420
421                rebirth = match &x.value {
422                    Some(Value::BooleanValue(val)) => *val,
423                    _ => false,
424                };
425
426                if !rebirth {
427                    warn!("Received invalid CMD Rebirth metric - ignoring request")
428                }
429            }
430
431            let message_metrics: MessageMetrics = match payload.try_into() {
432                Ok(metrics) => metrics,
433                Err(_) => {
434                    warn!("Received invalid CMD payload - ignoring request");
435                    return;
436                }
437            };
438
439            let node = self.node.clone();
440            task::spawn(async move {
441                node.metric_manager
442                    .on_ncmd(NodeHandle { node: node.clone() }, message_metrics)
443                    .await;
444                if rebirth {
445                    info!("Got Rebirth CMD - Rebirthing Node");
446                    node.birth(BirthType::Rebirth).await
447                }
448            });
449        }
450    }
451
452    fn on_device_message(&self, message: DeviceMessage) {
453        let node = self.node.clone();
454        task::spawn(async move {
455            node.devices.handle_device_message(message).await;
456        });
457    }
458
459    async fn handle_event(&mut self, event: Event) {
460        match event {
461            Event::Online => self.on_online(),
462            Event::Offline => self.on_offline().await,
463            Event::Node(node_message) => self.on_node_message(node_message),
464            Event::Device(device_message) => self.on_device_message(device_message),
465            Event::State {
466                host_id: _,
467                payload: _,
468            } => (),
469            Event::InvalidPublish {
470                reason: _,
471                topic: _,
472                payload: _,
473            } => (),
474        }
475    }
476
477    async fn poll_until_offline(&mut self) -> bool {
478        while self.node.state.is_online() {
479            if Event::Offline == self.eventloop.poll().await {
480                self.on_offline().await
481            }
482        }
483        true
484    }
485
486    async fn poll_until_offline_with_timeout(&mut self) {
487        _ = timeout(Duration::from_secs(1), self.poll_until_offline()).await;
488    }
489
490    /// Run the Edge Node
491    ///
492    /// Runs the Edge Node until [NodeHandle::cancel()] is called
493    pub async fn run(&mut self) {
494        info!("Edge node running");
495        self.update_last_will();
496        loop {
497            select! {
498              event = self.eventloop.poll() => self.handle_event(event).await,
499              Some(_) = self.stop_rx.recv() => break,
500            }
501        }
502        self.poll_until_offline_with_timeout().await;
503        self.on_offline().await;
504        info!("Edge node stopped");
505    }
506}