use std::{env, process::Stdio};
use anyhow::{Context as _, Result, anyhow, bail};
use tokio::{
io::{AsyncReadExt as _, BufReader},
process::{Child, ChildStdin, ChildStdout, Command},
task::JoinHandle,
};
use tokio_util::compat::{
Compat,
TokioAsyncReadCompatExt,
TokioAsyncWriteCompatExt,
};
use tracing::{debug, info};
use crate::{
Event,
WORKER_ENV,
remote_proto::{ClientMessage, ServerMessage, read_server, write_client},
worker_config::WorkerConfig,
};
pub(crate) struct WorkerProcess {
pub(crate) label: String,
proc: Child,
stdin: Compat<ChildStdin>,
stdout: Compat<BufReader<ChildStdout>>,
stderr_task: JoinHandle<Result<String>>,
}
pub(crate) struct WorkResponse {
pub(crate) event: Event,
pub(crate) status: WorkerStatus,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum WorkerStatus {
Ready,
Restart,
}
impl WorkerProcess {
pub(crate) async fn spawn_local(
config: &WorkerConfig,
label: impl Into<String>,
) -> Result<Self> {
let label = label.into();
let exe = env::current_exe().context("resolving current exe")?;
debug!(worker = %label, "spawning local worker process");
let mut command = Command::new(&exe);
command
.env(WORKER_ENV, "1")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let mut child = command
.spawn()
.with_context(|| format!("spawning worker process for {label}"))?;
let mut stdin = child.stdin.take().context("worker stdin")?.compat_write();
write_client(&mut stdin, &ClientMessage::Setup(config.clone())).await?;
let stdout =
BufReader::new(child.stdout.take().context("worker stdout")?).compat();
let stderr = child.stderr.take().context("worker stderr")?;
let stderr_task = tokio::spawn(async move {
let mut stderr = BufReader::new(stderr);
let mut buf = String::new();
stderr.read_to_string(&mut buf).await?;
Ok(buf)
});
let mut worker = Self {
label,
proc: child,
stdin,
stdout,
stderr_task,
};
worker.read_ready().await?;
info!(worker = %worker.label, "worker ready");
Ok(worker)
}
pub(crate) async fn work(&mut self, path: &[String]) -> Result<WorkResponse> {
write_client(&mut self.stdin, &ClientMessage::Work(path.to_vec())).await?;
let attr = path.join(".");
let event = self.read_event(path).await?;
let status = self.read_status(&attr).await?;
Ok(WorkResponse { event, status })
}
pub(crate) async fn stop(&mut self) {
let _ = write_client(&mut self.stdin, &ClientMessage::Shutdown).await;
let _ = self.proc.wait().await;
let _ = (&mut self.stderr_task).await;
}
pub(crate) async fn wait_for_restart(&mut self) {
let _ = self.proc.wait().await;
let _ = (&mut self.stderr_task).await;
}
async fn read_ready(&mut self) -> Result<()> {
match self.read_message("handshake", "<startup>").await? {
ServerMessage::Ready => Ok(()),
ServerMessage::Error(error) => {
bail!("worker {} failed: {error}", self.label)
},
other => bail!("unexpected worker handshake: {other:?}"),
}
}
async fn read_event(&mut self, path: &[String]) -> Result<Event> {
let attr = path.join(".");
match self.read_message("event", &attr).await? {
ServerMessage::Event(event) => Ok(*event),
ServerMessage::Error(error) => {
bail!("worker {} failed: {error}", self.label)
},
other => bail!("unexpected worker event for {path:?}: {other:?}"),
}
}
async fn read_status(&mut self, attr: &str) -> Result<WorkerStatus> {
match self.read_message("status", attr).await? {
ServerMessage::Status(status) => Ok(status),
ServerMessage::Error(error) => {
bail!("worker {} failed: {error}", self.label)
},
other => bail!("unexpected worker status for {attr}: {other:?}"),
}
}
async fn read_message(
&mut self,
phase: &str,
attr: &str,
) -> Result<ServerMessage> {
match read_server(&mut self.stdout).await {
Ok(message) => Ok(message),
Err(err) => Err(self.exit_error(phase, attr, err).await),
}
}
async fn exit_error(
&mut self,
phase: &str,
attr: &str,
source: anyhow::Error,
) -> anyhow::Error {
let status = self.proc.wait().await.ok();
let stderr = (&mut self.stderr_task)
.await
.ok()
.and_then(Result::ok)
.unwrap_or_default();
let stderr = stderr.trim();
let mut message = format!(
"evix worker {} failed while reading {phase} for {attr}: {source}",
self.label,
);
if let Some(status) = status {
message.push_str(&format!(" (status: {status})"));
}
if !stderr.is_empty() {
message.push_str("\nworker stderr:\n");
message.push_str(stderr);
}
anyhow!(message)
}
}