1use serde::{Deserialize, Serialize};
4
5use super::{error::MantaError, jwt_ops, kafka::Kafka};
6
7#[derive(Serialize, Deserialize, Debug, Clone)]
8pub struct Auditor {
10 pub kafka: Kafka,
13}
14
15pub trait Audit {
17 #[allow(async_fn_in_trait)]
22 async fn produce_message(&self, data: &[u8]) -> Result<(), MantaError>;
23}
24
25async fn send_audit_message(kafka: &Kafka, msg_json: serde_json::Value) {
31 let msg_data = match serde_json::to_string(&msg_json) {
32 Ok(data) => data,
33 Err(e) => {
34 tracing::warn!("Failed serializing audit message: {}", e);
35 return;
36 }
37 };
38
39 if let Err(e) = kafka.produce_message(msg_data.as_bytes()).await {
40 tracing::warn!("Failed producing audit message: {}", e);
41 }
42}
43
44pub(crate) fn build_audit_message(
50 token: &str,
51 message: impl Into<String>,
52 host: Option<serde_json::Value>,
53 group: Option<serde_json::Value>,
54) -> serde_json::Value {
55 let username = jwt_ops::get_name(token).unwrap_or_else(|e| {
56 tracing::warn!("Failed to extract user name from JWT for audit: {}", e);
57 String::new()
58 });
59 let user_id = jwt_ops::get_preferred_username(token).unwrap_or_else(|e| {
60 tracing::warn!("Failed to extract user ID from JWT for audit: {}", e);
61 String::new()
62 });
63
64 let mut msg = serde_json::json!({
65 "user": {"id": user_id, "name": username},
66 "message": message.into(),
67 });
68
69 if let Some(h) = host {
70 msg["host"] = serde_json::json!({"hostname": h});
71 }
72 if let Some(g) = group {
73 msg["group"] = g;
74 }
75
76 msg
77}
78
79pub async fn send_audit(
88 kafka: &Kafka,
89 token: &str,
90 message: impl Into<String>,
91 host: Option<serde_json::Value>,
92 group: Option<serde_json::Value>,
93) {
94 send_audit_message(kafka, build_audit_message(token, message, host, group))
95 .await;
96}
97
98pub async fn maybe_send_audit(
104 kafka_opt: Option<&Kafka>,
105 token: &str,
106 message: impl Into<String>,
107 host: Option<serde_json::Value>,
108 group: Option<serde_json::Value>,
109) {
110 if let Some(kafka) = kafka_opt {
111 send_audit(kafka, token, message, host, group).await;
112 }
113}
114
115pub(crate) fn build_auth_audit_message(
120 outcome: &str,
121 username: &str,
122 source_ip: &str,
123 site: &str,
124) -> serde_json::Value {
125 serde_json::json!({
126 "event": "auth_attempt",
127 "outcome": outcome,
128 "username": username,
129 "source_ip": source_ip,
130 "site": site,
131 })
132}
133
134pub async fn send_auth_audit(
143 kafka_opt: Option<&Kafka>,
144 outcome: &str,
145 username: &str,
146 source_ip: &str,
147 site: &str,
148) {
149 let Some(kafka) = kafka_opt else { return };
150 send_audit_message(
151 kafka,
152 build_auth_audit_message(outcome, username, source_ip, site),
153 )
154 .await;
155}
156
157#[cfg(test)]
158mod tests {
159 use super::*;
160 use serde_json::json;
161
162 fn jwt_with(name: &str, preferred_username: &str) -> String {
165 use base64::prelude::*;
166 let header = BASE64_URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
167 let body = BASE64_URL_SAFE_NO_PAD.encode(
168 json!({"name": name, "preferred_username": preferred_username})
169 .to_string(),
170 );
171 format!("{header}.{body}.sig")
172 }
173
174 #[test]
177 fn audit_includes_user_message_keys_unconditionally() {
178 let msg = build_audit_message(
179 &jwt_with("Alice", "alice"),
180 "deleted node",
181 None,
182 None,
183 );
184 assert_eq!(msg["user"]["name"], "Alice");
185 assert_eq!(msg["user"]["id"], "alice");
186 assert_eq!(msg["message"], "deleted node");
187 assert!(msg.get("host").is_none(), "host must be omitted when None");
189 assert!(
190 msg.get("group").is_none(),
191 "group must be omitted when None"
192 );
193 }
194
195 #[test]
196 fn audit_wraps_host_in_hostname_object() {
197 let msg = build_audit_message(
201 &jwt_with("a", "a"),
202 "m",
203 Some(json!("x3000c0s1b0n0")),
204 None,
205 );
206 assert_eq!(msg["host"], json!({"hostname": "x3000c0s1b0n0"}));
207 }
208
209 #[test]
210 fn audit_inserts_group_value_as_is() {
211 let group = json!({"name": "compute", "members": ["x1", "x2"]});
215 let msg =
216 build_audit_message(&jwt_with("a", "a"), "m", None, Some(group.clone()));
217 assert_eq!(msg["group"], group);
218 }
219
220 #[test]
221 fn audit_falls_back_to_empty_strings_on_malformed_jwt() {
222 let msg = build_audit_message("nodots", "m", None, None);
226 assert_eq!(msg["user"]["name"], "");
227 assert_eq!(msg["user"]["id"], "");
228 assert_eq!(msg["message"], "m");
229 }
230
231 #[test]
232 fn audit_does_not_leak_the_token_into_the_payload() {
233 let token = jwt_with("Alice", "alice");
237 let msg = build_audit_message(&token, "deleted node", None, None);
238 let json = serde_json::to_string(&msg).unwrap();
239 assert!(
240 !json.contains(&token),
241 "audit payload must not contain the raw JWT"
242 );
243 }
247
248 #[test]
251 fn auth_audit_has_expected_wire_shape() {
252 let msg = build_auth_audit_message("success", "alice", "10.0.0.1", "alps");
253 assert_eq!(msg["event"], "auth_attempt");
254 assert_eq!(msg["outcome"], "success");
255 assert_eq!(msg["username"], "alice");
256 assert_eq!(msg["source_ip"], "10.0.0.1");
257 assert_eq!(msg["site"], "alps");
258 }
259
260 #[test]
261 fn auth_audit_payload_has_no_password_field_by_construction() {
262 let msg = build_auth_audit_message("failure", "alice", "10.0.0.1", "alps");
265 let obj = msg.as_object().expect("payload is an object");
266 for forbidden in ["password", "passwd", "secret", "token"] {
267 assert!(
268 !obj.contains_key(forbidden),
269 "auth audit payload must not contain `{forbidden}`"
270 );
271 }
272 }
273
274 #[test]
275 fn auth_audit_handles_empty_strings_without_panicking() {
276 let msg = build_auth_audit_message("failure", "", "", "");
280 assert_eq!(msg["username"], "");
281 assert_eq!(msg["source_ip"], "");
282 assert_eq!(msg["site"], "");
283 assert_eq!(msg["event"], "auth_attempt");
284 }
285}