use std::path::PathBuf;
use std::time::Duration;
use anyhow::{Context as _, Result, anyhow};
use crate::{
Tracer,
ipc_transport::TypedTransport,
process_model::{
BootstrapMessage, HealthCheckConfig, ProcessClass, ProcessInfo, ProcessSpawnOptions,
RestartPolicy, SupervisorEvent, WorkerError, WorkerProgress, WorkerRequest, WorkerResponse,
},
supervisor::ProcessSupervisor,
worker_api::WorkerHandle,
};
pub struct WorkerHost {
supervisor: ProcessSupervisor,
#[allow(dead_code)]
socket_dir: PathBuf,
}
impl WorkerHost {
pub fn new(socket_dir: impl Into<PathBuf>) -> Self {
Self {
supervisor: ProcessSupervisor::new(),
socket_dir: socket_dir.into(),
}
}
pub fn with_temp_dir() -> Self {
Self::new(std::env::temp_dir())
}
pub fn with_tracer(mut self, tracer: Tracer) -> Self {
self.supervisor = self.supervisor.with_tracer(tracer);
self
}
pub fn on_event<F>(&mut self, callback: F)
where
F: Fn(SupervisorEvent) + Send + Sync + 'static,
{
self.supervisor.on_event(callback);
}
pub fn spawn_worker(
&mut self,
class: ProcessClass,
mut info: ProcessInfo,
) -> Result<WorkerHandle> {
let socket_name = format!("gpui-worker-{}", uuid::Uuid::new_v4());
#[cfg(not(target_os = "windows"))]
let socket_path = self.socket_dir.join(format!("{}.sock", socket_name));
#[cfg(target_os = "windows")]
let pipe_name = socket_name;
info.class = class;
#[cfg(not(target_os = "windows"))]
{
info = info.env(
"GPUI_WORKER_SOCKET",
socket_path.to_string_lossy().to_string(),
);
}
#[cfg(target_os = "windows")]
{
info = info.env("GPUI_WORKER_PIPE", format!("\\\\.\\pipe\\{}", pipe_name));
}
#[cfg(not(target_os = "windows"))]
let listener = {
let _ = std::fs::remove_file(&socket_path);
std::os::unix::net::UnixListener::bind(&socket_path)
.with_context(|| format!("failed to bind unix socket: {}", socket_path.display()))?
};
let id = self.supervisor.spawn_with_options(
info,
ProcessSpawnOptions::new(
RestartPolicy::OnFailure {
max_restarts: 3,
backoff: Duration::from_secs(1),
},
HealthCheckConfig::default(),
),
)?;
#[cfg(not(target_os = "windows"))]
let transport = {
let (stream, _) = listener
.accept()
.context("failed to accept worker connection")?;
stream
.set_nonblocking(false)
.context("failed to set blocking mode")?;
let unix_transport =
crate::ipc_transport::UnixDomainSocketTransport::from_stream(stream)?;
Box::new(unix_transport) as Box<dyn crate::ipc_transport::Transport>
};
#[cfg(target_os = "windows")]
let transport = {
let pipe_transport = crate::ipc_transport::NamedPipeTransport::server(&pipe_name)?;
Box::new(pipe_transport) as Box<dyn crate::ipc_transport::Transport>
};
let mut bootstrap =
TypedTransport::<BootstrapMessage, BootstrapMessage, (), String>::new(transport);
bootstrap.send_request(
1,
BootstrapMessage::Handshake {
version: 1,
capabilities: vec!["file-io".to_string(), "network".to_string()],
},
)?;
let ack = bootstrap
.recv_message()
.context("failed to receive bootstrap ack")?;
match ack {
crate::process_model::IpcMessage::Response {
id: 1,
result:
Ok(BootstrapMessage::HandshakeAck {
heartbeat_interval_secs,
granted_capabilities,
}),
} => {
let _ = heartbeat_interval_secs;
let _ = granted_capabilities;
}
_ => return Err(anyhow!("unexpected bootstrap response: {:?}", ack)),
}
let worker_transport =
TypedTransport::<WorkerRequest, WorkerResponse, WorkerProgress, WorkerError>::new(
bootstrap.into_inner(),
);
Ok(WorkerHandle::new(id, worker_transport))
}
pub fn supervisor(&self) -> &ProcessSupervisor {
&self.supervisor
}
pub fn supervisor_mut(&mut self) -> &mut ProcessSupervisor {
&mut self.supervisor
}
}