srad_eon/
device.rs

1use std::{
2    collections::HashMap,
3    sync::{
4        atomic::{AtomicBool, Ordering},
5        Arc, Mutex,
6    },
7};
8
9use futures::future::join_all;
10use log::{debug, info, warn};
11use srad_client::{DeviceMessage, DynClient, MessageKind};
12use srad_types::{payload::Payload, topic::DeviceTopic, utils::timestamp};
13
14use crate::{
15    birth::{BirthInitializer, BirthObjectType},
16    error::DeviceRegistrationError,
17    metric::{MetricPublisher, PublishError, PublishMetric},
18    metric_manager::manager::DynDeviceMetricManager,
19    node::EoNState,
20    registry::{self, DeviceId},
21    BirthType,
22};
23
24pub(crate) struct DeviceInfo {
25    id: DeviceId,
26    pub(crate) name: Arc<String>,
27    ddata_topic: DeviceTopic,
28}
29
30/// A handle for interacting with an Edge Device
31#[derive(Clone)]
32pub struct DeviceHandle {
33    pub(crate) device: Arc<Device>,
34}
35
36impl DeviceHandle {
37    /// Enabled the device
38    ///
39    /// Will attempt to birth the device. If the node is not online, the device will be birthed when it is next online.
40    pub async fn enable(&self) {
41        self.device.enabled.store(true, Ordering::SeqCst);
42        self.device.birth(&BirthType::Birth).await;
43    }
44
45    /// Rebirth the device
46    ///
47    /// Manually trigger a rebirth for the device.
48    pub async fn rebirth(&self) {
49        self.device.enabled.store(true, Ordering::SeqCst);
50        self.device.birth(&BirthType::Rebirth).await;
51    }
52
53    /// Disable the device
54    ///
55    /// Will produce a death message for the device. The node will no longer attempt to birth the device when it comes online.
56    pub async fn disable(&self) {
57        if !self.device.enabled.swap(false, Ordering::SeqCst) {
58            //already disabled
59            return;
60        };
61        self.device.death(true).await
62    }
63
64    fn check_publish_state(&self) -> Result<(), PublishError> {
65        if !self.device.eon_state.is_online() {
66            return Err(PublishError::Offline);
67        }
68        if !self.device.birthed.load(Ordering::Relaxed) {
69            return Err(PublishError::UnBirthed);
70        }
71        Ok(())
72    }
73
74    fn publish_metrics_to_payload(&self, metrics: Vec<PublishMetric>) -> Payload {
75        let timestamp = timestamp();
76        let mut payload_metrics = Vec::with_capacity(metrics.len());
77        for x in metrics.into_iter() {
78            payload_metrics.push(x.into());
79        }
80        Payload {
81            timestamp: Some(timestamp),
82            metrics: payload_metrics,
83            seq: Some(self.device.eon_state.get_seq()),
84            uuid: None,
85            body: None,
86        }
87    }
88}
89
90impl MetricPublisher for DeviceHandle {
91    async fn try_publish_metrics_unsorted(
92        &self,
93        metrics: Vec<PublishMetric>,
94    ) -> Result<(), PublishError> {
95        if metrics.is_empty() {
96            return Err(PublishError::NoMetrics);
97        }
98        self.check_publish_state()?;
99        match self
100            .device
101            .client
102            .try_publish_device_message(
103                self.device.info.ddata_topic.clone(),
104                self.publish_metrics_to_payload(metrics),
105            )
106            .await
107        {
108            Ok(_) => Ok(()),
109            Err(_) => Err(PublishError::Offline),
110        }
111    }
112
113    async fn publish_metrics_unsorted(
114        &self,
115        metrics: Vec<PublishMetric>,
116    ) -> Result<(), PublishError> {
117        if metrics.is_empty() {
118            return Err(PublishError::NoMetrics);
119        }
120        self.check_publish_state()?;
121        match self
122            .device
123            .client
124            .publish_device_message(
125                self.device.info.ddata_topic.clone(),
126                self.publish_metrics_to_payload(metrics),
127            )
128            .await
129        {
130            Ok(_) => Ok(()),
131            Err(_) => Err(PublishError::Offline),
132        }
133    }
134}
135
136pub struct Device {
137    pub(crate) info: DeviceInfo,
138    birthed: AtomicBool,
139    birth_lock: tokio::sync::Mutex<()>,
140    enabled: AtomicBool,
141    eon_state: Arc<EoNState>,
142    dev_impl: Arc<DynDeviceMetricManager>,
143    client: Arc<DynClient>,
144}
145
146impl Device {
147    fn generate_birth_payload(&self) -> Payload {
148        let mut birth_initializer = BirthInitializer::new(BirthObjectType::Device(self.info.id));
149        self.dev_impl.initialise_birth(&mut birth_initializer);
150        let timestamp = timestamp();
151        let metrics = birth_initializer.finish();
152
153        Payload {
154            seq: Some(self.eon_state.get_seq()),
155            timestamp: Some(timestamp),
156            metrics,
157            uuid: None,
158            body: None,
159        }
160    }
161
162    fn generate_death_payload(&self) -> Payload {
163        let timestamp = timestamp();
164        Payload {
165            seq: Some(self.eon_state.get_seq()),
166            timestamp: Some(timestamp),
167            metrics: Vec::new(),
168            uuid: None,
169            body: None,
170        }
171    }
172
173    pub async fn birth(&self, birth_type: &BirthType) {
174        if !self.enabled.load(Ordering::SeqCst) {
175            return;
176        }
177        let guard = self.birth_lock.lock().await;
178        if !self.eon_state.birthed() {
179            return;
180        }
181        if *birth_type == BirthType::Birth && self.birthed.load(Ordering::SeqCst) {
182            return;
183        }
184        debug!("Device {} birthing. Type: {:?}", self.info.name, birth_type);
185        let payload = self.generate_birth_payload();
186        if self
187            .client
188            .publish_device_message(
189                DeviceTopic::new(
190                    &self.eon_state.group_id,
191                    srad_types::topic::DeviceMessage::DBirth,
192                    &self.eon_state.edge_node_id,
193                    &self.info.name,
194                ),
195                payload,
196            )
197            .await
198            .is_ok()
199        {
200            self.birthed.store(true, Ordering::SeqCst)
201        };
202        drop(guard)
203    }
204
205    pub async fn death(&self, publish: bool) {
206        let guard = self.birth_lock.lock().await;
207        if !self.birthed.load(Ordering::SeqCst) {
208            return;
209        }
210        if publish {
211            let payload = self.generate_death_payload();
212            _ = self
213                .client
214                .publish_device_message(
215                    DeviceTopic::new(
216                        &self.eon_state.group_id,
217                        srad_types::topic::DeviceMessage::DDeath,
218                        &self.eon_state.edge_node_id,
219                        &self.info.name,
220                    ),
221                    payload,
222                )
223                .await;
224        }
225        self.birthed.store(false, Ordering::SeqCst);
226        debug!("Device {} dead", self.info.name);
227        drop(guard)
228    }
229}
230
231pub struct DeviceMapInner {
232    devices: HashMap<Arc<String>, Arc<Device>>,
233}
234
235pub struct DeviceMap {
236    client: Arc<DynClient>,
237    state: tokio::sync::Mutex<DeviceMapInner>,
238    eon_state: Arc<EoNState>,
239    registry: Arc<Mutex<registry::Registry>>,
240}
241
242impl DeviceMap {
243    pub fn new(
244        eon_state: Arc<EoNState>,
245        registry: Arc<Mutex<registry::Registry>>,
246        client: Arc<DynClient>,
247    ) -> Self {
248        Self {
249            eon_state,
250            registry,
251            client,
252            state: tokio::sync::Mutex::new(DeviceMapInner {
253                devices: HashMap::new(),
254            }),
255        }
256    }
257
258    pub async fn add_device(
259        &self,
260        group_id: &str,
261        node_id: &str,
262        name: String,
263        dev_impl: Arc<DynDeviceMetricManager>,
264    ) -> Result<DeviceHandle, DeviceRegistrationError> {
265        let mut state = self.state.lock().await;
266        if state.devices.get_key_value(&name).is_some() {
267            return Err(DeviceRegistrationError::DuplicateDevice);
268        }
269
270        let name = Arc::new(name);
271        let mut registry = self.registry.lock().unwrap();
272        let id = registry.generate_device_id(name.clone());
273        drop(registry);
274
275        let ddata_topic = DeviceTopic::new(
276            group_id,
277            srad_types::topic::DeviceMessage::DData,
278            node_id,
279            &name,
280        );
281
282        let device = Arc::new(Device {
283            info: DeviceInfo {
284                id,
285                name: name.clone(),
286                ddata_topic,
287            },
288            birth_lock: tokio::sync::Mutex::new(()),
289            birthed: AtomicBool::new(false),
290            enabled: AtomicBool::new(false),
291            eon_state: self.eon_state.clone(),
292            dev_impl,
293            client: self.client.clone(),
294        });
295        let handle = DeviceHandle {
296            device: device.clone(),
297        };
298        device.dev_impl.init(&handle);
299        state.devices.insert(name, device);
300        drop(state);
301        Ok(handle)
302    }
303
304    pub async fn remove_device(&self, device: &String) {
305        let dev = {
306            let mut state = self.state.lock().await;
307            let dev = match state.devices.remove(device) {
308                Some(dev) => dev,
309                None => return,
310            };
311
312            {
313                let mut registry = self.registry.lock().unwrap();
314                registry.remove_device_id(dev.info.id);
315            }
316            dev
317        };
318        dev.death(true).await;
319    }
320
321    pub async fn birth_devices(&self, birth_type: BirthType) {
322        info!("Birthing Devices. Type: {:?}", birth_type);
323        let device_map = self.state.lock().await;
324        let futures: Vec<_> = device_map
325            .devices
326            .values()
327            .map(|x| x.birth(&birth_type))
328            .collect();
329        join_all(futures).await;
330    }
331
332    pub async fn on_offline(&self) {
333        let device_map = self.state.lock().await;
334        let futures: Vec<_> = device_map
335            .devices
336            .values()
337            .map(|x| x.death(false))
338            .collect();
339        join_all(futures).await;
340    }
341
342    pub async fn handle_device_message(&self, message: DeviceMessage) {
343        let dev = {
344            let state = self.state.lock().await;
345            let dev = state.devices.get(&message.device_id);
346            match dev {
347                Some(dev) => dev.clone(),
348                None => {
349                    warn!("Got message for unknown device '{}'", message.device_id);
350                    return;
351                }
352            }
353        };
354
355        let payload = message.message.payload;
356        let message_kind = message.message.kind;
357        if MessageKind::Cmd == message_kind {
358            let message_metrics = match payload.try_into() {
359                Ok(metrics) => metrics,
360                Err(_) => {
361                    warn!(
362                        "Got invalid CMD payload for device '{}' - ignoring",
363                        message.device_id
364                    );
365                    return;
366                }
367            };
368            dev.dev_impl
369                .on_dcmd(
370                    DeviceHandle {
371                        device: dev.clone(),
372                    },
373                    message_metrics,
374                )
375                .await
376        }
377    }
378}