1use std::collections::HashMap;
11use std::sync::{Arc, RwLock};
12
13use axum::Json;
14use axum::extract::{Path, State};
15use axum::http::StatusCode;
16use axum::response::IntoResponse;
17use serde::{Deserialize, Serialize};
18
19#[derive(Debug, Clone, Serialize, Default)]
25pub struct A11yPosture {
26 pub lang_set: bool,
28 pub skip_link_present: bool,
30 pub landmark_regions_present: bool,
33}
34
35impl A11yPosture {
36 #[must_use]
38 pub const fn is_compliant(&self) -> bool {
39 self.lang_set && self.skip_link_present && self.landmark_regions_present
40 }
41}
42
/// Type of a [`MetricFamily`]: a counter or a gauge.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetricKind {
    /// A counter metric.
    Counter,
    /// A gauge metric.
    Gauge,
}

impl MetricKind {
    /// Lowercase label for the kind: `"counter"` or `"gauge"`.
    const fn as_str(&self) -> &'static str {
        match *self {
            Self::Gauge => "gauge",
            Self::Counter => "counter",
        }
    }
}
65
/// One sample within a [`MetricFamily`]: a list of labels plus a value.
#[derive(Debug, Clone)]
pub struct MetricSample {
    /// Labels attached to this sample, presumably `(label_name, label_value)`
    /// pairs — confirm against the exposition code.
    pub labels: Vec<(String, String)>,
    /// The sample's value.
    pub value: f64,
}
77
/// A named group of samples that share one help text and one [`MetricKind`].
#[derive(Debug, Clone)]
pub struct MetricFamily {
    /// Metric name.
    pub name: String,
    /// Human-readable help text.
    pub help: String,
    /// Whether this family is a counter or a gauge.
    pub kind: MetricKind,
    /// Samples belonging to this family.
    pub samples: Vec<MetricSample>,
}
96
/// A pluggable producer of metric families.
///
/// Sources are stored as `Arc<dyn MetricsSource>` in a
/// [`MetricsSourceRegistry`], hence the `Send + Sync + 'static` bounds.
pub trait MetricsSource: Send + Sync + 'static {
    /// Produces the current metric families.
    ///
    /// [`MetricsSourceRegistry::collect_all`] catches a panic raised here.
    /// The panic is logged and counted in
    /// [`MetricsSourceRegistry::error_counts`], and the source then
    /// contributes an empty list for that collection.
    fn collect(&self) -> Vec<MetricFamily>;
}
122
/// Registry of named [`MetricsSource`]s.
///
/// Clones share the same state through an `Arc<RwLock<_>>`.
#[derive(Clone, Default)]
pub struct MetricsSourceRegistry {
    inner: Arc<RwLock<MetricsSourceRegistryInner>>,
}

/// Lock-protected state of a [`MetricsSourceRegistry`].
#[derive(Default)]
struct MetricsSourceRegistryInner {
    /// Registered sources in registration order. `register` rejects duplicate
    /// names, so names are unique.
    sources: Vec<(String, Arc<dyn MetricsSource>)>,
    /// Number of panics observed per source name during `collect_all`.
    error_counts: HashMap<String, u64>,
}
140
141impl MetricsSourceRegistry {
142 #[must_use]
144 pub fn new() -> Self {
145 Self::default()
146 }
147
148 pub fn register(
157 &self,
158 name: impl Into<String>,
159 source: Arc<dyn MetricsSource>,
160 ) -> Result<(), String> {
161 let name = name.into();
162 {
163 let mut inner = self
164 .inner
165 .write()
166 .unwrap_or_else(std::sync::PoisonError::into_inner);
167 if inner.sources.iter().any(|(n, _)| n == &name) {
168 return Err(format!(
169 "MetricsSource '{name}' is already registered; skipping duplicate"
170 ));
171 }
172 inner.sources.push((name, source));
173 }
174 Ok(())
175 }
176
177 pub fn collect_all(&self) -> Vec<(String, Vec<MetricFamily>)> {
182 let sources: Vec<(String, Arc<dyn MetricsSource>)> = self
183 .inner
184 .read()
185 .unwrap_or_else(std::sync::PoisonError::into_inner)
186 .sources
187 .clone();
188
189 let mut results = Vec::with_capacity(sources.len());
190 let mut panicked = Vec::new();
191
192 for (name, source) in &sources {
193 let result =
194 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| source.collect()));
195 if let Ok(families) = result {
196 results.push((name.clone(), families));
197 } else {
198 tracing::error!(source_name = %name, "MetricsSource panicked during collection");
199 panicked.push(name.clone());
200 results.push((name.clone(), vec![]));
201 }
202 }
203
204 if !panicked.is_empty() {
205 let mut inner = self
206 .inner
207 .write()
208 .unwrap_or_else(std::sync::PoisonError::into_inner);
209 for name in panicked {
210 *inner.error_counts.entry(name).or_insert(0) += 1;
211 }
212 }
213
214 results
215 }
216
217 #[must_use]
219 pub fn error_counts(&self) -> HashMap<String, u64> {
220 self.inner
221 .read()
222 .unwrap_or_else(std::sync::PoisonError::into_inner)
223 .error_counts
224 .clone()
225 }
226
227 #[must_use]
229 pub fn source_names(&self) -> Vec<String> {
230 self.inner
231 .read()
232 .unwrap_or_else(std::sync::PoisonError::into_inner)
233 .sources
234 .iter()
235 .map(|(n, _)| n.clone())
236 .collect()
237 }
238
239 #[must_use]
241 pub fn is_empty(&self) -> bool {
242 self.inner
243 .read()
244 .unwrap_or_else(std::sync::PoisonError::into_inner)
245 .sources
246 .is_empty()
247 }
248}
249
/// Accessors the actuator needs from the application state.
///
/// Required methods expose registries and metadata. Provided methods return
/// defaults that an implementor may override.
pub trait ProvideActuatorState {
    /// The application's [`crate::middleware::MetricsCollector`].
    fn metrics(&self) -> &crate::middleware::MetricsCollector;

    /// Runtime-adjustable log levels.
    fn log_levels(&self) -> &LogLevels;

    /// Registry of scheduled-task status.
    fn task_registry(&self) -> &TaskRegistry;

    /// Registry of background-job counters.
    fn job_registry(&self) -> &JobRegistry;

    /// Tracked configuration properties.
    fn config_props(&self) -> &ConfigProperties;

    /// Name of the active configuration profile.
    fn profile(&self) -> &str;

    /// Uptime rendered as a display string.
    fn uptime_display(&self) -> String;

    /// Channels handle (only with the `ws` feature).
    #[cfg(feature = "ws")]
    fn channels(&self) -> &crate::channels::Channels;

    /// Cancellation token (only with the `ws` feature).
    /// NOTE(review): presumably cancelled on shutdown — confirm with implementors.
    #[cfg(feature = "ws")]
    fn shutdown_token(&self) -> tokio_util::sync::CancellationToken;

    /// Database pool, if the application has one (only with the `db` feature).
    #[cfg(feature = "db")]
    fn pool(
        &self,
    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>;

    /// Accessibility posture.
    ///
    /// Defaults to `A11yPosture::default()`, where every check is `false`.
    fn a11y_posture(&self) -> A11yPosture {
        A11yPosture::default()
    }

    /// Optional registry of custom metrics sources. Defaults to `None`.
    fn metrics_source_registry(&self) -> Option<&MetricsSourceRegistry> {
        None
    }

    /// Optional registry of custom health indicators. Defaults to `None`.
    fn health_indicator_registry(&self) -> Option<&HealthIndicatorRegistry> {
        None
    }

    /// Detailed-health flag. Defaults to `true`.
    /// NOTE(review): presumably controls whether health responses include
    /// per-component detail — confirm in the health handler.
    fn health_detailed(&self) -> bool {
        true
    }

    /// Deployment version label. Defaults to `crate::canary::STABLE`.
    fn deploy_version(&self) -> String {
        crate::canary::STABLE.to_owned()
    }

    /// Outbound webhook manager, if any (only with the `http-client` feature).
    /// Defaults to `None`.
    #[cfg(feature = "http-client")]
    fn webhook_outbound(&self) -> Option<crate::webhook_outbound::WebhookOutboundManager> {
        None
    }

    /// Log capture buffer, if the application provides one. Defaults to `None`.
    fn log_buffer(&self) -> Option<crate::log::capture::LogBuffer> {
        None
    }
}
361
/// Shared, runtime-adjustable log level settings.
///
/// Holds a global level plus per-logger overrides. Clones share the same state
/// through an `Arc<RwLock<_>>`. At most 1000 distinct override names are
/// tracked.
#[derive(Clone)]
pub struct LogLevels {
    inner: Arc<RwLock<LogLevelsInner>>,
}

/// Lock-protected state of [`LogLevels`].
struct LogLevelsInner {
    /// Current global level.
    current_level: String,
    /// Per-logger overrides keyed by logger name.
    logger_overrides: HashMap<String, String>,
}

impl LogLevels {
    /// Upper bound on the number of distinct names in `logger_overrides`.
    const MAX_LOGGER_OVERRIDES: usize = 1000;

    /// Creates settings with `initial_level` as the global level and no overrides.
    #[must_use]
    pub fn new(initial_level: &str) -> Self {
        let state = LogLevelsInner {
            current_level: initial_level.to_owned(),
            logger_overrides: HashMap::new(),
        };
        Self {
            inner: Arc::new(RwLock::new(state)),
        }
    }

    /// Returns the current global level, or `"info"` if the lock is poisoned.
    #[must_use]
    pub fn current_level(&self) -> String {
        match self.inner.read() {
            Ok(state) => state.current_level.clone(),
            Err(_) => "info".to_string(),
        }
    }

    /// Returns a copy of the per-logger overrides (empty if the lock is poisoned).
    #[must_use]
    pub fn logger_overrides(&self) -> HashMap<String, String> {
        match self.inner.read() {
            Ok(state) => state.logger_overrides.clone(),
            Err(_) => HashMap::new(),
        }
    }

    /// Records `level` for logger `name` and returns the level it replaced.
    ///
    /// For `"root"` or an empty name, the global level is also set and the
    /// previous *global* level is returned. For any other name, the previous
    /// override for that name (if any) is returned. Nothing changes and `None`
    /// is returned in two cases:
    /// - the lock is poisoned;
    /// - `name` is new and 1000 overrides are already tracked.
    #[must_use]
    pub fn set_logger_level(&self, name: &str, level: &str) -> Option<String> {
        let mut state = self.inner.write().ok()?;

        let is_new_logger = !state.logger_overrides.contains_key(name);
        if is_new_logger && state.logger_overrides.len() >= Self::MAX_LOGGER_OVERRIDES {
            return None;
        }

        // `insert` hands back whatever override it displaced.
        let previous_override = state
            .logger_overrides
            .insert(name.to_owned(), level.to_owned());

        if name.is_empty() || name == "root" {
            return Some(std::mem::replace(
                &mut state.current_level,
                level.to_owned(),
            ));
        }
        previous_override
    }
}

impl std::fmt::Debug for LogLevels {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let level = self.current_level();
        f.debug_struct("LogLevels")
            .field("current_level", &level)
            .finish()
    }
}
441
/// Point-in-time status of one scheduled task, as stored in [`TaskRegistry`].
#[derive(Debug, Clone, Serialize)]
pub struct TaskStatus {
    /// Schedule string the task was registered with.
    pub schedule: String,
    /// Coordination mode the task was registered with.
    pub coordination: crate::task::TaskCoordination,
    /// Scheduler backend label (`"in_process"` when registered via `TaskRegistry::register`).
    pub scheduler_backend: String,
    /// Replica identifier (`"unknown"` when registered via `TaskRegistry::register`).
    pub replica_id: String,
    /// Leader id last recorded by `TaskRegistry::record_leader`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub current_leader: Option<String>,
    /// Tick key last recorded by `TaskRegistry::record_leader`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_tick: Option<String>,
    /// RFC 3339 timestamp of the most recent completed run.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_fired_at: Option<String>,
    /// Next run time as recorded by `TaskRegistry::record_next_run_at`;
    /// cleared when a run starts.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub next_run_at: Option<String>,
    /// Set to `"idle"` or `"running"` by `TaskRegistry`.
    pub status: String,
    /// RFC 3339 timestamp of the most recent completed run.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_run: Option<String>,
    /// Duration of the most recent run in milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_duration_ms: Option<u64>,
    /// `"ok"` or `"failed"` for the most recent run.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_result: Option<String>,
    /// Error message of the most recent run if it failed; cleared on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    /// Number of completed runs, successful or failed.
    pub total_runs: u64,
    /// Number of failed runs.
    pub total_failures: u64,
}
484
/// Registry of scheduled-task status keyed by task name.
///
/// Clones share the same map through an `Arc<RwLock<_>>`.
#[derive(Clone)]
pub struct TaskRegistry {
    inner: Arc<RwLock<HashMap<String, TaskStatus>>>,
}
490
/// Aggregated counters for one background-job name, as stored in [`JobRegistry`].
#[derive(Debug, Clone, Serialize)]
pub struct JobStatus {
    /// Jobs enqueued and not yet started.
    /// Decremented on start, cancel, or deduplication.
    pub queued: u64,
    /// Jobs currently executing.
    /// Incremented on start; decremented on success, retry, or failure.
    pub in_flight: u64,
    /// Jobs currently waiting on a concurrency limit.
    pub blocked_on_concurrency: u64,
    /// Total successful completions.
    pub total_successes: u64,
    /// Total failures recorded via `record_failure` (retries are not counted).
    pub total_failures: u64,
    /// Failures that were dead-lettered.
    pub dead_letters: u64,
    /// Enqueues recorded as deduplicated.
    pub total_deduplicated: u64,
    /// Most recent error message; cleared on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
}

impl JobStatus {
    /// All-zero counters with no recorded error.
    const fn empty() -> Self {
        Self {
            queued: 0,
            in_flight: 0,
            blocked_on_concurrency: 0,
            total_successes: 0,
            total_failures: 0,
            dead_letters: 0,
            total_deduplicated: 0,
            last_error: None,
        }
    }
}
527
528#[derive(Clone)]
530pub struct JobRegistry {
531 inner: Arc<RwLock<HashMap<String, JobStatus>>>,
532}
533
534impl JobRegistry {
535 #[must_use]
537 pub fn new() -> Self {
538 Self {
539 inner: Arc::new(RwLock::new(HashMap::new())),
540 }
541 }
542
543 pub fn register(&self, name: &str) {
545 if let Ok(mut guard) = self.inner.write() {
546 guard.entry(name.to_string()).or_insert(JobStatus::empty());
547 }
548 }
549
550 pub fn record_enqueue(&self, name: &str) {
552 if let Ok(mut guard) = self.inner.write() {
553 let status = guard.entry(name.to_string()).or_insert(JobStatus::empty());
554 status.queued = status.queued.saturating_add(1);
555 }
556 }
557
558 pub fn record_deduplicated(&self, name: &str) {
563 if let Ok(mut guard) = self.inner.write()
564 && let Some(status) = guard.get_mut(name)
565 {
566 status.queued = status.queued.saturating_sub(1);
567 status.total_deduplicated = status.total_deduplicated.saturating_add(1);
568 }
569 }
570
571 pub fn record_concurrency_blocked(&self, name: &str) {
573 if let Ok(mut guard) = self.inner.write()
574 && let Some(status) = guard.get_mut(name)
575 {
576 status.blocked_on_concurrency = status.blocked_on_concurrency.saturating_add(1);
577 }
578 }
579
580 pub fn record_concurrency_unblocked(&self, name: &str) {
582 if let Ok(mut guard) = self.inner.write()
583 && let Some(status) = guard.get_mut(name)
584 {
585 status.blocked_on_concurrency = status.blocked_on_concurrency.saturating_sub(1);
586 }
587 }
588
589 pub fn set_concurrency_blocked_counts(&self, counts: &HashMap<String, u64>) {
595 if let Ok(mut guard) = self.inner.write() {
596 for (name, status) in guard.iter_mut() {
597 status.blocked_on_concurrency = counts.get(name).copied().unwrap_or(0);
598 }
599 }
600 }
601
602 pub fn record_start(&self, name: &str) {
604 if let Ok(mut guard) = self.inner.write()
605 && let Some(status) = guard.get_mut(name)
606 {
607 status.queued = status.queued.saturating_sub(1);
608 status.in_flight = status.in_flight.saturating_add(1);
609 }
610 }
611
612 pub fn record_cancel(&self, name: &str) {
614 if let Ok(mut guard) = self.inner.write()
615 && let Some(status) = guard.get_mut(name)
616 {
617 status.queued = status.queued.saturating_sub(1);
618 }
619 }
620
621 pub fn record_success(&self, name: &str) {
623 if let Ok(mut guard) = self.inner.write()
624 && let Some(status) = guard.get_mut(name)
625 {
626 status.in_flight = status.in_flight.saturating_sub(1);
627 status.total_successes = status.total_successes.saturating_add(1);
628 status.last_error = None;
629 }
630 }
631
632 pub fn record_retry(&self, name: &str, error: &str, _attempt: u32) {
634 if let Ok(mut guard) = self.inner.write()
635 && let Some(status) = guard.get_mut(name)
636 {
637 status.in_flight = status.in_flight.saturating_sub(1);
638 status.last_error = Some(error.to_string());
639 }
640 }
641
642 pub fn record_failure(&self, name: &str, error: String, dead_lettered: bool) {
644 if let Ok(mut guard) = self.inner.write()
645 && let Some(status) = guard.get_mut(name)
646 {
647 status.in_flight = status.in_flight.saturating_sub(1);
648 status.total_failures = status.total_failures.saturating_add(1);
649 status.last_error = Some(error);
650 if dead_lettered {
651 status.dead_letters = status.dead_letters.saturating_add(1);
652 }
653 }
654 }
655
656 #[must_use]
658 pub fn snapshot(&self) -> HashMap<String, JobStatus> {
659 self.inner.read().map(|g| g.clone()).unwrap_or_default()
660 }
661}
662
663impl Default for JobRegistry {
664 fn default() -> Self {
665 Self::new()
666 }
667}
668
669impl TaskRegistry {
670 #[must_use]
672 pub fn new() -> Self {
673 Self {
674 inner: Arc::new(RwLock::new(HashMap::new())),
675 }
676 }
677
678 pub fn register(&self, name: &str, schedule: &str) {
680 self.register_scheduled(
681 name,
682 schedule,
683 crate::task::TaskCoordination::Fleet,
684 "in_process",
685 "unknown",
686 );
687 }
688
689 pub fn register_scheduled(
691 &self,
692 name: &str,
693 schedule: &str,
694 coordination: crate::task::TaskCoordination,
695 scheduler_backend: &str,
696 replica_id: &str,
697 ) {
698 let Ok(mut guard) = self.inner.write() else {
699 return;
700 };
701 guard.insert(
702 name.to_string(),
703 TaskStatus {
704 schedule: schedule.to_string(),
705 coordination,
706 scheduler_backend: scheduler_backend.to_string(),
707 replica_id: replica_id.to_string(),
708 current_leader: None,
709 last_tick: None,
710 last_fired_at: None,
711 next_run_at: None,
712 status: "idle".to_string(),
713 last_run: None,
714 last_duration_ms: None,
715 last_result: None,
716 last_error: None,
717 total_runs: 0,
718 total_failures: 0,
719 },
720 );
721 }
722
723 pub fn record_leader(&self, name: &str, leader_id: &str, tick_key: &str) {
725 let Ok(mut guard) = self.inner.write() else {
726 return;
727 };
728 let Some(task) = guard.get_mut(name) else {
729 return;
730 };
731 task.current_leader = Some(leader_id.to_string());
732 task.last_tick = Some(tick_key.to_string());
733 }
734
735 pub fn record_start(&self, name: &str) {
737 let Ok(mut guard) = self.inner.write() else {
738 return;
739 };
740 let Some(task) = guard.get_mut(name) else {
741 return;
742 };
743 task.status = "running".to_string();
744 task.next_run_at = None;
745 }
746
747 pub fn record_next_run_at(&self, name: &str, next_run_at: &str) {
749 let Ok(mut guard) = self.inner.write() else {
750 return;
751 };
752 let Some(task) = guard.get_mut(name) else {
753 return;
754 };
755 task.next_run_at = Some(next_run_at.to_string());
756 }
757
758 pub fn record_success(&self, name: &str, duration_ms: u64) {
760 let Ok(mut guard) = self.inner.write() else {
761 return;
762 };
763 let Some(task) = guard.get_mut(name) else {
764 return;
765 };
766 task.status = "idle".to_string();
767 let now = chrono::Utc::now().to_rfc3339();
768 task.last_run = Some(now.clone());
769 task.last_fired_at = Some(now);
770 task.last_duration_ms = Some(duration_ms);
771 task.last_result = Some("ok".to_string());
772 task.last_error = None;
773 task.total_runs += 1;
774 }
775
776 pub fn record_failure(&self, name: &str, duration_ms: u64, error: &str) {
778 let Ok(mut guard) = self.inner.write() else {
779 return;
780 };
781 let Some(task) = guard.get_mut(name) else {
782 return;
783 };
784 task.status = "idle".to_string();
785 let now = chrono::Utc::now().to_rfc3339();
786 task.last_run = Some(now.clone());
787 task.last_fired_at = Some(now);
788 task.last_duration_ms = Some(duration_ms);
789 task.last_result = Some("failed".to_string());
790 task.last_error = Some(error.to_string());
791 task.total_runs += 1;
792 task.total_failures += 1;
793 }
794
795 #[must_use]
797 pub fn snapshot(&self) -> HashMap<String, TaskStatus> {
798 self.inner
799 .read()
800 .map(|guard| guard.clone())
801 .unwrap_or_default()
802 }
803}
804
805impl Default for TaskRegistry {
806 fn default() -> Self {
807 Self::new()
808 }
809}
810
811impl std::fmt::Debug for TaskRegistry {
812 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
813 f.debug_struct("TaskRegistry")
814 .field("count", &self.snapshot().len())
815 .finish()
816 }
817}
818
/// One tracked configuration property.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigProperty {
    /// As built by `ConfigProperties::from_config`: the value rendered as a
    /// JSON string, or `"****"` when the key is redacted.
    pub value: serde_json::Value,
    /// As built by `ConfigProperties::from_config`: the overriding env var
    /// name, `profile_default:<profile>`, `autumn.toml`, or `default`.
    pub source: String,
}

