1use anyhow::Result;
10use axum::{
11 Json, Router,
12 extract::{Path, Request, State},
13 http::StatusCode,
14 response::{IntoResponse, Response as AxumResponse},
15 routing::{get, post},
16};
17use serde::Deserialize;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::jobstore::{JobDir, JobNotFound, resolve_root};
22use crate::schema::{
23 JobMeta, JobMetaJob, KillData, Response, RunData, SCHEMA_VERSION, StatusData, TailData,
24 WaitData,
25};
26
/// Options for starting the HTTP server.
pub struct ServeOpts {
    /// Address to listen on; parsed as a `std::net::SocketAddr` at startup
    /// (for example `127.0.0.1:8080`).
    pub bind: String,
    /// Optional jobs-root override. It is stored in the shared handler state
    /// and handed to `resolve_root` by every request handler.
    pub root: Option<String>,
}
32
/// State shared with every request handler (wrapped in an `Arc` by the router).
#[derive(Clone)]
struct AppState {
    /// Jobs-root override taken from `ServeOpts::root`; `None` leaves the
    /// choice to `resolve_root`.
    root: Option<String>,
}
37
38pub fn execute(opts: ServeOpts) -> Result<()> {
40 let rt = tokio::runtime::Runtime::new()?;
41 rt.block_on(async_main(opts))
42}
43
44async fn async_main(opts: ServeOpts) -> Result<()> {
45 let state = Arc::new(AppState { root: opts.root });
46
47 let router = Router::new()
48 .route("/health", get(health_handler))
49 .route("/exec", post(exec_handler))
50 .route("/status/{id}", get(status_handler))
51 .route("/tail/{id}", get(tail_handler))
52 .route("/wait/{id}", get(wait_handler))
53 .route("/kill/{id}", post(kill_handler))
54 .with_state(state);
55
56 let addr: std::net::SocketAddr = opts
57 .bind
58 .parse()
59 .map_err(|e| anyhow::anyhow!("invalid bind address '{}': {e}", opts.bind))?;
60
61 tracing::info!("serve listening on {addr}");
62 let listener = tokio::net::TcpListener::bind(addr).await?;
63 axum::serve(listener, router).await?;
64 Ok(())
65}
66
67fn error_json(code: &str, message: &str) -> serde_json::Value {
70 serde_json::json!({
71 "schema_version": SCHEMA_VERSION,
72 "ok": false,
73 "type": "error",
74 "error": {
75 "code": code,
76 "message": message,
77 "retryable": false
78 }
79 })
80}
81
82fn err_resp(status: StatusCode, code: &str, message: &str) -> AxumResponse {
83 (status, Json(error_json(code, message))).into_response()
84}
85
86fn map_err_to_response(e: anyhow::Error) -> AxumResponse {
87 if e.downcast_ref::<JobNotFound>().is_some() {
88 err_resp(StatusCode::NOT_FOUND, "job_not_found", &format!("{e:#}"))
89 } else if e
90 .downcast_ref::<crate::jobstore::InvalidJobState>()
91 .is_some()
92 {
93 err_resp(StatusCode::BAD_REQUEST, "invalid_state", &format!("{e:#}"))
94 } else {
95 err_resp(
96 StatusCode::INTERNAL_SERVER_ERROR,
97 "internal_error",
98 &format!("{e:#}"),
99 )
100 }
101}
102
103async fn health_handler() -> impl IntoResponse {
106 let resp = serde_json::json!({
107 "schema_version": SCHEMA_VERSION,
108 "ok": true,
109 "type": "health"
110 });
111 (StatusCode::OK, Json(resp))
112}
113
114#[derive(Deserialize)]
117struct ExecRequest {
118 command: Option<Vec<String>>,
119 cwd: Option<String>,
120 env: Option<HashMap<String, String>>,
121 timeout_ms: Option<u64>,
122 wait: Option<bool>,
123}
124
125async fn exec_handler(State(state): State<Arc<AppState>>, request: Request) -> AxumResponse {
126 let body_bytes = match axum::body::to_bytes(request.into_body(), 1024 * 1024).await {
128 Ok(b) => b,
129 Err(_) => {
130 return err_resp(
131 StatusCode::BAD_REQUEST,
132 "invalid_request",
133 "failed to read request body",
134 );
135 }
136 };
137
138 if body_bytes.is_empty() {
139 return err_resp(
140 StatusCode::BAD_REQUEST,
141 "invalid_request",
142 "request body is required",
143 );
144 }
145
146 let req: ExecRequest = match serde_json::from_slice(&body_bytes) {
147 Ok(r) => r,
148 Err(e) => {
149 return err_resp(
150 StatusCode::BAD_REQUEST,
151 "invalid_request",
152 &format!("invalid JSON: {e}"),
153 );
154 }
155 };
156
157 let command = match req.command {
158 Some(c) if !c.is_empty() => c,
159 _ => {
160 return err_resp(
161 StatusCode::BAD_REQUEST,
162 "invalid_request",
163 "command field is required and must be non-empty",
164 );
165 }
166 };
167
168 let root_opt = state.root.clone();
169 let env_vars: Vec<String> = req
170 .env
171 .unwrap_or_default()
172 .into_iter()
173 .map(|(k, v)| format!("{k}={v}"))
174 .collect();
175 let cwd = req.cwd;
176 let timeout_ms = req.timeout_ms.unwrap_or(0);
177 let do_wait = req.wait.unwrap_or(false);
178
179 let result = tokio::task::spawn_blocking(move || {
180 run_exec_inner(
181 root_opt.as_deref(),
182 command,
183 cwd.as_deref(),
184 env_vars,
185 timeout_ms,
186 do_wait,
187 )
188 })
189 .await;
190
191 match result {
192 Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
193 Ok(Err(e)) => map_err_to_response(e),
194 Err(e) => err_resp(
195 StatusCode::INTERNAL_SERVER_ERROR,
196 "internal_error",
197 &format!("task error: {e}"),
198 ),
199 }
200}
201
202fn run_exec_inner(
204 root: Option<&str>,
205 command: Vec<String>,
206 cwd: Option<&str>,
207 env_vars: Vec<String>,
208 timeout_ms: u64,
209 do_wait: bool,
210) -> Result<serde_json::Value> {
211 use crate::run::{
212 SnapshotWaitOpts, SpawnSupervisorParams, now_rfc3339_pub, pre_create_log_files,
213 resolve_effective_cwd, run_snapshot_wait, spawn_supervisor_process,
214 };
215
216 let elapsed_start = std::time::Instant::now();
217 let resolved_root = resolve_root(root);
218 std::fs::create_dir_all(&resolved_root)
219 .map_err(|e| anyhow::anyhow!("create jobs root: {e}"))?;
220
221 let job_id = ulid::Ulid::new().to_string();
222 let created_at = now_rfc3339_pub();
223 let effective_cwd = resolve_effective_cwd(cwd);
224 let shell_wrapper = crate::config::default_shell_wrapper();
225
226 let env_keys: Vec<String> = env_vars
227 .iter()
228 .map(|kv| kv.split('=').next().unwrap_or(kv).to_string())
229 .collect();
230
231 let meta = JobMeta {
232 job: JobMetaJob { id: job_id.clone() },
233 schema_version: SCHEMA_VERSION.to_string(),
234 command: command.clone(),
235 created_at,
236 root: resolved_root.display().to_string(),
237 env_keys,
238 env_vars: env_vars.clone(),
239 env_vars_runtime: vec![],
240 mask: vec![],
241 cwd: Some(effective_cwd),
242 notification: None,
243 inherit_env: true,
244 env_files: vec![],
245 timeout_ms,
246 kill_after_ms: 0,
247 progress_every_ms: 0,
248 shell_wrapper: Some(shell_wrapper.clone()),
249 tags: vec![],
250 };
251
252 let job_dir = JobDir::create(&resolved_root, &job_id, &meta)?;
253 pre_create_log_files(&job_dir)?;
254
255 spawn_supervisor_process(
256 &job_dir,
257 SpawnSupervisorParams {
258 job_id: job_id.clone(),
259 root: resolved_root.clone(),
260 full_log_path: job_dir.full_log_path().display().to_string(),
261 timeout_ms,
262 kill_after_ms: 0,
263 cwd: cwd.map(|s| s.to_string()),
264 env_vars: env_vars.clone(),
265 env_files: vec![],
266 inherit_env: true,
267 progress_every_ms: 0,
268 notify_command: None,
269 notify_file: None,
270 shell_wrapper: shell_wrapper.clone(),
271 command: command.clone(),
272 },
273 )?;
274
275 let stdout_log_path = job_dir.stdout_path().display().to_string();
276 let stderr_log_path = job_dir.stderr_path().display().to_string();
277
278 let (final_state, exit_code_opt, finished_at_opt, snapshot, final_snapshot_opt, waited_ms) =
279 run_snapshot_wait(
280 &job_dir,
281 &SnapshotWaitOpts {
282 snapshot_after: 0,
283 tail_lines: 50,
284 max_bytes: 65536,
285 wait: do_wait,
286 wait_poll_ms: 200,
287 },
288 );
289
290 let elapsed_ms = elapsed_start.elapsed().as_millis() as u64;
291
292 let response = Response::new(
293 "run",
294 RunData {
295 job_id,
296 state: final_state,
297 tags: vec![],
298 env_vars: vec![],
299 snapshot,
300 stdout_log_path,
301 stderr_log_path,
302 waited_ms,
303 elapsed_ms,
304 exit_code: exit_code_opt,
305 finished_at: finished_at_opt,
306 final_snapshot: final_snapshot_opt,
307 },
308 );
309
310 Ok(serde_json::to_value(&response)?)
311}
312
313async fn status_handler(
316 State(state): State<Arc<AppState>>,
317 Path(id): Path<String>,
318) -> AxumResponse {
319 let root_opt = state.root.clone();
320 let result = tokio::task::spawn_blocking(move || {
321 let root = resolve_root(root_opt.as_deref());
322 let job_dir = JobDir::open(&root, &id)?;
323 let meta = job_dir.read_meta()?;
324 let st = job_dir.read_state()?;
325 let response = Response::new(
326 "status",
327 StatusData {
328 job_id: id,
329 state: st.status().as_str().to_string(),
330 exit_code: st.exit_code(),
331 created_at: meta.created_at,
332 started_at: st.started_at().map(|s| s.to_string()),
333 finished_at: st.finished_at,
334 },
335 );
336 Ok::<_, anyhow::Error>(serde_json::to_value(&response)?)
337 })
338 .await;
339
340 match result {
341 Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
342 Ok(Err(e)) => map_err_to_response(e),
343 Err(e) => err_resp(
344 StatusCode::INTERNAL_SERVER_ERROR,
345 "internal_error",
346 &format!("task error: {e}"),
347 ),
348 }
349}
350
351async fn tail_handler(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> AxumResponse {
354 let root_opt = state.root.clone();
355 let result = tokio::task::spawn_blocking(move || {
356 let root = resolve_root(root_opt.as_deref());
357 let job_dir = JobDir::open(&root, &id)?;
358 let stdout_log_path = job_dir.stdout_path();
359 let stderr_log_path = job_dir.stderr_path();
360 let stdout = job_dir.read_tail_metrics("stdout.log", 50, 65536);
361 let stderr = job_dir.read_tail_metrics("stderr.log", 50, 65536);
362 let response = Response::new(
363 "tail",
364 TailData {
365 job_id: id,
366 stdout_tail: stdout.tail,
367 stderr_tail: stderr.tail,
368 truncated: stdout.truncated || stderr.truncated,
369 encoding: "utf-8-lossy".to_string(),
370 stdout_log_path: stdout_log_path.display().to_string(),
371 stderr_log_path: stderr_log_path.display().to_string(),
372 stdout_observed_bytes: stdout.observed_bytes,
373 stderr_observed_bytes: stderr.observed_bytes,
374 stdout_included_bytes: stdout.included_bytes,
375 stderr_included_bytes: stderr.included_bytes,
376 },
377 );
378 Ok::<_, anyhow::Error>(serde_json::to_value(&response)?)
379 })
380 .await;
381
382 match result {
383 Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
384 Ok(Err(e)) => map_err_to_response(e),
385 Err(e) => err_resp(
386 StatusCode::INTERNAL_SERVER_ERROR,
387 "internal_error",
388 &format!("task error: {e}"),
389 ),
390 }
391}
392
393async fn wait_handler(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> AxumResponse {
396 let root_opt = state.root.clone();
397 let result = tokio::task::spawn_blocking(move || {
398 let root = resolve_root(root_opt.as_deref());
399 let job_dir = JobDir::open(&root, &id)?;
400 let poll = std::time::Duration::from_millis(200);
401 loop {
402 let st = job_dir.read_state()?;
403 if !st.status().is_non_terminal() {
404 let response = Response::new(
405 "wait",
406 WaitData {
407 job_id: id,
408 state: st.status().as_str().to_string(),
409 exit_code: st.exit_code(),
410 },
411 );
412 return Ok::<_, anyhow::Error>(serde_json::to_value(&response)?);
413 }
414 std::thread::sleep(poll);
415 }
416 })
417 .await;
418
419 match result {
420 Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
421 Ok(Err(e)) => map_err_to_response(e),
422 Err(e) => err_resp(
423 StatusCode::INTERNAL_SERVER_ERROR,
424 "internal_error",
425 &format!("task error: {e}"),
426 ),
427 }
428}
429
430async fn kill_handler(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> AxumResponse {
433 let root_opt = state.root.clone();
434 let result = tokio::task::spawn_blocking(move || kill_inner(&id, root_opt.as_deref())).await;
435
436 match result {
437 Ok(Ok(val)) => (StatusCode::OK, Json(val)).into_response(),
438 Ok(Err(e)) => map_err_to_response(e),
439 Err(e) => err_resp(
440 StatusCode::INTERNAL_SERVER_ERROR,
441 "internal_error",
442 &format!("task error: {e}"),
443 ),
444 }
445}
446
447fn kill_inner(job_id: &str, root: Option<&str>) -> Result<serde_json::Value> {
449 use crate::schema::JobStatus;
450
451 let resolved_root = resolve_root(root);
452 let job_dir = JobDir::open(&resolved_root, job_id)?;
453 let st = job_dir.read_state()?;
454
455 let signal = "TERM";
456
457 if *st.status() == JobStatus::Created {
458 return Err(anyhow::Error::new(crate::jobstore::InvalidJobState(
459 format!(
460 "job {} is in 'created' state and has not been started; cannot send signal",
461 job_id
462 ),
463 )));
464 }
465
466 if *st.status() == JobStatus::Running
467 && let Some(pid) = st.pid
468 {
469 send_term(pid);
470 }
471 let response = Response::new(
474 "kill",
475 KillData {
476 job_id: job_id.to_string(),
477 signal: signal.to_string(),
478 },
479 );
480 Ok(serde_json::to_value(&response)?)
481}
482
483#[cfg(unix)]
485fn send_term(pid: u32) {
486 let pgid = -(pid as libc::pid_t);
487 let ret = unsafe { libc::kill(pgid, libc::SIGTERM) };
488 if ret != 0 {
489 unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
491 }
492}
493
/// Non-Unix stand-in for `send_term`: does nothing, so no signal is
/// delivered on these platforms.
#[cfg(not(unix))]
fn send_term(_pid: u32) {}