// usc 1.20230730.1349
//
// A common library for unitedservices.
// Documentation
use std::any::Any;
use std::net::SocketAddr;
use std::time;
use crate::api::{CollectorIWeb, IWeb, RequesterInfo};


use actix_web::web::{Payload, Query};
use actix_web::{
    web, App, HttpRequest, HttpResponse, HttpServer,
};
use downcast_rs::Downcast;

use futures_util::StreamExt;
use once_map::OnceMap;
use serde_json::{error, json, Value};
use crate::cin::{CollectorIDaemonLoop, CollectorIGrpcServer, CollectorIOnAppready};

use crate::myparameter::*;
use std::env;
use std::ops::Deref;
use chrono::Duration;
use sqlx::Error;

use tonic::{transport::Server, Request, Response, Status};
use tonic::body::BoxBody;
use tonic::codegen::http::{request, response};
use tonic::codegen::{ Service};
use tonic::server::NamedService;
use tonic::transport::Body;
use tracing::span;
use crate::{mydbg, myjson, mylog};
use crate::myerror::MyError;

pub async fn main() -> std::io::Result<()>  {

    //grpc
    let  mut serverconfig = Server::builder().timeout(time::Duration::from_secs(6 * 3600)).http2_keepalive_timeout(Some(time::Duration::from_secs(6 * 3600))).trace_fn(|x| {
        mylog!(x.uri(),x.method());
        span!(tracing::Level::TRACE, "GRPC","{} {} {:?}",x.method(), x.uri(),x.headers())
    }).accept_http1(true);
    let mut server=serverconfig.add_optional_service(Some(crate::grpcs::default_business::DefaultBusiness::server()));
     for implementor in inventory::iter::<CollectorIGrpcServer> {
        let obj = &(implementor.0);
         mylog!(format!("grpc server: {}",obj.classname()));
        server=obj.add_to_server(server);
    }
    let grpcport=env::var("GRPC_PORT").unwrap_or("8088".to_string());
    let grpcaddress=format!("0.0.0.0:{}",grpcport);
    let addr:SocketAddr =grpcaddress.parse().unwrap();
    actix_rt::spawn(async move {
        mylog!(format!("grpc server start at address:{grpcaddress}"));
        server.serve(addr).await.unwrap();
    });


    //http
    let httpport=env::var("WEB_PORT").unwrap_or("80".to_string());
    let httpaddress=format!("0.0.0.0:{}",httpport);
    for implementor in inventory::iter::<CollectorIWeb> {
        let obj = &(implementor.0);
        mydbg!(format!("http server: {}",obj.route()));
    }
    let httpserver=HttpServer::new(|| {
        let mut app = App::new();
        for implementor in inventory::iter::<CollectorIWeb> {
            let obj = &(implementor.0);
            app = app.app_data(obj).route(
                obj.route(),
                web::post().to(
                    move |a: HttpRequest,  payload: web::Payload| async move {
                        handler_http(obj, a, payload).await
                    },
                ),
            );
            app = app.app_data(obj).route(
                obj.route(),
                web::get().to(
                    move |a: HttpRequest,  payload: web::Payload| async move {
                        handler_http(obj, a, payload).await
                    },
                ),
            );
        }
        app.service(actix_files::Files::new("static", ".").show_files_listing())
    }).bind(("0.0.0.0", httpport.as_str().parse::<u16>().unwrap()))?.run();
    mylog!(format!("http server start at address:{httpaddress}").as_str());



    //daemon
    for implementor in inventory::iter::<CollectorIDaemonLoop> {
        let obj = &(implementor.0);
        mylog!(format!("daemon: {}",obj.classname()));
        actix_rt::spawn(async move {
            loop {
                let result = obj.handle();
                if result.is_err() {
                    mylog!(result.err().unwrap().to_string());
                }
                actix_rt::time::sleep(std::time::Duration::from_millis(obj.miniseconds() as u64)).await;
            }
        });
    }

    //ready
    for implementor in inventory::iter::<CollectorIOnAppready> {
        let obj = &(implementor.0);
        mylog!(format!("appready: {}",obj.classname()));
        actix_rt::spawn(async move {
            let result=obj.handle().await;
            if result.is_err(){
                mylog!(result.err().unwrap().to_string());
            }
        });
    }

    httpserver.await;
    mylog!("unreached");
    Ok(())
}

