Skip to main content

manta_shared/common/
audit.rs

1//! Audit trail helpers: build and send structured JSON messages to Kafka.
2
3use serde::{Deserialize, Serialize};
4
5use super::{error::MantaError, jwt_ops, kafka::Kafka};
6
7#[derive(Serialize, Deserialize, Debug, Clone)]
8/// Wraps a [`Kafka`] instance for sending audit messages.
9pub struct Auditor {
10  pub kafka: Kafka,
11}
12
13/// Trait for producing audit messages to a message broker.
14pub trait Audit {
15  #[allow(async_fn_in_trait)]
16  async fn produce_message(&self, data: &[u8]) -> Result<(), MantaError>;
17}
18
19/// Serialize a JSON audit message and send it to Kafka.
20///
21/// Logs a warning on failure instead of propagating the
22/// error, since audit failures should not abort the
23/// operation.
24async fn send_audit_message(kafka: &Kafka, msg_json: serde_json::Value) {
25  let msg_data = match serde_json::to_string(&msg_json) {
26    Ok(data) => data,
27    Err(e) => {
28      tracing::warn!("Failed serializing audit message: {}", e);
29      return;
30    }
31  };
32
33  if let Err(e) = kafka.produce_message(msg_data.as_bytes()).await {
34    tracing::warn!("Failed producing audit message: {}", e);
35  }
36}
37
38/// Build and send an audit message to Kafka.
39///
40/// Extracts user identity from the JWT token (falling
41/// back to empty strings on parse failure) and
42/// constructs a JSON message with the provided fields.
43///
44/// Both `host` and `group` are optional — they are only
45/// included in the JSON if `Some`.
46pub async fn send_audit(
47  kafka: &Kafka,
48  token: &str,
49  message: impl Into<String>,
50  host: Option<serde_json::Value>,
51  group: Option<serde_json::Value>,
52) {
53  let username = jwt_ops::get_name(token).unwrap_or_else(|e| {
54    tracing::warn!("Failed to extract user name from JWT for audit: {}", e);
55    String::new()
56  });
57  let user_id = jwt_ops::get_preferred_username(token).unwrap_or_else(|e| {
58    tracing::warn!("Failed to extract user ID from JWT for audit: {}", e);
59    String::new()
60  });
61
62  let mut msg = serde_json::json!({
63    "user": {"id": user_id, "name": username},
64    "message": message.into(),
65  });
66
67  if let Some(h) = host {
68    msg["host"] = serde_json::json!({"hostname": h});
69  }
70  if let Some(g) = group {
71    msg["group"] = g;
72  }
73
74  send_audit_message(kafka, msg).await;
75}
76
77/// Send an audit message if a Kafka instance is configured.
78///
79/// This is a convenience wrapper around [`send_audit`] that
80/// handles the common `if let Some(kafka) = kafka_opt { ... }`
81/// pattern found at every audit call site.
82pub async fn maybe_send_audit(
83  kafka_opt: Option<&Kafka>,
84  token: &str,
85  message: impl Into<String>,
86  host: Option<serde_json::Value>,
87  group: Option<serde_json::Value>,
88) {
89  if let Some(kafka) = kafka_opt {
90    send_audit(kafka, token, message, host, group).await;
91  }
92}
93
94/// Send a structured audit event for an `/api/v1/auth/token` attempt.
95///
96/// Used by the server's auth handler — there is no JWT yet (the user is
97/// asking for one), so identity is captured from the submitted username
98/// rather than extracted from a token. The password is never logged.
99///
100/// Always Kafka-only; failures log a warning and do not abort the
101/// outer auth flow.
102pub async fn send_auth_audit(
103  kafka_opt: Option<&Kafka>,
104  outcome: &str,
105  username: &str,
106  source_ip: &str,
107  site: &str,
108) {
109  let Some(kafka) = kafka_opt else { return };
110  let msg = serde_json::json!({
111    "event": "auth_attempt",
112    "outcome": outcome,
113    "username": username,
114    "source_ip": source_ip,
115    "site": site,
116  });
117  send_audit_message(kafka, msg).await;
118}