next_web_dev/application/
application.rs

1use async_trait::async_trait;
2use axum::body::Bytes;
3use axum::http::{Response, StatusCode};
4use axum::Router;
5use hashbrown::HashMap;
6use http_body_util::Full;
7use next_web_core::constants::application_constants::APPLICATION_BANNER_FILE;
8use next_web_core::context::application_args::ApplicationArgs;
9use next_web_core::context::application_context::ApplicationContext;
10use next_web_core::context::properties::{ApplicationProperties, Properties};
11use next_web_core::core::apply_router::ApplyRouter;
12use next_web_core::AutoRegister;
13use rust_embed_for_web::{EmbedableFile, RustEmbed};
14use std::io::BufRead;
15// use std::path::PathBuf;
16use std::sync::Arc;
17use tower_http::catch_panic::CatchPanicLayer;
18use tower_http::cors::CorsLayer;
19use tower_http::limit::RequestBodyLimitLayer;
20use tower_http::timeout::TimeoutLayer;
21use tower_http::trace::TraceLayer;
22use tracing::{error, info, warn};
23
24use crate::application::application_shutdown::ApplicationShutdown;
25use crate::application::next_application::NextApplication;
26
27use crate::autoregister::register_single::ApplicationDefaultRegisterContainer;
28use crate::event::application_event_multicaster::ApplicationEventMulticaster;
29use crate::event::application_listener::ApplicationListener;
30
31use crate::autoconfigure::context::next_properties::NextProperties;
32use crate::banner::top_banner::{TopBanner, DEFAULT_TOP_BANNER};
33use crate::event::default_application_event_multicaster::DefaultApplicationEventMulticaster;
34use crate::event::default_application_event_publisher::DefaultApplicationEventPublisher;
35use crate::util::date_time_util::LocalDateTime;
36
37use next_web_core::context::application_resources::ApplicationResources;
38
39#[cfg(feature = "job_scheduler")]
40use crate::manager::job_scheduler_manager::{ApplicationJob, JobSchedulerManager};
41
42#[async_trait]
43pub trait Application: Send + Sync {
    /// Initialize the middleware.
    ///
    /// Required method. `run` awaits it after `init_infrastructure` and before
    /// `bind_tcp_server`, passing the loaded application properties.
    async fn init_middleware(&mut self, properties: &ApplicationProperties);
46
    /// Get the application router (open API and private API).
    ///
    /// Required method. `bind_tcp_server` awaits it once and mounts the returned
    /// router, either merged at the root or nested under the configured context path.
    async fn application_router(&mut self, ctx: &mut ApplicationContext) -> axum::Router;
49
    /// Register the RPC server.
    ///
    /// Required method, only compiled with the `enable_grpc` feature. `run` awaits
    /// it after `init_middleware`.
    #[cfg(feature = "enable_grpc")]
    async fn register_rpc_server(&mut self, properties: &ApplicationProperties);
53
    /// Connect the gRPC client.
    ///
    /// Required method, only compiled with the `enable_grpc` feature. `run` awaits
    /// it right after `register_rpc_server`.
    #[cfg(feature = "enable_grpc")]
    async fn connect_rpc_client(&mut self, properties: &ApplicationProperties);
57
58    /// Show the banner of the application.
59    fn banner_show() {
60        if let Some(content) = ApplicationResources::get(APPLICATION_BANNER_FILE) {
61            TopBanner::show(std::str::from_utf8(&content.data()).unwrap_or(DEFAULT_TOP_BANNER));
62        } else {
63            TopBanner::show(DEFAULT_TOP_BANNER);
64        }
65    }
66
67    /// Initialize the message source.
68    async fn init_message_source<T>(
69        &mut self,
70        application_properties: &NextProperties,
71    ) -> HashMap<String, HashMap<String, String>> {
72        let mut messages = HashMap::new();
73        if let Some(message_source) = application_properties.messages() {
74            let mut load_local_message = |name: &str| {
75                if let Some(dyn_file) = ApplicationResources::get(&format!("messages/{}", &name)) {
76                    let data = dyn_file.data();
77                    if !data.is_empty() {
78                        let mut map = HashMap::new();
79                        let _ = data.lines().map(|var| {
80                            var.map(|var1| {
81                                let var2: Vec<&str> = var1.split("=").collect();
82                                if var2.len() == 2 {
83                                    let key = var2.get(0).unwrap();
84                                    let value = var2.get(1).unwrap();
85                                    map.insert(key.to_string(), value.to_string());
86                                }
87                            })
88                        });
89                        messages.insert(name.to_string(), map);
90                    }
91                }
92            };
93
94            // load default messages
95            load_local_message(&"messages.properties");
96
97            message_source.local().map(|item| {
98                item.trim_end().split(",").for_each(|s| {
99                    let name = format!("messages_{}.properties", s);
100                    load_local_message(&name);
101                });
102            });
103        }
104        messages
105    }
106
107    /// Initialize the logger.
108    fn init_logger(&self, application_properties: &ApplicationProperties) {
109        let application_name = application_properties
110            .next()
111            .appliation()
112            .map(|app| app.name())
113            .unwrap_or_default();
114        let file_appender = application_properties.next().logger().map_or_else(
115            || None,
116            |logger| {
117                // logger enable?
118                if logger.enable() {
119                    let path = logger.log_dir().unwrap_or_else(|| "./logs");
120                    let log_name = format!(
121                        "{}{}.log",
122                        application_name,
123                        if logger.additional_date() {
124                            format!("-{}", LocalDateTime::date())
125                        } else {
126                            String::new()
127                        }
128                    );
129                    return Some(tracing_appender::rolling::daily(path, log_name));
130                }
131                None
132            },
133        );
134
135        let config = tracing_subscriber::fmt::format()
136            .with_timer(tracing_subscriber::fmt::time::ChronoLocal::new(
137                "%Y-%m-%d %H:%M:%S%.3f".to_string(),
138            ))
139            .with_level(true)
140            .with_target(true)
141            .with_line_number(true)
142            .with_thread_ids(true)
143            .with_file(true)
144            .with_thread_names(true);
145
146        // tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
147
148        let logger = tracing_subscriber::fmt()
149            // test
150            .with_max_level(tracing::Level::INFO)
151            .with_ansi(false)
152            .event_format(config);
153
154        if let Some(file_appender) = file_appender {
155            let (non_blocking, _worker) = tracing_appender::non_blocking(file_appender);
156            logger.with_writer(non_blocking).with_test_writer().init();
157        } else {
158            logger.init();
159        }
160    }
161
162    /// Autowire properties
163    async fn autowire_properties(
164        &mut self,
165        ctx: &mut ApplicationContext,
166        application_properties: &ApplicationProperties,
167    ) {
168        let properties = ctx.resolve_by_type::<Box<dyn Properties>>();
169        for item in properties {
170            item.register(ctx, application_properties).await.unwrap();
171        }
172    }
173
174    /// Register application singleton
175    async fn register_singleton(
176        &self,
177        ctx: &mut ApplicationContext,
178        application_properties: &ApplicationProperties,
179        application_args: &ApplicationArgs,
180    ) {
181        // Register application singleton
182        let mut container = ApplicationDefaultRegisterContainer::new();
183        container.register_all(ctx, application_properties).await;
184
185        // register singletion with properties and args
186        ctx.insert_singleton_with_name(application_properties.clone(), "");
187        ctx.insert_singleton_with_name(application_args.clone(), "");
188
189        // Resove autoRegister
190        let auto_register = ctx.resolve_by_type::<Arc<dyn AutoRegister>>();
191        for item in auto_register.iter() {
192            item.register(ctx, application_properties).await.unwrap();
193        }
194    }
195
196    /// Initialize the application infrastructure
197    async fn init_infrastructure(
198        &self,
199        ctx: &mut ApplicationContext,
200        _application_properties: &ApplicationProperties,
201    ) {
202        // Register job
203        let producers = ctx.resolve_by_type::<Arc<dyn ApplicationJob>>();
204        if let Some(schedluer_manager) =
205            ctx.get_single_option_with_name::<JobSchedulerManager>("jobSchedulerManager")
206        {
207            let mut schedluer_manager = schedluer_manager.clone();
208            for producer in producers {
209                schedluer_manager.add_job(producer.gen_job()).await;
210            }
211            schedluer_manager.start();
212        } else {
213            warn!("Job scheduler manager not found");
214        }
215
216        // Register application event
217        let (tx, rx) = flume::unbounded();
218        let mut default_event_publisher = DefaultApplicationEventPublisher::new();
219        let mut multicaster = DefaultApplicationEventMulticaster::new();
220
221        default_event_publisher.set_channel(Some(tx));
222        multicaster.set_event_channel(rx);
223
224        let listeners = ctx.resolve_by_type::<Box<dyn ApplicationListener>>();
225        listeners.into_iter().for_each(|listener| {
226            multicaster.add_application_listener(listener);
227        });
228
229        multicaster.run();
230
231        ctx.insert_singleton_with_name(default_event_publisher, "");
232        ctx.insert_singleton_with_name(multicaster, "");
233    }
234
235    /// Bind tcp server.
236    async fn bind_tcp_server(
237        &mut self,
238        ctx: &mut ApplicationContext,
239        application_properties: &ApplicationProperties,
240        time: std::time::Instant,
241    ) {
242        let config = application_properties.next().server();
243
244        let context_path = config.context_path().unwrap_or("");
245        let server_port = config.port().unwrap_or(11011);
246        let server_addr = if config.local().unwrap_or(false) {
247            "127.0.0.1"
248        } else {
249            "0.0.0.0"
250        };
251
252        let mut application_router = self.application_router(ctx).await;
253
254        // Run our app with hyper, listening globally on port
255        let mut app = Router::new()
256            // handle not found route
257            .fallback(fall_back);
258
259        // Add prometheus layer
260        #[cfg(feature = "enable_prometheus")]
261        {
262            let (prometheus_layer, metric_handle) = axum_prometheus::PrometheusMetricLayer::pair();
263            app = app
264                .route(
265                    "/metrics",
266                    axum::routing::get(|| async move { metric_handle.render() }),
267                )
268                .layer(prometheus_layer);
269        }
270
271        // set layer
272        if let Some(http) = config.http() {
273            let val = http
274                .request()
275                .map(|req| {
276                    let var1 = req.trace();
277                    let var2 = req.max_request_size().unwrap_or_default();
278                    (var1, var2)
279                })
280                .unwrap_or_default();
281            let _ = http.response();
282            if val.0 {
283                application_router = application_router.route_layer(TraceLayer::new_for_http());
284            }
285            // 3MB
286            if val.1 >= 3145728 {
287                application_router =
288                    application_router.route_layer(RequestBodyLimitLayer::new(val.1));
289            }
290        }
291
292        application_router = application_router
293            // global panic handler
294            .layer(CatchPanicLayer::custom(handle_panic))
295            // handler request  max timeout
296            .layer(TimeoutLayer::new(std::time::Duration::from_secs(5)))
297            // cors  pass -> anyeventing request
298            .layer(
299                CorsLayer::new()
300                    .allow_origin(tower_http::cors::Any)
301                    .allow_methods(tower_http::cors::Any)
302                    .allow_headers(tower_http::cors::Any)
303                    .max_age(std::time::Duration::from_secs(60) * 10),
304            );
305
306        if !context_path.is_empty() {
307            app = app.nest(context_path, application_router);
308        } else {
309            app = app.merge(application_router);
310        }
311
312        // apply router
313        let routers = ctx.resolve_by_type::<Box<dyn ApplyRouter>>();
314
315        let (mut open_routers, mut common_routers): (Vec<_>, Vec<_>) =
316            routers.into_iter().partition(|item| item.open());
317
318        let var10 = Router::new();
319
320        open_routers.sort_by_key(|k| k.order());
321        let open_router = open_routers
322            .into_iter()
323            .map(|item| item.router(ctx))
324            .filter(|item1| item1.has_routes())
325            .fold(var10, |acc, router| {
326                if !context_path.is_empty() {
327                    acc.nest(context_path, router)
328                } else {
329                    acc.merge(router)
330                }
331            });
332
333        app = app.merge(open_router);
334
335        common_routers.sort_by_key(|k| k.order());
336        for item2 in common_routers
337            .into_iter()
338            .map(|item| item.router(ctx))
339            .filter(|item1| item1.has_routes())
340        {
341            if !context_path.is_empty() {
342                app = app.nest(context_path, item2);
343                continue;
344            }
345            app = app.merge(item2);
346        }
347
348        println!(
349            "\nApplication Listening  on:  {}",
350            format!("{}:{}", server_addr, server_port)
351        );
352
353        println!("Application Running    on:  {}", LocalDateTime::now());
354        println!("Application StartTime  on:  {:?}", time.elapsed());
355        println!("Application CurrentPid on:  {:?}\n", std::process::id());
356
357        // configure certificate and private key used by https
358        #[cfg(feature = "tls_rustls")]
359        {
360            let tls_config = axum_server::tls_rustls::RustlsConfig::from_pem_file(
361                std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
362                    .join("self_signed_certs")
363                    .join("cert.pem"),
364                std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
365                    .join("self_signed_certs")
366                    .join("key.pem"),
367            )
368            .await
369            .unwrap();
370            let addr = std::net::SocketAddr::from(([0, 0, 0, 0], config.port()));
371            let mut server = axum_server::bind_rustls(addr, tls_config);
372            // IMPORTANT: This is required to advertise our support for HTTP/2 websockets to the client.
373            // If you use axum::serve, it is enabled by default.
374            server.http_builder().http2().enable_connect_protocol();
375            server.serve(app.into_make_service()).await.unwrap();
376        }
377
378        #[cfg(not(feature = "tls_rustls"))]
379        {
380            // run http server
381            let listener =
382                tokio::net::TcpListener::bind(format!("{}:{}", server_addr, server_port))
383                    .await
384                    .unwrap();
385
386            let mut shutdowns = ctx.resolve_by_type::<Box<dyn ApplicationShutdown>>();
387            axum::serve(
388                listener,
389                app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
390            )
391            .with_graceful_shutdown(async move {
392                let ctrl_c = async {
393                    tokio::signal::ctrl_c()
394                        .await
395                        .expect("failed to install Ctrl+C handler");
396                };
397
398                #[cfg(unix)]
399                let terminate = async {
400                    signal::unix::signal(signal::unix::SignalKind::terminate())
401                        .expect("failed to install signal handler")
402                        .recv()
403                        .await;
404                };
405
406                #[cfg(not(unix))]
407                let terminate = std::future::pending::<()>();
408
409                tokio::select! {
410                    _ = ctrl_c =>  {
411                        info!("Received Ctrl+C. Shutting down...");
412                        for service in shutdowns.iter_mut() {
413                            service.shutdown().await;
414                        }
415                    },
416                    _ = terminate =>  {
417                        info!("Received terminate signal. Shutting down...");
418                        for service in shutdowns.iter_mut() {
419                            service.shutdown().await;
420                        }
421                    },
422                }
423            })
424            .await
425            .unwrap();
426        }
427    }
428
429    /// Run the application.
430    async fn run()
431    where
432        Self: Application + Default,
433    {
434        // Record application start time
435        let start_time = std::time::Instant::now();
436
437        // Banner show
438        Self::banner_show();
439
440        // Get a base application instance
441        let mut next_application: NextApplication<Self> = NextApplication::new();
442        let properties = next_application.application_properties().clone();
443        let args = next_application.application_args().clone();
444
445        let application = next_application.application();
446
447        println!("========================================================================\n");
448
449        application.init_logger(&properties);
450        println!(
451            "Init logger success!\nCurrent Time: {}\n",
452            LocalDateTime::now()
453        );
454
455        let mut ctx = ApplicationContext::options()
456            .allow_override(false)
457            .auto_register();
458        println!(
459            "Init application context success!\nCurrent Time: {}\n",
460            LocalDateTime::now()
461        );
462
463        // Autowire properties
464        application.autowire_properties(&mut ctx, &properties).await;
465        println!(
466            "Autowire properties success!\nCurrent Time: {}\n",
467            LocalDateTime::now()
468        );
469
470        // Register singleton
471        application
472            .register_singleton(&mut ctx, &properties, &args)
473            .await;
474        println!(
475            "Register singleton success!\nCurrent Time: {}\n",
476            LocalDateTime::now()
477        );
478
479        // Init infrastructure
480        application.init_infrastructure(&mut ctx, &properties).await;
481        println!(
482            "Init infrastructure success!\nCurrent Time: {}\n",
483            LocalDateTime::now()
484        );
485
486        // Init middleware
487        application.init_middleware(&properties).await;
488        println!(
489            "Init middleware success!\nCurrent Time: {}\n",
490            LocalDateTime::now()
491        );
492
493        #[cfg(feature = "enable_grpc")]
494        {
495            application.register_rpc_server(&properties).await;
496            println!(
497                "Register grpc server success!\nCurrent Time: {}\n",
498                LocalDateTime::now()
499            );
500
501            application.connect_rpc_client(&properties).await;
502            println!(
503                "Connect grpc client success!\nCurrent Time: {}\n",
504                LocalDateTime::now()
505            );
506        }
507
508        println!("========================================================================");
509
510        // application.init_cache().await;
511
512        application
513            .bind_tcp_server(&mut ctx, &properties, start_time)
514            .await;
515    }
516}
517
518fn handle_panic(err: Box<dyn std::any::Any + Send + 'static>) -> Response<Full<Bytes>> {
519    error!("Application handle panic, case: {:?}", err);
520    let err_str = format!(
521        "internal server error, case: {:?},\ntimestamp: {}",
522        err,
523        LocalDateTime::timestamp()
524    );
525    Response::builder()
526        .status(StatusCode::INTERNAL_SERVER_ERROR)
527        .body(err_str.into())
528        .unwrap()
529}
530
531/// no route match handler
532async fn fall_back() -> (StatusCode, &'static str) {
533    (StatusCode::NOT_FOUND, "not macth route")
534}