// fez/audit.rs

//! Structured audit for mutations (Section 8, layer 4). Records are written to a
//! pluggable sink; the default writes to the systemd journal via its native
//! protocol over a datagram socket. Selection is via the `FEZ_AUDIT` env var:
//!   unset | "journal" -> journal   "off" | "0" -> no-op   "file:<path>" -> JSON lines
//! Any other value also selects the journal.
use serde::Serialize;
use std::time::{SystemTime, UNIX_EPOCH};
7
/// A stable `MESSAGE_ID` tagging fez mutations; emitted as the journal
/// `MESSAGE_ID` field of every record.
///
/// journald convention is a 128-bit id written as 32 lowercase hex digits.
/// NOTE(review): this value is 32 characters long but contains non-hex letters
/// (`m`, `u`, `t`, `i`, `n`), so it is not a well-formed 128-bit id. It is left
/// unchanged here because any filter matching on the current string would stop
/// matching if it were regenerated; confirm with consumers before replacing it.
const FEZ_MUTATION_MESSAGE_ID: &str = "fe2c0ffee0000400a8b1mutati0naud1";
10
11/// One audit event describing an attempted or completed mutation.
12#[derive(Serialize, Clone, Debug)]
13pub struct AuditRecord {
14    /// Identity that initiated the operation (see [`actor`]).
15    pub actor: String,
16    /// Host the operation targets.
17    pub target_host: String,
18    /// The mutation performed (e.g. `start`, `stop`, `restart`).
19    pub operation: String,
20    /// The systemd unit the operation acts on.
21    pub unit: String,
22    /// "attempt" | "ok" | "error"
23    pub result: String,
24    /// Error detail when `result` is `"error"`, otherwise omitted.
25    #[serde(skip_serializing_if = "Option::is_none")]
26    pub error: Option<String>,
27    /// Correlation id linking all records for one invocation (see [`correlation_id`]).
28    pub correlation_id: String,
29    /// Record creation time as Unix milliseconds.
30    pub timestamp_unix_ms: u128,
31}
32
33impl AuditRecord {
34    fn priority(&self) -> &'static str {
35        match self.result.as_str() {
36            "error" => "3",   // err
37            "attempt" => "5", // notice
38            _ => "6",         // info
39        }
40    }
41}
42
/// The result of one mutation step, replacing a stringly-typed `result` plus a
/// loosely-coupled `error`. The error payload exists only on [`Outcome::Error`],
/// so a record can never claim success while carrying an error (or vice versa).
#[derive(Clone, Debug)]
pub enum Outcome {
    /// The mutation is about to run (audited before any side effect).
    Attempt,
    /// The mutation completed successfully.
    Ok,
    /// The mutation failed; the payload is the error detail.
    Error(String),
}

