use anyhow::{Context, Result};
use async_nats::jetstream::{self, kv};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::time::Duration;
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct NatsAuth {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub token: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub username: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub password: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub inline_creds: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub creds_file: Option<String>,
}
impl NatsAuth {
pub fn is_configured(&self) -> bool {
self.token.is_some()
|| (self.username.is_some() && self.password.is_some())
|| self.inline_creds.is_some()
|| self.creds_file.is_some()
}
}
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct OrchestratorEntry {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
#[serde(default)]
pub url: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub bearer_token: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub invite_code: Option<String>,
}
pub async fn connect_nats(url: &str, auth: Option<&NatsAuth>) -> Result<async_nats::Client> {
match auth.filter(|a| a.is_configured()) {
Some(auth) => {
let opts = if let Some(token) = &auth.token {
async_nats::ConnectOptions::new().token(token.clone())
} else if let (Some(user), Some(pass)) = (&auth.username, &auth.password) {
async_nats::ConnectOptions::new().user_and_password(user.clone(), pass.clone())
} else if let Some(inline) = &auth.inline_creds {
async_nats::ConnectOptions::with_credentials(inline)
.context("Failed to parse inline NATS credentials")?
} else if let Some(creds) = &auth.creds_file {
async_nats::ConnectOptions::new()
.credentials_file(creds)
.await
.context(format!("Failed to load NATS credentials file: {}", creds))?
} else {
async_nats::ConnectOptions::new()
};
opts.connect(url)
.await
.map_err(|e| anyhow::anyhow!(e))
.context(format!("Failed to connect to NATS at {} with auth", url))
}
None => async_nats::connect(url)
.await
.map_err(|e| anyhow::anyhow!(e))
.context(format!("Failed to connect to NATS at {}", url)),
}
}
const NATS_FORBIDDEN_CHARS: &[char] = &['\0', ' ', '.', '*', '>', '/'];
pub fn validate_nats_name(name: &str, field_label: &str) -> Result<(), String> {
if name.is_empty() {
return Err(format!("{} must not be empty", field_label));
}
let invalid_chars: Vec<char> = name
.chars()
.filter(|c| NATS_FORBIDDEN_CHARS.contains(c) || c.is_whitespace() || c.is_control())
.collect();
if invalid_chars.is_empty() {
Ok(())
} else {
let mut unique_chars: Vec<char> = invalid_chars;
unique_chars.sort();
unique_chars.dedup();
Err(format!(
"{} contains invalid characters: {:?}",
field_label, unique_chars
))
}
}
pub fn sanitize_subject_component(component: &str) -> String {
component
.chars()
.map(|c| {
if c.is_alphanumeric() || c == '-' || c == '_' {
c
} else {
'_'
}
})
.collect()
}
pub async fn ensure_kv_bucket(js: &jetstream::Context, config: kv::Config) -> Result<kv::Store> {
match js.create_key_value(config.clone()).await {
Ok(store) => Ok(store),
Err(e) => {
if e.to_string().contains("already in use") {
js.get_key_value(&config.bucket)
.await
.map_err(|e| anyhow::anyhow!(e))
.context(format!(
"Failed to get existing KV bucket {}",
config.bucket
))
} else {
Err(anyhow::anyhow!(e).context("Failed to create KV bucket"))
}
}
}
}
pub fn format_nats_creds(user_jwt: &str, user_seed: &str) -> String {
format!(
"-----BEGIN NATS USER JWT-----\n\
{user_jwt}\n\
------END NATS USER JWT------\n\
\n\
************************* IMPORTANT *************************\n\
NKEY Seed printed below can be used to sign and prove identity.\n\
NKEYs are sensitive and should be treated as secrets.\n\
\n\
-----BEGIN USER NKEY SEED-----\n\
{user_seed}\n\
------END USER NKEY SEED------\n\
\n\
*************************************************************\n"
)
}
#[derive(Debug)]
pub struct HashMismatchError {
pub expected: String,
pub computed: String,
}
impl std::fmt::Display for HashMismatchError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"NATS URL hash mismatch — expected {} but computed {}; \
the orchestrator response may have been tampered with",
self.expected, self.computed
)
}
}
impl std::error::Error for HashMismatchError {}
#[derive(Debug)]
pub struct CredentialsNotEnabledError;
impl std::fmt::Display for CredentialsNotEnabledError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Credential issuance not enabled on orchestrator (503); \
fall back to direct NATS connection"
)
}
}
impl std::error::Error for CredentialsNotEnabledError {}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ChallengeResponse {
pub orchestrator_pub_key: String,
pub nats_url_hash: String,
pub nonce: String,
pub expires_in_secs: u64,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct RegistrationResponse {
pub user_jwt: String,
pub nats_url: String,
}
pub fn sha256_hex(input: &str) -> String {
let hash = Sha256::digest(input.as_bytes());
hash.iter().fold(String::with_capacity(64), |mut s, b| {
use std::fmt::Write;
write!(s, "{b:02x}").unwrap();
s
})
}
#[derive(Debug)]
pub struct RegistrationResult {
pub creds: String,
pub nats_url: String,
pub keypair: nkeys::KeyPair,
}
pub async fn register_with_orchestrator(
orchestrator_url: &str,
agent_id: &str,
bearer_token: &str,
) -> Result<RegistrationResult> {
let http = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.context("Failed to build HTTP client")?;
let base = orchestrator_url.trim_end_matches('/');
let user_kp = nkeys::KeyPair::new_user();
let user_pub = user_kp.public_key();
let challenge_resp = http
.get(format!("{base}/credentials/challenge"))
.bearer_auth(bearer_token)
.send()
.await
.context("Failed to request credentials challenge")?;
if challenge_resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE {
return Err(CredentialsNotEnabledError.into());
}
let challenge: ChallengeResponse = challenge_resp
.error_for_status()
.context("Credentials challenge request rejected")?
.json()
.await
.context("Failed to parse challenge response")?;
let challenge_msg = format!(
"{}:{}:{}",
challenge.nonce, challenge.orchestrator_pub_key, challenge.nats_url_hash
);
let signature = user_kp
.sign(challenge_msg.as_bytes())
.map_err(|e| anyhow::anyhow!("Failed to sign challenge: {e}"))?;
let reg_body = serde_json::json!({
"agent_id": agent_id,
"user_pub_key": user_pub,
"nonce": challenge.nonce,
"signature": signature,
});
let reg_resp: RegistrationResponse = http
.post(format!("{base}/credentials/register"))
.bearer_auth(bearer_token)
.json(®_body)
.send()
.await
.context("Failed to send registration request")?
.error_for_status()
.context("Registration request rejected")?
.json()
.await
.context("Failed to parse registration response")?;
let computed_hash = sha256_hex(®_resp.nats_url);
if computed_hash != challenge.nats_url_hash {
return Err(anyhow::Error::new(HashMismatchError {
expected: challenge.nats_url_hash.clone(),
computed: computed_hash,
}));
}
let seed_str = user_kp
.seed()
.map_err(|e| anyhow::anyhow!("Failed to extract user seed: {e}"))?;
let creds = format_nats_creds(®_resp.user_jwt, &seed_str);
Ok(RegistrationResult {
creds,
nats_url: reg_resp.nats_url,
keypair: user_kp,
})
}
pub async fn register_with_orchestrator_with_retry(
orchestrator_url: &str,
agent_id: &str,
bearer_token: &str,
max_attempts: u32,
) -> Result<RegistrationResult> {
let base_delay_ms = 1_000u64;
let mut last_err = anyhow::anyhow!("No attempts made");
for attempt in 1..=max_attempts {
match register_with_orchestrator(orchestrator_url, agent_id, bearer_token).await {
Ok(result) => return Ok(result),
Err(e) => {
if !is_retryable_registration_error(&e) || attempt == max_attempts {
return Err(e);
}
let wait = Duration::from_millis(base_delay_ms * 2u64.pow(attempt.min(6) - 1));
tracing::warn!(
orchestrator_url,
attempt,
max_attempts,
wait_ms = wait.as_millis(),
error = %e,
"Orchestrator not yet reachable, retrying registration..."
);
tokio::time::sleep(wait).await;
last_err = e;
}
}
}
Err(last_err)
}
pub const AUD_OPERATOR_REDEEM: &str = "nsed-operator-redeem";
pub const AUD_AGENT_REDEEM: &str = "nsed-agent-redeem";
pub fn invite_audience(code: &str) -> Result<Option<String>> {
use base64::Engine;
let parts: Vec<&str> = code.split('.').collect();
if parts.len() != 3 {
anyhow::bail!(
"invite code is not JWT-shaped (expected 3 dot-separated segments, got {})",
parts.len()
);
}
let payload = parts[1];
let bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD
.decode(payload)
.map_err(|e| anyhow::anyhow!("invite payload not base64url: {e}"))?;
#[derive(serde::Deserialize)]
struct AudOnly {
#[serde(default)]
aud: Option<String>,
}
let parsed: AudOnly = serde_json::from_slice(&bytes)
.map_err(|e| anyhow::anyhow!("invite payload not JSON: {e}"))?;
Ok(parsed.aud)
}
#[derive(Debug)]
pub enum RedeemInviteError {
InvalidCode,
Expired,
Revoked,
Replayed,
NotConfigured,
KvUnavailable,
Unexpected {
status: reqwest::StatusCode,
body: String,
},
Transport(anyhow::Error),
Decode(anyhow::Error),
}
impl RedeemInviteError {
pub fn is_retryable(&self) -> bool {
match self {
RedeemInviteError::Transport(_) | RedeemInviteError::KvUnavailable => true,
RedeemInviteError::Unexpected { status, .. } => status.is_server_error(),
RedeemInviteError::InvalidCode
| RedeemInviteError::Expired
| RedeemInviteError::Revoked
| RedeemInviteError::Replayed
| RedeemInviteError::NotConfigured
| RedeemInviteError::Decode(_) => false,
}
}
}
impl std::fmt::Display for RedeemInviteError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RedeemInviteError::InvalidCode => write!(f, "invite code invalid"),
RedeemInviteError::Expired => write!(f, "invite code expired"),
RedeemInviteError::Revoked => write!(f, "invite code revoked"),
RedeemInviteError::Replayed => write!(f, "invite code already redeemed"),
RedeemInviteError::NotConfigured => {
write!(f, "orchestrator does not have invites configured")
}
RedeemInviteError::KvUnavailable => {
write!(f, "orchestrator KV store temporarily unreachable")
}
RedeemInviteError::Unexpected { status, body } => {
write!(f, "unexpected redeem response: {status} body={body:?}")
}
RedeemInviteError::Transport(e) => write!(f, "transport error: {e:#}"),
RedeemInviteError::Decode(e) => {
write!(f, "redeem response decode error (JTI consumed): {e:#}")
}
}
}
}
impl std::error::Error for RedeemInviteError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
RedeemInviteError::Transport(e) | RedeemInviteError::Decode(e) => Some(e.as_ref()),
_ => None,
}
}
}
async fn classify_redeem_error(resp: reqwest::Response) -> RedeemInviteError {
let status = resp.status();
let body_text = resp.text().await.unwrap_or_default();
let body_error: Option<String> = serde_json::from_str::<serde_json::Value>(&body_text)
.ok()
.and_then(|v| v.get("error").and_then(|e| e.as_str()).map(String::from));
match (status.as_u16(), body_error.as_deref()) {
(401, Some("expired")) => RedeemInviteError::Expired,
(401, _) => RedeemInviteError::InvalidCode,
(403, _) => RedeemInviteError::Revoked,
(409, _) => RedeemInviteError::Replayed,
(503, Some("kv_unavailable")) => RedeemInviteError::KvUnavailable,
(503, _) => RedeemInviteError::NotConfigured,
_ => RedeemInviteError::Unexpected {
status,
body: body_text,
},
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct RedeemAgentInviteResponse {
pub user_jwt: String,
pub nats_url: String,
pub agent_id: String,
}
pub async fn redeem_invite_with_orchestrator(
orchestrator_url: &str,
invite_code: &str,
keypair: &nkeys::KeyPair,
) -> std::result::Result<RegistrationResult, RedeemInviteError> {
let http = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.map_err(|e| RedeemInviteError::Transport(anyhow::Error::new(e)))?;
let base = orchestrator_url.trim_end_matches('/');
let user_pub_key = keypair.public_key();
let body = serde_json::json!({
"code": invite_code,
"user_pub_key": user_pub_key,
});
let response = http
.post(format!("{base}/redeem-agent"))
.json(&body)
.send()
.await
.map_err(|e| {
RedeemInviteError::Transport(
anyhow::Error::new(e).context("Failed to send /redeem-agent request"),
)
})?;
if !response.status().is_success() {
return Err(classify_redeem_error(response).await);
}
let resp: RedeemAgentInviteResponse = response.json().await.map_err(|e| {
RedeemInviteError::Decode(
anyhow::Error::new(e).context("Failed to parse /redeem-agent response"),
)
})?;
let seed_str = keypair.seed().map_err(|e| {
RedeemInviteError::Decode(anyhow::anyhow!("Failed to extract user seed: {e}"))
})?;
let creds = format_nats_creds(&resp.user_jwt, &seed_str);
let owned_kp = nkeys::KeyPair::from_seed(&seed_str).map_err(|e| {
RedeemInviteError::Decode(anyhow::anyhow!("Failed to clone keypair from seed: {e}"))
})?;
Ok(RegistrationResult {
creds,
nats_url: resp.nats_url,
keypair: owned_kp,
})
}
pub async fn redeem_invite_with_orchestrator_with_retry(
orchestrator_url: &str,
invite_code: &str,
keypair: &nkeys::KeyPair,
max_attempts: u32,
) -> std::result::Result<RegistrationResult, RedeemInviteError> {
let base_delay_ms = 1_000u64;
let mut last_err = RedeemInviteError::Transport(anyhow::anyhow!("No attempts made"));
for attempt in 1..=max_attempts {
match redeem_invite_with_orchestrator(orchestrator_url, invite_code, keypair).await {
Ok(result) => return Ok(result),
Err(e) => {
if !e.is_retryable() || attempt == max_attempts {
return Err(e);
}
let wait = Duration::from_millis(base_delay_ms * 2u64.pow(attempt.min(6) - 1));
tracing::warn!(
orchestrator_url,
attempt,
max_attempts,
wait_ms = wait.as_millis(),
error = %e,
"Orchestrator not yet reachable, retrying invite redemption..."
);
tokio::time::sleep(wait).await;
last_err = e;
}
}
}
Err(last_err)
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct RedeemOperatorInviteResponse {
pub token: String,
pub name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub budget: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub user_jwt: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nats_url: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub agent_id: Option<String>,
}
pub async fn redeem_operator_invite_with_orchestrator(
orchestrator_url: &str,
invite_code: &str,
user_pub_key: Option<&str>,
device_hint: Option<&str>,
) -> std::result::Result<RedeemOperatorInviteResponse, RedeemInviteError> {
let http = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.map_err(|e| RedeemInviteError::Transport(anyhow::Error::new(e)))?;
let base = orchestrator_url.trim_end_matches('/');
let mut body = serde_json::json!({ "code": invite_code });
if let Some(hint) = device_hint {
body["device_hint"] = serde_json::Value::String(hint.to_string());
}
if let Some(pub_key) = user_pub_key {
body["user_pub_key"] = serde_json::Value::String(pub_key.to_string());
}
let response = http
.post(format!("{base}/redeem"))
.json(&body)
.send()
.await
.map_err(|e| {
RedeemInviteError::Transport(
anyhow::Error::new(e).context("Failed to send /redeem request"),
)
})?;
if !response.status().is_success() {
return Err(classify_redeem_error(response).await);
}
response
.json::<RedeemOperatorInviteResponse>()
.await
.map_err(|e| {
RedeemInviteError::Decode(
anyhow::Error::new(e).context("Failed to parse /redeem response"),
)
})
}
pub async fn redeem_operator_invite_with_orchestrator_with_retry(
orchestrator_url: &str,
invite_code: &str,
user_pub_key: Option<&str>,
device_hint: Option<&str>,
max_attempts: u32,
) -> std::result::Result<RedeemOperatorInviteResponse, RedeemInviteError> {
let base_delay_ms = 1_000u64;
let mut last_err = RedeemInviteError::Transport(anyhow::anyhow!("No attempts made"));
for attempt in 1..=max_attempts {
match redeem_operator_invite_with_orchestrator(
orchestrator_url,
invite_code,
user_pub_key,
device_hint,
)
.await
{
Ok(result) => return Ok(result),
Err(e) => {
if !e.is_retryable() || attempt == max_attempts {
return Err(e);
}
let wait = Duration::from_millis(base_delay_ms * 2u64.pow(attempt.min(6) - 1));
tracing::warn!(
orchestrator_url,
attempt,
max_attempts,
wait_ms = wait.as_millis(),
error = %e,
"Orchestrator not yet reachable for operator redeem, retrying..."
);
tokio::time::sleep(wait).await;
last_err = e;
}
}
}
Err(last_err)
}
fn is_retryable_registration_error(e: &anyhow::Error) -> bool {
for cause in e.chain() {
if let Some(reqwest_err) = cause.downcast_ref::<reqwest::Error>() {
if let Some(status) = reqwest_err.status() {
if status.is_client_error() {
return false;
}
}
}
if cause.downcast_ref::<HashMismatchError>().is_some() {
return false;
}
if cause.downcast_ref::<CredentialsNotEnabledError>().is_some() {
return false;
}
}
true
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sanitize_subject_component() {
assert_eq!(sanitize_subject_component("hello"), "hello");
assert_eq!(sanitize_subject_component("hello.world"), "hello_world");
assert_eq!(
sanitize_subject_component("my.session>with*wildcards"),
"my_session_with_wildcards"
);
assert_eq!(sanitize_subject_component("agent-1_v2"), "agent-1_v2");
}
#[test]
fn test_validate_nats_name_valid() {
assert!(validate_nats_name("my-job-123", "job_id").is_ok());
assert!(validate_nats_name("agent_v2", "agent_id").is_ok());
}
#[test]
fn test_validate_nats_name_empty() {
assert!(validate_nats_name("", "job_id").is_err());
}
#[test]
fn test_validate_nats_name_forbidden_chars() {
assert!(validate_nats_name("my.job", "job_id").is_err());
assert!(validate_nats_name("my job", "job_id").is_err());
assert!(validate_nats_name("my*job", "job_id").is_err());
assert!(validate_nats_name("my>job", "job_id").is_err());
}
#[test]
fn test_nats_auth_default_not_configured() {
let auth = NatsAuth::default();
assert!(!auth.is_configured());
}
#[test]
fn test_nats_auth_token_configured() {
let auth = NatsAuth {
token: Some("secret".into()),
..Default::default()
};
assert!(auth.is_configured());
}
#[test]
fn test_nats_auth_user_pass_configured() {
let auth = NatsAuth {
username: Some("user".into()),
password: Some("pass".into()),
..Default::default()
};
assert!(auth.is_configured());
}
#[test]
fn test_nats_auth_partial_user_not_configured() {
let auth = NatsAuth {
username: Some("user".into()),
..Default::default()
};
assert!(!auth.is_configured());
}
#[test]
fn test_nats_auth_is_configured_priority() {
let auth = NatsAuth {
token: Some("my-token".into()),
username: Some("user".into()),
password: Some("pass".into()),
inline_creds: Some("creds-content".into()),
creds_file: Some("/path/to/creds".into()),
};
assert!(
auth.is_configured(),
"All auth methods set should be configured"
);
let token_only = NatsAuth {
token: Some("t".into()),
..Default::default()
};
assert!(token_only.is_configured());
let creds_only = NatsAuth {
creds_file: Some("/path".into()),
..Default::default()
};
assert!(creds_only.is_configured());
let pass_only = NatsAuth {
password: Some("pass".into()),
..Default::default()
};
assert!(
!pass_only.is_configured(),
"Password alone should not count as configured"
);
}
#[test]
fn test_inline_creds_is_configured() {
let auth = NatsAuth {
inline_creds: Some("some-creds-content".into()),
..Default::default()
};
assert!(
auth.is_configured(),
"inline_creds alone should count as configured"
);
}
#[test]
fn test_nats_auth_serde_roundtrip() {
let auth = NatsAuth {
token: Some("my-token".into()),
username: None,
password: None,
inline_creds: None,
creds_file: Some("/path/to/creds".into()),
};
let json = serde_json::to_string(&auth).unwrap();
let parsed: NatsAuth = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.token.as_deref(), Some("my-token"));
assert_eq!(parsed.creds_file.as_deref(), Some("/path/to/creds"));
assert!(parsed.username.is_none());
assert!(parsed.inline_creds.is_none());
}
#[test]
fn test_nats_auth_serde_with_inline_creds() {
let auth = NatsAuth {
inline_creds: Some("jwt-and-seed-content".into()),
..Default::default()
};
let json = serde_json::to_string(&auth).unwrap();
assert!(json.contains("inline_creds"));
let parsed: NatsAuth = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.inline_creds.as_deref(), Some("jwt-and-seed-content"));
assert!(parsed.token.is_none());
assert!(parsed.creds_file.is_none());
}
#[test]
fn test_sha256_hex() {
let hash = sha256_hex("nats://localhost:4222");
assert_eq!(hash.len(), 64, "SHA-256 hex should be 64 chars");
assert_eq!(hash, sha256_hex("nats://localhost:4222"));
assert_ne!(hash, sha256_hex("nats://other:4222"));
}
#[test]
fn test_format_nats_creds() {
let jwt = "eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.test";
let seed = "SUACIWOXXKLRCL7DTPOV3P7CQHNDCAP5JBHFPAGKE32GVVHZCPIBXAVBU";
let creds = format_nats_creds(jwt, seed);
assert!(creds.contains("-----BEGIN NATS USER JWT-----"));
assert!(creds.contains("------END NATS USER JWT------"));
assert!(creds.contains("-----BEGIN USER NKEY SEED-----"));
assert!(creds.contains("------END USER NKEY SEED------"));
assert!(creds.contains(jwt));
assert!(creds.contains(seed));
}
#[test]
fn test_validate_nats_name_unicode() {
assert!(validate_nats_name("agent_日本語", "agent_id").is_ok());
}
#[test]
fn test_validate_nats_name_control_chars() {
assert!(validate_nats_name("agent\tid", "field").is_err());
assert!(validate_nats_name("agent\nid", "field").is_err());
assert!(validate_nats_name("agent\rid", "field").is_err());
}
#[test]
fn test_validate_nats_name_null_byte() {
assert!(validate_nats_name("agent\0id", "field").is_err());
}
#[test]
fn test_validate_nats_name_slash() {
assert!(validate_nats_name("path/to/thing", "field").is_err());
}
#[test]
fn test_validate_nats_name_long() {
let long_name: String = "a".repeat(256);
assert!(validate_nats_name(&long_name, "field").is_ok());
}
#[test]
fn test_validate_nats_name_single_char() {
assert!(validate_nats_name("a", "field").is_ok());
assert!(validate_nats_name(".", "field").is_err());
}
#[test]
fn test_sanitize_dot_and_space() {
assert_eq!(
sanitize_subject_component("hello world.v2"),
"hello_world_v2"
);
}
#[test]
fn test_sanitize_preserves_hyphen_underscore() {
assert_eq!(sanitize_subject_component("my-agent_v2"), "my-agent_v2");
}
#[test]
fn test_sanitize_empty() {
assert_eq!(sanitize_subject_component(""), "");
}
#[test]
fn test_sanitize_all_special() {
assert_eq!(sanitize_subject_component(".*>"), "___");
}
#[test]
fn test_orchestrator_entry_serde_roundtrip() {
let entry = OrchestratorEntry {
id: Some("primary".into()),
url: "http://localhost:8080".into(),
bearer_token: Some("secret-token".into()),
invite_code: None,
};
let json = serde_json::to_string(&entry).unwrap();
let parsed: OrchestratorEntry = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.id.as_deref(), Some("primary"));
assert_eq!(parsed.url, "http://localhost:8080");
assert_eq!(parsed.bearer_token.as_deref(), Some("secret-token"));
}
#[test]
fn test_orchestrator_entry_defaults() {
let parsed: OrchestratorEntry = serde_json::from_str("{}").unwrap();
assert!(parsed.id.is_none());
assert_eq!(parsed.url, "");
assert!(parsed.bearer_token.is_none());
}
#[test]
fn test_challenge_response_serde() {
let cr = ChallengeResponse {
orchestrator_pub_key: "AAXYZ".into(),
nats_url_hash: "abc123def456".into(),
nonce: "random-nonce".into(),
expires_in_secs: 300,
};
let json = serde_json::to_string(&cr).unwrap();
let parsed: ChallengeResponse = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.orchestrator_pub_key, "AAXYZ");
assert_eq!(parsed.nats_url_hash, "abc123def456");
assert_eq!(parsed.nonce, "random-nonce");
assert_eq!(parsed.expires_in_secs, 300);
}
#[test]
fn test_registration_response_serde() {
let rr = RegistrationResponse {
user_jwt: "eyJ0eXAi.test.jwt".into(),
nats_url: "nats://example.com:4222".into(),
};
let json = serde_json::to_string(&rr).unwrap();
let parsed: RegistrationResponse = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.user_jwt, "eyJ0eXAi.test.jwt");
assert_eq!(parsed.nats_url, "nats://example.com:4222");
}
#[test]
fn test_hash_mismatch_not_retryable() {
let hm_err = HashMismatchError {
expected: "aabbcc".into(),
computed: "ddeeff".into(),
};
let anyhow_err = anyhow::Error::new(hm_err);
assert!(
!is_retryable_registration_error(&anyhow_err),
"HashMismatchError should be classified as non-retryable"
);
}
#[test]
fn test_hash_mismatch_display() {
let hm_err = HashMismatchError {
expected: "abc123".into(),
computed: "def456".into(),
};
let display = format!("{}", hm_err);
assert!(
display.contains("abc123"),
"Display should include expected hash"
);
assert!(
display.contains("def456"),
"Display should include computed hash"
);
assert!(
display.contains("tampered"),
"Display should warn about tampering"
);
}
#[test]
fn test_generic_error_is_retryable() {
let err = anyhow::anyhow!("connection timed out");
assert!(
is_retryable_registration_error(&err),
"Generic error should be retryable"
);
}
#[test]
fn test_hash_mismatch_wrapped_in_context_not_retryable() {
let hm_err = HashMismatchError {
expected: "aaa".into(),
computed: "bbb".into(),
};
let anyhow_err = anyhow::Error::new(hm_err).context("registration failed");
assert!(
!is_retryable_registration_error(&anyhow_err),
"HashMismatchError wrapped in context should still be non-retryable"
);
}
#[test]
fn test_credentials_not_enabled_not_retryable() {
let err = anyhow::Error::new(CredentialsNotEnabledError);
assert!(
!is_retryable_registration_error(&err),
"CredentialsNotEnabledError should be classified as non-retryable"
);
}
#[test]
fn test_credentials_not_enabled_wrapped_in_context_not_retryable() {
let err = anyhow::Error::new(CredentialsNotEnabledError).context("registration failed");
assert!(
!is_retryable_registration_error(&err),
"CredentialsNotEnabledError wrapped in context should still be non-retryable"
);
}
#[tokio::test]
async fn test_register_exhausts_retries() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/credentials/challenge"))
.respond_with(ResponseTemplate::new(500).set_body_string("Internal Server Error"))
.expect(2) .mount(&mock_server)
.await;
let result = register_with_orchestrator_with_retry(
&mock_server.uri(),
"test-agent",
"test-token",
2, )
.await;
match result {
Err(e) => {
let err_msg = format!("{:#}", e);
assert!(
err_msg.contains("challenge")
|| err_msg.contains("500")
|| err_msg.contains("rejected")
|| err_msg.contains("status"),
"Error should mention the challenge failure: {}",
err_msg
);
}
Ok(_) => panic!("Should return error after exhausting retries"),
}
}
#[tokio::test]
async fn test_register_no_retry_on_4xx() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/credentials/challenge"))
.respond_with(ResponseTemplate::new(401).set_body_string("Unauthorized"))
.expect(1) .mount(&mock_server)
.await;
let result = register_with_orchestrator_with_retry(
&mock_server.uri(),
"test-agent",
"bad-token",
5, )
.await;
assert!(result.is_err(), "Should return error immediately on 4xx");
}
#[tokio::test]
async fn test_register_503_returns_credentials_not_enabled() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/credentials/challenge"))
.respond_with(
ResponseTemplate::new(503).set_body_string("Credential issuance is not enabled"),
)
.expect(1) .mount(&mock_server)
.await;
let result = register_with_orchestrator_with_retry(
&mock_server.uri(),
"test-agent",
"test-token",
5, )
.await;
let err = result.expect_err("Should return error on 503");
assert!(
err.downcast_ref::<CredentialsNotEnabledError>().is_some(),
"Error should be CredentialsNotEnabledError, got: {err:#}"
);
}
#[tokio::test]
async fn redeem_invite_success_returns_creds() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"user_jwt": "eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.fake.jwt",
"nats_url": "nats://api.example.com:4222",
"agent_id": "researcher-bot-3",
})))
.expect(1)
.mount(&mock_server)
.await;
let result = redeem_invite_with_orchestrator(
&mock_server.uri(),
"eyJ.fake.invite.code",
&nkeys::KeyPair::new_user(),
)
.await
.expect("redeem must succeed on 200");
assert_eq!(result.nats_url, "nats://api.example.com:4222");
assert!(
result.creds.contains("eyJ0eXAi"),
"creds must embed the orchestrator-issued JWT"
);
assert!(
result.creds.contains("BEGIN USER NKEY SEED"),
"creds must embed the freshly-generated seed"
);
assert!(result.keypair.public_key().starts_with('U'));
assert!(result.keypair.seed().unwrap().starts_with("SU"));
}
#[tokio::test]
async fn redeem_invite_sends_user_pub_key_in_body() {
use wiremock::matchers::{body_partial_json, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.and(body_partial_json(serde_json::json!({
"code": "eyJ.fake.invite.code",
})))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"user_jwt": "eyJ.ok.jwt",
"nats_url": "nats://localhost:4222",
"agent_id": "bot-1",
})))
.mount(&mock_server)
.await;
let _ = mock_server.received_requests().await;
let result = redeem_invite_with_orchestrator(
&mock_server.uri(),
"eyJ.fake.invite.code",
&nkeys::KeyPair::new_user(),
)
.await
.expect("redeem must succeed");
let reqs = mock_server.received_requests().await.unwrap();
assert_eq!(reqs.len(), 1);
let body: serde_json::Value = serde_json::from_slice(&reqs[0].body).unwrap();
let pub_key = body
.get("user_pub_key")
.and_then(|v| v.as_str())
.expect("redeem body must carry user_pub_key");
assert!(pub_key.starts_with('U'), "must be U-prefixed: {pub_key}");
assert_eq!(pub_key, result.keypair.public_key());
}
#[tokio::test]
async fn redeem_invite_401_invalid_code_returns_typed_variant() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.respond_with(ResponseTemplate::new(401).set_body_json(serde_json::json!({
"error": "invalid_code",
})))
.expect(1) .mount(&mock_server)
.await;
let err = redeem_invite_with_orchestrator_with_retry(
&mock_server.uri(),
"tampered.code.here",
&nkeys::KeyPair::new_user(),
5,
)
.await
.expect_err("401 must surface as error");
assert!(matches!(err, RedeemInviteError::InvalidCode), "got {err:?}");
assert!(!err.is_retryable());
}
#[tokio::test]
async fn redeem_invite_401_expired_returns_typed_variant() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.respond_with(ResponseTemplate::new(401).set_body_json(serde_json::json!({
"error": "expired",
})))
.expect(1)
.mount(&mock_server)
.await;
let err = redeem_invite_with_orchestrator(
&mock_server.uri(),
"stale.code",
&nkeys::KeyPair::new_user(),
)
.await
.expect_err("401 expired must surface as error");
assert!(matches!(err, RedeemInviteError::Expired), "got {err:?}");
assert!(!err.is_retryable());
}
#[tokio::test]
async fn redeem_invite_409_replayed_returns_typed_variant() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.respond_with(ResponseTemplate::new(409).set_body_json(serde_json::json!({
"error": "replayed",
})))
.expect(1)
.mount(&mock_server)
.await;
let err = redeem_invite_with_orchestrator_with_retry(
&mock_server.uri(),
"used.up.code",
&nkeys::KeyPair::new_user(),
5,
)
.await
.expect_err("409 must surface as error");
assert!(matches!(err, RedeemInviteError::Replayed), "got {err:?}");
assert!(!err.is_retryable());
}
#[tokio::test]
async fn redeem_invite_403_revoked_returns_typed_variant() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.respond_with(ResponseTemplate::new(403).set_body_json(serde_json::json!({
"error": "revoked",
})))
.expect(1)
.mount(&mock_server)
.await;
let err = redeem_invite_with_orchestrator_with_retry(
&mock_server.uri(),
"revoked.code",
&nkeys::KeyPair::new_user(),
5,
)
.await
.expect_err("403 must surface as error");
assert!(matches!(err, RedeemInviteError::Revoked), "got {err:?}");
assert!(!err.is_retryable());
}
#[tokio::test]
async fn redeem_invite_503_not_configured_does_not_retry() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.respond_with(ResponseTemplate::new(503).set_body_json(serde_json::json!({
"error": "not_configured",
})))
.expect(1)
.mount(&mock_server)
.await;
let err = redeem_invite_with_orchestrator_with_retry(
&mock_server.uri(),
"valid.code",
&nkeys::KeyPair::new_user(),
5,
)
.await
.expect_err("503 not_configured must surface as error");
assert!(
matches!(err, RedeemInviteError::NotConfigured),
"got {err:?}"
);
assert!(!err.is_retryable());
}
#[tokio::test]
async fn redeem_invite_503_retries_then_gives_up() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.respond_with(ResponseTemplate::new(503).set_body_json(serde_json::json!({
"error": "kv_unavailable",
})))
.expect(2)
.mount(&mock_server)
.await;
let result = redeem_invite_with_orchestrator_with_retry(
&mock_server.uri(),
"valid.code",
&nkeys::KeyPair::new_user(),
2,
)
.await;
assert!(result.is_err(), "503 × 2 must surface as error");
}
#[tokio::test]
async fn redeem_invite_5xx_then_success_returns_ok() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.respond_with(ResponseTemplate::new(500))
.up_to_n_times(1)
.mount(&mock_server)
.await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"user_jwt": "eyJ.ok.jwt",
"nats_url": "nats://api.example.com:4222",
"agent_id": "bot-1",
})))
.expect(1)
.mount(&mock_server)
.await;
let result = redeem_invite_with_orchestrator_with_retry(
&mock_server.uri(),
"valid.code",
&nkeys::KeyPair::new_user(),
3,
)
.await
.expect("retry must recover the 200");
assert_eq!(result.nats_url, "nats://api.example.com:4222");
}
#[tokio::test]
async fn redeem_invite_keypair_matches_creds_embedded_seed() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"user_jwt": "eyJ.fake.jwt",
"nats_url": "nats://localhost:4222",
"agent_id": "bot-1",
})))
.mount(&mock_server)
.await;
let result = redeem_invite_with_orchestrator(
&mock_server.uri(),
"code",
&nkeys::KeyPair::new_user(),
)
.await
.unwrap();
let seed = result.keypair.seed().unwrap();
assert!(
result.creds.contains(&seed),
"creds blob must contain the keypair's seed verbatim"
);
}
#[tokio::test]
async fn redeem_invite_retry_reuses_caller_keypair() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.respond_with(ResponseTemplate::new(503).set_body_json(serde_json::json!({
"error": "kv_unavailable",
})))
.up_to_n_times(1)
.mount(&mock_server)
.await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"user_jwt": "eyJ.ok.jwt",
"nats_url": "nats://localhost:4222",
"agent_id": "bot-1",
})))
.expect(1)
.mount(&mock_server)
.await;
let caller_kp = nkeys::KeyPair::new_user();
let expected_pub = caller_kp.public_key();
let expected_seed = caller_kp.seed().unwrap();
let result = redeem_invite_with_orchestrator_with_retry(
&mock_server.uri(),
"valid.code",
&caller_kp,
3,
)
.await
.expect("retry must succeed on attempt 2");
assert_eq!(result.keypair.public_key(), expected_pub);
assert_eq!(result.keypair.seed().unwrap(), expected_seed);
let reqs = mock_server.received_requests().await.unwrap();
assert_eq!(reqs.len(), 2, "expected two attempts (503 then 200)");
for (i, req) in reqs.iter().enumerate() {
let body: serde_json::Value = serde_json::from_slice(&req.body).unwrap();
assert_eq!(
body["user_pub_key"].as_str(),
Some(expected_pub.as_str()),
"attempt {} pubkey rotated — retries must reuse caller's keypair",
i + 1
);
}
}
#[tokio::test]
async fn redeem_invite_decode_failure_does_not_retry() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem-agent"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"nats_url": "nats://localhost:4222",
"agent_id": "bot-1",
})))
.expect(1)
.mount(&mock_server)
.await;
let err = redeem_invite_with_orchestrator_with_retry(
&mock_server.uri(),
"code",
&nkeys::KeyPair::new_user(),
5,
)
.await
.unwrap_err();
match err {
RedeemInviteError::Decode(_) => {}
other => panic!("expected Decode, got {other:?}"),
}
}
#[test]
fn redeem_invite_decode_is_not_retryable() {
let e = RedeemInviteError::Decode(anyhow::anyhow!("synthetic"));
assert!(!e.is_retryable());
let e = RedeemInviteError::Transport(anyhow::anyhow!("synthetic"));
assert!(e.is_retryable());
}
fn jwt_with_aud(aud: Option<&str>) -> String {
use base64::Engine;
let header = base64::engine::general_purpose::URL_SAFE_NO_PAD
.encode(br#"{"alg":"HS256","typ":"JWT"}"#);
let payload_json = match aud {
Some(a) => format!(r#"{{"sub":"alice","exp":1,"aud":"{a}"}}"#),
None => r#"{"sub":"alice","exp":1}"#.to_string(),
};
let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(payload_json);
format!("{header}.{payload}.AAAA")
}
#[test]
fn invite_audience_extracts_operator_aud() {
let code = jwt_with_aud(Some(AUD_OPERATOR_REDEEM));
assert_eq!(
invite_audience(&code).unwrap().as_deref(),
Some(AUD_OPERATOR_REDEEM)
);
}
#[test]
fn invite_audience_extracts_agent_aud() {
let code = jwt_with_aud(Some(AUD_AGENT_REDEEM));
assert_eq!(
invite_audience(&code).unwrap().as_deref(),
Some(AUD_AGENT_REDEEM)
);
}
#[test]
fn invite_audience_handles_legacy_no_aud() {
let code = jwt_with_aud(None);
assert_eq!(invite_audience(&code).unwrap(), None);
}
#[test]
fn invite_audience_rejects_non_jwt() {
assert!(invite_audience("not-a-jwt").is_err());
}
#[test]
fn invite_audience_rejects_invalid_base64() {
assert!(invite_audience("aaa.!!!notbase64!!!.bbb").is_err());
}
#[test]
fn invite_audience_rejects_invalid_json_payload() {
use base64::Engine;
let bad = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b"not json");
let code = format!("AAAA.{bad}.BBBB");
assert!(invite_audience(&code).is_err());
}
#[test]
fn invite_audience_rejects_two_segments() {
let err = invite_audience("aaa.bbb").unwrap_err().to_string();
assert!(
err.contains("3 dot-separated segments") && err.contains("got 2"),
"must surface segment-count mismatch: {err}"
);
}
#[test]
fn invite_audience_rejects_four_segments() {
let err = invite_audience("aaa.bbb.ccc.ddd").unwrap_err().to_string();
assert!(
err.contains("3 dot-separated segments") && err.contains("got 4"),
"must surface segment-count mismatch: {err}"
);
}
#[test]
fn invite_audience_rejects_empty_input() {
assert!(invite_audience("").is_err());
}
#[tokio::test]
async fn redeem_operator_invite_returns_bearer_token() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"token": "op-bearer-abc-123",
"name": "alice",
"budget": 5.0,
})))
.expect(1)
.mount(&mock_server)
.await;
let result = redeem_operator_invite_with_orchestrator(
&mock_server.uri(),
"eyJ.fake.invite.code",
None,
Some("nsed init / wizard"),
)
.await
.expect("operator redeem must succeed on 200");
assert_eq!(result.token, "op-bearer-abc-123");
assert_eq!(result.name, "alice");
assert_eq!(result.budget, Some(5.0));
assert!(result.user_jwt.is_none());
assert!(result.nats_url.is_none());
assert!(result.agent_id.is_none());
}
#[tokio::test]
async fn redeem_operator_invite_401_returns_typed_variant() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem"))
.respond_with(ResponseTemplate::new(401).set_body_json(serde_json::json!({
"error": "expired",
})))
.mount(&mock_server)
.await;
let err =
redeem_operator_invite_with_orchestrator(&mock_server.uri(), "stale.code", None, None)
.await
.expect_err("401 must surface as error");
assert!(matches!(err, RedeemInviteError::Expired), "got {err:?}");
}
#[tokio::test]
async fn redeem_operator_invite_409_replayed_returns_typed_variant() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem"))
.respond_with(ResponseTemplate::new(409).set_body_json(serde_json::json!({
"error": "replayed",
})))
.mount(&mock_server)
.await;
let err =
redeem_operator_invite_with_orchestrator(&mock_server.uri(), "used.code", None, None)
.await
.expect_err("409 must surface as error");
assert!(matches!(err, RedeemInviteError::Replayed), "got {err:?}");
}
#[tokio::test]
async fn redeem_operator_invite_omits_device_hint_when_none() {
use wiremock::matchers::{body_partial_json, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"token": "op-x",
"name": "bob",
})))
.mount(&mock_server)
.await;
let _ = redeem_operator_invite_with_orchestrator(&mock_server.uri(), "c", None, None)
.await
.expect("must succeed");
let reqs = mock_server.received_requests().await.unwrap();
assert_eq!(reqs.len(), 1);
let body: serde_json::Value = serde_json::from_slice(&reqs[0].body).unwrap();
assert!(
body.get("device_hint").is_none(),
"device_hint must not be present when None: {body}"
);
let _ = body_partial_json(serde_json::json!({"code": "c"}));
assert_eq!(body.get("code").and_then(|v| v.as_str()), Some("c"));
}
#[tokio::test]
async fn redeem_operator_invite_unified_returns_both_bearer_and_nats() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"token": "op-bearer-xyz",
"name": "alice",
"budget": 2.0,
"user_jwt": "eyJ.scoped.jwt",
"nats_url": "nats://api.example.com:4222",
"agent_id": "alice",
})))
.mount(&mock_server)
.await;
let result = redeem_operator_invite_with_orchestrator(
&mock_server.uri(),
"unified.code",
Some("UABCDEFG123"),
None,
)
.await
.expect("unified redeem must succeed");
assert_eq!(result.token, "op-bearer-xyz");
assert_eq!(result.user_jwt.as_deref(), Some("eyJ.scoped.jwt"));
assert_eq!(
result.nats_url.as_deref(),
Some("nats://api.example.com:4222")
);
assert_eq!(result.agent_id.as_deref(), Some("alice"));
}
#[tokio::test]
async fn redeem_operator_invite_passes_pub_key_in_body_when_supplied() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/redeem"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"token": "op-x",
"name": "x",
})))
.mount(&mock_server)
.await;
let _ = redeem_operator_invite_with_orchestrator(
&mock_server.uri(),
"c",
Some("UPUBKEY123"),
None,
)
.await
.expect("must succeed");
let reqs = mock_server.received_requests().await.unwrap();
let body: serde_json::Value = serde_json::from_slice(&reqs[0].body).unwrap();
assert_eq!(
body.get("user_pub_key").and_then(|v| v.as_str()),
Some("UPUBKEY123")
);
}
#[test]
fn orchestrator_entry_deserializes_with_bearer_token_only() {
let yaml = r#"
id: primary
url: http://localhost:8080
bearer_token: ${NSED_BEARER_TOKEN}
"#;
let e: OrchestratorEntry = serde_yaml::from_str(yaml).unwrap();
assert_eq!(e.id.as_deref(), Some("primary"));
assert_eq!(e.bearer_token.as_deref(), Some("${NSED_BEARER_TOKEN}"));
assert!(e.invite_code.is_none());
}
#[test]
fn orchestrator_entry_deserializes_with_invite_code_only() {
let yaml = r#"
id: primary
url: http://localhost:8080
invite_code: ${NSED_INVITE_CODE}
"#;
let e: OrchestratorEntry = serde_yaml::from_str(yaml).unwrap();
assert_eq!(e.invite_code.as_deref(), Some("${NSED_INVITE_CODE}"));
assert!(e.bearer_token.is_none());
}
#[test]
fn orchestrator_entry_deserializes_with_both_fields() {
let yaml = r#"
url: http://localhost:8080
bearer_token: ${BT}
invite_code: ${IC}
"#;
let e: OrchestratorEntry = serde_yaml::from_str(yaml).unwrap();
assert!(e.bearer_token.is_some());
assert!(e.invite_code.is_some());
}
#[test]
fn orchestrator_entry_omits_invite_code_when_none() {
let e = OrchestratorEntry {
id: Some("primary".into()),
url: "http://localhost:8080".into(),
bearer_token: Some("${BT}".into()),
invite_code: None,
};
let yaml = serde_yaml::to_string(&e).unwrap();
assert!(
!yaml.contains("invite_code"),
"None invite_code must be skipped from output, got:\n{yaml}"
);
}
}