Skip to main content

aetheris_server/
telemetry.rs

1//! Telemetry gRPC service implementation + JSON HTTP handler.
2//!
3//! Receives structured log events from WASM clients over gRPC-web (TCP/HTTP)
4//! and forwards them to the tracing pipeline (Loki via Promtail).
5//!
6//! Also exposes a plain JSON endpoint (`POST /telemetry/json`) consumed by the
7//! WASM `metrics.rs` flush path (no gRPC framing needed on the client side).
8//!
9//! This is an **out-of-band** diagnostic channel, intentionally independent of
10//! WebTransport, so it remains reachable during QUIC failures.
11//!
12//! # M10105 — Prometheus Metrics (static labels only, anti-cardinality rule)
13//!
14//! All metrics use `client_type = "wasm_playground"` as the only label.
15//! No UUIDs or client IDs appear in Prometheus label sets to prevent
16//! time-series cardinality explosion during continuous Playground testing.
17
18use aetheris_protocol::telemetry::v1::{
19    TelemetryBatch, TelemetryLevel, TelemetryResponse, telemetry_service_server::TelemetryService,
20};
21use axum::{Json, extract::ConnectInfo, http::StatusCode, response::IntoResponse};
22use dashmap::DashMap;
23use serde::Deserialize;
24use std::net::{IpAddr, SocketAddr};
25use std::sync::Arc;
26use tonic::{Request, Response, Status};
27
/// Maximum number of requests a single IP may make within one rate-limit window.
const RATE_LIMIT_MAX: u32 = 60;
/// Length of a rate-limit window in seconds. A per-IP counter is restarted once
/// this much time has elapsed since that IP's window began.
const RATE_LIMIT_WINDOW_SECS: u64 = 60;
/// Byte cap applied to the `message` and `target` fields during server-side sanitization.
const MAX_FIELD_LEN: usize = 512;
/// Maximum number of events allowed in a single batch; larger batches are rejected.
const MAX_BATCH_SIZE: usize = 256;
/// Number of rate-limit map entries above which an opportunistic prune is triggered.
const PRUNE_THRESHOLD: usize = 1000;
38
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // Clock is set before 1970: same result as a defaulted (zero) duration.
        Err(_) => 0,
    }
}
46
/// Removes control characters and caps the result at `max_len` bytes.
///
/// Every character for which [`char::is_control`] is true is dropped. That is the
/// whole Unicode `Cc` category (C0 controls, DEL and the C1 range), not only
/// ASCII controls. The remaining characters are kept in order until the next one
/// would push the output past `max_len` bytes, so the result always ends on a
/// character boundary and its byte length is `<= max_len`.
///
/// The input is untrusted, so the scan stops as soon as the cap is reached and
/// the output buffer never grows beyond `max_len` bytes, regardless of how long
/// `s` is.
fn sanitize(s: &str, max_len: usize) -> String {
    let mut out = String::with_capacity(s.len().min(max_len));
    for c in s.chars().filter(|c| !c.is_control()) {
        // Stop at the first character that does not fit; later (possibly
        // narrower) characters are intentionally not considered.
        if out.len() + c.len_utf8() > max_len {
            break;
        }
        out.push(c);
    }
    out
}
61
62/// Extract the client's real IP address from metadata (Forwarded or X-Forwarded-For)
63/// falling back to the remote address provided by tonic.
64fn extract_client_ip<T>(request: &Request<T>) -> Option<IpAddr> {
65    let metadata = request.metadata();
66
67    // 1. Check Forwarded header (RFC 7239)
68    if let Some(forwarded) = metadata.get("forwarded").and_then(|v| v.to_str().ok()) {
69        for part in forwarded.split(';') {
70            let part = part.trim();
71            if let Some(for_val) = part.strip_prefix("for=") {
72                let ip_str = for_val.trim_matches('"').split(':').next()?;
73                if let Ok(ip) = ip_str.parse::<IpAddr>() {
74                    return Some(ip);
75                }
76            }
77        }
78    }
79
80    // 2. Check X-Forwarded-For (Common legacy proxy header)
81    if let Some(xff) = metadata
82        .get("x-forwarded-for")
83        .and_then(|v| v.to_str().ok())
84    {
85        // First entry is the original client
86        if let Some(first) = xff.split(',').next()
87            && let Ok(ip) = first.trim().parse::<IpAddr>()
88        {
89            return Some(ip);
90        }
91    }
92
93    // 3. Fallback to physical peer address
94    request.remote_addr().map(|addr| addr.ip())
95}
96
/// Telemetry service state used by both the gRPC service and the JSON handler.
///
/// The rate-limit table sits behind an `Arc`, so every clone of the service
/// reads and updates the same per-IP counters.
#[derive(Clone)]
pub struct AetherisTelemetryService {
    /// Per-IP rate-limit table: maps IP → (`request_count`, `window_start_secs`).
    /// The count is restarted once the window has elapsed (see `check_rate_limit`).
    rate_limits: Arc<DashMap<IpAddr, (u32, u64)>>,
}
102
103impl AetherisTelemetryService {
104    #[must_use]
105    pub fn new() -> Self {
106        Self {
107            rate_limits: Arc::new(DashMap::new()),
108        }
109    }
110
111    /// Performs an opportunistic sweep of the rate limit map to remove expired entries.
112    fn prune_expired_entries(&self) {
113        let now = now_secs();
114        // Entry is considered stale if the window started more than 2x window duration ago.
115        self.rate_limits.retain(|_, (_, window_start)| {
116            now.saturating_sub(*window_start) < 2 * RATE_LIMIT_WINDOW_SECS
117        });
118    }
119
120    /// Returns `true` if the request from `ip` is within the allowed rate.
121    fn check_rate_limit(&self, ip: IpAddr) -> bool {
122        // Trigger opportunistic pruning if the map is getting large
123        if self.rate_limits.len() > PRUNE_THRESHOLD {
124            self.prune_expired_entries();
125        }
126
127        let now = now_secs();
128        let mut entry = self.rate_limits.entry(ip).or_insert((0, now));
129        let (count, window_start) = entry.value_mut();
130
131        if now.saturating_sub(*window_start) >= RATE_LIMIT_WINDOW_SECS {
132            *window_start = now;
133            *count = 1;
134            true
135        } else if *count < RATE_LIMIT_MAX {
136            *count += 1;
137            true
138        } else {
139            false
140        }
141    }
142}
143
/// `Default` delegates to [`AetherisTelemetryService::new`], i.e. a service with
/// an empty rate-limit table.
impl Default for AetherisTelemetryService {
    fn default() -> Self {
        Self::new()
    }
}
149
150#[tonic::async_trait]
151impl TelemetryService for AetherisTelemetryService {
152    async fn submit_telemetry(
153        &self,
154        request: Request<TelemetryBatch>,
155    ) -> Result<Response<TelemetryResponse>, Status> {
156        // Resolve the real client IP, denying request if it cannot be determined (security hardening)
157        let client_ip = extract_client_ip(&request).ok_or_else(|| {
158            Status::permission_denied("Identification protocol failed: client IP indeterminate")
159        })?;
160
161        if !self.check_rate_limit(client_ip) {
162            return Err(Status::resource_exhausted(
163                "Telemetry rate limit exceeded. Try again in 60 seconds.",
164            ));
165        }
166
167        let batch = request.into_inner();
168
169        // Enforce batch size limit before heavy processing
170        if batch.events.len() > MAX_BATCH_SIZE {
171            return Err(Status::invalid_argument(format!(
172                "Batch size policy violation: limit is {MAX_BATCH_SIZE} events"
173            )));
174        }
175
176        // Validate session_id length — user-supplied, treat as untrusted input.
177        if batch.session_id.len() > 128 {
178            return Err(Status::invalid_argument("session_id exceeds 128 bytes"));
179        }
180
181        let session_id = sanitize(&batch.session_id, 128);
182        process_events_grpc(&batch.events, &session_id);
183        Ok(Response::new(TelemetryResponse {}))
184    }
185}
186
// ---------------------------------------------------------------------------
// Event processing for the gRPC handler. `record_wasm_metrics` below is shared
// with the JSON handler; the JSON handler otherwise has its own event loop.
// ---------------------------------------------------------------------------
190
191fn process_events_grpc(
192    events: &[aetheris_protocol::telemetry::v1::TelemetryEvent],
193    session_id: &str,
194) {
195    metrics::counter!(
196        "aetheris_wasm_telemetry_batches_total",
197        "client_type" => "wasm_playground"
198    )
199    .increment(1);
200
201    for event in events {
202        let target = sanitize(&event.target, MAX_FIELD_LEN);
203        let message = sanitize(&event.message, MAX_FIELD_LEN);
204        let trace_id = sanitize(&event.trace_id, 64);
205        let span_name = sanitize(&event.span_name, 128);
206        let ts = event.timestamp_ms;
207
208        // Record Prometheus metrics parsed from the metrics_snapshot event
209        if event.span_name == "metrics_snapshot" {
210            record_wasm_metrics(&event.message);
211        }
212
213        let level =
214            TelemetryLevel::try_from(event.level).unwrap_or(TelemetryLevel::LevelUnspecified);
215        metrics::counter!(
216            "aetheris_wasm_telemetry_events_total",
217            "client_type" => "wasm_playground",
218            "level" => level_str(level),
219        )
220        .increment(1);
221
222        // Emit into tracing so events flow through Loki via Promtail.
223        // trace_id field enables log-to-trace correlation in Grafana.
224        match level {
225            TelemetryLevel::Error => tracing::error!(
226                session_id = %session_id,
227                trace_id = %trace_id,
228                span_name = %span_name,
229                target = %target,
230                timestamp_ms = ts,
231                rtt_ms = ?event.rtt_ms,
232                "wasm: {}", message
233            ),
234            TelemetryLevel::Warn => tracing::warn!(
235                session_id = %session_id,
236                trace_id = %trace_id,
237                span_name = %span_name,
238                target = %target,
239                timestamp_ms = ts,
240                rtt_ms = ?event.rtt_ms,
241                "wasm: {}", message
242            ),
243            TelemetryLevel::Info | TelemetryLevel::LevelUnspecified => tracing::info!(
244                session_id = %session_id,
245                trace_id = %trace_id,
246                span_name = %span_name,
247                target = %target,
248                timestamp_ms = ts,
249                rtt_ms = ?event.rtt_ms,
250                "wasm: {}", message
251            ),
252        }
253    }
254}
255
256fn level_str(level: TelemetryLevel) -> &'static str {
257    match level {
258        TelemetryLevel::Error => "error",
259        TelemetryLevel::Warn => "warn",
260        TelemetryLevel::Info | TelemetryLevel::LevelUnspecified => "info",
261    }
262}
263
264/// Parse `metrics_snapshot` message fields and record as Prometheus histograms.
265/// Message format: `fps=60.0 frame_p99=2.10ms sim_p99=0.50ms rtt=12.3ms entities=5 snapshots=3 dropped=0`
266///
267/// Uses static `client_type` label only — no UUIDs (anti-cardinality rule, M10105 §5.2).
268fn record_wasm_metrics(msg: &str) {
269    for part in msg.split_whitespace() {
270        if let Some((key, val)) = part.split_once('=') {
271            let num = val
272                .trim_end_matches("ms")
273                .parse::<f64>()
274                .unwrap_or(f64::NAN);
275
276            // M10105 — Validation: ignore non-finite, out-of-range, or negative values.
277            if !num.is_finite() || !(0.0..=1_000_000.0).contains(&num) {
278                continue;
279            }
280
281            match key {
282                "fps" => {
283                    metrics::histogram!(
284                        "aetheris_wasm_fps",
285                        "client_type" => "wasm_playground"
286                    )
287                    .record(num);
288                }
289                "frame_p99" => {
290                    metrics::histogram!(
291                        "aetheris_wasm_frame_time_ms",
292                        "client_type" => "wasm_playground"
293                    )
294                    .record(num);
295                }
296                "sim_p99" => {
297                    metrics::histogram!(
298                        "aetheris_wasm_sim_time_ms",
299                        "client_type" => "wasm_playground"
300                    )
301                    .record(num);
302                }
303                "rtt" => {
304                    metrics::histogram!(
305                        "aetheris_wasm_rtt_ms",
306                        "client_type" => "wasm_playground"
307                    )
308                    .record(num);
309                }
310                "entities" => {
311                    metrics::gauge!(
312                        "aetheris_wasm_entity_count",
313                        "client_type" => "wasm_playground"
314                    )
315                    .set(num);
316                }
317                "dropped" => {
318                    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
319                    let count = num as u64;
320                    metrics::counter!(
321                        "aetheris_wasm_telemetry_dropped_total",
322                        "client_type" => "wasm_playground"
323                    )
324                    .increment(count);
325                }
326                _ => {}
327            }
328        }
329    }
330}
331
332// ---------------------------------------------------------------------------
333// JSON HTTP handler (POST /telemetry/json) — consumed by WASM metrics.rs
334// ---------------------------------------------------------------------------
335
/// JSON wire format matching `TelemetryEventJson` in WASM `metrics.rs`.
///
/// All fields except `rtt_ms` are required; a payload missing any of them fails
/// to deserialize.
#[derive(Debug, Deserialize)]
pub struct JsonTelemetryEvent {
    /// Client-reported timestamp in milliseconds. The handler casts it to `u64`
    /// before logging. NOTE(review): presumably Unix-epoch based — confirm with the client.
    pub timestamp_ms: f64,
    /// Numeric severity. `json_telemetry_handler` treats `3` as error, `2` as
    /// warn, and every other value as info.
    pub level: u32,
    /// Log target reported by the client; sanitized before logging.
    pub target: String,
    /// Log message. For `metrics_snapshot` events this carries the `key=value`
    /// readings parsed by `record_wasm_metrics`.
    pub message: String,
    /// Optional round-trip time; `None` when the field is absent from the payload.
    #[serde(default)]
    pub rtt_ms: Option<f64>,
    /// Trace identifier, logged for log-to-trace correlation.
    pub trace_id: String,
    /// Span name; the value `metrics_snapshot` triggers metric recording.
    pub span_name: String,
}
348
/// JSON body accepted by `POST /telemetry/json`: one flush worth of events.
#[derive(Debug, Deserialize)]
pub struct JsonTelemetryBatch {
    /// Events in the batch; the handler rejects batches longer than `MAX_BATCH_SIZE`.
    pub events: Vec<JsonTelemetryEvent>,
    /// Client-supplied session identifier; rejected when longer than 128 bytes.
    pub session_id: String,
}
354
355/// Axum handler for `POST /telemetry/json`.
356///
357/// Accepts the JSON batch emitted by the WASM `MetricsCollector::flush()`.
358/// Rate-limited per IP using the same `DashMap` as the gRPC handler.
359/// Returns 429 on rate-limit exceeded, 400 on validation failure.
360#[allow(clippy::unused_async)]
361pub async fn json_telemetry_handler(
362    ConnectInfo(addr): ConnectInfo<SocketAddr>,
363    axum::extract::State(svc): axum::extract::State<AetherisTelemetryService>,
364    Json(batch): Json<JsonTelemetryBatch>,
365) -> impl IntoResponse {
366    let ip = addr.ip();
367
368    if !svc.check_rate_limit(ip) {
369        return (StatusCode::TOO_MANY_REQUESTS, "rate limit exceeded").into_response();
370    }
371
372    if batch.events.len() > MAX_BATCH_SIZE {
373        return (StatusCode::BAD_REQUEST, "batch too large").into_response();
374    }
375    if batch.session_id.len() > 128 {
376        return (StatusCode::BAD_REQUEST, "session_id too long").into_response();
377    }
378
379    let session_id = sanitize(&batch.session_id, 128);
380
381    // Emit a batch-level Jaeger span so all events from this flush are correlated.
382    // The trace_id from the first event (if present) is recorded as an attribute;
383    // full W3C trace context injection into Jaeger is deferred to M1062.
384    let first_trace_id = batch
385        .events
386        .first()
387        .map(|e| sanitize(&e.trace_id, 64))
388        .unwrap_or_default();
389
390    let _span = tracing::info_span!(
391        "wasm_telemetry",
392        session_id = %session_id,
393        trace_id = %first_trace_id,
394        event_count = batch.events.len(),
395    )
396    .entered();
397
398    metrics::counter!(
399        "aetheris_wasm_telemetry_batches_total",
400        "client_type" => "wasm_playground"
401    )
402    .increment(1);
403
404    for event in &batch.events {
405        let target = sanitize(&event.target, MAX_FIELD_LEN);
406        let message = sanitize(&event.message, MAX_FIELD_LEN);
407        let trace_id = sanitize(&event.trace_id, 64);
408        let span_name = sanitize(&event.span_name, 128);
409        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
410        let ts = event.timestamp_ms as u64;
411
412        if event.span_name == "metrics_snapshot" {
413            record_wasm_metrics(&event.message);
414        }
415
416        let level_label = match event.level {
417            3 => "error",
418            2 => "warn",
419            _ => "info",
420        };
421        metrics::counter!(
422            "aetheris_wasm_telemetry_events_total",
423            "client_type" => "wasm_playground",
424            "level" => level_label,
425        )
426        .increment(1);
427
428        match event.level {
429            3 => tracing::error!(
430                session_id = %session_id,
431                trace_id = %trace_id,
432                span_name = %span_name,
433                target = %target,
434                timestamp_ms = ts,
435                rtt_ms = ?event.rtt_ms,
436                "wasm: {}", message
437            ),
438            2 => tracing::warn!(
439                session_id = %session_id,
440                trace_id = %trace_id,
441                span_name = %span_name,
442                target = %target,
443                timestamp_ms = ts,
444                rtt_ms = ?event.rtt_ms,
445                "wasm: {}", message
446            ),
447            _ => tracing::info!(
448                session_id = %session_id,
449                trace_id = %trace_id,
450                span_name = %span_name,
451                target = %target,
452                timestamp_ms = ts,
453                rtt_ms = ?event.rtt_ms,
454                "wasm: {}", message
455            ),
456        }
457    }
458
459    (StatusCode::OK, "accepted").into_response()
460}
461
#[cfg(test)]
mod tests {
    //! Unit tests for field sanitization, the per-IP rate limiter, and JSON
    //! deserialization of telemetry batches.

