use aetheris_protocol::telemetry::v1::{
TelemetryBatch, TelemetryLevel, TelemetryResponse, telemetry_service_server::TelemetryService,
};
use axum::{Json, extract::ConnectInfo, http::StatusCode, response::IntoResponse};
use dashmap::DashMap;
use serde::Deserialize;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use tonic::{Request, Response, Status};
/// Maximum requests a single client IP may make within one rate-limit window.
const RATE_LIMIT_MAX: u32 = 60;
/// Length of the fixed rate-limit window, in seconds.
const RATE_LIMIT_WINDOW_SECS: u64 = 60;
/// Pruning of stale rate-limit entries is only considered once the table holds
/// more than this many client IPs.
const PRUNE_THRESHOLD: usize = 1_000;
/// Byte cap applied by `sanitize` to free-text event fields (`target`, `message`).
const MAX_FIELD_LEN: usize = 512;
/// Largest number of events accepted in a single batch.
const MAX_BATCH_SIZE: usize = 256;
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs())
}
/// Removes control characters from `s` and truncates the result to at most
/// `max_len` bytes without splitting a UTF-8 character.
///
/// Works in a single pass that stops as soon as the byte budget is exhausted.
/// The input is client-supplied and may be far larger than `max_len`, so the
/// work and the allocation stay proportional to `max_len`. The input length
/// does not matter.
fn sanitize(s: &str, max_len: usize) -> String {
    let mut out = String::with_capacity(s.len().min(max_len));
    for c in s.chars().filter(|c| !c.is_control()) {
        // Stop at the first character that would overflow the budget; every
        // later character is past it, so the result is the longest prefix
        // that fits.
        if out.len() + c.len_utf8() > max_len {
            break;
        }
        out.push(c);
    }
    out
}
fn extract_client_ip<T>(request: &Request<T>) -> Option<IpAddr> {
let metadata = request.metadata();
if let Some(forwarded) = metadata.get("forwarded").and_then(|v| v.to_str().ok()) {
for part in forwarded.split(';') {
let part = part.trim();
if let Some(for_val) = part.strip_prefix("for=") {
let ip_str = for_val.trim_matches('"').split(':').next()?;
if let Ok(ip) = ip_str.parse::<IpAddr>() {
return Some(ip);
}
}
}
}
if let Some(xff) = metadata
.get("x-forwarded-for")
.and_then(|v| v.to_str().ok())
{
if let Some(first) = xff.split(',').next()
&& let Ok(ip) = first.trim().parse::<IpAddr>()
{
return Some(ip);
}
}
request.remote_addr().map(|addr| addr.ip())
}
/// Telemetry ingestion service backing both the gRPC `TelemetryService` and the
/// JSON HTTP handler, with a fixed-window per-IP rate limiter.
///
/// Cloning is cheap: the rate-limit state lives behind `Arc`, so every clone
/// enforces the same shared limits.
#[derive(Clone)]
pub struct AetherisTelemetryService {
    /// Per-client `(requests_in_current_window, window_start_unix_secs)`.
    rate_limits: Arc<DashMap<IpAddr, (u32, u64)>>,
    /// Unix second of the most recent full prune of `rate_limits`. It is shared
    /// by all clones, so the O(n) sweep runs at most once per window service-wide.
    last_prune_secs: Arc<std::sync::atomic::AtomicU64>,
}

impl AetherisTelemetryService {
    /// Creates a service with an empty rate-limit table.
    #[must_use]
    pub fn new() -> Self {
        Self {
            rate_limits: Arc::new(DashMap::new()),
            last_prune_secs: Arc::new(std::sync::atomic::AtomicU64::new(0)),
        }
    }

    /// Removes entries whose window started at least two rate-limit windows
    /// ago. Those windows have long expired.
    fn prune_expired_entries(&self) {
        let now = now_secs();
        self.rate_limits.retain(|_, (_, window_start)| {
            now.saturating_sub(*window_start) < 2 * RATE_LIMIT_WINDOW_SECS
        });
    }

