// sonda_core/encoder/influx.rs

//! InfluxDB Line Protocol encoder.
//!
//! Implements the InfluxDB line protocol format.
//! Reference: <https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/>
//!
//! Format:
//! ```text
//! measurement,tag1=val1,tag2=val2 field_key=value timestamp_ns\n
//! ```
//!
//! Tags are sorted by key (InfluxDB best practice for performance). Measurement names and
//! tag keys/values escape `,`, ` `, and `=` with a backslash.

14use std::io::Write as _;
15use std::time::UNIX_EPOCH;
16
17use crate::model::metric::MetricEvent;
18use crate::{EncoderError, SondaError};
19
20use super::Encoder;
21
/// Encodes [`MetricEvent`]s into InfluxDB line protocol format.
///
/// The field key used for the metric value is configured at construction time. It defaults
/// to `"value"`.
///
/// Output format (with tags):
/// ```text
/// measurement,tag1=val1,tag2=val2 field_key=value 1700000000000000000\n
/// ```
///
/// Output format (no tags):
/// ```text
/// measurement field_key=value 1700000000000000000\n
/// ```
///
/// Timestamp is nanoseconds since the Unix epoch.
///
/// Characters `,`, ` `, and `=` are escaped with a backslash in measurement names and
/// tag keys/values.
///
/// When `precision` is set, metric values are formatted to the specified number
/// of decimal places.
///
/// The encoder holds only plain owned data, so it derives `Debug` (for diagnostics)
/// and `Clone` (so a configured encoder can be duplicated cheaply).
#[derive(Debug, Clone)]
pub struct InfluxLineProtocol {
    /// Pre-escaped field key bytes written into the buffer on every encode call.
    ///
    /// Built once at construction from the configured field key (default: `"value"`).
    field_key_escaped: Vec<u8>,
    /// Optional decimal precision for metric values.
    precision: Option<u8>,
}

53impl InfluxLineProtocol {
54    /// Create a new `InfluxLineProtocol` encoder.
55    ///
56    /// `field_key` sets the InfluxDB field key for the metric value. If `None`, defaults
57    /// to `"value"`. The field key is escaped and stored at construction time to avoid
58    /// per-event work.
59    ///
60    /// `precision` optionally limits the number of decimal places in metric values.
61    /// `None` preserves full `f64` precision (default behavior).
62    pub fn new(field_key: Option<String>, precision: Option<u8>) -> Self {
63        let field_key = field_key.unwrap_or_else(|| "value".to_string());
64        let mut field_key_escaped = Vec::with_capacity(field_key.len() + 4);
65        escape_tag(&field_key, &mut field_key_escaped);
66        Self {
67            field_key_escaped,
68            precision,
69        }
70    }
71}
72
/// Escape a measurement name, tag key, or tag value per InfluxDB line protocol rules.
///
/// The escaped form of `s` is appended to `buf`; existing buffer content is left untouched.
/// The following characters are escaped with a leading backslash: `,`, ` ` (space), `=`.
/// Every other byte is copied through unchanged.
fn escape_tag(s: &str, buf: &mut Vec<u8>) {
    // Escaping can only grow the output, so the input length is a safe lower bound.
    buf.reserve(s.len());
    // Working on bytes is safe for UTF-8 input: the three special characters are ASCII,
    // and ASCII byte values never occur inside a multi-byte UTF-8 sequence.
    for byte in s.bytes() {
        if matches!(byte, b',' | b' ' | b'=') {
            buf.push(b'\\');
        }
        buf.push(byte);
    }
}

