1use aetheris_protocol::telemetry::v1::{
19 TelemetryBatch, TelemetryLevel, TelemetryResponse, telemetry_service_server::TelemetryService,
20};
21use axum::{Json, extract::ConnectInfo, http::StatusCode, response::IntoResponse};
22use dashmap::DashMap;
23use serde::Deserialize;
24use std::net::{IpAddr, SocketAddr};
25use std::sync::Arc;
26use tonic::{Request, Response, Status};
27
/// Maximum number of requests accepted from one client IP within a single rate-limit window.
const RATE_LIMIT_MAX: u32 = 60;
/// Length of one rate-limit window, in seconds.
const RATE_LIMIT_WINDOW_SECS: u64 = 60;
/// Byte cap applied to the free-form `target` and `message` fields of an event before logging.
const MAX_FIELD_LEN: usize = 512;
/// Largest number of events accepted in one telemetry batch.
const MAX_BATCH_SIZE: usize = 256;
/// Number of tracked client IPs above which stale rate-limit entries get pruned.
const PRUNE_THRESHOLD: usize = 1_000;
38
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned instead of
/// propagating the error.
fn now_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // Clock is set before 1970: fall back to zero, same as a default `Duration`.
        Err(_) => 0,
    }
}
46
/// Strips control characters from `s` and truncates the result to at most `max_len` bytes.
///
/// Truncation always lands on a UTF-8 character boundary: a character that would push the
/// output past `max_len` bytes is dropped together with everything after it, so the result
/// may be slightly shorter than `max_len`.
///
/// The input is untrusted and may be arbitrarily large, so the output is built in a single
/// pass that stops as soon as the byte budget is exhausted; the allocation never exceeds
/// `max_len` bytes.
fn sanitize(s: &str, max_len: usize) -> String {
    let mut out = String::with_capacity(s.len().min(max_len));
    for ch in s.chars().filter(|c| !c.is_control()) {
        // Stop at the first character that does not fit; never split a code point.
        if out.len() + ch.len_utf8() > max_len {
            break;
        }
        out.push(ch);
    }
    out
}
61
62fn extract_client_ip<T>(request: &Request<T>) -> Option<IpAddr> {
65 let metadata = request.metadata();
66
67 if let Some(forwarded) = metadata.get("forwarded").and_then(|v| v.to_str().ok()) {
69 for part in forwarded.split(';') {
70 let part = part.trim();
71 if let Some(for_val) = part.strip_prefix("for=") {
72 let ip_str = for_val.trim_matches('"').split(':').next()?;
73 if let Ok(ip) = ip_str.parse::<IpAddr>() {
74 return Some(ip);
75 }
76 }
77 }
78 }
79
80 if let Some(xff) = metadata
82 .get("x-forwarded-for")
83 .and_then(|v| v.to_str().ok())
84 {
85 if let Some(first) = xff.split(',').next()
87 && let Ok(ip) = first.trim().parse::<IpAddr>()
88 {
89 return Some(ip);
90 }
91 }
92
93 request.remote_addr().map(|addr| addr.ip())
95}
96
97#[derive(Clone)]
98pub struct AetherisTelemetryService {
99 rate_limits: Arc<DashMap<IpAddr, (u32, u64)>>,
101}
102
103impl AetherisTelemetryService {
104 #[must_use]
105 pub fn new() -> Self {
106 Self {
107 rate_limits: Arc::new(DashMap::new()),
108 }
109 }
110
111 fn prune_expired_entries(&self) {
113 let now = now_secs();
114 self.rate_limits.retain(|_, (_, window_start)| {
116 now.saturating_sub(*window_start) < 2 * RATE_LIMIT_WINDOW_SECS
117 });
118 }
119
120 fn check_rate_limit(&self, ip: IpAddr) -> bool {
122 if self.rate_limits.len() > PRUNE_THRESHOLD {
124 self.prune_expired_entries();
125 }
126
127 let now = now_secs();
128 let mut entry = self.rate_limits.entry(ip).or_insert((0, now));
129 let (count, window_start) = entry.value_mut();
130
131 if now.saturating_sub(*window_start) >= RATE_LIMIT_WINDOW_SECS {
132 *window_start = now;
133 *count = 1;
134 true
135 } else if *count < RATE_LIMIT_MAX {
136 *count += 1;
137 true
138 } else {
139 false
140 }
141 }
142}
143
144impl Default for AetherisTelemetryService {
145 fn default() -> Self {
146 Self::new()
147 }
148}
149
150#[tonic::async_trait]
151impl TelemetryService for AetherisTelemetryService {
152 async fn submit_telemetry(
153 &self,
154 request: Request<TelemetryBatch>,
155 ) -> Result<Response<TelemetryResponse>, Status> {
156 let client_ip = extract_client_ip(&request).ok_or_else(|| {
158 Status::permission_denied("Identification protocol failed: client IP indeterminate")
159 })?;
160
161 if !self.check_rate_limit(client_ip) {
162 return Err(Status::resource_exhausted(
163 "Telemetry rate limit exceeded. Try again in 60 seconds.",
164 ));
165 }
166
167 let batch = request.into_inner();
168
169 if batch.events.len() > MAX_BATCH_SIZE {
171 return Err(Status::invalid_argument(format!(
172 "Batch size policy violation: limit is {MAX_BATCH_SIZE} events"
173 )));
174 }
175
176 if batch.session_id.len() > 128 {
178 return Err(Status::invalid_argument("session_id exceeds 128 bytes"));
179 }
180
181 let session_id = sanitize(&batch.session_id, 128);
182 process_events_grpc(&batch.events, &session_id);
183 Ok(Response::new(TelemetryResponse {}))
184 }
185}
186
187fn process_events_grpc(
192 events: &[aetheris_protocol::telemetry::v1::TelemetryEvent],
193 session_id: &str,
194) {
195 metrics::counter!(
196 "aetheris_wasm_telemetry_batches_total",
197 "client_type" => "wasm_playground"
198 )
199 .increment(1);
200
201 for event in events {
202 let target = sanitize(&event.target, MAX_FIELD_LEN);
203 let message = sanitize(&event.message, MAX_FIELD_LEN);
204 let trace_id = sanitize(&event.trace_id, 64);
205 let span_name = sanitize(&event.span_name, 128);
206 let ts = event.timestamp_ms;
207
208 if event.span_name == "metrics_snapshot" {
210 record_wasm_metrics(&event.message);
211 }
212
213 let level =
214 TelemetryLevel::try_from(event.level).unwrap_or(TelemetryLevel::LevelUnspecified);
215 metrics::counter!(
216 "aetheris_wasm_telemetry_events_total",
217 "client_type" => "wasm_playground",
218 "level" => level_str(level),
219 )
220 .increment(1);
221
222 match level {
225 TelemetryLevel::Error => tracing::error!(
226 session_id = %session_id,
227 trace_id = %trace_id,
228 span_name = %span_name,
229 target = %target,
230 timestamp_ms = ts,
231 rtt_ms = ?event.rtt_ms,
232 "wasm: {}", message
233 ),
234 TelemetryLevel::Warn => tracing::warn!(
235 session_id = %session_id,
236 trace_id = %trace_id,
237 span_name = %span_name,
238 target = %target,
239 timestamp_ms = ts,
240 rtt_ms = ?event.rtt_ms,
241 "wasm: {}", message
242 ),
243 TelemetryLevel::Info | TelemetryLevel::LevelUnspecified => tracing::info!(
244 session_id = %session_id,
245 trace_id = %trace_id,
246 span_name = %span_name,
247 target = %target,
248 timestamp_ms = ts,
249 rtt_ms = ?event.rtt_ms,
250 "wasm: {}", message
251 ),
252 }
253 }
254}
255
256fn level_str(level: TelemetryLevel) -> &'static str {
257 match level {
258 TelemetryLevel::Error => "error",
259 TelemetryLevel::Warn => "warn",
260 TelemetryLevel::Info | TelemetryLevel::LevelUnspecified => "info",
261 }
262}
263
264fn record_wasm_metrics(msg: &str) {
269 for part in msg.split_whitespace() {
270 if let Some((key, val)) = part.split_once('=') {
271 let num = val
272 .trim_end_matches("ms")
273 .parse::<f64>()
274 .unwrap_or(f64::NAN);
275
276 if !num.is_finite() || !(0.0..=1_000_000.0).contains(&num) {
278 continue;
279 }
280
281 match key {
282 "fps" => {
283 metrics::histogram!(
284 "aetheris_wasm_fps",
285 "client_type" => "wasm_playground"
286 )
287 .record(num);
288 }
289 "frame_p99" => {
290 metrics::histogram!(
291 "aetheris_wasm_frame_time_ms",
292 "client_type" => "wasm_playground"
293 )
294 .record(num);
295 }
296 "sim_p99" => {
297 metrics::histogram!(
298 "aetheris_wasm_sim_time_ms",
299 "client_type" => "wasm_playground"
300 )
301 .record(num);
302 }
303 "rtt" => {
304 metrics::histogram!(
305 "aetheris_wasm_rtt_ms",
306 "client_type" => "wasm_playground"
307 )
308 .record(num);
309 }
310 "entities" => {
311 metrics::gauge!(
312 "aetheris_wasm_entity_count",
313 "client_type" => "wasm_playground"
314 )
315 .set(num);
316 }
317 "dropped" => {
318 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
319 let count = num as u64;
320 metrics::counter!(
321 "aetheris_wasm_telemetry_dropped_total",
322 "client_type" => "wasm_playground"
323 )
324 .increment(count);
325 }
326 _ => {}
327 }
328 }
329 }
330}
331
/// One telemetry event as received by the JSON endpoint.
#[derive(Debug, Deserialize)]
pub struct JsonTelemetryEvent {
    /// Client-reported timestamp. The JSON handler casts it to `u64` before logging.
    /// Presumably milliseconds since the Unix epoch — confirm against the client.
    pub timestamp_ms: f64,
    /// Numeric severity. The JSON handler treats `3` as error, `2` as warn and every other
    /// value as info.
    pub level: u32,
    /// Log target reported by the client; sanitized before it is logged.
    pub target: String,
    /// Log message. Also parsed as `key=value` metrics when `span_name` is
    /// `"metrics_snapshot"`.
    pub message: String,
    /// Optional round-trip-time value; `None` when the field is absent from the JSON.
    #[serde(default)]
    pub rtt_ms: Option<f64>,
    /// Trace identifier; sanitized and capped at 64 bytes before it is logged.
    pub trace_id: String,
    /// Span name; the exact value `"metrics_snapshot"` triggers metric extraction.
    pub span_name: String,
}
348
/// A batch of telemetry events as received by the JSON endpoint.
#[derive(Debug, Deserialize)]
pub struct JsonTelemetryBatch {
    /// Events in the batch; the handler rejects batches with more than `MAX_BATCH_SIZE`.
    pub events: Vec<JsonTelemetryEvent>,
    /// Client session identifier; the handler rejects values longer than 128 bytes.
    pub session_id: String,
}
354
355#[allow(clippy::unused_async)]
361pub async fn json_telemetry_handler(
362 ConnectInfo(addr): ConnectInfo<SocketAddr>,
363 axum::extract::State(svc): axum::extract::State<AetherisTelemetryService>,
364 Json(batch): Json<JsonTelemetryBatch>,
365) -> impl IntoResponse {
366 let ip = addr.ip();
367
368 if !svc.check_rate_limit(ip) {
369 return (StatusCode::TOO_MANY_REQUESTS, "rate limit exceeded").into_response();
370 }
371
372 if batch.events.len() > MAX_BATCH_SIZE {
373 return (StatusCode::BAD_REQUEST, "batch too large").into_response();
374 }
375 if batch.session_id.len() > 128 {
376 return (StatusCode::BAD_REQUEST, "session_id too long").into_response();
377 }
378
379 let session_id = sanitize(&batch.session_id, 128);
380
381 let first_trace_id = batch
385 .events
386 .first()
387 .map(|e| sanitize(&e.trace_id, 64))
388 .unwrap_or_default();
389
390 let _span = tracing::info_span!(
391 "wasm_telemetry",
392 session_id = %session_id,
393 trace_id = %first_trace_id,
394 event_count = batch.events.len(),
395 )
396 .entered();
397
398 metrics::counter!(
399 "aetheris_wasm_telemetry_batches_total",
400 "client_type" => "wasm_playground"
401 )
402 .increment(1);
403
404 for event in &batch.events {
405 let target = sanitize(&event.target, MAX_FIELD_LEN);
406 let message = sanitize(&event.message, MAX_FIELD_LEN);
407 let trace_id = sanitize(&event.trace_id, 64);
408 let span_name = sanitize(&event.span_name, 128);
409 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
410 let ts = event.timestamp_ms as u64;
411
412 if event.span_name == "metrics_snapshot" {
413 record_wasm_metrics(&event.message);
414 }
415
416 let level_label = match event.level {
417 3 => "error",
418 2 => "warn",
419 _ => "info",
420 };
421 metrics::counter!(
422 "aetheris_wasm_telemetry_events_total",
423 "client_type" => "wasm_playground",
424 "level" => level_label,
425 )
426 .increment(1);
427
428 match event.level {
429 3 => tracing::error!(
430 session_id = %session_id,
431 trace_id = %trace_id,
432 span_name = %span_name,
433 target = %target,
434 timestamp_ms = ts,
435 rtt_ms = ?event.rtt_ms,
436 "wasm: {}", message
437 ),
438 2 => tracing::warn!(
439 session_id = %session_id,
440 trace_id = %trace_id,
441 span_name = %span_name,
442 target = %target,
443 timestamp_ms = ts,
444 rtt_ms = ?event.rtt_ms,
445 "wasm: {}", message
446 ),
447 _ => tracing::info!(
448 session_id = %session_id,
449 trace_id = %trace_id,
450 span_name = %span_name,
451 target = %target,
452 timestamp_ms = ts,
453 rtt_ms = ?event.rtt_ms,
454 "wasm: {}", message
455 ),
456 }
457 }
458
459 (StatusCode::OK, "accepted").into_response()
460}
461
#[cfg(test)]
mod tests {
    use super::{AetherisTelemetryService, RATE_LIMIT_MAX, sanitize};
    use std::net::{IpAddr, Ipv4Addr};

