1use std::io::Write as _;
15use std::time::UNIX_EPOCH;
16
17use crate::model::metric::MetricEvent;
18use crate::{EncoderError, SondaError};
19
20use super::Encoder;
21
22pub struct InfluxLineProtocol {
45 field_key_escaped: Vec<u8>,
49 precision: Option<u8>,
51}
52
53impl InfluxLineProtocol {
54 pub fn new(field_key: Option<String>, precision: Option<u8>) -> Self {
63 let field_key = field_key.unwrap_or_else(|| "value".to_string());
64 let mut field_key_escaped = Vec::with_capacity(field_key.len() + 4);
65 escape_tag(&field_key, &mut field_key_escaped);
66 Self {
67 field_key_escaped,
68 precision,
69 }
70 }
71}
72
73fn escape_tag(s: &str, buf: &mut Vec<u8>) {
77 for byte in s.bytes() {
78 match byte {
79 b',' | b' ' | b'=' => {
80 buf.push(b'\\');
81 buf.push(byte);
82 }
83 other => buf.push(other),
84 }
85 }
86}
87
88impl Encoder for InfluxLineProtocol {
89 fn encode_metric(&self, event: &MetricEvent, buf: &mut Vec<u8>) -> Result<(), SondaError> {
94 escape_tag(&event.name, buf);
96
97 if !event.labels.is_empty() {
99 buf.push(b',');
100 let mut first = true;
101 for (key, value) in event.labels.iter() {
102 if !first {
103 buf.push(b',');
104 }
105 first = false;
106 escape_tag(key, buf);
107 buf.push(b'=');
108 escape_tag(value, buf);
109 }
110 }
111
112 buf.push(b' ');
114
115 buf.extend_from_slice(&self.field_key_escaped);
117 buf.push(b'=');
118 super::write_value(buf, event.value, self.precision);
120
121 let timestamp_ns = event
123 .timestamp
124 .duration_since(UNIX_EPOCH)
125 .map_err(|e| SondaError::Encoder(EncoderError::TimestampBeforeEpoch(e)))?
126 .as_nanos();
127
128 buf.push(b' ');
129 write!(buf, "{timestamp_ns}").expect("write to Vec<u8> is infallible");
130
131 buf.push(b'\n');
132
133 Ok(())
134 }
135}
136
137#[cfg(test)]
138mod tests {
139 use super::*;
140 use crate::encoder::{create_encoder, EncoderConfig};
141 use crate::model::metric::{Labels, MetricEvent};
142 use std::time::{Duration, UNIX_EPOCH};
143
144 fn make_event(name: &str, value: f64, labels: Labels, timestamp_ns: u64) -> MetricEvent {
146 let ts = UNIX_EPOCH + Duration::from_nanos(timestamp_ns);
147 MetricEvent::with_timestamp(name.to_string(), value, labels, ts).unwrap()
148 }
149
150 fn encode_to_string(enc: &InfluxLineProtocol, event: &MetricEvent) -> String {
152 let mut buf = Vec::new();
153 enc.encode_metric(event, &mut buf).unwrap();
154 String::from_utf8(buf).unwrap()
155 }
156
157 #[test]
160 fn no_labels_produces_measurement_space_field_space_timestamp() {
161 let enc = InfluxLineProtocol::new(None, None);
162 let labels = Labels::from_pairs(&[]).unwrap();
163 let event = make_event("up", 1.0, labels, 1_700_000_000_000_000_000);
164 let output = encode_to_string(&enc, &event);
165 assert_eq!(output, "up value=1 1700000000000000000\n");
166 }
167
168 #[test]
169 fn no_labels_output_has_no_comma_after_measurement() {
170 let enc = InfluxLineProtocol::new(None, None);
171 let labels = Labels::from_pairs(&[]).unwrap();
172 let event = make_event("cpu", 0.5, labels, 1_000_000_000);
173 let output = encode_to_string(&enc, &event);
174 assert!(
176 output.starts_with("cpu "),
177 "no-label measurement must be followed by space: {output:?}"
178 );
179 }
180
181 #[test]
184 fn two_labels_sorted_by_key_in_tag_set() {
185 let enc = InfluxLineProtocol::new(None, None);
186 let labels = Labels::from_pairs(&[("zone", "eu1"), ("host", "srv1")]).unwrap();
188 let event = make_event("cpu", 0.5, labels, 1_700_000_000_000_000_000);
189 let output = encode_to_string(&enc, &event);
190 assert_eq!(
192 output,
193 "cpu,host=srv1,zone=eu1 value=0.5 1700000000000000000\n"
194 );
195 }
196
197 #[test]
198 fn three_labels_sorted_alphabetically() {
199 let enc = InfluxLineProtocol::new(None, None);
200 let labels =
201 Labels::from_pairs(&[("zone", "us1"), ("env", "prod"), ("host", "web01")]).unwrap();
202 let event = make_event("metric", 42.0, labels, 1_000_000_000);
203 let output = encode_to_string(&enc, &event);
204 assert!(
206 output.starts_with("metric,env=prod,host=web01,zone=us1 "),
207 "tags not sorted correctly: {output:?}"
208 );
209 }
210
211 #[test]
214 fn custom_field_key_appears_in_output() {
215 let enc = InfluxLineProtocol::new(Some("gauge".to_string()), None);
216 let labels = Labels::from_pairs(&[]).unwrap();
217 let event = make_event("up", 1.0, labels, 1_000_000_000);
218 let output = encode_to_string(&enc, &event);
219 assert!(
220 output.contains("gauge=1"),
221 "custom field key not in output: {output:?}"
222 );
223 }
224
225 #[test]
226 fn none_field_key_defaults_to_value() {
227 let enc = InfluxLineProtocol::new(None, None);
228 let labels = Labels::from_pairs(&[]).unwrap();
229 let event = make_event("up", 1.0, labels, 1_000_000_000);
230 let output = encode_to_string(&enc, &event);
231 assert!(
232 output.contains("value="),
233 "default field key 'value' not in output: {output:?}"
234 );
235 }
236
237 #[test]
240 fn measurement_with_space_is_escaped() {
241 let enc = InfluxLineProtocol::new(None, None);
242 let labels = Labels::from_pairs(&[]).unwrap();
243 let ts = UNIX_EPOCH + Duration::from_nanos(1_000_000_000);
246 let event = MetricEvent::with_timestamp("cpu_usage".to_string(), 0.75, labels, ts).unwrap();
253 let output = encode_to_string(&enc, &event);
254 assert!(
255 output.starts_with("cpu_usage "),
256 "plain measurement passed through incorrectly: {output:?}"
257 );
258 }
259
260 #[test]
261 fn tag_value_with_space_is_escaped() {
262 let enc = InfluxLineProtocol::new(None, None);
263 let labels = Labels::new(vec![("host".to_string(), "my server".to_string())]);
265 let ts = UNIX_EPOCH + Duration::from_nanos(1_000_000_000);
266 let event = MetricEvent::with_timestamp("cpu".to_string(), 0.5, labels, ts).unwrap();
267 let output = encode_to_string(&enc, &event);
268 assert!(
269 output.contains(r"host=my\ server"),
270 "space in tag value not escaped: {output:?}"
271 );
272 }
273
274 #[test]
275 fn tag_value_with_comma_is_escaped() {
276 let enc = InfluxLineProtocol::new(None, None);
277 let labels = Labels::new(vec![("region".to_string(), "us,east".to_string())]);
278 let ts = UNIX_EPOCH + Duration::from_nanos(1_000_000_000);
279 let event = MetricEvent::with_timestamp("cpu".to_string(), 1.0, labels, ts).unwrap();
280 let output = encode_to_string(&enc, &event);
281 assert!(
282 output.contains(r"region=us\,east"),
283 "comma in tag value not escaped: {output:?}"
284 );
285 }
286
287 #[test]
288 fn tag_value_with_equals_is_escaped() {
289 let enc = InfluxLineProtocol::new(None, None);
290 let labels = Labels::new(vec![("kv".to_string(), "a=b".to_string())]);
291 let ts = UNIX_EPOCH + Duration::from_nanos(1_000_000_000);
292 let event = MetricEvent::with_timestamp("cpu".to_string(), 1.0, labels, ts).unwrap();
293 let output = encode_to_string(&enc, &event);
294 assert!(
295 output.contains(r"kv=a\=b"),
296 "equals in tag value not escaped: {output:?}"
297 );
298 }
299
300 #[test]
301 fn tag_value_with_all_special_chars_is_escaped() {
302 let enc = InfluxLineProtocol::new(None, None);
303 let labels = Labels::new(vec![("tag".to_string(), "a,b c=d".to_string())]);
304 let ts = UNIX_EPOCH + Duration::from_nanos(1_000_000_000);
305 let event = MetricEvent::with_timestamp("cpu".to_string(), 1.0, labels, ts).unwrap();
306 let output = encode_to_string(&enc, &event);
307 assert!(
308 output.contains(r"tag=a\,b\ c\=d"),
309 "combined escaping not correct: {output:?}"
310 );
311 }
312
313 #[test]
316 fn timestamp_is_nanoseconds_at_least_13_digits() {
317 let enc = InfluxLineProtocol::new(None, None);
318 let labels = Labels::from_pairs(&[]).unwrap();
319 let event = make_event("up", 1.0, labels, 1_700_000_000_000_000_000);
321 let output = encode_to_string(&enc, &event);
322 let ts_str = output
324 .trim_end_matches('\n')
325 .split_whitespace()
326 .last()
327 .unwrap();
328 assert!(
329 ts_str.len() >= 13,
330 "timestamp must be at least 13 digits (nanoseconds): {ts_str:?}"
331 );
332 assert_eq!(
333 ts_str, "1700000000000000000",
334 "timestamp is not nanoseconds: {ts_str:?}"
335 );
336 }
337
338 #[test]
339 fn timestamp_is_not_milliseconds() {
340 let enc = InfluxLineProtocol::new(None, None);
341 let labels = Labels::from_pairs(&[]).unwrap();
342 let ts = UNIX_EPOCH + Duration::from_millis(1_000);
344 let event = MetricEvent::with_timestamp("up".to_string(), 1.0, labels, ts).unwrap();
345 let output = encode_to_string(&enc, &event);
346 let ts_str = output
347 .trim_end_matches('\n')
348 .split_whitespace()
349 .last()
350 .unwrap();
351 assert_eq!(
352 ts_str, "1000000000",
353 "timestamp should be nanoseconds, not milliseconds: got {ts_str:?}"
354 );
355 }
356
357 #[test]
358 fn timestamp_does_not_contain_decimal_point() {
359 let enc = InfluxLineProtocol::new(None, None);
360 let labels = Labels::from_pairs(&[]).unwrap();
361 let event = make_event("up", 1.0, labels, 1_234_567_890_123_456_789);
362 let output = encode_to_string(&enc, &event);
363 let ts_str = output
364 .trim_end_matches('\n')
365 .split_whitespace()
366 .last()
367 .unwrap();
368 assert!(
369 !ts_str.contains('.'),
370 "timestamp must be an integer: {ts_str:?}"
371 );
372 }
373
374 #[test]
377 fn regression_anchor_no_labels_exact_bytes() {
378 let enc = InfluxLineProtocol::new(None, None);
379 let labels = Labels::from_pairs(&[]).unwrap();
380 let event = make_event(
382 "http_requests_total",
383 123.456,
384 labels,
385 1_700_000_000_000_000_000,
386 );
387 let mut buf = Vec::new();
388 enc.encode_metric(&event, &mut buf).unwrap();
389 assert_eq!(
390 buf,
391 b"http_requests_total value=123.456 1700000000000000000\n"
392 );
393 }
394
395 #[test]
396 fn regression_anchor_two_labels_exact_bytes() {
397 let enc = InfluxLineProtocol::new(None, None);
398 let labels = Labels::from_pairs(&[("hostname", "t0-a1"), ("zone", "eu1")]).unwrap();
399 let event = make_event("interface_state", 1.0, labels, 1_700_000_000_000_000_000);
400 let mut buf = Vec::new();
401 enc.encode_metric(&event, &mut buf).unwrap();
402 assert_eq!(
403 buf,
404 b"interface_state,hostname=t0-a1,zone=eu1 value=1 1700000000000000000\n"
405 );
406 }
407
408 #[test]
409 fn regression_anchor_custom_field_key_exact_bytes() {
410 let enc = InfluxLineProtocol::new(Some("gauge".to_string()), None);
411 let labels = Labels::from_pairs(&[("host", "srv1")]).unwrap();
412 let event = make_event("cpu", 0.75, labels, 1_000_000_000_000_000_000);
413 let mut buf = Vec::new();
414 enc.encode_metric(&event, &mut buf).unwrap();
415 assert_eq!(buf, b"cpu,host=srv1 gauge=0.75 1000000000000000000\n");
416 }
417
418 #[test]
421 fn output_ends_with_newline() {
422 let enc = InfluxLineProtocol::new(None, None);
423 let labels = Labels::from_pairs(&[("k", "v")]).unwrap();
424 let event = make_event("metric", 3.14, labels, 999_000_000);
425 let output = encode_to_string(&enc, &event);
426 assert!(
427 output.ends_with('\n'),
428 "output must end with newline: {output:?}"
429 );
430 }
431
432 #[test]
433 fn encode_appends_to_existing_buffer_content() {
434 let enc = InfluxLineProtocol::new(None, None);
435 let labels = Labels::from_pairs(&[]).unwrap();
436 let event = make_event("up", 1.0, labels, 1_000_000_000);
437 let mut buf = b"existing\n".to_vec();
438 enc.encode_metric(&event, &mut buf).unwrap();
439 let output = String::from_utf8(buf).unwrap();
440 assert!(
441 output.starts_with("existing\n"),
442 "encoder must append, not overwrite: {output:?}"
443 );
444 assert!(
445 output.ends_with("up value=1 1000000000\n"),
446 "appended content missing: {output:?}"
447 );
448 }
449
450 #[test]
451 fn multiple_encodes_accumulate_in_buffer() {
452 let enc = InfluxLineProtocol::new(None, None);
453 let labels = Labels::from_pairs(&[]).unwrap();
454 let event1 = make_event("up", 1.0, labels.clone(), 1_000_000_000);
455 let event2 = make_event("down", 0.0, labels, 2_000_000_000);
456 let mut buf = Vec::new();
457 enc.encode_metric(&event1, &mut buf).unwrap();
458 enc.encode_metric(&event2, &mut buf).unwrap();
459 let output = String::from_utf8(buf).unwrap();
460 let lines: Vec<&str> = output.lines().collect();
461 assert_eq!(lines.len(), 2, "expected 2 lines: {output:?}");
462 assert_eq!(lines[0], "up value=1 1000000000");
463 assert_eq!(lines[1], "down value=0 2000000000");
464 }
465
466 #[test]
469 fn pre_epoch_timestamp_returns_encoder_error() {
470 let before_epoch = UNIX_EPOCH - Duration::from_secs(1);
471 let labels = Labels::from_pairs(&[]).unwrap();
472 let event =
473 MetricEvent::with_timestamp("up".to_string(), 1.0, labels, before_epoch).unwrap();
474 let enc = InfluxLineProtocol::new(None, None);
475 let mut buf = Vec::new();
476 let result = enc.encode_metric(&event, &mut buf);
477 assert!(
478 matches!(result, Err(SondaError::Encoder(_))),
479 "expected Encoder error for pre-epoch timestamp, got: {result:?}"
480 );
481 }
482
483 #[test]
486 fn influx_line_protocol_is_send_and_sync() {
487 fn assert_send_sync<T: Send + Sync>() {}
488 assert_send_sync::<InfluxLineProtocol>();
489 }
490
491 #[test]
494 fn create_encoder_returns_working_influx_encoder_with_default_field_key() {
495 let config = EncoderConfig::InfluxLineProtocol {
496 field_key: None,
497 precision: None,
498 };
499 let enc = create_encoder(&config).unwrap();
500 let labels = Labels::from_pairs(&[]).unwrap();
501 let ts = UNIX_EPOCH + Duration::from_nanos(1_000_000_000);
502 let event = MetricEvent::with_timestamp("up".to_string(), 1.0, labels, ts).unwrap();
503 let mut buf = Vec::new();
504 enc.encode_metric(&event, &mut buf).unwrap();
505 let output = String::from_utf8(buf).unwrap();
506 assert_eq!(output, "up value=1 1000000000\n");
507 }
508
509 #[test]
510 fn create_encoder_returns_working_influx_encoder_with_custom_field_key() {
511 let config = EncoderConfig::InfluxLineProtocol {
512 field_key: Some("count".to_string()),
513 precision: None,
514 };
515 let enc = create_encoder(&config).unwrap();
516 let labels = Labels::from_pairs(&[]).unwrap();
517 let ts = UNIX_EPOCH + Duration::from_nanos(1_000_000_000);
518 let event = MetricEvent::with_timestamp("up".to_string(), 5.0, labels, ts).unwrap();
519 let mut buf = Vec::new();
520 enc.encode_metric(&event, &mut buf).unwrap();
521 let output = String::from_utf8(buf).unwrap();
522 assert!(
523 output.contains("count=5"),
524 "custom field key 'count' not in factory-created encoder output: {output:?}"
525 );
526 }
527
528 #[cfg(feature = "config")]
529 #[test]
530 fn encoder_config_deserialization_influx_lp_no_field_key() {
531 let config: EncoderConfig =
532 serde_yaml_ng::from_str("type: influx_lp\nfield_key: null").unwrap();
533 assert!(matches!(
534 config,
535 EncoderConfig::InfluxLineProtocol {
536 field_key: None,
537 precision: None
538 }
539 ));
540 }
541
542 #[cfg(feature = "config")]
543 #[test]
544 fn encoder_config_deserialization_influx_lp_with_field_key() {
545 let config: EncoderConfig =
546 serde_yaml_ng::from_str("type: influx_lp\nfield_key: requests").unwrap();
547 assert!(matches!(
548 config,
549 EncoderConfig::InfluxLineProtocol {
550 field_key: Some(ref k), ..
551 } if k == "requests"
552 ));
553 }
554
555 #[test]
558 fn precision_two_limits_decimals_influx() {
559 let enc = InfluxLineProtocol::new(None, Some(2));
560 let labels = Labels::from_pairs(&[]).unwrap();
561 let event = make_event("cpu", 99.60573, labels, 1_700_000_000_000_000_000);
562 let output = encode_to_string(&enc, &event);
563 assert_eq!(output, "cpu value=99.61 1700000000000000000\n");
564 }
565
566 #[test]
567 fn precision_none_preserves_full_output_influx() {
568 let enc = InfluxLineProtocol::new(None, None);
569 let labels = Labels::from_pairs(&[]).unwrap();
570 let event = make_event("cpu", 99.60573506572389, labels, 1_000_000_000);
571 let output = encode_to_string(&enc, &event);
572 assert!(
573 output.contains("value=99.60573506572389"),
574 "full precision must be preserved: {output:?}"
575 );
576 }
577
578 #[test]
579 fn precision_zero_influx() {
580 let enc = InfluxLineProtocol::new(None, Some(0));
581 let labels = Labels::from_pairs(&[]).unwrap();
582 let event = make_event("up", 42.9, labels, 1_000_000_000);
583 let output = encode_to_string(&enc, &event);
584 assert!(
585 output.contains("value=43"),
586 "precision=0 should round: {output:?}"
587 );
588 }
589}