srad_eon/
device.rs

1use std::{
2    collections::{HashMap, HashSet},
3    hash::{DefaultHasher, Hash, Hasher},
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        Arc,
7    },
8};
9
10use log::{info, warn};
11use srad_client::{DeviceMessage, DynClient, Message, MessageKind};
12use srad_types::{payload::Payload, topic::DeviceTopic, utils::timestamp};
13use tokio::{select, sync::mpsc, task};
14
15use crate::{
16    birth::{BirthInitializer, BirthObjectType},
17    metric::{MetricPublisher, PublishError, PublishMetric},
18    metric_manager::manager::DynDeviceMetricManager,
19    node::{EoNState, TemplateRegistry},
20    BirthType, StateError,
21};
22use thiserror::Error;
23
24#[derive(Clone)]
25pub struct DeviceHandle {
26    node_state: Arc<EoNState>,
27    pub(crate) state: Arc<DeviceState>,
28    client: Arc<DynClient>,
29    handle_tx: mpsc::UnboundedSender<DeviceHandleRequest>,
30}
31
32impl DeviceHandle {
33    /// Enabled the device
34    ///
35    /// Will attempt to birth the device. If the node is not online, the device will be birthed when it is next online.
36    pub fn enable(&self) {
37        _ = self.handle_tx.send(DeviceHandleRequest::Enable);
38    }
39
40    /// Rebirth the device
41    ///
42    /// Manually trigger a rebirth for the device.
43    pub fn rebirth(&self) {
44        _ = self.handle_tx.send(DeviceHandleRequest::Rebirth);
45    }
46
47    /// Disable the device
48    ///
49    /// Will produce a death message for the device. The node will no longer attempt to birth the device when it comes online.
50    pub fn disable(&self) {
51        _ = self.handle_tx.send(DeviceHandleRequest::Disable);
52    }
53
54    fn check_publish_state_and_get_seq(&self) -> Result<u64, PublishError> {
55        if !self.state.birthed.load(Ordering::Relaxed) {
56            return Err(PublishError::State(StateError::UnBirthed));
57        }
58        Ok(self.node_state.get_next_seq()?)
59    }
60
61    fn publish_metrics_to_payload(&self, seq: u64, metrics: Vec<PublishMetric>) -> Payload {
62        let timestamp = timestamp();
63        let mut payload_metrics = Vec::with_capacity(metrics.len());
64        for x in metrics.into_iter() {
65            payload_metrics.push(x.into());
66        }
67        Payload {
68            timestamp: Some(timestamp),
69            metrics: payload_metrics,
70            seq: Some(seq),
71            uuid: None,
72            body: None,
73        }
74    }
75}
76
77impl MetricPublisher for DeviceHandle {
78    async fn try_publish_metrics_unsorted(
79        &self,
80        metrics: Vec<PublishMetric>,
81    ) -> Result<(), PublishError> {
82        if metrics.is_empty() {
83            return Err(PublishError::NoMetrics);
84        }
85        match self
86            .client
87            .try_publish_device_message(
88                self.state.ddata_topic.clone(),
89                self.publish_metrics_to_payload(self.check_publish_state_and_get_seq()?, metrics),
90            )
91            .await
92        {
93            Ok(_) => Ok(()),
94            Err(_) => Err(PublishError::State(StateError::Offline)),
95        }
96    }
97
98    async fn publish_metrics_unsorted(
99        &self,
100        metrics: Vec<PublishMetric>,
101    ) -> Result<(), PublishError> {
102        if metrics.is_empty() {
103            return Err(PublishError::NoMetrics);
104        }
105        match self
106            .client
107            .publish_device_message(
108                self.state.ddata_topic.clone(),
109                self.publish_metrics_to_payload(self.check_publish_state_and_get_seq()?, metrics),
110            )
111            .await
112        {
113            Ok(_) => Ok(()),
114            Err(_) => Err(PublishError::State(StateError::Offline)),
115        }
116    }
117}
118
119pub(crate) struct DeviceState {
120    birthed: AtomicBool,
121    id: DeviceId,
122    pub(crate) name: Arc<String>,
123    ddata_topic: DeviceTopic,
124}
125
126#[derive(Debug)]
127enum DeviceHandleRequest {
128    Enable,
129    Disable,
130    Rebirth,
131}
132
133pub struct Device {
134    state: Arc<DeviceState>,
135    template_registry: Arc<TemplateRegistry>,
136    eon_state: Arc<EoNState>,
137    dev_impl: Box<DynDeviceMetricManager>,
138    client: Arc<DynClient>,
139    enabled: bool,
140    handle_request_tx: mpsc::UnboundedSender<DeviceHandleRequest>,
141    device_message_rx: mpsc::UnboundedReceiver<Message>,
142    node_state_rx: mpsc::UnboundedReceiver<NodeStateMessage>,
143    handle_request_rx: mpsc::UnboundedReceiver<DeviceHandleRequest>,
144}
145
146impl Device {
147    fn generate_birth_payload(&self, seq: u64) -> Payload {
148        let mut birth_initializer = BirthInitializer::new(
149            BirthObjectType::Device(self.state.id),
150            self.template_registry.clone(),
151        );
152        self.dev_impl.initialise_birth(&mut birth_initializer);
153        let timestamp = timestamp();
154        let metrics = birth_initializer.finish();
155        Payload {
156            seq: Some(seq),
157            timestamp: Some(timestamp),
158            metrics,
159            uuid: None,
160            body: None,
161        }
162    }
163
164    fn generate_death_payload(&self, seq: u64) -> Payload {
165        let timestamp = timestamp();
166        Payload {
167            seq: Some(seq),
168            timestamp: Some(timestamp),
169            metrics: Vec::new(),
170            uuid: None,
171            body: None,
172        }
173    }
174
175    async fn birth(&self, birth_type: &BirthType) {
176        if !self.enabled {
177            return;
178        }
179
180        if *birth_type == BirthType::Birth && self.state.birthed.load(Ordering::SeqCst) {
181            return;
182        }
183
184        let seq = match self.eon_state.get_next_seq() {
185            Ok(seq) => seq,
186            Err(_) => return,
187        };
188
189        self.state.birthed.store(false, Ordering::SeqCst);
190
191        let payload = self.generate_birth_payload(seq);
192        if self
193            .client
194            .publish_device_message(
195                DeviceTopic::new(
196                    &self.eon_state.group_id,
197                    srad_types::topic::DeviceMessage::DBirth,
198                    &self.eon_state.edge_node_id,
199                    &self.state.name,
200                ),
201                payload,
202            )
203            .await
204            .is_ok()
205        {
206            self.state.birthed.store(true, Ordering::SeqCst);
207            info!(
208                "Device birthed. Node = {}, Device = {}, Type = {:?}",
209                self.eon_state.edge_node_id, self.state.name, birth_type
210            );
211        }
212    }
213
214    async fn death(&self, publish: bool) {
215        if !self.state.birthed.load(Ordering::SeqCst) {
216            return;
217        }
218
219        self.state.birthed.store(false, Ordering::SeqCst);
220
221        info!(
222            "Device dead. Node = {}, Device = {}",
223            self.eon_state.edge_node_id, self.state.name
224        );
225
226        if publish {
227            //getting the sequence can only fail if the node is no longer birthed/offline
228            //in this case we cant/shouldn't publish
229            let seq = match self.eon_state.get_next_seq() {
230                Ok(seq) => seq,
231                Err(_) => return,
232            };
233
234            let payload = self.generate_death_payload(seq);
235            _ = self
236                .client
237                .publish_device_message(
238                    DeviceTopic::new(
239                        &self.eon_state.group_id,
240                        srad_types::topic::DeviceMessage::DDeath,
241                        &self.eon_state.edge_node_id,
242                        &self.state.name,
243                    ),
244                    payload,
245                )
246                .await;
247        }
248    }
249
250    async fn handle_sparkplug_message(&self, message: Message, handle: DeviceHandle) {
251        let payload = message.payload;
252        let message_kind = message.kind;
253        if MessageKind::Cmd == message_kind {
254            let message_metrics = match payload.try_into() {
255                Ok(metrics) => metrics,
256                Err(_) => {
257                    warn!(
258                        "Got invalid CMD payload for device - ignoring. Device = {}",
259                        self.state.name
260                    );
261                    return;
262                }
263            };
264            self.dev_impl.on_dcmd(handle, message_metrics).await
265        }
266    }
267
268    fn create_handle(&self) -> DeviceHandle {
269        DeviceHandle {
270            node_state: self.eon_state.clone(),
271            state: self.state.clone(),
272            client: self.client.clone(),
273            handle_tx: self.handle_request_tx.clone(),
274        }
275    }
276
277    async fn enable(&mut self) {
278        self.enabled = true;
279        self.birth(&BirthType::Birth).await
280    }
281
282    async fn disable(&mut self) {
283        self.enabled = false;
284        self.death(true).await
285    }
286
287    async fn run(mut self) {
288        loop {
289            select! {
290                biased;
291                maybe_state_update = self.node_state_rx.recv() => match maybe_state_update {
292                    Some(state_update) => match state_update {
293                            NodeStateMessage::Birth(birth_type, new_template_registry) => {
294                                self.template_registry = new_template_registry;
295                                self.birth(&birth_type).await
296                            },
297                            NodeStateMessage::Death => self.death(false).await,
298                            NodeStateMessage::Removed => {
299                                self.death(true).await;
300                                break;
301                            },
302                    },
303                    None => break, //Node has dropped tx
304                },
305                Some(request) = self.handle_request_rx.recv() => {
306                    match request {
307                        DeviceHandleRequest::Enable => self.enable().await,
308                        DeviceHandleRequest::Disable => self.disable().await,
309                        DeviceHandleRequest::Rebirth => self.birth(&BirthType::Rebirth).await,
310                    }
311                },
312                maybe_message = self.device_message_rx.recv() => match maybe_message {
313                    Some(message) => self.handle_sparkplug_message(message, self.create_handle()).await,
314                    None => break,
315                }
316
317            }
318        }
319    }
320}
321
322#[derive(Debug)]
323enum NodeStateMessage {
324    Birth(BirthType, Arc<TemplateRegistry>),
325    Death,
326    Removed,
327}
328
329#[derive(Error, Debug)]
330pub enum DeviceRegistrationError {
331    #[error("Duplicate device")]
332    DuplicateDevice,
333    #[error("Invalid Device name: {0}")]
334    InvalidName(String),
335}
336
337struct DeviceMapEntry {
338    id: DeviceId,
339    device_message_tx: mpsc::UnboundedSender<Message>,
340    node_state_tx: mpsc::UnboundedSender<NodeStateMessage>,
341}
342
343pub struct DeviceMap {
344    device_ids: HashSet<DeviceId>,
345    devices: HashMap<Arc<String>, DeviceMapEntry>,
346    template_registry: Arc<TemplateRegistry>,
347}
348
349const OBJECT_ID_NODE: u32 = 0;
350pub type DeviceId = u32;
351
352impl DeviceMap {
353    pub(crate) fn new(template_registry: Arc<TemplateRegistry>) -> Self {
354        Self {
355            device_ids: HashSet::new(),
356            devices: HashMap::new(),
357            template_registry,
358        }
359    }
360
361    fn generate_device_id(&mut self, name: &String) -> DeviceId {
362        let mut hasher = DefaultHasher::new();
363        name.hash(&mut hasher);
364        let mut id = hasher.finish() as DeviceId;
365        while id == OBJECT_ID_NODE || self.device_ids.contains(&id) {
366            id += 1;
367        }
368        self.device_ids.insert(id);
369        id
370    }
371
372    fn remove_device_id(&mut self, id: DeviceId) {
373        self.device_ids.remove(&id);
374    }
375
376    pub(crate) fn add_device(
377        &mut self,
378        group_id: &str,
379        node_id: &str,
380        name: String,
381        dev_impl: Box<DynDeviceMetricManager>,
382        eon_state: Arc<EoNState>,
383        client: Arc<DynClient>,
384    ) -> Result<DeviceHandle, DeviceRegistrationError> {
385        if self.devices.get_key_value(&name).is_some() {
386            return Err(DeviceRegistrationError::DuplicateDevice);
387        }
388
389        let name = Arc::new(name);
390        let id = self.generate_device_id(&name);
391
392        let (device_message_tx, device_message_rx) = mpsc::unbounded_channel();
393        let (handle_request_tx, handle_request_rx) = mpsc::unbounded_channel();
394        let (node_state_tx, node_state_rx) = mpsc::unbounded_channel();
395
396        let device_map_entry = DeviceMapEntry {
397            id,
398            device_message_tx,
399            node_state_tx,
400        };
401
402        let device = Device {
403            state: Arc::new(DeviceState {
404                id,
405                name: name.clone(),
406                ddata_topic: DeviceTopic::new(
407                    group_id,
408                    srad_types::topic::DeviceMessage::DData,
409                    node_id,
410                    &name,
411                ),
412                birthed: AtomicBool::new(false),
413            }),
414            template_registry: self.template_registry.clone(),
415            enabled: false,
416            eon_state,
417            dev_impl,
418            client,
419            handle_request_tx,
420            device_message_rx,
421            node_state_rx,
422            handle_request_rx,
423        };
424
425        let handle = device.create_handle();
426        device.dev_impl.init(&handle);
427
428        task::spawn(async move { device.run().await });
429        self.devices.insert(name, device_map_entry);
430        Ok(handle)
431    }
432
433    pub(crate) fn remove_device(&mut self, device: &String) {
434        let entry = {
435            let entry = match self.devices.remove(device) {
436                Some(entry) => entry,
437                None => return,
438            };
439
440            self.remove_device_id(entry.id);
441            entry
442        };
443        _ = entry.node_state_tx.send(NodeStateMessage::Removed);
444    }
445
446    pub(crate) fn birth_devices(
447        &mut self,
448        birth_type: BirthType,
449        new_template_registry: &Arc<TemplateRegistry>,
450    ) {
451        self.template_registry = new_template_registry.clone();
452        info!("Birthing Devices. Type = {birth_type:?}");
453        for entry in self.devices.values() {
454            _ = entry.node_state_tx.send(NodeStateMessage::Birth(
455                birth_type,
456                new_template_registry.clone(),
457            ));
458        }
459    }
460
461    pub(crate) fn on_death(&self) {
462        for entry in self.devices.values() {
463            _ = entry.node_state_tx.send(NodeStateMessage::Death);
464        }
465    }
466
467    pub(crate) fn handle_device_message(&self, message: DeviceMessage) {
468        let entry = {
469            match self.devices.get(&message.device_id) {
470                Some(entry) => entry,
471                None => {
472                    warn!(
473                        "Got message for unknown device. Device = '{}'",
474                        message.device_id
475                    );
476                    return;
477                }
478            }
479        };
480        _ = entry.device_message_tx.send(message.message);
481    }
482}