vertigo_cli/serve/
serve_run.rs1use actix_proxy::IntoHttpResponse;
2use actix_web::{
3 App, HttpRequest, HttpResponse, HttpServer,
4 dev::{ServiceFactory, ServiceRequest},
5 rt::System,
6 web,
7};
8use std::{num::NonZeroUsize, time::Duration};
9
10use crate::commons::{
11 ErrorCode,
12 spawn::{ServerOwner, term_signal},
13};
14use crate::serve::mount_path::MountConfig;
15
16use super::{
17 ServeOpts, ServeOptsInner, server_state::ServerState, vertigo_install::vertigo_install,
18};
19
20pub async fn run(opts: ServeOpts, port_watch: Option<u16>) -> Result<(), ErrorCode> {
21 log::info!("serve params => {opts:#?}");
22
23 let ServeOptsInner {
24 host,
25 port,
26 proxy,
27 mount_point,
28 env,
29 wasm_preload,
30 disable_hydration,
31 threads,
32 } = opts.inner;
33
34 let mount_config = MountConfig::new(
35 mount_point,
36 opts.common.dest_dir,
37 env,
38 wasm_preload,
39 disable_hydration,
40 )?;
41
42 ServerState::init_with_watch(&mount_config, port_watch)?;
43
44 let app = move || {
45 let mut app = App::new();
46
47 for (path, target) in &proxy {
48 app = install_proxy(app, path.clone(), target.clone());
49 }
50
51 app.configure(|cfg| {
52 vertigo_install(cfg, &mount_config);
53 })
54 };
55
56 let addr = format!("{host}:{port}");
57
58 let server =
59 HttpServer::new(app)
60 .workers(threads.unwrap_or_else(|| {
61 std::thread::available_parallelism().map_or(2, NonZeroUsize::get)
62 }))
63 .bind(addr.clone())
64 .map_err(|err| {
65 log::error!("Can't bind/serve on {addr}: {err}");
66 ErrorCode::ServeCantOpenPort
67 })?;
68
69 let server = server
70 .disable_signals()
71 .client_disconnect_timeout(Duration::from_secs(2))
72 .shutdown_timeout(5)
73 .run();
74
75 let handle = server.handle();
76 let handle2 = server.handle();
77
78 std::thread::spawn(move || System::new().block_on(server));
79
80 tokio::select! {
81 _ = ServerOwner { handle } => {},
82 msg = term_signal() => {
83 log::info!("{msg} received, shutting down");
84 handle2.stop(true).await;
85 }
86 }
87
88 Ok(())
89}
90
91fn install_proxy<T>(app: App<T>, path: String, target: String) -> App<T>
92where
93 T: ServiceFactory<ServiceRequest, Config = (), Error = actix_web::Error, InitError = ()>,
94{
95 app.service(web::scope(&path).default_service(web::to({
96 move |req: HttpRequest, body: web::Bytes| {
97 let path = path.clone();
98 let target = target.clone();
99 async move {
100 let method = req.method();
101 let uri = req.uri();
102 let current_path = uri.path();
103 let tail = current_path.strip_prefix(&path).unwrap_or(current_path);
104 let query = uri.query().map(|q| format!("?{}", q)).unwrap_or_default();
105
106 let target_url = format!("{target}{tail}{query}");
107
108 log::info!("proxy {method} {path}{tail} -> {target_url}");
109
110 let request = awc::Client::new().request_from(&target_url, req.head());
111
112 let response = if !body.is_empty() {
113 request.send_body(body)
114 } else {
115 request.send()
116 };
117
118 match response.await {
119 Ok(response) => response.into_http_response(),
120 Err(error) => {
121 let message = format!("Error fetching from url={target_url} error={error}");
122 HttpResponse::InternalServerError().body(message)
123 }
124 }
125 }
126 }
127 })))
128}