netdata_plugin/
collector.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// 
3use super::{Begin, Chart, Dimension, Instruction, Set};
4
5use std::collections::HashMap;
6use std::time::Instant;
7use thiserror::Error;
8use validator::{Validate, ValidationErrors};
9
10use std::io::Write;
11
12/// Error codes used by [Collector] and its methods.
13#[derive(Error, Debug)]
14pub enum CollectorError {
15    /// The referred chart is not yet registert in the collector.
16    #[error("unknown Chart: type.id = {0}")]
17    UnkownChart(String),
18    /// The referred chart is not yet registert in the collector.
19    #[error("unknown Dimension: id = {0}")]
20    UnkownDimension(String),
21    /// One of the field entries violates the formal requirements.
22    #[error(transparent)]
23    ValidationErrors(#[from] ValidationErrors),
24    /// Can't Write to the provided writer.
25    #[error(transparent)]
26    IOError(#[from] std::io::Error),
27}
28
29/// Internal List of active Dimensions.
30#[derive(Default)]
31struct CollectedDimensionInfo {
32    /// Prepared value.
33    value: i64,
34    /// Will be set to `true` if already commited.
35    commited: bool,
36}
37
38/// Internal List of active Charts.
39#[derive(Default)]
40struct CollectedChartInfo {
41    dimensions: HashMap<String, CollectedDimensionInfo>,
42    /// Timestamp of last commit.
43    last_commit: Option<Instant>,
44}
45
46/// A High-Level interface to run the data collecting `BEGIN`-`SET`-`END` loop
47/// and setup [Chart] and [Dimension] info in an efficient manner.
48///
49/// It's used roughly like this:
50///
51/// Example:
52///
53/// ```rust
54/// use std::error;
55/// use netdata_plugin::{Chart, Dimension};
56/// use netdata_plugin::collector::Collector;
57///
58/// fn main() -> Result<(), Box<dyn error::Error>> {
59///     // prepare collector
60///     let mut writer = std::io::stdout();
61///     let mut c = Collector::new(&mut writer);
62///
63///     // add charts and their associated dimensions
64///     c.add_chart(&mut Chart { type_id: "mytype.something" /*...*/, ..Default::default()})?;
65///     // writes CHART ...
66///     c.add_dimension( "mytype.something", &Dimension {id: "d1", ..Default::default()})?;
67///     c.add_dimension( "mytype.something", &Dimension {id: "d2", ..Default::default()})?;
68///     // writes DIMENSION ...
69///
70///     // data collecting loop
71///     loop {
72///         c.prepare_value("mytype.something", "d1", 4242)?;
73///         c.prepare_value("mytype.something", "d2", 4242)?;
74///         c.commit_chart("mytype.something").unwrap();
75///         // The BEGIN ... - SET ... - END block will be written to stdout
76///
77///         std::thread::sleep(std::time::Duration::from_secs(1));
78///         break; // stop the demonstration ;)
79///     }
80///     Ok(())
81/// }
82/// ```
83#[derive()]
84pub struct Collector<'a, W: Write> {
85    charts: HashMap<String, CollectedChartInfo>,
86    writer: &'a mut W,
87}
88
89impl<'a, W: Write> Collector<'a, W> {
90    /// Initilaize the data store to manage the list of active [Chart]s.
91    ///
92    /// In most cases you can use a mutable `stdout` reference as writer.
93    pub fn new(writer: &'a mut W) -> Self {
94        Collector {
95            charts: HashMap::new(),
96            writer: writer,
97        }
98    }
99
100    /// Registers a new [Chart] in this [Collector] and write
101    /// a `CHART...`-command to the plugins `stdout` to add the Chart properties
102    /// on the host side, too.
103    ///
104    /// The `type_id` of the [Chart] entry will be checkt for
105    /// [formal validity](super::validate_type_id()) in
106    /// advance and may raise [ValidationErrors](CollectorError::ValidationErrors).
107    pub fn add_chart(&mut self, chart: &Chart) -> Result<(), CollectorError> {
108        chart.validate()?;
109        let cci = CollectedChartInfo {
110            ..Default::default()
111        };
112        self.charts.insert(format!("{}", chart.type_id), cci);
113        writeln!(self.writer, "{}", chart)?;
114        Ok(())
115    }
116
117    /// Registers a new [Dimension] for a Chart of a given `type.id` in this
118    /// [Collector] and write a `DIMENSION...`-command to the plugins `stdout` to add it
119    /// on the host side as well.
120    ///
121    /// The `id` field of the [Dimension] entry will be checkt for
122    /// [formal validity](super::validate_id()) in
123    /// advance and may raise [ValidationErrors](CollectorError::ValidationErrors).
124    pub fn add_dimension(
125        &mut self,
126        chart_id: &str,
127        dimension: &Dimension,
128    ) -> Result<(), CollectorError> {
129        dimension.validate()?;
130        let cci = self
131            .charts
132            .get_mut(chart_id)
133            .ok_or(CollectorError::UnkownChart(chart_id.to_owned()))?;
134        let cdi = CollectedDimensionInfo {
135            ..Default::default()
136        };
137        cci.dimensions.insert(dimension.id.to_owned(), cdi);
138        writeln!(self.writer, "{}", dimension)?;
139        Ok(())
140    }
141
142    /// Update the value for one [Dimension] to be later commited
143    /// with all other updated values of this [Chart]
144    ///
145    /// If the reffered [Dimension] or [Chart] isn't already registered in the
146    /// [Collector], it will will raise [UnkownChart](CollectorError::UnkownChart) or
147    /// [UnkownDimension](CollectorError::UnkownDimension). This may be used to
148    /// dynamically setup the needed categories on demand.
149    pub fn prepare_value(
150        &mut self,
151        chart_id: &str,
152        dimension_id: &str,
153        value: i64,
154    ) -> Result<(), CollectorError> {
155        let cci = self
156            .charts
157            .get_mut(chart_id)
158            .ok_or(CollectorError::UnkownChart(chart_id.to_owned()))?;
159        let mut cdi = cci
160            .dimensions
161            .get_mut(dimension_id)
162            .ok_or(CollectorError::UnkownDimension(dimension_id.to_owned()))?;
163        cdi.value = value;
164        cdi.commited = false;
165        Ok(())
166    }
167
168    /// Send a block containing all updated values to `stdout`.
169    ///
170    /// In multi threaded scenarios, this could require additional lock preventions.
171    pub fn commit_chart(&mut self, chart_id: &str) -> Result<(), CollectorError> {
172        let mut cci = self
173            .charts
174            .get_mut(chart_id)
175            .ok_or(CollectorError::UnkownChart(chart_id.to_owned()))?;
176        let now = Instant::now();
177
178        let begin = Begin {
179            type_id: chart_id,
180            microseconds: match cci.last_commit {
181                Some(t) => Some(now.duration_since(t).as_micros()),
182                _ => None,
183            },
184        };
185        cci.last_commit = Some(now);
186        writeln!(self.writer, "{}", begin).unwrap();
187
188        for (id, cdi) in cci.dimensions.iter_mut() {
189            if !cdi.commited {
190                writeln!(
191                    self.writer,
192                    "{}",
193                    Set {
194                        id: id,
195                        value: Some(cdi.value)
196                    }
197                )
198                .unwrap();
199                cdi.commited = true;
200            }
201        }
202
203        writeln!(self.writer, "{}", Instruction::END)?;
204        Ok(())
205    }
206}
207
208#[cfg(test)]
209mod collector_tests {
210    use super::{Chart, Collector, Dimension};
211    use pretty_assertions::assert_eq;
212
213    #[test]
214    fn collector_test() {
215        let mut redirect_buf = Vec::new();
216
217        let mut collector = Collector::new(&mut redirect_buf);
218        collector
219            .add_chart(&mut Chart {
220                type_id: "olsr.test_id",
221                name: "test_name",
222                title: "captions",
223                units: "ms",
224                ..Default::default()
225            })
226            .unwrap();
227        collector
228            .add_dimension(
229                "olsr.test_id",
230                &Dimension {
231                    id: "test_dim_id",
232                    name: "test_dim_name",
233                    ..Default::default()
234                },
235            )
236            .unwrap();
237        collector
238            .prepare_value("olsr.test_id", "test_dim_id", 4242)
239            .unwrap();
240        collector.commit_chart("olsr.test_id").unwrap();
241        collector
242            .prepare_value("olsr.test_id", "test_dim_id", 4343)
243            .unwrap();
244        collector.commit_chart("olsr.test_id").unwrap();
245
246        let should_be = r#"CHART "olsr.test_id" "test_name" "captions" "ms"
247DIMENSION "test_dim_id" "test_dim_name"
248BEGIN "olsr.test_id"
249SET "test_dim_id" = 00
250END
251BEGIN "olsr.test_id" 0
252SET "test_dim_id" = 00
253END
254"#;
255        let mut output = String::from_utf8(redirect_buf).unwrap();
256        output = output
257            .chars()
258            .map(|x| if x.is_numeric() { '0' } else { x })
259            .collect::<String>()
260            .replace("00", "0");
261        assert_eq!(output, should_be);
262    }
263}