    /// Control characters (NUL, newline) are removed, everything else is kept.
    #[test]
    fn sanitize_strips_control_chars() {
        assert_eq!(sanitize("hello\x00\nworld", 512), "helloworld");
    }

    /// ASCII input longer than the limit is cut to exactly the limit.
    #[test]
    fn sanitize_truncates_at_boundary() {
        let long = "a".repeat(600);
        let result = sanitize(&long, 512);
        assert_eq!(result.len(), 512);
        assert!(long.starts_with(&result));
    }

    /// Multi-byte input is cut on a character boundary, never inside a code point.
    #[test]
    fn sanitize_handles_multibyte() {
        // '€' encodes to 3 bytes, so 600 bytes of input must shrink to 170 chars = 510 bytes.
        let s = "€".repeat(200);
        let result = sanitize(&s, 512);
        assert_eq!(result.len(), 510);
        assert_eq!(result.chars().count(), 170);
        assert!(result.chars().all(|c| c == '€'));
    }

    /// Exactly `RATE_LIMIT_MAX` requests pass; the next one is rejected.
    #[test]
    fn rate_limit_allows_up_to_max() {
        let svc = AetherisTelemetryService::new();
        let ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4));
        for _ in 0..RATE_LIMIT_MAX {
            assert!(svc.check_rate_limit(ip));
        }
        assert!(!svc.check_rate_limit(ip));
    }

    /// Once the window has elapsed, the same IP is admitted again.
    #[test]
    fn rate_limit_resets_after_window() {
        let svc = AetherisTelemetryService::new();
        let ip = IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8));
        for _ in 0..RATE_LIMIT_MAX {
            assert!(svc.check_rate_limit(ip));
        }
        assert!(!svc.check_rate_limit(ip));
        // Simulate an elapsed window by moving the window start back to the epoch.
        svc.rate_limits.entry(ip).and_modify(|e| e.1 = 0);
        assert!(svc.check_rate_limit(ip));
    }

    /// `rtt_ms` is optional in the JSON payload and deserializes to `None` when absent.
    #[test]
    fn json_deserialization_handles_missing_rtt() {
        let json = r#"{
            "session_id": "01JSZG2XKQP4V3R8N0CDWM7HFT",
            "events": [
                {
                    "timestamp_ms": 123456789.0,
                    "level": 1,
                    "target": "test",
                    "message": "hello",
                    "trace_id": "trace1",
                    "span_name": "span1"
                }
            ]
        }"#;
        let batch: super::JsonTelemetryBatch =
            serde_json::from_str(json).expect("Should deserialize even without rtt_ms");
        assert_eq!(batch.events.len(), 1);
        assert!(batch.events[0].rtt_ms.is_none());
    }
}