1use std::collections::{HashMap, VecDeque};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Condvar, Mutex};
17use std::thread;
18use std::time::{Duration, Instant};
19
20use super::operator_event::OperatorEvent;
21use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditLogger, Outcome};
22
/// Handler names accepted in `default_handlers` and `variant_routes`.
///
/// The order is significant only for display: the `UnknownHandler` error
/// message joins this list with `", "`.
const KNOWN_HANDLERS: &[&str] = &[
    "audit_log",
    "tracing",
    "stderr",
    "pagerduty",
    "generic_webhook",
];
34
35fn closest_match(input: &str, candidates: &[&str]) -> Option<String> {
36 candidates
37 .iter()
38 .map(|c| (*c, strsim::levenshtein(input, c)))
39 .min_by_key(|(_, d)| *d)
40 .filter(|(_, d)| *d <= 4)
41 .map(|(c, _)| c.to_string())
42}
43
44fn validate_handler_names(names: &[String]) -> Result<(), ConfigError> {
45 for name in names {
46 if !KNOWN_HANDLERS.contains(&name.as_str()) {
47 return Err(ConfigError::UnknownHandler { key: name.clone() });
48 }
49 }
50 Ok(())
51}
52
/// Rate-limit settings for one webhook handler.
///
/// `TokenBucket::new` converts this into a refill rate of
/// `requests / window_sec` tokens per second.
#[derive(Clone, Debug, Default)]
pub struct RateLimitConfig {
    /// Number of requests allowed per window.
    pub requests: u32,
    /// Window length in seconds. When this is `0`, `TokenBucket::new` uses
    /// `requests` directly as the per-second rate.
    pub window_sec: u64,
}
64
65#[derive(Debug, Clone)]
67pub struct WebhookHandlerConfig {
68 pub url: String,
69 pub auth_env: String,
72 pub rate_limit: Option<RateLimitConfig>,
73}
74
75#[derive(Debug, Default)]
77pub struct RouterConfig {
78 pub default_handlers: Option<Vec<String>>,
81 pub variant_routes: HashMap<String, Vec<String>>,
83 pub pagerduty: Option<WebhookHandlerConfig>,
85 pub generic_webhook: Option<WebhookHandlerConfig>,
87}
88
89#[derive(Debug)]
94pub enum ConfigError {
95 UnknownVariant {
96 key: String,
97 suggestion: Option<String>,
98 },
99 UnknownHandler {
100 key: String,
101 },
102 MissingEnvVar {
103 handler: String,
104 var: String,
105 },
106}
107
108impl std::fmt::Display for ConfigError {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 match self {
111 Self::UnknownVariant { key, suggestion } => {
112 write!(f, "unknown OperatorEvent variant '{key}'")?;
113 if let Some(s) = suggestion {
114 write!(f, "; did you mean '{s}'?")?;
115 }
116 Ok(())
117 }
118 Self::UnknownHandler { key } => write!(
119 f,
120 "unknown handler name '{key}'; known: {}",
121 KNOWN_HANDLERS.join(", ")
122 ),
123 Self::MissingEnvVar { handler, var } => write!(
124 f,
125 "handler '{handler}' requires env var '{var}' which is not set"
126 ),
127 }
128 }
129}
130
131impl std::error::Error for ConfigError {}
132
133#[derive(Debug)]
138struct TokenBucket {
139 tokens: f64,
140 rate: f64,
141 burst: f64,
142 last: Instant,
143}
144
145impl TokenBucket {
146 fn new(cfg: &RateLimitConfig) -> Self {
147 let rate = if cfg.window_sec > 0 {
148 cfg.requests as f64 / cfg.window_sec as f64
149 } else {
150 cfg.requests as f64
151 };
152 let burst = rate.max(1.0);
153 Self {
154 tokens: burst,
155 rate,
156 burst,
157 last: Instant::now(),
158 }
159 }
160
161 fn try_consume(&mut self) -> bool {
162 let now = Instant::now();
163 let elapsed = now.saturating_duration_since(self.last).as_secs_f64();
164 self.tokens = (self.tokens + elapsed * self.rate).min(self.burst);
165 self.last = now;
166 if self.tokens >= 1.0 {
167 self.tokens -= 1.0;
168 true
169 } else {
170 false
171 }
172 }
173}
174
175const QUEUE_CAPACITY: usize = 1_000;
180
181#[derive(Debug)]
182struct WebhookQueue {
183 inner: Mutex<VecDeque<WebhookPayload>>,
184 not_empty: Condvar,
185 dropped_queue_full: AtomicU64,
186 dropped_rate_limit: AtomicU64,
187 dropped_max_retries: AtomicU64,
188 sent: AtomicU64,
189}
190
191impl WebhookQueue {
192 fn new() -> Arc<Self> {
193 Arc::new(Self {
194 inner: Mutex::new(VecDeque::with_capacity(QUEUE_CAPACITY)),
195 not_empty: Condvar::new(),
196 dropped_queue_full: AtomicU64::new(0),
197 dropped_rate_limit: AtomicU64::new(0),
198 dropped_max_retries: AtomicU64::new(0),
199 sent: AtomicU64::new(0),
200 })
201 }
202
203 fn push(&self, payload: WebhookPayload) {
204 let mut q = self.inner.lock().expect("webhook queue mutex");
205 if q.len() >= QUEUE_CAPACITY {
206 q.pop_front(); self.dropped_queue_full.fetch_add(1, Ordering::Relaxed);
208 }
209 q.push_back(payload);
210 drop(q);
211 self.not_empty.notify_one();
212 }
213
214 fn pop_blocking(&self) -> WebhookPayload {
215 let mut q = self.inner.lock().expect("webhook queue mutex");
216 loop {
217 if let Some(item) = q.pop_front() {
218 return item;
219 }
220 q = self.not_empty.wait(q).expect("webhook queue condvar");
221 }
222 }
223}
224
225#[derive(Clone, Debug)]
230struct WebhookPayload {
231 action: String,
232 summary: String,
233 ts_ms: u64,
234}
235
236impl WebhookPayload {
237 fn to_json_body(&self) -> String {
238 use crate::serde_json::Value;
241 let event_json = Value::String(self.action.clone()).to_string_compact();
242 let summary_json = Value::String(self.summary.clone()).to_string_compact();
243 format!(
244 r#"{{"event":{event_json},"summary":{summary_json},"ts":{}}}"#,
245 self.ts_ms
246 )
247 }
248}
249
250fn spawn_webhook_worker(
255 name: &str,
256 url: String,
257 auth_token: String,
258 queue: Arc<WebhookQueue>,
259) -> thread::JoinHandle<()> {
260 let name = name.to_string();
261 thread::Builder::new()
262 .name(format!("reddb-webhook-{name}"))
263 .spawn(move || {
264 let agent: ureq::Agent = ureq::Agent::config_builder()
265 .timeout_connect(Some(Duration::from_secs(3)))
266 .timeout_send_request(Some(Duration::from_secs(5)))
267 .timeout_recv_response(Some(Duration::from_secs(5)))
268 .http_status_as_error(false)
269 .build()
270 .into();
271
272 loop {
273 let payload = queue.pop_blocking();
274 let body = payload.to_json_body();
275 let bearer = format!("Bearer {auth_token}");
276
277 let mut success = false;
278 for attempt in 1u32..=3 {
279 let result = agent
280 .post(&url)
281 .header("content-type", "application/json")
282 .header("authorization", &bearer)
283 .send(body.as_bytes());
284
285 match result {
286 Ok(_) => {
287 queue.sent.fetch_add(1, Ordering::Relaxed);
288 success = true;
289 break;
290 }
291 Err(_) if attempt < 3 => {
292 thread::sleep(Duration::from_millis(100 * (1u64 << attempt)));
293 }
294 Err(_) => {}
295 }
296 }
297
298 if !success {
299 queue.dropped_max_retries.fetch_add(1, Ordering::Relaxed);
300 tracing::warn!(
301 target: "reddb::operator_router",
302 handler = %name,
303 "webhook delivery failed after 3 attempts; event dropped"
304 );
305 }
306 }
307 })
308 .expect("spawn webhook worker thread")
309}
310
311#[derive(Debug)]
316enum EffectiveHandler {
317 AuditLog,
318 Tracing,
319 Stderr,
320 Webhook {
321 name: String,
322 queue: Arc<WebhookQueue>,
323 rate_limiter: Option<Mutex<TokenBucket>>,
324 },
325}
326
327impl EffectiveHandler {
328 fn name(&self) -> &str {
329 match self {
330 Self::AuditLog => "audit_log",
331 Self::Tracing => "tracing",
332 Self::Stderr => "stderr",
333 Self::Webhook { name, .. } => name,
334 }
335 }
336}
337
/// Point-in-time copy of the per-webhook delivery counters.
#[derive(Debug, Default)]
pub struct RouterMetricsSnapshot {
    /// `(handler name, events dropped)` for each webhook handler. The count
    /// is the sum of queue-full, rate-limit and retry-exhaustion drops.
    pub dropped: Vec<(String, u64)>,
    /// `(handler name, events delivered)` for each webhook handler.
    pub sent: Vec<(String, u64)>,
}
350
351#[derive(Debug)]
362pub struct OperatorEventRouter {
363 audit_logger: Option<Arc<AuditLogger>>,
364 default_route: Vec<Arc<EffectiveHandler>>,
365 variant_routes: HashMap<&'static str, Vec<Arc<EffectiveHandler>>>,
366 webhook_queues: HashMap<String, Arc<WebhookQueue>>,
367 _workers: Mutex<Vec<thread::JoinHandle<()>>>,
370}
371
372impl OperatorEventRouter {
373 pub fn new(
381 config: RouterConfig,
382 audit_logger: Option<Arc<AuditLogger>>,
383 ) -> Result<Self, ConfigError> {
384 let known_variants = OperatorEvent::all_variant_names();
385
386 for key in config.variant_routes.keys() {
388 if !known_variants.contains(&key.as_str()) {
389 let suggestion = closest_match(key, known_variants);
390 return Err(ConfigError::UnknownVariant {
391 key: key.clone(),
392 suggestion,
393 });
394 }
395 }
396
397 for names in config.variant_routes.values() {
399 validate_handler_names(names)?;
400 }
401 if let Some(ref dh) = config.default_handlers {
402 validate_handler_names(dh)?;
403 }
404
405 let mut webhook_queues: HashMap<String, Arc<WebhookQueue>> = HashMap::new();
407 let mut workers: Vec<thread::JoinHandle<()>> = Vec::new();
408
409 let pd_handler = config
410 .pagerduty
411 .as_ref()
412 .map(|cfg| -> Result<Arc<EffectiveHandler>, ConfigError> {
413 let token =
414 std::env::var(&cfg.auth_env).map_err(|_| ConfigError::MissingEnvVar {
415 handler: "pagerduty".into(),
416 var: cfg.auth_env.clone(),
417 })?;
418 let queue = WebhookQueue::new();
419 webhook_queues.insert("pagerduty".into(), Arc::clone(&queue));
420 workers.push(spawn_webhook_worker(
421 "pagerduty",
422 cfg.url.clone(),
423 token,
424 Arc::clone(&queue),
425 ));
426 let rate_limiter = cfg
427 .rate_limit
428 .as_ref()
429 .map(|rl| Mutex::new(TokenBucket::new(rl)));
430 Ok(Arc::new(EffectiveHandler::Webhook {
431 name: "pagerduty".into(),
432 queue,
433 rate_limiter,
434 }))
435 })
436 .transpose()?;
437
438 let gw_handler = config
439 .generic_webhook
440 .as_ref()
441 .map(|cfg| -> Result<Arc<EffectiveHandler>, ConfigError> {
442 let token =
443 std::env::var(&cfg.auth_env).map_err(|_| ConfigError::MissingEnvVar {
444 handler: "generic_webhook".into(),
445 var: cfg.auth_env.clone(),
446 })?;
447 let queue = WebhookQueue::new();
448 webhook_queues.insert("generic_webhook".into(), Arc::clone(&queue));
449 workers.push(spawn_webhook_worker(
450 "generic_webhook",
451 cfg.url.clone(),
452 token,
453 Arc::clone(&queue),
454 ));
455 let rate_limiter = cfg
456 .rate_limit
457 .as_ref()
458 .map(|rl| Mutex::new(TokenBucket::new(rl)));
459 Ok(Arc::new(EffectiveHandler::Webhook {
460 name: "generic_webhook".into(),
461 queue,
462 rate_limiter,
463 }))
464 })
465 .transpose()?;
466
467 let resolve = |names: &[String]| -> Vec<Arc<EffectiveHandler>> {
471 names
472 .iter()
473 .filter_map(|n| match n.as_str() {
474 "audit_log" => Some(Arc::new(EffectiveHandler::AuditLog)),
475 "tracing" => Some(Arc::new(EffectiveHandler::Tracing)),
476 "stderr" => Some(Arc::new(EffectiveHandler::Stderr)),
477 "pagerduty" => pd_handler.clone(),
478 "generic_webhook" => gw_handler.clone(),
479 _ => None,
480 })
481 .collect()
482 };
483
484 let code_default = vec!["audit_log".to_string(), "tracing".to_string()];
485 let default_names = config.default_handlers.as_deref().unwrap_or(&code_default);
486 let default_route = resolve(default_names);
487
488 let mut variant_routes: HashMap<&'static str, Vec<Arc<EffectiveHandler>>> = HashMap::new();
489 for (key, names) in &config.variant_routes {
490 if let Some(static_key) = known_variants.iter().copied().find(|v| *v == key.as_str()) {
491 variant_routes.insert(static_key, resolve(names));
492 }
493 }
494
495 Ok(Self {
496 audit_logger,
497 default_route,
498 variant_routes,
499 webhook_queues,
500 _workers: Mutex::new(workers),
501 })
502 }
503
504 pub fn route(&self, event: OperatorEvent) {
509 let variant = event.variant_name();
510 let handlers = self
511 .variant_routes
512 .get(variant)
513 .unwrap_or(&self.default_route);
514
515 let (action, fields, summary) = event.decompose();
516 let ts_ms = crate::utils::now_unix_millis();
517
518 for handler in handlers {
519 match handler.as_ref() {
520 EffectiveHandler::AuditLog => {
521 if let Some(audit) = &self.audit_logger {
522 let ev = AuditEvent::builder(action)
523 .source(AuditAuthSource::System)
524 .outcome(Outcome::Error)
525 .fields(fields.clone())
526 .build();
527 audit.record_event(ev);
528 }
529 }
530 EffectiveHandler::Tracing => {
531 tracing::warn!(target: "reddb::operator", "{summary}");
532 }
533 EffectiveHandler::Stderr => {
534 eprintln!("[reddb::operator] {summary}");
535 }
536 EffectiveHandler::Webhook {
537 name,
538 queue,
539 rate_limiter,
540 } => {
541 if let Some(rl) = rate_limiter {
542 let allowed = rl.lock().expect("rate limiter mutex").try_consume();
543 if !allowed {
544 queue.dropped_rate_limit.fetch_add(1, Ordering::Relaxed);
545 tracing::debug!(
546 target: "reddb::operator_router",
547 handler = %name,
548 "event rate-limited; skipping webhook"
549 );
550 continue;
551 }
552 }
553 queue.push(WebhookPayload {
554 action: action.to_string(),
555 summary: summary.clone(),
556 ts_ms,
557 });
558 }
559 }
560 }
561 }
562
563 pub fn metrics(&self) -> RouterMetricsSnapshot {
565 let mut snap = RouterMetricsSnapshot::default();
566 for (name, q) in &self.webhook_queues {
567 let dropped = q.dropped_queue_full.load(Ordering::Relaxed)
568 + q.dropped_rate_limit.load(Ordering::Relaxed)
569 + q.dropped_max_retries.load(Ordering::Relaxed);
570 snap.dropped.push((name.clone(), dropped));
571 snap.sent
572 .push((name.clone(), q.sent.load(Ordering::Relaxed)));
573 }
574 snap
575 }
576}
577
578#[cfg(test)]
583mod tests {
584 use std::io::Read;
585 use std::net::TcpListener;
586 use std::sync::Arc;
587
588 use super::*;
589 use crate::runtime::audit_log::AuditLogger;
590
591 fn make_audit_logger() -> (Arc<AuditLogger>, std::path::PathBuf) {
592 let mut dir = std::env::temp_dir();
593 dir.push(format!(
594 "reddb-router-test-{}-{}",
595 std::process::id(),
596 crate::utils::now_unix_nanos()
597 ));
598 std::fs::create_dir_all(&dir).unwrap();
599 let path = dir.join(".audit.log");
600 let logger = Arc::new(AuditLogger::with_path(path.clone()));
601 (logger, path)
602 }
603
604 fn drain(logger: &AuditLogger) {
605 assert!(
606 logger.wait_idle(Duration::from_secs(2)),
607 "audit logger drain timed out"
608 );
609 }
610
611 fn last_audit_action(path: &std::path::Path) -> Option<String> {
612 let body = std::fs::read_to_string(path).ok()?;
613 let line = body.lines().last()?;
614 let v: crate::serde_json::Value = crate::serde_json::from_str(line).ok()?;
615 v.get("action")
616 .and_then(|x| x.as_str())
617 .map(|s| s.to_string())
618 }
619
/// With an empty config every event takes the built-in default route, so
/// after routing a sample of variants the audit log's last line must carry an
/// `operator/…` action.
///
/// NOTE(review): the sample below does not include `DanglingAdminIntent` or
/// `QueueDlqPromoted`, and only the last audit line is inspected.
#[test]
fn empty_config_routes_all_variants_to_audit_and_tracing() {
    let (audit, path) = make_audit_logger();
    let router = OperatorEventRouter::new(RouterConfig::default(), Some(Arc::clone(&audit)))
        .expect("router build");

    let variants = [
        OperatorEvent::ReplicationBroken {
            peer: "p".into(),
            reason: "r".into(),
        },
        OperatorEvent::Divergence {
            peer: "p".into(),
            leader_lsn: 1,
            follower_lsn: 0,
        },
        OperatorEvent::WalFsyncFailed {
            path: "/d".into(),
            error: "e".into(),
        },
        OperatorEvent::DiskSpaceCritical {
            path: "/d".into(),
            available_bytes: 1,
            threshold_bytes: 2,
        },
        OperatorEvent::AuthBypass {
            principal: "a".into(),
            resource: "r".into(),
            detail: "d".into(),
        },
        OperatorEvent::AdminCapabilityGranted {
            granted_to: "a".into(),
            capability: "c".into(),
            granted_by: "b".into(),
        },
        OperatorEvent::SecretRotationFailed {
            secret_ref: "s".into(),
            error: "e".into(),
        },
        OperatorEvent::ConfigChanged {
            key: "k".into(),
            old_value: "o".into(),
            new_value: "n".into(),
            changed_by: "b".into(),
        },
        OperatorEvent::StartupFailed {
            phase: "p".into(),
            error: "e".into(),
        },
        OperatorEvent::ShutdownForced { reason: "r".into() },
        OperatorEvent::SchemaCorruption {
            collection: "c".into(),
            detail: "d".into(),
        },
        OperatorEvent::CheckpointFailed {
            lsn: 1,
            error: "e".into(),
        },
        OperatorEvent::ConfigChangeRequiresRestart {
            fields_changed: "f".into(),
        },
        OperatorEvent::SubscriptionSchemaChange {
            collection: "c".into(),
            subscription_names: "sub1".into(),
            fields_added: "phone".into(),
            fields_removed: "".into(),
            lsn: 42,
        },
        OperatorEvent::OutboxDlqActivated {
            queue: "users_events".into(),
            dlq: "users_events_outbox_dlq".into(),
            reason: "queue_full".into(),
        },
    ];

    // `route` takes the event by value, so hand it a copy of each sample.
    for event in &variants {
        router.route(clone_event(event));
    }

    drain(&audit);
    let action = last_audit_action(&path).expect("at least one audit line");
    assert!(action.starts_with("operator/"), "action={action}");
}
713
714 fn clone_event(e: &OperatorEvent) -> OperatorEvent {
716 match e {
717 OperatorEvent::ReplicationBroken { peer, reason } => OperatorEvent::ReplicationBroken {
718 peer: peer.clone(),
719 reason: reason.clone(),
720 },
721 OperatorEvent::Divergence {
722 peer,
723 leader_lsn,
724 follower_lsn,
725 } => OperatorEvent::Divergence {
726 peer: peer.clone(),
727 leader_lsn: *leader_lsn,
728 follower_lsn: *follower_lsn,
729 },
730 OperatorEvent::WalFsyncFailed { path, error } => OperatorEvent::WalFsyncFailed {
731 path: path.clone(),
732 error: error.clone(),
733 },
734 OperatorEvent::DiskSpaceCritical {
735 path,
736 available_bytes,
737 threshold_bytes,
738 } => OperatorEvent::DiskSpaceCritical {
739 path: path.clone(),
740 available_bytes: *available_bytes,
741 threshold_bytes: *threshold_bytes,
742 },
743 OperatorEvent::AuthBypass {
744 principal,
745 resource,
746 detail,
747 } => OperatorEvent::AuthBypass {
748 principal: principal.clone(),
749 resource: resource.clone(),
750 detail: detail.clone(),
751 },
752 OperatorEvent::AdminCapabilityGranted {
753 granted_to,
754 capability,
755 granted_by,
756 } => OperatorEvent::AdminCapabilityGranted {
757 granted_to: granted_to.clone(),
758 capability: capability.clone(),
759 granted_by: granted_by.clone(),
760 },
761 OperatorEvent::SecretRotationFailed { secret_ref, error } => {
762 OperatorEvent::SecretRotationFailed {
763 secret_ref: secret_ref.clone(),
764 error: error.clone(),
765 }
766 }
767 OperatorEvent::ConfigChanged {
768 key,
769 old_value,
770 new_value,
771 changed_by,
772 } => OperatorEvent::ConfigChanged {
773 key: key.clone(),
774 old_value: old_value.clone(),
775 new_value: new_value.clone(),
776 changed_by: changed_by.clone(),
777 },
778 OperatorEvent::StartupFailed { phase, error } => OperatorEvent::StartupFailed {
779 phase: phase.clone(),
780 error: error.clone(),
781 },
782 OperatorEvent::ShutdownForced { reason } => OperatorEvent::ShutdownForced {
783 reason: reason.clone(),
784 },
785 OperatorEvent::SchemaCorruption { collection, detail } => {
786 OperatorEvent::SchemaCorruption {
787 collection: collection.clone(),
788 detail: detail.clone(),
789 }
790 }
791 OperatorEvent::CheckpointFailed { lsn, error } => OperatorEvent::CheckpointFailed {
792 lsn: *lsn,
793 error: error.clone(),
794 },
795 OperatorEvent::ConfigChangeRequiresRestart { fields_changed } => {
796 OperatorEvent::ConfigChangeRequiresRestart {
797 fields_changed: fields_changed.clone(),
798 }
799 }
800 OperatorEvent::DanglingAdminIntent { .. } => {
801 OperatorEvent::ShutdownForced {
803 reason: "clone_placeholder".into(),
804 }
805 }
806 OperatorEvent::SubscriptionSchemaChange {
807 collection,
808 subscription_names,
809 fields_added,
810 fields_removed,
811 lsn,
812 } => OperatorEvent::SubscriptionSchemaChange {
813 collection: collection.clone(),
814 subscription_names: subscription_names.clone(),
815 fields_added: fields_added.clone(),
816 fields_removed: fields_removed.clone(),
817 lsn: *lsn,
818 },
819 OperatorEvent::OutboxDlqActivated { queue, dlq, reason } => {
820 OperatorEvent::OutboxDlqActivated {
821 queue: queue.clone(),
822 dlq: dlq.clone(),
823 reason: reason.clone(),
824 }
825 }
826 OperatorEvent::QueueDlqPromoted {
827 queue,
828 group,
829 dlq,
830 message_id,
831 attempts,
832 reason,
833 } => OperatorEvent::QueueDlqPromoted {
834 queue: queue.clone(),
835 group: group.clone(),
836 dlq: dlq.clone(),
837 message_id: *message_id,
838 attempts: *attempts,
839 reason: reason.clone(),
840 },
841 }
842 }
843
/// A misspelt variant key is rejected, and the error suggests the nearest
/// real variant name.
#[test]
fn unknown_variant_gives_suggestion() {
    let config = RouterConfig {
        variant_routes: HashMap::from([("AuthBypas".to_string(), vec!["audit_log".to_string()])]),
        ..RouterConfig::default()
    };

    match OperatorEventRouter::new(config, None).unwrap_err() {
        ConfigError::UnknownVariant { key, suggestion } => {
            assert_eq!(key, "AuthBypas");
            assert_eq!(suggestion.as_deref(), Some("AuthBypass"));
        }
        other => panic!("expected UnknownVariant, got: {other}"),
    }
}
865
/// A route that names a handler outside `KNOWN_HANDLERS` fails construction.
#[test]
fn unknown_handler_name_is_rejected() {
    let config = RouterConfig {
        variant_routes: HashMap::from([("AuthBypass".to_string(), vec!["slack".to_string()])]),
        ..RouterConfig::default()
    };

    let err = OperatorEventRouter::new(config, None).unwrap_err();
    assert!(matches!(err, ConfigError::UnknownHandler { .. }));
}
876
/// A per-variant route replaces the default route for that variant only:
/// `AuthBypass` goes to stderr alone, while `ShutdownForced` still reaches
/// the audit log through the default route.
#[test]
fn per_variant_route_overrides_default() {
    let (audit, path) = make_audit_logger();
    let config = RouterConfig {
        variant_routes: HashMap::from([("AuthBypass".to_string(), vec!["stderr".to_string()])]),
        ..RouterConfig::default()
    };
    let router = OperatorEventRouter::new(config, Some(Arc::clone(&audit))).unwrap();

    // Overridden variant: nothing about it may show up in the audit file.
    router.route(OperatorEvent::AuthBypass {
        principal: "test".into(),
        resource: "/secret".into(),
        detail: "test override".into(),
    });
    drain(&audit);
    // The file may not exist yet; treat that the same as an empty log.
    let body = std::fs::read_to_string(&path).unwrap_or_default();
    let mentions_bypass = body.lines().any(|l| l.contains("auth_bypass"));
    assert!(
        !mentions_bypass,
        "auth_bypass should not appear in audit (stderr-only route)"
    );

    // Any other variant still takes the default route.
    router.route(OperatorEvent::ShutdownForced {
        reason: "test".into(),
    });
    drain(&audit);
    let action = last_audit_action(&path).expect("shutdown_forced in audit");
    assert_eq!(action, "operator/shutdown_forced");
}
913
/// 3 requests per 60 s is a rate of 0.05 tokens/s, and the burst size is
/// `max(rate, 1.0)`, so the bucket starts with exactly one token.
#[test]
fn token_bucket_throttles_after_burst() {
    let cfg = RateLimitConfig {
        requests: 3,
        window_sec: 60,
    };
    let mut bucket = TokenBucket::new(&cfg);

    let first = bucket.try_consume();
    assert!(first, "first consume should succeed");

    let second = bucket.try_consume();
    assert!(!second, "second consume should be throttled");
}
930
/// After the 100-token burst is spent, a 20 ms pause at 100 tokens/s yields
/// enough refill for another consume.
#[test]
fn token_bucket_refills_over_time() {
    let cfg = RateLimitConfig {
        requests: 100,
        window_sec: 1,
    };
    let mut bucket = TokenBucket::new(&cfg);

    (0..100).for_each(|_| assert!(bucket.try_consume()));
    assert!(!bucket.try_consume(), "burst exhausted");

    thread::sleep(Duration::from_millis(20));
    assert!(bucket.try_consume(), "should refill after sleep");
}
946
/// Pushing onto a full queue discards the oldest payload and counts the drop.
#[test]
fn queue_drops_oldest_on_saturation() {
    let queue = WebhookQueue::new();
    let drops = || queue.dropped_queue_full.load(Ordering::Relaxed);

    // Fill to capacity: nothing is dropped yet.
    (0..QUEUE_CAPACITY).for_each(|i| {
        queue.push(WebhookPayload {
            action: format!("ev/{i}"),
            summary: format!("s{i}"),
            ts_ms: i as u64,
        })
    });
    assert_eq!(drops(), 0);

    // One more evicts "ev/0".
    queue.push(WebhookPayload {
        action: "ev/overflow".into(),
        summary: "overflow".into(),
        ts_ms: QUEUE_CAPACITY as u64,
    });
    assert_eq!(drops(), 1);

    let oldest_remaining = queue.pop_blocking();
    assert_eq!(oldest_remaining.action, "ev/1");
}
975
/// End-to-end check: an event routed to `generic_webhook` reaches a local
/// mock HTTP server with the bearer token and the event name in the body.
///
/// The mock server keeps reading until the whole request (headers plus
/// `content-length` body bytes) has arrived, because a single `read` may
/// return the headers without the body.
#[test]
fn webhook_delivers_payload_to_mock_server() {
    /// True once `raw` holds a complete header block and at least
    /// `content-length` bytes of body.
    fn request_complete(raw: &[u8]) -> bool {
        let header_end = match raw.windows(4).position(|w| w == b"\r\n\r\n") {
            Some(pos) => pos,
            None => return false,
        };
        let headers = String::from_utf8_lossy(&raw[..header_end]);
        let content_length = headers.lines().find_map(|line| {
            let (name, value) = line.split_once(':')?;
            if name.trim().eq_ignore_ascii_case("content-length") {
                value.trim().parse::<usize>().ok()
            } else {
                None
            }
        });
        match content_length {
            Some(len) => raw.len() - (header_end + 4) >= len,
            // No declared length: keep reading until EOF or the timeout.
            None => false,
        }
    }

    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    let url = format!("http://{addr}/webhook");

    let server_thread = thread::spawn(move || {
        let (mut stream, _) = listener.accept().unwrap();
        // Bound the wait so an incomplete request cannot hang the test.
        stream
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let mut received = Vec::new();
        let mut chunk = [0u8; 4096];
        while !request_complete(&received) {
            match stream.read(&mut chunk) {
                Ok(0) | Err(_) => break,
                Ok(n) => received.extend_from_slice(&chunk[..n]),
            }
        }

        // Answer so the worker sees a successful delivery instead of waiting
        // out its receive timeout; a failed write does not affect the checks.
        let _ = std::io::Write::write_all(
            &mut stream,
            b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n",
        );
        String::from_utf8_lossy(&received).into_owned()
    });

    // The variable name is unique to this test to avoid clashing with others.
    std::env::set_var("TEST_WEBHOOK_TOKEN_ROUTER", "test-token-42");

    let config = RouterConfig {
        default_handlers: None,
        variant_routes: {
            let mut m = HashMap::new();
            m.insert("ShutdownForced".into(), vec!["generic_webhook".into()]);
            m
        },
        pagerduty: None,
        generic_webhook: Some(WebhookHandlerConfig {
            url,
            auth_env: "TEST_WEBHOOK_TOKEN_ROUTER".into(),
            rate_limit: None,
        }),
    };

    let router = OperatorEventRouter::new(config, None).unwrap();
    router.route(OperatorEvent::ShutdownForced {
        reason: "integration-test".into(),
    });

    let raw = server_thread.join().expect("server thread");
    assert!(raw.contains("Bearer test-token-42"), "missing auth header");
    assert!(raw.contains("shutdown_forced"), "missing event in body");
}
1023
/// Sixteen threads each route fifty events through one shared router; the
/// test passes if none of them panics.
#[test]
fn concurrent_route_calls_safe() {
    let router = Arc::new(OperatorEventRouter::new(RouterConfig::default(), None).unwrap());

    let mut handles = Vec::with_capacity(16);
    for _ in 0..16 {
        let r = Arc::clone(&router);
        handles.push(thread::spawn(move || {
            (0..50).for_each(|_| {
                r.route(OperatorEvent::ShutdownForced {
                    reason: "stress".into(),
                })
            });
        }));
    }

    for h in handles {
        h.join().unwrap();
    }
}
1048
/// A webhook handler whose `auth_env` variable is unset makes construction
/// fail with `MissingEnvVar`.
#[test]
fn missing_env_var_fails_at_construction() {
    let pagerduty = WebhookHandlerConfig {
        url: "http://localhost/pd".into(),
        auth_env: "REDDB_TEST_PD_KEY_DEFINITELY_NOT_SET_12345".into(),
        rate_limit: None,
    };
    let config = RouterConfig {
        pagerduty: Some(pagerduty),
        ..RouterConfig::default()
    };

    let err = OperatorEventRouter::new(config, None).unwrap_err();
    assert!(matches!(err, ConfigError::MissingEnvVar { .. }));
}
1068}