Skip to main content

autumn_web/
actuator.rs

1//! Actuator endpoints for operational observability.
2//!
3//! Provides health, info, env, metrics, configprops, loggers, and tasks
4//! endpoints under the configured actuator prefix.
5//!
6//! Sensitive endpoints are gated by profile-aware defaults:
7//! - **dev**: all endpoints enabled
8//! - **prod**: only health, info, and metrics
9
10use std::collections::HashMap;
11use std::sync::{Arc, RwLock};
12
13use axum::Json;
14use axum::extract::{Path, State};
15use axum::http::StatusCode;
16use axum::response::IntoResponse;
17use serde::{Deserialize, Serialize};
18
19/// Scaffold-level accessibility posture reported by `/actuator/a11y`.
20///
21/// Each field indicates whether a foundational WCAG 2.1 AA scaffold concern is
22/// addressed in the application.  Apps generated with `autumn new` satisfy all
23/// three by default; existing apps can opt in incrementally.
24#[derive(Debug, Clone, Serialize, Default)]
25pub struct A11yPosture {
26    /// `<html lang="…">` is set in the page template.
27    pub lang_set: bool,
28    /// A skip-to-content link is present as the first focusable element.
29    pub skip_link_present: bool,
30    /// Semantic landmark regions (`<header>`, `<main>`, `<nav>`, `<footer>`)
31    /// are used in the page layout.
32    pub landmark_regions_present: bool,
33}
34
35impl A11yPosture {
36    /// Returns `true` when all scaffold-level a11y concerns are addressed.
37    #[must_use]
38    pub const fn is_compliant(&self) -> bool {
39        self.lang_set && self.skip_link_present && self.landmark_regions_present
40    }
41}
42
43// ── Plugin-contributed metrics ──────────────────────────────────
44
/// The Prometheus type of a metric family.
///
/// [`MetricFamily`] carries one of these so the renderer knows which keyword
/// to put on the family's `# TYPE` line in the Prometheus text format.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetricKind {
    /// A value that only ever increases (e.g. a request count).
    Counter,
    /// A value that can go up or down (e.g. queue depth, active connections).
    Gauge,
}