impl Outcome {
    /// The wire `result` string (`"attempt"`, `"ok"`, or `"error"`).
    fn result(&self) -> &'static str {
        match self {
            Outcome::Attempt => "attempt",
            Outcome::Ok => "ok",
            Outcome::Error(_) => "error",
        }
    }

    /// Consume the outcome and return the error detail, which is present only
    /// for [`Outcome::Error`].
    fn into_error(self) -> Option<String> {
        // Exhaustive on purpose: adding a variant must force a decision here
        // rather than silently falling into a catch-all.
        match self {
            Outcome::Error(detail) => Some(detail),
            Outcome::Attempt | Outcome::Ok => None,
        }
    }
}
74
75/// The invocation-shared context for a sequence of audit records. One mutation
76/// produces several records (attempt, then ok/error) that agree on actor, host,
77/// operation, unit, and correlation id; capture those once here and stamp each
78/// [`Outcome`] via [`AuditContext::record`].
79#[derive(Clone, Debug)]
80pub struct AuditContext {
81    actor: String,
82    target_host: String,
83    operation: String,
84    unit: String,
85    correlation_id: String,
86}
87
88impl AuditContext {
89    /// Capture the fields shared by every record for one mutation invocation.
90    pub fn new(
91        actor: &str,
92        target_host: &str,
93        operation: &str,
94        unit: &str,
95        correlation_id: &str,
96    ) -> Self {
97        AuditContext {
98            actor: actor.into(),
99            target_host: target_host.into(),
100            operation: operation.into(),
101            unit: unit.into(),
102            correlation_id: correlation_id.into(),
103        }
104    }
105
106    /// Build a record for `outcome`, stamping the current Unix-millisecond time.
107    #[must_use]
108    pub fn record(&self, outcome: Outcome) -> AuditRecord {
109        let timestamp_unix_ms = SystemTime::now()
110            .duration_since(UNIX_EPOCH)
111            .map(|d| d.as_millis())
112            .unwrap_or(0);
113        let result = outcome.result().to_string();
114        AuditRecord {
115            actor: self.actor.clone(),
116            target_host: self.target_host.clone(),
117            operation: self.operation.clone(),
118            unit: self.unit.clone(),
119            result,
120            error: outcome.into_error(),
121            correlation_id: self.correlation_id.clone(),
122            timestamp_unix_ms,
123        }
124    }
125}
126
127/// Run a mutation under the standard audit envelope: write an `attempt`
128/// record, execute `action`, then write an `ok`/`error` record from its result,
129/// and return that result unchanged.
130///
131/// The sink comes from [`sink_from_env`] and the [`AuditContext`] is built from
132/// [`actor`]/[`correlation_id`] plus the supplied `host`/`operation`/`unit`,
133/// so every mutating capability emits the identical attempt+result pair without
134/// repeating the wiring. `action` runs exactly once.
135///
136/// # Errors
137///
138/// Propagates `action`'s error unchanged; the audit writes are best-effort and
139/// never alter the returned [`crate::error::Result`].
140pub fn run_audited<T, F>(
141    host: &str,
142    operation: &str,
143    unit: &str,
144    action: F,
145) -> crate::error::Result<T>
146where
147    F: FnOnce() -> crate::error::Result<T>,
148{
149    run_audited_with(sink_from_env().as_ref(), host, operation, unit, action)
150}
151
152/// [`run_audited`] against an explicit sink. Carries the attempt/result wiring;
153/// [`run_audited`] is the production wrapper that supplies [`sink_from_env`].
154///
155/// # Errors
156///
157/// Propagates `action`'s error unchanged; the [`AuditSink::write`] calls are
158/// best-effort and never alter the returned [`crate::error::Result`].
159pub fn run_audited_with<T, F>(
160    sink: &dyn AuditSink,
161    host: &str,
162    operation: &str,
163    unit: &str,
164    action: F,
165) -> crate::error::Result<T>
166where
167    F: FnOnce() -> crate::error::Result<T>,
168{
169    let ctx = AuditContext::new(&actor(), host, operation, unit, &correlation_id());
170    sink.write(&ctx.record(Outcome::Attempt));
171    let result = action();
172    match &result {
173        Ok(_) => sink.write(&ctx.record(Outcome::Ok)),
174        Err(e) => sink.write(&ctx.record(Outcome::Error(e.to_string()))),
175    }
176    result
177}
178
/// The actor identity, best-effort from the environment.
///
/// Returns the first of `USER`, then `LOGNAME`, that is set to a non-empty
/// Unicode value, or `"unknown"` when neither qualifies. An empty variable is
/// treated the same as an unset one so a record never carries a blank actor.
pub fn actor() -> String {
    ["USER", "LOGNAME"]
        .iter()
        .filter_map(|key| std::env::var(key).ok())
        .find(|name| !name.is_empty())
        .unwrap_or_else(|| "unknown".into())
}
185
/// A best-effort-unique correlation id for one invocation's records.
///
/// The id is three lowercase-hex parts joined by `-`: the current time in
/// nanoseconds since the Unix epoch (`0` if the clock reads before the epoch),
/// the process id, and a per-process counter that increments on every call.
pub fn correlation_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    // Only uniqueness of the counter value matters, not ordering against other
    // memory, so a relaxed increment is sufficient.
    static SEQ: AtomicU64 = AtomicU64::new(0);
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_nanos());
    let pid = std::process::id();
    let seq = SEQ.fetch_add(1, Ordering::Relaxed);
    format!("{nanos:x}-{pid:x}-{seq:x}")
}
198
/// Encode one journal field in the systemd native protocol and append it to
/// `out`. Values without a newline use `KEY=VALUE\n`; values with a newline
/// use the binary form: `KEY\n` + little-endian u64 length + raw bytes + `\n`.
///
/// `key` is written verbatim; this function does not validate it.
fn push_field(out: &mut Vec<u8>, key: &str, value: &str) {
    // Largest encoding is the binary form: key + '\n' + 8-byte length + value + '\n'.
    out.reserve(key.len() + value.len() + 10);
    out.extend_from_slice(key.as_bytes());
    if value.contains('\n') {
        // Binary form: KEY\n + LE u64 length + raw bytes + \n.
        out.push(b'\n');
        // usize -> u64 is a widening (or same-width) cast on supported targets.
        out.extend_from_slice(&(value.len() as u64).to_le_bytes());
    } else {
        // Plain form: KEY=VALUE\n.
        out.push(b'=');
    }
    out.extend_from_slice(value.as_bytes());
    out.push(b'\n');
}
215
216/// Encode an audit record as systemd journal native-protocol fields.
217pub fn encode_journal_fields(rec: &AuditRecord) -> Vec<u8> {
218    let mut out = Vec::new();
219    let message = format!(
220        "fez {} {} on {}: {}",
221        rec.operation, rec.unit, rec.target_host, rec.result
222    );
223    push_field(&mut out, "MESSAGE", &message);
224    push_field(&mut out, "MESSAGE_ID", FEZ_MUTATION_MESSAGE_ID);
225    push_field(&mut out, "SYSLOG_IDENTIFIER", "fez");
226    push_field(&mut out, "PRIORITY", rec.priority());
227    push_field(&mut out, "FEZ_ACTOR", &rec.actor);
228    push_field(&mut out, "FEZ_TARGET_HOST", &rec.target_host);
229    push_field(&mut out, "FEZ_OPERATION", &rec.operation);
230    push_field(&mut out, "FEZ_UNIT", &rec.unit);
231    push_field(&mut out, "FEZ_RESULT", &rec.result);
232    push_field(&mut out, "FEZ_CORRELATION_ID", &rec.correlation_id);
233    if let Some(err) = &rec.error {
234        push_field(&mut out, "FEZ_ERROR", err);
235    }
236    out
237}
238
239/// A destination that persists [`AuditRecord`]s.
240pub trait AuditSink {
241    /// Write one record. Implementations are best-effort and must not panic.
242    fn write(&self, rec: &AuditRecord);
243}
244
245/// Discards records (used when `FEZ_AUDIT=off`).
246pub struct NoopSink;
247impl AuditSink for NoopSink {
248    fn write(&self, _rec: &AuditRecord) {}
249}
250
251/// Appends each record as a JSON line (used by tests and the E2E harness).
252pub struct FileSink {
253    /// File the JSON lines are appended to.
254    pub path: std::path::PathBuf,
255}
256impl AuditSink for FileSink {
257    fn write(&self, rec: &AuditRecord) {
258        use std::io::Write;
259        if let Ok(mut f) = std::fs::OpenOptions::new()
260            .create(true)
261            .append(true)
262            .open(&self.path)
263        {
264            if let Ok(line) = serde_json::to_string(rec) {
265                let _ = writeln!(f, "{line}");
266            }
267        }
268    }
269}
270
271/// Writes to the systemd journal via its native datagram protocol. Best-effort:
272/// audit must never break a mutation, so a missing socket is silently ignored.
273pub struct JournalSink;
274impl AuditSink for JournalSink {
275    fn write(&self, rec: &AuditRecord) {
276        let buf = encode_journal_fields(rec);
277        if let Ok(sock) = std::os::unix::net::UnixDatagram::unbound() {
278            let _ = sock.send_to(&buf, "/run/systemd/journal/socket");
279        }
280    }
281}
282
283/// Select a sink from the `FEZ_AUDIT` environment variable.
284pub fn sink_from_env() -> Box<dyn AuditSink> {
285    match std::env::var("FEZ_AUDIT").ok().as_deref() {
286        Some("off") | Some("0") => Box::new(NoopSink),
287        Some(v) if v.starts_with("file:") => Box::new(FileSink {
288            path: std::path::PathBuf::from(&v["file:".len()..]),
289        }),
290        _ => Box::new(JournalSink),
291    }
292}
293
#[cfg(test)]
mod tests {
    use super::*;

