next_web_dev/application/
application.rs

1use async_trait::async_trait;
2use axum::body::Bytes;
3use axum::http::{Response, StatusCode};
4use axum::Router;
5use hashbrown::HashMap;
6use http_body_util::Full;
7use next_web_core::context::application_context::ApplicationContext;
8use next_web_core::context::properties::{ApplicationProperties, Properties};
9use next_web_core::core::router::ApplyRouter;
10use next_web_core::AutoRegister;
11use rust_embed_for_web::{EmbedableFile, RustEmbed};
12use std::io::BufRead;
13use std::path::PathBuf;
14use std::sync::Arc;
15use tower_http::catch_panic::CatchPanicLayer;
16use tower_http::cors::CorsLayer;
17use tower_http::limit::RequestBodyLimitLayer;
18use tower_http::timeout::TimeoutLayer;
19use tower_http::trace::TraceLayer;
20use tracing::{error, info, warn};
21
22use crate::application::application_shutdown::ApplicationShutdown;
23use crate::application::next_application::NextApplication;
24
25use crate::autoregister::register_single::ApplicationDefaultRegisterContainer;
26use crate::event::application_event_multicaster::ApplicationEventMulticaster;
27use crate::event::application_listener::ApplicationListener;
28
29use crate::autoconfigure::context::next_properties::NextProperties;
30use crate::banner::top_banner::{TopBanner, DEFAULT_TOP_BANNER};
31use crate::event::default_application_event_multicaster::DefaultApplicationEventMulticaster;
32use crate::event::default_application_event_publisher::DefaultApplicationEventPublisher;
33use crate::router::open_router::OpenRouter;
34use crate::router::private_router::PrivateRouter;
35use crate::util::date_time_util::LocalDateTimeUtil;
36use crate::util::file_util::FileUtil;
37
38use next_web_core::context::application_resources::ApplicationResources;
39
40#[cfg(feature = "job_scheduler")]
41use crate::manager::job_scheduler_manager::{ApplicationJob, JobSchedulerManager};
42
43#[cfg(feature = "redis_enabled")]
44use crate::event::redis_expired_event::RedisExpiredEvent;
45#[cfg(feature = "redis_enabled")]
46use crate::manager::redis_manager::RedisManager;
47
/// Resource name of the optional startup banner; looked up through
/// `ApplicationResources` when the banner is shown.
pub const APPLICATION_BANNER_NAME: &str = "banner.txt";
/// File name of the JSON user-permission resource; resolved relative to
/// `CARGO_MANIFEST_DIR` when the `user_security` feature is enabled.
pub const APPLICATION_USER_PERMISSION_RESOURCE: &str = "user_permission_resource.json";
50
51#[async_trait]
52pub trait Application: Send + Sync {
53    /// initialize the middleware.
54    async fn init_middleware(&mut self, properties: &ApplicationProperties);
55
56    // get the application router. (open api  and private api)
57    async fn application_router(
58        &mut self,
59        ctx: &mut ApplicationContext,
60    ) -> (OpenRouter, PrivateRouter);
61
62    /// register the rpc server.
63    #[cfg(feature = "grpc_enabled")]
64    async fn register_rpc_server(&mut self, properties: &ApplicationProperties);
65
66    /// register the grpc client.
67    #[cfg(feature = "grpc_enabled")]
68    async fn connect_rpc_client(&mut self, properties: &ApplicationProperties);
69
70    /// show the banner of the application.
71    fn banner_show() {
72        if let Some(content) = ApplicationResources::get(APPLICATION_BANNER_NAME) {
73            TopBanner::show(std::str::from_utf8(&content.data()).unwrap_or(DEFAULT_TOP_BANNER));
74        } else {
75            TopBanner::show(DEFAULT_TOP_BANNER);
76        }
77    }
78
79    /// initialize the message source.
80    async fn init_message_source<T>(
81        &mut self,
82        application_properties: &NextProperties,
83    ) -> HashMap<String, HashMap<String, String>> {
84        // message source
85        let mut messages = HashMap::new();
86        if let Some(message_source) = application_properties.messages() {
87            let mut load_local_message = |name: &str| {
88                if let Some(dyn_file) = ApplicationResources::get(&format!("messages/{}", &name)) {
89                    let data = dyn_file.data();
90                    if !data.is_empty() {
91                        let mut map = HashMap::new();
92                        let _ = data.lines().map(|var| {
93                            var.map(|var1| {
94                                let var2: Vec<&str> = var1.split("=").collect();
95                                if var2.len() == 2 {
96                                    let key = var2.get(0).unwrap();
97                                    let value = var2.get(1).unwrap();
98                                    map.insert(key.to_string(), value.to_string());
99                                }
100                            })
101                        });
102                        messages.insert(name.to_string(), map);
103                    }
104                }
105            };
106
107            // load default messages
108            load_local_message(&"messages.properties");
109
110            message_source.local().map(|item| {
111                item.trim_end().split(",").for_each(|s| {
112                    let name = format!("messages_{}.properties", s);
113                    load_local_message(&name);
114                });
115            });
116        }
117        messages
118    }
119
120    #[cfg(feature = "user_security")]
121    fn user_permission_resource(
122        &self,
123    ) -> Option<crate::security::user_permission_resource::UserPermissionResource> {
124        use crate::security::user_permission_resource::UserPermissionResourceBuilder;
125
126        let path = PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap())
127            .join(APPLICATION_USER_PERMISSION_RESOURCE)
128            .display()
129            .to_string();
130        if let Ok(content) = FileUtil::read_file_to_string(&path) {
131            if content.is_empty() {
132                return None;
133            }
134            let user_permission_resource: Vec<UserPermissionResourceBuilder> =
135                serde_json::from_str(&content).unwrap();
136            if user_permission_resource.is_empty() {
137                return None;
138            }
139            return Some(user_permission_resource.into());
140        }
141        None
142    }
143
144    /// initialize the logger.
145    fn init_logger(&self, application_properties: &ApplicationProperties) {
146        let application_name = application_properties
147            .next()
148            .appliation()
149            .map(|app| app.name())
150            .unwrap_or_default();
151        let file_appender = application_properties.next().logger().map_or_else(
152            || None,
153            |logger| {
154                // logger enable?
155                if logger.enable() {
156                    let path = logger.log_dir().unwrap_or_else(|| "./logs");
157                    let log_name = format!(
158                        "{}{}.log",
159                        application_name,
160                        if logger.additional_date() {
161                            format!("-{}", LocalDateTimeUtil::date())
162                        } else {
163                            String::new()
164                        }
165                    );
166                    return Some(tracing_appender::rolling::daily(path, log_name));
167                }
168                None
169            },
170        );
171
172        let config = tracing_subscriber::fmt::format()
173            .with_timer(tracing_subscriber::fmt::time::ChronoLocal::new(
174                "%Y-%m-%d %H:%M:%S%.3f".to_string(),
175            ))
176            .with_level(true)
177            .with_target(true)
178            .with_line_number(true)
179            .with_thread_ids(true)
180            .with_file(true)
181            .with_thread_names(true);
182
183        // tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
184
185        let logger = tracing_subscriber::fmt()
186            // test
187            .with_max_level(tracing::Level::INFO)
188            .with_ansi(false)
189            .event_format(config);
190
191        if let Some(file_appender) = file_appender {
192            let (non_blocking, _worker) = tracing_appender::non_blocking(file_appender);
193            logger.with_writer(non_blocking).with_test_writer().init();
194        } else {
195            logger.init();
196        }
197    }
198
199    /// Autowire properties
200    async fn autowire_properties(
201        &mut self,
202        ctx: &mut ApplicationContext,
203        application_properties: &ApplicationProperties,
204    ) {
205        let properties = ctx.resolve_by_type::<Box<dyn Properties>>();
206        for item in properties {
207            item.register(ctx, application_properties).await.unwrap();
208        }
209    }
210
211    /// Register application singleton
212    async fn register_singleton(
213        &self,
214        ctx: &mut ApplicationContext,
215        application_properties: &ApplicationProperties,
216    ) {
217        // Register application singleton
218        let mut container = ApplicationDefaultRegisterContainer::new();
219        container.register_all(ctx, application_properties).await;
220
221        // Resove AutoRegister
222        let auto_register = ctx.resolve_by_type::<Box<dyn AutoRegister>>();
223        for item in auto_register.iter() {
224            item.register(ctx, application_properties).await.unwrap();
225        }
226    }
227
228    /// initialize the application infrastructure
229    async fn init_infrastructure(
230        &self,
231        ctx: &mut ApplicationContext,
232        _application_properties: &ApplicationProperties,
233    ) {
234        println!("\n========================================================================");
235
236        // Register job
237        let producers = ctx.resolve_by_type::<Arc<dyn ApplicationJob>>();
238        if let Some(schedluer_manager) =
239            ctx.get_single_option_with_name::<JobSchedulerManager>("jobSchedulerManager")
240        {
241            let mut schedluer_manager = schedluer_manager.clone();
242            for producer in producers {
243                schedluer_manager.add_job(producer.gen_job()).await;
244            }
245            schedluer_manager.start();
246        } else {
247            warn!("Job scheduler manager not found");
248        }
249
250        // Register redis expired event
251        #[cfg(feature = "redis_enabled")]
252        if let Some(redis_manager) = ctx.resolve_option_with_name::<RedisManager>("redisManager") {
253            if let Some(handle) =
254                ctx.resolve_option::<Arc<tokio::sync::Mutex<dyn RedisExpiredEvent>>>()
255            {
256                let _ = redis_manager
257                    .expired_event(handle)
258                    .await
259                    .map(|_| info!("Redis expired event listen success!"));
260            }
261        }
262
263        // Register application event
264        let (tx, rx) = flume::unbounded();
265        let mut default_event_publisher = DefaultApplicationEventPublisher::new();
266        let mut multicaster = DefaultApplicationEventMulticaster::new();
267
268        default_event_publisher.set_channel(Some(tx));
269        multicaster.set_event_channel(rx);
270
271        let listeners = ctx.resolve_by_type::<Box<dyn ApplicationListener>>();
272        listeners.into_iter().for_each(|listener| {
273            multicaster.add_application_listener(listener);
274        });
275
276        multicaster.run();
277
278        ctx.insert_singleton_with_name(default_event_publisher, "");
279        ctx.insert_singleton_with_name(multicaster, "");
280
281        println!("========================================================================\n");
282    }
283
284    /// bind tcp server.
285    async fn bind_tcp_server(
286        &mut self,
287        ctx: &mut ApplicationContext,
288        application_properties: &ApplicationProperties,
289        time: std::time::Instant,
290    ) {
291        let config = application_properties.next().server();
292
293        let (open_router, private_router) = self.application_router(ctx).await;
294        // run our app with hyper, listening globally on port
295        let mut app = Router::new()
296            .route("/", axum::routing::get(root))
297            .merge(Router::new().nest("/open", open_router.0))
298            // handle not found route
299            .fallback(fall_back);
300
301        // add prometheus layer
302        #[cfg(feature = "prometheus_enabled")]
303        {
304            let (prometheus_layer, metric_handle) = axum_prometheus::PrometheusMetricLayer::pair();
305            app = app
306                .route(
307                    "/metrics",
308                    axum::routing::get(|| async move { metric_handle.render() }),
309                )
310                .layer(prometheus_layer);
311        }
312
313        let mut router = private_router.0;
314        // set layer
315        if let Some(http) = config.http() {
316            let val = http
317                .request()
318                .map(|req| {
319                    let var1 = req.trace();
320                    let var2 = req.max_request_size().unwrap_or_default();
321                    (var1, var2)
322                })
323                .unwrap_or_default();
324            let _ = http.response();
325            if val.0 {
326                router = router.layer(TraceLayer::new_for_http());
327            }
328            // 3MB
329            if val.1 >= 3145728 {
330                router = router.route_layer(RequestBodyLimitLayer::new(val.1));
331            }
332        }
333
334        router = router
335            // global panic handler
336            .layer(CatchPanicLayer::custom(handle_panic))
337            // handler request  max timeout
338            .layer(TimeoutLayer::new(std::time::Duration::from_secs(5)))
339            // cors  pass -> anyeventing request
340            .layer(
341                CorsLayer::new()
342                    .allow_origin(tower_http::cors::Any)
343                    .allow_methods(tower_http::cors::Any)
344                    .allow_headers(tower_http::cors::Any)
345                    .max_age(std::time::Duration::from_secs(60) * 10),
346            );
347        // #[cfg(feature = "user_security")]
348        // router = router.layer(axum::middleware::from_fn_with_state(crate::security::request_auth_middleware));
349        if !config.context_path().is_empty() {
350            app = app.nest(config.context_path(), router);
351        } else {
352            app = app.merge(router);
353        }
354
355        // apply router
356        let routers = ctx.resolve_by_type::<Box<dyn ApplyRouter>>();
357        for item in routers.into_iter() {
358            app = app.merge(item.router(ctx));
359        }
360
361        println!(
362            "\napplication listening on: [{}]",
363            format!("0.0.0.0:{}", config.port())
364        );
365
366        println!("application start time: {:?}", time.elapsed());
367
368        // configure certificate and private key used by https
369        #[cfg(feature = "tls_rustls")]
370        {
371            let tls_config = axum_server::tls_rustls::RustlsConfig::from_pem_file(
372                std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
373                    .join("self_signed_certs")
374                    .join("cert.pem"),
375                std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
376                    .join("self_signed_certs")
377                    .join("key.pem"),
378            )
379            .await
380            .unwrap();
381            let addr = std::net::SocketAddr::from(([0, 0, 0, 0], config.port()));
382            let mut server = axum_server::bind_rustls(addr, tls_config);
383            // IMPORTANT: This is required to advertise our support for HTTP/2 websockets to the client.
384            // If you use axum::serve, it is enabled by default.
385            server.http_builder().http2().enable_connect_protocol();
386            server.serve(app.into_make_service()).await.unwrap();
387        }
388
389        #[cfg(not(feature = "tls_rustls"))]
390        {
391            // run http server
392            let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", config.port()))
393                .await
394                .unwrap();
395
396            let shutdowns = ctx.resolve_by_type::<Box<dyn ApplicationShutdown>>();
397            axum::serve(listener, app.into_make_service())
398                .with_graceful_shutdown(async move {
399                    let ctrl_c = async {
400                        tokio::signal::ctrl_c()
401                            .await
402                            .expect("failed to install Ctrl+C handler");
403                    };
404
405                    #[cfg(unix)]
406                    let terminate = async {
407                        signal::unix::signal(signal::unix::SignalKind::terminate())
408                            .expect("failed to install signal handler")
409                            .recv()
410                            .await;
411                    };
412
413                    #[cfg(not(unix))]
414                    let terminate = std::future::pending::<()>();
415
416                    tokio::select! {
417                        _ = ctrl_c =>  {
418                            info!("Received Ctrl+C. Shutting down...");
419                            for service in shutdowns.iter() {
420                                service.shutdown().await;
421                            }
422
423                        },
424                        _ = terminate =>  {
425                            info!("Received terminate signal. Shutting down...");
426                            for service in shutdowns.iter() {
427                                service.shutdown().await;
428                            }
429                        },
430                    }
431                })
432                .await
433                .unwrap();
434        }
435    }
436
437    /// run the application.
438    async fn run()
439    where
440        Self: Application + Default,
441    {
442        // record application start time
443        let start_time = std::time::Instant::now();
444
445        // banner show
446        Self::banner_show();
447
448        // get a base application instance
449        let mut next_application: NextApplication<Self> = NextApplication::new();
450        let properties = next_application.application_properties().clone();
451
452        let application = next_application.application();
453
454        application.init_logger(&properties);
455        info!("init logger success");
456
457        let mut ctx = ApplicationContext::options()
458            .allow_override(true)
459            .auto_register();
460        info!("init application context success");
461
462        // autowire properties
463        application.autowire_properties(&mut ctx, &properties).await;
464        info!("autowire properties success");
465
466        // register singleton
467        application.register_singleton(&mut ctx, &properties).await;
468        info!("register singleton success");
469
470        // init infrastructure
471        application.init_infrastructure(&mut ctx, &properties).await;
472        info!("init infrastructure success");
473
474        // init middleware
475        application.init_middleware(&properties).await;
476        info!("init middleware success");
477
478        #[cfg(feature = "grpc_enabled")]
479        {
480            application.register_rpc_server(&properties).await;
481            info!("register grpc server success");
482
483            application.connect_rpc_client(&properties).await;
484            info!("connect grpc client success");
485        }
486
487        //
488        // application.init_cache().await;
489        application
490            .bind_tcp_server(&mut ctx, &properties, start_time)
491            .await;
492    }
493}
494
495fn handle_panic(err: Box<dyn std::any::Any + Send + 'static>) -> Response<Full<Bytes>> {
496    error!("Http server handle panic: {:?}", err);
497    let err_str = r#"
498{
499    "status": 500,
500    "message": "Internal Server Error",
501    "data": null
502}"#;
503    Response::builder()
504        .status(StatusCode::INTERNAL_SERVER_ERROR)
505        .header("content-type", "application/json")
506        .body(Full::from(err_str))
507        .unwrap()
508}
509
510/// no route match handler
511async fn fall_back() -> (StatusCode, &'static str) {
512    (StatusCode::NOT_FOUND, "Not Found Route")
513}
514
515/// basic handler that responds with a static string
516async fn root() -> axum::response::Html<&'static str> {
517    axum::response::Html("<html><body><h1>Welcome to Rust Web</h1></body></html>")
518}