Skip to main content

coil_observability/
telemetry.rs

1use crate::ObservabilityError;
2use crate::health::ErrorCategory;
3use crate::validation::{DimensionKey, MetricName};
4use coil_config::{Environment, ObservabilityConfig};
5use std::collections::{BTreeMap, BTreeSet, VecDeque};
6use std::sync::{Arc, Mutex};
7
/// Capacity of the in-memory buffer of recent traces: once this many records
/// are held, `TelemetryCatalog::record_trace` evicts the oldest record before
/// storing a new one.
const MAX_RECENT_TRACES: usize = 64;
9
/// How a metric's live value is aggregated.
///
/// The kind selects which `MetricReading` variant a metric starts with, and
/// therefore which update methods on `TelemetryCatalog` accept it. The derived
/// ordering follows declaration order (`Counter < Gauge < Histogram`).
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum MetricKind {
    /// Unsigned running total that is only ever added to.
    Counter,
    /// Signed value that can be adjusted up or down, or overwritten.
    Gauge,
    /// Stream of unsigned samples summarised as count, last and maximum.
    Histogram,
}
16
/// Unit of measure declared by a metric definition.
///
/// The derived ordering follows declaration order
/// (`Count < Milliseconds < Bytes < Ratio`).
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum MetricUnit {
    /// A plain number of occurrences or items.
    Count,
    /// A duration expressed in milliseconds.
    Milliseconds,
    /// A size expressed in bytes.
    Bytes,
    /// A proportion between two quantities.
    Ratio,
}
24
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub struct MetricDefinition {
27    pub name: MetricName,
28    pub kind: MetricKind,
29    pub unit: MetricUnit,
30    pub dimensions: BTreeSet<DimensionKey>,
31}
32
/// Running summary of a histogram metric.
///
/// Only three aggregates are retained; individual samples are not stored.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct HistogramReading {
    /// Number of samples recorded so far (saturating at `u64::MAX`).
    pub samples: u64,
    /// The most recently recorded sample.
    pub last: u64,
    /// The largest sample recorded so far.
    pub max: u64,
}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41pub enum MetricReading {
42    Counter(u64),
43    Gauge(i64),
44    Histogram(HistogramReading),
45}
46
/// One recorded trace event together with free-form string fields.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceRecord {
    /// Identifier of the trace this record belongs to.
    pub trace_id: String,
    /// Name of the span the record describes.
    pub span: String,
    /// Outcome label supplied by the caller.
    pub outcome: String,
    /// Caller-supplied timestamp, in seconds since the Unix epoch.
    pub recorded_at_unix_seconds: u64,
    /// Additional key/value fields, kept sorted by key.
    pub fields: BTreeMap<String, String>,
}

impl TraceRecord {
    /// Creates a record with the given identity, outcome and timestamp and no
    /// extra fields.
    pub fn new(
        trace_id: impl Into<String>,
        span: impl Into<String>,
        outcome: impl Into<String>,
        recorded_at_unix_seconds: u64,
    ) -> Self {
        let fields = BTreeMap::new();
        Self {
            trace_id: trace_id.into(),
            span: span.into(),
            outcome: outcome.into(),
            recorded_at_unix_seconds,
            fields,
        }
    }

