use std::any::Any;
use std::net::SocketAddr;
use std::time;
use crate::api::{CollectorIWeb, IWeb, RequesterInfo};
use actix_web::web::{Payload, Query};
use actix_web::{
web, App, HttpRequest, HttpResponse, HttpServer,
};
use downcast_rs::Downcast;
use futures_util::StreamExt;
use once_map::OnceMap;
use serde_json::{error, json, Value};
use crate::cin::{CollectorIDaemonLoop, CollectorIGrpcServer, CollectorIOnAppready};
use crate::myparameter::*;
use std::env;
use std::ops::Deref;
use chrono::Duration;
use sqlx::Error;
use tonic::{transport::Server, Request, Response, Status};
use tonic::body::BoxBody;
use tonic::codegen::http::{request, response};
use tonic::codegen::{ Service};
use tonic::server::NamedService;
use tonic::transport::Body;
use tracing::span;
use crate::{mydbg, myjson, mylog};
use crate::myerror::MyError;
pub async fn main() -> std::io::Result<()> {
let mut serverconfig = Server::builder().timeout(time::Duration::from_secs(6 * 3600)).http2_keepalive_timeout(Some(time::Duration::from_secs(6 * 3600))).trace_fn(|x| {
mylog!(x.uri(),x.method());
span!(tracing::Level::TRACE, "GRPC","{} {} {:?}",x.method(), x.uri(),x.headers())
}).accept_http1(true);
let mut server=serverconfig.add_optional_service(Some(crate::grpcs::default_business::DefaultBusiness::server()));
for implementor in inventory::iter::<CollectorIGrpcServer> {
let obj = &(implementor.0);
mylog!(format!("grpc server: {}",obj.classname()));
server=obj.add_to_server(server);
}
let grpcport=env::var("GRPC_PORT").unwrap_or("8088".to_string());
let grpcaddress=format!("0.0.0.0:{}",grpcport);
let addr:SocketAddr =grpcaddress.parse().unwrap();
actix_rt::spawn(async move {
mylog!(format!("grpc server start at address:{grpcaddress}"));
server.serve(addr).await.unwrap();
});
let httpport=env::var("WEB_PORT").unwrap_or("80".to_string());
let httpaddress=format!("0.0.0.0:{}",httpport);
for implementor in inventory::iter::<CollectorIWeb> {
let obj = &(implementor.0);
mydbg!(format!("http server: {}",obj.route()));
}
let httpserver=HttpServer::new(|| {
let mut app = App::new();
for implementor in inventory::iter::<CollectorIWeb> {
let obj = &(implementor.0);
app = app.app_data(obj).route(
obj.route(),
web::post().to(
move |a: HttpRequest, payload: web::Payload| async move {
handler_http(obj, a, payload).await
},
),
);
app = app.app_data(obj).route(
obj.route(),
web::get().to(
move |a: HttpRequest, payload: web::Payload| async move {
handler_http(obj, a, payload).await
},
),
);
}
app.service(actix_files::Files::new("static", ".").show_files_listing())
}).bind(("0.0.0.0", httpport.as_str().parse::<u16>().unwrap()))?.run();
mylog!(format!("http server start at address:{httpaddress}").as_str());
for implementor in inventory::iter::<CollectorIDaemonLoop> {
let obj = &(implementor.0);
mylog!(format!("daemon: {}",obj.classname()));
actix_rt::spawn(async move {
loop {
let result = obj.handle();
if result.is_err() {
mylog!(result.err().unwrap().to_string());
}
actix_rt::time::sleep(std::time::Duration::from_millis(obj.miniseconds() as u64)).await;
}
});
}
for implementor in inventory::iter::<CollectorIOnAppready> {
let obj = &(implementor.0);
mylog!(format!("appready: {}",obj.classname()));
actix_rt::spawn(async move {
let result=obj.handle().await;
if result.is_err(){
mylog!(result.err().unwrap().to_string());
}
});
}
httpserver.await;
mylog!("unreached");
Ok(())
}
async fn handler_http(obj: &Box<dyn IWeb>, a: HttpRequest, mut payload: Payload) -> HttpResponse {
let info: Query<Value> = web::Query::from_query(a.query_string()).unwrap();
let query: OnceMap<String, String> = OnceMap::new();
for (key, val) in info.into_inner().as_object().unwrap().iter() {
query.insert(key.as_str().to_string(), |_| {
format!("{}", val.as_str().unwrap())
});
}
let mut body = web::BytesMut::new();
while let Some(chunk) = payload.next().await {
body.extend_from_slice(&chunk.unwrap());
}
let mut data = json!({});
let _data: Result<Value, error::Error> = serde_json::from_slice(&body);
if _data.is_ok() {
data = _data.unwrap();
}
let mut my_parameter = MyParameter::default();
my_parameter.set_data(data);
my_parameter.set_query_map(query);
let mut request_info=RequesterInfo::default();
request_info.from_app_name=my_parameter.get_string("__app_name");
request_info.from_action=my_parameter.get_string("__action");
request_info.from_ip=a.connection_info().realip_remote_addr().unwrap_or("").to_string();
let mut optional_group: Vec<String> = vec![];
let field_names = obj.parameters();
for name in field_names {
if my_parameter.contains_key(name.as_str()) {
continue
}
if name.as_str().starts_with("result") {
continue
}
if name.as_str().starts_with("error") {
continue
}
if !name.as_str().contains("optional") && !my_parameter.contains_key(name.as_str()) {
return HttpResponse::BadRequest().body(format!("Lack Parameter:{name}"))
}
if name.as_str().starts_with("optional") {
continue
}
if name.as_str().starts_with("optional_") {
return HttpResponse::BadRequest().body(format!("Optional Parameter Name Not Given :{name}"))
}
let index = name.as_str().find("_optional_");
if index.is_none() || index.unwrap() == 0 || index.unwrap() + 10 == name.len() {
return HttpResponse::BadRequest().body(format!("Optional Parameter Group Or Name Not Given :{name}"))
}
let after_index = index.unwrap();
let group_name = name.as_str()[..after_index].to_string();
if !optional_group.contains(&group_name) {
optional_group.push(group_name.clone());
}
if my_parameter.contains_key(name.as_str()) {
optional_group.retain(|value| *value != group_name);
}
}
if optional_group.len() > 0 {
let group_string = optional_group.join(",");
return HttpResponse::BadRequest().body(format!("Parameter For Optional Group Not Exists :{group_string} "))
}
match obj.handle(my_parameter, request_info).await {
Ok(val) => HttpResponse::Ok().body(val),
Err(e) => HttpResponse::Ok().body(e.to_string()),
}
}