use axum::{
extract::{Query, State},
http::StatusCode,
response::{
sse::{Event, KeepAlive, Sse},
IntoResponse, Response,
},
routing::{get, post},
Json, Router,
};
use chrono::Utc;
use futures_util::stream;
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use crate::GatewayState;
/// JSON body accepted by `record_speculative_metrics`.
///
/// Every field is optional; the handler substitutes a default for any field that is
/// missing before the record is stored.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpeculativeMetricPayload {
    /// Denominator of the stored `acceptance_rate`; treated as `0` when absent.
    pub draft_tokens: Option<u64>,
    /// Numerator of the stored `acceptance_rate`; treated as `0` when absent.
    pub accepted_tokens: Option<u64>,
    /// Stored as-is (`false` when absent) and aggregated into `valid_at_1_rate`
    /// by the GET handler.
    pub valid_at_1: Option<bool>,
    /// Divisor of the stored `speedup_ratio`; treated as `0.0` when absent.
    pub draft_model_latency_ms: Option<f64>,
    /// Dividend of the stored `speedup_ratio`; treated as `0.0` when absent.
    pub target_model_latency_ms: Option<f64>,
    /// Stored alongside the record; treated as `1` when absent.
    pub batch_size: Option<u64>,
}
/// JSON body accepted by `record_active_inference_metrics`.
///
/// Every field is optional; the handler stores a default in place of any field that is
/// missing (`0.0` for numbers, an empty list for coordinates, `"unknown"` for the agent id).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActiveInferenceMetricPayload {
    /// Stored verbatim; `0.0` when absent.
    pub variational_free_energy: Option<f64>,
    /// Stored verbatim; `0.0` when absent.
    pub expected_free_energy: Option<f64>,
    /// Stored verbatim; an empty list when absent.
    pub policy_coordinates: Option<Vec<f64>>,
    /// Stored verbatim; `0.0` when absent.
    pub epistemic_value: Option<f64>,
    /// Stored verbatim; `0.0` when absent.
    pub pragmatic_value: Option<f64>,
    /// Stored verbatim; `"unknown"` when absent.
    pub agent_id: Option<String>,
}
/// Query-string parameters of the telemetry stream endpoint.
#[derive(Deserialize)]
pub struct TelemetryQuery {
    /// Selects which scripted event sequence `get_telemetry_stream` emits.
    /// Only the presence of the parameter matters there; its value is not inspected.
    pub intent: Option<String>,
}
async fn record_speculative_metrics(
State(state): State<Arc<GatewayState>>,
Json(payload): Json<SpeculativeMetricPayload>,
) -> Response {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64();
let draft_tokens = payload.draft_tokens.unwrap_or(0);
let accepted_tokens = payload.accepted_tokens.unwrap_or(0);
let acceptance_rate = if draft_tokens > 0 {
accepted_tokens as f64 / draft_tokens as f64
} else {
0.0
};
let draft_model_latency_ms = payload.draft_model_latency_ms.unwrap_or(0.0);
let target_model_latency_ms = payload.target_model_latency_ms.unwrap_or(0.0);
let speedup_ratio = if draft_model_latency_ms > 0.001 {
target_model_latency_ms / draft_model_latency_ms
} else {
0.0
};
let metric = serde_json::json!({
"timestamp": now,
"draft_tokens": draft_tokens,
"accepted_tokens": accepted_tokens,
"valid_at_1": payload.valid_at_1.unwrap_or(false),
"acceptance_rate": acceptance_rate,
"draft_model_latency_ms": draft_model_latency_ms,
"target_model_latency_ms": target_model_latency_ms,
"batch_size": payload.batch_size.unwrap_or(1),
"speedup_ratio": speedup_ratio,
});
state.speculative_metrics.lock().await.push(metric);
(
StatusCode::OK,
Json(serde_json::json!({
"status": "recorded",
"acceptance_rate": acceptance_rate,
})),
)
.into_response()
}
/// Handles `GET /api/telemetry/inference/speculative`.
///
/// Responds with up to 100 of the most recently stored records (newest first) under
/// `"metrics"` and, under `"summary"`, aggregates computed over *all* stored records:
/// the record count, the share with `valid_at_1 == true`, and the mean acceptance rate
/// and speedup ratio. When nothing has been recorded both entries are empty.
async fn get_speculative_metrics(State(state): State<Arc<GatewayState>>) -> Response {
    let store = state.speculative_metrics.lock().await;

    let body = if store.is_empty() {
        serde_json::json!({
            "metrics": [],
            "summary": {},
        })
    } else {
        let batch_count = store.len();
        let denominator = batch_count as f64;

        // Mean of a numeric field across every record; non-numeric values count as 0.0.
        let mean_of = |field: &str| {
            store
                .iter()
                .map(|entry| entry[field].as_f64().unwrap_or(0.0))
                .sum::<f64>()
                / denominator
        };

        let valid_hits = store
            .iter()
            .filter(|entry| entry["valid_at_1"].as_bool().unwrap_or(false))
            .count();

        // Walk backwards so the newest records come first, cloning only those returned.
        let recent: Vec<serde_json::Value> = store.iter().rev().take(100).cloned().collect();

        serde_json::json!({
            "metrics": recent,
            "summary": {
                "total_batches": batch_count,
                "valid_at_1_rate": valid_hits as f64 / denominator,
                "avg_acceptance_rate": mean_of("acceptance_rate"),
                "avg_speedup_ratio": mean_of("speedup_ratio"),
            },
        })
    };

    Json(body).into_response()
}
async fn record_active_inference_metrics(
State(state): State<Arc<GatewayState>>,
Json(payload): Json<ActiveInferenceMetricPayload>,
) -> Response {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64();
let metric = serde_json::json!({
"timestamp": now,
"variational_free_energy": payload.variational_free_energy.unwrap_or(0.0),
"expected_free_energy": payload.expected_free_energy.unwrap_or(0.0),
"policy_coordinates": payload.policy_coordinates.unwrap_or_default(),
"epistemic_value": payload.epistemic_value.unwrap_or(0.0),
"pragmatic_value": payload.pragmatic_value.unwrap_or(0.0),
"agent_id": payload.agent_id.unwrap_or_else(|| "unknown".to_string()),
});
state.active_inference_metrics.lock().await.push(metric);
(
StatusCode::OK,
Json(serde_json::json!({
"status": "recorded",
})),
)
.into_response()
}
/// Handles `GET /api/telemetry/inference/active`.
///
/// Responds with up to 100 of the most recently stored records (newest first) under
/// `"metrics"`, plus the total number of stored records under `"count"`.
async fn get_active_inference_metrics(State(state): State<Arc<GatewayState>>) -> Response {
    let store = state.active_inference_metrics.lock().await;

    let total = store.len();
    // Walk backwards so the newest records come first, cloning only those returned.
    let recent: Vec<serde_json::Value> = store.iter().rev().take(100).cloned().collect();

    let body = serde_json::json!({
        "metrics": recent,
        "count": total,
    });
    Json(body).into_response()
}
/// Handles `GET /api/telemetry/broker/stats`.
///
/// The response is a fixed JSON document: the values below are literals and are not
/// read from any broker state.
async fn get_broker_stats() -> Response {
    let body = serde_json::json!({
        "status": "healthy",
        "subscribers": 0,
        "events_published": 0,
    });
    Json(body).into_response()
}
/// Handles `GET /api/telemetry/stream`: a scripted server-sent-event feed.
///
/// * With an `intent` query parameter, each of the seven compiler stages is emitted as a
///   `COMPILER_STAGE` event, every one preceded by a 1.5 s pause. Once the last stage has
///   been sent no further events are produced, but the stream is never closed.
/// * Without it, the feed emits `CONNECTION_ESTABLISHED` immediately,
///   `SET_ORACLE_WORKFLOW` after 2 s, `AGENT_SUSPENDED` after a further 3 s, and from then
///   on a `PING` every 15 s.
///
/// Only the presence of `intent` matters; its value is carried along but never inspected.
/// The response uses the default SSE keep-alive settings.
async fn get_telemetry_stream(
    Query(query): Query<TelemetryQuery>,
) -> Sse<impl stream::Stream<Item = Result<Event, std::convert::Infallible>>> {
    // What `stream::unfold` expects from each step: the emitted item plus the next state.
    type Step = Option<(
        Result<Event, std::convert::Infallible>,
        (usize, Option<String>),
    )>;

    const COMPILER_STAGES: [&str; 7] = [
        "DECOMPOSING",
        "RESOLVING_URNS",
        "FORGING_GAPS",
        "ASSEMBLING",
        "HITL_REVIEW",
        "VALIDATING_QUALITY",
        "PUBLISHED",
    ];

    let stream = stream::unfold((0usize, query.intent), |(step, intent)| async move {
        // For the current step: how long to wait, what to send, and which step follows.
        let (delay, payload, next_step) = if intent.is_some() {
            match COMPILER_STAGES.get(step) {
                Some(stage) => (
                    Some(Duration::from_millis(1500)),
                    serde_json::json!({
                        "type": "COMPILER_STAGE",
                        "payload": stage,
                    }),
                    step + 1,
                ),
                // Script finished: park forever without ending the stream, so the
                // connection stays open and no periodic wake-ups are scheduled.
                None => return std::future::pending::<Step>().await,
            }
        } else {
            match step {
                0 => (
                    None,
                    serde_json::json!({
                        "type": "CONNECTION_ESTABLISHED",
                        "payload": "Connected to CoReason Sensory Telemetry Stream",
                    }),
                    1,
                ),
                1 => (
                    Some(Duration::from_secs(2)),
                    serde_json::json!({
                        "type": "SET_ORACLE_WORKFLOW",
                        "payload": "wf-sensory-active-inference-8822",
                    }),
                    2,
                ),
                2 => (
                    Some(Duration::from_secs(3)),
                    serde_json::json!({
                        "type": "AGENT_SUSPENDED",
                        "payload": {
                            "workflowId": "wf-sensory-active-inference-8822",
                            "isAgentDriving": true,
                            "latentState": {
                                "belief_state": [0.85, 0.15],
                                "action_space": ["query_oracle", "commit_state"],
                                "policy_precision": 16.5,
                                "free_energy": 0.42,
                            },
                            "intent": {
                                "intent_type": "AdjudicationIntent",
                                "arguments": {
                                    "expression": "2 * 3.14159 * 10",
                                    "reasoning": "Determining boundary sensory coordinates",
                                },
                                "domain_extensions": {
                                    "VerificationYield": {
                                        "accuracy_threshold": 0.99,
                                        "attestation_required": true,
                                        "fallback_policy": "quarantine",
                                    }
                                },
                            },
                        },
                    }),
                    3,
                ),
                // Steady state: keep pinging and stay on the same step.
                _ => (
                    Some(Duration::from_secs(15)),
                    serde_json::json!({
                        "type": "PING",
                        "payload": {},
                    }),
                    step,
                ),
            }
        };

        if let Some(pause) = delay {
            tokio::time::sleep(pause).await;
        }

        let event = Event::default().data(payload.to_string());
        // `intent` is moved back into the state as-is; no per-event clone is needed.
        Some((Ok::<_, std::convert::Infallible>(event), (next_step, intent)))
    });

    Sse::new(stream).keep_alive(KeepAlive::default())
}
/// Handles `GET /api/v1/auth/verifications`: an endless SSE feed of randomly generated
/// verification receipts, one every 5 seconds.
///
/// Each receipt carries a random six-digit transaction id, a proof status drawn from
/// `PROOF_STATUSES`, a fixed originating agent URN, the current UTC time, a random
/// 40-hex-digit hash, and `sdJwtCompliance`, which is `true` only for `"VALIDATED"`.
/// The response uses the default SSE keep-alive settings.
async fn get_caging_verifications(
) -> Sse<impl stream::Stream<Item = Result<Event, std::convert::Infallible>>> {
    const PROOF_STATUSES: [&str; 3] = ["VALIDATED", "FORGED", "INVALID"];

    let stream = stream::unfold((), |()| async {
        tokio::time::sleep(Duration::from_secs(5)).await;

        // Create the thread-local RNG only after the last `.await`: `ThreadRng` is not
        // `Send`, so it must not be held across a suspension point.
        let mut rng = rand::thread_rng();

        // Index range follows the array length so adding a status cannot go out of sync.
        let proof_status = PROOF_STATUSES[rng.gen_range(0..PROOF_STATUSES.len())];
        let is_compliant = proof_status == "VALIDATED";

        // Inclusive upper bound so every six-digit id, including 999999, can occur.
        let transaction_id = format!("tx_sol_{}", rng.gen_range(100_000..=999_999));

        // 32 + 128 random bits fill all 40 hex digits; a lone u128 padded to width 40
        // would always start with eight zeros.
        let proof_hash = format!("0x{:08x}{:032x}", rng.gen::<u32>(), rng.gen::<u128>());

        let verified_at = Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();

        let receipt = serde_json::json!({
            "transactionId": transaction_id,
            "proofStatus": proof_status,
            "originatingAgentUrn": "urn:coreason:agent:security-auditor:01h3x",
            "verificationTimestamp": verified_at,
            "zkProofHash": proof_hash,
            "sdJwtCompliance": is_compliant,
        });

        let event = Event::default().data(receipt.to_string());
        Some((Ok::<_, std::convert::Infallible>(event), ()))
    });

    Sse::new(stream).keep_alive(KeepAlive::default())
}
/// Builds the telemetry router.
///
/// Routes:
/// * `POST`/`GET /api/telemetry/inference/speculative` — record / read speculative metrics
/// * `POST`/`GET /api/telemetry/inference/active` — record / read active-inference metrics
/// * `GET /api/telemetry/broker/stats` — broker status document
/// * `GET /api/telemetry/stream` — scripted telemetry SSE feed
/// * `GET /api/v1/auth/verifications` — verification-receipt SSE feed
pub fn router() -> Router<Arc<GatewayState>> {
    const SPECULATIVE_PATH: &str = "/api/telemetry/inference/speculative";
    const ACTIVE_INFERENCE_PATH: &str = "/api/telemetry/inference/active";
    const BROKER_STATS_PATH: &str = "/api/telemetry/broker/stats";
    const STREAM_PATH: &str = "/api/telemetry/stream";
    const VERIFICATIONS_PATH: &str = "/api/v1/auth/verifications";

    let speculative = post(record_speculative_metrics).get(get_speculative_metrics);
    let active_inference =
        post(record_active_inference_metrics).get(get_active_inference_metrics);

    Router::new()
        .route(SPECULATIVE_PATH, speculative)
        .route(ACTIVE_INFERENCE_PATH, active_inference)
        .route(BROKER_STATS_PATH, get(get_broker_stats))
        .route(STREAM_PATH, get(get_telemetry_stream))
        .route(VERIFICATIONS_PATH, get(get_caging_verifications))
}