#![allow(dead_code)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_assignments)]
#![allow(clippy::type_complexity)]
#![allow(clippy::new_without_default)]
#![allow(clippy::unnecessary_map_or)]
#![allow(clippy::needless_return)]
mod api;
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
Path, Query, State,
},
http::{header, HeaderValue, Method, StatusCode},
response::{
sse::{Event, KeepAlive, Sse},
IntoResponse, Response,
},
routing::{get, post},
Json, Router,
};
use chrono::Utc;
use clap::Parser;
use dashmap::DashMap;
use futures_util::{sink::SinkExt, stream, stream::StreamExt};
use rand::Rng;
use serde::{Deserialize, Serialize};
use sha2::Digest;
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::broadcast;
use tower_http::cors::CorsLayer;
// Top-level command-line interface of the `coreason` binary, parsed by clap.
// NOTE: plain `//` comments are used here on purpose. clap's derive macros turn
// `///` doc comments into help text, so adding them would change `--help` output.
#[derive(Parser, Debug)]
#[command(
name = "coreason",
version = "0.1.0",
about = "CoReason Zero-Trust Kinetic Execution Engine"
)]
struct Cli {
// The subcommand chosen on the command line; `main` dispatches on it.
#[clap(subcommand)]
command: Commands,
}
// Top-level subcommands of the CLI.
// NOTE: `//` (not `///`) comments are used so clap's generated help text is unchanged.
#[derive(clap::Subcommand, Debug)]
enum Commands {
// `start <action>`: launches a long-running service; see `StartCommands`.
Start {
#[clap(subcommand)]
action: StartCommands,
},
// `execute`: reads a JSON manifest from disk and POSTs it (plus the optional
// query) to the sidecar's `/api/v1/state/execute` endpoint.
Execute {
#[arg(help = "Path to a local JSON manifest to parse and dispatch")]
manifest_path: std::path::PathBuf,
#[arg(
short,
long,
help = "Inject a dynamic user query into the root agent node"
)]
query: Option<String>,
// When set, `main` prints a message and exits before reading the manifest.
#[arg(long, help = "Parses structural inputs without hooking telemetry")]
dry_run: bool,
},
// `compute`: runs one named execution-plane routine in-process and prints its result.
Compute {
// Name matched against the module list in `main` (e.g. "lmsr_prices", "worm_append").
#[arg(help = "The target execution plane module/function to run")]
module: String,
// Parsed as JSON; an invalid payload makes the process exit with status 1.
#[arg(help = "JSON payload containing input parameters")]
payload: String,
},
}
// Actions available under the `start` subcommand.
// NOTE: `//` (not `///`) comments are used so clap's generated help text is unchanged.
#[derive(clap::Subcommand, Debug)]
enum StartCommands {
// Starts the Axum HTTP/WebSocket ingress gateway.
Api {
// TCP port the gateway listens on (bound on 0.0.0.0 in `main`).
#[arg(short, long, default_value_t = 8080)]
port: u16,
// Base URL of the Python sidecar; stored in `GatewayState::sidecar_url`.
#[arg(long, default_value = "http://127.0.0.1:8000")]
python_sidecar: String,
},
// Starts the worker node; currently `main` only prints messages and then sleeps forever.
Node {
// When set, `main` prints a message and returns immediately.
#[arg(long, help = "Parses structural inputs without hooking telemetry")]
dry_run: bool,
},
}
/// Key identifying a WebSocket room; taken from the `/ws/:room_name` path segment.
type RoomName = String;
/// Sending half of a room's broadcast channel; every connected client subscribes to it.
type ClientTx = broadcast::Sender<Message>;
/// Shared application state, created in `main` and handed to the router inside an `Arc`.
struct GatewayState {
/// Live WebSocket rooms: room name -> broadcast sender used to fan messages out.
rooms: DashMap<RoomName, ClientTx>,
/// Base URL of the Python sidecar, taken from the `python_sidecar` CLI argument.
sidecar_url: String,
/// JSON metric records behind an async mutex. Not accessed in this part of the file;
/// presumably read/written by handlers in the `api` module (TODO confirm).
speculative_metrics: tokio::sync::Mutex<Vec<serde_json::Value>>,
/// JSON metric records behind an async mutex. Not accessed in this part of the file;
/// presumably read/written by handlers in the `api` module (TODO confirm).
active_inference_metrics: tokio::sync::Mutex<Vec<serde_json::Value>>,
}
/// JSON body accepted by `POST /telemetry`; `receive_telemetry` logs every field.
#[derive(Serialize, Deserialize, Debug)]
struct TelemetryPayload {
/// Identifier of the reporting node.
node_cid: String,
/// Name of the reported metric.
metric_name: String,
/// Value of the reported metric.
metric_value: f64,
/// Timestamp of the sample; logged with an `ns` suffix, so presumably nanoseconds.
timestamp_ns: u64,
}
/// JSON body accepted by `POST /actuator/execute`.
#[derive(Serialize, Deserialize, Debug)]
struct ActuatorPayload {
/// Path of the plugin passed to `WasmDispatcher::execute_plugin`.
plugin_path: String,
/// Input string passed through to the plugin's `run_actuator` entry point.
input_data: String,
}
/// Generic JSON response envelope returned by the telemetry and actuator handlers.
#[derive(Serialize, Deserialize, Debug)]
struct GenericResponse {
/// Outcome tag; the handlers in this file use `"success"` or `"error"`.
status: String,
/// Human-readable detail, or the plugin output / error text for the actuator endpoint.
message: String,
}
/// Request payload for executing a capability/tool.
///
/// Field names are (de)serialized in camelCase, so `tool_name` appears as `toolName`
/// on the wire. Not referenced in this part of the file; presumably consumed by the
/// `api` module (TODO confirm).
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct CapabilityExecutePayload {
/// Name of the tool to run (`toolName` in JSON).
tool_name: String,
/// Arbitrary JSON value; schema is not defined here.
intent: serde_json::Value,
/// Arbitrary JSON value; schema is not defined here.
state: serde_json::Value,
}
/// Metric record with every field optional, so partial JSON objects deserialize.
///
/// Not referenced in this part of the file. Judging by the field names it describes
/// speculative-decoding statistics (TODO confirm against the producer).
#[derive(Serialize, Deserialize, Debug, Clone)]
struct SpeculativeMetricPayload {
draft_tokens: Option<u64>,
accepted_tokens: Option<u64>,
valid_at_1: Option<bool>,
// Unit taken from the `_ms` suffix: presumably milliseconds.
draft_model_latency_ms: Option<f64>,
// Unit taken from the `_ms` suffix: presumably milliseconds.
target_model_latency_ms: Option<f64>,
batch_size: Option<u64>,
}
/// Metric record with every field optional, so partial JSON objects deserialize.
///
/// Not referenced in this part of the file. Judging by the field names it carries
/// active-inference quantities for one agent (TODO confirm against the producer).
#[derive(Serialize, Deserialize, Debug, Clone)]
struct ActiveInferenceMetricPayload {
variational_free_energy: Option<f64>,
expected_free_energy: Option<f64>,
// Dimensionality is not constrained here.
policy_coordinates: Option<Vec<f64>>,
epistemic_value: Option<f64>,
pragmatic_value: Option<f64>,
agent_id: Option<String>,
}
/// Deserializable query parameters with a single optional `intent` value.
///
/// Not referenced in this part of the file; presumably used with Axum's `Query`
/// extractor elsewhere (TODO confirm).
#[derive(Deserialize)]
struct TelemetryQuery {
intent: Option<String>,
}
#[tokio::main]
async fn main() {
let cli = Cli::parse();
match cli.command {
Commands::Start { action } => match action {
StartCommands::Api {
port,
python_sidecar,
} => {
println!("Initializing CoReason SOTA 2026 Rust/Axum Ingress Gateway...");
println!("Listening on port: {}", port);
println!(
"Proxying dynamic reasoning payloads to Python sidecar at: {}",
python_sidecar
);
let state = Arc::new(GatewayState {
rooms: DashMap::new(),
sidecar_url: python_sidecar,
speculative_metrics: tokio::sync::Mutex::new(Vec::new()),
active_inference_metrics: tokio::sync::Mutex::new(Vec::new()),
});
let cors = CorsLayer::new()
.allow_origin("*".parse::<HeaderValue>().unwrap())
.allow_methods([Method::GET, Method::POST])
.allow_headers([header::CONTENT_TYPE]);
let app = Router::new()
.route("/health", get(health_check))
.route("/telemetry", post(receive_telemetry))
.route("/actuator/execute", post(execute_actuator))
.route("/ws/:room_name", get(websocket_handler))
.merge(api::build_api_router())
.layer(cors)
.with_state(state);
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
}
StartCommands::Node { dry_run } => {
println!("Starting Temporal Worker node...");
if dry_run {
println!("Dry run complete. Exiting.");
return;
}
println!("Connecting to Temporal cluster... (Mocked for Phase 1/2)");
loop {
tokio::time::sleep(Duration::from_secs(3600)).await;
}
}
},
Commands::Execute {
manifest_path,
query,
dry_run,
} => {
println!("Executing manifest at {:?}...", manifest_path);
if dry_run {
println!("Dry run complete. Exiting.");
return;
}
let content = std::fs::read_to_string(&manifest_path).unwrap_or_else(|e| {
eprintln!("Failed to read manifest file: {}", e);
std::process::exit(1);
});
let manifest_json: serde_json::Value =
serde_json::from_str(&content).unwrap_or_else(|e| {
eprintln!("Failed to parse manifest JSON: {}", e);
std::process::exit(1);
});
let sidecar_url = std::env::var("PYTHON_SIDECAR_URL")
.unwrap_or_else(|_| "http://127.0.0.1:8080".to_string());
let url = format!("{}/api/v1/state/execute", sidecar_url);
let payload = serde_json::json!({
"manifest": manifest_json,
"query": query,
});
println!(
"Dispatching manifest to execution engine sidecar at: {}",
url
);
match ureq::post(&url).send_json(&payload) {
Ok(res) => {
let body: serde_json::Value =
res.into_json().unwrap_or(serde_json::Value::Null);
println!("Manifest executed successfully: {:#?}", body);
}
Err(e) => {
eprintln!("Manifest execution failed: {}", e);
std::process::exit(1);
}
}
}
Commands::Compute { module, payload } => {
let val: serde_json::Value = serde_json::from_str(&payload).unwrap_or_else(|e| {
eprintln!("Invalid JSON payload: {}", e);
std::process::exit(1);
});
match module.as_str() {
"lmsr_prices" => {
let shares: Vec<f64> =
serde_json::from_value(val["shares"].clone()).unwrap_or_default();
let liquidity: f64 = val["liquidity"].as_f64().unwrap_or(100.0);
let mut market = coreason_runtime_rust::execution_plane::lmsr_consensus::LMSRMarketMaker::new(liquidity);
for (i, &s) in shares.iter().enumerate() {
let id = format!("outcome_{}", i);
let label = format!("Outcome {}", i);
market.outcomes.push(coreason_runtime_rust::execution_plane::lmsr_consensus::MarketOutcome::new(&id, &label));
market.outcomes[i].shares = s;
}
let prices: Vec<f64> = (0..shares.len()).map(|i| market.price(i)).collect();
println!("{}", serde_json::to_string(&prices).unwrap());
}
"consensus_score" => {
let agent_beliefs: std::collections::HashMap<String, Vec<f64>> =
serde_json::from_value(val["agent_beliefs"].clone()).unwrap_or_default();
let result = coreason_runtime_rust::execution_plane::lmsr_consensus::ConsensusMetrics::consensus_score(&agent_beliefs);
println!("{}", result);
}
"debate_round" => {
let agent_positions: std::collections::HashMap<String, Vec<f64>> =
serde_json::from_value(val["agent_positions"].clone()).unwrap_or_default();
let liquidity: f64 = val["liquidity"].as_f64().unwrap_or(100.0);
let mut round = coreason_runtime_rust::execution_plane::lmsr_consensus::DialecticDebateRound::new("cli_round", liquidity);
for (agent_id, position) in &agent_positions {
let confidence = position.first().copied().unwrap_or(0.5);
round.submit_argument(agent_id, "position", vec![], confidence);
}
let result = round.resolve_consensus();
println!("{}", result);
}
"compute_blast_radius" => {
let edges: Vec<(String, String)> =
serde_json::from_value(val["edges"].clone()).unwrap_or_default();
let target_node = val["target_node"].as_str().unwrap_or("").to_string();
let mut adjacency: std::collections::HashMap<String, Vec<String>> =
std::collections::HashMap::new();
for (parent, child) in &edges {
adjacency
.entry(parent.clone())
.or_default()
.push(child.clone());
adjacency.entry(child.clone()).or_default();
}
let result = coreason_runtime_rust::execution_plane::blast_radius::BlastRadiusCalculator::calculate_blast_radius(&target_node, &adjacency);
println!("{}", serde_json::to_string(&result).unwrap());
}
"compute_defeasibility_score" => {
let edges: Vec<(String, String)> =
serde_json::from_value(val["edges"].clone()).unwrap_or_default();
let target_node = val["target_node"].as_str().unwrap_or("").to_string();
let mut adjacency: std::collections::HashMap<String, Vec<String>> =
std::collections::HashMap::new();
for (parent, child) in &edges {
adjacency
.entry(parent.clone())
.or_default()
.push(child.clone());
adjacency.entry(child.clone()).or_default();
}
let score = coreason_runtime_rust::execution_plane::blast_radius::BlastRadiusCalculator::compute_defeasibility_score(&target_node, &adjacency);
println!("{}", score);
}
"worm_append" => {
let workflow_id = val["workflow_id"].as_str().unwrap_or("").to_string();
let delta = val["state_delta"].clone();
let ledger =
coreason_runtime_rust::execution_plane::worm_ledger::get_worm_ledger();
let mut guard = ledger.lock().unwrap();
let hash = guard.append(&workflow_id, &delta);
println!("{}", hash);
}
"worm_verify_chain" => {
let ledger =
coreason_runtime_rust::execution_plane::worm_ledger::get_worm_ledger();
let guard = ledger.lock().unwrap();
match guard.verify_chain() {
Ok(v) => println!("{}", v),
Err(e) => {
eprintln!("Verification failed: {}", e);
std::process::exit(2);
}
}
}
"worm_len" => {
let ledger =
coreason_runtime_rust::execution_plane::worm_ledger::get_worm_ledger();
let guard = ledger.lock().unwrap();
println!("{}", guard.len());
}
"worm_to_json" => {
let ledger =
coreason_runtime_rust::execution_plane::worm_ledger::get_worm_ledger();
let guard = ledger.lock().unwrap();
println!("{}", guard.to_json());
}
"verify_license" => {
let receipt_json = val["receipt_json"].as_str().unwrap_or("{}").to_string();
let public_key_hex = val["public_key_hex"].as_str().unwrap_or("").to_string();
let local_fingerprint =
val["local_fingerprint"].as_str().map(|s| s.to_string());
let receipt: coreason_runtime_rust::license::CommercialOverrideReceipt =
serde_json::from_str(&receipt_json).unwrap();
let verifier =
coreason_runtime_rust::license::LicenseVerifier::new(local_fingerprint);
let entitlements = verifier.verify_and_apply(&receipt, &public_key_hex);
println!("{}", serde_json::to_string(&entitlements).unwrap());
}
_ => {
eprintln!("Unknown compute module: {}", module);
std::process::exit(1);
}
}
}
}
}
/// Liveness probe for `GET /health`: always answers with a fixed banner string.
async fn health_check() -> &'static str {
    const HEALTH_BANNER: &str = "CoReason Ingress Gateway is healthy and running on Axum.";
    HEALTH_BANNER
}
async fn receive_telemetry(Json(payload): Json<TelemetryPayload>) -> impl IntoResponse {
println!(
"[TELEMETRY] Node: {} | {} = {} | Time: {}ns",
payload.node_cid, payload.metric_name, payload.metric_value, payload.timestamp_ns
);
(
StatusCode::ACCEPTED,
Json(GenericResponse {
status: "success".to_string(),
message: "Telemetry packet queued successfully.".to_string(),
}),
)
}
async fn execute_actuator(Json(payload): Json<ActuatorPayload>) -> Response {
println!(
"[ACTUATOR] Attempting execution of sandboxed WASM plugin: {}",
payload.plugin_path
);
match coreason_runtime_rust::wasm_dispatcher::WasmDispatcher::execute_plugin(
&payload.plugin_path,
"run_actuator",
&payload.input_data,
) {
Ok(output) => (
StatusCode::OK,
Json(GenericResponse {
status: "success".to_string(),
message: output,
}),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(GenericResponse {
status: "error".to_string(),
message: e,
}),
)
.into_response(),
}
}
/// Returns a fixed JSON array containing the single marker string
/// `"redirected_to_api_module"`. Not routed anywhere in this part of the file.
async fn get_capabilities() -> Response {
    let placeholder = serde_json::json!(["redirected_to_api_module"]);
    Json(placeholder).into_response()
}
/// Handler for `GET /ws/:room_name`: upgrades the request to a WebSocket and hands
/// the socket to `handle_socket_stream` for the requested room.
async fn websocket_handler(
    ws: WebSocketUpgrade,
    Path(room_name): Path<String>,
    State(state): State<Arc<GatewayState>>,
) -> impl IntoResponse {
    ws.on_upgrade(move |socket| async move {
        handle_socket_stream(socket, room_name, state).await;
    })
}
async fn handle_socket_stream(socket: WebSocket, room_name: String, state: Arc<GatewayState>) {
let (mut sender, mut receiver) = socket.split();
let rx = state
.rooms
.entry(room_name.clone())
.or_insert_with(|| {
let (tx, _rx) = broadcast::channel(128);
tx
})
.value()
.clone();
let mut broadcast_rx = rx.subscribe();
let tx_clone = rx.clone();
let mut write_task = tokio::spawn(async move {
while let Ok(msg) = broadcast_rx.recv().await {
if sender.send(msg).await.is_err() {
break;
}
}
});
let mut read_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
match msg {
Message::Text(_) | Message::Binary(_) => {
let _ = tx_clone.send(msg);
}
Message::Close(_) => {
break;
}
_ => {}
}
}
});
tokio::select! {
_ = (&mut read_task) => {
write_task.abort();
}
_ = (&mut write_task) => {
read_task.abort();
}
}
if let Some(entry) = state.rooms.get_mut(&room_name) {
if entry.receiver_count() == 0 {
drop(entry);
state.rooms.remove(&room_name);
}
}
}