1use std::io::{BufWriter, Write};
38use std::path::{Path, PathBuf};
39use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
40use std::sync::{mpsc, Arc, Mutex};
41use std::thread::{self, JoinHandle};
42use std::time::{Duration, Instant};
43
44use crate::crypto::{os_random, sha256};
45use crate::json::{Map, Value as JsonValue};
46
/// Authentication mechanism attached to an audit event.
///
/// Every variant has a stable snake_case wire name (see
/// [`AuditAuthSource::as_str`]); that name is what is written to the
/// `source` field of a serialized event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditAuthSource {
    ApiKey,
    Session,
    Password,
    Oauth,
    ClientCert,
    Anonymous,
    System,
}

impl AuditAuthSource {
    /// Every variant, scanned by [`AuditAuthSource::parse`] for the reverse
    /// lookup. Must be extended whenever a variant is added.
    const ALL: [AuditAuthSource; 7] = [
        AuditAuthSource::ApiKey,
        AuditAuthSource::Session,
        AuditAuthSource::Password,
        AuditAuthSource::Oauth,
        AuditAuthSource::ClientCert,
        AuditAuthSource::Anonymous,
        AuditAuthSource::System,
    ];

    /// Returns the wire name of this source.
    pub fn as_str(self) -> &'static str {
        match self {
            AuditAuthSource::ApiKey => "api_key",
            AuditAuthSource::Session => "session",
            AuditAuthSource::Password => "password",
            AuditAuthSource::Oauth => "oauth",
            AuditAuthSource::ClientCert => "client_cert",
            AuditAuthSource::Anonymous => "anonymous",
            AuditAuthSource::System => "system",
        }
    }

    /// Inverse of [`AuditAuthSource::as_str`].
    ///
    /// The match is exact and case-sensitive; any other input yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
    }
}
92
/// How an audited action ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Success,
    Denied,
    Error,
}

