1use std::io::{BufWriter, Write};
38use std::path::{Path, PathBuf};
39use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
40use std::sync::{mpsc, Arc, Mutex};
41use std::thread::{self, JoinHandle};
42use std::time::{Duration, Instant};
43
44use crate::crypto::{os_random, sha256};
45use crate::json::{Map, Value as JsonValue};
46
/// How the principal behind an audit event was authenticated.
///
/// Every variant has a stable snake_case wire name; see
/// [`AuditAuthSource::as_str`] and [`AuditAuthSource::parse`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditAuthSource {
    ApiKey,
    Session,
    Password,
    Oauth,
    ClientCert,
    Anonymous,
    System,
}

impl AuditAuthSource {
    /// Every variant in declaration order; `parse` searches this table.
    const ALL: [Self; 7] = [
        Self::ApiKey,
        Self::Session,
        Self::Password,
        Self::Oauth,
        Self::ClientCert,
        Self::Anonymous,
        Self::System,
    ];

    /// Returns the wire name of this source.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::System => "system",
            Self::Anonymous => "anonymous",
            Self::ClientCert => "client_cert",
            Self::Oauth => "oauth",
            Self::Password => "password",
            Self::Session => "session",
            Self::ApiKey => "api_key",
        }
    }

    /// Inverse of [`as_str`](Self::as_str): exact, case-sensitive match on the
    /// wire name. Returns `None` for anything else.
    pub fn parse(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
    }
}
92
/// Result of the audited action.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Success,
    Denied,
    Error,
}

