use super::*;
pub(super) fn orchestrate_cmd(args: &[String]) -> Result<(), String> {
let Some(subcommand) = args.first().map(String::as_str) else {
return Err(String::from(
"missing orchestrate subcommand (expected init|validate|plan|run|up|down|status)",
));
};
match subcommand {
"init" => orchestrate_init(&args[1..]),
"validate" => orchestrate_validate(&args[1..]),
"plan" => orchestrate_plan(&args[1..]),
"run" => orchestrate_run(&args[1..]),
"up" => orchestrate_up(&args[1..]),
"down" => orchestrate_down(&args[1..]),
"status" => orchestrate_status(&args[1..]),
_ => Err(format!(
"unsupported orchestrate subcommand: {subcommand} (expected init|validate|plan|run|up|down|status)"
)),
}
}
/// Handles `orchestrate init`: writes the default orchestration config to disk.
///
/// Arguments read from `args`:
/// - `--output <path>`: destination file; when absent,
///   `DEFAULT_ORCHESTRATION_CONFIG_PATH` is used.
/// - `--json`: print the report as a JSON document instead of plain text.
///
/// A non-empty parent directory of the destination is created if needed. The
/// file receives the pretty-printed template followed by a newline.
///
/// # Errors
///
/// Returns an error when the parent directory cannot be created, when the
/// template or the JSON report cannot be serialized, or when the file cannot
/// be written.
fn orchestrate_init(args: &[String]) -> Result<(), String> {
    let output = match option_value(args, "--output") {
        Some(path) => PathBuf::from(path),
        None => PathBuf::from(DEFAULT_ORCHESTRATION_CONFIG_PATH),
    };
    let json = has_flag(args, "--json");

    // A bare file name has an empty parent; there is nothing to create then.
    let parent_dir = output.parent().filter(|dir| !dir.as_os_str().is_empty());
    if let Some(parent) = parent_dir {
        std::fs::create_dir_all(parent)
            .map_err(|err| format!("create config directory {} failed: {err}", parent.display()))?;
    }

    let template = default_orchestration_config();
    let task_count = template.tasks.len();
    let content = serde_json::to_string_pretty(&template)
        .map_err(|err| format!("serialize orchestration template failed: {err}"))?;
    let file_body = format!("{content}\n");
    std::fs::write(&output, file_body).map_err(|err| {
        format!(
            "write orchestrate config {} failed: {err}",
            output.display()
        )
    })?;

    if !json {
        println!("orchestrate config initialized: {}", output.display());
        println!("task_count: {}", task_count);
        return Ok(());
    }

    let payload = serde_json::json!({
        "api_version": "robotrt.orchestrate.init.v1",
        "output": output,
        "task_count": task_count,
    });
    let rendered = serde_json::to_string_pretty(&payload)
        .map_err(|err| format!("serialize orchestrate init output failed: {err}"))?;
    println!("{}", rendered);
    Ok(())
}
/// Handles `orchestrate validate`: loads the resolved config and reports
/// whether `validate_config` found any problems.
///
/// In JSON mode the report is printed for both outcomes, with `status` set to
/// `"pass"` or `"fail"` and the list of validation errors included. In text
/// mode a summary is printed only when the config is valid.
///
/// # Errors
///
/// Returns an error when the options cannot be parsed, the config cannot be
/// loaded, the JSON report cannot be serialized, or validation reported at
/// least one problem (the problems are joined with `"; "`).
fn orchestrate_validate(args: &[String]) -> Result<(), String> {
    let opts = parse_cli_options(args)?;
    let config = load_resolved_config(&opts)?;
    let problems = validate_config(&config);
    let is_valid = problems.is_empty();
    let task_count = config.tasks.len();

    if opts.json {
        let status = if is_valid { "pass" } else { "fail" };
        let payload = serde_json::json!({
            "api_version": "robotrt.orchestrate.validate.v1",
            "config_file": opts.config_path,
            "profile": opts.profile,
            "overlays": opts.overlay_files,
            "status": status,
            "task_count": task_count,
            "errors": problems,
        });
        let rendered = serde_json::to_string_pretty(&payload)
            .map_err(|err| format!("serialize orchestrate validate output failed: {err}"))?;
        println!("{}", rendered);
    }

    // The JSON report (if requested) is already out; now signal the failure.
    if !is_valid {
        return Err(format!("orchestrate config invalid: {}", problems.join("; ")));
    }

    if !opts.json {
        println!("orchestrate config valid: {}", opts.config_path.display());
        println!("task_count: {}", task_count);
    }
    Ok(())
}
/// Handles `orchestrate plan`: prints the ordered list of tasks selected by
/// the task/group filters without running anything.
///
/// The config is loaded and validated first; the selection comes from
/// `filtered_ordered_tasks`. Output is either a JSON document or a numbered
/// plain-text list, depending on `opts.json`.
///
/// # Errors
///
/// Returns an error when the options cannot be parsed, the config cannot be
/// loaded or is invalid, the task selection fails, or the JSON report cannot
/// be serialized.
fn orchestrate_plan(args: &[String]) -> Result<(), String> {
    let opts = parse_cli_options(args)?;
    let config = load_resolved_config(&opts)?;
    let problems = validate_config(&config);
    if !problems.is_empty() {
        return Err(format!("orchestrate config invalid: {}", problems.join("; ")));
    }
    let ordered =
        filtered_ordered_tasks(&config, opts.target_task.as_deref(), opts.group.as_deref())?;

    if !opts.json {
        println!("RobotRT Orchestration Plan");
        println!("config: {}", opts.config_path.display());
        if let Some(profile) = &opts.profile {
            println!("profile: {profile}");
        }
        match &opts.target_task {
            Some(task) => println!("target: {task}"),
            None => println!("target: <all>"),
        }
        if let Some(group) = &opts.group {
            println!("group: {group}");
        }
        // Positions are 1-based in the printed plan.
        for (position, task) in (1usize..).zip(ordered.iter()) {
            println!("{:02}. {}", position, task);
        }
        return Ok(());
    }

    let payload = serde_json::json!({
        "api_version": "robotrt.orchestrate.plan.v1",
        "config_file": opts.config_path,
        "profile": opts.profile,
        "overlays": opts.overlay_files,
        "task": opts.target_task,
        "group": opts.group,
        "planned_tasks_count": ordered.len(),
        "planned_tasks": ordered,
    });
    let rendered = serde_json::to_string_pretty(&payload)
        .map_err(|err| format!("serialize orchestrate plan output failed: {err}"))?;
    println!("{}", rendered);
    Ok(())
}
/// Handles `orchestrate run`: executes the selected one-shot tasks through
/// `run_oneshot_parallel` and reports the outcome.
///
/// Steps:
/// 1. Parse options, load the resolved config and validate it.
/// 2. Select the ordered tasks and resolve the parallelism limit.
/// 3. With `opts.dry_run`, print the plan (JSON or text) and stop.
/// 4. Otherwise refuse to continue if any selected task has
///    `run_mode == TaskRunMode::Service`, then run the tasks and print a
///    report that includes the wall-clock time measured around the run.
///
/// # Errors
///
/// Returns an error when parsing, loading, validation, selection or report
/// serialization fails, when a selected task is a service task, when the
/// runner itself fails, or when the reported status is not `"pass"` (the
/// report is still printed before that error is returned).
fn orchestrate_run(args: &[String]) -> Result<(), String> {
    let opts = parse_cli_options(args)?;
    let config = load_resolved_config(&opts)?;
    let problems = validate_config(&config);
    if !problems.is_empty() {
        return Err(format!("orchestrate config invalid: {}", problems.join("; ")));
    }
    let ordered =
        filtered_ordered_tasks(&config, opts.target_task.as_deref(), opts.group.as_deref())?;
    let max_parallel = resolve_max_parallel(&config, opts.max_parallel_override)?;
    let planned_count = ordered.len();

    if opts.dry_run {
        if !opts.json {
            println!("RobotRT Orchestration Dry Run");
            println!("config: {}", opts.config_path.display());
            println!("max_parallel: {}", max_parallel);
            println!("tasks:");
            for (position, task) in (1usize..).zip(ordered.iter()) {
                println!("{:02}. {}", position, task);
            }
            return Ok(());
        }
        let payload = serde_json::json!({
            "api_version": "robotrt.orchestrate.run.v1",
            "mode": "dry_run",
            "config_file": opts.config_path,
            "profile": opts.profile,
            "task": opts.target_task,
            "group": opts.group,
            "planned_tasks_count": planned_count,
            "planned_tasks": ordered,
            "max_parallel": max_parallel,
            "status": "pass",
        });
        let rendered = serde_json::to_string_pretty(&payload)
            .map_err(|err| format!("serialize orchestrate dry-run output failed: {err}"))?;
        println!("{}", rendered);
        return Ok(());
    }

    // Service tasks are managed through up/down/status, never through `run`.
    let task_index = build_task_index(&config)?;
    for name in &ordered {
        let Some(&slot) = task_index.get(name) else {
            return Err(format!("internal error: task not found: {name}"));
        };
        let task = &config.tasks[slot];
        if task.run_mode == TaskRunMode::Service {
            return Err(format!(
                "task {} is run_mode=service; use orchestrate up/down/status for lifecycle management",
                task.name
            ));
        }
    }

    let clock = Instant::now();
    let (status, results, failed_message) =
        run_oneshot_parallel(&config, &ordered, max_parallel, opts.continue_on_error)?;
    let total_elapsed_ms = clock.elapsed().as_millis() as u64;
    let executed_count = results.len();

    if opts.json {
        let payload = serde_json::json!({
            "api_version": "robotrt.orchestrate.run.v1",
            "mode": "execute_parallel",
            "scheduler": "dag_parallel",
            "config_file": opts.config_path,
            "profile": opts.profile,
            "task": opts.target_task,
            "group": opts.group,
            "status": status,
            "continue_on_error": opts.continue_on_error,
            "planned_tasks_count": planned_count,
            "executed_tasks_count": executed_count,
            "max_parallel": max_parallel,
            "total_elapsed_ms": total_elapsed_ms,
            "log_dir": DEFAULT_RUN_LOG_DIR,
            "results": results,
        });
        let rendered = serde_json::to_string_pretty(&payload)
            .map_err(|err| format!("serialize orchestrate run output failed: {err}"))?;
        println!("{}", rendered);
    } else {
        println!("RobotRT Orchestration Run");
        println!("config: {}", opts.config_path.display());
        println!("max_parallel: {}", max_parallel);
        println!("planned_tasks: {}", planned_count);
        println!("executed_tasks: {}", executed_count);
        println!("total_elapsed_ms: {}", total_elapsed_ms);
        println!("status: {status}");
        for result in &results {
            let log = match &result.log_file {
                Some(path) => path.display().to_string(),
                None => String::from("<none>"),
            };
            println!(
                "- {} status={} exit_code={:?} elapsed_ms={} log={}",
                result.name, result.status, result.exit_code, result.elapsed_ms, log
            );
        }
    }

    // The report above is printed for every outcome; the exit status follows it.
    if status == "pass" {
        Ok(())
    } else {
        Err(failed_message.unwrap_or_else(|| String::from("orchestrate run failed")))
    }
}
fn orchestrate_up(args: &[String]) -> Result<(), String> {
let opts = parse_cli_options(args)?;
let config = load_resolved_config(&opts)?;
let errors = validate_config(&config);
if !errors.is_empty() {
return Err(format!("orchestrate config invalid: {}", errors.join("; ")));
}
let task_index = build_task_index(&config)?;
let ordered =
filtered_ordered_tasks(&config, opts.target_task.as_deref(), opts.group.as_deref())?;
if ordered.is_empty() {
return Err(String::from("no tasks selected for orchestrate up"));
}
let mut state =
load_runtime_state_or_default(&opts.state_file, &opts.config_path, opts.profile.clone())?;
let mut active_pids = HashSet::new();
for record in &state.processes {
if process_alive(record.pid) {
active_pids.insert(record.pid);
}
}
let mut started = Vec::new();
for name in ordered {
let task = &config.tasks[*task_index
.get(&name)
.ok_or_else(|| format!("internal error: task not found: {name}"))?];
if !task.enabled {
continue;
}
if task.run_mode == TaskRunMode::Oneshot {
run_oneshot_task(task, &HashMap::new())?;
continue;
}
if state
.processes
.iter()
.any(|record| record.task == task.name && process_alive(record.pid))
{
continue;
}
let logs_dir = PathBuf::from(DEFAULT_RUN_LOG_DIR);
std::fs::create_dir_all(&logs_dir)
.map_err(|err| format!("create log directory {} failed: {err}", logs_dir.display()))?;
let log_file = logs_dir.join(format!("service-{}.log", sanitize_name(&task.name)));
let child = spawn_service_task(task, &log_file)?;
let pid = child.id();
drop(child);
if !process_alive(pid) {
return Err(format!("service task {} exited immediately", task.name));
}
if active_pids.insert(pid) {
state.processes.push(ServiceProcessRecord {
task: task.name.clone(),
group: task.group.clone(),
pid,
command: task.command.clone(),
cwd: task.cwd.clone(),
log_file: log_file.clone(),
started_at_unix_ms: now_unix_ms(),
});
started.push((task.name.clone(), pid, log_file));
}
}
state.updated_at_unix_ms = now_unix_ms();
save_runtime_state(&opts.state_file, &state)?;
if opts.json {
let payload = serde_json::json!({
"api_version": "robotrt.orchestrate.up.v1",
"config_file": opts.config_path,
"profile": opts.profile,
"state_file": opts.state_file,
"started_count": started.len(),
"started": started.iter().map(|(name, pid, log)| {
serde_json::json!({
"task": name,
"pid": pid,
"log_file": log,
})
}).collect::<Vec<_>>(),
"status": "pass",
});
println!(
"{}",
serde_json::to_string_pretty(&payload)
.map_err(|err| format!("serialize orchestrate up output failed: {err}"))?
);
} else {
println!("RobotRT Orchestration Up");
println!("state_file: {}", opts.state_file.display());
for (task, pid, log_file) in started {
println!("- task={} pid={} log={}", task, pid, log_file.display());
}
}
Ok(())
}
fn orchestrate_down(args: &[String]) -> Result<(), String> {
let opts = parse_cli_options(args)?;
if !opts.state_file.exists() {
if opts.json {
let payload = serde_json::json!({
"api_version": "robotrt.orchestrate.down.v1",
"state_file": opts.state_file,
"stopped_count": 0,
"status": "pass",
});
println!(
"{}",
serde_json::to_string_pretty(&payload)
.map_err(|err| format!("serialize orchestrate down output failed: {err}"))?
);
} else {
println!(
"orchestrate state file not found: {}",
opts.state_file.display()
);
}
return Ok(());
}
let mut state = load_runtime_state(&opts.state_file)?;
let mut keep = Vec::new();
let mut stopped = Vec::new();
for record in state.processes {
let selected_by_task = opts
.target_task
.as_ref()
.is_none_or(|target| target == &record.task);
let selected_by_group = opts
.group
.as_ref()
.is_none_or(|group| record.group.as_ref() == Some(group));
if selected_by_task && selected_by_group {
let was_running = process_alive(record.pid);
if was_running {
terminate_process(record.pid)?;
}
stopped.push(serde_json::json!({
"task": record.task,
"pid": record.pid,
"was_running": was_running,
}));
} else {
keep.push(record);
}
}
state.processes = keep;
state.updated_at_unix_ms = now_unix_ms();
save_runtime_state(&opts.state_file, &state)?;
if opts.json {
let payload = serde_json::json!({
"api_version": "robotrt.orchestrate.down.v1",
"state_file": opts.state_file,
"stopped_count": stopped.len(),
"stopped": stopped,
"remaining_count": state.processes.len(),
"status": "pass",
});
println!(
"{}",
serde_json::to_string_pretty(&payload)
.map_err(|err| format!("serialize orchestrate down output failed: {err}"))?
);
} else {
println!("RobotRT Orchestration Down");
println!("state_file: {}", opts.state_file.display());
println!("stopped_count: {}", stopped.len());
println!("remaining_count: {}", state.processes.len());
}
Ok(())
}
/// Handles `orchestrate status`: reports every service process recorded in
/// the runtime state file together with whether it is currently alive.
///
/// When the state file does not exist, an empty state is used and the report
/// shows zero entries. Every record is listed in the report, including dead
/// ones.
///
/// With `opts.prune` set, records whose process is no longer alive are removed
/// from the state and the file is rewritten. The file is only written when at
/// least one record was actually removed, so a status query never creates a
/// state file or rewrites an unchanged one.
///
/// # Errors
///
/// Returns an error when the options cannot be parsed, the state cannot be
/// loaded or saved, or the JSON report cannot be serialized.
fn orchestrate_status(args: &[String]) -> Result<(), String> {
    let opts = parse_cli_options(args)?;
    let mut state = if opts.state_file.exists() {
        load_runtime_state(&opts.state_file)?
    } else {
        RuntimeState {
            api_version: ORCHESTRATION_RUNTIME_STATE_API_VERSION.to_string(),
            config_file: opts.config_path.clone(),
            profile: opts.profile.clone(),
            updated_at_unix_ms: now_unix_ms(),
            processes: Vec::new(),
        }
    };

    let mut running = 0usize;
    let mut pruned_any = false;
    let mut entries = Vec::new();
    let mut keep = Vec::new();
    for record in state.processes {
        let alive = process_alive(record.pid);
        // Build the report entry first (it only borrows the record), so the
        // record itself can then be moved into `keep` without cloning.
        entries.push(serde_json::json!({
            "task": record.task,
            "group": record.group,
            "pid": record.pid,
            "alive": alive,
            "log_file": record.log_file,
            "started_at_unix_ms": record.started_at_unix_ms,
        }));
        if alive {
            running += 1;
        }
        if alive || !opts.prune {
            keep.push(record);
        } else {
            pruned_any = true;
        }
    }

    // `pruned_any` can only be set when `opts.prune` is on.
    if pruned_any {
        state.processes = keep;
        state.updated_at_unix_ms = now_unix_ms();
        save_runtime_state(&opts.state_file, &state)?;
    }

    if opts.json {
        let payload = serde_json::json!({
            "api_version": "robotrt.orchestrate.status.v1",
            "state_file": opts.state_file,
            "total_count": entries.len(),
            "running_count": running,
            "entries": entries,
            "status": "pass",
        });
        println!(
            "{}",
            serde_json::to_string_pretty(&payload)
                .map_err(|err| format!("serialize orchestrate status output failed: {err}"))?
        );
    } else {
        println!("RobotRT Orchestration Status");
        println!("state_file: {}", opts.state_file.display());
        println!("running_count: {}", running);
        println!("total_count: {}", entries.len());
    }
    Ok(())
}