impl Outcome {
    /// Returns the canonical wire name (`success`, `denied`, `error`).
    pub fn as_str(self) -> &'static str {
        match self {
            Outcome::Success => "success",
            Outcome::Denied => "denied",
            Outcome::Error => "error",
        }
    }

    /// Parses a canonical name or its short alias (`ok`, `deny`, `err`).
    ///
    /// Matching is exact and case-sensitive; anything else yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        match s {
            "success" | "ok" => Some(Outcome::Success),
            "denied" | "deny" => Some(Outcome::Denied),
            "error" | "err" => Some(Outcome::Error),
            _ => None,
        }
    }
}
119
/// One audit record: who did what, to which resource, and how it ended.
///
/// Serialized to a single JSON line by `AuditEvent::to_json_line` and read
/// back by `AuditEvent::parse_line`.
#[derive(Debug, Clone)]
pub struct AuditEvent {
    /// Event timestamp. The constructors in this file fill it from
    /// `crate::utils::now_unix_millis()`.
    pub ts: u128,
    /// Event identifier; the constructors in this file fill it from
    /// `new_event_id()`.
    pub event_id: String,
    /// Acting identity, if any. Left out of the JSON line when `None`.
    pub principal: Option<String>,
    /// Authentication source recorded for the event.
    pub source: AuditAuthSource,
    /// Tenant, if any. Left out of the JSON line when `None`.
    pub tenant: Option<String>,
    /// Name of the audited action.
    pub action: String,
    /// Target of the action, if any. Left out of the JSON line when `None`.
    pub resource: Option<String>,
    /// How the action ended.
    pub outcome: Outcome,
    /// Free-form payload. `JsonValue::Null` means "no detail" and is left out
    /// of the JSON line.
    pub detail: JsonValue,
    /// Remote address, if any. Left out of the JSON line when `None`.
    pub remote_addr: Option<String>,
    /// Correlation identifier, if any. Left out of the JSON line when `None`.
    pub correlation_id: Option<String>,
}
135
136impl AuditEvent {
137 pub fn builder(action: impl Into<String>) -> AuditEventBuilder {
140 AuditEventBuilder {
141 inner: AuditEvent {
142 ts: crate::utils::now_unix_millis() as u128,
143 event_id: new_event_id(),
144 principal: None,
145 source: AuditAuthSource::System,
146 tenant: None,
147 action: action.into(),
148 resource: None,
149 outcome: Outcome::Success,
150 detail: JsonValue::Null,
151 remote_addr: None,
152 correlation_id: None,
153 },
154 }
155 }
156
157 pub fn from_legacy(
161 action: &str,
162 principal: &str,
163 target: &str,
164 result: &str,
165 details: JsonValue,
166 ) -> Self {
167 let outcome = if result == "ok" {
168 Outcome::Success
169 } else if result.starts_with("err") {
170 Outcome::Error
171 } else if result.starts_with("denied") || result.starts_with("deny") {
172 Outcome::Denied
173 } else {
174 Outcome::Success
175 };
176 let mut detail = details;
177 if !result.is_empty() && result != "ok" {
178 let mut obj = match detail {
181 JsonValue::Object(map) => map,
182 JsonValue::Null => Map::new(),
183 other => {
184 let mut m = Map::new();
185 m.insert("legacy".to_string(), other);
186 m
187 }
188 };
189 obj.entry("result_text".to_string())
190 .or_insert(JsonValue::String(result.to_string()));
191 detail = JsonValue::Object(obj);
192 }
193 Self {
194 ts: crate::utils::now_unix_millis() as u128,
195 event_id: new_event_id(),
196 principal: if principal.is_empty() || principal == "system" {
197 None
198 } else {
199 Some(principal.to_string())
200 },
201 source: if principal == "system" {
202 AuditAuthSource::System
203 } else if principal.is_empty() {
204 AuditAuthSource::Anonymous
205 } else {
206 AuditAuthSource::Password
207 },
208 tenant: None,
209 action: action.to_string(),
210 resource: if target.is_empty() {
211 None
212 } else {
213 Some(target.to_string())
214 },
215 outcome,
216 detail,
217 remote_addr: None,
218 correlation_id: None,
219 }
220 }
221
222 pub fn to_json_line(&self, prev_hash: Option<&str>) -> String {
225 let mut object = Map::new();
226 object.insert("ts".to_string(), JsonValue::Number(self.ts as f64));
227 object.insert(
228 "ts_iso".to_string(),
229 JsonValue::String(format_iso8601(self.ts as u64)),
230 );
231 object.insert(
232 "event_id".to_string(),
233 JsonValue::String(self.event_id.clone()),
234 );
235 if let Some(p) = &self.principal {
236 object.insert("principal".to_string(), JsonValue::String(p.clone()));
237 }
238 object.insert(
239 "source".to_string(),
240 JsonValue::String(self.source.as_str().to_string()),
241 );
242 if let Some(t) = &self.tenant {
243 object.insert("tenant".to_string(), JsonValue::String(t.clone()));
244 }
245 object.insert("action".to_string(), JsonValue::String(self.action.clone()));
246 if let Some(r) = &self.resource {
247 object.insert("resource".to_string(), JsonValue::String(r.clone()));
248 }
249 object.insert(
250 "outcome".to_string(),
251 JsonValue::String(self.outcome.as_str().to_string()),
252 );
253 if !matches!(self.detail, JsonValue::Null) {
254 object.insert("detail".to_string(), self.detail.clone());
255 }
256 if let Some(ip) = &self.remote_addr {
257 object.insert("remote_addr".to_string(), JsonValue::String(ip.clone()));
258 }
259 if let Some(cid) = &self.correlation_id {
260 object.insert("correlation_id".to_string(), JsonValue::String(cid.clone()));
261 }
262 if let Some(h) = prev_hash {
263 object.insert("prev_hash".to_string(), JsonValue::String(h.to_string()));
264 }
265 JsonValue::Object(object).to_string_compact()
266 }
267
268 pub fn parse_line(line: &str) -> Option<Self> {
272 let v: JsonValue = crate::json::from_str(line).ok()?;
273 let action = v.get("action")?.as_str()?.to_string();
274 let outcome_s = v.get("outcome").and_then(|n| n.as_str()).unwrap_or("");
275 let outcome = Outcome::parse(outcome_s)?;
276 let ts = v.get("ts").and_then(|n| n.as_f64()).unwrap_or(0.0) as u128;
277 let event_id = v
278 .get("event_id")
279 .and_then(|n| n.as_str())
280 .unwrap_or("")
281 .to_string();
282 let source = v
283 .get("source")
284 .and_then(|n| n.as_str())
285 .and_then(AuditAuthSource::parse)
286 .unwrap_or(AuditAuthSource::System);
287 Some(Self {
288 ts,
289 event_id,
290 principal: v
291 .get("principal")
292 .and_then(|n| n.as_str())
293 .map(|s| s.to_string()),
294 source,
295 tenant: v
296 .get("tenant")
297 .and_then(|n| n.as_str())
298 .map(|s| s.to_string()),
299 action,
300 resource: v
301 .get("resource")
302 .and_then(|n| n.as_str())
303 .map(|s| s.to_string()),
304 outcome,
305 detail: v.get("detail").cloned().unwrap_or(JsonValue::Null),
306 remote_addr: v
307 .get("remote_addr")
308 .and_then(|n| n.as_str())
309 .map(|s| s.to_string()),
310 correlation_id: v
311 .get("correlation_id")
312 .and_then(|n| n.as_str())
313 .map(|s| s.to_string()),
314 })
315 }
316}
317
318pub struct AuditEventBuilder {
320 inner: AuditEvent,
321}
322
323impl AuditEventBuilder {
324 pub fn principal(mut self, principal: impl Into<String>) -> Self {
325 self.inner.principal = Some(principal.into());
326 self
327 }
328
329 pub fn principal_opt(mut self, principal: Option<String>) -> Self {
330 self.inner.principal = principal;
331 self
332 }
333
334 pub fn source(mut self, source: AuditAuthSource) -> Self {
335 self.inner.source = source;
336 self
337 }
338
339 pub fn tenant(mut self, tenant: impl Into<String>) -> Self {
340 self.inner.tenant = Some(tenant.into());
341 self
342 }
343
344 pub fn resource(mut self, resource: impl Into<String>) -> Self {
345 self.inner.resource = Some(resource.into());
346 self
347 }
348
349 pub fn outcome(mut self, outcome: Outcome) -> Self {
350 self.inner.outcome = outcome;
351 self
352 }
353
354 pub fn detail(mut self, detail: JsonValue) -> Self {
355 self.inner.detail = detail;
356 self
357 }
358
359 pub fn field(mut self, field: AuditField) -> Self {
363 let mut obj = match std::mem::replace(&mut self.inner.detail, JsonValue::Null) {
364 JsonValue::Object(map) => map,
365 JsonValue::Null => Map::new(),
366 other => {
367 let mut m = Map::new();
370 m.insert("legacy_detail".to_string(), other);
371 m
372 }
373 };
374 obj.insert(field.name.to_string(), field.value.into_json_value());
375 self.inner.detail = JsonValue::Object(obj);
376 self
377 }
378
379 pub fn fields(mut self, fields: impl IntoIterator<Item = AuditField>) -> Self {
381 for f in fields {
382 self = self.field(f);
383 }
384 self
385 }
386
387 pub fn remote_addr(mut self, addr: impl Into<String>) -> Self {
388 self.inner.remote_addr = Some(addr.into());
389 self
390 }
391
392 pub fn correlation_id(mut self, cid: impl Into<String>) -> Self {
393 self.inner.correlation_id = Some(cid.into());
394 self
395 }
396
397 pub fn build(self) -> AuditEvent {
398 self.inner
399 }
400}
401
402#[derive(Debug, Clone)]
416pub enum AuditValue {
417 String(String),
418 Bytes(Vec<u8>),
422 Number(i64),
423 Bool(bool),
424 Null,
425}
426
427impl AuditValue {
428 fn into_json_value(self) -> JsonValue {
429 match self {
430 AuditValue::String(s) => JsonValue::String(s),
431 AuditValue::Bytes(bytes) => JsonValue::String(base64_encode(&bytes)),
432 AuditValue::Number(n) => JsonValue::Number(n as f64),
433 AuditValue::Bool(b) => JsonValue::Bool(b),
434 AuditValue::Null => JsonValue::Null,
435 }
436 }
437}
438
439impl From<String> for AuditValue {
440 fn from(value: String) -> Self {
441 AuditValue::String(value)
442 }
443}
444
445impl From<&str> for AuditValue {
446 fn from(value: &str) -> Self {
447 AuditValue::String(value.to_string())
448 }
449}
450
451impl From<&String> for AuditValue {
452 fn from(value: &String) -> Self {
453 AuditValue::String(value.clone())
454 }
455}
456
457impl From<Vec<u8>> for AuditValue {
458 fn from(value: Vec<u8>) -> Self {
459 AuditValue::Bytes(value)
460 }
461}
462
463impl From<&[u8]> for AuditValue {
464 fn from(value: &[u8]) -> Self {
465 AuditValue::Bytes(value.to_vec())
466 }
467}
468
469impl From<i64> for AuditValue {
470 fn from(value: i64) -> Self {
471 AuditValue::Number(value)
472 }
473}
474
475impl From<u64> for AuditValue {
476 fn from(value: u64) -> Self {
477 AuditValue::Number(if value > i64::MAX as u64 {
480 i64::MAX
481 } else {
482 value as i64
483 })
484 }
485}
486
487impl From<u32> for AuditValue {
488 fn from(value: u32) -> Self {
489 AuditValue::Number(value as i64)
490 }
491}
492
493impl From<usize> for AuditValue {
494 fn from(value: usize) -> Self {
495 AuditValue::Number(if value > i64::MAX as usize {
496 i64::MAX
497 } else {
498 value as i64
499 })
500 }
501}
502
503impl From<bool> for AuditValue {
504 fn from(value: bool) -> Self {
505 AuditValue::Bool(value)
506 }
507}
508
509impl<T: Into<AuditValue>> From<Option<T>> for AuditValue {
510 fn from(value: Option<T>) -> Self {
511 match value {
512 Some(v) => v.into(),
513 None => AuditValue::Null,
514 }
515 }
516}
517
/// A named, typed detail field for an audit event.
///
/// Both members are private; within this file instances are created through
/// `AuditFieldEscaper::field` and consumed by `AuditEventBuilder::field`.
#[derive(Debug, Clone)]
pub struct AuditField {
    /// Key under which the value is stored in the event's detail object.
    name: &'static str,
    /// The typed value.
    value: AuditValue,
}

