1use std::collections::BTreeMap;
39use std::io::{Read, Write};
40use std::path::PathBuf;
41use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
42use std::sync::{Arc, LazyLock, Mutex, OnceLock};
43use std::time::Duration;
44
45use harn_vm::VmValue;
46
47use crate::error::HostlibError;
48use crate::process::{self as process_handle, ProcessHandle, ProcessKiller, SpawnSpec};
49use crate::tools::proc::{self, CaptureConfig, CommandStatus, EnvMode};
50
51static HANDLE_COUNTER: AtomicU64 = AtomicU64::new(1);
53
54struct CancelState {
56 cancelled: AtomicBool,
59 timed_out: AtomicBool,
63}
64
65#[derive(Default)]
66struct OutputState {
67 stdout: Vec<u8>,
68 stderr: Vec<u8>,
69}
70
71struct HandleEntry {
73 handle: Option<Box<dyn ProcessHandle>>,
75 killer: Arc<dyn ProcessKiller>,
77 session_id: String,
78 cancel_state: Arc<CancelState>,
80 completion_tx: Option<std::sync::mpsc::SyncSender<()>>,
84 result_tx: Option<std::sync::mpsc::SyncSender<VmValue>>,
87}
88
89#[derive(Default)]
90struct HandleStore {
91 entries: BTreeMap<String, HandleEntry>,
92}
93
94static HANDLE_STORE: LazyLock<Mutex<HandleStore>> =
95 LazyLock::new(|| Mutex::new(HandleStore::default()));
96
97pub struct LongRunningHandleInfo {
100 pub command_id: String,
102 pub handle_id: String,
104 pub started_at: String,
106 pub pid: u32,
108 pub process_group_id: Option<u32>,
110 pub command_display: String,
112}
113
114pub(crate) struct LongRunningSpawnOptions {
115 pub(crate) env_mode: EnvMode,
116 pub(crate) capture: CaptureConfig,
117 pub(crate) session_id: String,
118 pub(crate) progress_interval: Option<Duration>,
119 pub(crate) progress_max_inline_bytes: usize,
120}
121
122struct WaiterContext {
123 command_id: String,
124 handle_id: String,
125 session_id: String,
126 started_at: String,
127 process_group_id: Option<u32>,
128 command_display: String,
129 progress_interval: Option<Duration>,
130 progress_max_inline_bytes: usize,
131}
132
133struct ProgressThreadContext {
134 command_id: String,
135 handle_id: String,
136 session_id: String,
137 started_at: String,
138 command_display: String,
139 process_group_id: Option<u32>,
140 output_path: PathBuf,
141 stdout_path: PathBuf,
142 stderr_path: PathBuf,
143 output_state: Arc<Mutex<OutputState>>,
144 cancel_state: Arc<CancelState>,
145 done: Arc<AtomicBool>,
146 started: std::time::Instant,
147 interval: Duration,
148 max_inline_bytes: usize,
149}
150
151impl LongRunningHandleInfo {
152 pub fn into_handle_response(self) -> VmValue {
154 proc::running_response(
155 self.command_id,
156 self.handle_id,
157 self.pid,
158 self.process_group_id,
159 self.started_at,
160 self.command_display,
161 )
162 }
163}
164
165pub fn spawn_long_running(
171 builtin: &'static str,
172 program: String,
173 args: Vec<String>,
174 cwd: Option<PathBuf>,
175 env: BTreeMap<String, String>,
176 session_id: String,
177) -> Result<LongRunningHandleInfo, HostlibError> {
178 spawn_long_running_with_options(
179 builtin,
180 program,
181 args,
182 cwd,
183 env,
184 LongRunningSpawnOptions {
185 env_mode: EnvMode::InheritClean,
186 capture: CaptureConfig::default(),
187 session_id,
188 progress_interval: None,
189 progress_max_inline_bytes: CaptureConfig::default().max_inline_bytes,
190 },
191 )
192}
193
194pub(crate) fn spawn_long_running_with_options(
195 builtin: &'static str,
196 program: String,
197 args: Vec<String>,
198 cwd: Option<PathBuf>,
199 env: BTreeMap<String, String>,
200 options: LongRunningSpawnOptions,
201) -> Result<LongRunningHandleInfo, HostlibError> {
202 let spec = SpawnSpec {
203 builtin,
204 program: program.clone(),
205 args: args.clone(),
206 cwd,
207 env,
208 env_mode: options.env_mode,
209 use_stdin: false,
210 configure_process_group: true,
211 };
212 let handle = process_handle::spawn_process(spec)
213 .map_err(|e| proc::process_error_to_hostlib(builtin, e))?;
214
215 let pid = handle.pid().unwrap_or(0);
216 let process_group_id = handle.process_group_id();
217 let killer = handle.killer();
218 let id = HANDLE_COUNTER.fetch_add(1, Ordering::SeqCst);
219 let handle_id = format!("hto-{:x}-{id}", std::process::id());
220 let command_id = proc::next_command_id();
221 let started_at = proc::now_rfc3339();
222
223 let mut all_argv = vec![program];
224 all_argv.extend(args.iter().cloned());
225 let command_display = all_argv.join(" ");
226
227 let cancel_state = Arc::new(CancelState {
228 cancelled: AtomicBool::new(false),
229 timed_out: AtomicBool::new(false),
230 });
231
232 {
233 let mut store = HANDLE_STORE
234 .lock()
235 .expect("long-running handle store poisoned");
236 store.entries.insert(
237 handle_id.clone(),
238 HandleEntry {
239 handle: Some(handle),
240 killer,
241 session_id: options.session_id.clone(),
242 cancel_state: cancel_state.clone(),
243 completion_tx: None,
244 result_tx: None,
245 },
246 );
247 }
248
249 let waiter_context = WaiterContext {
250 command_id: command_id.clone(),
251 handle_id: handle_id.clone(),
252 session_id: options.session_id,
253 started_at: started_at.clone(),
254 process_group_id,
255 command_display: command_display.clone(),
256 progress_interval: options.progress_interval,
257 progress_max_inline_bytes: options.progress_max_inline_bytes,
258 };
259 let waiter_thread_name = waiter_context.handle_id.clone();
260 let capture = options.capture;
261 std::thread::Builder::new()
262 .name(format!("hto-waiter-{waiter_thread_name}"))
263 .spawn(move || {
264 waiter_thread(waiter_context, cancel_state, capture);
265 })
266 .map_err(|e| HostlibError::Backend {
267 builtin,
268 message: format!("failed to spawn waiter thread: {e}"),
269 })?;
270
271 Ok(LongRunningHandleInfo {
272 command_id,
273 handle_id,
274 started_at,
275 pid,
276 process_group_id,
277 command_display,
278 })
279}
280
281fn waiter_thread(context: WaiterContext, cancel_state: Arc<CancelState>, capture: CaptureConfig) {
283 let waiter_start = std::time::Instant::now();
284
285 let mut handle = {
288 let mut store = HANDLE_STORE
289 .lock()
290 .expect("long-running handle store poisoned");
291 match store.entries.get_mut(&context.handle_id) {
292 Some(entry) => match entry.handle.take() {
293 Some(h) => h,
294 None => return, },
296 None => return, }
298 };
299
300 let output_state = Arc::new(Mutex::new(OutputState::default()));
301 let done = Arc::new(AtomicBool::new(false));
302 let planned = proc::planned_artifact_paths(&context.command_id);
303 if let Some(parent) = planned.output_path.parent() {
304 let _ = std::fs::create_dir_all(parent);
305 }
306 let _ = std::fs::File::create(&planned.stdout_path);
307 let _ = std::fs::File::create(&planned.stderr_path);
308 let combined_file = std::fs::File::create(&planned.output_path)
309 .ok()
310 .map(|file| Arc::new(Mutex::new(file)));
311
312 let stdout_thread = handle.take_stdout().map(|out| {
313 spawn_output_drain(
314 out,
315 output_state.clone(),
316 planned.stdout_path.clone(),
317 combined_file.clone(),
318 true,
319 )
320 });
321 let stderr_thread = handle.take_stderr().map(|err| {
322 spawn_output_drain(
323 err,
324 output_state.clone(),
325 planned.stderr_path.clone(),
326 combined_file.clone(),
327 false,
328 )
329 });
330
331 let progress_thread = context
332 .progress_interval
333 .filter(|interval| !interval.is_zero())
334 .map(|interval| {
335 spawn_progress_thread(ProgressThreadContext {
336 command_id: context.command_id.clone(),
337 handle_id: context.handle_id.clone(),
338 session_id: context.session_id.clone(),
339 started_at: context.started_at.clone(),
340 command_display: context.command_display.clone(),
341 process_group_id: context.process_group_id,
342 output_path: planned.output_path.clone(),
343 stdout_path: planned.stdout_path.clone(),
344 stderr_path: planned.stderr_path.clone(),
345 output_state: output_state.clone(),
346 cancel_state: cancel_state.clone(),
347 done: done.clone(),
348 started: waiter_start,
349 interval,
350 max_inline_bytes: context.progress_max_inline_bytes,
351 })
352 });
353
354 let status = handle.wait().ok();
355
356 if let Some(thread) = stdout_thread {
357 let _ = thread.join();
358 }
359 if let Some(thread) = stderr_thread {
360 let _ = thread.join();
361 }
362 done.store(true, Ordering::Release);
363 drop(progress_thread);
364 let (stdout, stderr) = {
365 let state = output_state
366 .lock()
367 .unwrap_or_else(|poison| poison.into_inner());
368 (state.stdout.clone(), state.stderr.clone())
369 };
370
371 let (completion_tx, result_tx) = {
374 let mut store = HANDLE_STORE
375 .lock()
376 .expect("long-running handle store poisoned");
377 let entry = store
378 .entries
379 .remove(&context.handle_id)
380 .map(|mut e| (e.completion_tx.take(), e.result_tx.take()));
381 entry.unwrap_or((None, None))
382 };
383
384 let signal_done = move || {
385 if let Some(tx) = completion_tx {
386 let _ = tx.try_send(());
387 }
388 };
389
390 let cancelled = cancel_state.cancelled.load(Ordering::Acquire);
391 let timed_out = cancelled && cancel_state.timed_out.load(Ordering::Acquire);
392
393 let (exit_code, signal_name) = match status {
394 Some(s) => decode_exit_status(s),
395 None => (-1, Some("SIGKILL".to_string())),
397 };
398 let command_status = if timed_out {
399 CommandStatus::TimedOut
400 } else if cancelled {
401 CommandStatus::Killed
402 } else {
403 CommandStatus::Completed
404 };
405 let duration = waiter_start.elapsed();
406 let duration_ms = duration.as_millis() as i64;
407 let artifacts = match proc::persist_artifacts(
408 &context.command_id,
409 &stdout,
410 &stderr,
411 Some(&context.handle_id),
412 ) {
413 Ok(artifacts) => artifacts,
414 Err(_) => return,
415 };
416 let (inline_stdout, inline_stderr) = proc::inline_output(&stdout, &stderr, capture);
417
418 let mut payload = serde_json::Map::new();
419 payload.insert(
420 "command_id".into(),
421 serde_json::Value::String(context.command_id.clone()),
422 );
423 payload.insert(
424 "status".into(),
425 serde_json::Value::String(command_status.as_str().to_string()),
426 );
427 payload.insert(
428 "handle_id".into(),
429 serde_json::Value::String(context.handle_id),
430 );
431 payload.insert(
432 "command_or_op_descriptor".into(),
433 serde_json::Value::String(context.command_display),
434 );
435 payload.insert(
436 "started_at".into(),
437 serde_json::Value::String(context.started_at),
438 );
439 payload.insert(
440 "ended_at".into(),
441 serde_json::Value::String(proc::now_rfc3339()),
442 );
443 payload.insert(
444 "duration_ms".into(),
445 serde_json::Value::Number(duration_ms.into()),
446 );
447 payload.insert(
448 "exit_code".into(),
449 serde_json::Value::Number(exit_code.into()),
450 );
451 payload.insert("timed_out".into(), serde_json::Value::Bool(timed_out));
452 payload.insert("stdout".into(), serde_json::Value::String(inline_stdout));
453 payload.insert("stderr".into(), serde_json::Value::String(inline_stderr));
454 payload.insert(
455 "output_path".into(),
456 serde_json::Value::String(artifacts.output_path.display().to_string()),
457 );
458 payload.insert(
459 "stdout_path".into(),
460 serde_json::Value::String(artifacts.stdout_path.display().to_string()),
461 );
462 payload.insert(
463 "stderr_path".into(),
464 serde_json::Value::String(artifacts.stderr_path.display().to_string()),
465 );
466 payload.insert(
467 "line_count".into(),
468 serde_json::Value::Number(artifacts.line_count.into()),
469 );
470 payload.insert(
471 "byte_count".into(),
472 serde_json::Value::Number(artifacts.byte_count.into()),
473 );
474 payload.insert(
475 "output_sha256".into(),
476 serde_json::Value::String(artifacts.output_sha256),
477 );
478 if let Some(pgid) = context.process_group_id {
479 payload.insert(
480 "process_group_id".into(),
481 serde_json::Value::Number((pgid as u64).into()),
482 );
483 }
484 if let Some(sig) = signal_name {
485 payload.insert("signal".into(), serde_json::Value::String(sig));
486 } else {
487 payload.insert("signal".into(), serde_json::Value::Null);
488 }
489
490 if let Some(tx) = result_tx {
491 let value = serde_json::Value::Object(payload.clone());
492 let _ = tx.try_send(harn_vm::json_to_vm_value(&value));
493 }
494 if !cancelled {
495 let content = serde_json::to_string(&payload).unwrap_or_default();
496 harn_vm::orchestration::agent_inbox::push(
497 &context.session_id,
498 "tool_result",
499 &content,
500 "hostlib.long_running.exit",
501 );
502 }
503 signal_done();
504}
505
506fn spawn_output_drain(
507 mut reader: Box<dyn Read + Send>,
508 state: Arc<Mutex<OutputState>>,
509 path: std::path::PathBuf,
510 combined_file: Option<Arc<Mutex<std::fs::File>>>,
511 stdout: bool,
512) -> std::thread::JoinHandle<()> {
513 std::thread::spawn(move || {
514 let mut file = std::fs::File::create(path).ok();
515 let mut buf = [0_u8; 8192];
516 loop {
517 let read = match reader.read(&mut buf) {
518 Ok(0) => break,
519 Ok(read) => read,
520 Err(_) => break,
521 };
522 let chunk = &buf[..read];
523 if let Some(file) = file.as_mut() {
524 let _ = file.write_all(chunk);
525 }
526 if let Some(combined) = combined_file.as_ref() {
527 if let Ok(mut combined) = combined.lock() {
528 let _ = combined.write_all(chunk);
529 }
530 }
531 if let Ok(mut state) = state.lock() {
532 if stdout {
533 state.stdout.extend_from_slice(chunk);
534 } else {
535 state.stderr.extend_from_slice(chunk);
536 }
537 }
538 }
539 })
540}
541
542fn spawn_progress_thread(context: ProgressThreadContext) -> std::thread::JoinHandle<()> {
543 std::thread::spawn(move || {
544 while !context.done.load(Ordering::Acquire)
545 && !context.cancel_state.cancelled.load(Ordering::Acquire)
546 {
547 std::thread::sleep(context.interval);
548 if context.done.load(Ordering::Acquire)
549 || context.cancel_state.cancelled.load(Ordering::Acquire)
550 {
551 break;
552 }
553 let (stdout, stderr) = {
554 let state = context
555 .output_state
556 .lock()
557 .unwrap_or_else(|poison| poison.into_inner());
558 (state.stdout.clone(), state.stderr.clone())
559 };
560 let capture = CaptureConfig {
561 max_inline_bytes: context.max_inline_bytes,
562 ..CaptureConfig::default()
563 };
564 let (inline_stdout, inline_stderr) = proc::inline_output(&stdout, &stderr, capture);
565 let byte_count = stdout.len().saturating_add(stderr.len());
566 let payload = serde_json::json!({
567 "command_id": &context.command_id,
568 "handle_id": &context.handle_id,
569 "status": CommandStatus::Running.as_str(),
570 "command_or_op_descriptor": &context.command_display,
571 "started_at": &context.started_at,
572 "ended_at": null,
573 "duration_ms": context.started.elapsed().as_millis() as i64,
574 "exit_code": null,
575 "signal": null,
576 "stdout": inline_stdout,
577 "stderr": inline_stderr,
578 "output_path": context.output_path.display().to_string(),
579 "stdout_path": context.stdout_path.display().to_string(),
580 "stderr_path": context.stderr_path.display().to_string(),
581 "byte_count": byte_count as i64,
582 "line_count": stdout.iter().chain(stderr.iter()).filter(|byte| **byte == b'\n').count() as i64,
583 "process_group_id": context.process_group_id,
584 });
585 harn_vm::orchestration::agent_inbox::push(
586 &context.session_id,
587 "tool_progress",
588 &payload.to_string(),
589 "hostlib.long_running.progress",
590 );
591 }
592 })
593}
594
595pub(crate) struct CancelOptions {
596 pub(crate) timed_out: bool,
597 pub(crate) wait_result: Option<Duration>,
598}
599
600pub(crate) struct CancelOutcome {
601 pub(crate) cancelled: bool,
602 pub(crate) result: Option<VmValue>,
603}
604
605pub fn cancel_handle(handle_id: &str) -> bool {
609 cancel_handle_with_options(
610 handle_id,
611 CancelOptions {
612 timed_out: false,
613 wait_result: None,
614 },
615 )
616 .cancelled
617}
618
619pub(crate) fn cancel_handle_with_options(handle_id: &str, options: CancelOptions) -> CancelOutcome {
620 let (killer, cancel_state, result_rx) = {
621 let mut store = HANDLE_STORE
622 .lock()
623 .expect("long-running handle store poisoned");
624 let Some(entry) = store.entries.get_mut(handle_id) else {
625 return CancelOutcome {
626 cancelled: false,
627 result: None,
628 };
629 };
630 if entry.cancel_state.cancelled.swap(true, Ordering::AcqRel) {
631 return CancelOutcome {
632 cancelled: false,
633 result: None,
634 };
635 }
636 entry
637 .cancel_state
638 .timed_out
639 .store(options.timed_out, Ordering::Release);
640 let result_rx = options.wait_result.map(|_| {
641 let (tx, rx) = std::sync::mpsc::sync_channel::<VmValue>(1);
642 entry.result_tx = Some(tx);
643 rx
644 });
645 (entry.killer.clone(), entry.cancel_state.clone(), result_rx)
646 };
647 do_kill(killer, cancel_state);
648 let result = match (options.wait_result, result_rx) {
649 (Some(timeout), Some(rx)) => rx.recv_timeout(timeout).ok(),
650 _ => None,
651 };
652 CancelOutcome {
653 cancelled: true,
654 result,
655 }
656}
657
658type SessionKillEntry = (Arc<dyn ProcessKiller>, Arc<CancelState>);
662
663pub fn cancel_session_handles(session_id: &str) {
666 let to_kill: Vec<SessionKillEntry> = {
667 let store = HANDLE_STORE
668 .lock()
669 .expect("long-running handle store poisoned");
670 let matching: Vec<String> = store
671 .entries
672 .iter()
673 .filter(|(_, e)| e.session_id == session_id)
674 .map(|(id, _)| id.clone())
675 .collect();
676 matching
677 .into_iter()
678 .filter_map(|id| {
679 let entry = store.entries.get(&id)?;
680 if entry.cancel_state.cancelled.swap(true, Ordering::AcqRel) {
681 return None;
682 }
683 entry.cancel_state.timed_out.store(false, Ordering::Release);
684 Some((entry.killer.clone(), entry.cancel_state.clone()))
685 })
686 .collect()
687 };
688 for (killer, cancel_state) in to_kill {
689 do_kill(killer, cancel_state);
690 }
691}
692
693fn do_kill(killer: Arc<dyn ProcessKiller>, cancel_state: Arc<CancelState>) {
696 killer.kill();
699 cancel_state.cancelled.store(true, Ordering::Release);
700}
701
702pub(crate) fn register_cleanup_hook() {
706 static REGISTERED: OnceLock<()> = OnceLock::new();
707 REGISTERED.get_or_init(|| {
708 let hook: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(|session_id: &str| {
709 cancel_session_handles(session_id);
710 });
711 harn_vm::register_session_end_hook(hook);
712 });
713}
714
715fn decode_exit_status(status: process_handle::ExitStatus) -> (i32, Option<String>) {
716 if let Some(code) = status.code {
717 return (code, None);
718 }
719 if let Some(sig) = status.signal {
720 return (-1, Some(format!("SIG{sig}")));
721 }
722 (-1, None)
723}
724
725pub fn register_completion_notifier(handle_id: &str) -> Option<std::sync::mpsc::Receiver<()>> {
731 let (tx, rx) = std::sync::mpsc::sync_channel::<()>(1);
732 let mut store = HANDLE_STORE
733 .lock()
734 .expect("long-running handle store poisoned");
735 let entry = store.entries.get_mut(handle_id)?;
736 entry.completion_tx = Some(tx);
737 Some(rx)
738}