1use std::io::{BufWriter, Write};
38use std::path::{Path, PathBuf};
39use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
40use std::sync::{mpsc, Arc, Mutex};
41use std::thread::{self, JoinHandle};
42use std::time::{Duration, Instant};
43
44use crate::crypto::{os_random, sha256};
45use crate::json::{Map, Value as JsonValue};
46
/// How the principal behind an audit event was authenticated.
///
/// Each variant has a stable snake_case wire name, produced by
/// [`AuditAuthSource::as_str`] and accepted by [`AuditAuthSource::parse`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditAuthSource {
    ApiKey,
    Session,
    Password,
    Oauth,
    ClientCert,
    Anonymous,
    System,
}

impl AuditAuthSource {
    /// Returns the wire name of this source (e.g. `"api_key"`).
    pub fn as_str(self) -> &'static str {
        match self {
            AuditAuthSource::ApiKey => "api_key",
            AuditAuthSource::Session => "session",
            AuditAuthSource::Password => "password",
            AuditAuthSource::Oauth => "oauth",
            AuditAuthSource::ClientCert => "client_cert",
            AuditAuthSource::Anonymous => "anonymous",
            AuditAuthSource::System => "system",
        }
    }

    /// Inverse of [`AuditAuthSource::as_str`].
    ///
    /// Matching is exact (case-sensitive); any other input yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        match s {
            "api_key" => Some(Self::ApiKey),
            "session" => Some(Self::Session),
            "password" => Some(Self::Password),
            "oauth" => Some(Self::Oauth),
            "client_cert" => Some(Self::ClientCert),
            "anonymous" => Some(Self::Anonymous),
            "system" => Some(Self::System),
            _ => None,
        }
    }
}
92
/// Result classification of an audited action.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Success,
    Denied,
    Error,
}