impl AuditField {
    /// Returns the field name.
    pub fn name(&self) -> &'static str {
        self.name
    }
    /// Returns a reference to the field value.
    pub fn value(&self) -> &AuditValue {
        &self.value
    }
}
536
537pub struct AuditFieldEscaper;
553
554impl AuditFieldEscaper {
555 pub fn field(name: &'static str, value: impl Into<AuditValue>) -> AuditField {
560 AuditField {
561 name,
562 value: value.into(),
563 }
564 }
565}
566
/// Encodes `input` as standard-alphabet base64 with `=` padding.
fn base64_encode(input: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Picks the `slot`-th (0 = most significant) 6-bit group out of a 24-bit
    // word and maps it to its alphabet character.
    let sextet = |word: u32, slot: u32| ALPHABET[((word >> (18 - 6 * slot)) & 0x3f) as usize] as char;

    let mut encoded = String::with_capacity(input.len().div_ceil(3) * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into the top of a 24-bit word; missing
        // trailing bytes stay zero.
        let word = chunk
            .iter()
            .enumerate()
            .fold(0u32, |acc, (pos, &byte)| {
                acc | (u32::from(byte) << (16 - 8 * pos as u32))
            });
        // n input bytes produce n + 1 significant characters; the rest of
        // the four-character group is padding.
        for slot in 0..4u32 {
            if slot as usize <= chunk.len() {
                encoded.push(sextet(word, slot));
            } else {
                encoded.push('=');
            }
        }
    }
    encoded
}
604
605fn new_event_id() -> String {
612 const ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHJKMNPQRSTVWXYZ";
613 let now_ms = crate::utils::now_unix_millis();
614 let mut rand_bytes = [0u8; 10];
615 let _ = os_random::fill_bytes(&mut rand_bytes);
616
617 let mut out = String::with_capacity(26);
618 for i in (0..10).rev() {
620 let shift = (i as u32) * 5;
621 let idx = ((now_ms >> shift) & 0x1f) as usize;
622 out.push(ALPHABET[idx] as char);
623 }
624 let mut acc: u128 = 0;
626 for &b in &rand_bytes {
627 acc = (acc << 8) | (b as u128);
628 }
629 for i in (0..16).rev() {
630 let shift = (i as u32) * 5;
631 let idx = ((acc >> shift) & 0x1f) as usize;
632 out.push(ALPHABET[idx] as char);
633 }
634 out
635}
636
/// When the writer thread flushes and fsyncs the audit file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FsyncMode {
    /// No flush or fsync tied to individual events.
    Off,
    /// Flush and fsync after every event.
    Every,
    /// Flush and fsync at most once per periodic interval.
    Periodic,
}

