use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, OnceLock};
use anyhow::{Result, bail};
use async_process::{Command, Stdio};
use async_trait::async_trait;
use hydro_deploy_integration::ServerBindConfig;
use crate::progress::ProgressTracker;
use crate::rust_crate::build::BuildOutput;
use crate::rust_crate::tracing_options::TracingOptions;
use crate::{
BaseServerStrategy, ClientStrategy, Host, HostStrategyGetter, HostTargetType, LaunchedBinary,
LaunchedHost, PortNetworkHint, ResourceBatch, ResourceResult,
};
// Submodule whose public items are all re-exported from this module via the glob below.
pub mod launched_binary;
pub use launched_binary::*;
// The `samply` submodule is compiled only when BOTH stacked `cfg` attributes hold: the
// `profile-folding` feature is enabled and the target is macOS or Windows.
#[cfg(feature = "profile-folding")]
#[cfg(any(target_os = "macos", target_family = "windows"))]
mod samply;
/// Process-wide cache of the directory printed by `rustc --print target-libdir`.
///
/// Filled lazily the first time a binary is launched on localhost, so `rustc` is
/// invoked at most once per successful lookup instead of once per launch.
static LOCAL_LIBDIR: OnceLock<String> = OnceLock::new();
/// A deployment host that stands for the machine the deployment is driven from.
#[derive(Debug)]
pub struct LocalhostHost {
    /// Identifier of this host.
    pub id: usize,
    /// When `true`, this host refuses to be used as the server side of a connection.
    client_only: bool,
}

impl LocalhostHost {
    /// Creates a localhost host with the given `id` that is allowed to act as a server.
    pub fn new(id: usize) -> LocalhostHost {
        Self {
            client_only: false,
            id,
        }
    }

    /// Returns a new host with the same `id` as `self`, flagged as client-only.
    ///
    /// `self` is left untouched.
    pub fn client_only(&self) -> LocalhostHost {
        let id = self.id;
        Self {
            client_only: true,
            id,
        }
    }
}
/// [`Host`] behavior for the local machine.
///
/// Nothing has to be provisioned for localhost, so the resource-related hooks are no-ops
/// and a launched handle is always available.
impl Host for LocalhostHost {
    /// Localhost always targets the local machine.
    fn target_type(&self) -> HostTargetType {
        HostTargetType::Local
    }

    /// No-op: nothing is reserved ahead of time on localhost.
    fn request_port_base(&self, _bind_type: &BaseServerStrategy) {}

    /// No-op: localhost contributes nothing to the resource batch.
    fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {}

    /// No-op on localhost.
    fn request_custom_binary(&self) {}

    /// Returns the identifier this host was created with.
    fn id(&self) -> usize {
        self.id
    }

