1use anyhow::Result;
10use axum::{
11 Json, Router,
12 extract::{Path, Request, State},
13 http::StatusCode,
14 response::{IntoResponse, Response as AxumResponse},
15 routing::{get, post},
16};
17use serde::Deserialize;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::jobstore::{JobDir, JobNotFound, resolve_root};
22use crate::schema::{
23 JobMeta, JobMetaJob, KillData, Response, RunData, SCHEMA_VERSION, StatusData, TailData,
24 WaitData,
25};
26
/// Options for the HTTP `serve` subcommand.
#[derive(Debug, Clone)]
pub struct ServeOpts {
    /// Listen address; `async_main` parses it as a `std::net::SocketAddr`
    /// (e.g. `127.0.0.1:8080`), so host names are rejected.
    pub bind: String,
    /// Optional jobs-root override, handed to `resolve_root` on every request.
    pub root: Option<String>,
}
32
/// Per-server state shared by every handler; `async_main` wraps it in an `Arc`.
#[derive(Clone)]
struct AppState {
    /// Jobs-root override copied from `ServeOpts::root`; each handler resolves it
    /// through `resolve_root` inside its blocking task.
    root: Option<String>,
}
37
38pub fn execute(opts: ServeOpts) -> Result<()> {
40 let rt = tokio::runtime::Runtime::new()?;
41 rt.block_on(async_main(opts))
42}
43
44async fn async_main(opts: ServeOpts) -> Result<()> {
45 let state = Arc::new(AppState { root: opts.root });
46
47 let router = Router::new()
48 .route("/health", get(health_handler))
49 .route("/exec", post(exec_handler))
50 .route("/status/{id}", get(status_handler))
51 .route("/tail/{id}", get(tail_handler))
52 .route("/wait/{id}", get(wait_handler))
53 .route("/kill/{id}", post(kill_handler))
54 .with_state(state);
55
56 let addr: std::net::SocketAddr = opts
57 .bind
58 .parse()
59 .map_err(|e| anyhow::anyhow!("invalid bind address '{}': {e}", opts.bind))?;
60
61 tracing::info!("serve listening on {addr}");
62 let listener = tokio::net::TcpListener::bind(addr).await?;
63 axum::serve(listener, router).await?;
64 Ok(())
65}
66
67fn error_json(code: &str, message: &str) -> serde_json::Value {
70 serde_json::json!({
71 "schema_version": SCHEMA_VERSION,
72 "ok": false,
73 "type": "error",
74 "error": {
75 "code": code,
76 "message": message,
77 "retryable": false
78 }
79 })
80}
81
82fn err_resp(status: StatusCode, code: &str, message: &str) -> AxumResponse {
83 (status, Json(error_json(code, message))).into_response()
84}
85
86fn map_err_to_response(e: anyhow::Error) -> AxumResponse {
87 if e.downcast_ref::<JobNotFound>().is_some() {
88 err_resp(StatusCode::NOT_FOUND, "job_not_found", &format!("{e:#}"))
89 } else if e
90 .downcast_ref::<crate::jobstore::InvalidJobState>()
91 .is_some()
92 {
93 err_resp(StatusCode::BAD_REQUEST, "invalid_state", &format!("{e:#}"))
94 } else {
95 err_resp(
96 StatusCode::INTERNAL_SERVER_ERROR,
97 "internal_error",
98 &format!("{e:#}"),
99 )
100 }
101}
102
103async fn health_handler() -> impl IntoResponse {
106 let resp = serde_json::json!({
107 "schema_version": SCHEMA_VERSION,
108 "ok": true,
109 "type": "health"
110 });
111 (StatusCode::OK, Json(resp))
112}
113
114#[derive(Deserialize)]
117struct ExecRequest {
118 command: Option<Vec<String>>,
119 cwd: Option<String>,
120 env: Option<HashMap<String, String>>,
121 timeout_ms: Option<u64>,
122 wait: Option<bool>,
123}
124
125async fn exec_handler(State(state): State<Arc<AppState>>, request: Request) -> AxumResponse {
126 let body_bytes = match axum::body::to_bytes(request.into_body(), 1024 * 1024).await {
128 Ok(b) => b,
129 Err(_) => {
130 return err_resp(
131 StatusCode::BAD_REQUEST,
132 "invalid_request",
133 "failed to read request body",
134 );
135 }
136 };
137
138 if body_bytes.is_empty() {
139 return err_resp(
140 StatusCode::BAD_REQUEST,
141 "invalid_request",
142 "request body is required",
143 );
144 }
145
146 let req: ExecRequest = match serde_json::from_slice(&body_bytes) {
147 Ok(r) => r,
148 Err(e) => {
149 return err_resp(
150 StatusCode::BAD_REQUEST,
151 "invalid_request",
152 &format!("invalid JSON: {e}"),
153 );
154 }
155 };
156
157 let command = match req.command {
158 Some(c) if !c.is_empty() => c,
159 _ => {
160 return err_resp(
161 StatusCode::BAD_REQUEST,
162 "invalid_request",
163 "command field is required and must be non-empty",
164 );
165 }
166 };
167
168 let root_opt = state.root.clone();
169 let env_vars: Vec<String> = req
170 .env
171 .unwrap_or_default()
172 .into_iter()
173 .map(|(k, v)| format!("{k}={v}"))
174 .collect();
175 let cwd = req.cwd;
176 let timeout_ms = req.timeout_ms.unwrap_or(0);
177 let do_wait = req.wait.unwrap_or(false);
178
179 let result = tokio::task::spawn_blocking(move || {
180 run_exec_inner(
181 root_opt.as_deref(),
182 command,
183 cwd.as_deref(),
184 env_vars,
185 timeout_ms,
186 do_wait,
187 )
188 })
189 .await;
190
191 match result {
192 Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
193 Ok(Err(e)) => map_err_to_response(e),
194 Err(e) => err_resp(
195 StatusCode::INTERNAL_SERVER_ERROR,
196 "internal_error",
197 &format!("task error: {e}"),
198 ),
199 }
200}
201
202fn run_exec_inner(
204 root: Option<&str>,
205 command: Vec<String>,
206 cwd: Option<&str>,
207 env_vars: Vec<String>,
208 timeout_ms: u64,
209 do_wait: bool,
210) -> Result<serde_json::Value> {
211 use crate::run::{
212 SnapshotWaitOpts, SpawnSupervisorParams, now_rfc3339_pub, pre_create_log_files,
213 resolve_effective_cwd, run_snapshot_wait, spawn_supervisor_process,
214 };
215
216 let elapsed_start = std::time::Instant::now();
217 let resolved_root = resolve_root(root);
218 std::fs::create_dir_all(&resolved_root)
219 .map_err(|e| anyhow::anyhow!("create jobs root: {e}"))?;
220
221 let job_id = ulid::Ulid::new().to_string();
222 let created_at = now_rfc3339_pub();
223 let effective_cwd = resolve_effective_cwd(cwd);
224 let shell_wrapper = crate::config::default_shell_wrapper();
225
226 let env_keys: Vec<String> = env_vars
227 .iter()
228 .map(|kv| kv.split('=').next().unwrap_or(kv).to_string())
229 .collect();
230
231 let meta = JobMeta {
232 job: JobMetaJob { id: job_id.clone() },
233 schema_version: SCHEMA_VERSION.to_string(),
234 command: command.clone(),
235 created_at,
236 root: resolved_root.display().to_string(),
237 env_keys,
238 env_vars: env_vars.clone(),
239 env_vars_runtime: vec![],
240 mask: vec![],
241 cwd: Some(effective_cwd),
242 notification: None,
243 inherit_env: true,
244 env_files: vec![],
245 timeout_ms,
246 kill_after_ms: 0,
247 progress_every_ms: 0,
248 shell_wrapper: Some(shell_wrapper.clone()),
249 tags: vec![],
250 };
251
252 let job_dir = JobDir::create(&resolved_root, &job_id, &meta)?;
253 pre_create_log_files(&job_dir)?;
254
255 spawn_supervisor_process(
256 &job_dir,
257 SpawnSupervisorParams {
258 job_id: job_id.clone(),
259 root: resolved_root.clone(),
260 full_log_path: job_dir.full_log_path().display().to_string(),
261 timeout_ms,
262 kill_after_ms: 0,
263 cwd: cwd.map(|s| s.to_string()),
264 env_vars: env_vars.clone(),
265 env_files: vec![],
266 inherit_env: true,
267 progress_every_ms: 0,
268 notify_command: None,
269 notify_file: None,
270 shell_wrapper: shell_wrapper.clone(),
271 command: command.clone(),
272 },
273 )?;
274
275 let stdout_log_path = job_dir.stdout_path().display().to_string();
276 let stderr_log_path = job_dir.stderr_path().display().to_string();
277
278 let (final_state, exit_code_opt, finished_at_opt, snapshot, final_snapshot_opt, waited_ms) =
279 run_snapshot_wait(
280 &job_dir,
281 &SnapshotWaitOpts {
282 snapshot_after: 0,
283 tail_lines: 50,
284 max_bytes: 65536,
285 wait: do_wait,
286 wait_poll_ms: 200,
287 wait_until_ms: 0,
288 wait_forever: true,
289 },
290 );
291
292 let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
293
294 let response = Response::new(
295 "run",
296 RunData {
297 job_id,
298 state: final_state,
299 tags: vec![],
300 env_vars: vec![],
301 snapshot,
302 stdout_log_path,
303 stderr_log_path,
304 waited_ms,
305 elapsed_ms,
306 exit_code: exit_code_opt,
307 finished_at: finished_at_opt,
308 final_snapshot: final_snapshot_opt,
309 },
310 );
311
312 Ok(serde_json::to_value(&response)?)
313}
314
315async fn status_handler(
318 State(state): State<Arc<AppState>>,
319 Path(id): Path<String>,
320) -> AxumResponse {
321 let root_opt = state.root.clone();
322 let result = tokio::task::spawn_blocking(move || {
323 let root = resolve_root(root_opt.as_deref());
324 let job_dir = JobDir::open(&root, &id)?;
325 let meta = job_dir.read_meta()?;
326 let st = job_dir.read_state()?;
327 let response = Response::new(
328 "status",
329 StatusData {
330 job_id: id,
331 state: st.status().as_str().to_string(),
332 exit_code: st.exit_code(),
333 created_at: meta.created_at,
334 started_at: st.started_at().map(|s| s.to_string()),
335 finished_at: st.finished_at,
336 },
337 );
338 Ok::<_, anyhow::Error>(serde_json::to_value(&response)?)
339 })
340 .await;
341
342 match result {
343 Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
344 Ok(Err(e)) => map_err_to_response(e),
345 Err(e) => err_resp(
346 StatusCode::INTERNAL_SERVER_ERROR,
347 "internal_error",
348 &format!("task error: {e}"),
349 ),
350 }
351}
352
353async fn tail_handler(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> AxumResponse {
356 let root_opt = state.root.clone();
357 let result = tokio::task::spawn_blocking(move || {
358 let root = resolve_root(root_opt.as_deref());
359 let job_dir = JobDir::open(&root, &id)?;
360 let stdout_log_path = job_dir.stdout_path();
361 let stderr_log_path = job_dir.stderr_path();
362 let stdout = job_dir.read_tail_metrics("stdout.log", 50, 65536);
363 let stderr = job_dir.read_tail_metrics("stderr.log", 50, 65536);
364 let response = Response::new(
365 "tail",
366 TailData {
367 job_id: id,
368 stdout_tail: stdout.tail,
369 stderr_tail: stderr.tail,
370 truncated: stdout.truncated || stderr.truncated,
371 encoding: "utf-8-lossy".to_string(),
372 stdout_log_path: stdout_log_path.display().to_string(),
373 stderr_log_path: stderr_log_path.display().to_string(),
374 stdout_observed_bytes: stdout.observed_bytes,
375 stderr_observed_bytes: stderr.observed_bytes,
376 stdout_included_bytes: stdout.included_bytes,
377 stderr_included_bytes: stderr.included_bytes,
378 },
379 );
380 Ok::<_, anyhow::Error>(serde_json::to_value(&response)?)
381 })
382 .await;
383
384 match result {
385 Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
386 Ok(Err(e)) => map_err_to_response(e),
387 Err(e) => err_resp(
388 StatusCode::INTERNAL_SERVER_ERROR,
389 "internal_error",
390 &format!("task error: {e}"),
391 ),
392 }
393}
394
395async fn wait_handler(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> AxumResponse {
398 let root_opt = state.root.clone();
399 let result = tokio::task::spawn_blocking(move || {
400 let root = resolve_root(root_opt.as_deref());
401 let job_dir = JobDir::open(&root, &id)?;
402 let poll = std::time::Duration::from_millis(200);
403 loop {
404 let st = job_dir.read_state()?;
405 if !st.status().is_non_terminal() {
406 let response = Response::new(
407 "wait",
408 WaitData {
409 job_id: id,
410 state: st.status().as_str().to_string(),
411 exit_code: st.exit_code(),
412 },
413 );
414 return Ok::<_, anyhow::Error>(serde_json::to_value(&response)?);
415 }
416 std::thread::sleep(poll);
417 }
418 })
419 .await;
420
421 match result {
422 Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
423 Ok(Err(e)) => map_err_to_response(e),
424 Err(e) => err_resp(
425 StatusCode::INTERNAL_SERVER_ERROR,
426 "internal_error",
427 &format!("task error: {e}"),
428 ),
429 }
430}
431
432async fn kill_handler(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> AxumResponse {
435 let root_opt = state.root.clone();
436 let result = tokio::task::spawn_blocking(move || kill_inner(&id, root_opt.as_deref())).await;
437
438 match result {
439 Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
440 Ok(Err(e)) => map_err_to_response(e),
441 Err(e) => err_resp(
442 StatusCode::INTERNAL_SERVER_ERROR,
443 "internal_error",
444 &format!("task error: {e}"),
445 ),
446 }
447}
448
449fn kill_inner(job_id: &str, root: Option<&str>) -> Result<serde_json::Value> {
451 use crate::schema::JobStatus;
452
453 let resolved_root = resolve_root(root);
454 let job_dir = JobDir::open(&resolved_root, job_id)?;
455 let st = job_dir.read_state()?;
456
457 let signal = "TERM";
458
459 if *st.status() == JobStatus::Created {
460 return Err(anyhow::Error::new(crate::jobstore::InvalidJobState(
461 format!(
462 "job {} is in 'created' state and has not been started; cannot send signal",
463 job_id
464 ),
465 )));
466 }
467
468 if *st.status() == JobStatus::Running
469 && let Some(pid) = st.pid
470 {
471 send_term(pid);
472 }
473 let response = Response::new(
476 "kill",
477 KillData {
478 job_id: job_id.to_string(),
479 signal: signal.to_string(),
480 },
481 );
482 Ok(serde_json::to_value(&response)?)
483}
484
485#[cfg(unix)]
487fn send_term(pid: u32) {
488 let pgid = -(pid as libc::pid_t);
489 let ret = unsafe { libc::kill(pgid, libc::SIGTERM) };
490 if ret != 0 {
491 unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
493 }
494}
495
/// Non-Unix fallback for sending SIGTERM: no signalling mechanism is implemented on
/// these targets, so the request is ignored.
// NOTE(review): `kill_inner` still reports `"TERM"` to the client on these platforms
// even though nothing is sent.
#[cfg(not(unix))]
fn send_term(pid: u32) {
    let _ = pid;
}