Skip to main content

srad_eon/metric_manager/
simple_manager.rs

1use super::manager::{DeviceMetricManager, MetricManager, NodeMetricManager};
2use crate::{
3    birth::{BirthInitializer, BirthMetricDetails},
4    device::DeviceHandle,
5    metric::{
6        MessageMetric, MessageMetrics, MetricPublisher, MetricToken, PublishError, PublishMetric,
7    },
8    NodeHandle, StateError,
9};
10use async_trait::async_trait;
11use futures::future::join_all;
12use srad_types::{traits, MetaData, MetricId, PropertySet};
13use std::{
14    collections::HashMap,
15    future::Future,
16    ops::DerefMut,
17    pin::Pin,
18    sync::{Arc, Mutex},
19};
20
21type CmdCallback<T, H> = Arc<
22    dyn Fn(
23            SimpleMetricManager<H>,
24            SimpleManagerMetric<T, H>,
25            Option<T>,
26        ) -> Pin<Box<dyn Future<Output = ()> + Send>>
27        + Send
28        + Sync,
29>;
30
31struct MetricData<T, H> {
32    value: T,
33    token: Option<MetricToken<T>>,
34    cb: Option<CmdCallback<T, H>>,
35    use_alias: bool,
36}
37
38pub struct SimpleManagerPublishMetric(Option<PublishMetric>);
39
40impl SimpleManagerPublishMetric {
41    /// Sets a custom timestamp for the metric.
42    ///
43    /// By default, the current system time is used.
44    pub fn with_timestamp(self, timestamp: u64) -> Self {
45        SimpleManagerPublishMetric(self.0.map(|m| m.timestamp(timestamp)))
46    }
47
48    /// Marks the metric as transient or persistent.
49    ///
50    /// Transient metrics are typically not stored permanently. By default metrics are not transient.
51    pub fn with_transient(self, is_transient: bool) -> Self {
52        SimpleManagerPublishMetric(self.0.map(|m| m.transient(is_transient)))
53    }
54
55    /// Marks the metric as a historical metric that does not represent a current value.
56    ///
57    /// By default, metrics are not historical.
58    pub fn with_historical(self, is_historical: bool) -> Self {
59        SimpleManagerPublishMetric(self.0.map(|m| m.historical(is_historical)))
60    }
61
62    /// Sets custom metadata for the metric.
63    ///
64    /// By default, the result from [MetricValue::publish_metadata][srad_types::traits::MetricValue::publish_metadata]  will be used.
65    pub fn with_metadata(self, metadata: MetaData) -> Self {
66        SimpleManagerPublishMetric(self.0.map(|m| m.metadata(metadata)))
67    }
68
69    /// Sets custom properties for the metric.
70    pub fn with_properties<P: Into<PropertySet>>(self, properties: P) -> Self {
71        SimpleManagerPublishMetric(self.0.map(|m| m.properties(properties)))
72    }
73}
74
75#[derive(Clone)]
76pub struct SimpleManagerMetric<T, H> {
77    data: Arc<Mutex<MetricData<T, H>>>,
78}
79
80impl<T, H> SimpleManagerMetric<T, H>
81where
82    T: traits::MetricValue + Clone,
83{
84    pub fn update<F1>(&self, f: F1) -> SimpleManagerPublishMetric
85    where
86        F1: Fn(&mut T),
87    {
88        let mut guard = self.data.lock().unwrap();
89        let x = guard.deref_mut();
90        f(&mut x.value);
91        let option = match &x.token {
92            Some(h) => Some(h.create_publish_metric(Some(x.value.clone()))),
93            None => None,
94        };
95        SimpleManagerPublishMetric(option)
96    }
97}
98
99#[async_trait]
100trait Stored<H>: Send {
101    fn birth_metric(&self, name: &str, bi: &mut BirthInitializer) -> MetricId;
102    fn has_callback(&self) -> bool;
103    async fn cmd_cb(&self, manager: SimpleMetricManager<H>, value: MessageMetric);
104}
105
106#[async_trait]
107impl<T, H> Stored<H> for SimpleManagerMetric<T, H>
108where
109    T: traits::MetricValue + Clone + Send,
110    H: Send + Sync,
111{
112    fn birth_metric(&self, name: &str, bi: &mut BirthInitializer) -> MetricId {
113        let mut metric = self.data.lock().unwrap();
114        let val = metric.value.clone();
115        let token = bi
116            .register_metric(
117                BirthMetricDetails::new_with_initial_value(name, val).use_alias(metric.use_alias),
118            )
119            .unwrap();
120        let id = token.id.clone();
121        metric.token = Some(token);
122        id
123    }
124
125    fn has_callback(&self) -> bool {
126        self.data.lock().unwrap().cb.is_some()
127    }
128
129    async fn cmd_cb(&self, manager: SimpleMetricManager<H>, value: MessageMetric) {
130        let cb = {
131            let metric = self.data.lock().unwrap();
132            match &metric.cb {
133                Some(cb) => cb.clone(),
134                None => return,
135            }
136        };
137
138        let converted = match value.value {
139            Some(v) => match v.try_into() {
140                Ok(value) => Some(value),
141                Err(_) => return,
142            },
143            None => None,
144        };
145
146        let x = SimpleManagerMetric {
147            data: self.data.clone(),
148        };
149        cb(manager, x, converted).await
150    }
151}
152
153/// A Metric Builder to configure a metric to add to a [SimpleMetricManager]
154pub struct SimpleMetricBuilder<T, H> {
155    name: String,
156    initial_value: T,
157    use_alias: bool,
158    cmd_cb: Option<CmdCallback<T, H>>,
159}
160
161impl<T, H> SimpleMetricBuilder<T, H>
162where
163    T: traits::MetricValue + Clone + Send + 'static,
164{
165    /// Create a new metric builder instance
166    pub fn new<S: Into<String>>(name: S, value: T) -> Self {
167        Self {
168            name: name.into(),
169            initial_value: value,
170            use_alias: true,
171            cmd_cb: None,
172        }
173    }
174
175    /// Set a handler which will be called when command messages for this metric are received.
176    ///
177    /// The command handler is an async function that receives the manager, the metric,
178    /// and an optional new value for the metric. If the value is "None" that indicates the CMD metric's 'is_null'
179    /// field was true
180    pub fn with_cmd_handler<F, Fut>(mut self, cb: F) -> Self
181    where
182        F: Fn(SimpleMetricManager<H>, SimpleManagerMetric<T, H>, Option<T>) -> Fut
183            + Send
184            + Sync
185            + 'static,
186        Fut: Future<Output = ()> + Send + 'static,
187    {
188        self.cmd_cb = Some(Arc::new(
189            move |manager: SimpleMetricManager<H>,
190                  metric: SimpleManagerMetric<T, H>,
191                  value: Option<T>|
192                  -> Pin<Box<dyn Future<Output = ()> + Send>> {
193                Box::pin(cb(manager, metric, value))
194            },
195        ));
196        self
197    }
198
199    /// Set if the metric should use an alias. Defaults to true.
200    pub fn use_alias(mut self, use_alias: bool) -> Self {
201        self.use_alias = use_alias;
202        self
203    }
204}
205
206struct SimpleMetricManagerInner<H> {
207    handle: Option<H>,
208    metrics: HashMap<String, Arc<dyn Stored<H> + Send + Sync>>,
209    cmd_lookup: HashMap<MetricId, Arc<dyn Stored<H> + Send + Sync>>,
210}
211
212/// A [MetricManager] implementation that provides simple metric registration and handling.
213///
214/// `SimpleMetricManager` provides a simple way to manage metrics with support for
215/// command callbacks and metric publishing.
216/// # Example
217/// ```no_run
218/// use srad_eon::simple_manager::{SimpleMetricManager, SimpleMetricBuilder};
219/// # use srad_eon::DeviceHandle;
220///
221/// # fn create_device_with_manager(manager: &SimpleMetricManager<DeviceHandle>) {
222/// #   unimplemented!()
223/// # }
224/// //Create a new simple metric manager
225/// let manager = SimpleMetricManager::new();
226///
227/// // Assume we successfully create a device with a SimpleMetricManager as it's metrics manager
228/// create_device_with_manager(&manager);
229///
230/// // Register a simple metric
231/// let counter = manager.register_metric(SimpleMetricBuilder::new("Counter", 0 as i32)).unwrap();
232///
233/// // Register a metric with a command handler
234/// let temperature = manager.register_metric(
235///     SimpleMetricBuilder::new("temperature", 25.5)
236///     .with_cmd_handler(
237///         |mgr, metric, new_value| async move {
238///             if let Some(value) = new_value {
239///                 mgr.publish_metric(metric.update(|x|{ *x = value })).await;
240///             }
241///         }
242///     )
243/// );
244/// ```
245#[derive(Clone)]
246pub struct SimpleMetricManager<H> {
247    inner: Arc<Mutex<SimpleMetricManagerInner<H>>>,
248}
249
250impl<H> SimpleMetricManager<H>
251where
252    H: MetricPublisher + Clone + Send + Sync + 'static,
253{
254    /// Creates a new empty `SimpleMetricManager`.
255    ///
256    /// This initialises a new metric manager with no registered metrics.
257    pub fn new() -> Self {
258        Self {
259            inner: Arc::new(Mutex::new(SimpleMetricManagerInner {
260                handle: None,
261                metrics: HashMap::new(),
262                cmd_lookup: HashMap::new(),
263            })),
264        }
265    }
266
267    /// Registers a new metric from the builders settings.
268    ///
269    /// Returns `None` if a metric with the same name already exists, otherwise
270    /// returns the newly created metric.
271    pub fn register_metric<T>(
272        &self,
273        builder: SimpleMetricBuilder<T, H>,
274    ) -> Option<SimpleManagerMetric<T, H>>
275    where
276        T: traits::MetricValue + Clone + Send + 'static,
277    {
278        let mut manager = self.inner.lock().unwrap();
279        if manager.metrics.contains_key(&builder.name) {
280            return None;
281        }
282
283        let metric = SimpleManagerMetric {
284            data: Arc::new(Mutex::new(MetricData {
285                value: builder.initial_value,
286                token: None,
287                cb: builder.cmd_cb,
288                use_alias: builder.use_alias,
289            })),
290        };
291        let metric_insert = Arc::new(metric.clone());
292        manager.metrics.insert(builder.name, metric_insert);
293        Some(metric)
294    }
295
296    fn get_callbacks_from_cmd_message_metrics(
297        &self,
298        metrics: MessageMetrics,
299    ) -> Vec<(Arc<dyn Stored<H> + Send + Sync>, MessageMetric)> {
300        let manager = self.inner.lock().unwrap();
301        let mut cbs = Vec::with_capacity(metrics.len());
302        for metric in metrics {
303            match manager.cmd_lookup.get(&metric.id) {
304                Some(res) => cbs.push((res.clone(), metric)),
305                None => continue,
306            };
307        }
308        cbs
309    }
310
311    async fn handle_cmd_metrics(&self, metrics: MessageMetrics) {
312        let callbacks = self.get_callbacks_from_cmd_message_metrics(metrics);
313        let futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = callbacks
314            .into_iter()
315            .map(|(stored, value)| {
316                Box::pin({
317                    let handle = self.clone();
318                    async move { stored.cmd_cb(handle, value).await }
319                }) as Pin<Box<dyn Future<Output = ()> + Send>>
320            })
321            .collect();
322        join_all(futures).await;
323    }
324
325    /// Publishes a single metric.
326    ///
327    /// Returns an error if the metric was not published.
328    pub async fn publish_metric(
329        &self,
330        metric: SimpleManagerPublishMetric,
331    ) -> Result<(), PublishError> {
332        self.publish_metrics(vec![metric]).await
333    }
334
335    /// Publishes a multiple metric in a single batch.
336    ///
337    /// Returns an error if the metrics were not published.
338    pub async fn publish_metrics(
339        &self,
340        metrics: Vec<SimpleManagerPublishMetric>,
341    ) -> Result<(), PublishError> {
342        let handle = {
343            match &self.inner.lock().unwrap().handle {
344                Some(handle) => handle.clone(),
345                None => return Err(PublishError::State(StateError::UnBirthed)),
346            }
347        };
348
349        let publish_metrics = metrics.into_iter().filter_map(|x| x.0).collect();
350        handle.publish_metrics(publish_metrics).await
351    }
352}
353
354impl<H> Default for SimpleMetricManager<H>
355where
356    H: MetricPublisher + Clone + Send + Sync + 'static,
357{
358    fn default() -> Self {
359        Self::new()
360    }
361}
362
363impl<H> MetricManager for SimpleMetricManager<H> {
364    fn initialise_birth(&self, bi: &mut BirthInitializer) {
365        let mut manager = self.inner.lock().unwrap();
366
367        let mut cmd_lookup = vec![];
368        manager.metrics.iter_mut().for_each(|(i, x)| {
369            let id = x.birth_metric(i, bi);
370            if x.has_callback() {
371                cmd_lookup.push((id, x.clone()));
372            }
373        });
374        manager.cmd_lookup = cmd_lookup.into_iter().collect();
375    }
376}
377
378#[async_trait]
379impl NodeMetricManager for SimpleMetricManager<NodeHandle> {
380    fn init(&self, handle: &NodeHandle) {
381        self.inner.lock().unwrap().handle = Some(handle.clone())
382    }
383
384    async fn on_ncmd(&self, _: NodeHandle, metrics: MessageMetrics) {
385        self.handle_cmd_metrics(metrics).await
386    }
387}
388
389#[async_trait]
390impl DeviceMetricManager for SimpleMetricManager<DeviceHandle> {
391    fn init(&self, handle: &DeviceHandle) {
392        self.inner.lock().unwrap().handle = Some(handle.clone())
393    }
394
395    async fn on_dcmd(&self, _: DeviceHandle, metrics: MessageMetrics) {
396        self.handle_cmd_metrics(metrics).await
397    }
398}