1use std::any::{Any, TypeId};
28use std::collections::{HashMap, HashSet};
29use std::future::Future;
30use std::pin::Pin;
31use std::sync::Arc;
32
33use futures::FutureExt as _;
34use tracing::Instrument as _;
35
36use crate::config::{AutumnConfig, ConfigLoader};
37use crate::error_pages::{ErrorPageRenderer, SharedRenderer};
38use crate::middleware::exception_filter::ExceptionFilter;
39#[cfg(feature = "db")]
40use crate::migrate;
41use crate::route::Route;
42use crate::state::AppState;
43
44#[must_use]
67pub fn app() -> AppBuilder {
68 AppBuilder {
69 routes: Vec::new(),
70 route_sources: Vec::new(),
71 current_plugin: None,
72 tasks: Vec::new(),
73 one_off_tasks: Vec::new(),
74 jobs: Vec::new(),
75 static_metas: Vec::new(),
76 exception_filters: Vec::new(),
77 scoped_groups: Vec::new(),
78 merge_routers: Vec::new(),
79 nest_routers: Vec::new(),
80 custom_layers: Vec::new(),
81 startup_hooks: Vec::new(),
82 shutdown_hooks: Vec::new(),
83 extensions: HashMap::new(),
84 registered_plugins: HashSet::new(),
85 error_page_renderer: None,
86 #[cfg(feature = "db")]
87 migrations: Vec::new(),
88 config_loader_factory: None,
89 #[cfg(feature = "db")]
90 pool_provider_factory: None,
91 telemetry_provider: None,
92 session_store: None,
93 #[cfg(feature = "ws")]
94 channels_backend: None,
95 #[cfg(feature = "storage")]
96 blob_store: None,
97 cache_backend: None,
98 #[cfg(feature = "openapi")]
99 openapi: None,
100 audit_logger: None,
101 #[cfg(feature = "i18n")]
102 i18n_bundle: None,
103 #[cfg(feature = "i18n")]
104 i18n_auto_load: false,
105 policy_registrations: Vec::new(),
106 #[cfg(feature = "mail")]
107 mail_delivery_queue_factory: None,
108 #[cfg(feature = "mail")]
109 mail_previews: Vec::new(),
110 declared_routes: Vec::new(),
111 }
112}
113
114type StartupHookFuture = Pin<Box<dyn Future<Output = crate::AutumnResult<()>> + Send>>;
115type StartupHook = Box<dyn Fn(AppState) -> StartupHookFuture + Send + Sync>;
116type ShutdownHookFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
117type ShutdownHook = Box<dyn Fn() -> ShutdownHookFuture + Send + Sync>;
118
119type ConfigLoaderFactory = Box<
127 dyn FnOnce() -> Pin<
128 Box<dyn Future<Output = Result<AutumnConfig, crate::config::ConfigError>> + Send>,
129 > + Send,
130>;
131#[cfg(feature = "db")]
132type PoolProviderFactory = Box<
133 dyn FnOnce(
134 crate::config::DatabaseConfig,
135 ) -> Pin<
136 Box<
137 dyn Future<
138 Output = Result<Option<crate::db::DatabaseTopology>, crate::db::PoolError>,
139 > + Send,
140 >,
141 > + Send,
142>;
143
144type PolicyRegistration = Box<dyn FnOnce(&crate::authorization::PolicyRegistry) + Send>;
147
148pub struct AppBuilder {
177 routes: Vec<Route>,
178 route_sources: Vec<crate::route_listing::RouteSource>,
180 current_plugin: Option<String>,
183 tasks: Vec<crate::task::TaskInfo>,
184 one_off_tasks: Vec<crate::task::OneOffTaskInfo>,
185 jobs: Vec<crate::job::JobInfo>,
186 pub(crate) static_metas: Vec<crate::static_gen::StaticRouteMeta>,
187 exception_filters: Vec<Arc<dyn ExceptionFilter>>,
188 scoped_groups: Vec<ScopedGroup>,
189 merge_routers: Vec<axum::Router<AppState>>,
190 nest_routers: Vec<(String, axum::Router<AppState>)>,
191 custom_layers: Vec<CustomLayerRegistration>,
194 startup_hooks: Vec<StartupHook>,
195 shutdown_hooks: Vec<ShutdownHook>,
196 extensions: HashMap<TypeId, Box<dyn Any + Send>>,
197 registered_plugins: HashSet<String>,
199 error_page_renderer: Option<SharedRenderer>,
201 #[cfg(feature = "db")]
203 migrations: Vec<migrate::EmbeddedMigrations>,
204 config_loader_factory: Option<ConfigLoaderFactory>,
207 #[cfg(feature = "db")]
210 pool_provider_factory: Option<PoolProviderFactory>,
211 telemetry_provider: Option<Box<dyn crate::telemetry::TelemetryProvider>>,
214 session_store: Option<Arc<dyn crate::session::BoxedSessionStore>>,
218 #[cfg(feature = "ws")]
221 channels_backend: Option<Arc<dyn crate::channels::ChannelsBackend>>,
222 #[cfg(feature = "storage")]
226 blob_store: Option<crate::storage::SharedBlobStore>,
227 cache_backend: Option<Arc<dyn crate::cache::Cache>>,
231 #[cfg(feature = "openapi")]
239 openapi: Option<crate::openapi::OpenApiConfig>,
240 audit_logger: Option<Arc<crate::audit::AuditLogger>>,
242 #[cfg(feature = "i18n")]
246 i18n_bundle: Option<Arc<crate::i18n::Bundle>>,
247 #[cfg(feature = "i18n")]
251 i18n_auto_load: bool,
252 policy_registrations: Vec<PolicyRegistration>,
258 #[cfg(feature = "mail")]
262 mail_delivery_queue_factory: Option<MailDeliveryQueueFactory>,
263 #[cfg(feature = "mail")]
265 mail_previews: Vec<crate::mail::MailPreview>,
266 declared_routes: Vec<crate::route_listing::RouteInfo>,
270}
271
272#[cfg(feature = "mail")]
276pub(crate) type MailDeliveryQueueFactory = Box<
277 dyn FnOnce(&AppState) -> crate::AutumnResult<Arc<dyn crate::mail::MailDeliveryQueue>> + Send,
278>;
279
280pub(crate) struct ScopedGroup {
285 pub(crate) prefix: String,
286 pub(crate) routes: Vec<Route>,
287 pub(crate) source: crate::route_listing::RouteSource,
289 pub(crate) apply_layer:
291 Box<dyn FnOnce(axum::Router<AppState>) -> axum::Router<AppState> + Send>,
292}
293
294pub(crate) type CustomLayerApplier =
300 Box<dyn FnOnce(axum::Router<AppState>) -> axum::Router<AppState> + Send>;
301
302pub(crate) struct CustomLayerRegistration {
304 pub(crate) type_id: TypeId,
306 pub(crate) apply: CustomLayerApplier,
308}
309
310mod sealed {
311 pub trait Sealed {}
312}
313
314#[diagnostic::on_unimplemented(
328 message = "`{Self}` is not a usable Autumn app-wide Tower layer",
329 label = "this type does not implement `tower::Layer<axum::routing::Route>` with the required service bounds",
330 note = "`AppBuilder::layer(..)` requires:\n L: tower::Layer<axum::routing::Route> + Clone + Send + Sync + 'static,\n L::Service: Service<axum::extract::Request, Response = axum::response::Response, Error = Infallible> + Clone + Send + Sync + 'static,\n <L::Service as Service<axum::extract::Request>>::Future: Send + 'static\nSee docs/guide/middleware.md for common patterns and how to wrap raw-error layers (e.g. TimeoutLayer) with HandleErrorLayer."
331)]
332pub trait IntoAppLayer: sealed::Sealed + Send + Sync + 'static {
333 #[doc(hidden)]
335 fn apply_to(self, router: axum::Router<AppState>) -> axum::Router<AppState>;
336}
337
338impl<L> sealed::Sealed for L
339where
340 L: tower::Layer<axum::routing::Route> + Clone + Send + Sync + 'static,
341 L::Service: tower::Service<
342 axum::extract::Request,
343 Response = axum::response::Response,
344 Error = std::convert::Infallible,
345 > + Clone
346 + Send
347 + Sync
348 + 'static,
349 <L::Service as tower::Service<axum::extract::Request>>::Future: Send + 'static,
350{
351}
352
353impl<L> IntoAppLayer for L
354where
355 L: tower::Layer<axum::routing::Route> + Clone + Send + Sync + 'static,
356 L::Service: tower::Service<
357 axum::extract::Request,
358 Response = axum::response::Response,
359 Error = std::convert::Infallible,
360 > + Clone
361 + Send
362 + Sync
363 + 'static,
364 <L::Service as tower::Service<axum::extract::Request>>::Future: Send + 'static,
365{
366 fn apply_to(self, router: axum::Router<AppState>) -> axum::Router<AppState> {
367 router.layer(self)
368 }
369}
370
371impl AppBuilder {
372 #[must_use]
394 pub fn routes(mut self, routes: Vec<Route>) -> Self {
395 let source = self
396 .current_plugin
397 .as_ref()
398 .map_or(crate::route_listing::RouteSource::User, |name| {
399 crate::route_listing::RouteSource::Plugin(name.clone())
400 });
401 for _ in &routes {
402 self.route_sources.push(source.clone());
403 }
404 self.routes.extend(routes);
405 self
406 }
407
408 #[must_use]
414 pub fn tasks(mut self, tasks: Vec<crate::task::TaskInfo>) -> Self {
415 self.tasks.extend(tasks);
416 self
417 }
418
419 #[must_use]
424 pub fn one_off_tasks(mut self, tasks: Vec<crate::task::OneOffTaskInfo>) -> Self {
425 self.one_off_tasks.extend(tasks);
426 self
427 }
428
429 #[must_use]
431 pub fn jobs(mut self, jobs: Vec<crate::job::JobInfo>) -> Self {
432 self.jobs.extend(jobs);
433 self
434 }
435
436 #[must_use]
441 pub fn static_routes(mut self, metas: Vec<crate::static_gen::StaticRouteMeta>) -> Self {
442 self.static_metas.extend(metas);
443 self
444 }
445
446 #[cfg(feature = "openapi")]
494 #[must_use]
495 pub fn openapi(mut self, config: crate::openapi::OpenApiConfig) -> Self {
496 self.openapi = Some(config);
497 self
498 }
499
500 #[must_use]
532 pub fn exception_filter(mut self, filter: impl ExceptionFilter) -> Self {
533 self.exception_filters.push(Arc::new(filter));
534 self
535 }
536
537 #[must_use]
574 pub fn error_pages(mut self, renderer: impl ErrorPageRenderer) -> Self {
575 self.error_page_renderer = Some(Arc::new(renderer));
576 self
577 }
578
579 #[must_use]
602 pub fn scoped<L>(mut self, prefix: &str, layer: L, routes: Vec<Route>) -> Self
603 where
604 L: tower::Layer<axum::routing::Route> + Clone + Send + Sync + 'static,
605 L::Service: tower::Service<
606 axum::http::Request<axum::body::Body>,
607 Response = axum::http::Response<axum::body::Body>,
608 Error = std::convert::Infallible,
609 > + Clone
610 + Send
611 + Sync
612 + 'static,
613 <L::Service as tower::Service<axum::http::Request<axum::body::Body>>>::Future:
614 Send + 'static,
615 {
616 let source = self
617 .current_plugin
618 .as_ref()
619 .map_or(crate::route_listing::RouteSource::User, |name| {
620 crate::route_listing::RouteSource::Plugin(name.clone())
621 });
622 self.scoped_groups.push(ScopedGroup {
623 prefix: prefix.to_owned(),
624 routes,
625 source,
626 apply_layer: Box::new(move |router| router.layer(layer)),
627 });
628 self
629 }
630
631 #[must_use]
703 pub fn layer<L: IntoAppLayer>(mut self, layer: L) -> Self {
704 self.custom_layers.push(CustomLayerRegistration {
705 type_id: TypeId::of::<L>(),
706 apply: Box::new(move |router| layer.apply_to(router)),
707 });
708 self
709 }
710
711 #[must_use]
716 pub fn has_layer<L: 'static>(&self) -> bool {
717 let layer_type = TypeId::of::<L>();
718 self.custom_layers
719 .iter()
720 .any(|registered| registered.type_id == layer_type)
721 }
722
723 #[must_use]
728 pub fn get_layer_types(&self) -> Vec<TypeId> {
729 self.custom_layers
730 .iter()
731 .map(|registered| registered.type_id)
732 .collect()
733 }
734
735 #[must_use]
773 pub fn merge(mut self, router: axum::Router<AppState>) -> Self {
774 self.merge_routers.push(router);
775 self
776 }
777
778 #[must_use]
811 pub fn nest(mut self, path: &str, router: axum::Router<AppState>) -> Self {
812 self.nest_routers.push((path.to_owned(), router));
813 self
814 }
815
816 #[must_use]
826 pub fn declare_plugin_routes(
827 mut self,
828 routes: impl IntoIterator<Item = crate::route_listing::RouteInfo>,
829 ) -> Self {
830 let source = self
831 .current_plugin
832 .as_deref()
833 .map_or(crate::route_listing::RouteSource::User, |name| {
834 crate::route_listing::RouteSource::Plugin(name.to_owned())
835 });
836 for mut route in routes {
837 route.source = source.clone();
838 self.declared_routes.push(route);
839 }
840 self
841 }
842
843 #[must_use]
849 pub fn on_startup<F, Fut>(mut self, hook: F) -> Self
850 where
851 F: Fn(AppState) -> Fut + Send + Sync + 'static,
852 Fut: Future<Output = crate::AutumnResult<()>> + Send + 'static,
853 {
854 self.startup_hooks
855 .push(Box::new(move |state| Box::pin(hook(state))));
856 self
857 }
858
859 #[must_use]
864 pub fn on_shutdown<F, Fut>(mut self, hook: F) -> Self
865 where
866 F: Fn() -> Fut + Send + Sync + 'static,
867 Fut: Future<Output = ()> + Send + 'static,
868 {
869 self.shutdown_hooks.push(Box::new(move || Box::pin(hook())));
870 self
871 }
872
873 #[must_use]
878 pub fn with_extension<T>(mut self, value: T) -> Self
879 where
880 T: Any + Send + 'static,
881 {
882 self.extensions.insert(TypeId::of::<T>(), Box::new(value));
883 self
884 }
885
886 #[must_use]
894 pub fn update_extension<T, Init, Update>(mut self, init: Init, update: Update) -> Self
895 where
896 T: Any + Send + 'static,
897 Init: FnOnce() -> T,
898 Update: FnOnce(&mut T),
899 {
900 let type_id = TypeId::of::<T>();
901 let entry = self
902 .extensions
903 .entry(type_id)
904 .or_insert_with(|| Box::new(init()));
905 let typed = entry
906 .downcast_mut::<T>()
907 .expect("extension type map corrupted");
908 update(typed);
909 self
910 }
911
912 #[must_use]
914 pub fn extension<T>(&self) -> Option<&T>
915 where
916 T: Any + Send + 'static,
917 {
918 self.extensions.get(&TypeId::of::<T>())?.downcast_ref::<T>()
919 }
920
921 #[cfg(feature = "i18n")]
929 #[must_use]
930 pub fn i18n(mut self, bundle: crate::i18n::Bundle) -> Self {
931 self.i18n_bundle = Some(Arc::new(bundle));
932 self.i18n_auto_load = false;
933 self
934 }
935
936 #[cfg(feature = "i18n")]
972 #[must_use]
973 pub fn i18n_auto(mut self) -> Self {
974 self.i18n_bundle = None;
975 self.i18n_auto_load = true;
976 self
977 }
978
979 #[must_use]
995 pub fn with_config_loader<L>(mut self, loader: L) -> Self
996 where
997 L: crate::config::ConfigLoader,
998 {
999 if self.config_loader_factory.is_some() {
1000 tracing::warn!(
1001 "config loader replaced; the previously-installed loader was overwritten"
1002 );
1003 }
1004 self.config_loader_factory = Some(Box::new(move || {
1005 Box::pin(async move { loader.load().await })
1006 }));
1007 self
1008 }
1009
1010 #[cfg(feature = "db")]
1018 #[must_use]
1019 pub fn with_pool_provider<P>(mut self, provider: P) -> Self
1020 where
1021 P: crate::db::DatabasePoolProvider,
1022 {
1023 if self.pool_provider_factory.is_some() {
1024 tracing::warn!(
1025 "database pool provider replaced; the previously-installed provider was overwritten"
1026 );
1027 }
1028 self.pool_provider_factory =
1029 Some(Box::new(move |config: crate::config::DatabaseConfig| {
1030 Box::pin(async move { provider.create_topology(&config).await })
1031 }));
1032 self
1033 }
1034
1035 #[must_use]
1042 pub fn with_telemetry_provider<T>(mut self, provider: T) -> Self
1043 where
1044 T: crate::telemetry::TelemetryProvider,
1045 {
1046 if self.telemetry_provider.is_some() {
1047 tracing::warn!(
1048 "telemetry provider replaced; the previously-installed provider was overwritten"
1049 );
1050 }
1051 self.telemetry_provider = Some(Box::new(provider));
1052 self
1053 }
1054
1055 #[must_use]
1062 pub fn with_session_store<S>(mut self, store: S) -> Self
1063 where
1064 S: crate::session::SessionStore,
1065 {
1066 if self.session_store.is_some() {
1067 tracing::warn!(
1068 "session store replaced; the previously-installed store was overwritten"
1069 );
1070 }
1071 self.session_store = Some(Arc::new(store));
1072 self
1073 }
1074
1075 #[cfg(feature = "ws")]
1082 #[must_use]
1083 pub fn with_channels_backend<B>(mut self, backend: B) -> Self
1084 where
1085 B: crate::channels::ChannelsBackend,
1086 {
1087 if self.channels_backend.is_some() {
1088 tracing::warn!(
1089 "channels backend replaced; the previously-installed backend was overwritten"
1090 );
1091 }
1092 self.channels_backend = Some(Arc::new(backend));
1093 self
1094 }
1095
1096 #[cfg(feature = "storage")]
1130 #[must_use]
1131 pub fn with_blob_store<B>(mut self, store: B) -> Self
1132 where
1133 B: crate::storage::BlobStore,
1134 {
1135 if self.blob_store.is_some() {
1136 tracing::warn!("blob store replaced; the previously-installed store was overwritten");
1137 }
1138 self.blob_store = Some(std::sync::Arc::new(store));
1139 self
1140 }
1141
1142 #[must_use]
1161 pub fn with_cache_backend<C: crate::cache::Cache>(mut self, cache: C) -> Self {
1162 if self.cache_backend.is_some() {
1163 tracing::warn!(
1164 "cache backend replaced; the previously-installed backend was overwritten"
1165 );
1166 }
1167 self.cache_backend = Some(Arc::new(cache) as Arc<dyn crate::cache::Cache>);
1168 self
1169 }
1170
1171 #[cfg(feature = "mail")]
1182 #[must_use]
1183 pub fn with_mail_delivery_queue(
1184 mut self,
1185 queue: impl crate::mail::MailDeliveryQueue + 'static,
1186 ) -> Self {
1187 let arc: Arc<dyn crate::mail::MailDeliveryQueue> = Arc::new(queue);
1188 self.mail_delivery_queue_factory = Some(Box::new(move |_state| Ok(arc)));
1189 self
1190 }
1191
1192 #[cfg(feature = "mail")]
1202 #[must_use]
1203 pub fn with_mail_delivery_queue_factory<F, Q>(mut self, factory: F) -> Self
1204 where
1205 F: FnOnce(&AppState) -> crate::AutumnResult<Q> + Send + 'static,
1206 Q: crate::mail::MailDeliveryQueue + 'static,
1207 {
1208 self.mail_delivery_queue_factory = Some(Box::new(move |state| {
1209 factory(state).map(|q| Arc::new(q) as Arc<dyn crate::mail::MailDeliveryQueue>)
1210 }));
1211 self
1212 }
1213
1214 #[cfg(feature = "mail")]
1218 #[must_use]
1219 pub fn mail_previews(
1220 mut self,
1221 previews: impl IntoIterator<Item = crate::mail::MailPreview>,
1222 ) -> Self {
1223 self.mail_previews.extend(previews);
1224 self
1225 }
1226
1227 #[must_use]
1232 pub fn with_audit_sink<S>(mut self, sink: S) -> Self
1233 where
1234 S: crate::audit::AuditSink,
1235 {
1236 let logger = self
1237 .audit_logger
1238 .take()
1239 .map_or_else(crate::audit::AuditLogger::new, |logger| (*logger).clone())
1240 .with_sink(Arc::new(sink));
1241 self.audit_logger = Some(Arc::new(logger));
1242 self
1243 }
1244
1245 #[must_use]
1268 pub fn policy<R, P>(mut self, policy: P) -> Self
1269 where
1270 R: Send + Sync + 'static,
1271 P: crate::authorization::Policy<R>,
1272 {
1273 self.policy_registrations.push(Box::new(move |registry| {
1274 registry.register_policy::<R, _>(policy);
1275 }));
1276 self
1277 }
1278
1279 #[must_use]
1287 pub fn scope<R, S>(mut self, scope: S) -> Self
1288 where
1289 R: Send + Sync + 'static,
1290 S: crate::authorization::Scope<R>,
1291 {
1292 self.policy_registrations.push(Box::new(move |registry| {
1293 registry.register_scope::<R, _>(scope);
1294 }));
1295 self
1296 }
1297
1298 #[must_use]
1306 #[track_caller]
1307 pub fn plugin<P>(mut self, plugin: P) -> Self
1308 where
1309 P: crate::plugin::Plugin,
1310 {
1311 let name = plugin.name();
1312 if self.registered_plugins.contains(name.as_ref()) {
1313 tracing::warn!(
1314 plugin = name.as_ref(),
1315 "plugin already registered; skipping duplicate"
1316 );
1317 return self;
1318 }
1319 let name_str = name.into_owned();
1320 self.registered_plugins.insert(name_str.clone());
1321 let outer_plugin = self.current_plugin.replace(name_str);
1324 let mut result = plugin.build(self);
1325 result.current_plugin = outer_plugin;
1326 result
1327 }
1328
1329 #[must_use]
1332 pub fn plugins<P>(self, plugins: P) -> Self
1333 where
1334 P: crate::plugin::Plugins,
1335 {
1336 plugins.apply(self)
1337 }
1338
1339 #[must_use]
1342 pub fn has_plugin(&self, name: &str) -> bool {
1343 self.registered_plugins.contains(name)
1344 }
1345
1346 #[cfg(feature = "db")]
1373 #[must_use]
1374 pub fn migrations(mut self, migrations: migrate::EmbeddedMigrations) -> Self {
1375 self.migrations.push(migrations);
1376 self
1377 }
1378
1379 #[allow(clippy::too_many_lines)]
1399 #[allow(clippy::cognitive_complexity)]
1400 pub async fn run(self) {
1401 if is_static_build_mode() {
1405 self.run_build_mode().await;
1406 return;
1407 }
1408
1409 if is_dump_routes_mode() {
1414 self.run_dump_routes_mode().await;
1415 return;
1416 }
1417
1418 if is_list_one_off_tasks_mode() {
1419 self.run_list_one_off_tasks_mode();
1420 return;
1421 }
1422
1423 if let Some(task_name) = one_off_task_name_from_env() {
1424 self.run_one_off_task_mode(task_name).await;
1425 return;
1426 }
1427
1428 let Self {
1429 routes,
1430 route_sources: _,
1431 current_plugin: _,
1432 tasks,
1433 one_off_tasks: _,
1434 jobs,
1435 static_metas: _,
1436 exception_filters,
1437 scoped_groups,
1438 merge_routers,
1439 nest_routers,
1440 custom_layers,
1441 startup_hooks,
1442 shutdown_hooks,
1443 extensions: _,
1444 registered_plugins: _,
1445 error_page_renderer,
1446 #[cfg(feature = "db")]
1447 migrations,
1448 config_loader_factory,
1449 #[cfg(feature = "db")]
1450 pool_provider_factory,
1451 telemetry_provider,
1452 session_store,
1453 #[cfg(feature = "ws")]
1454 channels_backend,
1455 #[cfg(feature = "storage")]
1456 blob_store,
1457 cache_backend,
1458 #[cfg(feature = "openapi")]
1459 openapi,
1460 audit_logger,
1461 #[cfg(feature = "i18n")]
1462 i18n_bundle,
1463 #[cfg(feature = "i18n")]
1464 i18n_auto_load,
1465 policy_registrations,
1466 #[cfg(feature = "mail")]
1467 mail_delivery_queue_factory,
1468 #[cfg(feature = "mail")]
1469 mail_previews,
1470 declared_routes: _,
1471 } = self;
1472
1473 let all_routes = routes;
1474
1475 let (config, _telemetry_guard) =
1477 load_config_and_telemetry(config_loader_factory, telemetry_provider).await;
1478
1479 #[cfg(feature = "i18n")]
1480 let i18n_bundle =
1481 resolve_i18n_bundle(i18n_bundle, i18n_auto_load, &config, &crate::config::OsEnv);
1482
1483 assert!(
1485 !all_routes.is_empty(),
1486 "No routes registered. Did you forget to call .routes()?"
1487 );
1488
1489 let profile_display = config.profile.as_deref().unwrap_or("none");
1491 tracing::info!(
1492 version = env!("CARGO_PKG_VERSION"),
1493 profile = profile_display,
1494 "Autumn starting"
1495 );
1496
1497 let show_config = std::env::var("AUTUMN_SHOW_CONFIG").as_deref() == Ok("1");
1499 if show_config {
1500 log_startup_transparency(&all_routes, &tasks, &scoped_groups, &config);
1501 }
1502
1503 fail_fast_on_invalid_session_config(&config, session_store.is_some());
1507
1508 fail_fast_on_invalid_signing_secret(&config);
1511
1512 fail_fast_on_invalid_webhook_config(&config);
1516
1517 #[cfg(feature = "storage")]
1526 let storage_bootstrap = blob_store.map_or_else(
1527 || preflight_storage(&config),
1528 |store| {
1529 Some(StorageBootstrap {
1530 store,
1531 serving: None,
1532 })
1533 },
1534 );
1535
1536 #[cfg(feature = "db")]
1538 let database = setup_database(&config, migrations, pool_provider_factory)
1539 .await
1540 .unwrap_or_else(|e| {
1541 tracing::error!("{e}");
1542 std::process::exit(1);
1543 });
1544 #[cfg(feature = "db")]
1545 let pool = database.topology;
1546 #[cfg(feature = "db")]
1547 let replica_readiness = database.replica_readiness;
1548 #[cfg(feature = "db")]
1549 let replica_migration_check = database.replica_migration_check;
1550
1551 #[cfg(feature = "db")]
1552 if pool.is_some() {
1553 tracing::info!(
1554 primary_max_connections = config.database.effective_primary_pool_size(),
1555 replica_configured = config.database.replica_url.is_some(),
1556 replica_max_connections = config.database.effective_replica_pool_size(),
1557 "Database topology configured"
1558 );
1559 } else {
1560 tracing::info!("Database not configured");
1561 }
1562
1563 validate_repository_api_policies(&all_routes, &scoped_groups, &config);
1571
1572 let mut state = build_state(
1574 &config,
1575 #[cfg(feature = "db")]
1576 pool.as_ref(),
1577 #[cfg(feature = "ws")]
1578 channels_backend,
1579 );
1580 #[cfg(feature = "db")]
1581 configure_replica_migration_check(&state, replica_migration_check);
1582 #[cfg(feature = "db")]
1583 apply_replica_migration_readiness(&state, replica_readiness);
1584 if let Some(cache) = cache_backend {
1585 crate::cache::set_global_cache(cache.clone());
1586 state.shared_cache = Some(cache);
1587 } else {
1588 crate::cache::clear_global_cache();
1589 }
1590 for register in policy_registrations {
1595 register(state.policy_registry());
1596 }
1597 validate_repository_policies_registered(&all_routes, &scoped_groups, &state, &config);
1603 #[cfg(feature = "mail")]
1604 crate::mail::install_mailer_with_factory(
1605 &state,
1606 &config.mail,
1607 mail_delivery_queue_factory,
1608 true,
1609 )
1610 .unwrap_or_else(|error| {
1611 tracing::error!(error = %error, "Failed to configure mailer");
1612 std::process::exit(1);
1613 });
1614 #[cfg(feature = "mail")]
1615 state.insert_extension(crate::mail::MailPreviewRegistry::new(mail_previews));
1616 if let Some(logger) = audit_logger {
1617 state.insert_extension::<crate::audit::AuditLogger>((*logger).clone());
1618 }
1619 #[cfg(feature = "i18n")]
1620 let custom_layers = install_i18n_bundle_layer(custom_layers, &state, i18n_bundle);
1621
1622 #[cfg(feature = "storage")]
1626 let storage_router = storage_bootstrap.and_then(|b| b.install(&state));
1627 install_webhook_registry(&state, &config);
1628
1629 let env = crate::config::OsEnv;
1630 let dist_dir = project_dir("dist", &env);
1631 let dist_ref = if dist_dir.exists() {
1632 Some(dist_dir.as_path())
1633 } else {
1634 None
1635 };
1636 #[cfg_attr(not(feature = "storage"), allow(unused_mut))]
1637 let mut merge_routers = merge_routers;
1638 #[cfg(feature = "storage")]
1639 if let Some(router) = storage_router {
1640 merge_routers.push(router);
1641 }
1642 let router = crate::router::try_build_router_with_static_inner(
1643 all_routes,
1644 &config,
1645 state.clone(),
1646 dist_ref,
1647 crate::router::RouterContext {
1648 exception_filters,
1649 scoped_groups,
1650 merge_routers,
1651 nest_routers,
1652 custom_layers,
1653 error_page_renderer,
1654 session_store,
1655 #[cfg(feature = "openapi")]
1658 openapi: if config.openapi_runtime.enabled {
1659 openapi
1660 } else {
1661 None
1662 },
1663 },
1664 )
1665 .unwrap_or_else(|error| {
1666 tracing::error!(error = %error, "Failed to build router");
1667 std::process::exit(1);
1668 });
1669
1670 let addr = format!("{}:{}", config.server.host, config.server.port);
1674 let listener = tokio::net::TcpListener::bind(&addr)
1675 .await
1676 .unwrap_or_else(|e| {
1677 tracing::error!(addr = %addr, "Failed to bind: {e}");
1678 std::process::exit(1);
1679 });
1680
1681 let shutdown_timeout = config.server.shutdown_timeout_secs;
1682 let server_shutdown = tokio_util::sync::CancellationToken::new();
1683
1684 if let Err(error) = initialize_job_runtime(jobs, &state, &server_shutdown, &config.jobs) {
1685 tracing::error!(error = %error, "job runtime initialization failed");
1686 std::process::exit(1);
1687 }
1688
1689 tracing::info!(addr = %addr, "Listening");
1690
1691 let server_shutdown_wait = server_shutdown.clone();
1692 let server_task = tokio::spawn(async move {
1693 axum::serve(
1694 listener,
1695 router.into_make_service_with_connect_info::<std::net::SocketAddr>(),
1696 )
1697 .with_graceful_shutdown(async move {
1698 server_shutdown_wait.cancelled().await;
1699 })
1700 .await
1701 });
1702
1703 let shutdown_state = state.clone();
1704 let shutdown_signal_token = server_shutdown.clone();
1705 #[cfg(feature = "ws")]
1706 let websocket_shutdown = state.shutdown.clone();
1707
1708 let shutdown_task = tokio::spawn(async move {
1709 shutdown_signal().await;
1710 shutdown_state.begin_shutdown();
1711
1712 #[cfg(feature = "ws")]
1713 websocket_shutdown.cancel();
1714
1715 if shutdown_timeout > 5 {
1716 tokio::spawn(async move {
1717 tokio::time::sleep(std::time::Duration::from_secs(
1718 shutdown_timeout.saturating_sub(5),
1719 ))
1720 .await;
1721 tracing::warn!(
1722 timeout_secs = shutdown_timeout,
1723 "Shutdown draining near timeout, force-kill may be imminent"
1724 );
1725 });
1726 }
1727
1728 run_shutdown_hooks(&shutdown_hooks).await;
1729 shutdown_signal_token.cancel();
1730 });
1731
1732 if let Err(error) = run_startup_hooks(&startup_hooks, state.clone()).await {
1733 tracing::error!(error = %error, "startup hook failed");
1734 server_shutdown.cancel();
1735 server_task.abort();
1736 std::process::exit(1);
1737 }
1738
1739 if !state.probes().is_shutting_down() {
1740 if !tasks.is_empty()
1741 && let Err(error) = start_task_scheduler_with_config(
1742 tasks,
1743 &state,
1744 &server_shutdown,
1745 &config.scheduler,
1746 )
1747 {
1748 tracing::error!(error = %error, "scheduled task runtime initialization failed");
1749 server_shutdown.cancel();
1750 server_task.abort();
1751 std::process::exit(1);
1752 }
1753 state.probes().mark_startup_complete();
1754 }
1755
1756 let server_result = server_task.await.unwrap_or_else(|e| {
1757 tracing::error!("Server task join error: {e}");
1758 std::process::exit(1);
1759 });
1760 shutdown_task.abort();
1761 server_result.unwrap_or_else(|e| {
1762 tracing::error!("Server error: {e}");
1763 std::process::exit(1);
1764 });
1765
1766 tracing::info!("Server shut down cleanly");
1767 }
1768
1769 #[allow(clippy::too_many_lines)]
1775 async fn run_build_mode(self) {
1776 let Self {
1777 routes,
1778 route_sources: _,
1779 current_plugin: _,
1780 tasks: _,
1781 one_off_tasks: _,
1782 jobs: _,
1783 static_metas,
1784 exception_filters: _,
1785 #[cfg(feature = "openapi")]
1786 scoped_groups,
1787 #[cfg(not(feature = "openapi"))]
1788 scoped_groups: _,
1789 merge_routers: _,
1790 nest_routers: _,
1791 custom_layers,
1792 startup_hooks: _,
1793 shutdown_hooks: _,
1794 extensions: _,
1795 registered_plugins: _,
1796 error_page_renderer: _,
1797 #[cfg(feature = "db")]
1798 migrations: _,
1799 config_loader_factory,
1800 #[cfg(feature = "db")]
1801 pool_provider_factory,
1802 telemetry_provider,
1803 session_store,
1804 #[cfg(feature = "ws")]
1805 channels_backend,
1806 #[cfg(feature = "storage")]
1807 blob_store,
1808 cache_backend,
1809 #[cfg(feature = "openapi")]
1810 openapi,
1811 audit_logger: _,
1812 #[cfg(feature = "i18n")]
1813 i18n_bundle,
1814 #[cfg(feature = "i18n")]
1815 i18n_auto_load,
1816 policy_registrations,
1817 #[cfg(feature = "mail")]
1818 mail_delivery_queue_factory,
1819 #[cfg(feature = "mail")]
1820 mail_previews,
1821 declared_routes: _,
1822 } = self;
1823
1824 let all_routes = routes;
1825
1826 let (config, _telemetry_guard) =
1828 load_config_and_telemetry(config_loader_factory, telemetry_provider).await;
1829
1830 #[cfg(feature = "i18n")]
1831 let i18n_bundle =
1832 resolve_i18n_bundle(i18n_bundle, i18n_auto_load, &config, &crate::config::OsEnv);
1833
1834 #[cfg(feature = "openapi")]
1838 let api_docs_snapshot: Vec<crate::openapi::ApiDoc> = {
1839 let mut docs: Vec<crate::openapi::ApiDoc> =
1840 all_routes.iter().map(|r| r.api_doc.clone()).collect();
1841 for group in &scoped_groups {
1842 let prefix_params = crate::router::extract_path_params(&group.prefix);
1846 for route in &group.routes {
1847 let mut doc = route.api_doc.clone();
1848 let full = crate::router::join_nested_path(&group.prefix, route.api_doc.path);
1849 doc.path = Box::leak(full.into_boxed_str());
1850 if !prefix_params.is_empty() {
1851 let mut merged: Vec<&'static str> = prefix_params
1852 .iter()
1853 .map(|p| &*Box::leak(p.clone().into_boxed_str()))
1854 .collect();
1855 merged.extend_from_slice(doc.path_params);
1856 doc.path_params = Box::leak(merged.into_boxed_slice());
1857 }
1858 docs.push(doc);
1859 }
1860 }
1861 docs
1862 };
1863
1864 if static_metas.is_empty() {
1865 eprintln!("No static routes registered. Nothing to build.");
1866 eprintln!("Hint: use .static_routes(static_routes![...]) on your AppBuilder.");
1867 std::process::exit(1);
1868 }
1869
1870 fail_fast_on_invalid_session_config(&config, session_store.is_some());
1874 fail_fast_on_invalid_signing_secret(&config);
1875
1876 #[cfg(feature = "storage")]
1883 let storage_bootstrap = blob_store.map_or_else(
1884 || preflight_storage(&config),
1885 |store| {
1886 Some(StorageBootstrap {
1887 store,
1888 serving: None,
1889 })
1890 },
1891 );
1892
1893 #[cfg(feature = "db")]
1895 let database = setup_database(&config, vec![], pool_provider_factory)
1896 .await
1897 .unwrap_or_else(|e| {
1898 eprintln!("{e}");
1899 std::process::exit(1);
1900 });
1901 #[cfg(feature = "db")]
1902 let pool = database.topology;
1903 #[cfg(feature = "db")]
1904 let replica_readiness = database.replica_readiness;
1905 #[cfg(feature = "db")]
1906 let replica_migration_check = database.replica_migration_check;
1907
1908 let mut state = build_state(
1909 &config,
1910 #[cfg(feature = "db")]
1911 pool.as_ref(),
1912 #[cfg(feature = "ws")]
1913 channels_backend,
1914 );
1915 #[cfg(feature = "db")]
1916 configure_replica_migration_check(&state, replica_migration_check);
1917 #[cfg(feature = "db")]
1918 apply_replica_migration_readiness(&state, replica_readiness);
1919 if let Some(cache) = cache_backend {
1920 crate::cache::set_global_cache(cache.clone());
1921 state.shared_cache = Some(cache);
1922 } else {
1923 crate::cache::clear_global_cache();
1924 }
1925 #[cfg(feature = "mail")]
1932 crate::mail::install_mailer_with_factory(
1933 &state,
1934 &config.mail,
1935 mail_delivery_queue_factory,
1936 false,
1937 )
1938 .unwrap_or_else(|error| {
1939 eprintln!("Failed to configure mailer: {error}");
1940 std::process::exit(1);
1941 });
1942 #[cfg(feature = "mail")]
1943 state.insert_extension(crate::mail::MailPreviewRegistry::new(mail_previews));
1944 state.probes = crate::probe::ProbeState::default();
1946
1947 for register in policy_registrations {
1957 register(state.policy_registry());
1958 }
1959
1960 #[cfg(feature = "i18n")]
1961 let custom_layers = install_i18n_bundle_layer(custom_layers, &state, i18n_bundle);
1962
1963 #[cfg(feature = "storage")]
1967 let storage_router = storage_bootstrap.and_then(|b| b.install(&state));
1968
1969 #[cfg_attr(not(feature = "storage"), allow(unused_mut))]
1977 let mut merge_routers: Vec<axum::Router<AppState>> = Vec::new();
1978 #[cfg(feature = "storage")]
1979 if let Some(router) = storage_router {
1980 merge_routers.push(router);
1981 }
1982 let router = crate::router::try_build_router_inner(
1983 all_routes,
1984 &config,
1985 state,
1986 crate::router::RouterContext {
1987 exception_filters: Vec::new(),
1988 scoped_groups: Vec::new(),
1989 merge_routers,
1990 nest_routers: Vec::new(),
1991 custom_layers,
1992 error_page_renderer: None,
1993 session_store,
1994 #[cfg(feature = "openapi")]
1995 openapi: None,
1996 },
1997 )
1998 .unwrap_or_else(|error| {
1999 eprintln!("Failed to build router: {error}");
2000 std::process::exit(1);
2001 });
2002
2003 let env = crate::config::OsEnv;
2004 let dist_dir = project_dir("dist", &env);
2005
2006 eprintln!("Building {} static route(s)...", static_metas.len());
2007
2008 match crate::static_gen::render_static_routes(router, &static_metas, &dist_dir).await {
2009 Ok(()) => {
2010 eprintln!(
2011 "\n \u{2713} Static build complete \u{2192} {}",
2012 dist_dir.display()
2013 );
2014 }
2015 Err(e) => {
2016 eprintln!("\n \u{2717} Static build failed: {e}");
2017 std::process::exit(1);
2018 }
2019 }
2020
2021 #[cfg(feature = "openapi")]
2024 if let Some(openapi_config) = openapi {
2025 let openapi_config =
2026 openapi_config.session_cookie_name(config.session.cookie_name.clone());
2027 let docs: Vec<&crate::openapi::ApiDoc> = api_docs_snapshot.iter().collect();
2028 let spec = crate::openapi::generate_spec(&openapi_config, &docs);
2029 match crate::openapi::write_openapi_spec_to_dist(&spec, &dist_dir) {
2030 Ok(()) => {
2031 eprintln!(
2032 " \u{2713} OpenAPI spec written \u{2192} {}/openapi.json",
2033 dist_dir.display()
2034 );
2035 }
2036 Err(e) => {
2037 eprintln!(" \u{26A0} Failed to write OpenAPI spec: {e}");
2038 }
2039 }
2040 }
2041 }
2042
2043 async fn run_dump_routes_mode(self) {
2049 let Self {
2050 routes,
2051 route_sources,
2052 scoped_groups,
2053 merge_routers,
2054 nest_routers,
2055 declared_routes,
2056 config_loader_factory,
2057 telemetry_provider,
2058 #[cfg(feature = "openapi")]
2059 openapi,
2060 ..
2061 } = self;
2062
2063 let hidden = merge_routers.len() + nest_routers.len();
2067 if hidden > 0 {
2068 eprintln!(
2069 "[autumn routes] warning: {hidden} raw router(s) added via \
2070 .merge()/.nest() are not enumerable and are omitted from this listing"
2071 );
2072 }
2073
2074 let (config, _telemetry_guard) =
2075 load_config_and_telemetry(config_loader_factory, telemetry_provider).await;
2076
2077 let mut infos =
2078 crate::route_listing::collect_route_infos(&routes, &route_sources, &scoped_groups);
2079 infos.extend(declared_routes);
2080 crate::route_listing::append_framework_routes(&mut infos, &config);
2081 #[cfg(feature = "openapi")]
2082 if let Some(ref oa) = openapi {
2083 crate::route_listing::append_openapi_routes(&mut infos, oa);
2084 }
2085 crate::route_listing::append_dev_reload_routes(&mut infos);
2086 crate::route_listing::sort_route_infos(&mut infos);
2087
2088 let json = serde_json::to_string_pretty(&infos).unwrap_or_else(|e| {
2089 eprintln!("Failed to serialize route listing: {e}");
2090 std::process::exit(1);
2091 });
2092 println!("{json}");
2093 std::process::exit(0);
2094 }
2095
2096 fn run_list_one_off_tasks_mode(self) {
2100 let Self { one_off_tasks, .. } = self;
2101
2102 if let Err(error) = crate::task::validate_unique_one_off_task_names(&one_off_tasks) {
2103 eprintln!("Invalid task registration: {error}");
2104 std::process::exit(1);
2105 }
2106
2107 let listing = crate::task::list_one_off_tasks(&one_off_tasks);
2108 let json = serde_json::to_string_pretty(&listing).unwrap_or_else(|error| {
2109 eprintln!("Failed to serialize task listing: {error}");
2110 std::process::exit(1);
2111 });
2112 println!("{json}");
2113 std::process::exit(0);
2114 }
2115
2116 #[allow(clippy::too_many_lines)]
2120 #[allow(clippy::cognitive_complexity)]
2121 async fn run_one_off_task_mode(self, requested_name: String) {
2122 let Self {
2123 one_off_tasks,
2124 jobs,
2125 #[cfg(feature = "i18n")]
2126 custom_layers,
2127 #[cfg(not(feature = "i18n"))]
2128 custom_layers: _,
2129 startup_hooks,
2130 shutdown_hooks,
2131 config_loader_factory,
2132 #[cfg(feature = "db")]
2133 migrations,
2134 #[cfg(feature = "db")]
2135 pool_provider_factory,
2136 telemetry_provider,
2137 session_store,
2138 #[cfg(feature = "ws")]
2139 channels_backend,
2140 #[cfg(feature = "storage")]
2141 blob_store,
2142 audit_logger,
2143 #[cfg(feature = "i18n")]
2144 i18n_bundle,
2145 #[cfg(feature = "i18n")]
2146 i18n_auto_load,
2147 policy_registrations,
2148 cache_backend,
2149 #[cfg(feature = "mail")]
2150 mail_delivery_queue_factory,
2151 ..
2152 } = self;
2153
2154 if let Err(error) = crate::task::validate_unique_one_off_task_names(&one_off_tasks) {
2155 eprintln!("Invalid task registration: {error}");
2156 std::process::exit(1);
2157 }
2158
2159 let Some((task_name, task_handler)) = one_off_tasks
2160 .iter()
2161 .find(|task| task.name == requested_name)
2162 .map(|task| (task.name.clone(), task.handler))
2163 else {
2164 eprintln!("No one-off task named '{requested_name}' is registered.");
2165 print_available_one_off_tasks(&one_off_tasks);
2166 std::process::exit(1);
2167 };
2168
2169 let args = one_off_task_args_from_env().unwrap_or_else(|error| {
2170 eprintln!("Invalid task args: {error}");
2171 std::process::exit(1);
2172 });
2173
2174 let (config, _telemetry_guard) =
2175 load_config_and_telemetry(config_loader_factory, telemetry_provider).await;
2176
2177 #[cfg(feature = "i18n")]
2178 let i18n_bundle =
2179 resolve_i18n_bundle(i18n_bundle, i18n_auto_load, &config, &crate::config::OsEnv);
2180
2181 fail_fast_on_invalid_session_config(&config, session_store.is_some());
2182 fail_fast_on_invalid_signing_secret(&config);
2183
2184 #[cfg(feature = "storage")]
2185 let storage_bootstrap = blob_store.map_or_else(
2186 || preflight_storage(&config),
2187 |store| {
2188 Some(StorageBootstrap {
2189 store,
2190 serving: None,
2191 })
2192 },
2193 );
2194
2195 #[cfg(feature = "db")]
2196 let database = setup_database(&config, migrations, pool_provider_factory)
2197 .await
2198 .unwrap_or_else(|error| {
2199 eprintln!("{error}");
2200 std::process::exit(1);
2201 });
2202 #[cfg(feature = "db")]
2203 let pool = database.topology;
2204 #[cfg(feature = "db")]
2205 let replica_readiness = database.replica_readiness;
2206 #[cfg(feature = "db")]
2207 let replica_migration_check = database.replica_migration_check;
2208
2209 let mut state = build_state(
2210 &config,
2211 #[cfg(feature = "db")]
2212 pool.as_ref(),
2213 #[cfg(feature = "ws")]
2214 channels_backend,
2215 );
2216 #[cfg(feature = "db")]
2217 configure_replica_migration_check(&state, replica_migration_check);
2218 #[cfg(feature = "db")]
2219 apply_replica_migration_readiness(&state, replica_readiness);
2220 if let Some(cache) = cache_backend {
2221 crate::cache::set_global_cache(cache.clone());
2222 state.shared_cache = Some(cache);
2223 } else {
2224 crate::cache::clear_global_cache();
2225 }
2226
2227 for register in policy_registrations {
2228 register(state.policy_registry());
2229 }
2230
2231 #[cfg(feature = "mail")]
2232 crate::mail::install_mailer_with_factory(
2233 &state,
2234 &config.mail,
2235 mail_delivery_queue_factory,
2236 true,
2237 )
2238 .unwrap_or_else(|error| {
2239 eprintln!("Failed to configure mailer: {error}");
2240 std::process::exit(1);
2241 });
2242
2243 if let Some(logger) = audit_logger {
2244 state.insert_extension::<crate::audit::AuditLogger>((*logger).clone());
2245 }
2246
2247 #[cfg(feature = "i18n")]
2248 let _custom_layers = install_i18n_bundle_layer(custom_layers, &state, i18n_bundle);
2249
2250 #[cfg(feature = "storage")]
2251 let _storage_router = storage_bootstrap.and_then(|bootstrap| bootstrap.install(&state));
2252
2253 let task_shutdown = tokio_util::sync::CancellationToken::new();
2254 if let Err(error) = initialize_job_runtime(jobs, &state, &task_shutdown, &config.jobs) {
2255 eprintln!("job runtime initialization failed: {error}");
2256 std::process::exit(1);
2257 }
2258
2259 if let Err(error) = run_startup_hooks(&startup_hooks, state.clone()).await {
2260 eprintln!("startup hook failed: {error}");
2261 task_shutdown.cancel();
2262 std::process::exit(1);
2263 }
2264 state.probes().mark_startup_complete();
2265
2266 tracing::info!(task = %task_name, "Running one-off task");
2267 let span = tracing::info_span!("one_off_task", task = %task_name);
2268 let result = (task_handler)(state.clone(), args).instrument(span).await;
2269
2270 task_shutdown.cancel();
2271 run_shutdown_hooks(&shutdown_hooks).await;
2272
2273 match result {
2274 Ok(()) => {
2275 tracing::info!(task = %task_name, "One-off task completed");
2276 }
2277 Err(error) => {
2278 tracing::error!(task = %task_name, error = %error, "One-off task failed");
2279 eprintln!("Task '{task_name}' failed: {error}");
2280 for cause in error.source_chain() {
2281 eprintln!("Caused by: {cause}");
2282 }
2283 std::process::exit(1);
2284 }
2285 }
2286 }
2287}
2288
2289pub(crate) fn is_static_build_mode() -> bool {
2290 std::env::var("AUTUMN_BUILD_STATIC").as_deref() == Ok("1")
2291}
2292
2293pub(crate) fn is_dump_routes_mode() -> bool {
2294 std::env::var("AUTUMN_DUMP_ROUTES").as_deref() == Ok("1")
2295}
2296
2297pub(crate) fn is_list_one_off_tasks_mode() -> bool {
2298 std::env::var("AUTUMN_LIST_TASKS").as_deref() == Ok("1")
2299}
2300
2301fn one_off_task_name_from_env() -> Option<String> {
2302 std::env::var("AUTUMN_RUN_TASK")
2303 .ok()
2304 .map(|value| value.trim().to_owned())
2305 .filter(|value| !value.is_empty())
2306}
2307
2308fn one_off_task_args_from_env() -> Result<Vec<String>, String> {
2309 match std::env::var("AUTUMN_TASK_ARGS_JSON") {
2310 Ok(raw) if !raw.trim().is_empty() => serde_json::from_str(&raw)
2311 .map_err(|error| format!("AUTUMN_TASK_ARGS_JSON must be a JSON string array: {error}")),
2312 _ => Ok(Vec::new()),
2313 }
2314}
2315
2316fn print_available_one_off_tasks(tasks: &[crate::task::OneOffTaskInfo]) {
2317 let listing = crate::task::list_one_off_tasks(tasks);
2318 if listing.is_empty() {
2319 eprintln!("No one-off tasks are registered. Add .one_off_tasks(one_off_tasks![...]).");
2320 return;
2321 }
2322
2323 eprintln!("Available tasks:");
2324 for task in listing {
2325 if task.description.is_empty() {
2326 eprintln!(" {}", task.name);
2327 } else {
2328 eprintln!(" {:<24} {}", task.name, task.description);
2329 }
2330 }
2331}
2332
2333#[allow(clippy::cast_possible_truncation)]
2340#[allow(clippy::cognitive_complexity)]
2341#[allow(dead_code)]
2342fn start_task_scheduler(
2343 tasks: Vec<crate::task::TaskInfo>,
2344 state: &AppState,
2345 shutdown: &tokio_util::sync::CancellationToken,
2346) {
2347 if let Err(error) = start_task_scheduler_with_config(
2348 tasks,
2349 state,
2350 shutdown,
2351 &crate::config::SchedulerConfig::default(),
2352 ) {
2353 tracing::error!(error = %error, "scheduled task runtime initialization failed");
2354 }
2355}
2356
2357#[allow(clippy::cast_possible_truncation)]
2358#[allow(clippy::cognitive_complexity)]
2359fn start_task_scheduler_with_config(
2360 tasks: Vec<crate::task::TaskInfo>,
2361 state: &AppState,
2362 shutdown: &tokio_util::sync::CancellationToken,
2363 scheduler_config: &crate::config::SchedulerConfig,
2364) -> crate::AutumnResult<()> {
2365 tracing::info!(count = tasks.len(), "Starting scheduled tasks");
2366 let coordinator = crate::scheduler::coordinator_from_config(scheduler_config, state)?;
2367 let lease_ttl = std::time::Duration::from_secs(scheduler_config.lease_ttl_secs);
2368 for task_info in &tasks {
2369 let schedule_desc = task_info.schedule.to_string();
2370 tracing::info!(
2371 name = %task_info.name,
2372 schedule = %schedule_desc,
2373 coordination = %task_info.coordination,
2374 scheduler_backend = coordinator.backend(),
2375 replica_id = coordinator.replica_id(),
2376 lease_ttl_secs = scheduler_config.lease_ttl_secs,
2377 "Registered task"
2378 );
2379 }
2380
2381 let mut cron_tasks: Vec<CronTaskSpec> = Vec::new();
2382
2383 for task_info in tasks {
2384 let state = state.clone();
2385 let name = task_info.name.clone();
2386 let handler = task_info.handler;
2387 let coordination = task_info.coordination;
2388 let schedule_desc = task_info.schedule.to_string();
2389 state.task_registry.register_scheduled(
2390 &name,
2391 &schedule_desc,
2392 coordination,
2393 coordinator.backend(),
2394 coordinator.replica_id(),
2395 );
2396
2397 match task_info.schedule {
2398 crate::task::Schedule::FixedDelay(delay) => {
2399 let coordinator = Arc::clone(&coordinator);
2400 let shutdown = shutdown.child_token();
2401 tokio::spawn(async move {
2402 loop {
2403 state
2404 .task_registry
2405 .record_next_run_at(&name, &format_next_task_run_after(delay));
2406 tokio::select! {
2407 () = shutdown.cancelled() => break,
2408 () = tokio::time::sleep(delay) => {
2409 execute_fixed_delay_task(
2410 name.clone(),
2411 state.clone(),
2412 handler,
2413 delay,
2414 coordination,
2415 Arc::clone(&coordinator),
2416 lease_ttl,
2417 )
2418 .await;
2419 }
2420 }
2421 }
2422 });
2423 }
2424 crate::task::Schedule::Cron {
2425 expression,
2426 timezone,
2427 } => {
2428 cron_tasks.push(CronTaskSpec {
2429 name,
2430 expression,
2431 timezone,
2432 coordination,
2433 handler,
2434 });
2435 }
2436 }
2437 }
2438
2439 run_cron_scheduler(cron_tasks, state, shutdown, &coordinator, lease_ttl);
2440
2441 Ok(())
2442}
2443
2444#[allow(unused_variables, clippy::needless_pass_by_value)]
2445fn send_ws_sys_task_msg(
2446 state: &AppState,
2447 event: &str,
2448 name: &str,
2449 extra: Vec<(&str, serde_json::Value)>,
2450) {
2451 #[cfg(feature = "ws")]
2452 {
2453 let mut msg = serde_json::json!({
2457 "event": event,
2458 "task": name,
2459 "timestamp": chrono::Utc::now().to_rfc3339(),
2460 });
2461 if let Some(map) = msg.as_object_mut() {
2462 for (k, v) in extra {
2463 map.insert(k.to_string(), v);
2464 }
2465 }
2466 let _ = state.channels().sender("sys:tasks").send(msg.to_string());
2467 }
2468}
2469
2470async fn execute_task_result(
2471 state: &AppState,
2472 handler: crate::task::TaskHandler,
2473 start: std::time::Instant,
2474 name: &str,
2475 schedule: &'static str,
2476) -> Result<u64, (u64, String)> {
2477 let task_span = tracing::info_span!(
2481 parent: None,
2482 "scheduled_task",
2483 otel.kind = "internal",
2484 task = %name,
2485 schedule = schedule,
2486 );
2487 let future = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
2488 (handler)(state.clone()).instrument(task_span)
2489 })) {
2490 Ok(future) => future,
2491 Err(panic) => {
2492 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2493 return Err((duration_ms, format_scheduled_task_panic(panic.as_ref())));
2494 }
2495 };
2496 let result = std::panic::AssertUnwindSafe(future).catch_unwind().await;
2497 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2498
2499 match result {
2500 Ok(Ok(())) => Ok(duration_ms),
2501 Ok(Err(e)) => Err((duration_ms, e.to_string())),
2502 Err(panic) => Err((duration_ms, format_scheduled_task_panic(panic.as_ref()))),
2503 }
2504}
2505
2506fn format_scheduled_task_panic(panic: &(dyn Any + Send)) -> String {
2507 let detail = panic
2508 .downcast_ref::<String>()
2509 .map(String::as_str)
2510 .or_else(|| panic.downcast_ref::<&'static str>().copied())
2511 .unwrap_or("non-string panic payload");
2512 format!("scheduled task handler panicked: {detail}")
2513}
2514
2515async fn execute_task_result_with_optional_lease_ttl(
2516 state: &AppState,
2517 handler: crate::task::TaskHandler,
2518 start: std::time::Instant,
2519 name: &str,
2520 schedule: &'static str,
2521 lease_ttl: Option<std::time::Duration>,
2522) -> Result<u64, (u64, String)> {
2523 let Some(lease_ttl) = lease_ttl else {
2524 return execute_task_result(state, handler, start, name, schedule).await;
2525 };
2526
2527 tokio::time::timeout(
2528 lease_ttl,
2529 execute_task_result(state, handler, start, name, schedule),
2530 )
2531 .await
2532 .map_or_else(
2533 |_| {
2534 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2535 Err((
2536 duration_ms,
2537 format!(
2538 "scheduled task exceeded lease TTL of {}s",
2539 lease_ttl.as_secs()
2540 ),
2541 ))
2542 },
2543 std::convert::identity,
2544 )
2545}
2546
2547#[allow(clippy::cognitive_complexity)]
2549async fn execute_fixed_delay_task(
2550 name: String,
2551 state: AppState,
2552 handler: crate::task::TaskHandler,
2553 delay: std::time::Duration,
2554 coordination: crate::task::TaskCoordination,
2555 coordinator: Arc<dyn crate::scheduler::SchedulerCoordinator>,
2556 lease_ttl: std::time::Duration,
2557) {
2558 let tick_key =
2559 crate::scheduler::fixed_delay_tick_key(&name, delay, crate::scheduler::now_unix_duration());
2560 let lease = match coordinator
2561 .try_acquire(&name, &tick_key, coordination)
2562 .await
2563 {
2564 Ok(Some(lease)) => lease,
2565 Ok(None) => {
2566 tracing::debug!(task = %name, tick = %tick_key, "Scheduled task tick already claimed");
2567 return;
2568 }
2569 Err(error) => {
2570 tracing::warn!(task = %name, tick = %tick_key, error = %error, "Failed to acquire scheduled task lease");
2571 return;
2572 }
2573 };
2574 state
2575 .task_registry
2576 .record_leader(&name, lease.leader_id(), &tick_key);
2577 tracing::debug!(task = %name, "Running scheduled task");
2578 state.task_registry.record_start(&name);
2579
2580 send_ws_sys_task_msg(&state, "started", &name, vec![]);
2581
2582 let start = std::time::Instant::now();
2583 let lease_ttl = lease_ttl_for_run(&lease, coordination, lease_ttl);
2584 match execute_task_result_with_optional_lease_ttl(
2585 &state,
2586 handler,
2587 start,
2588 &name,
2589 "fixed_delay",
2590 lease_ttl,
2591 )
2592 .await
2593 {
2594 Ok(duration_ms) => {
2595 state.task_registry.record_success(&name, duration_ms);
2596 tracing::debug!(task = %name, "Task completed");
2597 send_ws_sys_task_msg(
2598 &state,
2599 "success",
2600 &name,
2601 vec![("duration_ms", serde_json::json!(duration_ms))],
2602 );
2603 }
2604 Err((duration_ms, error_str)) => {
2605 state
2606 .task_registry
2607 .record_failure(&name, duration_ms, &error_str);
2608 tracing::warn!(task = %name, error = %error_str, "Task failed");
2609 send_ws_sys_task_msg(
2610 &state,
2611 "failure",
2612 &name,
2613 vec![
2614 ("duration_ms", serde_json::json!(duration_ms)),
2615 ("error", serde_json::json!(error_str)),
2616 ],
2617 );
2618 }
2619 }
2620
2621 if let Err(error) = lease.release().await {
2622 tracing::warn!(task = %name, tick = %tick_key, error = %error, "Failed to release scheduled task lease");
2623 }
2624}
2625
2626#[allow(clippy::cognitive_complexity)]
2628async fn execute_cron_task(
2629 name: String,
2630 state: AppState,
2631 handler: crate::task::TaskHandler,
2632 coordination: crate::task::TaskCoordination,
2633 coordinator: Arc<dyn crate::scheduler::SchedulerCoordinator>,
2634 lease_ttl: std::time::Duration,
2635 scheduled_unix_secs: u64,
2636) {
2637 let tick_key = crate::scheduler::cron_tick_key(&name, scheduled_unix_secs);
2638 let lease = match coordinator
2639 .try_acquire(&name, &tick_key, coordination)
2640 .await
2641 {
2642 Ok(Some(lease)) => lease,
2643 Ok(None) => {
2644 tracing::debug!(task = %name, tick = %tick_key, "Cron task tick already claimed");
2645 return;
2646 }
2647 Err(error) => {
2648 tracing::warn!(task = %name, tick = %tick_key, error = %error, "Failed to acquire cron task lease");
2649 return;
2650 }
2651 };
2652 state
2653 .task_registry
2654 .record_leader(&name, lease.leader_id(), &tick_key);
2655 tracing::debug!(task = %name, "Running cron task");
2656 state.task_registry.record_start(&name);
2657
2658 send_ws_sys_task_msg(&state, "started", &name, vec![]);
2659
2660 let start = std::time::Instant::now();
2661 let lease_ttl = lease_ttl_for_run(&lease, coordination, lease_ttl);
2662 match execute_task_result_with_optional_lease_ttl(
2663 &state, handler, start, &name, "cron", lease_ttl,
2664 )
2665 .await
2666 {
2667 Ok(duration_ms) => {
2668 state.task_registry.record_success(&name, duration_ms);
2669 tracing::debug!(task = %name, "Cron task completed");
2670 send_ws_sys_task_msg(
2671 &state,
2672 "success",
2673 &name,
2674 vec![("duration_ms", serde_json::json!(duration_ms))],
2675 );
2676 }
2677 Err((duration_ms, error_str)) => {
2678 state
2679 .task_registry
2680 .record_failure(&name, duration_ms, &error_str);
2681 tracing::warn!(task = %name, error = %error_str, "Cron task failed");
2682 send_ws_sys_task_msg(
2683 &state,
2684 "failure",
2685 &name,
2686 vec![
2687 ("duration_ms", serde_json::json!(duration_ms)),
2688 ("error", serde_json::json!(error_str)),
2689 ],
2690 );
2691 }
2692 }
2693
2694 if let Err(error) = lease.release().await {
2695 tracing::warn!(task = %name, tick = %tick_key, error = %error, "Failed to release cron task lease");
2696 }
2697}
2698
2699struct CronTaskSpec {
2700 name: String,
2701 expression: String,
2702 timezone: Option<String>,
2703 coordination: crate::task::TaskCoordination,
2704 handler: crate::task::TaskHandler,
2705}
2706
2707fn lease_ttl_for_run(
2708 lease: &crate::scheduler::SchedulerLease,
2709 coordination: crate::task::TaskCoordination,
2710 lease_ttl: std::time::Duration,
2711) -> Option<std::time::Duration> {
2712 (coordination == crate::task::TaskCoordination::Fleet && lease.backend() == "postgres")
2713 .then_some(lease_ttl)
2714}
2715
2716fn run_cron_scheduler(
2717 tasks: Vec<CronTaskSpec>,
2718 state: &AppState,
2719 shutdown: &tokio_util::sync::CancellationToken,
2720 coordinator: &Arc<dyn crate::scheduler::SchedulerCoordinator>,
2721 lease_ttl: std::time::Duration,
2722) {
2723 if tasks.is_empty() {
2724 return;
2725 }
2726
2727 tracing::info!(count = tasks.len(), "Cron scheduler started");
2728 for task in tasks {
2729 let state = state.clone();
2730 let coordinator = Arc::clone(coordinator);
2731 let shutdown = shutdown.child_token();
2732 tokio::spawn(async move {
2733 run_cron_task_loop(task, state, shutdown, coordinator, lease_ttl).await;
2734 });
2735 }
2736}
2737
2738#[allow(clippy::cognitive_complexity)]
2739async fn run_cron_task_loop(
2740 task: CronTaskSpec,
2741 state: AppState,
2742 shutdown: tokio_util::sync::CancellationToken,
2743 coordinator: Arc<dyn crate::scheduler::SchedulerCoordinator>,
2744 lease_ttl: std::time::Duration,
2745) {
2746 let CronTaskSpec {
2747 name,
2748 expression,
2749 timezone,
2750 coordination,
2751 handler,
2752 } = task;
2753
2754 let cron = match expression.parse::<croner::Cron>() {
2755 Ok(cron) => cron,
2756 Err(error) => {
2757 tracing::error!(task = %name, expression = %expression, error = %error, "Failed to create cron job");
2758 return;
2759 }
2760 };
2761 let timezone = timezone
2762 .as_deref()
2763 .and_then(|timezone| {
2764 timezone.parse::<chrono_tz::Tz>().map_or_else(
2765 |_| {
2766 tracing::warn!(task = %name, timezone = %timezone, "Unrecognized timezone; falling back to UTC");
2767 None
2768 },
2769 Some,
2770 )
2771 })
2772 .unwrap_or(chrono_tz::UTC);
2773 let mut cursor = chrono::Utc::now().with_timezone(&timezone);
2774
2775 loop {
2776 let now = chrono::Utc::now().with_timezone(&timezone);
2777 let scheduled_at = match next_cron_occurrence_after(&cron, &cursor, &now) {
2778 Ok(scheduled_at) => scheduled_at,
2779 Err(error) => {
2780 tracing::error!(task = %name, expression = %expression, error = %error, "Failed to compute next cron tick");
2781 return;
2782 }
2783 };
2784 state.task_registry.record_next_run_at(
2785 &name,
2786 &scheduled_at.with_timezone(&chrono::Utc).to_rfc3339(),
2787 );
2788 let sleep_for = cron_sleep_duration_until(&scheduled_at);
2789 tokio::select! {
2790 () = shutdown.cancelled() => break,
2791 () = tokio::time::sleep(sleep_for) => {
2792 let woke_at = chrono::Utc::now().with_timezone(&timezone);
2793 match cron_occurrence_is_overdue(&cron, &scheduled_at, &woke_at) {
2794 Ok(true) => {
2795 tracing::warn!(
2796 task = %name,
2797 scheduled_at = %scheduled_at,
2798 woke_at = %woke_at,
2799 "Skipping overdue cron task tick"
2800 );
2801 cursor = woke_at;
2802 continue;
2803 }
2804 Ok(false) => {}
2805 Err(error) => {
2806 tracing::error!(task = %name, expression = %expression, error = %error, "Failed to evaluate cron tick lateness");
2807 return;
2808 }
2809 }
2810 let scheduled_unix_secs = u64::try_from(scheduled_at.timestamp()).unwrap_or_default();
2811 tokio::spawn(execute_cron_task(
2812 name.clone(),
2813 state.clone(),
2814 handler,
2815 coordination,
2816 Arc::clone(&coordinator),
2817 lease_ttl,
2818 scheduled_unix_secs,
2819 ));
2820 cursor = scheduled_at;
2821 }
2822 }
2823 }
2824}
2825
2826fn format_next_task_run_after(delay: std::time::Duration) -> String {
2827 let now = chrono::Utc::now();
2828 let Ok(delay) = chrono::TimeDelta::from_std(delay) else {
2829 return now.to_rfc3339();
2830 };
2831 (now + delay).to_rfc3339()
2832}
2833
2834fn next_cron_occurrence_after<Tz: chrono::TimeZone>(
2835 cron: &croner::Cron,
2836 cursor: &chrono::DateTime<Tz>,
2837 now: &chrono::DateTime<Tz>,
2838) -> Result<chrono::DateTime<Tz>, croner::errors::CronError> {
2839 let anchor = if cursor < now { now } else { cursor };
2840 cron.find_next_occurrence(anchor, false)
2841}
2842
2843fn cron_occurrence_is_overdue<Tz: chrono::TimeZone>(
2844 cron: &croner::Cron,
2845 scheduled_at: &chrono::DateTime<Tz>,
2846 now: &chrono::DateTime<Tz>,
2847) -> Result<bool, croner::errors::CronError> {
2848 let next_after_scheduled = cron.find_next_occurrence(scheduled_at, false)?;
2849 Ok(&next_after_scheduled <= now)
2850}
2851
2852fn cron_sleep_duration_until<Tz: chrono::TimeZone>(
2853 scheduled_at: &chrono::DateTime<Tz>,
2854) -> std::time::Duration {
2855 scheduled_at
2856 .with_timezone(&chrono::Utc)
2857 .signed_duration_since(chrono::Utc::now())
2858 .to_std()
2859 .unwrap_or_default()
2860}
2861
2862async fn run_startup_hooks(hooks: &[StartupHook], state: AppState) -> crate::AutumnResult<()> {
2863 for hook in hooks {
2864 hook(state.clone()).await?;
2865 }
2866 Ok(())
2867}
2868
2869fn initialize_job_runtime(
2870 jobs: Vec<crate::job::JobInfo>,
2871 state: &AppState,
2872 shutdown: &tokio_util::sync::CancellationToken,
2873 config: &crate::config::JobConfig,
2874) -> crate::AutumnResult<()> {
2875 crate::job::clear_global_job_client();
2876 if jobs.is_empty() {
2877 Ok(())
2878 } else {
2879 crate::job::start_runtime(jobs, state, shutdown, config)
2880 }
2881}
2882
2883async fn run_shutdown_hooks(hooks: &[ShutdownHook]) {
2884 for hook in hooks.iter().rev() {
2885 hook().await;
2886 }
2887}
2888
2889#[allow(clippy::cognitive_complexity)]
2896fn log_startup_transparency(
2897 routes: &[Route],
2898 tasks: &[crate::task::TaskInfo],
2899 scoped_groups: &[ScopedGroup],
2900 config: &AutumnConfig,
2901) {
2902 tracing::info!(
2903 "Registered routes:{}",
2904 format_route_lines(routes, scoped_groups, config)
2905 );
2906
2907 if let Some(task_lines) = format_task_lines(tasks) {
2908 tracing::info!("Scheduled tasks:{task_lines}");
2909 }
2910
2911 tracing::info!("Active middleware: {}", format_middleware_list(config));
2912
2913 tracing::info!("Configuration:{}", format_config_summary(config));
2914}
2915
2916fn fail_fast_on_invalid_session_config(config: &AutumnConfig, has_custom_session_store: bool) {
2929 if has_custom_session_store {
2930 return;
2931 }
2932 if let Err(error) = config.session.backend_plan(config.profile.as_deref()) {
2933 eprintln!("Invalid session backend config: {error}");
2934 std::process::exit(1);
2935 }
2936}
2937
2938fn fail_fast_on_invalid_signing_secret(config: &AutumnConfig) {
2944 use crate::security::config::validate_signing_secret;
2945
2946 let is_production = matches!(config.profile.as_deref(), Some("prod" | "production"));
2947 let secret = config.security.signing_secret.secret.as_deref();
2948
2949 if let Err(error) = validate_signing_secret(secret, is_production) {
2950 eprintln!("Invalid signing secret configuration: {error}");
2951 eprintln!(
2952 " hint: generate a secret with `openssl rand -hex 32` and set \
2953 AUTUMN_SECURITY__SIGNING_SECRET"
2954 );
2955 std::process::exit(1);
2956 }
2957
2958 if is_production {
2961 for (i, prev) in config
2962 .security
2963 .signing_secret
2964 .previous_secrets
2965 .iter()
2966 .enumerate()
2967 {
2968 if let Err(error) = validate_signing_secret(Some(prev.as_str()), true) {
2969 eprintln!("Invalid signing secret configuration: previous_secrets[{i}]: {error}");
2970 eprintln!(
2971 " hint: every previous secret must meet the same entropy requirement \
2972 as the current secret"
2973 );
2974 std::process::exit(1);
2975 }
2976 }
2977 }
2978}
2979
2980fn fail_fast_on_invalid_webhook_config(config: &AutumnConfig) {
2981 let is_production = matches!(config.profile.as_deref(), Some("prod" | "production"));
2982 if let Err(error) = config.security.webhooks.validate(is_production) {
2983 eprintln!("Invalid signed webhook configuration: {error}");
2984 std::process::exit(1);
2985 }
2986}
2987
2988pub(crate) fn install_webhook_registry(state: &AppState, config: &AutumnConfig) {
2989 if let Err(error) =
2990 crate::webhook::install_registry_from_config(state, &config.security.webhooks)
2991 {
2992 eprintln!("Invalid signed webhook configuration: {error}");
2993 std::process::exit(1);
2994 }
2995}
2996
2997#[cfg(feature = "storage")]
3003struct StorageBootstrap {
3004 store: crate::storage::SharedBlobStore,
3005 serving: Option<axum::Router<AppState>>,
3006}
3007
3008#[cfg(feature = "storage")]
3009impl StorageBootstrap {
3010 fn install(self, state: &AppState) -> Option<axum::Router<AppState>> {
3014 state.insert_extension::<crate::storage::BlobStoreState>(
3015 crate::storage::BlobStoreState::new(self.store),
3016 );
3017 self.serving
3018 }
3019}
3020
3021#[cfg(feature = "storage")]
3029#[allow(clippy::too_many_lines)] fn preflight_storage(config: &AutumnConfig) -> Option<StorageBootstrap> {
3031 use crate::storage::StorageBackendPlan;
3032
3033 let plan = config
3034 .storage
3035 .backend_plan(config.profile.as_deref())
3036 .unwrap_or_else(|error| {
3037 tracing::error!(%error, "invalid storage backend config; aborting startup");
3043 std::process::exit(1);
3044 });
3045
3046 match plan {
3047 StorageBackendPlan::Disabled => None,
3048 StorageBackendPlan::Local {
3049 provider_id,
3050 root,
3051 mount_path,
3052 default_url_expiry_secs,
3053 warn_in_production,
3054 } => Some(bootstrap_local_storage(
3055 config,
3056 &provider_id,
3057 &root,
3058 &mount_path,
3059 default_url_expiry_secs,
3060 warn_in_production,
3061 )),
3062 StorageBackendPlan::S3 { .. } => {
3063 tracing::error!(
3068 "storage.backend=s3 requires the `autumn-storage-s3` plugin. \
3069 Add it to your Cargo.toml, build an S3BlobStore from your config, \
3070 and call `.with_blob_store(store)` on your AppBuilder. \
3071 Aborting startup."
3072 );
3073 std::process::exit(1);
3074 }
3075 }
3076}
3077
3078#[cfg(feature = "storage")]
3079fn bootstrap_local_storage(
3080 config: &AutumnConfig,
3081 provider_id: &str,
3082 root: &std::path::Path,
3083 mount_path: &str,
3084 default_url_expiry_secs: u64,
3085 warn_in_production: bool,
3086) -> StorageBootstrap {
3087 use crate::storage::{LocalBlobStore, SharedBlobStore, local::SigningKey};
3088
3089 if warn_in_production {
3090 tracing::warn!(
3091 "prod profile is using the local-disk blob store; \
3092 bytes won't survive replica turnover. Set \
3093 storage.backend=s3 or storage.allow_local_in_production=true \
3094 to acknowledge"
3095 );
3096 }
3097
3098 let (signing_key, previous_signing_keys) = config
3103 .security
3104 .signing_secret
3105 .secret
3106 .as_deref()
3107 .filter(|s| !s.is_empty())
3108 .map_or_else(
3109 || {
3110 config
3111 .storage
3112 .local
3113 .signing_key
3114 .as_deref()
3115 .filter(|s| !s.is_empty())
3116 .map_or_else(
3117 || {
3118 if matches!(config.profile.as_deref(), Some("prod" | "production")) {
3119 tracing::warn!(
3120 "no signing secret configured in prod; blob URL signatures \
3121 won't survive a process restart. Set \
3122 AUTUMN_SECURITY__SIGNING_SECRET."
3123 );
3124 }
3125 (SigningKey::random(), vec![])
3126 },
3127 |legacy| (SigningKey::new(legacy.as_bytes().to_vec()), vec![]),
3128 )
3129 },
3130 |secret| {
3131 let current = SigningKey::new(secret.as_bytes().to_vec());
3132 let previous = config
3133 .security
3134 .signing_secret
3135 .previous_secrets
3136 .iter()
3137 .map(|s| SigningKey::new(s.as_bytes().to_vec()))
3138 .collect::<Vec<_>>();
3139 (current, previous)
3140 },
3141 );
3142
3143 let store = match LocalBlobStore::new(
3144 provider_id.to_string(),
3145 root.to_path_buf(),
3146 mount_path.to_string(),
3147 std::time::Duration::from_secs(default_url_expiry_secs),
3148 signing_key,
3149 previous_signing_keys,
3150 ) {
3151 Ok(store) => store,
3152 Err(err) => {
3153 tracing::error!(
3158 error = %err,
3159 root = %root.display(),
3160 "failed to initialize local blob store; aborting startup"
3161 );
3162 std::process::exit(1);
3163 }
3164 };
3165
3166 let serving = crate::storage::local::serve_router(&store);
3167 let arc: SharedBlobStore = std::sync::Arc::new(store);
3168
3169 tracing::info!(
3170 provider = %provider_id,
3171 root = %root.display(),
3172 mount = %mount_path,
3173 "Local blob store mounted"
3174 );
3175
3176 StorageBootstrap {
3177 store: arc,
3178 serving: Some(serving),
3179 }
3180}
3181async fn load_config_and_telemetry(
3182 config_loader: Option<ConfigLoaderFactory>,
3183 telemetry_provider: Option<Box<dyn crate::telemetry::TelemetryProvider>>,
3184) -> (AutumnConfig, crate::telemetry::TelemetryGuard) {
3185 let config = match config_loader {
3188 Some(factory) => factory().await,
3189 None => crate::config::TomlEnvConfigLoader::new().load().await,
3190 }
3191 .unwrap_or_else(|e| {
3192 eprintln!("Failed to load configuration: {e}");
3193 std::process::exit(1);
3194 });
3195
3196 let provider: Box<dyn crate::telemetry::TelemetryProvider> = telemetry_provider
3199 .unwrap_or_else(|| Box::new(crate::telemetry::TracingOtlpTelemetryProvider::new()));
3200 let telemetry_guard = provider
3201 .init(&config.log, &config.telemetry, config.profile.as_deref())
3202 .unwrap_or_else(|error| {
3203 eprintln!("Failed to initialize telemetry: {error}");
3204 std::process::exit(1);
3205 });
3206
3207 (config, telemetry_guard)
3208}
3209
3210#[cfg(feature = "i18n")]
3211fn resolve_i18n_bundle(
3212 explicit_bundle: Option<Arc<crate::i18n::Bundle>>,
3213 auto_load: bool,
3214 config: &AutumnConfig,
3215 env: &dyn crate::config::Env,
3216) -> Option<Arc<crate::i18n::Bundle>> {
3217 if explicit_bundle.is_some() {
3218 return explicit_bundle;
3219 }
3220 if !auto_load {
3221 return None;
3222 }
3223
3224 let dir = project_dir(&config.i18n.dir, env);
3225 Some(Arc::new(
3226 crate::i18n::Bundle::load_from_dir(&dir, &config.i18n)
3227 .unwrap_or_else(|e| panic!("i18n_auto: {e}")),
3228 ))
3229}
3230
3231#[cfg(feature = "i18n")]
3232fn install_i18n_bundle_layer(
3233 mut custom_layers: Vec<CustomLayerRegistration>,
3234 state: &AppState,
3235 bundle: Option<Arc<crate::i18n::Bundle>>,
3236) -> Vec<CustomLayerRegistration> {
3237 let Some(bundle) = bundle else {
3238 return custom_layers;
3239 };
3240
3241 tracing::info!(
3242 locales = ?bundle.locales(),
3243 default = bundle.default_locale(),
3244 "i18n bundle loaded"
3245 );
3246 state.insert_extension::<Arc<crate::i18n::Bundle>>(bundle.clone());
3247 let ext_layer = axum::Extension(bundle);
3251 custom_layers.push(CustomLayerRegistration {
3252 type_id: TypeId::of::<axum::Extension<Arc<crate::i18n::Bundle>>>(),
3253 apply: Box::new(move |router| router.layer(ext_layer)),
3254 });
3255 custom_layers
3256}
3257
3258#[cfg(feature = "db")]
3259struct DatabaseBootstrap {
3260 topology: Option<crate::db::DatabaseTopology>,
3261 replica_readiness: Option<crate::migrate::ReplicaMigrationReadiness>,
3262 replica_migration_check: Option<(String, String)>,
3263}
3264
3265#[cfg(feature = "db")]
3266async fn setup_database(
3267 config: &AutumnConfig,
3268 migrations: Vec<crate::migrate::EmbeddedMigrations>,
3269 pool_provider: Option<PoolProviderFactory>,
3270) -> Result<DatabaseBootstrap, String> {
3271 let check_replica_migrations = !migrations.is_empty();
3272 let topology = match pool_provider {
3273 Some(factory) => factory(config.database.clone()).await,
3274 None => crate::db::create_topology(&config.database),
3275 }
3276 .map_err(|e| format!("Failed to create database pool: {e}"))?;
3277
3278 if topology.is_some()
3283 && let Some(url) = config.database.effective_primary_url()
3284 {
3285 for mig in migrations {
3286 crate::migrate::auto_migrate(
3287 url,
3288 config.profile.as_deref(),
3289 config.database.auto_migrate_in_production,
3290 mig,
3291 );
3292 }
3293 }
3294
3295 let (replica_readiness, replica_migration_check) = if topology
3296 .as_ref()
3297 .is_some_and(|topology| check_replica_migrations && topology.replica().is_some())
3298 {
3299 match (
3300 config.database.effective_primary_url(),
3301 config.database.replica_url.as_deref(),
3302 ) {
3303 (Some(primary_url), Some(replica_url)) => {
3304 let primary_url = primary_url.to_owned();
3305 let replica_url = replica_url.to_owned();
3306 let readiness = crate::migrate::check_replica_migration_readiness_blocking(
3307 primary_url.clone(),
3308 replica_url.clone(),
3309 )
3310 .await;
3311 (Some(readiness), Some((primary_url, replica_url)))
3312 }
3313 _ => (None, None),
3314 }
3315 } else {
3316 (None, None)
3317 };
3318
3319 Ok(DatabaseBootstrap {
3320 topology,
3321 replica_readiness,
3322 replica_migration_check,
3323 })
3324}
3325
3326#[cfg(feature = "db")]
3327fn apply_replica_migration_readiness(
3328 state: &AppState,
3329 readiness: Option<crate::migrate::ReplicaMigrationReadiness>,
3330) {
3331 let Some(readiness) = readiness else {
3332 return;
3333 };
3334
3335 if readiness.is_ready() {
3336 state.probes().mark_replica_migrations_ready();
3337 } else if let Some(detail) = readiness.detail() {
3338 state.probes().mark_replica_migrations_unready(detail);
3339 }
3340}
3341
3342#[cfg(feature = "db")]
3343fn configure_replica_migration_check(state: &AppState, check: Option<(String, String)>) {
3344 let Some((primary_url, replica_url)) = check else {
3345 return;
3346 };
3347
3348 state
3349 .probes()
3350 .configure_replica_migration_check(primary_url, replica_url);
3351}
3352
3353fn collect_unguarded_repository_writes(
3376 routes: &[Route],
3377 scoped_groups: &[ScopedGroup],
3378) -> Vec<(String, String)> {
3379 let mut offenders: Vec<(String, String)> = Vec::new();
3380 let mut seen: std::collections::HashSet<(&'static str, &'static str)> =
3381 std::collections::HashSet::new();
3382 let mut record_route = |route: &Route| {
3383 if let Some(meta) = route.repository
3384 && !meta.has_policy
3385 && is_mutating_method(&route.method)
3386 && seen.insert((meta.resource_type_name, meta.api_path))
3387 {
3388 offenders.push((meta.resource_type_name.to_owned(), meta.api_path.to_owned()));
3389 }
3390 };
3391 for route in routes {
3392 record_route(route);
3393 }
3394 for group in scoped_groups {
3395 for route in &group.routes {
3396 record_route(route);
3397 }
3398 }
3399 offenders
3400}
3401
3402fn format_unguarded_repository_listing(offenders: &[(String, String)]) -> String {
3406 use std::fmt::Write;
3407 let mut s = String::new();
3408 let mut first = true;
3409 for (name, path) in offenders {
3410 if !first {
3411 s.push('\n');
3412 }
3413 first = false;
3414 write!(s, " - #[repository({name}, api = \"{path}\")]").unwrap();
3415 }
3416 s
3417}
3418
3419fn validate_repository_api_policies(
3420 routes: &[Route],
3421 scoped_groups: &[ScopedGroup],
3422 config: &AutumnConfig,
3423) {
3424 let profile = config.profile.as_deref().unwrap_or("default");
3425 let strict =
3426 is_production_profile(profile) && !config.security.allow_unauthorized_repository_api;
3427
3428 let offenders = collect_unguarded_repository_writes(routes, scoped_groups);
3429 if offenders.is_empty() {
3430 return;
3431 }
3432
3433 let listing = format_unguarded_repository_listing(&offenders);
3434
3435 if strict {
3436 tracing::error!(
3437 "refusing to start: the following #[repository(api = ...)] mutating endpoints have no paired `policy = ...` argument:\n{listing}\n\
3438 Add `policy = SomePolicy` to each, or set `[security] allow_unauthorized_repository_api = true` to opt out explicitly."
3439 );
3440 std::process::exit(1);
3441 } else {
3442 tracing::warn!(
3443 "the following #[repository(api = ...)] mutating endpoints have no paired `policy = ...` argument; \
3444 auto-generated POST/PUT/PATCH/DELETE handlers will accept writes from any authenticated user:\n{listing}\n\
3445 This will become a startup-time error in `prod` profile builds."
3446 );
3447 }
3448}
3449
3450type MissingRepositoryRegistration = (String, String);
3465
3466fn collect_unregistered_repository_handlers(
3476 routes: &[Route],
3477 scoped_groups: &[ScopedGroup],
3478 registry: &crate::authorization::PolicyRegistry,
3479) -> (
3480 Vec<MissingRepositoryRegistration>,
3481 Vec<MissingRepositoryRegistration>,
3482) {
3483 let mut missing_policies: Vec<(String, String)> = Vec::new();
3484 let mut missing_scopes: Vec<(String, String)> = Vec::new();
3485 let mut seen_policies: std::collections::HashSet<(&'static str, &'static str)> =
3486 std::collections::HashSet::new();
3487 let mut seen_scopes: std::collections::HashSet<(&'static str, &'static str)> =
3488 std::collections::HashSet::new();
3489 let mut record_route = |route: &Route| {
3490 if let Some(meta) = route.repository {
3491 if let Some(check) = meta.policy_check
3492 && !check(registry)
3493 && seen_policies.insert((meta.resource_type_name, meta.api_path))
3494 {
3495 missing_policies
3496 .push((meta.resource_type_name.to_owned(), meta.api_path.to_owned()));
3497 }
3498 if let Some(check) = meta.scope_check
3499 && !check(registry)
3500 && seen_scopes.insert((meta.resource_type_name, meta.api_path))
3501 {
3502 missing_scopes.push((meta.resource_type_name.to_owned(), meta.api_path.to_owned()));
3503 }
3504 }
3505 };
3506 for route in routes {
3507 record_route(route);
3508 }
3509 for group in scoped_groups {
3510 for route in &group.routes {
3511 record_route(route);
3512 }
3513 }
3514 (missing_policies, missing_scopes)
3515}
3516
3517fn format_missing_policy_listing(missing: &[(String, String)]) -> String {
3520 use std::fmt::Write;
3521 let mut s = String::new();
3522 let mut first = true;
3523 for (name, path) in missing {
3524 if !first {
3525 s.push('\n');
3526 }
3527 first = false;
3528 write!(s, " - #[repository({name}, api = \"{path}\", policy = ...)]: call `.policy::<{name}, _>(...)` on the app builder").unwrap();
3529 }
3530 s
3531}
3532
3533fn format_missing_scope_listing(missing: &[(String, String)]) -> String {
3536 use std::fmt::Write;
3537 let mut s = String::new();
3538 let mut first = true;
3539 for (name, path) in missing {
3540 if !first {
3541 s.push('\n');
3542 }
3543 first = false;
3544 write!(s, " - #[repository({name}, api = \"{path}\", scope = ...)]: call `.scope::<{name}, _>(...)` on the app builder").unwrap();
3545 }
3546 s
3547}
3548
3549#[allow(clippy::cognitive_complexity)]
3550fn validate_repository_policies_registered(
3551 routes: &[Route],
3552 scoped_groups: &[ScopedGroup],
3553 state: &AppState,
3554 config: &AutumnConfig,
3555) {
3556 let profile = config.profile.as_deref().unwrap_or("default");
3557 let strict = is_production_profile(profile);
3558
3559 let (missing_policies, missing_scopes) =
3560 collect_unregistered_repository_handlers(routes, scoped_groups, state.policy_registry());
3561
3562 if missing_policies.is_empty() && missing_scopes.is_empty() {
3563 return;
3564 }
3565
3566 if !missing_policies.is_empty() {
3567 let listing = format_missing_policy_listing(&missing_policies);
3568
3569 if strict {
3570 tracing::error!(
3571 "refusing to start: the following #[repository] routes declare a `policy = ...` argument, but no policy is registered for the resource type. Without registration, every protected request would fail at runtime with `500 no policy registered`:\n{listing}"
3572 );
3573 } else {
3574 tracing::warn!(
3575 "the following #[repository] routes declare `policy = ...` but no matching `.policy::<R, _>(...)` registration is on the app builder. Protected requests will 500 at runtime:\n{listing}\n\
3576 This will become a startup-time error in `prod` profile builds."
3577 );
3578 }
3579 }
3580
3581 if !missing_scopes.is_empty() {
3582 let listing = format_missing_scope_listing(&missing_scopes);
3583
3584 if strict {
3585 tracing::error!(
3586 "refusing to start: the following #[repository] routes declare a `scope = ...` argument, but no scope is registered for the resource type. Without registration, every list request would fail at runtime with `500 missing scope registration`:\n{listing}"
3587 );
3588 } else {
3589 tracing::warn!(
3590 "the following #[repository] routes declare `scope = ...` but no matching `.scope::<R, _>(...)` registration is on the app builder. List requests will 500 at runtime:\n{listing}\n\
3591 This will become a startup-time error in `prod` profile builds."
3592 );
3593 }
3594 }
3595
3596 if strict {
3597 std::process::exit(1);
3598 }
3599}
3600
3601const fn is_mutating_method(method: &http::Method) -> bool {
3602 matches!(
3603 *method,
3604 http::Method::POST | http::Method::PUT | http::Method::PATCH | http::Method::DELETE
3605 )
3606}
3607
3608fn is_production_profile(profile: &str) -> bool {
3614 matches!(profile, "prod" | "production")
3615}
3616
3617#[cfg(test)]
3618mod validate_repository_api_policies_tests {
3619 use super::*;
3620 use crate::RepositoryApiMeta;
3621
3622 fn build_route(
3623 method: http::Method,
3624 path: &'static str,
3625 meta: Option<RepositoryApiMeta>,
3626 ) -> Route {
3627 Route {
3628 method,
3629 path,
3630 handler: axum::routing::any(|| async { "" }),
3631 name: "test_route",
3632 api_doc: crate::openapi::ApiDoc::default(),
3633 repository: meta,
3634 }
3635 }
3636
3637 fn unguarded(path: &'static str, type_name: &'static str) -> RepositoryApiMeta {
3638 RepositoryApiMeta {
3639 resource_type_name: type_name,
3640 api_path: path,
3641 has_policy: false,
3642 policy_check: None,
3643 scope_check: None,
3644 }
3645 }
3646
3647 fn collect_offenders(routes: &[Route]) -> Vec<(String, String)> {
3651 collect_unguarded_repository_writes(routes, &[])
3652 }
3653
3654 #[test]
3655 fn read_only_mount_without_policy_is_not_an_offender() {
3656 let routes = vec![
3657 build_route(
3658 http::Method::GET,
3659 "/api/posts",
3660 Some(unguarded("/api/posts", "Post")),
3661 ),
3662 build_route(
3663 http::Method::GET,
3664 "/api/posts/{id}",
3665 Some(unguarded("/api/posts", "Post")),
3666 ),
3667 ];
3668 let offenders = collect_offenders(&routes);
3669 assert!(
3670 offenders.is_empty(),
3671 "read-only mounts should not trigger the unauthorized-repo guard"
3672 );
3673 }
3674
3675 #[test]
3676 fn write_mount_without_policy_is_an_offender() {
3677 let routes = vec![build_route(
3678 http::Method::POST,
3679 "/api/posts",
3680 Some(unguarded("/api/posts", "Post")),
3681 )];
3682 let offenders = collect_offenders(&routes);
3683 assert_eq!(offenders.len(), 1);
3684 assert_eq!(offenders[0].0, "Post");
3685 assert_eq!(offenders[0].1, "/api/posts");
3686 }
3687
3688 #[test]
3689 fn mixed_mount_only_dedups_one_offender_per_repository() {
3690 let routes = vec![
3691 build_route(
3692 http::Method::GET,
3693 "/api/posts",
3694 Some(unguarded("/api/posts", "Post")),
3695 ),
3696 build_route(
3697 http::Method::POST,
3698 "/api/posts",
3699 Some(unguarded("/api/posts", "Post")),
3700 ),
3701 build_route(
3702 http::Method::PUT,
3703 "/api/posts/{id}",
3704 Some(unguarded("/api/posts", "Post")),
3705 ),
3706 build_route(
3707 http::Method::DELETE,
3708 "/api/posts/{id}",
3709 Some(unguarded("/api/posts", "Post")),
3710 ),
3711 ];
3712 let offenders = collect_offenders(&routes);
3713 assert_eq!(offenders.len(), 1);
3714 }
3715
3716 #[test]
3717 fn is_mutating_method_classifies_methods() {
3718 assert!(is_mutating_method(&http::Method::POST));
3719 assert!(is_mutating_method(&http::Method::PUT));
3720 assert!(is_mutating_method(&http::Method::PATCH));
3721 assert!(is_mutating_method(&http::Method::DELETE));
3722 assert!(!is_mutating_method(&http::Method::GET));
3723 assert!(!is_mutating_method(&http::Method::HEAD));
3724 assert!(!is_mutating_method(&http::Method::OPTIONS));
3725 }
3726
3727 use crate::authorization::{Policy, PolicyRegistry};
3730
3731 #[derive(Debug, Clone, PartialEq)]
3732 struct TestPost;
3733
3734 #[derive(Default)]
3735 struct TestPostPolicy;
3736 impl Policy<TestPost> for TestPostPolicy {}
3737
3738 fn guarded_with_check(path: &'static str, type_name: &'static str) -> RepositoryApiMeta {
3739 RepositoryApiMeta {
3740 resource_type_name: type_name,
3741 api_path: path,
3742 has_policy: true,
3743 policy_check: Some(|registry: &PolicyRegistry| registry.has_policy::<TestPost>()),
3744 scope_check: None,
3745 }
3746 }
3747
3748 fn collect_missing(routes: &[Route], registry: &PolicyRegistry) -> Vec<(String, String)> {
3749 let (missing_policies, _) = collect_unregistered_repository_handlers(routes, &[], registry);
3750 missing_policies
3751 }
3752
3753 #[test]
3754 fn registry_check_flags_routes_missing_their_policy_registration() {
3755 let registry = PolicyRegistry::default();
3758 let routes = vec![build_route(
3759 http::Method::POST,
3760 "/api/posts",
3761 Some(guarded_with_check("/api/posts", "TestPost")),
3762 )];
3763 let missing = collect_missing(&routes, ®istry);
3764 assert_eq!(missing.len(), 1);
3765 assert_eq!(missing[0].0, "TestPost");
3766 assert_eq!(missing[0].1, "/api/posts");
3767 }
3768
3769 #[test]
3770 fn registry_check_passes_when_policy_is_registered() {
3771 let registry = PolicyRegistry::default();
3772 registry.register_policy::<TestPost, _>(TestPostPolicy);
3773 let routes = vec![build_route(
3774 http::Method::POST,
3775 "/api/posts",
3776 Some(guarded_with_check("/api/posts", "TestPost")),
3777 )];
3778 let missing = collect_missing(&routes, ®istry);
3779 assert!(missing.is_empty(), "policy is registered, no offenders");
3780 }
3781
3782 #[test]
3783 fn registry_check_skips_routes_without_policy_check_fn() {
3784 let registry = PolicyRegistry::default();
3789 let routes = vec![build_route(
3790 http::Method::POST,
3791 "/api/posts",
3792 Some(unguarded("/api/posts", "TestPost")),
3793 )];
3794 let missing = collect_missing(&routes, ®istry);
3795 assert!(missing.is_empty());
3796 }
3797
3798 #[test]
3799 fn registry_check_dedups_one_offender_per_repository() {
3800 let registry = PolicyRegistry::default();
3801 let routes = vec![
3802 build_route(
3803 http::Method::GET,
3804 "/api/posts",
3805 Some(guarded_with_check("/api/posts", "TestPost")),
3806 ),
3807 build_route(
3808 http::Method::POST,
3809 "/api/posts",
3810 Some(guarded_with_check("/api/posts", "TestPost")),
3811 ),
3812 build_route(
3813 http::Method::DELETE,
3814 "/api/posts/{id}",
3815 Some(guarded_with_check("/api/posts", "TestPost")),
3816 ),
3817 ];
3818 let missing = collect_missing(&routes, ®istry);
3819 assert_eq!(missing.len(), 1);
3820 }
3821
3822 use crate::authorization::{BoxFuture, PolicyContext, Scope};
3825
3826 #[derive(Default)]
3827 struct TestPostScope;
3828 impl Scope<TestPost> for TestPostScope {
3829 fn list<'a>(
3830 &'a self,
3831 _ctx: &'a PolicyContext,
3832 _conn: &'a mut diesel_async::AsyncPgConnection,
3833 ) -> BoxFuture<'a, crate::AutumnResult<Vec<TestPost>>> {
3834 Box::pin(async { Ok(Vec::new()) })
3835 }
3836 }
3837
3838 fn scope_only_meta(path: &'static str, type_name: &'static str) -> RepositoryApiMeta {
3839 RepositoryApiMeta {
3840 resource_type_name: type_name,
3841 api_path: path,
3842 has_policy: false,
3843 policy_check: None,
3844 scope_check: Some(|registry: &PolicyRegistry| registry.scope::<TestPost>().is_some()),
3845 }
3846 }
3847
3848 fn collect_missing_scopes(
3849 routes: &[Route],
3850 registry: &PolicyRegistry,
3851 ) -> Vec<(String, String)> {
3852 let (_, missing_scopes) = collect_unregistered_repository_handlers(routes, &[], registry);
3853 missing_scopes
3854 }
3855
3856 #[test]
3857 fn scope_check_flags_unregistered_scope() {
3858 let registry = PolicyRegistry::default();
3859 let routes = vec![build_route(
3860 http::Method::GET,
3861 "/api/posts",
3862 Some(scope_only_meta("/api/posts", "TestPost")),
3863 )];
3864 let missing = collect_missing_scopes(&routes, ®istry);
3865 assert_eq!(missing.len(), 1);
3866 assert_eq!(missing[0].0, "TestPost");
3867 }
3868
3869 #[test]
3870 fn scope_check_passes_when_scope_is_registered() {
3871 let registry = PolicyRegistry::default();
3872 registry.register_scope::<TestPost, _>(TestPostScope);
3873 let routes = vec![build_route(
3874 http::Method::GET,
3875 "/api/posts",
3876 Some(scope_only_meta("/api/posts", "TestPost")),
3877 )];
3878 let missing = collect_missing_scopes(&routes, ®istry);
3879 assert!(missing.is_empty());
3880 }
3881
3882 #[test]
3883 fn scope_check_skips_routes_without_scope_check_fn() {
3884 let registry = PolicyRegistry::default();
3885 let routes = vec![build_route(
3886 http::Method::POST,
3887 "/api/posts",
3888 Some(unguarded("/api/posts", "TestPost")),
3889 )];
3890 let missing = collect_missing_scopes(&routes, ®istry);
3891 assert!(missing.is_empty());
3892 }
3893
3894 #[test]
3897 fn is_production_profile_matches_both_aliases() {
3898 assert!(is_production_profile("prod"));
3899 assert!(is_production_profile("production"));
3900 assert!(!is_production_profile("dev"));
3901 assert!(!is_production_profile("staging"));
3902 assert!(!is_production_profile("test"));
3903 assert!(!is_production_profile("default"));
3904 assert!(!is_production_profile("Prod"));
3908 assert!(!is_production_profile("Production"));
3909 }
3910
3911 #[test]
3914 fn format_unguarded_listing_renders_one_bullet_per_offender() {
3915 let offenders = vec![
3916 ("Post".to_owned(), "/api/posts".to_owned()),
3917 ("Comment".to_owned(), "/api/comments".to_owned()),
3918 ];
3919 let listing = format_unguarded_repository_listing(&offenders);
3920 assert!(listing.contains("Post"));
3921 assert!(listing.contains("/api/posts"));
3922 assert!(listing.contains("Comment"));
3923 assert!(listing.contains("/api/comments"));
3924 assert_eq!(listing.matches("\n - ").count() + 1, 2);
3925 }
3926
3927 #[test]
3928 fn format_unguarded_listing_empty_input_yields_empty_string() {
3929 let listing = format_unguarded_repository_listing(&[]);
3930 assert!(listing.is_empty());
3931 }
3932
3933 #[test]
3934 fn format_missing_policy_listing_includes_policy_call_hint() {
3935 let missing = vec![("Post".to_owned(), "/api/posts".to_owned())];
3936 let listing = format_missing_policy_listing(&missing);
3937 assert!(listing.contains("Post"));
3938 assert!(listing.contains("/api/posts"));
3939 assert!(listing.contains(".policy::<Post, _>"));
3940 assert!(listing.contains("policy = ..."));
3941 }
3942
3943 #[test]
3944 fn format_missing_scope_listing_includes_scope_call_hint() {
3945 let missing = vec![("Post".to_owned(), "/api/posts".to_owned())];
3946 let listing = format_missing_scope_listing(&missing);
3947 assert!(listing.contains("Post"));
3948 assert!(listing.contains("/api/posts"));
3949 assert!(listing.contains(".scope::<Post, _>"));
3950 assert!(listing.contains("scope = ..."));
3951 }
3952
3953 #[test]
3956 fn collect_unguarded_walks_scoped_groups() {
3957 let group_route = build_route(
3962 http::Method::POST,
3963 "/api/posts",
3964 Some(unguarded("/api/posts", "Post")),
3965 );
3966 let group = ScopedGroup {
3967 prefix: "/scoped".to_owned(),
3968 routes: vec![group_route],
3969 source: crate::route_listing::RouteSource::User,
3970 apply_layer: Box::new(|r| r),
3971 };
3972 let offenders = collect_unguarded_repository_writes(&[], std::slice::from_ref(&group));
3973 assert_eq!(offenders.len(), 1);
3974 assert_eq!(offenders[0].0, "Post");
3975 }
3976
3977 #[test]
3978 fn collect_unregistered_walks_scoped_groups() {
3979 let group_route = build_route(
3980 http::Method::POST,
3981 "/api/posts",
3982 Some(guarded_with_check("/api/posts", "TestPost")),
3983 );
3984 let group = ScopedGroup {
3985 prefix: "/scoped".to_owned(),
3986 routes: vec![group_route],
3987 source: crate::route_listing::RouteSource::User,
3988 apply_layer: Box::new(|r| r),
3989 };
3990 let registry = PolicyRegistry::default();
3991 let (missing, _) =
3992 collect_unregistered_repository_handlers(&[], std::slice::from_ref(&group), ®istry);
3993 assert_eq!(missing.len(), 1);
3994 assert_eq!(missing[0].0, "TestPost");
3995 }
3996}
3997
3998fn build_state(
3999 config: &AutumnConfig,
4000 #[cfg(feature = "db")] database_topology: Option<&crate::db::DatabaseTopology>,
4001 #[cfg(feature = "ws")] channels_backend: Option<Arc<dyn crate::channels::ChannelsBackend>>,
4002) -> AppState {
4003 #[cfg(feature = "ws")]
4004 let shutdown = tokio_util::sync::CancellationToken::new();
4005 #[cfg(feature = "ws")]
4006 let channels = channels_backend.map_or_else(
4007 || {
4008 crate::channels::Channels::from_config(&config.channels, shutdown.child_token())
4009 .unwrap_or_else(|error| {
4010 tracing::error!(error = %error, "Failed to configure channels backend");
4011 std::process::exit(1);
4012 })
4013 },
4014 crate::channels::Channels::with_shared_backend,
4015 );
4016
4017 let state = AppState {
4018 extensions: std::sync::Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
4019 #[cfg(feature = "db")]
4020 pool: database_topology.map(|topology| topology.primary().clone()),
4021 #[cfg(feature = "db")]
4022 replica_pool: database_topology.and_then(|topology| topology.replica().cloned()),
4023 profile: config.profile.clone(),
4024 started_at: std::time::Instant::now(),
4025 health_detailed: config.health.detailed,
4026 probes: crate::probe::ProbeState::pending_startup(),
4027 metrics: crate::middleware::MetricsCollector::new(),
4028 log_levels: crate::actuator::LogLevels::new(&config.log.level),
4029 task_registry: crate::actuator::TaskRegistry::new(),
4030 job_registry: crate::actuator::JobRegistry::new(),
4031 config_props: crate::actuator::ConfigProperties::from_config(config),
4032 #[cfg(feature = "ws")]
4033 channels,
4034 #[cfg(feature = "ws")]
4035 shutdown,
4036 policy_registry: crate::authorization::PolicyRegistry::default(),
4037 forbidden_response: config.security.forbidden_response,
4038 auth_session_key: config.auth.session_key.clone(),
4039 shared_cache: None,
4040 };
4041 #[cfg(feature = "db")]
4042 if state.replica_pool.is_some() {
4043 state
4044 .probes()
4045 .configure_replica_dependency(config.database.replica_fallback);
4046 }
4047 state.insert_extension(config.clone());
4048 state
4049}
4050
4051fn format_route_lines(
4053 routes: &[Route],
4054 scoped_groups: &[ScopedGroup],
4055 config: &AutumnConfig,
4056) -> String {
4057 use std::fmt::Write as _;
4058
4059 let mut out = String::new();
4060 for route in routes {
4061 let _ = write!(
4062 out,
4063 "\n {} {:<8} -> {}",
4064 route.path, route.method, route.name
4065 );
4066 }
4067 for group in scoped_groups {
4068 for route in &group.routes {
4069 let _ = write!(
4070 out,
4071 "\n {}{} {:<8} -> {} (scoped)",
4072 group.prefix, route.path, route.method, route.name
4073 );
4074 }
4075 }
4076 let mut probe_paths = std::collections::HashSet::new();
4077 for (path, name) in [
4078 (config.health.live_path.as_str(), "live"),
4079 (config.health.ready_path.as_str(), "ready"),
4080 (config.health.startup_path.as_str(), "startup"),
4081 (config.health.path.as_str(), "health"),
4082 ] {
4083 if probe_paths.insert(path) {
4084 let _ = write!(out, "\n {} {:<8} -> {}", path, "GET", name);
4085 }
4086 }
4087 let _ = write!(
4088 out,
4089 "\n {} {:<8} -> actuator",
4090 crate::actuator::actuator_route_glob(&config.actuator.prefix),
4091 "GET"
4092 );
4093 #[cfg(feature = "htmx")]
4094 {
4095 out.push_str("\n /static/js/htmx.min.js GET -> htmx");
4096 out.push_str("\n /static/js/autumn-htmx-csrf.js GET -> htmx csrf");
4097 }
4098 out
4099}
4100
4101fn format_task_lines(tasks: &[crate::task::TaskInfo]) -> Option<String> {
4103 use std::fmt::Write as _;
4104
4105 if tasks.is_empty() {
4106 return None;
4107 }
4108
4109 let mut out = String::new();
4110 for task in tasks {
4111 let schedule = task.schedule.to_string();
4112 let _ = write!(out, "\n {} ({schedule})", task.name);
4113 }
4114 Some(out)
4115}
4116
4117fn format_middleware_list(config: &AutumnConfig) -> String {
4119 let mut items = vec![
4120 "RequestId",
4121 "SecurityHeaders",
4122 "Session (in-memory)",
4123 "ErrorPages",
4124 ];
4125 if !config.cors.allowed_origins.is_empty() {
4126 items.push("CORS");
4127 }
4128 if config.security.csrf.enabled {
4129 items.push("CSRF");
4130 }
4131 items.push("Metrics");
4132 items.join(", ")
4133}
4134
4135fn mask_database_url(url: &str, pool_size: usize) -> String {
4137 if let Ok(mut parsed_url) = url::Url::parse(url) {
4138 if parsed_url.password().is_some() {
4139 let _ = parsed_url.set_password(Some("****"));
4140 return format!("{parsed_url} (pool_size={pool_size})");
4141 }
4142 format!("{parsed_url} (pool_size={pool_size})")
4143 } else {
4144 format!("**** (pool_size={pool_size})")
4147 }
4148}
4149
4150fn format_config_summary(config: &AutumnConfig) -> String {
4152 let profile = config.profile.as_deref().unwrap_or("none");
4153 let db_status = config.database.effective_primary_url().map_or_else(
4154 || "not configured".to_owned(),
4155 |url| {
4156 let primary = mask_database_url(url, config.database.effective_primary_pool_size());
4157 if config.database.replica_url.is_some() {
4158 format!(
4159 "primary={primary}, replica=configured (pool_size={})",
4160 config.database.effective_replica_pool_size()
4161 )
4162 } else {
4163 primary
4164 }
4165 },
4166 );
4167 let telemetry_status = if config.telemetry.enabled {
4168 let endpoint = config
4169 .telemetry
4170 .otlp_endpoint
4171 .as_deref()
4172 .unwrap_or("<missing endpoint>");
4173 format!("{:?} -> {endpoint}", config.telemetry.protocol)
4174 } else {
4175 "disabled".to_owned()
4176 };
4177 format!(
4178 "\
4179 \n profile: {profile}\
4180 \n server: {}:{}\
4181 \n database: {db_status}\
4182 \n log_level: {}\
4183 \n log_format: {:?}\
4184 \n telemetry: {telemetry_status}\
4185 \n health: {} (detailed={})\
4186 \n actuator: sensitive={}\
4187 \n shutdown: {}s",
4188 config.server.host,
4189 config.server.port,
4190 config.log.level,
4191 config.log.format,
4192 config.health.path,
4193 config.health.detailed,
4194 config.actuator.sensitive,
4195 config.server.shutdown_timeout_secs,
4196 )
4197}
4198
4199pub(crate) fn project_dir(subdir: &str, env: &dyn crate::config::Env) -> std::path::PathBuf {
4202 env.var("AUTUMN_MANIFEST_DIR").map_or_else(
4203 |_| std::path::PathBuf::from(subdir),
4204 |d| std::path::PathBuf::from(d).join(subdir),
4205 )
4206}
4207
4208async fn shutdown_signal() {
4213 let ctrl_c = async {
4214 tokio::signal::ctrl_c()
4215 .await
4216 .expect("Failed to install Ctrl+C handler");
4217 tracing::info!("Received Ctrl+C, starting graceful shutdown");
4218 };
4219
4220 #[cfg(unix)]
4221 let terminate = async {
4222 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
4223 .expect("Failed to install SIGTERM handler")
4224 .recv()
4225 .await;
4226 tracing::info!("Received SIGTERM, starting graceful shutdown");
4227 };
4228
4229 #[cfg(not(unix))]
4230 let terminate = std::future::pending::<()>();
4231
4232 tokio::select! {
4233 () = ctrl_c => {},
4234 () = terminate => {},
4235 }
4236}
4237
4238#[cfg(test)]
4239mod tests {
4240 use super::*;
4241 use axum::body::Body;
4242 use axum::http::{Request, StatusCode};
4243 use std::sync::atomic::{AtomicUsize, Ordering};
4244 use tower::ServiceExt;
4245
4246 #[cfg(feature = "mail")]
4249 struct MailTestNoopQueue;
4250
4251 #[cfg(feature = "mail")]
4252 impl crate::mail::MailDeliveryQueue for MailTestNoopQueue {
4253 fn enqueue<'a>(
4254 &'a self,
4255 _mail: crate::mail::Mail,
4256 ) -> std::pin::Pin<
4257 Box<dyn std::future::Future<Output = Result<(), crate::mail::MailError>> + Send + 'a>,
4258 > {
4259 Box::pin(async { Ok(()) })
4260 }
4261 }
4262
4263 #[cfg(feature = "mail")]
4264 fn test_mail() -> crate::mail::Mail {
4265 crate::mail::Mail::builder()
4266 .to("test@example.com")
4267 .subject("hi")
4268 .text("hello")
4269 .build()
4270 .expect("test mail should build")
4271 }
4272
4273 pub fn test_router(routes: Vec<Route>) -> axum::Router {
4275 let config = AutumnConfig::default();
4276 let state = AppState {
4277 extensions: std::sync::Arc::new(std::sync::RwLock::new(
4278 std::collections::HashMap::new(),
4279 )),
4280 #[cfg(feature = "db")]
4281 pool: None,
4282 #[cfg(feature = "db")]
4283 replica_pool: None,
4284 profile: None,
4285 started_at: std::time::Instant::now(),
4286 health_detailed: true,
4287 probes: crate::probe::ProbeState::ready_for_test(),
4288 metrics: crate::middleware::MetricsCollector::new(),
4289 log_levels: crate::actuator::LogLevels::new("info"),
4290 task_registry: crate::actuator::TaskRegistry::new(),
4291 job_registry: crate::actuator::JobRegistry::new(),
4292 config_props: crate::actuator::ConfigProperties::default(),
4293 #[cfg(feature = "ws")]
4294 channels: crate::channels::Channels::new(32),
4295 #[cfg(feature = "ws")]
4296 shutdown: tokio_util::sync::CancellationToken::new(),
4297 policy_registry: crate::authorization::PolicyRegistry::default(),
4298 forbidden_response: crate::authorization::ForbiddenResponse::default(),
4299 auth_session_key: "user_id".to_owned(),
4300 shared_cache: None,
4301 };
4302 crate::router::build_router(routes, &config, state)
4303 }
4304
4305 #[cfg(feature = "db")]
4306 #[test]
4307 fn build_state_applies_replica_fallback_policy_to_read_routing() {
4308 let mut config = AutumnConfig::default();
4309 config.database.primary_url = Some("postgres://localhost/primary".to_owned());
4310 config.database.primary_pool_size = Some(5);
4311 config.database.replica_url = Some("postgres://localhost/replica".to_owned());
4312 config.database.replica_pool_size = Some(2);
4313 config.database.replica_fallback = crate::config::ReplicaFallback::Primary;
4314 let topology = crate::db::create_topology(&config.database)
4315 .expect("topology should build")
4316 .expect("database should be configured");
4317
4318 let state = build_state(
4319 &config,
4320 Some(&topology),
4321 #[cfg(feature = "ws")]
4322 None,
4323 );
4324 state
4325 .probes()
4326 .mark_replica_unready("replica migrations lag primary");
4327
4328 assert_eq!(state.read_pool().expect("read pool").status().max_size, 5);
4329 }
4330
4331 #[cfg(feature = "db")]
4332 #[tokio::test]
4333 async fn custom_pool_provider_preserves_configured_replica_topology() {
4334 struct PassthroughPoolProvider;
4335
4336 impl crate::db::DatabasePoolProvider for PassthroughPoolProvider {
4337 async fn create_pool(
4338 &self,
4339 config: &crate::config::DatabaseConfig,
4340 ) -> Result<
4341 Option<
4342 diesel_async::pooled_connection::deadpool::Pool<
4343 diesel_async::AsyncPgConnection,
4344 >,
4345 >,
4346 crate::db::PoolError,
4347 > {
4348 crate::db::create_pool(config)
4349 }
4350 }
4351
4352 let mut config = AutumnConfig::default();
4353 config.database.primary_url = Some("postgres://localhost/primary".to_owned());
4354 config.database.primary_pool_size = Some(5);
4355 config.database.replica_url = Some("postgres://localhost/replica".to_owned());
4356 config.database.replica_pool_size = Some(2);
4357 config.database.replica_fallback = crate::config::ReplicaFallback::FailReadiness;
4358 let AppBuilder {
4359 pool_provider_factory,
4360 ..
4361 } = app().with_pool_provider(PassthroughPoolProvider);
4362
4363 let database = setup_database(&config, Vec::new(), pool_provider_factory)
4364 .await
4365 .expect("custom provider should build database topology");
4366 let topology = database.topology.expect("database should be configured");
4367
4368 assert_eq!(topology.primary().status().max_size, 5);
4369 assert_eq!(
4370 topology
4371 .replica()
4372 .expect("custom provider should create replica pool")
4373 .status()
4374 .max_size,
4375 2
4376 );
4377
4378 let state = build_state(
4379 &config,
4380 Some(&topology),
4381 #[cfg(feature = "ws")]
4382 None,
4383 );
4384 state
4385 .probes()
4386 .mark_replica_connection_unready("replica connection failed");
4387
4388 assert!(state.read_pool().is_none());
4389 let (status, _) = crate::probe::readiness_response(&state).await;
4390 assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
4391 }
4392
4393 #[cfg(feature = "db")]
4394 #[test]
4395 fn configure_replica_migration_check_stores_recheck_urls() {
4396 let mut config = AutumnConfig::default();
4397 config.database.primary_url = Some("postgres://localhost/primary".to_owned());
4398 config.database.replica_url = Some("postgres://localhost/replica".to_owned());
4399 let topology = crate::db::create_topology(&config.database)
4400 .expect("topology should build")
4401 .expect("database should be configured");
4402
4403 let state = build_state(
4404 &config,
4405 Some(&topology),
4406 #[cfg(feature = "ws")]
4407 None,
4408 );
4409
4410 assert!(
4411 state.probes().replica_migration_check().is_none(),
4412 "build_state should not enable migration checks without registered migrations"
4413 );
4414
4415 configure_replica_migration_check(
4416 &state,
4417 Some((
4418 "postgres://localhost/primary".to_owned(),
4419 "postgres://localhost/replica".to_owned(),
4420 )),
4421 );
4422
4423 let check = state
4424 .probes()
4425 .replica_migration_check()
4426 .expect("replica migration check should be configured");
4427
4428 assert_eq!(check.primary_url, "postgres://localhost/primary");
4429 assert_eq!(check.replica_url, "postgres://localhost/replica");
4430 }
4431
4432 #[cfg(feature = "db")]
4433 #[tokio::test]
4434 async fn replica_migration_readiness_marks_ready_endpoint_degraded() {
4435 let mut config = AutumnConfig::default();
4436 config.database.primary_url = Some("postgres://localhost/primary".to_owned());
4437 config.database.primary_pool_size = Some(5);
4438 config.database.replica_url = Some("postgres://localhost/replica".to_owned());
4439 config.database.replica_pool_size = Some(2);
4440 config.database.replica_fallback = crate::config::ReplicaFallback::FailReadiness;
4441 let topology = crate::db::create_topology(&config.database)
4442 .expect("topology should build")
4443 .expect("database should be configured");
4444 let state = build_state(
4445 &config,
4446 Some(&topology),
4447 #[cfg(feature = "ws")]
4448 None,
4449 );
4450
4451 apply_replica_migration_readiness(
4452 &state,
4453 Some(crate::migrate::ReplicaMigrationReadiness::Stale {
4454 primary_latest: Some("00000000000002".to_owned()),
4455 replica_latest: Some("00000000000001".to_owned()),
4456 }),
4457 );
4458
4459 let (status, _) = crate::probe::readiness_response(&state).await;
4460
4461 assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
4462 }
4463
4464 #[cfg(feature = "db")]
4465 #[tokio::test]
4466 async fn blocking_replica_migration_readiness_reports_unknown_connection_errors() {
4467 let readiness = crate::migrate::check_replica_migration_readiness_blocking(
4468 "not-a-primary-url".to_owned(),
4469 "not-a-replica-url".to_owned(),
4470 )
4471 .await;
4472
4473 assert!(matches!(
4474 readiness,
4475 crate::migrate::ReplicaMigrationReadiness::Unknown(_)
4476 ));
4477 }
4478
4479 #[cfg(feature = "ws")]
4480 #[test]
4481 fn with_channels_backend_overrides_config_driven_backend_selection() {
4482 let builder = app().with_channels_backend(crate::channels::LocalChannelsBackend::new(4));
4483 let AppBuilder {
4484 channels_backend, ..
4485 } = builder;
4486 assert!(channels_backend.is_some());
4487
4488 let mut config = AutumnConfig::default();
4489 config.channels.backend = crate::config::ChannelBackend::Redis;
4490 config.channels.redis.url = None;
4491
4492 let state = build_state(
4493 &config,
4494 #[cfg(feature = "db")]
4495 None,
4496 #[cfg(feature = "ws")]
4497 channels_backend,
4498 );
4499 let mut rx = state.channels().subscribe("override");
4500
4501 state
4502 .broadcast()
4503 .publish("override", "ok")
4504 .expect("custom local backend should publish");
4505
4506 assert_eq!(rx.try_recv().expect("message should arrive").as_str(), "ok");
4507 }
4508
4509 pub fn test_get_route(path: &'static str, name: &'static str) -> Route {
4511 Route {
4512 method: http::Method::GET,
4513 path,
4514 handler: axum::routing::get(|| async { "ok" }),
4515 name,
4516 api_doc: crate::openapi::ApiDoc {
4517 method: "GET",
4518 path,
4519 operation_id: name,
4520 success_status: 200,
4521 ..Default::default()
4522 },
4523 repository: None,
4524 }
4525 }
4526
4527 #[cfg(feature = "i18n")]
4528 fn test_i18n_bundle(key: &str, value: &str) -> Arc<crate::i18n::Bundle> {
4529 let mut messages = std::collections::HashMap::new();
4530 let mut en = std::collections::HashMap::new();
4531 en.insert(key.to_owned(), value.to_owned());
4532 messages.insert("en".to_owned(), en);
4533 Arc::new(crate::i18n::Bundle::from_messages(
4534 messages,
4535 &crate::i18n::I18nConfig::default(),
4536 ))
4537 }
4538
4539 #[cfg(feature = "i18n")]
4540 #[test]
4541 fn i18n_auto_defers_loading_until_runtime_config_is_available() {
4542 let builder = app().i18n_auto();
4543
4544 assert!(builder.i18n_bundle.is_none());
4545 assert!(builder.i18n_auto_load);
4546 }
4547
4548 #[cfg(feature = "i18n")]
4549 #[derive(Clone)]
4550 struct StaticConfigLoader {
4551 config: AutumnConfig,
4552 }
4553
4554 #[cfg(feature = "i18n")]
4555 impl crate::config::ConfigLoader for StaticConfigLoader {
4556 async fn load(&self) -> Result<AutumnConfig, crate::config::ConfigError> {
4557 Ok(self.config.clone())
4558 }
4559 }
4560
4561 #[cfg(feature = "i18n")]
4562 struct NoopTelemetryProvider;
4563
4564 #[cfg(feature = "i18n")]
4565 impl crate::telemetry::TelemetryProvider for NoopTelemetryProvider {
4566 fn init(
4567 &self,
4568 _log: &crate::config::LogConfig,
4569 _telemetry: &crate::config::TelemetryConfig,
4570 _profile: Option<&str>,
4571 ) -> Result<crate::telemetry::TelemetryGuard, crate::telemetry::TelemetryInitError>
4572 {
4573 Ok(crate::telemetry::TelemetryGuard::disabled())
4574 }
4575 }
4576
4577 #[cfg(feature = "i18n")]
4578 #[tokio::test]
4579 async fn i18n_auto_uses_config_loader_output_for_bundle_dir() {
4580 let project = tempfile::tempdir().expect("project dir");
4581 let i18n_dir = project.path().join("custom-i18n");
4582 std::fs::create_dir_all(&i18n_dir).expect("i18n dir");
4583 std::fs::write(i18n_dir.join("en.ftl"), "nav.home = Loader Home\n").expect("bundle");
4584
4585 let mut config = AutumnConfig::default();
4586 config.i18n.dir = "custom-i18n".to_owned();
4587 let builder = app()
4588 .with_config_loader(StaticConfigLoader { config })
4589 .with_telemetry_provider(NoopTelemetryProvider)
4590 .i18n_auto();
4591 let AppBuilder {
4592 config_loader_factory,
4593 telemetry_provider,
4594 i18n_bundle,
4595 i18n_auto_load,
4596 ..
4597 } = builder;
4598
4599 let (loaded_config, _guard) =
4600 load_config_and_telemetry(config_loader_factory, telemetry_provider).await;
4601 let env = crate::config::MockEnv::new().with(
4602 "AUTUMN_MANIFEST_DIR",
4603 project.path().to_str().expect("utf-8 path"),
4604 );
4605 let bundle = resolve_i18n_bundle(i18n_bundle, i18n_auto_load, &loaded_config, &env)
4606 .expect("bundle loaded from configured dir");
4607
4608 assert_eq!(bundle.translate("en", "nav.home", &[]), "Loader Home");
4609 }
4610
4611 #[cfg(feature = "i18n")]
4612 #[tokio::test]
4613 async fn i18n_bundle_layer_is_applied_to_static_route_rendering() {
4614 async fn localized(locale: crate::i18n::Locale) -> String {
4615 locale.t("nav.home")
4616 }
4617
4618 let config = AutumnConfig::default();
4619 let state = AppState::for_test();
4620 let custom_layers = install_i18n_bundle_layer(
4621 Vec::new(),
4622 &state,
4623 Some(test_i18n_bundle("nav.home", "Home")),
4624 );
4625 let router = crate::router::try_build_router_inner(
4626 vec![Route {
4627 method: http::Method::GET,
4628 path: "/about",
4629 handler: axum::routing::get(localized),
4630 name: "localized",
4631 api_doc: crate::openapi::ApiDoc {
4632 method: "GET",
4633 path: "/about",
4634 operation_id: "localized",
4635 success_status: 200,
4636 ..Default::default()
4637 },
4638 repository: None,
4639 }],
4640 &config,
4641 state,
4642 crate::router::RouterContext {
4643 exception_filters: Vec::new(),
4644 scoped_groups: Vec::new(),
4645 merge_routers: Vec::new(),
4646 nest_routers: Vec::new(),
4647 custom_layers,
4648 error_page_renderer: None,
4649 session_store: None,
4650 #[cfg(feature = "openapi")]
4651 openapi: None,
4652 },
4653 )
4654 .expect("router builds");
4655 let tmp = tempfile::tempdir().expect("dist parent");
4656 let dist = tmp.path().join("dist");
4657
4658 crate::static_gen::render_static_routes(
4659 router,
4660 &[crate::static_gen::StaticRouteMeta {
4661 path: "/about",
4662 name: "localized",
4663 revalidate: None,
4664 params_fn: None,
4665 }],
4666 &dist,
4667 )
4668 .await
4669 .expect("static render succeeds");
4670
4671 let html = std::fs::read_to_string(dist.join("about/index.html")).expect("rendered html");
4672 assert_eq!(html, "Home");
4673 }
4674
4675 #[test]
4676 fn app_builder_routes_adds_routes() {
4677 let builder = app();
4678 assert_eq!(builder.routes.len(), 0);
4679
4680 let builder = builder.routes(vec![test_get_route("/1", "route1")]);
4681 assert_eq!(builder.routes.len(), 1);
4682
4683 let builder = builder.routes(vec![
4684 test_get_route("/2", "route2"),
4685 test_get_route("/3", "route3"),
4686 ]);
4687 assert_eq!(builder.routes.len(), 3);
4688
4689 assert_eq!(builder.routes[0].path, "/1");
4690 assert_eq!(builder.routes[1].path, "/2");
4691 assert_eq!(builder.routes[2].path, "/3");
4692 }
4693
4694 #[test]
4695 fn app_builder_extensions_store_and_update_typed_values() {
4696 let builder = app()
4697 .with_extension::<String>("haunted".into())
4698 .update_extension::<String, _, _>(String::new, |value| value.push_str(" harvest"));
4699
4700 let value = builder
4701 .extension::<String>()
4702 .expect("string extension should be present");
4703 assert_eq!(value, "haunted harvest");
4704 }
4705
4706 #[cfg(feature = "mail")]
4707 #[tokio::test]
4708 async fn app_builder_with_mail_delivery_queue_stores_queue_for_install() {
4709 let builder = app().with_mail_delivery_queue(MailTestNoopQueue);
4710 let factory = builder
4711 .mail_delivery_queue_factory
4712 .expect("with_mail_delivery_queue should store a factory on the builder");
4713
4714 let state = AppState::for_test();
4717 let queue = factory(&state).expect("trivial factory should produce the queue");
4718 assert!(Arc::strong_count(&queue) >= 1);
4719 queue
4721 .enqueue(test_mail())
4722 .await
4723 .expect("noop queue should always succeed");
4724 }
4725
4726 #[cfg(feature = "mail")]
4727 #[test]
4728 fn app_builder_with_mail_delivery_queue_factory_runs_with_app_state() {
4729 let observed_profile: Arc<std::sync::Mutex<Option<String>>> =
4730 Arc::new(std::sync::Mutex::new(None));
4731 let captured = Arc::clone(&observed_profile);
4732 let builder = app().with_mail_delivery_queue_factory(move |state| {
4733 *captured.lock().expect("lock") = Some(state.profile().to_owned());
4734 Ok::<_, crate::AutumnError>(MailTestNoopQueue)
4735 });
4736
4737 let factory = builder
4738 .mail_delivery_queue_factory
4739 .expect("factory should be stored on the builder");
4740 let state = AppState::for_test().with_profile("dev");
4741 let _queue = factory(&state).expect("factory should succeed");
4742
4743 assert_eq!(
4744 observed_profile.lock().expect("lock").as_deref(),
4745 Some("dev"),
4746 "factory must run with the live AppState"
4747 );
4748 }
4749
4750 #[cfg(feature = "mail")]
4751 #[test]
4752 fn app_builder_with_mail_delivery_queue_factory_propagates_errors() {
4753 let builder = app().with_mail_delivery_queue_factory(|_state| {
4754 Err::<MailTestNoopQueue, _>(crate::AutumnError::service_unavailable_msg("factory boom"))
4755 });
4756
4757 let factory = builder
4758 .mail_delivery_queue_factory
4759 .expect("factory present");
4760 let state = AppState::for_test();
4761 match factory(&state) {
4762 Ok(_) => panic!("factory should have errored"),
4763 Err(err) => assert!(err.to_string().contains("factory boom")),
4764 }
4765 }
4766
4767 #[tokio::test]
4768 async fn startup_and_shutdown_hooks_run_in_expected_order() {
4769 let events = Arc::new(std::sync::Mutex::new(Vec::<&'static str>::new()));
4770 let startup_events = Arc::clone(&events);
4771 let shutdown_a = Arc::clone(&events);
4772 let shutdown_b = Arc::clone(&events);
4773 let builder = app()
4774 .on_startup(move |_state| {
4775 let startup_events = Arc::clone(&startup_events);
4776 async move {
4777 startup_events
4778 .lock()
4779 .expect("events lock poisoned")
4780 .push("start");
4781 Ok(())
4782 }
4783 })
4784 .on_shutdown(move || {
4785 let shutdown_a = Arc::clone(&shutdown_a);
4786 async move {
4787 shutdown_a
4788 .lock()
4789 .expect("events lock poisoned")
4790 .push("stop-a");
4791 }
4792 })
4793 .on_shutdown(move || {
4794 let shutdown_b = Arc::clone(&shutdown_b);
4795 async move {
4796 shutdown_b
4797 .lock()
4798 .expect("events lock poisoned")
4799 .push("stop-b");
4800 }
4801 });
4802
4803 run_startup_hooks(&builder.startup_hooks, AppState::for_test())
4804 .await
4805 .expect("startup hooks should succeed");
4806 run_shutdown_hooks(&builder.shutdown_hooks).await;
4807
4808 let recorded_events = events.lock().expect("events lock poisoned").clone();
4809 assert_eq!(recorded_events, vec!["start", "stop-b", "stop-a"]);
4810 }
4811
4812 fn startup_noop_job_handler(
4813 _state: AppState,
4814 _payload: serde_json::Value,
4815 ) -> Pin<Box<dyn Future<Output = crate::AutumnResult<()>> + Send + 'static>> {
4816 Box::pin(async move { Ok(()) })
4817 }
4818
4819 #[tokio::test]
4820 async fn startup_hooks_can_enqueue_jobs_after_runtime_init() {
4821 let _guard = crate::job::global_job_runtime_test_lock().lock().await;
4822 crate::job::clear_global_job_client();
4823
4824 let builder = app()
4825 .jobs(vec![crate::job::JobInfo {
4826 name: "startup-seed".to_string(),
4827 max_attempts: 1,
4828 initial_backoff_ms: 1,
4829 handler: startup_noop_job_handler,
4830 }])
4831 .on_startup(|_state| async {
4832 crate::job::enqueue("startup-seed", serde_json::json!({ "kind": "warmup" })).await
4833 });
4834
4835 let state = AppState::for_test().with_profile("dev");
4836 let shutdown = tokio_util::sync::CancellationToken::new();
4837
4838 initialize_job_runtime(
4839 builder.jobs.clone(),
4840 &state,
4841 &shutdown,
4842 &crate::config::JobConfig::default(),
4843 )
4844 .expect("job runtime should initialize before startup hooks");
4845
4846 run_startup_hooks(&builder.startup_hooks, state.clone())
4847 .await
4848 .expect("startup hook should be able to enqueue jobs");
4849
4850 tokio::time::timeout(std::time::Duration::from_secs(1), async {
4851 loop {
4852 let snapshot = state.job_registry().snapshot();
4853 let status = snapshot
4854 .get("startup-seed")
4855 .expect("job should be registered before startup hooks run");
4856 if status.total_successes == 1 {
4857 break;
4858 }
4859 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
4860 }
4861 })
4862 .await
4863 .expect("startup-enqueued job should complete");
4864
4865 shutdown.cancel();
4866 crate::job::clear_global_job_client();
4867 }
4868
4869 #[tokio::test]
4870 async fn initialize_job_runtime_propagates_redis_init_errors() {
4871 let _guard = crate::job::global_job_runtime_test_lock().lock().await;
4872 crate::job::clear_global_job_client();
4873
4874 let state = AppState::for_test().with_profile("dev");
4875 let shutdown = tokio_util::sync::CancellationToken::new();
4876 let config = crate::config::JobConfig {
4877 backend: "redis".to_string(),
4878 ..Default::default()
4879 };
4880
4881 let error = initialize_job_runtime(
4882 vec![crate::job::JobInfo {
4883 name: "startup-seed".to_string(),
4884 max_attempts: 1,
4885 initial_backoff_ms: 1,
4886 handler: startup_noop_job_handler,
4887 }],
4888 &state,
4889 &shutdown,
4890 &config,
4891 )
4892 .expect_err("redis init errors should abort startup");
4893
4894 #[cfg(feature = "redis")]
4895 assert!(
4896 error
4897 .to_string()
4898 .contains("jobs.backend=redis requires jobs.redis.url"),
4899 "unexpected error: {error}"
4900 );
4901
4902 #[cfg(not(feature = "redis"))]
4903 assert!(
4904 error
4905 .to_string()
4906 .contains("jobs.backend=redis requested but redis feature is disabled"),
4907 "unexpected error: {error}"
4908 );
4909 }
4910
4911 #[tokio::test]
4912 async fn startup_hook_errors_propagate() {
4913 let builder = app().on_startup(|_state| async {
4914 Err(crate::AutumnError::service_unavailable_msg(
4915 "startup ritual failed",
4916 ))
4917 });
4918
4919 let error = run_startup_hooks(&builder.startup_hooks, AppState::for_test())
4920 .await
4921 .expect_err("startup hook should fail");
4922 assert!(error.to_string().contains("startup ritual failed"));
4923 }
4924
4925 #[tokio::test]
4926 async fn build_router_mounts_user_routes() {
4927 let router = test_router(vec![test_get_route("/test", "test_handler")]);
4928
4929 let response = router
4930 .oneshot(Request::builder().uri("/test").body(Body::empty()).unwrap())
4931 .await
4932 .unwrap();
4933
4934 assert_eq!(response.status(), StatusCode::OK);
4935 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
4936 .await
4937 .unwrap();
4938 assert_eq!(&body[..], b"ok");
4939 }
4940
4941 #[tokio::test]
4942 async fn build_router_mounts_health_check_at_default_path() {
4943 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
4944
4945 let response = router
4946 .oneshot(
4947 Request::builder()
4948 .uri("/health")
4949 .body(Body::empty())
4950 .unwrap(),
4951 )
4952 .await
4953 .unwrap();
4954
4955 assert_eq!(response.status(), StatusCode::OK);
4956 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
4957 .await
4958 .unwrap();
4959 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
4960 assert_eq!(json["status"], "ok");
4961 }
4962
4963 #[tokio::test]
4964 async fn build_router_mounts_health_check_at_custom_path() {
4965 let mut config = AutumnConfig::default();
4966 config.health.path = "/healthz".to_owned();
4967 let state = AppState {
4968 extensions: std::sync::Arc::new(std::sync::RwLock::new(
4969 std::collections::HashMap::new(),
4970 )),
4971 #[cfg(feature = "db")]
4972 pool: None,
4973 #[cfg(feature = "db")]
4974 replica_pool: None,
4975 profile: None,
4976 started_at: std::time::Instant::now(),
4977 health_detailed: true,
4978 probes: crate::probe::ProbeState::ready_for_test(),
4979 metrics: crate::middleware::MetricsCollector::new(),
4980 log_levels: crate::actuator::LogLevels::new("info"),
4981 task_registry: crate::actuator::TaskRegistry::new(),
4982 job_registry: crate::actuator::JobRegistry::new(),
4983 config_props: crate::actuator::ConfigProperties::default(),
4984 #[cfg(feature = "ws")]
4985 channels: crate::channels::Channels::new(32),
4986 #[cfg(feature = "ws")]
4987 shutdown: tokio_util::sync::CancellationToken::new(),
4988 policy_registry: crate::authorization::PolicyRegistry::default(),
4989 forbidden_response: crate::authorization::ForbiddenResponse::default(),
4990 auth_session_key: "user_id".to_owned(),
4991 shared_cache: None,
4992 };
4993 let router =
4994 crate::router::build_router(vec![test_get_route("/dummy", "dummy")], &config, state);
4995
4996 let response = router
4997 .oneshot(
4998 Request::builder()
4999 .uri("/healthz")
5000 .body(Body::empty())
5001 .unwrap(),
5002 )
5003 .await
5004 .unwrap();
5005
5006 assert_eq!(response.status(), StatusCode::OK);
5007 }
5008
5009 #[tokio::test]
5010 async fn build_router_adds_request_id_header() {
5011 let router = test_router(vec![test_get_route("/test", "test")]);
5012
5013 let response = router
5014 .oneshot(Request::builder().uri("/test").body(Body::empty()).unwrap())
5015 .await
5016 .unwrap();
5017
5018 assert!(response.headers().contains_key("x-request-id"));
5019 }
5020
5021 #[tokio::test]
5022 async fn build_router_unknown_route_returns_404() {
5023 let router = test_router(vec![test_get_route("/exists", "exists")]);
5024
5025 let response = router
5026 .oneshot(Request::builder().uri("/nope").body(Body::empty()).unwrap())
5027 .await
5028 .unwrap();
5029
5030 assert_eq!(response.status(), StatusCode::NOT_FOUND);
5031 }
5032
5033 #[tokio::test]
5034 async fn build_router_multiple_routes() {
5035 let router = test_router(vec![test_get_route("/a", "a"), test_get_route("/b", "b")]);
5036
5037 let resp_a = router
5038 .clone()
5039 .oneshot(Request::builder().uri("/a").body(Body::empty()).unwrap())
5040 .await
5041 .unwrap();
5042 assert_eq!(resp_a.status(), StatusCode::OK);
5043
5044 let resp_b = router
5045 .oneshot(Request::builder().uri("/b").body(Body::empty()).unwrap())
5046 .await
5047 .unwrap();
5048 assert_eq!(resp_b.status(), StatusCode::OK);
5049 }
5050
5051 #[tokio::test]
5052 async fn build_router_post_route() {
5053 let post_routes = vec![Route {
5054 method: http::Method::POST,
5055 path: "/submit",
5056 handler: axum::routing::post(|| async { "posted" }),
5057 name: "submit",
5058 api_doc: crate::openapi::ApiDoc {
5059 method: "POST",
5060 path: "/submit",
5061 operation_id: "submit",
5062 success_status: 200,
5063 ..Default::default()
5064 },
5065 repository: None,
5066 }];
5067 let config = AutumnConfig::default();
5068 let state = AppState {
5069 extensions: std::sync::Arc::new(std::sync::RwLock::new(
5070 std::collections::HashMap::new(),
5071 )),
5072 #[cfg(feature = "db")]
5073 pool: None,
5074 #[cfg(feature = "db")]
5075 replica_pool: None,
5076 profile: None,
5077 started_at: std::time::Instant::now(),
5078 health_detailed: true,
5079 probes: crate::probe::ProbeState::ready_for_test(),
5080 metrics: crate::middleware::MetricsCollector::new(),
5081 log_levels: crate::actuator::LogLevels::new("info"),
5082 task_registry: crate::actuator::TaskRegistry::new(),
5083 job_registry: crate::actuator::JobRegistry::new(),
5084 config_props: crate::actuator::ConfigProperties::default(),
5085 #[cfg(feature = "ws")]
5086 channels: crate::channels::Channels::new(32),
5087 #[cfg(feature = "ws")]
5088 shutdown: tokio_util::sync::CancellationToken::new(),
5089 policy_registry: crate::authorization::PolicyRegistry::default(),
5090 forbidden_response: crate::authorization::ForbiddenResponse::default(),
5091 auth_session_key: "user_id".to_owned(),
5092 shared_cache: None,
5093 };
5094 let router = crate::router::build_router(post_routes, &config, state);
5095
5096 let response = router
5097 .oneshot(
5098 Request::builder()
5099 .method("POST")
5100 .uri("/submit")
5101 .body(Body::empty())
5102 .unwrap(),
5103 )
5104 .await
5105 .unwrap();
5106
5107 assert_eq!(response.status(), StatusCode::OK);
5108 }
5109
5110 #[tokio::test]
5111 async fn build_router_merges_methods_on_same_path() {
5112 let route_list = vec![
5113 Route {
5114 method: http::Method::GET,
5115 path: "/admin",
5116 handler: axum::routing::get(|| async { "list" }),
5117 name: "admin_list",
5118 api_doc: crate::openapi::ApiDoc {
5119 method: "GET",
5120 path: "/admin",
5121 operation_id: "admin_list",
5122 success_status: 200,
5123 ..Default::default()
5124 },
5125 repository: None,
5126 },
5127 Route {
5128 method: http::Method::POST,
5129 path: "/admin",
5130 handler: axum::routing::post(|| async { "created" }),
5131 name: "create",
5132 api_doc: crate::openapi::ApiDoc {
5133 method: "POST",
5134 path: "/admin",
5135 operation_id: "create",
5136 success_status: 200,
5137 ..Default::default()
5138 },
5139 repository: None,
5140 },
5141 ];
5142 let config = AutumnConfig::default();
5143 let state = AppState {
5144 extensions: std::sync::Arc::new(std::sync::RwLock::new(
5145 std::collections::HashMap::new(),
5146 )),
5147 #[cfg(feature = "db")]
5148 pool: None,
5149 #[cfg(feature = "db")]
5150 replica_pool: None,
5151 profile: None,
5152 started_at: std::time::Instant::now(),
5153 health_detailed: true,
5154 probes: crate::probe::ProbeState::ready_for_test(),
5155 metrics: crate::middleware::MetricsCollector::new(),
5156 log_levels: crate::actuator::LogLevels::new("info"),
5157 task_registry: crate::actuator::TaskRegistry::new(),
5158 job_registry: crate::actuator::JobRegistry::new(),
5159 config_props: crate::actuator::ConfigProperties::default(),
5160 #[cfg(feature = "ws")]
5161 channels: crate::channels::Channels::new(32),
5162 #[cfg(feature = "ws")]
5163 shutdown: tokio_util::sync::CancellationToken::new(),
5164 policy_registry: crate::authorization::PolicyRegistry::default(),
5165 forbidden_response: crate::authorization::ForbiddenResponse::default(),
5166 auth_session_key: "user_id".to_owned(),
5167 shared_cache: None,
5168 };
5169 let router = crate::router::build_router(route_list, &config, state);
5170
5171 let resp = router
5173 .clone()
5174 .oneshot(
5175 Request::builder()
5176 .uri("/admin")
5177 .body(Body::empty())
5178 .unwrap(),
5179 )
5180 .await
5181 .unwrap();
5182 assert_eq!(resp.status(), StatusCode::OK);
5183 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5184 .await
5185 .unwrap();
5186 assert_eq!(&body[..], b"list");
5187
5188 let resp = router
5190 .oneshot(
5191 Request::builder()
5192 .method("POST")
5193 .uri("/admin")
5194 .body(Body::empty())
5195 .unwrap(),
5196 )
5197 .await
5198 .unwrap();
5199 assert_eq!(resp.status(), StatusCode::OK);
5200 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
5201 .await
5202 .unwrap();
5203 assert_eq!(&body[..], b"created");
5204 }
5205
5206 #[cfg(feature = "htmx")]
5207 #[tokio::test]
5208 async fn htmx_handler_returns_javascript_with_correct_headers() {
5209 let app = axum::Router::new().route(
5210 crate::htmx::HTMX_JS_PATH,
5211 axum::routing::get(crate::router::htmx_handler),
5212 );
5213
5214 let response = app
5215 .oneshot(
5216 Request::builder()
5217 .uri(crate::htmx::HTMX_JS_PATH)
5218 .body(Body::empty())
5219 .unwrap(),
5220 )
5221 .await
5222 .unwrap();
5223
5224 assert_eq!(response.status(), StatusCode::OK);
5225
5226 let content_type = response
5227 .headers()
5228 .get("content-type")
5229 .unwrap()
5230 .to_str()
5231 .unwrap();
5232 assert!(
5233 content_type.contains("application/javascript"),
5234 "Expected application/javascript, got {content_type}"
5235 );
5236
5237 let cache_control = response
5238 .headers()
5239 .get("cache-control")
5240 .unwrap()
5241 .to_str()
5242 .unwrap();
5243 assert!(
5244 cache_control.contains("immutable"),
5245 "Expected immutable cache, got {cache_control}"
5246 );
5247
5248 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
5249 .await
5250 .unwrap();
5251
5252 assert_eq!(body.len(), crate::htmx::HTMX_JS.len());
5254
5255 let start = std::str::from_utf8(&body[..50]).expect("htmx should be valid UTF-8");
5257 assert!(
5258 start.contains("htmx") || start.contains("function"),
5259 "Response doesn't look like htmx JavaScript: {start}"
5260 );
5261 }
5262
5263 #[cfg(feature = "htmx")]
5264 #[tokio::test]
5265 async fn htmx_csrf_handler_returns_csp_compatible_javascript() {
5266 let app = axum::Router::new().route(
5267 crate::htmx::HTMX_CSRF_JS_PATH,
5268 axum::routing::get(crate::router::htmx_csrf_handler),
5269 );
5270
5271 let response = app
5272 .oneshot(
5273 Request::builder()
5274 .uri(crate::htmx::HTMX_CSRF_JS_PATH)
5275 .body(Body::empty())
5276 .unwrap(),
5277 )
5278 .await
5279 .unwrap();
5280
5281 assert_eq!(response.status(), StatusCode::OK);
5282 assert_eq!(
5283 response
5284 .headers()
5285 .get("content-type")
5286 .and_then(|value| value.to_str().ok()),
5287 Some("application/javascript")
5288 );
5289
5290 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
5291 .await
5292 .unwrap();
5293 let js = std::str::from_utf8(&body).expect("csrf helper should be valid utf-8");
5294
5295 assert!(js.contains("htmx:configRequest"));
5296 assert!(js.contains("X-CSRF-Token"));
5297 assert!(!js.contains("<script"));
5298 }
5299
5300 #[cfg(feature = "htmx")]
5301 #[tokio::test]
5302 async fn build_router_serves_htmx_js() {
5303 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
5304
5305 let response = router
5306 .oneshot(
5307 Request::builder()
5308 .uri(crate::htmx::HTMX_JS_PATH)
5309 .body(Body::empty())
5310 .unwrap(),
5311 )
5312 .await
5313 .unwrap();
5314
5315 assert_eq!(response.status(), StatusCode::OK);
5316 let ct = response
5317 .headers()
5318 .get("content-type")
5319 .unwrap()
5320 .to_str()
5321 .unwrap();
5322 assert!(ct.contains("javascript"));
5323 }
5324
5325 #[cfg(feature = "htmx")]
5326 #[tokio::test]
5327 async fn build_router_serves_htmx_csrf_js() {
5328 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
5329
5330 let response = router
5331 .oneshot(
5332 Request::builder()
5333 .uri(crate::htmx::HTMX_CSRF_JS_PATH)
5334 .body(Body::empty())
5335 .unwrap(),
5336 )
5337 .await
5338 .unwrap();
5339
5340 assert_eq!(response.status(), StatusCode::OK);
5341 let csp = response
5342 .headers()
5343 .get("content-security-policy")
5344 .expect("framework JS should still receive security headers")
5345 .to_str()
5346 .unwrap();
5347 assert!(csp.contains("script-src 'self'"), "csp = {csp}");
5348 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
5349 .await
5350 .unwrap();
5351 let js = std::str::from_utf8(&body).expect("csrf helper should be valid utf-8");
5352 assert!(js.contains("htmx:configRequest"));
5353 assert!(js.contains("X-CSRF-Token"));
5354 }
5355
5356 #[tokio::test]
5357 async fn build_router_serves_default_favicon_without_404() {
5358 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
5359
5360 let response = router
5361 .oneshot(
5362 Request::builder()
5363 .uri(crate::router::DEFAULT_FAVICON_PATH)
5364 .body(Body::empty())
5365 .unwrap(),
5366 )
5367 .await
5368 .unwrap();
5369
5370 assert_eq!(response.status(), StatusCode::NO_CONTENT);
5371 assert!(
5372 response.headers().contains_key("content-security-policy"),
5373 "framework fallback responses should still receive security headers"
5374 );
5375 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
5376 .await
5377 .unwrap();
5378 assert!(body.is_empty());
5379 }
5380
5381 #[tokio::test]
5382 async fn build_router_does_not_override_user_favicon_route() {
5383 let router = test_router(vec![test_get_route(
5384 crate::router::DEFAULT_FAVICON_PATH,
5385 "favicon",
5386 )]);
5387
5388 let response = router
5389 .oneshot(
5390 Request::builder()
5391 .uri(crate::router::DEFAULT_FAVICON_PATH)
5392 .body(Body::empty())
5393 .unwrap(),
5394 )
5395 .await
5396 .unwrap();
5397
5398 assert_eq!(response.status(), StatusCode::OK);
5399 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
5400 .await
5401 .unwrap();
5402 assert_eq!(&body[..], b"ok");
5403 }
5404
5405 #[tokio::test]
5406 async fn build_router_serves_static_files_for_unmatched_paths() {
5407 use std::collections::HashMap;
5408
5409 let tmp = tempfile::tempdir().expect("tempdir");
5411 let dist = tmp.path().join("dist");
5412 std::fs::create_dir_all(dist.join("docs")).expect("mkdir");
5413 std::fs::write(dist.join("docs/index.html"), "<h1>Static Docs</h1>").expect("write");
5414
5415 let manifest = crate::static_gen::StaticManifest {
5416 generated_at: "2026-03-27T00:00:00Z".to_owned(),
5417 autumn_version: "0.2.0".to_owned(),
5418 routes: HashMap::from([(
5419 "/docs".to_owned(),
5420 crate::static_gen::ManifestEntry {
5421 file: "docs/index.html".to_owned(),
5422 revalidate: None,
5423 },
5424 )]),
5425 };
5426 let json = serde_json::to_string(&manifest).expect("serialize");
5427 std::fs::write(dist.join("manifest.json"), json).expect("write manifest");
5428
5429 let config = AutumnConfig::default();
5431 let state = AppState {
5432 extensions: std::sync::Arc::new(std::sync::RwLock::new(
5433 std::collections::HashMap::new(),
5434 )),
5435 #[cfg(feature = "db")]
5436 pool: None,
5437 #[cfg(feature = "db")]
5438 replica_pool: None,
5439 profile: None,
5440 started_at: std::time::Instant::now(),
5441 health_detailed: true,
5442 probes: crate::probe::ProbeState::ready_for_test(),
5443 metrics: crate::middleware::MetricsCollector::new(),
5444 log_levels: crate::actuator::LogLevels::new("info"),
5445 task_registry: crate::actuator::TaskRegistry::new(),
5446 job_registry: crate::actuator::JobRegistry::new(),
5447 config_props: crate::actuator::ConfigProperties::default(),
5448 #[cfg(feature = "ws")]
5449 channels: crate::channels::Channels::new(32),
5450 #[cfg(feature = "ws")]
5451 shutdown: tokio_util::sync::CancellationToken::new(),
5452 policy_registry: crate::authorization::PolicyRegistry::default(),
5453 forbidden_response: crate::authorization::ForbiddenResponse::default(),
5454 auth_session_key: "user_id".to_owned(),
5455 shared_cache: None,
5456 };
5457 let router = crate::router::build_router_with_static(
5458 vec![test_get_route("/other", "other_page")],
5459 &config,
5460 state,
5461 Some(dist.as_path()),
5462 );
5463
5464 let response = router
5467 .oneshot(
5468 Request::builder()
5469 .uri("/docs/")
5470 .body(Body::empty())
5471 .unwrap(),
5472 )
5473 .await
5474 .unwrap();
5475
5476 assert_eq!(response.status(), StatusCode::OK);
5477 let csp = response
5478 .headers()
5479 .get("content-security-policy")
5480 .expect("static-first HTML should still receive security headers")
5481 .to_str()
5482 .unwrap();
5483 assert!(csp.contains("script-src 'self'"), "csp = {csp}");
5484 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
5485 .await
5486 .unwrap();
5487 assert_eq!(std::str::from_utf8(&body).unwrap(), "<h1>Static Docs</h1>");
5488 }
5489
5490 #[tokio::test]
5491 async fn build_mode_static_rendering_bypasses_startup_barrier() {
5492 temp_env::async_with_vars([("AUTUMN_BUILD_STATIC", Some("1"))], async {
5493 let config = AutumnConfig::default();
5494 let state = AppState::for_test().with_startup_complete(false);
5495 let router = crate::router::build_router(
5496 vec![Route {
5497 method: http::Method::GET,
5498 path: "/about",
5499 handler: axum::routing::get(|| async { "About Page Content" }),
5500 name: "about",
5501 api_doc: crate::openapi::ApiDoc {
5502 method: "GET",
5503 path: "/about",
5504 operation_id: "about",
5505 success_status: 200,
5506 ..Default::default()
5507 },
5508 repository: None,
5509 }],
5510 &config,
5511 state,
5512 );
5513 let tmp = tempfile::tempdir().unwrap();
5514 let dist = tmp.path().join("dist");
5515
5516 let result = crate::static_gen::render_static_routes(
5517 router,
5518 &[crate::static_gen::StaticRouteMeta {
5519 path: "/about",
5520 name: "about",
5521 revalidate: None,
5522 params_fn: None,
5523 }],
5524 &dist,
5525 )
5526 .await;
5527
5528 assert!(result.is_ok(), "build failed: {:?}", result.err());
5529 let html = std::fs::read_to_string(dist.join("about/index.html")).unwrap();
5530 assert_eq!(html, "About Page Content");
5531 })
5532 .await;
5533 }
5534
5535 #[tokio::test]
5536 async fn build_router_injects_live_reload_script_when_enabled() {
5537 let reload_file = tempfile::NamedTempFile::new().expect("reload state file");
5538 std::fs::write(reload_file.path(), r#"{"version":0,"kind":"full"}"#).expect("write");
5539 temp_env::async_with_vars(
5540 [
5541 ("AUTUMN_DEV_RELOAD", Some("1")),
5542 (
5543 "AUTUMN_DEV_RELOAD_STATE",
5544 Some(reload_file.path().to_str().expect("utf-8 path")),
5545 ),
5546 ],
5547 async {
5548 let router = test_router(vec![Route {
5549 method: http::Method::GET,
5550 path: "/page",
5551 handler: axum::routing::get(|| async {
5552 axum::response::Html("<html><body><main>ok</main></body></html>")
5553 }),
5554 name: "page",
5555 api_doc: crate::openapi::ApiDoc {
5556 method: "GET",
5557 path: "/page",
5558 operation_id: "page",
5559 success_status: 200,
5560 ..Default::default()
5561 },
5562 repository: None,
5563 }]);
5564
5565 let response = router
5566 .oneshot(Request::builder().uri("/page").body(Body::empty()).unwrap())
5567 .await
5568 .unwrap();
5569
5570 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
5571 .await
5572 .unwrap();
5573 let html = std::str::from_utf8(&body).expect("utf-8");
5574 assert!(html.contains("/__autumn/live-reload"));
5575 },
5576 )
5577 .await;
5578 }
5579
5580 #[tokio::test]
5581 async fn build_router_mounts_dev_reload_script_endpoint_when_enabled() {
5582 let reload_file = tempfile::NamedTempFile::new().expect("reload state file");
5587 std::fs::write(reload_file.path(), r#"{"version":0,"kind":"full"}"#).expect("write");
5588 temp_env::async_with_vars(
5589 [
5590 ("AUTUMN_DEV_RELOAD", Some("1")),
5591 (
5592 "AUTUMN_DEV_RELOAD_STATE",
5593 Some(reload_file.path().to_str().expect("utf-8 path")),
5594 ),
5595 ],
5596 async {
5597 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
5598
5599 let response = router
5600 .oneshot(
5601 Request::builder()
5602 .uri("/__autumn/live-reload.js")
5603 .body(Body::empty())
5604 .unwrap(),
5605 )
5606 .await
5607 .unwrap();
5608
5609 assert_eq!(response.status(), StatusCode::OK);
5610 assert_eq!(
5611 response
5612 .headers()
5613 .get("content-type")
5614 .and_then(|v| v.to_str().ok()),
5615 Some("application/javascript; charset=utf-8")
5616 );
5617 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
5618 .await
5619 .unwrap();
5620 let js = std::str::from_utf8(&body).expect("utf-8");
5621 assert!(js.contains("fetch("), "js body: {js}");
5622 },
5623 )
5624 .await;
5625 }
5626
5627 #[tokio::test]
5628 async fn build_router_mounts_dev_reload_endpoint_when_enabled() {
5629 let reload_file = tempfile::NamedTempFile::new().expect("reload state file");
5630 std::fs::write(reload_file.path(), r#"{"version":7,"kind":"css"}"#).expect("write");
5631 temp_env::async_with_vars(
5632 [
5633 ("AUTUMN_DEV_RELOAD", Some("1")),
5634 (
5635 "AUTUMN_DEV_RELOAD_STATE",
5636 Some(reload_file.path().to_str().expect("utf-8 path")),
5637 ),
5638 ],
5639 async {
5640 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
5641
5642 let response = router
5643 .oneshot(
5644 Request::builder()
5645 .uri("/__autumn/live-reload")
5646 .body(Body::empty())
5647 .unwrap(),
5648 )
5649 .await
5650 .unwrap();
5651
5652 assert_eq!(response.status(), StatusCode::OK);
5653 assert_eq!(
5654 response.headers().get("cache-control").unwrap(),
5655 "no-store, no-cache, must-revalidate"
5656 );
5657 let body = axum::body::to_bytes(response.into_body(), usize::MAX)
5658 .await
5659 .unwrap();
5660 assert_eq!(&body[..], br#"{"version":7,"kind":"css"}"#);
5661 },
5662 )
5663 .await;
5664 }
5665
5666 #[tokio::test]
5667 async fn build_router_disables_cache_for_static_assets_in_dev_reload_mode() {
5668 let project = tempfile::tempdir().expect("project dir");
5669 let static_dir = project.path().join("static");
5670 std::fs::create_dir_all(&static_dir).expect("mkdir");
5671 std::fs::write(static_dir.join("demo.txt"), "hello").expect("write static file");
5672 let reload_file = tempfile::NamedTempFile::new().expect("reload state file");
5673 std::fs::write(reload_file.path(), r#"{"version":0,"kind":"full"}"#).expect("write");
5674 temp_env::async_with_vars(
5675 [
5676 (
5677 "AUTUMN_MANIFEST_DIR",
5678 Some(project.path().to_str().expect("utf-8 path")),
5679 ),
5680 ("AUTUMN_DEV_RELOAD", Some("1")),
5681 (
5682 "AUTUMN_DEV_RELOAD_STATE",
5683 Some(reload_file.path().to_str().expect("utf-8 path")),
5684 ),
5685 ],
5686 async {
5687 let router = test_router(vec![test_get_route("/dummy", "dummy")]);
5688
5689 let response = router
5690 .oneshot(
5691 Request::builder()
5692 .uri("/static/demo.txt")
5693 .body(Body::empty())
5694 .unwrap(),
5695 )
5696 .await
5697 .unwrap();
5698
5699 assert_eq!(response.status(), StatusCode::OK);
5700 assert_eq!(
5701 response.headers().get("cache-control").unwrap(),
5702 "no-store, no-cache, must-revalidate"
5703 );
5704 },
5705 )
5706 .await;
5707 }
5708
5709 #[test]
5710 fn app_builder_accepts_static_routes() {
5711 use crate::static_gen::StaticRouteMeta;
5712 let metas = vec![StaticRouteMeta {
5713 path: "/about",
5714 name: "about",
5715 revalidate: None,
5716 params_fn: None,
5717 }];
5718 let builder = app().static_routes(metas);
5719 assert_eq!(builder.static_metas.len(), 1);
5720 }
5721
5722 #[test]
5723 fn project_dir_defaults_to_subdir() {
5724 let env = crate::config::MockEnv::new();
5727 let dir = super::project_dir("dist", &env);
5728 assert_eq!(dir, std::path::PathBuf::from("dist"));
5729 }
5730
5731 pub fn test_router_with_config(routes: Vec<Route>, config: &AutumnConfig) -> axum::Router {
5733 let state = AppState {
5734 extensions: std::sync::Arc::new(std::sync::RwLock::new(
5735 std::collections::HashMap::new(),
5736 )),
5737 #[cfg(feature = "db")]
5738 pool: None,
5739 #[cfg(feature = "db")]
5740 replica_pool: None,
5741 profile: None,
5742 started_at: std::time::Instant::now(),
5743 health_detailed: true,
5744 probes: crate::probe::ProbeState::ready_for_test(),
5745 metrics: crate::middleware::MetricsCollector::new(),
5746 log_levels: crate::actuator::LogLevels::new("info"),
5747 task_registry: crate::actuator::TaskRegistry::new(),
5748 job_registry: crate::actuator::JobRegistry::new(),
5749 config_props: crate::actuator::ConfigProperties::default(),
5750 #[cfg(feature = "ws")]
5751 channels: crate::channels::Channels::new(32),
5752 #[cfg(feature = "ws")]
5753 shutdown: tokio_util::sync::CancellationToken::new(),
5754 policy_registry: crate::authorization::PolicyRegistry::default(),
5755 forbidden_response: crate::authorization::ForbiddenResponse::default(),
5756 auth_session_key: "user_id".to_owned(),
5757 shared_cache: None,
5758 };
5759 crate::router::build_router(routes, config, state)
5760 }
5761
5762 #[tokio::test]
5763 async fn cors_wildcard_allows_any_origin() {
5764 let mut config = AutumnConfig::default();
5765 config.cors.allowed_origins = vec!["*".to_owned()];
5766 let router = test_router_with_config(vec![test_get_route("/test", "test")], &config);
5767
5768 let response = router
5769 .oneshot(
5770 Request::builder()
5771 .uri("/test")
5772 .header("Origin", "https://example.com")
5773 .body(Body::empty())
5774 .unwrap(),
5775 )
5776 .await
5777 .unwrap();
5778
5779 assert_eq!(response.status(), StatusCode::OK);
5780 assert_eq!(
5781 response
5782 .headers()
5783 .get("access-control-allow-origin")
5784 .unwrap(),
5785 "*"
5786 );
5787 }
5788
5789 #[tokio::test]
5790 async fn cors_specific_origin_reflected() {
5791 let mut config = AutumnConfig::default();
5792 config.cors.allowed_origins = vec!["https://example.com".to_owned()];
5793 let router = test_router_with_config(vec![test_get_route("/test", "test")], &config);
5794
5795 let response = router
5796 .oneshot(
5797 Request::builder()
5798 .uri("/test")
5799 .header("Origin", "https://example.com")
5800 .body(Body::empty())
5801 .unwrap(),
5802 )
5803 .await
5804 .unwrap();
5805
5806 assert_eq!(response.status(), StatusCode::OK);
5807 assert_eq!(
5808 response
5809 .headers()
5810 .get("access-control-allow-origin")
5811 .unwrap(),
5812 "https://example.com"
5813 );
5814 }
5815
5816 #[tokio::test]
5817 async fn cors_disabled_when_no_origins() {
5818 let config = AutumnConfig::default();
5819 assert!(config.cors.allowed_origins.is_empty());
5820 let router = test_router_with_config(vec![test_get_route("/test", "test")], &config);
5821
5822 let response = router
5823 .oneshot(
5824 Request::builder()
5825 .uri("/test")
5826 .header("Origin", "https://example.com")
5827 .body(Body::empty())
5828 .unwrap(),
5829 )
5830 .await
5831 .unwrap();
5832
5833 assert_eq!(response.status(), StatusCode::OK);
5834 assert!(
5835 response
5836 .headers()
5837 .get("access-control-allow-origin")
5838 .is_none()
5839 );
5840 }
5841
5842 #[tokio::test]
5843 async fn cors_preflight_returns_204() {
5844 let mut config = AutumnConfig::default();
5845 config.cors.allowed_origins = vec!["https://example.com".to_owned()];
5846 let router = test_router_with_config(vec![test_get_route("/test", "test")], &config);
5847
5848 let response = router
5849 .oneshot(
5850 Request::builder()
5851 .method("OPTIONS")
5852 .uri("/test")
5853 .header("Origin", "https://example.com")
5854 .header("Access-Control-Request-Method", "GET")
5855 .body(Body::empty())
5856 .unwrap(),
5857 )
5858 .await
5859 .unwrap();
5860
5861 assert_eq!(response.status(), StatusCode::OK);
5862 assert!(
5863 response
5864 .headers()
5865 .contains_key("access-control-allow-methods")
5866 );
5867 }
5868
5869 #[tokio::test]
5870 async fn build_router_with_static_skips_without_manifest() {
5871 let tmp = tempfile::tempdir().expect("tempdir");
5874 let dist = tmp.path().join("dist");
5875 std::fs::create_dir_all(&dist).expect("mkdir");
5876 let config = AutumnConfig::default();
5879 let state = AppState {
5880 extensions: std::sync::Arc::new(std::sync::RwLock::new(
5881 std::collections::HashMap::new(),
5882 )),
5883 #[cfg(feature = "db")]
5884 pool: None,
5885 #[cfg(feature = "db")]
5886 replica_pool: None,
5887 profile: None,
5888 started_at: std::time::Instant::now(),
5889 health_detailed: true,
5890 probes: crate::probe::ProbeState::ready_for_test(),
5891 metrics: crate::middleware::MetricsCollector::new(),
5892 log_levels: crate::actuator::LogLevels::new("info"),
5893 task_registry: crate::actuator::TaskRegistry::new(),
5894 job_registry: crate::actuator::JobRegistry::new(),
5895 config_props: crate::actuator::ConfigProperties::default(),
5896 #[cfg(feature = "ws")]
5897 channels: crate::channels::Channels::new(32),
5898 #[cfg(feature = "ws")]
5899 shutdown: tokio_util::sync::CancellationToken::new(),
5900 policy_registry: crate::authorization::PolicyRegistry::default(),
5901 forbidden_response: crate::authorization::ForbiddenResponse::default(),
5902 auth_session_key: "user_id".to_owned(),
5903 shared_cache: None,
5904 };
5905 let router = crate::router::build_router_with_static(
5906 vec![test_get_route("/test", "test")],
5907 &config,
5908 state,
5909 Some(dist.as_path()),
5910 );
5911
5912 let response = router
5913 .oneshot(Request::builder().uri("/test").body(Body::empty()).unwrap())
5914 .await
5915 .unwrap();
5916 assert_eq!(response.status(), StatusCode::OK);
5917 }
5918
5919 #[tokio::test]
5920 async fn build_router_with_static_none_dist() {
5921 let config = AutumnConfig::default();
5923 let state = AppState {
5924 extensions: std::sync::Arc::new(std::sync::RwLock::new(
5925 std::collections::HashMap::new(),
5926 )),
5927 #[cfg(feature = "db")]
5928 pool: None,
5929 #[cfg(feature = "db")]
5930 replica_pool: None,
5931 profile: None,
5932 started_at: std::time::Instant::now(),
5933 health_detailed: true,
5934 probes: crate::probe::ProbeState::ready_for_test(),
5935 metrics: crate::middleware::MetricsCollector::new(),
5936 log_levels: crate::actuator::LogLevels::new("info"),
5937 task_registry: crate::actuator::TaskRegistry::new(),
5938 job_registry: crate::actuator::JobRegistry::new(),
5939 config_props: crate::actuator::ConfigProperties::default(),
5940 #[cfg(feature = "ws")]
5941 channels: crate::channels::Channels::new(32),
5942 #[cfg(feature = "ws")]
5943 shutdown: tokio_util::sync::CancellationToken::new(),
5944 policy_registry: crate::authorization::PolicyRegistry::default(),
5945 forbidden_response: crate::authorization::ForbiddenResponse::default(),
5946 auth_session_key: "user_id".to_owned(),
5947 shared_cache: None,
5948 };
5949 let router = crate::router::build_router_with_static(
5950 vec![test_get_route("/test", "test")],
5951 &config,
5952 state,
5953 None,
5954 );
5955
5956 let response = router
5957 .oneshot(Request::builder().uri("/test").body(Body::empty()).unwrap())
5958 .await
5959 .unwrap();
5960 assert_eq!(response.status(), StatusCode::OK);
5961 }
5962
5963 #[test]
5966 fn format_route_lines_lists_user_routes() {
5967 let routes = vec![
5968 test_get_route("/", "index"),
5969 test_get_route("/users/{id}", "get_user"),
5970 ];
5971 let config = AutumnConfig::default();
5972 let output = format_route_lines(&routes, &[], &config);
5973 assert!(output.contains("-> index"));
5974 assert!(output.contains("/ GET"));
5975 assert!(output.contains("/users/{id}"));
5976 assert!(output.contains("-> get_user"));
5977 }
5978
5979 #[test]
5980 fn config_runtime_drift_format_route_lines_uses_actuator_prefix() {
5981 let mut config = AutumnConfig::default();
5982 config.actuator.prefix = "/ops".to_owned();
5983 let output = format_route_lines(&[], &[], &config);
5984 assert!(output.contains("-> health"));
5985 assert!(output.contains("/ops/*"));
5986 }
5987
5988 #[test]
5989 fn format_task_lines_none_when_empty() {
5990 assert!(format_task_lines(&[]).is_none());
5991 }
5992
5993 #[test]
5994 fn format_task_lines_fixed_delay() {
5995 let tasks = vec![crate::task::TaskInfo {
5996 name: "cleanup".into(),
5997 schedule: crate::task::Schedule::FixedDelay(std::time::Duration::from_secs(300)),
5998 coordination: crate::task::TaskCoordination::Fleet,
5999 handler: |_| Box::pin(async { Ok(()) }),
6000 }];
6001 let output = format_task_lines(&tasks).unwrap();
6002 assert!(output.contains("cleanup (every 300s)"));
6003 }
6004
6005 #[test]
6006 fn format_task_lines_cron() {
6007 let tasks = vec![crate::task::TaskInfo {
6008 name: "nightly".into(),
6009 schedule: crate::task::Schedule::Cron {
6010 expression: "0 0 * * *".into(),
6011 timezone: None,
6012 },
6013 coordination: crate::task::TaskCoordination::Fleet,
6014 handler: |_| Box::pin(async { Ok(()) }),
6015 }];
6016 let output = format_task_lines(&tasks).unwrap();
6017 assert!(output.contains("nightly (cron 0 0 * * *)"));
6018 }
6019
6020 #[test]
6021 fn format_middleware_list_default() {
6022 let config = AutumnConfig::default();
6023 let output = format_middleware_list(&config);
6024 assert!(output.contains("RequestId"));
6025 assert!(output.contains("SecurityHeaders"));
6026 assert!(output.contains("Session (in-memory)"));
6027 assert!(output.contains("Metrics"));
6028 assert!(!output.contains("CORS"));
6030 assert!(!output.contains("CSRF"));
6031 }
6032
6033 #[test]
6034 fn format_middleware_list_with_cors_and_csrf() {
6035 let config = AutumnConfig {
6036 cors: crate::config::CorsConfig {
6037 allowed_origins: vec!["https://example.com".into()],
6038 ..crate::config::CorsConfig::default()
6039 },
6040 security: crate::security::config::SecurityConfig {
6041 csrf: crate::security::config::CsrfConfig {
6042 enabled: true,
6043 ..crate::security::config::CsrfConfig::default()
6044 },
6045 ..crate::security::config::SecurityConfig::default()
6046 },
6047 ..AutumnConfig::default()
6048 };
6049 let output = format_middleware_list(&config);
6050 assert!(output.contains("CORS"));
6051 assert!(output.contains("CSRF"));
6052 }
6053
6054 #[test]
6055 fn mask_database_url_with_password() {
6056 let masked = mask_database_url("postgres://user:secret@localhost:5432/mydb", 10);
6057 assert!(masked.contains("****"));
6058 assert!(!masked.contains("secret"));
6059 assert!(masked.contains("postgres://user:****@localhost:5432/mydb"));
6060 assert!(masked.contains("pool_size=10"));
6061 }
6062
6063 #[test]
6064 fn mask_database_url_without_password() {
6065 let masked = mask_database_url("postgres://localhost/mydb", 5);
6066 assert!(!masked.contains("****"));
6067 assert!(masked.contains("postgres://localhost/mydb"));
6068 assert!(masked.contains("pool_size=5"));
6069 }
6070
6071 #[test]
6072 fn mask_database_url_edge_cases() {
6073 let masked2 = mask_database_url("postgres://user:p%40ssw%3Ard%21@localhost:5432/mydb", 10);
6080 assert!(masked2.contains("****"));
6081 assert!(!masked2.contains("p%40ssw%3Ard%21"));
6082 assert!(masked2.contains("postgres://user:****@localhost:5432/mydb"));
6083
6084 let masked3 = mask_database_url("postgres://:secret@localhost:5432/mydb", 10);
6086 assert!(masked3.contains("****"));
6087 assert!(!masked3.contains("secret"));
6088 assert!(masked3.contains("postgres://:****@localhost:5432/mydb"));
6089 }
6090 #[test]
6091 fn mask_database_url_invalid_url_fallback() {
6092 let masked = mask_database_url("this is completely invalid as a URL with supersecret", 10);
6093 assert!(masked.contains("****"));
6094 assert!(!masked.contains("supersecret"));
6095 assert!(masked.contains("pool_size=10"));
6096 }
6097
6098 #[test]
6099 fn format_config_summary_defaults() {
6100 let config = AutumnConfig::default();
6101 let output = format_config_summary(&config);
6102 assert!(output.contains("profile: none"));
6103 assert!(output.contains("server: 127.0.0.1:3000"));
6104 assert!(output.contains("database: not configured"));
6105 assert!(output.contains("log_level:"));
6106 assert!(output.contains("telemetry: disabled"));
6107 assert!(output.contains("health: /health"));
6108 }
6109
6110 #[test]
6111 fn format_config_summary_with_db() {
6112 let config = AutumnConfig {
6113 database: crate::config::DatabaseConfig {
6114 url: Some("postgres://user:pass@host/db".into()),
6115 pool_size: 20,
6116 ..crate::config::DatabaseConfig::default()
6117 },
6118 ..AutumnConfig::default()
6119 };
6120 let output = format_config_summary(&config);
6121 assert!(output.contains("user:****@host/db"));
6122 assert!(output.contains("pool_size=20"));
6123 assert!(!output.contains("pass"));
6124 }
6125
6126 #[test]
6127 fn format_config_summary_with_profile() {
6128 let config = AutumnConfig {
6129 profile: Some("prod".into()),
6130 ..AutumnConfig::default()
6131 };
6132 let output = format_config_summary(&config);
6133 assert!(output.contains("profile: prod"));
6134 }
6135
6136 #[test]
6137 fn format_config_summary_with_telemetry() {
6138 let config = AutumnConfig {
6139 telemetry: crate::config::TelemetryConfig {
6140 enabled: true,
6141 service_name: "orders-api".into(),
6142 otlp_endpoint: Some("http://otel-collector:4317".into()),
6143 ..crate::config::TelemetryConfig::default()
6144 },
6145 ..AutumnConfig::default()
6146 };
6147 let output = format_config_summary(&config);
6148 assert!(output.contains("telemetry: Grpc -> http://otel-collector:4317"));
6149 }
6150
6151 #[test]
6152 fn log_startup_transparency_runs_without_panic() {
6153 let routes = vec![test_get_route("/", "index")];
6157 let tasks = vec![crate::task::TaskInfo {
6158 name: "cleanup".into(),
6159 schedule: crate::task::Schedule::FixedDelay(std::time::Duration::from_secs(60)),
6160 coordination: crate::task::TaskCoordination::Fleet,
6161 handler: |_| Box::pin(async { Ok(()) }),
6162 }];
6163 let config = AutumnConfig::default();
6164 log_startup_transparency(&routes, &tasks, &[], &config);
6165 }
6166
6167 #[test]
6168 fn log_startup_transparency_no_tasks() {
6169 let routes = vec![test_get_route("/health", "check")];
6170 let config = AutumnConfig::default();
6171 log_startup_transparency(&routes, &[], &[], &config);
6172 }
6173
6174 #[cfg(feature = "ws")]
6175 #[tokio::test]
6176 async fn start_task_scheduler_broadcasts_events() {
6177 let state = AppState {
6178 extensions: std::sync::Arc::new(std::sync::RwLock::new(
6179 std::collections::HashMap::new(),
6180 )),
6181 #[cfg(feature = "db")]
6182 pool: None,
6183 #[cfg(feature = "db")]
6184 replica_pool: None,
6185 profile: None,
6186 started_at: std::time::Instant::now(),
6187 health_detailed: true,
6188 probes: crate::probe::ProbeState::ready_for_test(),
6189 metrics: crate::middleware::MetricsCollector::new(),
6190 log_levels: crate::actuator::LogLevels::new("info"),
6191 task_registry: crate::actuator::TaskRegistry::new(),
6192 job_registry: crate::actuator::JobRegistry::new(),
6193 config_props: crate::actuator::ConfigProperties::default(),
6194 channels: crate::channels::Channels::new(32),
6195 shutdown: tokio_util::sync::CancellationToken::new(),
6196 policy_registry: crate::authorization::PolicyRegistry::default(),
6197 forbidden_response: crate::authorization::ForbiddenResponse::default(),
6198 auth_session_key: "user_id".to_owned(),
6199 shared_cache: None,
6200 };
6201
6202 let mut rx = state.channels().subscribe("sys:tasks");
6203
6204 let task = crate::task::TaskInfo {
6205 name: "test_broadcaster".into(),
6206 schedule: crate::task::Schedule::FixedDelay(std::time::Duration::from_millis(1)),
6208 coordination: crate::task::TaskCoordination::Fleet,
6209 handler: |_| Box::pin(async { Ok(()) }),
6210 };
6211
6212 let state_clone = state.clone();
6214 tokio::spawn(async move {
6215 super::start_task_scheduler(
6216 vec![task],
6217 &state_clone,
6218 &tokio_util::sync::CancellationToken::new(),
6219 );
6220 });
6221
6222 let msg1 = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
6224 .await
6225 .expect("timeout waiting for start event")
6226 .expect("channel closed");
6227 let json1: serde_json::Value = serde_json::from_str(msg1.as_str()).unwrap();
6228 assert_eq!(json1["event"], "started");
6229 assert_eq!(json1["task"], "test_broadcaster");
6230
6231 let msg2 = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
6233 .await
6234 .expect("timeout waiting for success event")
6235 .expect("channel closed");
6236 let json2: serde_json::Value = serde_json::from_str(msg2.as_str()).unwrap();
6237 assert_eq!(json2["event"], "success");
6238 assert_eq!(json2["task"], "test_broadcaster");
6239 assert!(json2.get("duration_ms").is_some());
6240 }
6241
6242 #[cfg(feature = "ws")]
6243 #[tokio::test]
6244 async fn start_task_scheduler_broadcasts_failure_events() {
6245 let state = AppState {
6246 extensions: std::sync::Arc::new(std::sync::RwLock::new(
6247 std::collections::HashMap::new(),
6248 )),
6249 #[cfg(feature = "db")]
6250 pool: None,
6251 #[cfg(feature = "db")]
6252 replica_pool: None,
6253 profile: None,
6254 started_at: std::time::Instant::now(),
6255 health_detailed: true,
6256 probes: crate::probe::ProbeState::ready_for_test(),
6257 metrics: crate::middleware::MetricsCollector::new(),
6258 log_levels: crate::actuator::LogLevels::new("info"),
6259 task_registry: crate::actuator::TaskRegistry::new(),
6260 job_registry: crate::actuator::JobRegistry::new(),
6261 config_props: crate::actuator::ConfigProperties::default(),
6262 channels: crate::channels::Channels::new(32),
6263 shutdown: tokio_util::sync::CancellationToken::new(),
6264 policy_registry: crate::authorization::PolicyRegistry::default(),
6265 forbidden_response: crate::authorization::ForbiddenResponse::default(),
6266 auth_session_key: "user_id".to_owned(),
6267 shared_cache: None,
6268 };
6269
6270 let mut rx = state.channels().subscribe("sys:tasks");
6271
6272 let task = crate::task::TaskInfo {
6273 name: "test_failing_task".into(),
6274 schedule: crate::task::Schedule::FixedDelay(std::time::Duration::from_millis(1)),
6275 coordination: crate::task::TaskCoordination::Fleet,
6276 handler: |_| {
6277 Box::pin(async { Err(crate::AutumnError::bad_request_msg("forced error")) })
6278 },
6279 };
6280
6281 let state_clone = state.clone();
6282 tokio::spawn(async move {
6283 super::start_task_scheduler(
6284 vec![task],
6285 &state_clone,
6286 &tokio_util::sync::CancellationToken::new(),
6287 );
6288 });
6289
6290 let _ = rx.recv().await.unwrap();
6292
6293 let msg2 = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
6295 .await
6296 .expect("timeout waiting for failure event")
6297 .expect("channel closed");
6298 let json2: serde_json::Value = serde_json::from_str(msg2.as_str()).unwrap();
6299 assert_eq!(json2["event"], "failure");
6300 assert_eq!(json2["task"], "test_failing_task");
6301 assert_eq!(json2["error"], "forced error");
6302 }
6303
6304 #[tokio::test]
6305 async fn execute_task_result_ok_returns_duration() {
6306 let state = AppState::for_test();
6307 let handler: crate::task::TaskHandler = |_| Box::pin(async { Ok(()) });
6308 let start = std::time::Instant::now();
6309 let result =
6310 super::execute_task_result(&state, handler, start, "test_task", "fixed_delay").await;
6311 assert!(result.is_ok(), "expected Ok from successful handler");
6312 assert!(result.unwrap() < u64::MAX);
6314 }
6315
6316 #[tokio::test]
6317 async fn execute_task_result_err_returns_duration_and_message() {
6318 let state = AppState::for_test();
6319 let handler: crate::task::TaskHandler =
6320 |_| Box::pin(async { Err(crate::AutumnError::bad_request_msg("test error")) });
6321 let start = std::time::Instant::now();
6322 let result =
6323 super::execute_task_result(&state, handler, start, "test_task", "fixed_delay").await;
6324 assert!(result.is_err(), "expected Err from failing handler");
6325 let (duration_ms, msg) = result.unwrap_err();
6326 assert!(duration_ms < u64::MAX);
6327 assert!(msg.contains("test error"));
6328 }
6329
6330 fn instantly_panicking_scheduled_handler(
6331 _state: AppState,
6332 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = crate::AutumnResult<()>> + Send>> {
6333 panic!("panic before scheduled future")
6334 }
6335
6336 #[tokio::test]
6337 async fn execute_task_result_reports_immediate_handler_panics() {
6338 let state = AppState::for_test();
6339 let start = std::time::Instant::now();
6340 let result = super::execute_task_result(
6341 &state,
6342 instantly_panicking_scheduled_handler,
6343 start,
6344 "test_task",
6345 "fixed_delay",
6346 )
6347 .await;
6348
6349 let (duration_ms, msg) = result.expect_err("expected Err from panicking handler");
6350 assert!(duration_ms < u64::MAX);
6351 assert!(msg.contains("scheduled task handler panicked: panic before scheduled future"));
6352 }
6353
6354 #[tokio::test]
6355 async fn execute_fixed_delay_task_does_not_timeout_in_process_runs() {
6356 let state = AppState::for_test();
6357 state.task_registry.register_scheduled(
6358 "slow_task",
6359 "every 1s",
6360 crate::task::TaskCoordination::Fleet,
6361 "in_process",
6362 "replica-a",
6363 );
6364 let handler: crate::task::TaskHandler = |_| {
6365 Box::pin(async {
6366 tokio::time::sleep(std::time::Duration::from_millis(30)).await;
6367 Ok(())
6368 })
6369 };
6370 let coordinator = std::sync::Arc::new(
6371 crate::scheduler::InProcessSchedulerCoordinator::new("replica-a"),
6372 );
6373
6374 super::execute_fixed_delay_task(
6375 "slow_task".to_owned(),
6376 state.clone(),
6377 handler,
6378 std::time::Duration::from_secs(1),
6379 crate::task::TaskCoordination::Fleet,
6380 coordinator,
6381 std::time::Duration::from_millis(10),
6382 )
6383 .await;
6384
6385 let snapshot = state.task_registry.snapshot();
6386 let status = &snapshot["slow_task"];
6387 assert_eq!(status.status, "idle");
6388 assert_eq!(status.last_result.as_deref(), Some("ok"));
6389 assert_eq!(status.total_runs, 1);
6390 assert_eq!(status.total_failures, 0);
6391 assert!(status.last_error.is_none());
6392 }
6393
6394 static SKIPPED_LEASE_HANDLER_CALLS: AtomicUsize = AtomicUsize::new(0);
6395
6396 struct DenyingSchedulerCoordinator;
6397
6398 impl crate::scheduler::SchedulerCoordinator for DenyingSchedulerCoordinator {
6399 fn backend(&self) -> &'static str {
6400 "postgres"
6401 }
6402
6403 fn replica_id(&self) -> &'static str {
6404 "replica-a"
6405 }
6406
6407 fn try_acquire<'a>(
6408 &'a self,
6409 _task_name: &'a str,
6410 _tick_key: &'a str,
6411 _coordination: crate::task::TaskCoordination,
6412 ) -> crate::scheduler::SchedulerFuture<
6413 'a,
6414 crate::AutumnResult<Option<crate::scheduler::SchedulerLease>>,
6415 > {
6416 Box::pin(async { Ok(None) })
6417 }
6418 }
6419
6420 struct GrantingSchedulerCoordinator {
6421 backend: &'static str,
6422 tick_keys: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
6423 release_count: Option<std::sync::Arc<AtomicUsize>>,
6424 }
6425
6426 impl crate::scheduler::SchedulerCoordinator for GrantingSchedulerCoordinator {
6427 fn backend(&self) -> &'static str {
6428 self.backend
6429 }
6430
6431 fn replica_id(&self) -> &'static str {
6432 "replica-a"
6433 }
6434
6435 fn try_acquire<'a>(
6436 &'a self,
6437 _task_name: &'a str,
6438 tick_key: &'a str,
6439 _coordination: crate::task::TaskCoordination,
6440 ) -> crate::scheduler::SchedulerFuture<
6441 'a,
6442 crate::AutumnResult<Option<crate::scheduler::SchedulerLease>>,
6443 > {
6444 Box::pin(async move {
6445 self.tick_keys.lock().unwrap().push(tick_key.to_owned());
6446 let lease = self.release_count.as_ref().map_or_else(
6447 || crate::scheduler::SchedulerLease::local(self.backend, "replica-a"),
6448 |release_count| {
6449 crate::scheduler::SchedulerLease::tracked(
6450 self.backend,
6451 "replica-a",
6452 std::sync::Arc::clone(release_count),
6453 )
6454 },
6455 );
6456 Ok(Some(lease))
6457 })
6458 }
6459 }
6460
6461 fn counted_scheduled_handler(
6462 _state: AppState,
6463 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = crate::AutumnResult<()>> + Send>> {
6464 Box::pin(async {
6465 SKIPPED_LEASE_HANDLER_CALLS.fetch_add(1, Ordering::SeqCst);
6466 Ok(())
6467 })
6468 }
6469
6470 #[tokio::test]
6471 async fn execute_fixed_delay_task_skips_handler_when_lease_is_not_acquired() {
6472 SKIPPED_LEASE_HANDLER_CALLS.store(0, Ordering::SeqCst);
6473 let state = AppState::for_test();
6474 state.task_registry.register_scheduled(
6475 "claimed_elsewhere",
6476 "every 1s",
6477 crate::task::TaskCoordination::Fleet,
6478 "postgres",
6479 "replica-a",
6480 );
6481 let coordinator = std::sync::Arc::new(DenyingSchedulerCoordinator);
6482
6483 super::execute_fixed_delay_task(
6484 "claimed_elsewhere".to_owned(),
6485 state.clone(),
6486 counted_scheduled_handler,
6487 std::time::Duration::from_secs(1),
6488 crate::task::TaskCoordination::Fleet,
6489 coordinator,
6490 std::time::Duration::from_secs(1),
6491 )
6492 .await;
6493
6494 let snapshot = state.task_registry.snapshot();
6495 let status = &snapshot["claimed_elsewhere"];
6496 assert_eq!(SKIPPED_LEASE_HANDLER_CALLS.load(Ordering::SeqCst), 0);
6497 assert_eq!(status.total_runs, 0);
6498 assert!(status.current_leader.is_none());
6499 assert!(status.last_tick.is_none());
6500 }
6501
6502 #[tokio::test]
6503 async fn execute_fixed_delay_task_records_distributed_lease_ttl_timeout() {
6504 let state = AppState::for_test();
6505 state.task_registry.register_scheduled(
6506 "slow_distributed_task",
6507 "every 1s",
6508 crate::task::TaskCoordination::Fleet,
6509 "postgres",
6510 "replica-a",
6511 );
6512 let handler: crate::task::TaskHandler = |_| {
6513 Box::pin(async {
6514 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
6515 Ok(())
6516 })
6517 };
6518 let coordinator = std::sync::Arc::new(GrantingSchedulerCoordinator {
6519 backend: "postgres",
6520 tick_keys: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
6521 release_count: None,
6522 });
6523
6524 super::execute_fixed_delay_task(
6525 "slow_distributed_task".to_owned(),
6526 state.clone(),
6527 handler,
6528 std::time::Duration::from_secs(1),
6529 crate::task::TaskCoordination::Fleet,
6530 coordinator,
6531 std::time::Duration::from_millis(10),
6532 )
6533 .await;
6534
6535 let snapshot = state.task_registry.snapshot();
6536 let status = &snapshot["slow_distributed_task"];
6537 assert_eq!(status.status, "idle");
6538 assert_eq!(status.last_result.as_deref(), Some("failed"));
6539 assert_eq!(status.total_runs, 1);
6540 assert_eq!(status.total_failures, 1);
6541 assert!(
6542 status
6543 .last_error
6544 .as_deref()
6545 .is_some_and(|error| error.contains("lease TTL"))
6546 );
6547 }
6548
6549 #[tokio::test]
6550 async fn execute_cron_task_uses_scheduled_occurrence_for_tick_key() {
6551 let state = AppState::for_test();
6552 state.task_registry.register_scheduled(
6553 "cron_review_task",
6554 "cron */10 * * * * *",
6555 crate::task::TaskCoordination::Fleet,
6556 "postgres",
6557 "replica-a",
6558 );
6559 let tick_keys = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
6560 let coordinator = std::sync::Arc::new(GrantingSchedulerCoordinator {
6561 backend: "postgres",
6562 tick_keys: std::sync::Arc::clone(&tick_keys),
6563 release_count: None,
6564 });
6565 let handler: crate::task::TaskHandler = |_| Box::pin(async { Ok(()) });
6566 let scheduled_unix_secs = 1_700_000_000;
6567
6568 super::execute_cron_task(
6569 "cron_review_task".to_owned(),
6570 state.clone(),
6571 handler,
6572 crate::task::TaskCoordination::Fleet,
6573 coordinator,
6574 std::time::Duration::from_secs(30),
6575 scheduled_unix_secs,
6576 )
6577 .await;
6578
6579 assert_eq!(
6580 tick_keys.lock().unwrap().as_slice(),
6581 ["cron_review_task:1700000000"]
6582 );
6583 }
6584
6585 #[tokio::test]
6586 async fn execute_fixed_delay_task_releases_lease_when_handler_panics() {
6587 let state = AppState::for_test();
6588 state.task_registry.register_scheduled(
6589 "panic_task",
6590 "every 1s",
6591 crate::task::TaskCoordination::Fleet,
6592 "postgres",
6593 "replica-a",
6594 );
6595 let release_count = std::sync::Arc::new(AtomicUsize::new(0));
6596 let coordinator = std::sync::Arc::new(GrantingSchedulerCoordinator {
6597 backend: "postgres",
6598 tick_keys: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
6599 release_count: Some(std::sync::Arc::clone(&release_count)),
6600 });
6601 let handler: crate::task::TaskHandler = |_| {
6602 Box::pin(async {
6603 panic!("forced scheduled panic");
6604 #[allow(unreachable_code)]
6605 Ok(())
6606 })
6607 };
6608
6609 super::execute_fixed_delay_task(
6610 "panic_task".to_owned(),
6611 state.clone(),
6612 handler,
6613 std::time::Duration::from_secs(1),
6614 crate::task::TaskCoordination::Fleet,
6615 coordinator,
6616 std::time::Duration::from_secs(30),
6617 )
6618 .await;
6619
6620 let snapshot = state.task_registry.snapshot();
6621 let status = &snapshot["panic_task"];
6622 assert_eq!(release_count.load(Ordering::SeqCst), 1);
6623 assert_eq!(status.status, "idle");
6624 assert_eq!(status.last_result.as_deref(), Some("failed"));
6625 assert_eq!(status.total_runs, 1);
6626 assert_eq!(status.total_failures, 1);
6627 assert!(
6628 status
6629 .last_error
6630 .as_deref()
6631 .is_some_and(|error| error.contains("scheduled task handler panicked"))
6632 );
6633 }
6634
6635 #[test]
6636 fn next_cron_occurrence_skips_overdue_slots() {
6637 use chrono::TimeZone as _;
6638
6639 let cron = "0 * * * * *"
6640 .parse::<croner::Cron>()
6641 .expect("cron expression should parse");
6642 let stale_cursor = chrono_tz::UTC
6643 .with_ymd_and_hms(2026, 5, 5, 12, 0, 0)
6644 .unwrap();
6645 let now = chrono_tz::UTC
6646 .with_ymd_and_hms(2026, 5, 5, 12, 30, 5)
6647 .unwrap();
6648 let next = super::next_cron_occurrence_after(&cron, &stale_cursor, &now)
6649 .expect("next cron occurrence should resolve");
6650
6651 assert_eq!(
6652 next,
6653 chrono_tz::UTC
6654 .with_ymd_and_hms(2026, 5, 5, 12, 31, 0)
6655 .unwrap()
6656 );
6657 }
6658
6659 #[test]
6660 fn cron_occurrence_is_overdue_after_later_slot_passed() {
6661 use chrono::TimeZone as _;
6662
6663 let cron = "0 * * * * *"
6664 .parse::<croner::Cron>()
6665 .expect("cron expression should parse");
6666 let scheduled_at = chrono_tz::UTC
6667 .with_ymd_and_hms(2026, 5, 5, 12, 1, 0)
6668 .unwrap();
6669 let slightly_late = chrono_tz::UTC
6670 .with_ymd_and_hms(2026, 5, 5, 12, 1, 5)
6671 .unwrap();
6672 let after_later_slot = chrono_tz::UTC
6673 .with_ymd_and_hms(2026, 5, 5, 12, 30, 5)
6674 .unwrap();
6675
6676 assert!(
6677 !super::cron_occurrence_is_overdue(&cron, &scheduled_at, &slightly_late)
6678 .expect("overdue check should resolve")
6679 );
6680 assert!(
6681 super::cron_occurrence_is_overdue(&cron, &scheduled_at, &after_later_slot)
6682 .expect("overdue check should resolve")
6683 );
6684 }
6685
6686 #[cfg(feature = "storage")]
6687 mod storage_preflight {
6688 use super::super::{StorageBootstrap, preflight_storage};
6689 use crate::AppState;
6690 use crate::config::AutumnConfig;
6691 use crate::storage::{BlobStoreState, StorageBackend, StorageConfig, StorageLocalConfig};
6692
6693 fn config_with_storage(storage: StorageConfig) -> AutumnConfig {
6694 AutumnConfig {
6695 profile: Some("dev".into()),
6696 storage,
6697 ..AutumnConfig::default()
6698 }
6699 }
6700
6701 #[test]
6702 fn preflight_returns_none_when_disabled() {
6703 let cfg = config_with_storage(StorageConfig {
6704 backend: StorageBackend::Disabled,
6705 ..StorageConfig::default()
6706 });
6707 assert!(preflight_storage(&cfg).is_none());
6708 }
6709
6710 #[test]
6711 fn preflight_provisions_local_backend_against_tempdir() {
6712 let dir = tempfile::tempdir().unwrap();
6713 let cfg = config_with_storage(StorageConfig {
6714 backend: StorageBackend::Local,
6715 local: StorageLocalConfig {
6716 root: dir.path().to_path_buf(),
6717 ..StorageLocalConfig::default()
6718 },
6719 ..StorageConfig::default()
6720 });
6721 let bootstrap = preflight_storage(&cfg).expect("local backend should provision");
6722 assert_eq!(bootstrap.store.provider_id(), "default");
6723 assert!(bootstrap.serving.is_some(), "local backend mounts a route");
6724 }
6725
6726 #[tokio::test]
6727 async fn install_registers_blob_store_on_state() {
6728 let dir = tempfile::tempdir().unwrap();
6729 let cfg = config_with_storage(StorageConfig {
6730 backend: StorageBackend::Local,
6731 local: StorageLocalConfig {
6732 root: dir.path().to_path_buf(),
6733 ..StorageLocalConfig::default()
6734 },
6735 ..StorageConfig::default()
6736 });
6737 let bootstrap: StorageBootstrap = preflight_storage(&cfg).unwrap();
6738
6739 let state = AppState::for_test();
6740 assert!(state.extension::<BlobStoreState>().is_none());
6741 let serving = bootstrap.install(&state);
6742 assert!(serving.is_some());
6743 assert!(state.extension::<BlobStoreState>().is_some());
6744 }
6745
6746 #[test]
6747 fn with_blob_store_stores_custom_store() {
6748 use crate::storage::{
6749 Blob, BlobFuture, BlobMeta, BlobStore, BlobStoreError, ByteStream,
6750 };
6751 use bytes::Bytes;
6752 use std::time::Duration;
6753
6754 struct FakeStore;
6755 impl BlobStore for FakeStore {
6756 fn provider_id(&self) -> &'static str {
6757 "fake"
6758 }
6759 fn put<'a>(&'a self, _k: &'a str, _ct: &'a str, _b: Bytes) -> BlobFuture<'a, Blob> {
6760 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
6761 }
6762 fn put_stream<'a>(
6763 &'a self,
6764 _k: &'a str,
6765 _ct: &'a str,
6766 _d: ByteStream<'a>,
6767 ) -> BlobFuture<'a, Blob> {
6768 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
6769 }
6770 fn get<'a>(&'a self, _k: &'a str) -> BlobFuture<'a, Bytes> {
6771 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
6772 }
6773 fn delete<'a>(&'a self, _k: &'a str) -> BlobFuture<'a, ()> {
6774 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
6775 }
6776 fn head<'a>(&'a self, _k: &'a str) -> BlobFuture<'a, Option<BlobMeta>> {
6777 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
6778 }
6779 fn presigned_url<'a>(
6780 &'a self,
6781 _k: &'a str,
6782 _e: Duration,
6783 ) -> BlobFuture<'a, String> {
6784 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
6785 }
6786 }
6787
6788 let builder = crate::app().with_blob_store(FakeStore);
6789 assert!(builder.blob_store.is_some());
6790 }
6791
6792 #[tokio::test]
6793 async fn with_blob_store_is_installed_on_state() {
6794 use crate::storage::{
6795 Blob, BlobFuture, BlobMeta, BlobStore, BlobStoreError, ByteStream,
6796 };
6797 use bytes::Bytes;
6798 use std::time::Duration;
6799
6800 struct FakeStore;
6801 impl BlobStore for FakeStore {
6802 fn provider_id(&self) -> &'static str {
6803 "fake-installed"
6804 }
6805 fn put<'a>(&'a self, _k: &'a str, _ct: &'a str, _b: Bytes) -> BlobFuture<'a, Blob> {
6806 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
6807 }
6808 fn put_stream<'a>(
6809 &'a self,
6810 _k: &'a str,
6811 _ct: &'a str,
6812 _d: ByteStream<'a>,
6813 ) -> BlobFuture<'a, Blob> {
6814 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
6815 }
6816 fn get<'a>(&'a self, _k: &'a str) -> BlobFuture<'a, Bytes> {
6817 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
6818 }
6819 fn delete<'a>(&'a self, _k: &'a str) -> BlobFuture<'a, ()> {
6820 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
6821 }
6822 fn head<'a>(&'a self, _k: &'a str) -> BlobFuture<'a, Option<BlobMeta>> {
6823 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
6824 }
6825 fn presigned_url<'a>(
6826 &'a self,
6827 _k: &'a str,
6828 _e: Duration,
6829 ) -> BlobFuture<'a, String> {
6830 Box::pin(async { Err(BlobStoreError::Unsupported("fake".into())) })
6831 }
6832 }
6833
6834 let builder = crate::app().with_blob_store(FakeStore);
6835 let bootstrap = builder.blob_store.map(|store| StorageBootstrap {
6836 store,
6837 serving: None,
6838 });
6839 let state = AppState::for_test();
6840 assert!(state.extension::<BlobStoreState>().is_none());
6841 if let Some(b) = bootstrap {
6842 b.install(&state);
6843 }
6844 let installed = state
6845 .extension::<BlobStoreState>()
6846 .expect("store should be installed");
6847 assert_eq!(installed.store().provider_id(), "fake-installed");
6848 }
6849 }
6850
6851 struct TestPlugin {
6855 name: &'static str,
6856 route: Route,
6857 }
6858
6859 impl crate::plugin::Plugin for TestPlugin {
6860 fn name(&self) -> std::borrow::Cow<'static, str> {
6861 std::borrow::Cow::Borrowed(self.name)
6862 }
6863
6864 fn build(self, app: AppBuilder) -> AppBuilder {
6865 app.routes(vec![self.route])
6866 }
6867 }
6868
6869 #[test]
6870 fn routes_registered_before_plugin_are_user_sourced() {
6871 let user_route = test_get_route("/home", "home");
6872 let builder = app().routes(vec![user_route]);
6873 assert_eq!(builder.route_sources.len(), 1);
6874 assert_eq!(
6875 builder.route_sources[0],
6876 crate::route_listing::RouteSource::User
6877 );
6878 }
6879
6880 #[test]
6881 fn routes_registered_inside_plugin_are_plugin_sourced() {
6882 let plugin_route = test_get_route("/plugin-page", "plugin_page");
6883 let plugin = TestPlugin {
6884 name: "my-plugin",
6885 route: plugin_route,
6886 };
6887 let builder = app().plugin(plugin);
6888 assert_eq!(builder.route_sources.len(), 1);
6889 assert_eq!(
6890 builder.route_sources[0],
6891 crate::route_listing::RouteSource::Plugin("my-plugin".to_owned())
6892 );
6893 }
6894
6895 #[test]
6896 fn routes_registered_after_plugin_revert_to_user_sourced() {
6897 let plugin_route = test_get_route("/plugin-page", "plugin_page");
6898 let user_route = test_get_route("/home", "home");
6899 let plugin = TestPlugin {
6900 name: "my-plugin",
6901 route: plugin_route,
6902 };
6903 let builder = app().plugin(plugin).routes(vec![user_route]);
6904 assert_eq!(builder.route_sources.len(), 2);
6905 assert_eq!(
6906 builder.route_sources[0],
6907 crate::route_listing::RouteSource::Plugin("my-plugin".to_owned())
6908 );
6909 assert_eq!(
6910 builder.route_sources[1],
6911 crate::route_listing::RouteSource::User
6912 );
6913 }
6914
6915 struct OuterPlugin;
6917
6918 impl crate::plugin::Plugin for OuterPlugin {
6919 fn name(&self) -> std::borrow::Cow<'static, str> {
6920 "outer".into()
6921 }
6922
6923 fn build(self, app: AppBuilder) -> AppBuilder {
6924 let inner = TestPlugin {
6925 name: "inner",
6926 route: test_get_route("/inner", "inner"),
6927 };
6928 app.plugin(inner)
6929 .routes(vec![test_get_route("/outer-after", "outer_after")])
6930 }
6931 }
6932
6933 #[test]
6934 fn outer_plugin_source_restored_after_nested_plugin() {
6935 let builder = app().plugin(OuterPlugin);
6936 assert_eq!(builder.route_sources.len(), 2);
6938 assert_eq!(
6939 builder.route_sources[0],
6940 crate::route_listing::RouteSource::Plugin("inner".to_owned()),
6941 "first route should be attributed to inner plugin"
6942 );
6943 assert_eq!(
6944 builder.route_sources[1],
6945 crate::route_listing::RouteSource::Plugin("outer".to_owned()),
6946 "second route should be re-attributed to outer plugin after nested build"
6947 );
6948 }
6949}