//! Implementation of the `serve` sub-command.
//!
//! Starts an HTTP server that exposes job operations as REST endpoints.
//! Endpoints mirror the existing CLI subcommands.
//!
//! Default bind address: `127.0.0.1:19263` (localhost only).
//! Use `--bind 0.0.0.0:19263` to expose externally (requires network access control).

9use anyhow::Result;
10use axum::{
11    Json, Router,
12    extract::{Path, Request, State},
13    http::StatusCode,
14    response::{IntoResponse, Response as AxumResponse},
15    routing::{get, post},
16};
17use serde::Deserialize;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::jobstore::{JobDir, JobNotFound, resolve_root};
22use crate::schema::{
23    JobMeta, JobMetaJob, KillData, Response, RunData, SCHEMA_VERSION, StatusData, TailData,
24    WaitData,
25};
26
/// Options for the `serve` sub-command.
pub struct ServeOpts {
    /// Socket address to listen on, e.g. `127.0.0.1:19263`.
    pub bind: String,
    /// Optional override for the jobs root directory; `None` means the
    /// default location resolved by `jobstore::resolve_root`.
    pub root: Option<String>,
}
32
/// Per-request handler state, shared via `Arc` across all routes.
#[derive(Clone)]
struct AppState {
    // Mirrors `ServeOpts::root`; `None` means "use the default jobs root".
    root: Option<String>,
}
37
38/// Execute `serve`: start the HTTP server and block until shutdown.
39pub fn execute(opts: ServeOpts) -> Result<()> {
40    let rt = tokio::runtime::Runtime::new()?;
41    rt.block_on(async_main(opts))
42}
43
44async fn async_main(opts: ServeOpts) -> Result<()> {
45    let state = Arc::new(AppState { root: opts.root });
46
47    let router = Router::new()
48        .route("/health", get(health_handler))
49        .route("/exec", post(exec_handler))
50        .route("/status/{id}", get(status_handler))
51        .route("/tail/{id}", get(tail_handler))
52        .route("/wait/{id}", get(wait_handler))
53        .route("/kill/{id}", post(kill_handler))
54        .with_state(state);
55
56    let addr: std::net::SocketAddr = opts
57        .bind
58        .parse()
59        .map_err(|e| anyhow::anyhow!("invalid bind address '{}': {e}", opts.bind))?;
60
61    tracing::info!("serve listening on {addr}");
62    let listener = tokio::net::TcpListener::bind(addr).await?;
63    axum::serve(listener, router).await?;
64    Ok(())
65}
66
67// ---- Shared error/response helpers ----
68
69fn error_json(code: &str, message: &str) -> serde_json::Value {
70    serde_json::json!({
71        "schema_version": SCHEMA_VERSION,
72        "ok": false,
73        "type": "error",
74        "error": {
75            "code": code,
76            "message": message,
77            "retryable": false
78        }
79    })
80}
81
82fn err_resp(status: StatusCode, code: &str, message: &str) -> AxumResponse {
83    (status, Json(error_json(code, message))).into_response()
84}
85
86fn map_err_to_response(e: anyhow::Error) -> AxumResponse {
87    if e.downcast_ref::<JobNotFound>().is_some() {
88        err_resp(StatusCode::NOT_FOUND, "job_not_found", &format!("{e:#}"))
89    } else if e
90        .downcast_ref::<crate::jobstore::InvalidJobState>()
91        .is_some()
92    {
93        err_resp(StatusCode::BAD_REQUEST, "invalid_state", &format!("{e:#}"))
94    } else {
95        err_resp(
96            StatusCode::INTERNAL_SERVER_ERROR,
97            "internal_error",
98            &format!("{e:#}"),
99        )
100    }
101}
102
103// ---- GET /health ----
104
105async fn health_handler() -> impl IntoResponse {
106    let resp = serde_json::json!({
107        "schema_version": SCHEMA_VERSION,
108        "ok": true,
109        "type": "health"
110    });
111    (StatusCode::OK, Json(resp))
112}
113
114// ---- POST /exec ----
115
/// Request body for `POST /exec`. Every field is optional at the JSON level;
/// `command` is validated as required/non-empty by `exec_handler`.
#[derive(Deserialize)]
struct ExecRequest {
    // argv-style command; must be present and non-empty (checked in the handler).
    command: Option<Vec<String>>,
    // Working directory for the job; effective default comes from the run module.
    cwd: Option<String>,
    // Extra environment variables, flattened to KEY=VALUE pairs by the handler.
    env: Option<HashMap<String, String>>,
    // Job timeout in milliseconds; defaults to 0 — presumably "no timeout",
    // semantics defined by the run module (TODO confirm).
    timeout_ms: Option<u64>,
    // When true, the request blocks until the job reaches a terminal state.
    wait: Option<bool>,
}
124
125async fn exec_handler(State(state): State<Arc<AppState>>, request: Request) -> AxumResponse {
126    // Read body bytes manually to control error handling.
127    let body_bytes = match axum::body::to_bytes(request.into_body(), 1024 * 1024).await {
128        Ok(b) => b,
129        Err(_) => {
130            return err_resp(
131                StatusCode::BAD_REQUEST,
132                "invalid_request",
133                "failed to read request body",
134            );
135        }
136    };
137
138    if body_bytes.is_empty() {
139        return err_resp(
140            StatusCode::BAD_REQUEST,
141            "invalid_request",
142            "request body is required",
143        );
144    }
145
146    let req: ExecRequest = match serde_json::from_slice(&body_bytes) {
147        Ok(r) => r,
148        Err(e) => {
149            return err_resp(
150                StatusCode::BAD_REQUEST,
151                "invalid_request",
152                &format!("invalid JSON: {e}"),
153            );
154        }
155    };
156
157    let command = match req.command {
158        Some(c) if !c.is_empty() => c,
159        _ => {
160            return err_resp(
161                StatusCode::BAD_REQUEST,
162                "invalid_request",
163                "command field is required and must be non-empty",
164            );
165        }
166    };
167
168    let root_opt = state.root.clone();
169    let env_vars: Vec<String> = req
170        .env
171        .unwrap_or_default()
172        .into_iter()
173        .map(|(k, v)| format!("{k}={v}"))
174        .collect();
175    let cwd = req.cwd;
176    let timeout_ms = req.timeout_ms.unwrap_or(0);
177    let do_wait = req.wait.unwrap_or(false);
178
179    let result = tokio::task::spawn_blocking(move || {
180        run_exec_inner(
181            root_opt.as_deref(),
182            command,
183            cwd.as_deref(),
184            env_vars,
185            timeout_ms,
186            do_wait,
187        )
188    })
189    .await;
190
191    match result {
192        Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
193        Ok(Err(e)) => map_err_to_response(e),
194        Err(e) => err_resp(
195            StatusCode::INTERNAL_SERVER_ERROR,
196            "internal_error",
197            &format!("task error: {e}"),
198        ),
199    }
200}
201
/// Core logic for POST /exec: create and launch a job, return RunData as JSON value.
///
/// Flow: ensure the jobs root exists, mint a ULID job id, persist `JobMeta`,
/// pre-create the log files, spawn the supervisor process, then take a log
/// snapshot — first waiting for a terminal state when `do_wait` is true.
fn run_exec_inner(
    root: Option<&str>,
    command: Vec<String>,
    cwd: Option<&str>,
    env_vars: Vec<String>,
    timeout_ms: u64,
    do_wait: bool,
) -> Result<serde_json::Value> {
    use crate::run::{
        SnapshotWaitOpts, SpawnSupervisorParams, now_rfc3339_pub, pre_create_log_files,
        resolve_effective_cwd, run_snapshot_wait, spawn_supervisor_process,
    };

    // Total handler-side time (setup plus optional wait), reported as elapsed_ms.
    let elapsed_start = std::time::Instant::now();
    let resolved_root = resolve_root(root);
    std::fs::create_dir_all(&resolved_root)
        .map_err(|e| anyhow::anyhow!("create jobs root: {e}"))?;

    let job_id = ulid::Ulid::new().to_string();
    let created_at = now_rfc3339_pub();
    let effective_cwd = resolve_effective_cwd(cwd);
    let shell_wrapper = crate::config::default_shell_wrapper();

    // Keep only the variable names for the metadata record; the full
    // KEY=VALUE pairs travel separately in env_vars.
    // NOTE(review): `split('=')` always yields at least one item, so the
    // `unwrap_or` arm is effectively dead — harmless, kept as written.
    let env_keys: Vec<String> = env_vars
        .iter()
        .map(|kv| kv.split('=').next().unwrap_or(kv).to_string())
        .collect();

    // Features not exposed through the HTTP API (tags, masks, notifications,
    // kill_after, progress reporting) are pinned to inert defaults.
    let meta = JobMeta {
        job: JobMetaJob { id: job_id.clone() },
        schema_version: SCHEMA_VERSION.to_string(),
        command: command.clone(),
        created_at,
        root: resolved_root.display().to_string(),
        env_keys,
        env_vars: env_vars.clone(),
        env_vars_runtime: vec![],
        mask: vec![],
        cwd: Some(effective_cwd),
        notification: None,
        inherit_env: true,
        env_files: vec![],
        timeout_ms,
        kill_after_ms: 0,
        progress_every_ms: 0,
        shell_wrapper: Some(shell_wrapper.clone()),
        tags: vec![],
    };

    let job_dir = JobDir::create(&resolved_root, &job_id, &meta)?;
    // Create log files before the supervisor starts so tail/snapshot reads
    // never race against file creation.
    pre_create_log_files(&job_dir)?;

    spawn_supervisor_process(
        &job_dir,
        SpawnSupervisorParams {
            job_id: job_id.clone(),
            root: resolved_root.clone(),
            full_log_path: job_dir.full_log_path().display().to_string(),
            timeout_ms,
            kill_after_ms: 0,
            cwd: cwd.map(|s| s.to_string()),
            env_vars: env_vars.clone(),
            env_files: vec![],
            inherit_env: true,
            progress_every_ms: 0,
            notify_command: None,
            notify_file: None,
            shell_wrapper: shell_wrapper.clone(),
            command: command.clone(),
        },
    )?;

    let stdout_log_path = job_dir.stdout_path().display().to_string();
    let stderr_log_path = job_dir.stderr_path().display().to_string();

    // Snapshot tails are capped at 50 lines / 64 KiB per stream.
    // NOTE(review): wait_forever is unconditionally true — presumably only
    // consulted when `wait` is set; confirm against run_snapshot_wait.
    let (final_state, exit_code_opt, finished_at_opt, snapshot, final_snapshot_opt, waited_ms) =
        run_snapshot_wait(
            &job_dir,
            &SnapshotWaitOpts {
                snapshot_after: 0,
                tail_lines: 50,
                max_bytes: 65536,
                wait: do_wait,
                wait_poll_ms: 200,
                wait_until_ms: 0,
                wait_forever: true,
            },
        );

    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;

    // NOTE(review): tags/env_vars are returned empty even when env was
    // supplied in the request — confirm this matches the CLI's run output.
    let response = Response::new(
        "run",
        RunData {
            job_id,
            state: final_state,
            tags: vec![],
            env_vars: vec![],
            snapshot,
            stdout_log_path,
            stderr_log_path,
            waited_ms,
            elapsed_ms,
            exit_code: exit_code_opt,
            finished_at: finished_at_opt,
            final_snapshot: final_snapshot_opt,
        },
    );

    Ok(serde_json::to_value(&response)?)
}
314
315// ---- GET /status/:id ----
316
317async fn status_handler(
318    State(state): State<Arc<AppState>>,
319    Path(id): Path<String>,
320) -> AxumResponse {
321    let root_opt = state.root.clone();
322    let result = tokio::task::spawn_blocking(move || {
323        let root = resolve_root(root_opt.as_deref());
324        let job_dir = JobDir::open(&root, &id)?;
325        let meta = job_dir.read_meta()?;
326        let st = job_dir.read_state()?;
327        let response = Response::new(
328            "status",
329            StatusData {
330                job_id: id,
331                state: st.status().as_str().to_string(),
332                exit_code: st.exit_code(),
333                created_at: meta.created_at,
334                started_at: st.started_at().map(|s| s.to_string()),
335                finished_at: st.finished_at,
336            },
337        );
338        Ok::<_, anyhow::Error>(serde_json::to_value(&response)?)
339    })
340    .await;
341
342    match result {
343        Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
344        Ok(Err(e)) => map_err_to_response(e),
345        Err(e) => err_resp(
346            StatusCode::INTERNAL_SERVER_ERROR,
347            "internal_error",
348            &format!("task error: {e}"),
349        ),
350    }
351}
352
353// ---- GET /tail/:id ----
354
355async fn tail_handler(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> AxumResponse {
356    let root_opt = state.root.clone();
357    let result = tokio::task::spawn_blocking(move || {
358        let root = resolve_root(root_opt.as_deref());
359        let job_dir = JobDir::open(&root, &id)?;
360        let stdout_log_path = job_dir.stdout_path();
361        let stderr_log_path = job_dir.stderr_path();
362        let stdout = job_dir.read_tail_metrics("stdout.log", 50, 65536);
363        let stderr = job_dir.read_tail_metrics("stderr.log", 50, 65536);
364        let response = Response::new(
365            "tail",
366            TailData {
367                job_id: id,
368                stdout_tail: stdout.tail,
369                stderr_tail: stderr.tail,
370                truncated: stdout.truncated || stderr.truncated,
371                encoding: "utf-8-lossy".to_string(),
372                stdout_log_path: stdout_log_path.display().to_string(),
373                stderr_log_path: stderr_log_path.display().to_string(),
374                stdout_observed_bytes: stdout.observed_bytes,
375                stderr_observed_bytes: stderr.observed_bytes,
376                stdout_included_bytes: stdout.included_bytes,
377                stderr_included_bytes: stderr.included_bytes,
378            },
379        );
380        Ok::<_, anyhow::Error>(serde_json::to_value(&response)?)
381    })
382    .await;
383
384    match result {
385        Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
386        Ok(Err(e)) => map_err_to_response(e),
387        Err(e) => err_resp(
388            StatusCode::INTERNAL_SERVER_ERROR,
389            "internal_error",
390            &format!("task error: {e}"),
391        ),
392    }
393}
394
395// ---- GET /wait/:id ----
396
397async fn wait_handler(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> AxumResponse {
398    let root_opt = state.root.clone();
399    let result = tokio::task::spawn_blocking(move || {
400        let root = resolve_root(root_opt.as_deref());
401        let job_dir = JobDir::open(&root, &id)?;
402        let poll = std::time::Duration::from_millis(200);
403        loop {
404            let st = job_dir.read_state()?;
405            if !st.status().is_non_terminal() {
406                let response = Response::new(
407                    "wait",
408                    WaitData {
409                        job_id: id,
410                        state: st.status().as_str().to_string(),
411                        exit_code: st.exit_code(),
412                    },
413                );
414                return Ok::<_, anyhow::Error>(serde_json::to_value(&response)?);
415            }
416            std::thread::sleep(poll);
417        }
418    })
419    .await;
420
421    match result {
422        Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
423        Ok(Err(e)) => map_err_to_response(e),
424        Err(e) => err_resp(
425            StatusCode::INTERNAL_SERVER_ERROR,
426            "internal_error",
427            &format!("task error: {e}"),
428        ),
429    }
430}
431
432// ---- POST /kill/:id ----
433
434async fn kill_handler(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> AxumResponse {
435    let root_opt = state.root.clone();
436    let result = tokio::task::spawn_blocking(move || kill_inner(&id, root_opt.as_deref())).await;
437
438    match result {
439        Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
440        Ok(Err(e)) => map_err_to_response(e),
441        Err(e) => err_resp(
442            StatusCode::INTERNAL_SERVER_ERROR,
443            "internal_error",
444            &format!("task error: {e}"),
445        ),
446    }
447}
448
449/// Core logic for POST /kill/:id: send SIGTERM to the job process.
450fn kill_inner(job_id: &str, root: Option<&str>) -> Result<serde_json::Value> {
451    use crate::schema::JobStatus;
452
453    let resolved_root = resolve_root(root);
454    let job_dir = JobDir::open(&resolved_root, job_id)?;
455    let st = job_dir.read_state()?;
456
457    let signal = "TERM";
458
459    if *st.status() == JobStatus::Created {
460        return Err(anyhow::Error::new(crate::jobstore::InvalidJobState(
461            format!(
462                "job {} is in 'created' state and has not been started; cannot send signal",
463                job_id
464            ),
465        )));
466    }
467
468    if *st.status() == JobStatus::Running
469        && let Some(pid) = st.pid
470    {
471        send_term(pid);
472    }
473    // If already in a terminal state, it's a no-op (signal ignored gracefully).
474
475    let response = Response::new(
476        "kill",
477        KillData {
478            job_id: job_id.to_string(),
479            signal: signal.to_string(),
480        },
481    );
482    Ok(serde_json::to_value(&response)?)
483}
484
/// Send SIGTERM to the process group, falling back to the single process.
#[cfg(unix)]
fn send_term(pid: u32) {
    // Negative pid targets the whole process group per kill(2) semantics —
    // assumes the supervisor made the job its own group leader (TODO confirm
    // against the spawn path).
    let pgid = -(pid as libc::pid_t);
    // SAFETY: kill(2) takes plain integer arguments and touches no memory we
    // own; any invalid pid simply yields a nonzero return/errno.
    let ret = unsafe { libc::kill(pgid, libc::SIGTERM) };
    if ret != 0 {
        // Fallback: try single-process kill.
        // SAFETY: same as above — a plain syscall with integer arguments.
        unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
    }
}
495
#[cfg(not(unix))]
fn send_term(_pid: u32) {
    // Windows: not implemented for serve (Windows kill support is in kill.rs).
    // Intentionally a no-op so kill_inner can still return a well-formed
    // response on non-unix targets.
}