wrangler/commands/dev/edge/server/
http.rs

1use super::preview_request;
2use crate::commands::dev::utils::{get_path_as_str, rewrite_redirect};
3use crate::commands::dev::{self, Protocol, ServerConfig};
4use crate::terminal::emoji;
5
6use std::sync::{Arc, Mutex};
7
8use anyhow::Result;
9use chrono::prelude::*;
10use hyper::service::{make_service_fn, service_fn};
11use hyper::upgrade::OnUpgrade;
12use hyper::Server;
13use tokio::sync::oneshot::{Receiver, Sender};
14
15pub async fn http(
16    server_config: ServerConfig,
17    preview_token: Arc<Mutex<String>>,
18    host: String,
19    upstream_protocol: Protocol,
20    shutdown_channel: (Receiver<()>, Sender<()>),
21) -> Result<()> {
22    // set up https client to connect to the preview service
23    let client = dev::client();
24
25    let listening_address = server_config.listening_address;
26
27    // create a closure that hyper will use later to handle HTTP requests
28    let make_service = make_service_fn(move |_| {
29        let client = client.to_owned();
30        let preview_token = preview_token.to_owned();
31        let host = host.to_owned();
32        let server_config = server_config.to_owned();
33
34        async move {
35            Ok::<_, anyhow::Error>(service_fn(move |req| {
36                let is_websocket = req
37                    .headers()
38                    .get("upgrade")
39                    .map_or(false, |h| h.as_bytes() == b"websocket");
40
41                let client = client.to_owned();
42                let preview_token = preview_token.lock().unwrap().to_owned();
43                let host = host.to_owned();
44                let version = req.version();
45                let (parts, body) = req.into_parts();
46                let local_host = format!(
47                    "{}:{}",
48                    server_config.listening_address.ip().to_string(),
49                    server_config.listening_address.port().to_string()
50                );
51                let req_method = parts.method.to_string();
52                let now: DateTime<Local> = Local::now();
53                let path = get_path_as_str(&parts.uri);
54                async move {
55                    let mut req = preview_request(
56                        parts,
57                        body,
58                        preview_token.to_owned(),
59                        host.clone(),
60                        upstream_protocol,
61                    );
62                    let client_on_upgrade = req.extensions_mut().remove::<OnUpgrade>();
63
64                    let mut resp = client.request(req).await?;
65                    super::maybe_proxy_websocket(is_websocket, client_on_upgrade, &mut resp);
66                    rewrite_redirect(&mut resp, &host, &local_host, false);
67
68                    println!(
69                        "[{}] {} {}{} {:?} {}",
70                        now.format("%Y-%m-%d %H:%M:%S"),
71                        req_method,
72                        host,
73                        path,
74                        version,
75                        resp.status()
76                    );
77                    Ok::<_, anyhow::Error>(resp)
78                }
79            }))
80        }
81    });
82
83    let (rx, tx) = shutdown_channel;
84    let server = Server::bind(&listening_address)
85        .serve(make_service)
86        .with_graceful_shutdown(async {
87            rx.await.expect("Could not receive shutdown initiation");
88        });
89    println!("{} Listening on http://{}", emoji::EAR, listening_address);
90
91    if let Err(e) = server.await {
92        eprintln!("{}", e);
93    }
94    if let Err(e) = tx.send(()) {
95        log::error!("Could not acknowledge dev http listener shutdown: {:?}", e);
96    }
97
98    Ok(())
99}