impl MetricKind {
    /// Lowercase type keyword written on the Prometheus `# TYPE` line.
    const fn as_str(&self) -> &'static str {
        match *self {
            MetricKind::Counter => "counter",
            MetricKind::Gauge => "gauge",
        }
    }
}
65
/// One observation of a metric: an optional label set plus a value.
///
/// Each label is a `(name, value)` pair, rendered as `{name="value"}` in the
/// Prometheus text format.
#[derive(Debug, Clone)]
pub struct MetricSample {
    /// Label `(name, value)` pairs; an empty vector means the sample is unlabeled.
    pub labels: Vec<(String, String)>,
    /// The observed value.
    pub value: f64,
}
77
78/// A complete metric family: name, kind, help text, and current samples.
79///
80/// Each `MetricFamily` is rendered as one `# HELP` / `# TYPE` block followed
81/// by one line per sample in the Prometheus text format.
82#[derive(Debug, Clone)]
83pub struct MetricFamily {
84    /// Unique metric name (e.g., `"harvest_workflow_completions_total"`).
85    ///
86    /// Use a stable namespace prefix so names don't collide with the built-in
87    /// `autumn_*` families or other registered sources.
88    pub name: String,
89    /// One-line description emitted as `# HELP` in the Prometheus output.
90    pub help: String,
91    /// Metric type: counter, gauge, or histogram.
92    pub kind: MetricKind,
93    /// Current samples.  Each sample produces one line in the scrape output.
94    pub samples: Vec<MetricSample>,
95}
96
97/// Contract for a subsystem that contributes metrics to the unified actuator endpoints.
98///
99/// Implement this trait and register the implementation via
100/// [`crate::app::AppBuilder::metrics_source`] to publish metric families that appear in
101/// `/actuator/prometheus` alongside the built-in `autumn_http_*` families, and
102/// in `/actuator/metrics` under the `sources` key.
103///
104/// # Naming rules
105///
106/// Prefix every metric name with a stable namespace (e.g. `harvest_` for
107/// autumn-harvest, `myapp_` for an application-level source).  The registry
108/// enforces that two sources cannot share the same **registration name**; metric
109/// family name uniqueness is the source's responsibility.
110///
111/// # Sync-snapshot contract
112///
113/// `collect` is called synchronously on the HTTP request goroutine.
114/// Implementations **must not block on I/O** — read from atomics,
115/// `RwLock`-protected snapshots, or channels that already have buffered data.
116/// If async work is needed, collect it into a pre-computed cache and update
117/// that cache from a background task.
118pub trait MetricsSource: Send + Sync + 'static {
119    /// Return zero or more metric families, all read from in-memory state.
120    fn collect(&self) -> Vec<MetricFamily>;
121}
122
123/// Registry of named [`MetricsSource`] implementations.
124///
125/// Maintained by [`crate::app::AppBuilder`] and stored on
126/// [`crate::AppState`]. Provides duplicate-registration detection at startup
127/// and per-source panic isolation at scrape time.
128#[derive(Clone, Default)]
129pub struct MetricsSourceRegistry {
130    inner: Arc<RwLock<MetricsSourceRegistryInner>>,
131}
132
133#[derive(Default)]
134struct MetricsSourceRegistryInner {
135    /// Registered sources in insertion order.
136    sources: Vec<(String, Arc<dyn MetricsSource>)>,
137    /// Per-source scrape-error counter incremented when a source panics.
138    error_counts: HashMap<String, u64>,
139}
140
141impl MetricsSourceRegistry {
142    /// Create a new, empty registry.
143    #[must_use]
144    pub fn new() -> Self {
145        Self::default()
146    }
147
148    /// Register a named source.
149    ///
150    /// Returns `Err` containing a message if a source with `name` has already
151    /// been registered (startup-time collision detection).
152    ///
153    /// # Errors
154    ///
155    /// Returns an error string when `name` is already registered.
156    pub fn register(
157        &self,
158        name: impl Into<String>,
159        source: Arc<dyn MetricsSource>,
160    ) -> Result<(), String> {
161        let name = name.into();
162        {
163            let mut inner = self
164                .inner
165                .write()
166                .unwrap_or_else(std::sync::PoisonError::into_inner);
167            if inner.sources.iter().any(|(n, _)| n == &name) {
168                return Err(format!(
169                    "MetricsSource '{name}' is already registered; skipping duplicate"
170                ));
171            }
172            inner.sources.push((name, source));
173        }
174        Ok(())
175    }
176
177    /// Collect from all registered sources, isolating panics.
178    ///
179    /// Returns one entry per registered source; panicking sources contribute an
180    /// empty `Vec<MetricFamily>` and increment their error counter.
181    pub fn collect_all(&self) -> Vec<(String, Vec<MetricFamily>)> {
182        let sources: Vec<(String, Arc<dyn MetricsSource>)> = self
183            .inner
184            .read()
185            .unwrap_or_else(std::sync::PoisonError::into_inner)
186            .sources
187            .clone();
188
189        let mut results = Vec::with_capacity(sources.len());
190        let mut panicked = Vec::new();
191
192        for (name, source) in &sources {
193            let result =
194                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| source.collect()));
195            if let Ok(families) = result {
196                results.push((name.clone(), families));
197            } else {
198                tracing::error!(source_name = %name, "MetricsSource panicked during collection");
199                panicked.push(name.clone());
200                results.push((name.clone(), vec![]));
201            }
202        }
203
204        if !panicked.is_empty() {
205            let mut inner = self
206                .inner
207                .write()
208                .unwrap_or_else(std::sync::PoisonError::into_inner);
209            for name in panicked {
210                *inner.error_counts.entry(name).or_insert(0) += 1;
211            }
212        }
213
214        results
215    }
216
217    /// Current per-source scrape-error counts (incremented on panic isolation).
218    #[must_use]
219    pub fn error_counts(&self) -> HashMap<String, u64> {
220        self.inner
221            .read()
222            .unwrap_or_else(std::sync::PoisonError::into_inner)
223            .error_counts
224            .clone()
225    }
226
227    /// Names of all registered sources, in insertion order.
228    #[must_use]
229    pub fn source_names(&self) -> Vec<String> {
230        self.inner
231            .read()
232            .unwrap_or_else(std::sync::PoisonError::into_inner)
233            .sources
234            .iter()
235            .map(|(n, _)| n.clone())
236            .collect()
237    }
238
239    /// Returns `true` when no sources have been registered.
240    #[must_use]
241    pub fn is_empty(&self) -> bool {
242        self.inner
243            .read()
244            .unwrap_or_else(std::sync::PoisonError::into_inner)
245            .sources
246            .is_empty()
247    }
248}
249
250/// Trait to abstract the state requirements for actuator handlers.
251///
252/// Implement this trait on your application's state type to provide
253/// the necessary dependencies for actuator endpoints (e.g. `/actuator/metrics`).
254/// This avoids tight coupling between the actuator middleware and the specific `AppState`.
255pub trait ProvideActuatorState {
256    /// Returns a reference to the [`crate::middleware::MetricsCollector`]
257    /// tracking current HTTP traffic metrics.
258    fn metrics(&self) -> &crate::middleware::MetricsCollector;
259
260    /// Returns a reference to the dynamic [`LogLevels`] configuration
261    /// allowing runtime adjustment of `tracing` filters.
262    fn log_levels(&self) -> &LogLevels;
263
264    /// Returns a reference to the [`TaskRegistry`] holding status and metadata
265    /// for async scheduled background tasks.
266    fn task_registry(&self) -> &TaskRegistry;
267
268    /// Returns a reference to the [`JobRegistry`] holding queue and failure
269    /// information for ad-hoc background jobs.
270    fn job_registry(&self) -> &JobRegistry;
271
272    /// Returns a reference to the [`ConfigProperties`] snapshot, providing
273    /// active configuration state for the environment endpoint.
274    fn config_props(&self) -> &ConfigProperties;
275
276    /// Returns the currently active execution profile (e.g. "dev", "prod")
277    /// which modifies what sensitive endpoints are exposed.
278    fn profile(&self) -> &str;
279
280    /// Returns a human-readable string displaying how long the application
281    /// has been running (e.g., "2d 4h 13m").
282    fn uptime_display(&self) -> String;
283
284    /// Returns a reference to the system [`crate::channels::Channels`] which
285    /// broadcasts operational events to WebSocket streams.
286    #[cfg(feature = "ws")]
287    fn channels(&self) -> &crate::channels::Channels;
288
289    /// Returns the main cancellation token that triggers a graceful framework shutdown.
290    #[cfg(feature = "ws")]
291    fn shutdown_token(&self) -> tokio_util::sync::CancellationToken;
292
293    /// Returns an optional reference to the database connection pool,
294    /// used to expose database connection metrics in the `/actuator/metrics` endpoint.
295    #[cfg(feature = "db")]
296    fn pool(
297        &self,
298    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>;
299
300    /// Returns the scaffold-level accessibility posture reported by `/actuator/a11y`.
301    ///
302    /// Override this in your `AppState` implementation to declare which
303    /// WCAG 2.1 AA scaffold concerns your application addresses.  The default
304    /// returns all-false (no concerns addressed) — a conservative safe default.
305    fn a11y_posture(&self) -> A11yPosture {
306        A11yPosture::default()
307    }
308
309    /// Returns the registry of plugin-contributed [`MetricsSource`] implementations.
310    ///
311    /// The default returns `None`, meaning no plugin sources are consulted.
312    /// [`crate::AppState`] overrides this to return its registry, which is
313    /// populated by [`crate::app::AppBuilder::metrics_source`].
314    fn metrics_source_registry(&self) -> Option<&MetricsSourceRegistry> {
315        None
316    }
317
318    /// Returns the registry of [`HealthIndicator`] implementations.
319    ///
320    /// The default returns `None`, meaning no custom indicators are consulted.
321    /// [`crate::AppState`] overrides this to return its registry, which is
322    /// populated by [`crate::app::AppBuilder::health_indicator`].
323    fn health_indicator_registry(&self) -> Option<&HealthIndicatorRegistry> {
324        None
325    }
326
327    /// Returns whether detailed health information should be included in responses.
328    ///
329    /// When `false`, per-component `details` maps are omitted from
330    /// `/actuator/health` output. Defaults to `true`.
331    fn health_detailed(&self) -> bool {
332        true
333    }
334
335    /// Returns the deploy-version label for this replica (e.g. `"stable"` or
336    /// `"canary"`), used to tag Prometheus metrics so a canary controller can
337    /// compare canary vs. stable cohorts.
338    ///
339    /// Defaults to [`crate::canary::STABLE`]. [`crate::AppState`] overrides this
340    /// to return the value resolved from `AUTUMN_DEPLOY_VERSION` /
341    /// `AUTUMN_CANARY` (see [`crate::canary`]).
342    fn deploy_version(&self) -> String {
343        crate::canary::STABLE.to_owned()
344    }
345
346    #[cfg(feature = "http-client")]
347    /// Returns the optional webhook outbound manager if enabled/registered.
348    fn webhook_outbound(&self) -> Option<crate::webhook_outbound::WebhookOutboundManager> {
349        None
350    }
351
352    /// Returns the in-memory log capture buffer, if capture is enabled.
353    ///
354    /// The default returns `None` (capture disabled). [`crate::AppState`]
355    /// overrides this to return the buffer installed at startup when
356    /// `log.capture.enabled = true`.
357    fn log_buffer(&self) -> Option<crate::log::capture::LogBuffer> {
358        None
359    }
360}
361
362// ── Shared types for AppState ──────────────────────────────────
363
/// Runtime log level management for the loggers actuator endpoint.
///
/// Stores the current effective (global) log level and per-logger overrides.
/// Changes are ephemeral: they reset on restart. Clones share the same
/// underlying state through an `Arc`.
#[derive(Clone)]
pub struct LogLevels {
    inner: Arc<RwLock<LogLevelsInner>>,
}

struct LogLevelsInner {
    /// The current global log level.
    current_level: String,
    /// Per-logger level overrides applied at runtime.
    logger_overrides: HashMap<String, String>,
}

impl LogLevels {
    /// Maximum number of distinct override entries accepted for non-root
    /// loggers. It prevents unbounded memory growth from arbitrary logger
    /// names supplied at runtime.
    const MAX_LOGGER_OVERRIDES: usize = 1000;

    /// Create a new `LogLevels` with the given initial global level and no overrides.
    #[must_use]
    pub fn new(initial_level: &str) -> Self {
        Self {
            inner: Arc::new(RwLock::new(LogLevelsInner {
                current_level: initial_level.to_string(),
                logger_overrides: HashMap::new(),
            })),
        }
    }

    /// Get the current global log level (`"info"` if the lock is poisoned).
    #[must_use]
    pub fn current_level(&self) -> String {
        self.inner
            .read()
            .map_or_else(|_| "info".to_string(), |guard| guard.current_level.clone())
    }

    /// Get all per-logger overrides (empty if the lock is poisoned).
    #[must_use]
    pub fn logger_overrides(&self) -> HashMap<String, String> {
        self.inner
            .read()
            .map(|guard| guard.logger_overrides.clone())
            .unwrap_or_default()
    }

    /// Set the level for a specific logger. Returns the previous level if any.
    ///
    /// `"root"` and `""` both address the root logger. They are recorded as an
    /// override, also replace the global level, and return the previous
    /// *global* level. Root updates are always accepted. For other loggers,
    /// once 1000 distinct overrides exist, a new logger name is rejected: the
    /// call returns `None` and changes nothing. Existing names can still be
    /// updated. A poisoned lock also yields `None`.
    #[must_use]
    pub fn set_logger_level(&self, name: &str, level: &str) -> Option<String> {
        let Ok(mut guard) = self.inner.write() else {
            return None;
        };
        let is_root = name == "root" || name.is_empty();

        // Bound memory growth from arbitrary logger names, but never lock out
        // the root logger: it drives the global level and has only two keys.
        if !is_root
            && guard.logger_overrides.len() >= Self::MAX_LOGGER_OVERRIDES
            && !guard.logger_overrides.contains_key(name)
        {
            return None;
        }

        let previous = guard
            .logger_overrides
            .insert(name.to_string(), level.to_string());
        if is_root {
            // For the root logger, report the previous global level.
            return Some(std::mem::replace(
                &mut guard.current_level,
                level.to_string(),
            ));
        }
        previous
    }
}
433
434impl std::fmt::Debug for LogLevels {
435    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
436        f.debug_struct("LogLevels")
437            .field("current_level", &self.current_level())
438            .finish()
439    }
440}
441
442/// Scheduled task status information.
443#[derive(Debug, Clone, Serialize)]
444pub struct TaskStatus {
445    /// The schedule description (e.g., "every 5m" or "cron 0 0 * * *").
446    pub schedule: String,
447    /// Whether this task is coordinated across the fleet or per replica.
448    pub coordination: crate::task::TaskCoordination,
449    /// Scheduler backend currently coordinating this task.
450    pub scheduler_backend: String,
451    /// Replica id for this process.
452    pub replica_id: String,
453    /// Replica id that last acquired leadership for this task.
454    #[serde(skip_serializing_if = "Option::is_none")]
455    pub current_leader: Option<String>,
456    /// Last global tick key observed for this task.
457    #[serde(skip_serializing_if = "Option::is_none")]
458    pub last_tick: Option<String>,
459    /// Last time this task fired (ISO 8601), if ever.
460    #[serde(skip_serializing_if = "Option::is_none")]
461    pub last_fired_at: Option<String>,
462    /// Next scheduled run time (ISO 8601), if known.
463    #[serde(skip_serializing_if = "Option::is_none")]
464    pub next_run_at: Option<String>,
465    /// Current task state.
466    pub status: String,
467    /// Last time the task ran (ISO 8601), if ever.
468    #[serde(skip_serializing_if = "Option::is_none")]
469    pub last_run: Option<String>,
470    /// Duration of last run in milliseconds.
471    #[serde(skip_serializing_if = "Option::is_none")]
472    pub last_duration_ms: Option<u64>,
473    /// Result of last run.
474    #[serde(skip_serializing_if = "Option::is_none")]
475    pub last_result: Option<String>,
476    /// Last error message, if the task failed.
477    #[serde(skip_serializing_if = "Option::is_none")]
478    pub last_error: Option<String>,
479    /// Total number of times the task has run.
480    pub total_runs: u64,
481    /// Total number of failures.
482    pub total_failures: u64,
483}
484
485/// Registry of scheduled tasks and their runtime status.
486#[derive(Clone)]
487pub struct TaskRegistry {
488    inner: Arc<RwLock<HashMap<String, TaskStatus>>>,
489}
490
491/// On-demand background job status information.
492#[derive(Debug, Clone, Serialize)]
493pub struct JobStatus {
494    /// Approximate queued jobs waiting to run.
495    pub queued: u64,
496    /// Number of currently running jobs.
497    pub in_flight: u64,
498    /// Approximate jobs currently waiting on a free concurrency slot.
499    pub blocked_on_concurrency: u64,
500    /// Total successful executions.
501    pub total_successes: u64,
502    /// Total failed executions.
503    pub total_failures: u64,
504    /// Total dead-lettered executions.
505    pub dead_letters: u64,
506    /// Total enqueues coalesced because a matching unique job was already held.
507    pub total_deduplicated: u64,
508    /// Last observed error for this job, if any.
509    #[serde(skip_serializing_if = "Option::is_none")]
510    pub last_error: Option<String>,
511}
512
513impl JobStatus {
514    const fn empty() -> Self {
515        Self {
516            queued: 0,
517            in_flight: 0,
518            blocked_on_concurrency: 0,
519            total_successes: 0,
520            total_failures: 0,
521            dead_letters: 0,
522            total_deduplicated: 0,
523            last_error: None,
524        }
525    }
526}
527
528/// Registry of ad-hoc jobs and their runtime status.
529#[derive(Clone)]
530pub struct JobRegistry {
531    inner: Arc<RwLock<HashMap<String, JobStatus>>>,
532}
533
534impl JobRegistry {
535    /// Create a new empty job registry.
536    #[must_use]
537    pub fn new() -> Self {
538        Self {
539            inner: Arc::new(RwLock::new(HashMap::new())),
540        }
541    }
542
543    /// Register a job name with initial counters.
544    pub fn register(&self, name: &str) {
545        if let Ok(mut guard) = self.inner.write() {
546            guard.entry(name.to_string()).or_insert(JobStatus::empty());
547        }
548    }
549
550    /// Record that a new job instance was enqueued.
551    pub fn record_enqueue(&self, name: &str) {
552        if let Ok(mut guard) = self.inner.write() {
553            let status = guard.entry(name.to_string()).or_insert(JobStatus::empty());
554            status.queued = status.queued.saturating_add(1);
555        }
556    }
557
558    /// Record that an enqueue was coalesced into an existing unique job.
559    ///
560    /// Reverses the `record_enqueue` bookkeeping for the coalesced instance
561    /// and bumps the deduplication counter.
562    pub fn record_deduplicated(&self, name: &str) {
563        if let Ok(mut guard) = self.inner.write()
564            && let Some(status) = guard.get_mut(name)
565        {
566            status.queued = status.queued.saturating_sub(1);
567            status.total_deduplicated = status.total_deduplicated.saturating_add(1);
568        }
569    }
570
571    /// Record that a job is parked waiting on a free concurrency slot.
572    pub fn record_concurrency_blocked(&self, name: &str) {
573        if let Ok(mut guard) = self.inner.write()
574            && let Some(status) = guard.get_mut(name)
575        {
576            status.blocked_on_concurrency = status.blocked_on_concurrency.saturating_add(1);
577        }
578    }
579
580    /// Record that a parked job was released back to the queue.
581    pub fn record_concurrency_unblocked(&self, name: &str) {
582        if let Ok(mut guard) = self.inner.write()
583            && let Some(status) = guard.get_mut(name)
584        {
585            status.blocked_on_concurrency = status.blocked_on_concurrency.saturating_sub(1);
586        }
587    }
588
589    /// Replace the blocked-on-concurrency gauges from a backend-wide survey.
590    ///
591    /// Names absent from `counts` are reset to zero. Used by the durable
592    /// backends whose blocked set is observed periodically rather than
593    /// tracked per event.
594    pub fn set_concurrency_blocked_counts(&self, counts: &HashMap<String, u64>) {
595        if let Ok(mut guard) = self.inner.write() {
596            for (name, status) in guard.iter_mut() {
597                status.blocked_on_concurrency = counts.get(name).copied().unwrap_or(0);
598            }
599        }
600    }
601
602    /// Record that a queued job started execution.
603    pub fn record_start(&self, name: &str) {
604        if let Ok(mut guard) = self.inner.write()
605            && let Some(status) = guard.get_mut(name)
606        {
607            status.queued = status.queued.saturating_sub(1);
608            status.in_flight = status.in_flight.saturating_add(1);
609        }
610    }
611
612    /// Record that a queued job was canceled before execution.
613    pub fn record_cancel(&self, name: &str) {
614        if let Ok(mut guard) = self.inner.write()
615            && let Some(status) = guard.get_mut(name)
616        {
617            status.queued = status.queued.saturating_sub(1);
618        }
619    }
620
621    /// Record a successful execution.
622    pub fn record_success(&self, name: &str) {
623        if let Ok(mut guard) = self.inner.write()
624            && let Some(status) = guard.get_mut(name)
625        {
626            status.in_flight = status.in_flight.saturating_sub(1);
627            status.total_successes = status.total_successes.saturating_add(1);
628            status.last_error = None;
629        }
630    }
631
632    /// Record a retriable failure.
633    pub fn record_retry(&self, name: &str, error: &str, _attempt: u32) {
634        if let Ok(mut guard) = self.inner.write()
635            && let Some(status) = guard.get_mut(name)
636        {
637            status.in_flight = status.in_flight.saturating_sub(1);
638            status.last_error = Some(error.to_string());
639        }
640    }
641
642    /// Record a terminal failure.
643    pub fn record_failure(&self, name: &str, error: String, dead_lettered: bool) {
644        if let Ok(mut guard) = self.inner.write()
645            && let Some(status) = guard.get_mut(name)
646        {
647            status.in_flight = status.in_flight.saturating_sub(1);
648            status.total_failures = status.total_failures.saturating_add(1);
649            status.last_error = Some(error);
650            if dead_lettered {
651                status.dead_letters = status.dead_letters.saturating_add(1);
652            }
653        }
654    }
655
656    /// Snapshot all registered jobs.
657    #[must_use]
658    pub fn snapshot(&self) -> HashMap<String, JobStatus> {
659        self.inner.read().map(|g| g.clone()).unwrap_or_default()
660    }
661}
662
663impl Default for JobRegistry {
664    fn default() -> Self {
665        Self::new()
666    }
667}
668
669impl TaskRegistry {
670    /// Create a new empty task registry.
671    #[must_use]
672    pub fn new() -> Self {
673        Self {
674            inner: Arc::new(RwLock::new(HashMap::new())),
675        }
676    }
677
678    /// Register a task with its schedule description.
679    pub fn register(&self, name: &str, schedule: &str) {
680        self.register_scheduled(
681            name,
682            schedule,
683            crate::task::TaskCoordination::Fleet,
684            "in_process",
685            "unknown",
686        );
687    }
688
689    /// Register a scheduled task with scheduler coordination metadata.
690    pub fn register_scheduled(
691        &self,
692        name: &str,
693        schedule: &str,
694        coordination: crate::task::TaskCoordination,
695        scheduler_backend: &str,
696        replica_id: &str,
697    ) {
698        let Ok(mut guard) = self.inner.write() else {
699            return;
700        };
701        guard.insert(
702            name.to_string(),
703            TaskStatus {
704                schedule: schedule.to_string(),
705                coordination,
706                scheduler_backend: scheduler_backend.to_string(),
707                replica_id: replica_id.to_string(),
708                current_leader: None,
709                last_tick: None,
710                last_fired_at: None,
711                next_run_at: None,
712                status: "idle".to_string(),
713                last_run: None,
714                last_duration_ms: None,
715                last_result: None,
716                last_error: None,
717                total_runs: 0,
718                total_failures: 0,
719            },
720        );
721    }
722
723    /// Record the replica that acquired leadership for a global task tick.
724    pub fn record_leader(&self, name: &str, leader_id: &str, tick_key: &str) {
725        let Ok(mut guard) = self.inner.write() else {
726            return;
727        };
728        let Some(task) = guard.get_mut(name) else {
729            return;
730        };
731        task.current_leader = Some(leader_id.to_string());
732        task.last_tick = Some(tick_key.to_string());
733    }
734
735    /// Record that a task started running.
736    pub fn record_start(&self, name: &str) {
737        let Ok(mut guard) = self.inner.write() else {
738            return;
739        };
740        let Some(task) = guard.get_mut(name) else {
741            return;
742        };
743        task.status = "running".to_string();
744        task.next_run_at = None;
745    }
746
747    /// Record the next scheduled run time for an idle task.
748    pub fn record_next_run_at(&self, name: &str, next_run_at: &str) {
749        let Ok(mut guard) = self.inner.write() else {
750            return;
751        };
752        let Some(task) = guard.get_mut(name) else {
753            return;
754        };
755        task.next_run_at = Some(next_run_at.to_string());
756    }
757
758    /// Record that a task completed successfully.
759    pub fn record_success(&self, name: &str, duration_ms: u64) {
760        let Ok(mut guard) = self.inner.write() else {
761            return;
762        };
763        let Some(task) = guard.get_mut(name) else {
764            return;
765        };
766        task.status = "idle".to_string();
767        let now = chrono::Utc::now().to_rfc3339();
768        task.last_run = Some(now.clone());
769        task.last_fired_at = Some(now);
770        task.last_duration_ms = Some(duration_ms);
771        task.last_result = Some("ok".to_string());
772        task.last_error = None;
773        task.total_runs += 1;
774    }
775
776    /// Record that a task failed.
777    pub fn record_failure(&self, name: &str, duration_ms: u64, error: &str) {
778        let Ok(mut guard) = self.inner.write() else {
779            return;
780        };
781        let Some(task) = guard.get_mut(name) else {
782            return;
783        };
784        task.status = "idle".to_string();
785        let now = chrono::Utc::now().to_rfc3339();
786        task.last_run = Some(now.clone());
787        task.last_fired_at = Some(now);
788        task.last_duration_ms = Some(duration_ms);
789        task.last_result = Some("failed".to_string());
790        task.last_error = Some(error.to_string());
791        task.total_runs += 1;
792        task.total_failures += 1;
793    }
794
795    /// Get a snapshot of all task statuses.
796    #[must_use]
797    pub fn snapshot(&self) -> HashMap<String, TaskStatus> {
798        self.inner
799            .read()
800            .map(|guard| guard.clone())
801            .unwrap_or_default()
802    }
803}
804
805impl Default for TaskRegistry {
806    fn default() -> Self {
807        Self::new()
808    }
809}
810
811impl std::fmt::Debug for TaskRegistry {
812    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
813        f.debug_struct("TaskRegistry")
814            .field("count", &self.snapshot().len())
815            .finish()
816    }
817}
818
819/// Resolved config property with source provenance.
820#[derive(Debug, Clone, Serialize, Deserialize)]
821pub struct ConfigProperty {
822    /// The resolved value (redacted if sensitive).
823    pub value: serde_json::Value,
824    /// Where the value came from.
825    pub source: String,
826}
827
828/// Collection of resolved config properties with source tracking.
829#[derive(Debug, Clone, Default)]
830pub struct ConfigProperties {
831    inner: Arc<RwLock<HashMap<String, ConfigProperty>>>,
832}
833
834impl ConfigProperties {
835    /// Build config properties with source tracking from the loaded config.
836    #[must_use]
837    #[allow(clippy::too_many_lines)]
838    pub fn from_config(config: &crate::config::AutumnConfig) -> Self {
839        let profile = config.profile.as_deref().unwrap_or("default");
840        let defaults = crate::config::AutumnConfig::default();
841
842        // Avoids dynamic reallocation since we know roughly how many config properties are tracked.
843        let mut props = HashMap::with_capacity(32);
844        let profile_str = profile.to_string();
845
846        Self::track_server_props(&mut props, config, &defaults, &profile_str);
847        Self::track_db_props(&mut props, config, &defaults, &profile_str);
848        Self::track_log_props(&mut props, config, &defaults, &profile_str);
849        Self::track_telemetry_props(&mut props, config, &defaults, &profile_str);
850        Self::track_health_props(&mut props, config, &defaults, &profile_str);
851        Self::track_actuator_props(&mut props, config, &defaults, &profile_str);
852        Self::track_session_props(&mut props, config, &defaults, &profile_str);
853        Self::track_channels_props(&mut props, config, &defaults, &profile_str);
854
855        Self {
856            inner: Arc::new(RwLock::new(props)),
857        }
858    }
859
860    fn track_server_props(
861        props: &mut HashMap<String, ConfigProperty>,
862        config: &crate::config::AutumnConfig,
863        defaults: &crate::config::AutumnConfig,
864        profile_str: &str,
865    ) {
866        Self::track_property(
867            props,
868            "server.host",
869            &config.server.host,
870            &defaults.server.host,
871            profile_str,
872        );
873        Self::track_property(
874            props,
875            "server.port",
876            &config.server.port.to_string(),
877            &defaults.server.port.to_string(),
878            profile_str,
879        );
880        Self::track_property(
881            props,
882            "server.shutdown_timeout_secs",
883            &config.server.shutdown_timeout_secs.to_string(),
884            &defaults.server.shutdown_timeout_secs.to_string(),
885            profile_str,
886        );
887    }
888
889    fn track_db_props(
890        props: &mut HashMap<String, ConfigProperty>,
891        config: &crate::config::AutumnConfig,
892        defaults: &crate::config::AutumnConfig,
893        profile_str: &str,
894    ) {
895        let db_url = config.database.url.as_deref().unwrap_or("").to_string();
896        let primary_url = config
897            .database
898            .primary_url
899            .as_deref()
900            .unwrap_or("")
901            .to_string();
902        let replica_url = config
903            .database
904            .replica_url
905            .as_deref()
906            .unwrap_or("")
907            .to_string();
908        Self::track_property(props, "database.url", &db_url, "", profile_str);
909        Self::track_property(props, "database.primary_url", &primary_url, "", profile_str);
910        Self::track_property(props, "database.replica_url", &replica_url, "", profile_str);
911        Self::track_property(
912            props,
913            "database.pool_size",
914            &config.database.pool_size.to_string(),
915            &defaults.database.pool_size.to_string(),
916            profile_str,
917        );
918        Self::track_property(
919            props,
920            "database.primary_pool_size",
921            &config.database.effective_primary_pool_size().to_string(),
922            &defaults.database.effective_primary_pool_size().to_string(),
923            profile_str,
924        );
925        Self::track_property(
926            props,
927            "database.replica_pool_size",
928            &config.database.effective_replica_pool_size().to_string(),
929            &defaults.database.effective_replica_pool_size().to_string(),
930            profile_str,
931        );
932        Self::track_property(
933            props,
934            "database.replica_fallback",
935            &format!("{:?}", config.database.replica_fallback),
936            &format!("{:?}", defaults.database.replica_fallback),
937            profile_str,
938        );
939    }
940
941    fn track_log_props(
942        props: &mut HashMap<String, ConfigProperty>,
943        config: &crate::config::AutumnConfig,
944        defaults: &crate::config::AutumnConfig,
945        profile_str: &str,
946    ) {
947        Self::track_property(
948            props,
949            "log.level",
950            &config.log.level,
951            &defaults.log.level,
952            profile_str,
953        );
954        Self::track_property(
955            props,
956            "log.format",
957            &format!("{:?}", config.log.format),
958            &format!("{:?}", defaults.log.format),
959            profile_str,
960        );
961        Self::track_property(
962            props,
963            "log.capture.enabled",
964            &config.log.capture.enabled.to_string(),
965            &defaults.log.capture.enabled.to_string(),
966            profile_str,
967        );
968        Self::track_property(
969            props,
970            "log.capture.capacity",
971            &config.log.capture.capacity.to_string(),
972            &defaults.log.capture.capacity.to_string(),
973            profile_str,
974        );
975    }
976
977    fn track_telemetry_props(
978        props: &mut HashMap<String, ConfigProperty>,
979        config: &crate::config::AutumnConfig,
980        defaults: &crate::config::AutumnConfig,
981        profile_str: &str,
982    ) {
983        Self::track_property(
984            props,
985            "telemetry.enabled",
986            &config.telemetry.enabled.to_string(),
987            &defaults.telemetry.enabled.to_string(),
988            profile_str,
989        );
990        Self::track_property(
991            props,
992            "telemetry.service_name",
993            &config.telemetry.service_name,
994            &defaults.telemetry.service_name,
995            profile_str,
996        );
997        Self::track_property(
998            props,
999            "telemetry.service_namespace",
1000            config.telemetry.service_namespace.as_deref().unwrap_or(""),
1001            defaults
1002                .telemetry
1003                .service_namespace
1004                .as_deref()
1005                .unwrap_or(""),
1006            profile_str,
1007        );
1008        Self::track_property(
1009            props,
1010            "telemetry.service_version",
1011            &config.telemetry.service_version,
1012            &defaults.telemetry.service_version,
1013            profile_str,
1014        );
1015        Self::track_property(
1016            props,
1017            "telemetry.environment",
1018            &config.telemetry.environment,
1019            &defaults.telemetry.environment,
1020            profile_str,
1021        );
1022        Self::track_property(
1023            props,
1024            "telemetry.otlp_endpoint",
1025            config.telemetry.otlp_endpoint.as_deref().unwrap_or(""),
1026            defaults.telemetry.otlp_endpoint.as_deref().unwrap_or(""),
1027            profile_str,
1028        );
1029        Self::track_property(
1030            props,
1031            "telemetry.protocol",
1032            &format!("{:?}", config.telemetry.protocol),
1033            &format!("{:?}", defaults.telemetry.protocol),
1034            profile_str,
1035        );
1036        Self::track_property(
1037            props,
1038            "telemetry.strict",
1039            &config.telemetry.strict.to_string(),
1040            &defaults.telemetry.strict.to_string(),
1041            profile_str,
1042        );
1043    }
1044
1045    fn track_health_props(
1046        props: &mut HashMap<String, ConfigProperty>,
1047        config: &crate::config::AutumnConfig,
1048        defaults: &crate::config::AutumnConfig,
1049        profile_str: &str,
1050    ) {
1051        Self::track_property(
1052            props,
1053            "health.path",
1054            &config.health.path,
1055            &defaults.health.path,
1056            profile_str,
1057        );
1058        Self::track_property(
1059            props,
1060            "health.live_path",
1061            &config.health.live_path,
1062            &defaults.health.live_path,
1063            profile_str,
1064        );
1065        Self::track_property(
1066            props,
1067            "health.ready_path",
1068            &config.health.ready_path,
1069            &defaults.health.ready_path,
1070            profile_str,
1071        );
1072        Self::track_property(
1073            props,
1074            "health.startup_path",
1075            &config.health.startup_path,
1076            &defaults.health.startup_path,
1077            profile_str,
1078        );
1079        Self::track_property(
1080            props,
1081            "health.detailed",
1082            &config.health.detailed.to_string(),
1083            &defaults.health.detailed.to_string(),
1084            profile_str,
1085        );
1086    }
1087
1088    fn track_actuator_props(
1089        props: &mut HashMap<String, ConfigProperty>,
1090        config: &crate::config::AutumnConfig,
1091        defaults: &crate::config::AutumnConfig,
1092        profile_str: &str,
1093    ) {
1094        Self::track_property(
1095            props,
1096            "actuator.prefix",
1097            &config.actuator.prefix,
1098            &defaults.actuator.prefix,
1099            profile_str,
1100        );
1101        Self::track_property(
1102            props,
1103            "actuator.sensitive",
1104            &config.actuator.sensitive.to_string(),
1105            &defaults.actuator.sensitive.to_string(),
1106            profile_str,
1107        );
1108        Self::track_property(
1109            props,
1110            "actuator.prometheus",
1111            &config.actuator.prometheus.to_string(),
1112            &defaults.actuator.prometheus.to_string(),
1113            profile_str,
1114        );
1115    }
1116
1117    fn track_session_props(
1118        props: &mut HashMap<String, ConfigProperty>,
1119        config: &crate::config::AutumnConfig,
1120        defaults: &crate::config::AutumnConfig,
1121        profile_str: &str,
1122    ) {
1123        Self::track_property(
1124            props,
1125            "session.backend",
1126            &format!("{:?}", config.session.backend),
1127            &format!("{:?}", defaults.session.backend),
1128            profile_str,
1129        );
1130        Self::track_property(
1131            props,
1132            "session.cookie_name",
1133            &config.session.cookie_name,
1134            &defaults.session.cookie_name,
1135            profile_str,
1136        );
1137        Self::track_property(
1138            props,
1139            "session.max_age_secs",
1140            &config.session.max_age_secs.to_string(),
1141            &defaults.session.max_age_secs.to_string(),
1142            profile_str,
1143        );
1144        Self::track_property(
1145            props,
1146            "session.secure",
1147            &config.session.secure.to_string(),
1148            &defaults.session.secure.to_string(),
1149            profile_str,
1150        );
1151        Self::track_property(
1152            props,
1153            "session.same_site",
1154            &config.session.same_site,
1155            &defaults.session.same_site,
1156            profile_str,
1157        );
1158        Self::track_property(
1159            props,
1160            "session.http_only",
1161            &config.session.http_only.to_string(),
1162            &defaults.session.http_only.to_string(),
1163            profile_str,
1164        );
1165        Self::track_property(
1166            props,
1167            "session.path",
1168            &config.session.path,
1169            &defaults.session.path,
1170            profile_str,
1171        );
1172        Self::track_property(
1173            props,
1174            "session.allow_memory_in_production",
1175            &config.session.allow_memory_in_production.to_string(),
1176            &defaults.session.allow_memory_in_production.to_string(),
1177            profile_str,
1178        );
1179        Self::track_property(
1180            props,
1181            "session.redis.url",
1182            config.session.redis.url.as_deref().unwrap_or(""),
1183            defaults.session.redis.url.as_deref().unwrap_or(""),
1184            profile_str,
1185        );
1186        Self::track_property(
1187            props,
1188            "session.redis.key_prefix",
1189            &config.session.redis.key_prefix,
1190            &defaults.session.redis.key_prefix,
1191            profile_str,
1192        );
1193    }
1194
1195    fn track_channels_props(
1196        props: &mut HashMap<String, ConfigProperty>,
1197        config: &crate::config::AutumnConfig,
1198        defaults: &crate::config::AutumnConfig,
1199        profile_str: &str,
1200    ) {
1201        Self::track_property(
1202            props,
1203            "channels.backend",
1204            &format!("{:?}", config.channels.backend),
1205            &format!("{:?}", defaults.channels.backend),
1206            profile_str,
1207        );
1208        Self::track_property(
1209            props,
1210            "channels.capacity",
1211            &config.channels.capacity.to_string(),
1212            &defaults.channels.capacity.to_string(),
1213            profile_str,
1214        );
1215        Self::track_property(
1216            props,
1217            "channels.redis.url",
1218            config.channels.redis.url.as_deref().unwrap_or(""),
1219            defaults.channels.redis.url.as_deref().unwrap_or(""),
1220            profile_str,
1221        );
1222        Self::track_property(
1223            props,
1224            "channels.redis.key_prefix",
1225            &config.channels.redis.key_prefix,
1226            &defaults.channels.redis.key_prefix,
1227            profile_str,
1228        );
1229    }
1230
1231    fn track_property(
1232        props: &mut HashMap<String, ConfigProperty>,
1233        key: &str,
1234        value: &str,
1235        default_value: &str,
1236        profile: &str,
1237    ) {
1238        // Check if there's an env var override
1239        let env_key = format!("AUTUMN_{}", key.replace('.', "__").to_uppercase());
1240        let source = if std::env::var(&env_key).is_ok() {
1241            env_key
1242        } else if value != default_value && (profile == "dev" || profile == "prod") {
1243            format!("profile_default:{profile}")
1244        } else if value != default_value {
1245            "autumn.toml".to_string()
1246        } else {
1247            "default".to_string()
1248        };
1249
1250        let display_value = if should_redact(key) {
1251            serde_json::Value::String("****".into())
1252        } else {
1253            serde_json::Value::String(value.to_string())
1254        };
1255
1256        props.insert(
1257            key.to_string(),
1258            ConfigProperty {
1259                value: display_value,
1260                source,
1261            },
1262        );
1263    }
1264
1265    /// Get a snapshot of all properties.
1266    #[must_use]
1267    pub fn snapshot(&self) -> HashMap<String, ConfigProperty> {
1268        self.inner
1269            .read()
1270            .map(|guard| guard.clone())
1271            .unwrap_or_default()
1272    }
1273}
1274
1275// ── Health Indicator ─────────────────────────────────────────────
1276
1277/// Health status reported by a [`HealthIndicator`].
1278///
1279/// Follows Spring Boot precedence:
1280/// `Down` > `OutOfService` > `Unknown` > `Up`
1281#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
1282pub enum HealthStatus {
1283    /// The component is functioning normally.
1284    #[serde(rename = "UP")]
1285    Up,
1286    /// The component is unavailable.
1287    #[serde(rename = "DOWN")]
1288    Down,
1289    /// The component is out of service (maintenance, etc.).
1290    #[serde(rename = "OUT_OF_SERVICE")]
1291    OutOfService,
1292    /// The component status cannot be determined.
1293    #[serde(rename = "UNKNOWN")]
1294    Unknown,
1295}
1296
1297impl HealthStatus {
1298    /// Human-readable string for this status.
1299    #[must_use]
1300    pub const fn as_str(self) -> &'static str {
1301        match self {
1302            Self::Up => "UP",
1303            Self::Down => "DOWN",
1304            Self::OutOfService => "OUT_OF_SERVICE",
1305            Self::Unknown => "UNKNOWN",
1306        }
1307    }
1308
1309    /// Returns `true` when this status does not indicate a failure
1310    /// (`Up` and `Unknown` are healthy; `Down` and `OutOfService` are not).
1311    #[must_use]
1312    pub const fn is_healthy(self) -> bool {
1313        matches!(self, Self::Up | Self::Unknown)
1314    }
1315}
1316
/// Probe group a [`HealthIndicator`] reports into.
///
/// - [`IndicatorGroup::Readiness`]: counted by `/ready` and by `/actuator/health`.
/// - [`IndicatorGroup::HealthOnly`]: counted by `/actuator/health` only.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IndicatorGroup {
    /// Gates readiness as well as overall health.
    Readiness,
    /// Reported by the actuator health endpoint but ignored by readiness.
    HealthOnly,
}
1328
1329/// Output from a single [`HealthIndicator::check`] call.
1330#[derive(Debug, Clone)]
1331pub struct HealthCheckOutput {
1332    /// The health status of this component.
1333    pub status: HealthStatus,
1334    /// Optional human-readable key-value detail map.
1335    pub details: HashMap<String, serde_json::Value>,
1336}
1337
1338impl HealthCheckOutput {
1339    /// Create an `Up` output with no details.
1340    #[must_use]
1341    pub fn up() -> Self {
1342        Self {
1343            status: HealthStatus::Up,
1344            details: HashMap::new(),
1345        }
1346    }
1347
1348    /// Create a `Down` output with no details.
1349    #[must_use]
1350    pub fn down() -> Self {
1351        Self {
1352            status: HealthStatus::Down,
1353            details: HashMap::new(),
1354        }
1355    }
1356
1357    /// Attach a detail map to this output.
1358    #[must_use]
1359    pub fn with_details(mut self, details: HashMap<String, serde_json::Value>) -> Self {
1360        self.details = details;
1361        self
1362    }
1363}
1364
1365/// Contract for a custom health check.
1366///
1367/// Implement this trait and register it via [`crate::app::AppBuilder::health_indicator`]
1368/// to surface the health of an external dependency in `/actuator/health` and optionally
1369/// in `/ready`.
1370///
1371/// # Example
1372///
1373/// ```rust
1374/// use autumn_web::actuator::{HealthCheckOutput, HealthIndicator, HealthStatus};
1375///
1376/// pub struct StripeIndicator;
1377///
1378/// impl HealthIndicator for StripeIndicator {
1379///     fn check(&self) -> futures::future::BoxFuture<'_, HealthCheckOutput> {
1380///         Box::pin(async move {
1381///             // TODO: ping Stripe API
1382///             HealthCheckOutput::up()
1383///         })
1384///     }
1385/// }
1386/// ```
1387pub trait HealthIndicator: Send + Sync + 'static {
1388    /// Run the check and return the current health output.
1389    ///
1390    /// The future is polled inside a per-indicator timeout; if it does not
1391    /// resolve within [`Self::timeout_ms`] milliseconds it is cancelled and
1392    /// the indicator is reported as `Unknown` with `timed_out: true`.
1393    fn check(&self) -> futures::future::BoxFuture<'_, HealthCheckOutput>;
1394
1395    /// Per-indicator timeout in milliseconds. Default: 2 000 ms.
1396    fn timeout_ms(&self) -> u64 {
1397        2000
1398    }
1399
1400    /// Which probe group this indicator belongs to. Default: [`IndicatorGroup::Readiness`].
1401    fn group(&self) -> IndicatorGroup {
1402        IndicatorGroup::Readiness
1403    }
1404}
1405
1406/// A single result returned from [`HealthIndicatorRegistry::run_all`] or
1407/// [`HealthIndicatorRegistry::run_readiness`].
1408#[derive(Debug, Clone)]
1409pub struct HealthRunResult {
1410    /// The registration name of this indicator.
1411    pub name: String,
1412    /// Which probe group this indicator belongs to.
1413    pub group: IndicatorGroup,
1414    /// The output of the check (possibly timed-out).
1415    pub output: HealthCheckOutput,
1416}
1417
1418type IndicatorList = Vec<(String, IndicatorGroup, Arc<dyn HealthIndicator>)>;
1419
1420/// Registry of named [`HealthIndicator`] implementations.
1421///
1422/// Populated by [`crate::app::AppBuilder::health_indicator`] and stored on
1423/// [`crate::AppState`]. Provides duplicate-registration detection at startup
1424/// and per-indicator timeout enforcement at request time.
1425#[derive(Clone, Default)]
1426pub struct HealthIndicatorRegistry {
1427    inner: Arc<RwLock<IndicatorList>>,
1428}
1429
1430impl HealthIndicatorRegistry {
1431    /// Create a new, empty registry.
1432    #[must_use]
1433    pub fn new() -> Self {
1434        Self::default()
1435    }
1436
1437    /// Register a named indicator with its group.
1438    ///
1439    /// Returns `Err` if a indicator with `name` was already registered.
1440    ///
1441    /// # Errors
1442    ///
1443    /// Returns an error string when `name` is already registered.
1444    pub fn register(
1445        &self,
1446        name: impl Into<String>,
1447        group: IndicatorGroup,
1448        indicator: Arc<dyn HealthIndicator>,
1449    ) -> Result<(), String> {
1450        let name = name.into();
1451        let mut inner = self
1452            .inner
1453            .write()
1454            .unwrap_or_else(std::sync::PoisonError::into_inner);
1455        if inner.iter().any(|(n, _, _)| n == &name) {
1456            return Err(format!(
1457                "HealthIndicator '{name}' is already registered; skipping duplicate"
1458            ));
1459        }
1460        inner.push((name, group, indicator));
1461        drop(inner);
1462        Ok(())
1463    }
1464
1465    /// Returns `true` when no indicators have been registered.
1466    #[must_use]
1467    pub fn is_empty(&self) -> bool {
1468        self.inner
1469            .read()
1470            .unwrap_or_else(std::sync::PoisonError::into_inner)
1471            .is_empty()
1472    }
1473
1474    /// Run all registered indicators (both groups) with per-indicator timeouts.
1475    ///
1476    /// All indicators execute **concurrently**; total wall time is bounded by
1477    /// the slowest single indicator rather than N × timeout.
1478    pub async fn run_all(&self) -> Vec<HealthRunResult> {
1479        let entries = self
1480            .inner
1481            .read()
1482            .unwrap_or_else(std::sync::PoisonError::into_inner)
1483            .clone();
1484
1485        let mut results = futures::future::join_all(entries.into_iter().map(
1486            |(name, group, indicator)| async move {
1487                let output = run_with_timeout(indicator.as_ref()).await;
1488                HealthRunResult {
1489                    name,
1490                    group,
1491                    output,
1492                }
1493            },
1494        ))
1495        .await;
1496
1497        for breaker in crate::circuit_breaker::global_registry().all_breakers() {
1498            let state = breaker.state();
1499            let status = match state {
1500                crate::circuit_breaker::CircuitState::Open
1501                | crate::circuit_breaker::CircuitState::HalfOpen => HealthStatus::Down,
1502                crate::circuit_breaker::CircuitState::Closed => HealthStatus::Up,
1503            };
1504
1505            let mut details = HashMap::new();
1506            details.insert(
1507                "state".to_string(),
1508                serde_json::Value::String(state.as_str().to_string()),
1509            );
1510            if let Some(ratio_num) = serde_json::Number::from_f64(breaker.failure_ratio()) {
1511                details.insert(
1512                    "failure_ratio".to_string(),
1513                    serde_json::Value::Number(ratio_num),
1514                );
1515            }
1516
1517            results.push(HealthRunResult {
1518                name: format!("circuit_breaker.{}", breaker.name()),
1519                group: IndicatorGroup::HealthOnly,
1520                output: HealthCheckOutput { status, details },
1521            });
1522        }
1523
1524        results
1525    }
1526
1527    /// Run only `Readiness`-group indicators with per-indicator timeouts.
1528    ///
1529    /// All indicators execute **concurrently**; total wall time is bounded by
1530    /// the slowest single indicator rather than N × timeout.
1531    pub async fn run_readiness(&self) -> Vec<HealthRunResult> {
1532        // Clone the full list to release the read lock before async work begins.
1533        let entries = self
1534            .inner
1535            .read()
1536            .unwrap_or_else(std::sync::PoisonError::into_inner)
1537            .clone();
1538
1539        futures::future::join_all(
1540            entries
1541                .into_iter()
1542                .filter(|(_, g, _)| *g == IndicatorGroup::Readiness)
1543                .map(|(name, group, indicator)| async move {
1544                    let output = run_with_timeout(indicator.as_ref()).await;
1545                    HealthRunResult {
1546                        name,
1547                        group,
1548                        output,
1549                    }
1550                }),
1551        )
1552        .await
1553    }
1554
1555    /// Compute the aggregate status following Spring Boot precedence.
1556    ///
1557    /// Precedence: `Down` > `OutOfService` > `Unknown` > `Up`.
1558    /// An empty slice returns `Up`.
1559    #[must_use]
1560    pub fn aggregate_status(statuses: &[HealthStatus]) -> HealthStatus {
1561        let mut overall = HealthStatus::Up;
1562        for &s in statuses {
1563            overall = match (overall, s) {
1564                (_, HealthStatus::Down) | (HealthStatus::Down, _) => HealthStatus::Down,
1565                (_, HealthStatus::OutOfService) | (HealthStatus::OutOfService, _) => {
1566                    HealthStatus::OutOfService
1567                }
1568                (_, HealthStatus::Unknown) | (HealthStatus::Unknown, _) => HealthStatus::Unknown,
1569                _ => HealthStatus::Up,
1570            };
1571        }
1572        overall
1573    }
1574}
1575
1576/// Run a single indicator with its declared timeout. Returns `Unknown` with
1577/// `timed_out: true` when the future does not resolve in time.
1578async fn run_with_timeout(indicator: &dyn HealthIndicator) -> HealthCheckOutput {
1579    let duration = tokio::time::Duration::from_millis(indicator.timeout_ms());
1580    match tokio::time::timeout(duration, indicator.check()).await {
1581        Ok(output) => output,
1582        Err(_elapsed) => {
1583            let mut details = HashMap::new();
1584            details.insert("timed_out".to_string(), serde_json::Value::Bool(true));
1585            HealthCheckOutput {
1586                status: HealthStatus::Unknown,
1587                details,
1588            }
1589        }
1590    }
1591}
1592
1593// ── Health ──────────────────────────────────────────────────────
1594
/// Enhanced health response for the actuator health endpoint.
///
/// Serialized with serde; keys appear in field-declaration order, so
/// reordering fields changes the JSON output.
#[derive(Serialize)]
struct ActuatorHealth {
    /// Overall aggregate status following Spring Boot precedence.
    status: &'static str,
    /// Version of this crate (`CARGO_PKG_VERSION` captured at compile time).
    version: &'static str,
    /// Active profile name as reported by the state.
    profile: String,
    /// Uptime string produced by the state's `uptime_display()`.
    uptime: String,
    /// Current value of `crate::db::AFTER_COMMIT_FAILURES_TOTAL`
    /// (present only when the `db` feature is enabled).
    #[cfg(feature = "db")]
    autumn_after_commit_failures_total: u64,
    /// Per-component health, keyed by indicator name.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    components: HashMap<String, ComponentHealth>,
    /// Backwards-compatible checks block (populated by built-in db indicator).
    #[serde(skip_serializing_if = "Option::is_none")]
    checks: Option<HealthChecks>,
}
1612
1613#[derive(Serialize, Clone)]
1614struct ComponentHealth {
1615    status: &'static str,
1616    #[serde(skip_serializing_if = "Option::is_none")]
1617    details: Option<serde_json::Value>,
1618}
1619
/// Legacy `checks` block of the health response.
#[derive(Serialize)]
struct HealthChecks {
    /// Database pool check; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    database: Option<DatabaseCheck>,
}
1625
/// Snapshot of the database connection pool reported by the health handler.
#[derive(Serialize)]
struct DatabaseCheck {
    /// `"ok"` when the pool is considered healthy, `"down"` otherwise.
    status: &'static str,
    /// Pool size as filled in by the health handler (from the pool status).
    pool_size: u64,
    /// Connections considered in use, as computed by the health handler.
    active_connections: u64,
    /// Connections currently available in the pool.
    idle_connections: u64,
}
1633
1634fn build_health_components(
1635    db_status: Option<HealthStatus>,
1636    db_check: Option<&DatabaseCheck>,
1637    indicator_results: &[HealthRunResult],
1638    detailed: bool,
1639) -> HashMap<String, ComponentHealth> {
1640    let mut components: HashMap<String, ComponentHealth> = HashMap::new();
1641    // Custom indicators first so the built-in "db" key inserted below can never
1642    // be overwritten by a user-registered indicator with the same name.
1643    for result in indicator_results {
1644        if !detailed
1645            && result.name.starts_with("circuit_breaker.")
1646            && result.output.status.is_healthy()
1647        {
1648            continue;
1649        }
1650        let details = (detailed && !result.output.details.is_empty())
1651            .then(|| serde_json::to_value(&result.output.details).unwrap_or_default());
1652        components.insert(
1653            result.name.clone(),
1654            ComponentHealth {
1655                status: result.output.status.as_str(),
1656                details,
1657            },
1658        );
1659    }
1660    if let Some(s) = db_status {
1661        let details = detailed
1662            .then(|| {
1663                db_check.map(|d| {
1664                    serde_json::json!({
1665                        "status": d.status,
1666                        "pool_size": d.pool_size,
1667                        "active_connections": d.active_connections,
1668                        "idle_connections": d.idle_connections,
1669                    })
1670                })
1671            })
1672            .flatten();
1673        components.insert(
1674            "db".to_string(),
1675            ComponentHealth {
1676                status: s.as_str(),
1677                details,
1678            },
1679        );
1680    }
1681    components
1682}
1683
1684/// `GET <actuator-prefix>/health`
1685pub async fn health<S: ProvideActuatorState + Send + Sync + 'static>(
1686    State(state): State<S>,
1687) -> impl IntoResponse {
1688    let detailed = state.health_detailed();
1689
1690    // ── built-in db component ────────────────────────────────────
1691    let (db_component_status, db_check) = {
1692        #[cfg(feature = "db")]
1693        {
1694            #[allow(clippy::option_if_let_else)]
1695            if let Some(pool) = state.pool() {
1696                let status = pool.status();
1697                let available = status.available as u64;
1698                let size = status.max_size as u64;
1699                let waiting = status.waiting as u64;
1700                let idle = available;
1701                let active = size.saturating_sub(available);
1702
1703                let healthy = available > 0 || waiting == 0;
1704                let db_status = if healthy {
1705                    HealthStatus::Up
1706                } else {
1707                    HealthStatus::Down
1708                };
1709                let db_check = Some(DatabaseCheck {
1710                    status: if healthy { "ok" } else { "down" },
1711                    pool_size: size,
1712                    active_connections: active,
1713                    idle_connections: idle,
1714                });
1715                (Some(db_status), db_check)
1716            } else {
1717                (None, None)
1718            }
1719        }
1720        #[cfg(not(feature = "db"))]
1721        {
1722            (None::<HealthStatus>, None::<DatabaseCheck>)
1723        }
1724    };
1725
1726    // ── registered custom indicators ───────────────────────────
1727    let indicator_results = if let Some(registry) = state.health_indicator_registry() {
1728        registry.run_all().await
1729    } else {
1730        Vec::new()
1731    };
1732
1733    // ── aggregate status ────────────────────────────────────────
1734    let mut all_statuses: Vec<HealthStatus> =
1735        indicator_results.iter().map(|r| r.output.status).collect();
1736    if let Some(s) = db_component_status {
1737        all_statuses.push(s);
1738    }
1739    let overall = HealthIndicatorRegistry::aggregate_status(&all_statuses);
1740
1741    // ── build components map ────────────────────────────────────
1742    let components = build_health_components(
1743        db_component_status,
1744        db_check.as_ref(),
1745        &indicator_results,
1746        detailed,
1747    );
1748
1749    let checks = db_check.map(|db| HealthChecks { database: Some(db) });
1750
1751    let body = ActuatorHealth {
1752        status: overall.as_str(),
1753        version: env!("CARGO_PKG_VERSION"),
1754        profile: state.profile().to_owned(),
1755        uptime: state.uptime_display(),
1756        #[cfg(feature = "db")]
1757        autumn_after_commit_failures_total: crate::db::AFTER_COMMIT_FAILURES_TOTAL
1758            .load(std::sync::atomic::Ordering::Relaxed),
1759        components,
1760        checks,
1761    };
1762
1763    let code = if overall.is_healthy() {
1764        StatusCode::OK
1765    } else {
1766        StatusCode::SERVICE_UNAVAILABLE
1767    };
1768    (code, Json(body))
1769}
1770
1771// ── Info ────────────────────────────────────────────────────────
1772
/// Application info response returned by `GET <actuator-prefix>/info`.
///
/// Serialized as a JSON object with `app`, `autumn` and `runtime` sections,
/// in that order.
#[derive(Serialize)]
pub(crate) struct ActuatorInfo {
    /// Name and version of the application, read from the runtime environment.
    app: AppInfo,
    /// Framework version (compile-time) and active profile.
    autumn: FrameworkInfo,
    /// Process runtime information.
    runtime: RuntimeInfo,
}
1780
/// `app` section of [`ActuatorInfo`].
#[derive(Serialize)]
struct AppInfo {
    /// Application package name, or `"unknown"` when it cannot be determined.
    name: String,
    /// Application package version, or `"unknown"` when it cannot be determined.
    version: String,
}
1786
/// `autumn` section of [`ActuatorInfo`].
#[derive(Serialize)]
struct FrameworkInfo {
    /// Version of this framework crate, fixed at compile time.
    version: &'static str,
    /// Active configuration profile (e.g. `dev` or `prod`).
    profile: String,
}
1792
/// `runtime` section of [`ActuatorInfo`].
#[derive(Serialize)]
struct RuntimeInfo {
    /// Human-readable process uptime, as formatted by the actuator state.
    uptime: String,
}
1797
1798/// `GET <actuator-prefix>/info`
1799pub(crate) async fn info<S: ProvideActuatorState + Send + Sync + 'static>(
1800    State(state): State<S>,
1801) -> Json<ActuatorInfo> {
1802    Json(ActuatorInfo {
1803        app: AppInfo {
1804            name: std::env::var("CARGO_PKG_NAME").unwrap_or_else(|_| "unknown".into()),
1805            version: std::env::var("CARGO_PKG_VERSION").unwrap_or_else(|_| "unknown".into()),
1806        },
1807        autumn: FrameworkInfo {
1808            version: env!("CARGO_PKG_VERSION"),
1809            profile: state.profile().to_owned(),
1810        },
1811        runtime: RuntimeInfo {
1812            uptime: state.uptime_display(),
1813        },
1814    })
1815}
1816
1817// ── Env (sensitive) ─────────────────────────────────────────────
1818
/// Config environment response with redacted secrets.
///
/// NOTE(review): the `env` handler copies values straight from
/// `config_props().snapshot()` without calling `should_redact`; confirm the
/// snapshot already redacts secret-looking keys.
#[derive(Serialize)]
pub(crate) struct ActuatorEnv {
    /// Name of the active configuration profile.
    active_profile: String,
    /// Flat map of property key to its (JSON) value.
    properties: std::collections::HashMap<String, serde_json::Value>,
}
1825
/// Lower-case substrings that mark a config key as sensitive.
const REDACT_PATTERNS: &[&str] = &[
    "password",
    "secret",
    "key",
    "token",
    "credential",
    "auth",
    "url",
];

