use base64::{engine::general_purpose::STANDARD, Engine};
use ed25519_dalek::SigningKey;
use invariant_robotics::authority::crypto::sign_pca;
use invariant_robotics::models::authority::Pca;
use invariant_robotics::models::command::Command;
/// Replaces `cmd.authority.pca_chain` with a freshly signed, single-entry PCA chain.
///
/// The one [`Pca`] in the chain is built from:
/// * `p_0` and `kid`, copied from the arguments,
/// * `ops`, cloned from `cmd.authority.required_ops`,
/// * no `exp` / `nbf` bounds, and an all-zero `predecessor_digest`.
///
/// It is signed with `signing_key` via [`sign_pca`], wrapped in a one-element
/// vector, serialized to JSON and stored base64-encoded (`STANDARD` engine).
/// Any value previously held in `cmd.authority.pca_chain` is overwritten.
///
/// # Errors
///
/// Returns the stringified error if signing or JSON serialization fails;
/// in that case `cmd` is left unmodified.
pub fn forge_authority(
    cmd: &mut Command,
    signing_key: &SigningKey,
    kid: &str,
    p_0: &str,
) -> Result<(), String> {
    let root = Pca {
        p_0: p_0.to_owned(),
        // Grant exactly the operations the command itself declares as required.
        ops: cmd.authority.required_ops.iter().cloned().collect(),
        kid: kid.to_owned(),
        exp: None,
        nbf: None,
        // NOTE(review): an all-zero digest presumably marks "no predecessor"
        // (a root hop) — confirm against the chain verifier.
        predecessor_digest: [0u8; 32],
    };

    let signed_hop = sign_pca(&root, signing_key).map_err(|err| err.to_string())?;
    let serialized = serde_json::to_vec(&vec![signed_hop]).map_err(|err| err.to_string())?;

    cmd.authority.pca_chain = STANDARD.encode(&serialized);
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use base64::{engine::general_purpose::STANDARD, Engine};
    use invariant_robotics::authority::crypto::generate_keypair;
    use invariant_robotics::models::authority::{Operation, SignedPca};
    use invariant_robotics::models::command::{Command, CommandAuthority, JointState};
    use invariant_robotics::validator::ValidatorConfig;
    use rand::rngs::OsRng;
    use std::collections::HashMap;

    /// Builds a minimal single-joint command that requires `ops` and carries
    /// an empty `pca_chain`.
    fn make_cmd(ops: Vec<Operation>) -> Command {
        let joint = JointState {
            name: String::from("j1"),
            position: 0.0,
            velocity: 0.0,
            effort: 0.0,
        };
        let authority = CommandAuthority {
            pca_chain: String::new(),
            required_ops: ops,
        };

        Command {
            timestamp: chrono::Utc::now(),
            source: String::from("test"),
            sequence: 1,
            joint_states: vec![joint],
            delta_time: 0.01,
            end_effector_positions: vec![],
            center_of_mass: None,
            authority,
            metadata: HashMap::new(),
            locomotion_state: None,
            end_effector_forces: vec![],
            estimated_payload_kg: None,
            signed_sensor_readings: vec![],
            zone_overrides: HashMap::new(),
            environment_state: None,
        }
    }

    /// Reverses the encoding applied by `forge_authority`: base64 -> JSON ->
    /// list of signed PCAs. Panics if either decoding step fails.
    fn decode_chain(encoded: &str) -> Vec<SignedPca> {
        let json_bytes = STANDARD.decode(encoded).unwrap();
        serde_json::from_slice(&json_bytes).unwrap()
    }

    /// Forging fills `pca_chain` with exactly one non-empty signed PCA.
    #[test]
    fn forge_authority_happy_path_produces_valid_chain() {
        let key = generate_keypair(&mut OsRng);
        let mut cmd = make_cmd(vec![Operation::new("actuate:j1").unwrap()]);

        forge_authority(&mut cmd, &key, "test-kid", "forge").unwrap();

        assert!(
            !cmd.authority.pca_chain.is_empty(),
            "pca_chain must be non-empty after forge"
        );
        let decoded = decode_chain(&cmd.authority.pca_chain);
        assert_eq!(decoded.len(), 1, "chain must contain exactly one PCA");
        assert!(
            !decoded[0].raw.is_empty(),
            "SignedPca raw bytes must not be empty"
        );
    }

    /// A forged chain passes the validator's "authority" check when the
    /// forging key is registered as trusted under the same kid.
    #[test]
    fn forge_authority_chain_validates_with_matching_key() {
        let forge_key = generate_keypair(&mut OsRng);
        let config_key = generate_keypair(&mut OsRng);
        let kid = "test-kid".to_string();

        let first_builtin = invariant_robotics::profiles::list_builtins()[0];
        let profile = invariant_robotics::profiles::load_builtin(first_builtin).unwrap();

        // Trust both the forging key and the key handed to the validator config.
        let trusted = HashMap::from([
            (kid.clone(), forge_key.verifying_key()),
            ("config-kid".to_string(), config_key.verifying_key()),
        ]);
        let config =
            ValidatorConfig::new(profile, trusted, config_key, "config-kid".to_string()).unwrap();

        let required = Operation::new("actuate:humanoid_28dof:joint_0:position").unwrap();
        let mut cmd = make_cmd(vec![required]);
        forge_authority(&mut cmd, &forge_key, &kid, "forge").unwrap();

        let outcome = config.validate(&cmd, chrono::Utc::now(), None).unwrap();
        let authority_check = outcome
            .signed_verdict
            .verdict
            .checks
            .iter()
            .find(|check| check.name == "authority")
            .expect("authority check must be present");
        assert!(authority_check.passed, "authority check must pass");
    }

    /// Using "trust-plane" as `p_0` still yields a one-entry chain.
    #[test]
    fn forge_authority_trust_plane_p0_produces_valid_chain() {
        let key = generate_keypair(&mut OsRng);
        let mut cmd = make_cmd(vec![Operation::new("actuate:j1").unwrap()]);

        forge_authority(&mut cmd, &key, "kid", "trust-plane").unwrap();

        let decoded = decode_chain(&cmd.authority.pca_chain);
        assert_eq!(decoded.len(), 1);
        assert!(!decoded[0].raw.is_empty());
    }

    /// Forging twice on the same command leaves a one-entry chain each time.
    #[test]
    fn forge_authority_overwrites_previous_chain() {
        let key = generate_keypair(&mut OsRng);
        let mut cmd = make_cmd(vec![Operation::new("actuate:j1").unwrap()]);

        // Snapshot the encoded chain after each of two consecutive forges.
        let mut snapshots = Vec::with_capacity(2);
        for _ in 0..2 {
            forge_authority(&mut cmd, &key, "kid", "forge").unwrap();
            snapshots.push(cmd.authority.pca_chain.clone());
        }

        for encoded in &snapshots {
            assert_eq!(decode_chain(encoded).len(), 1);
        }
    }

    /// A command with no required ops can still be forged into a one-entry chain.
    #[test]
    fn forge_authority_empty_ops() {
        let key = generate_keypair(&mut OsRng);
        let mut cmd = make_cmd(Vec::new());

        forge_authority(&mut cmd, &key, "kid", "forge").unwrap();

        assert!(!cmd.authority.pca_chain.is_empty());
        assert_eq!(decode_chain(&cmd.authority.pca_chain).len(), 1);
    }
}