    /// Sweeps the table only when it is large and no sweep has run during the
    /// last window.
    ///
    /// Without this throttle, once more than `PRUNE_THRESHOLD` IPs were live,
    /// every request would scan and lock the whole map.
    fn maybe_prune(&self, now: u64) {
        use std::sync::atomic::Ordering;

        let last = self.last_prune_secs.load(Ordering::Relaxed);
        if now.saturating_sub(last) < RATE_LIMIT_WINDOW_SECS
            || self.rate_limits.len() <= PRUNE_THRESHOLD
        {
            return;
        }
        // Only the caller that wins the exchange performs the sweep; concurrent
        // callers that lose the race skip it instead of sweeping again.
        if self
            .last_prune_secs
            .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
        {
            self.prune_expired_entries();
        }
    }

    /// Records one request from `ip` and reports whether it is admitted.
    ///
    /// Uses fixed windows of `RATE_LIMIT_WINDOW_SECS` seconds. The first request
    /// after a window has elapsed starts a new window. At most `RATE_LIMIT_MAX`
    /// requests are admitted per window, and rejected requests are not counted.
    fn check_rate_limit(&self, ip: IpAddr) -> bool {
        let now = now_secs();
        self.maybe_prune(now);
        let mut entry = self.rate_limits.entry(ip).or_insert((0, now));
        let (count, window_start) = entry.value_mut();
        if now.saturating_sub(*window_start) >= RATE_LIMIT_WINDOW_SECS {
            *window_start = now;
            *count = 1;
            true
        } else if *count < RATE_LIMIT_MAX {
            *count += 1;
            true
        } else {
            false
        }
    }
}

