coreason-runtime 0.1.0

Kinetic Plane execution engine for the CoReason Tripartite Cybernetic Manifold
Documentation
// Copyright (c) 2026 CoReason, Inc.
// All rights reserved.

//! Telemetry API routes.
//!
//! Replaces `coreason_runtime/api/telemetry_router.py`,
//! `coreason_runtime/api/telemetry_broker.py`, and
//! `coreason_runtime/api/telemetry_flusher.py`.
//!
//! Handles speculative decoding metrics, active inference metrics,
//! broker stats, SSE telemetry streams, and auth verification streams.

use axum::{
    extract::{Query, State},
    http::StatusCode,
    response::{
        sse::{Event, KeepAlive, Sse},
        IntoResponse, Response,
    },
    routing::{get, post},
    Json, Router,
};
use chrono::Utc;
use futures_util::stream;
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};

use crate::GatewayState;

// ── Payload types ───────────────────────────────────────────────────────

/// Request body for `POST /api/telemetry/inference/speculative`.
///
/// Every field is optional; the recording handler substitutes a default for
/// anything the caller leaves out.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SpeculativeMetricPayload {
    /// Draft-token count for the batch; treated as `0` when absent. Serves as
    /// the denominator of the recorded acceptance rate.
    pub draft_tokens: Option<u64>,
    /// Accepted-token count for the batch; treated as `0` when absent. Serves
    /// as the numerator of the recorded acceptance rate.
    pub accepted_tokens: Option<u64>,
    /// Flag stored on the recorded metric (`false` when absent) and counted by
    /// the `valid_at_1_rate` summary.
    /// NOTE(review): presumably "first draft token was accepted" — confirm.
    pub valid_at_1: Option<bool>,
    /// Draft-model latency in milliseconds; treated as `0.0` when absent.
    /// Serves as the denominator of the recorded speedup ratio.
    pub draft_model_latency_ms: Option<f64>,
    /// Target-model latency in milliseconds; treated as `0.0` when absent.
    /// Serves as the numerator of the recorded speedup ratio.
    pub target_model_latency_ms: Option<f64>,
    /// Batch size stored on the recorded metric; treated as `1` when absent.
    pub batch_size: Option<u64>,
}

/// Request body for `POST /api/telemetry/inference/active`.
///
/// Every field is optional; the recording handler substitutes a default for
/// anything the caller leaves out.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ActiveInferenceMetricPayload {
    /// Stored as-is on the recorded metric; treated as `0.0` when absent.
    pub variational_free_energy: Option<f64>,
    /// Stored as-is on the recorded metric; treated as `0.0` when absent.
    pub expected_free_energy: Option<f64>,
    /// Stored as-is on the recorded metric; treated as an empty list when absent.
    pub policy_coordinates: Option<Vec<f64>>,
    /// Stored as-is on the recorded metric; treated as `0.0` when absent.
    pub epistemic_value: Option<f64>,
    /// Stored as-is on the recorded metric; treated as `0.0` when absent.
    pub pragmatic_value: Option<f64>,
    /// Identifier of the reporting agent; recorded as `"unknown"` when absent.
    pub agent_id: Option<String>,
}

/// Query-string parameters accepted by `GET /api/telemetry/stream`.
///
/// Derives `Debug` and `Clone` in line with the other public payload types in
/// this module.
#[derive(Deserialize, Debug, Clone)]
pub struct TelemetryQuery {
    /// When present (with any value) the stream emits the compiler-stage
    /// sequence; when absent it emits the lifecycle events instead.
    pub intent: Option<String>,
}

// ── Speculative metrics ─────────────────────────────────────────────────

