1use std::marker::PhantomData;
2use std::vec::IntoIter;
3
4use log::warn;
5use srad_types::payload::{Metric, Payload};
6use srad_types::utils::timestamp;
7use srad_types::PropertySet;
8use srad_types::{traits, MetricId, MetricValue};
9use srad_types::{MetaData, PartialTemplate, Template};
10
11use thiserror::Error;
12
13use crate::StateError;
14
15#[derive(Debug, Error)]
16pub enum PublishError {
17 #[error("No metrics provided.")]
18 NoMetrics,
19 #[error("State Error: {0}.")]
20 State(StateError),
21}
22
23impl From<StateError> for PublishError {
24 fn from(value: StateError) -> Self {
25 PublishError::State(value)
26 }
27}
28
29pub trait MetricPublisher {
37 fn try_publish_metrics_unsorted(
39 &self,
40 metrics: Vec<PublishMetric>,
41 ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send;
42
43 fn try_publish_metric(
45 &self,
46 metric: PublishMetric,
47 ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
48 self.try_publish_metrics_unsorted(vec![metric])
49 }
50
51 fn try_publish_metrics(
53 &self,
54 mut metrics: Vec<PublishMetric>,
55 ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
56 metrics.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
57 self.publish_metrics_unsorted(metrics)
58 }
59
60 fn publish_metrics_unsorted(
62 &self,
63 metrics: Vec<PublishMetric>,
64 ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send;
65
66 fn publish_metric(
68 &self,
69 metric: PublishMetric,
70 ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
71 self.publish_metrics_unsorted(vec![metric])
72 }
73
74 fn publish_metrics(
76 &self,
77 mut metrics: Vec<PublishMetric>,
78 ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
79 metrics.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
80 self.publish_metrics_unsorted(metrics)
81 }
82}
83
84pub struct PublishMetric {
90 metric_identifier: MetricId,
91 value: Option<MetricValue>,
92 is_transient: Option<bool>,
93 is_historical: Option<bool>,
94 timestamp: u64,
95 metadata: Option<MetaData>,
96 properties: Option<PropertySet>,
97}
98
99impl PublishMetric {
100 pub(crate) fn new<T: traits::MetricValue>(
101 metric_identifier: MetricId,
102 value: Option<T>,
103 ) -> Self {
104 let metadata = if let Some(v) = &value {
105 v.publish_metadata()
106 } else {
107 None
108 };
109 Self {
110 metric_identifier,
111 metadata,
112 value: value.map(T::into),
113 is_transient: None,
114 is_historical: None,
115 properties: None,
116 timestamp: timestamp(),
117 }
118 }
119 pub fn timestamp(mut self, timestamp: u64) -> Self {
123 self.timestamp = timestamp;
124 self
125 }
126
127 pub fn transient(mut self, is_transient: bool) -> Self {
131 self.is_transient = Some(is_transient);
132 self
133 }
134
135 pub fn historical(mut self, is_historical: bool) -> Self {
139 self.is_historical = Some(is_historical);
140 self
141 }
142
143 pub fn metadata(mut self, metadata: MetaData) -> Self {
147 self.metadata = Some(metadata);
148 self
149 }
150
151 pub fn properties<P: Into<PropertySet>>(mut self, properties: P) -> Self {
153 self.properties = Some(properties.into());
154 self
155 }
156}
157
158impl From<PublishMetric> for Metric {
159 fn from(value: PublishMetric) -> Self {
160 let mut metric = Metric::new();
161 match value.metric_identifier {
162 MetricId::Name(name) => metric.set_name(name),
163 MetricId::Alias(alias) => metric.set_alias(alias),
164 };
165
166 metric.metadata = value.metadata.map(MetaData::into);
167
168 if let Some(val) = value.value {
169 metric.set_value(val.into());
170 }
171
172 metric.timestamp = Some(value.timestamp);
173 metric.properties = value.properties.map(PropertySet::into);
174
175 metric.is_historical = value.is_historical;
176 metric.is_transient = value.is_transient;
177
178 metric
179 }
180}
181
182pub struct MetricToken<T> {
186 phantom: PhantomData<T>,
187 pub id: MetricId,
189}
190
191impl<T> MetricToken<T> {
192 pub(crate) fn new(id: MetricId) -> Self {
193 Self {
194 phantom: PhantomData,
195 id,
196 }
197 }
198}
199
200impl<T> MetricToken<T>
201where
202 T: traits::MetricValue,
203{
204 pub fn create_publish_metric(&self, value: Option<T>) -> PublishMetric {
206 PublishMetric::new(self.id.clone(), value)
207 }
208}
209
210impl<T> MetricToken<T>
211where
212 T: Template,
213{
214 pub fn create_publish_template_metric(&self, value: T) -> PublishMetric {
216 PublishMetric::new(self.id.clone(), Some(value.template_instance()))
217 }
218}
219
220impl<T> MetricToken<T>
221where
222 T: Template + PartialTemplate,
223{
224 pub fn create_publish_template_metric_from_difference(
225 &self,
226 value: T,
227 other: &T,
228 ) -> Option<PublishMetric> {
229 let diff = value.template_instance_from_difference(other)?;
230 Some(PublishMetric::new(self.id.clone(), Some(diff)))
231 }
232}
233
234pub struct MessageMetrics {
236 pub timestamp: u64,
238 metrics: Vec<Metric>,
239}
240
241impl MessageMetrics {
242 pub fn len(&self) -> usize {
243 self.metrics.len()
244 }
245
246 pub fn is_empty(&self) -> bool {
247 self.metrics.len() == 0
248 }
249}
250
251pub struct MessageMetric {
253 pub id: MetricId,
255 pub timestamp: Option<u64>,
256 pub value: Option<MetricValue>,
257 pub properties: Option<PropertySet>,
258}
259
260impl TryFrom<Metric> for MessageMetric {
261 type Error = ();
262
263 fn try_from(value: Metric) -> Result<Self, Self::Error> {
264 let id = if let Some(alias) = value.alias {
265 MetricId::Alias(alias)
266 } else if let Some(name) = value.name {
267 MetricId::Name(name)
268 } else {
269 return Err(());
270 };
271
272 let metric_value = if value.value.is_some() {
273 value.value.map(MetricValue::from)
274 } else if let Some(is_null) = value.is_null {
275 if is_null {
276 return Err(());
277 }
278 None
279 } else {
280 return Err(());
281 };
282
283 Ok(MessageMetric {
284 id,
285 timestamp: value.timestamp,
286 value: metric_value,
287 properties: None,
288 })
289 }
290}
291
292pub struct MessageMetricsIterator {
293 metric_iter: IntoIter<Metric>,
294}
295
296impl Iterator for MessageMetricsIterator {
297 type Item = MessageMetric;
298
299 fn next(&mut self) -> Option<Self::Item> {
300 let metric = self.metric_iter.next();
301 match metric {
302 Some(metric) => match metric.try_into() {
303 Ok(message_metric) => Some(message_metric),
304 Err(_) => {
305 warn!("Got invalid or badly formed metric - skipping");
306 self.next()
307 }
308 },
309 None => None,
310 }
311 }
312}
313
314impl IntoIterator for MessageMetrics {
315 type Item = MessageMetric;
316
317 type IntoIter = MessageMetricsIterator;
318
319 fn into_iter(self) -> Self::IntoIter {
320 MessageMetricsIterator {
321 metric_iter: self.metrics.into_iter(),
322 }
323 }
324}
325
326impl TryFrom<Payload> for MessageMetrics {
327 type Error = ();
328
329 fn try_from(value: Payload) -> Result<Self, Self::Error> {
330 let timestamp = match value.timestamp {
335 Some(timestamp) => timestamp,
336 None => return Err(()),
337 };
338
339 Ok(MessageMetrics {
340 timestamp,
341 metrics: value.metrics,
342 })
343 }
344}