// agent_exec/serve.rs
//! Implementation of the `serve` sub-command.
//!
//! Starts an HTTP server that exposes job operations as REST endpoints.
//! Endpoints mirror the existing CLI subcommands.
//!
//! Default bind address: `127.0.0.1:19263` (localhost only).
//! Use `--bind 0.0.0.0:19263` to expose externally (requires network access control).

use anyhow::Result;
use axum::{
    Json, Router,
    extract::{Path, Request, State},
    http::StatusCode,
    response::{IntoResponse, Response as AxumResponse},
    routing::{get, post},
};
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Arc;

use crate::jobstore::{JobDir, JobNotFound, resolve_root};
use crate::schema::{
    JobMeta, JobMetaJob, KillData, Response, RunData, SCHEMA_VERSION, StatusData, TailData,
    WaitData,
};
/// Options for the `serve` sub-command.
pub struct ServeOpts {
    // Socket address string to bind, e.g. "127.0.0.1:19263"; parsed in `async_main`,
    // which rejects the request with a descriptive error if it is malformed.
    pub bind: String,
    // Optional override for the jobs root directory; `None` defers to `resolve_root`.
    pub root: Option<String>,
}

/// Shared per-server state handed to every handler via axum's `State` extractor.
#[derive(Clone)]
struct AppState {
    // Same value as `ServeOpts::root`; passed to `resolve_root` on each request.
    root: Option<String>,
}

38/// Execute `serve`: start the HTTP server and block until shutdown.
39pub fn execute(opts: ServeOpts) -> Result<()> {
40    let rt = tokio::runtime::Runtime::new()?;
41    rt.block_on(async_main(opts))
42}
43
44async fn async_main(opts: ServeOpts) -> Result<()> {
45    let state = Arc::new(AppState { root: opts.root });
46
47    let router = Router::new()
48        .route("/health", get(health_handler))
49        .route("/exec", post(exec_handler))
50        .route("/status/{id}", get(status_handler))
51        .route("/tail/{id}", get(tail_handler))
52        .route("/wait/{id}", get(wait_handler))
53        .route("/kill/{id}", post(kill_handler))
54        .with_state(state);
55
56    let addr: std::net::SocketAddr = opts
57        .bind
58        .parse()
59        .map_err(|e| anyhow::anyhow!("invalid bind address '{}': {e}", opts.bind))?;
60
61    tracing::info!("serve listening on {addr}");
62    let listener = tokio::net::TcpListener::bind(addr).await?;
63    axum::serve(listener, router).await?;
64    Ok(())
65}
66
67// ---- Shared error/response helpers ----
68
69fn error_json(code: &str, message: &str) -> serde_json::Value {
70    serde_json::json!({
71        "schema_version": SCHEMA_VERSION,
72        "ok": false,
73        "type": "error",
74        "error": {
75            "code": code,
76            "message": message,
77            "retryable": false
78        }
79    })
80}
81
82fn err_resp(status: StatusCode, code: &str, message: &str) -> AxumResponse {
83    (status, Json(error_json(code, message))).into_response()
84}
85
86fn map_err_to_response(e: anyhow::Error) -> AxumResponse {
87    if e.downcast_ref::<JobNotFound>().is_some() {
88        err_resp(StatusCode::NOT_FOUND, "job_not_found", &format!("{e:#}"))
89    } else if e
90        .downcast_ref::<crate::jobstore::InvalidJobState>()
91        .is_some()
92    {
93        err_resp(StatusCode::BAD_REQUEST, "invalid_state", &format!("{e:#}"))
94    } else {
95        err_resp(
96            StatusCode::INTERNAL_SERVER_ERROR,
97            "internal_error",
98            &format!("{e:#}"),
99        )
100    }
101}
102
103// ---- GET /health ----
104
105async fn health_handler() -> impl IntoResponse {
106    let resp = serde_json::json!({
107        "schema_version": SCHEMA_VERSION,
108        "ok": true,
109        "type": "health"
110    });
111    (StatusCode::OK, Json(resp))
112}
113
// ---- POST /exec ----

