Skip to main content

vertigo_cli/serve/
serve_run.rs

1use actix_proxy::IntoHttpResponse;
2use actix_web::{
3    App, HttpRequest, HttpResponse, HttpServer,
4    dev::{ServiceFactory, ServiceRequest},
5    rt::System,
6    web,
7};
8use std::{num::NonZeroUsize, time::Duration};
9
10use crate::commons::{
11    ErrorCode,
12    spawn::{ServerOwner, term_signal},
13};
14use crate::serve::mount_path::MountConfig;
15
16use super::{
17    ServeOpts, ServeOptsInner, server_state::ServerState, vertigo_install::vertigo_install,
18};
19
20async fn wait_for_port(addr: &str, port: u16) {
21    for i in 0..20 {
22        match std::net::TcpListener::bind(addr) {
23            Ok(listener) => {
24                // Drop the listener to free the port for actix
25                drop(listener);
26                break;
27            }
28            Err(_) => {
29                log::warn!(
30                    "Port {} is still in use, waiting 1s... ({}/20)",
31                    port,
32                    i + 1
33                );
34                tokio::time::sleep(Duration::from_secs(1)).await;
35            }
36        }
37    }
38}
39
40pub async fn run(opts: ServeOpts, port_watch: Option<u16>) -> Result<(), ErrorCode> {
41    log::info!("serve params => {opts:#?}");
42
43    let ServeOptsInner {
44        host,
45        port,
46        proxy,
47        mount_point,
48        env,
49        wasm_preload,
50        disable_hydration,
51        threads,
52    } = opts.inner;
53
54    let mount_config = MountConfig::new(
55        mount_point,
56        opts.common.dest_dir,
57        env,
58        wasm_preload,
59        disable_hydration,
60    )?;
61
62    ServerState::init_with_watch(&mount_config, port_watch)?;
63
64    let app = move || {
65        let mut app = App::new();
66
67        for (path, target) in &proxy {
68            app = install_proxy(app, path.clone(), target.clone());
69        }
70
71        app.configure(|cfg| {
72            vertigo_install(cfg, &mount_config);
73        })
74    };
75
76    let addr = format!("{host}:{port}");
77
78    wait_for_port(&addr, port).await;
79
80    let server =
81        HttpServer::new(app)
82            .workers(threads.unwrap_or_else(|| {
83                std::thread::available_parallelism().map_or(2, NonZeroUsize::get)
84            }))
85            .bind(addr.clone())
86            .map_err(|err| {
87                log::error!("Can't bind/serve on {addr}: {err}");
88                ErrorCode::ServeCantOpenPort
89            })?;
90
91    let server = server
92        .disable_signals()
93        .client_disconnect_timeout(Duration::from_secs(2))
94        .shutdown_timeout(5)
95        .run();
96
97    let handle = server.handle();
98    let handle2 = server.handle();
99
100    std::thread::spawn(move || System::new().block_on(server));
101
102    tokio::select! {
103        _ = ServerOwner { handle } => {},
104        msg = term_signal() => {
105            log::info!("{msg} received, shutting down");
106            handle2.stop(false).await;
107        }
108    }
109
110    Ok(())
111}
112
113fn install_proxy<T>(app: App<T>, path: String, target: String) -> App<T>
114where
115    T: ServiceFactory<ServiceRequest, Config = (), Error = actix_web::Error, InitError = ()>,
116{
117    app.service(web::scope(&path).default_service(web::to({
118        move |req: HttpRequest, body: web::Bytes| {
119            let path = path.clone();
120            let target = target.clone();
121            async move {
122                let method = req.method();
123                let uri = req.uri();
124                let current_path = uri.path();
125                let tail = current_path.strip_prefix(&path).unwrap_or(current_path);
126                let query = uri.query().map(|q| format!("?{}", q)).unwrap_or_default();
127
128                let target_url = format!("{target}{tail}{query}");
129
130                log::info!("proxy {method} {path}{tail} -> {target_url}");
131
132                let request = awc::Client::new().request_from(&target_url, req.head());
133
134                let response = if !body.is_empty() {
135                    request.send_body(body)
136                } else {
137                    request.send()
138                };
139
140                match response.await {
141                    Ok(response) => response.into_http_response(),
142                    Err(error) => {
143                        let message = format!("Error fetching from url={target_url} error={error}");
144                        HttpResponse::InternalServerError().body(message)
145                    }
146                }
147            }
148        }
149    })))
150}