use serde::{Deserialize, Serialize};
use super::{error::MantaError, jwt_ops, kafka::Kafka};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Auditor {
pub kafka: Kafka,
}
pub trait Audit {
#[allow(async_fn_in_trait)]
async fn produce_message(&self, data: &[u8]) -> Result<(), MantaError>;
}
async fn send_audit_message(kafka: &Kafka, msg_json: serde_json::Value) {
let msg_data = match serde_json::to_string(&msg_json) {
Ok(data) => data,
Err(e) => {
tracing::warn!("Failed serializing audit message: {}", e);
return;
}
};
if let Err(e) = kafka.produce_message(msg_data.as_bytes()).await {
tracing::warn!("Failed producing audit message: {}", e);
}
}
pub(crate) fn build_audit_message(
token: &str,
message: impl Into<String>,
host: Option<serde_json::Value>,
group: Option<serde_json::Value>,
) -> serde_json::Value {
let username = jwt_ops::get_name(token).unwrap_or_else(|e| {
tracing::warn!("Failed to extract user name from JWT for audit: {}", e);
String::new()
});
let user_id = jwt_ops::get_preferred_username(token).unwrap_or_else(|e| {
tracing::warn!("Failed to extract user ID from JWT for audit: {}", e);
String::new()
});
let mut msg = serde_json::json!({
"user": {"id": user_id, "name": username},
"message": message.into(),
});
if let Some(h) = host {
msg["host"] = serde_json::json!({"hostname": h});
}
if let Some(g) = group {
msg["group"] = g;
}
msg
}
pub async fn send_audit(
kafka: &Kafka,
token: &str,
message: impl Into<String>,
host: Option<serde_json::Value>,
group: Option<serde_json::Value>,
) {
send_audit_message(kafka, build_audit_message(token, message, host, group))
.await;
}
pub async fn maybe_send_audit(
kafka_opt: Option<&Kafka>,
token: &str,
message: impl Into<String>,
host: Option<serde_json::Value>,
group: Option<serde_json::Value>,
) {
if let Some(kafka) = kafka_opt {
send_audit(kafka, token, message, host, group).await;
}
}
pub(crate) fn build_auth_audit_message(
outcome: &str,
username: &str,
source_ip: &str,
site: &str,
) -> serde_json::Value {
serde_json::json!({
"event": "auth_attempt",
"outcome": outcome,
"username": username,
"source_ip": source_ip,
"site": site,
})
}
pub async fn send_auth_audit(
kafka_opt: Option<&Kafka>,
outcome: &str,
username: &str,
source_ip: &str,
site: &str,
) {
let Some(kafka) = kafka_opt else { return };
send_audit_message(
kafka,
build_auth_audit_message(outcome, username, source_ip, site),
)
.await;
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn jwt_with(name: &str, preferred_username: &str) -> String {
use base64::prelude::*;
let header =
BASE64_URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
let body = BASE64_URL_SAFE_NO_PAD.encode(
json!({"name": name, "preferred_username": preferred_username})
.to_string(),
);
format!("{}.{}.sig", header, body)
}
#[test]
fn audit_includes_user_message_keys_unconditionally() {
let msg = build_audit_message(
&jwt_with("Alice", "alice"),
"deleted node",
None,
None,
);
assert_eq!(msg["user"]["name"], "Alice");
assert_eq!(msg["user"]["id"], "alice");
assert_eq!(msg["message"], "deleted node");
assert!(msg.get("host").is_none(), "host must be omitted when None");
assert!(msg.get("group").is_none(), "group must be omitted when None");
}
#[test]
fn audit_wraps_host_in_hostname_object() {
let msg = build_audit_message(
&jwt_with("a", "a"),
"m",
Some(json!("x3000c0s1b0n0")),
None,
);
assert_eq!(msg["host"], json!({"hostname": "x3000c0s1b0n0"}));
}
#[test]
fn audit_inserts_group_value_as_is() {
let group = json!({"name": "compute", "members": ["x1", "x2"]});
let msg =
build_audit_message(&jwt_with("a", "a"), "m", None, Some(group.clone()));
assert_eq!(msg["group"], group);
}
#[test]
fn audit_falls_back_to_empty_strings_on_malformed_jwt() {
let msg = build_audit_message("nodots", "m", None, None);
assert_eq!(msg["user"]["name"], "");
assert_eq!(msg["user"]["id"], "");
assert_eq!(msg["message"], "m");
}
#[test]
fn audit_does_not_leak_the_token_into_the_payload() {
let token = jwt_with("Alice", "alice");
let msg =
build_audit_message(&token, "deleted node", None, None);
let json = serde_json::to_string(&msg).unwrap();
assert!(
!json.contains(&token),
"audit payload must not contain the raw JWT"
);
}
#[test]
fn auth_audit_has_expected_wire_shape() {
let msg = build_auth_audit_message(
"success",
"alice",
"10.0.0.1",
"alps",
);
assert_eq!(msg["event"], "auth_attempt");
assert_eq!(msg["outcome"], "success");
assert_eq!(msg["username"], "alice");
assert_eq!(msg["source_ip"], "10.0.0.1");
assert_eq!(msg["site"], "alps");
}
#[test]
fn auth_audit_payload_has_no_password_field_by_construction() {
let msg =
build_auth_audit_message("failure", "alice", "10.0.0.1", "alps");
let obj = msg.as_object().expect("payload is an object");
for forbidden in ["password", "passwd", "secret", "token"] {
assert!(
!obj.contains_key(forbidden),
"auth audit payload must not contain `{forbidden}`"
);
}
}
#[test]
fn auth_audit_handles_empty_strings_without_panicking() {
let msg = build_auth_audit_message("failure", "", "", "");
assert_eq!(msg["username"], "");
assert_eq!(msg["source_ip"], "");
assert_eq!(msg["site"], "");
assert_eq!(msg["event"], "auth_attempt");
}
}