1use std::{
2 collections::{HashMap, HashSet},
3 hash::{DefaultHasher, Hash, Hasher},
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc,
7 },
8};
9
10use log::{info, warn};
11use srad_client::{DeviceMessage, DynClient, Message, MessageKind};
12use srad_types::{payload::Payload, topic::DeviceTopic, utils::timestamp};
13use tokio::{select, sync::mpsc, task};
14
15use crate::{
16 birth::{BirthInitializer, BirthObjectType},
17 metric::{MetricPublisher, PublishError, PublishMetric},
18 metric_manager::manager::DynDeviceMetricManager,
19 node::{EoNState, TemplateRegistry},
20 BirthType, StateError,
21};
22use thiserror::Error;
23
24#[derive(Clone)]
25pub struct DeviceHandle {
26 node_state: Arc<EoNState>,
27 pub(crate) state: Arc<DeviceState>,
28 client: Arc<DynClient>,
29 handle_tx: mpsc::UnboundedSender<DeviceHandleRequest>,
30}
31
32impl DeviceHandle {
33 pub fn enable(&self) {
37 _ = self.handle_tx.send(DeviceHandleRequest::Enable);
38 }
39
40 pub fn rebirth(&self) {
44 _ = self.handle_tx.send(DeviceHandleRequest::Rebirth);
45 }
46
47 pub fn disable(&self) {
51 _ = self.handle_tx.send(DeviceHandleRequest::Disable);
52 }
53
54 fn check_publish_state_and_get_seq(&self) -> Result<u64, PublishError> {
55 if !self.state.birthed.load(Ordering::Relaxed) {
56 return Err(PublishError::State(StateError::UnBirthed));
57 }
58 Ok(self.node_state.get_next_seq()?)
59 }
60
61 fn publish_metrics_to_payload(&self, seq: u64, metrics: Vec<PublishMetric>) -> Payload {
62 let timestamp = timestamp();
63 let mut payload_metrics = Vec::with_capacity(metrics.len());
64 for x in metrics.into_iter() {
65 payload_metrics.push(x.into());
66 }
67 Payload {
68 timestamp: Some(timestamp),
69 metrics: payload_metrics,
70 seq: Some(seq),
71 uuid: None,
72 body: None,
73 }
74 }
75}
76
77impl MetricPublisher for DeviceHandle {
78 async fn try_publish_metrics_unsorted(
79 &self,
80 metrics: Vec<PublishMetric>,
81 ) -> Result<(), PublishError> {
82 if metrics.is_empty() {
83 return Err(PublishError::NoMetrics);
84 }
85 match self
86 .client
87 .try_publish_device_message(
88 self.state.ddata_topic.clone(),
89 self.publish_metrics_to_payload(self.check_publish_state_and_get_seq()?, metrics),
90 )
91 .await
92 {
93 Ok(_) => Ok(()),
94 Err(_) => Err(PublishError::State(StateError::Offline)),
95 }
96 }
97
98 async fn publish_metrics_unsorted(
99 &self,
100 metrics: Vec<PublishMetric>,
101 ) -> Result<(), PublishError> {
102 if metrics.is_empty() {
103 return Err(PublishError::NoMetrics);
104 }
105 match self
106 .client
107 .publish_device_message(
108 self.state.ddata_topic.clone(),
109 self.publish_metrics_to_payload(self.check_publish_state_and_get_seq()?, metrics),
110 )
111 .await
112 {
113 Ok(_) => Ok(()),
114 Err(_) => Err(PublishError::State(StateError::Offline)),
115 }
116 }
117}
118
119pub(crate) struct DeviceState {
120 birthed: AtomicBool,
121 id: DeviceId,
122 pub(crate) name: Arc<String>,
123 ddata_topic: DeviceTopic,
124}
125
/// Lifecycle request sent from a `DeviceHandle` to its device task.
#[derive(Debug)]
enum DeviceHandleRequest {
    /// Mark the device enabled and attempt a birth.
    Enable,
    /// Mark the device disabled and publish its death if it is birthed.
    Disable,
    /// Publish a birth again, even if the device is already birthed.
    Rebirth,
}
132
133pub struct Device {
134 state: Arc<DeviceState>,
135 template_registry: Arc<TemplateRegistry>,
136 eon_state: Arc<EoNState>,
137 dev_impl: Box<DynDeviceMetricManager>,
138 client: Arc<DynClient>,
139 enabled: bool,
140 handle_request_tx: mpsc::UnboundedSender<DeviceHandleRequest>,
141 device_message_rx: mpsc::UnboundedReceiver<Message>,
142 node_state_rx: mpsc::UnboundedReceiver<NodeStateMessage>,
143 handle_request_rx: mpsc::UnboundedReceiver<DeviceHandleRequest>,
144}
145
146impl Device {
147 fn generate_birth_payload(&self, seq: u64) -> Payload {
148 let mut birth_initializer = BirthInitializer::new(
149 BirthObjectType::Device(self.state.id),
150 self.template_registry.clone(),
151 );
152 self.dev_impl.initialise_birth(&mut birth_initializer);
153 let timestamp = timestamp();
154 let metrics = birth_initializer.finish();
155 Payload {
156 seq: Some(seq),
157 timestamp: Some(timestamp),
158 metrics,
159 uuid: None,
160 body: None,
161 }
162 }
163
164 fn generate_death_payload(&self, seq: u64) -> Payload {
165 let timestamp = timestamp();
166 Payload {
167 seq: Some(seq),
168 timestamp: Some(timestamp),
169 metrics: Vec::new(),
170 uuid: None,
171 body: None,
172 }
173 }
174
175 async fn birth(&self, birth_type: &BirthType) {
176 if !self.enabled {
177 return;
178 }
179
180 if *birth_type == BirthType::Birth && self.state.birthed.load(Ordering::SeqCst) {
181 return;
182 }
183
184 let seq = match self.eon_state.get_next_seq() {
185 Ok(seq) => seq,
186 Err(_) => return,
187 };
188
189 self.state.birthed.store(false, Ordering::SeqCst);
190
191 let payload = self.generate_birth_payload(seq);
192 if self
193 .client
194 .publish_device_message(
195 DeviceTopic::new(
196 &self.eon_state.group_id,
197 srad_types::topic::DeviceMessage::DBirth,
198 &self.eon_state.edge_node_id,
199 &self.state.name,
200 ),
201 payload,
202 )
203 .await
204 .is_ok()
205 {
206 self.state.birthed.store(true, Ordering::SeqCst);
207 info!(
208 "Device birthed. Node = {}, Device = {}, Type = {:?}",
209 self.eon_state.edge_node_id, self.state.name, birth_type
210 );
211 }
212 }
213
214 async fn death(&self, publish: bool) {
215 if !self.state.birthed.load(Ordering::SeqCst) {
216 return;
217 }
218
219 self.state.birthed.store(false, Ordering::SeqCst);
220
221 info!(
222 "Device dead. Node = {}, Device = {}",
223 self.eon_state.edge_node_id, self.state.name
224 );
225
226 if publish {
227 let seq = match self.eon_state.get_next_seq() {
230 Ok(seq) => seq,
231 Err(_) => return,
232 };
233
234 let payload = self.generate_death_payload(seq);
235 _ = self
236 .client
237 .publish_device_message(
238 DeviceTopic::new(
239 &self.eon_state.group_id,
240 srad_types::topic::DeviceMessage::DDeath,
241 &self.eon_state.edge_node_id,
242 &self.state.name,
243 ),
244 payload,
245 )
246 .await;
247 }
248 }
249
250 async fn handle_sparkplug_message(&self, message: Message, handle: DeviceHandle) {
251 let payload = message.payload;
252 let message_kind = message.kind;
253 if MessageKind::Cmd == message_kind {
254 let message_metrics = match payload.try_into() {
255 Ok(metrics) => metrics,
256 Err(_) => {
257 warn!(
258 "Got invalid CMD payload for device - ignoring. Device = {}",
259 self.state.name
260 );
261 return;
262 }
263 };
264 self.dev_impl.on_dcmd(handle, message_metrics).await
265 }
266 }
267
268 fn create_handle(&self) -> DeviceHandle {
269 DeviceHandle {
270 node_state: self.eon_state.clone(),
271 state: self.state.clone(),
272 client: self.client.clone(),
273 handle_tx: self.handle_request_tx.clone(),
274 }
275 }
276
277 async fn enable(&mut self) {
278 self.enabled = true;
279 self.birth(&BirthType::Birth).await
280 }
281
282 async fn disable(&mut self) {
283 self.enabled = false;
284 self.death(true).await
285 }
286
287 async fn run(mut self) {
288 loop {
289 select! {
290 biased;
291 maybe_state_update = self.node_state_rx.recv() => match maybe_state_update {
292 Some(state_update) => match state_update {
293 NodeStateMessage::Birth(birth_type, new_template_registry) => {
294 self.template_registry = new_template_registry;
295 self.birth(&birth_type).await
296 },
297 NodeStateMessage::Death => self.death(false).await,
298 NodeStateMessage::Removed => {
299 self.death(true).await;
300 break;
301 },
302 },
303 None => break, },
305 Some(request) = self.handle_request_rx.recv() => {
306 match request {
307 DeviceHandleRequest::Enable => self.enable().await,
308 DeviceHandleRequest::Disable => self.disable().await,
309 DeviceHandleRequest::Rebirth => self.birth(&BirthType::Rebirth).await,
310 }
311 },
312 maybe_message = self.device_message_rx.recv() => match maybe_message {
313 Some(message) => self.handle_sparkplug_message(message, self.create_handle()).await,
314 None => break,
315 }
316
317 }
318 }
319 }
320}
321
322#[derive(Debug)]
323enum NodeStateMessage {
324 Birth(BirthType, Arc<TemplateRegistry>),
325 Death,
326 Removed,
327}
328
329#[derive(Error, Debug)]
330pub enum DeviceRegistrationError {
331 #[error("Duplicate device")]
332 DuplicateDevice,
333 #[error("Invalid Device name: {0}")]
334 InvalidName(String),
335}
336
337struct DeviceMapEntry {
338 id: DeviceId,
339 device_message_tx: mpsc::UnboundedSender<Message>,
340 node_state_tx: mpsc::UnboundedSender<NodeStateMessage>,
341}
342
343pub struct DeviceMap {
344 device_ids: HashSet<DeviceId>,
345 devices: HashMap<Arc<String>, DeviceMapEntry>,
346 template_registry: Arc<TemplateRegistry>,
347}
348
/// Numeric id assigned to each registered device.
pub type DeviceId = u32;