impl Default for AetherisTelemetryService {
    fn default() -> Self {
        Self::new()
    }
}
#[tonic::async_trait]
impl TelemetryService for AetherisTelemetryService {
async fn submit_telemetry(
&self,
request: Request<TelemetryBatch>,
) -> Result<Response<TelemetryResponse>, Status> {
let client_ip = extract_client_ip(&request).ok_or_else(|| {
Status::permission_denied("Identification protocol failed: client IP indeterminate")
})?;
if !self.check_rate_limit(client_ip) {
return Err(Status::resource_exhausted(
"Telemetry rate limit exceeded. Try again in 60 seconds.",
));
}
let batch = request.into_inner();
if batch.events.len() > MAX_BATCH_SIZE {
return Err(Status::invalid_argument(format!(
"Batch size policy violation: limit is {MAX_BATCH_SIZE} events"
)));
}
if batch.session_id.len() > 128 {
return Err(Status::invalid_argument("session_id exceeds 128 bytes"));
}
let session_id = sanitize(&batch.session_id, 128);
process_events_grpc(&batch.events, &session_id);
Ok(Response::new(TelemetryResponse {}))
}
}
fn process_events_grpc(
events: &[aetheris_protocol::telemetry::v1::TelemetryEvent],
session_id: &str,
) {
metrics::counter!(
"aetheris_wasm_telemetry_batches_total",
"client_type" => "wasm_playground"
)
.increment(1);
for event in events {
let target = sanitize(&event.target, MAX_FIELD_LEN);
let message = sanitize(&event.message, MAX_FIELD_LEN);
let trace_id = sanitize(&event.trace_id, 64);
let span_name = sanitize(&event.span_name, 128);
let ts = event.timestamp_ms;
if event.span_name == "metrics_snapshot" {
record_wasm_metrics(&event.message);
}
let level =
TelemetryLevel::try_from(event.level).unwrap_or(TelemetryLevel::LevelUnspecified);
metrics::counter!(
"aetheris_wasm_telemetry_events_total",
"client_type" => "wasm_playground",
"level" => level_str(level),
)
.increment(1);
match level {
TelemetryLevel::Error => tracing::error!(
session_id = %session_id,
trace_id = %trace_id,
span_name = %span_name,
target = %target,
timestamp_ms = ts,
rtt_ms = ?event.rtt_ms,
"wasm: {}", message
),
TelemetryLevel::Warn => tracing::warn!(
session_id = %session_id,
trace_id = %trace_id,
span_name = %span_name,
target = %target,
timestamp_ms = ts,
rtt_ms = ?event.rtt_ms,
"wasm: {}", message
),
TelemetryLevel::Info | TelemetryLevel::LevelUnspecified => tracing::info!(
session_id = %session_id,
trace_id = %trace_id,
span_name = %span_name,
target = %target,
timestamp_ms = ts,
rtt_ms = ?event.rtt_ms,
"wasm: {}", message
),
}
}
}
fn level_str(level: TelemetryLevel) -> &'static str {
match level {
TelemetryLevel::Error => "error",
TelemetryLevel::Warn => "warn",
TelemetryLevel::Info | TelemetryLevel::LevelUnspecified => "info",
}
}
/// Parses a `metrics_snapshot` message and records the recognized keys as
/// client metrics.
///
/// The message is a whitespace-separated list of `key=value` pairs. Recognized
/// keys are `fps`, `frame_p99`, `sim_p99` and `rtt` (histograms), `entities`
/// (gauge) and `dropped` (counter increment). A value may carry one trailing
/// `ms` suffix. Unrecognized keys are ignored, as are values that do not parse,
/// are not finite, or fall outside `0..=1_000_000`.
///
/// Each recognized key is recorded at most once per message; the first valid
/// occurrence wins. The message is client-supplied, so without this a single
/// event such as `"fps=1 fps=1 fps=1 …"` could inject an arbitrary number of
/// samples.
fn record_wasm_metrics(msg: &str) {
    const KEYS: [&str; 6] = ["fps", "frame_p99", "sim_p99", "rtt", "entities", "dropped"];
    let mut recorded = [false; KEYS.len()];

    for (key, val) in msg.split_whitespace().filter_map(|part| part.split_once('=')) {
        let Some(slot) = KEYS.iter().position(|known| *known == key) else {
            continue;
        };
        if recorded[slot] {
            continue;
        }
        let Some(num) = val
            .strip_suffix("ms")
            .unwrap_or(val)
            .parse::<f64>()
            .ok()
            .filter(|n| n.is_finite() && (0.0..=1_000_000.0).contains(n))
        else {
            continue;
        };
        recorded[slot] = true;

        match key {
            "fps" => {
                metrics::histogram!(
                    "aetheris_wasm_fps",
                    "client_type" => "wasm_playground"
                )
                .record(num);
            }
            "frame_p99" => {
                metrics::histogram!(
                    "aetheris_wasm_frame_time_ms",
                    "client_type" => "wasm_playground"
                )
                .record(num);
            }
            "sim_p99" => {
                metrics::histogram!(
                    "aetheris_wasm_sim_time_ms",
                    "client_type" => "wasm_playground"
                )
                .record(num);
            }
            "rtt" => {
                metrics::histogram!(
                    "aetheris_wasm_rtt_ms",
                    "client_type" => "wasm_playground"
                )
                .record(num);
            }
            "entities" => {
                metrics::gauge!(
                    "aetheris_wasm_entity_count",
                    "client_type" => "wasm_playground"
                )
                .set(num);
            }
            "dropped" => {
                // `num` was validated to lie in 0..=1_000_000, so the cast only
                // drops the fractional part.
                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
                let count = num as u64;
                metrics::counter!(
                    "aetheris_wasm_telemetry_dropped_total",
                    "client_type" => "wasm_playground"
                )
                .increment(count);
            }
            _ => {}
        }
    }
}
/// One event in a JSON telemetry batch (the HTTP counterpart of the gRPC event).
///
/// `json_telemetry_handler` passes each text field through `sanitize` before
/// logging it.
#[derive(Debug, Deserialize)]
pub struct JsonTelemetryEvent {
    /// Event time in milliseconds (presumably client Unix-epoch ms — TODO confirm).
    /// The handler converts it with a saturating `as u64` cast before logging.
    pub timestamp_ms: f64,
    /// Raw level number: `3` is logged as error, `2` as warn, anything else as info.
    pub level: u32,
    /// Log target reported by the client; truncated to `MAX_FIELD_LEN` bytes when logged.
    pub target: String,
    /// Free-text message; truncated to `MAX_FIELD_LEN` bytes when logged. When
    /// `span_name` is `metrics_snapshot`, the raw message is also parsed for metrics.
    pub message: String,
    /// Optional round-trip time (presumably milliseconds); `None` when the field is absent.
    #[serde(default)]
    pub rtt_ms: Option<f64>,
    /// Trace identifier; truncated to 64 bytes when logged.
    pub trace_id: String,
    /// Span name; truncated to 128 bytes when logged.
    pub span_name: String,
}
/// Body of a JSON telemetry submission.
#[derive(Debug, Deserialize)]
pub struct JsonTelemetryBatch {
    /// Events to ingest; the handler rejects batches with more than `MAX_BATCH_SIZE` events.
    pub events: Vec<JsonTelemetryEvent>,
    /// Client session identifier; the handler rejects values longer than 128 bytes.
    pub session_id: String,
}
#[allow(clippy::unused_async)]
pub async fn json_telemetry_handler(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
axum::extract::State(svc): axum::extract::State<AetherisTelemetryService>,
Json(batch): Json<JsonTelemetryBatch>,
) -> impl IntoResponse {
let ip = addr.ip();
if !svc.check_rate_limit(ip) {
return (StatusCode::TOO_MANY_REQUESTS, "rate limit exceeded").into_response();
}
if batch.events.len() > MAX_BATCH_SIZE {
return (StatusCode::BAD_REQUEST, "batch too large").into_response();
}
if batch.session_id.len() > 128 {
return (StatusCode::BAD_REQUEST, "session_id too long").into_response();
}
let session_id = sanitize(&batch.session_id, 128);
let first_trace_id = batch
.events
.first()
.map(|e| sanitize(&e.trace_id, 64))
.unwrap_or_default();
let _span = tracing::info_span!(
"wasm_telemetry",
session_id = %session_id,
trace_id = %first_trace_id,
event_count = batch.events.len(),
)
.entered();
metrics::counter!(
"aetheris_wasm_telemetry_batches_total",
"client_type" => "wasm_playground"
)
.increment(1);
for event in &batch.events {
let target = sanitize(&event.target, MAX_FIELD_LEN);
let message = sanitize(&event.message, MAX_FIELD_LEN);
let trace_id = sanitize(&event.trace_id, 64);
let span_name = sanitize(&event.span_name, 128);
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
let ts = event.timestamp_ms as u64;
if event.span_name == "metrics_snapshot" {
record_wasm_metrics(&event.message);
}
let level_label = match event.level {
3 => "error",
2 => "warn",
_ => "info",
};
metrics::counter!(
"aetheris_wasm_telemetry_events_total",
"client_type" => "wasm_playground",
"level" => level_label,
)
.increment(1);
match event.level {
3 => tracing::error!(
session_id = %session_id,
trace_id = %trace_id,
span_name = %span_name,
target = %target,
timestamp_ms = ts,
rtt_ms = ?event.rtt_ms,
"wasm: {}", message
),
2 => tracing::warn!(
session_id = %session_id,
trace_id = %trace_id,
span_name = %span_name,
target = %target,
timestamp_ms = ts,
rtt_ms = ?event.rtt_ms,
"wasm: {}", message
),
_ => tracing::info!(
session_id = %session_id,
trace_id = %trace_id,
span_name = %span_name,
target = %target,
timestamp_ms = ts,
rtt_ms = ?event.rtt_ms,
"wasm: {}", message
),
}
}
(StatusCode::OK, "accepted").into_response()
}
#[cfg(test)]
mod tests {
    use super::{AetherisTelemetryService, RATE_LIMIT_MAX, sanitize};
    use std::net::{IpAddr, Ipv4Addr};