impl Outcome {
    /// Returns the canonical wire name (`"success"`, `"denied"`, `"error"`).
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Error => "error",
            Self::Denied => "denied",
            Self::Success => "success",
        }
    }

    /// Parses a canonical name or one of the short aliases (`"ok"`, `"deny"`,
    /// `"err"`). Matching is exact and case-sensitive; anything else is `None`.
    pub fn parse(s: &str) -> Option<Self> {
        if matches!(s, "success" | "ok") {
            Some(Self::Success)
        } else if matches!(s, "denied" | "deny") {
            Some(Self::Denied)
        } else if matches!(s, "error" | "err") {
            Some(Self::Error)
        } else {
            None
        }
    }
}
119
/// One audit record, serialized as a single JSON line by
/// `AuditEvent::to_json_line` and read back by `AuditEvent::parse_line`.
#[derive(Debug, Clone)]
pub struct AuditEvent {
    /// Unix timestamp in milliseconds; the constructors in this file fill it
    /// from `crate::utils::now_unix_millis()`.
    pub ts: u128,
    /// Identifier produced by `new_event_id()` when the event is constructed.
    pub event_id: String,
    /// Who performed the action; omitted from the JSON line when `None`.
    pub principal: Option<String>,
    /// How the principal was authenticated.
    pub source: AuditAuthSource,
    /// Tenant the event belongs to; omitted from the JSON line when `None`.
    pub tenant: Option<String>,
    /// Name of the audited action (always serialized).
    pub action: String,
    /// Target of the action; omitted from the JSON line when `None`.
    pub resource: Option<String>,
    /// Whether the action succeeded, was denied, or failed.
    pub outcome: Outcome,
    /// Free-form structured payload; omitted from the JSON line when it is
    /// `JsonValue::Null`.
    pub detail: JsonValue,
    /// Remote address of the caller; omitted from the JSON line when `None`.
    pub remote_addr: Option<String>,
    /// Correlation identifier; omitted from the JSON line when `None`.
    pub correlation_id: Option<String>,
}
135
136impl AuditEvent {
137 pub fn builder(action: impl Into<String>) -> AuditEventBuilder {
140 AuditEventBuilder {
141 inner: AuditEvent {
142 ts: crate::utils::now_unix_millis() as u128,
143 event_id: new_event_id(),
144 principal: None,
145 source: AuditAuthSource::System,
146 tenant: None,
147 action: action.into(),
148 resource: None,
149 outcome: Outcome::Success,
150 detail: JsonValue::Null,
151 remote_addr: None,
152 correlation_id: None,
153 },
154 }
155 }
156
157 pub fn from_legacy(
161 action: &str,
162 principal: &str,
163 target: &str,
164 result: &str,
165 details: JsonValue,
166 ) -> Self {
167 let outcome = if result == "ok" {
168 Outcome::Success
169 } else if result.starts_with("err") {
170 Outcome::Error
171 } else if result.starts_with("denied") || result.starts_with("deny") {
172 Outcome::Denied
173 } else {
174 Outcome::Success
175 };
176 let mut detail = details;
177 if !result.is_empty() && result != "ok" {
178 let mut obj = match detail {
181 JsonValue::Object(map) => map,
182 JsonValue::Null => Map::new(),
183 other => {
184 let mut m = Map::new();
185 m.insert("legacy".to_string(), other);
186 m
187 }
188 };
189 obj.entry("result_text".to_string())
190 .or_insert(JsonValue::String(result.to_string()));
191 detail = JsonValue::Object(obj);
192 }
193 Self {
194 ts: crate::utils::now_unix_millis() as u128,
195 event_id: new_event_id(),
196 principal: if principal.is_empty() || principal == "system" {
197 None
198 } else {
199 Some(principal.to_string())
200 },
201 source: if principal == "system" {
202 AuditAuthSource::System
203 } else if principal.is_empty() {
204 AuditAuthSource::Anonymous
205 } else {
206 AuditAuthSource::Password
207 },
208 tenant: None,
209 action: action.to_string(),
210 resource: if target.is_empty() {
211 None
212 } else {
213 Some(target.to_string())
214 },
215 outcome,
216 detail,
217 remote_addr: None,
218 correlation_id: None,
219 }
220 }
221
222 pub fn to_json_line(&self, prev_hash: Option<&str>) -> String {
225 let mut object = Map::new();
226 object.insert("ts".to_string(), JsonValue::Number(self.ts as f64));
227 object.insert(
228 "ts_iso".to_string(),
229 JsonValue::String(format_iso8601(self.ts as u64)),
230 );
231 object.insert(
232 "event_id".to_string(),
233 JsonValue::String(self.event_id.clone()),
234 );
235 if let Some(p) = &self.principal {
236 object.insert("principal".to_string(), JsonValue::String(p.clone()));
237 }
238 object.insert(
239 "source".to_string(),
240 JsonValue::String(self.source.as_str().to_string()),
241 );
242 if let Some(t) = &self.tenant {
243 object.insert("tenant".to_string(), JsonValue::String(t.clone()));
244 }
245 object.insert("action".to_string(), JsonValue::String(self.action.clone()));
246 if let Some(r) = &self.resource {
247 object.insert("resource".to_string(), JsonValue::String(r.clone()));
248 }
249 object.insert(
250 "outcome".to_string(),
251 JsonValue::String(self.outcome.as_str().to_string()),
252 );
253 if !matches!(self.detail, JsonValue::Null) {
254 object.insert("detail".to_string(), self.detail.clone());
255 }
256 if let Some(ip) = &self.remote_addr {
257 object.insert("remote_addr".to_string(), JsonValue::String(ip.clone()));
258 }
259 if let Some(cid) = &self.correlation_id {
260 object.insert("correlation_id".to_string(), JsonValue::String(cid.clone()));
261 }
262 if let Some(h) = prev_hash {
263 object.insert("prev_hash".to_string(), JsonValue::String(h.to_string()));
264 }
265 JsonValue::Object(object).to_string_compact()
266 }
267
268 pub fn parse_line(line: &str) -> Option<Self> {
272 let v: JsonValue = crate::json::from_str(line).ok()?;
273 let action = v.get("action")?.as_str()?.to_string();
274 let outcome_s = v.get("outcome").and_then(|n| n.as_str()).unwrap_or("");
275 let outcome = Outcome::parse(outcome_s)?;
276 let ts = v.get("ts").and_then(|n| n.as_f64()).unwrap_or(0.0) as u128;
277 let event_id = v
278 .get("event_id")
279 .and_then(|n| n.as_str())
280 .unwrap_or("")
281 .to_string();
282 let source = v
283 .get("source")
284 .and_then(|n| n.as_str())
285 .and_then(AuditAuthSource::parse)
286 .unwrap_or(AuditAuthSource::System);
287 Some(Self {
288 ts,
289 event_id,
290 principal: v
291 .get("principal")
292 .and_then(|n| n.as_str())
293 .map(|s| s.to_string()),
294 source,
295 tenant: v
296 .get("tenant")
297 .and_then(|n| n.as_str())
298 .map(|s| s.to_string()),
299 action,
300 resource: v
301 .get("resource")
302 .and_then(|n| n.as_str())
303 .map(|s| s.to_string()),
304 outcome,
305 detail: v.get("detail").cloned().unwrap_or(JsonValue::Null),
306 remote_addr: v
307 .get("remote_addr")
308 .and_then(|n| n.as_str())
309 .map(|s| s.to_string()),
310 correlation_id: v
311 .get("correlation_id")
312 .and_then(|n| n.as_str())
313 .map(|s| s.to_string()),
314 })
315 }
316}
317
318pub struct AuditEventBuilder {
320 inner: AuditEvent,
321}
322
323impl AuditEventBuilder {
324 pub fn principal(mut self, principal: impl Into<String>) -> Self {
325 self.inner.principal = Some(principal.into());
326 self
327 }
328
329 pub fn principal_opt(mut self, principal: Option<String>) -> Self {
330 self.inner.principal = principal;
331 self
332 }
333
334 pub fn source(mut self, source: AuditAuthSource) -> Self {
335 self.inner.source = source;
336 self
337 }
338
339 pub fn tenant(mut self, tenant: impl Into<String>) -> Self {
340 self.inner.tenant = Some(tenant.into());
341 self
342 }
343
344 pub fn resource(mut self, resource: impl Into<String>) -> Self {
345 self.inner.resource = Some(resource.into());
346 self
347 }
348
349 pub fn outcome(mut self, outcome: Outcome) -> Self {
350 self.inner.outcome = outcome;
351 self
352 }
353
354 pub fn detail(mut self, detail: JsonValue) -> Self {
355 self.inner.detail = detail;
356 self
357 }
358
359 pub fn field(mut self, field: AuditField) -> Self {
363 let mut obj = match std::mem::replace(&mut self.inner.detail, JsonValue::Null) {
364 JsonValue::Object(map) => map,
365 JsonValue::Null => Map::new(),
366 other => {
367 let mut m = Map::new();
370 m.insert("legacy_detail".to_string(), other);
371 m
372 }
373 };
374 obj.insert(field.name.to_string(), field.value.into_json_value());
375 self.inner.detail = JsonValue::Object(obj);
376 self
377 }
378
379 pub fn fields(mut self, fields: impl IntoIterator<Item = AuditField>) -> Self {
381 for f in fields {
382 self = self.field(f);
383 }
384 self
385 }
386
387 pub fn remote_addr(mut self, addr: impl Into<String>) -> Self {
388 self.inner.remote_addr = Some(addr.into());
389 self
390 }
391
392 pub fn correlation_id(mut self, cid: impl Into<String>) -> Self {
393 self.inner.correlation_id = Some(cid.into());
394 self
395 }
396
397 pub fn build(self) -> AuditEvent {
398 self.inner
399 }
400}
401
402#[derive(Debug, Clone)]
416pub enum AuditValue {
417 String(String),
418 Bytes(Vec<u8>),
422 Number(i64),
423 Bool(bool),
424 Null,
425}
426
427impl AuditValue {
428 fn into_json_value(self) -> JsonValue {
429 match self {
430 AuditValue::String(s) => JsonValue::String(s),
431 AuditValue::Bytes(bytes) => JsonValue::String(base64_encode(&bytes)),
432 AuditValue::Number(n) => JsonValue::Number(n as f64),
433 AuditValue::Bool(b) => JsonValue::Bool(b),
434 AuditValue::Null => JsonValue::Null,
435 }
436 }
437}
438
439impl From<String> for AuditValue {
440 fn from(value: String) -> Self {
441 AuditValue::String(value)
442 }
443}
444
445impl From<&str> for AuditValue {
446 fn from(value: &str) -> Self {
447 AuditValue::String(value.to_string())
448 }
449}
450
451impl From<&String> for AuditValue {
452 fn from(value: &String) -> Self {
453 AuditValue::String(value.clone())
454 }
455}
456
457impl From<Vec<u8>> for AuditValue {
458 fn from(value: Vec<u8>) -> Self {
459 AuditValue::Bytes(value)
460 }
461}
462
463impl From<&[u8]> for AuditValue {
464 fn from(value: &[u8]) -> Self {
465 AuditValue::Bytes(value.to_vec())
466 }
467}
468
469impl From<i64> for AuditValue {
470 fn from(value: i64) -> Self {
471 AuditValue::Number(value)
472 }
473}
474
475impl From<u64> for AuditValue {
476 fn from(value: u64) -> Self {
477 AuditValue::Number(if value > i64::MAX as u64 {
480 i64::MAX
481 } else {
482 value as i64
483 })
484 }
485}
486
487impl From<u32> for AuditValue {
488 fn from(value: u32) -> Self {
489 AuditValue::Number(value as i64)
490 }
491}
492
493impl From<usize> for AuditValue {
494 fn from(value: usize) -> Self {
495 AuditValue::Number(if value > i64::MAX as usize {
496 i64::MAX
497 } else {
498 value as i64
499 })
500 }
501}
502
503impl From<bool> for AuditValue {
504 fn from(value: bool) -> Self {
505 AuditValue::Bool(value)
506 }
507}
508
509impl<T: Into<AuditValue>> From<Option<T>> for AuditValue {
510 fn from(value: Option<T>) -> Self {
511 match value {
512 Some(v) => v.into(),
513 None => AuditValue::Null,
514 }
515 }
516}
517
/// A named value destined for an event's detail object.
///
/// Both fields are private; within this file values are created through
/// `AuditFieldEscaper::field`.
#[derive(Debug, Clone)]
pub struct AuditField {
    /// Key under which the value is inserted into the detail object.
    name: &'static str,
    /// The value to store.
    value: AuditValue,
}

