Skip to main content

manta_shared/common/
audit.rs

1//! Audit trail helpers: build and send structured JSON messages to Kafka.
2
3use serde::{Deserialize, Serialize};
4
5use super::{error::MantaError, jwt_ops, kafka::Kafka};
6
7#[derive(Serialize, Deserialize, Debug, Clone)]
8/// Wraps a [`Kafka`] instance for sending audit messages.
9pub struct Auditor {
10  /// Kafka producer configured from `[auditor.kafka]` in the binary's
11  /// config file.
12  pub kafka: Kafka,
13}
14
15/// Trait for producing audit messages to a message broker.
16pub trait Audit {
17  /// Publish a single audit message payload. Implementations are
18  /// expected to be fire-and-forget — failures should be logged but
19  /// not propagated to the caller, since audit failures must not
20  /// abort the outer operation.
21  #[allow(async_fn_in_trait)]
22  async fn produce_message(&self, data: &[u8]) -> Result<(), MantaError>;
23}
24
25/// Serialize a JSON audit message and send it to Kafka.
26///
27/// Logs a warning on failure instead of propagating the
28/// error, since audit failures should not abort the
29/// operation.
30async fn send_audit_message(kafka: &Kafka, msg_json: serde_json::Value) {
31  let msg_data = match serde_json::to_string(&msg_json) {
32    Ok(data) => data,
33    Err(e) => {
34      tracing::warn!("Failed serializing audit message: {}", e);
35      return;
36    }
37  };
38
39  if let Err(e) = kafka.produce_message(msg_data.as_bytes()).await {
40    tracing::warn!("Failed producing audit message: {}", e);
41  }
42}
43
44/// Build the JSON payload that [`send_audit`] sends to Kafka.
45///
46/// Split out as a pure function so unit tests can pin the wire
47/// shape (key names, optional-field inclusion, JWT fallback) without
48/// needing a Kafka broker.
49pub(crate) fn build_audit_message(
50  token: &str,
51  message: impl Into<String>,
52  host: Option<serde_json::Value>,
53  group: Option<serde_json::Value>,
54) -> serde_json::Value {
55  let username = jwt_ops::get_name(token).unwrap_or_else(|e| {
56    tracing::warn!("Failed to extract user name from JWT for audit: {}", e);
57    String::new()
58  });
59  let user_id = jwt_ops::get_preferred_username(token).unwrap_or_else(|e| {
60    tracing::warn!("Failed to extract user ID from JWT for audit: {}", e);
61    String::new()
62  });
63
64  let mut msg = serde_json::json!({
65    "user": {"id": user_id, "name": username},
66    "message": message.into(),
67  });
68
69  if let Some(h) = host {
70    msg["host"] = serde_json::json!({"hostname": h});
71  }
72  if let Some(g) = group {
73    msg["group"] = g;
74  }
75
76  msg
77}
78
79/// Build and send an audit message to Kafka.
80///
81/// Extracts user identity from the JWT token (falling
82/// back to empty strings on parse failure) and
83/// constructs a JSON message with the provided fields.
84///
85/// Both `host` and `group` are optional — they are only
86/// included in the JSON if `Some`.
87pub async fn send_audit(
88  kafka: &Kafka,
89  token: &str,
90  message: impl Into<String>,
91  host: Option<serde_json::Value>,
92  group: Option<serde_json::Value>,
93) {
94  send_audit_message(kafka, build_audit_message(token, message, host, group))
95    .await;
96}
97
98/// Send an audit message if a Kafka instance is configured.
99///
100/// This is a convenience wrapper around [`send_audit`] that
101/// handles the common `if let Some(kafka) = kafka_opt { ... }`
102/// pattern found at every audit call site.
103pub async fn maybe_send_audit(
104  kafka_opt: Option<&Kafka>,
105  token: &str,
106  message: impl Into<String>,
107  host: Option<serde_json::Value>,
108  group: Option<serde_json::Value>,
109) {
110  if let Some(kafka) = kafka_opt {
111    send_audit(kafka, token, message, host, group).await;
112  }
113}
114
115/// Build the JSON payload that [`send_auth_audit`] sends to Kafka.
116///
117/// Split out so unit tests can pin the wire shape (notably: NO
118/// password field, by construction — the function doesn't take one).
119pub(crate) fn build_auth_audit_message(
120  outcome: &str,
121  username: &str,
122  source_ip: &str,
123  site: &str,
124) -> serde_json::Value {
125  serde_json::json!({
126    "event": "auth_attempt",
127    "outcome": outcome,
128    "username": username,
129    "source_ip": source_ip,
130    "site": site,
131  })
132}
133
134/// Send a structured audit event for an `/api/v1/auth/token` attempt.
135///
136/// Used by the server's auth handler — there is no JWT yet (the user is
137/// asking for one), so identity is captured from the submitted username
138/// rather than extracted from a token. The password is never logged.
139///
140/// Always Kafka-only; failures log a warning and do not abort the
141/// outer auth flow.
142pub async fn send_auth_audit(
143  kafka_opt: Option<&Kafka>,
144  outcome: &str,
145  username: &str,
146  source_ip: &str,
147  site: &str,
148) {
149  let Some(kafka) = kafka_opt else { return };
150  send_audit_message(
151    kafka,
152    build_auth_audit_message(outcome, username, source_ip, site),
153  )
154  .await;
155}
156
#[cfg(test)]
mod tests {
  use super::*;
  use serde_json::json;

