1use super::manager::{DeviceMetricManager, MetricManager, NodeMetricManager};
2use crate::{
3 birth::{BirthInitializer, BirthMetricDetails},
4 device::DeviceHandle,
5 metric::{
6 MessageMetric, MessageMetrics, MetricPublisher, MetricToken, PublishError, PublishMetric,
7 },
8 NodeHandle, StateError,
9};
10use async_trait::async_trait;
11use futures::future::join_all;
12use srad_types::{traits, MetaData, MetricId, PropertySet};
13use std::{
14 collections::HashMap,
15 future::Future,
16 ops::DerefMut,
17 pin::Pin,
18 sync::{Arc, Mutex},
19};
20
21type CmdCallback<T, H> = Arc<
22 dyn Fn(
23 SimpleMetricManager<H>,
24 SimpleManagerMetric<T, H>,
25 Option<T>,
26 ) -> Pin<Box<dyn Future<Output = ()> + Send>>
27 + Send
28 + Sync,
29>;
30
31struct MetricData<T, H> {
32 value: T,
33 token: Option<MetricToken<T>>,
34 cb: Option<CmdCallback<T, H>>,
35 use_alias: bool,
36}
37
38pub struct SimpleManagerPublishMetric(Option<PublishMetric>);
39
40impl SimpleManagerPublishMetric {
41 pub fn with_timestamp(self, timestamp: u64) -> Self {
45 SimpleManagerPublishMetric(self.0.map(|m| m.timestamp(timestamp)))
46 }
47
48 pub fn with_transient(self, is_transient: bool) -> Self {
52 SimpleManagerPublishMetric(self.0.map(|m| m.transient(is_transient)))
53 }
54
55 pub fn with_historical(self, is_historical: bool) -> Self {
59 SimpleManagerPublishMetric(self.0.map(|m| m.historical(is_historical)))
60 }
61
62 pub fn with_metadata(self, metadata: MetaData) -> Self {
66 SimpleManagerPublishMetric(self.0.map(|m| m.metadata(metadata)))
67 }
68
69 pub fn with_properties<P: Into<PropertySet>>(self, properties: P) -> Self {
71 SimpleManagerPublishMetric(self.0.map(|m| m.properties(properties)))
72 }
73}
74
75#[derive(Clone)]
76pub struct SimpleManagerMetric<T, H> {
77 data: Arc<Mutex<MetricData<T, H>>>,
78}
79
80impl<T, H> SimpleManagerMetric<T, H>
81where
82 T: traits::MetricValue + Clone,
83{
84 pub fn update<F1>(&self, f: F1) -> SimpleManagerPublishMetric
85 where
86 F1: Fn(&mut T),
87 {
88 let mut guard = self.data.lock().unwrap();
89 let x = guard.deref_mut();
90 f(&mut x.value);
91 let option = match &x.token {
92 Some(h) => Some(h.create_publish_metric(Some(x.value.clone()))),
93 None => None,
94 };
95 SimpleManagerPublishMetric(option)
96 }
97}
98
99#[async_trait]
100trait Stored<H>: Send {
101 fn birth_metric(&self, name: &str, bi: &mut BirthInitializer) -> MetricId;
102 fn has_callback(&self) -> bool;
103 async fn cmd_cb(&self, manager: SimpleMetricManager<H>, value: MessageMetric);
104}
105
106#[async_trait]
107impl<T, H> Stored<H> for SimpleManagerMetric<T, H>
108where
109 T: traits::MetricValue + Clone + Send,
110 H: Send + Sync,
111{
112 fn birth_metric(&self, name: &str, bi: &mut BirthInitializer) -> MetricId {
113 let mut metric = self.data.lock().unwrap();
114 let val = metric.value.clone();
115 let token = bi
116 .register_metric(
117 BirthMetricDetails::new_with_initial_value(name, val).use_alias(metric.use_alias),
118 )
119 .unwrap();
120 let id = token.id.clone();
121 metric.token = Some(token);
122 id
123 }
124
125 fn has_callback(&self) -> bool {
126 self.data.lock().unwrap().cb.is_some()
127 }
128
129 async fn cmd_cb(&self, manager: SimpleMetricManager<H>, value: MessageMetric) {
130 let cb = {
131 let metric = self.data.lock().unwrap();
132 match &metric.cb {
133 Some(cb) => cb.clone(),
134 None => return,
135 }
136 };
137
138 let converted = match value.value {
139 Some(v) => match v.try_into() {
140 Ok(value) => Some(value),
141 Err(_) => return,
142 },
143 None => None,
144 };
145
146 let x = SimpleManagerMetric {
147 data: self.data.clone(),
148 };
149 cb(manager, x, converted).await
150 }
151}
152
153pub struct SimpleMetricBuilder<T, H> {
155 name: String,
156 initial_value: T,
157 use_alias: bool,
158 cmd_cb: Option<CmdCallback<T, H>>,
159}
160
161impl<T, H> SimpleMetricBuilder<T, H>
162where
163 T: traits::MetricValue + Clone + Send + 'static,
164{
165 pub fn new<S: Into<String>>(name: S, value: T) -> Self {
167 Self {
168 name: name.into(),
169 initial_value: value,
170 use_alias: true,
171 cmd_cb: None,
172 }
173 }
174
175 pub fn with_cmd_handler<F, Fut>(mut self, cb: F) -> Self
181 where
182 F: Fn(SimpleMetricManager<H>, SimpleManagerMetric<T, H>, Option<T>) -> Fut
183 + Send
184 + Sync
185 + 'static,
186 Fut: Future<Output = ()> + Send + 'static,
187 {
188 self.cmd_cb = Some(Arc::new(
189 move |manager: SimpleMetricManager<H>,
190 metric: SimpleManagerMetric<T, H>,
191 value: Option<T>|
192 -> Pin<Box<dyn Future<Output = ()> + Send>> {
193 Box::pin(cb(manager, metric, value))
194 },
195 ));
196 self
197 }
198
199 pub fn use_alias(mut self, use_alias: bool) -> Self {
201 self.use_alias = use_alias;
202 self
203 }
204}
205
206struct SimpleMetricManagerInner<H> {
207 handle: Option<H>,
208 metrics: HashMap<String, Arc<dyn Stored<H> + Send + Sync>>,
209 cmd_lookup: HashMap<MetricId, Arc<dyn Stored<H> + Send + Sync>>,
210}
211
212#[derive(Clone)]
246pub struct SimpleMetricManager<H> {
247 inner: Arc<Mutex<SimpleMetricManagerInner<H>>>,
248}
249
250impl<H> SimpleMetricManager<H>
251where
252 H: MetricPublisher + Clone + Send + Sync + 'static,
253{
254 pub fn new() -> Self {
258 Self {
259 inner: Arc::new(Mutex::new(SimpleMetricManagerInner {
260 handle: None,
261 metrics: HashMap::new(),
262 cmd_lookup: HashMap::new(),
263 })),
264 }
265 }
266
267 pub fn register_metric<T>(
272 &self,
273 builder: SimpleMetricBuilder<T, H>,
274 ) -> Option<SimpleManagerMetric<T, H>>
275 where
276 T: traits::MetricValue + Clone + Send + 'static,
277 {
278 let mut manager = self.inner.lock().unwrap();
279 if manager.metrics.contains_key(&builder.name) {
280 return None;
281 }
282
283 let metric = SimpleManagerMetric {
284 data: Arc::new(Mutex::new(MetricData {
285 value: builder.initial_value,
286 token: None,
287 cb: builder.cmd_cb,
288 use_alias: builder.use_alias,
289 })),
290 };
291 let metric_insert = Arc::new(metric.clone());
292 manager.metrics.insert(builder.name, metric_insert);
293 Some(metric)
294 }
295
296 fn get_callbacks_from_cmd_message_metrics(
297 &self,
298 metrics: MessageMetrics,
299 ) -> Vec<(Arc<dyn Stored<H> + Send + Sync>, MessageMetric)> {
300 let manager = self.inner.lock().unwrap();
301 let mut cbs = Vec::with_capacity(metrics.len());
302 for metric in metrics {
303 match manager.cmd_lookup.get(&metric.id) {
304 Some(res) => cbs.push((res.clone(), metric)),
305 None => continue,
306 };
307 }
308 cbs
309 }
310
311 async fn handle_cmd_metrics(&self, metrics: MessageMetrics) {
312 let callbacks = self.get_callbacks_from_cmd_message_metrics(metrics);
313 let futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = callbacks
314 .into_iter()
315 .map(|(stored, value)| {
316 Box::pin({
317 let handle = self.clone();
318 async move { stored.cmd_cb(handle, value).await }
319 }) as Pin<Box<dyn Future<Output = ()> + Send>>
320 })
321 .collect();
322 join_all(futures).await;
323 }
324
325 pub async fn publish_metric(
329 &self,
330 metric: SimpleManagerPublishMetric,
331 ) -> Result<(), PublishError> {
332 self.publish_metrics(vec![metric]).await
333 }
334
335 pub async fn publish_metrics(
339 &self,
340 metrics: Vec<SimpleManagerPublishMetric>,
341 ) -> Result<(), PublishError> {
342 let handle = {
343 match &self.inner.lock().unwrap().handle {
344 Some(handle) => handle.clone(),
345 None => return Err(PublishError::State(StateError::UnBirthed)),
346 }
347 };
348
349 let publish_metrics = metrics.into_iter().filter_map(|x| x.0).collect();
350 handle.publish_metrics(publish_metrics).await
351 }
352}
353
354impl<H> Default for SimpleMetricManager<H>
355where
356 H: MetricPublisher + Clone + Send + Sync + 'static,
357{
358 fn default() -> Self {
359 Self::new()
360 }
361}
362
363impl<H> MetricManager for SimpleMetricManager<H> {
364 fn initialise_birth(&self, bi: &mut BirthInitializer) {
365 let mut manager = self.inner.lock().unwrap();
366
367 let mut cmd_lookup = vec![];
368 manager.metrics.iter_mut().for_each(|(i, x)| {
369 let id = x.birth_metric(i, bi);
370 if x.has_callback() {
371 cmd_lookup.push((id, x.clone()));
372 }
373 });
374 manager.cmd_lookup = cmd_lookup.into_iter().collect();
375 }
376}
377
378#[async_trait]
379impl NodeMetricManager for SimpleMetricManager<NodeHandle> {
380 fn init(&self, handle: &NodeHandle) {
381 self.inner.lock().unwrap().handle = Some(handle.clone())
382 }
383
384 async fn on_ncmd(&self, _: NodeHandle, metrics: MessageMetrics) {
385 self.handle_cmd_metrics(metrics).await
386 }
387}
388
389#[async_trait]
390impl DeviceMetricManager for SimpleMetricManager<DeviceHandle> {
391 fn init(&self, handle: &DeviceHandle) {
392 self.inner.lock().unwrap().handle = Some(handle.clone())
393 }
394
395 async fn on_dcmd(&self, _: DeviceHandle, metrics: MessageMetrics) {
396 self.handle_cmd_metrics(metrics).await
397 }
398}