by_loco/
boot.rs

1//! # Application Bootstrapping and Logic
2//! This module contains functions and structures for bootstrapping and running
3//! your application.
4use std::{
5    env,
6    path::{Path, PathBuf},
7    sync::Arc,
8};
9
10use axum::Router;
11#[cfg(feature = "with-db")]
12use sea_orm_migration::MigratorTrait;
13use tokio::{select, signal, task::JoinHandle};
14use tracing::{debug, error, info, warn};
15
16#[cfg(feature = "with-db")]
17use crate::db;
18use crate::{
19    app::{AppContext, Hooks, Initializer},
20    banner::print_banner,
21    bgworker, cache,
22    config::{self, Config, WorkerMode},
23    controller::ListRoutes,
24    env_vars,
25    environment::Environment,
26    errors::Error,
27    mailer::{EmailSender, MailerWorker},
28    prelude::BackgroundWorker,
29    scheduler::{self, Scheduler},
30    storage::{self, Storage},
31    task::{self, Tasks},
32    Result,
33};
34
/// Represents the application startup mode.
///
/// The mode decides which components `run_app` prepares: the web router,
/// the background worker, and/or the scheduler.
#[derive(Debug)]
pub enum StartMode {
    /// Run the application as a web server only. No worker is set up in
    /// this mode, so background jobs are not handled by this process.
    ServerOnly,
    /// Run the application web server and the worker in the same process.
    ServerAndWorker,
    /// Run only the job worker, which pulls jobs and executes them.
    WorkerOnly {
        /// Specifies that the worker should only handle jobs associated with one of these tags.
        /// If empty, the worker handles all jobs.
        tags: Vec<String>,
    },
    /// Run the app with all available components (web server, worker and
    /// scheduler) in the same process.
    All,
}
52
/// Outcome of booting the application: the context plus a description of
/// which components `start` should run.
pub struct BootResult {
    /// Application Context
    pub app_context: AppContext,
    /// Web server routes. `None` when no web server should be started.
    pub router: Option<Router>,
    /// Tags handed to the queue worker. `None` when no worker should be
    /// started; `Some` (possibly with an empty list) when one should.
    pub worker: Option<Vec<String>>,
    /// Whether `start` should also spawn the scheduler.
    pub run_scheduler: bool,
}
63
/// Configuration structure for serving an application.
///
/// Passed by reference to `Hooks::serve` and to the banner printer.
#[derive(Debug)]
pub struct ServeParams {
    /// The port number on which the server will listen for incoming
    /// connections.
    pub port: i32,
    /// The network address to which the server will bind. It specifies the
    /// interface to listen on.
    pub binding: String,
}
74
75/// Runs the application based on the provided `BootResult`.
76///
77/// This function is responsible for starting the application, including the
78/// server and/or workers.
79///
80/// # Errors
81///
82/// When could not initialize the application.
83pub async fn start<H: Hooks>(
84    boot: BootResult,
85    server_config: ServeParams,
86    no_banner: bool,
87) -> Result<()> {
88    if boot.run_scheduler {
89        let scheduler = scheduler::<H>(&boot.app_context, None, None, None)?;
90        tokio::spawn(async move {
91            let res = scheduler.run().await;
92            if res.is_err() {
93                error!(
94                    err = res.unwrap_err().to_string(),
95                    "error while running scheduler"
96                );
97            }
98        });
99    }
100
101    if !no_banner {
102        print_banner(&boot, &server_config);
103    }
104
105    let BootResult {
106        router,
107        worker,
108        run_scheduler: _,
109        app_context,
110    } = boot;
111
112    match (router, worker) {
113        (Some(router), None) => {
114            H::serve(router, &app_context, &server_config).await?;
115        }
116        (Some(router), Some(tags)) => {
117            let handle = if app_context.config.workers.mode == WorkerMode::BackgroundQueue {
118                Some(start_queue_worker(&app_context, tags)?)
119            } else {
120                None
121            };
122
123            H::serve(router, &app_context, &server_config).await?;
124
125            if let Some(handle) = handle {
126                shutdown_and_await_queue_worker(&app_context, handle).await?;
127            }
128        }
129        (None, Some(tags)) => {
130            let handle = if app_context.config.workers.mode == WorkerMode::BackgroundQueue {
131                Some(start_queue_worker(&app_context, tags)?)
132            } else {
133                None
134            };
135
136            shutdown_signal().await;
137
138            if let Some(handle) = handle {
139                shutdown_and_await_queue_worker(&app_context, handle).await?;
140            }
141        }
142        _ => {}
143    }
144    Ok(())
145}
146
147fn start_queue_worker(app_context: &AppContext, tags: Vec<String>) -> Result<JoinHandle<()>> {
148    debug!("note: worker is run in-process (tokio spawn)");
149
150    if let Some(queue) = &app_context.queue_provider {
151        let cloned_queue = queue.clone();
152        let handle = tokio::spawn(async move {
153            let res = cloned_queue.run(tags).await;
154            if res.is_err() {
155                error!(
156                    err = res.unwrap_err().to_string(),
157                    "error while running worker"
158                );
159            }
160        });
161        return Ok(handle);
162    }
163
164    Err(Error::QueueProviderMissing)
165}
166
167async fn shutdown_and_await_queue_worker(
168    app_context: &AppContext,
169    handle: JoinHandle<()>,
170) -> Result<(), Error> {
171    if let Some(queue) = &app_context.queue_provider {
172        queue.shutdown()?;
173    }
174
175    println!("press ctrl-c again to force quit");
176    select! {
177        _ = handle => {}
178        () = shutdown_signal() => {}
179    }
180    Ok(())
181}
182
183/// Run task
184///
185/// # Errors
186///
187/// When running could not run the task.
188pub async fn run_task<H: Hooks>(
189    app_context: &AppContext,
190    task: Option<&String>,
191    vars: &task::Vars,
192) -> Result<()> {
193    let mut tasks = Tasks::default();
194    H::register_tasks(&mut tasks);
195
196    if let Some(task) = task {
197        let task_span = tracing::span!(tracing::Level::DEBUG, "task", task,);
198        let _guard = task_span.enter();
199        tasks.run(app_context, task, vars).await?;
200    } else {
201        let list = tasks.list();
202        for item in &list {
203            println!("{:<30}[{}]", item.name, item.detail);
204        }
205    }
206    Ok(())
207}
208
209/// Initializes a new scheduler instance based on the provided configuration and context.
210fn scheduler<H: Hooks>(
211    app_context: &AppContext,
212    config: Option<&PathBuf>,
213    name: Option<String>,
214    tag: Option<String>,
215) -> Result<Scheduler> {
216    let env_config_path = env::var(env_vars::SCHEDULER_CONFIG).ok();
217
218    let config_path: Option<&Path> = config.map_or_else(
219        || env_config_path.as_deref().map(Path::new),
220        |path| Some(path.as_path()),
221    );
222
223    let scheduler = match config_path {
224        Some(path) => Scheduler::from_config::<H>(path, &app_context.environment)?,
225        None => {
226            if let Some(config) = &app_context.config.scheduler {
227                Scheduler::new::<H>(config, &app_context.environment)?
228            } else {
229                return Err(Error::Scheduler(scheduler::Error::Empty));
230            }
231        }
232    };
233
234    Ok(scheduler.by_spec(&scheduler::Spec { name, tag }))
235}
236
237/// Runs the scheduler with the given configuration and context. in case if list
238/// args is true prints scheduler job configuration
239///
240/// This function initializes the scheduler, registers tasks through the
241/// provided [`Hooks`], and executes the scheduler based on the specified
242/// configuration or context. The scheduler continuously runs, managing and
243/// executing scheduled tasks until a signal is received to shut down.
244/// Upon receiving this signal, the function gracefully shuts down all running
245/// tasks and exits safely.
246///
247/// # Errors
248///
249/// When running could not run the scheduler.
250pub async fn run_scheduler<H: Hooks>(
251    app_context: &AppContext,
252    config: Option<&PathBuf>,
253    name: Option<String>,
254    tag: Option<String>,
255    list: bool,
256) -> Result<()> {
257    let task_span = tracing::span!(tracing::Level::DEBUG, "scheduler_jobs");
258    let _guard = task_span.enter();
259
260    let scheduler = scheduler::<H>(app_context, config, name, tag)?;
261    if list {
262        println!("{scheduler}");
263        Ok(())
264    } else {
265        Ok(scheduler.run().await?)
266    }
267}
268
/// Represents commands for handling database-related operations.
#[derive(Debug)]
pub enum RunDbCommand {
    /// Apply pending migrations.
    Migrate,
    /// Run one or more down migrations; the value is the number of steps.
    Down(u32),
    /// Drop all tables, then reapply all migrations.
    Reset,
    /// Check the status of all migrations.
    Status,
    /// Generate entity.
    Entities,
    /// Truncate tables, by executing the implementation in
    /// `Hooks::truncate` (without dropping).
    Truncate,
    /// Seed database, or dump tables when `dump` or `dump_tables` is set.
    Seed {
        /// Reset the database before seeding. Not used when dumping.
        reset: bool,
        /// Path the seed is read from, or the dump is written to.
        from: PathBuf,
        /// Dump tables instead of seeding.
        dump: bool,
        /// Optional table selection handed to the dump. When present, a
        /// dump is performed even if `dump` is false.
        dump_tables: Option<Vec<String>>,
    },
    /// Dump database schema
    Schema,
}
295
#[cfg(feature = "with-db")]
/// Handles database commands.
///
/// Each [`RunDbCommand`] variant is dispatched to the matching helper in the
/// `db` module, or to `Hooks::truncate` for `Truncate`. Most commands first
/// emit a WARN-level event naming the command.
///
/// # Errors
///
/// Return an error when the given command fails. mostly return
/// [`sea_orm::DbErr`]
pub async fn run_db<H: Hooks, M: MigratorTrait>(
    app_context: &AppContext,
    cmd: RunDbCommand,
) -> Result<()> {
    match cmd {
        RunDbCommand::Migrate => {
            tracing::warn!("migrate:");
            db::migrate::<M>(&app_context.db).await?;
        }
        RunDbCommand::Down(steps) => {
            tracing::warn!("down:");
            db::down::<M>(&app_context.db, steps).await?;
        }
        RunDbCommand::Reset => {
            tracing::warn!("reset:");
            db::reset::<M>(&app_context.db).await?;
        }
        RunDbCommand::Status => {
            tracing::warn!("status:");
            db::status::<M>(&app_context.db).await?;
        }
        RunDbCommand::Entities => {
            tracing::warn!("entities:");

            // Do the generation outside the logging macro so that the work
            // itself never depends on whether this event gets evaluated.
            let report = db::entities::<M>(app_context).await?;
            tracing::warn!("{}", report);
        }
        RunDbCommand::Truncate => {
            tracing::warn!("truncate:");
            H::truncate(app_context).await?;
        }
        RunDbCommand::Seed {
            reset,
            from,
            dump,
            dump_tables,
        } => {
            tracing::warn!(reset = reset, from = %from.display(), "seed:");

            // A table selection implies a dump even without the `dump` flag.
            if dump || dump_tables.is_some() {
                db::dump_tables(&app_context.db, from.as_path(), dump_tables).await?;
            } else {
                if reset {
                    db::reset::<M>(&app_context.db).await?;
                }
                db::run_app_seed::<H>(app_context, &from).await?;
            }
        }
        RunDbCommand::Schema => {
            db::dump_schema(app_context, "schema_dump.json").await?;
            println!("Database schema dumped to 'schema_dump.json'");
        }
    }
    Ok(())
}
357
358/// Initializes the application context by loading configuration and
359/// establishing connections.
360///
361/// # Errors
362/// When has an error to create DB connection.
363pub async fn create_context<H: Hooks>(
364    environment: &Environment,
365    config: Config,
366) -> Result<AppContext> {
367    if config.logger.pretty_backtrace {
368        std::env::set_var("RUST_BACKTRACE", "1");
369        warn!(
370            "pretty backtraces are enabled (this is great for development but has a runtime cost \
371             for production. disable with `logger.pretty_backtrace` in your config yaml)"
372        );
373    }
374    #[cfg(feature = "with-db")]
375    let db = db::connect(&config.database).await?;
376
377    let mailer = if let Some(cfg) = config.mailer.as_ref() {
378        create_mailer(cfg)?
379    } else {
380        None
381    };
382
383    let queue_provider = bgworker::create_queue_provider(&config).await?;
384    let ctx = AppContext {
385        environment: environment.clone(),
386        #[cfg(feature = "with-db")]
387        db,
388        queue_provider,
389        storage: Storage::single(storage::drivers::null::new()).into(),
390        cache: cache::create_cache_provider(&config).await?,
391        config,
392        mailer,
393        shared_store: Arc::new(crate::app::SharedStore::default()),
394    };
395
396    H::after_context(ctx).await
397}
398
#[cfg(feature = "with-db")]
/// Creates an application based on the specified mode and environment.
///
/// Builds the context, converges the database, converges the queue when
/// both a queue provider and a queue configuration are present, and then
/// hands over to [`run_app`].
///
/// # Errors
///
/// When could not create the application
pub async fn create_app<H: Hooks, M: MigratorTrait>(
    mode: StartMode,
    environment: &Environment,
    config: Config,
) -> Result<BootResult> {
    let ctx = create_context::<H>(environment, config).await?;
    db::converge::<H, M>(&ctx, &ctx.config.database).await?;

    if let Some(queue) = &ctx.queue_provider {
        if let Some(queue_config) = &ctx.config.queue {
            bgworker::converge(queue, queue_config).await?;
        }
    }

    run_app::<H>(&mode, ctx).await
}
419
420#[cfg(not(feature = "with-db"))]
421pub async fn create_app<H: Hooks>(
422    mode: StartMode,
423    environment: &Environment,
424    config: Config,
425) -> Result<BootResult> {
426    let app_context = create_context::<H>(environment, config).await?;
427
428    if let (Some(queue), Some(config)) = (&app_context.queue_provider, &app_context.config.queue) {
429        bgworker::converge(queue, config).await?;
430    }
431
432    run_app::<H>(&mode, app_context).await
433}
434
435/// Run the application with the  given mode
436/// # Errors
437///
438/// When could not create the application
439pub async fn run_app<H: Hooks>(mode: &StartMode, app_context: AppContext) -> Result<BootResult> {
440    H::before_run(&app_context).await?;
441    let initializers = H::initializers(&app_context).await?;
442
443    info!(
444        initializers = ?initializers.iter().map(|init| init.name()).collect::<Vec<_>>().join(","),
445        "initializers loaded"
446    );
447
448    for initializer in &initializers {
449        initializer.before_run(&app_context).await?;
450    }
451
452    match mode {
453        StartMode::ServerOnly => {
454            let router = setup_routes::<H>(&app_context, &initializers).await?;
455            Ok(BootResult {
456                app_context,
457                router: Some(router),
458                worker: None,
459                run_scheduler: false,
460            })
461        }
462        StartMode::ServerAndWorker => {
463            register_workers::<H>(&app_context).await?;
464            let router = setup_routes::<H>(&app_context, &initializers).await?;
465            Ok(BootResult {
466                app_context,
467                router: Some(router),
468                worker: Some(vec![]),
469                run_scheduler: false,
470            })
471        }
472        StartMode::All => {
473            register_workers::<H>(&app_context).await?;
474            let router = setup_routes::<H>(&app_context, &initializers).await?;
475            Ok(BootResult {
476                app_context,
477                router: Some(router),
478                worker: Some(vec![]),
479                run_scheduler: true,
480            })
481        }
482        StartMode::WorkerOnly { tags } => {
483            register_workers::<H>(&app_context).await?;
484            Ok(BootResult {
485                app_context,
486                router: None,
487                worker: Some(tags.clone()),
488                run_scheduler: false,
489            })
490        }
491    }
492}
493
494/// Sets up the application's routes based on the provided initializers and hooks.
495async fn setup_routes<H: Hooks>(
496    app_context: &AppContext,
497    initializers: &[Box<dyn Initializer>],
498) -> Result<Router> {
499    let app = H::before_routes(app_context).await?;
500    let app = H::routes(app_context).to_router::<H>(app_context.clone(), app)?;
501    let mut router = H::after_routes(app, app_context).await?;
502
503    for initializer in initializers {
504        router = initializer.after_routes(router, app_context).await?;
505    }
506
507    Ok(router)
508}
509
510async fn register_workers<H: Hooks>(app_context: &AppContext) -> Result<()> {
511    if app_context.config.workers.mode == WorkerMode::BackgroundQueue {
512        if let Some(queue) = &app_context.queue_provider {
513            queue.register(MailerWorker::build(app_context)).await?;
514            H::connect_workers(app_context, queue).await?;
515        } else {
516            return Err(Error::QueueProviderMissing);
517        }
518
519        debug!("done registering workers and queues");
520    }
521    Ok(())
522}
523
524#[must_use]
525pub fn list_endpoints<H: Hooks>(ctx: &AppContext) -> Vec<ListRoutes> {
526    H::routes(ctx).collect()
527}
528
529/// Waits for a shutdown signal, either via Ctrl+C or termination signal.
530///
531/// # Panics
532///
533/// This function will panic if it fails to install the signal handlers for
534/// Ctrl+C or the terminate signal on Unix-based systems.
535pub async fn shutdown_signal() {
536    let ctrl_c = async {
537        signal::ctrl_c()
538            .await
539            .expect("failed to install Ctrl+C handler");
540    };
541
542    #[cfg(unix)]
543    let terminate = async {
544        signal::unix::signal(signal::unix::SignalKind::terminate())
545            .expect("failed to install signal handler")
546            .recv()
547            .await;
548    };
549
550    #[cfg(not(unix))]
551    let terminate = std::future::pending::<()>();
552
553    tokio::select! {
554        () = ctrl_c => {},
555        () = terminate => {},
556    }
557}
558
/// Summary of one middleware layer, as produced by `list_middlewares`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MiddlewareInfo {
    /// Name reported by the middleware.
    pub id: String,
    /// Whether the middleware reports itself as enabled.
    pub enabled: bool,
    /// The middleware's configuration rendered as a string (the rendering
    /// of the default value when the configuration is unavailable).
    pub detail: String,
}
564
565#[must_use]
566pub fn list_middlewares<H: Hooks>(ctx: &AppContext) -> Vec<MiddlewareInfo> {
567    H::middlewares(ctx)
568        .iter()
569        .map(|m| MiddlewareInfo {
570            id: m.name().to_string(),
571            enabled: m.is_enabled(),
572            detail: m.config().unwrap_or_default().to_string(),
573        })
574        .collect::<Vec<_>>()
575}
576
577/// Initializes an [`EmailSender`] based on the mailer configuration settings
578/// ([`config::Mailer`]).
579fn create_mailer(config: &config::Mailer) -> Result<Option<EmailSender>> {
580    if config.stub {
581        return Ok(Some(EmailSender::stub()));
582    }
583    if let Some(smtp) = config.smtp.as_ref() {
584        if smtp.enable {
585            return Ok(Some(EmailSender::smtp(smtp)?));
586        }
587    }
588    Ok(None)
589}