1use std::sync::Arc;
2use std::{convert::Infallible, fs, io, net::SocketAddr};
3
4use bytes::Bytes;
5use edgee_components_runtime::context::ComponentsContext;
6use http_body_util::combinators::BoxBody;
7use hyper_util::rt::{TokioExecutor, TokioIo};
8use hyper_util::server::conn::auto::Builder;
9use hyper_util::service::TowerToHyperService;
10use rustls::ServerConfig;
11use rustls_pki_types::{CertificateDer, PrivateKeyDer};
12use tokio::net::TcpListener;
13use tokio::sync::OnceCell;
14use tokio_rustls::TlsAcceptor;
15use tower::util::BoxCloneService;
16use tower_http::compression::CompressionBody;
17use tracing::{error, info};
18
19pub mod config;
20pub mod monitor;
21mod proxy;
22mod tools;
23
/// Body type of the responses returned by the services built in this module:
/// a boxed, infallible `Bytes` body wrapped in `tower_http`'s compression body.
type Body = CompressionBody<BoxBody<Bytes, Infallible>>;
/// Process-wide `ComponentsContext`, written once by `init` and read through
/// `get_components_ctx`.
static COMPONENTS_CONTEXT: OnceCell<ComponentsContext> = OnceCell::const_new();
26
27pub fn init() -> anyhow::Result<()> {
28 let components_configuration = &config::get().components;
29 let ctx = ComponentsContext::new(components_configuration)?;
30
31 COMPONENTS_CONTEXT
32 .set(ctx)
33 .map_err(|err| anyhow::anyhow!("Failed to register ComponentsContext: {err}"))
34}
35
36pub fn get_components_ctx() -> &'static ComponentsContext {
37 COMPONENTS_CONTEXT
38 .get()
39 .expect("ComponentsContext should be registered")
40}
41
42pub async fn start() -> anyhow::Result<()> {
43 use futures::future::try_join_all;
44
45 let config = config::get();
46 let mut tasks = Vec::new();
47
48 if config.http.is_some() {
49 tasks.push(tokio::spawn(async {
50 if let Err(err) = http().await {
51 anyhow::bail!("Failed to start HTTP entrypoint: {err}");
52 }
53
54 Ok(())
55 }));
56 }
57
58 if config.https.is_some() {
59 tasks.push(tokio::spawn(async {
60 if let Err(err) = https().await {
61 anyhow::bail!("Failed to start HTTPS entrypoint: {err}");
62 }
63
64 Ok(())
65 }));
66 }
67
68 try_join_all(tasks)
69 .await?
70 .into_iter()
71 .collect::<Result<Vec<_>, _>>()
72 .map(|_| ())
73}
74
75async fn http() -> anyhow::Result<()> {
76 let cfg = config::get()
77 .http
78 .as_ref()
79 .ok_or_else(|| anyhow::anyhow!("HTTP configuration is missing"))?;
80
81 info!(
82 address = cfg.address,
83 force_https = cfg.force_https,
84 "Starting HTTP entrypoint"
85 );
86
87 let addr: SocketAddr = cfg.address.parse()?;
88 let listener = TcpListener::bind(addr).await?;
89 loop {
90 let (stream, remote_addr) = listener.accept().await?;
91 let io = TokioIo::new(stream);
92
93 let service = TowerToHyperService::new(make_service(remote_addr, "http"));
94
95 tokio::spawn(async move {
96 if let Err(err) = Builder::new(TokioExecutor::new())
97 .serve_connection_with_upgrades(io, service)
98 .await
99 {
100 error!(?err, ?remote_addr, "failed to serve connections");
101 }
102 });
103 }
104}
105
106async fn https() -> anyhow::Result<()> {
107 let cfg = config::get()
108 .https
109 .as_ref()
110 .ok_or_else(|| anyhow::anyhow!("HTTPS configuration is missing"))?;
111
112 info!(address = cfg.address, "Starting HTTPS entrypoint");
113
114 let addr: SocketAddr = cfg.address.parse()?;
115 let listener = TcpListener::bind(addr).await?;
116 fn load_certs(filename: &str) -> io::Result<Vec<CertificateDer<'static>>> {
117 let certfile = fs::File::open(filename).unwrap();
118 let mut reader = io::BufReader::new(certfile);
119 rustls_pemfile::certs(&mut reader).collect()
120 }
121
122 fn load_key(filename: &str) -> io::Result<PrivateKeyDer<'static>> {
123 let keyfile = fs::File::open(filename).unwrap();
124 let mut reader = io::BufReader::new(keyfile);
125 rustls_pemfile::private_key(&mut reader).map(|key| key.unwrap())
126 }
127
128 let _ = rustls::crypto::ring::default_provider().install_default();
129 let certs = load_certs(&cfg.cert).unwrap();
130 let key = load_key(&cfg.key).unwrap();
131 let mut server_config = ServerConfig::builder()
132 .with_no_client_auth()
133 .with_single_cert(certs, key)
134 .unwrap();
135 server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
136 let tls_acceptor = TlsAcceptor::from(Arc::new(server_config));
137
138 loop {
139 let (stream, remote_addr) = listener.accept().await?;
140 let tls_acceptor = tls_acceptor.clone();
141 tokio::spawn(async move {
142 let tls_stream = match tls_acceptor.accept(stream).await {
143 Ok(tls_stream) => tls_stream,
144 Err(err) => {
145 error!(?err, "failed to perform tls handshake");
146 return;
147 }
148 };
149 let io = TokioIo::new(tls_stream);
150
151 let service = TowerToHyperService::new(make_service(remote_addr, "https"));
152
153 if let Err(err) = Builder::new(TokioExecutor::new())
154 .serve_connection_with_upgrades(io, service)
155 .await
156 {
157 error!(?err, "failed to serve connections");
158 }
159 });
160 }
161}
162
163fn make_service(
173 remote_addr: SocketAddr,
174 proto: &str,
175) -> BoxCloneService<proxy::Request, http::Response<Body>, anyhow::Error> {
176 use tower::{ServiceBuilder, ServiceExt};
177 use tower_http::compression::CompressionLayer;
178
179 let proto = proto.to_string();
180 ServiceBuilder::new()
181 .layer(CompressionLayer::new())
182 .service_fn(move |req| {
183 let proto = proto.clone();
184 async move { proxy::handle_request(req, remote_addr, &proto).await }
185 })
186 .boxed_clone()
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::init_test_config;
    use pretty_assertions::assert_eq;

    /// After loading the test configuration, `init` must succeed and the
    /// registered context must expose an async-enabled engine.
    #[test]
    fn test_init() {
        init_test_config();
        init().unwrap();

        let engine_is_async = get_components_ctx().engine.is_async();
        assert_eq!(engine_is_async, true);
    }
}