1use std::collections::{HashMap, VecDeque};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Condvar, Mutex};
17use std::thread;
18use std::time::{Duration, Instant};
19
20use super::operator_event::OperatorEvent;
21use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditLogger, Outcome};
22
/// Handler names accepted in `RouterConfig::default_handlers` and
/// `RouterConfig::variant_routes`.
///
/// Any other name is rejected by `validate_handler_names`. This list, in this
/// order, is also printed in the `ConfigError::UnknownHandler` message.
const KNOWN_HANDLERS: &[&str] = &[
    "audit_log",
    "tracing",
    "stderr",
    "pagerduty",
    "generic_webhook",
];
34
35fn closest_match(input: &str, candidates: &[&str]) -> Option<String> {
36 candidates
37 .iter()
38 .map(|c| (*c, strsim::levenshtein(input, c)))
39 .min_by_key(|(_, d)| *d)
40 .filter(|(_, d)| *d <= 4)
41 .map(|(c, _)| c.to_string())
42}
43
44fn validate_handler_names(names: &[String]) -> Result<(), ConfigError> {
45 for name in names {
46 if !KNOWN_HANDLERS.contains(&name.as_str()) {
47 return Err(ConfigError::UnknownHandler { key: name.clone() });
48 }
49 }
50 Ok(())
51}
52
/// Token-bucket rate limit applied to a webhook handler before events are queued.
///
/// The bucket refills at `requests / window_sec` tokens per second. Its capacity
/// is that rate, but never less than one token. So a limit of 3 requests per 60 s
/// admits one event immediately and then one every 20 s; it does not admit a
/// burst of 3.
#[derive(Debug, Clone, Default)]
pub struct RateLimitConfig {
    /// Number of events allowed per `window_sec`.
    pub requests: u32,
    /// Window length in seconds. `0` is treated like a one-second window, i.e.
    /// `requests` per second.
    pub window_sec: u64,
}
64
/// Connection settings for a webhook handler (`pagerduty` or `generic_webhook`).
#[derive(Debug, Clone)]
pub struct WebhookHandlerConfig {
    /// Endpoint that receives one JSON `POST` per delivered event.
    pub url: String,
    /// Name of the environment variable holding the bearer token. It is read once
    /// when the router is built, and construction fails if it cannot be read.
    pub auth_env: String,
    /// Optional limit on how many events are queued for this handler.
    pub rate_limit: Option<RateLimitConfig>,
}
74
/// Declarative routing configuration consumed by [`OperatorEventRouter::new`].
#[derive(Debug, Default)]
pub struct RouterConfig {
    /// Handlers for variants that have no entry in `variant_routes`.
    /// `None` means `["audit_log", "tracing"]`.
    pub default_handlers: Option<Vec<String>>,
    /// Per-variant overrides: `OperatorEvent` variant name -> handler names.
    /// An entry replaces (it does not extend) the default handlers for that variant.
    pub variant_routes: HashMap<String, Vec<String>>,
    /// Settings for the `pagerduty` handler. Routes naming `pagerduty` deliver
    /// nothing to it unless this is set.
    pub pagerduty: Option<WebhookHandlerConfig>,
    /// Settings for the `generic_webhook` handler; same rule as `pagerduty`.
    pub generic_webhook: Option<WebhookHandlerConfig>,
}
88
/// Errors returned by [`OperatorEventRouter::new`] for an invalid router config.
#[derive(Debug)]
pub enum ConfigError {
    /// A `variant_routes` key is not an [`OperatorEvent`] variant name.
    UnknownVariant {
        /// The offending key, as written in the config.
        key: String,
        /// Closest known variant name (Levenshtein distance <= 4), if any.
        suggestion: Option<String>,
    },
    /// A route lists a handler name that is not in `KNOWN_HANDLERS`.
    UnknownHandler {
        /// The unrecognised handler name.
        key: String,
    },
    /// A configured webhook handler's token environment variable could not be read.
    MissingEnvVar {
        /// Handler that needs the variable (`"pagerduty"` or `"generic_webhook"`).
        handler: String,
        /// Name of the environment variable.
        var: String,
    },
}
107
108impl std::fmt::Display for ConfigError {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 match self {
111 Self::UnknownVariant { key, suggestion } => {
112 write!(f, "unknown OperatorEvent variant '{key}'")?;
113 if let Some(s) = suggestion {
114 write!(f, "; did you mean '{s}'?")?;
115 }
116 Ok(())
117 }
118 Self::UnknownHandler { key } => write!(
119 f,
120 "unknown handler name '{key}'; known: {}",
121 KNOWN_HANDLERS.join(", ")
122 ),
123 Self::MissingEnvVar { handler, var } => write!(
124 f,
125 "handler '{handler}' requires env var '{var}' which is not set"
126 ),
127 }
128 }
129}
130
131impl std::error::Error for ConfigError {}
132
/// Token-bucket limiter backing a webhook handler's `RateLimitConfig`.
///
/// Tokens refill continuously at `rate` per second, capped at `burst`. Each
/// admitted event spends one token.
#[derive(Debug)]
struct TokenBucket {
    /// Tokens currently available (may be fractional between refills).
    tokens: f64,
    /// Refill rate in tokens per second.
    rate: f64,
    /// Bucket capacity; also the initial token count.
    burst: f64,
    /// When `tokens` was last refilled.
    last: Instant,
}
144
145impl TokenBucket {
146 fn new(cfg: &RateLimitConfig) -> Self {
147 let rate = if cfg.window_sec > 0 {
148 cfg.requests as f64 / cfg.window_sec as f64
149 } else {
150 cfg.requests as f64
151 };
152 let burst = rate.max(1.0);
153 Self {
154 tokens: burst,
155 rate,
156 burst,
157 last: Instant::now(),
158 }
159 }
160
161 fn try_consume(&mut self) -> bool {
162 let now = Instant::now();
163 let elapsed = now.saturating_duration_since(self.last).as_secs_f64();
164 self.tokens = (self.tokens + elapsed * self.rate).min(self.burst);
165 self.last = now;
166 if self.tokens >= 1.0 {
167 self.tokens -= 1.0;
168 true
169 } else {
170 false
171 }
172 }
173}
174
/// Maximum number of payloads buffered per webhook queue. On overflow the oldest
/// pending payload is evicted.
const QUEUE_CAPACITY: usize = 1_000;

