1use serde::Serialize;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8const FEZ_MUTATION_MESSAGE_ID: &str = "fe2c0ffee0000400a8b1mutati0naud1";
10
11#[derive(Serialize, Clone, Debug)]
13pub struct AuditRecord {
14 pub actor: String,
16 pub target_host: String,
18 pub operation: String,
20 pub unit: String,
22 pub result: String,
24 #[serde(skip_serializing_if = "Option::is_none")]
26 pub error: Option<String>,
27 pub correlation_id: String,
29 pub timestamp_unix_ms: u128,
31}
32
33impl AuditRecord {
34 fn priority(&self) -> &'static str {
35 match self.result.as_str() {
36 "error" => "3", "attempt" => "5", _ => "6", }
40 }
41}
42
43#[derive(Clone, Debug)]
47pub enum Outcome {
48 Attempt,
50 Ok,
52 Error(String),
54}
55
56impl Outcome {
57 fn result(&self) -> &'static str {
59 match self {
60 Outcome::Attempt => "attempt",
61 Outcome::Ok => "ok",
62 Outcome::Error(_) => "error",
63 }
64 }
65
66 fn into_error(self) -> Option<String> {
68 match self {
69 Outcome::Error(e) => Some(e),
70 _ => None,
71 }
72 }
73}
74
75#[derive(Clone, Debug)]
80pub struct AuditContext {
81 actor: String,
82 target_host: String,
83 operation: String,
84 unit: String,
85 correlation_id: String,
86}
87
88impl AuditContext {
89 pub fn new(
91 actor: &str,
92 target_host: &str,
93 operation: &str,
94 unit: &str,
95 correlation_id: &str,
96 ) -> Self {
97 AuditContext {
98 actor: actor.into(),
99 target_host: target_host.into(),
100 operation: operation.into(),
101 unit: unit.into(),
102 correlation_id: correlation_id.into(),
103 }
104 }
105
106 #[must_use]
108 pub fn record(&self, outcome: Outcome) -> AuditRecord {
109 let timestamp_unix_ms = SystemTime::now()
110 .duration_since(UNIX_EPOCH)
111 .map(|d| d.as_millis())
112 .unwrap_or(0);
113 let result = outcome.result().to_string();
114 AuditRecord {
115 actor: self.actor.clone(),
116 target_host: self.target_host.clone(),
117 operation: self.operation.clone(),
118 unit: self.unit.clone(),
119 result,
120 error: outcome.into_error(),
121 correlation_id: self.correlation_id.clone(),
122 timestamp_unix_ms,
123 }
124 }
125}
126
127pub fn run_audited<T, F>(
141 host: &str,
142 operation: &str,
143 unit: &str,
144 action: F,
145) -> crate::error::Result<T>
146where
147 F: FnOnce() -> crate::error::Result<T>,
148{
149 run_audited_with(sink_from_env().as_ref(), host, operation, unit, action)
150}
151
152pub fn run_audited_with<T, F>(
160 sink: &dyn AuditSink,
161 host: &str,
162 operation: &str,
163 unit: &str,
164 action: F,
165) -> crate::error::Result<T>
166where
167 F: FnOnce() -> crate::error::Result<T>,
168{
169 let ctx = AuditContext::new(&actor(), host, operation, unit, &correlation_id());
170 sink.write(&ctx.record(Outcome::Attempt));
171 let result = action();
172 match &result {
173 Ok(_) => sink.write(&ctx.record(Outcome::Ok)),
174 Err(e) => sink.write(&ctx.record(Outcome::Error(e.to_string()))),
175 }
176 result
177}
178
179pub fn actor() -> String {
181 std::env::var("USER")
182 .or_else(|_| std::env::var("LOGNAME"))
183 .unwrap_or_else(|_| "unknown".into())
184}
185
186pub fn correlation_id() -> String {
188 use std::sync::atomic::{AtomicU64, Ordering};
189 static SEQ: AtomicU64 = AtomicU64::new(0);
190 let nanos = SystemTime::now()
191 .duration_since(UNIX_EPOCH)
192 .map(|d| d.as_nanos())
193 .unwrap_or(0);
194 let pid = std::process::id();
195 let seq = SEQ.fetch_add(1, Ordering::Relaxed);
196 format!("{nanos:x}-{pid:x}-{seq:x}")
197}
198
199fn push_field(out: &mut Vec<u8>, key: &str, value: &str) {
203 out.extend_from_slice(key.as_bytes());
204 if value.contains('\n') {
205 out.push(b'\n');
207 out.extend_from_slice(&(value.len() as u64).to_le_bytes());
208 } else {
209 out.push(b'=');
211 }
212 out.extend_from_slice(value.as_bytes());
213 out.push(b'\n');
214}
215
216pub fn encode_journal_fields(rec: &AuditRecord) -> Vec<u8> {
218 let mut out = Vec::new();
219 let message = format!(
220 "fez {} {} on {}: {}",
221 rec.operation, rec.unit, rec.target_host, rec.result
222 );
223 push_field(&mut out, "MESSAGE", &message);
224 push_field(&mut out, "MESSAGE_ID", FEZ_MUTATION_MESSAGE_ID);
225 push_field(&mut out, "SYSLOG_IDENTIFIER", "fez");
226 push_field(&mut out, "PRIORITY", rec.priority());
227 push_field(&mut out, "FEZ_ACTOR", &rec.actor);
228 push_field(&mut out, "FEZ_TARGET_HOST", &rec.target_host);
229 push_field(&mut out, "FEZ_OPERATION", &rec.operation);
230 push_field(&mut out, "FEZ_UNIT", &rec.unit);
231 push_field(&mut out, "FEZ_RESULT", &rec.result);
232 push_field(&mut out, "FEZ_CORRELATION_ID", &rec.correlation_id);
233 if let Some(err) = &rec.error {
234 push_field(&mut out, "FEZ_ERROR", err);
235 }
236 out
237}
238
239pub trait AuditSink {
241 fn write(&self, rec: &AuditRecord);
243}
244
245pub struct NoopSink;
247impl AuditSink for NoopSink {
248 fn write(&self, _rec: &AuditRecord) {}
249}
250
251pub struct FileSink {
253 pub path: std::path::PathBuf,
255}
256impl AuditSink for FileSink {
257 fn write(&self, rec: &AuditRecord) {
258 use std::io::Write;
259 if let Ok(mut f) = std::fs::OpenOptions::new()
260 .create(true)
261 .append(true)
262 .open(&self.path)
263 {
264 if let Ok(line) = serde_json::to_string(rec) {
265 let _ = writeln!(f, "{line}");
266 }
267 }
268 }
269}
270
271pub struct JournalSink;
274impl AuditSink for JournalSink {
275 fn write(&self, rec: &AuditRecord) {
276 let buf = encode_journal_fields(rec);
277 if let Ok(sock) = std::os::unix::net::UnixDatagram::unbound() {
278 let _ = sock.send_to(&buf, "/run/systemd/journal/socket");
279 }
280 }
281}
282
283pub fn sink_from_env() -> Box<dyn AuditSink> {
285 match std::env::var("FEZ_AUDIT").ok().as_deref() {
286 Some("off") | Some("0") => Box::new(NoopSink),
287 Some(v) if v.starts_with("file:") => Box::new(FileSink {
288 path: std::path::PathBuf::from(&v["file:".len()..]),
289 }),
290 _ => Box::new(JournalSink),
291 }
292}
293
294#[cfg(test)]
295mod tests {
296 use super::*;
297
298 fn ctx() -> AuditContext {
299 AuditContext::new("alice", "localhost", "stop", "chronyd.service", "abc-1-0")
300 }
301
302 fn rec(result: &str, error: Option<String>) -> AuditRecord {
303 let outcome = match (result, error) {
304 ("attempt", _) => Outcome::Attempt,
305 ("ok", _) => Outcome::Ok,
306 ("error", Some(e)) => Outcome::Error(e),
307 ("error", None) => Outcome::Error(String::new()),
308 (other, _) => panic!("unexpected result {other}"),
309 };
310 ctx().record(outcome)
311 }
312
313 #[test]
314 fn context_records_share_invocation_fields() {
315 let c = ctx();
316 let attempt = c.record(Outcome::Attempt);
317 let ok = c.record(Outcome::Ok);
318 assert_eq!(attempt.actor, "alice");
319 assert_eq!(attempt.target_host, "localhost");
320 assert_eq!(attempt.operation, "stop");
321 assert_eq!(attempt.unit, "chronyd.service");
322 assert_eq!(attempt.correlation_id, "abc-1-0");
323 assert_eq!(attempt.operation, ok.operation);
325 assert_eq!(attempt.correlation_id, ok.correlation_id);
326 }
327
328 #[test]
329 fn outcome_maps_to_result_and_error() {
330 assert_eq!(ctx().record(Outcome::Attempt).result, "attempt");
331 assert_eq!(ctx().record(Outcome::Attempt).error, None);
332 assert_eq!(ctx().record(Outcome::Ok).result, "ok");
333 assert_eq!(ctx().record(Outcome::Ok).error, None);
334 let err = ctx().record(Outcome::Error("boom".into()));
335 assert_eq!(err.result, "error");
336 assert_eq!(err.error.as_deref(), Some("boom"));
337 }
338
339 #[test]
340 fn encodes_plain_fields() {
341 let bytes = encode_journal_fields(&rec("ok", None));
342 let text = String::from_utf8_lossy(&bytes);
343 assert!(text.contains("SYSLOG_IDENTIFIER=fez\n"));
344 assert!(text.contains("FEZ_UNIT=chronyd.service\n"));
345 assert!(text.contains("FEZ_OPERATION=stop\n"));
346 assert!(text.contains("FEZ_RESULT=ok\n"));
347 assert!(text.contains("PRIORITY=6\n"));
348 assert!(!text.contains("FEZ_ERROR"));
350 }
351
352 #[test]
353 fn encodes_newline_value_in_binary_form() {
354 let bytes = encode_journal_fields(&rec("error", Some("line one\nline two".into())));
355 let needle = b"FEZ_ERROR\n";
357 let pos = bytes
358 .windows(needle.len())
359 .position(|w| w == needle)
360 .expect("FEZ_ERROR key present in binary form");
361 let len_start = pos + needle.len();
362 let len_bytes: [u8; 8] = bytes[len_start..len_start + 8].try_into().unwrap();
363 assert_eq!(
364 u64::from_le_bytes(len_bytes) as usize,
365 "line one\nline two".len()
366 );
367 }
368
369 #[test]
370 fn file_sink_appends_json_lines() {
371 let path =
372 std::env::temp_dir().join(format!("fez-audit-test-{}.jsonl", std::process::id()));
373 let _ = std::fs::remove_file(&path);
374 let sink = FileSink { path: path.clone() };
375 sink.write(&rec("attempt", None));
376 sink.write(&rec("ok", None));
377 let body = std::fs::read_to_string(&path).unwrap();
378 let lines: Vec<&str> = body.lines().collect();
379 assert_eq!(lines.len(), 2);
380 assert!(lines[0].contains("\"result\":\"attempt\""));
381 assert!(lines[1].contains("\"result\":\"ok\""));
382 let _ = std::fs::remove_file(&path);
383 }
384
385 #[test]
386 fn priority_varies_by_result() {
387 assert_eq!(rec("error", Some("x".into())).priority(), "3");
388 assert_eq!(rec("attempt", None).priority(), "5");
389 assert_eq!(rec("ok", None).priority(), "6");
390 }
391
392 fn audit_temp_path(tag: &str) -> std::path::PathBuf {
393 std::env::temp_dir().join(format!(
394 "fez-run-audited-{tag}-{}-{:?}.jsonl",
395 std::process::id(),
396 std::thread::current().id()
397 ))
398 }
399
400 #[test]
401 fn run_audited_writes_attempt_then_ok_on_success() {
402 let path = audit_temp_path("ok");
403 let _ = std::fs::remove_file(&path);
404 let sink = FileSink { path: path.clone() };
405 let out: crate::error::Result<i32> =
406 run_audited_with(&sink, "localhost", "stop", "chronyd.service", || Ok(42));
407 assert_eq!(out.unwrap(), 42);
408 let body = std::fs::read_to_string(&path).unwrap();
409 let lines: Vec<&str> = body.lines().collect();
410 assert_eq!(lines.len(), 2);
411 assert!(lines[0].contains("\"result\":\"attempt\""));
412 assert!(lines[1].contains("\"result\":\"ok\""));
413 assert!(lines[0].contains("\"operation\":\"stop\""));
414 let _ = std::fs::remove_file(&path);
415 }
416
417 #[test]
418 fn run_audited_writes_attempt_then_error_on_failure() {
419 let path = audit_temp_path("err");
420 let _ = std::fs::remove_file(&path);
421 let sink = FileSink { path: path.clone() };
422 let out: crate::error::Result<i32> =
423 run_audited_with(&sink, "localhost", "start", "sshd.service", || {
424 Err(crate::error::FezError::Aborted)
425 });
426 assert!(out.is_err());
427 let body = std::fs::read_to_string(&path).unwrap();
428 let lines: Vec<&str> = body.lines().collect();
429 assert_eq!(lines.len(), 2);
430 assert!(lines[0].contains("\"result\":\"attempt\""));
431 assert!(lines[1].contains("\"result\":\"error\""));
432 assert!(lines[1].contains("aborted by user"));
433 let _ = std::fs::remove_file(&path);
434 }
435}