cbtop/observability_backend/
mod.rs1mod types;
19pub use types::*;
20
21use std::collections::HashMap;
22use std::time::Instant;
23
24#[derive(Debug, Clone, Default)]
26pub struct ObservabilityConfig {
27 pub datadog: Option<DatadogConfig>,
29 pub newrelic: Option<NewRelicConfig>,
31 pub honeycomb: Option<HoneycombConfig>,
33 pub otlp: Option<OtlpConfig>,
35 pub webhook: Option<WebhookConfig>,
37 pub batch_size: usize,
39 pub flush_interval_ms: u64,
41}
42
43impl ObservabilityConfig {
44 pub fn new() -> Self {
46 Self {
47 batch_size: 100,
48 flush_interval_ms: 10_000,
49 ..Default::default()
50 }
51 }
52
53 pub fn with_datadog(mut self, config: DatadogConfig) -> Self {
55 self.datadog = Some(config);
56 self
57 }
58
59 pub fn with_newrelic(mut self, config: NewRelicConfig) -> Self {
61 self.newrelic = Some(config);
62 self
63 }
64
65 pub fn with_honeycomb(mut self, config: HoneycombConfig) -> Self {
67 self.honeycomb = Some(config);
68 self
69 }
70
71 pub fn with_otlp(mut self, config: OtlpConfig) -> Self {
73 self.otlp = Some(config);
74 self
75 }
76
77 pub fn with_webhook(mut self, config: WebhookConfig) -> Self {
79 self.webhook = Some(config);
80 self
81 }
82
83 pub fn enabled_backends(&self) -> Vec<ObservabilityBackend> {
85 let mut backends = Vec::new();
86 if self.datadog.is_some() {
87 backends.push(ObservabilityBackend::Datadog);
88 }
89 if self.newrelic.is_some() {
90 backends.push(ObservabilityBackend::NewRelic);
91 }
92 if self.honeycomb.is_some() {
93 backends.push(ObservabilityBackend::Honeycomb);
94 }
95 if self.otlp.is_some() {
96 backends.push(ObservabilityBackend::Otlp);
97 }
98 if self.webhook.is_some() {
99 backends.push(ObservabilityBackend::Webhook);
100 }
101 backends
102 }
103}
104
105#[derive(Debug)]
107pub struct ObservabilityExporter {
108 config: ObservabilityConfig,
110 buffer: Vec<ExportMetric>,
112 health: HashMap<ObservabilityBackend, BackendHealth>,
114 export_counts: HashMap<ObservabilityBackend, u64>,
116 last_flush: Instant,
118}
119
120impl ObservabilityExporter {
121 pub fn new(config: ObservabilityConfig) -> Self {
123 let mut health = HashMap::new();
124
125 for backend in config.enabled_backends() {
127 health.insert(
128 backend,
129 BackendHealth {
130 backend,
131 healthy: true,
132 last_success: None,
133 consecutive_failures: 0,
134 avg_latency_ms: 0.0,
135 },
136 );
137 }
138
139 Self {
140 config,
141 buffer: Vec::new(),
142 health,
143 export_counts: HashMap::new(),
144 last_flush: Instant::now(),
145 }
146 }
147
148 pub fn record(&mut self, metric: ExportMetric) {
150 self.buffer.push(metric);
151
152 if self.buffer.len() >= self.config.batch_size {
154 let _ = self.flush();
155 }
156 }
157
158 pub fn record_batch(&mut self, metrics: Vec<ExportMetric>) {
160 for metric in metrics {
161 self.record(metric);
162 }
163 }
164
165 pub fn flush(&mut self) -> Vec<ExportResult> {
167 if self.buffer.is_empty() {
168 return Vec::new();
169 }
170
171 let metrics = std::mem::take(&mut self.buffer);
172 let mut results = Vec::new();
173
174 for backend in self.config.enabled_backends() {
176 let result = self.export_to_backend(backend, &metrics);
177 self.update_health(backend, &result);
178 results.push(result);
179 }
180
181 self.last_flush = Instant::now();
182 results
183 }
184
185 fn export_to_backend(
187 &mut self,
188 backend: ObservabilityBackend,
189 metrics: &[ExportMetric],
190 ) -> ExportResult {
191 let start = Instant::now();
192
193 let (success, error) = match backend {
195 ObservabilityBackend::Datadog => self.export_to_datadog(metrics),
196 ObservabilityBackend::NewRelic => self.export_to_newrelic(metrics),
197 ObservabilityBackend::Honeycomb => self.export_to_honeycomb(metrics),
198 ObservabilityBackend::Otlp => self.export_to_otlp(metrics),
199 ObservabilityBackend::Webhook => self.export_to_webhook(metrics),
200 };
201
202 let duration_ms = start.elapsed().as_millis() as u64;
203
204 if success {
205 *self.export_counts.entry(backend).or_insert(0) += metrics.len() as u64;
206 }
207
208 ExportResult {
209 backend,
210 success,
211 metrics_exported: if success { metrics.len() } else { 0 },
212 duration_ms,
213 error,
214 }
215 }
216
217 fn export_to_datadog(&self, metrics: &[ExportMetric]) -> (bool, Option<String>) {
219 let Some(config) = &self.config.datadog else {
220 return (false, Some("Datadog not configured".to_string()));
221 };
222
223 let mut formatted = Vec::new();
225 for metric in metrics {
226 let tags: Vec<String> = metric
227 .tags
228 .iter()
229 .map(|(k, v)| format!("{}:{}", k, v))
230 .chain(config.default_tags.iter().cloned())
231 .collect();
232
233 let tag_str = if tags.is_empty() {
234 String::new()
235 } else {
236 format!("|#{}", tags.join(","))
237 };
238
239 let metric_type = match metric.metric_type {
240 MetricExportType::Gauge => "g",
241 MetricExportType::Counter => "c",
242 MetricExportType::Histogram => "h",
243 };
244
245 formatted.push(format!(
246 "{}.{}:{}|{}{}",
247 config.prefix, metric.name, metric.value, metric_type, tag_str
248 ));
249 }
250
251 let _ = formatted;
253 (true, None)
254 }
255
256 fn export_to_newrelic(&self, metrics: &[ExportMetric]) -> (bool, Option<String>) {
258 let Some(_config) = &self.config.newrelic else {
259 return (false, Some("New Relic not configured".to_string()));
260 };
261
262 let mut payload = Vec::new();
264 for metric in metrics {
265 let metric_data = format!(
266 r#"{{"name":"{}","type":"{}","value":{},"timestamp":{}}}"#,
267 metric.name,
268 match metric.metric_type {
269 MetricExportType::Gauge => "gauge",
270 MetricExportType::Counter => "count",
271 MetricExportType::Histogram => "summary",
272 },
273 metric.value,
274 metric.timestamp_ns / 1_000_000_000
275 );
276 payload.push(metric_data);
277 }
278
279 let _ = payload;
281 (true, None)
282 }
283
284 fn export_to_honeycomb(&self, metrics: &[ExportMetric]) -> (bool, Option<String>) {
286 let Some(config) = &self.config.honeycomb else {
287 return (false, Some("Honeycomb not configured".to_string()));
288 };
289
290 let mut events = Vec::new();
292 for metric in metrics {
293 let mut event: HashMap<&str, String> = HashMap::new();
294 event.insert("name", metric.name.clone());
295 event.insert("value", metric.value.to_string());
296 event.insert("service.name", config.service_name.clone());
297
298 for (k, v) in &metric.tags {
299 event.insert(k.as_str(), v.clone());
300 }
301
302 events.push(event);
303 }
304
305 let _ = events;
307 (true, None)
308 }
309
310 fn export_to_otlp(&self, metrics: &[ExportMetric]) -> (bool, Option<String>) {
312 let Some(_config) = &self.config.otlp else {
313 return (false, Some("OTLP not configured".to_string()));
314 };
315
316 let mut otlp_metrics = Vec::new();
318 for metric in metrics {
319 otlp_metrics.push(format!(
320 "metric={{name={},value={},type={:?}}}",
321 metric.name, metric.value, metric.metric_type
322 ));
323 }
324
325 let _ = otlp_metrics;
327 (true, None)
328 }
329
330 fn export_to_webhook(&self, metrics: &[ExportMetric]) -> (bool, Option<String>) {
332 let Some(_config) = &self.config.webhook else {
333 return (false, Some("Webhook not configured".to_string()));
334 };
335
336 let mut json_metrics = Vec::new();
338 for metric in metrics {
339 let tags_json: Vec<String> = metric
340 .tags
341 .iter()
342 .map(|(k, v)| format!(r#""{}":"{}""#, k, v))
343 .collect();
344
345 json_metrics.push(format!(
346 r#"{{"name":"{}","value":{},"type":"{:?}","tags":{{{}}},"timestamp_ns":{}}}"#,
347 metric.name,
348 metric.value,
349 metric.metric_type,
350 tags_json.join(","),
351 metric.timestamp_ns
352 ));
353 }
354
355 let _payload = format!("[{}]", json_metrics.join(","));
356
357 (true, None)
359 }
360
361 fn update_health(&mut self, backend: ObservabilityBackend, result: &ExportResult) {
363 if let Some(health) = self.health.get_mut(&backend) {
364 if result.success {
365 health.healthy = true;
366 health.last_success = Some(Instant::now());
367 health.consecutive_failures = 0;
368
369 let n = self.export_counts.get(&backend).copied().unwrap_or(1) as f64;
371 health.avg_latency_ms =
372 health.avg_latency_ms * ((n - 1.0) / n) + (result.duration_ms as f64) / n;
373 } else {
374 health.consecutive_failures += 1;
375 if health.consecutive_failures >= 3 {
376 health.healthy = false;
377 }
378 }
379 }
380 }
381
382 pub fn get_health(&self, backend: ObservabilityBackend) -> Option<&BackendHealth> {
384 self.health.get(&backend)
385 }
386
387 pub fn all_health(&self) -> impl Iterator<Item = &BackendHealth> {
389 self.health.values()
390 }
391
392 pub fn enabled_backends(&self) -> Vec<ObservabilityBackend> {
394 self.config.enabled_backends()
395 }
396
397 pub fn buffer_size(&self) -> usize {
399 self.buffer.len()
400 }
401
402 pub fn export_count(&self, backend: ObservabilityBackend) -> u64 {
404 self.export_counts.get(&backend).copied().unwrap_or(0)
405 }
406
407 pub fn should_flush(&self) -> bool {
409 let elapsed = self.last_flush.elapsed().as_millis() as u64;
410 !self.buffer.is_empty() && elapsed >= self.config.flush_interval_ms
411 }
412
413 pub fn config(&self) -> &ObservabilityConfig {
415 &self.config
416 }
417}
418
419pub fn format_dogstatsd(metric: &ExportMetric, prefix: &str, default_tags: &[String]) -> String {
421 let tags: Vec<String> = metric
422 .tags
423 .iter()
424 .map(|(k, v)| format!("{}:{}", k, v))
425 .chain(default_tags.iter().cloned())
426 .collect();
427
428 let tag_str = if tags.is_empty() {
429 String::new()
430 } else {
431 format!("|#{}", tags.join(","))
432 };
433
434 let metric_type = match metric.metric_type {
435 MetricExportType::Gauge => "g",
436 MetricExportType::Counter => "c",
437 MetricExportType::Histogram => "h",
438 };
439
440 format!(
441 "{}.{}:{}|{}{}",
442 prefix, metric.name, metric.value, metric_type, tag_str
443 )
444}
445
/// Default number of buffered metrics that triggers a flush.
pub const DEFAULT_BATCH_SIZE: usize = 100;

/// Default flush interval in milliseconds (ten seconds).
pub const DEFAULT_FLUSH_INTERVAL_MS: u64 = 10 * 1_000;
451
452#[cfg(test)]
453mod tests;