use std::{collections::BTreeMap, str::FromStr};
use axum::Router;
#[cfg(feature = "with-db")]
use sea_orm_migration::MigratorTrait;
use tracing::trace;
#[cfg(feature = "with-db")]
use crate::db;
use crate::{
app::{AppContext, Hooks},
banner::print_banner,
config::{self, Config},
controller::ListRoutes,
environment::Environment,
errors::Error,
logger,
mailer::{EmailSender, MailerWorker},
redis,
task::Tasks,
worker::{self, AppWorker, Pool, Processor, RedisConnectionManager, DEFAULT_QUEUES},
Result,
};
pub enum StartMode {
ServerOnly,
ServerAndWorker,
WorkerOnly,
}
pub struct BootResult {
pub app_context: AppContext,
pub router: Option<Router>,
pub processor: Option<Processor>,
}
pub async fn start(boot: BootResult) -> Result<()> {
print_banner(&boot);
let BootResult {
router,
processor,
app_context,
} = boot;
match (router, processor) {
(Some(router), Some(processor)) => {
tokio::spawn(async move {
if let Err(err) = process(processor).await {
tracing::error!("Error in processing: {:?}", err);
}
});
serve(router, &app_context.config).await?;
}
(Some(router), None) => {
serve(router, &app_context.config).await?;
}
(None, Some(processor)) => {
process(processor).await?;
}
_ => {}
}
Ok(())
}
async fn process(processor: Processor) -> Result<()> {
processor.run().await;
Ok(())
}
pub async fn run_task<H: Hooks>(
app_context: &AppContext,
task: Option<&String>,
vars: &BTreeMap<String, String>,
) -> Result<()> {
let mut tasks = Tasks::default();
H::register_tasks(&mut tasks);
if let Some(task) = task {
tasks.run(app_context, task, vars).await?;
} else {
let list = tasks.list();
for item in &list {
println!("{}\t\t[{}]", item.name, item.detail);
}
}
Ok(())
}
#[derive(Debug)]
pub enum RunDbCommand {
Migrate,
Reset,
Status,
Entities,
Truncate,
}
#[cfg(feature = "with-db")]
pub async fn run_db<H: Hooks, M: MigratorTrait>(
app_context: &AppContext,
cmd: RunDbCommand,
) -> Result<()> {
match cmd {
RunDbCommand::Migrate => {
tracing::warn!("migrate:");
let _ = db::migrate::<M>(&app_context.db).await;
}
RunDbCommand::Reset => {
tracing::warn!("reset:");
let _ = db::reset::<M>(&app_context.db).await;
}
RunDbCommand::Status => {
tracing::warn!("status:");
let _ = db::status::<M>(&app_context.db).await;
}
RunDbCommand::Entities => {
tracing::warn!("entities:");
tracing::warn!("{}", db::entities::<M>(app_context)?);
}
RunDbCommand::Truncate => {
tracing::warn!("truncate:");
H::truncate(&app_context.db).await?;
}
}
Ok(())
}
async fn serve(app: Router, config: &Config) -> Result<()> {
let listener =
tokio::net::TcpListener::bind(&format!("0.0.0.0:{}", config.server.port)).await?;
axum::serve(listener, app).await?;
Ok(())
}
pub async fn create_context<H: Hooks>(environment: &str) -> Result<AppContext> {
let environment = Environment::from_str(environment)
.unwrap_or_else(|_| Environment::Any(environment.to_string()));
let config = environment.load()?;
if config.logger.enable {
logger::init::<H>(&config.logger);
}
#[cfg(feature = "with-db")]
let db = db::connect(&config.database).await?;
let mailer = if let Some(cfg) = config.mailer.as_ref() {
create_mailer(cfg)?
} else {
None
};
let redis = connect_redis(&config).await;
Ok(AppContext {
environment,
#[cfg(feature = "with-db")]
db,
redis,
config,
mailer,
})
}
#[cfg(feature = "with-db")]
pub async fn create_app<H: Hooks, M: MigratorTrait>(
mode: StartMode,
environment: &str,
) -> Result<BootResult> {
let app_context = create_context::<H>(environment).await?;
db::converge::<H, M>(&app_context.db, &app_context.config.database).await?;
if let Some(pool) = &app_context.redis {
redis::converge(pool, &app_context.config.redis).await?;
}
run_app::<H>(&mode, app_context)
}
#[cfg(not(feature = "with-db"))]
pub async fn create_app<H: Hooks>(mode: StartMode, environment: &str) -> Result<BootResult> {
let app_context = create_context::<H>(environment).await?;
if let Some(pool) = &app_context.redis {
redis::converge(pool, &app_context.config.redis).await?;
}
run_app::<H>(&mode, app_context)
}
pub fn run_app<H: Hooks>(mode: &StartMode, app_context: AppContext) -> Result<BootResult> {
match mode {
StartMode::ServerOnly => {
let app = H::routes().to_router(app_context.clone())?;
Ok(BootResult {
app_context,
router: Some(app),
processor: None,
})
}
StartMode::ServerAndWorker => {
let processor = create_processor::<H>(&app_context)?;
let app = H::routes().to_router(app_context.clone())?;
Ok(BootResult {
app_context,
router: Some(app),
processor: Some(processor),
})
}
StartMode::WorkerOnly => {
let processor = create_processor::<H>(&app_context)?;
Ok(BootResult {
app_context,
router: None,
processor: Some(processor),
})
}
}
}
fn create_processor<H: Hooks>(app_context: &AppContext) -> Result<Processor> {
let queues = worker::get_queues(&app_context.config.workers.queues);
trace!(
queues = ?queues,
"registering queues (merged config and default)"
);
let mut p = if let Some(redis) = &app_context.redis {
Processor::new(
redis.clone(),
DEFAULT_QUEUES
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>(),
)
} else {
return Err(Error::Message(
"redis is missing, cannot initialize workers".to_string(),
));
};
p.register(MailerWorker::build(app_context));
H::connect_workers(&mut p, app_context);
trace!("done registering workers and queues");
Ok(p)
}
#[must_use]
pub fn list_endpoints<H: Hooks>() -> Vec<ListRoutes> {
H::routes().collect()
}
fn create_mailer(config: &config::Mailer) -> Result<Option<EmailSender>> {
#[cfg(feature = "testing")]
if config.stub {
return Ok(Some(EmailSender::stub()));
}
if let Some(smtp) = config.smtp.as_ref() {
if smtp.enable {
return Ok(Some(EmailSender::smtp(smtp)?));
}
}
Ok(None)
}
async fn connect_redis(config: &Config) -> Option<Pool<RedisConnectionManager>> {
if let Some(redis) = &config.redis {
let manager = RedisConnectionManager::new(redis.uri.clone()).unwrap();
let redis = Pool::builder().build(manager).await.unwrap();
Some(redis)
} else {
None
}
}