use anyhow::{Context, Result};
use manta_backend_dispatcher::interfaces::hsm::group::GroupTrait;
use serde::{Deserialize, Serialize};
use super::{jwt_ops, kafka::Kafka};
use crate::manta_backend_dispatcher::StaticBackendDispatcher;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Auditor {
pub kafka: Kafka,
}
pub trait Audit {
async fn produce_message(&self, data: &[u8]) -> Result<()>;
}
async fn send_audit_message(kafka: &Kafka, msg_json: serde_json::Value) {
let msg_data = match serde_json::to_string(&msg_json) {
Ok(data) => data,
Err(e) => {
log::warn!("Failed serializing audit message: {}", e);
return;
}
};
if let Err(e) = kafka.produce_message(msg_data.as_bytes()).await {
log::warn!("Failed producing audit message: {}", e);
}
}
pub async fn send_audit(
kafka: &Kafka,
token: &str,
message: impl Into<String>,
host: Option<serde_json::Value>,
group: Option<serde_json::Value>,
) {
let username = jwt_ops::get_name(token).unwrap_or_else(|e| {
log::warn!("Failed to extract user name from JWT for audit: {}", e);
String::new()
});
let user_id =
jwt_ops::get_preferred_username(token).unwrap_or_else(|e| {
log::warn!("Failed to extract user ID from JWT for audit: {}", e);
String::new()
});
let mut msg = serde_json::json!({
"user": {"id": user_id, "name": username},
"message": message.into(),
});
if let Some(h) = host {
msg["host"] = serde_json::json!({"hostname": h});
}
if let Some(g) = group {
msg["group"] = g;
}
send_audit_message(kafka, msg).await;
}
pub async fn maybe_send_audit(
kafka_opt: Option<&Kafka>,
token: &str,
message: impl Into<String>,
host: Option<serde_json::Value>,
group: Option<serde_json::Value>,
) {
if let Some(kafka) = kafka_opt {
send_audit(kafka, token, message, host, group).await;
}
}
pub async fn maybe_send_audit_with_group_lookup(
kafka_opt: Option<&Kafka>,
backend: &StaticBackendDispatcher,
token: &str,
message: impl Into<String>,
xnames: &[String],
) -> Result<()> {
if let Some(kafka) = kafka_opt {
let xname_refs: Vec<&str> =
xnames.iter().map(String::as_str).collect();
let group_map = backend
.get_group_map_and_filter_by_member_vec(token, &xname_refs)
.await
.context("Failed to get group map for audit")?;
send_audit(
kafka,
token,
message,
Some(serde_json::json!(xnames)),
Some(serde_json::json!(group_map.keys().collect::<Vec<_>>())),
)
.await;
}
Ok(())
}