allsource_core/infrastructure/web/
audit_api.rs1use crate::{
2 domain::{
3 entities::{Actor, AuditAction, AuditCategory, AuditEvent, AuditOutcome},
4 repositories::audit_event_repository::AuditEventQuery,
5 value_objects::TenantId,
6 },
7 error::{AllSourceError, Result},
8 infrastructure::{security::middleware::Admin, web::api_v1::AppState},
9};
10use axum::{Json, extract::State, http::StatusCode};
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13
14#[derive(Debug, Deserialize)]
19pub struct LogAuditEventRequest {
20 pub tenant_id: String,
21 pub action: String,
22 pub actor_type: String,
23 pub actor_id: String,
24 pub actor_name: String,
25 pub outcome: Option<String>,
26 pub resource_type: Option<String>,
27 pub resource_id: Option<String>,
28 pub ip_address: Option<String>,
29 pub user_agent: Option<String>,
30 pub error_message: Option<String>,
31 pub metadata: Option<serde_json::Value>,
32}
33
34#[derive(Debug, Deserialize)]
35pub struct AuditQueryParams {
36 pub tenant_id: Option<String>,
37 pub user_id: Option<String>,
38 pub action: Option<String>,
39 pub category: Option<String>,
40 pub start: Option<DateTime<Utc>>,
41 pub end: Option<DateTime<Utc>>,
42 pub security_only: Option<bool>,
43 pub limit: Option<usize>,
44 pub offset: Option<usize>,
45}
46
47#[derive(Debug, Serialize)]
48pub struct AuditEventResponse {
49 pub id: String,
50 pub tenant_id: String,
51 pub timestamp: DateTime<Utc>,
52 pub action: AuditAction,
53 pub actor: Actor,
54 pub outcome: AuditOutcome,
55 pub resource_type: Option<String>,
56 pub resource_id: Option<String>,
57 pub ip_address: Option<String>,
58 pub error_message: Option<String>,
59 pub metadata: Option<serde_json::Value>,
60}
61
62impl From<&AuditEvent> for AuditEventResponse {
63 fn from(event: &AuditEvent) -> Self {
64 Self {
65 id: event.id().as_str(),
66 tenant_id: event.tenant_id().as_str().to_string(),
67 timestamp: *event.timestamp(),
68 action: event.action().clone(),
69 actor: event.actor().clone(),
70 outcome: event.outcome().clone(),
71 resource_type: event.resource_type().map(std::string::ToString::to_string),
72 resource_id: event.resource_id().map(std::string::ToString::to_string),
73 ip_address: event.ip_address().map(std::string::ToString::to_string),
74 error_message: event.error_message().map(std::string::ToString::to_string),
75 metadata: event.metadata().cloned(),
76 }
77 }
78}
79
80#[derive(Debug, Serialize)]
81pub struct AuditEventsResponse {
82 pub events: Vec<AuditEventResponse>,
83 pub total: usize,
84}
85
86pub async fn log_audit_event(
93 State(state): State<AppState>,
94 Admin(_): Admin,
95 Json(req): Json<LogAuditEventRequest>,
96) -> Result<(StatusCode, Json<serde_json::Value>)> {
97 let audit_repo = state
98 .service_container
99 .audit_repository()
100 .ok_or_else(|| AllSourceError::InternalError("Audit repository not configured".into()))?;
101
102 let tenant_id =
103 TenantId::new(req.tenant_id).map_err(|e| AllSourceError::InvalidInput(e.to_string()))?;
104
105 let action = parse_audit_action(&req.action)
106 .ok_or_else(|| AllSourceError::InvalidInput(format!("Unknown action: {}", req.action)))?;
107
108 let actor = match req.actor_type.as_str() {
109 "user" => Actor::user(req.actor_id, req.actor_name),
110 "api_key" => Actor::api_key(req.actor_id, req.actor_name),
111 "system" => Actor::system(req.actor_name),
112 _ => {
113 return Err(AllSourceError::InvalidInput(format!(
114 "Unknown actor_type: {}",
115 req.actor_type
116 )));
117 }
118 };
119
120 let outcome = match req.outcome.as_deref() {
121 Some("failure") => AuditOutcome::Failure,
122 Some("partial_success") => AuditOutcome::PartialSuccess,
123 _ => AuditOutcome::Success,
124 };
125
126 let mut event = AuditEvent::new(tenant_id, action, actor, outcome);
127
128 if let (Some(rt), Some(ri)) = (req.resource_type, req.resource_id) {
129 event = event.with_resource(rt, ri);
130 }
131 if let Some(ip) = req.ip_address {
132 event = event.with_ip_address(ip);
133 }
134 if let Some(ua) = req.user_agent {
135 event = event.with_user_agent(ua);
136 }
137 if let Some(err) = req.error_message {
138 event = event.with_error(err);
139 }
140 if let Some(meta) = req.metadata {
141 event = event.with_metadata(meta);
142 }
143
144 let event_id = event.id().as_str();
145 audit_repo.append(event).await?;
146
147 tracing::debug!("Audit event logged: {}", event_id);
148
149 Ok((
150 StatusCode::OK,
151 Json(serde_json::json!({
152 "id": event_id,
153 "saved": true,
154 })),
155 ))
156}
157
158pub async fn query_audit_events(
161 State(state): State<AppState>,
162 Admin(_): Admin,
163 axum::extract::Query(params): axum::extract::Query<AuditQueryParams>,
164) -> Result<Json<AuditEventsResponse>> {
165 let audit_repo = state
166 .service_container
167 .audit_repository()
168 .ok_or_else(|| AllSourceError::InternalError("Audit repository not configured".into()))?;
169
170 let tenant_id_str = params.tenant_id.as_deref().unwrap_or("default");
171 let tenant_id = TenantId::new(tenant_id_str.to_string())
172 .map_err(|e| AllSourceError::InvalidInput(e.to_string()))?;
173
174 let mut query = AuditEventQuery::new(tenant_id);
175
176 if let (Some(start), Some(end)) = (params.start, params.end) {
177 query = query.with_time_range(start, end);
178 }
179 if let Some(ref action_str) = params.action
180 && let Some(action) = parse_audit_action(action_str)
181 {
182 query = query.with_action(action);
183 }
184 if let Some(ref category_str) = params.category
185 && let Some(category) = parse_audit_category(category_str)
186 {
187 query = query.with_category(category);
188 }
189 if let Some(ref user_id) = params.user_id {
190 query = query.with_actor(format!("user:{user_id}"));
191 }
192 if params.security_only.unwrap_or(false) {
193 query = query.security_only();
194 }
195
196 let limit = params.limit.unwrap_or(100);
197 let offset = params.offset.unwrap_or(0);
198 query = query.with_pagination(limit, offset);
199
200 let events = audit_repo.query(query.clone()).await?;
201 let total = audit_repo.count(query).await?;
202
203 let event_responses: Vec<AuditEventResponse> =
204 events.iter().map(AuditEventResponse::from).collect();
205
206 Ok(Json(AuditEventsResponse {
207 events: event_responses,
208 total,
209 }))
210}
211
212fn parse_audit_action(s: &str) -> Option<AuditAction> {
217 match s {
218 "login" => Some(AuditAction::Login),
219 "logout" => Some(AuditAction::Logout),
220 "login_failed" => Some(AuditAction::LoginFailed),
221 "token_refreshed" => Some(AuditAction::TokenRefreshed),
222 "password_changed" => Some(AuditAction::PasswordChanged),
223 "api_key_created" => Some(AuditAction::ApiKeyCreated),
224 "api_key_revoked" => Some(AuditAction::ApiKeyRevoked),
225 "api_key_used" => Some(AuditAction::ApiKeyUsed),
226 "event_ingested" => Some(AuditAction::EventIngested),
227 "event_queried" => Some(AuditAction::EventQueried),
228 "event_stream_created" => Some(AuditAction::EventStreamCreated),
229 "tenant_created" => Some(AuditAction::TenantCreated),
230 "tenant_updated" => Some(AuditAction::TenantUpdated),
231 "tenant_activated" => Some(AuditAction::TenantActivated),
232 "tenant_deactivated" => Some(AuditAction::TenantDeactivated),
233 "tenant_deleted" => Some(AuditAction::TenantDeleted),
234 "schema_registered" => Some(AuditAction::SchemaRegistered),
235 "schema_updated" => Some(AuditAction::SchemaUpdated),
236 "schema_deleted" => Some(AuditAction::SchemaDeleted),
237 "user_created" => Some(AuditAction::UserCreated),
238 "user_updated" => Some(AuditAction::UserUpdated),
239 "user_deleted" => Some(AuditAction::UserDeleted),
240 "role_changed" => Some(AuditAction::RoleChanged),
241 "permission_denied" => Some(AuditAction::PermissionDenied),
242 "rate_limit_exceeded" => Some(AuditAction::RateLimitExceeded),
243 "configuration_changed" => Some(AuditAction::ConfigurationChanged),
244 "backup_created" => Some(AuditAction::BackupCreated),
245 "backup_restored" => Some(AuditAction::BackupRestored),
246 _ => None,
247 }
248}
249
250fn parse_audit_category(s: &str) -> Option<AuditCategory> {
251 match s {
252 "authentication" => Some(AuditCategory::Authentication),
253 "api_key" => Some(AuditCategory::ApiKey),
254 "event" => Some(AuditCategory::Event),
255 "tenant" => Some(AuditCategory::Tenant),
256 "schema" => Some(AuditCategory::Schema),
257 "projection" => Some(AuditCategory::Projection),
258 "pipeline" => Some(AuditCategory::Pipeline),
259 "user" => Some(AuditCategory::User),
260 "security" => Some(AuditCategory::Security),
261 "system" => Some(AuditCategory::System),
262 _ => None,
263 }
264}