1use std::{
2 collections::HashMap,
3 sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc, Mutex,
6 },
7};
8
9use futures::future::join_all;
10use log::{debug, info, warn};
11use srad_client::{DeviceMessage, DynClient, MessageKind};
12use srad_types::{payload::Payload, topic::DeviceTopic, utils::timestamp};
13
14use crate::{
15 birth::{BirthInitializer, BirthObjectType},
16 error::DeviceRegistrationError,
17 metric::{MetricPublisher, PublishError, PublishMetric},
18 metric_manager::manager::DynDeviceMetricManager,
19 node::EoNState,
20 registry::{self, DeviceId},
21 BirthType,
22};
23
24pub(crate) struct DeviceInfo {
25 id: DeviceId,
26 pub(crate) name: Arc<String>,
27 ddata_topic: DeviceTopic,
28}
29
30#[derive(Clone)]
32pub struct DeviceHandle {
33 pub(crate) device: Arc<Device>,
34}
35
36impl DeviceHandle {
37 pub async fn enable(&self) {
41 self.device.enabled.store(true, Ordering::SeqCst);
42 self.device.birth(&BirthType::Birth).await;
43 }
44
45 pub async fn rebirth(&self) {
49 self.device.enabled.store(true, Ordering::SeqCst);
50 self.device.birth(&BirthType::Rebirth).await;
51 }
52
53 pub async fn disable(&self) {
57 if !self.device.enabled.swap(false, Ordering::SeqCst) {
58 return;
60 };
61 self.device.death(true).await
62 }
63
64 fn check_publish_state(&self) -> Result<(), PublishError> {
65 if !self.device.eon_state.is_online() {
66 return Err(PublishError::Offline);
67 }
68 if !self.device.birthed.load(Ordering::Relaxed) {
69 return Err(PublishError::UnBirthed);
70 }
71 Ok(())
72 }
73
74 fn publish_metrics_to_payload(&self, metrics: Vec<PublishMetric>) -> Payload {
75 let timestamp = timestamp();
76 let mut payload_metrics = Vec::with_capacity(metrics.len());
77 for x in metrics.into_iter() {
78 payload_metrics.push(x.into());
79 }
80 Payload {
81 timestamp: Some(timestamp),
82 metrics: payload_metrics,
83 seq: Some(self.device.eon_state.get_seq()),
84 uuid: None,
85 body: None,
86 }
87 }
88}
89
90impl MetricPublisher for DeviceHandle {
91 async fn try_publish_metrics_unsorted(
92 &self,
93 metrics: Vec<PublishMetric>,
94 ) -> Result<(), PublishError> {
95 if metrics.is_empty() {
96 return Err(PublishError::NoMetrics);
97 }
98 self.check_publish_state()?;
99 match self
100 .device
101 .client
102 .try_publish_device_message(
103 self.device.info.ddata_topic.clone(),
104 self.publish_metrics_to_payload(metrics),
105 )
106 .await
107 {
108 Ok(_) => Ok(()),
109 Err(_) => Err(PublishError::Offline),
110 }
111 }
112
113 async fn publish_metrics_unsorted(
114 &self,
115 metrics: Vec<PublishMetric>,
116 ) -> Result<(), PublishError> {
117 if metrics.is_empty() {
118 return Err(PublishError::NoMetrics);
119 }
120 self.check_publish_state()?;
121 match self
122 .device
123 .client
124 .publish_device_message(
125 self.device.info.ddata_topic.clone(),
126 self.publish_metrics_to_payload(metrics),
127 )
128 .await
129 {
130 Ok(_) => Ok(()),
131 Err(_) => Err(PublishError::Offline),
132 }
133 }
134}
135
136pub struct Device {
137 pub(crate) info: DeviceInfo,
138 birthed: AtomicBool,
139 birth_lock: tokio::sync::Mutex<()>,
140 enabled: AtomicBool,
141 eon_state: Arc<EoNState>,
142 dev_impl: Arc<DynDeviceMetricManager>,
143 client: Arc<DynClient>,
144}
145
146impl Device {
147 fn generate_birth_payload(&self) -> Payload {
148 let mut birth_initializer = BirthInitializer::new(BirthObjectType::Device(self.info.id));
149 self.dev_impl.initialise_birth(&mut birth_initializer);
150 let timestamp = timestamp();
151 let metrics = birth_initializer.finish();
152
153 Payload {
154 seq: Some(self.eon_state.get_seq()),
155 timestamp: Some(timestamp),
156 metrics,
157 uuid: None,
158 body: None,
159 }
160 }
161
162 fn generate_death_payload(&self) -> Payload {
163 let timestamp = timestamp();
164 Payload {
165 seq: Some(self.eon_state.get_seq()),
166 timestamp: Some(timestamp),
167 metrics: Vec::new(),
168 uuid: None,
169 body: None,
170 }
171 }
172
173 pub async fn birth(&self, birth_type: &BirthType) {
174 if !self.enabled.load(Ordering::SeqCst) {
175 return;
176 }
177 let guard = self.birth_lock.lock().await;
178 if !self.eon_state.birthed() {
179 return;
180 }
181 if *birth_type == BirthType::Birth && self.birthed.load(Ordering::SeqCst) {
182 return;
183 }
184 debug!("Device {} birthing. Type: {:?}", self.info.name, birth_type);
185 let payload = self.generate_birth_payload();
186 if self
187 .client
188 .publish_device_message(
189 DeviceTopic::new(
190 &self.eon_state.group_id,
191 srad_types::topic::DeviceMessage::DBirth,
192 &self.eon_state.edge_node_id,
193 &self.info.name,
194 ),
195 payload,
196 )
197 .await
198 .is_ok()
199 {
200 self.birthed.store(true, Ordering::SeqCst)
201 };
202 drop(guard)
203 }
204
205 pub async fn death(&self, publish: bool) {
206 let guard = self.birth_lock.lock().await;
207 if !self.birthed.load(Ordering::SeqCst) {
208 return;
209 }
210 if publish {
211 let payload = self.generate_death_payload();
212 _ = self
213 .client
214 .publish_device_message(
215 DeviceTopic::new(
216 &self.eon_state.group_id,
217 srad_types::topic::DeviceMessage::DDeath,
218 &self.eon_state.edge_node_id,
219 &self.info.name,
220 ),
221 payload,
222 )
223 .await;
224 }
225 self.birthed.store(false, Ordering::SeqCst);
226 debug!("Device {} dead", self.info.name);
227 drop(guard)
228 }
229}
230
231pub struct DeviceMapInner {
232 devices: HashMap<Arc<String>, Arc<Device>>,
233}
234
235pub struct DeviceMap {
236 client: Arc<DynClient>,
237 state: tokio::sync::Mutex<DeviceMapInner>,
238 eon_state: Arc<EoNState>,
239 registry: Arc<Mutex<registry::Registry>>,
240}
241
242impl DeviceMap {
243 pub fn new(
244 eon_state: Arc<EoNState>,
245 registry: Arc<Mutex<registry::Registry>>,
246 client: Arc<DynClient>,
247 ) -> Self {
248 Self {
249 eon_state,
250 registry,
251 client,
252 state: tokio::sync::Mutex::new(DeviceMapInner {
253 devices: HashMap::new(),
254 }),
255 }
256 }
257
258 pub async fn add_device(
259 &self,
260 group_id: &str,
261 node_id: &str,
262 name: String,
263 dev_impl: Arc<DynDeviceMetricManager>,
264 ) -> Result<DeviceHandle, DeviceRegistrationError> {
265 let mut state = self.state.lock().await;
266 if state.devices.get_key_value(&name).is_some() {
267 return Err(DeviceRegistrationError::DuplicateDevice);
268 }
269
270 let name = Arc::new(name);
271 let mut registry = self.registry.lock().unwrap();
272 let id = registry.generate_device_id(name.clone());
273 drop(registry);
274
275 let ddata_topic = DeviceTopic::new(
276 group_id,
277 srad_types::topic::DeviceMessage::DData,
278 node_id,
279 &name,
280 );
281
282 let device = Arc::new(Device {
283 info: DeviceInfo {
284 id,
285 name: name.clone(),
286 ddata_topic,
287 },
288 birth_lock: tokio::sync::Mutex::new(()),
289 birthed: AtomicBool::new(false),
290 enabled: AtomicBool::new(false),
291 eon_state: self.eon_state.clone(),
292 dev_impl,
293 client: self.client.clone(),
294 });
295 let handle = DeviceHandle {
296 device: device.clone(),
297 };
298 device.dev_impl.init(&handle);
299 state.devices.insert(name, device);
300 drop(state);
301 Ok(handle)
302 }
303
304 pub async fn remove_device(&self, device: &String) {
305 let dev = {
306 let mut state = self.state.lock().await;
307 let dev = match state.devices.remove(device) {
308 Some(dev) => dev,
309 None => return,
310 };
311
312 {
313 let mut registry = self.registry.lock().unwrap();
314 registry.remove_device_id(dev.info.id);
315 }
316 dev
317 };
318 dev.death(true).await;
319 }
320
321 pub async fn birth_devices(&self, birth_type: BirthType) {
322 info!("Birthing Devices. Type: {:?}", birth_type);
323 let device_map = self.state.lock().await;
324 let futures: Vec<_> = device_map
325 .devices
326 .values()
327 .map(|x| x.birth(&birth_type))
328 .collect();
329 join_all(futures).await;
330 }
331
332 pub async fn on_offline(&self) {
333 let device_map = self.state.lock().await;
334 let futures: Vec<_> = device_map
335 .devices
336 .values()
337 .map(|x| x.death(false))
338 .collect();
339 join_all(futures).await;
340 }
341
342 pub async fn handle_device_message(&self, message: DeviceMessage) {
343 let dev = {
344 let state = self.state.lock().await;
345 let dev = state.devices.get(&message.device_id);
346 match dev {
347 Some(dev) => dev.clone(),
348 None => {
349 warn!("Got message for unknown device '{}'", message.device_id);
350 return;
351 }
352 }
353 };
354
355 let payload = message.message.payload;
356 let message_kind = message.message.kind;
357 if MessageKind::Cmd == message_kind {
358 let message_metrics = match payload.try_into() {
359 Ok(metrics) => metrics,
360 Err(_) => {
361 warn!(
362 "Got invalid CMD payload for device '{}' - ignoring",
363 message.device_id
364 );
365 return;
366 }
367 };
368 dev.dev_impl
369 .on_dcmd(
370 DeviceHandle {
371 device: dev.clone(),
372 },
373 message_metrics,
374 )
375 .await
376 }
377 }
378}