use crate::beans::{Bean, BeanRegistry, BeanState};
use crate::controller::Controller;
use crate::lifecycle::{ShutdownHook, StartupHook};
use crate::plugin::Plugin;
use crate::scheduling::{ScheduledTaskDef, SchedulerStartFn, SchedulerStopFn};
use crate::type_list::{BuildableFrom, TCons, TNil};
use std::marker::PhantomData;
use tracing::info;
/// Boxed one-shot callback that takes the application state `T` and returns a
/// pinned, boxed, `Send` future resolving to `()`.
///
/// `AppBuilder::register_controller` stores one of these per controller
/// (wrapping `Controller::register_consumers`), and `AppBuilder::serve` awaits
/// them in registration order with a clone of the state.
type ConsumerReg<T> =
Box<dyn FnOnce(T) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send>;
/// Boxed one-shot transformation from a `crate::http::Router` to a
/// `crate::http::Router`.
///
/// Instances are queued by `AppBuilder::with_layer` / `AppBuilder::with_layer_fn`
/// and applied in insertion order by `AppBuilder::build_inner`, after the state
/// has been attached to the merged router.
type LayerFn = Box<dyn FnOnce(crate::http::Router) -> crate::http::Router + Send>;
/// Placeholder state type used as the default for `AppBuilder`'s `T` parameter,
/// i.e. for a builder to which no real application state has been attached yet.
#[derive(Clone)]
pub struct NoState;
/// Builder data that is not parameterised by the state type.
///
/// It is handed to `AppBuilder::from_pre` when a state is attached, so the
/// configuration and layer callbacks survive that transition (the bean registry
/// is replaced with a fresh one there).
struct BuilderConfig {
/// Configuration stored by `AppBuilder::with_config`; `None` until then.
config: Option<crate::config::QuarlusConfig>,
/// Router transformations queued by `with_layer` / `with_layer_fn`, applied in
/// insertion order by `build_inner`.
custom_layers: Vec<LayerFn>,
/// Registry filled by `provide` / `with_bean` and consumed by
/// `try_build_state`.
bean_registry: BeanRegistry,
}
/// Fluent builder that collects routes, hooks, layers and scheduler wiring and
/// finally produces (or serves) a `crate::http::Router`.
///
/// * `T` is the application state type (`NoState` by default).
/// * `P` is a type-level list (`TNil` / `TCons`) of the bean types registered so
///   far through `provide` / `with_bean`; `build_state` requires the target
///   state to be `BuildableFrom<P, _>`.
pub struct AppBuilder<T: Clone + Send + Sync + 'static = NoState, P = TNil> {
/// State-independent data: configuration, layer callbacks and bean registry.
shared: BuilderConfig,
/// Application state; `from_pre` stores `Some`, and `build_inner` panics if
/// it is still `None`.
state: Option<T>,
/// Routers queued by `register_routes` / `register_controller`; merged in
/// order by `build_inner`.
routes: Vec<crate::http::Router<T>>,
/// Hooks added by `on_start`; `serve` awaits them in order before binding the
/// listener.
startup_hooks: Vec<StartupHook<T>>,
/// Hooks added by `on_stop`; `serve` awaits them in order once the server has
/// stopped.
shutdown_hooks: Vec<ShutdownHook>,
/// One `Vec<RouteInfo>` per controller registered via `register_controller`.
route_metadata: Vec<Vec<crate::openapi::RouteInfo>>,
/// Optional callback (set by `with_openapi_builder`) that receives
/// `route_metadata` and returns an extra router merged by `build_inner`.
openapi_builder:
Option<Box<dyn FnOnce(Vec<Vec<crate::openapi::RouteInfo>>) -> crate::http::Router<T> + Send>>,
/// Consumer registration callbacks, one per registered controller; awaited
/// by `serve`.
consumer_registrations: Vec<ConsumerReg<T>>,
/// Task definitions gathered from `Controller::scheduled_tasks`.
scheduled_task_defs: Vec<ScheduledTaskDef<T>>,
/// Scheduler start callback set by `set_scheduler_backend`; `serve` calls it
/// with the task definitions and a clone of the state.
scheduler_starter: Option<SchedulerStartFn<T>>,
/// Scheduler stop callback set by `set_scheduler_backend`; `serve` calls it
/// once the server has stopped.
scheduler_stopper: Option<SchedulerStopFn>,
/// Zero-sized marker carrying the type-level list `P`.
_provided: PhantomData<P>,
}
impl AppBuilder<NoState, TNil> {
    /// Creates an empty builder with no state attached.
    ///
    /// The bean registry is fresh, no configuration or layers are set, and no
    /// routes, hooks, consumers, scheduled tasks or scheduler backend are
    /// registered.
    pub fn new() -> Self {
        let shared = BuilderConfig {
            bean_registry: BeanRegistry::new(),
            custom_layers: Vec::new(),
            config: None,
        };

        Self {
            shared,
            _provided: PhantomData,
            // Optional pieces start out unset.
            state: None,
            openapi_builder: None,
            scheduler_starter: None,
            scheduler_stopper: None,
            // Collections start out empty.
            routes: Vec::new(),
            route_metadata: Vec::new(),
            startup_hooks: Vec::new(),
            shutdown_hooks: Vec::new(),
            consumer_registrations: Vec::new(),
            scheduled_task_defs: Vec::new(),
        }
    }
}
impl<P> AppBuilder<NoState, P> {
    /// Moves every field into a builder whose provided-beans marker is `Q`.
    ///
    /// The `state` slot of the result is `None`; all other fields are carried
    /// over unchanged.
    fn retag_provided<Q>(self) -> AppBuilder<NoState, Q> {
        AppBuilder {
            shared: self.shared,
            state: None,
            routes: self.routes,
            startup_hooks: self.startup_hooks,
            shutdown_hooks: self.shutdown_hooks,
            route_metadata: self.route_metadata,
            openapi_builder: self.openapi_builder,
            consumer_registrations: self.consumer_registrations,
            scheduled_task_defs: self.scheduled_task_defs,
            scheduler_starter: self.scheduler_starter,
            scheduler_stopper: self.scheduler_stopper,
            _provided: PhantomData,
        }
    }