impl Outcome {
    /// Returns the canonical wire name: `"success"`, `"denied"` or `"error"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Outcome::Success => "success",
            Outcome::Denied => "denied",
            Outcome::Error => "error",
        }
    }

    /// Parses a wire name back into an [`Outcome`].
    ///
    /// Besides the canonical names, the short forms `"ok"`, `"deny"` and
    /// `"err"` are accepted. Matching is exact; anything else is `None`.
    pub fn parse(s: &str) -> Option<Self> {
        match s {
            "success" | "ok" => Some(Self::Success),
            "denied" | "deny" => Some(Self::Denied),
            "error" | "err" => Some(Self::Error),
            _ => None,
        }
    }
}
119
120#[derive(Debug, Clone)]
122pub struct AuditEvent {
123 pub ts: u128,
124 pub event_id: String,
125 pub principal: Option<String>,
126 pub source: AuditAuthSource,
127 pub tenant: Option<String>,
128 pub action: String,
129 pub resource: Option<String>,
130 pub outcome: Outcome,
131 pub detail: JsonValue,
132 pub remote_addr: Option<String>,
133 pub correlation_id: Option<String>,
134}
135
136impl AuditEvent {
137 pub fn builder(action: impl Into<String>) -> AuditEventBuilder {
140 AuditEventBuilder {
141 inner: AuditEvent {
142 ts: crate::utils::now_unix_millis() as u128,
143 event_id: new_event_id(),
144 principal: None,
145 source: AuditAuthSource::System,
146 tenant: None,
147 action: action.into(),
148 resource: None,
149 outcome: Outcome::Success,
150 detail: JsonValue::Null,
151 remote_addr: None,
152 correlation_id: None,
153 },
154 }
155 }
156
157 pub fn from_legacy(
161 action: &str,
162 principal: &str,
163 target: &str,
164 result: &str,
165 details: JsonValue,
166 ) -> Self {
167 let outcome = if result == "ok" {
168 Outcome::Success
169 } else if result.starts_with("err") {
170 Outcome::Error
171 } else if result.starts_with("denied") || result.starts_with("deny") {
172 Outcome::Denied
173 } else {
174 Outcome::Success
175 };
176 let mut detail = details;
177 if !result.is_empty() && result != "ok" {
178 let mut obj = match detail {
181 JsonValue::Object(map) => map,
182 JsonValue::Null => Map::new(),
183 other => {
184 let mut m = Map::new();
185 m.insert("legacy".to_string(), other);
186 m
187 }
188 };
189 obj.entry("result_text".to_string())
190 .or_insert(JsonValue::String(result.to_string()));
191 detail = JsonValue::Object(obj);
192 }
193 Self {
194 ts: crate::utils::now_unix_millis() as u128,
195 event_id: new_event_id(),
196 principal: if principal.is_empty() || principal == "system" {
197 None
198 } else {
199 Some(principal.to_string())
200 },
201 source: if principal == "system" {
202 AuditAuthSource::System
203 } else if principal.is_empty() {
204 AuditAuthSource::Anonymous
205 } else {
206 AuditAuthSource::Password
207 },
208 tenant: None,
209 action: action.to_string(),
210 resource: if target.is_empty() {
211 None
212 } else {
213 Some(target.to_string())
214 },
215 outcome,
216 detail,
217 remote_addr: None,
218 correlation_id: None,
219 }
220 }
221
222 pub fn to_json_line(&self, prev_hash: Option<&str>) -> String {
225 let mut object = Map::new();
226 object.insert("ts".to_string(), JsonValue::Number(self.ts as f64));
227 object.insert(
228 "ts_iso".to_string(),
229 JsonValue::String(format_iso8601(self.ts as u64)),
230 );
231 object.insert(
232 "event_id".to_string(),
233 JsonValue::String(self.event_id.clone()),
234 );
235 if let Some(p) = &self.principal {
236 object.insert("principal".to_string(), JsonValue::String(p.clone()));
237 }
238 object.insert(
239 "source".to_string(),
240 JsonValue::String(self.source.as_str().to_string()),
241 );
242 if let Some(t) = &self.tenant {
243 object.insert("tenant".to_string(), JsonValue::String(t.clone()));
244 }
245 object.insert("action".to_string(), JsonValue::String(self.action.clone()));
246 if let Some(r) = &self.resource {
247 object.insert("resource".to_string(), JsonValue::String(r.clone()));
248 }
249 object.insert(
250 "outcome".to_string(),
251 JsonValue::String(self.outcome.as_str().to_string()),
252 );
253 if !matches!(self.detail, JsonValue::Null) {
254 object.insert("detail".to_string(), self.detail.clone());
255 }
256 if let Some(ip) = &self.remote_addr {
257 object.insert("remote_addr".to_string(), JsonValue::String(ip.clone()));
258 }
259 if let Some(cid) = &self.correlation_id {
260 object.insert("correlation_id".to_string(), JsonValue::String(cid.clone()));
261 }
262 if let Some(h) = prev_hash {
263 object.insert("prev_hash".to_string(), JsonValue::String(h.to_string()));
264 }
265 JsonValue::Object(object).to_string_compact()
266 }
267
268 pub fn parse_line(line: &str) -> Option<Self> {
272 let v: JsonValue = crate::json::from_str(line).ok()?;
273 let action = v.get("action")?.as_str()?.to_string();
274 let outcome_s = v.get("outcome").and_then(|n| n.as_str()).unwrap_or("");
275 let outcome = Outcome::parse(outcome_s)?;
276 let ts = v.get("ts").and_then(|n| n.as_f64()).unwrap_or(0.0) as u128;
277 let event_id = v
278 .get("event_id")
279 .and_then(|n| n.as_str())
280 .unwrap_or("")
281 .to_string();
282 let source = v
283 .get("source")
284 .and_then(|n| n.as_str())
285 .and_then(AuditAuthSource::parse)
286 .unwrap_or(AuditAuthSource::System);
287 Some(Self {
288 ts,
289 event_id,
290 principal: v
291 .get("principal")
292 .and_then(|n| n.as_str())
293 .map(|s| s.to_string()),
294 source,
295 tenant: v
296 .get("tenant")
297 .and_then(|n| n.as_str())
298 .map(|s| s.to_string()),
299 action,
300 resource: v
301 .get("resource")
302 .and_then(|n| n.as_str())
303 .map(|s| s.to_string()),
304 outcome,
305 detail: v.get("detail").cloned().unwrap_or(JsonValue::Null),
306 remote_addr: v
307 .get("remote_addr")
308 .and_then(|n| n.as_str())
309 .map(|s| s.to_string()),
310 correlation_id: v
311 .get("correlation_id")
312 .and_then(|n| n.as_str())
313 .map(|s| s.to_string()),
314 })
315 }
316}
317
318pub struct AuditEventBuilder {
320 inner: AuditEvent,
321}
322
323impl AuditEventBuilder {
324 pub fn principal(mut self, principal: impl Into<String>) -> Self {
325 self.inner.principal = Some(principal.into());
326 self
327 }
328
329 pub fn principal_opt(mut self, principal: Option<String>) -> Self {
330 self.inner.principal = principal;
331 self
332 }
333
334 pub fn source(mut self, source: AuditAuthSource) -> Self {
335 self.inner.source = source;
336 self
337 }
338
339 pub fn tenant(mut self, tenant: impl Into<String>) -> Self {
340 self.inner.tenant = Some(tenant.into());
341 self
342 }
343
344 pub fn resource(mut self, resource: impl Into<String>) -> Self {
345 self.inner.resource = Some(resource.into());
346 self
347 }
348
349 pub fn outcome(mut self, outcome: Outcome) -> Self {
350 self.inner.outcome = outcome;
351 self
352 }
353
354 pub fn detail(mut self, detail: JsonValue) -> Self {
355 self.inner.detail = detail;
356 self
357 }
358
359 pub fn field(mut self, field: AuditField) -> Self {
363 let mut obj = match std::mem::replace(&mut self.inner.detail, JsonValue::Null) {
364 JsonValue::Object(map) => map,
365 JsonValue::Null => Map::new(),
366 other => {
367 let mut m = Map::new();
370 m.insert("legacy_detail".to_string(), other);
371 m
372 }
373 };
374 obj.insert(field.name.to_string(), field.value.into_json_value());
375 self.inner.detail = JsonValue::Object(obj);
376 self
377 }
378
379 pub fn fields(mut self, fields: impl IntoIterator<Item = AuditField>) -> Self {
381 for f in fields {
382 self = self.field(f);
383 }
384 self
385 }
386
387 pub fn remote_addr(mut self, addr: impl Into<String>) -> Self {
388 self.inner.remote_addr = Some(addr.into());
389 self
390 }
391
392 pub fn correlation_id(mut self, cid: impl Into<String>) -> Self {
393 self.inner.correlation_id = Some(cid.into());
394 self
395 }
396
397 pub fn build(self) -> AuditEvent {
398 self.inner
399 }
400}
401
402#[derive(Debug, Clone)]
416pub enum AuditValue {
417 String(String),
418 Bytes(Vec<u8>),
422 Number(i64),
423 Bool(bool),
424 Null,
425}
426
427impl AuditValue {
428 fn into_json_value(self) -> JsonValue {
429 match self {
430 AuditValue::String(s) => JsonValue::String(s),
431 AuditValue::Bytes(bytes) => JsonValue::String(base64_encode(&bytes)),
432 AuditValue::Number(n) => JsonValue::Number(n as f64),
433 AuditValue::Bool(b) => JsonValue::Bool(b),
434 AuditValue::Null => JsonValue::Null,
435 }
436 }
437}
438
439impl From<String> for AuditValue {
440 fn from(value: String) -> Self {
441 AuditValue::String(value)
442 }
443}
444
445impl From<&str> for AuditValue {
446 fn from(value: &str) -> Self {
447 AuditValue::String(value.to_string())
448 }
449}
450
451impl From<&String> for AuditValue {
452 fn from(value: &String) -> Self {
453 AuditValue::String(value.clone())
454 }
455}
456
457impl From<Vec<u8>> for AuditValue {
458 fn from(value: Vec<u8>) -> Self {
459 AuditValue::Bytes(value)
460 }
461}
462
463impl From<&[u8]> for AuditValue {
464 fn from(value: &[u8]) -> Self {
465 AuditValue::Bytes(value.to_vec())
466 }
467}
468
469impl From<i64> for AuditValue {
470 fn from(value: i64) -> Self {
471 AuditValue::Number(value)
472 }
473}
474
475impl From<u64> for AuditValue {
476 fn from(value: u64) -> Self {
477 AuditValue::Number(if value > i64::MAX as u64 {
480 i64::MAX
481 } else {
482 value as i64
483 })
484 }
485}
486
487impl From<u32> for AuditValue {
488 fn from(value: u32) -> Self {
489 AuditValue::Number(value as i64)
490 }
491}
492
493impl From<usize> for AuditValue {
494 fn from(value: usize) -> Self {
495 AuditValue::Number(if value > i64::MAX as usize {
496 i64::MAX
497 } else {
498 value as i64
499 })
500 }
501}
502
503impl From<bool> for AuditValue {
504 fn from(value: bool) -> Self {
505 AuditValue::Bool(value)
506 }
507}
508
509impl<T: Into<AuditValue>> From<Option<T>> for AuditValue {
510 fn from(value: Option<T>) -> Self {
511 match value {
512 Some(v) => v.into(),
513 None => AuditValue::Null,
514 }
515 }
516}
517
518#[derive(Debug, Clone)]
523pub struct AuditField {
524 name: &'static str,
525 value: AuditValue,
526}
527
528impl AuditField {
529 pub fn name(&self) -> &'static str {
530 self.name
531 }
532 pub fn value(&self) -> &AuditValue {
533 &self.value
534 }
535}
536
537pub struct AuditFieldEscaper;
553
554impl AuditFieldEscaper {
555 pub fn field(name: &'static str, value: impl Into<AuditValue>) -> AuditField {
560 AuditField {
561 name,
562 value: value.into(),
563 }
564 }
565}
566
/// Encodes `input` as base64 using the standard alphabet (`A-Z a-z 0-9 + /`)
/// with `=` padding. Empty input yields an empty string.
fn base64_encode(input: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Picks the `pos`-th 6-bit group (0 = most significant) of a 24-bit word.
    let sextet = |word: u32, pos: u32| ALPHABET[((word >> (18 - 6 * pos)) & 0x3f) as usize] as char;

    let mut encoded = String::with_capacity(input.len().div_ceil(3) * 4);
    for group in input.chunks(3) {
        // Pack up to three bytes into the top of a 24-bit word; bytes missing
        // from a short final group stay zero.
        let word = group
            .iter()
            .enumerate()
            .fold(0u32, |acc, (k, &byte)| acc | (u32::from(byte) << (16 - 8 * k as u32)));

        // A group of n bytes produces n + 1 data characters, then padding.
        for pos in 0..4u32 {
            if pos as usize <= group.len() {
                encoded.push(sextet(word, pos));
            } else {
                encoded.push('=');
            }
        }
    }
    encoded
}
604
605fn new_event_id() -> String {
612 const ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHJKMNPQRSTVWXYZ";
613 let now_ms = crate::utils::now_unix_millis();
614 let mut rand_bytes = [0u8; 10];
615 let _ = os_random::fill_bytes(&mut rand_bytes);
616
617 let mut out = String::with_capacity(26);
618 for i in (0..10).rev() {
620 let shift = (i as u32) * 5;
621 let idx = ((now_ms >> shift) & 0x1f) as usize;
622 out.push(ALPHABET[idx] as char);
623 }
624 let mut acc: u128 = 0;
626 for &b in &rand_bytes {
627 acc = (acc << 8) | (b as u128);
628 }
629 for i in (0..16).rev() {
630 let shift = (i as u32) * 5;
631 let idx = ((acc >> shift) & 0x1f) as usize;
632 out.push(ALPHABET[idx] as char);
633 }
634 out
635}
636
/// When the writer thread flushes and fsyncs the audit log.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FsyncMode {
    /// No fsync after individual events.
    Off,
    /// Flush and fsync after every event.
    Every,
    /// Flush and fsync on a timer.
    Periodic,
}

