1use std::{
5 env,
6 path::{Path, PathBuf},
7 sync::Arc,
8};
9
10use axum::Router;
11#[cfg(feature = "with-db")]
12use sea_orm_migration::MigratorTrait;
13use tokio::{select, signal, task::JoinHandle};
14use tracing::{debug, error, info, warn};
15
16#[cfg(feature = "with-db")]
17use crate::db;
18use crate::{
19 app::{AppContext, Hooks, Initializer},
20 banner::print_banner,
21 bgworker, cache,
22 config::{self, Config, WorkerMode},
23 controller::ListRoutes,
24 env_vars,
25 environment::Environment,
26 errors::Error,
27 mailer::{EmailSender, MailerWorker},
28 prelude::BackgroundWorker,
29 scheduler::{self, Scheduler},
30 storage::{self, Storage},
31 task::{self, Tasks},
32 Result,
33};
34
/// Selects which components [`run_app`] prepares for a boot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StartMode {
    /// Build the router only: no worker tags and no scheduler.
    ServerOnly,
    /// Build the router and register workers (with an empty tag list); no scheduler.
    ServerAndWorker,
    /// Register workers only; no router is built and no scheduler is requested.
    WorkerOnly {
        /// Tags copied into [`BootResult::worker`] and later handed to the queue worker.
        tags: Vec<String>,
    },
    /// Build the router, register workers (with an empty tag list) and request the scheduler.
    All,
}
52
/// Everything [`start`] needs in order to run the application, as assembled by [`run_app`].
pub struct BootResult {
    /// Application context passed to the server, the queue worker and the scheduler.
    pub app_context: AppContext,
    /// Router to serve; `None` means `start` does not serve HTTP.
    pub router: Option<Router>,
    /// Tags for the queue worker; `None` means `start` does not start a worker.
    pub worker: Option<Vec<String>>,
    /// When `true`, `start` spawns the scheduler as a background task.
    pub run_scheduler: bool,
}
63
/// Serving parameters that [`start`] forwards to `Hooks::serve` and to the startup banner.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServeParams {
    /// Port number for the server.
    pub port: i32,
    /// Binding string for the server (presumably the host/interface to bind; confirm
    /// against the `Hooks::serve` implementation).
    pub binding: String,
}
74
75pub async fn start<H: Hooks>(
84 boot: BootResult,
85 server_config: ServeParams,
86 no_banner: bool,
87) -> Result<()> {
88 if boot.run_scheduler {
89 let scheduler = scheduler::<H>(&boot.app_context, None, None, None)?;
90 tokio::spawn(async move {
91 let res = scheduler.run().await;
92 if res.is_err() {
93 error!(
94 err = res.unwrap_err().to_string(),
95 "error while running scheduler"
96 );
97 }
98 });
99 }
100
101 if !no_banner {
102 print_banner(&boot, &server_config);
103 }
104
105 let BootResult {
106 router,
107 worker,
108 run_scheduler: _,
109 app_context,
110 } = boot;
111
112 match (router, worker) {
113 (Some(router), None) => {
114 H::serve(router, &app_context, &server_config).await?;
115 }
116 (Some(router), Some(tags)) => {
117 let handle = if app_context.config.workers.mode == WorkerMode::BackgroundQueue {
118 Some(start_queue_worker(&app_context, tags)?)
119 } else {
120 None
121 };
122
123 H::serve(router, &app_context, &server_config).await?;
124
125 if let Some(handle) = handle {
126 shutdown_and_await_queue_worker(&app_context, handle).await?;
127 }
128 }
129 (None, Some(tags)) => {
130 let handle = if app_context.config.workers.mode == WorkerMode::BackgroundQueue {
131 Some(start_queue_worker(&app_context, tags)?)
132 } else {
133 None
134 };
135
136 shutdown_signal().await;
137
138 if let Some(handle) = handle {
139 shutdown_and_await_queue_worker(&app_context, handle).await?;
140 }
141 }
142 _ => {}
143 }
144 Ok(())
145}
146
147fn start_queue_worker(app_context: &AppContext, tags: Vec<String>) -> Result<JoinHandle<()>> {
148 debug!("note: worker is run in-process (tokio spawn)");
149
150 if let Some(queue) = &app_context.queue_provider {
151 let cloned_queue = queue.clone();
152 let handle = tokio::spawn(async move {
153 let res = cloned_queue.run(tags).await;
154 if res.is_err() {
155 error!(
156 err = res.unwrap_err().to_string(),
157 "error while running worker"
158 );
159 }
160 });
161 return Ok(handle);
162 }
163
164 Err(Error::QueueProviderMissing)
165}
166
167async fn shutdown_and_await_queue_worker(
168 app_context: &AppContext,
169 handle: JoinHandle<()>,
170) -> Result<(), Error> {
171 if let Some(queue) = &app_context.queue_provider {
172 queue.shutdown()?;
173 }
174
175 println!("press ctrl-c again to force quit");
176 select! {
177 _ = handle => {}
178 () = shutdown_signal() => {}
179 }
180 Ok(())
181}
182
183pub async fn run_task<H: Hooks>(
189 app_context: &AppContext,
190 task: Option<&String>,
191 vars: &task::Vars,
192) -> Result<()> {
193 let mut tasks = Tasks::default();
194 H::register_tasks(&mut tasks);
195
196 if let Some(task) = task {
197 let task_span = tracing::span!(tracing::Level::DEBUG, "task", task,);
198 let _guard = task_span.enter();
199 tasks.run(app_context, task, vars).await?;
200 } else {
201 let list = tasks.list();
202 for item in &list {
203 println!("{:<30}[{}]", item.name, item.detail);
204 }
205 }
206 Ok(())
207}
208
209fn scheduler<H: Hooks>(
211 app_context: &AppContext,
212 config: Option<&PathBuf>,
213 name: Option<String>,
214 tag: Option<String>,
215) -> Result<Scheduler> {
216 let env_config_path = env::var(env_vars::SCHEDULER_CONFIG).ok();
217
218 let config_path: Option<&Path> = config.map_or_else(
219 || env_config_path.as_deref().map(Path::new),
220 |path| Some(path.as_path()),
221 );
222
223 let scheduler = match config_path {
224 Some(path) => Scheduler::from_config::<H>(path, &app_context.environment)?,
225 None => {
226 if let Some(config) = &app_context.config.scheduler {
227 Scheduler::new::<H>(config, &app_context.environment)?
228 } else {
229 return Err(Error::Scheduler(scheduler::Error::Empty));
230 }
231 }
232 };
233
234 Ok(scheduler.by_spec(&scheduler::Spec { name, tag }))
235}
236
237pub async fn run_scheduler<H: Hooks>(
251 app_context: &AppContext,
252 config: Option<&PathBuf>,
253 name: Option<String>,
254 tag: Option<String>,
255 list: bool,
256) -> Result<()> {
257 let task_span = tracing::span!(tracing::Level::DEBUG, "scheduler_jobs");
258 let _guard = task_span.enter();
259
260 let scheduler = scheduler::<H>(app_context, config, name, tag)?;
261 if list {
262 println!("{scheduler}");
263 Ok(())
264 } else {
265 Ok(scheduler.run().await?)
266 }
267}
268
/// Database commands understood by `run_db`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RunDbCommand {
    /// Run `db::migrate`.
    Migrate,
    /// Run `db::down` with the given number of steps.
    Down(u32),
    /// Run `db::reset`.
    Reset,
    /// Run `db::status`.
    Status,
    /// Run `db::entities` and log its output.
    Entities,
    /// Run `Hooks::truncate`.
    Truncate,
    /// Seed the database from `from`, or dump tables into `from`.
    Seed {
        /// Run `db::reset` before seeding (ignored when dumping).
        reset: bool,
        /// Path that seed data is read from, or that dumps are written to.
        from: PathBuf,
        /// Dump tables instead of seeding.
        dump: bool,
        /// Tables to dump; `Some(..)` also selects dumping instead of seeding.
        dump_tables: Option<Vec<String>>,
    },
    /// Dump the database schema to `schema_dump.json`.
    Schema,
}
295
/// Executes a single database command against the context's database.
///
/// Progress markers are logged at WARN level. For `Seed`, setting `dump` or supplying
/// `dump_tables` selects dumping tables into `from`; otherwise the database is
/// optionally reset and then seeded from `from`.
///
/// # Errors
///
/// Returns the error of whichever database operation fails.
#[cfg(feature = "with-db")]
pub async fn run_db<H: Hooks, M: MigratorTrait>(
    app_context: &AppContext,
    cmd: RunDbCommand,
) -> Result<()> {
    match cmd {
        RunDbCommand::Migrate => {
            tracing::warn!("migrate:");
            db::migrate::<M>(&app_context.db).await?;
        }
        RunDbCommand::Down(steps) => {
            tracing::warn!("down:");
            db::down::<M>(&app_context.db, steps).await?;
        }
        RunDbCommand::Reset => {
            tracing::warn!("reset:");
            db::reset::<M>(&app_context.db).await?;
        }
        RunDbCommand::Status => {
            tracing::warn!("status:");
            db::status::<M>(&app_context.db).await?;
        }
        RunDbCommand::Entities => {
            tracing::warn!("entities:");

            // Run the generation outside the logging macro: tracing evaluates macro
            // arguments only when the callsite is enabled, so keeping the call inline
            // would skip the work whenever WARN-level events are filtered out.
            let report = db::entities::<M>(app_context).await?;
            tracing::warn!("{}", report);
        }
        RunDbCommand::Truncate => {
            tracing::warn!("truncate:");
            H::truncate(app_context).await?;
        }
        RunDbCommand::Seed {
            reset,
            from,
            dump,
            dump_tables,
        } => {
            tracing::warn!(reset = reset, from = %from.display(), "seed:");

            if dump || dump_tables.is_some() {
                db::dump_tables(&app_context.db, from.as_path(), dump_tables).await?;
            } else {
                if reset {
                    db::reset::<M>(&app_context.db).await?;
                }
                db::run_app_seed::<H>(app_context, &from).await?;
            }
        }
        RunDbCommand::Schema => {
            db::dump_schema(app_context, "schema_dump.json").await?;
            println!("Database schema dumped to 'schema_dump.json'");
        }
    }
    Ok(())
}
357
358pub async fn create_context<H: Hooks>(
364 environment: &Environment,
365 config: Config,
366) -> Result<AppContext> {
367 if config.logger.pretty_backtrace {
368 std::env::set_var("RUST_BACKTRACE", "1");
369 warn!(
370 "pretty backtraces are enabled (this is great for development but has a runtime cost \
371 for production. disable with `logger.pretty_backtrace` in your config yaml)"
372 );
373 }
374 #[cfg(feature = "with-db")]
375 let db = db::connect(&config.database).await?;
376
377 let mailer = if let Some(cfg) = config.mailer.as_ref() {
378 create_mailer(cfg)?
379 } else {
380 None
381 };
382
383 let queue_provider = bgworker::create_queue_provider(&config).await?;
384 let ctx = AppContext {
385 environment: environment.clone(),
386 #[cfg(feature = "with-db")]
387 db,
388 queue_provider,
389 storage: Storage::single(storage::drivers::null::new()).into(),
390 cache: cache::create_cache_provider(&config).await?,
391 config,
392 mailer,
393 shared_store: Arc::new(crate::app::SharedStore::default()),
394 };
395
396 H::after_context(ctx).await
397}
398
/// Creates the application context, converges the database and the queue, and prepares
/// the boot result for `mode`.
///
/// The queue is converged only when both a queue provider and a queue configuration are
/// present.
///
/// # Errors
///
/// Returns an error when context creation, database/queue convergence or [`run_app`] fails.
#[cfg(feature = "with-db")]
pub async fn create_app<H: Hooks, M: MigratorTrait>(
    mode: StartMode,
    environment: &Environment,
    config: Config,
) -> Result<BootResult> {
    let app_context = create_context::<H>(environment, config).await?;
    db::converge::<H, M>(&app_context, &app_context.config.database).await?;

    let provider = app_context.queue_provider.as_ref();
    let queue_settings = app_context.config.queue.as_ref();
    if let Some((queue, queue_config)) = provider.zip(queue_settings) {
        bgworker::converge(queue, queue_config).await?;
    }

    run_app::<H>(&mode, app_context).await
}
419
420#[cfg(not(feature = "with-db"))]
421pub async fn create_app<H: Hooks>(
422 mode: StartMode,
423 environment: &Environment,
424 config: Config,
425) -> Result<BootResult> {
426 let app_context = create_context::<H>(environment, config).await?;
427
428 if let (Some(queue), Some(config)) = (&app_context.queue_provider, &app_context.config.queue) {
429 bgworker::converge(queue, config).await?;
430 }
431
432 run_app::<H>(&mode, app_context).await
433}
434
435pub async fn run_app<H: Hooks>(mode: &StartMode, app_context: AppContext) -> Result<BootResult> {
440 H::before_run(&app_context).await?;
441 let initializers = H::initializers(&app_context).await?;
442
443 info!(
444 initializers = ?initializers.iter().map(|init| init.name()).collect::<Vec<_>>().join(","),
445 "initializers loaded"
446 );
447
448 for initializer in &initializers {
449 initializer.before_run(&app_context).await?;
450 }
451
452 match mode {
453 StartMode::ServerOnly => {
454 let router = setup_routes::<H>(&app_context, &initializers).await?;
455 Ok(BootResult {
456 app_context,
457 router: Some(router),
458 worker: None,
459 run_scheduler: false,
460 })
461 }
462 StartMode::ServerAndWorker => {
463 register_workers::<H>(&app_context).await?;
464 let router = setup_routes::<H>(&app_context, &initializers).await?;
465 Ok(BootResult {
466 app_context,
467 router: Some(router),
468 worker: Some(vec![]),
469 run_scheduler: false,
470 })
471 }
472 StartMode::All => {
473 register_workers::<H>(&app_context).await?;
474 let router = setup_routes::<H>(&app_context, &initializers).await?;
475 Ok(BootResult {
476 app_context,
477 router: Some(router),
478 worker: Some(vec![]),
479 run_scheduler: true,
480 })
481 }
482 StartMode::WorkerOnly { tags } => {
483 register_workers::<H>(&app_context).await?;
484 Ok(BootResult {
485 app_context,
486 router: None,
487 worker: Some(tags.clone()),
488 run_scheduler: false,
489 })
490 }
491 }
492}
493
494async fn setup_routes<H: Hooks>(
496 app_context: &AppContext,
497 initializers: &[Box<dyn Initializer>],
498) -> Result<Router> {
499 let app = H::before_routes(app_context).await?;
500 let app = H::routes(app_context).to_router::<H>(app_context.clone(), app)?;
501 let mut router = H::after_routes(app, app_context).await?;
502
503 for initializer in initializers {
504 router = initializer.after_routes(router, app_context).await?;
505 }
506
507 Ok(router)
508}
509
510async fn register_workers<H: Hooks>(app_context: &AppContext) -> Result<()> {
511 if app_context.config.workers.mode == WorkerMode::BackgroundQueue {
512 if let Some(queue) = &app_context.queue_provider {
513 queue.register(MailerWorker::build(app_context)).await?;
514 H::connect_workers(app_context, queue).await?;
515 } else {
516 return Err(Error::QueueProviderMissing);
517 }
518
519 debug!("done registering workers and queues");
520 }
521 Ok(())
522}
523
524#[must_use]
525pub fn list_endpoints<H: Hooks>(ctx: &AppContext) -> Vec<ListRoutes> {
526 H::routes(ctx).collect()
527}
528
529pub async fn shutdown_signal() {
536 let ctrl_c = async {
537 signal::ctrl_c()
538 .await
539 .expect("failed to install Ctrl+C handler");
540 };
541
542 #[cfg(unix)]
543 let terminate = async {
544 signal::unix::signal(signal::unix::SignalKind::terminate())
545 .expect("failed to install signal handler")
546 .recv()
547 .await;
548 };
549
550 #[cfg(not(unix))]
551 let terminate = std::future::pending::<()>();
552
553 tokio::select! {
554 () = ctrl_c => {},
555 () = terminate => {},
556 }
557}
558
/// Summary of one middleware, as reported by `list_middlewares`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MiddlewareInfo {
    /// Middleware name.
    pub id: String,
    /// Whether the middleware reports itself as enabled.
    pub enabled: bool,
    /// Middleware configuration rendered as a string.
    pub detail: String,
}
564
565#[must_use]
566pub fn list_middlewares<H: Hooks>(ctx: &AppContext) -> Vec<MiddlewareInfo> {
567 H::middlewares(ctx)
568 .iter()
569 .map(|m| MiddlewareInfo {
570 id: m.name().to_string(),
571 enabled: m.is_enabled(),
572 detail: m.config().unwrap_or_default().to_string(),
573 })
574 .collect::<Vec<_>>()
575}
576
577fn create_mailer(config: &config::Mailer) -> Result<Option<EmailSender>> {
580 if config.stub {
581 return Ok(Some(EmailSender::stub()));
582 }
583 if let Some(smtp) = config.smtp.as_ref() {
584 if smtp.enable {
585 return Ok(Some(EmailSender::smtp(smtp)?));
586 }
587 }
588 Ok(None)
589}