1use super::manager::{DeviceMetricManager, MetricManager, NodeMetricManager};
2use crate::{
3 birth::{BirthInitializer, BirthMetricDetails},
4 device::DeviceHandle,
5 metric::{
6 MessageMetric, MessageMetrics, MetricPublisher, MetricToken, PublishError, PublishMetric,
7 },
8 NodeHandle,
9};
10use async_trait::async_trait;
11use futures::future::join_all;
12use srad_types::{traits, MetricId};
13use std::{
14 collections::HashMap,
15 future::Future,
16 ops::DerefMut,
17 pin::Pin,
18 sync::{Arc, Mutex},
19};
20
21type CmdCallback<T, H> = Arc<
22 dyn Fn(
23 SimpleMetricManager<H>,
24 SimpleManagerMetric<T, H>,
25 Option<T>,
26 ) -> Pin<Box<dyn Future<Output = ()> + Send>>
27 + Send
28 + Sync,
29>;
30
31struct MetricData<T, H> {
32 value: T,
33 token: Option<MetricToken<T>>,
34 cb: Option<CmdCallback<T, H>>,
35}
36
37pub struct SimpleManagerPublishMetric(Option<PublishMetric>);
38#[derive(Clone)]
39pub struct SimpleManagerMetric<T, H> {
40 data: Arc<Mutex<MetricData<T, H>>>,
41}
42
43impl<T, H> SimpleManagerMetric<T, H>
44where
45 T: traits::MetricValue + Clone,
46{
47 pub fn update<F1>(&self, f: F1) -> SimpleManagerPublishMetric
48 where
49 F1: Fn(&mut T),
50 {
51 let mut guard = self.data.lock().unwrap();
52 let x = guard.deref_mut();
53 f(&mut x.value);
54 let option = match &x.token {
55 Some(h) => Some(h.create_publish_metric(Some(x.value.clone()))),
56 None => None,
57 };
58 SimpleManagerPublishMetric(option)
59 }
60}
61
62#[async_trait]
63trait Stored<H>: Send {
64 fn birth_metric(&self, name: &str, bi: &mut BirthInitializer) -> MetricId;
65 fn has_callback(&self) -> bool;
66 async fn cmd_cb(&self, manager: SimpleMetricManager<H>, value: MessageMetric);
67}
68
69#[async_trait]
70impl<T, H> Stored<H> for SimpleManagerMetric<T, H>
71where
72 T: traits::MetricValue + Clone + Send,
73 H: Send + Sync,
74{
75 fn birth_metric(&self, name: &str, bi: &mut BirthInitializer) -> MetricId {
76 let mut metric = self.data.lock().unwrap();
77 let val = metric.value.clone();
78 let token = bi
79 .register_metric(BirthMetricDetails::new_with_initial_value(name, val).use_alias(true))
80 .unwrap();
81 let id = token.id.clone();
82 metric.token = Some(token);
83 id
84 }
85
86 fn has_callback(&self) -> bool {
87 self.data.lock().unwrap().cb.is_some()
88 }
89
90 async fn cmd_cb(&self, manager: SimpleMetricManager<H>, value: MessageMetric) {
91 let cb = {
92 let metric = self.data.lock().unwrap();
93 match &metric.cb {
94 Some(cb) => cb.clone(),
95 None => return,
96 }
97 };
98
99 let converted = match value.value {
100 Some(v) => match v.try_into() {
101 Ok(value) => Some(value),
102 Err(_) => return,
103 },
104 None => None,
105 };
106
107 let x = SimpleManagerMetric {
108 data: self.data.clone(),
109 };
110 cb(manager, x, converted).await
111 }
112}
113
114struct SimpleMetricManagerInner<H> {
115 handle: Option<H>,
116 metrics: HashMap<String, Arc<dyn Stored<H> + Send + Sync>>,
117 cmd_lookup: HashMap<MetricId, Arc<dyn Stored<H> + Send + Sync>>,
118}
119
120#[derive(Clone)]
153pub struct SimpleMetricManager<H> {
154 inner: Arc<Mutex<SimpleMetricManagerInner<H>>>,
155}
156
157impl<H> SimpleMetricManager<H>
158where
159 H: MetricPublisher + Clone + Send + Sync + 'static,
160{
161 pub fn new() -> Self {
165 Self {
166 inner: Arc::new(Mutex::new(SimpleMetricManagerInner {
167 handle: None,
168 metrics: HashMap::new(),
169 cmd_lookup: HashMap::new(),
170 })),
171 }
172 }
173
174 fn register<T>(
175 &self,
176 name: String,
177 value: T,
178 cb: Option<CmdCallback<T, H>>,
179 ) -> Option<SimpleManagerMetric<T, H>>
180 where
181 T: traits::MetricValue + Clone + Send + 'static,
182 {
183 let mut manager = self.inner.lock().unwrap();
184 if manager.metrics.contains_key(&name) {
185 return None;
186 }
187
188 let metric = SimpleManagerMetric {
189 data: Arc::new(Mutex::new(MetricData {
190 value,
191 token: None,
192 cb,
193 })),
194 };
195 let metric_insert = Arc::new(metric.clone());
196 manager.metrics.insert(name, metric_insert);
197 Some(metric)
198 }
199
200 pub fn register_metric<S, T>(&self, name: S, value: T) -> Option<SimpleManagerMetric<T, H>>
205 where
206 S: Into<String>,
207 T: traits::MetricValue + Clone + Send + 'static,
208 {
209 self.register::<T>(name.into(), value, None)
210 }
211
212 pub fn register_metric_with_cmd_handler<S, T, F, Fut>(
222 &self,
223 name: S,
224 value: T,
225 cmd_handler: F,
226 ) -> Option<SimpleManagerMetric<T, H>>
227 where
228 S: Into<String>,
229 T: traits::MetricValue + Clone + Send + 'static,
230 F: Fn(SimpleMetricManager<H>, SimpleManagerMetric<T, H>, Option<T>) -> Fut
231 + Send
232 + Sync
233 + 'static,
234 Fut: Future<Output = ()> + Send + 'static,
235 {
236 let cmd_handler = Arc::new(
237 move |manager: SimpleMetricManager<H>,
238 metric: SimpleManagerMetric<T, H>,
239 value: Option<T>|
240 -> Pin<Box<dyn Future<Output = ()> + Send>> {
241 Box::pin(cmd_handler(manager, metric, value))
242 },
243 );
244 self.register::<T>(name.into(), value, Some(cmd_handler))
245 }
246
247 fn get_callbacks_from_cmd_message_metrics(
248 &self,
249 metrics: MessageMetrics,
250 ) -> Vec<(Arc<dyn Stored<H> + Send + Sync>, MessageMetric)> {
251 let manager = self.inner.lock().unwrap();
252 let mut cbs = Vec::with_capacity(metrics.len());
253 for metric in metrics {
254 match manager.cmd_lookup.get(&metric.id) {
255 Some(res) => cbs.push((res.clone(), metric)),
256 None => continue,
257 };
258 }
259 cbs
260 }
261
262 async fn handle_cmd_metrics(&self, metrics: MessageMetrics) {
263 let callbacks = self.get_callbacks_from_cmd_message_metrics(metrics);
264 let futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = callbacks
265 .into_iter()
266 .map(|(stored, value)| {
267 Box::pin({
268 let handle = self.clone();
269 async move { stored.cmd_cb(handle, value).await }
270 }) as Pin<Box<dyn Future<Output = ()> + Send>>
271 })
272 .collect();
273 join_all(futures).await;
274 }
275
276 pub async fn publish_metric(
280 &self,
281 metric: SimpleManagerPublishMetric,
282 ) -> Result<(), PublishError> {
283 self.publish_metrics(vec![metric]).await
284 }
285
286 pub async fn publish_metrics(
290 &self,
291 metrics: Vec<SimpleManagerPublishMetric>,
292 ) -> Result<(), PublishError> {
293 let handle = {
294 match &self.inner.lock().unwrap().handle {
295 Some(handle) => handle.clone(),
296 None => return Err(PublishError::UnBirthed),
297 }
298 };
299
300 let publish_metrics = metrics.into_iter().filter_map(|x| x.0).collect();
301 handle.publish_metrics(publish_metrics).await
302 }
303}
304
305impl<H> Default for SimpleMetricManager<H>
306where
307 H: MetricPublisher + Clone + Send + Sync + 'static,
308{
309 fn default() -> Self {
310 Self::new()
311 }
312}
313
314impl<H> MetricManager for SimpleMetricManager<H> {
315 fn initialise_birth(&self, bi: &mut BirthInitializer) {
316 let mut manager = self.inner.lock().unwrap();
317
318 let mut cmd_lookup = vec![];
319 manager.metrics.iter_mut().for_each(|(i, x)| {
320 let id = x.birth_metric(i, bi);
321 if x.has_callback() {
322 cmd_lookup.push((id, x.clone()));
323 }
324 });
325 manager.cmd_lookup = cmd_lookup.into_iter().collect();
326 }
327}
328
329#[async_trait]
330impl NodeMetricManager for SimpleMetricManager<NodeHandle> {
331 fn init(&self, handle: &NodeHandle) {
332 self.inner.lock().unwrap().handle = Some(handle.clone())
333 }
334
335 async fn on_ncmd(&self, _: NodeHandle, metrics: MessageMetrics) {
336 self.handle_cmd_metrics(metrics).await
337 }
338}
339
340#[async_trait]
341impl DeviceMetricManager for SimpleMetricManager<DeviceHandle> {
342 fn init(&self, handle: &DeviceHandle) {
343 self.inner.lock().unwrap().handle = Some(handle.clone())
344 }
345
346 async fn on_dcmd(&self, _: DeviceHandle, metrics: MessageMetrics) {
347 self.handle_cmd_metrics(metrics).await
348 }
349}