use std::path::PathBuf;
use std::time::Duration;
use async_trait::async_trait;
use chrono::Utc;
use crate::fleet::{spawn_worker, SpawnedChild};
use crate::proto::AgentRecord;
use crate::provision::{Placement, ProvisionSpec};
use crate::transport::{ChildClient, TransportError, TransportResult};
/// Strategy for bringing the worker described by a [`ProvisionSpec`] online.
///
/// The two implementations in this file are [`LocalSubprocessLauncher`], which
/// delegates to `spawn_worker`, and [`ConnectLauncher`], which attaches to an
/// endpoint that is already listening. The `Send + Sync` supertraits let a
/// launcher be shared across threads, e.g. behind a `&dyn WorkerLauncher`.
#[async_trait]
pub trait WorkerLauncher: Send + Sync {
/// Launches (or attaches to) the worker for `spec`.
///
/// `wait` is a time budget whose exact meaning is up to the implementation:
/// `ConnectLauncher` uses it as the timeout for its connection probe, while
/// `LocalSubprocessLauncher` forwards it untouched to `spawn_worker`.
///
/// # Errors
///
/// Returns a `TransportError` (via `TransportResult`) when the worker cannot
/// be launched or reached.
async fn launch(&self, spec: &ProvisionSpec, wait: Duration) -> TransportResult<SpawnedChild>;
}
/// Launcher configuration for a worker binary and its command-line arguments.
///
/// Both fields are public so callers (and tests) can inspect how the launcher
/// was configured.
pub struct LocalSubprocessLauncher {
    /// Path of the worker executable.
    pub worker_bin: PathBuf,
    /// Arguments stored alongside the executable path.
    pub worker_args: Vec<String>,
}

impl LocalSubprocessLauncher {
    /// Creates a launcher for `worker_bin` with the given `worker_args`.
    ///
    /// `worker_bin` accepts anything convertible into a [`PathBuf`] (for
    /// example a `&str` or an owned `PathBuf`); the arguments are stored as
    /// given, without validation.
    pub fn new(worker_bin: impl Into<PathBuf>, worker_args: Vec<String>) -> Self {
        let worker_bin: PathBuf = worker_bin.into();
        Self {
            worker_bin,
            worker_args,
        }
    }
}
#[async_trait]
impl WorkerLauncher for LocalSubprocessLauncher {
    /// Hands the configured binary and arguments, together with `spec` and
    /// `wait`, to `spawn_worker` and returns its result unchanged.
    async fn launch(&self, spec: &ProvisionSpec, wait: Duration) -> TransportResult<SpawnedChild> {
        let binary = &self.worker_bin;
        let args = &self.worker_args;
        spawn_worker(binary, args, spec, wait).await
    }
}
/// Launcher for workers that are already running at a remote endpoint.
///
/// Nothing is spawned here: `launch` only verifies that the endpoint named by
/// `Placement::Remote` accepts a connection and then builds an `AgentRecord`
/// describing it.
pub struct ConnectLauncher;

#[async_trait]
impl WorkerLauncher for ConnectLauncher {
    /// Probes the remote endpoint in `spec.placement` and wraps it as a
    /// remote [`SpawnedChild`].
    ///
    /// `wait` bounds the connection attempt only; the probe connection is
    /// closed again as soon as it succeeds.
    ///
    /// # Errors
    ///
    /// Returns `TransportError::Protocol` when
    /// * `spec.placement` is not `Placement::Remote`,
    /// * the connection attempt fails, or
    /// * the connection attempt does not complete within `wait`.
    async fn launch(&self, spec: &ProvisionSpec, wait: Duration) -> TransportResult<SpawnedChild> {
        // Length of the lease written into the freshly built record.
        const INITIAL_LEASE_SECS: i64 = 60;

        let endpoint = match &spec.placement {
            Placement::Remote { endpoint } => endpoint.clone(),
            other => {
                return Err(TransportError::Protocol(format!(
                    "ConnectLauncher requires Placement::Remote, got {other:?}"
                )))
            }
        };

        let token = spec.secrets.worker_auth_token.as_deref();
        let probe = ChildClient::connect_with_auth(&endpoint, token);
        match tokio::time::timeout(wait, probe).await {
            Ok(Ok(client)) => {
                // The probe only proves reachability; a failure while closing
                // it is deliberately ignored (best effort).
                let _ = client.close().await;
            }
            Ok(Err(e)) => {
                return Err(TransportError::Protocol(format!(
                    "remote worker '{}' unreachable at {endpoint}: {e}",
                    spec.identity.child_id
                )))
            }
            Err(_) => {
                return Err(TransportError::Protocol(format!(
                    "remote worker '{}' did not accept a connection at {endpoint} within {wait:?}",
                    spec.identity.child_id
                )))
            }
        }

        // Read the clock once so that `lease_expires_at` is exactly
        // `started_at + INITIAL_LEASE_SECS` rather than being derived from a
        // second, slightly later clock reading.
        let now = Utc::now();
        let record = AgentRecord {
            agent_id: spec.identity.child_id.clone(),
            role: spec.identity.role.clone(),
            labels: Vec::new(),
            endpoint,
            // This launcher starts no process, so there is no pid to report.
            pid: 0,
            // NOTE(review): this is the version of the crate being compiled,
            // not one reported by the remote worker.
            version: env!("CARGO_PKG_VERSION").to_string(),
            started_at: now,
            lease_expires_at: now + chrono::Duration::seconds(INITIAL_LEASE_SECS),
        };
        Ok(SpawnedChild::remote(record))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::provision::{ChildIdentity, ExecutorSpec};

    /// Builds a spec using only `ProvisionSpec::new`, leaving the placement at
    /// whatever that constructor chooses (presumably a non-remote default —
    /// the rejection test below relies on that).
    fn spec_with_default_placement() -> ProvisionSpec {
        let identity = ChildIdentity {
            child_id: "c1".into(),
            parent_id: None,
            project_key: None,
            role: "demo".into(),
            depth: 0,
        };
        ProvisionSpec::new(identity, ExecutorSpec::Echo, "/tmp/fabric".into())
    }

    /// The subprocess launcher must coerce to `&dyn WorkerLauncher` and keep
    /// the binary path and arguments it was constructed with.
    #[test]
    fn local_subprocess_launcher_is_a_trait_object() {
        let expected_args = vec!["subagent-worker".to_string()];
        let launcher = LocalSubprocessLauncher::new("/bin/true", expected_args.clone());
        let _dyn: &dyn WorkerLauncher = &launcher;
        assert_eq!(launcher.worker_bin, PathBuf::from("/bin/true"));
        assert_eq!(launcher.worker_args, expected_args);
    }

    /// Compile-time check that `ConnectLauncher` coerces to a trait object.
    #[test]
    fn connect_launcher_is_a_trait_object() {
        let launcher = ConnectLauncher;
        let _dyn: &dyn WorkerLauncher = &launcher;
    }

    /// A spec without a remote placement must be refused with a protocol
    /// error that names `Placement::Remote`.
    #[tokio::test]
    async fn connect_launcher_rejects_non_remote_placement() {
        let spec = spec_with_default_placement();
        let wait = Duration::from_millis(100);

        let failure = match ConnectLauncher.launch(&spec, wait).await {
            Ok(_) => panic!("Local placement must be rejected"),
            Err(e) => e,
        };

        let mentions_remote = matches!(
            &failure,
            TransportError::Protocol(m) if m.contains("Placement::Remote")
        );
        if !mentions_remote {
            panic!("expected a clear Placement::Remote error, got {failure:?}");
        }
    }
}