/// POST /api/telemetry/inference/speculative
///
/// Record a speculative-decoding metrics batch.
async fn record_speculative_metrics(
    State(state): State<Arc<GatewayState>>,
    Json(payload): Json<SpeculativeMetricPayload>,
) -> Response {
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs_f64();

    let draft_tokens = payload.draft_tokens.unwrap_or(0);
    let accepted_tokens = payload.accepted_tokens.unwrap_or(0);
    let acceptance_rate = if draft_tokens > 0 {
        accepted_tokens as f64 / draft_tokens as f64
    } else {
        0.0
    };
    let draft_model_latency_ms = payload.draft_model_latency_ms.unwrap_or(0.0);
    let target_model_latency_ms = payload.target_model_latency_ms.unwrap_or(0.0);
    let speedup_ratio = if draft_model_latency_ms > 0.001 {
        target_model_latency_ms / draft_model_latency_ms
    } else {
        0.0
    };

    let metric = serde_json::json!({
        "timestamp": now,
        "draft_tokens": draft_tokens,
        "accepted_tokens": accepted_tokens,
        "valid_at_1": payload.valid_at_1.unwrap_or(false),
        "acceptance_rate": acceptance_rate,
        "draft_model_latency_ms": draft_model_latency_ms,
        "target_model_latency_ms": target_model_latency_ms,
        "batch_size": payload.batch_size.unwrap_or(1),
        "speedup_ratio": speedup_ratio,
    });

    state.speculative_metrics.lock().await.push(metric);

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "status": "recorded",
            "acceptance_rate": acceptance_rate,
        })),
    )
        .into_response()
}

/// GET /api/telemetry/inference/speculative
///
/// Returns up to 100 of the most recently recorded speculative-decoding
/// samples (newest first) under `metrics`, plus a `summary` aggregated over
/// every recorded sample. When nothing has been recorded yet, `metrics` is an
/// empty array and `summary` an empty object.
async fn get_speculative_metrics(State(state): State<Arc<GatewayState>>) -> Response {
    let recorded = state.speculative_metrics.lock().await;

    if recorded.is_empty() {
        let nothing_yet = serde_json::json!({
            "metrics": [],
            "summary": {},
        });
        return Json(nothing_yet).into_response();
    }

    let batch_count = recorded.len();
    let denominator = batch_count as f64;

    // Mean of one numeric field across all samples; a missing or non-numeric
    // value contributes 0.0.
    let mean_of = |field: &str| -> f64 {
        recorded
            .iter()
            .map(|sample| sample[field].as_f64().unwrap_or(0.0))
            .sum::<f64>()
            / denominator
    };

    let valid_hits = recorded
        .iter()
        .filter(|sample| matches!(sample["valid_at_1"].as_bool(), Some(true)))
        .count();

    let summary = serde_json::json!({
        "total_batches": batch_count,
        "valid_at_1_rate": valid_hits as f64 / denominator,
        "avg_acceptance_rate": mean_of("acceptance_rate"),
        "avg_speedup_ratio": mean_of("speedup_ratio"),
    });

    // Walk backwards from the latest sample and clone only what is returned.
    let newest: Vec<serde_json::Value> = recorded.iter().rev().take(100).cloned().collect();

    let body = serde_json::json!({
        "metrics": newest,
        "summary": summary,
    });
    Json(body).into_response()
}

// ── Active inference metrics ────────────────────────────────────────────

/// POST /api/telemetry/inference/active
///
/// Record an active-inference metrics sample.
async fn record_active_inference_metrics(
    State(state): State<Arc<GatewayState>>,
    Json(payload): Json<ActiveInferenceMetricPayload>,
) -> Response {
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs_f64();

    let metric = serde_json::json!({
        "timestamp": now,
        "variational_free_energy": payload.variational_free_energy.unwrap_or(0.0),
        "expected_free_energy": payload.expected_free_energy.unwrap_or(0.0),
        "policy_coordinates": payload.policy_coordinates.unwrap_or_default(),
        "epistemic_value": payload.epistemic_value.unwrap_or(0.0),
        "pragmatic_value": payload.pragmatic_value.unwrap_or(0.0),
        "agent_id": payload.agent_id.unwrap_or_else(|| "unknown".to_string()),
    });

    state.active_inference_metrics.lock().await.push(metric);

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "status": "recorded",
        })),
    )
        .into_response()
}

/// GET /api/telemetry/inference/active
///
/// Returns up to 100 of the most recently recorded active-inference samples
/// (newest first) under `metrics`, and the total number of recorded samples
/// under `count`.
async fn get_active_inference_metrics(State(state): State<Arc<GatewayState>>) -> Response {
    let recorded = state.active_inference_metrics.lock().await;

    // Walk backwards from the latest sample and clone only what is returned.
    let newest: Vec<serde_json::Value> = recorded.iter().rev().take(100).cloned().collect();
    let total = recorded.len();

    let body = serde_json::json!({
        "metrics": newest,
        "count": total,
    });
    Json(body).into_response()
}