/// Returns `true` when `key` looks like it holds a secret.
///
/// Matching is a case-insensitive substring test against [`REDACT_PATTERNS`].
/// It is deliberately broad: `key` also matches `monkey`, and `auth` also
/// matches `author`. Over-redacting is preferred to leaking a credential.
fn should_redact(key: &str) -> bool {
    let haystack = key.to_lowercase();
    REDACT_PATTERNS
        .iter()
        .copied()
        .any(|needle| haystack.contains(needle))
}
1841
1842/// `GET /actuator/env` — only available when actuator sensitive mode is enabled.
1843pub(crate) async fn env_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
1844    State(state): State<S>,
1845) -> Json<ActuatorEnv> {
1846    let properties = state
1847        .config_props()
1848        .snapshot()
1849        .into_iter()
1850        .map(|(key, prop)| (key, prop.value))
1851        .collect();
1852
1853    Json(ActuatorEnv {
1854        active_profile: state.profile().to_owned(),
1855        properties,
1856    })
1857}
1858
1859// ── Metrics ────────────────────────────────────────────────────
1860
1861/// `GET <actuator-prefix>/metrics` -- request metrics, latency, status codes, DB pool stats.
1862pub(crate) async fn metrics_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
1863    State(state): State<S>,
1864) -> Json<serde_json::Value> {
1865    let snapshot = state.metrics().snapshot();
1866    let mut result = serde_json::to_value(&snapshot).unwrap_or_default();
1867
1868    // Include DB pool stats if available
1869    #[cfg(feature = "db")]
1870    if let Some(pool) = state.pool() {
1871        let status = pool.status();
1872        let db_stats = serde_json::json!({
1873            "pool_size": status.max_size,
1874            "active_connections": (status.max_size as u64).saturating_sub(status.available as u64),
1875            "idle_connections": status.available,
1876        });
1877        if let serde_json::Value::Object(ref mut map) = result {
1878            map.insert("database".to_string(), db_stats);
1879        }
1880    }
1881
1882    // Include plugin-contributed sources under the "sources" key
1883    if let Some(registry) = state.metrics_source_registry() {
1884        let all = registry.collect_all();
1885        let mut sources = serde_json::Map::new();
1886        for (source_name, families) in all {
1887            let families_json: Vec<serde_json::Value> = families
1888                .iter()
1889                .map(|f| {
1890                    serde_json::json!({
1891                        "name": f.name,
1892                        "help": f.help,
1893                        "kind": f.kind.as_str(),
1894                        "samples": f.samples.iter().map(|s| {
1895                            let labels: serde_json::Map<String, serde_json::Value> = s.labels
1896                                .iter()
1897                                .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
1898                                .collect();
1899                            serde_json::json!({
1900                                "labels": labels,
1901                                "value": s.value,
1902                            })
1903                        }).collect::<Vec<_>>(),
1904                    })
1905                })
1906                .collect();
1907            sources.insert(source_name, serde_json::Value::Array(families_json));
1908        }
1909        if let serde_json::Value::Object(ref mut map) = result {
1910            map.insert("sources".to_string(), serde_json::Value::Object(sources));
1911        }
1912    }
1913
1914    Json(result)
1915}
1916
/// One entry of the `GET <actuator-prefix>/circuitbreakers` response.
///
/// The `Option` policy fields are populated only when detailed health output
/// is enabled. When they are `None` they are omitted from the JSON entirely.
#[derive(Serialize)]
pub(crate) struct CircuitBreakerActuatorResponse {
    /// Breaker name as registered in the global circuit-breaker registry.
    pub name: String,
    /// Current breaker state, as its string label.
    pub state: &'static str,
    /// Failure ratio reported by the breaker at request time.
    pub failure_ratio: f64,
    /// Policy `failure_ratio_threshold` (detailed mode only).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub failure_ratio_threshold: Option<f64>,
    /// Policy `sample_window`, in whole seconds (detailed mode only).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sample_window_secs: Option<u64>,
    /// Policy `minimum_sample_count` (detailed mode only).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub minimum_sample_count: Option<u64>,
    /// Policy `open_duration`, in whole seconds (detailed mode only).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub open_duration_secs: Option<u64>,
    /// Policy `half_open_trial_count` (detailed mode only).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub half_open_trial_count: Option<u64>,
}
1933
1934/// `GET <actuator-prefix>/circuitbreakers`
1935pub(crate) async fn circuitbreakers_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
1936    State(state): State<S>,
1937) -> Json<Vec<CircuitBreakerActuatorResponse>> {
1938    let detailed = state.health_detailed();
1939    let mut responses = Vec::new();
1940
1941    for breaker in crate::circuit_breaker::global_registry().all_breakers() {
1942        let policy = breaker.config();
1943        responses.push(CircuitBreakerActuatorResponse {
1944            name: breaker.name().to_string(),
1945            state: breaker.state().as_str(),
1946            failure_ratio: breaker.failure_ratio(),
1947            failure_ratio_threshold: detailed.then_some(policy.failure_ratio_threshold),
1948            sample_window_secs: detailed.then_some(policy.sample_window.as_secs()),
1949            minimum_sample_count: detailed.then_some(policy.minimum_sample_count),
1950            open_duration_secs: detailed.then_some(policy.open_duration.as_secs()),
1951            half_open_trial_count: detailed.then_some(policy.half_open_trial_count),
1952        });
1953    }
1954
1955    Json(responses)
1956}
1957
1958// ── Prometheus ─────────────────────────────────────────────────
1959
/// Render a Prometheus label set as `{k="v",...}`, or `""` when there are no labels.
///
/// Values are escaped (`\` -> `\\`, newline -> `\n`, `"` -> `\"`). Keys are
/// written verbatim, so callers must pass only valid label names. Output is
/// built in one pre-sized `String` to avoid a heap allocation per pair.
fn render_labels(labels: &[(String, String)]) -> String {
    if labels.is_empty() {
        return String::new();
    }

    let mut rendered = String::with_capacity(64);
    rendered.push('{');
    let mut separator = "";
    for (name, value) in labels {
        rendered.push_str(separator);
        separator = ",";
        rendered.push_str(name);
        rendered.push_str("=\"");
        for ch in value.chars() {
            match ch {
                '\\' => rendered.push_str(r"\\"),
                '\n' => rendered.push_str(r"\n"),
                '"' => rendered.push_str(r#"\""#),
                _ => rendered.push(ch),
            }
        }
        rendered.push('"');
    }
    rendered.push('}');
    rendered
}
1988
/// Returns true if `s` is a valid Prometheus metric name (`[a-zA-Z_:][a-zA-Z0-9_:]*`).
///
/// Every permitted character is ASCII, so checking bytes is equivalent to
/// checking chars: any multi-byte UTF-8 sequence fails the test.
fn is_valid_metric_name(s: &str) -> bool {
    let mut bytes = s.bytes();
    match bytes.next() {
        Some(head) if head.is_ascii_alphabetic() || matches!(head, b'_' | b':') => {
            bytes.all(|b| b.is_ascii_alphanumeric() || matches!(b, b'_' | b':'))
        }
        _ => false,
    }
}
1995
/// Returns true if `s` is a valid Prometheus label name (`[a-zA-Z_][a-zA-Z0-9_]*`).
///
/// Unlike metric names, label names may not contain `:`. Only ASCII is
/// permitted, so the check runs over bytes.
fn is_valid_label_name(s: &str) -> bool {
    let mut bytes = s.bytes();
    match bytes.next() {
        Some(head) if head.is_ascii_alphabetic() || head == b'_' => {
            bytes.all(|b| b.is_ascii_alphanumeric() || b == b'_')
        }
        _ => false,
    }
}
2002
/// Escape a Prometheus label value (backslash, newline, and double-quote).
///
/// Mapping: `\` -> `\\`, newline -> `\n`, `"` -> `\"`. All other characters
/// pass through unchanged.
fn escape_prometheus_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    value.chars().for_each(|ch| match ch {
        '\\' => escaped.push_str(r"\\"),
        '\n' => escaped.push_str(r"\n"),
        '"' => escaped.push_str(r#"\""#),
        other => escaped.push(other),
    });
    escaped
}
2016
/// Escape a Prometheus HELP string (backslash and newline only).
///
/// Double quotes are left alone: the text format does not escape them in
/// `# HELP` lines.
fn escape_help_text(text: &str) -> String {
    let mut escaped = String::with_capacity(text.len());
    text.chars().for_each(|ch| match ch {
        '\\' => escaped.push_str(r"\\"),
        '\n' => escaped.push_str(r"\n"),
        other => escaped.push(other),
    });
    escaped
}
2029
/// Format a sample value per Prometheus text format (handles ±Inf and NaN).
///
/// Infinities become `+Inf` / `-Inf` and NaN becomes `NaN`. Finite values use
/// Rust's shortest round-trip `Display` form (e.g. `3.0` renders as `3`).
fn format_sample_value(v: f64) -> String {
    match v {
        x if x.is_nan() => "NaN".to_string(),
        x if x.is_infinite() && x.is_sign_positive() => "+Inf".to_string(),
        x if x.is_infinite() => "-Inf".to_string(),
        x => x.to_string(),
    }
}
2042
/// Append all plugin-contributed metric families (and their error counters) to `out`.
///
/// `emitted_families` must be pre-seeded with the names of every built-in
/// metric family already written to `out`; families whose names collide are
/// skipped with a warning so no duplicate `# HELP`/`# TYPE` blocks are emitted.
///
/// Anything malformed is dropped with a `tracing::warn!` instead of being written:
/// - a whole family, when its name is not a valid metric name or was already
///   emitted;
/// - a sample, when any of its label names is invalid, or when its key-sorted
///   label set repeats an earlier sample of the same family;
/// - a repeated label name inside one sample (the first occurrence is kept).
///
/// After all families, `autumn_metrics_source_errors_total` is written with one
/// series per entry of `registry.error_counts()`, sorted by source name. It is
/// omitted entirely when there are no errors.
///
/// NOTE(review): `# HELP`/`# TYPE` are written before the family's samples are
/// validated, so a family whose samples are all rejected appears with no samples.
fn render_plugin_sources(
    registry: &MetricsSourceRegistry,
    out: &mut String,
    emitted_families: &mut std::collections::HashSet<String>,
) {
    use std::fmt::Write;

    let all_sources = registry.collect_all();
    for (_source_name, families) in &all_sources {
        for family in families {
            // Family-level validation: name syntax, then uniqueness across
            // built-ins and previously rendered plugin families.
            if !is_valid_metric_name(&family.name) {
                tracing::warn!(name = %family.name, "MetricsSource returned invalid metric name; skipping family");
                continue;
            }
            if !emitted_families.insert(family.name.clone()) {
                tracing::warn!(name = %family.name, "MetricsSource returned duplicate metric family name; skipping family");
                continue;
            }
            let _ = writeln!(
                out,
                "# HELP {} {}",
                family.name,
                escape_help_text(&family.help)
            );
            let _ = writeln!(out, "# TYPE {} {}", family.name, family.kind.as_str());
            // Canonical label strings already written for this family.
            let mut emitted_series: std::collections::HashSet<String> =
                std::collections::HashSet::new();
            for sample in &family.samples {
                let mut bad_key = false;
                let mut seen_keys = std::collections::HashSet::new();
                let mut valid_labels: Vec<(String, String)> = Vec::new();
                for (k, v) in &sample.labels {
                    // One invalid label name rejects the whole sample.
                    if !is_valid_label_name(k) {
                        tracing::warn!(
                            label_name = %k,
                            metric = %family.name,
                            "MetricsSource returned invalid label name; skipping sample"
                        );
                        bad_key = true;
                        break;
                    }
                    // A repeated label name only drops that repeat; the first wins.
                    if !seen_keys.insert(k.as_str()) {
                        tracing::warn!(label_name = %k, "MetricsSource returned duplicate label name; dropping duplicate");
                        continue;
                    }
                    valid_labels.push((k.clone(), v.clone()));
                }
                if bad_key {
                    continue;
                }
                // Sort by key so {a="1",b="2"} and {b="2",a="1"} produce the
                // same canonical string and are treated as one series.
                valid_labels.sort_by(|(a, _), (b, _)| a.cmp(b));
                let labels = render_labels(&valid_labels);
                if !emitted_series.insert(labels.clone()) {
                    tracing::warn!(
                        metric = %family.name,
                        labels = %labels,
                        "MetricsSource returned duplicate series; skipping sample"
                    );
                    continue;
                }
                let _ = writeln!(
                    out,
                    "{}{} {}",
                    family.name,
                    labels,
                    format_sample_value(sample.value)
                );
            }
        }
    }

    // Per-source scrape error counters, in sorted order for stable output.
    let error_counts = registry.error_counts();
    if !error_counts.is_empty() {
        out.push_str(
            "# HELP autumn_metrics_source_errors_total \
             Number of scrape errors (panics) per plugin metrics source\n",
        );
        out.push_str("# TYPE autumn_metrics_source_errors_total counter\n");
        let mut names: Vec<&String> = error_counts.keys().collect();
        names.sort();
        for name in names {
            // The source name is escaped by render_labels.
            let label = render_labels(&[("source".to_string(), name.clone())]);
            let _ = writeln!(
                out,
                "autumn_metrics_source_errors_total{} {}",
                label, error_counts[name]
            );
        }
    }
}
2140
2141/// Render the built-in `autumn_http_*` metric families into `out`, tagged with
2142/// the replica's deploy `version` label so canary and stable cohorts can be
2143/// compared by a controller scraping both.
2144fn write_builtin_http_metrics(
2145    out: &mut String,
2146    version: &str,
2147    snapshot: &crate::middleware::metrics::MetricsSnapshot,
2148) {
2149    use std::fmt::Write;
2150
2151    // requests_total
2152    out.push_str("# HELP autumn_http_requests_total Total number of HTTP requests\n");
2153    out.push_str("# TYPE autumn_http_requests_total counter\n");
2154    let _ = writeln!(
2155        out,
2156        "autumn_http_requests_total{{version=\"{version}\"}} {}",
2157        snapshot.http.requests_total
2158    );
2159
2160    // requests_active
2161    out.push_str("# HELP autumn_http_requests_active Currently active HTTP requests\n");
2162    out.push_str("# TYPE autumn_http_requests_active gauge\n");
2163    let _ = writeln!(
2164        out,
2165        "autumn_http_requests_active{{version=\"{version}\"}} {}",
2166        snapshot.http.requests_active
2167    );
2168
2169    // by_status
2170    out.push_str("# HELP autumn_http_responses_total HTTP responses by status code\n");
2171    out.push_str("# TYPE autumn_http_responses_total counter\n");
2172    for (status, count) in [
2173        ("2xx", snapshot.http.by_status.s2xx),
2174        ("3xx", snapshot.http.by_status.s3xx),
2175        ("4xx", snapshot.http.by_status.s4xx),
2176        ("5xx", snapshot.http.by_status.s5xx),
2177    ] {
2178        let _ = writeln!(
2179            out,
2180            "autumn_http_responses_total{{version=\"{version}\",status=\"{status}\"}} {count}"
2181        );
2182    }
2183
2184    // request_duration_seconds — global latency percentiles exposed as Prometheus
2185    // summary-style quantiles, labelled by deploy version so a canary controller
2186    // can gate promotion on p99 latency per cohort.
2187    out.push_str(
2188        "# HELP autumn_http_request_duration_seconds HTTP request latency percentiles in seconds\n",
2189    );
2190    out.push_str("# TYPE autumn_http_request_duration_seconds summary\n");
2191    for (quantile, millis) in [
2192        ("0.5", snapshot.http.latency_ms.p50),
2193        ("0.95", snapshot.http.latency_ms.p95),
2194        ("0.99", snapshot.http.latency_ms.p99),
2195    ] {
2196        #[allow(clippy::cast_precision_loss)]
2197        let seconds = millis as f64 / 1000.0;
2198        let _ = writeln!(
2199            out,
2200            "autumn_http_request_duration_seconds{{version=\"{version}\",quantile=\"{quantile}\"}} {seconds}"
2201        );
2202    }
2203
2204    // autumn_shutdown_aborted_requests_total
2205    out.push_str(
2206        "# HELP autumn_shutdown_aborted_requests_total \
2207         HTTP requests forcibly dropped when the graceful-shutdown drain deadline expired\n",
2208    );
2209    out.push_str("# TYPE autumn_shutdown_aborted_requests_total counter\n");
2210    let _ = writeln!(
2211        out,
2212        "autumn_shutdown_aborted_requests_total{{version=\"{version}\"}} {}",
2213        snapshot.http.shutdown_aborted_requests_total
2214    );
2215
2216    // autumn_request_timeouts_total
2217    out.push_str(
2218        "# HELP autumn_request_timeouts_total \
2219         HTTP requests that exceeded the configured per-request timeout\n",
2220    );
2221    out.push_str("# TYPE autumn_request_timeouts_total counter\n");
2222    let _ = writeln!(
2223        out,
2224        "autumn_request_timeouts_total{{version=\"{version}\"}} {}",
2225        snapshot.http.request_timeouts_total
2226    );
2227
2228    // by_route
2229    if !snapshot.http.by_route.is_empty() {
2230        out.push_str("# HELP autumn_http_route_requests_total HTTP requests by route and method\n");
2231        out.push_str("# TYPE autumn_http_route_requests_total counter\n");
2232        let mut route_keys: Vec<&String> = snapshot.http.by_route.keys().collect();
2233        route_keys.sort();
2234        for route_key in route_keys {
2235            let metrics = &snapshot.http.by_route[route_key];
2236            // route_key is formatted as "METHOD /path"
2237            if let Some((method, path)) = route_key.split_once(' ') {
2238                let _ = writeln!(
2239                    out,
2240                    "autumn_http_route_requests_total{{version=\"{version}\",method=\"{method}\",route=\"{path}\"}} {}",
2241                    metrics.count
2242                );
2243            }
2244        }
2245    }
2246}
2247
2248/// `GET <actuator-prefix>/prometheus` -- export metrics in Prometheus format.
2249pub(crate) async fn prometheus_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
2250    State(state): State<S>,
2251) -> impl IntoResponse {
2252    let snapshot = state.metrics().snapshot();
2253    // Deploy-version label so a canary controller can compare canary vs. stable
2254    // cohorts. Escaped defensively in case an operator sets an exotic value via
2255    // AUTUMN_DEPLOY_VERSION.
2256    let version = escape_prometheus_label_value(&state.deploy_version());
2257    let mut out = String::with_capacity(2048);
2258
2259    write_builtin_http_metrics(&mut out, &version, &snapshot);
2260
2261    // Plugin-contributed metric families — seed with built-in names so
2262    // plugins cannot shadow or duplicate them.
2263    if let Some(registry) = state.metrics_source_registry() {
2264        let mut emitted_families: std::collections::HashSet<String> = [
2265            "autumn_http_requests_total",
2266            "autumn_http_requests_active",
2267            "autumn_http_responses_total",
2268            "autumn_http_request_duration_seconds",
2269            "autumn_shutdown_aborted_requests_total",
2270            "autumn_request_timeouts_total",
2271            "autumn_http_route_requests_total",
2272            "autumn_metrics_source_errors_total",
2273        ]
2274        .iter()
2275        .map(|s| (*s).to_string())
2276        .collect();
2277        render_plugin_sources(registry, &mut out, &mut emitted_families);
2278    }
2279
2280    (
2281        [(
2282            axum::http::header::CONTENT_TYPE,
2283            "text/plain; version=0.0.4",
2284        )],
2285        out,
2286    )
2287}
2288
2289// ── Config Properties (sensitive) ──────────────────────────────
2290
2291/// `GET <actuator-prefix>/configprops` -- all config properties with source tracking.
2292pub(crate) async fn configprops_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
2293    State(state): State<S>,
2294) -> Json<serde_json::Value> {
2295    let props = state.config_props().snapshot();
2296
2297    Json(serde_json::json!({
2298        "active_profile": state.profile(),
2299        "properties": props,
2300    }))
2301}
2302
2303// ── Loggers (sensitive) ────────────────────────────────────────
2304
/// Available log levels for the loggers endpoint.
///
/// Reported verbatim as `available_levels` by `GET <actuator-prefix>/loggers`.
/// `PUT <actuator-prefix>/loggers/{name}` rejects any lower-cased level not in
/// this list.
const AVAILABLE_LEVELS: &[&str] = &["trace", "debug", "info", "warn", "error"];
2307
/// Response for `GET <actuator-prefix>/loggers`.
#[derive(Serialize)]
pub(crate) struct LoggersResponse {
    /// Current level as reported by the log-level controller.
    current_level: String,
    /// Every level accepted by `PUT <actuator-prefix>/loggers/{name}`.
    available_levels: Vec<&'static str>,
    /// Per-logger overrides reported by the log-level controller
    /// (presumably logger name -> level; confirm against `logger_overrides`).
    loggers: HashMap<String, String>,
}
2315
2316/// `GET <actuator-prefix>/loggers` -- view current log levels.
2317pub(crate) async fn loggers_get<S: ProvideActuatorState + Send + Sync + 'static>(
2318    State(state): State<S>,
2319) -> Json<LoggersResponse> {
2320    Json(LoggersResponse {
2321        current_level: state.log_levels().current_level(),
2322        available_levels: AVAILABLE_LEVELS.to_vec(),
2323        loggers: state.log_levels().logger_overrides(),
2324    })
2325}
2326
/// Request body for `PUT <actuator-prefix>/loggers/{name}`.
#[derive(Deserialize)]
pub(crate) struct SetLoggerRequest {
    /// Requested level. It is lower-cased, then checked against `AVAILABLE_LEVELS`.
    level: String,
}
2332
2333/// `PUT <actuator-prefix>/loggers/{name}` -- change a logger's level at runtime.
2334pub(crate) async fn loggers_put<S: ProvideActuatorState + Send + Sync + 'static>(
2335    State(state): State<S>,
2336    Path(name): Path<String>,
2337    Json(body): Json<SetLoggerRequest>,
2338) -> impl IntoResponse {
2339    let level = body.level.to_lowercase();
2340
2341    // Validate the level
2342    if !AVAILABLE_LEVELS.contains(&level.as_str()) {
2343        return (
2344            StatusCode::BAD_REQUEST,
2345            Json(serde_json::json!({
2346                "status": "error",
2347                "message": format!(
2348                    "Invalid level '{}'. Available levels: {}",
2349                    level,
2350                    AVAILABLE_LEVELS.join(", ")
2351                ),
2352            })),
2353        );
2354    }
2355
2356    let previous = state.log_levels().set_logger_level(&name, &level);
2357
2358    (
2359        StatusCode::OK,
2360        Json(serde_json::json!({
2361            "status": "ok",
2362            "message": format!("Logger '{}' set to '{}'", name, level),
2363            "previous": previous,
2364        })),
2365    )
2366}
2367
2368// ── Logfile (sensitive) ────────────────────────────────────────
2369
/// Query parameters for `GET <actuator-prefix>/logfile`.
#[derive(Debug, Deserialize, Default)]
pub(crate) struct LogfileQuery {
    /// Minimum log level to include (case-insensitive).
    ///
    /// Valid values: `trace`, `debug`, `info`, `warn`, `error`.
    /// When absent all levels are returned; an unrecognised value is rejected
    /// with `400 Bad Request` by the handler.
    pub level: Option<String>,
    /// Maximum number of entries to return (most-recent N, newest-last).
    ///
    /// Passed through unchanged to the capture buffer's `snapshot`; when absent
    /// no cap is requested.
    pub limit: Option<usize>,
}
2381
/// JSON response shape for `GET <actuator-prefix>/logfile`.
#[derive(Debug, Serialize)]
pub(crate) struct LogfileResponse {
    /// Captured log entries, oldest first.
    pub entries: Vec<crate::log::capture::CapturedLogEntry>,
    /// Total entries in the buffer, measured before both the `level` filter
    /// and the `limit` cap are applied (`0` when capture is disabled).
    pub total: usize,
    /// `true` when a capture buffer is configured for this app; when `false`,
    /// `entries` is always empty.
    pub capture_enabled: bool,
}
2392
2393/// `GET <actuator-prefix>/logfile` — recent structured log entries.
2394///
2395/// Returns entries from the in-memory capture buffer, filtered by `?level=`
2396/// and capped by `?limit=`. Only available when `actuator.sensitive = true`
2397/// and `log.capture.enabled = true`.  When capture is disabled the endpoint
2398/// still responds with `200` and an empty list so API consumers can handle
2399/// the case uniformly.
2400///
2401/// Returns `400 Bad Request` when an unrecognised `?level=` value is supplied
2402/// so that typos (e.g. `?level=warning`) are rejected rather than silently
2403/// broadening the response to all captured entries.
2404pub(crate) async fn logfile_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
2405    State(state): State<S>,
2406    axum::extract::Query(query): axum::extract::Query<LogfileQuery>,
2407) -> Result<axum::Json<LogfileResponse>, (StatusCode, axum::Json<serde_json::Value>)> {
2408    let min_level = match query.level.as_deref() {
2409        None => None,
2410        Some(s) => match crate::log::capture::level_from_str(s) {
2411            Some(level) => Some(level),
2412            None => {
2413                return Err((
2414                    StatusCode::BAD_REQUEST,
2415                    axum::Json(serde_json::json!({
2416                        "error": format!(
2417                            "invalid level {:?}; valid values: TRACE, DEBUG, INFO, WARN, ERROR",
2418                            s
2419                        )
2420                    })),
2421                ));
2422            }
2423        },
2424    };
2425
2426    Ok(match state.log_buffer() {
2427        None => axum::Json(LogfileResponse {
2428            entries: vec![],
2429            total: 0,
2430            capture_enabled: false,
2431        }),
2432        Some(buf) => {
2433            let total = buf.len();
2434            let entries = buf.snapshot(min_level, query.limit);
2435            axum::Json(LogfileResponse {
2436                entries,
2437                total,
2438                capture_enabled: true,
2439            })
2440        }
2441    })
2442}
2443
2444// ── Tasks (sensitive) ──────────────────────────────────────────
2445
2446/// `GET <actuator-prefix>/tasks` -- scheduled task status.
2447pub(crate) async fn tasks_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
2448    State(state): State<S>,
2449) -> Json<serde_json::Value> {
2450    let tasks = state.task_registry().snapshot();
2451
2452    Json(serde_json::json!({
2453        "scheduled_tasks": tasks,
2454    }))
2455}
2456
2457/// `GET <actuator-prefix>/jobs` -- ad-hoc background job status.
2458pub(crate) async fn jobs_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
2459    State(state): State<S>,
2460) -> Json<serde_json::Value> {
2461    let jobs = state.job_registry().snapshot();
2462    Json(serde_json::json!({ "jobs": jobs }))
2463}
2464
#[cfg(feature = "http-client")]
/// Request body for `POST <actuator-prefix>/webhooks/replay`.
#[derive(Deserialize)]
pub(crate) struct ReplayRequest {
    /// ID of the delivery log to replay; the log must exist and be in the DLQ.
    log_id: String,
}
2471
#[cfg(feature = "http-client")]
/// Enqueue an `autumn_webhook_delivery` job that replays delivery log `log_id`.
///
/// The job payload is `{"log_id": <log_id>, "replay": true}`.
///
/// # Errors
///
/// Returns a human-readable message when no global job client is installed,
/// or when the client fails to enqueue the job.
async fn enqueue_webhook_replay_job(log_id: &str) -> Result<(), String> {
    let payload = serde_json::json!({
        "log_id": log_id,
        "replay": true,
    });

    let client = crate::job::global_job_client()
        .ok_or_else(|| "Global job client is not available".to_string())?;

    client
        .enqueue("autumn_webhook_delivery", payload)
        .await
        .map_err(|e| format!("Failed to enqueue job: {e}"))
}
2488
#[cfg(feature = "http-client")]
/// `GET <actuator-prefix>/webhooks/dlq` -- list dead-lettered webhook logs.
///
/// Responds with:
/// - `501 Not Implemented` when outbound webhooks are not configured;
/// - `500 Internal Server Error` when the store lookup fails;
/// - `200 OK` with the DLQ logs as JSON otherwise.
pub(crate) async fn webhooks_dlq_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
    State(state): State<S>,
) -> impl IntoResponse {
    // Shared `{"status": "error", "message": ...}` envelope for failure responses.
    let error_response = |status: StatusCode, message: &str| {
        (
            status,
            Json(serde_json::json!({
                "status": "error",
                "message": message
            })),
        )
            .into_response()
    };

    let Some(manager) = state.webhook_outbound() else {
        return error_response(
            StatusCode::NOT_IMPLEMENTED,
            "Outbound webhook support is not configured or enabled",
        );
    };

    match manager.store().get_dlq_logs().await {
        Ok(logs) => (StatusCode::OK, Json(logs)).into_response(),
        Err(e) => error_response(
            StatusCode::INTERNAL_SERVER_ERROR,
            &format!("Failed to fetch DLQ logs: {e}"),
        ),
    }
}
2517
#[cfg(feature = "http-client")]
/// `POST <actuator-prefix>/webhooks/replay` -- replay a dead-lettered webhook log.
///
/// Processing order:
/// 1. Load the delivery log named by `log_id`: `500` on store error, `404` if
///    absent, `400` if it is not currently in the DLQ.
/// 2. Refuse the replay when its subscription is missing or disabled.
/// 3. Persist a reset copy of the log, then enqueue the delivery job. If the
///    enqueue fails, the untouched DLQ entry is written back and `500` is returned.
/// 4. Only after a successful enqueue, reactivate an auto-failed subscription.
///    This is best-effort: failures are logged as warnings, not returned.
///
/// Returns `501` when outbound webhook support is not configured.
pub(crate) async fn webhooks_replay_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
    State(state): State<S>,
    Json(body): Json<ReplayRequest>,
) -> impl IntoResponse {
    /// Wraps `message` in the `{"status": "error", "message": …}` envelope used
    /// by every failure branch of this handler.
    fn replay_error(status: StatusCode, message: impl Serialize) -> axum::response::Response {
        let payload = serde_json::json!({
            "status": "error",
            "message": message
        });
        (status, Json(payload)).into_response()
    }

    let Some(manager) = state.webhook_outbound() else {
        return replay_error(
            StatusCode::NOT_IMPLEMENTED,
            "Outbound webhook support is not configured or enabled",
        );
    };
    let log_id = &body.log_id;

    let log = match manager.store().get_delivery_log(log_id).await {
        Ok(Some(found)) => found,
        Ok(None) => {
            return replay_error(
                StatusCode::NOT_FOUND,
                format!("Log with ID {log_id} not found"),
            );
        }
        Err(err) => {
            return replay_error(
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to retrieve log: {err}"),
            );
        }
    };

    if !log.is_dlq {
        return replay_error(
            StatusCode::BAD_REQUEST,
            format!("Log with ID {log_id} is not in the Dead Letter Queue (DLQ)"),
        );
    }

    if let Some(blocked) = blocked_webhook_replay_response(&manager, &log, log_id).await {
        return blocked;
    }

    // Keep an untouched copy so the DLQ entry can be restored if enqueueing fails.
    let snapshot = log.clone();
    let subscription_id = snapshot.subscription_id.clone();

    if let Err(err) = manager
        .store()
        .log_delivery(reset_webhook_replay_log(log))
        .await
    {
        return replay_error(
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Failed to update delivery log state: {err}"),
        );
    }

    // The reset log is persisted before enqueueing so the background job sees it.
    if let Err(message) = enqueue_webhook_replay_job(log_id).await {
        return match manager.store().replace_delivery_log(snapshot).await {
            Ok(_) => replay_error(StatusCode::INTERNAL_SERVER_ERROR, message),
            Err(rollback_error) => {
                tracing::error!(
                    log_id = %body.log_id,
                    "Failed to roll back webhook replay log after enqueue failure: {}",
                    rollback_error
                );
                replay_error(
                    StatusCode::INTERNAL_SERVER_ERROR,
                    format!("{message}; failed to restore DLQ log state: {rollback_error}"),
                )
            }
        };
    }

    // Reactivating only after a successful enqueue keeps a failed replay from
    // wiping the subscription's failure history.
    if let Err(e) = manager
        .store()
        .reactivate_failed_subscription(&subscription_id)
        .await
    {
        tracing::warn!(subscription_id = %subscription_id, "Failed to reactivate subscription during replay: {}", e);
    }

    let accepted = serde_json::json!({
        "status": "ok",
        "message": format!("Replay successfully enqueued for log {log_id}")
    });
    (StatusCode::OK, Json(accepted)).into_response()
}
2632
#[cfg(feature = "http-client")]
/// Returns `log` rewritten as a fresh first attempt, ready to be re-enqueued.
///
/// The log is taken out of the DLQ, its attempt counter restarts at 1, and
/// every field describing the outcome of the previous (failed) attempt is
/// cleared: error, response status, response body and elapsed time. This keeps
/// the stored record from mixing stale results with the new attempt. The
/// timestamp is set to now. All other fields (id, subscription, topic, payload,
/// request headers, max attempts) are left untouched.
fn reset_webhook_replay_log(
    mut log: crate::webhook_outbound::WebhookDeliveryLog,
) -> crate::webhook_outbound::WebhookDeliveryLog {
    log.is_dlq = false;
    log.attempt = 1;
    log.last_error = None;
    log.response_status = None;
    log.response_body = None;
    // The timing belongs to the failed attempt whose status/body were just cleared.
    log.elapsed_ms = 0;
    log.timestamp = chrono::Utc::now();
    log
}
2645
#[cfg(feature = "http-client")]
/// Decides whether a DLQ replay must be refused because of its subscription.
///
/// Returns `Some(response)` to short-circuit the replay:
/// - `500` when looking the subscription up fails,
/// - `404` when the log's subscription no longer exists,
/// - `409` when the subscription is `Disabled`.
///
/// Returns `None` when the replay may proceed. Any status other than
/// `Disabled`, including auto-failed subscriptions, is allowed.
async fn blocked_webhook_replay_response(
    manager: &crate::webhook_outbound::WebhookOutboundManager,
    log: &crate::webhook_outbound::WebhookDeliveryLog,
    log_id: &str,
) -> Option<axum::response::Response> {
    // Every refusal shares the same `{"status": "error", "message": …}` envelope.
    let reject = |status: StatusCode, message: String| {
        Some(
            (
                status,
                Json(serde_json::json!({
                    "status": "error",
                    "message": message
                })),
            )
                .into_response(),
        )
    };

    let subscription_id = &log.subscription_id;
    let subscription = match manager.store().get_subscription(subscription_id).await {
        Ok(Some(subscription)) => subscription,
        Ok(None) => {
            return reject(
                StatusCode::NOT_FOUND,
                format!("Subscription {subscription_id} for replay log {log_id} was not found"),
            );
        }
        Err(e) => {
            return reject(
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to retrieve subscription: {e}"),
            );
        }
    };

    let disabled =
        subscription.status == crate::webhook_outbound::WebhookSubscriptionStatus::Disabled;
    if disabled {
        reject(
            StatusCode::CONFLICT,
            format!(
                "Subscription {subscription_id} is disabled; re-enable it before replaying log {log_id}"
            ),
        )
    } else {
        None
    }
}
2702
2703// ── A11y ───────────────────────────────────────────────────────
2704
2705/// `GET <actuator-prefix>/a11y` -- scaffold-level accessibility posture.
2706///
2707/// Returns a JSON object describing which WCAG 2.1 AA scaffold concerns the
2708/// application addresses.  Available in all profiles (like `/actuator/health`).
2709pub(crate) async fn a11y_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
2710    State(state): State<S>,
2711) -> Json<A11yPosture> {
2712    Json(state.a11y_posture())
2713}
2714
2715// ── Channels (sensitive) ───────────────────────────────────────
2716
/// `GET <actuator-prefix>/channels` -- get current channel snapshots.
///
/// Responds with `{"channels": <snapshot>}`, where the snapshot is taken from
/// the application's channel registry at request time.
#[cfg(feature = "ws")]
pub(crate) async fn channels_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
    State(state): State<S>,
) -> Json<serde_json::Value> {
    let snapshot = state.channels().snapshot();
    let payload = serde_json::json!({ "channels": snapshot });
    Json(payload)
}
2727
2728// ── Tasks Stream (WebSocket) ───────────────────────────────────
2729
/// `GET <actuator-prefix>/tasks/stream` -- stream scheduled task events.
///
/// Upgrades to a WebSocket and forwards every message published on the
/// `sys:tasks` channel to the client as a text frame. The stream ends when:
/// - the client sends a Close frame, disconnects, or the socket errors,
/// - sending a frame to the client fails,
/// - the broadcast channel is closed, or
/// - the application shutdown token fires (a Close frame is sent first).
///
/// Events missed because this subscriber lagged behind are skipped.
#[cfg(feature = "ws")]
pub(crate) async fn tasks_stream_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
    State(state): State<S>,
    ws: axum::extract::ws::WebSocketUpgrade,
) -> impl IntoResponse {
    ws.on_upgrade(move |mut socket| async move {
        let mut rx = state.channels().subscribe("sys:tasks");
        let shutdown = state.shutdown_token();

        loop {
            tokio::select! {
                res = rx.recv() => {
                    match res {
                        Ok(msg) => {
                            let ws_msg = axum::extract::ws::Message::Text(msg.into_string().into());
                            if socket.send(ws_msg).await.is_err() {
                                break;
                            }
                        }
                        // The client fell behind and missed some events; keep
                        // streaming the newer ones.
                        Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
                        Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                    }
                }
                // No client messages are expected. Polling the socket lets us
                // notice a disconnect (Close frame, EOF or error) right away,
                // instead of keeping this task alive until the next event fails
                // to send or the app shuts down.
                incoming = socket.recv() => {
                    match incoming {
                        Some(Ok(axum::extract::ws::Message::Close(_)) | Err(_)) | None => break,
                        Some(Ok(_)) => {}
                    }
                }
                () = shutdown.cancelled() => {
                    let _ = socket.send(axum::extract::ws::Message::Close(None)).await;
                    break;
                }
                else => break,
            }
        }
    })
}
2763
2764// ── Router builder ──────────────────────────────────────────────
2765
/// Normalizes a configured actuator prefix to `""` or `"/segment[/…]"`.
///
/// Surrounding whitespace and all leading/trailing `/` characters are removed,
/// then exactly one leading `/` is added back. A prefix that is empty, blank,
/// or made only of slashes (`"/"`, `"//"`) normalizes to `""`, which makes
/// actuator routes mount at the root.
///
/// Examples: `"actuator"` → `"/actuator"`, `"/ops/"` → `"/ops"`,
/// `"//actuator"` → `"/actuator"`, `"//"` → `""`.
pub(crate) fn normalize_actuator_prefix(prefix: &str) -> String {
    // Trimming slashes on both ends (not just the trailing ones) ensures an
    // all-slash prefix collapses to "" instead of "/", which would otherwise
    // yield double-slash routes such as "//health".
    let segment = prefix.trim().trim_matches('/');
    if segment.is_empty() {
        String::new()
    } else {
        format!("/{segment}")
    }
}
2779
2780pub(crate) fn actuator_route_glob(prefix: &str) -> String {
2781    let prefix = normalize_actuator_prefix(prefix);
2782    if prefix.is_empty() {
2783        "/*".to_owned()
2784    } else {
2785        format!("{prefix}/*")
2786    }
2787}
2788
2789pub(crate) fn actuator_route_path(prefix: &str, suffix: &str) -> String {
2790    let prefix = normalize_actuator_prefix(prefix);
2791    if prefix.is_empty() {
2792        suffix.to_owned()
2793    } else {
2794        format!("{prefix}{suffix}")
2795    }
2796}
2797
2798pub(crate) fn actuator_endpoint_paths(
2799    prefix: &str,
2800    sensitive: bool,
2801    prometheus_enabled: bool,
2802) -> Vec<String> {
2803    let mut paths = vec![
2804        actuator_route_path(prefix, "/health"),
2805        actuator_route_path(prefix, "/info"),
2806        actuator_route_path(prefix, "/metrics"),
2807        actuator_route_path(prefix, "/a11y"),
2808        actuator_route_path(prefix, "/ui"),
2809        actuator_route_path(prefix, "/ui/metrics"),
2810    ];
2811
2812    if prometheus_enabled {
2813        paths.push(actuator_route_path(prefix, "/prometheus"));
2814    }
2815
2816    if sensitive {
2817        paths.push(actuator_route_path(prefix, "/circuitbreakers"));
2818        paths.push(actuator_route_path(prefix, "/env"));
2819        paths.push(actuator_route_path(prefix, "/configprops"));
2820        paths.push(actuator_route_path(prefix, "/loggers"));
2821        paths.push(actuator_route_path(prefix, "/logfile"));
2822        paths.push(actuator_route_path(prefix, "/tasks"));
2823        paths.push(actuator_route_path(prefix, "/jobs"));
2824        paths.push(actuator_route_path(prefix, "/ui/tasks"));
2825        #[cfg(feature = "http-client")]
2826        {
2827            paths.push(actuator_route_path(prefix, "/webhooks/dlq"));
2828            paths.push(actuator_route_path(prefix, "/webhooks/replay"));
2829        }
2830        #[cfg(feature = "ws")]
2831        {
2832            paths.push(actuator_route_path(prefix, "/channels"));
2833            paths.push(actuator_route_path(prefix, "/tasks/stream"));
2834        }
2835    }
2836
2837    paths
2838}
2839
2840/// Build the actuator router with profile-aware endpoint exposure.
2841///
2842/// In dev mode (or when `actuator.sensitive = true`), all endpoints are
2843/// exposed. In prod mode, only health, info, and metrics are available.
2844///
2845/// The Prometheus scrape endpoint is mounted unconditionally here (independent
2846/// of `sensitive`). The framework router mounts the actuator from configuration
2847/// and gates `/actuator/prometheus` on the `actuator.prometheus` flag.
2848pub fn actuator_router<S: ProvideActuatorState + Send + Sync + Clone + 'static>(
2849    sensitive: bool,
2850) -> axum::Router<S> {
2851    actuator_router_with_prefix("/actuator", sensitive, true)
2852}
2853
2854/// Build the actuator router at a configured prefix.
2855///
2856/// This is the prefix-aware variant used by the framework router.
2857///
2858/// `prometheus_enabled` controls the `/actuator/prometheus` scrape endpoint
2859/// independently of `sensitive`, so platform metrics scraping can be exposed
2860/// without also exposing sensitive actuator surfaces.
2861#[allow(clippy::too_many_lines)]
2862pub(crate) fn actuator_router_with_prefix<
2863    S: ProvideActuatorState + Send + Sync + Clone + 'static,
2864>(
2865    prefix: &str,
2866    sensitive: bool,
2867    prometheus_enabled: bool,
2868) -> axum::Router<S> {
2869    let mut router = axum::Router::new()
2870        .route(
2871            &actuator_route_path(prefix, "/health"),
2872            axum::routing::get(health::<S>),
2873        )
2874        .route(
2875            &actuator_route_path(prefix, "/info"),
2876            axum::routing::get(info::<S>),
2877        )
2878        .route(
2879            &actuator_route_path(prefix, "/metrics"),
2880            axum::routing::get(metrics_endpoint::<S>),
2881        )
2882        .route(
2883            &actuator_route_path(prefix, "/a11y"),
2884            axum::routing::get(a11y_endpoint::<S>),
2885        );
2886
2887    if prometheus_enabled {
2888        router = router.route(
2889            &actuator_route_path(prefix, "/prometheus"),
2890            axum::routing::get(prometheus_endpoint::<S>),
2891        );
2892    }
2893
2894    if sensitive {
2895        router = router
2896            .route(
2897                &actuator_route_path(prefix, "/circuitbreakers"),
2898                axum::routing::get(circuitbreakers_endpoint::<S>),
2899            )
2900            .route(
2901                &actuator_route_path(prefix, "/env"),
2902                axum::routing::get(env_endpoint::<S>),
2903            )
2904            .route(
2905                &actuator_route_path(prefix, "/configprops"),
2906                axum::routing::get(configprops_endpoint::<S>),
2907            )
2908            .route(
2909                &actuator_route_path(prefix, "/loggers"),
2910                axum::routing::get(loggers_get::<S>),
2911            )
2912            .route(
2913                &actuator_route_path(prefix, "/loggers/{name}"),
2914                axum::routing::put(loggers_put::<S>),
2915            )
2916            .route(
2917                &actuator_route_path(prefix, "/logfile"),
2918                axum::routing::get(logfile_endpoint::<S>),
2919            )
2920            .route(
2921                &actuator_route_path(prefix, "/tasks"),
2922                axum::routing::get(tasks_endpoint::<S>),
2923            )
2924            .route(
2925                &actuator_route_path(prefix, "/jobs"),
2926                axum::routing::get(jobs_endpoint::<S>),
2927            )
2928            .route(
2929                &actuator_route_path(prefix, "/ui/tasks"),
2930                axum::routing::get(ui_tasks::<S>),
2931            );
2932        #[cfg(feature = "http-client")]
2933        {
2934            router = router
2935                .route(
2936                    &actuator_route_path(prefix, "/webhooks/dlq"),
2937                    axum::routing::get(webhooks_dlq_endpoint::<S>),
2938                )
2939                .route(
2940                    &actuator_route_path(prefix, "/webhooks/replay"),
2941                    axum::routing::post(webhooks_replay_endpoint::<S>),
2942                );
2943        }
2944
2945        #[cfg(feature = "system-info")]
2946        {
2947            router = router.route(
2948                &actuator_route_path(prefix, "/system"),
2949                axum::routing::get(crate::system_info::system_info_handler),
2950            );
2951        }
2952
2953        #[cfg(feature = "ws")]
2954        {
2955            router = router
2956                .route(
2957                    &actuator_route_path(prefix, "/channels"),
2958                    axum::routing::get(channels_endpoint::<S>),
2959                )
2960                .route(
2961                    &actuator_route_path(prefix, "/tasks/stream"),
2962                    axum::routing::get(tasks_stream_endpoint::<S>),
2963                );
2964        }
2965    }
2966
2967    // Nova: Add HTMX UI endpoints available unconditionally like metrics
2968    router
2969        .route(
2970            &actuator_route_path(prefix, "/ui"),
2971            axum::routing::get(ui_dashboard),
2972        )
2973        .route(
2974            &actuator_route_path(prefix, "/ui/metrics"),
2975            axum::routing::get(ui_metrics::<S>),
2976        )
2977}
2978
2979#[cfg(test)]
2980mod tests {
2981    use super::*;
2982    use crate::config::AutumnConfig;
2983
2984    #[test]
2985    fn task_registry_flow() {
2986        let registry = TaskRegistry::new();
2987
2988        registry.register_scheduled(
2989            "my_task",
2990            "0 * * * * *",
2991            crate::task::TaskCoordination::Fleet,
2992            "mock",
2993            "node-1",
2994        );
2995        let snap1 = registry.snapshot();
2996        assert_eq!(snap1.get("my_task").unwrap().total_runs, 0);
2997
2998        registry.record_leader("my_task", "node-1", "mock_tick");
2999        let snap3 = registry.snapshot();
3000        assert_eq!(
3001            snap3.get("my_task").unwrap().current_leader.as_deref(),
3002            Some("node-1")
3003        );
3004
3005        registry.record_start("my_task");
3006        let snap4 = registry.snapshot();
3007        assert_eq!(snap4.get("my_task").unwrap().status, "running");
3008
3009        registry.record_next_run_at("my_task", "tomorrow");
3010        let snap5 = registry.snapshot();
3011        assert_eq!(
3012            snap5.get("my_task").unwrap().next_run_at.as_deref(),
3013            Some("tomorrow")
3014        );
3015
3016        registry.record_success("my_task", 100);
3017        let snap6 = registry.snapshot();
3018        assert_eq!(snap6.get("my_task").unwrap().total_runs, 1);
3019        assert_eq!(snap6.get("my_task").unwrap().last_error, None);
3020
3021        registry.record_failure("my_task", 150, "error message");
3022        let snap7 = registry.snapshot();
3023        assert_eq!(snap7.get("my_task").unwrap().total_runs, 2);
3024        assert_eq!(snap7.get("my_task").unwrap().total_failures, 1);
3025        assert_eq!(
3026            snap7.get("my_task").unwrap().last_error.as_deref(),
3027            Some("error message")
3028        );
3029
3030        let registry2 = TaskRegistry::default();
3031        assert!(registry2.snapshot().is_empty());
3032    }
3033    #[test]
3034    fn job_registry_flow() {
3035        let registry = JobRegistry::new();
3036
3037        registry.register("my_job");
3038        let snap1 = registry.snapshot();
3039        assert_eq!(snap1.get("my_job").unwrap().queued, 0);
3040
3041        registry.record_enqueue("my_job");
3042        let snap2 = registry.snapshot();
3043        assert_eq!(snap2.get("my_job").unwrap().queued, 1);
3044
3045        registry.record_start("my_job");
3046        let snap3 = registry.snapshot();
3047        assert_eq!(snap3.get("my_job").unwrap().queued, 0);
3048        assert_eq!(snap3.get("my_job").unwrap().in_flight, 1);
3049
3050        registry.record_retry("my_job", "timeout", 1);
3051        let snap4 = registry.snapshot();
3052        assert_eq!(snap4.get("my_job").unwrap().in_flight, 0);
3053        assert_eq!(
3054            snap4.get("my_job").unwrap().last_error.as_deref(),
3055            Some("timeout")
3056        );
3057
3058        registry.record_enqueue("my_job");
3059        registry.record_start("my_job");
3060        registry.record_success("my_job");
3061        let snap5 = registry.snapshot();
3062        assert_eq!(snap5.get("my_job").unwrap().in_flight, 0);
3063        assert_eq!(snap5.get("my_job").unwrap().total_successes, 1);
3064        assert_eq!(snap5.get("my_job").unwrap().last_error, None);
3065
3066        registry.record_enqueue("my_job");
3067        registry.record_cancel("my_job");
3068        let snap6 = registry.snapshot();
3069        assert_eq!(snap6.get("my_job").unwrap().queued, 0);
3070        assert_eq!(snap6.get("my_job").unwrap().in_flight, 0);
3071
3072        registry.record_enqueue("my_job");
3073        registry.record_start("my_job");
3074        registry.record_failure("my_job", "failure".to_string(), true);
3075        let snap7 = registry.snapshot();
3076        assert_eq!(snap7.get("my_job").unwrap().in_flight, 0);
3077        assert_eq!(snap7.get("my_job").unwrap().total_failures, 1);
3078        assert_eq!(snap7.get("my_job").unwrap().dead_letters, 1);
3079        assert_eq!(
3080            snap7.get("my_job").unwrap().last_error.as_deref(),
3081            Some("failure")
3082        );
3083
3084        let registry2 = JobRegistry::default();
3085        let snap8 = registry2.snapshot();
3086        assert!(snap8.is_empty());
3087    }
3088    use axum::body::Body;
3089    use axum::http::Request;
3090    use tower::ServiceExt;
3091
3092    #[derive(Clone)]
3093    struct TestActuatorState {
3094        profile: String,
3095        deploy_version: String,
3096        metrics: crate::middleware::MetricsCollector,
3097        log_levels: LogLevels,
3098        task_registry: TaskRegistry,
3099        job_registry: JobRegistry,
3100        config_props: ConfigProperties,
3101        metrics_source_registry: MetricsSourceRegistry,
3102        health_indicator_registry: HealthIndicatorRegistry,
3103        health_detailed: bool,
3104        log_buffer: Option<crate::log::capture::LogBuffer>,
3105        #[cfg(feature = "http-client")]
3106        webhook_outbound: Option<crate::webhook_outbound::WebhookOutboundManager>,
3107        #[cfg(feature = "db")]
3108        pool: Option<
3109            diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>,
3110        >,
3111        #[cfg(feature = "ws")]
3112        channels: crate::channels::Channels,
3113        #[cfg(feature = "ws")]
3114        shutdown: tokio_util::sync::CancellationToken,
3115    }
3116
3117    impl ProvideActuatorState for TestActuatorState {
3118        fn metrics(&self) -> &crate::middleware::MetricsCollector {
3119            &self.metrics
3120        }
3121        fn log_levels(&self) -> &LogLevels {
3122            &self.log_levels
3123        }
3124        fn task_registry(&self) -> &TaskRegistry {
3125            &self.task_registry
3126        }
3127        fn job_registry(&self) -> &JobRegistry {
3128            &self.job_registry
3129        }
3130        fn config_props(&self) -> &ConfigProperties {
3131            &self.config_props
3132        }
3133        fn profile(&self) -> &str {
3134            &self.profile
3135        }
3136        fn uptime_display(&self) -> String {
3137            "test_uptime".to_string()
3138        }
3139        fn deploy_version(&self) -> String {
3140            self.deploy_version.clone()
3141        }
3142        fn metrics_source_registry(&self) -> Option<&MetricsSourceRegistry> {
3143            Some(&self.metrics_source_registry)
3144        }
3145        #[cfg(feature = "http-client")]
3146        fn webhook_outbound(&self) -> Option<crate::webhook_outbound::WebhookOutboundManager> {
3147            self.webhook_outbound.clone()
3148        }
3149        #[cfg(feature = "db")]
3150        fn pool(
3151            &self,
3152        ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
3153        {
3154            self.pool.as_ref()
3155        }
3156        #[cfg(feature = "ws")]
3157        fn channels(&self) -> &crate::channels::Channels {
3158            &self.channels
3159        }
3160        #[cfg(feature = "ws")]
3161        fn shutdown_token(&self) -> tokio_util::sync::CancellationToken {
3162            self.shutdown.clone()
3163        }
3164        fn health_indicator_registry(&self) -> Option<&HealthIndicatorRegistry> {
3165            Some(&self.health_indicator_registry)
3166        }
3167        fn health_detailed(&self) -> bool {
3168            self.health_detailed
3169        }
3170        fn log_buffer(&self) -> Option<crate::log::capture::LogBuffer> {
3171            self.log_buffer.clone()
3172        }
3173    }
3174
3175    fn test_state() -> TestActuatorState {
3176        test_state_with_config(&AutumnConfig::default())
3177    }
3178
3179    fn test_state_with_config(config: &AutumnConfig) -> TestActuatorState {
3180        TestActuatorState {
3181            profile: config.profile.clone().unwrap_or_else(|| "dev".into()),
3182            deploy_version: crate::canary::STABLE.to_owned(),
3183            metrics: crate::middleware::MetricsCollector::new(),
3184            log_levels: LogLevels::new("info"),
3185            task_registry: TaskRegistry::new(),
3186            job_registry: JobRegistry::new(),
3187            config_props: ConfigProperties::from_config(config),
3188            metrics_source_registry: MetricsSourceRegistry::new(),
3189            health_indicator_registry: HealthIndicatorRegistry::new(),
3190            health_detailed: config.health.detailed,
3191            log_buffer: None,
3192            #[cfg(feature = "http-client")]
3193            webhook_outbound: None,
3194            #[cfg(feature = "db")]
3195            pool: None,
3196            #[cfg(feature = "ws")]
3197            channels: crate::channels::Channels::new(32),
3198            #[cfg(feature = "ws")]
3199            shutdown: tokio_util::sync::CancellationToken::new(),
3200        }
3201    }
3202
3203    #[cfg(feature = "http-client")]
3204    fn test_state_with_webhook_outbound(
3205        manager: crate::webhook_outbound::WebhookOutboundManager,
3206    ) -> TestActuatorState {
3207        let mut state = test_state();
3208        state.webhook_outbound = Some(manager);
3209        state
3210    }
3211
3212    #[cfg(feature = "http-client")]
3213    fn replay_test_subscription() -> crate::webhook_outbound::WebhookSubscription {
3214        crate::webhook_outbound::WebhookSubscription {
3215            id: "sub-replay".to_string(),
3216            target_url: "https://example.test/webhook".to_string(),
3217            event_topics: vec!["order.created".to_string()],
3218            secret: "secret".to_string(),
3219            status: crate::webhook_outbound::WebhookSubscriptionStatus::Failed,
3220            consecutive_failures: 50,
3221        }
3222    }
3223
3224    #[cfg(feature = "http-client")]
3225    fn replay_test_dlq_log() -> crate::webhook_outbound::WebhookDeliveryLog {
3226        crate::webhook_outbound::WebhookDeliveryLog {
3227            id: "log-replay".to_string(),
3228            subscription_id: "sub-replay".to_string(),
3229            topic: "order.created".to_string(),
3230            payload: "{\"id\":123}".to_string(),
3231            request_headers: std::collections::HashMap::new(),
3232            response_status: Some(503),
3233            response_body: Some("unavailable".to_string()),
3234            elapsed_ms: 42,
3235            attempt: 5,
3236            max_attempts: 5,
3237            is_dlq: true,
3238            last_error: Some("server returned status: 503".to_string()),
3239            timestamp: chrono::Utc::now(),
3240        }
3241    }
3242
3243    #[cfg(feature = "http-client")]
3244    #[tokio::test]
3245    async fn webhooks_replay_preserves_dlq_log_and_failures_when_enqueue_is_unavailable() {
3246        use crate::webhook_outbound::{
3247            InMemoryOutboundWebhookHandler, OutboundWebhookHandler, WebhookOutboundManager,
3248        };
3249
3250        let _guard = crate::job::global_job_runtime_test_lock().lock().await;
3251        crate::job::clear_global_job_client();
3252
3253        let handler = Arc::new(InMemoryOutboundWebhookHandler::new());
3254        handler
3255            .create_subscription(replay_test_subscription())
3256            .await
3257            .expect("subscription setup");
3258        let original_log = replay_test_dlq_log();
3259        handler
3260            .log_delivery(original_log.clone())
3261            .await
3262            .expect("dlq log setup");
3263        let failures_before_replay = handler
3264            .get_subscription("sub-replay")
3265            .await
3266            .expect("subscription lookup")
3267            .expect("subscription should exist")
3268            .consecutive_failures;
3269
3270        let state = test_state_with_webhook_outbound(WebhookOutboundManager::new(handler.clone()));
3271        let response = webhooks_replay_endpoint(
3272            State(state),
3273            Json(ReplayRequest {
3274                log_id: original_log.id.clone(),
3275            }),
3276        )
3277        .await
3278        .into_response();
3279
3280        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
3281
3282        let stored_log = handler
3283            .get_delivery_log(&original_log.id)
3284            .await
3285            .expect("delivery log lookup")
3286            .expect("delivery log should still exist");
3287        assert!(stored_log.is_dlq, "failed enqueue must keep log in DLQ");
3288        assert_eq!(stored_log.attempt, original_log.attempt);
3289        assert_eq!(stored_log.last_error, original_log.last_error);
3290        assert_eq!(stored_log.response_status, original_log.response_status);
3291        assert_eq!(stored_log.response_body, original_log.response_body);
3292
3293        let subscription = handler
3294            .get_subscription("sub-replay")
3295            .await
3296            .expect("subscription lookup")
3297            .expect("subscription should exist");
3298        assert_eq!(
3299            subscription.consecutive_failures, failures_before_replay,
3300            "failed enqueue must not reset subscription failure history"
3301        );
3302        assert_eq!(
3303            subscription.status,
3304            crate::webhook_outbound::WebhookSubscriptionStatus::Failed,
3305            "failed enqueue must not reactivate an auto-failed subscription"
3306        );
3307
3308        crate::job::clear_global_job_client();
3309    }
3310
3311    #[cfg(feature = "http-client")]
3312    #[tokio::test]
3313    async fn webhooks_replay_rejects_disabled_subscription_without_removing_dlq() {
3314        use crate::webhook_outbound::{
3315            InMemoryOutboundWebhookHandler, OutboundWebhookHandler, WebhookOutboundManager,
3316            WebhookSubscriptionStatus,
3317        };
3318
3319        let _guard = crate::job::global_job_runtime_test_lock().lock().await;
3320        crate::job::clear_global_job_client();
3321
3322        let handler = Arc::new(InMemoryOutboundWebhookHandler::new());
3323        let mut subscription = replay_test_subscription();
3324        subscription.status = WebhookSubscriptionStatus::Disabled;
3325        subscription.consecutive_failures = 0;
3326        handler
3327            .create_subscription(subscription)
3328            .await
3329            .expect("subscription setup");
3330        let original_log = replay_test_dlq_log();
3331        handler
3332            .log_delivery(original_log.clone())
3333            .await
3334            .expect("dlq log setup");
3335
3336        let state = test_state_with_webhook_outbound(WebhookOutboundManager::new(handler.clone()));
3337        let response = webhooks_replay_endpoint(
3338            State(state),
3339            Json(ReplayRequest {
3340                log_id: original_log.id.clone(),
3341            }),
3342        )
3343        .await
3344        .into_response();
3345
3346        assert_eq!(response.status(), StatusCode::CONFLICT);
3347
3348        let stored_log = handler
3349            .get_delivery_log(&original_log.id)
3350            .await
3351            .expect("delivery log lookup")
3352            .expect("delivery log should still exist");
3353        assert!(stored_log.is_dlq);
3354        assert_eq!(stored_log.attempt, original_log.attempt);
3355        assert_eq!(stored_log.response_status, original_log.response_status);
3356        assert_eq!(stored_log.last_error, original_log.last_error);
3357
3358        let subscription = handler
3359            .get_subscription("sub-replay")
3360            .await
3361            .expect("subscription lookup")
3362            .expect("subscription should exist");
3363        assert_eq!(subscription.status, WebhookSubscriptionStatus::Disabled);
3364
3365        crate::job::clear_global_job_client();
3366    }
3367
3368    #[cfg(feature = "http-client")]
3369    #[tokio::test]
3370    async fn webhooks_replay_rejects_missing_subscription_without_removing_dlq() {
3371        use crate::webhook_outbound::{
3372            InMemoryOutboundWebhookHandler, OutboundWebhookHandler, WebhookOutboundManager,
3373        };
3374
3375        let _guard = crate::job::global_job_runtime_test_lock().lock().await;
3376        crate::job::clear_global_job_client();
3377
3378        let handler = Arc::new(InMemoryOutboundWebhookHandler::new());
3379        let original_log = replay_test_dlq_log();
3380        handler
3381            .log_delivery(original_log.clone())
3382            .await
3383            .expect("dlq log setup");
3384
3385        let runtime_state = crate::AppState::for_test().with_profile("test");
3386        let shutdown = tokio_util::sync::CancellationToken::new();
3387        crate::job::start_runtime(
3388            vec![crate::job::JobInfo {
3389                name: "autumn_webhook_delivery".to_string(),
3390                max_attempts: 1,
3391                initial_backoff_ms: 1,
3392                uniqueness: None,
3393                concurrency: None,
3394                handler: |_state, _payload| Box::pin(async move { Ok(()) }),
3395            }],
3396            &runtime_state,
3397            &shutdown,
3398            &crate::config::JobConfig::default(),
3399        )
3400        .expect("job runtime should start");
3401
3402        let state = test_state_with_webhook_outbound(WebhookOutboundManager::new(handler.clone()));
3403        let response = webhooks_replay_endpoint(
3404            State(state),
3405            Json(ReplayRequest {
3406                log_id: original_log.id.clone(),
3407            }),
3408        )
3409        .await
3410        .into_response();
3411
3412        assert_eq!(response.status(), StatusCode::NOT_FOUND);
3413
3414        let stored_log = handler
3415            .get_delivery_log(&original_log.id)
3416            .await
3417            .expect("delivery log lookup")
3418            .expect("delivery log should still exist");
3419        assert!(stored_log.is_dlq);
3420        assert_eq!(stored_log.attempt, original_log.attempt);
3421        assert_eq!(stored_log.response_status, original_log.response_status);
3422        assert_eq!(stored_log.response_body, original_log.response_body);
3423        assert_eq!(stored_log.last_error, original_log.last_error);
3424
3425        assert!(
3426            handler
3427                .get_subscription("sub-replay")
3428                .await
3429                .expect("subscription lookup")
3430                .is_none(),
3431            "test setup should leave the subscription missing"
3432        );
3433
3434        shutdown.cancel();
3435        crate::job::clear_global_job_client();
3436    }
3437
3438    #[cfg(feature = "http-client")]
3439    #[tokio::test]
3440    async fn webhooks_replay_resets_log_and_failures_after_enqueue_succeeds() {
3441        use crate::webhook_outbound::{
3442            InMemoryOutboundWebhookHandler, OutboundWebhookHandler, WebhookOutboundManager,
3443        };
3444
3445        let _guard = crate::job::global_job_runtime_test_lock().lock().await;
3446        crate::job::clear_global_job_client();
3447
3448        let handler = Arc::new(InMemoryOutboundWebhookHandler::new());
3449        handler
3450            .create_subscription(replay_test_subscription())
3451            .await
3452            .expect("subscription setup");
3453        let original_log = replay_test_dlq_log();
3454        handler
3455            .log_delivery(original_log.clone())
3456            .await
3457            .expect("dlq log setup");
3458
3459        let runtime_state = crate::AppState::for_test().with_profile("test");
3460        let shutdown = tokio_util::sync::CancellationToken::new();
3461        crate::job::start_runtime(
3462            vec![crate::job::JobInfo {
3463                name: "autumn_webhook_delivery".to_string(),
3464                max_attempts: 1,
3465                initial_backoff_ms: 1,
3466                uniqueness: None,
3467                concurrency: None,
3468                handler: |_state, _payload| Box::pin(async move { Ok(()) }),
3469            }],
3470            &runtime_state,
3471            &shutdown,
3472            &crate::config::JobConfig::default(),
3473        )
3474        .expect("job runtime should start");
3475
3476        let state = test_state_with_webhook_outbound(WebhookOutboundManager::new(handler.clone()));
3477        let response = webhooks_replay_endpoint(
3478            State(state),
3479            Json(ReplayRequest {
3480                log_id: original_log.id.clone(),
3481            }),
3482        )
3483        .await
3484        .into_response();
3485
3486        assert_eq!(response.status(), StatusCode::OK);
3487
3488        let stored_log = handler
3489            .get_delivery_log(&original_log.id)
3490            .await
3491            .expect("delivery log lookup")
3492            .expect("delivery log should still exist");
3493        assert!(!stored_log.is_dlq);
3494        assert_eq!(stored_log.attempt, 1);
3495        assert_eq!(stored_log.last_error, None);
3496        assert_eq!(stored_log.response_status, None);
3497        assert_eq!(stored_log.response_body, None);
3498
3499        let subscription = handler
3500            .get_subscription("sub-replay")
3501            .await
3502            .expect("subscription lookup")
3503            .expect("subscription should exist");
3504        assert_eq!(subscription.consecutive_failures, 0);
3505        assert_eq!(
3506            subscription.status,
3507            crate::webhook_outbound::WebhookSubscriptionStatus::Active
3508        );
3509
3510        shutdown.cancel();
3511        crate::job::clear_global_job_client();
3512    }
3513
3514    #[tokio::test]
3515    async fn actuator_health_returns_ok() {
3516        let app = actuator_router(true).with_state(test_state());
3517        let resp = app
3518            .oneshot(
3519                Request::builder()
3520                    .uri("/actuator/health")
3521                    .body(Body::empty())
3522                    .unwrap(),
3523            )
3524            .await
3525            .unwrap();
3526
3527        assert_eq!(resp.status(), StatusCode::OK);
3528        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3529            .await
3530            .unwrap();
3531        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3532        assert_eq!(json["status"], "UP");
3533        assert_eq!(json["profile"], "dev");
3534        assert!(json["uptime"].is_string());
3535    }
3536
3537    #[cfg(feature = "db")]
3538    #[tokio::test]
3539    async fn actuator_health_exposes_after_commit_failure_counter() {
3540        let app = actuator_router(true).with_state(test_state());
3541        let resp = app
3542            .oneshot(
3543                Request::builder()
3544                    .uri("/actuator/health")
3545                    .body(Body::empty())
3546                    .unwrap(),
3547            )
3548            .await
3549            .unwrap();
3550
3551        assert_eq!(resp.status(), StatusCode::OK);
3552        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3553            .await
3554            .unwrap();
3555        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3556        assert_eq!(
3557            json["autumn_after_commit_failures_total"],
3558            crate::db::AFTER_COMMIT_FAILURES_TOTAL.load(std::sync::atomic::Ordering::Relaxed),
3559            "/actuator/health should expose the documented after_commit counter"
3560        );
3561    }
3562
3563    #[tokio::test]
3564    #[allow(clippy::await_holding_lock)]
3565    async fn actuator_circuitbreakers_returns_breakers() {
3566        let _lock = crate::circuit_breaker::TEST_LOCK
3567            .lock()
3568            .unwrap_or_else(std::sync::PoisonError::into_inner);
3569        crate::circuit_breaker::global_registry().clear();
3570        let breaker = crate::circuit_breaker::global_registry().get_or_create(
3571            "actuator_endpoint_test_breaker",
3572            crate::circuit_breaker::CircuitBreakerPolicy {
3573                failure_ratio_threshold: 0.5,
3574                sample_window: std::time::Duration::from_secs(10),
3575                minimum_sample_count: 2,
3576                open_duration: std::time::Duration::from_secs(60),
3577                half_open_trial_count: 2,
3578            },
3579        );
3580        assert_eq!(
3581            breaker.state(),
3582            crate::circuit_breaker::CircuitState::Closed
3583        );
3584
3585        let mut detailed_config = AutumnConfig::default();
3586        detailed_config.health.detailed = true;
3587        let state = test_state_with_config(&detailed_config);
3588        let app = actuator_router(true).with_state(state);
3589        let resp = app
3590            .clone()
3591            .oneshot(
3592                Request::builder()
3593                    .uri("/actuator/circuitbreakers")
3594                    .body(Body::empty())
3595                    .unwrap(),
3596            )
3597            .await
3598            .unwrap();
3599
3600        assert_eq!(resp.status(), StatusCode::OK);
3601        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3602            .await
3603            .unwrap();
3604        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3605        let list = json.as_array().expect("Should be a JSON array");
3606        let item = list
3607            .iter()
3608            .find(|i| i["name"] == "actuator_endpoint_test_breaker")
3609            .expect("Should find our breaker");
3610        assert_eq!(item["state"], "CLOSED");
3611        assert_eq!(item["failure_ratio_threshold"], 0.5);
3612        assert_eq!(item["minimum_sample_count"], 2);
3613
3614        let mut undetailed_config = AutumnConfig::default();
3615        undetailed_config.health.detailed = false;
3616        let undetailed_state = test_state_with_config(&undetailed_config);
3617        let app_undetailed = actuator_router(true).with_state(undetailed_state);
3618        let resp_undetailed = app_undetailed
3619            .oneshot(
3620                Request::builder()
3621                    .uri("/actuator/circuitbreakers")
3622                    .body(Body::empty())
3623                    .unwrap(),
3624            )
3625            .await
3626            .unwrap();
3627
3628        assert_eq!(resp_undetailed.status(), StatusCode::OK);
3629        let body_undetailed = axum::body::to_bytes(resp_undetailed.into_body(), usize::MAX)
3630            .await
3631            .unwrap();
3632        let json_undetailed: serde_json::Value = serde_json::from_slice(&body_undetailed).unwrap();
3633        let list_undetailed = json_undetailed.as_array().expect("Should be a JSON array");
3634        let item_undetailed = list_undetailed
3635            .iter()
3636            .find(|i| i["name"] == "actuator_endpoint_test_breaker")
3637            .expect("Should find our breaker");
3638        assert_eq!(item_undetailed["state"], "CLOSED");
3639        assert!(item_undetailed.get("failure_ratio_threshold").is_none());
3640        assert!(item_undetailed.get("minimum_sample_count").is_none());
3641        crate::circuit_breaker::global_registry().clear();
3642    }
3643
3644    #[tokio::test]
3645    #[allow(clippy::await_holding_lock)]
3646    async fn test_health_hides_circuit_breakers_when_undetailed() {
3647        let _lock = crate::circuit_breaker::TEST_LOCK
3648            .lock()
3649            .unwrap_or_else(std::sync::PoisonError::into_inner);
3650        crate::circuit_breaker::global_registry().clear();
3651
3652        let _breaker = crate::circuit_breaker::global_registry().get_or_create(
3653            "test_health_hide_breaker",
3654            crate::circuit_breaker::CircuitBreakerPolicy {
3655                failure_ratio_threshold: 0.5,
3656                sample_window: std::time::Duration::from_secs(10),
3657                minimum_sample_count: 2,
3658                open_duration: std::time::Duration::from_secs(60),
3659                half_open_trial_count: 2,
3660            },
3661        );
3662
3663        let mut detailed_config = AutumnConfig::default();
3664        detailed_config.health.detailed = true;
3665        let state = test_state_with_config(&detailed_config);
3666        let app = actuator_router(true).with_state(state);
3667        let resp = app
3668            .oneshot(
3669                Request::builder()
3670                    .uri("/actuator/health")
3671                    .body(Body::empty())
3672                    .unwrap(),
3673            )
3674            .await
3675            .unwrap();
3676
3677        assert_eq!(resp.status(), StatusCode::OK);
3678        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3679            .await
3680            .unwrap();
3681        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3682        assert!(json["components"]["circuit_breaker.test_health_hide_breaker"].is_object());
3683
3684        let mut undetailed_config = AutumnConfig::default();
3685        undetailed_config.health.detailed = false;
3686        let undetailed_state = test_state_with_config(&undetailed_config);
3687        let app_undetailed = actuator_router(true).with_state(undetailed_state);
3688        let resp_undetailed = app_undetailed
3689            .oneshot(
3690                Request::builder()
3691                    .uri("/actuator/health")
3692                    .body(Body::empty())
3693                    .unwrap(),
3694            )
3695            .await
3696            .unwrap();
3697
3698        assert_eq!(resp_undetailed.status(), StatusCode::OK);
3699        let body_undetailed = axum::body::to_bytes(resp_undetailed.into_body(), usize::MAX)
3700            .await
3701            .unwrap();
3702        let json_undetailed: serde_json::Value = serde_json::from_slice(&body_undetailed).unwrap();
3703
3704        if let Some(components) = json_undetailed.get("components") {
3705            assert!(
3706                components
3707                    .get("circuit_breaker.test_health_hide_breaker")
3708                    .is_none()
3709            );
3710        }
3711
3712        crate::circuit_breaker::global_registry().clear();
3713    }
3714
3715    #[tokio::test]
3716    async fn actuator_routes_respect_custom_prefix() {
3717        let app = actuator_router_with_prefix("/ops", true, true).with_state(test_state());
3718
3719        let prefixed = app
3720            .clone()
3721            .oneshot(
3722                Request::builder()
3723                    .uri("/ops/health")
3724                    .body(Body::empty())
3725                    .unwrap(),
3726            )
3727            .await
3728            .unwrap();
3729        assert_eq!(prefixed.status(), StatusCode::OK);
3730
3731        let legacy = app
3732            .oneshot(
3733                Request::builder()
3734                    .uri("/actuator/health")
3735                    .body(Body::empty())
3736                    .unwrap(),
3737            )
3738            .await
3739            .unwrap();
3740        assert_eq!(legacy.status(), StatusCode::NOT_FOUND);
3741    }
3742
3743    #[test]
3744    fn actuator_route_helpers_normalize_prefixes() {
3745        assert_eq!(actuator_route_glob("ops/"), "/ops/*");
3746        assert_eq!(actuator_route_path("ops/", "/health"), "/ops/health");
3747        assert_eq!(actuator_route_glob("/"), "/*");
3748    }
3749
3750    #[tokio::test]
3751    async fn actuator_info_returns_metadata() {
3752        let app = actuator_router(true).with_state(test_state());
3753        let resp = app
3754            .oneshot(
3755                Request::builder()
3756                    .uri("/actuator/info")
3757                    .body(Body::empty())
3758                    .unwrap(),
3759            )
3760            .await
3761            .unwrap();
3762
3763        assert_eq!(resp.status(), StatusCode::OK);
3764        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3765            .await
3766            .unwrap();
3767        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3768        assert!(json["autumn"]["version"].is_string());
3769        assert_eq!(json["autumn"]["profile"], "dev");
3770    }
3771
3772    #[tokio::test]
3773    async fn actuator_env_available_in_sensitive_mode() {
3774        let config = AutumnConfig {
3775            profile: Some("prod".into()),
3776            server: crate::config::ServerConfig {
3777                port: 4100,
3778                ..crate::config::ServerConfig::default()
3779            },
3780            telemetry: crate::config::TelemetryConfig {
3781                enabled: true,
3782                service_name: "cloud-app".into(),
3783                ..crate::config::TelemetryConfig::default()
3784            },
3785            health: crate::config::HealthConfig {
3786                path: "/healthz".into(),
3787                ..crate::config::HealthConfig::default()
3788            },
3789            ..AutumnConfig::default()
3790        };
3791
3792        let app = actuator_router(true).with_state(test_state_with_config(&config));
3793        let resp = app
3794            .oneshot(
3795                Request::builder()
3796                    .uri("/actuator/env")
3797                    .body(Body::empty())
3798                    .unwrap(),
3799            )
3800            .await
3801            .unwrap();
3802        assert_eq!(resp.status(), StatusCode::OK);
3803        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3804            .await
3805            .unwrap();
3806        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3807        assert_eq!(json["active_profile"], "prod");
3808        assert_eq!(json["properties"]["server.port"], "4100");
3809        assert_eq!(json["properties"]["telemetry.enabled"], "true");
3810        assert_eq!(json["properties"]["telemetry.service_name"], "cloud-app");
3811        assert_eq!(json["properties"]["health.path"], "/healthz");
3812    }
3813
3814    #[tokio::test]
3815    async fn actuator_env_hidden_in_nonsensitive_mode() {
3816        let app = actuator_router(false).with_state(test_state());
3817        let resp = app
3818            .oneshot(
3819                Request::builder()
3820                    .uri("/actuator/env")
3821                    .body(Body::empty())
3822                    .unwrap(),
3823            )
3824            .await
3825            .unwrap();
3826        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
3827    }
3828
3829    #[tokio::test]
3830    async fn actuator_circuitbreakers_hidden_in_nonsensitive_mode() {
3831        let app = actuator_router(false).with_state(test_state());
3832        let resp = app
3833            .oneshot(
3834                Request::builder()
3835                    .uri("/actuator/circuitbreakers")
3836                    .body(Body::empty())
3837                    .unwrap(),
3838            )
3839            .await
3840            .unwrap();
3841        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
3842    }
3843
3844    #[test]
3845    fn redaction_patterns() {
3846        assert!(should_redact("database.url"));
3847        assert!(should_redact("api_token"));
3848        assert!(should_redact("secret_key"));
3849        assert!(!should_redact("server.port"));
3850        assert!(!should_redact("log.level"));
3851    }
3852
3853    // ── Metrics endpoint tests ─────────────────────────────────
3854
3855    #[tokio::test]
3856    async fn actuator_metrics_returns_http_stats() {
3857        let state = test_state();
3858        state.metrics().record("GET", "/test", 200, 10);
3859        state.metrics().record("POST", "/test", 500, 50);
3860
3861        let app = actuator_router(true).with_state(state);
3862        let resp = app
3863            .oneshot(
3864                Request::builder()
3865                    .uri("/actuator/metrics")
3866                    .body(Body::empty())
3867                    .unwrap(),
3868            )
3869            .await
3870            .unwrap();
3871
3872        assert_eq!(resp.status(), StatusCode::OK);
3873        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3874            .await
3875            .unwrap();
3876        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3877        assert_eq!(json["http"]["requests_total"], 2);
3878        assert_eq!(json["http"]["by_status"]["2xx"], 1);
3879        assert_eq!(json["http"]["by_status"]["5xx"], 1);
3880    }
3881
3882    #[tokio::test]
3883    async fn actuator_metrics_available_in_nonsensitive_mode() {
3884        let app = actuator_router(false).with_state(test_state());
3885        let resp = app
3886            .oneshot(
3887                Request::builder()
3888                    .uri("/actuator/metrics")
3889                    .body(Body::empty())
3890                    .unwrap(),
3891            )
3892            .await
3893            .unwrap();
3894        assert_eq!(resp.status(), StatusCode::OK);
3895    }
3896
3897    #[tokio::test]
3898    #[cfg(feature = "db")]
3899    async fn actuator_metrics_returns_db_stats_when_pool_present() {
3900        use diesel_async::AsyncPgConnection;
3901        use diesel_async::pooled_connection::AsyncDieselConnectionManager;
3902        use diesel_async::pooled_connection::deadpool::Pool;
3903
3904        let mut state = test_state();
3905
3906        let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(
3907            "postgres://postgres:postgres@localhost:5432/postgres",
3908        );
3909        let pool = Pool::builder(manager).build().unwrap();
3910
3911        state.pool = Some(pool);
3912
3913        let app = actuator_router(true).with_state(state);
3914        let resp = app
3915            .oneshot(
3916                Request::builder()
3917                    .uri("/actuator/metrics")
3918                    .body(Body::empty())
3919                    .unwrap(),
3920            )
3921            .await
3922            .unwrap();
3923
3924        assert_eq!(resp.status(), StatusCode::OK);
3925        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3926            .await
3927            .unwrap();
3928        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3929
3930        assert!(json.get("database").is_some());
3931    }
3932
3933    // ── Config properties endpoint tests ───────────────────────
3934
3935    #[tokio::test]
3936    async fn actuator_configprops_returns_properties() {
3937        let app = actuator_router(true).with_state(test_state());
3938        let resp = app
3939            .oneshot(
3940                Request::builder()
3941                    .uri("/actuator/configprops")
3942                    .body(Body::empty())
3943                    .unwrap(),
3944            )
3945            .await
3946            .unwrap();
3947
3948        assert_eq!(resp.status(), StatusCode::OK);
3949        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3950            .await
3951            .unwrap();
3952        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3953        assert_eq!(json["active_profile"], "dev");
3954        assert!(json["properties"].is_object());
3955    }
3956
3957    #[tokio::test]
3958    async fn actuator_configprops_hidden_in_nonsensitive_mode() {
3959        let app = actuator_router(false).with_state(test_state());
3960        let resp = app
3961            .oneshot(
3962                Request::builder()
3963                    .uri("/actuator/configprops")
3964                    .body(Body::empty())
3965                    .unwrap(),
3966            )
3967            .await
3968            .unwrap();
3969        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
3970    }
3971
3972    #[test]
3973    fn configprops_redacts_sensitive_values() {
3974        let mut props = HashMap::new();
3975        ConfigProperties::track_property(
3976            &mut props,
3977            "database.url",
3978            "postgres://user:pass@host/db",
3979            "",
3980            "dev",
3981        );
3982        assert_eq!(props["database.url"].value, "****");
3983    }
3984
3985    #[test]
3986    fn configprops_tracks_default_source() {
3987        let mut props = HashMap::new();
3988        ConfigProperties::track_property(&mut props, "server.port", "3000", "3000", "dev");
3989        assert_eq!(props["server.port"].source, "default");
3990        assert_eq!(props["server.port"].value, "3000");
3991    }
3992
3993    #[test]
3994    fn configprops_tracks_profile_source() {
3995        let mut props = HashMap::new();
3996        ConfigProperties::track_property(&mut props, "log.level", "debug", "info", "dev");
3997        assert_eq!(props["log.level"].source, "profile_default:dev");
3998    }
3999
4000    // ── Loggers endpoint tests ─────────────────────────────────
4001
4002    #[tokio::test]
4003    async fn actuator_loggers_get_returns_levels() {
4004        let app = actuator_router(true).with_state(test_state());
4005        let resp = app
4006            .oneshot(
4007                Request::builder()
4008                    .uri("/actuator/loggers")
4009                    .body(Body::empty())
4010                    .unwrap(),
4011            )
4012            .await
4013            .unwrap();
4014
4015        assert_eq!(resp.status(), StatusCode::OK);
4016        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4017            .await
4018            .unwrap();
4019        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4020        assert_eq!(json["current_level"], "info");
4021        assert!(json["available_levels"].is_array());
4022    }
4023
4024    #[tokio::test]
4025    async fn actuator_loggers_put_changes_level() {
4026        let state = test_state();
4027        let app = actuator_router(true).with_state(state.clone());
4028        let resp = app
4029            .oneshot(
4030                Request::builder()
4031                    .method("PUT")
4032                    .uri("/actuator/loggers/autumn_web")
4033                    .header("content-type", "application/json")
4034                    .body(Body::from(r#"{"level": "debug"}"#))
4035                    .unwrap(),
4036            )
4037            .await
4038            .unwrap();
4039
4040        assert_eq!(resp.status(), StatusCode::OK);
4041        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4042            .await
4043            .unwrap();
4044        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4045        assert_eq!(json["status"], "ok");
4046        assert_eq!(json["message"], "Logger 'autumn_web' set to 'debug'");
4047
4048        let overrides = state.log_levels().logger_overrides();
4049        assert_eq!(
4050            overrides.get("autumn_web").map(String::as_str),
4051            Some("debug")
4052        );
4053    }
4054
4055    #[tokio::test]
4056    async fn actuator_loggers_put_rejects_invalid_level() {
4057        let app = actuator_router(true).with_state(test_state());
4058        let resp = app
4059            .oneshot(
4060                Request::builder()
4061                    .method("PUT")
4062                    .uri("/actuator/loggers/autumn_web")
4063                    .header("content-type", "application/json")
4064                    .body(Body::from(r#"{"level": "banana"}"#))
4065                    .unwrap(),
4066            )
4067            .await
4068            .unwrap();
4069
4070        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
4071        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4072            .await
4073            .unwrap();
4074        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4075        assert_eq!(json["status"], "error");
4076    }
4077
4078    #[tokio::test]
4079    async fn actuator_loggers_hidden_in_nonsensitive_mode() {
4080        let app = actuator_router(false).with_state(test_state());
4081        let resp = app
4082            .oneshot(
4083                Request::builder()
4084                    .uri("/actuator/loggers")
4085                    .body(Body::empty())
4086                    .unwrap(),
4087            )
4088            .await
4089            .unwrap();
4090        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4091    }
4092
4093    #[test]
4094    fn log_levels_set_and_get() {
4095        let levels = LogLevels::new("info");
4096        assert_eq!(levels.current_level(), "info");
4097
4098        let _ = levels.set_logger_level("my_crate", "debug");
4099        let overrides = levels.logger_overrides();
4100        assert_eq!(overrides.get("my_crate").map(String::as_str), Some("debug"));
4101    }
4102
4103    #[test]
4104    fn log_levels_root_updates_current() {
4105        let levels = LogLevels::new("info");
4106        let prev = levels.set_logger_level("root", "trace");
4107        assert_eq!(prev, Some("info".to_string()));
4108        assert_eq!(levels.current_level(), "trace");
4109    }
4110
4111    // ── Prometheus endpoint tests ──────────────────────────────
4112
4113    #[tokio::test]
4114    async fn actuator_prometheus_returns_metrics() {
4115        let state = test_state();
4116        state.metrics().record("GET", "/test", 200, 10);
4117        state.metrics().record("POST", "/test", 500, 50);
4118
4119        let app = actuator_router(true).with_state(state);
4120        let resp = app
4121            .oneshot(
4122                Request::builder()
4123                    .uri("/actuator/prometheus")
4124                    .body(Body::empty())
4125                    .unwrap(),
4126            )
4127            .await
4128            .unwrap();
4129
4130        assert_eq!(resp.status(), StatusCode::OK);
4131        assert_eq!(
4132            resp.headers().get("content-type").unwrap(),
4133            "text/plain; version=0.0.4"
4134        );
4135
4136        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4137            .await
4138            .unwrap();
4139        let text = String::from_utf8(body.to_vec()).unwrap();
4140
4141        assert!(text.contains("# HELP autumn_http_requests_total Total number of HTTP requests"));
4142        assert!(text.contains("# TYPE autumn_http_requests_total counter"));
4143        assert!(text.contains("autumn_http_requests_total{version=\"stable\"} 2"));
4144
4145        assert!(text.contains("autumn_http_requests_active{version=\"stable\"} "));
4146        assert!(text.contains("autumn_http_responses_total{version=\"stable\",status=\"2xx\"} 1"));
4147        assert!(text.contains("autumn_http_responses_total{version=\"stable\",status=\"5xx\"} 1"));
4148
4149        // Latency percentiles are exposed in seconds, labelled by version.
4150        assert!(text.contains("# TYPE autumn_http_request_duration_seconds summary"));
4151        assert!(text.contains(
4152            "autumn_http_request_duration_seconds{version=\"stable\",quantile=\"0.99\"}"
4153        ));
4154
4155        assert!(text.contains(
4156            "autumn_http_route_requests_total{version=\"stable\",method=\"GET\",route=\"/test\"} 1"
4157        ));
4158        assert!(text.contains(
4159            "autumn_http_route_requests_total{version=\"stable\",method=\"POST\",route=\"/test\"} 1"
4160        ));
4161
4162        assert!(text.contains("# HELP autumn_request_timeouts_total"));
4163        assert!(text.contains("# TYPE autumn_request_timeouts_total counter"));
4164        assert!(text.contains("autumn_request_timeouts_total{version=\"stable\"} 0"));
4165    }
4166
4167    #[tokio::test]
4168    async fn actuator_prometheus_labels_metrics_with_canary_version() {
4169        // A replica whose deploy_version() is "canary" must tag its metric
4170        // families with version="canary" so a controller can compare cohorts.
4171        let mut state = test_state();
4172        state.deploy_version = crate::canary::CANARY.to_owned();
4173        // Latencies in ms: spread so p50 < p95/p99 and the slowest is 1200 ms.
4174        state.metrics().record("GET", "/test", 200, 10);
4175        state.metrics().record("GET", "/test", 200, 20);
4176        state.metrics().record("GET", "/test", 500, 1200);
4177
4178        let app = actuator_router(true).with_state(state);
4179        let resp = app
4180            .oneshot(
4181                Request::builder()
4182                    .uri("/actuator/prometheus")
4183                    .body(Body::empty())
4184                    .unwrap(),
4185            )
4186            .await
4187            .unwrap();
4188        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4189            .await
4190            .unwrap();
4191        let text = String::from_utf8(body.to_vec()).unwrap();
4192
4193        assert!(text.contains("autumn_http_requests_total{version=\"canary\"} 3"));
4194        assert!(text.contains("autumn_http_responses_total{version=\"canary\",status=\"5xx\"} 1"));
4195        // Must not leak the default "stable" label when running as canary.
4196        assert!(!text.contains("version=\"stable\""));
4197
4198        // Verify the percentile math: values are reported in seconds (ms / 1000)
4199        // and satisfy the quantile invariant p50 <= p95 <= p99.
4200        let quantile = |q: &str| -> f64 {
4201            let needle = format!(
4202                "autumn_http_request_duration_seconds{{version=\"canary\",quantile=\"{q}\"}} "
4203            );
4204            let line = text
4205                .lines()
4206                .find(|l| l.starts_with(&needle))
4207                .unwrap_or_else(|| panic!("missing duration line for quantile {q}"));
4208            line[needle.len()..].trim().parse().unwrap()
4209        };
4210        let (p50, p95, p99) = (quantile("0.5"), quantile("0.95"), quantile("0.99"));
4211        assert!(p50 <= p95, "p50 ({p50}) must be <= p95 ({p95})");
4212        assert!(p95 <= p99, "p95 ({p95}) must be <= p99 ({p99})");
4213        // Slowest sample was 1200 ms, so the top quantile must read 1.2 seconds.
4214        assert!(
4215            (p99 - 1.2).abs() < f64::EPSILON,
4216            "p99 should be 1.2s, got {p99}"
4217        );
4218    }
4219
4220    #[tokio::test]
4221    async fn actuator_prometheus_available_in_nonsensitive_mode() {
4222        let app = actuator_router(false).with_state(test_state());
4223        let resp = app
4224            .oneshot(
4225                Request::builder()
4226                    .uri("/actuator/prometheus")
4227                    .body(Body::empty())
4228                    .unwrap(),
4229            )
4230            .await
4231            .unwrap();
4232        assert_eq!(resp.status(), StatusCode::OK);
4233    }
4234
4235    #[tokio::test]
4236    async fn actuator_prometheus_available_when_export_enabled_and_nonsensitive() {
4237        // Metrics export decoupled from sensitive: prometheus is reachable even
4238        // though sensitive endpoints (env/configprops/loggers/tasks) are not.
4239        let app = actuator_router_with_prefix("/actuator", false, true).with_state(test_state());
4240        let resp = app
4241            .clone()
4242            .oneshot(
4243                Request::builder()
4244                    .uri("/actuator/prometheus")
4245                    .body(Body::empty())
4246                    .unwrap(),
4247            )
4248            .await
4249            .unwrap();
4250        assert_eq!(resp.status(), StatusCode::OK);
4251
4252        // Sensitive surfaces stay closed under the non-sensitive metrics config.
4253        for sensitive_path in [
4254            "/actuator/env",
4255            "/actuator/configprops",
4256            "/actuator/loggers",
4257            "/actuator/tasks",
4258            "/actuator/jobs",
4259            "/actuator/ui/tasks",
4260        ] {
4261            let resp = app
4262                .clone()
4263                .oneshot(
4264                    Request::builder()
4265                        .uri(sensitive_path)
4266                        .body(Body::empty())
4267                        .unwrap(),
4268                )
4269                .await
4270                .unwrap();
4271            assert_eq!(
4272                resp.status(),
4273                StatusCode::NOT_FOUND,
4274                "{sensitive_path} should be unavailable when actuator is non-sensitive"
4275            );
4276        }
4277    }
4278
4279    #[tokio::test]
4280    async fn actuator_prometheus_unavailable_when_export_disabled() {
4281        // Regression: with metrics export disabled, the scrape endpoint is gone.
4282        let app = actuator_router_with_prefix("/actuator", false, false).with_state(test_state());
4283        let resp = app
4284            .oneshot(
4285                Request::builder()
4286                    .uri("/actuator/prometheus")
4287                    .body(Body::empty())
4288                    .unwrap(),
4289            )
4290            .await
4291            .unwrap();
4292        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4293    }
4294
4295    #[tokio::test]
4296    async fn actuator_prometheus_unavailable_when_export_disabled_even_if_sensitive() {
4297        // Disabling export wins even when sensitive endpoints are enabled.
4298        let app = actuator_router_with_prefix("/actuator", true, false).with_state(test_state());
4299        let resp = app
4300            .oneshot(
4301                Request::builder()
4302                    .uri("/actuator/prometheus")
4303                    .body(Body::empty())
4304                    .unwrap(),
4305            )
4306            .await
4307            .unwrap();
4308        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4309    }
4310
4311    #[test]
4312    fn actuator_endpoint_paths_respects_prometheus_toggle() {
4313        let enabled = actuator_endpoint_paths("/actuator", false, true);
4314        assert!(
4315            enabled.iter().any(|p| p == "/actuator/prometheus"),
4316            "prometheus path should be listed when export is enabled: {enabled:?}"
4317        );
4318
4319        let disabled = actuator_endpoint_paths("/actuator", false, false);
4320        assert!(
4321            !disabled.iter().any(|p| p == "/actuator/prometheus"),
4322            "prometheus path should be absent when export is disabled: {disabled:?}"
4323        );
4324    }
4325
4326    // ── Tasks endpoint tests ───────────────────────────────────
4327
4328    #[tokio::test]
4329    async fn actuator_tasks_returns_registered_tasks() {
4330        let state = test_state();
4331        state.task_registry().register("cleanup", "every 5m");
4332        state.task_registry().record_start("cleanup");
4333        state.task_registry().record_success("cleanup", 150);
4334
4335        let app = actuator_router(true).with_state(state);
4336        let resp = app
4337            .oneshot(
4338                Request::builder()
4339                    .uri("/actuator/tasks")
4340                    .body(Body::empty())
4341                    .unwrap(),
4342            )
4343            .await
4344            .unwrap();
4345
4346        assert_eq!(resp.status(), StatusCode::OK);
4347        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4348            .await
4349            .unwrap();
4350        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4351        let task = &json["scheduled_tasks"]["cleanup"];
4352        assert_eq!(task["schedule"], "every 5m");
4353        assert_eq!(task["status"], "idle");
4354        assert_eq!(task["total_runs"], 1);
4355        assert_eq!(task["total_failures"], 0);
4356        assert_eq!(task["last_result"], "ok");
4357        assert_eq!(task["last_duration_ms"], 150);
4358    }
4359
4360    #[tokio::test]
4361    async fn actuator_jobs_returns_registered_jobs() {
4362        let state = test_state();
4363        state.job_registry().register("send_email");
4364        state.job_registry().record_enqueue("send_email");
4365        state.job_registry().record_start("send_email");
4366        state.job_registry().record_success("send_email");
4367
4368        let app = actuator_router(true).with_state(state);
4369        let resp = app
4370            .oneshot(
4371                Request::builder()
4372                    .uri("/actuator/jobs")
4373                    .body(Body::empty())
4374                    .unwrap(),
4375            )
4376            .await
4377            .unwrap();
4378
4379        assert_eq!(resp.status(), StatusCode::OK);
4380        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4381            .await
4382            .unwrap();
4383        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4384        let job = &json["jobs"]["send_email"];
4385        assert_eq!(job["queued"], 0);
4386        assert_eq!(job["in_flight"], 0);
4387        assert_eq!(job["total_successes"], 1);
4388        assert_eq!(job["total_failures"], 0);
4389    }
4390
4391    #[cfg(feature = "ws")]
4392    #[tokio::test]
4393    async fn actuator_channels_returns_metrics() {
4394        let state = test_state();
4395        let mut rx = state.channels().subscribe("feed");
4396        state
4397            .channels()
4398            .broadcast()
4399            .publish("feed", "hello")
4400            .expect("publish should succeed");
4401        rx.try_recv().expect("subscriber should receive payload");
4402
4403        let app = actuator_router(true).with_state(state);
4404        let resp = app
4405            .oneshot(
4406                Request::builder()
4407                    .uri("/actuator/channels")
4408                    .body(Body::empty())
4409                    .unwrap(),
4410            )
4411            .await
4412            .unwrap();
4413
4414        assert_eq!(resp.status(), StatusCode::OK);
4415        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4416            .await
4417            .unwrap();
4418        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4419        let feed = &json["channels"]["feed"];
4420        assert_eq!(feed["subscriber_count"], 1);
4421        assert_eq!(feed["lifetime_publish_count"], 1);
4422        assert_eq!(feed["dropped_count"], 0);
4423        assert_eq!(feed["lagged_count"], 0);
4424    }
4425
4426    #[tokio::test]
4427    async fn actuator_tasks_hidden_in_nonsensitive_mode() {
4428        let app = actuator_router(false).with_state(test_state());
4429        let resp = app
4430            .oneshot(
4431                Request::builder()
4432                    .uri("/actuator/tasks")
4433                    .body(Body::empty())
4434                    .unwrap(),
4435            )
4436            .await
4437            .unwrap();
4438        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4439    }
4440
4441    #[test]
4442    fn task_registry_records_failure() {
4443        let registry = TaskRegistry::new();
4444        registry.register("my_task", "cron 0 * * * *");
4445        registry.record_start("my_task");
4446        registry.record_failure("my_task", 200, "connection refused");
4447
4448        let snapshot = registry.snapshot();
4449        let task = &snapshot["my_task"];
4450        assert_eq!(task.status, "idle");
4451        assert_eq!(task.total_runs, 1);
4452        assert_eq!(task.total_failures, 1);
4453        assert_eq!(task.last_result.as_deref(), Some("failed"));
4454        assert_eq!(task.last_error.as_deref(), Some("connection refused"));
4455    }
4456
4457    #[test]
4458    fn task_registry_empty_snapshot() {
4459        let registry = TaskRegistry::new();
4460        assert!(registry.snapshot().is_empty());
4461    }
4462    #[test]
4463    fn log_levels_rejects_new_key_at_capacity() {
4464        let levels = LogLevels::new("info");
4465        // Fill to capacity
4466        for i in 0..1000 {
4467            let _ = levels.set_logger_level(&format!("logger_{i}"), "debug");
4468        }
4469
4470        // Try to add a new key, should be rejected
4471        let result = levels.set_logger_level("logger_1000", "warn");
4472        assert_eq!(result, None);
4473        assert_eq!(levels.logger_overrides().len(), 1000);
4474        assert_eq!(levels.logger_overrides().get("logger_1000"), None);
4475    }
4476
4477    #[test]
4478    fn log_levels_accepts_existing_key_at_capacity() {
4479        let levels = LogLevels::new("info");
4480        // Fill to capacity
4481        for i in 0..1000 {
4482            let _ = levels.set_logger_level(&format!("logger_{i}"), "debug");
4483        }
4484
4485        // Try to update an existing key, should succeed
4486        let prev = levels.set_logger_level("logger_999", "warn");
4487        assert_eq!(prev.as_deref(), Some("debug"));
4488        assert_eq!(levels.logger_overrides().len(), 1000);
4489        assert_eq!(
4490            levels
4491                .logger_overrides()
4492                .get("logger_999")
4493                .map(String::as_str),
4494            Some("warn")
4495        );
4496    }
4497
4498    #[test]
4499    fn task_registry_records_multiple_successes_and_failures() {
4500        let registry = TaskRegistry::new();
4501        registry.register("my_task", "cron * * * * *");
4502
4503        // 1st success
4504        registry.record_start("my_task");
4505        registry.record_success("my_task", 100);
4506
4507        // 2nd success
4508        registry.record_start("my_task");
4509        registry.record_success("my_task", 110);
4510
4511        let snapshot = registry.snapshot();
4512        let task = &snapshot["my_task"];
4513        assert_eq!(task.total_runs, 2);
4514        assert_eq!(task.total_failures, 0);
4515
4516        // 1st failure
4517        registry.record_start("my_task");
4518        registry.record_failure("my_task", 50, "failed");
4519
4520        let snapshot2 = registry.snapshot();
4521        let task2 = &snapshot2["my_task"];
4522        assert_eq!(task2.total_runs, 3);
4523        assert_eq!(task2.total_failures, 1);
4524    }
4525
4526    #[test]
4527    fn configprops_tracks_custom_profile() {
4528        let mut props = HashMap::new();
4529        ConfigProperties::track_property(
4530            &mut props,
4531            "log.level",
4532            "debug",
4533            "info",
4534            "custom_profile",
4535        );
4536        assert_eq!(props["log.level"].source, "autumn.toml");
4537    }
4538
4539    #[test]
4540    fn configprops_tracks_dev_prod_profiles() {
4541        let mut props = HashMap::new();
4542        ConfigProperties::track_property(&mut props, "log.level", "debug", "info", "dev");
4543        assert_eq!(props["log.level"].source, "profile_default:dev");
4544
4545        ConfigProperties::track_property(&mut props, "log.format", "json", "text", "prod");
4546        assert_eq!(props["log.format"].source, "profile_default:prod");
4547    }
4548
4549    #[test]
4550    fn configprops_returns_default_when_values_match() {
4551        let mut props = HashMap::new();
4552        ConfigProperties::track_property(&mut props, "log.level", "info", "info", "dev");
4553        assert_eq!(props["log.level"].source, "default");
4554    }
4555
4556    #[tokio::test]
4557    async fn actuator_ui_dashboard_returns_html_or_unimplemented() {
4558        let app = actuator_router(true).with_state(test_state());
4559
4560        let res = app
4561            .oneshot(
4562                Request::builder()
4563                    .uri("/actuator/ui")
4564                    .body(Body::empty())
4565                    .unwrap(),
4566            )
4567            .await
4568            .unwrap();
4569
4570        if cfg!(feature = "maud") {
4571            assert_eq!(res.status(), StatusCode::OK);
4572            assert_eq!(
4573                res.headers().get("content-type").unwrap(),
4574                "text/html; charset=utf-8"
4575            );
4576        } else {
4577            assert_eq!(res.status(), StatusCode::NOT_IMPLEMENTED);
4578        }
4579    }
4580
4581    #[tokio::test]
4582    async fn actuator_ui_metrics_returns_html_or_unimplemented() {
4583        let app = actuator_router(true).with_state(test_state());
4584
4585        let res = app
4586            .oneshot(
4587                Request::builder()
4588                    .uri("/actuator/ui/metrics")
4589                    .body(Body::empty())
4590                    .unwrap(),
4591            )
4592            .await
4593            .unwrap();
4594
4595        if cfg!(feature = "maud") {
4596            assert_eq!(res.status(), StatusCode::OK);
4597            assert_eq!(
4598                res.headers().get("content-type").unwrap(),
4599                "text/html; charset=utf-8"
4600            );
4601        } else {
4602            assert_eq!(res.status(), StatusCode::NOT_IMPLEMENTED);
4603        }
4604    }
4605
4606    #[tokio::test]
4607    async fn actuator_ui_tasks_returns_html_or_unimplemented() {
4608        let app = actuator_router(true).with_state(test_state());
4609
4610        let res = app
4611            .oneshot(
4612                Request::builder()
4613                    .uri("/actuator/ui/tasks")
4614                    .body(Body::empty())
4615                    .unwrap(),
4616            )
4617            .await
4618            .unwrap();
4619
4620        if cfg!(feature = "maud") {
4621            assert_eq!(res.status(), StatusCode::OK);
4622            assert_eq!(
4623                res.headers().get("content-type").unwrap(),
4624                "text/html; charset=utf-8"
4625            );
4626        } else {
4627            assert_eq!(res.status(), StatusCode::NOT_IMPLEMENTED);
4628        }
4629    }
4630
4631    #[tokio::test]
4632    async fn test_actuator_router_calls_prefix_variant() {
4633        // The `actuator_router` function is a convenience wrapper around `actuator_router_with_prefix`
4634        // using "/actuator" as the prefix. We can test it by building it and hitting one of the endpoints.
4635        let app = actuator_router(false).with_state(test_state());
4636        let resp = app
4637            .oneshot(
4638                Request::builder()
4639                    .uri("/actuator/health")
4640                    .body(Body::empty())
4641                    .unwrap(),
4642            )
4643            .await
4644            .unwrap();
4645
4646        assert_eq!(resp.status(), StatusCode::OK);
4647    }
4648
    // ── /actuator/a11y endpoint ────────────────────────────────────
4650
4651    #[tokio::test]
4652    async fn actuator_a11y_returns_posture_json() {
4653        let app = actuator_router(false).with_state(test_state());
4654        let resp = app
4655            .oneshot(
4656                Request::builder()
4657                    .uri("/actuator/a11y")
4658                    .body(Body::empty())
4659                    .unwrap(),
4660            )
4661            .await
4662            .unwrap();
4663
4664        assert_eq!(resp.status(), StatusCode::OK);
4665        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4666            .await
4667            .unwrap();
4668        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4669        assert!(json["lang_set"].is_boolean(), "{json}");
4670        assert!(json["skip_link_present"].is_boolean(), "{json}");
4671        assert!(json["landmark_regions_present"].is_boolean(), "{json}");
4672    }
4673
4674    #[tokio::test]
4675    async fn actuator_a11y_available_in_nonsensitive_mode() {
4676        let app = actuator_router(false).with_state(test_state());
4677        let resp = app
4678            .oneshot(
4679                Request::builder()
4680                    .uri("/actuator/a11y")
4681                    .body(Body::empty())
4682                    .unwrap(),
4683            )
4684            .await
4685            .unwrap();
4686        assert_eq!(resp.status(), StatusCode::OK);
4687    }
4688
4689    #[tokio::test]
4690    async fn actuator_a11y_posture_default_values() {
4691        let app = actuator_router(true).with_state(test_state());
4692        let resp = app
4693            .oneshot(
4694                Request::builder()
4695                    .uri("/actuator/a11y")
4696                    .body(Body::empty())
4697                    .unwrap(),
4698            )
4699            .await
4700            .unwrap();
4701
4702        assert_eq!(resp.status(), StatusCode::OK);
4703        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4704            .await
4705            .unwrap();
4706        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4707        // Default test state should report false for all posture fields
4708        assert_eq!(json["lang_set"], false, "{json}");
4709        assert_eq!(json["skip_link_present"], false, "{json}");
4710        assert_eq!(json["landmark_regions_present"], false, "{json}");
4711    }
4712
4713    #[test]
4714    fn a11y_posture_all_passing_is_compliant() {
4715        let posture = A11yPosture {
4716            lang_set: true,
4717            skip_link_present: true,
4718            landmark_regions_present: true,
4719        };
4720        assert!(posture.is_compliant());
4721    }
4722
4723    #[test]
4724    fn a11y_posture_missing_lang_is_not_compliant() {
4725        let posture = A11yPosture {
4726            lang_set: false,
4727            skip_link_present: true,
4728            landmark_regions_present: true,
4729        };
4730        assert!(!posture.is_compliant());
4731    }
4732
4733    #[tokio::test]
4734    async fn actuator_a11y_endpoint_paths_includes_a11y() {
4735        let paths = actuator_endpoint_paths("/actuator", false, true);
4736        assert!(
4737            paths.iter().any(|p| p == "/actuator/a11y"),
4738            "a11y path not found in: {paths:?}"
4739        );
4740    }
4741
4742    // ── MetricsSource / MetricsSourceRegistry tests ────────────
4743
4744    #[test]
4745    fn metrics_source_registry_registers_and_collects() {
4746        struct FixedSource;
4747        impl MetricsSource for FixedSource {
4748            fn collect(&self) -> Vec<MetricFamily> {
4749                vec![MetricFamily {
4750                    name: "plugin_requests_total".to_string(),
4751                    help: "Plugin request count".to_string(),
4752                    kind: MetricKind::Counter,
4753                    samples: vec![MetricSample {
4754                        labels: vec![],
4755                        value: 42.0,
4756                    }],
4757                }]
4758            }
4759        }
4760
4761        let registry = MetricsSourceRegistry::new();
4762        registry
4763            .register("myplugin", Arc::new(FixedSource))
4764            .unwrap();
4765
4766        let all = registry.collect_all();
4767        assert_eq!(all.len(), 1);
4768        assert_eq!(all[0].0, "myplugin");
4769        assert_eq!(all[0].1[0].name, "plugin_requests_total");
4770        assert!((all[0].1[0].samples[0].value - 42.0).abs() < f64::EPSILON);
4771    }
4772
4773    #[test]
4774    fn metrics_source_registry_rejects_duplicate_name() {
4775        struct EmptySource;
4776        impl MetricsSource for EmptySource {
4777            fn collect(&self) -> Vec<MetricFamily> {
4778                vec![]
4779            }
4780        }
4781
4782        let registry = MetricsSourceRegistry::new();
4783        registry.register("dup", Arc::new(EmptySource)).unwrap();
4784        let result = registry.register("dup", Arc::new(EmptySource));
4785        assert!(result.is_err());
4786        assert!(result.unwrap_err().contains("dup"));
4787    }
4788
4789    #[test]
4790    fn metrics_source_registry_isolates_panicking_source() {
4791        struct PanickingSource;
4792        impl MetricsSource for PanickingSource {
4793            fn collect(&self) -> Vec<MetricFamily> {
4794                panic!("source panicked!")
4795            }
4796        }
4797
4798        let registry = MetricsSourceRegistry::new();
4799        registry
4800            .register("panicker", Arc::new(PanickingSource))
4801            .unwrap();
4802
4803        let all = registry.collect_all();
4804        assert_eq!(all.len(), 1);
4805        assert_eq!(
4806            all[0].1.len(),
4807            0,
4808            "panicking source should yield no families"
4809        );
4810
4811        let errors = registry.error_counts();
4812        assert_eq!(errors.get("panicker"), Some(&1));
4813    }
4814
4815    #[tokio::test]
4816    async fn prometheus_endpoint_includes_plugin_source_families() {
4817        struct GaugeSource;
4818        impl MetricsSource for GaugeSource {
4819            fn collect(&self) -> Vec<MetricFamily> {
4820                vec![MetricFamily {
4821                    name: "plugin_queue_depth".to_string(),
4822                    help: "Plugin queue depth".to_string(),
4823                    kind: MetricKind::Gauge,
4824                    samples: vec![MetricSample {
4825                        labels: vec![("shard".to_string(), "a".to_string())],
4826                        value: 7.0,
4827                    }],
4828                }]
4829            }
4830        }
4831
4832        let state = test_state();
4833        state
4834            .metrics_source_registry
4835            .register("gauge_plugin", Arc::new(GaugeSource))
4836            .unwrap();
4837
4838        let app = actuator_router(true).with_state(state);
4839        let resp = app
4840            .oneshot(
4841                Request::builder()
4842                    .uri("/actuator/prometheus")
4843                    .body(Body::empty())
4844                    .unwrap(),
4845            )
4846            .await
4847            .unwrap();
4848
4849        assert_eq!(resp.status(), StatusCode::OK);
4850        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4851            .await
4852            .unwrap();
4853        let text = String::from_utf8(body.to_vec()).unwrap();
4854
4855        assert!(
4856            text.contains("# HELP plugin_queue_depth Plugin queue depth"),
4857            "missing HELP line in:\n{text}"
4858        );
4859        assert!(
4860            text.contains("# TYPE plugin_queue_depth gauge"),
4861            "missing TYPE line in:\n{text}"
4862        );
4863        assert!(
4864            text.contains("plugin_queue_depth{shard=\"a\"} 7"),
4865            "missing sample line in:\n{text}"
4866        );
4867    }
4868
4869    #[tokio::test]
4870    async fn prometheus_endpoint_emits_error_counter_for_panicking_source() {
4871        struct PanickingSource;
4872        impl MetricsSource for PanickingSource {
4873            fn collect(&self) -> Vec<MetricFamily> {
4874                panic!("oops")
4875            }
4876        }
4877
4878        let state = test_state();
4879        state
4880            .metrics_source_registry
4881            .register("panic_src", Arc::new(PanickingSource))
4882            .unwrap();
4883
4884        let app = actuator_router(true).with_state(state);
4885        let resp = app
4886            .oneshot(
4887                Request::builder()
4888                    .uri("/actuator/prometheus")
4889                    .body(Body::empty())
4890                    .unwrap(),
4891            )
4892            .await
4893            .unwrap();
4894
4895        assert_eq!(resp.status(), StatusCode::OK);
4896        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4897            .await
4898            .unwrap();
4899        let text = String::from_utf8(body.to_vec()).unwrap();
4900
4901        assert!(
4902            text.contains("autumn_metrics_source_errors_total{source=\"panic_src\"} 1"),
4903            "missing error counter in:\n{text}"
4904        );
4905    }
4906
4907    #[tokio::test]
4908    async fn metrics_endpoint_includes_sources_section() {
4909        struct SampleSource;
4910        impl MetricsSource for SampleSource {
4911            fn collect(&self) -> Vec<MetricFamily> {
4912                vec![MetricFamily {
4913                    name: "custom_counter".to_string(),
4914                    help: "A custom counter".to_string(),
4915                    kind: MetricKind::Counter,
4916                    samples: vec![MetricSample {
4917                        labels: vec![],
4918                        value: 5.0,
4919                    }],
4920                }]
4921            }
4922        }
4923
4924        let state = test_state();
4925        state
4926            .metrics_source_registry
4927            .register("my_source", Arc::new(SampleSource))
4928            .unwrap();
4929
4930        let app = actuator_router(true).with_state(state);
4931        let resp = app
4932            .oneshot(
4933                Request::builder()
4934                    .uri("/actuator/metrics")
4935                    .body(Body::empty())
4936                    .unwrap(),
4937            )
4938            .await
4939            .unwrap();
4940
4941        assert_eq!(resp.status(), StatusCode::OK);
4942        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4943            .await
4944            .unwrap();
4945        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4946
4947        assert!(
4948            json.get("sources").is_some(),
4949            "metrics JSON missing 'sources' key"
4950        );
4951        assert!(
4952            json["sources"].get("my_source").is_some(),
4953            "sources missing 'my_source' key"
4954        );
4955    }
4956
4957    #[test]
4958    fn metrics_source_registry_preserves_insertion_order() {
4959        struct NamedSource(&'static str);
4960        impl MetricsSource for NamedSource {
4961            fn collect(&self) -> Vec<MetricFamily> {
4962                vec![MetricFamily {
4963                    name: self.0.to_string(),
4964                    help: String::new(),
4965                    kind: MetricKind::Counter,
4966                    samples: vec![],
4967                }]
4968            }
4969        }
4970
4971        let registry = MetricsSourceRegistry::new();
4972        registry
4973            .register("alpha", Arc::new(NamedSource("alpha_metric")))
4974            .unwrap();
4975        registry
4976            .register("beta", Arc::new(NamedSource("beta_metric")))
4977            .unwrap();
4978        registry
4979            .register("gamma", Arc::new(NamedSource("gamma_metric")))
4980            .unwrap();
4981
4982        let all = registry.collect_all();
4983        assert_eq!(all[0].0, "alpha");
4984        assert_eq!(all[1].0, "beta");
4985        assert_eq!(all[2].0, "gamma");
4986    }
4987
4988    // ── render_plugin_sources edge-case coverage ──────────────────────────
4989
4990    #[test]
4991    fn escape_help_text_escapes_backslash_and_newline() {
4992        assert_eq!(escape_help_text("a\\b\nc"), "a\\\\b\\nc");
4993        assert_eq!(escape_help_text("plain"), "plain");
4994        assert_eq!(escape_help_text(""), "");
4995    }
4996
4997    #[test]
4998    fn format_sample_value_handles_special_floats() {
4999        assert_eq!(format_sample_value(f64::INFINITY), "+Inf");
5000        assert_eq!(format_sample_value(f64::NEG_INFINITY), "-Inf");
5001        assert_eq!(format_sample_value(f64::NAN), "NaN");
5002        assert_eq!(format_sample_value(0.0), "0");
5003        assert_eq!(format_sample_value(1.5), "1.5");
5004    }
5005
5006    #[test]
5007    fn is_valid_metric_name_accepts_valid_and_rejects_invalid() {
5008        assert!(is_valid_metric_name("http_requests_total"));
5009        assert!(is_valid_metric_name("_private"));
5010        assert!(is_valid_metric_name("ns:metric"));
5011        assert!(!is_valid_metric_name(""));
5012        assert!(!is_valid_metric_name("0starts_with_digit"));
5013        assert!(!is_valid_metric_name("has-hyphen"));
5014    }
5015
5016    #[test]
5017    fn is_valid_label_name_accepts_valid_and_rejects_invalid() {
5018        assert!(is_valid_label_name("shard"));
5019        assert!(is_valid_label_name("_internal"));
5020        assert!(is_valid_label_name("a1"));
5021        assert!(!is_valid_label_name(""));
5022        assert!(!is_valid_label_name("0starts_digit"));
5023        assert!(!is_valid_label_name("has-hyphen"));
5024        assert!(!is_valid_label_name("has.dot"));
5025    }
5026
5027    #[tokio::test]
5028    async fn prometheus_endpoint_skips_family_with_invalid_metric_name() {
5029        struct BadNameSource;
5030        impl MetricsSource for BadNameSource {
5031            fn collect(&self) -> Vec<MetricFamily> {
5032                vec![
5033                    MetricFamily {
5034                        name: "invalid-name".to_string(),
5035                        help: "should be skipped".to_string(),
5036                        kind: MetricKind::Counter,
5037                        samples: vec![],
5038                    },
5039                    MetricFamily {
5040                        name: "valid_name".to_string(),
5041                        help: "should appear".to_string(),
5042                        kind: MetricKind::Counter,
5043                        samples: vec![MetricSample {
5044                            labels: vec![],
5045                            value: 1.0,
5046                        }],
5047                    },
5048                ]
5049            }
5050        }
5051
5052        let state = test_state();
5053        state
5054            .metrics_source_registry
5055            .register("bad_name_src", Arc::new(BadNameSource))
5056            .unwrap();
5057
5058        let app = actuator_router(true).with_state(state);
5059        let resp = app
5060            .oneshot(
5061                Request::builder()
5062                    .uri("/actuator/prometheus")
5063                    .body(Body::empty())
5064                    .unwrap(),
5065            )
5066            .await
5067            .unwrap();
5068        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5069            .await
5070            .unwrap();
5071        let text = String::from_utf8(body.to_vec()).unwrap();
5072
5073        assert!(
5074            !text.contains("invalid-name"),
5075            "invalid family must be skipped:\n{text}"
5076        );
5077        assert!(
5078            text.contains("valid_name"),
5079            "valid family must appear:\n{text}"
5080        );
5081    }
5082
5083    #[tokio::test]
5084    async fn prometheus_endpoint_skips_sample_with_invalid_label_key() {
5085        // A sample containing an invalid label key (bad-key) must be skipped
5086        // entirely — not emitted with the bad key dropped — to avoid creating
5087        // a phantom duplicate series in the Prometheus scrape.
5088        struct DirtyLabelsSource;
5089        impl MetricsSource for DirtyLabelsSource {
5090            fn collect(&self) -> Vec<MetricFamily> {
5091                vec![MetricFamily {
5092                    name: "dirty_labels_metric".to_string(),
5093                    help: "test".to_string(),
5094                    kind: MetricKind::Counter,
5095                    samples: vec![
5096                        MetricSample {
5097                            labels: vec![
5098                                ("good".to_string(), "a".to_string()),
5099                                ("bad-key".to_string(), "b".to_string()),
5100                            ],
5101                            value: 1.0,
5102                        },
5103                        MetricSample {
5104                            labels: vec![("good".to_string(), "a".to_string())],
5105                            value: 2.0,
5106                        },
5107                    ],
5108                }]
5109            }
5110        }
5111
5112        let state = test_state();
5113        state
5114            .metrics_source_registry
5115            .register("dirty", Arc::new(DirtyLabelsSource))
5116            .unwrap();
5117
5118        let app = actuator_router(true).with_state(state);
5119        let resp = app
5120            .oneshot(
5121                Request::builder()
5122                    .uri("/actuator/prometheus")
5123                    .body(Body::empty())
5124                    .unwrap(),
5125            )
5126            .await
5127            .unwrap();
5128        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5129            .await
5130            .unwrap();
5131        let text = String::from_utf8(body.to_vec()).unwrap();
5132
5133        // First sample (bad-key) must be absent entirely
5134        assert!(
5135            !text.contains("dirty_labels_metric{good=\"a\"} 1"),
5136            "sample with invalid label key must be skipped:\n{text}"
5137        );
5138        // Second (clean) sample must still appear
5139        assert!(
5140            text.contains("dirty_labels_metric{good=\"a\"} 2"),
5141            "clean sample must appear:\n{text}"
5142        );
5143    }
5144
5145    #[tokio::test]
5146    async fn prometheus_endpoint_deduplicates_label_keys() {
5147        struct DupLabelSource;
5148        impl MetricsSource for DupLabelSource {
5149            fn collect(&self) -> Vec<MetricFamily> {
5150                vec![MetricFamily {
5151                    name: "dup_label_metric".to_string(),
5152                    help: "test".to_string(),
5153                    kind: MetricKind::Counter,
5154                    samples: vec![MetricSample {
5155                        labels: vec![
5156                            ("env".to_string(), "prod".to_string()),
5157                            ("env".to_string(), "staging".to_string()),
5158                        ],
5159                        value: 5.0,
5160                    }],
5161                }]
5162            }
5163        }
5164
5165        let state = test_state();
5166        state
5167            .metrics_source_registry
5168            .register("dup_src", Arc::new(DupLabelSource))
5169            .unwrap();
5170
5171        let app = actuator_router(true).with_state(state);
5172        let resp = app
5173            .oneshot(
5174                Request::builder()
5175                    .uri("/actuator/prometheus")
5176                    .body(Body::empty())
5177                    .unwrap(),
5178            )
5179            .await
5180            .unwrap();
5181        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5182            .await
5183            .unwrap();
5184        let text = String::from_utf8(body.to_vec()).unwrap();
5185
5186        // Only the first occurrence of `env` is kept
5187        assert!(
5188            text.contains("dup_label_metric{env=\"prod\"} 5"),
5189            "first env value must be kept:\n{text}"
5190        );
5191        assert!(
5192            !text.contains("staging"),
5193            "duplicate env key value must be dropped:\n{text}"
5194        );
5195    }
5196
5197    #[tokio::test]
5198    async fn prometheus_endpoint_escapes_help_text_and_formats_inf() {
5199        struct SpecialSource;
5200        impl MetricsSource for SpecialSource {
5201            fn collect(&self) -> Vec<MetricFamily> {
5202                vec![MetricFamily {
5203                    name: "inf_gauge".to_string(),
5204                    help: "has\\backslash and\nnewline".to_string(),
5205                    kind: MetricKind::Gauge,
5206                    samples: vec![
5207                        MetricSample {
5208                            labels: vec![("dir".to_string(), "pos".to_string())],
5209                            value: f64::INFINITY,
5210                        },
5211                        MetricSample {
5212                            labels: vec![("dir".to_string(), "neg".to_string())],
5213                            value: f64::NEG_INFINITY,
5214                        },
5215                    ],
5216                }]
5217            }
5218        }
5219
5220        let state = test_state();
5221        state
5222            .metrics_source_registry
5223            .register("special", Arc::new(SpecialSource))
5224            .unwrap();
5225
5226        let app = actuator_router(true).with_state(state);
5227        let resp = app
5228            .oneshot(
5229                Request::builder()
5230                    .uri("/actuator/prometheus")
5231                    .body(Body::empty())
5232                    .unwrap(),
5233            )
5234            .await
5235            .unwrap();
5236        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5237            .await
5238            .unwrap();
5239        let text = String::from_utf8(body.to_vec()).unwrap();
5240
5241        assert!(
5242            text.contains("# HELP inf_gauge has\\\\backslash and\\nnewline"),
5243            "help text must be escaped in:\n{text}"
5244        );
5245        assert!(
5246            text.contains("inf_gauge{dir=\"pos\"} +Inf"),
5247            "must render +Inf in:\n{text}"
5248        );
5249        assert!(
5250            text.contains("inf_gauge{dir=\"neg\"} -Inf"),
5251            "must render -Inf in:\n{text}"
5252        );
5253    }
5254
5255    #[tokio::test]
5256    async fn prometheus_endpoint_skips_duplicate_family_name_across_sources() {
5257        struct FirstSource;
5258        impl MetricsSource for FirstSource {
5259            fn collect(&self) -> Vec<MetricFamily> {
5260                vec![MetricFamily {
5261                    name: "shared_counter".to_string(),
5262                    help: "from first".to_string(),
5263                    kind: MetricKind::Counter,
5264                    samples: vec![MetricSample {
5265                        labels: vec![],
5266                        value: 1.0,
5267                    }],
5268                }]
5269            }
5270        }
5271        struct SecondSource;
5272        impl MetricsSource for SecondSource {
5273            fn collect(&self) -> Vec<MetricFamily> {
5274                vec![MetricFamily {
5275                    name: "shared_counter".to_string(),
5276                    help: "from second".to_string(),
5277                    kind: MetricKind::Counter,
5278                    samples: vec![MetricSample {
5279                        labels: vec![],
5280                        value: 2.0,
5281                    }],
5282                }]
5283            }
5284        }
5285
5286        let state = test_state();
5287        state
5288            .metrics_source_registry
5289            .register("first", Arc::new(FirstSource))
5290            .unwrap();
5291        state
5292            .metrics_source_registry
5293            .register("second", Arc::new(SecondSource))
5294            .unwrap();
5295
5296        let app = actuator_router(true).with_state(state);
5297        let resp = app
5298            .oneshot(
5299                Request::builder()
5300                    .uri("/actuator/prometheus")
5301                    .body(Body::empty())
5302                    .unwrap(),
5303            )
5304            .await
5305            .unwrap();
5306        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5307            .await
5308            .unwrap();
5309        let text = String::from_utf8(body.to_vec()).unwrap();
5310
5311        let occurrences = text.matches("# HELP shared_counter").count();
5312        assert_eq!(
5313            occurrences, 1,
5314            "must emit exactly one HELP block for shared_counter:\n{text}"
5315        );
5316    }
5317
5318    #[tokio::test]
5319    async fn prometheus_endpoint_skips_builtin_name_collision() {
5320        struct ShadowSource;
5321        impl MetricsSource for ShadowSource {
5322            fn collect(&self) -> Vec<MetricFamily> {
5323                vec![MetricFamily {
5324                    name: "autumn_http_requests_total".to_string(),
5325                    help: "plugin trying to shadow built-in".to_string(),
5326                    kind: MetricKind::Counter,
5327                    samples: vec![MetricSample {
5328                        labels: vec![],
5329                        value: 999.0,
5330                    }],
5331                }]
5332            }
5333        }
5334
5335        let state = test_state();
5336        state
5337            .metrics_source_registry
5338            .register("shadow", Arc::new(ShadowSource))
5339            .unwrap();
5340
5341        let app = actuator_router(true).with_state(state);
5342        let resp = app
5343            .oneshot(
5344                Request::builder()
5345                    .uri("/actuator/prometheus")
5346                    .body(Body::empty())
5347                    .unwrap(),
5348            )
5349            .await
5350            .unwrap();
5351        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5352            .await
5353            .unwrap();
5354        let text = String::from_utf8(body.to_vec()).unwrap();
5355
5356        let occurrences = text.matches("# HELP autumn_http_requests_total").count();
5357        assert_eq!(
5358            occurrences, 1,
5359            "built-in must not be shadowed by plugin:\n{text}"
5360        );
5361        assert!(
5362            !text.contains("999"),
5363            "plugin shadow value must not appear:\n{text}"
5364        );
5365    }
5366
5367    #[tokio::test]
5368    async fn prometheus_endpoint_skips_builtin_duration_family_collision() {
5369        // The new built-in latency family must be in the duplicate guard so a
5370        // plugin emitting the same family cannot produce a second HELP/TYPE block.
5371        struct ShadowLatency;
5372        impl MetricsSource for ShadowLatency {
5373            fn collect(&self) -> Vec<MetricFamily> {
5374                vec![MetricFamily {
5375                    name: "autumn_http_request_duration_seconds".to_string(),
5376                    help: "plugin trying to shadow built-in latency".to_string(),
5377                    kind: MetricKind::Gauge,
5378                    samples: vec![MetricSample {
5379                        labels: vec![],
5380                        value: 999.0,
5381                    }],
5382                }]
5383            }
5384        }
5385
5386        let state = test_state();
5387        state
5388            .metrics_source_registry
5389            .register("shadow_latency", Arc::new(ShadowLatency))
5390            .unwrap();
5391
5392        let app = actuator_router(true).with_state(state);
5393        let resp = app
5394            .oneshot(
5395                Request::builder()
5396                    .uri("/actuator/prometheus")
5397                    .body(Body::empty())
5398                    .unwrap(),
5399            )
5400            .await
5401            .unwrap();
5402        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5403            .await
5404            .unwrap();
5405        let text = String::from_utf8(body.to_vec()).unwrap();
5406
5407        let occurrences = text
5408            .matches("# HELP autumn_http_request_duration_seconds")
5409            .count();
5410        assert_eq!(
5411            occurrences, 1,
5412            "built-in latency family must not be shadowed by plugin:\n{text}"
5413        );
5414        assert!(
5415            !text.contains("999"),
5416            "plugin shadow value must not appear:\n{text}"
5417        );
5418    }
5419
5420    #[tokio::test]
5421    async fn prometheus_endpoint_skips_duplicate_series_within_family() {
5422        struct DupSeriesSource;
5423        impl MetricsSource for DupSeriesSource {
5424            fn collect(&self) -> Vec<MetricFamily> {
5425                vec![MetricFamily {
5426                    name: "dup_series_metric".to_string(),
5427                    help: "test".to_string(),
5428                    kind: MetricKind::Counter,
5429                    samples: vec![
5430                        MetricSample {
5431                            labels: vec![("region".to_string(), "us".to_string())],
5432                            value: 10.0,
5433                        },
5434                        MetricSample {
5435                            labels: vec![("region".to_string(), "us".to_string())],
5436                            value: 20.0,
5437                        },
5438                    ],
5439                }]
5440            }
5441        }
5442
5443        let state = test_state();
5444        state
5445            .metrics_source_registry
5446            .register("dup_series", Arc::new(DupSeriesSource))
5447            .unwrap();
5448
5449        let app = actuator_router(true).with_state(state);
5450        let resp = app
5451            .oneshot(
5452                Request::builder()
5453                    .uri("/actuator/prometheus")
5454                    .body(Body::empty())
5455                    .unwrap(),
5456            )
5457            .await
5458            .unwrap();
5459        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5460            .await
5461            .unwrap();
5462        let text = String::from_utf8(body.to_vec()).unwrap();
5463
5464        // First occurrence kept, second skipped
5465        assert!(
5466            text.contains("dup_series_metric{region=\"us\"} 10"),
5467            "first sample must appear:\n{text}"
5468        );
5469        assert!(
5470            !text.contains("dup_series_metric{region=\"us\"} 20"),
5471            "duplicate series must be dropped:\n{text}"
5472        );
5473    }
5474
5475    // ── RED then GREEN: /actuator/logfile endpoint ─────────────
5476
5477    fn make_log_buffer_with_entries() -> crate::log::capture::LogBuffer {
5478        use crate::log::capture::{CapturedLogEntry, LogBuffer};
5479        use crate::log::filter::ParameterFilter;
5480        let buf = LogBuffer::new(100, ParameterFilter::default());
5481        buf.push(CapturedLogEntry {
5482            timestamp: "2024-01-01T00:00:00.000Z".to_owned(),
5483            level: "INFO".to_owned(),
5484            target: "myapp::orders".to_owned(),
5485            message: "order created".to_owned(),
5486            fields: {
5487                let mut m = serde_json::Map::new();
5488                m.insert("order_id".to_owned(), serde_json::json!("A-1001"));
5489                m
5490            },
5491            request_id: Some("req-abc".to_owned()),
5492        });
5493        buf.push(CapturedLogEntry {
5494            timestamp: "2024-01-01T00:00:01.000Z".to_owned(),
5495            level: "WARN".to_owned(),
5496            target: "myapp::payments".to_owned(),
5497            message: "payment slow".to_owned(),
5498            fields: serde_json::Map::new(),
5499            request_id: None,
5500        });
5501        buf.push(CapturedLogEntry {
5502            timestamp: "2024-01-01T00:00:02.000Z".to_owned(),
5503            level: "ERROR".to_owned(),
5504            target: "myapp::payments".to_owned(),
5505            message: "payment failed".to_owned(),
5506            fields: serde_json::Map::new(),
5507            request_id: None,
5508        });
5509        buf
5510    }
5511
5512    #[tokio::test]
5513    async fn green_logfile_returns_empty_when_capture_disabled() {
5514        let state = test_state(); // log_buffer = None
5515        let response =
5516            logfile_endpoint(State(state), axum::extract::Query(LogfileQuery::default()))
5517                .await
5518                .unwrap();
5519        let body = response.0;
5520        assert!(!body.capture_enabled);
5521        assert!(body.entries.is_empty());
5522        assert_eq!(body.total, 0);
5523    }
5524
5525    #[tokio::test]
5526    async fn green_logfile_returns_all_entries_when_no_filter() {
5527        let mut state = test_state();
5528        state.log_buffer = Some(make_log_buffer_with_entries());
5529
5530        let response =
5531            logfile_endpoint(State(state), axum::extract::Query(LogfileQuery::default()))
5532                .await
5533                .unwrap();
5534        let body = response.0;
5535        assert!(body.capture_enabled);
5536        assert_eq!(body.total, 3);
5537        assert_eq!(body.entries.len(), 3);
5538        // newest-last ordering
5539        assert_eq!(body.entries[0].level, "INFO");
5540        assert_eq!(body.entries[2].level, "ERROR");
5541    }
5542
5543    #[tokio::test]
5544    async fn green_logfile_level_filter_excludes_info_when_min_warn() {
5545        let mut state = test_state();
5546        state.log_buffer = Some(make_log_buffer_with_entries());
5547
5548        let response = logfile_endpoint(
5549            State(state),
5550            axum::extract::Query(LogfileQuery {
5551                level: Some("warn".to_owned()),
5552                limit: None,
5553            }),
5554        )
5555        .await
5556        .unwrap();
5557        let body = response.0;
5558        assert_eq!(body.entries.len(), 2);
5559        assert!(body.entries.iter().all(|e| e.level != "INFO"));
5560    }
5561
5562    #[tokio::test]
5563    async fn green_logfile_limit_returns_most_recent_n() {
5564        let mut state = test_state();
5565        state.log_buffer = Some(make_log_buffer_with_entries());
5566
5567        let response = logfile_endpoint(
5568            State(state),
5569            axum::extract::Query(LogfileQuery {
5570                level: None,
5571                limit: Some(2),
5572            }),
5573        )
5574        .await
5575        .unwrap();
5576        let body = response.0;
5577        assert_eq!(body.entries.len(), 2);
5578        // Most recent 2 = WARN and ERROR
5579        assert_eq!(body.entries[0].level, "WARN");
5580        assert_eq!(body.entries[1].level, "ERROR");
5581    }
5582
5583    #[tokio::test]
5584    async fn green_logfile_sensitive_fields_in_response_are_served_scrubbed() {
5585        use crate::log::capture::{CapturedLogEntry, LogBuffer};
5586        use crate::log::filter::{FILTERED_PLACEHOLDER, ParameterFilter};
5587        let buf = LogBuffer::new(10, ParameterFilter::default());
5588        // The layer scrubs before storage; simulate stored entry with scrubbed value.
5589        buf.push(CapturedLogEntry {
5590            timestamp: "2024-01-01T00:00:00.000Z".to_owned(),
5591            level: "INFO".to_owned(),
5592            target: "auth".to_owned(),
5593            message: "login attempt".to_owned(),
5594            fields: {
5595                let mut m = serde_json::Map::new();
5596                m.insert(
5597                    "password".to_owned(),
5598                    serde_json::Value::String(FILTERED_PLACEHOLDER.to_owned()),
5599                );
5600                m
5601            },
5602            request_id: None,
5603        });
5604
5605        let mut state = test_state();
5606        state.log_buffer = Some(buf);
5607
5608        let response =
5609            logfile_endpoint(State(state), axum::extract::Query(LogfileQuery::default()))
5610                .await
5611                .unwrap();
5612        let body = response.0;
5613        assert_eq!(
5614            body.entries[0].fields["password"].as_str().unwrap(),
5615            FILTERED_PLACEHOLDER,
5616            "sensitive value must remain scrubbed in the response"
5617        );
5618    }
5619
5620    #[tokio::test]
5621    async fn green_logfile_invalid_level_returns_400() {
5622        let state = test_state();
5623        let result = logfile_endpoint(
5624            State(state),
5625            axum::extract::Query(LogfileQuery {
5626                level: Some("warning".to_owned()), // invalid — should be "warn"
5627                limit: None,
5628            }),
5629        )
5630        .await;
5631        let (status, _body) = result.unwrap_err();
5632        assert_eq!(status, StatusCode::BAD_REQUEST);
5633    }
5634
5635    #[tokio::test]
5636    async fn green_logfile_endpoint_in_sensitive_router() {
5637        // The endpoint must be reachable when sensitive=true.
5638        let state = test_state();
5639        let app = actuator_router::<TestActuatorState>(true).with_state(state);
5640        let resp = app
5641            .oneshot(
5642                Request::builder()
5643                    .uri("/actuator/logfile")
5644                    .body(Body::empty())
5645                    .unwrap(),
5646            )
5647            .await
5648            .unwrap();
5649        assert_eq!(resp.status(), StatusCode::OK);
5650    }
5651
5652    #[tokio::test]
5653    async fn green_logfile_endpoint_not_in_non_sensitive_router() {
5654        // The endpoint must NOT be reachable when sensitive=false.
5655        let state = test_state();
5656        let app = actuator_router::<TestActuatorState>(false).with_state(state);
5657        let resp = app
5658            .oneshot(
5659                Request::builder()
5660                    .uri("/actuator/logfile")
5661                    .body(Body::empty())
5662                    .unwrap(),
5663            )
5664            .await
5665            .unwrap();
5666        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
5667    }
5668
5669    #[tokio::test]
5670    async fn green_logfile_structured_fields_preserved() {
5671        let mut state = test_state();
5672        state.log_buffer = Some(make_log_buffer_with_entries());
5673
5674        let response =
5675            logfile_endpoint(State(state), axum::extract::Query(LogfileQuery::default()))
5676                .await
5677                .unwrap();
5678        let body = response.0;
5679        let first = &body.entries[0];
5680        assert_eq!(first.target, "myapp::orders");
5681        assert_eq!(first.fields["order_id"].as_str().unwrap(), "A-1001");
5682        assert_eq!(first.request_id.as_deref(), Some("req-abc"));
5683    }
5684}
5685
5686#[cfg(test)]
5687mod health_indicator_tests {
5688    use super::*;
5689
5690    struct AlwaysUp;
5691    impl HealthIndicator for AlwaysUp {
5692        fn check(&self) -> futures::future::BoxFuture<'_, HealthCheckOutput> {
5693            Box::pin(async { HealthCheckOutput::up() })
5694        }
5695    }
5696
5697    struct AlwaysDown;
5698    impl HealthIndicator for AlwaysDown {
5699        fn check(&self) -> futures::future::BoxFuture<'_, HealthCheckOutput> {
5700            Box::pin(async { HealthCheckOutput::down() })
5701        }
5702    }
5703
5704    #[test]
5705    fn health_status_as_str_values() {
5706        assert_eq!(HealthStatus::Up.as_str(), "UP");
5707        assert_eq!(HealthStatus::Down.as_str(), "DOWN");
5708        assert_eq!(HealthStatus::OutOfService.as_str(), "OUT_OF_SERVICE");
5709        assert_eq!(HealthStatus::Unknown.as_str(), "UNKNOWN");
5710    }
5711
5712    #[test]
5713    fn health_status_is_healthy() {
5714        assert!(HealthStatus::Up.is_healthy());
5715        assert!(HealthStatus::Unknown.is_healthy());
5716        assert!(!HealthStatus::Down.is_healthy());
5717        assert!(!HealthStatus::OutOfService.is_healthy());
5718    }
5719
5720    #[test]
5721    fn aggregate_status_precedence() {
5722        assert_eq!(
5723            HealthIndicatorRegistry::aggregate_status(&[HealthStatus::Up]),
5724            HealthStatus::Up
5725        );
5726        assert_eq!(
5727            HealthIndicatorRegistry::aggregate_status(&[HealthStatus::Up, HealthStatus::Unknown]),
5728            HealthStatus::Unknown
5729        );
5730        assert_eq!(
5731            HealthIndicatorRegistry::aggregate_status(&[
5732                HealthStatus::Unknown,
5733                HealthStatus::OutOfService
5734            ]),
5735            HealthStatus::OutOfService
5736        );
5737        assert_eq!(
5738            HealthIndicatorRegistry::aggregate_status(&[
5739                HealthStatus::OutOfService,
5740                HealthStatus::Down
5741            ]),
5742            HealthStatus::Down
5743        );
5744        assert_eq!(
5745            HealthIndicatorRegistry::aggregate_status(&[]),
5746            HealthStatus::Up
5747        );
5748    }
5749
5750    #[tokio::test]
5751    async fn registry_run_all_collects_results() {
5752        let registry = HealthIndicatorRegistry::new();
5753        registry
5754            .register("svc_a", IndicatorGroup::Readiness, Arc::new(AlwaysUp))
5755            .unwrap();
5756        registry
5757            .register("svc_b", IndicatorGroup::HealthOnly, Arc::new(AlwaysDown))
5758            .unwrap();
5759
5760        let results = registry.run_all().await;
5761        assert!(
5762            results
5763                .iter()
5764                .any(|r| r.name == "svc_a" && r.output.status == HealthStatus::Up)
5765        );
5766        assert!(
5767            results
5768                .iter()
5769                .any(|r| r.name == "svc_b" && r.output.status == HealthStatus::Down)
5770        );
5771    }
5772
5773    #[tokio::test]
5774    async fn registry_run_readiness_filters_health_only() {
5775        let registry = HealthIndicatorRegistry::new();
5776        registry
5777            .register("probe_check", IndicatorGroup::Readiness, Arc::new(AlwaysUp))
5778            .unwrap();
5779        registry
5780            .register(
5781                "health_only",
5782                IndicatorGroup::HealthOnly,
5783                Arc::new(AlwaysDown),
5784            )
5785            .unwrap();
5786
5787        let results = registry.run_readiness().await;
5788        assert_eq!(results.len(), 1);
5789        assert_eq!(results[0].name, "probe_check");
5790    }
5791
5792    #[tokio::test]
5793    async fn timed_out_indicator_reports_unknown_with_flag() {
5794        struct SlowIndicator;
5795        impl HealthIndicator for SlowIndicator {
5796            fn check(&self) -> futures::future::BoxFuture<'_, HealthCheckOutput> {
5797                Box::pin(async {
5798                    tokio::time::sleep(std::time::Duration::from_secs(30)).await;
5799                    HealthCheckOutput::up()
5800                })
5801            }
5802            fn timeout_ms(&self) -> u64 {
5803                5
5804            }
5805        }
5806        let registry = HealthIndicatorRegistry::new();
5807        registry
5808            .register("slow", IndicatorGroup::Readiness, Arc::new(SlowIndicator))
5809            .unwrap();
5810        let results = registry.run_all().await;
5811        let slow_res = results
5812            .iter()
5813            .find(|r| r.name == "slow")
5814            .expect("slow indicator not found");
5815        assert_eq!(slow_res.output.status, HealthStatus::Unknown);
5816        assert_eq!(
5817            slow_res.output.details.get("timed_out"),
5818            Some(&serde_json::Value::Bool(true))
5819        );
5820    }
5821
5822    #[tokio::test]
5823    #[allow(clippy::await_holding_lock)]
5824    async fn test_circuit_breakers_in_health_indicator_registry() {
5825        let _lock = crate::circuit_breaker::TEST_LOCK
5826            .lock()
5827            .unwrap_or_else(std::sync::PoisonError::into_inner);
5828        crate::circuit_breaker::global_registry().clear();
5829        let registry = HealthIndicatorRegistry::new();
5830        let breaker = crate::circuit_breaker::global_registry().get_or_create(
5831            "actuator_test_breaker",
5832            crate::circuit_breaker::CircuitBreakerPolicy {
5833                failure_ratio_threshold: 0.5,
5834                sample_window: std::time::Duration::from_secs(10),
5835                minimum_sample_count: 2,
5836                open_duration: std::time::Duration::from_secs(60),
5837                half_open_trial_count: 2,
5838            },
5839        );
5840
5841        let results = registry.run_all().await;
5842        let found = results
5843            .iter()
5844            .find(|r| r.name == "circuit_breaker.actuator_test_breaker");
5845        assert!(found.is_some(), "Should find circuit breaker in run_all");
5846        let result = found.unwrap();
5847        assert_eq!(result.group, IndicatorGroup::HealthOnly);
5848        assert_eq!(result.output.status, HealthStatus::Up);
5849        assert_eq!(result.output.details.get("state").unwrap(), "CLOSED");
5850
5851        breaker.after_call(false);
5852        breaker.after_call(false);
5853        assert_eq!(breaker.state(), crate::circuit_breaker::CircuitState::Open);
5854
5855        let results = registry.run_all().await;
5856        let found = results
5857            .iter()
5858            .find(|r| r.name == "circuit_breaker.actuator_test_breaker");
5859        assert_eq!(found.unwrap().output.status, HealthStatus::Down);
5860        assert_eq!(found.unwrap().output.details.get("state").unwrap(), "OPEN");
5861
5862        // Transition to HalfOpen manually to check status
5863        {
5864            let mut inner = breaker.inner.lock().unwrap();
5865            inner.state = crate::circuit_breaker::CircuitState::HalfOpen;
5866            inner.half_open_in_flight = 0;
5867            inner.half_open_successes = 0;
5868            inner.half_open_failures = 0;
5869        }
5870        assert_eq!(
5871            breaker.state(),
5872            crate::circuit_breaker::CircuitState::HalfOpen
5873        );
5874
5875        let results = registry.run_all().await;
5876        let found = results
5877            .iter()
5878            .find(|r| r.name == "circuit_breaker.actuator_test_breaker");
5879        assert_eq!(found.unwrap().output.status, HealthStatus::Down);
5880        assert_eq!(
5881            found.unwrap().output.details.get("state").unwrap(),
5882            "HALF_OPEN"
5883        );
5884
5885        let readiness_results = registry.run_readiness().await;
5886        let found_readiness = readiness_results
5887            .iter()
5888            .find(|r| r.name == "circuit_breaker.actuator_test_breaker");
5889        assert!(
5890            found_readiness.is_none(),
5891            "Should NOT find circuit breaker in run_readiness"
5892        );
5893        crate::circuit_breaker::global_registry().clear();
5894    }
5895}
5896
/// Property-based regression test: feeding `LogLevels` thousands of arbitrary
/// logger names must not grow its override map past a fixed bound.
#[cfg(test)]
mod havoc_proptest {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        // One case is enough: 5000 generated names comfortably exceed the
        // 1000-entry bound asserted below.
        #![proptest_config(ProptestConfig::with_cases(1))]
        #[test]
        fn log_levels_memory_exhaustion(names in proptest::collection::vec(".*", 5000)) {
            let levels = LogLevels::new("info");
            for name in names {
                // Rejections are fine here; only the final map size is under test.
                let _ = levels.set_logger_level(&name, "debug");
            }
            // `prop_assert!` reports through proptest's `TestCaseError`
            // instead of panicking out of the test body.
            prop_assert!(
                levels.logger_overrides().len() <= 1000,
                "Memory leak: unbounded loggers inserted"
            );
        }
    }
}
5914
5915// ── Nova: Actuator HTMX Dashboard UI ──────────────────────────
5916
/// Serves the static shell of the actuator HTMX dashboard.
///
/// Each card issues an htmx `GET` to its fragment endpoint (`ui/metrics` or
/// `ui/tasks`, resolved relative to the dashboard URL) on load and then every
/// two seconds, replacing the card's contents with the returned HTML.
///
/// The body is split into `<header>` and `<main>` landmark regions, in line
/// with the scaffold-level accessibility posture described by `A11yPosture`.
#[cfg(all(feature = "maud", feature = "htmx"))]
async fn ui_dashboard() -> impl IntoResponse {
    let html = maud::html! {
        (maud::DOCTYPE)
        html lang="en" {
            head {
                meta charset="utf-8";
                meta name="viewport" content="width=device-width, initial-scale=1";
                title { "Autumn Actuator Dashboard" }
                script src="/static/js/htmx.min.js" {}
                style {
                    (crate::ui::tokens::TOKENS_CSS)
                    "body { font-family: var(--font-family); background: var(--bg); color: var(--text); margin: 0; padding: 2rem; }"
                    "h1 { font-size: 1.5rem; font-weight: 600; margin-bottom: 1.5rem; }"
                    ".grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 1.5rem; }"
                    ".card { background: var(--surface); padding: 1.5rem; border-radius: var(--radius); box-shadow: var(--shadow); }"
                    ".card h2 { font-size: 1.125rem; font-weight: 500; margin-top: 0; margin-bottom: 1rem; border-bottom: 1px solid var(--border); padding-bottom: 0.5rem; }"
                    ".stat { display: flex; justify-content: space-between; margin-bottom: 0.5rem; }"
                    ".stat-label { color: var(--text-muted); }"
                    ".stat-value { font-weight: 500; }"
                    ".task-item { border: 1px solid var(--border); padding: 0.75rem; border-radius: 0.375rem; margin-bottom: 0.75rem; }"
                    ".task-name { font-weight: 600; display: block; margin-bottom: 0.25rem; }"
                    ".task-meta { font-size: 0.875rem; color: var(--text-muted); }"
                    ".badge { display: inline-block; padding: 0.125rem 0.375rem; border-radius: 9999px; font-size: 0.75rem; font-weight: 500; }"
                    ".badge-green { background: #dcfce7; color: #166534; }"
                    ".badge-gray { background: #f3f4f6; color: #374151; }"
                    ".badge-red { background: #fee2e2; color: #991b1b; }"
                }
            }
            body {
                // Landmark regions let assistive technology jump straight to
                // the live cards (cf. `A11yPosture::landmark_regions_present`).
                header {
                    h1 { "🍂 Autumn Actuator Dashboard" }
                }
                main {
                    div class="grid" {
                        div class="card" hx-get="ui/metrics" hx-trigger="load, every 2s" {
                            "Loading metrics..."
                        }
                        div class="card" hx-get="ui/tasks" hx-trigger="load, every 2s" {
                            "Loading tasks..."
                        }
                    }
                }
            }
        }
    };
    (
        [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
        html.into_string(),
    )
}
5964
5965#[cfg(not(all(feature = "maud", feature = "htmx")))]
5966async fn ui_dashboard() -> impl IntoResponse {
5967    (
5968        StatusCode::NOT_IMPLEMENTED,
5969        "Maud feature is required for the UI dashboard",
5970    )
5971}
5972
/// HTMX fragment for the "System Metrics" dashboard card.
///
/// Takes a fresh metrics snapshot and the formatted uptime from the actuator
/// state, then renders one label/value row per figure.
#[cfg(all(feature = "maud", feature = "htmx"))]
async fn ui_metrics<S: ProvideActuatorState>(State(state): State<S>) -> impl IntoResponse {
    /// Renders a single `label` / `value` row. `value` arrives as
    /// already-rendered markup so callers can append a unit suffix.
    fn stat_row(label: &str, value: maud::Markup) -> maud::Markup {
        maud::html! {
            div class="stat" {
                span class="stat-label" { (label) }
                span class="stat-value" { (value) }
            }
        }
    }

    let snapshot = state.metrics().snapshot();
    let http = &snapshot.http;

    let rows = [
        stat_row("Uptime", maud::html! { (state.uptime_display()) }),
        stat_row("Total Requests", maud::html! { (http.requests_total) }),
        stat_row("Active Requests", maud::html! { (http.requests_active) }),
        stat_row("P95 Latency", maud::html! { (http.latency_ms.p95) " ms" }),
        stat_row("P99 Latency", maud::html! { (http.latency_ms.p99) " ms" }),
    ];

    let fragment = maud::html! {
        h2 { "System Metrics" }
        @for row in &rows {
            (row)
        }
    };

    (
        [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
        fragment.into_string(),
    )
}
6006
6007#[cfg(not(all(feature = "maud", feature = "htmx")))]
6008async fn ui_metrics<S: ProvideActuatorState>() -> impl IntoResponse {
6009    (
6010        StatusCode::NOT_IMPLEMENTED,
6011        "Maud feature is required for the UI dashboard",
6012    )
6013}
6014
/// HTMX fragment for the "Background Tasks" dashboard card.
///
/// Lists each task from the registry snapshot with a status badge ("Running"
/// when its status is exactly `"running"`, "Idle" otherwise), its run count,
/// and, only when non-zero, its failure count.
#[cfg(all(feature = "maud", feature = "htmx"))]
async fn ui_tasks<S: ProvideActuatorState>(State(state): State<S>) -> impl IntoResponse {
    let snapshot = state.task_registry().snapshot();

    let listing = if snapshot.is_empty() {
        maud::html! {
            p class="stat-label" { "No tasks registered." }
        }
    } else {
        maud::html! {
            @for (task_name, info) in snapshot.iter() {
                @let is_running = info.status == "running";
                div class="task-item" {
                    span class="task-name" { (task_name) }
                    div class="task-meta" {
                        @if is_running {
                            span class="badge badge-green" { "Running" }
                        } @else {
                            span class="badge badge-gray" { "Idle" }
                        }
                        " Runs: " (info.total_runs)
                        @if info.total_failures > 0 {
                            " " span class="badge badge-red" { "Failures: " (info.total_failures) }
                        }
                    }
                }
            }
        }
    };

    let fragment = maud::html! {
        h2 { "Background Tasks" }
        (listing)
    };

    (
        [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
        fragment.into_string(),
    )
}
6048
6049#[cfg(not(all(feature = "maud", feature = "htmx")))]
6050async fn ui_tasks<S: ProvideActuatorState>() -> impl IntoResponse {
6051    (
6052        StatusCode::NOT_IMPLEMENTED,
6053        "Maud feature is required for the UI dashboard",
6054    )
6055}