    /// Hands an already-constructed `bean` to the bean registry and records its
    /// type `B` at the head of the provided-types list.
    pub fn provide<B: Clone + Send + Sync + 'static>(
        mut self,
        bean: B,
    ) -> AppBuilder<NoState, TCons<B, P>> {
        self.shared.bean_registry.provide(bean);
        self.retag_provided::<TCons<B, P>>()
    }

    /// Registers the bean type `B` with the bean registry and records it at the
    /// head of the provided-types list.
    pub fn with_bean<B: Bean>(mut self) -> AppBuilder<NoState, TCons<B, P>> {
        self.shared.bean_registry.register::<B>();
        self.retag_provided::<TCons<B, P>>()
    }

    /// Resolves the registered beans into a state of type `S` and returns a
    /// builder carrying that state.
    ///
    /// # Panics
    ///
    /// Panics with "Failed to resolve bean dependency graph" when
    /// [`Self::try_build_state`] returns an error.
    pub fn build_state<S, Idx>(self) -> AppBuilder<S>
    where
        S: BeanState + BuildableFrom<P, Idx>,
    {
        self.try_build_state::<S, Idx>()
            .expect("Failed to resolve bean dependency graph")
    }

    /// Fallible variant of [`Self::build_state`].
    ///
    /// The bean registry is taken out of the builder (a fresh one is left in
    /// its place), resolved, and the resulting context is used to construct
    /// `S`. Only the shared configuration is passed on to the new builder.
    ///
    /// # Errors
    ///
    /// Returns the `BeanError` produced while resolving the registry.
    pub fn try_build_state<S, Idx>(mut self) -> Result<AppBuilder<S>, crate::beans::BeanError>
    where
        S: BeanState + BuildableFrom<P, Idx>,
    {
        let pending_beans =
            std::mem::replace(&mut self.shared.bean_registry, BeanRegistry::new());
        let context = pending_beans.resolve()?;
        let built_state = S::from_context(&context);
        let typed_builder = AppBuilder::<S>::from_pre(self.shared, built_state);
        Ok(typed_builder)
    }

    /// Attaches a caller-supplied `state` and returns a builder typed by it.
    ///
    /// Only the shared configuration is passed on to the new builder.
    pub fn with_state<S: Clone + Send + Sync + 'static>(self, state: S) -> AppBuilder<S> {
        let shared = self.shared;
        AppBuilder::<S>::from_pre(shared, state)
    }
}
impl Default for AppBuilder<NoState, TNil> {
fn default() -> Self {
Self::new()
}
}
impl<T: Clone + Send + Sync + 'static> AppBuilder<T> {
    /// Creates a builder that carries `state` together with the given shared
    /// configuration.
    ///
    /// The bean registry inside `shared` is replaced with a fresh one, and all
    /// routes, hooks, consumers and scheduler settings start out empty.
    fn from_pre(mut shared: BuilderConfig, state: T) -> Self {
        shared.bean_registry = BeanRegistry::new();
        Self {
            shared,
            state: Some(state),
            routes: Vec::new(),
            startup_hooks: Vec::new(),
            shutdown_hooks: Vec::new(),
            route_metadata: Vec::new(),
            openapi_builder: None,
            consumer_registrations: Vec::new(),
            scheduled_task_defs: Vec::new(),
            scheduler_starter: None,
            scheduler_stopper: None,
            _provided: PhantomData,
        }
    }

    /// Logs the warning used when scheduled tasks were registered but no
    /// scheduler backend is available to run them.
    fn warn_missing_scheduler_backend() {
        tracing::warn!(
            "Scheduled tasks were registered but no scheduler backend was installed. \
             Add `.with(Scheduler)` (from quarlus-scheduler) to start them."
        );
    }

    /// Lets `plugin` install itself into this builder and returns the builder
    /// it hands back.
    pub fn with<Pl: Plugin<T>>(self, plugin: Pl) -> Self {
        plugin.install(self)
    }

    /// Stores `config` in the builder, replacing any previously stored one.
    pub fn with_config(mut self, config: crate::config::QuarlusConfig) -> Self {
        self.shared.config = Some(config);
        self
    }

    /// Queues a `tower` layer to be applied to the finished router.
    ///
    /// Layers are applied in the order they were queued, after the state has
    /// been attached to the merged router.
    pub fn with_layer<L>(mut self, layer: L) -> Self
    where
        L: tower::Layer<crate::http::routing::Route> + Clone + Send + Sync + 'static,
        L::Service: Clone
            + tower::Service<crate::http::header::HttpRequest<crate::http::body::Body>>
            + Send
            + Sync
            + 'static,
        <L::Service as tower::Service<crate::http::header::HttpRequest<crate::http::body::Body>>>::Response:
            crate::http::response::IntoResponse + 'static,
        <L::Service as tower::Service<crate::http::header::HttpRequest<crate::http::body::Body>>>::Error:
            Into<std::convert::Infallible> + 'static,
        <L::Service as tower::Service<crate::http::header::HttpRequest<crate::http::body::Body>>>::Future:
            Send + 'static,
    {
        self.shared
            .custom_layers
            .push(Box::new(move |router| router.layer(layer)));
        self
    }

    /// Queues an arbitrary router transformation; it runs at the same point
    /// and in the same queue as layers added with [`Self::with_layer`].
    pub fn with_layer_fn<F>(mut self, f: F) -> Self
    where
        F: FnOnce(crate::http::Router) -> crate::http::Router + Send + 'static,
    {
        self.shared.custom_layers.push(Box::new(f));
        self
    }

    /// Alias for [`Self::with_layer_fn`].
    pub fn with_service_builder<F>(self, f: F) -> Self
    where
        F: FnOnce(crate::http::Router) -> crate::http::Router + Send + 'static,
    {
        self.with_layer_fn(f)
    }

    /// Registers a startup hook.
    ///
    /// [`Self::serve`] calls each hook with a clone of the state, in
    /// registration order, before binding the listener; an `Err` aborts
    /// start-up.
    pub fn on_start<F, Fut>(mut self, hook: F) -> Self
    where
        F: FnOnce(T) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
            + Send
            + 'static,
    {
        self.startup_hooks
            .push(Box::new(move |state| Box::pin(hook(state))));
        self
    }

    /// Registers a shutdown hook, awaited by [`Self::serve`] in registration
    /// order during teardown.
    pub fn on_stop<F, Fut>(mut self, hook: F) -> Self
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: std::future::Future<Output = ()> + Send + 'static,
    {
        self.shutdown_hooks
            .push(Box::new(move || Box::pin(hook())));
        self
    }

    /// Queues a router to be merged into the application router.
    pub fn register_routes(mut self, router: crate::http::Router<T>) -> Self {
        self.routes.push(router);
        self
    }

    /// Registers everything controller `C` contributes: its routes, its route
    /// metadata, its consumer registration and its scheduled tasks.
    pub fn register_controller<C: Controller<T>>(mut self) -> Self {
        self.routes.push(C::routes());
        self.route_metadata.push(C::route_metadata());
        self.consumer_registrations
            .push(Box::new(|state| C::register_consumers(state)));
        self.scheduled_task_defs.extend(C::scheduled_tasks());
        self
    }

    /// Installs the callbacks used to start and stop scheduled tasks,
    /// replacing any previously installed pair.
    pub fn set_scheduler_backend(
        mut self,
        starter: SchedulerStartFn<T>,
        stopper: SchedulerStopFn,
    ) -> Self {
        self.scheduler_starter = Some(starter);
        self.scheduler_stopper = Some(stopper);
        self
    }

    /// Installs the callback that turns the collected route metadata into an
    /// additional router (merged after all registered routes).
    pub fn with_openapi_builder<F>(mut self, f: F) -> Self
    where
        F: FnOnce(Vec<Vec<crate::openapi::RouteInfo>>) -> crate::http::Router<T> + Send + 'static,
    {
        self.openapi_builder = Some(Box::new(f));
        self
    }

    /// Assembles and returns the router without serving it.
    ///
    /// Hooks, consumer registrations, scheduled tasks and the scheduler backend
    /// are discarded: nothing is started. A warning is logged when scheduled
    /// tasks were registered without a scheduler backend.
    ///
    /// # Panics
    ///
    /// Panics if no state has been set.
    pub fn build(self) -> crate::http::Router {
        if !self.scheduled_task_defs.is_empty() && self.scheduler_starter.is_none() {
            Self::warn_missing_scheduler_backend();
        }
        // Only the router is kept; every other part of the tuple is dropped.
        self.build_inner().0
    }

    /// Consumes the builder and returns the finished router together with
    /// everything [`Self::serve`] needs to run the application.
    ///
    /// The router is built by merging all registered routers in order, then the
    /// OpenAPI router (if a builder was installed), attaching a clone of the
    /// state, and finally applying the queued layer callbacks in order.
    ///
    /// # Panics
    ///
    /// Panics if no state has been set.
    fn build_inner(
        self,
    ) -> (
        crate::http::Router,
        Vec<StartupHook<T>>,
        Vec<ShutdownHook>,
        Vec<ConsumerReg<T>>,
        Vec<ScheduledTaskDef<T>>,
        Option<SchedulerStartFn<T>>,
        Option<SchedulerStopFn>,
        T,
    ) {
        let state = self
            .state
            .expect("AppBuilder: state must be set before build");

        let mut router = crate::http::Router::new();
        for r in self.routes {
            router = router.merge(r);
        }
        if let Some(builder) = self.openapi_builder {
            let openapi_router = builder(self.route_metadata);
            router = router.merge(openapi_router);
        }

        let mut app = router.with_state(state.clone());
        for layer_fn in self.shared.custom_layers {
            app = layer_fn(app);
        }

        (
            app,
            self.startup_hooks,
            self.shutdown_hooks,
            self.consumer_registrations,
            self.scheduled_task_defs,
            self.scheduler_starter,
            self.scheduler_stopper,
            state,
        )
    }

    /// Builds the application and serves it on `addr` until a shutdown signal
    /// is received.
    ///
    /// Sequence:
    /// 1. every consumer registration is awaited with a clone of the state;
    /// 2. if scheduled tasks were registered, the scheduler backend is started
    ///    (a warning is logged when none is installed);
    /// 3. startup hooks are awaited in order;
    /// 4. a TCP listener is bound to `addr` and the router is served with
    ///    graceful shutdown driven by `shutdown_signal`;
    /// 5. the scheduler stopper (if any) is called and the shutdown hooks are
    ///    awaited in order.
    ///
    /// Step 5 also runs when step 3 or 4 fails, so a scheduler that was started
    /// in step 2 is always stopped and shutdown hooks are never skipped.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a startup hook, by binding the
    /// listener, or by the server itself.
    ///
    /// # Panics
    ///
    /// Panics if no state has been set.
    pub async fn serve(self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
        let (
            app,
            startup_hooks,
            shutdown_hooks,
            consumer_regs,
            scheduled_task_defs,
            scheduler_starter,
            scheduler_stopper,
            state,
        ) = self.build_inner();

        for reg in consumer_regs {
            reg(state.clone()).await;
        }

        if !scheduled_task_defs.is_empty() {
            match scheduler_starter {
                Some(starter) => {
                    info!(
                        count = scheduled_task_defs.len(),
                        "Starting scheduled tasks"
                    );
                    starter(scheduled_task_defs, state.clone());
                }
                None => Self::warn_missing_scheduler_backend(),
            }
        }

        // Borrow the state so it stays alive until this function returns, as
        // before, instead of being dropped with the inner future.
        let state_ref = &state;

        // Run the fallible phase in its own future so that teardown below is
        // reached on every path. The error is kept `Send + Sync` while it is
        // held across the teardown awaits so this future's `Send`-ness does not
        // change.
        let outcome = async move {
            for hook in startup_hooks {
                hook(state_ref.clone()).await?;
            }

            let listener = tokio::net::TcpListener::bind(addr).await?;
            info!(%addr, "Quarlus server listening");

            crate::http::serve(listener, app)
                .with_graceful_shutdown(shutdown_signal())
                .await?;

            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
        }
        .await;

        if let Some(stopper) = scheduler_stopper {
            stopper();
        }
        for hook in shutdown_hooks {
            hook().await;
        }

        outcome.map_err(|e| -> Box<dyn std::error::Error> { e })?;
        info!("Quarlus server stopped");
        Ok(())
    }
}
/// Completes once the process receives Ctrl-C or, on Unix, SIGTERM.
///
/// On non-Unix targets only Ctrl-C is observed; the SIGTERM branch is a future
/// that never resolves.
///
/// # Panics
///
/// Panics if listening for either signal fails.
async fn shutdown_signal() {
    let ctrl_c = async {
        let outcome = tokio::signal::ctrl_c().await;
        outcome.expect("failed to listen for Ctrl-C");
    };

    #[cfg(unix)]
    let terminate = async {
        let kind = tokio::signal::unix::SignalKind::terminate();
        let mut sigterm_stream =
            tokio::signal::unix::signal(kind).expect("failed to listen for SIGTERM");
        sigterm_stream.recv().await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    // Whichever signal arrives first ends the wait.
    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }

    tracing::info!("Shutdown signal received, starting graceful shutdown");
}