1use std::any::{Any, TypeId};
28use std::collections::{HashMap, HashSet};
29use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32
33use futures::FutureExt as _;
34use tracing::Instrument as _;
35
36use crate::config::{AutumnConfig, ConfigLoader};
37use crate::error_pages::{ErrorPageRenderer, SharedRenderer};
38use crate::middleware::exception_filter::ExceptionFilter;
39#[cfg(feature = "db")]
40use crate::migrate;
41use crate::route::Route;
42use crate::state::AppState;
43
/// Creates an empty [`AppBuilder`].
///
/// Every collection starts empty, every optional component starts as `None`,
/// and the boolean switches (`idempotency_enabled`, `i18n_auto_load`) start
/// as `false`. Fields guarded by a Cargo feature are only initialised when
/// that feature is compiled in.
#[must_use]
pub fn app() -> AppBuilder {
    AppBuilder {
        routes: Vec::new(),
        api_versions: Vec::new(),
        route_sources: Vec::new(),
        current_plugin: None,
        tasks: Vec::new(),
        one_off_tasks: Vec::new(),
        jobs: Vec::new(),
        static_metas: Vec::new(),
        exception_filters: Vec::new(),
        scoped_groups: Vec::new(),
        merge_routers: Vec::new(),
        nest_routers: Vec::new(),
        custom_layers: Vec::new(),
        startup_hooks: Vec::new(),
        state_initializers: Vec::new(),
        shutdown_hooks: Vec::new(),
        extensions: HashMap::new(),
        registered_plugins: HashSet::new(),
        error_page_renderer: None,
        #[cfg(feature = "db")]
        migrations: Vec::new(),
        config_loader_factory: None,
        #[cfg(feature = "db")]
        pool_provider_factory: None,
        telemetry_provider: None,
        session_store: None,
        #[cfg(feature = "ws")]
        channels_backend: None,
        #[cfg(feature = "storage")]
        blob_store: None,
        cache_backend: None,
        #[cfg(feature = "reporting")]
        error_reporters: Vec::new(),
        #[cfg(feature = "openapi")]
        openapi: None,
        #[cfg(feature = "mcp")]
        mcp: None,
        audit_logger: None,
        #[cfg(feature = "i18n")]
        i18n_bundle: None,
        #[cfg(feature = "i18n")]
        i18n_auto_load: false,
        policy_registrations: Vec::new(),
        #[cfg(feature = "mail")]
        mail_delivery_queue_factory: None,
        #[cfg(feature = "mail")]
        mail_previews: Vec::new(),
        declared_routes: Vec::new(),
        idempotency_enabled: false,
        #[cfg(feature = "mail")]
        mail_interceptor: None,
        job_interceptor: None,
        #[cfg(feature = "db")]
        db_interceptor: None,
        #[cfg(feature = "ws")]
        channels_interceptor: None,
        #[cfg(feature = "oauth2")]
        http_interceptor: None,
        seo_sources: Vec::new(),
        metrics_sources: Vec::new(),
        health_indicators: Vec::new(),
        #[cfg(feature = "inbound-mail")]
        inbound_mail_router: None,
    }
}
134
/// Boxed, pinned, `Send` future produced by a startup hook; resolves to
/// `AutumnResult<()>`.
type StartupHookFuture = Pin<Box<dyn Future<Output = crate::AutumnResult<()>> + Send>>;
/// Type-erased startup hook: a reusable (`Fn`) callback that receives an owned
/// [`AppState`] and returns a [`StartupHookFuture`].
type StartupHook = Box<dyn Fn(AppState) -> StartupHookFuture + Send + Sync>;
/// Type-erased one-shot (`FnOnce`) callback that is handed a borrowed
/// [`AppState`].
type StateInitializer = Box<dyn FnOnce(&AppState) + Send>;
/// Boxed, pinned, `Send` future produced by a shutdown hook; resolves to `()`.
type ShutdownHookFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Type-erased shutdown hook: a reusable (`Fn`) callback taking no arguments
/// and returning a [`ShutdownHookFuture`].
type ShutdownHook = Box<dyn Fn() -> ShutdownHookFuture + Send + Sync>;
140
/// Type-erased, one-shot factory that yields a future resolving to either an
/// [`AutumnConfig`] or a `ConfigError`.
///
/// Installed by `AppBuilder::with_config_loader`, which wraps a
/// `ConfigLoader::load` call in this shape.
type ConfigLoaderFactory = Box<
    dyn FnOnce() -> Pin<
            Box<dyn Future<Output = Result<AutumnConfig, crate::config::ConfigError>> + Send>,
        > + Send,
>;
/// Type-erased, one-shot factory that consumes a `DatabaseConfig` and yields a
/// future resolving to `Ok(Some(topology))`, `Ok(None)`, or a `PoolError`.
///
/// Installed by `AppBuilder::with_pool_provider`. Only compiled with the `db`
/// feature.
#[cfg(feature = "db")]
type PoolProviderFactory = Box<
    dyn FnOnce(
            crate::config::DatabaseConfig,
        ) -> Pin<
            Box<
                dyn Future<
                        Output = Result<Option<crate::db::DatabaseTopology>, crate::db::PoolError>,
                    > + Send,
            >,
        > + Send,
>;
165
/// Deferred, one-shot registration that is later run against a borrowed
/// `PolicyRegistry`; queued by `AppBuilder::policy` and `AppBuilder::scope`.
type PolicyRegistration = Box<dyn FnOnce(&crate::authorization::PolicyRegistry) + Send>;
169
/// Description of one API version registered on the builder.
///
/// `AppBuilder::api_version` treats `version` as the identity key: registering
/// a second entry with the same `version` replaces the first.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct ApiVersion {
    /// Version identifier; compared for equality when de-duplicating.
    pub version: String,
    /// Optional UTC timestamp (presumably when the version was/will be
    /// deprecated — confirm against consumers).
    pub deprecated_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Optional UTC timestamp (presumably when the version is removed —
    /// confirm against consumers).
    pub sunset_at: Option<chrono::DateTime<chrono::Utc>>,
}
180
/// Newtype wrapper around a list of [`ApiVersion`] values.
// NOTE(review): presumably stored as a state extension so handlers can read
// the registered versions — the insertion site is not visible in this chunk.
#[derive(Clone, Debug)]
pub struct RegisteredApiVersions(pub Vec<ApiVersion>);
184
/// Accumulates everything an application registers before it is started.
///
/// Created by [`app`], filled through the chainable builder methods (each
/// takes `self` by value and returns it), and finally consumed by `run`.
/// Fields behind `#[cfg(feature = "...")]` only exist when that Cargo feature
/// is enabled.
pub struct AppBuilder {
    /// Routes added through `routes()`.
    pub(crate) routes: Vec<Route>,
    /// Registered API versions; at most one entry per `version` string when
    /// populated through `api_version` / `api_versions`.
    pub api_versions: Vec<ApiVersion>,
    /// Origin of each entry in `routes`; `routes()` pushes exactly one source
    /// per route.
    route_sources: Vec<crate::route_listing::RouteSource>,
    /// Name of the plugin whose `build` is currently running, if any; set and
    /// restored by `plugin()`.
    current_plugin: Option<String>,
    /// Tasks added through `tasks()`.
    tasks: Vec<crate::task::TaskInfo>,
    /// One-off tasks added through `one_off_tasks()`.
    one_off_tasks: Vec<crate::task::OneOffTaskInfo>,
    /// Jobs added through `jobs()`.
    pub(crate) jobs: Vec<crate::job::JobInfo>,
    /// Static-generation route metadata added through `static_routes()`.
    pub(crate) static_metas: Vec<crate::static_gen::StaticRouteMeta>,
    /// Exception filters, in registration order.
    pub(crate) exception_filters: Vec<Arc<dyn ExceptionFilter>>,
    /// Route groups registered through `scoped()`.
    pub(crate) scoped_groups: Vec<ScopedGroup>,
    /// Raw axum routers registered through `merge()`.
    pub(crate) merge_routers: Vec<axum::Router<AppState>>,
    /// `(path, router)` pairs registered through `nest()`.
    pub(crate) nest_routers: Vec<(String, axum::Router<AppState>)>,
    /// App-wide layers registered through `layer()`, in registration order.
    pub(crate) custom_layers: Vec<CustomLayerRegistration>,
    /// Hooks registered through `on_startup()`.
    pub(crate) startup_hooks: Vec<StartupHook>,
    /// One-shot callbacks registered through `state_initializer()`.
    pub(crate) state_initializers: Vec<StateInitializer>,
    /// Hooks registered through `on_shutdown()`.
    pub(crate) shutdown_hooks: Vec<ShutdownHook>,
    /// Build-time extension values keyed by their `TypeId` (one value per type).
    pub(crate) extensions: HashMap<TypeId, Box<dyn Any + Send>>,
    /// Names of plugins already applied; `plugin()` uses it to skip duplicates.
    pub(crate) registered_plugins: HashSet<String>,
    /// Custom error-page renderer set through `error_pages()`.
    error_page_renderer: Option<SharedRenderer>,
    /// Embedded migration sets added through `migrations()`.
    #[cfg(feature = "db")]
    migrations: Vec<migrate::EmbeddedMigrations>,
    /// Replacement config loader set through `with_config_loader()`.
    config_loader_factory: Option<ConfigLoaderFactory>,
    /// Replacement pool provider set through `with_pool_provider()`.
    #[cfg(feature = "db")]
    pool_provider_factory: Option<PoolProviderFactory>,
    /// Replacement telemetry provider set through `with_telemetry_provider()`.
    telemetry_provider: Option<Box<dyn crate::telemetry::TelemetryProvider>>,
    /// Replacement session store set through `with_session_store()`.
    session_store: Option<Arc<dyn crate::session::BoxedSessionStore>>,
    /// Replacement channels backend set through `with_channels_backend()`.
    #[cfg(feature = "ws")]
    channels_backend: Option<Arc<dyn crate::channels::ChannelsBackend>>,
    /// Replacement blob store set through `with_blob_store()`.
    #[cfg(feature = "storage")]
    blob_store: Option<crate::storage::SharedBlobStore>,
    /// Replacement cache backend set through `with_cache_backend()`.
    cache_backend: Option<Arc<dyn crate::cache::Cache>>,
    /// Error reporters added through `with_error_reporter()`.
    #[cfg(feature = "reporting")]
    pub(crate) error_reporters: Vec<Arc<dyn crate::reporting::ErrorReporter>>,
    /// OpenAPI configuration set through `openapi()`.
    #[cfg(feature = "openapi")]
    openapi: Option<crate::openapi::OpenApiConfig>,
    /// MCP runtime settings, created on first use by the `*_mcp` methods.
    #[cfg(feature = "mcp")]
    mcp: Option<crate::mcp::McpRuntime>,
    /// Audit logger built up by `with_audit_sink()`.
    audit_logger: Option<Arc<crate::audit::AuditLogger>>,
    /// Explicit i18n bundle set through `i18n()`; cleared by `i18n_auto()`.
    #[cfg(feature = "i18n")]
    i18n_bundle: Option<Arc<crate::i18n::Bundle>>,
    /// Set by `i18n_auto()`, cleared by `i18n()`.
    #[cfg(feature = "i18n")]
    i18n_auto_load: bool,
    /// Deferred policy/scope registrations queued by `policy()` and `scope()`.
    policy_registrations: Vec<PolicyRegistration>,
    /// Factory for the mail delivery queue, set by the
    /// `with_mail_delivery_queue*` methods.
    #[cfg(feature = "mail")]
    mail_delivery_queue_factory: Option<MailDeliveryQueueFactory>,
    /// Mail previews added through `mail_previews()`.
    #[cfg(feature = "mail")]
    mail_previews: Vec<crate::mail::MailPreview>,
    /// Route descriptions added through `declare_plugin_routes()`.
    declared_routes: Vec<crate::route_listing::RouteInfo>,
    /// Set by `idempotent()`.
    idempotency_enabled: bool,
    /// Interceptor set through `with_mail_interceptor()`.
    #[cfg(feature = "mail")]
    mail_interceptor: Option<Arc<dyn crate::interceptor::MailInterceptor>>,
    /// Interceptor set through `with_job_interceptor()`.
    job_interceptor: Option<Arc<dyn crate::interceptor::JobInterceptor>>,
    /// Interceptor set through `with_db_interceptor()`.
    #[cfg(feature = "db")]
    db_interceptor: Option<Arc<dyn crate::interceptor::DbConnectionInterceptor>>,
    /// Interceptor set through `with_channels_interceptor()`.
    #[cfg(feature = "ws")]
    channels_interceptor: Option<Arc<dyn crate::interceptor::ChannelsInterceptor>>,
    /// Interceptor set through `with_http_interceptor()`.
    #[cfg(feature = "oauth2")]
    http_interceptor: Option<Arc<dyn crate::interceptor::HttpInterceptor>>,
    /// Sitemap sources added through `seo_source()`.
    seo_sources: Vec<Arc<dyn crate::seo::SitemapSource>>,

    /// `(name, source)` pairs added through `metrics_source()`; names are unique.
    pub(crate) metrics_sources: Vec<(String, Arc<dyn crate::actuator::MetricsSource>)>,
    /// `(name, group, indicator)` triples added through `health_indicator()`;
    /// names are unique and `group` is captured from `indicator.group()` at
    /// registration time.
    pub(crate) health_indicators: Vec<(
        String,
        crate::actuator::IndicatorGroup,
        Arc<dyn crate::actuator::HealthIndicator>,
    )>,
    /// Inbound mail router set through `inbound_mail_router()`.
    #[cfg(feature = "inbound-mail")]
    pub(crate) inbound_mail_router: Option<Arc<crate::inbound_mail::InboundMailRouter>>,
}
354
/// Type-erased, one-shot factory that is given a borrowed [`AppState`] and
/// returns a shared `MailDeliveryQueue` or an error.
///
/// Only compiled with the `mail` feature.
#[cfg(feature = "mail")]
pub(crate) type MailDeliveryQueueFactory = Box<
    dyn FnOnce(&AppState) -> crate::AutumnResult<Arc<dyn crate::mail::MailDeliveryQueue>> + Send,
>;
362
/// A set of routes registered together through `AppBuilder::scoped`, along
/// with the prefix and layer they were registered with.
pub struct ScopedGroup {
    /// Prefix string passed to `scoped()`.
    pub prefix: String,
    /// Routes belonging to this group.
    pub routes: Vec<Route>,
    /// Origin of the group: the plugin being built at registration time, or
    /// the user application.
    pub source: crate::route_listing::RouteSource,
    /// One-shot function that wraps a router in the group's layer.
    pub apply_layer: Box<dyn FnOnce(axum::Router<AppState>) -> axum::Router<AppState> + Send>,
}
375
/// Type-erased, one-shot function that takes a router and returns it with a
/// layer applied.
pub(crate) type CustomLayerApplier =
    Box<dyn FnOnce(axum::Router<AppState>) -> axum::Router<AppState> + Send>;
383
/// Record of one layer registered through `AppBuilder::layer`.
pub(crate) struct CustomLayerRegistration {
    /// `TypeId` of the layer type; queried by `has_layer` and `get_layer_types`.
    pub(crate) type_id: TypeId,
    /// `std::any::type_name` of the layer type.
    pub(crate) type_name: &'static str,
    /// Applies the captured layer to a router.
    pub(crate) apply: CustomLayerApplier,
}
394
/// Private module holding the supertrait that seals [`IntoAppLayer`]: the
/// module is not `pub`, so code outside this file cannot name `Sealed`.
mod sealed {
    /// Marker supertrait; implemented in this file for every type meeting the
    /// layer bounds.
    pub trait Sealed {}
}
398
/// Bound accepted by `AppBuilder::layer`.
///
/// The trait is sealed through `sealed::Sealed` and is implemented by a
/// blanket impl for every Tower layer meeting the bounds spelled out in the
/// diagnostic note below. The `on_unimplemented` attribute replaces the
/// compiler's default "trait not implemented" message with that explanation.
#[diagnostic::on_unimplemented(
    message = "`{Self}` is not a usable Autumn app-wide Tower layer",
    label = "this type does not implement `tower::Layer<axum::routing::Route>` with the required service bounds",
    note = "`AppBuilder::layer(..)` requires:\n L: tower::Layer<axum::routing::Route> + Clone + Send + Sync + 'static,\n L::Service: Service<axum::extract::Request, Response = axum::response::Response, Error = Infallible> + Clone + Send + Sync + 'static,\n <L::Service as Service<axum::extract::Request>>::Future: Send + 'static\nSee docs/guide/middleware.md for common patterns and how to wrap raw-error layers (e.g. TimeoutLayer) with HandleErrorLayer."
)]
pub trait IntoAppLayer: sealed::Sealed + Send + Sync + 'static {
    /// Consumes the layer and returns `router` wrapped in it.
    #[doc(hidden)]
    fn apply_to(self, router: axum::Router<AppState>) -> axum::Router<AppState>;
}
422
// Blanket impl of the sealing marker. The bounds are the same as on the
// `IntoAppLayer` blanket impl: the layer must wrap `axum::routing::Route`,
// and the resulting service must map axum requests to axum responses
// infallibly while being `Clone + Send + Sync + 'static` with a `Send` future.
impl<L> sealed::Sealed for L
where
    L: tower::Layer<axum::routing::Route> + Clone + Send + Sync + 'static,
    L::Service: tower::Service<
            axum::extract::Request,
            Response = axum::response::Response,
            Error = std::convert::Infallible,
        > + Clone
        + Send
        + Sync
        + 'static,
    <L::Service as tower::Service<axum::extract::Request>>::Future: Send + 'static,
{
}
437
// Blanket impl: any Tower layer satisfying the bounds below can be passed to
// `AppBuilder::layer`.
impl<L> IntoAppLayer for L
where
    L: tower::Layer<axum::routing::Route> + Clone + Send + Sync + 'static,
    L::Service: tower::Service<
            axum::extract::Request,
            Response = axum::response::Response,
            Error = std::convert::Infallible,
        > + Clone
        + Send
        + Sync
        + 'static,
    <L::Service as tower::Service<axum::extract::Request>>::Future: Send + 'static,
{
    /// Delegates to `axum::Router::layer`.
    fn apply_to(self, router: axum::Router<AppState>) -> axum::Router<AppState> {
        router.layer(self)
    }
}
455
456impl AppBuilder {
457 #[must_use]
479 pub fn routes(mut self, routes: Vec<Route>) -> Self {
480 let source = self
481 .current_plugin
482 .as_ref()
483 .map_or(crate::route_listing::RouteSource::User, |name| {
484 crate::route_listing::RouteSource::Plugin(name.clone())
485 });
486 for _ in &routes {
487 self.route_sources.push(source.clone());
488 }
489 self.routes.extend(routes);
490 self
491 }
492
493 #[must_use]
499 pub fn tasks(mut self, tasks: Vec<crate::task::TaskInfo>) -> Self {
500 self.tasks.extend(tasks);
501 self
502 }
503
504 #[must_use]
509 pub fn one_off_tasks(mut self, tasks: Vec<crate::task::OneOffTaskInfo>) -> Self {
510 self.one_off_tasks.extend(tasks);
511 self
512 }
513
514 #[must_use]
516 pub fn jobs(mut self, jobs: Vec<crate::job::JobInfo>) -> Self {
517 self.jobs.extend(jobs);
518 self
519 }
520
521 #[must_use]
526 pub fn static_routes(mut self, metas: Vec<crate::static_gen::StaticRouteMeta>) -> Self {
527 self.static_metas.extend(metas);
528 self
529 }
530
531 #[must_use]
569 pub fn seo_source<S: crate::seo::SitemapSource + 'static>(mut self, source: S) -> Self {
570 self.seo_sources.push(Arc::new(source));
571 self
572 }
573
574 #[cfg(feature = "openapi")]
622 #[must_use]
623 pub fn openapi(mut self, config: crate::openapi::OpenApiConfig) -> Self {
624 self.openapi = Some(config);
625 self
626 }
627
628 #[cfg(feature = "mcp")]
668 #[must_use]
669 pub fn mount_mcp(mut self, path: impl Into<String>) -> Self {
670 let path = path.into();
671 if let Some(rt) = self.mcp.as_mut() {
672 rt.mount_path = path;
673 } else {
674 self.mcp = Some(crate::mcp::McpRuntime::new(path));
675 }
676 self
677 }
678
679 #[cfg(feature = "mcp")]
693 #[must_use]
694 pub fn expose_all_as_mcp(mut self) -> Self {
695 if let Some(rt) = self.mcp.as_mut() {
696 rt.expose_all = true;
697 } else {
698 let mut rt = crate::mcp::McpRuntime::new("/mcp");
699 rt.expose_all = true;
700 self.mcp = Some(rt);
701 }
702 self
703 }
704
705 #[cfg(feature = "mcp")]
718 #[must_use]
719 pub fn secure_mcp<L>(mut self, layer: L) -> Self
720 where
721 L: tower::Layer<axum::routing::Route> + Clone + Send + Sync + 'static,
722 L::Service: tower::Service<
723 axum::http::Request<axum::body::Body>,
724 Response = axum::http::Response<axum::body::Body>,
725 Error = std::convert::Infallible,
726 > + Clone
727 + Send
728 + Sync
729 + 'static,
730 <L::Service as tower::Service<axum::http::Request<axum::body::Body>>>::Future:
731 Send + 'static,
732 {
733 let applier: crate::mcp::McpEndpointLayer = Box::new(move |router| router.layer(layer));
734 if let Some(rt) = self.mcp.as_mut() {
735 rt.endpoint_layer = Some(applier);
736 } else {
737 let mut rt = crate::mcp::McpRuntime::new("/mcp");
738 rt.endpoint_layer = Some(applier);
739 self.mcp = Some(rt);
740 }
741 self
742 }
743
744 #[must_use]
776 pub fn exception_filter(mut self, filter: impl ExceptionFilter) -> Self {
777 self.exception_filters.push(Arc::new(filter));
778 self
779 }
780
781 #[must_use]
818 pub fn error_pages(mut self, renderer: impl ErrorPageRenderer) -> Self {
819 self.error_page_renderer = Some(Arc::new(renderer));
820 self
821 }
822
823 #[must_use]
846 pub fn scoped<L>(mut self, prefix: &str, layer: L, routes: Vec<Route>) -> Self
847 where
848 L: tower::Layer<axum::routing::Route> + Clone + Send + Sync + 'static,
849 L::Service: tower::Service<
850 axum::http::Request<axum::body::Body>,
851 Response = axum::http::Response<axum::body::Body>,
852 Error = std::convert::Infallible,
853 > + Clone
854 + Send
855 + Sync
856 + 'static,
857 <L::Service as tower::Service<axum::http::Request<axum::body::Body>>>::Future:
858 Send + 'static,
859 {
860 let source = self
861 .current_plugin
862 .as_ref()
863 .map_or(crate::route_listing::RouteSource::User, |name| {
864 crate::route_listing::RouteSource::Plugin(name.clone())
865 });
866 self.scoped_groups.push(ScopedGroup {
867 prefix: prefix.to_owned(),
868 routes,
869 source,
870 apply_layer: Box::new(move |router| router.layer(layer)),
871 });
872 self
873 }
874
875 #[must_use]
947 pub fn layer<L: IntoAppLayer>(mut self, layer: L) -> Self {
948 self.custom_layers.push(CustomLayerRegistration {
949 type_id: TypeId::of::<L>(),
950 type_name: std::any::type_name::<L>(),
951 apply: Box::new(move |router| layer.apply_to(router)),
952 });
953 self
954 }
955
956 #[must_use]
961 pub fn has_layer<L: 'static>(&self) -> bool {
962 let layer_type = TypeId::of::<L>();
963 self.custom_layers
964 .iter()
965 .any(|registered| registered.type_id == layer_type)
966 }
967
    /// Sets the builder's `idempotency_enabled` flag.
    ///
    /// At startup, `run` uses this flag to set `config.idempotency.enabled`
    /// to `Some(true)`, unless the `AUTUMN_IDEMPOTENCY__ENABLED` environment
    /// variable holds a falsy value or the loaded config explicitly has
    /// `Some(false)`.
    #[must_use]
    pub const fn idempotent(mut self) -> Self {
        self.idempotency_enabled = true;
        self
    }
998
999 #[must_use]
1004 pub fn get_layer_types(&self) -> Vec<TypeId> {
1005 self.custom_layers
1006 .iter()
1007 .map(|registered| registered.type_id)
1008 .collect()
1009 }
1010
1011 #[must_use]
1054 pub fn merge(mut self, router: axum::Router<AppState>) -> Self {
1055 self.merge_routers.push(router);
1056 self
1057 }
1058
1059 #[must_use]
1096 pub fn nest(mut self, path: &str, router: axum::Router<AppState>) -> Self {
1097 self.nest_routers.push((path.to_owned(), router));
1098 self
1099 }
1100
1101 #[must_use]
1111 pub fn declare_plugin_routes(
1112 mut self,
1113 routes: impl IntoIterator<Item = crate::route_listing::RouteInfo>,
1114 ) -> Self {
1115 let source = self
1116 .current_plugin
1117 .as_deref()
1118 .map_or(crate::route_listing::RouteSource::User, |name| {
1119 crate::route_listing::RouteSource::Plugin(name.to_owned())
1120 });
1121 for mut route in routes {
1122 route.source = source.clone();
1123 self.declared_routes.push(route);
1124 }
1125 self
1126 }
1127
1128 #[must_use]
1134 pub fn on_startup<F, Fut>(mut self, hook: F) -> Self
1135 where
1136 F: Fn(AppState) -> Fut + Send + Sync + 'static,
1137 Fut: Future<Output = crate::AutumnResult<()>> + Send + 'static,
1138 {
1139 self.startup_hooks
1140 .push(Box::new(move |state| Box::pin(hook(state))));
1141 self
1142 }
1143
1144 #[must_use]
1147 pub fn state_initializer<F>(mut self, initializer: F) -> Self
1148 where
1149 F: FnOnce(&AppState) + Send + 'static,
1150 {
1151 self.state_initializers.push(Box::new(initializer));
1152 self
1153 }
1154
1155 #[must_use]
1160 pub fn on_shutdown<F, Fut>(mut self, hook: F) -> Self
1161 where
1162 F: Fn() -> Fut + Send + Sync + 'static,
1163 Fut: Future<Output = ()> + Send + 'static,
1164 {
1165 self.shutdown_hooks.push(Box::new(move || Box::pin(hook())));
1166 self
1167 }
1168
1169 #[must_use]
1171 pub fn api_version(mut self, version: ApiVersion) -> Self {
1172 if let Some(pos) = self
1173 .api_versions
1174 .iter()
1175 .position(|v| v.version == version.version)
1176 {
1177 self.api_versions[pos] = version;
1178 } else {
1179 self.api_versions.push(version);
1180 }
1181 self
1182 }
1183
1184 #[must_use]
1186 pub fn api_versions(mut self, versions: impl IntoIterator<Item = ApiVersion>) -> Self {
1187 for version in versions {
1188 if let Some(pos) = self
1189 .api_versions
1190 .iter()
1191 .position(|v| v.version == version.version)
1192 {
1193 self.api_versions[pos] = version;
1194 } else {
1195 self.api_versions.push(version);
1196 }
1197 }
1198 self
1199 }
1200
1201 #[must_use]
1206 pub fn with_extension<T>(mut self, value: T) -> Self
1207 where
1208 T: Any + Send + 'static,
1209 {
1210 self.extensions.insert(TypeId::of::<T>(), Box::new(value));
1211 self
1212 }
1213
1214 #[must_use]
1222 pub fn update_extension<T, Init, Update>(mut self, init: Init, update: Update) -> Self
1223 where
1224 T: Any + Send + 'static,
1225 Init: FnOnce() -> T,
1226 Update: FnOnce(&mut T),
1227 {
1228 let type_id = TypeId::of::<T>();
1229 let entry = self
1230 .extensions
1231 .entry(type_id)
1232 .or_insert_with(|| Box::new(init()));
1233 let typed = entry
1234 .downcast_mut::<T>()
1235 .expect("extension type map corrupted");
1236 update(typed);
1237 self
1238 }
1239
1240 #[must_use]
1242 pub fn extension<T>(&self) -> Option<&T>
1243 where
1244 T: Any + Send + 'static,
1245 {
1246 self.extensions.get(&TypeId::of::<T>())?.downcast_ref::<T>()
1247 }
1248
1249 #[cfg(feature = "mail")]
1250 #[must_use]
1251 pub fn with_mail_interceptor(
1252 mut self,
1253 interceptor: impl crate::interceptor::MailInterceptor,
1254 ) -> Self {
1255 self.mail_interceptor = Some(Arc::new(interceptor));
1256 self
1257 }
1258
1259 #[must_use]
1260 pub fn with_job_interceptor(
1261 mut self,
1262 interceptor: impl crate::interceptor::JobInterceptor,
1263 ) -> Self {
1264 self.job_interceptor = Some(Arc::new(interceptor));
1265 self
1266 }
1267
1268 #[cfg(feature = "db")]
1269 #[must_use]
1270 pub fn with_db_interceptor(
1271 mut self,
1272 interceptor: impl crate::interceptor::DbConnectionInterceptor,
1273 ) -> Self {
1274 self.db_interceptor = Some(Arc::new(interceptor));
1275 self
1276 }
1277
1278 #[cfg(feature = "ws")]
1279 #[must_use]
1280 pub fn with_channels_interceptor(
1281 mut self,
1282 interceptor: impl crate::interceptor::ChannelsInterceptor,
1283 ) -> Self {
1284 self.channels_interceptor = Some(Arc::new(interceptor));
1285 self
1286 }
1287
1288 #[cfg(feature = "oauth2")]
1289 #[must_use]
1290 pub fn with_http_interceptor(
1291 mut self,
1292 interceptor: impl crate::interceptor::HttpInterceptor,
1293 ) -> Self {
1294 self.http_interceptor = Some(Arc::new(interceptor));
1295 self
1296 }
1297
1298 #[cfg(feature = "i18n")]
1306 #[must_use]
1307 pub fn i18n(mut self, bundle: crate::i18n::Bundle) -> Self {
1308 self.i18n_bundle = Some(Arc::new(bundle));
1309 self.i18n_auto_load = false;
1310 self
1311 }
1312
1313 #[cfg(feature = "i18n")]
1349 #[must_use]
1350 pub fn i18n_auto(mut self) -> Self {
1351 self.i18n_bundle = None;
1352 self.i18n_auto_load = true;
1353 self
1354 }
1355
1356 #[must_use]
1372 pub fn with_config_loader<L>(mut self, loader: L) -> Self
1373 where
1374 L: crate::config::ConfigLoader,
1375 {
1376 if self.config_loader_factory.is_some() {
1377 tracing::warn!(
1378 "config loader replaced; the previously-installed loader was overwritten"
1379 );
1380 }
1381 self.config_loader_factory = Some(Box::new(move || {
1382 Box::pin(async move { loader.load().await })
1383 }));
1384 self
1385 }
1386
1387 #[cfg(feature = "db")]
1395 #[must_use]
1396 pub fn with_pool_provider<P>(mut self, provider: P) -> Self
1397 where
1398 P: crate::db::DatabasePoolProvider,
1399 {
1400 if self.pool_provider_factory.is_some() {
1401 tracing::warn!(
1402 "database pool provider replaced; the previously-installed provider was overwritten"
1403 );
1404 }
1405 self.pool_provider_factory =
1406 Some(Box::new(move |config: crate::config::DatabaseConfig| {
1407 Box::pin(async move { provider.create_topology(&config).await })
1408 }));
1409 self
1410 }
1411
1412 #[must_use]
1419 pub fn with_telemetry_provider<T>(mut self, provider: T) -> Self
1420 where
1421 T: crate::telemetry::TelemetryProvider,
1422 {
1423 if self.telemetry_provider.is_some() {
1424 tracing::warn!(
1425 "telemetry provider replaced; the previously-installed provider was overwritten"
1426 );
1427 }
1428 self.telemetry_provider = Some(Box::new(provider));
1429 self
1430 }
1431
1432 #[must_use]
1439 pub fn with_session_store<S>(mut self, store: S) -> Self
1440 where
1441 S: crate::session::SessionStore,
1442 {
1443 if self.session_store.is_some() {
1444 tracing::warn!(
1445 "session store replaced; the previously-installed store was overwritten"
1446 );
1447 }
1448 self.session_store = Some(Arc::new(store));
1449 self
1450 }
1451
1452 #[cfg(feature = "ws")]
1459 #[must_use]
1460 pub fn with_channels_backend<B>(mut self, backend: B) -> Self
1461 where
1462 B: crate::channels::ChannelsBackend,
1463 {
1464 if self.channels_backend.is_some() {
1465 tracing::warn!(
1466 "channels backend replaced; the previously-installed backend was overwritten"
1467 );
1468 }
1469 self.channels_backend = Some(Arc::new(backend));
1470 self
1471 }
1472
1473 #[cfg(feature = "storage")]
1507 #[must_use]
1508 pub fn with_blob_store<B>(mut self, store: B) -> Self
1509 where
1510 B: crate::storage::BlobStore,
1511 {
1512 if self.blob_store.is_some() {
1513 tracing::warn!("blob store replaced; the previously-installed store was overwritten");
1514 }
1515 self.blob_store = Some(std::sync::Arc::new(store));
1516 self
1517 }
1518
1519 #[must_use]
1538 pub fn with_cache_backend<C: crate::cache::Cache>(mut self, cache: C) -> Self {
1539 if self.cache_backend.is_some() {
1540 tracing::warn!(
1541 "cache backend replaced; the previously-installed backend was overwritten"
1542 );
1543 }
1544 self.cache_backend = Some(Arc::new(cache) as Arc<dyn crate::cache::Cache>);
1545 self
1546 }
1547
1548 #[cfg(feature = "reporting")]
1583 #[must_use]
1584 pub fn with_error_reporter<R: crate::reporting::ErrorReporter>(mut self, reporter: R) -> Self {
1585 self.error_reporters
1586 .push(Arc::new(reporter) as Arc<dyn crate::reporting::ErrorReporter>);
1587 self
1588 }
1589
1590 #[must_use]
1634 pub fn with_flag_store<S>(self, store: S) -> Self
1635 where
1636 S: crate::feature_flags::FlagStore,
1637 {
1638 let service = crate::feature_flags::FeatureFlagService::new(Arc::new(store) as Arc<_>);
1639 self.state_initializer(move |state| {
1640 state.insert_extension(service);
1641 })
1642 }
1643
1644 #[must_use]
1666 pub fn with_flag_store_and_resolver<S>(
1667 self,
1668 store: S,
1669 resolver: crate::feature_flags::GroupResolver,
1670 ) -> Self
1671 where
1672 S: crate::feature_flags::FlagStore,
1673 {
1674 let service = crate::feature_flags::FeatureFlagService::new(Arc::new(store) as Arc<_>)
1675 .with_group_resolver(resolver);
1676 self.state_initializer(move |state| {
1677 state.insert_extension(service);
1678 })
1679 }
1680
1681 #[must_use]
1718 pub fn with_experiment_store<S>(self, store: S) -> Self
1719 where
1720 S: crate::experiments::ExperimentStore,
1721 {
1722 let service = crate::experiments::ExperimentService::new(Arc::new(store) as Arc<_>);
1723 self.state_initializer(move |state| {
1724 state.insert_extension(service);
1725 })
1726 }
1727
1728 #[must_use]
1750 pub fn with_experiment_store_and_sink<S>(
1751 self,
1752 store: S,
1753 sink: Arc<dyn crate::experiments::ExposureSink>,
1754 ) -> Self
1755 where
1756 S: crate::experiments::ExperimentStore,
1757 {
1758 let service = crate::experiments::ExperimentService::new(Arc::new(store) as Arc<_>)
1759 .with_exposure_sink(sink);
1760 self.state_initializer(move |state| {
1761 state.insert_extension(service);
1762 })
1763 }
1764
1765 #[cfg(feature = "mail")]
1776 #[must_use]
1777 pub fn with_mail_delivery_queue(
1778 mut self,
1779 queue: impl crate::mail::MailDeliveryQueue + 'static,
1780 ) -> Self {
1781 let arc: Arc<dyn crate::mail::MailDeliveryQueue> = Arc::new(queue);
1782 self.mail_delivery_queue_factory = Some(Box::new(move |_state| Ok(arc)));
1783 self
1784 }
1785
1786 #[cfg(feature = "mail")]
1796 #[must_use]
1797 pub fn with_mail_delivery_queue_factory<F, Q>(mut self, factory: F) -> Self
1798 where
1799 F: FnOnce(&AppState) -> crate::AutumnResult<Q> + Send + 'static,
1800 Q: crate::mail::MailDeliveryQueue + 'static,
1801 {
1802 self.mail_delivery_queue_factory = Some(Box::new(move |state| {
1803 factory(state).map(|q| Arc::new(q) as Arc<dyn crate::mail::MailDeliveryQueue>)
1804 }));
1805 self
1806 }
1807
1808 #[cfg(feature = "inbound-mail")]
1838 #[must_use]
1839 pub fn inbound_mail_router(mut self, router: crate::inbound_mail::InboundMailRouter) -> Self {
1840 self.inbound_mail_router = Some(Arc::new(router));
1841 self
1842 }
1843
1844 #[cfg(feature = "mail")]
1848 #[must_use]
1849 pub fn mail_previews(
1850 mut self,
1851 previews: impl IntoIterator<Item = crate::mail::MailPreview>,
1852 ) -> Self {
1853 self.mail_previews.extend(previews);
1854 self
1855 }
1856
1857 #[must_use]
1862 pub fn with_audit_sink<S>(mut self, sink: S) -> Self
1863 where
1864 S: crate::audit::AuditSink,
1865 {
1866 let logger = self
1867 .audit_logger
1868 .take()
1869 .map_or_else(crate::audit::AuditLogger::new, |logger| (*logger).clone())
1870 .with_sink(Arc::new(sink));
1871 self.audit_logger = Some(Arc::new(logger));
1872 self
1873 }
1874
1875 #[must_use]
1898 pub fn policy<R, P>(mut self, policy: P) -> Self
1899 where
1900 R: Send + Sync + 'static,
1901 P: crate::authorization::Policy<R>,
1902 {
1903 self.policy_registrations.push(Box::new(move |registry| {
1904 registry.register_policy::<R, _>(policy);
1905 }));
1906 self
1907 }
1908
1909 #[must_use]
1917 pub fn scope<R, S>(mut self, scope: S) -> Self
1918 where
1919 R: Send + Sync + 'static,
1920 S: crate::authorization::Scope<R>,
1921 {
1922 self.policy_registrations.push(Box::new(move |registry| {
1923 registry.register_scope::<R, _>(scope);
1924 }));
1925 self
1926 }
1927
1928 #[must_use]
1936 #[track_caller]
1937 pub fn plugin<P>(mut self, plugin: P) -> Self
1938 where
1939 P: crate::plugin::Plugin,
1940 {
1941 let name = plugin.name();
1942 if self.registered_plugins.contains(name.as_ref()) {
1943 tracing::warn!(
1944 plugin = name.as_ref(),
1945 "plugin already registered; skipping duplicate"
1946 );
1947 return self;
1948 }
1949 let name_str = name.into_owned();
1950 self.registered_plugins.insert(name_str.clone());
1951 let outer_plugin = self.current_plugin.replace(name_str);
1954 let mut result = plugin.build(self);
1955 result.current_plugin = outer_plugin;
1956 result
1957 }
1958
    /// Applies a `Plugins` value to the builder by delegating to
    /// `plugins.apply(self)`.
    #[must_use]
    pub fn plugins<P>(self, plugins: P) -> Self
    where
        P: crate::plugin::Plugins,
    {
        plugins.apply(self)
    }