    #[test]
    fn sanitize_strips_control_chars() {
        assert_eq!(sanitize("hello\x00\nworld", 512), "helloworld");
    }

    #[test]
    fn sanitize_truncates_at_boundary() {
        let long = "a".repeat(600);
        let result = sanitize(&long, 512);
        assert_eq!(result.len(), 512);
    }

    #[test]
    fn sanitize_handles_multibyte() {
        // "€" is 3 bytes in UTF-8. 170 whole characters (510 bytes) fit in a
        // 512-byte budget; a 171st would need 513 bytes.
        let s = "€".repeat(200);
        let result = sanitize(&s, 512);
        assert_eq!(result.len(), 510);
        assert_eq!(result.chars().count(), 170);
    }

    #[test]
    fn sanitize_never_splits_a_character() {
        assert_eq!(sanitize("ab€", 4), "ab");
        assert_eq!(sanitize("ab€", 5), "ab€");
        assert_eq!(sanitize("abc", 0), "");
    }

    #[test]
    fn rate_limit_allows_up_to_max() {
        let svc = AetherisTelemetryService::new();
        let ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4));
        for _ in 0..RATE_LIMIT_MAX {
            assert!(svc.check_rate_limit(ip));
        }
        assert!(!svc.check_rate_limit(ip));
    }

    #[test]
    fn rate_limit_is_per_ip() {
        let svc = AetherisTelemetryService::new();
        let noisy = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));
        let quiet = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2));
        for _ in 0..RATE_LIMIT_MAX {
            svc.check_rate_limit(noisy);
        }
        assert!(!svc.check_rate_limit(noisy));
        assert!(svc.check_rate_limit(quiet));
    }

    #[test]
    fn rate_limit_is_shared_between_clones() {
        let svc = AetherisTelemetryService::new();
        let twin = svc.clone();
        let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 3));
        for _ in 0..RATE_LIMIT_MAX {
            svc.check_rate_limit(ip);
        }
        assert!(!twin.check_rate_limit(ip));
    }

    #[test]
    fn rate_limit_resets_after_window() {
        let svc = AetherisTelemetryService::new();
        let ip = IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8));
        for _ in 0..RATE_LIMIT_MAX {
            svc.check_rate_limit(ip);
        }
        assert!(!svc.check_rate_limit(ip));
        // Backdate the window start so the window has elapsed.
        svc.rate_limits.entry(ip).and_modify(|e| e.1 = 0);
        assert!(svc.check_rate_limit(ip));
    }

    #[test]
    fn prune_drops_stale_entries_only() {
        let svc = AetherisTelemetryService::new();
        let stale = IpAddr::V4(Ipv4Addr::new(9, 9, 9, 9));
        let fresh = IpAddr::V4(Ipv4Addr::new(9, 9, 9, 10));
        svc.check_rate_limit(stale);
        svc.check_rate_limit(fresh);
        svc.rate_limits.entry(stale).and_modify(|e| e.1 = 0);
        svc.prune_expired_entries();
        assert!(!svc.rate_limits.contains_key(&stale));
        assert!(svc.rate_limits.contains_key(&fresh));
    }

    #[test]
    fn json_deserialization_handles_missing_rtt() {
        let json = r#"{
            "session_id": "01JSZG2XKQP4V3R8N0CDWM7HFT",
            "events": [
                {
                    "timestamp_ms": 123456789.0,
                    "level": 1,
                    "target": "test",
                    "message": "hello",
                    "trace_id": "trace1",
                    "span_name": "span1"
                }
            ]
        }"#;
        let batch: super::JsonTelemetryBatch =
            serde_json::from_str(json).expect("Should deserialize even without rtt_ms");
        assert!(batch.events[0].rtt_ms.is_none());
    }
}