use crate::crypto::SigningKey;
use crate::planes::Authorizer;
use crate::revocation::SignedRevocationList;
use crate::PublicKey;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::time::interval;
use tracing::{debug, info, warn};
#[derive(Clone, Debug, Serialize)]
pub struct ApprovalRecord {
pub approver_key: String,
pub external_id: String,
pub approved_at: u64,
pub expires_at: u64,
pub request_hash: String,
}
#[derive(Clone, Debug, Serialize)]
pub struct AuthorizationEvent {
pub timestamp: String,
pub authorizer_id: String,
pub warrant_id: String,
pub decision: &'static str,
#[serde(skip_serializing_if = "Option::is_none")]
pub deny_reason: Option<String>,
pub tool: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub failed_constraint: Option<String>,
pub chain_depth: u8,
#[serde(skip_serializing_if = "Option::is_none")]
pub root_principal: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub warrant_stack: Option<String>,
pub latency_us: u64,
pub request_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub arguments: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub approvals: Option<Vec<ApprovalRecord>>,
}
impl AuthorizationEvent {
#[allow(clippy::too_many_arguments)]
pub fn allow(
authorizer_id: String,
warrant_id: String,
tool: String,
chain_depth: u8,
root_principal: Option<String>,
warrant_stack: Option<String>,
latency_us: u64,
request_id: String,
arguments: Option<String>,
approvals: Option<Vec<ApprovalRecord>>,
) -> Self {
Self {
timestamp: chrono::Utc::now().to_rfc3339(),
authorizer_id,
warrant_id,
decision: "allow",
deny_reason: None,
tool,
failed_constraint: None,
chain_depth,
root_principal,
warrant_stack,
latency_us,
request_id,
arguments,
approvals,
}
}
#[allow(clippy::too_many_arguments)]
pub fn deny(
authorizer_id: String,
warrant_id: String,
tool: String,
deny_reason: String,
failed_constraint: Option<String>,
chain_depth: u8,
root_principal: Option<String>,
warrant_stack: Option<String>,
latency_us: u64,
request_id: String,
arguments: Option<String>,
approvals: Option<Vec<ApprovalRecord>>,
) -> Self {
Self {
timestamp: chrono::Utc::now().to_rfc3339(),
authorizer_id,
warrant_id,
decision: "deny",
deny_reason: Some(deny_reason),
tool,
failed_constraint,
chain_depth,
root_principal,
warrant_stack,
latency_us,
request_id,
arguments,
approvals,
}
}
}
#[derive(Clone, Debug, Serialize)]
pub struct SignedEvent {
#[serde(flatten)]
pub event: AuthorizationEvent,
pub signature: String,
pub signing_payload: String,
pub action: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub action_category: Option<String>,
}
#[derive(serde::Serialize)]
struct ReceiptSigningPayload<'a> {
#[serde(rename = "1")]
authorizer_id: &'a str,
#[serde(rename = "2", with = "serde_bytes")]
warrant_chain: &'a [u8],
#[serde(rename = "3")]
action: &'a str,
#[serde(rename = "4")]
outcome: &'a str,
#[serde(rename = "5")]
timestamp: i64,
#[serde(rename = "6", skip_serializing_if = "Option::is_none")]
root_principal: Option<&'a str>,
}
pub type AuditEventSender = mpsc::Sender<AuthorizationEvent>;
pub fn create_audit_channel(
buffer_size: usize,
) -> (AuditEventSender, mpsc::Receiver<AuthorizationEvent>) {
mpsc::channel(buffer_size)
}
#[derive(Clone, Debug, Default, Serialize)]
pub struct RuntimeMetrics {
pub uptime_seconds: u64,
pub requests_total: u64,
pub requests_since_last: u64,
pub avg_latency_us: u64,
pub p99_latency_us: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub memory_bytes: Option<u64>,
}
#[derive(Clone, Debug, Default, Serialize)]
pub struct HeartbeatStats {
pub allow_count: u64,
pub deny_count: u64,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub top_deny_reasons: Vec<(String, u64)>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub top_actions: Vec<(String, u64)>,
pub unique_principals: u64,
pub unique_warrants: u64,
}
#[derive(Clone, Debug, Default, Serialize)]
pub struct SrlHealth {
#[serde(skip_serializing_if = "Option::is_none")]
pub last_fetch_at: Option<String>,
pub last_fetch_success: bool,
pub fetch_failures_total: u64,
pub verification_failures_total: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub current_srl_version: Option<u64>,
}
#[derive(Clone, Debug, Default, Serialize)]
pub struct EnvironmentInfo {
#[serde(skip_serializing_if = "Option::is_none")]
pub k8s_namespace: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub k8s_pod_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub k8s_node_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub k8s_cluster: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cloud_provider: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cloud_region: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub environment: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub deploy_id: Option<String>,
#[serde(skip_serializing_if = "HashMap::is_empty", default)]
pub metadata: HashMap<String, String>,
}
impl EnvironmentInfo {
pub fn from_env() -> Self {
Self {
k8s_namespace: std::env::var("TENUO_K8S_NAMESPACE")
.or_else(|_| std::env::var("POD_NAMESPACE"))
.ok(),
k8s_pod_name: std::env::var("TENUO_K8S_POD_NAME")
.or_else(|_| std::env::var("POD_NAME"))
.or_else(|_| std::env::var("HOSTNAME"))
.ok(),
k8s_node_name: std::env::var("TENUO_K8S_NODE_NAME")
.or_else(|_| std::env::var("NODE_NAME"))
.ok(),
k8s_cluster: std::env::var("TENUO_K8S_CLUSTER").ok(),
cloud_provider: std::env::var("TENUO_CLOUD_PROVIDER").ok(),
cloud_region: std::env::var("TENUO_CLOUD_REGION")
.or_else(|_| std::env::var("AWS_REGION"))
.or_else(|_| std::env::var("GOOGLE_CLOUD_REGION"))
.ok(),
environment: std::env::var("TENUO_ENVIRONMENT")
.or_else(|_| std::env::var("ENV"))
.or_else(|_| std::env::var("ENVIRONMENT"))
.ok(),
deploy_id: std::env::var("TENUO_DEPLOY_ID")
.or_else(|_| std::env::var("BUILD_ID"))
.or_else(|_| std::env::var("CI_COMMIT_SHA"))
.ok(),
metadata: HashMap::new(),
}
}
}
#[derive(Clone)]
pub struct MetricsCollector {
inner: Arc<MetricsCollectorInner>,
}
struct MetricsCollectorInner {
start_time: Instant,
requests_total: AtomicU64,
requests_since_last: AtomicU64,
allow_count: AtomicU64,
deny_count: AtomicU64,
latencies: Mutex<LatencyTracker>,
deny_reasons: Mutex<HashMap<String, u64>>,
actions: Mutex<HashMap<String, u64>>,
principals: Mutex<std::collections::HashSet<String>>,
warrants: Mutex<std::collections::HashSet<String>>,
srl_last_fetch_at: Mutex<Option<String>>,
srl_last_fetch_success: AtomicU64, srl_fetch_failures: AtomicU64,
srl_verification_failures: AtomicU64,
srl_current_version: AtomicU64,
}
struct LatencyTracker {
buffer: Vec<u64>,
index: usize,
total_sum: u64,
total_count: u64,
}
impl LatencyTracker {
fn new(capacity: usize) -> Self {
Self {
buffer: Vec::with_capacity(capacity),
index: 0,
total_sum: 0,
total_count: 0,
}
}
fn record(&mut self, latency_us: u64) {
self.total_sum += latency_us;
self.total_count += 1;
if self.buffer.len() < self.buffer.capacity() {
self.buffer.push(latency_us);
} else {
self.buffer[self.index] = latency_us;
self.index = (self.index + 1) % self.buffer.capacity();
}
}
fn avg(&self) -> u64 {
if self.total_count == 0 {
0
} else {
self.total_sum / self.total_count
}
}
fn p99(&self) -> u64 {
if self.buffer.is_empty() {
return 0;
}
let mut sorted = self.buffer.clone();
sorted.sort_unstable();
let idx = ((sorted.len() as f64) * 0.99).ceil() as usize;
sorted[idx.min(sorted.len() - 1)]
}
fn reset_interval(&mut self) {
self.total_sum = 0;
self.total_count = 0;
}
}
impl MetricsCollector {
pub fn new() -> Self {
Self {
inner: Arc::new(MetricsCollectorInner {
start_time: Instant::now(),
requests_total: AtomicU64::new(0),
requests_since_last: AtomicU64::new(0),
allow_count: AtomicU64::new(0),
deny_count: AtomicU64::new(0),
latencies: Mutex::new(LatencyTracker::new(1000)), deny_reasons: Mutex::new(HashMap::new()),
actions: Mutex::new(HashMap::new()),
principals: Mutex::new(std::collections::HashSet::new()),
warrants: Mutex::new(std::collections::HashSet::new()),
srl_last_fetch_at: Mutex::new(None),
srl_last_fetch_success: AtomicU64::new(0),
srl_fetch_failures: AtomicU64::new(0),
srl_verification_failures: AtomicU64::new(0),
srl_current_version: AtomicU64::new(0),
}),
}
}
pub async fn record_authorization(
&self,
allowed: bool,
tool: &str,
latency_us: u64,
warrant_id: &str,
principal: Option<&str>,
deny_reason: Option<&str>,
) {
self.inner.requests_total.fetch_add(1, Ordering::Relaxed);
self.inner
.requests_since_last
.fetch_add(1, Ordering::Relaxed);
if allowed {
self.inner.allow_count.fetch_add(1, Ordering::Relaxed);
} else {
self.inner.deny_count.fetch_add(1, Ordering::Relaxed);
if let Some(reason) = deny_reason {
let mut reasons = self.inner.deny_reasons.lock().await;
*reasons.entry(reason.to_string()).or_insert(0) += 1;
}
}
{
let mut latencies = self.inner.latencies.lock().await;
latencies.record(latency_us);
}
{
let mut warrants = self.inner.warrants.lock().await;
warrants.insert(warrant_id.to_string());
}
if let Some(p) = principal {
let mut principals = self.inner.principals.lock().await;
principals.insert(p.to_string());
}
{
let mut actions = self.inner.actions.lock().await;
*actions.entry(tool.to_string()).or_insert(0) += 1;
}
}
pub async fn record_srl_fetch(&self, success: bool, version: Option<u64>) {
if success {
self.inner
.srl_last_fetch_success
.store(1, Ordering::Relaxed);
if let Some(v) = version {
self.inner.srl_current_version.store(v, Ordering::Relaxed);
}
let mut last_fetch = self.inner.srl_last_fetch_at.lock().await;
*last_fetch = Some(chrono::Utc::now().to_rfc3339());
} else {
self.inner
.srl_last_fetch_success
.store(0, Ordering::Relaxed);
self.inner
.srl_fetch_failures
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn record_srl_verification_failure(&self) {
self.inner
.srl_verification_failures
.fetch_add(1, Ordering::Relaxed);
}
pub async fn collect_runtime_metrics(&self) -> RuntimeMetrics {
let latencies = self.inner.latencies.lock().await;
RuntimeMetrics {
uptime_seconds: self.inner.start_time.elapsed().as_secs(),
requests_total: self.inner.requests_total.load(Ordering::Relaxed),
requests_since_last: self.inner.requests_since_last.load(Ordering::Relaxed),
avg_latency_us: latencies.avg(),
p99_latency_us: latencies.p99(),
memory_bytes: get_memory_usage(),
}
}
pub async fn collect_and_reset_stats(&self) -> HeartbeatStats {
let top_deny_reasons = {
let mut reasons = self.inner.deny_reasons.lock().await;
let mut sorted: Vec<_> = reasons.drain().collect();
sorted.sort_by(|a, b| b.1.cmp(&a.1));
sorted.truncate(10);
sorted
};
let top_actions = {
let mut actions = self.inner.actions.lock().await;
let mut sorted: Vec<_> = actions.drain().collect();
sorted.sort_by(|a, b| b.1.cmp(&a.1));
sorted.truncate(10);
sorted
};
let unique_principals = {
let mut principals = self.inner.principals.lock().await;
let count = principals.len() as u64;
principals.clear();
count
};
let unique_warrants = {
let mut warrants = self.inner.warrants.lock().await;
let count = warrants.len() as u64;
warrants.clear();
count
};
let allow_count = self.inner.allow_count.swap(0, Ordering::Relaxed);
let deny_count = self.inner.deny_count.swap(0, Ordering::Relaxed);
self.inner.requests_since_last.store(0, Ordering::Relaxed);
{
let mut latencies = self.inner.latencies.lock().await;
latencies.reset_interval();
}
HeartbeatStats {
allow_count,
deny_count,
top_deny_reasons,
top_actions,
unique_principals,
unique_warrants,
}
}
pub async fn collect_srl_health(&self) -> SrlHealth {
let last_fetch_at = self.inner.srl_last_fetch_at.lock().await.clone();
let version = self.inner.srl_current_version.load(Ordering::Relaxed);
SrlHealth {
last_fetch_at,
last_fetch_success: self.inner.srl_last_fetch_success.load(Ordering::Relaxed) == 1,
fetch_failures_total: self.inner.srl_fetch_failures.load(Ordering::Relaxed),
verification_failures_total: self
.inner
.srl_verification_failures
.load(Ordering::Relaxed),
current_srl_version: if version == 0 { None } else { Some(version) },
}
}
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
fn get_memory_usage() -> Option<u64> {
#[cfg(target_os = "linux")]
{
if let Ok(statm) = std::fs::read_to_string("/proc/self/statm") {
if let Some(rss_pages) = statm.split_whitespace().nth(1) {
if let Ok(pages) = rss_pages.parse::<u64>() {
return Some(pages * 4096);
}
}
}
None
}
#[cfg(target_os = "macos")]
{
None
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
None
}
}
#[derive(Clone)]
pub struct HeartbeatConfig {
pub control_plane_url: String,
pub api_key: String,
pub authorizer_name: String,
pub authorizer_type: String,
pub version: String,
pub interval_secs: u64,
pub authorizer: Option<Arc<RwLock<Authorizer>>>,
pub trusted_root: Option<PublicKey>,
pub audit_batch_size: usize,
pub audit_flush_interval_secs: u64,
pub environment: EnvironmentInfo,
pub metrics: Option<MetricsCollector>,
pub signing_key: SigningKey,
pub id_notify: Option<tokio::sync::watch::Sender<Option<String>>>,
pub agent_id: Option<String>,
pub connect_token: Option<crate::connect_token::ConnectToken>,
}
impl Default for HeartbeatConfig {
fn default() -> Self {
let signing_key = SigningKey::generate();
Self {
control_plane_url: String::new(),
api_key: String::new(),
authorizer_name: String::new(),
authorizer_type: "sidecar".to_string(),
version: String::new(),
interval_secs: 30,
authorizer: None,
trusted_root: None,
audit_batch_size: 100,
audit_flush_interval_secs: 10,
environment: EnvironmentInfo::default(),
metrics: None,
signing_key,
id_notify: None,
agent_id: None,
connect_token: None,
}
}
}
impl HeartbeatConfig {
pub fn from_connect_token(
raw_token: &str,
authorizer_name: &str,
authorizer_type: &str,
) -> Result<Self, crate::connect_token::ConnectTokenError> {
let ct = crate::connect_token::ConnectToken::parse(raw_token)?;
let signing_key = SigningKey::generate();
Ok(Self {
control_plane_url: ct.endpoint.clone(),
api_key: ct.api_key.clone(),
authorizer_name: authorizer_name.to_string(),
authorizer_type: authorizer_type.to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
signing_key,
agent_id: ct.agent_id.clone(),
connect_token: Some(ct),
..Default::default()
})
}
}
#[derive(Serialize)]
struct RegisterRequest<'a> {
name: &'a str,
#[serde(rename = "type")]
authorizer_type: &'a str,
version: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
public_key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
agent_id: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
metadata: Option<&'a HashMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
environment: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
k8s_namespace: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
k8s_pod_name: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
k8s_cluster: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
cloud_provider: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
cloud_region: Option<&'a str>,
}
#[derive(Deserialize)]
struct RegisterResponse {
id: String,
}
#[derive(Serialize)]
struct HeartbeatRequest {
#[serde(skip_serializing_if = "Option::is_none")]
srl_version: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
metrics: Option<RuntimeMetrics>,
#[serde(skip_serializing_if = "Option::is_none")]
stats: Option<HeartbeatStats>,
#[serde(skip_serializing_if = "Option::is_none")]
srl_health: Option<SrlHealth>,
}
#[derive(Deserialize)]
struct HeartbeatResponse {
#[allow(dead_code)]
status: String,
#[serde(default)]
latest_srl_version: Option<u64>,
#[serde(default)]
refresh_required: bool,
}
#[derive(Deserialize)]
struct SrlResponse {
srl: String,
version: u64,
}
pub async fn start_heartbeat_loop(config: HeartbeatConfig) {
start_heartbeat_loop_with_audit(config, None).await;
}
pub async fn start_heartbeat_loop_with_audit(
config: HeartbeatConfig,
audit_rx: Option<mpsc::Receiver<AuthorizationEvent>>,
) {
start_heartbeat_loop_with_audit_and_id(config, audit_rx, Arc::new(RwLock::new(None))).await;
}
pub async fn start_heartbeat_loop_with_audit_and_id(
config: HeartbeatConfig,
audit_rx: Option<mpsc::Receiver<AuthorizationEvent>>,
shared_authorizer_id: Arc<RwLock<Option<String>>>,
) {
let client = Client::builder()
.timeout(Duration::from_secs(10))
.build()
.expect("Failed to create HTTP client");
if let Some(ref ct) = config.connect_token {
if let Err(e) = ct.claim_agent(&config.signing_key).await {
warn!(error = %e, "connect token agent claim failed (non-fatal)");
}
}
let authorizer_id = match register_with_retry(&client, &config).await {
Some(id) => id,
None => {
warn!(
"Failed to register with control plane after 3 attempts. \
Authorizer will run in standalone mode without heartbeats."
);
return;
}
};
info!(
authorizer_id = %authorizer_id,
name = %config.authorizer_name,
"Registered with control plane"
);
{
let mut id_guard = shared_authorizer_id.write().await;
*id_guard = Some(authorizer_id.clone());
}
if let Some(ref tx) = config.id_notify {
let _ = tx.send(Some(authorizer_id.clone()));
}
let mut local_srl_version: u64 = 0;
if let (Some(ref authorizer), Some(ref trusted_root)) =
(&config.authorizer, &config.trusted_root)
{
match fetch_and_apply_srl(&client, &config, authorizer, trusted_root).await {
Ok(version) => {
local_srl_version = version;
info!(srl_version = version, "Initial SRL fetched");
if let Some(ref metrics) = config.metrics {
metrics.record_srl_fetch(true, Some(version)).await;
}
}
Err(e) => {
warn!(error = %e, "Failed to fetch initial SRL, will retry on heartbeat");
if let Some(ref metrics) = config.metrics {
metrics.record_srl_fetch(false, None).await;
}
}
}
}
if let Some(rx) = audit_rx {
let audit_client = client.clone();
let audit_config = config.clone();
let audit_authorizer_id = authorizer_id.clone();
tokio::spawn(async move {
run_audit_flush_loop(audit_client, audit_config, audit_authorizer_id, rx).await;
});
info!("Audit event streaming enabled");
}
let mut ticker = interval(Duration::from_secs(config.interval_secs));
ticker.tick().await;
loop {
ticker.tick().await;
match send_heartbeat(&client, &config, &authorizer_id).await {
Ok(response) => {
debug!(
authorizer_id = %authorizer_id,
"Heartbeat sent successfully"
);
let needs_update = response.refresh_required
|| response
.latest_srl_version
.map(|v| v > local_srl_version)
.unwrap_or(false);
if needs_update {
if let Some(ref authorizer) = config.authorizer {
if let Some(ref trusted_root) = config.trusted_root {
match fetch_and_apply_srl(&client, &config, authorizer, trusted_root)
.await
{
Ok(new_version) => {
local_srl_version = new_version;
info!(
srl_version = new_version,
refresh_required = response.refresh_required,
"SRL updated from control plane"
);
if let Some(ref metrics) = config.metrics {
metrics.record_srl_fetch(true, Some(new_version)).await;
}
}
Err(e) => {
warn!(
error = %e,
"Failed to fetch SRL from control plane"
);
if let Some(ref metrics) = config.metrics {
metrics.record_srl_fetch(false, None).await;
}
}
}
} else {
warn!("SRL update needed but no trusted root configured");
}
}
}
}
Err(e) => {
warn!(
authorizer_id = %authorizer_id,
error = %e,
"Heartbeat failed, will retry on next interval"
);
}
}
}
}
async fn run_audit_flush_loop(
client: Client,
config: HeartbeatConfig,
authorizer_id: String,
mut rx: mpsc::Receiver<AuthorizationEvent>,
) {
let mut buffer: Vec<AuthorizationEvent> = Vec::with_capacity(config.audit_batch_size);
let mut flush_ticker = interval(Duration::from_secs(config.audit_flush_interval_secs));
flush_ticker.tick().await;
loop {
tokio::select! {
event = rx.recv() => {
match event {
Some(e) => {
buffer.push(e);
if buffer.len() >= config.audit_batch_size {
flush_audit_events(&client, &config, &authorizer_id, &mut buffer).await;
}
}
None => {
if !buffer.is_empty() {
info!(
authorizer_id = %authorizer_id,
remaining_events = buffer.len(),
"Flushing remaining audit events before shutdown"
);
flush_audit_events(&client, &config, &authorizer_id, &mut buffer).await;
}
info!("Audit channel closed, exiting flush loop");
break;
}
}
}
_ = flush_ticker.tick() => {
if !buffer.is_empty() {
flush_audit_events(&client, &config, &authorizer_id, &mut buffer).await;
}
}
}
}
}
fn sign_event(
event: &AuthorizationEvent,
signing_key: &SigningKey,
authorizer_id_override: &str,
) -> SignedEvent {
let warrant_chain_bytes = event
.warrant_stack
.as_ref()
.and_then(|ws| base64::Engine::decode(&base64::engine::general_purpose::STANDARD, ws).ok())
.unwrap_or_default();
let action = format!("tool:{}", event.tool);
let timestamp = chrono::DateTime::parse_from_rfc3339(&event.timestamp)
.map(|dt| dt.timestamp())
.unwrap_or_else(|_| chrono::Utc::now().timestamp());
let payload = ReceiptSigningPayload {
authorizer_id: authorizer_id_override,
warrant_chain: &warrant_chain_bytes,
action: &action,
outcome: event.decision,
timestamp,
root_principal: event.root_principal.as_deref(),
};
let mut payload_buf = Vec::new();
if ciborium::into_writer(&payload, &mut payload_buf).is_err() {
payload_buf.clear();
}
let signature = signing_key.sign(&payload_buf);
let signature_b64 = base64::Engine::encode(
&base64::engine::general_purpose::STANDARD,
signature.to_bytes(),
);
let signing_payload_b64 =
base64::Engine::encode(&base64::engine::general_purpose::STANDARD, &payload_buf);
let mut patched_event = event.clone();
patched_event.authorizer_id = authorizer_id_override.to_string();
SignedEvent {
event: patched_event,
signature: signature_b64,
signing_payload: signing_payload_b64,
action,
session_id: None,
action_category: Some("tool".to_string()),
}
}
async fn flush_audit_events(
client: &Client,
config: &HeartbeatConfig,
authorizer_id: &str,
buffer: &mut Vec<AuthorizationEvent>,
) {
if buffer.is_empty() {
return;
}
let event_count = buffer.len();
let url = format!(
"{}/v1/authorizers/{}/events",
config.control_plane_url, authorizer_id
);
let signed_events: Vec<SignedEvent> = buffer
.iter()
.map(|event| sign_event(event, &config.signing_key, authorizer_id))
.collect();
let result = client
.post(&url)
.header("Authorization", format!("Bearer {}", config.api_key))
.header("Content-Type", "application/json")
.json(&signed_events)
.send()
.await;
match result {
Ok(response) if response.status().is_success() => {
debug!(
authorizer_id = %authorizer_id,
event_count = %event_count,
"Flushed audit events to control plane"
);
eprintln!(
"[tenuo] flushed {} audit events for {}",
event_count, authorizer_id
);
buffer.clear();
}
Ok(response) => {
let status = response.status();
let body = response.text().await.unwrap_or_default();
warn!(
authorizer_id = %authorizer_id,
event_count = %event_count,
status = %status,
"Failed to flush audit events, will retry"
);
eprintln!(
"[tenuo] WARN: flush failed status={} body={} (authorizer={})",
status,
&body[..body.len().min(200)],
authorizer_id
);
if buffer.len() > config.audit_batch_size * 10 {
let drain_count = buffer.len() - config.audit_batch_size;
warn!(
dropped_events = %drain_count,
"Dropping oldest audit events due to buffer overflow"
);
buffer.drain(0..drain_count);
}
}
Err(e) => {
warn!(
authorizer_id = %authorizer_id,
event_count = %event_count,
error = %e,
"Network error flushing audit events, will retry"
);
if buffer.len() > config.audit_batch_size * 10 {
let drain_count = buffer.len() - config.audit_batch_size;
warn!(
dropped_events = %drain_count,
"Dropping oldest audit events due to buffer overflow"
);
buffer.drain(0..drain_count);
}
}
}
}
async fn register_with_retry(client: &Client, config: &HeartbeatConfig) -> Option<String> {
const MAX_ATTEMPTS: u32 = 3;
for attempt in 1..=MAX_ATTEMPTS {
match register(client, config).await {
Ok(id) => return Some(id),
Err(e) => {
let backoff = Duration::from_secs(2u64.pow(attempt));
warn!(
attempt = attempt,
max_attempts = MAX_ATTEMPTS,
error = %e,
backoff_secs = backoff.as_secs(),
"Registration attempt failed, retrying..."
);
eprintln!(
"[tenuo] registration attempt {}/{} failed: {} (retry in {}s)",
attempt,
MAX_ATTEMPTS,
e,
backoff.as_secs(),
);
if attempt < MAX_ATTEMPTS {
tokio::time::sleep(backoff).await;
}
}
}
}
eprintln!(
"[tenuo] WARN: failed to register '{}' with {} after {} attempts. \
Running in standalone mode — events will not reach the dashboard.",
config.authorizer_name, config.control_plane_url, MAX_ATTEMPTS,
);
None
}
async fn register(client: &Client, config: &HeartbeatConfig) -> Result<String, HeartbeatError> {
let url = format!("{}/v1/authorizers/register", config.control_plane_url);
let public_key_hex = hex::encode(config.signing_key.public_key().to_bytes());
let env = &config.environment;
let meta = if config.environment.metadata.is_empty() {
None
} else {
Some(&config.environment.metadata)
};
let request_body = RegisterRequest {
name: &config.authorizer_name,
authorizer_type: &config.authorizer_type,
version: &config.version,
public_key: Some(public_key_hex),
agent_id: config.agent_id.as_deref(),
metadata: meta,
environment: env.environment.as_deref(),
k8s_namespace: env.k8s_namespace.as_deref(),
k8s_pod_name: env.k8s_pod_name.as_deref(),
k8s_cluster: env.k8s_cluster.as_deref(),
cloud_provider: env.cloud_provider.as_deref(),
cloud_region: env.cloud_region.as_deref(),
};
let response = client
.post(&url)
.header("Authorization", format!("Bearer {}", config.api_key))
.header("Content-Type", "application/json")
.json(&request_body)
.send()
.await
.map_err(|e| HeartbeatError::Network(e.to_string()))?;
if !response.status().is_success() {
let status = response.status();
let body = response
.text()
.await
.unwrap_or_else(|_| "<no body>".to_string());
return Err(HeartbeatError::Api {
status: status.as_u16(),
message: body,
});
}
let register_response: RegisterResponse = response
.json()
.await
.map_err(|e| HeartbeatError::Parse(e.to_string()))?;
Ok(register_response.id)
}
async fn send_heartbeat(
client: &Client,
config: &HeartbeatConfig,
authorizer_id: &str,
) -> Result<HeartbeatResponse, HeartbeatError> {
let url = format!(
"{}/v1/authorizers/{}/heartbeat",
config.control_plane_url, authorizer_id
);
let (metrics, stats, srl_health) = if let Some(ref collector) = config.metrics {
let metrics = collector.collect_runtime_metrics().await;
let stats = collector.collect_and_reset_stats().await;
let srl_health = collector.collect_srl_health().await;
(Some(metrics), Some(stats), Some(srl_health))
} else {
(None, None, None)
};
let srl_version = srl_health.as_ref().and_then(|h| h.current_srl_version);
let request_body = HeartbeatRequest {
srl_version,
metrics,
stats,
srl_health,
};
let response = client
.post(&url)
.header("Authorization", format!("Bearer {}", config.api_key))
.header("Content-Type", "application/json")
.json(&request_body)
.send()
.await
.map_err(|e| HeartbeatError::Network(e.to_string()))?;
if !response.status().is_success() {
let status = response.status();
let body = response
.text()
.await
.unwrap_or_else(|_| "<no body>".to_string());
return Err(HeartbeatError::Api {
status: status.as_u16(),
message: body,
});
}
let heartbeat_response: HeartbeatResponse = response
.json()
.await
.map_err(|e| HeartbeatError::Parse(e.to_string()))?;
Ok(heartbeat_response)
}
async fn fetch_and_apply_srl(
client: &Client,
config: &HeartbeatConfig,
authorizer: &Arc<RwLock<Authorizer>>,
trusted_root: &PublicKey,
) -> Result<u64, HeartbeatError> {
let url = format!("{}/v1/revocations/srl/signed", config.control_plane_url);
let response = client
.get(&url)
.header("Authorization", format!("Bearer {}", config.api_key))
.send()
.await
.map_err(|e| HeartbeatError::Network(e.to_string()))?;
if !response.status().is_success() {
let status = response.status();
let body = response
.text()
.await
.unwrap_or_else(|_| "<no body>".to_string());
return Err(HeartbeatError::Api {
status: status.as_u16(),
message: body,
});
}
let srl_response: SrlResponse = response
.json()
.await
.map_err(|e| HeartbeatError::Parse(e.to_string()))?;
let srl_bytes = base64::Engine::decode(
&base64::engine::general_purpose::STANDARD,
&srl_response.srl,
)
.map_err(|e| HeartbeatError::Parse(format!("Invalid base64 SRL: {}", e)))?;
let srl = SignedRevocationList::from_bytes(&srl_bytes)
.map_err(|e| HeartbeatError::Parse(format!("Invalid SRL format: {}", e)))?;
let mut auth = authorizer.write().await;
auth.set_revocation_list(srl, trusted_root)
.map_err(|e| HeartbeatError::Parse(format!("SRL verification failed: {}", e)))?;
Ok(srl_response.version)
}
#[derive(Debug)]
pub enum HeartbeatError {
Network(String),
Api { status: u16, message: String },
Parse(String),
}
impl std::fmt::Display for HeartbeatError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
HeartbeatError::Network(msg) => write!(f, "Network error: {}", msg),
HeartbeatError::Api { status, message } => {
write!(f, "API error ({}): {}", status, message)
}
HeartbeatError::Parse(msg) => write!(f, "Parse error: {}", msg),
}
}
}
impl std::error::Error for HeartbeatError {}
#[cfg(test)]
mod tests {
use super::*;
fn test_config() -> HeartbeatConfig {
HeartbeatConfig {
control_plane_url: "https://api.tenuo.cloud".to_string(),
api_key: "tc_test".to_string(),
authorizer_name: "test-auth".to_string(),
authorizer_type: "sidecar".to_string(),
version: "0.1.0-beta.7+authz.1".to_string(),
interval_secs: 30,
authorizer: None,
trusted_root: None,
audit_batch_size: 100,
audit_flush_interval_secs: 10,
environment: EnvironmentInfo::default(),
metrics: None,
signing_key: SigningKey::generate(),
id_notify: None,
agent_id: None,
connect_token: None,
}
}
#[test]
fn test_environment_info_default() {
let env = EnvironmentInfo::default();
assert!(env.k8s_namespace.is_none());
assert!(env.cloud_provider.is_none());
assert!(env.environment.is_none());
}
#[test]
fn test_runtime_metrics_default() {
let metrics = RuntimeMetrics::default();
assert_eq!(metrics.uptime_seconds, 0);
assert_eq!(metrics.requests_total, 0);
}
#[test]
fn test_heartbeat_stats_default() {
let stats = HeartbeatStats::default();
assert_eq!(stats.allow_count, 0);
assert_eq!(stats.deny_count, 0);
assert!(stats.top_deny_reasons.is_empty());
}
#[test]
fn test_srl_health_default() {
let health = SrlHealth::default();
assert!(health.last_fetch_at.is_none());
assert!(!health.last_fetch_success);
assert_eq!(health.current_srl_version, None);
}
#[tokio::test]
async fn test_metrics_collector_record_authorization() {
let collector = MetricsCollector::new();
collector
.record_authorization(true, "read_file", 100, "wid-1", Some("user-1"), None)
.await;
collector
.record_authorization(
false,
"write_file",
200,
"wid-2",
Some("user-1"),
Some("constraint_violation"),
)
.await;
let stats = collector.collect_and_reset_stats().await;
assert_eq!(stats.allow_count, 1);
assert_eq!(stats.deny_count, 1);
assert_eq!(stats.unique_principals, 1);
assert_eq!(stats.unique_warrants, 2);
}
#[tokio::test]
async fn test_metrics_collector_latency() {
let collector = MetricsCollector::new();
for i in 1..=100 {
collector
.record_authorization(true, "test", i * 10, &format!("wid-{}", i), None, None)
.await;
}
let metrics = collector.collect_runtime_metrics().await;
assert_eq!(metrics.requests_total, 100);
assert!(metrics.avg_latency_us > 0);
assert!(metrics.p99_latency_us >= metrics.avg_latency_us);
}
#[test]
fn test_authorization_event_allow() {
let event = AuthorizationEvent::allow(
"auth-123".to_string(),
"wid-456".to_string(),
"read_file".to_string(),
0,
Some("root-pk".to_string()),
Some("base64stack".to_string()),
1234,
"req-789".to_string(),
None,
None,
);
assert_eq!(event.decision, "allow");
assert!(event.deny_reason.is_none());
assert_eq!(event.tool, "read_file");
assert_eq!(event.chain_depth, 0);
assert_eq!(event.warrant_stack, Some("base64stack".to_string()));
assert!(event.approvals.is_none());
}
#[test]
fn test_authorization_event_deny() {
let event = AuthorizationEvent::deny(
"auth-123".to_string(),
"wid-456".to_string(),
"write_file".to_string(),
"constraint_violation".to_string(),
Some("path".to_string()),
1,
Some("root-pk".to_string()),
Some("base64stack".to_string()),
5678,
"req-999".to_string(),
None,
None,
);
assert_eq!(event.decision, "deny");
assert_eq!(event.deny_reason, Some("constraint_violation".to_string()));
assert_eq!(event.failed_constraint, Some("path".to_string()));
assert_eq!(event.chain_depth, 1);
assert_eq!(event.warrant_stack, Some("base64stack".to_string()));
assert!(event.approvals.is_none());
}
#[test]
fn test_authorization_event_with_approvals() {
let records = vec![ApprovalRecord {
approver_key: "abcd1234".to_string(),
external_id: "arn:aws:iam::123:user/admin".to_string(),
approved_at: 1700000000,
expires_at: 1700000300,
request_hash: "deadbeef".to_string(),
}];
let event = AuthorizationEvent::allow(
"auth-123".to_string(),
"wid-456".to_string(),
"execute_payment".to_string(),
2,
Some("root-pk".to_string()),
None,
500,
"req-approval".to_string(),
None,
Some(records),
);
assert_eq!(event.decision, "allow");
let approvals = event.approvals.as_ref().unwrap();
assert_eq!(approvals.len(), 1);
assert_eq!(approvals[0].external_id, "arn:aws:iam::123:user/admin");
assert_eq!(approvals[0].approved_at, 1700000000);
let json = serde_json::to_string(&event).unwrap();
assert!(json.contains("\"approvals\""));
assert!(json.contains("arn:aws:iam::123:user/admin"));
}
#[test]
fn test_authorization_event_serialization() {
let event = AuthorizationEvent::allow(
"auth-123".to_string(),
"wid-456".to_string(),
"read_file".to_string(),
0,
None,
None, 1234,
"req-789".to_string(),
None,
None,
);
let json = serde_json::to_string(&event).unwrap();
assert!(json.contains("\"decision\":\"allow\""));
assert!(json.contains("\"tool\":\"read_file\""));
assert!(!json.contains("deny_reason"));
assert!(!json.contains("warrant_stack"));
assert!(!json.contains("approvals"));
}
#[test]
fn test_create_audit_channel() {
let (tx, _rx) = create_audit_channel(100);
let _tx2 = tx.clone();
}
#[test]
fn test_heartbeat_config_clone() {
let config = test_config();
let cloned = config.clone();
assert_eq!(cloned.control_plane_url, config.control_plane_url);
assert_eq!(cloned.api_key, config.api_key);
assert_eq!(cloned.authorizer_name, config.authorizer_name);
}
#[test]
fn test_heartbeat_error_display() {
let network_err = HeartbeatError::Network("connection refused".to_string());
assert!(network_err.to_string().contains("Network error"));
let api_err = HeartbeatError::Api {
status: 401,
message: "Unauthorized".to_string(),
};
assert!(api_err.to_string().contains("401"));
let parse_err = HeartbeatError::Parse("invalid json".to_string());
assert!(parse_err.to_string().contains("Parse error"));
}
#[test]
fn test_heartbeat_response_deserialization() {
let minimal = r#"{"status": "active"}"#;
let resp: HeartbeatResponse = serde_json::from_str(minimal).unwrap();
assert_eq!(resp.status, "active");
assert_eq!(resp.latest_srl_version, None);
assert!(!resp.refresh_required);
let full = r#"{"status": "active", "latest_srl_version": 8, "refresh_required": true}"#;
let resp: HeartbeatResponse = serde_json::from_str(full).unwrap();
assert_eq!(resp.status, "active");
assert_eq!(resp.latest_srl_version, Some(8));
assert!(resp.refresh_required);
}
#[test]
fn test_srl_response_deserialization() {
let json = r#"{"srl": "dGVzdA==", "version": 5}"#;
let resp: SrlResponse = serde_json::from_str(json).unwrap();
assert_eq!(resp.srl, "dGVzdA==");
assert_eq!(resp.version, 5);
}
#[test]
fn test_receipt_signing_payload_cbor_encoding() {
let payload = ReceiptSigningPayload {
authorizer_id: "test",
warrant_chain: &[1, 2, 3],
action: "hello",
outcome: "world",
timestamp: 42,
root_principal: None,
};
let mut buf = Vec::new();
ciborium::into_writer(&payload, &mut buf).unwrap();
let hex_str: String = buf.iter().map(|b| format!("{:02x}", b)).collect();
println!("Rust CBOR hex: {}", hex_str);
assert_eq!(buf[0], 0xa5, "Expected CBOR map of 5 items");
assert_eq!(buf[1], 0x61, "Key should be 1-byte text string");
assert_eq!(buf[2], 0x31, "Key should be ASCII '1'");
let go_hex = "a56131647465737461324301020361336568656c6c6f613465776f726c646135182a";
assert_eq!(
hex_str, go_hex,
"Rust CBOR encoding must match Go encoding for signature verification"
);
}
#[test]
fn test_receipt_signing_payload_with_root_principal() {
let payload = ReceiptSigningPayload {
authorizer_id: "auth-1",
warrant_chain: &[],
action: "tool:read",
outcome: "allow",
timestamp: 1000,
root_principal: Some("org:acme"),
};
let mut buf = Vec::new();
ciborium::into_writer(&payload, &mut buf).unwrap();
assert_eq!(buf[0], 0xa6, "Expected CBOR map of 6 items");
let mut buf2 = Vec::new();
ciborium::into_writer(&payload, &mut buf2).unwrap();
assert_eq!(buf, buf2, "CBOR encoding must be deterministic");
}
#[test]
fn test_sign_event_produces_verifiable_signature() {
let signing_key = SigningKey::generate();
let event = AuthorizationEvent::allow(
"".to_string(),
"wrt-test-456".to_string(),
"send_email".to_string(),
2,
Some("org:acme".to_string()),
Some(base64::Engine::encode(
&base64::engine::general_purpose::STANDARD,
b"fake-chain",
)),
42,
"req-sign-test".to_string(),
None,
None,
);
let signed = sign_event(&event, &signing_key, "authz-real-id");
assert_eq!(signed.event.authorizer_id, "authz-real-id");
let payload_bytes = base64::Engine::decode(
&base64::engine::general_purpose::STANDARD,
&signed.signing_payload,
)
.expect("signing_payload must be valid base64");
let sig_bytes = base64::Engine::decode(
&base64::engine::general_purpose::STANDARD,
&signed.signature,
)
.expect("signature must be valid base64");
let sig = crate::crypto::Signature::from_bytes(
sig_bytes
.as_slice()
.try_into()
.expect("signature is 64 bytes"),
)
.expect("valid Ed25519 signature");
assert!(
signing_key
.public_key()
.verify(&payload_bytes, &sig)
.is_ok(),
"receipt signature must be verifiable with the signing key's public key"
);
}
#[test]
fn test_sign_event_different_keys_produce_different_signatures() {
let key1 = SigningKey::generate();
let key2 = SigningKey::generate();
let event = AuthorizationEvent::allow(
"".to_string(),
"wrt-1".to_string(),
"read_file".to_string(),
0,
None,
None,
10,
"req-1".to_string(),
None,
None,
);
let signed1 = sign_event(&event, &key1, "auth-1");
let signed2 = sign_event(&event, &key2, "auth-1");
assert_ne!(
signed1.signature, signed2.signature,
"different keys must produce different signatures"
);
}
#[test]
fn test_sign_event_tampered_payload_fails_verification() {
let signing_key = SigningKey::generate();
let event = AuthorizationEvent::deny(
"".to_string(),
"wrt-deny".to_string(),
"delete_db".to_string(),
"constraint_violation".to_string(),
Some("path".to_string()),
1,
None,
None,
99,
"req-deny".to_string(),
None,
None,
);
let signed = sign_event(&event, &signing_key, "auth-id");
let mut payload_bytes = base64::Engine::decode(
&base64::engine::general_purpose::STANDARD,
&signed.signing_payload,
)
.unwrap();
if let Some(byte) = payload_bytes.last_mut() {
*byte ^= 0xFF;
}
let sig_bytes = base64::Engine::decode(
&base64::engine::general_purpose::STANDARD,
&signed.signature,
)
.unwrap();
let sig = crate::crypto::Signature::from_bytes(sig_bytes.as_slice().try_into().unwrap())
.expect("valid Ed25519 signature bytes");
assert!(
signing_key
.public_key()
.verify(&payload_bytes, &sig)
.is_err(),
"tampered payload must fail signature verification"
);
}
#[test]
fn test_sign_event_with_approvals_in_event() {
let signing_key = SigningKey::generate();
let records = vec![
ApprovalRecord {
approver_key: "aabb".to_string(),
external_id: "admin@corp.com".to_string(),
approved_at: 1700000000,
expires_at: 1700000300,
request_hash: "ccdd".to_string(),
},
ApprovalRecord {
approver_key: "eeff".to_string(),
external_id: "security@corp.com".to_string(),
approved_at: 1700000001,
expires_at: 1700000301,
request_hash: "ccdd".to_string(),
},
];
let event = AuthorizationEvent::allow(
"".to_string(),
"wrt-approval".to_string(),
"execute_payment".to_string(),
1,
Some("org:bank".to_string()),
None,
50,
"req-approval".to_string(),
None,
Some(records),
);
let signed = sign_event(&event, &signing_key, "auth-bank");
let json = serde_json::to_string(&signed).unwrap();
assert!(json.contains("admin@corp.com"));
assert!(json.contains("security@corp.com"));
assert!(json.contains("approvals"));
let payload_bytes = base64::Engine::decode(
&base64::engine::general_purpose::STANDARD,
&signed.signing_payload,
)
.unwrap();
let payload_hex: String = payload_bytes.iter().map(|b| format!("{:02x}", b)).collect();
assert!(
!payload_hex.contains(&hex::encode(b"admin@corp.com")),
"approval data must not leak into the signing payload"
);
let sig_bytes = base64::Engine::decode(
&base64::engine::general_purpose::STANDARD,
&signed.signature,
)
.unwrap();
let sig = crate::crypto::Signature::from_bytes(sig_bytes.as_slice().try_into().unwrap())
.expect("valid Ed25519 signature bytes");
assert!(
signing_key
.public_key()
.verify(&payload_bytes, &sig)
.is_ok(),
"signature must verify even with approvals in the event"
);
}
#[test]
fn test_approval_record_serialization_skip_none() {
let event_with = AuthorizationEvent::allow(
"a".into(),
"w".into(),
"t".into(),
0,
None,
None,
0,
"r".into(),
None,
Some(vec![ApprovalRecord {
approver_key: "key".into(),
external_id: "id".into(),
approved_at: 1,
expires_at: 2,
request_hash: "hash".into(),
}]),
);
let event_without = AuthorizationEvent::allow(
"a".into(),
"w".into(),
"t".into(),
0,
None,
None,
0,
"r".into(),
None,
None,
);
let json_with = serde_json::to_string(&event_with).unwrap();
let json_without = serde_json::to_string(&event_without).unwrap();
assert!(json_with.contains("\"approvals\""));
assert!(
!json_without.contains("\"approvals\""),
"None approvals must be omitted from JSON (skip_serializing_if)"
);
}
}