    /// Returns the record with `key` set to `value`.
    ///
    /// Setting a key that is already present replaces its previous value.
    pub fn with_field(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (key, value) = (key.into(), value.into());
        self.fields.insert(key, value);
        self
    }
}
77
78#[derive(Debug, Default)]
79struct TelemetryState {
80    readings: BTreeMap<MetricName, MetricReading>,
81    recent_traces: VecDeque<TraceRecord>,
82}
83
84impl MetricDefinition {
85    pub fn new(
86        name: impl Into<String>,
87        kind: MetricKind,
88        unit: MetricUnit,
89    ) -> Result<Self, ObservabilityError> {
90        Ok(Self {
91            name: MetricName::new(name)?,
92            kind,
93            unit,
94            dimensions: BTreeSet::new(),
95        })
96    }
97
98    pub fn with_dimension(
99        mut self,
100        dimension: impl Into<String>,
101    ) -> Result<Self, ObservabilityError> {
102        self.dimensions.insert(DimensionKey::new(dimension)?);
103        Ok(self)
104    }
105}
106
/// Trace collection settings.
///
/// `TracePolicy::new` validates the sample rate; because both fields are
/// public, a value built with a struct literal bypasses that check.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TracePolicy {
    /// Whether trace records are accepted at all.
    pub enabled: bool,
    /// Sample rate in permyriad (parts per ten thousand); `TracePolicy::new`
    /// accepts values from 0 to 10 000 inclusive.
    pub sample_permyriad: u16,
}
112
113impl TracePolicy {
114    pub fn new(enabled: bool, sample_permyriad: u16) -> Result<Self, ObservabilityError> {
115        if sample_permyriad > 10_000 {
116            return Err(ObservabilityError::InvalidTraceSampleRate {
117                permyriad: sample_permyriad,
118            });
119        }
120
121        Ok(Self {
122            enabled,
123            sample_permyriad,
124        })
125    }
126}
127
128#[derive(Debug, Clone)]
129pub struct TelemetryCatalog {
130    pub metrics_enabled: bool,
131    pub required_log_dimensions: BTreeSet<DimensionKey>,
132    pub metrics: BTreeMap<MetricName, MetricDefinition>,
133    pub trace: TracePolicy,
134    pub error_categories: BTreeSet<ErrorCategory>,
135    live_state: Arc<Mutex<TelemetryState>>,
136}
137
138impl TelemetryCatalog {
139    pub fn baseline(
140        config: &ObservabilityConfig,
141        environment: Environment,
142    ) -> Result<Self, ObservabilityError> {
143        let mut metrics = BTreeMap::new();
144        for metric in baseline_metrics()? {
145            let name = metric.name.clone();
146            if metrics.insert(name.clone(), metric).is_some() {
147                return Err(ObservabilityError::DuplicateMetric {
148                    metric: name.to_string(),
149                });
150            }
151        }
152
153        let mut dimensions = BTreeSet::new();
154        for value in [
155            "customer_app",
156            "site",
157            "brand",
158            "route",
159            "module",
160            "extension_point",
161            "outcome",
162            "error_category",
163        ] {
164            dimensions.insert(DimensionKey::new(value)?);
165        }
166
167        let trace = TracePolicy::new(
168            config.tracing,
169            match environment {
170                Environment::Development => 10_000,
171                Environment::Staging => 5_000,
172                Environment::Production => 1_000,
173            },
174        )?;
175        let initial_readings = initial_metric_readings(&metrics);
176
177        Ok(Self {
178            metrics_enabled: config.metrics,
179            required_log_dimensions: dimensions,
180            metrics,
181            trace,
182            error_categories: BTreeSet::from([
183                ErrorCategory::Validation,
184                ErrorCategory::AuthorizationDenied,
185                ErrorCategory::StateConflict,
186                ErrorCategory::DependencyFailure,
187                ErrorCategory::Timeout,
188                ErrorCategory::Capacity,
189                ErrorCategory::InvariantViolation,
190                ErrorCategory::ExtensionTrap,
191            ]),
192            live_state: Arc::new(Mutex::new(TelemetryState {
193                readings: initial_readings,
194                recent_traces: VecDeque::new(),
195            })),
196        })
197    }
198
199    pub fn metric(&self, name: &str) -> Option<&MetricDefinition> {
200        self.metrics.get(&MetricName::new(name.to_string()).ok()?)
201    }
202
203    pub fn metric_reading(&self, name: &str) -> Option<MetricReading> {
204        let metric = MetricName::new(name.to_string()).ok()?;
205        self.live_state
206            .lock()
207            .expect("telemetry mutex poisoned")
208            .readings
209            .get(&metric)
210            .copied()
211    }
212
213    pub fn increment_counter(&self, name: &str, delta: u64) -> bool {
214        self.update_metric(name, |reading| match reading {
215            MetricReading::Counter(value) => {
216                *value = value.saturating_add(delta);
217                true
218            }
219            _ => false,
220        })
221    }
222
223    pub fn adjust_gauge(&self, name: &str, delta: i64) -> bool {
224        self.update_metric(name, |reading| match reading {
225            MetricReading::Gauge(value) => {
226                *value = value.saturating_add(delta);
227                true
228            }
229            _ => false,
230        })
231    }
232
233    pub fn record_histogram(&self, name: &str, sample: u64) -> bool {
234        self.update_metric(name, |reading| match reading {
235            MetricReading::Histogram(value) => {
236                value.samples = value.samples.saturating_add(1);
237                value.last = sample;
238                value.max = value.max.max(sample);
239                true
240            }
241            _ => false,
242        })
243    }
244
245    pub fn set_gauge(&self, name: &str, value: i64) -> bool {
246        self.update_metric(name, |reading| match reading {
247            MetricReading::Gauge(current) => {
248                *current = value;
249                true
250            }
251            _ => false,
252        })
253    }
254
255    pub fn record_trace(&self, trace: TraceRecord) -> bool {
256        if !self.trace.enabled {
257            return false;
258        }
259
260        let mut state = self.live_state.lock().expect("telemetry mutex poisoned");
261        if state.recent_traces.len() >= MAX_RECENT_TRACES {
262            state.recent_traces.pop_front();
263        }
264        state.recent_traces.push_back(trace);
265        true
266    }
267
268    pub fn recent_traces(&self, limit: usize) -> Vec<TraceRecord> {
269        let state = self.live_state.lock().expect("telemetry mutex poisoned");
270        state
271            .recent_traces
272            .iter()
273            .rev()
274            .take(limit)
275            .cloned()
276            .collect()
277    }
278
279    fn update_metric(
280        &self,
281        name: &str,
282        mut update: impl FnMut(&mut MetricReading) -> bool,
283    ) -> bool {
284        let Ok(metric_name) = MetricName::new(name.to_string()) else {
285            return false;
286        };
287        let mut state = self.live_state.lock().expect("telemetry mutex poisoned");
288        let Some(reading) = state.readings.get_mut(&metric_name) else {
289            return false;
290        };
291        update(reading)
292    }
293}
294
295impl PartialEq for TelemetryCatalog {
296    fn eq(&self, other: &Self) -> bool {
297        self.metrics_enabled == other.metrics_enabled
298            && self.required_log_dimensions == other.required_log_dimensions
299            && self.metrics == other.metrics
300            && self.trace == other.trace
301            && self.error_categories == other.error_categories
302    }
303}
304
305impl Eq for TelemetryCatalog {}
306
307fn baseline_metrics() -> Result<Vec<MetricDefinition>, ObservabilityError> {
308    let customer_dimensions = ["customer_app", "route", "outcome"];
309    let storage_dimensions = ["customer_app", "module", "outcome"];
310    let extension_dimensions = ["customer_app", "extension_point", "outcome"];
311
312    Ok(vec![
313        metric(
314            "coil.http.requests.total",
315            MetricKind::Counter,
316            MetricUnit::Count,
317            &["customer_app", "outcome"],
318        )?,
319        metric(
320            "coil.http.requests.in_flight",
321            MetricKind::Gauge,
322            MetricUnit::Count,
323            &["customer_app"],
324        )?,
325        metric(
326            "coil.http.request.latency_ms",
327            MetricKind::Histogram,
328            MetricUnit::Milliseconds,
329            &customer_dimensions,
330        )?,
331        metric(
332            "coil.auth.check.latency_ms",
333            MetricKind::Histogram,
334            MetricUnit::Milliseconds,
335            &["customer_app", "module", "outcome"],
336        )?,
337        metric(
338            "coil.cache.hit_ratio",
339            MetricKind::Gauge,
340            MetricUnit::Ratio,
341            &["customer_app", "module"],
342        )?,
343        metric(
344            "coil.queue.depth",
345            MetricKind::Gauge,
346            MetricUnit::Count,
347            &["customer_app", "module"],
348        )?,
349        metric(
350            "coil.storage.sync.backlog",
351            MetricKind::Gauge,
352            MetricUnit::Count,
353            &storage_dimensions,
354        )?,
355        metric(
356            "coil.tls.renewal.failures",
357            MetricKind::Counter,
358            MetricUnit::Count,
359            &["customer_app", "outcome"],
360        )?,
361        metric(
362            "coil.extension.runtime_ms",
363            MetricKind::Histogram,
364            MetricUnit::Milliseconds,
365            &extension_dimensions,
366        )?,
367    ])
368}
369
370fn metric(
371    name: &str,
372    kind: MetricKind,
373    unit: MetricUnit,
374    dimensions: &[&str],
375) -> Result<MetricDefinition, ObservabilityError> {
376    let mut definition = MetricDefinition::new(name, kind, unit)?;
377    for dimension in dimensions {
378        definition = definition.with_dimension(*dimension)?;
379    }
380    Ok(definition)
381}
382
383fn initial_metric_readings(
384    metrics: &BTreeMap<MetricName, MetricDefinition>,
385) -> BTreeMap<MetricName, MetricReading> {
386    metrics
387        .iter()
388        .map(|(name, definition)| {
389            let reading = match definition.kind {
390                MetricKind::Counter => MetricReading::Counter(0),
391                MetricKind::Gauge => MetricReading::Gauge(0),
392                MetricKind::Histogram => MetricReading::Histogram(HistogramReading {
393                    samples: 0,
394                    last: 0,
395                    max: 0,
396                }),
397            };
398            (name.clone(), reading)
399        })
400        .collect()
401}