    use super::{AetherisTelemetryService, RATE_LIMIT_MAX, sanitize};
    use std::net::{IpAddr, Ipv4Addr};

    #[test]
    fn sanitize_strips_control_chars() {
        assert_eq!(sanitize("hello\x00\nworld", 512), "helloworld");
    }

    #[test]
    fn sanitize_truncates_at_boundary() {
        let long = "a".repeat(600);
        let result = sanitize(&long, 512);
        assert_eq!(result.len(), 512);
    }

    #[test]
    fn sanitize_handles_multibyte() {
        // '€' is 3 bytes; 171 × 3 = 513 bytes exceeds the cap, so exactly
        // 170 chars (510 bytes) must survive.
        let s = "€".repeat(200);
        let result = sanitize(&s, 512);
        assert_eq!(result.len(), 510);
        assert_eq!(result.chars().count(), 170);
        assert!(result.chars().all(|c| c == '€'));
    }

    #[test]
    fn sanitize_zero_limit_yields_empty() {
        assert_eq!(sanitize("abc", 0), "");
    }

    #[test]
    fn rate_limit_allows_up_to_max() {
        let svc = AetherisTelemetryService::new();
        let ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4));
        // Tied to the constant so the test follows any change to the limit.
        for _ in 0..RATE_LIMIT_MAX {
            assert!(svc.check_rate_limit(ip));
        }
        assert!(!svc.check_rate_limit(ip));
    }

    #[test]
    fn rate_limit_resets_after_window() {
        let svc = AetherisTelemetryService::new();
        let ip = IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8));
        for _ in 0..RATE_LIMIT_MAX {
            svc.check_rate_limit(ip);
        }
        // The next call must be rejected before the window resets.
        assert!(!svc.check_rate_limit(ip));
        // Backdate the window start to simulate expiry.
        svc.rate_limits.entry(ip).and_modify(|e| e.1 = 0);
        assert!(svc.check_rate_limit(ip));
    }

    #[test]
    fn json_deserialization_handles_missing_rtt() {
        let json = r#"{
            "session_id": "01JSZG2XKQP4V3R8N0CDWM7HFT",
            "events": [
                {
                    "timestamp_ms": 123456789.0,
                    "level": 1,
                    "target": "test",
                    "message": "hello",
                    "trace_id": "trace1",
                    "span_name": "span1"
                }
            ]
        }"#;
        let batch: super::JsonTelemetryBatch =
            serde_json::from_str(json).expect("Should deserialize even without rtt_ms");
        assert!(batch.events[0].rtt_ms.is_none());
    }
}