use std::net::IpAddr;
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::capabilities::MethodPath;
use crate::credential::{Origin, Scope};
use crate::forward::{ForwardDerivation, ForwardPolicyName};
use crate::principal::{Principal, ServiceIdentity};
use crate::verified_user::VerifiedUser;
/// Deserialization-only mirror of a verified user as it appears inside a
/// serialized invocation chain.
///
/// The `From<PrincipalWire> for Principal` conversion passes the four fields,
/// in declaration order, to `VerifiedUser::new_sealed`.
#[derive(Deserialize)]
struct VerifiedUserWire {
/// Identifier of the user.
user_id: String,
/// Issuer string recorded alongside the user.
issuer: String,
/// NOTE(review): presumably a Unix timestamp; the unit is not visible in this file — confirm against `VerifiedUser`.
issued_at: i64,
/// NOTE(review): presumably a Unix timestamp; the unit is not visible in this file — confirm against `VerifiedUser`.
expires_at: i64,
}
/// Deserialization-only mirror of a service identity as it appears inside a
/// serialized invocation chain.
///
/// The `From<PrincipalWire> for Principal` conversion passes `service_id` to
/// `ServiceIdentity::new_sealed`.
#[derive(Deserialize)]
struct ServiceIdentityWire {
/// Identifier of the service.
service_id: String,
}
/// Deserialization-only mirror of `Principal`.
///
/// Entries of `AuditRecord::invocation_chain` are first parsed into this type
/// and then converted into real `Principal` values. No serde attributes are
/// applied, so serde's default enum representation is used with the variant
/// names below.
#[derive(Deserialize)]
enum PrincipalWire {
/// A verified end user.
User(VerifiedUserWire),
/// A service identity.
Service(ServiceIdentityWire),
/// No identity attached.
Anonymous,
}
impl From<PrincipalWire> for Principal {
fn from(w: PrincipalWire) -> Self {
match w {
PrincipalWire::Anonymous => Principal::anonymous_sealed(),
PrincipalWire::User(u) => Principal::user_sealed(VerifiedUser::new_sealed(
u.user_id,
u.issuer,
u.issued_at,
u.expires_at,
)),
PrincipalWire::Service(s) => {
Principal::service_sealed(ServiceIdentity::new_sealed(s.service_id))
}
}
}
}
/// `deserialize_with` helper for `AuditRecord::invocation_chain`.
///
/// Parses a sequence of [`PrincipalWire`] entries and converts each one into a
/// [`Principal`], preserving order.
///
/// # Errors
///
/// Returns the deserializer's error if the input is not a valid sequence of
/// `PrincipalWire` values.
fn deserialize_invocation_chain<'de, D>(deserializer: D) -> Result<Vec<Principal>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    Vec::<PrincipalWire>::deserialize(deserializer).map(|entries| {
        entries
            .into_iter()
            .map(Principal::from)
            .collect::<Vec<Principal>>()
    })
}
fn empty_invocation_chain() -> Vec<Principal> {
Vec::new()
}
/// `deserialize_with` helper for `AuditRecord::policy_name`.
///
/// Reads an optional string and turns it into a [`ForwardPolicyName`]:
///
/// * `"identity_only"`, `"pass_through"` and `"anonymous"` map to the shared
///   constants in `crate::forward`.
/// * Any other name is promoted to `&'static str` and wrapped with
///   `ForwardPolicyName::new`.
///
/// Promotion to `'static` requires leaking the string. To keep that bounded,
/// leaked names are interned in a process-wide set, so each *distinct* custom
/// name is leaked at most once no matter how many records mention it.
///
/// # Errors
///
/// Returns the deserializer's error if the value is neither null nor a string.
fn deserialize_policy_name<'de, D>(
    deserializer: D,
) -> Result<Option<ForwardPolicyName>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    /// Returns a `'static` copy of `name`, reusing a previously leaked
    /// allocation when the same name has been seen before.
    fn intern(name: String) -> &'static str {
        static INTERNED: std::sync::Mutex<std::collections::BTreeSet<&'static str>> =
            std::sync::Mutex::new(std::collections::BTreeSet::new());

        // A poisoned lock only means another thread panicked while holding it;
        // the set contents are still valid, so keep using them.
        let mut seen = INTERNED
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if let Some(&known) = seen.get(name.as_str()) {
            return known;
        }
        let leaked: &'static str = Box::leak(name.into_boxed_str());
        seen.insert(leaked);
        leaked
    }

    let raw: Option<String> = Option::deserialize(deserializer)?;
    Ok(raw.map(|name| match name.as_str() {
        "identity_only" => crate::forward::IDENTITY_ONLY_NAME,
        "pass_through" => crate::forward::PASS_THROUGH_NAME,
        "anonymous" => crate::forward::ANONYMOUS_NAME,
        _ => ForwardPolicyName::new(intern(name)),
    }))
}
/// Newtype around a `String` identifying a user.
///
/// `#[serde(transparent)]` makes it serialize and deserialize as the bare
/// inner string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct UserId(String);
impl UserId {
pub fn new(s: impl Into<String>) -> Self {
Self(s.into())
}
pub fn as_str(&self) -> &str {
&self.0
}
pub fn into_string(self) -> String {
self.0
}
}
impl std::fmt::Display for UserId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
/// Newtype around a `String` identifying a session.
///
/// `#[serde(transparent)]` makes it serialize and deserialize as the bare
/// inner string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct SessionId(String);
impl SessionId {
pub fn new(s: impl Into<String>) -> Self {
Self(s.into())
}
pub fn as_str(&self) -> &str {
&self.0
}
pub fn into_string(self) -> String {
self.0
}
}
impl std::fmt::Display for SessionId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
/// Newtype around a `String` naming a role.
///
/// `#[serde(transparent)]` makes it serialize and deserialize as the bare
/// inner string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct RoleName(String);
impl RoleName {
pub fn new(s: impl Into<String>) -> Self {
Self(s.into())
}
pub fn as_str(&self) -> &str {
&self.0
}
pub fn into_string(self) -> String {
self.0
}
}
impl std::fmt::Display for RoleName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
/// Discriminates the kinds of audit record.
///
/// Serialized in `snake_case` (`"scope_check"`, `"forward_policy_applied"`).
/// `ScopeCheck` is the `Default`, which is what `AuditRecord::kind` falls back
/// to when the field is absent from the input.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AuditRecordKind {
/// Default kind; serialized as `"scope_check"`.
#[default]
ScopeCheck,
/// Serialized as `"forward_policy_applied"`.
ForwardPolicyApplied,
}
/// Zero-sized marker type that converts into `AuditRecordKind::ScopeCheck`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ScopeCheck;
impl From<ScopeCheck> for AuditRecordKind {
fn from(_: ScopeCheck) -> Self {
AuditRecordKind::ScopeCheck
}
}
/// Zero-sized marker type that converts into
/// `AuditRecordKind::ForwardPolicyApplied`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ForwardPolicyApplied;
impl From<ForwardPolicyApplied> for AuditRecordKind {
fn from(_: ForwardPolicyApplied) -> Self {
AuditRecordKind::ForwardPolicyApplied
}
}
/// Outcome recorded in an audit record.
///
/// Internally tagged under the key `"decision"` with `snake_case` variant
/// names, e.g. `{"decision":"allow"}` or
/// `{"decision":"deny","reason":"missing_scope"}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "decision", rename_all = "snake_case")]
pub enum AuditDecision {
/// The decision was to allow.
Allow,
/// The decision was to deny, for the given reason.
Deny {
/// Why the decision was a denial.
reason: AuditDenyReason,
},
}
/// Reason attached to `AuditDecision::Deny`.
///
/// Serialized in `snake_case` (e.g. `"invalid_session"`, `"tenant_boundary"`).
/// NOTE(review): the precise condition that selects each variant is decided by
/// callers outside this file — confirm before relying on the names alone.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AuditDenyReason {
/// Serialized as `"unauthenticated"`.
Unauthenticated,
/// Serialized as `"invalid_session"`.
InvalidSession,
/// Serialized as `"missing_scope"`.
MissingScope,
/// Serialized as `"not_accepted"`.
NotAccepted,
/// Serialized as `"tenant_boundary"`.
TenantBoundary,
/// Serialized as `"rate_limited"`.
RateLimited,
/// Serialized as `"other"`.
Other,
}
/// Newtype around a `String` naming a sensitive field; the constructor calls
/// its argument a `path`.
///
/// `#[serde(transparent)]` makes it serialize and deserialize as the bare
/// inner string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct SensitiveField(String);
impl SensitiveField {
pub fn new(path: impl Into<String>) -> Self {
Self(path.into())
}
pub fn as_str(&self) -> &str {
&self.0
}
}
/// One audit entry, serializable with serde and consumed by [`AuditSink`]
/// implementations.
///
/// Required on input: `timestamp`, `method`, `decision`, `latency_us` and
/// `correlation_id`. Every other field carries `#[serde(default)]` and may be
/// omitted. `Option` fields are also skipped on output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditRecord {
/// UTC timestamp attached to the record.
pub timestamp: DateTime<Utc>,
/// Record kind; falls back to `AuditRecordKind::ScopeCheck` when absent.
#[serde(default)]
pub kind: AuditRecordKind,
/// Originating user, if any.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub originator: Option<UserId>,
/// Session identifier, if any.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_id: Option<SessionId>,
/// Chain of principals. Deserialized through `PrincipalWire` and rebuilt with
/// the sealed `Principal` constructors; an absent field yields an empty chain.
#[serde(
default = "empty_invocation_chain",
deserialize_with = "deserialize_invocation_chain"
)]
pub invocation_chain: Vec<Principal>,
/// Role names attached to the record; empty when absent.
#[serde(default)]
pub roles: Vec<RoleName>,
/// Method path the record refers to.
pub method: MethodPath,
/// Scopes listed as required; empty when absent.
#[serde(default)]
pub scope_required: Vec<Scope>,
/// Allow/deny outcome.
pub decision: AuditDecision,
/// NOTE(review): presumably elapsed time in microseconds, going by the `_us` suffix — confirm with the producer.
pub latency_us: u64,
/// Request origin, if any.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub origin: Option<Origin>,
/// Client IP address, if any.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub client_ip: Option<IpAddr>,
/// Identifier for correlating this record with other events.
pub correlation_id: Uuid,
/// Forward-policy name, if any. Parsed by `deserialize_policy_name`, which maps
/// the built-in names to the constants in `crate::forward`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_policy_name"
)]
pub policy_name: Option<ForwardPolicyName>,
/// Forward derivation, if any.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub derivation: Option<ForwardDerivation>,
/// NOTE(review): presumably the caller's namespace (the test fixture uses "ns.caller") rather than a nanosecond value — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub caller_ns: Option<String>,
}
/// Destination for [`AuditRecord`]s.
///
/// The trait requires `Send + Sync + 'static` and is used behind
/// `Arc<dyn AuditSink>` elsewhere in this file.
#[async_trait]
pub trait AuditSink: Send + Sync + 'static {
/// Consumes `record` and writes it to the sink.
///
/// The method returns `()`, so an implementation has no way to report a
/// failure to the caller through the return value.
async fn write(&self, record: AuditRecord);
}
/// Stateless [`AuditSink`] that forwards each record to `tracing` as an `INFO`
/// event with target `plexus::audit`.
#[derive(Debug, Clone, Copy, Default)]
pub struct TracingAuditSink;
impl TracingAuditSink {
pub const fn new() -> Self {
Self
}
pub fn arc() -> Arc<dyn AuditSink> {
Arc::new(Self::new())
}
}
#[async_trait]
impl AuditSink for TracingAuditSink {
    /// Emits `record` as one `INFO` event on target `plexus::audit` with the
    /// message `"audit"`.
    ///
    /// `timestamp`, `method` and `correlation_id` are recorded with `Display`,
    /// `latency_us` as a plain value, and every other field with `Debug`.
    async fn write(&self, record: AuditRecord) {
        // The pattern names every field and has no `..`, so adding a field to
        // `AuditRecord` stops this from compiling until the field is handled here.
        let AuditRecord {
            timestamp,
            kind,
            originator,
            session_id,
            invocation_chain,
            roles,
            method,
            scope_required,
            decision,
            latency_us,
            origin,
            client_ip,
            correlation_id,
            policy_name,
            derivation,
            caller_ns,
        } = record;

        tracing::info!(
            target: "plexus::audit",
            timestamp = %timestamp,
            kind = ?kind,
            originator = ?originator,
            session_id = ?session_id,
            invocation_chain = ?invocation_chain,
            roles = ?roles,
            method = %method,
            scope_required = ?scope_required,
            decision = ?decision,
            latency_us = latency_us,
            origin = ?origin,
            client_ip = ?client_ip,
            correlation_id = %correlation_id,
            policy_name = ?policy_name,
            derivation = ?derivation,
            caller_ns = ?caller_ns,
            "audit",
        );
    }
}
// Unit tests for the audit types: serde wire formats, newtype accessors,
// marker conversions, and `AuditSink` behavior under `tracing`.
#[cfg(test)]
mod tests {
use super::*;
use crate::forward::ForwardDerivation;
use crate::principal::Principal;
use std::net::{IpAddr, Ipv4Addr};
// Fixture: an allowed `ScopeCheck` record with a one-entry anonymous chain and
// no forward-policy fields.
fn scope_check_record() -> AuditRecord {
AuditRecord {
timestamp: DateTime::from_timestamp(1_700_000_000, 0).unwrap(),
kind: AuditRecordKind::ScopeCheck,
originator: Some(UserId::new("alice")),
session_id: Some(SessionId::new("sess-42")),
invocation_chain: vec![Principal::anonymous_sealed()],
roles: vec![RoleName::new("admin"), RoleName::new("user")],
method: MethodPath::try_new("solar.earth.luna.info").unwrap(),
scope_required: vec![Scope::new("luna.read")],
decision: AuditDecision::Allow,
latency_us: 123,
origin: Some(Origin::new("ws://localhost:4444")),
client_ip: Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))),
correlation_id: Uuid::nil(),
policy_name: None,
derivation: None,
caller_ns: None,
}
}
// Fixture: a denied (`MissingScope`) `ForwardPolicyApplied` record with a
// two-entry anonymous chain and every forward-policy field populated.
fn forward_policy_record() -> AuditRecord {
AuditRecord {
timestamp: DateTime::from_timestamp(1_700_000_001, 0).unwrap(),
kind: AuditRecordKind::ForwardPolicyApplied,
originator: Some(UserId::new("bob")),
session_id: Some(SessionId::new("sess-99")),
invocation_chain: vec![
Principal::anonymous_sealed(),
Principal::anonymous_sealed(),
],
roles: vec![RoleName::new("editor")],
method: MethodPath::try_new("solar.earth.atmosphere.layer").unwrap(),
scope_required: vec![Scope::new("atmosphere.read")],
decision: AuditDecision::Deny {
reason: AuditDenyReason::MissingScope,
},
latency_us: 456,
origin: Some(Origin::new("ws://localhost:4444")),
client_ip: Some(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1))),
correlation_id: Uuid::from_u128(0xdead_beef_cafe_babe_1234_5678_9abc_def0),
policy_name: Some(crate::forward::IDENTITY_ONLY_NAME),
derivation: Some(ForwardDerivation::IDENTITY_ONLY),
caller_ns: Some("ns.caller".to_string()),
}
}
// `Default` for the kind enum is `ScopeCheck`.
#[test]
fn audit_record_kind_default_is_scope_check() {
assert_eq!(AuditRecordKind::default(), AuditRecordKind::ScopeCheck);
}
// Both kinds serialize to their snake_case names.
#[test]
fn audit_record_kind_serializes_snake_case() {
let v = serde_json::to_string(&AuditRecordKind::ScopeCheck).unwrap();
assert_eq!(v, "\"scope_check\"");
let v = serde_json::to_string(&AuditRecordKind::ForwardPolicyApplied).unwrap();
assert_eq!(v, "\"forward_policy_applied\"");
}
// A blob with no `kind` key deserializes with the default kind.
#[test]
fn audit_record_omitted_kind_deserializes_as_scope_check() {
let blob = r#"{
"timestamp": "2023-11-14T22:13:20Z",
"method": "solar.earth.luna.info",
"decision": {"decision": "allow"},
"latency_us": 17,
"correlation_id": "00000000-0000-0000-0000-000000000000"
}"#;
let record: AuditRecord = serde_json::from_str(blob).unwrap();
assert_eq!(record.kind, AuditRecordKind::ScopeCheck);
}
// A blob containing only the required keys leaves every optional field at
// `None` or empty.
#[test]
fn audit_record_omitted_optional_forward_fields_default_to_none() {
let blob = r#"{
"timestamp": "2023-11-14T22:13:20Z",
"method": "solar.earth.luna.info",
"decision": {"decision": "allow"},
"latency_us": 17,
"correlation_id": "00000000-0000-0000-0000-000000000000"
}"#;
let record: AuditRecord = serde_json::from_str(blob).unwrap();
assert!(record.policy_name.is_none());
assert!(record.derivation.is_none());
assert!(record.caller_ns.is_none());
assert!(record.originator.is_none());
assert!(record.session_id.is_none());
assert!(record.invocation_chain.is_empty());
assert!(record.roles.is_empty());
assert!(record.scope_required.is_empty());
assert!(record.origin.is_none());
assert!(record.client_ip.is_none());
}
// Every deny reason round-trips through its snake_case JSON string.
#[test]
fn audit_deny_reason_serializes_snake_case_all_variants() {
let cases = [
(AuditDenyReason::Unauthenticated, "\"unauthenticated\""),
(AuditDenyReason::InvalidSession, "\"invalid_session\""),
(AuditDenyReason::MissingScope, "\"missing_scope\""),
(AuditDenyReason::NotAccepted, "\"not_accepted\""),
(AuditDenyReason::TenantBoundary, "\"tenant_boundary\""),
(AuditDenyReason::RateLimited, "\"rate_limited\""),
(AuditDenyReason::Other, "\"other\""),
];
for (variant, encoded) in cases {
let s = serde_json::to_string(&variant).unwrap();
assert_eq!(s, encoded, "serialize {:?}", variant);
let parsed: AuditDenyReason = serde_json::from_str(encoded).unwrap();
assert_eq!(parsed, variant, "deserialize {}", encoded);
}
}
// `Allow` is encoded as an object holding only the `decision` tag.
#[test]
fn audit_decision_allow_round_trips() {
let s = serde_json::to_string(&AuditDecision::Allow).unwrap();
assert_eq!(s, "{\"decision\":\"allow\"}");
let v: AuditDecision = serde_json::from_str(&s).unwrap();
assert_eq!(v, AuditDecision::Allow);
}
// `Deny` is encoded with the tag plus a sibling `reason` key.
#[test]
fn audit_decision_deny_carries_reason() {
let d = AuditDecision::Deny {
reason: AuditDenyReason::MissingScope,
};
let s = serde_json::to_string(&d).unwrap();
assert_eq!(s, "{\"decision\":\"deny\",\"reason\":\"missing_scope\"}");
let parsed: AuditDecision = serde_json::from_str(&s).unwrap();
assert_eq!(parsed, d);
}
// The `ScopeCheck` marker converts into the matching kind.
#[test]
fn scope_check_marker_into_kind() {
let k: AuditRecordKind = ScopeCheck.into();
assert_eq!(k, AuditRecordKind::ScopeCheck);
}
// The `ForwardPolicyApplied` marker converts into the matching kind.
#[test]
fn forward_policy_applied_marker_into_kind() {
let k: AuditRecordKind = ForwardPolicyApplied.into();
assert_eq!(k, AuditRecordKind::ForwardPolicyApplied);
}
// The transparent newtypes below serialize as bare JSON strings.
#[test]
fn user_id_round_trips_as_bare_string() {
let u = UserId::new("alice");
let s = serde_json::to_string(&u).unwrap();
assert_eq!(s, "\"alice\"");
let parsed: UserId = serde_json::from_str(&s).unwrap();
assert_eq!(parsed, u);
}
#[test]
fn session_id_round_trips_as_bare_string() {
let s = SessionId::new("sess-1");
let json = serde_json::to_string(&s).unwrap();
assert_eq!(json, "\"sess-1\"");
let parsed: SessionId = serde_json::from_str(&json).unwrap();
assert_eq!(parsed, s);
}
#[test]
fn role_name_round_trips_as_bare_string() {
let r = RoleName::new("admin");
let s = serde_json::to_string(&r).unwrap();
assert_eq!(s, "\"admin\"");
let parsed: RoleName = serde_json::from_str(&s).unwrap();
assert_eq!(parsed, r);
}
#[test]
fn sensitive_field_round_trips_as_bare_string() {
let f = SensitiveField::new("password");
let s = serde_json::to_string(&f).unwrap();
assert_eq!(s, "\"password\"");
let parsed: SensitiveField = serde_json::from_str(&s).unwrap();
assert_eq!(parsed, f);
}
// `Display` prints the inner string unchanged.
#[test]
fn newtypes_are_display() {
assert_eq!(format!("{}", UserId::new("alice")), "alice");
assert_eq!(format!("{}", SessionId::new("s1")), "s1");
assert_eq!(format!("{}", RoleName::new("admin")), "admin");
}
// Serialize -> deserialize -> serialize must give identical JSON text.
#[test]
fn audit_record_scope_check_round_trips_byte_for_byte() {
let record = scope_check_record();
let first = serde_json::to_string(&record).unwrap();
let parsed: AuditRecord = serde_json::from_str(&first).unwrap();
let second = serde_json::to_string(&parsed).unwrap();
assert_eq!(first, second, "round-trip not byte-equal");
}
// Same round-trip for the forward-policy fixture, plus field-level checks on
// the parsed value (including mapping of the built-in policy name constant).
#[test]
fn audit_record_forward_policy_round_trips_byte_for_byte() {
let record = forward_policy_record();
let first = serde_json::to_string(&record).unwrap();
let parsed: AuditRecord = serde_json::from_str(&first).unwrap();
let second = serde_json::to_string(&parsed).unwrap();
assert_eq!(first, second, "round-trip not byte-equal");
assert_eq!(parsed.kind, AuditRecordKind::ForwardPolicyApplied);
assert_eq!(parsed.originator.as_ref().map(|u| u.as_str()), Some("bob"));
assert_eq!(
parsed.session_id.as_ref().map(|s| s.as_str()),
Some("sess-99")
);
assert_eq!(parsed.invocation_chain.len(), 2);
assert_eq!(parsed.roles, vec![RoleName::new("editor")]);
assert_eq!(
parsed.decision,
AuditDecision::Deny {
reason: AuditDenyReason::MissingScope
}
);
assert_eq!(parsed.latency_us, 456);
assert_eq!(
parsed.policy_name,
Some(crate::forward::IDENTITY_ONLY_NAME)
);
assert_eq!(parsed.derivation, Some(ForwardDerivation::IDENTITY_ONLY));
assert_eq!(parsed.caller_ns.as_deref(), Some("ns.caller"));
}
// The fixture's chain holds both principals.
#[test]
fn audit_record_carries_principal_chain() {
let record = forward_policy_record();
assert_eq!(record.invocation_chain.len(), 2);
}
// `Clone` reproduces scalar fields.
#[test]
fn audit_record_clone_yields_independent_value() {
let record = scope_check_record();
let copy = record.clone();
assert_eq!(copy.latency_us, record.latency_us);
assert_eq!(copy.correlation_id, record.correlation_id);
}
// Compile-time check: the trait can be used as `Arc<dyn AuditSink>`.
#[test]
fn audit_sink_is_object_safe_via_arc_dyn() {
let _sink: Arc<dyn AuditSink> = Arc::new(TracingAuditSink::new());
}
// The `arc()` convenience constructor yields a trait object.
#[test]
fn tracing_audit_sink_arc_constructs() {
let _sink: Arc<dyn AuditSink> = TracingAuditSink::arc();
}
// Writing a record produces captured log output mentioning the message and
// the event target.
#[tokio::test]
#[tracing_test::traced_test]
async fn tracing_audit_sink_emits_info_event() {
let sink = TracingAuditSink::new();
sink.write(scope_check_record()).await;
assert!(logs_contain("audit"));
assert!(logs_contain("plexus::audit"));
}
// A denied record is logged with its deny reason in `Debug` form.
#[tokio::test]
#[tracing_test::traced_test]
async fn tracing_audit_sink_emits_for_deny_records() {
let sink = TracingAuditSink::new();
sink.write(forward_policy_record()).await;
assert!(logs_contain("audit"));
assert!(logs_contain("MissingScope"));
}
// Test double that only records that `write` was called.
struct FailingSink {
attempted: std::sync::atomic::AtomicBool,
}
#[async_trait]
impl AuditSink for FailingSink {
async fn write(&self, _record: AuditRecord) {
self.attempted
.store(true, std::sync::atomic::Ordering::SeqCst);
}
}
// `write` returns `()`, so the caller only observes that the call completed.
#[tokio::test]
async fn failing_sink_does_not_propagate_error_to_caller() {
let sink = FailingSink {
attempted: std::sync::atomic::AtomicBool::new(false),
};
sink.write(scope_check_record()).await;
assert!(sink.attempted.load(std::sync::atomic::Ordering::SeqCst));
}
// A sink that panics inside a spawned task surfaces as a panicked `JoinError`.
// NOTE(review): the error event asserted at the end is emitted by this test
// itself, not by production code in this file.
#[tokio::test]
#[tracing_test::traced_test]
async fn panicking_sink_panic_is_observable_via_join_handle() {
struct PanickingSink;
#[async_trait]
impl AuditSink for PanickingSink {
async fn write(&self, record: AuditRecord) {
panic!("sink down (correlation_id={})", record.correlation_id);
}
}
let sink: Arc<dyn AuditSink> = Arc::new(PanickingSink);
let record = scope_check_record();
let correlation_id = record.correlation_id;
let sink_for_task = Arc::clone(&sink);
let record_for_task = record.clone();
let handle = tokio::spawn(async move {
sink_for_task.write(record_for_task).await;
});
let join_result = handle.await;
let join_err = join_result.expect_err("sink panic should surface as task panic");
assert!(join_err.is_panic());
tracing::error!(
target: "plexus::audit",
%correlation_id,
"audit sink panicked; record dropped"
);
assert!(logs_contain("audit sink panicked"));
assert!(logs_contain(&correlation_id.to_string()));
}
}