/// JSON request body for POST /exec. Every field is optional at the
/// deserialization layer; `command` is validated as required by the handler.
#[derive(Deserialize)]
struct ExecRequest {
    // Argv-style command; must be present and non-empty (enforced in `exec_handler`).
    command: Option<Vec<String>>,
    // Working directory for the job; when absent, `run_exec_inner` resolves a default.
    cwd: Option<String>,
    // Extra environment variables; flattened to KEY=VALUE strings downstream.
    env: Option<HashMap<String, String>>,
    // Job timeout in milliseconds; defaults to 0 — presumably "no timeout",
    // TODO confirm against the supervisor in run.rs.
    timeout_ms: Option<u64>,
    // When true, the request blocks until the job reaches a terminal state.
    wait: Option<bool>,
}

125async fn exec_handler(State(state): State<Arc<AppState>>, request: Request) -> AxumResponse {
126    // Read body bytes manually to control error handling.
127    let body_bytes = match axum::body::to_bytes(request.into_body(), 1024 * 1024).await {
128        Ok(b) => b,
129        Err(_) => {
130            return err_resp(
131                StatusCode::BAD_REQUEST,
132                "invalid_request",
133                "failed to read request body",
134            );
135        }
136    };
137
138    if body_bytes.is_empty() {
139        return err_resp(
140            StatusCode::BAD_REQUEST,
141            "invalid_request",
142            "request body is required",
143        );
144    }
145
146    let req: ExecRequest = match serde_json::from_slice(&body_bytes) {
147        Ok(r) => r,
148        Err(e) => {
149            return err_resp(
150                StatusCode::BAD_REQUEST,
151                "invalid_request",
152                &format!("invalid JSON: {e}"),
153            );
154        }
155    };
156
157    let command = match req.command {
158        Some(c) if !c.is_empty() => c,
159        _ => {
160            return err_resp(
161                StatusCode::BAD_REQUEST,
162                "invalid_request",
163                "command field is required and must be non-empty",
164            );
165        }
166    };
167
168    let root_opt = state.root.clone();
169    let env_vars: Vec<String> = req
170        .env
171        .unwrap_or_default()
172        .into_iter()
173        .map(|(k, v)| format!("{k}={v}"))
174        .collect();
175    let cwd = req.cwd;
176    let timeout_ms = req.timeout_ms.unwrap_or(0);
177    let do_wait = req.wait.unwrap_or(false);
178
179    let result = tokio::task::spawn_blocking(move || {
180        run_exec_inner(
181            root_opt.as_deref(),
182            command,
183            cwd.as_deref(),
184            env_vars,
185            timeout_ms,
186            do_wait,
187        )
188    })
189    .await;
190
191    match result {
192        Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
193        Ok(Err(e)) => map_err_to_response(e),
194        Err(e) => err_resp(
195            StatusCode::INTERNAL_SERVER_ERROR,
196            "internal_error",
197            &format!("task error: {e}"),
198        ),
199    }
200}
201
/// Core logic for POST /exec: create and launch a job, return RunData as JSON value.
///
/// Sequence: resolve the jobs root, mint a ULID job id, persist job metadata,
/// pre-create the log files, spawn the detached supervisor process, then take
/// an output snapshot (optionally waiting for the job to finish first).
fn run_exec_inner(
    root: Option<&str>,
    command: Vec<String>,
    cwd: Option<&str>,
    env_vars: Vec<String>,
    timeout_ms: u64,
    do_wait: bool,
) -> Result<serde_json::Value> {
    use crate::run::{
        SnapshotWaitOpts, SpawnSupervisorParams, now_rfc3339_pub, pre_create_log_files,
        resolve_effective_cwd, run_snapshot_wait, spawn_supervisor_process,
    };

    // Total handler-side time, reported as `elapsed_ms` in the response.
    let elapsed_start = std::time::Instant::now();
    let resolved_root = resolve_root(root);
    std::fs::create_dir_all(&resolved_root)
        .map_err(|e| anyhow::anyhow!("create jobs root: {e}"))?;

    let job_id = ulid::Ulid::new().to_string();
    let created_at = now_rfc3339_pub();
    let effective_cwd = resolve_effective_cwd(cwd);
    let shell_wrapper = crate::config::default_shell_wrapper();

    // Keep only the KEY part of each KEY=VALUE entry for the metadata record.
    let env_keys: Vec<String> = env_vars
        .iter()
        .map(|kv| kv.split('=').next().unwrap_or(kv).to_string())
        .collect();

    // Features not exposed over HTTP (tags, masks, notifications, env files,
    // kill-after, progress reporting) get neutral defaults here.
    let meta = JobMeta {
        job: JobMetaJob { id: job_id.clone() },
        schema_version: SCHEMA_VERSION.to_string(),
        command: command.clone(),
        created_at,
        root: resolved_root.display().to_string(),
        env_keys,
        env_vars: env_vars.clone(),
        env_vars_runtime: vec![],
        mask: vec![],
        cwd: Some(effective_cwd),
        notification: None,
        inherit_env: true,
        env_files: vec![],
        timeout_ms,
        kill_after_ms: 0,
        progress_every_ms: 0,
        shell_wrapper: Some(shell_wrapper.clone()),
        tags: vec![],
    };

    // Persist metadata and make sure the log files exist before the
    // supervisor starts writing to them.
    let job_dir = JobDir::create(&resolved_root, &job_id, &meta)?;
    pre_create_log_files(&job_dir)?;

    spawn_supervisor_process(
        &job_dir,
        SpawnSupervisorParams {
            job_id: job_id.clone(),
            root: resolved_root.clone(),
            full_log_path: job_dir.full_log_path().display().to_string(),
            timeout_ms,
            kill_after_ms: 0,
            cwd: cwd.map(|s| s.to_string()),
            env_vars: env_vars.clone(),
            env_files: vec![],
            inherit_env: true,
            progress_every_ms: 0,
            notify_command: None,
            notify_file: None,
            shell_wrapper: shell_wrapper.clone(),
            command: command.clone(),
        },
    )?;

    let stdout_log_path = job_dir.stdout_path().display().to_string();
    let stderr_log_path = job_dir.stderr_path().display().to_string();

    // Snapshot the last 50 lines (up to 64 KiB); when `do_wait` is set this
    // polls every 200 ms until the job reaches a terminal state.
    let (final_state, exit_code_opt, finished_at_opt, snapshot, final_snapshot_opt, waited_ms) =
        run_snapshot_wait(
            &job_dir,
            &SnapshotWaitOpts {
                snapshot_after: 0,
                tail_lines: 50,
                max_bytes: 65536,
                wait: do_wait,
                wait_poll_ms: 200,
            },
        );

    let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;

    let response = Response::new(
        "run",
        RunData {
            job_id,
            state: final_state,
            tags: vec![],
            env_vars: vec![],
            snapshot,
            stdout_log_path,
            stderr_log_path,
            waited_ms,
            elapsed_ms,
            exit_code: exit_code_opt,
            finished_at: finished_at_opt,
            final_snapshot: final_snapshot_opt,
        },
    );

    Ok(serde_json::to_value(&response)?)
}

