next_web_dev/application/
application.rs1use async_trait::async_trait;
2use axum::body::Bytes;
3use axum::http::{Response, StatusCode};
4use axum::Router;
5use hashbrown::HashMap;
6use http_body_util::Full;
7use next_web_core::constants::application_constants::APPLICATION_BANNER_FILE;
8use next_web_core::context::application_args::ApplicationArgs;
9use next_web_core::context::application_context::ApplicationContext;
10use next_web_core::context::properties::{ApplicationProperties, Properties};
11use next_web_core::core::apply_router::ApplyRouter;
12use next_web_core::AutoRegister;
13use rust_embed_for_web::{EmbedableFile, RustEmbed};
14use std::io::BufRead;
15use std::sync::Arc;
17use tower_http::catch_panic::CatchPanicLayer;
18use tower_http::cors::CorsLayer;
19use tower_http::limit::RequestBodyLimitLayer;
20use tower_http::timeout::TimeoutLayer;
21use tower_http::trace::TraceLayer;
22use tracing::{error, info, warn};
23
24use crate::application::application_shutdown::ApplicationShutdown;
25use crate::application::next_application::NextApplication;
26
27use crate::autoregister::register_single::ApplicationDefaultRegisterContainer;
28use crate::event::application_event_multicaster::ApplicationEventMulticaster;
29use crate::event::application_listener::ApplicationListener;
30
31use crate::autoconfigure::context::next_properties::NextProperties;
32use crate::banner::top_banner::{TopBanner, DEFAULT_TOP_BANNER};
33use crate::event::default_application_event_multicaster::DefaultApplicationEventMulticaster;
34use crate::event::default_application_event_publisher::DefaultApplicationEventPublisher;
35use crate::util::date_time_util::LocalDateTime;
36
37use next_web_core::context::application_resources::ApplicationResources;
38
39#[cfg(feature = "job_scheduler")]
40use crate::manager::job_scheduler_manager::{ApplicationJob, JobSchedulerManager};
41
42#[async_trait]
43pub trait Application: Send + Sync {
44 async fn init_middleware(&mut self, properties: &ApplicationProperties);
46
47 async fn application_router(&mut self, ctx: &mut ApplicationContext) -> axum::Router;
49
50 #[cfg(feature = "enable_grpc")]
52 async fn register_rpc_server(&mut self, properties: &ApplicationProperties);
53
54 #[cfg(feature = "enable_grpc")]
56 async fn connect_rpc_client(&mut self, properties: &ApplicationProperties);
57
58 fn banner_show() {
60 if let Some(content) = ApplicationResources::get(APPLICATION_BANNER_FILE) {
61 TopBanner::show(std::str::from_utf8(&content.data()).unwrap_or(DEFAULT_TOP_BANNER));
62 } else {
63 TopBanner::show(DEFAULT_TOP_BANNER);
64 }
65 }
66
67 async fn init_message_source<T>(
69 &mut self,
70 application_properties: &NextProperties,
71 ) -> HashMap<String, HashMap<String, String>> {
72 let mut messages = HashMap::new();
73 if let Some(message_source) = application_properties.messages() {
74 let mut load_local_message = |name: &str| {
75 if let Some(dyn_file) = ApplicationResources::get(&format!("messages/{}", &name)) {
76 let data = dyn_file.data();
77 if !data.is_empty() {
78 let mut map = HashMap::new();
79 let _ = data.lines().map(|var| {
80 var.map(|var1| {
81 let var2: Vec<&str> = var1.split("=").collect();
82 if var2.len() == 2 {
83 let key = var2.get(0).unwrap();
84 let value = var2.get(1).unwrap();
85 map.insert(key.to_string(), value.to_string());
86 }
87 })
88 });
89 messages.insert(name.to_string(), map);
90 }
91 }
92 };
93
94 load_local_message(&"messages.properties");
96
97 message_source.local().map(|item| {
98 item.trim_end().split(",").for_each(|s| {
99 let name = format!("messages_{}.properties", s);
100 load_local_message(&name);
101 });
102 });
103 }
104 messages
105 }
106
107 fn init_logger(&self, application_properties: &ApplicationProperties) {
109 let application_name = application_properties
110 .next()
111 .appliation()
112 .map(|app| app.name())
113 .unwrap_or_default();
114 let file_appender = application_properties.next().logger().map_or_else(
115 || None,
116 |logger| {
117 if logger.enable() {
119 let path = logger.log_dir().unwrap_or_else(|| "./logs");
120 let log_name = format!(
121 "{}{}.log",
122 application_name,
123 if logger.additional_date() {
124 format!("-{}", LocalDateTime::date())
125 } else {
126 String::new()
127 }
128 );
129 return Some(tracing_appender::rolling::daily(path, log_name));
130 }
131 None
132 },
133 );
134
135 let config = tracing_subscriber::fmt::format()
136 .with_timer(tracing_subscriber::fmt::time::ChronoLocal::new(
137 "%Y-%m-%d %H:%M:%S%.3f".to_string(),
138 ))
139 .with_level(true)
140 .with_target(true)
141 .with_line_number(true)
142 .with_thread_ids(true)
143 .with_file(true)
144 .with_thread_names(true);
145
146 let logger = tracing_subscriber::fmt()
149 .with_max_level(tracing::Level::INFO)
151 .with_ansi(false)
152 .event_format(config);
153
154 if let Some(file_appender) = file_appender {
155 let (non_blocking, _worker) = tracing_appender::non_blocking(file_appender);
156 logger.with_writer(non_blocking).with_test_writer().init();
157 } else {
158 logger.init();
159 }
160 }
161
162 async fn autowire_properties(
164 &mut self,
165 ctx: &mut ApplicationContext,
166 application_properties: &ApplicationProperties,
167 ) {
168 let properties = ctx.resolve_by_type::<Box<dyn Properties>>();
169 for item in properties {
170 item.register(ctx, application_properties).await.unwrap();
171 }
172 }
173
174 async fn register_singleton(
176 &self,
177 ctx: &mut ApplicationContext,
178 application_properties: &ApplicationProperties,
179 application_args: &ApplicationArgs,
180 ) {
181 let mut container = ApplicationDefaultRegisterContainer::new();
183 container.register_all(ctx, application_properties).await;
184
185 ctx.insert_singleton_with_name(application_properties.clone(), "");
187 ctx.insert_singleton_with_name(application_args.clone(), "");
188
189 let auto_register = ctx.resolve_by_type::<Arc<dyn AutoRegister>>();
191 for item in auto_register.iter() {
192 item.register(ctx, application_properties).await.unwrap();
193 }
194 }
195
196 async fn init_infrastructure(
198 &self,
199 ctx: &mut ApplicationContext,
200 _application_properties: &ApplicationProperties,
201 ) {
202 let producers = ctx.resolve_by_type::<Arc<dyn ApplicationJob>>();
204 if let Some(schedluer_manager) =
205 ctx.get_single_option_with_name::<JobSchedulerManager>("jobSchedulerManager")
206 {
207 let mut schedluer_manager = schedluer_manager.clone();
208 for producer in producers {
209 schedluer_manager.add_job(producer.gen_job()).await;
210 }
211 schedluer_manager.start();
212 } else {
213 warn!("Job scheduler manager not found");
214 }
215
216 let (tx, rx) = flume::unbounded();
218 let mut default_event_publisher = DefaultApplicationEventPublisher::new();
219 let mut multicaster = DefaultApplicationEventMulticaster::new();
220
221 default_event_publisher.set_channel(Some(tx));
222 multicaster.set_event_channel(rx);
223
224 let listeners = ctx.resolve_by_type::<Box<dyn ApplicationListener>>();
225 listeners.into_iter().for_each(|listener| {
226 multicaster.add_application_listener(listener);
227 });
228
229 multicaster.run();
230
231 ctx.insert_singleton_with_name(default_event_publisher, "");
232 ctx.insert_singleton_with_name(multicaster, "");
233 }
234
235 async fn bind_tcp_server(
237 &mut self,
238 ctx: &mut ApplicationContext,
239 application_properties: &ApplicationProperties,
240 time: std::time::Instant,
241 ) {
242 let config = application_properties.next().server();
243
244 let context_path = config.context_path().unwrap_or("");
245 let server_port = config.port().unwrap_or(11011);
246 let server_addr = if config.local().unwrap_or(false) {
247 "127.0.0.1"
248 } else {
249 "0.0.0.0"
250 };
251
252 let mut application_router = self.application_router(ctx).await;
253
254 let mut app = Router::new()
256 .fallback(fall_back);
258
259 #[cfg(feature = "enable_prometheus")]
261 {
262 let (prometheus_layer, metric_handle) = axum_prometheus::PrometheusMetricLayer::pair();
263 app = app
264 .route(
265 "/metrics",
266 axum::routing::get(|| async move { metric_handle.render() }),
267 )
268 .layer(prometheus_layer);
269 }
270
271 if let Some(http) = config.http() {
273 let val = http
274 .request()
275 .map(|req| {
276 let var1 = req.trace();
277 let var2 = req.max_request_size().unwrap_or_default();
278 (var1, var2)
279 })
280 .unwrap_or_default();
281 let _ = http.response();
282 if val.0 {
283 application_router = application_router.route_layer(TraceLayer::new_for_http());
284 }
285 if val.1 >= 3145728 {
287 application_router =
288 application_router.route_layer(RequestBodyLimitLayer::new(val.1));
289 }
290 }
291
292 application_router = application_router
293 .layer(CatchPanicLayer::custom(handle_panic))
295 .layer(TimeoutLayer::new(std::time::Duration::from_secs(5)))
297 .layer(
299 CorsLayer::new()
300 .allow_origin(tower_http::cors::Any)
301 .allow_methods(tower_http::cors::Any)
302 .allow_headers(tower_http::cors::Any)
303 .max_age(std::time::Duration::from_secs(60) * 10),
304 );
305
306 if !context_path.is_empty() {
307 app = app.nest(context_path, application_router);
308 } else {
309 app = app.merge(application_router);
310 }
311
312 let routers = ctx.resolve_by_type::<Box<dyn ApplyRouter>>();
314
315 let (mut open_routers, mut common_routers): (Vec<_>, Vec<_>) =
316 routers.into_iter().partition(|item| item.open());
317
318 let var10 = Router::new();
319
320 open_routers.sort_by_key(|k| k.order());
321 let open_router = open_routers
322 .into_iter()
323 .map(|item| item.router(ctx))
324 .filter(|item1| item1.has_routes())
325 .fold(var10, |acc, router| {
326 if !context_path.is_empty() {
327 acc.nest(context_path, router)
328 } else {
329 acc.merge(router)
330 }
331 });
332
333 app = app.merge(open_router);
334
335 common_routers.sort_by_key(|k| k.order());
336 for item2 in common_routers
337 .into_iter()
338 .map(|item| item.router(ctx))
339 .filter(|item1| item1.has_routes())
340 {
341 if !context_path.is_empty() {
342 app = app.nest(context_path, item2);
343 continue;
344 }
345 app = app.merge(item2);
346 }
347
348 println!(
349 "\nApplication Listening on: {}",
350 format!("{}:{}", server_addr, server_port)
351 );
352
353 println!("Application Running on: {}", LocalDateTime::now());
354 println!("Application StartTime on: {:?}", time.elapsed());
355 println!("Application CurrentPid on: {:?}\n", std::process::id());
356
357 #[cfg(feature = "tls_rustls")]
359 {
360 let tls_config = axum_server::tls_rustls::RustlsConfig::from_pem_file(
361 std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
362 .join("self_signed_certs")
363 .join("cert.pem"),
364 std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
365 .join("self_signed_certs")
366 .join("key.pem"),
367 )
368 .await
369 .unwrap();
370 let addr = std::net::SocketAddr::from(([0, 0, 0, 0], config.port()));
371 let mut server = axum_server::bind_rustls(addr, tls_config);
372 server.http_builder().http2().enable_connect_protocol();
375 server.serve(app.into_make_service()).await.unwrap();
376 }
377
378 #[cfg(not(feature = "tls_rustls"))]
379 {
380 let listener =
382 tokio::net::TcpListener::bind(format!("{}:{}", server_addr, server_port))
383 .await
384 .unwrap();
385
386 let mut shutdowns = ctx.resolve_by_type::<Box<dyn ApplicationShutdown>>();
387 axum::serve(
388 listener,
389 app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
390 )
391 .with_graceful_shutdown(async move {
392 let ctrl_c = async {
393 tokio::signal::ctrl_c()
394 .await
395 .expect("failed to install Ctrl+C handler");
396 };
397
398 #[cfg(unix)]
399 let terminate = async {
400 signal::unix::signal(signal::unix::SignalKind::terminate())
401 .expect("failed to install signal handler")
402 .recv()
403 .await;
404 };
405
406 #[cfg(not(unix))]
407 let terminate = std::future::pending::<()>();
408
409 tokio::select! {
410 _ = ctrl_c => {
411 info!("Received Ctrl+C. Shutting down...");
412 for service in shutdowns.iter_mut() {
413 service.shutdown().await;
414 }
415 },
416 _ = terminate => {
417 info!("Received terminate signal. Shutting down...");
418 for service in shutdowns.iter_mut() {
419 service.shutdown().await;
420 }
421 },
422 }
423 })
424 .await
425 .unwrap();
426 }
427 }
428
429 async fn run()
431 where
432 Self: Application + Default,
433 {
434 let start_time = std::time::Instant::now();
436
437 Self::banner_show();
439
440 let mut next_application: NextApplication<Self> = NextApplication::new();
442 let properties = next_application.application_properties().clone();
443 let args = next_application.application_args().clone();
444
445 let application = next_application.application();
446
447 println!("========================================================================\n");
448
449 application.init_logger(&properties);
450 println!(
451 "Init logger success!\nCurrent Time: {}\n",
452 LocalDateTime::now()
453 );
454
455 let mut ctx = ApplicationContext::options()
456 .allow_override(false)
457 .auto_register();
458 println!(
459 "Init application context success!\nCurrent Time: {}\n",
460 LocalDateTime::now()
461 );
462
463 application.autowire_properties(&mut ctx, &properties).await;
465 println!(
466 "Autowire properties success!\nCurrent Time: {}\n",
467 LocalDateTime::now()
468 );
469
470 application
472 .register_singleton(&mut ctx, &properties, &args)
473 .await;
474 println!(
475 "Register singleton success!\nCurrent Time: {}\n",
476 LocalDateTime::now()
477 );
478
479 application.init_infrastructure(&mut ctx, &properties).await;
481 println!(
482 "Init infrastructure success!\nCurrent Time: {}\n",
483 LocalDateTime::now()
484 );
485
486 application.init_middleware(&properties).await;
488 println!(
489 "Init middleware success!\nCurrent Time: {}\n",
490 LocalDateTime::now()
491 );
492
493 #[cfg(feature = "enable_grpc")]
494 {
495 application.register_rpc_server(&properties).await;
496 println!(
497 "Register grpc server success!\nCurrent Time: {}\n",
498 LocalDateTime::now()
499 );
500
501 application.connect_rpc_client(&properties).await;
502 println!(
503 "Connect grpc client success!\nCurrent Time: {}\n",
504 LocalDateTime::now()
505 );
506 }
507
508 println!("========================================================================");
509
510 application
513 .bind_tcp_server(&mut ctx, &properties, start_time)
514 .await;
515 }
516}
517
518fn handle_panic(err: Box<dyn std::any::Any + Send + 'static>) -> Response<Full<Bytes>> {
519 error!("Application handle panic, case: {:?}", err);
520 let err_str = format!(
521 "internal server error, case: {:?},\ntimestamp: {}",
522 err,
523 LocalDateTime::timestamp()
524 );
525 Response::builder()
526 .status(StatusCode::INTERNAL_SERVER_ERROR)
527 .body(err_str.into())
528 .unwrap()
529}
530
531async fn fall_back() -> (StatusCode, &'static str) {
533 (StatusCode::NOT_FOUND, "not macth route")
534}