  /// Build a minimal unsigned JWT (`alg: none`, dummy signature) whose
  /// `name` and `preferred_username` claims can be extracted by
  /// `jwt_ops::get_name` / `get_preferred_username`.
  fn jwt_with(name: &str, preferred_username: &str) -> String {
    use base64::prelude::*;
    let header = BASE64_URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
    let body = BASE64_URL_SAFE_NO_PAD.encode(
      json!({"name": name, "preferred_username": preferred_username})
        .to_string(),
    );
    format!("{header}.{body}.sig")
  }

  // ---- build_audit_message ----

  #[test]
  fn audit_includes_user_message_keys_unconditionally() {
    let msg = build_audit_message(
      &jwt_with("Alice", "alice"),
      "deleted node",
      None,
      None,
    );
    assert_eq!(msg["user"]["name"], "Alice");
    assert_eq!(msg["user"]["id"], "alice");
    assert_eq!(msg["message"], "deleted node");
    // No `host` or `group` keys when both are None.
    assert!(msg.get("host").is_none(), "host must be omitted when None");
    assert!(
      msg.get("group").is_none(),
      "group must be omitted when None"
    );
  }

  #[test]
  fn audit_wraps_host_in_hostname_object() {
    // Host is wrapped as `{"hostname": <provided>}`. A "simplification"
    // that flattened this to `"host": <provided>` would break downstream
    // log ingestion, so pin the structure.
    let msg = build_audit_message(
      &jwt_with("a", "a"),
      "m",
      Some(json!("x3000c0s1b0n0")),
      None,
    );
    assert_eq!(msg["host"], json!({"hostname": "x3000c0s1b0n0"}));
  }

  #[test]
  fn audit_inserts_group_value_as_is() {
    // Group is passed through verbatim; the caller is responsible for
    // pre-shaping it. Pin this so a future "wrap it like host" change
    // can't silently alter the wire shape.
    let group = json!({"name": "compute", "members": ["x1", "x2"]});
    let msg =
      build_audit_message(&jwt_with("a", "a"), "m", None, Some(group.clone()));
    assert_eq!(msg["group"], group);
  }

  #[test]
  fn audit_falls_back_to_empty_strings_on_malformed_jwt() {
    // "nodots" can't be parsed, so both jwt_ops calls return Err. The
    // audit fallback turns those into empty strings rather than dropping
    // the audit event entirely or panicking.
    let msg = build_audit_message("nodots", "m", None, None);
    assert_eq!(msg["user"]["name"], "");
    assert_eq!(msg["user"]["id"], "");
    assert_eq!(msg["message"], "m");
  }

  #[test]
  fn audit_does_not_leak_the_token_into_the_payload() {
    // Belt-and-braces: a refactor that accidentally embedded the token,
    // or any one of its segments, in the audit payload would be a
    // security incident. The decoded claims legitimately appear as
    // `user.name` / `user.id`, so a bare search for "alice" would
    // false-positive. Instead, search for the *encoded* material: the
    // whole token and each non-trivial dot-separated segment.
    let token = jwt_with("Alice", "alice");
    let msg = build_audit_message(&token, "deleted node", None, None);
    let json = serde_json::to_string(&msg).unwrap();
    assert!(
      !json.contains(&token),
      "audit payload must not contain the raw JWT"
    );
    // Skip very short segments (the dummy "sig") to avoid accidental
    // substring matches against ordinary payload text.
    for segment in token.split('.').filter(|s| s.len() > 8) {
      assert!(
        !json.contains(segment),
        "audit payload must not contain JWT segment `{segment}`"
      );
    }
  }

  // ---- build_auth_audit_message ----

  #[test]
  fn auth_audit_has_expected_wire_shape() {
    let msg = build_auth_audit_message("success", "alice", "10.0.0.1", "alps");
    assert_eq!(msg["event"], "auth_attempt");
    assert_eq!(msg["outcome"], "success");
    assert_eq!(msg["username"], "alice");
    assert_eq!(msg["source_ip"], "10.0.0.1");
    assert_eq!(msg["site"], "alps");
  }

  #[test]
  fn auth_audit_payload_has_no_password_field_by_construction() {
    // The function doesn't take a password. Pin via the wire shape that
    // no `password` / `passwd` / `secret` / `token` key sneaks in, and
    // that the key set is exactly the documented one. The second check
    // catches a sensitive field added under any other name.
    let msg = build_auth_audit_message("failure", "alice", "10.0.0.1", "alps");
    let obj = msg.as_object().expect("payload is an object");
    for forbidden in ["password", "passwd", "secret", "token"] {
      assert!(
        !obj.contains_key(forbidden),
        "auth audit payload must not contain `{forbidden}`"
      );
    }
    let mut keys: Vec<&str> = obj.keys().map(String::as_str).collect();
    keys.sort_unstable();
    assert_eq!(keys, ["event", "outcome", "site", "source_ip", "username"]);
  }

  #[test]
  fn auth_audit_handles_empty_strings_without_panicking() {
    // Some auth-failure paths pass an empty source_ip or site (when not
    // resolvable). The function should still produce a well-formed JSON
    // object, not panic or omit keys.
    let msg = build_auth_audit_message("failure", "", "", "");
    assert_eq!(msg["username"], "");
    assert_eq!(msg["source_ip"], "");
    assert_eq!(msg["site"], "");
    assert_eq!(msg["event"], "auth_attempt");
  }
}