1use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use log::{debug, error, info, warn};
6use srad_client::{DeviceMessage, DynClient, DynEventLoop, MessageKind};
7use srad_client::{Event, NodeMessage};
8
9use srad_types::constants::NODE_CONTROL_REBIRTH;
10use srad_types::payload::metric::Value;
11use srad_types::topic::{DeviceTopic, NodeTopic, QoS, StateTopic, Topic, TopicFilter};
12use srad_types::utils::timestamp;
13use srad_types::MetricValue;
14use srad_types::{
15 constants,
16 payload::Payload,
17 topic::{DeviceMessage as DeviceMessageType, NodeMessage as NodeMessageType},
18};
19use tokio::time::timeout;
20
21use crate::birth::{BirthInitializer, BirthMetricDetails, BirthObjectType};
22use crate::builder::EoNBuilder;
23use crate::device::{DeviceHandle, DeviceMap};
24use crate::error::DeviceRegistrationError;
25use crate::metric::{MessageMetrics, MetricPublisher, PublishError, PublishMetric};
26use crate::metric_manager::manager::{DeviceMetricManager, DynNodeMetricManager};
27use crate::registry::Registry;
28use crate::BirthType;
29
30use tokio::{select, sync::mpsc, task};
31
/// Unit message sent on the internal stop channel: `NodeHandle::cancel` sends it and the
/// `EoN::run` loop leaves its event-polling loop when it receives one.
#[derive(Debug)]
struct EoNShutdown;
34
35#[derive(Clone)]
40pub struct NodeHandle {
41 node: Arc<Node>,
42}
43
44impl NodeHandle {
45 pub async fn cancel(&self) {
49 info!("Edge node stopping");
50 let topic = NodeTopic::new(
51 &self.node.state.group_id,
52 NodeMessageType::NDeath,
53 &self.node.state.edge_node_id,
54 );
55 let payload = self.node.generate_death_payload();
56 match self
57 .node
58 .client
59 .try_publish_node_message(topic, payload)
60 .await
61 {
62 Ok(_) => (),
63 Err(_) => debug!("Unable to publish node death certificate on exit"),
64 };
65 _ = self.node.stop_tx.send(EoNShutdown).await;
66 _ = self.node.client.disconnect().await;
67 }
68
69 pub async fn rebirth(&self) {
71 self.node.birth(BirthType::Rebirth).await;
72 }
73
74 pub async fn register_device<S, M>(
80 &self,
81 name: S,
82 dev_impl: M,
83 ) -> Result<DeviceHandle, DeviceRegistrationError>
84 where
85 S: Into<String>,
86 M: DeviceMetricManager + Send + Sync + 'static,
87 {
88 let name = name.into();
89 if let Err(e) = srad_types::utils::validate_name(&name) {
90 return Err(DeviceRegistrationError::InvalidName(e));
91 }
92 let handle = self
93 .node
94 .devices
95 .add_device(
96 &self.node.state.group_id,
97 &self.node.state.edge_node_id,
98 name,
99 Arc::new(dev_impl),
100 )
101 .await?;
102 Ok(handle)
103 }
104
105 pub async fn unregister_device(&self, handle: DeviceHandle) {
107 self.unregister_device_named(&handle.device.info.name).await;
108 }
109
110 pub async fn unregister_device_named(&self, name: &String) {
112 self.node.devices.remove_device(name).await
113 }
114
115 fn check_publish_state(&self) -> Result<(), PublishError> {
116 if !self.node.state.is_online() {
117 return Err(PublishError::Offline);
118 }
119 if !self.node.state.birthed() {
120 return Err(PublishError::UnBirthed);
121 }
122 Ok(())
123 }
124
125 fn publish_metrics_to_payload(&self, metrics: Vec<PublishMetric>) -> Payload {
126 let timestamp = timestamp();
127 let mut payload_metrics = Vec::with_capacity(metrics.len());
128 for x in metrics.into_iter() {
129 payload_metrics.push(x.into());
130 }
131 Payload {
132 timestamp: Some(timestamp),
133 metrics: payload_metrics,
134 seq: Some(self.node.state.get_seq()),
135 uuid: None,
136 body: None,
137 }
138 }
139}
140
141impl MetricPublisher for NodeHandle {
142 async fn try_publish_metrics_unsorted(
143 &self,
144 metrics: Vec<PublishMetric>,
145 ) -> Result<(), PublishError> {
146 if metrics.is_empty() {
147 return Err(PublishError::NoMetrics);
148 }
149 self.check_publish_state()?;
150 match self
151 .node
152 .client
153 .try_publish_node_message(
154 self.node.state.ndata_topic.clone(),
155 self.publish_metrics_to_payload(metrics),
156 )
157 .await
158 {
159 Ok(_) => Ok(()),
160 Err(_) => Err(PublishError::Offline),
161 }
162 }
163
164 async fn publish_metrics_unsorted(
165 &self,
166 metrics: Vec<PublishMetric>,
167 ) -> Result<(), PublishError> {
168 if metrics.is_empty() {
169 return Err(PublishError::NoMetrics);
170 }
171 self.check_publish_state()?;
172 match self
173 .node
174 .client
175 .publish_node_message(
176 self.node.state.ndata_topic.clone(),
177 self.publish_metrics_to_payload(metrics),
178 )
179 .await
180 {
181 Ok(_) => Ok(()),
182 Err(_) => Err(PublishError::Offline),
183 }
184 }
185}
186
187pub(crate) struct EoNState {
188 bdseq: AtomicU8,
189 seq: AtomicU8,
190 online: AtomicBool,
191 birthed: AtomicBool,
192 pub group_id: String,
193 pub edge_node_id: String,
194 pub ndata_topic: NodeTopic,
195}
196
197impl EoNState {
198 pub(crate) fn get_seq(&self) -> u64 {
199 self.seq.fetch_add(1, Ordering::Relaxed) as u64
200 }
201
202 pub(crate) fn is_online(&self) -> bool {
203 self.online.load(Ordering::SeqCst)
204 }
205
206 pub(crate) fn birthed(&self) -> bool {
207 self.birthed.load(Ordering::SeqCst)
208 }
209
210 fn birth_topic(&self) -> NodeTopic {
211 NodeTopic::new(&self.group_id, NodeMessageType::NBirth, &self.edge_node_id)
212 }
213
214 fn sub_topics(&self) -> Vec<TopicFilter> {
215 vec![
216 TopicFilter::new_with_qos(
217 Topic::NodeTopic(NodeTopic::new(
218 &self.group_id,
219 NodeMessageType::NCmd,
220 &self.edge_node_id,
221 )),
222 QoS::AtLeastOnce,
223 ),
224 TopicFilter::new_with_qos(
225 Topic::DeviceTopic(DeviceTopic::new(
226 &self.group_id,
227 DeviceMessageType::DCmd,
228 &self.edge_node_id,
229 "+",
230 )),
231 QoS::AtLeastOnce,
232 ),
233 TopicFilter::new_with_qos(Topic::State(StateTopic::new()), QoS::AtLeastOnce),
234 ]
235 }
236}
237
238pub struct Node {
239 state: Arc<EoNState>,
240 metric_manager: Box<DynNodeMetricManager>,
241 devices: DeviceMap,
242 client: Arc<DynClient>,
243 stop_tx: mpsc::Sender<EoNShutdown>,
244}
245
246impl Node {
247 fn generate_death_payload(&self) -> Payload {
248 let mut metric = srad_types::payload::Metric::new();
249 metric
250 .set_name(constants::BDSEQ.to_string())
251 .set_value(MetricValue::from(self.state.bdseq.load(Ordering::SeqCst) as i64).into());
252 Payload {
253 seq: None,
254 metrics: vec![metric],
255 uuid: None,
256 timestamp: None,
257 body: None,
258 }
259 }
260
261 fn generate_birth_payload(&self, bdseq: i64, seq: u64) -> Payload {
262 let timestamp = timestamp();
263 let mut birth_initializer = BirthInitializer::new(BirthObjectType::Node);
264 birth_initializer
265 .register_metric(
266 BirthMetricDetails::new_with_initial_value(constants::BDSEQ, bdseq)
267 .use_alias(false),
268 )
269 .unwrap();
270 birth_initializer
271 .register_metric(
272 BirthMetricDetails::new_with_initial_value(constants::NODE_CONTROL_REBIRTH, false)
273 .use_alias(false),
274 )
275 .unwrap();
276
277 self.metric_manager.initialise_birth(&mut birth_initializer);
278 let metrics = birth_initializer.finish();
279
280 Payload {
281 seq: Some(seq),
282 timestamp: Some(timestamp),
283 metrics,
284 uuid: None,
285 body: None,
286 }
287 }
288
289 async fn node_birth(&self) {
290 self.state.birthed.store(false, Ordering::SeqCst);
292 self.state.seq.store(0, Ordering::SeqCst);
293 let bdseq = self.state.bdseq.load(Ordering::SeqCst) as i64;
294
295 let payload = self.generate_birth_payload(bdseq, 0);
296 let topic = self.state.birth_topic();
297 self.state.seq.store(1, Ordering::SeqCst);
298 match self.client.publish_node_message(topic, payload).await {
299 Ok(_) => self.state.birthed.store(true, Ordering::SeqCst),
300 Err(_) => error!("Publishing birth message failed"),
301 }
302 }
303
304 async fn birth(&self, birth_type: BirthType) {
305 info!("Birthing Node. Type: {:?}", birth_type);
306 self.node_birth().await;
307 self.devices.birth_devices(birth_type).await;
308 }
309}
310
311pub struct EoN {
313 node: Arc<Node>,
314 eventloop: Box<DynEventLoop>,
315 stop_rx: mpsc::Receiver<EoNShutdown>,
316}
317
318impl EoN {
319 pub(crate) fn new_from_builder(builder: EoNBuilder) -> Result<(Self, NodeHandle), String> {
320 let group_id = builder
321 .group_id
322 .ok_or("group id must be provided".to_string())?;
323 let node_id = builder
324 .node_id
325 .ok_or("node id must be provided".to_string())?;
326 srad_types::utils::validate_name(&group_id)?;
327 srad_types::utils::validate_name(&node_id)?;
328
329 let metric_manager = builder.metric_manager;
330 let (eventloop, client) = builder.eventloop_client;
331
332 let (stop_tx, stop_rx) = mpsc::channel(1);
333
334 let state = Arc::new(EoNState {
335 seq: AtomicU8::new(0),
336 bdseq: AtomicU8::new(0),
337 online: AtomicBool::new(false),
338 birthed: AtomicBool::new(false),
339 ndata_topic: NodeTopic::new(&group_id, NodeMessageType::NData, &node_id),
340 group_id,
341 edge_node_id: node_id,
342 });
343
344 let registry = Arc::new(Mutex::new(Registry::new()));
345
346 let node = Arc::new(Node {
347 metric_manager,
348 client: client.clone(),
349 devices: DeviceMap::new(state.clone(), registry.clone(), client),
350 state,
351 stop_tx,
352 });
353
354 let mut eon = Self {
355 node,
356 eventloop,
357 stop_rx,
358 };
359 let handle = NodeHandle {
360 node: eon.node.clone(),
361 };
362 eon.node.metric_manager.init(&handle);
363 eon.update_last_will();
364 Ok((eon, handle))
365 }
366
367 fn update_last_will(&mut self) {
368 self.eventloop
369 .set_last_will(srad_client::LastWill::new_node(
370 &self.node.state.group_id,
371 &self.node.state.edge_node_id,
372 self.node.generate_death_payload(),
373 ));
374 }
375
376 fn on_online(&self) {
377 if self.node.state.online.swap(true, Ordering::SeqCst) {
378 return;
379 }
380 info!("Edge node online");
381 let sub_topics = self.node.state.sub_topics();
382 let node = self.node.clone();
383
384 tokio::spawn(async move {
385 if node.client.subscribe_many(sub_topics).await.is_ok() {
386 node.birth(BirthType::Birth).await
387 };
388 });
389 }
390
391 async fn on_offline(&mut self) {
392 if !self.node.state.online.swap(false, Ordering::SeqCst) {
393 return;
394 }
395 info!("Edge node offline");
396 self.node.devices.on_offline().await;
397 self.node.state.bdseq.fetch_add(1, Ordering::SeqCst);
398 self.update_last_will();
399 }
400
401 fn on_node_message(&self, message: NodeMessage) {
402 let payload = message.message.payload;
403 let message_kind = message.message.kind;
404
405 if message_kind == MessageKind::Cmd {
406 let mut rebirth = false;
407 for x in &payload.metrics {
408 if x.alias.is_some() {
409 continue;
410 }
411
412 let metric_name = match &x.name {
413 Some(name) => name,
414 None => continue,
415 };
416
417 if metric_name != NODE_CONTROL_REBIRTH {
418 continue;
419 }
420
421 rebirth = match &x.value {
422 Some(Value::BooleanValue(val)) => *val,
423 _ => false,
424 };
425
426 if !rebirth {
427 warn!("Received invalid CMD Rebirth metric - ignoring request")
428 }
429 }
430
431 let message_metrics: MessageMetrics = match payload.try_into() {
432 Ok(metrics) => metrics,
433 Err(_) => {
434 warn!("Received invalid CMD payload - ignoring request");
435 return;
436 }
437 };
438
439 let node = self.node.clone();
440 task::spawn(async move {
441 node.metric_manager
442 .on_ncmd(NodeHandle { node: node.clone() }, message_metrics)
443 .await;
444 if rebirth {
445 info!("Got Rebirth CMD - Rebirthing Node");
446 node.birth(BirthType::Rebirth).await
447 }
448 });
449 }
450 }
451
452 fn on_device_message(&self, message: DeviceMessage) {
453 let node = self.node.clone();
454 task::spawn(async move {
455 node.devices.handle_device_message(message).await;
456 });
457 }
458
459 async fn handle_event(&mut self, event: Event) {
460 match event {
461 Event::Online => self.on_online(),
462 Event::Offline => self.on_offline().await,
463 Event::Node(node_message) => self.on_node_message(node_message),
464 Event::Device(device_message) => self.on_device_message(device_message),
465 Event::State {
466 host_id: _,
467 payload: _,
468 } => (),
469 Event::InvalidPublish {
470 reason: _,
471 topic: _,
472 payload: _,
473 } => (),
474 }
475 }
476
477 async fn poll_until_offline(&mut self) -> bool {
478 while self.node.state.is_online() {
479 if Event::Offline == self.eventloop.poll().await {
480 self.on_offline().await
481 }
482 }
483 true
484 }
485
486 async fn poll_until_offline_with_timeout(&mut self) {
487 _ = timeout(Duration::from_secs(1), self.poll_until_offline()).await;
488 }
489
490 pub async fn run(&mut self) {
494 info!("Edge node running");
495 self.update_last_will();
496 loop {
497 select! {
498 event = self.eventloop.poll() => self.handle_event(event).await,
499 Some(_) = self.stop_rx.recv() => break,
500 }
501 }
502 self.poll_until_offline_with_timeout().await;
503 self.on_offline().await;
504 info!("Edge node stopped");
505 }
506}