313// ---- GET /status/:id ----
314
315async fn status_handler(
316    State(state): State<Arc<AppState>>,
317    Path(id): Path<String>,
318) -> AxumResponse {
319    let root_opt = state.root.clone();
320    let result = tokio::task::spawn_blocking(move || {
321        let root = resolve_root(root_opt.as_deref());
322        let job_dir = JobDir::open(&root, &id)?;
323        let meta = job_dir.read_meta()?;
324        let st = job_dir.read_state()?;
325        let response = Response::new(
326            "status",
327            StatusData {
328                job_id: id,
329                state: st.status().as_str().to_string(),
330                exit_code: st.exit_code(),
331                created_at: meta.created_at,
332                started_at: st.started_at().map(|s| s.to_string()),
333                finished_at: st.finished_at,
334            },
335        );
336        Ok::<_, anyhow::Error>(serde_json::to_value(&response)?)
337    })
338    .await;
339
340    match result {
341        Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
342        Ok(Err(e)) => map_err_to_response(e),
343        Err(e) => err_resp(
344            StatusCode::INTERNAL_SERVER_ERROR,
345            "internal_error",
346            &format!("task error: {e}"),
347        ),
348    }
349}
350
351// ---- GET /tail/:id ----
352
353async fn tail_handler(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> AxumResponse {
354    let root_opt = state.root.clone();
355    let result = tokio::task::spawn_blocking(move || {
356        let root = resolve_root(root_opt.as_deref());
357        let job_dir = JobDir::open(&root, &id)?;
358        let stdout_log_path = job_dir.stdout_path();
359        let stderr_log_path = job_dir.stderr_path();
360        let stdout = job_dir.read_tail_metrics("stdout.log", 50, 65536);
361        let stderr = job_dir.read_tail_metrics("stderr.log", 50, 65536);
362        let response = Response::new(
363            "tail",
364            TailData {
365                job_id: id,
366                stdout_tail: stdout.tail,
367                stderr_tail: stderr.tail,
368                truncated: stdout.truncated || stderr.truncated,
369                encoding: "utf-8-lossy".to_string(),
370                stdout_log_path: stdout_log_path.display().to_string(),
371                stderr_log_path: stderr_log_path.display().to_string(),
372                stdout_observed_bytes: stdout.observed_bytes,
373                stderr_observed_bytes: stderr.observed_bytes,
374                stdout_included_bytes: stdout.included_bytes,
375                stderr_included_bytes: stderr.included_bytes,
376            },
377        );
378        Ok::<_, anyhow::Error>(serde_json::to_value(&response)?)
379    })
380    .await;
381
382    match result {
383        Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
384        Ok(Err(e)) => map_err_to_response(e),
385        Err(e) => err_resp(
386            StatusCode::INTERNAL_SERVER_ERROR,
387            "internal_error",
388            &format!("task error: {e}"),
389        ),
390    }
391}
392
393// ---- GET /wait/:id ----
394
395async fn wait_handler(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> AxumResponse {
396    let root_opt = state.root.clone();
397    let result = tokio::task::spawn_blocking(move || {
398        let root = resolve_root(root_opt.as_deref());
399        let job_dir = JobDir::open(&root, &id)?;
400        let poll = std::time::Duration::from_millis(200);
401        loop {
402            let st = job_dir.read_state()?;
403            if !st.status().is_non_terminal() {
404                let response = Response::new(
405                    "wait",
406                    WaitData {
407                        job_id: id,
408                        state: st.status().as_str().to_string(),
409                        exit_code: st.exit_code(),
410                    },
411                );
412                return Ok::<_, anyhow::Error>(serde_json::to_value(&response)?);
413            }
414            std::thread::sleep(poll);
415        }
416    })
417    .await;
418
419    match result {
420        Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
421        Ok(Err(e)) => map_err_to_response(e),
422        Err(e) => err_resp(
423            StatusCode::INTERNAL_SERVER_ERROR,
424            "internal_error",
425            &format!("task error: {e}"),
426        ),
427    }
428}
429
430// ---- POST /kill/:id ----
431
432async fn kill_handler(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> AxumResponse {
433    let root_opt = state.root.clone();
434    let result = tokio::task::spawn_blocking(move || kill_inner(&id, root_opt.as_deref())).await;
435
436    match result {
437        Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
438        Ok(Err(e)) => map_err_to_response(e),
439        Err(e) => err_resp(
440            StatusCode::INTERNAL_SERVER_ERROR,
441            "internal_error",
442            &format!("task error: {e}"),
443        ),
444    }
445}
446
447/// Core logic for POST /kill/:id: send SIGTERM to the job process.
448fn kill_inner(job_id: &str, root: Option<&str>) -> Result<serde_json::Value> {
449    use crate::schema::JobStatus;
450
451    let resolved_root = resolve_root(root);
452    let job_dir = JobDir::open(&resolved_root, job_id)?;
453    let st = job_dir.read_state()?;
454
455    let signal = "TERM";
456
457    if *st.status() == JobStatus::Created {
458        return Err(anyhow::Error::new(crate::jobstore::InvalidJobState(
459            format!(
460                "job {} is in 'created' state and has not been started; cannot send signal",
461                job_id
462            ),
463        )));
464    }
465
466    if *st.status() == JobStatus::Running
467        && let Some(pid) = st.pid
468    {
469        send_term(pid);
470    }
471    // If already in a terminal state, it's a no-op (signal ignored gracefully).
472
473    let response = Response::new(
474        "kill",
475        KillData {
476            job_id: job_id.to_string(),
477            signal: signal.to_string(),
478        },
479    );
480    Ok(serde_json::to_value(&response)?)
481}
482
483/// Send SIGTERM to the process group, falling back to the single process.
484#[cfg(unix)]
485fn send_term(pid: u32) {
486    let pgid = -(pid as libc::pid_t);
487    let ret = unsafe { libc::kill(pgid, libc::SIGTERM) };
488    if ret != 0 {
489        // Fallback: try single-process kill.
490        unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
491    }
492}
493
/// No-op stub on non-Unix targets.
#[cfg(not(unix))]
fn send_term(_pid: u32) {
    // Windows: not implemented for serve (Windows kill support is in kill.rs).
}