alumet 0.8.0

Modular framework for hardware and software measurement (including energy consumption and more).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
//! Measurement points and buffers.
//!
//! Each step of the Alumet pipeline reads, produces or modifies timeseries data points,
//! each represented as a [`MeasurementPoint`].
//! This is usually done through a [`MeasurementBuffer`] (for transforms and outputs)
//! or a [`MeasurementAccumulator`] (for sources).
//!
//! # Producing measurements
//!
//! Assuming that you have a `buffer: &mut MeasurementBuffer` (or `MeasurementAccumulator`),
//! you can produce new measurements like this:
//! ```no_run
//! use alumet::measurement::{MeasurementBuffer, MeasurementPoint};
//! use alumet::resources::{Resource, ResourceConsumer};
//!
//! # let buffer = MeasurementBuffer::new();
//! # let my_metric: alumet::metrics::TypedMetricId<u64> = todo!();
//! # let timestamp = todo!();
//! buffer.push(MeasurementPoint::new(
//!     timestamp, // timestamp, provided by Alumet as a parameter of [Source::poll]
//!     my_metric, // a TypedMetricId that you obtained from [AlumetPluginStart::create_metric]
//!     Resource::CpuPackage { id: 0 }, // the resource that you are measuring
//!     ResourceConsumer::LocalMachine, // the thing that consumes the resource (here the "local machine" means "no consumer, we monitor the entire cpu package")
//!     1234, // the measurement value
//! ));
//! ```

use core::fmt;
use fxhash::FxBuildHasher;
use ordered_float::OrderedFloat;
use smallvec::SmallVec;
use std::borrow::Cow;
use std::hash::{Hash, Hasher};
use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};
use std::{collections::HashMap, fmt::Display};

use crate::metrics::def::{RawMetricId, TypedMetricId};
use crate::resources::ResourceConsumer;

use super::resources::Resource;

/// A value that has been measured at a given point in time.
///
/// A point records which `metric` was measured, at which `timestamp`,
/// on which `resource`, for which `consumer`, and the measured `value`.
///
/// Measurement points may also have attributes.
/// Only certain types of values and attributes are allowed, see [`MeasurementType`] and [`AttributeValue`].
#[derive(Clone, Debug)]
pub struct MeasurementPoint {
    /// The metric that has been measured.
    pub metric: RawMetricId,

    /// The time of the measurement.
    pub timestamp: Timestamp,

    /// The measured value.
    pub value: WrappedMeasurementValue,

    /// The resource this measurement is about: CPU socket, GPU, process, ...
    ///
    /// The `resource` and the `consumer` specify which object has been measured.
    pub resource: Resource,

    /// The consumer of the resource: process, container, ...
    ///
    /// This gives additional information about the perimeter of the measurement.
    /// For instance, we can measure the total CPU usage of the node,
    /// or the usage of the CPU by a particular process.
    pub consumer: ResourceConsumer,

