Skip to main content

varpulis_cli/
audit.rs

1//! Structured audit logging for Varpulis.
2//!
3//! Records security-relevant events as JSON-lines to a dedicated audit log file
4//! and exposes a read-only API endpoint for Admin users.
5
6use axum::extract::{Json, Query, State};
7use axum::http::StatusCode;
8use axum::response::IntoResponse;
9use axum::routing::get;
10use axum::Router;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use std::path::PathBuf;
14use std::sync::Arc;
15use tokio::io::AsyncWriteExt;
16use tokio::sync::{Mutex, RwLock};
17
18// ---------------------------------------------------------------------------
19// Audit entry
20// ---------------------------------------------------------------------------
21
22/// A single audit log entry.
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct AuditEntry {
25    pub timestamp: DateTime<Utc>,
26    pub actor: String,
27    pub action: AuditAction,
28    pub target: String,
29    pub outcome: AuditOutcome,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub detail: Option<String>,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub ip: Option<String>,
34}
35
36/// The kind of action being audited.
37#[derive(Debug, Clone, Serialize, Deserialize)]
38#[serde(rename_all = "snake_case")]
39pub enum AuditAction {
40    Login,
41    Logout,
42    TokenRefresh,
43    PipelineDeploy,
44    PipelineDelete,
45    PipelineUpdate,
46    StreamStart,
47    StreamStop,
48    ApiKeyCreate,
49    ApiKeyDelete,
50    TierChange,
51    CheckoutStarted,
52    WebhookReceived,
53    SettingsChange,
54    AdminAccess,
55    UserCreate,
56    UserUpdate,
57    UserDelete,
58    UserDisable,
59    PasswordChange,
60    SessionRenew,
61    MaxSessionsExceeded,
62}
63
64/// Whether the action succeeded or failed.
65#[derive(Debug, Clone, Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67pub enum AuditOutcome {
68    Success,
69    Failure,
70    Denied,
71}
72
73// ---------------------------------------------------------------------------
74// Audit logger
75// ---------------------------------------------------------------------------
76
77/// Thread-safe audit logger that writes JSON-lines to a file.
78#[derive(Debug)]
79pub struct AuditLogger {
80    writer: Mutex<tokio::io::BufWriter<tokio::fs::File>>,
81    recent: RwLock<Vec<AuditEntry>>,
82    max_recent: usize,
83}
84
85pub type SharedAuditLogger = Arc<AuditLogger>;
86
87impl AuditLogger {
88    /// Open (or create) the audit log file and return a shared logger.
89    pub async fn open(path: PathBuf) -> Result<SharedAuditLogger, String> {
90        let file = tokio::fs::OpenOptions::new()
91            .create(true)
92            .append(true)
93            .open(&path)
94            .await
95            .map_err(|e| format!("Failed to open audit log {}: {}", path.display(), e))?;
96
97        tracing::info!("Audit log: {}", path.display());
98
99        Ok(Arc::new(Self {
100            writer: Mutex::new(tokio::io::BufWriter::new(file)),
101            recent: RwLock::new(Vec::new()),
102            max_recent: 1000,
103        }))
104    }
105
106    /// Record an audit event.
107    pub async fn log(&self, entry: AuditEntry) {
108        // Write to file
109        if let Ok(mut line) = serde_json::to_string(&entry) {
110            line.push('\n');
111            let mut writer = self.writer.lock().await;
112            if let Err(e) = writer.write_all(line.as_bytes()).await {
113                tracing::error!("Audit write failed: {}", e);
114            }
115            let _ = writer.flush().await;
116        }
117
118        // Keep in recent buffer
119        let mut recent = self.recent.write().await;
120        if recent.len() >= self.max_recent {
121            recent.remove(0);
122        }
123        recent.push(entry);
124    }
125
126    /// Get recent audit entries (most recent first).
127    pub async fn recent(&self, limit: usize) -> Vec<AuditEntry> {
128        let recent = self.recent.read().await;
129        recent.iter().rev().take(limit).cloned().collect()
130    }
131}
132
133// ---------------------------------------------------------------------------
134// Convenience constructors
135// ---------------------------------------------------------------------------
136
137impl AuditEntry {
138    pub fn new(actor: impl Into<String>, action: AuditAction, target: impl Into<String>) -> Self {
139        Self {
140            timestamp: Utc::now(),
141            actor: actor.into(),
142            action,
143            target: target.into(),
144            outcome: AuditOutcome::Success,
145            detail: None,
146            ip: None,
147        }
148    }
149
150    pub const fn with_outcome(mut self, outcome: AuditOutcome) -> Self {
151        self.outcome = outcome;
152        self
153    }
154
155    pub fn with_detail(mut self, detail: impl Into<String>) -> Self {
156        self.detail = Some(detail.into());
157        self
158    }
159
160    pub fn with_ip(mut self, ip: impl Into<String>) -> Self {
161        self.ip = Some(ip.into());
162        self
163    }
164}
165
166// ---------------------------------------------------------------------------
167// API routes
168// ---------------------------------------------------------------------------
169
170#[derive(Debug, Deserialize)]
171struct AuditQuery {
172    #[serde(default = "default_limit")]
173    limit: usize,
174    #[serde(default)]
175    action: Option<String>,
176    #[serde(default)]
177    actor: Option<String>,
178}
179
180const fn default_limit() -> usize {
181    100
182}
183
184/// GET /api/v1/audit — returns recent audit entries (Admin only).
185async fn handle_audit_list(
186    Query(query): Query<AuditQuery>,
187    State(logger): State<Option<SharedAuditLogger>>,
188) -> impl IntoResponse {
189    let logger = match logger {
190        Some(l) => l,
191        None => {
192            return (
193                StatusCode::SERVICE_UNAVAILABLE,
194                Json(serde_json::json!({"error": "Audit logging not enabled"})),
195            )
196                .into_response();
197        }
198    };
199
200    let limit = query.limit.min(1000);
201    let mut entries = logger.recent(limit).await;
202
203    // Optional filters
204    if let Some(ref action_filter) = query.action {
205        entries.retain(|e| {
206            let action_str = serde_json::to_string(&e.action).unwrap_or_default();
207            action_str.contains(action_filter)
208        });
209    }
210    if let Some(ref actor_filter) = query.actor {
211        entries.retain(|e| e.actor.contains(actor_filter.as_str()));
212    }
213
214    (
215        StatusCode::OK,
216        Json(serde_json::json!({
217            "entries": entries,
218            "count": entries.len(),
219        })),
220    )
221        .into_response()
222}
223
224/// Build audit log routes.
225pub fn audit_routes(logger: Option<SharedAuditLogger>) -> Router {
226    Router::new()
227        .route("/api/v1/audit", get(handle_audit_list))
228        .with_state(logger)
229}
230
231// ---------------------------------------------------------------------------
232// Tests
233// ---------------------------------------------------------------------------
234
235#[cfg(test)]
236mod tests {
237    use super::*;
238    use axum::body::Body;
239    use axum::http::Request;
240    use tower::ServiceExt;
241
242    #[test]
243    fn test_audit_entry_serialization() {
244        let entry = AuditEntry::new("user@example.com", AuditAction::Login, "/auth/github")
245            .with_detail("GitHub OAuth")
246            .with_ip("10.0.0.1");
247
248        let json = serde_json::to_string(&entry).unwrap();
249        assert!(json.contains("\"action\":\"login\""));
250        assert!(json.contains("\"outcome\":\"success\""));
251        assert!(json.contains("\"ip\":\"10.0.0.1\""));
252
253        // Roundtrip
254        let parsed: AuditEntry = serde_json::from_str(&json).unwrap();
255        assert_eq!(parsed.actor, "user@example.com");
256    }
257
258    #[test]
259    fn test_audit_entry_failure() {
260        let entry = AuditEntry::new("anonymous", AuditAction::Login, "/auth/github")
261            .with_outcome(AuditOutcome::Failure)
262            .with_detail("Invalid OAuth code");
263
264        let json = serde_json::to_string(&entry).unwrap();
265        assert!(json.contains("\"outcome\":\"failure\""));
266    }
267
268    #[tokio::test]
269    async fn test_audit_logger_recent() {
270        let dir = tempfile::tempdir().unwrap();
271        let path = dir.path().join("audit.jsonl");
272        let logger = AuditLogger::open(path).await.unwrap();
273
274        for i in 0..5 {
275            logger
276                .log(AuditEntry::new(
277                    format!("user_{i}"),
278                    AuditAction::Login,
279                    "/auth",
280                ))
281                .await;
282        }
283
284        let recent = logger.recent(3).await;
285        assert_eq!(recent.len(), 3);
286        // Most recent first
287        assert_eq!(recent[0].actor, "user_4");
288        assert_eq!(recent[2].actor, "user_2");
289    }
290
291    #[tokio::test]
292    async fn test_audit_routes_not_configured() {
293        let app = audit_routes(None);
294
295        let req: Request<Body> = Request::builder()
296            .method("GET")
297            .uri("/api/v1/audit")
298            .body(Body::empty())
299            .unwrap();
300        let res = app.oneshot(req).await.unwrap();
301
302        assert_eq!(res.status(), 503);
303    }
304
305    #[tokio::test]
306    async fn test_audit_routes_returns_entries() {
307        let dir = tempfile::tempdir().unwrap();
308        let path = dir.path().join("audit.jsonl");
309        let logger = AuditLogger::open(path).await.unwrap();
310
311        logger
312            .log(AuditEntry::new("admin", AuditAction::Login, "/auth"))
313            .await;
314
315        let app = audit_routes(Some(logger));
316        let req: Request<Body> = Request::builder()
317            .method("GET")
318            .uri("/api/v1/audit?limit=10")
319            .body(Body::empty())
320            .unwrap();
321        let res = app.oneshot(req).await.unwrap();
322
323        assert_eq!(res.status(), 200);
324        let body = axum::body::to_bytes(res.into_body(), usize::MAX)
325            .await
326            .unwrap();
327        let body: serde_json::Value = serde_json::from_slice(&body).unwrap();
328        assert_eq!(body["count"], 1);
329    }
330}