use std::borrow::Cow;
use std::env;
use bytes::Bytes;
use clap::{ArgAction, Command, arg};
use config::Config;
use http_body_util::Full;
use http_body_util::combinators::BoxBody;
use hyper::{Request, Response, StatusCode};
use prosa::core::adaptor::Adaptor;
use prosa::core::error::ProcError;
use prosa::core::main::MainRunnable as _;
use prosa::core::proc::{Proc, ProcBusParam as _, ProcConfig};
use prosa::core::settings::settings;
use prosa::stub::adaptor::StubParotAdaptor;
use prosa::stub::proc::StubSettings;
use prosa::{core::main::MainProc, stub::proc::StubProc};
use prosa_hyper::server::adaptor::{HyperServerAdaptor, default_srv_error_response};
use prosa_hyper::server::proc::{HyperServerProc, HyperServerSettings};
use prosa_hyper::{HyperResp, PRODUCT_VERSION_HEADER};
use prosa_utils::config::tracing::TelemetryFilter;
use prosa_utils::msg::simple_string_tvf::SimpleStringTvf;
use serde::{Deserialize, Serialize};
use tracing::debug;
/// Demo adaptor plugged into the Hyper server processor.
///
/// It answers a small set of HTTP routes (see its `HyperServerAdaptor`
/// implementation) and only keeps the name needed to render the home page.
#[derive(Debug, Adaptor, Clone)]
pub struct HyperDemoAdaptor {
/// Name returned by `proc.name()` when the adaptor is created; echoed in the
/// body of the `/` route.
prosa_name: String,
}
/// HTTP routing for the demo adaptor.
///
/// Routes handled by [`process_http_request`](HyperServerAdaptor::process_http_request):
/// - `/`: answered directly with a protocol label and the stored name,
/// - `/test`: forwarded to the `SRV_TEST` service, whose reply is rendered as text,
/// - anything else: `404 Not Found`.
impl<M> HyperServerAdaptor<M> for HyperDemoAdaptor
where
    M: 'static
        + std::marker::Send
        + std::marker::Sync
        + std::marker::Sized
        + std::clone::Clone
        + std::fmt::Debug
        + prosa_utils::msg::tvf::Tvf
        + std::default::Default,
{
    /// Creates the adaptor, keeping a copy of the name exposed by `proc`.
    ///
    /// This implementation never returns an error.
    fn new(proc: &HyperServerProc<M>) -> Result<Self, Box<dyn ProcError + Send + Sync>> {
        Ok(HyperDemoAdaptor {
            prosa_name: proc.name().to_string(),
        })
    }

    /// Dispatches an incoming HTTP request according to its path.
    ///
    /// Static routes are turned into an immediate response; `/test` yields a
    /// `HyperResp::SrvReq` carrying the service name, the request message and
    /// the callback that builds the HTTP response once the service has replied.
    async fn process_http_request(
        &self,
        req: Request<hyper::body::Incoming>,
    ) -> crate::HyperResp<Self, M> {
        match req.uri().path() {
            "/" => {
                // Only HTTP/2 is singled out; every other version is labelled HTTP/1.1.
                let protocol = if req.version() == hyper::Version::HTTP_2 {
                    "H2"
                } else {
                    "HTTP/1.1"
                };

                Response::builder()
                    .header("Server", PRODUCT_VERSION_HEADER)
                    .body(BoxBody::new(Full::new(Bytes::from(format!(
                        "{} - Home of {}",
                        protocol, self.prosa_name,
                    )))))
                    .into()
            }
            "/test" => {
                // Request message: field 1 holds the HTTP method, field 2 the path.
                let mut tvf_req = M::default();
                tvf_req.put_string(1, req.method().to_string());
                tvf_req.put_string(2, "/test");

                HyperResp::SrvReq(
                    String::from("SRV_TEST"),
                    tvf_req,
                    Box::new(move |adaptor, result| match result {
                        Ok(resp) => {
                            // Field 10 is read as the reply body. The fallback is built
                            // lazily so nothing is allocated when the field is present.
                            let body = resp
                                .get_string(10)
                                .unwrap_or_else(|_| Cow::Owned(String::from("empty body")));
                            <HyperDemoAdaptor as HyperServerAdaptor<M>>::response_builder(
                                adaptor,
                                StatusCode::OK,
                            )
                            .body(BoxBody::new(Full::new(Bytes::from(format!(
                                "Body: {body}\nTvfResp: {resp:?}"
                            )))))
                            .map_err(|e| e.into())
                        }
                        // Service errors are mapped by the library's default error response.
                        Err(err) => default_srv_error_response(&err, |s| {
                            <HyperDemoAdaptor as HyperServerAdaptor<M>>::response_builder(
                                adaptor, s,
                            )
                        }),
                    }),
                )
            }
            _ => Response::builder()
                .status(StatusCode::NOT_FOUND)
                .header("Server", PRODUCT_VERSION_HEADER)
                .body(BoxBody::new(Full::new(Bytes::from("Not Found"))))
                .into(),
        }
    }
}
/// Top-level settings of the demo binary, deserialized from the configuration
/// file and the `PROSA`-prefixed environment variables in `main`.
///
/// NOTE(review): `main` reads an `observability` member that is not declared
/// here, so the `#[settings]` attribute presumably injects the common ProSA
/// settings fields — confirm against the macro documentation.
#[settings]
#[derive(Debug, Default, Deserialize, Serialize, Clone)]
pub(crate) struct MainHyperSettings {
/// Settings handed to the Hyper server processor.
pub(crate) hyper_server: HyperServerSettings,
}
/// Entry point of the Hyper ProSA demo.
///
/// Steps:
/// 1. parse the command line,
/// 2. load the settings from the configuration file, overridden by
///    `PROSA`-prefixed environment variables,
/// 3. initialise tracing,
/// 4. start the main task, the Hyper server processor and, when `--stub` is
///    given, a stub processor serving `SRV_TEST`,
/// 5. wait for the main task to finish.
///
/// # Errors
///
/// Returns an error when the configuration cannot be loaded or deserialized,
/// or when tracing initialisation fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let matches = Command::new("hyper")
        .version(env!("CARGO_PKG_VERSION"))
        .author(env!("CARGO_PKG_AUTHORS"))
        .about(env!("CARGO_PKG_DESCRIPTION"))
        .arg(arg!(-v - -verbose).action(ArgAction::SetTrue))
        .arg(arg!(-s --stub "Start a Stub processor").action(ArgAction::SetTrue))
        .arg(
            arg!(-c --config <CONFIG_PATH> "Path of the Hyper ProSA server configuration file")
                .default_value("examples/config.yml"),
        )
        .get_matches();

    let config_path = matches
        .get_one::<String>("config")
        .expect("`config` always has a value because it declares a default");

    // A missing or malformed configuration file is a user error: report it
    // through the `Result` instead of panicking.
    let config = Config::builder()
        .add_source(config::File::with_name(config_path.as_str()))
        .add_source(
            config::Environment::with_prefix("PROSA")
                .try_parsing(true)
                .separator("_")
                .list_separator(" "),
        )
        .build()?;

    let prosa_hyper_settings = config.try_deserialize::<MainHyperSettings>()?;
    println!("ProSA HYPER settings: {prosa_hyper_settings:?}");

    let filter = TelemetryFilter::default();
    prosa_hyper_settings.observability.tracing_init(&filter)?;

    let (bus, main) = MainProc::<SimpleStringTvf>::create(&prosa_hyper_settings, Some(2));

    debug!("Launch the main task");
    let main_task = main.run();

    debug!("Start the Hyper processor");
    let http_proc = HyperServerProc::<SimpleStringTvf>::create(
        1,
        String::from("hyper"),
        bus.clone(),
        prosa_hyper_settings.hyper_server,
    );
    Proc::<HyperDemoAdaptor>::run(http_proc);

    // `stub` is a `SetTrue` flag, so it is always present and defaults to `false`.
    if matches.get_flag("stub") {
        debug!("Start a Stub processor");
        let stub_settings = StubSettings::new(vec![String::from("SRV_TEST")]);
        let stub_proc = StubProc::<SimpleStringTvf>::create(
            2,
            String::from("STUB_PROC"),
            bus.clone(),
            stub_settings,
        );
        Proc::<StubParotAdaptor>::run(stub_proc);
    }

    main_task.await;
    Ok(())
}