1use super::manager::{DeviceMetricManager, MetricManager, NodeMetricManager};
2use crate::{
3 birth::{BirthInitializer, BirthMetricDetails},
4 device::DeviceHandle,
5 metric::{
6 MessageMetric, MessageMetrics, MetricPublisher, MetricToken, PublishError, PublishMetric,
7 },
8 NodeHandle, StateError,
9};
10use async_trait::async_trait;
11use futures::future::join_all;
12use srad_types::{traits, MetricId};
13use std::{
14 collections::HashMap,
15 future::Future,
16 ops::DerefMut,
17 pin::Pin,
18 sync::{Arc, Mutex},
19};
20
21type CmdCallback<T, H> = Arc<
22 dyn Fn(
23 SimpleMetricManager<H>,
24 SimpleManagerMetric<T, H>,
25 Option<T>,
26 ) -> Pin<Box<dyn Future<Output = ()> + Send>>
27 + Send
28 + Sync,
29>;
30
31struct MetricData<T, H> {
32 value: T,
33 token: Option<MetricToken<T>>,
34 cb: Option<CmdCallback<T, H>>,
35 use_alias: bool,
36}
37
38pub struct SimpleManagerPublishMetric(Option<PublishMetric>);
39#[derive(Clone)]
40pub struct SimpleManagerMetric<T, H> {
41 data: Arc<Mutex<MetricData<T, H>>>,
42}
43
44impl<T, H> SimpleManagerMetric<T, H>
45where
46 T: traits::MetricValue + Clone,
47{
48 pub fn update<F1>(&self, f: F1) -> SimpleManagerPublishMetric
49 where
50 F1: Fn(&mut T),
51 {
52 let mut guard = self.data.lock().unwrap();
53 let x = guard.deref_mut();
54 f(&mut x.value);
55 let option = match &x.token {
56 Some(h) => Some(h.create_publish_metric(Some(x.value.clone()))),
57 None => None,
58 };
59 SimpleManagerPublishMetric(option)
60 }
61}
62
63#[async_trait]
64trait Stored<H>: Send {
65 fn birth_metric(&self, name: &str, bi: &mut BirthInitializer) -> MetricId;
66 fn has_callback(&self) -> bool;
67 async fn cmd_cb(&self, manager: SimpleMetricManager<H>, value: MessageMetric);
68}
69
70#[async_trait]
71impl<T, H> Stored<H> for SimpleManagerMetric<T, H>
72where
73 T: traits::MetricValue + Clone + Send,
74 H: Send + Sync,
75{
76 fn birth_metric(&self, name: &str, bi: &mut BirthInitializer) -> MetricId {
77 let mut metric = self.data.lock().unwrap();
78 let val = metric.value.clone();
79 let token = bi
80 .register_metric(
81 BirthMetricDetails::new_with_initial_value(name, val).use_alias(metric.use_alias),
82 )
83 .unwrap();
84 let id = token.id.clone();
85 metric.token = Some(token);
86 id
87 }
88
89 fn has_callback(&self) -> bool {
90 self.data.lock().unwrap().cb.is_some()
91 }
92
93 async fn cmd_cb(&self, manager: SimpleMetricManager<H>, value: MessageMetric) {
94 let cb = {
95 let metric = self.data.lock().unwrap();
96 match &metric.cb {
97 Some(cb) => cb.clone(),
98 None => return,
99 }
100 };
101
102 let converted = match value.value {
103 Some(v) => match v.try_into() {
104 Ok(value) => Some(value),
105 Err(_) => return,
106 },
107 None => None,
108 };
109
110 let x = SimpleManagerMetric {
111 data: self.data.clone(),
112 };
113 cb(manager, x, converted).await
114 }
115}
116
117pub struct SimpleMetricBuilder<T, H> {
119 name: String,
120 initial_value: T,
121 use_alias: bool,
122 cmd_cb: Option<CmdCallback<T, H>>,
123}
124
125impl<T, H> SimpleMetricBuilder<T, H>
126where
127 T: traits::MetricValue + Clone + Send + 'static,
128{
129 pub fn new<S: Into<String>>(name: S, value: T) -> Self {
131 Self {
132 name: name.into(),
133 initial_value: value,
134 use_alias: true,
135 cmd_cb: None,
136 }
137 }
138
139 pub fn with_cmd_handler<F, Fut>(mut self, cb: F) -> Self
145 where
146 F: Fn(SimpleMetricManager<H>, SimpleManagerMetric<T, H>, Option<T>) -> Fut
147 + Send
148 + Sync
149 + 'static,
150 Fut: Future<Output = ()> + Send + 'static,
151 {
152 self.cmd_cb = Some(Arc::new(
153 move |manager: SimpleMetricManager<H>,
154 metric: SimpleManagerMetric<T, H>,
155 value: Option<T>|
156 -> Pin<Box<dyn Future<Output = ()> + Send>> {
157 Box::pin(cb(manager, metric, value))
158 },
159 ));
160 self
161 }
162
163 pub fn use_alias(mut self, use_alias: bool) -> Self {
165 self.use_alias = use_alias;
166 self
167 }
168}
169
170struct SimpleMetricManagerInner<H> {
171 handle: Option<H>,
172 metrics: HashMap<String, Arc<dyn Stored<H> + Send + Sync>>,
173 cmd_lookup: HashMap<MetricId, Arc<dyn Stored<H> + Send + Sync>>,
174}
175
176#[derive(Clone)]
210pub struct SimpleMetricManager<H> {
211 inner: Arc<Mutex<SimpleMetricManagerInner<H>>>,
212}
213
214impl<H> SimpleMetricManager<H>
215where
216 H: MetricPublisher + Clone + Send + Sync + 'static,
217{
218 pub fn new() -> Self {
222 Self {
223 inner: Arc::new(Mutex::new(SimpleMetricManagerInner {
224 handle: None,
225 metrics: HashMap::new(),
226 cmd_lookup: HashMap::new(),
227 })),
228 }
229 }
230
231 pub fn register_metric<T>(
236 &self,
237 builder: SimpleMetricBuilder<T, H>,
238 ) -> Option<SimpleManagerMetric<T, H>>
239 where
240 T: traits::MetricValue + Clone + Send + 'static,
241 {
242 let mut manager = self.inner.lock().unwrap();
243 if manager.metrics.contains_key(&builder.name) {
244 return None;
245 }
246
247 let metric = SimpleManagerMetric {
248 data: Arc::new(Mutex::new(MetricData {
249 value: builder.initial_value,
250 token: None,
251 cb: builder.cmd_cb,
252 use_alias: builder.use_alias,
253 })),
254 };
255 let metric_insert = Arc::new(metric.clone());
256 manager.metrics.insert(builder.name, metric_insert);
257 Some(metric)
258 }
259
260 fn get_callbacks_from_cmd_message_metrics(
261 &self,
262 metrics: MessageMetrics,
263 ) -> Vec<(Arc<dyn Stored<H> + Send + Sync>, MessageMetric)> {
264 let manager = self.inner.lock().unwrap();
265 let mut cbs = Vec::with_capacity(metrics.len());
266 for metric in metrics {
267 match manager.cmd_lookup.get(&metric.id) {
268 Some(res) => cbs.push((res.clone(), metric)),
269 None => continue,
270 };
271 }
272 cbs
273 }
274
275 async fn handle_cmd_metrics(&self, metrics: MessageMetrics) {
276 let callbacks = self.get_callbacks_from_cmd_message_metrics(metrics);
277 let futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = callbacks
278 .into_iter()
279 .map(|(stored, value)| {
280 Box::pin({
281 let handle = self.clone();
282 async move { stored.cmd_cb(handle, value).await }
283 }) as Pin<Box<dyn Future<Output = ()> + Send>>
284 })
285 .collect();
286 join_all(futures).await;
287 }
288
289 pub async fn publish_metric(
293 &self,
294 metric: SimpleManagerPublishMetric,
295 ) -> Result<(), PublishError> {
296 self.publish_metrics(vec![metric]).await
297 }
298
299 pub async fn publish_metrics(
303 &self,
304 metrics: Vec<SimpleManagerPublishMetric>,
305 ) -> Result<(), PublishError> {
306 let handle = {
307 match &self.inner.lock().unwrap().handle {
308 Some(handle) => handle.clone(),
309 None => return Err(PublishError::State(StateError::UnBirthed)),
310 }
311 };
312
313 let publish_metrics = metrics.into_iter().filter_map(|x| x.0).collect();
314 handle.publish_metrics(publish_metrics).await
315 }
316}
317
318impl<H> Default for SimpleMetricManager<H>
319where
320 H: MetricPublisher + Clone + Send + Sync + 'static,
321{
322 fn default() -> Self {
323 Self::new()
324 }
325}
326
327impl<H> MetricManager for SimpleMetricManager<H> {
328 fn initialise_birth(&self, bi: &mut BirthInitializer) {
329 let mut manager = self.inner.lock().unwrap();
330
331 let mut cmd_lookup = vec![];
332 manager.metrics.iter_mut().for_each(|(i, x)| {
333 let id = x.birth_metric(i, bi);
334 if x.has_callback() {
335 cmd_lookup.push((id, x.clone()));
336 }
337 });
338 manager.cmd_lookup = cmd_lookup.into_iter().collect();
339 }
340}
341
342#[async_trait]
343impl NodeMetricManager for SimpleMetricManager<NodeHandle> {
344 fn init(&self, handle: &NodeHandle) {
345 self.inner.lock().unwrap().handle = Some(handle.clone())
346 }
347
348 async fn on_ncmd(&self, _: NodeHandle, metrics: MessageMetrics) {
349 self.handle_cmd_metrics(metrics).await
350 }
351}
352
353#[async_trait]
354impl DeviceMetricManager for SimpleMetricManager<DeviceHandle> {
355 fn init(&self, handle: &DeviceHandle) {
356 self.inner.lock().unwrap().handle = Some(handle.clone())
357 }
358
359 async fn on_dcmd(&self, _: DeviceHandle, metrics: MessageMetrics) {
360 self.handle_cmd_metrics(metrics).await
361 }
362}