manta-shared 2.0.0-beta.5

Shared types and pure helpers used by both manta-cli and manta-server.
Documentation
//! Audit trail helpers: build and send structured JSON messages to Kafka.

use serde::{Deserialize, Serialize};

use super::{error::MantaError, jwt_ops, kafka::Kafka};

#[derive(Serialize, Deserialize, Debug, Clone)]
/// Wraps a [`Kafka`] instance for sending audit messages.
pub struct Auditor {
  pub kafka: Kafka,
}

/// Trait for producing audit messages to a message broker.
pub trait Audit {
  #[allow(async_fn_in_trait)]
  async fn produce_message(&self, data: &[u8]) -> Result<(), MantaError>;
}

/// Serialize a JSON audit message and send it to Kafka.
///
/// Logs a warning on failure instead of propagating the
/// error, since audit failures should not abort the
/// operation.
async fn send_audit_message(kafka: &Kafka, msg_json: serde_json::Value) {
  let msg_data = match serde_json::to_string(&msg_json) {
    Ok(data) => data,
    Err(e) => {
      tracing::warn!("Failed serializing audit message: {}", e);
      return;
    }
  };

  if let Err(e) = kafka.produce_message(msg_data.as_bytes()).await {
    tracing::warn!("Failed producing audit message: {}", e);
  }
}

/// Build and send an audit message to Kafka.
///
/// Extracts user identity from the JWT token (falling
/// back to empty strings on parse failure) and
/// constructs a JSON message with the provided fields.
///
/// Both `host` and `group` are optional — they are only
/// included in the JSON if `Some`.
pub async fn send_audit(
  kafka: &Kafka,
  token: &str,
  message: impl Into<String>,
  host: Option<serde_json::Value>,
  group: Option<serde_json::Value>,
) {
  let username = jwt_ops::get_name(token).unwrap_or_else(|e| {
    tracing::warn!("Failed to extract user name from JWT for audit: {}", e);
    String::new()
  });
  let user_id = jwt_ops::get_preferred_username(token).unwrap_or_else(|e| {
    tracing::warn!("Failed to extract user ID from JWT for audit: {}", e);
    String::new()
  });

  let mut msg = serde_json::json!({
    "user": {"id": user_id, "name": username},
    "message": message.into(),
  });

  if let Some(h) = host {
    msg["host"] = serde_json::json!({"hostname": h});
  }
  if let Some(g) = group {
    msg["group"] = g;
  }

  send_audit_message(kafka, msg).await;
}

/// Send an audit message if a Kafka instance is configured.
///
/// This is a convenience wrapper around [`send_audit`] that
/// handles the common `if let Some(kafka) = kafka_opt { ... }`
/// pattern found at every audit call site.
pub async fn maybe_send_audit(
  kafka_opt: Option<&Kafka>,
  token: &str,
  message: impl Into<String>,
  host: Option<serde_json::Value>,
  group: Option<serde_json::Value>,
) {
  if let Some(kafka) = kafka_opt {
    send_audit(kafka, token, message, host, group).await;
  }
}

/// Send a structured audit event for an `/api/v1/auth/token` attempt.
///
/// Used by the server's auth handler — there is no JWT yet (the user is
/// asking for one), so identity is captured from the submitted username
/// rather than extracted from a token. The password is never logged.
///
/// Always Kafka-only; failures log a warning and do not abort the
/// outer auth flow.
pub async fn send_auth_audit(
  kafka_opt: Option<&Kafka>,
  outcome: &str,
  username: &str,
  source_ip: &str,
  site: &str,
) {
  let Some(kafka) = kafka_opt else { return };
  let msg = serde_json::json!({
    "event": "auth_attempt",
    "outcome": outcome,
    "username": username,
    "source_ip": source_ip,
    "site": site,
  });
  send_audit_message(kafka, msg).await;
}