use std::collections::{HashMap, VecDeque};
use std::io::{BufRead, BufReader};
use std::process::{Child, Command, Stdio};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use crate::error::ClientError;
/// Maximum number of orchestrator stdout lines kept in the in-memory log ring buffer.
/// Once the buffer holds this many lines, the oldest line is discarded for each new one.
const DEFAULT_LOG_BUFFER_CAPACITY: usize = 500;
/// Top-level configuration for the orchestrator binary.
///
/// [`LocalOrchestrator::start_with_timeout`] serializes this to JSON (camelCase keys) and
/// writes it to the child's stdin.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OrchestratorConfig {
    /// Plugin definitions under string keys (presumably plugin names — confirm).
    pub plugins: HashMap<String, PluginConfig>,
    /// Lists of routing rules under string keys (serialized as `routes`).
    pub routes: HashMap<String, Vec<RouteRule>>,
    /// Optional storage backend. Missing in the input means `None`; `None` is omitted
    /// from the output (serialized as `storageConfig` when present).
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub storage_config: Option<StorageConfig>,
}
/// How a single plugin is configured.
///
/// Internally tagged: the camelCased variant name (`builtin`, `grpc`, `nats`) is written
/// to a `"type"` field next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
pub enum PluginConfig {
    /// Built-in plugin; carries no settings of its own.
    Builtin,
    /// gRPC plugin, configured by [`GrpcPluginConfig`].
    Grpc(GrpcPluginConfig),
    /// NATS plugin, configured by [`NatsPluginConfig`].
    Nats(NatsPluginConfig),
}
/// Settings for a gRPC-based plugin (serialized with camelCase keys).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GrpcPluginConfig {
    /// Queue name (`queueName`).
    pub queue_name: String,
    /// Optional command (presumably used to launch the plugin process — confirm);
    /// omitted from the output when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command: Option<String>,
    /// Arguments accompanying `command`; omitted from the output when empty.
    #[serde(default)]
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub args: Vec<String>,
    /// Extra environment variables; omitted from the output when empty.
    #[serde(default)]
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub env: HashMap<String, String>,
}
/// Settings for a NATS-based plugin (serialized with camelCase keys).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NatsPluginConfig {
    /// Connection URL.
    pub url: String,
    /// Optional stream name; omitted from the output when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream: Option<String>,
    /// Optional consumer name; omitted from the output when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub consumer: Option<String>,
    /// Optional command (presumably used to launch the plugin process — confirm);
    /// omitted from the output when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command: Option<String>,
    /// Arguments accompanying `command`; omitted from the output when empty.
    #[serde(default)]
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub args: Vec<String>,
    /// Extra environment variables; omitted from the output when empty.
    #[serde(default)]
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub env: HashMap<String, String>,
    /// Queue timeout in seconds (`queueTimeoutSecs`); filled in by
    /// `default_queue_timeout_secs` when missing from the input.
    #[serde(default = "default_queue_timeout_secs")]
    pub queue_timeout_secs: u64,
}
/// Serde default for `NatsPluginConfig::queue_timeout_secs`: 30 seconds.
fn default_queue_timeout_secs() -> u64 {
    const THIRTY_SECONDS: u64 = 30;
    THIRTY_SECONDS
}
/// One routing rule: the plugin that should handle the route.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct RouteRule {
    /// Plugin name (presumably a key of `OrchestratorConfig::plugins` — confirm).
    pub plugin: String,
}
/// Storage backend selection, internally tagged by a `"type"` field
/// (`"inMemory"` or `"sqlite"`).
///
/// On an enum, `rename_all` renames only the variant tags, not the fields inside struct
/// variants, so every field of `Sqlite` carries its own camelCase `rename` to match the rest
/// of the configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum StorageConfig {
    /// In-memory storage; no extra settings.
    InMemory,
    /// SQLite-backed storage.
    Sqlite {
        /// Database URL, serialized as `databaseUrl`.
        #[serde(rename = "databaseUrl")]
        database_url: String,
        /// Whether to auto-migrate, serialized as `autoMigrate`; defaults to `false`.
        /// The previous snake_case key `auto_migrate` is still accepted when deserializing.
        #[serde(rename = "autoMigrate", alias = "auto_migrate", default)]
        auto_migrate: bool,
    },
}
pub struct LocalOrchestrator {
child: Child,
url: String,
log_buffer: Arc<Mutex<VecDeque<String>>>,
dump_logs_on_panic: bool,
}
impl LocalOrchestrator {
pub async fn start(config: OrchestratorConfig) -> Result<Self, ClientError> {
Self::start_with_timeout(config, Duration::from_secs(30)).await
}
pub async fn start_with_timeout(
config: OrchestratorConfig,
startup_timeout: Duration,
) -> Result<Self, ClientError> {
let binary = std::env::var("STEPFLOW_DEV_BINARY").map_err(|_| {
ClientError::LocalServer(
"STEPFLOW_DEV_BINARY environment variable is not set. \
Set it to the path of the stepflow binary to run integration tests."
.to_string(),
)
})?;
let config_json = serde_json::to_string(&config).map_err(|e| {
ClientError::LocalServer(format!("Failed to serialize orchestrator config: {e}"))
})?;
let child = Command::new(&binary)
.args(["--port", "0", "--config-stdin"])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit()) .spawn()
.map_err(|e| {
ClientError::LocalServer(format!(
"Failed to launch orchestrator binary '{binary}': {e}"
))
})?;
let mut guard = ChildKillGuard::new(child);
{
use std::io::Write;
let child = guard.0.as_mut().expect("guard is armed");
let stdin = child.stdin.take().expect("stdin was piped");
let mut stdin = stdin;
stdin.write_all(config_json.as_bytes()).map_err(|e| {
ClientError::LocalServer(format!(
"Failed to write config to orchestrator stdin: {e}"
))
})?;
}
let stdout = guard
.0
.as_mut()
.expect("guard is armed")
.stdout
.take()
.expect("stdout was piped");
let (port, reader) = tokio::time::timeout(
startup_timeout,
tokio::task::spawn_blocking(move || read_port_from_stdout(stdout, startup_timeout)),
)
.await
.map_err(|_| {
ClientError::LocalServer(format!(
"Timed out after {}s waiting for orchestrator port announcement",
startup_timeout.as_secs()
))
})?
.map_err(|e| ClientError::LocalServer(format!("Port reader task panicked: {e}")))?
.map_err(ClientError::LocalServer)?;
let log_buffer = Arc::new(Mutex::new(VecDeque::with_capacity(
DEFAULT_LOG_BUFFER_CAPACITY,
)));
let buf = Arc::clone(&log_buffer);
std::thread::spawn(move || drain_stdout(reader, buf));
let url = format!("http://127.0.0.1:{port}");
wait_for_health(&url, startup_timeout).await.map_err(|e| {
ClientError::LocalServer(format!("Orchestrator health check failed: {e}"))
})?;
let child = guard.disarm();
Ok(Self {
child,
url,
log_buffer,
dump_logs_on_panic: true,
})
}
pub fn url(&self) -> &str {
&self.url
}
pub fn tasks_url(&self) -> &str {
&self.url
}
pub fn set_dump_logs_on_panic(&mut self, enabled: bool) {
self.dump_logs_on_panic = enabled;
}
pub fn recent_logs(&self, n: usize) -> Vec<String> {
let buf = self.log_buffer.lock().expect("log buffer lock poisoned");
let len = buf.len();
buf.iter().skip(len.saturating_sub(n)).cloned().collect()
}
}
impl Drop for LocalOrchestrator {
    /// Kills and waits on the orchestrator process. If log dumping is enabled and the
    /// current thread is panicking, it then prints the buffered stdout lines to stderr.
    fn drop(&mut self) {
        // Errors are ignored: the process may already have exited.
        let _ = self.child.kill();
        let _ = self.child.wait();
        if !self.dump_logs_on_panic || !std::thread::panicking() {
            return;
        }
        // We are already unwinding, and a second panic here would abort the process, so
        // never `expect` on the lock. The buffer is only ever pushed/popped, so recovering
        // it from a poisoned lock is safe.
        let buf = self
            .log_buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if buf.is_empty() {
            return;
        }
        eprintln!("\n--- Recent orchestrator logs ({} lines) ---", buf.len());
        for line in buf.iter() {
            eprintln!(" {line}");
        }
        eprintln!("--- End orchestrator logs ---\n");
    }
}
/// Holds a spawned child process and kills it when dropped, unless
/// [`ChildKillGuard::disarm`] has handed the process back to the caller.
struct ChildKillGuard(Option<Child>);