/// Bounded FIFO between `OperatorEventRouter::route` (producer) and a webhook's
/// delivery thread (consumer), plus that handler's delivery counters.
#[derive(Debug)]
struct WebhookQueue {
    /// Pending payloads, oldest at the front.
    inner: Mutex<VecDeque<WebhookPayload>>,
    /// Signalled after each push so a blocked consumer wakes up.
    not_empty: Condvar,
    /// Payloads evicted because the queue was at `QUEUE_CAPACITY`.
    dropped_queue_full: AtomicU64,
    /// Events skipped by the handler's rate limiter before being queued.
    dropped_rate_limit: AtomicU64,
    /// Payloads abandoned after exhausting delivery attempts.
    dropped_max_retries: AtomicU64,
    /// Payloads the delivery worker recorded as sent.
    sent: AtomicU64,
}
190
191impl WebhookQueue {
192 fn new() -> Arc<Self> {
193 Arc::new(Self {
194 inner: Mutex::new(VecDeque::with_capacity(QUEUE_CAPACITY)),
195 not_empty: Condvar::new(),
196 dropped_queue_full: AtomicU64::new(0),
197 dropped_rate_limit: AtomicU64::new(0),
198 dropped_max_retries: AtomicU64::new(0),
199 sent: AtomicU64::new(0),
200 })
201 }
202
203 fn push(&self, payload: WebhookPayload) {
204 let mut q = self.inner.lock().expect("webhook queue mutex");
205 if q.len() >= QUEUE_CAPACITY {
206 q.pop_front(); self.dropped_queue_full.fetch_add(1, Ordering::Relaxed);
208 }
209 q.push_back(payload);
210 drop(q);
211 self.not_empty.notify_one();
212 }
213
214 fn pop_blocking(&self) -> WebhookPayload {
215 let mut q = self.inner.lock().expect("webhook queue mutex");
216 loop {
217 if let Some(item) = q.pop_front() {
218 return item;
219 }
220 q = self.not_empty.wait(q).expect("webhook queue condvar");
221 }
222 }
223}
224
/// One queued webhook delivery, serialised by `to_json_body`.
#[derive(Clone, Debug)]
struct WebhookPayload {
    /// Event action string; sent as the `"event"` JSON field.
    action: String,
    /// Human-readable summary; sent as `"summary"`.
    summary: String,
    /// Timestamp in milliseconds (taken from `crate::utils::now_unix_millis` by
    /// `route`); sent as `"ts"`.
    ts_ms: u64,
}
235
236impl WebhookPayload {
237 fn to_json_body(&self) -> String {
238 use crate::serde_json::Value;
241 let event_json = Value::String(self.action.clone()).to_string_compact();
242 let summary_json = Value::String(self.summary.clone()).to_string_compact();
243 format!(
244 r#"{{"event":{event_json},"summary":{summary_json},"ts":{}}}"#,
245 self.ts_ms
246 )
247 }
248}
249
250fn spawn_webhook_worker(
255 name: &str,
256 url: String,
257 auth_token: String,
258 queue: Arc<WebhookQueue>,
259) -> thread::JoinHandle<()> {
260 let name = name.to_string();
261 thread::Builder::new()
262 .name(format!("reddb-webhook-{name}"))
263 .spawn(move || {
264 let agent: ureq::Agent = ureq::Agent::config_builder()
265 .timeout_connect(Some(Duration::from_secs(3)))
266 .timeout_send_request(Some(Duration::from_secs(5)))
267 .timeout_recv_response(Some(Duration::from_secs(5)))
268 .http_status_as_error(false)
269 .build()
270 .into();
271
272 loop {
273 let payload = queue.pop_blocking();
274 let body = payload.to_json_body();
275 let bearer = format!("Bearer {auth_token}");
276
277 let mut success = false;
278 for attempt in 1u32..=3 {
279 let result = agent
280 .post(&url)
281 .header("content-type", "application/json")
282 .header("authorization", &bearer)
283 .send(body.as_bytes());
284
285 match result {
286 Ok(_) => {
287 queue.sent.fetch_add(1, Ordering::Relaxed);
288 success = true;
289 break;
290 }
291 Err(_) if attempt < 3 => {
292 thread::sleep(Duration::from_millis(100 * (1u64 << attempt)));
293 }
294 Err(_) => {}
295 }
296 }
297
298 if !success {
299 queue.dropped_max_retries.fetch_add(1, Ordering::Relaxed);
300 tracing::warn!(
301 target: "reddb::operator_router",
302 handler = %name,
303 "webhook delivery failed after 3 attempts; event dropped"
304 );
305 }
306 }
307 })
308 .expect("spawn webhook worker thread")
309}
310
/// A handler resolved from config, ready to receive events in `route`.
#[derive(Debug)]
enum EffectiveHandler {
    /// Record an `AuditEvent` via the router's `AuditLogger`, when one was supplied.
    AuditLog,
    /// Emit a `tracing::warn!` on target `reddb::operator`.
    Tracing,
    /// Print the event summary to stderr.
    Stderr,
    /// Enqueue a payload for a background webhook worker.
    Webhook {
        /// Handler name (`"pagerduty"` or `"generic_webhook"`).
        name: String,
        /// Queue drained by this handler's worker thread.
        queue: Arc<WebhookQueue>,
        /// Optional limiter checked before enqueueing.
        rate_limiter: Option<Mutex<TokenBucket>>,
    },
}
326
327impl EffectiveHandler {
328 fn name(&self) -> &str {
329 match self {
330 Self::AuditLog => "audit_log",
331 Self::Tracing => "tracing",
332 Self::Stderr => "stderr",
333 Self::Webhook { name, .. } => name,
334 }
335 }
336}
337
/// Point-in-time delivery counters for each configured webhook handler.
#[derive(Debug, Default)]
pub struct RouterMetricsSnapshot {
    /// `(handler name, dropped)`, where dropped is the sum of queue-overflow
    /// evictions, rate-limited events and payloads that exhausted their retries.
    pub dropped: Vec<(String, u64)>,
    /// `(handler name, number of payloads the delivery worker recorded as sent)`.
    pub sent: Vec<(String, u64)>,
}
350
/// Fans [`OperatorEvent`]s out to configurable handlers: audit log, tracing,
/// stderr, and PagerDuty / generic webhooks.
///
/// Built once from a [`RouterConfig`] by [`OperatorEventRouter::new`]. `route`
/// takes `&self`, so one router can be shared (e.g. behind an `Arc`) across
/// threads.
#[derive(Debug)]
pub struct OperatorEventRouter {
    /// Sink for the `audit_log` handler; `None` makes that handler a no-op.
    audit_logger: Option<Arc<AuditLogger>>,
    /// Handlers for variants without an entry in `variant_routes`.
    default_route: Vec<Arc<EffectiveHandler>>,
    /// Per-variant handler lists, keyed by variant name.
    variant_routes: HashMap<&'static str, Vec<Arc<EffectiveHandler>>>,
    /// Queue of each configured webhook, keyed by handler name; read by `metrics`.
    webhook_queues: HashMap<String, Arc<WebhookQueue>>,
    /// Join handles of the webhook worker threads. Nothing in this file joins them.
    _workers: Mutex<Vec<thread::JoinHandle<()>>>,
}
371
372impl OperatorEventRouter {
373 pub fn new(
381 config: RouterConfig,
382 audit_logger: Option<Arc<AuditLogger>>,
383 ) -> Result<Self, ConfigError> {
384 let known_variants = OperatorEvent::all_variant_names();
385
386 for key in config.variant_routes.keys() {
388 if !known_variants.contains(&key.as_str()) {
389 let suggestion = closest_match(key, known_variants);
390 return Err(ConfigError::UnknownVariant {
391 key: key.clone(),
392 suggestion,
393 });
394 }
395 }
396
397 for names in config.variant_routes.values() {
399 validate_handler_names(names)?;
400 }
401 if let Some(ref dh) = config.default_handlers {
402 validate_handler_names(dh)?;
403 }
404
405 let mut webhook_queues: HashMap<String, Arc<WebhookQueue>> = HashMap::new();
407 let mut workers: Vec<thread::JoinHandle<()>> = Vec::new();
408
409 let pd_handler = config
410 .pagerduty
411 .as_ref()
412 .map(|cfg| -> Result<Arc<EffectiveHandler>, ConfigError> {
413 let token =
414 std::env::var(&cfg.auth_env).map_err(|_| ConfigError::MissingEnvVar {
415 handler: "pagerduty".into(),
416 var: cfg.auth_env.clone(),
417 })?;
418 let queue = WebhookQueue::new();
419 webhook_queues.insert("pagerduty".into(), Arc::clone(&queue));
420 workers.push(spawn_webhook_worker(
421 "pagerduty",
422 cfg.url.clone(),
423 token,
424 Arc::clone(&queue),
425 ));
426 let rate_limiter = cfg
427 .rate_limit
428 .as_ref()
429 .map(|rl| Mutex::new(TokenBucket::new(rl)));
430 Ok(Arc::new(EffectiveHandler::Webhook {
431 name: "pagerduty".into(),
432 queue,
433 rate_limiter,
434 }))
435 })
436 .transpose()?;
437
438 let gw_handler = config
439 .generic_webhook
440 .as_ref()
441 .map(|cfg| -> Result<Arc<EffectiveHandler>, ConfigError> {
442 let token =
443 std::env::var(&cfg.auth_env).map_err(|_| ConfigError::MissingEnvVar {
444 handler: "generic_webhook".into(),
445 var: cfg.auth_env.clone(),
446 })?;
447 let queue = WebhookQueue::new();
448 webhook_queues.insert("generic_webhook".into(), Arc::clone(&queue));
449 workers.push(spawn_webhook_worker(
450 "generic_webhook",
451 cfg.url.clone(),
452 token,
453 Arc::clone(&queue),
454 ));
455 let rate_limiter = cfg
456 .rate_limit
457 .as_ref()
458 .map(|rl| Mutex::new(TokenBucket::new(rl)));
459 Ok(Arc::new(EffectiveHandler::Webhook {
460 name: "generic_webhook".into(),
461 queue,
462 rate_limiter,
463 }))
464 })
465 .transpose()?;
466
467 let resolve = |names: &[String]| -> Vec<Arc<EffectiveHandler>> {
471 names
472 .iter()
473 .filter_map(|n| match n.as_str() {
474 "audit_log" => Some(Arc::new(EffectiveHandler::AuditLog)),
475 "tracing" => Some(Arc::new(EffectiveHandler::Tracing)),
476 "stderr" => Some(Arc::new(EffectiveHandler::Stderr)),
477 "pagerduty" => pd_handler.clone(),
478 "generic_webhook" => gw_handler.clone(),
479 _ => None,
480 })
481 .collect()
482 };
483
484 let code_default = vec!["audit_log".to_string(), "tracing".to_string()];
485 let default_names = config.default_handlers.as_deref().unwrap_or(&code_default);
486 let default_route = resolve(default_names);
487
488 let mut variant_routes: HashMap<&'static str, Vec<Arc<EffectiveHandler>>> = HashMap::new();
489 for (key, names) in &config.variant_routes {
490 if let Some(static_key) = known_variants.iter().copied().find(|v| *v == key.as_str()) {
491 variant_routes.insert(static_key, resolve(names));
492 }
493 }
494
495 Ok(Self {
496 audit_logger,
497 default_route,
498 variant_routes,
499 webhook_queues,
500 _workers: Mutex::new(workers),
501 })
502 }
503
504 pub fn route(&self, event: OperatorEvent) {
509 let variant = event.variant_name();
510 let handlers = self
511 .variant_routes
512 .get(variant)
513 .unwrap_or(&self.default_route);
514
515 let (action, fields, summary) = event.decompose();
516 let ts_ms = crate::utils::now_unix_millis();
517
518 for handler in handlers {
519 match handler.as_ref() {
520 EffectiveHandler::AuditLog => {
521 if let Some(audit) = &self.audit_logger {
522 let ev = AuditEvent::builder(action)
523 .source(AuditAuthSource::System)
524 .outcome(Outcome::Error)
525 .fields(fields.clone())
526 .build();
527 audit.record_event(ev);
528 }
529 }
530 EffectiveHandler::Tracing => {
531 tracing::warn!(target: "reddb::operator", "{summary}");
532 }
533 EffectiveHandler::Stderr => {
534 eprintln!("[reddb::operator] {summary}");
535 }
536 EffectiveHandler::Webhook {
537 name,
538 queue,
539 rate_limiter,
540 } => {
541 if let Some(rl) = rate_limiter {
542 let allowed = rl.lock().expect("rate limiter mutex").try_consume();
543 if !allowed {
544 queue.dropped_rate_limit.fetch_add(1, Ordering::Relaxed);
545 tracing::debug!(
546 target: "reddb::operator_router",
547 handler = %name,
548 "event rate-limited; skipping webhook"
549 );
550 continue;
551 }
552 }
553 queue.push(WebhookPayload {
554 action: action.to_string(),
555 summary: summary.clone(),
556 ts_ms,
557 });
558 }
559 }
560 }
561 }
562
563 pub fn metrics(&self) -> RouterMetricsSnapshot {
565 let mut snap = RouterMetricsSnapshot::default();
566 for (name, q) in &self.webhook_queues {
567 let dropped = q.dropped_queue_full.load(Ordering::Relaxed)
568 + q.dropped_rate_limit.load(Ordering::Relaxed)
569 + q.dropped_max_retries.load(Ordering::Relaxed);
570 snap.dropped.push((name.clone(), dropped));
571 snap.sent
572 .push((name.clone(), q.sent.load(Ordering::Relaxed)));
573 }
574 snap
575 }
576}
577
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};
    use std::net::{TcpListener, TcpStream};
    use std::sync::Arc;

    use super::*;
    use crate::runtime::audit_log::AuditLogger;

    /// Creates an `AuditLogger` writing into a fresh per-test directory and
    /// returns it with the log file path.
    fn make_audit_logger() -> (Arc<AuditLogger>, std::path::PathBuf) {
        let mut dir = std::env::temp_dir();
        dir.push(format!(
            "reddb-router-test-{}-{}",
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join(".audit.log");
        let logger = Arc::new(AuditLogger::with_path(path.clone()));
        (logger, path)
    }

    /// Waits (up to 2 s) for the audit logger to become idle.
    fn drain(logger: &AuditLogger) {
        assert!(
            logger.wait_idle(Duration::from_secs(2)),
            "audit logger drain timed out"
        );
    }

    /// Returns the `action` field of the last JSON line in the audit log, if any.
    fn last_audit_action(path: &std::path::Path) -> Option<String> {
        let body = std::fs::read_to_string(path).ok()?;
        let line = body.lines().last()?;
        let v: crate::serde_json::Value = crate::serde_json::from_str(line).ok()?;
        v.get("action")
            .and_then(|x| x.as_str())
            .map(|s| s.to_string())
    }

    /// Reads one HTTP/1.1 request (headers plus a `Content-Length` body) from
    /// `stream`.
    ///
    /// A single `read` is not enough, because headers and body may arrive in
    /// separate TCP segments. Gives up after a 5 s read timeout or at EOF.
    fn read_http_request(stream: &mut TcpStream) -> String {
        stream
            .set_read_timeout(Some(Duration::from_secs(5)))
            .expect("set read timeout");
        let mut raw = Vec::new();
        let mut chunk = [0u8; 4096];
        loop {
            let n = match stream.read(&mut chunk) {
                Ok(0) | Err(_) => break,
                Ok(n) => n,
            };
            raw.extend_from_slice(&chunk[..n]);
            if let Some(header_end) = raw.windows(4).position(|w| w == b"\r\n\r\n") {
                let headers = String::from_utf8_lossy(&raw[..header_end]);
                let body_len = headers
                    .lines()
                    .filter_map(|line| line.split_once(':'))
                    .find(|(name, _)| name.trim().eq_ignore_ascii_case("content-length"))
                    .and_then(|(_, value)| value.trim().parse::<usize>().ok())
                    .unwrap_or(0);
                if raw.len() >= header_end + 4 + body_len {
                    break;
                }
            }
        }
        String::from_utf8_lossy(&raw).into_owned()
    }

    /// With an empty config, every variant goes to the default route, which
    /// includes the audit log.
    ///
    /// `DanglingAdminIntent` is not exercised: its fields are not spelled out in
    /// this module.
    #[test]
    fn empty_config_routes_all_variants_to_audit_and_tracing() {
        let (audit, path) = make_audit_logger();
        let router = OperatorEventRouter::new(RouterConfig::default(), Some(Arc::clone(&audit)))
            .expect("router build");

        // `route` takes ownership, so the events are consumed directly and no
        // clone helper is needed.
        let events = vec![
            OperatorEvent::ReplicationBroken {
                peer: "p".into(),
                reason: "r".into(),
            },
            OperatorEvent::Divergence {
                peer: "p".into(),
                leader_lsn: 1,
                follower_lsn: 0,
            },
            OperatorEvent::WalFsyncFailed {
                path: "/d".into(),
                error: "e".into(),
            },
            OperatorEvent::DiskSpaceCritical {
                path: "/d".into(),
                available_bytes: 1,
                threshold_bytes: 2,
            },
            OperatorEvent::AuthBypass {
                principal: "a".into(),
                resource: "r".into(),
                detail: "d".into(),
            },
            OperatorEvent::AdminCapabilityGranted {
                granted_to: "a".into(),
                capability: "c".into(),
                granted_by: "b".into(),
            },
            OperatorEvent::SecretRotationFailed {
                secret_ref: "s".into(),
                error: "e".into(),
            },
            OperatorEvent::ConfigChanged {
                key: "k".into(),
                old_value: "o".into(),
                new_value: "n".into(),
                changed_by: "b".into(),
            },
            OperatorEvent::StartupFailed {
                phase: "p".into(),
                error: "e".into(),
            },
            OperatorEvent::ShutdownForced { reason: "r".into() },
            OperatorEvent::SchemaCorruption {
                collection: "c".into(),
                detail: "d".into(),
            },
            OperatorEvent::CheckpointFailed {
                lsn: 1,
                error: "e".into(),
            },
            OperatorEvent::ConfigChangeRequiresRestart {
                fields_changed: "f".into(),
            },
            OperatorEvent::SubscriptionSchemaChange {
                collection: "c".into(),
                subscription_names: "sub1".into(),
                fields_added: "phone".into(),
                fields_removed: "".into(),
                lsn: 42,
            },
            OperatorEvent::OutboxDlqActivated {
                queue: "users_events".into(),
                dlq: "users_events_outbox_dlq".into(),
                reason: "queue_full".into(),
            },
        ];

        for event in events {
            router.route(event);
        }

        drain(&audit);
        let action = last_audit_action(&path).expect("at least one audit line");
        assert!(action.starts_with("operator/"), "action={action}");
    }

    /// A misspelt variant key is rejected with the nearest real variant suggested.
    #[test]
    fn unknown_variant_gives_suggestion() {
        let mut config = RouterConfig::default();
        config
            .variant_routes
            .insert("AuthBypas".into(), vec!["audit_log".into()]);
        let err = OperatorEventRouter::new(config, None).unwrap_err();
        match err {
            ConfigError::UnknownVariant { key, suggestion } => {
                assert_eq!(key, "AuthBypas");
                assert_eq!(suggestion.as_deref(), Some("AuthBypass"));
            }
            other => panic!("expected UnknownVariant, got: {other}"),
        }
    }

    /// Handler names outside `KNOWN_HANDLERS` are rejected at construction.
    #[test]
    fn unknown_handler_name_is_rejected() {
        let mut config = RouterConfig::default();
        config
            .variant_routes
            .insert("AuthBypass".into(), vec!["slack".into()]);
        let err = OperatorEventRouter::new(config, None).unwrap_err();
        assert!(matches!(err, ConfigError::UnknownHandler { .. }));
    }

    /// A per-variant route replaces the default route for that variant only.
    #[test]
    fn per_variant_route_overrides_default() {
        let (audit, path) = make_audit_logger();
        let mut config = RouterConfig::default();
        config
            .variant_routes
            .insert("AuthBypass".into(), vec!["stderr".into()]);
        let router = OperatorEventRouter::new(config, Some(Arc::clone(&audit))).unwrap();

        router.route(OperatorEvent::AuthBypass {
            principal: "test".into(),
            resource: "/secret".into(),
            detail: "test override".into(),
        });
        drain(&audit);
        let body = std::fs::read_to_string(&path).unwrap_or_default();
        assert!(
            body.lines().all(|l| !l.contains("auth_bypass")),
            "auth_bypass should not appear in audit (stderr-only route)"
        );

        router.route(OperatorEvent::ShutdownForced {
            reason: "test".into(),
        });
        drain(&audit);
        let action = last_audit_action(&path).expect("shutdown_forced in audit");
        assert_eq!(action, "operator/shutdown_forced");
    }

    /// 3 requests per 60 s gives a capacity of one token, so the second
    /// immediate request is throttled.
    #[test]
    fn token_bucket_throttles_after_burst() {
        let mut bucket = TokenBucket::new(&RateLimitConfig {
            requests: 3,
            window_sec: 60,
        });
        assert!(bucket.try_consume(), "first consume should succeed");
        assert!(!bucket.try_consume(), "second consume should be throttled");
    }

    /// After the burst is spent, tokens come back as time passes.
    #[test]
    fn token_bucket_refills_over_time() {
        let mut bucket = TokenBucket::new(&RateLimitConfig {
            requests: 100,
            window_sec: 1,
        });
        for _ in 0..100 {
            assert!(bucket.try_consume());
        }
        assert!(!bucket.try_consume(), "burst exhausted");
        thread::sleep(Duration::from_millis(20));
        assert!(bucket.try_consume(), "should refill after sleep");
    }

    /// A full queue evicts its oldest payload and counts the eviction.
    #[test]
    fn queue_drops_oldest_on_saturation() {
        let queue = WebhookQueue::new();
        for i in 0..QUEUE_CAPACITY {
            queue.push(WebhookPayload {
                action: format!("ev/{i}"),
                summary: format!("s{i}"),
                ts_ms: i as u64,
            });
        }
        assert_eq!(queue.dropped_queue_full.load(Ordering::Relaxed), 0);

        queue.push(WebhookPayload {
            action: "ev/overflow".into(),
            summary: "overflow".into(),
            ts_ms: QUEUE_CAPACITY as u64,
        });
        assert_eq!(queue.dropped_queue_full.load(Ordering::Relaxed), 1);

        let first = queue.pop_blocking();
        assert_eq!(first.action, "ev/1");
    }

    /// End-to-end webhook delivery against a local one-shot HTTP server.
    #[test]
    fn webhook_delivers_payload_to_mock_server() {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let url = format!("http://{addr}/webhook");

        // Accept one request, capture it, and answer 200 so the worker records a
        // delivery instead of retrying against a closed listener.
        let server_thread = thread::spawn(move || {
            let (mut stream, _) = listener.accept().unwrap();
            let raw = read_http_request(&mut stream);
            let _ = stream.write_all(
                b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
            );
            raw
        });

        std::env::set_var("TEST_WEBHOOK_TOKEN_ROUTER", "test-token-42");

        let config = RouterConfig {
            default_handlers: None,
            variant_routes: {
                let mut m = HashMap::new();
                m.insert("ShutdownForced".into(), vec!["generic_webhook".into()]);
                m
            },
            pagerduty: None,
            generic_webhook: Some(WebhookHandlerConfig {
                url,
                auth_env: "TEST_WEBHOOK_TOKEN_ROUTER".into(),
                rate_limit: None,
            }),
        };

        let router = OperatorEventRouter::new(config, None).unwrap();
        router.route(OperatorEvent::ShutdownForced {
            reason: "integration-test".into(),
        });

        let raw = server_thread.join().expect("server thread");
        assert!(raw.contains("Bearer test-token-42"), "missing auth header");
        assert!(raw.contains("shutdown_forced"), "missing event in body");

        // The worker bumps its counter only after reading the response; poll briefly.
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            let counted = router
                .metrics()
                .sent
                .iter()
                .any(|(name, count)| name == "generic_webhook" && *count == 1);
            if counted {
                break;
            }
            assert!(
                Instant::now() < deadline,
                "webhook delivery was not counted as sent"
            );
            thread::sleep(Duration::from_millis(10));
        }
    }

    /// Many threads routing through one shared router must not panic or deadlock.
    #[test]
    fn concurrent_route_calls_safe() {
        let router = Arc::new(OperatorEventRouter::new(RouterConfig::default(), None).unwrap());
        let handles: Vec<_> = (0..16)
            .map(|_| {
                let r = Arc::clone(&router);
                thread::spawn(move || {
                    for _ in 0..50 {
                        r.route(OperatorEvent::ShutdownForced {
                            reason: "stress".into(),
                        });
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
    }

    /// A configured webhook whose token variable is unset fails construction.
    #[test]
    fn missing_env_var_fails_at_construction() {
        let config = RouterConfig {
            default_handlers: None,
            variant_routes: HashMap::new(),
            pagerduty: Some(WebhookHandlerConfig {
                url: "http://localhost/pd".into(),
                auth_env: "REDDB_TEST_PD_KEY_DEFINITELY_NOT_SET_12345".into(),
                rate_limit: None,
            }),
            generic_webhook: None,
        };
        let err = OperatorEventRouter::new(config, None).unwrap_err();
        assert!(matches!(err, ConfigError::MissingEnvVar { .. }));
    }

    /// When the first webhook's token is present and the second's is missing,
    /// the error names the second handler and its variable.
    #[test]
    fn missing_env_var_reports_the_failing_handler() {
        std::env::set_var("REDDB_TEST_PD_KEY_PRESENT_ROUTER", "pd-token");
        let config = RouterConfig {
            default_handlers: None,
            variant_routes: HashMap::new(),
            pagerduty: Some(WebhookHandlerConfig {
                url: "http://localhost/pd".into(),
                auth_env: "REDDB_TEST_PD_KEY_PRESENT_ROUTER".into(),
                rate_limit: None,
            }),
            generic_webhook: Some(WebhookHandlerConfig {
                url: "http://localhost/gw".into(),
                auth_env: "REDDB_TEST_GW_KEY_DEFINITELY_NOT_SET_12345".into(),
                rate_limit: None,
            }),
        };
        match OperatorEventRouter::new(config, None).unwrap_err() {
            ConfigError::MissingEnvVar { handler, var } => {
                assert_eq!(handler, "generic_webhook");
                assert_eq!(var, "REDDB_TEST_GW_KEY_DEFINITELY_NOT_SET_12345");
            }
            other => panic!("expected MissingEnvVar, got: {other}"),
        }
    }
}