next_web_dev/application/
application.rs1use async_trait::async_trait;
2use axum::body::Bytes;
3use axum::http::{Response, StatusCode};
4use axum::Router;
5use hashbrown::HashMap;
6use http_body_util::Full;
7use next_web_core::context::application_context::ApplicationContext;
8use next_web_core::context::properties::{ApplicationProperties, Properties};
9use next_web_core::core::router::ApplyRouter;
10use next_web_core::AutoRegister;
11use rust_embed_for_web::{EmbedableFile, RustEmbed};
12use std::io::BufRead;
13use std::path::PathBuf;
14use std::sync::Arc;
15use tower_http::catch_panic::CatchPanicLayer;
16use tower_http::cors::CorsLayer;
17use tower_http::limit::RequestBodyLimitLayer;
18use tower_http::timeout::TimeoutLayer;
19use tower_http::trace::TraceLayer;
20use tracing::{error, info, warn};
21
22use crate::application::application_shutdown::ApplicationShutdown;
23use crate::application::next_application::NextApplication;
24
25use crate::autoregister::register_single::ApplicationDefaultRegisterContainer;
26use crate::event::application_event_multicaster::ApplicationEventMulticaster;
27use crate::event::application_listener::ApplicationListener;
28
29use crate::autoconfigure::context::next_properties::NextProperties;
30use crate::banner::top_banner::{TopBanner, DEFAULT_TOP_BANNER};
31use crate::event::default_application_event_multicaster::DefaultApplicationEventMulticaster;
32use crate::event::default_application_event_publisher::DefaultApplicationEventPublisher;
33use crate::router::open_router::OpenRouter;
34use crate::router::private_router::PrivateRouter;
35use crate::util::date_time_util::LocalDateTimeUtil;
36use crate::util::file_util::FileUtil;
37
38use next_web_core::context::application_resources::ApplicationResources;
39
40#[cfg(feature = "job_scheduler")]
41use crate::manager::job_scheduler_manager::{ApplicationJob, JobSchedulerManager};
42
43#[cfg(feature = "redis_enabled")]
44use crate::event::redis_expired_event::RedisExpiredEvent;
45#[cfg(feature = "redis_enabled")]
46use crate::manager::redis_manager::RedisManager;
47
48pub const APPLICATION_BANNER_NAME: &str = "banner.txt";
49pub const APPLICATION_USER_PERMISSION_RESOURCE: &str = "user_permission_resource.json";
50
51#[async_trait]
52pub trait Application: Send + Sync {
53 async fn init_middleware(&mut self, properties: &ApplicationProperties);
55
56 async fn application_router(
58 &mut self,
59 ctx: &mut ApplicationContext,
60 ) -> (OpenRouter, PrivateRouter);
61
62 #[cfg(feature = "grpc_enabled")]
64 async fn register_rpc_server(&mut self, properties: &ApplicationProperties);
65
66 #[cfg(feature = "grpc_enabled")]
68 async fn connect_rpc_client(&mut self, properties: &ApplicationProperties);
69
70 fn banner_show() {
72 if let Some(content) = ApplicationResources::get(APPLICATION_BANNER_NAME) {
73 TopBanner::show(std::str::from_utf8(&content.data()).unwrap_or(DEFAULT_TOP_BANNER));
74 } else {
75 TopBanner::show(DEFAULT_TOP_BANNER);
76 }
77 }
78
79 async fn init_message_source<T>(
81 &mut self,
82 application_properties: &NextProperties,
83 ) -> HashMap<String, HashMap<String, String>> {
84 let mut messages = HashMap::new();
86 if let Some(message_source) = application_properties.messages() {
87 let mut load_local_message = |name: &str| {
88 if let Some(dyn_file) = ApplicationResources::get(&format!("messages/{}", &name)) {
89 let data = dyn_file.data();
90 if !data.is_empty() {
91 let mut map = HashMap::new();
92 let _ = data.lines().map(|var| {
93 var.map(|var1| {
94 let var2: Vec<&str> = var1.split("=").collect();
95 if var2.len() == 2 {
96 let key = var2.get(0).unwrap();
97 let value = var2.get(1).unwrap();
98 map.insert(key.to_string(), value.to_string());
99 }
100 })
101 });
102 messages.insert(name.to_string(), map);
103 }
104 }
105 };
106
107 load_local_message(&"messages.properties");
109
110 message_source.local().map(|item| {
111 item.trim_end().split(",").for_each(|s| {
112 let name = format!("messages_{}.properties", s);
113 load_local_message(&name);
114 });
115 });
116 }
117 messages
118 }
119
120 #[cfg(feature = "user_security")]
121 fn user_permission_resource(
122 &self,
123 ) -> Option<crate::security::user_permission_resource::UserPermissionResource> {
124 use crate::security::user_permission_resource::UserPermissionResourceBuilder;
125
126 let path = PathBuf::from(std::env::var("CARGO_MANIFEST_DIR").unwrap())
127 .join(APPLICATION_USER_PERMISSION_RESOURCE)
128 .display()
129 .to_string();
130 if let Ok(content) = FileUtil::read_file_to_string(&path) {
131 if content.is_empty() {
132 return None;
133 }
134 let user_permission_resource: Vec<UserPermissionResourceBuilder> =
135 serde_json::from_str(&content).unwrap();
136 if user_permission_resource.is_empty() {
137 return None;
138 }
139 return Some(user_permission_resource.into());
140 }
141 None
142 }
143
144 fn init_logger(&self, application_properties: &ApplicationProperties) {
146 let application_name = application_properties
147 .next()
148 .appliation()
149 .map(|app| app.name())
150 .unwrap_or_default();
151 let file_appender = application_properties.next().logger().map_or_else(
152 || None,
153 |logger| {
154 if logger.enable() {
156 let path = logger.log_dir().unwrap_or_else(|| "./logs");
157 let log_name = format!(
158 "{}{}.log",
159 application_name,
160 if logger.additional_date() {
161 format!("-{}", LocalDateTimeUtil::date())
162 } else {
163 String::new()
164 }
165 );
166 return Some(tracing_appender::rolling::daily(path, log_name));
167 }
168 None
169 },
170 );
171
172 let config = tracing_subscriber::fmt::format()
173 .with_timer(tracing_subscriber::fmt::time::ChronoLocal::new(
174 "%Y-%m-%d %H:%M:%S%.3f".to_string(),
175 ))
176 .with_level(true)
177 .with_target(true)
178 .with_line_number(true)
179 .with_thread_ids(true)
180 .with_file(true)
181 .with_thread_names(true);
182
183 let logger = tracing_subscriber::fmt()
186 .with_max_level(tracing::Level::INFO)
188 .with_ansi(false)
189 .event_format(config);
190
191 if let Some(file_appender) = file_appender {
192 let (non_blocking, _worker) = tracing_appender::non_blocking(file_appender);
193 logger.with_writer(non_blocking).with_test_writer().init();
194 } else {
195 logger.init();
196 }
197 }
198
199 async fn autowire_properties(
201 &mut self,
202 ctx: &mut ApplicationContext,
203 application_properties: &ApplicationProperties,
204 ) {
205 let properties = ctx.resolve_by_type::<Box<dyn Properties>>();
206 for item in properties {
207 item.register(ctx, application_properties).await.unwrap();
208 }
209 }
210
211 async fn register_singleton(
213 &self,
214 ctx: &mut ApplicationContext,
215 application_properties: &ApplicationProperties,
216 ) {
217 let mut container = ApplicationDefaultRegisterContainer::new();
219 container.register_all(ctx, application_properties).await;
220
221 let auto_register = ctx.resolve_by_type::<Box<dyn AutoRegister>>();
223 for item in auto_register.iter() {
224 item.register(ctx, application_properties).await.unwrap();
225 }
226 }
227
228 async fn init_infrastructure(
230 &self,
231 ctx: &mut ApplicationContext,
232 _application_properties: &ApplicationProperties,
233 ) {
234 println!("\n========================================================================");
235
236 let producers = ctx.resolve_by_type::<Arc<dyn ApplicationJob>>();
238 if let Some(schedluer_manager) =
239 ctx.get_single_option_with_name::<JobSchedulerManager>("jobSchedulerManager")
240 {
241 let mut schedluer_manager = schedluer_manager.clone();
242 for producer in producers {
243 schedluer_manager.add_job(producer.gen_job()).await;
244 }
245 schedluer_manager.start();
246 } else {
247 warn!("Job scheduler manager not found");
248 }
249
250 #[cfg(feature = "redis_enabled")]
252 if let Some(redis_manager) = ctx.resolve_option_with_name::<RedisManager>("redisManager") {
253 if let Some(handle) =
254 ctx.resolve_option::<Arc<tokio::sync::Mutex<dyn RedisExpiredEvent>>>()
255 {
256 let _ = redis_manager
257 .expired_event(handle)
258 .await
259 .map(|_| info!("Redis expired event listen success!"));
260 }
261 }
262
263 let (tx, rx) = flume::unbounded();
265 let mut default_event_publisher = DefaultApplicationEventPublisher::new();
266 let mut multicaster = DefaultApplicationEventMulticaster::new();
267
268 default_event_publisher.set_channel(Some(tx));
269 multicaster.set_event_channel(rx);
270
271 let listeners = ctx.resolve_by_type::<Box<dyn ApplicationListener>>();
272 listeners.into_iter().for_each(|listener| {
273 multicaster.add_application_listener(listener);
274 });
275
276 multicaster.run();
277
278 ctx.insert_singleton_with_name(default_event_publisher, "");
279 ctx.insert_singleton_with_name(multicaster, "");
280
281 println!("========================================================================\n");
282 }
283
284 async fn bind_tcp_server(
286 &mut self,
287 ctx: &mut ApplicationContext,
288 application_properties: &ApplicationProperties,
289 time: std::time::Instant,
290 ) {
291 let config = application_properties.next().server();
292
293 let (open_router, private_router) = self.application_router(ctx).await;
294 let mut app = Router::new()
296 .route("/", axum::routing::get(root))
297 .merge(Router::new().nest("/open", open_router.0))
298 .fallback(fall_back);
300
301 #[cfg(feature = "prometheus_enabled")]
303 {
304 let (prometheus_layer, metric_handle) = axum_prometheus::PrometheusMetricLayer::pair();
305 app = app
306 .route(
307 "/metrics",
308 axum::routing::get(|| async move { metric_handle.render() }),
309 )
310 .layer(prometheus_layer);
311 }
312
313 let mut router = private_router.0;
314 if let Some(http) = config.http() {
316 let val = http
317 .request()
318 .map(|req| {
319 let var1 = req.trace();
320 let var2 = req.max_request_size().unwrap_or_default();
321 (var1, var2)
322 })
323 .unwrap_or_default();
324 let _ = http.response();
325 if val.0 {
326 router = router.layer(TraceLayer::new_for_http());
327 }
328 if val.1 >= 3145728 {
330 router = router.route_layer(RequestBodyLimitLayer::new(val.1));
331 }
332 }
333
334 router = router
335 .layer(CatchPanicLayer::custom(handle_panic))
337 .layer(TimeoutLayer::new(std::time::Duration::from_secs(5)))
339 .layer(
341 CorsLayer::new()
342 .allow_origin(tower_http::cors::Any)
343 .allow_methods(tower_http::cors::Any)
344 .allow_headers(tower_http::cors::Any)
345 .max_age(std::time::Duration::from_secs(60) * 10),
346 );
347 if !config.context_path().is_empty() {
350 app = app.nest(config.context_path(), router);
351 } else {
352 app = app.merge(router);
353 }
354
355 let routers = ctx.resolve_by_type::<Box<dyn ApplyRouter>>();
357 for item in routers.into_iter() {
358 app = app.merge(item.router(ctx));
359 }
360
361 println!(
362 "\napplication listening on: [{}]",
363 format!("0.0.0.0:{}", config.port())
364 );
365
366 println!("application start time: {:?}", time.elapsed());
367
368 #[cfg(feature = "tls_rustls")]
370 {
371 let tls_config = axum_server::tls_rustls::RustlsConfig::from_pem_file(
372 std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
373 .join("self_signed_certs")
374 .join("cert.pem"),
375 std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
376 .join("self_signed_certs")
377 .join("key.pem"),
378 )
379 .await
380 .unwrap();
381 let addr = std::net::SocketAddr::from(([0, 0, 0, 0], config.port()));
382 let mut server = axum_server::bind_rustls(addr, tls_config);
383 server.http_builder().http2().enable_connect_protocol();
386 server.serve(app.into_make_service()).await.unwrap();
387 }
388
389 #[cfg(not(feature = "tls_rustls"))]
390 {
391 let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", config.port()))
393 .await
394 .unwrap();
395
396 let shutdowns = ctx.resolve_by_type::<Box<dyn ApplicationShutdown>>();
397 axum::serve(listener, app.into_make_service())
398 .with_graceful_shutdown(async move {
399 let ctrl_c = async {
400 tokio::signal::ctrl_c()
401 .await
402 .expect("failed to install Ctrl+C handler");
403 };
404
405 #[cfg(unix)]
406 let terminate = async {
407 signal::unix::signal(signal::unix::SignalKind::terminate())
408 .expect("failed to install signal handler")
409 .recv()
410 .await;
411 };
412
413 #[cfg(not(unix))]
414 let terminate = std::future::pending::<()>();
415
416 tokio::select! {
417 _ = ctrl_c => {
418 info!("Received Ctrl+C. Shutting down...");
419 for service in shutdowns.iter() {
420 service.shutdown().await;
421 }
422
423 },
424 _ = terminate => {
425 info!("Received terminate signal. Shutting down...");
426 for service in shutdowns.iter() {
427 service.shutdown().await;
428 }
429 },
430 }
431 })
432 .await
433 .unwrap();
434 }
435 }
436
437 async fn run()
439 where
440 Self: Application + Default,
441 {
442 let start_time = std::time::Instant::now();
444
445 Self::banner_show();
447
448 let mut next_application: NextApplication<Self> = NextApplication::new();
450 let properties = next_application.application_properties().clone();
451
452 let application = next_application.application();
453
454 application.init_logger(&properties);
455 info!("init logger success");
456
457 let mut ctx = ApplicationContext::options()
458 .allow_override(true)
459 .auto_register();
460 info!("init application context success");
461
462 application.autowire_properties(&mut ctx, &properties).await;
464 info!("autowire properties success");
465
466 application.register_singleton(&mut ctx, &properties).await;
468 info!("register singleton success");
469
470 application.init_infrastructure(&mut ctx, &properties).await;
472 info!("init infrastructure success");
473
474 application.init_middleware(&properties).await;
476 info!("init middleware success");
477
478 #[cfg(feature = "grpc_enabled")]
479 {
480 application.register_rpc_server(&properties).await;
481 info!("register grpc server success");
482
483 application.connect_rpc_client(&properties).await;
484 info!("connect grpc client success");
485 }
486
487 application
490 .bind_tcp_server(&mut ctx, &properties, start_time)
491 .await;
492 }
493}
494
495fn handle_panic(err: Box<dyn std::any::Any + Send + 'static>) -> Response<Full<Bytes>> {
496 error!("Http server handle panic: {:?}", err);
497 let err_str = r#"
498{
499 "status": 500,
500 "message": "Internal Server Error",
501 "data": null
502}"#;
503 Response::builder()
504 .status(StatusCode::INTERNAL_SERVER_ERROR)
505 .header("content-type", "application/json")
506 .body(Full::from(err_str))
507 .unwrap()
508}
509
510async fn fall_back() -> (StatusCode, &'static str) {
512 (StatusCode::NOT_FOUND, "Not Found Route")
513}
514
515async fn root() -> axum::response::Html<&'static str> {
517 axum::response::Html("<html><body><h1>Welcome to Rust Web</h1></body></html>")
518}