/// Tracked configuration properties keyed by dotted name (e.g. `server.port`).
///
/// Clones share the same map through an `Arc<RwLock<_>>`.
#[derive(Debug, Clone, Default)]
pub struct ConfigProperties {
    inner: Arc<RwLock<HashMap<String, ConfigProperty>>>,
}
833
834impl ConfigProperties {
835 #[must_use]
837 #[allow(clippy::too_many_lines)]
838 pub fn from_config(config: &crate::config::AutumnConfig) -> Self {
839 let profile = config.profile.as_deref().unwrap_or("default");
840 let defaults = crate::config::AutumnConfig::default();
841
842 let mut props = HashMap::with_capacity(32);
844 let profile_str = profile.to_string();
845
846 Self::track_server_props(&mut props, config, &defaults, &profile_str);
847 Self::track_db_props(&mut props, config, &defaults, &profile_str);
848 Self::track_log_props(&mut props, config, &defaults, &profile_str);
849 Self::track_telemetry_props(&mut props, config, &defaults, &profile_str);
850 Self::track_health_props(&mut props, config, &defaults, &profile_str);
851 Self::track_actuator_props(&mut props, config, &defaults, &profile_str);
852 Self::track_session_props(&mut props, config, &defaults, &profile_str);
853 Self::track_channels_props(&mut props, config, &defaults, &profile_str);
854
855 Self {
856 inner: Arc::new(RwLock::new(props)),
857 }
858 }
859
860 fn track_server_props(
861 props: &mut HashMap<String, ConfigProperty>,
862 config: &crate::config::AutumnConfig,
863 defaults: &crate::config::AutumnConfig,
864 profile_str: &str,
865 ) {
866 Self::track_property(
867 props,
868 "server.host",
869 &config.server.host,
870 &defaults.server.host,
871 profile_str,
872 );
873 Self::track_property(
874 props,
875 "server.port",
876 &config.server.port.to_string(),
877 &defaults.server.port.to_string(),
878 profile_str,
879 );
880 Self::track_property(
881 props,
882 "server.shutdown_timeout_secs",
883 &config.server.shutdown_timeout_secs.to_string(),
884 &defaults.server.shutdown_timeout_secs.to_string(),
885 profile_str,
886 );
887 }
888
889 fn track_db_props(
890 props: &mut HashMap<String, ConfigProperty>,
891 config: &crate::config::AutumnConfig,
892 defaults: &crate::config::AutumnConfig,
893 profile_str: &str,
894 ) {
895 let db_url = config.database.url.as_deref().unwrap_or("").to_string();
896 let primary_url = config
897 .database
898 .primary_url
899 .as_deref()
900 .unwrap_or("")
901 .to_string();
902 let replica_url = config
903 .database
904 .replica_url
905 .as_deref()
906 .unwrap_or("")
907 .to_string();
908 Self::track_property(props, "database.url", &db_url, "", profile_str);
909 Self::track_property(props, "database.primary_url", &primary_url, "", profile_str);
910 Self::track_property(props, "database.replica_url", &replica_url, "", profile_str);
911 Self::track_property(
912 props,
913 "database.pool_size",
914 &config.database.pool_size.to_string(),
915 &defaults.database.pool_size.to_string(),
916 profile_str,
917 );
918 Self::track_property(
919 props,
920 "database.primary_pool_size",
921 &config.database.effective_primary_pool_size().to_string(),
922 &defaults.database.effective_primary_pool_size().to_string(),
923 profile_str,
924 );
925 Self::track_property(
926 props,
927 "database.replica_pool_size",
928 &config.database.effective_replica_pool_size().to_string(),
929 &defaults.database.effective_replica_pool_size().to_string(),
930 profile_str,
931 );
932 Self::track_property(
933 props,
934 "database.replica_fallback",
935 &format!("{:?}", config.database.replica_fallback),
936 &format!("{:?}", defaults.database.replica_fallback),
937 profile_str,
938 );
939 }
940
941 fn track_log_props(
942 props: &mut HashMap<String, ConfigProperty>,
943 config: &crate::config::AutumnConfig,
944 defaults: &crate::config::AutumnConfig,
945 profile_str: &str,
946 ) {
947 Self::track_property(
948 props,
949 "log.level",
950 &config.log.level,
951 &defaults.log.level,
952 profile_str,
953 );
954 Self::track_property(
955 props,
956 "log.format",
957 &format!("{:?}", config.log.format),
958 &format!("{:?}", defaults.log.format),
959 profile_str,
960 );
961 Self::track_property(
962 props,
963 "log.capture.enabled",
964 &config.log.capture.enabled.to_string(),
965 &defaults.log.capture.enabled.to_string(),
966 profile_str,
967 );
968 Self::track_property(
969 props,
970 "log.capture.capacity",
971 &config.log.capture.capacity.to_string(),
972 &defaults.log.capture.capacity.to_string(),
973 profile_str,
974 );
975 }
976
977 fn track_telemetry_props(
978 props: &mut HashMap<String, ConfigProperty>,
979 config: &crate::config::AutumnConfig,
980 defaults: &crate::config::AutumnConfig,
981 profile_str: &str,
982 ) {
983 Self::track_property(
984 props,
985 "telemetry.enabled",
986 &config.telemetry.enabled.to_string(),
987 &defaults.telemetry.enabled.to_string(),
988 profile_str,
989 );
990 Self::track_property(
991 props,
992 "telemetry.service_name",
993 &config.telemetry.service_name,
994 &defaults.telemetry.service_name,
995 profile_str,
996 );
997 Self::track_property(
998 props,
999 "telemetry.service_namespace",
1000 config.telemetry.service_namespace.as_deref().unwrap_or(""),
1001 defaults
1002 .telemetry
1003 .service_namespace
1004 .as_deref()
1005 .unwrap_or(""),
1006 profile_str,
1007 );
1008 Self::track_property(
1009 props,
1010 "telemetry.service_version",
1011 &config.telemetry.service_version,
1012 &defaults.telemetry.service_version,
1013 profile_str,
1014 );
1015 Self::track_property(
1016 props,
1017 "telemetry.environment",
1018 &config.telemetry.environment,
1019 &defaults.telemetry.environment,
1020 profile_str,
1021 );
1022 Self::track_property(
1023 props,
1024 "telemetry.otlp_endpoint",
1025 config.telemetry.otlp_endpoint.as_deref().unwrap_or(""),
1026 defaults.telemetry.otlp_endpoint.as_deref().unwrap_or(""),
1027 profile_str,
1028 );
1029 Self::track_property(
1030 props,
1031 "telemetry.protocol",
1032 &format!("{:?}", config.telemetry.protocol),
1033 &format!("{:?}", defaults.telemetry.protocol),
1034 profile_str,
1035 );
1036 Self::track_property(
1037 props,
1038 "telemetry.strict",
1039 &config.telemetry.strict.to_string(),
1040 &defaults.telemetry.strict.to_string(),
1041 profile_str,
1042 );
1043 }
1044
1045 fn track_health_props(
1046 props: &mut HashMap<String, ConfigProperty>,
1047 config: &crate::config::AutumnConfig,
1048 defaults: &crate::config::AutumnConfig,
1049 profile_str: &str,
1050 ) {
1051 Self::track_property(
1052 props,
1053 "health.path",
1054 &config.health.path,
1055 &defaults.health.path,
1056 profile_str,
1057 );
1058 Self::track_property(
1059 props,
1060 "health.live_path",
1061 &config.health.live_path,
1062 &defaults.health.live_path,
1063 profile_str,
1064 );
1065 Self::track_property(
1066 props,
1067 "health.ready_path",
1068 &config.health.ready_path,
1069 &defaults.health.ready_path,
1070 profile_str,
1071 );
1072 Self::track_property(
1073 props,
1074 "health.startup_path",
1075 &config.health.startup_path,
1076 &defaults.health.startup_path,
1077 profile_str,
1078 );
1079 Self::track_property(
1080 props,
1081 "health.detailed",
1082 &config.health.detailed.to_string(),
1083 &defaults.health.detailed.to_string(),
1084 profile_str,
1085 );
1086 }
1087
1088 fn track_actuator_props(
1089 props: &mut HashMap<String, ConfigProperty>,
1090 config: &crate::config::AutumnConfig,
1091 defaults: &crate::config::AutumnConfig,
1092 profile_str: &str,
1093 ) {
1094 Self::track_property(
1095 props,
1096 "actuator.prefix",
1097 &config.actuator.prefix,
1098 &defaults.actuator.prefix,
1099 profile_str,
1100 );
1101 Self::track_property(
1102 props,
1103 "actuator.sensitive",
1104 &config.actuator.sensitive.to_string(),
1105 &defaults.actuator.sensitive.to_string(),
1106 profile_str,
1107 );
1108 Self::track_property(
1109 props,
1110 "actuator.prometheus",
1111 &config.actuator.prometheus.to_string(),
1112 &defaults.actuator.prometheus.to_string(),
1113 profile_str,
1114 );
1115 }
1116
1117 fn track_session_props(
1118 props: &mut HashMap<String, ConfigProperty>,
1119 config: &crate::config::AutumnConfig,
1120 defaults: &crate::config::AutumnConfig,
1121 profile_str: &str,
1122 ) {
1123 Self::track_property(
1124 props,
1125 "session.backend",
1126 &format!("{:?}", config.session.backend),
1127 &format!("{:?}", defaults.session.backend),
1128 profile_str,
1129 );
1130 Self::track_property(
1131 props,
1132 "session.cookie_name",
1133 &config.session.cookie_name,
1134 &defaults.session.cookie_name,
1135 profile_str,
1136 );
1137 Self::track_property(
1138 props,
1139 "session.max_age_secs",
1140 &config.session.max_age_secs.to_string(),
1141 &defaults.session.max_age_secs.to_string(),
1142 profile_str,
1143 );
1144 Self::track_property(
1145 props,
1146 "session.secure",
1147 &config.session.secure.to_string(),
1148 &defaults.session.secure.to_string(),
1149 profile_str,
1150 );
1151 Self::track_property(
1152 props,
1153 "session.same_site",
1154 &config.session.same_site,
1155 &defaults.session.same_site,
1156 profile_str,
1157 );
1158 Self::track_property(
1159 props,
1160 "session.http_only",
1161 &config.session.http_only.to_string(),
1162 &defaults.session.http_only.to_string(),
1163 profile_str,
1164 );
1165 Self::track_property(
1166 props,
1167 "session.path",
1168 &config.session.path,
1169 &defaults.session.path,
1170 profile_str,
1171 );
1172 Self::track_property(
1173 props,
1174 "session.allow_memory_in_production",
1175 &config.session.allow_memory_in_production.to_string(),
1176 &defaults.session.allow_memory_in_production.to_string(),
1177 profile_str,
1178 );
1179 Self::track_property(
1180 props,
1181 "session.redis.url",
1182 config.session.redis.url.as_deref().unwrap_or(""),
1183 defaults.session.redis.url.as_deref().unwrap_or(""),
1184 profile_str,
1185 );
1186 Self::track_property(
1187 props,
1188 "session.redis.key_prefix",
1189 &config.session.redis.key_prefix,
1190 &defaults.session.redis.key_prefix,
1191 profile_str,
1192 );
1193 }
1194
1195 fn track_channels_props(
1196 props: &mut HashMap<String, ConfigProperty>,
1197 config: &crate::config::AutumnConfig,
1198 defaults: &crate::config::AutumnConfig,
1199 profile_str: &str,
1200 ) {
1201 Self::track_property(
1202 props,
1203 "channels.backend",
1204 &format!("{:?}", config.channels.backend),
1205 &format!("{:?}", defaults.channels.backend),
1206 profile_str,
1207 );
1208 Self::track_property(
1209 props,
1210 "channels.capacity",
1211 &config.channels.capacity.to_string(),
1212 &defaults.channels.capacity.to_string(),
1213 profile_str,
1214 );
1215 Self::track_property(
1216 props,
1217 "channels.redis.url",
1218 config.channels.redis.url.as_deref().unwrap_or(""),
1219 defaults.channels.redis.url.as_deref().unwrap_or(""),
1220 profile_str,
1221 );
1222 Self::track_property(
1223 props,
1224 "channels.redis.key_prefix",
1225 &config.channels.redis.key_prefix,
1226 &defaults.channels.redis.key_prefix,
1227 profile_str,
1228 );
1229 }
1230
1231 fn track_property(
1232 props: &mut HashMap<String, ConfigProperty>,
1233 key: &str,
1234 value: &str,
1235 default_value: &str,
1236 profile: &str,
1237 ) {
1238 let env_key = format!("AUTUMN_{}", key.replace('.', "__").to_uppercase());
1240 let source = if std::env::var(&env_key).is_ok() {
1241 env_key
1242 } else if value != default_value && (profile == "dev" || profile == "prod") {
1243 format!("profile_default:{profile}")
1244 } else if value != default_value {
1245 "autumn.toml".to_string()
1246 } else {
1247 "default".to_string()
1248 };
1249
1250 let display_value = if should_redact(key) {
1251 serde_json::Value::String("****".into())
1252 } else {
1253 serde_json::Value::String(value.to_string())
1254 };
1255
1256 props.insert(
1257 key.to_string(),
1258 ConfigProperty {
1259 value: display_value,
1260 source,
1261 },
1262 );
1263 }
1264
1265 #[must_use]
1267 pub fn snapshot(&self) -> HashMap<String, ConfigProperty> {
1268 self.inner
1269 .read()
1270 .map(|guard| guard.clone())
1271 .unwrap_or_default()
1272 }
1273}
1274
1275#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
1282pub enum HealthStatus {
1283 #[serde(rename = "UP")]
1285 Up,
1286 #[serde(rename = "DOWN")]
1288 Down,
1289 #[serde(rename = "OUT_OF_SERVICE")]
1291 OutOfService,
1292 #[serde(rename = "UNKNOWN")]
1294 Unknown,
1295}
1296
1297impl HealthStatus {
1298 #[must_use]
1300 pub const fn as_str(self) -> &'static str {
1301 match self {
1302 Self::Up => "UP",
1303 Self::Down => "DOWN",
1304 Self::OutOfService => "OUT_OF_SERVICE",
1305 Self::Unknown => "UNKNOWN",
1306 }
1307 }
1308
1309 #[must_use]
1312 pub const fn is_healthy(self) -> bool {
1313 matches!(self, Self::Up | Self::Unknown)
1314 }
1315}
1316
/// Which health runs an indicator takes part in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndicatorGroup {
    /// Included in both `HealthIndicatorRegistry::run_readiness` and `run_all`.
    Readiness,
    /// Included only in `HealthIndicatorRegistry::run_all`.
    HealthOnly,
}
1328
1329#[derive(Debug, Clone)]
1331pub struct HealthCheckOutput {
1332 pub status: HealthStatus,
1334 pub details: HashMap<String, serde_json::Value>,
1336}
1337
1338impl HealthCheckOutput {
1339 #[must_use]
1341 pub fn up() -> Self {
1342 Self {
1343 status: HealthStatus::Up,
1344 details: HashMap::new(),
1345 }
1346 }
1347
1348 #[must_use]
1350 pub fn down() -> Self {
1351 Self {
1352 status: HealthStatus::Down,
1353 details: HashMap::new(),
1354 }
1355 }
1356
1357 #[must_use]
1359 pub fn with_details(mut self, details: HashMap<String, serde_json::Value>) -> Self {
1360 self.details = details;
1361 self
1362 }
1363}
1364
/// A pluggable asynchronous health check.
pub trait HealthIndicator: Send + Sync + 'static {
    /// Runs the check.
    ///
    /// When the registry runs it, a check that does not finish within
    /// `timeout_ms` milliseconds is reported as `Unknown` with a
    /// `"timed_out": true` detail.
    fn check(&self) -> futures::future::BoxFuture<'_, HealthCheckOutput>;

    /// Timeout for `check`, in milliseconds. Defaults to 2000.
    fn timeout_ms(&self) -> u64 {
        2000
    }

    /// Preferred group. Defaults to `IndicatorGroup::Readiness`.
    /// NOTE(review): `HealthIndicatorRegistry::register` takes the group as an
    /// explicit argument; whether callers pass this value is not visible here.
    fn group(&self) -> IndicatorGroup {
        IndicatorGroup::Readiness
    }
}
1405
/// Outcome of running one registered indicator or reporting one circuit breaker.
#[derive(Debug, Clone)]
pub struct HealthRunResult {
    /// Registered indicator name, or `circuit_breaker.<name>` for breakers.
    pub name: String,
    /// Group the result belongs to.
    pub group: IndicatorGroup,
    /// The check output.
    pub output: HealthCheckOutput,
}
1417
/// Registered indicators as `(name, group, indicator)` in registration order.
type IndicatorList = Vec<(String, IndicatorGroup, Arc<dyn HealthIndicator>)>;

