1use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use log::{debug, error, info, warn};
6use srad_client::{DeviceMessage, DynClient, DynEventLoop, MessageKind};
7use srad_client::{Event, NodeMessage};
8
9use srad_types::constants::NODE_CONTROL_REBIRTH;
10use srad_types::payload::metric::Value;
11use srad_types::topic::{DeviceTopic, NodeTopic, QoS, StateTopic, Topic, TopicFilter};
12use srad_types::utils::timestamp;
13use srad_types::MetricValue;
14use srad_types::{
15 constants,
16 payload::Payload,
17 topic::{DeviceMessage as DeviceMessageType, NodeMessage as NodeMessageType},
18};
19use tokio::time::timeout;
20
21use crate::birth::{BirthInitializer, BirthMetricDetails, BirthObjectType};
22use crate::builder::EoNBuilder;
23use crate::device::{DeviceHandle, DeviceMap};
24use crate::error::DeviceRegistrationError;
25use crate::metric::{MessageMetrics, MetricPublisher, PublishError, PublishMetric};
26use crate::metric_manager::manager::{DeviceMetricManager, DynNodeMetricManager};
27use crate::registry::Registry;
28use crate::BirthType;
29
30use tokio::{select, sync::mpsc, task};
31
/// Marker message sent over the node's internal stop channel.
///
/// [`NodeHandle::cancel`] sends one of these and [`EoN::run`] leaves its event loop as soon
/// as it receives one.
#[derive(Debug)]
struct EoNShutdown;
34
/// Cloneable handle used to control an edge node: stop it, trigger a rebirth, register and
/// unregister devices, and publish node-level metrics.
///
/// All clones share the same underlying [`Node`] through an [`Arc`].
#[derive(Clone)]
pub struct NodeHandle {
    node: Arc<Node>,
}
43
44impl NodeHandle {
45 pub async fn cancel(&self) {
49 info!("Edge node stopping");
50 let topic = NodeTopic::new(
51 &self.node.state.group_id,
52 NodeMessageType::NDeath,
53 &self.node.state.edge_node_id,
54 );
55 let payload = self.node.generate_death_payload();
56 match self
57 .node
58 .client
59 .try_publish_node_message(topic, payload)
60 .await
61 {
62 Ok(_) => (),
63 Err(_) => debug!("Unable to publish node death certificate on exit"),
64 };
65 _ = self.node.stop_tx.send(EoNShutdown).await;
66 _ = self.node.client.disconnect().await;
67 }
68
69 pub async fn rebirth(&self) {
71 self.node.birth(BirthType::Rebirth).await;
72 }
73
74 pub async fn register_device<S, M>(
80 &self,
81 name: S,
82 dev_impl: M,
83 ) -> Result<DeviceHandle, DeviceRegistrationError>
84 where
85 S: Into<String>,
86 M: DeviceMetricManager + Send + Sync + 'static,
87 {
88 let name = name.into();
89 if let Err(e) = srad_types::utils::validate_name(&name) {
90 return Err(DeviceRegistrationError::InvalidName(e));
91 }
92 let handle = self
93 .node
94 .devices
95 .add_device(
96 &self.node.state.group_id,
97 &self.node.state.edge_node_id,
98 name,
99 Arc::new(dev_impl),
100 )
101 .await?;
102 Ok(handle)
103 }
104
105 pub async fn unregister_device(&self, handle: DeviceHandle) {
107 self.unregister_device_named(&handle.device.info.name).await;
108 }
109
110 pub async fn unregister_device_named(&self, name: &String) {
112 self.node.devices.remove_device(name).await
113 }
114
115 fn check_publish_state(&self) -> Result<(), PublishError> {
116 if !self.node.state.is_online() {
117 return Err(PublishError::Offline);
118 }
119 if !self.node.state.birthed() {
120 return Err(PublishError::UnBirthed);
121 }
122 Ok(())
123 }
124
125 fn publish_metrics_to_payload(&self, metrics: Vec<PublishMetric>) -> Payload {
126 let timestamp = timestamp();
127 let mut payload_metrics = Vec::with_capacity(metrics.len());
128 for x in metrics.into_iter() {
129 payload_metrics.push(x.into());
130 }
131 Payload {
132 timestamp: Some(timestamp),
133 metrics: payload_metrics,
134 seq: Some(self.node.state.get_seq()),
135 uuid: None,
136 body: None,
137 }
138 }
139}
140
141impl MetricPublisher for NodeHandle {
142 async fn try_publish_metrics_unsorted(
143 &self,
144 metrics: Vec<PublishMetric>,
145 ) -> Result<(), PublishError> {
146 if metrics.is_empty() {
147 return Err(PublishError::NoMetrics);
148 }
149 self.check_publish_state()?;
150 match self
151 .node
152 .client
153 .try_publish_node_message(
154 self.node.state.ndata_topic.clone(),
155 self.publish_metrics_to_payload(metrics),
156 )
157 .await
158 {
159 Ok(_) => Ok(()),
160 Err(_) => Err(PublishError::Offline),
161 }
162 }
163
164 async fn publish_metrics_unsorted(
165 &self,
166 metrics: Vec<PublishMetric>,
167 ) -> Result<(), PublishError> {
168 if metrics.is_empty() {
169 return Err(PublishError::NoMetrics);
170 }
171 self.check_publish_state()?;
172 match self
173 .node
174 .client
175 .publish_node_message(
176 self.node.state.ndata_topic.clone(),
177 self.publish_metrics_to_payload(metrics),
178 )
179 .await
180 {
181 Ok(_) => Ok(()),
182 Err(_) => Err(PublishError::Offline),
183 }
184 }
185}
186
/// State of an edge node shared (via `Arc`) between the node, its handles and its devices.
pub(crate) struct EoNState {
    /// Birth/death sequence number sent as the bdSeq metric in birth and death payloads.
    /// Incremented each time the node transitions to offline.
    bdseq: AtomicU8,
    /// Payload sequence counter; reset on every node birth and advanced by `get_seq`.
    seq: AtomicU8,
    /// Whether the client connection is currently considered online.
    online: AtomicBool,
    /// Set only after the most recent node birth message was published successfully.
    birthed: AtomicBool,
    pub group_id: String,
    pub edge_node_id: String,
    /// Pre-built NDATA topic used when publishing node metrics.
    pub ndata_topic: NodeTopic,
}
196
197impl EoNState {
198 pub(crate) fn get_seq(&self) -> u64 {
199 self.seq.fetch_add(1, Ordering::Relaxed) as u64
200 }
201
202 pub(crate) fn is_online(&self) -> bool {
203 self.online.load(Ordering::SeqCst)
204 }
205
206 pub(crate) fn birthed(&self) -> bool {
207 self.birthed.load(Ordering::SeqCst)
208 }
209
210 fn birth_topic(&self) -> NodeTopic {
211 NodeTopic::new(&self.group_id, NodeMessageType::NBirth, &self.edge_node_id)
212 }
213
214 fn sub_topics(&self) -> Vec<TopicFilter> {
215 vec![
216 TopicFilter::new_with_qos(
217 Topic::NodeTopic(NodeTopic::new(
218 &self.group_id,
219 NodeMessageType::NCmd,
220 &self.edge_node_id,
221 )),
222 QoS::AtLeastOnce,
223 ),
224 TopicFilter::new_with_qos(
225 Topic::DeviceTopic(DeviceTopic::new(
226 &self.group_id,
227 DeviceMessageType::DCmd,
228 &self.edge_node_id,
229 "+",
230 )),
231 QoS::AtLeastOnce,
232 ),
233 TopicFilter::new_with_qos(Topic::State(StateTopic::new()), QoS::AtLeastOnce),
234 ]
235 }
236}
237
/// Core of an edge node, shared through an `Arc` by the [`EoN`] driver, every
/// [`NodeHandle`], and the tasks they spawn.
pub struct Node {
    state: Arc<EoNState>,
    /// User-supplied node metric manager: initialised with a handle, contributes birth
    /// metrics, and receives NCMD messages.
    metric_manager: Box<DynNodeMetricManager>,
    /// Devices registered under this node.
    devices: DeviceMap,
    client: Arc<DynClient>,
    /// Sending side of the stop channel; `NodeHandle::cancel` sends on it.
    stop_tx: mpsc::Sender<EoNShutdown>,
}
245
246impl Node {
247 fn generate_death_payload(&self) -> Payload {
248 let mut metric = srad_types::payload::Metric::new();
249 metric
250 .set_name(constants::BDSEQ.to_string())
251 .set_value(MetricValue::from(self.state.bdseq.load(Ordering::SeqCst) as i64).into());
252 Payload {
253 seq: None,
254 metrics: vec![metric],
255 uuid: None,
256 timestamp: None,
257 body: None,
258 }
259 }
260
261 fn generate_birth_payload(&self, bdseq: i64, seq: u64) -> Payload {
262 let timestamp = timestamp();
263 let mut birth_initializer = BirthInitializer::new(BirthObjectType::Node);
264 birth_initializer
265 .register_metric(
266 BirthMetricDetails::new_with_initial_value(constants::BDSEQ, bdseq)
267 .use_alias(false),
268 )
269 .unwrap();
270 birth_initializer
271 .register_metric(
272 BirthMetricDetails::new_with_initial_value(constants::NODE_CONTROL_REBIRTH, false)
273 .use_alias(false),
274 )
275 .unwrap();
276
277 self.metric_manager.initialise_birth(&mut birth_initializer);
278 let metrics = birth_initializer.finish();
279
280 Payload {
281 seq: Some(seq),
282 timestamp: Some(timestamp),
283 metrics,
284 uuid: None,
285 body: None,
286 }
287 }
288
289 async fn node_birth(&self) {
290 self.state.birthed.store(false, Ordering::SeqCst);
292 self.state.seq.store(0, Ordering::SeqCst);
293 let bdseq = self.state.bdseq.load(Ordering::SeqCst) as i64;
294
295 let payload = self.generate_birth_payload(bdseq, 0);
296 let topic = self.state.birth_topic();
297 self.state.seq.store(1, Ordering::SeqCst);
298 match self.client.publish_node_message(topic, payload).await {
299 Ok(_) => self.state.birthed.store(true, Ordering::SeqCst),
300 Err(_) => error!("Publishing birth message failed"),
301 }
302 }
303
304 async fn birth(&self, birth_type: BirthType) {
305 info!("Birthing Node. Type: {:?}", birth_type);
306 self.node_birth().await;
307 self.devices.birth_devices(birth_type).await;
308 }
309}
310
/// Edge node event-loop driver.
///
/// Created together with a [`NodeHandle`] by `EoN::new_from_builder`; call [`EoN::run`] to
/// process client events until the node is cancelled.
pub struct EoN {
    node: Arc<Node>,
    eventloop: Box<DynEventLoop>,
    /// Receives the shutdown signal sent by [`NodeHandle::cancel`].
    stop_rx: mpsc::Receiver<EoNShutdown>,
}
319
320impl EoN {
321 pub(crate) fn new_from_builder(builder: EoNBuilder) -> Result<(Self, NodeHandle), String> {
322 let group_id = builder
323 .group_id
324 .ok_or("group id must be provided".to_string())?;
325 let node_id = builder
326 .node_id
327 .ok_or("node id must be provided".to_string())?;
328 srad_types::utils::validate_name(&group_id)?;
329 srad_types::utils::validate_name(&node_id)?;
330
331 let metric_manager = builder.metric_manager;
332 let (eventloop, client) = builder.eventloop_client;
333
334 let (stop_tx, stop_rx) = mpsc::channel(1);
335
336 let state = Arc::new(EoNState {
337 seq: AtomicU8::new(0),
338 bdseq: AtomicU8::new(0),
339 online: AtomicBool::new(false),
340 birthed: AtomicBool::new(false),
341 ndata_topic: NodeTopic::new(&group_id, NodeMessageType::NData, &node_id),
342 group_id,
343 edge_node_id: node_id,
344 });
345
346 let registry = Arc::new(Mutex::new(Registry::new()));
347
348 let node = Arc::new(Node {
349 metric_manager,
350 client: client.clone(),
351 devices: DeviceMap::new(state.clone(), registry.clone(), client),
352 state,
353 stop_tx,
354 });
355
356 let mut eon = Self {
357 node,
358 eventloop,
359 stop_rx,
360 };
361 let handle = NodeHandle {
362 node: eon.node.clone(),
363 };
364 eon.node.metric_manager.init(&handle);
365 eon.update_last_will();
366 Ok((eon, handle))
367 }
368
369 fn update_last_will(&mut self) {
370 self.eventloop
371 .set_last_will(srad_client::LastWill::new_node(
372 &self.node.state.group_id,
373 &self.node.state.edge_node_id,
374 self.node.generate_death_payload(),
375 ));
376 }
377
378 fn on_online(&self) {
379 if self.node.state.online.swap(true, Ordering::SeqCst) {
380 return;
381 }
382 info!("Edge node online");
383 let sub_topics = self.node.state.sub_topics();
384 let node = self.node.clone();
385
386 tokio::spawn(async move {
387 if node.client.subscribe_many(sub_topics).await.is_ok() {
388 node.birth(BirthType::Birth).await
389 };
390 });
391 }
392
393 async fn on_offline(&mut self) {
394 if !self.node.state.online.swap(false, Ordering::SeqCst) {
395 return;
396 }
397 info!("Edge node offline");
398 self.node.devices.on_offline().await;
399 self.node.state.bdseq.fetch_add(1, Ordering::SeqCst);
400 self.update_last_will();
401 }
402
403 fn on_node_message(&self, message: NodeMessage) {
404 let payload = message.message.payload;
405 let message_kind = message.message.kind;
406
407 if message_kind == MessageKind::Cmd {
408 let mut rebirth = false;
409 for x in &payload.metrics {
410 if x.alias.is_some() {
411 continue;
412 }
413
414 let metric_name = match &x.name {
415 Some(name) => name,
416 None => continue,
417 };
418
419 if metric_name != NODE_CONTROL_REBIRTH {
420 continue;
421 }
422
423 rebirth = match &x.value {
424 Some(Value::BooleanValue(val)) => *val,
425 _ => false,
426 };
427
428 if !rebirth {
429 warn!("Received invalid CMD Rebirth metric - ignoring request")
430 }
431 }
432
433 let message_metrics: MessageMetrics = match payload.try_into() {
434 Ok(metrics) => metrics,
435 Err(_) => {
436 warn!("Received invalid CMD payload - ignoring request");
437 return;
438 }
439 };
440
441 let node = self.node.clone();
442 task::spawn(async move {
443 node.metric_manager
444 .on_ncmd(NodeHandle { node: node.clone() }, message_metrics)
445 .await;
446 if rebirth {
447 info!("Got Rebirth CMD - Rebirthing Node");
448 node.birth(BirthType::Rebirth).await
449 }
450 });
451 }
452 }
453
454 fn on_device_message(&self, message: DeviceMessage) {
455 let node = self.node.clone();
456 task::spawn(async move {
457 node.devices.handle_device_message(message).await;
458 });
459 }
460
461 async fn handle_event(&mut self, event: Event) {
462 match event {
463 Event::Online => self.on_online(),
464 Event::Offline => self.on_offline().await,
465 Event::Node(node_message) => self.on_node_message(node_message),
466 Event::Device(device_message) => self.on_device_message(device_message),
467 Event::State {
468 host_id: _,
469 payload: _,
470 } => (),
471 Event::InvalidPublish {
472 reason: _,
473 topic: _,
474 payload: _,
475 } => (),
476 }
477 }
478
479 async fn poll_until_offline(&mut self) -> bool {
480 while self.node.state.is_online() {
481 if Event::Offline == self.eventloop.poll().await {
482 self.on_offline().await
483 }
484 }
485 true
486 }
487
488 async fn poll_until_offline_with_timeout(&mut self) {
489 _ = timeout(Duration::from_secs(1), self.poll_until_offline()).await;
490 }
491
492 pub async fn run(&mut self) {
496 info!("Edge node running");
497 self.update_last_will();
498 loop {
499 select! {
500 event = self.eventloop.poll() => self.handle_event(event).await,
501 Some(_) = self.stop_rx.recv() => break,
502 }
503 }
504 self.poll_until_offline_with_timeout().await;
505 self.on_offline().await;
506 info!("Edge node stopped");
507 }
508}