edgee_server/
lib.rs

1use std::sync::Arc;
2use std::{convert::Infallible, fs, io, net::SocketAddr};
3
4use bytes::Bytes;
5use edgee_components_runtime::context::ComponentsContext;
6use http_body_util::combinators::BoxBody;
7use hyper_util::rt::{TokioExecutor, TokioIo};
8use hyper_util::server::conn::auto::Builder;
9use hyper_util::service::TowerToHyperService;
10use rustls::ServerConfig;
11use rustls_pki_types::{CertificateDer, PrivateKeyDer};
12use tokio::net::TcpListener;
13use tokio::sync::OnceCell;
14use tokio_rustls::TlsAcceptor;
15use tower::util::BoxCloneService;
16use tower_http::compression::CompressionBody;
17use tracing::{error, info};
18
19pub mod config;
20pub mod monitor;
21mod proxy;
22mod tools;
23
24type Body = CompressionBody<BoxBody<Bytes, Infallible>>;
25static COMPONENTS_CONTEXT: OnceCell<ComponentsContext> = OnceCell::const_new();
26
27pub fn init() -> anyhow::Result<()> {
28    let components_configuration = &config::get().components;
29    let ctx = ComponentsContext::new(components_configuration)?;
30
31    COMPONENTS_CONTEXT
32        .set(ctx)
33        .map_err(|err| anyhow::anyhow!("Failed to register ComponentsContext: {err}"))
34}
35
36pub fn get_components_ctx() -> &'static ComponentsContext {
37    COMPONENTS_CONTEXT
38        .get()
39        .expect("ComponentsContext should be registered")
40}
41
42pub async fn start() -> anyhow::Result<()> {
43    use futures::future::try_join_all;
44
45    let config = config::get();
46    let mut tasks = Vec::new();
47
48    if config.http.is_some() {
49        tasks.push(tokio::spawn(async {
50            if let Err(err) = http().await {
51                anyhow::bail!("Failed to start HTTP entrypoint: {err}");
52            }
53
54            Ok(())
55        }));
56    }
57
58    if config.https.is_some() {
59        tasks.push(tokio::spawn(async {
60            if let Err(err) = https().await {
61                anyhow::bail!("Failed to start HTTPS entrypoint: {err}");
62            }
63
64            Ok(())
65        }));
66    }
67
68    try_join_all(tasks)
69        .await?
70        .into_iter()
71        .collect::<Result<Vec<_>, _>>()
72        .map(|_| ())
73}
74
75async fn http() -> anyhow::Result<()> {
76    let cfg = config::get()
77        .http
78        .as_ref()
79        .ok_or_else(|| anyhow::anyhow!("HTTP configuration is missing"))?;
80
81    info!(
82        address = cfg.address,
83        force_https = cfg.force_https,
84        "Starting HTTP entrypoint"
85    );
86
87    let addr: SocketAddr = cfg.address.parse()?;
88    let listener = TcpListener::bind(addr).await?;
89    loop {
90        let (stream, remote_addr) = listener.accept().await?;
91        let io = TokioIo::new(stream);
92
93        let service = TowerToHyperService::new(make_service(remote_addr, "http"));
94
95        tokio::spawn(async move {
96            if let Err(err) = Builder::new(TokioExecutor::new())
97                .serve_connection_with_upgrades(io, service)
98                .await
99            {
100                error!(?err, ?remote_addr, "failed to serve connections");
101            }
102        });
103    }
104}
105
106async fn https() -> anyhow::Result<()> {
107    let cfg = config::get()
108        .https
109        .as_ref()
110        .ok_or_else(|| anyhow::anyhow!("HTTPS configuration is missing"))?;
111
112    info!(address = cfg.address, "Starting HTTPS entrypoint");
113
114    let addr: SocketAddr = cfg.address.parse()?;
115    let listener = TcpListener::bind(addr).await?;
116    fn load_certs(filename: &str) -> io::Result<Vec<CertificateDer<'static>>> {
117        let certfile = fs::File::open(filename).unwrap();
118        let mut reader = io::BufReader::new(certfile);
119        rustls_pemfile::certs(&mut reader).collect()
120    }
121
122    fn load_key(filename: &str) -> io::Result<PrivateKeyDer<'static>> {
123        let keyfile = fs::File::open(filename).unwrap();
124        let mut reader = io::BufReader::new(keyfile);
125        rustls_pemfile::private_key(&mut reader).map(|key| key.unwrap())
126    }
127
128    let _ = rustls::crypto::ring::default_provider().install_default();
129    let certs = load_certs(&cfg.cert).unwrap();
130    let key = load_key(&cfg.key).unwrap();
131    let mut server_config = ServerConfig::builder()
132        .with_no_client_auth()
133        .with_single_cert(certs, key)
134        .unwrap();
135    server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
136    let tls_acceptor = TlsAcceptor::from(Arc::new(server_config));
137
138    loop {
139        let (stream, remote_addr) = listener.accept().await?;
140        let tls_acceptor = tls_acceptor.clone();
141        tokio::spawn(async move {
142            let tls_stream = match tls_acceptor.accept(stream).await {
143                Ok(tls_stream) => tls_stream,
144                Err(err) => {
145                    error!(?err, "failed to perform tls handshake");
146                    return;
147                }
148            };
149            let io = TokioIo::new(tls_stream);
150
151            let service = TowerToHyperService::new(make_service(remote_addr, "https"));
152
153            if let Err(err) = Builder::new(TokioExecutor::new())
154                .serve_connection_with_upgrades(io, service)
155                .await
156            {
157                error!(?err, "failed to serve connections");
158            }
159        });
160    }
161}
162
163/// Create the service pipeline, using the proxy handler as the "final" request handler
164///
165/// Arguments:
166/// - `remote_addr`: Remote client address
167/// - `proto`: Protocol used (HTTP or HTTPS)
168///
169/// Returns:
170///
171/// A full service pipeline for handling a client request
172fn make_service(
173    remote_addr: SocketAddr,
174    proto: &str,
175) -> BoxCloneService<proxy::Request, http::Response<Body>, anyhow::Error> {
176    use tower::{ServiceBuilder, ServiceExt};
177    use tower_http::compression::CompressionLayer;
178
179    let proto = proto.to_string();
180    ServiceBuilder::new()
181        .layer(CompressionLayer::new())
182        .service_fn(move |req| {
183            let proto = proto.clone();
184            async move { proxy::handle_request(req, remote_addr, &proto).await }
185        })
186        .boxed_clone()
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::init_test_config;

    /// After loading the test configuration, `init` must succeed and the
    /// registered context must expose an engine whose `is_async()` is true.
    #[test]
    fn test_init() {
        init_test_config();
        init().unwrap();
        let ctx = get_components_ctx();
        assert!(
            ctx.engine.is_async(),
            "components engine should report is_async() == true"
        );
    }
}