impl AuditField {
    /// Returns the field's key.
    pub fn name(&self) -> &'static str {
        self.name
    }
    /// Returns a reference to the field's value.
    pub fn value(&self) -> &AuditValue {
        &self.value
    }
}
536
/// Constructor namespace for [`AuditField`] values.
pub struct AuditFieldEscaper;

impl AuditFieldEscaper {
    /// Builds an [`AuditField`] from a static key and anything convertible
    /// into an [`AuditValue`].
    pub fn field(name: &'static str, value: impl Into<AuditValue>) -> AuditField {
        AuditField {
            name,
            value: value.into(),
        }
    }
}
566
/// Encodes `input` as standard base64 (alphabet `A-Z a-z 0-9 + /`) with `=`
/// padding. An empty input yields an empty string.
fn base64_encode(input: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    let mut encoded = String::with_capacity(input.len().div_ceil(3) * 4);
    for group in input.chunks(3) {
        // Pack up to three bytes into the top 24 bits of `word`; missing
        // trailing bytes stay zero.
        let word = group
            .iter()
            .enumerate()
            .fold(0u32, |acc, (k, &byte)| {
                acc | (u32::from(byte) << (16 - 8 * k as u32))
            });
        // A group of n bytes yields n + 1 significant symbols; the rest of
        // the 4-symbol quantum is padding.
        for pos in 0..4usize {
            if pos <= group.len() {
                let shift = 18 - 6 * pos as u32;
                encoded.push(ALPHABET[((word >> shift) & 0x3f) as usize] as char);
            } else {
                encoded.push('=');
            }
        }
    }
    encoded
}
604
605fn new_event_id() -> String {
612 const ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHJKMNPQRSTVWXYZ";
613 let now_ms = crate::utils::now_unix_millis();
614 let mut rand_bytes = [0u8; 10];
615 let _ = os_random::fill_bytes(&mut rand_bytes);
616
617 let mut out = String::with_capacity(26);
618 for i in (0..10).rev() {
620 let shift = (i as u32) * 5;
621 let idx = ((now_ms >> shift) & 0x1f) as usize;
622 out.push(ALPHABET[idx] as char);
623 }
624 let mut acc: u128 = 0;
626 for &b in &rand_bytes {
627 acc = (acc << 8) | (b as u128);
628 }
629 for i in (0..16).rev() {
630 let shift = (i as u32) * 5;
631 let idx = ((acc >> shift) & 0x1f) as usize;
632 out.push(ALPHABET[idx] as char);
633 }
634 out
635}
636
/// When the background writer flushes and syncs the audit file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FsyncMode {
    Off,
    Every,
    Periodic,
}

