Skip to main content

reddb_server/telemetry/
operator_event_router.rs

1//! Config-driven per-variant routing for [`OperatorEvent`].
2//!
3//! See `docs/operations/logging.md` § "Operator event routing" for the TOML
4//! config schema and examples.
5//!
6//! # Routing resolution order
7//!
8//! 1. Per-variant block (`variant_routes["AuthBypass"]`) — most specific.
9//! 2. `default_handlers` in config — user-supplied default.
10//! 3. Code default `["audit_log", "tracing"]` — zero upgrade burden.
11//!
12//! Empty config = identical to the current `OperatorEvent::emit()` behaviour.
13
14use std::collections::{HashMap, VecDeque};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, Condvar, Mutex};
17use std::thread;
18use std::time::{Duration, Instant};
19
20use super::operator_event::OperatorEvent;
21use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditLogger, Outcome};
22
23// ---------------------------------------------------------------------------
24// Validation helpers
25// ---------------------------------------------------------------------------
26
/// Closed set of handler names accepted in `default_handlers` and in
/// per-variant route lists. `validate_handler_names` rejects anything not
/// listed here, and the `UnknownHandler` error message prints this list.
const KNOWN_HANDLERS: &[&str] = &[
    "audit_log",
    "tracing",
    "stderr",
    "pagerduty",
    "generic_webhook",
];
34
35fn closest_match(input: &str, candidates: &[&str]) -> Option<String> {
36    candidates
37        .iter()
38        .map(|c| (*c, strsim::levenshtein(input, c)))
39        .min_by_key(|(_, d)| *d)
40        .filter(|(_, d)| *d <= 4)
41        .map(|(c, _)| c.to_string())
42}
43
44fn validate_handler_names(names: &[String]) -> Result<(), ConfigError> {
45    for name in names {
46        if !KNOWN_HANDLERS.contains(&name.as_str()) {
47            return Err(ConfigError::UnknownHandler { key: name.clone() });
48        }
49    }
50    Ok(())
51}
52
53// ---------------------------------------------------------------------------
54// Config types
55// ---------------------------------------------------------------------------
56
/// Token-bucket rate limit for a webhook handler.
///
/// `TokenBucket::new` converts this into a refill rate of
/// `requests / window_sec` tokens per second.
#[derive(Debug, Clone, Default)]
pub struct RateLimitConfig {
    /// Number of requests allowed per `window_sec`.
    pub requests: u32,
    /// Length of the rate window, in seconds. When this is `0` the token
    /// bucket uses `requests` directly as a per-second rate.
    pub window_sec: u64,
}
64
/// Config for a webhook handler (PagerDuty or generic).
#[derive(Debug, Clone)]
pub struct WebhookHandlerConfig {
    /// Endpoint that the handler's background worker POSTs each event to.
    pub url: String,
    /// Name of the environment variable holding the bearer token.
    /// Resolved at router construction time; boot fails if unset.
    pub auth_env: String,
    /// Optional rate limit checked in `OperatorEventRouter::route` before an
    /// event is queued for delivery. `None` means no rate limiting.
    pub rate_limit: Option<RateLimitConfig>,
}
74
/// Full router configuration (`[telemetry.operator_event]` in TOML).
///
/// The derived `Default` (no default handlers, no variant routes, no
/// webhooks) makes the router fall back to the code default handler list.
#[derive(Debug, Default)]
pub struct RouterConfig {
    /// Handler list applied when no per-variant route matches.
    /// `None` → code default `["audit_log", "tracing"]`.
    pub default_handlers: Option<Vec<String>>,
    /// Per-variant overrides. Keys are CamelCase variant names.
    /// Values are handler names; each must be one of `KNOWN_HANDLERS`.
    pub variant_routes: HashMap<String, Vec<String>>,
    /// PagerDuty webhook — referenced as `"pagerduty"` in handler lists.
    /// When `None`, a `"pagerduty"` entry in a handler list resolves to no
    /// handler at all.
    pub pagerduty: Option<WebhookHandlerConfig>,
    /// Generic webhook — referenced as `"generic_webhook"` in handler lists.
    /// When `None`, a `"generic_webhook"` entry in a handler list resolves to
    /// no handler at all.
    pub generic_webhook: Option<WebhookHandlerConfig>,
}
88
89// ---------------------------------------------------------------------------
90// Config error
91// ---------------------------------------------------------------------------
92
93#[derive(Debug)]
94pub enum ConfigError {
95    UnknownVariant {
96        key: String,
97        suggestion: Option<String>,
98    },
99    UnknownHandler {
100        key: String,
101    },
102    MissingEnvVar {
103        handler: String,
104        var: String,
105    },
106}
107
108impl std::fmt::Display for ConfigError {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        match self {
111            Self::UnknownVariant { key, suggestion } => {
112                write!(f, "unknown OperatorEvent variant '{key}'")?;
113                if let Some(s) = suggestion {
114                    write!(f, "; did you mean '{s}'?")?;
115                }
116                Ok(())
117            }
118            Self::UnknownHandler { key } => write!(
119                f,
120                "unknown handler name '{key}'; known: {}",
121                KNOWN_HANDLERS.join(", ")
122            ),
123            Self::MissingEnvVar { handler, var } => write!(
124                f,
125                "handler '{handler}' requires env var '{var}' which is not set"
126            ),
127        }
128    }
129}
130
131impl std::error::Error for ConfigError {}
132
133// ---------------------------------------------------------------------------
134// Token bucket (per-handler rate limit)
135// ---------------------------------------------------------------------------
136
137#[derive(Debug)]
138struct TokenBucket {
139    tokens: f64,
140    rate: f64,
141    burst: f64,
142    last: Instant,
143}
144
145impl TokenBucket {
146    fn new(cfg: &RateLimitConfig) -> Self {
147        let rate = if cfg.window_sec > 0 {
148            cfg.requests as f64 / cfg.window_sec as f64
149        } else {
150            cfg.requests as f64
151        };
152        let burst = rate.max(1.0);
153        Self {
154            tokens: burst,
155            rate,
156            burst,
157            last: Instant::now(),
158        }
159    }
160
161    fn try_consume(&mut self) -> bool {
162        let now = Instant::now();
163        let elapsed = now.saturating_duration_since(self.last).as_secs_f64();
164        self.tokens = (self.tokens + elapsed * self.rate).min(self.burst);
165        self.last = now;
166        if self.tokens >= 1.0 {
167            self.tokens -= 1.0;
168            true
169        } else {
170            false
171        }
172    }
173}
174
175// ---------------------------------------------------------------------------
176// Webhook queue (bounded, drop-oldest on saturation)
177// ---------------------------------------------------------------------------
178
179const QUEUE_CAPACITY: usize = 1_000;
180
181#[derive(Debug)]
182struct WebhookQueue {
183    inner: Mutex<VecDeque<WebhookPayload>>,
184    not_empty: Condvar,
185    dropped_queue_full: AtomicU64,
186    dropped_rate_limit: AtomicU64,
187    dropped_max_retries: AtomicU64,
188    sent: AtomicU64,
189}
190
191impl WebhookQueue {
192    fn new() -> Arc<Self> {
193        Arc::new(Self {
194            inner: Mutex::new(VecDeque::with_capacity(QUEUE_CAPACITY)),
195            not_empty: Condvar::new(),
196            dropped_queue_full: AtomicU64::new(0),
197            dropped_rate_limit: AtomicU64::new(0),
198            dropped_max_retries: AtomicU64::new(0),
199            sent: AtomicU64::new(0),
200        })
201    }
202
203    fn push(&self, payload: WebhookPayload) {
204        let mut q = self.inner.lock().expect("webhook queue mutex");
205        if q.len() >= QUEUE_CAPACITY {
206            q.pop_front(); // drop oldest
207            self.dropped_queue_full.fetch_add(1, Ordering::Relaxed);
208        }
209        q.push_back(payload);
210        drop(q);
211        self.not_empty.notify_one();
212    }
213
214    fn pop_blocking(&self) -> WebhookPayload {
215        let mut q = self.inner.lock().expect("webhook queue mutex");
216        loop {
217            if let Some(item) = q.pop_front() {
218                return item;
219            }
220            q = self.not_empty.wait(q).expect("webhook queue condvar");
221        }
222    }
223}
224
225// ---------------------------------------------------------------------------
226// Webhook payload
227// ---------------------------------------------------------------------------
228
229#[derive(Clone, Debug)]
230struct WebhookPayload {
231    action: String,
232    summary: String,
233    ts_ms: u64,
234}
235
236impl WebhookPayload {
237    fn to_json_body(&self) -> String {
238        // Build JSON via the serde_json encoder so special chars in event/summary
239        // are escaped correctly (RFC 8259 §7 per ADR 0010).
240        use crate::serde_json::Value;
241        let event_json = Value::String(self.action.clone()).to_string_compact();
242        let summary_json = Value::String(self.summary.clone()).to_string_compact();
243        format!(
244            r#"{{"event":{event_json},"summary":{summary_json},"ts":{}}}"#,
245            self.ts_ms
246        )
247    }
248}
249
250// ---------------------------------------------------------------------------
251// Background webhook worker
252// ---------------------------------------------------------------------------
253
254fn spawn_webhook_worker(
255    name: &str,
256    url: String,
257    auth_token: String,
258    queue: Arc<WebhookQueue>,
259) -> thread::JoinHandle<()> {
260    let name = name.to_string();
261    thread::Builder::new()
262        .name(format!("reddb-webhook-{name}"))
263        .spawn(move || {
264            let agent: ureq::Agent = ureq::Agent::config_builder()
265                .timeout_connect(Some(Duration::from_secs(3)))
266                .timeout_send_request(Some(Duration::from_secs(5)))
267                .timeout_recv_response(Some(Duration::from_secs(5)))
268                .http_status_as_error(false)
269                .build()
270                .into();
271
272            loop {
273                let payload = queue.pop_blocking();
274                let body = payload.to_json_body();
275                let bearer = format!("Bearer {auth_token}");
276
277                let mut success = false;
278                for attempt in 1u32..=3 {
279                    let result = agent
280                        .post(&url)
281                        .header("content-type", "application/json")
282                        .header("authorization", &bearer)
283                        .send(body.as_bytes());
284
285                    match result {
286                        Ok(_) => {
287                            queue.sent.fetch_add(1, Ordering::Relaxed);
288                            success = true;
289                            break;
290                        }
291                        Err(_) if attempt < 3 => {
292                            thread::sleep(Duration::from_millis(100 * (1u64 << attempt)));
293                        }
294                        Err(_) => {}
295                    }
296                }
297
298                if !success {
299                    queue.dropped_max_retries.fetch_add(1, Ordering::Relaxed);
300                    tracing::warn!(
301                        target: "reddb::operator_router",
302                        handler = %name,
303                        "webhook delivery failed after 3 attempts; event dropped"
304                    );
305                }
306            }
307        })
308        .expect("spawn webhook worker thread")
309}
310
311// ---------------------------------------------------------------------------
312// Effective handler (runtime form)
313// ---------------------------------------------------------------------------
314
315#[derive(Debug)]
316enum EffectiveHandler {
317    AuditLog,
318    Tracing,
319    Stderr,
320    Webhook {
321        name: String,
322        queue: Arc<WebhookQueue>,
323        rate_limiter: Option<Mutex<TokenBucket>>,
324    },
325}
326
327impl EffectiveHandler {
328    fn name(&self) -> &str {
329        match self {
330            Self::AuditLog => "audit_log",
331            Self::Tracing => "tracing",
332            Self::Stderr => "stderr",
333            Self::Webhook { name, .. } => name,
334        }
335    }
336}
337
338// ---------------------------------------------------------------------------
339// Metrics snapshot
340// ---------------------------------------------------------------------------
341
/// Prometheus-style counters snapshot for external scraping.
///
/// Produced by `OperatorEventRouter::metrics`; both vectors hold one entry
/// per configured webhook handler.
#[derive(Debug, Default)]
pub struct RouterMetricsSnapshot {
    /// `(handler_name, dropped_count)` — aggregated across all drop reasons
    /// (queue full, rate limited, delivery attempts exhausted).
    pub dropped: Vec<(String, u64)>,
    /// `(handler_name, sent_count)` for webhook handlers.
    pub sent: Vec<(String, u64)>,
}
350
351// ---------------------------------------------------------------------------
352// OperatorEventRouter
353// ---------------------------------------------------------------------------
354
/// Config-driven dispatcher for [`OperatorEvent`].
///
/// Build with [`OperatorEventRouter::new`]; the construction validates the
/// config strictly (unknown variant names / handler names / missing env vars
/// all return `Err`). With an empty config the router behaves identically to
/// the current `OperatorEvent::emit()`.
#[derive(Debug)]
pub struct OperatorEventRouter {
    /// Destination for the `audit_log` handler. When `None`, that handler
    /// does nothing.
    audit_logger: Option<Arc<AuditLogger>>,
    /// Handlers applied to variants that have no entry in `variant_routes`.
    default_route: Vec<Arc<EffectiveHandler>>,
    /// Per-variant handler lists, keyed by the static variant name.
    variant_routes: HashMap<&'static str, Vec<Arc<EffectiveHandler>>>,
    /// Queue of each configured webhook handler, keyed by handler name; read
    /// by `metrics` to report the per-handler counters.
    webhook_queues: HashMap<String, Arc<WebhookQueue>>,
    // Mutex makes the field Sync (JoinHandle is Send but !Sync); the workers
    // run until process exit and are never joined during normal operation.
    // NOTE(review): the std docs list `JoinHandle<T>` as `Sync`, so the Mutex
    // may not be needed for that purpose — confirm before relying on it.
    _workers: Mutex<Vec<thread::JoinHandle<()>>>,
}
371
372impl OperatorEventRouter {
373    /// Build a router.
374    ///
375    /// Validation order (boot fail-fast on any error):
376    /// 1. Parse → struct (caller's responsibility for TOML).
377    /// 2. Strict variant name check with Levenshtein suggestion.
378    /// 3. Strict handler name check (closed set).
379    /// 4. Webhook env-var presence check.
380    pub fn new(
381        config: RouterConfig,
382        audit_logger: Option<Arc<AuditLogger>>,
383    ) -> Result<Self, ConfigError> {
384        let known_variants = OperatorEvent::all_variant_names();
385
386        // --- Validate variant names ---
387        for key in config.variant_routes.keys() {
388            if !known_variants.contains(&key.as_str()) {
389                let suggestion = closest_match(key, known_variants);
390                return Err(ConfigError::UnknownVariant {
391                    key: key.clone(),
392                    suggestion,
393                });
394            }
395        }
396
397        // --- Validate handler names ---
398        for names in config.variant_routes.values() {
399            validate_handler_names(names)?;
400        }
401        if let Some(ref dh) = config.default_handlers {
402            validate_handler_names(dh)?;
403        }
404
405        // --- Build webhook handlers (env-var check happens here) ---
406        let mut webhook_queues: HashMap<String, Arc<WebhookQueue>> = HashMap::new();
407        let mut workers: Vec<thread::JoinHandle<()>> = Vec::new();
408
409        let pd_handler = config
410            .pagerduty
411            .as_ref()
412            .map(|cfg| -> Result<Arc<EffectiveHandler>, ConfigError> {
413                let token =
414                    std::env::var(&cfg.auth_env).map_err(|_| ConfigError::MissingEnvVar {
415                        handler: "pagerduty".into(),
416                        var: cfg.auth_env.clone(),
417                    })?;
418                let queue = WebhookQueue::new();
419                webhook_queues.insert("pagerduty".into(), Arc::clone(&queue));
420                workers.push(spawn_webhook_worker(
421                    "pagerduty",
422                    cfg.url.clone(),
423                    token,
424                    Arc::clone(&queue),
425                ));
426                let rate_limiter = cfg
427                    .rate_limit
428                    .as_ref()
429                    .map(|rl| Mutex::new(TokenBucket::new(rl)));
430                Ok(Arc::new(EffectiveHandler::Webhook {
431                    name: "pagerduty".into(),
432                    queue,
433                    rate_limiter,
434                }))
435            })
436            .transpose()?;
437
438        let gw_handler = config
439            .generic_webhook
440            .as_ref()
441            .map(|cfg| -> Result<Arc<EffectiveHandler>, ConfigError> {
442                let token =
443                    std::env::var(&cfg.auth_env).map_err(|_| ConfigError::MissingEnvVar {
444                        handler: "generic_webhook".into(),
445                        var: cfg.auth_env.clone(),
446                    })?;
447                let queue = WebhookQueue::new();
448                webhook_queues.insert("generic_webhook".into(), Arc::clone(&queue));
449                workers.push(spawn_webhook_worker(
450                    "generic_webhook",
451                    cfg.url.clone(),
452                    token,
453                    Arc::clone(&queue),
454                ));
455                let rate_limiter = cfg
456                    .rate_limit
457                    .as_ref()
458                    .map(|rl| Mutex::new(TokenBucket::new(rl)));
459                Ok(Arc::new(EffectiveHandler::Webhook {
460                    name: "generic_webhook".into(),
461                    queue,
462                    rate_limiter,
463                }))
464            })
465            .transpose()?;
466
467        // Helper: resolve handler name list → runtime handler vec.
468        // Webhook handlers reference shared Arcs so the queue counters are
469        // shared between the router and the worker threads.
470        let resolve = |names: &[String]| -> Vec<Arc<EffectiveHandler>> {
471            names
472                .iter()
473                .filter_map(|n| match n.as_str() {
474                    "audit_log" => Some(Arc::new(EffectiveHandler::AuditLog)),
475                    "tracing" => Some(Arc::new(EffectiveHandler::Tracing)),
476                    "stderr" => Some(Arc::new(EffectiveHandler::Stderr)),
477                    "pagerduty" => pd_handler.clone(),
478                    "generic_webhook" => gw_handler.clone(),
479                    _ => None,
480                })
481                .collect()
482        };
483
484        let code_default = vec!["audit_log".to_string(), "tracing".to_string()];
485        let default_names = config.default_handlers.as_deref().unwrap_or(&code_default);
486        let default_route = resolve(default_names);
487
488        let mut variant_routes: HashMap<&'static str, Vec<Arc<EffectiveHandler>>> = HashMap::new();
489        for (key, names) in &config.variant_routes {
490            if let Some(static_key) = known_variants.iter().copied().find(|v| *v == key.as_str()) {
491                variant_routes.insert(static_key, resolve(names));
492            }
493        }
494
495        Ok(Self {
496            audit_logger,
497            default_route,
498            variant_routes,
499            webhook_queues,
500            _workers: Mutex::new(workers),
501        })
502    }
503
504    /// Dispatch `event` through the configured handlers.
505    ///
506    /// Always synchronous — safe to call from `Drop` impls, signal handlers,
507    /// and crash paths.
508    pub fn route(&self, event: OperatorEvent) {
509        let variant = event.variant_name();
510        let handlers = self
511            .variant_routes
512            .get(variant)
513            .unwrap_or(&self.default_route);
514
515        let (action, fields, summary) = event.decompose();
516        let ts_ms = crate::utils::now_unix_millis();
517
518        for handler in handlers {
519            match handler.as_ref() {
520                EffectiveHandler::AuditLog => {
521                    if let Some(audit) = &self.audit_logger {
522                        let ev = AuditEvent::builder(action)
523                            .source(AuditAuthSource::System)
524                            .outcome(Outcome::Error)
525                            .fields(fields.clone())
526                            .build();
527                        audit.record_event(ev);
528                    }
529                }
530                EffectiveHandler::Tracing => {
531                    tracing::warn!(target: "reddb::operator", "{summary}");
532                }
533                EffectiveHandler::Stderr => {
534                    eprintln!("[reddb::operator] {summary}");
535                }
536                EffectiveHandler::Webhook {
537                    name,
538                    queue,
539                    rate_limiter,
540                } => {
541                    if let Some(rl) = rate_limiter {
542                        let allowed = rl.lock().expect("rate limiter mutex").try_consume();
543                        if !allowed {
544                            queue.dropped_rate_limit.fetch_add(1, Ordering::Relaxed);
545                            tracing::debug!(
546                                target: "reddb::operator_router",
547                                handler = %name,
548                                "event rate-limited; skipping webhook"
549                            );
550                            continue;
551                        }
552                    }
553                    queue.push(WebhookPayload {
554                        action: action.to_string(),
555                        summary: summary.clone(),
556                        ts_ms,
557                    });
558                }
559            }
560        }
561    }
562
563    /// Prometheus-style metrics snapshot.
564    pub fn metrics(&self) -> RouterMetricsSnapshot {
565        let mut snap = RouterMetricsSnapshot::default();
566        for (name, q) in &self.webhook_queues {
567            let dropped = q.dropped_queue_full.load(Ordering::Relaxed)
568                + q.dropped_rate_limit.load(Ordering::Relaxed)
569                + q.dropped_max_retries.load(Ordering::Relaxed);
570            snap.dropped.push((name.clone(), dropped));
571            snap.sent
572                .push((name.clone(), q.sent.load(Ordering::Relaxed)));
573        }
574        snap
575    }
576}
577
578// ---------------------------------------------------------------------------
579// Tests
580// ---------------------------------------------------------------------------
581
582#[cfg(test)]
583mod tests {
584    use std::io::Read;
585    use std::net::TcpListener;
586    use std::sync::Arc;
587
588    use super::*;
589    use crate::runtime::audit_log::AuditLogger;
590
591    fn make_audit_logger() -> (Arc<AuditLogger>, std::path::PathBuf) {
592        let mut dir = std::env::temp_dir();
593        dir.push(format!(
594            "reddb-router-test-{}-{}",
595            std::process::id(),
596            crate::utils::now_unix_nanos()
597        ));
598        std::fs::create_dir_all(&dir).unwrap();
599        let path = dir.join(".audit.log");
600        let logger = Arc::new(AuditLogger::with_path(path.clone()));
601        (logger, path)
602    }
603
604    fn drain(logger: &AuditLogger) {
605        assert!(
606            logger.wait_idle(Duration::from_secs(2)),
607            "audit logger drain timed out"
608        );
609    }
610
611    fn last_audit_action(path: &std::path::Path) -> Option<String> {
612        let body = std::fs::read_to_string(path).ok()?;
613        let line = body.lines().last()?;
614        let v: crate::serde_json::Value = crate::serde_json::from_str(line).ok()?;
615        v.get("action")
616            .and_then(|x| x.as_str())
617            .map(|s| s.to_string())
618    }
619
620    // -----------------------------------------------------------------------
621    // Default routing: empty config = audit_log + tracing
622    // -----------------------------------------------------------------------
623
624    #[test]
625    fn empty_config_routes_all_variants_to_audit_and_tracing() {
626        let (audit, path) = make_audit_logger();
627        let router = OperatorEventRouter::new(RouterConfig::default(), Some(Arc::clone(&audit)))
628            .expect("router build");
629
630        let variants: &[OperatorEvent] = &[
631            OperatorEvent::ReplicationBroken {
632                peer: "p".into(),
633                reason: "r".into(),
634            },
635            OperatorEvent::Divergence {
636                peer: "p".into(),
637                leader_lsn: 1,
638                follower_lsn: 0,
639            },
640            OperatorEvent::WalFsyncFailed {
641                path: "/d".into(),
642                error: "e".into(),
643            },
644            OperatorEvent::DiskSpaceCritical {
645                path: "/d".into(),
646                available_bytes: 1,
647                threshold_bytes: 2,
648            },
649            OperatorEvent::AuthBypass {
650                principal: "a".into(),
651                resource: "r".into(),
652                detail: "d".into(),
653            },
654            OperatorEvent::AdminCapabilityGranted {
655                granted_to: "a".into(),
656                capability: "c".into(),
657                granted_by: "b".into(),
658            },
659            OperatorEvent::SecretRotationFailed {
660                secret_ref: "s".into(),
661                error: "e".into(),
662            },
663            OperatorEvent::ConfigChanged {
664                key: "k".into(),
665                old_value: "o".into(),
666                new_value: "n".into(),
667                changed_by: "b".into(),
668            },
669            OperatorEvent::StartupFailed {
670                phase: "p".into(),
671                error: "e".into(),
672            },
673            OperatorEvent::ShutdownForced { reason: "r".into() },
674            OperatorEvent::SchemaCorruption {
675                collection: "c".into(),
676                detail: "d".into(),
677            },
678            OperatorEvent::CheckpointFailed {
679                lsn: 1,
680                error: "e".into(),
681            },
682            OperatorEvent::ConfigChangeRequiresRestart {
683                fields_changed: "f".into(),
684            },
685            OperatorEvent::SubscriptionSchemaChange {
686                collection: "c".into(),
687                subscription_names: "sub1".into(),
688                fields_added: "phone".into(),
689                fields_removed: "".into(),
690                lsn: 42,
691            },
692            OperatorEvent::OutboxDlqActivated {
693                queue: "users_events".into(),
694                dlq: "users_events_outbox_dlq".into(),
695                reason: "queue_full".into(),
696            },
697        ];
698
699        // Verify count matches all_variant_names minus DanglingAdminIntent
700        // (which requires crypto types — tested separately).
701        for event in variants {
702            // Clone variant_name before routing (event is consumed by route).
703            let vname = event.variant_name();
704            router.route(clone_event(event));
705            let _ = vname; // just ensure it compiled
706        }
707
708        drain(&audit);
709        // At least one line written: the last event.
710        let action = last_audit_action(&path).expect("at least one audit line");
711        assert!(action.starts_with("operator/"), "action={action}");
712    }
713
    // Clone helper for test variants that don't use crypto types.
    //
    // Builds a field-by-field copy of `e` so a borrowed event can be handed
    // to `route`, which takes its argument by value. The match has no
    // wildcard arm, so adding a variant to `OperatorEvent` forces this
    // helper to be updated.
    //
    // Caveat: `DanglingAdminIntent` is NOT copied. It is replaced by a
    // `ShutdownForced` placeholder, so callers must not rely on this helper
    // for that variant.
    fn clone_event(e: &OperatorEvent) -> OperatorEvent {
        match e {
            OperatorEvent::ReplicationBroken { peer, reason } => OperatorEvent::ReplicationBroken {
                peer: peer.clone(),
                reason: reason.clone(),
            },
            OperatorEvent::Divergence {
                peer,
                leader_lsn,
                follower_lsn,
            } => OperatorEvent::Divergence {
                peer: peer.clone(),
                leader_lsn: *leader_lsn,
                follower_lsn: *follower_lsn,
            },
            OperatorEvent::WalFsyncFailed { path, error } => OperatorEvent::WalFsyncFailed {
                path: path.clone(),
                error: error.clone(),
            },
            OperatorEvent::DiskSpaceCritical {
                path,
                available_bytes,
                threshold_bytes,
            } => OperatorEvent::DiskSpaceCritical {
                path: path.clone(),
                available_bytes: *available_bytes,
                threshold_bytes: *threshold_bytes,
            },
            OperatorEvent::AuthBypass {
                principal,
                resource,
                detail,
            } => OperatorEvent::AuthBypass {
                principal: principal.clone(),
                resource: resource.clone(),
                detail: detail.clone(),
            },
            OperatorEvent::AdminCapabilityGranted {
                granted_to,
                capability,
                granted_by,
            } => OperatorEvent::AdminCapabilityGranted {
                granted_to: granted_to.clone(),
                capability: capability.clone(),
                granted_by: granted_by.clone(),
            },
            OperatorEvent::SecretRotationFailed { secret_ref, error } => {
                OperatorEvent::SecretRotationFailed {
                    secret_ref: secret_ref.clone(),
                    error: error.clone(),
                }
            }
            OperatorEvent::ConfigChanged {
                key,
                old_value,
                new_value,
                changed_by,
            } => OperatorEvent::ConfigChanged {
                key: key.clone(),
                old_value: old_value.clone(),
                new_value: new_value.clone(),
                changed_by: changed_by.clone(),
            },
            OperatorEvent::StartupFailed { phase, error } => OperatorEvent::StartupFailed {
                phase: phase.clone(),
                error: error.clone(),
            },
            OperatorEvent::ShutdownForced { reason } => OperatorEvent::ShutdownForced {
                reason: reason.clone(),
            },
            OperatorEvent::SchemaCorruption { collection, detail } => {
                OperatorEvent::SchemaCorruption {
                    collection: collection.clone(),
                    detail: detail.clone(),
                }
            }
            OperatorEvent::CheckpointFailed { lsn, error } => OperatorEvent::CheckpointFailed {
                lsn: *lsn,
                error: error.clone(),
            },
            OperatorEvent::ConfigChangeRequiresRestart { fields_changed } => {
                OperatorEvent::ConfigChangeRequiresRestart {
                    fields_changed: fields_changed.clone(),
                }
            }
            OperatorEvent::DanglingAdminIntent { .. } => {
                // Not cloneable without crypto type impls; skip.
                OperatorEvent::ShutdownForced {
                    reason: "clone_placeholder".into(),
                }
            }
            OperatorEvent::SubscriptionSchemaChange {
                collection,
                subscription_names,
                fields_added,
                fields_removed,
                lsn,
            } => OperatorEvent::SubscriptionSchemaChange {
                collection: collection.clone(),
                subscription_names: subscription_names.clone(),
                fields_added: fields_added.clone(),
                fields_removed: fields_removed.clone(),
                lsn: *lsn,
            },
            OperatorEvent::OutboxDlqActivated { queue, dlq, reason } => {
                OperatorEvent::OutboxDlqActivated {
                    queue: queue.clone(),
                    dlq: dlq.clone(),
                    reason: reason.clone(),
                }
            }
        }
    }
828
829    // -----------------------------------------------------------------------
830    // Config validation: unknown variant name → Levenshtein suggestion
831    // -----------------------------------------------------------------------
832
833    #[test]
834    fn unknown_variant_gives_suggestion() {
835        let mut config = RouterConfig::default();
836        // "AuthBypas" is one edit away from "AuthBypass" (missing 's').
837        config
838            .variant_routes
839            .insert("AuthBypas".into(), vec!["audit_log".into()]);
840        let err = OperatorEventRouter::new(config, None).unwrap_err();
841        match err {
842            ConfigError::UnknownVariant { key, suggestion } => {
843                assert_eq!(key, "AuthBypas");
844                // strsim should suggest "AuthBypass" (levenshtein distance 1)
845                assert_eq!(suggestion.as_deref(), Some("AuthBypass"));
846            }
847            other => panic!("expected UnknownVariant, got: {other}"),
848        }
849    }
850
851    #[test]
852    fn unknown_handler_name_is_rejected() {
853        let mut config = RouterConfig::default();
854        config.variant_routes.insert(
855            "AuthBypass".into(),
856            vec!["slack".into()], // not a known handler
857        );
858        let err = OperatorEventRouter::new(config, None).unwrap_err();
859        assert!(matches!(err, ConfigError::UnknownHandler { .. }));
860    }
861
862    // -----------------------------------------------------------------------
863    // Per-variant route override
864    // -----------------------------------------------------------------------
865
866    #[test]
867    fn per_variant_route_overrides_default() {
868        let (audit, path) = make_audit_logger();
869        let mut config = RouterConfig::default();
870        // Route AuthBypass to stderr-only (no audit log for this test variant).
871        config
872            .variant_routes
873            .insert("AuthBypass".into(), vec!["stderr".into()]);
874        let router = OperatorEventRouter::new(config, Some(Arc::clone(&audit))).unwrap();
875
876        // Emit AuthBypass — should NOT go to audit (only stderr per config).
877        router.route(OperatorEvent::AuthBypass {
878            principal: "test".into(),
879            resource: "/secret".into(),
880            detail: "test override".into(),
881        });
882        drain(&audit);
883        // Audit log should still be empty (or unchanged from no lines).
884        let body = std::fs::read_to_string(&path).unwrap_or_default();
885        assert!(
886            body.lines().all(|l| !l.contains("auth_bypass")),
887            "auth_bypass should not appear in audit (stderr-only route)"
888        );
889
890        // Emit a different variant — should still go to default (audit+tracing).
891        router.route(OperatorEvent::ShutdownForced {
892            reason: "test".into(),
893        });
894        drain(&audit);
895        let action = last_audit_action(&path).expect("shutdown_forced in audit");
896        assert_eq!(action, "operator/shutdown_forced");
897    }
898
899    // -----------------------------------------------------------------------
900    // Token bucket rate limit
901    // -----------------------------------------------------------------------
902
903    #[test]
904    fn token_bucket_throttles_after_burst() {
905        let mut bucket = TokenBucket::new(&RateLimitConfig {
906            requests: 3,
907            window_sec: 60,
908        });
909        // rate = 3/60 = 0.05 t/s, burst = max(0.05, 1.0) = 1.0
910        // First consume should succeed.
911        assert!(bucket.try_consume(), "first consume should succeed");
912        // Second should fail (burst exhausted, no time passed).
913        assert!(!bucket.try_consume(), "second consume should be throttled");
914    }
915
916    #[test]
917    fn token_bucket_refills_over_time() {
918        let mut bucket = TokenBucket::new(&RateLimitConfig {
919            requests: 100,
920            window_sec: 1,
921        });
922        // rate = 100/s, burst = 100
923        for _ in 0..100 {
924            assert!(bucket.try_consume());
925        }
926        assert!(!bucket.try_consume(), "burst exhausted");
927        thread::sleep(Duration::from_millis(20));
928        // After 20ms at 100/s we get ~2 tokens.
929        assert!(bucket.try_consume(), "should refill after sleep");
930    }
931
932    // -----------------------------------------------------------------------
933    // Webhook queue: drop oldest on saturation
934    // -----------------------------------------------------------------------
935
936    #[test]
937    fn queue_drops_oldest_on_saturation() {
938        let queue = WebhookQueue::new();
939        for i in 0..QUEUE_CAPACITY {
940            queue.push(WebhookPayload {
941                action: format!("ev/{i}"),
942                summary: format!("s{i}"),
943                ts_ms: i as u64,
944            });
945        }
946        assert_eq!(queue.dropped_queue_full.load(Ordering::Relaxed), 0);
947
948        // One more push → drops oldest (ev/0).
949        queue.push(WebhookPayload {
950            action: "ev/overflow".into(),
951            summary: "overflow".into(),
952            ts_ms: QUEUE_CAPACITY as u64,
953        });
954        assert_eq!(queue.dropped_queue_full.load(Ordering::Relaxed), 1);
955
956        // Oldest item in queue should now be ev/1 (ev/0 was dropped).
957        let first = queue.pop_blocking();
958        assert_eq!(first.action, "ev/1");
959    }
960
961    // -----------------------------------------------------------------------
962    // Integration: mock webhook server receives payload
963    // -----------------------------------------------------------------------
964
965    #[test]
966    fn webhook_delivers_payload_to_mock_server() {
967        // Bind a local TCP server to act as the webhook endpoint.
968        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
969        let addr = listener.local_addr().unwrap();
970        let url = format!("http://{addr}/webhook");
971
972        // Accept one connection in a background thread.
973        let server_thread = thread::spawn(move || {
974            let (mut stream, _) = listener.accept().unwrap();
975            let mut buf = vec![0u8; 4096];
976            let n = stream.read(&mut buf).unwrap_or(0);
977            String::from_utf8_lossy(&buf[..n]).to_string()
978        });
979
980        // Set up env var for auth.
981        std::env::set_var("TEST_WEBHOOK_TOKEN_ROUTER", "test-token-42");
982
983        let config = RouterConfig {
984            default_handlers: None,
985            variant_routes: {
986                let mut m = HashMap::new();
987                m.insert("ShutdownForced".into(), vec!["generic_webhook".into()]);
988                m
989            },
990            pagerduty: None,
991            generic_webhook: Some(WebhookHandlerConfig {
992                url,
993                auth_env: "TEST_WEBHOOK_TOKEN_ROUTER".into(),
994                rate_limit: None,
995            }),
996        };
997
998        let router = OperatorEventRouter::new(config, None).unwrap();
999        router.route(OperatorEvent::ShutdownForced {
1000            reason: "integration-test".into(),
1001        });
1002
1003        let raw = server_thread.join().expect("server thread");
1004        // The request should contain our auth header and JSON body.
1005        assert!(raw.contains("Bearer test-token-42"), "missing auth header");
1006        assert!(raw.contains("shutdown_forced"), "missing event in body");
1007    }
1008
1009    // -----------------------------------------------------------------------
1010    // Race: concurrent route() calls don't corrupt rate limiter
1011    // -----------------------------------------------------------------------
1012
1013    #[test]
1014    fn concurrent_route_calls_safe() {
1015        let router = Arc::new(OperatorEventRouter::new(RouterConfig::default(), None).unwrap());
1016        let handles: Vec<_> = (0..16)
1017            .map(|_| {
1018                let r = Arc::clone(&router);
1019                thread::spawn(move || {
1020                    for _ in 0..50 {
1021                        r.route(OperatorEvent::ShutdownForced {
1022                            reason: "stress".into(),
1023                        });
1024                    }
1025                })
1026            })
1027            .collect();
1028        for h in handles {
1029            h.join().unwrap();
1030        }
1031        // No panic = pass.
1032    }
1033
1034    // -----------------------------------------------------------------------
1035    // Missing env var → boot fail-fast
1036    // -----------------------------------------------------------------------
1037
1038    #[test]
1039    fn missing_env_var_fails_at_construction() {
1040        let config = RouterConfig {
1041            default_handlers: None,
1042            variant_routes: HashMap::new(),
1043            pagerduty: Some(WebhookHandlerConfig {
1044                url: "http://localhost/pd".into(),
1045                auth_env: "REDDB_TEST_PD_KEY_DEFINITELY_NOT_SET_12345".into(),
1046                rate_limit: None,
1047            }),
1048            generic_webhook: None,
1049        };
1050        let err = OperatorEventRouter::new(config, None).unwrap_err();
1051        assert!(matches!(err, ConfigError::MissingEnvVar { .. }));
1052    }
1053}