    /// Additional attributes on the measurement point, stored as `(key, value)` pairs.
    ///
    /// Not public because we could change how they are stored later (in fact it has already changed multiple times).
    /// Uses [`SmallVec`] (with room for 4 inline entries) to avoid allocations if the number of attributes is small.
    attributes: SmallVec<[(Cow<'static, str>, AttributeValue); 4]>,
}

/// A measurement of a clock.
///
/// This opaque type is currently a wrapper around [`SystemTime`],
/// but this could change in the future.
///
/// The inner value is only visible inside the crate; other code converts
/// from/to [`SystemTime`] through the `From` implementations.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct Timestamp(pub(crate) SystemTime);

impl MeasurementPoint {
    /// Creates a new `MeasurementPoint` without attributes.
    ///
    /// The `value` is wrapped with [`MeasurementType::wrapped_value`] according to
    /// the type `T` of the metric.
    ///
    /// Use [`with_attr`](Self::with_attr) or [`with_attr_vec`](Self::with_attr_vec)
    /// to attach arbitrary attributes to the point.
    pub fn new<T: MeasurementType>(
        timestamp: Timestamp,
        metric: TypedMetricId<T>,
        resource: Resource,
        consumer: ResourceConsumer,
        value: T::T,
    ) -> MeasurementPoint {
        Self::new_untyped(timestamp, metric.0, resource, consumer, T::wrapped_value(value))
    }

    /// Creates a new `MeasurementPoint` without attributes, using an untyped metric.
    /// Prefer to use [`MeasurementPoint::new`] with a typed metric instead.
    ///
    /// Use [`with_attr`](Self::with_attr) or [`with_attr_vec`](Self::with_attr_vec)
    /// to attach arbitrary attributes to the point.
    pub fn new_untyped(
        timestamp: Timestamp,
        metric: RawMetricId,
        resource: Resource,
        consumer: ResourceConsumer,
        value: WrappedMeasurementValue,
    ) -> MeasurementPoint {
        MeasurementPoint {
            metric,
            timestamp,
            value,
            resource,
            consumer,
            attributes: SmallVec::new(),
        }
    }

    /// Returns the number of attributes attached to this measurement point.
    pub fn attributes_len(&self) -> usize {
        self.attributes.len()
    }

    /// Iterates on the attributes attached to the measurement point,
    /// as `(key, value)` pairs.
    pub fn attributes(&self) -> impl Iterator<Item = (&str, &AttributeValue)> {
        self.attributes.iter().map(|(k, v)| (k.as_ref(), v))
    }

    /// Iterates on the keys of the attributes that are attached to the point.
    pub fn attributes_keys(&self) -> impl Iterator<Item = &str> {
        self.attributes.iter().map(|(k, _v)| k.as_ref())
    }

    /// Sets an attribute on this measurement point.
    /// If an attribute with the same key already exists, its value is replaced.
    pub fn add_attr<K: Into<Cow<'static, str>>, V: Into<AttributeValue>>(&mut self, key: K, value: V) {
        self.set_attr(key.into(), value.into());
    }

    /// Sets an attribute on this measurement point, and returns self to allow for method chaining.
    /// If an attribute with the same key already exists, its value is replaced.
    pub fn with_attr<K: Into<Cow<'static, str>>, V: Into<AttributeValue>>(mut self, key: K, value: V) -> Self {
        self.add_attr(key, value);
        self
    }

    /// Attaches multiple attributes to this measurement point, from a [`Vec`].
    /// Existing attributes with conflicting keys are replaced.
    ///
    /// If `attributes` itself contains the same key several times, the last value wins.
    pub fn with_attr_vec<K: Into<Cow<'static, str>>>(mut self, attributes: Vec<(K, AttributeValue)>) -> Self {
        for (key, value) in attributes {
            self.set_attr(key.into(), value);
        }
        self
    }

    /// Attaches multiple attributes to this measurement point, from a [`HashMap`].
    /// Existing attributes with conflicting keys are replaced.
    pub fn with_attr_map<K: Into<Cow<'static, str>>>(
        mut self,
        attributes: HashMap<K, AttributeValue, FxBuildHasher>,
    ) -> Self {
        if self.attributes.is_empty() {
            // Fast path: nothing to conflict with, and the keys of a map are unique,
            // so the entries can be collected directly.
            // NOTE(review): assumes that distinct `K` keys convert to distinct strings.
            self.attributes = attributes.into_iter().map(|(k, v)| (k.into(), v)).collect();
        } else {
            for (key, value) in attributes {
                self.set_attr(key.into(), value);
            }
        }
        self
    }

    /// Stores `value` under `key`, overwriting the value of an existing attribute
    /// that has the same key.
    ///
    /// A replaced attribute keeps its position in the list; a new attribute is
    /// appended at the end. The lookup is a linear scan over the current attributes.
    fn set_attr(&mut self, key: Cow<'static, str>, value: AttributeValue) {
        let existing = self.attributes.iter().position(|(k, _)| *k == key);
        match existing {
            Some(index) => self.attributes[index].1 = value,
            None => self.attributes.push((key, value)),
        }
    }
}

