alumet 0.8.0

Modular framework for hardware and software measurement (including energy consumption and more).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
//! Phases of the plugin lifecycle.
use std::future::Future;
use std::marker::PhantomData;

use crate::measurement::{MeasurementType, WrappedMeasurementType};
use crate::metrics::def::{Metric, RawMetricId, TypedMetricId};
use crate::metrics::duplicate::{DuplicateCriteria, DuplicateReaction};
use crate::metrics::error::MetricCreationError;
use crate::metrics::online::listener::{MetricListener, MetricListenerBuilder};
use crate::metrics::online::{MetricReader, MetricSender};
use crate::metrics::registry::MetricRegistry;
use crate::pipeline::control::key::{OutputKey, SourceKey, TransformKey};
use crate::pipeline::elements::source::builder::{ManagedSource, SourceBuilder};
use crate::pipeline::elements::source::trigger::TriggerSpec;
use crate::pipeline::elements::{output, source, transform};
use crate::pipeline::naming::{namespace::DuplicateNameError, PluginName};
use crate::pipeline::{self, Output, Source, Transform};
use crate::units::PrefixedUnit;

/// Structure passed to plugins for the start-up phase.
///
/// It allows the plugins to perform some actions before starting the measurement pipeline,
/// such as registering new measurement sources.
///
/// # Note for applications
/// You cannot create `AlumetPluginStart` manually, build an agent with [`agent::Builder`](crate::agent::Builder) instead.
pub struct AlumetPluginStart<'a> {
    /// Name of the plugin that is being started.
    /// Every element registered through this structure is attributed to this plugin.
    pub(crate) current_plugin: PluginName,
    /// Builder of the measurement pipeline, in which the metrics, sources,
    /// transforms and outputs of the plugin are registered.
    pub(crate) pipeline_builder: &'a mut pipeline::Builder,
    /// Callbacks registered with `on_pre_pipeline_start`, each one paired with
    /// the name of the plugin that registered it.
    pub(crate) pre_start_actions: &'a mut Vec<(PluginName, Box<dyn PreStartAction>)>,
    /// Callbacks registered with `on_pipeline_start`, each one paired with
    /// the name of the plugin that registered it.
    pub(crate) post_start_actions: &'a mut Vec<(PluginName, Box<dyn PostStartAction>)>,
}

/// A one-shot callback that receives a `&mut AlumetPostStart`.
///
/// Such callbacks are registered with `AlumetPluginStart::on_pipeline_start`.
/// The trait has no method of its own: it is only a shorter name for the
/// `FnOnce` bound, and it is implemented for every type that satisfies it.
pub trait PostStartAction: FnOnce(&mut AlumetPostStart) -> anyhow::Result<()> {}

// Blanket implementation: any matching closure or function is a `PostStartAction`.
impl<A: FnOnce(&mut AlumetPostStart) -> anyhow::Result<()>> PostStartAction for A {}

/// A one-shot callback that receives a `&mut AlumetPreStart`.
///
/// Such callbacks are registered with `AlumetPluginStart::on_pre_pipeline_start`.
/// The trait has no method of its own: it is only a shorter name for the
/// `FnOnce` bound, and it is implemented for every type that satisfies it.
pub trait PreStartAction: FnOnce(&mut AlumetPreStart) -> anyhow::Result<()> {}

// Blanket implementation: any matching closure or function is a `PreStartAction`.
impl<A: FnOnce(&mut AlumetPreStart) -> anyhow::Result<()>> PreStartAction for A {}

