srad_eon/metric_manager/
simple.rs

1use super::manager::{DeviceMetricManager, MetricManager, NodeMetricManager};
2use crate::{
3    birth::{BirthInitializer, BirthMetricDetails},
4    device::DeviceHandle,
5    metric::{
6        MessageMetric, MessageMetrics, MetricPublisher, MetricToken, PublishError, PublishMetric,
7    },
8    NodeHandle,
9};
10use async_trait::async_trait;
11use futures::future::join_all;
12use srad_types::{traits, MetricId};
13use std::{
14    collections::HashMap,
15    future::Future,
16    ops::DerefMut,
17    pin::Pin,
18    sync::{Arc, Mutex},
19};
20
21type CmdCallback<T, H> = Arc<
22    dyn Fn(
23            SimpleMetricManager<H>,
24            SimpleManagerMetric<T, H>,
25            Option<T>,
26        ) -> Pin<Box<dyn Future<Output = ()> + Send>>
27        + Send
28        + Sync,
29>;
30
31struct MetricData<T, H> {
32    value: T,
33    token: Option<MetricToken<T>>,
34    cb: Option<CmdCallback<T, H>>,
35}
36
37pub struct SimpleManagerPublishMetric(Option<PublishMetric>);
38#[derive(Clone)]
39pub struct SimpleManagerMetric<T, H> {
40    data: Arc<Mutex<MetricData<T, H>>>,
41}
42
43impl<T, H> SimpleManagerMetric<T, H>
44where
45    T: traits::MetricValue + Clone,
46{
47    pub fn update<F1>(&self, f: F1) -> SimpleManagerPublishMetric
48    where
49        F1: Fn(&mut T),
50    {
51        let mut guard = self.data.lock().unwrap();
52        let x = guard.deref_mut();
53        f(&mut x.value);
54        let option = match &x.token {
55            Some(h) => Some(h.create_publish_metric(Some(x.value.clone()))),
56            None => None,
57        };
58        SimpleManagerPublishMetric(option)
59    }
60}
61
62#[async_trait]
63trait Stored<H>: Send {
64    fn birth_metric(&self, name: &str, bi: &mut BirthInitializer) -> MetricId;
65    fn has_callback(&self) -> bool;
66    async fn cmd_cb(&self, manager: SimpleMetricManager<H>, value: MessageMetric);
67}
68
69#[async_trait]
70impl<T, H> Stored<H> for SimpleManagerMetric<T, H>
71where
72    T: traits::MetricValue + Clone + Send,
73    H: Send + Sync,
74{
75    fn birth_metric(&self, name: &str, bi: &mut BirthInitializer) -> MetricId {
76        let mut metric = self.data.lock().unwrap();
77        let val = metric.value.clone();
78        let token = bi
79            .register_metric(BirthMetricDetails::new_with_initial_value(name, val).use_alias(true))
80            .unwrap();
81        let id = token.id.clone();
82        metric.token = Some(token);
83        id
84    }
85
86    fn has_callback(&self) -> bool {
87        self.data.lock().unwrap().cb.is_some()
88    }
89
90    async fn cmd_cb(&self, manager: SimpleMetricManager<H>, value: MessageMetric) {
91        let cb = {
92            let metric = self.data.lock().unwrap();
93            match &metric.cb {
94                Some(cb) => cb.clone(),
95                None => return,
96            }
97        };
98
99        let converted = match value.value {
100            Some(v) => match v.try_into() {
101                Ok(value) => Some(value),
102                Err(_) => return,
103            },
104            None => None,
105        };
106
107        let x = SimpleManagerMetric {
108            data: self.data.clone(),
109        };
110        cb(manager, x, converted).await
111    }
112}
113
114struct SimpleMetricManagerInner<H> {
115    handle: Option<H>,
116    metrics: HashMap<String, Arc<dyn Stored<H> + Send + Sync>>,
117    cmd_lookup: HashMap<MetricId, Arc<dyn Stored<H> + Send + Sync>>,
118}
119
120/// A [MetricManager] implementation that provides simple metric registration and handling.
121///
122/// `SimpleMetricManager` provides a simple way to manage metrics with support for
123/// command callbacks and metric publishing.
124/// # Example
125/// ```no_run
126/// use srad_eon::SimpleMetricManager;
127/// # use srad_eon::DeviceHandle;
128///
129/// # fn create_device_with_manager(manager: &SimpleMetricManager<DeviceHandle>) {
130/// #   unimplemented!()
131/// # }
132/// //Create a new simple metric manager
133/// let manager = SimpleMetricManager::new();
134///
135/// // Assume we successfully create a device with a SimpleMetricManager as it's metrics manager
136/// create_device_with_manager(&manager);
137///
138/// // Register a simple metric
139/// let counter = manager.register_metric("Counter", 0 as i32).unwrap();
140///
141/// // Register a metric with a command handler
142/// let temperature = manager.register_metric_with_cmd_handler(
143///     "temperature",
144///     25.5,
145///     |mgr, metric, new_value| async move {
146///         if let Some(value) = new_value {
147///           mgr.publish_metric(metric.update(|x|{ *x = value })).await;
148///         }
149///     }
150/// );
151/// ```
152#[derive(Clone)]
153pub struct SimpleMetricManager<H> {
154    inner: Arc<Mutex<SimpleMetricManagerInner<H>>>,
155}
156
157impl<H> SimpleMetricManager<H>
158where
159    H: MetricPublisher + Clone + Send + Sync + 'static,
160{
161    /// Creates a new empty `SimpleMetricManager`.
162    ///
163    /// This initialises a new metric manager with no registered metrics.
164    pub fn new() -> Self {
165        Self {
166            inner: Arc::new(Mutex::new(SimpleMetricManagerInner {
167                handle: None,
168                metrics: HashMap::new(),
169                cmd_lookup: HashMap::new(),
170            })),
171        }
172    }
173
174    fn register<T>(
175        &self,
176        name: String,
177        value: T,
178        cb: Option<CmdCallback<T, H>>,
179    ) -> Option<SimpleManagerMetric<T, H>>
180    where
181        T: traits::MetricValue + Clone + Send + 'static,
182    {
183        let mut manager = self.inner.lock().unwrap();
184        if manager.metrics.contains_key(&name) {
185            return None;
186        }
187
188        let metric = SimpleManagerMetric {
189            data: Arc::new(Mutex::new(MetricData {
190                value,
191                token: None,
192                cb,
193            })),
194        };
195        let metric_insert = Arc::new(metric.clone());
196        manager.metrics.insert(name, metric_insert);
197        Some(metric)
198    }
199
200    /// Registers a new metric with the given name and initial value.
201    ///
202    /// Returns `None` if a metric with the same name already exists, otherwise
203    /// returns the newly created metric.
204    pub fn register_metric<S, T>(&self, name: S, value: T) -> Option<SimpleManagerMetric<T, H>>
205    where
206        S: Into<String>,
207        T: traits::MetricValue + Clone + Send + 'static,
208    {
209        self.register::<T>(name.into(), value, None)
210    }
211
212    /// Registers a new metric with a command handler that will be called when
213    /// commands for this metric are received.
214    ///
215    /// The command handler is an async function that receives the manager, the metric,
216    /// and an optional new value for the metric. If the value is "None" that indicates the CMD metric 'is_null'
217    /// field was true
218    ///
219    /// Returns `None` if a metric with the same name already exists, otherwise
220    /// returns the newly created metric.
221    pub fn register_metric_with_cmd_handler<S, T, F, Fut>(
222        &self,
223        name: S,
224        value: T,
225        cmd_handler: F,
226    ) -> Option<SimpleManagerMetric<T, H>>
227    where
228        S: Into<String>,
229        T: traits::MetricValue + Clone + Send + 'static,
230        F: Fn(SimpleMetricManager<H>, SimpleManagerMetric<T, H>, Option<T>) -> Fut
231            + Send
232            + Sync
233            + 'static,
234        Fut: Future<Output = ()> + Send + 'static,
235    {
236        let cmd_handler = Arc::new(
237            move |manager: SimpleMetricManager<H>,
238                  metric: SimpleManagerMetric<T, H>,
239                  value: Option<T>|
240                  -> Pin<Box<dyn Future<Output = ()> + Send>> {
241                Box::pin(cmd_handler(manager, metric, value))
242            },
243        );
244        self.register::<T>(name.into(), value, Some(cmd_handler))
245    }
246
247    fn get_callbacks_from_cmd_message_metrics(
248        &self,
249        metrics: MessageMetrics,
250    ) -> Vec<(Arc<dyn Stored<H> + Send + Sync>, MessageMetric)> {
251        let manager = self.inner.lock().unwrap();
252        let mut cbs = Vec::with_capacity(metrics.len());
253        for metric in metrics {
254            match manager.cmd_lookup.get(&metric.id) {
255                Some(res) => cbs.push((res.clone(), metric)),
256                None => continue,
257            };
258        }
259        cbs
260    }
261
262    async fn handle_cmd_metrics(&self, metrics: MessageMetrics) {
263        let callbacks = self.get_callbacks_from_cmd_message_metrics(metrics);
264        let futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = callbacks
265            .into_iter()
266            .map(|(stored, value)| {
267                Box::pin({
268                    let handle = self.clone();
269                    async move { stored.cmd_cb(handle, value).await }
270                }) as Pin<Box<dyn Future<Output = ()> + Send>>
271            })
272            .collect();
273        join_all(futures).await;
274    }
275
276    /// Publishes a single metric.
277    ///
278    /// Returns an error if the metric was not published.
279    pub async fn publish_metric(
280        &self,
281        metric: SimpleManagerPublishMetric,
282    ) -> Result<(), PublishError> {
283        self.publish_metrics(vec![metric]).await
284    }
285
286    /// Publishes a multiple metric in a single batch.
287    ///
288    /// Returns an error if the metrics were not published.
289    pub async fn publish_metrics(
290        &self,
291        metrics: Vec<SimpleManagerPublishMetric>,
292    ) -> Result<(), PublishError> {
293        let handle = {
294            match &self.inner.lock().unwrap().handle {
295                Some(handle) => handle.clone(),
296                None => return Err(PublishError::UnBirthed),
297            }
298        };
299
300        let publish_metrics = metrics.into_iter().filter_map(|x| x.0).collect();
301        handle.publish_metrics(publish_metrics).await
302    }
303}
304
305impl<H> Default for SimpleMetricManager<H>
306where
307    H: MetricPublisher + Clone + Send + Sync + 'static,
308{
309    fn default() -> Self {
310        Self::new()
311    }
312}
313
314impl<H> MetricManager for SimpleMetricManager<H> {
315    fn initialise_birth(&self, bi: &mut BirthInitializer) {
316        let mut manager = self.inner.lock().unwrap();
317
318        let mut cmd_lookup = vec![];
319        manager.metrics.iter_mut().for_each(|(i, x)| {
320            let id = x.birth_metric(i, bi);
321            if x.has_callback() {
322                cmd_lookup.push((id, x.clone()));
323            }
324        });
325        manager.cmd_lookup = cmd_lookup.into_iter().collect();
326    }
327}
328
329#[async_trait]
330impl NodeMetricManager for SimpleMetricManager<NodeHandle> {
331    fn init(&self, handle: &NodeHandle) {
332        self.inner.lock().unwrap().handle = Some(handle.clone())
333    }
334
335    async fn on_ncmd(&self, _: NodeHandle, metrics: MessageMetrics) {
336        self.handle_cmd_metrics(metrics).await
337    }
338}
339
340#[async_trait]
341impl DeviceMetricManager for SimpleMetricManager<DeviceHandle> {
342    fn init(&self, handle: &DeviceHandle) {
343        self.inner.lock().unwrap().handle = Some(handle.clone())
344    }
345
346    async fn on_dcmd(&self, _: DeviceHandle, metrics: MessageMetrics) {
347        self.handle_cmd_metrics(metrics).await
348    }
349}