// varpulis_cli/audit.rs

1//! Structured audit logging for Varpulis.
2//!
3//! Records security-relevant events as JSON-lines to a dedicated audit log file
4//! and exposes a read-only API endpoint for Admin users.
5
6use std::path::PathBuf;
7use std::sync::Arc;
8
9use axum::extract::{Json, Query, State};
10use axum::http::StatusCode;
11use axum::response::IntoResponse;
12use axum::routing::get;
13use axum::Router;
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use tokio::io::AsyncWriteExt;
17use tokio::sync::{Mutex, RwLock};
18
19// ---------------------------------------------------------------------------
20// Audit entry
21// ---------------------------------------------------------------------------
22
23/// A single audit log entry.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct AuditEntry {
26    pub timestamp: DateTime<Utc>,
27    pub actor: String,
28    pub action: AuditAction,
29    pub target: String,
30    pub outcome: AuditOutcome,
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub detail: Option<String>,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub ip: Option<String>,
35}
36
37/// The kind of action being audited.
38#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(rename_all = "snake_case")]
40pub enum AuditAction {
41    Login,
42    Logout,
43    TokenRefresh,
44    PipelineDeploy,
45    PipelineDelete,
46    PipelineUpdate,
47    StreamStart,
48    StreamStop,
49    ApiKeyCreate,
50    ApiKeyDelete,
51    TierChange,
52    CheckoutStarted,
53    WebhookReceived,
54    SettingsChange,
55    AdminAccess,
56    UserCreate,
57    UserUpdate,
58    UserDelete,
59    UserDisable,
60    PasswordChange,
61    SessionRenew,
62    MaxSessionsExceeded,
63}
64
65/// Whether the action succeeded or failed.
66#[derive(Debug, Clone, Serialize, Deserialize)]
67#[serde(rename_all = "snake_case")]
68pub enum AuditOutcome {
69    Success,
70    Failure,
71    Denied,
72}
73
74// ---------------------------------------------------------------------------
75// Audit logger
76// ---------------------------------------------------------------------------
77
78/// Thread-safe audit logger that writes JSON-lines to a file.
79#[derive(Debug)]
80pub struct AuditLogger {
81    writer: Mutex<tokio::io::BufWriter<tokio::fs::File>>,
82    recent: RwLock<Vec<AuditEntry>>,
83    max_recent: usize,
84}
85
86pub type SharedAuditLogger = Arc<AuditLogger>;
87
88impl AuditLogger {
89    /// Open (or create) the audit log file and return a shared logger.
90    pub async fn open(path: PathBuf) -> Result<SharedAuditLogger, String> {
91        let file = tokio::fs::OpenOptions::new()
92            .create(true)
93            .append(true)
94            .open(&path)
95            .await
96            .map_err(|e| format!("Failed to open audit log {}: {}", path.display(), e))?;
97
98        tracing::info!("Audit log: {}", path.display());
99
100        Ok(Arc::new(Self {
101            writer: Mutex::new(tokio::io::BufWriter::new(file)),
102            recent: RwLock::new(Vec::new()),
103            max_recent: 1000,
104        }))
105    }
106
107    /// Record an audit event.
108    pub async fn log(&self, entry: AuditEntry) {
109        // Write to file
110        if let Ok(mut line) = serde_json::to_string(&entry) {
111            line.push('\n');
112            let mut writer = self.writer.lock().await;
113            if let Err(e) = writer.write_all(line.as_bytes()).await {
114                tracing::error!("Audit write failed: {}", e);
115            }
116            let _ = writer.flush().await;
117        }
118
119        // Keep in recent buffer
120        let mut recent = self.recent.write().await;
121        if recent.len() >= self.max_recent {
122            recent.remove(0);
123        }
124        recent.push(entry);
125    }
126
127    /// Get recent audit entries (most recent first).
128    pub async fn recent(&self, limit: usize) -> Vec<AuditEntry> {
129        let recent = self.recent.read().await;
130        recent.iter().rev().take(limit).cloned().collect()
131    }
132}
133
134// ---------------------------------------------------------------------------
135// Convenience constructors
136// ---------------------------------------------------------------------------
137
138impl AuditEntry {
139    pub fn new(actor: impl Into<String>, action: AuditAction, target: impl Into<String>) -> Self {
140        Self {
141            timestamp: Utc::now(),
142            actor: actor.into(),
143            action,
144            target: target.into(),
145            outcome: AuditOutcome::Success,
146            detail: None,
147            ip: None,
148        }
149    }
150
151    pub const fn with_outcome(mut self, outcome: AuditOutcome) -> Self {
152        self.outcome = outcome;
153        self
154    }
155
156    pub fn with_detail(mut self, detail: impl Into<String>) -> Self {
157        self.detail = Some(detail.into());
158        self
159    }
160
161    pub fn with_ip(mut self, ip: impl Into<String>) -> Self {
162        self.ip = Some(ip.into());
163        self
164    }
165}
166
167// ---------------------------------------------------------------------------
168// API routes
169// ---------------------------------------------------------------------------
170
171#[derive(Debug, Deserialize)]
172struct AuditQuery {
173    #[serde(default = "default_limit")]
174    limit: usize,
175    #[serde(default)]
176    action: Option<String>,
177    #[serde(default)]
178    actor: Option<String>,
179}
180
/// Serde fallback for the `limit` query parameter when the client omits it.
const fn default_limit() -> usize {
    const DEFAULT_PAGE_SIZE: usize = 100;
    DEFAULT_PAGE_SIZE
}
184
185/// GET /api/v1/audit — returns recent audit entries (Admin only).
186async fn handle_audit_list(
187    Query(query): Query<AuditQuery>,
188    State(logger): State<Option<SharedAuditLogger>>,
189) -> impl IntoResponse {
190    let logger = match logger {
191        Some(l) => l,
192        None => {
193            return (
194                StatusCode::SERVICE_UNAVAILABLE,
195                Json(serde_json::json!({"error": "Audit logging not enabled"})),
196            )
197                .into_response();
198        }
199    };
200
201    let limit = query.limit.min(1000);
202    let mut entries = logger.recent(limit).await;
203
204    // Optional filters
205    if let Some(ref action_filter) = query.action {
206        entries.retain(|e| {
207            let action_str = serde_json::to_string(&e.action).unwrap_or_default();
208            action_str.contains(action_filter)
209        });
210    }
211    if let Some(ref actor_filter) = query.actor {
212        entries.retain(|e| e.actor.contains(actor_filter.as_str()));
213    }
214
215    (
216        StatusCode::OK,
217        Json(serde_json::json!({
218            "entries": entries,
219            "count": entries.len(),
220        })),
221    )
222        .into_response()
223}
224
225/// Build audit log routes.
226pub fn audit_routes(logger: Option<SharedAuditLogger>) -> Router {
227    Router::new()
228        .route("/api/v1/audit", get(handle_audit_list))
229        .with_state(logger)
230}
231
232// ---------------------------------------------------------------------------
233// Tests
234// ---------------------------------------------------------------------------
235
#[cfg(test)]
mod tests {
    use axum::body::Body;
    use axum::http::Request;
    use tower::ServiceExt;