impl AlumetPluginStart<'_> {
    /// Gives the name of the plugin whose start-up is in progress.
    fn current_plugin_name(&self) -> PluginName {
        Clone::clone(&self.current_plugin)
    }

    /// Registers a new metric whose measurement type `T` is checked at compile time.
    ///
    /// # Errors
    /// Fails if a metric with the same name already exists.
    ///
    /// # Example
    /// ```no_run
    /// use alumet::units::{Unit, PrefixedUnit};
    /// use alumet::metrics::TypedMetricId;
    /// # use alumet::plugin::AlumetPluginStart;
    ///
    /// # fn f() -> anyhow::Result<()> {
    /// # let alumet: &AlumetPluginStart = todo!();
    /// let proc_exec_time: TypedMetricId<u64> = alumet
    ///     .create_metric("process_execution_time", Unit::Second, "execution time of a process")?;
    ///
    /// let ram_power: TypedMetricId<u64> = alumet
    ///     .create_metric("ram_electrical_power", PrefixedUnit::milli(Unit::Watt), "instantaneous power consumption of a memory module")?;
    ///
    /// # }
    /// ```
    pub fn create_metric<T: MeasurementType>(
        &mut self,
        name: impl Into<String>,
        unit: impl Into<PrefixedUnit>,
        description: impl Into<String>,
    ) -> Result<TypedMetricId<T>, MetricCreationError> {
        let definition = Metric {
            name: name.into(),
            description: description.into(),
            value_type: T::wrapped_type(),
            unit: unit.into(),
        };
        let registry = &mut self.pipeline_builder.metrics;
        registry
            .register(definition, DuplicateCriteria::Different, DuplicateReaction::Error)
            // Attach the compile-time type marker to the raw id given by the registry.
            .map(|raw_id| TypedMetricId(raw_id, PhantomData))
    }

    /// Registers a new metric whose measurement type is given by `value_type`
    /// and can therefore only be checked at **run time**.
    ///
    /// A [`RawMetricId`] carries no type parameter, which means that, unlike with a
    /// [`TypedMetricId`], the compiler cannot verify that the measured values have
    /// the right type. Prefer [`create_metric`](Self::create_metric) when you can.
    ///
    /// # Errors
    /// Fails if a metric with the same name already exists.
    pub fn create_metric_untyped(
        &mut self,
        name: &str,
        value_type: WrappedMeasurementType,
        unit: impl Into<PrefixedUnit>,
        description: &str,
    ) -> Result<RawMetricId, MetricCreationError> {
        let definition = Metric {
            name: String::from(name),
            description: String::from(description),
            value_type,
            unit: unit.into(),
        };
        let registry = &mut self.pipeline_builder.metrics;
        registry.register(definition, DuplicateCriteria::Different, DuplicateReaction::Error)
    }

    /// Registers a _managed_ measurement source in the Alumet pipeline.
    ///
    /// The source and its trigger specification are wrapped in a builder, which is
    /// registered with [`add_source_builder`](Self::add_source_builder).
    pub fn add_source(
        &mut self,
        name: &str,
        source: Box<dyn Source>,
        trigger_spec: TriggerSpec,
    ) -> Result<SourceKey, DuplicateNameError> {
        self.add_source_builder(name, move |_ctx| Ok(ManagedSource { trigger_spec, source }))
    }

    /// Registers the builder of a _managed_ measurement source in the Alumet pipeline.
    ///
    /// With [`add_source`](Self::add_source), the source exists before the pipeline does.
    /// Here, the source is only created while the measurement pipeline is being constructed,
    /// which makes some information about the pipeline available to the code that creates it.
    /// A good use case is to access the late registration of metrics.
    ///
    /// The price to pay is a more complicated code: in general,
    /// prefer [`add_source`](Self::add_source) whenever it is sufficient.
    pub fn add_source_builder<F: source::builder::ManagedSourceBuilder + 'static>(
        &mut self,
        name: &str,
        builder: F,
    ) -> Result<SourceKey, DuplicateNameError> {
        let owner = self.current_plugin_name();
        let managed = SourceBuilder::Managed(Box::new(builder));
        self.pipeline_builder.add_source_builder(owner, name, managed)
    }

    /// Registers the builder of an _autonomous_ source in the Alumet pipeline.
    ///
    /// # Autonomous sources
    /// Alumet does not trigger an autonomous source: such a source runs independently.
    /// It receives a [`Sender`](tokio::sync::mpsc::Sender), which it uses to send its
    /// measurements to the rest of the Alumet pipeline (transforms and outputs).
    ///
    /// # Graceful shutdown
    /// A [`CancellationToken`](tokio_util::sync::CancellationToken) is provided to the source.
    /// You should stop the source when this token is cancelled.
    ///
    /// # Example
    /// ```no_run
    /// use std::time::SystemTime;
    ///
    /// use alumet::measurement::{MeasurementBuffer, MeasurementPoint, Timestamp};
    /// use alumet::units::Unit;
    /// # use alumet::plugin::AlumetPluginStart;
    ///
    /// # let alumet: &AlumetPluginStart = todo!();
    /// let metric = alumet.create_metric::<u64>("my_metric", Unit::Second, "...").unwrap();
    /// alumet.add_autonomous_source_builder("source_name", move |ctx, cancel_token, tx| {
    ///     let out_tx = tx.clone();
    ///     let source = Box::pin(async move {
    ///         let mut buf = MeasurementBuffer::new();
    ///         while !cancel_token.is_cancelled() {
    ///             let timestamp = Timestamp::now();
    ///             let resource = todo!();
    ///             let consumer = todo!();
    ///             let value = todo!();
    ///             let measurement = MeasurementPoint::new(
    ///                 timestamp,
    ///                 metric,
    ///                 resource,
    ///                 consumer,
    ///                 value
    ///             );
    ///             buf.push(measurement);
    ///             out_tx.send(buf.clone());
    ///             buf.clear();
    ///         }
    ///         Ok(())
    ///     });
    ///     Ok(source)
    /// }).expect("source names should be unique (in the same plugin)");
    /// ```
    // NOTE(review): the example above calls `out_tx.send(..)` without awaiting it. If `tx` is
    // a tokio mpsc sender, as the text says, the returned future does nothing until awaited.
    // Confirm and fix the example.
    pub fn add_autonomous_source_builder<F: source::builder::AutonomousSourceBuilder + 'static>(
        &mut self,
        name: &str,
        builder: F,
    ) -> Result<SourceKey, DuplicateNameError> {
        let owner = self.current_plugin_name();
        let autonomous = SourceBuilder::Autonomous(Box::new(builder));
        self.pipeline_builder.add_source_builder(owner, name, autonomous)
    }

    /// Registers a transform step in the Alumet pipeline.
    ///
    /// The transform is wrapped in a builder, which is registered with
    /// [`add_transform_builder`](Self::add_transform_builder).
    ///
    /// # Example
    ///
    /// ```no_run
    /// use alumet::pipeline::elements::transform::{Transform, TransformContext};
    /// use alumet::pipeline::elements::error::TransformError;
    /// use alumet::measurement::MeasurementBuffer;
    /// # use alumet::plugin::AlumetPluginStart;
    ///
    /// // Define the transform
    /// struct ExampleTransform;
    /// impl Transform for ExampleTransform {
    ///     fn apply(&mut self, m: &mut MeasurementBuffer, ctx: &TransformContext) -> Result<(), TransformError> {
    ///         todo!(); // do something with the measurements
    ///         Ok(())
    ///     }
    /// }
    ///
    /// # let alumet: &AlumetPluginStart = todo!();
    /// #
    /// // In start(&mut self, alumet: &mut AlumetPluginStart),
    /// // add the transform to the pipeline.
    /// let transform = ExampleTransform;
    /// alumet.add_transform("name", Box::new(transform));
    /// ```
    pub fn add_transform(
        &mut self,
        name: &str,
        transform: Box<dyn Transform>,
    ) -> Result<TransformKey, DuplicateNameError> {
        self.add_transform_builder(name, move |_ctx| Ok(transform))
    }

    /// Registers the builder of a transform step in the Alumet pipeline.
    ///
    /// The builder is stored in the pipeline builder, under the name of the current plugin.
    ///
    /// # Example
    ///
    /// ```no_run
    ///
    /// # use alumet::plugin::AlumetPluginStart;
    /// # let alumet: &AlumetPluginStart = todo!();
    ///
    /// use alumet::pipeline::elements::transform::Transform;
    ///
    /// alumet.add_transform_builder("name", move |ctx| {
    ///     let transform: Box<dyn Transform> = todo!();
    ///     Ok(transform)
    /// });
    /// ```
    pub fn add_transform_builder<F: transform::builder::TransformBuilder + 'static>(
        &mut self,
        name: &str,
        builder: F,
    ) -> Result<TransformKey, DuplicateNameError> {
        let owner = self.current_plugin_name();
        let boxed = Box::new(builder);
        self.pipeline_builder.add_transform_builder(owner, name, boxed)
    }

    /// Registers a _blocking_ output in the Alumet pipeline.
    ///
    /// The output is wrapped in a builder, which is registered with
    /// [`add_blocking_output_builder`](Self::add_blocking_output_builder).
    ///
    /// # Example
    /// ```no_run
    /// use alumet::pipeline::elements::output::{Output, OutputContext};
    /// use alumet::pipeline::elements::error::WriteError;
    /// use alumet::measurement::MeasurementBuffer;
    /// # use alumet::plugin::AlumetPluginStart;
    ///
    /// use anyhow::Context;
    ///
    /// // Define the output
    /// struct ExampleOutput;
    /// impl Output for ExampleOutput {
    ///     fn write(&mut self, m: &MeasurementBuffer, ctx: &OutputContext) -> Result<(), WriteError> {
    ///         // do something with the measurements
    ///         for point in m.iter() {
    ///             todo!()
    ///         }
    ///         Ok(())
    ///     }
    /// }
    ///
    /// # let alumet: &AlumetPluginStart = todo!();
    /// #
    /// // In start(&mut self, alumet: &mut AlumetPluginStart),
    /// // add the output to the pipeline.
    /// let output = ExampleOutput;
    /// alumet.add_blocking_output("name", Box::new(output));
    /// ```
    pub fn add_blocking_output(
        &mut self,
        name: &str,
        output: Box<dyn Output>,
    ) -> Result<OutputKey, DuplicateNameError> {
        self.add_blocking_output_builder(name, move |_ctx| Ok(output))
    }

    /// Registers the builder of a _blocking_ output in the Alumet pipeline.
    ///
    /// With [`add_blocking_output`](Self::add_blocking_output), the output exists before the
    /// pipeline does. Here, the output is only created while the measurement pipeline is being
    /// constructed, which makes some information about the pipeline available to the code
    /// that creates it.
    ///
    /// # Async outputs
    /// For an output implemented with async functions, consider
    /// [`add_async_output_builder`](Self::add_async_output_builder) instead.
    pub fn add_blocking_output_builder<F: output::builder::BlockingOutputBuilder + 'static>(
        &mut self,
        name: &str,
        builder: F,
    ) -> Result<OutputKey, DuplicateNameError> {
        let owner = self.current_plugin_name();
        self.pipeline_builder.add_output_builder(
            owner,
            name,
            output::builder::OutputBuilder::Blocking(Box::new(builder)),
        )
    }

    /// Registers the builder of an _async_ output in the Alumet pipeline.
    pub fn add_async_output_builder<F: output::builder::AsyncOutputBuilder + 'static>(
        &mut self,
        name: &str,
        builder: F,
    ) -> Result<OutputKey, DuplicateNameError> {
        let owner = self.current_plugin_name();
        self.pipeline_builder.add_output_builder(
            owner,
            name,
            output::builder::OutputBuilder::Async(Box::new(builder)),
        )
    }

    /// Schedules a callback that will run just after the pipeline startup.
    ///
    /// When some data must be moved to the pipeline start phase, this method is
    /// easier to use than [`crate::plugin::Plugin::post_pipeline_start`].
    ///
    /// # Example
    /// ```no_run
    /// # use alumet::plugin::AlumetPluginStart;
    /// # let alumet: &AlumetPluginStart = todo!();
    /// alumet.on_pipeline_start(|ctx| {
    ///     // ctx is a `&mut AlumetPostStart`
    ///     let control_handle = ctx.pipeline_control();
    ///     todo!();
    ///     Ok(())
    /// })
    /// ```
    pub fn on_pipeline_start<F: PostStartAction + 'static>(&mut self, action: F) {
        let owner = self.current_plugin_name();
        let callback: Box<dyn PostStartAction> = Box::new(action);
        // The callback is stored with the name of the plugin that registered it.
        self.post_start_actions.push((owner, callback));
    }

    /// Schedules a callback that will run just before the pipeline startup.
    ///
    /// When some data must be moved to the pipeline start phase, this method is
    /// easier to use than [`crate::plugin::Plugin::pre_pipeline_start`].
    pub fn on_pre_pipeline_start<F: PreStartAction + 'static>(&mut self, action: F) {
        let owner = self.current_plugin_name();
        let callback: Box<dyn PreStartAction> = Box::new(action);
        // The callback is stored with the name of the plugin that registered it.
        self.pre_start_actions.push((owner, callback));
    }
}

