1use std::path::Path;
11use std::process::Stdio;
12use std::time::Duration;
13
14use tokio::io::AsyncWriteExt;
15use tokio::process::{Child, Command};
16use tokio::time::{sleep, Instant};
17
18use crate::discovery::Fabric;
19use crate::proto::AgentRecord;
20use crate::provision::ProvisionSpec;
21use crate::transport::{TransportError, TransportResult};
22
23pub struct SpawnedChild {
25 pub record: AgentRecord,
26 process: Child,
27}
28
29impl SpawnedChild {
30 pub async fn kill(mut self) {
32 let _ = self.process.kill().await;
33 }
34
35 pub fn pid(&self) -> Option<u32> {
36 self.process.id()
37 }
38}
39
40pub async fn spawn_worker(
46 worker_bin: &Path,
47 worker_args: &[String],
48 spec: &ProvisionSpec,
49 wait: Duration,
50) -> TransportResult<SpawnedChild> {
51 let fabric_dir = Path::new(&spec.fabric_dir);
52 tokio::fs::create_dir_all(fabric_dir).await.ok();
53
54 let spec_json = spec
55 .to_json()
56 .map_err(|e| TransportError::Protocol(format!("provision spec encode: {e}")))?;
57
58 let mut cmd = Command::new(worker_bin);
59 cmd.args(worker_args);
60 cmd.stdin(Stdio::piped());
61 cmd.kill_on_drop(true);
62 let mut process = cmd.spawn().map_err(TransportError::Io)?;
63
64 {
66 let mut stdin = process
67 .stdin
68 .take()
69 .ok_or_else(|| TransportError::Protocol("worker stdin unavailable".to_string()))?;
70 stdin
71 .write_all(spec_json.as_bytes())
72 .await
73 .map_err(TransportError::Io)?;
74 stdin.shutdown().await.map_err(TransportError::Io)?;
75 }
76
77 let child_id = spec.identity.child_id.clone();
78 let fab = Fabric::at(fabric_dir);
79 let deadline = Instant::now() + wait;
80 loop {
81 if let Ok(Some(record)) = fab.resolve(&child_id).await {
82 return Ok(SpawnedChild { record, process });
83 }
84 if let Ok(Some(status)) = process.try_wait() {
86 return Err(TransportError::Protocol(format!(
87 "worker exited before registering: {status}"
88 )));
89 }
90 if Instant::now() >= deadline {
91 let _ = process.kill().await;
92 return Err(TransportError::Protocol(format!(
93 "worker '{child_id}' did not register within {wait:?}"
94 )));
95 }
96 sleep(Duration::from_millis(20)).await;
97 }
98}