use std::sync::Arc;
use kafka_threadpool::kafka_publisher::KafkaPublisher;
use kafka_threadpool::start_threadpool::start_threadpool;
use crate::pools::get_db_pool::get_db_pool;
use crate::tls::tls_info::TlsInfo;
use crate::core::core_config::CoreConfig;
use crate::core::server::core_services::CoreServices;
pub async fn start_core_server(
config: &CoreConfig,
) -> std::result::Result<String, hyper::Error> {
let db_pool = get_db_pool(config).await;
let kafka_pool: KafkaPublisher =
start_threadpool(Some(&config.label)).await;
let listener = match tokio::net::TcpListener::bind(
&config.api_config.socket_addr.unwrap(),
)
.await
{
Ok(v) => v,
Err(e) => {
let err_msg = format!(
"Server startup failed - unable to \
open server server_endpoint: {} with err='{e}' - stopping",
config.api_config.server_endpoint
);
error!("{err_msg}");
panic!("{err_msg}");
}
};
let local_addr = listener.local_addr().unwrap();
let http = hyper::server::conn::Http::new();
let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(
config.api_config.server_config.clone(),
));
loop {
let (conn, remote_addr) = listener.accept().await.unwrap();
let acceptor = acceptor.clone();
let http = http.clone();
let cloned_config = config.clone();
let cloned_db_pool = db_pool.clone();
let cloned_kafka_pool = kafka_pool.clone();
let fut = async move {
match acceptor.accept(conn).await {
Ok(stream) => {
let (_io, tls_connection) = stream.get_ref();
let supported_services = CoreServices {
config: cloned_config,
db_pool: cloned_db_pool,
kafka_pool: cloned_kafka_pool,
local_addr,
remote_addr,
tls_info: Some(TlsInfo::from_tls_connection(
tls_connection,
)),
};
if let Err(e) =
http.serve_connection(stream, supported_services).await
{
let err_msg = format!("{e}");
if !err_msg.contains("connection error: not connected")
&& !err_msg
.contains("connection error: connection reset")
{
trace!("hyper server hit an internal error: {e}");
}
}
}
Err(e) => error!("hyper server tls error: {e}"),
}
};
tokio::spawn(fut);
}
}