wrangler/commands/dev/edge/server/
http.rs1use super::preview_request;
2use crate::commands::dev::utils::{get_path_as_str, rewrite_redirect};
3use crate::commands::dev::{self, Protocol, ServerConfig};
4use crate::terminal::emoji;
5
6use std::sync::{Arc, Mutex};
7
8use anyhow::Result;
9use chrono::prelude::*;
10use hyper::service::{make_service_fn, service_fn};
11use hyper::upgrade::OnUpgrade;
12use hyper::Server;
13use tokio::sync::oneshot::{Receiver, Sender};
14
15pub async fn http(
16 server_config: ServerConfig,
17 preview_token: Arc<Mutex<String>>,
18 host: String,
19 upstream_protocol: Protocol,
20 shutdown_channel: (Receiver<()>, Sender<()>),
21) -> Result<()> {
22 let client = dev::client();
24
25 let listening_address = server_config.listening_address;
26
27 let make_service = make_service_fn(move |_| {
29 let client = client.to_owned();
30 let preview_token = preview_token.to_owned();
31 let host = host.to_owned();
32 let server_config = server_config.to_owned();
33
34 async move {
35 Ok::<_, anyhow::Error>(service_fn(move |req| {
36 let is_websocket = req
37 .headers()
38 .get("upgrade")
39 .map_or(false, |h| h.as_bytes() == b"websocket");
40
41 let client = client.to_owned();
42 let preview_token = preview_token.lock().unwrap().to_owned();
43 let host = host.to_owned();
44 let version = req.version();
45 let (parts, body) = req.into_parts();
46 let local_host = format!(
47 "{}:{}",
48 server_config.listening_address.ip().to_string(),
49 server_config.listening_address.port().to_string()
50 );
51 let req_method = parts.method.to_string();
52 let now: DateTime<Local> = Local::now();
53 let path = get_path_as_str(&parts.uri);
54 async move {
55 let mut req = preview_request(
56 parts,
57 body,
58 preview_token.to_owned(),
59 host.clone(),
60 upstream_protocol,
61 );
62 let client_on_upgrade = req.extensions_mut().remove::<OnUpgrade>();
63
64 let mut resp = client.request(req).await?;
65 super::maybe_proxy_websocket(is_websocket, client_on_upgrade, &mut resp);
66 rewrite_redirect(&mut resp, &host, &local_host, false);
67
68 println!(
69 "[{}] {} {}{} {:?} {}",
70 now.format("%Y-%m-%d %H:%M:%S"),
71 req_method,
72 host,
73 path,
74 version,
75 resp.status()
76 );
77 Ok::<_, anyhow::Error>(resp)
78 }
79 }))
80 }
81 });
82
83 let (rx, tx) = shutdown_channel;
84 let server = Server::bind(&listening_address)
85 .serve(make_service)
86 .with_graceful_shutdown(async {
87 rx.await.expect("Could not receive shutdown initiation");
88 });
89 println!("{} Listening on http://{}", emoji::EAR, listening_address);
90
91 if let Err(e) = server.await {
92 eprintln!("{}", e);
93 }
94 if let Err(e) = tx.send(()) {
95 log::error!("Could not acknowledge dev http listener shutdown: {:?}", e);
96 }
97
98 Ok(())
99}