1use axum::extract::{Json, Query, State};
7use axum::http::StatusCode;
8use axum::response::IntoResponse;
9use axum::routing::get;
10use axum::Router;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use std::path::PathBuf;
14use std::sync::Arc;
15use tokio::io::AsyncWriteExt;
16use tokio::sync::{Mutex, RwLock};
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct AuditEntry {
25 pub timestamp: DateTime<Utc>,
26 pub actor: String,
27 pub action: AuditAction,
28 pub target: String,
29 pub outcome: AuditOutcome,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub detail: Option<String>,
32 #[serde(skip_serializing_if = "Option::is_none")]
33 pub ip: Option<String>,
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
38#[serde(rename_all = "snake_case")]
39pub enum AuditAction {
40 Login,
41 Logout,
42 TokenRefresh,
43 PipelineDeploy,
44 PipelineDelete,
45 PipelineUpdate,
46 StreamStart,
47 StreamStop,
48 ApiKeyCreate,
49 ApiKeyDelete,
50 TierChange,
51 CheckoutStarted,
52 WebhookReceived,
53 SettingsChange,
54 AdminAccess,
55 UserCreate,
56 UserUpdate,
57 UserDelete,
58 UserDisable,
59 PasswordChange,
60 SessionRenew,
61 MaxSessionsExceeded,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67pub enum AuditOutcome {
68 Success,
69 Failure,
70 Denied,
71}
72
73#[derive(Debug)]
79pub struct AuditLogger {
80 writer: Mutex<tokio::io::BufWriter<tokio::fs::File>>,
81 recent: RwLock<Vec<AuditEntry>>,
82 max_recent: usize,
83}
84
85pub type SharedAuditLogger = Arc<AuditLogger>;
86
87impl AuditLogger {
88 pub async fn open(path: PathBuf) -> Result<SharedAuditLogger, String> {
90 let file = tokio::fs::OpenOptions::new()
91 .create(true)
92 .append(true)
93 .open(&path)
94 .await
95 .map_err(|e| format!("Failed to open audit log {}: {}", path.display(), e))?;
96
97 tracing::info!("Audit log: {}", path.display());
98
99 Ok(Arc::new(Self {
100 writer: Mutex::new(tokio::io::BufWriter::new(file)),
101 recent: RwLock::new(Vec::new()),
102 max_recent: 1000,
103 }))
104 }
105
106 pub async fn log(&self, entry: AuditEntry) {
108 if let Ok(mut line) = serde_json::to_string(&entry) {
110 line.push('\n');
111 let mut writer = self.writer.lock().await;
112 if let Err(e) = writer.write_all(line.as_bytes()).await {
113 tracing::error!("Audit write failed: {}", e);
114 }
115 let _ = writer.flush().await;
116 }
117
118 let mut recent = self.recent.write().await;
120 if recent.len() >= self.max_recent {
121 recent.remove(0);
122 }
123 recent.push(entry);
124 }
125
126 pub async fn recent(&self, limit: usize) -> Vec<AuditEntry> {
128 let recent = self.recent.read().await;
129 recent.iter().rev().take(limit).cloned().collect()
130 }
131}
132
133impl AuditEntry {
138 pub fn new(actor: impl Into<String>, action: AuditAction, target: impl Into<String>) -> Self {
139 Self {
140 timestamp: Utc::now(),
141 actor: actor.into(),
142 action,
143 target: target.into(),
144 outcome: AuditOutcome::Success,
145 detail: None,
146 ip: None,
147 }
148 }
149
150 pub const fn with_outcome(mut self, outcome: AuditOutcome) -> Self {
151 self.outcome = outcome;
152 self
153 }
154
155 pub fn with_detail(mut self, detail: impl Into<String>) -> Self {
156 self.detail = Some(detail.into());
157 self
158 }
159
160 pub fn with_ip(mut self, ip: impl Into<String>) -> Self {
161 self.ip = Some(ip.into());
162 self
163 }
164}
165
166#[derive(Debug, Deserialize)]
171struct AuditQuery {
172 #[serde(default = "default_limit")]
173 limit: usize,
174 #[serde(default)]
175 action: Option<String>,
176 #[serde(default)]
177 actor: Option<String>,
178}
179
/// Serde default for `AuditQuery::limit` when the query string omits it.
const fn default_limit() -> usize {
    const DEFAULT_LIMIT: usize = 100;
    DEFAULT_LIMIT
}
183
184async fn handle_audit_list(
186 Query(query): Query<AuditQuery>,
187 State(logger): State<Option<SharedAuditLogger>>,
188) -> impl IntoResponse {
189 let logger = match logger {
190 Some(l) => l,
191 None => {
192 return (
193 StatusCode::SERVICE_UNAVAILABLE,
194 Json(serde_json::json!({"error": "Audit logging not enabled"})),
195 )
196 .into_response();
197 }
198 };
199
200 let limit = query.limit.min(1000);
201 let mut entries = logger.recent(limit).await;
202
203 if let Some(ref action_filter) = query.action {
205 entries.retain(|e| {
206 let action_str = serde_json::to_string(&e.action).unwrap_or_default();
207 action_str.contains(action_filter)
208 });
209 }
210 if let Some(ref actor_filter) = query.actor {
211 entries.retain(|e| e.actor.contains(actor_filter.as_str()));
212 }
213
214 (
215 StatusCode::OK,
216 Json(serde_json::json!({
217 "entries": entries,
218 "count": entries.len(),
219 })),
220 )
221 .into_response()
222}
223
224pub fn audit_routes(logger: Option<SharedAuditLogger>) -> Router {
226 Router::new()
227 .route("/api/v1/audit", get(handle_audit_list))
228 .with_state(logger)
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use tower::ServiceExt;

    /// Opens a logger writing to `audit.jsonl` inside a fresh temporary
    /// directory. The directory guard is returned so the caller keeps the
    /// directory alive for the duration of the test.
    async fn temp_logger() -> (tempfile::TempDir, SharedAuditLogger) {
        let dir = tempfile::tempdir().unwrap();
        let logger = AuditLogger::open(dir.path().join("audit.jsonl"))
            .await
            .unwrap();
        (dir, logger)
    }

    /// Builds a body-less GET request for `uri`.
    fn get_request(uri: &str) -> Request<Body> {
        Request::builder()
            .method("GET")
            .uri(uri)
            .body(Body::empty())
            .unwrap()
    }

    #[test]
    fn test_audit_entry_serialization() {
        let original = AuditEntry::new("user@example.com", AuditAction::Login, "/auth/github")
            .with_detail("GitHub OAuth")
            .with_ip("10.0.0.1");

        let encoded = serde_json::to_string(&original).unwrap();
        for expected in [
            "\"action\":\"login\"",
            "\"outcome\":\"success\"",
            "\"ip\":\"10.0.0.1\"",
        ] {
            assert!(encoded.contains(expected));
        }

        // The JSON must round-trip back into an entry.
        let decoded: AuditEntry = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.actor, "user@example.com");
    }

    #[test]
    fn test_audit_entry_failure() {
        let failed = AuditEntry::new("anonymous", AuditAction::Login, "/auth/github")
            .with_outcome(AuditOutcome::Failure)
            .with_detail("Invalid OAuth code");

        let encoded = serde_json::to_string(&failed).unwrap();
        assert!(encoded.contains("\"outcome\":\"failure\""));
    }

    #[tokio::test]
    async fn test_audit_logger_recent() {
        let (_dir, logger) = temp_logger().await;

        for i in 0..5 {
            let entry = AuditEntry::new(format!("user_{i}"), AuditAction::Login, "/auth");
            logger.log(entry).await;
        }

        // Newest entries come back first.
        let newest = logger.recent(3).await;
        assert_eq!(newest.len(), 3);
        assert_eq!(newest.first().map(|e| e.actor.as_str()), Some("user_4"));
        assert_eq!(newest.last().map(|e| e.actor.as_str()), Some("user_2"));
    }

    #[tokio::test]
    async fn test_audit_routes_not_configured() {
        let app = audit_routes(None);

        let res = app.oneshot(get_request("/api/v1/audit")).await.unwrap();

        assert_eq!(res.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[tokio::test]
    async fn test_audit_routes_returns_entries() {
        let (_dir, logger) = temp_logger().await;

        logger
            .log(AuditEntry::new("admin", AuditAction::Login, "/auth"))
            .await;

        let app = audit_routes(Some(logger));
        let res = app
            .oneshot(get_request("/api/v1/audit?limit=10"))
            .await
            .unwrap();

        assert_eq!(res.status(), StatusCode::OK);
        let raw = axum::body::to_bytes(res.into_body(), usize::MAX)
            .await
            .unwrap();
        let payload: serde_json::Value = serde_json::from_slice(&raw).unwrap();
        assert_eq!(payload["count"], 1);
    }
}
330}