impl FsyncMode {
    /// Reads `RED_AUDIT_FSYNC` (case-insensitive): `every`/`strong`/`on`
    /// select `Every`, `off`/`none` select `Off`, and anything else,
    /// including an unset variable, selects `Periodic`.
    fn from_env() -> Self {
        let raw = std::env::var("RED_AUDIT_FSYNC").unwrap_or_default();
        let normalized = raw.to_ascii_lowercase();
        let setting = normalized.as_str();
        if ["every", "strong", "on"].contains(&setting) {
            Self::Every
        } else if ["off", "none"].contains(&setting) {
            Self::Off
        } else {
            Self::Periodic
        }
    }
}
665
/// Messages sent from `AuditLogger` to the background writer thread.
// The `Event` variant carries a whole `AuditEvent` by value, hence the
// clippy allowance for the size difference between variants.
#[allow(clippy::large_enum_variant)]
enum WriterMsg {
    /// Append this event to the log.
    Event(AuditEvent),
    /// Flush and sync the file, then signal completion on the enclosed sender.
    Flush(mpsc::Sender<()>),
    /// Flush, sync, and exit the writer loop.
    Shutdown,
}
676
/// Append-only audit log backed by a background writer thread, with a
/// synchronous fallback path when the thread cannot accept an event.
#[derive(Debug)]
pub struct AuditLogger {
    /// Path of the active audit log file.
    path: PathBuf,
    /// Sending half of the bounded channel to the writer thread; taken
    /// (left as `None`) when the logger is dropped.
    tx: Mutex<Option<mpsc::SyncSender<WriterMsg>>>,
    /// Serializes callers of the synchronous `write_direct` path.
    fallback_lock: Mutex<()>,
    /// Hex SHA-256 of the most recently written line, shared with the writer
    /// thread; embedded as `prev_hash` in the next line.
    last_hash: Arc<Mutex<Option<String>>>,
    /// Size in bytes at which the active file is rotated.
    max_bytes: u64,
    /// Flush/sync policy handed to the writer thread.
    fsync_mode: FsyncMode,
    /// Optional URL each written line is POSTed to.
    stream_url: Option<String>,
    /// Set to `true` just before the writer thread is spawned and to `false`
    /// when the writer loop exits.
    writer_alive: Arc<AtomicBool>,
    /// Count of events enqueued but not yet processed by the writer thread.
    pending: Arc<AtomicU64>,
    /// Join handle of the writer thread; joined on drop.
    handle: Mutex<Option<JoinHandle<()>>>,
}
701
702impl AuditLogger {
703 pub fn for_data_path(data_path: &Path) -> Self {
706 let parent = data_path.parent().unwrap_or_else(|| Path::new("."));
707 let path = parent.join(".audit.log");
708 Self::with_path(path)
709 }
710
711 pub fn for_destination(
718 dest: &crate::storage::layout::LogDestination,
719 fallback_data_path: &Path,
720 ) -> Self {
721 use crate::storage::layout::LogDestination;
722 match dest {
723 LogDestination::File(path) => Self::with_path(path.clone()),
724 LogDestination::Stderr => Self::for_data_path(fallback_data_path),
725 LogDestination::Syslog => {
726 tracing::warn!(
727 target: "reddb::audit",
728 "audit LogDestination::Syslog requested; sink not implemented, falling back to file next to data path"
729 );
730 Self::for_data_path(fallback_data_path)
731 }
732 }
733 }
734
735 pub fn with_path(path: PathBuf) -> Self {
737 let max_bytes = std::env::var("RED_AUDIT_MAX_BYTES")
738 .ok()
739 .and_then(|v| v.parse::<u64>().ok())
740 .unwrap_or(64 * 1024 * 1024);
741 let fsync_mode = FsyncMode::from_env();
742 let stream_url = std::env::var("RED_AUDIT_STREAM_URL")
743 .ok()
744 .filter(|s| !s.is_empty());
745 Self::with_settings(path, max_bytes, fsync_mode, stream_url)
746 }
747
    /// Creates a logger for `path` with an explicit rotation threshold,
    /// periodic fsync, and no streaming URL; the environment is not consulted.
    pub fn with_max_bytes(path: PathBuf, max_bytes: u64) -> Self {
        Self::with_settings(path, max_bytes, FsyncMode::Periodic, None)
    }
753
754 fn with_settings(
755 path: PathBuf,
756 max_bytes: u64,
757 fsync_mode: FsyncMode,
758 stream_url: Option<String>,
759 ) -> Self {
760 let writer_alive = Arc::new(AtomicBool::new(false));
761 let pending = Arc::new(AtomicU64::new(0));
762 let mut seed: Option<String> = None;
765 if let Ok(body) = std::fs::read_to_string(&path) {
766 if let Some(line) = body.lines().last() {
767 seed = Some(crate::utils::to_hex(&sha256::sha256(line.as_bytes())));
768 }
769 }
770 let last_hash = Arc::new(Mutex::new(seed));
771 let logger = Self {
772 path,
773 tx: Mutex::new(None),
774 fallback_lock: Mutex::new(()),
775 last_hash,
776 max_bytes,
777 fsync_mode,
778 stream_url,
779 writer_alive: Arc::clone(&writer_alive),
780 pending: Arc::clone(&pending),
781 handle: Mutex::new(None),
782 };
783 logger.start_writer_thread();
784 logger
785 }
786
    /// Returns the path of the active audit log file.
    pub fn path(&self) -> &Path {
        &self.path
    }
790
    /// Returns the rotation threshold in bytes.
    pub fn max_bytes(&self) -> u64 {
        self.max_bytes
    }
795
796 fn start_writer_thread(&self) {
797 let (tx, rx) = mpsc::sync_channel::<WriterMsg>(4096);
798 *self.tx.lock().unwrap_or_else(|e| e.into_inner()) = Some(tx);
799 let path = self.path.clone();
800 let max_bytes = self.max_bytes;
801 let fsync_mode = self.fsync_mode;
802 let stream_url = self.stream_url.clone();
803 let writer_alive = Arc::clone(&self.writer_alive);
804 let pending = Arc::clone(&self.pending);
805 let last_hash = Arc::clone(&self.last_hash);
806 writer_alive.store(true, Ordering::SeqCst);
807 let handle = thread::Builder::new()
808 .name("reddb-audit-writer".to_string())
809 .spawn(move || {
810 writer_loop(
811 rx,
812 path,
813 max_bytes,
814 fsync_mode,
815 stream_url,
816 writer_alive,
817 pending,
818 last_hash,
819 );
820 })
821 .expect("spawn audit writer thread");
822 *self.handle.lock().unwrap_or_else(|e| e.into_inner()) = Some(handle);
823 }
824
825 pub fn current_hash(&self) -> Option<String> {
827 self.last_hash
828 .lock()
829 .unwrap_or_else(|e| e.into_inner())
830 .clone()
831 }
832
833 pub fn wait_idle(&self, timeout: Duration) -> bool {
836 let deadline = Instant::now() + timeout;
837 let tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
838 if let Some(tx) = tx_guard.as_ref() {
839 let (back_tx, back_rx) = mpsc::channel();
840 if tx.send(WriterMsg::Flush(back_tx)).is_err() {
841 return false;
842 }
843 drop(tx_guard);
844 let remaining = deadline.saturating_duration_since(Instant::now());
845 return back_rx.recv_timeout(remaining).is_ok();
846 }
847 false
848 }
849
850 pub fn record(
853 &self,
854 action: &str,
855 principal: &str,
856 target: &str,
857 result: &str,
858 details: JsonValue,
859 ) {
860 let event = AuditEvent::from_legacy(action, principal, target, result, details);
861 self.record_event(event);
862 }
863
864 pub fn record_event(&self, event: AuditEvent) {
868 let tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
869 let recovered_event: AuditEvent;
870 if let Some(tx) = tx_guard.as_ref() {
871 self.pending.fetch_add(1, Ordering::SeqCst);
872 match tx.try_send(WriterMsg::Event(event)) {
873 Ok(()) => return,
874 Err(mpsc::TrySendError::Full(msg)) => {
875 self.pending.fetch_sub(1, Ordering::SeqCst);
876 tracing::warn!(
877 target: "reddb::audit",
878 "audit channel saturated; falling back to sync write"
879 );
880 recovered_event = match msg {
881 WriterMsg::Event(ev) => ev,
882 _ => return,
883 };
884 }
885 Err(mpsc::TrySendError::Disconnected(msg)) => {
886 self.pending.fetch_sub(1, Ordering::SeqCst);
887 recovered_event = match msg {
888 WriterMsg::Event(ev) => ev,
889 _ => return,
890 };
891 }
892 }
893 } else {
894 recovered_event = event;
895 }
896 drop(tx_guard);
897 self.write_direct(recovered_event);
898 }
899
900 fn write_direct(&self, event: AuditEvent) {
903 let _g = self.fallback_lock.lock().unwrap_or_else(|e| e.into_inner());
904 let prev = self
905 .last_hash
906 .lock()
907 .unwrap_or_else(|e| e.into_inner())
908 .clone();
909 let line = event.to_json_line(prev.as_deref());
910 if let Err(err) = append_line_with_rotation(&self.path, &line, self.max_bytes) {
911 tracing::warn!(
912 target: "reddb::audit",
913 error = %err,
914 path = %self.path.display(),
915 "direct audit append failed"
916 );
917 return;
918 }
919 let new_hash = crate::utils::to_hex(&sha256::sha256(line.as_bytes()));
920 if let Ok(mut g) = self.last_hash.lock() {
921 *g = Some(new_hash);
922 }
923 if let Some(url) = &self.stream_url {
924 stream_post(url, &line);
925 }
926 tracing::info!(target: "reddb::audit", "{line}");
927 }
928}
929
930impl Drop for AuditLogger {
931 fn drop(&mut self) {
932 let mut tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
933 if let Some(tx) = tx_guard.take() {
934 let _ = tx.send(WriterMsg::Shutdown);
935 }
936 drop(tx_guard);
937 if let Some(handle) = self.handle.lock().unwrap_or_else(|e| e.into_inner()).take() {
938 let _ = handle.join();
939 }
940 }
941}
942
943#[allow(clippy::too_many_arguments)]
948fn writer_loop(
949 rx: mpsc::Receiver<WriterMsg>,
950 path: PathBuf,
951 max_bytes: u64,
952 fsync_mode: FsyncMode,
953 stream_url: Option<String>,
954 writer_alive: Arc<AtomicBool>,
955 pending: Arc<AtomicU64>,
956 last_hash: Arc<Mutex<Option<String>>>,
957) {
958 if let Some(parent) = path.parent() {
959 if !parent.as_os_str().is_empty() {
960 let _ = std::fs::create_dir_all(parent);
961 }
962 }
963
964 let mut writer = match open_active(&path) {
965 Ok(w) => w,
966 Err(err) => {
967 tracing::error!(target: "reddb::audit", error = %err, "audit writer init failed");
968 writer_alive.store(false, Ordering::SeqCst);
969 return;
970 }
971 };
972 let mut bytes_written: u64 = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
976
977 let periodic_interval = Duration::from_millis(250);
978 let mut last_flush = Instant::now();
979 let mut buffered_since_fsync: u64 = 0;
980
981 loop {
982 let recv_timeout = match fsync_mode {
985 FsyncMode::Periodic => periodic_interval,
986 FsyncMode::Every | FsyncMode::Off => Duration::from_secs(1),
987 };
988 match rx.recv_timeout(recv_timeout) {
989 Ok(WriterMsg::Event(event)) => {
990 let prev = last_hash.lock().unwrap_or_else(|e| e.into_inner()).clone();
991 let line = event.to_json_line(prev.as_deref());
992
993 let line_bytes = line.len() as u64 + 1; if let Err(err) = write_line(&mut writer, &line) {
995 tracing::warn!(
996 target: "reddb::audit",
997 error = %err,
998 "audit write failed; reopening"
999 );
1000 if let Ok(w2) = open_active(&path) {
1001 writer = w2;
1002 let _ = write_line(&mut writer, &line);
1003 }
1004 }
1005 bytes_written = bytes_written.saturating_add(line_bytes);
1006 let new_hash = crate::utils::to_hex(&sha256::sha256(line.as_bytes()));
1007 if let Ok(mut g) = last_hash.lock() {
1008 *g = Some(new_hash);
1009 }
1010 if let Some(url) = &stream_url {
1011 stream_post(url, &line);
1012 }
1013 tracing::info!(target: "reddb::audit", "{line}");
1014 pending.fetch_sub(1, Ordering::SeqCst);
1015 buffered_since_fsync += 1;
1016
1017 match fsync_mode {
1018 FsyncMode::Every => {
1019 let _ = writer.flush();
1020 let _ = writer.get_ref().sync_data();
1021 buffered_since_fsync = 0;
1022 }
1023 FsyncMode::Periodic => {
1024 if last_flush.elapsed() >= periodic_interval {
1025 let _ = writer.flush();
1026 let _ = writer.get_ref().sync_data();
1027 last_flush = Instant::now();
1028 buffered_since_fsync = 0;
1029 }
1030 }
1031 FsyncMode::Off => {}
1032 }
1033
1034 if bytes_written >= max_bytes {
1037 let _ = writer.flush();
1038 let _ = writer.get_ref().sync_data();
1039 if let Err(err) = rotate(&path) {
1040 tracing::warn!(
1041 target: "reddb::audit",
1042 error = %err,
1043 "audit rotation failed; continuing in-place"
1044 );
1045 }
1046 match open_active(&path) {
1047 Ok(w2) => writer = w2,
1048 Err(err) => {
1049 tracing::error!(
1050 target: "reddb::audit",
1051 error = %err,
1052 "audit reopen failed after rotate"
1053 );
1054 break;
1055 }
1056 }
1057 last_flush = Instant::now();
1058 buffered_since_fsync = 0;
1059 bytes_written = 0;
1060 }
1061 }
1062 Ok(WriterMsg::Flush(ack)) => {
1063 let _ = writer.flush();
1064 let _ = writer.get_ref().sync_data();
1065 last_flush = Instant::now();
1066 buffered_since_fsync = 0;
1067 let _ = ack.send(());
1071 }
1072 Ok(WriterMsg::Shutdown) => {
1073 let _ = writer.flush();
1074 let _ = writer.get_ref().sync_data();
1075 break;
1076 }
1077 Err(mpsc::RecvTimeoutError::Timeout) => {
1078 if buffered_since_fsync > 0 {
1079 let _ = writer.flush();
1080 let _ = writer.get_ref().sync_data();
1081 last_flush = Instant::now();
1082 buffered_since_fsync = 0;
1083 }
1084 }
1085 Err(mpsc::RecvTimeoutError::Disconnected) => {
1086 let _ = writer.flush();
1087 let _ = writer.get_ref().sync_data();
1088 break;
1089 }
1090 }
1091 }
1092
1093 writer_alive.store(false, Ordering::SeqCst);
1094}
1095
/// Opens `path` for appending (creating it and any missing parent
/// directories) and wraps the file in a `BufWriter`.
///
/// # Errors
/// Returns the I/O error from creating the directories or opening the file.
fn open_active(path: &Path) -> std::io::Result<BufWriter<std::fs::File>> {
    match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => std::fs::create_dir_all(dir)?,
        _ => {}
    }
    let mut options = std::fs::OpenOptions::new();
    options.create(true).append(true);
    options.open(path).map(BufWriter::new)
}
1108
/// Writes `line` followed by a single `\n` into the buffered writer. The
/// data is buffered only; flushing is left to the caller.
fn write_line(writer: &mut BufWriter<std::fs::File>, line: &str) -> std::io::Result<()> {
    writer
        .write_all(line.as_bytes())
        .and_then(|()| writer.write_all(b"\n"))
}
1114
1115fn append_line_with_rotation(path: &Path, line: &str, max_bytes: u64) -> std::io::Result<()> {
1116 if let Some(parent) = path.parent() {
1117 if !parent.as_os_str().is_empty() {
1118 std::fs::create_dir_all(parent)?;
1119 }
1120 }
1121 let mut file = std::fs::OpenOptions::new()
1122 .create(true)
1123 .append(true)
1124 .open(path)?;
1125 file.write_all(line.as_bytes())?;
1126 file.write_all(b"\n")?;
1127 file.sync_data()?;
1128 drop(file);
1129 if let Ok(meta) = std::fs::metadata(path) {
1130 if meta.len() >= max_bytes {
1131 let _ = rotate(path);
1132 }
1133 }
1134 Ok(())
1135}
1136
1137fn rotate(active: &Path) -> std::io::Result<()> {
1145 let ts = crate::utils::now_unix_nanos();
1146 let stem = active
1147 .file_name()
1148 .and_then(|s| s.to_str())
1149 .unwrap_or(".audit.log");
1150 let parent = active.parent().unwrap_or_else(|| Path::new("."));
1151 let plain = parent.join(format!("{stem}.{ts}"));
1152 std::fs::rename(active, &plain)?;
1153 let raw = std::fs::read(&plain)?;
1154 let compressed = match zstd::bulk::compress(&raw, 3) {
1155 Ok(c) => c,
1156 Err(err) => {
1157 tracing::warn!(
1160 target: "reddb::audit",
1161 error = %err,
1162 "audit rotation: zstd compress failed; leaving plaintext"
1163 );
1164 return Ok(());
1165 }
1166 };
1167 let zst = parent.join(format!("{stem}.{ts}.zst"));
1168 let mut out = std::fs::File::create(&zst)?;
1169 out.write_all(&compressed)?;
1170 out.sync_data()?;
1171 drop(out);
1172 let _ = std::fs::remove_file(&plain);
1173 Ok(())
1174}
1175
1176fn stream_post(url: &str, line: &str) {
1181 let url = url.to_string();
1182 let line = line.to_string();
1183 let _ = thread::Builder::new()
1187 .name("reddb-audit-siem".to_string())
1188 .spawn(move || {
1189 let agent: ureq::Agent = ureq::Agent::config_builder()
1190 .timeout_connect(Some(Duration::from_secs(2)))
1191 .timeout_send_request(Some(Duration::from_secs(3)))
1192 .timeout_recv_response(Some(Duration::from_secs(3)))
1193 .http_status_as_error(false)
1194 .build()
1195 .into();
1196 let _ = agent
1197 .post(&url)
1198 .header("content-type", "application/x-ndjson")
1199 .send(line.as_bytes());
1200 });
1201}
1202
1203fn format_iso8601(ms_since_epoch: u64) -> String {
1208 let secs = ms_since_epoch / 1000;
1209 let ms = ms_since_epoch % 1000;
1210 let days = secs / 86_400;
1211 let rem = secs % 86_400;
1212 let (y, mo, d) = civil_from_days(days as i64);
1213 let h = rem / 3600;
1214 let mi = (rem % 3600) / 60;
1215 let s = rem % 60;
1216 format!(
1217 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
1218 y, mo, d, h, mi, s, ms
1219 )
1220}
1221
/// Converts a day count relative to 1970-01-01 into a proleptic Gregorian
/// `(year, month, day)` triple. Negative counts give dates before the epoch.
///
/// Internally the calendar is shifted to start on March 1st, so the leap day
/// is the last day of the shifted year, and is split into 400-year eras of
/// 146 097 days each.
fn civil_from_days(z: i64) -> (i64, u32, u32) {
    const DAYS_PER_ERA: i64 = 146_097;

    let shifted = z + 719_468;
    // Floor division that also works for negative day counts.
    let floor_numerator = if shifted >= 0 {
        shifted
    } else {
        shifted - (DAYS_PER_ERA - 1)
    };
    let era = floor_numerator / DAYS_PER_ERA;
    let day_of_era = (shifted - era * DAYS_PER_ERA) as u64;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month index counted from March (0 = March, 11 = February).
    let shifted_month = (5 * day_of_year + 2) / 153;
    let day = (day_of_year - (153 * shifted_month + 2) / 5 + 1) as u32;
    let month = (if shifted_month < 10 {
        shifted_month + 3
    } else {
        shifted_month - 9
    }) as u32;
    let march_based_year = (year_of_era as i64) + era * 400;
    // January and February belong to the next civil year.
    let year = if month <= 2 {
        march_based_year + 1
    } else {
        march_based_year
    };
    (year, month, day)
}
1234
1235#[cfg(test)]
1240mod tests {
1241 use super::*;
1242
    /// Creates a fresh directory under the system temp dir, named from
    /// `tag`, the process id, and the current nanosecond clock, and returns
    /// the path `data.rdb` inside it. The file itself is not created.
    fn temp_data_path(tag: &str) -> PathBuf {
        let mut p = std::env::temp_dir();
        p.push(format!(
            "reddb-audit-{}-{}-{}",
            tag,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        std::fs::create_dir_all(&p).unwrap();
        p.push("data.rdb");
        p
    }
1255
    /// Waits up to two seconds for the writer thread to acknowledge a flush
    /// and fails the test if it does not.
    fn drain(logger: &AuditLogger) {
        assert!(logger.wait_idle(Duration::from_secs(2)));
    }
1259
    /// A single legacy `record` call yields exactly one line, carrying the
    /// action and the mapped `success` outcome.
    #[test]
    fn record_writes_one_line_per_call() {
        let data = temp_data_path("one-line");
        let logger = AuditLogger::for_data_path(&data);
        logger.record(
            "admin/readonly",
            "operator",
            "instance",
            "ok",
            JsonValue::Null,
        );
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("\"action\":\"admin/readonly\""));
        assert!(lines[0].contains("\"outcome\":\"success\""));
    }
1278
    /// Two `record` calls produce two lines in the same file.
    #[test]
    fn record_appends_across_calls() {
        let data = temp_data_path("append");
        let logger = AuditLogger::for_data_path(&data);
        logger.record("admin/drain", "op", "instance", "ok", JsonValue::Null);
        logger.record("admin/shutdown", "op", "instance", "ok", JsonValue::Null);
        drain(&logger);
        let lines = std::fs::read_to_string(logger.path()).unwrap();
        assert_eq!(lines.lines().count(), 2);
    }
1289
    /// An event built with every optional field set serializes all of them.
    #[test]
    fn record_event_emits_full_schema() {
        let data = temp_data_path("schema");
        let logger = AuditLogger::for_data_path(&data);
        let mut detail = Map::new();
        detail.insert("ms".to_string(), JsonValue::Number(412.0));
        let ev = AuditEvent::builder("admin/shutdown")
            .principal("alice@acme")
            .source(AuditAuthSource::Session)
            .tenant("acme")
            .resource("instance")
            .outcome(Outcome::Success)
            .detail(JsonValue::Object(detail))
            .remote_addr("203.0.113.5")
            .correlation_id("req-42")
            .build();
        logger.record_event(ev);
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        assert!(body.contains("\"action\":\"admin/shutdown\""));
        assert!(body.contains("\"principal\":\"alice@acme\""));
        assert!(body.contains("\"tenant\":\"acme\""));
        assert!(body.contains("\"source\":\"session\""));
        assert!(body.contains("\"correlation_id\":\"req-42\""));
        assert!(body.contains("\"remote_addr\":\"203.0.113.5\""));
        assert!(body.contains("\"event_id\":\""));
        // The first line of a fresh file has no predecessor, so `prev_hash`
        // may legitimately be absent when there is only one line.
        assert!(body.contains("\"prev_hash\":") || body.lines().count() == 1);
    }
1318
    /// Each line's `prev_hash` equals the hex SHA-256 of the previous line;
    /// the first line has none.
    #[test]
    fn hash_chain_links_every_event() {
        let data = temp_data_path("chain");
        let logger = AuditLogger::for_data_path(&data);
        for i in 0..5 {
            logger.record_event(
                AuditEvent::builder(format!("test/event/{i}"))
                    .principal("tester")
                    .build(),
            );
        }
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 5);
        // Recompute the chain independently and compare at every step.
        let mut prev: Option<String> = None;
        for (idx, line) in lines.iter().enumerate() {
            let parsed: JsonValue = crate::json::from_str(line).unwrap();
            let stored_prev = parsed
                .get("prev_hash")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string());
            assert_eq!(stored_prev, prev, "line {idx} prev_hash mismatch");
            prev = Some(crate::utils::to_hex(&sha256::sha256(line.as_bytes())));
        }
    }