/// Registry of named [`HealthIndicator`]s.
///
/// Clones share the same list through an `Arc<RwLock<_>>`.
#[derive(Clone, Default)]
pub struct HealthIndicatorRegistry {
    inner: Arc<RwLock<IndicatorList>>,
}
1429
1430impl HealthIndicatorRegistry {
1431 #[must_use]
1433 pub fn new() -> Self {
1434 Self::default()
1435 }
1436
1437 pub fn register(
1445 &self,
1446 name: impl Into<String>,
1447 group: IndicatorGroup,
1448 indicator: Arc<dyn HealthIndicator>,
1449 ) -> Result<(), String> {
1450 let name = name.into();
1451 let mut inner = self
1452 .inner
1453 .write()
1454 .unwrap_or_else(std::sync::PoisonError::into_inner);
1455 if inner.iter().any(|(n, _, _)| n == &name) {
1456 return Err(format!(
1457 "HealthIndicator '{name}' is already registered; skipping duplicate"
1458 ));
1459 }
1460 inner.push((name, group, indicator));
1461 drop(inner);
1462 Ok(())
1463 }
1464
1465 #[must_use]
1467 pub fn is_empty(&self) -> bool {
1468 self.inner
1469 .read()
1470 .unwrap_or_else(std::sync::PoisonError::into_inner)
1471 .is_empty()
1472 }
1473
1474 pub async fn run_all(&self) -> Vec<HealthRunResult> {
1479 let entries = self
1480 .inner
1481 .read()
1482 .unwrap_or_else(std::sync::PoisonError::into_inner)
1483 .clone();
1484
1485 let mut results = futures::future::join_all(entries.into_iter().map(
1486 |(name, group, indicator)| async move {
1487 let output = run_with_timeout(indicator.as_ref()).await;
1488 HealthRunResult {
1489 name,
1490 group,
1491 output,
1492 }
1493 },
1494 ))
1495 .await;
1496
1497 for breaker in crate::circuit_breaker::global_registry().all_breakers() {
1498 let state = breaker.state();
1499 let status = match state {
1500 crate::circuit_breaker::CircuitState::Open
1501 | crate::circuit_breaker::CircuitState::HalfOpen => HealthStatus::Down,
1502 crate::circuit_breaker::CircuitState::Closed => HealthStatus::Up,
1503 };
1504
1505 let mut details = HashMap::new();
1506 details.insert(
1507 "state".to_string(),
1508 serde_json::Value::String(state.as_str().to_string()),
1509 );
1510 if let Some(ratio_num) = serde_json::Number::from_f64(breaker.failure_ratio()) {
1511 details.insert(
1512 "failure_ratio".to_string(),
1513 serde_json::Value::Number(ratio_num),
1514 );
1515 }
1516
1517 results.push(HealthRunResult {
1518 name: format!("circuit_breaker.{}", breaker.name()),
1519 group: IndicatorGroup::HealthOnly,
1520 output: HealthCheckOutput { status, details },
1521 });
1522 }
1523
1524 results
1525 }
1526
1527 pub async fn run_readiness(&self) -> Vec<HealthRunResult> {
1532 let entries = self
1534 .inner
1535 .read()
1536 .unwrap_or_else(std::sync::PoisonError::into_inner)
1537 .clone();
1538
1539 futures::future::join_all(
1540 entries
1541 .into_iter()
1542 .filter(|(_, g, _)| *g == IndicatorGroup::Readiness)
1543 .map(|(name, group, indicator)| async move {
1544 let output = run_with_timeout(indicator.as_ref()).await;
1545 HealthRunResult {
1546 name,
1547 group,
1548 output,
1549 }
1550 }),
1551 )
1552 .await
1553 }
1554
1555 #[must_use]
1560 pub fn aggregate_status(statuses: &[HealthStatus]) -> HealthStatus {
1561 let mut overall = HealthStatus::Up;
1562 for &s in statuses {
1563 overall = match (overall, s) {
1564 (_, HealthStatus::Down) | (HealthStatus::Down, _) => HealthStatus::Down,
1565 (_, HealthStatus::OutOfService) | (HealthStatus::OutOfService, _) => {
1566 HealthStatus::OutOfService
1567 }
1568 (_, HealthStatus::Unknown) | (HealthStatus::Unknown, _) => HealthStatus::Unknown,
1569 _ => HealthStatus::Up,
1570 };
1571 }
1572 overall
1573 }
1574}
1575
1576async fn run_with_timeout(indicator: &dyn HealthIndicator) -> HealthCheckOutput {
1579 let duration = tokio::time::Duration::from_millis(indicator.timeout_ms());
1580 match tokio::time::timeout(duration, indicator.check()).await {
1581 Ok(output) => output,
1582 Err(_elapsed) => {
1583 let mut details = HashMap::new();
1584 details.insert("timed_out".to_string(), serde_json::Value::Bool(true));
1585 HealthCheckOutput {
1586 status: HealthStatus::Unknown,
1587 details,
1588 }
1589 }
1590 }
1591}
1592
/// JSON body served by the `health` handler.
#[derive(Serialize)]
struct ActuatorHealth {
    /// Overall status string produced by `HealthIndicatorRegistry::aggregate_status`.
    status: &'static str,
    /// Framework version captured at compile time via `env!("CARGO_PKG_VERSION")`.
    version: &'static str,
    /// Active configuration profile.
    profile: String,
    /// Uptime as rendered by `uptime_display()`.
    uptime: String,
    /// Value of `crate::db::AFTER_COMMIT_FAILURES_TOTAL` at request time (`db` feature only).
    #[cfg(feature = "db")]
    autumn_after_commit_failures_total: u64,
    /// Per-component statuses keyed by component name; omitted from JSON when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    components: HashMap<String, ComponentHealth>,
    /// Database pool check block; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    checks: Option<HealthChecks>,
}
1612
/// Status (and optional details) for one entry in `ActuatorHealth::components`.
#[derive(Serialize, Clone)]
struct ComponentHealth {
    /// Status string from `HealthStatus::as_str`.
    status: &'static str,
    /// Extra JSON details; only populated in detailed mode, omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    details: Option<serde_json::Value>,
}
1619
/// Container for the `checks` section of the health response.
#[derive(Serialize)]
struct HealthChecks {
    /// Database pool check; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    database: Option<DatabaseCheck>,
}
1625
/// Connection-pool figures reported by the `health` handler.
#[derive(Serialize)]
struct DatabaseCheck {
    /// `"ok"` when the pool is considered healthy, `"down"` otherwise.
    status: &'static str,
    /// The pool's configured maximum size.
    pool_size: u64,
    /// Computed as `pool_size - available` (saturating).
    active_connections: u64,
    /// The pool's currently available connections.
    idle_connections: u64,
}
1633
1634fn build_health_components(
1635 db_status: Option<HealthStatus>,
1636 db_check: Option<&DatabaseCheck>,
1637 indicator_results: &[HealthRunResult],
1638 detailed: bool,
1639) -> HashMap<String, ComponentHealth> {
1640 let mut components: HashMap<String, ComponentHealth> = HashMap::new();
1641 for result in indicator_results {
1644 if !detailed
1645 && result.name.starts_with("circuit_breaker.")
1646 && result.output.status.is_healthy()
1647 {
1648 continue;
1649 }
1650 let details = (detailed && !result.output.details.is_empty())
1651 .then(|| serde_json::to_value(&result.output.details).unwrap_or_default());
1652 components.insert(
1653 result.name.clone(),
1654 ComponentHealth {
1655 status: result.output.status.as_str(),
1656 details,
1657 },
1658 );
1659 }
1660 if let Some(s) = db_status {
1661 let details = detailed
1662 .then(|| {
1663 db_check.map(|d| {
1664 serde_json::json!({
1665 "status": d.status,
1666 "pool_size": d.pool_size,
1667 "active_connections": d.active_connections,
1668 "idle_connections": d.idle_connections,
1669 })
1670 })
1671 })
1672 .flatten();
1673 components.insert(
1674 "db".to_string(),
1675 ComponentHealth {
1676 status: s.as_str(),
1677 details,
1678 },
1679 );
1680 }
1681 components
1682}
1683
/// Health endpoint: aggregates the DB pool, registered indicators and circuit
/// breakers into one status.
///
/// Responds `200 OK` when the aggregated status `is_healthy()`, otherwise
/// `503 Service Unavailable`. The body is an `ActuatorHealth`. Component
/// details are only included when `health_detailed()` is true.
pub async fn health<S: ProvideActuatorState + Send + Sync + 'static>(
    State(state): State<S>,
) -> impl IntoResponse {
    let detailed = state.health_detailed();

    // (status, pool figures) for the DB component; both `None` when the `db`
    // feature is off or no pool is configured.
    let (db_component_status, db_check) = {
        #[cfg(feature = "db")]
        {
            // Kept as if/else for symmetry with the non-db branch below.
            #[allow(clippy::option_if_let_else)]
            if let Some(pool) = state.pool() {
                let status = pool.status();
                // NOTE(review): `as u64` would wrap if any of these fields were ever a
                // negative signed count — confirm the pool `Status` field types.
                let available = status.available as u64;
                let size = status.max_size as u64;
                let waiting = status.waiting as u64;
                let idle = available;
                // Counts against the *maximum* size, so never-created slots are
                // reported as active.
                let active = size.saturating_sub(available);

                // Unhealthy only when nothing is available AND callers are queued.
                let healthy = available > 0 || waiting == 0;
                let db_status = if healthy {
                    HealthStatus::Up
                } else {
                    HealthStatus::Down
                };
                let db_check = Some(DatabaseCheck {
                    status: if healthy { "ok" } else { "down" },
                    pool_size: size,
                    active_connections: active,
                    idle_connections: idle,
                });
                (Some(db_status), db_check)
            } else {
                (None, None)
            }
        }
        #[cfg(not(feature = "db"))]
        {
            (None::<HealthStatus>, None::<DatabaseCheck>)
        }
    };

    let indicator_results = if let Some(registry) = state.health_indicator_registry() {
        registry.run_all().await
    } else {
        Vec::new()
    };

    // The DB status counts toward the overall status alongside every indicator.
    let mut all_statuses: Vec<HealthStatus> =
        indicator_results.iter().map(|r| r.output.status).collect();
    if let Some(s) = db_component_status {
        all_statuses.push(s);
    }
    let overall = HealthIndicatorRegistry::aggregate_status(&all_statuses);

    let components = build_health_components(
        db_component_status,
        db_check.as_ref(),
        &indicator_results,
        detailed,
    );

    let checks = db_check.map(|db| HealthChecks { database: Some(db) });

    let body = ActuatorHealth {
        status: overall.as_str(),
        version: env!("CARGO_PKG_VERSION"),
        profile: state.profile().to_owned(),
        uptime: state.uptime_display(),
        #[cfg(feature = "db")]
        autumn_after_commit_failures_total: crate::db::AFTER_COMMIT_FAILURES_TOTAL
            .load(std::sync::atomic::Ordering::Relaxed),
        components,
        checks,
    };

    let code = if overall.is_healthy() {
        StatusCode::OK
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    };
    (code, Json(body))
}
1770
/// JSON body served by the `info` handler.
#[derive(Serialize)]
pub(crate) struct ActuatorInfo {
    /// Application name and version.
    app: AppInfo,
    /// Framework version and active profile.
    autumn: FrameworkInfo,
    /// Runtime figures such as uptime.
    runtime: RuntimeInfo,
}
1780
/// Application identity section of `ActuatorInfo`.
#[derive(Serialize)]
struct AppInfo {
    /// Read from the `CARGO_PKG_NAME` env var at runtime, or `"unknown"`.
    name: String,
    /// Read from the `CARGO_PKG_VERSION` env var at runtime, or `"unknown"`.
    version: String,
}
1786
/// Framework section of `ActuatorInfo`.
#[derive(Serialize)]
struct FrameworkInfo {
    /// Framework version captured at compile time.
    version: &'static str,
    /// Active configuration profile.
    profile: String,
}
1792
/// Runtime section of `ActuatorInfo`.
#[derive(Serialize)]
struct RuntimeInfo {
    /// Uptime as rendered by `uptime_display()`.
    uptime: String,
}
1797
1798pub(crate) async fn info<S: ProvideActuatorState + Send + Sync + 'static>(
1800 State(state): State<S>,
1801) -> Json<ActuatorInfo> {
1802 Json(ActuatorInfo {
1803 app: AppInfo {
1804 name: std::env::var("CARGO_PKG_NAME").unwrap_or_else(|_| "unknown".into()),
1805 version: std::env::var("CARGO_PKG_VERSION").unwrap_or_else(|_| "unknown".into()),
1806 },
1807 autumn: FrameworkInfo {
1808 version: env!("CARGO_PKG_VERSION"),
1809 profile: state.profile().to_owned(),
1810 },
1811 runtime: RuntimeInfo {
1812 uptime: state.uptime_display(),
1813 },
1814 })
1815}
1816
/// JSON body served by the `env` handler.
#[derive(Serialize)]
pub(crate) struct ActuatorEnv {
    /// Active configuration profile.
    active_profile: String,
    /// Config property values keyed by property name, as taken from the
    /// config-props snapshot.
    properties: std::collections::HashMap<String, serde_json::Value>,
}
1825
/// Lower-case substrings marking a config key as sensitive.
///
/// `should_redact` lower-cases the key and matches if it *contains* any of
/// these, so e.g. both `API_KEY` and `database.url` match.
const REDACT_PATTERNS: &[&str] = &[
    "password",
    "secret",
    "key",
    "token",
    "credential",
    "auth",
    "url",
];
1836
1837fn should_redact(key: &str) -> bool {
1838 let lower = key.to_lowercase();
1839 REDACT_PATTERNS.iter().any(|p| lower.contains(p))
1840}
1841
1842pub(crate) async fn env_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
1844 State(state): State<S>,
1845) -> Json<ActuatorEnv> {
1846 let properties = state
1847 .config_props()
1848 .snapshot()
1849 .into_iter()
1850 .map(|(key, prop)| (key, prop.value))
1851 .collect();
1852
1853 Json(ActuatorEnv {
1854 active_profile: state.profile().to_owned(),
1855 properties,
1856 })
1857}
1858
/// JSON metrics endpoint.
///
/// The body is the serialized in-process metrics snapshot. Two keys may be
/// inserted into it:
/// - `database`: pool figures, when the `db` feature is on and a pool exists.
/// - `sources`: every plugin `MetricsSource`'s families keyed by source name,
///   when a source registry is configured.
///
/// If the snapshot fails to serialize, the body falls back to
/// `serde_json::Value::default()` (JSON `null`). Extra keys are only inserted
/// when the value is a JSON object.
pub(crate) async fn metrics_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
    State(state): State<S>,
) -> Json<serde_json::Value> {
    let snapshot = state.metrics().snapshot();
    let mut result = serde_json::to_value(&snapshot).unwrap_or_default();

    // Pool figures use the same formulas as the health endpoint:
    // active = max_size - available (saturating), idle = available.
    #[cfg(feature = "db")]
    if let Some(pool) = state.pool() {
        let status = pool.status();
        let db_stats = serde_json::json!({
            "pool_size": status.max_size,
            "active_connections": (status.max_size as u64).saturating_sub(status.available as u64),
            "idle_connections": status.available,
        });
        if let serde_json::Value::Object(ref mut map) = result {
            map.insert("database".to_string(), db_stats);
        }
    }

    if let Some(registry) = state.metrics_source_registry() {
        let all = registry.collect_all();
        let mut sources = serde_json::Map::new();
        for (source_name, families) in all {
            // Each family becomes {name, help, kind, samples: [{labels, value}]}.
            let families_json: Vec<serde_json::Value> = families
                .iter()
                .map(|f| {
                    serde_json::json!({
                        "name": f.name,
                        "help": f.help,
                        "kind": f.kind.as_str(),
                        "samples": f.samples.iter().map(|s| {
                            let labels: serde_json::Map<String, serde_json::Value> = s.labels
                                .iter()
                                .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
                                .collect();
                            serde_json::json!({
                                "labels": labels,
                                "value": s.value,
                            })
                        }).collect::<Vec<_>>(),
                    })
                })
                .collect();
            sources.insert(source_name, serde_json::Value::Array(families_json));
        }
        if let serde_json::Value::Object(ref mut map) = result {
            map.insert("sources".to_string(), serde_json::Value::Object(sources));
        }
    }

    Json(result)
}
1916
/// One entry in the `circuitbreakers` endpoint response.
///
/// The `Option` policy fields are filled only when `health_detailed()` is true
/// (see `circuitbreakers_endpoint`) and are omitted from JSON otherwise.
#[derive(Serialize)]
pub(crate) struct CircuitBreakerActuatorResponse {
    /// Breaker name.
    pub name: String,
    /// Current state name from `CircuitState::as_str`.
    pub state: &'static str,
    /// Current failure ratio reported by the breaker.
    pub failure_ratio: f64,
    /// Policy: failure ratio that trips the breaker.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub failure_ratio_threshold: Option<f64>,
    /// Policy: sample window length, in whole seconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sample_window_secs: Option<u64>,
    /// Policy: minimum samples considered before tripping.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub minimum_sample_count: Option<u64>,
    /// Policy: how long the breaker stays open, in whole seconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub open_duration_secs: Option<u64>,
    /// Policy: number of trial calls allowed while half-open.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub half_open_trial_count: Option<u64>,
}
1933
1934pub(crate) async fn circuitbreakers_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
1936 State(state): State<S>,
1937) -> Json<Vec<CircuitBreakerActuatorResponse>> {
1938 let detailed = state.health_detailed();
1939 let mut responses = Vec::new();
1940
1941 for breaker in crate::circuit_breaker::global_registry().all_breakers() {
1942 let policy = breaker.config();
1943 responses.push(CircuitBreakerActuatorResponse {
1944 name: breaker.name().to_string(),
1945 state: breaker.state().as_str(),
1946 failure_ratio: breaker.failure_ratio(),
1947 failure_ratio_threshold: detailed.then_some(policy.failure_ratio_threshold),
1948 sample_window_secs: detailed.then_some(policy.sample_window.as_secs()),
1949 minimum_sample_count: detailed.then_some(policy.minimum_sample_count),
1950 open_duration_secs: detailed.then_some(policy.open_duration.as_secs()),
1951 half_open_trial_count: detailed.then_some(policy.half_open_trial_count),
1952 });
1953 }
1954
1955 Json(responses)
1956}
1957
/// Renders a Prometheus label set such as `{a="x",b="y"}`.
///
/// Returns an empty string for an empty slice. Values are escaped (`\`, `"`
/// and newline). Names are written verbatim, so callers must pass valid label
/// names.
fn render_labels(labels: &[(String, String)]) -> String {
    if labels.is_empty() {
        return String::new();
    }

    let pairs: Vec<String> = labels
        .iter()
        .map(|(name, value)| {
            let mut escaped = String::with_capacity(value.len());
            for ch in value.chars() {
                match ch {
                    '\\' => escaped.push_str("\\\\"),
                    '\n' => escaped.push_str("\\n"),
                    '"' => escaped.push_str("\\\""),
                    other => escaped.push(other),
                }
            }
            format!("{name}=\"{escaped}\"")
        })
        .collect();

    format!("{{{}}}", pairs.join(","))
}
1988
/// Checks `s` against the Prometheus metric-name grammar
/// `[a-zA-Z_:][a-zA-Z0-9_:]*`. Any non-ASCII character makes it invalid.
fn is_valid_metric_name(s: &str) -> bool {
    match s.as_bytes().split_first() {
        None => false,
        Some((&head, tail)) => {
            let head_ok = head.is_ascii_alphabetic() || head == b'_' || head == b':';
            head_ok
                && tail
                    .iter()
                    .all(|&b| b.is_ascii_alphanumeric() || b == b'_' || b == b':')
        }
    }
}
1995
/// Checks `s` against the Prometheus label-name grammar
/// `[a-zA-Z_][a-zA-Z0-9_]*`. Unlike metric names, colons are not allowed.
fn is_valid_label_name(s: &str) -> bool {
    !s.is_empty()
        && s.chars().enumerate().all(|(idx, ch)| {
            ch == '_'
                || if idx == 0 {
                    ch.is_ascii_alphabetic()
                } else {
                    ch.is_ascii_alphanumeric()
                }
        })
}
2002
/// Escapes a string for use inside a quoted Prometheus label value.
///
/// Backslash, double quote and newline are escaped; every other character is
/// copied unchanged.
fn escape_prometheus_label_value(value: &str) -> String {
    value
        .chars()
        .fold(String::with_capacity(value.len()), |mut escaped, ch| {
            match ch {
                '\\' => escaped.push_str("\\\\"),
                '"' => escaped.push_str("\\\""),
                '\n' => escaped.push_str("\\n"),
                other => escaped.push(other),
            }
            escaped
        })
}
2016
/// Escapes `# HELP` text for the Prometheus text format.
///
/// Only backslash and newline are escaped; double quotes are left as-is,
/// which the HELP line permits.
fn escape_help_text(text: &str) -> String {
    let mut escaped = String::with_capacity(text.len());
    for ch in text.chars() {
        let replacement = match ch {
            '\\' => "\\\\",
            '\n' => "\\n",
            _ => {
                escaped.push(ch);
                continue;
            }
        };
        escaped.push_str(replacement);
    }
    escaped
}
2029
/// Formats a sample value for the Prometheus text format.
///
/// Infinities become `+Inf` / `-Inf` and NaN becomes `NaN`. Everything else
/// uses `f64`'s `Display` output.
fn format_sample_value(v: f64) -> String {
    if v.is_nan() {
        return "NaN".to_string();
    }
    if v.is_infinite() {
        let token = if v.is_sign_positive() { "+Inf" } else { "-Inf" };
        return token.to_string();
    }
    v.to_string()
}
2042
/// Appends every plugin `MetricsSource` family in `registry` to `out` in the
/// Prometheus text format, then appends per-source scrape error counters.
///
/// Skipping rules, each logged via `tracing::warn!`:
/// - A family with an invalid metric name is skipped.
/// - A family whose name is already in `emitted_families` is skipped. That set
///   is pre-seeded by the caller with reserved built-in names, and each newly
///   emitted name is added to it.
/// - A sample with any invalid label name is skipped entirely.
/// - A repeated label name within one sample is dropped (first occurrence wins).
/// - A sample whose label set was already emitted for the family is skipped.
fn render_plugin_sources(
    registry: &MetricsSourceRegistry,
    out: &mut String,
    emitted_families: &mut std::collections::HashSet<String>,
) {
    use std::fmt::Write;

    let all_sources = registry.collect_all();
    for (_source_name, families) in &all_sources {
        for family in families {
            if !is_valid_metric_name(&family.name) {
                tracing::warn!(name = %family.name, "MetricsSource returned invalid metric name; skipping family");
                continue;
            }
            // `insert` returns false when the name was already present (reserved or emitted).
            if !emitted_families.insert(family.name.clone()) {
                tracing::warn!(name = %family.name, "MetricsSource returned duplicate metric family name; skipping family");
                continue;
            }
            // Writing to a String cannot fail, so the fmt::Result is ignored.
            let _ = writeln!(
                out,
                "# HELP {} {}",
                family.name,
                escape_help_text(&family.help)
            );
            let _ = writeln!(out, "# TYPE {} {}", family.name, family.kind.as_str());
            // Rendered label strings already written for this family.
            let mut emitted_series: std::collections::HashSet<String> =
                std::collections::HashSet::new();
            for sample in &family.samples {
                let mut bad_key = false;
                let mut seen_keys = std::collections::HashSet::new();
                let mut valid_labels: Vec<(String, String)> = Vec::new();
                for (k, v) in &sample.labels {
                    if !is_valid_label_name(k) {
                        tracing::warn!(
                            label_name = %k,
                            metric = %family.name,
                            "MetricsSource returned invalid label name; skipping sample"
                        );
                        bad_key = true;
                        break;
                    }
                    if !seen_keys.insert(k.as_str()) {
                        tracing::warn!(label_name = %k, "MetricsSource returned duplicate label name; dropping duplicate");
                        continue;
                    }
                    valid_labels.push((k.clone(), v.clone()));
                }
                if bad_key {
                    continue;
                }
                // Sort by label name so series identity (and output) does not depend
                // on the order the source listed its labels.
                valid_labels.sort_by(|(a, _), (b, _)| a.cmp(b));
                let labels = render_labels(&valid_labels);
                if !emitted_series.insert(labels.clone()) {
                    tracing::warn!(
                        metric = %family.name,
                        labels = %labels,
                        "MetricsSource returned duplicate series; skipping sample"
                    );
                    continue;
                }
                let _ = writeln!(
                    out,
                    "{}{} {}",
                    family.name,
                    labels,
                    format_sample_value(sample.value)
                );
            }
        }
    }

    // Per-source error counter, emitted only once at least one source has errored.
    let error_counts = registry.error_counts();
    if !error_counts.is_empty() {
        out.push_str(
            "# HELP autumn_metrics_source_errors_total \
             Number of scrape errors (panics) per plugin metrics source\n",
        );
        out.push_str("# TYPE autumn_metrics_source_errors_total counter\n");
        // Sorted for deterministic output.
        let mut names: Vec<&String> = error_counts.keys().collect();
        names.sort();
        for name in names {
            let label = render_labels(&[("source".to_string(), name.clone())]);
            let _ = writeln!(
                out,
                "autumn_metrics_source_errors_total{} {}",
                label, error_counts[name]
            );
        }
    }
}
2140
2141fn write_builtin_http_metrics(
2145 out: &mut String,
2146 version: &str,
2147 snapshot: &crate::middleware::metrics::MetricsSnapshot,
2148) {
2149 use std::fmt::Write;
2150
2151 out.push_str("# HELP autumn_http_requests_total Total number of HTTP requests\n");
2153 out.push_str("# TYPE autumn_http_requests_total counter\n");
2154 let _ = writeln!(
2155 out,
2156 "autumn_http_requests_total{{version=\"{version}\"}} {}",
2157 snapshot.http.requests_total
2158 );
2159
2160 out.push_str("# HELP autumn_http_requests_active Currently active HTTP requests\n");
2162 out.push_str("# TYPE autumn_http_requests_active gauge\n");
2163 let _ = writeln!(
2164 out,
2165 "autumn_http_requests_active{{version=\"{version}\"}} {}",
2166 snapshot.http.requests_active
2167 );
2168
2169 out.push_str("# HELP autumn_http_responses_total HTTP responses by status code\n");
2171 out.push_str("# TYPE autumn_http_responses_total counter\n");
2172 for (status, count) in [
2173 ("2xx", snapshot.http.by_status.s2xx),
2174 ("3xx", snapshot.http.by_status.s3xx),
2175 ("4xx", snapshot.http.by_status.s4xx),
2176 ("5xx", snapshot.http.by_status.s5xx),
2177 ] {
2178 let _ = writeln!(
2179 out,
2180 "autumn_http_responses_total{{version=\"{version}\",status=\"{status}\"}} {count}"
2181 );
2182 }
2183
2184 out.push_str(
2188 "# HELP autumn_http_request_duration_seconds HTTP request latency percentiles in seconds\n",
2189 );
2190 out.push_str("# TYPE autumn_http_request_duration_seconds summary\n");
2191 for (quantile, millis) in [
2192 ("0.5", snapshot.http.latency_ms.p50),
2193 ("0.95", snapshot.http.latency_ms.p95),
2194 ("0.99", snapshot.http.latency_ms.p99),
2195 ] {
2196 #[allow(clippy::cast_precision_loss)]
2197 let seconds = millis as f64 / 1000.0;
2198 let _ = writeln!(
2199 out,
2200 "autumn_http_request_duration_seconds{{version=\"{version}\",quantile=\"{quantile}\"}} {seconds}"
2201 );
2202 }
2203
2204 out.push_str(
2206 "# HELP autumn_shutdown_aborted_requests_total \
2207 HTTP requests forcibly dropped when the graceful-shutdown drain deadline expired\n",
2208 );
2209 out.push_str("# TYPE autumn_shutdown_aborted_requests_total counter\n");
2210 let _ = writeln!(
2211 out,
2212 "autumn_shutdown_aborted_requests_total{{version=\"{version}\"}} {}",
2213 snapshot.http.shutdown_aborted_requests_total
2214 );
2215
2216 out.push_str(
2218 "# HELP autumn_request_timeouts_total \
2219 HTTP requests that exceeded the configured per-request timeout\n",
2220 );
2221 out.push_str("# TYPE autumn_request_timeouts_total counter\n");
2222 let _ = writeln!(
2223 out,
2224 "autumn_request_timeouts_total{{version=\"{version}\"}} {}",
2225 snapshot.http.request_timeouts_total
2226 );
2227
2228 if !snapshot.http.by_route.is_empty() {
2230 out.push_str("# HELP autumn_http_route_requests_total HTTP requests by route and method\n");
2231 out.push_str("# TYPE autumn_http_route_requests_total counter\n");
2232 let mut route_keys: Vec<&String> = snapshot.http.by_route.keys().collect();
2233 route_keys.sort();
2234 for route_key in route_keys {
2235 let metrics = &snapshot.http.by_route[route_key];
2236 if let Some((method, path)) = route_key.split_once(' ') {
2238 let _ = writeln!(
2239 out,
2240 "autumn_http_route_requests_total{{version=\"{version}\",method=\"{method}\",route=\"{path}\"}} {}",
2241 metrics.count
2242 );
2243 }
2244 }
2245 }
2246}
2247
2248pub(crate) async fn prometheus_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
2250 State(state): State<S>,
2251) -> impl IntoResponse {
2252 let snapshot = state.metrics().snapshot();
2253 let version = escape_prometheus_label_value(&state.deploy_version());
2257 let mut out = String::with_capacity(2048);
2258
2259 write_builtin_http_metrics(&mut out, &version, &snapshot);
2260
2261 if let Some(registry) = state.metrics_source_registry() {
2264 let mut emitted_families: std::collections::HashSet<String> = [
2265 "autumn_http_requests_total",
2266 "autumn_http_requests_active",
2267 "autumn_http_responses_total",
2268 "autumn_http_request_duration_seconds",
2269 "autumn_shutdown_aborted_requests_total",
2270 "autumn_request_timeouts_total",
2271 "autumn_http_route_requests_total",
2272 "autumn_metrics_source_errors_total",
2273 ]
2274 .iter()
2275 .map(|s| (*s).to_string())
2276 .collect();
2277 render_plugin_sources(registry, &mut out, &mut emitted_families);
2278 }
2279
2280 (
2281 [(
2282 axum::http::header::CONTENT_TYPE,
2283 "text/plain; version=0.0.4",
2284 )],
2285 out,
2286 )
2287}
2288
2289pub(crate) async fn configprops_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
2293 State(state): State<S>,
2294) -> Json<serde_json::Value> {
2295 let props = state.config_props().snapshot();
2296
2297 Json(serde_json::json!({
2298 "active_profile": state.profile(),
2299 "properties": props,
2300 }))
2301}
2302
/// Log levels accepted by `loggers_put` and listed by `loggers_get`.
///
/// Requests are lower-cased before they are checked against this list.
const AVAILABLE_LEVELS: &[&str] = &["trace", "debug", "info", "warn", "error"];
2307
/// JSON body served by `loggers_get`.
#[derive(Serialize)]
pub(crate) struct LoggersResponse {
    /// Current level reported by the log-level controller.
    current_level: String,
    /// Levels a logger may be set to (`AVAILABLE_LEVELS`).
    available_levels: Vec<&'static str>,
    /// Per-logger overrides keyed by logger name.
    loggers: HashMap<String, String>,
}
2315
2316pub(crate) async fn loggers_get<S: ProvideActuatorState + Send + Sync + 'static>(
2318 State(state): State<S>,
2319) -> Json<LoggersResponse> {
2320 Json(LoggersResponse {
2321 current_level: state.log_levels().current_level(),
2322 available_levels: AVAILABLE_LEVELS.to_vec(),
2323 loggers: state.log_levels().logger_overrides(),
2324 })
2325}
2326
/// JSON body accepted by `loggers_put`.
#[derive(Deserialize)]
pub(crate) struct SetLoggerRequest {
    /// Requested level; matched case-insensitively against `AVAILABLE_LEVELS`.
    level: String,
}
2332
2333pub(crate) async fn loggers_put<S: ProvideActuatorState + Send + Sync + 'static>(
2335 State(state): State<S>,
2336 Path(name): Path<String>,
2337 Json(body): Json<SetLoggerRequest>,
2338) -> impl IntoResponse {
2339 let level = body.level.to_lowercase();
2340
2341 if !AVAILABLE_LEVELS.contains(&level.as_str()) {
2343 return (
2344 StatusCode::BAD_REQUEST,
2345 Json(serde_json::json!({
2346 "status": "error",
2347 "message": format!(
2348 "Invalid level '{}'. Available levels: {}",
2349 level,
2350 AVAILABLE_LEVELS.join(", ")
2351 ),
2352 })),
2353 );
2354 }
2355
2356 let previous = state.log_levels().set_logger_level(&name, &level);
2357
2358 (
2359 StatusCode::OK,
2360 Json(serde_json::json!({
2361 "status": "ok",
2362 "message": format!("Logger '{}' set to '{}'", name, level),
2363 "previous": previous,
2364 })),
2365 )
2366}
2367
/// Query parameters accepted by `logfile_endpoint`.
#[derive(Debug, Deserialize, Default)]
pub(crate) struct LogfileQuery {
    /// Optional minimum level, parsed with `crate::log::capture::level_from_str`;
    /// an unparseable value yields `400 Bad Request`.
    pub level: Option<String>,
    /// Optional limit forwarded to the log buffer's `snapshot`.
    /// NOTE(review): presumably caps the number of returned entries — confirm.
    pub limit: Option<usize>,
}
2381
/// JSON body served by `logfile_endpoint`.
#[derive(Debug, Serialize)]
pub(crate) struct LogfileResponse {
    /// Entries returned by the buffer snapshot after level/limit filtering.
    pub entries: Vec<crate::log::capture::CapturedLogEntry>,
    /// Buffer length (`buf.len()`) taken before filtering; 0 when capture is off.
    pub total: usize,
    /// Whether a log capture buffer is configured.
    pub capture_enabled: bool,
}
2392
2393pub(crate) async fn logfile_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
2405 State(state): State<S>,
2406 axum::extract::Query(query): axum::extract::Query<LogfileQuery>,
2407) -> Result<axum::Json<LogfileResponse>, (StatusCode, axum::Json<serde_json::Value>)> {
2408 let min_level = match query.level.as_deref() {
2409 None => None,
2410 Some(s) => match crate::log::capture::level_from_str(s) {
2411 Some(level) => Some(level),
2412 None => {
2413 return Err((
2414 StatusCode::BAD_REQUEST,
2415 axum::Json(serde_json::json!({
2416 "error": format!(
2417 "invalid level {:?}; valid values: TRACE, DEBUG, INFO, WARN, ERROR",
2418 s
2419 )
2420 })),
2421 ));
2422 }
2423 },
2424 };
2425
2426 Ok(match state.log_buffer() {
2427 None => axum::Json(LogfileResponse {
2428 entries: vec![],
2429 total: 0,
2430 capture_enabled: false,
2431 }),
2432 Some(buf) => {
2433 let total = buf.len();
2434 let entries = buf.snapshot(min_level, query.limit);
2435 axum::Json(LogfileResponse {
2436 entries,
2437 total,
2438 capture_enabled: true,
2439 })
2440 }
2441 })
2442}
2443
2444pub(crate) async fn tasks_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
2448 State(state): State<S>,
2449) -> Json<serde_json::Value> {
2450 let tasks = state.task_registry().snapshot();
2451
2452 Json(serde_json::json!({
2453 "scheduled_tasks": tasks,
2454 }))
2455}
2456
2457pub(crate) async fn jobs_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
2459 State(state): State<S>,
2460) -> Json<serde_json::Value> {
2461 let jobs = state.job_registry().snapshot();
2462 Json(serde_json::json!({ "jobs": jobs }))
2463}
2464
/// JSON body accepted by `webhooks_replay_endpoint`.
#[cfg(feature = "http-client")]
#[derive(Deserialize)]
pub(crate) struct ReplayRequest {
    /// ID of the dead-lettered delivery log to replay.
    log_id: String,
}
2471
/// Enqueues an `autumn_webhook_delivery` job that replays the given log.
///
/// # Errors
/// Returns a message if no global job client is available or the enqueue fails.
#[cfg(feature = "http-client")]
async fn enqueue_webhook_replay_job(log_id: &str) -> Result<(), String> {
    let job_client = crate::job::global_job_client()
        .ok_or_else(|| "Global job client is not available".to_string())?;

    let payload = serde_json::json!({
        "log_id": log_id,
        "replay": true,
    });

    job_client
        .enqueue("autumn_webhook_delivery", payload)
        .await
        .map_err(|e| format!("Failed to enqueue job: {e}"))
}
2488
/// Lists dead-lettered webhook delivery logs.
///
/// Responds `501` when outbound webhooks are not configured and `500` when the
/// store query fails.
#[cfg(feature = "http-client")]
pub(crate) async fn webhooks_dlq_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
    State(state): State<S>,
) -> impl IntoResponse {
    match state.webhook_outbound() {
        Some(manager) => match manager.store().get_dlq_logs().await {
            Ok(logs) => (StatusCode::OK, Json(logs)).into_response(),
            Err(e) => (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "status": "error",
                    "message": format!("Failed to fetch DLQ logs: {}", e)
                })),
            )
                .into_response(),
        },
        None => (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "status": "error",
                "message": "Outbound webhook support is not configured or enabled"
            })),
        )
            .into_response(),
    }
}
2517
/// Replays a dead-lettered webhook delivery.
///
/// Steps, each short-circuiting with an error response:
/// 1. `501` if outbound webhooks are not configured.
/// 2. Load the delivery log: `500` on store error, `404` if missing.
/// 3. `400` if the log is not in the DLQ.
/// 4. Subscription checks via `blocked_webhook_replay_response`
///    (`500` / `404` / `409` when the subscription is disabled).
/// 5. Reset the log (`reset_webhook_replay_log`) and persist it with
///    `log_delivery`; `500` on failure.
/// 6. Enqueue the replay job. On failure, try to restore the original log with
///    `replace_delivery_log` and answer `500`. The message mentions a failed
///    restore when that also fails.
/// 7. Reactivate the subscription; a failure here is only logged.
///
/// Returns `200` once the job is enqueued.
#[cfg(feature = "http-client")]
pub(crate) async fn webhooks_replay_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
    State(state): State<S>,
    Json(body): Json<ReplayRequest>,
) -> impl IntoResponse {
    let Some(manager) = state.webhook_outbound() else {
        return (
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "status": "error",
                "message": "Outbound webhook support is not configured or enabled"
            })),
        )
        .into_response();
    };

    let log_opt = match manager.store().get_delivery_log(&body.log_id).await {
        Ok(log) => log,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "status": "error",
                    "message": format!("Failed to retrieve log: {}", e)
                })),
            )
            .into_response();
        }
    };

    let Some(log) = log_opt else {
        return (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "status": "error",
                "message": format!("Log with ID {} not found", body.log_id)
            })),
        )
        .into_response();
    };

    // Only dead-lettered deliveries may be replayed.
    if !log.is_dlq {
        return (StatusCode::BAD_REQUEST, Json(serde_json::json!({
            "status": "error",
            "message": format!("Log with ID {} is not in the Dead Letter Queue (DLQ)", body.log_id)
        }))).into_response();
    }

    if let Some(response) = blocked_webhook_replay_response(&manager, &log, &body.log_id).await {
        return response;
    }

    // Keep an untouched copy so the DLQ state can be restored if enqueueing fails.
    let subscription_id = log.subscription_id.clone();
    let original_log = log.clone();
    let log = reset_webhook_replay_log(log);

    if let Err(e) = manager.store().log_delivery(log).await {
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "status": "error",
                "message": format!("Failed to update delivery log state: {}", e)
            })),
        )
        .into_response();
    }

    if let Err(message) = enqueue_webhook_replay_job(&body.log_id).await {
        // Best-effort rollback: put the log back into its original DLQ state.
        if let Err(rollback_error) = manager.store().replace_delivery_log(original_log).await {
            tracing::error!(
                log_id = %body.log_id,
                "Failed to roll back webhook replay log after enqueue failure: {}",
                rollback_error
            );
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "status": "error",
                    "message": format!("{message}; failed to restore DLQ log state: {rollback_error}")
                })),
            )
            .into_response();
        }

        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "status": "error",
                "message": message
            })),
        )
        .into_response();
    }

    // The replay is already enqueued; a reactivation failure is logged, not returned.
    if let Err(e) = manager
        .store()
        .reactivate_failed_subscription(&subscription_id)
        .await
    {
        tracing::warn!(subscription_id = %subscription_id, "Failed to reactivate subscription during replay: {}", e);
    }

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "status": "ok",
            "message": format!("Replay successfully enqueued for log {}", body.log_id)
        })),
    )
    .into_response()
}
2632
/// Prepares a dead-lettered delivery log for a fresh replay.
///
/// Clears the DLQ flag and the previous error/response, restarts the attempt
/// counter at 1, and stamps the current UTC time.
#[cfg(feature = "http-client")]
fn reset_webhook_replay_log(
    mut log: crate::webhook_outbound::WebhookDeliveryLog,
) -> crate::webhook_outbound::WebhookDeliveryLog {
    log.timestamp = chrono::Utc::now();
    log.attempt = 1;
    log.is_dlq = false;
    log.response_status = None;
    log.response_body = None;
    log.last_error = None;
    log
}
2645
/// Checks whether the log's subscription allows a replay.
///
/// Returns `None` when the replay may proceed. Otherwise returns a ready
/// error response: `500` if the lookup fails, `404` if the subscription is
/// gone, `409` if it is disabled.
#[cfg(feature = "http-client")]
async fn blocked_webhook_replay_response(
    manager: &crate::webhook_outbound::WebhookOutboundManager,
    log: &crate::webhook_outbound::WebhookDeliveryLog,
    log_id: &str,
) -> Option<axum::response::Response> {
    let lookup = manager.store().get_subscription(&log.subscription_id).await;

    let (code, message) = match lookup {
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Failed to retrieve subscription: {}", e),
        ),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            format!(
                "Subscription {} for replay log {} was not found",
                log.subscription_id, log_id
            ),
        ),
        Ok(Some(subscription)) => {
            if subscription.status != crate::webhook_outbound::WebhookSubscriptionStatus::Disabled {
                return None;
            }
            (
                StatusCode::CONFLICT,
                format!(
                    "Subscription {} is disabled; re-enable it before replaying log {}",
                    log.subscription_id, log_id
                ),
            )
        }
    };

    Some(
        (
            code,
            Json(serde_json::json!({
                "status": "error",
                "message": message
            })),
        )
            .into_response(),
    )
}
2702
2703pub(crate) async fn a11y_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
2710 State(state): State<S>,
2711) -> Json<A11yPosture> {
2712 Json(state.a11y_posture())
2713}
2714
/// Lists the current channel snapshot under `channels` (`ws` feature only).
#[cfg(feature = "ws")]
pub(crate) async fn channels_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
    State(state): State<S>,
) -> Json<serde_json::Value> {
    let snapshot = state.channels().snapshot();
    let payload = serde_json::json!({
        "channels": snapshot,
    });
    Json(payload)
}
2727
/// WebSocket stream of task events (`ws` feature only).
///
/// After the upgrade, the handler subscribes to the `sys:tasks` channel and
/// forwards each message as a text frame. It stops when:
/// - a send fails,
/// - the channel closes,
/// - `select!` has no enabled branch left, or
/// - the shutdown token is cancelled, in which case it first tries to send a
///   Close frame.
///
/// A lagging receiver silently drops the skipped messages and keeps going.
#[cfg(feature = "ws")]
pub(crate) async fn tasks_stream_endpoint<S: ProvideActuatorState + Send + Sync + 'static>(
    State(state): State<S>,
    ws: axum::extract::ws::WebSocketUpgrade,
) -> impl IntoResponse {
    ws.on_upgrade(move |mut socket| async move {
        let mut rx = state.channels().subscribe("sys:tasks");
        let shutdown = state.shutdown_token();

        loop {
            tokio::select! {
                res = rx.recv() => {
                    match res {
                        Ok(msg) => {
                            let ws_msg = axum::extract::ws::Message::Text(msg.into_string().into());
                            // A failed send means the client went away.
                            if socket.send(ws_msg).await.is_err() {
                                break;
                            }
                        }
                        // Missed messages are dropped; keep streaming newer ones.
                        Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
                        Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                    }
                }
                () = shutdown.cancelled() => {
                    // Best-effort close frame; the result is deliberately ignored.
                    let _ = socket.send(axum::extract::ws::Message::Close(None)).await;
                    break;
                }
                else => break,
            }
        }
    })
}
2763
/// Normalizes a user-supplied actuator mount prefix.
///
/// The function:
/// - trims surrounding whitespace;
/// - strips every leading and trailing `/`;
/// - re-adds exactly one leading `/`.
///
/// Blank or slash-only input (`""`, `"/"`, `"//"`) yields an empty string,
/// meaning "mount at the root".
///
/// Examples:
/// - `"ops/"` → `"/ops"`
/// - `" /ops/ "` → `"/ops"`
/// - `"//ops//"` → `"/ops"`
/// - `"/a/b/"` → `"/a/b"`
///
/// Stripping all surrounding slashes, not just one, matters for two cases.
/// It keeps slash-only input such as `"//"` from turning into `"/"`, which
/// would produce double-slash routes like `"//health"`. It also collapses
/// repeated leading slashes.
pub(crate) fn normalize_actuator_prefix(prefix: &str) -> String {
    let core = prefix.trim().trim_matches('/');
    if core.is_empty() {
        String::new()
    } else {
        format!("/{core}")
    }
}
2779
2780pub(crate) fn actuator_route_glob(prefix: &str) -> String {
2781 let prefix = normalize_actuator_prefix(prefix);
2782 if prefix.is_empty() {
2783 "/*".to_owned()
2784 } else {
2785 format!("{prefix}/*")
2786 }
2787}
2788
2789pub(crate) fn actuator_route_path(prefix: &str, suffix: &str) -> String {
2790 let prefix = normalize_actuator_prefix(prefix);
2791 if prefix.is_empty() {
2792 suffix.to_owned()
2793 } else {
2794 format!("{prefix}{suffix}")
2795 }
2796}
2797
2798pub(crate) fn actuator_endpoint_paths(
2799 prefix: &str,
2800 sensitive: bool,
2801 prometheus_enabled: bool,
2802) -> Vec<String> {
2803 let mut paths = vec![
2804 actuator_route_path(prefix, "/health"),
2805 actuator_route_path(prefix, "/info"),
2806 actuator_route_path(prefix, "/metrics"),
2807 actuator_route_path(prefix, "/a11y"),
2808 actuator_route_path(prefix, "/ui"),
2809 actuator_route_path(prefix, "/ui/metrics"),
2810 ];
2811
2812 if prometheus_enabled {
2813 paths.push(actuator_route_path(prefix, "/prometheus"));
2814 }
2815
2816 if sensitive {
2817 paths.push(actuator_route_path(prefix, "/circuitbreakers"));
2818 paths.push(actuator_route_path(prefix, "/env"));
2819 paths.push(actuator_route_path(prefix, "/configprops"));
2820 paths.push(actuator_route_path(prefix, "/loggers"));
2821 paths.push(actuator_route_path(prefix, "/logfile"));
2822 paths.push(actuator_route_path(prefix, "/tasks"));
2823 paths.push(actuator_route_path(prefix, "/jobs"));
2824 paths.push(actuator_route_path(prefix, "/ui/tasks"));
2825 #[cfg(feature = "http-client")]
2826 {
2827 paths.push(actuator_route_path(prefix, "/webhooks/dlq"));
2828 paths.push(actuator_route_path(prefix, "/webhooks/replay"));
2829 }
2830 #[cfg(feature = "ws")]
2831 {
2832 paths.push(actuator_route_path(prefix, "/channels"));
2833 paths.push(actuator_route_path(prefix, "/tasks/stream"));
2834 }
2835 }
2836
2837 paths
2838}
2839
2840pub fn actuator_router<S: ProvideActuatorState + Send + Sync + Clone + 'static>(
2849 sensitive: bool,
2850) -> axum::Router<S> {
2851 actuator_router_with_prefix("/actuator", sensitive, true)
2852}
2853
2854#[allow(clippy::too_many_lines)]
2862pub(crate) fn actuator_router_with_prefix<
2863 S: ProvideActuatorState + Send + Sync + Clone + 'static,
2864>(
2865 prefix: &str,
2866 sensitive: bool,
2867 prometheus_enabled: bool,
2868) -> axum::Router<S> {
2869 let mut router = axum::Router::new()
2870 .route(
2871 &actuator_route_path(prefix, "/health"),
2872 axum::routing::get(health::<S>),
2873 )
2874 .route(
2875 &actuator_route_path(prefix, "/info"),
2876 axum::routing::get(info::<S>),
2877 )
2878 .route(
2879 &actuator_route_path(prefix, "/metrics"),
2880 axum::routing::get(metrics_endpoint::<S>),
2881 )
2882 .route(
2883 &actuator_route_path(prefix, "/a11y"),
2884 axum::routing::get(a11y_endpoint::<S>),
2885 );
2886
2887 if prometheus_enabled {
2888 router = router.route(
2889 &actuator_route_path(prefix, "/prometheus"),
2890 axum::routing::get(prometheus_endpoint::<S>),
2891 );
2892 }
2893
2894 if sensitive {
2895 router = router
2896 .route(
2897 &actuator_route_path(prefix, "/circuitbreakers"),
2898 axum::routing::get(circuitbreakers_endpoint::<S>),
2899 )
2900 .route(
2901 &actuator_route_path(prefix, "/env"),
2902 axum::routing::get(env_endpoint::<S>),
2903 )
2904 .route(
2905 &actuator_route_path(prefix, "/configprops"),
2906 axum::routing::get(configprops_endpoint::<S>),
2907 )
2908 .route(
2909 &actuator_route_path(prefix, "/loggers"),
2910 axum::routing::get(loggers_get::<S>),
2911 )
2912 .route(
2913 &actuator_route_path(prefix, "/loggers/{name}"),
2914 axum::routing::put(loggers_put::<S>),
2915 )
2916 .route(
2917 &actuator_route_path(prefix, "/logfile"),
2918 axum::routing::get(logfile_endpoint::<S>),
2919 )
2920 .route(
2921 &actuator_route_path(prefix, "/tasks"),
2922 axum::routing::get(tasks_endpoint::<S>),
2923 )
2924 .route(
2925 &actuator_route_path(prefix, "/jobs"),
2926 axum::routing::get(jobs_endpoint::<S>),
2927 )
2928 .route(
2929 &actuator_route_path(prefix, "/ui/tasks"),
2930 axum::routing::get(ui_tasks::<S>),
2931 );
2932 #[cfg(feature = "http-client")]
2933 {
2934 router = router
2935 .route(
2936 &actuator_route_path(prefix, "/webhooks/dlq"),
2937 axum::routing::get(webhooks_dlq_endpoint::<S>),
2938 )
2939 .route(
2940 &actuator_route_path(prefix, "/webhooks/replay"),
2941 axum::routing::post(webhooks_replay_endpoint::<S>),
2942 );
2943 }
2944
2945 #[cfg(feature = "system-info")]
2946 {
2947 router = router.route(
2948 &actuator_route_path(prefix, "/system"),
2949 axum::routing::get(crate::system_info::system_info_handler),
2950 );
2951 }
2952
2953 #[cfg(feature = "ws")]
2954 {
2955 router = router
2956 .route(
2957 &actuator_route_path(prefix, "/channels"),
2958 axum::routing::get(channels_endpoint::<S>),
2959 )
2960 .route(
2961 &actuator_route_path(prefix, "/tasks/stream"),
2962 axum::routing::get(tasks_stream_endpoint::<S>),
2963 );
2964 }
2965 }
2966
2967 router
2969 .route(
2970 &actuator_route_path(prefix, "/ui"),
2971 axum::routing::get(ui_dashboard),
2972 )
2973 .route(
2974 &actuator_route_path(prefix, "/ui/metrics"),
2975 axum::routing::get(ui_metrics::<S>),
2976 )
2977}
2978
2979#[cfg(test)]
2980mod tests {
2981 use super::*;
2982 use crate::config::AutumnConfig;
2983
2984 #[test]
2985 fn task_registry_flow() {
2986 let registry = TaskRegistry::new();
2987
2988 registry.register_scheduled(
2989 "my_task",
2990 "0 * * * * *",
2991 crate::task::TaskCoordination::Fleet,
2992 "mock",
2993 "node-1",
2994 );
2995 let snap1 = registry.snapshot();
2996 assert_eq!(snap1.get("my_task").unwrap().total_runs, 0);
2997
2998 registry.record_leader("my_task", "node-1", "mock_tick");
2999 let snap3 = registry.snapshot();
3000 assert_eq!(
3001 snap3.get("my_task").unwrap().current_leader.as_deref(),
3002 Some("node-1")
3003 );
3004
3005 registry.record_start("my_task");
3006 let snap4 = registry.snapshot();
3007 assert_eq!(snap4.get("my_task").unwrap().status, "running");
3008
3009 registry.record_next_run_at("my_task", "tomorrow");
3010 let snap5 = registry.snapshot();
3011 assert_eq!(
3012 snap5.get("my_task").unwrap().next_run_at.as_deref(),
3013 Some("tomorrow")
3014 );
3015
3016 registry.record_success("my_task", 100);
3017 let snap6 = registry.snapshot();
3018 assert_eq!(snap6.get("my_task").unwrap().total_runs, 1);
3019 assert_eq!(snap6.get("my_task").unwrap().last_error, None);
3020
3021 registry.record_failure("my_task", 150, "error message");
3022 let snap7 = registry.snapshot();
3023 assert_eq!(snap7.get("my_task").unwrap().total_runs, 2);
3024 assert_eq!(snap7.get("my_task").unwrap().total_failures, 1);
3025 assert_eq!(
3026 snap7.get("my_task").unwrap().last_error.as_deref(),
3027 Some("error message")
3028 );
3029
3030 let registry2 = TaskRegistry::default();
3031 assert!(registry2.snapshot().is_empty());
3032 }
3033 #[test]
3034 fn job_registry_flow() {
3035 let registry = JobRegistry::new();
3036
3037 registry.register("my_job");
3038 let snap1 = registry.snapshot();
3039 assert_eq!(snap1.get("my_job").unwrap().queued, 0);
3040
3041 registry.record_enqueue("my_job");
3042 let snap2 = registry.snapshot();
3043 assert_eq!(snap2.get("my_job").unwrap().queued, 1);
3044
3045 registry.record_start("my_job");
3046 let snap3 = registry.snapshot();
3047 assert_eq!(snap3.get("my_job").unwrap().queued, 0);
3048 assert_eq!(snap3.get("my_job").unwrap().in_flight, 1);
3049
3050 registry.record_retry("my_job", "timeout", 1);
3051 let snap4 = registry.snapshot();
3052 assert_eq!(snap4.get("my_job").unwrap().in_flight, 0);
3053 assert_eq!(
3054 snap4.get("my_job").unwrap().last_error.as_deref(),
3055 Some("timeout")
3056 );
3057
3058 registry.record_enqueue("my_job");
3059 registry.record_start("my_job");
3060 registry.record_success("my_job");
3061 let snap5 = registry.snapshot();
3062 assert_eq!(snap5.get("my_job").unwrap().in_flight, 0);
3063 assert_eq!(snap5.get("my_job").unwrap().total_successes, 1);
3064 assert_eq!(snap5.get("my_job").unwrap().last_error, None);
3065
3066 registry.record_enqueue("my_job");
3067 registry.record_cancel("my_job");
3068 let snap6 = registry.snapshot();
3069 assert_eq!(snap6.get("my_job").unwrap().queued, 0);
3070 assert_eq!(snap6.get("my_job").unwrap().in_flight, 0);
3071
3072 registry.record_enqueue("my_job");
3073 registry.record_start("my_job");
3074 registry.record_failure("my_job", "failure".to_string(), true);
3075 let snap7 = registry.snapshot();
3076 assert_eq!(snap7.get("my_job").unwrap().in_flight, 0);
3077 assert_eq!(snap7.get("my_job").unwrap().total_failures, 1);
3078 assert_eq!(snap7.get("my_job").unwrap().dead_letters, 1);
3079 assert_eq!(
3080 snap7.get("my_job").unwrap().last_error.as_deref(),
3081 Some("failure")
3082 );
3083
3084 let registry2 = JobRegistry::default();
3085 let snap8 = registry2.snapshot();
3086 assert!(snap8.is_empty());
3087 }
3088 use axum::body::Body;
3089 use axum::http::Request;
3090 use tower::ServiceExt;
3091
3092 #[derive(Clone)]
3093 struct TestActuatorState {
3094 profile: String,
3095 deploy_version: String,
3096 metrics: crate::middleware::MetricsCollector,
3097 log_levels: LogLevels,
3098 task_registry: TaskRegistry,
3099 job_registry: JobRegistry,
3100 config_props: ConfigProperties,
3101 metrics_source_registry: MetricsSourceRegistry,
3102 health_indicator_registry: HealthIndicatorRegistry,
3103 health_detailed: bool,
3104 log_buffer: Option<crate::log::capture::LogBuffer>,
3105 #[cfg(feature = "http-client")]
3106 webhook_outbound: Option<crate::webhook_outbound::WebhookOutboundManager>,
3107 #[cfg(feature = "db")]
3108 pool: Option<
3109 diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>,
3110 >,
3111 #[cfg(feature = "ws")]
3112 channels: crate::channels::Channels,
3113 #[cfg(feature = "ws")]
3114 shutdown: tokio_util::sync::CancellationToken,
3115 }
3116
3117 impl ProvideActuatorState for TestActuatorState {
3118 fn metrics(&self) -> &crate::middleware::MetricsCollector {
3119 &self.metrics
3120 }
3121 fn log_levels(&self) -> &LogLevels {
3122 &self.log_levels
3123 }
3124 fn task_registry(&self) -> &TaskRegistry {
3125 &self.task_registry
3126 }
3127 fn job_registry(&self) -> &JobRegistry {
3128 &self.job_registry
3129 }
3130 fn config_props(&self) -> &ConfigProperties {
3131 &self.config_props
3132 }
3133 fn profile(&self) -> &str {
3134 &self.profile
3135 }
3136 fn uptime_display(&self) -> String {
3137 "test_uptime".to_string()
3138 }
3139 fn deploy_version(&self) -> String {
3140 self.deploy_version.clone()
3141 }
3142 fn metrics_source_registry(&self) -> Option<&MetricsSourceRegistry> {
3143 Some(&self.metrics_source_registry)
3144 }
3145 #[cfg(feature = "http-client")]
3146 fn webhook_outbound(&self) -> Option<crate::webhook_outbound::WebhookOutboundManager> {
3147 self.webhook_outbound.clone()
3148 }
3149 #[cfg(feature = "db")]
3150 fn pool(
3151 &self,
3152 ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
3153 {
3154 self.pool.as_ref()
3155 }
3156 #[cfg(feature = "ws")]
3157 fn channels(&self) -> &crate::channels::Channels {
3158 &self.channels
3159 }
3160 #[cfg(feature = "ws")]
3161 fn shutdown_token(&self) -> tokio_util::sync::CancellationToken {
3162 self.shutdown.clone()
3163 }
3164 fn health_indicator_registry(&self) -> Option<&HealthIndicatorRegistry> {
3165 Some(&self.health_indicator_registry)
3166 }
3167 fn health_detailed(&self) -> bool {
3168 self.health_detailed
3169 }
3170 fn log_buffer(&self) -> Option<crate::log::capture::LogBuffer> {
3171 self.log_buffer.clone()
3172 }
3173 }
3174
3175 fn test_state() -> TestActuatorState {
3176 test_state_with_config(&AutumnConfig::default())
3177 }
3178
3179 fn test_state_with_config(config: &AutumnConfig) -> TestActuatorState {
3180 TestActuatorState {
3181 profile: config.profile.clone().unwrap_or_else(|| "dev".into()),
3182 deploy_version: crate::canary::STABLE.to_owned(),
3183 metrics: crate::middleware::MetricsCollector::new(),
3184 log_levels: LogLevels::new("info"),
3185 task_registry: TaskRegistry::new(),
3186 job_registry: JobRegistry::new(),
3187 config_props: ConfigProperties::from_config(config),
3188 metrics_source_registry: MetricsSourceRegistry::new(),
3189 health_indicator_registry: HealthIndicatorRegistry::new(),
3190 health_detailed: config.health.detailed,
3191 log_buffer: None,
3192 #[cfg(feature = "http-client")]
3193 webhook_outbound: None,
3194 #[cfg(feature = "db")]
3195 pool: None,
3196 #[cfg(feature = "ws")]
3197 channels: crate::channels::Channels::new(32),
3198 #[cfg(feature = "ws")]
3199 shutdown: tokio_util::sync::CancellationToken::new(),
3200 }
3201 }
3202
3203 #[cfg(feature = "http-client")]
3204 fn test_state_with_webhook_outbound(
3205 manager: crate::webhook_outbound::WebhookOutboundManager,
3206 ) -> TestActuatorState {
3207 let mut state = test_state();
3208 state.webhook_outbound = Some(manager);
3209 state
3210 }
3211
3212 #[cfg(feature = "http-client")]
3213 fn replay_test_subscription() -> crate::webhook_outbound::WebhookSubscription {
3214 crate::webhook_outbound::WebhookSubscription {
3215 id: "sub-replay".to_string(),
3216 target_url: "https://example.test/webhook".to_string(),
3217 event_topics: vec!["order.created".to_string()],
3218 secret: "secret".to_string(),
3219 status: crate::webhook_outbound::WebhookSubscriptionStatus::Failed,
3220 consecutive_failures: 50,
3221 }
3222 }
3223
3224 #[cfg(feature = "http-client")]
3225 fn replay_test_dlq_log() -> crate::webhook_outbound::WebhookDeliveryLog {
3226 crate::webhook_outbound::WebhookDeliveryLog {
3227 id: "log-replay".to_string(),
3228 subscription_id: "sub-replay".to_string(),
3229 topic: "order.created".to_string(),
3230 payload: "{\"id\":123}".to_string(),
3231 request_headers: std::collections::HashMap::new(),
3232 response_status: Some(503),
3233 response_body: Some("unavailable".to_string()),
3234 elapsed_ms: 42,
3235 attempt: 5,
3236 max_attempts: 5,
3237 is_dlq: true,
3238 last_error: Some("server returned status: 503".to_string()),
3239 timestamp: chrono::Utc::now(),
3240 }
3241 }
3242
3243 #[cfg(feature = "http-client")]
3244 #[tokio::test]
3245 async fn webhooks_replay_preserves_dlq_log_and_failures_when_enqueue_is_unavailable() {
3246 use crate::webhook_outbound::{
3247 InMemoryOutboundWebhookHandler, OutboundWebhookHandler, WebhookOutboundManager,
3248 };
3249
3250 let _guard = crate::job::global_job_runtime_test_lock().lock().await;
3251 crate::job::clear_global_job_client();
3252
3253 let handler = Arc::new(InMemoryOutboundWebhookHandler::new());
3254 handler
3255 .create_subscription(replay_test_subscription())
3256 .await
3257 .expect("subscription setup");
3258 let original_log = replay_test_dlq_log();
3259 handler
3260 .log_delivery(original_log.clone())
3261 .await
3262 .expect("dlq log setup");
3263 let failures_before_replay = handler
3264 .get_subscription("sub-replay")
3265 .await
3266 .expect("subscription lookup")
3267 .expect("subscription should exist")
3268 .consecutive_failures;
3269
3270 let state = test_state_with_webhook_outbound(WebhookOutboundManager::new(handler.clone()));
3271 let response = webhooks_replay_endpoint(
3272 State(state),
3273 Json(ReplayRequest {
3274 log_id: original_log.id.clone(),
3275 }),
3276 )
3277 .await
3278 .into_response();
3279
3280 assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
3281
3282 let stored_log = handler
3283 .get_delivery_log(&original_log.id)
3284 .await
3285 .expect("delivery log lookup")
3286 .expect("delivery log should still exist");
3287 assert!(stored_log.is_dlq, "failed enqueue must keep log in DLQ");
3288 assert_eq!(stored_log.attempt, original_log.attempt);
3289 assert_eq!(stored_log.last_error, original_log.last_error);
3290 assert_eq!(stored_log.response_status, original_log.response_status);
3291 assert_eq!(stored_log.response_body, original_log.response_body);
3292
3293 let subscription = handler
3294 .get_subscription("sub-replay")
3295 .await
3296 .expect("subscription lookup")
3297 .expect("subscription should exist");
3298 assert_eq!(
3299 subscription.consecutive_failures, failures_before_replay,
3300 "failed enqueue must not reset subscription failure history"
3301 );
3302 assert_eq!(
3303 subscription.status,
3304 crate::webhook_outbound::WebhookSubscriptionStatus::Failed,
3305 "failed enqueue must not reactivate an auto-failed subscription"
3306 );
3307
3308 crate::job::clear_global_job_client();
3309 }
3310
3311 #[cfg(feature = "http-client")]
3312 #[tokio::test]
3313 async fn webhooks_replay_rejects_disabled_subscription_without_removing_dlq() {
3314 use crate::webhook_outbound::{
3315 InMemoryOutboundWebhookHandler, OutboundWebhookHandler, WebhookOutboundManager,
3316 WebhookSubscriptionStatus,
3317 };
3318
3319 let _guard = crate::job::global_job_runtime_test_lock().lock().await;
3320 crate::job::clear_global_job_client();
3321
3322 let handler = Arc::new(InMemoryOutboundWebhookHandler::new());
3323 let mut subscription = replay_test_subscription();
3324 subscription.status = WebhookSubscriptionStatus::Disabled;
3325 subscription.consecutive_failures = 0;
3326 handler
3327 .create_subscription(subscription)
3328 .await
3329 .expect("subscription setup");
3330 let original_log = replay_test_dlq_log();
3331 handler
3332 .log_delivery(original_log.clone())
3333 .await
3334 .expect("dlq log setup");
3335
3336 let state = test_state_with_webhook_outbound(WebhookOutboundManager::new(handler.clone()));
3337 let response = webhooks_replay_endpoint(
3338 State(state),
3339 Json(ReplayRequest {
3340 log_id: original_log.id.clone(),
3341 }),
3342 )
3343 .await
3344 .into_response();
3345
3346 assert_eq!(response.status(), StatusCode::CONFLICT);
3347
3348 let stored_log = handler
3349 .get_delivery_log(&original_log.id)
3350 .await
3351 .expect("delivery log lookup")
3352 .expect("delivery log should still exist");
3353 assert!(stored_log.is_dlq);
3354 assert_eq!(stored_log.attempt, original_log.attempt);
3355 assert_eq!(stored_log.response_status, original_log.response_status);
3356 assert_eq!(stored_log.last_error, original_log.last_error);
3357
3358 let subscription = handler
3359 .get_subscription("sub-replay")
3360 .await
3361 .expect("subscription lookup")
3362 .expect("subscription should exist");
3363 assert_eq!(subscription.status, WebhookSubscriptionStatus::Disabled);
3364
3365 crate::job::clear_global_job_client();
3366 }
3367
3368 #[cfg(feature = "http-client")]
3369 #[tokio::test]
3370 async fn webhooks_replay_rejects_missing_subscription_without_removing_dlq() {
3371 use crate::webhook_outbound::{
3372 InMemoryOutboundWebhookHandler, OutboundWebhookHandler, WebhookOutboundManager,
3373 };
3374
3375 let _guard = crate::job::global_job_runtime_test_lock().lock().await;
3376 crate::job::clear_global_job_client();
3377
3378 let handler = Arc::new(InMemoryOutboundWebhookHandler::new());
3379 let original_log = replay_test_dlq_log();
3380 handler
3381 .log_delivery(original_log.clone())
3382 .await
3383 .expect("dlq log setup");
3384
3385 let runtime_state = crate::AppState::for_test().with_profile("test");
3386 let shutdown = tokio_util::sync::CancellationToken::new();
3387 crate::job::start_runtime(
3388 vec![crate::job::JobInfo {
3389 name: "autumn_webhook_delivery".to_string(),
3390 max_attempts: 1,
3391 initial_backoff_ms: 1,
3392 uniqueness: None,
3393 concurrency: None,
3394 handler: |_state, _payload| Box::pin(async move { Ok(()) }),
3395 }],
3396 &runtime_state,
3397 &shutdown,
3398 &crate::config::JobConfig::default(),
3399 )
3400 .expect("job runtime should start");
3401
3402 let state = test_state_with_webhook_outbound(WebhookOutboundManager::new(handler.clone()));
3403 let response = webhooks_replay_endpoint(
3404 State(state),
3405 Json(ReplayRequest {
3406 log_id: original_log.id.clone(),
3407 }),
3408 )
3409 .await
3410 .into_response();
3411
3412 assert_eq!(response.status(), StatusCode::NOT_FOUND);
3413
3414 let stored_log = handler
3415 .get_delivery_log(&original_log.id)
3416 .await
3417 .expect("delivery log lookup")
3418 .expect("delivery log should still exist");
3419 assert!(stored_log.is_dlq);
3420 assert_eq!(stored_log.attempt, original_log.attempt);
3421 assert_eq!(stored_log.response_status, original_log.response_status);
3422 assert_eq!(stored_log.response_body, original_log.response_body);
3423 assert_eq!(stored_log.last_error, original_log.last_error);
3424
3425 assert!(
3426 handler
3427 .get_subscription("sub-replay")
3428 .await
3429 .expect("subscription lookup")
3430 .is_none(),
3431 "test setup should leave the subscription missing"
3432 );
3433
3434 shutdown.cancel();
3435 crate::job::clear_global_job_client();
3436 }
3437
3438 #[cfg(feature = "http-client")]
3439 #[tokio::test]
3440 async fn webhooks_replay_resets_log_and_failures_after_enqueue_succeeds() {
3441 use crate::webhook_outbound::{
3442 InMemoryOutboundWebhookHandler, OutboundWebhookHandler, WebhookOutboundManager,
3443 };
3444
3445 let _guard = crate::job::global_job_runtime_test_lock().lock().await;
3446 crate::job::clear_global_job_client();
3447
3448 let handler = Arc::new(InMemoryOutboundWebhookHandler::new());
3449 handler
3450 .create_subscription(replay_test_subscription())
3451 .await
3452 .expect("subscription setup");
3453 let original_log = replay_test_dlq_log();
3454 handler
3455 .log_delivery(original_log.clone())
3456 .await
3457 .expect("dlq log setup");
3458
3459 let runtime_state = crate::AppState::for_test().with_profile("test");
3460 let shutdown = tokio_util::sync::CancellationToken::new();
3461 crate::job::start_runtime(
3462 vec![crate::job::JobInfo {
3463 name: "autumn_webhook_delivery".to_string(),
3464 max_attempts: 1,
3465 initial_backoff_ms: 1,
3466 uniqueness: None,
3467 concurrency: None,
3468 handler: |_state, _payload| Box::pin(async move { Ok(()) }),
3469 }],
3470 &runtime_state,
3471 &shutdown,
3472 &crate::config::JobConfig::default(),
3473 )
3474 .expect("job runtime should start");
3475
3476 let state = test_state_with_webhook_outbound(WebhookOutboundManager::new(handler.clone()));
3477 let response = webhooks_replay_endpoint(
3478 State(state),
3479 Json(ReplayRequest {
3480 log_id: original_log.id.clone(),
3481 }),
3482 )
3483 .await
3484 .into_response();
3485
3486 assert_eq!(response.status(), StatusCode::OK);
3487
3488 let stored_log = handler
3489 .get_delivery_log(&original_log.id)
3490 .await
3491 .expect("delivery log lookup")
3492 .expect("delivery log should still exist");
3493 assert!(!stored_log.is_dlq);
3494 assert_eq!(stored_log.attempt, 1);
3495 assert_eq!(stored_log.last_error, None);
3496 assert_eq!(stored_log.response_status, None);
3497 assert_eq!(stored_log.response_body, None);
3498
3499 let subscription = handler
3500 .get_subscription("sub-replay")
3501 .await
3502 .expect("subscription lookup")
3503 .expect("subscription should exist");
3504 assert_eq!(subscription.consecutive_failures, 0);
3505 assert_eq!(
3506 subscription.status,
3507 crate::webhook_outbound::WebhookSubscriptionStatus::Active
3508 );
3509
3510 shutdown.cancel();
3511 crate::job::clear_global_job_client();
3512 }
3513
3514 #[tokio::test]
3515 async fn actuator_health_returns_ok() {
3516 let app = actuator_router(true).with_state(test_state());
3517 let resp = app
3518 .oneshot(
3519 Request::builder()
3520 .uri("/actuator/health")
3521 .body(Body::empty())
3522 .unwrap(),
3523 )
3524 .await
3525 .unwrap();
3526
3527 assert_eq!(resp.status(), StatusCode::OK);
3528 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3529 .await
3530 .unwrap();
3531 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3532 assert_eq!(json["status"], "UP");
3533 assert_eq!(json["profile"], "dev");
3534 assert!(json["uptime"].is_string());
3535 }
3536
3537 #[cfg(feature = "db")]
3538 #[tokio::test]
3539 async fn actuator_health_exposes_after_commit_failure_counter() {
3540 let app = actuator_router(true).with_state(test_state());
3541 let resp = app
3542 .oneshot(
3543 Request::builder()
3544 .uri("/actuator/health")
3545 .body(Body::empty())
3546 .unwrap(),
3547 )
3548 .await
3549 .unwrap();
3550
3551 assert_eq!(resp.status(), StatusCode::OK);
3552 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3553 .await
3554 .unwrap();
3555 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3556 assert_eq!(
3557 json["autumn_after_commit_failures_total"],
3558 crate::db::AFTER_COMMIT_FAILURES_TOTAL.load(std::sync::atomic::Ordering::Relaxed),
3559 "/actuator/health should expose the documented after_commit counter"
3560 );
3561 }
3562
3563 #[tokio::test]
3564 #[allow(clippy::await_holding_lock)]
3565 async fn actuator_circuitbreakers_returns_breakers() {
3566 let _lock = crate::circuit_breaker::TEST_LOCK
3567 .lock()
3568 .unwrap_or_else(std::sync::PoisonError::into_inner);
3569 crate::circuit_breaker::global_registry().clear();
3570 let breaker = crate::circuit_breaker::global_registry().get_or_create(
3571 "actuator_endpoint_test_breaker",
3572 crate::circuit_breaker::CircuitBreakerPolicy {
3573 failure_ratio_threshold: 0.5,
3574 sample_window: std::time::Duration::from_secs(10),
3575 minimum_sample_count: 2,
3576 open_duration: std::time::Duration::from_secs(60),
3577 half_open_trial_count: 2,
3578 },
3579 );
3580 assert_eq!(
3581 breaker.state(),
3582 crate::circuit_breaker::CircuitState::Closed
3583 );
3584
3585 let mut detailed_config = AutumnConfig::default();
3586 detailed_config.health.detailed = true;
3587 let state = test_state_with_config(&detailed_config);
3588 let app = actuator_router(true).with_state(state);
3589 let resp = app
3590 .clone()
3591 .oneshot(
3592 Request::builder()
3593 .uri("/actuator/circuitbreakers")
3594 .body(Body::empty())
3595 .unwrap(),
3596 )
3597 .await
3598 .unwrap();
3599
3600 assert_eq!(resp.status(), StatusCode::OK);
3601 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3602 .await
3603 .unwrap();
3604 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3605 let list = json.as_array().expect("Should be a JSON array");
3606 let item = list
3607 .iter()
3608 .find(|i| i["name"] == "actuator_endpoint_test_breaker")
3609 .expect("Should find our breaker");
3610 assert_eq!(item["state"], "CLOSED");
3611 assert_eq!(item["failure_ratio_threshold"], 0.5);
3612 assert_eq!(item["minimum_sample_count"], 2);
3613
3614 let mut undetailed_config = AutumnConfig::default();
3615 undetailed_config.health.detailed = false;
3616 let undetailed_state = test_state_with_config(&undetailed_config);
3617 let app_undetailed = actuator_router(true).with_state(undetailed_state);
3618 let resp_undetailed = app_undetailed
3619 .oneshot(
3620 Request::builder()
3621 .uri("/actuator/circuitbreakers")
3622 .body(Body::empty())
3623 .unwrap(),
3624 )
3625 .await
3626 .unwrap();
3627
3628 assert_eq!(resp_undetailed.status(), StatusCode::OK);
3629 let body_undetailed = axum::body::to_bytes(resp_undetailed.into_body(), usize::MAX)
3630 .await
3631 .unwrap();
3632 let json_undetailed: serde_json::Value = serde_json::from_slice(&body_undetailed).unwrap();
3633 let list_undetailed = json_undetailed.as_array().expect("Should be a JSON array");
3634 let item_undetailed = list_undetailed
3635 .iter()
3636 .find(|i| i["name"] == "actuator_endpoint_test_breaker")
3637 .expect("Should find our breaker");
3638 assert_eq!(item_undetailed["state"], "CLOSED");
3639 assert!(item_undetailed.get("failure_ratio_threshold").is_none());
3640 assert!(item_undetailed.get("minimum_sample_count").is_none());
3641 crate::circuit_breaker::global_registry().clear();
3642 }
3643
3644 #[tokio::test]
3645 #[allow(clippy::await_holding_lock)]
3646 async fn test_health_hides_circuit_breakers_when_undetailed() {
3647 let _lock = crate::circuit_breaker::TEST_LOCK
3648 .lock()
3649 .unwrap_or_else(std::sync::PoisonError::into_inner);
3650 crate::circuit_breaker::global_registry().clear();
3651
3652 let _breaker = crate::circuit_breaker::global_registry().get_or_create(
3653 "test_health_hide_breaker",
3654 crate::circuit_breaker::CircuitBreakerPolicy {
3655 failure_ratio_threshold: 0.5,
3656 sample_window: std::time::Duration::from_secs(10),
3657 minimum_sample_count: 2,
3658 open_duration: std::time::Duration::from_secs(60),
3659 half_open_trial_count: 2,
3660 },
3661 );
3662
3663 let mut detailed_config = AutumnConfig::default();
3664 detailed_config.health.detailed = true;
3665 let state = test_state_with_config(&detailed_config);
3666 let app = actuator_router(true).with_state(state);
3667 let resp = app
3668 .oneshot(
3669 Request::builder()
3670 .uri("/actuator/health")
3671 .body(Body::empty())
3672 .unwrap(),
3673 )
3674 .await
3675 .unwrap();
3676
3677 assert_eq!(resp.status(), StatusCode::OK);
3678 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3679 .await
3680 .unwrap();
3681 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3682 assert!(json["components"]["circuit_breaker.test_health_hide_breaker"].is_object());
3683
3684 let mut undetailed_config = AutumnConfig::default();
3685 undetailed_config.health.detailed = false;
3686 let undetailed_state = test_state_with_config(&undetailed_config);
3687 let app_undetailed = actuator_router(true).with_state(undetailed_state);
3688 let resp_undetailed = app_undetailed
3689 .oneshot(
3690 Request::builder()
3691 .uri("/actuator/health")
3692 .body(Body::empty())
3693 .unwrap(),
3694 )
3695 .await
3696 .unwrap();
3697
3698 assert_eq!(resp_undetailed.status(), StatusCode::OK);
3699 let body_undetailed = axum::body::to_bytes(resp_undetailed.into_body(), usize::MAX)
3700 .await
3701 .unwrap();
3702 let json_undetailed: serde_json::Value = serde_json::from_slice(&body_undetailed).unwrap();
3703
3704 if let Some(components) = json_undetailed.get("components") {
3705 assert!(
3706 components
3707 .get("circuit_breaker.test_health_hide_breaker")
3708 .is_none()
3709 );
3710 }
3711
3712 crate::circuit_breaker::global_registry().clear();
3713 }
3714
3715 #[tokio::test]
3716 async fn actuator_routes_respect_custom_prefix() {
3717 let app = actuator_router_with_prefix("/ops", true, true).with_state(test_state());
3718
3719 let prefixed = app
3720 .clone()
3721 .oneshot(
3722 Request::builder()
3723 .uri("/ops/health")
3724 .body(Body::empty())
3725 .unwrap(),
3726 )
3727 .await
3728 .unwrap();
3729 assert_eq!(prefixed.status(), StatusCode::OK);
3730
3731 let legacy = app
3732 .oneshot(
3733 Request::builder()
3734 .uri("/actuator/health")
3735 .body(Body::empty())
3736 .unwrap(),
3737 )
3738 .await
3739 .unwrap();
3740 assert_eq!(legacy.status(), StatusCode::NOT_FOUND);
3741 }
3742
3743 #[test]
3744 fn actuator_route_helpers_normalize_prefixes() {
3745 assert_eq!(actuator_route_glob("ops/"), "/ops/*");
3746 assert_eq!(actuator_route_path("ops/", "/health"), "/ops/health");
3747 assert_eq!(actuator_route_glob("/"), "/*");
3748 }
3749
3750 #[tokio::test]
3751 async fn actuator_info_returns_metadata() {
3752 let app = actuator_router(true).with_state(test_state());
3753 let resp = app
3754 .oneshot(
3755 Request::builder()
3756 .uri("/actuator/info")
3757 .body(Body::empty())
3758 .unwrap(),
3759 )
3760 .await
3761 .unwrap();
3762
3763 assert_eq!(resp.status(), StatusCode::OK);
3764 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3765 .await
3766 .unwrap();
3767 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3768 assert!(json["autumn"]["version"].is_string());
3769 assert_eq!(json["autumn"]["profile"], "dev");
3770 }
3771
3772 #[tokio::test]
3773 async fn actuator_env_available_in_sensitive_mode() {
3774 let config = AutumnConfig {
3775 profile: Some("prod".into()),
3776 server: crate::config::ServerConfig {
3777 port: 4100,
3778 ..crate::config::ServerConfig::default()
3779 },
3780 telemetry: crate::config::TelemetryConfig {
3781 enabled: true,
3782 service_name: "cloud-app".into(),
3783 ..crate::config::TelemetryConfig::default()
3784 },
3785 health: crate::config::HealthConfig {
3786 path: "/healthz".into(),
3787 ..crate::config::HealthConfig::default()
3788 },
3789 ..AutumnConfig::default()
3790 };
3791
3792 let app = actuator_router(true).with_state(test_state_with_config(&config));
3793 let resp = app
3794 .oneshot(
3795 Request::builder()
3796 .uri("/actuator/env")
3797 .body(Body::empty())
3798 .unwrap(),
3799 )
3800 .await
3801 .unwrap();
3802 assert_eq!(resp.status(), StatusCode::OK);
3803 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3804 .await
3805 .unwrap();
3806 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3807 assert_eq!(json["active_profile"], "prod");
3808 assert_eq!(json["properties"]["server.port"], "4100");
3809 assert_eq!(json["properties"]["telemetry.enabled"], "true");
3810 assert_eq!(json["properties"]["telemetry.service_name"], "cloud-app");
3811 assert_eq!(json["properties"]["health.path"], "/healthz");
3812 }
3813
3814 #[tokio::test]
3815 async fn actuator_env_hidden_in_nonsensitive_mode() {
3816 let app = actuator_router(false).with_state(test_state());
3817 let resp = app
3818 .oneshot(
3819 Request::builder()
3820 .uri("/actuator/env")
3821 .body(Body::empty())
3822 .unwrap(),
3823 )
3824 .await
3825 .unwrap();
3826 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
3827 }
3828
3829 #[tokio::test]
3830 async fn actuator_circuitbreakers_hidden_in_nonsensitive_mode() {
3831 let app = actuator_router(false).with_state(test_state());
3832 let resp = app
3833 .oneshot(
3834 Request::builder()
3835 .uri("/actuator/circuitbreakers")
3836 .body(Body::empty())
3837 .unwrap(),
3838 )
3839 .await
3840 .unwrap();
3841 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
3842 }
3843
/// `should_redact` flags the sensitive-looking keys below and leaves the
/// ordinary settings visible.
#[test]
fn redaction_patterns() {
    let redacted_keys = ["database.url", "api_token", "secret_key"];
    let visible_keys = ["server.port", "log.level"];

    for key in redacted_keys {
        assert!(should_redact(key));
    }
    for key in visible_keys {
        assert!(!should_redact(key));
    }
}
3852
3853 #[tokio::test]
3856 async fn actuator_metrics_returns_http_stats() {
3857 let state = test_state();
3858 state.metrics().record("GET", "/test", 200, 10);
3859 state.metrics().record("POST", "/test", 500, 50);
3860
3861 let app = actuator_router(true).with_state(state);
3862 let resp = app
3863 .oneshot(
3864 Request::builder()
3865 .uri("/actuator/metrics")
3866 .body(Body::empty())
3867 .unwrap(),
3868 )
3869 .await
3870 .unwrap();
3871
3872 assert_eq!(resp.status(), StatusCode::OK);
3873 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3874 .await
3875 .unwrap();
3876 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3877 assert_eq!(json["http"]["requests_total"], 2);
3878 assert_eq!(json["http"]["by_status"]["2xx"], 1);
3879 assert_eq!(json["http"]["by_status"]["5xx"], 1);
3880 }
3881
3882 #[tokio::test]
3883 async fn actuator_metrics_available_in_nonsensitive_mode() {
3884 let app = actuator_router(false).with_state(test_state());
3885 let resp = app
3886 .oneshot(
3887 Request::builder()
3888 .uri("/actuator/metrics")
3889 .body(Body::empty())
3890 .unwrap(),
3891 )
3892 .await
3893 .unwrap();
3894 assert_eq!(resp.status(), StatusCode::OK);
3895 }
3896
/// When a database pool is attached to the state, `/actuator/metrics` includes
/// a `database` section.
#[tokio::test]
#[cfg(feature = "db")]
async fn actuator_metrics_returns_db_stats_when_pool_present() {
    use diesel_async::AsyncPgConnection;
    use diesel_async::pooled_connection::AsyncDieselConnectionManager;
    use diesel_async::pooled_connection::deadpool::Pool;

    let mut state = test_state();
    let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(
        "postgres://postgres:postgres@localhost:5432/postgres",
    );
    state.pool = Some(Pool::builder(manager).build().unwrap());

    let router = actuator_router(true).with_state(state);
    let request = Request::builder()
        .uri("/actuator/metrics")
        .body(Body::empty())
        .unwrap();
    let response = router.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = axum::body::to_bytes(response.into_body(), usize::MAX)
        .await
        .unwrap();
    let stats: serde_json::Value = serde_json::from_slice(&raw).unwrap();
    assert!(stats.get("database").is_some());
}
3932
3933 #[tokio::test]
3936 async fn actuator_configprops_returns_properties() {
3937 let app = actuator_router(true).with_state(test_state());
3938 let resp = app
3939 .oneshot(
3940 Request::builder()
3941 .uri("/actuator/configprops")
3942 .body(Body::empty())
3943 .unwrap(),
3944 )
3945 .await
3946 .unwrap();
3947
3948 assert_eq!(resp.status(), StatusCode::OK);
3949 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
3950 .await
3951 .unwrap();
3952 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
3953 assert_eq!(json["active_profile"], "dev");
3954 assert!(json["properties"].is_object());
3955 }
3956
3957 #[tokio::test]
3958 async fn actuator_configprops_hidden_in_nonsensitive_mode() {
3959 let app = actuator_router(false).with_state(test_state());
3960 let resp = app
3961 .oneshot(
3962 Request::builder()
3963 .uri("/actuator/configprops")
3964 .body(Body::empty())
3965 .unwrap(),
3966 )
3967 .await
3968 .unwrap();
3969 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
3970 }
3971
/// Tracking a sensitive key (`database.url`) stores the masked value `****`
/// instead of the raw connection string.
#[test]
fn configprops_redacts_sensitive_values() {
    let mut tracked = HashMap::new();
    let key = "database.url";
    let raw_value = "postgres://user:pass@host/db";

    ConfigProperties::track_property(&mut tracked, key, raw_value, "", "dev");

    assert_eq!(tracked[key].value, "****");
}
3984
/// A value equal to its default is attributed to the `default` source and kept verbatim.
#[test]
fn configprops_tracks_default_source() {
    let mut tracked = HashMap::new();
    ConfigProperties::track_property(&mut tracked, "server.port", "3000", "3000", "dev");

    let port = &tracked["server.port"];
    assert_eq!(port.source, "default");
    assert_eq!(port.value, "3000");
}
3992
/// Under the `dev` profile, a value that differs from its default is attributed
/// to `profile_default:dev`.
#[test]
fn configprops_tracks_profile_source() {
    let mut tracked = HashMap::new();
    ConfigProperties::track_property(&mut tracked, "log.level", "debug", "info", "dev");

    let level = &tracked["log.level"];
    assert_eq!(level.source, "profile_default:dev");
}
3999
4000 #[tokio::test]
4003 async fn actuator_loggers_get_returns_levels() {
4004 let app = actuator_router(true).with_state(test_state());
4005 let resp = app
4006 .oneshot(
4007 Request::builder()
4008 .uri("/actuator/loggers")
4009 .body(Body::empty())
4010 .unwrap(),
4011 )
4012 .await
4013 .unwrap();
4014
4015 assert_eq!(resp.status(), StatusCode::OK);
4016 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4017 .await
4018 .unwrap();
4019 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4020 assert_eq!(json["current_level"], "info");
4021 assert!(json["available_levels"].is_array());
4022 }
4023
4024 #[tokio::test]
4025 async fn actuator_loggers_put_changes_level() {
4026 let state = test_state();
4027 let app = actuator_router(true).with_state(state.clone());
4028 let resp = app
4029 .oneshot(
4030 Request::builder()
4031 .method("PUT")
4032 .uri("/actuator/loggers/autumn_web")
4033 .header("content-type", "application/json")
4034 .body(Body::from(r#"{"level": "debug"}"#))
4035 .unwrap(),
4036 )
4037 .await
4038 .unwrap();
4039
4040 assert_eq!(resp.status(), StatusCode::OK);
4041 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4042 .await
4043 .unwrap();
4044 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4045 assert_eq!(json["status"], "ok");
4046 assert_eq!(json["message"], "Logger 'autumn_web' set to 'debug'");
4047
4048 let overrides = state.log_levels().logger_overrides();
4049 assert_eq!(
4050 overrides.get("autumn_web").map(String::as_str),
4051 Some("debug")
4052 );
4053 }
4054
4055 #[tokio::test]
4056 async fn actuator_loggers_put_rejects_invalid_level() {
4057 let app = actuator_router(true).with_state(test_state());
4058 let resp = app
4059 .oneshot(
4060 Request::builder()
4061 .method("PUT")
4062 .uri("/actuator/loggers/autumn_web")
4063 .header("content-type", "application/json")
4064 .body(Body::from(r#"{"level": "banana"}"#))
4065 .unwrap(),
4066 )
4067 .await
4068 .unwrap();
4069
4070 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
4071 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4072 .await
4073 .unwrap();
4074 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4075 assert_eq!(json["status"], "error");
4076 }
4077
4078 #[tokio::test]
4079 async fn actuator_loggers_hidden_in_nonsensitive_mode() {
4080 let app = actuator_router(false).with_state(test_state());
4081 let resp = app
4082 .oneshot(
4083 Request::builder()
4084 .uri("/actuator/loggers")
4085 .body(Body::empty())
4086 .unwrap(),
4087 )
4088 .await
4089 .unwrap();
4090 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4091 }
4092
/// A per-logger override shows up in `logger_overrides`. The root level starts
/// at the value passed to `LogLevels::new`.
#[test]
fn log_levels_set_and_get() {
    let levels = LogLevels::new("info");
    assert_eq!(levels.current_level(), "info");

    // Only the stored override matters here, not the previous value returned.
    let _ = levels.set_logger_level("my_crate", "debug");
    assert_eq!(
        levels.logger_overrides().get("my_crate").map(String::as_str),
        Some("debug")
    );
}
4102
/// Setting the special `root` logger returns the previous root level and
/// changes `current_level`.
#[test]
fn log_levels_root_updates_current() {
    let levels = LogLevels::new("info");
    let previous_root = levels.set_logger_level("root", "trace");

    assert_eq!(previous_root.as_deref(), Some("info"));
    assert_eq!(levels.current_level(), "trace");
}
4110
4111 #[tokio::test]
4114 async fn actuator_prometheus_returns_metrics() {
4115 let state = test_state();
4116 state.metrics().record("GET", "/test", 200, 10);
4117 state.metrics().record("POST", "/test", 500, 50);
4118
4119 let app = actuator_router(true).with_state(state);
4120 let resp = app
4121 .oneshot(
4122 Request::builder()
4123 .uri("/actuator/prometheus")
4124 .body(Body::empty())
4125 .unwrap(),
4126 )
4127 .await
4128 .unwrap();
4129
4130 assert_eq!(resp.status(), StatusCode::OK);
4131 assert_eq!(
4132 resp.headers().get("content-type").unwrap(),
4133 "text/plain; version=0.0.4"
4134 );
4135
4136 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4137 .await
4138 .unwrap();
4139 let text = String::from_utf8(body.to_vec()).unwrap();
4140
4141 assert!(text.contains("# HELP autumn_http_requests_total Total number of HTTP requests"));
4142 assert!(text.contains("# TYPE autumn_http_requests_total counter"));
4143 assert!(text.contains("autumn_http_requests_total{version=\"stable\"} 2"));
4144
4145 assert!(text.contains("autumn_http_requests_active{version=\"stable\"} "));
4146 assert!(text.contains("autumn_http_responses_total{version=\"stable\",status=\"2xx\"} 1"));
4147 assert!(text.contains("autumn_http_responses_total{version=\"stable\",status=\"5xx\"} 1"));
4148
4149 assert!(text.contains("# TYPE autumn_http_request_duration_seconds summary"));
4151 assert!(text.contains(
4152 "autumn_http_request_duration_seconds{version=\"stable\",quantile=\"0.99\"}"
4153 ));
4154
4155 assert!(text.contains(
4156 "autumn_http_route_requests_total{version=\"stable\",method=\"GET\",route=\"/test\"} 1"
4157 ));
4158 assert!(text.contains(
4159 "autumn_http_route_requests_total{version=\"stable\",method=\"POST\",route=\"/test\"} 1"
4160 ));
4161
4162 assert!(text.contains("# HELP autumn_request_timeouts_total"));
4163 assert!(text.contains("# TYPE autumn_request_timeouts_total counter"));
4164 assert!(text.contains("autumn_request_timeouts_total{version=\"stable\"} 0"));
4165 }
4166
4167 #[tokio::test]
4168 async fn actuator_prometheus_labels_metrics_with_canary_version() {
4169 let mut state = test_state();
4172 state.deploy_version = crate::canary::CANARY.to_owned();
4173 state.metrics().record("GET", "/test", 200, 10);
4175 state.metrics().record("GET", "/test", 200, 20);
4176 state.metrics().record("GET", "/test", 500, 1200);
4177
4178 let app = actuator_router(true).with_state(state);
4179 let resp = app
4180 .oneshot(
4181 Request::builder()
4182 .uri("/actuator/prometheus")
4183 .body(Body::empty())
4184 .unwrap(),
4185 )
4186 .await
4187 .unwrap();
4188 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4189 .await
4190 .unwrap();
4191 let text = String::from_utf8(body.to_vec()).unwrap();
4192
4193 assert!(text.contains("autumn_http_requests_total{version=\"canary\"} 3"));
4194 assert!(text.contains("autumn_http_responses_total{version=\"canary\",status=\"5xx\"} 1"));
4195 assert!(!text.contains("version=\"stable\""));
4197
4198 let quantile = |q: &str| -> f64 {
4201 let needle = format!(
4202 "autumn_http_request_duration_seconds{{version=\"canary\",quantile=\"{q}\"}} "
4203 );
4204 let line = text
4205 .lines()
4206 .find(|l| l.starts_with(&needle))
4207 .unwrap_or_else(|| panic!("missing duration line for quantile {q}"));
4208 line[needle.len()..].trim().parse().unwrap()
4209 };
4210 let (p50, p95, p99) = (quantile("0.5"), quantile("0.95"), quantile("0.99"));
4211 assert!(p50 <= p95, "p50 ({p50}) must be <= p95 ({p95})");
4212 assert!(p95 <= p99, "p95 ({p95}) must be <= p99 ({p99})");
4213 assert!(
4215 (p99 - 1.2).abs() < f64::EPSILON,
4216 "p99 should be 1.2s, got {p99}"
4217 );
4218 }
4219
4220 #[tokio::test]
4221 async fn actuator_prometheus_available_in_nonsensitive_mode() {
4222 let app = actuator_router(false).with_state(test_state());
4223 let resp = app
4224 .oneshot(
4225 Request::builder()
4226 .uri("/actuator/prometheus")
4227 .body(Body::empty())
4228 .unwrap(),
4229 )
4230 .await
4231 .unwrap();
4232 assert_eq!(resp.status(), StatusCode::OK);
4233 }
4234
4235 #[tokio::test]
4236 async fn actuator_prometheus_available_when_export_enabled_and_nonsensitive() {
4237 let app = actuator_router_with_prefix("/actuator", false, true).with_state(test_state());
4240 let resp = app
4241 .clone()
4242 .oneshot(
4243 Request::builder()
4244 .uri("/actuator/prometheus")
4245 .body(Body::empty())
4246 .unwrap(),
4247 )
4248 .await
4249 .unwrap();
4250 assert_eq!(resp.status(), StatusCode::OK);
4251
4252 for sensitive_path in [
4254 "/actuator/env",
4255 "/actuator/configprops",
4256 "/actuator/loggers",
4257 "/actuator/tasks",
4258 "/actuator/jobs",
4259 "/actuator/ui/tasks",
4260 ] {
4261 let resp = app
4262 .clone()
4263 .oneshot(
4264 Request::builder()
4265 .uri(sensitive_path)
4266 .body(Body::empty())
4267 .unwrap(),
4268 )
4269 .await
4270 .unwrap();
4271 assert_eq!(
4272 resp.status(),
4273 StatusCode::NOT_FOUND,
4274 "{sensitive_path} should be unavailable when actuator is non-sensitive"
4275 );
4276 }
4277 }
4278
4279 #[tokio::test]
4280 async fn actuator_prometheus_unavailable_when_export_disabled() {
4281 let app = actuator_router_with_prefix("/actuator", false, false).with_state(test_state());
4283 let resp = app
4284 .oneshot(
4285 Request::builder()
4286 .uri("/actuator/prometheus")
4287 .body(Body::empty())
4288 .unwrap(),
4289 )
4290 .await
4291 .unwrap();
4292 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4293 }
4294
4295 #[tokio::test]
4296 async fn actuator_prometheus_unavailable_when_export_disabled_even_if_sensitive() {
4297 let app = actuator_router_with_prefix("/actuator", true, false).with_state(test_state());
4299 let resp = app
4300 .oneshot(
4301 Request::builder()
4302 .uri("/actuator/prometheus")
4303 .body(Body::empty())
4304 .unwrap(),
4305 )
4306 .await
4307 .unwrap();
4308 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4309 }
4310
/// `actuator_endpoint_paths` lists `/actuator/prometheus` only when Prometheus
/// export is enabled.
#[test]
fn actuator_endpoint_paths_respects_prometheus_toggle() {
    const PROMETHEUS_PATH: &str = "/actuator/prometheus";

    let enabled = actuator_endpoint_paths("/actuator", false, true);
    let disabled = actuator_endpoint_paths("/actuator", false, false);

    assert!(
        enabled.iter().any(|p| p == PROMETHEUS_PATH),
        "prometheus path should be listed when export is enabled: {enabled:?}"
    );
    assert!(
        !disabled.iter().any(|p| p == PROMETHEUS_PATH),
        "prometheus path should be absent when export is disabled: {disabled:?}"
    );
}
4325
4326 #[tokio::test]
4329 async fn actuator_tasks_returns_registered_tasks() {
4330 let state = test_state();
4331 state.task_registry().register("cleanup", "every 5m");
4332 state.task_registry().record_start("cleanup");
4333 state.task_registry().record_success("cleanup", 150);
4334
4335 let app = actuator_router(true).with_state(state);
4336 let resp = app
4337 .oneshot(
4338 Request::builder()
4339 .uri("/actuator/tasks")
4340 .body(Body::empty())
4341 .unwrap(),
4342 )
4343 .await
4344 .unwrap();
4345
4346 assert_eq!(resp.status(), StatusCode::OK);
4347 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4348 .await
4349 .unwrap();
4350 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4351 let task = &json["scheduled_tasks"]["cleanup"];
4352 assert_eq!(task["schedule"], "every 5m");
4353 assert_eq!(task["status"], "idle");
4354 assert_eq!(task["total_runs"], 1);
4355 assert_eq!(task["total_failures"], 0);
4356 assert_eq!(task["last_result"], "ok");
4357 assert_eq!(task["last_duration_ms"], 150);
4358 }
4359
4360 #[tokio::test]
4361 async fn actuator_jobs_returns_registered_jobs() {
4362 let state = test_state();
4363 state.job_registry().register("send_email");
4364 state.job_registry().record_enqueue("send_email");
4365 state.job_registry().record_start("send_email");
4366 state.job_registry().record_success("send_email");
4367
4368 let app = actuator_router(true).with_state(state);
4369 let resp = app
4370 .oneshot(
4371 Request::builder()
4372 .uri("/actuator/jobs")
4373 .body(Body::empty())
4374 .unwrap(),
4375 )
4376 .await
4377 .unwrap();
4378
4379 assert_eq!(resp.status(), StatusCode::OK);
4380 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4381 .await
4382 .unwrap();
4383 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4384 let job = &json["jobs"]["send_email"];
4385 assert_eq!(job["queued"], 0);
4386 assert_eq!(job["in_flight"], 0);
4387 assert_eq!(job["total_successes"], 1);
4388 assert_eq!(job["total_failures"], 0);
4389 }
4390
/// `/actuator/channels` reports per-channel subscriber and publish counters
/// after a single delivered broadcast.
#[cfg(feature = "ws")]
#[tokio::test]
async fn actuator_channels_returns_metrics() {
    let state = test_state();
    let mut subscriber = state.channels().subscribe("feed");
    state
        .channels()
        .broadcast()
        .publish("feed", "hello")
        .expect("publish should succeed");
    subscriber
        .try_recv()
        .expect("subscriber should receive payload");

    let router = actuator_router(true).with_state(state);
    let request = Request::builder()
        .uri("/actuator/channels")
        .body(Body::empty())
        .unwrap();
    let response = router.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = axum::body::to_bytes(response.into_body(), usize::MAX)
        .await
        .unwrap();
    let payload: serde_json::Value = serde_json::from_slice(&raw).unwrap();
    let feed = &payload["channels"]["feed"];
    assert_eq!(feed["subscriber_count"], 1);
    assert_eq!(feed["lifetime_publish_count"], 1);
    assert_eq!(feed["dropped_count"], 0);
    assert_eq!(feed["lagged_count"], 0);
}
4425
4426 #[tokio::test]
4427 async fn actuator_tasks_hidden_in_nonsensitive_mode() {
4428 let app = actuator_router(false).with_state(test_state());
4429 let resp = app
4430 .oneshot(
4431 Request::builder()
4432 .uri("/actuator/tasks")
4433 .body(Body::empty())
4434 .unwrap(),
4435 )
4436 .await
4437 .unwrap();
4438 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
4439 }
4440
/// A failed run returns the task to `idle`, bumps both counters, and keeps the
/// failure result and error message.
#[test]
fn task_registry_records_failure() {
    const TASK: &str = "my_task";

    let registry = TaskRegistry::new();
    registry.register(TASK, "cron 0 * * * *");
    registry.record_start(TASK);
    registry.record_failure(TASK, 200, "connection refused");

    let snapshot = registry.snapshot();
    let entry = &snapshot[TASK];
    assert_eq!(entry.status, "idle");
    assert_eq!(entry.total_runs, 1);
    assert_eq!(entry.total_failures, 1);
    assert_eq!(entry.last_result.as_deref(), Some("failed"));
    assert_eq!(entry.last_error.as_deref(), Some("connection refused"));
}
4456
/// A freshly created registry has no tasks in its snapshot.
#[test]
fn task_registry_empty_snapshot() {
    assert!(TaskRegistry::new().snapshot().is_empty());
}
/// Once 1000 logger overrides exist, setting a brand-new logger is refused:
/// it returns `None` and the override map is left unchanged.
#[test]
fn log_levels_rejects_new_key_at_capacity() {
    const CAPACITY: usize = 1000;

    let levels = LogLevels::new("info");
    for index in 0..CAPACITY {
        let _ = levels.set_logger_level(&format!("logger_{index}"), "debug");
    }

    let outcome = levels.set_logger_level(&format!("logger_{CAPACITY}"), "warn");
    assert_eq!(outcome, None);

    let overrides = levels.logger_overrides();
    assert_eq!(overrides.len(), CAPACITY);
    assert_eq!(overrides.get("logger_1000"), None);
}
4476
/// At capacity, an already-present logger can still be updated. The call
/// returns its previous level and does not grow the map.
#[test]
fn log_levels_accepts_existing_key_at_capacity() {
    const CAPACITY: usize = 1000;

    let levels = LogLevels::new("info");
    for index in 0..CAPACITY {
        let _ = levels.set_logger_level(&format!("logger_{index}"), "debug");
    }

    let previous = levels.set_logger_level("logger_999", "warn");
    assert_eq!(previous.as_deref(), Some("debug"));

    let overrides = levels.logger_overrides();
    assert_eq!(overrides.len(), CAPACITY);
    assert_eq!(overrides.get("logger_999").map(String::as_str), Some("warn"));
}
4497
/// Run and failure counters accumulate across successive successful and failed runs.
#[test]
fn task_registry_records_multiple_successes_and_failures() {
    let registry = TaskRegistry::new();
    registry.register("my_task", "cron * * * * *");

    // Two successful runs.
    for duration_ms in [100, 110] {
        registry.record_start("my_task");
        registry.record_success("my_task", duration_ms);
    }
    let after_successes = registry.snapshot();
    assert_eq!(after_successes["my_task"].total_runs, 2);
    assert_eq!(after_successes["my_task"].total_failures, 0);

    // One failed run on top of those.
    registry.record_start("my_task");
    registry.record_failure("my_task", 50, "failed");
    let after_failure = registry.snapshot();
    assert_eq!(after_failure["my_task"].total_runs, 3);
    assert_eq!(after_failure["my_task"].total_failures, 1);
}
4525
/// Under a profile other than `dev`/`prod`, a non-default value is attributed
/// to `autumn.toml`.
#[test]
fn configprops_tracks_custom_profile() {
    let mut tracked = HashMap::new();
    let profile = "custom_profile";
    ConfigProperties::track_property(&mut tracked, "log.level", "debug", "info", profile);

    let level = &tracked["log.level"];
    assert_eq!(level.source, "autumn.toml");
}
4538
/// Non-default values are attributed to `profile_default:<profile>` for both
/// the `dev` and `prod` profiles.
#[test]
fn configprops_tracks_dev_prod_profiles() {
    let mut tracked = HashMap::new();

    ConfigProperties::track_property(&mut tracked, "log.level", "debug", "info", "dev");
    assert_eq!(tracked["log.level"].source, "profile_default:dev");

    ConfigProperties::track_property(&mut tracked, "log.format", "json", "text", "prod");
    assert_eq!(tracked["log.format"].source, "profile_default:prod");
}
4548
/// A value identical to its default is attributed to `default`, even under a
/// profile that would otherwise claim it.
#[test]
fn configprops_returns_default_when_values_match() {
    let mut tracked = HashMap::new();
    ConfigProperties::track_property(&mut tracked, "log.level", "info", "info", "dev");

    let level = &tracked["log.level"];
    assert_eq!(level.source, "default");
}
4555
4556 #[tokio::test]
4557 async fn actuator_ui_dashboard_returns_html_or_unimplemented() {
4558 let app = actuator_router(true).with_state(test_state());
4559
4560 let res = app
4561 .oneshot(
4562 Request::builder()
4563 .uri("/actuator/ui")
4564 .body(Body::empty())
4565 .unwrap(),
4566 )
4567 .await
4568 .unwrap();
4569
4570 if cfg!(feature = "maud") {
4571 assert_eq!(res.status(), StatusCode::OK);
4572 assert_eq!(
4573 res.headers().get("content-type").unwrap(),
4574 "text/html; charset=utf-8"
4575 );
4576 } else {
4577 assert_eq!(res.status(), StatusCode::NOT_IMPLEMENTED);
4578 }
4579 }
4580
4581 #[tokio::test]
4582 async fn actuator_ui_metrics_returns_html_or_unimplemented() {
4583 let app = actuator_router(true).with_state(test_state());
4584
4585 let res = app
4586 .oneshot(
4587 Request::builder()
4588 .uri("/actuator/ui/metrics")
4589 .body(Body::empty())
4590 .unwrap(),
4591 )
4592 .await
4593 .unwrap();
4594
4595 if cfg!(feature = "maud") {
4596 assert_eq!(res.status(), StatusCode::OK);
4597 assert_eq!(
4598 res.headers().get("content-type").unwrap(),
4599 "text/html; charset=utf-8"
4600 );
4601 } else {
4602 assert_eq!(res.status(), StatusCode::NOT_IMPLEMENTED);
4603 }
4604 }
4605
4606 #[tokio::test]
4607 async fn actuator_ui_tasks_returns_html_or_unimplemented() {
4608 let app = actuator_router(true).with_state(test_state());
4609
4610 let res = app
4611 .oneshot(
4612 Request::builder()
4613 .uri("/actuator/ui/tasks")
4614 .body(Body::empty())
4615 .unwrap(),
4616 )
4617 .await
4618 .unwrap();
4619
4620 if cfg!(feature = "maud") {
4621 assert_eq!(res.status(), StatusCode::OK);
4622 assert_eq!(
4623 res.headers().get("content-type").unwrap(),
4624 "text/html; charset=utf-8"
4625 );
4626 } else {
4627 assert_eq!(res.status(), StatusCode::NOT_IMPLEMENTED);
4628 }
4629 }
4630
4631 #[tokio::test]
4632 async fn test_actuator_router_calls_prefix_variant() {
4633 let app = actuator_router(false).with_state(test_state());
4636 let resp = app
4637 .oneshot(
4638 Request::builder()
4639 .uri("/actuator/health")
4640 .body(Body::empty())
4641 .unwrap(),
4642 )
4643 .await
4644 .unwrap();
4645
4646 assert_eq!(resp.status(), StatusCode::OK);
4647 }
4648
4649 #[tokio::test]
4652 async fn actuator_a11y_returns_posture_json() {
4653 let app = actuator_router(false).with_state(test_state());
4654 let resp = app
4655 .oneshot(
4656 Request::builder()
4657 .uri("/actuator/a11y")
4658 .body(Body::empty())
4659 .unwrap(),
4660 )
4661 .await
4662 .unwrap();
4663
4664 assert_eq!(resp.status(), StatusCode::OK);
4665 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4666 .await
4667 .unwrap();
4668 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4669 assert!(json["lang_set"].is_boolean(), "{json}");
4670 assert!(json["skip_link_present"].is_boolean(), "{json}");
4671 assert!(json["landmark_regions_present"].is_boolean(), "{json}");
4672 }
4673
4674 #[tokio::test]
4675 async fn actuator_a11y_available_in_nonsensitive_mode() {
4676 let app = actuator_router(false).with_state(test_state());
4677 let resp = app
4678 .oneshot(
4679 Request::builder()
4680 .uri("/actuator/a11y")
4681 .body(Body::empty())
4682 .unwrap(),
4683 )
4684 .await
4685 .unwrap();
4686 assert_eq!(resp.status(), StatusCode::OK);
4687 }
4688
4689 #[tokio::test]
4690 async fn actuator_a11y_posture_default_values() {
4691 let app = actuator_router(true).with_state(test_state());
4692 let resp = app
4693 .oneshot(
4694 Request::builder()
4695 .uri("/actuator/a11y")
4696 .body(Body::empty())
4697 .unwrap(),
4698 )
4699 .await
4700 .unwrap();
4701
4702 assert_eq!(resp.status(), StatusCode::OK);
4703 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4704 .await
4705 .unwrap();
4706 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4707 assert_eq!(json["lang_set"], false, "{json}");
4709 assert_eq!(json["skip_link_present"], false, "{json}");
4710 assert_eq!(json["landmark_regions_present"], false, "{json}");
4711 }
4712
/// A posture with every check passing is compliant.
#[test]
fn a11y_posture_all_passing_is_compliant() {
    let fully_accessible = A11yPosture {
        landmark_regions_present: true,
        skip_link_present: true,
        lang_set: true,
    };
    assert!(fully_accessible.is_compliant());
}
4722
/// A missing `lang` attribute alone is enough to make the posture non-compliant.
#[test]
fn a11y_posture_missing_lang_is_not_compliant() {
    let missing_lang = A11yPosture {
        landmark_regions_present: true,
        skip_link_present: true,
        lang_set: false,
    };
    assert!(!missing_lang.is_compliant());
}
4732
/// `actuator_endpoint_paths` advertises `/actuator/a11y` even in non-sensitive mode.
///
/// This is a plain synchronous test. The check awaits nothing, so running it
/// under `#[tokio::test]` only spun up an unused async runtime.
#[test]
fn actuator_a11y_endpoint_paths_includes_a11y() {
    let paths = actuator_endpoint_paths("/actuator", false, true);
    assert!(
        paths.iter().any(|p| p == "/actuator/a11y"),
        "a11y path not found in: {paths:?}"
    );
}
4741
/// A registered source is collected under its registration name, and its
/// families come back unchanged.
#[test]
fn metrics_source_registry_registers_and_collects() {
    /// Reports a single unlabelled counter sample with value 42.
    struct FixedSource;

    impl MetricsSource for FixedSource {
        fn collect(&self) -> Vec<MetricFamily> {
            let sample = MetricSample {
                labels: Vec::new(),
                value: 42.0,
            };
            vec![MetricFamily {
                name: "plugin_requests_total".to_owned(),
                help: "Plugin request count".to_owned(),
                kind: MetricKind::Counter,
                samples: vec![sample],
            }]
        }
    }

    let registry = MetricsSourceRegistry::new();
    registry
        .register("myplugin", Arc::new(FixedSource))
        .unwrap();

    let collected = registry.collect_all();
    assert_eq!(collected.len(), 1);
    assert_eq!(collected[0].0, "myplugin");

    let family = &collected[0].1[0];
    assert_eq!(family.name, "plugin_requests_total");
    assert!((family.samples[0].value - 42.0).abs() < f64::EPSILON);
}
4772
/// Registering a second source under an existing name fails, and the error
/// message mentions the duplicate name.
#[test]
fn metrics_source_registry_rejects_duplicate_name() {
    /// Reports no metric families.
    struct EmptySource;

    impl MetricsSource for EmptySource {
        fn collect(&self) -> Vec<MetricFamily> {
            Vec::new()
        }
    }

    let registry = MetricsSourceRegistry::new();
    registry.register("dup", Arc::new(EmptySource)).unwrap();

    let second_attempt = registry.register("dup", Arc::new(EmptySource));
    assert!(second_attempt.is_err());
    let message = second_attempt.unwrap_err();
    assert!(message.contains("dup"));
}
4788
/// A source whose `collect` panics does not abort collection. It yields an
/// empty family list, and its error counter is incremented.
#[test]
fn metrics_source_registry_isolates_panicking_source() {
    /// Always panics when collected.
    struct PanickingSource;

    impl MetricsSource for PanickingSource {
        fn collect(&self) -> Vec<MetricFamily> {
            panic!("source panicked!")
        }
    }

    let registry = MetricsSourceRegistry::new();
    registry
        .register("panicker", Arc::new(PanickingSource))
        .unwrap();

    let collected = registry.collect_all();
    assert_eq!(collected.len(), 1);
    assert!(
        collected[0].1.is_empty(),
        "panicking source should yield no families"
    );

    assert_eq!(registry.error_counts().get("panicker"), Some(&1));
}
4814
4815 #[tokio::test]
4816 async fn prometheus_endpoint_includes_plugin_source_families() {
4817 struct GaugeSource;
4818 impl MetricsSource for GaugeSource {
4819 fn collect(&self) -> Vec<MetricFamily> {
4820 vec![MetricFamily {
4821 name: "plugin_queue_depth".to_string(),
4822 help: "Plugin queue depth".to_string(),
4823 kind: MetricKind::Gauge,
4824 samples: vec![MetricSample {
4825 labels: vec![("shard".to_string(), "a".to_string())],
4826 value: 7.0,
4827 }],
4828 }]
4829 }
4830 }
4831
4832 let state = test_state();
4833 state
4834 .metrics_source_registry
4835 .register("gauge_plugin", Arc::new(GaugeSource))
4836 .unwrap();
4837
4838 let app = actuator_router(true).with_state(state);
4839 let resp = app
4840 .oneshot(
4841 Request::builder()
4842 .uri("/actuator/prometheus")
4843 .body(Body::empty())
4844 .unwrap(),
4845 )
4846 .await
4847 .unwrap();
4848
4849 assert_eq!(resp.status(), StatusCode::OK);
4850 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4851 .await
4852 .unwrap();
4853 let text = String::from_utf8(body.to_vec()).unwrap();
4854
4855 assert!(
4856 text.contains("# HELP plugin_queue_depth Plugin queue depth"),
4857 "missing HELP line in:\n{text}"
4858 );
4859 assert!(
4860 text.contains("# TYPE plugin_queue_depth gauge"),
4861 "missing TYPE line in:\n{text}"
4862 );
4863 assert!(
4864 text.contains("plugin_queue_depth{shard=\"a\"} 7"),
4865 "missing sample line in:\n{text}"
4866 );
4867 }
4868
4869 #[tokio::test]
4870 async fn prometheus_endpoint_emits_error_counter_for_panicking_source() {
4871 struct PanickingSource;
4872 impl MetricsSource for PanickingSource {
4873 fn collect(&self) -> Vec<MetricFamily> {
4874 panic!("oops")
4875 }
4876 }
4877
4878 let state = test_state();
4879 state
4880 .metrics_source_registry
4881 .register("panic_src", Arc::new(PanickingSource))
4882 .unwrap();
4883
4884 let app = actuator_router(true).with_state(state);
4885 let resp = app
4886 .oneshot(
4887 Request::builder()
4888 .uri("/actuator/prometheus")
4889 .body(Body::empty())
4890 .unwrap(),
4891 )
4892 .await
4893 .unwrap();
4894
4895 assert_eq!(resp.status(), StatusCode::OK);
4896 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4897 .await
4898 .unwrap();
4899 let text = String::from_utf8(body.to_vec()).unwrap();
4900
4901 assert!(
4902 text.contains("autumn_metrics_source_errors_total{source=\"panic_src\"} 1"),
4903 "missing error counter in:\n{text}"
4904 );
4905 }
4906
4907 #[tokio::test]
4908 async fn metrics_endpoint_includes_sources_section() {
4909 struct SampleSource;
4910 impl MetricsSource for SampleSource {
4911 fn collect(&self) -> Vec<MetricFamily> {
4912 vec![MetricFamily {
4913 name: "custom_counter".to_string(),
4914 help: "A custom counter".to_string(),
4915 kind: MetricKind::Counter,
4916 samples: vec![MetricSample {
4917 labels: vec![],
4918 value: 5.0,
4919 }],
4920 }]
4921 }
4922 }
4923
4924 let state = test_state();
4925 state
4926 .metrics_source_registry
4927 .register("my_source", Arc::new(SampleSource))
4928 .unwrap();
4929
4930 let app = actuator_router(true).with_state(state);
4931 let resp = app
4932 .oneshot(
4933 Request::builder()
4934 .uri("/actuator/metrics")
4935 .body(Body::empty())
4936 .unwrap(),
4937 )
4938 .await
4939 .unwrap();
4940
4941 assert_eq!(resp.status(), StatusCode::OK);
4942 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
4943 .await
4944 .unwrap();
4945 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4946
4947 assert!(
4948 json.get("sources").is_some(),
4949 "metrics JSON missing 'sources' key"
4950 );
4951 assert!(
4952 json["sources"].get("my_source").is_some(),
4953 "sources missing 'my_source' key"
4954 );
4955 }
4956
4957 #[test]
4958 fn metrics_source_registry_preserves_insertion_order() {
4959 struct NamedSource(&'static str);
4960 impl MetricsSource for NamedSource {
4961 fn collect(&self) -> Vec<MetricFamily> {
4962 vec![MetricFamily {
4963 name: self.0.to_string(),
4964 help: String::new(),
4965 kind: MetricKind::Counter,
4966 samples: vec![],
4967 }]
4968 }
4969 }
4970
4971 let registry = MetricsSourceRegistry::new();
4972 registry
4973 .register("alpha", Arc::new(NamedSource("alpha_metric")))
4974 .unwrap();
4975 registry
4976 .register("beta", Arc::new(NamedSource("beta_metric")))
4977 .unwrap();
4978 registry
4979 .register("gamma", Arc::new(NamedSource("gamma_metric")))
4980 .unwrap();
4981
4982 let all = registry.collect_all();
4983 assert_eq!(all[0].0, "alpha");
4984 assert_eq!(all[1].0, "beta");
4985 assert_eq!(all[2].0, "gamma");
4986 }
4987
4988 #[test]
4991 fn escape_help_text_escapes_backslash_and_newline() {
4992 assert_eq!(escape_help_text("a\\b\nc"), "a\\\\b\\nc");
4993 assert_eq!(escape_help_text("plain"), "plain");
4994 assert_eq!(escape_help_text(""), "");
4995 }
4996
4997 #[test]
4998 fn format_sample_value_handles_special_floats() {
4999 assert_eq!(format_sample_value(f64::INFINITY), "+Inf");
5000 assert_eq!(format_sample_value(f64::NEG_INFINITY), "-Inf");
5001 assert_eq!(format_sample_value(f64::NAN), "NaN");
5002 assert_eq!(format_sample_value(0.0), "0");
5003 assert_eq!(format_sample_value(1.5), "1.5");
5004 }
5005
5006 #[test]
5007 fn is_valid_metric_name_accepts_valid_and_rejects_invalid() {
5008 assert!(is_valid_metric_name("http_requests_total"));
5009 assert!(is_valid_metric_name("_private"));
5010 assert!(is_valid_metric_name("ns:metric"));
5011 assert!(!is_valid_metric_name(""));
5012 assert!(!is_valid_metric_name("0starts_with_digit"));
5013 assert!(!is_valid_metric_name("has-hyphen"));
5014 }
5015
5016 #[test]
5017 fn is_valid_label_name_accepts_valid_and_rejects_invalid() {
5018 assert!(is_valid_label_name("shard"));
5019 assert!(is_valid_label_name("_internal"));
5020 assert!(is_valid_label_name("a1"));
5021 assert!(!is_valid_label_name(""));
5022 assert!(!is_valid_label_name("0starts_digit"));
5023 assert!(!is_valid_label_name("has-hyphen"));
5024 assert!(!is_valid_label_name("has.dot"));
5025 }
5026
5027 #[tokio::test]
5028 async fn prometheus_endpoint_skips_family_with_invalid_metric_name() {
5029 struct BadNameSource;
5030 impl MetricsSource for BadNameSource {
5031 fn collect(&self) -> Vec<MetricFamily> {
5032 vec![
5033 MetricFamily {
5034 name: "invalid-name".to_string(),
5035 help: "should be skipped".to_string(),
5036 kind: MetricKind::Counter,
5037 samples: vec![],
5038 },
5039 MetricFamily {
5040 name: "valid_name".to_string(),
5041 help: "should appear".to_string(),
5042 kind: MetricKind::Counter,
5043 samples: vec![MetricSample {
5044 labels: vec![],
5045 value: 1.0,
5046 }],
5047 },
5048 ]
5049 }
5050 }
5051
5052 let state = test_state();
5053 state
5054 .metrics_source_registry
5055 .register("bad_name_src", Arc::new(BadNameSource))
5056 .unwrap();
5057
5058 let app = actuator_router(true).with_state(state);
5059 let resp = app
5060 .oneshot(
5061 Request::builder()
5062 .uri("/actuator/prometheus")
5063 .body(Body::empty())
5064 .unwrap(),
5065 )
5066 .await
5067 .unwrap();
5068 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5069 .await
5070 .unwrap();
5071 let text = String::from_utf8(body.to_vec()).unwrap();
5072
5073 assert!(
5074 !text.contains("invalid-name"),
5075 "invalid family must be skipped:\n{text}"
5076 );
5077 assert!(
5078 text.contains("valid_name"),
5079 "valid family must appear:\n{text}"
5080 );
5081 }
5082
5083 #[tokio::test]
5084 async fn prometheus_endpoint_skips_sample_with_invalid_label_key() {
5085 struct DirtyLabelsSource;
5089 impl MetricsSource for DirtyLabelsSource {
5090 fn collect(&self) -> Vec<MetricFamily> {
5091 vec![MetricFamily {
5092 name: "dirty_labels_metric".to_string(),
5093 help: "test".to_string(),
5094 kind: MetricKind::Counter,
5095 samples: vec![
5096 MetricSample {
5097 labels: vec![
5098 ("good".to_string(), "a".to_string()),
5099 ("bad-key".to_string(), "b".to_string()),
5100 ],
5101 value: 1.0,
5102 },
5103 MetricSample {
5104 labels: vec![("good".to_string(), "a".to_string())],
5105 value: 2.0,
5106 },
5107 ],
5108 }]
5109 }
5110 }
5111
5112 let state = test_state();
5113 state
5114 .metrics_source_registry
5115 .register("dirty", Arc::new(DirtyLabelsSource))
5116 .unwrap();
5117
5118 let app = actuator_router(true).with_state(state);
5119 let resp = app
5120 .oneshot(
5121 Request::builder()
5122 .uri("/actuator/prometheus")
5123 .body(Body::empty())
5124 .unwrap(),
5125 )
5126 .await
5127 .unwrap();
5128 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5129 .await
5130 .unwrap();
5131 let text = String::from_utf8(body.to_vec()).unwrap();
5132
5133 assert!(
5135 !text.contains("dirty_labels_metric{good=\"a\"} 1"),
5136 "sample with invalid label key must be skipped:\n{text}"
5137 );
5138 assert!(
5140 text.contains("dirty_labels_metric{good=\"a\"} 2"),
5141 "clean sample must appear:\n{text}"
5142 );
5143 }
5144
5145 #[tokio::test]
5146 async fn prometheus_endpoint_deduplicates_label_keys() {
5147 struct DupLabelSource;
5148 impl MetricsSource for DupLabelSource {
5149 fn collect(&self) -> Vec<MetricFamily> {
5150 vec![MetricFamily {
5151 name: "dup_label_metric".to_string(),
5152 help: "test".to_string(),
5153 kind: MetricKind::Counter,
5154 samples: vec![MetricSample {
5155 labels: vec![
5156 ("env".to_string(), "prod".to_string()),
5157 ("env".to_string(), "staging".to_string()),
5158 ],
5159 value: 5.0,
5160 }],
5161 }]
5162 }
5163 }
5164
5165 let state = test_state();
5166 state
5167 .metrics_source_registry
5168 .register("dup_src", Arc::new(DupLabelSource))
5169 .unwrap();
5170
5171 let app = actuator_router(true).with_state(state);
5172 let resp = app
5173 .oneshot(
5174 Request::builder()
5175 .uri("/actuator/prometheus")
5176 .body(Body::empty())
5177 .unwrap(),
5178 )
5179 .await
5180 .unwrap();
5181 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5182 .await
5183 .unwrap();
5184 let text = String::from_utf8(body.to_vec()).unwrap();
5185
5186 assert!(
5188 text.contains("dup_label_metric{env=\"prod\"} 5"),
5189 "first env value must be kept:\n{text}"
5190 );
5191 assert!(
5192 !text.contains("staging"),
5193 "duplicate env key value must be dropped:\n{text}"
5194 );
5195 }
5196
5197 #[tokio::test]
5198 async fn prometheus_endpoint_escapes_help_text_and_formats_inf() {
5199 struct SpecialSource;
5200 impl MetricsSource for SpecialSource {
5201 fn collect(&self) -> Vec<MetricFamily> {
5202 vec![MetricFamily {
5203 name: "inf_gauge".to_string(),
5204 help: "has\\backslash and\nnewline".to_string(),
5205 kind: MetricKind::Gauge,
5206 samples: vec![
5207 MetricSample {
5208 labels: vec![("dir".to_string(), "pos".to_string())],
5209 value: f64::INFINITY,
5210 },
5211 MetricSample {
5212 labels: vec![("dir".to_string(), "neg".to_string())],
5213 value: f64::NEG_INFINITY,
5214 },
5215 ],
5216 }]
5217 }
5218 }
5219
5220 let state = test_state();
5221 state
5222 .metrics_source_registry
5223 .register("special", Arc::new(SpecialSource))
5224 .unwrap();
5225
5226 let app = actuator_router(true).with_state(state);
5227 let resp = app
5228 .oneshot(
5229 Request::builder()
5230 .uri("/actuator/prometheus")
5231 .body(Body::empty())
5232 .unwrap(),
5233 )
5234 .await
5235 .unwrap();
5236 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5237 .await
5238 .unwrap();
5239 let text = String::from_utf8(body.to_vec()).unwrap();
5240
5241 assert!(
5242 text.contains("# HELP inf_gauge has\\\\backslash and\\nnewline"),
5243 "help text must be escaped in:\n{text}"
5244 );
5245 assert!(
5246 text.contains("inf_gauge{dir=\"pos\"} +Inf"),
5247 "must render +Inf in:\n{text}"
5248 );
5249 assert!(
5250 text.contains("inf_gauge{dir=\"neg\"} -Inf"),
5251 "must render -Inf in:\n{text}"
5252 );
5253 }
5254
5255 #[tokio::test]
5256 async fn prometheus_endpoint_skips_duplicate_family_name_across_sources() {
5257 struct FirstSource;
5258 impl MetricsSource for FirstSource {
5259 fn collect(&self) -> Vec<MetricFamily> {
5260 vec![MetricFamily {
5261 name: "shared_counter".to_string(),
5262 help: "from first".to_string(),
5263 kind: MetricKind::Counter,
5264 samples: vec![MetricSample {
5265 labels: vec![],
5266 value: 1.0,
5267 }],
5268 }]
5269 }
5270 }
5271 struct SecondSource;
5272 impl MetricsSource for SecondSource {
5273 fn collect(&self) -> Vec<MetricFamily> {
5274 vec![MetricFamily {
5275 name: "shared_counter".to_string(),
5276 help: "from second".to_string(),
5277 kind: MetricKind::Counter,
5278 samples: vec![MetricSample {
5279 labels: vec![],
5280 value: 2.0,
5281 }],
5282 }]
5283 }
5284 }
5285
5286 let state = test_state();
5287 state
5288 .metrics_source_registry
5289 .register("first", Arc::new(FirstSource))
5290 .unwrap();
5291 state
5292 .metrics_source_registry
5293 .register("second", Arc::new(SecondSource))
5294 .unwrap();
5295
5296 let app = actuator_router(true).with_state(state);
5297 let resp = app
5298 .oneshot(
5299 Request::builder()
5300 .uri("/actuator/prometheus")
5301 .body(Body::empty())
5302 .unwrap(),
5303 )
5304 .await
5305 .unwrap();
5306 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5307 .await
5308 .unwrap();
5309 let text = String::from_utf8(body.to_vec()).unwrap();
5310
5311 let occurrences = text.matches("# HELP shared_counter").count();
5312 assert_eq!(
5313 occurrences, 1,
5314 "must emit exactly one HELP block for shared_counter:\n{text}"
5315 );
5316 }
5317
5318 #[tokio::test]
5319 async fn prometheus_endpoint_skips_builtin_name_collision() {
5320 struct ShadowSource;
5321 impl MetricsSource for ShadowSource {
5322 fn collect(&self) -> Vec<MetricFamily> {
5323 vec![MetricFamily {
5324 name: "autumn_http_requests_total".to_string(),
5325 help: "plugin trying to shadow built-in".to_string(),
5326 kind: MetricKind::Counter,
5327 samples: vec![MetricSample {
5328 labels: vec![],
5329 value: 999.0,
5330 }],
5331 }]
5332 }
5333 }
5334
5335 let state = test_state();
5336 state
5337 .metrics_source_registry
5338 .register("shadow", Arc::new(ShadowSource))
5339 .unwrap();
5340
5341 let app = actuator_router(true).with_state(state);
5342 let resp = app
5343 .oneshot(
5344 Request::builder()
5345 .uri("/actuator/prometheus")
5346 .body(Body::empty())
5347 .unwrap(),
5348 )
5349 .await
5350 .unwrap();
5351 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5352 .await
5353 .unwrap();
5354 let text = String::from_utf8(body.to_vec()).unwrap();
5355
5356 let occurrences = text.matches("# HELP autumn_http_requests_total").count();
5357 assert_eq!(
5358 occurrences, 1,
5359 "built-in must not be shadowed by plugin:\n{text}"
5360 );
5361 assert!(
5362 !text.contains("999"),
5363 "plugin shadow value must not appear:\n{text}"
5364 );
5365 }
5366
5367 #[tokio::test]
5368 async fn prometheus_endpoint_skips_builtin_duration_family_collision() {
5369 struct ShadowLatency;
5372 impl MetricsSource for ShadowLatency {
5373 fn collect(&self) -> Vec<MetricFamily> {
5374 vec![MetricFamily {
5375 name: "autumn_http_request_duration_seconds".to_string(),
5376 help: "plugin trying to shadow built-in latency".to_string(),
5377 kind: MetricKind::Gauge,
5378 samples: vec![MetricSample {
5379 labels: vec![],
5380 value: 999.0,
5381 }],
5382 }]
5383 }
5384 }
5385
5386 let state = test_state();
5387 state
5388 .metrics_source_registry
5389 .register("shadow_latency", Arc::new(ShadowLatency))
5390 .unwrap();
5391
5392 let app = actuator_router(true).with_state(state);
5393 let resp = app
5394 .oneshot(
5395 Request::builder()
5396 .uri("/actuator/prometheus")
5397 .body(Body::empty())
5398 .unwrap(),
5399 )
5400 .await
5401 .unwrap();
5402 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5403 .await
5404 .unwrap();
5405 let text = String::from_utf8(body.to_vec()).unwrap();
5406
5407 let occurrences = text
5408 .matches("# HELP autumn_http_request_duration_seconds")
5409 .count();
5410 assert_eq!(
5411 occurrences, 1,
5412 "built-in latency family must not be shadowed by plugin:\n{text}"
5413 );
5414 assert!(
5415 !text.contains("999"),
5416 "plugin shadow value must not appear:\n{text}"
5417 );
5418 }
5419
5420 #[tokio::test]
5421 async fn prometheus_endpoint_skips_duplicate_series_within_family() {
5422 struct DupSeriesSource;
5423 impl MetricsSource for DupSeriesSource {
5424 fn collect(&self) -> Vec<MetricFamily> {
5425 vec![MetricFamily {
5426 name: "dup_series_metric".to_string(),
5427 help: "test".to_string(),
5428 kind: MetricKind::Counter,
5429 samples: vec![
5430 MetricSample {
5431 labels: vec![("region".to_string(), "us".to_string())],
5432 value: 10.0,
5433 },
5434 MetricSample {
5435 labels: vec![("region".to_string(), "us".to_string())],
5436 value: 20.0,
5437 },
5438 ],
5439 }]
5440 }
5441 }
5442
5443 let state = test_state();
5444 state
5445 .metrics_source_registry
5446 .register("dup_series", Arc::new(DupSeriesSource))
5447 .unwrap();
5448
5449 let app = actuator_router(true).with_state(state);
5450 let resp = app
5451 .oneshot(
5452 Request::builder()
5453 .uri("/actuator/prometheus")
5454 .body(Body::empty())
5455 .unwrap(),
5456 )
5457 .await
5458 .unwrap();
5459 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5460 .await
5461 .unwrap();
5462 let text = String::from_utf8(body.to_vec()).unwrap();
5463
5464 assert!(
5466 text.contains("dup_series_metric{region=\"us\"} 10"),
5467 "first sample must appear:\n{text}"
5468 );
5469 assert!(
5470 !text.contains("dup_series_metric{region=\"us\"} 20"),
5471 "duplicate series must be dropped:\n{text}"
5472 );
5473 }
5474
5475 fn make_log_buffer_with_entries() -> crate::log::capture::LogBuffer {
5478 use crate::log::capture::{CapturedLogEntry, LogBuffer};
5479 use crate::log::filter::ParameterFilter;
5480 let buf = LogBuffer::new(100, ParameterFilter::default());
5481 buf.push(CapturedLogEntry {
5482 timestamp: "2024-01-01T00:00:00.000Z".to_owned(),
5483 level: "INFO".to_owned(),
5484 target: "myapp::orders".to_owned(),
5485 message: "order created".to_owned(),
5486 fields: {
5487 let mut m = serde_json::Map::new();
5488 m.insert("order_id".to_owned(), serde_json::json!("A-1001"));
5489 m
5490 },
5491 request_id: Some("req-abc".to_owned()),
5492 });
5493 buf.push(CapturedLogEntry {
5494 timestamp: "2024-01-01T00:00:01.000Z".to_owned(),
5495 level: "WARN".to_owned(),
5496 target: "myapp::payments".to_owned(),
5497 message: "payment slow".to_owned(),
5498 fields: serde_json::Map::new(),
5499 request_id: None,
5500 });
5501 buf.push(CapturedLogEntry {
5502 timestamp: "2024-01-01T00:00:02.000Z".to_owned(),
5503 level: "ERROR".to_owned(),
5504 target: "myapp::payments".to_owned(),
5505 message: "payment failed".to_owned(),
5506 fields: serde_json::Map::new(),
5507 request_id: None,
5508 });
5509 buf
5510 }
5511
5512 #[tokio::test]
5513 async fn green_logfile_returns_empty_when_capture_disabled() {
5514 let state = test_state(); let response =
5516 logfile_endpoint(State(state), axum::extract::Query(LogfileQuery::default()))
5517 .await
5518 .unwrap();
5519 let body = response.0;
5520 assert!(!body.capture_enabled);
5521 assert!(body.entries.is_empty());
5522 assert_eq!(body.total, 0);
5523 }
5524
5525 #[tokio::test]
5526 async fn green_logfile_returns_all_entries_when_no_filter() {
5527 let mut state = test_state();
5528 state.log_buffer = Some(make_log_buffer_with_entries());
5529
5530 let response =
5531 logfile_endpoint(State(state), axum::extract::Query(LogfileQuery::default()))
5532 .await
5533 .unwrap();
5534 let body = response.0;
5535 assert!(body.capture_enabled);
5536 assert_eq!(body.total, 3);
5537 assert_eq!(body.entries.len(), 3);
5538 assert_eq!(body.entries[0].level, "INFO");
5540 assert_eq!(body.entries[2].level, "ERROR");
5541 }
5542
5543 #[tokio::test]
5544 async fn green_logfile_level_filter_excludes_info_when_min_warn() {
5545 let mut state = test_state();
5546 state.log_buffer = Some(make_log_buffer_with_entries());
5547
5548 let response = logfile_endpoint(
5549 State(state),
5550 axum::extract::Query(LogfileQuery {
5551 level: Some("warn".to_owned()),
5552 limit: None,
5553 }),
5554 )
5555 .await
5556 .unwrap();
5557 let body = response.0;
5558 assert_eq!(body.entries.len(), 2);
5559 assert!(body.entries.iter().all(|e| e.level != "INFO"));
5560 }
5561
5562 #[tokio::test]
5563 async fn green_logfile_limit_returns_most_recent_n() {
5564 let mut state = test_state();
5565 state.log_buffer = Some(make_log_buffer_with_entries());
5566
5567 let response = logfile_endpoint(
5568 State(state),
5569 axum::extract::Query(LogfileQuery {
5570 level: None,
5571 limit: Some(2),
5572 }),
5573 )
5574 .await
5575 .unwrap();
5576 let body = response.0;
5577 assert_eq!(body.entries.len(), 2);
5578 assert_eq!(body.entries[0].level, "WARN");
5580 assert_eq!(body.entries[1].level, "ERROR");
5581 }
5582
5583 #[tokio::test]
5584 async fn green_logfile_sensitive_fields_in_response_are_served_scrubbed() {
5585 use crate::log::capture::{CapturedLogEntry, LogBuffer};
5586 use crate::log::filter::{FILTERED_PLACEHOLDER, ParameterFilter};
5587 let buf = LogBuffer::new(10, ParameterFilter::default());
5588 buf.push(CapturedLogEntry {
5590 timestamp: "2024-01-01T00:00:00.000Z".to_owned(),
5591 level: "INFO".to_owned(),
5592 target: "auth".to_owned(),
5593 message: "login attempt".to_owned(),
5594 fields: {
5595 let mut m = serde_json::Map::new();
5596 m.insert(
5597 "password".to_owned(),
5598 serde_json::Value::String(FILTERED_PLACEHOLDER.to_owned()),
5599 );
5600 m
5601 },
5602 request_id: None,
5603 });
5604
5605 let mut state = test_state();
5606 state.log_buffer = Some(buf);
5607
5608 let response =
5609 logfile_endpoint(State(state), axum::extract::Query(LogfileQuery::default()))
5610 .await
5611 .unwrap();
5612 let body = response.0;
5613 assert_eq!(
5614 body.entries[0].fields["password"].as_str().unwrap(),
5615 FILTERED_PLACEHOLDER,
5616 "sensitive value must remain scrubbed in the response"
5617 );
5618 }
5619
5620 #[tokio::test]
5621 async fn green_logfile_invalid_level_returns_400() {
5622 let state = test_state();
5623 let result = logfile_endpoint(
5624 State(state),
5625 axum::extract::Query(LogfileQuery {
5626 level: Some("warning".to_owned()), limit: None,
5628 }),
5629 )
5630 .await;
5631 let (status, _body) = result.unwrap_err();
5632 assert_eq!(status, StatusCode::BAD_REQUEST);
5633 }
5634
5635 #[tokio::test]
5636 async fn green_logfile_endpoint_in_sensitive_router() {
5637 let state = test_state();
5639 let app = actuator_router::<TestActuatorState>(true).with_state(state);
5640 let resp = app
5641 .oneshot(
5642 Request::builder()
5643 .uri("/actuator/logfile")
5644 .body(Body::empty())
5645 .unwrap(),
5646 )
5647 .await
5648 .unwrap();
5649 assert_eq!(resp.status(), StatusCode::OK);
5650 }
5651
5652 #[tokio::test]
5653 async fn green_logfile_endpoint_not_in_non_sensitive_router() {
5654 let state = test_state();
5656 let app = actuator_router::<TestActuatorState>(false).with_state(state);
5657 let resp = app
5658 .oneshot(
5659 Request::builder()
5660 .uri("/actuator/logfile")
5661 .body(Body::empty())
5662 .unwrap(),
5663 )
5664 .await
5665 .unwrap();
5666 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
5667 }
5668
5669 #[tokio::test]
5670 async fn green_logfile_structured_fields_preserved() {
5671 let mut state = test_state();
5672 state.log_buffer = Some(make_log_buffer_with_entries());
5673
5674 let response =
5675 logfile_endpoint(State(state), axum::extract::Query(LogfileQuery::default()))
5676 .await
5677 .unwrap();
5678 let body = response.0;
5679 let first = &body.entries[0];
5680 assert_eq!(first.target, "myapp::orders");
5681 assert_eq!(first.fields["order_id"].as_str().unwrap(), "A-1001");
5682 assert_eq!(first.request_id.as_deref(), Some("req-abc"));
5683 }
5684}
5685
#[cfg(test)]
mod health_indicator_tests {
    use super::*;

    /// Indicator that always reports `UP`.
    struct AlwaysUp;
    impl HealthIndicator for AlwaysUp {
        fn check(&self) -> futures::future::BoxFuture<'_, HealthCheckOutput> {
            Box::pin(async { HealthCheckOutput::up() })
        }
    }

    /// Indicator that always reports `DOWN`.
    struct AlwaysDown;
    impl HealthIndicator for AlwaysDown {
        fn check(&self) -> futures::future::BoxFuture<'_, HealthCheckOutput> {
            Box::pin(async { HealthCheckOutput::down() })
        }
    }

    /// Indicator name under which the registry reports the breaker created in
    /// `test_circuit_breakers_in_health_indicator_registry`.
    const BREAKER_INDICATOR: &str = "circuit_breaker.actuator_test_breaker";

    /// Clears the process-wide circuit-breaker registry when dropped.
    ///
    /// The registry is shared by every test in the process. Without this guard
    /// a failing assertion would leave the test breaker registered and could
    /// break unrelated tests that run later.
    struct ClearBreakersOnDrop;
    impl Drop for ClearBreakersOnDrop {
        fn drop(&mut self) {
            crate::circuit_breaker::global_registry().clear();
        }
    }

    #[test]
    fn health_status_as_str_values() {
        assert_eq!(HealthStatus::Up.as_str(), "UP");
        assert_eq!(HealthStatus::Down.as_str(), "DOWN");
        assert_eq!(HealthStatus::OutOfService.as_str(), "OUT_OF_SERVICE");
        assert_eq!(HealthStatus::Unknown.as_str(), "UNKNOWN");
    }

    #[test]
    fn health_status_is_healthy() {
        assert!(HealthStatus::Up.is_healthy());
        assert!(HealthStatus::Unknown.is_healthy());
        assert!(!HealthStatus::Down.is_healthy());
        assert!(!HealthStatus::OutOfService.is_healthy());
    }

    /// Precedence pinned below: Down > OutOfService > Unknown > Up; an empty
    /// set aggregates to Up.
    #[test]
    fn aggregate_status_precedence() {
        assert_eq!(
            HealthIndicatorRegistry::aggregate_status(&[HealthStatus::Up]),
            HealthStatus::Up
        );
        assert_eq!(
            HealthIndicatorRegistry::aggregate_status(&[HealthStatus::Up, HealthStatus::Unknown]),
            HealthStatus::Unknown
        );
        assert_eq!(
            HealthIndicatorRegistry::aggregate_status(&[
                HealthStatus::Unknown,
                HealthStatus::OutOfService
            ]),
            HealthStatus::OutOfService
        );
        assert_eq!(
            HealthIndicatorRegistry::aggregate_status(&[
                HealthStatus::OutOfService,
                HealthStatus::Down
            ]),
            HealthStatus::Down
        );
        assert_eq!(
            HealthIndicatorRegistry::aggregate_status(&[]),
            HealthStatus::Up
        );
    }

    #[tokio::test]
    async fn registry_run_all_collects_results() {
        let registry = HealthIndicatorRegistry::new();
        registry
            .register("svc_a", IndicatorGroup::Readiness, Arc::new(AlwaysUp))
            .unwrap();
        registry
            .register("svc_b", IndicatorGroup::HealthOnly, Arc::new(AlwaysDown))
            .unwrap();

        // `any` rather than an exact length: `run_all` may also report
        // globally registered circuit breakers.
        let results = registry.run_all().await;
        assert!(
            results
                .iter()
                .any(|r| r.name == "svc_a" && r.output.status == HealthStatus::Up)
        );
        assert!(
            results
                .iter()
                .any(|r| r.name == "svc_b" && r.output.status == HealthStatus::Down)
        );
    }

    #[tokio::test]
    async fn registry_run_readiness_filters_health_only() {
        let registry = HealthIndicatorRegistry::new();
        registry
            .register("probe_check", IndicatorGroup::Readiness, Arc::new(AlwaysUp))
            .unwrap();
        registry
            .register(
                "health_only",
                IndicatorGroup::HealthOnly,
                Arc::new(AlwaysDown),
            )
            .unwrap();

        let results = registry.run_readiness().await;
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].name, "probe_check");
    }

    #[tokio::test]
    async fn timed_out_indicator_reports_unknown_with_flag() {
        /// Sleeps far longer than its own 5 ms timeout.
        struct SlowIndicator;
        impl HealthIndicator for SlowIndicator {
            fn check(&self) -> futures::future::BoxFuture<'_, HealthCheckOutput> {
                Box::pin(async {
                    tokio::time::sleep(std::time::Duration::from_secs(30)).await;
                    HealthCheckOutput::up()
                })
            }
            fn timeout_ms(&self) -> u64 {
                5
            }
        }
        let registry = HealthIndicatorRegistry::new();
        registry
            .register("slow", IndicatorGroup::Readiness, Arc::new(SlowIndicator))
            .unwrap();
        let results = registry.run_all().await;
        let slow_res = results
            .iter()
            .find(|r| r.name == "slow")
            .expect("slow indicator not found");
        assert_eq!(slow_res.output.status, HealthStatus::Unknown);
        assert_eq!(
            slow_res.output.details.get("timed_out"),
            Some(&serde_json::Value::Bool(true))
        );
    }

    #[tokio::test]
    // The std `TEST_LOCK` guard is deliberately held across `.await`s: it
    // serialises every test that mutates the global circuit-breaker registry.
    #[allow(clippy::await_holding_lock)]
    async fn test_circuit_breakers_in_health_indicator_registry() {
        let _lock = crate::circuit_breaker::TEST_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        crate::circuit_breaker::global_registry().clear();
        // Declared after `_lock` so it is dropped first. The registry is then
        // cleared while the lock is still held, even if an assertion panics.
        let _cleanup = ClearBreakersOnDrop;

        let registry = HealthIndicatorRegistry::new();
        let breaker = crate::circuit_breaker::global_registry().get_or_create(
            "actuator_test_breaker",
            crate::circuit_breaker::CircuitBreakerPolicy {
                failure_ratio_threshold: 0.5,
                sample_window: std::time::Duration::from_secs(10),
                minimum_sample_count: 2,
                open_duration: std::time::Duration::from_secs(60),
                half_open_trial_count: 2,
            },
        );

        // Closed breaker: reported in the health-only group as UP.
        let results = registry.run_all().await;
        let closed = results
            .iter()
            .find(|r| r.name == BREAKER_INDICATOR)
            .expect("Should find circuit breaker in run_all");
        assert_eq!(closed.group, IndicatorGroup::HealthOnly);
        assert_eq!(closed.output.status, HealthStatus::Up);
        assert_eq!(closed.output.details.get("state").unwrap(), "CLOSED");

        // Two failures meet the minimum sample count and exceed the 0.5 ratio,
        // so the breaker opens.
        breaker.after_call(false);
        breaker.after_call(false);
        assert_eq!(breaker.state(), crate::circuit_breaker::CircuitState::Open);

        let results = registry.run_all().await;
        let open = results
            .iter()
            .find(|r| r.name == BREAKER_INDICATOR)
            .expect("open breaker missing from run_all");
        assert_eq!(open.output.status, HealthStatus::Down);
        assert_eq!(open.output.details.get("state").unwrap(), "OPEN");

        // Force HALF_OPEN directly instead of waiting out `open_duration`.
        {
            let mut inner = breaker
                .inner
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            inner.state = crate::circuit_breaker::CircuitState::HalfOpen;
            inner.half_open_in_flight = 0;
            inner.half_open_successes = 0;
            inner.half_open_failures = 0;
        }
        assert_eq!(
            breaker.state(),
            crate::circuit_breaker::CircuitState::HalfOpen
        );

        let results = registry.run_all().await;
        let half_open = results
            .iter()
            .find(|r| r.name == BREAKER_INDICATOR)
            .expect("half-open breaker missing from run_all");
        assert_eq!(half_open.output.status, HealthStatus::Down);
        assert_eq!(
            half_open.output.details.get("state").unwrap(),
            "HALF_OPEN"
        );

        // Breakers are health-only and must never affect readiness.
        let readiness_results = registry.run_readiness().await;
        let found_readiness = readiness_results
            .iter()
            .find(|r| r.name == BREAKER_INDICATOR);
        assert!(
            found_readiness.is_none(),
            "Should NOT find circuit breaker in run_readiness"
        );
    }
}
5896
#[cfg(test)]
mod havoc_proptest {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(1))]
        /// Flooding `LogLevels` with 5000 arbitrary logger names must not grow
        /// the override table beyond 1000 entries.
        #[test]
        fn log_levels_memory_exhaustion(names in proptest::collection::vec(".*", 5000)) {
            let levels = LogLevels::new("info");
            for name in names {
                // The per-call result is ignored on purpose: this test only
                // cares about the final size of the override table.
                let _ = levels.set_logger_level(&name, "debug");
            }
            // `prop_assert!` reports a proptest failure (with shrinking)
            // instead of unwinding through the runner via a panic.
            prop_assert!(
                levels.logger_overrides().len() <= 1000,
                "Memory leak: unbounded loggers inserted"
            );
        }
    }
}
5914
/// Renders the actuator dashboard shell page.
///
/// The two cards load their content from `ui/metrics` and `ui/tasks` via htmx,
/// on page load and then every two seconds. The page carries the three
/// accessibility markers checked by `A11yPosture`: a `lang` attribute on
/// `<html>`, a keyboard skip link, and landmark regions (`header` / `main`).
#[cfg(all(feature = "maud", feature = "htmx"))]
async fn ui_dashboard() -> impl IntoResponse {
    let html = maud::html! {
        (maud::DOCTYPE)
        html lang="en" {
            head {
                meta charset="utf-8";
                meta name="viewport" content="width=device-width, initial-scale=1";
                title { "Autumn Actuator Dashboard" }
                script src="/static/js/htmx.min.js" {}
                style {
                    (crate::ui::tokens::TOKENS_CSS)
                    "body { font-family: var(--font-family); background: var(--bg); color: var(--text); margin: 0; padding: 2rem; }"
                    "h1 { font-size: 1.5rem; font-weight: 600; margin-bottom: 1.5rem; }"
                    ".grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 1.5rem; }"
                    ".card { background: var(--surface); padding: 1.5rem; border-radius: var(--radius); box-shadow: var(--shadow); }"
                    ".card h2 { font-size: 1.125rem; font-weight: 500; margin-top: 0; margin-bottom: 1rem; border-bottom: 1px solid var(--border); padding-bottom: 0.5rem; }"
                    ".stat { display: flex; justify-content: space-between; margin-bottom: 0.5rem; }"
                    ".stat-label { color: var(--text-muted); }"
                    ".stat-value { font-weight: 500; }"
                    ".task-item { border: 1px solid var(--border); padding: 0.75rem; border-radius: 0.375rem; margin-bottom: 0.75rem; }"
                    ".task-name { font-weight: 600; display: block; margin-bottom: 0.25rem; }"
                    ".task-meta { font-size: 0.875rem; color: var(--text-muted); }"
                    ".badge { display: inline-block; padding: 0.125rem 0.375rem; border-radius: 9999px; font-size: 0.75rem; font-weight: 500; }"
                    ".badge-green { background: #dcfce7; color: #166534; }"
                    ".badge-gray { background: #f3f4f6; color: #374151; }"
                    ".badge-red { background: #fee2e2; color: #991b1b; }"
                    // The skip link stays off-screen until it receives keyboard focus.
                    ".skip-link { position: absolute; left: -9999px; top: 0; }"
                    ".skip-link:focus { left: 1rem; top: 1rem; z-index: 100; padding: 0.5rem 1rem; background: var(--surface); color: var(--text); border-radius: var(--radius); }"
                }
            }
            body {
                a class="skip-link" href="#main" { "Skip to main content" }
                header {
                    // The emoji is decorative; hide it from screen readers.
                    h1 { span aria-hidden="true" { "🍂" } " Autumn Actuator Dashboard" }
                }
                main id="main" {
                    div class="grid" {
                        div class="card" hx-get="ui/metrics" hx-trigger="load, every 2s" {
                            "Loading metrics..."
                        }
                        div class="card" hx-get="ui/tasks" hx-trigger="load, every 2s" {
                            "Loading tasks..."
                        }
                    }
                }
            }
        }
    };
    (
        [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")],
        html.into_string(),
    )
}
5964
5965#[cfg(not(all(feature = "maud", feature = "htmx")))]
5966async fn ui_dashboard() -> impl IntoResponse {
5967 (
5968 StatusCode::NOT_IMPLEMENTED,
5969 "Maud feature is required for the UI dashboard",
5970 )
5971}
5972
/// htmx fragment for the "System Metrics" dashboard card.
///
/// Takes a fresh metrics snapshot on every poll and renders uptime, total and
/// active request counts, and p95/p99 latency in milliseconds.
#[cfg(all(feature = "maud", feature = "htmx"))]
async fn ui_metrics<S: ProvideActuatorState>(State(state): State<S>) -> impl IntoResponse {
    let snapshot = state.metrics().snapshot();
    let uptime = state.uptime_display();
    let http_stats = &snapshot.http;
    let latency = &http_stats.latency_ms;

    let fragment = maud::html! {
        h2 { "System Metrics" }
        div class="stat" {
            span class="stat-label" { "Uptime" }
            span class="stat-value" { (uptime) }
        }
        div class="stat" {
            span class="stat-label" { "Total Requests" }
            span class="stat-value" { (http_stats.requests_total) }
        }
        div class="stat" {
            span class="stat-label" { "Active Requests" }
            span class="stat-value" { (http_stats.requests_active) }
        }
        div class="stat" {
            span class="stat-label" { "P95 Latency" }
            span class="stat-value" { (latency.p95) " ms" }
        }
        div class="stat" {
            span class="stat-label" { "P99 Latency" }
            span class="stat-value" { (latency.p99) " ms" }
        }
    };

    let headers = [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")];
    (headers, fragment.into_string())
}
6006
6007#[cfg(not(all(feature = "maud", feature = "htmx")))]
6008async fn ui_metrics<S: ProvideActuatorState>() -> impl IntoResponse {
6009 (
6010 StatusCode::NOT_IMPLEMENTED,
6011 "Maud feature is required for the UI dashboard",
6012 )
6013}
6014
/// htmx fragment for the "Background Tasks" dashboard card.
///
/// Lists every task from the registry snapshot with a Running/Idle badge (Idle
/// for any status other than `"running"`), its run count, and a failure badge
/// when it has failed at least once. Shows a placeholder when no tasks exist.
#[cfg(all(feature = "maud", feature = "htmx"))]
async fn ui_tasks<S: ProvideActuatorState>(State(state): State<S>) -> impl IntoResponse {
    let snapshot = state.task_registry().snapshot();

    let fragment = maud::html! {
        h2 { "Background Tasks" }
        @if snapshot.is_empty() {
            p class="stat-label" { "No tasks registered." }
        } @else {
            @for (task_name, info) in snapshot.iter() {
                @let running = info.status == "running";
                div class="task-item" {
                    span class="task-name" { (task_name) }
                    div class="task-meta" {
                        @if running {
                            span class="badge badge-green" { "Running" }
                        } @else {
                            span class="badge badge-gray" { "Idle" }
                        }
                        " "
                        "Runs: " (info.total_runs)
                        @if info.total_failures > 0 {
                            " " span class="badge badge-red" { "Failures: " (info.total_failures) }
                        }
                    }
                }
            }
        }
    };

    let headers = [(axum::http::header::CONTENT_TYPE, "text/html; charset=utf-8")];
    (headers, fragment.into_string())
}
6048
6049#[cfg(not(all(feature = "maud", feature = "htmx")))]
6050async fn ui_tasks<S: ProvideActuatorState>() -> impl IntoResponse {
6051 (
6052 StatusCode::NOT_IMPLEMENTED,
6053 "Maud feature is required for the UI dashboard",
6054 )
6055}