Skip to main content

cbtop/observability_backend/
mod.rs

1//! Observability Backend Integrations (PMAT-046)
2//!
3//! Export metrics and traces to observability platforms.
4//!
5//! # Design
6//!
7//! - Datadog integration (DogStatsD protocol)
8//! - New Relic Metrics API
9//! - Honeycomb events and traces
10//! - OpenTelemetry collector (OTLP)
11//! - Generic webhook for custom backends
12//!
13//! # Falsification (FKR-047)
14//!
//! H₀ (null hypothesis): Cannot export metrics to 3+ backends simultaneously
16//! Test: Configure all backends, verify each receives metrics
17
18mod types;
19pub use types::*;
20
21use std::collections::HashMap;
22use std::time::Instant;
23
24/// Configuration for the observability exporter
25#[derive(Debug, Clone, Default)]
26pub struct ObservabilityConfig {
27    /// Datadog configuration
28    pub datadog: Option<DatadogConfig>,
29    /// New Relic configuration
30    pub newrelic: Option<NewRelicConfig>,
31    /// Honeycomb configuration
32    pub honeycomb: Option<HoneycombConfig>,
33    /// OTLP configuration
34    pub otlp: Option<OtlpConfig>,
35    /// Webhook configuration
36    pub webhook: Option<WebhookConfig>,
37    /// Batch size for exports
38    pub batch_size: usize,
39    /// Flush interval in milliseconds
40    pub flush_interval_ms: u64,
41}
42
43impl ObservabilityConfig {
44    /// Create new configuration
45    pub fn new() -> Self {
46        Self {
47            batch_size: 100,
48            flush_interval_ms: 10_000,
49            ..Default::default()
50        }
51    }
52
53    /// Enable Datadog
54    pub fn with_datadog(mut self, config: DatadogConfig) -> Self {
55        self.datadog = Some(config);
56        self
57    }
58
59    /// Enable New Relic
60    pub fn with_newrelic(mut self, config: NewRelicConfig) -> Self {
61        self.newrelic = Some(config);
62        self
63    }
64
65    /// Enable Honeycomb
66    pub fn with_honeycomb(mut self, config: HoneycombConfig) -> Self {
67        self.honeycomb = Some(config);
68        self
69    }
70
71    /// Enable OTLP
72    pub fn with_otlp(mut self, config: OtlpConfig) -> Self {
73        self.otlp = Some(config);
74        self
75    }
76
77    /// Enable webhook
78    pub fn with_webhook(mut self, config: WebhookConfig) -> Self {
79        self.webhook = Some(config);
80        self
81    }
82
83    /// Get list of enabled backends
84    pub fn enabled_backends(&self) -> Vec<ObservabilityBackend> {
85        let mut backends = Vec::new();
86        if self.datadog.is_some() {
87            backends.push(ObservabilityBackend::Datadog);
88        }
89        if self.newrelic.is_some() {
90            backends.push(ObservabilityBackend::NewRelic);
91        }
92        if self.honeycomb.is_some() {
93            backends.push(ObservabilityBackend::Honeycomb);
94        }
95        if self.otlp.is_some() {
96            backends.push(ObservabilityBackend::Otlp);
97        }
98        if self.webhook.is_some() {
99            backends.push(ObservabilityBackend::Webhook);
100        }
101        backends
102    }
103}
104
105/// Observability exporter for multiple backends
106#[derive(Debug)]
107pub struct ObservabilityExporter {
108    /// Configuration
109    config: ObservabilityConfig,
110    /// Pending metrics buffer
111    buffer: Vec<ExportMetric>,
112    /// Backend health status
113    health: HashMap<ObservabilityBackend, BackendHealth>,
114    /// Total exports per backend
115    export_counts: HashMap<ObservabilityBackend, u64>,
116    /// Last flush time
117    last_flush: Instant,
118}
119
120impl ObservabilityExporter {
121    /// Create a new observability exporter
122    pub fn new(config: ObservabilityConfig) -> Self {
123        let mut health = HashMap::new();
124
125        // Initialize health for each enabled backend
126        for backend in config.enabled_backends() {
127            health.insert(
128                backend,
129                BackendHealth {
130                    backend,
131                    healthy: true,
132                    last_success: None,
133                    consecutive_failures: 0,
134                    avg_latency_ms: 0.0,
135                },
136            );
137        }
138
139        Self {
140            config,
141            buffer: Vec::new(),
142            health,
143            export_counts: HashMap::new(),
144            last_flush: Instant::now(),
145        }
146    }
147
148    /// Add a metric to the buffer
149    pub fn record(&mut self, metric: ExportMetric) {
150        self.buffer.push(metric);
151
152        // Auto-flush if buffer is full
153        if self.buffer.len() >= self.config.batch_size {
154            let _ = self.flush();
155        }
156    }
157
158    /// Record multiple metrics
159    pub fn record_batch(&mut self, metrics: Vec<ExportMetric>) {
160        for metric in metrics {
161            self.record(metric);
162        }
163    }
164
165    /// Flush buffer to all backends
166    pub fn flush(&mut self) -> Vec<ExportResult> {
167        if self.buffer.is_empty() {
168            return Vec::new();
169        }
170
171        let metrics = std::mem::take(&mut self.buffer);
172        let mut results = Vec::new();
173
174        // Export to each enabled backend
175        for backend in self.config.enabled_backends() {
176            let result = self.export_to_backend(backend, &metrics);
177            self.update_health(backend, &result);
178            results.push(result);
179        }
180
181        self.last_flush = Instant::now();
182        results
183    }
184
185    /// Export metrics to a specific backend
186    fn export_to_backend(
187        &mut self,
188        backend: ObservabilityBackend,
189        metrics: &[ExportMetric],
190    ) -> ExportResult {
191        let start = Instant::now();
192
193        // Simulate export (in production, this would make actual HTTP/UDP calls)
194        let (success, error) = match backend {
195            ObservabilityBackend::Datadog => self.export_to_datadog(metrics),
196            ObservabilityBackend::NewRelic => self.export_to_newrelic(metrics),
197            ObservabilityBackend::Honeycomb => self.export_to_honeycomb(metrics),
198            ObservabilityBackend::Otlp => self.export_to_otlp(metrics),
199            ObservabilityBackend::Webhook => self.export_to_webhook(metrics),
200        };
201
202        let duration_ms = start.elapsed().as_millis() as u64;
203
204        if success {
205            *self.export_counts.entry(backend).or_insert(0) += metrics.len() as u64;
206        }
207
208        ExportResult {
209            backend,
210            success,
211            metrics_exported: if success { metrics.len() } else { 0 },
212            duration_ms,
213            error,
214        }
215    }
216
217    /// Export to Datadog (DogStatsD format)
218    fn export_to_datadog(&self, metrics: &[ExportMetric]) -> (bool, Option<String>) {
219        let Some(config) = &self.config.datadog else {
220            return (false, Some("Datadog not configured".to_string()));
221        };
222
223        // Format metrics in DogStatsD format
224        let mut formatted = Vec::new();
225        for metric in metrics {
226            let tags: Vec<String> = metric
227                .tags
228                .iter()
229                .map(|(k, v)| format!("{}:{}", k, v))
230                .chain(config.default_tags.iter().cloned())
231                .collect();
232
233            let tag_str = if tags.is_empty() {
234                String::new()
235            } else {
236                format!("|#{}", tags.join(","))
237            };
238
239            let metric_type = match metric.metric_type {
240                MetricExportType::Gauge => "g",
241                MetricExportType::Counter => "c",
242                MetricExportType::Histogram => "h",
243            };
244
245            formatted.push(format!(
246                "{}.{}:{}|{}{}",
247                config.prefix, metric.name, metric.value, metric_type, tag_str
248            ));
249        }
250
251        // In production, send to UDP socket at config.host:config.port
252        let _ = formatted;
253        (true, None)
254    }
255
256    /// Export to New Relic (Metrics API)
257    fn export_to_newrelic(&self, metrics: &[ExportMetric]) -> (bool, Option<String>) {
258        let Some(_config) = &self.config.newrelic else {
259            return (false, Some("New Relic not configured".to_string()));
260        };
261
262        // Format metrics for New Relic Metrics API
263        let mut payload = Vec::new();
264        for metric in metrics {
265            let metric_data = format!(
266                r#"{{"name":"{}","type":"{}","value":{},"timestamp":{}}}"#,
267                metric.name,
268                match metric.metric_type {
269                    MetricExportType::Gauge => "gauge",
270                    MetricExportType::Counter => "count",
271                    MetricExportType::Histogram => "summary",
272                },
273                metric.value,
274                metric.timestamp_ns / 1_000_000_000
275            );
276            payload.push(metric_data);
277        }
278
279        // In production, POST to config.endpoint with API key header
280        let _ = payload;
281        (true, None)
282    }
283
284    /// Export to Honeycomb (Events API)
285    fn export_to_honeycomb(&self, metrics: &[ExportMetric]) -> (bool, Option<String>) {
286        let Some(config) = &self.config.honeycomb else {
287            return (false, Some("Honeycomb not configured".to_string()));
288        };
289
290        // Format as Honeycomb events
291        let mut events = Vec::new();
292        for metric in metrics {
293            let mut event: HashMap<&str, String> = HashMap::new();
294            event.insert("name", metric.name.clone());
295            event.insert("value", metric.value.to_string());
296            event.insert("service.name", config.service_name.clone());
297
298            for (k, v) in &metric.tags {
299                event.insert(k.as_str(), v.clone());
300            }
301
302            events.push(event);
303        }
304
305        // In production, POST to config.endpoint with API key
306        let _ = events;
307        (true, None)
308    }
309
310    /// Export to OTLP collector
311    fn export_to_otlp(&self, metrics: &[ExportMetric]) -> (bool, Option<String>) {
312        let Some(_config) = &self.config.otlp else {
313            return (false, Some("OTLP not configured".to_string()));
314        };
315
316        // Format as OTLP metrics (simplified)
317        let mut otlp_metrics = Vec::new();
318        for metric in metrics {
319            otlp_metrics.push(format!(
320                "metric={{name={},value={},type={:?}}}",
321                metric.name, metric.value, metric.metric_type
322            ));
323        }
324
325        // In production, send via gRPC or HTTP to config.endpoint
326        let _ = otlp_metrics;
327        (true, None)
328    }
329
330    /// Export to webhook
331    fn export_to_webhook(&self, metrics: &[ExportMetric]) -> (bool, Option<String>) {
332        let Some(_config) = &self.config.webhook else {
333            return (false, Some("Webhook not configured".to_string()));
334        };
335
336        // Format as JSON array
337        let mut json_metrics = Vec::new();
338        for metric in metrics {
339            let tags_json: Vec<String> = metric
340                .tags
341                .iter()
342                .map(|(k, v)| format!(r#""{}":"{}""#, k, v))
343                .collect();
344
345            json_metrics.push(format!(
346                r#"{{"name":"{}","value":{},"type":"{:?}","tags":{{{}}},"timestamp_ns":{}}}"#,
347                metric.name,
348                metric.value,
349                metric.metric_type,
350                tags_json.join(","),
351                metric.timestamp_ns
352            ));
353        }
354
355        let _payload = format!("[{}]", json_metrics.join(","));
356
357        // In production, send HTTP request to config.url
358        (true, None)
359    }
360
361    /// Update backend health based on export result
362    fn update_health(&mut self, backend: ObservabilityBackend, result: &ExportResult) {
363        if let Some(health) = self.health.get_mut(&backend) {
364            if result.success {
365                health.healthy = true;
366                health.last_success = Some(Instant::now());
367                health.consecutive_failures = 0;
368
369                // Update average latency
370                let n = self.export_counts.get(&backend).copied().unwrap_or(1) as f64;
371                health.avg_latency_ms =
372                    health.avg_latency_ms * ((n - 1.0) / n) + (result.duration_ms as f64) / n;
373            } else {
374                health.consecutive_failures += 1;
375                if health.consecutive_failures >= 3 {
376                    health.healthy = false;
377                }
378            }
379        }
380    }
381
382    /// Get health status for a backend
383    pub fn get_health(&self, backend: ObservabilityBackend) -> Option<&BackendHealth> {
384        self.health.get(&backend)
385    }
386
387    /// Get health status for all backends
388    pub fn all_health(&self) -> impl Iterator<Item = &BackendHealth> {
389        self.health.values()
390    }
391
392    /// Get enabled backends
393    pub fn enabled_backends(&self) -> Vec<ObservabilityBackend> {
394        self.config.enabled_backends()
395    }
396
397    /// Get buffer size
398    pub fn buffer_size(&self) -> usize {
399        self.buffer.len()
400    }
401
402    /// Get total exports for a backend
403    pub fn export_count(&self, backend: ObservabilityBackend) -> u64 {
404        self.export_counts.get(&backend).copied().unwrap_or(0)
405    }
406
407    /// Check if auto-flush is needed based on time
408    pub fn should_flush(&self) -> bool {
409        let elapsed = self.last_flush.elapsed().as_millis() as u64;
410        !self.buffer.is_empty() && elapsed >= self.config.flush_interval_ms
411    }
412
413    /// Get configuration
414    pub fn config(&self) -> &ObservabilityConfig {
415        &self.config
416    }
417}
418
419/// Format metric for DogStatsD protocol
420pub fn format_dogstatsd(metric: &ExportMetric, prefix: &str, default_tags: &[String]) -> String {
421    let tags: Vec<String> = metric
422        .tags
423        .iter()
424        .map(|(k, v)| format!("{}:{}", k, v))
425        .chain(default_tags.iter().cloned())
426        .collect();
427
428    let tag_str = if tags.is_empty() {
429        String::new()
430    } else {
431        format!("|#{}", tags.join(","))
432    };
433
434    let metric_type = match metric.metric_type {
435        MetricExportType::Gauge => "g",
436        MetricExportType::Counter => "c",
437        MetricExportType::Histogram => "h",
438    };
439
440    format!(
441        "{}.{}:{}|{}{}",
442        prefix, metric.name, metric.value, metric_type, tag_str
443    )
444}
445
/// Default export batch size, in metrics (the value `ObservabilityConfig::new` uses).
pub const DEFAULT_BATCH_SIZE: usize = 100;

/// Default flush interval in milliseconds — 10 seconds, the value
/// `ObservabilityConfig::new` uses.
pub const DEFAULT_FLUSH_INTERVAL_MS: u64 = 10 * 1_000;
451
452#[cfg(test)]
453mod tests;