srad_eon/
metric.rs

1use std::marker::PhantomData;
2use std::vec::IntoIter;
3
4use log::warn;
5use srad_types::payload::{Metric, Payload};
6use srad_types::utils::timestamp;
7use srad_types::MetaData;
8use srad_types::PropertySet;
9use srad_types::{traits, MetricId, MetricValue};
10
11use thiserror::Error;
12
13#[derive(Debug, Error)]
14pub enum PublishError {
15    #[error("Connection state is Offline")]
16    Offline,
17    #[error("No metrics provided.")]
18    NoMetrics,
19    #[error("The node or device is not birthed.")]
20    UnBirthed,
21}
22
23/// A trait for publishing metrics to the network.
24///
25/// `MetricPublisher` defines a set of methods for publishing single metrics
26/// or batches of metrics. It provides "try_" variants that may fail immediately.
27///
28/// "try_publish" variants will use the "try_publish" variants from the [srad_client::Client] trait.
29/// Similarly, the "publish" variants will use the "publish" from the [srad_client::Client] trait.
30pub trait MetricPublisher {
31    /// Attempts to publish a batch of metrics without modifying their order.
32    fn try_publish_metrics_unsorted(
33        &self,
34        metrics: Vec<PublishMetric>,
35    ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send;
36
37    /// Attempts to publish a single metric.
38    fn try_publish_metric(
39        &self,
40        metric: PublishMetric,
41    ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
42        self.try_publish_metrics_unsorted(vec![metric])
43    }
44
45    /// Attempts to publish a batch of metrics after sorting by timestamp.
46    fn try_publish_metrics(
47        &self,
48        mut metrics: Vec<PublishMetric>,
49    ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
50        metrics.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
51        self.publish_metrics_unsorted(metrics)
52    }
53
54    /// Publish a batch of metrics without modifying their order.
55    fn publish_metrics_unsorted(
56        &self,
57        metrics: Vec<PublishMetric>,
58    ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send;
59
60    /// Publish a single metric.
61    fn publish_metric(
62        &self,
63        metric: PublishMetric,
64    ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
65        self.publish_metrics_unsorted(vec![metric])
66    }
67
68    /// Publish a batch of metrics after sorting by timestamp.
69    fn publish_metrics(
70        &self,
71        mut metrics: Vec<PublishMetric>,
72    ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
73        metrics.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
74        self.publish_metrics_unsorted(metrics)
75    }
76}
77
78/// A structure for creating a metric to be published with associated metadata and properties.
79///
80/// `PublishMetric` provides a builder pattern for configuring metric publications,
81/// allowing for optional fields like transience, historical status, custom timestamps,
82/// metadata, and properties.
83pub struct PublishMetric {
84    metric_identifier: MetricId,
85    value: Option<MetricValue>,
86    is_transient: Option<bool>,
87    is_historical: Option<bool>,
88    timestamp: u64,
89    metadata: Option<MetaData>,
90    properties: Option<PropertySet>,
91}
92
93impl PublishMetric {
94    pub(crate) fn new<T: traits::MetricValue>(
95        metric_identifier: MetricId,
96        value: Option<T>,
97    ) -> Self {
98        let metadata = if let Some(v) = &value {
99            v.publish_metadata()
100        } else {
101            None
102        };
103        Self {
104            metric_identifier,
105            metadata,
106            value: value.map(T::into),
107            is_transient: None,
108            is_historical: None,
109            properties: None,
110            timestamp: timestamp(),
111        }
112    }
113    /// Sets a custom timestamp for the metric.
114    ///
115    /// By default, the current system time is used.
116    pub fn timestamp(mut self, timestamp: u64) -> Self {
117        self.timestamp = timestamp;
118        self
119    }
120
121    /// Marks the metric as transient or persistent.
122    ///
123    /// Transient metrics are typically not stored permanently. By default metrics are not transient.
124    pub fn transient(mut self, is_transient: bool) -> Self {
125        self.is_transient = Some(is_transient);
126        self
127    }
128
129    /// Marks the metric as a historical metric that does not represent a current value.
130    ///
131    /// By default, metrics are not historical.
132    pub fn historical(mut self, is_historical: bool) -> Self {
133        self.is_historical = Some(is_historical);
134        self
135    }
136
137    /// Sets custom metadata for the metric.
138    ///
139    /// By default, the result from [MetricValue::publish_metadata][srad_types::traits::MetricValue::publish_metadata]  will be used.
140    pub fn metadata(mut self, metadata: MetaData) -> Self {
141        self.metadata = Some(metadata);
142        self
143    }
144
145    /// Sets custom properties for the metric.
146    pub fn properties<P: Into<PropertySet>>(mut self, properties: P) -> Self {
147        self.properties = Some(properties.into());
148        self
149    }
150}
151
152impl From<PublishMetric> for Metric {
153    fn from(value: PublishMetric) -> Self {
154        let mut metric = Metric::new();
155        match value.metric_identifier {
156            MetricId::Name(name) => metric.set_name(name),
157            MetricId::Alias(alias) => metric.set_alias(alias),
158        };
159
160        metric.metadata = value.metadata.map(MetaData::into);
161
162        if let Some(val) = value.value {
163            metric.set_value(val.into());
164        }
165
166        metric.timestamp = Some(value.timestamp);
167        metric.properties = value.properties.map(PropertySet::into);
168
169        metric.is_historical = value.is_historical;
170        metric.is_transient = value.is_transient;
171
172        metric
173    }
174}
175
176/// A token representing a birthed metric
177///
178/// Used to create a [PublishMetric] for publishing and match a [MessageMetric] identifier
179pub struct MetricToken<T> {
180    phantom: PhantomData<T>,
181    /// The unique identifier of the metric
182    pub id: MetricId,
183}
184
185impl<T> MetricToken<T>
186where
187    T: traits::MetricValue,
188{
189    pub(crate) fn new(id: MetricId) -> Self {
190        Self {
191            phantom: PhantomData,
192            id,
193        }
194    }
195
196    /// Create a new [PublishMetric]
197    pub fn create_publish_metric(&self, value: Option<T>) -> PublishMetric {
198        PublishMetric::new(self.id.clone(), value)
199    }
200}
201
202/// A collection of metrics from a message
203pub struct MessageMetrics {
204    /// The timestamp of the payload
205    pub timestamp: u64,
206    metrics: Vec<Metric>,
207}
208
209impl MessageMetrics {
210    pub fn len(&self) -> usize {
211        self.metrics.len()
212    }
213
214    pub fn is_empty(&self) -> bool {
215        self.metrics.len() == 0
216    }
217}
218
219/// A metric from a message
220pub struct MessageMetric {
221    /// The unique identifier of the metric
222    pub id: MetricId,
223    pub timestamp: Option<u64>,
224    pub value: Option<MetricValue>,
225    pub properties: Option<PropertySet>,
226}
227
228impl TryFrom<Metric> for MessageMetric {
229    type Error = ();
230
231    fn try_from(value: Metric) -> Result<Self, Self::Error> {
232        let id = if let Some(alias) = value.alias {
233            MetricId::Alias(alias)
234        } else if let Some(name) = value.name {
235            MetricId::Name(name)
236        } else {
237            return Err(());
238        };
239
240        let metric_value = if value.value.is_some() {
241            value.value.map(MetricValue::from)
242        } else if let Some(is_null) = value.is_null {
243            if is_null {
244                return Err(());
245            }
246            None
247        } else {
248            return Err(());
249        };
250
251        Ok(MessageMetric {
252            id,
253            timestamp: value.timestamp,
254            value: metric_value,
255            properties: None,
256        })
257    }
258}
259
260pub struct MessageMetricsIterator {
261    metric_iter: IntoIter<Metric>,
262}
263
264impl Iterator for MessageMetricsIterator {
265    type Item = MessageMetric;
266
267    fn next(&mut self) -> Option<Self::Item> {
268        let metric = self.metric_iter.next();
269        match metric {
270            Some(metric) => match metric.try_into() {
271                Ok(message_metric) => Some(message_metric),
272                Err(_) => {
273                    warn!("Got invalid or badly formed metric - skipping");
274                    self.next()
275                }
276            },
277            None => None,
278        }
279    }
280}
281
282impl IntoIterator for MessageMetrics {
283    type Item = MessageMetric;
284
285    type IntoIter = MessageMetricsIterator;
286
287    fn into_iter(self) -> Self::IntoIter {
288        MessageMetricsIterator {
289            metric_iter: self.metrics.into_iter(),
290        }
291    }
292}
293
294impl TryFrom<Payload> for MessageMetrics {
295    type Error = ();
296
297    fn try_from(value: Payload) -> Result<Self, Self::Error> {
298        /*
299        tck-id-payloads-ncmd-timestamp and tck-id-payloads-cmd-timestamp
300        messages MUST include a payload timestamp that denotes the time at which the message was published.
301        */
302        let timestamp = match value.timestamp {
303            Some(timestamp) => timestamp,
304            None => return Err(()),
305        };
306
307        Ok(MessageMetrics {
308            timestamp,
309            metrics: value.metrics,
310        })
311    }
312}