use std::sync::atomic::{AtomicI64, Ordering};
use std::time::Duration;
use super::credential::{FederationCredential, SignedCredential};
use super::outbound::{self, OutboundCredentialHolder};
/// Unix timestamp (seconds) of the most recent credential reload recorded by
/// [`refresh_once`]. `0` means nothing has been recorded yet; in that case
/// `refresh_once` seeds it with the current time on the first pass that finds
/// a held credential, so the renewal-lag gauge starts from zero.
static LAST_RENEW_UNIX: AtomicI64 = AtomicI64::new(0);
/// Default renewal interval in seconds (`crate::SECS_PER_MINUTE`).
/// NOTE(review): not referenced in the visible part of this file — presumably the
/// default poll period handed to `spawn_refresh_outbound_credential`; confirm at the call site.
pub const DEFAULT_RENEWAL_INTERVAL_SECS: i64 = crate::SECS_PER_MINUTE;
/// Lead time in seconds before a credential's `not_after` at which [`refresh_once`]
/// starts logging that the held credential is nearing expiry
/// (a quarter of `crate::SECS_PER_HOUR`).
pub const DEFAULT_RENEWAL_LEAD_SECS: i64 = crate::SECS_PER_HOUR / 4;
/// Result of a single outbound-credential refresh attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RenewalOutcome {
/// A loaded credential whose claims differ from the held one (or any loaded
/// credential when nothing was held) was stored into the holder.
Reloaded,
/// Nothing was loaded, or the loaded credential's claims equal the held
/// credential's claims; the holder was left untouched.
Unchanged,
/// Loading the credential failed; carries the load error rendered as a string.
Failed(String),
}
/// Decides whether a freshly loaded credential should replace the held one.
///
/// * `holder` - the shared slot holding the credential currently in use.
/// * `loaded` - the credential just read, or `None` when nothing was available.
///
/// Returns [`RenewalOutcome::Unchanged`] when `loaded` is `None` or when its
/// claims compare equal to the held credential's claims. Otherwise the loaded
/// credential is stored into `holder` and [`RenewalOutcome::Reloaded`] is
/// returned. This function never yields [`RenewalOutcome::Failed`].
#[must_use]
pub fn apply_refresh(
    holder: &OutboundCredentialHolder,
    loaded: Option<SignedCredential>,
) -> RenewalOutcome {
    // Nothing to install: the currently-held credential (if any) stays in place.
    let Some(candidate) = loaded else {
        return RenewalOutcome::Unchanged;
    };

    // Only the claims are compared, exactly as the holder exposes them.
    let already_held = matches!(
        holder.current(),
        Some(held) if held.credential() == candidate.credential()
    );
    if already_held {
        return RenewalOutcome::Unchanged;
    }

    holder.store(Some(candidate));
    RenewalOutcome::Reloaded
}
/// Runs one refresh pass for the outbound federation credential.
///
/// Steps, in order:
/// 1. Load a credential via `SignedCredential::load_from_env()`. On error a
///    warning is logged, the held credential is left untouched, and
///    [`RenewalOutcome::Failed`] (carrying the error text) is returned
///    immediately; no gauges are updated in that case.
/// 2. Hand the load result to [`apply_refresh`]. When that reports
///    [`RenewalOutcome::Reloaded`], the reload is logged and `now_unix` is
///    recorded in `LAST_RENEW_UNIX`.
/// 3. If the holder has a credential, publish the credential-age and
///    renewal-lag gauges and log when the credential is expired or within
///    [`DEFAULT_RENEWAL_LEAD_SECS`] of its `not_after`.
///
/// * `holder` - the shared slot holding the credential currently in use.
/// * `now_unix` - the caller's notion of "now" as Unix seconds; every time
///   computation in this pass uses it.
///
/// Returns the outcome from step 1 or step 2.
///
/// All timestamp arithmetic saturates: `not_before` / `not_after` originate
/// from a loaded credential, so an extreme value must not overflow (which
/// would panic in debug builds and wrap in release builds, letting the
/// `.max(0)` clamp report an ancient credential as brand new).
pub fn refresh_once(holder: &OutboundCredentialHolder, now_unix: i64) -> RenewalOutcome {
    let loaded = match SignedCredential::load_from_env() {
        Ok(loaded) => loaded,
        Err(e) => {
            tracing::warn!(target: crate::federation::SIGNING_TRACE_TARGET, error = %e,
                "outbound credential refresh: load failed; keeping the currently-held credential");
            return RenewalOutcome::Failed(e.to_string());
        }
    };

    let outcome = apply_refresh(holder, loaded);
    if outcome == RenewalOutcome::Reloaded {
        tracing::info!(target: crate::federation::SIGNING_TRACE_TARGET,
            "outbound federation credential reloaded from disk");
        LAST_RENEW_UNIX.store(now_unix, Ordering::Relaxed);
    }

    if let Some(current) = holder.current() {
        let claims = current.credential();

        // Age of the held credential, clamped at zero for not-yet-valid credentials.
        crate::metrics::set_federation_cred_max_age_seconds(
            now_unix.saturating_sub(claims.not_before).max(0),
        );

        // Seed the last-renewal timestamp on the first pass that sees a held
        // credential (0 == "never recorded"), so the lag gauge starts from zero
        // instead of from the Unix epoch.
        let last = match LAST_RENEW_UNIX.compare_exchange(
            0,
            now_unix,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(_) => now_unix,
            Err(prev) => prev,
        };
        crate::metrics::set_federation_renewal_lag_seconds(now_unix.saturating_sub(last).max(0));

        let not_after = claims.not_after;
        if now_unix > not_after {
            tracing::warn!(target: crate::federation::SIGNING_TRACE_TARGET, not_after,
                "held outbound credential has EXPIRED and no fresh credential is on disk; \
                 peers will fall back to per-peer enrollment for this node");
        } else if now_unix.saturating_add(DEFAULT_RENEWAL_LEAD_SECS) >= not_after {
            tracing::info!(target: crate::federation::SIGNING_TRACE_TARGET, not_after,
                "held outbound credential nearing expiry; awaiting a fresh credential file");
        }
    }

    outcome
}
fn emit_renewal_audit(conn: &rusqlite::Connection, claims: &FederationCredential, now_unix: i64) {
use std::collections::BTreeMap;
let timestamp = chrono::Utc::now().to_rfc3339();
let mut map: BTreeMap<&str, ciborium::Value> = BTreeMap::new();
map.insert(
"subject_agent_id",
ciborium::Value::Text(claims.subject_agent_id.clone()),
);
map.insert("issuer_id", ciborium::Value::Text(claims.issuer_id.clone()));
map.insert(
"trust_domain",
ciborium::Value::Text(claims.trust_domain.clone()),
);
map.insert(
"not_before",
ciborium::Value::Integer(claims.not_before.into()),
);
map.insert(
"not_after",
ciborium::Value::Integer(claims.not_after.into()),
);
map.insert("renewed_at_unix", ciborium::Value::Integer(now_unix.into()));
let entries: Vec<(ciborium::Value, ciborium::Value)> = map
.into_iter()
.map(|(k, v)| (ciborium::Value::Text(k.to_string()), v))
.collect();
let value = ciborium::Value::Map(entries);
let mut cbor: Vec<u8> = Vec::with_capacity(128);
if let Err(e) = ciborium::ser::into_writer(&value, &mut cbor) {
tracing::warn!(target: crate::federation::SIGNING_TRACE_TARGET, error = %e,
"failed to encode canonical CBOR for credential-renewal audit event");
return;
}
let event = crate::signed_events::SignedEvent::with_daemon_signature(
crate::signed_events::payload_hash(&cbor),
claims.subject_agent_id.clone(),
crate::signed_events::event_types::FED_CREDENTIAL_RENEWED.to_string(),
timestamp,
);
if let Err(e) = crate::signed_events::append_signed_event(conn, &event) {
tracing::warn!(target: crate::federation::SIGNING_TRACE_TARGET, error = %e,
"failed to append credential-renewal audit row; live credential is retained");
}
}
pub fn spawn_refresh_outbound_credential(
db: crate::handlers::Db,
interval: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
loop {
let now_unix = chrono::Utc::now().timestamp();
let outcome = refresh_once(outbound::shared(), now_unix);
if outcome == RenewalOutcome::Reloaded {
if let Some(cred) = outbound::shared().current() {
let lock = db.lock().await;
emit_renewal_audit(&lock.0, cred.credential(), now_unix);
}
}
if let Err(e) = outbound::reload_intermediates_from_env() {
tracing::warn!(target: crate::federation::SIGNING_TRACE_TARGET, error = %e,
"failed to reload outbound federation intermediate chain; retaining prior");
}
tokio::time::sleep(interval).await;
}
})
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::federation::identity::issuer::{FederationIssuer, IssuerConfig};
    use ed25519_dalek::SigningKey;

    /// Issuance instant shared by every fixture credential.
    const ISSUED_AT: i64 = 1_900_000_000;

    /// Deterministic issuer built from a fixed signing key.
    fn issuer() -> FederationIssuer {
        let root_key = SigningKey::from_bytes(&[1u8; 32]);
        let config = IssuerConfig::new("trust-domain-root", "fleet.example");
        FederationIssuer::new(root_key, config)
    }

    /// Mints a credential for the fixed test subject, issued at `now_unix`.
    fn cred_at(now_unix: i64) -> SignedCredential {
        let subject_key = SigningKey::from_bytes(&[7u8; 32]).verifying_key();
        issuer()
            .issue("region/nyc/node-1", &subject_key, now_unix)
            .expect("mint")
    }

    /// A `None` load result must leave the held credential in place.
    #[test]
    fn loaded_none_keeps_held_credential() {
        let held = cred_at(ISSUED_AT);
        let holder = OutboundCredentialHolder::new(Some(held.clone()));

        let outcome = apply_refresh(&holder, None);

        assert_eq!(outcome, RenewalOutcome::Unchanged);
        let still_held = holder.current().expect("still held");
        assert_eq!(still_held.credential(), held.credential());
    }

    /// Any loaded credential is installed when the holder starts out empty.
    #[test]
    fn loaded_into_empty_holder_reloads() {
        let holder = OutboundCredentialHolder::new(None);
        let fresh = cred_at(ISSUED_AT);

        let outcome = apply_refresh(&holder, Some(fresh.clone()));

        assert_eq!(outcome, RenewalOutcome::Reloaded);
        let now_held = holder.current().expect("now held");
        assert_eq!(now_held.credential(), fresh.credential());
    }

    /// Loading the credential that is already held is a no-op.
    #[test]
    fn identical_loaded_credential_is_unchanged() {
        let held = cred_at(ISSUED_AT);
        let holder = OutboundCredentialHolder::new(Some(held.clone()));

        let outcome = apply_refresh(&holder, Some(held));

        assert_eq!(outcome, RenewalOutcome::Unchanged);
    }

    /// The credential-age gauge reports `now - not_before` for the held credential.
    #[test]
    fn refresh_once_sets_cred_age_gauge() {
        let issued = ISSUED_AT;
        let held = cred_at(issued);
        let not_before = held.credential().not_before;
        let holder = OutboundCredentialHolder::new(Some(held));
        let now = not_before + 1234;

        let _ = refresh_once(&holder, now);

        let gauge = crate::metrics::registry()
            .federation_cred_max_age_seconds
            .get();
        assert_eq!(gauge, 1234);
    }

    /// A credential issued later than the held one replaces it.
    #[test]
    fn newer_loaded_credential_swaps_in() {
        let old = cred_at(ISSUED_AT);
        let holder = OutboundCredentialHolder::new(Some(old));
        let renewed = cred_at(ISSUED_AT + crate::SECS_PER_HOUR);

        let outcome = apply_refresh(&holder, Some(renewed.clone()));

        assert_eq!(outcome, RenewalOutcome::Reloaded);
        let held_not_after = holder.current().expect("held").credential().not_after;
        assert_eq!(held_not_after, renewed.credential().not_after);
    }

    /// Emitting a renewal audit writes exactly one signed event for the subject.
    #[test]
    fn emit_renewal_audit_appends_signed_event() {
        let conn = rusqlite::Connection::open_in_memory().expect("in-memory db");
        conn.execute_batch(include_str!(
            "../../../migrations/sqlite/0020_v07_signed_events.sql"
        ))
        .expect("apply signed_events migration");

        let issued = ISSUED_AT;
        let cred = cred_at(issued);
        let claims = cred.credential().clone();

        emit_renewal_audit(&conn, &claims, issued + 5);

        let rows =
            crate::signed_events::list_signed_events(&conn, Some(&claims.subject_agent_id), 10, 0)
                .expect("list");
        assert_eq!(rows.len(), 1, "exactly one renewal audit row");

        let row = &rows[0];
        assert_eq!(
            row.event_type,
            crate::signed_events::event_types::FED_CREDENTIAL_RENEWED
        );
        assert_eq!(row.agent_id, claims.subject_agent_id);
        assert_eq!(row.sequence, 1);
        assert!(!row.payload_hash.is_empty());
    }
}