impl Timestamp {
    /// Returns a `Timestamp` representing the current system time.
    pub fn now() -> Self {
        Self(SystemTime::now())
    }

    /// Converts this timestamp to a UNIX timestamp.
    ///
    /// Returns `(seconds, nanoseconds)`: the number of whole seconds elapsed since
    /// [`UNIX_EPOCH`], and the fractional part of that duration in nanoseconds.
    ///
    /// # Panics
    ///
    /// Panics if this timestamp is earlier than [`UNIX_EPOCH`], which is possible
    /// when the timestamp was built from an arbitrary [`SystemTime`].
    pub fn to_unix_timestamp(&self) -> (u64, u32) {
        let since_epoch = self
            .0
            .duration_since(UNIX_EPOCH)
            .expect("timestamp should not be earlier than UNIX_EPOCH");
        (since_epoch.as_secs(), since_epoch.subsec_nanos())
    }

    /// Returns the amount of time elapsed from an earlier point in time.
    ///
    /// # Errors
    ///
    /// Returns an error if `earlier` is actually later than `self`
    /// (see [`SystemTime::duration_since`]).
    pub fn duration_since(&self, earlier: Timestamp) -> Result<Duration, SystemTimeError> {
        self.0.duration_since(earlier.0)
    }
}

impl From<SystemTime> for Timestamp {
    fn from(value: SystemTime) -> Self {
        Self(value)
    }
}

impl From<Timestamp> for SystemTime {
    fn from(value: Timestamp) -> Self {
        value.0
    }
}

impl fmt::Debug for Timestamp {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}

/// Trait implemented by types that are accepted as measurement values.
///
/// It links a Rust type to its wrapped representation:
/// [`WrappedMeasurementValue`] for values and [`WrappedMeasurementType`] for the type itself.
pub trait MeasurementType {
    /// The Rust type of the values that can be wrapped.
    type T;

    /// Wraps the value `v` into the matching variant of [`WrappedMeasurementValue`].
    fn wrapped_value(v: Self::T) -> WrappedMeasurementValue;
    /// Returns the [`WrappedMeasurementType`] that corresponds to this type.
    fn wrapped_type() -> WrappedMeasurementType;
}
/// `u64` measurements are wrapped in the `U64` variants.
impl MeasurementType for u64 {
    type T = Self;

    fn wrapped_value(v: u64) -> WrappedMeasurementValue {
        WrappedMeasurementValue::U64(v)
    }

    fn wrapped_type() -> WrappedMeasurementType {
        WrappedMeasurementType::U64
    }
}
/// `f64` measurements are wrapped in the `F64` variants.
impl MeasurementType for f64 {
    type T = Self;

    fn wrapped_value(v: f64) -> WrappedMeasurementValue {
        WrappedMeasurementValue::F64(v)
    }

    fn wrapped_type() -> WrappedMeasurementType {
        WrappedMeasurementType::F64
    }
}

/// Enum of the possible measurement types.
#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum WrappedMeasurementType {
    /// The type of `f64` measurements.
    F64,
    /// The type of `u64` measurements.
    U64,
}

/// Displays the name of the variant (`F64` or `U64`), like the `Debug` output.
impl fmt::Display for WrappedMeasurementType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::F64 => "F64",
            Self::U64 => "U64",
        };
        // `write_str` writes the name as is, without applying padding or width.
        f.write_str(name)
    }
}

/// A measurement value of any supported measurement type.
#[derive(Debug, Clone, PartialEq)]
pub enum WrappedMeasurementValue {
    /// A floating-point measurement.
    F64(f64),
    /// An unsigned integer measurement.
    U64(u64),
}

