use std::time::Duration;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{debug, info, warn};
use moloch_core::event::AuditEvent;
pub use moloch_holocrypt::{
generate_keypair as generate_encryption_keypair, EncryptedEvent, EncryptedEventBuilder,
EncryptionPolicy, EventOpeningKey, EventSealingKey, FieldVisibility,
};
pub mod policies {
use super::{EncryptionPolicy, FieldVisibility};
pub fn tool_execution() -> EncryptionPolicy {
EncryptionPolicy {
event_type: FieldVisibility::Public,
actor: FieldVisibility::Encrypted,
resource: FieldVisibility::Encrypted,
outcome: FieldVisibility::Public,
metadata: FieldVisibility::Private,
timestamp: FieldVisibility::Public,
key_id: None,
}
}
pub fn key_lifecycle() -> EncryptionPolicy {
EncryptionPolicy {
event_type: FieldVisibility::Public,
actor: FieldVisibility::Encrypted,
resource: FieldVisibility::Encrypted,
outcome: FieldVisibility::Public,
metadata: FieldVisibility::Private,
timestamp: FieldVisibility::Public,
key_id: None,
}
}
pub fn user_prompt() -> EncryptionPolicy {
EncryptionPolicy {
event_type: FieldVisibility::Encrypted,
actor: FieldVisibility::Private,
resource: FieldVisibility::Private,
outcome: FieldVisibility::Public,
metadata: FieldVisibility::Private,
timestamp: FieldVisibility::Public,
key_id: None,
}
}
pub fn maximum_privacy() -> EncryptionPolicy {
EncryptionPolicy::all_private()
}
pub fn transparent() -> EncryptionPolicy {
EncryptionPolicy::all_public()
}
}
#[derive(Debug, Error)]
pub enum AuditClientError {
#[error("HTTP error: {0}")]
Http(String),
#[error("server error: {status} - {message}")]
ServerError {
status: u16,
message: String,
},
#[error("event rejected: {0}")]
Rejected(String),
#[error("serialization error: {0}")]
Serialization(String),
#[error("connection failed: {0}")]
Connection(String),
#[error("request timed out")]
Timeout,
}
pub type Result<T> = std::result::Result<T, AuditClientError>;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditClientConfig {
pub endpoint: String,
pub timeout_ms: u64,
pub max_retries: u32,
pub retry_delay_ms: u64,
}
impl Default for AuditClientConfig {
fn default() -> Self {
Self {
endpoint: "http://localhost:8090".to_string(),
timeout_ms: 30_000,
max_retries: 3,
retry_delay_ms: 100,
}
}
}
impl AuditClientConfig {
pub fn new(endpoint: impl Into<String>) -> Self {
Self {
endpoint: endpoint.into(),
..Default::default()
}
}
pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
self.timeout_ms = timeout_ms;
self
}
pub fn with_retries(mut self, max_retries: u32, retry_delay_ms: u64) -> Self {
self.max_retries = max_retries;
self.retry_delay_ms = retry_delay_ms;
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmitEventRequest {
pub event: AuditEvent,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmitEventResponse {
pub id: String,
pub accepted: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
}
pub struct AuditClient {
config: AuditClientConfig,
http_client: reqwest::Client,
}
impl AuditClient {
pub fn new(config: AuditClientConfig) -> Self {
let http_client = reqwest::Client::builder()
.timeout(Duration::from_millis(config.timeout_ms))
.build()
.expect("failed to build HTTP client");
Self {
config,
http_client,
}
}
pub async fn submit(&self, event: AuditEvent) -> Result<SubmitEventResponse> {
event
.validate()
.map_err(|e| AuditClientError::Rejected(format!("invalid event signature: {e}")))?;
self.submit_with_retry(event).await
}
pub async fn submit_batch(&self, events: Vec<AuditEvent>) -> Vec<Result<SubmitEventResponse>> {
let mut results = Vec::with_capacity(events.len());
for event in events {
results.push(self.submit(event).await);
}
results
}
pub async fn submit_encrypted(&self, event: EncryptedEvent) -> Result<SubmitEventResponse> {
let url = format!("{}/v1/events/encrypted", self.config.endpoint);
debug!(url = %url, "submitting encrypted event");
let response = self
.http_client
.post(&url)
.json(&event)
.send()
.await
.map_err(|e| {
if e.is_timeout() {
AuditClientError::Timeout
} else if e.is_connect() {
AuditClientError::Connection(e.to_string())
} else {
AuditClientError::Http(e.to_string())
}
})?;
let status = response.status();
if status.is_success() {
response
.json::<SubmitEventResponse>()
.await
.map_err(|e| AuditClientError::Serialization(e.to_string()))
} else {
let body = response.text().await.unwrap_or_default();
Err(AuditClientError::ServerError {
status: status.as_u16(),
message: body,
})
}
}
async fn submit_with_retry(&self, event: AuditEvent) -> Result<SubmitEventResponse> {
let mut last_error = None;
let mut delay = Duration::from_millis(self.config.retry_delay_ms);
for attempt in 0..=self.config.max_retries {
if attempt > 0 {
warn!(attempt, "retrying event submission after delay");
tokio::time::sleep(delay).await;
delay *= 2; }
match self.submit_once(&event).await {
Ok(response) => {
if response.accepted {
info!(event_id = %response.id, "event submitted successfully");
return Ok(response);
} else {
return Err(AuditClientError::Rejected(
response
.message
.unwrap_or_else(|| "unknown reason".to_string()),
));
}
},
Err(e) => {
if matches!(
e,
AuditClientError::Connection(_) | AuditClientError::Timeout
) {
last_error = Some(e);
} else {
return Err(e);
}
},
}
}
Err(last_error.unwrap_or(AuditClientError::Timeout))
}
async fn submit_once(&self, event: &AuditEvent) -> Result<SubmitEventResponse> {
let url = format!("{}/v1/events", self.config.endpoint);
let request = SubmitEventRequest {
event: event.clone(),
};
debug!(url = %url, event_id = %hex::encode(event.id().0.as_bytes()), "submitting event");
let response = self
.http_client
.post(&url)
.json(&request)
.send()
.await
.map_err(|e| {
if e.is_timeout() {
AuditClientError::Timeout
} else if e.is_connect() {
AuditClientError::Connection(e.to_string())
} else {
AuditClientError::Http(e.to_string())
}
})?;
let status = response.status();
if status.is_success() {
response
.json::<SubmitEventResponse>()
.await
.map_err(|e| AuditClientError::Serialization(e.to_string()))
} else {
let body = response.text().await.unwrap_or_default();
Err(AuditClientError::ServerError {
status: status.as_u16(),
message: body,
})
}
}
pub fn config(&self) -> &AuditClientConfig {
&self.config
}
}
impl std::fmt::Debug for AuditClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AuditClient")
.field("endpoint", &self.config.endpoint)
.field("timeout_ms", &self.config.timeout_ms)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_default_config() {
let config = AuditClientConfig::default();
assert_eq!(config.endpoint, "http://localhost:8090");
assert_eq!(config.timeout_ms, 30_000);
assert_eq!(config.max_retries, 3);
}
#[test]
fn test_config_builder() {
let config = AuditClientConfig::new("http://moloch:8090")
.with_timeout(5_000)
.with_retries(5, 200);
assert_eq!(config.endpoint, "http://moloch:8090");
assert_eq!(config.timeout_ms, 5_000);
assert_eq!(config.max_retries, 5);
assert_eq!(config.retry_delay_ms, 200);
}
#[test]
fn test_client_creation() {
let config = AuditClientConfig::default();
let client = AuditClient::new(config);
assert_eq!(client.config().endpoint, "http://localhost:8090");
}
#[test]
fn test_debug_output() {
let config = AuditClientConfig::default();
let client = AuditClient::new(config);
let debug_str = format!("{:?}", client);
assert!(debug_str.contains("AuditClient"));
assert!(debug_str.contains("localhost:8090"));
}
}