impl ChildKillGuard {
    /// Arms a guard around `child`.
    fn new(child: Child) -> Self {
        ChildKillGuard(Some(child))
    }

    /// Takes the child back out of the guard so it is no longer killed on drop.
    ///
    /// # Panics
    /// Panics with "guard already disarmed" if called a second time.
    fn disarm(&mut self) -> Child {
        match self.0.take() {
            Some(child) => child,
            None => panic!("guard already disarmed"),
        }
    }
}

impl Drop for ChildKillGuard {
    fn drop(&mut self) {
        let Some(mut child) = self.0.take() else {
            // Disarmed: the caller now owns the process.
            return;
        };
        // Best effort: the process may already have exited.
        let _ = child.kill();
        let _ = child.wait();
    }
}
fn read_port_from_stdout(
stdout: std::process::ChildStdout,
timeout: Duration,
) -> Result<(u16, BufReader<std::process::ChildStdout>), String> {
let mut reader = BufReader::new(stdout);
let deadline = Instant::now() + timeout;
let mut line_buf = String::new();
loop {
if Instant::now() > deadline {
return Err(format!(
"Timed out after {}s waiting for port announcement",
timeout.as_secs()
));
}
line_buf.clear();
let n = reader
.read_line(&mut line_buf)
.map_err(|e| format!("Failed to read orchestrator stdout: {e}"))?;
if n == 0 {
return Err("Orchestrator stdout closed without announcing port".to_string());
}
let line = line_buf.trim();
if line.starts_with('{')
&& line.contains("port")
&& let Ok(obj) = serde_json::from_str::<serde_json::Value>(line)
&& let Some(port) = obj.get("port").and_then(|v| v.as_u64())
{
return Ok((port as u16, reader));
}
}
}
/// Copies orchestrator stdout into `buffer`, one entry per line, until EOF or a read error.
///
/// `buffer` acts as a ring of at most [`DEFAULT_LOG_BUFFER_CAPACITY`] lines: when full, the
/// oldest line is dropped before the newest is appended. Trailing `\n` / `\r\n` are
/// stripped. Invalid UTF-8 is replaced with U+FFFD rather than ending the drain. Stopping
/// early would drop `reader` and close our end of the pipe, after which all further
/// orchestrator output is lost and its stdout writes fail (EPIPE/SIGPIPE on Unix).
fn drain_stdout(
    mut reader: BufReader<std::process::ChildStdout>,
    buffer: Arc<Mutex<VecDeque<String>>>,
) {
    let mut raw = Vec::new();
    loop {
        raw.clear();
        match reader.read_until(b'\n', &mut raw) {
            // EOF: the orchestrator closed stdout (normally because it exited).
            Ok(0) => break,
            Ok(_) => {}
            // Genuine I/O error on the pipe; nothing sensible left to read.
            Err(_) => break,
        }

        let mut bytes = raw.as_slice();
        if let Some(rest) = bytes.strip_suffix(b"\n") {
            bytes = rest.strip_suffix(b"\r").unwrap_or(rest);
        }
        let line = String::from_utf8_lossy(bytes).into_owned();

        // The deque is only ever pushed/popped, so it stays consistent even if another
        // holder panicked; keep capturing instead of propagating the poison.
        let mut ring = buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if ring.len() >= DEFAULT_LOG_BUFFER_CAPACITY {
            ring.pop_front();
        }
        ring.push_back(line);
    }
}
/// Polls `{base_url}/api/v1/health` via [`reqwest_health_check`] until a probe succeeds or
/// `timeout` elapses.
///
/// Each probe is bounded by the smaller of 5 s and the remaining budget. The pause between
/// probes is the smaller of 200 ms and the remaining budget. This keeps the total wait
/// from overshooting `timeout` by a whole probe or sleep interval.
///
/// # Errors
/// Returns a message naming `base_url` if no probe succeeded before the deadline.
async fn wait_for_health(base_url: &str, timeout: Duration) -> Result<(), String> {
    const PROBE_TIMEOUT: Duration = Duration::from_secs(5);
    const RETRY_INTERVAL: Duration = Duration::from_millis(200);

    let health_url = format!("{base_url}/api/v1/health");
    let deadline = tokio::time::Instant::now() + timeout;
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        let probe = reqwest_health_check(&health_url);
        if let Ok(Ok(())) = tokio::time::timeout(remaining.min(PROBE_TIMEOUT), probe).await {
            return Ok(());
        }

        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            return Err(format!(
                "Orchestrator at {base_url} did not become healthy within {}s",
                timeout.as_secs()
            ));
        }
        tokio::time::sleep(remaining.min(RETRY_INTERVAL)).await;
    }
}
/// Sends a minimal HTTP/1.0 `GET` to a plain `http://host:port/path` URL. Succeeds only
/// if the response's status code is 200.
///
/// No TLS and no redirects. A URL without a path requests `/`. Any connection or I/O
/// failure, malformed response, or non-200 status yields `Err(())`.
async fn reqwest_health_check(url: &str) -> Result<(), ()> {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpStream;

    let without_scheme = url.strip_prefix("http://").unwrap_or(url);
    let (host_port, path) = without_scheme
        .split_once('/')
        .unwrap_or((without_scheme, ""));

    let mut stream = TcpStream::connect(host_port).await.map_err(|_| ())?;
    let request =
        format!("GET /{path} HTTP/1.0\r\nHost: {host_port}\r\nConnection: close\r\n\r\n");
    stream.write_all(request.as_bytes()).await.map_err(|_| ())?;

    let mut response = Vec::new();
    stream.read_to_end(&mut response).await.map_err(|_| ())?;

    // Check the status code on the status line ("HTTP/1.1 200 OK") only. A substring search
    // over the first bytes would also match a header such as "content-length: 200" on an
    // error response.
    let status_end = response
        .iter()
        .position(|&b| b == b'\n')
        .unwrap_or(response.len());
    let status_line = String::from_utf8_lossy(&response[..status_end]);
    let mut fields = status_line.split_whitespace();
    match (fields.next(), fields.next()) {
        (Some(version), Some("200")) if version.starts_with("HTTP/") => Ok(()),
        _ => Err(()),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `LocalOrchestrator` around a pre-filled log buffer. A `sleep` process
    /// stands in for the real binary so `Drop` has something to kill.
    fn orchestrator_with_logs(lines: &[&str]) -> LocalOrchestrator {
        let child = Command::new("sleep")
            .arg("60")
            .spawn()
            .expect("failed to spawn sleep");
        let logs: VecDeque<String> = lines.iter().map(|s| s.to_string()).collect();
        LocalOrchestrator {
            child,
            url: "http://127.0.0.1:0".to_string(),
            log_buffer: Arc::new(Mutex::new(logs)),
            dump_logs_on_panic: false,
        }
    }

    /// Spawns `printf format` and returns the child plus its piped stdout.
    fn printf_stdout(format: &str) -> (Child, std::process::ChildStdout) {
        let mut child = Command::new("printf")
            .arg(format)
            .stdout(Stdio::piped())
            .spawn()
            .expect("failed to spawn printf");
        let stdout = child.stdout.take().expect("stdout was piped");
        (child, stdout)
    }

    #[test]
    fn test_recent_logs_returns_last_n_lines() {
        let orch = orchestrator_with_logs(&["line1", "line2", "line3", "line4", "line5"]);
        assert_eq!(orch.recent_logs(3), vec!["line3", "line4", "line5"]);
    }

    #[test]
    fn test_recent_logs_fewer_than_n() {
        let orch = orchestrator_with_logs(&["only_line"]);
        assert_eq!(orch.recent_logs(10), vec!["only_line"]);
    }

    #[test]
    fn test_recent_logs_empty_buffer() {
        let orch = orchestrator_with_logs(&[]);
        assert!(orch.recent_logs(5).is_empty());
    }

    #[test]
    fn test_recent_logs_zero_returns_nothing() {
        let orch = orchestrator_with_logs(&["a", "b"]);
        assert!(orch.recent_logs(0).is_empty());
    }

    #[test]
    fn test_drain_stdout_caps_at_capacity() {
        let total_lines = DEFAULT_LOG_BUFFER_CAPACITY + 100;
        let mut child = Command::new("seq")
            .args(["1", &total_lines.to_string()])
            .stdout(Stdio::piped())
            .spawn()
            .expect("failed to spawn seq");
        let stdout = child.stdout.take().expect("stdout was piped");
        let buffer = Arc::new(Mutex::new(VecDeque::new()));
        drain_stdout(BufReader::new(stdout), Arc::clone(&buffer));
        let _ = child.wait();

        let buf = buffer.lock().unwrap();
        assert_eq!(buf.len(), DEFAULT_LOG_BUFFER_CAPACITY);
        assert_eq!(buf.front().unwrap(), "101");
        assert_eq!(buf.back().unwrap(), &total_lines.to_string());
    }

    #[test]
    fn test_read_port_skips_non_announcement_lines() {
        let (mut child, stdout) =
            printf_stdout("starting up\\n{\"event\":\"listening\",\"port\":4321}\\nafter\\n");
        let (port, _reader) = read_port_from_stdout(stdout, Duration::from_secs(5))
            .expect("port should be announced");
        assert_eq!(port, 4321);
        let _ = child.wait();
    }

    #[test]
    fn test_read_port_errors_when_stdout_closes() {
        let (mut child, stdout) = printf_stdout("no announcement here\\n");
        let err = read_port_from_stdout(stdout, Duration::from_secs(5))
            .err()
            .expect("missing announcement should be an error");
        assert!(err.contains("closed without announcing port"), "unexpected error: {err}");
        let _ = child.wait();
    }

    #[test]
    fn test_child_kill_guard_kills_on_drop() {
        let child = Command::new("sleep")
            .arg("60")
            .spawn()
            .expect("failed to spawn sleep");
        let id = child.id();
        {
            let _guard = ChildKillGuard::new(child);
        }
        let status = Command::new("kill").args(["-0", &id.to_string()]).status();
        assert!(
            status.is_err() || !status.unwrap().success(),
            "process should no longer exist"
        );
    }

    #[test]
    fn test_child_kill_guard_disarm_does_not_kill() {
        let child = Command::new("sleep")
            .arg("60")
            .spawn()
            .expect("failed to spawn sleep");
        let mut guard = ChildKillGuard::new(child);
        let mut child = guard.disarm();
        drop(guard);
        let alive = Command::new("kill")
            .args(["-0", &child.id().to_string()])
            .status()
            .map(|s| s.success())
            .unwrap_or(false);
        assert!(alive, "process should still be alive after disarm");
        let _ = child.kill();
        let _ = child.wait();
    }
}