netdata_plugin/collector.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2//
3use super::{Begin, Chart, Dimension, Instruction, Set};
4
5use std::collections::HashMap;
6use std::time::Instant;
7use thiserror::Error;
8use validator::{Validate, ValidationErrors};
9
10use std::io::Write;
11
12/// Error codes used by [Collector] and its methods.
13#[derive(Error, Debug)]
14pub enum CollectorError {
15 /// The referred chart is not yet registert in the collector.
16 #[error("unknown Chart: type.id = {0}")]
17 UnkownChart(String),
18 /// The referred chart is not yet registert in the collector.
19 #[error("unknown Dimension: id = {0}")]
20 UnkownDimension(String),
21 /// One of the field entries violates the formal requirements.
22 #[error(transparent)]
23 ValidationErrors(#[from] ValidationErrors),
24 /// Can't Write to the provided writer.
25 #[error(transparent)]
26 IOError(#[from] std::io::Error),
27}
28
29/// Internal List of active Dimensions.
30#[derive(Default)]
31struct CollectedDimensionInfo {
32 /// Prepared value.
33 value: i64,
34 /// Will be set to `true` if already commited.
35 commited: bool,
36}
37
38/// Internal List of active Charts.
39#[derive(Default)]
40struct CollectedChartInfo {
41 dimensions: HashMap<String, CollectedDimensionInfo>,
42 /// Timestamp of last commit.
43 last_commit: Option<Instant>,
44}
45
46/// A High-Level interface to run the data collecting `BEGIN`-`SET`-`END` loop
47/// and setup [Chart] and [Dimension] info in an efficient manner.
48///
49/// It's used roughly like this:
50///
51/// Example:
52///
53/// ```rust
54/// use std::error;
55/// use netdata_plugin::{Chart, Dimension};
56/// use netdata_plugin::collector::Collector;
57///
58/// fn main() -> Result<(), Box<dyn error::Error>> {
59/// // prepare collector
60/// let mut writer = std::io::stdout();
61/// let mut c = Collector::new(&mut writer);
62///
63/// // add charts and their associated dimensions
64/// c.add_chart(&mut Chart { type_id: "mytype.something" /*...*/, ..Default::default()})?;
65/// // writes CHART ...
66/// c.add_dimension( "mytype.something", &Dimension {id: "d1", ..Default::default()})?;
67/// c.add_dimension( "mytype.something", &Dimension {id: "d2", ..Default::default()})?;
68/// // writes DIMENSION ...
69///
70/// // data collecting loop
71/// loop {
72/// c.prepare_value("mytype.something", "d1", 4242)?;
73/// c.prepare_value("mytype.something", "d2", 4242)?;
74/// c.commit_chart("mytype.something").unwrap();
75/// // The BEGIN ... - SET ... - END block will be written to stdout
76///
77/// std::thread::sleep(std::time::Duration::from_secs(1));
78/// break; // stop the demonstration ;)
79/// }
80/// Ok(())
81/// }
82/// ```
83#[derive()]
84pub struct Collector<'a, W: Write> {
85 charts: HashMap<String, CollectedChartInfo>,
86 writer: &'a mut W,
87}
88
89impl<'a, W: Write> Collector<'a, W> {
90 /// Initilaize the data store to manage the list of active [Chart]s.
91 ///
92 /// In most cases you can use a mutable `stdout` reference as writer.
93 pub fn new(writer: &'a mut W) -> Self {
94 Collector {
95 charts: HashMap::new(),
96 writer: writer,
97 }
98 }
99
100 /// Registers a new [Chart] in this [Collector] and write
101 /// a `CHART...`-command to the plugins `stdout` to add the Chart properties
102 /// on the host side, too.
103 ///
104 /// The `type_id` of the [Chart] entry will be checkt for
105 /// [formal validity](super::validate_type_id()) in
106 /// advance and may raise [ValidationErrors](CollectorError::ValidationErrors).
107 pub fn add_chart(&mut self, chart: &Chart) -> Result<(), CollectorError> {
108 chart.validate()?;
109 let cci = CollectedChartInfo {
110 ..Default::default()
111 };
112 self.charts.insert(format!("{}", chart.type_id), cci);
113 writeln!(self.writer, "{}", chart)?;
114 Ok(())
115 }
116
117 /// Registers a new [Dimension] for a Chart of a given `type.id` in this
118 /// [Collector] and write a `DIMENSION...`-command to the plugins `stdout` to add it
119 /// on the host side as well.
120 ///
121 /// The `id` field of the [Dimension] entry will be checkt for
122 /// [formal validity](super::validate_id()) in
123 /// advance and may raise [ValidationErrors](CollectorError::ValidationErrors).
124 pub fn add_dimension(
125 &mut self,
126 chart_id: &str,
127 dimension: &Dimension,
128 ) -> Result<(), CollectorError> {
129 dimension.validate()?;
130 let cci = self
131 .charts
132 .get_mut(chart_id)
133 .ok_or(CollectorError::UnkownChart(chart_id.to_owned()))?;
134 let cdi = CollectedDimensionInfo {
135 ..Default::default()
136 };
137 cci.dimensions.insert(dimension.id.to_owned(), cdi);
138 writeln!(self.writer, "{}", dimension)?;
139 Ok(())
140 }
141
142 /// Update the value for one [Dimension] to be later commited
143 /// with all other updated values of this [Chart]
144 ///
145 /// If the reffered [Dimension] or [Chart] isn't already registered in the
146 /// [Collector], it will will raise [UnkownChart](CollectorError::UnkownChart) or
147 /// [UnkownDimension](CollectorError::UnkownDimension). This may be used to
148 /// dynamically setup the needed categories on demand.
149 pub fn prepare_value(
150 &mut self,
151 chart_id: &str,
152 dimension_id: &str,
153 value: i64,
154 ) -> Result<(), CollectorError> {
155 let cci = self
156 .charts
157 .get_mut(chart_id)
158 .ok_or(CollectorError::UnkownChart(chart_id.to_owned()))?;
159 let mut cdi = cci
160 .dimensions
161 .get_mut(dimension_id)
162 .ok_or(CollectorError::UnkownDimension(dimension_id.to_owned()))?;
163 cdi.value = value;
164 cdi.commited = false;
165 Ok(())
166 }
167
168 /// Send a block containing all updated values to `stdout`.
169 ///
170 /// In multi threaded scenarios, this could require additional lock preventions.
171 pub fn commit_chart(&mut self, chart_id: &str) -> Result<(), CollectorError> {
172 let mut cci = self
173 .charts
174 .get_mut(chart_id)
175 .ok_or(CollectorError::UnkownChart(chart_id.to_owned()))?;
176 let now = Instant::now();
177
178 let begin = Begin {
179 type_id: chart_id,
180 microseconds: match cci.last_commit {
181 Some(t) => Some(now.duration_since(t).as_micros()),
182 _ => None,
183 },
184 };
185 cci.last_commit = Some(now);
186 writeln!(self.writer, "{}", begin).unwrap();
187
188 for (id, cdi) in cci.dimensions.iter_mut() {
189 if !cdi.commited {
190 writeln!(
191 self.writer,
192 "{}",
193 Set {
194 id: id,
195 value: Some(cdi.value)
196 }
197 )
198 .unwrap();
199 cdi.commited = true;
200 }
201 }
202
203 writeln!(self.writer, "{}", Instruction::END)?;
204 Ok(())
205 }
206}
207
208#[cfg(test)]
209mod collector_tests {
210 use super::{Chart, Collector, Dimension};
211 use pretty_assertions::assert_eq;
212
213 #[test]
214 fn collector_test() {
215 let mut redirect_buf = Vec::new();
216
217 let mut collector = Collector::new(&mut redirect_buf);
218 collector
219 .add_chart(&mut Chart {
220 type_id: "olsr.test_id",
221 name: "test_name",
222 title: "captions",
223 units: "ms",
224 ..Default::default()
225 })
226 .unwrap();
227 collector
228 .add_dimension(
229 "olsr.test_id",
230 &Dimension {
231 id: "test_dim_id",
232 name: "test_dim_name",
233 ..Default::default()
234 },
235 )
236 .unwrap();
237 collector
238 .prepare_value("olsr.test_id", "test_dim_id", 4242)
239 .unwrap();
240 collector.commit_chart("olsr.test_id").unwrap();
241 collector
242 .prepare_value("olsr.test_id", "test_dim_id", 4343)
243 .unwrap();
244 collector.commit_chart("olsr.test_id").unwrap();
245
246 let should_be = r#"CHART "olsr.test_id" "test_name" "captions" "ms"
247DIMENSION "test_dim_id" "test_dim_name"
248BEGIN "olsr.test_id"
249SET "test_dim_id" = 00
250END
251BEGIN "olsr.test_id" 0
252SET "test_dim_id" = 00
253END
254"#;
255 let mut output = String::from_utf8(redirect_buf).unwrap();
256 output = output
257 .chars()
258 .map(|x| if x.is_numeric() { '0' } else { x })
259 .collect::<String>()
260 .replace("00", "0");
261 assert_eq!(output, should_be);
262 }
263}