impl WrappedMeasurementValue {
    /// Returns the [`WrappedMeasurementType`] that corresponds to the variant of this value.
    pub fn measurement_type(&self) -> WrappedMeasurementType {
        match *self {
            Self::F64(_) => WrappedMeasurementType::F64,
            Self::U64(_) => WrappedMeasurementType::U64,
        }
    }
}

/// An attribute value of any supported attribute type.
///
/// `AttributeValue` implements [`Eq`] and [`Hash`]. To make equality reflexive,
/// as `Eq` requires, two `F64` attributes that both hold NaN compare equal
/// (unlike plain `f64` values).
#[derive(Debug, Clone)]
pub enum AttributeValue {
    /// A floating-point attribute.
    F64(f64),
    /// An unsigned integer attribute.
    U64(u64),
    /// A boolean attribute.
    Bool(bool),
    /// A borrowed string attribute.
    ///
    /// If you can use `AttributeValue::Str` instead of `AttributeValue::String`,
    /// do it: it will save a memory allocation.
    Str(&'static str),
    /// An owned string attribute.
    String(String),
}

impl PartialEq for AttributeValue {
    /// Compares two attribute values.
    ///
    /// Values of different variants are never equal: in particular, a `Str` and
    /// a `String` holding the same text are different attribute values.
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            // `f64::eq` says NaN != NaN, which would break the reflexivity promised by
            // the `Eq` impl below; treat two NaNs as equal instead.
            (AttributeValue::F64(a), AttributeValue::F64(b)) => a == b || (a.is_nan() && b.is_nan()),
            (AttributeValue::U64(a), AttributeValue::U64(b)) => a == b,
            (AttributeValue::Bool(a), AttributeValue::Bool(b)) => a == b,
            (AttributeValue::Str(a), AttributeValue::Str(b)) => a == b,
            (AttributeValue::String(a), AttributeValue::String(b)) => a == b,
            _ => false,
        }
    }
}

impl Hash for AttributeValue {
    /// Hashes the inner value only: the variant itself is not part of the hash.
    ///
    /// `f64` does not implement `Hash`, hence the `OrderedFloat` wrapper.
    fn hash<H: Hasher>(&self, state: &mut H) {
        match self {
            AttributeValue::F64(f64_value) => OrderedFloat(*f64_value).hash(state),
            AttributeValue::Bool(bool_value) => bool_value.hash(state),
            AttributeValue::U64(u64_value) => u64_value.hash(state),
            AttributeValue::Str(str_value) => str_value.hash(state),
            AttributeValue::String(string_value) => string_value.hash(state),
        }
    }
}

impl Eq for AttributeValue {}

impl Display for AttributeValue {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AttributeValue::F64(x) => write!(f, "{x}"),
            AttributeValue::U64(x) => write!(f, "{x}"),
            AttributeValue::Bool(x) => write!(f, "{x}"),
            AttributeValue::Str(str) => f.write_str(str),
            AttributeValue::String(str) => f.write_str(str),
        }
    }
}

impl From<f64> for AttributeValue {
    fn from(value: f64) -> Self {
        AttributeValue::F64(value)
    }
}

impl From<u64> for AttributeValue {
    fn from(value: u64) -> Self {
        AttributeValue::U64(value)
    }
}

impl From<bool> for AttributeValue {
    fn from(value: bool) -> Self {
        AttributeValue::Bool(value)
    }
}

impl From<String> for AttributeValue {
    fn from(value: String) -> Self {
        AttributeValue::String(value)
    }
}

impl From<&'static str> for AttributeValue {
    fn from(value: &'static str) -> Self {
        AttributeValue::Str(value)
    }
}

/// A `MeasurementBuffer` stores measured data points.
/// Unlike a [`MeasurementAccumulator`], the buffer allows to modify the measurements.
#[derive(Clone, Debug)]
pub struct MeasurementBuffer {
    /// The measurement points, in the order in which they were added.
    points: Vec<MeasurementPoint>,
}