// ── Broker stats ────────────────────────────────────────────────────────

/// GET /api/telemetry/broker/stats
///
/// Reports telemetry broker health. The values are fixed placeholders: the
/// broker is always `healthy` with zero subscribers and zero published events.
async fn get_broker_stats() -> Response {
    let snapshot = serde_json::json!({
        "status": "healthy",
        "subscribers": 0,
        "events_published": 0,
    });
    Json(snapshot).into_response()
}

// ── SSE streams ─────────────────────────────────────────────────────────

/// GET /api/telemetry/stream
///
/// Server-Sent Events feed with two modes, selected by the query string:
///
/// * `?intent=<value>` — emits one `COMPILER_STAGE` event per compiler stage,
///   each preceded by a 1.5 s pause, then keeps the connection open without
///   sending further data events.
/// * no `intent` — emits `CONNECTION_ESTABLISHED` immediately,
///   `SET_ORACLE_WORKFLOW` after 2 s, `AGENT_SUSPENDED` after a further 3 s,
///   and then a `PING` every 15 s.
///
/// Only the presence of `intent` matters; its value is not inspected.
async fn get_telemetry_stream(
    Query(query): Query<TelemetryQuery>,
) -> Sse<impl stream::Stream<Item = Result<Event, std::convert::Infallible>>> {
    // Compiler pipeline stages, in emission order.
    const COMPILER_STAGES: [&str; 7] = [
        "DECOMPOSING",
        "RESOLVING_URNS",
        "FORGING_GAPS",
        "ASSEMBLING",
        "HITL_REVIEW",
        "VALIDATING_QUALITY",
        "PUBLISHED",
    ];

    // Unfold state: index of the next event to emit, plus the caller's intent.
    let events = stream::unfold((0usize, query.intent), |(step, intent)| async move {
        // Plan the next event: the pause before it (if any), its JSON body, and
        // the step index to resume from afterwards.
        let (pause, body, next_step): (Option<Duration>, serde_json::Value, usize) =
            if intent.is_some() {
                match COMPILER_STAGES.get(step).copied() {
                    Some(stage) => (
                        Some(Duration::from_millis(1500)),
                        serde_json::json!({
                            "type": "COMPILER_STAGE",
                            "payload": stage,
                        }),
                        step + 1,
                    ),
                    // Every stage has been sent: park forever so the
                    // connection stays open without emitting more data.
                    None => loop {
                        tokio::time::sleep(Duration::from_secs(3600)).await;
                    },
                }
            } else {
                match step {
                    0 => (
                        None,
                        serde_json::json!({
                            "type": "CONNECTION_ESTABLISHED",
                            "payload": "Connected to CoReason Sensory Telemetry Stream",
                        }),
                        1,
                    ),
                    1 => (
                        Some(Duration::from_secs(2)),
                        serde_json::json!({
                            "type": "SET_ORACLE_WORKFLOW",
                            "payload": "wf-sensory-active-inference-8822",
                        }),
                        2,
                    ),
                    2 => (
                        Some(Duration::from_secs(3)),
                        serde_json::json!({
                            "type": "AGENT_SUSPENDED",
                            "payload": {
                                "workflowId": "wf-sensory-active-inference-8822",
                                "isAgentDriving": true,
                                "latentState": {
                                    "belief_state": [0.85, 0.15],
                                    "action_space": ["query_oracle", "commit_state"],
                                    "policy_precision": 16.5,
                                    "free_energy": 0.42,
                                },
                                "intent": {
                                    "intent_type": "AdjudicationIntent",
                                    "arguments": {
                                        "expression": "2 * 3.14159 * 10",
                                        "reasoning": "Determining boundary sensory coordinates",
                                    },
                                    "domain_extensions": {
                                        "VerificationYield": {
                                            "accuracy_threshold": 0.99,
                                            "attestation_required": true,
                                            "fallback_policy": "quarantine",
                                        }
                                    },
                                },
                            },
                        }),
                        3,
                    ),
                    // Steady state: the step index no longer advances.
                    _ => (
                        Some(Duration::from_secs(15)),
                        serde_json::json!({
                            "type": "PING",
                            "payload": {},
                        }),
                        step,
                    ),
                }
            };

        if let Some(delay) = pause {
            tokio::time::sleep(delay).await;
        }

        let event = Event::default().data(body.to_string());
        Some((
            Ok::<_, std::convert::Infallible>(event),
            (next_step, intent),
        ))
    });

    Sse::new(events).keep_alive(KeepAlive::default())
}

