use axum::Router;
#[cfg(feature = "with-db")]
use sea_orm_migration::MigratorTrait;
use tracing::{info, trace, warn};
#[cfg(feature = "with-db")]
use crate::db;
use crate::{
app::{AppContext, Hooks},
banner::print_banner,
cache,
config::{self, Config},
controller::ListRoutes,
environment::Environment,
errors::Error,
mailer::{EmailSender, MailerWorker},
redis,
storage::{self, Storage},
task::{self, Tasks},
worker::{self, AppWorker, Pool, Processor, RedisConnectionManager},
Result,
};
pub enum StartMode {
ServerOnly,
ServerAndWorker,
WorkerOnly,
}
pub struct BootResult {
pub app_context: AppContext,
pub router: Option<Router>,
pub processor: Option<Processor>,
}
pub struct ServeParams {
pub port: i32,
pub binding: String,
}
pub async fn start<H: Hooks>(boot: BootResult, server_config: ServeParams) -> Result<()> {
print_banner(&boot, &server_config);
let BootResult {
router,
processor,
app_context: _,
} = boot;
match (router, processor) {
(Some(router), Some(processor)) => {
tokio::spawn(async move {
if let Err(err) = process(processor).await {
tracing::error!("Error in processing: {:?}", err);
}
});
H::serve(router, server_config).await?;
}
(Some(router), None) => {
H::serve(router, server_config).await?;
}
(None, Some(processor)) => {
process(processor).await?;
}
_ => {}
}
Ok(())
}
async fn process(processor: Processor) -> Result<()> {
processor.run().await;
Ok(())
}
pub async fn run_task<H: Hooks>(
app_context: &AppContext,
task: Option<&String>,
vars: &task::Vars,
) -> Result<()> {
let mut tasks = Tasks::default();
H::register_tasks(&mut tasks);
if let Some(task) = task {
let task_span = tracing::span!(tracing::Level::DEBUG, "task", task,);
let _guard = task_span.enter();
tasks.run(app_context, task, vars).await?;
} else {
let list = tasks.list();
for item in &list {
println!("{:<30}[{}]", item.name, item.detail);
}
}
Ok(())
}
#[derive(Debug)]
pub enum RunDbCommand {
Migrate,
Down(u32),
Reset,
Status,
Entities,
Truncate,
}
#[cfg(feature = "with-db")]
pub async fn run_db<H: Hooks, M: MigratorTrait>(
app_context: &AppContext,
cmd: RunDbCommand,
) -> Result<()> {
match cmd {
RunDbCommand::Migrate => {
tracing::warn!("migrate:");
db::migrate::<M>(&app_context.db).await?;
}
RunDbCommand::Down(steps) => {
tracing::warn!("down:");
db::down::<M>(&app_context.db, steps).await?;
}
RunDbCommand::Reset => {
tracing::warn!("reset:");
db::reset::<M>(&app_context.db).await?;
}
RunDbCommand::Status => {
tracing::warn!("status:");
db::status::<M>(&app_context.db).await?;
}
RunDbCommand::Entities => {
tracing::warn!("entities:");
tracing::warn!("{}", db::entities::<M>(app_context).await?);
}
RunDbCommand::Truncate => {
tracing::warn!("truncate:");
H::truncate(&app_context.db).await?;
}
}
Ok(())
}
pub async fn create_context<H: Hooks>(environment: &Environment) -> Result<AppContext> {
let config = environment.load()?;
if config.logger.pretty_backtrace {
std::env::set_var("RUST_BACKTRACE", "1");
warn!(
"pretty backtraces are enabled (this is great for development but has a runtime cost \
for production. disable with `logger.pretty_backtrace` in your config yaml)"
);
}
#[cfg(feature = "with-db")]
let db = db::connect(&config.database).await?;
let mailer = if let Some(cfg) = config.mailer.as_ref() {
create_mailer(cfg)?
} else {
None
};
let ctx = AppContext {
environment: environment.clone(),
#[cfg(feature = "with-db")]
db,
queue: connect_redis(&config).await,
storage: Storage::single(storage::drivers::null::new()).into(),
cache: cache::Cache::new(cache::drivers::null::new()).into(),
config,
mailer,
};
H::after_context(ctx).await
}
#[cfg(feature = "with-db")]
pub async fn create_app<H: Hooks, M: MigratorTrait>(
mode: StartMode,
environment: &Environment,
) -> Result<BootResult> {
let app_context = create_context::<H>(environment).await?;
db::converge::<H, M>(&app_context.db, &app_context.config.database).await?;
if let Some(pool) = &app_context.queue {
redis::converge(pool, &app_context.config.queue).await?;
}
run_app::<H>(&mode, app_context).await
}
#[cfg(not(feature = "with-db"))]
pub async fn create_app<H: Hooks>(
mode: StartMode,
environment: &Environment,
) -> Result<BootResult> {
let app_context = create_context::<H>(environment).await?;
if let Some(pool) = &app_context.queue {
redis::converge(pool, &app_context.config.queue).await?;
}
run_app::<H>(&mode, app_context).await
}
pub async fn run_app<H: Hooks>(mode: &StartMode, app_context: AppContext) -> Result<BootResult> {
H::before_run(&app_context).await?;
let initializers = H::initializers(&app_context).await?;
info!(initializers = ?initializers.iter().map(|init| init.name()).collect::<Vec<_>>().join(","), "initializers loaded");
for initializer in &initializers {
initializer.before_run(&app_context).await?;
}
match mode {
StartMode::ServerOnly => {
let app = H::before_routes(&app_context).await?;
let app = H::routes(&app_context).to_router(app_context.clone(), app)?;
let mut router = H::after_routes(app, &app_context).await?;
for initializer in &initializers {
router = initializer.after_routes(router, &app_context).await?;
}
Ok(BootResult {
app_context,
router: Some(router),
processor: None,
})
}
StartMode::ServerAndWorker => {
let processor = create_processor::<H>(&app_context)?;
let app = H::before_routes(&app_context).await?;
let app = H::routes(&app_context).to_router(app_context.clone(), app)?;
let mut router = H::after_routes(app, &app_context).await?;
for initializer in &initializers {
router = initializer.after_routes(router, &app_context).await?;
}
Ok(BootResult {
app_context,
router: Some(router),
processor: Some(processor),
})
}
StartMode::WorkerOnly => {
let processor = create_processor::<H>(&app_context)?;
Ok(BootResult {
app_context,
router: None,
processor: Some(processor),
})
}
}
}
fn create_processor<H: Hooks>(app_context: &AppContext) -> Result<Processor> {
let queues = worker::get_queues(&app_context.config.workers.queues);
trace!(
queues = ?queues,
"registering queues (merged config and default)"
);
let mut p = if let Some(queue) = &app_context.queue {
Processor::new(queue.clone(), queues)
} else {
return Err(Error::Message(
"queue is missing, cannot initialize workers".to_string(),
));
};
p.register(MailerWorker::build(app_context));
H::connect_workers(&mut p, app_context);
trace!("done registering workers and queues");
Ok(p)
}
#[must_use]
pub fn list_endpoints<H: Hooks>(ctx: &AppContext) -> Vec<ListRoutes> {
H::routes(ctx).collect()
}
fn create_mailer(config: &config::Mailer) -> Result<Option<EmailSender>> {
if config.stub {
return Ok(Some(EmailSender::stub()));
}
if let Some(smtp) = config.smtp.as_ref() {
if smtp.enable {
return Ok(Some(EmailSender::smtp(smtp)?));
}
}
Ok(None)
}
#[allow(clippy::missing_panics_doc)]
pub async fn connect_redis(config: &Config) -> Option<Pool<RedisConnectionManager>> {
if let Some(redis) = &config.queue {
let manager = RedisConnectionManager::new(redis.uri.clone()).unwrap();
let redis = Pool::builder().build(manager).await.unwrap();
Some(redis)
} else {
None
}
}