1968
    /// Returns `true` when a plugin named `name` has already been applied
    /// through `plugin()`.
    #[must_use]
    pub fn has_plugin(&self, name: &str) -> bool {
        self.registered_plugins.contains(name)
    }
1975
1976 #[must_use]
2012 pub fn metrics_source(
2013 mut self,
2014 name: impl Into<String>,
2015 source: Arc<dyn crate::actuator::MetricsSource>,
2016 ) -> Self {
2017 let name = name.into();
2018 if self.metrics_sources.iter().any(|(n, _)| n == &name) {
2019 tracing::warn!(
2020 source_name = %name,
2021 "MetricsSource '{}' is already registered; skipping duplicate",
2022 name
2023 );
2024 return self;
2025 }
2026 self.metrics_sources.push((name, source));
2027 self
2028 }
2029
2030 #[must_use]
2054 pub fn health_indicator(
2055 mut self,
2056 name: impl Into<String>,
2057 indicator: Arc<dyn crate::actuator::HealthIndicator>,
2058 ) -> Self {
2059 let name = name.into();
2060 #[cfg(feature = "db")]
2065 if name == "db" {
2066 tracing::warn!(
2067 indicator_name = %name,
2068 "\"db\" is a reserved built-in health indicator name; registration skipped. \
2069 Use a different name for your custom indicator."
2070 );
2071 return self;
2072 }
2073 if self.health_indicators.iter().any(|(n, _, _)| n == &name) {
2074 tracing::warn!(
2075 indicator_name = %name,
2076 "HealthIndicator '{}' is already registered; skipping duplicate",
2077 name
2078 );
2079 return self;
2080 }
2081 let group = indicator.group();
2082 self.health_indicators.push((name, group, indicator));
2083 self
2084 }
2085
2086 #[cfg(feature = "db")]
2113 #[must_use]
2114 pub fn migrations(mut self, migrations: migrate::EmbeddedMigrations) -> Self {
2115 self.migrations.push(migrations);
2116 self
2117 }
2118
2119 #[allow(clippy::too_many_lines)]
2139 #[allow(clippy::cognitive_complexity)]
2140 pub async fn run(self) {
2141 if is_static_build_mode() {
2145 self.run_build_mode().await;
2146 return;
2147 }
2148
2149 if is_dump_routes_mode() {
2154 self.run_dump_routes_mode().await;
2155 return;
2156 }
2157
2158 if is_list_one_off_tasks_mode() {
2159 self.run_list_one_off_tasks_mode();
2160 return;
2161 }
2162
2163 if let Some(task_name) = one_off_task_name_from_env() {
2164 self.run_one_off_task_mode(task_name).await;
2165 return;
2166 }
2167
2168 let Self {
2169 routes,
2170 api_versions,
2171 route_sources: _,
2172 current_plugin: _,
2173 tasks,
2174 one_off_tasks: _,
2175 jobs,
2176 static_metas,
2177 exception_filters,
2178 scoped_groups,
2179 merge_routers,
2180 nest_routers,
2181 custom_layers,
2182 startup_hooks,
2183 state_initializers,
2184 shutdown_hooks,
2185 extensions: _,
2186 registered_plugins: _,
2187 error_page_renderer,
2188 #[cfg(feature = "db")]
2189 migrations,
2190 config_loader_factory,
2191 #[cfg(feature = "db")]
2192 pool_provider_factory,
2193 telemetry_provider,
2194 session_store,
2195 #[cfg(feature = "ws")]
2196 channels_backend,
2197 #[cfg(feature = "storage")]
2198 blob_store,
2199 cache_backend,
2200 #[cfg(feature = "reporting")]
2201 error_reporters,
2202 #[cfg(feature = "openapi")]
2203 openapi,
2204 #[cfg(feature = "mcp")]
2205 mcp,
2206 audit_logger,
2207 #[cfg(feature = "i18n")]
2208 i18n_bundle,
2209 #[cfg(feature = "i18n")]
2210 i18n_auto_load,
2211 policy_registrations,
2212 #[cfg(feature = "mail")]
2213 mail_delivery_queue_factory,
2214 #[cfg(feature = "mail")]
2215 mail_previews,
2216 declared_routes: _,
2217 idempotency_enabled,
2218 #[cfg(feature = "mail")]
2219 mail_interceptor,
2220 job_interceptor,
2221 #[cfg(feature = "db")]
2222 db_interceptor,
2223 #[cfg(feature = "ws")]
2224 channels_interceptor,
2225 #[cfg(feature = "oauth2")]
2226 http_interceptor,
2227 seo_sources,
2228 metrics_sources,
2229 health_indicators,
2230 #[cfg(feature = "inbound-mail")]
2231 inbound_mail_router,
2232 } = self;
2233
2234 let all_routes = routes;
2235
2236 let (mut config, telemetry_guard) =
2238 load_config_and_telemetry(config_loader_factory, telemetry_provider).await;
2239
2240 if idempotency_enabled {
2246 let env_disabled = std::env::var("AUTUMN_IDEMPOTENCY__ENABLED")
2247 .is_ok_and(|v| matches!(v.to_lowercase().as_str(), "false" | "0" | "no" | "off"));
2248 if !env_disabled && config.idempotency.enabled != Some(false) {
2251 config.idempotency.enabled = Some(true);
2252 }
2253 }
2254
2255 #[cfg(feature = "i18n")]
2256 let i18n_bundle =
2257 resolve_i18n_bundle(i18n_bundle, i18n_auto_load, &config, &crate::config::OsEnv);
2258
2259 assert!(
2261 !all_routes.is_empty(),
2262 "No routes registered. Did you forget to call .routes()?"
2263 );
2264
2265 let profile_display = config.profile.as_deref().unwrap_or("none");
2267 tracing::info!(
2268 version = env!("CARGO_PKG_VERSION"),
2269 profile = profile_display,
2270 "Autumn starting"
2271 );
2272
2273 let show_config = std::env::var("AUTUMN_SHOW_CONFIG").as_deref() == Ok("1");
2275 if show_config {
2276 log_startup_transparency(&all_routes, &tasks, &scoped_groups, &config);
2277 }
2278
2279 fail_fast_on_invalid_session_config(&config, session_store.is_some());
2283
2284 fail_fast_on_invalid_signing_secret(&config);
2287 fail_fast_on_missing_encryption_keys(&config);
2288 fail_fast_on_invalid_trusted_hosts(&config);
2289
2290 fail_fast_on_invalid_webhook_config(&config);
2294
2295 fail_fast_on_invalid_idempotency_config(&config);
2297
2298 #[cfg(feature = "storage")]
2307 let storage_bootstrap = blob_store.map_or_else(
2308 || preflight_storage(&config),
2309 |store| {
2310 Some(StorageBootstrap {
2311 store,
2312 serving: None,
2313 })
2314 },
2315 );
2316
2317 #[cfg(feature = "db")]
2319 let database = setup_database(
2320 &config,
2321 migrations,
2322 pool_provider_factory,
2323 RepositoryCommitHookQueueMigrationMode::Runtime,
2324 )
2325 .await
2326 .unwrap_or_else(|e| {
2327 tracing::error!("{e}");
2328 std::process::exit(1);
2329 });
2330 #[cfg(feature = "db")]
2331 let pool = database.topology;
2332 #[cfg(feature = "db")]
2333 let replica_readiness = database.replica_readiness;
2334 #[cfg(feature = "db")]
2335 let replica_migration_check = database.replica_migration_check;
2336
2337 #[cfg(feature = "db")]
2338 if pool.is_some() {
2339 tracing::info!(
2340 primary_max_connections = config.database.effective_primary_pool_size(),
2341 replica_configured = config.database.replica_url.is_some(),
2342 replica_max_connections = config.database.effective_replica_pool_size(),
2343 "Database topology configured"
2344 );
2345 } else {
2346 tracing::info!("Database not configured");
2347 }
2348
2349 validate_repository_api_policies(&all_routes, &scoped_groups, &config);
2357
2358 let mut state = build_state(
2360 &config,
2361 #[cfg(feature = "db")]
2362 pool.as_ref(),
2363 #[cfg(feature = "ws")]
2364 channels_backend,
2365 );
2366
2367 if let Some(buf) = telemetry_guard.log_buffer.clone() {
2370 state.insert_extension(buf);
2371 }
2372
2373 let maintenance_state = crate::maintenance::MaintenanceState::new();
2375 let flag_path = std::path::Path::new(crate::maintenance::MAINTENANCE_FLAG_FILE);
2376 if let Ok(Some(cfg)) = crate::maintenance::MaintenanceState::load_from_file(flag_path) {
2377 maintenance_state.enable(cfg);
2378 }
2379 state.insert_extension(maintenance_state.clone());
2380
2381 let poller_state = maintenance_state.clone();
2382 tokio::spawn(async move {
2383 let path = std::path::Path::new(crate::maintenance::MAINTENANCE_FLAG_FILE);
2384 let interval = std::time::Duration::from_millis(500);
2385 loop {
2386 let load_res = tokio::task::spawn_blocking(move || {
2387 crate::maintenance::MaintenanceState::load_from_file(path)
2388 })
2389 .await;
2390
2391 match load_res {
2392 Ok(Ok(Some(cfg))) => {
2393 if poller_state.get() != Some(cfg.clone()) {
2394 poller_state.enable(cfg);
2395 }
2396 }
2397 Ok(Ok(None)) => {
2398 if poller_state.is_active() {
2399 poller_state.disable();
2400 }
2401 }
2402 Ok(Err(e)) => {
2403 tracing::error!(error = %e, "failed to load maintenance flag file");
2404 }
2405 Err(e) => {
2406 tracing::error!(error = %e, "maintenance poller task panicked");
2407 }
2408 }
2409 tokio::time::sleep(interval).await;
2410 }
2411 });
2412
2413 let canary_state = crate::canary::CanaryState::from_env();
2417 if canary_state.is_canary() {
2418 tracing::info!(
2419 version = canary_state.version(),
2420 "canary: replica labelled as canary cohort"
2421 );
2422 }
2423 state.insert_extension(canary_state);
2424
2425 if crate::canary::CanaryState::rollback_flag_present(std::path::Path::new(
2430 crate::canary::CANARY_ROLLBACK_FLAG_FILE,
2431 )) {
2432 tracing::warn!(
2433 "canary: rollback flag present at startup; /ready will report draining until \
2434 the flag is cleared (`autumn canary promote`)"
2435 );
2436 state.begin_shutdown();
2437 }
2438
2439 #[cfg(feature = "mail")]
2440 if let Some(interceptor) = mail_interceptor {
2441 state.insert_extension(interceptor);
2442 }
2443 if let Some(interceptor) = job_interceptor {
2444 state.insert_extension(interceptor);
2445 }
2446 #[cfg(feature = "db")]
2447 if let Some(interceptor) = db_interceptor {
2448 state.insert_extension(interceptor);
2449 }
2450 #[cfg(feature = "ws")]
2451 if let Some(interceptor) = channels_interceptor {
2452 state.insert_extension(interceptor.clone());
2453 state.channels = crate::channels::Channels::with_shared_backend(std::sync::Arc::new(
2454 crate::channels::InterceptedChannelsBackend::new(
2455 state.channels.backend().clone(),
2456 vec![interceptor],
2457 ),
2458 ));
2459 #[cfg(feature = "presence")]
2460 {
2461 state.presence = crate::presence::Presence::new(state.channels.clone());
2462 }
2463 }
2464 #[cfg(feature = "oauth2")]
2465 if let Some(interceptor) = http_interceptor {
2466 state.insert_extension(interceptor);
2467 }
2468
2469 for (name, source) in metrics_sources {
2473 if let Err(e) = state.metrics_source_registry.register(name, source) {
2474 tracing::warn!("{e}");
2475 }
2476 }
2477
2478 for (name, group, indicator) in health_indicators {
2480 if let Err(e) = state
2481 .health_indicator_registry
2482 .register(name, group, indicator)
2483 {
2484 tracing::warn!("{e}");
2485 }
2486 }
2487
2488 #[cfg(feature = "db")]
2489 configure_replica_migration_check(&state, replica_migration_check);
2490 #[cfg(feature = "db")]
2491 apply_replica_migration_readiness(&state, replica_readiness);
2492 if let Some(cache) = cache_backend {
2493 crate::cache::set_global_cache(cache.clone());
2494 state.shared_cache = Some(cache);
2495 } else {
2496 crate::cache::clear_global_cache();
2497 }
2498 state.insert_extension(RegisteredApiVersions(api_versions));
2499
2500 #[cfg(feature = "reporting")]
2504 if !error_reporters.is_empty() {
2505 state.insert_extension(crate::reporting::RegisteredReporters(error_reporters));
2506 }
2507 for register in policy_registrations {
2512 register(state.policy_registry());
2513 }
2514 validate_repository_policies_registered(&all_routes, &scoped_groups, &state, &config);
2520 #[cfg(feature = "mail")]
2521 crate::mail::install_mailer_with_factory(
2522 &state,
2523 &config.mail,
2524 mail_delivery_queue_factory,
2525 true,
2526 )
2527 .unwrap_or_else(|error| {
2528 tracing::error!(error = %error, "Failed to configure mailer");
2529 std::process::exit(1);
2530 });
2531 #[cfg(feature = "mail")]
2532 state.insert_extension(crate::mail::MailPreviewRegistry::new(mail_previews));
2533 if let Some(logger) = audit_logger {
2534 state.insert_extension::<crate::audit::AuditLogger>((*logger).clone());
2535 }
2536 #[cfg(feature = "i18n")]
2537 let custom_layers = install_i18n_bundle_layer(custom_layers, &state, i18n_bundle);
2538
2539 #[cfg(feature = "storage")]
2543 let storage_router = storage_bootstrap.and_then(|b| b.install(&state));
2544 install_webhook_registry(&state, &config);
2545 run_state_initializers(state_initializers, &state);
2546
2547 let env = crate::config::OsEnv;
2548 let dist_dir = project_dir("dist", &env);
2549 let dist_ref = if dist_dir.exists() {
2550 Some(dist_dir.as_path())
2551 } else {
2552 None
2553 };
2554 #[cfg_attr(
2555 not(any(feature = "storage", feature = "inbound-mail")),
2556 allow(unused_mut)
2557 )]
2558 let mut merge_routers = merge_routers;
2559 #[cfg(feature = "storage")]
2560 if let Some(router) = storage_router {
2561 merge_routers.push(router);
2562 }
2563
2564 if !seo_sources.is_empty() || crate::seo::has_seo_config(&config.seo) {
2567 let seo_cfg = &config.seo;
2568 let raw_profile = config.profile.as_deref().unwrap_or("dev");
2569 let profile = crate::seo::effective_seo_profile(raw_profile, seo_cfg.robots.allow_all);
2570 let static_paths: Vec<&str> = static_metas.iter().map(|m| m.path).collect();
2571 let (robots_body, sitemap_body) = crate::seo::assemble_seo_bodies(
2572 profile,
2573 seo_cfg.base_url.as_deref(),
2574 seo_cfg.robots.sitemap_url.as_deref(),
2575 &seo_cfg.robots.additional_rules,
2576 &seo_sources,
2577 &static_paths,
2578 )
2579 .await;
2580 let seo_router = crate::seo::build_seo_router_from_bodies(robots_body, sitemap_body);
2581 let is_seo_path = |p: &str| p == "/robots.txt" || p == "/sitemap.xml";
2582 let seo_collision = all_routes.iter().any(|r| is_seo_path(r.path))
2583 || static_metas.iter().any(|m| is_seo_path(m.path))
2584 || scoped_groups.iter().any(|g| {
2585 let prefix = g.prefix.trim_end_matches('/');
2586 g.routes
2587 .iter()
2588 .any(|r| is_seo_path(&format!("{prefix}{}", r.path)))
2589 });
2590 if seo_collision {
2591 tracing::warn!(
2592 "seo: /robots.txt or /sitemap.xml is already registered by the application; \
2593 skipping automatic SEO routes to prevent a startup panic"
2594 );
2595 } else {
2596 merge_routers.push(seo_router);
2597 }
2598 }
2599
2600 #[cfg(feature = "inbound-mail")]
2601 if let Some(ref im_router) = inbound_mail_router {
2602 let mut registered_inbound: std::collections::HashSet<String> =
2603 std::collections::HashSet::new();
2604 for (path, axum_router) in crate::inbound_mail::build_routes(im_router) {
2605 if all_routes
2610 .iter()
2611 .any(|r| r.method == http::Method::POST && r.path == path)
2612 || scoped_groups.iter().any(|g| {
2613 g.routes.iter().any(|r| {
2614 r.method == http::Method::POST
2615 && crate::router::join_nested_path(&g.prefix, r.path)
2616 == path.as_str()
2617 })
2618 })
2619 || nest_routers.iter().any(|(nest_path, _)| {
2620 let p = nest_path.as_str();
2621 path.as_str() == p
2622 || path.starts_with(p)
2623 && (p.ends_with('/') || path.as_bytes().get(p.len()) == Some(&b'/'))
2624 })
2625 {
2626 tracing::warn!(
2627 path = %path,
2628 "inbound_mail: skipping webhook route — a POST handler is \
2629 already registered at this path by the application"
2630 );
2631 continue;
2632 }
2633 if !registered_inbound.insert(path.clone()) {
2636 tracing::warn!(
2637 path = %path,
2638 "inbound_mail: skipping duplicate inbound webhook path"
2639 );
2640 continue;
2641 }
2642 config.security.csrf.exempt_paths.push(path.clone());
2646 config.security.captcha_exempt_paths.push(path);
2647 merge_routers.push(axum_router);
2648 }
2649 }
2650 let router = crate::router::try_build_router_with_static_inner(
2651 all_routes,
2652 &config,
2653 state.clone(),
2654 dist_ref,
2655 crate::router::RouterContext {
2656 exception_filters,
2657 scoped_groups,
2658 merge_routers,
2659 nest_routers,
2660 custom_layers,
2661 error_page_renderer,
2662 session_store,
2663 #[cfg(feature = "openapi")]
2666 openapi: if config.openapi_runtime.enabled {
2667 openapi
2668 } else {
2669 None
2670 },
2671 #[cfg(feature = "mcp")]
2672 mcp,
2673 },
2674 )
2675 .unwrap_or_else(|error| {
2676 tracing::error!(error = %error, "Failed to build router");
2677 std::process::exit(1);
2678 });
2679
2680 let addr = format!("{}:{}", config.server.host, config.server.port);
2684 let listener = tokio::net::TcpListener::bind(&addr)
2685 .await
2686 .unwrap_or_else(|e| {
2687 tracing::error!(addr = %addr, "Failed to bind: {e}");
2688 std::process::exit(1);
2689 });
2690
2691 let shutdown_timeout = config.server.shutdown_timeout_secs;
2692 let prestop_grace = config.server.prestop_grace_secs;
2693 let server_shutdown = tokio_util::sync::CancellationToken::new();
2694
2695 if let Err(error) = initialize_job_runtime(jobs, &state, &server_shutdown, &config.jobs) {
2696 tracing::error!(error = %error, "job runtime initialization failed");
2697 std::process::exit(1);
2698 }
2699
2700 #[cfg(feature = "db")]
2701 if let Some(pool) = state.pool().cloned() {
2702 crate::repository_commit_hooks::start_repository_commit_hook_worker(
2703 pool,
2704 server_shutdown.child_token(),
2705 );
2706 }
2707
2708 #[cfg(feature = "presence")]
2709 {
2710 let presence = state.presence().clone();
2711 let sweep_shutdown = server_shutdown.child_token();
2712 tokio::spawn(async move {
2713 let interval = std::time::Duration::from_secs(15);
2714 loop {
2715 tokio::select! {
2716 () = tokio::time::sleep(interval) => {
2717 presence.sweep_expired();
2718 }
2719 () = sweep_shutdown.cancelled() => break,
2720 }
2721 }
2722 });
2723 }
2724
2725 tracing::info!(addr = %addr, "Listening");
2726
2727 let server_shutdown_wait = server_shutdown.clone();
2728 let after_method = tower::Layer::layer(
2740 &crate::middleware::MethodOverrideLayer::new()
2741 .with_max_scan_bytes(config.security.upload.max_request_size_bytes),
2742 router,
2743 );
2744 let service = tower::Layer::layer(
2745 &crate::security::TrustedProxiesLayer::from_config(&config.security.trusted_proxies),
2746 after_method,
2747 );
2748 let make_service =
2749 axum::ServiceExt::<axum::extract::Request>::into_make_service_with_connect_info::<
2750 std::net::SocketAddr,
2751 >(service);
2752 let server_task = tokio::spawn(async move {
2753 axum::serve(listener, make_service)
2754 .with_graceful_shutdown(async move {
2755 server_shutdown_wait.cancelled().await;
2756 })
2757 .await
2758 });
2759
2760 let shutdown_state = state.clone();
2761 let shutdown_signal_token = server_shutdown.clone();
2762 #[cfg(feature = "ws")]
2763 let websocket_shutdown = state.shutdown.clone();
2764 let shutdown_metrics = state.metrics.clone();
2766
2767 let drain_started_at: std::sync::Arc<std::sync::OnceLock<std::time::Instant>> =
2771 std::sync::Arc::new(std::sync::OnceLock::new());
2772 let drain_started_clone = std::sync::Arc::clone(&drain_started_at);
2773
2774 let drain_phase_notify = std::sync::Arc::new(tokio::sync::Notify::new());
2778 let drain_phase_notify_for_watchdog = std::sync::Arc::clone(&drain_phase_notify);
2779 let server_entered_drain = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
2782 let server_entered_drain_for_watchdog = std::sync::Arc::clone(&server_entered_drain);
2783
2784 let shutdown_task = tokio::spawn(async move {
2801 shutdown_signal().await;
2803 tracing::info!(
2804 phase = "signal_received",
2805 prestop_grace_secs = prestop_grace,
2806 shutdown_timeout_secs = shutdown_timeout,
2807 "shutdown: graceful shutdown initiated"
2808 );
2809
2810 shutdown_state.begin_shutdown();
2812 tracing::info!(phase = "ready_draining", "shutdown: /ready now 503");
2813
2814 if prestop_grace > 0 {
2816 tokio::time::sleep(std::time::Duration::from_secs(prestop_grace)).await;
2817 }
2818 tracing::info!(phase = "listener_stopping", "shutdown: stopping listener");
2819
2820 #[cfg(feature = "ws")]
2822 websocket_shutdown.cancel();
2823
2824 let _ = drain_started_clone.set(std::time::Instant::now());
2828 shutdown_signal_token.cancel();
2829
2830 if !server_entered_drain_for_watchdog.load(std::sync::atomic::Ordering::Acquire) {
2844 tracing::warn!(
2845 phase = "signal_during_startup",
2846 "shutdown: SIGTERM during startup hooks; waiting for drain phase \
2847 to begin before enforcing the drain deadline"
2848 );
2849 drain_phase_notify_for_watchdog.notified().await;
2853 }
2854 tokio::time::sleep(std::time::Duration::from_secs(shutdown_timeout)).await;
2855 if shutdown_metrics.snapshot().http.requests_active == 0 {
2860 return;
2861 }
2862 let aborted = shutdown_metrics.snapshot().http.requests_active;
2863 shutdown_metrics.record_shutdown_aborted(aborted);
2864 tracing::error!(
2865 phase = "in_flight_drain",
2866 timeout_secs = shutdown_timeout,
2867 autumn_shutdown_aborted_requests_total = aborted,
2868 exit_code = 1,
2869 "shutdown: in_flight_drain phase exceeded deadline; terminating"
2870 );
2871 std::process::exit(1);
2872 });
2873
2874 if let Err(error) = run_startup_hooks(&startup_hooks, state.clone()).await {
2875 tracing::error!(error = %error, "startup hook failed");
2876 server_shutdown.cancel();
2877 server_task.abort();
2878 std::process::exit(1);
2879 }
2880
2881 if !state.probes().is_shutting_down() {
2882 if !tasks.is_empty() {
2883 let res = start_task_scheduler_with_config(
2884 tasks,
2885 &state,
2886 &server_shutdown,
2887 &config.scheduler,
2888 );
2889 if let Err(err) = res {
2890 tracing::error!(error = %err, "scheduled task runtime initialization failed");
2891 server_shutdown.cancel();
2892 server_task.abort();
2893 std::process::exit(1);
2894 }
2895 }
2896 state.probes().mark_startup_complete();
2897 }
2898
2899 server_entered_drain.store(true, std::sync::atomic::Ordering::Release);
2904 drain_phase_notify.notify_one();
2905
2906 let server_result = server_task.await.unwrap_or_else(|e| {
2909 tracing::error!("Server task join error: {e}");
2910 std::process::exit(1);
2911 });
2912 shutdown_task.abort();
2914 server_result.unwrap_or_else(|e| {
2915 tracing::error!("Server error: {e}");
2916 std::process::exit(1);
2917 });
2918
2919 let drain_elapsed = drain_started_at
2924 .get()
2925 .map_or(std::time::Duration::ZERO, std::time::Instant::elapsed);
2926 let hook_budget =
2927 std::time::Duration::from_secs(shutdown_timeout).saturating_sub(drain_elapsed);
2928 run_shutdown_hooks_with_timeout(&shutdown_hooks, hook_budget, hook_budget).await;
2929
2930 tracing::info!(exit_code = 0, "shutdown: all phases completed cleanly");
2931 }
2932
    #[allow(clippy::too_many_lines)]
    /// Build-mode entry point: renders the registered static routes instead of
    /// serving traffic.
    ///
    /// Loads configuration and telemetry, builds the application state and an
    /// in-process router, then passes that router, `static_metas`, and the
    /// project's `dist` directory to `crate::static_gen::render_static_routes`.
    /// Afterwards, with the `openapi` feature and an OpenAPI config present, an
    /// OpenAPI spec is written; when SEO sources or SEO config exist,
    /// `robots.txt` and `sitemap.xml` are written unless files with those names
    /// already exist in `dist`.
    ///
    /// Terminates the process with exit code 1 when no static routes are
    /// registered, or when database setup (`db` feature), mailer configuration
    /// (`mail` feature), router construction, or static rendering fails.
    /// Failures while writing the OpenAPI spec or the SEO files are only
    /// reported on stderr.
    async fn run_build_mode(self) {
        // Exhaustive destructuring: fields bound to `_` are not used by the
        // build path at all.
        let Self {
            routes,
            api_versions,
            route_sources: _,
            current_plugin: _,
            tasks: _,
            one_off_tasks: _,
            jobs: _,
            static_metas,
            exception_filters: _,
            scoped_groups,
            merge_routers: _,
            nest_routers: _,
            custom_layers,
            startup_hooks: _,
            state_initializers,
            shutdown_hooks: _,
            extensions: _,
            registered_plugins: _,
            error_page_renderer: _,
            #[cfg(feature = "db")]
            migrations: _,
            config_loader_factory,
            #[cfg(feature = "db")]
            pool_provider_factory,
            telemetry_provider,
            session_store,
            #[cfg(feature = "ws")]
            channels_backend,
            #[cfg(feature = "storage")]
            blob_store,
            cache_backend,
            #[cfg(feature = "reporting")]
            error_reporters,
            #[cfg(feature = "openapi")]
            openapi,
            #[cfg(feature = "mcp")]
            mcp: _,
            audit_logger: _,
            #[cfg(feature = "i18n")]
            i18n_bundle,
            #[cfg(feature = "i18n")]
            i18n_auto_load,
            policy_registrations,
            #[cfg(feature = "mail")]
            mail_delivery_queue_factory,
            #[cfg(feature = "mail")]
            mail_previews,
            declared_routes: _,
            idempotency_enabled,
            #[cfg(feature = "mail")]
            mail_interceptor,
            job_interceptor,
            #[cfg(feature = "db")]
            db_interceptor,
            #[cfg(feature = "ws")]
            channels_interceptor,
            #[cfg(feature = "oauth2")]
            http_interceptor,
            seo_sources,
            metrics_sources,
            health_indicators,
            #[cfg(feature = "inbound-mail")]
            inbound_mail_router: _,
        } = self;

        // Touch these bindings so they count as used. `metrics_sources` and
        // `health_indicators` are not read anywhere else in this function.
        let _ = &api_versions;
        let _ = &metrics_sources;
        let _ = &health_indicators;
        let all_routes = routes;

        let (mut config, telemetry_guard) =
            load_config_and_telemetry(config_loader_factory, telemetry_provider).await;
        // A builder-level opt-in turns idempotency on, unless the
        // AUTUMN_IDEMPOTENCY__ENABLED environment variable holds a falsy value
        // ("false", "0", "no", "off", case-insensitive) or the loaded config
        // explicitly carries `Some(false)`.
        if idempotency_enabled {
            let env_disabled = std::env::var("AUTUMN_IDEMPOTENCY__ENABLED")
                .is_ok_and(|v| matches!(v.to_lowercase().as_str(), "false" | "0" | "no" | "off"));
            if !env_disabled && config.idempotency.enabled != Some(false) {
                config.idempotency.enabled = Some(true);
            }
        }

        #[cfg(feature = "i18n")]
        let i18n_bundle =
            resolve_i18n_bundle(i18n_bundle, i18n_auto_load, &config, &crate::config::OsEnv);

        // Snapshot the API docs now: `all_routes` and `scoped_groups` are moved
        // into the router builder further down, while the snapshot is needed
        // after rendering to generate the OpenAPI spec.
        #[cfg(feature = "openapi")]
        let api_docs_snapshot: Vec<crate::openapi::ApiDoc> = {
            let mut docs: Vec<crate::openapi::ApiDoc> = all_routes
                .iter()
                .map(|r| {
                    let mut doc = r.api_doc.clone();
                    doc.api_version = r.api_version;
                    doc.sunset_opt_out = r.sunset_opt_out;
                    doc
                })
                .collect();
            for group in &scoped_groups {
                let prefix_params = crate::router::extract_path_params(&group.prefix);
                for route in &group.routes {
                    let mut doc = route.api_doc.clone();
                    doc.api_version = route.api_version;
                    doc.sunset_opt_out = route.sunset_opt_out;
                    // Scoped routes are documented under the group prefix joined
                    // with the route's own path. The joined string is leaked so
                    // it can be stored in `doc.path`.
                    let full = crate::router::join_nested_path(&group.prefix, route.api_doc.path);
                    doc.path = Box::leak(full.into_boxed_str());
                    // Path parameters found in the prefix go first, followed by
                    // the route's own parameters; the merged slice is leaked too.
                    if !prefix_params.is_empty() {
                        let mut merged: Vec<&'static str> = prefix_params
                            .iter()
                            .map(|p| &*Box::leak(p.clone().into_boxed_str()))
                            .collect();
                        merged.extend_from_slice(doc.path_params);
                        doc.path_params = Box::leak(merged.into_boxed_slice());
                    }
                    docs.push(doc);
                }
            }
            docs
        };

        // Without static routes there is nothing to render.
        if static_metas.is_empty() {
            eprintln!("No static routes registered. Nothing to build.");
            eprintln!("Hint: use .static_routes(static_routes![...]) on your AppBuilder.");
            std::process::exit(1);
        }

        // Configuration validation, run before any state is built.
        fail_fast_on_invalid_session_config(&config, session_store.is_some());
        fail_fast_on_invalid_signing_secret(&config);
        fail_fast_on_missing_encryption_keys(&config);
        fail_fast_on_invalid_trusted_hosts(&config);

        // A blob store supplied on the builder is used as-is with
        // `serving: None`; otherwise `preflight_storage` decides from config.
        #[cfg(feature = "storage")]
        let storage_bootstrap = blob_store.map_or_else(
            || preflight_storage(&config),
            |store| {
                Some(StorageBootstrap {
                    store,
                    serving: None,
                })
            },
        );

        // Database setup receives an empty migration list here (the builder's
        // `migrations` were discarded above) and the `StaticBuild` mode.
        #[cfg(feature = "db")]
        let database = setup_database(
            &config,
            vec![],
            pool_provider_factory,
            RepositoryCommitHookQueueMigrationMode::StaticBuild,
        )
        .await
        .unwrap_or_else(|e| {
            eprintln!("{e}");
            std::process::exit(1);
        });
        #[cfg(feature = "db")]
        let pool = database.topology;
        #[cfg(feature = "db")]
        let replica_readiness = database.replica_readiness;
        #[cfg(feature = "db")]
        let replica_migration_check = database.replica_migration_check;

        let mut state = build_state(
            &config,
            #[cfg(feature = "db")]
            pool.as_ref(),
            #[cfg(feature = "ws")]
            channels_backend,
        );
        if let Some(buf) = telemetry_guard.log_buffer.clone() {
            state.insert_extension(buf);
        }
        // Cloned because, with the `openapi` feature, `api_versions` is moved
        // into the OpenAPI config after rendering.
        state.insert_extension(RegisteredApiVersions(api_versions.clone()));
        // Each configured interceptor is stored as a state extension.
        #[cfg(feature = "mail")]
        if let Some(interceptor) = mail_interceptor {
            state.insert_extension(interceptor);
        }
        if let Some(interceptor) = job_interceptor {
            state.insert_extension(interceptor);
        }
        #[cfg(feature = "db")]
        if let Some(interceptor) = db_interceptor {
            state.insert_extension(interceptor);
        }
        // The channels interceptor additionally replaces `state.channels` with
        // one whose backend wraps the current backend in
        // `InterceptedChannelsBackend`; with `presence`, `state.presence` is
        // rebuilt on top of the replaced channels.
        #[cfg(feature = "ws")]
        if let Some(interceptor) = channels_interceptor {
            state.insert_extension(interceptor.clone());
            state.channels = crate::channels::Channels::with_shared_backend(std::sync::Arc::new(
                crate::channels::InterceptedChannelsBackend::new(
                    state.channels.backend().clone(),
                    vec![interceptor],
                ),
            ));
            #[cfg(feature = "presence")]
            {
                state.presence = crate::presence::Presence::new(state.channels.clone());
            }
        }
        #[cfg(feature = "oauth2")]
        if let Some(interceptor) = http_interceptor {
            state.insert_extension(interceptor);
        }
        #[cfg(feature = "db")]
        configure_replica_migration_check(&state, replica_migration_check);
        #[cfg(feature = "db")]
        apply_replica_migration_readiness(&state, replica_readiness);
        // A configured cache backend is installed both globally and on the
        // state; with none configured the global cache is cleared.
        if let Some(cache) = cache_backend {
            crate::cache::set_global_cache(cache.clone());
            state.shared_cache = Some(cache);
        } else {
            crate::cache::clear_global_cache();
        }
        #[cfg(feature = "reporting")]
        if !error_reporters.is_empty() {
            state.insert_extension(crate::reporting::RegisteredReporters(error_reporters));
        }
        // NOTE(review): the meaning of the trailing `false` argument is defined
        // by `install_mailer_with_factory` and is not visible here — confirm.
        #[cfg(feature = "mail")]
        crate::mail::install_mailer_with_factory(
            &state,
            &config.mail,
            mail_delivery_queue_factory,
            false,
        )
        .unwrap_or_else(|error| {
            eprintln!("Failed to configure mailer: {error}");
            std::process::exit(1);
        });
        #[cfg(feature = "mail")]
        state.insert_extension(crate::mail::MailPreviewRegistry::new(mail_previews));
        // Replace whatever probe state `build_state` produced with a default
        // one. NOTE(review): presumably so render-time requests see fresh
        // probes — confirm against `ProbeState::default`.
        state.probes = crate::probe::ProbeState::default();

        // Run every registered policy registration against the state's
        // policy registry.
        for register in policy_registrations {
            register(state.policy_registry());
        }

        #[cfg(feature = "i18n")]
        let custom_layers = install_i18n_bundle_layer(custom_layers, &state, i18n_bundle);

        #[cfg(feature = "storage")]
        let storage_router = storage_bootstrap.and_then(|b| b.install(&state));
        install_webhook_registry(&state, &config);
        run_state_initializers(state_initializers, &state);

        // The build router starts from an empty merge list (the builder's own
        // `merge_routers` were discarded above); only the storage router, when
        // one was produced, is merged in.
        #[cfg_attr(not(feature = "storage"), allow(unused_mut))]
        let mut merge_routers: Vec<axum::Router<AppState>> = Vec::new();
        #[cfg(feature = "storage")]
        if let Some(router) = storage_router {
            merge_routers.push(router);
        }
        // Exception filters, nested routers, the error page renderer, OpenAPI,
        // and MCP are all left empty/`None` for the build router.
        let router = crate::router::try_build_router_inner(
            all_routes,
            &config,
            state,
            crate::router::RouterContext {
                exception_filters: Vec::new(),
                scoped_groups,
                merge_routers,
                nest_routers: Vec::new(),
                custom_layers,
                error_page_renderer: None,
                session_store,
                #[cfg(feature = "openapi")]
                openapi: None,
                #[cfg(feature = "mcp")]
                mcp: None,
            },
        )
        .unwrap_or_else(|error| {
            eprintln!("Failed to build router: {error}");
            std::process::exit(1);
        });

        let env = crate::config::OsEnv;
        let dist_dir = project_dir("dist", &env);

        eprintln!("Building {} static route(s)...", static_metas.len());

        // A rendering failure is fatal; success falls through to the optional
        // OpenAPI and SEO outputs below.
        match crate::static_gen::render_static_routes(router, &static_metas, &dist_dir).await {
            Ok(()) => {
                eprintln!(
                    "\n \u{2713} Static build complete \u{2192} {}",
                    dist_dir.display()
                );
            }
            Err(e) => {
                eprintln!("\n \u{2717} Static build failed: {e}");
                std::process::exit(1);
            }
        }

        // Generate the OpenAPI spec from the snapshot taken before the routes
        // were moved. A write failure is reported but does not abort the build.
        #[cfg(feature = "openapi")]
        if let Some(mut openapi_config) = openapi {
            openapi_config.api_versions = api_versions;
            let openapi_config =
                openapi_config.session_cookie_name(config.session.cookie_name.clone());
            let docs: Vec<&crate::openapi::ApiDoc> = api_docs_snapshot.iter().collect();
            let spec = crate::openapi::generate_spec(&openapi_config, &docs);
            match crate::openapi::write_openapi_spec_to_dist(&spec, &dist_dir) {
                Ok(()) => {
                    eprintln!(
                        " \u{2713} OpenAPI spec written \u{2192} {}/openapi.json",
                        dist_dir.display()
                    );
                }
                Err(e) => {
                    eprintln!(" \u{26A0} Failed to write OpenAPI spec: {e}");
                }
            }
        }

        // SEO files are produced only when SEO sources were registered or the
        // config carries SEO settings.
        if !seo_sources.is_empty() || crate::seo::has_seo_config(&config.seo) {
            let seo_cfg = &config.seo;
            // With no profile configured, "dev" is used for the SEO decision.
            let raw_profile = config.profile.as_deref().unwrap_or("dev");
            let profile = crate::seo::effective_seo_profile(raw_profile, seo_cfg.robots.allow_all);
            let static_paths: Vec<&str> = static_metas.iter().map(|m| m.path).collect();
            let (robots_body, sitemap_body) = crate::seo::assemble_seo_bodies(
                profile,
                seo_cfg.base_url.as_deref(),
                seo_cfg.robots.sitemap_url.as_deref(),
                &seo_cfg.robots.additional_rules,
                &seo_sources,
                &static_paths,
            )
            .await;
            let robots_path = dist_dir.join("robots.txt");
            let sitemap_path = dist_dir.join("sitemap.xml");
            // A file already present in `dist` is left untouched; write
            // failures are reported but not fatal.
            if robots_path.exists() {
                eprintln!(
                    " \u{2713} SEO: robots.txt already present (custom static route), skipping"
                );
            } else {
                match tokio::fs::write(&robots_path, robots_body).await {
                    Ok(()) => eprintln!(
                        " \u{2713} SEO: robots.txt written \u{2192} {}",
                        robots_path.display()
                    ),
                    Err(e) => eprintln!(" \u{26A0} Failed to write robots.txt: {e}"),
                }
            }
            if sitemap_path.exists() {
                eprintln!(
                    " \u{2713} SEO: sitemap.xml already present (custom static route), skipping"
                );
            } else {
                match tokio::fs::write(&sitemap_path, sitemap_body).await {
                    Ok(()) => eprintln!(
                        " \u{2713} SEO: sitemap.xml written \u{2192} {}",
                        sitemap_path.display()
                    ),
                    Err(e) => eprintln!(" \u{26A0} Failed to write sitemap.xml: {e}"),
                }
            }
        }
    }
