vertigo_cli/serve/
serve_run.rs

1use actix_proxy::IntoHttpResponse;
2use actix_web::{
3    App, HttpRequest, HttpResponse, HttpServer,
4    dev::{ServiceFactory, ServiceRequest},
5    rt::System,
6    web,
7};
8use std::{num::NonZeroUsize, time::Duration};
9
10use crate::commons::{
11    ErrorCode,
12    spawn::{ServerOwner, term_signal},
13};
14use crate::serve::mount_path::MountConfig;
15
16use super::{
17    ServeOpts, ServeOptsInner, server_state::ServerState, vertigo_install::vertigo_install,
18};
19
20pub async fn run(opts: ServeOpts, port_watch: Option<u16>) -> Result<(), ErrorCode> {
21    log::info!("serve params => {opts:#?}");
22
23    let ServeOptsInner {
24        host,
25        port,
26        proxy,
27        mount_point,
28        env,
29        wasm_preload,
30        disable_hydration,
31        threads,
32    } = opts.inner;
33
34    let mount_config = MountConfig::new(
35        mount_point,
36        opts.common.dest_dir,
37        env,
38        wasm_preload,
39        disable_hydration,
40    )?;
41
42    ServerState::init_with_watch(&mount_config, port_watch)?;
43
44    let app = move || {
45        let mut app = App::new();
46
47        for (path, target) in &proxy {
48            app = install_proxy(app, path.clone(), target.clone());
49        }
50
51        app.configure(|cfg| {
52            vertigo_install(cfg, &mount_config);
53        })
54    };
55
56    let addr = format!("{host}:{port}");
57
58    let server =
59        HttpServer::new(app)
60            .workers(threads.unwrap_or_else(|| {
61                std::thread::available_parallelism().map_or(2, NonZeroUsize::get)
62            }))
63            .bind(addr.clone())
64            .map_err(|err| {
65                log::error!("Can't bind/serve on {addr}: {err}");
66                ErrorCode::ServeCantOpenPort
67            })?;
68
69    let server = server
70        .disable_signals()
71        .client_disconnect_timeout(Duration::from_secs(2))
72        .shutdown_timeout(5)
73        .run();
74
75    let handle = server.handle();
76    let handle2 = server.handle();
77
78    std::thread::spawn(move || System::new().block_on(server));
79
80    tokio::select! {
81        _ = ServerOwner { handle } => {},
82        msg = term_signal() => {
83            log::info!("{msg} received, shutting down");
84            handle2.stop(true).await;
85        }
86    }
87
88    Ok(())
89}
90
91fn install_proxy<T>(app: App<T>, path: String, target: String) -> App<T>
92where
93    T: ServiceFactory<ServiceRequest, Config = (), Error = actix_web::Error, InitError = ()>,
94{
95    app.service(web::scope(&path).default_service(web::to({
96        move |req: HttpRequest, body: web::Bytes| {
97            let path = path.clone();
98            let target = target.clone();
99            async move {
100                let method = req.method();
101                let uri = req.uri();
102                let current_path = uri.path();
103                let tail = current_path.strip_prefix(&path).unwrap_or(current_path);
104                let query = uri.query().map(|q| format!("?{}", q)).unwrap_or_default();
105
106                let target_url = format!("{target}{tail}{query}");
107
108                log::info!("proxy {method} {path}{tail} -> {target_url}");
109
110                let request = awc::Client::new().request_from(&target_url, req.head());
111
112                let response = if !body.is_empty() {
113                    request.send_body(body)
114                } else {
115                    request.send()
116                };
117
118                match response.await {
119                    Ok(response) => response.into_http_response(),
120                    Err(error) => {
121                        let message = format!("Error fetching from url={target_url} error={error}");
122                        HttpResponse::InternalServerError().body(message)
123                    }
124                }
125            }
126        }
127    })))
128}