1pub mod command;
2pub mod response;
3pub mod websocket;
4pub use response::{NoneBodyData, Response};
5pub mod access_token;
6pub use access_token::{get_token_and_path, AccessToken, TokenPermission};
7pub mod termenv;
8pub use termenv::{termenv_check, termenv_get, termenv_init};
9pub mod permission;
10pub use permission::*;
11
12pub mod mysql;
13pub mod redis;
14pub mod webhttp;
15
16use actix::Actor;
17use actix::Addr;
18use actix_multipart::form::MultipartFormConfig;
19use actix_settings::ApplySettings;
20use actix_web::web::PayloadConfig;
21use crossbeam::queue::SegQueue;
22use websocket::{ActorMsg, Connect, Disconnect, InMessage};
23
24use actix_cors::Cors;
25#[allow(unused_imports)]
26use actix_web::{
27 dev::{Server, Service},
28 http::{KeepAlive, Method},
29 middleware, web,
30 web::ServiceConfig,
31 App, HttpResponse, HttpServer, *,
32};
33
34use env_logger::Env;
36use sea_orm::DatabaseConnection;
37use std::any::Any;
38use std::collections::HashMap;
39use std::sync::Arc;
40use time::macros::offset;
41#[allow(unused_imports)]
42use tracing::*;
43
44#[allow(clippy::unused_async)]
45pub async fn not_found() -> HttpResponse {
47 HttpResponse::NotFound().body("Not Found 404")
48}
49
50#[allow(clippy::unused_async)]
51pub async fn health_check() -> HttpResponse {
53 let now = time::OffsetDateTime::now_utc().to_offset(offset!(+8));
54 let return_info = format!("running: {:?}", now,);
55 HttpResponse::Ok().body(return_info)
56}
57
58pub enum WsData {
59 WsMessage { data: InMessage },
60 WsConnect { data: Connect },
61 WsDisconnect { data: Disconnect },
62}
63
64pub fn api_init_none_func(_: &mut ServiceConfig) {}
65
66pub trait ServiceCallback: Send + Sync {
71 fn as_any(&self) -> &dyn Any;
72 fn api_init(&self, web_app: &mut ServiceConfig);
73 fn wsdata(&self, data: WsData, consumer: Arc<dyn ServiceCallback>) -> anyhow::Result<ActorMsg>;
74}
75
76#[derive(Clone)]
77pub struct AppState {
78 pub worker: Option<Vec<Addr<websocket::Worker>>>,
79 pub consumer: Option<Arc<dyn ServiceCallback>>,
80 pub database: Option<DatabaseConnection>,
81 pub redis: Option<fred::prelude::RedisPool>,
82 pub config: Option<serde_json::Value>,
83 pub wsapi: Option<String>,
84 pub token_check: Option<Arc<dyn crate::access_token::TokenPermission + Send + Sync>>,
85 pub jwt_secret: Option<String>,
86}
87
88pub async fn start(
89 name: String, port: u16, config: Option<serde_json::Value>, ws_consumer: Option<Arc<dyn ServiceCallback>>, ws_api: Option<String>, worker_num: Option<usize>, api_init: impl Fn(&mut web::ServiceConfig) + Sync + Send + 'static, thread_num: Option<usize>, database: Option<DatabaseConnection>, redis: Option<fred::prelude::RedisPool>, token_check: Option<Arc<dyn crate::access_token::TokenPermission + Send + Sync>>, jwt_secret: Option<String>, api_prefix: Option<String>, ) -> anyhow::Result<()> {
103 env_logger::init_from_env(Env::default().default_filter_or("info"));
104
105 let new_addr_list = if ws_consumer.is_some() {
106 let copied_consumer = ws_consumer.clone().unwrap();
107 let worker_thread_number = if worker_num.is_none() {
108 3
109 } else {
110 worker_num.unwrap()
111 };
112 let worker_addr = Arc::new(SegQueue::<Addr<websocket::Worker>>::default());
113
114 for _i in 0..worker_thread_number {
116 let cusumer_copied = copied_consumer.clone();
117 let worker_addr_copied = worker_addr.clone();
118 let arbiter = actix_rt::Arbiter::new();
119 arbiter.spawn(async move {
120 let addr = websocket::Worker::new(cusumer_copied.clone()).start();
121 worker_addr_copied.push(addr);
122 let addr = websocket::Worker::new(cusumer_copied).start();
123 worker_addr_copied.push(addr);
124 });
125 }
126
127 loop {
129 if worker_addr.len() != worker_thread_number * 2 {
130 continue;
131 } else {
132 break;
133 }
134 }
135
136 let mut new_addr_list = Vec::<Addr<websocket::Worker>>::default();
137 for _i in 0..worker_addr.len() {
138 new_addr_list.push(worker_addr.pop().unwrap());
139 }
140 Some(new_addr_list)
141 } else {
142 None
143 };
144
145 let state = AppState {
146 worker: new_addr_list,
147 consumer: ws_consumer,
148 database: database,
149 redis: redis,
150 config: config,
151 wsapi: ws_api,
152 token_check: token_check,
153 jwt_secret: jwt_secret,
154 };
155 start_internal(state, name, port, api_init, thread_num, api_prefix).await?;
156 anyhow::Ok(())
157}
158
159async fn start_internal(
160 state: AppState,
161 name: String,
162 port: u16,
163 api_init: impl Fn(&mut web::ServiceConfig) + Sync + Send + 'static,
164 thread_num: Option<usize>,
165 api_prefix: Option<String>,
166) -> anyhow::Result<()> {
167 let mut api_prefix = if api_prefix.is_some() {
168 api_prefix.unwrap().clone()
169 } else {
170 "/".into()
171 };
172 if !api_prefix.ends_with("/") {
173 api_prefix = format!("{}/", api_prefix);
174 }
175
176 let metrics = actix_web_prom::PrometheusMetricsBuilder::new(&name)
177 .endpoint(format!("{}metrics", api_prefix).as_str())
178 .build()
179 .unwrap();
180 let found_errors = prometheus::IntCounterVec::new(
181 prometheus::Opts {
182 namespace: name.clone(),
183 subsystem: String::new(),
184 name: "errors".into(),
185 help: "FoundErrors".into(),
186 const_labels: HashMap::new(),
187 variable_labels: Vec::new(),
188 },
189 &["path", "description"],
190 )?;
191 metrics.registry.register(Box::new(found_errors.clone()))?;
192
193 let api_init = Arc::new(api_init);
194 let payload_config = PayloadConfig::new(256 * 1024 * 1024);
195 let json_payload_config = web::JsonConfig::default();
196 let json_payload_config = json_payload_config.limit(16 * 1024 * 1024);
197 let multi_part_config = MultipartFormConfig::default()
198 .total_limit(512 * 1024 * 1024)
199 .memory_limit(64 * 1024 * 1024);
200
201 let mut settings = actix_settings::Settings::from_default_template();
202 actix_settings::Settings::override_field(&mut settings.actix.mode, "production")?;
203 actix_settings::Settings::override_field(
204 &mut settings.actix.hosts,
205 format!("[[\"0.0.0.0\", {}]]", port),
206 )?;
207 if thread_num.is_some() {
208 let thread_str = thread_num.unwrap().to_string();
209 actix_settings::Settings::override_field(&mut settings.actix.num_workers, thread_str)?;
210 } else {
211 actix_settings::Settings::override_field(&mut settings.actix.num_workers, "4")?;
212 }
213
214 HttpServer::new(move || {
215 let cors = Cors::default()
216 .allow_any_origin()
217 .allow_any_method()
218 .allow_any_header()
219 .expose_any_header()
220 .max_age(3600);
221 let error_metrics = found_errors.clone();
222 App::new()
223 .app_data(payload_config.clone())
224 .app_data(json_payload_config.clone())
225 .app_data(web::Data::new(state.clone()))
226 .app_data(multi_part_config.clone())
227 .route(
228 format!("{}health", api_prefix).as_str(),
229 web::get().to(health_check),
230 )
231 .configure(init_service(
232 state.clone(),
233 api_init.clone(),
234 api_prefix.clone(),
235 ))
236 .wrap(cors)
237 .wrap(metrics.clone())
240 .wrap_fn(move |req, srv| {
241 let start_time = std::time::SystemTime::now()
243 .duration_since(std::time::SystemTime::UNIX_EPOCH)
244 .unwrap()
245 .as_micros();
246 let addr = req.request().peer_addr().unwrap().to_string();
248 let method = req.request().method().to_string();
256 let path = req.request().path().to_string();
257
258 let fut = srv.call(req);
259 let error_counter = error_metrics.clone();
260 async move {
261 let srv_response = fut.await?;
262 if let Some(err) = srv_response.response().error() {
263 let url = match srv_response.request().match_pattern() {
264 Some(pattern) => pattern,
265 None => String::new(),
266 };
267 let err_desc = format!("{err}");
268 error_counter
269 .clone()
270 .with_label_values(&[url.as_str(), err_desc.as_str()])
271 .inc();
272 }
273
274 let end_time = std::time::SystemTime::now()
275 .duration_since(std::time::SystemTime::UNIX_EPOCH)
276 .unwrap()
277 .as_micros();
278 let micros_diff = (end_time - start_time) as f32 / 1000.0;
279
280 let log_info = format!(
290 "{} {} {} {} {}ms",
291 addr,
292 method,
293 path,
294 srv_response.response().status().as_str(),
295 micros_diff
296 );
297 if path.eq("/metrics") {
298 debug!("{}", log_info)
299 } else {
300 info!("{}", log_info)
301 }
302
303 Ok(srv_response)
304 }
305 })
306 .default_service(web::route().to(not_found))
307 })
308 .try_apply_settings(&settings)
311 .unwrap()
312 .run()
315 .await?;
316 Ok(())
317}
318
319fn init_service(
320 state: AppState,
321 api_init: Arc<impl Fn(&mut web::ServiceConfig) + Send + Sync>,
322 api_prefix: String,
323) -> impl Fn(&mut web::ServiceConfig) {
324 move |web_app| {
325 if state.consumer.is_some() {
326 let mut ws_api_url =
328 if state.wsapi.is_some() && !state.wsapi.as_ref().unwrap().is_empty() {
329 let ws_api_url = state.wsapi.as_ref().unwrap();
330 info!("register ws api service as: {}", ws_api_url);
331 ws_api_url
332 } else {
333 let ws_api_url = "websocket/api";
334 info!("register ws api service as default: {}", ws_api_url);
335 ws_api_url
336 };
337
338 if ws_api_url.starts_with("/") {
339 ws_api_url = ws_api_url.strip_prefix("/").unwrap();
340 }
341
342 web_app.service(
343 web::scope(format!("{}{}", api_prefix, ws_api_url).as_str())
344 .service(websocket::api::ws_api()),
345 );
346 state.consumer.as_ref().unwrap().api_init(web_app);
347 return;
348 }
349 api_init(web_app);
351 }
352}