    use super::*;

    /// Opens a logger writing to `audit.jsonl` inside a fresh temporary directory.
    ///
    /// The directory guard is handed back so the directory stays alive for as
    /// long as the calling test holds on to it.
    async fn temp_logger() -> (tempfile::TempDir, SharedAuditLogger) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit.jsonl");
        let logger = AuditLogger::open(path).await.unwrap();
        (dir, logger)
    }

    /// Builds a body-less GET request for `uri`.
    fn get_request(uri: &str) -> Request<Body> {
        Request::builder()
            .method("GET")
            .uri(uri)
            .body(Body::empty())
            .unwrap()
    }

    /// A fully populated entry serializes with snake_case enums and survives a roundtrip.
    #[test]
    fn test_audit_entry_serialization() {
        let entry = AuditEntry::new("user@example.com", AuditAction::Login, "/auth/github")
            .with_detail("GitHub OAuth")
            .with_ip("10.0.0.1");

        let json = serde_json::to_string(&entry).unwrap();
        let expected_fragments = [
            "\"action\":\"login\"",
            "\"outcome\":\"success\"",
            "\"ip\":\"10.0.0.1\"",
        ];
        for fragment in expected_fragments {
            assert!(json.contains(fragment));
        }

        // Roundtrip
        let parsed: AuditEntry = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.actor, "user@example.com");
    }

    /// An overridden outcome shows up in the serialized form.
    #[test]
    fn test_audit_entry_failure() {
        let json = serde_json::to_string(
            &AuditEntry::new("anonymous", AuditAction::Login, "/auth/github")
                .with_outcome(AuditOutcome::Failure)
                .with_detail("Invalid OAuth code"),
        )
        .unwrap();

        assert!(json.contains("\"outcome\":\"failure\""));
    }

    /// `recent` honours the limit and returns the newest entries first.
    #[tokio::test]
    async fn test_audit_logger_recent() {
        let (_dir, logger) = temp_logger().await;

        for i in 0..5 {
            let entry = AuditEntry::new(format!("user_{i}"), AuditAction::Login, "/auth");
            logger.log(entry).await;
        }

        let recent = logger.recent(3).await;
        assert_eq!(recent.len(), 3);
        // Most recent first
        assert_eq!(recent[0].actor, "user_4");
        assert_eq!(recent[2].actor, "user_2");
    }

    /// Without a configured logger the endpoint answers 503.
    #[tokio::test]
    async fn test_audit_routes_not_configured() {
        let app = audit_routes(None);

        let res = app.oneshot(get_request("/api/v1/audit")).await.unwrap();

        assert_eq!(res.status(), 503);
    }

    /// With a logger holding one entry the endpoint answers 200 and reports a count of one.
    #[tokio::test]
    async fn test_audit_routes_returns_entries() {
        let (_dir, logger) = temp_logger().await;

        logger
            .log(AuditEntry::new("admin", AuditAction::Login, "/auth"))
            .await;

        let app = audit_routes(Some(logger));
        let res = app
            .oneshot(get_request("/api/v1/audit?limit=10"))
            .await
            .unwrap();

        assert_eq!(res.status(), 200);
        let bytes = axum::body::to_bytes(res.into_body(), usize::MAX)
            .await
            .unwrap();
        let body: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(body["count"], 1);
    }
}