1345
    /// A legacy result starting with `err` maps to the `error` outcome and
    /// the original text is preserved under `result_text`.
    #[test]
    fn legacy_record_back_compat_maps_outcomes() {
        let data = temp_data_path("legacy");
        let logger = AuditLogger::for_data_path(&data);
        logger.record(
            "admin/restore",
            "operator",
            "instance",
            "err: disk full",
            JsonValue::Null,
        );
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        assert!(body.contains("\"outcome\":\"error\""));
        assert!(body.contains("\"result_text\":\"err: disk full\""));
    }
1362
// Pins `format_iso8601` against one fixed instant. The expected string falls
// on a leap day (Feb 29) and keeps the three-digit fractional part.
#[test]
fn iso8601_formats_known_epoch() {
    let epoch_value = 1_709_210_096_789;
    let rendered = format_iso8601(epoch_value);
    assert_eq!(rendered, "2024-02-29T12:34:56.789Z");
}
1370
// Pushes 30 padded events through a logger built with a 1024 max-bytes limit
// and expects at least one `.audit.log.*.zst` file beside the live log.
#[test]
fn rotation_at_threshold() {
    let data = temp_data_path("rotate");
    let log_dir = data.parent().unwrap().to_path_buf();
    let logger = AuditLogger::with_max_bytes(log_dir.join(".audit.log"), 1024);

    // Padding makes each row large enough that 30 of them exceed the limit.
    let filler = "lorem ipsum dolor sit amet consectetur padding padding padding";
    for i in 0..30 {
        let event = AuditEvent::builder(format!("test/rotate/{i}"))
            .principal("rotator")
            .detail(JsonValue::String(filler.to_string()))
            .build();
        logger.record_event(event);
    }
    // NOTE(review): `drain` presumably waits for queued events to reach disk — confirm.
    drain(&logger);

    // Scan the directory that holds the live log for rotated archives.
    let scan_dir = logger.path().parent().unwrap();
    let rotated: Vec<_> = std::fs::read_dir(scan_dir)
        .unwrap()
        .filter_map(Result::ok)
        .filter(|entry| {
            // Non-UTF-8 file names never match.
            matches!(
                entry.file_name().to_str(),
                Some(name) if name.starts_with(".audit.log.") && name.ends_with(".zst")
            )
        })
        .collect();
    assert!(
        !rotated.is_empty(),
        "expected at least one rotated .zst file"
    );
}
1404
// Serialises a built event with `to_json_line(None)` and checks that
// `parse_line` recovers the action, principal, tenant, outcome and source.
#[test]
fn parse_line_round_trips() {
    let builder = AuditEvent::builder("auth/login.ok")
        .principal("alice")
        .source(AuditAuthSource::Password)
        .tenant("acme")
        .outcome(Outcome::Success);
    let original = builder.build();

    let encoded = original.to_json_line(None);
    let decoded = AuditEvent::parse_line(&encoded).expect("round-trip parse");

    assert_eq!(decoded.action, "auth/login.ok");
    assert_eq!(decoded.principal.as_deref(), Some("alice"));
    assert_eq!(decoded.tenant.as_deref(), Some("acme"));
    assert_eq!(decoded.outcome, Outcome::Success);
    assert_eq!(decoded.source, AuditAuthSource::Password);
}
1421
// Two ids generated 2 ms apart must compare in creation order.
#[test]
fn event_id_is_lexicographically_sortable_by_time() {
    let gap = Duration::from_millis(2);

    let a = new_event_id();
    // Pause between the two calls so they do not happen back-to-back.
    std::thread::sleep(gap);
    let b = new_event_id();

    assert!(a < b, "event_id ordering broken: {a} >= {b}");
}
1429
// A field built from a string literal keeps its name and is stored as the
// `AuditValue::String` variant with the same text.
#[test]
fn audit_field_escaper_typed_string() {
    let field = AuditFieldEscaper::field("collection", "users");
    assert_eq!(field.name(), "collection");

    let stored = field.value();
    match stored {
        AuditValue::String(text) => assert_eq!(text, "users"),
        other => panic!("expected String, got {:?}", other),
    }
}
1443
// A `Vec<u8>` field value must appear in the JSON line as a base64 string;
// `DE AD BE EF` encodes to `3q2+7w==`.
#[test]
fn audit_field_escaper_bytes_emit_base64() {
    let raw: Vec<u8> = vec![0xDE, 0xAD, 0xBE, 0xEF];
    let blob_field = AuditFieldEscaper::field("blob", raw);

    let event = AuditEvent::builder("test/bytes").field(blob_field).build();
    let line = event.to_json_line(None);

    let has_base64 = line.contains("\"blob\":\"3q2+7w==\"");
    assert!(has_base64, "expected base64 emission: {line}");
}
1456
// Integer, boolean and null field values must be emitted as bare JSON
// scalars (no surrounding quotes).
#[test]
fn audit_field_escaper_number_bool_null() {
    let count_field = AuditFieldEscaper::field("count", 42i64);
    let ok_field = AuditFieldEscaper::field("ok", true);
    let missing_field = AuditFieldEscaper::field("missing", AuditValue::Null);

    let event = AuditEvent::builder("test/types")
        .field(count_field)
        .field(ok_field)
        .field(missing_field)
        .build();
    let rendered = event.to_json_line(None);

    assert!(rendered.contains("\"count\":42"));
    assert!(rendered.contains("\"ok\":true"));
    assert!(rendered.contains("\"missing\":null"));
}
1469
// Feeds a corpus of hostile strings (CRLF, NUL, quotes, nested JSON, low
// control bytes, backslashes, empty) through `AuditFieldEscaper::field` and
// checks, for each one, that the emitted row:
//   * contains no raw newline (one event == one JSONL row),
//   * parses as JSON, and
//   * yields the original payload back, unchanged, from `detail.user_input`.
// Every failure path names the corpus label and shows the emitted line.
#[test]
fn audit_field_escaper_adversarial_corpus_preserves_structure() {
    let cases: &[(&str, &str)] = &[
        ("crlf", "line1\r\nline2"),
        ("nul", "before\0after"),
        ("quote", "she said \"hi\""),
        ("semicolon", "a;b;c"),
        ("json_in_json", r#"{"injected":"yes"}"#),
        ("low_ctrl", "\x01\x02\x03\x07\x1f"),
        ("backslash", "C:\\path\\file"),
        ("mixed", "name=\"x\"\n\\path\t\x01end"),
        ("empty", ""),
    ];
    let mut survivors = 0usize;
    for (label, payload) in cases {
        let f = AuditFieldEscaper::field("user_input", *payload);
        let ev = AuditEvent::builder(format!("test/adv/{label}"))
            .principal("attacker")
            .field(f)
            .build();
        let line = ev.to_json_line(None);
        assert!(
            !line.contains('\n'),
            "{label}: embedded newline in JSONL row: {line:?}"
        );
        let parsed: JsonValue = crate::json::from_str(&line)
            .unwrap_or_else(|err| panic!("{label}: line did not parse: {err} :: {line:?}"));
        // Labelled panics: a bare unwrap/expect here would not say which
        // corpus entry lost its `detail` object or its string value.
        let detail = parsed
            .get("detail")
            .unwrap_or_else(|| panic!("{label}: detail missing from row: {line:?}"));
        let recovered = detail
            .get("user_input")
            .and_then(|v| v.as_str())
            .unwrap_or_else(|| {
                panic!("{label}: user_input missing or not a string: {line:?}")
            });
        assert_eq!(
            recovered, *payload,
            "{label}: round-trip mismatch: {recovered:?} != {payload:?}"
        );
        survivors += 1;
    }
    // Each iteration either panics or increments, so this only confirms the
    // loop visited every corpus entry.
    assert_eq!(
        survivors,
        cases.len(),
        "adversarial corpus survival rate: {survivors}/{}",
        cases.len()
    );
}
1525
// End-to-end variant of the escaping check: a single event whose field value
// contains a quote, CRLF, embedded JSON and a NUL must land on disk as exactly
// one row, and the value must read back unchanged from `detail.collection`.
#[test]
fn audit_emission_emits_one_line_per_call_through_guard() {
    let data_path = temp_data_path("guard-emission");
    let logger = AuditLogger::for_data_path(&data_path);

    let attacker = "users\";DROP\r\n{\"x\":1}\0";
    let hostile_field = AuditFieldEscaper::field("collection", attacker);
    let event = AuditEvent::builder("admin/scan")
        .principal("evil")
        .field(hostile_field)
        .build();
    logger.record_event(event);
    // NOTE(review): `drain` presumably waits for queued events to reach disk — confirm.
    drain(&logger);

    let written = std::fs::read_to_string(logger.path()).unwrap();
    let rows: Vec<&str> = written.lines().collect();
    assert_eq!(rows.len(), 1, "guard must emit exactly one JSONL row");

    let row: JsonValue = crate::json::from_str(rows[0]).unwrap();
    let detail = row.get("detail");
    let recovered = detail
        .and_then(|d| d.get("collection"))
        .and_then(|v| v.as_str())
        .unwrap();
    assert_eq!(recovered, attacker);
}
1551
// Builds one field and discards it; there are no runtime assertions, so this
// passes as long as construction compiles and does not panic.
// NOTE(review): the name suggests a structural guarantee about the value
// path — nothing in this body checks that; confirm where it is enforced.
#[test]
fn audit_field_escaper_no_format_macro_in_value_path() {
    drop(AuditFieldEscaper::field("name", "value"));
}
1564
// Passes three fields of different value types through the bulk `fields`
// builder method and checks each appears under `detail` with its JSON type.
#[test]
fn audit_field_escaper_chains_via_builder_fields() {
    let batch = [
        AuditFieldEscaper::field("a", "x"),
        AuditFieldEscaper::field("b", 7i64),
        AuditFieldEscaper::field("c", true),
    ];
    let event = AuditEvent::builder("test/multi").fields(batch).build();

    let rendered = event.to_json_line(None);
    let row: JsonValue = crate::json::from_str(&rendered).unwrap();
    let detail = row.get("detail").unwrap();

    let a = detail.get("a").and_then(|v| v.as_str());
    let b = detail.get("b").and_then(|v| v.as_i64());
    let c = detail.get("c").and_then(|v| v.as_bool());
    assert_eq!(a, Some("x"));
    assert_eq!(b, Some(7));
    assert_eq!(c, Some(true));
}
1581
1582 proptest::proptest! {
1583 #[test]
1595 fn prop_audit_field_round_trips_arbitrary_strings(
1596 payload in proptest::string::string_regex("[\\x00-\\x7f]{0,128}").unwrap()
1597 ) {
1598 let f = AuditFieldEscaper::field("p", payload.as_str());
1599 let ev = AuditEvent::builder("prop/test").field(f).build();
1600 let line = ev.to_json_line(None);
1601 proptest::prop_assert!(!line.contains('\n'));
1603 let parsed: JsonValue = crate::json::from_str(&line)
1604 .expect("emission must always parse");
1605 let recovered = parsed
1606 .get("detail")
1607 .and_then(|d| d.get("p"))
1608 .and_then(|v| v.as_str())
1609 .unwrap();
1610 proptest::prop_assert_eq!(recovered, payload.as_str());
1611 }
1612
1613 #[test]
1616 fn prop_audit_field_bytes_base64_round_trip(
1617 bytes in proptest::collection::vec(proptest::bits::u8::ANY, 0..64)
1618 ) {
1619 let f = AuditFieldEscaper::field("b", bytes.clone());
1620 let ev = AuditEvent::builder("prop/bytes").field(f).build();
1621 let line = ev.to_json_line(None);
1622 proptest::prop_assert!(!line.contains('\n'));
1623 let parsed: JsonValue = crate::json::from_str(&line).unwrap();
1624 let recovered_b64 = parsed
1625 .get("detail")
1626 .and_then(|d| d.get("b"))
1627 .and_then(|v| v.as_str())
1628 .unwrap()
1629 .to_string();
1630 proptest::prop_assert_eq!(recovered_b64, base64_encode(&bytes));
1631 }
1632 }
1633}