impl MeasurementBuffer {
    /// Creates an empty buffer.
    pub fn new() -> MeasurementBuffer {
        Self { points: Vec::new() }
    }

    /// Creates an empty buffer that can hold at least `capacity` points
    /// (the memory is allocated on construction).
    pub fn with_capacity(capacity: usize) -> MeasurementBuffer {
        let points = Vec::with_capacity(capacity);
        Self { points }
    }

    /// Returns true if the buffer contains no measurement point.
    pub fn is_empty(&self) -> bool {
        self.points.is_empty()
    }

    /// Returns how many measurement points are in the buffer.
    pub fn len(&self) -> usize {
        self.points.len()
    }

    /// Makes room for at least `additional` more points.
    /// See [`Vec::reserve`].
    pub fn reserve(&mut self, additional: usize) {
        self.points.reserve(additional);
    }

    /// Appends a measurement at the end of the buffer.
    /// The buffer does *not* deduplicate the measurement points.
    pub fn push(&mut self, point: MeasurementPoint) {
        self.points.push(point);
    }

    /// Moves all the measurement points of `other` to the end of this buffer.
    /// See [`Vec::append`].
    pub fn merge(&mut self, other: &mut MeasurementBuffer) {
        let incoming = &mut other.points;
        self.points.append(incoming);
    }

    /// Removes all the measurements from the buffer.
    pub fn clear(&mut self) {
        self.points.clear();
    }

    /// Returns an iterator over shared references to the points of the buffer.
    pub fn iter(&self) -> impl Iterator<Item = &MeasurementPoint> {
        self.points.iter()
    }

    /// Returns an iterator over mutable references to the points of the buffer.
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut MeasurementPoint> {
        self.points.iter_mut()
    }

    /// Borrows this buffer as a `MeasurementAccumulator`:
    /// every point pushed to the accumulator is added to this buffer.
    pub fn as_accumulator(&mut self) -> MeasurementAccumulator {
        MeasurementAccumulator(self)
    }
}

/// The default buffer is empty.
impl Default for MeasurementBuffer {
    fn default() -> Self {
        Self { points: Vec::new() }
    }
}

/// Iterates on the points of a borrowed buffer, by reference.
impl<'a> IntoIterator for &'a MeasurementBuffer {
    type Item = &'a MeasurementPoint;
    type IntoIter = std::slice::Iter<'a, MeasurementPoint>;

    fn into_iter(self) -> Self::IntoIter {
        self.points.as_slice().iter()
    }
}

/// Consumes the buffer and iterates on its points, by value.
impl IntoIterator for MeasurementBuffer {
    type Item = MeasurementPoint;
    type IntoIter = std::vec::IntoIter<MeasurementPoint>;

    fn into_iter(self) -> Self::IntoIter {
        let Self { points } = self;
        points.into_iter()
    }
}

/// Builds a buffer that contains all the points produced by an iterator.
impl FromIterator<MeasurementPoint> for MeasurementBuffer {
    fn from_iter<T: IntoIterator<Item = MeasurementPoint>>(iter: T) -> Self {
        let points: Vec<MeasurementPoint> = iter.into_iter().collect();
        Self { points }
    }
}

impl From<Vec<MeasurementPoint>> for MeasurementBuffer {
    fn from(value: Vec<MeasurementPoint>) -> Self {
        MeasurementBuffer { points: value }
    }
}

/// An accumulator stores measured data points.
/// Unlike a [`MeasurementBuffer`], the accumulator only allows to [`push`](MeasurementAccumulator::push) new points, not to modify them.
pub struct MeasurementAccumulator<'a>(&'a mut MeasurementBuffer);

impl<'a> MeasurementAccumulator<'a> {
    /// Adds a new measurement to this accumulator.
    /// The measurement points are not deduplicated by the accumulator.
    pub fn push(&mut self, point: MeasurementPoint) {
        self.0.push(point)
    }

    pub(crate) fn as_inner(&'a self) -> &'a MeasurementBuffer {
        self.0
    }
}