88impl Encoder for InfluxLineProtocol {
89    /// Encode a metric event into InfluxDB line protocol format.
90    ///
91    /// Appends a complete line to `buf`. The buffer is not cleared before writing.
92    /// Writes into the caller-provided buffer to minimize allocations.
93    fn encode_metric(&self, event: &MetricEvent, buf: &mut Vec<u8>) -> Result<(), SondaError> {
94        // Measurement name (escaped)
95        escape_tag(&event.name, buf);
96
97        // Tag set (only if non-empty). Tags are already sorted by key from BTreeMap.
98        if !event.labels.is_empty() {
99            buf.push(b',');
100            let mut first = true;
101            for (key, value) in event.labels.iter() {
102                if !first {
103                    buf.push(b',');
104                }
105                first = false;
106                escape_tag(key, buf);
107                buf.push(b'=');
108                escape_tag(value, buf);
109            }
110        }
111
112        // Space separates tag set from field set
113        buf.push(b' ');
114
115        // Field set: field_key=value (field values are not escaped; numeric values need no escaping)
116        buf.extend_from_slice(&self.field_key_escaped);
117        buf.push(b'=');
118        // Write the float value, optionally with fixed decimal precision
119        super::write_value(buf, event.value, self.precision);
120
121        // Timestamp in nanoseconds since epoch
122        let timestamp_ns = event
123            .timestamp
124            .duration_since(UNIX_EPOCH)
125            .map_err(|e| SondaError::Encoder(EncoderError::TimestampBeforeEpoch(e)))?
126            .as_nanos();
127
128        buf.push(b' ');
129        write!(buf, "{timestamp_ns}").expect("write to Vec<u8> is infallible");
130
131        buf.push(b'\n');
132
133        Ok(())
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encoder::{create_encoder, EncoderConfig};
    use crate::model::metric::{Labels, MetricEvent};
    use std::time::{Duration, UNIX_EPOCH};

    /// Build a MetricEvent with a fixed nanosecond-precision timestamp for deterministic tests.
    fn make_event(name: &str, value: f64, labels: Labels, timestamp_ns: u64) -> MetricEvent {
        let ts = UNIX_EPOCH + Duration::from_nanos(timestamp_ns);
        MetricEvent::with_timestamp(name.to_string(), value, labels, ts).unwrap()
    }

    /// Encode one event and return the result as a UTF-8 String.
    fn encode_to_string(enc: &InfluxLineProtocol, event: &MetricEvent) -> String {
        let mut buf = Vec::new();
        enc.encode_metric(event, &mut buf).unwrap();
        String::from_utf8(buf).unwrap()
    }

    // --- Happy path: metric with no labels ---

    #[test]
    fn no_labels_produces_measurement_space_field_space_timestamp() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::from_pairs(&[]).unwrap();
        let event = make_event("up", 1.0, labels, 1_700_000_000_000_000_000);
        let output = encode_to_string(&enc, &event);
        assert_eq!(output, "up value=1 1700000000000000000\n");
    }

    #[test]
    fn no_labels_output_has_no_comma_after_measurement() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::from_pairs(&[]).unwrap();
        let event = make_event("cpu", 0.5, labels, 1_000_000_000);
        let output = encode_to_string(&enc, &event);
        // Measurement must be directly followed by a space (no tag set comma)
        assert!(
            output.starts_with("cpu "),
            "no-label measurement must be followed by space: {output:?}"
        );
    }

    // --- Happy path: metric with two labels (sorted) ---

    #[test]
    fn two_labels_sorted_by_key_in_tag_set() {
        let enc = InfluxLineProtocol::new(None, None);
        // Insert in reverse alphabetical order; the encoded tag set must come out sorted.
        let labels = Labels::from_pairs(&[("zone", "eu1"), ("host", "srv1")]).unwrap();
        let event = make_event("cpu", 0.5, labels, 1_700_000_000_000_000_000);
        let output = encode_to_string(&enc, &event);
        // host < zone alphabetically
        assert_eq!(
            output,
            "cpu,host=srv1,zone=eu1 value=0.5 1700000000000000000\n"
        );
    }

    #[test]
    fn three_labels_sorted_alphabetically() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels =
            Labels::from_pairs(&[("zone", "us1"), ("env", "prod"), ("host", "web01")]).unwrap();
        let event = make_event("metric", 42.0, labels, 1_000_000_000);
        let output = encode_to_string(&enc, &event);
        // env < host < zone
        assert!(
            output.starts_with("metric,env=prod,host=web01,zone=us1 "),
            "tags not sorted correctly: {output:?}"
        );
    }

    // --- Custom field key ---

    #[test]
    fn custom_field_key_appears_in_output() {
        let enc = InfluxLineProtocol::new(Some("gauge".to_string()), None);
        let labels = Labels::from_pairs(&[]).unwrap();
        let event = make_event("up", 1.0, labels, 1_000_000_000);
        let output = encode_to_string(&enc, &event);
        assert!(
            output.contains("gauge=1"),
            "custom field key not in output: {output:?}"
        );
    }

    #[test]
    fn none_field_key_defaults_to_value() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::from_pairs(&[]).unwrap();
        let event = make_event("up", 1.0, labels, 1_000_000_000);
        let output = encode_to_string(&enc, &event);
        assert!(
            output.contains("value="),
            "default field key 'value' not in output: {output:?}"
        );
    }

    // --- Escaping: measurement name ---

    #[test]
    fn measurement_without_special_chars_passes_through() {
        // `MetricEvent::with_timestamp` validates the metric name, so a name containing a
        // space, comma, or `=` cannot be built through the event constructor here. This
        // test therefore only confirms that an ordinary name (with an underscore) is
        // emitted unchanged; the escaping itself is covered by
        // `escape_tag_escapes_comma_space_and_equals`.
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::from_pairs(&[]).unwrap();
        let event = make_event("cpu_usage", 0.75, labels, 1_000_000_000);
        let output = encode_to_string(&enc, &event);
        assert!(
            output.starts_with("cpu_usage "),
            "plain measurement passed through incorrectly: {output:?}"
        );
    }

    #[test]
    fn escape_tag_escapes_comma_space_and_equals() {
        // Exercises the escaping helper directly, independent of event validation.
        let mut buf = Vec::new();
        escape_tag("my measurement,a=b", &mut buf);
        assert_eq!(buf, br"my\ measurement\,a\=b");
    }

    // --- Escaping: tag values ---

    #[test]
    fn tag_value_with_space_is_escaped() {
        let enc = InfluxLineProtocol::new(None, None);
        // The label set is built directly with `Labels::new` so the value can carry a
        // character that needs escaping.
        let labels = Labels::new(vec![("host".to_string(), "my server".to_string())]);
        let ts = UNIX_EPOCH + Duration::from_nanos(1_000_000_000);
        let event = MetricEvent::with_timestamp("cpu".to_string(), 0.5, labels, ts).unwrap();
        let output = encode_to_string(&enc, &event);
        assert!(
            output.contains(r"host=my\ server"),
            "space in tag value not escaped: {output:?}"
        );
    }

    #[test]
    fn tag_value_with_comma_is_escaped() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::new(vec![("region".to_string(), "us,east".to_string())]);
        let ts = UNIX_EPOCH + Duration::from_nanos(1_000_000_000);
        let event = MetricEvent::with_timestamp("cpu".to_string(), 1.0, labels, ts).unwrap();
        let output = encode_to_string(&enc, &event);
        assert!(
            output.contains(r"region=us\,east"),
            "comma in tag value not escaped: {output:?}"
        );
    }

    #[test]
    fn tag_value_with_equals_is_escaped() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::new(vec![("kv".to_string(), "a=b".to_string())]);
        let ts = UNIX_EPOCH + Duration::from_nanos(1_000_000_000);
        let event = MetricEvent::with_timestamp("cpu".to_string(), 1.0, labels, ts).unwrap();
        let output = encode_to_string(&enc, &event);
        assert!(
            output.contains(r"kv=a\=b"),
            "equals in tag value not escaped: {output:?}"
        );
    }

    #[test]
    fn tag_value_with_all_special_chars_is_escaped() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::new(vec![("tag".to_string(), "a,b c=d".to_string())]);
        let ts = UNIX_EPOCH + Duration::from_nanos(1_000_000_000);
        let event = MetricEvent::with_timestamp("cpu".to_string(), 1.0, labels, ts).unwrap();
        let output = encode_to_string(&enc, &event);
        assert!(
            output.contains(r"tag=a\,b\ c\=d"),
            "combined escaping not correct: {output:?}"
        );
    }

    // --- Timestamp is nanoseconds ---

    #[test]
    fn timestamp_is_nanoseconds_at_least_13_digits() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::from_pairs(&[]).unwrap();
        // 1_700_000_000 seconds = 1_700_000_000_000_000_000 ns (19 digits)
        let event = make_event("up", 1.0, labels, 1_700_000_000_000_000_000);
        let output = encode_to_string(&enc, &event);
        // Extract the timestamp (last token before newline)
        let ts_str = output
            .trim_end_matches('\n')
            .split_whitespace()
            .last()
            .unwrap();
        assert!(
            ts_str.len() >= 13,
            "timestamp must be at least 13 digits (nanoseconds): {ts_str:?}"
        );
        assert_eq!(
            ts_str, "1700000000000000000",
            "timestamp is not nanoseconds: {ts_str:?}"
        );
    }

    #[test]
    fn timestamp_is_not_milliseconds() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::from_pairs(&[]).unwrap();
        // 1_000 ms = 1 second = 1_000_000_000 ns; ms would be "1000", ns would be "1000000000"
        let ts = UNIX_EPOCH + Duration::from_millis(1_000);
        let event = MetricEvent::with_timestamp("up".to_string(), 1.0, labels, ts).unwrap();
        let output = encode_to_string(&enc, &event);
        let ts_str = output
            .trim_end_matches('\n')
            .split_whitespace()
            .last()
            .unwrap();
        assert_eq!(
            ts_str, "1000000000",
            "timestamp should be nanoseconds, not milliseconds: got {ts_str:?}"
        );
    }

    #[test]
    fn timestamp_does_not_contain_decimal_point() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::from_pairs(&[]).unwrap();
        let event = make_event("up", 1.0, labels, 1_234_567_890_123_456_789);
        let output = encode_to_string(&enc, &event);
        let ts_str = output
            .trim_end_matches('\n')
            .split_whitespace()
            .last()
            .unwrap();
        assert!(
            !ts_str.contains('.'),
            "timestamp must be an integer: {ts_str:?}"
        );
    }

    // --- Regression anchor: hardcoded expected byte strings ---

    #[test]
    fn regression_anchor_no_labels_exact_bytes() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::from_pairs(&[]).unwrap();
        // Timestamp: exactly 1_700_000_000 seconds = 1_700_000_000_000_000_000 ns
        let event = make_event(
            "http_requests_total",
            123.456,
            labels,
            1_700_000_000_000_000_000,
        );
        let mut buf = Vec::new();
        enc.encode_metric(&event, &mut buf).unwrap();
        assert_eq!(
            buf,
            b"http_requests_total value=123.456 1700000000000000000\n"
        );
    }

    #[test]
    fn regression_anchor_two_labels_exact_bytes() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::from_pairs(&[("hostname", "t0-a1"), ("zone", "eu1")]).unwrap();
        let event = make_event("interface_state", 1.0, labels, 1_700_000_000_000_000_000);
        let mut buf = Vec::new();
        enc.encode_metric(&event, &mut buf).unwrap();
        assert_eq!(
            buf,
            b"interface_state,hostname=t0-a1,zone=eu1 value=1 1700000000000000000\n"
        );
    }

    #[test]
    fn regression_anchor_custom_field_key_exact_bytes() {
        let enc = InfluxLineProtocol::new(Some("gauge".to_string()), None);
        let labels = Labels::from_pairs(&[("host", "srv1")]).unwrap();
        let event = make_event("cpu", 0.75, labels, 1_000_000_000_000_000_000);
        let mut buf = Vec::new();
        enc.encode_metric(&event, &mut buf).unwrap();
        assert_eq!(buf, b"cpu,host=srv1 gauge=0.75 1000000000000000000\n");
    }

    // --- Output format ---

    #[test]
    fn output_ends_with_newline() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::from_pairs(&[("k", "v")]).unwrap();
        let event = make_event("metric", 3.14, labels, 999_000_000);
        let output = encode_to_string(&enc, &event);
        assert!(
            output.ends_with('\n'),
            "output must end with newline: {output:?}"
        );
    }

    #[test]
    fn encode_appends_to_existing_buffer_content() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::from_pairs(&[]).unwrap();
        let event = make_event("up", 1.0, labels, 1_000_000_000);
        let mut buf = b"existing\n".to_vec();
        enc.encode_metric(&event, &mut buf).unwrap();
        let output = String::from_utf8(buf).unwrap();
        assert!(
            output.starts_with("existing\n"),
            "encoder must append, not overwrite: {output:?}"
        );
        assert!(
            output.ends_with("up value=1 1000000000\n"),
            "appended content missing: {output:?}"
        );
    }

    #[test]
    fn multiple_encodes_accumulate_in_buffer() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::from_pairs(&[]).unwrap();
        let event1 = make_event("up", 1.0, labels.clone(), 1_000_000_000);
        let event2 = make_event("down", 0.0, labels, 2_000_000_000);
        let mut buf = Vec::new();
        enc.encode_metric(&event1, &mut buf).unwrap();
        enc.encode_metric(&event2, &mut buf).unwrap();
        let output = String::from_utf8(buf).unwrap();
        let lines: Vec<&str> = output.lines().collect();
        assert_eq!(lines.len(), 2, "expected 2 lines: {output:?}");
        assert_eq!(lines[0], "up value=1 1000000000");
        assert_eq!(lines[1], "down value=0 2000000000");
    }

    // --- Pre-epoch timestamp error ---

    #[test]
    fn pre_epoch_timestamp_returns_encoder_error() {
        let before_epoch = UNIX_EPOCH - Duration::from_secs(1);
        let labels = Labels::from_pairs(&[]).unwrap();
        let event =
            MetricEvent::with_timestamp("up".to_string(), 1.0, labels, before_epoch).unwrap();
        let enc = InfluxLineProtocol::new(None, None);
        let mut buf = Vec::new();
        let result = enc.encode_metric(&event, &mut buf);
        assert!(
            matches!(result, Err(SondaError::Encoder(_))),
            "expected Encoder error for pre-epoch timestamp, got: {result:?}"
        );
    }

    // --- Send + Sync contract ---

    #[test]
    fn influx_line_protocol_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<InfluxLineProtocol>();
    }

    // --- Factory and EncoderConfig ---

    #[test]
    fn create_encoder_returns_working_influx_encoder_with_default_field_key() {
        let config = EncoderConfig::InfluxLineProtocol {
            field_key: None,
            precision: None,
        };
        let enc = create_encoder(&config).unwrap();
        let labels = Labels::from_pairs(&[]).unwrap();
        let ts = UNIX_EPOCH + Duration::from_nanos(1_000_000_000);
        let event = MetricEvent::with_timestamp("up".to_string(), 1.0, labels, ts).unwrap();
        let mut buf = Vec::new();
        enc.encode_metric(&event, &mut buf).unwrap();
        let output = String::from_utf8(buf).unwrap();
        assert_eq!(output, "up value=1 1000000000\n");
    }

    #[test]
    fn create_encoder_returns_working_influx_encoder_with_custom_field_key() {
        let config = EncoderConfig::InfluxLineProtocol {
            field_key: Some("count".to_string()),
            precision: None,
        };
        let enc = create_encoder(&config).unwrap();
        let labels = Labels::from_pairs(&[]).unwrap();
        let ts = UNIX_EPOCH + Duration::from_nanos(1_000_000_000);
        let event = MetricEvent::with_timestamp("up".to_string(), 5.0, labels, ts).unwrap();
        let mut buf = Vec::new();
        enc.encode_metric(&event, &mut buf).unwrap();
        let output = String::from_utf8(buf).unwrap();
        assert!(
            output.contains("count=5"),
            "custom field key 'count' not in factory-created encoder output: {output:?}"
        );
    }

    #[cfg(feature = "config")]
    #[test]
    fn encoder_config_deserialization_influx_lp_no_field_key() {
        let config: EncoderConfig =
            serde_yaml_ng::from_str("type: influx_lp\nfield_key: null").unwrap();
        assert!(matches!(
            config,
            EncoderConfig::InfluxLineProtocol {
                field_key: None,
                precision: None
            }
        ));
    }

    #[cfg(feature = "config")]
    #[test]
    fn encoder_config_deserialization_influx_lp_with_field_key() {
        let config: EncoderConfig =
            serde_yaml_ng::from_str("type: influx_lp\nfield_key: requests").unwrap();
        assert!(matches!(
            config,
            EncoderConfig::InfluxLineProtocol {
                field_key: Some(ref k), ..
            } if k == "requests"
        ));
    }

    // --- Precision: 2 limits decimal places in field value ---

    #[test]
    fn precision_two_limits_decimals_influx() {
        let enc = InfluxLineProtocol::new(None, Some(2));
        let labels = Labels::from_pairs(&[]).unwrap();
        let event = make_event("cpu", 99.60573, labels, 1_700_000_000_000_000_000);
        let output = encode_to_string(&enc, &event);
        assert_eq!(output, "cpu value=99.61 1700000000000000000\n");
    }

    #[test]
    fn precision_none_preserves_full_output_influx() {
        let enc = InfluxLineProtocol::new(None, None);
        let labels = Labels::from_pairs(&[]).unwrap();
        let event = make_event("cpu", 99.60573506572389, labels, 1_000_000_000);
        let output = encode_to_string(&enc, &event);
        assert!(
            output.contains("value=99.60573506572389"),
            "full precision must be preserved: {output:?}"
        );
    }

    #[test]
    fn precision_zero_influx() {
        let enc = InfluxLineProtocol::new(None, Some(0));
        let labels = Labels::from_pairs(&[]).unwrap();
        let event = make_event("up", 42.9, labels, 1_000_000_000);
        let output = encode_to_string(&enc, &event);
        assert!(
            output.contains("value=43"),
            "precision=0 should round: {output:?}"
        );
    }
}