    /// Always returns a launched handle.
    fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
        Some(Arc::new(LaunchedLocalhost))
    }

    /// Returns a launched handle; the resource result is ignored.
    fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
        Arc::new(LaunchedLocalhost)
    }

    /// Chooses how `connection_from` should connect to a server on this host.
    ///
    /// A Unix socket is preferred when the hint is `Auto` and the peer can use one.
    /// Otherwise an internal TCP port is used when the hint is `Auto` or `TcpPort` and
    /// the peer can reach it.
    ///
    /// # Errors
    ///
    /// Fails if this host is client-only or if neither strategy is usable.
    fn strategy_as_server<'a>(
        &'a self,
        connection_from: &dyn Host,
        network_hint: PortNetworkHint,
    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
        if self.client_only {
            anyhow::bail!("Localhost cannot be a server if it is client only")
        }

        // First choice: Unix socket, only when the caller expressed no preference.
        let unix_socket_usable = matches!(network_hint, PortNetworkHint::Auto)
            && connection_from.can_connect_to(ClientStrategy::UnixSocket(self.id));
        if unix_socket_usable {
            return Ok((
                ClientStrategy::UnixSocket(self.id),
                Box::new(|_| BaseServerStrategy::UnixSocket),
            ));
        }

        // Second choice: TCP port on this host.
        let internal_tcp_usable = matches!(
            network_hint,
            PortNetworkHint::Auto | PortNetworkHint::TcpPort(_)
        ) && connection_from.can_connect_to(ClientStrategy::InternalTcpPort(self));
        if !internal_tcp_usable {
            anyhow::bail!("Could not find a strategy to connect to localhost")
        }

        Ok((
            ClientStrategy::InternalTcpPort(self),
            Box::new(move |_| {
                BaseServerStrategy::InternalTcpPort(match network_hint {
                    PortNetworkHint::Auto => None,
                    PortNetworkHint::TcpPort(port) => port,
                })
            }),
        ))
    }

    /// Reports whether this host can act as a client for the given strategy.
    ///
    /// Unix sockets and internal TCP ports require the target to be this same host
    /// (matching id), and Unix sockets additionally require a Unix target platform.
    /// Forwarded TCP ports are always accepted.
    fn can_connect_to(&self, typ: ClientStrategy) -> bool {
        match typ {
            // `cfg!(unix)` is a compile-time constant, so this is always `false` off Unix.
            ClientStrategy::UnixSocket(id) => cfg!(unix) && self.id == id,
            ClientStrategy::InternalTcpPort(target_host) => target_host.id() == self.id,
            ClientStrategy::ForwardedTcpPort(_) => true,
        }
    }
}
struct LaunchedLocalhost;
#[async_trait]
impl LaunchedHost for LaunchedLocalhost {
fn base_server_config(&self, bind_type: &BaseServerStrategy) -> ServerBindConfig {
match bind_type {
BaseServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
BaseServerStrategy::InternalTcpPort(port) => {
ServerBindConfig::TcpPort("127.0.0.1".to_owned(), *port)
}
BaseServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"),
}
}
async fn copy_binary(&self, _binary: &BuildOutput) -> Result<()> {
Ok(())
}
async fn launch_binary(
&self,
id: String,
binary: &BuildOutput,
args: &[String],
tracing: Option<TracingOptions>,
env: &HashMap<String, String>,
pin_to_core: Option<usize>,
) -> Result<Box<dyn LaunchedBinary>> {
if pin_to_core.is_some() {
ProgressTracker::println(format!(
"[{id}] pin_to_core is not supported on localhost, ignoring"
));
}
let (maybe_perf_outfile, mut command) = if let Some(tracing) = tracing.as_ref() {
if cfg!(any(target_os = "macos", target_family = "windows")) {
ProgressTracker::println(
format!("[{id} tracing] Profiling binary with `samply`.",),
);
let samply_outfile = tempfile::NamedTempFile::new()?;
let mut command = Command::new("samply");
command
.arg("record")
.arg("--save-only")
.arg("--output")
.arg(samply_outfile.as_ref())
.arg(&binary.bin_path)
.args(args);
(Some(samply_outfile), command)
} else if cfg!(target_family = "unix") {
ProgressTracker::println(format!("[{} tracing] Tracing binary with `perf`.", id));
let perf_outfile = tempfile::NamedTempFile::new()?;
let mut command = Command::new("perf");
command
.args([
"record",
"-F",
&tracing.frequency.to_string(),
"-e",
"cycles:u",
"--call-graph",
"dwarf,65528",
"-o",
])
.arg(perf_outfile.as_ref())
.arg(&binary.bin_path)
.args(args);
(Some(perf_outfile), command)
} else {
bail!(
"Unknown OS for samply/perf tracing: {}",
std::env::consts::OS
);
}
} else {
let mut command = Command::new(&binary.bin_path);
command.args(args);
(None, command)
};
let dylib_path_var = if cfg!(windows) {
"PATH"
} else if cfg!(target_os = "macos") {
"DYLD_FALLBACK_LIBRARY_PATH"
} else if cfg!(target_os = "aix") {
"LIBPATH"
} else {
"LD_LIBRARY_PATH"
};
let local_libdir = LOCAL_LIBDIR.get_or_init(|| {
std::process::Command::new("rustc")
.arg("--print")
.arg("target-libdir")
.output()
.map(|output| str::from_utf8(&output.stdout).unwrap().trim().to_owned())
.unwrap()
});
command.env(
dylib_path_var,
std::env::var_os(dylib_path_var).map_or_else(
|| {
std::env::join_paths(
[
binary.shared_library_path.as_ref(),
Some(&std::path::PathBuf::from(local_libdir)),
]
.into_iter()
.flatten(),
)
.unwrap()
},
|paths| {
let mut paths = std::env::split_paths(&paths).collect::<Vec<_>>();
paths.insert(0, std::path::PathBuf::from(local_libdir));
if let Some(shared_path) = &binary.shared_library_path {
paths.insert(0, shared_path.to_path_buf());
}
std::env::join_paths(paths).unwrap()
},
),
);
command.envs(env);
command
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
#[cfg(not(target_family = "unix"))]
command.kill_on_drop(true);
ProgressTracker::println(format!("[{}] running command: `{:?}`", id, command));
let child = command.spawn().map_err(|e| {
let msg = if maybe_perf_outfile.is_some() && std::io::ErrorKind::NotFound == e.kind() {
"Tracing executable not found, ensure it is installed"
} else {
"Failed to execute command"
};
anyhow::Error::new(e).context(format!("{}: {:?}", msg, command))
})?;
Ok(Box::new(LaunchedLocalhostBinary::new(
child,
id,
tracing,
maybe_perf_outfile.map(|f| TracingDataLocal { outfile: f }),
)))
}
async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
Ok(*addr)
}
}