1use std::marker::PhantomData;
2use std::vec::IntoIter;
3
4use log::warn;
5use srad_types::payload::{Metric, Payload};
6use srad_types::utils::timestamp;
7use srad_types::MetaData;
8use srad_types::PropertySet;
9use srad_types::{traits, MetricId, MetricValue};
10
11use thiserror::Error;
12
13#[derive(Debug, Error)]
14pub enum PublishError {
15 #[error("Connection state is Offline")]
16 Offline,
17 #[error("No metrics provided.")]
18 NoMetrics,
19 #[error("The node or device is not birthed.")]
20 UnBirthed,
21}
22
23pub trait MetricPublisher {
31 fn try_publish_metrics_unsorted(
33 &self,
34 metrics: Vec<PublishMetric>,
35 ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send;
36
37 fn try_publish_metric(
39 &self,
40 metric: PublishMetric,
41 ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
42 self.try_publish_metrics_unsorted(vec![metric])
43 }
44
45 fn try_publish_metrics(
47 &self,
48 mut metrics: Vec<PublishMetric>,
49 ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
50 metrics.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
51 self.publish_metrics_unsorted(metrics)
52 }
53
54 fn publish_metrics_unsorted(
56 &self,
57 metrics: Vec<PublishMetric>,
58 ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send;
59
60 fn publish_metric(
62 &self,
63 metric: PublishMetric,
64 ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
65 self.publish_metrics_unsorted(vec![metric])
66 }
67
68 fn publish_metrics(
70 &self,
71 mut metrics: Vec<PublishMetric>,
72 ) -> impl std::future::Future<Output = Result<(), PublishError>> + Send {
73 metrics.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
74 self.publish_metrics_unsorted(metrics)
75 }
76}
77
78pub struct PublishMetric {
84 metric_identifier: MetricId,
85 value: Option<MetricValue>,
86 is_transient: Option<bool>,
87 is_historical: Option<bool>,
88 timestamp: u64,
89 metadata: Option<MetaData>,
90 properties: Option<PropertySet>,
91}
92
93impl PublishMetric {
94 pub(crate) fn new<T: traits::MetricValue>(
95 metric_identifier: MetricId,
96 value: Option<T>,
97 ) -> Self {
98 let metadata = if let Some(v) = &value {
99 v.publish_metadata()
100 } else {
101 None
102 };
103 Self {
104 metric_identifier,
105 metadata,
106 value: value.map(T::into),
107 is_transient: None,
108 is_historical: None,
109 properties: None,
110 timestamp: timestamp(),
111 }
112 }
113 pub fn timestamp(mut self, timestamp: u64) -> Self {
117 self.timestamp = timestamp;
118 self
119 }
120
121 pub fn transient(mut self, is_transient: bool) -> Self {
125 self.is_transient = Some(is_transient);
126 self
127 }
128
129 pub fn historical(mut self, is_historical: bool) -> Self {
133 self.is_historical = Some(is_historical);
134 self
135 }
136
137 pub fn metadata(mut self, metadata: MetaData) -> Self {
141 self.metadata = Some(metadata);
142 self
143 }
144
145 pub fn properties<P: Into<PropertySet>>(mut self, properties: P) -> Self {
147 self.properties = Some(properties.into());
148 self
149 }
150}
151
152impl From<PublishMetric> for Metric {
153 fn from(value: PublishMetric) -> Self {
154 let mut metric = Metric::new();
155 match value.metric_identifier {
156 MetricId::Name(name) => metric.set_name(name),
157 MetricId::Alias(alias) => metric.set_alias(alias),
158 };
159
160 metric.metadata = value.metadata.map(MetaData::into);
161
162 if let Some(val) = value.value {
163 metric.set_value(val.into());
164 }
165
166 metric.timestamp = Some(value.timestamp);
167 metric.properties = value.properties.map(PropertySet::into);
168
169 metric.is_historical = value.is_historical;
170 metric.is_transient = value.is_transient;
171
172 metric
173 }
174}
175
176pub struct MetricToken<T> {
180 phantom: PhantomData<T>,
181 pub id: MetricId,
183}
184
185impl<T> MetricToken<T>
186where
187 T: traits::MetricValue,
188{
189 pub(crate) fn new(id: MetricId) -> Self {
190 Self {
191 phantom: PhantomData,
192 id,
193 }
194 }
195
196 pub fn create_publish_metric(&self, value: Option<T>) -> PublishMetric {
198 PublishMetric::new(self.id.clone(), value)
199 }
200}
201
202pub struct MessageMetrics {
204 pub timestamp: u64,
206 metrics: Vec<Metric>,
207}
208
209impl MessageMetrics {
210 pub fn len(&self) -> usize {
211 self.metrics.len()
212 }
213
214 pub fn is_empty(&self) -> bool {
215 self.metrics.len() == 0
216 }
217}
218
219pub struct MessageMetric {
221 pub id: MetricId,
223 pub timestamp: Option<u64>,
224 pub value: Option<MetricValue>,
225 pub properties: Option<PropertySet>,
226}
227
228impl TryFrom<Metric> for MessageMetric {
229 type Error = ();
230
231 fn try_from(value: Metric) -> Result<Self, Self::Error> {
232 let id = if let Some(alias) = value.alias {
233 MetricId::Alias(alias)
234 } else if let Some(name) = value.name {
235 MetricId::Name(name)
236 } else {
237 return Err(());
238 };
239
240 let metric_value = if value.value.is_some() {
241 value.value.map(MetricValue::from)
242 } else if let Some(is_null) = value.is_null {
243 if is_null {
244 return Err(());
245 }
246 None
247 } else {
248 return Err(());
249 };
250
251 Ok(MessageMetric {
252 id,
253 timestamp: value.timestamp,
254 value: metric_value,
255 properties: None,
256 })
257 }
258}
259
260pub struct MessageMetricsIterator {
261 metric_iter: IntoIter<Metric>,
262}
263
264impl Iterator for MessageMetricsIterator {
265 type Item = MessageMetric;
266
267 fn next(&mut self) -> Option<Self::Item> {
268 let metric = self.metric_iter.next();
269 match metric {
270 Some(metric) => match metric.try_into() {
271 Ok(message_metric) => Some(message_metric),
272 Err(_) => {
273 warn!("Got invalid or badly formed metric - skipping");
274 self.next()
275 }
276 },
277 None => None,
278 }
279 }
280}
281
282impl IntoIterator for MessageMetrics {
283 type Item = MessageMetric;
284
285 type IntoIter = MessageMetricsIterator;
286
287 fn into_iter(self) -> Self::IntoIter {
288 MessageMetricsIterator {
289 metric_iter: self.metrics.into_iter(),
290 }
291 }
292}
293
294impl TryFrom<Payload> for MessageMetrics {
295 type Error = ();
296
297 fn try_from(value: Payload) -> Result<Self, Self::Error> {
298 let timestamp = match value.timestamp {
303 Some(timestamp) => timestamp,
304 None => return Err(()),
305 };
306
307 Ok(MessageMetrics {
308 timestamp,
309 metrics: value.metrics,
310 })
311 }
312}