srad_eon/
metric.rs

1use std::marker::PhantomData;
2use std::vec::IntoIter;
3
4use log::warn;
5use srad_types::payload::{Metric, Payload};
6use srad_types::utils::timestamp;
7use srad_types::PropertySet;
8use srad_types::{traits, MetricId, MetricValue};
9use srad_types::{MetaData, PartialTemplate, Template};
10
11use thiserror::Error;
12
13use crate::StateError;
14
/// Errors that can be returned from the [`MetricPublisher`] methods.
#[derive(Debug, Error)]
pub enum PublishError {
    /// The publish call had no metrics to send.
    // NOTE(review): the check that produces this variant lives in the
    // `MetricPublisher` implementors, which are not visible in this file.
    #[error("No metrics provided.")]
    NoMetrics,
    /// Publishing failed with a [`StateError`]; the wrapped error is included
    /// in the display message.
    #[error("State Error: {0}.")]
    State(StateError),
}
22
23impl From<StateError> for PublishError {
24    fn from(value: StateError) -> Self {
25        PublishError::State(value)
26    }
27}
28
29/// A trait for publishing metrics to the network.
30///
31/// `MetricPublisher` defines a set of methods for publishing single metrics
32/// or batches of metrics. It provides "try_" variants that may fail immediately.
33///
34/// "try_publish" variants will use the "try_publish" variants from the [srad_client::Client] trait.
35/// Similarly, the "publish" variants will use the "publish" from the [srad_client::Client] trait.
36pub trait MetricPublisher {
37    /// Attempts to publish a batch of metrics without modifying their order.
38    fn try_publish_metrics_unsorted(
39        &self,
40        metrics: Vec<PublishMetric>,
41    ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send;
42
43    /// Attempts to publish a single metric.
44    fn try_publish_metric(
45        &self,
46        metric: PublishMetric,
47    ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
48        self.try_publish_metrics_unsorted(vec![metric])
49    }
50
51    /// Attempts to publish a batch of metrics after sorting by timestamp.
52    fn try_publish_metrics(
53        &self,
54        mut metrics: Vec<PublishMetric>,
55    ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
56        metrics.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
57        self.publish_metrics_unsorted(metrics)
58    }
59
60    /// Publish a batch of metrics without modifying their order.
61    fn publish_metrics_unsorted(
62        &self,
63        metrics: Vec<PublishMetric>,
64    ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send;
65
66    /// Publish a single metric.
67    fn publish_metric(
68        &self,
69        metric: PublishMetric,
70    ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
71        self.publish_metrics_unsorted(vec![metric])
72    }
73
74    /// Publish a batch of metrics after sorting by timestamp.
75    fn publish_metrics(
76        &self,
77        mut metrics: Vec<PublishMetric>,
78    ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
79        metrics.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
80        self.publish_metrics_unsorted(metrics)
81    }
82}
83
/// A struct for creating a metric to be published with associated metadata and properties.
///
/// `PublishMetric` provides a builder pattern for configuring metric publications,
/// allowing for optional fields like transience, historical status, custom timestamps,
/// metadata, and properties.
pub struct PublishMetric {
    /// Name or alias identifying the metric being published.
    metric_identifier: MetricId,
    /// The value to publish; `None` when the metric was created without a value.
    value: Option<MetricValue>,
    /// Transient flag; stays `None` unless set through the `transient` builder method.
    is_transient: Option<bool>,
    /// Historical flag; stays `None` unless set through the `historical` builder method.
    is_historical: Option<bool>,
    /// Timestamp attached to the metric; also the key used when batches are sorted.
    timestamp: u64,
    /// Metadata published with the metric, if any.
    metadata: Option<MetaData>,
    /// Properties published with the metric, if any.
    properties: Option<PropertySet>,
}
98
99impl PublishMetric {
100    pub(crate) fn new<T: traits::MetricValue>(
101        metric_identifier: MetricId,
102        value: Option<T>,
103    ) -> Self {
104        let metadata = if let Some(v) = &value {
105            v.publish_metadata()
106        } else {
107            None
108        };
109        Self {
110            metric_identifier,
111            metadata,
112            value: value.map(T::into),
113            is_transient: None,
114            is_historical: None,
115            properties: None,
116            timestamp: timestamp(),
117        }
118    }
119    /// Sets a custom timestamp for the metric.
120    ///
121    /// By default, the current system time is used.
122    pub fn timestamp(mut self, timestamp: u64) -> Self {
123        self.timestamp = timestamp;
124        self
125    }
126
127    /// Marks the metric as transient or persistent.
128    ///
129    /// Transient metrics are typically not stored permanently. By default metrics are not transient.
130    pub fn transient(mut self, is_transient: bool) -> Self {
131        self.is_transient = Some(is_transient);
132        self
133    }
134
135    /// Marks the metric as a historical metric that does not represent a current value.
136    ///
137    /// By default, metrics are not historical.
138    pub fn historical(mut self, is_historical: bool) -> Self {
139        self.is_historical = Some(is_historical);
140        self
141    }
142
143    /// Sets custom metadata for the metric.
144    ///
145    /// By default, the result from [MetricValue::publish_metadata][srad_types::traits::MetricValue::publish_metadata]  will be used.
146    pub fn metadata(mut self, metadata: MetaData) -> Self {
147        self.metadata = Some(metadata);
148        self
149    }
150
151    /// Sets custom properties for the metric.
152    pub fn properties<P: Into<PropertySet>>(mut self, properties: P) -> Self {
153        self.properties = Some(properties.into());
154        self
155    }
156}
157
158impl From<PublishMetric> for Metric {
159    fn from(value: PublishMetric) -> Self {
160        let mut metric = Metric::new();
161        match value.metric_identifier {
162            MetricId::Name(name) => metric.set_name(name),
163            MetricId::Alias(alias) => metric.set_alias(alias),
164        };
165
166        metric.metadata = value.metadata.map(MetaData::into);
167
168        if let Some(val) = value.value {
169            metric.set_value(val.into());
170        }
171
172        metric.timestamp = Some(value.timestamp);
173        metric.properties = value.properties.map(PropertySet::into);
174
175        metric.is_historical = value.is_historical;
176        metric.is_transient = value.is_transient;
177
178        metric
179    }
180}
181
/// A token representing a birthed metric
///
/// Used to create a [PublishMetric] for publishing and match a [MessageMetric] identifier
///
/// The type parameter `T` records the metric's value type without storing a
/// value: it determines which values the `create_publish_*` methods accept.
pub struct MetricToken<T> {
    /// Zero-sized marker tying the token to `T`.
    phantom: PhantomData<T>,
    /// The unique identifier of the metric
    pub id: MetricId,
}
190
191impl<T> MetricToken<T> {
192    pub(crate) fn new(id: MetricId) -> Self {
193        Self {
194            phantom: PhantomData,
195            id,
196        }
197    }
198}
199
200impl<T> MetricToken<T>
201where
202    T: traits::MetricValue,
203{
204    /// Create a new [PublishMetric]
205    pub fn create_publish_metric(&self, value: Option<T>) -> PublishMetric {
206        PublishMetric::new(self.id.clone(), value)
207    }
208}
209
210impl<T> MetricToken<T>
211where
212    T: Template,
213{
214    /// Create a new [PublishMetric]
215    pub fn create_publish_template_metric(&self, value: T) -> PublishMetric {
216        PublishMetric::new(self.id.clone(), Some(value.template_instance()))
217    }
218}
219
220impl<T> MetricToken<T>
221where
222    T: Template + PartialTemplate,
223{
224    pub fn create_publish_template_metric_from_difference(
225        &self,
226        value: T,
227        other: &T,
228    ) -> Option<PublishMetric> {
229        let diff = value.template_instance_from_difference(other)?;
230        Some(PublishMetric::new(self.id.clone(), Some(diff)))
231    }
232}
233
/// A collection of metrics from a message
///
/// Built from a [`Payload`] via `TryFrom`, and consumed with `into_iter()`,
/// which yields one [`MessageMetric`] per well-formed metric.
pub struct MessageMetrics {
    /// The timestamp of the payload
    pub timestamp: u64,
    /// The raw payload metrics; they are converted to [`MessageMetric`] only during iteration.
    metrics: Vec<Metric>,
}
240
241impl MessageMetrics {
242    pub fn len(&self) -> usize {
243        self.metrics.len()
244    }
245
246    pub fn is_empty(&self) -> bool {
247        self.metrics.len() == 0
248    }
249}
250
/// A metric from a message
pub struct MessageMetric {
    /// The unique identifier of the metric
    pub id: MetricId,
    /// The metric's own timestamp, if the incoming metric carried one.
    pub timestamp: Option<u64>,
    /// The metric's value, if it has one.
    pub value: Option<MetricValue>,
    /// Properties attached to the metric.
    // NOTE(review): the `TryFrom<Metric>` conversion in this file always sets
    // this to `None`; confirm whether incoming properties should be carried over.
    pub properties: Option<PropertySet>,
}
259
260impl TryFrom<Metric> for MessageMetric {
261    type Error = ();
262
263    fn try_from(value: Metric) -> Result<Self, Self::Error> {
264        let id = if let Some(alias) = value.alias {
265            MetricId::Alias(alias)
266        } else if let Some(name) = value.name {
267            MetricId::Name(name)
268        } else {
269            return Err(());
270        };
271
272        let metric_value = if value.value.is_some() {
273            value.value.map(MetricValue::from)
274        } else if let Some(is_null) = value.is_null {
275            if is_null {
276                return Err(());
277            }
278            None
279        } else {
280            return Err(());
281        };
282
283        Ok(MessageMetric {
284            id,
285            timestamp: value.timestamp,
286            value: metric_value,
287            properties: None,
288        })
289    }
290}
291
/// Iterator over the metrics of a [`MessageMetrics`], yielding [`MessageMetric`] items.
///
/// Raw metrics that cannot be converted into a [`MessageMetric`] are skipped
/// with a warning instead of being yielded.
pub struct MessageMetricsIterator {
    /// The remaining raw payload metrics, converted one at a time on demand.
    metric_iter: IntoIter<Metric>,
}
295
296impl Iterator for MessageMetricsIterator {
297    type Item = MessageMetric;
298
299    fn next(&mut self) -> Option<Self::Item> {
300        let metric = self.metric_iter.next();
301        match metric {
302            Some(metric) => match metric.try_into() {
303                Ok(message_metric) => Some(message_metric),
304                Err(_) => {
305                    warn!("Got invalid or badly formed metric - skipping");
306                    self.next()
307                }
308            },
309            None => None,
310        }
311    }
312}
313
314impl IntoIterator for MessageMetrics {
315    type Item = MessageMetric;
316
317    type IntoIter = MessageMetricsIterator;
318
319    fn into_iter(self) -> Self::IntoIter {
320        MessageMetricsIterator {
321            metric_iter: self.metrics.into_iter(),
322        }
323    }
324}
325
326impl TryFrom<Payload> for MessageMetrics {
327    type Error = ();
328
329    fn try_from(value: Payload) -> Result<Self, Self::Error> {
330        /*
331        tck-id-payloads-ncmd-timestamp and tck-id-payloads-cmd-timestamp
332        messages MUST include a payload timestamp that denotes the time at which the message was published.
333        */
334        let timestamp = match value.timestamp {
335            Some(timestamp) => timestamp,
336            None => return Err(()),
337        };
338
339        Ok(MessageMetrics {
340            timestamp,
341            metrics: value.metrics,
342        })
343    }
344}