/// GET /api/v1/auth/verifications
///
/// SSE stream of zero-knowledge proof caging verification receipts.
///
/// The receipts are synthetic: every five seconds one is emitted with a
/// randomly chosen proof status, a random six-digit transaction id and a
/// random 160-bit (40 hex digit) proof hash. `sdJwtCompliance` is `true` only
/// for the `VALIDATED` status. The stream never ends on its own.
async fn get_caging_verifications(
) -> Sse<impl stream::Stream<Item = Result<Event, std::convert::Infallible>>> {
    const PROOF_STATUSES: [&str; 3] = ["VALIDATED", "FORGED", "INVALID"];

    let receipts = stream::unfold((), |()| async {
        tokio::time::sleep(Duration::from_secs(5)).await;

        // The RNG is created after the sleep and there is no later `.await`
        // in this block, so it is never held across a suspension point
        // (`ThreadRng` is not `Send`).
        let mut rng = rand::thread_rng();

        // Index bound follows the table so the two cannot drift apart.
        let next_status = PROOF_STATUSES[rng.gen_range(0..PROOF_STATUSES.len())];
        let is_compliant = next_status == "VALIDATED";

        // Inclusive range: every six-digit id, including 999999, is reachable.
        let random_tx = format!("tx_sol_{}", rng.gen_range(100_000..=999_999));

        // A `u128` covers only 32 hex digits, so padding it to 40 would leave a
        // constant `00000000` prefix. Draw 32 + 128 bits to fill all 40 digits.
        let hash_hi: u32 = rng.gen();
        let hash_lo: u128 = rng.gen();
        let random_hash = format!("0x{:08x}{:032x}", hash_hi, hash_lo);

        let timestamp = Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();

        let receipt = serde_json::json!({
            "transactionId": random_tx,
            "proofStatus": next_status,
            "originatingAgentUrn": "urn:coreason:agent:security-auditor:01h3x",
            "verificationTimestamp": timestamp,
            "zkProofHash": random_hash,
            "sdJwtCompliance": is_compliant,
        });

        let event = Event::default().data(receipt.to_string());
        Some((Ok::<_, std::convert::Infallible>(event), ()))
    });

    Sse::new(receipts).keep_alive(KeepAlive::default())
}

// ── Router ──────────────────────────────────────────────────────────────

/// Assemble the telemetry routes.
///
/// Mounts the speculative and active-inference metric endpoints (GET and POST
/// each), the broker stats endpoint, and the two SSE streams. The returned
/// router still expects an `Arc<GatewayState>` to be supplied as its state.
pub fn router() -> Router<Arc<GatewayState>> {
    const SPECULATIVE_PATH: &str = "/api/telemetry/inference/speculative";
    const ACTIVE_PATH: &str = "/api/telemetry/inference/active";
    const BROKER_STATS_PATH: &str = "/api/telemetry/broker/stats";
    const TELEMETRY_STREAM_PATH: &str = "/api/telemetry/stream";
    const VERIFICATIONS_PATH: &str = "/api/v1/auth/verifications";

    Router::new()
        .route(
            SPECULATIVE_PATH,
            get(get_speculative_metrics).post(record_speculative_metrics),
        )
        .route(
            ACTIVE_PATH,
            get(get_active_inference_metrics).post(record_active_inference_metrics),
        )
        .route(BROKER_STATS_PATH, get(get_broker_stats))
        .route(TELEMETRY_STREAM_PATH, get(get_telemetry_stream))
        .route(VERIFICATIONS_PATH, get(get_caging_verifications))
}