3343
3344 async fn run_dump_routes_mode(self) {
3350 let Self {
3351 routes,
3352 api_versions,
3353 route_sources,
3354 scoped_groups,
3355 merge_routers,
3356 nest_routers,
3357 declared_routes,
3358 config_loader_factory,
3359 telemetry_provider,
3360 #[cfg(feature = "openapi")]
3361 openapi,
3362 ..
3363 } = self;
3364
3365 let registered_versions: std::collections::HashSet<&str> =
3367 api_versions.iter().map(|av| av.version.as_str()).collect();
3368
3369 for route in &routes {
3370 if let Some(ver) = route
3371 .api_version
3372 .filter(|ver| !registered_versions.contains(*ver))
3373 {
3374 eprintln!(
3375 "Failed to build router: route '{}' uses unregistered API version '{}'",
3376 route.name, ver
3377 );
3378 std::process::exit(1);
3379 }
3380 }
3381
3382 for group in &scoped_groups {
3383 for route in &group.routes {
3384 if let Some(ver) = route
3385 .api_version
3386 .filter(|ver| !registered_versions.contains(*ver))
3387 {
3388 eprintln!(
3389 "Failed to build router: route '{}' uses unregistered API version '{}'",
3390 route.name, ver
3391 );
3392 std::process::exit(1);
3393 }
3394 }
3395 }
3396
3397 let hidden = merge_routers.len() + nest_routers.len();
3401 if hidden > 0 {
3402 eprintln!(
3403 "[autumn routes] warning: {hidden} raw router(s) added via \
3404 .merge()/.nest() are not enumerable and are omitted from this listing"
3405 );
3406 }
3407
3408 let (config, _telemetry_guard) =
3409 load_config_and_telemetry(config_loader_factory, telemetry_provider).await;
3410
3411 let mut infos = match crate::route_listing::collect_route_infos(
3412 &routes,
3413 &route_sources,
3414 &scoped_groups,
3415 &api_versions,
3416 ) {
3417 Ok(infos) => infos,
3418 Err(e) => {
3419 eprintln!("Failed to build router: {e}");
3420 std::process::exit(1);
3421 }
3422 };
3423 infos.extend(declared_routes);
3424 crate::route_listing::append_framework_routes(&mut infos, &config);
3425 #[cfg(feature = "openapi")]
3426 if let Some(ref oa) = openapi {
3427 crate::route_listing::append_openapi_routes(&mut infos, oa);
3428 }
3429 crate::route_listing::append_dev_reload_routes(&mut infos);
3430 crate::route_listing::sort_route_infos(&mut infos);
3431
3432 let json = serde_json::to_string_pretty(&infos).unwrap_or_else(|e| {
3433 eprintln!("Failed to serialize route listing: {e}");
3434 std::process::exit(1);
3435 });
3436 println!("{json}");
3437 std::process::exit(0);
3438 }
3439
3440 fn run_list_one_off_tasks_mode(self) {
3444 let Self { one_off_tasks, .. } = self;
3445
3446 if let Err(error) = crate::task::validate_unique_one_off_task_names(&one_off_tasks) {
3447 eprintln!("Invalid task registration: {error}");
3448 std::process::exit(1);
3449 }
3450
3451 let listing = crate::task::list_one_off_tasks(&one_off_tasks);
3452 let json = serde_json::to_string_pretty(&listing).unwrap_or_else(|error| {
3453 eprintln!("Failed to serialize task listing: {error}");
3454 std::process::exit(1);
3455 });
3456 println!("{json}");
3457 std::process::exit(0);
3458 }
3459
    /// Boots the application far enough to execute a single registered one-off task,
    /// runs it, then tears the runtime down again.
    ///
    /// The sequence is: validate task registration, resolve `requested_name` to a
    /// handler, parse task arguments from the environment, load config and telemetry,
    /// run the fail-fast config checks, build `AppState` (database, interceptors,
    /// cache, policies, mailer, audit logger, i18n, storage as enabled by features),
    /// run state initializers, start the job runtime, run startup hooks, execute the
    /// task, cancel background work and run shutdown hooks.
    ///
    /// The process exits with status 1 on any bootstrap failure, when no task with
    /// the requested name exists, or when the task handler returns an error. On
    /// success the function returns normally.
    #[allow(clippy::too_many_lines)]
    #[allow(clippy::cognitive_complexity)]
    async fn run_one_off_task_mode(self, requested_name: String) {
        // Pull out only the builder fields this mode needs; everything else
        // (routes, layers, etc.) is dropped via `..`.
        let Self {
            one_off_tasks,
            jobs,
            #[cfg(feature = "i18n")]
            custom_layers,
            #[cfg(not(feature = "i18n"))]
            custom_layers: _,
            startup_hooks,
            state_initializers,
            shutdown_hooks,
            config_loader_factory,
            #[cfg(feature = "db")]
            migrations,
            #[cfg(feature = "db")]
            pool_provider_factory,
            telemetry_provider,
            session_store,
            #[cfg(feature = "ws")]
            channels_backend,
            #[cfg(feature = "storage")]
            blob_store,
            audit_logger,
            #[cfg(feature = "i18n")]
            i18n_bundle,
            #[cfg(feature = "i18n")]
            i18n_auto_load,
            policy_registrations,
            cache_backend,
            #[cfg(feature = "mail")]
            mail_delivery_queue_factory,
            #[cfg(feature = "mail")]
            mail_interceptor,
            job_interceptor,
            #[cfg(feature = "db")]
            db_interceptor,
            #[cfg(feature = "ws")]
            channels_interceptor,
            #[cfg(feature = "oauth2")]
            http_interceptor,
            ..
        } = self;

        if let Err(error) = crate::task::validate_unique_one_off_task_names(&one_off_tasks) {
            eprintln!("Invalid task registration: {error}");
            std::process::exit(1);
        }

        // Resolve the requested task before doing any expensive bootstrap work so a
        // typo fails quickly and lists the available tasks.
        let Some((task_name, task_handler)) = one_off_tasks
            .iter()
            .find(|task| task.name == requested_name)
            .map(|task| (task.name.clone(), task.handler))
        else {
            eprintln!("No one-off task named '{requested_name}' is registered.");
            print_available_one_off_tasks(&one_off_tasks);
            std::process::exit(1);
        };

        let args = one_off_task_args_from_env().unwrap_or_else(|error| {
            eprintln!("Invalid task args: {error}");
            std::process::exit(1);
        });

        let (config, telemetry_guard) =
            load_config_and_telemetry(config_loader_factory, telemetry_provider).await;

        #[cfg(feature = "i18n")]
        let i18n_bundle =
            resolve_i18n_bundle(i18n_bundle, i18n_auto_load, &config, &crate::config::OsEnv);

        // Configuration checks that terminate the process on invalid settings.
        fail_fast_on_invalid_session_config(&config, session_store.is_some());
        fail_fast_on_invalid_signing_secret(&config);
        fail_fast_on_missing_encryption_keys(&config);
        fail_fast_on_invalid_trusted_hosts(&config);

        // An explicitly supplied blob store wins; otherwise the storage backend is
        // derived from config. A supplied store has no serving router.
        #[cfg(feature = "storage")]
        let storage_bootstrap = blob_store.map_or_else(
            || preflight_storage(&config),
            |store| {
                Some(StorageBootstrap {
                    store,
                    serving: None,
                })
            },
        );

        #[cfg(feature = "db")]
        let database = setup_database(
            &config,
            migrations,
            pool_provider_factory,
            RepositoryCommitHookQueueMigrationMode::Runtime,
        )
        .await
        .unwrap_or_else(|error| {
            eprintln!("{error}");
            std::process::exit(1);
        });
        #[cfg(feature = "db")]
        let pool = database.topology;
        #[cfg(feature = "db")]
        let replica_readiness = database.replica_readiness;
        #[cfg(feature = "db")]
        let replica_migration_check = database.replica_migration_check;

        let mut state = build_state(
            &config,
            #[cfg(feature = "db")]
            pool.as_ref(),
            #[cfg(feature = "ws")]
            channels_backend,
        );
        if let Some(buf) = telemetry_guard.log_buffer.clone() {
            state.insert_extension(buf);
        }
        // Register each configured interceptor as a state extension.
        #[cfg(feature = "mail")]
        if let Some(interceptor) = mail_interceptor {
            state.insert_extension(interceptor);
        }
        if let Some(interceptor) = job_interceptor {
            state.insert_extension(interceptor);
        }
        #[cfg(feature = "db")]
        if let Some(interceptor) = db_interceptor {
            state.insert_extension(interceptor);
        }
        #[cfg(feature = "ws")]
        if let Some(interceptor) = channels_interceptor {
            state.insert_extension(interceptor.clone());
            // Wrap the existing channels backend so the interceptor sees channel traffic.
            state.channels = crate::channels::Channels::with_shared_backend(std::sync::Arc::new(
                crate::channels::InterceptedChannelsBackend::new(
                    state.channels.backend().clone(),
                    vec![interceptor],
                ),
            ));
            // Presence is rebuilt from the replaced channels handle.
            #[cfg(feature = "presence")]
            {
                state.presence = crate::presence::Presence::new(state.channels.clone());
            }
        }
        #[cfg(feature = "oauth2")]
        if let Some(interceptor) = http_interceptor {
            state.insert_extension(interceptor);
        }
        #[cfg(feature = "db")]
        configure_replica_migration_check(&state, replica_migration_check);
        #[cfg(feature = "db")]
        apply_replica_migration_readiness(&state, replica_readiness);
        // Install the configured cache globally and on the state, or clear any
        // previously installed global cache when none is configured.
        if let Some(cache) = cache_backend {
            crate::cache::set_global_cache(cache.clone());
            state.shared_cache = Some(cache);
        } else {
            crate::cache::clear_global_cache();
        }

        for register in policy_registrations {
            register(state.policy_registry());
        }

        #[cfg(feature = "mail")]
        crate::mail::install_mailer_with_factory(
            &state,
            &config.mail,
            mail_delivery_queue_factory,
            true,
        )
        .unwrap_or_else(|error| {
            eprintln!("Failed to configure mailer: {error}");
            std::process::exit(1);
        });

        if let Some(logger) = audit_logger {
            state.insert_extension::<crate::audit::AuditLogger>((*logger).clone());
        }

        // The returned layer list is discarded: this mode serves no HTTP traffic.
        #[cfg(feature = "i18n")]
        let _custom_layers = install_i18n_bundle_layer(custom_layers, &state, i18n_bundle);

        // Likewise the storage serving router is not mounted anywhere in this mode.
        #[cfg(feature = "storage")]
        let _storage_router = storage_bootstrap.and_then(|bootstrap| bootstrap.install(&state));
        run_state_initializers(state_initializers, &state);

        // Token used to stop background work started below once the task finishes.
        let task_shutdown = tokio_util::sync::CancellationToken::new();
        if let Err(error) = initialize_job_runtime(jobs, &state, &task_shutdown, &config.jobs) {
            eprintln!("job runtime initialization failed: {error}");
            std::process::exit(1);
        }

        #[cfg(feature = "db")]
        if let Some(pool) = state.pool().cloned() {
            crate::repository_commit_hooks::start_repository_commit_hook_worker(
                pool,
                task_shutdown.child_token(),
            );
        }

        if let Err(error) = run_startup_hooks(&startup_hooks, state.clone()).await {
            eprintln!("startup hook failed: {error}");
            task_shutdown.cancel();
            std::process::exit(1);
        }
        state.probes().mark_startup_complete();

        tracing::info!(task = %task_name, "Running one-off task");
        let span = tracing::info_span!("one_off_task", task = %task_name);
        // With oauth2 enabled the handler runs inside the ACTIVE_HTTP_INTERCEPTORS
        // scope, seeded from the interceptor extension registered on the state (if any).
        #[cfg(feature = "oauth2")]
        let result = {
            use crate::interceptor::{ACTIVE_HTTP_INTERCEPTORS, HttpInterceptor};
            let interceptors: Vec<std::sync::Arc<dyn HttpInterceptor>> = state
                .extension::<std::sync::Arc<dyn HttpInterceptor>>()
                .map(|interceptor_arc| vec![(*interceptor_arc).clone()])
                .unwrap_or_default();
            ACTIVE_HTTP_INTERCEPTORS
                .scope(
                    interceptors,
                    (task_handler)(state.clone(), args).instrument(span),
                )
                .await
        };
        #[cfg(not(feature = "oauth2"))]
        let result = (task_handler)(state.clone(), args).instrument(span).await;

        // Stop background work and run shutdown hooks regardless of the task outcome.
        task_shutdown.cancel();
        run_shutdown_hooks(&shutdown_hooks).await;

        match result {
            Ok(()) => {
                tracing::info!(task = %task_name, "One-off task completed");
            }
            Err(error) => {
                tracing::error!(task = %task_name, error = %error, "One-off task failed");
                eprintln!("Task '{task_name}' failed: {error}");
                for cause in error.source_chain() {
                    eprintln!("Caused by: {cause}");
                }
                std::process::exit(1);
            }
        }
    }