    /// A fixed context shared by the record-building tests.
    fn ctx() -> AuditContext {
        AuditContext::new("alice", "localhost", "stop", "chronyd.service", "abc-1-0")
    }

    /// Build a record from a wire `result` string and optional error detail.
    /// Panics on an unknown `result` so a typo in a test fails loudly.
    fn rec(result: &str, error: Option<String>) -> AuditRecord {
        let outcome = match (result, error) {
            ("attempt", _) => Outcome::Attempt,
            ("ok", _) => Outcome::Ok,
            ("error", Some(e)) => Outcome::Error(e),
            ("error", None) => Outcome::Error(String::new()),
            (other, _) => panic!("unexpected result {other}"),
        };
        ctx().record(outcome)
    }

    #[test]
    fn context_records_share_invocation_fields() {
        let c = ctx();
        let attempt = c.record(Outcome::Attempt);
        let ok = c.record(Outcome::Ok);
        assert_eq!(attempt.actor, "alice");
        assert_eq!(attempt.target_host, "localhost");
        assert_eq!(attempt.operation, "stop");
        assert_eq!(attempt.unit, "chronyd.service");
        assert_eq!(attempt.correlation_id, "abc-1-0");
        // Same context produces records that agree on every shared field.
        assert_eq!(attempt.operation, ok.operation);
        assert_eq!(attempt.correlation_id, ok.correlation_id);
    }

