#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
use tikv_jemallocator::Jemalloc;
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
#[macro_use]
extern crate lazy_static;
use cached::proc_macro::once;
mod conf;
mod modify;
mod proxy;
use conf::{
CACHEABLE, CHROME_ADDRESS, CHROME_ARGS, CHROME_INSTANCES, CHROME_PATH, DEFAULT_PORT,
DEFAULT_PORT_SERVER, ENDPOINT, HOST_NAME, IS_HEALTHY, LAST_CACHE, LIGHTPANDA_ARGS, LIGHT_PANDA,
TARGET_REPLACEMENT,
};
use core::sync::atomic::Ordering;
use http_body_util::Full;
use hyper::{
body::{Bytes, Incoming},
server::conn::http1,
service::service_fn,
Method, Request, Response, StatusCode,
};
use hyper_util::rt::TokioIo;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::process::Command;
use tokio::{
net::{TcpListener, TcpStream},
signal,
};
use std::time::Duration;
use tokio::time::{sleep, timeout};
const EMPTY_RESPONSE: Bytes = Bytes::from_static(
br#"{
"Browser": "",
"Protocol-Version": "",
"User-Agent": "",
"V8-Version": "",
"WebKit-Version": "",
"webSocketDebuggerUrl": ""
}"#,
);
async fn connect_with_retries(address: &str) -> Option<TcpStream> {
let mut attempts = 0;
loop {
match timeout(Duration::from_secs(7), TcpStream::connect(address)).await {
Ok(Ok(stream)) => return Some(stream),
Ok(Err(e)) => {
attempts += 1;
tracing::warn!("Failed to connect: {}. Attempt {} of 20", e, attempts);
}
Err(_) => {
attempts += 1;
tracing::error!("Connection attempt timed out. Attempt {} of 20", attempts);
}
}
if attempts >= 20 {
return None;
}
sleep(Duration::from_millis(250)).await;
}
}
#[cfg(target_os = "windows")]
fn shutdown(pid: &u32) {
let _ = Command::new("taskkill")
.args(["/PID", &pid.to_string(), "/F"])
.spawn();
}
#[cfg(not(target_os = "windows"))]
fn shutdown(pid: &u32) {
let _ = Command::new("kill").args(["-9", &pid.to_string()]).spawn();
}
async fn fork(port: Option<u32>) -> String {
let id = if !*LIGHT_PANDA {
let mut command = Command::new(&*CHROME_PATH);
let mut chrome_args = CHROME_ARGS.map(|e| e.to_string());
if !CHROME_ADDRESS.is_empty() {
chrome_args[0] = format!("--remote-debugging-address={}", &CHROME_ADDRESS.to_string());
}
if let Some(port) = port {
chrome_args[1] = format!("--remote-debugging-port={}", &port.to_string());
}
let cmd = command.args(&chrome_args);
let id = if let Ok(child) = cmd.spawn() {
let cid = child.id();
tracing::info!("Chrome PID: {}", cid);
cid
} else {
tracing::error!("chrome command didn't start");
0
};
id
} else {
let panda_args = LIGHTPANDA_ARGS.map(|e| e.to_string());
let mut command = Command::new(&*CHROME_PATH);
let host = panda_args[0].replace("--host=", "");
let port = panda_args[1].replace("--port=", "");
let id = if let Ok(child) = command
.args(["--port", &port])
.args(["--host", &host])
.spawn()
{
let cid = child.id();
tracing::info!("Chrome PID: {}", cid);
cid
} else {
tracing::error!("chrome command didn't start");
0
};
id
};
CHROME_INSTANCES.lock().await.insert(id.into());
id.to_string()
}
async fn version_handler_bytes_base(endpoint_path: Option<&str>) -> Option<Bytes> {
use http_body_util::BodyExt;
let url = endpoint_path
.unwrap_or(&ENDPOINT.as_str())
.parse::<hyper::Uri>()
.expect("valid chrome endpoint");
let req = Request::builder()
.method(Method::GET)
.uri("/json/version")
.header(
hyper::header::HOST,
url.authority()
.map_or_else(|| "localhost".to_string(), |f| f.as_str().to_string()),
)
.header(hyper::header::CONTENT_TYPE, "application/json")
.body(http_body_util::Empty::<Bytes>::new())
.expect("Failed to build the request");
let host = url.host().expect("uri has no host");
let port = url.port_u16().unwrap_or(80);
let address = format!("{}:{}", host, port);
let resp = if let Some(stream) = connect_with_retries(&address).await {
let io = TokioIo::new(stream);
if let Ok((mut client, conn)) = hyper::client::conn::http1::handshake(io).await {
tokio::task::spawn(async move {
if let Err(err) = conn.await {
tracing::error!("Connection failed: {:?}", err);
}
});
match client.send_request(req).await {
Ok(mut resp) => {
IS_HEALTHY.store(true, Ordering::Relaxed);
let mut bytes_mut = vec![];
while let Some(next) = resp.frame().await {
if let Ok(frame) = next {
if let Some(chunk) = frame.data_ref() {
bytes_mut.extend(chunk);
}
}
}
if !HOST_NAME.is_empty() {
let body = modify::modify_json_output(bytes_mut.into());
Some(body)
} else {
Some(bytes_mut.into())
}
}
_ => {
IS_HEALTHY.store(false, Ordering::Relaxed);
None
}
}
} else {
None
}
} else {
None
};
resp
}
#[once(option = true, sync_writes = true, time = 10)]
async fn version_handler_bytes(endpoint_path: Option<&str>) -> Option<Bytes> {
version_handler_bytes_base(endpoint_path).await
}
async fn health_check_handler() -> Result<Response<Full<Bytes>>, Infallible> {
if IS_HEALTHY.load(Ordering::Relaxed) {
Ok(Response::new(Full::new(Bytes::from("healthy"))))
} else {
let mut response = Response::new(Full::new(Bytes::from("unhealthy")));
*response.status_mut() = StatusCode::SERVICE_UNAVAILABLE;
Ok(response)
}
}
async fn fork_handler(port: Option<u32>) -> Result<Response<Full<Bytes>>, Infallible> {
let pid = fork(port).await;
let pid = format!("Forked process with pid: {}", pid);
Ok(Response::new(Full::new(Bytes::from(pid))))
}
pub async fn shutdown_instances() {
let mut mutx = CHROME_INSTANCES.lock().await;
for pid in mutx.iter() {
shutdown(pid);
}
mutx.clear();
}
async fn shutdown_handler() -> Result<Response<Full<Bytes>>, Infallible> {
shutdown_instances().await;
Ok(Response::new(Full::new(Bytes::from(
"Shutdown successful.",
))))
}
async fn request_handler(req: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/health") => health_check_handler().await,
(&Method::GET, "/") => health_check_handler().await,
(&Method::POST, "/fork") => fork_handler(None).await,
(&Method::POST, path) if path.starts_with("/fork/") => {
if let Some(port) = path.split('/').nth(2) {
if let Ok(port) = port.parse::<u32>() {
fork_handler(Some(port)).await
} else {
let message = Response::new(Full::new(Bytes::from("Invalid port argument")));
Ok(message)
}
} else {
let message = Response::new(Full::new(Bytes::from("Invalid path")));
Ok(message)
}
}
(&Method::GET, "/json/version") => {
let mut attempts = 0;
let mut body: Option<Bytes> = None;
while attempts < 10 && body.is_none() {
body = if CACHEABLE.load(Ordering::Relaxed) {
version_handler_bytes(None).await
} else {
version_handler_bytes_base(None).await
};
attempts += 1;
if body.is_none() {
tokio::time::sleep(Duration::from_millis(25)).await;
}
}
let empty = body.is_none();
let mut resp = Response::new(Full::new(body.unwrap_or_else(|| EMPTY_RESPONSE)));
resp.headers_mut().insert(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static("application/json"), );
if empty {
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
}
Ok(resp)
}
(&Method::POST, "/shutdown") => shutdown_handler().await,
_ => {
let mut resp = Response::new(Full::new(Bytes::from("Not Found")));
*resp.status_mut() = StatusCode::NOT_FOUND;
Ok(resp)
}
}
}
async fn run_main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
tracing_subscriber::fmt::init();
let auto_start = std::env::args().nth(3).unwrap_or_else(|| {
let auto = std::env::var("CHROME_INIT").unwrap_or("true".into());
if auto == "true" {
"init".into()
} else {
"ignore".into()
}
});
if auto_start == "init" {
fork(Some(*DEFAULT_PORT)).await;
}
let addr = SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
*DEFAULT_PORT_SERVER,
);
let listener = TcpListener::bind(addr).await.expect("connection");
let make_svc = async move {
let builder_options = std::sync::Arc::new(
http1::Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.header_read_timeout(None)
.half_close(true)
.auto_date_header(false)
.to_owned(),
);
loop {
if let Ok((tcp, _)) = listener.accept().await {
let io = TokioIo::new(tcp);
let builder_options = builder_options.clone();
tokio::task::spawn(async move {
if let Err(err) = builder_options
.serve_connection(io, service_fn(request_handler))
.await
{
eprintln!("Error serving connection: {:?}", err);
}
});
}
}
};
println!(
"Chrome server running on {}:{}",
if CHROME_ADDRESS.is_empty() {
"localhost"
} else {
&CHROME_ADDRESS
},
DEFAULT_PORT_SERVER.to_string()
);
tokio::select! {
_ = make_svc => Ok(()),
_ = crate::proxy::proxy::run_proxy() => Ok(()),
_ = signal::ctrl_c() => Ok(()),
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
run_main().await
}