3704}
3705
/// Returns `true` when the `AUTUMN_BUILD_STATIC` environment variable is exactly `"1"`.
///
/// Any other value, an unset variable, or a non-Unicode value yields `false`.
pub(crate) fn is_static_build_mode() -> bool {
    matches!(std::env::var("AUTUMN_BUILD_STATIC").as_deref(), Ok("1"))
}
3709
/// Returns `true` when the `AUTUMN_DUMP_ROUTES` environment variable is exactly `"1"`.
///
/// Any other value, an unset variable, or a non-Unicode value yields `false`.
pub(crate) fn is_dump_routes_mode() -> bool {
    matches!(std::env::var("AUTUMN_DUMP_ROUTES").as_deref(), Ok("1"))
}
3713
/// Returns `true` when the `AUTUMN_LIST_TASKS` environment variable is exactly `"1"`.
///
/// Any other value, an unset variable, or a non-Unicode value yields `false`.
pub(crate) fn is_list_one_off_tasks_mode() -> bool {
    matches!(std::env::var("AUTUMN_LIST_TASKS").as_deref(), Ok("1"))
}
3717
/// Reads the requested one-off task name from the `AUTUMN_RUN_TASK` environment variable.
///
/// Surrounding whitespace is trimmed. Returns `None` when the variable is unset,
/// not valid Unicode, or blank after trimming.
fn one_off_task_name_from_env() -> Option<String> {
    let raw = std::env::var("AUTUMN_RUN_TASK").ok()?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
3724
3725fn one_off_task_args_from_env() -> Result<Vec<String>, String> {
3726 match std::env::var("AUTUMN_TASK_ARGS_JSON") {
3727 Ok(raw) if !raw.trim().is_empty() => serde_json::from_str(&raw)
3728 .map_err(|error| format!("AUTUMN_TASK_ARGS_JSON must be a JSON string array: {error}")),
3729 _ => Ok(Vec::new()),
3730 }
3731}
3732
3733fn print_available_one_off_tasks(tasks: &[crate::task::OneOffTaskInfo]) {
3734 let listing = crate::task::list_one_off_tasks(tasks);
3735 if listing.is_empty() {
3736 eprintln!("No one-off tasks are registered. Add .one_off_tasks(one_off_tasks![...]).");
3737 return;
3738 }
3739
3740 eprintln!("Available tasks:");
3741 for task in listing {
3742 if task.description.is_empty() {
3743 eprintln!(" {}", task.name);
3744 } else {
3745 eprintln!(" {:<24} {}", task.name, task.description);
3746 }
3747 }
3748}
3749
3750#[allow(clippy::cast_possible_truncation)]
3757#[allow(clippy::cognitive_complexity)]
3758#[allow(dead_code)]
3759fn start_task_scheduler(
3760 tasks: Vec<crate::task::TaskInfo>,
3761 state: &AppState,
3762 shutdown: &tokio_util::sync::CancellationToken,
3763) {
3764 if let Err(error) = start_task_scheduler_with_config(
3765 tasks,
3766 state,
3767 shutdown,
3768 &crate::config::SchedulerConfig::default(),
3769 ) {
3770 tracing::error!(error = %error, "scheduled task runtime initialization failed");
3771 }
3772}
3773
3774#[allow(clippy::cast_possible_truncation)]
3775#[allow(clippy::cognitive_complexity)]
3776fn start_task_scheduler_with_config(
3777 tasks: Vec<crate::task::TaskInfo>,
3778 state: &AppState,
3779 shutdown: &tokio_util::sync::CancellationToken,
3780 scheduler_config: &crate::config::SchedulerConfig,
3781) -> crate::AutumnResult<()> {
3782 tracing::info!(count = tasks.len(), "Starting scheduled tasks");
3783 let coordinator = crate::scheduler::coordinator_from_config(scheduler_config, state)?;
3784 let lease_ttl = std::time::Duration::from_secs(scheduler_config.lease_ttl_secs);
3785 for task_info in &tasks {
3786 let schedule_desc = task_info.schedule.to_string();
3787 tracing::info!(
3788 name = %task_info.name,
3789 schedule = %schedule_desc,
3790 coordination = %task_info.coordination,
3791 scheduler_backend = coordinator.backend(),
3792 replica_id = coordinator.replica_id(),
3793 lease_ttl_secs = scheduler_config.lease_ttl_secs,
3794 "Registered task"
3795 );
3796 }
3797
3798 let mut cron_tasks: Vec<CronTaskSpec> = Vec::new();
3799
3800 for task_info in tasks {
3801 let state = state.clone();
3802 let name = task_info.name.clone();
3803 let handler = task_info.handler;
3804 let coordination = task_info.coordination;
3805 let schedule_desc = task_info.schedule.to_string();
3806 state.task_registry.register_scheduled(
3807 &name,
3808 &schedule_desc,
3809 coordination,
3810 coordinator.backend(),
3811 coordinator.replica_id(),
3812 );
3813
3814 match task_info.schedule {
3815 crate::task::Schedule::FixedDelay(delay) => {
3816 let coordinator = Arc::clone(&coordinator);
3817 let shutdown = shutdown.child_token();
3818 tokio::spawn(async move {
3819 loop {
3820 state
3821 .task_registry
3822 .record_next_run_at(&name, &format_next_task_run_after(delay));
3823 tokio::select! {
3824 () = shutdown.cancelled() => break,
3825 () = tokio::time::sleep(delay) => {
3826 execute_fixed_delay_task(
3827 name.clone(),
3828 state.clone(),
3829 handler,
3830 delay,
3831 coordination,
3832 Arc::clone(&coordinator),
3833 lease_ttl,
3834 )
3835 .await;
3836 }
3837 }
3838 }
3839 });
3840 }
3841 crate::task::Schedule::Cron {
3842 expression,
3843 timezone,
3844 } => {
3845 cron_tasks.push(CronTaskSpec {
3846 name,
3847 expression,
3848 timezone,
3849 coordination,
3850 handler,
3851 });
3852 }
3853 }
3854 }
3855
3856 run_cron_scheduler(cron_tasks, state, shutdown, &coordinator, lease_ttl);
3857
3858 Ok(())
3859}
3860
3861#[allow(unused_variables, clippy::needless_pass_by_value)]
3862fn send_ws_sys_task_msg(
3863 state: &AppState,
3864 event: &str,
3865 name: &str,
3866 extra: Vec<(&str, serde_json::Value)>,
3867) {
3868 #[cfg(feature = "ws")]
3869 {
3870 let mut msg = serde_json::json!({
3874 "event": event,
3875 "task": name,
3876 "timestamp": chrono::Utc::now().to_rfc3339(),
3877 });
3878 if let Some(map) = msg.as_object_mut() {
3879 for (k, v) in extra {
3880 map.insert(k.to_string(), v);
3881 }
3882 }
3883 let _ = state.channels().sender("sys:tasks").send(msg.to_string());
3884 }
3885}
3886
3887async fn execute_task_result(
3888 state: &AppState,
3889 handler: crate::task::TaskHandler,
3890 start: std::time::Instant,
3891 name: &str,
3892 schedule: &'static str,
3893) -> Result<u64, (u64, String)> {
3894 let task_span = tracing::info_span!(
3898 parent: None,
3899 "scheduled_task",
3900 otel.kind = "internal",
3901 task = %name,
3902 schedule = schedule,
3903 );
3904 let future = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
3905 (handler)(state.clone()).instrument(task_span)
3906 })) {
3907 Ok(future) => future,
3908 Err(panic) => {
3909 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
3910 return Err((duration_ms, format_scheduled_task_panic(panic.as_ref())));
3911 }
3912 };
3913 let result = std::panic::AssertUnwindSafe(future).catch_unwind().await;
3914 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
3915
3916 match result {
3917 Ok(Ok(())) => Ok(duration_ms),
3918 Ok(Err(e)) => Err((duration_ms, e.to_string())),
3919 Err(panic) => Err((duration_ms, format_scheduled_task_panic(panic.as_ref()))),
3920 }
3921}
3922
/// Renders a caught panic payload as a scheduled-task error message.
///
/// `String` and `&'static str` payloads are included verbatim; any other payload
/// type is reported as `non-string panic payload`.
fn format_scheduled_task_panic(panic: &(dyn Any + Send)) -> String {
    let detail: &str = if let Some(owned) = panic.downcast_ref::<String>() {
        owned.as_str()
    } else if let Some(literal) = panic.downcast_ref::<&'static str>() {
        *literal
    } else {
        "non-string panic payload"
    };
    format!("scheduled task handler panicked: {detail}")
}
3931
3932async fn execute_task_result_with_optional_lease_ttl(
3933 state: &AppState,
3934 handler: crate::task::TaskHandler,
3935 start: std::time::Instant,
3936 name: &str,
3937 schedule: &'static str,
3938 lease_ttl: Option<std::time::Duration>,
3939) -> Result<u64, (u64, String)> {
3940 let Some(lease_ttl) = lease_ttl else {
3941 return execute_task_result(state, handler, start, name, schedule).await;
3942 };
3943
3944 tokio::time::timeout(
3945 lease_ttl,
3946 execute_task_result(state, handler, start, name, schedule),
3947 )
3948 .await
3949 .unwrap_or_else(|_| {
3950 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
3951 Err((
3952 duration_ms,
3953 format!(
3954 "scheduled task exceeded lease TTL of {}s",
3955 lease_ttl.as_secs()
3956 ),
3957 ))
3958 })
3959}
3960
3961#[allow(clippy::cognitive_complexity)]
3963async fn execute_fixed_delay_task(
3964 name: String,
3965 state: AppState,
3966 handler: crate::task::TaskHandler,
3967 delay: std::time::Duration,
3968 coordination: crate::task::TaskCoordination,
3969 coordinator: Arc<dyn crate::scheduler::SchedulerCoordinator>,
3970 lease_ttl: std::time::Duration,
3971) {
3972 let tick_key = crate::scheduler::fixed_delay_tick_key(
3973 &name,
3974 delay,
3975 crate::time::clock_unix_duration(state.clock()),
3976 );
3977 let lease = match coordinator
3978 .try_acquire(&name, &tick_key, coordination)
3979 .await
3980 {
3981 Ok(Some(lease)) => lease,
3982 Ok(None) => {
3983 tracing::debug!(task = %name, tick = %tick_key, "Scheduled task tick already claimed");
3984 return;
3985 }
3986 Err(error) => {
3987 tracing::warn!(task = %name, tick = %tick_key, error = %error, "Failed to acquire scheduled task lease");
3988 return;
3989 }
3990 };
3991 state
3992 .task_registry
3993 .record_leader(&name, lease.leader_id(), &tick_key);
3994 tracing::debug!(task = %name, "Running scheduled task");
3995 state.task_registry.record_start(&name);
3996
3997 send_ws_sys_task_msg(&state, "started", &name, vec![]);
3998
3999 let start = std::time::Instant::now();
4000 let lease_ttl = lease_ttl_for_run(&lease, coordination, lease_ttl);
4001 match execute_task_result_with_optional_lease_ttl(
4002 &state,
4003 handler,
4004 start,
4005 &name,
4006 "fixed_delay",
4007 lease_ttl,
4008 )
4009 .await
4010 {
4011 Ok(duration_ms) => {
4012 state.task_registry.record_success(&name, duration_ms);
4013 tracing::debug!(task = %name, "Task completed");
4014 send_ws_sys_task_msg(
4015 &state,
4016 "success",
4017 &name,
4018 vec![("duration_ms", serde_json::json!(duration_ms))],
4019 );
4020 }
4021 Err((duration_ms, error_str)) => {
4022 state
4023 .task_registry
4024 .record_failure(&name, duration_ms, &error_str);
4025 tracing::warn!(task = %name, error = %error_str, "Task failed");
4026 send_ws_sys_task_msg(
4027 &state,
4028 "failure",
4029 &name,
4030 vec![
4031 ("duration_ms", serde_json::json!(duration_ms)),
4032 ("error", serde_json::json!(error_str)),
4033 ],
4034 );
4035 }
4036 }
4037
4038 if let Err(error) = lease.release().await {
4039 tracing::warn!(task = %name, tick = %tick_key, error = %error, "Failed to release scheduled task lease");
4040 }
4041}
4042
4043#[allow(clippy::cognitive_complexity)]
4045async fn execute_cron_task(
4046 name: String,
4047 state: AppState,
4048 handler: crate::task::TaskHandler,
4049 coordination: crate::task::TaskCoordination,
4050 coordinator: Arc<dyn crate::scheduler::SchedulerCoordinator>,
4051 lease_ttl: std::time::Duration,
4052 scheduled_unix_secs: u64,
4053) {
4054 let tick_key = crate::scheduler::cron_tick_key(&name, scheduled_unix_secs);
4055 let lease = match coordinator
4056 .try_acquire(&name, &tick_key, coordination)
4057 .await
4058 {
4059 Ok(Some(lease)) => lease,
4060 Ok(None) => {
4061 tracing::debug!(task = %name, tick = %tick_key, "Cron task tick already claimed");
4062 return;
4063 }
4064 Err(error) => {
4065 tracing::warn!(task = %name, tick = %tick_key, error = %error, "Failed to acquire cron task lease");
4066 return;
4067 }
4068 };
4069 state
4070 .task_registry
4071 .record_leader(&name, lease.leader_id(), &tick_key);
4072 tracing::debug!(task = %name, "Running cron task");
4073 state.task_registry.record_start(&name);
4074
4075 send_ws_sys_task_msg(&state, "started", &name, vec![]);
4076
4077 let start = std::time::Instant::now();
4078 let lease_ttl = lease_ttl_for_run(&lease, coordination, lease_ttl);
4079 match execute_task_result_with_optional_lease_ttl(
4080 &state, handler, start, &name, "cron", lease_ttl,
4081 )
4082 .await
4083 {
4084 Ok(duration_ms) => {
4085 state.task_registry.record_success(&name, duration_ms);
4086 tracing::debug!(task = %name, "Cron task completed");
4087 send_ws_sys_task_msg(
4088 &state,
4089 "success",
4090 &name,
4091 vec![("duration_ms", serde_json::json!(duration_ms))],
4092 );
4093 }
4094 Err((duration_ms, error_str)) => {
4095 state
4096 .task_registry
4097 .record_failure(&name, duration_ms, &error_str);
4098 tracing::warn!(task = %name, error = %error_str, "Cron task failed");
4099 send_ws_sys_task_msg(
4100 &state,
4101 "failure",
4102 &name,
4103 vec![
4104 ("duration_ms", serde_json::json!(duration_ms)),
4105 ("error", serde_json::json!(error_str)),
4106 ],
4107 );
4108 }
4109 }
4110
4111 if let Err(error) = lease.release().await {
4112 tracing::warn!(task = %name, tick = %tick_key, error = %error, "Failed to release cron task lease");
4113 }
4114}
4115
/// Everything the cron scheduler needs to run one cron-scheduled task.
///
/// Built by `start_task_scheduler_with_config` from a task whose schedule is
/// `Schedule::Cron` and consumed by `run_cron_task_loop`.
struct CronTaskSpec {
    /// Task name; used for logging, registry updates and tick keys.
    name: String,
    /// Cron expression, parsed as a `croner::Cron` when the task loop starts.
    expression: String,
    /// Optional timezone name, parsed as a `chrono_tz::Tz`; the task loop falls
    /// back to UTC when it is absent or unrecognized.
    timezone: Option<String>,
    /// Coordination mode passed to the scheduler coordinator when acquiring a lease.
    coordination: crate::task::TaskCoordination,
    /// Handler invoked for each occurrence.
    handler: crate::task::TaskHandler,
}
4123
4124fn lease_ttl_for_run(
4125 lease: &crate::scheduler::SchedulerLease,
4126 coordination: crate::task::TaskCoordination,
4127 lease_ttl: std::time::Duration,
4128) -> Option<std::time::Duration> {
4129 (coordination == crate::task::TaskCoordination::Fleet && lease.backend() == "postgres")
4130 .then_some(lease_ttl)
4131}
4132
4133fn run_cron_scheduler(
4134 tasks: Vec<CronTaskSpec>,
4135 state: &AppState,
4136 shutdown: &tokio_util::sync::CancellationToken,
4137 coordinator: &Arc<dyn crate::scheduler::SchedulerCoordinator>,
4138 lease_ttl: std::time::Duration,
4139) {
4140 if tasks.is_empty() {
4141 return;
4142 }
4143
4144 tracing::info!(count = tasks.len(), "Cron scheduler started");
4145 for task in tasks {
4146 let state = state.clone();
4147 let coordinator = Arc::clone(coordinator);
4148 let shutdown = shutdown.child_token();
4149 tokio::spawn(async move {
4150 run_cron_task_loop(task, state, shutdown, coordinator, lease_ttl).await;
4151 });
4152 }
4153}
4154
/// Drives a single cron task: repeatedly computes the next occurrence, sleeps until
/// it is due, and spawns the task run.
///
/// The loop ends when `shutdown` is cancelled, or — after logging an error — when
/// the cron expression cannot be parsed or a next occurrence cannot be computed.
/// An unrecognized timezone is logged as a warning and replaced by UTC.
#[allow(clippy::cognitive_complexity)]
async fn run_cron_task_loop(
    task: CronTaskSpec,
    state: AppState,
    shutdown: tokio_util::sync::CancellationToken,
    coordinator: Arc<dyn crate::scheduler::SchedulerCoordinator>,
    lease_ttl: std::time::Duration,
) {
    let CronTaskSpec {
        name,
        expression,
        timezone,
        coordination,
        handler,
    } = task;

    let cron = match expression.parse::<croner::Cron>() {
        Ok(cron) => cron,
        Err(error) => {
            tracing::error!(task = %name, expression = %expression, error = %error, "Failed to create cron job");
            return;
        }
    };
    let timezone = timezone
        .as_deref()
        .and_then(|timezone| {
            timezone.parse::<chrono_tz::Tz>().map_or_else(
                |_| {
                    tracing::warn!(task = %name, timezone = %timezone, "Unrecognized timezone; falling back to UTC");
                    None
                },
                Some,
            )
        })
        .unwrap_or(chrono_tz::UTC);
    // `cursor` is the point after which the next occurrence is searched: the last
    // tick that was dispatched, or the wake-up time after a skipped tick.
    let mut cursor = chrono::Utc::now().with_timezone(&timezone);

    loop {
        let now = chrono::Utc::now().with_timezone(&timezone);
        let scheduled_at = match next_cron_occurrence_after(&cron, &cursor, &now) {
            Ok(scheduled_at) => scheduled_at,
            Err(error) => {
                tracing::error!(task = %name, expression = %expression, error = %error, "Failed to compute next cron tick");
                return;
            }
        };
        // The registry always receives the next run time in UTC, RFC 3339 format.
        state.task_registry.record_next_run_at(
            &name,
            &scheduled_at.with_timezone(&chrono::Utc).to_rfc3339(),
        );
        let sleep_for = cron_sleep_duration_until(&scheduled_at);
        tokio::select! {
            () = shutdown.cancelled() => break,
            () = tokio::time::sleep(sleep_for) => {
                let woke_at = chrono::Utc::now().with_timezone(&timezone);
                // If the occurrence following `scheduled_at` is already due at wake-up,
                // this tick is skipped rather than run late.
                match cron_occurrence_is_overdue(&cron, &scheduled_at, &woke_at) {
                    Ok(true) => {
                        tracing::warn!(
                            task = %name,
                            scheduled_at = %scheduled_at,
                            woke_at = %woke_at,
                            "Skipping overdue cron task tick"
                        );
                        cursor = woke_at;
                        continue;
                    }
                    Ok(false) => {}
                    Err(error) => {
                        tracing::error!(task = %name, expression = %expression, error = %error, "Failed to evaluate cron tick lateness");
                        return;
                    }
                }
                // A negative (pre-epoch) timestamp falls back to 0.
                let scheduled_unix_secs = u64::try_from(scheduled_at.timestamp()).unwrap_or_default();
                // The run is spawned, not awaited, so the loop goes straight on to
                // scheduling the next occurrence.
                tokio::spawn(execute_cron_task(
                    name.clone(),
                    state.clone(),
                    handler,
                    coordination,
                    Arc::clone(&coordinator),
                    lease_ttl,
                    scheduled_unix_secs,
                ));
                cursor = scheduled_at;
            }
        }
    }
}
4242
4243fn format_next_task_run_after(delay: std::time::Duration) -> String {
4244 let now = chrono::Utc::now();
4245 let Ok(delay) = chrono::TimeDelta::from_std(delay) else {
4246 return now.to_rfc3339();
4247 };
4248 (now + delay).to_rfc3339()
4249}
4250
4251fn next_cron_occurrence_after<Tz: chrono::TimeZone>(
4252 cron: &croner::Cron,
4253 cursor: &chrono::DateTime<Tz>,
4254 now: &chrono::DateTime<Tz>,
4255) -> Result<chrono::DateTime<Tz>, croner::errors::CronError> {
4256 let anchor = if cursor < now { now } else { cursor };
4257 cron.find_next_occurrence(anchor, false)
4258}
4259
4260fn cron_occurrence_is_overdue<Tz: chrono::TimeZone>(
4261 cron: &croner::Cron,
4262 scheduled_at: &chrono::DateTime<Tz>,
4263 now: &chrono::DateTime<Tz>,
4264) -> Result<bool, croner::errors::CronError> {
4265 let next_after_scheduled = cron.find_next_occurrence(scheduled_at, false)?;
4266 Ok(&next_after_scheduled <= now)
4267}
4268
4269fn cron_sleep_duration_until<Tz: chrono::TimeZone>(
4270 scheduled_at: &chrono::DateTime<Tz>,
4271) -> std::time::Duration {
4272 scheduled_at
4273 .with_timezone(&chrono::Utc)
4274 .signed_duration_since(chrono::Utc::now())
4275 .to_std()
4276 .unwrap_or_default()
4277}
4278
4279async fn run_startup_hooks(hooks: &[StartupHook], state: AppState) -> crate::AutumnResult<()> {
4280 for hook in hooks {
4281 hook(state.clone()).await?;
4282 }
4283 Ok(())
4284}
4285
4286fn run_state_initializers(initializers: Vec<StateInitializer>, state: &AppState) {
4287 for initializer in initializers {
4288 initializer(state);
4289 }
4290}
4291
4292fn initialize_job_runtime(
4293 jobs: Vec<crate::job::JobInfo>,
4294 state: &AppState,
4295 shutdown: &tokio_util::sync::CancellationToken,
4296 config: &crate::config::JobConfig,
4297) -> crate::AutumnResult<()> {
4298 crate::job::clear_global_job_client();
4299 if jobs.is_empty() {
4300 Ok(())
4301 } else {
4302 crate::job::start_runtime(jobs, state, shutdown, config)
4303 }
4304}
4305
4306async fn run_shutdown_hooks(hooks: &[ShutdownHook]) {
4307 for hook in hooks.iter().rev() {
4308 hook().await;
4309 }
4310}
4311
4312async fn run_shutdown_hooks_with_timeout(
4321 hooks: &[ShutdownHook],
4322 per_hook_budget: std::time::Duration,
4323 total_budget: std::time::Duration,
4324) {
4325 let deadline = tokio::time::Instant::now() + total_budget;
4326 for hook in hooks.iter().rev() {
4327 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
4328 if remaining.is_zero() {
4329 tracing::warn!("shutdown: total hook budget exhausted; skipping remaining hooks");
4330 break;
4331 }
4332 let timeout = remaining.min(per_hook_budget);
4333 if tokio::time::timeout(timeout, hook()).await.is_err() {
4336 tracing::warn!(
4337 per_hook_budget_ms = timeout.as_millis(),
4338 "shutdown: hook overran per-hook timeout; continuing with remaining budget"
4339 );
4340 }
4341 }
4342}
4343
4344#[allow(clippy::cognitive_complexity)]
4351fn log_startup_transparency(
4352 routes: &[Route],
4353 tasks: &[crate::task::TaskInfo],
4354 scoped_groups: &[ScopedGroup],
4355 config: &AutumnConfig,
4356) {
4357 tracing::info!(
4358 "Registered routes:{}",
4359 format_route_lines(routes, scoped_groups, config)
4360 );
4361
4362 if let Some(task_lines) = format_task_lines(tasks) {
4363 tracing::info!("Scheduled tasks:{task_lines}");
4364 }
4365
4366 tracing::info!("Active middleware: {}", format_middleware_list(config));
4367
4368 tracing::info!("Configuration:{}", format_config_summary(config));
4369}
4370
4371fn fail_fast_on_invalid_session_config(config: &AutumnConfig, has_custom_session_store: bool) {
4384 if has_custom_session_store {
4385 return;
4386 }
4387 if let Err(error) = config.session.backend_plan(config.profile.as_deref()) {
4388 eprintln!("Invalid session backend config: {error}");
4389 std::process::exit(1);
4390 }
4391}
4392
4393fn fail_fast_on_missing_encryption_keys(config: &AutumnConfig) {
4403 if let Err(diagnostic) = crate::encryption::init_attribute_encryption(config.credentials()) {
4404 let is_production = matches!(config.profile.as_deref(), Some("prod" | "production"));
4405 if is_production {
4406 eprintln!("Attribute encryption misconfiguration: {diagnostic}");
4407 std::process::exit(1);
4408 }
4409 eprintln!(
4410 "warning: attribute encryption is not fully configured (dev): {diagnostic}\n \
4411 note: encrypted-column reads/writes will fail until keys are set; \
4412 this is a hard error in production."
4413 );
4414 }
4415}
4416
4417fn fail_fast_on_invalid_signing_secret(config: &AutumnConfig) {
4423 use crate::security::config::validate_signing_secret;
4424
4425 let is_production = matches!(config.profile.as_deref(), Some("prod" | "production"));
4426 let secret = config.security.signing_secret.secret.as_deref();
4427
4428 if let Err(error) = validate_signing_secret(secret, is_production) {
4429 eprintln!("Invalid signing secret configuration: {error}");
4430 eprintln!(
4431 " hint: generate a secret with `openssl rand -hex 32` and set \
4432 AUTUMN_SECURITY__SIGNING_SECRET"
4433 );
4434 std::process::exit(1);
4435 }
4436
4437 if is_production {
4440 for (i, prev) in config
4441 .security
4442 .signing_secret
4443 .previous_secrets
4444 .iter()
4445 .enumerate()
4446 {
4447 if let Err(error) = validate_signing_secret(Some(prev.as_str()), true) {
4448 eprintln!("Invalid signing secret configuration: previous_secrets[{i}]: {error}");
4449 eprintln!(
4450 " hint: every previous secret must meet the same entropy requirement \
4451 as the current secret"
4452 );
4453 std::process::exit(1);
4454 }
4455 }
4456 }
4457}
4458
4459fn fail_fast_on_invalid_webhook_config(config: &AutumnConfig) {
4460 let is_production = matches!(config.profile.as_deref(), Some("prod" | "production"));
4461 if let Err(error) = config.security.webhooks.validate(is_production) {
4462 eprintln!("Invalid signed webhook configuration: {error}");
4463 std::process::exit(1);
4464 }
4465}
4466
4467fn fail_fast_on_invalid_trusted_hosts(config: &AutumnConfig) {
4468 let is_production = matches!(config.profile.as_deref(), Some("prod" | "production"));
4469 if !is_production {
4470 return;
4471 }
4472 let hosts: Vec<String> = config
4473 .security
4474 .trusted_hosts
4475 .hosts
4476 .iter()
4477 .map(|h| h.trim().to_owned())
4478 .filter(|h| !h.is_empty())
4479 .collect();
4480 if hosts.is_empty() {
4481 eprintln!(
4482 "[security.trusted_hosts] is required in production; set hosts = [\"example.com\"] or explicit entries"
4483 );
4484 std::process::exit(1);
4485 }
4486 if hosts.iter().any(|h| h == "*") {
4487 tracing::warn!("trusted host validation disabled via wildcard '*' in production");
4488 }
4489}
4490
4491fn fail_fast_on_invalid_idempotency_config(config: &AutumnConfig) {
4492 if !config.idempotency.enabled.unwrap_or(false) {
4493 return;
4494 }
4495 let is_production = matches!(config.profile.as_deref(), Some("prod" | "production"));
4496 if is_production
4497 && config.idempotency.backend == crate::config::IdempotencyBackend::Memory
4498 && !config.idempotency.allow_memory_in_production
4499 {
4500 eprintln!(
4501 "The in-memory idempotency backend is not safe for multi-replica production use.\n\
4502 Set `[idempotency] backend = \"redis\"` in autumn.toml, or set \
4503 `allow_memory_in_production = true` to suppress this check."
4504 );
4505 std::process::exit(1);
4506 }
4507 #[cfg(feature = "redis")]
4508 if config.idempotency.backend == crate::config::IdempotencyBackend::Redis {
4509 let url_missing = config
4510 .idempotency
4511 .redis
4512 .url
4513 .as_deref()
4514 .is_none_or(|u| u.trim().is_empty());
4515 if url_missing {
4516 eprintln!(
4517 "Redis idempotency backend requires a connection URL.\n\
4518 Set AUTUMN_IDEMPOTENCY__REDIS__URL or `[idempotency.redis] url` in autumn.toml."
4519 );
4520 std::process::exit(1);
4521 }
4522 }
4523}
4524
4525pub(crate) fn install_webhook_registry(state: &AppState, config: &AutumnConfig) {
4526 if let Err(error) =
4527 crate::webhook::install_registry_from_config(state, &config.security.webhooks)
4528 {
4529 eprintln!("Invalid signed webhook configuration: {error}");
4530 std::process::exit(1);
4531 }
4532}
4533
/// Result of preparing blob storage before `AppState` exists.
///
/// Holds the blob store to register on the state and, optionally, a router that
/// serves stored blobs. `install` performs the registration.
#[cfg(feature = "storage")]
struct StorageBootstrap {
    /// Blob store that `install` registers on the state as a `BlobStoreState`.
    store: crate::storage::SharedBlobStore,
    /// Router returned by `install`; `Some` for the local-disk backend, `None`
    /// when the store was supplied directly by the application.
    serving: Option<axum::Router<AppState>>,
}
4544
#[cfg(feature = "storage")]
impl StorageBootstrap {
    /// Registers the blob store on `state` and hands back the serving router, if any.
    ///
    /// The store is inserted as a `crate::storage::BlobStoreState` extension.
    fn install(self, state: &AppState) -> Option<axum::Router<AppState>> {
        let Self { store, serving } = self;
        let blob_state = crate::storage::BlobStoreState::new(store);
        state.insert_extension::<crate::storage::BlobStoreState>(blob_state);
        serving
    }
}
4557
/// Resolves the configured storage backend into a [`StorageBootstrap`].
///
/// Returns `None` when storage is disabled and a local-disk bootstrap for the
/// local backend. An invalid backend configuration, or the `s3` backend (which
/// must be supplied by the application via `.with_blob_store(store)`), logs an
/// error and exits the process with status 1.
#[cfg(feature = "storage")]
#[allow(clippy::too_many_lines)]
fn preflight_storage(config: &AutumnConfig) -> Option<StorageBootstrap> {
    use crate::storage::StorageBackendPlan;

    let plan = match config.storage.backend_plan(config.profile.as_deref()) {
        Ok(plan) => plan,
        Err(error) => {
            tracing::error!(%error, "invalid storage backend config; aborting startup");
            std::process::exit(1);
        }
    };

    match plan {
        StorageBackendPlan::Disabled => None,
        StorageBackendPlan::S3 { .. } => {
            tracing::error!(
                "storage.backend=s3 requires the `autumn-storage-s3` plugin. \
                 Add it to your Cargo.toml, build an S3BlobStore from your config, \
                 and call `.with_blob_store(store)` on your AppBuilder. \
                 Aborting startup."
            );
            std::process::exit(1);
        }
        StorageBackendPlan::Local {
            provider_id,
            root,
            mount_path,
            default_url_expiry_secs,
            warn_in_production,
        } => {
            let bootstrap = bootstrap_local_storage(
                config,
                &provider_id,
                &root,
                &mount_path,
                default_url_expiry_secs,
                warn_in_production,
            );
            Some(bootstrap)
        }
    }
}
4614
/// Builds the local-disk blob store and its serving router.
///
/// Signing key selection, in order of precedence:
///
/// 1. the non-empty `security.signing_secret.secret`, with every
///    `previous_secrets` entry accepted as a previous key;
/// 2. the non-empty `storage.local.signing_key`, with no previous keys;
/// 3. a random key, with no previous keys (a warning is logged under the
///    `prod` / `production` profile).
///
/// Logs a warning when `warn_in_production` is set. Exits the process with
/// status 1 when the store cannot be initialized.
#[cfg(feature = "storage")]
fn bootstrap_local_storage(
    config: &AutumnConfig,
    provider_id: &str,
    root: &std::path::Path,
    mount_path: &str,
    default_url_expiry_secs: u64,
    warn_in_production: bool,
) -> StorageBootstrap {
    use crate::storage::{LocalBlobStore, SharedBlobStore, local::SigningKey};

    if warn_in_production {
        tracing::warn!(
            "prod profile is using the local-disk blob store; \
             bytes won't survive replica turnover. Set \
             storage.backend=s3 or storage.allow_local_in_production=true \
             to acknowledge"
        );
    }

    let shared_secret = config
        .security
        .signing_secret
        .secret
        .as_deref()
        .filter(|s| !s.is_empty());
    let legacy_key = config
        .storage
        .local
        .signing_key
        .as_deref()
        .filter(|s| !s.is_empty());

    let (signing_key, previous_signing_keys) = if let Some(secret) = shared_secret {
        let current = SigningKey::new(secret.as_bytes().to_vec());
        let previous: Vec<_> = config
            .security
            .signing_secret
            .previous_secrets
            .iter()
            .map(|s| SigningKey::new(s.as_bytes().to_vec()))
            .collect();
        (current, previous)
    } else if let Some(legacy) = legacy_key {
        (SigningKey::new(legacy.as_bytes().to_vec()), vec![])
    } else {
        if matches!(config.profile.as_deref(), Some("prod" | "production")) {
            tracing::warn!(
                "no signing secret configured in prod; blob URL signatures \
                 won't survive a process restart. Set \
                 AUTUMN_SECURITY__SIGNING_SECRET."
            );
        }
        (SigningKey::random(), vec![])
    };

    let store = LocalBlobStore::new(
        provider_id.to_string(),
        root.to_path_buf(),
        mount_path.to_string(),
        std::time::Duration::from_secs(default_url_expiry_secs),
        signing_key,
        previous_signing_keys,
    )
    .unwrap_or_else(|err| {
        tracing::error!(
            error = %err,
            root = %root.display(),
            "failed to initialize local blob store; aborting startup"
        );
        std::process::exit(1);
    });

    // The router is built from a borrow of the store before it is moved into the Arc.
    let serving = crate::storage::local::serve_router(&store);
    let shared: SharedBlobStore = std::sync::Arc::new(store);

    tracing::info!(
        provider = %provider_id,
        root = %root.display(),
        mount = %mount_path,
        "Local blob store mounted"
    );

    StorageBootstrap {
        store: shared,
        serving: Some(serving),
    }
}
4718async fn load_config_and_telemetry(
4719 config_loader: Option<ConfigLoaderFactory>,
4720 telemetry_provider: Option<Box<dyn crate::telemetry::TelemetryProvider>>,
4721) -> (AutumnConfig, crate::telemetry::TelemetryGuard) {
4722 let config = match config_loader {
4725 Some(factory) => factory().await,
4726 None => crate::config::TomlEnvConfigLoader::new().load().await,
4727 }
4728 .unwrap_or_else(|e| {
4729 eprintln!("Failed to load configuration: {e}");
4730 std::process::exit(1);
4731 });
4732
4733 let provider: Box<dyn crate::telemetry::TelemetryProvider> = telemetry_provider
4736 .unwrap_or_else(|| Box::new(crate::telemetry::TracingOtlpTelemetryProvider::new()));
4737 let telemetry_guard = provider
4738 .init(&config.log, &config.telemetry, config.profile.as_deref())
4739 .unwrap_or_else(|error| {
4740 eprintln!("Failed to initialize telemetry: {error}");
4741 std::process::exit(1);
4742 });
4743
4744 (config, telemetry_guard)
4745}
4746
/// Picks the i18n bundle to use for this application.
///
/// An explicitly supplied bundle always wins. Otherwise, when `auto_load` is
/// set, the bundle is loaded from the configured i18n directory (resolved via
/// `project_dir`); when it is not set, no bundle is used.
///
/// # Panics
///
/// Panics with an `i18n_auto: ...` message if auto-loading fails.
#[cfg(feature = "i18n")]
fn resolve_i18n_bundle(
    explicit_bundle: Option<Arc<crate::i18n::Bundle>>,
    auto_load: bool,
    config: &AutumnConfig,
    env: &dyn crate::config::Env,
) -> Option<Arc<crate::i18n::Bundle>> {
    match (explicit_bundle, auto_load) {
        (Some(bundle), _) => Some(bundle),
        (None, false) => None,
        (None, true) => {
            let dir = project_dir(&config.i18n.dir, env);
            match crate::i18n::Bundle::load_from_dir(&dir, &config.i18n) {
                Ok(bundle) => Some(Arc::new(bundle)),
                Err(e) => panic!("i18n_auto: {e}"),
            }
        }
    }
}
4767
/// Makes the i18n bundle available to handlers, if one was resolved.
///
/// When `bundle` is `Some`, it is logged, inserted into `state` as an extension,
/// and an `axum::Extension` layer carrying it is appended to `custom_layers`.
/// When `bundle` is `None`, `custom_layers` is returned untouched.
#[cfg(feature = "i18n")]
fn install_i18n_bundle_layer(
    mut custom_layers: Vec<CustomLayerRegistration>,
    state: &AppState,
    bundle: Option<Arc<crate::i18n::Bundle>>,
) -> Vec<CustomLayerRegistration> {
    type BundleExtension = axum::Extension<Arc<crate::i18n::Bundle>>;

    if let Some(bundle) = bundle {
        tracing::info!(
            locales = ?bundle.locales(),
            default = bundle.default_locale(),
            "i18n bundle loaded"
        );
        state.insert_extension::<Arc<crate::i18n::Bundle>>(Arc::clone(&bundle));

        let ext_layer: BundleExtension = axum::Extension(bundle);
        custom_layers.push(CustomLayerRegistration {
            type_id: TypeId::of::<BundleExtension>(),
            type_name: std::any::type_name::<BundleExtension>(),
            apply: Box::new(move |router| router.layer(ext_layer)),
        });
    }

    custom_layers
}
4795
/// Values produced by `setup_database` during database startup.
#[cfg(feature = "db")]
struct DatabaseBootstrap {
    /// Topology returned by the pool provider factory or by
    /// `crate::db::create_topology`; may be `None`.
    topology: Option<crate::db::DatabaseTopology>,
    /// Outcome of the startup replica migration readiness check, when it ran.
    replica_readiness: Option<crate::migrate::ReplicaMigrationReadiness>,
    /// `(primary_url, replica_url)` pair the readiness check ran against, when it ran.
    replica_migration_check: Option<(String, String)>,
}
4802
4803#[cfg(feature = "db")]
4804async fn setup_database(
4805 config: &AutumnConfig,
4806 migrations: Vec<crate::migrate::EmbeddedMigrations>,
4807 pool_provider: Option<PoolProviderFactory>,
4808 hook_queue_migration_mode: RepositoryCommitHookQueueMigrationMode,
4809) -> Result<DatabaseBootstrap, String> {
4810 let migrations = migrations_with_repository_framework_migrations(
4811 migrations,
4812 crate::repository_commit_hooks::has_repository_commit_hook_descriptors(),
4813 crate::version_history::has_versioned_repository_descriptors(),
4814 hook_queue_migration_mode,
4815 );
4816 let check_replica_migrations = !migrations.is_empty();
4817 let topology = match pool_provider {
4818 Some(factory) => factory(config.database.clone()).await,
4819 None => crate::db::create_topology(&config.database),
4820 }
4821 .map_err(|e| format!("Failed to create database pool: {e}"))?;
4822
4823 if topology.is_some()
4828 && let Some(url) = config.database.effective_primary_url()
4829 {
4830 let url = url.to_owned();
4831 let profile = config.profile.clone();
4832 let auto_in_prod = config.database.auto_migrate_in_production;
4833 for mig in migrations {
4834 let url = url.clone();
4835 let profile = profile.clone();
4836 tokio::task::spawn_blocking(move || {
4839 crate::migrate::auto_migrate(&url, profile.as_deref(), auto_in_prod, mig);
4840 })
4841 .await
4842 .unwrap_or_else(|e| {
4843 tracing::error!(error = %e, "Migration task panicked");
4844 std::process::exit(1);
4845 });
4846 }
4847 }
4848
4849 let (replica_readiness, replica_migration_check) = if topology
4850 .as_ref()
4851 .is_some_and(|topology| check_replica_migrations && topology.replica().is_some())
4852 {
4853 match (
4854 config.database.effective_primary_url(),
4855 config.database.replica_url.as_deref(),
4856 ) {
4857 (Some(primary_url), Some(replica_url)) => {
4858 let primary_url = primary_url.to_owned();
4859 let replica_url = replica_url.to_owned();
4860 let readiness = crate::migrate::check_replica_migration_readiness_blocking(
4861 primary_url.clone(),
4862 replica_url.clone(),
4863 )
4864 .await;
4865 (Some(readiness), Some((primary_url, replica_url)))
4866 }
4867 _ => (None, None),
4868 }
4869 } else {
4870 (None, None)
4871 };
4872
4873 Ok(DatabaseBootstrap {
4874 topology,
4875 replica_readiness,
4876 replica_migration_check,
4877 })
4878}
4879
/// Migration name searched for in the registered migration sets before the
/// framework's `REPOSITORY_COMMIT_HOOK_MIGRATIONS` set is appended, so the set
/// is not added when a migration with this name is already present.
#[cfg(feature = "db")]
const REPOSITORY_COMMIT_HOOK_QUEUE_MIGRATION: &str =
    "20260515000000_create_repository_commit_hook_queue";
4883
/// Migration name searched for in the registered migration sets before the
/// framework's `VERSION_HISTORY_MIGRATIONS` set is appended, so the set is not
/// added when a migration with this name is already present.
#[cfg(feature = "db")]
const VERSION_HISTORY_MIGRATION: &str = "20260526000000_create_version_history";
4886
/// Controls whether `migrations_with_repository_framework_migrations` may
/// append framework-provided migration sets.
#[cfg(feature = "db")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RepositoryCommitHookQueueMigrationMode {
    /// Framework migrations are appended when required and not already present.
    Runtime,
    /// Framework migrations are never appended.
    StaticBuild,
}
4893
/// Appends framework migration sets to `migrations` where they are needed.
///
/// Nothing is appended unless `mode` is `Runtime`. In runtime mode, the commit
/// hook queue set is added when `hook_queue_required` is true and no existing
/// set already contains `REPOSITORY_COMMIT_HOOK_QUEUE_MIGRATION`; the version
/// history set follows the same rule with `version_history_required` and
/// `VERSION_HISTORY_MIGRATION`.
#[cfg(feature = "db")]
fn migrations_with_repository_framework_migrations(
    mut migrations: Vec<crate::migrate::EmbeddedMigrations>,
    hook_queue_required: bool,
    version_history_required: bool,
    mode: RepositoryCommitHookQueueMigrationMode,
) -> Vec<crate::migrate::EmbeddedMigrations> {
    if mode != RepositoryCommitHookQueueMigrationMode::Runtime {
        return migrations;
    }

    let needs_hook_queue = hook_queue_required
        && !migration_sets_include(&migrations, REPOSITORY_COMMIT_HOOK_QUEUE_MIGRATION);
    if needs_hook_queue {
        migrations.push(crate::repository_commit_hooks::REPOSITORY_COMMIT_HOOK_MIGRATIONS);
    }

    // Evaluated after the push above, so it sees the commit hook set if it was added.
    let needs_version_history =
        version_history_required && !migration_sets_include(&migrations, VERSION_HISTORY_MIGRATION);
    if needs_version_history {
        migrations.push(crate::version_history::VERSION_HISTORY_MIGRATIONS);
    }

    migrations
}
4915
/// Reports whether any migration set contains a migration named `migration_name`.
///
/// A set whose migrations cannot be enumerated is treated as not containing it.
#[cfg(feature = "db")]
fn migration_sets_include(
    migrations: &[crate::migrate::EmbeddedMigrations],
    migration_name: &str,
) -> bool {
    use diesel::migration::{Migration, MigrationSource as _};
    use diesel::pg::Pg;

    migrations.iter().any(|source| {
        let listed: Result<Vec<Box<dyn Migration<Pg>>>, _> = source.migrations();
        listed.is_ok_and(|entries| {
            entries
                .iter()
                .any(|entry| entry.name().to_string() == migration_name)
        })
    })
}
4935
/// Reflects a replica migration readiness result in the probe state.
///
/// Does nothing for `None`. A ready result marks replica migrations ready; a
/// not-ready result marks them unready only when it carries a detail value.
#[cfg(feature = "db")]
fn apply_replica_migration_readiness(
    state: &AppState,
    readiness: Option<crate::migrate::ReplicaMigrationReadiness>,
) {
    match readiness {
        None => {}
        Some(report) if report.is_ready() => {
            state.probes().mark_replica_migrations_ready();
        }
        Some(report) => {
            if let Some(detail) = report.detail() {
                state.probes().mark_replica_migrations_unready(detail);
            }
        }
    }
}
4951
/// Passes the `(primary_url, replica_url)` pair to the probe state's replica
/// migration check configuration; a `None` pair is ignored.
#[cfg(feature = "db")]
fn configure_replica_migration_check(state: &AppState, check: Option<(String, String)>) {
    if let Some((primary_url, replica_url)) = check {
        let probes = state.probes();
        probes.configure_replica_migration_check(primary_url, replica_url);
    }
}
4962
4963fn collect_unguarded_repository_writes(
4986 routes: &[Route],
4987 scoped_groups: &[ScopedGroup],
4988) -> Vec<(String, String)> {
4989 let mut offenders: Vec<(String, String)> = Vec::new();
4990 let mut seen: std::collections::HashSet<(&'static str, &'static str)> =
4991 std::collections::HashSet::new();
4992 let mut record_route = |route: &Route| {
4993 if let Some(meta) = route.repository
4994 && !meta.has_policy
4995 && is_mutating_method(&route.method)
4996 && seen.insert((meta.resource_type_name, meta.api_path))
4997 {
4998 offenders.push((meta.resource_type_name.to_owned(), meta.api_path.to_owned()));
4999 }
5000 };
5001 for route in routes {
5002 record_route(route);
5003 }
5004 for group in scoped_groups {
5005 for route in &group.routes {
5006 record_route(route);
5007 }
5008 }
5009 offenders
5010}
5011
/// Renders one bullet line per `(resource type, api path)` offender.
///
/// Lines are separated by `\n` with no trailing newline; an empty slice yields
/// an empty string.
fn format_unguarded_repository_listing(offenders: &[(String, String)]) -> String {
    offenders
        .iter()
        .map(|(name, path)| format!(" - #[repository({name}, api = \"{path}\")]"))
        .collect::<Vec<_>>()
        .join("\n")
}
5028
5029fn validate_repository_api_policies(
5030 routes: &[Route],
5031 scoped_groups: &[ScopedGroup],
5032 config: &AutumnConfig,
5033) {
5034 let profile = config.profile.as_deref().unwrap_or("default");
5035 let strict =
5036 is_production_profile(profile) && !config.security.allow_unauthorized_repository_api;
5037
5038 let offenders = collect_unguarded_repository_writes(routes, scoped_groups);
5039 if offenders.is_empty() {
5040 return;
5041 }
5042
5043 let listing = format_unguarded_repository_listing(&offenders);
5044
5045 if strict {
5046 tracing::error!(
5047 "refusing to start: the following #[repository(api = ...)] mutating endpoints have no paired `policy = ...` argument:\n{listing}\n\
5048 Add `policy = SomePolicy` to each, or set `[security] allow_unauthorized_repository_api = true` to opt out explicitly."
5049 );
5050 std::process::exit(1);
5051 } else {
5052 tracing::warn!(
5053 "the following #[repository(api = ...)] mutating endpoints have no paired `policy = ...` argument; \
5054 auto-generated POST/PUT/PATCH/DELETE handlers will accept writes from any authenticated user:\n{listing}\n\
5055 This will become a startup-time error in `prod` profile builds."
5056 );
5057 }
5058}
5059
/// `(resource_type_name, api_path)` pair identifying a `#[repository]` mount
/// whose declared policy or scope has no matching registration.
type MissingRepositoryRegistration = (String, String);
5075
5076fn collect_unregistered_repository_handlers(
5086 routes: &[Route],
5087 scoped_groups: &[ScopedGroup],
5088 registry: &crate::authorization::PolicyRegistry,
5089) -> (
5090 Vec<MissingRepositoryRegistration>,
5091 Vec<MissingRepositoryRegistration>,
5092) {
5093 let mut missing_policies: Vec<(String, String)> = Vec::new();
5094 let mut missing_scopes: Vec<(String, String)> = Vec::new();
5095 let mut seen_policies: std::collections::HashSet<(&'static str, &'static str)> =
5096 std::collections::HashSet::new();
5097 let mut seen_scopes: std::collections::HashSet<(&'static str, &'static str)> =
5098 std::collections::HashSet::new();
5099 let mut record_route = |route: &Route| {
5100 if let Some(meta) = route.repository {
5101 if let Some(check) = meta.policy_check
5102 && !check(registry)
5103 && seen_policies.insert((meta.resource_type_name, meta.api_path))
5104 {
5105 missing_policies
5106 .push((meta.resource_type_name.to_owned(), meta.api_path.to_owned()));
5107 }
5108 if let Some(check) = meta.scope_check
5109 && !check(registry)
5110 && seen_scopes.insert((meta.resource_type_name, meta.api_path))
5111 {
5112 missing_scopes.push((meta.resource_type_name.to_owned(), meta.api_path.to_owned()));
5113 }
5114 }
5115 };
5116 for route in routes {
5117 record_route(route);
5118 }
5119 for group in scoped_groups {
5120 for route in &group.routes {
5121 record_route(route);
5122 }
5123 }
5124 (missing_policies, missing_scopes)
5125}
5126
/// Renders one bullet per mount with an unregistered policy, each carrying a
/// hint naming the `.policy::<R, _>(...)` builder call to add.
///
/// Lines are separated by `\n` with no trailing newline.
fn format_missing_policy_listing(missing: &[(String, String)]) -> String {
    missing
        .iter()
        .map(|(name, path)| {
            format!(" - #[repository({name}, api = \"{path}\", policy = ...)]: call `.policy::<{name}, _>(...)` on the app builder")
        })
        .collect::<Vec<_>>()
        .join("\n")
}
5142
/// Renders one bullet per mount with an unregistered scope, each carrying a
/// hint naming the `.scope::<R, _>(...)` builder call to add.
///
/// Lines are separated by `\n` with no trailing newline.
fn format_missing_scope_listing(missing: &[(String, String)]) -> String {
    missing
        .iter()
        .map(|(name, path)| {
            format!(" - #[repository({name}, api = \"{path}\", scope = ...)]: call `.scope::<{name}, _>(...)` on the app builder")
        })
        .collect::<Vec<_>>()
        .join("\n")
}
5158
5159#[allow(clippy::cognitive_complexity)]
5160fn validate_repository_policies_registered(
5161 routes: &[Route],
5162 scoped_groups: &[ScopedGroup],
5163 state: &AppState,
5164 config: &AutumnConfig,
5165) {
5166 let profile = config.profile.as_deref().unwrap_or("default");
5167 let strict = is_production_profile(profile);
5168
5169 let (missing_policies, missing_scopes) =
5170 collect_unregistered_repository_handlers(routes, scoped_groups, state.policy_registry());
5171
5172 if missing_policies.is_empty() && missing_scopes.is_empty() {
5173 return;
5174 }
5175
5176 if !missing_policies.is_empty() {
5177 let listing = format_missing_policy_listing(&missing_policies);
5178
5179 if strict {
5180 tracing::error!(
5181 "refusing to start: the following #[repository] routes declare a `policy = ...` argument, but no policy is registered for the resource type. Without registration, every protected request would fail at runtime with `500 no policy registered`:\n{listing}"
5182 );
5183 } else {
5184 tracing::warn!(
5185 "the following #[repository] routes declare `policy = ...` but no matching `.policy::<R, _>(...)` registration is on the app builder. Protected requests will 500 at runtime:\n{listing}\n\
5186 This will become a startup-time error in `prod` profile builds."
5187 );
5188 }
5189 }
5190
5191 if !missing_scopes.is_empty() {
5192 let listing = format_missing_scope_listing(&missing_scopes);
5193
5194 if strict {
5195 tracing::error!(
5196 "refusing to start: the following #[repository] routes declare a `scope = ...` argument, but no scope is registered for the resource type. Without registration, every list request would fail at runtime with `500 missing scope registration`:\n{listing}"
5197 );
5198 } else {
5199 tracing::warn!(
5200 "the following #[repository] routes declare `scope = ...` but no matching `.scope::<R, _>(...)` registration is on the app builder. List requests will 500 at runtime:\n{listing}\n\
5201 This will become a startup-time error in `prod` profile builds."
5202 );
5203 }
5204 }
5205
5206 if strict {
5207 std::process::exit(1);
5208 }
5209}
5210
5211const fn is_mutating_method(method: &http::Method) -> bool {
5212 matches!(
5213 *method,
5214 http::Method::POST | http::Method::PUT | http::Method::PATCH | http::Method::DELETE
5215 )
5216}
5217
/// Returns `true` only for the exact profile names `prod` and `production`.
///
/// The comparison is case-sensitive.
fn is_production_profile(profile: &str) -> bool {
    profile == "prod" || profile == "production"
}
5226
/// Unit tests for the repository API startup validation helpers: offender
/// collection, policy/scope registration checks, profile detection, and the
/// listing formatters.
#[cfg(test)]
mod validate_repository_api_policies_tests {
    use super::*;
    use crate::RepositoryApiMeta;