/// Adapts one actix-web request to an `IWeb` implementor.
///
/// Steps:
/// 1. Parses the query string into a string map and the body into JSON (a body that
///    is not valid JSON is treated as `{}`), and stores both in a `MyParameter`.
/// 2. Fills a `RequesterInfo` from the `__app_name` / `__action` parameters and the
///    connection's real remote address.
/// 3. Validates the names returned by `obj.parameters()` against what was supplied:
///    * names starting with `result` or `error` are never required;
///    * names that do not contain `optional` are mandatory;
///    * names starting with `optional` are never required;
///    * names shaped `<group>_optional_<name>` form a group, and each group needs at
///      least one supplied member;
///    * any other missing name containing `optional` is rejected as malformed.
/// 4. Calls `obj.handle` and returns its output as the response body.
///
/// Returns `400 Bad Request` with an explanatory body when the query string or body
/// cannot be read or when validation fails.
async fn handler_http(obj: &Box<dyn IWeb>, a: HttpRequest, mut payload: Payload) -> HttpResponse {
    // The query string is client input: answer 400 rather than panic when it is malformed.
    let info: Query<Value> = match web::Query::from_query(a.query_string()) {
        Ok(parsed) => parsed,
        Err(e) => return HttpResponse::BadRequest().body(format!("Invalid Query String:{e}")),
    };
    let query: OnceMap<String, String> = OnceMap::new();
    let query_value = info.into_inner();
    if let Some(pairs) = query_value.as_object() {
        for (key, val) in pairs.iter() {
            // Non-string values fall back to their JSON text instead of panicking.
            let text = match val.as_str() {
                Some(s) => s.to_string(),
                None => val.to_string(),
            };
            query.insert(key.clone(), |_| text);
        }
    }

    let mut body = web::BytesMut::new();
    while let Some(chunk) = payload.next().await {
        match chunk {
            Ok(bytes) => body.extend_from_slice(&bytes),
            Err(e) => return HttpResponse::BadRequest().body(format!("Read Body Failed:{e}")),
        }
    }

    // Deserialize the body to a serde_json::Value; anything unparsable becomes `{}`.
    let data = serde_json::from_slice::<Value>(&body).unwrap_or_else(|_| json!({}));
    let mut my_parameter = MyParameter::default();
    my_parameter.set_data(data);
    my_parameter.set_query_map(query);

    let mut request_info = RequesterInfo::default();
    request_info.from_app_name = my_parameter.get_string("__app_name");
    request_info.from_action = my_parameter.get_string("__action");
    request_info.from_ip = a.connection_info().realip_remote_addr().unwrap_or("").to_string();

    // Groups with at least one missing member, in first-seen order (used for the message).
    let mut missing_groups: Vec<String> = vec![];
    // Groups with at least one supplied member.
    let mut satisfied_groups: Vec<String> = vec![];
    for name in obj.parameters() {
        let key = name.as_str();
        if key.starts_with("result") || key.starts_with("error") {
            continue;
        }
        let supplied = my_parameter.contains_key(key);
        if !key.contains("optional") {
            if supplied {
                continue;
            }
            return HttpResponse::BadRequest().body(format!("Lack Parameter:{name}"));
        }
        if key.starts_with("optional") {
            continue;
        }
        // "_optional_" is 10 bytes long; both the group before it and the name after it
        // must be non-empty.
        let group = match key.find("_optional_") {
            Some(index) if index != 0 && index + 10 != key.len() => &key[..index],
            // A supplied parameter is accepted even when its declared name is malformed.
            _ if supplied => continue,
            _ => {
                return HttpResponse::BadRequest()
                    .body(format!("Optional Parameter Group Or Name Not Given :{name}"))
            }
        };
        let tracker = if supplied { &mut satisfied_groups } else { &mut missing_groups };
        if !tracker.iter().any(|known| known.as_str() == group) {
            tracker.push(group.to_string());
        }
    }
    // A group fails only when none of its members was supplied. Resolving this after the
    // loop makes the outcome independent of the order of `obj.parameters()`.
    missing_groups.retain(|group| !satisfied_groups.contains(group));
    if !missing_groups.is_empty() {
        let group_string = missing_groups.join(",");
        return HttpResponse::BadRequest()
            .body(format!("Parameter For Optional Group Not Exists :{group_string} "));
    }

    // NOTE(review): handler errors are returned with status 200 and the error text as the
    // body; presumably a deliberate protocol choice — confirm with the API consumers.
    match obj.handle(my_parameter, request_info).await {
        Ok(val) => HttpResponse::Ok().body(val),
        Err(e) => HttpResponse::Ok().body(e.to_string()),
    }
}