use axum::extract::State;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// One audit record, as written to and read back from the
/// `opensearch-api-audit-*` indices by the functions in this module.
///
/// Field names (and the `@timestamp` rename) are the JSON keys of the stored
/// document, so renaming a field changes the index schema.
#[derive(Debug, Serialize, Deserialize)]
pub struct AuditLog {
/// Time of the event; serialized under the JSON key `@timestamp`.
#[serde(rename = "@timestamp")]
pub timestamp: DateTime<Utc>,
/// Event category; writers in this module use `"authentication"` and `"api_access"`.
pub event_type: String,
/// Identifier of the acting user.
pub user_id: String,
/// Role of the acting user; `"unknown"` for authentication events.
pub user_role: String,
/// Set by `log_access` when the user id starts with `api_key_`, otherwise `None`.
pub api_key_id: Option<String>,
/// Client address as supplied by the caller of the logging function.
pub client_ip: String,
/// Client user agent; the writers in this module always store `None`.
pub user_agent: Option<String>,
/// Action performed, e.g. `"login"` for authentication events.
pub action: String,
/// Resource the action targeted, when there is one.
pub resource: Option<String>,
/// Outcome; the writers in this module store `"success"` or `"failure"`.
pub result: String,
/// Optional failure detail supplied by the caller.
pub error_message: Option<String>,
/// Session identifier; the writers in this module always store `None`.
pub session_id: Option<String>,
/// Geographic data for `client_ip`, when available.
pub geo_location: Option<GeoLocation>,
}
/// Geographic information attached to an [`AuditLog`] via its
/// `geo_location` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct GeoLocation {
/// Country of the client. NOTE(review): name vs. ISO code is not fixed here — confirm.
pub country: String,
/// City of the client.
pub city: String,
/// Latitude. NOTE(review): presumably decimal degrees — confirm.
pub latitude: f64,
/// Longitude. NOTE(review): presumably decimal degrees — confirm.
pub longitude: f64,
}
/// Serializable description of a client session.
///
/// Not referenced by any function visible in this part of the file.
#[derive(Debug, Serialize, Deserialize)]
pub struct SessionInfo {
/// Identifier of the session.
pub session_id: String,
/// Identifier of the user owning the session.
pub user_id: String,
/// API key associated with the session, if any.
pub api_key_id: Option<String>,
/// Client address associated with the session.
pub client_ip: String,
/// When the session was created.
pub created_at: DateTime<Utc>,
/// When the session was last active.
pub last_activity: DateTime<Utc>,
/// Number of requests counted for the session.
pub requests_count: u64,
/// Client user agent, if known.
pub user_agent: Option<String>,
}
/// Records a login attempt in the OpenSearch audit index.
///
/// Builds an [`AuditLog`] with `event_type = "authentication"` and
/// `action = "login"` and POSTs it to
/// `{opensearch_url}/opensearch-api-audit-{YYYY.MM}/_doc`.
///
/// Delivery is best-effort: the function returns `()` and never fails the
/// caller. A transport error or a non-2xx reply is reported on stderr so that
/// dropped audit records do not go unnoticed.
///
/// # Arguments
/// * `state` - shared application state providing the HTTP client and the
///   OpenSearch base URL.
/// * `user_id` - identifier of the user attempting to log in.
/// * `client_ip` - address of the client, stored verbatim.
/// * `success` - stored as `"success"` when `true`, `"failure"` otherwise.
/// * `error_msg` - optional failure detail stored as `error_message`.
pub async fn log_auth_event(
    state: &Arc<crate::AppState>,
    user_id: String,
    client_ip: String,
    success: bool,
    error_msg: Option<String>,
) {
    // Take the timestamp once, before any await, and reuse it for the index
    // name so the record always lands in the index matching its `@timestamp`.
    let timestamp = Utc::now();
    // Resolved up front so `client_ip` can be moved into the record.
    let geo_location = get_geo_location(&client_ip).await;

    let audit_log = AuditLog {
        timestamp,
        event_type: "authentication".to_string(),
        user_id,
        user_role: "unknown".to_string(),
        api_key_id: None,
        client_ip,
        user_agent: None,
        action: "login".to_string(),
        resource: None,
        result: if success { "success" } else { "failure" }.to_string(),
        error_message: error_msg,
        session_id: None,
        geo_location,
    };

    let index_name = format!("opensearch-api-audit-{}", audit_log.timestamp.format("%Y.%m"));
    let url = format!("{}/{}/_doc", state.config.opensearch_url, index_name);

    // Best-effort write: never propagate, but do not lose failures silently.
    // NOTE(review): switch to the project's logging facade if one exists.
    match state.client.post(&url).json(&audit_log).send().await {
        Ok(response) if !response.status().is_success() => {
            eprintln!(
                "audit: write to index {} rejected with status {}",
                index_name,
                response.status()
            );
        }
        Ok(_) => {}
        // The URL is stripped because the configured base URL may embed credentials.
        Err(err) => eprintln!(
            "audit: write to index {} failed: {}",
            index_name,
            err.without_url()
        ),
    }
}
/// Records an API access by an authenticated user in the OpenSearch audit index.
///
/// Builds an [`AuditLog`] with `event_type = "api_access"` and POSTs it to
/// `{opensearch_url}/opensearch-api-audit-{YYYY.MM}/_doc`.
///
/// Delivery is best-effort: the function returns `()` and never fails the
/// caller. A transport error or a non-2xx reply is reported on stderr so that
/// dropped audit records do not go unnoticed.
///
/// # Arguments
/// * `state` - shared application state providing the HTTP client and the
///   OpenSearch base URL.
/// * `user` - the authenticated user; `id` and `role` are copied into the record.
/// * `client_ip` - address of the client, stored verbatim.
/// * `action` - action performed, stored verbatim.
/// * `resource` - resource the action targeted.
/// * `success` - stored as `"success"` when `true`, `"failure"` otherwise.
pub async fn log_access(
    state: &Arc<crate::AppState>,
    user: &crate::auth::AuthUser,
    client_ip: String,
    action: String,
    resource: String,
    success: bool,
) {
    // Take the timestamp once, before any await, and reuse it for the index
    // name so the record always lands in the index matching its `@timestamp`.
    let timestamp = Utc::now();
    // Resolved up front so `client_ip` can be moved into the record.
    let geo_location = get_geo_location(&client_ip).await;
    // Ids carrying the `api_key_` prefix are also recorded as the API key id.
    let api_key_id = user.id.starts_with("api_key_").then(|| user.id.clone());

    let audit_log = AuditLog {
        timestamp,
        event_type: "api_access".to_string(),
        user_id: user.id.clone(),
        user_role: user.role.clone(),
        api_key_id,
        client_ip,
        user_agent: None,
        action,
        resource: Some(resource),
        result: if success { "success" } else { "failure" }.to_string(),
        error_message: None,
        session_id: None,
        geo_location,
    };

    let index_name = format!("opensearch-api-audit-{}", audit_log.timestamp.format("%Y.%m"));
    let url = format!("{}/{}/_doc", state.config.opensearch_url, index_name);

    // Best-effort write: never propagate, but do not lose failures silently.
    // NOTE(review): switch to the project's logging facade if one exists.
    match state.client.post(&url).json(&audit_log).send().await {
        Ok(response) if !response.status().is_success() => {
            eprintln!(
                "audit: write to index {} rejected with status {}",
                index_name,
                response.status()
            );
        }
        Ok(_) => {}
        // The URL is stripped because the configured base URL may embed credentials.
        Err(err) => eprintln!(
            "audit: write to index {} failed: {}",
            index_name,
            err.without_url()
        ),
    }
}
/// Looks up geographic information for a client IP address.
///
/// Currently a placeholder: the `_ip` argument is ignored and the function
/// always returns `None`, so audit records are stored without geo data.
async fn get_geo_location(_ip: &str) -> Option<GeoLocation> {
// No lookup is performed yet.
None
}
/// Fetches the most recent audit records of one user.
///
/// Searches `opensearch-api-audit-*` for documents whose `user_id.keyword`
/// equals `user_id` and whose `@timestamp` lies within the last `days` days,
/// newest first, capped at 1000 records.
///
/// Hits whose `_source` cannot be deserialized into [`AuditLog`] are skipped.
/// A response without a `hits.hits` array yields an empty vector.
///
/// # Errors
/// Returns the underlying `reqwest::Error` when the request cannot be sent,
/// when OpenSearch answers with a 4xx/5xx status, or when the response body is
/// not valid JSON. Status failures are reported as errors so that callers can
/// tell "search failed" apart from "no history".
pub async fn get_user_access_history(
    state: &Arc<crate::AppState>,
    user_id: &str,
    days: u32,
) -> Result<Vec<AuditLog>, reqwest::Error> {
    let query = serde_json::json!({
        "query": {
            "bool": {
                "must": [
                    {"term": {"user_id.keyword": user_id}},
                    {"range": {"@timestamp": {"gte": format!("now-{}d", days)}}}
                ]
            }
        },
        "sort": [{"@timestamp": {"order": "desc"}}],
        "size": 1000
    });

    let index_pattern = "opensearch-api-audit-*";
    let url = format!("{}/{}/_search", state.config.opensearch_url, index_pattern);

    let body: serde_json::Value = state
        .client
        .post(&url)
        .json(&query)
        .send()
        .await?
        // Turn 4xx/5xx replies into errors instead of an empty history.
        .error_for_status()?
        .json()
        .await?;

    let logs = body["hits"]["hits"]
        .as_array()
        .map(|hits| {
            hits.iter()
                .filter_map(|hit| serde_json::from_value(hit["_source"].clone()).ok())
                .collect()
        })
        .unwrap_or_default();

    Ok(logs)
}
pub async fn get_suspicious_ips(
state: &Arc<crate::AppState>,
threshold: u32,
) -> Result<Vec<SuspiciousIP>, reqwest::Error> {
let query = serde_json::json!({
"query": {
"bool": {
"must": [
{"term": {"result": "failure"}},
{"range": {"@timestamp": {"gte": "now-1h"}}}
]
}
},
"aggs": {
"by_ip": {
"terms": {
"field": "client_ip.keyword",
"min_doc_count": threshold,
"size": 100
}
}
},
"size": 0
});
let index_pattern = "opensearch-api-audit-*";
let url = format!("{}/{}/_search", state.config.opensearch_url, index_pattern);
let response = state.client.post(&url).json(&query).send().await?;
if response.status().is_success() {
let body: serde_json::Value = response.json().await?;
let empty_vec = Vec::new();
let buckets = body["aggregations"]["by_ip"]["buckets"]
.as_array()
.unwrap_or(&empty_vec);
let suspicious_ips: Vec<SuspiciousIP> = buckets
.iter()
.map(|bucket| SuspiciousIP {
ip: bucket["key"].as_str().unwrap_or("").to_string(),
failed_attempts: bucket["doc_count"].as_u64().unwrap_or(0) as u32,
last_attempt: Utc::now(), })
.collect();
Ok(suspicious_ips)
} else {
Ok(Vec::new())
}
}
/// One entry of the result returned by `get_suspicious_ips`: a client address
/// together with its failure count.
#[derive(Debug, Serialize)]
pub struct SuspiciousIP {
/// Client address, taken from the aggregation bucket key.
pub ip: String,
/// Number of failed events counted for this address (the bucket's `doc_count`).
pub failed_attempts: u32,
/// Timestamp attached to the entry by `get_suspicious_ips`.
pub last_attempt: DateTime<Utc>,
}
/// Axum handler returning the latest audit records together with summary stats.
///
/// Only users whose role is `"admin"` are allowed. The handler fetches up to
/// 100 records from the last 24 hours (newest first) from
/// `opensearch-api-audit-*`. Hits whose `_source` cannot be deserialized into
/// [`AuditLog`] are skipped.
///
/// The statistics are computed over the returned records only, i.e. over at
/// most 100 entries, not over the whole 24-hour window.
///
/// # Errors
/// * `forbidden` - the caller is not an admin.
/// * `internal_error` - the request to OpenSearch failed, OpenSearch answered
///   with a non-success status, or the response body was not valid JSON.
pub async fn get_audit_logs(
    auth_user: crate::auth::AuthUser,
    State(state): State<Arc<crate::AppState>>,
) -> Result<axum::Json<AuditResponse>, crate::auth::AuthError> {
    // Builds the `internal_error` response used for every backend failure.
    fn internal_error(message: &str) -> crate::auth::AuthError {
        crate::auth::AuthError {
            error: "internal_error".to_string(),
            message: message.to_string(),
        }
    }

    if auth_user.role != "admin" {
        return Err(crate::auth::AuthError {
            error: "forbidden".to_string(),
            message: "Only admins can access audit logs".to_string(),
        });
    }

    let query = serde_json::json!({
        "query": {
            "range": {
                "@timestamp": {"gte": "now-24h"}
            }
        },
        "sort": [{"@timestamp": {"order": "desc"}}],
        "size": 100
    });

    let index_pattern = "opensearch-api-audit-*";
    let url = format!("{}/{}/_search", state.config.opensearch_url, index_pattern);

    let response = state
        .client
        .post(&url)
        .json(&query)
        .send()
        .await
        .map_err(|_| internal_error("Failed to fetch audit logs"))?;

    // An error reply from OpenSearch is still JSON; without this check it
    // would be presented to the admin as "no audit activity".
    if !response.status().is_success() {
        return Err(internal_error("Failed to fetch audit logs"));
    }

    let body: serde_json::Value = response
        .json()
        .await
        .map_err(|_| internal_error("Failed to parse audit logs"))?;

    let logs: Vec<AuditLog> = body["hits"]["hits"]
        .as_array()
        .map(|hits| {
            hits.iter()
                .filter_map(|hit| serde_json::from_value(hit["_source"].clone()).ok())
                .collect()
        })
        .unwrap_or_default();

    // The `as u32` casts cannot truncate: the query caps `logs` at 100 entries.
    let unique_users = logs
        .iter()
        .map(|log| &log.user_id)
        .collect::<std::collections::HashSet<_>>()
        .len() as u32;
    let unique_ips = logs
        .iter()
        .map(|log| &log.client_ip)
        .collect::<std::collections::HashSet<_>>()
        .len() as u32;
    let failed_attempts = logs.iter().filter(|log| log.result == "failure").count() as u32;

    let stats = AuditStats {
        total_requests: logs.len() as u64,
        unique_users,
        unique_ips,
        failed_attempts,
    };

    Ok(axum::Json(AuditResponse { logs, stats }))
}
/// JSON payload returned by the `get_audit_logs` handler.
#[derive(Serialize)]
pub struct AuditResponse {
/// The audit records fetched for the response.
pub logs: Vec<AuditLog>,
/// Summary counters derived from `logs`.
pub stats: AuditStats,
}
/// Summary counters that `get_audit_logs` computes over the records it returns.
#[derive(Serialize)]
pub struct AuditStats {
/// Number of records in the response.
pub total_requests: u64,
/// Number of distinct `user_id` values among the records.
pub unique_users: u32,
/// Number of distinct `client_ip` values among the records.
pub unique_ips: u32,
/// Number of records whose `result` is `"failure"`.
pub failed_attempts: u32,
}