/// Structure passed to plugins for the pre start-up phase.
///
/// It gives a read-only access to the registered metrics and allows to
/// register metric listeners.
pub struct AlumetPreStart<'a> {
    /// Name of the plugin that this structure is handed to.
    /// The metric listeners registered through this structure are attributed to this plugin.
    pub(crate) current_plugin: PluginName,
    /// Builder of the measurement pipeline, which holds the metric registry
    /// and receives the metric listener builders.
    pub(crate) pipeline_builder: &'a mut pipeline::Builder,
}

impl<'a> AlumetPreStart<'a> {
    /// Returns the name of the plugin that has started.
    pub fn current_plugin_name(&self) -> PluginName {
        self.current_plugin.clone()
    }

    /// Returns a read-only access to the [`MetricRegistry`].
    pub fn metrics(&self) -> &MetricRegistry {
        &self.pipeline_builder.metrics
    }

    /// Registers a metric listener, which will be notified of all the new registered metrics.
    pub fn add_metric_listener<F: MetricListener + Send + 'static>(
        &mut self,
        name: &str,
        listener: F,
    ) -> Result<(), DuplicateNameError> {
        let plugin = self.current_plugin_name();
        self.pipeline_builder
            .add_metric_listener_builder(plugin, name, Box::new(|_| Ok(Box::new(listener))))
    }

    /// Registers a metric listener builder, which will construct a listener that
    /// will be notified of all the new registered metrics.
    pub fn add_metric_listener_builder<F: MetricListenerBuilder + Send + 'static>(
        &mut self,
        name: &str,
        builder: F,
    ) -> Result<(), DuplicateNameError> {
        let plugin = self.current_plugin_name();
        self.pipeline_builder
            .add_metric_listener_builder(plugin, name, Box::new(builder))
    }
}

