1use std::path::PathBuf;
7use std::sync::Arc;
8
9use axum::extract::{Json, Query, State};
10use axum::http::StatusCode;
11use axum::response::IntoResponse;
12use axum::routing::get;
13use axum::Router;
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use tokio::io::AsyncWriteExt;
17use tokio::sync::{Mutex, RwLock};
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct AuditEntry {
26 pub timestamp: DateTime<Utc>,
27 pub actor: String,
28 pub action: AuditAction,
29 pub target: String,
30 pub outcome: AuditOutcome,
31 #[serde(skip_serializing_if = "Option::is_none")]
32 pub detail: Option<String>,
33 #[serde(skip_serializing_if = "Option::is_none")]
34 pub ip: Option<String>,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(rename_all = "snake_case")]
40pub enum AuditAction {
41 Login,
42 Logout,
43 TokenRefresh,
44 PipelineDeploy,
45 PipelineDelete,
46 PipelineUpdate,
47 StreamStart,
48 StreamStop,
49 ApiKeyCreate,
50 ApiKeyDelete,
51 TierChange,
52 CheckoutStarted,
53 WebhookReceived,
54 SettingsChange,
55 AdminAccess,
56 UserCreate,
57 UserUpdate,
58 UserDelete,
59 UserDisable,
60 PasswordChange,
61 SessionRenew,
62 MaxSessionsExceeded,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67#[serde(rename_all = "snake_case")]
68pub enum AuditOutcome {
69 Success,
70 Failure,
71 Denied,
72}
73
74#[derive(Debug)]
80pub struct AuditLogger {
81 writer: Mutex<tokio::io::BufWriter<tokio::fs::File>>,
82 recent: RwLock<Vec<AuditEntry>>,
83 max_recent: usize,
84}
85
86pub type SharedAuditLogger = Arc<AuditLogger>;
87
88impl AuditLogger {
89 pub async fn open(path: PathBuf) -> Result<SharedAuditLogger, String> {
91 let file = tokio::fs::OpenOptions::new()
92 .create(true)
93 .append(true)
94 .open(&path)
95 .await
96 .map_err(|e| format!("Failed to open audit log {}: {}", path.display(), e))?;
97
98 tracing::info!("Audit log: {}", path.display());
99
100 Ok(Arc::new(Self {
101 writer: Mutex::new(tokio::io::BufWriter::new(file)),
102 recent: RwLock::new(Vec::new()),
103 max_recent: 1000,
104 }))
105 }
106
107 pub async fn log(&self, entry: AuditEntry) {
109 if let Ok(mut line) = serde_json::to_string(&entry) {
111 line.push('\n');
112 let mut writer = self.writer.lock().await;
113 if let Err(e) = writer.write_all(line.as_bytes()).await {
114 tracing::error!("Audit write failed: {}", e);
115 }
116 let _ = writer.flush().await;
117 }
118
119 let mut recent = self.recent.write().await;
121 if recent.len() >= self.max_recent {
122 recent.remove(0);
123 }
124 recent.push(entry);
125 }
126
127 pub async fn recent(&self, limit: usize) -> Vec<AuditEntry> {
129 let recent = self.recent.read().await;
130 recent.iter().rev().take(limit).cloned().collect()
131 }
132}
133
134impl AuditEntry {
139 pub fn new(actor: impl Into<String>, action: AuditAction, target: impl Into<String>) -> Self {
140 Self {
141 timestamp: Utc::now(),
142 actor: actor.into(),
143 action,
144 target: target.into(),
145 outcome: AuditOutcome::Success,
146 detail: None,
147 ip: None,
148 }
149 }
150
151 pub const fn with_outcome(mut self, outcome: AuditOutcome) -> Self {
152 self.outcome = outcome;
153 self
154 }
155
156 pub fn with_detail(mut self, detail: impl Into<String>) -> Self {
157 self.detail = Some(detail.into());
158 self
159 }
160
161 pub fn with_ip(mut self, ip: impl Into<String>) -> Self {
162 self.ip = Some(ip.into());
163 self
164 }
165}
166
167#[derive(Debug, Deserialize)]
172struct AuditQuery {
173 #[serde(default = "default_limit")]
174 limit: usize,
175 #[serde(default)]
176 action: Option<String>,
177 #[serde(default)]
178 actor: Option<String>,
179}
180
181const fn default_limit() -> usize {
182 100
183}
184
185async fn handle_audit_list(
187 Query(query): Query<AuditQuery>,
188 State(logger): State<Option<SharedAuditLogger>>,
189) -> impl IntoResponse {
190 let logger = match logger {
191 Some(l) => l,
192 None => {
193 return (
194 StatusCode::SERVICE_UNAVAILABLE,
195 Json(serde_json::json!({"error": "Audit logging not enabled"})),
196 )
197 .into_response();
198 }
199 };
200
201 let limit = query.limit.min(1000);
202 let mut entries = logger.recent(limit).await;
203
204 if let Some(ref action_filter) = query.action {
206 entries.retain(|e| {
207 let action_str = serde_json::to_string(&e.action).unwrap_or_default();
208 action_str.contains(action_filter)
209 });
210 }
211 if let Some(ref actor_filter) = query.actor {
212 entries.retain(|e| e.actor.contains(actor_filter.as_str()));
213 }
214
215 (
216 StatusCode::OK,
217 Json(serde_json::json!({
218 "entries": entries,
219 "count": entries.len(),
220 })),
221 )
222 .into_response()
223}
224
225pub fn audit_routes(logger: Option<SharedAuditLogger>) -> Router {
227 Router::new()
228 .route("/api/v1/audit", get(handle_audit_list))
229 .with_state(logger)
230}
231
232#[cfg(test)]
237mod tests {
238 use axum::body::Body;
239 use axum::http::Request;
240 use tower::ServiceExt;
241
242 use super::*;
243
244 #[test]
245 fn test_audit_entry_serialization() {
246 let entry = AuditEntry::new("user@example.com", AuditAction::Login, "/auth/github")
247 .with_detail("GitHub OAuth")
248 .with_ip("10.0.0.1");
249
250 let json = serde_json::to_string(&entry).unwrap();
251 assert!(json.contains("\"action\":\"login\""));
252 assert!(json.contains("\"outcome\":\"success\""));
253 assert!(json.contains("\"ip\":\"10.0.0.1\""));
254
255 let parsed: AuditEntry = serde_json::from_str(&json).unwrap();
257 assert_eq!(parsed.actor, "user@example.com");
258 }
259
260 #[test]
261 fn test_audit_entry_failure() {
262 let entry = AuditEntry::new("anonymous", AuditAction::Login, "/auth/github")
263 .with_outcome(AuditOutcome::Failure)
264 .with_detail("Invalid OAuth code");
265
266 let json = serde_json::to_string(&entry).unwrap();
267 assert!(json.contains("\"outcome\":\"failure\""));
268 }
269
270 #[tokio::test]
271 async fn test_audit_logger_recent() {
272 let dir = tempfile::tempdir().unwrap();
273 let path = dir.path().join("audit.jsonl");
274 let logger = AuditLogger::open(path).await.unwrap();
275
276 for i in 0..5 {
277 logger
278 .log(AuditEntry::new(
279 format!("user_{i}"),
280 AuditAction::Login,
281 "/auth",
282 ))
283 .await;
284 }
285
286 let recent = logger.recent(3).await;
287 assert_eq!(recent.len(), 3);
288 assert_eq!(recent[0].actor, "user_4");
290 assert_eq!(recent[2].actor, "user_2");
291 }
292
293 #[tokio::test]
294 async fn test_audit_routes_not_configured() {
295 let app = audit_routes(None);
296
297 let req: Request<Body> = Request::builder()
298 .method("GET")
299 .uri("/api/v1/audit")
300 .body(Body::empty())
301 .unwrap();
302 let res = app.oneshot(req).await.unwrap();
303
304 assert_eq!(res.status(), 503);
305 }
306
307 #[tokio::test]
308 async fn test_audit_routes_returns_entries() {
309 let dir = tempfile::tempdir().unwrap();
310 let path = dir.path().join("audit.jsonl");
311 let logger = AuditLogger::open(path).await.unwrap();
312
313 logger
314 .log(AuditEntry::new("admin", AuditAction::Login, "/auth"))
315 .await;
316
317 let app = audit_routes(Some(logger));
318 let req: Request<Body> = Request::builder()
319 .method("GET")
320 .uri("/api/v1/audit?limit=10")
321 .body(Body::empty())
322 .unwrap();
323 let res = app.oneshot(req).await.unwrap();
324
325 assert_eq!(res.status(), 200);
326 let body = axum::body::to_bytes(res.into_body(), usize::MAX)
327 .await
328 .unwrap();
329 let body: serde_json::Value = serde_json::from_slice(&body).unwrap();
330 assert_eq!(body["count"], 1);
331 }
332}