Skip to main content

autumn_web/
state.rs

1//! Shared application state.
2//!
3//! This module defines [`AppState`], the core state object passed to all
4//! Axum route handlers. It contains framework-managed resources like the
5//! database connection pool, metrics collector, and WebSocket channels.
6//!
7//! Handlers typically don't extract `AppState` directly. Instead, they use
8//! specialized extractors like [`Db`](crate::Db) which pull what they need
9//! from the state. However, custom extractors can access the state via
10//! `crate::extract::State<AppState>`.
11
12use std::any::{Any, TypeId};
13use std::collections::HashMap;
14use std::sync::Arc;
15
16use crate::cache::Cache;
17use crate::time::{ClockSource, SystemClock};
18
19/// Newtype wrapper used to store the global cache in the extension map so that
20/// `set_cache` (called from startup hooks) is visible to all `AppState` clones.
21pub struct GlobalCacheEntry(pub Arc<dyn Cache>);
22
23use crate::actuator;
24use crate::authorization::{ForbiddenResponse, Policy, PolicyRegistry, Scope};
25#[cfg(feature = "ws")]
26use crate::channels::Channels;
27#[cfg(feature = "db")]
28use crate::db::DbState;
29use crate::middleware;
30#[cfg(feature = "presence")]
31use crate::presence::Presence;
32use crate::probe;
33#[cfg(feature = "ws")]
34use tokio_util::sync::CancellationToken;
35
36/// Shared application state passed to all route handlers.
37///
38/// Holds framework-managed resources such as the database connection pool.
39/// Axum requires handler state to be [`Clone`], so internal resources use
40/// `Arc` or are already cheaply cloneable (`deadpool::Pool` is `Arc`-wrapped
41/// internally).
42///
43/// This struct is normally constructed by [`crate::app::AppBuilder::run`] and
44/// should not need to be created manually. It is public so that custom
45/// Axum extractors can access framework resources via
46/// `State<AppState>`.
47///
48/// # Examples
49///
50/// ```rust
51/// use autumn_web::AppState;
52///
53/// // State without a database (e.g., for testing)
54/// let state = AppState::for_test().with_profile("dev");
55/// ```
56#[derive(Clone)]
57#[non_exhaustive]
58pub struct AppState {
59    /// Runtime-managed typed extensions installed by integrations after the app
60    /// state has been constructed.
61    pub(crate) extensions: Arc<std::sync::RwLock<HashMap<TypeId, Arc<dyn Any + Send + Sync>>>>,
62
63    /// Primary/write database connection pool, or `None` when no
64    /// `database.primary_url` or legacy `database.url` is configured.
65    #[cfg(feature = "db")]
66    pub(crate) pool:
67        Option<diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>,
68
69    /// Read-replica connection pool, or `None` when no replica role is configured.
70    #[cfg(feature = "db")]
71    pub(crate) replica_pool:
72        Option<diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>,
73
74    /// Active profile name (e.g., "dev", "prod", "staging").
75    pub(crate) profile: Option<String>,
76
77    /// When the application started. Used for uptime calculation.
78    pub(crate) started_at: std::time::Instant,
79
80    /// Whether the health endpoint should include detailed info.
81    pub(crate) health_detailed: bool,
82
83    /// Probe lifecycle state for liveness, readiness, and startup endpoints.
84    pub(crate) probes: probe::ProbeState,
85
86    /// In-memory metrics collector for the `/actuator/metrics` endpoint.
87    pub(crate) metrics: middleware::MetricsCollector,
88
89    /// Runtime log level state for the `/actuator/loggers` endpoint.
90    pub(crate) log_levels: actuator::LogLevels,
91
92    /// Scheduled task registry for the `/actuator/tasks` endpoint.
93    pub(crate) task_registry: actuator::TaskRegistry,
94    /// Job registry for the `/actuator/jobs` endpoint.
95    pub(crate) job_registry: actuator::JobRegistry,
96
97    /// Resolved config properties with source tracking for `/actuator/configprops`.
98    pub(crate) config_props: actuator::ConfigProperties,
99
100    /// Registry of plugin-contributed metrics sources, populated by
101    /// [`crate::app::AppBuilder::metrics_source`].
102    pub(crate) metrics_source_registry: actuator::MetricsSourceRegistry,
103
104    /// Registry of custom health indicators, populated by
105    /// [`crate::app::AppBuilder::health_indicator`].
106    pub(crate) health_indicator_registry: actuator::HealthIndicatorRegistry,
107
108    /// Named broadcast channel registry for real-time messaging.
109    ///
110    /// Available when the `ws` feature is enabled. Use
111    /// [`channels()`](Self::channels) for convenient access.
112    #[cfg(feature = "ws")]
113    pub(crate) channels: Channels,
114
115    /// Distributed presence tracker layered on top of [`Channels`].
116    ///
117    /// Available when the `presence` feature is enabled. Use
118    /// [`presence()`](Self::presence) for convenient access.
119    #[cfg(feature = "presence")]
120    pub(crate) presence: Presence,
121
122    /// Cancellation token signalled during graceful shutdown.
123    ///
124    /// WebSocket handlers receive a child token so they can clean up
125    /// when the server is stopping.
126    #[cfg(feature = "ws")]
127    pub(crate) shutdown: CancellationToken,
128
129    /// Per-resource policy + scope registry used by `#[authorize]`
130    /// and `#[repository(policy = ...)]`-generated handlers.
131    pub(crate) policy_registry: PolicyRegistry,
132
133    /// HTTP status returned when a [`Policy`] denies a record-level
134    /// action. Defaults to `404 Not Found` to mirror Rails / Phoenix
135    /// posture and avoid leaking record existence.
136    pub(crate) forbidden_response: ForbiddenResponse,
137
138    /// Session key the `#[authorize]` machinery reads to resolve the
139    /// authenticated user id for the
140    /// [`PolicyContext`](crate::authorization::PolicyContext).
141    /// Mirrors `[auth] session_key` (default: `"user_id"`).
142    pub(crate) auth_session_key: String,
143
144    /// Shared application cache backend. `None` means no global cache has been
145    /// registered; `#[cached]` will fall back to its per-function Moka store.
146    pub(crate) shared_cache: Option<Arc<dyn Cache>>,
147
148    /// Injected wall-clock. Defaults to [`SystemClock`] (real time).
149    /// Tests override via [`crate::test::TestApp::with_clock`].
150    pub(crate) clock: Arc<dyn ClockSource>,
151}
152
153impl AppState {
154    /// Install or replace a typed runtime extension.
155    ///
156    /// Integrations use this to publish typed runtime resources, such as
157    /// background-worker handles or dedicated storage pools, after startup.
158    ///
159    /// # Panics
160    ///
161    /// Panics if the internal extension map mutex is poisoned.
162    pub fn insert_extension<T>(&self, value: T)
163    where
164        T: Any + Send + Sync + 'static,
165    {
166        self.extensions
167            .write()
168            .expect("app state extension lock poisoned")
169            .insert(TypeId::of::<T>(), Arc::new(value));
170    }
171
172    /// Borrow a typed runtime extension if it has been installed.
173    ///
174    /// The returned [`Arc`] is cloned out of the internal registry so callers
175    /// do not hold the state mutex while using the value.
176    ///
177    /// # Panics
178    ///
179    /// Panics if the internal extension map mutex is poisoned.
180    #[must_use]
181    pub fn extension<T>(&self) -> Option<Arc<T>>
182    where
183        T: Any + Send + Sync + 'static,
184    {
185        self.extensions
186            .read()
187            .expect("app state extension lock poisoned")
188            .get(&TypeId::of::<T>())
189            .cloned()
190            .and_then(|value| Arc::downcast::<T>(value).ok())
191    }
192
193    /// Returns the registered error reporters, if any were installed via
194    /// [`AppBuilder::with_error_reporter`](crate::app::AppBuilder::with_error_reporter).
195    ///
196    /// Returns an empty `Vec` when none are registered; the
197    /// [`ReportingLayer`](crate::reporting::ReportingLayer) then falls back to
198    /// the built-in [`LogReporter`](crate::reporting::LogReporter).
199    #[cfg(feature = "reporting")]
200    #[must_use]
201    pub(crate) fn error_reporters(
202        &self,
203    ) -> Vec<std::sync::Arc<dyn crate::reporting::ErrorReporter>> {
204        self.extension::<crate::reporting::RegisteredReporters>()
205            .map(|reporters| reporters.0.clone())
206            .unwrap_or_default()
207    }
208
209    /// Returns the database connection pool.
210    #[cfg(feature = "db")]
211    #[must_use]
212    pub const fn pool(
213        &self,
214    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
215    {
216        self.pool.as_ref()
217    }
218
219    /// Returns the read-replica database connection pool, if configured.
220    #[cfg(feature = "db")]
221    #[must_use]
222    pub const fn replica_pool(
223        &self,
224    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
225    {
226        self.replica_pool.as_ref()
227    }
228
229    /// Returns the pool used for read-only work.
230    #[cfg(feature = "db")]
231    #[must_use]
232    pub fn read_pool(
233        &self,
234    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
235    {
236        if self.replica_pool.is_some() && self.probes.should_route_reads_to_replica() {
237            self.replica_pool.as_ref()
238        } else if self.replica_pool.is_some() && self.probes.should_fallback_reads_to_primary() {
239            self.pool.as_ref()
240        } else if self.replica_pool.is_some() {
241            None
242        } else {
243            self.pool.as_ref()
244        }
245    }
246
247    /// Returns the metrics collector.
248    #[must_use]
249    pub const fn metrics(&self) -> &middleware::MetricsCollector {
250        &self.metrics
251    }
252
253    /// Returns the log levels configuration.
254    #[must_use]
255    pub const fn log_levels(&self) -> &actuator::LogLevels {
256        &self.log_levels
257    }
258
259    /// Returns the task registry.
260    #[must_use]
261    pub const fn task_registry(&self) -> &actuator::TaskRegistry {
262        &self.task_registry
263    }
264
265    /// Returns the job registry.
266    #[must_use]
267    pub const fn job_registry(&self) -> &actuator::JobRegistry {
268        &self.job_registry
269    }
270
271    /// Returns the config properties.
272    #[must_use]
273    pub const fn config_props(&self) -> &actuator::ConfigProperties {
274        &self.config_props
275    }
276
277    /// Returns the registry of plugin-contributed metrics sources.
278    #[must_use]
279    pub const fn metrics_source_registry(&self) -> &actuator::MetricsSourceRegistry {
280        &self.metrics_source_registry
281    }
282
283    /// Returns the registry of custom health indicators.
284    #[must_use]
285    pub const fn health_indicator_registry(&self) -> &actuator::HealthIndicatorRegistry {
286        &self.health_indicator_registry
287    }
288
289    /// Returns the resolved [`crate::config::AutumnConfig`] from the extension map.
290    ///
291    /// Falls back to a default config if no config has been installed
292    /// (typically only in tests that don't wire the full startup pipeline).
293    #[must_use]
294    pub fn config(&self) -> crate::config::AutumnConfig {
295        self.extension::<crate::config::AutumnConfig>()
296            .map_or_else(crate::config::AutumnConfig::default, |arc| (*arc).clone())
297    }
298
299    /// Returns the shared probe lifecycle state.
300    #[must_use]
301    pub const fn probes(&self) -> &probe::ProbeState {
302        &self.probes
303    }
304
305    /// Mark startup as complete so readiness can become healthy.
306    pub fn mark_startup_complete(&self) {
307        self.probes.mark_startup_complete();
308    }
309
310    /// Mark the application as draining so readiness flips unhealthy.
311    pub fn begin_shutdown(&self) {
312        self.probes.begin_shutdown();
313    }
314
315    /// Sets the database pool.
316    #[cfg(feature = "db")]
317    #[must_use]
318    pub fn with_pool(
319        mut self,
320        pool: diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>,
321    ) -> Self {
322        self.pool = Some(pool);
323        self
324    }
325
326    /// Sets the read-replica database pool.
327    #[cfg(feature = "db")]
328    #[must_use]
329    pub fn with_replica_pool(
330        mut self,
331        pool: diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>,
332    ) -> Self {
333        self.replica_pool = Some(pool);
334        self
335    }
336
337    /// Install a typed runtime extension while building test or ad-hoc state.
338    #[must_use]
339    pub fn with_extension<T>(self, value: T) -> Self
340    where
341        T: Any + Send + Sync + 'static,
342    {
343        self.insert_extension(value);
344        self
345    }
346
347    /// Returns the registered global cache backend, if any.
348    ///
349    /// Checks the extension map first (populated at runtime by startup hooks
350    /// via [`Self::set_cache`]) so that a plugin replacing a build-time backend
351    /// is always visible. Falls back to `shared_cache` (set at build time via
352    /// [`Self::with_cache`]).
353    #[must_use]
354    pub fn cache(&self) -> Option<Arc<dyn Cache>> {
355        self.extension::<GlobalCacheEntry>()
356            .map(|e| e.0.clone())
357            .or_else(|| self.shared_cache.clone())
358    }
359
360    /// Register a global cache backend (builder / test helper, build-time).
361    #[must_use]
362    pub fn with_cache(mut self, cache: Arc<dyn Cache>) -> Self {
363        self.shared_cache = Some(cache);
364        self
365    }
366
367    /// Returns the active clock source wired into this state.
368    ///
369    /// Handlers should prefer the [`crate::time::Clock`] extractor; this
370    /// accessor exists for framework internals (middleware, storage) that
371    /// need the time without going through Axum's extractor machinery.
372    #[must_use]
373    pub fn clock(&self) -> &dyn ClockSource {
374        self.clock.as_ref()
375    }
376
377    /// Replace the clock (builder / test helper).
378    #[must_use]
379    pub fn with_clock(mut self, clock: Arc<dyn ClockSource>) -> Self {
380        self.clock = clock;
381        self
382    }
383
384    /// Install or replace the global cache backend at runtime (e.g. from a startup hook).
385    ///
386    /// Updates both the process-level global (used by `#[cached]` functions) and
387    /// the extension map (used by `CacheResponseLayer::from_app` and `state.cache()`).
388    pub fn set_cache(&self, cache: Arc<dyn Cache>) {
389        crate::cache::set_global_cache(cache.clone());
390        self.insert_extension(GlobalCacheEntry(cache));
391    }
392
393    /// Sets the active profile.
394    #[must_use]
395    pub fn with_profile(mut self, profile: impl Into<String>) -> Self {
396        self.profile = Some(profile.into());
397        self
398    }
399
400    /// Returns a reference to the [`PolicyRegistry`].
401    #[must_use]
402    pub const fn policy_registry(&self) -> &PolicyRegistry {
403        &self.policy_registry
404    }
405
406    /// Resolve the registered [`Policy`] for resource `R`, if any.
407    #[must_use]
408    pub fn policy<R: Send + Sync + 'static>(&self) -> Option<std::sync::Arc<dyn Policy<R>>> {
409        self.policy_registry.policy::<R>()
410    }
411
412    /// Resolve the registered [`Scope`] for resource `R`, if any.
413    #[must_use]
414    pub fn scope<R: Send + Sync + 'static>(&self) -> Option<std::sync::Arc<dyn Scope<R>>> {
415        self.policy_registry.scope::<R>()
416    }
417
418    /// Configured deny-response shape. See
419    /// [`ForbiddenResponse`] for the trade-off between `403` and
420    /// `404` defaults.
421    #[must_use]
422    pub const fn forbidden_response(&self) -> ForbiddenResponse {
423        self.forbidden_response
424    }
425
426    /// Session key used to resolve the authenticated user id for
427    /// [`PolicyContext`](crate::authorization::PolicyContext).
428    #[must_use]
429    pub fn auth_session_key(&self) -> &str {
430        &self.auth_session_key
431    }
432
433    /// Override the configured deny response (test helper).
434    #[doc(hidden)]
435    #[must_use]
436    pub const fn with_forbidden_response(mut self, value: ForbiddenResponse) -> Self {
437        self.forbidden_response = value;
438        self
439    }
440
441    /// Override the auth session key (test helper).
442    #[doc(hidden)]
443    #[must_use]
444    pub fn with_auth_session_key(mut self, value: impl Into<String>) -> Self {
445        self.auth_session_key = value.into();
446        self
447    }
448
449    /// Set the startup probe completion flag.
450    #[doc(hidden)]
451    #[must_use]
452    pub fn with_startup_complete(self, startup_complete: bool) -> Self {
453        self.probes.set_startup_complete(startup_complete);
454        self
455    }
456
457    /// Set the readiness draining flag.
458    #[doc(hidden)]
459    #[must_use]
460    pub fn with_draining(self, draining: bool) -> Self {
461        self.probes.set_draining(draining);
462        self
463    }
464
465    /// Returns the active profile name, or `"default"` if none is set.
466    #[must_use]
467    pub fn profile(&self) -> &str {
468        self.profile.as_deref().unwrap_or("default")
469    }
470
471    /// Returns how long the application has been running.
472    #[must_use]
473    pub fn uptime(&self) -> std::time::Duration {
474        self.started_at.elapsed()
475    }
476
477    /// Format uptime as a human-readable string (e.g., "2h 15m").
478    #[must_use]
479    pub fn uptime_display(&self) -> String {
480        let secs = self.started_at.elapsed().as_secs();
481        if secs < 60 {
482            format!("{secs}s")
483        } else if secs < 3600 {
484            format!("{}m {}s", secs / 60, secs % 60)
485        } else {
486            let hours = secs / 3600;
487            let mins = (secs % 3600) / 60;
488            format!("{hours}h {mins}m")
489        }
490    }
491
492    /// Returns a reference to the broadcast channel registry.
493    ///
494    /// Shorthand for accessing `self.channels` directly.
495    #[cfg(feature = "ws")]
496    #[must_use]
497    pub const fn channels(&self) -> &Channels {
498        &self.channels
499    }
500
501    /// Returns a reference to the distributed presence tracker.
502    #[cfg(feature = "presence")]
503    #[must_use]
504    pub const fn presence(&self) -> &Presence {
505        &self.presence
506    }
507
508    /// Returns a high-level broadcast facade for raw and htmx HTML payloads.
509    #[cfg(feature = "ws")]
510    #[must_use]
511    pub fn broadcast(&self) -> crate::channels::Broadcast {
512        self.channels.broadcast()
513    }
514
515    /// Returns a child cancellation token for the server shutdown signal.
516    ///
517    /// WebSocket handlers should select on this to clean up when the
518    /// server is shutting down.
519    #[cfg(feature = "ws")]
520    #[must_use]
521    pub fn shutdown_token(&self) -> CancellationToken {
522        self.shutdown.child_token()
523    }
524
525    /// Helper for integration tests to simulate a server shutdown.
526    #[cfg(feature = "ws")]
527    #[doc(hidden)]
528    pub fn trigger_shutdown_for_test(&self) {
529        self.begin_shutdown();
530        self.shutdown.cancel();
531    }
532
533    /// Update startup completion in tests after the router is already built.
534    #[doc(hidden)]
535    pub fn set_startup_complete_for_test(&self, startup_complete: bool) {
536        self.probes.set_startup_complete(startup_complete);
537    }
538
539    /// Update draining state in tests after the router is already built.
540    #[doc(hidden)]
541    pub fn set_draining_for_test(&self, draining: bool) {
542        self.probes.set_draining(draining);
543    }
544
545    /// Compatibility helper for tests that model shutdown as readiness drain.
546    #[doc(hidden)]
547    pub fn begin_shutdown_for_test(&self) {
548        self.set_draining_for_test(true);
549    }
550
551    /// Create a minimal detached `AppState` without an HTTP server.
552    ///
553    /// This is useful for background runtimes or helper processes that still
554    /// need framework-managed resources such as typed extensions, metrics, or
555    /// WebSocket channel registries.
556    #[must_use]
557    pub fn detached() -> Self {
558        #[cfg(feature = "ws")]
559        let channels = Channels::new(32);
560        Self {
561            extensions: Arc::new(std::sync::RwLock::new(HashMap::new())),
562            #[cfg(feature = "db")]
563            pool: None,
564            #[cfg(feature = "db")]
565            replica_pool: None,
566            profile: None,
567            started_at: std::time::Instant::now(),
568            health_detailed: true,
569            probes: probe::ProbeState::ready_for_test(),
570            metrics: middleware::MetricsCollector::new(),
571            log_levels: actuator::LogLevels::new("info"),
572            task_registry: actuator::TaskRegistry::new(),
573            job_registry: actuator::JobRegistry::new(),
574            config_props: actuator::ConfigProperties::default(),
575            metrics_source_registry: actuator::MetricsSourceRegistry::new(),
576            health_indicator_registry: actuator::HealthIndicatorRegistry::new(),
577            #[cfg(feature = "presence")]
578            presence: Presence::new(channels.clone()),
579            #[cfg(feature = "ws")]
580            channels,
581            #[cfg(feature = "ws")]
582            shutdown: CancellationToken::new(),
583            policy_registry: PolicyRegistry::default(),
584            forbidden_response: ForbiddenResponse::default(),
585            auth_session_key: "user_id".to_owned(),
586            shared_cache: None,
587            clock: Arc::new(SystemClock),
588        }
589    }
590
591    /// Create an `AppState` suitable for testing, with sensible defaults
592    /// for all fields. Database pool is `None`.
593    #[allow(dead_code)]
594    #[must_use]
595    pub fn for_test() -> Self {
596        Self::detached()
597    }
598}
599
600#[cfg(feature = "db")]
601impl DbState for AppState {
602    fn metrics(&self) -> Option<&crate::middleware::MetricsCollector> {
603        Some(&self.metrics)
604    }
605
606    fn pool(
607        &self,
608    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
609    {
610        self.pool.as_ref()
611    }
612
613    fn replica_pool(
614        &self,
615    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
616    {
617        self.replica_pool.as_ref()
618    }
619
620    fn read_pool(
621        &self,
622    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
623    {
624        Self::read_pool(self)
625    }
626
627    fn db_interceptors(
628        &self,
629    ) -> Vec<std::sync::Arc<dyn crate::interceptor::DbConnectionInterceptor>> {
630        self.extension::<Arc<dyn crate::interceptor::DbConnectionInterceptor>>()
631            .map(|arc| vec![(*arc).clone()])
632            .unwrap_or_default()
633    }
634    fn statement_timeout(&self) -> Option<std::time::Duration> {
635        self.extension::<crate::config::AutumnConfig>()
636            .and_then(|cfg| cfg.database.statement_timeout)
637    }
638
639    fn slow_query_threshold(&self) -> std::time::Duration {
640        self.extension::<crate::config::AutumnConfig>().map_or_else(
641            || std::time::Duration::from_millis(500),
642            |cfg| cfg.database.slow_query_threshold,
643        )
644    }
645}
646
647impl crate::probe::ProvideProbeState for AppState {
648    fn probes(&self) -> &crate::probe::ProbeState {
649        &self.probes
650    }
651
652    fn health_detailed(&self) -> bool {
653        self.health_detailed
654    }
655
656    fn profile(&self) -> &str {
657        self.profile()
658    }
659
660    fn uptime_display(&self) -> String {
661        self.uptime_display()
662    }
663
664    #[cfg(feature = "db")]
665    fn pool(
666        &self,
667    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
668    {
669        self.pool.as_ref()
670    }
671
672    #[cfg(feature = "db")]
673    fn replica_pool(
674        &self,
675    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
676    {
677        self.replica_pool.as_ref()
678    }
679
680    fn health_indicator_registry(&self) -> Option<&crate::actuator::HealthIndicatorRegistry> {
681        Some(&self.health_indicator_registry)
682    }
683}
684
685impl crate::actuator::ProvideActuatorState for AppState {
686    fn metrics(&self) -> &crate::middleware::MetricsCollector {
687        &self.metrics
688    }
689
690    fn log_levels(&self) -> &crate::actuator::LogLevels {
691        &self.log_levels
692    }
693
694    fn task_registry(&self) -> &crate::actuator::TaskRegistry {
695        &self.task_registry
696    }
697
698    fn job_registry(&self) -> &crate::actuator::JobRegistry {
699        &self.job_registry
700    }
701
702    fn config_props(&self) -> &crate::actuator::ConfigProperties {
703        &self.config_props
704    }
705
706    fn profile(&self) -> &str {
707        self.profile()
708    }
709
710    fn uptime_display(&self) -> String {
711        self.uptime_display()
712    }
713
714    fn metrics_source_registry(&self) -> Option<&crate::actuator::MetricsSourceRegistry> {
715        Some(&self.metrics_source_registry)
716    }
717
718    fn health_indicator_registry(&self) -> Option<&crate::actuator::HealthIndicatorRegistry> {
719        Some(&self.health_indicator_registry)
720    }
721
722    fn health_detailed(&self) -> bool {
723        self.health_detailed
724    }
725
726    fn deploy_version(&self) -> String {
727        self.extension::<crate::canary::CanaryState>().map_or_else(
728            || crate::canary::STABLE.to_owned(),
729            |c| c.version().to_owned(),
730        )
731    }
732
733    #[cfg(feature = "ws")]
734    fn channels(&self) -> &crate::channels::Channels {
735        &self.channels
736    }
737
738    #[cfg(feature = "ws")]
739    fn shutdown_token(&self) -> tokio_util::sync::CancellationToken {
740        self.shutdown_token()
741    }
742
743    #[cfg(feature = "db")]
744    fn pool(
745        &self,
746    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
747    {
748        self.pool.as_ref()
749    }
750    // a11y_posture() uses the trait default (all-false) intentionally: AppState
751    // cannot know whether the application's layout is accessible.  Override this
752    // method on your own state type — or in a custom ProvideActuatorState impl —
753    // once you have verified that your pages include lang, a skip link, and
754    // landmark regions.  See docs/guide/accessibility.md for details.
755
756    #[cfg(feature = "http-client")]
757    fn webhook_outbound(&self) -> Option<crate::webhook_outbound::WebhookOutboundManager> {
758        self.extension::<crate::webhook_outbound::WebhookOutboundManager>()
759            .map(|x| (*x).clone())
760    }
761
762    fn log_buffer(&self) -> Option<crate::log::capture::LogBuffer> {
763        self.extension::<crate::log::capture::LogBuffer>()
764            .map(|x| (*x).clone())
765    }
766}
767
768impl std::fmt::Debug for AppState {
769    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
770        let mut s = f.debug_struct("AppState");
771        #[cfg(feature = "db")]
772        s.field(
773            "pool",
774            &self
775                .pool
776                .as_ref()
777                .map(|p| format!("Pool(max={})", p.status().max_size)),
778        );
779        s.field(
780            "extensions",
781            &self
782                .extensions
783                .read()
784                .map_or(0, |extensions| extensions.len()),
785        );
786        s.field("profile", &self.profile)
787            .field("started_at", &self.started_at)
788            .field("health_detailed", &self.health_detailed)
789            .field("probes", &self.probes)
790            .field("metrics", &"MetricsCollector")
791            .field("log_levels", &"LogLevels")
792            .field("task_registry", &"TaskRegistry")
793            .finish_non_exhaustive()
794    }
795}
796
797#[cfg(test)]
798mod tests {
799    use super::*;
800    #[cfg(feature = "db")]
801    use crate::config;
802    #[cfg(feature = "db")]
803    use crate::db;
804
805    #[test]
806    fn app_state_debug_without_pool() {
807        let state = AppState::for_test().with_profile("dev");
808        let debug = format!("{state:?}");
809        assert!(debug.contains("AppState"));
810        assert!(debug.contains("dev"));
811    }
812
813    #[cfg(feature = "db")]
814    #[test]
815    fn app_state_debug_with_pool() {
816        let config = config::DatabaseConfig {
817            url: Some("postgres://localhost/test".into()),
818            pool_size: 5,
819            ..Default::default()
820        };
821        let pool = db::create_pool(&config).unwrap().unwrap();
822        let state = AppState::for_test().with_pool(pool);
823        let debug = format!("{state:?}");
824        assert!(debug.contains("Pool(max=5)"));
825    }
826
827    #[cfg(feature = "db")]
828    #[test]
829    fn database_topology_state_exposes_replica_as_read_pool() {
830        let primary_config = config::DatabaseConfig {
831            url: Some("postgres://localhost/primary".into()),
832            pool_size: 5,
833            ..Default::default()
834        };
835        let replica_config = config::DatabaseConfig {
836            url: Some("postgres://localhost/replica".into()),
837            pool_size: 2,
838            ..Default::default()
839        };
840        let primary = db::create_pool(&primary_config).unwrap().unwrap();
841        let replica = db::create_pool(&replica_config).unwrap().unwrap();
842
843        let state = AppState::for_test()
844            .with_pool(primary)
845            .with_replica_pool(replica);
846
847        assert_eq!(state.pool().expect("primary pool").status().max_size, 5);
848        assert_eq!(
849            state
850                .replica_pool()
851                .expect("replica pool")
852                .status()
853                .max_size,
854            2
855        );
856        assert_eq!(state.read_pool().expect("read pool").status().max_size, 2);
857    }
858
859    #[cfg(feature = "db")]
860    #[test]
861    fn read_pool_uses_primary_when_replica_is_unready_and_policy_allows_fallback() {
862        let primary_config = config::DatabaseConfig {
863            url: Some("postgres://localhost/primary".into()),
864            pool_size: 5,
865            ..Default::default()
866        };
867        let replica_config = config::DatabaseConfig {
868            url: Some("postgres://localhost/replica".into()),
869            pool_size: 2,
870            ..Default::default()
871        };
872        let primary = db::create_pool(&primary_config).unwrap().unwrap();
873        let replica = db::create_pool(&replica_config).unwrap().unwrap();
874
875        let state = AppState::for_test()
876            .with_pool(primary)
877            .with_replica_pool(replica);
878        state
879            .probes()
880            .configure_replica_dependency(config::ReplicaFallback::Primary);
881        state
882            .probes()
883            .mark_replica_unready("replica migrations lag primary");
884
885        assert_eq!(state.read_pool().expect("read pool").status().max_size, 5);
886        assert_eq!(
887            db::DbState::read_pool(&state)
888                .expect("trait read pool")
889                .status()
890                .max_size,
891            5
892        );
893    }
894
895    #[cfg(feature = "db")]
896    #[test]
897    fn read_pool_does_not_route_to_unready_replica_when_policy_fails_readiness() {
898        let primary_config = config::DatabaseConfig {
899            url: Some("postgres://localhost/primary".into()),
900            pool_size: 5,
901            ..Default::default()
902        };
903        let replica_config = config::DatabaseConfig {
904            url: Some("postgres://localhost/replica".into()),
905            pool_size: 2,
906            ..Default::default()
907        };
908        let primary = db::create_pool(&primary_config).unwrap().unwrap();
909        let replica = db::create_pool(&replica_config).unwrap().unwrap();
910
911        let state = AppState::for_test()
912            .with_pool(primary)
913            .with_replica_pool(replica);
914        state
915            .probes()
916            .configure_replica_dependency(config::ReplicaFallback::FailReadiness);
917        state
918            .probes()
919            .mark_replica_unready("replica connection failed");
920
921        assert!(state.read_pool().is_none());
922    }
923
924    #[cfg(feature = "db")]
925    #[tokio::test]
926    async fn readiness_fails_when_app_state_replica_is_unready_and_policy_is_fail_readiness() {
927        let primary_config = config::DatabaseConfig {
928            url: Some("postgres://localhost/primary".into()),
929            pool_size: 5,
930            ..Default::default()
931        };
932        let replica_config = config::DatabaseConfig {
933            url: Some("postgres://localhost/replica".into()),
934            pool_size: 2,
935            ..Default::default()
936        };
937        let primary = db::create_pool(&primary_config).unwrap().unwrap();
938        let replica = db::create_pool(&replica_config).unwrap().unwrap();
939
940        let state = AppState::for_test()
941            .with_pool(primary)
942            .with_replica_pool(replica);
943        state
944            .probes()
945            .configure_replica_dependency(config::ReplicaFallback::FailReadiness);
946        state
947            .probes()
948            .mark_replica_unready("replica migrations lag primary");
949
950        let (status, _) = crate::probe::readiness_response(&state).await;
951
952        assert_eq!(status, http::StatusCode::SERVICE_UNAVAILABLE);
953    }
954
955    #[test]
956    fn detached_state_starts_without_profile() {
957        let state = AppState::detached();
958
959        assert_eq!(state.profile(), "default");
960    }
961
962    fn require_clone<T: Clone>(t: &T) -> T {
963        t.clone()
964    }
965
966    #[test]
967    fn app_state_is_clone() {
968        let state = AppState::for_test();
969        let _cloned = require_clone(&state);
970    }
971
972    #[test]
973    fn app_state_profile_accessor() {
974        let state = AppState::for_test().with_profile("staging");
975        assert_eq!(state.profile(), "staging");
976    }
977
978    #[test]
979    fn app_state_deploy_version_defaults_to_stable() {
980        use crate::actuator::ProvideActuatorState;
981        let state = AppState::for_test();
982        assert_eq!(state.deploy_version(), crate::canary::STABLE);
983    }
984
985    #[test]
986    fn app_state_deploy_version_reads_canary_extension() {
987        use crate::actuator::ProvideActuatorState;
988        let state = AppState::for_test();
989        state.insert_extension(crate::canary::CanaryState::new(crate::canary::CANARY));
990        assert_eq!(state.deploy_version(), crate::canary::CANARY);
991    }
992
993    #[test]
994    fn app_state_profile_default() {
995        let state = AppState::for_test();
996        assert_eq!(state.profile(), "default");
997    }
998
999    #[test]
1000    fn app_state_uptime_display() {
1001        let state = AppState::for_test();
1002        let display = state.uptime_display();
1003        assert!(
1004            display.contains('s'),
1005            "uptime should contain 's': {display}"
1006        );
1007    }
1008
1009    #[test]
1010    fn app_state_accessors() {
1011        let state = AppState::for_test();
1012
1013        // Exercise the new getters to ensure they compile and return the expected types
1014        let _metrics = state.metrics();
1015        let _log_levels = state.log_levels();
1016        let _task_registry = state.task_registry();
1017        let _config_props = state.config_props();
1018
1019        #[cfg(feature = "db")]
1020        {
1021            let _pool = state.pool();
1022        }
1023        let _missing = state.extension::<String>();
1024    }
1025
1026    #[test]
1027    fn app_state_runtime_extensions_round_trip() {
1028        let state = AppState::for_test();
1029        state.insert_extension(String::from("haunted"));
1030
1031        let stored = state
1032            .extension::<String>()
1033            .expect("runtime extension should be installed");
1034
1035        assert_eq!(stored.as_str(), "haunted");
1036    }
1037}