vertigo_cli/serve/
serve_run.rs1use actix_proxy::IntoHttpResponse;
2use actix_web::{
3 App, HttpRequest, HttpResponse, HttpServer,
4 dev::{ServiceFactory, ServiceRequest},
5 rt::System,
6 web,
7};
8use std::{num::NonZeroUsize, time::Duration};
9
10use crate::commons::{
11 ErrorCode,
12 spawn::{ServerOwner, term_signal},
13};
14use crate::serve::mount_path::MountConfig;
15
16use super::{
17 ServeOpts, ServeOptsInner, server_state::ServerState, vertigo_install::vertigo_install,
18};
19
20async fn wait_for_port(addr: &str, port: u16) {
21 for i in 0..20 {
22 match std::net::TcpListener::bind(addr) {
23 Ok(listener) => {
24 drop(listener);
26 break;
27 }
28 Err(_) => {
29 log::warn!(
30 "Port {} is still in use, waiting 1s... ({}/20)",
31 port,
32 i + 1
33 );
34 tokio::time::sleep(Duration::from_secs(1)).await;
35 }
36 }
37 }
38}
39
40pub async fn run(opts: ServeOpts, port_watch: Option<u16>) -> Result<(), ErrorCode> {
41 log::info!("serve params => {opts:#?}");
42
43 let ServeOptsInner {
44 host,
45 port,
46 proxy,
47 mount_point,
48 env,
49 wasm_preload,
50 disable_hydration,
51 threads,
52 } = opts.inner;
53
54 let mount_config = MountConfig::new(
55 mount_point,
56 opts.common.dest_dir,
57 env,
58 wasm_preload,
59 disable_hydration,
60 )?;
61
62 ServerState::init_with_watch(&mount_config, port_watch)?;
63
64 let app = move || {
65 let mut app = App::new();
66
67 for (path, target) in &proxy {
68 app = install_proxy(app, path.clone(), target.clone());
69 }
70
71 app.configure(|cfg| {
72 vertigo_install(cfg, &mount_config);
73 })
74 };
75
76 let addr = format!("{host}:{port}");
77
78 wait_for_port(&addr, port).await;
79
80 let server =
81 HttpServer::new(app)
82 .workers(threads.unwrap_or_else(|| {
83 std::thread::available_parallelism().map_or(2, NonZeroUsize::get)
84 }))
85 .bind(addr.clone())
86 .map_err(|err| {
87 log::error!("Can't bind/serve on {addr}: {err}");
88 ErrorCode::ServeCantOpenPort
89 })?;
90
91 let server = server
92 .disable_signals()
93 .client_disconnect_timeout(Duration::from_secs(2))
94 .shutdown_timeout(5)
95 .run();
96
97 let handle = server.handle();
98 let handle2 = server.handle();
99
100 std::thread::spawn(move || System::new().block_on(server));
101
102 tokio::select! {
103 _ = ServerOwner { handle } => {},
104 msg = term_signal() => {
105 log::info!("{msg} received, shutting down");
106 handle2.stop(false).await;
107 }
108 }
109
110 Ok(())
111}
112
113fn install_proxy<T>(app: App<T>, path: String, target: String) -> App<T>
114where
115 T: ServiceFactory<ServiceRequest, Config = (), Error = actix_web::Error, InitError = ()>,
116{
117 app.service(web::scope(&path).default_service(web::to({
118 move |req: HttpRequest, body: web::Bytes| {
119 let path = path.clone();
120 let target = target.clone();
121 async move {
122 let method = req.method();
123 let uri = req.uri();
124 let current_path = uri.path();
125 let tail = current_path.strip_prefix(&path).unwrap_or(current_path);
126 let query = uri.query().map(|q| format!("?{}", q)).unwrap_or_default();
127
128 let target_url = format!("{target}{tail}{query}");
129
130 log::info!("proxy {method} {path}{tail} -> {target_url}");
131
132 let request = awc::Client::new().request_from(&target_url, req.head());
133
134 let response = if !body.is_empty() {
135 request.send_body(body)
136 } else {
137 request.send()
138 };
139
140 match response.await {
141 Ok(response) => response.into_http_response(),
142 Err(error) => {
143 let message = format!("Error fetching from url={target_url} error={error}");
144 HttpResponse::InternalServerError().body(message)
145 }
146 }
147 }
148 }
149 })))
150}