    /// Builds a minimal `Route` with the given method, path and repository metadata.
    fn build_route(
        method: http::Method,
        path: &'static str,
        meta: Option<RepositoryApiMeta>,
    ) -> Route {
        Route {
            method,
            path,
            handler: axum::routing::any(|| async { "" }),
            name: "test_route",
            api_doc: crate::openapi::ApiDoc::default(),
            repository: meta,
            idempotency: crate::route::RouteIdempotency::Direct,
            api_version: None,
            sunset_opt_out: false,
        }
    }

    /// Repository metadata with no policy and no registration checks.
    fn unguarded(path: &'static str, type_name: &'static str) -> RepositoryApiMeta {
        RepositoryApiMeta {
            resource_type_name: type_name,
            api_path: path,
            has_policy: false,
            policy_check: None,
            scope_check: None,
        }
    }

    /// Runs the unguarded-write collector over top-level routes only.
    fn collect_offenders(routes: &[Route]) -> Vec<(String, String)> {
        collect_unguarded_repository_writes(routes, &[])
    }

    #[test]
    fn read_only_mount_without_policy_is_not_an_offender() {
        let routes = vec![
            build_route(
                http::Method::GET,
                "/api/posts",
                Some(unguarded("/api/posts", "Post")),
            ),
            build_route(
                http::Method::GET,
                "/api/posts/{id}",
                Some(unguarded("/api/posts", "Post")),
            ),
        ];
        let offenders = collect_offenders(&routes);
        assert!(
            offenders.is_empty(),
            "read-only mounts should not trigger the unauthorized-repo guard"
        );
    }

    #[test]
    fn write_mount_without_policy_is_an_offender() {
        let routes = vec![build_route(
            http::Method::POST,
            "/api/posts",
            Some(unguarded("/api/posts", "Post")),
        )];
        let offenders = collect_offenders(&routes);
        assert_eq!(offenders.len(), 1);
        assert_eq!(offenders[0].0, "Post");
        assert_eq!(offenders[0].1, "/api/posts");
    }

    #[test]
    fn mixed_mount_only_dedups_one_offender_per_repository() {
        let routes = vec![
            build_route(
                http::Method::GET,
                "/api/posts",
                Some(unguarded("/api/posts", "Post")),
            ),
            build_route(
                http::Method::POST,
                "/api/posts",
                Some(unguarded("/api/posts", "Post")),
            ),
            build_route(
                http::Method::PUT,
                "/api/posts/{id}",
                Some(unguarded("/api/posts", "Post")),
            ),
            build_route(
                http::Method::DELETE,
                "/api/posts/{id}",
                Some(unguarded("/api/posts", "Post")),
            ),
        ];
        let offenders = collect_offenders(&routes);
        assert_eq!(offenders.len(), 1);
    }

    #[test]
    fn is_mutating_method_classifies_methods() {
        assert!(is_mutating_method(&http::Method::POST));
        assert!(is_mutating_method(&http::Method::PUT));
        assert!(is_mutating_method(&http::Method::PATCH));
        assert!(is_mutating_method(&http::Method::DELETE));
        assert!(!is_mutating_method(&http::Method::GET));
        assert!(!is_mutating_method(&http::Method::HEAD));
        assert!(!is_mutating_method(&http::Method::OPTIONS));
    }

    use crate::authorization::{Policy, PolicyRegistry};

    /// Resource type used by the registration-check tests.
    #[derive(Debug, Clone, PartialEq)]
    struct TestPost;

    /// Policy for `TestPost` relying entirely on the trait's default methods.
    #[derive(Default)]
    struct TestPostPolicy;
    impl Policy<TestPost> for TestPostPolicy {}

    /// Repository metadata that declares a policy and checks the registry for
    /// a `TestPost` policy registration.
    fn guarded_with_check(path: &'static str, type_name: &'static str) -> RepositoryApiMeta {
        RepositoryApiMeta {
            resource_type_name: type_name,
            api_path: path,
            has_policy: true,
            policy_check: Some(|registry: &PolicyRegistry| registry.has_policy::<TestPost>()),
            scope_check: None,
        }
    }

    /// Returns only the missing-policy half of the registration check.
    fn collect_missing(routes: &[Route], registry: &PolicyRegistry) -> Vec<(String, String)> {
        let (missing_policies, _) = collect_unregistered_repository_handlers(routes, &[], registry);
        missing_policies
    }