/// Structure passed to plugins for the post start-up phase.
///
/// It gives access to the measurement pipeline: control handle,
/// metrics sender and reader, and async runtime.
pub struct AlumetPostStart<'a> {
    /// Name of the plugin that this structure is handed to.
    /// The control handle returned by `pipeline_control` is tied to this plugin.
    pub(crate) current_plugin: PluginName,
    /// The measurement pipeline, from which the handles are obtained.
    pub(crate) pipeline: &'a mut pipeline::MeasurementPipeline,
}

impl AlumetPostStart<'_> {
    /// Gives the name of the plugin that has started.
    pub fn current_plugin_name(&self) -> PluginName {
        Clone::clone(&self.current_plugin)
    }

    /// Gives a handle that allows to send commands to the measurement pipeline
    /// in order to control it while it is running.
    ///
    /// The handle is tied to the current plugin.
    pub fn pipeline_control(&self) -> pipeline::control::PluginControlHandle {
        let handle = self.pipeline.control_handle();
        handle.with_plugin(self.current_plugin_name())
    }

    /// Gives a handle that allows to register new metrics while the pipeline is running,
    /// and to subscribe to new registrations.
    pub fn metrics_sender(&self) -> MetricSender {
        let pipeline = &*self.pipeline;
        pipeline.metrics_sender()
    }

    /// Gives a [`MetricReader`], for a read-only access to the [`MetricRegistry`].
    pub fn metrics_reader(&self) -> MetricReader {
        let pipeline = &*self.pipeline;
        pipeline.metrics_reader()
    }

    /// Gives a handle to the main asynchronous runtime used by the pipeline.
    pub fn async_runtime(&self) -> tokio::runtime::Handle {
        let runtime = self.pipeline.async_runtime();
        runtime.clone()
    }

    /// Runs a future to completion on the underlying async runtime, and returns its output.
    ///
    /// Blocking the thread in `post_pipeline_start` is fine,
    /// because the pipeline runs in separate threads.
    ///
    /// # Panics
    /// The future is driven by the `block_on` method of the tokio runtime handle.
    /// According to the tokio documentation, it panics when called from
    /// an asynchronous execution context.
    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
        let runtime = self.pipeline.async_runtime();
        runtime.block_on(future)
    }
}