srad_eon/metric_manager/
simple.rs

1use super::manager::{DeviceMetricManager, MetricManager, NodeMetricManager};
2use crate::{
3    birth::{BirthInitializer, BirthMetricDetails},
4    device::DeviceHandle,
5    metric::{
6        MessageMetric, MessageMetrics, MetricPublisher, MetricToken, PublishError, PublishMetric,
7    },
8    NodeHandle, StateError,
9};
10use async_trait::async_trait;
11use futures::future::join_all;
12use srad_types::{traits, MetricId};
13use std::{
14    collections::HashMap,
15    future::Future,
16    ops::DerefMut,
17    pin::Pin,
18    sync::{Arc, Mutex},
19};
20
21type CmdCallback<T, H> = Arc<
22    dyn Fn(
23            SimpleMetricManager<H>,
24            SimpleManagerMetric<T, H>,
25            Option<T>,
26        ) -> Pin<Box<dyn Future<Output = ()> + Send>>
27        + Send
28        + Sync,
29>;
30
31struct MetricData<T, H> {
32    value: T,
33    token: Option<MetricToken<T>>,
34    cb: Option<CmdCallback<T, H>>,
35    use_alias: bool,
36}
37
38pub struct SimpleManagerPublishMetric(Option<PublishMetric>);
39#[derive(Clone)]
40pub struct SimpleManagerMetric<T, H> {
41    data: Arc<Mutex<MetricData<T, H>>>,
42}
43
44impl<T, H> SimpleManagerMetric<T, H>
45where
46    T: traits::MetricValue + Clone,
47{
48    pub fn update<F1>(&self, f: F1) -> SimpleManagerPublishMetric
49    where
50        F1: Fn(&mut T),
51    {
52        let mut guard = self.data.lock().unwrap();
53        let x = guard.deref_mut();
54        f(&mut x.value);
55        let option = match &x.token {
56            Some(h) => Some(h.create_publish_metric(Some(x.value.clone()))),
57            None => None,
58        };
59        SimpleManagerPublishMetric(option)
60    }
61}
62
63#[async_trait]
64trait Stored<H>: Send {
65    fn birth_metric(&self, name: &str, bi: &mut BirthInitializer) -> MetricId;
66    fn has_callback(&self) -> bool;
67    async fn cmd_cb(&self, manager: SimpleMetricManager<H>, value: MessageMetric);
68}
69
70#[async_trait]
71impl<T, H> Stored<H> for SimpleManagerMetric<T, H>
72where
73    T: traits::MetricValue + Clone + Send,
74    H: Send + Sync,
75{
76    fn birth_metric(&self, name: &str, bi: &mut BirthInitializer) -> MetricId {
77        let mut metric = self.data.lock().unwrap();
78        let val = metric.value.clone();
79        let token = bi
80            .register_metric(
81                BirthMetricDetails::new_with_initial_value(name, val).use_alias(metric.use_alias),
82            )
83            .unwrap();
84        let id = token.id.clone();
85        metric.token = Some(token);
86        id
87    }
88
89    fn has_callback(&self) -> bool {
90        self.data.lock().unwrap().cb.is_some()
91    }
92
93    async fn cmd_cb(&self, manager: SimpleMetricManager<H>, value: MessageMetric) {
94        let cb = {
95            let metric = self.data.lock().unwrap();
96            match &metric.cb {
97                Some(cb) => cb.clone(),
98                None => return,
99            }
100        };
101
102        let converted = match value.value {
103            Some(v) => match v.try_into() {
104                Ok(value) => Some(value),
105                Err(_) => return,
106            },
107            None => None,
108        };
109
110        let x = SimpleManagerMetric {
111            data: self.data.clone(),
112        };
113        cb(manager, x, converted).await
114    }
115}
116
117/// A Metric Builder to configure a metric to add to a [SimpleMetricManager]
118pub struct SimpleMetricBuilder<T, H> {
119    name: String,
120    initial_value: T,
121    use_alias: bool,
122    cmd_cb: Option<CmdCallback<T, H>>,
123}
124
125impl<T, H> SimpleMetricBuilder<T, H>
126where
127    T: traits::MetricValue + Clone + Send + 'static,
128{
129    /// Create a new metric builder instance
130    pub fn new<S: Into<String>>(name: S, value: T) -> Self {
131        Self {
132            name: name.into(),
133            initial_value: value,
134            use_alias: true,
135            cmd_cb: None,
136        }
137    }
138
139    /// Set a handler which will be called when command messages for this metric are received.
140    ///
141    /// The command handler is an async function that receives the manager, the metric,
142    /// and an optional new value for the metric. If the value is "None" that indicates the CMD metric's 'is_null'
143    /// field was true
144    pub fn with_cmd_handler<F, Fut>(mut self, cb: F) -> Self
145    where
146        F: Fn(SimpleMetricManager<H>, SimpleManagerMetric<T, H>, Option<T>) -> Fut
147            + Send
148            + Sync
149            + 'static,
150        Fut: Future<Output = ()> + Send + 'static,
151    {
152        self.cmd_cb = Some(Arc::new(
153            move |manager: SimpleMetricManager<H>,
154                  metric: SimpleManagerMetric<T, H>,
155                  value: Option<T>|
156                  -> Pin<Box<dyn Future<Output = ()> + Send>> {
157                Box::pin(cb(manager, metric, value))
158            },
159        ));
160        self
161    }
162
163    /// Set if the metric should use an alias. Defaults to true.
164    pub fn use_alias(mut self, use_alias: bool) -> Self {
165        self.use_alias = use_alias;
166        self
167    }
168}
169
170struct SimpleMetricManagerInner<H> {
171    handle: Option<H>,
172    metrics: HashMap<String, Arc<dyn Stored<H> + Send + Sync>>,
173    cmd_lookup: HashMap<MetricId, Arc<dyn Stored<H> + Send + Sync>>,
174}
175
176/// A [MetricManager] implementation that provides simple metric registration and handling.
177///
178/// `SimpleMetricManager` provides a simple way to manage metrics with support for
179/// command callbacks and metric publishing.
180/// # Example
181/// ```no_run
182/// use srad_eon::{SimpleMetricManager, SimpleMetricBuilder};
183/// # use srad_eon::DeviceHandle;
184///
185/// # fn create_device_with_manager(manager: &SimpleMetricManager<DeviceHandle>) {
186/// #   unimplemented!()
187/// # }
188/// //Create a new simple metric manager
189/// let manager = SimpleMetricManager::new();
190///
191/// // Assume we successfully create a device with a SimpleMetricManager as it's metrics manager
192/// create_device_with_manager(&manager);
193///
194/// // Register a simple metric
195/// let counter = manager.register_metric(SimpleMetricBuilder::new("Counter", 0 as i32)).unwrap();
196///
197/// // Register a metric with a command handler
198/// let temperature = manager.register_metric(
199///     SimpleMetricBuilder::new("temperature", 25.5)
200///     .with_cmd_handler(
201///         |mgr, metric, new_value| async move {
202///             if let Some(value) = new_value {
203///                 mgr.publish_metric(metric.update(|x|{ *x = value })).await;
204///             }
205///         }
206///     )
207/// );
208/// ```
209#[derive(Clone)]
210pub struct SimpleMetricManager<H> {
211    inner: Arc<Mutex<SimpleMetricManagerInner<H>>>,
212}
213
214impl<H> SimpleMetricManager<H>
215where
216    H: MetricPublisher + Clone + Send + Sync + 'static,
217{
218    /// Creates a new empty `SimpleMetricManager`.
219    ///
220    /// This initialises a new metric manager with no registered metrics.
221    pub fn new() -> Self {
222        Self {
223            inner: Arc::new(Mutex::new(SimpleMetricManagerInner {
224                handle: None,
225                metrics: HashMap::new(),
226                cmd_lookup: HashMap::new(),
227            })),
228        }
229    }
230
231    /// Registers a new metric from the builders settings.
232    ///
233    /// Returns `None` if a metric with the same name already exists, otherwise
234    /// returns the newly created metric.
235    pub fn register_metric<T>(
236        &self,
237        builder: SimpleMetricBuilder<T, H>,
238    ) -> Option<SimpleManagerMetric<T, H>>
239    where
240        T: traits::MetricValue + Clone + Send + 'static,
241    {
242        let mut manager = self.inner.lock().unwrap();
243        if manager.metrics.contains_key(&builder.name) {
244            return None;
245        }
246
247        let metric = SimpleManagerMetric {
248            data: Arc::new(Mutex::new(MetricData {
249                value: builder.initial_value,
250                token: None,
251                cb: builder.cmd_cb,
252                use_alias: builder.use_alias,
253            })),
254        };
255        let metric_insert = Arc::new(metric.clone());
256        manager.metrics.insert(builder.name, metric_insert);
257        Some(metric)
258    }
259
260    fn get_callbacks_from_cmd_message_metrics(
261        &self,
262        metrics: MessageMetrics,
263    ) -> Vec<(Arc<dyn Stored<H> + Send + Sync>, MessageMetric)> {
264        let manager = self.inner.lock().unwrap();
265        let mut cbs = Vec::with_capacity(metrics.len());
266        for metric in metrics {
267            match manager.cmd_lookup.get(&metric.id) {
268                Some(res) => cbs.push((res.clone(), metric)),
269                None => continue,
270            };
271        }
272        cbs
273    }
274
275    async fn handle_cmd_metrics(&self, metrics: MessageMetrics) {
276        let callbacks = self.get_callbacks_from_cmd_message_metrics(metrics);
277        let futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = callbacks
278            .into_iter()
279            .map(|(stored, value)| {
280                Box::pin({
281                    let handle = self.clone();
282                    async move { stored.cmd_cb(handle, value).await }
283                }) as Pin<Box<dyn Future<Output = ()> + Send>>
284            })
285            .collect();
286        join_all(futures).await;
287    }
288
289    /// Publishes a single metric.
290    ///
291    /// Returns an error if the metric was not published.
292    pub async fn publish_metric(
293        &self,
294        metric: SimpleManagerPublishMetric,
295    ) -> Result<(), PublishError> {
296        self.publish_metrics(vec![metric]).await
297    }
298
299    /// Publishes a multiple metric in a single batch.
300    ///
301    /// Returns an error if the metrics were not published.
302    pub async fn publish_metrics(
303        &self,
304        metrics: Vec<SimpleManagerPublishMetric>,
305    ) -> Result<(), PublishError> {
306        let handle = {
307            match &self.inner.lock().unwrap().handle {
308                Some(handle) => handle.clone(),
309                None => return Err(PublishError::State(StateError::UnBirthed)),
310            }
311        };
312
313        let publish_metrics = metrics.into_iter().filter_map(|x| x.0).collect();
314        handle.publish_metrics(publish_metrics).await
315    }
316}
317
318impl<H> Default for SimpleMetricManager<H>
319where
320    H: MetricPublisher + Clone + Send + Sync + 'static,
321{
322    fn default() -> Self {
323        Self::new()
324    }
325}
326
327impl<H> MetricManager for SimpleMetricManager<H> {
328    fn initialise_birth(&self, bi: &mut BirthInitializer) {
329        let mut manager = self.inner.lock().unwrap();
330
331        let mut cmd_lookup = vec![];
332        manager.metrics.iter_mut().for_each(|(i, x)| {
333            let id = x.birth_metric(i, bi);
334            if x.has_callback() {
335                cmd_lookup.push((id, x.clone()));
336            }
337        });
338        manager.cmd_lookup = cmd_lookup.into_iter().collect();
339    }
340}
341
342#[async_trait]
343impl NodeMetricManager for SimpleMetricManager<NodeHandle> {
344    fn init(&self, handle: &NodeHandle) {
345        self.inner.lock().unwrap().handle = Some(handle.clone())
346    }
347
348    async fn on_ncmd(&self, _: NodeHandle, metrics: MessageMetrics) {
349        self.handle_cmd_metrics(metrics).await
350    }
351}
352
353#[async_trait]
354impl DeviceMetricManager for SimpleMetricManager<DeviceHandle> {
355    fn init(&self, handle: &DeviceHandle) {
356        self.inner.lock().unwrap().handle = Some(handle.clone())
357    }
358
359    async fn on_dcmd(&self, _: DeviceHandle, metrics: MessageMetrics) {
360        self.handle_cmd_metrics(metrics).await
361    }
362}