impl FsyncMode {
    /// Reads the mode from the `RED_AUDIT_FSYNC` environment variable.
    ///
    /// Matching ignores ASCII case: `every` / `strong` / `on` select
    /// [`FsyncMode::Every`]; `off` / `none` select [`FsyncMode::Off`]. An
    /// unset, unreadable or unrecognised value selects [`FsyncMode::Periodic`].
    fn from_env() -> Self {
        let raw = std::env::var("RED_AUDIT_FSYNC").unwrap_or_default();
        let is_any_of = |names: &[&str]| names.iter().any(|name| raw.eq_ignore_ascii_case(name));

        if is_any_of(&["every", "strong", "on"]) {
            Self::Every
        } else if is_any_of(&["off", "none"]) {
            Self::Off
        } else {
            Self::Periodic
        }
    }
}
665
/// Messages sent from `AuditLogger` to the background writer thread.
// NOTE(review): the lint is presumably silenced because `Event` carries a
// whole `AuditEvent` while the other variants are small; confirm that boxing
// was deliberately avoided.
#[allow(clippy::large_enum_variant)]
enum WriterMsg {
    /// Append this event to the log.
    Event(AuditEvent),
    /// Flush and fsync, then signal completion on the enclosed sender.
    Flush(mpsc::Sender<()>),
    /// Flush, fsync and exit the writer loop.
    Shutdown,
}
676
677#[derive(Debug)]
682pub struct AuditLogger {
683 path: PathBuf,
684 tx: Mutex<Option<mpsc::SyncSender<WriterMsg>>>,
685 fallback_lock: Mutex<()>,
689 last_hash: Arc<Mutex<Option<String>>>,
693 max_bytes: u64,
694 fsync_mode: FsyncMode,
695 stream_url: Option<String>,
696 writer_alive: Arc<AtomicBool>,
698 pending: Arc<AtomicU64>,
699 handle: Mutex<Option<JoinHandle<()>>>,
700}
701
702impl AuditLogger {
703 pub fn for_data_path(data_path: &Path) -> Self {
706 let parent = data_path.parent().unwrap_or_else(|| Path::new("."));
707 let path = parent.join(".audit.log");
708 Self::with_path(path)
709 }
710
711 pub fn with_path(path: PathBuf) -> Self {
713 let max_bytes = std::env::var("RED_AUDIT_MAX_BYTES")
714 .ok()
715 .and_then(|v| v.parse::<u64>().ok())
716 .unwrap_or(64 * 1024 * 1024);
717 let fsync_mode = FsyncMode::from_env();
718 let stream_url = std::env::var("RED_AUDIT_STREAM_URL")
719 .ok()
720 .filter(|s| !s.is_empty());
721 Self::with_settings(path, max_bytes, fsync_mode, stream_url)
722 }
723
724 pub fn with_max_bytes(path: PathBuf, max_bytes: u64) -> Self {
727 Self::with_settings(path, max_bytes, FsyncMode::Periodic, None)
728 }
729
730 fn with_settings(
731 path: PathBuf,
732 max_bytes: u64,
733 fsync_mode: FsyncMode,
734 stream_url: Option<String>,
735 ) -> Self {
736 let writer_alive = Arc::new(AtomicBool::new(false));
737 let pending = Arc::new(AtomicU64::new(0));
738 let mut seed: Option<String> = None;
741 if let Ok(body) = std::fs::read_to_string(&path) {
742 if let Some(line) = body.lines().last() {
743 seed = Some(crate::utils::to_hex(&sha256::sha256(line.as_bytes())));
744 }
745 }
746 let last_hash = Arc::new(Mutex::new(seed));
747 let logger = Self {
748 path,
749 tx: Mutex::new(None),
750 fallback_lock: Mutex::new(()),
751 last_hash,
752 max_bytes,
753 fsync_mode,
754 stream_url,
755 writer_alive: Arc::clone(&writer_alive),
756 pending: Arc::clone(&pending),
757 handle: Mutex::new(None),
758 };
759 logger.start_writer_thread();
760 logger
761 }
762
763 pub fn path(&self) -> &Path {
764 &self.path
765 }
766
767 pub fn max_bytes(&self) -> u64 {
769 self.max_bytes
770 }
771
772 fn start_writer_thread(&self) {
773 let (tx, rx) = mpsc::sync_channel::<WriterMsg>(4096);
774 *self.tx.lock().unwrap_or_else(|e| e.into_inner()) = Some(tx);
775 let path = self.path.clone();
776 let max_bytes = self.max_bytes;
777 let fsync_mode = self.fsync_mode;
778 let stream_url = self.stream_url.clone();
779 let writer_alive = Arc::clone(&self.writer_alive);
780 let pending = Arc::clone(&self.pending);
781 let last_hash = Arc::clone(&self.last_hash);
782 writer_alive.store(true, Ordering::SeqCst);
783 let handle = thread::Builder::new()
784 .name("reddb-audit-writer".to_string())
785 .spawn(move || {
786 writer_loop(
787 rx,
788 path,
789 max_bytes,
790 fsync_mode,
791 stream_url,
792 writer_alive,
793 pending,
794 last_hash,
795 );
796 })
797 .expect("spawn audit writer thread");
798 *self.handle.lock().unwrap_or_else(|e| e.into_inner()) = Some(handle);
799 }
800
801 pub fn current_hash(&self) -> Option<String> {
803 self.last_hash
804 .lock()
805 .unwrap_or_else(|e| e.into_inner())
806 .clone()
807 }
808
809 pub fn wait_idle(&self, timeout: Duration) -> bool {
812 let deadline = Instant::now() + timeout;
813 let tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
814 if let Some(tx) = tx_guard.as_ref() {
815 let (back_tx, back_rx) = mpsc::channel();
816 if tx.send(WriterMsg::Flush(back_tx)).is_err() {
817 return false;
818 }
819 drop(tx_guard);
820 let remaining = deadline.saturating_duration_since(Instant::now());
821 return back_rx.recv_timeout(remaining).is_ok();
822 }
823 false
824 }
825
826 pub fn record(
829 &self,
830 action: &str,
831 principal: &str,
832 target: &str,
833 result: &str,
834 details: JsonValue,
835 ) {
836 let event = AuditEvent::from_legacy(action, principal, target, result, details);
837 self.record_event(event);
838 }
839
840 pub fn record_event(&self, event: AuditEvent) {
844 let tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
845 let recovered_event: AuditEvent;
846 if let Some(tx) = tx_guard.as_ref() {
847 self.pending.fetch_add(1, Ordering::SeqCst);
848 match tx.try_send(WriterMsg::Event(event)) {
849 Ok(()) => return,
850 Err(mpsc::TrySendError::Full(msg)) => {
851 self.pending.fetch_sub(1, Ordering::SeqCst);
852 tracing::warn!(
853 target: "reddb::audit",
854 "audit channel saturated; falling back to sync write"
855 );
856 recovered_event = match msg {
857 WriterMsg::Event(ev) => ev,
858 _ => return,
859 };
860 }
861 Err(mpsc::TrySendError::Disconnected(msg)) => {
862 self.pending.fetch_sub(1, Ordering::SeqCst);
863 recovered_event = match msg {
864 WriterMsg::Event(ev) => ev,
865 _ => return,
866 };
867 }
868 }
869 } else {
870 recovered_event = event;
871 }
872 drop(tx_guard);
873 self.write_direct(recovered_event);
874 }
875
876 fn write_direct(&self, event: AuditEvent) {
879 let _g = self.fallback_lock.lock().unwrap_or_else(|e| e.into_inner());
880 let prev = self
881 .last_hash
882 .lock()
883 .unwrap_or_else(|e| e.into_inner())
884 .clone();
885 let line = event.to_json_line(prev.as_deref());
886 if let Err(err) = append_line_with_rotation(&self.path, &line, self.max_bytes) {
887 tracing::warn!(
888 target: "reddb::audit",
889 error = %err,
890 path = %self.path.display(),
891 "direct audit append failed"
892 );
893 return;
894 }
895 let new_hash = crate::utils::to_hex(&sha256::sha256(line.as_bytes()));
896 if let Ok(mut g) = self.last_hash.lock() {
897 *g = Some(new_hash);
898 }
899 if let Some(url) = &self.stream_url {
900 stream_post(url, &line);
901 }
902 tracing::info!(target: "reddb::audit", "{line}");
903 }
904}
905
906impl Drop for AuditLogger {
907 fn drop(&mut self) {
908 let mut tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
909 if let Some(tx) = tx_guard.take() {
910 let _ = tx.send(WriterMsg::Shutdown);
911 }
912 drop(tx_guard);
913 if let Some(handle) = self.handle.lock().unwrap_or_else(|e| e.into_inner()).take() {
914 let _ = handle.join();
915 }
916 }
917}
918
919#[allow(clippy::too_many_arguments)]
924fn writer_loop(
925 rx: mpsc::Receiver<WriterMsg>,
926 path: PathBuf,
927 max_bytes: u64,
928 fsync_mode: FsyncMode,
929 stream_url: Option<String>,
930 writer_alive: Arc<AtomicBool>,
931 pending: Arc<AtomicU64>,
932 last_hash: Arc<Mutex<Option<String>>>,
933) {
934 if let Some(parent) = path.parent() {
935 if !parent.as_os_str().is_empty() {
936 let _ = std::fs::create_dir_all(parent);
937 }
938 }
939
940 let mut writer = match open_active(&path) {
941 Ok(w) => w,
942 Err(err) => {
943 tracing::error!(target: "reddb::audit", error = %err, "audit writer init failed");
944 writer_alive.store(false, Ordering::SeqCst);
945 return;
946 }
947 };
948 let mut bytes_written: u64 = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
952
953 let periodic_interval = Duration::from_millis(250);
954 let mut last_flush = Instant::now();
955 let mut buffered_since_fsync: u64 = 0;
956
957 loop {
958 let recv_timeout = match fsync_mode {
961 FsyncMode::Periodic => periodic_interval,
962 FsyncMode::Every | FsyncMode::Off => Duration::from_secs(1),
963 };
964 match rx.recv_timeout(recv_timeout) {
965 Ok(WriterMsg::Event(event)) => {
966 let prev = last_hash.lock().unwrap_or_else(|e| e.into_inner()).clone();
967 let line = event.to_json_line(prev.as_deref());
968
969 let line_bytes = line.len() as u64 + 1; if let Err(err) = write_line(&mut writer, &line) {
971 tracing::warn!(
972 target: "reddb::audit",
973 error = %err,
974 "audit write failed; reopening"
975 );
976 if let Ok(w2) = open_active(&path) {
977 writer = w2;
978 let _ = write_line(&mut writer, &line);
979 }
980 }
981 bytes_written = bytes_written.saturating_add(line_bytes);
982 let new_hash = crate::utils::to_hex(&sha256::sha256(line.as_bytes()));
983 if let Ok(mut g) = last_hash.lock() {
984 *g = Some(new_hash);
985 }
986 if let Some(url) = &stream_url {
987 stream_post(url, &line);
988 }
989 tracing::info!(target: "reddb::audit", "{line}");
990 pending.fetch_sub(1, Ordering::SeqCst);
991 buffered_since_fsync += 1;
992
993 match fsync_mode {
994 FsyncMode::Every => {
995 let _ = writer.flush();
996 let _ = writer.get_ref().sync_data();
997 buffered_since_fsync = 0;
998 }
999 FsyncMode::Periodic => {
1000 if last_flush.elapsed() >= periodic_interval {
1001 let _ = writer.flush();
1002 let _ = writer.get_ref().sync_data();
1003 last_flush = Instant::now();
1004 buffered_since_fsync = 0;
1005 }
1006 }
1007 FsyncMode::Off => {}
1008 }
1009
1010 if bytes_written >= max_bytes {
1013 let _ = writer.flush();
1014 let _ = writer.get_ref().sync_data();
1015 if let Err(err) = rotate(&path) {
1016 tracing::warn!(
1017 target: "reddb::audit",
1018 error = %err,
1019 "audit rotation failed; continuing in-place"
1020 );
1021 }
1022 match open_active(&path) {
1023 Ok(w2) => writer = w2,
1024 Err(err) => {
1025 tracing::error!(
1026 target: "reddb::audit",
1027 error = %err,
1028 "audit reopen failed after rotate"
1029 );
1030 break;
1031 }
1032 }
1033 last_flush = Instant::now();
1034 buffered_since_fsync = 0;
1035 bytes_written = 0;
1036 }
1037 }
1038 Ok(WriterMsg::Flush(ack)) => {
1039 let _ = writer.flush();
1040 let _ = writer.get_ref().sync_data();
1041 last_flush = Instant::now();
1042 buffered_since_fsync = 0;
1043 let _ = ack.send(());
1047 }
1048 Ok(WriterMsg::Shutdown) => {
1049 let _ = writer.flush();
1050 let _ = writer.get_ref().sync_data();
1051 break;
1052 }
1053 Err(mpsc::RecvTimeoutError::Timeout) => {
1054 if buffered_since_fsync > 0 {
1055 let _ = writer.flush();
1056 let _ = writer.get_ref().sync_data();
1057 last_flush = Instant::now();
1058 buffered_since_fsync = 0;
1059 }
1060 }
1061 Err(mpsc::RecvTimeoutError::Disconnected) => {
1062 let _ = writer.flush();
1063 let _ = writer.get_ref().sync_data();
1064 break;
1065 }
1066 }
1067 }
1068
1069 writer_alive.store(false, Ordering::SeqCst);
1070}
1071
/// Opens `path` for appending, creating the file and any missing parent
/// directories first, and wraps it in a [`BufWriter`].
///
/// # Errors
///
/// Returns the I/O error from creating the parent directory or opening the
/// file.
fn open_active(path: &Path) -> std::io::Result<BufWriter<std::fs::File>> {
    match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => std::fs::create_dir_all(dir)?,
        _ => {}
    }

    let mut options = std::fs::OpenOptions::new();
    options.create(true).append(true);
    options.open(path).map(BufWriter::new)
}
1084
/// Writes `line` followed by a single `\n` into the buffered writer.
///
/// Nothing is flushed here; the caller decides when data reaches the file.
///
/// # Errors
///
/// Returns the first I/O error; the newline is not attempted if writing the
/// line itself failed.
fn write_line(writer: &mut BufWriter<std::fs::File>, line: &str) -> std::io::Result<()> {
    writer
        .write_all(line.as_bytes())
        .and_then(|()| writer.write_all(b"\n"))
}
1090
1091fn append_line_with_rotation(path: &Path, line: &str, max_bytes: u64) -> std::io::Result<()> {
1092 if let Some(parent) = path.parent() {
1093 if !parent.as_os_str().is_empty() {
1094 std::fs::create_dir_all(parent)?;
1095 }
1096 }
1097 let mut file = std::fs::OpenOptions::new()
1098 .create(true)
1099 .append(true)
1100 .open(path)?;
1101 file.write_all(line.as_bytes())?;
1102 file.write_all(b"\n")?;
1103 file.sync_data()?;
1104 drop(file);
1105 if let Ok(meta) = std::fs::metadata(path) {
1106 if meta.len() >= max_bytes {
1107 let _ = rotate(path);
1108 }
1109 }
1110 Ok(())
1111}
1112
1113fn rotate(active: &Path) -> std::io::Result<()> {
1121 let ts = crate::utils::now_unix_nanos();
1122 let stem = active
1123 .file_name()
1124 .and_then(|s| s.to_str())
1125 .unwrap_or(".audit.log");
1126 let parent = active.parent().unwrap_or_else(|| Path::new("."));
1127 let plain = parent.join(format!("{stem}.{ts}"));
1128 std::fs::rename(active, &plain)?;
1129 let raw = std::fs::read(&plain)?;
1130 let compressed = match zstd::bulk::compress(&raw, 3) {
1131 Ok(c) => c,
1132 Err(err) => {
1133 tracing::warn!(
1136 target: "reddb::audit",
1137 error = %err,
1138 "audit rotation: zstd compress failed; leaving plaintext"
1139 );
1140 return Ok(());
1141 }
1142 };
1143 let zst = parent.join(format!("{stem}.{ts}.zst"));
1144 let mut out = std::fs::File::create(&zst)?;
1145 out.write_all(&compressed)?;
1146 out.sync_data()?;
1147 drop(out);
1148 let _ = std::fs::remove_file(&plain);
1149 Ok(())
1150}
1151
1152fn stream_post(url: &str, line: &str) {
1157 let url = url.to_string();
1158 let line = line.to_string();
1159 let _ = thread::Builder::new()
1163 .name("reddb-audit-siem".to_string())
1164 .spawn(move || {
1165 let agent: ureq::Agent = ureq::Agent::config_builder()
1166 .timeout_connect(Some(Duration::from_secs(2)))
1167 .timeout_send_request(Some(Duration::from_secs(3)))
1168 .timeout_recv_response(Some(Duration::from_secs(3)))
1169 .http_status_as_error(false)
1170 .build()
1171 .into();
1172 let _ = agent
1173 .post(&url)
1174 .header("content-type", "application/x-ndjson")
1175 .send(line.as_bytes());
1176 });
1177}
1178
1179fn format_iso8601(ms_since_epoch: u64) -> String {
1184 let secs = ms_since_epoch / 1000;
1185 let ms = ms_since_epoch % 1000;
1186 let days = secs / 86_400;
1187 let rem = secs % 86_400;
1188 let (y, mo, d) = civil_from_days(days as i64);
1189 let h = rem / 3600;
1190 let mi = (rem % 3600) / 60;
1191 let s = rem % 60;
1192 format!(
1193 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
1194 y, mo, d, h, mi, s, ms
1195 )
1196}
1197
/// Converts a day count relative to 1970-01-01 into a proleptic Gregorian
/// `(year, month, day)` triple, with `month` in `1..=12` and `day` in
/// `1..=31`. Negative day counts (dates before 1970) are supported.
///
/// Internally the calendar is shifted to start on 1 March of year 0, so the
/// leap day falls at the end of each internal year, and is processed in
/// 400-year "eras" of 146 097 days.
fn civil_from_days(z: i64) -> (i64, u32, u32) {
    const DAYS_PER_ERA: i64 = 146_097;

    // Re-base so that day 0 is 0000-03-01.
    let shifted = z + 719_468;
    // Floor division / non-negative remainder keep negative inputs correct.
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA) as u64;

    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, ..., 11 = February).
    let march_month = (5 * day_of_year + 2) / 153;
    let day = (day_of_year - (153 * march_month + 2) / 5 + 1) as u32;
    let month = (if march_month < 10 {
        march_month + 3
    } else {
        march_month - 9
    }) as u32;

    // January and February belong to the next civil year.
    let year = (year_of_era as i64) + era * 400 + i64::from(month <= 2);
    (year, month, day)
}
1210
1211#[cfg(test)]
1216mod tests {
1217 use super::*;
1218
1219 fn temp_data_path(tag: &str) -> PathBuf {
1220 let mut p = std::env::temp_dir();
1221 p.push(format!(
1222 "reddb-audit-{}-{}-{}",
1223 tag,
1224 std::process::id(),
1225 crate::utils::now_unix_nanos()
1226 ));
1227 std::fs::create_dir_all(&p).unwrap();
1228 p.push("data.rdb");
1229 p
1230 }
1231
1232 fn drain(logger: &AuditLogger) {
1233 assert!(logger.wait_idle(Duration::from_secs(2)));
1234 }
1235
1236 #[test]
1237 fn record_writes_one_line_per_call() {
1238 let data = temp_data_path("one-line");
1239 let logger = AuditLogger::for_data_path(&data);
1240 logger.record(
1241 "admin/readonly",
1242 "operator",
1243 "instance",
1244 "ok",
1245 JsonValue::Null,
1246 );
1247 drain(&logger);
1248 let body = std::fs::read_to_string(logger.path()).unwrap();
1249 let lines: Vec<&str> = body.lines().collect();
1250 assert_eq!(lines.len(), 1);
1251 assert!(lines[0].contains("\"action\":\"admin/readonly\""));
1252 assert!(lines[0].contains("\"outcome\":\"success\""));
1253 }
1254
1255 #[test]
1256 fn record_appends_across_calls() {
1257 let data = temp_data_path("append");
1258 let logger = AuditLogger::for_data_path(&data);
1259 logger.record("admin/drain", "op", "instance", "ok", JsonValue::Null);
1260 logger.record("admin/shutdown", "op", "instance", "ok", JsonValue::Null);
1261 drain(&logger);
1262 let lines = std::fs::read_to_string(logger.path()).unwrap();
1263 assert_eq!(lines.lines().count(), 2);
1264 }
1265
1266 #[test]
1267 fn record_event_emits_full_schema() {
1268 let data = temp_data_path("schema");
1269 let logger = AuditLogger::for_data_path(&data);
1270 let mut detail = Map::new();
1271 detail.insert("ms".to_string(), JsonValue::Number(412.0));
1272 let ev = AuditEvent::builder("admin/shutdown")
1273 .principal("alice@acme")
1274 .source(AuditAuthSource::Session)
1275 .tenant("acme")
1276 .resource("instance")
1277 .outcome(Outcome::Success)
1278 .detail(JsonValue::Object(detail))
1279 .remote_addr("203.0.113.5")
1280 .correlation_id("req-42")
1281 .build();
1282 logger.record_event(ev);
1283 drain(&logger);
1284 let body = std::fs::read_to_string(logger.path()).unwrap();
1285 assert!(body.contains("\"action\":\"admin/shutdown\""));
1286 assert!(body.contains("\"principal\":\"alice@acme\""));
1287 assert!(body.contains("\"tenant\":\"acme\""));
1288 assert!(body.contains("\"source\":\"session\""));
1289 assert!(body.contains("\"correlation_id\":\"req-42\""));
1290 assert!(body.contains("\"remote_addr\":\"203.0.113.5\""));
1291 assert!(body.contains("\"event_id\":\""));
1292 assert!(body.contains("\"prev_hash\":") || body.lines().count() == 1);
1293 }
1294
1295 #[test]
1296 fn hash_chain_links_every_event() {
1297 let data = temp_data_path("chain");
1298 let logger = AuditLogger::for_data_path(&data);
1299 for i in 0..5 {
1300 logger.record_event(
1301 AuditEvent::builder(format!("test/event/{i}"))
1302 .principal("tester")
1303 .build(),
1304 );
1305 }
1306 drain(&logger);
1307 let body = std::fs::read_to_string(logger.path()).unwrap();
1308 let lines: Vec<&str> = body.lines().collect();
1309 assert_eq!(lines.len(), 5);
1310 let mut prev: Option<String> = None;
1311 for (idx, line) in lines.iter().enumerate() {
1312 let parsed: JsonValue = crate::json::from_str(line).unwrap();
1313 let stored_prev = parsed
1314 .get("prev_hash")
1315 .and_then(|v| v.as_str())
1316 .map(|s| s.to_string());
1317 assert_eq!(stored_prev, prev, "line {idx} prev_hash mismatch");
1318 prev = Some(crate::utils::to_hex(&sha256::sha256(line.as_bytes())));
1319 }
1320 }
1321
1322 #[test]
1323 fn legacy_record_back_compat_maps_outcomes() {
1324 let data = temp_data_path("legacy");
1325 let logger = AuditLogger::for_data_path(&data);
1326 logger.record(
1327 "admin/restore",
1328 "operator",
1329 "instance",
1330 "err: disk full",
1331 JsonValue::Null,
1332 );
1333 drain(&logger);
1334 let body = std::fs::read_to_string(logger.path()).unwrap();
1335 assert!(body.contains("\"outcome\":\"error\""));
1336 assert!(body.contains("\"result_text\":\"err: disk full\""));
1337 }
1338
1339 #[test]
1340 fn iso8601_formats_known_epoch() {
1341 assert_eq!(
1342 format_iso8601(1_709_210_096_789),
1343 "2024-02-29T12:34:56.789Z"
1344 );
1345 }
1346
/// Writes thirty padded events through a logger built with a 1024 max-bytes
/// setting and asserts that at least one file named `.audit.log.*.zst` shows
/// up next to the live log.
#[test]
fn rotation_at_threshold() {
    /// True for directory entries named like `.audit.log.<something>.zst`.
    fn is_rotated_archive(name: &str) -> bool {
        name.starts_with(".audit.log.") && name.ends_with(".zst")
    }

    let data = temp_data_path("rotate");
    let log_dir = data.parent().unwrap().to_path_buf();
    let logger = AuditLogger::with_max_bytes(log_dir.join(".audit.log"), 1024);

    let filler = "lorem ipsum dolor sit amet consectetur padding padding padding";
    for i in 0..30 {
        let event = AuditEvent::builder(format!("test/rotate/{i}"))
            .principal("rotator")
            .detail(JsonValue::String(filler.to_string()))
            .build();
        logger.record_event(event);
    }
    drain(&logger);

    // Scan the directory the logger actually reports, skipping unreadable
    // entries and names that are not valid UTF-8.
    let scan_dir = logger.path().parent().unwrap();
    let mut rotated = Vec::new();
    for entry in std::fs::read_dir(scan_dir).unwrap().flatten() {
        let matches = entry
            .file_name()
            .to_str()
            .map(is_rotated_archive)
            .unwrap_or(false);
        if matches {
            rotated.push(entry);
        }
    }

    assert!(
        !rotated.is_empty(),
        "expected at least one rotated .zst file"
    );
}
1380
/// Serializes an event with `to_json_line(None)`, parses it back with
/// `AuditEvent::parse_line`, and checks that action, principal, tenant,
/// outcome and source all come back unchanged.
#[test]
fn parse_line_round_trips() {
    let builder = AuditEvent::builder("auth/login.ok")
        .principal("alice")
        .source(AuditAuthSource::Password)
        .tenant("acme")
        .outcome(Outcome::Success);
    let original = builder.build();

    let serialized = original.to_json_line(None);
    let parsed = AuditEvent::parse_line(&serialized).expect("round-trip parse");

    assert_eq!(parsed.action, "auth/login.ok");
    let principal = parsed.principal.as_deref();
    assert_eq!(principal, Some("alice"));
    let tenant = parsed.tenant.as_deref();
    assert_eq!(tenant, Some("acme"));
    assert_eq!(parsed.outcome, Outcome::Success);
    assert_eq!(parsed.source, AuditAuthSource::Password);
}
1397
/// Generates two event ids 2 ms apart and asserts that the earlier one
/// compares strictly less than the later one.
#[test]
fn event_id_is_lexicographically_sortable_by_time() {
    // Gap between the two ids so they are not minted at the same instant.
    let gap = Duration::from_millis(2);

    let a = new_event_id();
    std::thread::sleep(gap);
    let b = new_event_id();

    assert!(a < b, "event_id ordering broken: {a} >= {b}");
}
1405
/// A field built from two string slices must report the given name and hold
/// its value in the `AuditValue::String` variant.
#[test]
fn audit_field_escaper_typed_string() {
    let field = AuditFieldEscaper::field("collection", "users");

    let name = field.name();
    assert_eq!(name, "collection");

    let value = field.value();
    match value {
        AuditValue::String(text) => assert_eq!(text, "users"),
        unexpected => panic!("expected String, got {:?}", unexpected),
    }
}
1419
/// A `Vec<u8>` field value must appear in the serialized row as the base64
/// string `3q2+7w==` (the encoding of DE AD BE EF).
#[test]
fn audit_field_escaper_bytes_emit_base64() {
    let payload: Vec<u8> = vec![0xDE, 0xAD, 0xBE, 0xEF];
    let line = AuditEvent::builder("test/bytes")
        .field(AuditFieldEscaper::field("blob", payload))
        .build()
        .to_json_line(None);

    let expected_fragment = r#""blob":"3q2+7w==""#;
    assert!(
        line.contains(expected_fragment),
        "expected base64 emission: {line}"
    );
}
1432
/// Integer, boolean and null field values must be serialized as bare JSON
/// literals rather than quoted strings.
#[test]
fn audit_field_escaper_number_bool_null() {
    let count = AuditFieldEscaper::field("count", 42_i64);
    let ok = AuditFieldEscaper::field("ok", true);
    let missing = AuditFieldEscaper::field("missing", AuditValue::Null);

    let event = AuditEvent::builder("test/types")
        .field(count)
        .field(ok)
        .field(missing)
        .build();
    let serialized = event.to_json_line(None);

    assert!(serialized.contains(r#""count":42"#));
    assert!(serialized.contains(r#""ok":true"#));
    assert!(serialized.contains(r#""missing":null"#));
}
1445
/// Feeds a corpus of hostile strings (CR/LF, NUL, quotes, control bytes,
/// backslashes, embedded JSON, empty) through a field value and checks for
/// each one that the serialized row stays on a single line, parses as JSON,
/// and yields the exact original payload under `detail.user_input`.
#[test]
fn audit_field_escaper_adversarial_corpus_preserves_structure() {
    let cases: &[(&str, &str)] = &[
        ("crlf", "line1\r\nline2"),
        ("nul", "before\0after"),
        ("quote", "she said \"hi\""),
        ("semicolon", "a;b;c"),
        ("json_in_json", r#"{"injected":"yes"}"#),
        ("low_ctrl", "\x01\x02\x03\x07\x1f"),
        ("backslash", "C:\\path\\file"),
        ("mixed", "name=\"x\"\n\\path\t\x01end"),
        ("empty", ""),
    ];

    // Counts payloads that made it through every check below.
    let mut survivors = 0usize;
    for &(label, payload) in cases {
        let line = AuditEvent::builder(format!("test/adv/{label}"))
            .principal("attacker")
            .field(AuditFieldEscaper::field("user_input", payload))
            .build()
            .to_json_line(None);

        // A raw newline would split one event across two JSONL rows.
        assert!(
            !line.contains('\n'),
            "{label}: embedded newline in JSONL row: {line:?}"
        );

        let parsed: JsonValue = match crate::json::from_str(&line) {
            Ok(value) => value,
            Err(err) => panic!("{label}: line did not parse: {err} :: {line:?}"),
        };
        let detail = parsed.get("detail").expect("detail present");
        let recovered = detail.get("user_input").and_then(|v| v.as_str()).unwrap();
        assert_eq!(
            recovered, payload,
            "{label}: round-trip mismatch: {recovered:?} != {payload:?}"
        );

        survivors += 1;
    }

    assert_eq!(
        survivors,
        cases.len(),
        "adversarial corpus survival rate: {survivors}/{}",
        cases.len()
    );
}
1501
/// Sends one event whose field value contains a quote, CR/LF, embedded JSON
/// and a NUL through the logger, then checks that the file holds exactly one
/// row and that the value read back from `detail.collection` is unchanged.
#[test]
fn audit_emission_emits_one_line_per_call_through_guard() {
    let data_path = temp_data_path("guard-emission");
    let logger = AuditLogger::for_data_path(&data_path);

    let attacker = "users\";DROP\r\n{\"x\":1}\0";
    let event = AuditEvent::builder("admin/scan")
        .principal("evil")
        .field(AuditFieldEscaper::field("collection", attacker))
        .build();
    logger.record_event(event);
    drain(&logger);

    let contents = std::fs::read_to_string(logger.path()).unwrap();
    let rows: Vec<&str> = contents.lines().collect();
    assert_eq!(rows.len(), 1, "guard must emit exactly one JSONL row");

    let parsed: JsonValue = crate::json::from_str(rows[0]).unwrap();
    let detail = parsed.get("detail");
    let recovered = detail
        .and_then(|d| d.get("collection"))
        .and_then(|v| v.as_str())
        .unwrap();
    assert_eq!(recovered, attacker);
}
1527
/// Smoke test: building a field from two string literals must compile and
/// must not panic.
///
/// NOTE(review): the test name implies a guarantee about format macros that
/// this body does not check at runtime; presumably that is enforced
/// elsewhere. Confirm.
#[test]
fn audit_field_escaper_no_format_macro_in_value_path() {
    let _field = AuditFieldEscaper::field("name", "value");
}
1540
/// Passes three fields of different value types to the builder in a single
/// `fields` call and checks that each one is readable, with its type intact,
/// from the parsed row's `detail` object.
#[test]
fn audit_field_escaper_chains_via_builder_fields() {
    let batch = [
        AuditFieldEscaper::field("a", "x"),
        AuditFieldEscaper::field("b", 7_i64),
        AuditFieldEscaper::field("c", true),
    ];
    let serialized = AuditEvent::builder("test/multi")
        .fields(batch)
        .build()
        .to_json_line(None);

    let parsed: JsonValue = crate::json::from_str(&serialized).unwrap();
    let detail = parsed.get("detail").unwrap();

    let a = detail.get("a").and_then(|v| v.as_str());
    assert_eq!(a, Some("x"));
    let b = detail.get("b").and_then(|v| v.as_i64());
    assert_eq!(b, Some(7));
    let c = detail.get("c").and_then(|v| v.as_bool());
    assert_eq!(c, Some(true));
}
1557
1558 proptest::proptest! {
1559 #[test]
1571 fn prop_audit_field_round_trips_arbitrary_strings(
1572 payload in proptest::string::string_regex("[\\x00-\\x7f]{0,128}").unwrap()
1573 ) {
1574 let f = AuditFieldEscaper::field("p", payload.as_str());
1575 let ev = AuditEvent::builder("prop/test").field(f).build();
1576 let line = ev.to_json_line(None);
1577 proptest::prop_assert!(!line.contains('\n'));
1579 let parsed: JsonValue = crate::json::from_str(&line)
1580 .expect("emission must always parse");
1581 let recovered = parsed
1582 .get("detail")
1583 .and_then(|d| d.get("p"))
1584 .and_then(|v| v.as_str())
1585 .unwrap();
1586 proptest::prop_assert_eq!(recovered, payload.as_str());
1587 }
1588
1589 #[test]
1592 fn prop_audit_field_bytes_base64_round_trip(
1593 bytes in proptest::collection::vec(proptest::bits::u8::ANY, 0..64)
1594 ) {
1595 let f = AuditFieldEscaper::field("b", bytes.clone());
1596 let ev = AuditEvent::builder("prop/bytes").field(f).build();
1597 let line = ev.to_json_line(None);
1598 proptest::prop_assert!(!line.contains('\n'));
1599 let parsed: JsonValue = crate::json::from_str(&line).unwrap();
1600 let recovered_b64 = parsed
1601 .get("detail")
1602 .and_then(|d| d.get("b"))
1603 .and_then(|v| v.as_str())
1604 .unwrap()
1605 .to_string();
1606 proptest::prop_assert_eq!(recovered_b64, base64_encode(&bytes));
1607 }
1608 }
1609}