/// Id that is never handed out to a device. Judging by its name it is reserved for the
/// node itself (confirm against the birth object id handling).
const OBJECT_ID_NODE: DeviceId = 0;
351
352impl DeviceMap {
353 pub(crate) fn new(template_registry: Arc<TemplateRegistry>) -> Self {
354 Self {
355 device_ids: HashSet::new(),
356 devices: HashMap::new(),
357 template_registry,
358 }
359 }
360
361 fn generate_device_id(&mut self, name: &String) -> DeviceId {
362 let mut hasher = DefaultHasher::new();
363 name.hash(&mut hasher);
364 let mut id = hasher.finish() as DeviceId;
365 while id == OBJECT_ID_NODE || self.device_ids.contains(&id) {
366 id += 1;
367 }
368 self.device_ids.insert(id);
369 id
370 }
371
372 fn remove_device_id(&mut self, id: DeviceId) {
373 self.device_ids.remove(&id);
374 }
375
376 pub(crate) fn add_device(
377 &mut self,
378 group_id: &str,
379 node_id: &str,
380 name: String,
381 dev_impl: Box<DynDeviceMetricManager>,
382 eon_state: Arc<EoNState>,
383 client: Arc<DynClient>,
384 ) -> Result<DeviceHandle, DeviceRegistrationError> {
385 if self.devices.get_key_value(&name).is_some() {
386 return Err(DeviceRegistrationError::DuplicateDevice);
387 }
388
389 let name = Arc::new(name);
390 let id = self.generate_device_id(&name);
391
392 let (device_message_tx, device_message_rx) = mpsc::unbounded_channel();
393 let (handle_request_tx, handle_request_rx) = mpsc::unbounded_channel();
394 let (node_state_tx, node_state_rx) = mpsc::unbounded_channel();
395
396 let device_map_entry = DeviceMapEntry {
397 id,
398 device_message_tx,
399 node_state_tx,
400 };
401
402 let device = Device {
403 state: Arc::new(DeviceState {
404 id,
405 name: name.clone(),
406 ddata_topic: DeviceTopic::new(
407 group_id,
408 srad_types::topic::DeviceMessage::DData,
409 node_id,
410 &name,
411 ),
412 birthed: AtomicBool::new(false),
413 }),
414 template_registry: self.template_registry.clone(),
415 enabled: false,
416 eon_state,
417 dev_impl,
418 client,
419 handle_request_tx,
420 device_message_rx,
421 node_state_rx,
422 handle_request_rx,
423 };
424
425 let handle = device.create_handle();
426 device.dev_impl.init(&handle);
427
428 task::spawn(async move { device.run().await });
429 self.devices.insert(name, device_map_entry);
430 Ok(handle)
431 }
432
433 pub(crate) fn remove_device(&mut self, device: &String) {
434 let entry = {
435 let entry = match self.devices.remove(device) {
436 Some(entry) => entry,
437 None => return,
438 };
439
440 self.remove_device_id(entry.id);
441 entry
442 };
443 _ = entry.node_state_tx.send(NodeStateMessage::Removed);
444 }
445
446 pub(crate) fn birth_devices(
447 &mut self,
448 birth_type: BirthType,
449 new_template_registry: &Arc<TemplateRegistry>,
450 ) {
451 self.template_registry = new_template_registry.clone();
452 info!("Birthing Devices. Type = {birth_type:?}");
453 for entry in self.devices.values() {
454 _ = entry.node_state_tx.send(NodeStateMessage::Birth(
455 birth_type,
456 new_template_registry.clone(),
457 ));
458 }
459 }
460
461 pub(crate) fn on_death(&self) {
462 for entry in self.devices.values() {
463 _ = entry.node_state_tx.send(NodeStateMessage::Death);
464 }
465 }
466
467 pub(crate) fn handle_device_message(&self, message: DeviceMessage) {
468 let entry = {
469 match self.devices.get(&message.device_id) {
470 Some(entry) => entry,
471 None => {
472 warn!(
473 "Got message for unknown device. Device = '{}'",
474 message.device_id
475 );
476 return;
477 }
478 }
479 };
480 _ = entry.device_message_tx.send(message.message);
481 }
482}