    #[test]
    fn outcome_maps_to_result_and_error() {
        assert_eq!(ctx().record(Outcome::Attempt).result, "attempt");
        assert_eq!(ctx().record(Outcome::Attempt).error, None);
        assert_eq!(ctx().record(Outcome::Ok).result, "ok");
        assert_eq!(ctx().record(Outcome::Ok).error, None);
        let err = ctx().record(Outcome::Error("boom".into()));
        assert_eq!(err.result, "error");
        assert_eq!(err.error.as_deref(), Some("boom"));
    }

    #[test]
    fn encodes_plain_fields() {
        let bytes = encode_journal_fields(&rec("ok", None));
        let text = String::from_utf8_lossy(&bytes);
        assert!(text.contains("SYSLOG_IDENTIFIER=fez\n"));
        assert!(text.contains("FEZ_UNIT=chronyd.service\n"));
        assert!(text.contains("FEZ_OPERATION=stop\n"));
        assert!(text.contains("FEZ_RESULT=ok\n"));
        assert!(text.contains("PRIORITY=6\n"));
        // No FEZ_ERROR field when error is None.
        assert!(!text.contains("FEZ_ERROR"));
    }

    #[test]
    fn encodes_newline_value_in_binary_form() {
        let bytes = encode_journal_fields(&rec("error", Some("line one\nline two".into())));
        // Find the FEZ_ERROR key written in binary form: `FEZ_ERROR\n` + u64 LE len.
        let needle = b"FEZ_ERROR\n";
        let pos = bytes
            .windows(needle.len())
            .position(|w| w == needle)
            .expect("FEZ_ERROR key present in binary form");
        let len_start = pos + needle.len();
        let len_bytes: [u8; 8] = bytes[len_start..len_start + 8].try_into().unwrap();
        assert_eq!(
            u64::from_le_bytes(len_bytes) as usize,
            "line one\nline two".len()
        );
    }

    #[test]
    fn file_sink_appends_json_lines() {
        let path =
            std::env::temp_dir().join(format!("fez-audit-test-{}.jsonl", std::process::id()));
        let _ = std::fs::remove_file(&path);
        let sink = FileSink { path: path.clone() };
        sink.write(&rec("attempt", None));
        sink.write(&rec("ok", None));
        let body = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("\"result\":\"attempt\""));
        assert!(lines[1].contains("\"result\":\"ok\""));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn priority_varies_by_result() {
        assert_eq!(rec("error", Some("x".into())).priority(), "3");
        assert_eq!(rec("attempt", None).priority(), "5");
        assert_eq!(rec("ok", None).priority(), "6");
    }

    /// A temp-file path unique per tag, process, and test thread, so the
    /// `run_audited_with` tests can run in parallel without sharing a file.
    fn audit_temp_path(tag: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!(
            "fez-run-audited-{tag}-{}-{:?}.jsonl",
            std::process::id(),
            std::thread::current().id()
        ))
    }

    #[test]
    fn run_audited_writes_attempt_then_ok_on_success() {
        let path = audit_temp_path("ok");
        let _ = std::fs::remove_file(&path);
        let sink = FileSink { path: path.clone() };
        let out: crate::error::Result<i32> =
            run_audited_with(&sink, "localhost", "stop", "chronyd.service", || Ok(42));
        assert_eq!(out.unwrap(), 42);
        let body = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("\"result\":\"attempt\""));
        assert!(lines[1].contains("\"result\":\"ok\""));
        assert!(lines[0].contains("\"operation\":\"stop\""));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn run_audited_writes_attempt_then_error_on_failure() {
        let path = audit_temp_path("err");
        let _ = std::fs::remove_file(&path);
        let sink = FileSink { path: path.clone() };
        let out: crate::error::Result<i32> =
            run_audited_with(&sink, "localhost", "start", "sshd.service", || {
                Err(crate::error::FezError::Aborted)
            });
        assert!(out.is_err());
        let body = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("\"result\":\"attempt\""));
        assert!(lines[1].contains("\"result\":\"error\""));
        assert!(lines[1].contains("aborted by user"));
        let _ = std::fs::remove_file(&path);
    }
}