manta_shared/common/
audit.rs1use serde::{Deserialize, Serialize};
4
5use super::{error::MantaError, jwt_ops, kafka::Kafka};
6
7#[derive(Serialize, Deserialize, Debug, Clone)]
8pub struct Auditor {
10 pub kafka: Kafka,
11}
12
13pub trait Audit {
15 #[allow(async_fn_in_trait)]
16 async fn produce_message(&self, data: &[u8]) -> Result<(), MantaError>;
17}
18
19async fn send_audit_message(kafka: &Kafka, msg_json: serde_json::Value) {
25 let msg_data = match serde_json::to_string(&msg_json) {
26 Ok(data) => data,
27 Err(e) => {
28 tracing::warn!("Failed serializing audit message: {}", e);
29 return;
30 }
31 };
32
33 if let Err(e) = kafka.produce_message(msg_data.as_bytes()).await {
34 tracing::warn!("Failed producing audit message: {}", e);
35 }
36}
37
38pub async fn send_audit(
47 kafka: &Kafka,
48 token: &str,
49 message: impl Into<String>,
50 host: Option<serde_json::Value>,
51 group: Option<serde_json::Value>,
52) {
53 let username = jwt_ops::get_name(token).unwrap_or_else(|e| {
54 tracing::warn!("Failed to extract user name from JWT for audit: {}", e);
55 String::new()
56 });
57 let user_id = jwt_ops::get_preferred_username(token).unwrap_or_else(|e| {
58 tracing::warn!("Failed to extract user ID from JWT for audit: {}", e);
59 String::new()
60 });
61
62 let mut msg = serde_json::json!({
63 "user": {"id": user_id, "name": username},
64 "message": message.into(),
65 });
66
67 if let Some(h) = host {
68 msg["host"] = serde_json::json!({"hostname": h});
69 }
70 if let Some(g) = group {
71 msg["group"] = g;
72 }
73
74 send_audit_message(kafka, msg).await;
75}
76
77pub async fn maybe_send_audit(
83 kafka_opt: Option<&Kafka>,
84 token: &str,
85 message: impl Into<String>,
86 host: Option<serde_json::Value>,
87 group: Option<serde_json::Value>,
88) {
89 if let Some(kafka) = kafka_opt {
90 send_audit(kafka, token, message, host, group).await;
91 }
92}
93
94pub async fn send_auth_audit(
103 kafka_opt: Option<&Kafka>,
104 outcome: &str,
105 username: &str,
106 source_ip: &str,
107 site: &str,
108) {
109 let Some(kafka) = kafka_opt else { return };
110 let msg = serde_json::json!({
111 "event": "auth_attempt",
112 "outcome": outcome,
113 "username": username,
114 "source_ip": source_ip,
115 "site": site,
116 });
117 send_audit_message(kafka, msg).await;
118}