    #[test]
    fn registry_check_flags_routes_missing_their_policy_registration() {
        let registry = PolicyRegistry::default();
        let routes = vec![build_route(
            http::Method::POST,
            "/api/posts",
            Some(guarded_with_check("/api/posts", "TestPost")),
        )];
        let missing = collect_missing(&routes, &registry);
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0].0, "TestPost");
        assert_eq!(missing[0].1, "/api/posts");
    }

    #[test]
    fn registry_check_passes_when_policy_is_registered() {
        let registry = PolicyRegistry::default();
        registry.register_policy::<TestPost, _>(TestPostPolicy);
        let routes = vec![build_route(
            http::Method::POST,
            "/api/posts",
            Some(guarded_with_check("/api/posts", "TestPost")),
        )];
        let missing = collect_missing(&routes, &registry);
        assert!(missing.is_empty(), "policy is registered, no offenders");
    }

    #[test]
    fn registry_check_skips_routes_without_policy_check_fn() {
        let registry = PolicyRegistry::default();
        let routes = vec![build_route(
            http::Method::POST,
            "/api/posts",
            Some(unguarded("/api/posts", "TestPost")),
        )];
        let missing = collect_missing(&routes, &registry);
        assert!(missing.is_empty());
    }

    #[test]
    fn registry_check_dedups_one_offender_per_repository() {
        let registry = PolicyRegistry::default();
        let routes = vec![
            build_route(
                http::Method::GET,
                "/api/posts",
                Some(guarded_with_check("/api/posts", "TestPost")),
            ),
            build_route(
                http::Method::POST,
                "/api/posts",
                Some(guarded_with_check("/api/posts", "TestPost")),
            ),
            build_route(
                http::Method::DELETE,
                "/api/posts/{id}",
                Some(guarded_with_check("/api/posts", "TestPost")),
            ),
        ];
        let missing = collect_missing(&routes, &registry);
        assert_eq!(missing.len(), 1);
    }

    use crate::authorization::{BoxFuture, PolicyContext, Scope};

    /// Scope for `TestPost` whose `list` always resolves to an empty vector.
    #[derive(Default)]
    struct TestPostScope;
    impl Scope<TestPost> for TestPostScope {
        fn list<'a>(
            &'a self,
            _ctx: &'a PolicyContext,
            _conn: &'a mut diesel_async::AsyncPgConnection,
        ) -> BoxFuture<'a, crate::AutumnResult<Vec<TestPost>>> {
            Box::pin(async { Ok(Vec::new()) })
        }
    }

    /// Repository metadata that declares only a scope check for `TestPost`.
    fn scope_only_meta(path: &'static str, type_name: &'static str) -> RepositoryApiMeta {
        RepositoryApiMeta {
            resource_type_name: type_name,
            api_path: path,
            has_policy: false,
            policy_check: None,
            scope_check: Some(|registry: &PolicyRegistry| registry.scope::<TestPost>().is_some()),
        }
    }

    /// Returns only the missing-scope half of the registration check.
    fn collect_missing_scopes(
        routes: &[Route],
        registry: &PolicyRegistry,
    ) -> Vec<(String, String)> {
        let (_, missing_scopes) = collect_unregistered_repository_handlers(routes, &[], registry);
        missing_scopes
    }

    #[test]
    fn scope_check_flags_unregistered_scope() {
        let registry = PolicyRegistry::default();
        let routes = vec![build_route(
            http::Method::GET,
            "/api/posts",
            Some(scope_only_meta("/api/posts", "TestPost")),
        )];
        let missing = collect_missing_scopes(&routes, &registry);
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0].0, "TestPost");
    }

    #[test]
    fn scope_check_passes_when_scope_is_registered() {
        let registry = PolicyRegistry::default();
        registry.register_scope::<TestPost, _>(TestPostScope);
        let routes = vec![build_route(
            http::Method::GET,
            "/api/posts",
            Some(scope_only_meta("/api/posts", "TestPost")),
        )];
        let missing = collect_missing_scopes(&routes, &registry);
        assert!(missing.is_empty());
    }

    #[test]
    fn scope_check_skips_routes_without_scope_check_fn() {
        let registry = PolicyRegistry::default();
        let routes = vec![build_route(
            http::Method::POST,
            "/api/posts",
            Some(unguarded("/api/posts", "TestPost")),
        )];
        let missing = collect_missing_scopes(&routes, &registry);
        assert!(missing.is_empty());
    }

    #[test]
    fn is_production_profile_matches_both_aliases() {
        assert!(is_production_profile("prod"));
        assert!(is_production_profile("production"));
        assert!(!is_production_profile("dev"));
        assert!(!is_production_profile("staging"));
        assert!(!is_production_profile("test"));
        assert!(!is_production_profile("default"));
        // Matching is case-sensitive.
        assert!(!is_production_profile("Prod"));
        assert!(!is_production_profile("Production"));
    }

    #[test]
    fn format_unguarded_listing_renders_one_bullet_per_offender() {
        let offenders = vec![
            ("Post".to_owned(), "/api/posts".to_owned()),
            ("Comment".to_owned(), "/api/comments".to_owned()),
        ];
        let listing = format_unguarded_repository_listing(&offenders);
        assert!(listing.contains("Post"));
        assert!(listing.contains("/api/posts"));
        assert!(listing.contains("Comment"));
        assert!(listing.contains("/api/comments"));
        assert_eq!(listing.matches("\n - ").count() + 1, 2);
    }

    #[test]
    fn format_unguarded_listing_empty_input_yields_empty_string() {
        let listing = format_unguarded_repository_listing(&[]);
        assert!(listing.is_empty());
    }

    #[test]
    fn format_missing_policy_listing_includes_policy_call_hint() {
        let missing = vec![("Post".to_owned(), "/api/posts".to_owned())];
        let listing = format_missing_policy_listing(&missing);
        assert!(listing.contains("Post"));
        assert!(listing.contains("/api/posts"));
        assert!(listing.contains(".policy::<Post, _>"));
        assert!(listing.contains("policy = ..."));
    }

    #[test]
    fn format_missing_scope_listing_includes_scope_call_hint() {
        let missing = vec![("Post".to_owned(), "/api/posts".to_owned())];
        let listing = format_missing_scope_listing(&missing);
        assert!(listing.contains("Post"));
        assert!(listing.contains("/api/posts"));
        assert!(listing.contains(".scope::<Post, _>"));
        assert!(listing.contains("scope = ..."));
    }

    #[test]
    fn collect_unguarded_walks_scoped_groups() {
        let group_route = build_route(
            http::Method::POST,
            "/api/posts",
            Some(unguarded("/api/posts", "Post")),
        );
        let group = ScopedGroup {
            prefix: "/scoped".to_owned(),
            routes: vec![group_route],
            source: crate::route_listing::RouteSource::User,
            apply_layer: Box::new(|r| r),
        };
        let offenders = collect_unguarded_repository_writes(&[], std::slice::from_ref(&group));
        assert_eq!(offenders.len(), 1);
        assert_eq!(offenders[0].0, "Post");
    }

    #[test]
    fn collect_unregistered_walks_scoped_groups() {
        let group_route = build_route(
            http::Method::POST,
            "/api/posts",
            Some(guarded_with_check("/api/posts", "TestPost")),
        );
        let group = ScopedGroup {
            prefix: "/scoped".to_owned(),
            routes: vec![group_route],
            source: crate::route_listing::RouteSource::User,
            apply_layer: Box::new(|r| r),
        };
        let registry = PolicyRegistry::default();
        let (missing, _) =
            collect_unregistered_repository_handlers(&[], std::slice::from_ref(&group), &registry);
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0].0, "TestPost");
    }
}
5610
/// Assembles the `AppState` from configuration and already-created resources.
///
/// With the `db` feature, the primary and replica pools are cloned out of
/// `database_topology` when one is supplied. With the `ws` feature, the
/// channels handle wraps `channels_backend` when supplied and is otherwise
/// built from `config.channels`; a failure to build it logs an error and exits
/// the process with code 1.
///
/// After construction, the config clone and a `StepUpGlobalConfig` are inserted
/// as state extensions.
fn build_state(
    config: &AutumnConfig,
    #[cfg(feature = "db")] database_topology: Option<&crate::db::DatabaseTopology>,
    #[cfg(feature = "ws")] channels_backend: Option<Arc<dyn crate::channels::ChannelsBackend>>,
) -> AppState {
    // Cancellation token stored on the state; config-built channels get a child token.
    #[cfg(feature = "ws")]
    let shutdown = tokio_util::sync::CancellationToken::new();
    #[cfg(feature = "ws")]
    let channels = channels_backend.map_or_else(
        || {
            crate::channels::Channels::from_config(&config.channels, shutdown.child_token())
                .unwrap_or_else(|error| {
                    tracing::error!(error = %error, "Failed to configure channels backend");
                    std::process::exit(1);
                })
        },
        crate::channels::Channels::with_shared_backend,
    );

    let state = AppState {
        extensions: std::sync::Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
        #[cfg(feature = "db")]
        pool: database_topology.map(|topology| topology.primary().clone()),
        #[cfg(feature = "db")]
        replica_pool: database_topology.and_then(|topology| topology.replica().cloned()),
        profile: config.profile.clone(),
        started_at: std::time::Instant::now(),
        health_detailed: config.health.detailed,
        probes: crate::probe::ProbeState::pending_startup(),
        metrics: crate::middleware::MetricsCollector::new(),
        log_levels: crate::actuator::LogLevels::new(&config.log.level),
        task_registry: crate::actuator::TaskRegistry::new(),
        job_registry: crate::actuator::JobRegistry::new(),
        config_props: crate::actuator::ConfigProperties::from_config(config),
        metrics_source_registry: crate::actuator::MetricsSourceRegistry::new(),
        health_indicator_registry: crate::actuator::HealthIndicatorRegistry::new(),
        // Takes a clone because `channels` itself is moved into the next field.
        #[cfg(feature = "presence")]
        presence: crate::presence::Presence::new(channels.clone()),
        #[cfg(feature = "ws")]
        channels,
        #[cfg(feature = "ws")]
        shutdown,
        policy_registry: crate::authorization::PolicyRegistry::default(),
        forbidden_response: config.security.forbidden_response,
        auth_session_key: config.auth.session_key.clone(),
        shared_cache: None,
        clock: std::sync::Arc::new(crate::time::SystemClock),
    };
    // Only configure the replica dependency probe when a replica pool exists.
    #[cfg(feature = "db")]
    if state.replica_pool.is_some() {
        state
            .probes()
            .configure_replica_dependency(config.database.replica_fallback);
    }
    state.insert_extension(config.clone());
    state.insert_extension(crate::step_up::StepUpGlobalConfig {
        default_max_age_secs: config.auth.step_up.default_max_age_secs,
    });
    state
}
5671
5672fn format_route_lines(
5674 routes: &[Route],
5675 scoped_groups: &[ScopedGroup],
5676 config: &AutumnConfig,
5677) -> String {
5678 use std::fmt::Write as _;
5679
5680 let mut out = String::new();
5681 for route in routes {
5682 let _ = write!(
5683 out,
5684 "\n {} {:<8} -> {}",
5685 route.path, route.method, route.name
5686 );
5687 }
5688 for group in scoped_groups {
5689 for route in &group.routes {
5690 let _ = write!(
5691 out,
5692 "\n {}{} {:<8} -> {} (scoped)",
5693 group.prefix, route.path, route.method, route.name
5694 );
5695 }
5696 }
5697 let mut probe_paths = std::collections::HashSet::new();
5698 for (path, name) in [
5699 (config.health.live_path.as_str(), "live"),
5700 (config.health.ready_path.as_str(), "ready"),
5701 (config.health.startup_path.as_str(), "startup"),
5702 (config.health.path.as_str(), "health"),
5703 ] {
5704 if probe_paths.insert(path) {
5705 let _ = write!(out, "\n {} {:<8} -> {}", path, "GET", name);
5706 }
5707 }
5708 let _ = write!(
5709 out,
5710 "\n {} {:<8} -> actuator",
5711 crate::actuator::actuator_route_glob(&config.actuator.prefix),
5712 "GET"
5713 );
5714 #[cfg(feature = "htmx")]
5715 {
5716 out.push_str("\n /static/js/htmx.min.js GET -> htmx");
5717 out.push_str("\n /static/js/autumn-htmx-csrf.js GET -> htmx csrf");
5718 }
5719 out
5720}
5721
5722fn format_task_lines(tasks: &[crate::task::TaskInfo]) -> Option<String> {
5724 use std::fmt::Write as _;
5725
5726 if tasks.is_empty() {
5727 return None;
5728 }
5729
5730 let mut out = String::new();
5731 for task in tasks {
5732 let schedule = task.schedule.to_string();
5733 let _ = write!(out, "\n {} ({schedule})", task.name);
5734 }
5735 Some(out)
5736}
5737
5738fn format_middleware_list(config: &AutumnConfig) -> String {
5740 let mut items = vec![
5741 "RequestId",
5742 "SecurityHeaders",
5743 "Session (in-memory)",
5744 "ErrorPages",
5745 ];
5746 if !config.cors.allowed_origins.is_empty() {
5747 items.push("CORS");
5748 }
5749 if config.security.csrf.enabled {
5750 items.push("CSRF");
5751 }
5752 items.push("Metrics");
5753 items.join(", ")
5754}
5755
5756fn mask_database_url(url: &str, pool_size: usize) -> String {
5758 if let Ok(mut parsed_url) = url::Url::parse(url) {
5759 if parsed_url.password().is_some() {
5760 let _ = parsed_url.set_password(Some("****"));
5761 return format!("{parsed_url} (pool_size={pool_size})");
5762 }
5763 format!("{parsed_url} (pool_size={pool_size})")
5764 } else {
5765 format!("**** (pool_size={pool_size})")
5768 }
5769}
5770
5771fn format_config_summary(config: &AutumnConfig) -> String {
5773 let profile = config.profile.as_deref().unwrap_or("none");
5774 let db_status = config.database.effective_primary_url().map_or_else(
5775 || "not configured".to_owned(),
5776 |url| {
5777 let primary = mask_database_url(url, config.database.effective_primary_pool_size());
5778 if config.database.replica_url.is_some() {
5779 format!(
5780 "primary={primary}, replica=configured (pool_size={})",
5781 config.database.effective_replica_pool_size()
5782 )
5783 } else {
5784 primary
5785 }
5786 },
5787 );
5788 let telemetry_status = if config.telemetry.enabled {
5789 let endpoint = config
5790 .telemetry
5791 .otlp_endpoint
5792 .as_deref()
5793 .unwrap_or("<missing endpoint>");
5794 format!("{:?} -> {endpoint}", config.telemetry.protocol)
5795 } else {
5796 "disabled".to_owned()
5797 };
5798 format!(
5799 "\
5800 \n profile: {profile}\
5801 \n server: {}:{}\
5802 \n database: {db_status}\
5803 \n log_level: {}\
5804 \n log_format: {:?}\
5805 \n telemetry: {telemetry_status}\
5806 \n health: {} (detailed={})\
5807 \n actuator: sensitive={}\
5808 \n shutdown: prestop={}s drain={}s",
5809 config.server.host,
5810 config.server.port,
5811 config.log.level,
5812 config.log.format,
5813 config.health.path,
5814 config.health.detailed,
5815 config.actuator.sensitive,
5816 config.server.prestop_grace_secs,
5817 config.server.shutdown_timeout_secs,
5818 )
5819}
5820
5821pub(crate) fn project_dir(subdir: &str, env: &dyn crate::config::Env) -> std::path::PathBuf {
5824 env.var("AUTUMN_MANIFEST_DIR").map_or_else(
5825 |_| std::path::PathBuf::from(subdir),
5826 |d| std::path::PathBuf::from(d).join(subdir),
5827 )
5828}
5829
5830async fn shutdown_signal() {
5841 let ctrl_c = async {
5842 tokio::signal::ctrl_c()
5843 .await
5844 .expect("Failed to install Ctrl+C handler");
5845 tracing::info!("Received Ctrl+C, starting graceful shutdown");
5846 };
5847
5848 #[cfg(unix)]
5849 let terminate = async {
5850 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
5851 .expect("Failed to install SIGTERM handler")
5852 .recv()
5853 .await;
5854 tracing::info!("Received SIGTERM, starting graceful shutdown");
5855 };
5856
5857 #[cfg(not(unix))]
5858 let terminate = std::future::pending::<()>();
5859
5860 let canary_rollback = async {
5861 canary_rollback_signal(std::path::Path::new(
5862 crate::canary::CANARY_ROLLBACK_FLAG_FILE,
5863 ))
5864 .await;
5865 tracing::info!("Canary rollback signalled, starting graceful shutdown");
5866 };
5867
5868 tokio::select! {
5869 () = ctrl_c => {},
5870 () = terminate => {},
5871 () = canary_rollback => {},
5872 }
5873}
5874
5875async fn canary_rollback_signal(path: &std::path::Path) {
5888 let interval = std::time::Duration::from_millis(500);
5889 loop {
5890 if tokio::fs::metadata(path).await.is_ok() {
5891 return;
5892 }
5893 tokio::time::sleep(interval).await;
5894 }
5895}
5896
5897#[cfg(test)]
5898mod tests {
5899 use super::*;
5900 use axum::body::Body;
5901 use axum::http::{Request, StatusCode};
5902 use std::sync::atomic::{AtomicUsize, Ordering};
5903 use tower::ServiceExt;
5904
/// Migrations embedded from the `test_migrations` directory. The tests below
/// pass them in as the application's own migrations.
#[cfg(feature = "db")]
const APP_TEST_MIGRATIONS: crate::migrate::EmbeddedMigrations =
    diesel_migrations::embed_migrations!("test_migrations");
5908
/// Mail delivery queue test double that accepts every mail and does nothing
/// with it.
#[cfg(feature = "mail")]
struct MailTestNoopQueue;

#[cfg(feature = "mail")]
impl crate::mail::MailDeliveryQueue for MailTestNoopQueue {
    /// Ignores `_mail` and returns a future that resolves to `Ok(())`.
    fn enqueue<'a>(
        &'a self,
        _mail: crate::mail::Mail,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<(), crate::mail::MailError>> + Send + 'a>,
    > {
        Box::pin(async { Ok(()) })
    }
}
5925
/// Builds a small plain-text mail addressed to `test@example.com`.
///
/// Panics if the builder rejects the message.
#[cfg(feature = "mail")]
fn test_mail() -> crate::mail::Mail {
    let draft = crate::mail::Mail::builder()
        .to("test@example.com")
        .subject("hi")
        .text("hello");
    draft.build().expect("test mail should build")
}
5935
5936 pub fn test_router(routes: Vec<Route>) -> axum::Router {
5938 let config = AutumnConfig::default();
5939 let state = AppState {
5940 extensions: std::sync::Arc::new(std::sync::RwLock::new(
5941 std::collections::HashMap::new(),
5942 )),
5943 #[cfg(feature = "db")]
5944 pool: None,
5945 #[cfg(feature = "db")]
5946 replica_pool: None,
5947 profile: None,
5948 started_at: std::time::Instant::now(),
5949 health_detailed: true,
5950 probes: crate::probe::ProbeState::ready_for_test(),
5951 metrics: crate::middleware::MetricsCollector::new(),
5952 log_levels: crate::actuator::LogLevels::new("info"),
5953 task_registry: crate::actuator::TaskRegistry::new(),
5954 job_registry: crate::actuator::JobRegistry::new(),
5955 config_props: crate::actuator::ConfigProperties::default(),
5956 metrics_source_registry: crate::actuator::MetricsSourceRegistry::new(),
5957 health_indicator_registry: crate::actuator::HealthIndicatorRegistry::new(),
5958 #[cfg(feature = "ws")]
5959 channels: crate::channels::Channels::new(32),
5960 #[cfg(feature = "presence")]
5961 presence: crate::presence::Presence::new(crate::channels::Channels::new(32)),
5962 #[cfg(feature = "ws")]
5963 shutdown: tokio_util::sync::CancellationToken::new(),
5964 policy_registry: crate::authorization::PolicyRegistry::default(),
5965 forbidden_response: crate::authorization::ForbiddenResponse::default(),
5966 auth_session_key: "user_id".to_owned(),
5967 shared_cache: None,
5968 clock: std::sync::Arc::new(crate::time::SystemClock),
5969 };
5970 crate::router::build_router(routes, &config, state)
5971 }
5972
5973 #[tokio::test]
5974 async fn canary_rollback_signal_resolves_when_flag_newly_written() {
5975 let tmp = tempfile::TempDir::new().unwrap();
5976 let path = tmp.path().join("canary-rollback.json");
5977
5978 let writer_path = path.clone();
5980 let writer = tokio::spawn(async move {
5981 tokio::time::sleep(std::time::Duration::from_millis(150)).await;
5982 crate::canary::CanaryState::write_rollback_flag(
5983 &writer_path,
5984 &crate::canary::RollbackSignal::default(),
5985 )
5986 .unwrap();
5987 });
5988
5989 let signalled = tokio::time::timeout(
5990 std::time::Duration::from_secs(5),
5991 canary_rollback_signal(&path),
5992 )
5993 .await;
5994 assert!(signalled.is_ok(), "rollback signal should resolve");
5995 writer.await.unwrap();
5996 }
5997
5998 #[tokio::test]
5999 async fn canary_rollback_signal_resolves_immediately_when_flag_present_at_boot() {
6000 let tmp = tempfile::TempDir::new().unwrap();
6001 let path = tmp.path().join("canary-rollback.json");
6002 crate::canary::CanaryState::write_rollback_flag(
6005 &path,
6006 &crate::canary::RollbackSignal::default(),
6007 )
6008 .unwrap();
6009
6010 let signalled = tokio::time::timeout(
6011 std::time::Duration::from_secs(5),
6012 canary_rollback_signal(&path),
6013 )
6014 .await;
6015 assert!(
6016 signalled.is_ok(),
6017 "a flag present at boot must trigger rollback (sticky across restarts)"
6018 );
6019 }
6020
/// With `ReplicaFallback::Primary` configured, marking the replica unready
/// must leave `read_pool()` returning a pool whose max size is the primary's
/// (5) rather than the replica's (2).
#[cfg(feature = "db")]
#[test]
fn build_state_applies_replica_fallback_policy_to_read_routing() {
    let config = {
        let mut cfg = AutumnConfig::default();
        let db = &mut cfg.database;
        db.primary_url = Some(String::from("postgres://localhost/primary"));
        db.primary_pool_size = Some(5);
        db.replica_url = Some(String::from("postgres://localhost/replica"));
        db.replica_pool_size = Some(2);
        db.replica_fallback = crate::config::ReplicaFallback::Primary;
        cfg
    };
    let topology = crate::db::create_topology(&config.database)
        .expect("topology should build")
        .expect("database should be configured");

    let state = build_state(
        &config,
        Some(&topology),
        #[cfg(feature = "ws")]
        None,
    );
    state
        .probes()
        .mark_replica_unready("replica migrations lag primary");

    let read_pool_max = state.read_pool().expect("read pool").status().max_size;
    assert_eq!(read_pool_max, 5);
}
6046
/// A custom pool provider must still yield a primary and a replica pool
/// with the configured sizes, and under `ReplicaFallback::FailReadiness` a
/// failed replica connection must remove the read pool and make readiness
/// report `503 Service Unavailable`.
#[cfg(feature = "db")]
#[tokio::test]
async fn custom_pool_provider_preserves_configured_replica_topology() {
    type PgPool =
        diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>;

    /// Provider that simply delegates to the framework's own pool factory.
    struct PassthroughPoolProvider;

    impl crate::db::DatabasePoolProvider for PassthroughPoolProvider {
        async fn create_pool(
            &self,
            config: &crate::config::DatabaseConfig,
        ) -> Result<Option<PgPool>, crate::db::PoolError> {
            crate::db::create_pool(config)
        }
    }

    let config = {
        let mut cfg = AutumnConfig::default();
        let db = &mut cfg.database;
        db.primary_url = Some(String::from("postgres://localhost/primary"));
        db.primary_pool_size = Some(5);
        db.replica_url = Some(String::from("postgres://localhost/replica"));
        db.replica_pool_size = Some(2);
        db.replica_fallback = crate::config::ReplicaFallback::FailReadiness;
        cfg
    };
    let pool_provider_factory = app()
        .with_pool_provider(PassthroughPoolProvider)
        .pool_provider_factory;

    let database = setup_database(
        &config,
        Vec::new(),
        pool_provider_factory,
        RepositoryCommitHookQueueMigrationMode::Runtime,
    )
    .await
    .expect("custom provider should build database topology");
    let topology = database.topology.expect("database should be configured");

    let primary_max = topology.primary().status().max_size;
    assert_eq!(primary_max, 5);
    let replica_max = topology
        .replica()
        .expect("custom provider should create replica pool")
        .status()
        .max_size;
    assert_eq!(replica_max, 2);

    let state = build_state(
        &config,
        Some(&topology),
        #[cfg(feature = "ws")]
        None,
    );
    state
        .probes()
        .mark_replica_connection_unready("replica connection failed");

    assert!(state.read_pool().is_none());
    let (status, _body) = crate::probe::readiness_response(&state).await;
    assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
}
6113
/// Source-level ordering check: in this file's text, the job runtime
/// initialization call must appear before the repository commit hook worker
/// start call, for both the server shutdown token and the task shutdown
/// token variants.
#[cfg(feature = "db")]
#[test]
fn repository_commit_hook_worker_starts_after_job_runtime_initialization() {
    // Normalize line endings so the multi-line needles match on every platform.
    let source = include_str!("app.rs").replace("\r\n", "\n");
    let offset_of =
        |needle: &str, missing: &str| -> usize { source.find(needle).expect(missing) };

    let server_init = "initialize_job_runtime(jobs, &state, &server_shutdown, &config.jobs)";
    let server_worker = "start_repository_commit_hook_worker(\n pool,\n server_shutdown.child_token(),\n );";
    let task_init = "initialize_job_runtime(jobs, &state, &task_shutdown, &config.jobs)";
    let task_worker = "start_repository_commit_hook_worker(\n pool,\n task_shutdown.child_token(),\n );";

    assert!(
        offset_of(server_init, "normal server path should initialize jobs")
            < offset_of(
                server_worker,
                "normal server path should start repository hook worker"
            ),
        "normal server startup must initialize jobs before repository commit hooks can enqueue them"
    );
    assert!(
        offset_of(task_init, "task runner path should initialize jobs")
            < offset_of(
                task_worker,
                "task runner path should start repository hook worker"
            ),
        "task runner startup must initialize jobs before repository commit hooks can enqueue them"
    );
}
6142
/// Source-level ordering check: within the server run path and the one-off
/// task path, the state initializer call must appear in this file's text
/// before the job runtime initialization call.
#[test]
fn state_initializers_run_before_job_runtime_initialization() {
    const RUN_INITIALIZERS: &str = "run_state_initializers(state_initializers, &state);";

    let source = include_str!("app.rs").replace("\r\n", "\n");

    // Locate the function headers that bound each startup path.
    let server_start = source
        .find("pub async fn run(self)")
        .expect("normal server path should exist");
    let build_mode_start = source
        .find("async fn run_build_mode(self)")
        .expect("static build path should follow server path");
    let task_start = source
        .find("async fn run_one_off_task_mode(self, requested_name: String)")
        .expect("task runner path should exist");

    let server_source = &source[server_start..build_mode_start];
    let task_source = &source[task_start..];

    let server_initializer = server_source
        .find(RUN_INITIALIZERS)
        .expect("normal server path should run state initializers");
    let task_initializer = task_source
        .find(RUN_INITIALIZERS)
        .expect("task runner path should run state initializers");
    let server_job = server_source
        .find("initialize_job_runtime(jobs, &state, &server_shutdown, &config.jobs)")
        .expect("normal server path should initialize jobs");
    let task_job = task_source
        .find("initialize_job_runtime(jobs, &state, &task_shutdown, &config.jobs)")
        .expect("task runner path should initialize jobs");

    assert!(
        server_initializer < server_job,
        "normal server startup must install state-initialized resources before job workers start"
    );
    assert!(
        task_initializer < task_job,
        "task runner startup must install state-initialized resources before job workers start"
    );
}
6181
/// Source-level ordering check: inside the static build path, the state
/// initializer call must appear in this file's text before the router is
/// built.
#[test]
fn static_builds_run_state_initializers_before_router_build() {
    let source = include_str!("app.rs").replace("\r\n", "\n");

    // The static build path spans from its own header to the next one.
    let path_start = source
        .find("async fn run_build_mode(self)")
        .expect("static build path should exist");
    let path_end = source
        .find("async fn run_dump_routes_mode(self)")
        .expect("route dump path should follow static build path");
    let build_path = &source[path_start..path_end];

    let initializer_at = build_path
        .find("run_state_initializers(state_initializers, &state);")
        .expect("static build path should run state initializers");
    let router_at = build_path
        .find("let router = crate::router::try_build_router_inner(")
        .expect("static build path should build a router");

    assert!(
        initializer_at < router_at,
        "static builds must install state-initialized resources before rendering routes"
    );
}
6204
/// With only the first flag set in runtime mode, the hook queue migration
/// is added next to the app's migrations and no `api_tokens` migration is
/// pulled in.
#[cfg(feature = "db")]
#[test]
fn hooked_repository_apps_include_hook_queue_framework_migration() {
    let app_migrations = vec![APP_TEST_MIGRATIONS];
    let combined = migrations_with_repository_framework_migrations(
        app_migrations,
        true,
        false,
        RepositoryCommitHookQueueMigrationMode::Runtime,
    );
    let names = migration_names(&combined);

    let has_hook_queue = names
        .iter()
        .any(|candidate| candidate == REPOSITORY_COMMIT_HOOK_QUEUE_MIGRATION);
    assert!(
        has_hook_queue,
        "hooked repository apps must auto-register the durable hook queue migration"
    );

    let has_api_tokens = names.iter().any(|candidate| candidate.contains("api_tokens"));
    assert!(
        !has_api_tokens,
        "hooked repository apps must not auto-register unrelated framework migrations: {names:?}"
    );
}
6227
/// Runtime mode adds the hook queue migration even when the app registers
/// no migrations of its own.
#[cfg(feature = "db")]
#[test]
fn runtime_hooked_apps_include_hook_queue_framework_migration_without_app_migrations() {
    let combined = migrations_with_repository_framework_migrations(
        Vec::new(),
        true,
        false,
        RepositoryCommitHookQueueMigrationMode::Runtime,
    );

    let has_hook_queue = migration_names(&combined)
        .iter()
        .any(|candidate| candidate == REPOSITORY_COMMIT_HOOK_QUEUE_MIGRATION);

    assert!(
        has_hook_queue,
        "runtime hooked repository apps must install the durable hook queue even when app migrations are managed elsewhere"
    );
}
6246
/// With only the second flag set in runtime mode, the version-history
/// migration is added next to the app's migrations and the hook queue
/// migration is not.
#[cfg(feature = "db")]
#[test]
fn versioned_repository_apps_include_version_history_framework_migration() {
    let app_migrations = vec![APP_TEST_MIGRATIONS];
    let combined = migrations_with_repository_framework_migrations(
        app_migrations,
        false,
        true,
        RepositoryCommitHookQueueMigrationMode::Runtime,
    );
    let names = migration_names(&combined);

    let has_version_history = names
        .iter()
        .any(|candidate| candidate == VERSION_HISTORY_MIGRATION);
    assert!(
        has_version_history,
        "versioned repository apps must auto-register the version-history migration"
    );

    let has_hook_queue = names
        .iter()
        .any(|candidate| candidate.contains("repository_commit_hook_queue"));
    assert!(
        !has_hook_queue,
        "versioned-only repository apps must not auto-register the durable hook queue: {names:?}"
    );
}
6269
/// Runtime mode adds the version-history migration even when the app
/// registers no migrations of its own.
#[cfg(feature = "db")]
#[test]
fn runtime_versioned_apps_include_version_history_framework_migration_without_app_migrations() {
    let combined = migrations_with_repository_framework_migrations(
        Vec::new(),
        false,
        true,
        RepositoryCommitHookQueueMigrationMode::Runtime,
    );

    let has_version_history = migration_names(&combined)
        .iter()
        .any(|candidate| candidate == VERSION_HISTORY_MIGRATION);

    assert!(
        has_version_history,
        "runtime versioned repository apps must install version history even when app migrations are managed elsewhere"
    );
}
6286
/// In static-build mode with no app migrations, nothing is added even when
/// both flags are set.
#[cfg(feature = "db")]
#[test]
fn static_builds_do_not_auto_add_hook_queue_when_no_migrations_registered() {
    let no_app_migrations = Vec::new();
    let combined = migrations_with_repository_framework_migrations(
        no_app_migrations,
        true,
        true,
        RepositoryCommitHookQueueMigrationMode::StaticBuild,
    );

    assert!(
        combined.is_empty(),
        "static/export builds that pass no migrations must not mutate the database"
    );
}
6302
/// With both flags off and no app migrations, runtime mode adds nothing.
#[cfg(feature = "db")]
#[test]
fn unhooked_apps_do_not_auto_add_hook_queue_framework_migration() {
    let no_app_migrations = Vec::new();
    let combined = migrations_with_repository_framework_migrations(
        no_app_migrations,
        false,
        false,
        RepositoryCommitHookQueueMigrationMode::Runtime,
    );

    assert!(
        combined.is_empty(),
        "unhooked apps should not get durable hook queue migrations for free"
    );
}
6318
/// Collects the names of all migrations in `migrations`, in source order.
///
/// Panics if any source fails to list its migrations.
#[cfg(feature = "db")]
fn migration_names(migrations: &[crate::migrate::EmbeddedMigrations]) -> Vec<String> {
    use diesel::migration::{Migration, MigrationSource as _};
    use diesel::pg::Pg;

    let mut names = Vec::new();
    for source in migrations {
        // The annotation selects the Postgres backend for `migrations()`.
        let loaded: Vec<Box<dyn Migration<Pg>>> = source.migrations().unwrap();
        names.extend(loaded.into_iter().map(|migration| migration.name().to_string()));
    }
    names
}
6333
/// `build_state` alone leaves the replica migration check unset;
/// `configure_replica_migration_check` then stores the given primary and
/// replica URLs on the probes.
#[cfg(feature = "db")]
#[test]
fn configure_replica_migration_check_stores_recheck_urls() {
    const PRIMARY_URL: &str = "postgres://localhost/primary";
    const REPLICA_URL: &str = "postgres://localhost/replica";

    let mut config = AutumnConfig::default();
    config.database.primary_url = Some(PRIMARY_URL.to_owned());
    config.database.replica_url = Some(REPLICA_URL.to_owned());
    let topology = crate::db::create_topology(&config.database)
        .expect("topology should build")
        .expect("database should be configured");

    let state = build_state(
        &config,
        Some(&topology),
        #[cfg(feature = "ws")]
        None,
    );

    let check_before = state.probes().replica_migration_check();
    assert!(
        check_before.is_none(),
        "build_state should not enable migration checks without registered migrations"
    );

    let recheck_urls = (PRIMARY_URL.to_owned(), REPLICA_URL.to_owned());
    configure_replica_migration_check(&state, Some(recheck_urls));

    let check = state
        .probes()
        .replica_migration_check()
        .expect("replica migration check should be configured");

    assert_eq!(check.primary_url, "postgres://localhost/primary");
    assert_eq!(check.replica_url, "postgres://localhost/replica");
}
6372
/// Applying a `Stale` replica migration readiness under
/// `ReplicaFallback::FailReadiness` must make the readiness response
/// `503 Service Unavailable`.
#[cfg(feature = "db")]
#[tokio::test]
async fn replica_migration_readiness_marks_ready_endpoint_degraded() {
    let config = {
        let mut cfg = AutumnConfig::default();
        let db = &mut cfg.database;
        db.primary_url = Some(String::from("postgres://localhost/primary"));
        db.primary_pool_size = Some(5);
        db.replica_url = Some(String::from("postgres://localhost/replica"));
        db.replica_pool_size = Some(2);
        db.replica_fallback = crate::config::ReplicaFallback::FailReadiness;
        cfg
    };
    let topology = crate::db::create_topology(&config.database)
        .expect("topology should build")
        .expect("database should be configured");
    let state = build_state(
        &config,
        Some(&topology),
        #[cfg(feature = "ws")]
        None,
    );

    // The replica reports an older latest migration than the primary.
    let stale = crate::migrate::ReplicaMigrationReadiness::Stale {
        primary_latest: Some(String::from("00000000000002")),
        replica_latest: Some(String::from("00000000000001")),
    };
    apply_replica_migration_readiness(&state, Some(stale));

    let (status, _body) = crate::probe::readiness_response(&state).await;

    assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
}
6404
/// Unusable connection strings must produce the `Unknown` readiness
/// variant.
#[cfg(feature = "db")]
#[tokio::test]
async fn blocking_replica_migration_readiness_reports_unknown_connection_errors() {
    let bad_primary = String::from("not-a-primary-url");
    let bad_replica = String::from("not-a-replica-url");

    let readiness =
        crate::migrate::check_replica_migration_readiness_blocking(bad_primary, bad_replica)
            .await;

    assert!(matches!(
        readiness,
        crate::migrate::ReplicaMigrationReadiness::Unknown(_)
    ));
}
6419
/// A backend supplied through `with_channels_backend` must be used even
/// when the configuration selects the Redis backend without a URL: a
/// message published through the resulting state reaches a subscriber.
#[cfg(feature = "ws")]
#[test]
fn with_channels_backend_overrides_config_driven_backend_selection() {
    let channels_backend = app()
        .with_channels_backend(crate::channels::LocalChannelsBackend::new(4))
        .channels_backend;
    assert!(channels_backend.is_some());

    // Configure Redis without a URL; the explicit backend must be used instead.
    let mut config = AutumnConfig::default();
    config.channels.backend = crate::config::ChannelBackend::Redis;
    config.channels.redis.url = None;

    let state = build_state(
        &config,
        #[cfg(feature = "db")]
        None,
        #[cfg(feature = "ws")]
        channels_backend,
    );
    let mut subscriber = state.channels().subscribe("override");

    state
        .broadcast()
        .publish("override", "ok")
        .expect("custom local backend should publish");

    let received = subscriber.try_recv().expect("message should arrive");
    assert_eq!(received.as_str(), "ok");
}
6449
6450 pub fn test_get_route(path: &'static str, name: &'static str) -> Route {
6452 Route {
6453 method: http::Method::GET,
6454 path,
6455 handler: axum::routing::get(|| async { "ok" }),
6456 name,
6457 api_doc: crate::openapi::ApiDoc {
6458 method: "GET",
6459 path,
6460 operation_id: name,
6461 success_status: 200,
6462 ..Default::default()
6463 },
6464 repository: None,
6465 idempotency: crate::route::RouteIdempotency::Direct,
6466 api_version: None,
6467 sunset_opt_out: false,
6468 }
6469 }
6470
/// Builds a shared i18n bundle holding exactly one English (`en`) message,
/// `key` mapped to `value`, using the default i18n configuration.
#[cfg(feature = "i18n")]
fn test_i18n_bundle(key: &str, value: &str) -> Arc<crate::i18n::Bundle> {
    let english = std::collections::HashMap::from([(key.to_owned(), value.to_owned())]);
    let catalog = std::collections::HashMap::from([("en".to_owned(), english)]);

    let bundle =
        crate::i18n::Bundle::from_messages(catalog, &crate::i18n::I18nConfig::default());
    Arc::new(bundle)
}
6482
/// `i18n_auto()` must only set the auto-load flag and leave the bundle
/// unset on the builder.
#[cfg(feature = "i18n")]
#[test]
fn i18n_auto_defers_loading_until_runtime_config_is_available() {
    let AppBuilder {
        i18n_bundle,
        i18n_auto_load,
        ..
    } = app().i18n_auto();

    assert!(i18n_bundle.is_none());
    assert!(i18n_auto_load);
}
6491
/// Config loader test double that returns a fixed configuration.
#[cfg(feature = "i18n")]
#[derive(Clone)]
struct StaticConfigLoader {
    /// Configuration returned by every `load` call.
    config: AutumnConfig,
}

#[cfg(feature = "i18n")]
impl crate::config::ConfigLoader for StaticConfigLoader {
    /// Returns a clone of the stored configuration; never fails.
    async fn load(&self) -> Result<AutumnConfig, crate::config::ConfigError> {
        let snapshot = self.config.clone();
        Ok(snapshot)
    }
}
6504
/// Telemetry provider test double that ignores its inputs.
#[cfg(feature = "i18n")]
struct NoopTelemetryProvider;

#[cfg(feature = "i18n")]
impl crate::telemetry::TelemetryProvider for NoopTelemetryProvider {
    /// Ignores every argument and returns a disabled telemetry guard.
    fn init(
        &self,
        _log: &crate::config::LogConfig,
        _telemetry: &crate::config::TelemetryConfig,
        _profile: Option<&str>,
    ) -> Result<crate::telemetry::TelemetryGuard, crate::telemetry::TelemetryInitError>
    {
        let guard = crate::telemetry::TelemetryGuard::disabled();
        Ok(guard)
    }
}
6520
/// With `i18n_auto()`, the bundle must be loaded from the directory named
/// in the configuration returned by the config loader, resolved against
/// `AUTUMN_MANIFEST_DIR`.
#[cfg(feature = "i18n")]
#[tokio::test]
async fn i18n_auto_uses_config_loader_output_for_bundle_dir() {
    // Put a single-message English bundle in a non-default directory.
    let project = tempfile::tempdir().expect("project dir");
    let bundle_dir = project.path().join("custom-i18n");
    std::fs::create_dir_all(&bundle_dir).expect("i18n dir");
    std::fs::write(bundle_dir.join("en.ftl"), "nav.home = Loader Home\n").expect("bundle");

    let mut config = AutumnConfig::default();
    config.i18n.dir = String::from("custom-i18n");

    let AppBuilder {
        config_loader_factory,
        telemetry_provider,
        i18n_bundle,
        i18n_auto_load,
        ..
    } = app()
        .with_config_loader(StaticConfigLoader { config })
        .with_telemetry_provider(NoopTelemetryProvider)
        .i18n_auto();

    let (loaded_config, _guard) =
        load_config_and_telemetry(config_loader_factory, telemetry_provider).await;

    let manifest_dir = project.path().to_str().expect("utf-8 path");
    let env = crate::config::MockEnv::new().with("AUTUMN_MANIFEST_DIR", manifest_dir);

    let bundle = resolve_i18n_bundle(i18n_bundle, i18n_auto_load, &loaded_config, &env)
        .expect("bundle loaded from configured dir");

    assert_eq!(bundle.translate("en", "nav.home", &[]), "Loader Home");
}
6554
/// A bundle installed through `install_i18n_bundle_layer` must be visible
/// to handlers during static rendering: the page written for `/about`
/// contains the translated message.
#[cfg(feature = "i18n")]
#[tokio::test]
async fn i18n_bundle_layer_is_applied_to_static_route_rendering() {
    /// Handler that responds with the translation of `nav.home`.
    async fn localized(locale: crate::i18n::Locale) -> String {
        locale.t("nav.home")
    }

    let config = AutumnConfig::default();
    let state = AppState::for_test();
    let bundle = test_i18n_bundle("nav.home", "Home");
    let custom_layers = install_i18n_bundle_layer(Vec::new(), &state, Some(bundle));

    let about_route = Route {
        method: http::Method::GET,
        path: "/about",
        handler: axum::routing::get(localized),
        name: "localized",
        api_doc: crate::openapi::ApiDoc {
            method: "GET",
            path: "/about",
            operation_id: "localized",
            success_status: 200,
            ..Default::default()
        },
        repository: None,
        idempotency: crate::route::RouteIdempotency::Direct,
        api_version: None,
        sunset_opt_out: false,
    };
    let context = crate::router::RouterContext {
        exception_filters: Vec::new(),
        scoped_groups: Vec::new(),
        merge_routers: Vec::new(),
        nest_routers: Vec::new(),
        custom_layers,
        error_page_renderer: None,
        session_store: None,
        #[cfg(feature = "openapi")]
        openapi: None,
        #[cfg(feature = "mcp")]
        mcp: None,
    };
    let router =
        crate::router::try_build_router_inner(vec![about_route], &config, state, context)
            .expect("router builds");

    let out_parent = tempfile::tempdir().expect("dist parent");
    let dist = out_parent.path().join("dist");
    let about_meta = crate::static_gen::StaticRouteMeta {
        path: "/about",
        name: "localized",
        revalidate: None,
        params_fn: None,
    };

    crate::static_gen::render_static_routes(router, &[about_meta], &dist)
        .await
        .expect("static render succeeds");

    let html = std::fs::read_to_string(dist.join("about/index.html")).expect("rendered html");
    assert_eq!(html, "Home");
}
6623
/// Successive `routes(..)` calls must append to the builder's route list
/// and keep insertion order.
#[test]
fn app_builder_routes_adds_routes() {
    let empty = app();
    assert_eq!(empty.routes.len(), 0);

    let with_one = empty.routes(vec![test_get_route("/1", "route1")]);
    assert_eq!(with_one.routes.len(), 1);

    let second_batch = vec![
        test_get_route("/2", "route2"),
        test_get_route("/3", "route3"),
    ];
    let with_three = with_one.routes(second_batch);
    assert_eq!(with_three.routes.len(), 3);

    for (index, expected_path) in ["/1", "/2", "/3"].iter().enumerate() {
        assert_eq!(with_three.routes[index].path, *expected_path);
    }
}
6642
/// A typed extension stored with `with_extension` must be mutated in place
/// by `update_extension` and readable afterwards through `extension`.
#[test]
fn app_builder_extensions_store_and_update_typed_values() {
    let seeded = app().with_extension::<String>(String::from("haunted"));
    let builder =
        seeded.update_extension::<String, _, _>(String::new, |text| text.push_str(" harvest"));

    let stored = builder
        .extension::<String>()
        .expect("string extension should be present");

    assert_eq!(stored, "haunted harvest");
}
6654
/// `with_mail_delivery_queue` must store a factory on the builder that
/// yields a usable queue when called with an [`AppState`].
#[cfg(feature = "mail")]
#[tokio::test]
async fn app_builder_with_mail_delivery_queue_stores_queue_for_install() {
    let AppBuilder {
        mail_delivery_queue_factory,
        ..
    } = app().with_mail_delivery_queue(MailTestNoopQueue);
    let factory = mail_delivery_queue_factory
        .expect("with_mail_delivery_queue should store a factory on the builder");

    let state = AppState::for_test();
    let queue = factory(&state).expect("trivial factory should produce the queue");

    assert!(Arc::strong_count(&queue) >= 1);
    let delivery = queue.enqueue(test_mail()).await;
    delivery.expect("noop queue should always succeed");
}
6674
/// The mail queue factory must be called with the state it is given: the
/// profile it observes is the one set on that state.
#[cfg(feature = "mail")]
#[test]
fn app_builder_with_mail_delivery_queue_factory_runs_with_app_state() {
    // Shared slot the factory writes the profile it saw into.
    let seen_profile = Arc::new(std::sync::Mutex::new(None::<String>));
    let factory_slot = Arc::clone(&seen_profile);

    let builder = app().with_mail_delivery_queue_factory(move |state| {
        let mut slot = factory_slot.lock().expect("lock");
        *slot = Some(state.profile().to_owned());
        Ok::<_, crate::AutumnError>(MailTestNoopQueue)
    });
    let factory = builder
        .mail_delivery_queue_factory
        .expect("factory should be stored on the builder");

    let dev_state = AppState::for_test().with_profile("dev");
    let _queue = factory(&dev_state).expect("factory should succeed");

    assert_eq!(
        seen_profile.lock().expect("lock").as_deref(),
        Some("dev"),
        "factory must run with the live AppState"
    );
}
6698
/// An error returned by the user's factory must come back unchanged from
/// the stored factory.
#[cfg(feature = "mail")]
#[test]
fn app_builder_with_mail_delivery_queue_factory_propagates_errors() {
    let failing = app().with_mail_delivery_queue_factory(|_state| {
        Err::<MailTestNoopQueue, _>(crate::AutumnError::service_unavailable_msg("factory boom"))
    });
    let factory = failing
        .mail_delivery_queue_factory
        .expect("factory present");

    let state = AppState::for_test();
    let Err(err) = factory(&state) else {
        panic!("factory should have errored");
    };

    assert!(err.to_string().contains("factory boom"));
}
6715
6716 #[tokio::test]
6717 async fn startup_and_shutdown_hooks_run_in_expected_order() {
6718 let events = Arc::new(std::sync::Mutex::new(Vec::<&'static str>::new()));
6719 let startup_events = Arc::clone(&events);
6720 let shutdown_a = Arc::clone(&events);
6721 let shutdown_b = Arc::clone(&events);
6722 let builder = app()
6723 .on_startup(move |_state| {
6724 let startup_events = Arc::clone(&startup_events);
6725 async move {
6726 startup_events
6727 .lock()
6728 .expect("events lock poisoned")
6729 .push("start");
6730 Ok(())
6731 }
6732 })
6733 .on_shutdown(move || {
6734 let shutdown_a = Arc::clone(&shutdown_a);
6735 async move {
6736 shutdown_a
6737 .lock()
6738 .expect("events lock poisoned")
6739 .push("stop-a");
6740 }
6741 })
6742 .on_shutdown(move || {
6743 let shutdown_b = Arc::clone(&shutdown_b);
6744 async move {
6745 shutdown_b
6746 .lock()
6747 .expect("events lock poisoned")
6748 .push("stop-b");
6749 }
6750 });
6751
6752 run_startup_hooks(&builder.startup_hooks, AppState::for_test())
6753 .await
6754 .expect("startup hooks should succeed");
6755 run_shutdown_hooks(&builder.shutdown_hooks).await;
6756
6757 let recorded_events = events.lock().expect("events lock poisoned").clone();
6758 assert_eq!(recorded_events, vec!["start", "stop-b", "stop-a"]);
6759 }
6760
6761 fn startup_noop_job_handler(
6762 _state: AppState,
6763 _payload: serde_json::Value,
6764 ) -> Pin<Box<dyn Future<Output = crate::AutumnResult<()>> + Send + 'static>> {
6765 Box::pin(async move { Ok(()) })
6766 }
6767
6768 #[tokio::test]
6769 async fn startup_hooks_can_enqueue_jobs_after_runtime_init() {
6770 let _guard = crate::job::global_job_runtime_test_lock().lock().await;
6771 crate::job::clear_global_job_client();
6772
6773 let builder = app()
6774 .jobs(vec![crate::job::JobInfo {
6775 name: "startup-seed".to_string(),
6776 max_attempts: 1,
6777 initial_backoff_ms: 1,
6778 uniqueness: None,
6779 concurrency: None,
6780 handler: startup_noop_job_handler,
6781 }])
6782 .on_startup(|_state| async {
6783 crate::job::enqueue("startup-seed", serde_json::json!({ "kind": "warmup" })).await
6784 });
6785
6786 let state = AppState::for_test().with_profile("dev");
6787 let shutdown = tokio_util::sync::CancellationToken::new();
6788
6789 initialize_job_runtime(
6790 builder.jobs.clone(),
6791 &state,
6792 &shutdown,
6793 &crate::config::JobConfig::default(),
6794 )
6795 .expect("job runtime should initialize before startup hooks");
6796
6797 run_startup_hooks(&builder.startup_hooks, state.clone())
6798 .await
6799 .expect("startup hook should be able to enqueue jobs");
6800
6801 tokio::time::timeout(std::time::Duration::from_secs(1), async {
6802 loop {
6803 let snapshot = state.job_registry().snapshot();
6804 let status = snapshot
6805 .get("startup-seed")
6806 .expect("job should be registered before startup hooks run");
6807 if status.total_successes == 1 {
6808 break;
6809 }
6810 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
6811 }
6812 })
6813 .await
6814 .expect("startup-enqueued job should complete");
6815
6816 shutdown.cancel();
6817 crate::job::clear_global_job_client();
6818 }
6819
6820 #[tokio::test]
6821 async fn initialize_job_runtime_propagates_redis_init_errors() {
6822 let _guard = crate::job::global_job_runtime_test_lock().lock().await;
6823 crate::job::clear_global_job_client();
6824
6825 let state = AppState::for_test().with_profile("dev");
6826 let shutdown = tokio_util::sync::CancellationToken::new();
6827 let config = crate::config::JobConfig {
6828 backend: "redis".to_string(),
6829 ..Default::default()
6830 };
6831
6832 let error = initialize_job_runtime(
6833 vec![crate::job::JobInfo {
6834 name: "startup-seed".to_string(),
6835 max_attempts: 1,
6836 initial_backoff_ms: 1,
6837 uniqueness: None,
6838 concurrency: None,
6839 handler: startup_noop_job_handler,
6840 }],
6841 &state,
6842 &shutdown,
6843 &config,
6844 )
6845 .expect_err("redis init errors should abort startup");
6846
6847 #[cfg(feature = "redis")]
6848 assert!(
6849 error
6850 .to_string()
6851 .contains("jobs.backend=redis requires jobs.redis.url"),
6852 "unexpected error: {error}"
6853 );
6854
6855 #[cfg(not(feature = "redis"))]
6856 assert!(
6857 error
6858 .to_string()
6859 .contains("jobs.backend=redis requested but redis feature is disabled"),
6860 "unexpected error: {error}"
6861 );
6862 }
6863
6864 #[tokio::test]
6865 async fn startup_hook_errors_propagate() {
6866 let builder = app().on_startup(|_state| async {
6867 Err(crate::AutumnError::service_unavailable_msg(
6868 "startup ritual failed",
6869 ))
6870 });
6871
6872 let error = run_startup_hooks(&builder.startup_hooks, AppState::for_test())
6873 .await
6874 .expect_err("startup hook should fail");
6875 assert!(error.to_string().contains("startup ritual failed"));
6876 }
6877
6878 #[tokio::test]
6879 async fn build_router_mounts_user_routes() {
6880 let router = test_router(vec![test_get_route("/test", "test_handler")]);
6881
6882 let response = router
6883 .oneshot(Request::builder().uri("/test").body(Body::empty()).unwrap())
6884 .await
6885 .unwrap();
6886
6887 assert_eq!(response.status(), StatusCode::OK);
6888 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
6889 .await
6890 .unwrap();
6891 assert_eq!(&body[..], b"ok");
6892 }
6893
6894 #[tokio::test]
6895 async fn build_router_mounts_health_check_at_default_path() {
6896 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
6897
6898 let response = router
6899 .oneshot(
6900 Request::builder()
6901 .uri("/health")
6902 .body(Body::empty())
6903 .unwrap(),
6904 )
6905 .await
6906 .unwrap();
6907
6908 assert_eq!(response.status(), StatusCode::OK);
6909 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
6910 .await
6911 .unwrap();
6912 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
6913 assert_eq!(json["status"], "ok");
6914 }
6915
6916 #[tokio::test]
6917 async fn build_router_mounts_health_check_at_custom_path() {
6918 let mut config = AutumnConfig::default();
6919 config.health.path = "/healthz".to_owned();
6920 let state = AppState {
6921 extensions: std::sync::Arc::new(std::sync::RwLock::new(
6922 std::collections::HashMap::new(),
6923 )),
6924 #[cfg(feature = "db")]
6925 pool: None,
6926 #[cfg(feature = "db")]
6927 replica_pool: None,
6928 profile: None,
6929 started_at: std::time::Instant::now(),
6930 health_detailed: true,
6931 probes: crate::probe::ProbeState::ready_for_test(),
6932 metrics: crate::middleware::MetricsCollector::new(),
6933 log_levels: crate::actuator::LogLevels::new("info"),
6934 task_registry: crate::actuator::TaskRegistry::new(),
6935 job_registry: crate::actuator::JobRegistry::new(),
6936 config_props: crate::actuator::ConfigProperties::default(),
6937 metrics_source_registry: crate::actuator::MetricsSourceRegistry::new(),
6938 health_indicator_registry: crate::actuator::HealthIndicatorRegistry::new(),
6939 #[cfg(feature = "ws")]
6940 channels: crate::channels::Channels::new(32),
6941 #[cfg(feature = "presence")]
6942 presence: crate::presence::Presence::new(crate::channels::Channels::new(32)),
6943 #[cfg(feature = "ws")]
6944 shutdown: tokio_util::sync::CancellationToken::new(),
6945 policy_registry: crate::authorization::PolicyRegistry::default(),
6946 forbidden_response: crate::authorization::ForbiddenResponse::default(),
6947 auth_session_key: "user_id".to_owned(),
6948 shared_cache: None,
6949 clock: std::sync::Arc::new(crate::time::SystemClock),
6950 };
6951 let router =
6952 crate::router::build_router(vec![test_get_route("/dummy", "dummy")], &config, state);
6953
6954 let response = router
6955 .oneshot(
6956 Request::builder()
6957 .uri("/healthz")
6958 .body(Body::empty())
6959 .unwrap(),
6960 )
6961 .await
6962 .unwrap();
6963
6964 assert_eq!(response.status(), StatusCode::OK);
6965 }
6966
6967 #[tokio::test]
6968 async fn build_router_adds_request_id_header() {
6969 let router = test_router(vec![test_get_route("/test", "test")]);
6970
6971 let response = router
6972 .oneshot(Request::builder().uri("/test").body(Body::empty()).unwrap())
6973 .await
6974 .unwrap();
6975
6976 assert!(response.headers().contains_key("x-request-id"));
6977 }
6978
6979 #[tokio::test]
6980 async fn build_router_unknown_route_returns_404() {
6981 let router = test_router(vec![test_get_route("/exists", "exists")]);
6982
6983 let response = router
6984 .oneshot(Request::builder().uri("/nope").body(Body::empty()).unwrap())
6985 .await
6986 .unwrap();
6987
6988 assert_eq!(response.status(), StatusCode::NOT_FOUND);
6989 }
6990
6991 #[tokio::test]
6992 async fn build_router_multiple_routes() {
6993 let router = test_router(vec![test_get_route("/a", "a"), test_get_route("/b", "b")]);
6994
6995 let resp_a = router
6996 .clone()
6997 .oneshot(Request::builder().uri("/a").body(Body::empty()).unwrap())
6998 .await
6999 .unwrap();
7000 assert_eq!(resp_a.status(), StatusCode::OK);
7001
7002 let resp_b = router
7003 .oneshot(Request::builder().uri("/b").body(Body::empty()).unwrap())
7004 .await
7005 .unwrap();
7006 assert_eq!(resp_b.status(), StatusCode::OK);
7007 }
7008
7009 #[tokio::test]
7010 async fn build_router_post_route() {
7011 let post_routes = vec![Route {
7012 method: http::Method::POST,
7013 path: "/submit",
7014 handler: axum::routing::post(|| async { "posted" }),
7015 name: "submit",
7016 api_doc: crate::openapi::ApiDoc {
7017 method: "POST",
7018 path: "/submit",
7019 operation_id: "submit",
7020 success_status: 200,
7021 ..Default::default()
7022 },
7023 repository: None,
7024 idempotency: crate::route::RouteIdempotency::Direct,
7025 api_version: None,
7026 sunset_opt_out: false,
7027 }];
7028 let config = AutumnConfig::default();
7029 let state = AppState {
7030 extensions: std::sync::Arc::new(std::sync::RwLock::new(
7031 std::collections::HashMap::new(),
7032 )),
7033 #[cfg(feature = "db")]
7034 pool: None,
7035 #[cfg(feature = "db")]
7036 replica_pool: None,
7037 profile: None,
7038 started_at: std::time::Instant::now(),
7039 health_detailed: true,
7040 probes: crate::probe::ProbeState::ready_for_test(),
7041 metrics: crate::middleware::MetricsCollector::new(),
7042 log_levels: crate::actuator::LogLevels::new("info"),
7043 task_registry: crate::actuator::TaskRegistry::new(),
7044 job_registry: crate::actuator::JobRegistry::new(),
7045 config_props: crate::actuator::ConfigProperties::default(),
7046 metrics_source_registry: crate::actuator::MetricsSourceRegistry::new(),
7047 health_indicator_registry: crate::actuator::HealthIndicatorRegistry::new(),
7048 #[cfg(feature = "ws")]
7049 channels: crate::channels::Channels::new(32),
7050 #[cfg(feature = "presence")]
7051 presence: crate::presence::Presence::new(crate::channels::Channels::new(32)),
7052 #[cfg(feature = "ws")]
7053 shutdown: tokio_util::sync::CancellationToken::new(),
7054 policy_registry: crate::authorization::PolicyRegistry::default(),
7055 forbidden_response: crate::authorization::ForbiddenResponse::default(),
7056 auth_session_key: "user_id".to_owned(),
7057 shared_cache: None,
7058 clock: std::sync::Arc::new(crate::time::SystemClock),
7059 };
7060 let router = crate::router::build_router(post_routes, &config, state);
7061
7062 let response = router
7063 .oneshot(
7064 Request::builder()
7065 .method("POST")
7066 .uri("/submit")
7067 .body(Body::empty())
7068 .unwrap(),
7069 )
7070 .await
7071 .unwrap();
7072
7073 assert_eq!(response.status(), StatusCode::OK);
7074 }
7075
7076 #[tokio::test]
7077 async fn build_router_merges_methods_on_same_path() {
7078 let route_list = vec![
7079 Route {
7080 method: http::Method::GET,
7081 path: "/admin",
7082 handler: axum::routing::get(|| async { "list" }),
7083 name: "admin_list",
7084 api_doc: crate::openapi::ApiDoc {
7085 method: "GET",
7086 path: "/admin",
7087 operation_id: "admin_list",
7088 success_status: 200,
7089 ..Default::default()
7090 },
7091 repository: None,
7092 idempotency: crate::route::RouteIdempotency::Direct,
7093 api_version: None,
7094 sunset_opt_out: false,
7095 },
7096 Route {
7097 method: http::Method::POST,
7098 path: "/admin",
7099 handler: axum::routing::post(|| async { "created" }),
7100 name: "create",
7101 api_doc: crate::openapi::ApiDoc {
7102 method: "POST",
7103 path: "/admin",
7104 operation_id: "create",
7105 success_status: 200,
7106 ..Default::default()
7107 },
7108 repository: None,
7109 idempotency: crate::route::RouteIdempotency::Direct,
7110 api_version: None,
7111 sunset_opt_out: false,
7112 },
7113 ];
7114 let config = AutumnConfig::default();
7115 let state = AppState {
7116 extensions: std::sync::Arc::new(std::sync::RwLock::new(
7117 std::collections::HashMap::new(),
7118 )),
7119 #[cfg(feature = "db")]
7120 pool: None,
7121 #[cfg(feature = "db")]
7122 replica_pool: None,
7123 profile: None,
7124 started_at: std::time::Instant::now(),
7125 health_detailed: true,
7126 probes: crate::probe::ProbeState::ready_for_test(),
7127 metrics: crate::middleware::MetricsCollector::new(),
7128 log_levels: crate::actuator::LogLevels::new("info"),
7129 task_registry: crate::actuator::TaskRegistry::new(),
7130 job_registry: crate::actuator::JobRegistry::new(),
7131 config_props: crate::actuator::ConfigProperties::default(),
7132 metrics_source_registry: crate::actuator::MetricsSourceRegistry::new(),
7133 health_indicator_registry: crate::actuator::HealthIndicatorRegistry::new(),
7134 #[cfg(feature = "ws")]
7135 channels: crate::channels::Channels::new(32),
7136 #[cfg(feature = "presence")]
7137 presence: crate::presence::Presence::new(crate::channels::Channels::new(32)),
7138 #[cfg(feature = "ws")]
7139 shutdown: tokio_util::sync::CancellationToken::new(),
7140 policy_registry: crate::authorization::PolicyRegistry::default(),
7141 forbidden_response: crate::authorization::ForbiddenResponse::default(),
7142 auth_session_key: "user_id".to_owned(),
7143 shared_cache: None,
7144 clock: std::sync::Arc::new(crate::time::SystemClock),
7145 };
7146 let router = crate::router::build_router(route_list, &config, state);
7147
7148 let resp = router
7150 .clone()
7151 .oneshot(
7152 Request::builder()
7153 .uri("/admin")
7154 .body(Body::empty())
7155 .unwrap(),
7156 )
7157 .await
7158 .unwrap();
7159 assert_eq!(resp.status(), StatusCode::OK);
7160 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
7161 .await
7162 .unwrap();
7163 assert_eq!(&body[..], b"list");
7164
7165 let resp = router
7167 .oneshot(
7168 Request::builder()
7169 .method("POST")
7170 .uri("/admin")
7171 .body(Body::empty())
7172 .unwrap(),
7173 )
7174 .await
7175 .unwrap();
7176 assert_eq!(resp.status(), StatusCode::OK);
7177 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
7178 .await
7179 .unwrap();
7180 assert_eq!(&body[..], b"created");
7181 }
7182
7183 #[cfg(feature = "htmx")]
7184 #[tokio::test]
7185 async fn htmx_handler_returns_javascript_with_correct_headers() {
7186 let app = axum::Router::new().route(
7187 crate::htmx::HTMX_JS_PATH,
7188 axum::routing::get(crate::router::htmx_handler),
7189 );
7190
7191 let response = app
7192 .oneshot(
7193 Request::builder()
7194 .uri(crate::htmx::HTMX_JS_PATH)
7195 .body(Body::empty())
7196 .unwrap(),
7197 )
7198 .await
7199 .unwrap();
7200
7201 assert_eq!(response.status(), StatusCode::OK);
7202
7203 let content_type = response
7204 .headers()
7205 .get("content-type")
7206 .unwrap()
7207 .to_str()
7208 .unwrap();
7209 assert!(
7210 content_type.contains("application/javascript"),
7211 "Expected application/javascript, got {content_type}"
7212 );
7213
7214 let cache_control = response
7215 .headers()
7216 .get("cache-control")
7217 .unwrap()
7218 .to_str()
7219 .unwrap();
7220 assert!(
7221 cache_control.contains("immutable"),
7222 "Expected immutable cache, got {cache_control}"
7223 );
7224
7225 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
7226 .await
7227 .unwrap();
7228
7229 assert_eq!(body.len(), crate::htmx::HTMX_JS.len());
7231
7232 let start = std::str::from_utf8(&body[..50]).expect("htmx should be valid UTF-8");
7234 assert!(
7235 start.contains("htmx") || start.contains("function"),
7236 "Response doesn't look like htmx JavaScript: {start}"
7237 );
7238 }
7239
7240 #[cfg(feature = "htmx")]
7241 #[tokio::test]
7242 async fn htmx_csrf_handler_returns_csp_compatible_javascript() {
7243 let app = axum::Router::new().route(
7244 crate::htmx::HTMX_CSRF_JS_PATH,
7245 axum::routing::get(crate::router::htmx_csrf_handler),
7246 );
7247
7248 let response = app
7249 .oneshot(
7250 Request::builder()
7251 .uri(crate::htmx::HTMX_CSRF_JS_PATH)
7252 .body(Body::empty())
7253 .unwrap(),
7254 )
7255 .await
7256 .unwrap();
7257
7258 assert_eq!(response.status(), StatusCode::OK);
7259 assert_eq!(
7260 response
7261 .headers()
7262 .get("content-type")
7263 .and_then(|value| value.to_str().ok()),
7264 Some("application/javascript")
7265 );
7266
7267 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
7268 .await
7269 .unwrap();
7270 let js = std::str::from_utf8(&body).expect("csrf helper should be valid utf-8");
7271
7272 assert!(js.contains("htmx:configRequest"));
7273 assert!(js.contains("X-CSRF-Token"));
7274 assert!(!js.contains("<script"));
7275 }
7276
7277 #[cfg(feature = "htmx")]
7278 #[tokio::test]
7279 async fn build_router_serves_htmx_js() {
7280 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
7281
7282 let response = router
7283 .oneshot(
7284 Request::builder()
7285 .uri(crate::htmx::HTMX_JS_PATH)
7286 .body(Body::empty())
7287 .unwrap(),
7288 )
7289 .await
7290 .unwrap();
7291
7292 assert_eq!(response.status(), StatusCode::OK);
7293 let ct = response
7294 .headers()
7295 .get("content-type")
7296 .unwrap()
7297 .to_str()
7298 .unwrap();
7299 assert!(ct.contains("javascript"));
7300 }
7301
7302 #[cfg(feature = "htmx")]
7303 #[tokio::test]
7304 async fn build_router_serves_htmx_csrf_js() {
7305 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
7306
7307 let response = router
7308 .oneshot(
7309 Request::builder()
7310 .uri(crate::htmx::HTMX_CSRF_JS_PATH)
7311 .body(Body::empty())
7312 .unwrap(),
7313 )
7314 .await
7315 .unwrap();
7316
7317 assert_eq!(response.status(), StatusCode::OK);
7318 let csp = response
7319 .headers()
7320 .get("content-security-policy")
7321 .expect("framework JS should still receive security headers")
7322 .to_str()
7323 .unwrap();
7324 assert!(csp.contains("script-src 'self'"), "csp = {csp}");
7325 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
7326 .await
7327 .unwrap();
7328 let js = std::str::from_utf8(&body).expect("csrf helper should be valid utf-8");
7329 assert!(js.contains("htmx:configRequest"));
7330 assert!(js.contains("X-CSRF-Token"));
7331 }
7332
7333 #[tokio::test]
7334 async fn build_router_serves_default_favicon_without_404() {
7335 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
7336
7337 let response = router
7338 .oneshot(
7339 Request::builder()
7340 .uri(crate::router::DEFAULT_FAVICON_PATH)
7341 .body(Body::empty())
7342 .unwrap(),
7343 )
7344 .await
7345 .unwrap();
7346
7347 assert_eq!(response.status(), StatusCode::NO_CONTENT);
7348 assert!(
7349 response.headers().contains_key("content-security-policy"),
7350 "framework fallback responses should still receive security headers"
7351 );
7352 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
7353 .await
7354 .unwrap();
7355 assert!(body.is_empty());
7356 }
7357
7358 #[tokio::test]
7359 async fn build_router_does_not_override_user_favicon_route() {
7360 let router = test_router(vec![test_get_route(
7361 crate::router::DEFAULT_FAVICON_PATH,
7362 "favicon",
7363 )]);
7364
7365 let response = router
7366 .oneshot(
7367 Request::builder()
7368 .uri(crate::router::DEFAULT_FAVICON_PATH)
7369 .body(Body::empty())
7370 .unwrap(),
7371 )
7372 .await
7373 .unwrap();
7374
7375 assert_eq!(response.status(), StatusCode::OK);
7376 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
7377 .await
7378 .unwrap();
7379 assert_eq!(&body[..], b"ok");
7380 }
7381
7382 #[tokio::test]
7383 async fn build_router_serves_static_files_for_unmatched_paths() {
7384 use std::collections::HashMap;
7385
7386 let tmp = tempfile::tempdir().expect("tempdir");
7388 let dist = tmp.path().join("dist");
7389 std::fs::create_dir_all(dist.join("docs")).expect("mkdir");
7390 std::fs::write(dist.join("docs/index.html"), "<h1>Static Docs</h1>").expect("write");
7391
7392 let manifest = crate::static_gen::StaticManifest {
7393 generated_at: "2026-03-27T00:00:00Z".to_owned(),
7394 autumn_version: "0.2.0".to_owned(),
7395 routes: HashMap::from([(
7396 "/docs".to_owned(),
7397 crate::static_gen::ManifestEntry {
7398 file: "docs/index.html".to_owned(),
7399 revalidate: None,
7400 },
7401 )]),
7402 };
7403 let json = serde_json::to_string(&manifest).expect("serialize");
7404 std::fs::write(dist.join("manifest.json"), json).expect("write manifest");
7405
7406 let config = AutumnConfig::default();
7408 let state = AppState {
7409 extensions: std::sync::Arc::new(std::sync::RwLock::new(
7410 std::collections::HashMap::new(),
7411 )),
7412 #[cfg(feature = "db")]
7413 pool: None,
7414 #[cfg(feature = "db")]
7415 replica_pool: None,
7416 profile: None,
7417 started_at: std::time::Instant::now(),
7418 health_detailed: true,
7419 probes: crate::probe::ProbeState::ready_for_test(),
7420 metrics: crate::middleware::MetricsCollector::new(),
7421 log_levels: crate::actuator::LogLevels::new("info"),
7422 task_registry: crate::actuator::TaskRegistry::new(),
7423 job_registry: crate::actuator::JobRegistry::new(),
7424 config_props: crate::actuator::ConfigProperties::default(),
7425 metrics_source_registry: crate::actuator::MetricsSourceRegistry::new(),
7426 health_indicator_registry: crate::actuator::HealthIndicatorRegistry::new(),
7427 #[cfg(feature = "ws")]
7428 channels: crate::channels::Channels::new(32),
7429 #[cfg(feature = "presence")]
7430 presence: crate::presence::Presence::new(crate::channels::Channels::new(32)),
7431 #[cfg(feature = "ws")]
7432 shutdown: tokio_util::sync::CancellationToken::new(),
7433 policy_registry: crate::authorization::PolicyRegistry::default(),
7434 forbidden_response: crate::authorization::ForbiddenResponse::default(),
7435 auth_session_key: "user_id".to_owned(),
7436 shared_cache: None,
7437 clock: std::sync::Arc::new(crate::time::SystemClock),
7438 };
7439 let router = crate::router::build_router_with_static(
7440 vec![test_get_route("/other", "other_page")],
7441 &config,
7442 state,
7443 Some(dist.as_path()),
7444 );
7445
7446 let response = router
7449 .oneshot(
7450 Request::builder()
7451 .uri("/docs/")
7452 .body(Body::empty())
7453 .unwrap(),
7454 )
7455 .await
7456 .unwrap();
7457
7458 assert_eq!(response.status(), StatusCode::OK);
7459 let csp = response
7460 .headers()
7461 .get("content-security-policy")
7462 .expect("static-first HTML should still receive security headers")
7463 .to_str()
7464 .unwrap();
7465 assert!(csp.contains("script-src 'self'"), "csp = {csp}");
7466 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
7467 .await
7468 .unwrap();
7469 assert_eq!(std::str::from_utf8(&body).unwrap(), "<h1>Static Docs</h1>");
7470 }
7471
7472 #[tokio::test]
7473 async fn build_mode_static_rendering_bypasses_startup_barrier() {
7474 temp_env::async_with_vars([("AUTUMN_BUILD_STATIC", Some("1"))], async {
7475 let config = AutumnConfig::default();
7476 let state = AppState::for_test().with_startup_complete(false);
7477 let router = crate::router::build_router(
7478 vec![Route {
7479 method: http::Method::GET,
7480 path: "/about",
7481 handler: axum::routing::get(|| async { "About Page Content" }),
7482 name: "about",
7483 api_doc: crate::openapi::ApiDoc {
7484 method: "GET",
7485 path: "/about",
7486 operation_id: "about",
7487 success_status: 200,
7488 ..Default::default()
7489 },
7490 repository: None,
7491 idempotency: crate::route::RouteIdempotency::Direct,
7492 api_version: None,
7493 sunset_opt_out: false,
7494 }],
7495 &config,
7496 state,
7497 );
7498 let tmp = tempfile::tempdir().unwrap();
7499 let dist = tmp.path().join("dist");
7500
7501 let result = crate::static_gen::render_static_routes(
7502 router,
7503 &[crate::static_gen::StaticRouteMeta {
7504 path: "/about",
7505 name: "about",
7506 revalidate: None,
7507 params_fn: None,
7508 }],
7509 &dist,
7510 )
7511 .await;
7512
7513 assert!(result.is_ok(), "build failed: {:?}", result.err());
7514 let html = std::fs::read_to_string(dist.join("about/index.html")).unwrap();
7515 assert_eq!(html, "About Page Content");
7516 })
7517 .await;
7518 }
7519
7520 #[tokio::test]
7521 async fn build_router_injects_live_reload_script_when_enabled() {
7522 let reload_file = tempfile::NamedTempFile::new().expect("reload state file");
7523 std::fs::write(reload_file.path(), r#"{"version":0,"kind":"full"}"#).expect("write");
7524 temp_env::async_with_vars(
7525 [
7526 ("AUTUMN_DEV_RELOAD", Some("1")),
7527 (
7528 "AUTUMN_DEV_RELOAD_STATE",
7529 Some(reload_file.path().to_str().expect("utf-8 path")),
7530 ),
7531 ],
7532 async {
7533 let router = test_router(vec![Route {
7534 method: http::Method::GET,
7535 path: "/page",
7536 handler: axum::routing::get(|| async {
7537 axum::response::Html("<html><body><main>ok</main></body></html>")
7538 }),
7539 name: "page",
7540 api_doc: crate::openapi::ApiDoc {
7541 method: "GET",
7542 path: "/page",
7543 operation_id: "page",
7544 success_status: 200,
7545 ..Default::default()
7546 },
7547 repository: None,
7548 idempotency: crate::route::RouteIdempotency::Direct,
7549 api_version: None,
7550 sunset_opt_out: false,
7551 }]);
7552
7553 let response = router
7554 .oneshot(Request::builder().uri("/page").body(Body::empty()).unwrap())
7555 .await
7556 .unwrap();
7557
7558 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
7559 .await
7560 .unwrap();
7561 let html = std::str::from_utf8(&body).expect("utf-8");
7562 assert!(html.contains("/__autumn/live-reload"));
7563 },
7564 )
7565 .await;
7566 }
7567
7568 #[tokio::test]
7569 async fn build_router_mounts_dev_reload_script_endpoint_when_enabled() {
7570 let reload_file = tempfile::NamedTempFile::new().expect("reload state file");
7575 std::fs::write(reload_file.path(), r#"{"version":0,"kind":"full"}"#).expect("write");
7576 temp_env::async_with_vars(
7577 [
7578 ("AUTUMN_DEV_RELOAD", Some("1")),
7579 (
7580 "AUTUMN_DEV_RELOAD_STATE",
7581 Some(reload_file.path().to_str().expect("utf-8 path")),
7582 ),
7583 ],
7584 async {
7585 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
7586
7587 let response = router
7588 .oneshot(
7589 Request::builder()
7590 .uri("/__autumn/live-reload.js")
7591 .body(Body::empty())
7592 .unwrap(),
7593 )
7594 .await
7595 .unwrap();
7596
7597 assert_eq!(response.status(), StatusCode::OK);
7598 assert_eq!(
7599 response
7600 .headers()
7601 .get("content-type")
7602 .and_then(|v| v.to_str().ok()),
7603 Some("application/javascript; charset=utf-8")
7604 );
7605 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
7606 .await
7607 .unwrap();
7608 let js = std::str::from_utf8(&body).expect("utf-8");
7609 assert!(js.contains("fetch("), "js body: {js}");
7610 },
7611 )
7612 .await;
7613 }
7614
7615 #[tokio::test]
7616 async fn build_router_mounts_dev_reload_endpoint_when_enabled() {
7617 let reload_file = tempfile::NamedTempFile::new().expect("reload state file");
7618 std::fs::write(reload_file.path(), r#"{"version":7,"kind":"css"}"#).expect("write");
7619 temp_env::async_with_vars(
7620 [
7621 ("AUTUMN_DEV_RELOAD", Some("1")),
7622 (
7623 "AUTUMN_DEV_RELOAD_STATE",
7624 Some(reload_file.path().to_str().expect("utf-8 path")),
7625 ),
7626 ],
7627 async {
7628 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
7629
7630 let response = router
7631 .oneshot(
7632 Request::builder()
7633 .uri("/__autumn/live-reload")
7634 .body(Body::empty())
7635 .unwrap(),
7636 )
7637 .await
7638 .unwrap();
7639
7640 assert_eq!(response.status(), StatusCode::OK);
7641 assert_eq!(
7642 response.headers().get("cache-control").unwrap(),
7643 "no-store, no-cache, must-revalidate"
7644 );
7645 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
7646 .await
7647 .unwrap();
7648 assert_eq!(&body[..], br#"{"version":7,"kind":"css"}"#);
7649 },
7650 )
7651 .await;
7652 }
7653
7654 #[tokio::test]
7655 async fn build_router_disables_cache_for_static_assets_in_dev_reload_mode() {
7656 let project = tempfile::tempdir().expect("project dir");
7657 let static_dir = project.path().join("static");
7658 std::fs::create_dir_all(&static_dir).expect("mkdir");
7659 std::fs::write(static_dir.join("demo.txt"), "hello").expect("write static file");
7660 let reload_file = tempfile::NamedTempFile::new().expect("reload state file");
7661 std::fs::write(reload_file.path(), r#"{"version":0,"kind":"full"}"#).expect("write");
7662 temp_env::async_with_vars(
7663 [
7664 (
7665 "AUTUMN_MANIFEST_DIR",
7666 Some(project.path().to_str().expect("utf-8 path")),
7667 ),
7668 ("AUTUMN_DEV_RELOAD", Some("1")),
7669 (
7670 "AUTUMN_DEV_RELOAD_STATE",
7671 Some(reload_file.path().to_str().expect("utf-8 path")),
7672 ),
7673 ],
7674 async {
7675 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
7676
7677 let response = router
7678 .oneshot(
7679 Request::builder()
7680 .uri("/static/demo.txt")
7681 .body(Body::empty())
7682 .unwrap(),
7683 )
7684 .await
7685 .unwrap();
7686
7687 assert_eq!(response.status(), StatusCode::OK);
7688 assert_eq!(
7689 response.headers().get("cache-control").unwrap(),
7690 "no-store, no-cache, must-revalidate"
7691 );
7692 },
7693 )
7694 .await;
7695 }
7696
7697 #[test]
7698 fn app_builder_accepts_static_routes() {
7699 use crate::static_gen::StaticRouteMeta;
7700 let metas = vec![StaticRouteMeta {
7701 path: "/about",
7702 name: "about",
7703 revalidate: None,
7704 params_fn: None,
7705 }];
7706 let builder = app().static_routes(metas);
7707 assert_eq!(builder.static_metas.len(), 1);
7708 }
7709
7710 #[test]
7711 fn project_dir_defaults_to_subdir() {
7712 let env = crate::config::MockEnv::new();
7715 let dir = super::project_dir("dist", &env);
7716 assert_eq!(dir, std::path::PathBuf::from("dist"));
7717 }
7718
7719 pub fn test_router_with_config(routes: Vec<Route>, config: &AutumnConfig) -> axum::Router {
7721 let state = AppState {
7722 extensions: std::sync::Arc::new(std::sync::RwLock::new(
7723 std::collections::HashMap::new(),
7724 )),
7725 #[cfg(feature = "db")]
7726 pool: None,
7727 #[cfg(feature = "db")]
7728 replica_pool: None,
7729 profile: None,
7730 started_at: std::time::Instant::now(),
7731 health_detailed: true,
7732 probes: crate::probe::ProbeState::ready_for_test(),
7733 metrics: crate::middleware::MetricsCollector::new(),
7734 log_levels: crate::actuator::LogLevels::new("info"),
7735 task_registry: crate::actuator::TaskRegistry::new(),
7736 job_registry: crate::actuator::JobRegistry::new(),
7737 config_props: crate::actuator::ConfigProperties::default(),
7738 metrics_source_registry: crate::actuator::MetricsSourceRegistry::new(),
7739 health_indicator_registry: crate::actuator::HealthIndicatorRegistry::new(),
7740 #[cfg(feature = "ws")]
7741 channels: crate::channels::Channels::new(32),
7742 #[cfg(feature = "presence")]
7743 presence: crate::presence::Presence::new(crate::channels::Channels::new(32)),
7744 #[cfg(feature = "ws")]
7745 shutdown: tokio_util::sync::CancellationToken::new(),
7746 policy_registry: crate::authorization::PolicyRegistry::default(),
7747 forbidden_response: crate::authorization::ForbiddenResponse::default(),
7748 auth_session_key: "user_id".to_owned(),
7749 shared_cache: None,
7750 clock: std::sync::Arc::new(crate::time::SystemClock),
7751 };
7752 crate::router::build_router(routes, config, state)
7753 }
7754
7755 #[tokio::test]
7756 async fn cors_wildcard_allows_any_origin() {
7757 let mut config = AutumnConfig::default();
7758 config.cors.allowed_origins = vec!["*".to_owned()];
7759 let router = test_router_with_config(vec![test_get_route("/test", "test")], &config);
7760
7761 let response = router
7762 .oneshot(
7763 Request::builder()
7764 .uri("/test")
7765 .header("Origin", "https://example.com")
7766 .body(Body::empty())
7767 .unwrap(),
7768 )
7769 .await
7770 .unwrap();
7771
7772 assert_eq!(response.status(), StatusCode::OK);
7773 assert_eq!(
7774 response
7775 .headers()
7776 .get("access-control-allow-origin")
7777 .unwrap(),
7778 "*"
7779 );
7780 }
7781
7782 #[tokio::test]
7783 async fn cors_specific_origin_reflected() {
7784 let mut config = AutumnConfig::default();
7785 config.cors.allowed_origins = vec!["https://example.com".to_owned()];
7786 let router = test_router_with_config(vec![test_get_route("/test", "test")], &config);
7787
7788 let response = router
7789 .oneshot(
7790 Request::builder()
7791 .uri("/test")
7792 .header("Origin", "https://example.com")
7793 .body(Body::empty())
7794 .unwrap(),
7795 )
7796 .await
7797 .unwrap();
7798
7799 assert_eq!(response.status(), StatusCode::OK);
7800 assert_eq!(
7801 response
7802 .headers()
7803 .get("access-control-allow-origin")
7804 .unwrap(),
7805 "https://example.com"
7806 );
7807 }
7808
7809 #[tokio::test]
7810 async fn cors_disabled_when_no_origins() {
7811 let config = AutumnConfig::default();
7812 assert!(config.cors.allowed_origins.is_empty());
7813 let router = test_router_with_config(vec![test_get_route("/test", "test")], &config);
7814
7815 let response = router
7816 .oneshot(
7817 Request::builder()
7818 .uri("/test")
7819 .header("Origin", "https://example.com")
7820 .body(Body::empty())
7821 .unwrap(),
7822 )
7823 .await
7824 .unwrap();
7825
7826 assert_eq!(response.status(), StatusCode::OK);
7827 assert!(
7828 response
7829 .headers()
7830 .get("access-control-allow-origin")
7831 .is_none()
7832 );
7833 }
7834
7835 #[tokio::test]
7836 async fn cors_preflight_returns_204() {
7837 let mut config = AutumnConfig::default();
7838 config.cors.allowed_origins = vec!["https://example.com".to_owned()];
7839 let router = test_router_with_config(vec![test_get_route("/test", "test")], &config);
7840
7841 let response = router
7842 .oneshot(
7843 Request::builder()
7844 .method("OPTIONS")
7845 .uri("/test")
7846 .header("Origin", "https://example.com")
7847 .header("Access-Control-Request-Method", "GET")
7848 .body(Body::empty())
7849 .unwrap(),
7850 )
7851 .await
7852 .unwrap();
7853
7854 assert_eq!(response.status(), StatusCode::OK);
7855 assert!(
7856 response
7857 .headers()
7858 .contains_key("access-control-allow-methods")
7859 );
7860 }
7861
7862 #[tokio::test]
7863 async fn build_router_with_static_skips_without_manifest() {
7864 let tmp = tempfile::tempdir().expect("tempdir");
7867 let dist = tmp.path().join("dist");
7868 std::fs::create_dir_all(&dist).expect("mkdir");
7869 let config = AutumnConfig::default();
7872 let state = AppState {
7873 extensions: std::sync::Arc::new(std::sync::RwLock::new(
7874 std::collections::HashMap::new(),
7875 )),
7876 #[cfg(feature = "db")]
7877 pool: None,
7878 #[cfg(feature = "db")]
7879 replica_pool: None,
7880 profile: None,
7881 started_at: std::time::Instant::now(),
7882 health_detailed: true,
7883 probes: crate::probe::ProbeState::ready_for_test(),
7884 metrics: crate::middleware::MetricsCollector::new(),
7885 log_levels: crate::actuator::LogLevels::new("info"),
7886 task_registry: crate::actuator::TaskRegistry::new(),
7887 job_registry: crate::actuator::JobRegistry::new(),
7888 config_props: crate::actuator::ConfigProperties::default(),
7889 metrics_source_registry: crate::actuator::MetricsSourceRegistry::new(),
7890 health_indicator_registry: crate::actuator::HealthIndicatorRegistry::new(),
7891 #[cfg(feature = "ws")]
7892 channels: crate::channels::Channels::new(32),
7893 #[cfg(feature = "presence")]
7894 presence: crate::presence::Presence::new(crate::channels::Channels::new(32)),
7895 #[cfg(feature = "ws")]
7896 shutdown: tokio_util::sync::CancellationToken::new(),
7897 policy_registry: crate::authorization::PolicyRegistry::default(),
7898 forbidden_response: crate::authorization::ForbiddenResponse::default(),
7899 auth_session_key: "user_id".to_owned(),
7900 shared_cache: None,
7901 clock: std::sync::Arc::new(crate::time::SystemClock),
7902 };
7903 let router = crate::router::build_router_with_static(
7904 vec![test_get_route("/test", "test")],
7905 &config,
7906 state,
7907 Some(dist.as_path()),
7908 );
7909
7910 let response = router
7911 .oneshot(Request::builder().uri("/test").body(Body::empty()).unwrap())
7912 .await
7913 .unwrap();
7914 assert_eq!(response.status(), StatusCode::OK);
7915 }
7916
7917 #[tokio::test]
7918 async fn build_router_with_static_none_dist() {
7919 let config = AutumnConfig::default();
7921 let state = AppState {
7922 extensions: std::sync::Arc::new(std::sync::RwLock::new(
7923 std::collections::HashMap::new(),
7924 )),
7925 #[cfg(feature = "db")]
7926 pool: None,
7927 #[cfg(feature = "db")]
7928 replica_pool: None,
7929 profile: None,
7930 started_at: std::time::Instant::now(),
7931 health_detailed: true,
7932 probes: crate::probe::ProbeState::ready_for_test(),
7933 metrics: crate::middleware::MetricsCollector::new(),
7934 log_levels: crate::actuator::LogLevels::new("info"),
7935 task_registry: crate::actuator::TaskRegistry::new(),
7936 job_registry: crate::actuator::JobRegistry::new(),
7937 config_props: crate::actuator::ConfigProperties::default(),
7938 metrics_source_registry: crate::actuator::MetricsSourceRegistry::new(),
7939 health_indicator_registry: crate::actuator::HealthIndicatorRegistry::new(),
7940 #[cfg(feature = "ws")]
7941 channels: crate::channels::Channels::new(32),
7942 #[cfg(feature = "presence")]
7943 presence: crate::presence::Presence::new(crate::channels::Channels::new(32)),
7944 #[cfg(feature = "ws")]
7945 shutdown: tokio_util::sync::CancellationToken::new(),
7946 policy_registry: crate::authorization::PolicyRegistry::default(),
7947 forbidden_response: crate::authorization::ForbiddenResponse::default(),
7948 auth_session_key: "user_id".to_owned(),
7949 shared_cache: None,
7950 clock: std::sync::Arc::new(crate::time::SystemClock),
7951 };
7952 let router = crate::router::build_router_with_static(
7953 vec![test_get_route("/test", "test")],
7954 &config,
7955 state,
7956 None,
7957 );
7958
7959 let response = router
7960 .oneshot(Request::builder().uri("/test").body(Body::empty()).unwrap())
7961 .await
7962 .unwrap();
7963 assert_eq!(response.status(), StatusCode::OK);
7964 }
7965
/// The formatted route listing mentions each user route's path and handler name.
#[test]
fn format_route_lines_lists_user_routes() {
    let config = AutumnConfig::default();
    let routes = vec![
        test_get_route("/", "index"),
        test_get_route("/users/{id}", "get_user"),
    ];

    let listing = format_route_lines(&routes, &[], &config);

    for fragment in ["-> index", "/ GET", "/users/{id}", "-> get_user"] {
        assert!(
            listing.contains(fragment),
            "route listing is missing {fragment:?}"
        );
    }
}
7981
/// With `actuator.prefix` set to `/ops`, the route listing (built from no
/// user routes) shows the health entry and the `/ops/*` wildcard.
#[test]
fn config_runtime_drift_format_route_lines_uses_actuator_prefix() {
    let mut config = AutumnConfig::default();
    config.actuator.prefix = "/ops".to_owned();

    let listing = format_route_lines(&[], &[], &config);

    let shows_health = listing.contains("-> health");
    let shows_prefix = listing.contains("/ops/*");
    assert!(shows_health);
    assert!(shows_prefix);
}
7990
/// An empty task list produces no task section at all.
#[test]
fn format_task_lines_none_when_empty() {
    let lines = format_task_lines(&[]);
    assert!(lines.is_none());
}
7995
/// A fixed-delay task of 300 seconds is rendered as `cleanup (every 300s)`.
#[test]
fn format_task_lines_fixed_delay() {
    let delay = std::time::Duration::from_secs(300);
    let cleanup = crate::task::TaskInfo {
        name: "cleanup".into(),
        schedule: crate::task::Schedule::FixedDelay(delay),
        coordination: crate::task::TaskCoordination::Fleet,
        handler: |_| Box::pin(async { Ok(()) }),
    };
    let tasks = vec![cleanup];

    let rendered = format_task_lines(&tasks).unwrap();

    assert!(rendered.contains("cleanup (every 300s)"));
}
8007
/// A cron task without a timezone is rendered as `nightly (cron 0 0 * * *)`.
#[test]
fn format_task_lines_cron() {
    let schedule = crate::task::Schedule::Cron {
        expression: "0 0 * * *".into(),
        timezone: None,
    };
    let nightly = crate::task::TaskInfo {
        name: "nightly".into(),
        schedule,
        coordination: crate::task::TaskCoordination::Fleet,
        handler: |_| Box::pin(async { Ok(()) }),
    };
    let tasks = vec![nightly];

    let rendered = format_task_lines(&tasks).unwrap();

    assert!(rendered.contains("nightly (cron 0 0 * * *)"));
}
8022
/// For the default config the middleware list names the always-on layers and
/// mentions neither CORS nor CSRF.
#[test]
fn format_middleware_list_default() {
    let config = AutumnConfig::default();
    let listing = format_middleware_list(&config);

    let expected = [
        "RequestId",
        "SecurityHeaders",
        "Session (in-memory)",
        "Metrics",
    ];
    for layer in expected {
        assert!(listing.contains(layer), "expected {layer:?} in the list");
    }

    for layer in ["CORS", "CSRF"] {
        assert!(!listing.contains(layer), "unexpected {layer:?} in the list");
    }
}
8035
/// With an allowed CORS origin and CSRF enabled, both layers show up in the
/// middleware list.
#[test]
fn format_middleware_list_with_cors_and_csrf() {
    // Assemble the nested config from the inside out; everything not named
    // explicitly comes from the respective `Default` impl.
    let cors = crate::config::CorsConfig {
        allowed_origins: vec!["https://example.com".into()],
        ..crate::config::CorsConfig::default()
    };
    let csrf = crate::security::config::CsrfConfig {
        enabled: true,
        ..crate::security::config::CsrfConfig::default()
    };
    let security = crate::security::config::SecurityConfig {
        csrf,
        ..crate::security::config::SecurityConfig::default()
    };
    let config = AutumnConfig {
        cors,
        security,
        ..AutumnConfig::default()
    };

    let listing = format_middleware_list(&config);

    for layer in ["CORS", "CSRF"] {
        assert!(listing.contains(layer), "expected {layer:?} in the list");
    }
}
8056
/// A URL with `user:password` credentials has only the password replaced by
/// `****`; the pool size is included in the output.
#[test]
fn mask_database_url_with_password() {
    let url = "postgres://user:secret@localhost:5432/mydb";
    let summary = mask_database_url(url, 10);

    assert!(summary.contains("****"));
    assert!(!summary.contains("secret"));
    assert!(summary.contains("postgres://user:****@localhost:5432/mydb"));
    assert!(summary.contains("pool_size=10"));
}
8065
/// A URL without credentials is passed through unmasked, with the pool size
/// included in the output.
#[test]
fn mask_database_url_without_password() {
    let url = "postgres://localhost/mydb";
    let summary = mask_database_url(url, 5);

    assert!(!summary.contains("****"));
    assert!(summary.contains("postgres://localhost/mydb"));
    assert!(summary.contains("pool_size=5"));
}
8073
/// Masking also works for a percent-encoded password and for a URL with an
/// empty user name.
#[test]
fn mask_database_url_edge_cases() {
    // Percent-encoded password.
    let encoded_password = "p%40ssw%3Ard%21";
    let encoded =
        mask_database_url("postgres://user:p%40ssw%3Ard%21@localhost:5432/mydb", 10);
    assert!(encoded.contains("****"));
    assert!(!encoded.contains(encoded_password));
    assert!(encoded.contains("postgres://user:****@localhost:5432/mydb"));

    // Password present but user name empty.
    let no_user = mask_database_url("postgres://:secret@localhost:5432/mydb", 10);
    assert!(no_user.contains("****"));
    assert!(!no_user.contains("secret"));
    assert!(no_user.contains("postgres://:****@localhost:5432/mydb"));
}
/// Input that is not a URL at all must still come out masked: the output
/// contains `****`, the pool size, and not the word `supersecret`.
#[test]
fn mask_database_url_invalid_url_fallback() {
    let not_a_url = "this is completely invalid as a URL with supersecret";
    let summary = mask_database_url(not_a_url, 10);

    assert!(summary.contains("****"));
    assert!(!summary.contains("supersecret"));
    assert!(summary.contains("pool_size=10"));
}
8100
/// The summary of a default config lists each of the expected sections.
#[test]
fn format_config_summary_defaults() {
    let summary = format_config_summary(&AutumnConfig::default());

    let expected = [
        "profile: none",
        "server: 127.0.0.1:3000",
        "database: not configured",
        "log_level:",
        "telemetry: disabled",
        "health: /health",
    ];
    for fragment in expected {
        assert!(
            summary.contains(fragment),
            "config summary is missing {fragment:?}"
        );
    }
}
8112
/// With a database URL configured, the summary shows the masked URL and the
/// pool size, and never the password.
#[test]
fn format_config_summary_with_db() {
    let database = crate::config::DatabaseConfig {
        url: Some("postgres://user:pass@host/db".into()),
        pool_size: 20,
        ..crate::config::DatabaseConfig::default()
    };
    let config = AutumnConfig {
        database,
        ..AutumnConfig::default()
    };

    let summary = format_config_summary(&config);

    assert!(summary.contains("user:****@host/db"));
    assert!(summary.contains("pool_size=20"));
    assert!(!summary.contains("pass"));
}
8128
/// A configured profile name is shown in the summary.
#[test]
fn format_config_summary_with_profile() {
    let base = AutumnConfig::default();
    let config = AutumnConfig {
        profile: Some("prod".into()),
        ..base
    };

    let summary = format_config_summary(&config);

    assert!(summary.contains("profile: prod"));
}
8138
/// With telemetry enabled and an OTLP endpoint set, the summary line reads
/// `telemetry: Grpc -> <endpoint>`.
#[test]
fn format_config_summary_with_telemetry() {
    let telemetry = crate::config::TelemetryConfig {
        enabled: true,
        service_name: "orders-api".into(),
        otlp_endpoint: Some("http://otel-collector:4317".into()),
        ..crate::config::TelemetryConfig::default()
    };
    let config = AutumnConfig {
        telemetry,
        ..AutumnConfig::default()
    };

    let summary = format_config_summary(&config);

    assert!(summary.contains("telemetry: Grpc -> http://otel-collector:4317"));
}
8153
/// Smoke test: logging startup information for one route and one task must
/// complete without panicking. Nothing is asserted about the log output.
#[test]
fn log_startup_transparency_runs_without_panic() {
    let config = AutumnConfig::default();
    let routes = vec![test_get_route("/", "index")];
    let every_minute = std::time::Duration::from_secs(60);
    let tasks = vec![crate::task::TaskInfo {
        name: "cleanup".into(),
        schedule: crate::task::Schedule::FixedDelay(every_minute),
        coordination: crate::task::TaskCoordination::Fleet,
        handler: |_| Box::pin(async { Ok(()) }),
    }];

    log_startup_transparency(&routes, &tasks, &[], &config);
}
8169
/// Smoke test: logging startup information with an empty task list must
/// complete without panicking.
#[test]
fn log_startup_transparency_no_tasks() {
    let config = AutumnConfig::default();
    let routes = vec![test_get_route("/health", "check")];

    log_startup_transparency(&routes, &[], &[], &config);
}
8176
/// Runs a 1 ms fixed-delay task through `start_task_scheduler` and checks
/// that a `started` event followed by a `success` event (with a
/// `duration_ms` field) arrives on the `sys:tasks` channel.
#[cfg(feature = "ws")]
#[tokio::test]
async fn start_task_scheduler_broadcasts_events() {
    let state = AppState {
        extensions: std::sync::Arc::new(std::sync::RwLock::new(
            std::collections::HashMap::new(),
        )),
        #[cfg(feature = "db")]
        pool: None,
        #[cfg(feature = "db")]
        replica_pool: None,
        profile: None,
        started_at: std::time::Instant::now(),
        health_detailed: true,
        probes: crate::probe::ProbeState::ready_for_test(),
        metrics: crate::middleware::MetricsCollector::new(),
        log_levels: crate::actuator::LogLevels::new("info"),
        task_registry: crate::actuator::TaskRegistry::new(),
        job_registry: crate::actuator::JobRegistry::new(),
        config_props: crate::actuator::ConfigProperties::default(),
        channels: crate::channels::Channels::new(32),
        #[cfg(feature = "presence")]
        presence: crate::presence::Presence::new(crate::channels::Channels::new(32)),
        shutdown: tokio_util::sync::CancellationToken::new(),
        policy_registry: crate::authorization::PolicyRegistry::default(),
        forbidden_response: crate::authorization::ForbiddenResponse::default(),
        auth_session_key: String::from("user_id"),
        shared_cache: None,
        clock: std::sync::Arc::new(crate::time::SystemClock),
        metrics_source_registry: crate::actuator::MetricsSourceRegistry::new(),
        health_indicator_registry: crate::actuator::HealthIndicatorRegistry::new(),
    };

    // The subscription is created before the scheduler is spawned.
    let mut events = state.channels().subscribe("sys:tasks");

    let scheduled = vec![crate::task::TaskInfo {
        name: "test_broadcaster".into(),
        schedule: crate::task::Schedule::FixedDelay(std::time::Duration::from_millis(1)),
        coordination: crate::task::TaskCoordination::Fleet,
        handler: |_| Box::pin(async { Ok(()) }),
    }];

    let scheduler_state = state.clone();
    tokio::spawn(async move {
        super::start_task_scheduler(
            scheduled,
            &scheduler_state,
            &tokio_util::sync::CancellationToken::new(),
        );
    });

    // Each receive is bounded so a missing event fails the test instead of hanging it.
    let wait = std::time::Duration::from_secs(1);

    let started_raw = tokio::time::timeout(wait, events.recv())
        .await
        .expect("timeout waiting for start event")
        .expect("channel closed");
    let started: serde_json::Value = serde_json::from_str(started_raw.as_str()).unwrap();
    assert_eq!(started["event"], "started");
    assert_eq!(started["task"], "test_broadcaster");

    let finished_raw = tokio::time::timeout(wait, events.recv())
        .await
        .expect("timeout waiting for success event")
        .expect("channel closed");
    let finished: serde_json::Value = serde_json::from_str(finished_raw.as_str()).unwrap();
    assert_eq!(finished["event"], "success");
    assert_eq!(finished["task"], "test_broadcaster");
    assert!(finished.get("duration_ms").is_some());
}
8249
/// Runs a 1 ms fixed-delay task whose handler always fails through
/// `start_task_scheduler` and checks that the second event on the
/// `sys:tasks` channel is a `failure` event carrying the handler's message.
///
/// Both receives are bounded by a one-second timeout so that a scheduler
/// which never emits an event fails the test rather than hanging the suite.
#[cfg(feature = "ws")]
#[tokio::test]
async fn start_task_scheduler_broadcasts_failure_events() {
    let state = AppState {
        extensions: std::sync::Arc::new(std::sync::RwLock::new(
            std::collections::HashMap::new(),
        )),
        #[cfg(feature = "db")]
        pool: None,
        #[cfg(feature = "db")]
        replica_pool: None,
        profile: None,
        started_at: std::time::Instant::now(),
        health_detailed: true,
        probes: crate::probe::ProbeState::ready_for_test(),
        metrics: crate::middleware::MetricsCollector::new(),
        log_levels: crate::actuator::LogLevels::new("info"),
        task_registry: crate::actuator::TaskRegistry::new(),
        job_registry: crate::actuator::JobRegistry::new(),
        config_props: crate::actuator::ConfigProperties::default(),
        channels: crate::channels::Channels::new(32),
        #[cfg(feature = "presence")]
        presence: crate::presence::Presence::new(crate::channels::Channels::new(32)),
        shutdown: tokio_util::sync::CancellationToken::new(),
        policy_registry: crate::authorization::PolicyRegistry::default(),
        forbidden_response: crate::authorization::ForbiddenResponse::default(),
        auth_session_key: "user_id".to_owned(),
        shared_cache: None,
        clock: std::sync::Arc::new(crate::time::SystemClock),
        metrics_source_registry: crate::actuator::MetricsSourceRegistry::new(),
        health_indicator_registry: crate::actuator::HealthIndicatorRegistry::new(),
    };

    // The subscription is created before the scheduler is spawned.
    let mut rx = state.channels().subscribe("sys:tasks");

    let task = crate::task::TaskInfo {
        name: "test_failing_task".into(),
        schedule: crate::task::Schedule::FixedDelay(std::time::Duration::from_millis(1)),
        coordination: crate::task::TaskCoordination::Fleet,
        handler: |_| {
            Box::pin(async { Err(crate::AutumnError::bad_request_msg("forced error")) })
        },
    };

    let state_clone = state.clone();
    tokio::spawn(async move {
        super::start_task_scheduler(
            vec![task],
            &state_clone,
            &tokio_util::sync::CancellationToken::new(),
        );
    });

    let wait = std::time::Duration::from_secs(1);

    // The first event is consumed and discarded; only the one after it is inspected.
    // Previously this receive was unbounded and could block forever.
    let _ = tokio::time::timeout(wait, rx.recv())
        .await
        .expect("timeout waiting for start event")
        .expect("channel closed");

    let msg2 = tokio::time::timeout(wait, rx.recv())
        .await
        .expect("timeout waiting for failure event")
        .expect("channel closed");
    let json2: serde_json::Value = serde_json::from_str(msg2.as_str()).unwrap();
    assert_eq!(json2["event"], "failure");
    assert_eq!(json2["task"], "test_failing_task");
    assert_eq!(json2["error"], "forced error");
}
8316
8317 #[tokio::test]
8318 async fn execute_task_result_ok_returns_duration() {
8319 let state = AppState::for_test();
8320 let handler: crate::task::TaskHandler = |_| Box::pin(async { Ok(()) });
8321 let start = std::time::Instant::now();
8322 let result =
8323 super::execute_task_result(&state, handler, start, "test_task", "fixed_delay").await;
8324 assert!(result.is_ok(), "expected Ok from successful handler");
8325 assert!(result.unwrap() < u64::MAX);
8327 }
8328
8329 #[tokio::test]
8330 async fn execute_task_result_err_returns_duration_and_message() {
8331 let state = AppState::for_test();
8332 let handler: crate::task::TaskHandler =
8333 |_| Box::pin(async { Err(crate::AutumnError::bad_request_msg("test error")) });
8334 let start = std::time::Instant::now();
8335 let result =
8336 super::execute_task_result(&state, handler, start, "test_task", "fixed_delay").await;
8337 assert!(result.is_err(), "expected Err from failing handler");
8338 let (duration_ms, msg) = result.unwrap_err();
8339 assert!(duration_ms < u64::MAX);
8340 assert!(msg.contains("test error"));
8341 }
8342
8343 fn instantly_panicking_scheduled_handler(
8344 _state: AppState,
8345 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = crate::AutumnResult<()>> + Send>> {
8346 panic!("panic before scheduled future")
8347 }
8348
8349 #[tokio::test]
8350 async fn execute_task_result_reports_immediate_handler_panics() {
8351 let state = AppState::for_test();
8352 let start = std::time::Instant::now();
8353 let result = super::execute_task_result(
8354 &state,
8355 instantly_panicking_scheduled_handler,
8356 start,
8357 "test_task",
8358 "fixed_delay",
8359 )
8360 .await;
8361
8362 let (duration_ms, msg) = result.expect_err("expected Err from panicking handler");
8363 assert!(duration_ms < u64::MAX);
8364 assert!(msg.contains("scheduled task handler panicked: panic before scheduled future"));
8365 }
8366
8367 #[tokio::test]
8368 async fn execute_fixed_delay_task_does_not_timeout_in_process_runs() {
8369 let state = AppState::for_test();
8370 state.task_registry.register_scheduled(
8371 "slow_task",
8372 "every 1s",
8373 crate::task::TaskCoordination::Fleet,
8374 "in_process",
8375 "replica-a",
8376 );
8377 let handler: crate::task::TaskHandler = |_| {
8378 Box::pin(async {
8379 tokio::time::sleep(std::time::Duration::from_millis(30)).await;
8380 Ok(())
8381 })
8382 };
8383 let coordinator = std::sync::Arc::new(
8384 crate::scheduler::InProcessSchedulerCoordinator::new("replica-a"),
8385 );
8386
8387 super::execute_fixed_delay_task(
8388 "slow_task".to_owned(),
8389 state.clone(),
8390 handler,
8391 std::time::Duration::from_secs(1),
8392 crate::task::TaskCoordination::Fleet,
8393 coordinator,
8394 std::time::Duration::from_millis(10),
8395 )
8396 .await;
8397
8398 let snapshot = state.task_registry.snapshot();
8399 let status = &snapshot["slow_task"];
8400 assert_eq!(status.status, "idle");
8401 assert_eq!(status.last_result.as_deref(), Some("ok"));
8402 assert_eq!(status.total_runs, 1);
8403 assert_eq!(status.total_failures, 0);
8404 assert!(status.last_error.is_none());
8405 }
8406
/// Counts invocations of `counted_scheduled_handler`. It is a process-wide
/// static, so a test that reads it resets it to zero first.
static SKIPPED_LEASE_HANDLER_CALLS: AtomicUsize = AtomicUsize::new(0);
8408
8409 struct DenyingSchedulerCoordinator;
8410
8411 impl crate::scheduler::SchedulerCoordinator for DenyingSchedulerCoordinator {
8412 fn backend(&self) -> &'static str {
8413 "postgres"
8414 }
8415
8416 fn replica_id(&self) -> &'static str {
8417 "replica-a"
8418 }
8419
8420 fn try_acquire<'a>(
8421 &'a self,
8422 _task_name: &'a str,
8423 _tick_key: &'a str,
8424 _coordination: crate::task::TaskCoordination,
8425 ) -> crate::scheduler::SchedulerFuture<
8426 'a,
8427 crate::AutumnResult<Option<crate::scheduler::SchedulerLease>>,
8428 > {
8429 Box::pin(async { Ok(None) })
8430 }
8431 }
8432
8433 struct GrantingSchedulerCoordinator {
8434 backend: &'static str,
8435 tick_keys: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
8436 release_count: Option<std::sync::Arc<AtomicUsize>>,
8437 }
8438
8439 impl crate::scheduler::SchedulerCoordinator for GrantingSchedulerCoordinator {
8440 fn backend(&self) -> &'static str {
8441 self.backend
8442 }
8443
8444 fn replica_id(&self) -> &'static str {
8445 "replica-a"
8446 }
8447
8448 fn try_acquire<'a>(
8449 &'a self,
8450 _task_name: &'a str,
8451 tick_key: &'a str,
8452 _coordination: crate::task::TaskCoordination,
8453 ) -> crate::scheduler::SchedulerFuture<
8454 'a,
8455 crate::AutumnResult<Option<crate::scheduler::SchedulerLease>>,
8456 > {
8457 Box::pin(async move {
8458 self.tick_keys.lock().unwrap().push(tick_key.to_owned());
8459 let lease = self.release_count.as_ref().map_or_else(
8460 || crate::scheduler::SchedulerLease::local(self.backend, "replica-a"),
8461 |release_count| {
8462 crate::scheduler::SchedulerLease::tracked(
8463 self.backend,
8464 "replica-a",
8465 std::sync::Arc::clone(release_count),
8466 )
8467 },
8468 );
8469 Ok(Some(lease))
8470 })
8471 }
8472 }
8473
8474 fn counted_scheduled_handler(
8475 _state: AppState,
8476 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = crate::AutumnResult<()>> + Send>> {
8477 Box::pin(async {
8478 SKIPPED_LEASE_HANDLER_CALLS.fetch_add(1, Ordering::SeqCst);
8479 Ok(())
8480 })
8481 }
8482
8483 #[tokio::test]
8484 async fn execute_fixed_delay_task_skips_handler_when_lease_is_not_acquired() {
8485 SKIPPED_LEASE_HANDLER_CALLS.store(0, Ordering::SeqCst);
8486 let state = AppState::for_test();
8487 state.task_registry.register_scheduled(
8488 "claimed_elsewhere",
8489 "every 1s",
8490 crate::task::TaskCoordination::Fleet,
8491 "postgres",
8492 "replica-a",
8493 );
8494 let coordinator = std::sync::Arc::new(DenyingSchedulerCoordinator);
8495
8496 super::execute_fixed_delay_task(
8497 "claimed_elsewhere".to_owned(),
8498 state.clone(),
8499 counted_scheduled_handler,
8500 std::time::Duration::from_secs(1),
8501 crate::task::TaskCoordination::Fleet,
8502 coordinator,
8503 std::time::Duration::from_secs(1),
8504 )
8505 .await;
8506
8507 let snapshot = state.task_registry.snapshot();
8508 let status = &snapshot["claimed_elsewhere"];
8509 assert_eq!(SKIPPED_LEASE_HANDLER_CALLS.load(Ordering::SeqCst), 0);
8510 assert_eq!(status.total_runs, 0);
8511 assert!(status.current_leader.is_none());
8512 assert!(status.last_tick.is_none());
8513 }
8514
8515 #[tokio::test]
8516 async fn execute_fixed_delay_task_records_distributed_lease_ttl_timeout() {
8517 let state = AppState::for_test();
8518 state.task_registry.register_scheduled(
8519 "slow_distributed_task",
8520 "every 1s",
8521 crate::task::TaskCoordination::Fleet,
8522 "postgres",
8523 "replica-a",
8524 );
8525 let handler: crate::task::TaskHandler = |_| {
8526 Box::pin(async {
8527 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
8528 Ok(())
8529 })
8530 };
8531 let coordinator = std::sync::Arc::new(GrantingSchedulerCoordinator {
8532 backend: "postgres",
8533 tick_keys: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
8534 release_count: None,
8535 });
8536
8537 super::execute_fixed_delay_task(
8538 "slow_distributed_task".to_owned(),
8539 state.clone(),
8540 handler,
8541 std::time::Duration::from_secs(1),
8542 crate::task::TaskCoordination::Fleet,
8543 coordinator,
8544 std::time::Duration::from_millis(10),
8545 )
8546 .await;
8547
8548 let snapshot = state.task_registry.snapshot();
8549 let status = &snapshot["slow_distributed_task"];
8550 assert_eq!(status.status, "idle");
8551 assert_eq!(status.last_result.as_deref(), Some("failed"));
8552 assert_eq!(status.total_runs, 1);
8553 assert_eq!(status.total_failures, 1);
8554 assert!(
8555 status
8556 .last_error
8557 .as_deref()
8558 .is_some_and(|error| error.contains("lease TTL"))
8559 );
8560 }
8561
8562 #[tokio::test]
8563 async fn execute_cron_task_uses_scheduled_occurrence_for_tick_key() {
8564 let state = AppState::for_test();
8565 state.task_registry.register_scheduled(
8566 "cron_review_task",
8567 "cron */10 * * * * *",
8568 crate::task::TaskCoordination::Fleet,
8569 "postgres",
8570 "replica-a",
8571 );
8572 let tick_keys = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
8573 let coordinator = std::sync::Arc::new(GrantingSchedulerCoordinator {
8574 backend: "postgres",
8575 tick_keys: std::sync::Arc::clone(&tick_keys),
8576 release_count: None,
8577 });
8578 let handler: crate::task::TaskHandler = |_| Box::pin(async { Ok(()) });
8579 let scheduled_unix_secs = 1_700_000_000;
8580
8581 super::execute_cron_task(
8582 "cron_review_task".to_owned(),
8583 state.clone(),
8584 handler,
8585 crate::task::TaskCoordination::Fleet,
8586 coordinator,
8587 std::time::Duration::from_secs(30),
8588 scheduled_unix_secs,
8589 )
8590 .await;
8591
8592 assert_eq!(
8593 tick_keys.lock().unwrap().as_slice(),
8594 ["cron_review_task:1700000000"]
8595 );
8596 }
8597
8598 #[tokio::test]
8599 async fn execute_fixed_delay_task_releases_lease_when_handler_panics() {
8600 let state = AppState::for_test();
8601 state.task_registry.register_scheduled(
8602 "panic_task",
8603 "every 1s",
8604 crate::task::TaskCoordination::Fleet,
8605 "postgres",
8606 "replica-a",
8607 );
8608 let release_count = std::sync::Arc::new(AtomicUsize::new(0));
8609 let coordinator = std::sync::Arc::new(GrantingSchedulerCoordinator {
8610 backend: "postgres",
8611 tick_keys: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
8612 release_count: Some(std::sync::Arc::clone(&release_count)),
8613 });
8614 let handler: crate::task::TaskHandler = |_| {
8615 Box::pin(async {
8616 panic!("forced scheduled panic");
8617 #[allow(unreachable_code)]
8618 Ok(())
8619 })
8620 };
8621
8622 super::execute_fixed_delay_task(
8623 "panic_task".to_owned(),
8624 state.clone(),
8625 handler,
8626 std::time::Duration::from_secs(1),
8627 crate::task::TaskCoordination::Fleet,
8628 coordinator,
8629 std::time::Duration::from_secs(30),
8630 )
8631 .await;
8632
8633 let snapshot = state.task_registry.snapshot();
8634 let status = &snapshot["panic_task"];
8635 assert_eq!(release_count.load(Ordering::SeqCst), 1);
8636 assert_eq!(status.status, "idle");
8637 assert_eq!(status.last_result.as_deref(), Some("failed"));
8638 assert_eq!(status.total_runs, 1);
8639 assert_eq!(status.total_failures, 1);
8640 assert!(
8641 status
8642 .last_error
8643 .as_deref()
8644 .is_some_and(|error| error.contains("scheduled task handler panicked"))
8645 );
8646 }
8647
/// For an every-minute cron with a cursor at 12:00:00 and "now" at 12:30:05,
/// the next occurrence is 12:31:00 rather than any of the missed slots.
#[test]
fn next_cron_occurrence_skips_overdue_slots() {
    use chrono::TimeZone as _;

    let at = |hour, minute, second| {
        chrono_tz::UTC
            .with_ymd_and_hms(2026, 5, 5, hour, minute, second)
            .unwrap()
    };

    let cron = "0 * * * * *"
        .parse::<croner::Cron>()
        .expect("cron expression should parse");
    let stale_cursor = at(12, 0, 0);
    let now = at(12, 30, 5);
    let expected = at(12, 31, 0);

    let next = super::next_cron_occurrence_after(&cron, &stale_cursor, &now)
        .expect("next cron occurrence should resolve");

    assert_eq!(next, expected);
}
8671
/// For an every-minute cron, an occurrence scheduled at 12:01:00 is not
/// overdue at 12:01:05 but is overdue at 12:30:05.
#[test]
fn cron_occurrence_is_overdue_after_later_slot_passed() {
    use chrono::TimeZone as _;

    let at = |hour, minute, second| {
        chrono_tz::UTC
            .with_ymd_and_hms(2026, 5, 5, hour, minute, second)
            .unwrap()
    };

    let cron = "0 * * * * *"
        .parse::<croner::Cron>()
        .expect("cron expression should parse");
    let scheduled_at = at(12, 1, 0);
    let slightly_late = at(12, 1, 5);
    let after_later_slot = at(12, 30, 5);

    let overdue_when_slightly_late =
        super::cron_occurrence_is_overdue(&cron, &scheduled_at, &slightly_late)
            .expect("overdue check should resolve");
    assert!(!overdue_when_slightly_late);

    let overdue_after_later_slot =
        super::cron_occurrence_is_overdue(&cron, &scheduled_at, &after_later_slot)
            .expect("overdue check should resolve");
    assert!(overdue_after_later_slot);
}
8698
8699 #[cfg(feature = "storage")]
8700 mod storage_preflight {
8701 use super::super::{StorageBootstrap, preflight_storage};
8702 use crate::AppState;
8703 use crate::config::AutumnConfig;
8704 use crate::storage::{BlobStoreState, StorageBackend, StorageConfig, StorageLocalConfig};
8705
8706 fn config_with_storage(storage: StorageConfig) -> AutumnConfig {
8707 AutumnConfig {
8708 profile: Some("dev".into()),
8709 storage,
8710 ..AutumnConfig::default()
8711 }
8712 }
8713
/// With the storage backend disabled, preflight yields nothing.
#[test]
fn preflight_returns_none_when_disabled() {
    let disabled = StorageConfig {
        backend: StorageBackend::Disabled,
        ..StorageConfig::default()
    };
    let cfg = config_with_storage(disabled);

    let bootstrap = preflight_storage(&cfg);

    assert!(bootstrap.is_none());
}
8722
/// A local backend rooted in a temp directory provisions a store whose
/// provider id is `"default"` and which comes with serving information.
#[test]
fn preflight_provisions_local_backend_against_tempdir() {
    let scratch = tempfile::tempdir().unwrap();
    let local = StorageLocalConfig {
        root: scratch.path().to_path_buf(),
        ..StorageLocalConfig::default()
    };
    let storage = StorageConfig {
        backend: StorageBackend::Local,
        local,
        ..StorageConfig::default()
    };
    let cfg = config_with_storage(storage);

    let bootstrap = preflight_storage(&cfg).expect("local backend should provision");

    assert_eq!(bootstrap.store.provider_id(), "default");
    assert!(bootstrap.serving.is_some(), "local backend mounts a route");
}
8738
8739 #[tokio::test]
8740 async fn install_registers_blob_store_on_state() {
8741 let dir = tempfile::tempdir().unwrap();
8742 let cfg = config_with_storage(StorageConfig {
8743 backend: StorageBackend::Local,
8744 local: StorageLocalConfig {
8745 root: dir.path().to_path_buf(),
8746 ..StorageLocalConfig::default()
8747 },
8748 ..StorageConfig::default()
8749 });
8750 let bootstrap: StorageBootstrap = preflight_storage(&cfg).unwrap();
8751
8752 let state = AppState::for_test();
8753 assert!(state.extension::<BlobStoreState>().is_none());
8754 let serving = bootstrap.install(&state);
8755 assert!(serving.is_some());
8756 assert!(state.extension::<BlobStoreState>().is_some());
8757 }
8758
/// Passing a custom `BlobStore` to `with_blob_store` leaves the builder's
/// `blob_store` slot populated.
#[test]
fn with_blob_store_stores_custom_store() {
    use crate::storage::{
        Blob, BlobFuture, BlobMeta, BlobStore, BlobStoreError, ByteStream,
    };
    use bytes::Bytes;
    use std::time::Duration;

    // Minimal store: every operation resolves to `Unsupported("fake")`.
    struct FakeStore;
    impl BlobStore for FakeStore {
        fn provider_id(&self) -> &'static str {
            "fake"
        }
        fn put<'a>(
            &'a self,
            _key: &'a str,
            _content_type: &'a str,
            _body: Bytes,
        ) -> BlobFuture<'a, Blob> {
            Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
        }
        fn put_stream<'a>(
            &'a self,
            _key: &'a str,
            _content_type: &'a str,
            _data: ByteStream<'a>,
        ) -> BlobFuture<'a, Blob> {
            Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
        }
        fn get<'a>(&'a self, _key: &'a str) -> BlobFuture<'a, Bytes> {
            Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
        }
        fn delete<'a>(&'a self, _key: &'a str) -> BlobFuture<'a, ()> {
            Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
        }
        fn head<'a>(&'a self, _key: &'a str) -> BlobFuture<'a, Option<BlobMeta>> {
            Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
        }
        fn presigned_url<'a>(
            &'a self,
            _key: &'a str,
            _expires: Duration,
        ) -> BlobFuture<'a, String> {
            Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
        }
    }

    let builder = crate::app().with_blob_store(FakeStore);
    let has_store = builder.blob_store.is_some();
    assert!(has_store);
}
8804
8805 #[tokio::test]
8806 async fn with_blob_store_is_installed_on_state() {
8807 use crate::storage::{
8808 Blob, BlobFuture, BlobMeta, BlobStore, BlobStoreError, ByteStream,
8809 };
8810 use bytes::Bytes;
8811 use std::time::Duration;
8812
8813 struct FakeStore;
8814 impl BlobStore for FakeStore {
8815 fn provider_id(&self) -> &'static str {
8816 "fake-installed"
8817 }
8818 fn put<'a>(&'a self, _k: &'a str, _ct: &'a str, _b: Bytes) -> BlobFuture<'a, Blob> {
8819 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
8820 }
8821 fn put_stream<'a>(
8822 &'a self,
8823 _k: &'a str,
8824 _ct: &'a str,
8825 _d: ByteStream<'a>,
8826 ) -> BlobFuture<'a, Blob> {
8827 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
8828 }
8829 fn get<'a>(&'a self, _k: &'a str) -> BlobFuture<'a, Bytes> {
8830 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
8831 }
8832 fn delete<'a>(&'a self, _k: &'a str) -> BlobFuture<'a, ()> {
8833 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
8834 }
8835 fn head<'a>(&'a self, _k: &'a str) -> BlobFuture<'a, Option<BlobMeta>> {
8836 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
8837 }
8838 fn presigned_url<'a>(
8839 &'a self,
8840 _k: &'a str,
8841 _e: Duration,
8842 ) -> BlobFuture<'a, String> {
8843 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
8844 }
8845 }
8846
8847 let builder = crate::app().with_blob_store(FakeStore);
8848 let bootstrap = builder.blob_store.map(|store| StorageBootstrap {
8849 store,
8850 serving: None,
8851 });
8852 let state = AppState::for_test();
8853 assert!(state.extension::<BlobStoreState>().is_none());
8854 if let Some(b) = bootstrap {
8855 b.install(&state);
8856 }
8857 let installed = state
8858 .extension::<BlobStoreState>()
8859 .expect("store should be installed");
8860 assert_eq!(installed.store().provider_id(), "fake-installed");
8861 }
8862 }
8863
8864 struct TestPlugin {
8868 name: &'static str,
8869 route: Route,
8870 }
8871
8872 impl crate::plugin::Plugin for TestPlugin {
8873 fn name(&self) -> std::borrow::Cow<'static, str> {
8874 std::borrow::Cow::Borrowed(self.name)
8875 }
8876
8877 fn build(self, app: AppBuilder) -> AppBuilder {
8878 app.routes(vec![self.route])
8879 }
8880 }
8881
/// A route added directly on the builder is recorded with the `User` source.
#[test]
fn routes_registered_before_plugin_are_user_sourced() {
    use crate::route_listing::RouteSource;

    let home = test_get_route("/home", "home");
    let builder = app().routes(vec![home]);

    let sources = &builder.route_sources;
    assert_eq!(sources.len(), 1);
    assert_eq!(sources[0], RouteSource::User);
}
8892
/// A route added from within a plugin's `build` is recorded with that
/// plugin's name as its source.
#[test]
fn routes_registered_inside_plugin_are_plugin_sourced() {
    use crate::route_listing::RouteSource;

    let plugin = TestPlugin {
        name: "my-plugin",
        route: test_get_route("/plugin-page", "plugin_page"),
    };
    let builder = app().plugin(plugin);

    let sources = &builder.route_sources;
    assert_eq!(sources.len(), 1);
    assert_eq!(sources[0], RouteSource::Plugin("my-plugin".to_owned()));
}
8907
/// Once a plugin has finished building, routes added afterwards are recorded
/// with the `User` source again.
#[test]
fn routes_registered_after_plugin_revert_to_user_sourced() {
    use crate::route_listing::RouteSource;

    let from_plugin = test_get_route("/plugin-page", "plugin_page");
    let from_user = test_get_route("/home", "home");
    let plugin = TestPlugin {
        name: "my-plugin",
        route: from_plugin,
    };

    let builder = app().plugin(plugin).routes(vec![from_user]);

    let sources = &builder.route_sources;
    assert_eq!(sources.len(), 2);
    assert_eq!(sources[0], RouteSource::Plugin("my-plugin".to_owned()));
    assert_eq!(sources[1], RouteSource::User);
}
8927
8928 struct OuterPlugin;
8930
8931 impl crate::plugin::Plugin for OuterPlugin {
8932 fn name(&self) -> std::borrow::Cow<'static, str> {
8933 "outer".into()
8934 }
8935
8936 fn build(self, app: AppBuilder) -> AppBuilder {
8937 let inner = TestPlugin {
8938 name: "inner",
8939 route: test_get_route("/inner", "inner"),
8940 };
8941 app.plugin(inner)
8942 .routes(vec![test_get_route("/outer-after", "outer_after")])
8943 }
8944 }
8945
/// After a nested plugin finishes building, routes added by the enclosing
/// plugin are attributed to the enclosing plugin, not the nested one.
#[test]
fn outer_plugin_source_restored_after_nested_plugin() {
    use crate::route_listing::RouteSource;

    let builder = app().plugin(OuterPlugin);

    let sources = &builder.route_sources;
    assert_eq!(sources.len(), 2);
    assert_eq!(
        sources[0],
        RouteSource::Plugin("inner".to_owned()),
        "first route should be attributed to inner plugin"
    );
    assert_eq!(
        sources[1],
        RouteSource::Plugin("outer".to_owned()),
        "second route should be re-attributed to outer plugin after nested build"
    );
}
8962
8963 #[tokio::test]
8966 async fn shutdown_hooks_with_timeout_runs_all_fast_hooks() {
8967 use std::sync::atomic::{AtomicUsize, Ordering};
8968 let counter = Arc::new(AtomicUsize::new(0));
8969 let c1 = Arc::clone(&counter);
8970 let c2 = Arc::clone(&counter);
8971
8972 let hooks: Vec<ShutdownHook> = vec![
8973 Box::new(move || {
8974 let c = Arc::clone(&c1);
8975 Box::pin(async move {
8976 c.fetch_add(1, Ordering::SeqCst);
8977 })
8978 }),
8979 Box::new(move || {
8980 let c = Arc::clone(&c2);
8981 Box::pin(async move {
8982 c.fetch_add(1, Ordering::SeqCst);
8983 })
8984 }),
8985 ];
8986
8987 run_shutdown_hooks_with_timeout(
8988 &hooks,
8989 std::time::Duration::from_secs(2),
8990 std::time::Duration::from_secs(10),
8991 )
8992 .await;
8993
8994 assert_eq!(counter.load(Ordering::SeqCst), 2, "both hooks must run");
8995 }
8996
8997 #[tokio::test]
8998 async fn shutdown_hooks_with_timeout_tolerates_slow_hook_overrun() {
8999 use std::sync::atomic::{AtomicBool, Ordering};
9000 let fast_ran = Arc::new(AtomicBool::new(false));
9001 let fr = Arc::clone(&fast_ran);
9002
9003 let hooks: Vec<ShutdownHook> = vec![
9004 Box::new(move || {
9006 let fr = Arc::clone(&fr);
9007 Box::pin(async move {
9008 fr.store(true, Ordering::SeqCst);
9009 })
9010 }),
9011 Box::new(|| {
9013 Box::pin(async move {
9014 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
9015 })
9016 }),
9017 ];
9018
9019 run_shutdown_hooks_with_timeout(
9022 &hooks,
9023 std::time::Duration::from_millis(50),
9024 std::time::Duration::from_secs(1),
9025 )
9026 .await;
9027
9028 assert!(
9029 fast_ran.load(Ordering::SeqCst),
9030 "fast hook must still run even after slow hook overruns its per-hook budget"
9031 );
9032 }
9033}