pub mod command;
pub mod response;
pub mod websocket;
pub use response::{NoneBodyData, Response};
pub mod access_token;
pub use access_token::{get_token_and_path, AccessToken, TokenPermission};
pub mod termenv;
pub use termenv::{termenv_check, termenv_get, termenv_init};
pub mod permission;
pub use permission::*;
pub mod mysql;
pub mod redis;
pub mod webhttp;
use actix::Actor;
use actix::Addr;
use actix_multipart::form::MultipartFormConfig;
use actix_settings::ApplySettings;
use actix_web::web::PayloadConfig;
use crossbeam::queue::SegQueue;
use websocket::{ActorMsg, Connect, Disconnect, InMessage};
use actix_cors::Cors;
#[allow(unused_imports)]
use actix_web::{
dev::{Server, Service},
http::{KeepAlive, Method},
middleware, web,
web::ServiceConfig,
App, HttpResponse, HttpServer, *,
};
use env_logger::Env;
use sea_orm::DatabaseConnection;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use time::macros::offset;
#[allow(unused_imports)]
use tracing::*;
#[allow(clippy::unused_async)]
pub async fn not_found() -> HttpResponse {
HttpResponse::NotFound().body("Not Found 404")
}
#[allow(clippy::unused_async)]
pub async fn health_check() -> HttpResponse {
let now = time::OffsetDateTime::now_utc().to_offset(offset!(+8));
let return_info = format!("running: {:?}", now,);
HttpResponse::Ok().body(return_info)
}
pub enum WsData {
WsMessage { data: InMessage },
WsConnect { data: Connect },
WsDisconnect { data: Disconnect },
}
pub fn api_init_none_func(_: &mut ServiceConfig) {}
pub trait ServiceCallback: Send + Sync {
fn as_any(&self) -> &dyn Any;
fn api_init(&self, web_app: &mut ServiceConfig);
fn wsdata(&self, data: WsData, consumer: Arc<dyn ServiceCallback>) -> anyhow::Result<ActorMsg>;
}
#[derive(Clone)]
pub struct AppState {
pub worker: Option<Vec<Addr<websocket::Worker>>>,
pub consumer: Option<Arc<dyn ServiceCallback>>,
pub database: Option<DatabaseConnection>,
pub redis: Option<fred::prelude::RedisPool>,
pub config: Option<serde_json::Value>,
pub wsapi: Option<String>,
pub token_check: Option<Arc<dyn crate::access_token::TokenPermission + Send + Sync>>,
pub jwt_secret: Option<String>,
}
pub async fn start(
name: String, port: u16, config: Option<serde_json::Value>, ws_consumer: Option<Arc<dyn ServiceCallback>>, ws_api: Option<String>, worker_num: Option<usize>, api_init: impl Fn(&mut web::ServiceConfig) + Sync + Send + 'static, thread_num: Option<usize>, database: Option<DatabaseConnection>, redis: Option<fred::prelude::RedisPool>, token_check: Option<Arc<dyn crate::access_token::TokenPermission + Send + Sync>>, jwt_secret: Option<String>, api_prefix: Option<String>, ) -> anyhow::Result<()> {
env_logger::init_from_env(Env::default().default_filter_or("info"));
let new_addr_list = if ws_consumer.is_some() {
let copied_consumer = ws_consumer.clone().unwrap();
let worker_thread_number = if worker_num.is_none() {
3
} else {
worker_num.unwrap()
};
let worker_addr = Arc::new(SegQueue::<Addr<websocket::Worker>>::default());
for _i in 0..worker_thread_number {
let cusumer_copied = copied_consumer.clone();
let worker_addr_copied = worker_addr.clone();
let arbiter = actix_rt::Arbiter::new();
arbiter.spawn(async move {
let addr = websocket::Worker::new(cusumer_copied.clone()).start();
worker_addr_copied.push(addr);
let addr = websocket::Worker::new(cusumer_copied).start();
worker_addr_copied.push(addr);
});
}
loop {
if worker_addr.len() != worker_thread_number * 2 {
continue;
} else {
break;
}
}
let mut new_addr_list = Vec::<Addr<websocket::Worker>>::default();
for _i in 0..worker_addr.len() {
new_addr_list.push(worker_addr.pop().unwrap());
}
Some(new_addr_list)
} else {
None
};
let state = AppState {
worker: new_addr_list,
consumer: ws_consumer,
database: database,
redis: redis,
config: config,
wsapi: ws_api,
token_check: token_check,
jwt_secret: jwt_secret,
};
start_internal(state, name, port, api_init, thread_num, api_prefix).await?;
anyhow::Ok(())
}
async fn start_internal(
state: AppState,
name: String,
port: u16,
api_init: impl Fn(&mut web::ServiceConfig) + Sync + Send + 'static,
thread_num: Option<usize>,
api_prefix: Option<String>,
) -> anyhow::Result<()> {
let mut api_prefix = if api_prefix.is_some() {
api_prefix.unwrap().clone()
} else {
"/".into()
};
if !api_prefix.ends_with("/") {
api_prefix = format!("{}/", api_prefix);
}
let metrics = actix_web_prom::PrometheusMetricsBuilder::new(&name)
.endpoint(format!("{}metrics", api_prefix).as_str())
.build()
.unwrap();
let found_errors = prometheus::IntCounterVec::new(
prometheus::Opts {
namespace: name.clone(),
subsystem: String::new(),
name: "errors".into(),
help: "FoundErrors".into(),
const_labels: HashMap::new(),
variable_labels: Vec::new(),
},
&["path", "description"],
)?;
metrics.registry.register(Box::new(found_errors.clone()))?;
let api_init = Arc::new(api_init);
let payload_config = PayloadConfig::new(256 * 1024 * 1024);
let json_payload_config = web::JsonConfig::default();
let json_payload_config = json_payload_config.limit(16 * 1024 * 1024);
let multi_part_config = MultipartFormConfig::default()
.total_limit(512 * 1024 * 1024)
.memory_limit(64 * 1024 * 1024);
let mut settings = actix_settings::Settings::from_default_template();
actix_settings::Settings::override_field(&mut settings.actix.mode, "production")?;
actix_settings::Settings::override_field(
&mut settings.actix.hosts,
format!("[[\"0.0.0.0\", {}]]", port),
)?;
if thread_num.is_some() {
let thread_str = thread_num.unwrap().to_string();
actix_settings::Settings::override_field(&mut settings.actix.num_workers, thread_str)?;
} else {
actix_settings::Settings::override_field(&mut settings.actix.num_workers, "4")?;
}
HttpServer::new(move || {
let cors = Cors::default()
.allow_any_origin()
.allow_any_method()
.allow_any_header()
.expose_any_header()
.max_age(3600);
let error_metrics = found_errors.clone();
App::new()
.app_data(payload_config.clone())
.app_data(json_payload_config.clone())
.app_data(web::Data::new(state.clone()))
.app_data(multi_part_config.clone())
.route(
format!("{}health", api_prefix).as_str(),
web::get().to(health_check),
)
.configure(init_service(
state.clone(),
api_init.clone(),
api_prefix.clone(),
))
.wrap(cors)
.wrap(metrics.clone())
.wrap_fn(move |req, srv| {
let start_time = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap()
.as_micros();
let addr = req.request().peer_addr().unwrap().to_string();
let method = req.request().method().to_string();
let path = req.request().path().to_string();
let fut = srv.call(req);
let error_counter = error_metrics.clone();
async move {
let srv_response = fut.await?;
if let Some(err) = srv_response.response().error() {
let url = match srv_response.request().match_pattern() {
Some(pattern) => pattern,
None => String::new(),
};
let err_desc = format!("{err}");
error_counter
.clone()
.with_label_values(&[url.as_str(), err_desc.as_str()])
.inc();
}
let end_time = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap()
.as_micros();
let micros_diff = (end_time - start_time) as f32 / 1000.0;
let log_info = format!(
"{} {} {} {} {}ms",
addr,
method,
path,
srv_response.response().status().as_str(),
micros_diff
);
if path.eq("/metrics") {
debug!("{}", log_info)
} else {
info!("{}", log_info)
}
Ok(srv_response)
}
})
.default_service(web::route().to(not_found))
})
.try_apply_settings(&settings)
.unwrap()
.run()
.await?;
Ok(())
}
fn init_service(
state: AppState,
api_init: Arc<impl Fn(&mut web::ServiceConfig) + Send + Sync>,
api_prefix: String,
) -> impl Fn(&mut web::ServiceConfig) {
move |web_app| {
if state.consumer.is_some() {
let mut ws_api_url =
if state.wsapi.is_some() && !state.wsapi.as_ref().unwrap().is_empty() {
let ws_api_url = state.wsapi.as_ref().unwrap();
info!("register ws api service as: {}", ws_api_url);
ws_api_url
} else {
let ws_api_url = "websocket/api";
info!("register ws api service as default: {}", ws_api_url);
ws_api_url
};
if ws_api_url.starts_with("/") {
ws_api_url = ws_api_url.strip_prefix("/").unwrap();
}
web_app.service(
web::scope(format!("{}{}", api_prefix, ws_api_url).as_str())
.service(websocket::api::ws_api()),
);
state.consumer.as_ref().unwrap().api_init(web_app);
return;
}
api_init(web_app);
}
}