Skip to main content

webhttp/
lib.rs

1pub mod command;
2pub mod response;
3pub mod websocket;
4pub use response::{NoneBodyData, Response};
5pub mod access_token;
6pub use access_token::{get_token_and_path, AccessToken, TokenPermission};
7pub mod termenv;
8pub use termenv::{termenv_check, termenv_get, termenv_init};
9pub mod permission;
10pub use permission::*;
11
12pub mod mysql;
13pub mod redis;
14pub mod webhttp;
15
16use actix::Actor;
17use actix::Addr;
18use actix_multipart::form::MultipartFormConfig;
19use actix_settings::ApplySettings;
20use actix_web::web::PayloadConfig;
21use crossbeam::queue::SegQueue;
22use websocket::{ActorMsg, Connect, Disconnect, InMessage};
23
24use actix_cors::Cors;
25#[allow(unused_imports)]
26use actix_web::{
27    dev::{Server, Service},
28    http::{KeepAlive, Method},
29    middleware, web,
30    web::ServiceConfig,
31    App, HttpResponse, HttpServer, *,
32};
33
34// use actix_web::middleware::Logger;
35use env_logger::Env;
36use sea_orm::DatabaseConnection;
37use std::any::Any;
38use std::collections::HashMap;
39use std::sync::Arc;
40use time::macros::offset;
41#[allow(unused_imports)]
42use tracing::*;
43
44#[allow(clippy::unused_async)]
45// #[cfg_attr(coverage, no_coverage)]
46pub async fn not_found() -> HttpResponse {
47    HttpResponse::NotFound().body("Not Found 404")
48}
49
50#[allow(clippy::unused_async)]
51// #[cfg_attr(coverage, no_coverage)]
52pub async fn health_check() -> HttpResponse {
53    let now = time::OffsetDateTime::now_utc().to_offset(offset!(+8));
54    let return_info = format!("running: {:?}", now,);
55    HttpResponse::Ok().body(return_info)
56}
57
/// Websocket payloads handed to [`ServiceCallback::wsdata`].
///
/// Each variant wraps one of the message types imported from the `websocket` module.
pub enum WsData {
    /// Carries an [`InMessage`] (presumably a message received from a client — confirm in
    /// `websocket`).
    WsMessage { data: InMessage },
    /// Carries a [`Connect`] (presumably emitted when a session is opened — confirm in
    /// `websocket`).
    WsConnect { data: Connect },
    /// Carries a [`Disconnect`] (presumably emitted when a session is closed — confirm in
    /// `websocket`).
    WsDisconnect { data: Disconnect },
}
63
64pub fn api_init_none_func(_: &mut ServiceConfig) {}
65
// Public trait that outside code should implement.
// use async_trait::async_trait;
// #[async_trait(?Send)]
// #[async_trait]
/// Callback interface implemented by the application that embeds this server.
///
/// Implementations must be `Send + Sync` because they are shared through
/// `Arc<dyn ServiceCallback>`.
pub trait ServiceCallback: Send + Sync {
    /// Returns `self` as `&dyn Any` (typically used to downcast to the concrete type).
    fn as_any(&self) -> &dyn Any;
    /// Registers the implementor's HTTP routes on `web_app`.
    ///
    /// `init_service` calls this after mounting the websocket scope when a consumer is
    /// configured.
    fn api_init(&self, web_app: &mut ServiceConfig);
    /// Handles one websocket payload and produces the `ActorMsg` result.
    ///
    /// `consumer` is a shared handle to a `ServiceCallback` (presumably the same consumer
    /// that was registered at start-up — confirm in `websocket`).
    fn wsdata(&self, data: WsData, consumer: Arc<dyn ServiceCallback>) -> anyhow::Result<ActorMsg>;
}
75
/// Shared application state, cloned into every actix-web `App` instance as `web::Data`.
///
/// Every field is optional; `start` fills them from its arguments.
#[derive(Clone)]
pub struct AppState {
    /// Addresses of the websocket worker actors started by `start`; `None` when no
    /// websocket consumer was supplied.
    pub worker: Option<Vec<Addr<websocket::Worker>>>,
    /// Websocket consumer; when present, `init_service` mounts the websocket scope and
    /// lets the consumer register the HTTP routes.
    pub consumer: Option<Arc<dyn ServiceCallback>>,
    /// SeaORM database connection, if one was supplied.
    pub database: Option<DatabaseConnection>,
    /// Redis connection pool, if one was supplied.
    pub redis: Option<fred::prelude::RedisPool>,
    /// Free-form JSON configuration passed through from `start`.
    pub config: Option<serde_json::Value>,
    /// Path (relative to the API prefix) of the websocket scope; `init_service` falls back
    /// to `websocket/api` when this is `None` or empty.
    pub wsapi: Option<String>,
    /// Optional `TokenPermission` implementation (presumably used for access-token
    /// permission checks — confirm in `access_token`).
    pub token_check: Option<Arc<dyn crate::access_token::TokenPermission + Send + Sync>>,
    /// Optional JWT secret (presumably used to sign/verify tokens — confirm in
    /// `access_token`).
    pub jwt_secret: Option<String>,
}
87
88pub async fn start(
89    name: String,                                                       // server name
90    port: u16,                                                          // server port
91    config: Option<serde_json::Value>,                                  // configuration
92    ws_consumer: Option<Arc<dyn ServiceCallback>>,                      // Websocket consumer
93    ws_api: Option<String>,                                             // Websocket api register
94    worker_num: Option<usize>,                                          // actor number of websocket
95    api_init: impl Fn(&mut web::ServiceConfig) + Sync + Send + 'static, // outer api register
96    thread_num: Option<usize>,                                          // web thread number
97    database: Option<DatabaseConnection>,                               // database connector
98    redis: Option<fred::prelude::RedisPool>,                            // redis connector
99    token_check: Option<Arc<dyn crate::access_token::TokenPermission + Send + Sync>>, // permissison
100    jwt_secret: Option<String>,                                         // jwt secret
101    api_prefix: Option<String>, // api prefix url, such as /api/v1/test
102) -> anyhow::Result<()> {
103    env_logger::init_from_env(Env::default().default_filter_or("info"));
104
105    let new_addr_list = if ws_consumer.is_some() {
106        let copied_consumer = ws_consumer.clone().unwrap();
107        let worker_thread_number = if worker_num.is_none() {
108            3
109        } else {
110            worker_num.unwrap()
111        };
112        let worker_addr = Arc::new(SegQueue::<Addr<websocket::Worker>>::default());
113
114        // all arbiter actor number = worker_num * 2
115        for _i in 0..worker_thread_number {
116            let cusumer_copied = copied_consumer.clone();
117            let worker_addr_copied = worker_addr.clone();
118            let arbiter = actix_rt::Arbiter::new();
119            arbiter.spawn(async move {
120                let addr = websocket::Worker::new(cusumer_copied.clone()).start();
121                worker_addr_copied.push(addr);
122                let addr = websocket::Worker::new(cusumer_copied).start();
123                worker_addr_copied.push(addr);
124            });
125        }
126
127        // check arbiter actor number
128        loop {
129            if worker_addr.len() != worker_thread_number * 2 {
130                continue;
131            } else {
132                break;
133            }
134        }
135
136        let mut new_addr_list = Vec::<Addr<websocket::Worker>>::default();
137        for _i in 0..worker_addr.len() {
138            new_addr_list.push(worker_addr.pop().unwrap());
139        }
140        Some(new_addr_list)
141    } else {
142        None
143    };
144
145    let state = AppState {
146        worker: new_addr_list,
147        consumer: ws_consumer,
148        database: database,
149        redis: redis,
150        config: config,
151        wsapi: ws_api,
152        token_check: token_check,
153        jwt_secret: jwt_secret,
154    };
155    start_internal(state, name, port, api_init, thread_num, api_prefix).await?;
156    anyhow::Ok(())
157}
158
159async fn start_internal(
160    state: AppState,
161    name: String,
162    port: u16,
163    api_init: impl Fn(&mut web::ServiceConfig) + Sync + Send + 'static,
164    thread_num: Option<usize>,
165    api_prefix: Option<String>,
166) -> anyhow::Result<()> {
167    let mut api_prefix = if api_prefix.is_some() {
168        api_prefix.unwrap().clone()
169    } else {
170        "/".into()
171    };
172    if !api_prefix.ends_with("/") {
173        api_prefix = format!("{}/", api_prefix);
174    }
175
176    let metrics = actix_web_prom::PrometheusMetricsBuilder::new(&name)
177        .endpoint(format!("{}metrics", api_prefix).as_str())
178        .build()
179        .unwrap();
180    let found_errors = prometheus::IntCounterVec::new(
181        prometheus::Opts {
182            namespace: name.clone(),
183            subsystem: String::new(),
184            name: "errors".into(),
185            help: "FoundErrors".into(),
186            const_labels: HashMap::new(),
187            variable_labels: Vec::new(),
188        },
189        &["path", "description"],
190    )?;
191    metrics.registry.register(Box::new(found_errors.clone()))?;
192
193    let api_init = Arc::new(api_init);
194    let payload_config = PayloadConfig::new(256 * 1024 * 1024);
195    let json_payload_config = web::JsonConfig::default();
196    let json_payload_config = json_payload_config.limit(16 * 1024 * 1024);
197    let multi_part_config = MultipartFormConfig::default()
198        .total_limit(512 * 1024 * 1024)
199        .memory_limit(64 * 1024 * 1024);
200
201    let mut settings = actix_settings::Settings::from_default_template();
202    actix_settings::Settings::override_field(&mut settings.actix.mode, "production")?;
203    actix_settings::Settings::override_field(
204        &mut settings.actix.hosts,
205        format!("[[\"0.0.0.0\", {}]]", port),
206    )?;
207    if thread_num.is_some() {
208        let thread_str = thread_num.unwrap().to_string();
209        actix_settings::Settings::override_field(&mut settings.actix.num_workers, thread_str)?;
210    } else {
211        actix_settings::Settings::override_field(&mut settings.actix.num_workers, "4")?;
212    }
213
214    HttpServer::new(move || {
215        let cors = Cors::default()
216            .allow_any_origin()
217            .allow_any_method()
218            .allow_any_header()
219            .expose_any_header()
220            .max_age(3600);
221        let error_metrics = found_errors.clone();
222        App::new()
223            .app_data(payload_config.clone())
224            .app_data(json_payload_config.clone())
225            .app_data(web::Data::new(state.clone()))
226            .app_data(multi_part_config.clone())
227            .route(
228                format!("{}health", api_prefix).as_str(),
229                web::get().to(health_check),
230            )
231            .configure(init_service(
232                state.clone(),
233                api_init.clone(),
234                api_prefix.clone(),
235            ))
236            .wrap(cors)
237            // not use middlewar of logger, because the format is not same as tracing
238            // .wrap(Logger::new("%a %r[%t]-%s %T %b"))
239            .wrap(metrics.clone())
240            .wrap_fn(move |req, srv| {
241                // let now = time::OffsetDateTime::now_utc().to_offset(offset!(+8));
242                let start_time = std::time::SystemTime::now()
243                    .duration_since(std::time::SystemTime::UNIX_EPOCH)
244                    .unwrap()
245                    .as_micros();
246                // with port
247                let addr = req.request().peer_addr().unwrap().to_string();
248                // without port
249                // let addr = req
250                //     .request()
251                //     .connection_info()
252                //     .realip_remote_addr()
253                //     .unwrap()
254                //     .to_string();
255                let method = req.request().method().to_string();
256                let path = req.request().path().to_string();
257
258                let fut = srv.call(req);
259                let error_counter = error_metrics.clone();
260                async move {
261                    let srv_response = fut.await?;
262                    if let Some(err) = srv_response.response().error() {
263                        let url = match srv_response.request().match_pattern() {
264                            Some(pattern) => pattern,
265                            None => String::new(),
266                        };
267                        let err_desc = format!("{err}");
268                        error_counter
269                            .clone()
270                            .with_label_values(&[url.as_str(), err_desc.as_str()])
271                            .inc();
272                    }
273
274                    let end_time = std::time::SystemTime::now()
275                        .duration_since(std::time::SystemTime::UNIX_EPOCH)
276                        .unwrap()
277                        .as_micros();
278                    let micros_diff = (end_time - start_time) as f32 / 1000.0;
279
280                    // let log_info = format!(
281                    //     "[{:?}] {} {} {} {} {}ms",
282                    //     now,
283                    //     addr,
284                    //     method,
285                    //     path,
286                    //     srv_response.response().status().as_str(),
287                    //     micros_diff
288                    // );
289                    let log_info = format!(
290                        "{} {} {} {} {}ms",
291                        addr,
292                        method,
293                        path,
294                        srv_response.response().status().as_str(),
295                        micros_diff
296                    );
297                    if path.eq("/metrics") {
298                        debug!("{}", log_info)
299                    } else {
300                        info!("{}", log_info)
301                    }
302
303                    Ok(srv_response)
304                }
305            })
306            .default_service(web::route().to(not_found))
307    })
308    // .keep_alive(std::time::Duration::from_secs(75))
309    // .keep_alive(KeepAlive::Os)
310    .try_apply_settings(&settings)
311    .unwrap()
312    // .apply_settings(&settings)
313    // .bind(("0.0.0.0", port))? // if use settings, it already has binding addr, can't add again
314    .run()
315    .await?;
316    Ok(())
317}
318
319fn init_service(
320    state: AppState,
321    api_init: Arc<impl Fn(&mut web::ServiceConfig) + Send + Sync>,
322    api_prefix: String,
323) -> impl Fn(&mut web::ServiceConfig) {
324    move |web_app| {
325        if state.consumer.is_some() {
326            // info!("http and websocket mode");
327            let mut ws_api_url =
328                if state.wsapi.is_some() && !state.wsapi.as_ref().unwrap().is_empty() {
329                    let ws_api_url = state.wsapi.as_ref().unwrap();
330                    info!("register ws api service as: {}", ws_api_url);
331                    ws_api_url
332                } else {
333                    let ws_api_url = "websocket/api";
334                    info!("register ws api service as default: {}", ws_api_url);
335                    ws_api_url
336                };
337
338            if ws_api_url.starts_with("/") {
339                ws_api_url = ws_api_url.strip_prefix("/").unwrap();
340            }
341
342            web_app.service(
343                web::scope(format!("{}{}", api_prefix, ws_api_url).as_str())
344                    .service(websocket::api::ws_api()),
345            );
346            state.consumer.as_ref().unwrap().api_init(web_app);
347            return;
348        }
349        // info!("http mode only");
350        api_init(web_app);
351    }
352}