1use std::{
2 collections::HashMap,
3 sync::{
4 atomic::{AtomicBool, AtomicU8, Ordering},
5 Arc, Mutex,
6 },
7 time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use log::{debug, error, info, warn};
11use srad_client::{DeviceMessage, DynClient, DynEventLoop, Event, LastWill, Message, MessageKind};
12use srad_types::{
13 constants::{self, BDSEQ, NODE_CONTROL_REBIRTH},
14 payload::{self, metric::Value, DataType, Payload},
15 topic::{
16 DeviceMessage as DeviceMessageType, DeviceTopic, NodeMessage as NodeMessageType, NodeTopic,
17 QoS, StateTopic, Topic, TopicFilter,
18 },
19 utils::timestamp,
20 MetricValue, Template, TemplateDefinition,
21};
22use thiserror::Error;
23use tokio::{
24 select,
25 sync::{mpsc, oneshot},
26 time::timeout,
27};
28
29use crate::{
30 birth::BirthObjectType,
31 device::{DeviceMap, DeviceRegistrationError},
32 metric_manager::manager::DynNodeMetricManager,
33 BirthInitializer, BirthMetricDetails, BirthType, DeviceHandle, DeviceMetricManager, EoNBuilder,
34 MessageMetrics, MetricPublisher, PublishError, PublishMetric, StateError,
35};
36
/// Runtime settings consumed by the edge node task.
pub(crate) struct EoNConfig {
    /// Minimum time that must pass between two honoured NCMD rebirth requests;
    /// a request arriving sooner is logged and ignored.
    node_rebirth_request_cooldown: Duration,
}
40
/// Session flags and sequence counter that are always read and written
/// together under the mutex held by `EoNState`.
struct EoNStateInner {
    /// Last sequence number handed out; reset to zero when a birth starts.
    seq: u8,
    /// Whether the node is currently flagged as online.
    online: bool,
    /// Whether the node birth for the current session was published successfully.
    birthed: bool,
}
46
47pub(crate) struct EoNState {
48 running: AtomicBool,
49 bdseq: AtomicU8,
50 inner: Mutex<EoNStateInner>,
51 pub group_id: String,
52 pub edge_node_id: String,
53 pub ndata_topic: NodeTopic,
54}
55
56impl EoNState {
57 pub(crate) fn get_next_seq(&self) -> Result<u64, StateError> {
58 let mut state = self.inner.lock().unwrap();
59 if !state.online {
60 return Err(StateError::Offline);
61 }
62 if !state.birthed {
63 return Err(StateError::UnBirthed);
64 }
65 state.seq = state.seq.wrapping_add(1);
66 Ok(state.seq as u64)
67 }
68
69 fn online_swap(&self, online: bool) -> bool {
70 let mut state = self.inner.lock().unwrap();
71 let old_online_state = state.online;
72 state.online = online;
73 old_online_state
74 }
75
76 fn is_online(&self) -> bool {
77 self.inner.lock().unwrap().online
78 }
79
80 fn set_dead(&self) {
81 let mut state = self.inner.lock().unwrap();
82 state.birthed = false;
83 }
84
85 fn birthed(&self) -> bool {
86 self.inner.lock().unwrap().birthed
87 }
88
89 fn start_birth(&self) {
90 let mut state = self.inner.lock().unwrap();
91 state.birthed = false;
92 state.seq = 0;
93 }
94
95 fn birth_completed(&self) {
96 self.inner.lock().unwrap().birthed = true
97 }
98
99 fn birth_topic(&self) -> NodeTopic {
100 NodeTopic::new(&self.group_id, NodeMessageType::NBirth, &self.edge_node_id)
101 }
102
103 fn generate_death_payload(&self) -> Payload {
104 let mut metric = srad_types::payload::Metric::new();
105 metric
106 .set_name(constants::BDSEQ.to_string())
107 .set_value(MetricValue::from(self.bdseq.load(Ordering::SeqCst) as i64).into());
108 Payload {
109 seq: None,
110 metrics: vec![metric],
111 uuid: None,
112 timestamp: None,
113 body: None,
114 }
115 }
116
117 fn create_last_will(&self) -> LastWill {
118 LastWill::new_node(
119 &self.group_id,
120 &self.edge_node_id,
121 self.generate_death_payload(),
122 )
123 }
124
125 fn sub_topics(&self) -> Vec<TopicFilter> {
126 vec![
127 TopicFilter::new_with_qos(
128 Topic::NodeTopic(NodeTopic::new(
129 &self.group_id,
130 NodeMessageType::NCmd,
131 &self.edge_node_id,
132 )),
133 QoS::AtLeastOnce,
134 ),
135 TopicFilter::new_with_qos(
136 Topic::DeviceTopic(DeviceTopic::new(
137 &self.group_id,
138 DeviceMessageType::DCmd,
139 &self.edge_node_id,
140 "+",
141 )),
142 QoS::AtLeastOnce,
143 ),
144 TopicFilter::new_with_qos(Topic::State(StateTopic::new()), QoS::AtLeastOnce),
145 ]
146 }
147}
148
/// Marker message sent over the stop channel to make the `EoN` event loop
/// leave its polling loop.
#[derive(Debug)]
struct EoNShutdown;
151
152#[derive(Clone)]
157pub struct NodeHandle {
158 state: Arc<EoNState>,
159 client: Arc<DynClient>,
160 devices: Arc<Mutex<DeviceMap>>,
161 stop_tx: mpsc::Sender<EoNShutdown>,
162 rebirth_tx: mpsc::Sender<()>,
163}
164
165impl NodeHandle {
166 pub async fn cancel(&self) {
170 if !self.state.running.load(Ordering::SeqCst) {
171 return;
172 }
173 info!("Edge node stopping. node={}", self.state.edge_node_id);
174 let topic = NodeTopic::new(
175 &self.state.group_id,
176 NodeMessageType::NDeath,
177 &self.state.edge_node_id,
178 );
179 let payload = self.state.generate_death_payload();
180 match self.client.try_publish_node_message(topic, payload).await {
181 Ok(_) => (),
182 Err(_) => debug!(
183 "Unable to publish node death certificate on exit. node={}",
184 self.state.edge_node_id
185 ),
186 };
187 _ = self.stop_tx.send(EoNShutdown).await;
188 _ = self.client.disconnect().await;
189 }
190
191 pub fn rebirth(&self) {
193 _ = self.rebirth_tx.try_send(());
195 }
196
197 pub fn register_device<S, M>(
203 &self,
204 name: S,
205 dev_impl: M,
206 ) -> Result<DeviceHandle, DeviceRegistrationError>
207 where
208 S: Into<String>,
209 M: DeviceMetricManager + Send + Sync + 'static,
210 {
211 let name = name.into();
212 if let Err(e) = srad_types::utils::validate_name(&name) {
213 return Err(DeviceRegistrationError::InvalidName(e));
214 }
215 let handle = self.devices.lock().unwrap().add_device(
216 &self.state.group_id,
217 &self.state.edge_node_id,
218 name,
219 Box::new(dev_impl),
220 self.state.clone(),
221 self.client.clone(),
222 )?;
223 Ok(handle)
224 }
225
226 pub async fn unregister_device(&self, handle: DeviceHandle) {
228 self.unregister_device_named(&handle.state.name).await;
229 }
230
231 pub async fn unregister_device_named(&self, name: &String) {
233 self.devices.lock().unwrap().remove_device(name)
234 }
235
236 fn publish_metrics_to_payload(&self, seq: u64, metrics: Vec<PublishMetric>) -> Payload {
237 let timestamp = timestamp();
238 let mut payload_metrics = Vec::with_capacity(metrics.len());
239 for x in metrics.into_iter() {
240 payload_metrics.push(x.into());
241 }
242 Payload {
243 timestamp: Some(timestamp),
244 metrics: payload_metrics,
245 seq: Some(seq),
246 uuid: None,
247 body: None,
248 }
249 }
250}
251
252impl MetricPublisher for NodeHandle {
253 async fn try_publish_metrics_unsorted(
254 &self,
255 metrics: Vec<PublishMetric>,
256 ) -> Result<(), PublishError> {
257 if metrics.is_empty() {
258 return Err(PublishError::NoMetrics);
259 }
260 match self
261 .client
262 .try_publish_node_message(
263 self.state.ndata_topic.clone(),
264 self.publish_metrics_to_payload(self.state.get_next_seq()?, metrics),
265 )
266 .await
267 {
268 Ok(_) => Ok(()),
269 Err(_) => Err(PublishError::State(StateError::Offline)),
270 }
271 }
272
273 async fn publish_metrics_unsorted(
274 &self,
275 metrics: Vec<PublishMetric>,
276 ) -> Result<(), PublishError> {
277 if metrics.is_empty() {
278 return Err(PublishError::NoMetrics);
279 }
280 match self
281 .client
282 .publish_node_message(
283 self.state.ndata_topic.clone(),
284 self.publish_metrics_to_payload(self.state.get_next_seq()?, metrics),
285 )
286 .await
287 {
288 Ok(_) => Ok(()),
289 Err(_) => Err(PublishError::State(StateError::Offline)),
290 }
291 }
292}
293
294#[derive(Debug, Error)]
295pub enum TemplateRegistryError {
296 #[error("Invalid template name")]
297 InvalidName,
298 #[error("Duplicate template. A template with that name is already registered")]
299 Duplicate,
300 #[error("The Templates Definition is invalid")]
301 InvalidDefinition,
302 #[error("The Templates Definition contained a template that has not been registered")]
303 UnregisteredMetric,
304}
305
306#[derive(Debug, Clone)]
320pub struct TemplateRegistry {
321 templates: HashMap<String, TemplateDefinition>,
322}
323
324impl TemplateRegistry {
325 pub(crate) fn new() -> Self {
326 Self {
327 templates: HashMap::new(),
328 }
329 }
330
331 pub fn clear(&mut self) {
333 self.templates.clear();
334 }
335
336 pub fn deregister(&mut self, name: &str) {
338 self.templates.remove(name);
339 }
340
341 fn check_template_metrics(
343 &self,
344 metrics: &Vec<payload::Metric>,
345 ) -> Result<(), TemplateRegistryError> {
346 for x in metrics {
347 let datatype = match &x.datatype {
348 Some(t) => match DataType::try_from(*t) {
349 Ok(datatype) => datatype,
350 Err(_) => return Err(TemplateRegistryError::InvalidDefinition),
351 },
352 None => return Err(TemplateRegistryError::InvalidDefinition),
353 };
354 if datatype != DataType::Template {
355 continue;
356 }
357
358 if let Value::TemplateValue(template) = x
359 .value
360 .as_ref()
361 .ok_or(TemplateRegistryError::InvalidDefinition)?
362 {
363 let ref_name = template
364 .template_ref
365 .as_ref()
366 .ok_or(TemplateRegistryError::InvalidDefinition)?;
367 if self.templates.contains_key(ref_name) {
368 return Ok(());
369 }
370 self.check_template_metrics(&template.metrics)?;
371 } else {
372 return Err(TemplateRegistryError::InvalidDefinition);
373 }
374 }
375 Ok(())
376 }
377
378 pub fn register<T: Template>(&mut self) -> Result<(), TemplateRegistryError> {
380 let name = T::template_definition_metric_name();
381 if name == NODE_CONTROL_REBIRTH || name == BDSEQ {
382 return Err(TemplateRegistryError::InvalidName);
383 }
384
385 if self.templates.contains_key(&name) {
386 return Err(TemplateRegistryError::Duplicate);
387 }
388
389 let definition = T::template_definition();
390 self.check_template_metrics(&definition.metrics)?;
391
392 self.templates.insert(name, definition);
393 Ok(())
394 }
395
396 pub fn contains(&self, template_definition_metric_name: &str) -> bool {
398 self.templates.contains_key(template_definition_metric_name)
399 }
400}
401
402struct Node {
403 metric_manager: Box<DynNodeMetricManager>,
404 client: Arc<DynClient>,
405 devices: Arc<Mutex<DeviceMap>>,
406 state: Arc<EoNState>,
407 config: Arc<EoNConfig>,
408 stop_tx: mpsc::Sender<EoNShutdown>,
409 last_node_rebirth_request: Duration,
410
411 template_registry: Arc<TemplateRegistry>,
412
413 rebirth_request_tx: mpsc::Sender<()>,
414
415 node_message_rx: mpsc::UnboundedReceiver<Message>,
416 client_state_rx: mpsc::Receiver<ClientStateMessage>,
417 rebirth_request_rx: mpsc::Receiver<()>,
418}
419
420impl Node {
421 fn generate_birth_payload(&self, bdseq: i64, seq: u64) -> Payload {
422 let timestamp = timestamp();
423 let mut birth_initializer =
424 BirthInitializer::new(BirthObjectType::Node, self.template_registry.clone());
425
426 birth_initializer
427 .register_metric(
428 BirthMetricDetails::new_with_initial_value(constants::BDSEQ, bdseq)
429 .use_alias(false),
430 )
431 .unwrap();
432 birth_initializer
433 .register_metric(
434 BirthMetricDetails::new_with_initial_value(constants::NODE_CONTROL_REBIRTH, false)
435 .use_alias(false),
436 )
437 .unwrap();
438
439 for (name, template_definition) in &self.template_registry.templates {
440 birth_initializer
441 .register_template_definition(name.clone(), template_definition.clone())
442 .unwrap();
443 }
444
445 self.metric_manager.initialise_birth(&mut birth_initializer);
446 let metrics = birth_initializer.finish();
447
448 Payload {
449 seq: Some(seq),
450 timestamp: Some(timestamp),
451 metrics,
452 uuid: None,
453 body: None,
454 }
455 }
456
457 async fn node_birth(&mut self) -> Result<(), ()> {
458 self.state.start_birth();
460
461 let bdseq = self.state.bdseq.load(Ordering::SeqCst) as i64;
462
463 let mut updatable_template_registry = self.template_registry.as_ref().clone();
465 self.metric_manager
466 .birth_update_template_registry(&mut updatable_template_registry);
467 self.template_registry = Arc::new(updatable_template_registry);
468
469 let payload = self.generate_birth_payload(bdseq, 0);
470 let topic = self.state.birth_topic();
471 match self.client.publish_node_message(topic, payload).await {
472 Ok(_) => {
473 self.state.birth_completed();
474 Ok(())
475 }
476 Err(_) => {
477 error!(
478 "Publishing node birth message failed. node={}",
479 self.state.edge_node_id
480 );
481 Err(())
482 }
483 }
484 }
485
486 async fn birth(&mut self, birth_type: BirthType) {
487 info!(
488 "Birthing Node. node={} type={birth_type:?}",
489 self.state.edge_node_id
490 );
491 if self.node_birth().await.is_err() {
492 return;
493 }
494 self.devices
495 .lock()
496 .unwrap()
497 .birth_devices(birth_type, &self.template_registry);
498 }
499
500 async fn rebirth(&mut self) {
501 if !self.state.birthed() {
502 return;
503 }
504 self.birth(BirthType::Rebirth).await;
505 }
506
507 fn death(&self) {
508 self.state.set_dead();
509 self.state.bdseq.fetch_add(1, Ordering::SeqCst);
510 self.devices.lock().unwrap().on_death();
511 }
512
513 async fn on_online(&mut self) {
514 if self.state.online_swap(true) {
515 return;
516 }
517
518 info!("Edge node online. node={}", self.state.edge_node_id);
519 let sub_topics = self.state.sub_topics();
520
521 if self.client.subscribe_many(sub_topics).await.is_ok() {
522 self.birth(BirthType::Birth).await
523 };
524 }
525
526 fn on_offline(&self, will_sender: oneshot::Sender<LastWill>) {
527 if !self.state.online_swap(false) {
528 return;
529 }
530
531 info!("Edge node offline. node={}", self.state.edge_node_id);
532 self.death();
533 let new_lastwill = self.state.create_last_will();
534 _ = will_sender.send(new_lastwill);
535 }
536
537 async fn on_sparkplug_message(&mut self, message: Message, handle: NodeHandle) {
538 let payload = message.payload;
539 let message_kind = message.kind;
540
541 if message_kind == MessageKind::Cmd {
542 let mut rebirth = false;
543 for x in &payload.metrics {
544 if x.alias.is_some() {
545 continue;
546 }
547
548 let metric_name = match &x.name {
549 Some(name) => name,
550 None => continue,
551 };
552
553 if metric_name != NODE_CONTROL_REBIRTH {
554 continue;
555 }
556
557 rebirth = match &x.value {
558 Some(Value::BooleanValue(val)) => *val,
559 _ => false,
560 };
561
562 if !rebirth {
563 warn!(
564 "Received invalid NCMD Rebirth metric - ignoring request. node={}",
565 self.state.edge_node_id
566 )
567 }
568 }
569
570 let message_metrics: MessageMetrics = match payload.try_into() {
571 Ok(metrics) => metrics,
572 Err(_) => {
573 warn!(
574 "Received invalid CMD payload - ignoring request. node={}",
575 self.state.edge_node_id
576 );
577 return;
578 }
579 };
580
581 self.metric_manager.on_ncmd(handle, message_metrics).await;
582 if rebirth {
583 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
584 let time_since_last = now - self.last_node_rebirth_request;
585 if time_since_last < self.config.node_rebirth_request_cooldown {
586 info!(
587 "Got Rebirth CMD but cooldown time not expired. Ignoring. node={}",
588 self.state.edge_node_id
589 );
590 return;
591 }
592 info!(
593 "Got Rebirth CMD - Rebirthing Node. node={}",
594 self.state.edge_node_id
595 );
596 self.rebirth().await;
597 self.last_node_rebirth_request = now;
598 }
599 }
600 }
601
602 fn create_node_handle(&self) -> NodeHandle {
603 NodeHandle {
604 state: self.state.clone(),
605 client: self.client.clone(),
606 devices: self.devices.clone(),
607 stop_tx: self.stop_tx.clone(),
608 rebirth_tx: self.rebirth_request_tx.clone(),
609 }
610 }
611
612 async fn run(mut self) {
613 loop {
614 select! {
615 biased;
616 maybe_state_update = self.client_state_rx.recv() => match maybe_state_update {
617 Some (state_update) => match state_update {
618 ClientStateMessage::Online => self.on_online().await,
619 ClientStateMessage::Offline(sender) => self.on_offline(sender),
620 ClientStateMessage::Stopped => break
621 },
622 None => break, },
624 Some(_) = self.rebirth_request_rx.recv() => self.rebirth().await,
625 maybe_message = self.node_message_rx.recv() => match maybe_message {
626 Some(message) => self.on_sparkplug_message(message, self.create_node_handle()).await,
627 None => break, },
629 }
630 }
631 }
632}
633
634enum ClientStateMessage {
635 Stopped,
636 Online,
637 Offline(oneshot::Sender<LastWill>),
638}
639
640pub struct EoN {
644 eventloop: Box<DynEventLoop>,
645 stop_rx: mpsc::Receiver<EoNShutdown>,
646 node_message_tx: mpsc::UnboundedSender<Message>,
647 client_state_tx: mpsc::Sender<ClientStateMessage>,
648 state: Arc<EoNState>,
649 devices: Arc<Mutex<DeviceMap>>,
650}
651
652impl EoN {
653 pub(crate) fn new_from_builder(builder: EoNBuilder) -> Result<(Self, NodeHandle), String> {
654 let group_id = builder
655 .group_id
656 .ok_or("group id must be provided".to_string())?;
657 let node_id = builder
658 .node_id
659 .ok_or("node id must be provided".to_string())?;
660 srad_types::utils::validate_name(&group_id)?;
661 srad_types::utils::validate_name(&node_id)?;
662
663 let metric_manager = builder.metric_manager;
664 let (eventloop, client) = builder.eventloop_client;
665 let (stop_tx, stop_rx) = mpsc::channel(1);
666
667 let state = Arc::new(EoNState {
668 running: AtomicBool::new(false),
669 bdseq: AtomicU8::new(0),
670 inner: Mutex::new(EoNStateInner {
671 seq: 0,
672 online: false,
673 birthed: false,
674 }),
675 ndata_topic: NodeTopic::new(&group_id, NodeMessageType::NData, &node_id),
676 group_id,
677 edge_node_id: node_id,
678 });
679
680 let template_registry = Arc::new(builder.templates);
681
682 let devices = Arc::new(Mutex::new(DeviceMap::new(template_registry.clone())));
683
684 let (node_message_tx, node_message_rx) = mpsc::unbounded_channel();
685 let (rebirth_request_tx, rebirth_request_rx) = mpsc::channel(1);
686 let (client_state_tx, client_state_rx) = mpsc::channel(1);
687
688 let node = Node {
689 metric_manager,
690 template_registry,
691 client: client.clone(),
692 state: state.clone(),
693 devices: devices.clone(),
694 stop_tx,
695 config: Arc::new(EoNConfig {
696 node_rebirth_request_cooldown: builder.node_rebirth_request_cooldown,
697 }),
698 last_node_rebirth_request: Duration::new(0, 0),
699 node_message_rx,
700 rebirth_request_rx,
701 rebirth_request_tx,
702 client_state_rx,
703 };
704
705 let eon = Self {
706 eventloop,
707 stop_rx,
708 node_message_tx,
709 client_state_tx,
710 state,
711 devices,
712 };
713
714 let handle = node.create_node_handle();
715
716 node.metric_manager.init(&handle);
717
718 tokio::spawn(async move { node.run().await });
719
720 Ok((eon, handle))
721 }
722
723 fn update_last_will(&mut self, lastwill: LastWill) {
724 self.eventloop.set_last_will(lastwill);
725 }
726
727 async fn on_online(&mut self) {
728 _ = self.client_state_tx.send(ClientStateMessage::Online).await;
729 }
730
731 async fn on_offline(&mut self) {
732 let (lastwill_tx, lastwill_rx) = oneshot::channel();
733 _ = self
734 .client_state_tx
735 .send(ClientStateMessage::Offline(lastwill_tx))
736 .await;
737 if let Ok(will) = lastwill_rx.await {
738 self.update_last_will(will)
739 }
740 }
741
742 fn on_node_message(&mut self, message: Message) {
743 _ = self.node_message_tx.send(message)
744 }
745
746 fn on_device_message(&mut self, message: DeviceMessage) {
747 self.devices.lock().unwrap().handle_device_message(message);
748 }
749
750 async fn handle_event(&mut self, event: Event) {
751 match event {
752 Event::Online => self.on_online().await,
753 Event::Offline => self.on_offline().await,
754 Event::Node(node_message) => self.on_node_message(node_message.message),
755 Event::Device(device_message) => self.on_device_message(device_message),
756 Event::State {
757 host_id: _,
758 payload: _,
759 } => (),
760 Event::InvalidPublish {
761 reason: _,
762 topic: _,
763 payload: _,
764 } => (),
765 }
766 }
767
768 async fn poll_until_offline(&mut self) -> bool {
769 while self.state.is_online() {
770 if Event::Offline == self.eventloop.poll().await {
771 self.on_offline().await;
772 break;
773 }
774 }
775 true
776 }
777
778 pub async fn run(mut self) {
782 info!("Edge node running. node={}", self.state.edge_node_id);
783 self.state.running.store(true, Ordering::SeqCst);
784
785 self.update_last_will(self.state.create_last_will());
786
787 loop {
788 select! {
789 event = self.eventloop.poll() => self.handle_event(event).await,
790 Some(_) = self.stop_rx.recv() => break,
791 }
792 }
793
794 if timeout(Duration::from_secs(1), self.poll_until_offline())
795 .await
796 .is_err()
797 {
798 self.on_offline().await;
799 }
800
801 _ = self.client_state_tx.send(ClientStateMessage::Stopped).await;
802 info!("Edge node stopped. node={}", self.state.edge_node_id);
803 self.state.running.store(false, Ordering::SeqCst);
804 }
805}