use std::path::Path;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
pub enum AuditSink {
File(Mutex<tokio::fs::File>),
Stderr,
Custom(Box<dyn AuditWriter + Send + Sync>),
}
#[async_trait::async_trait]
pub trait AuditWriter {
async fn write_line(&self, line: &str);
}
impl AuditSink {
pub async fn file(path: impl AsRef<Path>) -> std::io::Result<Self> {
let f = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.await?;
Ok(AuditSink::File(Mutex::new(f)))
}
pub async fn write(&self, line: &str) {
match self {
AuditSink::File(m) => {
let mut f = m.lock().await;
let _ = f.write_all(line.as_bytes()).await;
let _ = f.write_all(b"\n").await;
let _ = f.flush().await;
}
AuditSink::Stderr => {
eprintln!("{line}");
}
AuditSink::Custom(w) => {
w.write_line(line).await;
}
}
}
}
#[derive(Debug, Clone)]
pub struct AuditEvent<'a> {
pub event: &'a str,
pub peer: &'a str,
pub user: &'a str,
pub host: &'a str,
pub pv: &'a str,
pub value: &'a str,
pub result: &'a str,
}
#[derive(Clone, Copy, Debug, Default)]
pub enum AuditFormat {
#[default]
Json,
LegacyAslog,
}
impl AuditEvent<'_> {
fn to_aslog_line(&self) -> String {
let now = chrono::Utc::now();
let ts = now.format("%m/%d/%Y %H:%M:%S");
let op = match self.event {
"subscribe" | "unsubscribe" | "caget" => "R",
"caput" => "W",
"connect" => "C",
"disconnect" => "D",
"create_chan" => "O",
"acf_deny" => "X",
_ => "?",
};
let identity = if self.user.is_empty() && self.host.is_empty() {
self.peer.to_string()
} else if self.host.is_empty() {
self.user.to_string()
} else if self.user.is_empty() {
format!("anonymous@{}", self.host)
} else {
format!("{}@{}", self.user, self.host)
};
let pv_value = if self.value.is_empty() {
self.pv.to_string()
} else {
format!("{}={}", self.pv, self.value)
};
let result = if self.result.is_empty() {
String::new()
} else {
format!(" {}", self.result)
};
let line = format!(
"{ts} ASUSER {op} {identity} {ev}: {pv_value}{result}",
ev = self.event,
);
line.trim_end().to_string()
}
fn to_json(&self) -> String {
let ts = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
let mut s = String::with_capacity(192);
s.push('{');
push_kv(&mut s, "ts", &ts);
s.push(',');
push_kv(&mut s, "ev", self.event);
s.push(',');
push_kv(&mut s, "peer", self.peer);
if !self.user.is_empty() {
s.push(',');
push_kv(&mut s, "user", self.user);
}
if !self.host.is_empty() {
s.push(',');
push_kv(&mut s, "host", self.host);
}
if !self.pv.is_empty() {
s.push(',');
push_kv(&mut s, "pv", self.pv);
}
if !self.value.is_empty() {
s.push(',');
push_kv(&mut s, "value", self.value);
}
if !self.result.is_empty() {
s.push(',');
push_kv(&mut s, "result", self.result);
}
s.push('}');
s
}
}
fn push_kv(s: &mut String, k: &str, v: &str) {
s.push('"');
s.push_str(k);
s.push_str("\":\"");
for c in v.chars() {
match c {
'"' => s.push_str("\\\""),
'\\' => s.push_str("\\\\"),
'\n' => s.push_str("\\n"),
'\r' => s.push_str("\\r"),
'\t' => s.push_str("\\t"),
c if (c as u32) < 0x20 => {
use std::fmt::Write;
let _ = write!(s, "\\u{:04x}", c as u32);
}
c => s.push(c),
}
}
s.push('"');
}
#[derive(Clone)]
pub struct AuditLogger {
tx: tokio::sync::mpsc::Sender<String>,
format: AuditFormat,
}
const AUDIT_QUEUE_CAPACITY: usize = 4096;
impl AuditLogger {
pub fn new(sink: AuditSink) -> Self {
Self::new_with_format(sink, AuditFormat::Json)
}
pub fn new_with_format(sink: AuditSink, format: AuditFormat) -> Self {
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(AUDIT_QUEUE_CAPACITY);
let sink = Arc::new(sink);
tokio::spawn(async move {
while let Some(line) = rx.recv().await {
sink.write(&line).await;
}
});
Self { tx, format }
}
pub async fn log(&self, ev: AuditEvent<'_>) {
let line = match self.format {
AuditFormat::Json => ev.to_json(),
AuditFormat::LegacyAslog => ev.to_aslog_line(),
};
if self.tx.try_send(line).is_err() {
metrics::counter!("ca_server_audit_drops_total").increment(1);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn json_basic() {
let ev = AuditEvent {
event: "caput",
peer: "10.0.0.5:1234",
user: "alice",
host: "opi-1",
pv: "MOTOR:VAL",
value: "3.14",
result: "ok",
};
let s = ev.to_json();
assert!(s.contains("\"ev\":\"caput\""));
assert!(s.contains("\"pv\":\"MOTOR:VAL\""));
assert!(s.contains("\"result\":\"ok\""));
}
#[test]
fn aslog_caput_render() {
let ev = AuditEvent {
event: "caput",
peer: "10.0.0.5:1234",
user: "alice",
host: "opi-1",
pv: "MOTOR:VAL",
value: "3.14",
result: "ok",
};
let s = ev.to_aslog_line();
assert!(s.starts_with(&chrono::Utc::now().format("%m/%d/%Y").to_string()));
assert!(s.contains(" ASUSER W alice@opi-1 caput: MOTOR:VAL=3.14 ok"));
}
#[test]
fn aslog_subscribe_render() {
let ev = AuditEvent {
event: "subscribe",
peer: "p",
user: "bob",
host: "ws-2",
pv: "BL10C:VG-01:PRESSURE",
value: "",
result: "",
};
let s = ev.to_aslog_line();
assert!(s.contains(" ASUSER R bob@ws-2 subscribe: BL10C:VG-01:PRESSURE"));
assert!(!s.contains("="));
}
#[test]
fn aslog_anonymous_no_result() {
let ev = AuditEvent {
event: "connect",
peer: "192.0.2.4:55001",
user: "",
host: "",
pv: "",
value: "",
result: "",
};
let s = ev.to_aslog_line();
assert!(s.contains(" ASUSER C 192.0.2.4:55001 connect:"));
assert!(!s.ends_with(' '));
}
#[test]
fn json_escapes_quotes_and_control() {
let ev = AuditEvent {
event: "caput",
peer: "p",
user: "u",
host: "h",
pv: "PV",
value: "a\"b\nc",
result: "ok",
};
let s = ev.to_json();
assert!(s.contains("\"value\":\"a\\\"b\\nc\""));
}
#[test]
fn skips_empty_optional_fields() {
let ev = AuditEvent {
event: "connect",
peer: "10.0.0.5:1234",
user: "",
host: "",
pv: "",
value: "",
result: "",
};
let s = ev.to_json();
assert!(!s.contains("\"user\""));
assert!(!s.contains("\"pv\""));
}
}