use std::env;
use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use ed25519_dalek::{SigningKey, Signer};
use rand::rngs::OsRng;
/// Produces the canonical text of `source_code` that signatures are computed over.
///
/// The source is written to a randomly named file in the system temp directory and
/// passed to the Python module `coreason_meta_engineering.validation_worker`
/// (`canonicalize --code-path <file>`). The interpreter is read from the `PYTHON`
/// environment variable (default `python`), and `PYTHONPATH` is set to `<root>` and
/// `<root>/src`, where `<root>` is `COREASON_WORKSPACE_ROOT` (default `.`).
///
/// # Returns
///
/// The worker's stdout, lossily decoded as UTF-8, when the worker exits successfully.
/// If the temp file cannot be written, the worker cannot be started, or it exits with
/// a failure status, the reason is logged and the result of
/// `fallback_canonicalization` is returned instead.
pub fn canonicalize_source(source_code: &str) -> String {
    let temp_name = format!("coresig_temp_{}.py", rand::random::<u64>());
    let temp_path = env::temp_dir().join(temp_name);
    if let Err(e) = fs::write(&temp_path, source_code) {
        log::error!("Failed to write temp file for canonicalization: {}", e);
        return fallback_canonicalization(source_code);
    }

    let python_exe = env::var("PYTHON").unwrap_or_else(|_| "python".to_string());
    let workspace_root = env::var("COREASON_WORKSPACE_ROOT").unwrap_or_else(|_| ".".to_string());
    let path_separator = if cfg!(windows) { ";" } else { ":" };
    let python_path = format!("{}{}{}/src", workspace_root, path_separator, workspace_root);

    let output = Command::new(&python_exe)
        .arg("-m")
        .arg("coreason_meta_engineering.validation_worker")
        .arg("canonicalize")
        .arg("--code-path")
        .arg(&temp_path)
        .env("PYTHONPATH", python_path)
        .output();

    // Best-effort cleanup: a leftover temp file must not change the result.
    let _ = fs::remove_file(&temp_path);

    match output {
        Ok(out) if out.status.success() => String::from_utf8_lossy(&out.stdout).into_owned(),
        Ok(out) => {
            // Surface the worker's own diagnostics; without them a silent switch to the
            // fallback canonical form is impossible to debug.
            log::warn!(
                "Canonicalization worker exited with {}: {}",
                out.status,
                String::from_utf8_lossy(&out.stderr).trim()
            );
            log::warn!("Python canonicalization failed; falling back to basic cleaning.");
            fallback_canonicalization(source_code)
        }
        Err(e) => {
            log::warn!("Failed to launch canonicalization worker via {:?}: {}", python_exe, e);
            log::warn!("Python canonicalization failed; falling back to basic cleaning.");
            fallback_canonicalization(source_code)
        }
    }
}
/// Minimal canonicalization used when the Python worker is unavailable.
///
/// Drops every line whose trimmed text starts with `# CORESIG:`, joins the remaining
/// lines with `\n`, and rewrites any `\r\n` still present to `\n`. Because the lines are
/// re-joined, the input's final line terminator is not reproduced.
fn fallback_canonicalization(source_code: &str) -> String {
    source_code
        .lines()
        .filter(|line| !line.trim().starts_with("# CORESIG:"))
        .collect::<Vec<&str>>()
        .join("\n")
        .replace("\r\n", "\n")
}
pub fn sign_source_code(source_code: &str, private_key_hex: &str) -> String {
let private_bytes = match hex::decode(private_key_hex.trim()) {
Ok(b) => b,
Err(e) => {
log::error!("Failed to decode private key hex: {}", e);
return source_code.to_string();
}
};
if private_bytes.len() != 32 {
log::error!("Ed25519 private key must be exactly 32 bytes (64 hex characters), got {}", private_bytes.len());
return source_code.to_string();
}
let mut key_arr = [0u8; 32];
key_arr.copy_from_slice(&private_bytes);
let signing_key = SigningKey::from_bytes(&key_arr);
let canonical_source = canonicalize_source(source_code);
let signature = signing_key.sign(canonical_source.as_bytes());
let signature_hex = hex::encode(signature.to_bytes());
let base_code = source_code.trim_end();
format!("{}\n\n# CORESIG: {}\n", base_code, signature_hex)
}
/// Locates the hex-encoded private signing key, trying each source in order.
///
/// 1. `COREASON_SIGNING_KEY`: if its trimmed value names a readable file, the file's
///    trimmed contents are returned; otherwise the trimmed value itself is returned.
/// 2. Vault: when both `VAULT_ADDR` and `VAULT_TOKEN` are set, a GET request is sent to
///    `<VAULT_ADDR>/v1/secret/data/coreason/identity` and the string at
///    `data.data.private_key` in the JSON response is returned.
/// 3. Developer key: `<home>/.coreason/dev_signing_key.hex`, where `<home>` is
///    `COREASON_HOME` or, failing that, `dirs::home_dir()`. If the file does not exist a
///    fresh key pair is generated and written next to it (`dev_signing_key.pub`).
///
/// # Returns
///
/// `Some(key)` with the trimmed key text, or `None` when no source yields a key
/// (including when the developer key files cannot be written).
pub fn resolve_signing_key() -> Option<String> {
    if let Ok(key_env) = env::var("COREASON_SIGNING_KEY") {
        let key_env = key_env.trim();
        let path = Path::new(key_env);
        if path.is_file() {
            if let Ok(content) = fs::read_to_string(path) {
                return Some(content.trim().to_string());
            }
        }
        // Not a readable file: treat the value itself as the key material.
        return Some(key_env.to_string());
    }

    if let (Ok(url), Ok(token)) = (env::var("VAULT_ADDR"), env::var("VAULT_TOKEN")) {
        log::info!("Attempting to fetch signing key from Vault...");
        let client = reqwest::blocking::Client::new();
        let api_url = format!("{}/v1/secret/data/coreason/identity", url.trim_end_matches('/'));
        let response = client.get(&api_url).header("X-Vault-Token", token).send();
        match response {
            Ok(res) if res.status().is_success() => match res.json::<serde_json::Value>() {
                Ok(json) => {
                    if let Some(private_key) = json["data"]["data"]["private_key"].as_str() {
                        return Some(private_key.trim().to_string());
                    }
                    log::warn!("Vault response did not contain a string `private_key` field");
                }
                Err(e) => log::warn!("Failed to parse Vault response as JSON: {}", e),
            },
            Ok(res) => log::warn!("Vault query returned non-success status: {}", res.status()),
            Err(e) => log::warn!("Failed to query Vault: {}", e),
        }
    }

    let home = env::var("COREASON_HOME")
        .map(PathBuf::from)
        .ok()
        .or_else(dirs::home_dir)?;
    let dev_dir = home.join(".coreason");
    let dev_key_path = dev_dir.join("dev_signing_key.hex");
    let dev_pub_path = dev_dir.join("dev_signing_key.pub");

    if dev_key_path.is_file() {
        if let Ok(content) = fs::read_to_string(&dev_key_path) {
            return Some(content.trim().to_string());
        }
    }

    if let Err(e) = fs::create_dir_all(&dev_dir) {
        log::warn!("Failed to create dev dir {:?}: {}", dev_dir, e);
        return None;
    }

    let mut rng = OsRng;
    let signing_key = SigningKey::generate(&mut rng);
    let private_hex = hex::encode(signing_key.to_bytes());
    let public_hex = hex::encode(signing_key.verifying_key().to_bytes());

    // The private key must not be readable by other local users, so it is not written
    // with plain `fs::write` (which uses the default, umask-derived permissions).
    if let Err(e) = write_owner_only(&dev_key_path, &private_hex) {
        log::warn!("Failed to write dev signing key {:?}: {}", dev_key_path, e);
        return None;
    }
    if let Err(e) = fs::write(&dev_pub_path, &public_hex) {
        log::warn!("Failed to write dev public key {:?}: {}", dev_pub_path, e);
        return None;
    }

    log::warn!(
        "No COREASON_SIGNING_KEY found. Generated a new developer key-pair at:\n\
         Private: {:?}\n\
         Public: {:?}\n\
         Please register the public key with your runtime verification engine.",
        dev_key_path,
        dev_pub_path
    );
    Some(private_hex)
}

/// Writes `contents` to `path`, creating or truncating the file; on Unix a newly
/// created file gets mode `0o600` (owner read/write only).
fn write_owner_only(path: &Path, contents: &str) -> std::io::Result<()> {
    let mut options = fs::OpenOptions::new();
    options.write(true).create(true).truncate(true);
    #[cfg(unix)]
    {
        // Fully-qualified call so no extra trait import is needed at file level.
        std::os::unix::fs::OpenOptionsExt::mode(&mut options, 0o600);
    }
    let mut file = options.open(path)?;
    std::io::Write::write_all(&mut file, contents.as_bytes())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Signing with a well-formed 32-byte key keeps the original code and adds a
    /// `# CORESIG:` trailer.
    #[test]
    fn test_sign_source_code() {
        let key_hex = hex::encode([1u8; 32]);
        let source = "print('hello')\n";

        let output = sign_source_code(source, &key_hex);

        assert!(output.contains("print('hello')"));
        assert!(output.contains("# CORESIG:"));
    }
}