use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::thread;
use std::time::{Duration, Instant};
use chrono::Utc;
use netsky_ai::{
AiHandleState, AiRequest, ai_handle_dir, build_worker_command, read_handle_output,
read_handle_state, resolve_prompt, run_worker,
};
use netsky_core::error::Error;
use serde_json::json;
use crate::observability::record_worker_cli_invocation;
const ENV_AI_HANDLE: &str = "NETSKY_AI_HANDLE";
pub fn run(
prompt: Option<&str>,
model: &str,
skill_flags: &[String],
timeout_secs: u64,
json_out: bool,
detach: bool,
cwd: Option<PathBuf>,
) -> netsky_core::Result<()> {
let mut stdin = Vec::new();
use std::io::Read;
std::io::stdin().read_to_end(&mut stdin)?;
let prompt = resolve_prompt(prompt, &stdin)?;
let skills = parse_skills(skill_flags);
let request = AiRequest {
prompt,
model: model.to_string(),
skills,
timeout: Duration::from_secs(timeout_secs),
cwd,
};
if detach {
return run_detached(&request, json_out);
}
let handle = std::env::var(ENV_AI_HANDLE).ok();
let result = run_foreground(&request, json_out);
if let Some(handle) = handle.as_deref() {
write_final_status(handle, &result)?;
}
result
}
fn run_foreground(request: &AiRequest, json_out: bool) -> netsky_core::Result<()> {
let spec = build_worker_command(request)?;
let argv_json = serde_json::to_string(&spec.argv)?;
let started = Instant::now();
let result = run_worker(request);
let duration = started.elapsed();
let exit_code = match &result {
Ok(normalized) => Some(i64::from(normalized.exit_code)),
Err(Error::SubprocessFailed { code, .. }) => Some(i64::from(*code)),
Err(Error::SubprocessTimeout { .. }) => None,
Err(_) => Some(1),
};
record_worker_cli_invocation(&spec.program, &argv_json, exit_code, duration, &host());
match result {
Ok(normalized) => print_success(normalized, json_out),
Err(error) => Err(error),
}
}
fn print_success(normalized: netsky_ai::AiNormalized, json_out: bool) -> netsky_core::Result<()> {
if json_out {
println!("{}", serde_json::to_string_pretty(&normalized)?);
} else {
print!("{}", normalized.text);
if !normalized.text.ends_with('\n') {
println!();
}
}
Ok(())
}
pub fn status(handle: &str) -> netsky_core::Result<()> {
println!("{}", render_handle_state(&read_handle_state(handle)?));
Ok(())
}
pub fn wait(handle: &str, timeout_secs: u64) -> netsky_core::Result<()> {
let deadline = Instant::now() + Duration::from_secs(timeout_secs);
loop {
match read_handle_state(handle)? {
AiHandleState::Running => {
if Instant::now() >= deadline {
std::process::exit(124);
}
thread::sleep(Duration::from_millis(100));
}
AiHandleState::Done { .. } => std::process::exit(0),
AiHandleState::Failed { .. } => std::process::exit(1),
AiHandleState::Unknown => std::process::exit(2),
}
}
}
pub fn cat(handle: &str) -> netsky_core::Result<()> {
let mut emitted = false;
loop {
if !emitted && let Some(output) = try_read_output(handle)? {
print!("{}", output.text);
if !output.text.ends_with('\n') {
println!();
}
io::stdout().flush()?;
emitted = true;
}
match read_handle_state(handle)? {
AiHandleState::Running => thread::sleep(Duration::from_millis(100)),
AiHandleState::Done { .. } | AiHandleState::Failed { .. } => return Ok(()),
AiHandleState::Unknown => {
return Err(Error::Invalid(format!("unknown ai handle `{handle}`")));
}
}
}
}
fn run_detached(request: &AiRequest, json_out: bool) -> netsky_core::Result<()> {
let handle = ai_handle();
let dir = ai_handle_dir(&handle);
fs::create_dir_all(&dir)?;
write_request(&dir, request)?;
fs::write(
dir.join("status.json"),
serde_json::to_vec_pretty(&json!({
"handle": handle,
"status": "running",
"ts_utc": Utc::now().to_rfc3339(),
}))?,
)?;
let exe = std::env::current_exe()?;
let stdout = File::create(dir.join("stdout.json"))?;
let stderr = File::create(dir.join("stderr.log"))?;
let mut child = Command::new(exe);
child.arg("ai");
child.arg("--model");
child.arg(&request.model);
for raw in &request.skills {
child.arg("--skill");
child.arg(raw);
}
child.arg("--timeout");
child.arg(request.timeout.as_secs().to_string());
child.arg("--json");
if let Some(cwd) = &request.cwd {
child.arg("--cwd");
child.arg(cwd);
}
child.arg(&request.prompt);
child.env(ENV_AI_HANDLE, &handle);
child.stdin(Stdio::null());
child.stdout(Stdio::from(stdout));
child.stderr(Stdio::from(stderr));
child.spawn()?;
let envelope = json!({
"ok": true,
"handle": handle,
"status": "running",
});
if json_out {
println!("{}", serde_json::to_string_pretty(&envelope)?);
} else {
println!("{handle}");
}
Ok(())
}
fn write_request(dir: &Path, request: &AiRequest) -> netsky_core::Result<()> {
let body = json!({
"prompt": request.prompt,
"model": request.model,
"skills": request.skills,
"timeout_s": request.timeout.as_secs(),
"cwd": request.cwd,
"ts_utc": Utc::now().to_rfc3339(),
});
fs::write(dir.join("request.json"), serde_json::to_vec_pretty(&body)?)?;
Ok(())
}
fn write_final_status(handle: &str, result: &netsky_core::Result<()>) -> netsky_core::Result<()> {
let dir = ai_handle_dir(handle);
fs::create_dir_all(&dir)?;
let body = match result {
Ok(()) => json!({
"handle": handle,
"status": "completed",
"exit_code": 0,
"ts_utc": Utc::now().to_rfc3339(),
}),
Err(error) => {
let exit_code = match error {
Error::SubprocessFailed { code, .. } => *code,
Error::SubprocessTimeout { .. } => 124,
_ => 1,
};
json!({
"handle": handle,
"status": "failed",
"exit_code": exit_code,
"error": error.to_string(),
"reason": error.to_string(),
"ts_utc": Utc::now().to_rfc3339(),
})
}
};
fs::write(dir.join("status.json"), serde_json::to_vec_pretty(&body)?)?;
Ok(())
}
fn ai_handle() -> String {
format!(
"{}-{}-{}",
Utc::now().format("%Y%m%dT%H%M%SZ"),
std::process::id(),
short_rand()
)
}
fn short_rand() -> String {
format!("{:06x}", Utc::now().timestamp_subsec_micros())
}
fn parse_skills(flags: &[String]) -> Vec<String> {
let mut skills = Vec::new();
for raw in flags {
for item in raw.split(',') {
let skill = item.trim();
if !skill.is_empty() {
skills.push(skill.to_string());
}
}
}
skills
}
fn host() -> String {
std::env::var("HOSTNAME")
.or_else(|_| std::env::var("COMPUTERNAME"))
.unwrap_or_else(|_| "unknown".to_string())
}
fn render_handle_state(state: &AiHandleState) -> String {
match state {
AiHandleState::Running => "running".to_string(),
AiHandleState::Done { exit_code } => format!("done exit={exit_code}"),
AiHandleState::Failed { exit_code, reason } => {
format!("failed exit={exit_code} reason={reason}")
}
AiHandleState::Unknown => "unknown".to_string(),
}
}
fn try_read_output(handle: &str) -> netsky_core::Result<Option<netsky_ai::AiNormalized>> {
match read_handle_output(handle) {
Ok(output) => Ok(output),
Err(Error::AiJsonParse { .. }) => Ok(None),
Err(error) => Err(error),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parses_skill_csv_flags_in_order() {
let skills = parse_skills(&["docker,duckdb".to_string(), "notes".to_string()]);
assert_eq!(skills, vec!["docker", "duckdb", "notes"]);
}
#[test]
fn handle_dir_uses_state_ai_subdir() {
let dir = ai_handle_dir("abc");
assert!(dir.ends_with(".netsky/state/ai/abc"));
}
#[test]
fn renders_terminal_handle_states() {
assert_eq!(render_handle_state(&AiHandleState::Running), "running");
assert_eq!(
render_handle_state(&AiHandleState::Done { exit_code: 0 }),
"done exit=0"
);
assert_eq!(
render_handle_state(&AiHandleState::Failed {
exit_code: 9,
reason: "boom".to_string(),
}),
"failed exit=9 reason=boom"
);
assert_eq!(render_handle_state(&AiHandleState::Unknown), "unknown");
}
}