use std::path::PathBuf;
use axum::Router;
#[cfg(feature = "with-db")]
use sea_orm_migration::MigratorTrait;
use tokio::{select, signal, task::JoinHandle};
use tracing::{debug, error, info, warn};
#[cfg(feature = "with-db")]
use crate::db;
use crate::{
app::{AppContext, Hooks},
banner::print_banner,
bgworker, cache,
config::{self, WorkerMode},
controller::ListRoutes,
environment::Environment,
errors::Error,
mailer::{EmailSender, MailerWorker},
prelude::BackgroundWorker,
scheduler::{self, Scheduler},
storage::{self, Storage},
task::{self, Tasks},
Result,
};
/// Selects which parts of the application `run_app` prepares.
#[derive(Debug)]
pub enum StartMode {
/// Build the router only; no workers are registered and `run_worker` is `false`.
ServerOnly,
/// Register workers first, then build the router; `run_worker` is `true`.
ServerAndWorker,
/// Register workers only; no router is built and `run_worker` is `true`.
WorkerOnly,
}
/// Result of booting the application; consumed by `start`.
pub struct BootResult {
/// The application context the app was booted with.
pub app_context: AppContext,
/// Router to serve, or `None` when booted with `StartMode::WorkerOnly`.
pub router: Option<Router>,
/// Whether workers were registered during boot; `start` uses this to decide
/// if the in-process queue worker should be spawned.
pub run_worker: bool,
}
/// Parameters handed to `print_banner` and `Hooks::serve` by `start`.
#[derive(Debug)]
pub struct ServeParams {
/// Port number; presumably the port the server listens on (confirm against
/// the `Hooks::serve` implementation).
pub port: i32,
/// Binding string; presumably the address/interface to bind to (confirm
/// against the `Hooks::serve` implementation).
pub binding: String,
}
/// Runs a booted application until it finishes or is asked to stop.
///
/// The banner is printed first unless `no_banner` is set. After that the
/// behaviour depends on what `boot` contains:
///
/// * with a router, the router is served through `Hooks::serve`;
/// * without a router but with `run_worker` set, the function waits on
///   [`shutdown_signal`];
/// * with neither, it returns right away.
///
/// When `run_worker` is set and the configured worker mode is
/// `WorkerMode::BackgroundQueue`, an in-process queue worker is spawned before
/// serving/waiting and is shut down afterwards.
///
/// # Errors
///
/// Returns an error if the queue worker cannot be started, if serving fails,
/// or if shutting the queue down fails.
pub async fn start<H: Hooks>(
    boot: BootResult,
    server_config: ServeParams,
    no_banner: bool,
) -> Result<()> {
    if !no_banner {
        print_banner(&boot, &server_config);
    }

    let BootResult {
        app_context,
        router,
        run_worker,
    } = boot;

    // Nothing to serve and nothing to work on.
    if router.is_none() && !run_worker {
        return Ok(());
    }

    // The worker mode is only consulted when workers were requested.
    let spawn_worker =
        run_worker && app_context.config.workers.mode == WorkerMode::BackgroundQueue;
    let worker = if spawn_worker {
        Some(start_queue_worker(&app_context)?)
    } else {
        None
    };

    // Block until the server stops, or (worker-only) until a shutdown signal.
    match router {
        Some(router) => H::serve(router, &app_context, &server_config).await?,
        None => shutdown_signal().await,
    }

    if let Some(handle) = worker {
        shutdown_and_await_queue_worker(&app_context, handle).await?;
    }
    Ok(())
}
fn start_queue_worker(app_context: &AppContext) -> Result<JoinHandle<()>> {
debug!("note: worker is run in-process (tokio spawn)");
if let Some(queue) = &app_context.queue_provider {
let cloned_queue = queue.clone();
let handle = tokio::spawn(async move {
let res = cloned_queue.run().await;
if res.is_err() {
error!(
err = res.unwrap_err().to_string(),
"error while running worker"
);
}
});
return Ok(handle);
}
Err(Error::QueueProviderMissing)
}
/// Asks the queue provider to shut down and waits for the worker task to end.
///
/// After requesting shutdown, the user is told that a second signal forces the
/// exit; the function then returns as soon as either the worker task completes
/// or [`shutdown_signal`] fires again.
///
/// If the worker task ended abnormally (the join handle yields an error, e.g.
/// because the task panicked), that is logged at `ERROR` level instead of
/// being silently dropped. It does not turn into an error return, so shutdown
/// still completes.
///
/// # Errors
///
/// Returns an error if the queue provider's `shutdown` call fails.
async fn shutdown_and_await_queue_worker(
    app_context: &AppContext,
    handle: JoinHandle<()>,
) -> Result<(), Error> {
    if let Some(queue) = &app_context.queue_provider {
        queue.shutdown()?;
    }

    println!("press ctrl-c again to force quit");
    select! {
        joined = handle => {
            if let Err(err) = joined {
                error!(
                    err = err.to_string(),
                    "queue worker task did not finish cleanly"
                );
            }
        }
        () = shutdown_signal() => {}
    }
    Ok(())
}
/// Runs a registered task by name, or lists all registered tasks.
///
/// Tasks are collected from `Hooks::register_tasks`. When `task` is `Some`,
/// that task is run with `vars` inside a `DEBUG`-level `task` span. When it is
/// `None`, every registered task is printed to stdout as `name[detail]`.
///
/// # Errors
///
/// Propagates any error returned while running the task.
pub async fn run_task<H: Hooks>(
    app_context: &AppContext,
    task: Option<&String>,
    vars: &task::Vars,
) -> Result<()> {
    let mut tasks = Tasks::default();
    H::register_tasks(&mut tasks);

    match task {
        Some(task) => {
            let task_span = tracing::span!(tracing::Level::DEBUG, "task", task,);
            // Attach the span to the future instead of holding an `enter()`
            // guard across the `.await`: a held guard keeps the span entered
            // while this future is suspended and makes the future `!Send`.
            tracing::Instrument::instrument(tasks.run(app_context, task, vars), task_span)
                .await?;
        }
        None => {
            let list = tasks.list();
            for item in &list {
                println!("{:<30}[{}]", item.name, item.detail);
            }
        }
    }
    Ok(())
}
pub async fn run_scheduler<H: Hooks>(
app_context: &AppContext,
config: Option<&PathBuf>,
name: Option<String>,
tag: Option<String>,
list: bool,
) -> Result<()> {
let mut tasks = Tasks::default();
H::register_tasks(&mut tasks);
let task_span = tracing::span!(tracing::Level::DEBUG, "scheduler_jobs");
let _guard = task_span.enter();
let scheduler = match config {
Some(path) => Scheduler::from_config::<H>(path, &app_context.environment)?,
None => {
if let Some(config) = &app_context.config.scheduler {
Scheduler::new::<H>(config, &app_context.environment)?
} else {
return Err(Error::Scheduler(scheduler::Error::Empty));
}
}
};
let scheduler = scheduler.by_spec(&scheduler::Spec { name, tag });
if list {
println!("{scheduler}");
Ok(())
} else {
Ok(scheduler.run().await?)
}
}
/// Database maintenance commands dispatched by `run_db`.
#[derive(Debug)]
pub enum RunDbCommand {
/// Handled by `db::migrate`.
Migrate,
/// Handled by `db::down`; the payload is the `steps` argument passed to it.
Down(u32),
/// Handled by `db::reset`.
Reset,
/// Handled by `db::status`.
Status,
/// Handled by `db::entities`; its result is logged.
Entities,
/// Handled by `Hooks::truncate`.
Truncate,
}
/// Executes a database maintenance command against the application's database.
///
/// Each command logs its label at `WARN` level and then dispatches to the
/// matching `db::*` helper, or to `Hooks::truncate` for
/// [`RunDbCommand::Truncate`]. For [`RunDbCommand::Entities`] the value
/// returned by `db::entities` is logged as well.
///
/// # Errors
///
/// Propagates any error returned by the underlying database operation.
#[cfg(feature = "with-db")]
pub async fn run_db<H: Hooks, M: MigratorTrait>(
    app_context: &AppContext,
    cmd: RunDbCommand,
) -> Result<()> {
    match cmd {
        RunDbCommand::Migrate => {
            tracing::warn!("migrate:");
            db::migrate::<M>(&app_context.db).await?;
        }
        RunDbCommand::Down(steps) => {
            tracing::warn!("down:");
            db::down::<M>(&app_context.db, steps).await?;
        }
        RunDbCommand::Reset => {
            tracing::warn!("reset:");
            db::reset::<M>(&app_context.db).await?;
        }
        RunDbCommand::Status => {
            tracing::warn!("status:");
            db::status::<M>(&app_context.db).await?;
        }
        RunDbCommand::Entities => {
            tracing::warn!("entities:");
            // Do the work outside the log macro: tracing only evaluates macro
            // arguments when the callsite is enabled, so nesting the call in
            // `warn!` would skip it whenever WARN output is filtered out.
            let report = db::entities::<M>(app_context).await?;
            tracing::warn!("{}", report);
        }
        RunDbCommand::Truncate => {
            tracing::warn!("truncate:");
            H::truncate(&app_context.db).await?;
        }
    }
    Ok(())
}
/// Assembles the [`AppContext`] for the given environment.
///
/// Steps, in order: load the configuration; when `logger.pretty_backtrace` is
/// on, set `RUST_BACKTRACE=1` and log a warning; connect to the database (only
/// with the `with-db` feature); build the mailer if one is configured; create
/// the queue provider. Storage and cache start out with their null drivers.
/// The finished context is passed through `Hooks::after_context`, whose result
/// is returned.
///
/// # Errors
///
/// Propagates failures from loading the configuration, connecting to the
/// database, creating the mailer or queue provider, and from
/// `Hooks::after_context`.
pub async fn create_context<H: Hooks>(environment: &Environment) -> Result<AppContext> {
    let config = environment.load()?;

    if config.logger.pretty_backtrace {
        std::env::set_var("RUST_BACKTRACE", "1");
        warn!(
            "pretty backtraces are enabled (this is great for development but has a runtime cost \
             for production. disable with `logger.pretty_backtrace` in your config yaml)"
        );
    }

    #[cfg(feature = "with-db")]
    let db = db::connect(&config.database).await?;

    // No mailer section in the config means no mailer at all.
    let mailer = match config.mailer.as_ref() {
        Some(mailer_config) => create_mailer(mailer_config)?,
        None => None,
    };

    let queue_provider = bgworker::create_queue_provider(&config).await?;

    let context = AppContext {
        environment: environment.clone(),
        #[cfg(feature = "with-db")]
        db,
        queue_provider,
        storage: Storage::single(storage::drivers::null::new()).into(),
        cache: cache::Cache::new(cache::drivers::null::new()).into(),
        config,
        mailer,
    };

    H::after_context(context).await
}
/// Creates the application context, converges the database and the queue, and
/// boots the app in the requested mode (variant with database support).
///
/// The queue is converged only when both a queue provider and a queue
/// configuration are present.
///
/// # Errors
///
/// Propagates failures from context creation, database or queue convergence,
/// and from `run_app`.
#[cfg(feature = "with-db")]
pub async fn create_app<H: Hooks, M: MigratorTrait>(
    mode: StartMode,
    environment: &Environment,
) -> Result<BootResult> {
    let app_context = create_context::<H>(environment).await?;
    db::converge::<H, M>(&app_context.db, &app_context.config.database).await?;

    let queue_setup = app_context
        .queue_provider
        .as_ref()
        .zip(app_context.config.queue.as_ref());
    if let Some((queue, queue_config)) = queue_setup {
        bgworker::converge(queue, queue_config).await?;
    }

    run_app::<H>(&mode, app_context).await
}
/// Creates the application context, converges the queue, and boots the app in
/// the requested mode (variant without database support).
///
/// The queue is converged only when both a queue provider and a queue
/// configuration are present.
///
/// # Errors
///
/// Propagates failures from context creation, queue convergence, and from
/// `run_app`.
#[cfg(not(feature = "with-db"))]
pub async fn create_app<H: Hooks>(
    mode: StartMode,
    environment: &Environment,
) -> Result<BootResult> {
    let app_context = create_context::<H>(environment).await?;

    let queue_setup = app_context
        .queue_provider
        .as_ref()
        .zip(app_context.config.queue.as_ref());
    if let Some((queue, queue_config)) = queue_setup {
        bgworker::converge(queue, queue_config).await?;
    }

    run_app::<H>(&mode, app_context).await
}
/// Runs the boot hooks and prepares what the given start mode requires.
///
/// `Hooks::before_run` and every initializer's `before_run` are invoked first.
/// Then, depending on `mode`:
///
/// * workers are registered for `ServerAndWorker` and `WorkerOnly`;
/// * the router is built for `ServerOnly` and `ServerAndWorker`, going through
///   `Hooks::before_routes`, `Hooks::routes`, `Hooks::after_routes`, and each
///   initializer's `after_routes` in turn.
///
/// Worker registration always happens before the router is built.
///
/// # Errors
///
/// Propagates the first error returned by any hook, initializer, worker
/// registration, or router construction step.
pub async fn run_app<H: Hooks>(mode: &StartMode, app_context: AppContext) -> Result<BootResult> {
    H::before_run(&app_context).await?;

    let initializers = H::initializers(&app_context).await?;
    info!(
        initializers = ?initializers.iter().map(|init| init.name()).collect::<Vec<_>>().join(","),
        "initializers loaded"
    );
    for initializer in &initializers {
        initializer.before_run(&app_context).await?;
    }

    // Translate the mode into the two independent decisions it encodes.
    let (build_router, run_worker) = match mode {
        StartMode::ServerOnly => (true, false),
        StartMode::ServerAndWorker => (true, true),
        StartMode::WorkerOnly => (false, true),
    };

    if run_worker {
        register_workers::<H>(&app_context).await?;
    }

    let router = if build_router {
        let app = H::before_routes(&app_context).await?;
        let app = H::routes(&app_context).to_router::<H>(app_context.clone(), app)?;
        let mut router = H::after_routes(app, &app_context).await?;
        for initializer in &initializers {
            router = initializer.after_routes(router, &app_context).await?;
        }
        Some(router)
    } else {
        None
    };

    Ok(BootResult {
        app_context,
        router,
        run_worker,
    })
}
async fn register_workers<H: Hooks>(app_context: &AppContext) -> Result<()> {
if app_context.config.workers.mode == WorkerMode::BackgroundQueue {
if let Some(queue) = &app_context.queue_provider {
queue.register(MailerWorker::build(app_context)).await?;
H::connect_workers(app_context, queue).await?;
} else {
return Err(Error::QueueProviderMissing);
}
debug!("done registering workers and queues");
}
Ok(())
}
/// Returns the routes obtained by collecting `Hooks::routes` for `ctx`.
#[must_use]
pub fn list_endpoints<H: Hooks>(ctx: &AppContext) -> Vec<ListRoutes> {
H::routes(ctx).collect()
}
/// Completes once the process receives Ctrl+C or, on Unix, a terminate signal.
///
/// On non-Unix targets only Ctrl+C is observed; the terminate branch is a
/// future that never resolves.
///
/// # Panics
///
/// Panics if the Ctrl+C handler or the Unix signal handler cannot be
/// installed.
pub async fn shutdown_signal() {
    let interrupt = async {
        signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler");
        sigterm.recv().await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    // Whichever signal arrives first wins.
    tokio::select! {
        () = interrupt => {},
        () = terminate => {},
    }
}
/// Summary of a single middleware, as produced by `list_middlewares`.
///
/// Derives `Debug` and `Clone` so the value can be logged, inspected in tests,
/// and copied by callers, matching the other public types in this module.
#[derive(Debug, Clone)]
pub struct MiddlewareInfo {
    /// The middleware's name.
    pub id: String,
    /// Whether the middleware reports itself as enabled.
    pub enabled: bool,
    /// String rendering of the middleware's configuration; rendered from the
    /// default value when the configuration could not be obtained.
    pub detail: String,
}
/// Describes every middleware returned by `Hooks::middlewares` for `ctx`.
///
/// Each entry carries the middleware's name, its enabled flag, and its
/// configuration rendered as a string. When a configuration cannot be
/// obtained, the default value is rendered instead.
#[must_use]
pub fn list_middlewares<H: Hooks>(ctx: &AppContext) -> Vec<MiddlewareInfo> {
    let mut infos = Vec::new();
    for layer in H::middlewares(ctx).iter() {
        infos.push(MiddlewareInfo {
            id: layer.name().to_string(),
            enabled: layer.is_enabled(),
            detail: layer.config().unwrap_or_default().to_string(),
        });
    }
    infos
}
/// Builds the email sender described by the mailer configuration.
///
/// A stub sender takes precedence when `stub` is set. Otherwise an SMTP sender
/// is created if an SMTP section exists and is enabled. In every other case no
/// sender is configured and `Ok(None)` is returned.
///
/// # Errors
///
/// Propagates the error from constructing the SMTP sender.
fn create_mailer(config: &config::Mailer) -> Result<Option<EmailSender>> {
    if config.stub {
        return Ok(Some(EmailSender::stub()));
    }

    match config.smtp.as_ref() {
        Some(smtp) if smtp.enable => Ok(Some(EmailSender::smtp(smtp)?)),
        _ => Ok(None),
    }
}