Skip to main content

sonda_core/encoder/
mod.rs

1//! Encoders serialize telemetry events into wire format bytes.
2//!
3//! All encoders implement the `Encoder` trait. They write into a caller-provided
4//! `Vec<u8>` to avoid per-event allocations.
5
6pub mod influx;
7pub mod json;
8#[cfg(feature = "otlp")]
9pub mod otlp;
10pub mod prometheus;
11#[cfg(feature = "remote-write")]
12pub mod remote_write;
13pub mod syslog;
14
15use crate::model::log::LogEvent;
16use crate::model::metric::MetricEvent;
17
18/// Encodes telemetry events into a specific wire format.
19///
20/// Implementations should pre-build any invariant content (label prefixes,
21/// metric name validation) at construction time.
22pub trait Encoder: Send + Sync {
23    /// Encode a metric event into the provided buffer.
24    fn encode_metric(
25        &self,
26        event: &MetricEvent,
27        buf: &mut Vec<u8>,
28    ) -> Result<(), crate::SondaError>;
29
30    /// Encode a log event into the provided buffer.
31    ///
32    /// Returns an error by default. Encoders that support log encoding must
33    /// override this method.
34    fn encode_log(&self, _event: &LogEvent, _buf: &mut Vec<u8>) -> Result<(), crate::SondaError> {
35        Err(crate::SondaError::Encoder(
36            crate::EncoderError::NotSupported("log encoding not supported by this encoder".into()),
37        ))
38    }
39}
40
41/// Configuration selecting which encoder to use for a scenario.
42///
43/// This enum is serde-deserializable from YAML scenario files.
44/// The `type` field selects the variant: `prometheus_text`, `influx_lp`,
45/// `json_lines`, `syslog`, `remote_write`, or `otlp`.
46///
47/// Feature-gated encoders (`remote_write`, `otlp`) have companion
48/// `*Disabled` variants that are compiled in when their feature is absent.
49/// These accept the YAML tag so that deserialization succeeds with a
50/// descriptive error from [`create_encoder`] instead of a generic
51/// "unknown variant" error from serde.
52#[derive(Debug, Clone)]
53#[cfg_attr(feature = "config", derive(serde::Serialize, serde::Deserialize))]
54#[cfg_attr(feature = "config", serde(tag = "type"))]
55#[non_exhaustive]
56pub enum EncoderConfig {
57    /// Prometheus text exposition format (version 0.0.4).
58    ///
59    /// `precision` optionally limits the number of decimal places in metric values.
60    #[cfg_attr(feature = "config", serde(rename = "prometheus_text"))]
61    PrometheusText {
62        /// Maximum decimal places for metric values. `None` preserves full `f64` precision.
63        #[cfg_attr(feature = "config", serde(default))]
64        precision: Option<u8>,
65    },
66    /// InfluxDB line protocol.
67    ///
68    /// `field_key` sets the field key used for the metric value. Defaults to `"value"`.
69    /// `precision` optionally limits the number of decimal places in metric values.
70    #[cfg_attr(feature = "config", serde(rename = "influx_lp"))]
71    InfluxLineProtocol {
72        /// The InfluxDB field key for the metric value. Defaults to `"value"` if absent.
73        field_key: Option<String>,
74        /// Maximum decimal places for metric values. `None` preserves full `f64` precision.
75        #[cfg_attr(feature = "config", serde(default))]
76        precision: Option<u8>,
77    },
78    /// JSON Lines (NDJSON) format.
79    ///
80    /// Each event is serialized as one JSON object per line. Compatible with Elasticsearch,
81    /// Loki, and generic HTTP ingest endpoints.
82    ///
83    /// `precision` optionally rounds the metric value before JSON serialization.
84    #[cfg_attr(feature = "config", serde(rename = "json_lines"))]
85    JsonLines {
86        /// Maximum decimal places for metric values. `None` preserves full `f64` precision.
87        #[cfg_attr(feature = "config", serde(default))]
88        precision: Option<u8>,
89    },
90    /// RFC 5424 syslog format.
91    ///
92    /// Encodes log events as syslog lines. `hostname` and `app_name` default to `"sonda"`.
93    #[cfg_attr(feature = "config", serde(rename = "syslog"))]
94    Syslog {
95        /// The HOSTNAME field in the syslog header. Defaults to `"sonda"`.
96        hostname: Option<String>,
97        /// The APP-NAME field in the syslog header. Defaults to `"sonda"`.
98        app_name: Option<String>,
99    },
100    /// Prometheus remote write protobuf format.
101    ///
102    /// Encodes metric events as length-prefixed protobuf `TimeSeries` messages.
103    /// Must be paired with the `remote_write` sink type, which batches TimeSeries
104    /// into a single `WriteRequest`, snappy-compresses, and HTTP POSTs with the
105    /// correct protocol headers. Requires the `remote-write` feature flag.
106    #[cfg(feature = "remote-write")]
107    #[cfg_attr(feature = "config", serde(rename = "remote_write"))]
108    RemoteWrite,
109
110    /// Placeholder variant when the `remote-write` feature is not compiled in.
111    ///
112    /// Deserializes the `remote_write` YAML tag so that the error message can
113    /// point the user at the missing feature flag instead of producing a
114    /// generic "unknown variant" error from serde.
115    #[cfg(not(feature = "remote-write"))]
116    #[cfg_attr(feature = "config", serde(rename = "remote_write"))]
117    RemoteWriteDisabled {},
118    /// OTLP protobuf format.
119    ///
120    /// Encodes metric events as length-prefixed protobuf `Metric` messages and
121    /// log events as length-prefixed protobuf `LogRecord` messages. Must be
122    /// paired with the `otlp_grpc` sink type, which batches and sends via gRPC.
123    /// Requires the `otlp` feature flag.
124    #[cfg(feature = "otlp")]
125    #[cfg_attr(feature = "config", serde(rename = "otlp"))]
126    Otlp,
127
128    /// Placeholder variant when the `otlp` feature is not compiled in.
129    ///
130    /// Deserializes the `otlp` YAML tag so that the error message can
131    /// point the user at the missing feature flag instead of producing a
132    /// generic "unknown variant" error from serde.
133    #[cfg(not(feature = "otlp"))]
134    #[cfg_attr(feature = "config", serde(rename = "otlp"))]
135    OtlpDisabled {},
136}
137
138/// Create a boxed [`Encoder`] from the given [`EncoderConfig`].
139///
140/// Returns `Err` if the config refers to a feature-gated encoder whose Cargo
141/// feature was not enabled at compile time.
142pub fn create_encoder(config: &EncoderConfig) -> Result<Box<dyn Encoder>, crate::SondaError> {
143    match config {
144        EncoderConfig::PrometheusText { precision } => {
145            Ok(Box::new(prometheus::PrometheusText::new(*precision)))
146        }
147        EncoderConfig::InfluxLineProtocol {
148            field_key,
149            precision,
150        } => Ok(Box::new(influx::InfluxLineProtocol::new(
151            field_key.clone(),
152            *precision,
153        ))),
154        EncoderConfig::JsonLines { precision } => Ok(Box::new(json::JsonLines::new(*precision))),
155        EncoderConfig::Syslog { hostname, app_name } => Ok(Box::new(syslog::Syslog::new(
156            hostname.clone(),
157            app_name.clone(),
158        ))),
159        #[cfg(feature = "remote-write")]
160        EncoderConfig::RemoteWrite => Ok(Box::new(remote_write::RemoteWriteEncoder::new())),
161        #[cfg(feature = "otlp")]
162        EncoderConfig::Otlp => Ok(Box::new(otlp::OtlpEncoder::new())),
163        #[cfg(not(feature = "remote-write"))]
164        EncoderConfig::RemoteWriteDisabled { .. } => {
165            Err(crate::SondaError::Config(crate::ConfigError::invalid(
166                "encoder type 'remote_write' requires the 'remote-write' feature: \
167                 cargo build -F remote-write",
168            )))
169        }
170        #[cfg(not(feature = "otlp"))]
171        EncoderConfig::OtlpDisabled { .. } => {
172            Err(crate::SondaError::Config(crate::ConfigError::invalid(
173                "encoder type 'otlp' requires the 'otlp' feature: cargo build -F otlp",
174            )))
175        }
176    }
177}
178
179/// Write an f64 value to the buffer, optionally with fixed decimal precision.
180///
181/// When `precision` is `None`, uses Rust's default `Display` formatting for `f64`.
182/// When `precision` is `Some(n)`, formats to exactly `n` decimal places.
183pub(crate) fn write_value(buf: &mut Vec<u8>, value: f64, precision: Option<u8>) {
184    use std::io::Write as _;
185    match precision {
186        None => write!(buf, "{}", value),
187        Some(n) => write!(buf, "{:.1$}", value, n as usize),
188    }
189    .expect("write to Vec<u8> is infallible");
190}
191
192/// Fixed byte length of an RFC 3339 timestamp with millisecond precision.
193///
194/// Format: `YYYY-MM-DDTHH:MM:SS.mmmZ` — always exactly 24 bytes.
195pub(crate) const RFC3339_MILLIS_LEN: usize = 24;
196
197/// Format a [`std::time::SystemTime`] as RFC 3339 with millisecond precision,
198/// writing directly into the caller-provided buffer.
199///
200/// Appends exactly 24 bytes of the form `2026-03-20T12:00:00.000Z` to `buf`.
201/// Computed entirely from `UNIX_EPOCH` arithmetic using the Gregorian calendar
202/// algorithm from <https://howardhinnant.github.io/date_algorithms.html> — no
203/// external crate required.
204///
205/// Returns a [`crate::SondaError::Encoder`] if the timestamp predates the Unix epoch.
206pub(crate) fn format_rfc3339_millis(
207    ts: std::time::SystemTime,
208    buf: &mut Vec<u8>,
209) -> Result<(), crate::SondaError> {
210    let arr = format_rfc3339_millis_array(ts)?;
211    buf.extend_from_slice(&arr);
212    Ok(())
213}
214
215/// Format a [`std::time::SystemTime`] as RFC 3339 with millisecond precision
216/// into a stack-allocated byte array.
217///
218/// Returns a fixed-size `[u8; 24]` containing valid UTF-8 of the form
219/// `2026-03-20T12:00:00.000Z`. This avoids heap allocation entirely and is
220/// suitable for callers that need a `&str` (e.g., serde serialization structs).
221///
222/// Returns a [`crate::SondaError::Encoder`] if the timestamp predates the Unix epoch.
223pub(crate) fn format_rfc3339_millis_array(
224    ts: std::time::SystemTime,
225) -> Result<[u8; RFC3339_MILLIS_LEN], crate::SondaError> {
226    use std::time::UNIX_EPOCH;
227
228    let duration = ts
229        .duration_since(UNIX_EPOCH)
230        .map_err(|e| crate::SondaError::Encoder(crate::EncoderError::TimestampBeforeEpoch(e)))?;
231
232    let total_secs = duration.as_secs();
233    let millis = duration.subsec_millis();
234
235    let days = total_secs / 86400;
236    let time_of_day = total_secs % 86400;
237
238    let hour = time_of_day / 3600;
239    let minute = (time_of_day % 3600) / 60;
240    let second = time_of_day % 60;
241
242    // civil_from_days: converts days since Unix epoch to (year, month, day).
243    // Algorithm: https://howardhinnant.github.io/date_algorithms.html
244    let z = days as i64 + 719468;
245    let era = if z >= 0 { z } else { z - 146096 } / 146097;
246    let doe = (z - era * 146097) as u64;
247    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
248    let y = yoe as i64 + era * 400;
249    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
250    let mp = (5 * doy + 2) / 153;
251    let day = doy - (153 * mp + 2) / 5 + 1;
252    let month = if mp < 10 { mp + 3 } else { mp - 9 };
253    let year = if month <= 2 { y + 1 } else { y };
254
255    let mut arr = [0u8; RFC3339_MILLIS_LEN];
256    // write! into a &mut [u8] slice via std::io::Write.
257    // The formatted output is always exactly 24 bytes, so this cannot fail.
258    use std::io::Write as _;
259    let mut cursor = &mut arr[..];
260    write!(
261        cursor,
262        "{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}.{millis:03}Z",
263    )
264    // Invariant: this holds for years 0000..=9999 (the 4-digit format field).
265    // Years >= 10000 would overflow the 24-byte buffer. SystemTime values
266    // from the Unix epoch (1970) cannot reach year 10000 within u64 range.
267    .expect("RFC 3339 millis timestamp is always exactly 24 bytes");
268    Ok(arr)
269}
270
271#[cfg(test)]
272mod tests {
273    use super::*;
274
275    // ---------------------------------------------------------------------------
276    // EncoderConfig: internally-tagged deserialization (`type:` field)
277    // These tests require the `config` feature (serde_yaml_ng).
278    // ---------------------------------------------------------------------------
279
280    #[cfg(feature = "config")]
281    #[test]
282    fn encoder_config_prometheus_text_deserializes_with_type_field() {
283        let yaml = "type: prometheus_text";
284        let config: EncoderConfig = serde_yaml_ng::from_str(yaml).unwrap();
285        assert!(matches!(config, EncoderConfig::PrometheusText { .. }));
286    }
287
288    #[cfg(feature = "config")]
289    #[test]
290    fn encoder_config_json_lines_deserializes_with_type_field() {
291        let yaml = "type: json_lines";
292        let config: EncoderConfig = serde_yaml_ng::from_str(yaml).unwrap();
293        assert!(matches!(config, EncoderConfig::JsonLines { .. }));
294    }
295
296    #[cfg(feature = "config")]
297    #[test]
298    fn encoder_config_influx_lp_without_field_key_deserializes_with_type_field() {
299        let yaml = "type: influx_lp";
300        let config: EncoderConfig = serde_yaml_ng::from_str(yaml).unwrap();
301        assert!(matches!(
302            config,
303            EncoderConfig::InfluxLineProtocol {
304                field_key: None,
305                precision: None
306            }
307        ));
308    }
309
310    #[cfg(feature = "config")]
311    #[test]
312    fn encoder_config_influx_lp_with_field_key_deserializes_with_type_field() {
313        let yaml = "type: influx_lp\nfield_key: requests";
314        let config: EncoderConfig = serde_yaml_ng::from_str(yaml).unwrap();
315        assert!(matches!(
316            config,
317            EncoderConfig::InfluxLineProtocol { field_key: Some(ref k), .. } if k == "requests"
318        ));
319    }
320
321    #[cfg(feature = "config")]
322    #[test]
323    fn encoder_config_unknown_type_returns_error() {
324        let yaml = "type: no_such_encoder";
325        let result: Result<EncoderConfig, _> = serde_yaml_ng::from_str(yaml);
326        assert!(
327            result.is_err(),
328            "unknown type tag should fail deserialization"
329        );
330    }
331
332    #[cfg(feature = "config")]
333    #[test]
334    fn encoder_config_missing_type_field_returns_error() {
335        // Without the `type` field the internally-tagged enum cannot identify the variant.
336        let yaml = "prometheus_text";
337        let result: Result<EncoderConfig, _> = serde_yaml_ng::from_str(yaml);
338        assert!(
339            result.is_err(),
340            "missing type field should fail deserialization"
341        );
342    }
343
344    #[cfg(feature = "config")]
345    #[test]
346    fn encoder_config_old_external_tag_format_is_rejected() {
347        // The old externally-tagged format (`!prometheus_text`) must no longer be accepted.
348        let yaml = "!prometheus_text";
349        let result: Result<EncoderConfig, _> = serde_yaml_ng::from_str(yaml);
350        assert!(
351            result.is_err(),
352            "externally-tagged YAML format must be rejected in favour of internally-tagged"
353        );
354    }
355
356    // ---------------------------------------------------------------------------
357    // EncoderConfig: factory wiring for all variants
358    // ---------------------------------------------------------------------------
359
360    #[test]
361    fn create_encoder_prometheus_text_succeeds() {
362        let config = EncoderConfig::PrometheusText { precision: None };
363        let _enc = create_encoder(&config).expect("factory must succeed");
364    }
365
366    #[test]
367    fn create_encoder_json_lines_succeeds() {
368        let config = EncoderConfig::JsonLines { precision: None };
369        let _enc = create_encoder(&config).expect("factory must succeed");
370    }
371
372    #[test]
373    fn create_encoder_influx_lp_no_field_key_succeeds() {
374        let config = EncoderConfig::InfluxLineProtocol {
375            field_key: None,
376            precision: None,
377        };
378        let _enc = create_encoder(&config).expect("factory must succeed");
379    }
380
381    #[test]
382    fn create_encoder_influx_lp_with_field_key_succeeds() {
383        let config = EncoderConfig::InfluxLineProtocol {
384            field_key: Some("bytes".to_string()),
385            precision: None,
386        };
387        let _enc = create_encoder(&config).expect("factory must succeed");
388    }
389
390    // ---------------------------------------------------------------------------
391    // EncoderConfig: Send + Sync contract
392    // ---------------------------------------------------------------------------
393
394    #[test]
395    fn encoder_config_is_send_and_sync() {
396        fn assert_send_sync<T: Send + Sync>() {}
397        assert_send_sync::<EncoderConfig>();
398    }
399
400    // ---------------------------------------------------------------------------
401    // EncoderConfig: Clone + Debug contract
402    // ---------------------------------------------------------------------------
403
404    #[test]
405    fn encoder_config_prometheus_text_is_cloneable_and_debuggable() {
406        let config = EncoderConfig::PrometheusText { precision: None };
407        let cloned = config.clone();
408        assert!(matches!(cloned, EncoderConfig::PrometheusText { .. }));
409        let s = format!("{config:?}");
410        assert!(s.contains("PrometheusText"));
411    }
412
413    #[test]
414    fn encoder_config_json_lines_is_cloneable_and_debuggable() {
415        let config = EncoderConfig::JsonLines { precision: None };
416        let cloned = config.clone();
417        assert!(matches!(cloned, EncoderConfig::JsonLines { .. }));
418        let s = format!("{config:?}");
419        assert!(s.contains("JsonLines"));
420    }
421
422    #[test]
423    fn encoder_config_influx_lp_is_cloneable_and_debuggable() {
424        let config = EncoderConfig::InfluxLineProtocol {
425            field_key: Some("val".to_string()),
426            precision: None,
427        };
428        let cloned = config.clone();
429        assert!(matches!(
430            cloned,
431            EncoderConfig::InfluxLineProtocol { field_key: Some(ref k), .. } if k == "val"
432        ));
433        let s = format!("{config:?}");
434        assert!(s.contains("InfluxLineProtocol"));
435    }
436
437    // ---------------------------------------------------------------------------
438    // Encoder trait: default encode_log() returns "not supported" error
439    // ---------------------------------------------------------------------------
440
441    fn make_log_event() -> crate::model::log::LogEvent {
442        use std::collections::BTreeMap;
443        crate::model::log::LogEvent::new(
444            crate::model::log::Severity::Info,
445            "test message".to_string(),
446            crate::model::metric::Labels::default(),
447            BTreeMap::new(),
448        )
449    }
450
451    #[test]
452    fn prometheus_encoder_encode_log_returns_not_supported_error() {
453        let encoder = create_encoder(&EncoderConfig::PrometheusText { precision: None }).unwrap();
454        let event = make_log_event();
455        let mut buf = Vec::new();
456        let result = encoder.encode_log(&event, &mut buf);
457        assert!(
458            result.is_err(),
459            "prometheus encoder must return an error for encode_log"
460        );
461        let err = result.unwrap_err();
462        let msg = err.to_string();
463        assert!(
464            msg.contains("not supported"),
465            "error message should contain 'not supported', got: {msg}"
466        );
467    }
468
469    #[test]
470    fn influx_encoder_encode_log_returns_not_supported_error() {
471        let encoder = create_encoder(&EncoderConfig::InfluxLineProtocol {
472            field_key: None,
473            precision: None,
474        })
475        .unwrap();
476        let event = make_log_event();
477        let mut buf = Vec::new();
478        let result = encoder.encode_log(&event, &mut buf);
479        assert!(
480            result.is_err(),
481            "influx encoder must return an error for encode_log"
482        );
483        let err = result.unwrap_err();
484        let msg = err.to_string();
485        assert!(
486            msg.contains("not supported"),
487            "error message should contain 'not supported', got: {msg}"
488        );
489    }
490
491    #[test]
492    fn json_lines_encoder_encode_log_succeeds() {
493        // Slice 2.3: JsonLines now implements encode_log — it must succeed, not return an error.
494        let encoder = create_encoder(&EncoderConfig::JsonLines { precision: None }).unwrap();
495        let event = make_log_event();
496        let mut buf = Vec::new();
497        let result = encoder.encode_log(&event, &mut buf);
498        assert!(
499            result.is_ok(),
500            "json_lines encoder must support encode_log after slice 2.3"
501        );
502        assert!(!buf.is_empty(), "buffer must contain encoded data");
503    }
504
505    #[test]
506    fn encode_log_default_does_not_write_to_buffer() {
507        // The default implementation must not produce partial output in the buffer.
508        let encoder = create_encoder(&EncoderConfig::PrometheusText { precision: None }).unwrap();
509        let event = make_log_event();
510        let mut buf = Vec::new();
511        let _ = encoder.encode_log(&event, &mut buf);
512        assert!(
513            buf.is_empty(),
514            "buffer must remain empty when encode_log returns an error"
515        );
516    }
517
518    #[test]
519    fn encode_log_error_is_encoder_variant() {
520        // The error must come back as SondaError::Encoder, not some other variant.
521        let encoder = create_encoder(&EncoderConfig::PrometheusText { precision: None }).unwrap();
522        let event = make_log_event();
523        let mut buf = Vec::new();
524        let result = encoder.encode_log(&event, &mut buf);
525        let err = result.unwrap_err();
526        assert!(
527            matches!(err, crate::SondaError::Encoder(_)),
528            "error must be SondaError::Encoder variant, got: {err:?}"
529        );
530    }
531
532    // ---------------------------------------------------------------------------
533    // EncoderConfig::RemoteWrite (feature-gated tests)
534    // ---------------------------------------------------------------------------
535
536    #[cfg(all(feature = "remote-write", feature = "config"))]
537    #[test]
538    fn encoder_config_remote_write_deserializes_from_yaml() {
539        let yaml = "type: remote_write";
540        let config: EncoderConfig = serde_yaml_ng::from_str(yaml).unwrap();
541        assert!(
542            matches!(config, EncoderConfig::RemoteWrite),
543            "should deserialize as RemoteWrite variant"
544        );
545    }
546
547    #[cfg(feature = "remote-write")]
548    #[test]
549    fn create_encoder_remote_write_succeeds() {
550        let config = EncoderConfig::RemoteWrite;
551        let _enc = create_encoder(&config).expect("factory must succeed");
552    }
553
554    #[cfg(feature = "remote-write")]
555    #[test]
556    fn encoder_config_remote_write_is_cloneable_and_debuggable() {
557        let config = EncoderConfig::RemoteWrite;
558        let cloned = config.clone();
559        assert!(matches!(cloned, EncoderConfig::RemoteWrite));
560        let s = format!("{config:?}");
561        assert!(
562            s.contains("RemoteWrite"),
563            "debug output should contain 'RemoteWrite', got: {s}"
564        );
565    }
566
567    #[cfg(feature = "remote-write")]
568    #[test]
569    fn remote_write_encoder_produces_valid_output_through_factory() {
570        use crate::model::metric::{Labels, MetricEvent};
571        use std::time::{Duration, UNIX_EPOCH};
572
573        let config = EncoderConfig::RemoteWrite;
574        let enc = create_encoder(&config).unwrap();
575
576        let labels = Labels::from_pairs(&[("job", "sonda")]).unwrap();
577        let ts = UNIX_EPOCH + Duration::from_secs(1_700_000_000);
578        let event =
579            MetricEvent::with_timestamp("factory_test".to_string(), 10.0, labels, ts).unwrap();
580
581        let mut buf = Vec::new();
582        enc.encode_metric(&event, &mut buf)
583            .expect("encode through factory should succeed");
584        assert!(
585            !buf.is_empty(),
586            "factory-created encoder should produce output"
587        );
588    }
589
590    #[cfg(all(feature = "remote-write", feature = "config"))]
591    #[test]
592    fn scenario_yaml_with_remote_write_encoder_deserializes() {
593        use crate::config::ScenarioConfig;
594        use crate::sink::SinkConfig;
595
596        let yaml = r#"
597name: rw_test_metric
598rate: 10.0
599generator:
600  type: constant
601  value: 1.0
602encoder:
603  type: remote_write
604sink:
605  type: remote_write
606  url: "http://localhost:8428/api/v1/write"
607"#;
608        let config: ScenarioConfig = serde_yaml_ng::from_str(yaml).unwrap();
609        assert_eq!(config.name, "rw_test_metric");
610        assert!(matches!(config.encoder, EncoderConfig::RemoteWrite));
611        assert!(matches!(config.sink, SinkConfig::RemoteWrite { .. }));
612    }
613
614    // ---------------------------------------------------------------------------
615    // EncoderConfig::Otlp (feature-gated tests)
616    // ---------------------------------------------------------------------------
617
618    #[cfg(all(feature = "otlp", feature = "config"))]
619    #[test]
620    fn encoder_config_otlp_deserializes_from_yaml() {
621        let yaml = "type: otlp";
622        let config: EncoderConfig = serde_yaml_ng::from_str(yaml).unwrap();
623        assert!(
624            matches!(config, EncoderConfig::Otlp),
625            "should deserialize as Otlp variant"
626        );
627    }
628
629    #[cfg(feature = "otlp")]
630    #[test]
631    fn create_encoder_otlp_succeeds() {
632        let config = EncoderConfig::Otlp;
633        let _enc = create_encoder(&config).expect("factory must succeed");
634    }
635
636    #[cfg(feature = "otlp")]
637    #[test]
638    fn encoder_config_otlp_is_cloneable_and_debuggable() {
639        let config = EncoderConfig::Otlp;
640        let cloned = config.clone();
641        assert!(matches!(cloned, EncoderConfig::Otlp));
642        let s = format!("{config:?}");
643        assert!(
644            s.contains("Otlp"),
645            "debug output should contain 'Otlp', got: {s}"
646        );
647    }
648
649    #[cfg(feature = "otlp")]
650    #[test]
651    fn otlp_encoder_produces_valid_output_through_factory() {
652        use crate::model::metric::{Labels, MetricEvent};
653        use std::time::{Duration, UNIX_EPOCH};
654
655        let config = EncoderConfig::Otlp;
656        let enc = create_encoder(&config).unwrap();
657
658        let labels = Labels::from_pairs(&[("job", "sonda")]).unwrap();
659        let ts = UNIX_EPOCH + Duration::from_secs(1_700_000_000);
660        let event =
661            MetricEvent::with_timestamp("factory_test".to_string(), 10.0, labels, ts).unwrap();
662
663        let mut buf = Vec::new();
664        enc.encode_metric(&event, &mut buf)
665            .expect("encode through factory should succeed");
666        assert!(
667            !buf.is_empty(),
668            "factory-created encoder should produce output"
669        );
670    }
671
672    #[cfg(feature = "otlp")]
673    #[test]
674    fn otlp_encoder_encode_log_succeeds_through_factory() {
675        let config = EncoderConfig::Otlp;
676        let enc = create_encoder(&config).unwrap();
677        let event = make_log_event();
678        let mut buf = Vec::new();
679        let result = enc.encode_log(&event, &mut buf);
680        assert!(result.is_ok(), "otlp encoder must support encode_log");
681        assert!(!buf.is_empty(), "buffer must contain encoded data");
682    }
683
684    // ---------------------------------------------------------------------------
685    // write_value: shared helper for formatted f64 output
686    // ---------------------------------------------------------------------------
687
688    #[test]
689    fn write_value_none_uses_default_display() {
690        let mut buf = Vec::new();
691        write_value(&mut buf, 1.0, None);
692        assert_eq!(String::from_utf8(buf).unwrap(), "1");
693
694        let mut buf = Vec::new();
695        write_value(&mut buf, 3.14159, None);
696        assert_eq!(String::from_utf8(buf).unwrap(), "3.14159");
697    }
698
699    #[test]
700    fn write_value_precision_0() {
701        let mut buf = Vec::new();
702        write_value(&mut buf, 99.6, Some(0));
703        assert_eq!(String::from_utf8(buf).unwrap(), "100");
704    }
705
706    #[test]
707    fn write_value_precision_2() {
708        let mut buf = Vec::new();
709        write_value(&mut buf, 99.60573, Some(2));
710        assert_eq!(String::from_utf8(buf).unwrap(), "99.61");
711
712        let mut buf = Vec::new();
713        write_value(&mut buf, 100.0, Some(2));
714        assert_eq!(String::from_utf8(buf).unwrap(), "100.00");
715    }
716
717    #[test]
718    fn write_value_precision_with_negative() {
719        let mut buf = Vec::new();
720        write_value(&mut buf, -3.14159, Some(2));
721        assert_eq!(String::from_utf8(buf).unwrap(), "-3.14");
722    }
723
724    #[test]
725    fn write_value_precision_4() {
726        let mut buf = Vec::new();
727        write_value(&mut buf, 1.23456789, Some(4));
728        assert_eq!(String::from_utf8(buf).unwrap(), "1.2346");
729    }
730
731    // ---------------------------------------------------------------------------
732    // EncoderConfig deserialization: precision field
733    // These tests require the `config` feature (serde_yaml_ng).
734    // ---------------------------------------------------------------------------
735
736    #[cfg(feature = "config")]
737    #[test]
738    fn prometheus_text_with_precision_deserializes() {
739        let yaml = "type: prometheus_text\nprecision: 3";
740        let config: EncoderConfig = serde_yaml_ng::from_str(yaml).unwrap();
741        assert!(matches!(
742            config,
743            EncoderConfig::PrometheusText { precision: Some(3) }
744        ));
745    }
746
747    #[cfg(feature = "config")]
748    #[test]
749    fn prometheus_text_without_precision_defaults_to_none() {
750        let yaml = "type: prometheus_text";
751        let config: EncoderConfig = serde_yaml_ng::from_str(yaml).unwrap();
752        assert!(matches!(
753            config,
754            EncoderConfig::PrometheusText { precision: None }
755        ));
756    }
757
758    #[cfg(feature = "config")]
759    #[test]
760    fn influx_with_precision_and_field_key_deserializes() {
761        let yaml = "type: influx_lp\nfield_key: gauge\nprecision: 2";
762        let config: EncoderConfig = serde_yaml_ng::from_str(yaml).unwrap();
763        assert!(matches!(
764            config,
765            EncoderConfig::InfluxLineProtocol {
766                field_key: Some(ref k),
767                precision: Some(2)
768            } if k == "gauge"
769        ));
770    }
771
772    #[cfg(feature = "config")]
773    #[test]
774    fn json_lines_with_precision_deserializes() {
775        let yaml = "type: json_lines\nprecision: 5";
776        let config: EncoderConfig = serde_yaml_ng::from_str(yaml).unwrap();
777        assert!(matches!(
778            config,
779            EncoderConfig::JsonLines { precision: Some(5) }
780        ));
781    }
782
783    #[cfg(feature = "config")]
784    #[test]
785    fn json_lines_without_precision_defaults_to_none() {
786        let yaml = "type: json_lines";
787        let config: EncoderConfig = serde_yaml_ng::from_str(yaml).unwrap();
788        assert!(matches!(
789            config,
790            EncoderConfig::JsonLines { precision: None }
791        ));
792    }
793
794    // ---------------------------------------------------------------------------
795    // format_rfc3339_millis: buffer-based API
796    // ---------------------------------------------------------------------------
797
798    #[test]
799    fn format_rfc3339_millis_writes_to_buffer() {
800        use std::time::{Duration, UNIX_EPOCH};
801        let ts = UNIX_EPOCH + Duration::from_millis(1_774_008_000_000);
802        let mut buf = Vec::new();
803        format_rfc3339_millis(ts, &mut buf).unwrap();
804        assert_eq!(String::from_utf8(buf).unwrap(), "2026-03-20T12:00:00.000Z");
805    }
806
807    #[test]
808    fn format_rfc3339_millis_appends_to_existing_buffer() {
809        use std::time::{Duration, UNIX_EPOCH};
810        let ts = UNIX_EPOCH + Duration::from_millis(1_774_008_000_000);
811        let mut buf = b"prefix:".to_vec();
812        format_rfc3339_millis(ts, &mut buf).unwrap();
813        assert_eq!(
814            String::from_utf8(buf).unwrap(),
815            "prefix:2026-03-20T12:00:00.000Z"
816        );
817    }
818
819    #[test]
820    fn format_rfc3339_millis_epoch_writes_correct_bytes() {
821        use std::time::UNIX_EPOCH;
822        let mut buf = Vec::new();
823        format_rfc3339_millis(UNIX_EPOCH, &mut buf).unwrap();
824        assert_eq!(String::from_utf8(buf).unwrap(), "1970-01-01T00:00:00.000Z");
825    }
826
827    #[test]
828    fn format_rfc3339_millis_before_epoch_returns_error() {
829        use std::time::{Duration, UNIX_EPOCH};
830        let ts = UNIX_EPOCH - Duration::from_secs(1);
831        let mut buf = Vec::new();
832        let result = format_rfc3339_millis(ts, &mut buf);
833        assert!(result.is_err(), "timestamps before epoch must return error");
834        assert!(
835            buf.is_empty(),
836            "buffer must remain empty on error (nothing written before failure)"
837        );
838    }
839
840    // ---------------------------------------------------------------------------
841    // format_rfc3339_millis_array: stack-allocated API
842    // ---------------------------------------------------------------------------
843
844    #[test]
845    fn format_rfc3339_millis_array_returns_correct_bytes() {
846        use std::time::{Duration, UNIX_EPOCH};
847        let ts = UNIX_EPOCH + Duration::from_millis(1_774_008_000_000);
848        let arr = format_rfc3339_millis_array(ts).unwrap();
849        assert_eq!(
850            std::str::from_utf8(&arr).unwrap(),
851            "2026-03-20T12:00:00.000Z"
852        );
853    }
854
855    #[test]
856    fn format_rfc3339_millis_array_epoch() {
857        use std::time::UNIX_EPOCH;
858        let arr = format_rfc3339_millis_array(UNIX_EPOCH).unwrap();
859        assert_eq!(
860            std::str::from_utf8(&arr).unwrap(),
861            "1970-01-01T00:00:00.000Z"
862        );
863    }
864
865    #[test]
866    fn format_rfc3339_millis_array_before_epoch_returns_error() {
867        use std::time::{Duration, UNIX_EPOCH};
868        let ts = UNIX_EPOCH - Duration::from_secs(1);
869        let result = format_rfc3339_millis_array(ts);
870        assert!(result.is_err());
871        let err = result.unwrap_err();
872        assert!(
873            matches!(err, crate::SondaError::Encoder(_)),
874            "error must be Encoder variant, got: {err:?}"
875        );
876    }
877
878    #[test]
879    fn format_rfc3339_millis_array_preserves_milliseconds() {
880        use std::time::{Duration, UNIX_EPOCH};
881        let ts = UNIX_EPOCH + Duration::from_millis(1_700_000_000_789);
882        let arr = format_rfc3339_millis_array(ts).unwrap();
883        let s = std::str::from_utf8(&arr).unwrap();
884        assert!(s.ends_with(".789Z"), "must end with .789Z but got: {s}");
885    }
886
887    #[test]
888    fn format_rfc3339_millis_array_and_buf_produce_identical_output() {
889        use std::time::{Duration, UNIX_EPOCH};
890        let ts = UNIX_EPOCH + Duration::from_millis(1_700_000_000_123);
891        let arr = format_rfc3339_millis_array(ts).unwrap();
892        let mut buf = Vec::new();
893        format_rfc3339_millis(ts, &mut buf).unwrap();
894        assert_eq!(&arr[..], &buf[..]);
895    }
896
897    #[test]
898    fn rfc3339_millis_len_constant_matches_output_size() {
899        use std::time::{Duration, UNIX_EPOCH};
900        let ts = UNIX_EPOCH + Duration::from_millis(1_774_008_000_000);
901        let mut buf = Vec::new();
902        format_rfc3339_millis(ts, &mut buf).unwrap();
903        assert_eq!(buf.len(), RFC3339_MILLIS_LEN);
904    }
905
906    // ---------------------------------------------------------------------------
907    // format_rfc3339_millis: edge-case tests for Gregorian calendar conversion
908    // ---------------------------------------------------------------------------
909
910    /// Leap year: Feb 29 in a divisible-by-4 year that is not a century year.
911    #[test]
912    fn format_rfc3339_millis_leap_year_feb_29_2024() {
913        use std::time::{Duration, UNIX_EPOCH};
914        // 2024-02-29T00:00:00.000Z
915        // Days from 1970-01-01 to 2024-02-29:
916        // 54 full years (1970-2023): 13 leap years (72,76,...2020) + 41 regular
917        // = 13*366 + 41*365 = 4758 + 14965 = 19723 days to 2024-01-01
918        // + 31 (Jan) + 28 (Feb 1-28) = 59, but Feb 29 is day 60
919        // Actually, compute more precisely.
920        // 2024-02-29 00:00:00 UTC = 1709164800 epoch seconds
921        let ts = UNIX_EPOCH + Duration::from_secs(1_709_164_800);
922        let arr = format_rfc3339_millis_array(ts).unwrap();
923        assert_eq!(
924            std::str::from_utf8(&arr).unwrap(),
925            "2024-02-29T00:00:00.000Z"
926        );
927    }
928
929    /// Non-leap year: Mar 1 follows Feb 28 (no Feb 29).
930    #[test]
931    fn format_rfc3339_millis_non_leap_year_mar_1_2023() {
932        use std::time::{Duration, UNIX_EPOCH};
933        // 2023-03-01T00:00:00.000Z = 1677628800 epoch seconds
934        let ts = UNIX_EPOCH + Duration::from_secs(1_677_628_800);
935        let arr = format_rfc3339_millis_array(ts).unwrap();
936        assert_eq!(
937            std::str::from_utf8(&arr).unwrap(),
938            "2023-03-01T00:00:00.000Z"
939        );
940    }
941
942    /// Century boundary: year 2000 IS a leap year (divisible by 400).
943    #[test]
944    fn format_rfc3339_millis_century_leap_year_2000_feb_29() {
945        use std::time::{Duration, UNIX_EPOCH};
946        // 2000-02-29T00:00:00.000Z = 951782400 epoch seconds
947        let ts = UNIX_EPOCH + Duration::from_secs(951_782_400);
948        let arr = format_rfc3339_millis_array(ts).unwrap();
949        assert_eq!(
950            std::str::from_utf8(&arr).unwrap(),
951            "2000-02-29T00:00:00.000Z"
952        );
953    }
954
955    /// Century boundary: year 1900 is NOT a leap year (divisible by 100 but
956    /// not by 400). Since 1900 is before the Unix epoch, we test 2100 instead.
957    /// 2100-03-01 should follow 2100-02-28 (no Feb 29).
958    #[test]
959    fn format_rfc3339_millis_century_non_leap_year_2100_mar_1() {
960        use std::time::{Duration, UNIX_EPOCH};
961        // 2100-03-01T00:00:00.000Z = 4107542400 epoch seconds
962        let ts = UNIX_EPOCH + Duration::from_secs(4_107_542_400);
963        let arr = format_rfc3339_millis_array(ts).unwrap();
964        assert_eq!(
965            std::str::from_utf8(&arr).unwrap(),
966            "2100-03-01T00:00:00.000Z"
967        );
968    }
969
970    /// 2100-02-28 should be valid (last day of Feb in non-leap century year).
971    #[test]
972    fn format_rfc3339_millis_century_non_leap_year_2100_feb_28() {
973        use std::time::{Duration, UNIX_EPOCH};
974        // 2100-02-28T23:59:59.999Z = 4107542400 - 1 ms from midnight Mar 1
975        let ts = UNIX_EPOCH + Duration::from_millis(4_107_542_400_000 - 1);
976        let arr = format_rfc3339_millis_array(ts).unwrap();
977        assert_eq!(
978            std::str::from_utf8(&arr).unwrap(),
979            "2100-02-28T23:59:59.999Z"
980        );
981    }
982
983    /// Dec 31 → Jan 1 transition (year boundary): last second of year.
984    #[test]
985    fn format_rfc3339_millis_dec_31_to_jan_1_transition() {
986        use std::time::{Duration, UNIX_EPOCH};
987        // 2025-12-31T23:59:59.999Z
988        // 2026-01-01T00:00:00.000Z = 1767225600 epoch seconds
989        let ts = UNIX_EPOCH + Duration::from_millis(1_767_225_600_000 - 1);
990        let arr = format_rfc3339_millis_array(ts).unwrap();
991        assert_eq!(
992            std::str::from_utf8(&arr).unwrap(),
993            "2025-12-31T23:59:59.999Z"
994        );
995    }
996
997    /// Jan 1 midnight of a new year.
998    #[test]
999    fn format_rfc3339_millis_jan_1_midnight() {
1000        use std::time::{Duration, UNIX_EPOCH};
1001        // 2026-01-01T00:00:00.000Z = 1767225600 epoch seconds
1002        let ts = UNIX_EPOCH + Duration::from_secs(1_767_225_600);
1003        let arr = format_rfc3339_millis_array(ts).unwrap();
1004        assert_eq!(
1005            std::str::from_utf8(&arr).unwrap(),
1006            "2026-01-01T00:00:00.000Z"
1007        );
1008    }
1009
1010    /// Leap year Dec 31 → Jan 1 transition (end of a 366-day year).
1011    #[test]
1012    fn format_rfc3339_millis_leap_year_dec_31_to_jan_1() {
1013        use std::time::{Duration, UNIX_EPOCH};
1014        // 2024-12-31T23:59:59.999Z
1015        // 2025-01-01T00:00:00.000Z = 1735689600 epoch seconds
1016        let ts = UNIX_EPOCH + Duration::from_millis(1_735_689_600_000 - 1);
1017        let arr = format_rfc3339_millis_array(ts).unwrap();
1018        assert_eq!(
1019            std::str::from_utf8(&arr).unwrap(),
1020            "2024-12-31T23:59:59.999Z"
1021        );
1022    }
1023
1024    /// Mid-day timestamp with non-zero milliseconds.
1025    #[test]
1026    fn format_rfc3339_millis_mid_day_with_millis() {
1027        use std::time::{Duration, UNIX_EPOCH};
1028        // 2024-06-15T14:30:45.123Z
1029        // Compute: 2024-06-15 14:30:45.123 UTC
1030        // 2024-01-01 = 1704067200 epoch seconds
1031        // + 31 (Jan) + 29 (Feb, leap) + 31 (Mar) + 30 (Apr) + 31 (May) + 14 (Jun 1-14) = 166 days
1032        // 166 * 86400 = 14342400
1033        // + 14*3600 + 30*60 + 45 = 50400 + 1800 + 45 = 52245
1034        // Total = 1704067200 + 14342400 + 52245 = 1718461845
1035        let ts = UNIX_EPOCH + Duration::from_millis(1_718_461_845_123);
1036        let arr = format_rfc3339_millis_array(ts).unwrap();
1037        assert_eq!(
1038            std::str::from_utf8(&arr).unwrap(),
1039            "2024-06-15T14:30:45.123Z"
1040        );
1041    }
1042
1043    // ---------------------------------------------------------------------------
1044    // Disabled feature variants: YAML deserialization succeeds and create_encoder
1045    // returns a helpful error instead of a generic "unknown variant" error.
1046    // These tests only compile when the corresponding feature is disabled.
1047    // ---------------------------------------------------------------------------
1048
1049    #[cfg(all(not(feature = "remote-write"), feature = "config"))]
1050    #[test]
1051    fn remote_write_yaml_deserializes_into_disabled_variant_when_feature_is_off() {
1052        let yaml = "type: remote_write";
1053        let config: EncoderConfig = serde_yaml_ng::from_str(yaml)
1054            .expect("type: remote_write must deserialize even without the remote-write feature");
1055        assert!(matches!(config, EncoderConfig::RemoteWriteDisabled { .. }));
1056    }
1057
1058    #[cfg(not(feature = "remote-write"))]
1059    #[test]
1060    fn create_encoder_remote_write_disabled_returns_feature_hint_error() {
1061        let config = EncoderConfig::RemoteWriteDisabled {};
1062        let err = create_encoder(&config)
1063            .err()
1064            .expect("must return Err for disabled variant");
1065        let msg = err.to_string();
1066        assert!(
1067            msg.contains("remote_write"),
1068            "error must mention the encoder type, got: {msg}"
1069        );
1070        assert!(
1071            msg.contains("cargo build -F remote-write"),
1072            "error must tell the user how to enable the feature, got: {msg}"
1073        );
1074    }
1075
1076    #[cfg(all(not(feature = "otlp"), feature = "config"))]
1077    #[test]
1078    fn otlp_yaml_deserializes_into_disabled_variant_when_feature_is_off() {
1079        let yaml = "type: otlp";
1080        let config: EncoderConfig = serde_yaml_ng::from_str(yaml)
1081            .expect("type: otlp must deserialize even without the otlp feature");
1082        assert!(matches!(config, EncoderConfig::OtlpDisabled { .. }));
1083    }
1084
1085    #[cfg(not(feature = "otlp"))]
1086    #[test]
1087    fn create_encoder_otlp_disabled_returns_feature_hint_error() {
1088        let config = EncoderConfig::OtlpDisabled {};
1089        let err = create_encoder(&config)
1090            .err()
1091            .expect("must return Err for disabled variant");
1092        let msg = err.to_string();
1093        assert!(
1094            msg.contains("otlp"),
1095            "error must mention the encoder type, got: {msg}"
1096        );
1097        assert!(
1098            msg.contains("cargo build -F otlp"),
1099            "error must tell the user how to enable the feature, got: {msg}"
1100        );
1101    }
1102}