impl FsyncMode {
    /// Reads the mode from the `RED_AUDIT_FSYNC` environment variable.
    ///
    /// Case-insensitive: `every`, `strong` and `on` select `Every`; `off`
    /// and `none` select `Off`; anything else, including an unset or
    /// non-Unicode variable, selects `Periodic`.
    fn from_env() -> Self {
        let raw = std::env::var("RED_AUDIT_FSYNC").unwrap_or_default();
        let is_one_of = |names: &[&str]| names.iter().any(|name| raw.eq_ignore_ascii_case(name));

        if is_one_of(&["every", "strong", "on"]) {
            Self::Every
        } else if is_one_of(&["off", "none"]) {
            Self::Off
        } else {
            Self::Periodic
        }
    }
}
665
/// Messages sent from `AuditLogger` to the writer thread.
// The `Event` variant is much larger than the others; the lint is silenced
// rather than boxing the event.
#[allow(clippy::large_enum_variant)]
enum WriterMsg {
    /// Append this event to the log.
    Event(AuditEvent),
    /// Flush and fsync, then acknowledge on the enclosed sender.
    Flush(mpsc::Sender<()>),
    /// Flush, fsync and exit the writer loop.
    Shutdown,
}
676
/// Append-only audit log backed by a dedicated writer thread.
///
/// Events are queued on a bounded channel and written by the writer thread;
/// when the channel is full or unavailable, `record_event` writes the line
/// synchronously instead. Each line carries the hex SHA-256 of the previous
/// line as `prev_hash`.
#[derive(Debug)]
pub struct AuditLogger {
    /// Path of the active audit file.
    path: PathBuf,
    /// Sending half of the writer channel; taken (set to `None`) on drop.
    tx: Mutex<Option<mpsc::SyncSender<WriterMsg>>>,
    /// Serialises synchronous fallback writes (`write_direct`).
    fallback_lock: Mutex<()>,
    /// Hex SHA-256 of the most recently written line, shared with the writer
    /// thread. Seeded from the last line of an existing file, if any.
    last_hash: Arc<Mutex<Option<String>>>,
    /// Size threshold, in bytes, at which the active file is rotated.
    max_bytes: u64,
    /// Flush/fsync policy used by the writer thread.
    fsync_mode: FsyncMode,
    /// Optional URL every written line is also POSTed to.
    stream_url: Option<String>,
    /// Set when the writer thread is started and cleared when its loop exits.
    writer_alive: Arc<AtomicBool>,
    /// Events queued on the channel that the writer has not yet processed.
    pending: Arc<AtomicU64>,
    /// Join handle of the writer thread; taken and joined on drop.
    handle: Mutex<Option<JoinHandle<()>>>,
}
701
702impl AuditLogger {
703 pub fn for_data_path(data_path: &Path) -> Self {
706 Self::with_path(reddb_file::layout::legacy_audit_log_path(data_path))
707 }
708
709 pub fn for_destination(
716 dest: &crate::storage::layout::LogDestination,
717 fallback_data_path: &Path,
718 ) -> Self {
719 use crate::storage::layout::LogDestination;
720 match dest {
721 LogDestination::File(path) => Self::with_path(path.clone()),
722 LogDestination::Stderr => Self::for_data_path(fallback_data_path),
723 LogDestination::Syslog => {
724 tracing::warn!(
725 target: "reddb::audit",
726 "audit LogDestination::Syslog requested; sink not implemented, falling back to file next to data path"
727 );
728 Self::for_data_path(fallback_data_path)
729 }
730 }
731 }
732
733 pub fn with_path(path: PathBuf) -> Self {
735 let max_bytes = std::env::var("RED_AUDIT_MAX_BYTES")
736 .ok()
737 .and_then(|v| v.parse::<u64>().ok())
738 .unwrap_or(64 * 1024 * 1024);
739 let fsync_mode = FsyncMode::from_env();
740 let stream_url = std::env::var("RED_AUDIT_STREAM_URL")
741 .ok()
742 .filter(|s| !s.is_empty());
743 Self::with_settings(path, max_bytes, fsync_mode, stream_url)
744 }
745
746 pub fn with_max_bytes(path: PathBuf, max_bytes: u64) -> Self {
749 Self::with_settings(path, max_bytes, FsyncMode::Periodic, None)
750 }
751
752 fn with_settings(
753 path: PathBuf,
754 max_bytes: u64,
755 fsync_mode: FsyncMode,
756 stream_url: Option<String>,
757 ) -> Self {
758 let writer_alive = Arc::new(AtomicBool::new(false));
759 let pending = Arc::new(AtomicU64::new(0));
760 let mut seed: Option<String> = None;
763 if let Ok(body) = std::fs::read_to_string(&path) {
764 if let Some(line) = body.lines().last() {
765 seed = Some(crate::utils::to_hex(&sha256::sha256(line.as_bytes())));
766 }
767 }
768 let last_hash = Arc::new(Mutex::new(seed));
769 let logger = Self {
770 path,
771 tx: Mutex::new(None),
772 fallback_lock: Mutex::new(()),
773 last_hash,
774 max_bytes,
775 fsync_mode,
776 stream_url,
777 writer_alive: Arc::clone(&writer_alive),
778 pending: Arc::clone(&pending),
779 handle: Mutex::new(None),
780 };
781 logger.start_writer_thread();
782 logger
783 }
784
785 pub fn path(&self) -> &Path {
786 &self.path
787 }
788
    /// Returns the rotation threshold in bytes.
    pub fn max_bytes(&self) -> u64 {
        self.max_bytes
    }
793
794 fn start_writer_thread(&self) {
795 let (tx, rx) = mpsc::sync_channel::<WriterMsg>(4096);
796 *self.tx.lock().unwrap_or_else(|e| e.into_inner()) = Some(tx);
797 let path = self.path.clone();
798 let max_bytes = self.max_bytes;
799 let fsync_mode = self.fsync_mode;
800 let stream_url = self.stream_url.clone();
801 let writer_alive = Arc::clone(&self.writer_alive);
802 let pending = Arc::clone(&self.pending);
803 let last_hash = Arc::clone(&self.last_hash);
804 writer_alive.store(true, Ordering::SeqCst);
805 let handle = thread::Builder::new()
806 .name("reddb-audit-writer".to_string())
807 .spawn(move || {
808 writer_loop(
809 rx,
810 path,
811 max_bytes,
812 fsync_mode,
813 stream_url,
814 writer_alive,
815 pending,
816 last_hash,
817 );
818 })
819 .expect("spawn audit writer thread");
820 *self.handle.lock().unwrap_or_else(|e| e.into_inner()) = Some(handle);
821 }
822
823 pub fn current_hash(&self) -> Option<String> {
825 self.last_hash
826 .lock()
827 .unwrap_or_else(|e| e.into_inner())
828 .clone()
829 }
830
831 pub fn wait_idle(&self, timeout: Duration) -> bool {
834 let deadline = Instant::now() + timeout;
835 let tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
836 if let Some(tx) = tx_guard.as_ref() {
837 let (back_tx, back_rx) = mpsc::channel();
838 if tx.send(WriterMsg::Flush(back_tx)).is_err() {
839 return false;
840 }
841 drop(tx_guard);
842 let remaining = deadline.saturating_duration_since(Instant::now());
843 return back_rx.recv_timeout(remaining).is_ok();
844 }
845 false
846 }
847
848 pub fn record(
851 &self,
852 action: &str,
853 principal: &str,
854 target: &str,
855 result: &str,
856 details: JsonValue,
857 ) {
858 let event = AuditEvent::from_legacy(action, principal, target, result, details);
859 self.record_event(event);
860 }
861
862 pub fn record_event(&self, event: AuditEvent) {
866 let tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
867 let recovered_event: AuditEvent;
868 if let Some(tx) = tx_guard.as_ref() {
869 self.pending.fetch_add(1, Ordering::SeqCst);
870 match tx.try_send(WriterMsg::Event(event)) {
871 Ok(()) => return,
872 Err(mpsc::TrySendError::Full(msg)) => {
873 self.pending.fetch_sub(1, Ordering::SeqCst);
874 tracing::warn!(
875 target: "reddb::audit",
876 "audit channel saturated; falling back to sync write"
877 );
878 recovered_event = match msg {
879 WriterMsg::Event(ev) => ev,
880 _ => return,
881 };
882 }
883 Err(mpsc::TrySendError::Disconnected(msg)) => {
884 self.pending.fetch_sub(1, Ordering::SeqCst);
885 recovered_event = match msg {
886 WriterMsg::Event(ev) => ev,
887 _ => return,
888 };
889 }
890 }
891 } else {
892 recovered_event = event;
893 }
894 drop(tx_guard);
895 self.write_direct(recovered_event);
896 }
897
898 fn write_direct(&self, event: AuditEvent) {
901 let _g = self.fallback_lock.lock().unwrap_or_else(|e| e.into_inner());
902 let prev = self
903 .last_hash
904 .lock()
905 .unwrap_or_else(|e| e.into_inner())
906 .clone();
907 let line = event.to_json_line(prev.as_deref());
908 if let Err(err) = append_line_with_rotation(&self.path, &line, self.max_bytes) {
909 tracing::warn!(
910 target: "reddb::audit",
911 error = %err,
912 path = %self.path.display(),
913 "direct audit append failed"
914 );
915 return;
916 }
917 let new_hash = crate::utils::to_hex(&sha256::sha256(line.as_bytes()));
918 if let Ok(mut g) = self.last_hash.lock() {
919 *g = Some(new_hash);
920 }
921 if let Some(url) = &self.stream_url {
922 stream_post(url, &line);
923 }
924 tracing::info!(target: "reddb::audit", "{line}");
925 }
926}
927
928impl Drop for AuditLogger {
929 fn drop(&mut self) {
930 let mut tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
931 if let Some(tx) = tx_guard.take() {
932 let _ = tx.send(WriterMsg::Shutdown);
933 }
934 drop(tx_guard);
935 if let Some(handle) = self.handle.lock().unwrap_or_else(|e| e.into_inner()).take() {
936 let _ = handle.join();
937 }
938 }
939}
940
941#[allow(clippy::too_many_arguments)]
946fn writer_loop(
947 rx: mpsc::Receiver<WriterMsg>,
948 path: PathBuf,
949 max_bytes: u64,
950 fsync_mode: FsyncMode,
951 stream_url: Option<String>,
952 writer_alive: Arc<AtomicBool>,
953 pending: Arc<AtomicU64>,
954 last_hash: Arc<Mutex<Option<String>>>,
955) {
956 if let Some(parent) = path.parent() {
957 if !parent.as_os_str().is_empty() {
958 let _ = std::fs::create_dir_all(parent);
959 }
960 }
961
962 let mut writer = match open_active(&path) {
963 Ok(w) => w,
964 Err(err) => {
965 tracing::error!(target: "reddb::audit", error = %err, "audit writer init failed");
966 writer_alive.store(false, Ordering::SeqCst);
967 return;
968 }
969 };
970 let mut bytes_written: u64 = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
974
975 let periodic_interval = Duration::from_millis(250);
976 let mut last_flush = Instant::now();
977 let mut buffered_since_fsync: u64 = 0;
978
979 loop {
980 let recv_timeout = match fsync_mode {
983 FsyncMode::Periodic => periodic_interval,
984 FsyncMode::Every | FsyncMode::Off => Duration::from_secs(1),
985 };
986 match rx.recv_timeout(recv_timeout) {
987 Ok(WriterMsg::Event(event)) => {
988 let prev = last_hash.lock().unwrap_or_else(|e| e.into_inner()).clone();
989 let line = event.to_json_line(prev.as_deref());
990
991 let line_bytes = line.len() as u64 + 1; if let Err(err) = write_line(&mut writer, &line) {
993 tracing::warn!(
994 target: "reddb::audit",
995 error = %err,
996 "audit write failed; reopening"
997 );
998 if let Ok(w2) = open_active(&path) {
999 writer = w2;
1000 let _ = write_line(&mut writer, &line);
1001 }
1002 }
1003 bytes_written = bytes_written.saturating_add(line_bytes);
1004 let new_hash = crate::utils::to_hex(&sha256::sha256(line.as_bytes()));
1005 if let Ok(mut g) = last_hash.lock() {
1006 *g = Some(new_hash);
1007 }
1008 if let Some(url) = &stream_url {
1009 stream_post(url, &line);
1010 }
1011 tracing::info!(target: "reddb::audit", "{line}");
1012 pending.fetch_sub(1, Ordering::SeqCst);
1013 buffered_since_fsync += 1;
1014
1015 match fsync_mode {
1016 FsyncMode::Every => {
1017 let _ = writer.flush();
1018 let _ = writer.get_ref().sync_data();
1019 buffered_since_fsync = 0;
1020 }
1021 FsyncMode::Periodic => {
1022 if last_flush.elapsed() >= periodic_interval {
1023 let _ = writer.flush();
1024 let _ = writer.get_ref().sync_data();
1025 last_flush = Instant::now();
1026 buffered_since_fsync = 0;
1027 }
1028 }
1029 FsyncMode::Off => {}
1030 }
1031
1032 if bytes_written >= max_bytes {
1035 let _ = writer.flush();
1036 let _ = writer.get_ref().sync_data();
1037 if let Err(err) = rotate(&path) {
1038 tracing::warn!(
1039 target: "reddb::audit",
1040 error = %err,
1041 "audit rotation failed; continuing in-place"
1042 );
1043 }
1044 match open_active(&path) {
1045 Ok(w2) => writer = w2,
1046 Err(err) => {
1047 tracing::error!(
1048 target: "reddb::audit",
1049 error = %err,
1050 "audit reopen failed after rotate"
1051 );
1052 break;
1053 }
1054 }
1055 last_flush = Instant::now();
1056 buffered_since_fsync = 0;
1057 bytes_written = 0;
1058 }
1059 }
1060 Ok(WriterMsg::Flush(ack)) => {
1061 let _ = writer.flush();
1062 let _ = writer.get_ref().sync_data();
1063 last_flush = Instant::now();
1064 buffered_since_fsync = 0;
1065 let _ = ack.send(());
1069 }
1070 Ok(WriterMsg::Shutdown) => {
1071 let _ = writer.flush();
1072 let _ = writer.get_ref().sync_data();
1073 break;
1074 }
1075 Err(mpsc::RecvTimeoutError::Timeout) => {
1076 if buffered_since_fsync > 0 {
1077 let _ = writer.flush();
1078 let _ = writer.get_ref().sync_data();
1079 last_flush = Instant::now();
1080 buffered_since_fsync = 0;
1081 }
1082 }
1083 Err(mpsc::RecvTimeoutError::Disconnected) => {
1084 let _ = writer.flush();
1085 let _ = writer.get_ref().sync_data();
1086 break;
1087 }
1088 }
1089 }
1090
1091 writer_alive.store(false, Ordering::SeqCst);
1092}
1093
/// Opens `path` for appending behind a `BufWriter`, creating the file and
/// any missing parent directories first.
///
/// # Errors
/// Returns the I/O error from directory creation or from opening the file.
fn open_active(path: &Path) -> std::io::Result<BufWriter<std::fs::File>> {
    match path.parent() {
        // A bare file name has an empty parent; nothing to create then.
        Some(dir) if !dir.as_os_str().is_empty() => std::fs::create_dir_all(dir)?,
        _ => {}
    }
    let mut options = std::fs::OpenOptions::new();
    options.create(true).append(true);
    options.open(path).map(BufWriter::new)
}
1106
/// Writes `line` followed by a single `\n` to `writer`. Does not flush.
///
/// # Errors
/// Returns the first I/O error; if writing the line fails, the newline is
/// not attempted.
fn write_line(writer: &mut BufWriter<std::fs::File>, line: &str) -> std::io::Result<()> {
    writer
        .write_all(line.as_bytes())
        .and_then(|()| writer.write_all(b"\n"))
}
1112
1113fn append_line_with_rotation(path: &Path, line: &str, max_bytes: u64) -> std::io::Result<()> {
1114 if let Some(parent) = path.parent() {
1115 if !parent.as_os_str().is_empty() {
1116 std::fs::create_dir_all(parent)?;
1117 }
1118 }
1119 let mut file = std::fs::OpenOptions::new()
1120 .create(true)
1121 .append(true)
1122 .open(path)?;
1123 file.write_all(line.as_bytes())?;
1124 file.write_all(b"\n")?;
1125 file.sync_data()?;
1126 drop(file);
1127 if let Ok(meta) = std::fs::metadata(path) {
1128 if meta.len() >= max_bytes {
1129 let _ = rotate(path);
1130 }
1131 }
1132 Ok(())
1133}
1134
1135fn rotate(active: &Path) -> std::io::Result<()> {
1143 let ts = crate::utils::now_unix_nanos();
1144 let rotation = reddb_file::rotate_audit_log(active, ts)?;
1145 if let Some(err) = rotation.compression_error {
1146 tracing::warn!(
1147 target: "reddb::audit",
1148 error = %err,
1149 "audit rotation: zstd compress failed; leaving plaintext"
1150 );
1151 }
1152 Ok(())
1153}
1154
1155fn stream_post(url: &str, line: &str) {
1160 let url = url.to_string();
1161 let line = line.to_string();
1162 let _ = thread::Builder::new()
1166 .name("reddb-audit-siem".to_string())
1167 .spawn(move || {
1168 let agent: ureq::Agent = ureq::Agent::config_builder()
1169 .timeout_connect(Some(Duration::from_secs(2)))
1170 .timeout_send_request(Some(Duration::from_secs(3)))
1171 .timeout_recv_response(Some(Duration::from_secs(3)))
1172 .http_status_as_error(false)
1173 .build()
1174 .into();
1175 let _ = agent
1176 .post(&url)
1177 .header("content-type", "application/x-ndjson")
1178 .send(line.as_bytes());
1179 });
1180}
1181
1182fn format_iso8601(ms_since_epoch: u64) -> String {
1187 let secs = ms_since_epoch / 1000;
1188 let ms = ms_since_epoch % 1000;
1189 let days = secs / 86_400;
1190 let rem = secs % 86_400;
1191 let (y, mo, d) = civil_from_days(days as i64);
1192 let h = rem / 3600;
1193 let mi = (rem % 3600) / 60;
1194 let s = rem % 60;
1195 format!(
1196 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
1197 y, mo, d, h, mi, s, ms
1198 )
1199}
1200
/// Converts a day count relative to 1970-01-01 into a proleptic Gregorian
/// `(year, month, day)`; `month` is 1-12 and `day` is 1-31.
///
/// Works in 400-year eras of 146 097 days, with the era's year starting on
/// 1 March so the leap day falls at the end of the year.
fn civil_from_days(z: i64) -> (i64, u32, u32) {
    // Shift the origin from 1970-01-01 to 0000-03-01.
    let shifted = z + 719_468;
    let era = shifted.div_euclid(146_097);
    let day_of_era = shifted.rem_euclid(146_097) as u64;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month index counted from March (0 = March, 11 = February).
    let month_index = (5 * day_of_year + 2) / 153;
    let day = (day_of_year - (153 * month_index + 2) / 5 + 1) as u32;
    let month = (if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    }) as u32;
    // January and February belong to the next civil year.
    let year = year_of_era as i64 + era * 400 + i64::from(month <= 2);
    (year, month, day)
}
1213
1214#[cfg(test)]
1219mod tests {
1220 use super::*;
1221
1222 fn temp_data_path(tag: &str) -> PathBuf {
1223 let mut p = std::env::temp_dir();
1224 p.push(format!(
1225 "reddb-audit-{}-{}-{}",
1226 tag,
1227 std::process::id(),
1228 crate::utils::now_unix_nanos()
1229 ));
1230 std::fs::create_dir_all(&p).unwrap();
1231 p.push("data.rdb");
1232 p
1233 }
1234
1235 fn drain(logger: &AuditLogger) {
1236 assert!(logger.wait_idle(Duration::from_secs(2)));
1237 }
1238
1239 #[test]
1240 fn record_writes_one_line_per_call() {
1241 let data = temp_data_path("one-line");
1242 let logger = AuditLogger::for_data_path(&data);
1243 logger.record(
1244 "admin/readonly",
1245 "operator",
1246 "instance",
1247 "ok",
1248 JsonValue::Null,
1249 );
1250 drain(&logger);
1251 let body = std::fs::read_to_string(logger.path()).unwrap();
1252 let lines: Vec<&str> = body.lines().collect();
1253 assert_eq!(lines.len(), 1);
1254 assert!(lines[0].contains("\"action\":\"admin/readonly\""));
1255 assert!(lines[0].contains("\"outcome\":\"success\""));
1256 }
1257
1258 #[test]
1259 fn record_appends_across_calls() {
1260 let data = temp_data_path("append");
1261 let logger = AuditLogger::for_data_path(&data);
1262 logger.record("admin/drain", "op", "instance", "ok", JsonValue::Null);
1263 logger.record("admin/shutdown", "op", "instance", "ok", JsonValue::Null);
1264 drain(&logger);
1265 let lines = std::fs::read_to_string(logger.path()).unwrap();
1266 assert_eq!(lines.lines().count(), 2);
1267 }
1268
1269 #[test]
1270 fn record_event_emits_full_schema() {
1271 let data = temp_data_path("schema");
1272 let logger = AuditLogger::for_data_path(&data);
1273 let mut detail = Map::new();
1274 detail.insert("ms".to_string(), JsonValue::Number(412.0));
1275 let ev = AuditEvent::builder("admin/shutdown")
1276 .principal("alice@acme")
1277 .source(AuditAuthSource::Session)
1278 .tenant("acme")
1279 .resource("instance")
1280 .outcome(Outcome::Success)
1281 .detail(JsonValue::Object(detail))
1282 .remote_addr("203.0.113.5")
1283 .correlation_id("req-42")
1284 .build();
1285 logger.record_event(ev);
1286 drain(&logger);
1287 let body = std::fs::read_to_string(logger.path()).unwrap();
1288 assert!(body.contains("\"action\":\"admin/shutdown\""));
1289 assert!(body.contains("\"principal\":\"alice@acme\""));
1290 assert!(body.contains("\"tenant\":\"acme\""));
1291 assert!(body.contains("\"source\":\"session\""));
1292 assert!(body.contains("\"correlation_id\":\"req-42\""));
1293 assert!(body.contains("\"remote_addr\":\"203.0.113.5\""));
1294 assert!(body.contains("\"event_id\":\""));
1295 assert!(body.contains("\"prev_hash\":") || body.lines().count() == 1);
1296 }
1297
1298 #[test]
1299 fn hash_chain_links_every_event() {
1300 let data = temp_data_path("chain");
1301 let logger = AuditLogger::for_data_path(&data);
1302 for i in 0..5 {
1303 logger.record_event(
1304 AuditEvent::builder(format!("test/event/{i}"))
1305 .principal("tester")
1306 .build(),
1307 );
1308 }
1309 drain(&logger);
1310 let body = std::fs::read_to_string(logger.path()).unwrap();
1311 let lines: Vec<&str> = body.lines().collect();
1312 assert_eq!(lines.len(), 5);
1313 let mut prev: Option<String> = None;
1314 for (idx, line) in lines.iter().enumerate() {
1315 let parsed: JsonValue = crate::json::from_str(line).unwrap();
1316 let stored_prev = parsed
1317 .get("prev_hash")
1318 .and_then(|v| v.as_str())
1319 .map(|s| s.to_string());
1320 assert_eq!(stored_prev, prev, "line {idx} prev_hash mismatch");
1321 prev = Some(crate::utils::to_hex(&sha256::sha256(line.as_bytes())));
1322 }
1323 }
1324
1325 #[test]
1326 fn legacy_record_back_compat_maps_outcomes() {
1327 let data = temp_data_path("legacy");
1328 let logger = AuditLogger::for_data_path(&data);
1329 logger.record(
1330 "admin/restore",
1331 "operator",
1332 "instance",
1333 "err: disk full",
1334 JsonValue::Null,
1335 );
1336 drain(&logger);
1337 let body = std::fs::read_to_string(logger.path()).unwrap();
1338 assert!(body.contains("\"outcome\":\"error\""));
1339 assert!(body.contains("\"result_text\":\"err: disk full\""));
1340 }
1341
1342 #[test]
1343 fn iso8601_formats_known_epoch() {
1344 assert_eq!(
1345 format_iso8601(1_709_210_096_789),
1346 "2024-02-29T12:34:56.789Z"
1347 );
1348 }
1349
/// Writes 30 padded events through a logger built with
/// `with_max_bytes(.., 1024)` and expects at least one file named
/// `.audit.log.*.zst` to appear in the directory of the live log.
#[test]
fn rotation_at_threshold() {
    let data_path = temp_data_path("rotate");
    let log_dir = data_path.parent().unwrap().to_path_buf();
    let logger = AuditLogger::with_max_bytes(log_dir.join(".audit.log"), 1024);

    // Padding so that the rows written below add up to well over 1024 bytes.
    let filler = "lorem ipsum dolor sit amet consectetur padding padding padding";
    for i in 0..30 {
        let event = AuditEvent::builder(format!("test/rotate/{i}"))
            .principal("rotator")
            .detail(JsonValue::String(filler.to_string()))
            .build();
        logger.record_event(event);
    }
    drain(&logger);

    // Collect directory entries whose names look like rotated archives.
    let is_rotated_name = |name: &str| name.starts_with(".audit.log.") && name.ends_with(".zst");
    let mut rotated = Vec::new();
    for entry in std::fs::read_dir(logger.path().parent().unwrap()).unwrap() {
        // Unreadable entries and non-UTF-8 names are skipped.
        if let Ok(entry) = entry {
            if entry.file_name().to_str().map_or(false, is_rotated_name) {
                rotated.push(entry);
            }
        }
    }
    assert!(
        !rotated.is_empty(),
        "expected at least one rotated .zst file"
    );
}
1383
/// Serializes an event with `to_json_line(None)`, parses it back with
/// `AuditEvent::parse_line`, and checks that action, principal, tenant,
/// outcome and source all survive the round trip.
#[test]
fn parse_line_round_trips() {
    let builder = AuditEvent::builder("auth/login.ok")
        .principal("alice")
        .source(AuditAuthSource::Password)
        .tenant("acme")
        .outcome(Outcome::Success);
    let original = builder.build();

    let serialized = original.to_json_line(None);
    let parsed = AuditEvent::parse_line(&serialized).expect("round-trip parse");

    assert_eq!(parsed.action, "auth/login.ok");
    assert_eq!(parsed.principal.as_deref(), Some("alice"));
    assert_eq!(parsed.tenant.as_deref(), Some("acme"));
    assert_eq!(parsed.outcome, Outcome::Success);
    assert_eq!(parsed.source, AuditAuthSource::Password);
}
1400
/// Two ids generated 2 ms apart must compare in generation order, i.e.
/// the earlier id sorts strictly before the later one.
#[test]
fn event_id_is_lexicographically_sortable_by_time() {
    let earlier = new_event_id();
    // Pause between the two calls so the ids are generated at different times.
    std::thread::sleep(Duration::from_millis(2));
    let later = new_event_id();
    assert!(
        earlier < later,
        "event_id ordering broken: {a} >= {b}",
        a = earlier,
        b = later
    );
}
1408
/// A field built from a string literal keeps its name and is stored as
/// the `AuditValue::String` variant holding the same text.
#[test]
fn audit_field_escaper_typed_string() {
    let field = AuditFieldEscaper::field("collection", "users");
    assert_eq!(field.name(), "collection");

    let value = field.value();
    match value {
        AuditValue::String(text) => assert_eq!(text, "users"),
        other => panic!("expected String, got {:?}", other),
    }
}
1422
/// A byte-vector field must show up in the serialized row as a base64
/// string: `DE AD BE EF` is expected as `3q2+7w==`.
#[test]
fn audit_field_escaper_bytes_emit_base64() {
    let raw: Vec<u8> = vec![0xDE, 0xAD, 0xBE, 0xEF];
    let line = AuditEvent::builder("test/bytes")
        .field(AuditFieldEscaper::field("blob", raw))
        .build()
        .to_json_line(None);
    assert!(
        line.contains("\"blob\":\"3q2+7w==\""),
        "expected base64 emission: {line}"
    );
}
1435
/// Integer, boolean and null fields must be emitted as bare JSON
/// literals (`42`, `true`, `null`) rather than as quoted strings.
#[test]
fn audit_field_escaper_number_bool_null() {
    let count = AuditFieldEscaper::field("count", 42i64);
    let ok = AuditFieldEscaper::field("ok", true);
    let missing = AuditFieldEscaper::field("missing", AuditValue::Null);

    let line = AuditEvent::builder("test/types")
        .field(count)
        .field(ok)
        .field(missing)
        .build()
        .to_json_line(None);

    assert!(line.contains("\"count\":42"));
    assert!(line.contains("\"ok\":true"));
    assert!(line.contains("\"missing\":null"));
}
1448
/// Feeds a corpus of hostile strings (CR/LF, NUL, quotes, backslashes,
/// control bytes, embedded JSON, empty) through a `user_input` field and
/// checks, for every case, that the serialized row contains no `\n`,
/// parses as JSON, and yields the exact original payload from
/// `detail.user_input`.
#[test]
fn audit_field_escaper_adversarial_corpus_preserves_structure() {
    let cases: &[(&str, &str)] = &[
        ("crlf", "line1\r\nline2"),
        ("nul", "before\0after"),
        ("quote", "she said \"hi\""),
        ("semicolon", "a;b;c"),
        ("json_in_json", r#"{"injected":"yes"}"#),
        ("low_ctrl", "\x01\x02\x03\x07\x1f"),
        ("backslash", "C:\\path\\file"),
        ("mixed", "name=\"x\"\n\\path\t\x01end"),
        ("empty", ""),
    ];

    // Counts the cases that passed every per-case assertion.
    let mut survivors = 0usize;
    for &(label, payload) in cases {
        let line = AuditEvent::builder(format!("test/adv/{label}"))
            .principal("attacker")
            .field(AuditFieldEscaper::field("user_input", payload))
            .build()
            .to_json_line(None);

        // A raw newline would split one event across two JSONL rows.
        assert!(
            !line.contains('\n'),
            "{label}: embedded newline in JSONL row: {line:?}"
        );

        let parsed: JsonValue = match crate::json::from_str(&line) {
            Ok(value) => value,
            Err(err) => panic!("{label}: line did not parse: {err} :: {line:?}"),
        };
        let recovered = parsed
            .get("detail")
            .expect("detail present")
            .get("user_input")
            .and_then(|v| v.as_str())
            .unwrap();
        assert_eq!(
            recovered, payload,
            "{label}: round-trip mismatch: {recovered:?} != {payload:?}"
        );
        survivors += 1;
    }

    assert_eq!(
        survivors,
        cases.len(),
        "adversarial corpus survival rate: {survivors}/{}",
        cases.len()
    );
}
1504
/// End-to-end check through the logger: one `record_event` call with a
/// hostile `collection` value (quote, CR/LF, embedded JSON, NUL) must
/// produce exactly one line in the log file, and that line must parse
/// back to the exact original string.
#[test]
fn audit_emission_emits_one_line_per_call_through_guard() {
    let attacker = "users\";DROP\r\n{\"x\":1}\0";

    let data_path = temp_data_path("guard-emission");
    let logger = AuditLogger::for_data_path(&data_path);
    let event = AuditEvent::builder("admin/scan")
        .principal("evil")
        .field(AuditFieldEscaper::field("collection", attacker))
        .build();
    logger.record_event(event);
    drain(&logger);

    let contents = std::fs::read_to_string(logger.path()).unwrap();
    let rows: Vec<&str> = contents.lines().collect();
    assert_eq!(rows.len(), 1, "guard must emit exactly one JSONL row");

    let parsed: JsonValue = crate::json::from_str(rows[0]).unwrap();
    let detail = parsed.get("detail");
    let recovered = detail
        .and_then(|d| d.get("collection"))
        .and_then(|v| v.as_str())
        .unwrap();
    assert_eq!(recovered, attacker);
}
1530
/// Smoke test: building a field from two string literals must compile
/// and must not panic; the result is discarded.
///
/// NOTE(review): the test name suggests it guards against `format!`-built
/// values, but nothing in this body enforces that. Confirm whether a
/// lint or source scan elsewhere covers it.
#[test]
fn audit_field_escaper_no_format_macro_in_value_path() {
    let _unused = AuditFieldEscaper::field("name", "value");
}
1543
/// Passing several fields at once through the builder's `fields` method
/// must place each of them under `detail` with its JSON type intact
/// (string, integer, boolean).
#[test]
fn audit_field_escaper_chains_via_builder_fields() {
    let batch = [
        AuditFieldEscaper::field("a", "x"),
        AuditFieldEscaper::field("b", 7i64),
        AuditFieldEscaper::field("c", true),
    ];
    let line = AuditEvent::builder("test/multi")
        .fields(batch)
        .build()
        .to_json_line(None);

    let parsed: JsonValue = crate::json::from_str(&line).unwrap();
    let detail = parsed.get("detail").unwrap();

    let text = detail.get("a").and_then(|v| v.as_str());
    let number = detail.get("b").and_then(|v| v.as_i64());
    let flag = detail.get("c").and_then(|v| v.as_bool());
    assert_eq!(text, Some("x"));
    assert_eq!(number, Some(7));
    assert_eq!(flag, Some(true));
}
1560
1561 proptest::proptest! {
1562 #[test]
1574 fn prop_audit_field_round_trips_arbitrary_strings(
1575 payload in proptest::string::string_regex("[\\x00-\\x7f]{0,128}").unwrap()
1576 ) {
1577 let f = AuditFieldEscaper::field("p", payload.as_str());
1578 let ev = AuditEvent::builder("prop/test").field(f).build();
1579 let line = ev.to_json_line(None);
1580 proptest::prop_assert!(!line.contains('\n'));
1582 let parsed: JsonValue = crate::json::from_str(&line)
1583 .expect("emission must always parse");
1584 let recovered = parsed
1585 .get("detail")
1586 .and_then(|d| d.get("p"))
1587 .and_then(|v| v.as_str())
1588 .unwrap();
1589 proptest::prop_assert_eq!(recovered, payload.as_str());
1590 }
1591
1592 #[test]
1595 fn prop_audit_field_bytes_base64_round_trip(
1596 bytes in proptest::collection::vec(proptest::bits::u8::ANY, 0..64)
1597 ) {
1598 let f = AuditFieldEscaper::field("b", bytes.clone());
1599 let ev = AuditEvent::builder("prop/bytes").field(f).build();
1600 let line = ev.to_json_line(None);
1601 proptest::prop_assert!(!line.contains('\n'));
1602 let parsed: JsonValue = crate::json::from_str(&line).unwrap();
1603 let recovered_b64 = parsed
1604 .get("detail")
1605 .and_then(|d| d.get("b"))
1606 .and_then(|v| v.as_str())
1607 .unwrap()
1608 .to_string();
1609 proptest::prop_assert_eq!(recovered_b64, base64_encode(&bytes));
1610 }
1611 }
1612}