srad_eon/
node.rs

1use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use log::{debug, error, info, warn};
6use srad_client::{DeviceMessage, DynClient, DynEventLoop, MessageKind};
7use srad_client::{Event, NodeMessage};
8
9use srad_types::constants::NODE_CONTROL_REBIRTH;
10use srad_types::payload::metric::Value;
11use srad_types::topic::{DeviceTopic, NodeTopic, QoS, StateTopic, Topic, TopicFilter};
12use srad_types::utils::timestamp;
13use srad_types::MetricValue;
14use srad_types::{
15    constants,
16    payload::Payload,
17    topic::{DeviceMessage as DeviceMessageType, NodeMessage as NodeMessageType},
18};
19use tokio::time::timeout;
20
21use crate::birth::{BirthInitializer, BirthMetricDetails, BirthObjectType};
22use crate::builder::EoNBuilder;
23use crate::device::{DeviceHandle, DeviceMap};
24use crate::error::DeviceRegistrationError;
25use crate::metric::{MessageMetrics, MetricPublisher, PublishError, PublishMetric};
26use crate::metric_manager::manager::{DeviceMetricManager, DynNodeMetricManager};
27use crate::registry::Registry;
28use crate::BirthType;
29
30use tokio::{select, sync::mpsc, task};
31
/// Message sent over the node's stop channel to make [`EoN::run`] leave its event loop.
#[derive(Debug)]
struct EoNShutdown;
34
35/// A handle for interacting with the Edge Node.
36///
37/// `NodeHandle` provides an interface for interacting with an edge node,
38/// including device management, node lifecycle operations, and metric publishing.
39#[derive(Clone)]
40pub struct NodeHandle {
41    node: Arc<Node>,
42}
43
44impl NodeHandle {
45    /// Stop all operations, sending a death certificate and disconnect from the broker.
46    ///
47    /// This will cancel [EoN::run()]
48    pub async fn cancel(&self) {
49        info!("Edge node stopping");
50        let topic = NodeTopic::new(
51            &self.node.state.group_id,
52            NodeMessageType::NDeath,
53            &self.node.state.edge_node_id,
54        );
55        let payload = self.node.generate_death_payload();
56        match self
57            .node
58            .client
59            .try_publish_node_message(topic, payload)
60            .await
61        {
62            Ok(_) => (),
63            Err(_) => debug!("Unable to publish node death certificate on exit"),
64        };
65        _ = self.node.stop_tx.send(EoNShutdown).await;
66        _ = self.node.client.disconnect().await;
67    }
68
69    /// Manually trigger a rebirth for the node
70    pub async fn rebirth(&self) {
71        self.node.birth(BirthType::Rebirth).await;
72    }
73
74    /// Registers a new device with the node.
75    ///
76    /// Returns an error if:
77    ///   - A device with the same name is already registered
78    ///   - The device name is invalid
79    pub async fn register_device<S, M>(
80        &self,
81        name: S,
82        dev_impl: M,
83    ) -> Result<DeviceHandle, DeviceRegistrationError>
84    where
85        S: Into<String>,
86        M: DeviceMetricManager + Send + Sync + 'static,
87    {
88        let name = name.into();
89        if let Err(e) = srad_types::utils::validate_name(&name) {
90            return Err(DeviceRegistrationError::InvalidName(e));
91        }
92        let handle = self
93            .node
94            .devices
95            .add_device(
96                &self.node.state.group_id,
97                &self.node.state.edge_node_id,
98                name,
99                Arc::new(dev_impl),
100            )
101            .await?;
102        Ok(handle)
103    }
104
105    /// Unregister a device using it's handle.
106    pub async fn unregister_device(&self, handle: DeviceHandle) {
107        self.unregister_device_named(&handle.device.info.name).await;
108    }
109
110    /// Unregister a device using it's name.
111    pub async fn unregister_device_named(&self, name: &String) {
112        self.node.devices.remove_device(name).await
113    }
114
115    fn check_publish_state(&self) -> Result<(), PublishError> {
116        if !self.node.state.is_online() {
117            return Err(PublishError::Offline);
118        }
119        if !self.node.state.birthed() {
120            return Err(PublishError::UnBirthed);
121        }
122        Ok(())
123    }
124
125    fn publish_metrics_to_payload(&self, metrics: Vec<PublishMetric>) -> Payload {
126        let timestamp = timestamp();
127        let mut payload_metrics = Vec::with_capacity(metrics.len());
128        for x in metrics.into_iter() {
129            payload_metrics.push(x.into());
130        }
131        Payload {
132            timestamp: Some(timestamp),
133            metrics: payload_metrics,
134            seq: Some(self.node.state.get_seq()),
135            uuid: None,
136            body: None,
137        }
138    }
139}
140
141impl MetricPublisher for NodeHandle {
142    async fn try_publish_metrics_unsorted(
143        &self,
144        metrics: Vec<PublishMetric>,
145    ) -> Result<(), PublishError> {
146        if metrics.is_empty() {
147            return Err(PublishError::NoMetrics);
148        }
149        self.check_publish_state()?;
150        match self
151            .node
152            .client
153            .try_publish_node_message(
154                self.node.state.ndata_topic.clone(),
155                self.publish_metrics_to_payload(metrics),
156            )
157            .await
158        {
159            Ok(_) => Ok(()),
160            Err(_) => Err(PublishError::Offline),
161        }
162    }
163
164    async fn publish_metrics_unsorted(
165        &self,
166        metrics: Vec<PublishMetric>,
167    ) -> Result<(), PublishError> {
168        if metrics.is_empty() {
169            return Err(PublishError::NoMetrics);
170        }
171        self.check_publish_state()?;
172        match self
173            .node
174            .client
175            .publish_node_message(
176                self.node.state.ndata_topic.clone(),
177                self.publish_metrics_to_payload(metrics),
178            )
179            .await
180        {
181            Ok(_) => Ok(()),
182            Err(_) => Err(PublishError::Offline),
183        }
184    }
185}
186
187pub(crate) struct EoNState {
188    bdseq: AtomicU8,
189    seq: AtomicU8,
190    online: AtomicBool,
191    birthed: AtomicBool,
192    pub group_id: String,
193    pub edge_node_id: String,
194    pub ndata_topic: NodeTopic,
195}
196
197impl EoNState {
198    pub(crate) fn get_seq(&self) -> u64 {
199        self.seq.fetch_add(1, Ordering::Relaxed) as u64
200    }
201
202    pub(crate) fn is_online(&self) -> bool {
203        self.online.load(Ordering::SeqCst)
204    }
205
206    pub(crate) fn birthed(&self) -> bool {
207        self.birthed.load(Ordering::SeqCst)
208    }
209
210    fn birth_topic(&self) -> NodeTopic {
211        NodeTopic::new(&self.group_id, NodeMessageType::NBirth, &self.edge_node_id)
212    }
213
214    fn sub_topics(&self) -> Vec<TopicFilter> {
215        vec![
216            TopicFilter::new_with_qos(
217                Topic::NodeTopic(NodeTopic::new(
218                    &self.group_id,
219                    NodeMessageType::NCmd,
220                    &self.edge_node_id,
221                )),
222                QoS::AtLeastOnce,
223            ),
224            TopicFilter::new_with_qos(
225                Topic::DeviceTopic(DeviceTopic::new(
226                    &self.group_id,
227                    DeviceMessageType::DCmd,
228                    &self.edge_node_id,
229                    "+",
230                )),
231                QoS::AtLeastOnce,
232            ),
233            TopicFilter::new_with_qos(Topic::State(StateTopic::new()), QoS::AtLeastOnce),
234        ]
235    }
236}
237
238pub struct Node {
239    state: Arc<EoNState>,
240    metric_manager: Box<DynNodeMetricManager>,
241    devices: DeviceMap,
242    client: Arc<DynClient>,
243    stop_tx: mpsc::Sender<EoNShutdown>,
244}
245
246impl Node {
247    fn generate_death_payload(&self) -> Payload {
248        let mut metric = srad_types::payload::Metric::new();
249        metric
250            .set_name(constants::BDSEQ.to_string())
251            .set_value(MetricValue::from(self.state.bdseq.load(Ordering::SeqCst) as i64).into());
252        Payload {
253            seq: None,
254            metrics: vec![metric],
255            uuid: None,
256            timestamp: None,
257            body: None,
258        }
259    }
260
261    fn generate_birth_payload(&self, bdseq: i64, seq: u64) -> Payload {
262        let timestamp = timestamp();
263        let mut birth_initializer = BirthInitializer::new(BirthObjectType::Node);
264        birth_initializer
265            .register_metric(
266                BirthMetricDetails::new_with_initial_value(constants::BDSEQ, bdseq)
267                    .use_alias(false),
268            )
269            .unwrap();
270        birth_initializer
271            .register_metric(
272                BirthMetricDetails::new_with_initial_value(constants::NODE_CONTROL_REBIRTH, false)
273                    .use_alias(false),
274            )
275            .unwrap();
276
277        self.metric_manager.initialise_birth(&mut birth_initializer);
278        let metrics = birth_initializer.finish();
279
280        Payload {
281            seq: Some(seq),
282            timestamp: Some(timestamp),
283            metrics,
284            uuid: None,
285            body: None,
286        }
287    }
288
289    async fn node_birth(&self) {
290        /* [tck-id-topics-nbirth-seq-num] The NBIRTH MUST include a sequence number in the payload and it MUST have a value of 0. */
291        self.state.birthed.store(false, Ordering::SeqCst);
292        self.state.seq.store(0, Ordering::SeqCst);
293        let bdseq = self.state.bdseq.load(Ordering::SeqCst) as i64;
294
295        let payload = self.generate_birth_payload(bdseq, 0);
296        let topic = self.state.birth_topic();
297        self.state.seq.store(1, Ordering::SeqCst);
298        match self.client.publish_node_message(topic, payload).await {
299            Ok(_) => self.state.birthed.store(true, Ordering::SeqCst),
300            Err(_) => error!("Publishing birth message failed"),
301        }
302    }
303
304    async fn birth(&self, birth_type: BirthType) {
305        info!("Birthing Node. Type: {:?}", birth_type);
306        self.node_birth().await;
307        self.devices.birth_devices(birth_type).await;
308    }
309}
310
311/// Structure that represents a Sparkplug Edge Node instance.
312///
313/// See [EoNBuilder] on how to create an [EoN] instance.
314pub struct EoN {
315    node: Arc<Node>,
316    eventloop: Box<DynEventLoop>,
317    stop_rx: mpsc::Receiver<EoNShutdown>,
318}
319
320impl EoN {
321    pub(crate) fn new_from_builder(builder: EoNBuilder) -> Result<(Self, NodeHandle), String> {
322        let group_id = builder
323            .group_id
324            .ok_or("group id must be provided".to_string())?;
325        let node_id = builder
326            .node_id
327            .ok_or("node id must be provided".to_string())?;
328        srad_types::utils::validate_name(&group_id)?;
329        srad_types::utils::validate_name(&node_id)?;
330
331        let metric_manager = builder.metric_manager;
332        let (eventloop, client) = builder.eventloop_client;
333
334        let (stop_tx, stop_rx) = mpsc::channel(1);
335
336        let state = Arc::new(EoNState {
337            seq: AtomicU8::new(0),
338            bdseq: AtomicU8::new(0),
339            online: AtomicBool::new(false),
340            birthed: AtomicBool::new(false),
341            ndata_topic: NodeTopic::new(&group_id, NodeMessageType::NData, &node_id),
342            group_id,
343            edge_node_id: node_id,
344        });
345
346        let registry = Arc::new(Mutex::new(Registry::new()));
347
348        let node = Arc::new(Node {
349            metric_manager,
350            client: client.clone(),
351            devices: DeviceMap::new(state.clone(), registry.clone(), client),
352            state,
353            stop_tx,
354        });
355
356        let mut eon = Self {
357            node,
358            eventloop,
359            stop_rx,
360        };
361        let handle = NodeHandle {
362            node: eon.node.clone(),
363        };
364        eon.node.metric_manager.init(&handle);
365        eon.update_last_will();
366        Ok((eon, handle))
367    }
368
369    fn update_last_will(&mut self) {
370        self.eventloop
371            .set_last_will(srad_client::LastWill::new_node(
372                &self.node.state.group_id,
373                &self.node.state.edge_node_id,
374                self.node.generate_death_payload(),
375            ));
376    }
377
378    fn on_online(&self) {
379        if self.node.state.online.swap(true, Ordering::SeqCst) {
380            return;
381        }
382        info!("Edge node online");
383        let sub_topics = self.node.state.sub_topics();
384        let node = self.node.clone();
385
386        tokio::spawn(async move {
387            if node.client.subscribe_many(sub_topics).await.is_ok() {
388                node.birth(BirthType::Birth).await
389            };
390        });
391    }
392
393    async fn on_offline(&mut self) {
394        if !self.node.state.online.swap(false, Ordering::SeqCst) {
395            return;
396        }
397        info!("Edge node offline");
398        self.node.devices.on_offline().await;
399        self.node.state.bdseq.fetch_add(1, Ordering::SeqCst);
400        self.update_last_will();
401    }
402
403    fn on_node_message(&self, message: NodeMessage) {
404        let payload = message.message.payload;
405        let message_kind = message.message.kind;
406
407        if message_kind == MessageKind::Cmd {
408            let mut rebirth = false;
409            for x in &payload.metrics {
410                if x.alias.is_some() {
411                    continue;
412                }
413
414                let metric_name = match &x.name {
415                    Some(name) => name,
416                    None => continue,
417                };
418
419                if metric_name != NODE_CONTROL_REBIRTH {
420                    continue;
421                }
422
423                rebirth = match &x.value {
424                    Some(Value::BooleanValue(val)) => *val,
425                    _ => false,
426                };
427
428                if !rebirth {
429                    warn!("Received invalid CMD Rebirth metric - ignoring request")
430                }
431            }
432
433            let message_metrics: MessageMetrics = match payload.try_into() {
434                Ok(metrics) => metrics,
435                Err(_) => {
436                    warn!("Received invalid CMD payload - ignoring request");
437                    return;
438                }
439            };
440
441            let node = self.node.clone();
442            task::spawn(async move {
443                node.metric_manager
444                    .on_ncmd(NodeHandle { node: node.clone() }, message_metrics)
445                    .await;
446                if rebirth {
447                    info!("Got Rebirth CMD - Rebirthing Node");
448                    node.birth(BirthType::Rebirth).await
449                }
450            });
451        }
452    }
453
454    fn on_device_message(&self, message: DeviceMessage) {
455        let node = self.node.clone();
456        task::spawn(async move {
457            node.devices.handle_device_message(message).await;
458        });
459    }
460
461    async fn handle_event(&mut self, event: Event) {
462        match event {
463            Event::Online => self.on_online(),
464            Event::Offline => self.on_offline().await,
465            Event::Node(node_message) => self.on_node_message(node_message),
466            Event::Device(device_message) => self.on_device_message(device_message),
467            Event::State {
468                host_id: _,
469                payload: _,
470            } => (),
471            Event::InvalidPublish {
472                reason: _,
473                topic: _,
474                payload: _,
475            } => (),
476        }
477    }
478
479    async fn poll_until_offline(&mut self) -> bool {
480        while self.node.state.is_online() {
481            if Event::Offline == self.eventloop.poll().await {
482                self.on_offline().await
483            }
484        }
485        true
486    }
487
488    async fn poll_until_offline_with_timeout(&mut self) {
489        _ = timeout(Duration::from_secs(1), self.poll_until_offline()).await;
490    }
491
492    /// Run the Edge Node
493    ///
494    /// Runs the Edge Node until [NodeHandle::cancel()] is called
495    pub async fn run(&mut self) {
496        info!("Edge node running");
497        self.update_last_will();
498        loop {
499            select! {
500              event = self.eventloop.poll() => self.handle_event(event).await,
501              Some(_) = self.stop_rx.recv() => break,
502            }
503        }
504        self.poll_until_offline_with_timeout().await;
505        self.on_offline().await;
506        info!("Edge node stopped");
507    }
508}